#!/usr/bin/env python
"""
Defines autosub's main functionality.
"""

from __future__ import absolute_import, print_function, unicode_literals

import argparse
import audioop
import json
import math
import multiprocessing
import os
import subprocess
import sys
import tempfile
import wave

import requests
from googleapiclient.discovery import build
from progressbar import ProgressBar, Percentage, Bar, ETA

from autosub.constants import (
    LANGUAGE_CODES, GOOGLE_SPEECH_API_KEY, GOOGLE_SPEECH_API_URL,
)
from autosub.formatters import FORMATTERS

DEFAULT_SUBTITLE_FORMAT = 'srt'
DEFAULT_CONCURRENCY = 10
DEFAULT_SRC_LANGUAGE = 'en'
DEFAULT_DST_LANGUAGE = 'en'


def percentile(arr, percent):
    """
    Calculate the given percentile of arr.

    `percent` is a fraction (e.g. 0.2 for the 20th percentile). When the
    requested rank falls between two elements the result is linearly
    interpolated between them. Raises IndexError if `arr` is empty.
    """
    arr = sorted(arr)
    index = (len(arr) - 1) * percent
    floor = math.floor(index)
    ceil = math.ceil(index)
    if floor == ceil:
        # The rank lands exactly on an element; no interpolation needed.
        return arr[int(index)]
    low_value = arr[int(floor)] * (ceil - index)
    high_value = arr[int(ceil)] * (index - floor)
    return low_value + high_value


class FLACConverter(object): # pylint: disable=too-few-public-methods
    """
    Class for converting a region of an input audio or video file into a FLAC audio file
    """
    def __init__(self, source_path, include_before=0.25, include_after=0.25):
        self.source_path = source_path
        # Padding (in seconds) added around every region before it is cut.
        self.include_before = include_before
        self.include_after = include_after

    def __call__(self, region):
        """
        Cut `region` (a `(start, end)` pair in seconds) out of the source file.

        Returns the FLAC-encoded bytes, or None if interrupted by the user.
        """
        try:
            start, end = region
            start = max(0, start - self.include_before)
            end += self.include_after
            # Only reserve a unique name here: the handle is closed before
            # ffmpeg runs because an open NamedTemporaryFile cannot be
            # reopened by name on every platform.
            temp = tempfile.NamedTemporaryFile(suffix='.flac', delete=False)
            temp.close()
            try:
                command = ["ffmpeg", "-ss", str(start), "-t", str(end - start),
                           "-y", "-i", self.source_path,
                           "-loglevel", "error", temp.name]
                use_shell = os.name == "nt"
                with open(os.devnull) as devnull:
                    subprocess.check_output(command, stdin=devnull, shell=use_shell)
                with open(temp.name, 'rb') as flac_file:
                    return flac_file.read()
            finally:
                os.unlink(temp.name)

        except KeyboardInterrupt:
            return None


class SpeechRecognizer(object): # pylint: disable=too-few-public-methods
    """
    Class for performing speech-to-text for an input FLAC file.
    """
    def __init__(self, language="en", rate=44100, retries=3, api_key=GOOGLE_SPEECH_API_KEY):
        self.language = language
        self.rate = rate
        self.api_key = api_key
        self.retries = retries

    def __call__(self, data):
        """
        Send FLAC `data` to the speech API and return the first transcript.

        The transcript is returned with its first character upper-cased.
        Returns None when no attempt yields a transcript or when interrupted
        by the user.
        """
        try:
            url = GOOGLE_SPEECH_API_URL.format(lang=self.language, key=self.api_key)
            headers = {"Content-Type": "audio/x-flac; rate=%d" % self.rate}

            for _ in range(self.retries):
                try:
                    resp = requests.post(url, data=data, headers=headers)
                except requests.exceptions.ConnectionError:
                    continue

                # The response body is scanned line by line; each line is
                # tried as its own JSON document.
                for line in resp.content.decode('utf-8').split("\n"):
                    try:
                        line = json.loads(line)
                        line = line['result'][0]['alternative'][0]['transcript']
                        return line[:1].upper() + line[1:]
                    except (IndexError, KeyError, TypeError, ValueError):
                        # Blank or malformed line, or a document without a
                        # usable result: try the next line.
                        continue
            return None

        except KeyboardInterrupt:
            return None


class Translator(object): # pylint: disable=too-few-public-methods
    """
    Class for translating a sentence from one language to another.
    """
    def __init__(self, language, api_key, src, dst):
        self.language = language
        self.api_key = api_key
        self.service = build('translate', 'v2',
                             developerKey=self.api_key)
        self.src = src
        self.dst = dst

    def __call__(self, sentence):
        """
        Translate `sentence` from `self.src` to `self.dst`.

        Returns the translated text, or None when the sentence is empty, the
        response carries no translation, or the user interrupts.
        """
        try:
            if not sentence:
                return None

            result = self.service.translations().list( # pylint: disable=no-member
                source=self.src,
                target=self.dst,
                q=[sentence]
            ).execute()

            if 'translations' in result and result['translations']:
                first = result['translations'][0]
                if 'translatedText' in first:
                    return first['translatedText']

            return None

        except KeyboardInterrupt:
            return None


def which(program):
    """
    Return the path for a given executable.

    If `program` contains a directory component it is checked as given;
    otherwise every directory on PATH is searched. Returns None when no
    executable file is found.
    """
    def is_exe(file_path):
        """
        Checks whether a file is executable.
        """
        return os.path.isfile(file_path) and os.access(file_path, os.X_OK)

    fpath, _ = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        # Fall back to the platform default search path when PATH is unset.
        for path in os.environ.get("PATH", os.defpath).split(os.pathsep):
            path = path.strip('"')
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file
    return None


def extract_audio(filename, channels=1, rate=16000):
    """
    Extract audio from an input file to a temporary WAV file.

    Returns a `(wav_path, rate)` tuple. The caller owns the WAV file and is
    responsible for deleting it. Raises Exception when `filename` does not
    exist or ffmpeg is not on PATH; ffmpeg failures propagate as
    subprocess.CalledProcessError.
    """
    # Validate before creating the temp file so a failed check leaves
    # nothing behind on disk.
    if not os.path.isfile(filename):
        print("The given file does not exist: {}".format(filename))
        raise Exception("Invalid filepath: {}".format(filename))
    if not which("ffmpeg"):
        print("ffmpeg: Executable not found on machine.")
        raise Exception("Dependency not found: ffmpeg")

    temp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    # Close our handle; ffmpeg reopens the file by name.
    temp.close()
    command = ["ffmpeg", "-y", "-i", filename,
               "-ac", str(channels), "-ar", str(rate),
               "-loglevel", "error", temp.name]
    use_shell = os.name == "nt"
    try:
        with open(os.devnull) as devnull:
            subprocess.check_output(command, stdin=devnull, shell=use_shell)
    except BaseException:
        # Do not leak the temp file when extraction fails or is interrupted.
        os.remove(temp.name)
        raise
    return temp.name, rate


def find_speech_regions(filename, frame_width=4096, min_region_size=0.5, max_region_size=6): # pylint: disable=too-many-locals
    """
    Perform voice activity detection on a given audio file.

    The WAV file is read in chunks of `frame_width` frames; a chunk counts as
    silence when its RMS energy is at or below the 20th percentile of all
    chunk energies. Returns a list of `(start, end)` tuples in seconds. A
    region is closed on silence or once it reaches `max_region_size`, but
    only if it is at least `min_region_size` long. A region still open when
    the audio ends is not emitted.
    """
    reader = wave.open(filename)
    try:
        sample_width = reader.getsampwidth()
        rate = reader.getframerate()
        n_channels = reader.getnchannels()
        chunk_duration = float(frame_width) / rate

        n_chunks = int(math.ceil(reader.getnframes()*1.0 / frame_width))
        energies = []

        for _ in range(n_chunks):
            chunk = reader.readframes(frame_width)
            # NOTE(review): the width passed to rms is sample width times
            # channel count; for mono input (extract_audio's default) this
            # equals the sample width.
            energies.append(audioop.rms(chunk, sample_width * n_channels))
    finally:
        reader.close()

    if not energies:
        # No audio frames at all: nothing to detect.
        return []

    threshold = percentile(energies, 0.2)

    elapsed_time = 0

    regions = []
    region_start = None

    for energy in energies:
        is_silence = energy <= threshold
        # Compare against None explicitly: a region may start at time 0.
        in_region = region_start is not None
        max_exceeded = in_region and elapsed_time - region_start >= max_region_size

        if (max_exceeded or is_silence) and in_region:
            if elapsed_time - region_start >= min_region_size:
                regions.append((region_start, elapsed_time))
                region_start = None

        elif (not in_region) and (not is_silence):
            region_start = elapsed_time
        elapsed_time += chunk_duration
    return regions


def _imap_with_progress(pool, func, items, label):
    """
    Map `func` over `items` on `pool`, in order, showing a progress bar.

    Returns the list of results. The bar is always finished, even when the
    iteration is interrupted.
    """
    widgets = [label, Percentage(), ' ', Bar(), ' ', ETA()]
    pbar = ProgressBar(widgets=widgets, maxval=len(items)).start()
    results = []
    try:
        for i, result in enumerate(pool.imap(func, items)):
            results.append(result)
            # i is zero-based; report the number of completed items.
            pbar.update(i + 1)
    finally:
        pbar.finish()
    return results


def generate_subtitles( # pylint: disable=too-many-locals,too-many-arguments
        source_path,
        output=None,
        concurrency=DEFAULT_CONCURRENCY,
        src_language=DEFAULT_SRC_LANGUAGE,
        dst_language=DEFAULT_DST_LANGUAGE,
        subtitle_file_format=DEFAULT_SUBTITLE_FORMAT,
        api_key=None,
    ):
    """
    Given an input audio/video file, generate subtitles in the specified language and format.

    Returns the path of the written subtitle file. When translation is needed
    (source and destination languages differ) but no `api_key` is given, an
    error is printed and the integer 1 is returned instead. The temporary
    audio file and the worker pool are released on every exit path.
    """
    audio_filename, audio_rate = extract_audio(source_path)
    pool = None
    try:
        regions = find_speech_regions(audio_filename)

        pool = multiprocessing.Pool(concurrency)
        converter = FLACConverter(source_path=audio_filename)
        recognizer = SpeechRecognizer(language=src_language, rate=audio_rate,
                                      api_key=GOOGLE_SPEECH_API_KEY)

        transcripts = []
        if regions:
            try:
                extracted_regions = _imap_with_progress(
                    pool, converter, regions,
                    "Converting speech regions to FLAC files: ")
                transcripts = _imap_with_progress(
                    pool, recognizer, extracted_regions,
                    "Performing speech recognition: ")

                # Only the primary language subtag decides whether
                # translation is needed.
                if src_language.split("-")[0] != dst_language.split("-")[0]:
                    if not api_key:
                        print(
                            "Error: Subtitle translation requires specified Google Translate API key. "
                            "See --help for further information."
                        )
                        return 1
                    translator = Translator(dst_language, api_key,
                                            dst=dst_language,
                                            src=src_language)
                    prompt = "Translating from {0} to {1}: ".format(src_language, dst_language)
                    transcripts = _imap_with_progress(pool, translator, transcripts, prompt)

            except KeyboardInterrupt:
                print("Cancelling transcription")
                raise

        # Drop regions for which no transcript was produced.
        timed_subtitles = [(r, t) for r, t in zip(regions, transcripts) if t]
        formatter = FORMATTERS.get(subtitle_file_format)
        formatted_subtitles = formatter(timed_subtitles)

        dest = output

        if not dest:
            base = os.path.splitext(source_path)[0]
            dest = "{base}.{format}".format(base=base, format=subtitle_file_format)

        with open(dest, 'wb') as output_file:
            output_file.write(formatted_subtitles.encode("utf-8"))

        return dest
    finally:
        if pool is not None:
            pool.terminate()
            pool.join()
        os.remove(audio_filename)


def validate(args):
    """
    Check that the CLI arguments passed to autosub are valid.

    Prints a message describing the first problem found and returns False;
    returns True when all checks pass.
    """
    if args.format not in FORMATTERS:
        print(
            "Subtitle format not supported. "
            "Run with --list-formats to see all supported formats."
        )
        return False

    if args.src_language not in LANGUAGE_CODES:
        print(
            "Source language not supported. "
            "Run with --list-languages to see all supported languages."
        )
        return False

    if args.dst_language not in LANGUAGE_CODES:
        print(
            "Destination language not supported. "
            "Run with --list-languages to see all supported languages."
        )
        return False

    if not args.source_path:
        print("Error: You need to specify a source path.")
        return False

    return True


def main():
    """
    Run autosub as a command-line program.

    Returns the process exit status: 0 on success, 1 on invalid arguments,
    failed subtitle generation or user interruption.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('source_path', help="Path to the video or audio file to subtitle",
                        nargs='?')
    parser.add_argument('-C', '--concurrency', help="Number of concurrent API requests to make",
                        type=int, default=DEFAULT_CONCURRENCY)
    parser.add_argument('-o', '--output',
                        help="Output path for subtitles (by default, subtitles are saved in "
                             "the same directory and name as the source path)")
    parser.add_argument('-F', '--format', help="Destination subtitle format",
                        default=DEFAULT_SUBTITLE_FORMAT)
    parser.add_argument('-S', '--src-language', help="Language spoken in source file",
                        default=DEFAULT_SRC_LANGUAGE)
    parser.add_argument('-D', '--dst-language', help="Desired language for the subtitles",
                        default=DEFAULT_DST_LANGUAGE)
    parser.add_argument('-K', '--api-key',
                        help="The Google Translate API key to be used. "
                             "(Required for subtitle translation)")
    parser.add_argument('--list-formats', help="List all available subtitle formats",
                        action='store_true')
    parser.add_argument('--list-languages', help="List all available source/destination languages",
                        action='store_true')

    args = parser.parse_args()

    if args.list_formats:
        print("List of formats:")
        for subtitle_format in FORMATTERS:
            print("{format}".format(format=subtitle_format))
        return 0

    if args.list_languages:
        print("List of all languages:")
        for code, language in sorted(LANGUAGE_CODES.items()):
            print("{code}\t{language}".format(code=code, language=language))
        return 0

    if not validate(args):
        return 1

    try:
        subtitle_file_path = generate_subtitles(
            source_path=args.source_path,
            concurrency=args.concurrency,
            src_language=args.src_language,
            dst_language=args.dst_language,
            api_key=args.api_key,
            subtitle_file_format=args.format,
            output=args.output,
        )
        if subtitle_file_path == 1:
            # generate_subtitles already printed the error; do not report
            # a created file.
            return 1
        print("Subtitles file created at {}".format(subtitle_file_path))
    except KeyboardInterrupt:
        return 1

    return 0


if __name__ == '__main__':
    sys.exit(main())