resolved conflicts in merge in favour of remote. affected file: analysis_tools.py
This commit is contained in:
@@ -8,14 +8,13 @@ def my_first_DFT(input, window_size=4096):
|
||||
# Create arrays at half the size of the sample provided.
|
||||
# These will store the real and imaginary values as amplitudes of sine and
|
||||
# cosine waves for each frequency bin
|
||||
#window = np.hamming(window_size)
|
||||
#input = input*window
|
||||
real_values = np.zeros(window_size/2+1)
|
||||
imaginary_values = np.zeros(window_size/2+1)
|
||||
real_values = np.zeros(window_size/2)
|
||||
imaginary_values = np.zeros(window_size/2)
|
||||
|
||||
# Create vectors for each index of the input and the output arrays.
|
||||
|
||||
i = np.arange(window_size)
|
||||
k = np.arange(window_size/2+1)
|
||||
k = np.arange(window_size/2)
|
||||
|
||||
# For each index in the input array...
|
||||
for ind in i:
|
||||
@@ -28,13 +27,79 @@ def my_first_DFT(input, window_size=4096):
|
||||
# Imaginary values are similar to real values, but use sine waves
|
||||
# as the basis functions as oppsoed to cosine waves.
|
||||
imaginary_values[ind2] -= input[ind] * np.sin(2*np.pi*ind2*ind/window_size)
|
||||
|
||||
# Values are returned as arrays of amplitudes between 0.0 and 1.0 for the
|
||||
# waves of each frequency bin.
|
||||
'''
|
||||
real_values = real_values / (window_size / 2+1)
|
||||
imaginary_values = imaginary_values / (window_size / 2+1)
|
||||
'''
|
||||
|
||||
return (real_values, imaginary_values)
|
||||
|
||||
def my_first_fft(input, window_size=2048):
|
||||
X = np.array(input, dtype=complex)
|
||||
N = window_size
|
||||
|
||||
# M is the base 2 logarithm of the window size.
|
||||
# M represents the number of stage required in the decomposition.
|
||||
M = int(np.log2(window_size))
|
||||
|
||||
# For every index from index 1 up to the length of the input -2...
|
||||
i = np.arange(window_size-2)
|
||||
i += 1
|
||||
J = window_size/2
|
||||
for ind in i:
|
||||
# If the index is less than half the total number of input samples...
|
||||
if ind <= J:
|
||||
# Swap values at current index and index half the input value
|
||||
X[ind], X[J] = X[J], X[ind]
|
||||
# K is the length of the input divided by 2
|
||||
K = window_size/2
|
||||
# While K <= J...
|
||||
while K <= J:
|
||||
# J is it's current value - K's current value
|
||||
J -= K
|
||||
# Divide K by 2
|
||||
K /= 2
|
||||
#J is it's current value + K's current value
|
||||
J += K
|
||||
|
||||
# For each stage of combining the N frequency spectra
|
||||
L = 1
|
||||
while L < M:
|
||||
LE = int(2**L)
|
||||
LE2 = LE/2
|
||||
U = 1+0j
|
||||
# Calculate the index to the power of 2
|
||||
# Divide the result by 2
|
||||
# Calculate the cosine using the index bin number
|
||||
S=np.cos(np.pi/float(LE2))-np.sin(np.pi/float(LE2))*1j
|
||||
|
||||
# for each sub DFT...
|
||||
Jind = np.arange(LE2)
|
||||
for J in Jind:
|
||||
I = J
|
||||
while I < N-1:
|
||||
print(X)
|
||||
IP = I+LE2
|
||||
# Butterfly calculation
|
||||
T = X[I] + X[IP]
|
||||
X[IP] = (X[I] - X[IP]) * U
|
||||
X[I] = T
|
||||
I += LE
|
||||
U = U * S
|
||||
L+=1
|
||||
pdb.set_trace()
|
||||
return X
|
||||
|
||||
|
||||
|
||||
def window_input(input):
|
||||
|
||||
window = np.hamming(window_size)
|
||||
return input*window
|
||||
|
||||
def my_first_IDFT(real_input, imaginary_input, window_size=4096):
|
||||
# Create an array to store accumulated values of sine and cosine waves
|
||||
# generated
|
||||
@@ -56,7 +121,7 @@ def my_first_IDFT(real_input, imaginary_input, window_size=4096):
|
||||
return out
|
||||
|
||||
def rect_to_pol(real, imaginary):
|
||||
magnitude = np.power((np.power(real, 2) + np.power(imaginary, 2)), 0.5)
|
||||
magnitude = np.sqrt(np.power(real, 2) + np.power(imaginary, 2))
|
||||
# calculate the phases through the equation: arctan(x/y)
|
||||
# the arctan2 function is used to account for the divide by zero problem
|
||||
# aswell as the incorrect arctan problem
|
||||
@@ -93,6 +158,10 @@ def amp_to_dB(a):
|
||||
if __name__ == "__main__":
|
||||
# Create a signal containing 2 cosine waves at 3hz and 9hz, and a single
|
||||
# sine wave at 5 hz
|
||||
|
||||
input = np.arange(16)
|
||||
my_first_fft(input, window_size=16)
|
||||
|
||||
samples = np.arange(512)
|
||||
freq = np.linspace(3, 10, samples.size)
|
||||
amp = np.linspace(1, 0, samples.size)
|
||||
@@ -101,7 +170,6 @@ if __name__ == "__main__":
|
||||
test_wave += np.cos(2*np.pi*9*samples/samples.size)
|
||||
test_wave += np.sin(2*np.pi*5*samples/samples.size)
|
||||
test_wave += np.cos(2*np.pi*230*samples/samples.size)
|
||||
"""
|
||||
#test_wave = amp * test_wave
|
||||
#test_wave = np.sin(2*np.pi*freq*samples/samples.size)
|
||||
plt.subplot(311)
|
||||
@@ -117,7 +185,7 @@ if __name__ == "__main__":
|
||||
plt.plot(test_wave)
|
||||
plt.show()
|
||||
|
||||
"""
|
||||
|
||||
# Plot test wave
|
||||
plt.subplot(311)
|
||||
plt.title('Test Wave')
|
||||
@@ -129,27 +197,69 @@ if __name__ == "__main__":
|
||||
|
||||
# Generate real and imaginary values by performing a DFT analysis. Store
|
||||
# these values as 2 arrays with size == samples/2
|
||||
output = my_first_DFT(test_wave, window_size=samples.size)
|
||||
# output = my_first_DFT(test_wave, window_size=samples.size)
|
||||
|
||||
impulse = my_first_IDFT(np.hstack(([1], np.zeros(255))), np.zeros(256), window_size=512)
|
||||
# impulse = my_first_IDFT(np.hstack((np.zeros(110), [1], np.zeros(145))), np.zeros(256), window_size=512)
|
||||
impulse = np.hstack((np.zeros(3), [1], np.zeros(60)))
|
||||
windowed_impulse = impulse*np.hamming(64)
|
||||
|
||||
# amp_to_dB(polar_values[0])
|
||||
window_dft = my_first_DFT(impulse, window_size=512)
|
||||
polar_values = rect_to_pol(window_dft[0], window_dft[1])
|
||||
plt.subplot(211)
|
||||
plt.yscale('log')
|
||||
plt.plot(polar_values[0])
|
||||
|
||||
windowed_impulse = impulse*np.hamming(512)
|
||||
'''
|
||||
window_dft = my_first_DFT(windowed_impulse, window_size=512)
|
||||
polar_values = rect_to_pol(window_dft[0], window_dft[1])
|
||||
'''
|
||||
'''
|
||||
# Check phase and amplitude of an impulse
|
||||
dft = my_first_DFT(impulse, window_size=64)
|
||||
polar_values = rect_to_pol(dft[0], dft[1])
|
||||
|
||||
amp_to_dB(polar_values[0])
|
||||
plt.subplot(411)
|
||||
plt.title('Magnitude')
|
||||
plt.plot(polar_values[0])
|
||||
plt.subplot(412)
|
||||
plt.title('Phase')
|
||||
plt.plot(polar_values[1])
|
||||
plt.subplot(413)
|
||||
plt.title('Real Part')
|
||||
plt.plot(dft[0])
|
||||
plt.subplot(414)
|
||||
plt.title('Imaginary Part')
|
||||
plt.plot(dft[1])
|
||||
plt.show()
|
||||
'''
|
||||
|
||||
# Check phase and amplitude of an sinc impulse
|
||||
impulse = np.hstack((np.zeros(3), [1], np.zeros(508)))
|
||||
sinc_impulse = np.hstack((np.ones(32), np.zeros(449), np.ones(31)))
|
||||
dft = my_first_DFT(sinc_impulse, window_size=512)
|
||||
|
||||
# amp_to_dB(polar_values[0])
|
||||
plt.subplot(413)
|
||||
plt.title('Real Part')
|
||||
plt.plot(dft[0])
|
||||
plt.subplot(414)
|
||||
plt.title('Imaginary Part')
|
||||
plt.plot(dft[1])
|
||||
polar_values = rect_to_pol(dft[0], dft[1])
|
||||
plt.subplot(411)
|
||||
plt.title('Magnitude')
|
||||
plt.plot(polar_values[0])
|
||||
plt.subplot(412)
|
||||
plt.title('Phase')
|
||||
plt.plot(unwrap_phase(polar_values[1]))
|
||||
print(polar_values[1])
|
||||
plt.show()
|
||||
exit()
|
||||
|
||||
window_dft = my_first_DFT(impulse, window_size=2048)
|
||||
polar_values = rect_to_pol(window_dft[0], window_dft[1])
|
||||
plt.subplot(212)
|
||||
plt.yscale('log')
|
||||
plt.plot(polar_values[0])
|
||||
plt.show()
|
||||
exit()
|
||||
|
||||
# Plot output arrays to show
|
||||
y_formatter = matplotlib.ticker.ScalarFormatter(useOffset=False)
|
||||
plt.subplot(312)
|
||||
plt.title('Real values')
|
||||
plt.axis((0, output[0].size, -1.2, 1.2))
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
from __future__ import print_function, division
|
||||
import numpy as np
|
||||
from sppysound import AudioFile
|
||||
import matplotlib.pyplot as plt
|
||||
import pdb
|
||||
import scipy
|
||||
|
||||
|
||||
def convolve(input, impulse_response):
|
||||
@@ -9,9 +13,184 @@ def convolve(input, impulse_response):
|
||||
out[input_ind+imp_ind] = out[input_ind+imp_ind] + i*j
|
||||
return out
|
||||
|
||||
def moving_average_filter_recursive(input, M, symetry = 'after'):
|
||||
'''
|
||||
Applies a moving average filter to the input.
|
||||
|
||||
Arguments:
|
||||
input - the input signal to filter.
|
||||
symetry - ('before' or 'middle') defines how points will be
|
||||
averaged around the index
|
||||
M - the number of coefficients.
|
||||
'''
|
||||
# Calculate the filter coefficients
|
||||
filter_kernal = np.ones(M) / M
|
||||
# Get the pre-zero-padded input size.
|
||||
input_size = input.size
|
||||
# Pad end of input with zeros.
|
||||
if symetry == 'after':
|
||||
# Zero-pad input at end on input for averaging of end samples
|
||||
input = np.hstack((input, np.zeros(M)))
|
||||
elif symetry == 'middle':
|
||||
# M value must be odd to have an equal number of samples on each side.
|
||||
if not M % 2:
|
||||
raise ValueError("M must be odd for symetrical averaging")
|
||||
# Calculate the zero padding size.
|
||||
offset = np.floor(M/2.0)
|
||||
# Zero pad input on both sides to allow for averaging from first sample
|
||||
# to last sample
|
||||
input = np.hstack((np.zeros(offset), input, np.zeros(offset)))
|
||||
|
||||
|
||||
# Calculate the number of output samples.
|
||||
# y = np.zeros(input.size-M)
|
||||
|
||||
y = np.zeros(input.size-M)
|
||||
# If averaging after first sample.
|
||||
if symetry == 'after':
|
||||
# For each sample in the input
|
||||
acc = 0
|
||||
|
||||
i = 0
|
||||
while i < M:
|
||||
acc += input[i]
|
||||
i += 1
|
||||
y[0] = acc / M
|
||||
|
||||
i = 1
|
||||
while i < input.size-M:
|
||||
acc += input[i+M-1] - input[i-1]
|
||||
y[i] = acc/M
|
||||
i += 1
|
||||
print(y)
|
||||
|
||||
elif symetry == 'middle':
|
||||
# TODO: Make recursive
|
||||
i = 0
|
||||
# For all the input samples
|
||||
while i < input_size-offset:
|
||||
# The output sample is the average sample value for M samples.
|
||||
y[i] = np.sum(input[i:i+M] * filter_kernal)
|
||||
i += 1
|
||||
return y
|
||||
|
||||
def moving_average_filter(input, M, symetry = 'after'):
|
||||
'''
|
||||
Applies a moving average filter to the input.
|
||||
|
||||
Arguments:
|
||||
input - the input signal to filter.
|
||||
symetry - ('before' or 'middle') defines how points will be
|
||||
averaged around the index
|
||||
M - the number of coefficients.
|
||||
'''
|
||||
# Calculate the filter coefficients
|
||||
filter_kernal = np.ones(M) / M
|
||||
# Get the pre-zero-padded input size.
|
||||
input_size = input.size
|
||||
# Pad end of input with zeros.
|
||||
if symetry == 'after':
|
||||
# Zero-pad input at end on input for averaging of end samples
|
||||
input = np.hstack((input, np.zeros(M)))
|
||||
elif symetry == 'middle':
|
||||
# M value must be odd to have an equal number of samples on each side.
|
||||
if not M % 2:
|
||||
raise ValueError("M must be odd for symetrical averaging")
|
||||
# Calculate the zero padding size.
|
||||
offset = np.floor(M/2.0)
|
||||
# Zero pad input on both sides to allow for averaging from first sample
|
||||
# to last sample
|
||||
input = np.hstack((np.zeros(offset), input, np.zeros(offset)))
|
||||
|
||||
|
||||
# Calculate the number of output samples.
|
||||
y = np.zeros(input.size-M)
|
||||
|
||||
# If averaging after first sample.
|
||||
if symetry == 'after':
|
||||
i = 0
|
||||
# For each sample in the input
|
||||
while i < input_size:
|
||||
y[i] = np.sum(input[i:i+M] / M)
|
||||
i += 1
|
||||
# If averaging symetrically
|
||||
elif symetry == 'middle':
|
||||
i = 0
|
||||
# For all the input samples
|
||||
while i < input_size-offset:
|
||||
# The output sample is the average sample value for M samples.
|
||||
y[i] = np.sum(input[i:i+M] / M)
|
||||
i += 1
|
||||
return y
|
||||
|
||||
def blackman_filter(input, window_size, freq):
|
||||
'''
|
||||
Create a blackman windowed-sinc filter.
|
||||
|
||||
freq - The cutoff frequency of the filter specified as a proportion of the
|
||||
samplerate of the signal.
|
||||
'''
|
||||
# TODO: Check the definition of freq is correct.
|
||||
|
||||
i = np.arange(window_size)
|
||||
# Create a sinc function of M length.
|
||||
# The output will be a sinc function shifted from -M/2 - M/2 to 0 - M.
|
||||
# This will result in a sinc function that can be used to create a filter
|
||||
# at the cutoff-frequency provided in freq.
|
||||
sinc_kernal = np.sin(2*np.pi*freq*(i-window_size/2))/(i-window_size/2)
|
||||
|
||||
# Create a blackman window
|
||||
window = 0.42 - 0.5 * np.cos(2 * np.pi * (i / window_size)) + 0.08 * np.cos(4 * np.pi * (i / window_size))
|
||||
window_sinc = sinc_kernal * window
|
||||
|
||||
# Number of samplepoints
|
||||
N = window_size
|
||||
# sample spacing
|
||||
T = 1.0 / 800.0
|
||||
yf = scipy.fftpack.fft(window_sinc)
|
||||
xf = np.linspace(0.0, 1.0/(2.0*T), N/2)
|
||||
|
||||
|
||||
plt.subplot(311)
|
||||
plt.title('Blackman Window')
|
||||
plt.plot(window)
|
||||
plt.ylabel('Amplitude')
|
||||
plt.xlabel('sample')
|
||||
plt.subplot(312)
|
||||
plt.title('Window sinc function')
|
||||
plt.plot(sinc_kernal)
|
||||
plt.subplot(313)
|
||||
plt.title('FFT')
|
||||
plt.plot(xf, 2.0/N * np.abs(yf[0:N/2]))
|
||||
plt.show()
|
||||
|
||||
if __name__ == "__main__":
|
||||
'''
|
||||
a = np.array([1, 0.5, 3, 1])
|
||||
b = np.array([1, 0, 0, 0])
|
||||
c = convolve(a, b)
|
||||
print(c)
|
||||
print(np.convolve(a, b))
|
||||
'''
|
||||
with AudioFile('./test_audio.aif', 'r') as test_audio:
|
||||
grain = test_audio.read_grain(0, -1)
|
||||
grain = np.arange(5000)
|
||||
filtered_grain = moving_average_filter(grain, 101)
|
||||
filtered_r_grain = moving_average_filter_recursive(grain, 101)
|
||||
|
||||
blackman_filter(grain, 101, 0.14)
|
||||
|
||||
'''
|
||||
# Plot test wave
|
||||
plt.subplot(211)
|
||||
plt.title('Original Wave')
|
||||
plt.plot(grain)
|
||||
plt.ylabel('Amplitude')
|
||||
plt.xlabel('sample')
|
||||
plt.subplot(212)
|
||||
plt.title('Filtered Wave')
|
||||
plt.plot(filtered_grain)
|
||||
plt.ylabel('Amplitude')
|
||||
plt.xlabel('sample')
|
||||
plt.show()
|
||||
'''
|
||||
|
||||
Reference in New Issue
Block a user