from fastapi import FastAPI, Form, Request from fastapi.encoders import jsonable_encoder from fastapi.middleware.cors import CORSMiddleware import dataset import pandas as pd from starlette.responses import FileResponse, StreamingResponse from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles import io from starlette.responses import RedirectResponse import codecs import random from starlette.status import HTTP_302_FOUND,HTTP_303_SEE_OTHER import smtplib import traceback import requests import threading import os from email.mime.text import MIMEText from email.mime.image import MIMEImage from email.mime.multipart import MIMEMultipart import boto3 from botocore.exceptions import ClientError from email.mime.base import MIMEBase from email.mime.application import MIMEApplication app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=['*'], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def hhh_send_mail(email): SENDER = "Gorgeous Space " RECIPIENT = "jeweiliang@gmail.com" AWS_REGION = "us-east-1" CHARSET = "UTF-8" client = boto3.client('ses',region_name=AWS_REGION) # Try to send the email. try: #Provide the contents of the email. msg = MIMEMultipart() msg["Subject"] = "[幸福空間] 您的五萬元裝修折價券 " msg["From"] = "noreply@hhh.com.tw" msg["To"] = email # Set message body body = MIMEText("感謝填寫幸福空間問卷,這是您的五萬元裝修折價券\n", "plain") msg.attach(body) filename = "coupon.png" # In same directory as script with open(filename, "rb") as attachment: part = MIMEApplication(attachment.read()) part.add_header("Content-Disposition", "attachment", filename=filename) msg.attach(part) response = client.send_raw_email( Source=msg["From"], Destinations=[msg["To"]], RawMessage={"Data": msg.as_string()} ) print(response) # Display an error if something goes wrong. except ClientError as e: print(e.response['Error']['Message']) else: print("Email sent! Message ID:"), print(response['MessageId']) def thread_mail(umail): print('thread_mail called') # r = requests.get('http://3.220.159.187:8081/mailto/'+umail) # print(r.text) # os.chdir('/root/hhh_step_question/step_question/tests') # os.system('python3 /root/hhh_step_question/step_question/tests/dftest.py &') def get_db(): # db = dataset.connect('mysql://choozmo:pAssw0rd@139.162.121.30:33306/hhh?charset=utf8mb4') db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4') step_questions_table = db['step_questions'] return step_questions_table def mail_to_user(umail): r = requests.get('http://3.220.159.187:8081/mailto/'+umail) print(r.text) # os.chdir('/root/hhh_step_question/step_question/tests') os.system('python3 /root/hhh_step_question/step_question/tests/dftest.py &') # umail @app.post("/step_questions/submit", response_class=RedirectResponse) async def submit(request: Request): form_data = await request.form() # form_data = request.form() email=form_data.get('q3') result = { 'sex': form_data.get('q1'), 'phone': form_data.get('q2'), 'email': email, 'building_case_name': form_data.get('q4'), 'building_case_type': form_data.get('q5'), 'decoration_style': ','.join(form_data.getlist('q6')), 'decoration_budget': ','.join(form_data.getlist('q7')), 'decoration_size': ','.join(form_data.getlist('q8')), 'src':'SMS' } print(result) get_db().insert(result) mail_to_user(email) return RedirectResponse(url="/a1/index_complete_msg.html", status_code=HTTP_302_FOUND) # return result @app.post("/step_questions/line-submit", response_class=RedirectResponse) async def line_submit(request: Request): form_data = await request.form() # form_data = request.form() email=form_data.get('q3') result = { 'sex': form_data.get('q1'), 'phone': form_data.get('q2'), 'email': email, 'building_case_name': form_data.get('q4'), 'building_case_type': form_data.get('q5'), 'decoration_style': ','.join(form_data.getlist('q6')), 'decoration_budget': ','.join(form_data.getlist('q7')), 'decoration_size': ','.join(form_data.getlist('q8')), 'src':'LINE' } print(result) get_db().insert(result) mail_to_user(email) return RedirectResponse(url="/a1/index_complete_line_after.html", status_code=HTTP_302_FOUND) # return result app.mount("/a1", StaticFiles(directory="static"), name="static") @app.get("/a1") async def redirect(): # url = app.url_path_for("/a1/index.html") response = RedirectResponse(url='/a1/index.html') x = threading.Thread(target=hhh_send_mail, args=('jared@choozmo.com',)) x.start() return response #@app.get("/a1", response_class=HTMLResponse) #@app.get("/a1", response_class=HTMLResponse) #async def read_items(): # return """ # # # Some HTML in here # # #

Look ma! HTML!

# # # """ @app.get("/step_questions") async def get_step_question(request: Request): db_data = list(get_db().find()) df = pd.DataFrame(db_data, columns=db_data[0].keys()) stream = io.StringIO() df.to_csv(stream, index = False) # response = StreamingResponse(data_frame.to_csv(index=False), media_type="text/csv") response = StreamingResponse(iter([stream.getvalue()]), media_type="text/csv") response.headers["Content-Disposition"] = "attachment; filename=export.csv" return response