"""AI-anchor video builder exposed as an rpyc service.

For each request the module downloads the media, synthesises speech, asks a
remote host to render a talking-anchor clip per paragraph, composes all the
pieces on a libopenshot timeline, writes an MP4 and posts a LINE notification.
"""
import math
import os
import random
import re
import shlex
import threading
import time
import urllib
import urllib.request
from datetime import datetime
from os import listdir
from os.path import isfile, isdir, join
from typing import List

import dataset
import openshot
import pyttsx3
import requests
import rpyc
import zhtts
from bs4 import BeautifulSoup
from gtts import gTTS
from PIL import Image, ImageDraw, ImageFont
from pydantic import BaseModel
from rpyc.utils.server import ThreadedServer

dir_sound = 'mp3_track/'
dir_photo = 'photo/'
dir_text = 'text_file/'
dir_video = 'video_material/'
dir_title = 'title/'
dir_subtitle = 'subtitle/'
dir_anchor = 'anchor_raw/'
tmp_video_dir = 'tmp_video/'
video_sub_folder = 'ai_anchor_video/'
dir_list = [dir_sound, dir_photo, dir_text, dir_video, dir_title,
            dir_subtitle, dir_anchor, tmp_video_dir]


def notify_group(msg):
    """Post *msg* to the LINE Notify endpoint once per token listed below."""
    # NOTE(review): these bearer tokens are committed in source; consider
    # loading them from configuration instead.
    glist = ['7vilzohcyQMPLfAMRloUawiTV4vtusZhxv8Czo7AJX8',
             'WekCRfnAirSiSxALiD6gcm0B56EejsoK89zFbIaiZQD',
             '1dbtJHbWVbrooXmQqc4r8OyRWDryjD4TMJ6DiDsdgsX',
             'HOB1kVNgIb81tTB4Ort1BfhVp9GFo6NlToMQg88vEhh']
    for gid in glist:
        headers = {
            "Authorization": "Bearer " + gid,
            "Content-Type": "application/x-www-form-urlencoded",
        }
        params = {"message": msg}
        requests.post("https://notify-api.line.me/api/notify",
                      headers=headers, params=params)


def cKey(r, g, b, fuzz):
    """Return an openshot.ChromaKey effect for colour (r, g, b) with *fuzz*."""
    col = openshot.Color()
    col.red = openshot.Keyframe(r)
    col.green = openshot.Keyframe(g)
    col.blue = openshot.Keyframe(b)
    return openshot.ChromaKey(col, openshot.Keyframe(fuzz))


def video_photo_clip(vid=None, layer=None, position=None, end=None,
                     scale_x=1, scale_y=1, location_x=0, location_y=0,
                     ck=None, audio=True):
    """Wrap reader *vid* in an openshot.Clip configured from the arguments.

    *layer*, *position* and *end* are passed to the clip's Layer/Position/End
    setters; scale and location become constant keyframes. *ck*, when given,
    is added as an effect. *audio* selects the has_audio keyframe (1 or 0).
    """
    clip = openshot.Clip(vid)
    clip.Layer(layer)
    clip.Position(position)
    clip.End(end)
    clip.scale_x = openshot.Keyframe(scale_x)
    clip.scale_y = openshot.Keyframe(scale_y)
    clip.location_x = openshot.Keyframe(location_x)
    clip.location_y = openshot.Keyframe(location_y)
    if ck is not None:
        clip.AddEffect(ck)
    clip.has_audio = openshot.Keyframe(1 if audio else 0)
    return clip


def myunichchar(unicode_char):
    """Return the character whose code point is the two Big5 bytes of the input.

    The input is encoded as Big5 and its first two bytes are combined as
    ``byte0 << 8 | byte1``. The former ``unichr`` branch could only run on
    Python 2, which the rest of this file does not support.
    """
    mb_string = unicode_char.encode('big5')
    return chr(mb_string[0] << 8 | mb_string[1])


def get_url_type(url):
    """Return the Content-Type header of a HEAD request to *url*."""
    req = urllib.request.Request(url, method='HEAD',
                                 headers={'User-Agent': 'Mozilla/5.0'})
    # Close the response deterministically instead of leaking the socket.
    with urllib.request.urlopen(req) as r:
        return r.getheader('Content-Type')


def make_dir(name_hash):
    """Create the base working directories and the per-job sub-directories.

    An already existing per-job directory is reported with a warning and
    otherwise ignored.
    """
    for direct in dir_list:
        if not os.path.isdir(direct):
            os.mkdir(direct)
    for base in (dir_photo, dir_text, dir_sound, dir_anchor, dir_subtitle):
        try:
            os.mkdir(base+name_hash)
        except FileExistsError:
            print("~~~~~~Warning~~~~~~~~~Directory " , base+name_hash ,  " already exists")


def file_prepare(name, name_hash, text_content, image_urls, lang):
    """Download media, save the texts, synthesise speech and render the title.

    Args:
        name: Title text, rendered to ``dir_title + name_hash + ".png"``.
        name_hash: Per-job directory / file stem.
        text_content: Paragraphs; paragraph *i* is saved as ``i.txt`` and
            spoken into ``i.mp3``.
        image_urls: Media URLs; item *k* (1-based) is stored as ``k.mp4``
            when its Content-Type is ``video/mp4``, otherwise as ``k.jpg``.
        lang: ``1`` selects gTTS; any other value selects zhtts.
    """
    make_dir(name_hash)
    for img_num, imgu in enumerate(image_urls, start=1):
        if get_url_type(imgu) == 'video/mp4':
            r = requests.get(imgu)
            with open(dir_photo+name_hash+"/"+str(img_num)+".mp4", 'wb') as f:
                for chunk in r.iter_content(chunk_size=255):
                    if chunk:
                        f.write(chunk)
        else:
            im = Image.open(requests.get(imgu, stream=True).raw)
            im = im.convert("RGB")
            im.save(dir_photo+name_hash+"/"+str(img_num)+".jpg")

    # save text
    for txt_idx, txt in enumerate(text_content):
        with open(dir_text+name_hash+"/"+str(txt_idx)+".txt", "w") as text_file:
            text_file.write(txt)
    print("text file made")

    # make mp3
    for txt_idx, txt in enumerate(text_content):
        if lang == 1:
            tts = gTTS(txt)
            tts.save(dir_sound+name_hash+"/"+str(txt_idx)+".mp3")
        else:
            # NOTE(review): text2wav presumably emits WAV data even though
            # the target is named .mp3 - confirm.
            tts = zhtts.TTS()
            tts.text2wav(txt, dir_sound+name_hash+"/"+str(txt_idx)+".mp3")
    print("mp3 file made")

    # make title as image
    txt2image_title(name, dir_title+name_hash+".png")


def _render_text_png(content, save_target, font_size, canvas_size, fill):
    """Draw punctuation-trimmed *content* on a transparent canvas and save it as PNG."""
    unicode_text = trim_punctuation(content)
    font = ImageFont.truetype(font="font/DFT_B7.ttc", size=font_size)
    # The old font.getsize() call was dropped: its result was never used and
    # the method no longer exists in Pillow 10.
    canvas = Image.new('RGBA', canvas_size, (255, 0, 0, 0))
    draw = ImageDraw.Draw(canvas)
    draw.text((5, 5), unicode_text, fill, font)
    canvas.save(save_target, "PNG")


def txt2image(content, save_target):
    """Render a subtitle line (size 38, yellow, 700x500 canvas) to *save_target*."""
    _render_text_png(content, save_target, 38, (700, 500), (255, 255, 0))


def txt2image_title(content, save_target):
    """Render the title (size 28, blue, 510x500 canvas) to *save_target*."""
    _render_text_png(content, save_target, 28, (510, 500), (17, 41, 167))


def _copy_chunks(src, dst, chunk_size=1024):
    """Copy *src* to *dst* in *chunk_size* reads until *src* is exhausted."""
    while True:
        b = src.read(chunk_size)
        if not b:
            break
        dst.write(b)


def call_anchor(fileName, avatar):
    """Have the remote host render the anchor clip for one audio track.

    Uploads ``dir_sound + fileName + ".mp3"`` to the remote machine, launches
    ``./p<avatar>.sh <job id>`` there, polls every 5 seconds until
    ``/tmp/results/<job id>`` exists, then downloads
    ``/tmp/results/<job id>.mp4`` to ``dir_anchor + fileName + ".mp4"``.
    """
    conn = rpyc.classic.connect("192.168.1.105", 18812)
    try:
        ros = conn.modules.os
        # warning!!! file may be replaced by other process
        # (the upload target is a fixed path shared by every job).
        with open(dir_sound+fileName+".mp3", 'rb') as fr:  # voice
            fw = conn.builtins.open('/tmp/output.mp3', 'wb')
            try:
                _copy_chunks(fr, fw)
            finally:
                fw.close()

        val = random.randint(1000000, 9999999)
        ros.chdir('/home/jared/to_video')
        # avatar arrives from the RPC caller; quote the script path so it
        # cannot inject extra shell syntax. Plain values are left unchanged.
        ros.system(shlex.quote('./p'+str(avatar)+'.sh')+' '+str(val)+' &')
        while True:
            print('waiting...')
            if ros.path.exists('/tmp/results/'+str(val)):
                break
            time.sleep(5)
            print('waiting...')

        fr = conn.builtins.open('/tmp/results/'+str(val)+'.mp4', 'rb')
        try:
            with open(dir_anchor+fileName+".mp4", 'wb') as fw:
                _copy_chunks(fr, fw)
        finally:
            fr.close()
    finally:
        # The connection used to be left open after every call.
        conn.close()


def trim_punctuation(s):
    """Replace runs of non CJK/alphanumeric characters in *s* with one space.

    A run that sits between two digit groups (e.g. ``3.5``) is kept as is.
    """
    pat_block = '[^\u4e00-\u9fff0-9a-zA-Z]+'
    pattern = '([0-9]+{0}[0-9]+)|{0}'.format(pat_block)
    return re.sub(pattern, lambda x: x.group(1) if x.group(1) else " ", s)


def splitter(s):
    """Yield the pieces of *s* split at ``!``, ``?``, ``,`` and ``。``.

    A terminating ``!``, ``?`` or ``。`` stays attached to its piece; commas
    are dropped.
    """
    # Raw string: same pattern text, without invalid-escape warnings.
    for sent in re.findall(r'[^!?,。\!\?]+[!?。\!\?]?', s, flags=re.U):
        yield sent


def split_by_pun(s):
    """Return the pieces produced by splitter(s) as a list."""
    return list(splitter(s))


def generate_subtitle_image(name_hash, text_content):
    """Render one subtitle PNG per sentence piece of every paragraph.

    Returns:
        A list parallel to *text_content*; each item is a list of
        ``{"count": <piece length>, "path": <png path>}`` dicts.
    """
    img_list = []
    for idx, paragraph in enumerate(text_content):
        entries = []
        for inner_idx, sub in enumerate(split_by_pun(paragraph)):
            # The separator keeps (1, 11) and (11, 1) from both mapping to
            # "111.png", as plain concatenation of the indices did.
            sv_path = dir_subtitle + name_hash + '/' + str(idx) + '_' + str(inner_idx) + '.png'
            txt2image(sub, sv_path)
            entries.append({"count": len(sub), "path": sv_path})
        img_list.append(entries)
    return img_list


def generate_subtitle_image_ENG(name_hash, text_content):
    """Render one subtitle PNG per item of *text_content*; return the paths."""
    img_list = []
    for idx, sub in enumerate(text_content):
        sv_path = dir_subtitle + name_hash + '/' + str(idx) + '.png'
        txt2image(sub, sv_path)
        img_list.append(sv_path)
    return img_list


def _numeric_name_key(filename):
    """Sort key that orders "2.jpg" before "10.jpg"; non-numeric names go last."""
    stem = os.path.splitext(filename)[0]
    if stem.isdecimal():
        return (0, int(stem), filename)
    return (1, 0, filename)


def _add_timed_subtitles(timeline, segments, start, clip_duration, keep_alive):
    """Add one subtitle clip per segment, each lasting 0.205 s per character."""
    sub_timer = 0
    for seg in segments:
        reader = openshot.QtImageReader(seg['path'])
        reader.Open()
        sub_duration = 0.205*seg['count']
        clip = video_photo_clip(vid=reader, layer=6, location_x=0.069,
                                location_y=0.89, position=start+sub_timer,
                                end=sub_duration)
        timeline.AddClip(clip)
        reader.Close()
        sub_timer += sub_duration
        print(seg['path'])
        keep_alive.extend((reader, clip))


def _add_single_subtitle(timeline, image_path, start, clip_duration, keep_alive):
    """Add one subtitle clip that lasts for the whole paragraph."""
    reader = openshot.QtImageReader(image_path)
    reader.Open()
    clip = video_photo_clip(vid=reader, layer=6, location_x=0.069,
                            location_y=0.89, position=start, end=clip_duration)
    timeline.AddClip(clip)
    reader.Close()
    keep_alive.extend((reader, clip))


def _build_anchor_video(name_hash, name, text_content, avatar, sub_list,
                        add_subtitles, bg_head_file, ending_file, bg_file,
                        notify_tag):
    """Compose and write the final video; shared by both language variants.

    *add_subtitles* is one of the ``_add_*_subtitle(s)`` helpers; the three
    ``*_file`` arguments name assets inside ``dir_video``; *notify_tag* is
    inserted after *name* in the LINE message.
    """
    for fname in range(len(text_content)):
        call_anchor(name_hash+"/"+str(fname), avatar)
        print('step finish')
    print('called............................................')

    ck = cKey(0, 254, 0, 270)
    ck_anchor = cKey(0, 255, 1, 320)
    # NOTE(review): the original kept every reader and clip in lists for the
    # whole function; presumably the timeline only borrows them, so they are
    # kept referenced here until writing is finished - confirm.
    keep_alive = []

    # average layer level is 3
    t = openshot.Timeline(1280, 720, openshot.Fraction(30000, 1000), 44100, 2,
                          openshot.LAYOUT_STEREO)
    t.Open()

    # opening logo
    LOGO_OP = openshot.FFmpegReader(dir_video+"LOGO_OP.mp4")
    LOGO_OP.Open()
    head_duration = LOGO_OP.info.duration
    LOGO_OP_clip = video_photo_clip(vid=LOGO_OP, layer=4, position=0,
                                    end=head_duration, location_y=-0.03,
                                    scale_x=0.8, scale_y=0.704)
    t.AddClip(LOGO_OP_clip)

    # background for the opening (differs per language)
    bg_head = openshot.FFmpegReader(dir_video+bg_head_file)
    bg_head.Open()
    bg_head_clip = video_photo_clip(vid=bg_head, layer=2, position=0,
                                    end=head_duration, ck=ck)
    t.AddClip(bg_head_clip)
    main_timer = head_duration
    bg_head.Close()
    LOGO_OP.Close()
    keep_alive += [LOGO_OP, LOGO_OP_clip, bg_head, bg_head_clip]

    # listdir() order is arbitrary: sort numerically so photo k lines up with
    # paragraph k-1, and stop at the paragraph count so leftover files in the
    # directory cannot run past the prepared data.
    photos = sorted(listdir(dir_photo+name_hash), key=_numeric_name_key)
    for idx, p in zip(range(len(text_content)), photos):
        anchor = openshot.FFmpegReader(dir_anchor+name_hash+"/"+str(idx)+".mp4")
        clip_duration = anchor.info.duration
        anchor.Open()
        anchor_clip = video_photo_clip(vid=anchor, layer=4, scale_x=0.65,
                                       scale_y=0.65, location_x=0.35,
                                       location_y=0.25, position=main_timer,
                                       end=clip_duration, ck=ck_anchor,
                                       audio=False)
        t.AddClip(anchor_clip)

        # insert image
        photo = openshot.FFmpegReader(dir_photo+name_hash+'/'+p)
        photo.Open()
        photo_clip = video_photo_clip(vid=photo, layer=3, scale_x=0.81,
                                      scale_y=0.68, location_y=-0.03,
                                      position=main_timer, end=clip_duration,
                                      audio=False)
        t.AddClip(photo_clip)

        # insert audio (speech)
        audio = openshot.FFmpegReader(dir_sound+name_hash+"/"+str(idx)+".mp3")
        audio.Open()
        audio_clip = openshot.Clip(audio)
        audio_clip.Position(main_timer)
        audio_clip.End(clip_duration)
        t.AddClip(audio_clip)

        # insert subtitle
        add_subtitles(t, sub_list[idx], main_timer, clip_duration, keep_alive)

        photo.Close()
        anchor.Close()
        audio.Close()
        keep_alive += [anchor, anchor_clip, photo, photo_clip, audio, audio_clip]
        main_timer += clip_duration

    # ending logo
    LOGO_ED = openshot.FFmpegReader(dir_video+ending_file)
    LOGO_ED.Open()
    ED_duration = LOGO_ED.info.duration
    LOGO_ED_clip = video_photo_clip(vid=LOGO_ED, layer=4, position=main_timer,
                                    end=ED_duration+2, location_x=0.005,
                                    location_y=-0.031, scale_x=0.8,
                                    scale_y=0.6825)
    t.AddClip(LOGO_ED_clip)
    LOGO_ED.Close()
    keep_alive += [LOGO_ED, LOGO_ED_clip]

    # looping background
    bg = openshot.FFmpegReader(dir_video+bg_file)
    bg.Open()
    bg_duration = bg.info.duration
    bg.Close()
    # Parenthesised so the whole span is divided, matching the modulo below;
    # the old expression divided ED_duration only.
    bg_times = math.floor((main_timer+ED_duration)/bg_duration)
    left_time = (main_timer+ED_duration) % bg_duration
    bg_timer = head_duration
    for _ in range(bg_times):
        loop_reader = openshot.FFmpegReader(dir_video+bg_file)
        loop_reader.Open()
        loop_clip = video_photo_clip(loop_reader, layer=2, position=bg_timer,
                                     end=loop_reader.info.duration, ck=ck)
        t.AddClip(loop_clip)
        bg_timer += loop_reader.info.duration
        loop_reader.Close()
        keep_alive += [loop_reader, loop_clip]
    bg_left = openshot.FFmpegReader(dir_video+bg_file)
    bg_left.Open()
    bg_left_clip = video_photo_clip(bg_left, layer=2, position=bg_timer,
                                    end=left_time, ck=ck)
    t.AddClip(bg_left_clip)
    bg_left.Close()
    keep_alive += [bg_left, bg_left_clip]

    title = openshot.QtImageReader(dir_title+name_hash+".png")
    title.Open()
    title_clip = video_photo_clip(vid=title, layer=4, location_x=-0.047,
                                  location_y=0.801, position=0,
                                  end=head_duration+main_timer)
    t.AddClip(title_clip)
    keep_alive += [title, title_clip]

    # start building
    w = openshot.FFmpegWriter(tmp_video_dir+name_hash+".mp4")
    w.SetAudioOptions(True, "aac", 44100, 2, openshot.LAYOUT_STEREO, 3000000)
    w.SetVideoOptions(True, "libx264", openshot.Fraction(30000, 1000), 1280, 720,
                      openshot.Fraction(1, 1), False, False, 3000000)
    w.Open()

    # may change duration into t.info.duration
    frames = int(t.info.fps)*int(head_duration+main_timer+ED_duration)
    for n in range(frames):
        f = t.GetFrame(n)
        w.WriteFrame(f)

    t.Close()
    w.Close()
    # Notify only after the writer is closed, so a failed notification can no
    # longer leave the output file open.
    notify_group(name+notify_tag+"的影片已經產生完成囉! www.choozmo.com:8168/"+video_sub_folder+name_hash+".mp4")
    print("video at : www.choozmo.com:8168/"+video_sub_folder+name_hash+".mp4")


def anchor_video_v2(name_hash, name, text_content, image_urls, avatar):
    """Build the video using zhtts speech and per-sentence timed subtitles."""
    print(os.getcwd())
    print('sub image made')
    file_prepare(name, name_hash, text_content, image_urls, 0)
    sub_list = generate_subtitle_image(name_hash, text_content)
    _build_anchor_video(name_hash, name, text_content, avatar, sub_list,
                        _add_timed_subtitles, "bg_head.avi", "LOGO_ED.avi",
                        "bg.mp4", "")


def anchor_video_eng(name_hash, name, text_content, image_urls, sub_titles, avatar):
    """Build the video using gTTS speech and one subtitle image per paragraph.

    Subtitle images are rendered from *sub_titles*, not from *text_content*.
    """
    file_prepare(name, name_hash, text_content, image_urls, 1)
    sub_list = generate_subtitle_image_ENG(name_hash, sub_titles)
    _build_anchor_video(name_hash, name, text_content, avatar, sub_list,
                        _add_single_subtitle, "bg_head_eng.mp4", "ED_ENG.mp4",
                        "bg_eng.mp4", "(ENG)")


class video_service(rpyc.Service):
    """rpyc service exposing the two video builders to remote callers."""

    def exposed_call_video(self, name_hash, name, text_content, image_urls, avatar):
        """Remote entry point for anchor_video_v2."""
        anchor_video_v2(name_hash, name, text_content, image_urls, avatar)

    def exposed_call_video_eng(self, name_hash, name, text_content, image_urls, sub_titles, avatar):
        """Remote entry point for anchor_video_eng."""
        anchor_video_eng(name_hash, name, text_content, image_urls, sub_titles, avatar)


# The server is started at module level, exactly as before, so existing
# launch scripts keep working.
t = ThreadedServer(video_service, port=8878)
print('service started')
t.start()