123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- """
- Defines autosub's main functionality.
- """
- #!/usr/bin/env python
- from __future__ import absolute_import, print_function, unicode_literals
- import argparse
- import audioop
- import math
- import multiprocessing
- import os
- from json import JSONDecodeError
- import subprocess
- import sys
- import tempfile
- import wave
- import json
- import requests
- try:
- from json.decoder import JSONDecodeError
- except ImportError:
- JSONDecodeError = ValueError
- from googleapiclient.discovery import build
- from progressbar import ProgressBar, Percentage, Bar, ETA
- from autosub.constants import (
- LANGUAGE_CODES, GOOGLE_SPEECH_API_KEY, GOOGLE_SPEECH_API_URL,
- )
- from autosub.formatters import FORMATTERS
- DEFAULT_SUBTITLE_FORMAT = 'srt'
- DEFAULT_CONCURRENCY = 10
- DEFAULT_SRC_LANGUAGE = 'en'
- DEFAULT_DST_LANGUAGE = 'en'
- def percentile(arr, percent):
- """
- Calculate the given percentile of arr.
- """
- arr = sorted(arr)
- index = (len(arr) - 1) * percent
- floor = math.floor(index)
- ceil = math.ceil(index)
- if floor == ceil:
- return arr[int(index)]
- low_value = arr[int(floor)] * (ceil - index)
- high_value = arr[int(ceil)] * (index - floor)
- return low_value + high_value
- class FLACConverter(object): # pylint: disable=too-few-public-methods
- """
- Class for converting a region of an input audio or video file into a FLAC audio file
- """
- def __init__(self, source_path, include_before=0.25, include_after=0.25):
- self.source_path = source_path
- self.include_before = include_before
- self.include_after = include_after
- def __call__(self, region):
- try:
- start, end = region
- start = max(0, start - self.include_before)
- end += self.include_after
- #delete=False necessary for running on Windows
- temp = tempfile.NamedTemporaryFile(suffix='.flac', delete=False)
- program_ffmpeg = which("ffmpeg")
- command = [str(program_ffmpeg), "-ss", str(start), "-t", str(end - start),
- "-y", "-i", self.source_path,
- "-loglevel", "error", temp.name]
- use_shell = True if os.name == "nt" else False
- subprocess.check_output(command, stdin=open(os.devnull), shell=use_shell)
- read_data = temp.read()
- temp.close()
- os.unlink(temp.name)
- return read_data
- except KeyboardInterrupt:
- return None
- class SpeechRecognizer(object): # pylint: disable=too-few-public-methods
- """
- Class for performing speech-to-text for an input FLAC file.
- """
- def __init__(self, language="en", rate=44100, retries=3, api_key=GOOGLE_SPEECH_API_KEY):
- self.language = language
- self.rate = rate
- self.api_key = api_key
- self.retries = retries
- def __call__(self, data):
- try:
- for _ in range(self.retries):
- url = GOOGLE_SPEECH_API_URL.format(lang=self.language, key=self.api_key)
- headers = {"Content-Type": "audio/x-flac; rate=%d" % self.rate}
- try:
- resp = requests.post(url, data=data, headers=headers)
- except requests.exceptions.ConnectionError:
- continue
- for line in resp.content.decode('utf-8').split("\n"):
- try:
- line = json.loads(line)
- line = line['result'][0]['alternative'][0]['transcript']
- return line[:1].upper() + line[1:]
- except IndexError:
- # no result
- continue
- except JSONDecodeError:
- continue
- except KeyboardInterrupt:
- return None
- class Translator(object): # pylint: disable=too-few-public-methods
- """
- Class for translating a sentence from a one language to another.
- """
- def __init__(self, language, api_key, src, dst):
- self.language = language
- self.api_key = api_key
- self.service = build('translate', 'v2',
- developerKey=self.api_key)
- self.src = src
- self.dst = dst
- def __call__(self, sentence):
- try:
- if not sentence:
- return None
- result = self.service.translations().list( # pylint: disable=no-member
- source=self.src,
- target=self.dst,
- q=[sentence]
- ).execute()
- if 'translations' in result and result['translations'] and \
- 'translatedText' in result['translations'][0]:
- return result['translations'][0]['translatedText']
- return None
- except KeyboardInterrupt:
- return None
- def which(program):
- """
- Return the path for a given executable.
- """
- def is_exe(file_path):
- """
- Checks whether a file is executable.
- """
- return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
- #necessary to run on Windows
- if os.name == "nt":
- program += ".exe"
- fpath, _ = os.path.split(program)
- if fpath:
- if is_exe(program):
- return program
- else:
- #looks for file in the script execution folder before checking on system path
- current_dir = os.getcwd()
- local_program = os.path.join(current_dir, program)
- if is_exe(local_program):
- return local_program
- else:
- for path in os.environ["PATH"].split(os.pathsep):
- path = path.strip('"')
- exe_file = os.path.join(path, program)
- if is_exe(exe_file):
- return exe_file
- return None
- def extract_audio(filename, channels=1, rate=16000):
- """
- Extract audio from an input file to a temporary WAV file.
- """
- temp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
- if not os.path.isfile(filename):
- print("The given file does not exist: {}".format(filename))
- raise Exception("Invalid filepath: {}".format(filename))
- program_ffmpeg = which("ffmpeg")
- if not program_ffmpeg:
- print("ffmpeg: Executable not found on machine.")
- raise Exception("Dependency not found: ffmpeg")
- command = [str(program_ffmpeg), "-y", "-i", filename,
- "-ac", str(channels), "-ar", str(rate),
- "-loglevel", "error", temp.name]
- use_shell = True if os.name == "nt" else False
- subprocess.check_output(command, stdin=open(os.devnull), shell=use_shell)
- return temp.name, rate
- def find_speech_regions(filename, frame_width=4096, min_region_size=0.5, max_region_size=6): # pylint: disable=too-many-locals
- """
- Perform voice activity detection on a given audio file.
- """
- reader = wave.open(filename)
- sample_width = reader.getsampwidth()
- rate = reader.getframerate()
- n_channels = reader.getnchannels()
- chunk_duration = float(frame_width) / rate
- n_chunks = int(math.ceil(reader.getnframes()*1.0 / frame_width))
- energies = []
- for _ in range(n_chunks):
- chunk = reader.readframes(frame_width)
- energies.append(audioop.rms(chunk, sample_width * n_channels))
- threshold = percentile(energies, 0.2)
- elapsed_time = 0
- regions = []
- region_start = None
- for energy in energies:
- is_silence = energy <= threshold
- max_exceeded = region_start and elapsed_time - region_start >= max_region_size
- if (max_exceeded or is_silence) and region_start:
- if elapsed_time - region_start >= min_region_size:
- regions.append((region_start, elapsed_time))
- region_start = None
- elif (not region_start) and (not is_silence):
- region_start = elapsed_time
- elapsed_time += chunk_duration
- return regions
- def generate_subtitles( # pylint: disable=too-many-locals,too-many-arguments
- source_path,
- output=None,
- concurrency=DEFAULT_CONCURRENCY,
- src_language=DEFAULT_SRC_LANGUAGE,
- dst_language=DEFAULT_DST_LANGUAGE,
- subtitle_file_format=DEFAULT_SUBTITLE_FORMAT,
- api_key=None,
- ):
- """
- Given an input audio/video file, generate subtitles in the specified language and format.
- """
- if os.name != "nt" and "Darwin" in os.uname():
- #the default unix fork method does not work on Mac OS
- #need to use forkserver
- if 'forkserver' != multiprocessing.get_start_method(allow_none=True):
- multiprocessing.set_start_method('forkserver')
- audio_filename, audio_rate = extract_audio(source_path)
- regions = find_speech_regions(audio_filename)
- pool = multiprocessing.Pool(concurrency)
- converter = FLACConverter(source_path=audio_filename)
- recognizer = SpeechRecognizer(language=src_language, rate=audio_rate,
- api_key=GOOGLE_SPEECH_API_KEY)
- transcripts = []
- if regions:
- try:
- widgets = ["Converting speech regions to FLAC files: ", Percentage(), ' ', Bar(), ' ',
- ETA()]
- pbar = ProgressBar(widgets=widgets, maxval=len(regions)).start()
- extracted_regions = []
- for i, extracted_region in enumerate(pool.imap(converter, regions)):
- extracted_regions.append(extracted_region)
- pbar.update(i)
- pbar.finish()
- widgets = ["Performing speech recognition: ", Percentage(), ' ', Bar(), ' ', ETA()]
- pbar = ProgressBar(widgets=widgets, maxval=len(regions)).start()
- for i, transcript in enumerate(pool.imap(recognizer, extracted_regions)):
- transcripts.append(transcript)
- pbar.update(i)
- pbar.finish()
- if src_language.split("-")[0] != dst_language.split("-")[0]:
- if api_key:
- google_translate_api_key = api_key
- translator = Translator(dst_language, google_translate_api_key,
- dst=dst_language,
- src=src_language)
- prompt = "Translating from {0} to {1}: ".format(src_language, dst_language)
- widgets = [prompt, Percentage(), ' ', Bar(), ' ', ETA()]
- pbar = ProgressBar(widgets=widgets, maxval=len(regions)).start()
- translated_transcripts = []
- for i, transcript in enumerate(pool.imap(translator, transcripts)):
- translated_transcripts.append(transcript)
- pbar.update(i)
- pbar.finish()
- transcripts = translated_transcripts
- else:
- print(
- "Error: Subtitle translation requires specified Google Translate API key. "
- "See --help for further information."
- )
- return 1
- except KeyboardInterrupt:
- pbar.finish()
- pool.terminate()
- pool.join()
- print("Cancelling transcription")
- raise
- timed_subtitles = [(r, t) for r, t in zip(regions, transcripts) if t]
- formatter = FORMATTERS.get(subtitle_file_format)
- formatted_subtitles = formatter(timed_subtitles)
- dest = output
- if not dest:
- base = os.path.splitext(source_path)[0]
- dest = "{base}.{format}".format(base=base, format=subtitle_file_format)
- with open(dest, 'wb') as output_file:
- output_file.write(formatted_subtitles.encode("utf-8"))
- os.remove(audio_filename)
- return dest
- def validate(args):
- """
- Check that the CLI arguments passed to autosub are valid.
- """
- if args.format not in FORMATTERS:
- print(
- "Subtitle format not supported. "
- "Run with --list-formats to see all supported formats."
- )
- return False
- if args.src_language not in LANGUAGE_CODES.keys():
- print(
- "Source language not supported. "
- "Run with --list-languages to see all supported languages."
- )
- return False
- if args.dst_language not in LANGUAGE_CODES.keys():
- print(
- "Destination language not supported. "
- "Run with --list-languages to see all supported languages."
- )
- return False
- if not args.source_path:
- print("Error: You need to specify a source path.")
- return False
- return True
- def main():
- """
- Run autosub as a command-line program.
- """
- parser = argparse.ArgumentParser()
- parser.add_argument('source_path', help="Path to the video or audio file to subtitle",
- nargs='?')
- parser.add_argument('-C', '--concurrency', help="Number of concurrent API requests to make",
- type=int, default=DEFAULT_CONCURRENCY)
- parser.add_argument('-o', '--output',
- help="Output path for subtitles (by default, subtitles are saved in \
- the same directory and name as the source path)")
- parser.add_argument('-F', '--format', help="Destination subtitle format",
- default=DEFAULT_SUBTITLE_FORMAT)
- parser.add_argument('-S', '--src-language', help="Language spoken in source file",
- default=DEFAULT_SRC_LANGUAGE)
- parser.add_argument('-D', '--dst-language', help="Desired language for the subtitles",
- default=DEFAULT_DST_LANGUAGE)
- parser.add_argument('-K', '--api-key',
- help="The Google Translate API key to be used. \
- (Required for subtitle translation)")
- parser.add_argument('--list-formats', help="List all available subtitle formats",
- action='store_true')
- parser.add_argument('--list-languages', help="List all available source/destination languages",
- action='store_true')
- args = parser.parse_args()
- if args.list_formats:
- print("List of formats:")
- for subtitle_format in FORMATTERS:
- print("{format}".format(format=subtitle_format))
- return 0
- if args.list_languages:
- print("List of all languages:")
- for code, language in sorted(LANGUAGE_CODES.items()):
- print("{code}\t{language}".format(code=code, language=language))
- return 0
- if not validate(args):
- return 1
- try:
- subtitle_file_path = generate_subtitles(
- source_path=args.source_path,
- concurrency=args.concurrency,
- src_language=args.src_language,
- dst_language=args.dst_language,
- api_key=args.api_key,
- subtitle_file_format=args.format,
- output=args.output,
- )
- print("Subtitles file created at {}".format(subtitle_file_path))
- except KeyboardInterrupt:
- return 1
- return 0
- if __name__ == '__main__':
- sys.exit(main())
|