from typing import Any, List, Optional from fastapi import UploadFile, File, Form from fastapi.responses import FileResponse from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session import app.crud as crud import app.models as models import app.schemas as schemas from app.api import deps from app.core.celery_app import celery_app from app.core.config import settings from pathlib import Path from app.core.celery_app import celery_app ZIP_STORAGE = Path("/app").joinpath(settings.BACKEND_ZIP_STORAGE) VIDEO_STORAGE = Path("/app").joinpath(settings.BACKEND_VIDEO_STORAGE) router = APIRouter() @router.get("/", response_model=List[schemas.Video]) def get_video_list( db: Session = Depends(deps.get_db), skip: int = 0, limit: int = 100, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Retrieve items. """ if crud.user.is_superuser(current_user): videos = crud.video.get_multi(db, skip=skip, limit=limit) else: videos = crud.video.get_multi_by_owner( db=db, owner_id=current_user.id, skip=skip, limit=limit ) return videos @router.post("/", response_model=schemas.Video) def upload_plot( *, db: Session = Depends(deps.get_db), title: str=Form(...), upload_file: UploadFile=File(), current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Create new video. """ print(title) print(upload_file.filename) file_name = crud.video.generate_file_name(db=db, n=20) video_create = schemas.VideoCreate(title=title, progress_state="waiting", stored_file_name=file_name) video = crud.video.create_with_owner(db=db, obj_in=video_create, owner_id=current_user.id) try: with open(str(Path(ZIP_STORAGE).joinpath(video.stored_file_name+".zip")), 'wb') as f: while contents := upload_file.file.read(1024 * 1024): f.write(contents) except Exception as e: print(e, type(e)) return {"error": str(e)} finally: upload_file.file.close() celery_app.send_task("app.worker.make_video", args=[video.id, video.stored_file_name, current_user.id]) return video @router.get("/{id}") def download_video( *, db: Session = Depends(deps.get_db), id: int, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: video = db.query(models.Video).filter(db=db, id=id).first() if not video: raise HTTPException(status_code=404, detail="Video not found") if not crud.user.is_superuser(current_user) and (video.owner_id != current_user.id): raise HTTPException(status_code=400, detail="Not enough permissions") if not video.progress == "completed": raise HTTPException(status_code=400, detail="Video not completed yet") result = celery_app.send_task("app.worker.upload_to_server", args=[video.id, video.stored_file_name]) if result.get() == "completed" and (file_path:=Path(VIDEO_STORAGE).joinpath(video.stored_file_name+".mp4")).exit(): return FileResponse(path=str(file_path), filename=video.title+".mp4") else: raise HTTPException(status_code=404, detail="Error occurs") @router.get("/worker/{id}") def download_plot( *, db: Session = Depends(deps.get_db), id: int ) -> Any: video = db.query(models.Video).filter(db=db, id=id).first() if not video: raise HTTPException(status_code=404, detail="Video not found") file_path = Path(ZIP_STORAGE).joinpath(video.stored_file_name+".zip") return FileResponse(path=str(file_path)) @router.post("/worker/{id}") def upload_complete_video( *, db: Session = Depends(deps.get_db), id: int, upload_video: Optional[UploadFile]=None, result:int=Form(...) ) -> Any: video = db.query(models.Video).filter(db=db, id=id).first() if not video: raise HTTPException(status_code=404, detail="Video not found") try: with open(str(Path(VIDEO_STORAGE).joinpath(video.stored_file_name+".mp4")), 'wb') as f: while contents := upload_video.file.read(1024 * 1024): f.write(contents) except Exception as e: print(e, type(e)) return {"error": str(e)} finally: upload_video.file.close() return video