@@ -1,10 +1,10 @@
from typing import Any, List, Optional
import subprocess
from fastapi import UploadFile, File, Form
-from fastapi.responses import FileResponse
-from fastapi import APIRouter, Depends, HTTPException
+from fastapi.responses import FileResponse, JSONResponse
+from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, WebSocket
from sqlalchemy.orm import Session
+from fastapi.encoders import jsonable_encoder
import app.crud as crud
import app.models as models
import app.schemas as schemas
@@ -13,14 +13,16 @@ from app.api import deps
from app.core.celery_app import celery_app
from app.core.config import settings
from pathlib import Path
+from app.db.session import SessionLocal
-from app.core.celery_app import celery_app
+import asyncio
BACKEND_ZIP_STORAGE = Path("/app").joinpath(settings.BACKEND_ZIP_STORAGE)
LOCAL_ZIP_STORAGE = Path("/").joinpath(settings.LOCAL_ZIP_STORAGE)
router = APIRouter()
+video_clients = {}
@router.get("/", response_model=List[schemas.Video])
def get_video_list(
@@ -40,6 +42,20 @@ def get_video_list(
return videos
+def test(
+ *,
+ title: str,
+ anchor_id: int,
+ lang_id: int,
+ current_user: models.User = Depends(deps.get_current_active_user),
+) -> Any:
+ video_data = {"title":title, "anchor_id":anchor_id, "lang_id":lang_id}
+ print(video_data)
+ task = celery_app.send_task("app.worker.make_video_test", kwargs=video_data, )
+ print(task)
+ return "ok"
@router.post("/", response_model=schemas.Video)
def upload_plot(
@@ -49,18 +65,17 @@ def upload_plot(
lang_id: int=Form(...),
upload_file: UploadFile=File(),
current_user: models.User = Depends(deps.get_current_active_user),
+ background_tasks: BackgroundTasks,
) -> Any:
Create new video.
- file_name = crud.video.generate_file_name(db=db, n=20)
- video_create = schemas.VideoCreate(title=title, progress_state="waiting", stored_file_name=file_name)
- video = crud.video.create_with_owner(db=db, obj_in=video_create, owner_id=current_user.id)
+ filename = crud.video.generate_file_name(db=db, n=20)
- with open(str(Path(BACKEND_ZIP_STORAGE).joinpath(video.stored_file_name+".zip")), 'wb') as f:
+ with open(str(Path(BACKEND_ZIP_STORAGE).joinpath(filename+".zip")), 'wb') as f:
while contents := upload_file.file.read(1024 * 1024):
except Exception as e:
@@ -68,13 +83,76 @@ def upload_plot(
return {"error": str(e)}
+ '''
zip_filename = video.stored_file_name+".zip"
r = subprocess.run(["sshpass", "-p", "choozmo9",
"scp", "-P", "5722", "-o", "StrictHostKeyChecking=no", f"{str(BACKEND_ZIP_STORAGE/zip_filename)}", f"root@{str(LOCAL_ZIP_STORAGE/zip_filename)}"])
celery_app.send_task("app.worker.make_video", args=[video.id, video.stored_file_name, current_user.id, anchor_id, current_user.membership_status, current_user.available_time])
- return video
+ '''
+ video_create = schemas.VideoCreate(title=title, progress_state="PENDING", stored_filename=filename)
+ video = crud.video.create_with_owner(db=db, obj_in=video_create, owner_id=current_user.id)
+ return_msg = {"video_message":"accepted"}
+ video_data = jsonable_encoder(video)
+ video_data['membership_status'] = current_user.membership_status
+ video_data['available_time'] = current_user.available_time
+ video_data['video_id'] = video_data['id']
+ background_tasks.add_task(wait_finish, video_data)
+ return JSONResponse(return_msg, background=background_tasks)
+async def wait_finish(video_data:dict):
+ zip_filename = video_data['stored_filename']+".zip"
+ process = await asyncio.create_subprocess_exec("sshpass", "-p", "choozmo9",
+ "scp", "-P", "5722", "-o", "StrictHostKeyChecking=no", f"{str(BACKEND_ZIP_STORAGE/zip_filename)}", f"root@{str(LOCAL_ZIP_STORAGE)}")
+ await process.wait()
+ task = celery_app.send_task("app.worker.make_video", kwargs=video_data)
+ while True:
+ await asyncio.sleep(1)
+ if task.state != "PENDING":
+ break
+ db = SessionLocal()
+ video = db.query(models.Video).get(video_data['id'])
+ video.progress_state = "STARTED"
+ db.commit()
+ db.close()
+ msg_data = f"{video_data['stored_filename']}:STARTED"
+ await publish(msg_data)
+ while True:
+ await asyncio.sleep(1)
+ if task.state != "STARTED":
+ break
+ if task.state == "SUCCESS":
+ db = SessionLocal()
+ video = db.query(models.Video).get(video_data['id'])
+ user = db.query(models.User).get(video_data['owner_id'])
+ video.progress_state = "SUCCESS"
+ if time := task.result:
+ user.available_time -= int(time)
+ video.length = int(time)
+ db.commit()
+ db.close()
+ msg_data = f"{video_data['stored_filename']}:SUCCESS:{int(time)}"
+ elif task.state == "FAILURE":
+ db = SessionLocal()
+ video = db.query(models.Video).get(video_data['id'])
+ video.progress_state = "FAILURE"
+ db.commit()
+ db.close()
+ msg_data = f"{video_data['stored_filename']}:FAILURE"
+ await publish(msg_data)
+async def publish(data):
+ for video_client in video_clients.values():
+ await video_client.send_text(f"{data}")
def download_video(
@@ -86,3 +164,21 @@ def download_video(
return {"message":"address"}
+async def websocket_endpoint(websocket: WebSocket):
+ await websocket.accept()
+ key = websocket.headers.get('sec-websocket-key')
+ video_clients[key] = websocket
+ try:
+ while True:
+ data = await websocket.receive_text()
+ if not data.startswith("subscribe"):
+ del video_clients[key]
+ #for client in sr_clients.values():
+ # await client.send_text(f"ID: {key} | Message: {data}")
+ except:
+ # 接続が切れた場合、当該クライアントを削除する
+ del video_clients[key]