123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- from typing import Any, List, Optional
- import subprocess
- from fastapi import UploadFile, File, Form
- from fastapi.responses import FileResponse, JSONResponse
- from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, WebSocket
- from sqlalchemy.orm import Session
- from fastapi.encoders import jsonable_encoder
- import app.crud as crud
- import app.models as models
- import app.schemas as schemas
- from app.api import deps
- from app.core.celery_app import celery_app
- from app.core.config import settings
- from pathlib import Path
- from app.db.session import SessionLocal
- import asyncio
# Directory where upload_plot writes the uploaded zip archives.
BACKEND_ZIP_STORAGE = Path("/app") / settings.BACKEND_ZIP_STORAGE
# Path given to scp as the destination on the remote host (see wait_finish).
LOCAL_ZIP_STORAGE = Path("/") / settings.LOCAL_ZIP_STORAGE

router = APIRouter()

# Open websocket connections, keyed by each connection's
# ``sec-websocket-key`` request header; publish() sends to every entry.
video_clients = {}
@router.get("/", response_model=List[schemas.Video])
def get_video_list(
    db: Session = Depends(deps.get_db),
    skip: int = 0,
    limit: int = 100,
    current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
    """
    List videos, paginated by ``skip`` and ``limit``.

    A superuser gets the unfiltered listing; every other user gets only the
    videos whose owner id equals their own id.
    """
    if not crud.user.is_superuser(current_user):
        # Regular account: restrict the query to the caller's own videos.
        return crud.video.get_multi_by_owner(
            db=db, owner_id=current_user.id, skip=skip, limit=limit
        )
    return crud.video.get_multi(db, skip=skip, limit=limit)
@router.post("/test")
def test(
    *,
    title: str,
    anchor_id: int,
    lang_id: int,
    current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Submit an ``app.worker.make_video_test`` Celery task.

    The three request values are forwarded to the task as keyword arguments.
    Both the payload and the returned task handle are printed, and the
    endpoint always answers with the string ``"ok"``.
    """
    payload = dict(title=title, anchor_id=anchor_id, lang_id=lang_id)
    print(payload)
    queued = celery_app.send_task("app.worker.make_video_test", kwargs=payload)
    print(queued)
    return "ok"
@router.post("/", response_model=schemas.Video)
def upload_plot(
    *,
    db: Session = Depends(deps.get_db),
    title: str = Form(...),
    anchor_id: int = Form(...),
    lang_id: int = Form(...),
    upload_file: UploadFile = File(),
    current_user: models.User = Depends(deps.get_current_active_user),
    background_tasks: BackgroundTasks,
) -> Any:
    """
    Create new video.

    Steps:
      1. Obtain a stored file name from ``crud.video.generate_file_name`` and
         stream the uploaded archive to ``BACKEND_ZIP_STORAGE/<name>.zip`` in
         1 MiB chunks.
      2. Create the video row for the current user with ``progress_state``
         set to ``"PENDING"``.
      3. Schedule ``wait_finish`` as a background task with the encoded video
         plus the user's ``membership_status`` and ``available_time``.

    Returns:
        A ``JSONResponse`` with ``{"video_message": "accepted"}``; the
        background task is attached to that response.

    Raises:
        HTTPException: status 500 when the upload cannot be written to disk.
            The previous code returned an ``{"error": ...}`` dict here, which
            is not shaped like the ``schemas.Video`` response model declared
            on this route.
    """
    print(title)
    print(upload_file.filename)
    filename = crud.video.generate_file_name(db=db, n=20)
    zip_path = Path(BACKEND_ZIP_STORAGE).joinpath(filename + ".zip")

    try:
        with open(str(zip_path), 'wb') as f:
            while contents := upload_file.file.read(1024 * 1024):
                f.write(contents)
    except Exception as e:
        print(e, type(e))
        # Best-effort removal of a partially written archive so that a failed
        # upload does not leave a truncated zip behind.
        try:
            zip_path.unlink()
        except OSError:
            pass
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        upload_file.file.close()

    video_create = schemas.VideoCreate(
        title=title, progress_state="PENDING", stored_filename=filename
    )
    video = crud.video.create_with_owner(
        db=db, obj_in=video_create, owner_id=current_user.id
    )
    return_msg = {"video_message": "accepted"}

    # Payload consumed by wait_finish, which forwards it as kwargs to the
    # "app.worker.make_video" task.
    video_data = jsonable_encoder(video)
    video_data['membership_status'] = current_user.membership_status
    video_data['available_time'] = current_user.available_time
    video_data['video_id'] = video_data['id']
    background_tasks.add_task(wait_finish, video_data)
    return JSONResponse(return_msg, background=background_tasks)
def _store_video_progress(video_id, state, *, owner_id=None, length=None):
    """Persist ``state`` on a video row; on success also record its length.

    When ``length`` is given it is written to ``video.length`` and subtracted
    from the owner's ``available_time``. The session is always closed, even
    if a query or the commit raises. A missing video row is a no-op.
    """
    db = SessionLocal()
    try:
        video = db.query(models.Video).get(video_id)
        if video is None:
            return
        video.progress_state = state
        if length is not None:
            video.length = length
            user = db.query(models.User).get(owner_id)
            if user is not None:
                user.available_time -= length
        db.commit()
    finally:
        db.close()


async def wait_finish(video_data: dict):
    """
    Ship the uploaded archive to the worker host and track the Celery task.

    Args:
        video_data: the dict built by ``upload_plot``. This function reads
            ``'stored_filename'``, ``'id'`` and ``'owner_id'`` and passes the
            whole dict as keyword arguments to ``app.worker.make_video``.

    Flow:
      1. Copy ``<stored_filename>.zip`` with scp.
      2. Submit the Celery task and poll once per second until it leaves
         ``PENDING``; store and publish ``STARTED``.
      3. Poll until it leaves ``STARTED``; store and publish ``SUCCESS``
         (with the task result as the video length) or ``FAILURE``.

    Messages are published as ``"<stored_filename>:<STATE>[:<length>]"``.
    """
    stored_name = video_data['stored_filename']
    zip_filename = stored_name + ".zip"

    # NOTE(review): the SSH password, port and host are hard-coded and the
    # password is visible in the process list; these should move to settings
    # or key-based auth. Left unchanged here to keep behavior identical.
    process = await asyncio.create_subprocess_exec(
        "sshpass", "-p", "choozmo9",
        "scp", "-P", "5722", "-o", "StrictHostKeyChecking=no",
        f"{str(BACKEND_ZIP_STORAGE/zip_filename)}",
        f"root@172.104.93.163:{str(LOCAL_ZIP_STORAGE)}",
    )
    returncode = await process.wait()
    if returncode != 0:
        # Previously ignored entirely; the task is still submitted as before,
        # but the failed copy is now visible in the output.
        print(f"scp of {zip_filename} exited with code {returncode}")

    task = celery_app.send_task("app.worker.make_video", kwargs=video_data)

    # Wait for the worker to pick the task up.
    while True:
        await asyncio.sleep(1)
        if task.state != "PENDING":
            break

    _store_video_progress(video_data['id'], "STARTED")
    await publish(f"{stored_name}:STARTED")

    # Wait for the task to leave the STARTED state.
    while True:
        await asyncio.sleep(1)
        if task.state != "STARTED":
            break

    final_state = task.state
    if final_state == "SUCCESS":
        result = task.result
        # A missing/empty result used to crash on int(None) when building the
        # message; report a length of 0 and leave length/quota untouched.
        seconds = int(result) if result else 0
        _store_video_progress(
            video_data['id'],
            "SUCCESS",
            owner_id=video_data['owner_id'],
            length=seconds if result else None,
        )
        msg_data = f"{stored_name}:SUCCESS:{seconds}"
    elif final_state == "FAILURE":
        _store_video_progress(video_data['id'], "FAILURE")
        msg_data = f"{stored_name}:FAILURE"
    else:
        # Any other state (e.g. RETRY, REVOKED): there is no new message, so
        # do not re-publish the stale STARTED notification.
        print(f"task for {stored_name} ended polling in state {final_state}")
        return

    await publish(msg_data)
async def publish(data):
    """
    Send ``data`` (formatted as text) to every registered websocket client.

    The registry is snapshotted before sending: each ``send_text`` awaits, and
    ``websocket_endpoint`` may add or remove entries in ``video_clients``
    while this coroutine is suspended, which would otherwise raise
    ``RuntimeError: dictionary changed size during iteration``.

    A client whose send fails is reported and dropped from the registry so
    that one broken connection does not stop delivery to the others.
    """
    message = f"{data}"
    for key, video_client in list(video_clients.items()):
        try:
            await video_client.send_text(message)
        except Exception as exc:
            print(exc, type(exc))
            video_clients.pop(key, None)
@router.get("/{id}")
def download_video(
    *,
    db: Session = Depends(deps.get_db),
    id: int,
    current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Placeholder for the video download endpoint.

    Currently ignores ``id`` and the database session and always answers
    with the fixed payload ``{"message": "address"}``.
    """
    placeholder = {"message": "address"}
    return placeholder
@router.websocket("")
async def websocket_endpoint(websocket: WebSocket):
    """
    Register a websocket client for video progress notifications.

    The connection is stored in ``video_clients`` under its
    ``sec-websocket-key`` header. Any received text that does not start with
    ``"subscribe"`` removes the client from the registry; the connection
    itself stays open. The client is always removed when the handler exits.
    """
    # Function-scope import: this name is not in the module's import block.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    key = websocket.headers.get('sec-websocket-key')
    video_clients[key] = websocket
    try:
        while True:
            data = await websocket.receive_text()
            if not data.startswith("subscribe"):
                # pop() instead of del: a second non-subscribe message used
                # to raise KeyError because the entry was already gone.
                video_clients.pop(key, None)
    except WebSocketDisconnect:
        # The connection was closed; cleanup happens in ``finally``.
        pass
    finally:
        # When the connection is lost (or the handler fails), remove the
        # client. Safe even if it was already unsubscribed above.
        video_clients.pop(key, None)
|