# Module metadata: author, version, credits, contact e-mail and development status.
__author__ = "Clea Parcerisas"
__version__ = "0.1"
__credits__ = "Clea Parcerisas"
__email__ = "clea.parcerisas@vliz.be"
__status__ = "Development"
import datetime
import operator
import os
import pathlib
import zipfile
import dateutil.parser as parser
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import xarray
from tqdm import tqdm
from pypam import acoustic_file
from pypam import plots
from pypam import utils
# Apply the default seaborn theme. This runs at import time, so it is a module-level side effect.
sns.set_theme()
class ASA:
    """
    Init a AcousticSurveyAnalysis (ASA)

    Parameters
    ----------
    hydrophone : Hydrophone class from pyhydrophone
    folder_path : string or Path
        Where all the sound files are
    zipped : boolean
        Set to True if the directory is zipped
    include_dirs : boolean
        Set to True if the folder contains other folders with sound files
    p_ref : float
        Reference pressure in uPa
    binsize : float
        Time window considered, in seconds. If set to None, only one value is returned
    bin_overlap : float [0 to 1]
        Percentage to overlap the bin windows
    nfft : int
        Samples of the fft bin used for the spectral analysis
    fft_overlap : float
        Overlap of the fft windows, forwarded as-is to the AcuFile methods
        (NOTE(review): presumably a 0-to-1 fraction like bin_overlap - confirm)
    period : tuple or list
        Tuple or list with two elements: start and stop. Has to be a string in the
        format YYYY-MM-DD HH:MM:SS (datetime objects are accepted as well and used unchanged)
    timezone: datetime.tzinfo, pytz.tzinfo.BaseTZInfo, dateutil.tz.tz.tzfile, str or None
        Timezone where the data was recorded in
    channel : int
        Channel forwarded to every AcuFile created from the folder
    calibration: float, -1 or None
        If it is a float, it is the time ignored at the beginning of the file. If None, nothing is done. If negative,
        the function calibrate from the hydrophone is performed, and the first samples ignored (and hydrophone updated)
    dc_subtract: bool
        Set to True to subtract the dc noise (root mean squared value)
    extra_attrs : dict or None
        Extra attributes copied into the attrs of every dataset returned by this class
    """

    # Metadata extensions that cannot be read as csv: they are moved as a whole instead of being split
    _UNSPLITTABLE_EXTENSIONS = ('.log.xml', '.sud', '.bcl', '.dwv')

    def __init__(self,
                 hydrophone: object,
                 folder_path,
                 zipped=False,
                 include_dirs=False,
                 p_ref=1.0,
                 binsize=None,
                 bin_overlap=0,
                 nfft=1.0,
                 fft_overlap=0.5,
                 period=None,
                 timezone='UTC',
                 channel=0,
                 calibration=None,
                 dc_subtract=False,
                 extra_attrs=None):
        self.hydrophone = hydrophone
        self.acu_files = AcousticFolder(folder_path=folder_path, zipped=zipped,
                                        include_dirs=include_dirs)
        self.p_ref = p_ref
        self.binsize = binsize
        self.nfft = nfft
        self.bin_overlap = bin_overlap
        self.fft_overlap = fft_overlap

        if period is not None:
            # Strings are parsed; a period that already holds datetime objects is kept untouched
            if not isinstance(period[0], datetime.datetime):
                start = parser.parse(period[0])
                end = parser.parse(period[1])
                period = [start, end]
        self.period = period

        self.timezone = timezone
        self.datetime_timezone = 'UTC'
        self.channel = channel
        self.calibration = calibration
        self.dc_subtract = dc_subtract

        if extra_attrs is None:
            self.extra_attrs = {}
        else:
            self.extra_attrs = extra_attrs

        # Attributes that differ from file to file; handed to utils.merge_ds when joining per-file datasets
        self.file_dependent_attrs = ['file_path', '_start_frame', 'end_to_end_calibration']

    def _files(self):
        """
        Iterator that returns AcuFile for each wav file in the folder

        Only files that are inside self.period and that contain at least one frame are yielded
        """
        for file_list in tqdm(self.acu_files):
            wav_file = file_list[0]
            print(wav_file)
            sound_file = self._hydro_file(wav_file)
            if sound_file.is_in_period(self.period) and sound_file.file.frames > 0:
                yield sound_file

    def _hydro_file(self, wav_file):
        """
        Return the AcuFile object from the wav_file

        Parameters
        ----------
        wav_file : str or Path
            Sound file

        Returns
        -------
        Object AcuFile
        """
        hydro_file = acoustic_file.AcuFile(sfile=wav_file, hydrophone=self.hydrophone, p_ref=self.p_ref,
                                           timezone=self.timezone, channel=self.channel, calibration=self.calibration,
                                           dc_subtract=self.dc_subtract)
        return hydro_file

    def _get_metadata_attrs(self):
        """
        Collect the analysis parameters and hydrophone description used as attrs of the output datasets

        Returns
        -------
        dict
            A copy of extra_attrs extended with one entry per metadata key. Dotted keys
            (e.g. 'hydrophone.name') are stored with an underscore ('hydrophone_name') and
            pathlib.Path values are converted to str
        """
        metadata_keys = [
            'binsize',
            'nfft',
            'bin_overlap',
            'fft_overlap',
            'timezone',
            'datetime_timezone',
            'p_ref',
            'channel',
            'dc_subtract',
            'hydrophone.name',
            'hydrophone.model',
            'hydrophone.sensitivity',
            'hydrophone.preamp_gain',
            'hydrophone.Vpp',
        ]

        metadata_attrs = self.extra_attrs.copy()
        for k in metadata_keys:
            d = self
            for sub_k in k.split('.'):
                # getattr instead of indexing __dict__: properties, slots and class-level
                # attributes of the hydrophone are resolved as well
                d = getattr(d, sub_k)
            if isinstance(d, pathlib.Path):
                d = str(d)
            metadata_attrs[k.replace('.', '_')] = d

        return metadata_attrs

    def evolution_multiple(self, method_list: list, band_list=None, **kwargs):
        """
        Compute the method in each file and output the evolution
        Returns a xarray DataSet with datetime as index and one row for each bin of each file

        Parameters
        ----------
        method_list : list of strings
            Method names present in AcuFile
        band_list: list of tuples, tuple or None
            Bands to filter. Can be multiple bands (all of them will be analyzed) or only one band. A band is
            represented with a tuple as (low_freq, high_freq). If set to None, the broadband up to the Nyquist
            frequency will be analyzed
        **kwargs :
            Any accepted parameter for the method_name
        """
        ds = xarray.Dataset(attrs=self._get_metadata_attrs())
        f = operator.methodcaller('_apply_multiple', method_list=method_list, binsize=self.binsize,
                                  nfft=self.nfft, fft_overlap=self.fft_overlap, bin_overlap=self.bin_overlap,
                                  band_list=band_list, **kwargs)
        for sound_file in self._files():
            ds_output = f(sound_file)
            ds = utils.merge_ds(ds, ds_output, self.file_dependent_attrs)
        return ds

    def evolution(self, method_name, band_list=None, **kwargs):
        """
        Evolution of only one param name

        Parameters
        ----------
        method_name : string
            Method to compute the evolution of
        band_list: list of tuples, tuple or None
            Bands to filter. Can be multiple bands (all of them will be analyzed) or only one band. A band is
            represented with a tuple as (low_freq, high_freq). If set to None, the broadband up to the Nyquist
            frequency will be analyzed
        **kwargs : any arguments to be passed to the method
        """
        return self.evolution_multiple(method_list=[method_name], band_list=band_list, **kwargs)

    def evolution_freq_dom(self, method_name, **kwargs):
        """
        Returns the evolution of frequency domain parameters

        Parameters
        ----------
        method_name : str
            Name of the method of the acoustic_file class to compute
        **kwargs :
            Any extra argument forwarded to the method

        Returns
        -------
        A xarray DataSet with a row per bin with the method name output
        """
        ds = xarray.Dataset(attrs=self._get_metadata_attrs())
        f = operator.methodcaller(method_name, binsize=self.binsize, nfft=self.nfft, fft_overlap=self.fft_overlap,
                                  bin_overlap=self.bin_overlap, **kwargs)
        for sound_file in self._files():
            ds_output = f(sound_file)
            ds = utils.merge_ds(ds, ds_output, self.file_dependent_attrs)
        return ds

    def timestamps_array(self):
        """
        Return a xarray DataSet with the timestamps of each bin.
        """
        ds = xarray.Dataset(attrs=self._get_metadata_attrs())
        f = operator.methodcaller('timestamp_da', binsize=self.binsize, bin_overlap=self.bin_overlap)
        for sound_file in self._files():
            ds_output = f(sound_file)
            ds = utils.merge_ds(ds, ds_output, self.file_dependent_attrs)
        return ds

    def start_end_timestamp(self):
        """
        Return the start and the end timestamps

        The start is the date of the first file of the folder; the end is the date of the last
        file plus its total duration
        """
        wav_file = self.acu_files[0][0]
        print(wav_file)

        sound_file = self._hydro_file(wav_file)
        start_datetime = sound_file.date

        file_list = self.acu_files[-1]
        wav_file = file_list[0]
        print(wav_file)
        sound_file = self._hydro_file(wav_file)
        end_datetime = sound_file.date + datetime.timedelta(seconds=sound_file.total_time())

        return start_datetime, end_datetime

    def apply_to_all(self, method_name, **kwargs):
        """
        Apply the method to all the files

        Parameters
        ----------
        method_name : string
            Method name present in AcuFile
        **kwargs :
            Any accepted parameter for the method_name
        """
        f = operator.methodcaller(method_name, binsize=self.binsize, nfft=self.nfft, fft_overlap=self.fft_overlap,
                                  bin_overlap=self.bin_overlap, **kwargs)
        for sound_file in self._files():
            f(sound_file)

    def duration(self):
        """
        Return the duration in seconds of all the survey
        """
        total_time = 0
        for sound_file in self._files():
            total_time += sound_file.total_time()

        return total_time

    def mean_rms(self, **kwargs):
        """
        Return the mean root mean squared value of the survey
        Accepts any other input than the correspondent method in the acoustic file.
        Returns the rms value of the whole survey

        Parameters
        ----------
        **kwargs :
            Any accepted arguments for the rms function of the AcuFile
        """
        rms_evolution = self.evolution('rms', **kwargs)
        return rms_evolution['rms'].mean()

    def spd(self, db=True, h=0.1, percentiles=None, min_val=None, max_val=None):
        """
        Return the empirical power density.

        Parameters
        ----------
        db : boolean
            If set to True the result will be given in db. Otherwise, in uPa^2
        h : float
            Histogram bin (in the correspondent units, uPa or db)
        percentiles : list or None
            All the percentiles that have to be returned. If set to None, no percentiles
            is returned (in 100 per cent)
        min_val : float
            Minimum value to compute the SPD histogram
        max_val : float
            Maximum value to compute the SPD histogram

        Returns
        -------
        percentiles : array like
            List of the percentiles calculated
        p : np.array
            Matrix with all the probabilities
        """
        psd_evolution = self.evolution_freq_dom('psd', db=db, percentiles=percentiles)
        return utils.compute_spd(psd_evolution, h=h, percentiles=percentiles, min_val=min_val, max_val=max_val)

    def hybrid_millidecade_bands(self, db=True, method='spectrum', band=None, percentiles=None):
        """
        Compute the spectra of all the files and group them in hybrid millidecade bands

        Parameters
        ----------
        db : bool
            If set to True the result will be given in db, otherwise in upa^2
        method: string
            Can be 'spectrum' or 'density'
        band : tuple or list
            Band to filter the spectrogram in, as (low_freq, high_freq). It is required: the width of
            the fft bins is derived from its upper limit, so None cannot be used here
        percentiles : list or None
            List of all the percentiles that have to be returned. If set to empty list,
            no percentiles is returned

        Returns
        -------
        An xarray dataset with the band_density (or band_spectrum) and the millidecade_bands variables

        Raises
        ------
        TypeError
            If band is None
        """
        if band is None:
            # band[1] is needed below; fail before the (expensive) spectra of the whole survey are computed
            raise TypeError('band must be a (low_freq, high_freq) tuple or list to compute the hybrid '
                            'millidecade bands, not None')

        spectra_ds = self.evolution_freq_dom('_spectrum', band=band, db=False, percentiles=percentiles, scaling=method)
        bands_limits, bands_c = utils.get_hybrid_millidecade_limits(band=band, nfft=self.nfft)
        fft_bin_width = band[1] * 2 / self.nfft  # Signal has been downsampled
        milli_spectra = utils.spectra_ds_to_bands(spectra_ds['band_%s' % method],
                                                  bands_limits, bands_c, fft_bin_width=fft_bin_width, db=db)

        # Add the millidecade
        spectra_ds['millidecade_bands'] = milli_spectra
        return spectra_ds

    @staticmethod
    def _split_metadata_csv(metadata_file, second_wav, extension, split_date):
        """
        Split a csv metadata file in two at split_date

        The rows before split_date overwrite metadata_file; the rows from split_date on are written next
        to second_wav, using its name with '.wav' replaced by the extension.

        Parameters
        ----------
        metadata_file : str or Path
            csv file with a 'unix time' column
        second_wav : Path
            Path of the second half of the split sound file
        extension : str
            Extension of the metadata file
        split_date : datetime.datetime
            Date where to cut

        Returns
        -------
        Path of the newly created metadata file (the second half)
        """
        ds = pd.read_csv(metadata_file)
        ds['datetime'] = pd.to_datetime(ds['unix time'] * 1e9)
        ds_first = ds[ds['datetime'] < split_date]
        ds_second = ds[ds['datetime'] >= split_date]
        ds_first.to_csv(metadata_file)
        new_metadata_path = second_wav.parent.joinpath(second_wav.name.replace('.wav', extension))
        ds_second.to_csv(new_metadata_path)
        return new_metadata_path

    def cut_and_place_files_period(self, period, folder_name, extensions=None):
        """
        Cut the files in the specified periods and store them in the right folder

        The file containing the start date is split and its second half is moved; the file containing the
        end date is split and its first half is moved; files completely inside the period are moved as they are.

        Parameters
        ----------
        period: Tuple or list
            Tuple or list with (start, stop)
        folder_name: str or Path
            Name of the folder (inside the survey folder) where the selected files are moved to
        extensions: list of strings
            the extensions that want to be moved (csv will be split, log will just be moved)

        Returns
        -------
        0 when finished
        """
        if extensions is None:
            extensions = []
        start_date = parser.parse(period[0])
        end_date = parser.parse(period[1])
        print(start_date, end_date)
        folder_path = self.acu_files.folder_path.joinpath(folder_name)
        self.acu_files.extensions = extensions
        for file_list in tqdm(self.acu_files):
            wav_file = file_list[0]
            sound_file = self._hydro_file(wav_file)
            if sound_file.contains_date(start_date) and sound_file.file.frames > 0:
                print('start!', wav_file)
                # Split the sound file in two files
                first, second = sound_file.split(start_date)
                move_file(second, folder_path)
                # Split the metadata files
                for i, metadata_file in enumerate(file_list[1:]):
                    if extensions[i] not in self._UNSPLITTABLE_EXTENSIONS:
                        new_metadata_path = self._split_metadata_csv(metadata_file, second, extensions[i],
                                                                     start_date)
                        # Move the file
                        move_file(new_metadata_path, folder_path)
                    else:
                        move_file(metadata_file, folder_path)

            elif sound_file.contains_date(end_date):
                print('end!', wav_file)
                # Split the sound file in two files
                first, second = sound_file.split(end_date)
                move_file(first, folder_path)
                # Split the metadata files
                for i, metadata_file in enumerate(file_list[1:]):
                    if extensions[i] not in self._UNSPLITTABLE_EXTENSIONS:
                        # The sound was cut at end_date, so the metadata has to be cut at end_date too
                        self._split_metadata_csv(metadata_file, second, extensions[i], end_date)
                    # Move the file (also if log)
                    move_file(metadata_file, folder_path)

            elif sound_file.is_in_period([start_date, end_date]):
                print('moving', wav_file)
                sound_file.file.close()
                move_file(wav_file, folder_path)
                for metadata_file in file_list[1:]:
                    move_file(metadata_file, folder_path)

        return 0

    def source_separation(self, window_time=1.0, n_sources=15, save_path=None, verbose=False, band=None):
        """
        Separate the signal in n_sources sources, using non-negative matrix factorization

        Parameters
        ----------
        window_time: float
            Duration of the window in seconds
        n_sources: int
            Number of sources to separate the sound in
        save_path: str or Path
            Where to save the output
        verbose: bool
            Set to True to make plots of the process
        band : tuple or list
            Tuple or list with two elements: low-cut and high-cut of the band to analyze
        """
        ds = xarray.Dataset(attrs=self._get_metadata_attrs())
        for sound_file in self._files():
            nmf_ds = sound_file.source_separation(window_time, n_sources, binsize=self.binsize, band=band,
                                                  save_path=save_path, verbose=verbose)
            ds = utils.merge_ds(ds, nmf_ds, self.file_dependent_attrs)
        return ds

    def plot_rms_evolution(self, db=True, save_path=None):
        """
        Plot the rms evolution

        Parameters
        ----------
        db : boolean
            If set to True, output in db
        save_path : string or Path
            Where to save the output graph. If None, it is not saved
        """
        rms_evolution = self.evolution('rms', db=db)
        plots.plot_rms_evolution(ds=rms_evolution, save_path=save_path)

    def plot_rms_daily_patterns(self, db=True, save_path=None):
        """
        Plot the daily rms patterns

        Parameters
        ----------
        db : boolean
            If set to True, the output is in db and will be show in the units output
        save_path : string or Path
            Where to save the output graph. If None, it is not saved
        """
        rms_evolution = self.evolution('rms', db=db).sel(band=0)
        plots.plot_daily_patterns_from_ds(ds=rms_evolution, data_var='rms', save_path=save_path)

    def plot_mean_power_spectrum(self, db=True, save_path=None, log=True, **kwargs):
        """
        Plot the resulting mean power spectrum

        Parameters
        ----------
        db : boolean
            If set to True, output in db
        log : boolean
            If set to True, y axis in logarithmic scale
        save_path : string or Path
            Where to save the output graph. If None, it is not saved
        **kwargs : Any accepted for the power_spectrum method
        """
        power = self.evolution_freq_dom(method_name='power_spectrum', db=db, **kwargs)

        return plots.plot_spectrum_mean(ds=power, data_var='band_spectrum', log=log, save_path=save_path)

    def plot_mean_psd(self, db=True, save_path=None, log=True, **kwargs):
        """
        Plot the resulting mean psd

        Parameters
        ----------
        db : boolean
            If set to True, output in db
        log : boolean
            If set to True, y axis in logarithmic scale
        save_path : string or Path
            Where to save the output graph. If None, it is not saved
        **kwargs : Any accepted for the psd method
        """
        psd = self.evolution_freq_dom(method_name='psd', db=db, **kwargs)

        return plots.plot_spectrum_mean(ds=psd, data_var='band_density', log=log, save_path=save_path)

    def plot_power_ltsa(self, db=True, save_path=None, **kwargs):
        """
        Plot the evolution of the power frequency distribution (Long Term Spectrogram Analysis)

        Parameters
        ----------
        db : boolean
            If set to True, output in db
        save_path : string or Path
            Where to save the output graph. If None, it is not saved
        **kwargs : Any accepted for the power spectrum method

        Returns
        -------
        The dataset with the power spectrum evolution that has been plotted
        """
        power_evolution = self.evolution_freq_dom(method_name='power_spectrum', db=db, **kwargs)
        plots.plot_ltsa(ds=power_evolution, data_var='band_spectrum', save_path=save_path)

        return power_evolution

    def plot_psd_ltsa(self, db=True, save_path=None, **kwargs):
        """
        Plot the evolution of the psd power spectrum density (Long Term Spectrogram Analysis)

        Parameters
        ----------
        db : boolean
            If set to True, output in db
        save_path : string or Path
            Where to save the output graph. If None, it is not saved
        **kwargs : Any accepted for the psd method

        Returns
        -------
        The dataset with the psd evolution that has been plotted
        """
        psd_evolution = self.evolution_freq_dom(method_name='psd', db=db, **kwargs)
        plots.plot_ltsa(ds=psd_evolution, data_var='band_density', save_path=save_path)

        return psd_evolution

    def plot_spd(self, db=True, log=True, save_path=None, **kwargs):
        """
        Plot the the SPD graph

        Parameters
        ----------
        db : boolean
            If set to True, output in db
        log : boolean
            If set to True, y-axis in logarithmic scale
        save_path : string or Path
            Where to save the output graph. If None, it is not saved
        **kwargs : Any accepted for the spd method
        """
        spd_ds = self.spd(db=db, **kwargs)
        plots.plot_spd(spd_ds, log=log, save_path=save_path)

    def save(self, file_path):
        """
        Save the ASA with all the computed values

        NOTE: not implemented. The method has no body besides this docstring, so nothing is written
        to file_path and None is returned.

        Returns
        -------
        """
class AcousticFolder:
    """
    Class to help through the iterations of the acoustic folder.

    Three layouts are supported:

    * plain folder (zipped=False): folder_path is a directory with wav files. Sub-directories are
      searched as well when include_dirs is True
    * single archive (zipped=True, include_dirs=False): folder_path is a zip archive with wav files
    * folder of archives (zipped=True, include_dirs=True): folder_path is a directory; every zip archive
      found directly inside it is read, one after the other (sorted by path)

    Every item is a list: the wav file first, followed by one entry per extension. For plain folders the
    entries are paths, for archives they are opened (binary, read-only) archive members.
    """

    def __init__(self, folder_path, zipped=False, include_dirs=False, extensions=None):
        """
        Store the information about the folder.
        It will create an iterator that returns all the pairs of extensions having the same name than the wav file

        Parameters
        ----------
        folder_path : string or pathlib.Path
            Path to the folder containing the acoustic files
        zipped : boolean
            Set to True if the subfolders are zipped
        include_dirs : boolean
            Set to True if the subfolders are included in the study
        extensions : list
            List of strings with all the extensions that will be returned (.wav is automatic)
            i.e. extensions=['.xml', '.bcl'] will return [wav, xml and bcl] files

        Raises
        ------
        FileNotFoundError
            If folder_path does not exist
        ValueError
            If a non-zipped folder does not contain any wav file
        """
        self.folder_path = pathlib.Path(folder_path)
        if not self.folder_path.exists():
            raise FileNotFoundError('The path %s does not exist. Please choose another one.' % folder_path)
        # The wav files of an archive are not visible to glob, so the emptiness check only makes sense
        # for plain folders (it used to reject every zipped input)
        if not zipped and next(self.folder_path.glob('**/*.wav'), None) is None:
            raise ValueError('The directory %s is empty. Please select another directory with *.wav files' %
                             folder_path)
        self.zipped = zipped
        self.recursive = include_dirs
        if extensions is None:
            extensions = []
        self.extensions = extensions

    def _wav_pattern(self):
        """Glob pattern selecting the wav files of a plain folder (recursive or not)."""
        return '**/*.wav' if self.recursive else '*.wav'

    def _zip_wav_names(self):
        """Names of the wav members of the archive at folder_path, in archive order."""
        with zipfile.ZipFile(self.folder_path, 'r', allowZip64=True) as zipped_folder:
            return [f for f in zipped_folder.namelist() if f.split(".")[-1] == 'wav']

    def _zip_archives(self):
        """Sorted paths of the zip archives located directly inside folder_path."""
        return [p for p in sorted(self.folder_path.iterdir()) if p.is_file() and zipfile.is_zipfile(p)]

    def _zip_subfolder(self, archive_path):
        """AcousticFolder reading one single archive, with the current extensions."""
        return AcousticFolder(archive_path, zipped=True, include_dirs=False, extensions=self.extensions)

    def _current_subfolder(self):
        """Iterator over archive number self.n of folder_list, or None when all archives are consumed."""
        if self.n < len(self.folder_list):
            return iter(self._zip_subfolder(self.folder_list[self.n]))
        return None

    def __getitem__(self, n):
        """
        Get n wav file

        The iteration state is reset as a side effect. Negative indexes count from the end.

        Raises
        ------
        IndexError
            If n is out of range
        """
        if self.zipped and self.recursive:
            subfolders = [self._zip_subfolder(p) for p in self._zip_archives()]
            sizes = [len(subfolder) for subfolder in subfolders]
            if n < 0:
                n += sum(sizes)
            if n >= 0:
                # Walk the archives until the one holding item n is found
                for subfolder, size in zip(subfolders, sizes):
                    if n < size:
                        return subfolder[n]
                    n -= size
            raise IndexError('AcousticFolder index out of range')

        self.__iter__()
        self.n = n
        try:
            return self.__next__()
        except StopIteration:
            # StopIteration must not escape from an indexing operation
            raise IndexError('AcousticFolder index out of range') from None

    def __iter__(self):
        """
        Iteration
        """
        self.n = 0
        if not self.zipped:
            self.files_list = sorted(self.folder_path.glob(self._wav_pattern()))
        elif self.recursive:
            # Here self.n is the index of the archive being read
            self.folder_list = self._zip_archives()
            self.zipped_subfolder = self._current_subfolder()
        else:
            self.files_list = self._zip_wav_names()
        return self

    def __next__(self):
        """
        Next wav file
        """
        if self.zipped and self.recursive:
            while self.zipped_subfolder is not None:
                try:
                    return next(self.zipped_subfolder)
                except StopIteration:
                    # Current archive exhausted: continue with the next one
                    self.n += 1
                    self.zipped_subfolder = self._current_subfolder()
            raise StopIteration

        if self.n >= len(self.files_list):
            raise StopIteration

        files_list = []
        if self.zipped:
            file_name = self.files_list[self.n]
            # The archive is left open on purpose: the returned members read from it
            zipped_folder = zipfile.ZipFile(self.folder_path, 'r', allowZip64=True)
            files_list.append(zipped_folder.open(file_name))
            # Member names are '/'-separated strings; only the file name part is modified
            member_dir, sep, member_name = file_name.rpartition('/')
            for extension in self.extensions:
                ext_file_name = member_dir + sep + member_name.replace('.wav', extension)
                files_list.append(zipped_folder.open(ext_file_name))
        else:
            wav_path = self.files_list[self.n]
            files_list.append(wav_path)
            for extension in self.extensions:
                files_list.append(pathlib.Path(str(wav_path).replace('.wav', extension)))

        self.n += 1
        return files_list

    def __len__(self):
        """
        Number of wav files an iteration over the folder returns
        """
        if not self.zipped:
            return len(list(self.folder_path.glob(self._wav_pattern())))
        if self.recursive:
            return sum(len(self._zip_subfolder(p)) for p in self._zip_archives())
        return len(self._zip_wav_names())
def move_file(file_path, new_folder_path):
    """
    Move the file to the new folder

    The destination folder (and any missing parent) is created when it does not exist. The file keeps
    its name.

    Parameters
    ----------
    file_path : string or Path
        Original file path
    new_folder_path : string or Path
        New folder destination (without the file name)
    """
    # pathlib.Path() accepts both strings and Path objects, so no type check is needed
    file_path = pathlib.Path(file_path)
    new_folder_path = pathlib.Path(new_folder_path)
    # exist_ok avoids the race between checking that the folder exists and creating it
    os.makedirs(new_folder_path, exist_ok=True)
    new_path = new_folder_path.joinpath(file_path.name)
    os.rename(file_path, new_path)