"""API routes backed by Supabase tables.

The router exposes click counters, brand/ticket look-ups, message logging,
a form endpoint that notifies staff through the Gmail API, an OpenAI based
translation job and a keyword search over the brand and ticket tables.
"""

import base64
import json
import os
import pickle
from datetime import datetime
from email.mime.text import MIMEText
from functools import lru_cache
from random import choice
from typing import Annotated, List, Optional

import jieba
import spacy
from dotenv import load_dotenv
from fastapi import APIRouter, Body, File, UploadFile
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from openai import OpenAI
from pydantic import BaseModel, EmailStr, Field
from supabase import Client, create_client

load_dotenv()

# Supabase connection
url: str = os.environ.get('SUPABASE_URL')
key: str = os.environ.get('SUPABASE_KEY')
supabase: Client = create_client(url, key)

dbRouter = APIRouter()

# OAuth scope requested for the Gmail API (send only).
SCOPES = ['https://www.googleapis.com/auth/gmail.send']

client = OpenAI(
    # Passing the key explicitly; the library would also read it by default.
    api_key=os.environ.get("OPENAI_API_KEY"),
)

# spaCy pipeline used to tokenise each non-Chinese language.
_SPACY_MODELS = {
    "en": "en_core_web_sm",
    "jp": "ja_core_news_sm",
    "ko": "ko_core_news_sm",
}

# Ticket tags that mark a record as belonging to *another* language.
_EXCLUDE_LANGUAGE_TAGS = {
    "ch": ["韓文", "日文", "英文"],
    "en": ["韓文", "日文", "中文"],
    "jp": ["韓文", "英文", "中文"],
    "ko": ["日文", "英文", "中文"],
}

# Words dropped from Chinese questions before searching.
_CH_STOP_WORDS = ["請問", "停車", "收費", "方式"]

# Characters that would break the comma separated PostgREST `or_` filter.
_POSTGREST_RESERVED = frozenset(',()"\\')


def _increment_click_time(select_column, select_value, update_id):
    """Add one to a `click_time` row and return the new counter value.

    The row is looked up with ``select_column == select_value`` and written
    back with ``id == update_id``. The read and the write are two separate
    requests, so concurrent clicks can overwrite each other.

    Raises:
        LookupError: no row matched the look-up filter.
    """
    response = (
        supabase.table('click_time')
        .select("*")
        .eq(select_column, select_value)
        .execute()
    )
    if not response.data:
        raise LookupError(
            f"click_time row not found: {select_column}={select_value}"
        )
    click_time = response.data[0]['click_time'] + 1
    (
        supabase.table('click_time')
        .update({'click_time': click_time, 'update_time': str(datetime.now())})
        .eq('id', update_id)
        .execute()
    )
    return click_time


def _increment_brand_click(column, brand):
    """Add one to counter ``column`` of the `101_brand` row named ``brand``.

    Raises:
        LookupError: no brand row has that name.
    """
    response = supabase.table('101_brand').select("*").eq("name", brand).execute()
    if not response.data:
        raise LookupError(f"brand not found: {brand}")
    click_time = response.data[0][column] + 1
    (
        supabase.table('101_brand')
        .update({column: click_time})
        .eq("name", brand)
        .execute()
    )
    return click_time


@dbRouter.get("/click")
def add_click_time(click_type: Optional[str] = None, brand: Optional[str] = None):
    """Increment one click counter and return its new value.

    Args:
        click_type: ``None`` (row 1), ``"card"`` (row 2), ``"card-en"``
            (row 3), or ``"click_url"`` / ``"click_address"`` for a
            per-brand counter.
        brand: Brand name; required for the two per-brand counters.

    Returns:
        ``{"state": "success", "click_time": n}`` on success, otherwise
        ``{"state": <error text>}``.
    """
    try:
        if click_type is None:
            click_time = _increment_click_time('id', 1, 1)
        elif click_type == "card":
            click_time = _increment_click_time('id', 2, 2)
        elif click_type == "card-en":
            # NOTE(review): the row is read by name but written by id 3, as
            # in the original code. Confirm that both address the same row.
            click_time = _increment_click_time('name', "國際貴賓卡-en", 3)
        elif click_type in ("click_url", "click_address") and brand:
            click_time = _increment_brand_click(click_type, brand)
        else:
            # Previously this fell through to an unbound `click_time` and
            # leaked the interpreter's error text to the caller.
            return {
                "state": f"unsupported click_type or missing brand: {click_type}"
            }
        return {"state": "success", "click_time": click_time}
    except Exception as e:
        return {"state": str(e)}


@dbRouter.get("/find_brand")
def find_brand(
    keyword: Optional[str] = None,
    language: str = "ch",
    page_num: Optional[int] = None,
    page_amount: Optional[int] = None,
    search_name: Optional[str] = None,
):
    """List brands, optionally filtered by tags and name, newest id first.

    Args:
        keyword: Comma separated tag fragments; every one must match `tags`.
        language: Value of the `language` column to select.
        page_num: 1-based page number; paging needs `page_amount` as well.
        page_amount: Rows per page.
        search_name: Fragment that must appear in `name`.

    Returns:
        ``{"state": "success", "all_num": total, "data": [...]}`` where
        `all_num` counts the rows before paging, or
        ``{"state": "fail", "message": ...}``.
    """
    try:
        query = supabase.table('101_brand').select('*').eq("language", language)
        if keyword is not None:
            for keyword_tmp in keyword.split(","):
                query = query.like('tags', f'%{keyword_tmp}%')
        if search_name:
            query = query.like('name', f'%{search_name}%')
        query = query.order("id", desc=True)

        # The unpaged result gives the total and doubles as the payload when
        # no page was requested, so the query is not run twice in that case.
        rows = query.execute().data
        count = len(rows)

        if page_num and page_amount:
            offset = (page_num - 1) * page_amount
            rows = query.range(offset, offset + page_amount - 1).execute().data

        result = []
        for shop in rows:
            if language != "ch" and shop["floor"] == "館外":
                shop["floor"] = "outside"
            result.append({"type": shop["type"], "info": shop})
        return {"state": "success", "all_num": count, "data": result}
    except Exception as e:
        return {"state": "fail", "message": str(e)}


@dbRouter.get("/arviews")
def arviews(start: str, end: str, language: str = "ch"):
    """Return the AR view URL and caption for a route.

    Args:
        start: Exact value of `start_loc`.
        end: Fragment that must appear in `tour_place`.
        language: ``"ch"`` selects `url`/`words`; anything else selects
            `en_url`/`en_words`.

    Returns:
        ``{"state": "success", "url": ..., "words": ...}`` for the first
        matching row, or ``{"state": "fail", "message": ...}``.
    """
    try:
        response = (
            supabase.table('101_arviews')
            .select('*')
            .eq('start_loc', start)
            .like('tour_place', f'%{end}%')
            .execute()
        )
        if not response.data:
            # The original reached an unbound `words` here and reported the
            # resulting UnboundLocalError; keep the "fail" state but say why.
            return {"state": "fail", "message": "no this route"}

        route = response.data[0]
        if language == "ch":
            result, words = route["url"], route["words"]
        else:
            result, words = route["en_url"], route["en_words"]
        return {"state": "success", "url": result, "words": words}
    except Exception as e:
        return {"state": "fail", "message": str(e)}


@dbRouter.get("/static_tickets")
async def static_tickets(is_Chinese: Optional[int] = None):
    """Return a fixed set of tickets selected by hard-coded ids.

    Args:
        is_Chinese: Truthy selects ids 1, 3, 6, 7; otherwise 182, 183, 180.

    Returns:
        ``{"state": "success", "result": [...]}`` or a fail payload.
    """
    try:
        ticket_ids = [1, 3, 6, 7] if is_Chinese else [182, 183, 180]
        response = (
            supabase.table('101_ticket')
            .select('*')
            .in_('id', ticket_ids)
            .execute()
        )
        result = [{"type": shop["type"], "info": shop} for shop in response.data]
        return {"state": "success", "result": result}
    except Exception as e:
        return {"state": "fail", "message": str(e)}


@dbRouter.get("/ad/{type}")
def read_root(type: str, language: str = "ch"):
    """Pick one random outside-the-mall brand to advertise.

    Args:
        type: ``"美食伴手禮"`` searches dining tags, anything else lodging
            tags; souvenir tags are always included. The name shadows the
            builtin but is kept because it is the path parameter.
        language: Value of the `language` column to select.

    Returns:
        ``{"data": row}``, or ``{"state": "fail", "message": ...}`` when
        nothing matches or the query fails.
    """
    keyword1 = "餐飲" if type == "美食伴手禮" else "住宿"
    keyword2 = "伴手禮"
    try:
        response = (
            supabase.table('101_brand')
            .select('*')
            .eq("floor", "館外")
            .eq("language", language)
            .or_(f"tags.ilike.%{keyword1}%,tags.ilike.%{keyword2}%")
            .order("id", desc=True)
            .execute()
        )
        result = response.data
        if not result:
            # choice() raises IndexError on an empty list.
            return {"state": "fail", "message": "no data"}

        # Pick one row at random from the result.
        random_row = choice(result)
        if language != "ch" and random_row["floor"] == "館外":
            random_row["floor"] = "outside"
        return {"data": random_row}
    except Exception as e:
        return {"state": "fail", "message": str(e)}


@dbRouter.post("/message_not_in_cache")
def message_not_in_cache(
    question: str,
    answer: str,
    client_id: str = "0",
    language: str = "ch",
):
    """Store a question/answer pair unless the question is already stored.

    Returns:
        ``{"state": 200, "message": "have saved" | "success"}`` or
        ``{"state": 500, "message": ...}``.
    """
    try:
        existing = (
            supabase.table('client_message')
            .select('*')
            .eq("question", question)
            .execute()
        )
        if existing.data:
            return {"state": 200, "message": "have saved"}

        supabase.table('client_message').insert({
            "client_id": client_id,
            "question": question,
            "answer": answer,
            "language": language,
        }).execute()
        return {"state": 200, "message": "success"}
    except Exception as e:
        return {"state": 500, "message": str(e)}


class MessageSaveRequest(BaseModel):
    """JSON body accepted by ``POST /message_save``."""

    question: str
    answer: str
    data_list: str


@dbRouter.post("/message_save")
async def message_save(request: MessageSaveRequest):
    """Insert one question/answer record into `log_record`."""
    try:
        supabase.table("log_record").insert({
            "question": request.question,
            "answer": request.answer,
            "data_list": request.data_list,
        }).execute()
        return {"state": 200, "message": "success"}
    except Exception as e:
        return {"state": 500, "message": str(e)}


# NOTE(review): this reuses the name `message_save`, so the module attribute
# ends up bound to this function. The name is kept because other code may
# import it; both routes stay registered either way.
@dbRouter.post("/message_save_mp3")
async def message_save(
    question: str = Body(None),
    answer: str = Body(None),
    data_list: str = Body(None),
    mp3_file: UploadFile = File(None),
):
    """Insert a `log_record` row and optionally store an uploaded MP3.

    The file is written to ``static/mp3/<stem>_<timestamp>.mp3`` and its
    path, prefixed with ``/``, is stored in `mp3_url`. Without a file the
    column is stored as null.

    Returns:
        ``{"state": 200, "message": "success"}`` or a 500 payload.
    """
    try:
        mp3_url = None
        # Only touch the upload when one was sent; the original read
        # `mp3_file.filename` first and failed on every request without one.
        if mp3_file:
            date_time = datetime.now().strftime('%Y-%m-%d_%H:%M:%S')
            # The filename comes from the client: keep only its last path
            # component so it cannot point outside static/mp3.
            client_name = (mp3_file.filename or "").replace("\\", "/")
            stem = os.path.basename(client_name).split('.')[0] or "upload"
            new_filename = f"{stem}_{date_time}.mp3"

            save_path = os.path.join("static", "mp3", new_filename)
            os.makedirs(os.path.dirname(save_path), exist_ok=True)
            with open(save_path, "wb") as f:
                f.write(await mp3_file.read())
            mp3_url = f"/{save_path}"

        supabase.table("log_record").insert({
            "question": question,
            "answer": answer,
            "data_list": data_list,
            "mp3_url": mp3_url,
        }).execute()
        return {"state": 200, "message": "success"}
    except Exception as e:
        return {"state": 500, "message": str(e)}


class dataform(BaseModel):
    """JSON body accepted by ``POST /insert_table``."""

    title: str
    content: str
    client_name: str
    gender: str
    email: EmailStr
    phone: str
    type: str


def send_email(to_email, from_email, message):
    """Send a plain-text mail through the Gmail API.

    Credentials are cached in ``token.pickle``; when they are missing or
    cannot be refreshed, the installed-app OAuth flow is started using
    ``credentials.json``.

    Args:
        to_email: Recipient address.
        from_email: Sender address placed in the ``from`` header.
        message: Body text.

    Returns:
        ``"success"`` when the API accepted the mail, ``"fail"`` otherwise.
        Errors while obtaining credentials are not caught here.
    """
    creds = None
    # Reuse stored credentials when token.pickle exists.
    # NOTE(security): unpickling runs arbitrary code if this file is ever
    # replaced by an attacker; it must stay writable by the service only.
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)

    # Without valid credentials, refresh them or log in again.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Persist the credentials for the next call.
        with open('token.pickle', 'wb') as token:
            pickle.dump(creds, token)

    service = build('gmail', 'v1', credentials=creds)

    # Compose the mail.
    mail = MIMEText(message)
    mail['to'] = to_email
    mail['from'] = from_email
    mail['subject'] = '101 ai客服 表單新資料'
    raw = base64.urlsafe_b64encode(mail.as_bytes()).decode()

    # Send it.
    try:
        sent = service.users().messages().send(
            userId='me', body={'raw': raw}).execute()
        print(f'已發送郵件: {sent["id"]}')
        return "success"
    except Exception as error:
        print(f'發送郵件時出錯: {error}')
        return "fail"


@dbRouter.post("/insert_table")
def insert_table(data: dataform):
    """Store a submitted form in `lost_property` and mail it to staff.

    A failure while mailing is only printed; the request still reports
    success once the row has been inserted.
    """
    try:
        submitted = data.dict()
        response = supabase.table('lost_property').insert(submitted).execute()
        # Prefer the stored row; fall back to the submitted values when the
        # insert response carries no rows.
        row = response.data[0] if response.data else submitted
        email_content = f"問題類別:{row['type']}\n標題:{row['title']}\n內容:{row['content']}\n姓名:{row['client_name']}\n性別:{row['gender']}\n電子郵件:{row['email']}\n聯絡電話:{row['phone']}\n"
        try:
            send_email("mallservice@tfc101.com.tw", "mia@choozmo.com", str(email_content))
        except Exception as e:
            print(str(e))
        return {"state": 200, "message": "資料已成功提交"}
    except Exception as e:
        return {"state": 500, "message": str(e)}


# Kept at its original position: whether this project module needs the
# environment loaded or the objects above defined at import time cannot be
# verified from this file.
from sherry.semantic_search import ask_question, ask_question_find_brand  # noqa: E402


@dbRouter.post("/video_cache")
async def video_cache(client_message: str, language: str = "ch"):
    """Answer a client message via semantic search plus related records.

    Returns:
        ``{"state": 200, "message": <ask_question result>, "data": <search_date
        result>}``, ``{"state": 500, "message": "no data"}`` when
        `ask_question` returns ``None``, or a 500 payload on error.
    """
    try:
        result = ask_question(client_message, language=language)
        # Skip the keyword search when there is no answer to attach it to.
        if result is None:
            return {"state": 500, "message": "no data"}
        data = await search_date(client_message, language=language)
        return {"state": 200, "message": result, "data": data}
    except Exception as e:
        return {"state": 500, "message": str(e)}


def access_openai(prompt_value):
    """Ask the chat model to translate ``prompt_value`` into Korean."""
    chat_completion = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": f"請將以下的內容翻譯為韓文:\n\n {prompt_value}",
            }
        ],
        model="gpt-3.5-turbo",
    )
    return chat_completion.choices[0].message.content


@dbRouter.post("/translate")
def translate():
    """Translate every Chinese `video_cache` row and store it as Korean.

    Each row costs two model calls (question and answer); the translated
    pair is inserted into `client_message` with language ``"ko"``.
    """
    try:
        response = (
            supabase.table('video_cache')
            .select('*')
            .eq('language', 'ch')
            .execute()
        )
        for data in response.data:
            translated_question = access_openai(data['question'])
            translated_answer = access_openai(data['answer'])
            print(data['question'])
            print(translated_question)
            supabase.table('client_message').insert({
                "client_id": "0",
                "question": translated_question,
                "answer": translated_answer,
                "language": "ko",
            }).execute()
        return {"state": 200}
    except Exception as e:
        return {"state": 500, "message": str(e)}


@lru_cache(maxsize=None)
def _load_spacy_model(name):
    """Load a spaCy pipeline once and reuse it for later requests."""
    return spacy.load(name)


def _extract_keywords(question, language):
    """Split ``question`` into search keywords for a supported language.

    Chinese is segmented with jieba (words of two or more characters, minus
    the stop words). Other languages use spaCy tokens with punctuation and
    whitespace removed; English also drops spaCy stop words. The literal
    ``101`` is always dropped.
    """
    if language == "ch":
        # Only jieba is needed here; the original also loaded a spaCy
        # Chinese model that it never used.
        keywords = [
            word for word in jieba.lcut(question)
            if len(word) > 1 and word != '101' and word not in _CH_STOP_WORDS
        ]
    else:
        doc = _load_spacy_model(_SPACY_MODELS[language])(question)
        keywords = [
            token.text for token in doc
            if token.text != '101'
            and not token.is_punct
            and not token.is_space
            and not (language == "en" and token.is_stop)
        ]
    # Backstop: a keyword containing a reserved character would corrupt the
    # comma separated `or_` filter string.
    return [
        keyword for keyword in keywords
        if keyword.strip() and not _POSTGREST_RESERVED.intersection(keyword)
    ]


def _ilike_any(keywords, *columns):
    """Build an `or_` filter matching any keyword in any of ``columns``."""
    return ",".join(
        f"{column}.ilike.%{keyword}%"
        for keyword in keywords
        for column in columns
    )


@dbRouter.post("/search_date")
async def search_date(question: str, language: str = "ch"):
    """Search tickets and brands for the keywords found in a question.

    Args:
        question: Free text from the client.
        language: One of ``"ch"``, ``"en"``, ``"jp"``, ``"ko"``.

    Returns:
        A list of ``{"type": ..., "info": row}`` items (tickets first, then
        brands), ``None`` when the question yields no keyword, or
        ``{"state": 500, "message": ...}`` on error.
    """
    try:
        exclude_languages = _EXCLUDE_LANGUAGE_TAGS.get(language)
        if exclude_languages is None:
            # Request state used to live in globals, so an unknown language
            # could reuse the keywords of a previous request.
            return {"state": 500, "message": f"unsupported language: {language}"}

        keywords = _extract_keywords(question, language)
        if not keywords:
            return None
        print(keywords)

        # Brands: any keyword in `tags` or `content`.
        brand_results = (
            supabase.from_("101_brand")
            .select("*")
            .eq("language", language)
            .or_(_ilike_any(keywords, "tags", "content"))
            .execute()
        )

        # Tickets: any keyword in `tags` or `description`, available only.
        ticket_query = (
            supabase.from_("101_ticket")
            .select("*")
            .eq("is_avilible", True)
            .or_(_ilike_any(keywords, "tags", "description"))
        )
        # Exclude tickets tagged for other languages.
        for lang in exclude_languages:
            ticket_query = ticket_query.not_.ilike("tags", f"%{lang}%")
        ticket_results = ticket_query.execute()

        return [
            {"type": record.get("type"), "info": record}
            for record in [*ticket_results.data, *brand_results.data]
        ]
    except Exception as e:
        return {"state": 500, "message": str(e)}


@dbRouter.post("/close_not_stage")
async def close_not_stage():
    """Mark every ticket whose URL lacks ``stage101`` as unavailable.

    Returns:
        ``{"state": 200, "message": "success"}`` or a 500 payload.
    """
    try:
        request = supabase.table("101_ticket").select("*").execute()
        for data in request.data:
            # A missing URL cannot point at stage101; treating it as empty
            # also keeps one null row from aborting the whole run.
            if "stage101" not in (data["website_url"] or ""):
                (
                    supabase.table('101_ticket')
                    .update({'is_avilible': False})
                    .eq('id', data["id"])
                    .execute()
                )
                print(data["title"], " close")
        return {"state": 200, "message": "success"}
    except Exception as e:
        return {"state": 500, "message": str(e)}