class user_util(): def get_user_id(self, token): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = models.TokenData(username=username) except JWTError: raise credentials_exception user = get_user(username=token_data.username) if user is None: raise credentials_exception user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id'] return user_id def check_user_exists(self, username): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') if int(next(iter(db.query('SELECT COUNT(*) FROM AI_anchor.users WHERE username = "'+username+'"')))['COUNT(*)']) > 0: return True else: return False def get_user(self, username: str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') if not check_user_exists(username): # if user don't exist return False user_dict = next( iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"'))) user = models.User(**user_dict) return user def user_register(self, user): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') table = db['users'] user.password = get_password_hash(user.password) table.insert(dict(user)) def get_password_hash(self, password): return pwd_context.hash(password) def verify_password(self, plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def authenticate_user(self, username: str, password: str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') if not check_user_exists(username): # if user don't exist return False user_dict = next(iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"'))) user = models.User(**user_dict) if not verify_password(password, user.password): return False return user def get_roles(self, username): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') state = 'SELECT * FROM user_role '\ 'INNER JOIN users on user_role.user_id= users.id'\ 'INNER JOIN role on user_role.role_id = role.id '\ 'WHERE username=username' role_list = [] for row in db.query(statement): role_list.append({'id':row['role_id'],'name':row['name']}) return role_list def add_role(self, username,role_id): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') user_role_table = db['user_role'] user_role_table.insert({'user_id':,'role_id':role_id}) def get_user_id(self, username): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') return str(first(db.query('SELECT COUNT(*) FROM history_input WHERE user_id ='+str(user_obj['id'])))['COUNT(*)'])