videos.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. from typing import Any, List, Optional
  2. import subprocess
  3. from fastapi import UploadFile, File, Form
  4. from fastapi.responses import FileResponse, JSONResponse
  5. from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, WebSocket
  6. from sqlalchemy.orm import Session
  7. from fastapi.encoders import jsonable_encoder
  8. import app.crud as crud
  9. import app.models as models
  10. import app.schemas as schemas
  11. from app.api import deps
  12. from app.core.celery_app import celery_app
  13. from app.core.config import settings
  14. from pathlib import Path
  15. from app.db.session import SessionLocal
  16. import asyncio
  17. BACKEND_ZIP_STORAGE = Path("/app").joinpath(settings.BACKEND_ZIP_STORAGE)
  18. LOCAL_ZIP_STORAGE = Path("/").joinpath(settings.LOCAL_ZIP_STORAGE)
  19. router = APIRouter()
  20. video_clients = {}
  21. @router.get("/", response_model=List[schemas.Video])
  22. def get_video_list(
  23. db: Session = Depends(deps.get_db),
  24. skip: int = 0,
  25. limit: int = 100,
  26. current_user: models.User = Depends(deps.get_current_active_user),
  27. ) -> Any:
  28. """
  29. Retrieve items.
  30. """
  31. if crud.user.is_superuser(current_user):
  32. videos = crud.video.get_multi(db, skip=skip, limit=limit)
  33. else:
  34. videos = crud.video.get_multi_by_owner(
  35. db=db, owner_id=current_user.id, skip=skip, limit=limit
  36. )
  37. return videos
  38. @router.post("/test")
  39. def test(
  40. *,
  41. title: str,
  42. anchor_id: int,
  43. lang_id: int,
  44. current_user: models.User = Depends(deps.get_current_active_user),
  45. ) -> Any:
  46. video_data = {"title":title, "anchor_id":anchor_id, "lang_id":lang_id}
  47. print(video_data)
  48. task = celery_app.send_task("app.worker.make_video_test", kwargs=video_data, )
  49. print(task)
  50. return "ok"
  51. @router.post("/", response_model=schemas.Video)
  52. def upload_plot(
  53. *,
  54. db: Session = Depends(deps.get_db),
  55. title: str=Form(...),
  56. anchor_id: int=Form(...),
  57. lang_id: int=Form(...),
  58. upload_file: UploadFile=File(),
  59. current_user: models.User = Depends(deps.get_current_active_user),
  60. background_tasks: BackgroundTasks,
  61. ) -> Any:
  62. """
  63. Create new video.
  64. """
  65. print(title)
  66. print(upload_file.filename)
  67. filename = crud.video.generate_file_name(db=db, n=20)
  68. try:
  69. with open(str(Path(BACKEND_ZIP_STORAGE).joinpath(filename+".zip")), 'wb') as f:
  70. while contents := upload_file.file.read(1024 * 1024):
  71. f.write(contents)
  72. except Exception as e:
  73. print(e, type(e))
  74. return {"error": str(e)}
  75. finally:
  76. upload_file.file.close()
  77. '''
  78. zip_filename = video.stored_file_name+".zip"
  79. print(str(BACKEND_ZIP_STORAGE/zip_filename))
  80. r = subprocess.run(["sshpass", "-p", "choozmo9",
  81. "scp", "-P", "5722", "-o", "StrictHostKeyChecking=no", f"{str(BACKEND_ZIP_STORAGE/zip_filename)}", f"root@172.104.93.163:{str(LOCAL_ZIP_STORAGE/zip_filename)}"])
  82. print(r.returncode)
  83. celery_app.send_task("app.worker.make_video", args=[video.id, video.stored_file_name, current_user.id, anchor_id, current_user.membership_status, current_user.available_time])
  84. '''
  85. video_create = schemas.VideoCreate(title=title, progress_state="PENDING", stored_filename=filename)
  86. video = crud.video.create_with_owner(db=db, obj_in=video_create, owner_id=current_user.id)
  87. return_msg = {"video_message":"accepted"}
  88. video_data = jsonable_encoder(video)
  89. video_data['membership_status'] = current_user.membership_status
  90. video_data['available_time'] = current_user.available_time
  91. video_data['video_id'] = video_data['id']
  92. background_tasks.add_task(wait_finish, video_data)
  93. return JSONResponse(return_msg, background=background_tasks)
  94. async def wait_finish(video_data:dict):
  95. zip_filename = video_data['stored_filename']+".zip"
  96. process = await asyncio.create_subprocess_exec("sshpass", "-p", "choozmo9",
  97. "scp", "-P", "5722", "-o", "StrictHostKeyChecking=no", f"{str(BACKEND_ZIP_STORAGE/zip_filename)}", f"root@172.104.93.163:{str(LOCAL_ZIP_STORAGE)}")
  98. await process.wait()
  99. task = celery_app.send_task("app.worker.make_video", kwargs=video_data)
  100. while True:
  101. await asyncio.sleep(1)
  102. if task.state != "PENDING":
  103. break
  104. db = SessionLocal()
  105. video = db.query(models.Video).get(video_data['id'])
  106. video.progress_state = "STARTED"
  107. db.commit()
  108. db.close()
  109. msg_data = f"{video_data['stored_filename']}:STARTED"
  110. await publish(msg_data)
  111. while True:
  112. await asyncio.sleep(1)
  113. if task.state != "STARTED":
  114. break
  115. if task.state == "SUCCESS":
  116. db = SessionLocal()
  117. video = db.query(models.Video).get(video_data['id'])
  118. user = db.query(models.User).get(video_data['owner_id'])
  119. video.progress_state = "SUCCESS"
  120. if time := task.result:
  121. user.available_time -= int(time)
  122. video.length = int(time)
  123. db.commit()
  124. db.close()
  125. msg_data = f"{video_data['stored_filename']}:SUCCESS:{int(time)}"
  126. elif task.state == "FAILURE":
  127. db = SessionLocal()
  128. video = db.query(models.Video).get(video_data['id'])
  129. video.progress_state = "FAILURE"
  130. db.commit()
  131. db.close()
  132. msg_data = f"{video_data['stored_filename']}:FAILURE"
  133. await publish(msg_data)
  134. async def publish(data):
  135. for video_client in video_clients.values():
  136. await video_client.send_text(f"{data}")
  137. @router.get("/{id}")
  138. def download_video(
  139. *,
  140. db: Session = Depends(deps.get_db),
  141. id: int,
  142. current_user: models.User = Depends(deps.get_current_active_user),
  143. ) -> Any:
  144. return {"message":"address"}
  145. @router.websocket("")
  146. async def websocket_endpoint(websocket: WebSocket):
  147. await websocket.accept()
  148. key = websocket.headers.get('sec-websocket-key')
  149. video_clients[key] = websocket
  150. try:
  151. while True:
  152. data = await websocket.receive_text()
  153. if not data.startswith("subscribe"):
  154. del video_clients[key]
  155. #for client in sr_clients.values():
  156. # await client.send_text(f"ID: {key} | Message: {data}")
  157. except:
  158. # 接続が切れた場合、当該クライアントを削除する
  159. del video_clients[key]