tomoya 3 maanden geleden
bovenliggende
commit
a951b0ce61
1 gewijzigde bestanden met toevoegingen van 247 en 7 verwijderingen
  1. 247 7
      backend/app/app/api/api_v1/endpoints/edm_saas_api.py

+ 247 - 7
backend/app/app/api/api_v1/endpoints/edm_saas_api.py

@@ -2,7 +2,10 @@ from fastapi import APIRouter,FastAPI, Depends, HTTPException, status
 from pydantic import BaseModel
 from pydantic import BaseModel
 from supabase import create_client, Client
 from supabase import create_client, Client
 from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
 from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
-from fastapi import UploadFile, File, Form
+from fastapi import UploadFile, File, Form, BackgroundTasks, WebSocket
+from fastapi.responses import FileResponse
+import os
+from pathlib import Path
 from jose import JWTError, jwt
 from jose import JWTError, jwt
 from passlib.context import CryptContext
 from passlib.context import CryptContext
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
@@ -14,24 +17,39 @@ from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
 from email.mime.text import MIMEText
 from google.auth.transport.requests import Request
 from google.auth.transport.requests import Request
 import supabase
 import supabase
+import asyncio
+from edge_tts import VoicesManager
+from app.aianchor.utils2 import check_zip, VideoMakerError
+from app.core.video_utils import update_zip
+from typing import Any, List, Optional, Literal
+import app.schemas as schemas 
+from app.core.celery_app import celery_app
+from app.core.config import settings
+import requests
 
 
-
-
+BACKEND_ZIP_STORAGE = Path("/app").joinpath(settings.BACKEND_ZIP_STORAGE)
+LOCAL_ZIP_STORAGE = Path("/").joinpath(settings.LOCAL_ZIP_STORAGE)
 
 
 SUPABASE_URL = "http://172.105.241.163:8000/"
 SUPABASE_URL = "http://172.105.241.163:8000/"
 SUPABASE_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJzZXJ2aWNlX3JvbGUiLAogICAgImlzcyI6ICJzdXBhYmFzZS1kZW1vIiwKICAgICJpYXQiOiAxNjQxNzY5MjAwLAogICAgImV4cCI6IDE3OTk1MzU2MDAKfQ.DaYlNEoUrrEn2Ig7tqibS-PHK5vgusbcbo7X36XVt4Q"
 SUPABASE_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJzZXJ2aWNlX3JvbGUiLAogICAgImlzcyI6ICJzdXBhYmFzZS1kZW1vIiwKICAgICJpYXQiOiAxNjQxNzY5MjAwLAogICAgImV4cCI6IDE3OTk1MzU2MDAKfQ.DaYlNEoUrrEn2Ig7tqibS-PHK5vgusbcbo7X36XVt4Q"
 EDM_SECRET_KEY = "EdmSaasUserSecretKey"
 EDM_SECRET_KEY = "EdmSaasUserSecretKey"
 USER_TABLE="EDM_SAAS_USER"
 USER_TABLE="EDM_SAAS_USER"
+VIDEO_TABLE="EDM_SAAS_VIDEO"
 ALGORITHM = "HS256"
 ALGORITHM = "HS256"
 
 
+LINE_URL = 'https://notify-api.line.me/api/notify'
+LINE_TOKEN = 'o8dqdVL2k8aiWO4jy3pawZamBu53bbjoSh2u0GJ7F0j'
+
 supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
 supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
 router = APIRouter()
 router = APIRouter()
+video_clients = {}
 
 
 class User(BaseModel):
 class User(BaseModel):
     username: str
     username: str
     password: str
     password: str
 
 
 class UserInDB(BaseModel):
 class UserInDB(BaseModel):
+    id: int
     username: str
     username: str
     hashed_password: str
     hashed_password: str
     token :Optional[str]
     token :Optional[str]
@@ -39,8 +57,17 @@ class UserInDB(BaseModel):
     point : Optional[int]
     point : Optional[int]
     recharge : Optional[int]
     recharge : Optional[int]
     
     
+class VideoInDB(BaseModel):
+    id:int
+    stored_filename:str
+    progress_state:str
+    created_datetime:str
+    length:Optional[int]
+    owner_id:int
+
+    
 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
-oauth2_scheme = OAuth2PasswordBearer(tokenUrl="https://crm.choozmo.com/auth/token")
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="https://cmm.ai:10001/auth/token")
 
 
 async def get_user(username: str):
 async def get_user(username: str):
     user_table: str = USER_TABLE
     user_table: str = USER_TABLE
@@ -53,7 +80,8 @@ async def get_user(username: str):
 
 
 @router.post("/zip2video")
 @router.post("/zip2video")
 async def update_detect_info(
 async def update_detect_info(
-    token: str = Depends(oauth2_scheme),
+    background_tasks: BackgroundTasks,
+    token: str ,
     title: str=Form(...), 
     title: str=Form(...), 
     anchor: str=Form(...),
     anchor: str=Form(...),
     style: str=Form(...),
     style: str=Form(...),
@@ -67,6 +95,170 @@ async def update_detect_info(
         headers={"WWW-Authenticate": "Bearer"},
         headers={"WWW-Authenticate": "Bearer"},
     )
     )
     try:
     try:
+        print(token)
+        payload = jwt.decode(token, EDM_SECRET_KEY, algorithms=[ALGORITHM])
+        username: str = payload.get("sub")
+        print(username)
+        if username is None:
+            raise credentials_exception
+    except JWTError:
+        raise credentials_exception
+
+    user = await get_user(username)
+    if user is None:
+        raise credentials_exception
+    
+    filename = datetime.now().strftime("VIDEO%Y%m%d%H%M%S")
+    filepath = str(Path(BACKEND_ZIP_STORAGE).joinpath(filename+".zip"))
+    try:
+        with open(filepath, 'wb') as f:
+            while contents := upload_file.file.read(1024 * 1024):
+                f.write(contents)
+    except Exception as e:
+        print(e, type(e))
+        error_msg = {"error_message": str(e)}
+        return JSONResponse(error_msg)
+    finally:
+        upload_file.file.close()
+    try:
+      if check_zip(filepath):
+        print("passed check_zip")
+    except VideoMakerError as e:
+      print(e)
+      error_msg = {"accepted": False, "error_message":f'{e}'}
+      return JSONResponse(error_msg)
+  
+    #video_create = schemas.VideoCreate(title=title, progress_state="PENDING", stored_filename=filename)
+    #video = crud.video.create_with_owner(db=db, obj_in=video_create, owner_id=current_user.id)
+    video = {"stored_filename":filename, "progress_state":"PENDING", "owner_id":user.id}
+    response = supabase.table(VIDEO_TABLE).insert(video).execute()
+    return_msg = {"video_message":"accepted", "accepted":True}
+    video_data = response.data[0]
+    video_data['membership_status'] = "infinite" ##
+    video_data['available_time'] = 0 ##
+    video_data['video_id'] = video_data['id']
+    video_data['character'] = anchor
+    video_data['anchor'] = anchor
+    video_data['style'] = style
+    video_data['lang'] = lang
+    background_tasks.add_task(wait_finish, video_data)
+    
+async def wait_finish(video_data:dict): 
+    zip_filename = video_data['stored_filename']+".zip"
+    process = await asyncio.create_subprocess_exec("sshpass", "-p", "choozmo9", 
+                    "scp", "-P", "5722", "-o", "StrictHostKeyChecking=no", f"{str(BACKEND_ZIP_STORAGE/zip_filename)}", f"root@172.104.93.163:{str(LOCAL_ZIP_STORAGE)}")
+    await process.wait()
+
+    task = celery_app.send_task("app.worker.make_video", kwargs=video_data)
+    while True:
+       await asyncio.sleep(1)
+       if task.state != "PENDING":
+           break
+       
+    video = supabase.table(VIDEO_TABLE).update({"progress_state":"STARTED"}).eq('id', video_data['id']).execute()
+    msg_data = f"{video_data['stored_filename']}:STARTED"
+    await publish(msg_data)
+    headers = {
+        'Authorization': 'Bearer ' + LINE_TOKEN    
+    }
+    data = {
+        'message':f'cloud-choozmo-com\n \
+                    Video\n \
+                    user: {video_data["owner_id"]}\n \
+                    membership: {video_data["membership_status"]}\n \
+                    video: {video_data["id"]}\n \
+                    state: start'     
+    }
+    data = requests.post(LINE_URL, headers=headers, data=data) 
+    while True:
+        await asyncio.sleep(1)
+        if task.state != "STARTED":
+            break
+
+    if task.state == "SUCCESS":
+        video = supabase.table(VIDEO_TABLE).update({"progress_state":"SUCCESS"}).eq('id', video_data['id']).execute()
+        if time := task.result:
+            # user.available_time -= int(time) if int(time) <= user.available_time else 0
+            # video.length = int(time)
+            pass
+        msg_data = f"{video_data['stored_filename']}:SUCCESS:{int(time)}"
+        headers = {
+            'Authorization': 'Bearer ' + LINE_TOKEN    
+        }
+        data = {
+            'message':f'cloud-choozmo-com\n \
+                        Video\n \
+                        user: {video_data["owner_id"]}\n \
+                        membership: {video_data["membership_status"]}\n \
+                        video: {video_data["id"]}\n \
+                        state: success'    
+        }
+        data = requests.post(LINE_URL, headers=headers, data=data) 
+
+    elif task.state == "FAILURE":
+        video = supabase.table(VIDEO_TABLE).update({"progress_state":"FAILURE"}).eq('id', video_data['id']).execute()
+        msg_data = f"{video_data['stored_filename']}:FAILURE"
+
+        headers = {
+            'Authorization': 'Bearer ' + LINE_TOKEN   
+        }
+        data = {
+            'message':f'cloud-choozmo-com\n \
+                        Video\n \
+                        user: {video_data["owner_id"]}\n \
+                        membership: {video_data["membership_status"]}\n \
+                        video: {video_data["id"]}\n \
+                        state: failure'     
+        }
+        data = requests.post(LINE_URL, headers=headers, data=data) 
+
+    await publish(msg_data)
+
+async def publish(data):
+    for video_client in video_clients.values():
+        await video_client.send_text(f"{data}")
+        
+@router.websocket("")
+async def websocket_endpoint(websocket: WebSocket):
+    await websocket.accept()
+    key = websocket.headers.get('sec-websocket-key')
+    video_clients[key] = websocket
+    try:
+        while True:
+            data = await websocket.receive_text()
+            if not data.startswith("subscribe"):
+              del video_clients[key]
+              #for client in sr_clients.values():
+              #      await client.send_text(f"ID: {key} | Message: {data}")
+
+    except:
+        # 接続が切れた場合、当該クライアントを削除する
+        del video_clients[key]
+    
+async def get_videos(user_id: int):
+    user_table: str = VIDEO_TABLE
+    response = supabase.table(user_table).select('*').eq('user_id', user_id).execute()
+    videos = response.data
+    if videos:
+        videos = [VideoInDB(**video) for video in videos]
+        return videos
+    return None
+    
+@router.get("/zip2video")
+async def get_video_list(
+    token: str ,
+) -> Any:
+    """
+    Retrieve items.
+    """
+    # 驗證身分
+    credentials_exception = HTTPException(
+        status_code=status.HTTP_401_UNAUTHORIZED,
+        detail="Could not validate credentials",
+        headers={"WWW-Authenticate": "Bearer"},
+    )
+    try:
+        print(token)
         payload = jwt.decode(token, EDM_SECRET_KEY, algorithms=[ALGORITHM])
         payload = jwt.decode(token, EDM_SECRET_KEY, algorithms=[ALGORITHM])
         username: str = payload.get("sub")
         username: str = payload.get("sub")
         print(username)
         print(username)
@@ -79,7 +271,55 @@ async def update_detect_info(
     if user is None:
     if user is None:
         raise credentials_exception
         raise credentials_exception
     
     
-    video_data = {"title":title, "anchor":anchor, "lang_id":lang}
-    print(video_data)
+    videos = get_videos(user_id=user.id)
+    
+    return videos
+
+@router.post('/zip-translate')
+def zip_translate(
+    *,
+    background_tasks: BackgroundTasks,
+    upload_file: UploadFile=File(),
+    lang:str):
+
+    try:
+        with open(upload_file.filename, 'wb') as f:
+            while contents := upload_file.file.read(1024 * 1024):
+                f.write(contents)
+    except Exception as e:
+        print(e, type(e))
+        error_msg = {"error_message": str(e)}
+        return JSONResponse(error_msg)
+    finally:
+        upload_file.file.close()
+    try:
+      if check_zip(upload_file.filename):
+        print("passed check_zip")
+    except VideoMakerError as e:
+      print(e)
+      error_msg = {"accepted": False, "error_message":f'{e}'}
+      return JSONResponse(error_msg)
+    path = Path(upload_file.filename)
+    local = None
+    if "-" in lang:
+        local = lang
+        lang = lang.split("-")[0]
+    gender = 'Female'
+    voices = asyncio.run(VoicesManager.create())
+    if local:
+        voice = voices.find(Gender=gender, Language=lang, Locale=local)
+        update_zip(str(path), local, str(path.parent/(path.stem+f"_{lang}"+path.suffix)), voice[0]['ShortName'])
+    else:
+        voice = voices.find(Gender=gender, Language=lang)
+        update_zip(str(path), lang, str(path.parent/(path.stem+f"_{lang}"+path.suffix)), voice[0]['ShortName'])
+
+    def remove_zip():
+            if os.path.exists(str(path.parent/(path.stem+"_"+lang+path.suffix))):
+                os.remove(str(path.parent/(path.stem+"_"+lang+path.suffix)))
+    background_tasks.add_task(remove_zip)
+    return FileResponse(str(path.parent/(path.stem+"_"+lang+path.suffix)), media_type="application/zip")
+
+
+