user.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. import dataset
  2. from fastapi import FastAPI,Cookie, Depends, Query, status,File, UploadFile,Request,Response,HTTPException
  3. from first import first
  4. from jose import JWTError, jwt
  5. from fastapi_jwt_auth import AuthJWT
  6. from fastapi_jwt_auth.exceptions import AuthJWTException
  7. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  8. import util.models
  9. from passlib.context import CryptContext
  10. import hashlib
  11. import time
  12. SECRET_KEY = "df2f77bd544240801a048bd4293afd8eeb7fff3cb7050e42c791db4b83ebadcd"
  13. ALGORITHM = "HS256"
  14. ACCESS_TOKEN_EXPIRE_DAYS = 5
  15. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  16. def get_user_id(token):
  17. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  18. credentials_exception = HTTPException(
  19. status_code=status.HTTP_401_UNAUTHORIZED,
  20. detail="Could not validate credentials",
  21. headers={"WWW-Authenticate": "Bearer"},
  22. )
  23. try:
  24. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  25. username: str = payload.get("sub")
  26. if username is None:
  27. raise credentials_exception
  28. token_data = uitl.models.TokenData(username=username)
  29. except JWTError:
  30. raise credentials_exception
  31. user = get_user(username=token_data.username)
  32. if user is None:
  33. raise credentials_exception
  34. user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']
  35. db.close()
  36. return user_id
  37. def check_user_exists( username):
  38. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  39. if int(next(iter(db.query('SELECT COUNT(*) FROM AI_anchor.users WHERE username = "'+username+'"')))['COUNT(*)']) > 0:
  40. db.close()
  41. return True
  42. else:
  43. db.close()
  44. return False
  45. def get_user( username: str):
  46. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  47. if not check_user_exists(username): # if user don't exist
  48. return False
  49. user_dict = next(
  50. iter(db.query('SELECT * FROM users where username ="'+username+'"')))
  51. user = util.models.User(**user_dict)
  52. db.close()
  53. return user
  54. def user_register( user):
  55. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  56. table = db['users']
  57. user.password = get_password_hash(user.password)
  58. table.insert(dict(user))
  59. db.close()
  60. def get_password_hash( password):
  61. return pwd_context.hash(password)
  62. def verify_password( plain_password, hashed_password):
  63. return pwd_context.verify(plain_password, hashed_password)
  64. def authenticate_user( username: str, password: str):
  65. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  66. if not check_user_exists(username): # if user don't exist
  67. db.close()
  68. return False
  69. user_dict = next(iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
  70. user = util.models.User(**user_dict)
  71. if not verify_password(password, user.password):
  72. db.close()
  73. return False
  74. return user
  75. def get_user_role(id):
  76. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  77. state = 'SELECT * FROM user_role '\
  78. 'INNER JOIN users on user_role.user_id= users.id '\
  79. 'INNER JOIN role on user_role.role_id = role.id '\
  80. 'WHERE users.id='+str(id)
  81. role_list = []
  82. for row in db.query(state):
  83. role_list.append({'id':row['role_id'],'name':row['name']})
  84. db.close()
  85. return role_list
  86. def get_user_role_list(id):
  87. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  88. state = 'SELECT * FROM user_role '\
  89. 'INNER JOIN users on user_role.user_id= users.id '\
  90. 'INNER JOIN role on user_role.role_id = role.id '\
  91. 'WHERE users.id='+str(id)
  92. role_list = []
  93. for row in db.query(state):
  94. role_list.append(row['role_id'])
  95. db.close()
  96. return role_list
  97. def get_avatar_by_role(id):
  98. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  99. state = 'select role_avatar.role_id, avatar_id, avatar.name,num from role_avatar '\
  100. 'INNER JOIN '\
  101. '(SELECT role_id FROM user_role '\
  102. 'INNER JOIN users on user_role.user_id= users.id '\
  103. 'INNER JOIN role on user_role.role_id = role.id '\
  104. 'WHERE users.id='+str(id)+') a '\
  105. 'on role_avatar.role_id = a.role_id '\
  106. 'INNER JOIN avatar on role_avatar.avatar_id = avatar.id;'
  107. role_list = []
  108. for row in db.query(state):
  109. role_list.append({'role_id':row['role_id'],'avatar_id':row['avatar_id'],'name':row['name'],'num':row['num']})
  110. db.close()
  111. return role_list
  112. #def add_role( username,role_id):
  113. #db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  114. #user_role_table = db['user_role']
  115. #user_role_table.insert({'user_id':,'role_id':role_id})
  116. def add_time_by_invite(code):
  117. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  118. id = get_id_by_code(code)
  119. result = next(iter(db.query('SELECT * FROM users where invite_code ="'+code+'"')))
  120. state = 'UPDATE users SET left_time=left_time+120 WHERE id="'+str(id)+'"'
  121. db.query(state)
  122. db.close()
  123. def get_id_by_code(code):
  124. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  125. user_dict = next(iter(db.query('SELECT * FROM users where invite_code ="'+code+'"')))
  126. db.close()
  127. return user_dict['id']
  128. def init_invite_code(id):
  129. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  130. sha = hashlib.sha256()
  131. sha.update(str(time.time()).replace('.','').encode())
  132. state = 'UPDATE users SET invite_code="'+sha.hexdigest()[:15]+'" WHERE id="'+str(id)+'"'
  133. db.query(state)
  134. db.close()
  135. def add_to_basic_role(id):
  136. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  137. state ='insert into user_role (user_id,role_id) values('+str(id)+',5),('+str(id)+',6); '
  138. print(state)
  139. code = 'ok'
  140. try :
  141. db.query(state)
  142. except:
  143. code = 'not ok'
  144. return code
  145. def get_user_id(token):
  146. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  147. credentials_exception = HTTPException(
  148. status_code=status.HTTP_401_UNAUTHORIZED,
  149. detail="Could not validate credentials",
  150. headers={"WWW-Authenticate": "Bearer"},
  151. )
  152. try:
  153. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  154. username: str = payload.get("sub")
  155. if username is None:
  156. raise credentials_exception
  157. token_data = util.models.TokenData(username=username)
  158. except JWTError:
  159. db.close()
  160. raise credentials_exception
  161. user = get_user(username=token_data.username)
  162. if user is None:
  163. db.close()
  164. raise credentials_exception
  165. user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']
  166. db.close()
  167. return user_id
  168. def get_id_by_email(email):
  169. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  170. user_dict = next(iter(db.query('SELECT * FROM users where email ="'+email+'"')))
  171. db.close()
  172. return user_dict['id']
  173. def email_veri_pass(name):
  174. db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
  175. user_dict = next(iter(db.query('SELECT * FROM users where username ="'+name+'"')))
  176. user_obj = first(db.query('SELECT * FROM register_veri_code where user_id ="'+str(user_dict['id'])+'"'))
  177. db.close()
  178. if user_obj == None:
  179. return True
  180. else:
  181. return False