| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 | import datasetfrom fastapi import FastAPI,Cookie, Depends, Query, status,File, UploadFile,Request,Response,HTTPExceptionfrom first import firstfrom jose import JWTError, jwtfrom fastapi_jwt_auth import AuthJWTfrom fastapi_jwt_auth.exceptions import AuthJWTExceptionfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormimport util.modelsfrom passlib.context import CryptContextimport hashlibimport timeSECRET_KEY = "df2f77bd544240801a048bd4293afd8eeb7fff3cb7050e42c791db4b83ebadcd"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_DAYS = 5pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def get_user_id(token):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="Could not validate credentials",        headers={"WWW-Authenticate": "Bearer"},    )    try:        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])        username: str = payload.get("sub")        if username is None:            raise credentials_exception        token_data = uitl.models.TokenData(username=username)    except JWTError:        raise credentials_exception    user = get_user(username=token_data.username)    if user is None:        raise credentials_exception    user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']    db.close()    return user_iddef check_user_exists( username):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    if int(next(iter(db.query('SELECT COUNT(*) FROM AI_anchor.users WHERE username = "'+username+'"')))['COUNT(*)']) > 0:        db.close()        return True    else:        db.close()        return Falsedef get_user( username: str):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    if not check_user_exists(username):  # if user don't exist        return False    user_dict = next(        iter(db.query('SELECT * FROM users where username ="'+username+'"')))    user = util.models.User(**user_dict)    db.close()    return user    def user_register( user):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    table = db['users']    user.password = get_password_hash(user.password)    table.insert(dict(user))    db.close()def get_password_hash( password):    return pwd_context.hash(password)def verify_password( plain_password, hashed_password):    return pwd_context.verify(plain_password, hashed_password)def authenticate_user( username: str, password: str):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    if not check_user_exists(username):  # if user don't exist        db.close()        return False    user_dict = next(iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))    user = util.models.User(**user_dict)    if not verify_password(password, user.password):        db.close()        return False    return userdef get_user_role(id):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    state = 'SELECT * FROM user_role '\    'INNER JOIN users on user_role.user_id= users.id '\    'INNER JOIN role on user_role.role_id = role.id '\    'WHERE users.id='+str(id)    role_list = []    for row in db.query(state):        role_list.append({'id':row['role_id'],'name':row['name']})    db.close()    return role_listdef get_user_role_list(id):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    state = 'SELECT * FROM user_role '\    'INNER JOIN users on user_role.user_id= users.id '\    'INNER JOIN role on user_role.role_id = role.id '\    'WHERE users.id='+str(id)    role_list = []    for row in db.query(state):        role_list.append(row['role_id'])    db.close()    return role_listdef get_avatar_by_role(id):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    state = 'select role_avatar.role_id, avatar_id, avatar.name,num from role_avatar '\            'INNER JOIN '\            '(SELECT role_id FROM user_role '\            'INNER JOIN users on user_role.user_id= users.id '\            'INNER JOIN role on user_role.role_id = role.id '\            'WHERE users.id='+str(id)+') a '\            'on role_avatar.role_id = a.role_id '\            'INNER JOIN avatar on role_avatar.avatar_id = avatar.id;'    role_list = []    for row in db.query(state):        role_list.append({'role_id':row['role_id'],'avatar_id':row['avatar_id'],'name':row['name'],'num':row['num']})    db.close()    return role_list#def add_role( username,role_id):    #db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    #user_role_table = db['user_role']    #user_role_table.insert({'user_id':,'role_id':role_id})def add_time_by_invite(code):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    id = get_id_by_code(code)    result = next(iter(db.query('SELECT * FROM users where invite_code ="'+code+'"')))    state = 'UPDATE users SET left_time=left_time+120 WHERE id="'+str(id)+'"'    db.query(state)    db.close()def get_id_by_code(code):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    user_dict = next(iter(db.query('SELECT * FROM users where invite_code ="'+code+'"')))    db.close()    return user_dict['id']def init_invite_code(id):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    sha = hashlib.sha256()    sha.update(str(time.time()).replace('.','').encode())    state = 'UPDATE users SET invite_code="'+sha.hexdigest()[:15]+'" WHERE id="'+str(id)+'"'    db.query(state)    db.close()def add_to_basic_role(id):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    state ='insert into user_role (user_id,role_id) values('+str(id)+',5),('+str(id)+',6); '    print(state)    code = 'ok'    try :        db.query(state)    except:        code = 'not ok'    return codedef get_user_id(token):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="Could not validate credentials",        headers={"WWW-Authenticate": "Bearer"},    )    try:        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])        username: str = payload.get("sub")        if username is None:            raise credentials_exception        token_data = util.models.TokenData(username=username)    except JWTError:        db.close()        raise credentials_exception    user = get_user(username=token_data.username)    if user is None:        db.close()        raise credentials_exception    user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']    db.close()    return user_id    def get_id_by_email(email):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    user_dict = next(iter(db.query('SELECT * FROM users where email ="'+email+'"')))    db.close()    return user_dict['id']def email_veri_pass(name):    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')    user_dict = next(iter(db.query('SELECT * FROM users where username ="'+name+'"')))    user_obj = first(db.query('SELECT * FROM register_veri_code where user_id ="'+str(user_dict['id'])+'"'))    db.close()    if user_obj == None:        return True    else:        return False
 |