|
@@ -1,78 +1,78 @@
|
|
|
-class user_util():
|
|
|
- def get_user_id(self, token):
|
|
|
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
- credentials_exception = HTTPException(
|
|
|
- status_code=status.HTTP_401_UNAUTHORIZED,
|
|
|
- detail="Could not validate credentials",
|
|
|
- headers={"WWW-Authenticate": "Bearer"},
|
|
|
- )
|
|
|
- try:
|
|
|
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
|
- username: str = payload.get("sub")
|
|
|
- if username is None:
|
|
|
- raise credentials_exception
|
|
|
- token_data = models.TokenData(username=username)
|
|
|
- except JWTError:
|
|
|
+def get_user_id(token):
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
+ credentials_exception = HTTPException(
|
|
|
+ status_code=status.HTTP_401_UNAUTHORIZED,
|
|
|
+ detail="Could not validate credentials",
|
|
|
+ headers={"WWW-Authenticate": "Bearer"},
|
|
|
+ )
|
|
|
+ try:
|
|
|
+ payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
|
+ username: str = payload.get("sub")
|
|
|
+ if username is None:
|
|
|
raise credentials_exception
|
|
|
- user = get_user(username=token_data.username)
|
|
|
- if user is None:
|
|
|
- raise credentials_exception
|
|
|
- user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']
|
|
|
- return user_id
|
|
|
+ token_data = models.TokenData(username=username)
|
|
|
+ except JWTError:
|
|
|
+ raise credentials_exception
|
|
|
+ user = get_user(username=token_data.username)
|
|
|
+ if user is None:
|
|
|
+ raise credentials_exception
|
|
|
+ user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']
|
|
|
+ return user_id
|
|
|
+
|
|
|
+def check_user_exists( username):
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
+ if int(next(iter(db.query('SELECT COUNT(*) FROM AI_anchor.users WHERE username = "'+username+'"')))['COUNT(*)']) > 0:
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ return False
|
|
|
|
|
|
- def check_user_exists(self, username):
|
|
|
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
- if int(next(iter(db.query('SELECT COUNT(*) FROM AI_anchor.users WHERE username = "'+username+'"')))['COUNT(*)']) > 0:
|
|
|
- return True
|
|
|
- else:
|
|
|
- return False
|
|
|
+def get_user( username: str):
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
+ if not check_user_exists(username): # if user don't exist
|
|
|
+ return False
|
|
|
+ user_dict = next(
|
|
|
+ iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
|
|
|
+ user = models.User(**user_dict)
|
|
|
+ return user
|
|
|
+
|
|
|
+def user_register( user):
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
+ table = db['users']
|
|
|
+ user.password = get_password_hash(user.password)
|
|
|
+ table.insert(dict(user))
|
|
|
|
|
|
- def get_user(self, username: str):
|
|
|
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
- if not check_user_exists(username): # if user don't exist
|
|
|
- return False
|
|
|
- user_dict = next(
|
|
|
- iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
|
|
|
- user = models.User(**user_dict)
|
|
|
- return user
|
|
|
-
|
|
|
- def user_register(self, user):
|
|
|
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
- table = db['users']
|
|
|
- user.password = get_password_hash(user.password)
|
|
|
- table.insert(dict(user))
|
|
|
+def get_password_hash( password):
|
|
|
+ return pwd_context.hash(password)
|
|
|
+def verify_password( plain_password, hashed_password):
|
|
|
+ return pwd_context.verify(plain_password, hashed_password)
|
|
|
+def authenticate_user( username: str, password: str):
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
+ if not check_user_exists(username): # if user don't exist
|
|
|
+ return False
|
|
|
+ user_dict = next(iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
|
|
|
+ user = models.User(**user_dict)
|
|
|
+ if not verify_password(password, user.password):
|
|
|
+ return False
|
|
|
+ return user
|
|
|
|
|
|
- def get_password_hash(self, password):
|
|
|
- return pwd_context.hash(password)
|
|
|
- def verify_password(self, plain_password, hashed_password):
|
|
|
- return pwd_context.verify(plain_password, hashed_password)
|
|
|
- def authenticate_user(self, username: str, password: str):
|
|
|
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
- if not check_user_exists(username): # if user don't exist
|
|
|
- return False
|
|
|
- user_dict = next(iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
|
|
|
- user = models.User(**user_dict)
|
|
|
- if not verify_password(password, user.password):
|
|
|
- return False
|
|
|
- return user
|
|
|
+def get_roles( username):
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
+ state = 'SELECT * FROM user_role '\
|
|
|
+ 'INNER JOIN users on user_role.user_id= users.id'\
|
|
|
+ 'INNER JOIN role on user_role.role_id = role.id '\
|
|
|
+ 'WHERE username=username'
|
|
|
+ role_list = []
|
|
|
+ for row in db.query(statement):
|
|
|
+ role_list.append({'id':row['role_id'],'name':row['name']})
|
|
|
+ return role_list
|
|
|
+
|
|
|
+def add_role( username,role_id):
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
+ user_role_table = db['user_role']
|
|
|
+ user_role_table.insert({'user_id':,'role_id':role_id})
|
|
|
|
|
|
- def get_roles(self, username):
|
|
|
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
- state = 'SELECT * FROM user_role '\
|
|
|
- 'INNER JOIN users on user_role.user_id= users.id'\
|
|
|
- 'INNER JOIN role on user_role.role_id = role.id '\
|
|
|
- 'WHERE username=username'
|
|
|
- role_list = []
|
|
|
- for row in db.query(statement):
|
|
|
- role_list.append({'id':row['role_id'],'name':row['name']})
|
|
|
- return role_list
|
|
|
-
|
|
|
- def add_role(self, username,role_id):
|
|
|
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
- user_role_table = db['user_role']
|
|
|
- user_role_table.insert({'user_id':,'role_id':role_id})
|
|
|
+def get_user_id( username):
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
+ return str(first(db.query('SELECT COUNT(*) FROM history_input WHERE user_id ='+str(user_obj['id'])))['COUNT(*)'])
|
|
|
+
|
|
|
|
|
|
- def get_user_id(self, username):
|
|
|
- db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
- return str(first(db.query('SELECT COUNT(*) FROM history_input WHERE user_id ='+str(user_obj['id'])))['COUNT(*)'])
|
|
|
-
|