"""FastAPI service behind the step-question survey pages.

It stores submitted survey forms in the ``step_questions`` table, mails a
coupon image to the address given in the form through Amazon SES, serves the
static pages under ``/a1`` and exposes a few small helper endpoints.
"""
import codecs
import io
import os
import random
import smtplib
import threading
import traceback
from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from urllib.parse import quote

import boto3
import dataset
import pandas as pd
import requests
from botocore.exceptions import ClientError
from fastapi import FastAPI, Form, Request
from fastapi.encoders import jsonable_encoder
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from starlette.responses import FileResponse, RedirectResponse, StreamingResponse
from starlette.status import HTTP_302_FOUND, HTTP_303_SEE_OTHER

app = FastAPI()

# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# In-memory flag ('0' or '1') written by /coffee/set/{item_id} and read by /coffee.
coffee = '0'

_AWS_REGION = "us-east-1"
_MAIL_FROM = "noreply@hhh.com.tw"
_COUPON_FILE = "coupon.png"  # resolved relative to the working directory
_FOLLOW_UP_CMD = 'python3 /root/hhh_step_question/step_question/tests/dftest.py &'
_MAIL_SERVICE_URL = 'http://3.220.159.187:8081/mailto/'
_MAIL_SERVICE_TIMEOUT = 10  # seconds
# NOTE(review): credentials are still embedded as the fallback so existing
# deployments keep working; set HHH_DB_URL and remove the literal when possible.
_DEFAULT_DB_URL = 'mysql://choozmo:pAssw0rd@db.ptt.cx:3306/hhh?charset=utf8mb4'
_AREA_COOKIE_DOMAIN = "q.ptt.cx"
_LINE_INDEX_PAGE = '/a1/index-line.html'


def hhh_send_mail(email, coffee=False):
    """Mail the coupon image to ``email`` through Amazon SES.

    Args:
        email: Recipient address, used both as the ``To`` header and as the
            only SES destination. When empty or ``None`` nothing is sent.
        coffee: When true, use the body text that also contains the voucher
            link and password; otherwise use the short body text.

    A ``ClientError`` raised by SES is printed and swallowed. Any other error
    (for example a missing ``coupon.png``) propagates to the caller.
    """
    if not email:
        # The form field may be absent; there is nobody to send the coupon to.
        print("hhh_send_mail: no recipient address, skipping")
        return

    client = boto3.client('ses', region_name=_AWS_REGION)

    msg = MIMEMultipart()
    msg["Subject"] = "[幸福空間] 您的五萬元裝修折價券 "
    msg["From"] = _MAIL_FROM
    msg["To"] = email

    if coffee:
        body = MIMEText("感謝填寫幸福空間問卷,開啟下方完整券樣(密碼9888)至櫃台掃碼兌換。https://txp.rs/v/EvX69b4Xq9 \n\n 附件是您的五萬元裝修折價券\n", "plain")
    else:
        body = MIMEText("感謝填寫幸福空間問卷,這是您的五萬元裝修折價券\n", "plain")
    msg.attach(body)

    with open(_COUPON_FILE, "rb") as attachment:
        part = MIMEApplication(attachment.read())
    part.add_header("Content-Disposition", "attachment", filename=_COUPON_FILE)
    msg.attach(part)

    try:
        response = client.send_raw_email(
            Source=msg["From"],
            Destinations=[msg["To"]],
            RawMessage={"Data": msg.as_string()},
        )
    except ClientError as e:
        # Report the SES error and carry on; the caller runs in a worker thread.
        print(e.response['Error']['Message'])
    else:
        print(response)
        print("Email sent! Message ID:", response['MessageId'])
        # NOTE(review): the flattened source does not show whether this
        # follow-up script ran only after a successful send or on every call;
        # it is kept in the success path here -- confirm against the original.
        os.system(_FOLLOW_UP_CMD)


def thread_mail(umail):
    """Placeholder hook: only logs that it was called; ``umail`` is unused."""
    print('thread_mail called')


def get_db():
    """Connect to the database and return its ``step_questions`` table.

    The connection URL is read from the ``HHH_DB_URL`` environment variable
    and falls back to the built-in default when the variable is not set.
    """
    db = dataset.connect(os.environ.get("HHH_DB_URL", _DEFAULT_DB_URL))
    return db['step_questions']


def mail_to_user(umail):
    """Ask the external mail service to mail ``umail`` and run the follow-up script.

    The address is percent-encoded before it is placed in the URL path so that
    characters such as ``/``, ``?`` or ``#`` cannot change the requested URL,
    and the request is bounded by a timeout so that it cannot block forever.
    """
    r = requests.get(
        _MAIL_SERVICE_URL + quote(umail, safe='@'),
        timeout=_MAIL_SERVICE_TIMEOUT,
    )
    print(r.text)
    os.system(_FOLLOW_UP_CMD)


def _survey_row(form_data, src):
    """Build the row shared by both submit endpoints from the posted form."""
    return {
        'sex': form_data.get('q1'),
        'phone': form_data.get('q2'),
        'email': form_data.get('q3'),
        'building_case_name': form_data.get('q4'),
        'building_case_type': form_data.get('q5'),
        # Multi-value answers are stored as a single comma-separated string.
        'decoration_style': ','.join(form_data.getlist('q6')),
        'decoration_budget': ','.join(form_data.getlist('q7')),
        'decoration_size': ','.join(form_data.getlist('q8')),
        'src': src,
    }


def _send_coupon_in_background(email, coffee=False):
    """Start ``hhh_send_mail`` on its own thread so the response is not delayed."""
    threading.Thread(target=hhh_send_mail, args=(email, coffee)).start()


@app.post("/step_questions/submit", response_class=RedirectResponse)
async def submit(request: Request):
    """Store a survey submission (source ``SMS``), mail the coupon and redirect."""
    form_data = await request.form()
    result = _survey_row(form_data, 'SMS')
    print(result)
    get_db().insert(result)
    _send_coupon_in_background(result['email'])
    return RedirectResponse(url="/a1/index_complete_msg.html", status_code=HTTP_302_FOUND)


@app.post("/step_questions/line-submit", response_class=RedirectResponse)
async def line_submit(request: Request):
    """Store a survey submission (source ``LINE``), mail the coupon and redirect.

    In addition to the common answers, the ``userid``, ``id`` and ``area``
    form fields are stored, and the mail uses the body text with the voucher
    link.
    """
    form_data = await request.form()
    print(form_data)
    result = _survey_row(form_data, 'LINE')
    result.update({
        'userid': form_data.get('userid'),
        'id': form_data.get('id'),
        'area': form_data.get('area'),
    })
    print(result)
    get_db().insert(result)
    _send_coupon_in_background(result['email'], True)
    return RedirectResponse(url="/a1/index_complete_line_after.html", status_code=HTTP_302_FOUND)


app.mount("/a1", StaticFiles(directory="static"), name="static")


@app.get("/coffee/set/{item_id}")
async def coffee_set(item_id: int):
    """Set the module-level ``coffee`` flag: ``'0'`` for 0, ``'1'`` for anything else."""
    global coffee
    coffee = '0' if item_id == 0 else '1'
    return {"code": "ok"}


@app.get("/coffee")
async def coffee_get():
    """Return the current value of the ``coffee`` flag."""
    return {'coffee': coffee}


def _area_redirect(area):
    """Redirect to the LINE index page and set the ``area`` cookie to ``area``."""
    response = RedirectResponse(url=_LINE_INDEX_PAGE)
    response.set_cookie(key="area", value=area, domain=_AREA_COOKIE_DOMAIN)
    return response


@app.get("/220")
async def redirect220():
    """Redirect to the LINE index page with the ``area`` cookie set to 220."""
    return _area_redirect("220")


@app.get("/242")
async def redirect242():
    """Redirect to the LINE index page with the ``area`` cookie set to 242."""
    return _area_redirect("242")


@app.get("/241")
async def redirect241():
    """Redirect to the LINE index page with the ``area`` cookie set to 241."""
    return _area_redirect("241")


@app.get("/112")
async def redirect112():
    """Redirect to the LINE index page with the ``area`` cookie set to 112."""
    return _area_redirect("112")


@app.get("/111")
async def redirect111():
    """Redirect to the LINE index page with the ``area`` cookie set to 111."""
    return _area_redirect("111")


@app.get("/a1")
async def redirect():
    """Redirect the bare ``/a1`` path to the static index page."""
    return RedirectResponse(url='/a1/index.html')


# Disabled example handler, kept for reference:
#@app.get("/a1", response_class=HTMLResponse)
#@app.get("/a1", response_class=HTMLResponse)
#async def read_items():
#    return """
#    <html>
#        <head>
#            <title>Some HTML in here</title>
#        </head>
#        <body>

#            <h1>Look ma! HTML!</h1>

#        </body>
#    </html>
#    """


@app.get("/step_questions")
async def get_step_question(request: Request):
    """Export every row of the ``step_questions`` table as a CSV download.

    Args:
        request: The incoming request (not used by the handler itself).

    Returns:
        A ``text/csv`` response sent as the attachment ``export.csv``. When
        the table has no rows the CSV contains only the header line, built
        from the table's column listing.
    """
    table = get_db()
    rows = list(table.find())
    # With no rows there is no first record to take the header from, so fall
    # back to the table's own column listing instead of indexing rows[0].
    columns = list(rows[0].keys()) if rows else table.columns
    csv_text = pd.DataFrame(rows, columns=columns).to_csv(index=False)

    response = StreamingResponse(iter([csv_text]), media_type="text/csv")
    response.headers["Content-Disposition"] = "attachment; filename=export.csv"
    return response