浏览代码

add translate zip in text2zip api

tomoya 1 月之前
父节点
当前提交
eb80fc1dea

+ 3 - 3
backend/app/app/aianchor/utils2.py

@@ -56,8 +56,7 @@ def check_zip(zip_filepath:str):
       # excel 裡的圖檔跟zip裡的檔案要一致
       if not table.loc[i, ['素材']].isna().item():
         img =  table.loc[i, ['素材']].item()
-        img = str(img)
-        img_files = [x.strip() for x in img.split(',')]
+        img_files = [x.strip() for x in str(img).split(',')]
         for img in img_files:
           if Path(img).suffix:
             n = len([x for x in true_filenames if x[0] == img])
@@ -128,7 +127,7 @@ def check_and_extract_zip(zip_filepath:str, working_dirpath:str):
       # excel 裡的圖檔跟zip裡的檔案要一致
       if not table.loc[i, ['素材']].isna().item():
         img =  table.loc[i, ['素材']].item()
-        img_files = [x.strip() for x in img.split(',')]
+        img_files = [x.strip() for x in str(img).split(',')]
         for img in img_files:
           if Path(img).suffix:
             target_filenames = [x for x in true_filenames if x[0] == img]
@@ -145,6 +144,7 @@ def check_and_extract_zip(zip_filepath:str, working_dirpath:str):
               with zf.open(filenames[target_filenames[0][1]], 'r') as origin_file:  # 開啟原檔案
                 shutil.copyfileobj(origin_file, output_file)  # 將原檔案內容複製到新檔案
           else:
+            continue
             raise VideoMakerError(f"{target_filenames[0][0]} already exists.")
         
       # 需要tts文字或音檔

+ 37 - 2
backend/app/app/api/api_v1/endpoints/text2zip.py

@@ -4,7 +4,7 @@ from fastapi import APIRouter, Body, Depends, HTTPException, Form, status, Respo
 from fastapi.encoders import jsonable_encoder
 from pydantic.networks import EmailStr, HttpUrl
 from sqlalchemy.orm import Session
-from fastapi.responses import FileResponse, PlainTextResponse
+from fastapi.responses import FileResponse, PlainTextResponse, JSONResponse
 import app.crud as crud
 import app.models as models
 import app.schemas as schemas 
@@ -37,6 +37,8 @@ from app.core.config import settings
 from app.aianchor.utils2 import check_zip, VideoMakerError
 from pathlib import Path
 import emails
+from fastapi import UploadFile, File, Form
+from app.core.video_utils import update_zip
 
 BACKEND_ZIP_STORAGE = Path("/app").joinpath(settings.BACKEND_ZIP_STORAGE)
 LOCAL_ZIP_STORAGE = Path("/").joinpath(settings.LOCAL_ZIP_STORAGE)
@@ -107,7 +109,8 @@ async def generate_zip(
       *,
     background_tasks: BackgroundTasks,
     current_user: models.User = Depends(deps.get_current_active_user),
-    model:Literal['sd3', 'flux']="sd3", 
+    
+    upload_file: UploadFile=File(),
     texts:List[str]):
 
     if not model:
@@ -334,3 +337,35 @@ def send_simple_email(email_to: str, video_path: str):
         print(r)
         assert r.status_code == 250
         
+@router.post('/zip-translate')
+def zip_translate(
+    *,
+    background_tasks: BackgroundTasks,
+    upload_file: UploadFile=File(),
+    lang:Literal["en", 'zh', 'zh-TW', 'ja', 'vi']):
+
+    try:
+        with open(upload_file.filename, 'wb') as f:
+            while contents := upload_file.file.read(1024 * 1024):
+                f.write(contents)
+    except Exception as e:
+        print(e, type(e))
+        error_msg = {"error_message": str(e)}
+        return JSONResponse(error_msg)
+    finally:
+        upload_file.file.close()
+    try:
+      if check_zip(upload_file.filename):
+        print("passed check_zip")
+    except VideoMakerError as e:
+      print(e)
+      error_msg = {"accepted": False, "error_message":f'{e}'}
+      return JSONResponse(error_msg)
+    path = Path(upload_file.filename)
+    update_zip(str(path), 'zh-TW', str(path.parent/(path.stem+"_"+lang+path.suffix)))
+
+    def remove_zip():
+            if os.path.exists(str(path.parent/(path.stem+"_"+lang+path.suffix))):
+                os.remove(str(path.parent/(path.stem+"_"+lang+path.suffix)))
+    background_tasks.add_task(remove_zip)
+    return FileResponse(str(path.parent/(path.stem+"_"+lang+path.suffix)), media_type="application/zip")

+ 0 - 17
backend/app/app/core/test_chardet.py

@@ -1,17 +0,0 @@
-from chardet.universaldetector import UniversalDetector
-
-DEFAULT_ENCODING = "utf-8"
-
-def guess_codec(filenames: list) -> str:
-  codec_detector = UniversalDetector()
-  for filename in filenames:
-    codec_detector.feed(filename.encode('cp437'))
-    if codec_detector.done:
-      break
-
-  result = codec_detector.close()
-  encoding = result.get("encoding")
-  return encoding or DEFAULT_ENCODING
-  
-if __name__=="__main__":
-  pass

+ 58 - 1
backend/app/app/core/video_utils.py

@@ -5,8 +5,22 @@ import shutil
 import os
 import chardet
 import zipfile
-from test_chardet import guess_codec
 from io import BytesIO
+from translate import Translator
+from chardet.universaldetector import UniversalDetector
+
+DEFAULT_ENCODING = "utf-8"
+
+def guess_codec(filenames: list) -> str:
+  codec_detector = UniversalDetector()
+  for filename in filenames:
+    codec_detector.feed(filename.encode('cp437'))
+    if codec_detector.done:
+      break
+
+  result = codec_detector.close()
+  encoding = result.get("encoding")
+  return encoding or DEFAULT_ENCODING
   
 def check_zip(zip_filepath:str):
   path = Path(zip_filepath)
@@ -54,5 +68,48 @@ def check_zip(zip_filepath:str):
         n = stems.count(voice_file)
         if n != 1:
           raise ValueError(f"voice file is can't find is zip at scene {i+1}.")
+      
+def update_zip(zip_path, lang):
+    temp_zip_path = zip_path + ".tmp"
+
+    with zipfile.ZipFile(zip_path, 'r') as zip_in, zipfile.ZipFile(temp_zip_path, 'w') as zip_out:
+        for item in zip_in.infolist():
+            with zip_in.open(item.filename) as src_file:
+                if item.filename.split('.')[-1] == "xlsx":
+                    table = pd.read_excel(src_file, dtype=object)
+                    table = translate_table(table, lang)
+                    table.to_excel(Path(item.filename).name ,sheet_name='Sheet_name_1')
+                    zip_out.write(Path(item.filename).name, item.filename)
+                    os.remove(Path(item.filename).name)
+                elif item.filename.split('.')[-1] == "csv":
+                    table = pd.read_csv(src_file, dtype=object)
+                    table = translate_table(table, lang)
+                    table.to_excel(Path(item.filename).name ,sheet_name='Sheet_name_1')
+                    zip_out.write(Path(item.filename).name, item.filename)
+                    os.remove(Path(item.filename).name)
+                else:
+                    # それ以外のファイルはそのままコピー
+                    with zip_out.open(item.filename, 'w') as dst_file:
+                        shutil.copyfileobj(src_file, dst_file)
+
+    # 旧ZIPを削除し、新ZIPをリネーム
+    os.remove(zip_path)
+    os.rename(temp_zip_path, zip_path)
+    
+def translate_table(table, lang):
+    translator= Translator(to_lang=lang)
+    print(f"translate to {lang}")
+    for i in range(len(table)):
+        if (text:=table.loc[i, ['大標']].item()):
+            print("大標:",text)
+            translation = translator.translate(text)
+            print("大標翻譯:",translation)
+            table.loc[i, ['字幕']] = translation
+        if (text:=table.loc[i, ['字幕']].item()):
+            print('字幕:',text)
+            translation = translator.translate(text)
+            print('字幕翻譯:',translation)
+            table.loc[i, ['字幕']] = translation
+    return table