1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- import dataset
- from first import first
def get_user_id(token):
    """Return the ``id`` column of the user named by a JWT's ``sub`` claim.

    The token is decoded with ``SECRET_KEY``/``ALGORITHM``; the ``sub`` claim
    is taken as the username, which is then looked up in the ``users`` table.

    NOTE(review): another ``get_user_id`` (taking a username) is defined later
    in this file and rebinds this name; one of the two should probably be
    renamed.

    Args:
        token: Encoded JWT string.

    Returns:
        The value of the ``id`` column of the matching ``users`` row.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user that cannot be found.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = models.TokenData(username=username)
    except JWTError:
        raise credentials_exception

    user = get_user(username=token_data.username)
    # get_user() signals "not found" with False, so test truthiness rather
    # than identity with None.
    if not user:
        raise credentials_exception

    # Connect only once the token has been validated.
    # NOTE(review): credentials are hard-coded in this URL; consider moving
    # them to configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    # Bind the username as a parameter instead of concatenating it into SQL.
    row = first(db.query('SELECT * FROM users WHERE username = :username',
                         username=user.username))
    if row is None:
        # The row disappeared between the two lookups; treat as unauthenticated.
        raise credentials_exception
    return row['id']
def check_user_exists(username):
    """Tell whether at least one ``AI_anchor.users`` row has this username.

    Args:
        username: Username to look for.

    Returns:
        ``True`` if the row count for ``username`` is greater than zero,
        otherwise ``False``.
    """
    # NOTE(review): credentials are hard-coded in this URL; consider moving
    # them to configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    # Bind the username as a parameter instead of concatenating it into SQL.
    rows = db.query(
        'SELECT COUNT(*) FROM AI_anchor.users WHERE username = :username',
        username=username,
    )
    count_row = next(iter(rows))
    return int(count_row['COUNT(*)']) > 0
def get_user(username: str):
    """Load the ``users`` row for ``username`` as a ``models.User``.

    Args:
        username: Username to look up.

    Returns:
        A ``models.User`` built from the row's columns, or ``False`` when no
        row matches (callers in this file rely on the falsy return).
    """
    # NOTE(review): credentials are hard-coded in this URL; consider moving
    # them to configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    # One parameterized query replaces the former "count, then select" pair:
    # no string-built SQL, and no window in which the row can vanish between
    # the existence check and the fetch.
    rows = db.query('SELECT * FROM users WHERE username = :username',
                    username=username)
    user_dict = next(iter(rows), None)
    if user_dict is None:
        return False
    return models.User(**user_dict)
-
def user_register(user):
    """Insert ``user`` into the ``users`` table with its password hashed.

    The plain-text ``user.password`` is replaced on the passed-in object by
    the result of ``get_password_hash`` before ``dict(user)`` is inserted.

    Args:
        user: Object exposing a ``password`` attribute and convertible with
            ``dict(...)`` (presumably a ``models`` user model; confirm
            against callers).
    """
    # NOTE(review): credentials are hard-coded in this URL.
    connection = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    users_table = connection['users']
    hashed = get_password_hash(user.password)
    user.password = hashed
    users_table.insert(dict(user))
def get_password_hash(password):
    """Hash ``password`` with the module-level ``pwd_context``.

    Args:
        password: Plain-text password.

    Returns:
        Whatever ``pwd_context.hash`` produces for ``password``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password, hashed_password):
    """Check a plain-text password against a stored hash via ``pwd_context``.

    Args:
        plain_password: Password as supplied by the user.
        hashed_password: Previously stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the pair.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def authenticate_user(username: str, password: str):
    """Validate a username/password pair against ``AI_anchor.users``.

    Args:
        username: Username to authenticate.
        password: Plain-text password supplied by the user.

    Returns:
        The ``models.User`` for ``username`` when the password verifies
        against the stored hash; ``False`` when the user does not exist or
        the password does not match.
    """
    # NOTE(review): credentials are hard-coded in this URL; consider moving
    # them to configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    # A single parameterized query replaces the separate existence check and
    # the string-built SELECT.
    rows = db.query('SELECT * FROM AI_anchor.users WHERE username = :username',
                    username=username)
    user_dict = next(iter(rows), None)
    if user_dict is None:
        return False
    user = models.User(**user_dict)
    if not verify_password(password, user.password):
        return False
    return user
def get_roles(username):
    """List the roles attached to ``username`` through ``user_role``.

    Args:
        username: Username whose roles are wanted.

    Returns:
        A list of ``{'id': <role_id>, 'name': <name>}`` dicts, one per joined
        row; empty when the user has no roles.
    """
    # NOTE(review): credentials are hard-coded in this URL; consider moving
    # them to configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    # Each fragment ends with a space so the JOIN clauses do not run together,
    # and the username is bound as a parameter (the previous
    # ``username=username`` compared the column with itself).
    statement = (
        'SELECT * FROM user_role '
        'INNER JOIN users ON user_role.user_id = users.id '
        'INNER JOIN role ON user_role.role_id = role.id '
        'WHERE users.username = :username'
    )
    # NOTE(review): with SELECT * across three tables, 'name' is presumably
    # role.name; confirm that no other joined table has a column of that name.
    return [
        {'id': row['role_id'], 'name': row['name']}
        for row in db.query(statement, username=username)
    ]
-
- #def add_role( username,role_id):
- #db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
- #user_role_table = db['user_role']
- #user_role_table.insert({'user_id':,'role_id':role_id})
def get_user_id(username):
    """Return, as a string, the number of ``history_input`` rows of a user.

    NOTE(review): despite its name this returns a row count, not an id, and
    it rebinds the token-based ``get_user_id`` defined earlier in this file.
    The previous body read an undefined ``user_obj``; it is assumed here that
    it was meant to be the ``users`` row for ``username``. Confirm the
    intent and consider renaming.

    Args:
        username: Username whose history rows are counted.

    Returns:
        ``str`` of the ``COUNT(*)`` of ``history_input`` rows whose
        ``user_id`` equals the user's ``id``.

    Raises:
        LookupError: If no ``users`` row matches ``username``.
    """
    # NOTE(review): credentials are hard-coded in this URL; consider moving
    # them to configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    user_row = first(db.query('SELECT id FROM users WHERE username = :username',
                              username=username))
    if user_row is None:
        raise LookupError('no user named %r' % (username,))
    count_row = first(db.query(
        'SELECT COUNT(*) FROM history_input WHERE user_id = :user_id',
        user_id=user_row['id'],
    ))
    return str(count_row['COUNT(*)'])
-
def get_id_by_email(email):
    """Return the ``id`` of the first ``users`` row with this email.

    Args:
        email: Email address to look up.

    Returns:
        The ``id`` column of the first matching row.

    Raises:
        StopIteration: If no row matches ``email`` (unchanged behavior, so
            existing callers are not affected).
    """
    # NOTE(review): credentials are hard-coded in this URL; consider moving
    # them to configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    # Bind the email as a parameter instead of concatenating it into SQL.
    rows = db.query('SELECT * FROM users WHERE email = :email', email=email)
    user_dict = next(iter(rows))
    return user_dict['id']
def email_veri_pass(name):
    """Tell whether a user has no row left in ``register_veri_code``.

    Args:
        name: Username to check.

    Returns:
        ``True`` when no ``register_veri_code`` row exists for the user's
        id, ``False`` otherwise. Presumably a missing row means verification
        was completed; confirm against the code that writes that table.

    Raises:
        StopIteration: If no ``users`` row matches ``name`` (unchanged
            behavior).
    """
    # NOTE(review): credentials are hard-coded in this URL; consider moving
    # them to configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    # Both lookups bind their values as parameters instead of building SQL
    # by string concatenation.
    user_dict = next(iter(db.query(
        'SELECT * FROM users WHERE username = :username', username=name)))
    pending = first(db.query(
        'SELECT * FROM register_veri_code WHERE user_id = :user_id',
        user_id=user_dict['id'],
    ))
    return pending is None
|