import dataset from fastapi import FastAPI,Cookie, Depends, Query, status,File, UploadFile,Request,Response,HTTPException from first import first from jose import JWTError, jwt from fastapi_jwt_auth import AuthJWT from fastapi_jwt_auth.exceptions import AuthJWTException from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm import util.models from passlib.context import CryptContext import hashlib import time SECRET_KEY = "df2f77bd544240801a048bd4293afd8eeb7fff3cb7050e42c791db4b83ebadcd" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_DAYS = 5 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def get_user_id(token): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = uitl.models.TokenData(username=username) except JWTError: raise credentials_exception user = get_user(username=token_data.username) if user is None: raise credentials_exception user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id'] db.close() return user_id def check_user_exists( username): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') if int(next(iter(db.query('SELECT COUNT(*) FROM AI_anchor.users WHERE username = "'+username+'"')))['COUNT(*)']) > 0: db.close() return True else: db.close() return False def get_user( username: str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') if not check_user_exists(username): # if user don't exist return False user_dict = next( iter(db.query('SELECT * FROM users where username ="'+username+'"'))) user = util.models.User(**user_dict) db.close() return user def user_register( user): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') table = db['users'] user.password = get_password_hash(user.password) table.insert(dict(user)) db.close() def get_password_hash( password): return pwd_context.hash(password) def verify_password( plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def authenticate_user( username: str, password: str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') if not check_user_exists(username): # if user don't exist db.close() return False user_dict = next(iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"'))) user = util.models.User(**user_dict) if not verify_password(password, user.password): db.close() return False return user def get_user_role(id): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') state = 'SELECT * FROM user_role '\ 'INNER JOIN users on user_role.user_id= users.id '\ 'INNER JOIN role on user_role.role_id = role.id '\ 'WHERE users.id='+str(id) role_list = [] for row in db.query(state): role_list.append({'id':row['role_id'],'name':row['name']}) db.close() return role_list def get_user_role_list(id): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') state = 'SELECT * FROM user_role '\ 'INNER JOIN users on user_role.user_id= users.id '\ 'INNER JOIN role on user_role.role_id = role.id '\ 'WHERE users.id='+str(id) role_list = [] for row in db.query(state): role_list.append(row['role_id']) db.close() return role_list def get_avatar_by_role(id): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') state = 'select role_avatar.role_id, avatar_id, avatar.name,num from role_avatar '\ 'INNER JOIN '\ '(SELECT role_id FROM user_role '\ 'INNER JOIN users on user_role.user_id= users.id '\ 'INNER JOIN role on user_role.role_id = role.id '\ 'WHERE users.id='+str(id)+') a '\ 'on role_avatar.role_id = a.role_id '\ 'INNER JOIN avatar on role_avatar.avatar_id = avatar.id;' role_list = [] for row in db.query(state): role_list.append({'role_id':row['role_id'],'avatar_id':row['avatar_id'],'name':row['name'],'num':row['num']}) db.close() return role_list #def add_role( username,role_id): #db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') #user_role_table = db['user_role'] #user_role_table.insert({'user_id':,'role_id':role_id}) def add_time_by_invite(code): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') id = get_id_by_code(code) result = next(iter(db.query('SELECT * FROM users where invite_code ="'+code+'"'))) state = 'UPDATE users SET left_time=left_time+120 WHERE id="'+str(id)+'"' db.query(state) db.close() def get_id_by_code(code): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') user_dict = next(iter(db.query('SELECT * FROM users where invite_code ="'+code+'"'))) db.close() return user_dict['id'] def init_invite_code(id): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') sha = hashlib.sha256() sha.update(str(time.time()).replace('.','').encode()) state = 'UPDATE users SET invite_code="'+sha.hexdigest()[:15]+'" WHERE id="'+str(id)+'"' db.query(state) db.close() def add_to_basic_role(id): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') state ='insert into user_role (user_id,role_id) values('+str(id)+',5),('+str(id)+',6); ' print(state) code = 'ok' try : db.query(state) except: code = 'not ok' return code def get_user_id(token): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = util.models.TokenData(username=username) except JWTError: db.close() raise credentials_exception user = get_user(username=token_data.username) if user is None: db.close() raise credentials_exception user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id'] db.close() return user_id def get_id_by_email(email): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') user_dict = next(iter(db.query('SELECT * FROM users where email ="'+email+'"'))) db.close() return user_dict['id'] def email_veri_pass(name): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') user_dict = next(iter(db.query('SELECT * FROM users where username ="'+name+'"'))) user_obj = first(db.query('SELECT * FROM register_veri_code where user_id ="'+str(user_dict['id'])+'"')) db.close() if user_obj == None: return True else: return False