tomoya 2 سال پیش
والد
کامیت
d4a384626c
3فایلهای تغییر یافته به همراه103 افزوده شده و 17 حذف شده
  1. 56 4
      backend/app/app/api/api_v1/endpoints/images.py
  2. 1 1
      backend/app/app/main.py
  3. 46 12
      backend/app/app/worker.py

+ 56 - 4
backend/app/app/api/api_v1/endpoints/images.py

@@ -3,8 +3,10 @@ import subprocess
 from fastapi import UploadFile, File, Form
 from fastapi.responses import FileResponse
 from fastapi import APIRouter, Depends, HTTPException
+from fastapi import WebSocket, BackgroundTasks
 from sqlalchemy.orm import Session
-
+import asyncio
+import shutil
 import app.crud as crud
 import app.models as models
 import app.schemas as schemas 
@@ -24,19 +26,45 @@ LOCAL_ZIP_STORAGE = Path("/").joinpath(settings.LOCAL_ZIP_STORAGE)
 router = APIRouter()
 
 
-
 @router.post("/sr")
 async def supser_resolution(
     *,
     db: Session = Depends(deps.get_db),
     current_user: models.User = Depends(deps.get_current_active_user),
     upload_files: List[UploadFile]=File(description="Multiple files as UploadFile"),
+    background_tasks: BackgroundTasks,
 ) -> Any:
     """
     Super Resolution.
     """
-    filenames = [random_name(20) for file in upload_files]
-    return {"filenames": filenames}
+    filenames = [random_name(20)+Path(file.filename).suffix for file in upload_files]
+    stemnames = [Path(filename).stem for filename in filenames]
+    new_dir = random_name(10)
+    new_dir_path = Path(BACKEND_ZIP_STORAGE).joinpath(new_dir)
+    new_dir_path.mkdir()
+    for i in range(len(upload_files)):
+      try:
+          with open(str(Path(new_dir_path).joinpath(filenames[i])), 'wb') as f:
+              while contents := upload_files[i].file.read(1024 * 1024):
+                  f.write(contents)
+      except Exception as e:
+          print(e, type(e))
+          return {"error": str(e)}
+      finally:
+          upload_files[i].file.close()
+    source = [f"{str(BACKEND_ZIP_STORAGE/filename)}" for filename in filenames]
+    r = subprocess.run(["sshpass", "-p", "choozmo9", 
+                    "scp", "-P", "5722", "-o", "StrictHostKeyChecking=no", "-r", f"{str(new_dir_path)}", f"root@172.104.93.163:{str(LOCAL_ZIP_STORAGE)}"])
+    
+    background_tasks.add_task(wait_finish, new_dir_path, stemnames)
+    
+    print(filenames)
+    return {"filenames": stemnames}
+
+async def wait_finish(dirname, filenames):
+    for filename in filenames:
+      await asyncio.sleep(3)
+      await publish(filename)
 
 @router.get("/sr")
 def get_image(
@@ -53,3 +81,27 @@ def get_image(
     response_filename = filename.stem + "_hr.png"
     return FileResponse(path="test_medias/superman_resolution.png", media_type='image/png', filename=response_filename)
 
+sr_clients = {}
+
+@router.websocket("/sr")
+async def websocket_endpoint(websocket: WebSocket):
+    await websocket.accept()
+    key = websocket.headers.get('sec-websocket-key')
+    sr_clients[key] = websocket
+    try:
+        while True:
+            data = await websocket.receive_text()
+            if not data.startswith("subscribe"):
+              del sr_clients[key]
+              return 
+              #for client in sr_clients.values():
+              #      await client.send_text(f"ID: {key} | Message: {data}")
+
+    except:
+        #await websocket.close()
+        # 接続が切れた場合、当該クライアントを削除する
+        del sr_clients[key]
+
+async def publish(data):
+    for sr_client in sr_clients.values():
+        await sr_client.send_text(f"{data}")

+ 1 - 1
backend/app/app/main.py

@@ -12,7 +12,7 @@ app = FastAPI(
 if settings.BACKEND_CORS_ORIGINS:
     app.add_middleware(
         CORSMiddleware,
-        allow_origins=["*"],#str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
+        allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
         allow_credentials=True,
         allow_methods=["*"],
         allow_headers=["*"],

+ 46 - 12
backend/app/app/worker.py

@@ -1,23 +1,30 @@
 from raven import Client
-import os
+import os, subprocess
 from app.core.celery_app import celery_app
 from app.core.config import settings
 import requests
 from pathlib import Path
 from urllib.parse import urlparse, urljoin
-import os
+from .aianchor import  make_video_from_zip
 #client_sentry = Client(settings.SENTRY_DSN)
-
+import dataset
+from app.db.session import SessionLocal
+from app.models import video
+from app import crud
 download_to_local_url = urljoin(settings.SERVER_HOST, settings.API_V1_STR+"/videos/worker")
 upload_to_server_url = urljoin(settings.SERVER_HOST, settings.API_V1_STR+"/videos/worker")
 
-ZIP_STORAGE = Path("/").joinpath(settings.LOCAL_ZIP_STORAGE) 
-VIDEO_STORAGE = Path("/").joinpath(settings.CELERY_VIDEO_STORAGE)
-
+LOCAL_ZIP_STORAGE = Path("/").joinpath(settings.LOCAL_ZIP_STORAGE)
+CELERY_ZIP_STORAGE = Path("/").joinpath(settings.CELERY_ZIP_STORAGE) 
+CELERY_VIDEO_STORAGE = Path("/").joinpath(settings.LOCAL_VIDEO_STORAGE)
+LOCAL_VIDEO_STORAGE = Path("/").joinpath(settings.LOCAL_VIDEO_STORAGE)
 
+STORAGE_IP = '192.168.192.252'#os.getenv('STORAGE_IP')
+if not STORAGE_IP:
+    raise Exception
 
 @celery_app.task(acks_late=True)
-def make_video(video_id, zip_filename, user_id) -> str:
+def make_video(video_id, filename, user_id) -> str:
     #video_id, zip_filename, user_id = args
     # download 
     '''
@@ -27,13 +34,40 @@ def make_video(video_id, zip_filename, user_id) -> str:
         for chunk in r.iter_content(chunk_size=1024):
             f.write(chunk)
     '''
-    zip_filename = zip_filename + ".zip"
-    print(str(ZIP_STORAGE/zip_filename))
-    print((ZIP_STORAGE/zip_filename).exists())
+    db = SessionLocal()
+    db.execute("SELECT 1")
+    zip_filename = filename + ".zip"
+    r = subprocess.run(["sshpass", "-p", "choozmo9", 
+                        "scp", "-o", "StrictHostKeyChecking=no", f"root@{STORAGE_IP}:{str(LOCAL_ZIP_STORAGE/zip_filename)}", f"{str(CELERY_ZIP_STORAGE/zip_filename)}"])
+    print(f'get from local storage: {r.returncode}')
+    print(f"video_id: {video_id}, file name: {filename}")
+    db.execute(f"UPDATE video SET progress_state='processing' where id={video_id}")
+    db.commit()
     # make video
+    try:
+      make_video_from_zip(working_dir=CELERY_ZIP_STORAGE,style=Path("app/style/choozmo"),  inputfile=zip_filename,opening=False, ending=False)
+    except Exception as e:
+      print(f"error:{e}")
+      db.execute(f"UPDATE video SET progress_state='failed' where id={video_id}")
+      db.commit()
+    else:
+      # 
+      video_filename = filename + ".mp4"
+      r = subprocess.run(["sshpass", "-p", "choozmo9", 
+                          "scp", "-o", "StrictHostKeyChecking=no", f"{str(CELERY_ZIP_STORAGE/'output.mp4')}", f"root@{STORAGE_IP}:{'/var/www/videos/'+video_filename}"])
+      print(f"return to local storage: {r.returncode}")
+      print(f"video_id: {video_id}, file name: {filename}")
 
+      db.execute(f"UPDATE video SET progress_state='completed' where id={video_id}")
+      db.commit()
 
-    
-    return "complete"
+      
+      return "complete"
 
+@celery_app.task(acks_late=True)
+def super_resolution(filenames):
+   source = [f"root@{STORAGE_IP}:{str(LOCAL_ZIP_STORAGE/filename)}" for filename in filenames]
+   r = subprocess.run(["sshpass", "-p", "choozmo9", 
+                        "scp", "-o", "StrictHostKeyChecking=no", f"root@{STORAGE_IP}:{str(LOCAL_ZIP_STORAGE/zip_filename)}", f"{str(CELERY_ZIP_STORAGE/zip_filename)}"])
+