Source code for pypam.acoustic_file

__author__ = "Clea Parcerisas"
__version__ = "0.1"
__credits__ = "Clea Parcerisas"
__email__ = "clea.parcerisas@vliz.be"
__status__ = "Development"

import datetime
import operator
import os
import pathlib
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import soundfile as sf
import xarray
from tqdm.auto import tqdm

from pypam import nmf
from pypam import plots
from pypam import signal as sig
from pypam import utils
from pypam import units as output_units

pd.plotting.register_matplotlib_converters()
plt.rcParams.update({'pcolor.shading': 'auto'})

# Apply the default theme
sns.set_theme()


[docs]class AcuFile: """ Data recorded in a wav file. Parameters ---------- sfile : Sound file Can be a path or a file object hydrophone : Object for the class hydrophone p_ref : Float Reference pressure in upa timezone: datetime.tzinfo, pytz.tzinfo.BaseTZInfo, dateutil.tz.tz.tzfile, str or None Timezone where the data was recorded in channel : int Channel to perform the calculations in calibration: float, -1 or None If it is a float, it is the time ignored at the beginning of the file. If None, nothing is done. If negative, the function calibrate from the hydrophone is performed, and the first samples ignored (and hydrophone updated) dc_subtract: bool Set to True to subtract the dc noise (root mean squared value """ def __init__(self, sfile, hydrophone, p_ref, timezone='UTC', channel=0, calibration=None, dc_subtract=False): # Save hydrophone model self.hydrophone = hydrophone # Get the date from the name if type(sfile) == str: file_name = os.path.split(sfile)[-1] elif issubclass(sfile.__class__, pathlib.Path): file_name = sfile.name elif issubclass(sfile.__class__, zipfile.ZipExtFile): file_name = sfile.name else: raise Exception('The filename has to be either a Path object or a string') try: self.date = hydrophone.get_name_datetime(file_name) except ValueError: self.date = datetime.datetime.now() print('Filename %s does not match the %s file structure. Setting time to now...' % (file_name, self.hydrophone.name)) # Signal self.file_path = sfile self.file = sf.SoundFile(self.file_path, 'r') self.fs = self.file.samplerate # Reference pressure in upa self.p_ref = p_ref # Work on local time or UTC time self.timezone = timezone # datetime will always be in UTC self.datetime_timezone = 'UTC' # Select channel self.channel = channel # Set an empty wav array self.wav = None self.time = None # Set a starting frame for the file if calibration is None: self._start_frame = 0 elif calibration < 0: self._start_frame = self.hydrophone.calibrate(self.file_path) if self._start_frame is None: self._start_frame = 0 else: self._start_frame = int(calibration * self.fs) self.calibration = calibration self.dc_subtract = dc_subtract def __getattr__(self, name): """ Specific methods to make it easier to access attributes """ if name == 'signal': return self.signal('upa') elif name == 'time': if self.__dict__[name] is None: return self._time_array(binsize=1 / self.fs)[0] else: return self.__dict__[name] else: return self.__dict__[name] def _bins(self, binsize=None, bin_overlap=0): """ Yields the bins each binsize Parameters ---------- binsize: float or None Number of seconds per bin to yield. If set to None, a single bin is yield for the entire file bin_overlap : float [0 to 1] Percentage to overlap the bin windows Returns ------- Iterates through all the bins, yields i, time_bin, signal, start_sample, end_sample Where i is the index, time_bin is the datetime of the beginning of the block and signal is the signal object of the bin """ if binsize is None: blocksize = self.file.frames - self._start_frame else: blocksize = self.samples(binsize) noverlap = int(bin_overlap * blocksize) n_blocks = self._n_blocks(blocksize, noverlap=noverlap) time_array, _, _ = self._time_array(binsize, bin_overlap=bin_overlap) for i, block in tqdm(enumerate(sf.blocks(self.file_path, blocksize=blocksize, start=self._start_frame, overlap=bin_overlap, always_2d=True, fill_value=0.0)), total=n_blocks, leave=False, position=0): # Select the desired channel block = block[:, self.channel] time_bin = time_array[i] # Read the signal and prepare it for analysis signal_upa = self.wav2upa(wav=block) signal = sig.Signal(signal=signal_upa, fs=self.fs, channel=self.channel) if self.dc_subtract: signal.remove_dc() start_sample = i * blocksize + self._start_frame end_sample = start_sample + len(signal_upa) yield i, time_bin, signal, start_sample, end_sample self.file.seek(0) def _n_blocks(self, blocksize, noverlap): return int(np.floor(self.file.frames - self._start_frame) / (blocksize - noverlap)) def samples(self, bintime): """ Return the samples according to the fs Parameters ---------- bintime : Float Seconds of bintime desired to convert to samples """ return int(bintime * self.fs) def set_calibration_time(self, calibration_time): """ Set a calibration time in seconds. This time will be ignored in the processing Parameters ---------- calibration_time : float Seconds to ignore at the beginning of the file """ self._start_frame = int(calibration_time * self.fs) def instrument(self): """ Instrument will be the name of the hydrophone """ return self.hydrophone.name def total_time(self): """ Return the total time in seconds of the file """ return self.samples2time(self.file.frames) def samples2time(self, samples): """ Return the samples according to the fs Parameters ---------- samples : Int Number of samples to convert to seconds """ return float(samples) / self.fs def is_in_period(self, period): """ Return True if the WHOLE file is included in the specified period Parameters ---------- period : list or a tuple with (start, end). Values have to be a datetime object """ if period is None: return True else: return (self.date >= period[0]) & (self.date <= period[1]) def contains_date(self, date): """ Return True if data is included in the file Parameters ---------- date : datetime object Datetime to check """ end = self.date + datetime.timedelta(seconds=self.file.frames / self.fs) return (self.date < date) & (end > date) def split(self, date): """ Save two different files out of one splitting on the specified date Parameters ---------- date : datetime object Datetime where to split the file """ if issubclass(self.file_path, zipfile.ZipExtFile): raise Exception('The split method is not implemented for zipped files') if not self.contains_date(date): raise Exception('This date is not included in the file!') else: self.file.seek(0) seconds = (date - self.date).seconds frames = self.samples(seconds) first_file = self.file.read(frames=frames) second_file = self.file.read() self.file.close() new_file_name = self.hydrophone.get_new_name(filename=self.file_path.name, new_date=date) new_file_path = self.file_path.parent.joinpath(new_file_name) sf.write(self.file_path, first_file, samplerate=self.fs) sf.write(new_file_path, second_file, samplerate=self.fs) return self.file_path, new_file_path def freq_resolution_window(self, freq_resolution): """ Given the frequency resolution, window length needed to obtain it Returns window length in samples Parameters ---------- freq_resolution : int Must be a power of 2, in Hz """ n = np.log2(self.fs / freq_resolution) nfft = 2 ** n if nfft > self.file.frames: raise Exception('This is not achievable with this sampling rate, ' 'it must be downsampled!') return nfft def signal(self, units='wav'): """ Returns the signal in the specified units Parameters ---------- units : string Units in which to return the signal. Can be 'wav', 'db', 'upa', 'Pa' or 'acc'. """ # First time, read the file and store it to not read it over and over if self.wav is None: self.wav = self.file.read() self.file.seek(0) if units == 'wav': signal = self.wav elif units == 'db': signal = self.wav2db() elif units == 'upa': signal = self.wav2upa() elif units == 'Pa': signal = self.wav2upa() / 1e6 elif units == 'acc': signal = self.wav2acc() else: raise Exception('%s is not implemented as an outcome unit' % units) return signal def _time_array(self, binsize=None, bin_overlap=0): """ Return a time array for each point of the signal """ if binsize is None: total_block = self.file.frames - self._start_frame else: total_block = self.samples(binsize) blocksize = total_block - int(total_block) * bin_overlap blocks_samples = np.arange(start=self._start_frame, stop=self.file.frames - 1, step=blocksize) end_samples = blocks_samples + blocksize incr = pd.to_timedelta(blocks_samples / self.fs, unit='seconds') self.time = self.date + datetime.timedelta(seconds=self._start_frame / self.fs) + incr if self.timezone != 'UTC': self.time = pd.to_datetime(self.time).tz_localize(self.timezone).tz_convert('UTC').tz_convert(None) return self.time, blocks_samples.astype(int), end_samples.astype(int) def timestamp_da(self, binsize=None, bin_overlap=0): time_array, start_samples, end_samples = self._time_array(binsize, bin_overlap) ds = xarray.Dataset(coords={'id': np.arange(len(time_array)), 'datetime': ('id', time_array.values), 'start_sample': ('id', start_samples), 'end_sample': ('id', end_samples)}) return ds def wav2upa(self, wav=None): """ Compute the pressure from the wav signal Parameters ---------- wav : ndarray Signal in wav (-1 to 1) """ # Read if no signal is passed if wav is None: wav = self.signal('wav') # First convert it to Volts and then to Pascals according to sensitivity mv = 10 ** (self.hydrophone.sensitivity / 20.0) * self.p_ref ma = 10 ** (self.hydrophone.preamp_gain / 20.0) * self.p_ref gain_upa = (self.hydrophone.Vpp / 2.0) / (mv * ma) return utils.set_gain(wave=wav, gain=gain_upa) def wav2db(self, wav=None): """ Compute the db from the wav signal. Consider the hydrophone sensitivity in db. If wav is None, it will read the whole file. Parameters ---------- wav : ndarray Signal in wav (-1 to 1) """ # Read if no signal is passed if wav is None: wav = self.signal('wav') upa = self.wav2upa(wav) return utils.to_db(wave=upa, ref=self.p_ref, square=True) def db2upa(self, db=None): """ Compute the upa from the db signals. If db is None, it will read the whole file. Parameters ---------- db : ndarray Signal in db """ if db is None: db = self.signal('db') # return np.power(10, db / 20.0 - np.log10(self.p_ref)) return utils.to_mag(wave=db, ref=self.p_ref) def upa2db(self, upa=None): """ Compute the db from the upa signal. If upa is None, it will read the whole file. Parameters ---------- upa : ndarray Signal in upa """ if upa is None: upa = self.signal('upa') return utils.to_db(upa, ref=self.p_ref, square=True) def wav2acc(self, wav=None): """ Convert the wav file to acceleration. If wav is None, it will read the whole file. Parameters ---------- wav : ndarray Signal in wav (-1 to 1) """ if wav is None: wav = self.file.read() mv = 10 ** (self.hydrophone.mems_sensitivity / 20.0) return wav / mv def _get_metadata_attrs(self): metadata_keys = [ 'file_path', 'timezone', 'datetime_timezone', 'p_ref', 'channel', '_start_frame', 'calibration', 'dc_subtract', 'fs', 'hydrophone.name', 'hydrophone.model', 'hydrophone.sensitivity', 'hydrophone.preamp_gain', 'hydrophone.Vpp' ] metadata_attrs = {} for k in metadata_keys: d = self for sub_k in k.split('.'): d = d.__dict__[sub_k] if isinstance(d, pathlib.Path): d = str(d) if d is None: d = 0 metadata_attrs[k.replace('.', '_')] = d return metadata_attrs def _apply_multiple(self, method_list, binsize=None, band_list=None, bin_overlap=0, **kwargs): """ Apply multiple methods per bin to save computational time Parameters ---------- method_list: list of strings List of all the methods to apply band_list: list of tuples, tuple or None Bands to filter. Can be multiple bands (all of them will be analyzed) or only one band. A band is represented with a tuple as (low_freq, high_freq). If set to None, the broadband up to the Nyquist frequency will be analyzed binsize: float Length in seconds of the bins to analyze bin_overlap : float [0 to 1] Percentage to overlap the bin windows kwargs: any parameters that have to be passed to the methods Returns ------- DataFrame with time as index and a multiindex column with band, method as levels. """ # TODO decide if it is downsampled or not downsample = False # Bands selected to study if band_list is None: band_list = [[None, self.fs / 2]] # Sort bands to diminish downsampling efforts! sorted_bands = [] for band in band_list: if len(sorted_bands) == 0: sorted_bands = [band] else: if band[1] >= sorted_bands[-1][1]: sorted_bands = [band] + sorted_bands else: sorted_bands = sorted_bands + [band] log = True if 'db' in kwargs.keys(): if not kwargs['db']: log = False # Define an empty dataset ds = xarray.Dataset() for i, time_bin, signal, start_sample, end_sample in self._bins(binsize, bin_overlap=bin_overlap): ds_bands = xarray.Dataset() for j, band in enumerate(sorted_bands): signal.set_band(band, downsample=downsample) methods_output = xarray.Dataset() for method_name in method_list: f = operator.methodcaller(method_name, **kwargs) try: output = f(signal) except Exception as e: print('There was an error in band %s, feature %s. Setting to None. ' 'Error: %s' % (band, method_name, e)) output = None units_attrs = output_units.get_units_attrs(method_name=method_name, log=log, p_ref=self.p_ref, **kwargs) methods_output[method_name] = xarray.DataArray([[output]], coords={'id': [i], 'datetime': ('id', [time_bin]), 'start_sample': ('id', [start_sample]), 'end_sample': ( 'id', [end_sample]), 'band': [j], 'low_freq': ('band', [band[0]]), 'high_freq': ( 'band', [band[1]])}, dims=['id', 'band'], attrs=units_attrs) if j == 0: ds_bands = methods_output else: ds_bands = xarray.concat((ds_bands, methods_output), 'band') if i == 0: ds = ds_bands else: ds = xarray.concat((ds, ds_bands), 'id') ds.attrs = self._get_metadata_attrs() return ds def _apply(self, method_name, binsize=None, db=True, band_list=None, bin_overlap=0, **kwargs): """ Apply one single method Parameters ---------- method_name : string Name of the method to apply binsize : float, in sec Time window considered. If set to None, only one value is returned overlap : float [0 to 1] Percentage to overlap the bin windows db : bool If set to True the result will be given in db, otherwise in upa """ return self._apply_multiple(method_list=[method_name], binsize=binsize, bin_overlap=bin_overlap, db=db, band_list=band_list, **kwargs) def rms(self, binsize=None, bin_overlap=0, db=True): """ Calculation of root mean squared value (rms) of the signal in upa for each bin Returns Dataframe with 'datetime' as index and 'rms' value as a column Parameters ---------- binsize : float, in sec Time window considered. If set to None, only one value is returned bin_overlap : float [0 to 1] Percentage to overlap the bin windows db : bool If set to True the result will be given in db, otherwise in upa """ rms_ds = self._apply(method_name='rms', binsize=binsize, bin_overlap=bin_overlap, db=db) return rms_ds def aci(self, binsize=None, bin_overlap=0, nfft=1024, fft_overlap=0.5): """ Calculation of root mean squared value (rms) of the signal in upa for each bin Returns Dataframe with 'datetime' as index and 'rms' value as a column Parameters ---------- binsize : float, in sec Time window considered. If set to None, only one value is returned bin_overlap : float [0 to 1] Percentage to overlap the bin windows nfft : int Window size for processing fft_overlap : float [0 to 1] Percentage to overlap the bin windows """ aci_ds = self._apply(method_name='aci', binsize=binsize, bin_overlap=bin_overlap, nfft=nfft, fft_overlap=fft_overlap) return aci_ds def dynamic_range(self, binsize=None, bin_overlap=0, db=True): """ Compute the dynamic range of each bin Returns a dataframe with datetime as index and dr as column Parameters ---------- binsize : float, in sec Time window considered. If set to None, only one value is returned bin_overlap : float [0 to 1] Percentage to overlap the bin windows db : bool If set to True the result will be given in db, otherwise in upa """ dr_ds = self._apply(method_name='dynamic_range', binsize=binsize, bin_overlap=bin_overlap, db=db) return dr_ds def cumulative_dynamic_range(self, binsize=None, bin_overlap=0, db=True): """ Compute the cumulative dynamic range for each bin Parameters ---------- binsize : float, in sec Time window considered. If set to None, only one value is returned bin_overlap : float [0 to 1] Percentage to overlap the bin windows db : bool If set to True the result will be given in db, otherwise in upa^2 Returns ------- DataFrame with an extra column with the cumulative sum of dynamic range of each bin """ cumdr = self.dynamic_range(binsize=binsize, bin_overlap=bin_overlap, db=db) cumdr['cumsum_dr'] = cumdr.dr.cumsum() return cumdr def octaves_levels(self, binsize=None, bin_overlap=0, db=True, band=None, **kwargs): """ Return the octave levels Parameters ---------- binsize: float Length in seconds of the bin to analyze bin_overlap : float [0 to 1] Percentage to overlap the bin windows db: boolean Set to True if the result should be in decibels band: list or tuple List or tuple of [low_frequency, high_frequency] Returns ------- DataFrame with multiindex columns with levels method and band. The method is '3-oct' """ return self._octaves_levels(fraction=1, binsize=binsize, bin_overlap=bin_overlap, db=db, band=band) def third_octaves_levels(self, binsize=None, bin_overlap=0, db=True, band=None, **kwargs): """ Return the octave levels Parameters ---------- binsize: float Length in seconds of the bin to analyze bin_overlap : float [0 to 1] Percentage to overlap the bin windows db: boolean Set to True if the result should be in decibels band: list or tuple List or tuple of [low_frequency, high_frequency] Returns ------- DataFrame with multiindex columns with levels method and band. The method is '3-oct' """ return self._octaves_levels(fraction=3, binsize=binsize, bin_overlap=bin_overlap, db=db, band=band) def _octaves_levels(self, fraction=1, binsize=None, bin_overlap=0, db=True, band=None): """ Return the octave levels Parameters ---------- fraction: int Fraction of the desired octave. Set to 1 for octave bands, set to 3 for 1/3-octave bands binsize: float Length in seconds of the bin to analyze bin_overlap : float [0 to 1] Percentage to overlap the bin windows db: boolean Set to True if the result should be in decibels Returns ------- DataFrame with multiindex columns with levels method and band. The method is '3-oct' """ downsample = True if band is None: band = [None, self.fs / 2] oct_str = 'oct%s' % fraction # Create an empty dataset da = xarray.DataArray() units_attrs = output_units.get_units_attrs(method_name='octave_levels', p_ref=self.p_ref, log=db) for i, time_bin, signal, start_sample, end_sample in self._bins(binsize, bin_overlap=bin_overlap): signal.set_band(band, downsample=downsample) fbands, levels = signal.octave_levels(db, fraction) da_levels = xarray.DataArray(data=[levels], coords={'id': [i], 'start_sample': ('id', [start_sample]), 'end_sample': ('id', [end_sample]), 'datetime': ('id', [time_bin]), 'frequency': fbands}, dims=['id', 'frequency'] ) if i == 0: da = da_levels else: da = xarray.concat((da, da_levels), 'id') da.attrs.update(units_attrs) ds = xarray.Dataset(data_vars={oct_str: da}, attrs=self._get_metadata_attrs()) return ds def hybrid_millidecade_bands(self, nfft, fft_overlap=0.5, binsize=None, bin_overlap=0, db=True, method='density', band=None, percentiles=None): """ Parameters ---------- binsize : float, in sec Time window considered. If set to None, only one value is returned bin_overlap : float [0 to 1] Percentage to overlap the bin windows nfft : int Length of the fft window in samples. Power of 2. fft_overlap : float [0 to 1] Percentage to overlap the bin windows db : bool If set to True the result will be given in db, otherwise in upa^2 method: string Can be 'spectrum' or 'density' band : tuple or None Band to filter the spectrogram in. A band is represented with a tuple - or a list - as (low_freq, high_freq). If set to None, the broadband up to the Nyquist frequency will be analyzed percentiles : list or None List of all the percentiles that have to be returned. If set to empty list, no percentiles is returned Returns ------- """ if band is None: band = [0, self.fs / 2] spectra_ds = self._spectrum(scaling=method, binsize=binsize, nfft=nfft, fft_overlap=fft_overlap, db=False, bin_overlap=bin_overlap, percentiles=percentiles, band=band) millidecade_bands_limits, millidecade_bands_c = utils.get_hybrid_millidecade_limits(band, nfft) fft_bin_width = band[1] * 2 / nfft hybrid_millidecade_ds = utils.spectra_ds_to_bands(spectra_ds['band_%s' % method], millidecade_bands_limits, millidecade_bands_c, fft_bin_width=fft_bin_width, db=db) spectra_ds['millidecade_bands'] = hybrid_millidecade_ds return spectra_ds def spectrogram(self, binsize=None, bin_overlap=0, nfft=512, fft_overlap=0.5, scaling='density', db=True, band=None): """ Return the spectrogram of the signal (entire file) Parameters ---------- binsize : float, in sec Time window considered. If set to None, only one value is returned bin_overlap : float [0 to 1] Percentage to overlap the bin windows db : bool If set to True the result will be given in db, otherwise in upa^2 nfft : int Length of the fft window in samples. Power of 2. fft_overlap : float [0 to 1] Percentage to overlap the bin windows scaling : string Can be set to 'spectrum' or 'density' depending on the desired output band : tuple or None Band to filter the spectrogram in. A band is represented with a tuple - or a list - as (low_freq, high_freq). If set to None, the broadband up to the Nyquist frequency will be analyzed Returns ------- time : ndarray Array with the starting time of each bin freq : ndarray Array with all the frequencies t : ndarray Time array in seconds of the windows of the spectrogram sxx_list : list Spectrogram list, one for each bin """ downsample = True if band is None: band = [None, self.fs / 2] da = xarray.DataArray() for i, time_bin, signal, start_sample, end_sample in self._bins(binsize, bin_overlap=bin_overlap): signal.set_band(band, downsample=downsample) freq, t, sxx = signal.spectrogram(nfft=nfft, overlap=fft_overlap, scaling=scaling, db=db) da_sxx = xarray.DataArray([sxx], coords={'id': [i], 'start_sample': ('id', [start_sample]), 'end_sample': ('id', [end_sample]), 'datetime': ('id', [time_bin]), 'frequency': freq, 'time': t}, dims=['id', 'frequency', 'time']) if i == 0: da = da_sxx else: da = xarray.concat((da, da_sxx), 'id') units_attrs = output_units.get_units_attrs(method_name='spectrogram_' + scaling, p_ref=self.p_ref, log=db) da.attrs.update(units_attrs) ds = xarray.Dataset(data_vars={'spectrogram': da}, attrs=self._get_metadata_attrs()) return ds def _spectrum(self, scaling='density', binsize=None, bin_overlap=0, nfft=512, fft_overlap=0.5, db=True, percentiles=None, band=None): """ Return the spectrum : frequency distribution of every bin (periodogram) Returns Dataframe with 'datetime' as index and a column for each frequency and each percentile, and a frequency array Parameters ---------- scaling : string Can be set to 'spectrum' or 'density' depending on the desired output binsize : float, in sec Time window considered. If set to None, only one value is returned bin_overlap : float [0 to 1] Percentage to overlap the bin windows nfft : int Length of the fft window in samples. Power of 2. fft_overlap : float [0 to 1] Percentage to overlap the bin windows db : bool If set to True the result will be given in db, otherwise in upa^2 percentiles : list or None List of all the percentiles that have to be returned. If set to empty list, no percentiles is returned band : tuple or None Band to filter the spectrogram in. A band is represented with a tuple - or a list - as (low_freq, high_freq). If set to None, the broadband up to the Nyquist frequency will be analyzed """ downsample = True if percentiles is None: percentiles = [] if band is None: band = [None, self.fs / 2] spectrum_str = 'band_' + scaling ds = xarray.DataArray() for i, time_bin, signal, start_sample, end_sample in self._bins(binsize, bin_overlap=bin_overlap): signal.set_band(band, downsample=downsample) fbands, spectra, percentiles_val = signal.spectrum(scaling=scaling, nfft=nfft, db=db, percentiles=percentiles, overlap=fft_overlap) spectra_da = xarray.DataArray([spectra], coords={'id': [i], 'datetime': ('id', [time_bin]), 'frequency': fbands, 'start_sample': ('id', [start_sample]), 'end_sample': ('id', [end_sample]), }, dims=['id', 'frequency']) percentiles_da = xarray.DataArray([percentiles_val], coords={'id': [i], 'datetime': ('id', [time_bin]), 'percentiles': percentiles}, dims=['id', 'percentiles']) ds_bin = xarray.Dataset({spectrum_str: spectra_da, 'value_percentiles': percentiles_da}) if i == 0: ds = ds_bin else: ds = xarray.concat((ds, ds_bin), 'id') units_attrs = output_units.get_units_attrs(method_name='spectrum_' + scaling, log=db, p_ref=self.p_ref) ds[spectrum_str].attrs.update(units_attrs) ds['value_percentiles'].attrs.update({'units': '%', 'standard_name': 'percentiles'}) ds.attrs = self._get_metadata_attrs() return ds def psd(self, binsize=None, bin_overlap=0, nfft=512, fft_overlap=0.5, db=True, percentiles=None, band=None): """ Return the power spectrum density (PSD) of all the file (units^2 / Hz) re 1 V 1 upa Returns a Dataframe with 'datetime' as index and a column for each frequency and each percentile Parameters ---------- binsize : float, in sec Time window considered. If set to None, only one value is returned bin_overlap : float [0 to 1] Percentage to overlap the bin windows nfft : int Length of the fft window in samples. Recommended power of 2. fft_overlap : float [0 to 1] Percentage to overlap the bin windows db : bool If set to True the result will be given in db, otherwise in upa^2 percentiles : list or None List of all the percentiles that have to be returned. If set to empty list, no percentiles is returned band : tuple or None Band to filter the spectrogram in. A band is represented with a tuple - or a list - as (low_freq, high_freq). If set to None, the broadband up to the Nyquist frequency will be analyzed """ psd_ds = self._spectrum(scaling='density', binsize=binsize, nfft=nfft, fft_overlap=fft_overlap, db=db, bin_overlap=bin_overlap, percentiles=percentiles, band=band) return psd_ds def power_spectrum(self, binsize=None, bin_overlap=0, nfft=512, fft_overlap=0.5, db=True, percentiles=None, band=None): """ Return the power spectrum of all the file (units^2 / Hz) re 1 V 1 upa Returns a Dataframe with 'datetime' as index and a column for each frequency and each percentile Parameters ---------- binsize : float, in sec Time window considered. If set to None, only one value is returned bin_overlap : float [0 to 1] Percentage to overlap the bin windows nfft : int Length of the fft window in samples. Power of 2. fft_overlap : float [0 to 1] Percentage to overlap the windows in the fft db : bool If set to True the result will be given in db, otherwise in upa^2 percentiles : list or None List of all the percentiles that have to be returned. If set to empty list, no percentiles is returned band : tuple or None Band to filter the spectrogram in. A band is represented with a tuple - or a list - as (low_freq, high_freq). If set to None, the broadband up to the Nyquist frequency will be analyzed """ spectrum_ds = self._spectrum(scaling='spectrum', binsize=binsize, nfft=nfft, fft_overlap=fft_overlap, db=db, bin_overlap=bin_overlap, percentiles=percentiles, band=band) return spectrum_ds def spd(self, binsize=None, bin_overlap=0, h=0.1, nfft=512, fft_overlap=0.5, db=True, percentiles=None, min_val=None, max_val=None, band=None): """ Return the spectral probability density. Parameters ---------- binsize : float, in sec Time window considered. If set to None, only one value is returned bin_overlap : float [0 to 1] Percentage to overlap the bin windows h : float Histogram bin width (in the correspondent units, upa or db) nfft : int Length of the fft window in samples. Power of 2. fft_overlap : float [0 to 1] Percentage to overlap the bin windows db : bool If set to True the result will be given in db, otherwise in upa^2 min_val : float Minimum value to compute the SPD histogram max_val : float Maximum value to compute the SPD histogram percentiles : array_like List of all the percentiles that have to be returned. If set to empty list, no percentiles is returned band : tuple or None Band to filter the spectrogram in. A band is represented with a tuple - or a list - as (low_freq, high_freq). If set to None, the broadband up to the Nyquist frequency will be analyzed Returns ------- time : ndarray list with the starting point of each spd df fbands : ndarray list of all the frequencies percentiles : list of float Percentiles to compute edges_list : list of float list of the psd values of the distribution spd_list : list of ndarray list of dataframes with 'frequency' as index and a column for each psd bin and for each percentile (one df per bin) p_list : list of 2d ndarray list of matrices with all the probabilities """ psd_evolution = self.psd(binsize=binsize, nfft=nfft, fft_overlap=fft_overlap, db=db, percentiles=percentiles, bin_overlap=bin_overlap, band=band) return utils.compute_spd(psd_evolution, h=h, percentiles=percentiles, max_val=max_val, min_val=min_val) def source_separation(self, window_time=1.0, n_sources=15, binsize=None, save_path=None, verbose=False, band=None): """ Perform non-negative Matrix Factorization to separate sources Parameters ---------- window_time: float window time to consider in seconds n_sources : int Number of sources binsize : float Time window considered, in seconds. If set to None, only one value is returned save_path: str or Path Where to save the output verbose: bool Set to True to make plots of the process band : tuple or None Band to filter the spectrogram in. A band is represented with a tuple - or a list - as (low_freq, high_freq). If set to None, the broadband up to the Nyquist frequency will be analyzed """ if band is None: band = [None, self.fs / 2] separator = nmf.NMF(window_time=window_time, rank=n_sources, save_path=save_path) ds = xarray.Dataset() for i, time_bin, signal, start_sample, end_sample in self._bins(binsize, bin_overlap=0.0): signal.set_band(band) separation_ds = separator(signal, verbose=verbose) separation_ds = separation_ds.assign_coords({'id': [i], 'datetime': ('id', [time_bin])}) if i == 0: ds = separation_ds else: ds = xarray.concat((ds, separation_ds), 'id') return ds def plot_spectrum_mean(self, scaling='density', db=True, log=True, save_path=None, **kwargs): """ Plot the power spectrogram density of all the file (units^2 / Hz) re 1 V 1 upa Parameters ---------- scaling : str 'density' or 'spectrum' db : boolean If set to True the result will be given in db. Otherwise in upa^2/Hz log : boolean If set to True the scale of the y axis is set to logarithmic save_path : string or Path Where to save the images **kwargs : any attribute valid on psd() function """ psd = self._spectrum(db=db, scaling=scaling, **kwargs) plots.plot_spectrum_mean(ds=psd, data_var='band_' + scaling, log=log, save_path=save_path) def plot_spectrum_per_chunk(self, scaling='density', db=True, log=True, save_path=None, **kwargs): """ Plot the power spectrogram density of all the file (units^2 / Hz) re 1 V 1 upa Parameters ---------- scaling : str 'density' or 'spectrum' db : boolean If set to True the result will be given in db. Otherwise in upa^2/Hz log : boolean If set to True the scale of the y axis is set to logarithmic save_path : string or Path Where to save the images **kwargs : any attribute valid on psd() function """ psd = self._spectrum(db=db, scaling=scaling, **kwargs) plots.plot_spectrum_per_chunk(ds=psd, data_var='band_' + scaling, log=log, save_path=save_path) def plot_spectrogram(self, db=True, log=True, save_path=None, **kwargs): """ Return the spectrogram of the signal (entire file) Parameters ---------- db : boolean If set to True the result will be given in db. Otherwise in upa^2/Hz log : boolean If set to True the scale of the y axis is set to logarithmic save_path : string or Path Where to save the images **kwargs : any attribute valid on spectrogram() function """ ds_spectrogram = self.spectrogram(db=db, **kwargs) plots.plot_spectrogram_per_chunk(ds_spectrogram, log, save_path) def plot_spd(self, db=True, log=True, save_path=None, **kwargs): """ Plot the SPD graph of the bin Parameters ---------- db : boolean If set to True the result will be given in db. Otherwise, in upa^2/Hz log : boolean If set to True the scale of the y-axis is set to logarithmic save_path : string or Path Where to save the images **kwargs : any attribute valid on spd() function """ spd_ds = self.spd(db=db, **kwargs) plots.plot_spd(spd_ds, db=db, log=log, p_ref=self.p_ref, save_path=save_path)