Explorar o código

commit before switch branch

SherryLiu hai 7 meses
pai
achega
892fdb36f4
Modificáronse 2 ficheiros con 259 adicións e 82 borrados
  1. 199 0
      llama_asr.py
  2. 60 82
      whisper.py

+ 199 - 0
llama_asr.py

@@ -0,0 +1,199 @@
+import os
+import argparse
+from openai import OpenAI
+from dotenv import load_dotenv
+import jieba
+from datetime import datetime
+from pypinyin import pinyin, Style
+from langchain_community.chat_models import ChatOllama
+from langchain.schema import HumanMessage, SystemMessage
+
+load_dotenv('environment.env')
+client = OpenAI()
+
+local_llm = "llama3-groq-tool-use:latest"
+llm = ChatOllama(model=local_llm, temperature=0)
+
+system_prompt = """你是一位專業的轉錄校對助理,專門處理有關溫室氣體、碳排放和碳管理的對話轉錄。
+你的任務是:
+1. 確保以下專業術語的準確性:溫室氣體、碳排放、碳管理、碳盤查、碳權交易、碳足跡、淨零排放、碳權。
+2. 在必要時添加適當的標點符號,如句號、逗號
+3. 使用台灣的繁體中文,確保語言表達符合台灣的用語習慣。
+4. 只更正明顯的錯誤或改善可讀性,不要改變原文的意思或結構。
+5. 不要回答問題、解釋概念或添加任何不在原文中的信息。
+6. 如果原文是一個問句,保持它的問句形式,不要提供答案。
+
+請只根據提供的原文進行必要的更正,不要添加或刪除任何實質性內容。在修正時,請特別注意上下文,確保修正後的詞語符合整句話的語境。"""
+
+def transcribe(audio_file):
+    try:
+        transcript = client.audio.transcriptions.create(
+            file=audio_file,
+            model="whisper-1",
+            response_format="text"
+        )
+        return transcript
+    except Exception as e:
+        print(f"轉錄時發生錯誤:{str(e)}")
+        return None
+
+def save_output(file_name, raw_transcript, corrected_transcript):
+    output_dir = "output"
+    os.makedirs(output_dir, exist_ok=True)
+    
+    output_file = os.path.join(output_dir, "transcription_results.txt")
+    
+    with open(output_file, "a", encoding="utf-8") as f:
+        f.write(f"\n{'='*50}\n")
+        f.write(f"文件名: {file_name}\n")
+        f.write(f"處理時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
+        f.write("原始轉錄:\n")
+        f.write(f"{raw_transcript}\n\n")
+        f.write("修正後的轉錄:\n")
+        f.write(f"{corrected_transcript}\n")
+
+def process_audio_file(file_path):
+    try:
+        with open(file_path, "rb") as audio_file:
+            file_size = os.path.getsize(file_path) / (1024 * 1024)  # 轉換為 MB
+            if file_size > 25:
+                print(f"警告:文件 {os.path.basename(file_path)} 大小為 {file_size:.2f} MB,超過了 25 MB 的限制。可能無法處理。")
+
+            print(f"\n處理文件:{os.path.basename(file_path)}")
+            raw_transcript = transcribe(audio_file)
+            if raw_transcript is None:
+                return
+
+            print("\n原始轉錄:")
+            print(raw_transcript)
+
+            corrected_transcript = post_process_transcript(raw_transcript)
+            print("\n修正後的轉錄:")
+            print(corrected_transcript)
+
+            save_output(os.path.basename(file_path), raw_transcript, corrected_transcript)
+
+    except Exception as e:
+        print(f"處理文件 {os.path.basename(file_path)} 時發生錯誤:{str(e)}")
+
+def process_folder(folder_path):
+    processed_files = 0
+
+    for filename in os.listdir(folder_path):
+        if filename.lower().endswith((".mp3", ".wav", ".m4a")):
+            file_path = os.path.join(folder_path, filename)
+            process_audio_file(file_path)
+            processed_files += 1
+
+    print("\n=== 總結 ===")
+    print(f"處理的文件數:{processed_files}")
+
+def chinese_soundex(pinyin_str):
+    soundex_map = {
+        'b': '1', 'p': '1', 'm': '1', 'f': '1',
+        'd': '2', 't': '2', 'n': '2', 'l': '2',
+        'g': '3', 'k': '3', 'h': '3',
+        'j': '4', 'q': '4', 'x': '4',
+        'zh': '5', 'ch': '5', 'sh': '5', 'r': '5',
+        'z': '6', 'c': '6', 's': '6',
+        'an': '7', 'ang': '7', 'en': '8', 'eng': '8', 'in': '8', 'ing': '8',
+        'ong': '9', 'un': '9', 'uan': '9',
+        'i': 'A', 'u': 'A', 'v': 'A',  # 'v' is used for 'ü' in some systems
+        'e': 'B', 'o': 'B',
+    }
+    
+    code = ''
+    tone = '0'
+    i = 0
+    while i < len(pinyin_str):
+        if pinyin_str[i:i+2] in soundex_map:
+            code += soundex_map[pinyin_str[i:i+2]]
+            i += 2
+        elif pinyin_str[i] in soundex_map:
+            code += soundex_map[pinyin_str[i]]
+            i += 1
+        elif pinyin_str[i].isdigit():
+            tone = pinyin_str[i]
+            i += 1
+        else:
+            i += 1
+    
+    code = code[:1] + ''.join(sorted(set(code[1:])))
+    return (code[:3] + tone).ljust(4, '0')
+
+def compare_chinese_words(word1, word2, tone_sensitive=True):
+    pinyin1 = ''.join([p[0] for p in pinyin(word1, style=Style.TONE3, neutral_tone_with_five=True)])
+    pinyin2 = ''.join([p[0] for p in pinyin(word2, style=Style.TONE3, neutral_tone_with_five=True)])
+    
+    soundex1 = chinese_soundex(pinyin1)
+    soundex2 = chinese_soundex(pinyin2)
+    
+    if tone_sensitive:
+        return soundex1 == soundex2
+    else:
+        return soundex1[:3] == soundex2[:3]
+
+error_correction = {
+    "看拳": "碳權",
+    "看盤插": "碳盤查",
+    "盤插": "盤查",
+    "看": "碳"
+}
+
+def fuzzy_correct_chinese(text, correct_terms):
+    words = jieba.cut(text)
+    corrected_words = []
+    for word in words:
+        if word in error_correction:
+            corrected_words.append(error_correction[word])
+        else:
+            for term in correct_terms:
+                if compare_chinese_words(word, term, tone_sensitive=True):
+                    print(f"corrected: {word} -> {term}")
+                    corrected_words.append(term)
+                    break
+            else:
+                corrected_words.append(word)
+    return ''.join(corrected_words)
+
+def post_process_transcript(transcript):
+    correct_terms = ["碳", "溫室氣體", "碳排放", "排放", "碳管理", "管理", "碳盤查", "盤查", "碳權交易", "碳費",
+                     "碳權", "碳足跡", "足跡", "淨零排放", "零排放", "排放", "淨零",
+                     "氣候變遷法", "氣候", "氣候變遷", "法",
+                     "是什麼", "請解釋", "為什麼", "什麼意思",
+                     "台灣"]
+    
+    corrected_transcript = fuzzy_correct_chinese(transcript, correct_terms)
+    
+    # 準備輸入
+    messages = [
+        SystemMessage(content=system_prompt),
+        HumanMessage(content=f"請校對並修正以下轉錄文本,但不要改變其原意或回答問題:\n\n{corrected_transcript}")
+    ]
+    
+    # 使用 ChatOllama 生成回應
+    response = llm(messages)
+    
+    return response.content
+
+def main():
+    parser = argparse.ArgumentParser(description="處理音頻文件使用 llama3-groq-tool-use:latest 模型")
+    parser.add_argument("--file", help="要處理的單個音頻文件的路徑")
+    parser.add_argument("--folder", default="data", help="包含音頻文件的文件夾路徑(默認:data)")
+    args = parser.parse_args()
+
+    if args.file:
+        if os.path.isfile(args.file):
+            process_audio_file(args.file)
+        else:
+            print(f"錯誤:文件 '{args.file}' 不存在。")
+    elif args.folder:
+        if os.path.isdir(args.folder):
+            process_folder(args.folder)
+        else:
+            print(f"錯誤:文件夾 '{args.folder}' 不存在。")
+    else:
+        print("錯誤:請指定一個文件(--file)或文件夾(--folder)來處理。")
+
+if __name__ == "__main__":
+    main()

+ 60 - 82
whisper.py

@@ -2,13 +2,12 @@ import os
 import argparse
 from openai import OpenAI
 from dotenv import load_dotenv
-from pypinyin import pinyin, Style
 import jieba
 from datetime import datetime
-import logging
+from pypinyin import pinyin, Style
 
 load_dotenv('environment.env')
-client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
+client = OpenAI()
 
 system_prompt = """你是一位專業的轉錄校對助理,專門處理有關溫室氣體、碳排放和碳管理的對話轉錄。
 你的任務是:
@@ -21,32 +20,21 @@ system_prompt = """你是一位專業的轉錄校對助理,專門處理有關
 
 請只根據提供的原文進行必要的更正,不要添加或刪除任何實質性內容。在修正時,請特別注意上下文,確保修正後的詞語符合整句話的語境。"""
 
-def setup_logger():
-    logging.basicConfig(level=logging.DEBUG, 
-                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-    return logging.getLogger(__name__)
-
-logger = setup_logger()
-
 def transcribe(audio_file):
     try:
-        logger.info(f"Attempting to transcribe file: {audio_file}")
-        with open(audio_file, "rb") as file:
-            response = client.audio.transcriptions.create(
-                file=file,
-                model="whisper-1",
-                response_format="text"
-            )
-        logger.info("Transcription successful")
-        return response
+        transcript = client.audio.transcriptions.create(
+            file=audio_file,
+            model="whisper-1",
+            response_format="text"
+        )
+        return transcript
     except Exception as e:
-        logger.error(f"Transcription failed: {str(e)}", exc_info=True)
+        print(f"轉錄時發生錯誤:{str(e)}")
         return None
-    
+
 def save_output(file_name, raw_transcript, corrected_transcript):
     output_dir = "output"
-    if not os.path.exists(output_dir):
-        os.makedirs(output_dir)
+    os.makedirs(output_dir, exist_ok=True)
     
     output_file = os.path.join(output_dir, "transcription_results.txt")
     
@@ -59,7 +47,6 @@ def save_output(file_name, raw_transcript, corrected_transcript):
         f.write("修正後的轉錄:\n")
         f.write(f"{corrected_transcript}\n")
 
-
 def process_audio_file(file_path):
     try:
         with open(file_path, "rb") as audio_file:
@@ -79,19 +66,16 @@ def process_audio_file(file_path):
             print("\n修正後的轉錄:")
             print(corrected_transcript)
 
-            # 保存輸出結果
             save_output(os.path.basename(file_path), raw_transcript, corrected_transcript)
 
     except Exception as e:
         print(f"處理文件 {os.path.basename(file_path)} 時發生錯誤:{str(e)}")
-    return transcript
-
 
 def process_folder(folder_path):
     processed_files = 0
 
     for filename in os.listdir(folder_path):
-        if filename.endswith((".mp3", ".wav", ".m4a")):  
+        if filename.lower().endswith((".mp3", ".wav", ".m4a")):
             file_path = os.path.join(folder_path, filename)
             process_audio_file(file_path)
             processed_files += 1
@@ -99,38 +83,50 @@ def process_folder(folder_path):
     print("\n=== 總結 ===")
     print(f"處理的文件數:{processed_files}")
 
-def chinese_soundex(pinyin):
+def chinese_soundex(pinyin_str):
     soundex_map = {
         'b': '1', 'p': '1', 'm': '1', 'f': '1',
         'd': '2', 't': '2', 'n': '2', 'l': '2',
         'g': '3', 'k': '3', 'h': '3',
         'j': '4', 'q': '4', 'x': '4',
         'zh': '5', 'ch': '5', 'sh': '5', 'r': '5',
-        'z': '6', 'c': '6', 's': '6'
+        'z': '6', 'c': '6', 's': '6',
+        'an': '7', 'ang': '7', 'en': '8', 'eng': '8', 'in': '8', 'ing': '8',
+        'ong': '9', 'un': '9', 'uan': '9',
+        'i': 'A', 'u': 'A', 'v': 'A',  # 'v' is used for 'ü' in some systems
+        'e': 'B', 'o': 'B',
     }
     
-    code = pinyin[0].upper()
+    code = ''
     tone = '0'
+    i = 0
+    while i < len(pinyin_str):
+        if pinyin_str[i:i+2] in soundex_map:
+            code += soundex_map[pinyin_str[i:i+2]]
+            i += 2
+        elif pinyin_str[i] in soundex_map:
+            code += soundex_map[pinyin_str[i]]
+            i += 1
+        elif pinyin_str[i].isdigit():
+            tone = pinyin_str[i]
+            i += 1
+        else:
+            i += 1
     
-    for char in pinyin[1:]:
-        if char.isdigit():
-            tone = char
-        elif char in soundex_map:
-            if len(code) == 1 or code[-1] != soundex_map[char]:
-                code += soundex_map[char]
-        if len(code) == 4:
-            break
-    
-    return (code.ljust(4, '0') + tone)[:5]
+    code = code[:1] + ''.join(sorted(set(code[1:])))
+    return (code[:3] + tone).ljust(4, '0')
 
-def compare_chinese_words(word1, word2):
+def compare_chinese_words(word1, word2, tone_sensitive=True):
     pinyin1 = ''.join([p[0] for p in pinyin(word1, style=Style.TONE3, neutral_tone_with_five=True)])
     pinyin2 = ''.join([p[0] for p in pinyin(word2, style=Style.TONE3, neutral_tone_with_five=True)])
     
     soundex1 = chinese_soundex(pinyin1)
     soundex2 = chinese_soundex(pinyin2)
     
-    return soundex1 == soundex2
+    if tone_sensitive:
+        return soundex1 == soundex2
+    else:
+        return soundex1[:3] == soundex2[:3]
 
 error_correction = {
     "看拳": "碳權",
@@ -139,7 +135,6 @@ error_correction = {
     "看": "碳"
 }
 
-
 def fuzzy_correct_chinese(text, correct_terms):
     words = jieba.cut(text)
     corrected_words = []
@@ -148,13 +143,13 @@ def fuzzy_correct_chinese(text, correct_terms):
             corrected_words.append(error_correction[word])
         else:
             for term in correct_terms:
-                if compare_chinese_words(word, term):
+                if compare_chinese_words(word, term, tone_sensitive=True):
+                    print(f"corrected: {word} -> {term}")
                     corrected_words.append(term)
                     break
             else:
                 corrected_words.append(word)
-    return ' '.join(corrected_words)
-
+    return ''.join(corrected_words)
 
 def post_process_transcript(transcript, temperature=0):
     correct_terms = ["碳", "溫室氣體", "碳排放", "排放", "碳管理", "管理", "碳盤查", "盤查", "碳權交易", "碳費",
@@ -171,48 +166,31 @@ def post_process_transcript(transcript, temperature=0):
     ]
 
     response = client.chat.completions.create(
-        model="gpt-4o",
+        model="gpt-4",
         temperature=temperature,
         messages=messages
     )
 
-    # return response.choices[0].message.content
-    return transcript
+    return response.choices[0].message.content
 
-# 處理單個音頻,使用transcript端點發送音頻給API
-if __name__ == "__main__":
-    import argparse
+def main():
     parser = argparse.ArgumentParser(description="處理音頻文件使用 Whisper")
     parser.add_argument("--file", help="要處理的單個音頻文件的路徑")
+    parser.add_argument("--folder", default="data", help="包含音頻文件的文件夾路徑(默認:data)")
     args = parser.parse_args()
 
     if args.file:
-        with open(args.file, "rb") as audio_file:
-            transcript = transcribe(audio_file)
-            if transcript:
-                corrected = post_process_transcript(transcript)
-                print("Original:", transcript)
-                print("Corrected:", corrected)
-
-
-# def main():
-#     parser = argparse.ArgumentParser(description="處理音頻文件使用 Whisper")
-#     parser.add_argument("--file", help="要處理的單個音頻文件的路徑")
-#     parser.add_argument("--folder", default="data", help="包含音頻文件的文件夾路徑(默認:data)")
-#     args = parser.parse_args()
-
-#     if args.file:
-#         if os.path.isfile(args.file):
-#             process_audio_file(args.file)
-#         else:
-#             print(f"錯誤:文件 '{args.file}' 不存在。")
-#     elif args.folder:
-#         if os.path.isdir(args.folder):
-#             process_folder(args.folder)
-#         else:
-#             print(f"錯誤:文件夾 '{args.folder}' 不存在。")
-#     else:
-#         print("錯誤:請指定一個文件(--file)或文件夾(--folder)來處理。")
-
-# if __name__ == "__main__":
-#     main()
+        if os.path.isfile(args.file):
+            process_audio_file(args.file)
+        else:
+            print(f"錯誤:文件 '{args.file}' 不存在。")
+    elif args.folder:
+        if os.path.isdir(args.folder):
+            process_folder(args.folder)
+        else:
+            print(f"錯誤:文件夾 '{args.folder}' 不存在。")
+    else:
+        print("錯誤:請指定一個文件(--file)或文件夾(--folder)來處理。")
+
+if __name__ == "__main__":
+    main()