from os import listdir from os.path import isfile, isdir, join import openshot import threading import zhtts import os import urllib from typing import List import requests from pydantic import BaseModel from bs4 import BeautifulSoup from PIL import Image,ImageDraw,ImageFont import pyttsx3 import rpyc import random import re import time import math import dataset from datetime import datetime from gtts import gTTS import cv2 from mail import mail_to_users dir_sound = 'mp3_track/' dir_photo = 'photo/' dir_text = 'text_file/' dir_video = 'video_material/' dir_title = 'title/' dir_subtitle = 'subtitle/' dir_anchor = 'anchor_raw/' tmp_video_dir = 'tmp_video/' video_sub_folder = 'ai_anchor_video/' dir_list = [dir_sound,dir_photo,dir_text,dir_video,dir_title,dir_subtitle,dir_anchor,tmp_video_dir] def notify_group(msg, glist=['7vilzohcyQMPLfAMRloUawiTV4vtusZhxv8Czo7AJX8','WekCRfnAirSiSxALiD6gcm0B56EejsoK89zFbIaiZQD','1dbtJHbWVbrooXmQqc4r8OyRWDryjD4TMJ6DiDsdgsX','HOB1kVNgIb81tTB4Ort1BfhVp9GFo6NlToMQg88vEhh']): for gid in glist: headers = { "Authorization": "Bearer " + gid, "Content-Type": "application/x-www-form-urlencoded" } params = {"message": msg} r = requests.post("https://notify-api.line.me/api/notify",headers=headers, params=params) def get_mp4_duration(video_name='mp4-test-file.mp4'): # 可輸入url cap = cv2.VideoCapture(video_name) # 幀率 fps = int(round(cap.get(cv2.CAP_PROP_FPS))) # 分辨率-寬度 width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) # 分辨率-高度 height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 總幀數 frame_counter = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) cap.release() cv2.destroyAllWindows() # 時長,單位:秒(s) duration = int(frame_counter / fps) return duration def cKey(r,g,b,fuzz): col=openshot.Color() col.red=openshot.Keyframe(r) col.green=openshot.Keyframe(g) col.blue=openshot.Keyframe(b) return openshot.ChromaKey(col, openshot.Keyframe(fuzz)) def video_photo_clip(vid=None,layer=None, position=None, end=None ,scale_x=1,scale_y=1,location_x=0,location_y=0,ck=None,audio=True): clip = openshot.Clip(vid) clip.Layer(layer) clip.Position(position) clip.End(end) clip.scale_x=openshot.Keyframe(scale_x) clip.scale_y=openshot.Keyframe(scale_y) clip.location_x=openshot.Keyframe(location_x) clip.location_y=openshot.Keyframe(location_y) if ck!=None: clip.AddEffect(ck) if audio==True: clip.has_audio=openshot.Keyframe(1) else: clip.has_audio=openshot.Keyframe(0) return clip def myunichchar(unicode_char): mb_string = unicode_char.encode('big5') try: unicode_char = unichr(ord(mb_string[0]) << 8 | ord(mb_string[1])) except NameError: unicode_char = chr(mb_string[0] << 8 | mb_string[1]) return unicode_char def get_url_type(url): req = urllib.request.Request(url, method='HEAD', headers={'User-Agent': 'Mozilla/5.0'}) r = urllib.request.urlopen(req) contentType = r.getheader('Content-Type') return contentType def make_dir(name_hash): for direct in dir_list: if not os.path.isdir(direct): os.mkdir(direct) try: os.mkdir(dir_photo+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_photo+name_hash , " already exists") try: os.mkdir(dir_text+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_text+name_hash , " already exists") try: os.mkdir(dir_sound+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_sound+name_hash , " already exists") try: os.mkdir(dir_anchor+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_anchor+name_hash , " already exists") try: os.mkdir(dir_subtitle+name_hash) except FileExistsError: print("~~~~~~Warning~~~~~~~~~Directory " , dir_subtitle+name_hash , " already exists") def file_prepare(name, name_hash,text_content,image_urls,lang='zh'): make_dir(name_hash) img_num = 1 for imgu in image_urls: if get_url_type(imgu) =='video/mp4': r=requests.get(imgu) f=open(dir_photo+name_hash+"/"+str(img_num)+".mp4",'wb') for chunk in r.iter_content(chunk_size=255): if chunk: f.write(chunk) f.close() else: im = Image.open(requests.get(imgu, stream=True).raw) im= im.convert("RGB") im.save(dir_photo+name_hash+"/"+str(img_num)+".jpg") img_num+=1 #save text txt_idx=0 for txt in text_content: text_file = open(dir_text+name_hash+"/"+str(txt_idx)+".txt", "w") text_file.write(txt) text_file.close() txt_idx+=1 print("text file made") #make mp3 language = 'zh-tw' txt_idx = 0 for txt in text_content: if lang==1: tts = gTTS(txt) tts.save(dir_sound+name_hash+"/"+str(txt_idx)+".mp3") else: tts = zhtts.TTS() tts.text2wav(txt,dir_sound+name_hash+"/"+str(txt_idx)+".mp3") txt_idx+=1 print("mp3 file made") #make title as image txt2image_title(name, dir_title+name_hash+".png",lang) def txt2image(content, save_target,lang='zh'): unicode_text = trim_punctuation(content) font = '' if lang=='zh': font = ImageFont.truetype(font="font/DFT_B7.ttc", size=38) else : font = ImageFont.load("arial.pil") text_width, text_height = font.getsize(unicode_text) canvas = Image.new('RGBA', (700, 500), (255, 0, 0, 0) ) draw = ImageDraw.Draw(canvas) text= unicode_text draw.text((5,5), text, (255, 255, 0), font) canvas.save(save_target, "PNG") def txt2image_title(content, save_target, lang='zh'): unicode_text = trim_punctuation(content) font = '' if lang=='zh': font = ImageFont.truetype(font="font/DFT_B7.ttc", size=38) else : font = ImageFont.load("arial.pil") text_width, text_height = font.getsize(unicode_text) canvas = Image.new('RGBA', (510, 500), (255, 0, 0, 0) ) draw = ImageDraw.Draw(canvas) text= unicode_text draw.text((5,5), text, (17, 41, 167), font) canvas.save(save_target, "PNG") def call_anchor(fileName,avatar): conn = rpyc.classic.connect("192.168.1.105",18812) ros = conn.modules.os rsys = conn.modules.sys fr=open(dir_sound+fileName+".mp3",'rb')# voice #warning!!! file my be replaced by other process fw=conn.builtins.open('/tmp/output.mp3','wb') while True: b=fr.read(1024) if b: fw.write(b) else: break fr.close() fw.close() val=random.randint(1000000,9999999) ros.chdir('/home/jared/to_video') ros.system('./p'+str(avatar)+'.sh '+str(val)+' &') while True: print('waiting...') if ros.path.exists('/tmp/results/'+str(val)): break time.sleep(5) print('waiting...') fr=conn.builtins.open('/tmp/results/'+str(val)+'.mp4','rb') fw=open(dir_anchor+fileName+".mp4",'wb') while True: b=fr.read(1024) if b: fw.write(b) else: break fr.close() fw.close() def trim_punctuation(s): pat_block = u'[^\u4e00-\u9fff0-9a-zA-Z]+'; pattern = u'([0-9]+{0}[0-9]+)|{0}'.format(pat_block) res = re.sub(pattern, lambda x: x.group(1) if x.group(1) else u" " ,s) return res def splitter(s): for sent in re.findall(u'[^!?,。\!\?]+[!?。\!\?]?', s, flags=re.U): yield sent def split_by_pun(s): res = list(splitter(s)) return res def generate_subtitle_image(name_hash,text_content): img_list = [None]*len(text_content) for idx in range(len(text_content)): img_list[idx]=[] senList = split_by_pun(text_content[idx]) for inner_idx in range(len(senList)): sv_path = dir_subtitle + name_hash +'/'+str(idx)+ str(inner_idx) +'.png' sub = senList[inner_idx] txt2image(sub,sv_path) img_list[idx]+=[{"count":len(sub),"path":sv_path}] return img_list def generate_subtitle_image_ENG(name_hash,text_content): img_list = [None]*len(text_content) for idx in range(len(text_content)): sv_path = dir_subtitle + name_hash +'/'+str(idx)+'.png' sub = text_content[idx] txt2image(sub, sv_path,lang='eng') img_list[idx] = sv_path return img_list def anchor_video_v2(name_hash,name,text_content, image_urls,avatar): """ 影片產生主程式。 """ print(os.getcwd()) print('sub image made') file_prepare(name, name_hash, text_content,image_urls,0) sub_list=generate_subtitle_image(name_hash,text_content) for fname in range(len(text_content)): call_anchor(name_hash+"/"+str(fname),avatar) print('step finish') print('called............................................') ck=cKey(0,254,0,270) ck_anchor=cKey(0,255,1,320) duration = 0 #average layer level is 3 t = openshot.Timeline(1280, 720, openshot.Fraction(30000, 1000), 44100, 2, openshot.LAYOUT_STEREO) t.Open() main_timer = 0 LOGO_OP = openshot.FFmpegReader(dir_video+"LOGO_OP.mp4") LOGO_OP.Open() # Open the reader LOGO_OP_clip = video_photo_clip(vid=LOGO_OP,layer=4,position=0,end=LOGO_OP.info.duration ,location_y=-0.03,scale_x=0.8,scale_y=0.704) t.AddClip(LOGO_OP_clip) bg_head = openshot.FFmpegReader(dir_video+"bg_head.avi") bg_head.Open() bg_head_clip = video_photo_clip(vid=bg_head,layer=2,position=0,end=LOGO_OP.info.duration,ck=ck) t.AddClip(bg_head_clip) main_timer += LOGO_OP.info.duration head_duration = LOGO_OP.info.duration bg_head.Close() LOGO_OP.Close() clip_duration=0 photo_clip_list = [None]*len(text_content) img_list = [None]*len(text_content) anchor_clip_list = [None] * len(text_content) anchor_list = [None] * len(text_content) audio_clip_list = [None] * len(text_content) audio_list = [None] * len(text_content) sub_clip_list = [None] * len(text_content) sub_img_list = [None] * len(text_content) idx = 0 for p in listdir(dir_photo+name_hash): anchor_list[idx] = openshot.FFmpegReader(dir_anchor+name_hash+"/"+str(idx)+".mp4") clip_duration = anchor_list[idx].info.duration anchor_list[idx].Open() anchor_clip_list[idx] = video_photo_clip(vid=anchor_list[idx],layer=4,scale_x=0.65,scale_y=0.65, location_x=0.35,location_y=0.25,position=main_timer, end=clip_duration,ck=ck_anchor,audio=False) t.AddClip(anchor_clip_list[idx]) img_list[idx] = openshot.FFmpegReader(dir_photo+name_hash+'/'+p) img_list[idx].Open() photo_clip_list[idx] = video_photo_clip(vid=img_list[idx],layer=3 ,scale_x=0.81,scale_y=0.68,location_y=-0.03,position=main_timer,end=clip_duration,audio=False) t.AddClip(photo_clip_list[idx]) img_list[idx].Close() audio_list[idx] = openshot.FFmpegReader(dir_sound+name_hash+"/"+str(idx)+".mp3") audio_list[idx].Open() audio_clip_list[idx] = openshot.Clip(audio_list[idx]) audio_clip_list[idx].Position(main_timer) audio_clip_list[idx].End(clip_duration) t.AddClip(audio_clip_list[idx]) img_list[idx].Close() anchor_list[idx].Close() audio_list[idx].Close() sub_img_list[idx] = [None] * len(sub_list[idx]) sub_clip_list[idx] = [None] * len(sub_list[idx]) sub_timer = 0 for sub_idx in range(len(sub_list[idx])): sub_img_list[idx][sub_idx] = openshot.QtImageReader(sub_list[idx][sub_idx]['path']) sub_img_list[idx][sub_idx].Open() sub_duration = 0.205*sub_list[idx][sub_idx]['count'] sub_clip_list[idx][sub_idx] = video_photo_clip(vid=sub_img_list[idx][sub_idx], layer=6,location_x=0.069, location_y=0.89,position=main_timer+sub_timer,end=sub_duration) t.AddClip(sub_clip_list[idx][sub_idx]) sub_img_list[idx][sub_idx].Close() sub_timer += sub_duration print(sub_list[idx][sub_idx]['path']) main_timer += clip_duration idx+=1 LOGO_ED = openshot.FFmpegReader(dir_video+"LOGO_ED.avi") LOGO_ED.Open() LOGO_ED_clip = video_photo_clip(vid=LOGO_ED,layer=4,position=main_timer,end=LOGO_ED.info.duration+2 ,location_x=0.005,location_y=-0.031 ,scale_x=0.8,scale_y=0.6825) t.AddClip(LOGO_ED_clip) ED_duration = LOGO_ED.info.duration LOGO_ED.Close() bg = openshot.FFmpegReader(dir_video+"bg.mp4") bg.Open() bg_times = math.floor(main_timer+ED_duration/bg.info.duration) left_time = (main_timer+ED_duration) % bg.info.duration bg_clip_list = [None] * bg_times bg_list = [None] * bg_times bg.Close() bg_timer = head_duration for idx in range(bg_times): bg_list[idx] = openshot.FFmpegReader(dir_video+"bg.mp4") bg_list[idx].Open() bg_clip_list[idx] = video_photo_clip(bg_list[idx],layer=2,position=bg_timer ,end=bg_list[idx].info.duration,ck=ck) t.AddClip(bg_clip_list[idx]) bg_timer += bg_list[idx].info.duration bg_list[idx].Close() bg_left = openshot.FFmpegReader(dir_video+"bg.mp4") bg_left.Open() bg_left_clip = video_photo_clip(bg_left,layer=2,position=bg_timer,end=left_time,ck=ck) t.AddClip(bg_left_clip) bg_left.Close() title = openshot.QtImageReader(dir_title+name_hash+".png") title.Open() # Open the reader title_clip = video_photo_clip(vid=title, layer=4,location_x=-0.047, location_y=0.801,position=0,end=head_duration+main_timer) t.AddClip(title_clip) ####start building w = openshot.FFmpegWriter(tmp_video_dir+name_hash+".mp4") w.SetAudioOptions(True, "aac", 44100, 2, openshot.LAYOUT_STEREO, 3000000) w.SetVideoOptions(True, "libx264", openshot.Fraction(30000, 1000), 1280, 720, openshot.Fraction(1, 1), False, False, 3000000) w.Open() #may change duration into t.info.duration frames = int(t.info.fps)*int(head_duration+main_timer+ED_duration) for n in range(frames): f=t.GetFrame(n) w.WriteFrame(f) # 更新剩下時間、duration video_link = f"www.choozmo.com:8168/{video_sub_folder}{name_hash}.mp4" duration, user_id = update_hisotry_duration(video_link) is_left_time = update_user_left_time(user_id, duration) # 1.餘額足 2. 餘額不足 if is_left_time: notify_group(name+"的影片已經產生完成囉! www.choozmo.com:8168/"+video_sub_folder+name_hash+".mp4") contents = f""" 影片下載網址: {video_link}""" mail_to_users(user_id, f'您好,您的影片 "{name}" 已經完成', contents=contents) else: notify_group(msg="您的餘額不足,請去...儲值,才能取得影片唷!") contents = f""" 餘額不足,請往: ...儲值url儲值 """ mail_to_users(user_id, f'您好,您的餘額不足', contents) t.Close() w.Close() print("video at : www.choozmo.com:8168/"+video_sub_folder+name_hash+".mp4") def anchor_video_eng(name_hash,name,text_content, image_urls,sub_titles,avatar): file_prepare(name, name_hash, text_content,image_urls,'eng') sub_list=generate_subtitle_image_ENG(name_hash,sub_titles) for fname in range(len(text_content)): call_anchor(name_hash+"/"+str(fname),avatar) print('step finish') print('called............................................') ck=cKey(0,254,0,270) ck_anchor=cKey(0,255,1,320) duration = 0 #average layer level is 3 t = openshot.Timeline(1280, 720, openshot.Fraction(30000, 1000), 44100, 2, openshot.LAYOUT_STEREO) t.Open() main_timer = 0 #add logo LOGO_OP = openshot.FFmpegReader(dir_video+"LOGO_OP.mp4") LOGO_OP.Open() # Open the reader LOGO_OP_clip = video_photo_clip(vid=LOGO_OP,layer=4,position=0,end=LOGO_OP.info.duration ,location_y=-0.03,scale_x=0.8,scale_y=0.704) t.AddClip(LOGO_OP_clip) #add background video (head is different) bg_head = openshot.FFmpegReader(dir_video+"bg_head_eng.mp4") bg_head.Open() bg_head_clip = video_photo_clip(vid=bg_head,layer=2,position=0,end=LOGO_OP.info.duration,ck=ck) t.AddClip(bg_head_clip) main_timer += LOGO_OP.info.duration head_duration = LOGO_OP.info.duration bg_head.Close() LOGO_OP.Close() #prepare empty list clip_duration=0 photo_clip_list = [None]*len(text_content) img_list = [None]*len(text_content) anchor_clip_list = [None] * len(text_content) anchor_list = [None] * len(text_content) audio_clip_list = [None] * len(text_content) audio_list = [None] * len(text_content) sub_clip_list = [None] * len(text_content) #openshot image holder sub_img_list = [None] * len(text_content) idx = 0 for p in listdir(dir_photo+name_hash): anchor_list[idx] = openshot.FFmpegReader(dir_anchor+name_hash+"/"+str(idx)+".mp4") clip_duration = anchor_list[idx].info.duration anchor_list[idx].Open() anchor_clip_list[idx] = video_photo_clip(vid=anchor_list[idx],layer=4,scale_x=0.65,scale_y=0.65, location_x=0.35,location_y=0.25,position=main_timer, end=clip_duration,ck=ck_anchor,audio=False) t.AddClip(anchor_clip_list[idx]) #insert image img_list[idx] = openshot.FFmpegReader(dir_photo+name_hash+'/'+p) img_list[idx].Open() photo_clip_list[idx] = video_photo_clip(vid=img_list[idx],layer=3 ,scale_x=0.81,scale_y=0.68,location_y=-0.03,position=main_timer,end=clip_duration,audio=False) t.AddClip(photo_clip_list[idx]) img_list[idx].Close() #insert audio (speech) audio_list[idx] = openshot.FFmpegReader(dir_sound+name_hash+"/"+str(idx)+".mp3") audio_list[idx].Open() audio_clip_list[idx] = openshot.Clip(audio_list[idx]) audio_clip_list[idx].Position(main_timer) audio_clip_list[idx].End(clip_duration) t.AddClip(audio_clip_list[idx]) #insert subtitle sub_img_list[idx] = openshot.QtImageReader(sub_list[idx]) sub_img_list[idx].Open() sub_clip_list[idx] = video_photo_clip(vid=sub_img_list[idx], layer=6,location_x=0.069, location_y=0.89,position=main_timer,end=clip_duration) t.AddClip(sub_clip_list[idx]) img_list[idx].Close() anchor_list[idx].Close() audio_list[idx].Close() sub_img_list[idx].Close() main_timer += clip_duration idx+=1 LOGO_ED = openshot.FFmpegReader(dir_video+"ED_ENG.mp4") LOGO_ED.Open() LOGO_ED_clip = video_photo_clip(vid=LOGO_ED,layer=4,position=main_timer,end=LOGO_ED.info.duration+2 ,location_x=0.005,location_y=-0.031 ,scale_x=0.8,scale_y=0.6825) t.AddClip(LOGO_ED_clip) ED_duration = LOGO_ED.info.duration LOGO_ED.Close() bg = openshot.FFmpegReader(dir_video+"bg_eng.mp4") bg.Open() bg_times = math.floor(main_timer+ED_duration/bg.info.duration) left_time = (main_timer+ED_duration) % bg.info.duration bg_clip_list = [None] * bg_times bg_list = [None] * bg_times bg.Close() bg_timer = head_duration for idx in range(bg_times): bg_list[idx] = openshot.FFmpegReader(dir_video+"bg_eng.mp4") bg_list[idx].Open() bg_clip_list[idx] = video_photo_clip(bg_list[idx],layer=2,position=bg_timer ,end=bg_list[idx].info.duration,ck=ck) t.AddClip(bg_clip_list[idx]) bg_timer += bg_list[idx].info.duration bg_list[idx].Close() bg_left = openshot.FFmpegReader(dir_video+"bg_eng.mp4") bg_left.Open() bg_left_clip = video_photo_clip(bg_left,layer=2,position=bg_timer,end=left_time,ck=ck) t.AddClip(bg_left_clip) bg_left.Close() title = openshot.QtImageReader(dir_title+name_hash+".png") title.Open() # Open the reader title_clip = video_photo_clip(vid=title, layer=4,location_x=-0.047, location_y=0.801,position=0,end=head_duration+main_timer) t.AddClip(title_clip) ####start building w = openshot.FFmpegWriter(tmp_video_dir+name_hash+".mp4") w.SetAudioOptions(True, "aac", 44100, 2, openshot.LAYOUT_STEREO, 3000000) w.SetVideoOptions(True, "libx264", openshot.Fraction(30000, 1000), 1280, 720, openshot.Fraction(1, 1), False, False, 3000000) w.Open() #may change duration into t.info.duration frames = int(t.info.fps)*int(head_duration+main_timer+ED_duration) for n in range(frames): f=t.GetFrame(n) w.WriteFrame(f) # 更新剩下時間、duration video_link = f"www.choozmo.com:8168/{video_sub_folder}{name_hash}.mp4" duration, user_id = update_hisotry_duration(video_link) is_left_time = update_user_left_time(user_id, duration) # 1.餘額足 2. 餘額不足 if is_left_time: notify_group(name+", Your video is complete! www.choozmo.com:8168/"+video_sub_folder+name_hash+".mp4") contents = f""" The download address: {video_link}""" mail_to_users(user_id, f'Hi, your video "{name}" is complete', contents=contents) else: notify_group(msg="The left money is not enough, please deposit to get the video!") contents = f""" The left money is not enough, please go to ... to deposit. """ mail_to_users(user_id, f'Hi, your remain deposit is not enough', contents) t.Close() w.Close() print("video at : www.choozmo.com:8168/"+video_sub_folder+name_hash+".mp4") #line notifs class video_service(rpyc.Service): def exposed_call_video(self,name_hash,name,text_content, image_urls,avatar): anchor_video_v2(name_hash,name,text_content, image_urls,avatar) def exposed_call_video_eng(self,name_hash,name,text_content, image_urls,sub_titles,avatar): anchor_video_eng(name_hash,name,text_content, image_urls,sub_titles,avatar) def update_hisotry_duration(video_link): """ 更新資訊影片長度資訊。 """ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') table = db['history_input'] duration = get_mp4_duration(video_link) data = dict(link=video_link, duration=duration) table.update(data, ['link']) rows = db.query(f'SELECT * FROM history_input WHERE link="{video_link}"') for row in rows: user_id = row['user_id'] return duration, user_id def update_user_left_time(user_id, duration): """ 更新使用者剩下時間,如果為負,接著提醒。 """ db = dataset.connect('mysql://choozmo:pAssw0rd@db.ptt.cx:3306/AI_anchor?charset=utf8mb4') table = db['users'] rows = db.query(f"SELECT * FROM user WHERE id={user_id}") for row in rows: left_time = row['left_time'] line_token = row['line_token'] left_time -= duration data = dict(id=user_id, left_time=left_time-duration) table.update(data, ['id']) if left_time < 0: return False else: return True from rpyc.utils.server import ThreadedServer t = ThreadedServer(video_service, port=8878) print('service started') t.start()