# fastapi from fastapi import FastAPI, Request, Response, HTTPException, status, Depends , Form from fastapi import templating from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pydantic import BaseModel # fastapi view function parameters from typing import List, Optional import json # path import sys from pydantic.errors import ArbitraryTypeError from sqlalchemy.sql.elements import False_ # time # import datetime from datetime import timedelta, datetime # db import dataset from passlib import context from sqlalchemy.sql.expression import true import models from random import randint,uniform # authorize from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") import jwt from fastapi_jwt_auth import AuthJWT from fastapi_jwt_auth.exceptions import AuthJWTException from fastapi.security import OAuth2AuthorizationCodeBearer, OAuth2PasswordRequestForm import numpy as np import pymysql class dateEncode(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') else: return json.JSONEncoder.default(self, obj) pymysql.install_as_MySQLdb() db_settings = { "host": "db.ptt.cx", "port": 3306, "user": "choozmo", "password": "pAssw0rd", "db": "Water_tower", "charset": "utf8mb4" } # app app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) SECRET_KEY = "df2f77bd544240801a048bd4293afd8eeb7fff3cb7050e42c791db4b83ebadcd" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 3000 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # app.mount(path='/templates', app=StaticFiles(directory='templates'), name='templates') app.mount(path='/static', app=StaticFiles(directory='static'), name='static ') # templates = Jinja2Templates(directory='templates') @AuthJWT.load_config def get_config(): return models.Settings() # view @app.get('/', response_class=HTMLResponse) async def index(request: Request): print(request) return templates.TemplateResponse(name='index.html', context={'request': request}) @app.get('/login', response_class=HTMLResponse) async def login(request: Request): return templates.TemplateResponse(name='login.html', context={'request': request}) @app.post("/login") async def login_for_access_token(request: Request, form_data: OAuth2PasswordRequestForm = Depends(), Authorize: AuthJWT = Depends()): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') user = authenticate_user(form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) table = db['users'] user.token = access_token print(user) table.update(dict(user), ['username'],['password']) access_token = Authorize.create_access_token(subject=user.username) refresh_token = Authorize.create_refresh_token(subject=user.username) Authorize.set_access_cookies(access_token) Authorize.set_refresh_cookies(refresh_token) #return templates.TemplateResponse("home.html", {"request": request, "msg": 'Login'}) return {"access_token": access_token, "token_type": "bearer"} # 回傳token給前端 @app.get('/register', response_class=HTMLResponse) async def login(request: Request): return templates.TemplateResponse(name='rigister_test.html', context={'request': request}) class DataObjects(BaseModel): CreateTime:str DeviceCode: str Key: str Value: str class P_Multiple(BaseModel): Token: str DataObjects: List[DataObjects] # description: Optional[str] = None # price: float # tax: Optional[float] = None db2 = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') table_record_tower = db2['record_tower'] @app.post('/addRecord_Parameter_Multiple') async def register(item: P_Multiple): global table_record_tower # print(item.Token) # print(item.DataObjects) for d in item.DataObjects: # print(d.Key) # print(d.Value) table_record_tower.insert({'key':d.Key,'value':d.Value,'Createtime':d.CreateTime,'device_id':d.DeviceCode}) # print(item) return "OK" @app.post('/register') async def register(request: Request, form_data: OAuth2PasswordRequestForm = Depends()): user = models.User(**await request.form()) print(form_data.username, form_data.password, user) user.id = randint(1000, 9999) user.isAdmin = 0 #預設為非管理者 user.roleType = 0 #預設為employee # 密碼加密 #user.password = get_password_hash(user.password) # 存入DB db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') user_table = db['users'] user_table.insert(dict(user)) # 跳轉頁面至登入 return templates.TemplateResponse(name='login.html', context={'request': request}) @app.post('/record_tower') async def record_tower(request: Request,data : models.record_tower_data,key:str): #data = models.tower_data(**await request.form()) if key!="21232f297a57a5a743894a0e4a801fc3": return {'msg':'no access'} loc_dt = datetime.today() loc_dt_format = loc_dt.strftime("%Y-%m-%d %H:%M:%S") db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') # cmd ="SELECT * FROM device" dict_tmp = data.__dict__ # check = False # for row in db.query(cmd): # if row['id']== tower_id: # check = True # if check : # result={'msg':"success"} # else: # result = {'msg':"no device"} dict_tmp['id']=pymysql.NULL result = db['record_tower'].insert(dict(dict_tmp)) result={'msg':"success"} return json.dumps(result,ensure_ascii=False) @app.post('/record_dcs') async def record_dcs(request: Request,data : models.record_tower_data,key:str): #data = models.tower_data(**await request.form()) if key!="21232f297a57a5a743894a0e4a801fc3": return {'msg':'no access'} loc_dt = datetime.today() loc_dt_format = loc_dt.strftime("%Y-%m-%d %H:%M:%S") db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') dict_tmp = data.__dict__ # cmd ="SELECT * FROM device" # cmd2={} # check = False # if check : # result={'msg':"success"} # else: # result = {'msg':"no device"} result = db['record_dcs'].insert(dict(dict_tmp)) result={'msg':"success"} return json.dumps(result,ensure_ascii=False) @app.post('/record_diagnosis') async def record_diagnosis(request: Request,data : models.record_diagnosis_data,key:str): #data = models.tower_data(**await request.form()) if key!="21232f297a57a5a743894a0e4a801fc3": return {'msg':'no access'} loc_dt = datetime.today() loc_dt_format = loc_dt.strftime("%Y-%m-%d %H:%M:%S") db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') dict_tmp = data.__dict__ dict_tmp['createTime']=loc_dt_format result = db['record_diagnosis'].update(dict(dict_tmp),['vibration_id']) #print(result) if result : result={'msg':'success insert'} else : result={'msg':'fail to insert'} return json.dumps(result,ensure_ascii=False) @app.post('/record_health') async def record_health(request: Request,data : models.record_health_data,key:str): #data = models.tower_data(**await request.form()) if key!="21232f297a57a5a743894a0e4a801fc3": return {'msg':'no access'} db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') dict_tmp = data.__dict__ result = db['record_health'].insert(dict(dict_tmp)) #print(result) if result : result={'msg':'success insert'} else : result={'msg':'fail to insert'} return json.dumps(result,ensure_ascii=False) @app.post('/record_performance') async def record_performance(request: Request,data : models.record_performance_data,key:str): #data = models.tower_data(**await request.form()) if key!="21232f297a57a5a743894a0e4a801fc3": return {'msg':'no access'} db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') dict_tmp = data.__dict__ result = db['record_performance'].insert(dict(dict_tmp)) #print(result) if result : result={'msg':'success insert'} else : result={'msg':'fail to insert'} return json.dumps(result,ensure_ascii=False) @app.post('/record_prediction') async def record_prediction(request: Request,data : models.record_prediction_data,key:str): #data = models.tower_data(**await request.form()) if key!="21232f297a57a5a743894a0e4a801fc3": return {'msg':'no access'} db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') dict_tmp = data.__dict__ result = db['record_prediction'].insert(dict(dict_tmp)) #print(result) if result : result={'msg':'success insert'} else : result={'msg':'fail to insert'} return json.dumps(result,ensure_ascii=False) @app.post('/record_prediction_upd') async def record_prediction(request: Request,data : models.record_prediction_upd_data,key:str): #data = models.tower_data(**await request.form()) if key!="21232f297a57a5a743894a0e4a801fc3": return {'msg':'no access'} db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') dict_tmp = data.__dict__ result = db['record_prediction_upd'].insert(dict(dict_tmp)) #print(result) if result : result={'msg':'success insert'} else : result={'msg':'fail to insert'} return json.dumps(result,ensure_ascii=False) @app.get('/add_tower') async def home(request: Request, Authorize: AuthJWT = Depends()): return templates.TemplateResponse(name='add_tower.html', context={'request': request}) @app.post('/add_tower') async def home(request: Request, Authorize: AuthJWT = Depends()): data=request.form() print(data.device) @app.get('/home', response_class=HTMLResponse) async def home(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') #add_data() return templates.TemplateResponse(name='home.html', context={'request': request}) @app.get('/home/show', response_class=HTMLResponse) async def home(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') current_user = Authorize.get_jwt_subject() result = [{'user_role':check_role_type(current_user)}] result.append(check_tower_health(current_user)) #print(result) return json.dumps(result,ensure_ascii=False) @app.get('/org', response_class=HTMLResponse) async def tower(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') current_user = Authorize.get_jwt_subject() result = get_user_under_organization(current_user) return json.dumps(result,ensure_ascii=False) @app.get('/user_role', response_class=HTMLResponse) async def user_role(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') current_user = Authorize.get_jwt_subject() result = {'role':check_role_type(current_user)} print(result) return json.dumps(result,ensure_ascii=False) @app.get('/tower', response_class=HTMLResponse) async def tower(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') current_user = Authorize.get_jwt_subject() return templates.TemplateResponse(name='tower.html', context={"request":request}) @app.get('/tower/org', response_class=HTMLResponse) async def tower(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') current_user = Authorize.get_jwt_subject() result = get_user_under_organization(current_user) return json.dumps(result,ensure_ascii=False) @app.get('/tower/', response_class=HTMLResponse) async def tower(request: Request,company:str,factory:str,department:str,towerGroup:str, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') #current_user = Authorize.get_jwt_subject() tower_arr = get_tower(company,factory,department,towerGroup) result = [] for tower in tower_arr: result.append({'tower_name': tower,'tower_data': get_tower_info(tower)}) return json.dumps(result,ensure_ascii=False, cls = dateEncode) @app.get('/tower/performance/{tower_id}', response_class=HTMLResponse) async def member_authority(request:Request,tower_id: str,Authorize: AuthJWT = Depends()): """設定成員權限""" try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') result = get_tower_perform(tower_id) #print(result) return json.dumps(result,ensure_ascii=False, cls = dateEncode) @app.get('/optim', response_class=HTMLResponse) async def optim(request: Request, Authorize: AuthJWT = Depends()): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') statement = 'SELECT value FROM record_tower WHERE record_tower.key = "hotTemp"' x = 0 y = 0 z = 0 count = 0 waterflow = 0 fannum = 0 savewater = 0 fanchange = 0 saveelec = 0 total = 0 flowchange = 0 try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') current_user = Authorize.get_jwt_subject() print(check_role_type(current_user)) role = int(check_role_type(current_user)) print(check_role_type(current_user)) if role == 1: statement = 'SELECT value FROM record_tower WHERE record_tower.key = "hotTemp"' for temp in db.query(statement): print(temp['value']) x=temp['value'] statement2 = 'SELECT value FROM record_tower WHERE record_tower.key = "coldTempData1"' for temp2 in db.query(statement2): print(temp2['value']) y=temp2['value'] statement3 = 'SELECT value FROM record_tower WHERE record_tower.key = "wetTemp"' for temp3 in db.query(statement3): print(temp3['value']) z=temp3['value'] statement4 = 'SELECT value FROM record_tower WHERE record_tower.key = "count"' for tower in db.query(statement4): print(tower['value']) count=tower['value'] statement5 = 'SELECT value FROM record_tower WHERE record_tower.key = "waterflow"' for tower in db.query(statement5): waterflow=tower['value'] statement6 = 'SELECT value FROM record_tower WHERE record_tower.key = "fannum"' for tower in db.query(statement6): fannum=tower['value'] statement7 = 'SELECT value FROM record_tower WHERE record_tower.key = "flowchange"' for tower in db.query(statement7): flowchange=tower['value'] statement8 = 'SELECT value FROM record_tower WHERE record_tower.key = "savewater"' for tower in db.query(statement8): savewater=tower['value'] statement9 = 'SELECT value FROM record_tower WHERE record_tower.key = "fanchange"' for tower in db.query(statement9): fanchange=tower['value'] statement10 = 'SELECT value FROM record_tower WHERE record_tower.key = "saveelec"' for tower in db.query(statement10): saveelec=tower['value'] statement11 = 'SELECT value FROM record_tower WHERE record_tower.key = "total"' for tower in db.query(statement11): total=tower['value'] elif role == 3 : statement = 'SELECT value FROM record_tower WHERE record_tower.key = "hotTemp1"' for temp in db.query(statement): print(temp['value']) x=temp['value'] statement2 = 'SELECT value FROM record_tower WHERE record_tower.key = "coldTempData2"' for temp2 in db.query(statement2): print(temp2['value']) y=temp2['value'] statement3 = 'SELECT value FROM record_tower WHERE record_tower.key = "wetTemp1"' for temp3 in db.query(statement3): print(temp3['value']) z=temp3['value'] statement4 = 'SELECT value FROM record_tower WHERE record_tower.key = "count1"' for tower in db.query(statement4): print(tower['value']) count=tower['value'] statement5 = 'SELECT value FROM record_tower WHERE record_tower.key = "waterflow1"' for tower in db.query(statement5): waterflow=tower['value'] statement6 = 'SELECT value FROM record_tower WHERE record_tower.key = "fannum1"' for tower in db.query(statement6): fannum=tower['value'] statement7 = 'SELECT value FROM record_tower WHERE record_tower.key = "flowchange1"' for tower in db.query(statement7): flowchange=tower['value'] statement8 = 'SELECT value FROM record_tower WHERE record_tower.key = "savewater1"' for tower in db.query(statement8): savewater=tower['value'] statement9 = 'SELECT value FROM record_tower WHERE record_tower.key = "fanchange1"' for tower in db.query(statement9): fanchange=tower['value'] statement10 = 'SELECT value FROM record_tower WHERE record_tower.key = "saveelec1"' for tower in db.query(statement10): saveelec=tower['value'] statement11 = 'SELECT value FROM record_tower WHERE record_tower.key = "total1"' for tower in db.query(statement11): total=tower['value'] elif role == 4 : statement = 'SELECT value FROM record_tower WHERE record_tower.key = "hotTemp2"' for temp in db.query(statement): print(temp['value']) x=temp['value'] statement2 = 'SELECT value FROM record_tower WHERE record_tower.key = "coldTempData3"' for temp2 in db.query(statement2): print(temp2['value']) y=temp2['value'] statement3 = 'SELECT value FROM record_tower WHERE record_tower.key = "wetTemp2"' for temp3 in db.query(statement3): print(temp3['value']) z=temp3['value'] statement4 = 'SELECT value FROM record_tower WHERE record_tower.key = "count2"' for tower in db.query(statement4): print(tower['value']) count=tower['value'] statement5 = 'SELECT value FROM record_tower WHERE record_tower.key = "waterflow2"' for tower in db.query(statement5): waterflow=tower['value'] statement6 = 'SELECT value FROM record_tower WHERE record_tower.key = "fannum2"' for tower in db.query(statement6): fannum=tower['value'] statement7 = 'SELECT value FROM record_tower WHERE record_tower.key = "flowchange2"' for tower in db.query(statement7): flowchange=tower['value'] statement8 = 'SELECT value FROM record_tower WHERE record_tower.key = "savewater2"' for tower in db.query(statement8): savewater=tower['value'] statement9 = 'SELECT value FROM record_tower WHERE record_tower.key = "fanchange2"' for tower in db.query(statement9): fanchange=tower['value'] statement10 = 'SELECT value FROM record_tower WHERE record_tower.key = "saveelec2"' for tower in db.query(statement10): saveelec=tower['value'] statement11 = 'SELECT value FROM record_tower WHERE record_tower.key = "total2"' for tower in db.query(statement11): total=tower['value'] else : print("noright") return templates.TemplateResponse(name='optim.html',context= { 'request': request,"x":x,"y":y,"z":z,"count":count,"waterflow":waterflow ,"fannum":fannum , "flowchange":flowchange , "savewater":savewater , "fanchange":fanchange , "saveelec":saveelec , "total":total }) @app.get('/vibration', response_class=HTMLResponse) async def vibration(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') return templates.TemplateResponse(name='vibration_test.html', context={'request': request}) @app.get('/channel', response_class=HTMLResponse) async def vibration(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') return templates.TemplateResponse(name='channel.html', context={'request': request}) @app.get('/channel/{tower_id}/{channel_id}', response_class=HTMLResponse) async def vibration(request: Request,tower_id:str,channel_id:str,Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') print(find_vibration_id(tower_id,channel_id)) result = get_channel_info(find_vibration_id(tower_id,channel_id)) return json.dumps(result,ensure_ascii=False, cls = dateEncode) @app.get('/channel_chart/{tower_id}/{channel_id}', response_class=HTMLResponse) async def vibration(request: Request,tower_id:str,channel_id:str,Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') print(find_vibration_id(tower_id,channel_id)) result = get_channel_health(find_vibration_id(tower_id,channel_id)) return json.dumps(result,ensure_ascii=False, cls = dateEncode) @app.get('/channel_predict/{tower_id}/{channel_id}', response_class=HTMLResponse) async def vibration(request: Request,tower_id:str,channel_id:str,Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') #print(find_vibration_id(tower_id,channel_id)) result = get_predect_data(find_vibration_id(tower_id,channel_id)) return json.dumps(result,ensure_ascii=False, cls = dateEncode) @app.get('/channel_frequency/{tower_id}/{channel_id}', response_class=HTMLResponse) async def vibration(request: Request,tower_id:str,channel_id:str,Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') #print(find_vibration_id(tower_id,channel_id)) result = get_frequency(find_vibration_id(tower_id,channel_id)) return json.dumps(result,ensure_ascii=False, cls = dateEncode) @app.get('/history', response_class=HTMLResponse) async def history(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') # current_user = Authorize.get_jwt_subject() return templates.TemplateResponse(name='history.html', context={'request': request}) @app.get('/history/', response_class=HTMLResponse) async def history(request: Request,company:str,factory:str,department:str,towerGroup:str, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') #current_user = Authorize.get_jwt_subject() tower_arr = get_tower(company,factory,department,towerGroup) result = [] for tower in tower_arr: tmp_arr = get_vibration_info(tower) for arr in tmp_arr: result.append(arr) return json.dumps(result,ensure_ascii=False, cls = dateEncode) @app.get('/device', response_class=HTMLResponse) async def device(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') statement = 'SELECT * FROM device ' a = [] print("start") for row in db.query(statement): print(row['id'],row['deviceName'],row['hostIP'],row['CompanyCode'],row['FactoryCode'],row['DepartmentCode']) a.append((row['id'],row['deviceName'],row['hostIP'],row['CompanyCode'],row['FactoryCode'],row['DepartmentCode'])) print(a) print("over3") #result = json.dumps(b,ensure_ascii=False) # current_user = Authorize.get_jwt_subject() return templates.TemplateResponse(name='device.html', context={'request': request,'a':a}) @app.get('/system', response_class=HTMLResponse) async def system(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') # current_user = Authorize.get_jwt_subject() return templates.TemplateResponse(name='system.html', context={'request': request}) @app.get('/member', response_class=HTMLResponse) async def get_member(request: Request, Authorize: AuthJWT = Depends()): """獲取所有帳號資訊""" try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') statement = 'SELECT id,username,isAdmin FROM users' json_dic = [] for row in db.query(statement): #print(row['id'],row['username']) json_dic.append({'username':row['username'],'isAdmin':row['isAdmin'],'roleType':check_role_type(row['username']),'role_name' :get_role_name(check_role_type(row['username']))}) result = json.dumps(json_dic,ensure_ascii=False) return result @app.get('/member/edit/', response_class=HTMLResponse) async def login(request: Request, name:str,isAdmin:int,isEnable:int ,Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') current_user = Authorize.get_jwt_subject() current_user_roleType = check_role_type(current_user) del_user_roleType = check_role_type(name) statement = 'SELECT isAdmin FROM users WHERE userName = "'+current_user+'"' for row in db.query(statement): if row['isAdmin']!=1: return json.dumps([{'msg':'你沒有權限'}],ensure_ascii=False) if del_user_roleType == None: return json.dumps([{'msg':'不存在使用者'}],ensure_ascii=False) elif current_user_roleType>del_user_roleType or current_user_roleType==del_user_roleType: return json.dumps([{'msg':'你沒有權限'}],ensure_ascii=False) user_dic = get_user(name) print(user_dic) user_dic.isAdmin = isAdmin user_dic.isEnable = isEnable table = db['users'] table.update(dict(user_dic), ['username']) return json.dumps([{'msg':"成功更改"}],ensure_ascii=False) @app.get('/member_delete', response_class=HTMLResponse) async def login(request: Request, Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') return templates.TemplateResponse(name='delete_member_test2.html', context={'request': request}) @app.post('/member_delete') async def delete_member(request: Request): """刪除成員""" db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') del_user = models.del_user(**await request.form()) delete_name = del_user.del_name statement = 'SELECT * FROM users' current_user = '' for row in db.query(statement): if row['token'] != None : if compare_jwt_token(row['token'],del_user.access_token): current_user = row['username'] if current_user == '': return {'msg':'尚未登入'} statement = 'SELECT isAdmin FROM users WHERE userName = "'+current_user+'"' for row in db.query(statement): if row['isAdmin']!=1: return {'msg': ' 你沒有權限'} current_user_roleType = check_role_type(current_user) del_user_roleType = check_role_type(delete_name) if del_user_roleType == None: return {'msg':'不存在使用者'} elif current_user_roleType>del_user_roleType or current_user_roleType==del_user_roleType: return {'msg': ' 你沒有權限'} else : table = db['users'] table.delete(username=delete_name) return {'msg': ' 成功刪除'} @app.get('/member_authority/{edit_one}', response_class=HTMLResponse) async def member_authority(request:Request,edit_one: int,Authorize: AuthJWT = Depends()): """設定成員權限""" try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') context = {'request': request} current_user = Authorize.get_jwt_subject() db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') statement = check_isAdmin(current_user) if statement == "no user": return templates.TemplateResponse(name='notice.html', context={"request":request,'msg':'no user' }) elif statement == 0: return templates.TemplateResponse(name='notice.html', context={"request":request,'msg':"沒有權限" }) current_user_roleType = check_role_type(current_user) if edit_one == None: return templates.TemplateResponse(name='notice.html', context={"request":request,'msg':'no role' }) elif int(current_user_roleType)>int(edit_one) or int(current_user_roleType)==int(edit_one): return templates.TemplateResponse(name='notice.html', context={"request":request,'msg':"沒有權限" }) result = check_role_acl(edit_one) if result == []: cmd = 'SELECT id FROM module' for row in db.query(cmd): dic_tmp = {'id':0,'isView':0,'isAdd':0 ,'isEdit':0,'isDel':0,'role_id' : edit_one} context[get_modul_name(row['id']) ] = dic_tmp else: for dic in result: modul_name = get_modul_name(dic['module_id']) del dic['module_id'] context[modul_name ] = dic return templates.TemplateResponse(name='member_authority_test.html', context=context) @app.post('/member_authority') async def member_authority(request: Request): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') edit_one = models.user_authority(**await request.form()) statement = 'SELECT * FROM users' current_user = '' for row in db.query(statement): if row['token'] != None : if compare_jwt_token(row['token'],edit_one.access_token): current_user = row['username'] if current_user == '': return templates.TemplateResponse(name='notice.html', context={"request":request,'msg':'尚未登入'}) statement = check_isAdmin(current_user) if statement == "no user": return templates.TemplateResponse(name='notice.html', context={"request":request,'msg':statement }) elif statement == 0: return templates.TemplateResponse(name='notice.html', context={"request":request,'msg':'你沒有權限' }) current_user_roleType = check_role_type(current_user) edit_one_roleType = edit_one.role_id if current_user_roleType>edit_one_roleType or current_user_roleType==edit_one_roleType: return templates.TemplateResponse(name='notice.html', context={"request":request,'msg': ' 你沒有權限'}) else : row = ['ai_prediction' ,'channel' ,'device', 'event', 'index' ,'performance', 'record', 'setting_device' ,'setting_system','tower'] if check_role_acl(edit_one.role_id) == []: for module in row : new_dict = edit_one.get_acl_from_module_name(module) new_dict["id"]= None table = db['role_acl'] table.insert(new_dict) else: for module in row : new_dict = edit_one.get_acl_from_module_name(module) table = db['role_acl'] table.update(new_dict, ['id']) return templates.TemplateResponse(name='notice.html', context={"request":request,'msg': '成功更改權限'}) # 溫度API @app.get('/temperature') async def get_temperatures(): """ 撈DB溫度 """ return {'hot_water': 30.48, 'cold_water': 28.10, 'wet_ball': 25.14} @app.post("/example") async def example(request: Request,Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) current_user = Authorize.get_jwt_subject() #form_data = await request.form() print( current_user) return current_user @app.post('/user') def user(Authorize: AuthJWT = Depends()): Authorize.jwt_required() current_user = Authorize.get_jwt_subject() return {"user": current_user} @app.get("/add_data", response_class=HTMLResponse) async def example(request: Request,Authorize: AuthJWT = Depends()): try: Authorize.jwt_required() except Exception as e: print(e) return RedirectResponse('/login') add_data() return templates.TemplateResponse(name='test.html', context={'request': request}) @app.get('/health') async def get_health(date: str): """ 撈健康指標、預設健康指標 """ date = str(datetime.strptime(date, "%Y-%m-%d"))[:10] print(date) print(str(datetime.today())) print(str(datetime.today()-timedelta(days=1))) fake_data = { str(datetime.today())[:10]: {'curr_health': 0.7, 'pred_health': 0.8}, str(datetime.today()-timedelta(days=1))[:10]: {'curr_health': 0.6, 'pred_health': 0.7}, } return fake_data[date] @app.get('/history_data') async def get_history(time_end: str): """ 透過終點時間,抓取歷史資料。 """ date = str(datetime.strptime(time_end, "%Y-%m-%d"))[:10] print(date) print(str(datetime.today())) print(str(datetime.today()-timedelta(days=1))) fake_data = { str(datetime.today())[:10]: { 'curr_history': { 'RPM_1X': list(np.random.rand(13)), 'RPM_2X': list(np.random.rand(13)), 'RPM_3X': list(np.random.rand(13)), 'RPM_4X': list(np.random.rand(13)), 'RPM_5X': list(np.random.rand(13)), 'RPM_6X': list(np.random.rand(13)), 'RPM_7X': list(np.random.rand(13)), 'RPM_8X': list(np.random.rand(13)), 'Gear_1X': list(np.random.rand(13)), 'Gear_2X': list(np.random.rand(13)), 'Gear_3X': list(np.random.rand(13)), 'Gear_4X': list(np.random.rand(13)), }, 'past_history': { 'RPM_1X': list(np.random.rand(13)), 'RPM_2X': list(np.random.rand(13)), 'RPM_3X': list(np.random.rand(13)), 'RPM_4X': list(np.random.rand(13)), 'RPM_5X': list(np.random.rand(13)), 'RPM_6X': list(np.random.rand(13)), 'RPM_7X': list(np.random.rand(13)), 'RPM_8X': list(np.random.rand(13)), 'Gear_1X': list(np.random.rand(13)), 'Gear_2X': list(np.random.rand(13)), 'Gear_3X': list(np.random.rand(13)), 'Gear_4X': list(np.random.rand(13)), } }, str(datetime.today()-timedelta(days=1))[:10]: { 'curr_history': { 'RPM_1X': list(np.random.rand(13)), 'RPM_2X': list(np.random.rand(13)), 'RPM_3X': list(np.random.rand(13)), 'RPM_4X': list(np.random.rand(13)), 'RPM_5X': list(np.random.rand(13)), 'RPM_6X': list(np.random.rand(13)), 'RPM_7X': list(np.random.rand(13)), 'RPM_8X': list(np.random.rand(13)), 'Gear_1X': list(np.random.rand(13)), 'Gear_2X': list(np.random.rand(13)), 'Gear_3X': list(np.random.rand(13)), 'Gear_4X': list(np.random.rand(13)), }, 'past_history': { 'RPM_1X': list(np.random.rand(13)), 'RPM_2X': list(np.random.rand(13)), 'RPM_3X': list(np.random.rand(13)), 'RPM_4X': list(np.random.rand(13)), 'RPM_5X': list(np.random.rand(13)), 'RPM_6X': list(np.random.rand(13)), 'RPM_7X': list(np.random.rand(13)), 'RPM_8X': list(np.random.rand(13)), 'Gear_1X': list(np.random.rand(13)), 'Gear_2X': list(np.random.rand(13)), 'Gear_3X': list(np.random.rand(13)), 'Gear_4X': list(np.random.rand(13)), } }, } return fake_data[date] # Login funtion part def check_user_exists(username): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') if int(next(iter(db.query('SELECT COUNT(*) FROM Water_tower.users WHERE userName = "'+username+'"')))['COUNT(*)']) > 0: return True else: return False def get_user(username: str): """ 取得使用者資訊(Model) """ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') if not check_user_exists(username): # if user don't exist return False user_dict = next( iter(db.query('SELECT * FROM Water_tower.users where userName ="'+username+'"'))) user = models.User(**user_dict) return user def user_register(user): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') table = db['users'] #user.password = get_password_hash(user.password) table.insert(dict(user)) def get_password_hash(password): """ 加密密碼 """ return pwd_context.hash(password) def verify_password(plain_password, hashed_password): """ 驗證密碼(hashed) """ return pwd_context.verify(plain_password, hashed_password) def authenticate_user(username: str, password: str): """ 連線DB,讀取使用者是否存在。 """ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') if not check_user_exists(username): # if user don't exist return False if not check_user_isEnable(username): return False user_dict = next(iter(db.query('SELECT * FROM Water_tower.users where userName ="'+username+'"'))) user = models.User(**user_dict) #if not verify_password(password, user.password): #return False return user def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): """ 創建token,並設定過期時間。 """ to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def compare_jwt_token(access_token: str, token: str): """比對jwt token""" if len(access_token) < len(token): if access_token in token: return True else : return False elif len(access_token) > len(token): if token in access_token: return True else : return False else : if token == access_token: return True else : return False def check_isAdmin(user_name:str): """查看是否為管理員""" db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') isAdmin = None cmd = 'SELECT isAdmin FROM users WHERE userName = "'+user_name+'"' for row in db.query(cmd) : isAdmin = row['isAdmin'] if isAdmin== None: return "no user" return isAdmin def check_role_type(user_name:str)->int: """查看使用者權限""" db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT user_role.role_id FROM `users` JOIN `user_role` ON `users`.id = `user_role`.user_id where `users`.username = "'+user_name+'"' role_type = None for row in db.query(cmd) : role_type = row['role_id'] return role_type def check_role_acl(role:int): """查看權限""" db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT * FROM role_acl where role_id = '+str(role) result = [] for row in db.query(cmd) : dic ={} for col_name in db['role_acl'].columns: dic[col_name] = row[col_name] if dic != {}: result.append(dic) return result def get_role_name(role_id:int): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT * FROM role where id = '+str(role_id) role:str for row in db.query(cmd) : role = row['name'] return role def check_user_isEnable(user_name:str)->bool: db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT isEnable FROM users where username = "'+str(user_name)+'"' able:bool for row in db.query(cmd) : able = row['isEnable'] return able def get_user_under_organization(user_name:str): """查看所屬公司""" db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') user_role = check_role_type(user_name) print(user_name,user_role) cmd = 'SELECT * FROM organization' result = [] if int(user_role) == 1 : for row in db.query(cmd) : company = row['Company'] factory = row['Factory'] department = row['Department'] cmd2 = 'SELECT TowerGroupCode FROM device WHERE CompanyCode = "' + company + '" AND FactoryCode = "' + factory + '" AND DepartmentCode = "' + department + '"' group = [] for row2 in db.query(cmd2): if row2['TowerGroupCode'] not in group : group.append(row2['TowerGroupCode']) result.append({'company':company,'factory':factory,'department':department,'group':group,'able':1}) elif int(user_role) == 2: cmd2 = 'SELECT company FROM user WHERE user.username ="'+user_name +'"' company_able:str for row in db.query(cmd2) : company_able = row['company'] for row in db.query(cmd) : company = row['Company'] factory = row['Factory'] department = row['Department'] cmd3 = 'SELECT TowerGroupCode FROM device WHERE CompanyCode = "' + company + '" AND FactoryCode = "' + factory + '" AND DepartmentCode = "' + department + '"' group = [] for row2 in db.query(cmd3): if row2['TowerGroupCode'] not in group : group.append(row2['TowerGroupCode']) if company == company_able: result.append({'company':company,'factory':factory,'department':department,'group':group,'able':1}) else: result.append({'company':company,'factory':factory,'department':department,'group':group,'able':0}) elif int(user_role) == 3: cmd2 = 'SELECT company,factory FROM users WHERE users.username = "'+user_name +'"' company_able:str factory_able:str num = 0 for row in db.query(cmd2) : company_able = row['company'] factory_able = row['factory'] for row in db.query(cmd) : company = row['Company'] factory = row['Factory'] department = row['Department'] cmd3 = 'SELECT TowerGroupCode FROM device WHERE CompanyCode = "' + company + '" AND FactoryCode = "' + factory + '" AND DepartmentCode = "' + department + '"' group = [] for row2 in db.query(cmd3): if row2['TowerGroupCode'] not in group : group.append(row2['TowerGroupCode']) if company == company_able and factory==factory_able: result.append({'company':company,'factory':factory,'department':department,'group':group,'able':1}) else: result.append({'company':company,'factory':factory,'department':department,'group':group,'able':0}) elif int(user_role) == 4: cmd2 = 'SELECT company,factory,department FROM users WHERE username = "'+user_name +'"' company_able:str factory_able:str department_able:str for row in db.query(cmd2) : company_able = row['company'] factory_able = row['factory'] department_able = row['department'] for row in db.query(cmd) : company = row['Company'] factory = row['Factory'] department = row['Department'] cmd3 = 'SELECT TowerGroupCode FROM device WHERE CompanyCode = "' + company + '" AND FactoryCode = "' + factory + '" AND DepartmentCode = "' + department + '"' group = [] for row2 in db.query(cmd3): if row2['TowerGroupCode'] not in group : group.append(row2['TowerGroupCode']) if company == company_able and factory==factory_able and department==department_able: result.append({'company':company,'factory':factory,'department':department,'group':group,'able':1}) else: result.append({'company':company,'factory':factory,'department':department,'group':group,'able':0}) else : result =[ {'msg':"error"}] return result def get_user_id(user_name:str): """獲取user id""" db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT id FROM `users` where username = "'+user_name+'"' id = None for row in db.query(cmd) : id = row['id'] return id def get_user_name(user_id:int): """獲取user name""" db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT username FROM `users` where id = "'+user_id+'"' id = None for row in db.query(cmd) : id = row['username'] return id def get_modul_name(modul_id:str): """獲取modul名稱""" db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT moduleName FROM `module` where id = "'+modul_id+'"' modul_name = None for row in db.query(cmd) : modul_name = row['moduleName'] return modul_name def get_tower_info(tower_id:str): """獲取水塔資料""" db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT * FROM `record_dcs` where device_id = "'+tower_id+'"' days=0 dateFormatter = "%Y-%B-%d %H:%M:%S" loc_dt = datetime.today() result ={'DCS':{},'Fan':{},'Moter':{},'Device_info':{}} for row in db.query(cmd) : dateString=row['time_stamp'] if days==0: days= (loc_dt-dateString).days result['DCS'][row['key']]=row['value'] elif row['key'] in result['DCS'].keys(): #days= (loc_dt-dateString).days result['DCS'][row['key']]=row['value'] elif (loc_dt-dateString).days < days: days= (loc_dt-dateString).days result['DCS'][row['key']]=row['value'] elif (loc_dt-dateString).days == days: days= (loc_dt-dateString).days result['DCS'][row['key']]=row['value'] if result['DCS']=={}: result['DCS']['hotTemp']=32.00 result['DCS']['coldTemp']=32.00 result['DCS']['waterFlow']=32.00 result['DCS']['fanMotorCur']=32.00 result['DCS']['fanMotorSpeedFreq']=32.00 days=0 cmd = 'SELECT * FROM `record_tower` where device_id = "'+tower_id+'"' for row in db.query(cmd) : dateString=row['Createtime'] if dateString == None: dateString=row['time_stamp'] if isinstance(dateString, datetime)==False: dateFormatter = "%Y-%m-%d %H:%M:%S.%f" dateString = datetime.strptime(dateString, dateFormatter) if days==0: days= (loc_dt-dateString).days result['Fan'][row['key']]=row['value'] elif row['key'] in result['DCS'].keys(): result['Fan'][row['key']]=row['value'] elif (loc_dt-dateString).days < days: days= (loc_dt-dateString).days result['Fan'][row['key']]=row['value'] elif (loc_dt-dateString).days == days: days= (loc_dt-dateString).days result['Fan'][row['key']]=row['value'] result['Moter'] = [] cmd = 'SELECT * FROM `vibration` where device_id = "'+tower_id+'"' tmp = {} for row in db.query(cmd) : for col in db['vibration'].columns : tmp[col] = row[col] result['Moter'].append(tmp) tmp = {} if result['Moter']== []: result['Moter'].append({'DataValue':213,'CVindex':222}) result['Moter'].append({'DataValue':213,'CVindex':222}) result['Moter'].append({'DataValue':213,'CVindex':222}) result['Moter'].append({'DataValue':213,'CVindex':222}) cmd = 'SELECT * FROM `device` where id = "'+tower_id+'"' for row in db.query(cmd): for col in db['device'].columns : if col != 'createTime': result['Device_info'][col] = row[col] return result def get_tower_perform(tower_id:str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT * FROM `record_performance` where deviceCode = "'+tower_id+'" AND designWFR = 27000' result = [{}] for row in db.query(cmd): for col in db['record_performance'].columns : if col != 'createTime': result[0][col] = row[col] #print(result) return result def get_tower(company:str,factory:str,department:str,towerGroup:str): towergroup_arr =[] db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT id FROM `device` where CompanyCode = "'+company+'" AND FactoryCode = "' +factory+'" AND DepartmentCode = "'+department+'" AND TowerGroupCode = "' + towerGroup + '"' for row in db.query(cmd) : towergroup_arr.append(row['id']) return towergroup_arr def check_tower_health(user_name:str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') org = get_user_under_organization(user_name) #print(org) towers : list result = [] for dic in org: #print(dic) if dic['able'] == 1: for group in dic['group']: towers = get_tower(dic['company'],dic['factory'],dic['department'],group) for tower in towers: cmd = 'SELECT CVIndex,threshold FROM `vibration` where device_id = "'+tower+'"' health =1 for row in db.query(cmd) : print(row['CVIndex'],row['threshold']) if row['CVIndex'] < int(row['threshold']): health = 0 print("heheh") print(health) result.append({'company':dic['company'], 'factory':dic['factory'], 'department':dic['department'], 'group': group, 'tower': tower, 'health': health}) return result def find_vibration_id(tower_id:str,channel_id:str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT id FROM `vibration` where device_id = "'+tower_id+'" AND channelName = "' +channel_id+'"' for row in db.query(cmd) : return row['id'] def get_channel_info(vibration_id): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT * FROM `record_diagnosis` where vibration_id = "'+vibration_id+'"' result = {} for row in db.query(cmd) : for col in db['record_diagnosis'].columns : result[col] = row[col] cmd2='SELECT CVIndex,threshold FROM `vibration` where id = "'+vibration_id+'"' for row2 in db.query(cmd2) : result['CVIndex'] = row2['CVIndex'] result['threshold'] = row2['threshold'] print(result) return result def get_channel_health(vibration_id:str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') result = [] data_name = ['CV_index', 'Vrms' ,'RPM'] for row in data_name: result.append({'name':row,'data':[]}) cmd = 'SELECT * FROM record_health where vibration_id = "'+vibration_id+'"' for row in db.query(cmd) : for dic in result: dic['data'].append(row[dic['name']]) return result def get_predect_data(vibration_id:str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') result = {} cmd = 'SELECT * FROM record_prediction_upd where vibration_id = "'+vibration_id+'"' for row in db.query(cmd) : arr = row['predictData'].split(',') result['data']=arr return result def get_frequency(vibration_id:str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') result = {'Hz':[],'value':[]} print(vibration_id) cmd = 'SELECT * FROM record_frequency where vibration_id = "'+vibration_id+'"' for row in db.query(cmd) : result['Hz'].append(row['MFHz']) result['value'].append(row['MFValue']) return result def get_vibration_info(deviceCode:str): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') cmd = 'SELECT * FROM vibration where device_id = "'+deviceCode+'"' tmp = [] for row in db.query(cmd) : result = {} for col in db['vibration'].columns : result[col] = row[col] #print(result) tmp.append(result) #print(tmp) return tmp def add_data(): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') loc_dt = datetime.today() loc_dt_format = loc_dt.strftime("%Y-%m-%d %H:%M:%S") cmd = "TRUNCATE TABLE record_health" db.query(cmd) for j in range(1,5): for i in range(0,7): time_del = timedelta(days=i) date=loc_dt-time_del db['record_health'].insert(dict(time_stamp=date, CV_index=uniform(0.6, 0.8), Vrms=uniform(1,3),Grms = uniform(1,3),RPM=uniform(1,3),vibration_id = j)) cmd = "TRUNCATE TABLE record_frequency" db.query(cmd) for i in range(0,1000): print(i) db['record_frequency'].insert(dict(time_stamp=loc_dt_format, MFHz=i, MFValue=uniform(0,0.12),vibration_id = 1)) cmd = "TRUNCATE TABLE record_frequency" db.query(cmd) for i in range(0,1000): print(i) db['record_frequency'].insert(dict(time_stamp=loc_dt_format, MFHz=i, MFValue=uniform(0,0.12),vibration_id = 1)) def add_tower(): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4') loc_dt = datetime.today() loc_dt_format = loc_dt.strftime("%Y-%m-%d %H:%M:%S") for device in ['dev005','dev006','dev007','dev008','dev009']: for key in ['hotTemp','waterFlow','coldTemp','fanMotorCur','fanMotorSpeedFreq']: db['record_dcs'].insert(dict(time_stamp=loc_dt_format, MFValue=uniform(0,0.12),vibration_id = 1))