浏览代码

新增 'main_tmp.py'

miacheng913 3 年之前
父节点
当前提交
f13b5bc25c
共有 1 个文件被更改,包括 486 次插入0 次删除
  1. 486 0
      main_tmp.py

+ 486 - 0
main_tmp.py

@@ -0,0 +1,486 @@
+# fastapi
+from fastapi import FastAPI, Request, Response, HTTPException, status, Depends , Form
+from fastapi import templating
+from fastapi.templating import Jinja2Templates
+from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
+from fastapi.middleware.cors import CORSMiddleware
+
+from fastapi.staticfiles import StaticFiles
+
+# fastapi view function parameters
+from typing import List, Optional
+import json
+# path
+import sys  
+
+# time
+# import datetime
+from datetime import timedelta, datetime
+# db
+import dataset
+from passlib import context
+import models
+from random import randint
+# authorize
+from passlib.context import CryptContext
+pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
+import jwt
+from fastapi_jwt_auth import AuthJWT
+from fastapi_jwt_auth.exceptions import AuthJWTException
+from fastapi.security import OAuth2AuthorizationCodeBearer, OAuth2PasswordRequestForm
+import numpy as np
+import pymysql
+pymysql.install_as_MySQLdb()
+db_settings = {
+    "host": "db.ptt.cx",
+    "port": 3306,
+    "user": "choozmo",
+    "password": "pAssw0rd",
+    "db": "Water_tower",
+    "charset": "utf8mb4"
+}
+
+# app
+app = FastAPI()
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+
+SECRET_KEY = "df2f77bd544240801a048bd4293afd8eeb7fff3cb7050e42c791db4b83ebadcd"
+ALGORITHM = "HS256"
+ACCESS_TOKEN_EXPIRE_MINUTES = 3000
+pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
+
+# 
+app.mount(path='/templates', app=StaticFiles(directory='templates'), name='templates')
+app.mount(path='/static', app=StaticFiles(directory='static'), name='static ')
+
+
+
+# 
+templates = Jinja2Templates(directory='templates')
+
+
+@AuthJWT.load_config
+def get_config():
+    return models.Settings()
+
+
+# view
+@app.get('/', response_class=HTMLResponse)
+async def index(request: Request):
+    print(request)
+    return templates.TemplateResponse(name='index.html', context={'request': request})
+     
+
+@app.get('/login', response_class=HTMLResponse)
+async def login(request: Request):
+    return templates.TemplateResponse(name='login_test.html', context={'request': request})
+
+
+@app.post("/login")
+async def login_for_access_token(request: Request, form_data: OAuth2PasswordRequestForm = Depends(), Authorize: AuthJWT = Depends()):
+    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
+    
+    user = authenticate_user(form_data.username, form_data.password)
+    if not user:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail="Incorrect username or password",
+            headers={"WWW-Authenticate": "Bearer"},
+        )
+    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
+    access_token = create_access_token(
+        data={"sub": user.username}, expires_delta=access_token_expires
+    )
+    table = db['users']
+    user.token = access_token
+    print(user)
+    table.update(dict(user), ['username'])
+    access_token = Authorize.create_access_token(subject=user.username)
+    refresh_token = Authorize.create_refresh_token(subject=user.username)
+    Authorize.set_access_cookies(access_token)
+    Authorize.set_refresh_cookies(refresh_token)
+    #return templates.TemplateResponse("home.html", {"request": request, "msg": 'Login'})
+    return {"access_token": access_token, "token_type": "bearer"}       # 回傳token給前端
+
+
+@app.get('/register', response_class=HTMLResponse)
+async def login(request: Request):
+    return templates.TemplateResponse(name='rigister_test.html', context={'request': request})
+
+
+@app.post('/register')
+async def register(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
+    user = models.User(**await request.form())
+    print(form_data.username, form_data.password, user)
+    user.id = randint(1000, 9999)
+    user.isAdmin = 0 #預設為非管理者
+    user.roleType = 0 #預設為employee
+    # 密碼加密
+    #user.password = get_password_hash(user.password)
+    
+    # 存入DB
+    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
+    user_table = db['users']
+    user_table.insert(dict(user))
+    
+    # 跳轉頁面至登入
+    return templates.TemplateResponse(name='login.html', context={'request': request})
+
+
+@app.get('/home', response_class=HTMLResponse)
+async def home(request: Request):
+    return templates.TemplateResponse(name='home.html', context={'request': request})
+
+
+@app.get('/tower', response_class=HTMLResponse)
+async def tower(request: Request, Authorize: AuthJWT = Depends()):
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    # current_user = Authorize.get_jwt_subject()
+    return templates.TemplateResponse(name='tower.html', context={'request': request})
+    
+
+@app.get('/optim', response_class=HTMLResponse)
+async def optim(request: Request, Authorize: AuthJWT = Depends()):
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    # current_user = Authorize.get_jwt_subject()
+    return templates.TemplateResponse(name='optim.html', context={'request': request})
+
+
+@app.get('/vibration', response_class=HTMLResponse)
+async def vibration(request: Request, Authorize: AuthJWT = Depends()):
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    # current_user = Authorize.get_jwt_subject()
+    return templates.TemplateResponse(name='vibration.html', context={'request': request})
+
+
+@app.get('/history', response_class=HTMLResponse)
+async def history(request: Request, Authorize: AuthJWT = Depends()):
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    # current_user = Authorize.get_jwt_subject()
+    return templates.TemplateResponse(name='history.html', context={'request': request})
+
+
+@app.get('/device', response_class=HTMLResponse)
+async def device(request: Request, Authorize: AuthJWT = Depends()):
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    # current_user = Authorize.get_jwt_subject()
+    return templates.TemplateResponse(name='device.html', context={'request': request})
+
+
+@app.get('/system', response_class=HTMLResponse)
+async def system(request: Request, Authorize: AuthJWT = Depends()):
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    # current_user = Authorize.get_jwt_subject()
+    return templates.TemplateResponse(name='system.html', context={'request': request})
+
+@app.get('/member', response_class=HTMLResponse)
+async def get_member(request: Request, Authorize: AuthJWT = Depends()):
+    """獲取所有帳號資訊"""
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+
+    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
+    statement = 'SELECT  id,username,isAdmin,roleType FROM users'
+    json_dic = {}
+    for row in db.query(statement):
+        #print(row['id'],row['username'])
+        json_dic[row['username']] = {'isAdmin':row['isAdmin'],'roleType':row['roleType']}
+    result  = json.dumps(json_dic,ensure_ascii=False)
+    current_user = Authorize.get_jwt_subject()
+    print(current_user)
+    return result
+
+@app.get('/member/edit', response_class=HTMLResponse)
+async def login(request: Request, Authorize: AuthJWT = Depends()):
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    return templates.TemplateResponse(name='member_edit_test.html', context={'request': request})
+
+@app.get('/member_delete', response_class=HTMLResponse)
+async def login(request: Request, Authorize: AuthJWT = Depends()):
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    return templates.TemplateResponse(name='delete_member_test2.html', context={'request': request})
+
+@app.post('/member_delete')
+async def delete_member(request: Request,Authorize: AuthJWT = Depends()):
+    """刪除成員"""
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    del_user = await request.form()
+    current_user = Authorize.get_jwt_subject()
+    delete_one = del_user.del_username
+    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
+    statement = 'SELECT isAdmin FROM users WHERE userName = "'+current_user+'"'
+    for row in db.query(statement):
+        if row['isAdmin']!=1:
+            return {'msg': ' 你沒有權限'}
+    else:
+        current_user_roleType = 0
+        del_user_roleType = -1
+        statement = 'SELECT roleType FROM users WHERE userName = "'+current_user+'"'
+        for row in db.query(statement):
+            current_user_roleType = row['roleType']
+        statement = 'SELECT roleType FROM users WHERE userName = "'+delete_one+'"'
+        for row in db.query(statement):
+            del_user_roleType = row['roleType']
+        if del_user_roleType == -1:
+            return {'msg':'不存在使用者'}
+        elif current_user_roleType<=del_user_roleType :
+            return {'msg': ' 你沒有權限'}
+        else :
+            statement = 'DELETE FROM users WHERE userName = "'+delete_one+'"'
+            db.query(statement)
+    return {'msg': ' 成功刪除'}
+
+@app.get('/member/authority', response_class=HTMLResponse)
+async def member_authority(request: Request,Authorize: AuthJWT = Depends()):
+    """設定成員權限"""
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    return templates.TemplateResponse(name='member_authority_test.html', context={'request': request})
+    
+@app.post('/member/authority')
+async def member_authority(request: Request,Authorize: AuthJWT = Depends()):
+    del_user = await request.form()
+    current_user = Authorize.get_jwt_subject()
+    delete_one = del_user.del_username
+    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
+    statement = 'SELECT isAdmin FROM users WHERE userName = "'+current_user+'"'
+    await request.form()
+
+
+# 溫度API
+@app.get('/temperature')
+async def get_temperatures():
+    """ 撈DB溫度 """
+    return {'hot_water': 30.48, 'cold_water': 28.10, 'wet_ball': 25.14}
+
+@app.post("/example")
+async def example(request: Request,Authorize: AuthJWT = Depends()):
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+    current_user = Authorize.get_jwt_subject()
+    #form_data = await request.form()
+    
+    print( current_user)
+    return current_user
+
+@app.post('/user')
+def user(Authorize: AuthJWT = Depends()):
+    Authorize.jwt_required()
+
+    current_user = Authorize.get_jwt_subject()
+    return {"user": current_user}
+
+@app.get("/example", response_class=HTMLResponse)
+async def example(request: Request,Authorize: AuthJWT = Depends()):
+    try:
+        Authorize.jwt_required()
+    except Exception as e:
+        print(e)
+        return RedirectResponse('/login')
+    current_user = Authorize.get_jwt_subject()
+    print( current_user)
+    return  current_user
+
+
+@app.get('/health')
+async def get_health(date: str):
+    """ 撈健康指標、預設健康指標 """
+    date = str(datetime.strptime(date, "%Y-%m-%d"))[:10]
+    print(date)
+    print(str(datetime.today()))
+    print(str(datetime.today()-timedelta(days=1)))
+
+
+    fake_data = {
+        str(datetime.today())[:10]: {'curr_health': 0.7, 'pred_health': 0.8},
+        str(datetime.today()-timedelta(days=1))[:10]: {'curr_health': 0.6, 'pred_health': 0.7},
+    }
+    
+    return fake_data[date]
+
+
+@app.get('/history_data')
+async def get_history(time_end: str):
+    """ 透過終點時間,抓取歷史資料。 """
+    date = str(datetime.strptime(time_end, "%Y-%m-%d"))[:10]
+    print(date)
+    print(str(datetime.today()))
+    print(str(datetime.today()-timedelta(days=1)))
+
+
+    fake_data = {
+        str(datetime.today())[:10]: {
+            'curr_history': {
+                'RPM_1X': list(np.random.rand(13)),
+                'RPM_2X': list(np.random.rand(13)),
+                'RPM_3X': list(np.random.rand(13)),
+                'RPM_4X': list(np.random.rand(13)),
+                'RPM_5X': list(np.random.rand(13)),
+                'RPM_6X': list(np.random.rand(13)),
+                'RPM_7X': list(np.random.rand(13)),
+                'RPM_8X': list(np.random.rand(13)),
+                'Gear_1X': list(np.random.rand(13)),
+                'Gear_2X': list(np.random.rand(13)),
+                'Gear_3X': list(np.random.rand(13)),
+                'Gear_4X': list(np.random.rand(13)),
+            },
+            'past_history': {
+                'RPM_1X': list(np.random.rand(13)),
+                'RPM_2X': list(np.random.rand(13)),
+                'RPM_3X': list(np.random.rand(13)),
+                'RPM_4X': list(np.random.rand(13)),
+                'RPM_5X': list(np.random.rand(13)),
+                'RPM_6X': list(np.random.rand(13)),
+                'RPM_7X': list(np.random.rand(13)),
+                'RPM_8X': list(np.random.rand(13)),
+                'Gear_1X': list(np.random.rand(13)),
+                'Gear_2X': list(np.random.rand(13)),
+                'Gear_3X': list(np.random.rand(13)),
+                'Gear_4X': list(np.random.rand(13)),
+            }
+        },
+        str(datetime.today()-timedelta(days=1))[:10]: {
+            'curr_history': {
+                'RPM_1X': list(np.random.rand(13)),
+                'RPM_2X': list(np.random.rand(13)),
+                'RPM_3X': list(np.random.rand(13)),
+                'RPM_4X': list(np.random.rand(13)),
+                'RPM_5X': list(np.random.rand(13)),
+                'RPM_6X': list(np.random.rand(13)),
+                'RPM_7X': list(np.random.rand(13)),
+                'RPM_8X': list(np.random.rand(13)),
+                'Gear_1X': list(np.random.rand(13)),
+                'Gear_2X': list(np.random.rand(13)),
+                'Gear_3X': list(np.random.rand(13)),
+                'Gear_4X': list(np.random.rand(13)),
+            },
+            'past_history': {
+                'RPM_1X': list(np.random.rand(13)),
+                'RPM_2X': list(np.random.rand(13)),
+                'RPM_3X': list(np.random.rand(13)),
+                'RPM_4X': list(np.random.rand(13)),
+                'RPM_5X': list(np.random.rand(13)),
+                'RPM_6X': list(np.random.rand(13)),
+                'RPM_7X': list(np.random.rand(13)),
+                'RPM_8X': list(np.random.rand(13)),
+                'Gear_1X': list(np.random.rand(13)),
+                'Gear_2X': list(np.random.rand(13)),
+                'Gear_3X': list(np.random.rand(13)),
+                'Gear_4X': list(np.random.rand(13)),
+            }
+        },
+    }
+    return fake_data[date]
+
+
+# Login funtion part
+def check_user_exists(username):
+    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
+    if int(next(iter(db.query('SELECT COUNT(*) FROM Water_tower.users WHERE userName = "'+username+'"')))['COUNT(*)']) > 0:
+        return True
+    else:
+        return False
+
+
+def get_user(username: str):
+    """ 取得使用者資訊(Model) """
+    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
+    if not check_user_exists(username):  # if user don't exist
+        return False
+    user_dict = next(
+        iter(db.query('SELECT * FROM Water_towe.users where userName ="'+username+'"')))
+    user = models.User(**user_dict)
+    return user
+    
+
+def user_register(user):
+    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
+    table = db['users']
+    #user.password = get_password_hash(user.password)
+    table.insert(dict(user))
+
+
+def get_password_hash(password):
+    """ 加密密碼 """
+    return pwd_context.hash(password)
+
+
+def verify_password(plain_password, hashed_password):
+    """ 驗證密碼(hashed) """
+    return pwd_context.verify(plain_password, hashed_password)
+
+
+def authenticate_user(username: str, password: str):
+    """ 連線DB,讀取使用者是否存在。 """
+    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/Water_tower?charset=utf8mb4')
+    if not check_user_exists(username):  # if user don't exist
+        return False
+    user_dict = next(iter(db.query('SELECT * FROM Water_tower.users where userName ="'+username+'"')))
+    user = models.User(**user_dict)
+    #if not verify_password(password, user.password):
+        #return False
+    return user
+
+
+def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
+    """ 創建token,並設定過期時間。 """
+    to_encode = data.copy()
+    if expires_delta:
+        expire = datetime.utcnow() + expires_delta
+    else:
+        expire = datetime.utcnow() + timedelta(minutes=15)
+    to_encode.update({"exp": expire})
+    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
+    return encoded_jwt