| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 | 
							- import dataset
 
- from fastapi import FastAPI,Cookie, Depends, Query, status,File, UploadFile,Request,Response,HTTPException
 
- from first import first
 
- from jose import JWTError, jwt
 
- from fastapi_jwt_auth import AuthJWT
 
- from fastapi_jwt_auth.exceptions import AuthJWTException
 
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
 
- import util.models
 
- from passlib.context import CryptContext
 
- import hashlib
 
- import time
 
# Key and algorithm handed to jwt.decode() when access tokens are validated.
# NOTE(review): the signing key is committed in source; consider loading it
# from configuration instead.
SECRET_KEY = "df2f77bd544240801a048bd4293afd8eeb7fff3cb7050e42c791db4b83ebadcd"
ALGORITHM = "HS256"

# Not referenced in the visible part of this file; presumably the access-token
# lifetime in days -- TODO confirm against the token-issuing code.
ACCESS_TOKEN_EXPIRE_DAYS = 5

# passlib context used by get_password_hash() / verify_password() (bcrypt scheme).
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
 
def get_user_id(token):
    """Return the ``users.id`` of the account identified by a JWT access token.

    The token is decoded with ``SECRET_KEY``/``ALGORITHM``; its ``sub`` claim is
    taken as the username, which is then looked up in the ``users`` table.

    NOTE: a second ``get_user_id`` defined later in this module rebinds the
    name, so this definition is shadowed at import time.

    Args:
        token: Encoded JWT string.

    Returns:
        The ``id`` column of the matching ``users`` row.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or does not correspond to an existing user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_data = util.models.TokenData(username=username)

    user = get_user(username=token_data.username)
    # get_user() reports a missing account with False (not None), so test
    # truthiness; otherwise ``user.username`` below fails on a bool.
    if not user:
        raise credentials_exception

    # Connect only once the token is known to be valid, and always release
    # the connection, even when the query raises.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        # Bound parameter instead of string concatenation: the username
        # originates from the token payload.
        row = first(db.query('SELECT * FROM users where username = :username',
                             username=user.username))
    finally:
        db.close()
    if row is None:
        raise credentials_exception
    return row['id']
 
def check_user_exists(username):
    """Return True when at least one ``users`` row has the given username.

    Args:
        username: Username to look for.

    Returns:
        bool: True if the count of matching rows is greater than zero.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        # The username is caller-supplied, so it is passed as a bound
        # parameter rather than spliced into the SQL text.
        row = next(iter(db.query(
            'SELECT COUNT(*) AS user_count FROM AI_anchor.users WHERE username = :username',
            username=username,
        )))
    finally:
        # Single release point; also runs when the query raises.
        db.close()
    return int(row['user_count']) > 0
 
def get_user(username: str):
    """Load a user record by username.

    Args:
        username: Username to look up in the ``users`` table.

    Returns:
        A ``util.models.User`` built from the matching row, or ``False`` when
        no such user exists (callers rely on the falsy return).
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        # One parameterized query replaces the previous exists-check followed
        # by a second select; a missing row is reported through the default.
        row = next(iter(db.query('SELECT * FROM users where username = :username',
                                 username=username)), None)
    finally:
        # Previously the connection leaked on the "user missing" path.
        db.close()
    if row is None:
        return False
    return util.models.User(**row)
 
-     
 
def user_register(user):
    """Insert a new row into ``users`` with the password stored as a hash.

    Args:
        user: Object with a ``password`` attribute that ``dict(user)`` can
            turn into column/value pairs. Its ``password`` attribute is
            replaced in place with the hash before the insert.
    """
    # Never persist the plain-text password.
    user.password = get_password_hash(user.password)
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        db['users'].insert(dict(user))
    finally:
        # Release the connection even if the insert fails.
        db.close()
 
def get_password_hash(password):
    """Hash *password* with the module-level ``pwd_context`` and return the hash."""
    hashed = pwd_context.hash(password)
    return hashed
 
def verify_password(plain_password, hashed_password):
    """Return the result of checking *plain_password* against *hashed_password*.

    Verification is delegated to the module-level ``pwd_context``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
 
def authenticate_user(username: str, password: str):
    """Check a username/password pair against the ``users`` table.

    Args:
        username: Login name supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        The ``util.models.User`` for the account when the password verifies,
        otherwise ``False`` (unknown user or wrong password).
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        # Bound parameter: the username comes straight from the login form.
        row = next(iter(db.query(
            'SELECT * FROM AI_anchor.users where username = :username',
            username=username,
        )), None)
    finally:
        # Previously the connection was never closed on the success path.
        db.close()
    if row is None:  # user does not exist
        return False
    user = util.models.User(**row)
    if not verify_password(password, user.password):
        return False
    return user
 
def get_user_role(id):
    """Return the roles assigned to a user.

    Args:
        id: Value matched against ``users.id``.

    Returns:
        list[dict]: One ``{'id': <role_id>, 'name': <name>}`` entry per row of
        the ``user_role``/``users``/``role`` join for that user.
    """
    statement = (
        'SELECT * FROM user_role '
        'INNER JOIN users on user_role.user_id= users.id '
        'INNER JOIN role on user_role.role_id = role.id '
        'WHERE users.id = :user_id'
    )
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        # The id is bound, not concatenated, so it cannot alter the query.
        return [
            {'id': row['role_id'], 'name': row['name']}
            for row in db.query(statement, user_id=id)
        ]
    finally:
        db.close()
 
def get_user_role_list(id):
    """Return the role ids assigned to a user.

    Args:
        id: Value matched against ``users.id``.

    Returns:
        list: The ``role_id`` of every row of the
        ``user_role``/``users``/``role`` join for that user.
    """
    statement = (
        'SELECT * FROM user_role '
        'INNER JOIN users on user_role.user_id= users.id '
        'INNER JOIN role on user_role.role_id = role.id '
        'WHERE users.id = :user_id'
    )
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        # The id is bound, not concatenated, so it cannot alter the query.
        return [row['role_id'] for row in db.query(statement, user_id=id)]
    finally:
        db.close()
 
def get_avatar_by_role(id):
    """Return the avatars available to a user through the user's roles.

    Args:
        id: Value matched against ``users.id``.

    Returns:
        list[dict]: One dict per ``role_avatar`` row reachable from the user's
        roles, with keys ``role_id``, ``avatar_id``, ``name`` and ``num``.
    """
    statement = (
        'select role_avatar.role_id, avatar_id, avatar.name,num from role_avatar '
        'INNER JOIN '
        '(SELECT role_id FROM user_role '
        'INNER JOIN users on user_role.user_id= users.id '
        'INNER JOIN role on user_role.role_id = role.id '
        'WHERE users.id = :user_id) a '
        'on role_avatar.role_id = a.role_id '
        'INNER JOIN avatar on role_avatar.avatar_id = avatar.id;'
    )
    wanted = ('role_id', 'avatar_id', 'name', 'num')
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        # The id is bound, not concatenated, so it cannot alter the query.
        return [
            {key: row[key] for key in wanted}
            for row in db.query(statement, user_id=id)
        ]
    finally:
        db.close()
 
- #def add_role( username,role_id):
 
-     #db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
 
-     #user_role_table = db['user_role']
 
-     #user_role_table.insert({'user_id':,'role_id':role_id})
 
def add_time_by_invite(code):
    """Add 120 to ``left_time`` of the user who owns the given invite code.

    Args:
        code: Invite code stored in ``users.invite_code``.

    Raises:
        StopIteration: Propagated from ``get_id_by_code`` when no user owns
            the code.
    """
    # get_id_by_code() already performs the lookup (and fails for an unknown
    # code), so the extra SELECT whose result was never used has been dropped.
    user_id = get_id_by_code(code)
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        db.query('UPDATE users SET left_time=left_time+120 WHERE id = :user_id',
                 user_id=user_id)
    finally:
        db.close()
 
def get_id_by_code(code):
    """Return the ``id`` of the user whose ``invite_code`` equals *code*.

    Args:
        code: Invite code to look up.

    Returns:
        The ``id`` column of the first matching ``users`` row.

    Raises:
        StopIteration: When no user has this invite code (behaviour kept
            unchanged for existing callers).
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        # Invite codes are user input: bind the value instead of quoting it.
        user_row = next(iter(db.query(
            'SELECT * FROM users where invite_code = :code', code=code)))
    finally:
        # Close even when the lookup finds nothing and next() raises.
        db.close()
    return user_row['id']
 
def init_invite_code(id):
    """Generate a fresh invite code and store it on the given user.

    The code is 15 hexadecimal characters, the same format as before.

    Args:
        id: Value matched against ``users.id``.
    """
    # Local import keeps this block self-contained.
    import secrets

    # The previous code was a truncated SHA-256 of the current timestamp:
    # guessable from the registration time and identical for calls made at
    # the same instant. Use the OS CSPRNG instead.
    invite_code = secrets.token_hex(8)[:15]

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        db.query(
            'UPDATE users SET invite_code = :invite_code WHERE id = :user_id',
            invite_code=invite_code,
            user_id=id,
        )
    finally:
        db.close()
 
def add_to_basic_role(id):
    """Assign the user to roles 5 and 6 via two ``user_role`` rows.

    Args:
        id: User id written to ``user_role.user_id``.

    Returns:
        str: ``'ok'`` when the insert succeeded, ``'not ok'`` when it raised.
    """
    statement = (
        'insert into user_role (user_id,role_id) '
        'values(:user_id,5),(:user_id,6); '
    )
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        db.query(statement, user_id=id)
    except Exception:
        # Best-effort by design: failures are reported through the return
        # value. A bare ``except:`` would also swallow KeyboardInterrupt and
        # SystemExit.
        return 'not ok'
    finally:
        # The connection was previously never closed.
        db.close()
    return 'ok'
 
def get_user_id(token):
    """Return the ``users.id`` of the account identified by a JWT access token.

    The token is decoded with ``SECRET_KEY``/``ALGORITHM``; its ``sub`` claim is
    taken as the username, which is then looked up in the ``users`` table.

    NOTE: this is the second ``get_user_id`` in the module; being defined
    later, it is the one bound to the name after import.

    Args:
        token: Encoded JWT string.

    Returns:
        The ``id`` column of the matching ``users`` row.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or does not correspond to an existing user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_data = util.models.TokenData(username=username)

    user = get_user(username=token_data.username)
    # get_user() reports a missing account with False (not None), so test
    # truthiness; otherwise ``user.username`` below fails on a bool.
    if not user:
        raise credentials_exception

    # Connect only once the token is known to be valid; the old code opened
    # the connection first and leaked it when the ``sub`` claim was missing.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        # Bound parameter instead of string concatenation: the username
        # originates from the token payload.
        row = first(db.query('SELECT * FROM users where username = :username',
                             username=user.username))
    finally:
        db.close()
    if row is None:
        raise credentials_exception
    return row['id']
 
-     
 
def get_id_by_email(email):
    """Return the ``id`` of the user whose ``email`` equals *email*.

    Args:
        email: E-mail address to look up.

    Returns:
        The ``id`` column of the first matching ``users`` row.

    Raises:
        StopIteration: When no user has this e-mail address (behaviour kept
            unchanged for existing callers).
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        # E-mail addresses are user input: bind the value instead of quoting it.
        user_row = next(iter(db.query(
            'SELECT * FROM users where email = :email', email=email)))
    finally:
        # Close even when the lookup finds nothing and next() raises.
        db.close()
    return user_row['id']
 
def email_veri_pass(name):
    """Report whether a user has no row left in ``register_veri_code``.

    Args:
        name: Username of the account to check.

    Returns:
        bool: True when no ``register_veri_code`` row references the user's
        id, False when one is found.

    Raises:
        StopIteration: When no user has this username (behaviour kept
            unchanged for existing callers).
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        user_row = next(iter(db.query(
            'SELECT * FROM users where username = :username', username=name)))
        pending_code = first(db.query(
            'SELECT * FROM register_veri_code where user_id = :user_id',
            user_id=user_row['id'],
        ))
    finally:
        # Runs on the StopIteration path too, so the connection never leaks.
        db.close()
    return pending_code is None
 
 
  |