from fastapi import APIRouter from supabase import create_client, Client from dotenv import load_dotenv import os from datetime import datetime from random import choice from openai import OpenAI from typing import Annotated from pydantic import Field client = OpenAI() load_dotenv() # supaspace 連線 url: str = os.environ.get('SUPABASE_URL') key: str = os.environ.get('SUPABASE_KEY') supabase: Client = create_client(url, key) dbRouter = APIRouter() @dbRouter.get("/click") def add_click_time(): try: response = supabase.table('click_time').select("*").execute() click_time = response.data[0]['click_time'] + 1 data, count = supabase.table('click_time') \ .update({'click_time':click_time,'update_time':str(datetime.now())})\ .eq('id', 1)\ .execute() return {"state":"success","click_time" : click_time} except Exception as e: return {"state":str(e)} @dbRouter.get("/find_brand") def find_brand(keyword:str = None,language :str = "ch",page_num : int = None,page_amount: int = None,search_name : str = None): if keyword is None : query = supabase.table('101_brand').select('*').eq("language", language) else : keyword_list = keyword.split(",") query= supabase.table('101_brand').select('*').eq("language", language) for keyword_tmp in keyword_list : query = query.like('tags', f'%{keyword_tmp}%') if search_name: query = query.like('name', f'%{search_name}%') result,_ = query.execute() count = len(result[1]) if page_num and page_amount : offset = (page_num - 1) * page_amount query = query.range(offset, offset + page_amount-1) try: data,_ = query.execute() result = [] for shop in data[1] : json = { "type" : shop["type"], "info" : shop } if language != "ch" : if shop["floor"] == "館外" : json["info"]["floor"] = "outside" result.append(json) return {"state":"success","all_num" : count,"data" : result} except Exception as e: return {"state":"fail","message" :str(e)} @dbRouter.get("/arviews") def arviews(start:str,end:str,language:str = "ch"): try : data, count = supabase.table('101_arviews')\ .select('*')\ .eq('start_loc', start) \ .like('tour_place', f'%{end}%') \ .execute() result :str words :str if len(data[1]) != 0: if language == "ch" : result = data[1][0]["url"] words = data[1][0]["words"] else: result = data[1][0]["en_url"] words = data[1][0]["en_words"] else : result = "no this route" return {"state":"success","url" : result,"words" : words} except Exception as e: return {"state":"fail","message" :str(e)} @dbRouter.get("/static_tickets") async def static_tickets(is_Chinese : int = None): try: data =None if is_Chinese : data, count = supabase.table('101_ticket')\ .select('*')\ .in_('id', [1,3,6,7])\ .execute() else : data, count = supabase.table('101_ticket')\ .select('*')\ .in_('id', [182,183,180])\ .execute() result = [] for shop in data[1] : json = { "type" : shop["type"], "info" : shop } result.append(json) return {"state":"success","result" : result} except Exception as e: return {"state":"fail","message" :str(e)} @dbRouter.get("/ad/{type}") def read_root(type:str,language :str = "ch"): keyword1 :str keyword2 :str if type == "美食伴手禮": keyword1 = "餐飲" keyword2 = "伴手禮" else : keyword1 = "住宿" keyword2 = "伴手禮" data, count = supabase.table('101_brand')\ .select('*')\ .eq("floor","館外")\ .eq("language", language)\ .or_(f"tags.ilike.%{keyword1}%,tags.ilike.%{keyword2}%")\ .execute() result = data[1] # 從結果中隨機選擇一筆資料 random_row = choice(result) if language != "ch" : if random_row["floor"] == "館外" : random_row["floor"] = "outside" #print(random_row) return {"data": random_row} @dbRouter.post("/message_not_in_cache") def message_not_in_cache(question :str ,answer :str,client_id : str = "0" ): try: data, count = supabase.table('client_message').select('*').eq("question",question).execute() if len(data[1]) != 0 : return {"state": 200 , "message" : "have saved"} data, count = supabase.table('client_message').insert({"client_id": client_id, "question": question,"answer":answer}).execute() return {"state": 200 , "message" : "success"} except Exception as e: return {"state": 500 , "message" : str(e)} from pydantic import BaseModel, EmailStr import base64 import pickle from email.mime.text import MIMEText from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build import os SCOPES = ['https://www.googleapis.com/auth/gmail.send'] class dataform(BaseModel): title: str content: str client_name: str gender: str email: EmailStr phone: str type:str def send_email(to_email,from_email,message): creds = None # 如果存在 token.pickle 文件,讀取 if os.path.exists('token.pickle'): with open('token.pickle', 'rb') as token: creds = pickle.load(token) # 如果沒有有效的憑據,就進行登入 if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json', SCOPES) creds = flow.run_local_server(port=0) # 保存憑據 with open('token.pickle', 'wb') as token: pickle.dump(creds, token) service = build('gmail', 'v1', credentials=creds) # 設定郵件 message = MIMEText(message) message['to'] = to_email message['from'] = from_email message['subject'] = '101 ai客服 表單新資料' raw = base64.urlsafe_b64encode(message.as_bytes()).decode() # 發送郵件 try: message = service.users().messages().send(userId='me', body={'raw': raw}).execute() print(f'已發送郵件: {message["id"]}') return "success" except Exception as error: print(f'發送郵件時出錯: {error}') return "fail" @dbRouter.post("/insert_table") def insert_table(data: dataform): try: response,count = supabase.table('lost_property').insert(data.dict()).execute() email_content = f"問題類別:{response[1][0]['type']}\n標題:{response[1][0]['title']}\n內容:{response[1][0]['content']}\n姓名:{response[1][0]['client_name']}\n性別:{response[1][0]['gender']}\n電子郵件:{response[1][0]['email']}\n聯絡電話:{response[1][0]['phone']}\n" try: send_email("steven@choozmo.com","mia@choozmo.com",str(email_content)) # mallservice@tfc101.com.tw except Exception as e: print(str(e)) return {"state": 200 ,"message": "資料已成功提交"} except Exception as e: return {"state": 500 , "message" : str(e)} @dbRouter.post("/video_save_into_cache") def message_not_in_cache(video_name : Annotated[str, Field(description="檔案請丟進/home/mia/101/static/video_cache/others/資料夾裡")],client_message_id :str = None,question:str = None): try: data = [] if client_message_id : data, count = supabase.table('client_message').select('*').eq("id",client_message_id).execute() elif question: data, count = supabase.table('client_message').select('*').eq("question",question).execute() info = data[1][0] response = supabase.table('video_cache').insert({"question": info["question"],"answer":info["answer"],"video_url":f"/static/video_cache/others/{video_name}"}).execute() response = supabase.table('client_message').delete().eq('id', info["id"]).execute() return {"state": 200 , "message" : "success"} except Exception as e: return {"state": 500 , "message" : str(e)} from sherry.semantic_search import ask_question @dbRouter.post("/video_cache") def video_cache(client_message :str ): try: # data, count = supabase.table('video_cache').select('*').like('question', f'%{client_message}%').execute() # if len(data[1]) == 0 : # return {"state": 500 , "message" : "no data"} # return {"state": 200 , "message" : data[1]} result = ask_question(client_message) if result == None : return {"state": 500 , "message" : "no data"} return {"state": 200 , "message" : result } except Exception as e: return {"state": 500 , "message" : str(e)}