from os import listdir from os.path import isfile, isdir, join import openshot import threading import zhtts import os import urllib from typing import List import requests from pydantic import BaseModel from bs4 import BeautifulSoup from PIL import Image,ImageDraw,ImageFont import pyttsx3 import rpyc import random import re import time import math import dataset from datetime import datetime from gtts import gTTS dir_sound = 'mp3_track/' dir_photo = 'photo/' dir_text = 'text_file/' dir_video = 'video_material/' dir_title = 'title/' dir_subtitle = 'subtitle/' dir_anchor = 'anchor_raw/' tmp_video_dir = 'tmp_video/' video_sub_folder = 'ai_anchor_video/' dir_list = [dir_sound,dir_photo,dir_text,dir_video,dir_title,dir_subtitle,dir_anchor,tmp_video_dir] def notify_group(msg): glist=['7vilzohcyQMPLfAMRloUawiTV4vtusZhxv8Czo7AJX8','WekCRfnAirSiSxALiD6gcm0B56EejsoK89zFbIaiZQD','1dbtJHbWVbrooXmQqc4r8OyRWDryjD4TMJ6DiDsdgsX','HOB1kVNgIb81tTB4Ort1BfhVp9GFo6NlToMQg88vEhh'] for gid in glist: headers = { "Authorization": "Bearer " + gid, "Content-Type": "application/x-www-form-urlencoded" } params = {"message": msg} r = requests.post("https://notify-api.line.me/api/notify",headers=headers, params=params) def cKey(r,g,b,fuzz): col=openshot.Color() col.red=openshot.Keyframe(r) col.green=openshot.Keyframe(g) col.blue=openshot.Keyframe(b) return openshot.ChromaKey(col, openshot.Keyframe(fuzz)) def video_photo_clip(vid=None,layer=None, position=None, end=None ,scale_x=1,scale_y=1,location_x=0,location_y=0,ck=None,audio=True): clip = openshot.Clip(vid) clip.Layer(layer) clip.Position(position) clip.End(end) clip.scale_x=openshot.Keyframe(scale_x) clip.scale_y=openshot.Keyframe(scale_y) clip.location_x=openshot.Keyframe(location_x) clip.location_y=openshot.Keyframe(location_y) if ck!=None: clip.AddEffect(ck) if audio==True: clip.has_audio=openshot.Keyframe(1) else: clip.has_audio=openshot.Keyframe(0) return clip def myunichchar(unicode_char): mb_string = unicode_char.encode('big5') try: unicode_char = unichr(ord(mb_string[0]) << 8 | ord(mb_string[1])) except NameError: unicode_char = chr(mb_string[0] << 8 | mb_string[1]) return unicode_char def get_url_type(url): req = urllib.request.Request(url, method='HEAD', headers={'User-Agent': 'Mozilla/5.0'}) r = urllib.request.urlopen(req) contentType = r.getheader('Content-Type') return contentType def make_dir(name_hash): for direct in dir_list: if not os.path.isdir(direct): os.mkdir(direct) try: os.mkdir(dir_photo+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_photo+name_hash , " already exists") try: os.mkdir(dir_text+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_text+name_hash , " already exists") try: os.mkdir(dir_sound+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_sound+name_hash , " already exists") try: os.mkdir(dir_anchor+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_anchor+name_hash , " already exists") try: os.mkdir(dir_subtitle+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_subtitle+name_hash , " already exists") def file_prepare(name, name_hash,text_content,image_urls,lang='zh'): make_dir(name_hash) img_num = 1 for imgu in image_urls: if get_url_type(imgu) =='video/mp4': r=requests.get(imgu) f=open(dir_photo+name_hash+"/"+str(img_num)+".mp4",'wb') for chunk in r.iter_content(chunk_size=255): if chunk: f.write(chunk) f.close() else: im = Image.open(requests.get(imgu, stream=True).raw) im= im.convert("RGB") im.save(dir_photo+name_hash+"/"+str(img_num)+".jpg") img_num+=1 #save text txt_idx=0 for txt in text_content: text_file = open(dir_text+name_hash+"/"+str(txt_idx)+".txt", "w") text_file.write(txt) text_file.close() txt_idx+=1 print("text file made") #make mp3 language = 'zh-tw' txt_idx = 0 for txt in text_content: if lang!='zh': tts = gTTS(txt) tts.save(dir_sound+name_hash+"/"+str(txt_idx)+".mp3") else: tts = zhtts.TTS() tts.text2wav(txt,dir_sound+name_hash+"/"+str(txt_idx)+".mp3") txt_idx+=1 print("mp3 file made") #make title as image txt2image_title(name, dir_title+name_hash+".png",lang) def txt2image(content, save_target,lang='zh'): unicode_text = trim_punctuation(content) font = '' if lang=='zh': font = ImageFont.truetype(font="font/DFT_B7.ttc", size=38) else : font = ImageFont.truetype(font="font/arial.ttf", size=38) text_width, text_height = font.getsize(unicode_text) canvas = Image.new('RGBA', (700, 500), (255, 0, 0, 0) ) draw = ImageDraw.Draw(canvas) text= unicode_text draw.text((5,5), text, (255, 255, 0), font) canvas.save(save_target, "PNG") def txt2image_title(content, save_target, lang='zh'): unicode_text = trim_punctuation(content) font = '' if lang=='zh': font = ImageFont.truetype(font="font/DFT_B7.ttc", size=22) else : font = ImageFont.truetype(font="font/arial.ttf", size=22) text_width, text_height = font.getsize(unicode_text) canvas = Image.new('RGBA', (510, 500), (255, 0, 0, 0) ) draw = ImageDraw.Draw(canvas) text= unicode_text draw.text((5,5), text, (17, 41, 167), font) canvas.save(save_target, "PNG") def call_anchor(fileName,avatar): conn = rpyc.classic.connect("192.168.1.105",18812) ros = conn.modules.os rsys = conn.modules.sys fr=open(dir_sound+fileName+".mp3",'rb')# voice #warning!!! file my be replaced by other process fw=conn.builtins.open('/tmp/output.mp3','wb') while True: b=fr.read(1024) if b: fw.write(b) else: break fr.close() fw.close() val=random.randint(1000000,9999999) ros.chdir('/home/jared/to_video') ros.system('./p'+str(avatar)+'.sh '+str(val)+' &') while True: print('waiting...') if ros.path.exists('/tmp/results/'+str(val)): break time.sleep(5) print('waiting...') fr=conn.builtins.open('/tmp/results/'+str(val)+'.mp4','rb') fw=open(dir_anchor+fileName+".mp4",'wb') while True: b=fr.read(1024) if b: fw.write(b) else: break fr.close() fw.close() def trim_punctuation(s): pat_block = u'[^\u4e00-\u9fff0-9a-zA-Z]+'; pattern = u'([0-9]+{0}[0-9]+)|{0}'.format(pat_block) res = re.sub(pattern, lambda x: x.group(1) if x.group(1) else u" " ,s) return res def splitter(s): for sent in re.findall(u'[^!?,。\!\?]+[!?。\!\?]?', s, flags=re.U): yield sent def split_by_pun(s): res = list(splitter(s)) return res def generate_subtitle_image(name_hash,text_content): img_list = [None]*len(text_content) for idx in range(len(text_content)): img_list[idx]=[] senList = split_by_pun(text_content[idx]) for inner_idx in range(len(senList)): sv_path = dir_subtitle + name_hash +'/'+str(idx)+ str(inner_idx) +'.png' sub = senList[inner_idx] txt2image(sub,sv_path) img_list[idx]+=[{"count":len(sub),"path":sv_path}] return img_list def generate_subtitle_image_ENG(name_hash,text_content): img_list = [None]*len(text_content) for idx in range(len(text_content)): sv_path = dir_subtitle + name_hash +'/'+str(idx)+'.png' sub = text_content[idx] txt2image(sub, sv_path,lang='eng') img_list[idx] = sv_path return img_list def anchor_video_v2(name_hash,name,text_content, image_urls,avatar): print(os.getcwd()) print('sub image made') file_prepare(name, name_hash, text_content,image_urls) sub_list=generate_subtitle_image(name_hash,text_content) for fname in range(len(text_content)): call_anchor(name_hash+"/"+str(fname),avatar) print('step finish') print('called............................................') ck=cKey(0,254,0,270) ck_anchor=cKey(0,255,1,320) duration = 0 #average layer level is 3 t = openshot.Timeline(1280, 720, openshot.Fraction(30000, 1000), 44100, 2, openshot.LAYOUT_STEREO) t.Open() main_timer = 0 LOGO_OP = openshot.FFmpegReader(dir_video+"LOGO_OP_4.mp4") LOGO_OP.Open() # Open the reader LOGO_OP_clip = video_photo_clip(vid=LOGO_OP,layer=4,position=0,end=LOGO_OP.info.duration ,location_y=-0.03,scale_x=0.8,scale_y=0.704) t.AddClip(LOGO_OP_clip) bg_head = openshot.FFmpegReader(dir_video+"complete_head_aispokesgirl.mp4") bg_head.Open() bg_head_clip = video_photo_clip(vid=bg_head,layer=2,position=0,end=LOGO_OP.info.duration,ck=ck) t.AddClip(bg_head_clip) main_timer += LOGO_OP.info.duration head_duration = LOGO_OP.info.duration bg_head.Close() LOGO_OP.Close() clip_duration=0 photo_clip_list = [None]*len(text_content) img_list = [None]*len(text_content) anchor_clip_list = [None] * len(text_content) anchor_list = [None] * len(text_content) audio_clip_list = [None] * len(text_content) audio_list = [None] * len(text_content) sub_clip_list = [None] * len(text_content) sub_img_list = [None] * len(text_content) idx = 0 for p in listdir(dir_photo+name_hash): anchor_list[idx] = openshot.FFmpegReader(dir_anchor+name_hash+"/"+str(idx)+".mp4") clip_duration = anchor_list[idx].info.duration anchor_list[idx].Open() anchor_clip_list[idx] = video_photo_clip(vid=anchor_list[idx],layer=4,scale_x=0.65,scale_y=0.65, location_x=0.35,location_y=0.25,position=main_timer, end=clip_duration,ck=ck_anchor,audio=False) t.AddClip(anchor_clip_list[idx]) img_list[idx] = openshot.FFmpegReader(dir_photo+name_hash+'/'+p) img_list[idx].Open() photo_clip_list[idx] = video_photo_clip(vid=img_list[idx],layer=3 ,scale_x=0.81,scale_y=0.68,location_y=-0.03,position=main_timer,end=clip_duration,audio=False) t.AddClip(photo_clip_list[idx]) img_list[idx].Close() audio_list[idx] = openshot.FFmpegReader(dir_sound+name_hash+"/"+str(idx)+".mp3") audio_list[idx].Open() audio_clip_list[idx] = openshot.Clip(audio_list[idx]) audio_clip_list[idx].Position(main_timer) audio_clip_list[idx].End(clip_duration) t.AddClip(audio_clip_list[idx]) img_list[idx].Close() anchor_list[idx].Close() audio_list[idx].Close() sub_img_list[idx] = [None] * len(sub_list[idx]) sub_clip_list[idx] = [None] * len(sub_list[idx]) sub_timer = 0 for sub_idx in range(len(sub_list[idx])): sub_img_list[idx][sub_idx] = openshot.QtImageReader(sub_list[idx][sub_idx]['path']) sub_img_list[idx][sub_idx].Open() sub_duration = 0.205*sub_list[idx][sub_idx]['count'] sub_clip_list[idx][sub_idx] = video_photo_clip(vid=sub_img_list[idx][sub_idx], layer=6,location_x=0.069, location_y=0.89,position=main_timer+sub_timer,end=sub_duration) t.AddClip(sub_clip_list[idx][sub_idx]) sub_img_list[idx][sub_idx].Close() sub_timer += sub_duration print(sub_list[idx][sub_idx]['path']) main_timer += clip_duration idx+=1 LOGO_ED = openshot.FFmpegReader(dir_video+"LOGO_ED.avi") LOGO_ED.Open() LOGO_ED_clip = video_photo_clip(vid=LOGO_ED,layer=4,position=main_timer,end=LOGO_ED.info.duration+2 ,location_x=0.005,location_y=-0.031 ,scale_x=0.8,scale_y=0.6825) t.AddClip(LOGO_ED_clip) ED_duration = LOGO_ED.info.duration LOGO_ED.Close() bg = openshot.FFmpegReader(dir_video+"complete_double_aispokesgirl.mp4") bg.Open() bg_times = math.floor(main_timer+ED_duration/bg.info.duration) left_time = (main_timer+ED_duration) % bg.info.duration bg_clip_list = [None] * bg_times bg_list = [None] * bg_times bg.Close() bg_timer = head_duration for idx in range(bg_times): bg_list[idx] = openshot.FFmpegReader(dir_video+"complete_double_aispokesgirl.mp4") bg_list[idx].Open() bg_clip_list[idx] = video_photo_clip(bg_list[idx],layer=2,position=bg_timer ,end=bg_list[idx].info.duration,ck=ck) t.AddClip(bg_clip_list[idx]) bg_timer += bg_list[idx].info.duration bg_list[idx].Close() bg_left = openshot.FFmpegReader(dir_video+"complete_double_aispokesgirl.mp4") bg_left.Open() bg_left_clip = video_photo_clip(bg_left,layer=2,position=bg_timer,end=left_time,ck=ck) t.AddClip(bg_left_clip) bg_left.Close() title = openshot.QtImageReader(dir_title+name_hash+".png") title.Open() # Open the reader title_clip = video_photo_clip(vid=title, layer=4,location_x=-0.047, location_y=0.801,position=0,end=head_duration+main_timer) t.AddClip(title_clip) ####start building w = openshot.FFmpegWriter(tmp_video_dir+name_hash+".mp4") w.SetAudioOptions(True, "aac", 44100, 2, openshot.LAYOUT_STEREO, 3000000) w.SetVideoOptions(True, "libx264", openshot.Fraction(30000, 1000), 1280, 720, openshot.Fraction(1, 1), False, False, 3000000) w.Open() #may change duration into t.info.duration frames = int(t.info.fps)*int(head_duration+main_timer+ED_duration) for n in range(frames): f=t.GetFrame(n) w.WriteFrame(f) notify_group(name+"的影片已經產生完成囉! www.choozmo.com:8168/"+video_sub_folder+name_hash+".mp4") t.Close() w.Close() print("video at : www.choozmo.com:8168/"+video_sub_folder+name_hash+".mp4") def anchor_video_eng(name_hash,name,text_content, image_urls,sub_titles,avatar): file_prepare(name, name_hash, text_content,image_urls,'eng') sub_list=generate_subtitle_image_ENG(name_hash,sub_titles) for fname in range(len(text_content)): call_anchor(name_hash+"/"+str(fname),avatar) print('step finish') print('called............................................') ck=cKey(0,254,0,270) ck_anchor=cKey(0,255,1,320) duration = 0 #average layer level is 3 t = openshot.Timeline(1280, 720, openshot.Fraction(30000, 1000), 44100, 2, openshot.LAYOUT_STEREO) t.Open() main_timer = 0 #add logo LOGO_OP = openshot.FFmpegReader(dir_video+"LOGO_OP_4.mp4") LOGO_OP.Open() # Open the reader LOGO_OP_clip = video_photo_clip(vid=LOGO_OP,layer=4,position=0,end=LOGO_OP.info.duration ,location_y=-0.03,scale_x=0.8,scale_y=0.704) t.AddClip(LOGO_OP_clip) #add background video (head is different) bg_head = openshot.FFmpegReader(dir_video+"complete_head_aispokesgirl.mp4") bg_head.Open() bg_head_clip = video_photo_clip(vid=bg_head,layer=2,position=0,end=LOGO_OP.info.duration,ck=ck) t.AddClip(bg_head_clip) main_timer += LOGO_OP.info.duration head_duration = LOGO_OP.info.duration bg_head.Close() LOGO_OP.Close() #prepare empty list clip_duration=0 photo_clip_list = [None]*len(text_content) img_list = [None]*len(text_content) anchor_clip_list = [None] * len(text_content) anchor_list = [None] * len(text_content) audio_clip_list = [None] * len(text_content) audio_list = [None] * len(text_content) sub_clip_list = [None] * len(text_content) #openshot image holder sub_img_list = [None] * len(text_content) idx = 0 for p in listdir(dir_photo+name_hash): anchor_list[idx] = openshot.FFmpegReader(dir_anchor+name_hash+"/"+str(idx)+".mp4") clip_duration = anchor_list[idx].info.duration anchor_list[idx].Open() anchor_clip_list[idx] = video_photo_clip(vid=anchor_list[idx],layer=4,scale_x=0.65,scale_y=0.65, location_x=0.35,location_y=0.25,position=main_timer, end=clip_duration,ck=ck_anchor,audio=False) t.AddClip(anchor_clip_list[idx]) #insert image img_list[idx] = openshot.FFmpegReader(dir_photo+name_hash+'/'+p) img_list[idx].Open() photo_clip_list[idx] = video_photo_clip(vid=img_list[idx],layer=3 ,scale_x=0.81,scale_y=0.68,location_y=-0.03,position=main_timer,end=clip_duration,audio=False) t.AddClip(photo_clip_list[idx]) img_list[idx].Close() #insert audio (speech) audio_list[idx] = openshot.FFmpegReader(dir_sound+name_hash+"/"+str(idx)+".mp3") audio_list[idx].Open() audio_clip_list[idx] = openshot.Clip(audio_list[idx]) audio_clip_list[idx].Position(main_timer) audio_clip_list[idx].End(clip_duration) t.AddClip(audio_clip_list[idx]) #insert subtitle sub_img_list[idx] = openshot.QtImageReader(sub_list[idx]) sub_img_list[idx].Open() sub_clip_list[idx] = video_photo_clip(vid=sub_img_list[idx], layer=6,location_x=0.069, location_y=0.89,position=main_timer,end=clip_duration) t.AddClip(sub_clip_list[idx]) img_list[idx].Close() anchor_list[idx].Close() audio_list[idx].Close() sub_img_list[idx].Close() main_timer += clip_duration idx+=1 LOGO_ED = openshot.FFmpegReader(dir_video+"ED_ENG.mp4") LOGO_ED.Open() LOGO_ED_clip = video_photo_clip(vid=LOGO_ED,layer=4,position=main_timer,end=LOGO_ED.info.duration+2 ,location_x=0.005,location_y=-0.031 ,scale_x=0.8,scale_y=0.6825) t.AddClip(LOGO_ED_clip) ED_duration = LOGO_ED.info.duration LOGO_ED.Close() bg = openshot.FFmpegReader(dir_video+"complete_double_aispokesgirl.mp4") bg.Open() bg_times = math.floor(main_timer+ED_duration/bg.info.duration) left_time = (main_timer+ED_duration) % bg.info.duration bg_clip_list = [None] * bg_times bg_list = [None] * bg_times bg.Close() bg_timer = head_duration for idx in range(bg_times): bg_list[idx] = openshot.FFmpegReader(dir_video+"complete_double_aispokesgirl.mp4") bg_list[idx].Open() bg_clip_list[idx] = video_photo_clip(bg_list[idx],layer=2,position=bg_timer ,end=bg_list[idx].info.duration,ck=ck) t.AddClip(bg_clip_list[idx]) bg_timer += bg_list[idx].info.duration bg_list[idx].Close() bg_left = openshot.FFmpegReader(dir_video+"complete_double_aispokesgirl.mp4") bg_left.Open() bg_left_clip = video_photo_clip(bg_left,layer=2,position=bg_timer,end=left_time,ck=ck) t.AddClip(bg_left_clip) bg_left.Close() title = openshot.QtImageReader(dir_title+name_hash+".png") title.Open() # Open the reader title_clip = video_photo_clip(vid=title, layer=4,location_x=-0.047, location_y=0.801,position=0,end=head_duration+main_timer) t.AddClip(title_clip) ####start building w = openshot.FFmpegWriter(tmp_video_dir+name_hash+".mp4") w.SetAudioOptions(True, "aac", 44100, 2, openshot.LAYOUT_STEREO, 3000000) w.SetVideoOptions(True, "libx264", openshot.Fraction(30000, 1000), 1280, 720, openshot.Fraction(1, 1), False, False, 3000000) w.Open() #may change duration into t.info.duration frames = int(t.info.fps)*int(head_duration+main_timer+ED_duration) for n in range(frames): f=t.GetFrame(n) w.WriteFrame(f) notify_group(name+"(ENG)的影片已經產生完成囉! www.choozmo.com:8168/"+video_sub_folder+name_hash+".mp4") t.Close() w.Close() print("video at : www.choozmo.com:8168/"+video_sub_folder+name_hash+".mp4") #line notifs class video_service(rpyc.Service): def exposed_call_video(self,name_hash,name,text_content, image_urls,avatar): anchor_video_v2(name_hash,name,text_content, image_urls,avatar) def exposed_call_video_eng(self,name_hash,name,text_content, image_urls,sub_titles,avatar): anchor_video_eng(name_hash,name,text_content, image_urls,sub_titles,avatar) from rpyc.utils.server import ThreadedServer t = ThreadedServer(video_service, port=8878) print('service started') t.start()