from fastapi import FastAPI,Cookie, Depends, FastAPI, Query, WebSocket, status, WebSocketDisconnect from os import listdir from os.path import isfile, isdir, join import threading import zhtts import os import urllib from typing import List import requests from pydantic import BaseModel from bs4 import BeautifulSoup from PIL import Image,ImageDraw,ImageFont import pyttsx3 import rpyc import random import time import math import hashlib import re import asyncio import urllib.request from fastapi.responses import FileResponse from websocket import create_connection from fastapi.middleware.cors import CORSMiddleware import dataset from datetime import datetime from util.swap_face import swap_face from fastapi.staticfiles import StaticFiles #service nginx restart #uvicorn main:app --host="0.0.0.0" --reload --port 8878 app = FastAPI() origins = [ "https://hhh.com.tw" "http://172.105.205.52", "http://172.105.205.52:8001", "http://172.104.93.163", ] app.add_middleware( CORSMiddleware, # allow_origins=origins, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.mount("/static", StaticFiles(directory="static"), name="static") app.mount("/static/img", StaticFiles(directory="static/img"), name="static/img") dir_sound = 'mp3_track/' dir_photo = 'photo/' dir_text = 'text_file/' dir_video = 'video_material/' dir_title = 'title/' dir_subtitle = 'subtitle/' dir_anchor = 'anchor_raw/' class swap_req(BaseModel): imgurl: str class request(BaseModel): name: str text_content: List[str] image_urls: List[str] avatar: str client_id :str class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def send_personal_message(self, message: str, websocket: WebSocket): await websocket.send_text(message) async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) @app.get("/") async def root(): return {"message": "Hello, this is index"} @app.get("/index2") async def index2(): return FileResponse('static/index2.html') @app.get("/gen_avatar") async def index2(): return FileResponse('gen_avatar.html') @app.post("/swapFace") async def swapFace(req:swap_req): sf = swap_face(req.imgurl) result = sf.run() #notify_group(result)hi return result @app.post("/make_anchor_video_v2") async def make_anchor_video_v2(req:request): for txt in req.text_content: if re.search('[a-zA-Z]', txt) !=None: return {'msg':'輸入字串不能包含英文字!'} name_hash = str(time.time()).replace('.','') for imgu in req.image_urls: try: if get_url_type(imgu) =='video/mp4': r=requests.get(imgu) f=open(dir_photo+name_hash+"/"+str(img_num)+".mp4",'wb') else: im = Image.open(requests.get(imgu, stream=True).raw) im= im.convert("RGB") except: return {'msg':"無法辨別圖片網址"+imgu} save_history(req,name_hash) x = threading.Thread(target=anchor_video_v2, args=(name_hash,req.name, req.text_content, req.image_urls,int(req.avatar),req.client_id)) x.start() return {"msg":"製作影片需要時間,請您耐心等候 稍後可以在www.choozmo.com:8168/"+name_hash+".mp4 中觀看"} manager = ConnectionManager() @app.websocket("/progress/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: int): await manager.connect(websocket) try: while True: data = await websocket.receive_text() await manager.send_personal_message(data, websocket) await manager.broadcast(data) except WebSocketDisconnect: manager.disconnect(websocket) @app.get("/history_input") async def history_input(): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') statement = 'SELECT * FROM history_input ORDER BY timestamp DESC LIMIT 50' logs = [] for row in db.query(statement): logs.append({'id':row['id'],'name':row['name'],'text_content':row['text_content'].split(','),'link':row['link'],'image_urls':row['image_urls'].split(',')}) return logs def notify_group(msg): glist=['7vilzohcyQMPLfAMRloUawiTV4vtusZhxv8Czo7AJX8','WekCRfnAirSiSxALiD6gcm0B56EejsoK89zFbIaiZQD','1dbtJHbWVbrooXmQqc4r8OyRWDryjD4TMJ6DiDsdgsX'] for gid in glist: headers = { "Authorization": "Bearer " + gid, "Content-Type": "application/x-www-form-urlencoded" } params = {"message": msg} r = requests.post("https://notify-api.line.me/api/notify",headers=headers, params=params) def save_history(req,name_hash): db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') log_table = db['history_input'] txt_content_seperate_by_dot = '' for txt in req.text_content: txt_content_seperate_by_dot += txt+"," txt_content_seperate_by_dot = txt_content_seperate_by_dot[:-1] img_urls_seperate_by_dot = '' for iurl in req.image_urls: img_urls_seperate_by_dot += iurl+"," img_urls_seperate_by_dot = img_urls_seperate_by_dot[:-1] time_stamp = datetime.fromtimestamp(time.time()) time_stamp = time_stamp.strftime("%Y-%m-%d %H:%M:%S") pk = log_table.insert({'name':req.name,'text_content':txt_content_seperate_by_dot,'image_urls':img_urls_seperate_by_dot,'link':'www.choozmo.com:8168/'+name_hash+'.mp4','timestamp':time_stamp}) def cKey(r,g,b,fuzz): col=openshot.Color() col.red=openshot.Keyframe(r) col.green=openshot.Keyframe(g) col.blue=openshot.Keyframe(b) return openshot.ChromaKey(col, openshot.Keyframe(fuzz)) def video_photo_clip(vid=None,layer=None, position=None, end=None ,scale_x=1,scale_y=1,location_x=0,location_y=0,ck=None,audio=True): clip = openshot.Clip(vid) clip.Layer(layer) clip.Position(position) clip.End(end) clip.scale_x=openshot.Keyframe(scale_x) clip.scale_y=openshot.Keyframe(scale_y) clip.location_x=openshot.Keyframe(location_x) clip.location_y=openshot.Keyframe(location_y) if ck!=None: clip.AddEffect(ck) if audio==True: clip.has_audio=openshot.Keyframe(1) else: clip.has_audio=openshot.Keyframe(0) return clip def myunichchar(unicode_char): mb_string = unicode_char.encode('big5') try: unicode_char = unichr(ord(mb_string[0]) << 8 | ord(mb_string[1])) except NameError: unicode_char = chr(mb_string[0] << 8 | mb_string[1]) return unicode_char def file_prepare(name, name_hash,text_content,image_urls): #save image try: os.mkdir(dir_photo+name_hash) except FileExistsError: print("Directory " , dir_photo+name_hash , " already exists") img_num = 1 for imgu in image_urls: im = Image.open(requests.get(imgu, stream=True).raw) im.save(dir_photo+name_hash+"/"+str(img_num)+".jpg") img_num+=1 #save text text_file = open(dir_text+name_hash+".txt", "w") text_file.write(text_content) text_file.close() print("text file made") #make mp3 tts = zhtts.TTS() tts.text2wav(text_content,dir_sound+name_hash+".mp3") print("mp3 file made") #make title as image txt2image(name, dir_title+name_hash+".png") def get_url_type(url): req = urllib.request.Request(url, method='HEAD', headers={'User-Agent': 'Mozilla/5.0'}) r = urllib.request.urlopen(req) contentType = r.getheader('Content-Type') return contentType def downloadfile(name,url): name=name+".mp4" def make_dir(name_hash): #save image try: os.mkdir(dir_photo+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_photo+name_hash , " already exists") try: os.mkdir(dir_text+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_text+name_hash , " already exists") try: os.mkdir(dir_sound+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_sound+name_hash , " already exists") try: os.mkdir(dir_video+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_video+name_hash , " already exists") try: os.mkdir(dir_anchor+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_anchor+name_hash , " already exists") try: os.mkdir(dir_subtitle+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_subtitle+name_hash , " already exists") def file_prepare_v2(name, name_hash,text_content,image_urls): make_dir(name_hash) img_num = 1 for imgu in image_urls: if get_url_type(imgu) =='video/mp4': r=requests.get(imgu) f=open(dir_photo+name_hash+"/"+str(img_num)+".mp4",'wb') for chunk in r.iter_content(chunk_size=255): if chunk: f.write(chunk) f.close() else: im = Image.open(requests.get(imgu, stream=True).raw) im= im.convert("RGB") im.save(dir_photo+name_hash+"/"+str(img_num)+".jpg") img_num+=1 #save text txt_idx=0 for txt in text_content: text_file = open(dir_text+name_hash+"/"+str(txt_idx)+".txt", "w") text_file.write(txt) text_file.close() txt_idx+=1 print("text file made") #make mp3 language = 'zh-tw' txt_idx = 0 for txt in text_content: tts = zhtts.TTS() tts.text2wav(txt,dir_sound+name_hash+"/"+str(txt_idx)+".mp3") txt_idx+=1 print("mp3 file made") #make title as image txt2image_title(name, dir_title+name_hash+".png") def txt2image(content, save_target): unicode_text = trim_punctuation(content) font = ImageFont.truetype(font="font/DFT_B7.ttc", size=38) text_width, text_height = font.getsize(unicode_text) canvas = Image.new('RGBA', (700, 500), (255, 0, 0, 0) ) draw = ImageDraw.Draw(canvas) text= unicode_text draw.text((5,5), text, (255, 255, 0), font) canvas.save(save_target, "PNG") def txt2image_title(content, save_target): unicode_text = trim_punctuation(content) font = ImageFont.truetype(font="font/DFT_B7.ttc", size=28) text_width, text_height = font.getsize(unicode_text) canvas = Image.new('RGBA', (510, 500), (255, 0, 0, 0) ) draw = ImageDraw.Draw(canvas) text= unicode_text draw.text((5,5), text, (17, 41, 167), font) canvas.save(save_target, "PNG") ''' def txt2image_title(content, save_target): unicode_text =content font = ImageFont.truetype("font.ttf", 23,encoding='big5') text_width, text_height = font.getsize(unicode_text) canvas = Image.new('RGBA', (500, 500), (255, 0, 0, 0) ) draw = ImageDraw.Draw(canvas) text='' for c in unicode_text: if len(re.findall(r'[\u4e00-\u9fff]+', c))>0: text+=myunichchar(c) else: text+=c draw.text((5,5), text, (17, 41, 167), font) canvas.save(save_target, "PNG") ''' def call_anchor(fileName,avatar): conn = rpyc.classic.connect("192.168.1.105",18812) ros = conn.modules.os rsys = conn.modules.sys fr=open(dir_sound+fileName+".mp3",'rb')# voice #warning!!! file my be replaced by other process fw=conn.builtins.open('/tmp/output.mp3','wb') while True: b=fr.read(1024) if b: fw.write(b) else: break fr.close() fw.close() val=random.randint(1000000,9999999) ros.chdir('/home/jared/to_video') ros.system('./p'+str(avatar)+'.sh '+str(val)+' &') while True: print('waiting...') if ros.path.exists('/tmp/results/'+str(val)): break time.sleep(5) print('waiting...') fr=conn.builtins.open('/tmp/results/'+str(val)+'.mp4','rb') fw=open(dir_anchor+fileName+".mp4",'wb')#peggy1_1 while True: b=fr.read(1024) if b: fw.write(b) else: break fr.close() fw.close() def trim_punctuation(s): pat_block = u'[^\u4e00-\u9fff0-9a-zA-Z]+'; pattern = u'([0-9]+{0}[0-9]+)|{0}'.format(pat_block) res = re.sub(pattern, lambda x: x.group(1) if x.group(1) else u"" ,s) return res def splitter(s): for sent in re.findall(u'[^!?,。\!\?]+[!?。\!\?]?', s, flags=re.U): yield sent def split_by_pun(s): res = list(splitter(s)) return res def generate_subtitle_image(name_hash,text_content): img_list = [None]*len(text_content) for idx in range(len(text_content)): img_list[idx]=[] senList = split_by_pun(text_content[idx]) for inner_idx in range(len(senList)): sv_path = dir_subtitle + name_hash +'/'+str(idx)+ str(inner_idx) +'.png' sub = senList[inner_idx] txt2image(sub,sv_path) img_list[idx]+=[{"count":len(sub),"path":sv_path}] return img_list async def sendProgress(progress,client_id): ws = create_connection("ws://192.168.1.106:8887/progress/"+client_id) ws.send(str(progress)) ws.close() def anchor_video_v2(name_hash,name,text_content, image_urls,avatar,client_id): progress = 0 asyncio.run(sendProgress(progress,client_id)) print('sub image made') file_prepare_v2(name, name_hash, text_content,image_urls) progress = 20 asyncio.run(sendProgress(progress,client_id)) sub_list=generate_subtitle_image(name_hash,text_content) progress = 30 asyncio.run(sendProgress(progress,client_id)) progress_per_video = int(40/len(text_content)) for fname in range(len(text_content)): call_anchor(name_hash+"/"+str(fname),avatar) progress += progress_per_video print('step finish') asyncio.run(sendProgress(progress,client_id)) print('called............................................') ck=cKey(0,254,0,270) ck_anchor=cKey(0,255,1,320) duration = 0 #average layer level is 3 t = openshot.Timeline(1280, 720, openshot.Fraction(30000, 1000), 44100, 2, openshot.LAYOUT_STEREO) t.Open() main_timer = 0 LOGO_OP = openshot.FFmpegReader(dir_video+"LOGO_OP.mp4") LOGO_OP.Open() # Open the reader LOGO_OP_clip = video_photo_clip(vid=LOGO_OP,layer=4,position=0,end=LOGO_OP.info.duration ,location_y=-0.03,scale_x=0.8,scale_y=0.704) t.AddClip(LOGO_OP_clip) bg_head = openshot.FFmpegReader(dir_video+"bg_head.avi") bg_head.Open() bg_head_clip = video_photo_clip(vid=bg_head,layer=2,position=0,end=LOGO_OP.info.duration,ck=ck) t.AddClip(bg_head_clip) main_timer += LOGO_OP.info.duration head_duration = LOGO_OP.info.duration bg_head.Close() LOGO_OP.Close() progress += 10 clip_duration=0 photo_clip_list = [None]*len(text_content) img_list = [None]*len(text_content) anchor_clip_list = [None] * len(text_content) anchor_list = [None] * len(text_content) audio_clip_list = [None] * len(text_content) audio_list = [None] * len(text_content) sub_clip_list = [None] * len(text_content) sub_img_list = [None] * len(text_content) idx = 0 for p in listdir(dir_photo+name_hash): anchor_list[idx] = openshot.FFmpegReader(dir_anchor+name_hash+"/"+str(idx)+".mp4") clip_duration = anchor_list[idx].info.duration anchor_list[idx].Open() anchor_clip_list[idx] = video_photo_clip(vid=anchor_list[idx],layer=4,scale_x=0.65,scale_y=0.65, location_x=0.35,location_y=0.25,position=main_timer, end=clip_duration,ck=ck_anchor,audio=False) t.AddClip(anchor_clip_list[idx]) img_list[idx] = openshot.FFmpegReader(dir_photo+name_hash+'/'+p) img_list[idx].Open() photo_clip_list[idx] = video_photo_clip(vid=img_list[idx],layer=3 ,scale_x=0.81,scale_y=0.68,location_y=-0.03,position=main_timer,end=clip_duration,audio=False) t.AddClip(photo_clip_list[idx]) img_list[idx].Close() audio_list[idx] = openshot.FFmpegReader(dir_sound+name_hash+"/"+str(idx)+".mp3") audio_list[idx].Open() audio_clip_list[idx] = openshot.Clip(audio_list[idx]) audio_clip_list[idx].Position(main_timer) audio_clip_list[idx].End(clip_duration) t.AddClip(audio_clip_list[idx]) img_list[idx].Close() anchor_list[idx].Close() audio_list[idx].Close() sub_img_list[idx] = [None] * len(sub_list[idx]) sub_clip_list[idx] = [None] * len(sub_list[idx]) sub_timer = 0 for sub_idx in range(len(sub_list[idx])): sub_img_list[idx][sub_idx] = openshot.QtImageReader(sub_list[idx][sub_idx]['path']) sub_img_list[idx][sub_idx].Open() sub_duration = 0.205*sub_list[idx][sub_idx]['count'] sub_clip_list[idx][sub_idx] = video_photo_clip(vid=sub_img_list[idx][sub_idx], layer=6,location_x=0.069, location_y=0.89,position=main_timer+sub_timer,end=sub_duration) t.AddClip(sub_clip_list[idx][sub_idx]) sub_img_list[idx][sub_idx].Close() sub_timer += sub_duration print(sub_list[idx][sub_idx]['path']) main_timer += clip_duration idx+=1 progress+=10 asyncio.run(sendProgress(progress,client_id)) LOGO_ED = openshot.FFmpegReader(dir_video+"LOGO_ED.avi") LOGO_ED.Open() LOGO_ED_clip = video_photo_clip(vid=LOGO_ED,layer=4,position=main_timer,end=LOGO_ED.info.duration+2 ,location_x=0.005,location_y=-0.031 ,scale_x=0.8,scale_y=0.6825) t.AddClip(LOGO_ED_clip) ED_duration = LOGO_ED.info.duration LOGO_ED.Close() bg = openshot.FFmpegReader(dir_video+"bg.mp4") bg.Open() bg_times = math.floor(main_timer+ED_duration/bg.info.duration) left_time = (main_timer+ED_duration) % bg.info.duration bg_clip_list = [None] * bg_times bg_list = [None] * bg_times bg.Close() bg_timer = head_duration for idx in range(bg_times): bg_list[idx] = openshot.FFmpegReader(dir_video+"bg.mp4") bg_list[idx].Open() bg_clip_list[idx] = video_photo_clip(bg_list[idx],layer=2,position=bg_timer ,end=bg_list[idx].info.duration,ck=ck) t.AddClip(bg_clip_list[idx]) bg_timer += bg_list[idx].info.duration bg_list[idx].Close() bg_left = openshot.FFmpegReader(dir_video+"bg.mp4") bg_left.Open() bg_left_clip = video_photo_clip(bg_left,layer=2,position=bg_timer,end=left_time,ck=ck) t.AddClip(bg_left_clip) bg_left.Close() title = openshot.QtImageReader(dir_title+name_hash+".png") title.Open() # Open the reader title_clip = video_photo_clip(vid=title, layer=4,location_x=-0.047, location_y=0.801,position=0,end=head_duration+main_timer) t.AddClip(title_clip) ####start building w = openshot.FFmpegWriter("../html/"+name_hash+".mp4") w.SetAudioOptions(True, "aac", 44100, 2, openshot.LAYOUT_STEREO, 3000000) w.SetVideoOptions(True, "libx264", openshot.Fraction(30000, 1000), 1280, 720, openshot.Fraction(1, 1), False, False, 3000000) w.Open() #may change duration into t.info.duration frames = int(t.info.fps)*int(head_duration+main_timer+ED_duration) for n in range(frames): f=t.GetFrame(n) w.WriteFrame(f) progress = 100 asyncio.run(sendProgress(progress,client_id)) notify_group(name+"的影片已經產生完成囉! www.choozmo.com:8168/"+name_hash+".mp4") t.Close() w.Close() progress = 100 asyncio.run(sendProgress(progress,client_id)) print("Raw Video done") print("video at : www.choozmo.com:8168/"+name_hash+".mp4") #line notifs