from typing import Any, List, Optional import subprocess from fastapi import UploadFile, File, Form from fastapi.responses import FileResponse, JSONResponse from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, WebSocket from sqlalchemy.orm import Session from fastapi.encoders import jsonable_encoder import app.crud as crud import app.models as models import app.schemas as schemas from app.api import deps from app.core.celery_app import celery_app from app.core.config import settings from pathlib import Path from app.db.session import SessionLocal import asyncio BACKEND_ZIP_STORAGE = Path("/app").joinpath(settings.BACKEND_ZIP_STORAGE) LOCAL_ZIP_STORAGE = Path("/").joinpath(settings.LOCAL_ZIP_STORAGE) router = APIRouter() video_clients = {} @router.get("/", response_model=List[schemas.Video]) def get_video_list( db: Session = Depends(deps.get_db), skip: int = 0, limit: int = 100, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Retrieve items. """ if crud.user.is_superuser(current_user): videos = crud.video.get_multi(db, skip=skip, limit=limit) else: videos = crud.video.get_multi_by_owner( db=db, owner_id=current_user.id, skip=skip, limit=limit ) return videos @router.post("/test") def test( *, title: str, anchor_id: int, lang_id: int, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: video_data = {"title":title, "anchor_id":anchor_id, "lang_id":lang_id} print(video_data) task = celery_app.send_task("app.worker.make_video_test", kwargs=video_data, ) print(task) return "ok" @router.post("/", response_model=schemas.Video) def upload_plot( *, db: Session = Depends(deps.get_db), title: str=Form(...), anchor_id: int=Form(...), lang_id: int=Form(...), upload_file: UploadFile=File(), current_user: models.User = Depends(deps.get_current_active_user), background_tasks: BackgroundTasks, ) -> Any: """ Create new video. """ print(title) print(upload_file.filename) filename = crud.video.generate_file_name(db=db, n=20) try: with open(str(Path(BACKEND_ZIP_STORAGE).joinpath(filename+".zip")), 'wb') as f: while contents := upload_file.file.read(1024 * 1024): f.write(contents) except Exception as e: print(e, type(e)) return {"error": str(e)} finally: upload_file.file.close() ''' zip_filename = video.stored_file_name+".zip" print(str(BACKEND_ZIP_STORAGE/zip_filename)) r = subprocess.run(["sshpass", "-p", "choozmo9", "scp", "-P", "5722", "-o", "StrictHostKeyChecking=no", f"{str(BACKEND_ZIP_STORAGE/zip_filename)}", f"root@172.104.93.163:{str(LOCAL_ZIP_STORAGE/zip_filename)}"]) print(r.returncode) celery_app.send_task("app.worker.make_video", args=[video.id, video.stored_file_name, current_user.id, anchor_id, current_user.membership_status, current_user.available_time]) ''' video_create = schemas.VideoCreate(title=title, progress_state="PENDING", stored_filename=filename) video = crud.video.create_with_owner(db=db, obj_in=video_create, owner_id=current_user.id) return_msg = {"video_message":"accepted"} video_data = jsonable_encoder(video) video_data['membership_status'] = current_user.membership_status video_data['available_time'] = current_user.available_time video_data['video_id'] = video_data['id'] background_tasks.add_task(wait_finish, video_data) return JSONResponse(return_msg, background=background_tasks) async def wait_finish(video_data:dict): zip_filename = video_data['stored_filename']+".zip" process = await asyncio.create_subprocess_exec("sshpass", "-p", "choozmo9", "scp", "-P", "5722", "-o", "StrictHostKeyChecking=no", f"{str(BACKEND_ZIP_STORAGE/zip_filename)}", f"root@172.104.93.163:{str(LOCAL_ZIP_STORAGE)}") await process.wait() task = celery_app.send_task("app.worker.make_video", kwargs=video_data) while True: await asyncio.sleep(1) if task.state != "PENDING": break db = SessionLocal() video = db.query(models.Video).get(video_data['id']) video.progress_state = "STARTED" db.commit() db.close() msg_data = f"{video_data['stored_filename']}:STARTED" await publish(msg_data) while True: await asyncio.sleep(1) if task.state != "STARTED": break if task.state == "SUCCESS": db = SessionLocal() video = db.query(models.Video).get(video_data['id']) user = db.query(models.User).get(video_data['owner_id']) video.progress_state = "SUCCESS" if time := task.result: user.available_time -= int(time) video.length = int(time) db.commit() db.close() msg_data = f"{video_data['stored_filename']}:SUCCESS:{int(time)}" elif task.state == "FAILURE": db = SessionLocal() video = db.query(models.Video).get(video_data['id']) video.progress_state = "FAILURE" db.commit() db.close() msg_data = f"{video_data['stored_filename']}:FAILURE" await publish(msg_data) async def publish(data): for video_client in video_clients.values(): await video_client.send_text(f"{data}") @router.get("/{id}") def download_video( *, db: Session = Depends(deps.get_db), id: int, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: return {"message":"address"} @router.websocket("") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() key = websocket.headers.get('sec-websocket-key') video_clients[key] = websocket try: while True: data = await websocket.receive_text() if not data.startswith("subscribe"): del video_clients[key] #for client in sr_clients.values(): # await client.send_text(f"ID: {key} | Message: {data}") except: # 接続が切れた場合、当該クライアントを削除する del video_clients[key]