"""FastAPI demo service.

Exposes template-rendered pages, several file-upload endpoints, a stub
OAuth2 password flow, a background-task example, and a CSV-to-MySQL import.
"""

import logging
import os
import shutil
import time
import traceback
from typing import List, Optional

import aiofiles
import dataset
import pandas as pd
import uvicorn
from fastapi import (
    BackgroundTasks,
    Depends,
    FastAPI,
    File,
    Form,
    HTTPException,
    Request,
    Response,
    UploadFile,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

app = FastAPI()

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm this is intended outside of local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

resource_server = 'www.keyword.web:8000/'
tmp_csv_sub_folder = 'tmp_csv/'
img_upload_folder = '/var/www/html/' + tmp_csv_sub_folder

# Connection URL used by the CSV import endpoint. It can be overridden with
# the KEYWORD_DB_URL environment variable; the default is the value that was
# previously hard-coded.
# NOTE(review): the default embeds a password in source control - move it to
# configuration.
DATABASE_URL = os.environ.get(
    "KEYWORD_DB_URL",
    'mysql://root:pAssw0rd@localhost:3306/keyword?charset=utf8mb4',
)

# Mount paths must be empty or start with "/"; a relative path such as
# "../static" is rejected when the route is created.
app.mount("/static", StaticFiles(directory="static"), name="static")

templates = Jinja2Templates(directory="../templates/")

oauth_scheme = OAuth2PasswordBearer(tokenUrl="token")


def _safe_upload_name(filename: Optional[str]) -> str:
    """Return the final path component of a client-supplied file name.

    The name comes from the request and is untrusted, so any directory part
    (with either slash style) is discarded before it is used on disk.

    Raises:
        HTTPException: status 400 when no usable file name remains.
    """
    name = os.path.basename((filename or "").replace("\\", "/"))
    if name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid upload filename")
    return name


@app.get("/user/{id}", response_class=HTMLResponse)
def home(request: Request, id: str):
    """Render ``home.html`` with a fixed name and the ``id`` path parameter."""
    name = "cat"
    return templates.TemplateResponse(
        "home.html", {"request": request, "name": name, "id": id}
    )


@app.get("/index_eng", response_class=HTMLResponse)
async def index2():
    """Serve the static English index page from ``../html``."""
    return FileResponse('../html/index_eng.html')


@app.post("/root0")
async def root0(file: UploadFile = File(...)):
    """Save one uploaded file into the working directory.

    Returns:
        A dict with the name the file was stored under.
    """
    target = _safe_upload_name(file.filename)
    with open(target, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    return {"file_name": target}


@app.post("/csv")
async def csv(files: List[UploadFile] = File(...)):
    """Save every uploaded file into the working directory.

    Returns:
        A dict mapping each upload's position to the name it was stored
        under.
    """
    # Validate all names first so a bad one does not leave a partial upload.
    targets = [_safe_upload_name(upload.filename) for upload in files]
    for upload, target in zip(files, targets):
        with open(target, "wb") as buffer:
            shutil.copyfileobj(upload.file, buffer)
    return {"files uploaded": dict(enumerate(targets))}


@app.post('/upfile1')
async def up_f1(request: Request, upload_list: List[UploadFile] = File(...)):
    """Render ``f.html`` with the name, size in KiB and type of each upload."""
    return templates.TemplateResponse(
        'f.html',
        {
            "request": request,
            "file_names": [item.filename for item in upload_list],
            # Size is measured by reading each underlying file to the end.
            "file_sizes": [len(item.file.read()) / 1024 for item in upload_list],
            "file_types": [item.content_type for item in upload_list],
        },
    )


def handle_email_background(email: str, data: str):
    """Simulate slow background work: print the inputs, then count to 99."""
    print(email)
    print(data)
    for i in range(100):
        print(i)
        time.sleep(0.1)


@app.get("/users/email")
async def handle_email(email: str, background_task: BackgroundTasks):
    """Queue ``handle_email_background`` and return immediately."""
    print(email)
    background_task.add_task(
        handle_email_background, email, "This is background task"
    )
    return {"message": "mail sent"}


@app.post("/token")
async def token_generate(form_data: OAuth2PasswordRequestForm = Depends()):
    """Return a bearer token for the submitted login form.

    NOTE(review): the token is simply the submitted username and the password
    is never checked - this is a stub, not real authentication.
    """
    print(form_data)
    return {"access_token": form_data.username, "token_type": "bearer"}


@app.get("/users/profile")
async def profile(token: str = Depends(oauth_scheme)):
    """Return a fixed profile for any request that carries a bearer token."""
    print(token)
    return {
        "user": "test1",
        "profile": "myprofile",
    }


@app.get("/report/{id}", response_class=HTMLResponse)
async def report(request: Request, id: str):
    """Render ``index.html`` with the ``id`` path parameter."""
    return templates.TemplateResponse("index.html", {"request": request, "id": id})


@app.post("/submitform")
async def handle_form(
    request: Request,
    assignment: str = Form(...),
    assignment_file: UploadFile = File(...),
):
    """Accept the assignment form and render ``submited.html``.

    The form field, the file name and the full file content are printed to
    stdout.
    """
    print(assignment)
    print(assignment_file.filename)
    content_assignment = await assignment_file.read()
    print(content_assignment)
    return templates.TemplateResponse(
        "submited.html",
        {"request": request, "file_name": assignment_file.filename},
    )


@app.get("/legacy/")
def get_legacy_data():
    """Return a fixed text payload with the ``application/xml`` media type.

    NOTE(review): the payload contains no XML markup although it is served as
    XML - confirm whether the tags were lost from the literal.
    """
    data = "\nApply shampoo here.\nYou'll have to use soap here.\n"
    return Response(content=data, media_type="application/xml")


@app.post("/write/")
async def writecsv(file: UploadFile = File(...)):
    """Parse an uploaded CSV and insert each row into the ``test3`` table.

    The first two lines of the file are skipped and the literal ``NULL`` is
    read as a missing value. The upload is expected to be UTF-8 encoded.
    """
    df = pd.read_csv(file.file, sep=",", skiprows=2, na_values='NULL')

    db = dataset.connect(DATABASE_URL)
    try:
        table = db['test3']
        for _, row in df.iterrows():
            table.insert(dict(row))
    finally:
        # Release the connection even when an insert fails.
        db.close()


if __name__ == '__main__':
    uvicorn.run(app)