user.py 3.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import dataset
  2. from first import first
  3. def get_user_id(token):
  4. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  5. credentials_exception = HTTPException(
  6. status_code=status.HTTP_401_UNAUTHORIZED,
  7. detail="Could not validate credentials",
  8. headers={"WWW-Authenticate": "Bearer"},
  9. )
  10. try:
  11. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  12. username: str = payload.get("sub")
  13. if username is None:
  14. raise credentials_exception
  15. token_data = models.TokenData(username=username)
  16. except JWTError:
  17. raise credentials_exception
  18. user = get_user(username=token_data.username)
  19. if user is None:
  20. raise credentials_exception
  21. user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']
  22. return user_id
  23. def check_user_exists( username):
  24. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  25. if int(next(iter(db.query('SELECT COUNT(*) FROM AI_anchor.users WHERE username = "'+username+'"')))['COUNT(*)']) > 0:
  26. return True
  27. else:
  28. return False
  29. def get_user( username: str):
  30. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  31. if not check_user_exists(username): # if user don't exist
  32. return False
  33. user_dict = next(
  34. iter(db.query('SELECT * FROM users where username ="'+username+'"')))
  35. user = models.User(**user_dict)
  36. return user
  37. def user_register( user):
  38. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  39. table = db['users']
  40. user.password = get_password_hash(user.password)
  41. table.insert(dict(user))
  42. def get_password_hash( password):
  43. return pwd_context.hash(password)
  44. def verify_password( plain_password, hashed_password):
  45. return pwd_context.verify(plain_password, hashed_password)
  46. def authenticate_user( username: str, password: str):
  47. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  48. if not check_user_exists(username): # if user don't exist
  49. return False
  50. user_dict = next(iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
  51. user = models.User(**user_dict)
  52. if not verify_password(password, user.password):
  53. return False
  54. return user
  55. def get_roles( username):
  56. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  57. state = 'SELECT * FROM user_role '\
  58. 'INNER JOIN users on user_role.user_id= users.id'\
  59. 'INNER JOIN role on user_role.role_id = role.id '\
  60. 'WHERE username=username'
  61. role_list = []
  62. for row in db.query(statement):
  63. role_list.append({'id':row['role_id'],'name':row['name']})
  64. return role_list
  65. #def add_role( username,role_id):
  66. #db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  67. #user_role_table = db['user_role']
  68. #user_role_table.insert({'user_id':,'role_id':role_id})
  69. def get_user_id( username):
  70. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  71. return str(first(db.query('SELECT COUNT(*) FROM history_input WHERE user_id ='+str(user_obj['id'])))['COUNT(*)'])
  72. def get_id_by_email(email):
  73. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  74. user_dict = next(iter(db.query('SELECT * FROM users where email ="'+email+'"')))
  75. return user_dict['id']
  76. def email_veri_pass(name):
  77. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  78. user_dict = next(iter(db.query('SELECT * FROM users where username ="'+name+'"')))
  79. user_obj = first(db.query('SELECT * FROM register_veri_code where user_id ="'+str(user_dict['id'])+'"'))
  80. if user_obj == None:
  81. return True
  82. else:
  83. return False