Kaynağa Gözat

Initial commit

conrad 3 ay önce
işleme
d6dafdf872

+ 4 - 0
.gitignore

@@ -0,0 +1,4 @@
+.env
+chroma_db/
+__pycache__/
+speech_audio/

+ 53 - 0
api/openai_scripts_chinese/audio_processing.py

@@ -0,0 +1,53 @@
+from openai import OpenAI
+from api.openai_scripts_chinese.config import SYSTEM_PROMPT, OPENAI_API_KEY, SUPABASE_KEY, SUPABASE_URL
+from supabase import create_client, Client
+from api.openai_scripts_chinese.text_processing import fuzzy_correct_chinese
+
+client = OpenAI(api_key=OPENAI_API_KEY)
+supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
+
+def transcribe(audio_file):
+    try:
+        table_name = "word_database"
+        response = supabase.table(table_name).select("term").execute()
+        custom_vocab = []
+        if response.data:
+            for item in response.data:
+                custom_vocab.append({item['term']})
+        else:
+            print(f"No data found or an error occurred: {response.error}")
+            print("Using default dictionary as Supabase data couldn't be fetched.")
+        transcript = client.audio.transcriptions.create(
+            file=audio_file,
+            model="whisper-1",
+            response_format="text", 
+            prompt=f"請注意以下詞彙:{custom_vocab}"
+        )
+        return transcript
+    except Exception as e:
+        print(f"轉錄時發生錯誤:{str(e)}")
+        return None
+
+def post_process_transcript(transcript, temperature=0):
+    corrected_transcript = fuzzy_correct_chinese(transcript)
+    
+    messages = [
+        {"role": "system", "content": SYSTEM_PROMPT},
+        {"role": "user", "content": f"請校對並修正以下轉錄文本,但不要改變其原意或回答問題:\n\n{corrected_transcript}"}
+    ]
+
+    response = client.chat.completions.create(
+        model="gpt-4",
+        temperature=temperature,
+        messages=messages
+    )
+
+    return response.choices[0].message.content
+
+def process_audio(audio_data):
+    raw_transcript = transcribe(audio_data)
+    print(raw_transcript)
+    if raw_transcript is None:
+        return None, None
+    corrected_transcript = post_process_transcript(raw_transcript)
+    return raw_transcript, corrected_transcript

+ 31 - 0
api/openai_scripts_chinese/config.py

@@ -0,0 +1,31 @@
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+SUPABASE_URL: str = os.getenv('SUPABASE_URL')
+SUPABASE_KEY: str = os.getenv('SUPABASE_KEY')
+OPENAI_API_KEY: str = os.getenv('OPENAI_API_KEY')
+
+if not SUPABASE_URL or not SUPABASE_KEY:
+    raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in the .env file")
+
+
+SYSTEM_PROMPT = """你是一位專業的中文轉錄校對助理,專門處理有關長照/日照中心的中文對話轉錄。
+你的任務是:
+1. 確保以下專業術語的準確性:垃圾、冷氣、電風扇、喝水、倒垃圾、上廁所、上班、看孫子、走路、起床、幾點、毛筆、冰箱、哥哥、妹妹、菜市場、夜市、工作、老闆。
+2. 在必要時添加適當的標點符號,如句號、逗號
+3. 使用台灣的繁體中文,確保語言表達符合台灣的用語習慣。
+4. 只更正明顯的錯誤或改善可讀性,不要改變原文的意思或結構。
+5. 不要回答問題、解釋概念或添加任何不在原文中的信息。
+6. 如果原文是一個問句,保持它的問句形式,不要提供答案。
+
+請只根據提供的原文進行必要的更正,不要添加或刪除任何實質性內容。在修正時,請特別注意上下文,確保修正後的詞語符合整句話的語境。"""
+
+CORRECT_TERMS = [
+    "日照中心", "吃飯", "垃圾", "冷氣", "孫子", "電風扇", "喝水", "倒垃圾", "上廁所", "上班", "看孫子", "聯繫", "走路", "起床", "幾點", "毛筆", "電視", "冰箱", "廚房", "可以", "哥哥", "妹妹", "夜市", "工作", "老闆"
+]
+
+ERROR_CORRECTION = {
+}
+

+ 27 - 0
api/openai_scripts_chinese/dictionary_loader.py

@@ -0,0 +1,27 @@
+import io
+import jieba
+from supabase import create_client, Client
+from api.openai_scripts_chinese.config import SUPABASE_URL, SUPABASE_KEY
+
+supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
+
+
+def load_word_database_dictionary_from_supabase():
+    table_name = "word_database"
+    response = supabase.table(table_name).select("term, weight, type").execute()
+    
+    if response.data:
+        dict_data = io.StringIO()
+        for item in response.data:
+            dict_data.write(f"{item['term']} {item['weight']} {item['type']}\n")
+        
+        dict_data.seek(0)
+        jieba.load_userdict(dict_data)
+        # print("Loaded dictionary from Supabase")
+        return True
+    else:
+        print(f"No data found or an error occurred: {response.error}")
+        print("Using default dictionary as Supabase data couldn't be fetched.")
+        return False
+
+

+ 47 - 0
api/openai_scripts_chinese/main_script.py

@@ -0,0 +1,47 @@
+import sys
+from api.openai_scripts_chinese.dictionary_loader import load_word_database_dictionary_from_supabase
+from api.openai_scripts_chinese.audio_processing import process_audio
+
+def initialize():
+    word_database_success = load_word_database_dictionary_from_supabase()
+    if not word_database_success:
+        print("Warning: Word Database Dictionary loading failed. Proceeding with default dictionary.")
+
+
+def process_audio_file(audio_file):
+    try:
+        result = process_audio(audio_file)
+        if isinstance(result, tuple) and len(result) == 2:
+            return result
+        else:
+            print("Unexpected result from process_audio")
+            return None, None
+    except Exception as e:
+        print(f"Error processing audio: {str(e)}")
+        return None, None
+    
+# 加入檢查user是否詢問特定問題
+def main(audio_file):
+    initialize()
+    try:
+        raw_transcript, corrected_transcript = process_audio_file(audio_file)
+        
+        return raw_transcript, corrected_transcript
+    except FileNotFoundError:
+        print(f"Error: The file was not found.")
+        return None, None
+    except Exception as e:
+        print(f"An unexpected error occurred: {str(e)}")
+        import traceback
+        traceback.print_exc()
+        return None, None
+        
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage: python script_name.py <audio_file_path>")
+        sys.exit(1)
+    
+    audio_file_path = sys.argv[1]
+    main(audio_file_path)

+ 72 - 0
api/openai_scripts_chinese/text_processing.py

@@ -0,0 +1,72 @@
+import jieba
+from pypinyin import pinyin, Style
+from api.openai_scripts_chinese.config import CORRECT_TERMS, ERROR_CORRECTION
+
+def chinese_soundex(pinyin_str):
+    soundex_map = {
+        'b': '1', 'p': '1', 'm': '1', 'f': '1',
+        'd': '2', 'n': '2', 'l': '2',
+        'g': '3', 'k': '3', 'h': '3', 't': '3',
+        'j': '4', 'q': '4', 'x': '4',
+        'zh': '5', 'ch': '5', 'sh': '5', 'r': '5',
+        'z': '6', 'c': '6', 's': '6',
+        'an': '7', 'ang': '7', 'en': '8', 'eng': '8', 'in': '8', 'ing': '8',
+        'ong': '9', 'un': '9', 'uan': '9',
+        'i': 'A', 'u': 'A', 'v': 'A',
+        'e': 'B', 'o': 'B',
+    }
+    
+    code = ''
+    tone = '0'
+    i = 0
+    while i < len(pinyin_str):
+        if pinyin_str[i:i+2] in soundex_map:
+            code += soundex_map[pinyin_str[i:i+2]]
+            i += 2
+        elif pinyin_str[i] in soundex_map:
+            code += soundex_map[pinyin_str[i]]
+            i += 1
+        elif pinyin_str[i].isdigit():
+            tone = pinyin_str[i]
+            i += 1
+        else:
+            i += 1
+    
+    code = code[:1] + ''.join(sorted(set(code[1:])))
+    return (code[:3] + tone).ljust(4, '0')
+
+def compare_chinese_words(word1, word2, tone_sensitive=True):
+    pinyin1 = ''.join([p[0] for p in pinyin(word1, style=Style.TONE3, neutral_tone_with_five=True)])
+    pinyin2 = ''.join([p[0] for p in pinyin(word2, style=Style.TONE3, neutral_tone_with_five=True)])
+    
+    soundex1 = chinese_soundex(pinyin1)
+    # print(soundex1)
+    soundex2 = chinese_soundex(pinyin2)
+    # print('soundex2', soundex2)
+
+    if tone_sensitive:
+        return soundex1 == soundex2
+    else:
+        return soundex1[:3] == soundex2[:3]
+
+def fuzzy_correct_chinese(text):
+    words = jieba.lcut(text)
+    corrected_words = []
+    for word in words:
+        if word.isalpha():
+            corrected_words.append(word)
+            continue
+        word_pinyin = ''.join([p[0] for p in pinyin(word, style=Style.NORMAL)])
+        # print(f"Term: {word}, Pinyin: {word_pinyin}")
+        if word in ERROR_CORRECTION:
+            corrected_words.append(ERROR_CORRECTION[word])
+        else:
+            for term in CORRECT_TERMS:
+                if compare_chinese_words(word, term, tone_sensitive=True):
+                    # print(f"corrected: {word} -> {term}")
+                    corrected_words.append(term)
+                    break
+            else:
+                corrected_words.append(word)
+    return ''.join(corrected_words)
+

+ 97 - 0
api/openai_scripts_new/audio_processing.py

@@ -0,0 +1,97 @@
+from openai import OpenAI
+from api.openai_scripts_new.config import SYSTEM_PROMPT, OPENAI_API_KEY, SUPABASE_KEY, SUPABASE_URL
+from supabase import create_client, Client
+from api.openai_scripts_new.text_processing import fuzzy_correct_chinese
+from transformers import pipeline
+from langchain_core.prompts import ChatPromptTemplate
+from langchain_community.callbacks import get_openai_callback
+from langchain_core.output_parsers import StrOutputParser
+from langchain_openai import ChatOpenAI
+import torchaudio
+import torch
+
+client = OpenAI(api_key=OPENAI_API_KEY)
+supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
+
+pipe = pipeline(model="linshoufan/linshoufanfork-whisper-small-nan-tw-pinyin")
+
+def transcribe(audio_data):
+    try:
+        # table_name = "word_database"
+        # response = supabase.table(table_name).select("term").execute()
+        # custom_vocab = []
+        # if response.data:
+        #     for item in response.data:
+        #         custom_vocab.append({item['term']})
+        # else:
+        #     print(f"No data found or an error occurred: {response.error}")
+        #     print("Using default dictionary as Supabase data couldn't be fetched.")
+        # 如果音頻是立體聲,轉換為單聲道
+        text = pipe(audio_data)["text"]
+        print(transcript)
+        # 翻譯台羅拼音為繁體中文
+        model_name = "gpt-4o"
+        llm = ChatOpenAI(model_name=model_name, temperature=0.7, api_key=OPENAI_API_KEY, max_tokens=4096)
+        with get_openai_callback() as cb:
+            qa_system_prompt = f"""你是一個專門翻譯台羅拼音的助理,可以將台語音精準的轉換成在繁體中文中的意思.
+    你是一名資深的大語言模型領域的專家,精通模型架構原理和落地應用實踐,只需要翻譯成繁體中文即可."""
+
+            qa_prompt = ChatPromptTemplate.from_messages(
+                [
+                    ("system", qa_system_prompt),
+                    ("human", "{transcript}"),
+                ]
+            )
+
+            rag_chain = (
+                qa_prompt
+                | llm
+                | StrOutputParser()
+            )
+
+            # session_id = "abc123"  # 這應該是從某個上下文獲取的動態值
+            # chat_history = get_session_history(session_id)
+
+            text = rag_chain.invoke(
+                {"transcript": transcript}
+            )
+
+            # # 更新聊天歷史
+            # chat_history.add_user_message(inp)
+            # chat_history.add_ai_message(text)
+            # # chat_history.add_message({'role':HumanMessage(content=input), 'message':AIMessage(content=text)})
+            # save_session_history(session_id, chat_history)
+            print(f"Total Tokens: {cb.total_tokens}")
+            print(f"Prompt Tokens: {cb.prompt_tokens}")
+            print(f"Completion Tokens: {cb.completion_tokens}")
+            print(f"Total Cost (USD): ${cb.total_cost}")
+
+        return text
+    except Exception as e:
+        print(f"轉錄時發生錯誤:{str(e)}")
+        return None
+
+def post_process_transcript(transcript, temperature=0):
+    corrected_transcript = fuzzy_correct_chinese(transcript)
+    
+    messages = [
+        {"role": "system", "content": SYSTEM_PROMPT},
+        {"role": "user", "content": f"請校對並修正以下轉錄文本,但不要改變其原意或回答問題:\n\n{corrected_transcript}"}
+    ]
+
+    response = client.chat.completions.create(
+        model="gpt-4",
+        temperature=temperature,
+        messages=messages
+    )
+
+    return response.choices[0].message.content
+
+def process_audio(audio_data):
+    raw_transcript = transcribe(audio_data)
+    print(raw_transcript)
+    if raw_transcript is None:
+        return None, None
+    corrected_transcript = post_process_transcript(raw_transcript)
+    return raw_transcript, corrected_transcript
+

+ 33 - 0
api/openai_scripts_new/config.py

@@ -0,0 +1,33 @@
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+SUPABASE_URL: str = os.getenv('SUPABASE_URL')
+SUPABASE_KEY: str = os.getenv('SUPABASE_KEY')
+OPENAI_API_KEY: str = os.getenv('OPENAI_API_KEY')
+
+if not SUPABASE_URL or not SUPABASE_KEY:
+    raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in the .env file")
+
+
+SYSTEM_PROMPT = """你是一位專業的閩南語轉錄為中文校對助理,專門處理有關長照/日照中心的台語對話轉錄。
+你的任務是:
+1. 確保以下專業術語在台語拼音轉換時的準確性:什麼、看什麼、可以、什麼意思、垃圾、冷氣、電風扇、喝水、倒垃圾、上廁所、上班、看孫子、走路、起床、幾點、毛筆、冰箱、哥哥、妹妹、菜市場、夜市、工作、老闆。
+2. 在必要時添加適當的標點符號,如句號、逗號
+3. 使用台灣的繁體中文,確保語言表達符合台灣的用語習慣。
+4. 只更正明顯的錯誤或改善可讀性,不要改變原文的意思或結構。
+5. 不要回答問題、解釋概念或添加任何不在原文中的信息。
+6. 如果原文是一個問句,保持它的問句形式,不要提供答案。
+
+請只根據提供的原文進行必要的更正,不要添加或刪除任何實質性內容。在修正時,請特別注意上下文,確保修正後的詞語符合整句話的語境。"""
+
+CORRECT_TERMS = [
+    "日照中心", "吃飯", "垃圾", "冷氣", "孫子", "電風扇", "喝水", "倒垃圾", "上廁所", "上班", "看孫子", "聯繫", "走路", "起床", "幾點", "毛筆", "電視", "冰箱", "廚房", "可以", "哥哥", "妹妹", "夜市", "工作", "老闆", "什麼"
+]
+
+ERROR_CORRECTION = {
+    '三歲' : "什麼", 
+    '跨三回' : '看什麼'
+}
+

+ 27 - 0
api/openai_scripts_new/dictionary_loader.py

@@ -0,0 +1,27 @@
+import io
+import jieba
+from supabase import create_client, Client
+from api.openai_scripts_new.config import SUPABASE_URL, SUPABASE_KEY
+
+supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
+
+
+def load_word_database_dictionary_from_supabase():
+    table_name = "word_database"
+    response = supabase.table(table_name).select("term, weight, type").execute()
+    
+    if response.data:
+        dict_data = io.StringIO()
+        for item in response.data:
+            dict_data.write(f"{item['term']} {item['weight']} {item['type']}\n")
+        
+        dict_data.seek(0)
+        jieba.load_userdict(dict_data)
+        # print("Loaded dictionary from Supabase")
+        return True
+    else:
+        print(f"No data found or an error occurred: {response.error}")
+        print("Using default dictionary as Supabase data couldn't be fetched.")
+        return False
+
+

+ 47 - 0
api/openai_scripts_new/main_script.py

@@ -0,0 +1,47 @@
+import sys
+from api.openai_scripts_new.dictionary_loader import load_word_database_dictionary_from_supabase
+from api.openai_scripts_new.audio_processing import process_audio
+
+def initialize():
+    word_database_success = load_word_database_dictionary_from_supabase()
+    if not word_database_success:
+        print("Warning: Word Database Dictionary loading failed. Proceeding with default dictionary.")
+
+
+def process_audio_file(audio_data):
+    try:
+        result = process_audio(audio_data)
+        if isinstance(result, tuple) and len(result) == 2:
+            return result
+        else:
+            print("Unexpected result from process_audio")
+            return None, None
+    except Exception as e:
+        print(f"Error processing audio: {str(e)}")
+        return None, None
+    
+# 加入檢查user是否詢問特定問題
+def main(audio_data):
+    initialize()
+    try:
+        raw_transcript, corrected_transcript = process_audio_file(audio_data)
+        
+        return raw_transcript, corrected_transcript
+    except FileNotFoundError:
+        print(f"Error: The file was not found.")
+        return None, None
+    except Exception as e:
+        print(f"An unexpected error occurred: {str(e)}")
+        import traceback
+        traceback.print_exc()
+        return None, None
+        
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage: python script_name.py <audio_file_path>")
+        sys.exit(1)
+    
+    audio_file_path = sys.argv[1]
+    main(audio_file_path)

+ 72 - 0
api/openai_scripts_new/text_processing.py

@@ -0,0 +1,72 @@
+import jieba
+from pypinyin import pinyin, Style
+from api.openai_scripts_new.config import CORRECT_TERMS, ERROR_CORRECTION
+
+def chinese_soundex(pinyin_str):
+    soundex_map = {
+        'b': '1', 'p': '1', 'm': '1', 'f': '1',
+        'd': '2', 'n': '2', 'l': '2',
+        'g': '3', 'k': '3', 'h': '3', 't': '3',
+        'j': '4', 'q': '4', 'x': '4',
+        'zh': '5', 'ch': '5', 'sh': '5', 'r': '5',
+        'z': '6', 'c': '6', 's': '6',
+        'an': '7', 'ang': '7', 'en': '8', 'eng': '8', 'in': '8', 'ing': '8',
+        'ong': '9', 'un': '9', 'uan': '9',
+        'i': 'A', 'u': 'A', 'v': 'A',
+        'e': 'B', 'o': 'B',
+    }
+    
+    code = ''
+    tone = '0'
+    i = 0
+    while i < len(pinyin_str):
+        if pinyin_str[i:i+2] in soundex_map:
+            code += soundex_map[pinyin_str[i:i+2]]
+            i += 2
+        elif pinyin_str[i] in soundex_map:
+            code += soundex_map[pinyin_str[i]]
+            i += 1
+        elif pinyin_str[i].isdigit():
+            tone = pinyin_str[i]
+            i += 1
+        else:
+            i += 1
+    
+    code = code[:1] + ''.join(sorted(set(code[1:])))
+    return (code[:3] + tone).ljust(4, '0')
+
+def compare_chinese_words(word1, word2, tone_sensitive=True):
+    pinyin1 = ''.join([p[0] for p in pinyin(word1, style=Style.TONE3, neutral_tone_with_five=True)])
+    pinyin2 = ''.join([p[0] for p in pinyin(word2, style=Style.TONE3, neutral_tone_with_five=True)])
+    
+    soundex1 = chinese_soundex(pinyin1)
+    # print(soundex1)
+    soundex2 = chinese_soundex(pinyin2)
+    # print('soundex2', soundex2)
+
+    if tone_sensitive:
+        return soundex1 == soundex2
+    else:
+        return soundex1[:3] == soundex2[:3]
+
+def fuzzy_correct_chinese(text):
+    words = jieba.lcut(text)
+    corrected_words = []
+    for word in words:
+        if word.isalpha():
+            corrected_words.append(word)
+            continue
+        word_pinyin = ''.join([p[0] for p in pinyin(word, style=Style.NORMAL)])
+        # print(f"Term: {word}, Pinyin: {word_pinyin}")
+        if word in ERROR_CORRECTION:
+            corrected_words.append(ERROR_CORRECTION[word])
+        else:
+            for term in CORRECT_TERMS:
+                if compare_chinese_words(word, term, tone_sensitive=True):
+                    # print(f"corrected: {word} -> {term}")
+                    corrected_words.append(term)
+                    break
+            else:
+                corrected_words.append(word)
+    return ''.join(corrected_words)
+

+ 54 - 0
api/openai_scripts_tai_gi/audio_processing.py

@@ -0,0 +1,54 @@
+from openai import OpenAI
+from api.openai_scripts_tai_gi.config import SYSTEM_PROMPT, OPENAI_API_KEY, SUPABASE_KEY, SUPABASE_URL, ERROR_CORRECTION
+from supabase import create_client, Client
+from api.openai_scripts_tai_gi.text_processing import fuzzy_correct_chinese
+
+client = OpenAI(api_key=OPENAI_API_KEY)
+supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
+
+def transcribe(audio_file):
+    try:
+        table_name = "word_database"
+        response = supabase.table(table_name).select("term").execute()
+        custom_vocab = []
+        if response.data:
+            for item in response.data:
+                custom_vocab.append({item['term']})
+        else:
+            print(f"No data found or an error occurred: {response.error}")
+            print("Using default dictionary as Supabase data couldn't be fetched.")
+        transcript = client.audio.transcriptions.create(
+            file=audio_file,
+            model="whisper-1",
+            response_format="text", 
+            prompt=f"轉錄時對於「早餐、午餐、晚餐、幼兒園、國小、國中、高中、大學」等詞需特別注意。也需注意以下詞彙:{custom_vocab}"
+        )
+        return transcript
+    except Exception as e:
+        print(f"轉錄時發生錯誤:{str(e)}")
+        return None
+
+
+def post_process_transcript(transcript, temperature=0):
+    corrected_transcript = fuzzy_correct_chinese(transcript)
+    
+    messages = [
+        {"role": "system", "content": SYSTEM_PROMPT},
+        {"role": "user", "content": f"請校對並修正下面引號內的轉錄文本且只需回傳修正後的文本內容:「{corrected_transcript}」,重點是要查看上述文本是否還有台語的部分,若有則需要修正為繁體中文意思,沒有的話只需要順一下句子。文本裡面若出現「{ERROR_CORRECTION}」這個字典中的字,就直接以此字典內的規則修正文本。最後強調,只需要回傳順過之後的文本,不用加其他不相干的字或是說明。"}
+    ]
+
+    response = client.chat.completions.create(
+        model="gpt-4",
+        temperature=temperature,
+        messages=messages
+    )
+
+    return response.choices[0].message.content
+
+def process_audio(audio_data):
+    raw_transcript = transcribe(audio_data)
+    print(raw_transcript)
+    if raw_transcript is None:
+        return None, None
+    corrected_transcript = post_process_transcript(raw_transcript)
+    return raw_transcript, corrected_transcript

+ 33 - 0
api/openai_scripts_tai_gi/config.py

@@ -0,0 +1,33 @@
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+SUPABASE_URL: str = os.getenv('SUPABASE_URL')
+SUPABASE_KEY: str = os.getenv('SUPABASE_KEY')
+OPENAI_API_KEY: str = os.getenv('OPENAI_API_KEY')
+
+if not SUPABASE_URL or not SUPABASE_KEY:
+    raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in the .env file")
+
+
+SYSTEM_PROMPT = """你是一位專業的閩南語轉錄為中文校對助理,專門處理有關長照/日照中心的台語對話轉錄。
+你的任務是:
+1. 確保以下專業術語在台語拼音轉換時的準確性:幫我、身體、什麼時候、現在、有點、冷、會不會、車、昨晚、血壓、需要、量血壓、早餐、午餐、晚餐、主任、早安、午安、晚安、聯絡簿、等車、交通車、國小、什麼、看什麼、可以、什麼意思、垃圾、冷氣、電風扇、喝水、倒垃圾、上廁所、上班、看孫子、走路、起床、幾點、毛筆、冰箱、哥哥、妹妹、菜市場、夜市、工作、老闆。
+2. 在必要時添加適當的標點符號,如句號、逗號
+3. 使用台灣的繁體中文,確保語言表達符合台灣的用語習慣。
+4. 只更正明顯的錯誤或改善可讀性,不要改變原文的意思或結構。
+5. 不要回答問題、解釋概念或添加任何不在原文中的信息。
+6. 如果原文是一個問句,保持它的問句形式,不要提供答案。
+
+請只根據提供的原文進行必要的更正,不要添加或刪除任何實質性內容。在修正時,請特別注意上下文,確保修正後的詞語符合整句話的語境。"""
+
+CORRECT_TERMS = ["幫我", "什麼時候", "身體", "現在", "有點", "冷", "會不會", "車", "昨晚", "血壓", "需要", "量血壓", "早餐", "午餐", "晚餐", "主任", "早安", "午安", "晚安", "聯絡簿", "等車", "交通車", "幼兒園", "國小", "國中", "高中", "大學", "日照中心", "吃飯", "垃圾", "冷氣", "孫子", "電風扇", "喝水", "倒垃圾", "上廁所", "上班", "看孫子", "聯繫", "走路", "起床", "幾點", "毛筆", "電視", "冰箱", "廚房", "可以", "哥哥", "妹妹", "夜市", "工作", "老闆", "什麼"]
+
+ERROR_CORRECTION = {
+    '跨三回' : '看什麼', 
+    '畫沙小' : '看三小', 
+    '高通車' : '交通車', 
+    '的東西搞' : '什麼時候到'
+}
+

+ 27 - 0
api/openai_scripts_tai_gi/dictionary_loader.py

@@ -0,0 +1,27 @@
+import io
+import jieba
+from supabase import create_client, Client
+from api.openai_scripts_tai_gi.config import SUPABASE_URL, SUPABASE_KEY
+
+supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
+
+
+def load_word_database_dictionary_from_supabase():
+    table_name = "word_database"
+    response = supabase.table(table_name).select("term, weight, type").execute()
+    
+    if response.data:
+        dict_data = io.StringIO()
+        for item in response.data:
+            dict_data.write(f"{item['term']} {item['weight']} {item['type']}\n")
+        
+        dict_data.seek(0)
+        jieba.load_userdict(dict_data)
+        # print("Loaded dictionary from Supabase")
+        return True
+    else:
+        print(f"No data found or an error occurred: {response.error}")
+        print("Using default dictionary as Supabase data couldn't be fetched.")
+        return False
+
+

+ 47 - 0
api/openai_scripts_tai_gi/main_script.py

@@ -0,0 +1,47 @@
+import sys
+from api.openai_scripts_tai_gi.dictionary_loader import load_word_database_dictionary_from_supabase
+from api.openai_scripts_tai_gi.audio_processing import process_audio
+
+def initialize():
+    word_database_success = load_word_database_dictionary_from_supabase()
+    if not word_database_success:
+        print("Warning: Word Database Dictionary loading failed. Proceeding with default dictionary.")
+
+
+def process_audio_file(audio_file):
+    try:
+        result = process_audio(audio_file)
+        if isinstance(result, tuple) and len(result) == 2:
+            return result
+        else:
+            print("Unexpected result from process_audio")
+            return None, None
+    except Exception as e:
+        print(f"Error processing audio: {str(e)}")
+        return None, None
+    
+# 加入檢查user是否詢問特定問題
+def main(audio_file):
+    initialize()
+    try:
+        raw_transcript, corrected_transcript = process_audio_file(audio_file)
+        
+        return raw_transcript, corrected_transcript
+    except FileNotFoundError:
+        print(f"Error: The file was not found.")
+        return None, None
+    except Exception as e:
+        print(f"An unexpected error occurred: {str(e)}")
+        import traceback
+        traceback.print_exc()
+        return None, None
+        
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage: python script_name.py <audio_file_path>")
+        sys.exit(1)
+    
+    audio_file_path = sys.argv[1]
+    main(audio_file_path)

+ 72 - 0
api/openai_scripts_tai_gi/text_processing.py

@@ -0,0 +1,72 @@
+import jieba
+from pypinyin import pinyin, Style
+from api.openai_scripts_tai_gi.config import CORRECT_TERMS, ERROR_CORRECTION
+
+def chinese_soundex(pinyin_str):
+    soundex_map = {
+        'b': '1', 'p': '1', 'm': '1', 'f': '1',
+        'd': '2', 'n': '2', 'l': '2',
+        'g': '3', 'k': '3', 'h': '3', 't': '3',
+        'j': '4', 'q': '4', 'x': '4',
+        'zh': '5', 'ch': '5', 'sh': '5', 'r': '5',
+        'z': '6', 'c': '6', 's': '6',
+        'an': '7', 'ang': '7', 'en': '8', 'eng': '8', 'in': '8', 'ing': '8',
+        'ong': '9', 'un': '9', 'uan': '9',
+        'i': 'A', 'u': 'A', 'v': 'A',
+        'e': 'B', 'o': 'B',
+    }
+    
+    code = ''
+    tone = '0'
+    i = 0
+    while i < len(pinyin_str):
+        if pinyin_str[i:i+2] in soundex_map:
+            code += soundex_map[pinyin_str[i:i+2]]
+            i += 2
+        elif pinyin_str[i] in soundex_map:
+            code += soundex_map[pinyin_str[i]]
+            i += 1
+        elif pinyin_str[i].isdigit():
+            tone = pinyin_str[i]
+            i += 1
+        else:
+            i += 1
+    
+    code = code[:1] + ''.join(sorted(set(code[1:])))
+    return (code[:3] + tone).ljust(4, '0')
+
+def compare_chinese_words(word1, word2, tone_sensitive=True):
+    pinyin1 = ''.join([p[0] for p in pinyin(word1, style=Style.TONE3, neutral_tone_with_five=True)])
+    pinyin2 = ''.join([p[0] for p in pinyin(word2, style=Style.TONE3, neutral_tone_with_five=True)])
+    
+    soundex1 = chinese_soundex(pinyin1)
+    # print(soundex1)
+    soundex2 = chinese_soundex(pinyin2)
+    # print('soundex2', soundex2)
+
+    if tone_sensitive:
+        return soundex1 == soundex2
+    else:
+        return soundex1[:3] == soundex2[:3]
+
+def fuzzy_correct_chinese(text):
+    words = jieba.lcut(text)
+    corrected_words = []
+    for word in words:
+        if word.isalpha():
+            corrected_words.append(word)
+            continue
+        word_pinyin = ''.join([p[0] for p in pinyin(word, style=Style.NORMAL)])
+        # print(f"Term: {word}, Pinyin: {word_pinyin}")
+        if word in ERROR_CORRECTION:
+            corrected_words.append(ERROR_CORRECTION[word])
+        else:
+            for term in CORRECT_TERMS:
+                if compare_chinese_words(word, term, tone_sensitive=True):
+                    # print(f"corrected: {word} -> {term}")
+                    corrected_words.append(term)
+                    break
+            else:
+                corrected_words.append(word)
+    return ''.join(corrected_words)
+

+ 112 - 0
api/whisper.py

@@ -0,0 +1,112 @@
+# import os, sys
+# from typing import List, Any
+from fastapi import Request, APIRouter, UploadFile, File
+from fastapi.responses import FileResponse, PlainTextResponse
+# from fastapi import FastAPI, HTTPException, status
+# from fastapi.middleware.cors import CORSMiddleware
+# import uvicorn
+# from fastapi.exceptions import HTTPException
+# from fastapi.encoders import jsonable_encoder
+# from urllib.parse import urlparse, urljoin
+from pathlib import Path
+from api.openai_scripts_tai_gi.main_script import main
+from api.openai_scripts_chinese.main_script import main as main2
+# from api.openai_scripts_new.main_script import main as main3
+from datetime import datetime
+import random
+import string
+
+router = APIRouter()
+
+# router = FastAPI()
+# router.add_middleware(
+#     CORSMiddleware,
+#     allow_origins=["*"],
+#     allow_credentials=True,
+#     allow_methods=["*"],
+#     allow_headers=["*"],
+# )
+
+
+@router.post('/tai_gi')
+async def whisper_auto(file: UploadFile = File()):
+    if file == None:
+        return {'message': '請上傳檔案'}
+    extension = file.filename.split(".")[-1] 
+    if extension not in ("mp3", "wav", "webm"):
+        return PlainTextResponse("Audio must be mp3, wav or webm format!", 400)
+    filename = Path(__file__).parent.parent/'speech_audio'/datetime.now().strftime(f"%Y%m%d%H%M%S_{''.join(random.sample(string.ascii_lowercase, 3))}.{extension}")
+    with open(filename, 'wb') as f:
+        f.write(await file.read())
+    with open(filename, 'rb') as f:
+        raw_transcript, corrected_transcript = main(f)
+    # if raw_transcript and corrected_transcript:
+        # os.remove(filename)
+        # return {'message': corrected_transcript}
+    # else:
+        # os.remove(filename)
+        # return {"message": "Audio processing failed."}
+    if raw_transcript and corrected_transcript:
+        # os.remove(save_path)
+        # return {'message': {"Raw transcript": raw_transcript, "Corrected transcript": corrected_transcript}}
+        return {'message': corrected_transcript}
+    else:
+        # os.remove(save_path)
+        return {"message": "Audio processing failed."}
+        
+        
+@router.post('/chinese')
+async def whisper_auto(file: UploadFile = File()):
+    if file == None:
+        return {'message': '請上傳檔案'}
+    extension = file.filename.split(".")[-1] 
+    if extension not in ("mp3", "wav", "webm"):
+        return PlainTextResponse("Audio must be mp3, wav or webm format!", 400)
+    filename = Path(__file__).parent.parent/'speech_audio'/datetime.now().strftime(f"%Y%m%d%H%M%S_{''.join(random.sample(string.ascii_lowercase, 3))}.{extension}")
+    with open(filename, 'wb') as f:
+        f.write(await file.read())
+    with open(filename, 'rb') as f:
+        raw_transcript, corrected_transcript = main2(f)
+    if raw_transcript and corrected_transcript:
+        # return {'message': {"Raw transcript": raw_transcript, "Corrected transcript": corrected_transcript}}
+        return {'message': corrected_transcript}
+    else:
+        return {"message": "Audio processing failed."}
+        
+
+# import numpy as np
+# from transformers import pipeline
+# import gradio as gr
+
+# @router.post('/tai_gi_new')
+# async def whisper_auto(file: UploadFile = File()):
+#     if file == None:
+#         return {'message': '請上傳檔案'}
+#     extension = file.filename.split(".")[-1] 
+#     if extension not in ("mp3", "wav", "webm"):
+#         return PlainTextResponse("Audio must be mp3, wav or webm format!", 400)
+#     filename = Path(__file__).parent/'speech_audio'/datetime.now().strftime(f"%Y%m%d%H%M%S_{''.join(random.sample(string.ascii_lowercase, 3))}.{extension}")
+#     with open(filename, 'wb') as f:
+#         f.write(await file.read())
+#     # with open(filename, 'rb') as f:
+#         # raw_transcript, corrected_transcript = main3(f)
+#     # audio_data, sample_rate = librosa.load(filename, sr=None)
+#     # 讀取音頻文件
+#     audio_input = gr.Audio(type="filepath")
+#     raw_transcript, corrected_transcript = main3(audio_input(filename))
+#     # if raw_transcript and corrected_transcript:
+#         # os.remove(filename)
+#         # return {'message': corrected_transcript}
+#     # else:
+#         # os.remove(filename)
+#         # return {"message": "Audio processing failed."}
+#     if raw_transcript and corrected_transcript:
+#         # os.remove(save_path)
+#         return {'message': {"Raw transcript": raw_transcript, "Corrected transcript": corrected_transcript}}
+#     else:
+#         # os.remove(save_path)
+#         return {"message": "Audio processing failed."}
+
+        
+# if __name__ == "__main__":
+#     uvicorn.run("whisper:router", reload=False, port=8086, host='cmm.ai', ssl_keyfile="/etc/letsencrypt/live/cmm.ai/privkey.pem", ssl_certfile="/etc/letsencrypt/live/cmm.ai/fullchain.pem")

+ 83 - 0
app.py

@@ -0,0 +1,83 @@
+import uvicorn
+
+from typing import List
+from pydantic import BaseModel
+from fastapi import FastAPI, Body
+from fastapi.middleware.cors import CORSMiddleware
+from langchain.schema import SystemMessage, AIMessage, HumanMessage
+from langchain_core.prompts import ChatPromptTemplate
+from langchain_openai import ChatOpenAI
+from langchain.callbacks import get_openai_callback
+
+import os
+from dotenv import load_dotenv
+load_dotenv()
+
+
+from supabase.client import Client, create_client
+supabase_url = os.environ.get("SUPABASE_URL")
+supabase_key = os.environ.get("SUPABASE_KEY")
+supabase: Client = create_client(supabase_url, supabase_key)
+
+chat_model = ChatOpenAI(model='gpt-4o-mini')
+
+app = FastAPI()
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+from semantic_search import semantic_cache
+from api import whisper
+app.include_router(whisper.router, prefix='/whisper', tags=['whisper'])
+
+
+class ChatHistoryItem(BaseModel):
+    q: str
+    a: str
+
+@app.post("/chat/")
+async def chat(message, chat_history: List[ChatHistoryItem] = Body(...)):
+    print(chat_history)
+    messages_list = [
+        SystemMessage(content="你是一名日照中心的志工,你的職責是陪伴老人聊天,你需要考量到老人的健康與安全,並能安撫老人,只有家人准許時才能離開日照中心。請用繁體中文"),
+        AIMessage(content="你好!很高興能和您聊天。今天您過得怎麼樣呢?有沒有什麼想分享的事情?")
+    ]
+    
+    for item in chat_history:
+        if item.q == "string" or item.a == "string" : continue
+        messages_list.append(HumanMessage(content=item.q))
+        messages_list.append(AIMessage(content=item.a))
+
+    messages_list.append(HumanMessage(content=message))
+    print(messages_list)
+    prompt = ChatPromptTemplate(
+        messages=messages_list
+    )
+
+    with get_openai_callback() as cb:
+        cache_question, cache_answer = semantic_cache(supabase, message)
+        if cache_answer:
+            save_history(message, cache_answer)
+            return {"message": cache_answer}
+
+    AIMessage_ = chat_model.invoke(prompt.format_messages()).content
+    save_history(message, AIMessage_)
+    
+    return {"message": AIMessage_}
+
+def save_history(question, answer):
+    response = (
+            supabase.table("INNOLUX_record")
+            .insert({"question": question, "answer": answer})
+            .execute()
+        )
+    
+if __name__ == "__main__":
+    
+    uvicorn.run("app:app", reload=False, port=8087, host='cmm.ai', ssl_keyfile="/etc/letsencrypt/live/cmm.ai/privkey.pem", ssl_certfile="/etc/letsencrypt/live/cmm.ai/fullchain.pem")
+
+

+ 60 - 0
semantic_search.py

@@ -0,0 +1,60 @@
+### Python = 3.9
+import os
+from dotenv import load_dotenv
+load_dotenv('.env')
+
+import openai 
+openai_api_key = os.getenv("OPENAI_API_KEY")
+openai.api_key = openai_api_key
+
+from langchain_openai import OpenAIEmbeddings
+embeddings_model = OpenAIEmbeddings()
+
+from langchain_community.document_loaders.csv_loader import CSVLoader
+from langchain_community.vectorstores import Chroma
+
+import pandas as pd
+import re
+
+from langchain_community.embeddings.openai import OpenAIEmbeddings
+from langchain_community.vectorstores import SupabaseVectorStore
+from supabase.client import create_client
+
+def create_qa_vectordb(supabase, vectordb_directory="./chroma_db"):
+
+    if os.path.isdir(vectordb_directory):
+        vectorstore = Chroma(persist_directory=vectordb_directory, embedding_function=embeddings_model)
+        vectorstore.delete_collection()
+
+    response = supabase.table("INNOLUX_cache").select("question, answer").execute()
+    questions = [row["question"] for row in response.data]
+
+    vectorstore = Chroma.from_texts(
+        texts=questions,
+        embedding=embeddings_model,
+        persist_directory=vectordb_directory
+        )
+    
+    return vectorstore
+
+def semantic_cache(supabase, q, SIMILARITY_THRESHOLD=0.83, k=1, vectordb_directory="./chroma_db"):
+
+    if os.path.isdir(vectordb_directory):
+        vectorstore = Chroma(persist_directory=vectordb_directory, embedding_function=embeddings_model)
+    else:
+        print("create new vector db ...")
+        vectorstore = create_qa_vectordb(supabase, vectordb_directory)
+
+    docs_and_scores = vectorstore.similarity_search_with_relevance_scores(q, k=1)
+    doc, score = docs_and_scores[0]
+    print(score)
+    
+    if score >= SIMILARITY_THRESHOLD:
+        cache_question = doc.page_content
+
+        response = supabase.table("INNOLUX_cache").select("question, answer").eq("question", cache_question).execute()
+
+        answer = response.data[0]["answer"]
+        return cache_question, answer
+    else:
+        return None, None