# -*- coding: utf-8 -*-
"""
biosppy.storage
---------------

This module provides several data storage methods.

:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.
"""
# Imports
# compat
from __future__ import absolute_import, division, print_function
from six.moves import range
import six
# built-in
import datetime
import json
import os
import zipfile
# 3rd party
import h5py
import numpy as np
import shortuuid
import joblib
# local
from . import utils


def serialize(data, path, compress=3):
"""Serialize data and save to a file using sklearn's joblib.
Parameters
----------
data : object
Object to serialize.
path : str
Destination path.
compress : int, optional
Compression level; from 0 to 9 (highest compression).
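
    Examples
    --------
    A minimal round-trip sketch; ``/tmp/data.joblib`` is a hypothetical path:

    >>> from biosppy import storage
    >>> storage.serialize({'a': 1}, '/tmp/data.joblib')
    >>> data = storage.deserialize('/tmp/data.joblib')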
"""
# normalize path
    path = utils.normpath(path)
joblib.dump(data, path, compress=compress)


def deserialize(path):
"""Deserialize data from a file using sklearn's joblib.
Parameters
----------
path : str
Source path.
Returns
-------
data : object
Deserialized object.
"""
# normalize path
path = utils.normpath(path)
return joblib.load(path)


def dumpJSON(data, path):
"""Save JSON data to a file.
Parameters
----------
data : dict
The JSON data to dump.
path : str
Destination path.
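
    Examples
    --------
    A minimal round-trip sketch; ``/tmp/meta.json`` is a hypothetical path:

    >>> from biosppy import storage
    >>> storage.dumpJSON({'sampling_rate': 1000.}, '/tmp/meta.json')
    >>> mdata = storage.loadJSON('/tmp/meta.json')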
"""
# normalize path
path = utils.normpath(path)
with open(path, 'w') as fid:
json.dump(data, fid)


def loadJSON(path):
"""Load JSON data from a file.
Parameters
----------
path : str
Source path.
Returns
-------
data : dict
The loaded JSON data.
"""
# normalize path
path = utils.normpath(path)
with open(path, 'r') as fid:
return json.load(fid)


def zip_write(fid, files, recursive=True, root=None):
"""Write files to zip archive.
Parameters
----------
fid : file-like object
The zip file to write into.
files : iterable
List of files or directories to pack.
recursive : bool, optional
        If True, sub-directories and their contents are also written to the
        archive.
root : str, optional
Relative folder path.
Notes
-----
* Ignores non-existent files and directories.
"""
if root is None:
root = ''
for item in files:
fpath = utils.normpath(item)
if not os.path.exists(fpath):
continue
# relative archive name
arcname = os.path.join(root, os.path.split(fpath)[1])
# write
fid.write(fpath, arcname)
# recur
if recursive and os.path.isdir(fpath):
rfiles = [os.path.join(fpath, subitem)
for subitem in os.listdir(fpath)]
zip_write(fid, rfiles, recursive=recursive, root=arcname)


def pack_zip(files, path, recursive=True, forceExt=True):
"""Pack files into a zip archive.
Parameters
----------
files : iterable
List of files or directories to pack.
path : str
Destination path.
recursive : bool, optional
        If True, sub-directories and their contents are also written to the
        archive.
forceExt : bool, optional
        If True, append the default '.zip' extension to the destination path.
Returns
-------
zip_path : str
Full path to created zip archive.
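
    Examples
    --------
    A minimal sketch; the file and directory names are hypothetical:

    >>> from biosppy import storage
    >>> zip_path = storage.pack_zip(['/tmp/rec.txt'], '/tmp/archive')
    >>> storage.unpack_zip(zip_path, '/tmp/output')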
"""
# normalize destination path
zip_path = utils.normpath(path)
if forceExt:
zip_path += '.zip'
# allowZip64 is True to allow files > 2 GB
with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as fid:
zip_write(fid, files, recursive=recursive)
return zip_path


def unpack_zip(zip_path, path):
"""Unpack a zip archive.
Parameters
----------
zip_path : str
Path to zip archive.
path : str
Destination path (directory).
"""
# allowZip64 is True to allow files > 2 GB
with zipfile.ZipFile(zip_path, 'r', allowZip64=True) as fid:
fid.extractall(path)


def alloc_h5(path):
"""Prepare an HDF5 file.
Parameters
----------
path : str
Path to file.
"""
# normalize path
path = utils.normpath(path)
    # mode 'a' creates the file if it does not exist
    with h5py.File(path, 'a'):
pass


def store_h5(path, label, data):
"""Store data to HDF5 file.
Parameters
----------
path : str
Path to file.
label : hashable
Data label.
data : array
Data to store.
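
    Examples
    --------
    A minimal round-trip sketch; the path and label are hypothetical:

    >>> from biosppy import storage
    >>> storage.store_h5('/tmp/data.h5', 'signal0', [0., 1., 2.])
    >>> data = storage.load_h5('/tmp/data.h5', 'signal0')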
"""
# normalize path
path = utils.normpath(path)
    # mode 'a' opens read/write, creating the file if needed
    with h5py.File(path, 'a') as fid:
label = str(label)
try:
fid.create_dataset(label, data=data)
except (RuntimeError, ValueError):
# existing label, replace
del fid[label]
fid.create_dataset(label, data=data)


def load_h5(path, label):
"""Load data from an HDF5 file.
Parameters
----------
path : str
Path to file.
label : hashable
Data label.
Returns
-------
data : array
Loaded data.
"""
# normalize path
path = utils.normpath(path)
    # open in read-only mode
    with h5py.File(path, 'r') as fid:
label = str(label)
try:
return fid[label][...]
except KeyError:
return None


def store_txt(path, data, sampling_rate=1000., resolution=None, date=None,
              labels=None, precision=6):
"""Store data to a simple text file.
Parameters
----------
path : str
Path to file.
data : array
Data to store (up to 2 dimensions).
sampling_rate : int, float, optional
Sampling frequency (Hz).
resolution : int, optional
Sampling resolution.
date : datetime, str, optional
Datetime object, or an ISO 8601 formatted date-time string.
labels : list, optional
Labels for each column of `data`.
precision : int, optional
Precision for string conversion.
Raises
------
ValueError
If the number of data dimensions is greater than 2.
ValueError
If the number of labels is inconsistent with the data.
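
    Examples
    --------
    A minimal round-trip sketch; ``/tmp/signal.txt`` is a hypothetical path:

    >>> import numpy as np
    >>> from biosppy import storage
    >>> x = np.zeros((100, 2))
    >>> storage.store_txt('/tmp/signal.txt', x, sampling_rate=1000.,
    ...                   labels=['a', 'b'])
    >>> data, mdata = storage.load_txt('/tmp/signal.txt')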
"""
# ensure numpy
data = np.array(data)
# check dimension
if data.ndim > 2:
raise ValueError("Number of data dimensions cannot be greater than 2.")
# build header
header = "Simple Text Format\n"
header += "Sampling Rate (Hz):= %0.2f\n" % sampling_rate
if resolution is not None:
header += "Resolution:= %d\n" % resolution
if date is not None:
if isinstance(date, six.string_types):
header += "Date:= %s\n" % date
elif isinstance(date, datetime.datetime):
header += "Date:= %s\n" % date.isoformat()
else:
ct = datetime.datetime.utcnow().isoformat()
header += "Date:= %s\n" % ct
# data type
header += "Data Type:= %s\n" % data.dtype
# labels
if data.ndim == 1:
ncols = 1
elif data.ndim == 2:
ncols = data.shape[1]
if labels is None:
labels = ['%d' % i for i in range(ncols)]
elif len(labels) != ncols:
raise ValueError("Inconsistent number of labels.")
header += "Labels:= %s" % '\t'.join(labels)
# normalize path
path = utils.normpath(path)
# data format
p = '%d' % precision
if np.issubdtype(data.dtype, np.integer):
fmt = '%d'
    elif np.issubdtype(data.dtype, np.floating):
fmt = '%%.%sf' % p
elif np.issubdtype(data.dtype, np.bool_):
fmt = '%d'
else:
fmt = '%%.%se' % p
# store
np.savetxt(path, data, header=header, fmt=fmt, delimiter='\t')


def load_txt(path):
"""Load data from a text file.
Parameters
----------
path : str
Path to file.
Returns
-------
data : array
Loaded data.
mdata : dict
Metadata.
"""
# normalize path
path = utils.normpath(path)
with open(path, 'rb') as fid:
lines = fid.readlines()
# extract header
mdata_tmp = {}
fields = ['Sampling Rate', 'Resolution', 'Date', 'Data Type', 'Labels']
values = []
for item in lines:
        if item.startswith(b'#'):
item = item.decode('utf-8')
# parse comment
for f in fields:
if f in item:
mdata_tmp[f] = item.split(':= ')[1].strip()
fields.remove(f)
break
else:
values.append(item)
# convert mdata
mdata = {}
df = '%Y-%m-%dT%H:%M:%S.%f'
try:
mdata['sampling_rate'] = float(mdata_tmp['Sampling Rate'])
except KeyError:
pass
try:
mdata['resolution'] = int(mdata_tmp['Resolution'])
except KeyError:
pass
try:
dtype = mdata_tmp['Data Type']
except KeyError:
dtype = None
try:
d = datetime.datetime.strptime(mdata_tmp['Date'], df)
mdata['date'] = d
except (KeyError, ValueError):
pass
try:
labels = mdata_tmp['Labels'].split('\t')
mdata['labels'] = labels
except KeyError:
pass
# load array
data = np.genfromtxt(values, dtype=dtype, delimiter=b'\t')
return data, mdata


class HDF(object):
"""Wrapper class to operate on BioSPPy HDF5 files.
Parameters
----------
path : str
Path to the HDF5 file.
mode : str, optional
File mode; one of:
* 'a': read/write, creates file if it does not exist;
* 'r+': read/write, file must exist;
* 'r': read only, file must exist;
* 'w': create file, truncate if it already exists;
        * 'w-': create file, fails if it already exists.
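
    Examples
    --------
    A minimal sketch of the context-manager interface; the path and group
    names are hypothetical:

    >>> from biosppy import storage
    >>> with storage.HDF('/tmp/rec.h5', 'a') as fid:
    ...     group, name = fid.add_signal(signal=[0., 1., 2.], group='ecg')
    ...     signal, mdata = fid.get_signal(group='ecg', name=name)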
"""
def __init__(self, path=None, mode='a'):
# normalize path
path = utils.normpath(path)
# open file
self._file = h5py.File(path, mode)
# check BioSPPy structures
try:
self._signals = self._file['signals']
except KeyError:
if mode == 'r':
raise IOError(
"Unable to create 'signals' group with current file mode.")
self._signals = self._file.create_group('signals')
try:
self._events = self._file['events']
except KeyError:
if mode == 'r':
raise IOError(
"Unable to create 'events' group with current file mode.")
self._events = self._file.create_group('events')
def __enter__(self):
"""Method for with statement."""
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Method for with statement."""
self.close()
def _join_group(self, *args):
"""Join group elements.
Parameters
----------
``*args`` : list
Group elements to join.
Returns
-------
weg : str
Joined group path.
"""
bits = []
for item in args:
bits.extend(item.split('/'))
# filter out blanks, slashes, white space
out = []
for item in bits:
item = item.strip()
if item == '':
continue
elif item == '/':
continue
out.append(item)
weg = '/' + '/'.join(out)
return weg

    def add_signal(self,
                   signal=None,
                   mdata=None,
                   group='',
                   name=None,
                   compress=False):
"""Add a signal to the file.
Parameters
----------
signal : array
Signal to add.
mdata : dict, optional
Signal metadata.
group : str, optional
Destination signal group.
name : str, optional
Name of the dataset to create.
compress : bool, optional
If True, the signal will be compressed with gzip.
Returns
-------
group : str
Destination group.
name : str
Name of the created signal dataset.
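
        Examples
        --------
        A minimal sketch, assuming ``fid`` is an open ``HDF`` instance; the
        group and metadata are hypothetical:

        >>> group, name = fid.add_signal(signal=[0., 1., 2.],
        ...                              mdata={'sampling_rate': 1000.},
        ...                              group='ecg')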
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
if mdata is None:
mdata = {}
if name is None:
name = shortuuid.uuid()
# navigate to group
weg = self._join_group(self._signals.name, group)
try:
node = self._file[weg]
except KeyError:
# create group
node = self._file.create_group(weg)
# create dataset
if compress:
dset = node.create_dataset(name, data=signal, compression='gzip')
else:
dset = node.create_dataset(name, data=signal)
# add metadata
dset.attrs['json'] = json.dumps(mdata)
# output
grp = weg.replace('/signals', '')
return utils.ReturnTuple((grp, name), ('group', 'name'))
def _get_signal(self, group='', name=None):
"""Retrieve a signal dataset from the file.
Parameters
----------
group : str, optional
Signal group.
name : str
Name of the signal dataset.
Returns
-------
dataset : h5py.Dataset
HDF5 dataset.
"""
# check inputs
if name is None:
raise TypeError(
"Please specify the name of the signal to retrieve.")
# navigate to group
weg = self._join_group(self._signals.name, group)
try:
node = self._file[weg]
except KeyError:
raise KeyError("Inexistent signal group.")
# get data
try:
dset = node[name]
except KeyError:
raise KeyError("Inexistent signal dataset.")
return dset

    def get_signal(self, group='', name=None):
"""Retrieve a signal from the file.
Parameters
----------
group : str, optional
Signal group.
name : str
Name of the signal dataset.
Returns
-------
signal : array
Retrieved signal.
mdata : dict
Signal metadata.
Notes
-----
* Loads the entire signal data into memory.
"""
dset = self._get_signal(group=group, name=name)
signal = dset[...]
try:
mdata = json.loads(dset.attrs['json'])
except KeyError:
mdata = {}
return utils.ReturnTuple((signal, mdata), ('signal', 'mdata'))

    def del_signal(self, group='', name=None):
"""Delete a signal from the file.
Parameters
----------
group : str, optional
Signal group.
name : str
Name of the dataset.
"""
dset = self._get_signal(group=group, name=name)
try:
del self._file[dset.name]
except IOError:
raise IOError("Unable to delete object with current file mode.")

    def del_signal_group(self, group=''):
"""Delete all signals in a file group.
Parameters
----------
group : str, optional
Signal group.
"""
# navigate to group
weg = self._join_group(self._signals.name, group)
try:
node = self._file[weg]
except KeyError:
raise KeyError("Inexistent signal group.")
if node.name == '/signals':
# delete all elements
for _, item in six.iteritems(node):
try:
del self._file[item.name]
except IOError:
raise IOError(
"Unable to delete object with current file mode.")
else:
# delete single node
try:
del self._file[node.name]
except IOError:
raise IOError(
"Unable to delete object with current file mode.")

    def list_signals(self, group='', recursive=False):
"""List signals in the file.
Parameters
----------
group : str, optional
Signal group.
recursive : bool, optional
If True, also lists signals in sub-groups.
Returns
-------
signals : list
List of (group, name) tuples of the found signals.
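
        Examples
        --------
        A minimal sketch, assuming ``fid`` is an open ``HDF`` instance with
        signals stored under the hypothetical group 'ecg':

        >>> signals = fid.list_signals(group='ecg', recursive=True)['signals']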
"""
# navigate to group
weg = self._join_group(self._signals.name, group)
try:
node = self._file[weg]
except KeyError:
raise KeyError("Inexistent signal group.")
out = []
for name, item in six.iteritems(node):
if isinstance(item, h5py.Dataset):
out.append((group, name))
elif recursive and isinstance(item, h5py.Group):
aux = self._join_group(group, name)
out.extend(self.list_signals(group=aux,
recursive=True)['signals'])
return utils.ReturnTuple((out,), ('signals',))

    def add_event(self,
                  ts=None,
                  values=None,
                  mdata=None,
                  group='',
                  name=None,
                  compress=False):
"""Add an event to the file.
Parameters
----------
ts : array
Array of time stamps.
values : array, optional
Array with data for each time stamp.
mdata : dict, optional
Event metadata.
group : str, optional
Destination event group.
name : str, optional
Name of the dataset to create.
compress : bool, optional
If True, the data will be compressed with gzip.
Returns
-------
group : str
Destination group.
name : str
Name of the created event dataset.
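
        Examples
        --------
        A minimal sketch, assuming ``fid`` is an open ``HDF`` instance; the
        time stamps, values, and group are hypothetical:

        >>> group, name = fid.add_event(ts=[0.5, 1.0, 1.5],
        ...                             values=[10, 20, 30],
        ...                             group='annotations')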
"""
# check inputs
if ts is None:
raise TypeError("Please specify an input array of time stamps.")
if values is None:
values = []
if mdata is None:
mdata = {}
if name is None:
name = shortuuid.uuid()
# navigate to group
weg = self._join_group(self._events.name, group)
try:
node = self._file[weg]
except KeyError:
# create group
node = self._file.create_group(weg)
# create new event group
evt_node = node.create_group(name)
# create datasets
if compress:
_ = evt_node.create_dataset('ts', data=ts, compression='gzip')
_ = evt_node.create_dataset('values',
data=values,
compression='gzip')
else:
_ = evt_node.create_dataset('ts', data=ts)
_ = evt_node.create_dataset('values', data=values)
# add metadata
evt_node.attrs['json'] = json.dumps(mdata)
# output
grp = weg.replace('/events', '')
return utils.ReturnTuple((grp, name), ('group', 'name'))
def _get_event(self, group='', name=None):
"""Retrieve event datasets from the file.
Parameters
----------
group : str, optional
Event group.
name : str
Name of the event dataset.
Returns
-------
event : h5py.Group
HDF5 event group.
ts : h5py.Dataset
HDF5 time stamps dataset.
values : h5py.Dataset
HDF5 values dataset.
"""
# check inputs
if name is None:
            raise TypeError(
                "Please specify the name of the event to retrieve.")
# navigate to group
weg = self._join_group(self._events.name, group)
try:
node = self._file[weg]
except KeyError:
raise KeyError("Inexistent event group.")
# event group
try:
evt_group = node[name]
except KeyError:
raise KeyError("Inexistent event dataset.")
# get data
try:
ts = evt_group['ts']
except KeyError:
raise KeyError("Could not find expected time stamps structure.")
try:
values = evt_group['values']
except KeyError:
raise KeyError("Could not find expected values structure.")
return evt_group, ts, values

    def get_event(self, group='', name=None):
"""Retrieve an event from the file.
Parameters
----------
group : str, optional
Event group.
name : str
Name of the event dataset.
Returns
-------
ts : array
Array of time stamps.
values : array
Array with data for each time stamp.
mdata : dict
Event metadata.
Notes
-----
        * Loads the entire event data into memory.
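
        Examples
        --------
        A minimal sketch, assuming an event ``name`` was previously added to
        the hypothetical group 'annotations' of an open ``HDF`` file ``fid``:

        >>> ts, values, mdata = fid.get_event(group='annotations', name=name)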
"""
evt_group, dset_ts, dset_values = self._get_event(group=group,
name=name)
ts = dset_ts[...]
values = dset_values[...]
try:
mdata = json.loads(evt_group.attrs['json'])
except KeyError:
mdata = {}
return utils.ReturnTuple((ts, values, mdata),
('ts', 'values', 'mdata'))

    def del_event(self, group='', name=None):
"""Delete an event from the file.
Parameters
----------
group : str, optional
Event group.
name : str
Name of the event dataset.
"""
evt_group, _, _ = self._get_event(group=group, name=name)
try:
del self._file[evt_group.name]
except IOError:
raise IOError("Unable to delete object with current file mode.")

    def del_event_group(self, group=''):
"""Delete all events in a file group.
Parameters
----------
        group : str, optional
Event group.
"""
# navigate to group
weg = self._join_group(self._events.name, group)
try:
node = self._file[weg]
except KeyError:
raise KeyError("Inexistent event group.")
if node.name == '/events':
# delete all elements
for _, item in six.iteritems(node):
try:
del self._file[item.name]
except IOError:
raise IOError(
"Unable to delete object with current file mode.")
else:
# delete single node
try:
del self._file[node.name]
except IOError:
raise IOError(
"Unable to delete object with current file mode.")

    def list_events(self, group='', recursive=False):
"""List events in the file.
Parameters
----------
group : str, optional
Event group.
recursive : bool, optional
If True, also lists events in sub-groups.
Returns
-------
events : list
List of (group, name) tuples of the found events.
"""
# navigate to group
weg = self._join_group(self._events.name, group)
try:
node = self._file[weg]
except KeyError:
raise KeyError("Inexistent event group.")
out = []
for name, item in six.iteritems(node):
if isinstance(item, h5py.Group):
try:
_ = item.attrs['json']
except KeyError:
# normal group
if recursive:
aux = self._join_group(group, name)
out.extend(self.list_events(group=aux,
recursive=True)['events'])
else:
# event group
out.append((group, name))
return utils.ReturnTuple((out,), ('events',))

    def close(self):
"""Close file descriptor."""
# flush buffers
self._file.flush()
# close
self._file.close()