"""Database-backed API routes (Supabase) exposed through ``dbRouter``.

Every handler reports failures in the response body instead of raising, so
callers must inspect the ``state`` field of the returned JSON.
"""
import os
from datetime import datetime
from random import choice
from typing import Annotated

from dotenv import load_dotenv
from fastapi import APIRouter
from fastapi.concurrency import run_in_threadpool
from openai import OpenAI
from pydantic import Field
from supabase import Client, create_client

# Load .env BEFORE building any client: OpenAI() resolves its API key from the
# environment at construction time, so loading afterwards is too late when the
# key is only defined in the .env file.
load_dotenv()

client = OpenAI()

# Supabase connection
url: str = os.environ.get('SUPABASE_URL')
key: str = os.environ.get('SUPABASE_KEY')
supabase: Client = create_client(url, key)

dbRouter = APIRouter()


def _localize_floor(shop, language):
    """Replace the Chinese "outside the mall" floor label for non-"ch" languages.

    Mutates ``shop`` in place.
    """
    if language != "ch" and shop["floor"] == "館外":
        shop["floor"] = "outside"


@dbRouter.get("/click")
def add_click_time():
    """Increment the click counter stored in row ``id = 1`` of ``click_time``.

    Returns:
        ``{"state": "success", "click_time": <new value>}`` on success,
        otherwise ``{"state": <error text>}``.
    """
    try:
        # Read the same row that is updated below; previously the first row of
        # an unfiltered select was read while row id=1 was written.
        response = (
            supabase.table('click_time').select("*").eq('id', 1).execute()
        )
        click_time = response.data[0]['click_time'] + 1
        # NOTE(review): read-modify-write is not atomic; concurrent requests
        # can lose increments. A database-side increment would avoid that.
        supabase.table('click_time') \
            .update({'click_time': click_time, 'update_time': str(datetime.now())}) \
            .eq('id', 1) \
            .execute()
        return {"state": "success", "click_time": click_time}
    except Exception as e:
        return {"state": str(e)}


@dbRouter.get("/find_brand")
def find_brand(keyword: str = None, language: str = "ch", page_num: int = None,
               page_amount: int = None, search_name: str = None):
    """Search ``101_brand`` by tags and/or name, with optional pagination.

    Args:
        keyword: Comma-separated tag fragments; a row must match all of them.
        language: Value matched against the ``language`` column.
        page_num: 1-based page index; used only together with ``page_amount``.
        page_amount: Page size; used only together with ``page_num``.
        search_name: Fragment matched against the ``name`` column.

    Returns:
        ``{"state": "success", "all_num": <unpaginated match count>,
        "data": [{"type": ..., "info": <row>}, ...]}`` or
        ``{"state": "fail", "message": <error text>}``.
    """
    try:
        query = supabase.table('101_brand').select('*').eq("language", language)
        if keyword is not None:
            for keyword_tmp in keyword.split(","):
                query = query.like('tags', f'%{keyword_tmp}%')
        if search_name:
            query = query.like('name', f'%{search_name}%')

        # The unpaginated fetch supplies the total; it now runs inside the try
        # so a database error yields the "fail" payload like everything else.
        rows = query.execute().data
        count = len(rows)

        # Only hit the database a second time when a page was requested.
        if page_num and page_amount:
            offset = (page_num - 1) * page_amount
            rows = query.range(offset, offset + page_amount - 1).execute().data

        result = []
        for shop in rows:
            _localize_floor(shop, language)
            result.append({"type": shop["type"], "info": shop})
        return {"state": "success", "all_num": count, "data": result}
    except Exception as e:
        return {"state": "fail", "message": str(e)}


@dbRouter.get("/arviews")
def arviews(start: str, end: str, language: str = "ch"):
    """Look up the AR route from ``start`` to a tour place containing ``end``.

    Returns:
        ``{"state": "success", "url": ..., "words": ...}`` using the first
        matching row (Chinese columns for ``language == "ch"``, ``en_*``
        columns otherwise). When nothing matches, ``url`` is
        ``"no this route"`` and ``words`` is empty. On error,
        ``{"state": "fail", "message": <error text>}``.
    """
    try:
        rows = supabase.table('101_arviews') \
            .select('*') \
            .eq('start_loc', start) \
            .like('tour_place', f'%{end}%') \
            .execute().data

        # Defaults for the "no match" case. ``words`` used to be left unbound
        # here, which turned the intended answer into a "fail" response.
        result = "no this route"
        words = ""
        if rows:
            route = rows[0]
            if language == "ch":
                result = route["url"]
                words = route["words"]
            else:
                result = route["en_url"]
                words = route["en_words"]
        return {"state": "success", "url": result, "words": words}
    except Exception as e:
        return {"state": "fail", "message": str(e)}


@dbRouter.get("/static_tickets")
async def static_tickets(is_Chinese: int = None):
    """Return a fixed set of rows from ``101_ticket``.

    Args:
        is_Chinese: Truthy selects ids 1, 3, 6, 7; otherwise ids 182, 183, 180.

    Returns:
        ``{"state": "success", "result": [{"type": ..., "info": <row>}, ...]}``
        or ``{"state": "fail", "message": <error text>}``.
    """
    try:
        ticket_ids = [1, 3, 6, 7] if is_Chinese else [182, 183, 180]
        # The Supabase client is synchronous; run it in the thread pool so
        # this coroutine does not block the event loop while waiting.
        response = await run_in_threadpool(
            lambda: supabase.table('101_ticket')
            .select('*')
            .in_('id', ticket_ids)
            .execute()
        )
        result = [{"type": shop["type"], "info": shop} for shop in response.data]
        return {"state": "success", "result": result}
    except Exception as e:
        return {"state": "fail", "message": str(e)}


@dbRouter.get("/ad/{type}")
def read_root(type: str, language: str = "ch"):
    """Return one random out-of-mall brand row to show as an advertisement.

    Args:
        type: Ad category from the URL path. "美食伴手禮" selects rows tagged
            "餐飲" or "伴手禮"; any other value selects "住宿" or "伴手禮".
            (The name shadows the builtin but must match the path parameter.)
        language: Value matched against the ``language`` column.

    Returns:
        ``{"data": <row>}`` on success, otherwise
        ``{"state": "fail", "message": <reason>}``.
    """
    keyword1 = "餐飲" if type == "美食伴手禮" else "住宿"
    keyword2 = "伴手禮"
    try:
        rows = supabase.table('101_brand') \
            .select('*') \
            .eq("floor", "館外") \
            .eq("language", language) \
            .or_(f"tags.ilike.%{keyword1}%,tags.ilike.%{keyword2}%") \
            .execute().data
    except Exception as e:
        return {"state": "fail", "message": str(e)}

    # choice() raises IndexError on an empty sequence.
    if not rows:
        return {"state": "fail", "message": "no data"}

    # Pick one row at random from the result.
    random_row = choice(rows)
    _localize_floor(random_row, language)
    return {"data": random_row}


@dbRouter.post("/message_not_in_cache")
def message_not_in_cache(question: str, answer: str, client_id: str = "0"):
    """Store a question/answer pair in ``client_message`` unless it already exists.

    Returns:
        ``{"state": 200, "message": "have saved"}`` when a row with the same
        question exists, ``{"state": 200, "message": "success"}`` after an
        insert, or ``{"state": 500, "message": <error text>}``.
    """
    try:
        existing = supabase.table('client_message') \
            .select('*') \
            .eq("question", question) \
            .execute().data
        if existing:
            return {"state": 200, "message": "have saved"}
        supabase.table('client_message') \
            .insert({"client_id": client_id, "question": question, "answer": answer}) \
            .execute()
        return {"state": 200, "message": "success"}
    except Exception as e:
        return {"state": 500, "message": str(e)}


# Previously also named ``message_not_in_cache``, which shadowed the handler
# above at module level; the route path is unchanged.
@dbRouter.post("/video_save_into_cache")
def video_save_into_cache(
    video_name: Annotated[str, Field(description="檔案請丟進/home/mia/101/static/video_cache/others/資料夾裡")],
    client_message_id: str = None,
    question: str = None,
):
    """Move a ``client_message`` row into ``video_cache`` with its video URL.

    The source row is looked up by ``client_message_id`` when given, otherwise
    by ``question``. The first match is copied into ``video_cache`` and then
    deleted from ``client_message``.

    Returns:
        ``{"state": 200, "message": "success"}`` or
        ``{"state": 500, "message": <reason>}``.
    """
    try:
        lookup = supabase.table('client_message').select('*')
        if client_message_id:
            lookup = lookup.eq("id", client_message_id)
        elif question:
            lookup = lookup.eq("question", question)
        else:
            return {"state": 500, "message": "client_message_id or question is required"}

        rows = lookup.execute().data
        if not rows:
            return {"state": 500, "message": "no data"}

        # The query returns a list of rows; the original indexed that list
        # with string keys, so this endpoint always failed with a TypeError.
        message = rows[0]
        supabase.table('video_cache').insert({
            "question": message["question"],
            "answer": message["answer"],
            "video_url": f"/static/video_cache/others/{video_name}",
        }).execute()
        supabase.table('client_message').delete().eq('id', message["id"]).execute()
        return {"state": 200, "message": "success"}
    except Exception as e:
        return {"state": 500, "message": str(e)}


@dbRouter.post("/video_cache")
def video_cache(client_message: str):
    """Return the ``video_cache`` rows whose question equals ``client_message``.

    Returns:
        ``{"state": 200, "message": [<row>, ...]}`` when at least one row
        matches, ``{"state": 500, "message": "no data"}`` when none does, or
        ``{"state": 500, "message": <error text>}`` on error.
    """
    try:
        rows = supabase.table('video_cache') \
            .select('*') \
            .eq("question", client_message) \
            .execute().data
        if not rows:
            return {"state": 500, "message": "no data"}
        return {"state": 200, "message": rows}
    except Exception as e:
        return {"state": 500, "message": str(e)}