+from fastapi import FastAPI,Cookie, Depends, FastAPI, Query, WebSocket, status, WebSocketDisconnect
+import openshot
+from os import listdir
+from os.path import isfile, isdir, join
+import threading
+import zhtts
+import os
+import urllib
+from typing import List
+import requests
+from pydantic import BaseModel
+from bs4 import BeautifulSoup
+from PIL import Image,ImageDraw,ImageFont
+import pyttsx3
+import rpyc
+import random
+import time
+import math
+import hashlib
+import re
+import asyncio
+import urllib.request
+from fastapi.responses import FileResponse
+from websocket import create_connection
+from fastapi.middleware.cors import CORSMiddleware
+import dataset
+from datetime import datetime
+from util.swap_face import swap_face
+from fastapi.staticfiles import StaticFiles
+app = FastAPI()
+origins = [
+ "https://hhh.com.tw"
+ "",
+ "",
+ "",
+ CORSMiddleware,
+ # allow_origins=origins,
+ allow_origins=["*"],
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+app.mount("/static", StaticFiles(directory="static"), name="static")
+app.mount("/static/img", StaticFiles(directory="static/img"), name="static/img")
+dir_sound = 'mp3_track/'
+dir_photo = 'photo/'
+dir_text = 'text_file/'
+dir_video = 'video_material/'
+dir_title = 'title/'
+dir_subtitle = 'subtitle/'
+dir_anchor = 'anchor_raw/'
+class swap_req(BaseModel):
+ imgurl: str
+class request(BaseModel):
+ name: str
+ text_content: List[str]
+ image_urls: List[str]
+ avatar: str
+ client_id :str
+class ConnectionManager:
+ def __init__(self):
+ self.active_connections: List[WebSocket] = []
+ async def connect(self, websocket: WebSocket):
+ await websocket.accept()
+ self.active_connections.append(websocket)
+ def disconnect(self, websocket: WebSocket):
+ self.active_connections.remove(websocket)
+ async def send_personal_message(self, message: str, websocket: WebSocket):
+ await websocket.send_text(message)
+ async def broadcast(self, message: str):
+ for connection in self.active_connections:
+ await connection.send_text(message)
+async def root():
+ return {"message": "Hello, this is index"}
+async def index2():
+ return FileResponse('static/index2.html')
+async def index2():
+ return FileResponse('gen_avatar.html')
+async def swapFace(req:swap_req):
+ sf = swap_face(req.imgurl)
+ result = sf.run()
+ #notify_group(result)hi
+ return result
+async def make_anchor_video_v2(req:request):
+ for txt in req.text_content:
+ if re.search('[a-zA-Z]', txt) !=None:
+ return {'msg':'輸入字串不能包含英文字!'}
+ name_hash = str(time.time()).replace('.','')
+ for imgu in req.image_urls:
+ try:
+ if get_url_type(imgu) =='video/mp4':
+ r=requests.get(imgu)
+ f=open(dir_photo+name_hash+"/"+str(img_num)+".mp4",'wb')
+ else:
+ im = Image.open(requests.get(imgu, stream=True).raw)
+ im= im.convert("RGB")
+ except:
+ return {'msg':"無法辨別圖片網址"+imgu}
+ save_history(req,name_hash)
+ x = threading.Thread(target=anchor_video_v2, args=(name_hash,req.name, req.text_content, req.image_urls,int(req.avatar),req.client_id))
+ x.start()
+ return {"msg":"製作影片需要時間,請您耐心等候 稍後可以在www.choozmo.com:8168/"+name_hash+".mp4 中觀看"}
+manager = ConnectionManager()
+async def websocket_endpoint(websocket: WebSocket, client_id: int):
+ await manager.connect(websocket)
+ try:
+ while True:
+ data = await websocket.receive_text()
+ await manager.send_personal_message(data, websocket)
+ await manager.broadcast(data)
+ except WebSocketDisconnect:
+ manager.disconnect(websocket)
+async def history_input():
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
+ statement = 'SELECT * FROM history_input ORDER BY timestamp DESC LIMIT 50'
+ logs = []
+ for row in db.query(statement):
+ logs.append({'id':row['id'],'name':row['name'],'text_content':row['text_content'].split(','),'link':row['link'],'image_urls':row['image_urls'].split(',')})
+ return logs
+def notify_group(msg):
+ glist=['7vilzohcyQMPLfAMRloUawiTV4vtusZhxv8Czo7AJX8','WekCRfnAirSiSxALiD6gcm0B56EejsoK89zFbIaiZQD','1dbtJHbWVbrooXmQqc4r8OyRWDryjD4TMJ6DiDsdgsX']
+ for gid in glist:
+ headers = {
+ "Authorization": "Bearer " + gid,
+ "Content-Type": "application/x-www-form-urlencoded"
+ }
+ params = {"message": msg}
+ r = requests.post("https://notify-api.line.me/api/notify",headers=headers, params=params)
+def save_history(req,name_hash):
+ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4')
+ log_table = db['history_input']
+ txt_content_seperate_by_dot = ''
+ for txt in req.text_content:
+ txt_content_seperate_by_dot += txt+","
+ txt_content_seperate_by_dot = txt_content_seperate_by_dot[:-1]
+ img_urls_seperate_by_dot = ''
+ for iurl in req.image_urls:
+ img_urls_seperate_by_dot += iurl+","
+ img_urls_seperate_by_dot = img_urls_seperate_by_dot[:-1]
+ time_stamp = datetime.fromtimestamp(time.time())
+ time_stamp = time_stamp.strftime("%Y-%m-%d %H:%M:%S")
+ pk = log_table.insert({'name':req.name,'text_content':txt_content_seperate_by_dot,'image_urls':img_urls_seperate_by_dot,'link':'www.choozmo.com:8168/'+name_hash+'.mp4','timestamp':time_stamp})
+def cKey(r,g,b,fuzz):
+ col=openshot.Color()
+ col.red=openshot.Keyframe(r)
+ col.green=openshot.Keyframe(g)
+ col.blue=openshot.Keyframe(b)
+ return openshot.ChromaKey(col, openshot.Keyframe(fuzz))
+def video_photo_clip(vid=None,layer=None, position=None, end=None
+ ,scale_x=1,scale_y=1,location_x=0,location_y=0,ck=None,audio=True):
+ clip = openshot.Clip(vid)
+ clip.Layer(layer)
+ clip.Position(position)
+ clip.End(end)
+ clip.scale_x=openshot.Keyframe(scale_x)
+ clip.scale_y=openshot.Keyframe(scale_y)
+ clip.location_x=openshot.Keyframe(location_x)
+ clip.location_y=openshot.Keyframe(location_y)
+ if ck!=None:
+ clip.AddEffect(ck)
+ if audio==True:
+ clip.has_audio=openshot.Keyframe(1)
+ else:
+ clip.has_audio=openshot.Keyframe(0)
+ return clip
+def myunichchar(unicode_char):
+ mb_string = unicode_char.encode('big5')
+ try:
+ unicode_char = unichr(ord(mb_string[0]) << 8 | ord(mb_string[1]))
+ except NameError:
+ unicode_char = chr(mb_string[0] << 8 | mb_string[1])
+ return unicode_char
+def file_prepare(name, name_hash,text_content,image_urls):
+ #save image
+ try:
+ os.mkdir(dir_photo+name_hash)
+ except FileExistsError:
+ print("Directory " , dir_photo+name_hash , " already exists")
+ img_num = 1
+ for imgu in image_urls:
+ im = Image.open(requests.get(imgu, stream=True).raw)
+ im.save(dir_photo+name_hash+"/"+str(img_num)+".jpg")
+ img_num+=1
+ #save text
+ text_file = open(dir_text+name_hash+".txt", "w")
+ text_file.write(text_content)
+ text_file.close()
+ print("text file made")
+ #make mp3
+ tts = zhtts.TTS()
+ tts.text2wav(text_content,dir_sound+name_hash+".mp3")
+ print("mp3 file made")
+ #make title as image
+ txt2image(name, dir_title+name_hash+".png")
+def get_url_type(url):
+ req = urllib.request.Request(url, method='HEAD', headers={'User-Agent': 'Mozilla/5.0'})
+ r = urllib.request.urlopen(req)
+ contentType = r.getheader('Content-Type')
+ return contentType
+def downloadfile(name,url):
+ name=name+".mp4"
+def make_dir(name_hash):
+ #save image
+ try:
+ os.mkdir(dir_photo+name_hash)
+ except FileExistsError:
+ print("~~~~~~Warning~~~~~~~~~Directory " , dir_photo+name_hash , " already exists")
+ try:
+ os.mkdir(dir_text+name_hash)
+ except FileExistsError:
+ print("~~~~~~Warning~~~~~~~~~Directory " , dir_text+name_hash , " already exists")
+ try:
+ os.mkdir(dir_sound+name_hash)
+ except FileExistsError:
+ print("~~~~~~Warning~~~~~~~~~Directory " , dir_sound+name_hash , " already exists")
+ try:
+ os.mkdir(dir_video+name_hash)
+ except FileExistsError:
+ print("~~~~~~Warning~~~~~~~~~Directory " , dir_video+name_hash , " already exists")
+ try:
+ os.mkdir(dir_anchor+name_hash)
+ except FileExistsError:
+ print("~~~~~~Warning~~~~~~~~~Directory " , dir_anchor+name_hash , " already exists")
+ try:
+ os.mkdir(dir_subtitle+name_hash)
+ except FileExistsError:
+ print("~~~~~~Warning~~~~~~~~~Directory " , dir_subtitle+name_hash , " already exists")
+def file_prepare_v2(name, name_hash,text_content,image_urls):
+ make_dir(name_hash)
+ img_num = 1
+ for imgu in image_urls:
+ if get_url_type(imgu) =='video/mp4':
+ r=requests.get(imgu)
+ f=open(dir_photo+name_hash+"/"+str(img_num)+".mp4",'wb')
+ for chunk in r.iter_content(chunk_size=255):
+ if chunk:
+ f.write(chunk)
+ f.close()
+ else:
+ im = Image.open(requests.get(imgu, stream=True).raw)
+ im= im.convert("RGB")
+ im.save(dir_photo+name_hash+"/"+str(img_num)+".jpg")
+ img_num+=1
+ #save text
+ txt_idx=0
+ for txt in text_content:
+ text_file = open(dir_text+name_hash+"/"+str(txt_idx)+".txt", "w")
+ text_file.write(txt)
+ text_file.close()
+ txt_idx+=1
+ print("text file made")
+ #make mp3
+ language = 'zh-tw'
+ txt_idx = 0
+ for txt in text_content:
+ tts = zhtts.TTS()
+ tts.text2wav(txt,dir_sound+name_hash+"/"+str(txt_idx)+".mp3")
+ txt_idx+=1
+ print("mp3 file made")
+ #make title as image
+ txt2image_title(name, dir_title+name_hash+".png")
+def txt2image(content, save_target):
+ unicode_text = trim_punctuation(content)
+ font = ImageFont.truetype(font="font/DFT_B7.ttc", size=38)
+ text_width, text_height = font.getsize(unicode_text)
+ canvas = Image.new('RGBA', (700, 500), (255, 0, 0, 0) )
+ draw = ImageDraw.Draw(canvas)
+ text= unicode_text
+ draw.text((5,5), text, (255, 255, 0), font)
+ canvas.save(save_target, "PNG")
+def txt2image_title(content, save_target):
+ unicode_text = trim_punctuation(content)
+ font = ImageFont.truetype(font="font/DFT_B7.ttc", size=28)
+ text_width, text_height = font.getsize(unicode_text)
+ canvas = Image.new('RGBA', (510, 500), (255, 0, 0, 0) )
+ draw = ImageDraw.Draw(canvas)
+ text= unicode_text
+ draw.text((5,5), text, (17, 41, 167), font)
+ canvas.save(save_target, "PNG")
+def txt2image_title(content, save_target):
+ unicode_text =content
+ font = ImageFont.truetype("font.ttf", 23,encoding='big5')
+ text_width, text_height = font.getsize(unicode_text)
+ canvas = Image.new('RGBA', (500, 500), (255, 0, 0, 0) )
+ draw = ImageDraw.Draw(canvas)
+ text=''
+ for c in unicode_text:
+ if len(re.findall(r'[\u4e00-\u9fff]+', c))>0:
+ text+=myunichchar(c)
+ else:
+ text+=c
+ draw.text((5,5), text, (17, 41, 167), font)
+ canvas.save(save_target, "PNG")
+def call_anchor(fileName,avatar):
+ conn = rpyc.classic.connect("",18812)
+ ros = conn.modules.os
+ rsys = conn.modules.sys
+ fr=open(dir_sound+fileName+".mp3",'rb')# voice
+ #warning!!! file my be replaced by other process
+ fw=conn.builtins.open('/tmp/output.mp3','wb')
+ while True:
+ b=fr.read(1024)
+ if b:
+ fw.write(b)
+ else:
+ break
+ fr.close()
+ fw.close()
+ val=random.randint(1000000,9999999)
+ ros.chdir('/home/jared/to_video')
+ ros.system('./p'+str(avatar)+'.sh '+str(val)+' &')
+ while True:
+ print('waiting...')
+ if ros.path.exists('/tmp/results/'+str(val)):
+ break
+ time.sleep(5)
+ print('waiting...')
+ fr=conn.builtins.open('/tmp/results/'+str(val)+'.mp4','rb')
+ fw=open(dir_anchor+fileName+".mp4",'wb')#peggy1_1
+ while True:
+ b=fr.read(1024)
+ if b:
+ fw.write(b)
+ else:
+ break
+ fr.close()
+ fw.close()
+def trim_punctuation(s):
+ pat_block = u'[^\u4e00-\u9fff0-9a-zA-Z]+';
+ pattern = u'([0-9]+{0}[0-9]+)|{0}'.format(pat_block)
+ res = re.sub(pattern, lambda x: x.group(1) if x.group(1) else u"" ,s)
+ return res
+def splitter(s):
+ for sent in re.findall(u'[^!?,。\!\?]+[!?。\!\?]?', s, flags=re.U):
+ yield sent
+def split_by_pun(s):
+ res = list(splitter(s))
+ return res
+def generate_subtitle_image(name_hash,text_content):
+ img_list = [None]*len(text_content)
+ for idx in range(len(text_content)):
+ img_list[idx]=[]
+ senList = split_by_pun(text_content[idx])
+ for inner_idx in range(len(senList)):
+ sv_path = dir_subtitle + name_hash +'/'+str(idx)+ str(inner_idx) +'.png'
+ sub = senList[inner_idx]
+ txt2image(sub,sv_path)
+ img_list[idx]+=[{"count":len(sub),"path":sv_path}]
+ return img_list
+async def sendProgress(progress,client_id):
+ ws = create_connection("ws://www.choozmo.com:8888/progress/"+client_id)
+ ws.send(str(progress))
+ ws.close()
+def anchor_video_v2(name_hash,name,text_content, image_urls,avatar,client_id):
+ progress = 0
+ asyncio.run(sendProgress(progress,client_id))
+ print('sub image made')
+ file_prepare_v2(name, name_hash, text_content,image_urls)
+ progress = 20
+ asyncio.run(sendProgress(progress,client_id))
+ sub_list=generate_subtitle_image(name_hash,text_content)
+ progress = 30
+ asyncio.run(sendProgress(progress,client_id))
+ progress_per_video = int(40/len(text_content))
+ for fname in range(len(text_content)):
+ call_anchor(name_hash+"/"+str(fname),avatar)
+ progress += progress_per_video
+ print('step finish')
+ asyncio.run(sendProgress(progress,client_id))
+ print('called............................................')
+ ck=cKey(0,254,0,270)
+ ck_anchor=cKey(0,255,1,320)
+ duration = 0
+ #average layer level is 3
+ t = openshot.Timeline(1280, 720, openshot.Fraction(30000, 1000), 44100, 2, openshot.LAYOUT_STEREO)
+ t.Open()
+ main_timer = 0
+ LOGO_OP = openshot.FFmpegReader(dir_video+"LOGO_OP.mp4")
+ LOGO_OP.Open() # Open the reader
+ LOGO_OP_clip = video_photo_clip(vid=LOGO_OP,layer=4,position=0,end=LOGO_OP.info.duration
+ ,location_y=-0.03,scale_x=0.8,scale_y=0.704)
+ t.AddClip(LOGO_OP_clip)
+ bg_head = openshot.FFmpegReader(dir_video+"bg_head.avi")
+ bg_head.Open()
+ bg_head_clip = video_photo_clip(vid=bg_head,layer=2,position=0,end=LOGO_OP.info.duration,ck=ck)
+ t.AddClip(bg_head_clip)
+ main_timer += LOGO_OP.info.duration
+ head_duration = LOGO_OP.info.duration
+ bg_head.Close()
+ LOGO_OP.Close()
+ progress += 10
+ clip_duration=0
+ photo_clip_list = [None]*len(text_content)
+ img_list = [None]*len(text_content)
+ anchor_clip_list = [None] * len(text_content)
+ anchor_list = [None] * len(text_content)
+ audio_clip_list = [None] * len(text_content)
+ audio_list = [None] * len(text_content)
+ sub_clip_list = [None] * len(text_content)
+ sub_img_list = [None] * len(text_content)
+ idx = 0
+ for p in listdir(dir_photo+name_hash):
+ anchor_list[idx] = openshot.FFmpegReader(dir_anchor+name_hash+"/"+str(idx)+".mp4")
+ clip_duration = anchor_list[idx].info.duration
+ anchor_list[idx].Open()
+ anchor_clip_list[idx] = video_photo_clip(vid=anchor_list[idx],layer=4,scale_x=0.65,scale_y=0.65,
+ location_x=0.35,location_y=0.25,position=main_timer, end=clip_duration,ck=ck_anchor,audio=False)
+ t.AddClip(anchor_clip_list[idx])
+ img_list[idx] = openshot.FFmpegReader(dir_photo+name_hash+'/'+p)
+ img_list[idx].Open()
+ photo_clip_list[idx] = video_photo_clip(vid=img_list[idx],layer=3
+ ,scale_x=0.81,scale_y=0.68,location_y=-0.03,position=main_timer,end=clip_duration,audio=False)
+ t.AddClip(photo_clip_list[idx])
+ img_list[idx].Close()
+ audio_list[idx] = openshot.FFmpegReader(dir_sound+name_hash+"/"+str(idx)+".mp3")
+ audio_list[idx].Open()
+ audio_clip_list[idx] = openshot.Clip(audio_list[idx])
+ audio_clip_list[idx].Position(main_timer)
+ audio_clip_list[idx].End(clip_duration)
+ t.AddClip(audio_clip_list[idx])
+ img_list[idx].Close()
+ anchor_list[idx].Close()
+ audio_list[idx].Close()
+ sub_img_list[idx] = [None] * len(sub_list[idx])
+ sub_clip_list[idx] = [None] * len(sub_list[idx])
+ sub_timer = 0
+ for sub_idx in range(len(sub_list[idx])):
+ sub_img_list[idx][sub_idx] = openshot.QtImageReader(sub_list[idx][sub_idx]['path'])
+ sub_img_list[idx][sub_idx].Open()
+ sub_duration = 0.205*sub_list[idx][sub_idx]['count']
+ sub_clip_list[idx][sub_idx] = video_photo_clip(vid=sub_img_list[idx][sub_idx], layer=6,location_x=0.069, location_y=0.89,position=main_timer+sub_timer,end=sub_duration)
+ t.AddClip(sub_clip_list[idx][sub_idx])
+ sub_img_list[idx][sub_idx].Close()
+ sub_timer += sub_duration
+ print(sub_list[idx][sub_idx]['path'])
+ main_timer += clip_duration
+ idx+=1
+ progress+=10
+ asyncio.run(sendProgress(progress,client_id))
+ LOGO_ED = openshot.FFmpegReader(dir_video+"LOGO_ED.avi")
+ LOGO_ED.Open()
+ LOGO_ED_clip = video_photo_clip(vid=LOGO_ED,layer=4,position=main_timer,end=LOGO_ED.info.duration+2
+ ,location_x=0.005,location_y=-0.031
+ ,scale_x=0.8,scale_y=0.6825)
+ t.AddClip(LOGO_ED_clip)
+ ED_duration = LOGO_ED.info.duration
+ LOGO_ED.Close()
+ bg = openshot.FFmpegReader(dir_video+"bg.mp4")
+ bg.Open()
+ bg_times = math.floor(main_timer+ED_duration/bg.info.duration)
+ left_time = (main_timer+ED_duration) % bg.info.duration
+ bg_clip_list = [None] * bg_times
+ bg_list = [None] * bg_times
+ bg.Close()
+ bg_timer = head_duration
+ for idx in range(bg_times):
+ bg_list[idx] = openshot.FFmpegReader(dir_video+"bg.mp4")
+ bg_list[idx].Open()
+ bg_clip_list[idx] = video_photo_clip(bg_list[idx],layer=2,position=bg_timer
+ ,end=bg_list[idx].info.duration,ck=ck)
+ t.AddClip(bg_clip_list[idx])
+ bg_timer += bg_list[idx].info.duration
+ bg_list[idx].Close()
+ bg_left = openshot.FFmpegReader(dir_video+"bg.mp4")
+ bg_left.Open()
+ bg_left_clip = video_photo_clip(bg_left,layer=2,position=bg_timer,end=left_time,ck=ck)
+ t.AddClip(bg_left_clip)
+ bg_left.Close()
+ title = openshot.QtImageReader(dir_title+name_hash+".png")
+ title.Open() # Open the reader
+ title_clip = video_photo_clip(vid=title, layer=4,location_x=-0.047, location_y=0.801,position=0,end=head_duration+main_timer)
+ t.AddClip(title_clip)
+ ####start building
+ w = openshot.FFmpegWriter("../html/"+name_hash+".mp4")
+ w.SetAudioOptions(True, "aac", 44100, 2, openshot.LAYOUT_STEREO, 3000000)
+ w.SetVideoOptions(True, "libx264", openshot.Fraction(30000, 1000), 1280, 720,
+ openshot.Fraction(1, 1), False, False, 3000000)
+ w.Open()
+ #may change duration into t.info.duration
+ frames = int(t.info.fps)*int(head_duration+main_timer+ED_duration)
+ for n in range(frames):
+ f=t.GetFrame(n)
+ w.WriteFrame(f)
+ progress = 100
+ asyncio.run(sendProgress(progress,client_id))
+ notify_group(name+"的影片已經產生完成囉! www.choozmo.com:8168/"+name_hash+".mp4")
+ t.Close()
+ w.Close()
+ progress = 100
+ asyncio.run(sendProgress(progress,client_id))
+ print("Raw Video done")
+ print("video at : www.choozmo.com:8168/"+name_hash+".mp4")
+ #line notifs