123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- import os
- # import requests
- # from bs4 import BeautifulSoup
- from selenium.webdriver.common.by import By
- # import re
- import time
- # from fake_useragent import UserAgent
- import undetected_chromedriver as uc
- from datetime import datetime
- import random
- import string
- # from selenium.webdriver.common.action_chains import ActionChains
- # from selenium.webdriver.common.keys import Keys
- # from selenium.webdriver.support.ui import WebDriverWait
- # from selenium.webdriver.support import expected_conditions as EC
- # from ga4mp import FirebaseMP
- from dotenv import load_dotenv
- import os
- import shutil
- import logging
- from fastapi import APIRouter, FastAPI
- import uvicorn
- from fastapi.middleware.cors import CORSMiddleware
- from supabase import create_client, Client
# Load variables from a .env file (python-dotenv) before the environment
# lookups below, and send INFO-and-above log records to the root handler.
load_dotenv()
logging.basicConfig(level=logging.INFO)

# Supabase connection settings are read from the environment; os.getenv
# yields None for a variable that is not set.
SUPABASE_URL: str = os.getenv('SUPABASE_URL')
SUPABASE_KEY: str = os.getenv('SUPABASE_KEY')
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
def download_blob(browser, blob_url, filename='output.mp3'):
    """Ask the current page to save the content at *blob_url* as a download.

    A script is executed in the browser that fetches the URL, wraps the
    response in a temporary object URL and clicks a hidden ``<a download>``
    link so the browser saves it under *filename*.

    Args:
        browser: Driver object exposing ``execute_script(script, *args)``.
        blob_url: URL to fetch from inside the page (e.g. a ``blob:`` URL).
        filename: Name suggested to the browser for the saved file.

    Returns:
        None. The script only starts the fetch; this function does not wait
        for the download to finish, and fetch errors are logged to the
        browser console rather than raised here.
    """
    # The URL and file name are handed over as script arguments instead of
    # being formatted into the source text, so quotes or backslashes in
    # either value can neither break the script nor inject code into it.
    js_code = """
    const sourceUrl = arguments[0];
    const downloadName = arguments[1];
    fetch(sourceUrl)
        .then(response => response.blob())
        .then(blob => {
            const url = URL.createObjectURL(blob);
            const a = document.createElement('a');
            a.href = url;
            a.download = downloadName;
            document.body.appendChild(a);
            a.click();
            document.body.removeChild(a);
        })
        .catch(error => console.error('Error downloading file:', error));
    """
    browser.execute_script(js_code, blob_url, filename)
def _chrome_options_for(download_folder):
    """Build headless Chrome options that save downloads into *download_folder*."""
    options = uc.ChromeOptions()
    options.add_argument('--ignore-certificate-errors')
    options.add_experimental_option("prefs", {
        "download.default_directory": download_folder,  # default download folder
        "download.prompt_for_download": False,
        "safebrowsing.enabled": True,
    })
    options.add_argument('--headless')  # run without a visible browser window
    options.add_argument("--disable-gpu")
    return options


def tts_downloadfile(text):
    """Convert *text* to speech through the TTS web page and save the MP3.

    Opens the TTS page in headless Chrome, types *text* into the input box,
    triggers the translate and synthesise buttons, waits for the ``<audio>``
    element to receive a ``src`` and downloads it into
    ``/var/www/html/innolux/tts_folder`` under a timestamped name.

    Args:
        text: Text to synthesise.

    Returns:
        On success, the saved file's path relative to ``/var/www/html``
        (``innolux/tts_folder/<timestamp>_<abc>.mp3``). On failure, a
        human-readable string that does not name an ``.mp3`` file:
        ``'Empty text'``, ``'Time exceeded'`` or the message of the
        exception that was caught.
    """
    if not text:
        # Nothing to type into the page, so skip the browser launch entirely.
        return 'Empty text'

    audio_timeout = 45      # seconds, measured from the start of the call
    download_timeout = 30   # seconds, measured from the download request
    web_root = '/var/www/html'

    start_time = time.monotonic()
    print(f'text長度: {len(text)}')
    settle_seconds = random.randint(3, 5)
    url = 'http://tts001.iptcloud.net:8804/'
    download_folder = '/var/www/html/innolux/tts_folder'
    suffix = ''.join(random.sample(string.ascii_lowercase, 3))
    file_name = datetime.now().strftime('%Y%m%d%H%M%S') + f'_{suffix}.mp3'
    file_path = os.path.join(download_folder, file_name)
    os.makedirs(download_folder, exist_ok=True)

    options = _chrome_options_for(download_folder)
    with uc.Chrome(options=options, version_main=129) as browser:
        try:
            browser.get(url)
            time.sleep(settle_seconds)

            # Convert the Chinese text to Taiwanese romanisation.
            browser.find_element(By.XPATH, '//*[@id="js-input"]').send_keys(text)
            time.sleep(0.1)
            browser.find_element(By.XPATH, '//*[@id="js-translate"]').click()
            time.sleep(0.1 + len(text) * 0.01)
            browser.execute_script('window.scrollBy(0, 200);')

            # Synthesise the audio.
            browser.find_element(By.XPATH, '//*[@id="button1"]').click()

            # Poll the <audio> element until the page fills in its src.
            audio_src = None
            while not audio_src:
                if time.monotonic() - start_time > audio_timeout:
                    return 'Time exceeded'
                audio_element = browser.find_element(By.XPATH, '//*[@id="audio1"]')
                audio_src = audio_element.get_attribute('src')
                if not audio_src:
                    print("尚未檢測到音頻,繼續等待...")
                    time.sleep(0.3)  # check again every 0.3 seconds
            print("音頻來源:", audio_src)
            download_blob(browser, audio_src, file_name)

            # Wait for the file to appear, but never forever: a download
            # that does not land in the folder must not hang the caller.
            download_deadline = time.monotonic() + download_timeout
            while not os.path.exists(file_path):
                if time.monotonic() > download_deadline:
                    return 'Time exceeded'
                print('...')
                time.sleep(0.1)

            print(f"下載完成: {file_path}")
            relative_path = os.path.relpath(file_path, web_root)
            print(relative_path)
            print(time.monotonic() - start_time)
            return relative_path
        except Exception as e:
            # Boundary handler: report the failure to the caller as text so
            # the result stays a plain, JSON-serialisable string.
            print(f'Error: {e}')
            return str(e)
app = FastAPI()

# CORS policy: every origin, HTTP method and request header is accepted,
# and credentialed requests are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
import asyncio
import threading

# One browser session at a time: requests queue on this lock inside worker
# threads, which keeps synthesis serialised without blocking the event loop.
_TTS_LOCK = threading.Lock()


def _synthesize_serialized(answer):
    """Run tts_downloadfile for *answer* while holding the TTS lock."""
    with _TTS_LOCK:
        return tts_downloadfile(answer)


@app.post('/tts')
async def tts(answer: str):
    """Synthesise *answer* to speech and report where the MP3 was saved.

    Args:
        answer: Text to convert to speech.

    Returns:
        ``{'message': {'mp3_url': <path>}}`` when synthesis produced an
        ``.mp3`` path, otherwise ``{"message": "tts processing failed."}``.
    """
    # tts_downloadfile sleeps and drives a browser synchronously; running it
    # in the default executor keeps the event loop free for other requests.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, _synthesize_serialized, answer)

    # Only a string naming an .mp3 file is a success. Other values (for
    # example 'Time exceeded' or an error object) are failures, even though
    # they are truthy.
    if isinstance(result, str) and result.endswith('.mp3'):
        return {'message': {'mp3_url': result}}

    logging.warning("tts failed: %s", result)
    return {"message": "tts processing failed."}
- # from apscheduler.schedulers.background import BackgroundScheduler
- # import asyncio
- # import requests
- # scheduler = BackgroundScheduler()
- # loop = asyncio.get_event_loop()
- # # 巴巴群組
- # def notify_line(id, question, message):
- # # url = 'http://cmm.ai:3001/api/push/PLSDC1vOG9'
- # url = 'https://notify-api.line.me/api/notify'
- # token = 'OtAC4mBxi1tHjFT5RDcCiA8NwNKuxHVOAlZU5iO04XB' # 巴巴工程師群組
- # headers = {
- # 'Authorization': 'Bearer ' + token
- # }
- # # 構造請求的數據
- # data = {
- # 'message': f"\n群創\nid:【{id}】\n問題: {question}\nmessage: {message}"
- # }
-
- # try:
- # response = requests.post(url, headers=headers, data=data)
- # if response.status_code == 200:
- # print("Notification sent successfully.")
- # else:
- # print(f"Failed to send notification. Status code: {response.status_code}")
- # except Exception as e:
- # print(f"An error occurred: {e}")
- # def sub_1_minute():
- # time.sleep(1)
- # cursor = supabase.table('INNOLUX_cache').select('*').filter('mp3_url', 'is', 'null').order('id', desc=False).execute()
- # if cursor.data:
- # data = cursor.data[0]
- # if data['is_run']:
- # return
- # try:
- # supabase.table('INNOLUX_cache').update({'is_run':True}).eq('question', data['question']).eq('answer', data['answer']).execute()
- # file_path = tts_downloadfile(data['answer'])
- # if '.mp3' in file_path:
- # supabase.table('INNOLUX_cache').update({'mp3_url':file_path}).eq('question', data['question']).eq('answer', data['answer']).eq('is_run', True).execute()
- # print(f'{file_path} 已存入資料庫')
- # supabase.table('INNOLUX_cache').update({'is_run':None}).eq('question', data['question']).eq('answer', data['answer']).eq('is_run', True).execute()
- # return
- # else:
- # notify_line(data['id'], data['question'], file_path)
- # supabase.table('INNOLUX_cache').update({'is_run':None}).eq('question', data['question']).eq('answer', data['answer']).execute()
- # print(file_path)
- # return
- # except Exception as e:
- # print(f'Error: {e}')
- # supabase.table('INNOLUX_cache').update({'is_run':None}).eq('question', data['question']).eq('answer', data['answer']).execute()
- # else:
- # return
- # # 添加定时任务
- # scheduler.add_job(sub_1_minute, 'interval', minutes=0.8)
- # scheduler.start()
- # @app.on_event("shutdown")
- # def shutdown_event():
- # scheduler.shutdown()
if __name__ == '__main__':
    # Serve the app over HTTPS on cmm.ai:8093 with the certificate and key
    # files found under /etc/letsencrypt/live/cmm.ai.
    uvicorn.run(
        "台語tts:app",
        host='cmm.ai',
        port=8093,
        reload=False,
        ssl_keyfile="/etc/letsencrypt/live/cmm.ai/privkey.pem",
        ssl_certfile="/etc/letsencrypt/live/cmm.ai/fullchain.pem",
    )
|