974 lines
34 KiB
Python
974 lines
34 KiB
Python
from __future__ import print_function, division
|
|
import os
|
|
import shutil
|
|
import collections
|
|
from scipy import signal
|
|
import numpy as np
|
|
import pysndfile
|
|
import pdb
|
|
import sys
|
|
import traceback
|
|
import logging
|
|
import h5py
|
|
import multiprocessing as mp
|
|
from collections import namedtuple, defaultdict
|
|
import gc
|
|
from functools import wraps
|
|
|
|
from fileops import pathops
|
|
import analysis.RMSAnalysis as RMSAnalysis
|
|
import analysis.PeakAnalysis as PeakAnalysis
|
|
import analysis.AttackAnalysis as AttackAnalysis
|
|
import analysis.ZeroXAnalysis as ZeroXAnalysis
|
|
import analysis.FFTAnalysis as FFTAnalysis
|
|
import analysis.SpectralCentroidAnalysis as SpectralCentroidAnalysis
|
|
import analysis.SpectralSpreadAnalysis as SpectralSpreadAnalysis
|
|
import analysis.SpectralFluxAnalysis as SpectralFluxAnalysis
|
|
import analysis.SpectralCrestFactorAnalysis as SpectralCrestFactorAnalysis
|
|
import analysis.SpectralFlatnessAnalysis as SpectralFlatnessAnalysis
|
|
import analysis.CentroidAnalysis as CentroidAnalysis
|
|
import analysis.F0Analysis as F0Analysis
|
|
import analysis.VarianceAnalysis as VarianceAnalysis
|
|
import analysis.KurtosisAnalysis as KurtosisAnalysis
|
|
import analysis.SkewnessAnalysis as SkewnessAnalysis
|
|
import analysis.F0HarmRatioAnalysis as F0HarmRatioAnalysis
|
|
|
|
# Module-level logger. ``Logger.addHandler`` returns None, so the handler must
# be attached in a separate statement or ``logger`` ends up bound to None.
# The NullHandler keeps the library silent unless the application configures
# logging itself.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
|
|
|
|
class AudioFile(object):
|
|
|
|
"""
|
|
Object for storing and accessing basic information for an audio file.
|
|
|
|
This object is a wrapper for the pysndfile audio object. It provides
|
|
additional functionality alongside the ability to open and close audiofiles
|
|
without deleting their containing object.
|
|
|
|
Arguments:
|
|
|
|
- filepath: path to the file to be opened/written to
|
|
|
|
- mode: mode to open file in. either 'r' (read) or 'w' (write)
|
|
|
|
- format: the file format to use when opening a new file for writing. see
|
|
get_sndfile_formats() for more information.
|
|
|
|
- channels: the number of audio channels to use.
|
|
|
|
- samplerate: the samplerate in herts to use.
|
|
|
|
- name: the sound file object name.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
filepath,
|
|
mode,
|
|
format=None,
|
|
channels=None,
|
|
samplerate=None,
|
|
name=None,
|
|
*args,
|
|
**kwargs
|
|
):
|
|
self.logger = logging.getLogger(__name__ + '.AudioFile')
|
|
self.logger.debug("Initialised AudioFile")
|
|
|
|
self.filepath = filepath
|
|
# TODO: If a name isn't provided then create a default name based on
|
|
# the file name without an extension
|
|
self.name = name
|
|
self.mode = mode
|
|
self.samplerate = samplerate
|
|
self.format = format
|
|
self.channels = channels
|
|
self.frames = None
|
|
self.times = None
|
|
|
|
def __enter__(self):
|
|
"""Allow AudioFile object to be opened by 'with' statements"""
|
|
self.logger.info("Opening soundfile {0}".format(self.filepath))
|
|
if self.mode == 'r':
|
|
if not os.path.exists(self.filepath):
|
|
raise IOError(
|
|
"Cannot open {0} for reading as it cannot be "
|
|
"found.".format(self.filepath)
|
|
)
|
|
self.pysndfile_object = pysndfile.PySndfile(
|
|
self.filepath,
|
|
mode=self.mode
|
|
)
|
|
self.samplerate = self.get_samplerate()
|
|
self.format = self.get_format()
|
|
self.channels = self.get_channels()
|
|
self.frames = self.get_frames()
|
|
return self
|
|
else:
|
|
self.pysndfile_object = pysndfile.PySndfile(
|
|
self.filepath,
|
|
mode=self.mode,
|
|
format=self.format,
|
|
channels=self.channels,
|
|
samplerate=self.samplerate
|
|
)
|
|
return self
|
|
|
|
def open(self):
|
|
"""Use for opening the associated audio file outside of a with statement"""
|
|
self.logger.debug("Opening soundfile {0}".format(self.filepath))
|
|
return self.__enter__()
|
|
|
|
def close(self):
|
|
"""Use for closing the associated audio file outside of a with statement"""
|
|
self.logger.debug("Closing soundfile {0}".format(self.filepath))
|
|
self.pysndfile_object = None
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
"""Closes sound file when exiting 'with' statement."""
|
|
self.logger.debug("Closing soundfile {0}".format(self.filepath))
|
|
self.pysndfile_object = None
|
|
|
|
def __if_open(method):
|
|
"""Handles error from using methods when the audio file is closed"""
|
|
@wraps(method)
|
|
def wrapper(*args, **kwargs):
|
|
try:
|
|
return method(*args, **kwargs)
|
|
except AttributeError, err:
|
|
raise IOError("{0}: Audio file isn't open.".format(err), sys.exc_info()[2])
|
|
|
|
return wrapper
|
|
|
|
@__if_open
|
|
def get_channels(self):
|
|
"""Return number of channels of sndfile."""
|
|
self.channels = self.pysndfile_object.channels()
|
|
return self.channels
|
|
|
|
@__if_open
|
|
def encoding_str(self):
|
|
"""
|
|
Return string representation of encoding (e.g. pcm16).
|
|
|
|
See get_sndfile_encodings() for a list of available encoding strings
|
|
that are supported by a given sndfile format
|
|
"""
|
|
return self.pysndfile_object.encoding_str()
|
|
|
|
@__if_open
|
|
def error(self):
|
|
"""Report error numbers related to the current sound file."""
|
|
return self.pysndfile_object.error()
|
|
|
|
@__if_open
|
|
def get_format(self):
|
|
"""Return raw format specification from sndfile."""
|
|
return self.pysndfile_object.format()
|
|
|
|
@__if_open
|
|
def get_frames(self):
|
|
"""Return number of frames in file (number of samples per channel)."""
|
|
self.frames = self.pysndfile_object.frames()
|
|
return self.frames
|
|
|
|
@__if_open
|
|
def get_strings(self):
|
|
"""
|
|
get all stringtypes from the sound file.
|
|
|
|
see stringtype_name_top_id.keys() for the list of strings that
|
|
are supported by the libsndfile version you use.
|
|
"""
|
|
return self.pysndfile_object.get_strings()
|
|
|
|
@__if_open
|
|
def major_format_str(self):
|
|
"""
|
|
return short string representation of major format.
|
|
|
|
(e.g. aiff) see get_sndfile_formats() for a complete
|
|
list of file formats
|
|
"""
|
|
return self.pysndfile_object.major_format_str()
|
|
|
|
@__if_open
|
|
def read_frames(self, nframes=-1, dtype=np.float64):
|
|
"""
|
|
Read the given number of frames and fill numpy array.
|
|
|
|
Arguments
|
|
|
|
- nframes: <int>
|
|
|
|
- number of frames to read (default = -1 -> read all).
|
|
|
|
- dtype: <numpy dtype>
|
|
|
|
- dtype of the returned array containing read data (see note).
|
|
|
|
Notes:
|
|
One column per channel.
|
|
"""
|
|
return self.pysndfile_object.read_frames(nframes, dtype)
|
|
|
|
@__if_open
|
|
def rewind(self, mode='rw'):
|
|
"""
|
|
Rewind read/write/read and write position given by mode to
|
|
start of file.
|
|
"""
|
|
return self.pysndfile_object.format(mode)
|
|
|
|
@__if_open
|
|
def get_samplerate(self):
|
|
"""Return the samplerate of the file."""
|
|
return self.pysndfile_object.samplerate()
|
|
|
|
@__if_open
|
|
def seek(self, offset, whence=0, mode='rw'):
|
|
"""
|
|
Seek into audio file: similar to python seek function,
|
|
but taking only audio data into account.
|
|
|
|
Arguments
|
|
|
|
- offset: <int>
|
|
the number of frames (eg two samples for stereo files) to move
|
|
relatively to position set by whence.
|
|
|
|
- whence: <int>
|
|
only 0 (beginning), 1 (current) and 2 (end of the file) are valid.
|
|
|
|
- mode: <string>
|
|
If set to 'rw', both read and write pointers are updated. If 'r' is
|
|
given, only read pointer is updated, if 'w', only the write one is
|
|
(this may of course make sense only if you open the file in a certain
|
|
mode).
|
|
|
|
Returns
|
|
offset: int the number of frames from the beginning of the file
|
|
|
|
Notes:
|
|
Offset relative to audio data: meta-data are ignored.
|
|
if an invalid seek is given (beyond or before the file), an IOError
|
|
is raised; note that this is different from the seek method of a
|
|
File object.
|
|
"""
|
|
return self.pysndfile_object.seek(offset, whence, mode)
|
|
|
|
@__if_open
|
|
def seekable(self):
|
|
"""Return true for soundfiles that support seeking."""
|
|
return self.seekable()
|
|
|
|
@__if_open
|
|
def set_auto_clipping(self, arg=True):
|
|
"""
|
|
Enable auto clipping when reading/writing samples from/to sndfile.
|
|
|
|
auto clipping is enabled by default. auto clipping is required by
|
|
libsndfile to properly handle scaling between sndfiles with pcm
|
|
encoding and float representation of the samples in numpy. When auto
|
|
clipping is set to on reading pcm data into a float vector and writing
|
|
it back with libsndfile will reproduce the original samples. If auto
|
|
clipping is off, samples will be changed slightly as soon as the
|
|
amplitude is close to the sample range because libsndfile applies
|
|
slightly different scaling factors during read and write.
|
|
"""
|
|
return self.pysndfile_object.set_auto_clipping(arg)
|
|
|
|
@__if_open
|
|
def set_string(self, stringtype_name, string):
|
|
"""
|
|
Set one of the stringtypes to the string given as argument.
|
|
|
|
If you try to write a stringtype that is not supported by the library
|
|
a RuntimeError will be raised
|
|
"""
|
|
return self.pysndfile_object.set_string(stringtype_name, string)
|
|
|
|
@__if_open
|
|
def strError(self):
|
|
"""Report error strings related to the current sound file."""
|
|
return self.pysndfile_object.strError()
|
|
|
|
@__if_open
|
|
def writeSync(self):
|
|
"""
|
|
Call the operating system's function to force the writing of all file
|
|
cache buffers to disk.
|
|
No effect if file is open as read
|
|
"""
|
|
return self.pysndfile_object.writeSync()
|
|
|
|
@__if_open
|
|
def write_frames(self, input):
|
|
"""
|
|
write 1 or 2 dimensional array into sndfile.
|
|
|
|
Arguments:
|
|
|
|
- input: <numpy array>
|
|
containing data to write.
|
|
|
|
Notes:
|
|
|
|
One column per channel.
|
|
|
|
updates the write pointer.
|
|
|
|
if the input type is float, and the file encoding is an integer type,
|
|
you should make sure the input data are normalized normalized data
|
|
(that is in the range [-1..1] - which will corresponds to the maximum
|
|
range allowed by the integer bitwidth).
|
|
"""
|
|
return self.pysndfile_object.write_frames(input)
|
|
|
|
@__if_open
|
|
def construct_format(self, major, encoding):
|
|
"""
|
|
construct a format specification for libsndfile from major format
|
|
string and encoding string
|
|
"""
|
|
return self.pysndfile_object.construct_format(major, encoding)
|
|
|
|
@__if_open
|
|
def get_pysndfile_version(self):
|
|
"""return tuple describing the version of pysndfile"""
|
|
return self.pysndfile_object.get_pysndfile_version()
|
|
|
|
@__if_open
|
|
def get_sndfile_version(self):
|
|
"""
|
|
return a tuple of ints representing the version of the libsdnfile that
|
|
is used
|
|
"""
|
|
return self.pysndfile_object.get_sndfile_version()
|
|
|
|
@__if_open
|
|
def get_sndfile_formats(self):
|
|
"""
|
|
Return lists of available file formats supported by libsndfile and
|
|
pysndfile.
|
|
"""
|
|
return self.pysndfile_object.get_sndfile_formats()
|
|
|
|
@__if_open
|
|
def get_sndfile_encodings(self, major):
|
|
"""
|
|
Return lists of available encoding for the given sndfile format.
|
|
|
|
Arguments:
|
|
|
|
- major: Major sndfile format for the list of available formats.
|
|
format should be specified as a string, using one of the strings
|
|
returned by get_sndfile_formats()
|
|
"""
|
|
return self.pysndfile_object.get_sndfile_encodings(major)
|
|
|
|
@__if_open
|
|
def read_grain(self, start_index=0, grain_size=None, padding=True):
|
|
"""
|
|
Read a grain of audio from the file. if grain ends after the end of
|
|
the file, the grain can be padded with zeros using the padding
|
|
argument.
|
|
|
|
Audio object seeker is not changed
|
|
|
|
Arguments:
|
|
|
|
- start_index: the index in samples to read from.
|
|
|
|
- grain_size: The size of the grain (in samples) to read
|
|
|
|
- padding: if the end of the audio file is reaches, the grain will be
|
|
padded with additional zeros.
|
|
"""
|
|
self.switch_mode('r')
|
|
if start_index < 0:
|
|
start_index = self.get_frames() + start_index
|
|
if not grain_size:
|
|
grain_size = self.get_frames()
|
|
grain_size = int(grain_size)
|
|
position = self.get_seek_position()
|
|
# Read grain
|
|
index = self.pysndfile_object.seek(start_index, 0)
|
|
if index + grain_size > self.get_frames():
|
|
grain = self.read_frames(self.get_frames() - index)
|
|
if padding:
|
|
grain = np.pad(
|
|
grain,
|
|
(0, index + grain_size - self.get_frames()),
|
|
'constant',
|
|
constant_values=(0, 0)
|
|
)
|
|
else:
|
|
grain = self.read_frames(grain_size)
|
|
self.seek(position, 0)
|
|
return grain
|
|
|
|
def normalize_file(self, overwrite_original=False):
|
|
"""Normalizes the entire file"""
|
|
# Get current file name and it's extension
|
|
(current_filename, current_fileextension) = (
|
|
os.path.splitext(self.filepath)
|
|
)
|
|
# Create a seperate filepath to use for the mono file to be created
|
|
normalized_filename = ''.join(
|
|
(current_filename, ".norm", current_fileextension)
|
|
)
|
|
# If the mono audio file already exists then use that to replace the
|
|
# stereo file, rather than computing again from scratch
|
|
if os.path.exists(normalized_filename):
|
|
self.replace_audiofile(normalized_filename)
|
|
return None
|
|
# Create the empty mono file object
|
|
normalized_file = AudioFile(
|
|
normalized_filename,
|
|
mode='w',
|
|
format=self.format,
|
|
channels=1,
|
|
samplerate=self.samplerate
|
|
).open()
|
|
# Read current file in chunks and convert to mono by deviding all
|
|
# samples by 2 and combining to create a single signal
|
|
self.seek(0, 0)
|
|
self.switch_mode('r')
|
|
samples = self.pysndfile_object.read_frames()
|
|
self.normalize_audio(samples)
|
|
normalized_file.write_frames(samples)
|
|
normalized_file.switch_mode('r')
|
|
|
|
# If overwriting the original sound file, delete the original stereo
|
|
# audio file from the system and replace the audio object with the mono
|
|
# audio object created earlier. Re-name the mono audio file to be the
|
|
# same as the audio file it was replacing
|
|
if overwrite_original:
|
|
self.replace_audiofile(normalized_filename)
|
|
return None
|
|
else:
|
|
return normalized_file
|
|
|
|
def check_mono(self):
|
|
"""Check that the audio file is a mono audio file"""
|
|
if self.channels != 1:
|
|
return False
|
|
return True
|
|
|
|
def replace_audiofile(self, replacement_filename):
|
|
"""
|
|
Replace the current audiofile and audiofile object with the file
|
|
specified.
|
|
"""
|
|
pathops.file_must_exist(replacement_filename)
|
|
self.close()
|
|
os.remove(self.filepath)
|
|
os.rename(replacement_filename, self.filepath)
|
|
self.mode = 'r'
|
|
self.__enter__()
|
|
|
|
def convert_to_mono(self, overwrite_original=False):
|
|
"""
|
|
Converts stereo audiofiles to mono.
|
|
|
|
Arguments:
|
|
|
|
- overwrite_original: If True then the current object will be
|
|
reloaded as the mono file. Otherwise, the new mono file will be
|
|
returned as a new AudioFile object.
|
|
"""
|
|
# TODO: Implement mixdown for multi-channel audio other than 2 channel
|
|
# stereo.
|
|
|
|
# Get current file name and it's extension
|
|
(current_filename, current_fileextension) = (
|
|
os.path.splitext(self.filepath)
|
|
)
|
|
# Create a seperate filepath to use for the mono file to be created
|
|
mono_filename = ''.join(
|
|
(current_filename, ".mono", current_fileextension)
|
|
)
|
|
# If the mono audio file already exists then use that to replace the
|
|
# stereo file, rather than computing again from scratch
|
|
if os.path.exists(mono_filename):
|
|
self.replace_audiofile(mono_filename)
|
|
return None
|
|
# If the file is already mono then stop
|
|
if self.channels == 1:
|
|
return None
|
|
# Create the empty mono file object
|
|
mono_file = AudioFile(
|
|
mono_filename,
|
|
mode='w',
|
|
format=self.get_format(),
|
|
channels=1,
|
|
samplerate=self.get_samplerate()
|
|
).open()
|
|
# Read current file in chunks and convert to mono by deviding all
|
|
# samples by 2 and combining to create a single signal
|
|
self.seek(0, 0)
|
|
i = 0
|
|
chunk_size = 2048
|
|
while i < self.get_frames():
|
|
chunk = self.read_grain(i, chunk_size, padding=False)
|
|
chunk = ((chunk[:, 0] * 0.5) + (chunk[:, 1] * 0.5))
|
|
mono_file.write_frames(chunk)
|
|
i += chunk_size
|
|
# If overwriting the original sound file, delete the original stereo
|
|
# audio file from the system and replace the audio object with the mono
|
|
# audio object created earlier. Re-name the mono audio file to be the
|
|
# same as the audio file it was replacing
|
|
if overwrite_original:
|
|
del mono_file
|
|
self.replace_audiofile(mono_filename)
|
|
return None
|
|
else:
|
|
return mono_file
|
|
|
|
@__if_open
|
|
def rename_file(self, filename):
|
|
"""
|
|
Renames the audio file associated with the object to the name
|
|
specified as an argument
|
|
|
|
Arguments:
|
|
|
|
- filename: the new path of the audio file.
|
|
"""
|
|
# TODO: Consider the race condition here. Is this a problem?
|
|
# Check name doesn't already exist
|
|
if os.path.exists(filename):
|
|
raise ValueError("The filepath: {0} is an already existing file")
|
|
# Check name is a valid file path
|
|
if not os.path.exists(os.path.dirname(filename)):
|
|
raise ValueError("The filepath: {0} does not point to an existing "
|
|
"directory".format(filename))
|
|
# Check name has the same extension as previous file
|
|
old_ext = os.path.splitext(self.filepath)[1]
|
|
new_ext = os.path.splitext(filename)[1]
|
|
if old_ext != new_ext:
|
|
raise ValueError("The renamed file's extension ({0})"
|
|
"must be the same as the original extension"
|
|
"({1})".format(old_ext, new_ext))
|
|
# Delete pysndfile object
|
|
seek = self.get_seek_position()
|
|
del self.pysndfile_object
|
|
# Rename file
|
|
os.rename(self.filepath, filename)
|
|
# Reinitialize pysndfile object
|
|
self.pysndfile_object = pysndfile.PySndfile(
|
|
filename,
|
|
mode='r',
|
|
format=self.format,
|
|
samplerate=self.samplerate,
|
|
channels=self.channels
|
|
)
|
|
self.filepath = filename
|
|
# Re-set seek position to previous position
|
|
self.seek(seek, 0)
|
|
|
|
def get_seek_position(self):
|
|
"""Returns the current seeker position in the file"""
|
|
return self.seek(0, 1)
|
|
|
|
def ms_to_samps(self, ms):
|
|
"""
|
|
Converts milliseconds to samples based on the sample rate of the audio
|
|
file
|
|
"""
|
|
seconds = ms / 1000.0
|
|
return int(round(seconds * self.samplerate))
|
|
|
|
def secs_to_samps(self, seconds):
|
|
"""
|
|
Converts seconds to samples based on the sample rate of the audio file
|
|
"""
|
|
return int(round(seconds * self.samplerate))
|
|
|
|
def samps_to_secs(self, samps):
|
|
"""
|
|
Converts samples to seconds based on the sample rate of the audio
|
|
file
|
|
"""
|
|
return float(samps) / self.samplerate
|
|
|
|
def samps_to_ms(self, samps):
|
|
"""
|
|
Convert samples to milliseconds based on the sample rate of the audio
|
|
file
|
|
"""
|
|
return float(samps) / self.samplerate * 1000.0
|
|
|
|
def fade_audio(self, audio, position, fade_time, mode):
|
|
"""
|
|
Fade the audio in or out linearly from the position specified over the
|
|
time specified.
|
|
|
|
Arguments:
|
|
|
|
- audio: A numpy array of audio to manipulate
|
|
|
|
- start_position: The starting position to begin the fade from (ms)
|
|
|
|
- fade_time: The length of the fade (ms)
|
|
|
|
- mode: choose to fade the audio in or out (string: "in" or "out")
|
|
"""
|
|
if mode == "in":
|
|
# Calculate the amplitude values to multiply the audio by
|
|
fade = np.linspace(0.0, 1.0, self.ms_to_samps(fade_time))
|
|
position = self.ms_to_samps(position)
|
|
# multiply samples by the fade values from the start position for
|
|
# the duration of the fade
|
|
audio[position:position+fade.size] *= fade
|
|
# zero any samples before the fade in
|
|
audio[:position] *= 0
|
|
|
|
elif mode == "out":
|
|
# Calculate the amplitude values to multiply the audio by
|
|
fade = np.linspace(1.0, 0.0, self.ms_to_samps(fade_time))
|
|
position = self.ms_to_samps(position)
|
|
# multiply samples by the fade values from the start position for
|
|
# the duration of the fade
|
|
audio[position:position+fade.size] *= fade
|
|
# zero any samples after the fade in
|
|
audio[position+fade.size:] *= 0
|
|
else:
|
|
self.logger.exception("{0} is not a valid fade option. Use either \"in\" or "
|
|
"\"out\"".format(mode))
|
|
raise ValueError
|
|
return audio
|
|
|
|
def check_not_empty(self):
|
|
"""Check that the file contains audio"""
|
|
if self.get_frames() > 0:
|
|
return True
|
|
return False
|
|
|
|
def check_valid(self, force_mono=False):
|
|
"""
|
|
Test to make sure that the audio file is valid for use.
|
|
ie mono, not empty
|
|
"""
|
|
if not self.check_mono():
|
|
if force_mono:
|
|
self.convert_to_mono(overwrite_original=True)
|
|
return True
|
|
return False
|
|
if not self.check_not_empty():
|
|
return False
|
|
return True
|
|
|
|
def switch_mode(self, mode):
|
|
"""
|
|
Switch audiofile to mode specified.
|
|
|
|
This allows for convenient reading and writing of audiofiles without
|
|
direct closing and opening of the underlying pysndfile object.
|
|
"""
|
|
assert mode == 'r' or mode == 'w'
|
|
# Change mode only if it is different from the currently set mode
|
|
if self.mode != mode:
|
|
seek = self.get_seek_position()
|
|
del self.pysndfile_object
|
|
self.mode = mode
|
|
self.__enter__()
|
|
self.pysndfile_object.seek(seek)
|
|
|
|
def generate_grain_times(self, grain_length, overlap):
|
|
"""
|
|
Generates an array of start and finish pairs based on overlapping
|
|
frames at the grain length specified.
|
|
|
|
Note that only full grains within the size of the sample are returned.
|
|
incomplete grains found at the end of files are ignored.
|
|
|
|
Arguments:
|
|
|
|
- grain_length: length of each grain in seconds.
|
|
|
|
- overlap: the factor by which grains overlap (integer)
|
|
"""
|
|
length = self.samps_to_ms(self.frames)
|
|
hop_size = grain_length / overlap
|
|
grain_count = int(length / hop_size) - 1
|
|
times = np.arange(grain_count).reshape(-1, 1)
|
|
times = np.hstack((times, times)).astype(np.dtype('float64'))
|
|
times *= hop_size
|
|
times[:, 1] += grain_length
|
|
# Save grain times as a member variable for later refference.
|
|
self.times = times
|
|
return times
|
|
|
|
def __getitem__(self, key):
|
|
"""
|
|
Allow for grains to be retreived by indexing after grain times have been generated.
|
|
"""
|
|
if self.times == None:
|
|
raise IndexError("AudioFile object grain times must be generated "
|
|
"before grains can be accesed by index. Try running "
|
|
"AnalysedAudioFile.generate_grain_times(grain_size, "
|
|
"overlap)")
|
|
grain_times = self.times[key].copy()
|
|
grain_times *= (self.samplerate / 1000)
|
|
return self.read_grain(start_index=grain_times[0], grain_size=grain_times[1]-grain_times[0])
|
|
|
|
|
|
@staticmethod
|
|
def gen_window(window_type, window_size, sym=True):
|
|
"""
|
|
Generates a window function of given size and type
|
|
Returns a 1D numpy array
|
|
|
|
sym: Used in the triangle window generation. When True (default),
|
|
generates a symmetric window, for use in filter design. When False,
|
|
generates a periodic window, for use in spectral analysis
|
|
|
|
Available window types:
|
|
|
|
- hanning
|
|
|
|
- hamming
|
|
|
|
- bartlett
|
|
|
|
- blackman
|
|
|
|
- kaiser
|
|
|
|
- triangle
|
|
"""
|
|
if window_type is "hanning":
|
|
return np.hanning(window_size)
|
|
elif window_type is "hamming":
|
|
return np.hamming(window_size)
|
|
elif window_type is "bartlett":
|
|
return np.bartlett(window_size)
|
|
elif window_type is "blackman":
|
|
return np.blackman(window_size)
|
|
elif window_type is "kaiser":
|
|
return np.kaiser(window_size)
|
|
elif window_type is "triangle":
|
|
return signal.triang(window_size, sym=sym)
|
|
else:
|
|
raise ValueError("'{0}' is not a valid window"
|
|
" type".format(window_type))
|
|
|
|
@staticmethod
|
|
def normalize_audio(audio, maximum=1.0):
|
|
"""
|
|
Normalize array of audio so that the maximum sample value is equal to
|
|
the maximum provided
|
|
"""
|
|
if audio.size < 1:
|
|
raise ValueError("Audio array is empty. Cannot be normalized""")
|
|
max_sample = np.max(np.abs(audio))
|
|
audio *= maximum / max_sample
|
|
return audio
|
|
|
|
@staticmethod
|
|
def mono_arrays_to_stereo(array1, array2):
|
|
"""
|
|
Converts two horizontal numpy arrays to one concatenated verticaly
|
|
stacked array that can be written to a stereo file.
|
|
"""
|
|
return np.hstack((np.vstack(array1), np.vstack(array2)))
|
|
|
|
@staticmethod
|
|
def gen_white_noise(length, gain):
|
|
"""
|
|
Generate mono white noise of the number of samples specified.
|
|
|
|
Arguments:
|
|
|
|
- length (samples)
|
|
|
|
- gain (silence 0.0 - full volume 1.0)
|
|
"""
|
|
return np.random.uniform(low=-gain, high=gain, size=length)
|
|
|
|
@staticmethod
|
|
def gen_default_wav(path, overwrite_existing=False, mode='w', channels=1):
|
|
"""
|
|
Convenience method that creates a wav file with the following spec at
|
|
the path given:
|
|
|
|
- Samplerate: 44.1Khz
|
|
|
|
- Bit rate: 24Bit
|
|
"""
|
|
if os.path.exists(path):
|
|
if not overwrite_existing:
|
|
raise IOError(
|
|
''.join(("File: \"", path, "\" already exists."))
|
|
)
|
|
else:
|
|
os.remove(path)
|
|
|
|
return AudioFile(
|
|
path,
|
|
mode,
|
|
format=pysndfile.construct_format("wav", "pcm24"),
|
|
channels=channels,
|
|
samplerate=44100
|
|
).open()
|
|
|
|
def __repr__(self):
|
|
return 'AudioFile(name={0}, wav={1})'.format(self.name, self.filepath)
|
|
|
|
|
|
class AnalysedAudioFile(AudioFile):

    """
    Generates and stores analysis information for an audio file.

    Arguments:

    - (All AudioFile arguments)

    - db_dir: if the object is part of a database, this is the path to the root
      of the database.

    - config: The config module used for configuration settings. See examples
      for further details.

    - data_file: the HDF5 file object used to store analyses for this
      audio file.

    - reanalyse: bool for whether to overwrite any previously created analyses
      for this audio file.

    - analyses: a list of strings for each analyses to be generated.
      ie. ['f0', 'rms']
    """

    def __init__(self, *args, **kwargs):
        # Initialise the AudioFile parent class. It accepts and ignores the
        # extra keyword arguments that are consumed below.
        super(AnalysedAudioFile, self).__init__(*args, **kwargs)
        # Set after the parent constructor, which assigns its own logger.
        self.logger = logging.getLogger('audiofile.AnalysedAudioFile')

        # Stores the path to the database if object is part of a database.
        self.db_dir = kwargs.pop('db_dir', None)

        # Store configuration file used for various settings.
        self.config = kwargs.pop('config', None)

        # References the HDF5 file object to use for storing analysis data.
        analysis_file = kwargs.pop('data_file', None)
        self.analysis_storage = self.create_analysis_group(analysis_file)

        # If True then files are re-analysed, discarding any previous analysis.
        self.force_analysis = kwargs.pop('reanalyse', False)

        # Tags of the analyses to be created for the file (see
        # create_analysis for the recognised tags).
        self.available_analyses = kwargs.pop("analyses", None)

    def create_analysis(self):
        """
        Generate all analyses that have been set in the
        self.available_analyses member.

        Each requested analysis object is stored in self.analyses under its
        tag. Nothing is generated when no analyses were requested.
        """
        analysis_object = namedtuple("AnalysisObject", "name, analysis_object")
        analysis_object_list = [
            analysis_object("fft", FFTAnalysis),
            analysis_object("rms", RMSAnalysis),
            analysis_object("zerox", ZeroXAnalysis),
            analysis_object("spccntr", SpectralCentroidAnalysis),
            analysis_object("spcsprd", SpectralSpreadAnalysis),
            analysis_object("spcflux", SpectralFluxAnalysis),
            analysis_object("spccf", SpectralCrestFactorAnalysis),
            analysis_object("spcflatness", SpectralFlatnessAnalysis),
            analysis_object("f0", F0Analysis),
            analysis_object("peak", PeakAnalysis),
            analysis_object("centroid", CentroidAnalysis),
            analysis_object("variance", VarianceAnalysis),
            analysis_object("kurtosis", KurtosisAnalysis),
            analysis_object("skewness", SkewnessAnalysis),
            analysis_object("harm_ratio", F0HarmRatioAnalysis)
        ]

        # defaultdict(None) has no default factory, so missing tags still
        # raise KeyError exactly like a plain dict.
        self.analyses = defaultdict(None)

        # ``analyses`` is optional; treat a missing value as "nothing
        # requested" rather than failing on ``in None``.
        requested = self.available_analyses or ()

        # Create the analysis objects in the order listed above, flushing the
        # storage file and collecting garbage after each one.
        for analysis in analysis_object_list:
            if analysis.name not in requested:
                continue
            self.analyses[analysis.name] = analysis.analysis_object(
                self,
                self.analysis_storage,
                config=self.config
            )
            self.analysis_storage.file.flush()
            gc.collect()

    def create_analysis_group(self, analysis_file):
        """
        Create HDF5 group for object to store analyses for this audio file.

        Audio file analyses are organized in groups per audio file.
        This function creates a group in the analysis HDF5 file with the name
        of the audio file. Analyses of this file are stored in analysis
        sub-groups.

        Arguments:

        - analysis_file: an open HDF5 file object, or None to create
          "<name>_analysis_data.hdf5" next to the audio file.

        Returns the HDF5 group for this audio file.

        Raises IOError if no analysis file is given for a database object.
        """
        # Fall back to the audio file's own name when no object name was set.
        name = self.name or os.path.basename(self.filepath)

        # If an analysis file object is not provided, try to create one based
        # on the object's name and audio file location.
        if not analysis_file:
            if self.db_dir:
                # Raise error as database should have analysis file.
                raise IOError("Database doesn't have an analysis file.")
            directory = os.path.split(self.filepath)[0]
            data_name = '_'.join(
                (os.path.splitext(name)[0], 'analysis_data.hdf5')
            )
            analysis_file = h5py.File(os.path.join(directory, data_name), 'a')

        # Create a group to store analyses for this file in
        group_name = ''.join(("analysis/", name))
        try:
            analysis_file.create_group(group_name)
        except ValueError:
            self.logger.warning("A file with the same name ({0}) already "
                                "exists in the analysis data. Using data from "
                                "this file.".format(group_name))
        group = analysis_file[group_name]
        group.attrs['filepath'] = self.filepath
        return group

    def __enter__(self):
        """
        Allow AudioFile object to be opened by 'with' statements.

        The file is converted to mono if necessary. Raises IOError if it is
        still not valid afterwards.
        """
        super(AnalysedAudioFile, self).__enter__()
        if not self.check_valid(force_mono=True):
            raise IOError(
                "File isn't valid: {0}\nCheck that file is mono and isn't "
                "empty".format(self.name))

        return self

    def open(self):
        """
        Return the object without opening the underlying audio file.

        NOTE(review): unlike AudioFile.open() this does not call __enter__();
        use a 'with' statement to actually open the file - confirm intended.
        """
        return self

    def analysis_data_grains(self, times, analysis, *args, **kwargs):
        """
        retrieve data for analysis within start and end time pairs in the
        format specified.

        Arguments:

        - times: an array of start and end times to retrieve analysis from
          (np.array). A single start/end pair is also accepted.

        - analysis: analysis string specifying analysis to retrieve

        - format (keyword, optional): passed to the analysis object's
          analysis_formatter to format the retrieved frames.

        Returns a tuple of (analysis_frames, selection).
        """
        format_type = kwargs.pop("format", None)
        analysis_object = self.analyses[analysis]

        # Promote a single [start, end] pair to a one-row 2D array.
        if len(times.shape) != 2:
            times = np.array([times])
        analysis_frames, selection = analysis_object.get_analysis_grains(
            times[:, 0],
            times[:, 1]
        )

        if format_type:
            analysis_frames = analysis_object.analysis_formatter(
                analysis_frames,
                selection,
                format_type
            )

        return analysis_frames, selection

    def __repr__(self):
        return ('AnalysedAudioFile(name={0})'.format(self.name))
|