123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- from fastapi import FastAPI,Cookie, Depends, FastAPI, Query, WebSocket, status, WebSocketDisconnect
- from os import listdir
- from os.path import isfile, isdir, join
- import threading
- import zhtts
- import os
- import urllib
- from typing import List
- import requests
- from pydantic import BaseModel
- from bs4 import BeautifulSoup
- from PIL import Image,ImageDraw,ImageFont
- import pyttsx3
- import rpyc
- import random
- import time
- import math
- import hashlib
- import re
- import asyncio
- import urllib.request
- from fastapi.responses import FileResponse
- from websocket import create_connection
- from fastapi.middleware.cors import CORSMiddleware
- import dataset
- from datetime import datetime
- from util.swap_face import swap_face
- from fastapi.staticfiles import StaticFiles
- import shutil
- #test
# Application object; every route below is registered on it.
app = FastAPI()

# Candidate CORS allow-list. It is currently NOT passed to the middleware
# (see the commented-out argument below); the wildcard is used instead.
origins = [
    # The comma after this entry was missing, which made Python concatenate
    # it with the next literal into a single bogus origin.
    "https://hhh.com.tw",
    "http://172.105.205.52",
    "http://172.105.205.52:8001",
    "http://172.104.93.163",
]

# NOTE(review): a wildcard origin combined with allow_credentials=True is
# very permissive — confirm this is intended before exposing publicly.
app.add_middleware(
    CORSMiddleware,
    # allow_origins=origins,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Static assets are served straight from the local "static" directory.
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/static/img", StaticFiles(directory="static/img"), name="static/img")

# gen_video() copies "<name_hash>.mp4" from tmp_video_dir to video_dest.
tmp_video_dir = '../OpenshotService/tmp_video/'
video_dest = '/var/www/html/'
class swap_req(BaseModel):
    # Request body accepted by POST /swapFace.
    # imgurl is handed unchanged to swap_face(...).
    imgurl: str
class request(BaseModel):
    # Request body accepted by POST /make_anchor_video_v2.
    name: str
    # Text segments; the endpoint rejects any segment containing ASCII letters.
    text_content: List[str]
    # Media URLs; each one is probed by the endpoint before the job starts.
    image_urls: List[str]
    # Sent as a string; the endpoint converts it with int() for gen_video.
    avatar: str
    # Forwarded untouched to gen_video.
    client_id: str
@app.get("/index2")
async def index2():
    # Serve the bundled index2 page from the static directory.
    page_path = 'static/index2.html'
    return FileResponse(page_path)
@app.get("/gen_avatar")
async def gen_avatar():
    # Serve the bundled avatar-generation page from the static directory.
    page_path = 'static/gen_avatar.html'
    return FileResponse(page_path)
@app.post("/swapFace")
async def swapFace(req: swap_req):
    # Build a swap_face job for the submitted image URL, run it, and return
    # whatever run() produces as the response body.
    # (A notify_group(result) call was left commented out in the original.)
    return swap_face(req.imgurl).run()
@app.post("/make_anchor_video_v2")
async def make_anchor_video_v2(req: request):
    """Validate a video request, record it, and start rendering in a thread.

    Every text segment must be free of ASCII letters and every media URL
    must be reachable (and, unless it is reported as ``video/mp4``, must
    decode as an image). On success the request is stored via
    ``save_history`` and ``gen_video`` is started on a background thread.

    Returns:
        A dict with a single ``msg`` key: a validation error, or a notice
        containing the address where the finished video will appear.

    Raises:
        ValueError: if ``req.avatar`` is not an integer string.
    """
    # Reject the request as soon as any segment contains an ASCII letter.
    if any(re.search('[a-zA-Z]', txt) is not None for txt in req.text_content):
        return {'msg': '輸入字串不能包含英文字!'}

    # Current timestamp with the dot removed; used as the output file name.
    name_hash = str(time.time()).replace('.', '')

    for imgu in req.image_urls:
        try:
            if get_url_type(imgu) == 'video/mp4':
                # Only reachability is checked; streaming avoids pulling the
                # whole video into memory just to throw it away.
                with requests.get(imgu, stream=True):
                    pass
            else:
                # Context managers release the HTTP connection and the image
                # file handle, which were previously leaked.
                with requests.get(imgu, stream=True) as resp:
                    with Image.open(resp.raw) as im:
                        im.convert("RGB")
        except Exception:
            # Narrowed from a bare ``except`` so SystemExit and
            # KeyboardInterrupt are no longer swallowed.
            return {'msg': "無法辨別圖片網址" + imgu}

    # Convert before saving so a malformed avatar does not leave behind a
    # history row for a job that never starts.
    avatar_id = int(req.avatar)

    save_history(req, name_hash)
    worker = threading.Thread(
        target=gen_video,
        args=(name_hash, req.name, req.text_content, req.image_urls, avatar_id, req.client_id),
    )
    worker.start()
    return {"msg": "製作影片需要時間,請您耐心等候  稍後可以在www.choozmo.com:8168/" + name_hash + ".mp4 中觀看"}
@app.get("/history_input")
async def history_input():
    """Return the 50 most recent ``history_input`` rows, newest first.

    Returns:
        A list of dicts with keys ``id``, ``name``, ``text_content``,
        ``link`` and ``image_urls``. The two list-valued fields are rebuilt
        by splitting the stored strings on commas.
    """
    # NOTE(review): credentials are embedded in source — consider moving
    # them to configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    statement = 'SELECT * FROM history_input ORDER BY timestamp DESC LIMIT 50'
    try:
        # The list is fully built here, before the connection is closed.
        return [
            {
                'id': row['id'],
                'name': row['name'],
                'text_content': row['text_content'].split(','),
                'link': row['link'],
                'image_urls': row['image_urls'].split(','),
            }
            for row in db.query(statement)
        ]
    finally:
        # Previously a new connection was opened per request and never released.
        db.close()
def save_history(req, name_hash):
    """Insert one row describing *req* into the ``history_input`` table.

    Args:
        req: object exposing ``name``, ``text_content`` and ``image_urls``;
            the two sequences are stored as comma-joined strings.
        name_hash: file-name stem used to build the stored video link.
    """
    record = {
        'name': req.name,
        # str.join replaces the manual "append comma, strip last char" loops
        # and yields the same string, including '' for an empty list.
        'text_content': ",".join(req.text_content),
        'image_urls': ",".join(req.image_urls),
        'link': 'www.choozmo.com:8168/' + name_hash + '.mp4',
        'timestamp': datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d %H:%M:%S"),
    }

    # NOTE(review): credentials are embedded in source — consider moving
    # them to configuration.
    db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
    try:
        db['history_input'].insert(record)
    finally:
        # Release the connection; it was previously leaked on every call.
        db.close()
-
def get_url_type(url):
    """Return the ``Content-Type`` header from a HEAD request to *url*.

    The request is sent with a browser-like ``User-Agent`` header.

    Returns:
        The header value, or ``None`` when the server sends no Content-Type.

    Raises:
        Whatever ``urllib.request.urlopen`` raises for an unreachable or
        rejected URL (e.g. ``urllib.error.URLError``).
    """
    req = urllib.request.Request(url, method='HEAD', headers={'User-Agent': 'Mozilla/5.0'})
    # The context manager closes the response, which was previously left open.
    with urllib.request.urlopen(req) as r:
        return r.getheader('Content-Type')
-
def gen_video(name_hash, name, text_content, image_urls, avatar, client_id):
    """Ask the local rpyc service to render a video, then publish the file.

    Calls ``call_video`` on the rpyc service listening on localhost:8878,
    then copies ``<name_hash>.mp4`` from ``tmp_video_dir`` to ``video_dest``
    and deletes the temporary copy.

    All arguments are forwarded unchanged to the remote ``call_video``.
    """
    c = rpyc.connect("localhost", 8878)
    try:
        # The remote return value was never used, so it is not kept.
        c.root.call_video(name_hash, name, text_content, image_urls, avatar, client_id)
    finally:
        # Close the connection even if the call fails; it used to be leaked.
        c.close()

    rendered = tmp_video_dir + name_hash + '.mp4'
    shutil.copy(rendered, video_dest + name_hash + '.mp4')
    os.remove(rendered)
|