|
- # fastapi
- from fastapi import FastAPI, Request, Response, HTTPException, status, Depends , Form
- from fastapi import templating
- from fastapi.templating import Jinja2Templates
- from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.staticfiles import StaticFiles
- from pydantic import BaseModel
- # fastapi view function parameters
- from typing import List, Optional
- import json
- # path
- import sys
- from pydantic.errors import ArbitraryTypeError
- from sqlalchemy.sql.elements import False_
- # time
- # import datetime
- from datetime import timedelta, datetime
- # db
- import dataset
- from passlib import context
- from sqlalchemy.sql.expression import true
- import models
- from random import randint,uniform
- # authorize
- from passlib.context import CryptContext
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- import jwt
- from fastapi_jwt_auth import AuthJWT
- from fastapi_jwt_auth.exceptions import AuthJWTException
- from fastapi.security import OAuth2AuthorizationCodeBearer, OAuth2PasswordRequestForm
- import numpy as np
- import pymysql
class dateEncode(json.JSONEncoder):
    """JSON encoder that serialises date and time values as strings.

    ``datetime`` objects become ``'YYYY-MM-DD HH:MM:SS'`` and plain ``date``
    objects become ``'YYYY-MM-DD'``.  Every other unsupported type is passed
    to the base encoder, which raises ``TypeError``.
    """

    def default(self, obj):
        """Return a JSON-serialisable replacement for *obj*."""
        # Local import: the module only imports ``datetime`` and ``timedelta``.
        from datetime import date

        # ``datetime`` is a subclass of ``date`` and must be tested first.
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(obj, date):
            return obj.strftime('%Y-%m-%d')
        return super().default(obj)
# Register PyMySQL under the MySQLdb name so 'mysql://' URLs can use it.
pymysql.install_as_MySQLdb()

# NOTE(review): database credentials and the signing key below are
# hard-coded in source; consider loading them from the environment.
db_settings = {
    "host": "db.ptt.cx",
    "port": 3306,
    "user": "choozmo",
    "password": "pAssw0rd",
    "db": "Water_tower",
    "charset": "utf8mb4",
}

# app
app = FastAPI()
# NOTE(review): every origin is allowed together with credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

SECRET_KEY = "df2f77bd544240801a048bd4293afd8eeb7fff3cb7050e42c791db4b83ebadcd"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 3000
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Static mounts.  The 'static' mount used to be registered under the name
# 'static ' (trailing space), which made url_for('static', ...) unresolvable.
app.mount(path='/templates', app=StaticFiles(directory='templates'), name='templates')
app.mount(path='/static', app=StaticFiles(directory='static'), name='static')

# Jinja2 environment used by every HTML view.
templates = Jinja2Templates(directory='templates')
@AuthJWT.load_config
def get_config():
    """Provide the settings object that AuthJWT is configured from."""
    settings = models.Settings()
    return settings
- # view
@app.get('/', response_class=HTMLResponse)
async def index(request: Request):
    """Render the landing page (``index.html``)."""
    print(request)
    page_context = {'request': request}
    return templates.TemplateResponse(name='index.html', context=page_context)
-
@app.get('/login', response_class=HTMLResponse)
async def login(request: Request):
    """Render the login form (``login.html``)."""
    page_context = {'request': request}
    return templates.TemplateResponse(name='login.html', context=page_context)
@app.post("/login")
async def login_for_access_token(request: Request, form_data: OAuth2PasswordRequestForm = Depends(), Authorize: AuthJWT = Depends()):
    """Authenticate a user, persist a token and set the JWT cookies.

    Args:
        request: Incoming request (unused, kept for interface compatibility).
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        Authorize: AuthJWT helper used to mint tokens and set cookies.

    Returns:
        ``{"access_token": ..., "token_type": "bearer"}`` holding the AuthJWT
        access token.

    Raises:
        HTTPException: 401 when ``authenticate_user`` rejects the credentials.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Token stored on the user row; it is distinct from the AuthJWT tokens
    # that are created further down and returned to the client.
    user.token = create_access_token(
        data={"sub": user.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )

    # Connect only after authentication succeeded so failed logins do not
    # open a database connection.  The user record is no longer printed,
    # because it carries the token that was just issued.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # NOTE(review): the third positional argument of dataset's Table.update
    # is ``ensure``, so ['password'] is not treated as a key - confirm intent.
    db['users'].update(dict(user), ['username'], ['password'])

    access_token = Authorize.create_access_token(subject=user.username)
    refresh_token = Authorize.create_refresh_token(subject=user.username)
    Authorize.set_access_cookies(access_token)
    Authorize.set_refresh_cookies(refresh_token)
    # Hand the token back to the front end.
    return {"access_token": access_token, "token_type": "bearer"}
@app.get('/register', response_class=HTMLResponse)
async def login(request: Request):
    """Render the registration form (``rigister_test.html``)."""
    page_context = {'request': request}
    return templates.TemplateResponse(name='rigister_test.html', context=page_context)
class DataObjects(BaseModel):
    """A single key/value reading reported for a device."""

    # All four fields are required strings.
    CreateTime: str
    DeviceCode: str
    Key: str
    Value: str
class P_Multiple(BaseModel):
    """Request body made of a token and a list of ``DataObjects`` readings."""

    Token: str
    # The field deliberately shares its name with the element model.
    DataObjects: List[DataObjects]
# Module-level connection, created once at import time.
db2 = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
# Handle on the ``record_tower`` table of that connection.
table_record_tower = db2['record_tower']
@app.post('/addRecord_Parameter_Multiple')
async def register(item: P_Multiple):
    """Insert every reading of *item* into ``record_tower`` and return ``"OK"``.

    NOTE(review): ``item.Token`` is accepted but never checked here.
    """
    for reading in item.DataObjects:
        row = {
            'key': reading.Key,
            'value': reading.Value,
            'Createtime': reading.CreateTime,
            'device_id': reading.DeviceCode,
        }
        table_record_tower.insert(row)
    return "OK"
@app.post('/register')
async def register(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
    """Create a user from the submitted form and show the login page.

    Args:
        request: Incoming request; its form fields populate ``models.User``.
        form_data: OAuth2 password form (declared so FastAPI requires the
            username/password fields).

    Returns:
        The rendered ``login.html`` template.
    """
    user = models.User(**await request.form())
    # The submitted password is intentionally not printed any more: the old
    # debug output wrote it to stdout in clear text.
    user.id = randint(1000, 9999)
    user.isAdmin = 0  # not an administrator by default
    user.roleType = 0  # default role: employee
    # NOTE(review): password hashing is disabled (the get_password_hash call
    # was commented out), so the password is stored as submitted.  Confirm
    # how authenticate_user verifies it before enabling hashing.

    # Persist the new user.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    db['users'].insert(dict(user))

    # Send the user on to the login page.
    return templates.TemplateResponse(name='login.html', context={'request': request})
@app.post('/record_tower')
async def record_tower(request: Request, data: models.record_tower_data, key: str):
    """Insert *data* into ``record_tower`` when *key* matches the API key.

    Returns ``{'msg': 'no access'}`` for a wrong key, otherwise a JSON string
    ``{"msg": "success"}``.
    """
    if key != "21232f297a57a5a743894a0e4a801fc3":
        return {'msg': 'no access'}

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # Copy the model's fields and set ``id`` to pymysql's NULL constant.
    row = dict(data.__dict__)
    row['id'] = pymysql.NULL
    db['record_tower'].insert(row)
    return json.dumps({'msg': "success"}, ensure_ascii=False)
@app.post('/record_dcs')
async def record_dcs(request: Request, data: models.record_tower_data, key: str):
    """Insert *data* into ``record_dcs`` when *key* matches the API key.

    Returns ``{'msg': 'no access'}`` for a wrong key, otherwise a JSON string
    ``{"msg": "success"}``.
    """
    if key != "21232f297a57a5a743894a0e4a801fc3":
        return {'msg': 'no access'}

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    row = dict(data.__dict__)
    db['record_dcs'].insert(row)
    return json.dumps({'msg': "success"}, ensure_ascii=False)
@app.post('/record_diagnosis')
async def record_diagnosis(request: Request, data: models.record_diagnosis_data, key: str):
    """Update the ``record_diagnosis`` row matching ``vibration_id``.

    The row's ``createTime`` is set to the current local time.  Returns
    ``{'msg': 'no access'}`` for a wrong key, otherwise a JSON string whose
    message depends on the truthiness of the update result.
    """
    if key != "21232f297a57a5a743894a0e4a801fc3":
        return {'msg': 'no access'}

    stamp = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    row = dict(data.__dict__)
    row['createTime'] = stamp
    outcome = db['record_diagnosis'].update(row, ['vibration_id'])
    message = 'success insert' if outcome else 'fail to insert'
    return json.dumps({'msg': message}, ensure_ascii=False)
@app.post('/record_health')
async def record_health(request: Request, data: models.record_health_data, key: str):
    """Insert *data* into ``record_health`` when *key* matches the API key.

    Returns ``{'msg': 'no access'}`` for a wrong key, otherwise a JSON string
    whose message depends on the truthiness of the insert result.
    """
    if key != "21232f297a57a5a743894a0e4a801fc3":
        return {'msg': 'no access'}

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    outcome = db['record_health'].insert(dict(data.__dict__))
    message = 'success insert' if outcome else 'fail to insert'
    return json.dumps({'msg': message}, ensure_ascii=False)
@app.post('/record_performance')
async def record_performance(request: Request, data: models.record_performance_data, key: str):
    """Insert *data* into ``record_performance`` when *key* matches the API key.

    Returns ``{'msg': 'no access'}`` for a wrong key, otherwise a JSON string
    whose message depends on the truthiness of the insert result.
    """
    if key != "21232f297a57a5a743894a0e4a801fc3":
        return {'msg': 'no access'}

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    outcome = db['record_performance'].insert(dict(data.__dict__))
    message = 'success insert' if outcome else 'fail to insert'
    return json.dumps({'msg': message}, ensure_ascii=False)
@app.post('/record_prediction')
async def record_prediction(request: Request, data: models.record_prediction_data, key: str):
    """Insert *data* into ``record_prediction`` when *key* matches the API key.

    Returns ``{'msg': 'no access'}`` for a wrong key, otherwise a JSON string
    whose message depends on the truthiness of the insert result.
    """
    if key != "21232f297a57a5a743894a0e4a801fc3":
        return {'msg': 'no access'}

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    outcome = db['record_prediction'].insert(dict(data.__dict__))
    message = 'success insert' if outcome else 'fail to insert'
    return json.dumps({'msg': message}, ensure_ascii=False)
@app.post('/record_prediction_upd')
async def record_prediction(request: Request, data: models.record_prediction_upd_data, key: str):
    """Insert *data* into ``record_prediction_upd`` when *key* matches the API key.

    Returns ``{'msg': 'no access'}`` for a wrong key, otherwise a JSON string
    whose message depends on the truthiness of the insert result.
    """
    if key != "21232f297a57a5a743894a0e4a801fc3":
        return {'msg': 'no access'}

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    outcome = db['record_prediction_upd'].insert(dict(data.__dict__))
    message = 'success insert' if outcome else 'fail to insert'
    return json.dumps({'msg': message}, ensure_ascii=False)
@app.get('/add_tower')
async def home(request: Request, Authorize: AuthJWT = Depends()):
    """Render the add-tower form (``add_tower.html``)."""
    page_context = {'request': request}
    return templates.TemplateResponse(name='add_tower.html', context=page_context)
@app.post('/add_tower')
async def home(request: Request, Authorize: AuthJWT = Depends()):
    """Read the submitted add-tower form and log its ``device`` field.

    ``request.form()`` is a coroutine; it was previously used without
    ``await``, so accessing the field raised ``AttributeError``.
    """
    data = await request.form()
    # Form data is a mapping, so the field is read with ``get`` (``None``
    # when the field was not submitted).
    print(data.get('device'))
@app.get('/home', response_class=HTMLResponse)
async def home(request: Request, Authorize: AuthJWT = Depends()):
    """Render ``home.html``; redirect to ``/login`` when the JWT check fails."""
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    page_context = {'request': request}
    return templates.TemplateResponse(name='home.html', context=page_context)
@app.get('/home/show', response_class=HTMLResponse)
async def home(request: Request, Authorize: AuthJWT = Depends()):
    """Return the current user's role and tower health as a JSON string.

    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    subject = Authorize.get_jwt_subject()
    payload = [
        {'user_role': check_role_type(subject)},
        check_tower_health(subject),
    ]
    return json.dumps(payload, ensure_ascii=False)
@app.get('/org', response_class=HTMLResponse)
async def tower(request: Request, Authorize: AuthJWT = Depends()):
    """Return ``get_user_under_organization`` for the current user as JSON text.

    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    subject = Authorize.get_jwt_subject()
    organization = get_user_under_organization(subject)
    return json.dumps(organization, ensure_ascii=False)
@app.get('/user_role', response_class=HTMLResponse)
async def user_role(request: Request, Authorize: AuthJWT = Depends()):
    """Return ``{"role": <role type>}`` for the current user as JSON text.

    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    subject = Authorize.get_jwt_subject()
    payload = {'role': check_role_type(subject)}
    print(payload)
    return json.dumps(payload, ensure_ascii=False)
@app.get('/tower', response_class=HTMLResponse)
async def tower(request: Request, Authorize: AuthJWT = Depends()):
    """Render ``tower.html``; redirect to ``/login`` when the JWT check fails."""
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    # The subject is read but not used by this view.
    current_user = Authorize.get_jwt_subject()
    page_context = {"request": request}
    return templates.TemplateResponse(name='tower.html', context=page_context)
@app.get('/tower/org', response_class=HTMLResponse)
async def tower(request: Request, Authorize: AuthJWT = Depends()):
    """Return ``get_user_under_organization`` for the current user as JSON text.

    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    subject = Authorize.get_jwt_subject()
    organization = get_user_under_organization(subject)
    return json.dumps(organization, ensure_ascii=False)
@app.get('/tower/', response_class=HTMLResponse)
async def tower(request: Request, company: str, factory: str, department: str, towerGroup: str, Authorize: AuthJWT = Depends()):
    """Return name and info for every tower of the selected group as JSON text.

    Towers come from ``get_tower`` and their data from ``get_tower_info``.
    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    payload = [
        {'tower_name': tower_name, 'tower_data': get_tower_info(tower_name)}
        for tower_name in get_tower(company, factory, department, towerGroup)
    ]
    return json.dumps(payload, ensure_ascii=False, cls=dateEncode)
@app.get('/tower/performance/{tower_id}', response_class=HTMLResponse)
async def member_authority(request: Request, tower_id: str, Authorize: AuthJWT = Depends()):
    """Return ``get_tower_perform(tower_id)`` as JSON text.

    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    performance = get_tower_perform(tower_id)
    return json.dumps(performance, ensure_ascii=False, cls=dateEncode)
-
# Role type -> (suffix appended to every base key, key of the cold-water value).
_OPTIM_ROLE_KEYS = {
    1: ('', 'coldTempData1'),
    3: ('1', 'coldTempData2'),
    4: ('2', 'coldTempData3'),
}

# (template variable, base ``record_tower`` key); the role suffix is appended.
_OPTIM_SUFFIXED_FIELDS = (
    ('x', 'hotTemp'),
    ('z', 'wetTemp'),
    ('count', 'count'),
    ('waterflow', 'waterflow'),
    ('fannum', 'fannum'),
    ('flowchange', 'flowchange'),
    ('savewater', 'savewater'),
    ('fanchange', 'fanchange'),
    ('saveelec', 'saveelec'),
    ('total', 'total'),
)


def _optim_last_value(db, record_key, default=0):
    """Return ``value`` of the last row returned for *record_key*, else *default*."""
    value = default
    rows = db.query(
        'SELECT value FROM record_tower WHERE record_tower.key = :record_key',
        record_key=record_key,
    )
    # The query has no ORDER BY; as before, the last row yielded wins.
    for row in rows:
        value = row['value']
    return value


@app.get('/optim', response_class=HTMLResponse)
async def optim(request: Request, Authorize: AuthJWT = Depends()):
    """Render ``optim.html`` with the values stored for the user's role.

    Role types 1, 3 and 4 each read their own set of ``record_tower`` keys;
    any other role logs ``noright`` and renders the page with every value
    left at 0.  Redirects to ``/login`` when the JWT check fails.

    Args:
        request: Incoming request, passed on to the template.
        Authorize: AuthJWT helper used for the login check.

    Returns:
        The rendered template, or a redirect response.
    """
    try:
        Authorize.jwt_required()
    except Exception as e:
        print(e)
        return RedirectResponse('/login')

    current_user = Authorize.get_jwt_subject()
    role = int(check_role_type(current_user))

    # Defaults used when the role is unknown or a key has no rows.
    context = {
        'request': request, 'x': 0, 'y': 0, 'z': 0, 'count': 0,
        'waterflow': 0, 'fannum': 0, 'flowchange': 0, 'savewater': 0,
        'fanchange': 0, 'saveelec': 0, 'total': 0,
    }

    role_keys = _OPTIM_ROLE_KEYS.get(role)
    if role_keys is None:
        print("noright")
    else:
        suffix, cold_key = role_keys
        # Connect only once the user is authenticated and the role is known.
        db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
        context['y'] = _optim_last_value(db, cold_key)
        for name, base_key in _OPTIM_SUFFIXED_FIELDS:
            context[name] = _optim_last_value(db, base_key + suffix)

    return templates.TemplateResponse(name='optim.html', context=context)
@app.get('/vibration', response_class=HTMLResponse)
async def vibration(request: Request, Authorize: AuthJWT = Depends()):
    """Render ``vibration_test.html``; redirect to ``/login`` when the JWT check fails."""
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    page_context = {'request': request}
    return templates.TemplateResponse(name='vibration_test.html', context=page_context)
@app.get('/channel', response_class=HTMLResponse)
async def vibration(request: Request, Authorize: AuthJWT = Depends()):
    """Render ``channel.html``; redirect to ``/login`` when the JWT check fails."""
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    page_context = {'request': request}
    return templates.TemplateResponse(name='channel.html', context=page_context)
@app.get('/channel/{tower_id}/{channel_id}', response_class=HTMLResponse)
async def vibration(request: Request, tower_id: str, channel_id: str, Authorize: AuthJWT = Depends()):
    """Return ``get_channel_info`` for one tower channel as JSON text.

    Args:
        request: Incoming request (unused).
        tower_id: Tower identifier from the path.
        channel_id: Channel identifier from the path.
        Authorize: AuthJWT helper used for the login check.

    Returns:
        A JSON string, or a redirect to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as e:
        print(e)
        return RedirectResponse('/login')

    # Resolve the vibration id once; it used to be looked up a second time
    # only to feed the debug print.
    vibration_id = find_vibration_id(tower_id, channel_id)
    print(vibration_id)
    result = get_channel_info(vibration_id)
    return json.dumps(result, ensure_ascii=False, cls=dateEncode)
@app.get('/channel_chart/{tower_id}/{channel_id}', response_class=HTMLResponse)
async def vibration(request: Request, tower_id: str, channel_id: str, Authorize: AuthJWT = Depends()):
    """Return ``get_channel_health`` for one tower channel as JSON text.

    Args:
        request: Incoming request (unused).
        tower_id: Tower identifier from the path.
        channel_id: Channel identifier from the path.
        Authorize: AuthJWT helper used for the login check.

    Returns:
        A JSON string, or a redirect to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as e:
        print(e)
        return RedirectResponse('/login')

    # Resolve the vibration id once; it used to be looked up a second time
    # only to feed the debug print.
    vibration_id = find_vibration_id(tower_id, channel_id)
    print(vibration_id)
    result = get_channel_health(vibration_id)
    return json.dumps(result, ensure_ascii=False, cls=dateEncode)
@app.get('/channel_predict/{tower_id}/{channel_id}', response_class=HTMLResponse)
async def vibration(request: Request, tower_id: str, channel_id: str, Authorize: AuthJWT = Depends()):
    """Return ``get_predect_data`` for one tower channel as JSON text.

    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    vibration_id = find_vibration_id(tower_id, channel_id)
    prediction = get_predect_data(vibration_id)
    return json.dumps(prediction, ensure_ascii=False, cls=dateEncode)
@app.get('/channel_frequency/{tower_id}/{channel_id}', response_class=HTMLResponse)
async def vibration(request: Request, tower_id: str, channel_id: str, Authorize: AuthJWT = Depends()):
    """Return ``get_frequency`` for one tower channel as JSON text.

    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    vibration_id = find_vibration_id(tower_id, channel_id)
    frequency = get_frequency(vibration_id)
    return json.dumps(frequency, ensure_ascii=False, cls=dateEncode)
@app.get('/history', response_class=HTMLResponse)
async def history(request: Request, Authorize: AuthJWT = Depends()):
    """Render ``history.html``; redirect to ``/login`` when the JWT check fails."""
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    page_context = {'request': request}
    return templates.TemplateResponse(name='history.html', context=page_context)
@app.get('/history/', response_class=HTMLResponse)
async def history(request: Request, company: str, factory: str, department: str, towerGroup: str, Authorize: AuthJWT = Depends()):
    """Return the vibration info of every tower in the selected group as JSON text.

    The per-tower lists from ``get_vibration_info`` are flattened into one
    list.  Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    flattened = []
    for tower_name in get_tower(company, factory, department, towerGroup):
        flattened.extend(get_vibration_info(tower_name))

    return json.dumps(flattened, ensure_ascii=False, cls=dateEncode)
@app.get('/device', response_class=HTMLResponse)
async def device(request: Request, Authorize: AuthJWT = Depends()):
    """Render ``device.html`` with one tuple per row of the ``device`` table.

    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    statement = 'SELECT * FROM device '
    # Columns copied into each tuple, in template order.
    columns = ('id', 'deviceName', 'hostIP', 'CompanyCode', 'FactoryCode', 'DepartmentCode')
    a = []
    print("start")
    for row in db.query(statement):
        record = tuple(row[column] for column in columns)
        print(*record)
        a.append(record)
    print(a)
    print("over3")
    return templates.TemplateResponse(name='device.html', context={'request': request, 'a': a})
@app.get('/system', response_class=HTMLResponse)
async def system(request: Request, Authorize: AuthJWT = Depends()):
    """Render ``system.html``; redirect to ``/login`` when the JWT check fails."""
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    page_context = {'request': request}
    return templates.TemplateResponse(name='system.html', context=page_context)
@app.get('/member', response_class=HTMLResponse)
async def get_member(request: Request, Authorize: AuthJWT = Depends()):
    """Return every account with its admin flag and role as JSON text.

    Args:
        request: Incoming request (unused).
        Authorize: AuthJWT helper used for the login check.

    Returns:
        A JSON string listing ``username``, ``isAdmin``, ``roleType`` and
        ``role_name`` per user, or a redirect to ``/login`` when the JWT
        check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as e:
        print(e)
        return RedirectResponse('/login')

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    statement = 'SELECT id,username,isAdmin FROM users'
    members = []
    for row in db.query(statement):
        # Look the role up once per user instead of twice.
        role_type = check_role_type(row['username'])
        members.append({
            'username': row['username'],
            'isAdmin': row['isAdmin'],
            'roleType': role_type,
            'role_name': get_role_name(role_type),
        })
    return json.dumps(members, ensure_ascii=False)
@app.get('/member/edit/', response_class=HTMLResponse)
async def login(request: Request, name: str, isAdmin: int, isEnable: int, Authorize: AuthJWT = Depends()):
    """Update the ``isAdmin`` / ``isEnable`` flags of the user called *name*.

    The caller must be an administrator and have a role type strictly lower
    than the target's role type.

    Args:
        request: Incoming request (unused).
        name: Username of the account to edit.
        isAdmin: New value for the account's ``isAdmin`` flag.
        isEnable: New value for the account's ``isEnable`` flag.
        Authorize: AuthJWT helper used for the login check.

    Returns:
        A JSON string with a one-element list holding a ``msg``, or a
        redirect to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as e:
        print(e)
        return RedirectResponse('/login')

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    current_user = Authorize.get_jwt_subject()
    current_user_roleType = check_role_type(current_user)
    del_user_roleType = check_role_type(name)

    # Bound parameter instead of string concatenation, so the username can
    # never alter the SQL statement.
    statement = 'SELECT isAdmin FROM users WHERE userName = :username'
    for row in db.query(statement, username=current_user):
        if row['isAdmin'] != 1:
            return json.dumps([{'msg': '你沒有權限'}], ensure_ascii=False)

    if del_user_roleType is None:
        return json.dumps([{'msg': '不存在使用者'}], ensure_ascii=False)
    if current_user_roleType >= del_user_roleType:
        return json.dumps([{'msg': '你沒有權限'}], ensure_ascii=False)

    user_dic = get_user(name)
    print(user_dic)
    user_dic.isAdmin = isAdmin
    user_dic.isEnable = isEnable

    db['users'].update(dict(user_dic), ['username'])
    return json.dumps([{'msg': "成功更改"}], ensure_ascii=False)
@app.get('/member_delete', response_class=HTMLResponse)
async def login(request: Request, Authorize: AuthJWT = Depends()):
    """Render ``delete_member_test2.html``; redirect to ``/login`` when the JWT check fails."""
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    page_context = {'request': request}
    return templates.TemplateResponse(name='delete_member_test2.html', context=page_context)
@app.post('/member_delete')
async def delete_member(request: Request):
    """Delete a member on behalf of the user identified by the posted token.

    The form is parsed into ``models.del_user``.  The caller is the user
    whose stored token matches ``access_token`` according to
    ``compare_jwt_token``.  The caller must be an administrator and have a
    role type strictly lower than the target's role type.

    Args:
        request: Incoming request carrying the form.

    Returns:
        A dict with a ``msg`` describing the outcome.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    del_user = models.del_user(**await request.form())
    delete_name = del_user.del_name

    # Identify the caller; when several rows match, the last one wins.
    current_user = ''
    for row in db.query('SELECT * FROM users'):
        if row['token'] is not None and compare_jwt_token(row['token'], del_user.access_token):
            current_user = row['username']
    if current_user == '':
        return {'msg': '尚未登入'}

    # Bound parameter instead of string concatenation, so the username can
    # never alter the SQL statement.
    statement = 'SELECT isAdmin FROM users WHERE userName = :username'
    for row in db.query(statement, username=current_user):
        if row['isAdmin'] != 1:
            return {'msg': ' 你沒有權限'}

    current_user_roleType = check_role_type(current_user)
    del_user_roleType = check_role_type(delete_name)

    if del_user_roleType is None:
        return {'msg': '不存在使用者'}
    if current_user_roleType >= del_user_roleType:
        return {'msg': ' 你沒有權限'}

    db['users'].delete(username=delete_name)
    return {'msg': ' 成功刪除'}
@app.get('/member_authority/{edit_one}', response_class=HTMLResponse)
async def member_authority(request: Request, edit_one: int, Authorize: AuthJWT = Depends()):
    """Render the permission editor for role *edit_one*.

    The caller must be an administrator with a role type strictly lower
    than *edit_one*; otherwise ``notice.html`` is rendered with a message.
    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    def notice(message):
        # Shared rendering of the notice page.
        return templates.TemplateResponse(name='notice.html', context={"request": request, 'msg': message})

    context = {'request': request}
    current_user = Authorize.get_jwt_subject()
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')

    admin_state = check_isAdmin(current_user)
    if admin_state == "no user":
        return notice('no user')
    if admin_state == 0:
        return notice("沒有權限")

    current_user_roleType = check_role_type(current_user)
    if edit_one == None:
        return notice('no role')
    if int(current_user_roleType) >= int(edit_one):
        return notice("沒有權限")

    acl_rows = check_role_acl(edit_one)
    if acl_rows == []:
        # No ACL stored yet: give every module an all-zero entry.
        for module_row in db.query('SELECT id FROM module'):
            context[get_modul_name(module_row['id'])] = {
                'id': 0, 'isView': 0, 'isAdd': 0, 'isEdit': 0, 'isDel': 0, 'role_id': edit_one,
            }
    else:
        for acl in acl_rows:
            module_name = get_modul_name(acl['module_id'])
            del acl['module_id']
            context[module_name] = acl
    return templates.TemplateResponse(name='member_authority_test.html', context=context)
-
@app.post('/member_authority')
async def member_authority(request: Request):
    """Store the posted per-module permissions of a role.

    The form is parsed into ``models.user_authority``.  The caller is the
    user whose stored token matches the posted ``access_token``; it must be
    an administrator with a role type strictly lower than the edited role.
    Every outcome is reported through ``notice.html``.
    """
    def notice(message):
        # Shared rendering of the notice page.
        return templates.TemplateResponse(name='notice.html', context={"request": request, 'msg': message})

    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    edit_one = models.user_authority(**await request.form())

    # Identify the caller; when several rows match, the last one wins.
    current_user = ''
    for user_row in db.query('SELECT * FROM users'):
        if user_row['token'] != None and compare_jwt_token(user_row['token'], edit_one.access_token):
            current_user = user_row['username']
    if current_user == '':
        return notice('尚未登入')

    admin_state = check_isAdmin(current_user)
    if admin_state == "no user":
        return notice(admin_state)
    if admin_state == 0:
        return notice('你沒有權限')

    current_user_roleType = check_role_type(current_user)
    edit_one_roleType = edit_one.role_id
    if current_user_roleType > edit_one_roleType or current_user_roleType == edit_one_roleType:
        return notice(' 你沒有權限')

    module_names = ['ai_prediction', 'channel', 'device', 'event', 'index', 'performance', 'record', 'setting_device', 'setting_system', 'tower']
    if check_role_acl(edit_one.role_id) == []:
        # First ACL for this role: insert one row per module.
        for module_name in module_names:
            acl = edit_one.get_acl_from_module_name(module_name)
            acl["id"] = None
            db['role_acl'].insert(acl)
    else:
        for module_name in module_names:
            acl = edit_one.get_acl_from_module_name(module_name)
            db['role_acl'].update(acl, ['id'])
    return notice('成功更改權限')
-
-
# Temperature API
@app.get('/temperature')
async def get_temperatures():
    """Return temperature readings.

    The values are constants in this handler; the original note describes
    them as being fetched from the database.
    """
    readings = {'hot_water': 30.48, 'cold_water': 28.10, 'wet_ball': 25.14}
    return readings
@app.post("/example")
async def example(request: Request, Authorize: AuthJWT = Depends()):
    """Log and return the JWT subject of the request.

    A failing ``jwt_required`` check is only printed; the subject is read
    regardless.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)

    subject = Authorize.get_jwt_subject()
    print(subject)
    return subject
@app.post('/user')
def user(Authorize: AuthJWT = Depends()):
    """Return ``{"user": <JWT subject>}``; ``jwt_required`` errors propagate."""
    Authorize.jwt_required()
    return {"user": Authorize.get_jwt_subject()}
@app.get("/add_data", response_class=HTMLResponse)
async def example(request: Request, Authorize: AuthJWT = Depends()):
    """Call ``add_data()`` and render ``test.html``.

    Redirects to ``/login`` when the JWT check fails.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        print(exc)
        return RedirectResponse('/login')

    add_data()
    page_context = {'request': request}
    return templates.TemplateResponse(name='test.html', context=page_context)
@app.get('/health')
async def get_health(date: str):
    """Return the current and predicted health index for *date*.

    Only today and yesterday are available; the values are placeholder data
    defined in this handler.

    Args:
        date: Day to look up, formatted ``YYYY-MM-DD``.

    Returns:
        ``{'curr_health': ..., 'pred_health': ...}`` for the requested day.

    Raises:
        HTTPException: 400 when *date* is not a valid ``YYYY-MM-DD`` date,
            404 when no data exists for that day.  Both cases previously
            surfaced as unhandled errors.
    """
    try:
        requested = str(datetime.strptime(date, "%Y-%m-%d"))[:10]
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="date must be formatted as YYYY-MM-DD",
        )

    # Take "now" once so both keys are derived from the same instant.
    today = datetime.today()
    fake_data = {
        str(today)[:10]: {'curr_health': 0.7, 'pred_health': 0.8},
        str(today - timedelta(days=1))[:10]: {'curr_health': 0.6, 'pred_health': 0.7},
    }

    if requested not in fake_data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="no health data for the requested date",
        )
    return fake_data[requested]
def _random_history():
    """Build one placeholder data set: 13 random samples for each channel."""
    channels = [f'RPM_{order}X' for order in range(1, 9)]
    channels += [f'Gear_{order}X' for order in range(1, 5)]
    return {channel: np.random.rand(13).tolist() for channel in channels}


@app.get('/history_data')
async def get_history(time_end: str):
    """Return the history data for the day given by *time_end*.

    The payload is randomly generated placeholder data.

    Args:
        time_end: End day of the history window, formatted ``YYYY-MM-DD``.

    Returns:
        ``{'curr_history': {...}, 'past_history': {...}}`` where each inner
        dict maps ``RPM_1X``..``RPM_8X`` and ``Gear_1X``..``Gear_4X`` to a
        list of 13 floats.

    Raises:
        HTTPException: 400 when ``time_end`` is not ``YYYY-MM-DD``; 404 when
            the day is neither today nor yesterday (the only days served).
    """
    try:
        requested = datetime.strptime(time_end, "%Y-%m-%d").date()
    except ValueError:
        raise HTTPException(status_code=400, detail='time_end must be formatted YYYY-MM-DD')

    today = datetime.today().date()
    if requested not in (today, today - timedelta(days=1)):
        raise HTTPException(status_code=404, detail='no history data for ' + requested.isoformat())
    return {
        'curr_history': _random_history(),
        'past_history': _random_history(),
    }
- # Login function part
def check_user_exists(username):
    """Return True when at least one ``users`` row has ``userName`` == *username*.

    Args:
        username: User name to look up.

    Returns:
        bool: Whether a matching row exists.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # The name is caller-supplied, so it is bound as a parameter rather than
    # spliced into the SQL text.
    rows = db.query(
        'SELECT COUNT(*) AS user_count FROM Water_tower.users WHERE userName = :username',
        username=username,
    )
    return int(next(iter(rows))['user_count']) > 0
def get_user(username: str):
    """Fetch a user record as a ``models.User``.

    Args:
        username: User name to look up.

    Returns:
        ``models.User`` built from the first matching ``users`` row, or
        ``False`` when no row matches.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # Bound parameter: the name must never be concatenated into the SQL text.
    rows = db.query(
        'SELECT * FROM Water_tower.users WHERE userName = :username',
        username=username,
    )
    user_dict = next(iter(rows), None)
    if user_dict is None:  # user does not exist
        return False
    return models.User(**user_dict)
-
def user_register(user):
    """Insert *user* as a new row of the ``users`` table.

    The row written is ``dict(user)``.
    NOTE(review): the password is stored exactly as supplied; no hashing is
    applied in this function.
    """
    connection = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    connection['users'].insert(dict(user))
def get_password_hash(password):
    """Hash *password* with the module-level ``pwd_context`` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password, hashed_password):
    """Check *plain_password* against *hashed_password* using ``pwd_context``.

    Returns whatever ``pwd_context.verify`` returns for the pair.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def authenticate_user(username: str, password: str):
    """Look the user up in the database and return it when login is allowed.

    Args:
        username: User name to authenticate.
        password: Supplied password. Currently unused, see the note below.

    Returns:
        ``models.User`` for an existing, enabled user; ``False`` when the
        user does not exist or ``check_user_isEnable`` reports it disabled.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # Bound parameter: the name comes from the caller and must not be
    # concatenated into the SQL text.
    rows = db.query(
        'SELECT * FROM Water_tower.users WHERE userName = :username',
        username=username,
    )
    user_dict = next(iter(rows), None)
    if user_dict is None:  # user does not exist
        return False
    if not check_user_isEnable(username):
        return False
    # NOTE(review): the password is NOT verified here, so any password is
    # accepted for an enabled user. Passwords are stored unhashed by
    # user_register(), so enabling verify_password() needs a data migration.
    return models.User(**user_dict)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict itself is not modified.
        expires_delta: Token lifetime. A missing or falsy value falls back
            to 15 minutes.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    claims = {**data, "exp": datetime.utcnow() + lifetime}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def compare_jwt_token(access_token: str, token: str):
    """Compare two JWT strings, tolerating extra text around either one.

    The shorter string must occur inside the longer one; strings of equal
    length must therefore be identical.

    NOTE(review): an empty string is contained in every string, so an empty
    token compares as a match.

    Args:
        access_token: First token string.
        token: Second token string.

    Returns:
        bool: True when the shorter string is contained in the longer one.
    """
    shorter, longer = sorted((access_token, token), key=len)
    return shorter in longer
def check_isAdmin(user_name: str):
    """Check whether a user is an administrator.

    Args:
        user_name: User name to look up.

    Returns:
        The ``isAdmin`` value of the last matching ``users`` row, or the
        string ``"no user"`` when no row matches or the value is NULL.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    is_admin = None
    # Bound parameter instead of string concatenation (SQL injection).
    for row in db.query('SELECT isAdmin FROM users WHERE userName = :user_name', user_name=user_name):
        is_admin = row['isAdmin']
    if is_admin is None:
        return "no user"
    return is_admin
def check_role_type(user_name: str) -> int:
    """Look up the role id assigned to a user.

    Args:
        user_name: User name to look up.

    Returns:
        The ``role_id`` of the last ``user_role`` row joined to the user, or
        ``None`` when the user has no role row.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # Bound parameter instead of string concatenation (SQL injection).
    cmd = ('SELECT user_role.role_id FROM `users` JOIN `user_role` '
           'ON `users`.id = `user_role`.user_id WHERE `users`.username = :user_name')
    role_type = None
    for row in db.query(cmd, user_name=user_name):
        role_type = row['role_id']
    return role_type
-
def check_role_acl(role: int):
    """Return the access-control rows stored for a role.

    Args:
        role: Role id to look up in ``role_acl``.

    Returns:
        list[dict]: One dict per matching row, keyed by the ``role_acl``
        column names; an empty list when the role has no rows.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # The column list does not change between rows, so read it once.
    columns = db['role_acl'].columns
    result = []
    for row in db.query('SELECT * FROM role_acl WHERE role_id = :role_id', role_id=role):
        acl = {col_name: row[col_name] for col_name in columns}
        if acl:
            result.append(acl)
    return result
def get_role_name(role_id: int):
    """Return the name of a role.

    Args:
        role_id: Primary key in the ``role`` table.

    Returns:
        The ``name`` of the last matching row, or ``None`` when no row has
        that id.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # Start from None so a missing role yields None instead of an
    # UnboundLocalError at the return statement.
    role = None
    for row in db.query('SELECT * FROM role WHERE id = :role_id', role_id=role_id):
        role = row['name']
    return role
def check_user_isEnable(user_name: str) -> bool:
    """Tell whether a user account is enabled.

    Args:
        user_name: User name to look up.

    Returns:
        The ``isEnable`` value of the last matching ``users`` row; ``False``
        when no row matches.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # Default to "not enabled" so an unknown user cannot raise
    # UnboundLocalError at the return statement.
    able = False
    # Bound parameter instead of string concatenation (SQL injection).
    for row in db.query('SELECT isEnable FROM users WHERE username = :user_name', user_name=str(user_name)):
        able = row['isEnable']
    return able
def _tower_group_codes(db, company, factory, department):
    """Return the distinct ``TowerGroupCode`` values of one department, in query order."""
    rows = db.query(
        'SELECT TowerGroupCode FROM device WHERE CompanyCode = :company'
        ' AND FactoryCode = :factory AND DepartmentCode = :department',
        company=company, factory=factory, department=department,
    )
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(row['TowerGroupCode'] for row in rows))


def get_user_under_organization(user_name: str):
    """List every organization unit and whether the user may access it.

    The user's role id (from ``check_role_type``) decides how much of the
    user's own ``users`` record an organization row has to match:

    * 1: nothing, every row is accessible
    * 2: same company
    * 3: same company and factory
    * 4: same company, factory and department

    Args:
        user_name: User whose access is evaluated.

    Returns:
        list[dict]: One entry per ``organization`` row with the keys
        ``company``, ``factory``, ``department``, ``group`` (distinct tower
        group codes of that department) and ``able`` (1 or 0).
        ``[{'msg': 'error'}]`` when the user has no role or the role id is
        not 1-4.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    user_role = check_role_type(user_name)

    # role id -> organization levels that must equal the user's own record
    levels_by_role = {
        1: (),
        2: ('company',),
        3: ('company', 'factory'),
        4: ('company', 'factory', 'department'),
    }
    try:
        levels = levels_by_role[int(user_role)]
    except (TypeError, ValueError, KeyError):
        # Missing role (None) or an id outside 1-4.
        return [{'msg': "error"}]

    # The user's own placement always comes from the `users` table.
    own = None
    if levels:
        scope_cmd = 'SELECT company, factory, department FROM users WHERE username = :user_name'
        for row in db.query(scope_cmd, user_name=user_name):
            own = row

    result = []
    # Materialize the outer result before issuing the per-row device queries.
    for org in list(db.query('SELECT * FROM organization')):
        located = {
            'company': org['Company'],
            'factory': org['Factory'],
            'department': org['Department'],
        }
        if not levels:
            able = 1
        elif own is None:
            # No `users` row to compare against: nothing is accessible.
            able = 0
        else:
            able = int(all(located[level] == own[level] for level in levels))
        result.append({
            'company': located['company'],
            'factory': located['factory'],
            'department': located['department'],
            'group': _tower_group_codes(db, located['company'], located['factory'], located['department']),
            'able': able,
        })
    return result
def get_user_id(user_name: str):
    """Return the id of a user.

    Args:
        user_name: User name to look up.

    Returns:
        The ``id`` of the last matching ``users`` row, or ``None`` when no
        row matches.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    user_id = None
    # Bound parameter instead of string concatenation (SQL injection).
    for row in db.query('SELECT id FROM `users` WHERE username = :user_name', user_name=user_name):
        user_id = row['id']
    return user_id
def get_user_name(user_id: int):
    """Return the user name that belongs to a user id.

    Args:
        user_id: Primary key in the ``users`` table.

    Returns:
        The ``username`` of the last matching row, or ``None`` when no row
        has that id.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    user_name = None
    # Binding the id also accepts a real int, which string concatenation
    # would reject with a TypeError.
    for row in db.query('SELECT username FROM `users` WHERE id = :user_id', user_id=user_id):
        user_name = row['username']
    return user_name
def get_modul_name(modul_id: str):
    """Return the name of a module.

    Args:
        modul_id: Primary key in the ``module`` table.

    Returns:
        The ``moduleName`` of the last matching row, or ``None`` when no row
        has that id.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    modul_name = None
    # Bound parameter instead of string concatenation.
    for row in db.query('SELECT moduleName FROM `module` WHERE id = :modul_id', modul_id=modul_id):
        modul_name = row['moduleName']
    return modul_name
def _merge_latest(rows, now, target, known, stamp_of):
    """Copy ``key``/``value`` pairs from *rows* into *target*, favouring recent rows.

    ``days`` is the age, in whole days before *now*, of the last row that
    updated it; it starts at 0. A row is stored when ``days`` is still 0,
    when its key is already in *known*, or when the row is no older than
    ``days``. Rows that are older and carry an unknown key are skipped.
    """
    days = 0
    for row in rows:
        key = row['key']
        if days != 0 and key in known:
            target[key] = row['value']
            continue
        age = (now - stamp_of(row)).days
        if days == 0 or age <= days:
            days = age
            target[key] = row['value']


def _record_tower_stamp(row):
    """Return a ``record_tower`` row's timestamp as a datetime.

    ``Createtime`` is preferred, ``time_stamp`` is the fallback; a value that
    is not already a datetime is parsed as ``%Y-%m-%d %H:%M:%S.%f``.
    """
    stamp = row['Createtime']
    if stamp is None:
        stamp = row['time_stamp']
    if not isinstance(stamp, datetime):
        stamp = datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f")
    return stamp


def get_tower_info(tower_id: str):
    """Collect the stored data of one cooling tower.

    Args:
        tower_id: Device id of the tower.

    Returns:
        dict with four entries:

        * ``DCS``: key/value readings from ``record_dcs``; five placeholder
          readings of 32.00 when the table has none for this tower.
        * ``Fan``: key/value readings from ``record_tower``.
        * ``Moter``: list with one dict per ``vibration`` row; four
          placeholder dicts when there is none.
        * ``Device_info``: the ``device`` row without ``createTime``.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    now = datetime.today()
    result = {'DCS': {}, 'Fan': {}, 'Moter': [], 'Device_info': {}}

    # All queries bind the tower id instead of splicing it into the SQL text.
    dcs_rows = db.query('SELECT * FROM `record_dcs` WHERE device_id = :device_id', device_id=tower_id)
    _merge_latest(dcs_rows, now, result['DCS'], result['DCS'], lambda row: row['time_stamp'])
    if not result['DCS']:
        result['DCS'].update({
            'hotTemp': 32.00,
            'coldTemp': 32.00,
            'waterFlow': 32.00,
            'fanMotorCur': 32.00,
            'fanMotorSpeedFreq': 32.00,
        })

    fan_rows = db.query('SELECT * FROM `record_tower` WHERE device_id = :device_id', device_id=tower_id)
    # NOTE(review): the "key already known" test looks at the DCS readings,
    # not the Fan readings; confirm whether the Fan keys were intended.
    _merge_latest(fan_rows, now, result['Fan'], result['DCS'], _record_tower_stamp)

    vibration_columns = db['vibration'].columns
    vibration_rows = db.query('SELECT * FROM `vibration` WHERE device_id = :device_id', device_id=tower_id)
    result['Moter'] = [{col: row[col] for col in vibration_columns} for row in vibration_rows]
    if not result['Moter']:
        result['Moter'] = [{'DataValue': 213, 'CVindex': 222} for _ in range(4)]

    device_columns = [col for col in db['device'].columns if col != 'createTime']
    for row in db.query('SELECT * FROM `device` WHERE id = :device_id', device_id=tower_id):
        for col in device_columns:
            result['Device_info'][col] = row[col]
    return result
def get_tower_perform(tower_id: str):
    """Return the performance record of a tower for ``designWFR = 27000``.

    Args:
        tower_id: Value matched against ``record_performance.deviceCode``.

    Returns:
        list: A single-element list holding a dict of the matching row's
        columns except ``createTime``. Later rows overwrite earlier ones;
        the dict is empty when nothing matches.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    columns = [col for col in db['record_performance'].columns if col != 'createTime']
    performance = {}
    # Bound parameter instead of string concatenation.
    cmd = 'SELECT * FROM `record_performance` WHERE deviceCode = :device_code AND designWFR = 27000'
    for row in db.query(cmd, device_code=tower_id):
        performance.update({col: row[col] for col in columns})
    return [performance]
def get_tower(company: str, factory: str, department: str, towerGroup: str):
    """Return the ids of all devices in one tower group.

    Args:
        company: ``CompanyCode`` to match.
        factory: ``FactoryCode`` to match.
        department: ``DepartmentCode`` to match.
        towerGroup: ``TowerGroupCode`` to match.

    Returns:
        list: ``device.id`` of every matching row, in query order.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # Bound parameters instead of string concatenation.
    cmd = ('SELECT id FROM `device` WHERE CompanyCode = :company AND FactoryCode = :factory'
           ' AND DepartmentCode = :department AND TowerGroupCode = :tower_group')
    rows = db.query(cmd, company=company, factory=factory,
                    department=department, tower_group=towerGroup)
    return [row['id'] for row in rows]
def check_tower_health(user_name: str):
    """Report a health flag for every tower the user may access.

    A tower is flagged unhealthy (``health`` 0) when any of its ``vibration``
    rows has ``CVIndex`` below ``int(threshold)``; otherwise ``health`` is 1.

    Args:
        user_name: User whose accessible towers are checked.

    Returns:
        list[dict]: One entry per tower with the keys ``company``,
        ``factory``, ``department``, ``group``, ``tower`` and ``health``.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    result = []
    for org in get_user_under_organization(user_name):
        # .get(): the error sentinel {'msg': 'error'} carries no 'able' key.
        if org.get('able') != 1:
            continue
        for group in org['group']:
            for tower in get_tower(org['company'], org['factory'], org['department'], group):
                # Binding the id also works when it is not a string.
                rows = db.query(
                    'SELECT CVIndex,threshold FROM `vibration` WHERE device_id = :device_id',
                    device_id=tower,
                )
                # Build the full list so the result set is consumed completely.
                below_threshold = [row['CVIndex'] < int(row['threshold']) for row in rows]
                health = 0 if any(below_threshold) else 1
                result.append({
                    'company': org['company'],
                    'factory': org['factory'],
                    'department': org['department'],
                    'group': group,
                    'tower': tower,
                    'health': health,
                })
    return result
def find_vibration_id(tower_id: str, channel_id: str):
    """Return the ``vibration`` id of one channel of a tower.

    Args:
        tower_id: Value matched against ``vibration.device_id``.
        channel_id: Value matched against ``vibration.channelName``.

    Returns:
        The ``id`` of the first matching row, or ``None`` when none matches.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # Bound parameters instead of string concatenation.
    rows = db.query(
        'SELECT id FROM `vibration` WHERE device_id = :device_id AND channelName = :channel_name',
        device_id=tower_id, channel_name=channel_id,
    )
    first = next(iter(rows), None)
    return None if first is None else first['id']
def get_channel_info(vibration_id):
    """Return the diagnosis record of a channel plus its CV index and threshold.

    Args:
        vibration_id: Id of the ``vibration`` row (the channel).

    Returns:
        dict: The columns of the last matching ``record_diagnosis`` row,
        extended with ``CVIndex`` and ``threshold`` from the ``vibration``
        row. Keys are missing when the corresponding row does not exist.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    result = {}
    diagnosis_columns = db['record_diagnosis'].columns
    # Binding the id also accepts an int, which string concatenation would
    # reject with a TypeError.
    for row in db.query('SELECT * FROM `record_diagnosis` WHERE vibration_id = :vibration_id',
                        vibration_id=vibration_id):
        for col in diagnosis_columns:
            result[col] = row[col]
    for row in db.query('SELECT CVIndex,threshold FROM `vibration` WHERE id = :vibration_id',
                        vibration_id=vibration_id):
        result['CVIndex'] = row['CVIndex']
        result['threshold'] = row['threshold']
    return result
def get_channel_health(vibration_id: str):
    """Return the health history of a channel as three named series.

    Args:
        vibration_id: Value matched against ``record_health.vibration_id``.

    Returns:
        list[dict]: ``[{'name': 'CV_index', 'data': [...]}, {'name': 'Vrms',
        ...}, {'name': 'RPM', ...}]`` with one data point per matching row,
        in query order.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    series = {name: [] for name in ('CV_index', 'Vrms', 'RPM')}
    # Bound parameter instead of string concatenation.
    for row in db.query('SELECT * FROM record_health WHERE vibration_id = :vibration_id',
                        vibration_id=vibration_id):
        for name, values in series.items():
            values.append(row[name])
    return [{'name': name, 'data': values} for name, values in series.items()]
def get_predect_data(vibration_id: str):
    """Return the stored prediction data of a channel.

    Args:
        vibration_id: Value matched against
            ``record_prediction_upd.vibration_id``.

    Returns:
        dict: ``{'data': [...]}`` with the comma-separated ``predictData`` of
        the last matching row split into strings; ``{}`` when no row matches.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    result = {}
    # Bound parameter instead of string concatenation.
    for row in db.query('SELECT * FROM record_prediction_upd WHERE vibration_id = :vibration_id',
                        vibration_id=vibration_id):
        result['data'] = row['predictData'].split(',')
    return result
def get_frequency(vibration_id: str):
    """Return the frequency data of a channel.

    Args:
        vibration_id: Value matched against ``record_frequency.vibration_id``.

    Returns:
        dict: ``{'Hz': [...], 'value': [...]}`` holding ``MFHz`` and
        ``MFValue`` of every matching row, in query order.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    result = {'Hz': [], 'value': []}
    # Bound parameter instead of string concatenation.
    for row in db.query('SELECT * FROM record_frequency WHERE vibration_id = :vibration_id',
                        vibration_id=vibration_id):
        result['Hz'].append(row['MFHz'])
        result['value'].append(row['MFValue'])
    return result
def get_vibration_info(deviceCode: str):
    """Return every ``vibration`` row of a device.

    Args:
        deviceCode: Value matched against ``vibration.device_id``.

    Returns:
        list[dict]: One dict per matching row, keyed by the ``vibration``
        column names.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    # The column list does not change between rows, so read it once.
    columns = db['vibration'].columns
    # Bound parameter instead of string concatenation.
    rows = db.query('SELECT * FROM vibration WHERE device_id = :device_id', device_id=deviceCode)
    return [{col: row[col] for col in columns} for row in rows]
def add_data():
    """Replace the demo content of ``record_health`` and ``record_frequency``.

    Both tables are truncated first. ``record_health`` then receives seven
    daily rows (today back to six days ago) for each of vibration ids 1-4,
    and ``record_frequency`` receives 1000 rows (``MFHz`` 0-999) for
    vibration id 1. All measured values are random.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    now = datetime.today()
    stamp = now.strftime("%Y-%m-%d %H:%M:%S")

    db.query("TRUNCATE TABLE record_health")
    db['record_health'].insert_many([
        dict(time_stamp=now - timedelta(days=age),
             CV_index=uniform(0.6, 0.8),
             Vrms=uniform(1, 3),
             Grms=uniform(1, 3),
             RPM=uniform(1, 3),
             vibration_id=vibration_id)
        for vibration_id in range(1, 5)
        for age in range(0, 7)
    ])

    # One truncate-and-fill pass is enough; rows go in as a single batch.
    db.query("TRUNCATE TABLE record_frequency")
    db['record_frequency'].insert_many([
        dict(time_stamp=stamp, MFHz=hz, MFValue=uniform(0, 0.12), vibration_id=1)
        for hz in range(0, 1000)
    ])
def add_tower():
    """Insert one random demo reading per DCS key for devices dev005-dev009.

    Rows are written to ``record_dcs`` with the columns ``time_stamp``,
    ``device_id``, ``key`` and ``value`` (the columns ``get_tower_info``
    reads from that table).
    NOTE(review): the value range 0-0.12 is carried over unchanged; confirm
    it is realistic for these readings.
    """
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
    stamp = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
    devices = ['dev005', 'dev006', 'dev007', 'dev008', 'dev009']
    keys = ['hotTemp', 'waterFlow', 'coldTemp', 'fanMotorCur', 'fanMotorSpeedFreq']
    db['record_dcs'].insert_many([
        dict(time_stamp=stamp, device_id=device, key=key, value=uniform(0, 0.12))
        for device in devices
        for key in keys
    ])
|