123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- from autosub import DEFAULT_CONCURRENCY
- from autosub import DEFAULT_SUBTITLE_FORMAT
- from pytranscriber.control.ctr_main import Ctr_Main
- from pytranscriber.control.ctr_autosub import Ctr_Autosub
- import re,random, time
- from itertools import groupby
- from operator import itemgetter
- from rakeUtil.Rake import Rake
- import requests,rpyc
- from difflib import SequenceMatcher
- from PIL import Image,ImageDraw,ImageFont
- import openshot
# Module-level RAKE keyword extractor shared by getKeyword().
# The stop-word / conjunction lists are Chinese (the file names read
# "Chinese stop-word list (1208 entries)" and "Chinese separator lexicon").
obj = Rake()
stop_path = "rakedata/stoplist/中文停用词表(1208个).txt"
conj_path = "rakedata/stoplist/中文分隔词词库.txt"
obj.initializeFromPath(stop_path, conj_path)
def trim_punctuation(s):
    """Collapse punctuation runs in *s* to single spaces.

    A "punctuation run" is any run of characters that is neither a CJK
    ideograph nor ASCII alphanumeric.  Runs sandwiched between digits
    (e.g. the '.' in "3.14") are kept verbatim so numbers survive intact.
    """
    non_word = u'[^\u4e00-\u9fff0-9a-zA-Z]+'
    pattern = u'([0-9]+{0}[0-9]+)|{0}'.format(non_word)

    def replace(match):
        # Group 1 is the digit-run-digit case: keep it unchanged.
        return match.group(1) if match.group(1) else u" "

    return re.sub(pattern, replace, s)
def txt2image(content, save_target, lang='zh'):
    """Render *content* as yellow text on a transparent 700x500 PNG.

    Punctuation is collapsed to spaces via trim_punctuation() before
    drawing.

    Args:
        content: text to draw (may mix CJK and ASCII).
        save_target: output PNG path.
        lang: 'zh' selects the bundled CJK font, anything else Arial.
    """
    unicode_text = trim_punctuation(content)
    if lang == 'zh':
        font = ImageFont.truetype(font="font/DFT_B7.ttc", size=38)
    else:
        font = ImageFont.truetype(font="font/arial.ttf", size=38)
    # RGBA canvas with a fully transparent background.
    canvas = Image.new('RGBA', (700, 500), (255, 0, 0, 0))
    draw = ImageDraw.Draw(canvas)
    # NOTE: the original also called font.getsize() and built a
    # space-stripped copy of *content*; both results were never used,
    # and getsize() was removed in Pillow >= 10 (it would now raise
    # AttributeError), so the dead code was dropped.
    draw.text((5, 5), unicode_text, (255, 255, 0), font)
    canvas.save(save_target, "PNG")
def generate_subtitle_image_from_dict(cpath, sub_dict):
    """Render one subtitle PNG per entry of *sub_dict* into *cpath*.

    Every entry must provide 'index' (used as the file name) and
    'content' (the subtitle text handed to txt2image()).
    """
    for entry in sub_dict:
        target = '{}/{}.png'.format(cpath, entry['index'])
        txt2image(entry['content'], target)
def listener_progress(string, percent):
    """No-op progress callback handed to Ctr_Autosub; always returns None."""
    # Intentionally empty (the original body was a bare `True` expression).
def transScript(cpath):
    """Transcribe <cpath>speech.mp3 to Chinese subtitles via autosub,
    writing the SRT-style result to <cpath>script.txt.

    *cpath* is used as a raw prefix, so it must end with a path separator.
    """
    Ctr_Autosub.init()
    Ctr_Autosub.generate_subtitles(cpath+"speech.mp3",'zh'
        ,listener_progress
        ,output=cpath+"script.txt"
        ,concurrency=DEFAULT_CONCURRENCY,subtitle_file_format=DEFAULT_SUBTITLE_FORMAT)
def syllable_count(word):
    """Estimate the number of syllables in an English *word*.

    Heuristic: count starts of vowel groups ('y' counts as a vowel),
    subtract one for a trailing silent 'e', and floor the result at 1.

    Args:
        word: word to score; matching is case-insensitive.

    Returns:
        int: estimated syllable count, always >= 1.
    """
    word = word.lower()
    if not word:
        # Guard: the original indexed word[0] and raised IndexError on "".
        return 1
    vowels = "aeiouy"
    count = 1 if word[0] in vowels else 0
    for index in range(1, len(word)):
        # A vowel preceded by a consonant starts a new vowel group.
        if word[index] in vowels and word[index - 1] not in vowels:
            count += 1
    if word.endswith("e"):
        count -= 1  # assume silent final 'e'
    # Every non-empty word gets at least one syllable.
    return max(count, 1)
def getKeyword(inStr):
    """Extract keywords from a mixed Chinese/English string.

    Chinese keywords come from the module-level RAKE extractor (`obj`);
    in addition, every run of three or more consecutive non-CJK
    characters is appended as an English keyword with a fixed score
    of 10.

    Args:
        inStr: the input sentence.

    Returns:
        list[tuple[str, float]]: (keyword, score) pairs.
    """
    kws = obj.extractKeywordFromString(inStr)
    # Indices of characters that are NOT CJK ideographs.  The original
    # used strict '<'/'>' comparisons, which misclassified U+4E00 ('一',
    # one of the most common Chinese characters) and U+9FFF as English;
    # inclusive bounds fix that.
    eng_idx = [i for i, ch in enumerate(inStr)
               if not (u'\u4e00' <= ch <= u'\u9fff')]
    # Group consecutive indices into runs; each run longer than 2
    # characters becomes one English keyword.  (The original never
    # reset its accumulator when a short run ended, so fragments from
    # unrelated runs were glued together; grouping per run fixes that.
    # It also dropped an unused re.findall() call and zh_idx list.)
    for _, grp in groupby(enumerate(eng_idx), lambda pair: pair[0] - pair[1]):
        run = [i for _, i in grp]
        if len(run) > 2:
            kws.append((inStr[run[0]:run[-1] + 1], 10))
    return kws
def getImgDict():
    """Return the stock keyword -> Google-Drive-image catalogue.

    Each entry maps a keyword ('kw') to the Drive file id ('id') of an
    overlay image.  Keywords may repeat; parse_script() consumes one
    entry per match and removes it from the working copy.
    """
    catalogue = [
        {'kw': 'podcast', 'id': '17j41rqsoWVzc-HD8jvdxb651pbHJhvH8'},
        {'kw': 'podcast', 'id': '1wT3uIaoe3xD-wrAo-J9eZweHEuEfI22_'},
        {'kw': 'podcast', 'id': '1uaP8_xtqMn_Zbx3DX78ALdCtFjUPYgKQ'},
        {'kw': 'podcast', 'id': '1L1NMByTorcDBN8EpbwwUakTexdRAiF4w'},
        {'kw': 'youtuber', 'id': '17vUM8xrMgI9y1aEbAoprOxKiK9OOkipE'},
        {'kw': '支持者', 'id': '1sb-DmU5X9YE7HZLva_UueEvzcXqrKoGk'},
        {'kw': 'app', 'id': '1jxoZuFlUHyl1L7-WB2ejPKBEW38bsbR6'},
    ]
    return catalogue
def savekeywordImage(kw, imgDict):
    """Select the catalogue entry whose keyword best matches *kw*.

    Similarity is difflib.SequenceMatcher's ratio.  Despite the name,
    nothing is saved — this only picks an entry.

    Returns:
        The best-matching dict from *imgDict*, or None when *imgDict*
        is empty or every similarity is 0.
    """
    best, best_score = None, 0
    for candidate in imgDict:
        score = SequenceMatcher(None, candidate['kw'], kw).ratio()
        # Strictly-greater keeps the first of equally good matches,
        # exactly like the original.
        if score > best_score:
            best_score, best = score, candidate
    return best
def get_script(cPath):
    """Read the subtitle text lines out of <cPath>script.txt.

    The file is expected in SRT layout: repeating 4-line blocks of
    (index, time range, content, blank).  Only the content lines are
    returned.

    Args:
        cPath: directory prefix ending with a path separator.

    Returns:
        list[str]: subtitle content lines, in file order.
    """
    fpath = cPath + 'script.txt'
    with open(fpath, 'r', encoding="utf-8") as f:
        raw_lines = [line.strip() for line in f]
    # Stride over the content line of each 4-line block.  The bounded
    # range replaces the original `int(len/4 + 1)` loop, which indexed
    # past the end (IndexError) whenever the line count was an exact
    # multiple of 4 (i.e. the file ended with the blank separator).
    return [raw_lines[i] for i in range(2, len(raw_lines), 4)]
def rewriteScript(cPath, newlines):
    """Replace the content lines of <cPath>script.txt with *newlines*.

    The SRT layout (4-line blocks: index, time, content, blank) is
    preserved; only each block's content line (offset 2) is swapped
    for the corresponding entry of *newlines*.

    Args:
        cPath: directory prefix ending with a path separator.
        newlines: replacement texts, one per subtitle block; extra
            blocks beyond len(newlines) are left untouched.
    """
    fpath = cPath + 'script.txt'
    with open(fpath, 'r', encoding="utf-8") as f:
        raw_lines = [line.strip() for line in f]
    # Bounds-checked stride: the original `int(len/4 + 1)` loop could
    # index past the end of either list when the line count was an
    # exact multiple of 4 or *newlines* was short.
    for block, line_no in enumerate(range(2, len(raw_lines), 4)):
        if block < len(newlines):
            raw_lines[line_no] = newlines[block]
    # `with` replaces the original unclosed-on-error open/close pair.
    with open(fpath, 'w', encoding="utf-8") as f:
        f.writelines(l + '\n' for l in raw_lines)
def parse_script(file_path):
    """Parse an SRT subtitle file into two timed tracks.

    Returns a pair (sScript_dicts, img_dicts):
      * sScript_dicts — short caption segments: each original subtitle
        line is split by shorten() into <=15-syllable pieces, each with
        'content', a global 'index', absolute 'start' and 'duration'
        (the parent line's duration shared out by time_ratio).
      * img_dicts — overlay-image intervals: for every line whose
        keyword matches an unused catalogue image, an interval starts
        at that line and runs until the next matched line (or the end
        of the script).
    """
    imgDict = getImgDict()
    with open(file_path, 'r',encoding="utf-8") as f:
        raw_lines = [line.strip() for line in f]
    dict_list = []       # one dict per subtitle line (content/times/kw/imgid)
    sScript_dicts = []   # shortened caption segments across all lines
    scriptIdx = 0        # global running index for caption segments
    # NOTE(review): `int(len/4 + 1)` over-runs by one block when the
    # line count is an exact multiple of 4 (same issue as get_script).
    for idx in range(int(len(raw_lines)/4+1)):
        script={}
        # SRT block layout: idx*4 = numeric index, +1 = time range,
        # +2 = content, +3 = blank separator.
        line_content = raw_lines[idx*4+2]
        script['content'] = line_content
        time_raw = raw_lines[idx * 4 +1 ].split(' --> ')
        start = time_raw[0].split(':')
        stop = time_raw[1].split(':')
        # Convert "HH:MM:SS,mmm" to absolute seconds (comma -> decimal point).
        script['start'] = float(start[0])*3600 + float(start[1])*60 + float(start[2].replace(',','.'))
        script['stop'] = float(stop[0])*3600 + float(stop[1])*60 + float(stop[2].replace(',','.'))
        script['duration'] = script['stop']-script['start']
        duration = script['duration']
        start = script['start']
        # Best-effort keyword extraction: keep the top-scoring keyword
        # only if its score exceeds 2; any failure means "no keyword".
        try:
            kw= getKeyword(script['content'])
            kw.sort(key=lambda tup: tup[1], reverse=True)
            if kw[0][1]>2:
                script['kw'] = kw[0][0]
            else:
                script['kw'] = ''
        except Exception as e:
            script['kw']=''
        # Pick the closest catalogue image for the keyword; each image
        # is used at most once (removed from the working copy).
        kwd = savekeywordImage(script['kw'],imgDict)

        if kwd is not None:
            script['imgid'] = kwd['id']
            imgDict.remove(kwd)
        else:
            script['imgid'] = None

        dict_list.append(script)

        # Split the line into short caption segments; each segment's
        # duration is its time_ratio share of the line's duration, and
        # segments are laid end to end from the line's start time.
        accumulated_duration = 0
        for sen in shorten(script['content'],15):
            sScript = {}
            sScript['content'] = sen['content']
            sScript['index'] = scriptIdx
            scriptIdx+=1
            sScript['start'] = accumulated_duration+script['start']
            sScript['duration'] = duration*sen['time_ratio']
            accumulated_duration+=duration*sen['time_ratio']
            sScript_dicts.append(sScript)
    # Second pass: turn per-line image ids into contiguous intervals.
    # An interval opens at the first line with an image and closes when
    # the next image (or the last line) is reached.
    img_dicts=[]
    found = False    # an interval is currently open
    start = 0        # open interval's start time
    stop = 0
    imgid=''         # open interval's image id
    imgidx = 0
    for d in dict_list:
        if d['imgid'] is not None:
            if found :
                # Close the previous interval at this line's start,
                # then open a new one for this line's image.
                img_dicts.append({'index':imgidx,'imgid':imgid,'start':start,'duration':d['start']-start})
                imgidx+=1
                start = d['start']
                imgid = d['imgid']
            else:
                found=True
                start = d['start']
                imgid = d['imgid']
        # Last-line handling (matched by start time): flush the final
        # interval to the end of the script.
        # NOTE(review): this compares start times, so an earlier line
        # sharing the last line's start time would also trigger it.
        if d['start']==dict_list[-1]['start']:
            if d['imgid'] is not None:
                img_dicts.append({'index':imgidx,'imgid':d['imgid'],'start':d['start'],'duration':d['duration']})
            else:
                img_dicts.append({'index':imgidx,'imgid':imgid,'start':start,'duration':d['stop']-start})
            imgidx+=1

    return sScript_dicts, img_dicts
def shorten(in_str, maxLen):
    """Split a mixed Chinese/English line into caption-sized pieces.

    Each returned dict has 'content' (a piece of *in_str*, at most
    roughly *maxLen* units long, where a Chinese character counts 1 and
    an English word counts its syllable estimate) and 'time_ratio'
    (that piece's share of the line's speaking time, estimated from
    syllable counts; the ratios sum to 1).
    """
    # NOTE(review): result of re.findall() is discarded — dead code
    # kept verbatim.
    re.findall(r'[\u4e00-\u9fff]+', in_str)
    # Partition character indices into Chinese vs everything else.
    # NOTE(review): strict '<'/'>' excludes U+4E00 and U+9FFF themselves.
    zh_idx = []
    eng_idx= []
    for i in range(len(in_str)):
        if in_str[i] > u'\u4e00' and in_str[i] < u'\u9fff':
            zh_idx.append(i)
        else:
            eng_idx.append(i)
    # Spaces separate English words: drop them from the English set.
    space_index = [m.start() for m in re.finditer(' ', in_str)]
    for idx in space_index:
        eng_idx.remove(idx)

    # Group consecutive English indices into word runs.
    eng_range_list = []
    for k, g in groupby(enumerate(eng_idx), lambda ix : ix[0] - ix[1]):
        eng_range = list(map(itemgetter(1), g))
        eng_range_list.append(eng_range)
    # Total speaking-time estimate: one unit per Chinese character,
    # syllable count + 0.5 per English word.
    total_syllable = 0
    for i in range(len(eng_range_list)):
        total_syllable += (syllable_count(in_str[eng_range_list[i][0]:eng_range_list[i][-1]+1])+0.5)
    for i in range(len(zh_idx)):
        total_syllable+=1

    # Linearize the line into tokens: a bare int index for a Chinese
    # character, a list of indices for an English word run.
    zh_eng_idx_list = []
    i = 0
    while i < len(in_str):
        if in_str[i]==' ':
            i+=1
        if i in zh_idx:
            zh_eng_idx_list.append(i)
            i+=1
        if i in eng_idx:
            for ls in eng_range_list:
                if i in ls:
                    zh_eng_idx_list.append(ls)
                    i = ls[-1]+1
                    break

    # Greedy packing: append tokens to the current sentence until the
    # next token would push it past maxLen, then seal the sentence
    # (recording its time_ratio) and retry the token in a new one.
    zh_eng_dict_list = [{'content':'','time_ratio':0}]
    idx = 0
    current_len = 0
    sen_idx = 0
    while idx < len(zh_eng_idx_list):
        str_from_idx = ''
        sylla_cnt = 1
        if type(zh_eng_idx_list[idx])==type([]):
            # English word run: contributes its syllable count, plus a
            # trailing space in the rendered content.
            str_from_idx = in_str[zh_eng_idx_list[idx][0]:zh_eng_idx_list[idx][-1]+1]+' '
            sylla_cnt = syllable_count(str_from_idx)
        else:
            # Single Chinese character: one unit.
            str_from_idx = in_str[zh_eng_idx_list[idx]]

        # NOTE(review): the threshold compares rendered length +
        # syllables against maxLen, while current_len tracks syllables
        # only — and a single token with sylla_cnt >= maxLen would
        # never fit an empty sentence, looping forever.  Kept verbatim.
        if len(zh_eng_dict_list[sen_idx]['content'])+sylla_cnt>=maxLen:
            zh_eng_dict_list[sen_idx]['time_ratio'] = current_len/total_syllable
            zh_eng_dict_list.append({'content':'','time_ratio':0})
            sen_idx+=1
            current_len = 0
        else:
            current_len += sylla_cnt
            zh_eng_dict_list[sen_idx]['content'] += str_from_idx
            idx+=1

    # The last (unsealed) sentence absorbs whatever ratio remains so
    # the ratios always sum to exactly 1.
    total_ratio = 0
    for obj in zh_eng_dict_list:
        total_ratio+=obj['time_ratio']
    zh_eng_dict_list[-1]['time_ratio'] = 1-total_ratio
    return zh_eng_dict_list
def video_writer_init(path):
    """Create an openshot FFmpeg writer for *path*: 1280x720 at ~30 fps
    H.264 video with 44.1 kHz stereo AAC audio, both at 3 Mbit/s."""
    writer = openshot.FFmpegWriter(path)
    writer.SetAudioOptions(True, "aac", 44100, 2, openshot.LAYOUT_STEREO,
                           3000000)
    writer.SetVideoOptions(True, "libx264", openshot.Fraction(30000, 1000),
                           1280, 720, openshot.Fraction(1, 1), False, False,
                           3000000)
    return writer
def cKey(r, g, b, fuzz):
    """Build an openshot ChromaKey effect keyed on the RGB colour
    (r, g, b) with the given *fuzz* tolerance."""
    key_colour = openshot.Color()
    key_colour.red = openshot.Keyframe(r)
    key_colour.green = openshot.Keyframe(g)
    key_colour.blue = openshot.Keyframe(b)
    return openshot.ChromaKey(key_colour, openshot.Keyframe(fuzz))
def video_photo_clip(vid=None, layer=None, position=None, end=None,
                     scale_x=1, scale_y=1, location_x=0, location_y=0,
                     ck=None, audio=True):
    """Wrap an openshot reader in a positioned, scaled timeline Clip.

    Args:
        vid: openshot reader (video or image) to wrap.
        layer: compositing layer number.
        position: start time on the timeline, in seconds.
        end: clip end time, in seconds.
        scale_x, scale_y: constant scale keyframes.
        location_x, location_y: constant location keyframes.
        ck: optional chroma-key effect (see cKey()) to attach.
        audio: whether the clip's audio track is enabled.

    Returns:
        openshot.Clip: the configured clip.
    """
    clip = openshot.Clip(vid)
    clip.Layer(layer)
    clip.Position(position)
    clip.End(end)
    clip.scale_x = openshot.Keyframe(scale_x)
    clip.scale_y = openshot.Keyframe(scale_y)
    clip.location_x = openshot.Keyframe(location_x)
    clip.location_y = openshot.Keyframe(location_y)
    # Identity test replaces the original `ck != None` comparison.
    if ck is not None:
        clip.AddEffect(ck)
    # Truthiness replaces the original `audio == True`, which silently
    # muted truthy non-bool values such as 1; all bool callers behave
    # exactly as before.
    clip.has_audio = openshot.Keyframe(1 if audio else 0)
    return clip
def downloadFromDrive(cPath, fid, idx):
    """Fetch Google Drive file *fid* into <cPath><idx>img.jpg."""
    destination = '{}{}img.jpg'.format(cPath, idx)
    download_file_from_google_drive(fid, destination)
def download_file_from_google_drive(id, destination):
    """Download a shared Google Drive file to *destination*.

    Two-step flow: request the file, and if Drive answers with a
    virus-scan warning cookie, repeat the request carrying the
    confirmation token.  NOTE(review): this cookie-based handshake is
    what older Drive endpoints used — confirm it still works against
    the current service.
    """
    URL = "https://docs.google.com/uc?export=download"
    session = requests.Session()
    response = session.get(URL, params={'id': id}, stream=True)
    token = get_confirm_token(response)
    if token:
        response = session.get(URL, params={'id': id, 'confirm': token},
                               stream=True)
    save_response_content(response, destination)
def get_confirm_token(response):
    """Return the value of the first cookie on *response* whose name
    starts with 'download_warning', or None when no such cookie exists."""
    return next((value for key, value in response.cookies.items()
                 if key.startswith('download_warning')), None)
def save_response_content(response, destination):
    """Stream the body of *response* to the file *destination* in
    32 KiB chunks, skipping empty keep-alive chunks."""
    chunk_size = 32768
    with open(destination, "wb") as out:
        for chunk in response.iter_content(chunk_size):
            if not chunk:
                continue  # keep-alive chunk: nothing to write
            out.write(chunk)
def call_anchor(fileName,avatar):
    # Render a talking-anchor video for a speech track on a remote host.
    #
    # Steps: copy <fileName> (an mp3) to the remote machine over rpyc,
    # start the avatar render script there with a random job id, poll
    # until the job's result marker appears, then copy the rendered mp4
    # back next to the input as '...speaker.mp4'.
    conn = rpyc.classic.connect("192.168.1.111",18812)
    ros = conn.modules.os
    rsys = conn.modules.sys  # NOTE(review): unused, kept as-is
    fr=open(fileName,'rb')  # local speech audio
    # WARNING (from original author): the fixed remote path below may be
    # clobbered by another process writing at the same time.
    fw=conn.builtins.open('/tmp/output.mp3','wb')  # remote copy target
    # Copy local -> remote in 1 KiB chunks.
    while True:
        b=fr.read(1024)
        if b:
            fw.write(b)
        else:
            break
    fr.close()
    fw.close()
    # Random job id distinguishes this render from concurrent ones.
    val=random.randint(1000000,9999999)
    ros.chdir('/home/jared/to_video')
    # Fire-and-forget: the p<avatar>.sh script renders asynchronously.
    ros.system('./p'+str(avatar)+'.sh '+str(val)+' &')
    # Poll every 5 seconds for the completion marker.
    while True:
        print('waiting...')
        if ros.path.exists('/tmp/results/'+str(val)):
            break
        time.sleep(5)
        print('waiting...')
    # NOTE(review): existence is checked for '/tmp/results/<val>' but the
    # file opened is '<val>.mp4' — confirm the render script creates both.
    fr=conn.builtins.open('/tmp/results/'+str(val)+'.mp4','rb')
    newfileName = fileName.replace('speech.mp3','speaker.mp4')
    fw=open(newfileName,'wb')  # local anchor video
    # Copy remote -> local in 1 KiB chunks.
    while True:
        b=fr.read(1024)
        if b:
            fw.write(b)
        else:
            break
    fr.close()
    fw.close()
|