worker.py 4.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. from raven import Client
  2. import os, subprocess
  3. from app.core.celery_app import celery_app
  4. from app.core.config import settings
  5. import requests
  6. from pathlib import Path
  7. from urllib.parse import urlparse, urljoin
  8. from .aianchor import make_video_from_zip
  9. import gc
  10. #client_sentry = Client(settings.SENTRY_DSN)
  11. import dataset
  12. from app.db.session import SessionLocal
  13. from app.models import video
  14. from app import crud
  15. import gc
  16. download_to_local_url = urljoin(settings.SERVER_HOST, settings.API_V1_STR+"/videos/worker")
  17. upload_to_server_url = urljoin(settings.SERVER_HOST, settings.API_V1_STR+"/videos/worker")
  18. LOCAL_ZIP_STORAGE = Path("/").joinpath(settings.LOCAL_ZIP_STORAGE)
  19. CELERY_ZIP_STORAGE = Path("/").joinpath(settings.CELERY_ZIP_STORAGE)
  20. CELERY_VIDEO_STORAGE = Path("/").joinpath(settings.LOCAL_VIDEO_STORAGE)
  21. LOCAL_VIDEO_STORAGE = Path("/").joinpath(settings.LOCAL_VIDEO_STORAGE)
  22. STORAGE_IP = '192.168.192.252'#os.getenv('STORAGE_IP')
  23. BACKEND_IP = '172.'
  24. if not STORAGE_IP:
  25. raise Exception
  26. @celery_app.task(acks_late=True, bind=True, track_started=True)
  27. def make_video_test(self, title=None, anchor_id=None, lang_id=None)->int:
  28. print(title, anchor_id, lang_id)
  29. return 0
  30. @celery_app.task(acks_late=True, bind=True, track_started=True)
  31. def make_video(self, *, video_id, stored_filename=None, user_id=None, anchor_id=None, membership=None, available_time=None, **others) -> int:
  32. #video_id, zip_filename, user_id = args
  33. # download
  34. '''
  35. r = requests.get(download_to_local_url, stream=True)
  36. with open(str(VIDEO_STORAGE/zip_filename), 'wb') as f:
  37. r.raise_for_status()
  38. for chunk in r.iter_content(chunk_size=1024):
  39. f.write(chunk)
  40. '''
  41. # db = SessionLocal()
  42. # db.execute("SELECT 1")
  43. zip_filename = stored_filename + ".zip"
  44. r = subprocess.run(["sshpass", "-p", "choozmo9",
  45. "scp", "-o", "StrictHostKeyChecking=no", f"root@{STORAGE_IP}:{str(LOCAL_ZIP_STORAGE/zip_filename)}", f"{str(CELERY_ZIP_STORAGE/zip_filename)}"])
  46. print(f'get from local storage: {r.returncode}')
  47. print(f"video_id: {video_id}, file name: {stored_filename}")
  48. # db.execute(f"UPDATE video SET progress_state='processing' where id={video_id}")
  49. # db.commit()
  50. # make video
  51. watermark_path='medias/logo_watermark.jpg'
  52. content_time = 0
  53. try:
  54. if membership=="infinite":
  55. watermark_path=None
  56. content_time = make_video_from_zip(working_dir=CELERY_ZIP_STORAGE,style=Path("app/style/choozmo"),
  57. inputfile=zip_filename,
  58. opening=False,
  59. ending=False,
  60. watermark_path=watermark_path,
  61. available_time=available_time)
  62. except Exception as e:
  63. print(f"error:{e}")
  64. # db.execute(f"UPDATE video SET progress_state='failed' where id={video_id}")
  65. # db.commit()
  66. else:
  67. #
  68. video_filename = stored_filename + ".mp4"
  69. r = subprocess.run(["sshpass", "-p", "choozmo9",
  70. "scp", "-o", "StrictHostKeyChecking=no", f"{str(CELERY_ZIP_STORAGE/'output.mp4')}", f"root@{STORAGE_IP}:{'/var/www/videos/'+video_filename}"])
  71. print(f"return to local storage: {r.returncode}")
  72. print(f"video_id: {video_id}, file name: {stored_filename}")
  73. # db.execute(f"UPDATE video SET progress_state='completed', length={int(content_time)} where id={video_id}")
  74. # db.commit()
  75. gc_num = gc.collect()
  76. print(f"gc_num: {gc_num}")
  77. return int(content_time)
  78. @celery_app.task(acks_late=True, bind=True, track_started=True)
  79. def super_resolution(self, dirname, filenames):
  80. r = subprocess.run(["sshpass", "-p", "choozmo9",
  81. "scp", "-o", "StrictHostKeyChecking=no", "-r", f"root@{STORAGE_IP}:{str(LOCAL_ZIP_STORAGE/dirname)}", f"{str(CELERY_ZIP_STORAGE)}"])
  82. r = subprocess.run(["python", "/root/github/GFPGAN/inference_gfpgan.py", "-i", f"{str(CELERY_ZIP_STORAGE/dirname)}", "-o", f"{str(CELERY_ZIP_STORAGE/dirname)}", "-v", "1.4", "-s", "2"])
  83. r = subprocess.run(["sshpass", "-p", "choozmo9",
  84. "scp", "-o", "StrictHostKeyChecking=no", "-r", f"{str(CELERY_ZIP_STORAGE/dirname)}", f"root@{STORAGE_IP}:{str(LOCAL_ZIP_STORAGE)}"])