class user_util():
    """Static helpers for looking up, registering and authenticating users.

    Every helper opens its own connection to the ``AI_anchor`` database via
    ``dataset`` and is meant to be called on the class itself, e.g.
    ``user_util.get_user("alice")``. Helpers call each other through the
    class name because names defined in a class body are not visible as bare
    names inside its methods.
    """

    # NOTE(review): this URL embeds the database password in source; consider
    # loading it from configuration or the environment instead.
    _DB_URL = 'mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4'

    @staticmethod
    def _connect():
        """Open a ``dataset`` connection to the users database."""
        # NOTE(review): a connection is opened per call and never explicitly
        # closed, as in the original code; consider sharing one connection.
        return dataset.connect(user_util._DB_URL)

    @staticmethod
    def _fetch_user_row(db, username):
        """Return the first ``users`` row matching *username*, or ``None``."""
        # Bind the value instead of concatenating it into the SQL text so a
        # crafted username cannot change the statement (SQL injection).
        rows = db.query(
            'SELECT * FROM AI_anchor.users WHERE username = :username',
            username=username,
        )
        return next(iter(rows), None)

    @staticmethod
    def get_user_id(token):
        """Return the database ``id`` of the user identified by a JWT.

        Args:
            token: Encoded JWT whose ``sub`` claim holds the username.

        Returns:
            The ``id`` column of the matching ``users`` row.

        Raises:
            HTTPException: 401 when the token cannot be decoded, carries no
                ``sub`` claim, or names a user that does not exist.
        """
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise credentials_exception
            token_data = models.TokenData(username=username)
        except JWTError as err:
            raise credentials_exception from err

        user = user_util.get_user(username=token_data.username)
        # get_user() signals a missing user with False, so test truthiness
        # rather than ``is None``.
        if not user:
            raise credentials_exception

        row = user_util._fetch_user_row(user_util._connect(), user.username)
        if row is None:
            # The row vanished between the two lookups.
            raise credentials_exception
        return row['id']

    @staticmethod
    def check_user_exists(username) -> bool:
        """Return ``True`` when at least one user row has this *username*."""
        db = user_util._connect()
        rows = db.query(
            'SELECT COUNT(*) FROM AI_anchor.users WHERE username = :username',
            username=username,
        )
        return int(next(iter(rows))['COUNT(*)']) > 0

    @staticmethod
    def get_user(username: str):
        """Load a user by name.

        Returns:
            A ``models.User`` built from the matching row, or ``False`` when
            no such user exists.
        """
        # A single lookup replaces the former COUNT(*) check plus SELECT.
        row = user_util._fetch_user_row(user_util._connect(), username)
        if row is None:
            return False
        return models.User(**row)

    @staticmethod
    def user_register(user):
        """Hash ``user.password`` in place and insert the user into ``users``.

        Note that *user* is mutated: its ``password`` attribute is replaced by
        the hash before the record is stored.
        """
        db = user_util._connect()
        table = db['users']
        user.password = user_util.get_password_hash(user.password)
        table.insert(dict(user))

    @staticmethod
    def get_password_hash(password):
        """Return the hash of *password* produced by ``pwd_context``."""
        return pwd_context.hash(password)

    @staticmethod
    def verify_password(plain_password, hashed_password):
        """Return whether *plain_password* matches *hashed_password*."""
        return pwd_context.verify(plain_password, hashed_password)

    @staticmethod
    def authenticate_user(username: str, password: str):
        """Check a username/password pair.

        Returns:
            The ``models.User`` on success, or ``False`` when the user does
            not exist or the password does not verify.
        """
        user = user_util.get_user(username)
        if not user:
            return False
        if not user_util.verify_password(password, user.password):
            return False
        return user