user.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. class user_util():
  2. def get_user_id(self, token):
  3. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  4. credentials_exception = HTTPException(
  5. status_code=status.HTTP_401_UNAUTHORIZED,
  6. detail="Could not validate credentials",
  7. headers={"WWW-Authenticate": "Bearer"},
  8. )
  9. try:
  10. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  11. username: str = payload.get("sub")
  12. if username is None:
  13. raise credentials_exception
  14. token_data = models.TokenData(username=username)
  15. except JWTError:
  16. raise credentials_exception
  17. user = get_user(username=token_data.username)
  18. if user is None:
  19. raise credentials_exception
  20. user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']
  21. return user_id
  22. def check_user_exists(self, username):
  23. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  24. if int(next(iter(db.query('SELECT COUNT(*) FROM AI_anchor.users WHERE username = "'+username+'"')))['COUNT(*)']) > 0:
  25. return True
  26. else:
  27. return False
  28. def get_user(self, username: str):
  29. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  30. if not check_user_exists(username): # if user don't exist
  31. return False
  32. user_dict = next(
  33. iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
  34. user = models.User(**user_dict)
  35. return user
  36. def user_register(self, user):
  37. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  38. table = db['users']
  39. user.password = get_password_hash(user.password)
  40. table.insert(dict(user))
  41. def get_password_hash(self, password):
  42. return pwd_context.hash(password)
  43. def verify_password(self, plain_password, hashed_password):
  44. return pwd_context.verify(plain_password, hashed_password)
  45. def authenticate_user(self, username: str, password: str):
  46. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  47. if not check_user_exists(username): # if user don't exist
  48. return False
  49. user_dict = next(iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
  50. user = models.User(**user_dict)
  51. if not verify_password(password, user.password):
  52. return False
  53. return user
  54. def get_roles(self, username):
  55. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  56. state = 'SELECT * FROM user_role '\
  57. 'INNER JOIN users on user_role.user_id= users.id'\
  58. 'INNER JOIN role on user_role.role_id = role.id '\
  59. 'WHERE username=username'
  60. role_list = []
  61. for row in db.query(statement):
  62. role_list.append({'id':row['role_id'],'name':row['name']})
  63. return role_list
  64. def add_role(self, username,role_id):
  65. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  66. user_role_table = db['user_role']
  67. user_role_table.insert({'user_id':,'role_id':role_id})
  68. def get_user_id(self, username):
  69. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  70. return str(first(db.query('SELECT COUNT(*) FROM history_input WHERE user_id ='+str(user_obj['id'])))['COUNT(*)'])