from fastapi import FastAPI,Cookie, Depends, FastAPI, Query, WebSocket, status, WebSocketDisconnect from os import listdir from os.path import isfile, isdir, join import threading import zhtts import os import urllib from typing import List import requests from pydantic import BaseModel from bs4 import BeautifulSoup from PIL import Image,ImageDraw,ImageFont import pyttsx3 import rpyc import random import time import math import hashlib import re import asyncio import urllib.request from fastapi.responses import FileResponse from websocket import create_connection from fastapi.middleware.cors import CORSMiddleware import dataset from datetime import datetime from util.swap_face import swap_face from fastapi.staticfiles import StaticFiles import shutil #test app = FastAPI() origins = [ "https://hhh.com.tw" "http://172.105.205.52", "http://172.105.205.52:8001", "http://172.104.93.163", ] app.add_middleware( CORSMiddleware, # allow_origins=origins, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.mount("/static", StaticFiles(directory="static"), name="static") app.mount("/static/img", StaticFiles(directory="static/img"), name="static/img") tmp_video_dir = '../OpenshotService/tmp_video/' video_dest = '/var/www/html/' class swap_req(BaseModel): imgurl: str class request(BaseModel): name: str text_content: List[str] image_urls: List[str] avatar: str client_id :str @app.get("/index2") async def index2(): return FileResponse('static/index2.html') @app.get("/gen_avatar") async def gen_avatar(): return FileResponse('static/gen_avatar.html') @app.post("/swapFace") async def swapFace(req:swap_req): sf = swap_face(req.imgurl) result = sf.run() #notify_group(result)hi return result @app.post("/make_anchor_video_v2") async def make_anchor_video_v2(req:request): for txt in req.text_content: if re.search('[a-zA-Z]', txt) !=None: return {'msg':'輸入字串不能包含英文字!'} name_hash = str(time.time()).replace('.','') for imgu in req.image_urls: try: if get_url_type(imgu) =='video/mp4': r=requests.get(imgu) else: im = Image.open(requests.get(imgu, stream=True).raw) im= im.convert("RGB") except: return {'msg':"無法辨別圖片網址"+imgu} save_history(req,name_hash) x = threading.Thread(target=gen_video, args=(name_hash,req.name, req.text_content, req.image_urls,int(req.avatar),req.client_id)) x.start() return {"msg":"製作影片需要時間,請您耐心等候 稍後可以在www.choozmo.com:8168/"+name_hash+".mp4 中觀看"} @app.get("/history_input") async def history_input(): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') statement = 'SELECT * FROM history_input ORDER BY timestamp DESC LIMIT 50' logs = [] for row in db.query(statement): logs.append({'id':row['id'],'name':row['name'],'text_content':row['text_content'].split(','),'link':row['link'],'image_urls':row['image_urls'].split(',')}) return logs def save_history(req,name_hash): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') log_table = db['history_input'] txt_content_seperate_by_dot = '' for txt in req.text_content: txt_content_seperate_by_dot += txt+"," txt_content_seperate_by_dot = txt_content_seperate_by_dot[:-1] img_urls_seperate_by_dot = '' for iurl in req.image_urls: img_urls_seperate_by_dot += iurl+"," img_urls_seperate_by_dot = img_urls_seperate_by_dot[:-1] time_stamp = datetime.fromtimestamp(time.time()) time_stamp = time_stamp.strftime("%Y-%m-%d %H:%M:%S") pk = log_table.insert({'name':req.name,'text_content':txt_content_seperate_by_dot,'image_urls':img_urls_seperate_by_dot,'link':'www.choozmo.com:8168/'+name_hash+'.mp4','timestamp':time_stamp}) def get_url_type(url): req = urllib.request.Request(url, method='HEAD', headers={'User-Agent': 'Mozilla/5.0'}) r = urllib.request.urlopen(req) contentType = r.getheader('Content-Type') return contentType def gen_video(name_hash,name,text_content, image_urls,avatar,client_id): c = rpyc.connect("localhost", 8878) remote_svc = c.root my_answer = remote_svc.call_video(name_hash,name,text_content, image_urls,avatar,client_id) # method call shutil.copy(tmp_video_dir+name_hash+'.mp4',video_dest+name_hash+'.mp4') os.remove(tmp_video_dir+name_hash+'.mp4')