audio_noise_remover.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. #!usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # author: kuangdd
  4. # date: 2019/11/23
  5. """
  6. ### audio_noise_remover
  7. 语音降噪,降低环境噪声。
  8. """
  9. from pathlib import Path
  10. import logging
  11. logging.basicConfig(level=logging.INFO)
  12. logger = logging.getLogger(Path(__name__).stem)
  13. import numpy as np
  14. import ctypes as ct
  15. from .audio_io import load_wav, save_wav
  16. from .audio_io import _sr
  17. import traceback
  18. def remove_noise(wav: np.array, sr=_sr, **kwargs):
  19. """
  20. 谱减法去除背景噪声。
  21. :param wav: 语音信号
  22. :param sr: 采样率
  23. :param kwargs:
  24. :return: np.ndarray
  25. """
  26. x = wav
  27. noise_span = kwargs.get("noise_span", (0, 100))
  28. noise_wav = kwargs.get("noise_wav", None)
  29. threshold = kwargs.get("threshold", 3)
  30. beta = kwargs.get("beta", 0.002)
  31. # 计算参数
  32. unit_ = 20 # 每帧时长,单位ms
  33. len_ = unit_ * sr // 1000 # 样本中帧的大小
  34. PERC = 50 # 窗口重叠占帧的百分比
  35. len1 = len_ * PERC // 100 # 重叠窗口
  36. len2 = len_ - len1 # 非重叠窗口
  37. # 设置默认参数
  38. Thres = threshold
  39. Expnt = 2.0
  40. beta = beta
  41. G = 0.9
  42. # 初始化汉明窗
  43. win = np.hamming(len_)
  44. # normalization gain for overlap+add with 50% overlap
  45. winGain = len2 / sum(win)
  46. # Noise magnitude calculations - assuming that the first 5 frames is noise/silence
  47. nFFT = 2 * 2 ** (nextpow2(len_))
  48. noise_mean = np.zeros(nFFT)
  49. if noise_wav is None:
  50. sidx = noise_span[0] // unit_
  51. eidx = noise_span[1] // unit_
  52. for k in range(sidx, eidx):
  53. noise_mean = noise_mean + abs(np.fft.fft(win * x[k * len_:(k + 1) * len_], nFFT))
  54. noise_mu = noise_mean / (eidx - sidx)
  55. else:
  56. if "noise_span" in kwargs:
  57. sidx = noise_span[0] // unit_
  58. eidx = noise_span[1] // unit_
  59. else:
  60. sidx = 0
  61. eidx = len(noise_wav) // unit_
  62. for k in range(sidx, eidx):
  63. noise_mean = noise_mean + abs(np.fft.fft(win * x[k * len_:(k + 1) * len_], nFFT))
  64. noise_mu = noise_mean / (eidx - sidx)
  65. # --- allocate memory and initialize various variables
  66. k = 1
  67. img = 1j
  68. x_old = np.zeros(len1)
  69. Nframes = len(x) // len2 - 1
  70. xfinal = np.zeros(Nframes * len2)
  71. # ========================= Start Processing ===============================
  72. for n in range(0, Nframes):
  73. # Windowing
  74. insign = win * x[k - 1:k + len_ - 1]
  75. # compute fourier transform of a frame
  76. spec = np.fft.fft(insign, nFFT)
  77. # compute the magnitude
  78. sig = abs(spec)
  79. # save the noisy phase information
  80. theta = np.angle(spec)
  81. SNRseg = 10 * np.log10(np.linalg.norm(sig, 2) ** 2 / np.linalg.norm(noise_mu, 2) ** 2)
  82. if Expnt == 1: # 幅度谱
  83. alpha = berouti1(SNRseg)
  84. else: # 功率谱
  85. alpha = berouti(SNRseg)
  86. #############
  87. sub_speech = sig ** Expnt - alpha * noise_mu ** Expnt
  88. # 当纯净信号小于噪声信号的功率时
  89. diffw = sub_speech - beta * noise_mu ** Expnt
  90. # beta negative components
  91. z = find_index(diffw)
  92. if len(z) > 0:
  93. # 用估计出来的噪声信号表示下限值
  94. sub_speech[z] = beta * noise_mu[z] ** Expnt
  95. # --- implement a simple VAD detector --------------
  96. if SNRseg < Thres: # Update noise spectrum
  97. noise_temp = G * noise_mu ** Expnt + (1 - G) * sig ** Expnt # 平滑处理噪声功率谱
  98. noise_mu = noise_temp ** (1 / Expnt) # 新的噪声幅度谱
  99. # flipud函数实现矩阵的上下翻转,是以矩阵的“水平中线”为对称轴
  100. # 交换上下对称元素
  101. sub_speech[nFFT // 2 + 1:nFFT] = np.flipud(sub_speech[1:nFFT // 2])
  102. x_phase = (sub_speech ** (1 / Expnt)) * (
  103. np.array([np.cos(x) for x in theta]) + img * (np.array([np.sin(x) for x in theta])))
  104. # take the IFFT
  105. xi = np.fft.ifft(x_phase).real
  106. # --- Overlap and add ---------------
  107. xfinal[k - 1:k + len2 - 1] = x_old + xi[0:len1]
  108. x_old = xi[0 + len1:len_]
  109. k = k + len2
  110. return winGain * xfinal
  111. def remove_noise_os(inpath, outpath, **kwargs):
  112. try:
  113. wav, sr = load_wav(inpath, with_sr=True)
  114. out = remove_noise(wav, sr, **kwargs)
  115. save_wav(out, outpath, sr)
  116. except Exception as e:
  117. logger.info('Error path: {}'.format(inpath))
  118. logger.info('Error info: {}'.format(e))
  119. traceback.print_exc()
  120. class FloatBits(ct.Structure):
  121. _fields_ = [
  122. ('M', ct.c_uint, 23),
  123. ('E', ct.c_uint, 8),
  124. ('S', ct.c_uint, 1)
  125. ]
  126. class Float(ct.Union):
  127. _anonymous_ = ('bits',)
  128. _fields_ = [
  129. ('value', ct.c_float),
  130. ('bits', FloatBits)
  131. ]
  132. def nextpow2(x):
  133. if x < 0:
  134. x = -x
  135. if x == 0:
  136. return 0
  137. d = Float()
  138. d.value = x
  139. if d.M == 0:
  140. return d.E - 127
  141. return d.E - 127 + 1
  142. def berouti(SNR):
  143. if -5.0 <= SNR <= 20.0:
  144. a = 4 - SNR * 3 / 20
  145. elif SNR < -5.0:
  146. a = 5
  147. else:
  148. a = 1
  149. return a
  150. def berouti1(SNR):
  151. if -5.0 <= SNR <= 20.0:
  152. a = 3 - SNR * 2 / 20
  153. elif SNR < -5.0:
  154. a = 4
  155. else:
  156. a = 1
  157. return a
  158. def find_index(x_list):
  159. index_list = []
  160. for i in range(len(x_list)):
  161. if x_list[i] < 0:
  162. index_list.append(i)
  163. return index_list
  164. if __name__ == "__main__":
  165. print(__file__)