123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- from typing import Any, List, Optional, Literal
- from datetime import datetime
- from fastapi import APIRouter, Body, Depends, HTTPException, Form, status, Response, BackgroundTasks
- from fastapi.encoders import jsonable_encoder
- from pydantic.networks import EmailStr, HttpUrl
- from sqlalchemy.orm import Session
- from fastapi.responses import FileResponse, PlainTextResponse
- import app.crud as crud
- import app.models as models
- import app.schemas as schemas
- from app.api import deps
- from app.core.config import settings
- from app.core.ecpay_payment_sdk import ECPayPaymentSdk
- from app.utils import send_new_account_email
- from pydantic import BaseModel
- import requests
- from random import choice
- import string
- import json
- import os
- import PIL.Image
- from gradio_client import Client
- from openai import OpenAI
- import webuiapi
- import datetime
- from pathlib import Path
- import openpyxl as px
- import tempfile
- import shutil
- import PIL
- import re
- from app.db.session import SessionLocal
- import requests
- import asyncio
- from app.core.celery_app import celery_app
- from app.core.config import settings
- from app.aianchor.utils2 import check_zip, VideoMakerError
- from pathlib import Path
- import emails
# Path formed by joining "/app" with settings.BACKEND_ZIP_STORAGE.
BACKEND_ZIP_STORAGE = Path("/app").joinpath(settings.BACKEND_ZIP_STORAGE)
# Path formed by joining "/" with settings.LOCAL_ZIP_STORAGE; wait_finish uses
# it as the destination directory of its scp upload.
LOCAL_ZIP_STORAGE = Path("/").joinpath(settings.LOCAL_ZIP_STORAGE)
# LINE Notify endpoint and the bearer token sent with every status message.
# NOTE(review): LINE_TOKEN is a credential committed to source — rotate it and
# load it from configuration instead.
LINE_URL = 'https://notify-api.line.me/api/notify'
LINE_TOKEN = 'o8dqdVL2k8aiWO4jy3pawZamBu53bbjoSh2u0GJ7F0j'
# Router on which the endpoints of this module are registered.
router = APIRouter()
def gen_prompt(content:str):
    """Convert free-form text into a short English image-generation prompt.

    Sends *content* as the user message to the ``gpt-4o-mini`` chat model
    together with fixed instructions, and returns the text of the first
    completion choice.

    Args:
        content: The text to turn into an image prompt.

    Returns:
        The ``message.content`` of the first choice returned by the API.
    """
    # OPENAI_API_KEY (the variable the OpenAI SDK itself looks for) wins when
    # set; the literal fallback keeps existing deployments working.
    # NOTE(review): the fallback is a credential committed to source — rotate
    # it and remove the fallback once the environment variable is deployed.
    client = OpenAI(api_key=os.environ.get(
        "OPENAI_API_KEY",
        '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',
    ))
    # Implicit concatenation instead of backslash continuations inside the
    # literal, so source indentation no longer leaks into the prompt text.
    instructions = (
        "You are a helpful image genaration prompt engineer. "
        "You will convert the following inputs into English prompts for image generation AI and respond accordingly. "
        "Do not start with 'Create an image. "
        "Keep it within 50 words"
    )
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            # Behavioural instructions belong in a "system" message; sent as
            # "assistant" they read as the model's own earlier reply.
            {"role": "system", "content": instructions},
            {"role": "user", "content": content},
        ],
    )
    return completion.choices[0].message.content
def gen_flux_image(prompt):
    """Generate a 1280x720 image for *prompt* with the FLUX.1-schnell Gradio app.

    Calls the ``/generate`` API of the Gradio server, converts the first
    element of the result to RGB and stores it as a JPEG in a temporary file
    that is NOT deleted automatically.

    Returns:
        The path of the temporary ``.jpg`` file, as a string.
    """
    flux_client = Client("http://192.168.192.83:7860/")
    request = {
        "model_id": "models/FLUX.1-schnell",
        "prompt": prompt,
        "width": 1280,
        "height": 720,
        "seed": -1,
        "steps": 4,
        "guidance_scale": 3.5,
        "add_sampling_metadata": True,
        "api_name": "/generate",
    }
    outputs = flux_client.predict(**request)
    # delete=False: the caller takes ownership of the file and moves it.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as jpeg_file:
        jpeg_path = jpeg_file.name
        rgb_image = PIL.Image.open(outputs[0]).convert("RGB")
        rgb_image.save(jpeg_path, "jpeg")
        return jpeg_path
def gen_sd_image(prompt):
    """Generate a 1280x720 image for *prompt* with the ``sd3_medium`` WebUI model.

    Selects the model on the WebUI API host, runs ``txt2img`` and writes the
    resulting image as a JPEG into a temporary file that is NOT deleted
    automatically.

    Returns:
        The path of the temporary ``.jpg`` file, as a string.
    """
    sd_api = webuiapi.WebUIApi(host='192.168.192.38', port=7860)
    sd_api.util_set_model('sd3_medium')
    request = dict(
        prompt=prompt,
        negative_prompt="",
        seed=-1,
        cfg_scale=4,
        sampler_name='Euler',
        scheduler='Automatic',
        steps=40,
        width=1280,
        height=720,
    )
    generation = sd_api.txt2img(**request)
    # delete=False: the caller takes ownership of the file and moves it.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as jpeg_file:
        generation.image.save(jpeg_file, "jpeg")
        return jpeg_file.name
# Character class of punctuation marks; generate_zip and wait_finish replace
# every match with a single backslash before writing the text into column B
# of the script sheet.
punctuation = r"[.,!?;:。 、!?,;:]"
@router.post('/gen-zip')
def generate_zip(
    *,
    background_tasks: BackgroundTasks,
    current_user: models.User = Depends(deps.get_current_active_user),
    model:Literal['sd3', 'flux']="sd3",
    texts:List[str]):
    """Build a zip of generated images plus a ``script.xlsx`` and return it.

    For every non-empty entry of *texts* an image prompt is produced with
    ``gen_prompt`` and rendered with the selected *model*. The images are
    numbered ``01``, ``02``, ... and listed in the workbook next to their
    text. The archive is written to ``/tmp`` and removed by a background task
    after the response has been sent.

    Args:
        background_tasks: FastAPI background task queue.
        current_user: Resolved through the active-user dependency; not used
            in the body.
        model: Image backend, ``'sd3'`` or ``'flux'``.
        texts: Subtitle texts; falsy entries are skipped.

    Returns:
        A ``FileResponse`` serving the zip archive.
    """
    model = model or 'sd3'

    workbook = px.Workbook()
    sheet = workbook.active
    sheet.title = 'script'
    for cell, heading in (('A1', '大標'), ('B1', '字幕'), ('C1', '素材')):
        sheet[cell] = heading

    output_dir = '/tmp'
    with tempfile.TemporaryDirectory() as td:
        stamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        bundle_dir = Path(f'{td}/{stamp}')
        bundle_dir.mkdir(exist_ok=False)

        texts = [text for text in texts if text]
        total = len(texts)
        for number, text in enumerate(texts, start=1):
            print(f'{number}/{total}')
            prompt = gen_prompt(text)
            if model=='flux':
                img_path = Path(gen_flux_image(prompt))
            elif model=='sd3':
                img_path = Path(gen_sd_image(prompt))
            print("before", str(img_path))
            # Move the temp image into the bundle under its two-digit index.
            img_path = img_path.rename(bundle_dir/(f'{number:02}'+img_path.suffix))
            print("after", str(img_path))
            # Data rows start at row 2, below the header row.
            row = str(number + 1)
            sheet['B' + row] = re.sub(punctuation, r"\\", text)
            sheet['C' + row] = img_path.name

        workbook.save(Path(bundle_dir/'script.xlsx'))
        shutil.make_archive(f'{output_dir}/{bundle_dir.name}', format='zip', root_dir=td)

    zip_path = f'{output_dir}/{bundle_dir.name}.zip'

    def discard_archive():
        # Runs after the response is sent, so the file can be served first.
        if os.path.exists(zip_path):
            os.remove(zip_path)

    background_tasks.add_task(discard_archive)
    return FileResponse(zip_path, media_type="application/zip")
@router.post('/gen-video')
def generate_video(
    *,
    background_tasks: BackgroundTasks,
    #current_user: models.User = Depends(deps.get_current_active_user),
    model:Literal['sd3', 'flux']="sd3",
    email:EmailStr,
    texts:List[str],
    lang:Literal["en", 'zh']):
    """Queue video generation for *texts* and answer immediately.

    The actual work is done by ``wait_finish``, scheduled as a background
    task; the result is reported to *email*.

    Args:
        background_tasks: FastAPI background task queue.
        model: Image backend, ``'sd3'`` or ``'flux'``.
        email: Address that receives the success/failure mail.
        texts: Subtitle texts; at least one must be non-empty.
        lang: Video language, ``"en"`` or ``'zh'``.

    Returns:
        A plain-text ``"OK"`` response once the job has been queued.

    Raises:
        HTTPException: 400 when every entry of *texts* is empty.
    """
    # NOTE: the user dependency is commented out in the signature, so this
    # endpoint does not require a logged-in user.
    if not any(texts):
        # The exception must be raised (not returned) and needs an explicit
        # status code for FastAPI to turn it into an error response.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No texts.",
        )
    background_tasks.add_task(wait_finish, model, email, texts, lang)
    return PlainTextResponse("OK", background=background_tasks)
def _notify_line(*fields):
    """Push a status message to LINE Notify; a failure never aborts the job.

    The message is ``cloud-choozmo-com`` and ``Video`` followed by *fields*,
    joined by a newline plus one space.
    """
    message = '\n '.join(('cloud-choozmo-com', 'Video') + fields)
    try:
        requests.post(
            LINE_URL,
            headers={'Authorization': 'Bearer ' + LINE_TOKEN},
            data={'message': message},
            timeout=10,  # never let a stalled notification hang the job
        )
    except requests.RequestException as exc:
        print(f'LINE notify failed: {exc}')


def _build_script_zip(model, texts):
    """Render one image per non-empty text and zip them with ``script.xlsx``.

    The archive is written to ``/tmp/<name>.zip``; the timestamp-based
    ``<name>`` is returned.
    """
    if not model:
        model = 'sd3'
    wb = px.Workbook()
    ws = wb.active
    ws.title = 'script'
    ws['A1'] = '大標'
    ws['B1'] = '字幕'
    ws['C1'] = '素材'
    with tempfile.TemporaryDirectory() as td:
        bundle_dir = Path(td) / datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        bundle_dir.mkdir(exist_ok=False)
        texts = [text for text in texts if text]
        for i, text in enumerate(texts):
            print(f'{i+1}/{len(texts)}')
            prompt = gen_prompt(text)
            if model=='flux':
                img_path = Path(gen_flux_image(prompt))
            elif model=='sd3':
                img_path = Path(gen_sd_image(prompt))
            print("before", str(img_path))
            img_path = img_path.rename(bundle_dir/(f'{i+1:02}'+img_path.suffix))
            print("after", str(img_path))
            # Data rows start at row 2, below the header row.
            ws['B'+ str(i+2)] = re.sub(punctuation, r"\\", text)
            ws['C'+ str(i+2)] = img_path.name
        wb.save(bundle_dir/'script.xlsx')
        shutil.make_archive(f'/tmp/{bundle_dir.name}', format='zip', root_dir=td)
    return bundle_dir.name


def _record_video_state(video_id, state, *, owner_id=None, seconds=None):
    """Persist *state* on the video row, always closing the session.

    When *seconds* is truthy it is stored as the video length and deducted
    from the owner's ``available_time``.
    """
    db = SessionLocal()
    try:
        video = db.query(models.Video).get(video_id)
        video.progress_state = state
        if seconds:
            user = db.query(models.User).get(owner_id)
            used = int(seconds)
            # As before: nothing is deducted when the video is longer than
            # the remaining balance.
            user.available_time -= used if used <= user.available_time else 0
            video.length = used
        db.commit()
    finally:
        db.close()


async def wait_finish(model, email, texts, lang):
    """Produce a video for *texts* end to end and mail the outcome to *email*.

    Steps: build the image/script zip, create the video row owned by user
    id 0, upload the zip with scp, queue the ``app.worker.make_video`` Celery
    task, poll its state once per second, mirror the state into the database
    and LINE Notify, and finally send the success or failure e-mail.

    Args:
        model: Image backend, ``'sd3'`` or ``'flux'``; falsy means ``'sd3'``.
        email: Recipient of the result mail.
        texts: Subtitle texts; falsy entries are skipped.
        lang: Language forwarded to the worker task.
    """
    # NOTE(review): image generation, the LINE posts and the e-mail are
    # blocking calls made from a coroutine, so they stall the event loop
    # while they run — consider moving them to a worker thread.
    stored_name = _build_script_zip(model, texts)

    # Open the session only now, so no connection is held during image
    # generation, and close it even if row creation fails.
    db = SessionLocal()
    try:
        current_user = crud.user.get(db, id=0)
        video_create = schemas.VideoCreate(title="guest", progress_state="PENDING", stored_filename=stored_name)
        video = crud.video.create_with_owner(db=db, obj_in=video_create, owner_id=current_user.id)
        video_data = jsonable_encoder(video)
        video_data['membership_status'] = current_user.membership_status
        video_data['available_time'] = current_user.available_time
        video_data['video_id'] = video_data['id']
        video_data['character'] = "hannah-2"
        video_data['anchor'] = "hannah-2"
        video_data['style'] = "style14"
        video_data['lang'] = lang
        video_data['email'] = email
        guest_id = current_user.id
    finally:
        db.close()

    zip_filename = video_data['stored_filename']+".zip"
    local_zip = f"/tmp/{zip_filename}"
    # NOTE(review): the ssh password is hard-coded and host-key checking is
    # disabled — move to key-based authentication.
    process = await asyncio.create_subprocess_exec("sshpass", "-p", "choozmo9",
        "scp", "-P", "5722", "-o", "StrictHostKeyChecking=no", local_zip, f"root@172.104.93.163:{str(LOCAL_ZIP_STORAGE)}")
    returncode = await process.wait()
    if returncode != 0:
        print(f"scp of {zip_filename} exited with code {returncode}")
    if os.path.exists(local_zip):
        os.remove(local_zip)

    _notify_line(
        f'user: {guest_id}',
        f'video: {video_data["id"]}',
        'state: queuing',
    )
    task = celery_app.send_task("app.worker.make_video", kwargs=video_data)

    # Wait until the worker has picked the task up.
    while True:
        await asyncio.sleep(1)
        if task.state != "PENDING":
            break

    _record_video_state(video_data['id'], "STARTED")
    _notify_line(
        f'user: {video_data["owner_id"]}',
        f'membership: {video_data["membership_status"]}',
        f'video: {video_data["id"]}',
        'state: start',
    )

    # Wait until the task leaves the STARTED state.
    while True:
        await asyncio.sleep(1)
        if task.state != "STARTED":
            break

    if task.state == "SUCCESS":
        # The task result is read as the video length; it may be empty.
        _record_video_state(
            video_data['id'], "SUCCESS",
            owner_id=video_data['owner_id'], seconds=task.result,
        )
        _notify_line(
            f'user: {video_data["owner_id"]}',
            f'membership: {video_data["membership_status"]}',
            f'video: {video_data["id"]}',
            'state: success',
        )
        send_simple_email(email_to=video_data['email'], video_path=f"http://172.104.93.163:30080/{video_data['stored_filename']}.mp4")
    elif task.state == "FAILURE":
        _record_video_state(video_data['id'], "FAILURE")
        _notify_line(
            f'user: {video_data["owner_id"]}',
            f'membership: {video_data["membership_status"]}',
            f'video: {video_data["id"]}',
            'state: failure',
        )
        send_simple_email(email_to=video_data['email'], video_path=None)
-
def send_simple_email(email_to: str, video_path: Optional[str]):
    """Mail the outcome of a video job to *email_to*.

    Args:
        email_to: Recipient address.
        video_path: URL of the finished video, or ``None`` to send the
            failure notice instead.

    Raises:
        AssertionError: If the SMTP server does not answer with status 250.
    """
    if video_path is None:
        body = "<p>Sorry!<br>The video failed to produce for some reason.</p>"
    else:
        body = f"<p>Hi!<br>Here is Choozmo Video<br><a href='{video_path}'>Video</a></p>"

    message = emails.html(html=body,
                          subject="Choozmo Video Service",
                          mail_from=('ChoozMo Video Service', 'verify@choozmo.com'))
    # NOTE(review): the SMTP password is a credential committed to source —
    # rotate it and load it from configuration.
    r = message.send(to=email_to, mail_from='verify@choozmo.com', smtp={'host': 'smtp.gmail.com',
                                                                       'port': '587',
                                                                       'tls':True,
                                                                       'user':'verify@choozmo.com',
                                                                       'password':'hlmaxzjnvpeaulhw',
                                                                       'timeout': 30})
    # Explicit raise instead of `assert`, so the check survives `python -O`;
    # the exception type is kept for existing callers.
    if r.status_code != 250:
        raise AssertionError(f"SMTP send failed with status {r.status_code}")
-
|