|
- from fastapi import APIRouter
- from supabase import create_client, Client
- from dotenv import load_dotenv
- import os
- from datetime import datetime
- from random import choice
- from openai import OpenAI
- from typing import Annotated
- from pydantic import Field
- client = OpenAI()
- load_dotenv()
- # supaspace 連線
- url: str = os.environ.get('SUPABASE_URL')
- key: str = os.environ.get('SUPABASE_KEY')
- supabase: Client = create_client(url, key)
- dbRouter = APIRouter()
- @dbRouter.get("/click")
- def add_click_time():
- try:
- response = supabase.table('click_time').select("*").execute()
- click_time = response.data[0]['click_time'] + 1
-
- data, count = supabase.table('click_time') \
- .update({'click_time':click_time,'update_time':str(datetime.now())})\
- .eq('id', 1)\
- .execute()
-
- return {"state":"success","click_time" : click_time}
- except Exception as e:
-
- return {"state":str(e)}
-
- @dbRouter.get("/find_brand")
- def find_brand(keyword:str = None,language :str = "ch",page_num : int = None,page_amount: int = None,search_name : str = None):
- if keyword is None :
- query = supabase.table('101_brand').select('*').eq("language", language)
- else :
- keyword_list = keyword.split(",")
- query= supabase.table('101_brand').select('*').eq("language", language)
- for keyword_tmp in keyword_list :
- query = query.like('tags', f'%{keyword_tmp}%')
- if search_name:
- query = query.like('name', f'%{search_name}%')
- result,_ = query.execute()
- count = len(result[1])
- if page_num and page_amount :
- offset = (page_num - 1) * page_amount
- query = query.range(offset, offset + page_amount-1)
- try:
- data,_ = query.execute()
- result = []
- for shop in data[1] :
- json = {
- "type" : shop["type"],
- "info" : shop
- }
- if language != "ch" :
- if shop["floor"] == "館外" :
- json["info"]["floor"] = "outside"
- result.append(json)
- return {"state":"success","all_num" : count,"data" : result}
- except Exception as e:
- return {"state":"fail","message" :str(e)}
-
- @dbRouter.get("/arviews")
- def arviews(start:str,end:str,language:str = "ch"):
- try :
- data, count = supabase.table('101_arviews')\
- .select('*')\
- .eq('start_loc', start) \
- .like('tour_place', f'%{end}%') \
- .execute()
- result :str
- words :str
- if len(data[1]) != 0:
- if language == "ch" :
- result = data[1][0]["url"]
- words = data[1][0]["words"]
- else:
- result = data[1][0]["en_url"]
- words = data[1][0]["en_words"]
- else :
- result = "no this route"
- return {"state":"success","url" : result,"words" : words}
- except Exception as e:
- return {"state":"fail","message" :str(e)}
- @dbRouter.get("/static_tickets")
- async def static_tickets(is_Chinese : int = None):
- try:
- data =None
- if is_Chinese :
- data, count = supabase.table('101_ticket')\
- .select('*')\
- .in_('id', [1,3,6,7])\
- .execute()
- else :
- data, count = supabase.table('101_ticket')\
- .select('*')\
- .in_('id', [182,183,180])\
- .execute()
- result = []
-
- for shop in data[1] :
- json = {
- "type" : shop["type"],
- "info" : shop
- }
- result.append(json)
- return {"state":"success","result" : result}
- except Exception as e:
- return {"state":"fail","message" :str(e)}
- @dbRouter.get("/ad/{type}")
- def read_root(type:str,language :str = "ch"):
- keyword1 :str
- keyword2 :str
- if type == "美食伴手禮":
- keyword1 = "餐飲"
- keyword2 = "伴手禮"
- else :
- keyword1 = "住宿"
- keyword2 = "伴手禮"
- data, count = supabase.table('101_brand')\
- .select('*')\
- .eq("floor","館外")\
- .eq("language", language)\
- .or_(f"tags.ilike.%{keyword1}%,tags.ilike.%{keyword2}%")\
- .execute()
-
- result = data[1]
- # 從結果中隨機選擇一筆資料
- random_row = choice(result)
- if language != "ch" :
- if random_row["floor"] == "館外" :
- random_row["floor"] = "outside"
- #print(random_row)
-
- return {"data": random_row}
- @dbRouter.post("/message_not_in_cache")
- def message_not_in_cache(question :str ,answer :str,client_id : str = "0" ):
- try:
- data, count = supabase.table('client_message').select('*').eq("question",question).execute()
- if len(data[1]) != 0 :
- return {"state": 200 , "message" : "have saved"}
-
- data, count = supabase.table('client_message').insert({"client_id": client_id, "question": question,"answer":answer}).execute()
-
- return {"state": 200 , "message" : "success"}
-
- except Exception as e:
- return {"state": 500 , "message" : str(e)}
-
- @dbRouter.post("/video_save_into_cache")
- def message_not_in_cache(video_name : Annotated[str, Field(description="檔案請丟進/home/mia/101/static/video_cache/others/資料夾裡")],client_message_id :str = None,question:str = None):
- try:
- data = []
- if client_message_id :
- data, count = supabase.table('client_message').select('*').eq("id",client_message_id).execute()
- elif question:
- data, count = supabase.table('client_message').select('*').eq("question",question).execute()
- response = supabase.table('video_cache').insert({"question": data[1]["question"],"answer":data[1]["answer"],"video_url":f"/static/video_cache/others/{video_name}"}).execute()
-
- response = supabase.table('client_message').delete().eq('id', data[1]["id"]).execute()
-
- return {"state": 200 , "message" : "success"}
-
- except Exception as e:
- return {"state": 500 , "message" : str(e)}
-
- @dbRouter.post("/video_cache")
- def video_cache(client_message :str ):
- try:
- data, count = supabase.table('video_cache').select('*').eq("question",client_message).execute()
- if len(data[1]) == 0 :
- return {"state": 500 , "message" : "no data"}
-
- return {"state": 200 , "message" : data[1]}
-
- except Exception as e:
- return {"state": 500 , "message" : str(e)}
-
|