from fastapi import FastAPI, Form, Request from fastapi.encoders import jsonable_encoder from fastapi.middleware.cors import CORSMiddleware import dataset import pandas as pd from starlette.responses import FileResponse, StreamingResponse from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles import io from starlette.responses import RedirectResponse import codecs import random from starlette.status import HTTP_302_FOUND,HTTP_303_SEE_OTHER import smtplib import traceback import os from email.mime.text import MIMEText from email.mime.image import MIMEImage from email.mime.multipart import MIMEMultipart import requests app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=['*'], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def get_db(): # db = dataset.connect('mysql://choozmo:pAssw0rd@139.162.121.30:33306/hhh?charset=utf8mb4') db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4') step_questions_table = db['step_questions'] return step_questions_table def mail_to_user(umail): r = requests.get('http://3.220.159.187:8081/mailto/'+umail) print(r.text) # os.chdir('/root/hhh_step_question/step_question/tests') os.system('python3 /root/hhh_step_question/step_question/tests/dftest.py &') # umail @app.post("/step_questions/submit", response_class=RedirectResponse) async def submit(request: Request): form_data = await request.form() # form_data = request.form() email=form_data.get('q3') result = { 'sex': form_data.get('q1'), 'phone': form_data.get('q2'), 'email': email, 'building_case_name': form_data.get('q4'), 'building_case_type': form_data.get('q5'), 'decoration_style': ','.join(form_data.getlist('q6')), 'decoration_budget': ','.join(form_data.getlist('q7')), 'decoration_size': ','.join(form_data.getlist('q8')), 'src':'SMS' } print(result) get_db().insert(result) mail_to_user(email) return RedirectResponse(url="/a1/index_complete_msg.html", status_code=HTTP_302_FOUND) # return result @app.post("/step_questions/line-submit", response_class=RedirectResponse) async def line_submit(request: Request): form_data = await request.form() # form_data = request.form() email=form_data.get('q3') result = { 'sex': form_data.get('q1'), 'phone': form_data.get('q2'), 'email': email, 'building_case_name': form_data.get('q4'), 'building_case_type': form_data.get('q5'), 'decoration_style': ','.join(form_data.getlist('q6')), 'decoration_budget': ','.join(form_data.getlist('q7')), 'decoration_size': ','.join(form_data.getlist('q8')), 'src':'LINE' } print(result) get_db().insert(result) mail_to_user(email) return RedirectResponse(url="/a1/index_complete_line_after.html", status_code=HTTP_302_FOUND) # return result app.mount("/a1", StaticFiles(directory="static"), name="static") @app.get("/a1") async def redirect(): # url = app.url_path_for("/a1/index.html") response = RedirectResponse(url='/a1/index.html') return response #@app.get("/a1", response_class=HTMLResponse) #@app.get("/a1", response_class=HTMLResponse) #async def read_items(): # return """ # # # Some HTML in here # # #

Look ma! HTML!

# # # """ @app.get("/step_questions") async def get_step_question(request: Request): db_data = list(get_db().find()) df = pd.DataFrame(db_data, columns=db_data[0].keys()) stream = io.StringIO() df.to_csv(stream, index = False) # response = StreamingResponse(data_frame.to_csv(index=False), media_type="text/csv") response = StreamingResponse(iter([stream.getvalue()]), media_type="text/csv") response.headers["Content-Disposition"] = "attachment; filename=export.csv" return response