123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
#!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # author: kuangdd
- # date: 2019/11/23
- """
- ### audio_noise_remover
- 语音降噪,降低环境噪声。
- """
- from pathlib import Path
- import logging
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(Path(__name__).stem)
- import numpy as np
- import ctypes as ct
- from .audio_io import load_wav, save_wav
- from .audio_io import _sr
- import traceback
def remove_noise(wav: np.ndarray, sr=_sr, **kwargs):
    """
    Remove background noise from a signal with spectral subtraction.

    The signal is cut into 20 ms Hamming-windowed frames with 50% overlap.
    For every frame the estimated noise power spectrum is subtracted
    (over-subtraction factor chosen from the frame's segmental SNR), a
    spectral floor is applied, and the frame is resynthesised with the noisy
    phase and overlap-added into the output.

    :param wav: input signal (1-D array-like of samples)
    :param sr: sample rate in Hz
    :param kwargs: optional settings
        - noise_span: ``(start_ms, end_ms)`` of the noise-only region used for
          the initial noise estimate, default ``(0, 100)``. It indexes ``wav``
          unless ``noise_wav`` is given, in which case it indexes ``noise_wav``.
        - noise_wav: separate noise-only recording; when given, the initial
          noise spectrum is estimated from it (all of its whole frames unless
          ``noise_span`` is also passed).
        - threshold: segmental SNR in dB below which a frame is treated as
          noise and blended into the running noise estimate, default ``3``.
        - beta: spectral floor factor, default ``0.002``.
    :return: np.ndarray, the denoised signal (``n_frames * hop`` samples)
    :raises ValueError: if the noise region contains no frame
    """
    x = np.asarray(wav)
    noise_span = kwargs.get("noise_span", (0, 100))
    noise_wav = kwargs.get("noise_wav", None)
    threshold = kwargs.get("threshold", 3)
    beta = kwargs.get("beta", 0.002)

    # Framing parameters.
    unit_ms = 20  # frame duration in milliseconds
    frame_len = unit_ms * sr // 1000  # samples per frame
    # Keep the frame length even: with an odd length the two halves of the
    # 50% overlap-add differ by one sample and the add below cannot be
    # assigned into the output slice.
    frame_len -= frame_len % 2
    overlap_pct = 50  # overlap as a percentage of the frame
    len1 = frame_len * overlap_pct // 100  # overlapping part
    len2 = frame_len - len1  # hop size (non-overlapping part)

    expnt = 2.0  # 2.0 -> power spectrum subtraction, 1 -> magnitude
    smooth = 0.9  # smoothing factor for the running noise estimate

    win = np.hamming(frame_len)
    # Normalisation gain for overlap-add with 50% overlap.
    win_gain = len2 / win.sum()

    n_fft = 2 * 2 ** (nextpow2(frame_len))

    # --- Initial noise magnitude spectrum --------------------------------
    if noise_wav is None:
        noise_src = x
        sidx = noise_span[0] // unit_ms
        eidx = noise_span[1] // unit_ms
    else:
        # Estimate from the supplied noise recording, not from the signal.
        noise_src = np.asarray(noise_wav)
        if "noise_span" in kwargs:
            sidx = noise_span[0] // unit_ms
            eidx = noise_span[1] // unit_ms
        else:
            sidx = 0
            # Number of whole frames available in the noise recording.
            eidx = len(noise_src) // frame_len
    if eidx <= sidx:
        raise ValueError(
            "noise region contains no frame (start frame {}, end frame {})".format(sidx, eidx))

    noise_mean = np.zeros(n_fft)
    for k in range(sidx, eidx):
        frame = noise_src[k * frame_len:(k + 1) * frame_len]
        noise_mean = noise_mean + np.abs(np.fft.fft(win * frame, n_fft))
    noise_mu = noise_mean / (eidx - sidx)

    # --- Frame-by-frame processing ---------------------------------------
    x_old = np.zeros(len1)
    # Guard against inputs shorter than one hop (would give a negative size).
    n_frames = max(len(x) // len2 - 1, 0)
    xfinal = np.zeros(n_frames * len2)

    for n in range(n_frames):
        start = n * len2
        insign = win * x[start:start + frame_len]
        spec = np.fft.fft(insign, n_fft)
        sig = np.abs(spec)
        theta = np.angle(spec)  # keep the noisy phase for resynthesis

        snr_seg = 10 * np.log10(
            np.linalg.norm(sig, 2) ** 2 / np.linalg.norm(noise_mu, 2) ** 2)
        alpha = berouti1(snr_seg) if expnt == 1 else berouti(snr_seg)

        noise_pow = noise_mu ** expnt
        sub_speech = sig ** expnt - alpha * noise_pow
        # Where the subtraction falls below the floor, use the floor instead.
        below_floor = (sub_speech - beta * noise_pow) < 0
        sub_speech[below_floor] = beta * noise_pow[below_floor]

        # Simple VAD: low-SNR frames refresh the noise estimate.
        if snr_seg < threshold:
            noise_mu = (smooth * noise_pow + (1 - smooth) * sig ** expnt) ** (1 / expnt)

        # Mirror the lower half onto the upper half so the spectrum stays
        # symmetric.
        sub_speech[n_fft // 2 + 1:n_fft] = np.flipud(sub_speech[1:n_fft // 2])
        frame_spec = (sub_speech ** (1 / expnt)) * (np.cos(theta) + 1j * np.sin(theta))
        xi = np.fft.ifft(frame_spec).real

        # Overlap and add.
        xfinal[start:start + len2] = x_old + xi[0:len1]
        x_old = xi[len1:frame_len]

    return win_gain * xfinal
def remove_noise_os(inpath, outpath, **kwargs):
    """
    Denoise the wav file at ``inpath`` and write the result to ``outpath``.

    Extra keyword arguments are forwarded to ``remove_noise``. Any exception
    raised while loading, denoising or saving is caught: the input path and
    the error are logged at INFO level and the traceback is printed, then the
    function returns normally.

    :param inpath: path of the wav file to read
    :param outpath: path of the wav file to write
    :param kwargs: forwarded to ``remove_noise``
    :return: None
    """
    try:
        signal, rate = load_wav(inpath, with_sr=True)
        denoised = remove_noise(signal, rate, **kwargs)
        save_wav(denoised, outpath, rate)
    except Exception as err:
        for template, detail in (('Error path: {}', inpath), ('Error info: {}', err)):
            logger.info(template.format(detail))
        traceback.print_exc()
class FloatBits(ct.Structure):
    """
    Three unsigned bit fields declared in order: ``M`` (23 bits), ``E``
    (8 bits) and ``S`` (1 bit), 32 bits in total.

    NOTE(review): the widths match the mantissa / exponent / sign split of an
    IEEE-754 single-precision float; the actual bit placement depends on the
    platform's bit-field layout - confirm on non-little-endian targets.
    """
    # (field name, bit width) pairs, in declaration order.
    _fields_ = [
        (field_name, ct.c_uint, bit_width)
        for field_name, bit_width in (("M", 23), ("E", 8), ("S", 1))
    ]
class Float(ct.Union):
    """
    Union overlaying a C ``float`` (``value``) with a ``FloatBits`` structure
    (``bits``).

    ``bits`` is anonymous, so its fields are reachable directly on the union
    (``obj.M``, ``obj.E``, ``obj.S``) after assigning ``obj.value``.
    """
    # _anonymous_ has to be defined before _fields_ is assigned.
    _anonymous_ = ("bits",)
    _fields_ = [
        ("value", ct.c_float),
        ("bits", FloatBits),
    ]
def nextpow2(x):
    """
    Return the smallest integer ``p`` such that ``2 ** p >= abs(x)``.

    ``math.frexp`` is used to split ``abs(x)`` into ``m * 2 ** e`` with
    ``0.5 <= m < 1``. This works at double precision, so integers above
    ``2 ** 24`` are no longer rounded to the nearest single-precision value
    first, and the result does not depend on the platform's bit-field layout.

    :param x: a real number (its sign is ignored)
    :return: int exponent; ``0`` when ``x`` is ``0``
    """
    import math

    if x == 0:
        return 0
    mantissa, exponent = math.frexp(abs(x))
    # An exact power of two has mantissa 0.5: 2 ** (exponent - 1) == abs(x).
    return exponent - 1 if mantissa == 0.5 else exponent
def berouti(SNR):
    """
    Over-subtraction factor used when ``remove_noise`` works on the power
    spectrum.

    :param SNR: segmental SNR of the frame in dB
    :return: ``5`` below -5 dB, ``4 - SNR * 3 / 20`` for -5..20 dB
        (inclusive), otherwise ``1``
    """
    if SNR < -5.0:
        return 5
    if -5.0 <= SNR <= 20.0:
        return 4 - SNR * 3 / 20
    # Above 20 dB, or a value that fails both comparisons (e.g. NaN).
    return 1
def berouti1(SNR):
    """
    Over-subtraction factor used when ``remove_noise`` works on the magnitude
    spectrum.

    :param SNR: segmental SNR of the frame in dB
    :return: ``4`` below -5 dB, ``3 - SNR * 2 / 20`` for -5..20 dB
        (inclusive), otherwise ``1``
    """
    if SNR < -5.0:
        return 4
    if -5.0 <= SNR <= 20.0:
        return 3 - SNR * 2 / 20
    # Above 20 dB, or a value that fails both comparisons (e.g. NaN).
    return 1
def find_index(x_list):
    """
    Return the positions of the negative entries of ``x_list``.

    :param x_list: sequence of numbers
    :return: list of int indices ``i`` with ``x_list[i] < 0``, in ascending
        order (empty when there is none)
    """
    return [i for i, value in enumerate(x_list) if value < 0]
if __name__ == "__main__":
    # Run as a script: just print this file's path.
    print(__file__)
|