123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582 |
- from fastapi import APIRouter,UploadFile, File,Body
- from supabase import create_client, Client
- from dotenv import load_dotenv
- import os
- from datetime import datetime
- from random import choice
- from typing import Annotated
- from pydantic import Field
# Pull variables from a local .env file into the process environment.
load_dotenv()

# Supabase connection
url: str = os.getenv("SUPABASE_URL")
key: str = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(url, key)

# Router on which every endpoint in this module is registered.
dbRouter = APIRouter()
# Fixed counters stored in the `click_time` table, keyed by the `click_type`
# query value: (filter used to read the row, filter used to write it back).
_CLICK_TIME_ROWS = {
    None: (("id", 1), ("id", 1)),
    "card": (("id", 2), ("id", 2)),
    # NOTE(review): this counter is read by name but written by id 3 --
    # presumably both address the same row; confirm against the table.
    "card-en": (("name", "國際貴賓卡-en"), ("id", 3)),
}

# Per-brand counter columns in `101_brand`; the column name doubles as the
# accepted `click_type` value.
_BRAND_CLICK_COLUMNS = ("click_url", "click_address")


def _increment_counter(table, column, read_filter, write_filter, extra=None):
    """Add one to ``column`` of one row and return the new value.

    Args:
        table: Supabase table name.
        column: Name of the integer counter column.
        read_filter: ``(column, value)`` equality filter locating the row to read.
        write_filter: ``(column, value)`` equality filter locating the row to update.
        extra: Optional additional columns to write together with the counter.

    Raises:
        LookupError: If no row matches ``read_filter``.
    """
    rows = supabase.table(table).select("*").eq(*read_filter).execute().data
    if not rows:
        raise LookupError(
            f"no row in {table} where {read_filter[0]} = {read_filter[1]!r}"
        )
    # NOTE: read-then-write, so two concurrent requests can lose an increment.
    new_value = rows[0][column] + 1
    payload = {column: new_value, **(extra or {})}
    supabase.table(table).update(payload).eq(*write_filter).execute()
    return new_value


@dbRouter.get("/click")
def add_click_time(click_type: str = None, brand: str = None):
    """Increment a click counter and return its new value.

    Args:
        click_type: ``None``, ``"card"`` or ``"card-en"`` select a fixed row of
            the ``click_time`` table; ``"click_url"`` or ``"click_address"``
            select that column on the ``101_brand`` row named ``brand``.
        brand: Brand name; required for the two per-brand counters.

    Returns:
        ``{"state": "success", "click_time": <new value>}`` on success,
        otherwise ``{"state": <error text>}``.
    """
    try:
        if click_type in _CLICK_TIME_ROWS:
            read_filter, write_filter = _CLICK_TIME_ROWS[click_type]
            click_time = _increment_counter(
                "click_time",
                "click_time",
                read_filter,
                write_filter,
                extra={"update_time": str(datetime.now())},
            )
        elif click_type in _BRAND_CLICK_COLUMNS:
            if not brand:
                # Previously this fell through every branch and surfaced as an
                # unbound-variable error message.
                return {"state": f"brand is required for click_type {click_type!r}"}
            click_time = _increment_counter(
                "101_brand", click_type, ("name", brand), ("name", brand)
            )
        else:
            return {"state": f"unsupported click_type: {click_type!r}"}

        return {"state": "success", "click_time": click_time}
    except Exception as e:
        return {"state": str(e)}
-
@dbRouter.get("/find_brand")
def find_brand(
    keyword: str = None,
    language: str = "ch",
    page_num: int = None,
    page_amount: int = None,
    search_name: str = None,
):
    """List rows of ``101_brand`` for one language, optionally filtered and paged.

    Args:
        keyword: Comma-separated tag fragments; a row must match every one
            (each is applied as ``tags LIKE %fragment%``).
        language: Value matched against the ``language`` column.
        page_num: 1-based page index; pagination applies only when both
            ``page_num`` and ``page_amount`` are truthy.
        page_amount: Page size.
        search_name: Optional fragment matched against ``name``.

    Returns:
        ``{"state": "success", "all_num": <total matches>, "data": [...]}``
        where each item is ``{"type": ..., "info": <row>}``, or
        ``{"state": "fail", "message": <error text>}``.
    """
    try:
        query = supabase.table("101_brand").select("*").eq("language", language)
        if keyword is not None:
            for tag in keyword.split(","):
                query = query.like("tags", f"%{tag}%")
        if search_name:
            query = query.like("name", f"%{search_name}%")
        query = query.order("id", desc=True)

        # The unpaginated run provides the total; it now sits inside the try so
        # a database error yields the same "fail" payload as every other error.
        rows = query.execute().data
        count = len(rows)

        if page_num and page_amount:
            offset = (page_num - 1) * page_amount
            rows = query.range(offset, offset + page_amount - 1).execute().data

        result = []
        for shop in rows:
            # Non-Chinese clients get the "outside the mall" floor label in English.
            if language != "ch" and shop["floor"] == "館外":
                shop["floor"] = "outside"
            result.append({"type": shop["type"], "info": shop})
        return {"state": "success", "all_num": count, "data": result}
    except Exception as e:
        return {"state": "fail", "message": str(e)}
-
@dbRouter.get("/arviews")
def arviews(start: str, end: str, language: str = "ch"):
    """Look up the AR route from ``start`` to a place containing ``end``.

    Queries ``101_arviews`` for rows whose ``start_loc`` equals ``start`` and
    whose ``tour_place`` contains ``end``, and uses the first match.

    Returns:
        ``{"state": "success", "url": ..., "words": ...}`` taken from the
        ``url``/``words`` columns for ``language == "ch"`` and from
        ``en_url``/``en_words`` otherwise. When nothing matches, ``url`` is
        ``"no this route"`` and ``words`` is an empty string. Errors give
        ``{"state": "fail", "message": <error text>}``.
    """
    try:
        rows = (
            supabase.table("101_arviews")
            .select("*")
            .eq("start_loc", start)
            .like("tour_place", f"%{end}%")
            .execute()
            .data
        )
        if not rows:
            # `words` used to be left unbound here, turning this reply into a
            # "fail" carrying an UnboundLocalError message.
            return {"state": "success", "url": "no this route", "words": ""}

        route = rows[0]
        if language == "ch":
            url_key, words_key = "url", "words"
        else:
            url_key, words_key = "en_url", "en_words"
        return {"state": "success", "url": route[url_key], "words": route[words_key]}
    except Exception as e:
        return {"state": "fail", "message": str(e)}
@dbRouter.get("/static_tickets")
async def static_tickets(is_Chinese: int = None):
    """Return a fixed set of rows from ``101_ticket``.

    A truthy ``is_Chinese`` selects ids 1, 3, 6 and 7; otherwise ids 182, 183
    and 180 are selected. Each row is wrapped as ``{"type": ..., "info": row}``.
    """
    ticket_ids = [1, 3, 6, 7] if is_Chinese else [182, 183, 180]
    try:
        data, _count = (
            supabase.table("101_ticket").select("*").in_("id", ticket_ids).execute()
        )
        result = [{"type": ticket["type"], "info": ticket} for ticket in data[1]]
        return {"state": "success", "result": result}
    except Exception as e:
        return {"state": "fail", "message": str(e)}
@dbRouter.get("/ad/{type}")
def read_root(type: str, language: str = "ch"):
    """Return one randomly chosen outside-the-mall brand to advertise.

    Args:
        type: Path segment. ``"美食伴手禮"`` selects brands tagged ``餐飲`` or
            ``伴手禮``; any other value selects ``住宿`` or ``伴手禮``.
        language: Value matched against the ``language`` column.

    Returns:
        ``{"data": <row>}``, or ``{"data": None}`` when no brand matches
        (previously an unhandled IndexError from ``choice``).
    """
    keyword1 = "餐飲" if type == "美食伴手禮" else "住宿"
    keyword2 = "伴手禮"

    rows = (
        supabase.table("101_brand")
        .select("*")
        .eq("floor", "館外")
        .eq("language", language)
        .or_(f"tags.ilike.%{keyword1}%,tags.ilike.%{keyword2}%")
        .order("id", desc=True)
        .execute()
        .data
    )
    if not rows:
        return {"data": None}

    # Pick one row at random from the matches.
    random_row = choice(rows)
    if language != "ch" and random_row["floor"] == "館外":
        random_row["floor"] = "outside"
    return {"data": random_row}
@dbRouter.post("/message_not_in_cache")
def message_not_in_cache(
    question: str, answer: str, client_id: str = "0", language: str = "ch"
):
    """Store a question/answer pair in ``client_message`` unless it is already there.

    A row with the same ``question`` makes this a no-op that reports
    ``"have saved"``; otherwise the pair is inserted and ``"success"`` is
    reported. Any exception is reported with ``state`` 500.
    """
    try:
        existing, _ = (
            supabase.table("client_message")
            .select("*")
            .eq("question", question)
            .execute()
        )
        if existing[1]:
            return {"state": 200, "message": "have saved"}

        new_row = {
            "client_id": client_id,
            "question": question,
            "answer": answer,
            "language": language,
        }
        supabase.table("client_message").insert(new_row).execute()
        return {"state": 200, "message": "success"}
    except Exception as e:
        return {"state": 500, "message": str(e)}
-
- from typing import List, Optional
- from pydantic import BaseModel
-
class MessageSaveRequest(BaseModel):
    """JSON body of ``/message_save``; each field is written to ``log_record``."""

    question: str
    answer: str
    data_list: str
-
@dbRouter.post("/message_save")
async def message_save(request: MessageSaveRequest):
    """Insert the question, answer and data list of ``request`` into ``log_record``.

    Returns ``{"state": 200, "message": "success"}``, or ``state`` 500 with the
    exception text when the insert fails.
    """
    try:
        record = {
            "question": request.question,
            "answer": request.answer,
            "data_list": request.data_list,
        }
        _data, _count = supabase.table("log_record").insert(record).execute()
    except Exception as e:
        return {"state": 500, "message": str(e)}
    else:
        return {"state": 200, "message": "success"}
-
@dbRouter.post("/message_save_mp3")
async def message_save(
    question: str = Body(None),
    answer: str = Body(None),
    data_list: str = Body(None),
    mp3_file: UploadFile = File(None),
):
    """Log a question/answer pair and, when supplied, store its MP3 recording.

    The upload is written to ``static/mp3/<name>_<timestamp>.mp3`` and the
    row inserted into ``log_record`` points at it through ``mp3_url``. Without
    an upload the row is still inserted, with ``mp3_url`` left as ``None``.

    NOTE: this function shares its Python name with the ``/message_save``
    handler; both routes stay registered because the router captures each
    function at decoration time.

    Returns:
        ``{"state": 200, "message": "success"}``, or ``state`` 500 with the
        exception text.
    """
    try:
        mp3_url = None
        if mp3_file is not None:
            date_time = datetime.now().strftime('%Y-%m-%d_%H:%M:%S')
            # The filename is client-controlled: keep only its last path
            # component so an absolute path or sub-directories cannot move the
            # write outside static/mp3.
            client_name = (mp3_file.filename or "").replace("\\", "/")
            stem = os.path.basename(client_name).split('.')[0]
            new_filename = f"{stem}_{date_time}.mp3"

            save_path = os.path.join("static", "mp3", new_filename)
            # Make sure the target directory exists.
            os.makedirs(os.path.dirname(save_path), exist_ok=True)

            with open(save_path, "wb") as f:
                f.write(await mp3_file.read())

            mp3_url = f"/{save_path}"

        supabase.table("log_record").insert({
            "question": question,
            "answer": answer,
            "data_list": data_list,
            "mp3_url": mp3_url,
        }).execute()
        return {"state": 200, "message": "success"}

    except Exception as e:
        return {"state": 500, "message": str(e)}
-
- from pydantic import BaseModel, EmailStr
- import base64
- import pickle
- from email.mime.text import MIMEText
- from google.auth.transport.requests import Request
- from google.oauth2.credentials import Credentials
- from google_auth_oauthlib.flow import InstalledAppFlow
- from googleapiclient.discovery import build
- import os
# OAuth scopes requested when authorising against the Gmail API.
SCOPES = ["https://www.googleapis.com/auth/gmail.send"]
class dataform(BaseModel):
    """Body of ``/insert_table``: one form submission stored in ``lost_property``."""

    title: str         # subject line of the submission
    content: str       # free-text body
    client_name: str   # submitter's name
    gender: str
    email: EmailStr    # validated as an e-mail address by pydantic
    phone: str         # contact phone number
    type: str          # question category
def _load_gmail_credentials():
    """Return usable Gmail credentials, refreshing or re-authorising if needed.

    Credentials are cached in ``token.pickle`` in the working directory. When
    the cache is missing or cannot be refreshed, an interactive OAuth flow
    based on ``credentials.json`` is started and the cache is rewritten.
    """
    creds = None
    if os.path.exists('token.pickle'):
        # NOTE(security): pickle.load runs arbitrary code from the file; this
        # is only safe while token.pickle is writable by this service alone.
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)

    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
        creds = flow.run_local_server(port=0)

    # Persist the new or refreshed credentials for the next call.
    with open('token.pickle', 'wb') as token:
        pickle.dump(creds, token)
    return creds


def send_email(to_email, from_email, message):
    """Send a plain-text notification mail through the Gmail API.

    Args:
        to_email: Recipient address.
        from_email: Address placed in the ``From`` header.
        message: Plain-text body.

    Returns:
        ``"success"`` when the API accepted the message, ``"fail"`` when the
        send call raised. Errors while obtaining credentials or building the
        service are not caught here and propagate to the caller.
    """
    service = build('gmail', 'v1', credentials=_load_gmail_credentials())

    # Build the mail.
    mail = MIMEText(message)
    mail['to'] = to_email
    mail['from'] = from_email
    mail['subject'] = '101 ai客服 表單新資料'
    raw = base64.urlsafe_b64encode(mail.as_bytes()).decode()

    # Send it.
    try:
        sent = service.users().messages().send(userId='me', body={'raw': raw}).execute()
        print(f'已發送郵件: {sent["id"]}')
        return "success"
    except Exception as error:
        print(f'發送郵件時出錯: {error}')
        return "fail"
-
@dbRouter.post("/insert_table")
def insert_table(data: dataform):
    """Store a form submission in ``lost_property`` and e-mail a summary of it.

    The summary is built from the row returned by the insert. A failure while
    sending the mail is printed and does not affect the response; any other
    exception is reported with ``state`` 500.
    """
    # (label shown in the mail, column read from the inserted row)
    labelled_columns = (
        ("問題類別", "type"),
        ("標題", "title"),
        ("內容", "content"),
        ("姓名", "client_name"),
        ("性別", "gender"),
        ("電子郵件", "email"),
        ("聯絡電話", "phone"),
    )
    try:
        response, _count = supabase.table('lost_property').insert(data.dict()).execute()
        inserted = response[1][0]
        email_content = "".join(
            f"{label}:{inserted[column]}\n" for label, column in labelled_columns
        )
        try:
            send_email("mallservice@tfc101.com.tw", "mia@choozmo.com", email_content)
        except Exception as mail_error:
            print(str(mail_error))
        return {"state": 200, "message": "資料已成功提交"}
    except Exception as e:
        return {"state": 500, "message": str(e)}
-
-
- from sherry.semantic_search import ask_question,ask_question_find_brand
-
@dbRouter.post("/video_cache")
async def video_cache(client_message: str, language: str = "ch"):
    """Answer a client message and attach related brand/ticket records.

    Args:
        client_message: The user's question.
        language: Language code forwarded to both lookups.

    Returns:
        ``{"state": 200, "message": <ask_question result>, "data": <search_date
        result>}``; ``{"state": 500, "message": "no data"}`` when
        ``ask_question`` returns ``None``; ``state`` 500 with the exception
        text on error.
    """
    try:
        result = ask_question(client_message, language=language)
        if result is None:
            # Nothing to answer with, so skip the record search whose result
            # would be discarded anyway.
            return {"state": 500, "message": "no data"}

        data = await search_date(client_message, language=language)
        return {"state": 200, "message": result, "data": data}

    except Exception as e:
        return {"state": 500, "message": str(e)}
- from openai import OpenAI
- import json
# Shared OpenAI client. Passing the key explicitly mirrors the library default
# of reading OPENAI_API_KEY, so the argument could be omitted.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
def access_openai(prompt_value):
    """Ask ``gpt-3.5-turbo`` to translate ``prompt_value`` into Korean.

    Sends a single user message and returns the text of the first choice.
    """
    user_message = {
        "role": "user",
        "content": f"請將以下的內容翻譯為韓文:\n\n {prompt_value}",
    }
    completion = client.chat.completions.create(
        messages=[user_message],
        model="gpt-3.5-turbo",
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
@dbRouter.post("/translate")
def translate():
    """Translate every ``ch`` row of ``video_cache`` and store it as a ``ko`` message.

    For each source row the question and the answer are passed through
    ``access_openai`` and the pair is inserted into ``client_message`` with
    ``client_id`` "0". Returns ``{"state": 200}``, or ``state`` 500 with the
    exception text; rows inserted before a failure are not rolled back.
    """
    try:
        source_rows = (
            supabase.table('video_cache').select('*').eq('language', 'ch').execute().data
        )
        for row in source_rows:
            question_ko = access_openai(row['question'])
            answer_ko = access_openai(row['answer'])
            print(row['question'])
            print(question_ko)
            translated_row = {
                "client_id": "0",
                "question": question_ko,
                "answer": answer_ko,
                "language": "ko",
            }
            supabase.table('client_message').insert(translated_row).execute()
    except Exception as e:
        return {"state": 500, "message": str(e)}
    return {"state": 200}
- import spacy
- import jieba
# spaCy pipeline used to tokenise each language. Chinese is absent because it
# is tokenised with jieba.
_SPACY_MODELS = {
    "en": "en_core_web_sm",
    "jp": "ja_core_news_sm",
    "ko": "ko_core_news_sm",
}

# For each request language, the language names whose presence in a ticket's
# `tags` column excludes that ticket from the results.
_EXCLUDED_LANGUAGE_TAGS = {
    "ch": ["韓文", "日文", "英文"],
    "en": ["韓文", "日文", "中文"],
    "jp": ["韓文", "英文", "中文"],
    "ko": ["日文", "英文", "中文"],
}

# Chinese words that are never used as search keywords.
_CH_STOP_WORDS = ("請問", "停車", "收費", "方式")

# Characters that would alter the structure of the comma-separated `or` filter.
_FILTER_RESERVED = frozenset(',()"')

# Loaded spaCy pipelines, keyed by language code.
_NLP_CACHE = {}


def _get_nlp(language):
    """Return the spaCy pipeline for ``language``, loading it on first use only."""
    if language not in _NLP_CACHE:
        _NLP_CACHE[language] = spacy.load(_SPACY_MODELS[language])
    return _NLP_CACHE[language]


def _extract_keywords(question, language):
    """Tokenise ``question`` and keep the tokens worth searching for."""
    if language == "ch":
        words = [
            word
            for word in jieba.lcut(question)
            if len(word) > 1 and word != '101' and word not in _CH_STOP_WORDS
        ]
    else:
        words = [
            token.text
            for token in _get_nlp(language)(question)
            if token.text != '101'
            and not token.is_punct
            and not token.is_space
            # Stop words are dropped for English only.
            and not (language == "en" and token.is_stop)
        ]
    # Drop tokens that would break the filter string, and de-duplicate while
    # keeping first-seen order.
    safe = (word for word in words if not _FILTER_RESERVED.intersection(word))
    return list(dict.fromkeys(safe))


def _ilike_any(columns, keywords):
    """Build an ``or`` filter matching any keyword in any of ``columns``."""
    return ",".join(
        f"{column}.ilike.%{keyword}%" for keyword in keywords for column in columns
    )


def _as_results(rows):
    """Wrap each row as ``{"type": ..., "info": row}``."""
    return [{"type": row.get("type"), "info": row} for row in rows]


@dbRouter.post("/search_date")
async def search_date(question: str, language: str = "ch"):
    """Find tickets and brands whose text matches keywords from ``question``.

    Keywords are matched case-insensitively against ``tags``/``content`` of
    ``101_brand`` (restricted to ``language``) and ``tags``/``description`` of
    available ``101_ticket`` rows. Tickets tagged with another language's name
    are excluded; no such exclusion is applied to brands.

    Args:
        question: Free-text user question.
        language: One of ``"ch"``, ``"en"``, ``"jp"``, ``"ko"``.

    Returns:
        A list of ``{"type": ..., "info": row}`` with tickets first and brands
        after; ``None`` when no keyword could be extracted; or
        ``{"state": 500, "message": ...}`` for an unsupported language or any
        error.
    """
    try:
        if language not in _EXCLUDED_LANGUAGE_TAGS:
            return {"state": 500, "message": f"unsupported language: {language}"}

        keywords = _extract_keywords(question, language)
        if not keywords:
            return None

        brand_results = (
            supabase.from_("101_brand")
            .select("*")
            .eq("language", language)
            .or_(_ilike_any(("tags", "content"), keywords))
            .execute()
        )

        ticket_query = (
            supabase.from_("101_ticket")
            .select("*")
            .eq("is_avilible", True)
            .or_(_ilike_any(("tags", "description"), keywords))
        )
        for lang in _EXCLUDED_LANGUAGE_TAGS[language]:
            ticket_query = ticket_query.not_.ilike("tags", f"%{lang}%")
        ticket_results = ticket_query.execute()

        return _as_results(ticket_results.data) + _as_results(brand_results.data)

    except Exception as e:
        return {"state": 500, "message": str(e)}
@dbRouter.post("/close_not_stage")
async def close_not_stage():
    """Mark every ticket whose ``website_url`` lacks ``"stage101"`` as unavailable.

    Reads all rows of ``101_ticket`` and sets ``is_avilible`` to ``False`` on
    each row whose URL does not contain ``"stage101"``. A row with no URL is
    treated as not containing it, and is therefore closed.

    Returns:
        ``{"state": 200, "message": "success"}``, or ``state`` 500 with the
        exception text.
    """
    try:
        tickets = supabase.table("101_ticket").select("*").execute().data

        for ticket in tickets:
            # `or ""` keeps a null URL from raising on the containment test
            # and aborting the remaining updates.
            if "stage101" in (ticket["website_url"] or ""):
                continue
            supabase.table('101_ticket') \
                .update({'is_avilible': False}) \
                .eq('id', ticket["id"]) \
                .execute()
            print(ticket["title"], " close")

        return {"state": 200, "message": "success"}
    except Exception as e:
        return {"state": 500, "message": str(e)}
|