diff --git a/ITU_P56.py b/ITU_P56.py index 7e6291e..6952cea 100644 --- a/ITU_P56.py +++ b/ITU_P56.py @@ -31,6 +31,7 @@ def asl_P56(x, fs, nbits): H = 0.2 # hangover time in seconds M = 15.9 # margin in dB of the difference between threshold and ASL thres_no = nbits-1 # number of thresholds, for 16 bit, it's 15 + c0 = None I = np.ceil(fs*H) # hangover in samples g = np.exp(-1/(fs*T)) # smoothing factor in envelop detection diff --git a/eeg_test_thread.py b/eeg_test_thread.py index 3a3be14..aeb053b 100644 --- a/eeg_test_thread.py +++ b/eeg_test_thread.py @@ -12,6 +12,7 @@ from shutil import copy2 import re import sounddevice as sd from ITU_P56 import asl_P56 +from snrops import rms_no_silences from hearing_loss_sim import apply_hearing_loss_sim from test_base import BaseThread, run_test_thread @@ -250,15 +251,17 @@ class EEGTestThread(BaseThread): speech = audio[:, :2] triggers = audio[:, 2] - speech_rms, _, _ = asl_P56(speech, fs, 16.) + #speech_rms, _, _ = asl_P56(speech, fs, 16.) + rms_no_silences(speech, fs, -30.) + wf = [] wm = [] for ind2, s in enumerate(snr): start = randint(0, noise_file.frames()-speech.shape[0]) noise_file.seek(start) noise = noise_file.read_frames(speech.shape[0]) - #noise_rms = np.sqrt(np.mean(noise**2)) - noise_rms = asl_P56(noise, fs, 16) + noise_rms = np.sqrt(np.mean(noise**2)) + # noise_rms = asl_P56(noise, fs, 16) snr_fs = 10**(-s/20) if snr_fs == np.inf: snr_fs = 0. diff --git a/matrix_test/behavioural_stim/gen_noise.py b/matrix_test/behavioural_stim/gen_noise.py index 3c199ae..117fd12 100755 --- a/matrix_test/behavioural_stim/gen_noise.py +++ b/matrix_test/behavioural_stim/gen_noise.py @@ -20,6 +20,7 @@ from pysndfile import PySndfile, sndio import matplotlib.pyplot as plt from ITU_P56 import asl_P56 from pathlib import Path +from snrops import rms_no_silences from multiprocessing.dummy import Pool as ThreadPool import multiprocessing @@ -215,7 +216,8 @@ def gen_noise(OutDir, b, fs): y = noise_norm_wav.read_frames(fs*60) y = y/(np.abs(y).max() * 0.95) # rms = np.sqrt(np.mean(y**2)) - rms, _, _ = asl_P56(y, fs, 16) + # rms, _, _ = asl_P56(y, fs, 16) + rms = rms_no_silences(y, fs, -30.) print(f"Noise level: {rms}") peak = np.abs(y).max() @@ -235,7 +237,9 @@ def calc_speech_rms(files, silences, rmsDir, fs=44100, plot=False): def level_calc(args): ind, wavfile = args x, fs, _ = sndio.read(wavfile) - level = asl_P56(x, fs, 16.)[0] + # level = asl_P56(x, fs, 16.)[0] + level = rms_no_silences(x, fs, -30.) + print(f"Calculated level of {Path(wavfile).name} ({ind+1}/{n_files}): {level}") return level diff --git a/matrix_test/helper_modules/signalops.py b/matrix_test/helper_modules/signalops.py index bd961f0..6c1a6b0 100644 --- a/matrix_test/helper_modules/signalops.py +++ b/matrix_test/helper_modules/signalops.py @@ -147,10 +147,9 @@ def gen_trigger(x, freq, length, fs): def calc_rms(y, window, plot=False): y_2 = y**2 - rms = np.zeros(y_2.size + round(window/2.)) + y_2 = np.pad(y_2, (round(window/2.)-1, round(window/2.)), 'constant', constant_values=(0, 0)) y_i = rolling_window_lastaxis(y_2, window) - for ind, frame in enumerate(y_i): - rms[ind+round(window/2.)] = np.sqrt(np.mean(frame)) + rms = np.sqrt(np.mean(y_i, axis=1)) rms[np.isnan(rms)] = 0 if plot: plt.plot(y) diff --git a/matrix_test_thread.py b/matrix_test_thread.py index 02d363c..eae1cee 100644 --- a/matrix_test_thread.py +++ b/matrix_test_thread.py @@ -19,6 +19,8 @@ from matrix_test.helper_modules.filesystem import globDir from test_base import BaseThread import sounddevice as sd import pdb +from ITU_P56 import asl_P56 +from snrops import rms_no_silences from config import socketio from hearing_loss_sim import apply_hearing_loss_sim @@ -400,7 +402,8 @@ class MatTestThread(BaseThread): # Read in audio file and calculate it's RMS x, self.fs, _ = sndio.read(fp) logger.info(f"Calculating level for {Path(fp).name}") - x_rms, _, _ = asl_P56(x, self.fs, 16.) + # x_rms, _, _ = asl_P56(x, self.fs, 16.) + x_rms = rms_no_silences(x, self.fs, -30.) self.lists[-1].append(x) self.listsRMS[-1].append(x_rms) self.listsString[-1].append(words) @@ -537,8 +540,10 @@ class AdaptiveTrack(): end = start + noiseLen self.noise.seek(start) x_noise = self.noise.read_frames(end-start) + # x_rms = np.sqrt(np.mean(x**2)) # Scale noise to match the RMS of the speech - x_noise *= x_rms/self.noise_rms + noise_rms = np.sqrt(np.mean(x_noise**2)) + x_noise *= x_rms/noise_rms y = x_noise # Set speech to start 500ms after the noise, scaled to the desired SNR sigStart = random.randint(round(self.fs/2.), round(2*self.fs)) diff --git a/rms_test.py b/rms_test.py new file mode 100644 index 0000000..07a76e5 --- /dev/null +++ b/rms_test.py @@ -0,0 +1,11 @@ +from pysndfile import sndio +from snrops import rms_no_silences + +def main(): + x, fs, enc = sndio.read('./matrix_test/behavioural_stim/stimulus/wav/sentence-lists/ukmatrix10.1/Trial_00001.wav') + rms = rms_no_silences(x, fs, -30) + breakpoint() + + +if __name__ == '__main__': + main() diff --git a/snrops.py b/snrops.py new file mode 100644 index 0000000..f1e7034 --- /dev/null +++ b/snrops.py @@ -0,0 +1,39 @@ +import sys +sys.path.insert(0, "matrix_test/helper_modules") +import numpy as np +from signalops import rolling_window_lastaxis, calc_rms + + +def detect_silences(x, fs, threshold=-30.): + print("Detecting silence in wav files...") + if len(x.shape) < 2: + x = x[:, np.newaxis] + x = x.sum(axis=1)/2. + env = calc_rms(x, window=int(fs*0.1)) + threshold = (10**(threshold/20.))*np.max(env) + silence = env < threshold + # Get segment start end indexes for all silences in envelope + sil_start = np.where(np.sign(np.diff(silence.astype(float))) == 1)[0] + sil_end = np.where(np.sign(np.diff(silence.astype(float))) == -1)[0] + if silence[0]: + sil_start = np.concatenate([[0], sil_start]) + if silence[-1]: + sil_end = np.concatenate([sil_end, [env.size]]) + segs = np.vstack([sil_start, sil_end]).T + validSegs = np.diff(segs) > 0.02*fs + segs = segs[np.repeat(validSegs, 2, axis=1)].reshape(-1, 2) + return segs + + +def slices_to_mask(slices, mask_length): + out = np.zeros(mask_length, dtype=bool) + for s in slices: + out[s[0]:s[1]] = True + return out + + +def rms_no_silences(x, fs, threshold): + silences = detect_silences(x, fs, threshold) + sil_mask = slices_to_mask(silences, x.size) + rms = np.sqrt(np.mean(x[~sil_mask]**2)) + return rms