|
@@ -30,13 +30,16 @@ def get_user_id(token):
|
|
|
if user is None:
|
|
|
raise credentials_exception
|
|
|
user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']
|
|
|
+ db.close()
|
|
|
return user_id
|
|
|
|
|
|
def check_user_exists( username):
|
|
|
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
if int(next(iter(db.query('SELECT COUNT(*) FROM AI_anchor.users WHERE username = "'+username+'"')))['COUNT(*)']) > 0:
|
|
|
+ db.close()
|
|
|
return True
|
|
|
else:
|
|
|
+ db.close()
|
|
|
return False
|
|
|
|
|
|
def get_user( username: str):
|
|
@@ -46,6 +49,7 @@ def get_user( username: str):
|
|
|
user_dict = next(
|
|
|
iter(db.query('SELECT * FROM users where username ="'+username+'"')))
|
|
|
user = util.models.User(**user_dict)
|
|
|
+ db.close()
|
|
|
return user
|
|
|
|
|
|
def user_register( user):
|
|
@@ -53,6 +57,7 @@ def user_register( user):
|
|
|
table = db['users']
|
|
|
user.password = get_password_hash(user.password)
|
|
|
table.insert(dict(user))
|
|
|
+ db.close()
|
|
|
|
|
|
def get_password_hash( password):
|
|
|
return pwd_context.hash(password)
|
|
@@ -63,10 +68,12 @@ def verify_password( plain_password, hashed_password):
|
|
|
def authenticate_user( username: str, password: str):
|
|
|
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
if not check_user_exists(username): # if user don't exist
|
|
|
+ db.close()
|
|
|
return False
|
|
|
user_dict = next(iter(db.query('SELECT * FROM AI_anchor.users where username ="'+username+'"')))
|
|
|
user = util.models.User(**user_dict)
|
|
|
if not verify_password(password, user.password):
|
|
|
+ db.close()
|
|
|
return False
|
|
|
return user
|
|
|
|
|
@@ -79,6 +86,7 @@ def get_user_role(id):
|
|
|
role_list = []
|
|
|
for row in db.query(state):
|
|
|
role_list.append({'id':row['role_id'],'name':row['name']})
|
|
|
+ db.close()
|
|
|
return role_list
|
|
|
|
|
|
def get_avatar_by_role(id):
|
|
@@ -93,6 +101,7 @@ def get_avatar_by_role(id):
|
|
|
role_list = []
|
|
|
for row in db.query(state):
|
|
|
role_list.append({'id':row['role_id'],'name':row['name']})
|
|
|
+ db.close()
|
|
|
return role_list
|
|
|
#def add_role( username,role_id):
|
|
|
#db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
@@ -113,22 +122,27 @@ def get_user_id(token):
|
|
|
raise credentials_exception
|
|
|
token_data = util.models.TokenData(username=username)
|
|
|
except JWTError:
|
|
|
+ db.close()
|
|
|
raise credentials_exception
|
|
|
user = get_user(username=token_data.username)
|
|
|
if user is None:
|
|
|
+ db.close()
|
|
|
raise credentials_exception
|
|
|
user_id = first(db.query('SELECT * FROM users where username="' + user.username+'"'))['id']
|
|
|
+ db.close()
|
|
|
return user_id
|
|
|
|
|
|
def get_id_by_email(email):
|
|
|
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
user_dict = next(iter(db.query('SELECT * FROM users where email ="'+email+'"')))
|
|
|
+ db.close()
|
|
|
return user_dict['id']
|
|
|
|
|
|
def email_veri_pass(name):
|
|
|
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
|
|
|
user_dict = next(iter(db.query('SELECT * FROM users where username ="'+name+'"')))
|
|
|
user_obj = first(db.query('SELECT * FROM register_veri_code where user_id ="'+str(user_dict['id'])+'"'))
|
|
|
+ db.close()
|
|
|
if user_obj == None:
|
|
|
return True
|
|
|
else:
|