# user.py
  1. import dataset
  2. from fastapi import FastAPI,Cookie, Depends, Query, status,File, UploadFile,Request,Response,HTTPException
  3. from first import first
  4. from jose import JWTError, jwt
  5. from fastapi_jwt_auth import AuthJWT
  6. from fastapi_jwt_auth.exceptions import AuthJWTException
  7. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  8. import util.models
  9. from passlib.context import CryptContext
  # Key and algorithm handed to jwt.decode() when access tokens are validated.
  # NOTE(review): the signing key is committed in source -- consider loading it
  # from configuration instead.
  ALGORITHM = "HS256"
  SECRET_KEY = "df2f77bd544240801a048bd4293afd8eeb7fff3cb7050e42c791db4b83ebadcd"

  # Presumably the access-token lifetime in days; not referenced by the helpers
  # visible in this module -- TODO confirm where it is consumed.
  ACCESS_TOKEN_EXPIRE_DAYS = 5

  # passlib context shared by get_password_hash() and verify_password().
  pwd_context = CryptContext(deprecated="auto", schemes=["bcrypt"])
  def get_user_id(token):
      """Return the ``users.id`` of the account named by a JWT access token.

      NOTE: a second ``get_user_id`` is defined later in this module; at import
      time that later definition replaces this one.

      Args:
          token: Encoded JWT whose ``sub`` claim holds the username.

      Returns:
          The ``id`` column of the matching row in ``users``.

      Raises:
          HTTPException: 401 when the token cannot be decoded, has no ``sub``
              claim, or does not correspond to a stored user.
      """
      credentials_exception = HTTPException(
          status_code=status.HTTP_401_UNAUTHORIZED,
          detail="Could not validate credentials",
          headers={"WWW-Authenticate": "Bearer"},
      )
      try:
          payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
      except JWTError:
          raise credentials_exception
      username = payload.get("sub")
      if username is None:
          raise credentials_exception
      token_data = util.models.TokenData(username=username)

      # get_user() signals "no such user" with False, so test truthiness
      # rather than comparing against None.
      user = get_user(username=token_data.username)
      if not user:
          raise credentials_exception

      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          # Bind the username instead of splicing it into the SQL text.
          row = first(db.query(
              'SELECT * FROM users WHERE username = :username',
              username=user.username,
          ))
      finally:
          # Release the connection on every path, including errors.
          db.close()
      if row is None:
          raise credentials_exception
      return row['id']
  def check_user_exists(username):
      """Report whether a row with this username exists in ``AI_anchor.users``.

      Args:
          username: Username to look up.

      Returns:
          True when at least one matching row exists, otherwise False.
      """
      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          # Bound parameter keeps the username out of the SQL text.
          rows = db.query(
              'SELECT COUNT(*) AS user_count FROM AI_anchor.users '
              'WHERE username = :username',
              username=username,
          )
          return int(next(iter(rows))['user_count']) > 0
      finally:
          # Close the connection whether or not the query succeeded.
          db.close()
  def get_user(username: str):
      """Load a user row by username and wrap it in ``util.models.User``.

      Args:
          username: Username to look up in the ``users`` table.

      Returns:
          A ``util.models.User`` built from the row's columns, or ``False``
          when no row matches (callers test the result for falsiness).
      """
      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          # One parameterized query both checks existence and fetches the row.
          rows = db.query(
              'SELECT * FROM users WHERE username = :username',
              username=username,
          )
          user_dict = next(iter(rows), None)
      finally:
          # Previously the "not found" return skipped close(); always release.
          db.close()
      if user_dict is None:
          return False
      return util.models.User(**user_dict)
  def user_register(user):
      """Insert a new row into ``users`` with the password stored as a hash.

      The password on ``user`` is replaced in place by its hash before the
      object is converted with ``dict(user)`` and inserted, so the caller's
      object no longer holds the plain-text password afterwards.

      Args:
          user: Object exposing a ``password`` attribute and convertible with
              ``dict(user)``; presumably a ``util.models`` model -- TODO confirm.
      """
      user.password = get_password_hash(user.password)
      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          db['users'].insert(dict(user))
      finally:
          # Close the connection even when the insert raises.
          db.close()
  def get_password_hash(password):
      """Hash ``password`` with the module-level ``pwd_context`` and return it."""
      hashed = pwd_context.hash(password)
      return hashed
  def verify_password(plain_password, hashed_password):
      """Return the result of checking ``plain_password`` against ``hashed_password``.

      The check is delegated to the module-level ``pwd_context``.
      """
      matches = pwd_context.verify(plain_password, hashed_password)
      return matches
  def authenticate_user(username: str, password: str):
      """Check a username/password pair against the ``AI_anchor.users`` table.

      Args:
          username: Username to look up.
          password: Plain-text password to verify against the stored hash.

      Returns:
          The ``util.models.User`` for the account when the password matches;
          ``False`` when the user does not exist or the password is wrong.
      """
      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          # Single parameterized lookup; no row means the user does not exist.
          rows = db.query(
              'SELECT * FROM AI_anchor.users WHERE username = :username',
              username=username,
          )
          user_dict = next(iter(rows), None)
      finally:
          # The success path used to return without closing the connection.
          db.close()
      if user_dict is None:
          return False
      user = util.models.User(**user_dict)
      if not verify_password(password, user.password):
          return False
      return user
  def get_user_role(id):
      """List the roles attached to a user.

      Joins ``user_role`` with ``users`` and ``role`` and keeps the rows whose
      ``users.id`` equals ``id``.

      Args:
          id: Value compared against ``users.id``.

      Returns:
          A list of ``{'id': <role_id>, 'name': <name>}`` dicts, one per row.
      """
      statement = (
          'SELECT * FROM user_role '
          'INNER JOIN users on user_role.user_id= users.id '
          'INNER JOIN role on user_role.role_id = role.id '
          'WHERE users.id = :user_id'
      )
      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          # The id is bound as a parameter rather than concatenated into SQL.
          return [
              {'id': row['role_id'], 'name': row['name']}
              for row in db.query(statement, user_id=id)
          ]
      finally:
          # Close the connection whether or not the query succeeded.
          db.close()
  def get_avatar_by_role(id):
      """List the avatars reachable through a user's roles.

      Selects from ``role_avatar`` joined with ``avatar``, restricted to the
      role ids that ``user_role`` associates with the user whose ``users.id``
      equals ``id``.

      Args:
          id: Value compared against ``users.id``.

      Returns:
          A list of dicts with keys ``role_id``, ``avatar_id``, ``name`` and
          ``num``, one per result row.
      """
      statement = (
          'select role_avatar.role_id, avatar_id, avatar.name,num from role_avatar '
          'INNER JOIN '
          '(SELECT role_id FROM user_role '
          'INNER JOIN users on user_role.user_id= users.id '
          'INNER JOIN role on user_role.role_id = role.id '
          'WHERE users.id = :user_id) a '
          'on role_avatar.role_id = a.role_id '
          'INNER JOIN avatar on role_avatar.avatar_id = avatar.id;'
      )
      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          # The id is bound as a parameter rather than concatenated into SQL.
          return [
              {
                  'role_id': row['role_id'],
                  'avatar_id': row['avatar_id'],
                  'name': row['name'],
                  'num': row['num'],
              }
              for row in db.query(statement, user_id=id)
          ]
      finally:
          # Close the connection whether or not the query succeeded.
          db.close()
  # TODO: add_role is an unfinished draft -- the 'user_id' value was never filled in.
  # def add_role(username, role_id):
  #     db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  #     user_role_table = db['user_role']
  #     user_role_table.insert({'user_id': ..., 'role_id': role_id})
  def add_to_basic_role(id):
      """Attach role 5 to a user by inserting a row into ``user_role``.

      Args:
          id: Value stored in the ``user_id`` column of the new row.

      Returns:
          ``'ok'`` when the insert statement ran without raising, otherwise
          ``'not ok'``.
      """
      # Role id this helper always assigns (the "basic" role per the function name).
      basic_role_id = 5
      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          # Values are bound as parameters rather than concatenated into SQL.
          db.query(
              'insert into user_role (user_id,role_id) values(:user_id, :role_id)',
              user_id=id,
              role_id=basic_role_id,
          )
      except Exception:
          # Best-effort contract kept: report failure through the return code.
          return 'not ok'
      finally:
          # The connection was previously never closed.
          db.close()
      return 'ok'
  def get_user_id(token):
      """Return the ``users.id`` of the account named by a JWT access token.

      NOTE: an earlier ``get_user_id`` appears above in this module; this later
      definition is the one bound to the name after import.

      Args:
          token: Encoded JWT whose ``sub`` claim holds the username.

      Returns:
          The ``id`` column of the matching row in ``users``.

      Raises:
          HTTPException: 401 when the token cannot be decoded, has no ``sub``
              claim, or does not correspond to a stored user.
      """
      credentials_exception = HTTPException(
          status_code=status.HTTP_401_UNAUTHORIZED,
          detail="Could not validate credentials",
          headers={"WWW-Authenticate": "Bearer"},
      )
      try:
          payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
      except JWTError:
          raise credentials_exception
      username = payload.get("sub")
      if username is None:
          raise credentials_exception
      token_data = util.models.TokenData(username=username)

      # get_user() signals "no such user" with False, so test truthiness
      # rather than comparing against None.
      user = get_user(username=token_data.username)
      if not user:
          raise credentials_exception

      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          # Bind the username instead of splicing it into the SQL text.
          row = first(db.query(
              'SELECT * FROM users WHERE username = :username',
              username=user.username,
          ))
      finally:
          # Release the connection on every path, including errors.
          db.close()
      if row is None:
          raise credentials_exception
      return row['id']
  def get_id_by_email(email):
      """Return the ``id`` of the first ``users`` row with this email.

      Args:
          email: Email address compared against ``users.email``.

      Returns:
          The ``id`` column of the first matching row.

      Raises:
          StopIteration: When no row matches (unchanged from the original
              behaviour; the connection is now closed in that case too).
      """
      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          # Bound parameter keeps the email out of the SQL text.
          rows = db.query(
              'SELECT * FROM users WHERE email = :email',
              email=email,
          )
          user_dict = next(iter(rows))
      finally:
          db.close()
      return user_dict['id']
  def email_veri_pass(name):
      """Report whether a user has no row left in ``register_veri_code``.

      Looks up the user by username, then searches ``register_veri_code`` for
      a row whose ``user_id`` is that user's id. Presumably a remaining row
      means email verification is still pending -- TODO confirm.

      Args:
          name: Username compared against ``users.username``.

      Returns:
          True when no ``register_veri_code`` row references the user,
          otherwise False.

      Raises:
          StopIteration: When no user has this username (unchanged from the
              original behaviour; the connection is now closed in that case too).
      """
      db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
      try:
          # Both lookups bind their values instead of concatenating SQL.
          user_dict = next(iter(db.query(
              'SELECT * FROM users WHERE username = :username',
              username=name,
          )))
          pending = first(db.query(
              'SELECT * FROM register_veri_code WHERE user_id = :user_id',
              user_id=user_dict['id'],
          ))
      finally:
          db.close()
      return pending is None