user.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. def get_user_id(token):
  2. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  3. credentials_exception = HTTPException(
  4. status_code=status.HTTP_401_UNAUTHORIZED,
  5. detail="Could not validate credentials",
  6. headers={"WWW-Authenticate": "Bearer"},
  7. )
  8. try:
  9. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  10. username: str = payload.get("sub")
  11. if username is None:
  12. raise credentials_exception
  13. token_data = models.TokenData(username=username)
  14. except JWTError:
  15. raise credentials_exception
  16. user = get_user(username=token_data.username)
  17. if user is None:
  18. raise credentials_exception
  19. user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']
  20. return user_id
  21. def check_user_exists( username):
  22. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  23. if int(next(iter(db.query('SELECT COUNT(*) FROM AI_anchor.users WHERE username = "'+username+'"')))['COUNT(*)']) > 0:
  24. return True
  25. else:
  26. return False
  27. def get_user( username: str):
  28. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  29. if not check_user_exists(username): # if user don't exist
  30. return False
  31. user_dict = next(
  32. iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
  33. user = models.User(**user_dict)
  34. return user
  35. def user_register( user):
  36. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  37. table = db['users']
  38. user.password = get_password_hash(user.password)
  39. table.insert(dict(user))
  40. def get_password_hash( password):
  41. return pwd_context.hash(password)
  42. def verify_password( plain_password, hashed_password):
  43. return pwd_context.verify(plain_password, hashed_password)
  44. def authenticate_user( username: str, password: str):
  45. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  46. if not check_user_exists(username): # if user don't exist
  47. return False
  48. user_dict = next(iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
  49. user = models.User(**user_dict)
  50. if not verify_password(password, user.password):
  51. return False
  52. return user
  53. def get_roles( username):
  54. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  55. state = 'SELECT * FROM user_role '\
  56. 'INNER JOIN users on user_role.user_id= users.id'\
  57. 'INNER JOIN role on user_role.role_id = role.id '\
  58. 'WHERE username=username'
  59. role_list = []
  60. for row in db.query(statement):
  61. role_list.append({'id':row['role_id'],'name':row['name']})
  62. return role_list
  63. def add_role( username,role_id):
  64. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  65. user_role_table = db['user_role']
  66. user_role_table.insert({'user_id':,'role_id':role_id})
  67. def get_user_id( username):
  68. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  69. return str(first(db.query('SELECT COUNT(*) FROM history_input WHERE user_id ='+str(user_obj['id'])))['COUNT(*)'])