|
@@ -11,7 +11,9 @@ from fastapi.staticfiles import StaticFiles
|
|
|
from typing import List, Optional
|
|
|
import json
|
|
|
# path
|
|
|
-import sys
|
|
|
+import sys
|
|
|
+
|
|
|
+from sqlalchemy.sql.elements import False_
|
|
|
|
|
|
# time
|
|
|
# import datetime
|
|
@@ -102,7 +104,7 @@ async def login_for_access_token(request: Request, form_data: OAuth2PasswordRequ
|
|
|
table = db['users']
|
|
|
user.token = access_token
|
|
|
print(user)
|
|
|
- table.update(dict(user), ['username'])
|
|
|
+ table.update(dict(user), ['username'],['password'])
|
|
|
access_token = Authorize.create_access_token(subject=user.username)
|
|
|
refresh_token = Authorize.create_refresh_token(subject=user.username)
|
|
|
Authorize.set_access_cookies(access_token)
|
|
@@ -244,57 +246,120 @@ async def login(request: Request, Authorize: AuthJWT = Depends()):
|
|
|
return templates.TemplateResponse(name='delete_member_test2.html', context={'request': request})
|
|
|
|
|
|
@app.post('/member_delete')
|
|
|
-async def delete_member(request: Request,Authorize: AuthJWT = Depends()):
|
|
|
+async def delete_member(request: Request):
|
|
|
"""刪除成員"""
|
|
|
- try:
|
|
|
- Authorize.jwt_required()
|
|
|
- except Exception as e:
|
|
|
- print(e)
|
|
|
- return RedirectResponse('/login')
|
|
|
- del_user = await request.form()
|
|
|
- current_user = Authorize.get_jwt_subject()
|
|
|
- delete_one = del_user.del_username
|
|
|
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
|
|
|
+ del_user = models.del_user(**await request.form())
|
|
|
+ delete_name = del_user.del_name
|
|
|
+ statement = 'SELECT * FROM users'
|
|
|
+ current_user = ''
|
|
|
+ for row in db.query(statement):
|
|
|
+ if row['token'] != None :
|
|
|
+ if compare_jwt_token(row['token'],del_user.access_token):
|
|
|
+ current_user = row['username']
|
|
|
+ if current_user == '':
|
|
|
+ return {'msg':'尚未登入'}
|
|
|
+
|
|
|
statement = 'SELECT isAdmin FROM users WHERE userName = "'+current_user+'"'
|
|
|
for row in db.query(statement):
|
|
|
if row['isAdmin']!=1:
|
|
|
return {'msg': ' 你沒有權限'}
|
|
|
- else:
|
|
|
- current_user_roleType = 0
|
|
|
- del_user_roleType = -1
|
|
|
- statement = 'SELECT roleType FROM users WHERE userName = "'+current_user+'"'
|
|
|
- for row in db.query(statement):
|
|
|
- current_user_roleType = row['roleType']
|
|
|
- statement = 'SELECT roleType FROM users WHERE userName = "'+delete_one+'"'
|
|
|
- for row in db.query(statement):
|
|
|
- del_user_roleType = row['roleType']
|
|
|
- if del_user_roleType == -1:
|
|
|
- return {'msg':'不存在使用者'}
|
|
|
- elif current_user_roleType<=del_user_roleType :
|
|
|
- return {'msg': ' 你沒有權限'}
|
|
|
- else :
|
|
|
- statement = 'DELETE FROM users WHERE userName = "'+delete_one+'"'
|
|
|
- db.query(statement)
|
|
|
+
|
|
|
+ current_user_roleType = check_role_type(current_user)
|
|
|
+ del_user_roleType = check_role_type(delete_name)
|
|
|
+
|
|
|
+ if del_user_roleType == None:
|
|
|
+ return {'msg':'不存在使用者'}
|
|
|
+ elif current_user_roleType>del_user_roleType or current_user_roleType==del_user_roleType:
|
|
|
+ return {'msg': ' 你沒有權限'}
|
|
|
+ else :
|
|
|
+ table = db['users']
|
|
|
+ table.delete(username=delete_name)
|
|
|
+
|
|
|
return {'msg': ' 成功刪除'}
|
|
|
|
|
|
-@app.get('/member/authority', response_class=HTMLResponse)
|
|
|
-async def member_authority(request: Request,Authorize: AuthJWT = Depends()):
|
|
|
+@app.get('/member_authority/{edit_one}', response_class=HTMLResponse)
|
|
|
+async def member_authority(request:Request,edit_one: str,Authorize: AuthJWT = Depends()):
|
|
|
"""設定成員權限"""
|
|
|
+
|
|
|
try:
|
|
|
Authorize.jwt_required()
|
|
|
except Exception as e:
|
|
|
print(e)
|
|
|
return RedirectResponse('/login')
|
|
|
- return templates.TemplateResponse(name='member_authority_test.html', context={'request': request})
|
|
|
-
|
|
|
-@app.post('/member/authority')
|
|
|
-async def member_authority(request: Request,Authorize: AuthJWT = Depends()):
|
|
|
- del_user = await request.form()
|
|
|
+ context = {'request': request}
|
|
|
current_user = Authorize.get_jwt_subject()
|
|
|
- delete_one = del_user.del_username
|
|
|
db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
|
|
|
- statement = 'SELECT isAdmin FROM users WHERE userName = "'+current_user+'"'
|
|
|
- await request.form()
|
|
|
+ statement = check_isAdmin(current_user)
|
|
|
+ if statement == "no user":
|
|
|
+ return {'msg':statement }
|
|
|
+ elif statement == 0:
|
|
|
+ return {'msg':'你沒有權限' }
|
|
|
+ current_user_roleType = check_role_type(current_user)
|
|
|
+
|
|
|
+ edit_one_roleType = check_role_type(edit_one)
|
|
|
+ if edit_one_roleType == None:
|
|
|
+ return {'msg':'不存在使用者'}
|
|
|
+ elif current_user_roleType>edit_one_roleType or current_user_roleType==edit_one_roleType:
|
|
|
+ return {'msg': ' 你沒有權限'}
|
|
|
+
|
|
|
+ result = check_role_acl(edit_one)
|
|
|
+
|
|
|
+ if result == []:
|
|
|
+ cmd = 'SELECT id FROM module'
|
|
|
+ for row in db.query(cmd):
|
|
|
+ dic_tmp = {'id':get_user_id(edit_one),'isView':0,'isAdd':0 ,'isEdit':0,'isDel':0,'role_id' : check_role_type(edit_one)}
|
|
|
+ context[get_modul_name(row['id']) ] = dic_tmp
|
|
|
+ else:
|
|
|
+ for dic in result:
|
|
|
+ modul_name = get_modul_name(dic['module_id'])
|
|
|
+ del dic['module_id']
|
|
|
+ context[modul_name ] = dic
|
|
|
+ print(context)
|
|
|
+ return templates.TemplateResponse(name='member_authority_test.html', context=context)
|
|
|
+
|
|
|
+@app.post('/member_authority')
|
|
|
+async def member_authority(request: Request):
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
|
|
|
+ edit_one = models.user_authority(**await request.form())
|
|
|
+ statement = 'SELECT * FROM users'
|
|
|
+ current_user = ''
|
|
|
+ for row in db.query(statement):
|
|
|
+ if row['token'] != None :
|
|
|
+ if compare_jwt_token(row['token'],edit_one.access_token):
|
|
|
+ current_user = row['username']
|
|
|
+ if current_user == '':
|
|
|
+ return {'msg':'尚未登入'}
|
|
|
+
|
|
|
+ statement = check_isAdmin(current_user)
|
|
|
+ if statement == "no user":
|
|
|
+ return {'msg':statement }
|
|
|
+ elif statement == 0:
|
|
|
+ return {'msg':'你沒有權限' }
|
|
|
+
|
|
|
+ current_user_roleType = check_role_type(current_user)
|
|
|
+ edit_one_roleType = edit_one.role_id
|
|
|
+
|
|
|
+ if edit_one.id == None:
|
|
|
+ return {'msg':'不存在使用者'}
|
|
|
+ elif current_user_roleType>edit_one_roleType or current_user_roleType==edit_one_roleType:
|
|
|
+ return {'msg': ' 你沒有權限'}
|
|
|
+ else :
|
|
|
+ row = ['ai_prediction' ,'channel' ,'device', 'event', 'index' ,'performance', 'record', 'setting_device' ,'setting_system','tower']
|
|
|
+ if check_role_acl(get_user_name(edit_one.id)) == []:
|
|
|
+ for module in row :
|
|
|
+ new_dict = edit_one.get_acl_from_module_name(module)
|
|
|
+ table = db['role_acl']
|
|
|
+ table.insert(new_dict)
|
|
|
+ else:
|
|
|
+ for module in row :
|
|
|
+ new_dict = edit_one.get_acl_from_module_name(module)
|
|
|
+ table = db['role_acl']
|
|
|
+ table.update(new_dict, ['id'],['module_id'])
|
|
|
+
|
|
|
+ return {'msg': ' 成功更改'}
|
|
|
+
|
|
|
+
|
|
|
|
|
|
|
|
|
# 溫度API
|
|
@@ -440,7 +505,7 @@ def get_user(username: str):
|
|
|
if not check_user_exists(username): # if user don't exist
|
|
|
return False
|
|
|
user_dict = next(
|
|
|
- iter(db.query('SELECT * FROM Water_towe.users where userName ="'+username+'"')))
|
|
|
+ iter(db.query('SELECT * FROM Water_tower.users where userName ="'+username+'"')))
|
|
|
user = models.User(**user_dict)
|
|
|
return user
|
|
|
|
|
@@ -484,3 +549,84 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
|
|
|
to_encode.update({"exp": expire})
|
|
|
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
return encoded_jwt
|
|
|
+
|
|
|
+def compare_jwt_token(access_token: str, token: str):
|
|
|
+ """比對jwt token"""
|
|
|
+ if len(access_token) < len(token):
|
|
|
+ if access_token in token:
|
|
|
+ return True
|
|
|
+ else :
|
|
|
+ return False
|
|
|
+ elif len(access_token) > len(token):
|
|
|
+ if token in access_token:
|
|
|
+ return True
|
|
|
+ else :
|
|
|
+ return False
|
|
|
+ else :
|
|
|
+ if token == access_token:
|
|
|
+ return True
|
|
|
+ else :
|
|
|
+ return False
|
|
|
+
|
|
|
+def check_isAdmin(user_name:str):
|
|
|
+ """查看是否為管理員"""
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
|
|
|
+ isAdmin = None
|
|
|
+ cmd = 'SELECT isAdmin FROM users WHERE userName = "'+user_name+'"'
|
|
|
+ for row in db.query(cmd) :
|
|
|
+ isAdmin = row['isAdmin']
|
|
|
+ if isAdmin== None:
|
|
|
+ return "no user"
|
|
|
+ return isAdmin
|
|
|
+
|
|
|
+def check_role_type(user_name:str):
|
|
|
+ """查看使用者權限"""
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
|
|
|
+ cmd = 'SELECT role.id FROM `users` JOIN `role` ON `users`.roleType = `role`.name where `users`.username = "'+user_name+'"'
|
|
|
+ role_type = None
|
|
|
+ for row in db.query(cmd) :
|
|
|
+ role_type = row['id']
|
|
|
+ return role_type
|
|
|
+
|
|
|
+
|
|
|
+def check_role_acl(user_name:str):
|
|
|
+ """查看權限"""
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
|
|
|
+ cmd = 'SELECT role_acl.* FROM `users` JOIN `role_acl` ON `users`.id = `role_acl`.user_id where `users`.username = "'+user_name+'"'
|
|
|
+
|
|
|
+ result = []
|
|
|
+ for row in db.query(cmd) :
|
|
|
+ dic ={}
|
|
|
+ for col_name in db['role_acl'].columns:
|
|
|
+ dic[col_name] = row[col_name]
|
|
|
+ if dic != {}:
|
|
|
+ result.append(dic)
|
|
|
+ return result
|
|
|
+
|
|
|
+def get_user_id(user_name:str):
|
|
|
+ """獲取user id"""
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
|
|
|
+ cmd = 'SELECT id FROM `users` where username = "'+user_name+'"'
|
|
|
+ id = None
|
|
|
+ for row in db.query(cmd) :
|
|
|
+ id = row['id']
|
|
|
+ return id
|
|
|
+
|
|
|
+def get_user_name(user_id:int):
|
|
|
+ """獲取user id"""
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
|
|
|
+ cmd = 'SELECT username FROM `users` where id = "'+user_id+'"'
|
|
|
+ id = None
|
|
|
+ for row in db.query(cmd) :
|
|
|
+ id = row['username']
|
|
|
+ return id
|
|
|
+
|
|
|
+def get_modul_name(modul_id:str):
|
|
|
+ """獲取modul名稱"""
|
|
|
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
|
|
|
+ cmd = 'SELECT moduleName FROM `module` where id = "'+modul_id+'"'
|
|
|
+ modul_name = None
|
|
|
+ for row in db.query(cmd) :
|
|
|
+ modul_name = row['moduleName']
|
|
|
+ return modul_name
|
|
|
+
|