@@ -0,0 +1,238 @@
+import os
+# import requests
+# from bs4 import BeautifulSoup
+from selenium.webdriver.common.by import By
+# import re
+import time
+# from fake_useragent import UserAgent
+import undetected_chromedriver as uc
+from datetime import datetime
+import random
+import string
+# from selenium.webdriver.common.action_chains import ActionChains
+# from selenium.webdriver.common.keys import Keys
+# from selenium.webdriver.support.ui import WebDriverWait
+# from selenium.webdriver.support import expected_conditions as EC
+# from ga4mp import FirebaseMP
+from dotenv import load_dotenv
+import os
+import shutil
+import logging
+from fastapi import APIRouter, FastAPI
+import uvicorn
+from fastapi.middleware.cors import CORSMiddleware
+from supabase import create_client, Client
+SUPABASE_URL: str = os.environ.get('SUPABASE_URL')
+SUPABASE_KEY: str = os.environ.get('SUPABASE_KEY')
+supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
+def download_blob(browser, blob_url, filename='output.mp3'):
+ # 使用 Selenium 獲取 Blob 內容
+ js_code = f"""
+ fetch('{blob_url}')
+ .then(response => response.blob())
+ .then(blob => {{
+ const url = URL.createObjectURL(blob);
+ const a = document.createElement('a');
+ a.href = url;
+ a.download = '{filename}';
+ document.body.appendChild(a);
+ a.click();
+ document.body.removeChild(a);
+ }})
+ .catch(error => console.error('Error downloading file:', error));
+ """
+ browser.execute_script(js_code)
+def tts_downloadfile(text):
+ start_time = time.time()
+ print(f'text長度: {len(text)}')
+ num = random.randint(3,5)
+ url = 'http://tts001.iptcloud.net:8804/'
+ default_download_folder = os.path.join(os.path.expanduser('~'), 'Downloads')
+ download_folder = '/var/www/html/innolux/tts_folder'
+ file_name = datetime.now().strftime(f"%Y%m%d%H%M%S_{''.join(random.sample(string.ascii_lowercase, 3))}.mp3")
+ # 替換為你希望的文件夾路徑
+ os.makedirs(download_folder, exist_ok=True)
+ options = uc.ChromeOptions()
+ options.add_argument('--ignore-certificate-errors')
+ prefs = {
+ "download.default_directory": download_folder, # 設定預設下載文件夾
+ "download.prompt_for_download": False,
+ "safebrowsing.enabled": True, # 確保安全瀏覽
+ }
+ options.add_experimental_option("prefs", prefs)
+ # options.add_argument('--incognito')
+ options.add_argument('--headless') # 如果不想顯示瀏覽器可以啟用這行
+ options.add_argument("--disable-gpu") # 禁用 GPU 加速
+ # 設置自定義 headers
+ with uc.Chrome(options=options, version_main=129) as browser:
+ try:
+ browser.get(url)
+ time.sleep(num)
+ # 轉中文成台語拼音
+ browser.find_element(By.XPATH, '//*[@id="js-input"]').send_keys(text)
+ time.sleep(0.1)
+ browser.find_element(By.XPATH, '//*[@id="js-translate"]').click()
+ time.sleep(0.1 + len(text)*0.01)
+ browser.execute_script('window.scrollBy(0, 200);')
+ # 轉語音
+ browser.find_element(By.XPATH, '//*[@id="button1"]').click()
+ # time.sleep(len(text)*0.6)
+ # audio_element = browser.find_element(By.XPATH, '//*[@id="audio1"]')
+ # time.sleep(0.2)
+ # # 取得 <audio> 標籤的屬性(例如 src)
+ audio_src = None
+ while not audio_src:
+ if time.time() - start_time > 45:
+ return 'Time exceeded'
+ audio_element = browser.find_element(By.XPATH, '//*[@id="audio1"]')
+ audio_src = audio_element.get_attribute('src')
+ if audio_src:
+ print("音頻來源:", audio_src)
+ download_blob(browser, audio_src, file_name)
+ else:
+ print("尚未檢測到音頻,繼續等待...")
+ time.sleep(0.3) # 每隔 0.3 秒檢測一次
+ # 下載音檔
+ # download_blob(browser, audio_src, file_name)
+ is_default = False
+ # check 是否下載完成
+ file_path = '/var/www/html/innolux/tts_folder' + '/' + file_name
+ default_file_path = '/root/Downloads' + '/' + file_name
+ while not os.path.exists(file_path):
+ print('...')
+ # if default_file_path:
+ # is_default = True
+ # break
+ time.sleep(0.001)
+ # destination_path = os.path.join(download_folder, datetime.now().strftime(f"%Y%m%d%H%M%S_{''.join(random.sample(string.ascii_lowercase, 3))}.wav"))
+ # shutil.move(file_path, destination_path)
+ # if is_default:
+ # while not os.path.exists(default_file_path):
+ # print('...')
+ # time.sleep(0.001)
+ # shutil.move(default_file_path, file_path)
+ # print("檔案移動完成")
+ print(f"下載完成: {file_path}")
+ file_path = file_path.split('html/')[1]
+ print(file_path)
+ print(time.time() - start_time)
+ return file_path
+ except Exception as e:
+ print(f'Error: {e}')
+ return e
+app = FastAPI()
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+async def tts(answer: str):
+ file_path = tts_downloadfile(answer)
+ # if '.mp3' not in file_path:
+ # return {"message": file_path}
+ if file_path:
+ return {'message': {'mp3_url': file_path}}
+ else:
+ return {"message": "tts processing failed."}
+# from apscheduler.schedulers.background import BackgroundScheduler
+# import asyncio
+# import requests
+# scheduler = BackgroundScheduler()
+# loop = asyncio.get_event_loop()
+# # 巴巴群組
+# def notify_line(id, question, message):
+# # url = 'http://cmm.ai:3001/api/push/PLSDC1vOG9'
+# url = 'https://notify-api.line.me/api/notify'
+# token = 'OtAC4mBxi1tHjFT5RDcCiA8NwNKuxHVOAlZU5iO04XB' # 巴巴工程師群組
+# headers = {
+# 'Authorization': 'Bearer ' + token
+# }
+# # 構造請求的數據
+# data = {
+# 'message': f"\n群創\nid:【{id}】\n問題: {question}\nmessage: {message}"
+# }
+# try:
+# response = requests.post(url, headers=headers, data=data)
+# if response.status_code == 200:
+# print("Notification sent successfully.")
+# else:
+# print(f"Failed to send notification. Status code: {response.status_code}")
+# except Exception as e:
+# print(f"An error occurred: {e}")
+# def sub_1_minute():
+# time.sleep(1)
+# cursor = supabase.table('INNOLUX_cache').select('*').filter('mp3_url', 'is', 'null').order('id', desc=False).execute()
+# if cursor.data:
+# data = cursor.data[0]
+# if data['is_run']:
+# return
+# try:
+# supabase.table('INNOLUX_cache').update({'is_run':True}).eq('question', data['question']).eq('answer', data['answer']).execute()
+# file_path = tts_downloadfile(data['answer'])
+# if '.mp3' in file_path:
+# supabase.table('INNOLUX_cache').update({'mp3_url':file_path}).eq('question', data['question']).eq('answer', data['answer']).eq('is_run', True).execute()
+# print(f'{file_path} 已存入資料庫')
+# supabase.table('INNOLUX_cache').update({'is_run':None}).eq('question', data['question']).eq('answer', data['answer']).eq('is_run', True).execute()
+# return
+# else:
+# notify_line(data['id'], data['question'], file_path)
+# supabase.table('INNOLUX_cache').update({'is_run':None}).eq('question', data['question']).eq('answer', data['answer']).execute()
+# print(file_path)
+# return
+# except Exception as e:
+# print(f'Error: {e}')
+# supabase.table('INNOLUX_cache').update({'is_run':None}).eq('question', data['question']).eq('answer', data['answer']).execute()
+# else:
+# return
+# # 添加定时任务
+# scheduler.add_job(sub_1_minute, 'interval', minutes=0.8)
+# scheduler.start()
+# @app.on_event("shutdown")
+# def shutdown_event():
+# scheduler.shutdown()
+if __name__ == '__main__':
+ uvicorn.run("台語tts:app", reload=False, port=8093, host='cmm.ai', ssl_keyfile="/etc/letsencrypt/live/cmm.ai/privkey.pem", ssl_certfile="/etc/letsencrypt/live/cmm.ai/fullchain.pem")