123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- import pandas as pd
- from fastapi import FastAPI, File, Form, UploadFile, Response, Request, Depends, BackgroundTasks
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import HTMLResponse, FileResponse
- import dataset
- from fastapi.templating import Jinja2Templates
- from fastapi.staticfiles import StaticFiles
- import uvicorn
- from typing import List
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- import time
- import shutil
- import logging
- import traceback
- import aiofiles
- app = FastAPI()
- app.add_middleware(
- CORSMiddleware,
- allow_origins=['*'],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- resource_server = 'www.keyword.web:8000/'
- tmp_csv_sub_folder = 'tmp_csv/'
- img_upload_folder = '/var/www/html/'+tmp_csv_sub_folder
- app.mount("../static", StaticFiles(directory="static"), name="static")
- # templates = Jinja2Templates(directory="../html")
- templates = Jinja2Templates(directory="../templates/")
- oauth_scheme = OAuth2PasswordBearer(tokenUrl="token")
- @app.get("/user/{id}", response_class=HTMLResponse)
- def home(request: Request, id:str):
- name = "cat"
- return templates.TemplateResponse("home.html", {"request": request, "name": name, "id": id})
- @app.get("/index_eng", response_class=HTMLResponse)
- async def index2():
- return FileResponse('../html/index_eng.html')
- @app.post("/root0")
- async def root0(file: UploadFile = File(...)):
- with open(f'{file.filename}', "wb") as buffer:
- shutil.copyfileobj(file.file, buffer)
- return {"file_name": file.filename}
- @app.post("/csv")
- async def csv(files: List[UploadFile] = File(...)):
- lists = {}
- i = 0
- for csv in files:
- with open(f'{csv.filename}', "wb") as buffer:
- shutil.copyfileobj(csv.file, buffer)
- lists[i] = csv.filename
- i += 1
- return {"files uploaded": lists}
- @app.post('/upfile1')
- async def up_f1(request:Request,upload_list:List[UploadFile]= File(...)):
- return templates.TemplateResponse(
- 'f.html',
- {
- "request":request,
- "file_names":[dd.filename for dd in upload_list],
- "file_sizes":[len(ds.read())/1024 for ds in [dd.file for dd in upload_list]],
- "file_types":[dd.content_type for dd in upload_list]
- })
- def handle_email_background(email: str, data: str):
- print(email)
- print(data)
- for i in range(100):
- print(i)
- time.sleep(0.1)
- @app.get("/users/email")
- async def handle_email(email: str, background_task:BackgroundTasks):
- print(email)
- background_task.add_task(handle_email_background, email, "This is background task")
- return {"message": "mail sent"}
- @app.post("/token")
- async def token_generate(form_data: OAuth2PasswordRequestForm = Depends()):
- print(form_data)
- return {"access_token": form_data.username, "token_type": "bearer"}
- @app.get("/users/profile")
- async def profile(token: str = Depends(oauth_scheme)):
- print(token)
- return {
- "user": "test1",
- "profile": "myprofile"
- }
- @app.get("/report/{id}", response_class=HTMLResponse)
- async def report(request: Request, id: str):
- return templates.TemplateResponse("index.html", {"request":request, "id": id})
- @app.post("/submitform")
- async def handle_form(request:Request, assignment: str = Form(...), assignment_file: UploadFile = File(...)):
- print(assignment)
- print(assignment_file.filename)
- content_assignment = await assignment_file.read()
- print(content_assignment)
- return templates.TemplateResponse("submited.html", {"request": request, "file_name": assignment_file.filename})
- @app.get("/legacy/")
- def get_legacy_data():
- data = """<?xml version="1.0"?>
- <shampoo>
- <Header>
- Apply shampoo here.
- </Header>
- <Body>
- You'll have to use soap here.
- </Body>
- </shampoo>
- """
- return Response(content=data, media_type="application/xml")
- @app.post("/write/")
- async def writecsv(file: UploadFile = File(...)):
- # file in utf8
- data = file.file
- df = pd.read_csv(data, sep=",", skiprows=2, na_values='NULL')
- db = dataset.connect('mysql://root:pAssw0rd@localhost:3306/keyword?charset=utf8mb4')
- table = db['test3']
- rows = df.shape[0]
- for i in range(rows):
- row = df.iloc[i]
- table.insert(dict(row))
- db.close()
- if __name__ == '__main__':
- uvicorn.run(app)
|