# -*- coding: utf-8 -*-
"""
biosppy.signals.tools
---------------------
This module provides various signal analysis methods in the time and
frequency domains.
:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.
"""
# Imports
# compat
from __future__ import absolute_import, division, print_function
from six.moves import range
import six
# 3rd party
import sys
import numpy as np
import scipy.signal as ss
from scipy import interpolate, optimize
from scipy.stats import stats
from scipy.sparse import spdiags, eye, csr_matrix
# local
from .. import utils
def _norm_freq(frequency=None, sampling_rate=1000.0):
"""Normalize frequency to Nyquist Frequency (Fs/2).
Parameters
----------
frequency : int, float, list, array
Frequencies to normalize.
sampling_rate : int, float, optional
Sampling frequency (Hz).
Returns
-------
wn : float, array
Normalized frequencies.
"""
# check inputs
if frequency is None:
raise TypeError("Please specify a frequency to normalize.")
# convert inputs to correct representation
try:
frequency = float(frequency)
except TypeError:
# maybe frequency is a list or array
frequency = np.array(frequency, dtype="float")
Fs = float(sampling_rate)
wn = 2.0 * frequency / Fs
return wn
def _filter_init(b, a, alpha=1.0):
"""Get an initial filter state that corresponds to the steady-state
of the step response.
Parameters
----------
b : array
Numerator coefficients.
a : array
Denominator coefficients.
alpha : float, optional
Scaling factor.
Returns
-------
zi : array
Initial filter state.
"""
zi = alpha * ss.lfilter_zi(b, a)
return zi
def _filter_signal(b, a, signal, zi=None, check_phase=True, **kwargs):
"""Filter a signal with given coefficients.
Parameters
----------
b : array
Numerator coefficients.
a : array
Denominator coefficients.
signal : array
Signal to filter.
zi : array, optional
Initial filter state.
check_phase : bool, optional
If True, use the forward-backward technique.
``**kwargs`` : dict, optional
Additional keyword arguments are passed to the underlying filtering
function.
Returns
-------
filtered : array
Filtered signal.
zf : array
Final filter state.
Notes
-----
* If check_phase is True, zi cannot be set.
"""
# check inputs
if check_phase and zi is not None:
raise ValueError(
"Incompatible arguments: initial filter state cannot be set when \
check_phase is True."
)
if zi is None:
zf = None
if check_phase:
filtered = ss.filtfilt(b, a, signal, **kwargs)
else:
filtered = ss.lfilter(b, a, signal, **kwargs)
else:
filtered, zf = ss.lfilter(b, a, signal, zi=zi, **kwargs)
return filtered, zf
def _filter_resp(b, a, sampling_rate=1000.0, nfreqs=4096):
"""Compute the filter frequency response.
Parameters
----------
b : array
Numerator coefficients.
a : array
Denominator coefficients.
sampling_rate : int, float, optional
Sampling frequency (Hz).
nfreqs : int, optional
Number of frequency points to compute.
Returns
-------
freqs : array
Array of frequencies (Hz) at which the response was computed.
resp : array
Frequency response.
"""
w, resp = ss.freqz(b, a, nfreqs)
# convert frequencies
freqs = w * sampling_rate / (2.0 * np.pi)
return freqs, resp
def _get_window(kernel, size, **kwargs):
"""Return a window with the specified parameters.
Parameters
----------
kernel : str
Type of window to create.
size : int
Size of the window.
``**kwargs`` : dict, optional
Additional keyword arguments are passed to the underlying
scipy.signal.windows function.
Returns
-------
window : array
Created window.
"""
# mimics scipy.signal.get_window
if kernel in ["blackman", "black", "blk"]:
winfunc = ss.blackman
elif kernel in ["triangle", "triang", "tri"]:
winfunc = ss.triang
elif kernel in ["hamming", "hamm", "ham"]:
winfunc = ss.hamming
elif kernel in ["bartlett", "bart", "brt"]:
winfunc = ss.bartlett
elif kernel in ["hanning", "hann", "han"]:
winfunc = ss.hann
elif kernel in ["blackmanharris", "blackharr", "bkh"]:
winfunc = ss.blackmanharris
elif kernel in ["parzen", "parz", "par"]:
winfunc = ss.parzen
elif kernel in ["bohman", "bman", "bmn"]:
winfunc = ss.bohman
elif kernel in ["nuttall", "nutl", "nut"]:
winfunc = ss.nuttall
elif kernel in ["barthann", "brthan", "bth"]:
winfunc = ss.barthann
elif kernel in ["flattop", "flat", "flt"]:
winfunc = ss.flattop
elif kernel in ["kaiser", "ksr"]:
winfunc = ss.kaiser
elif kernel in ["gaussian", "gauss", "gss"]:
winfunc = ss.gaussian
elif kernel in [
"general gaussian",
"general_gaussian",
"general gauss",
"general_gauss",
"ggs",
]:
winfunc = ss.general_gaussian
elif kernel in ["boxcar", "box", "ones", "rect", "rectangular"]:
winfunc = ss.boxcar
elif kernel in ["slepian", "slep", "optimal", "dpss", "dss"]:
winfunc = ss.slepian
elif kernel in ["cosine", "halfcosine"]:
winfunc = ss.cosine
elif kernel in ["chebwin", "cheb"]:
winfunc = ss.chebwin
else:
raise ValueError("Unknown window type.")
try:
window = winfunc(size, **kwargs)
except TypeError as e:
raise TypeError("Invalid window arguments: %s." % e)
return window
[docs]def get_filter(
ftype="FIR",
band="lowpass",
order=None,
frequency=None,
sampling_rate=1000.0,
**kwargs
):
"""Compute digital (FIR or IIR) filter coefficients with the given
parameters.
Parameters
----------
ftype : str
Filter type:
* Finite Impulse Response filter ('FIR');
* Butterworth filter ('butter');
* Chebyshev filters ('cheby1', 'cheby2');
* Elliptic filter ('ellip');
* Bessel filter ('bessel').
* Notch filter ('notch').
band : str
Band type:
* Low-pass filter ('lowpass');
* High-pass filter ('highpass');
* Band-pass filter ('bandpass');
* Band-stop filter ('bandstop').
order : int
Order of the filter.
frequency : int, float, list, array
Cutoff frequencies; format depends on type of band:
* 'lowpass' or 'highpass': single frequency;
* 'bandpass' or 'bandstop': pair of frequencies.
sampling_rate : int, float, optional
Sampling frequency (Hz).
``**kwargs`` : dict, optional
Additional keyword arguments are passed to the underlying
scipy.signal function.
- Q : float
Quality factor (only for 'notch' filter). Default: 30.
Returns
-------
b : array
Numerator coefficients.
a : array
Denominator coefficients.
See Also:
scipy.signal
"""
# check inputs
if order is None and ftype != "notch":
raise TypeError("Please specify the filter order.")
if frequency is None:
raise TypeError("Please specify the cutoff frequency.")
if band not in ["lowpass", "highpass", "bandpass", "bandstop"] and ftype != "notch":
raise ValueError(
"Unknown filter band type '%r'; choose 'lowpass', 'highpass', \
'bandpass', or 'bandstop'."
% band
)
# convert frequencies
frequency = _norm_freq(frequency, sampling_rate)
# get coeffs
b, a = [], []
if ftype == "FIR":
# FIR filter
if order % 2 == 0:
order += 1
a = np.array([1])
if band in ["lowpass", "bandstop"]:
b = ss.firwin(numtaps=order, cutoff=frequency, pass_zero=True, **kwargs)
elif band in ["highpass", "bandpass"]:
b = ss.firwin(numtaps=order, cutoff=frequency, pass_zero=False, **kwargs)
elif ftype == "butter":
# Butterworth filter
b, a = ss.butter(
N=order, Wn=frequency, btype=band, analog=False, output="ba", **kwargs
)
elif ftype == "cheby1":
# Chebyshev type I filter
b, a = ss.cheby1(
N=order, Wn=frequency, btype=band, analog=False, output="ba", **kwargs
)
elif ftype == "cheby2":
# chebyshev type II filter
b, a = ss.cheby2(
N=order, Wn=frequency, btype=band, analog=False, output="ba", **kwargs
)
elif ftype == "ellip":
# Elliptic filter
b, a = ss.ellip(
N=order, Wn=frequency, btype=band, analog=False, output="ba", **kwargs
)
elif ftype == "bessel":
# Bessel filter
b, a = ss.bessel(
N=order, Wn=frequency, btype=band, analog=False, output="ba", **kwargs
)
elif ftype == "notch":
# Notch filter
b, a = ss.iirnotch(
w0=frequency, Q=kwargs.get("Q", 30)
)
return utils.ReturnTuple((b, a), ("b", "a"))
[docs]def filter_signal(
signal=None,
ftype="FIR",
band="lowpass",
order=None,
frequency=None,
sampling_rate=1000.0,
**kwargs
):
"""Filter a signal according to the given parameters.
Parameters
----------
signal : array
Signal to filter.
ftype : str
Filter type:
* Finite Impulse Response filter ('FIR');
* Butterworth filter ('butter');
* Chebyshev filters ('cheby1', 'cheby2');
* Elliptic filter ('ellip');
* Bessel filter ('bessel').
* Notch filter ('notch').
band : str
Band type:
* Low-pass filter ('lowpass');
* High-pass filter ('highpass');
* Band-pass filter ('bandpass');
* Band-stop filter ('bandstop').
order : int
Order of the filter.
frequency : int, float, list, array
Cutoff frequencies; format depends on type of band:
* 'lowpass' or 'bandpass': single frequency;
* 'bandpass' or 'bandstop': pair of frequencies.
sampling_rate : int, float, optional
Sampling frequency (Hz).
``**kwargs`` : dict, optional
Additional keyword arguments are passed to the underlying
scipy.signal function.
- Q : float
Quality factor (only for 'notch' filter). Default: 30.
Returns
-------
signal : array
Filtered signal.
sampling_rate : float
Sampling frequency (Hz).
params : dict
Filter parameters.
Notes
-----
* Uses a forward-backward filter implementation. Therefore, the combined
filter has linear phase.
"""
# check inputs
if signal is None:
raise TypeError("Please specify a signal to filter.")
# get filter
b, a = get_filter(
ftype=ftype,
order=order,
frequency=frequency,
sampling_rate=sampling_rate,
band=band,
**kwargs
)
# filter
filtered, _ = _filter_signal(b, a, signal, check_phase=True)
# parameters for notch filter
if ftype == "notch":
order = 2
band = "bandstop"
# output
params = {
"ftype": ftype,
"order": order,
"frequency": frequency,
"band": band,
**kwargs,
}
params.update(kwargs)
args = (filtered, sampling_rate, params)
names = ("signal", "sampling_rate", "params")
return utils.ReturnTuple(args, names)
[docs]class OnlineFilter(object):
"""Online filtering.
Parameters
----------
b : array
Numerator coefficients.
a : array
Denominator coefficients.
"""
def __init__(self, b=None, a=None):
# check inputs
if b is None:
raise TypeError("Please specify the numerator coefficients.")
if a is None:
raise TypeError("Please specify the denominator coefficients.")
# self things
self.b = b
self.a = a
# reset
self.reset()
[docs] def reset(self):
"""Reset the filter state."""
self.zi = None
[docs] def filter(self, signal=None):
"""Filter a signal segment.
Parameters
----------
signal : array
Signal segment to filter.
Returns
-------
filtered : array
Filtered signal segment.
"""
# check input
if signal is None:
raise TypeError("Please specify the input signal.")
if self.zi is None:
self.zi = signal[0] * ss.lfilter_zi(self.b, self.a)
filtered, self.zi = ss.lfilter(self.b, self.a, signal, zi=self.zi)
return utils.ReturnTuple((filtered,), ("filtered",))
[docs]def smoother(signal=None, kernel="boxzen", size=10, mirror=True, **kwargs):
"""Smooth a signal using an N-point moving average [MAvg]_ filter.
This implementation uses the convolution of a filter kernel with the input
signal to compute the smoothed signal [Smit97]_.
Availabel kernels: median, boxzen, boxcar, triang, blackman, hamming, hann,
bartlett, flattop, parzen, bohman, blackmanharris, nuttall, barthann,
kaiser (needs beta), gaussian (needs std), general_gaussian (needs power,
width), slepian (needs width), chebwin (needs attenuation).
Parameters
----------
signal : array
Signal to smooth.
kernel : str, array, optional
Type of kernel to use; if array, use directly as the kernel.
size : int, optional
Size of the kernel; ignored if kernel is an array.
mirror : bool, optional
If True, signal edges are extended to avoid boundary effects.
``**kwargs`` : dict, optional
Additional keyword arguments are passed to the underlying
scipy.signal.windows function.
Returns
-------
signal : array
Smoothed signal.
params : dict
Smoother parameters.
Notes
-----
* When the kernel is 'median', mirror is ignored.
References
----------
.. [MAvg] Wikipedia, "Moving Average",
http://en.wikipedia.org/wiki/Moving_average
.. [Smit97] S. W. Smith, "Moving Average Filters - Implementation by
Convolution", http://www.dspguide.com/ch15/1.htm, 1997
"""
# check inputs
if signal is None:
raise TypeError("Please specify a signal to smooth.")
length = len(signal)
if isinstance(kernel, six.string_types):
# check length
if size > length:
size = length - 1
if size < 1:
size = 1
if kernel == "boxzen":
# hybrid method
# 1st pass - boxcar kernel
aux, _ = smoother(signal, kernel="boxcar", size=size, mirror=mirror)
# 2nd pass - parzen kernel
smoothed, _ = smoother(aux, kernel="parzen", size=size, mirror=mirror)
params = {"kernel": kernel, "size": size, "mirror": mirror}
args = (smoothed, params)
names = ("signal", "params")
return utils.ReturnTuple(args, names)
elif kernel == "median":
# median filter
if size % 2 == 0:
raise ValueError("When the kernel is 'median', size must be odd.")
smoothed = ss.medfilt(signal, kernel_size=size)
params = {"kernel": kernel, "size": size, "mirror": mirror}
args = (smoothed, params)
names = ("signal", "params")
return utils.ReturnTuple(args, names)
else:
win = _get_window(kernel, size, **kwargs)
elif isinstance(kernel, np.ndarray):
win = kernel
size = len(win)
# check length
if size > length:
raise ValueError("Kernel size is bigger than signal length.")
if size < 1:
raise ValueError("Kernel size is smaller than 1.")
else:
raise TypeError("Unknown kernel type.")
# convolve
w = win / win.sum()
if mirror:
aux = np.concatenate(
(signal[0] * np.ones(size), signal, signal[-1] * np.ones(size))
)
smoothed = np.convolve(w, aux, mode="same")
smoothed = smoothed[size:-size]
else:
smoothed = np.convolve(w, signal, mode="same")
# output
params = {"kernel": kernel, "size": size, "mirror": mirror}
params.update(kwargs)
args = (smoothed, params)
names = ("signal", "params")
return utils.ReturnTuple(args, names)
[docs]def analytic_signal(signal=None, N=None):
"""Compute analytic signal, using the Hilbert Transform.
Parameters
----------
signal : array
Input signal.
N : int, optional
Number of Fourier components; default is `len(signal)`.
Returns
-------
amplitude : array
Amplitude envelope of the analytic signal.
phase : array
Instantaneous phase component of the analystic signal.
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
# hilbert transform
asig = ss.hilbert(signal, N=N)
# amplitude envelope
amp = np.absolute(asig)
# instantaneous
phase = np.angle(asig)
return utils.ReturnTuple((amp, phase), ("amplitude", "phase"))
[docs]def phase_locking(signal1=None, signal2=None, N=None):
"""Compute the Phase-Locking Factor (PLF) between two signals.
Parameters
----------
signal1 : array
First input signal.
signal2 : array
Second input signal.
N : int, optional
Number of Fourier components.
Returns
-------
plf : float
The PLF between the two signals.
"""
# check inputs
if signal1 is None:
raise TypeError("Please specify the first input signal.")
if signal2 is None:
raise TypeError("Please specify the second input signal.")
if len(signal1) != len(signal2):
raise ValueError("The input signals must have the same length.")
# compute analytic signal
asig1 = ss.hilbert(signal1, N=N)
phase1 = np.angle(asig1)
asig2 = ss.hilbert(signal2, N=N)
phase2 = np.angle(asig2)
# compute PLF
plf = np.absolute(np.mean(np.exp(1j * (phase1 - phase2))))
return utils.ReturnTuple((plf,), ("plf",))
[docs]def power_spectrum(
signal=None, sampling_rate=1000.0, pad=None, pow2=False, decibel=True
):
"""Compute the power spectrum of a signal (one-sided).
Parameters
----------
signal : array
Input signal.
sampling_rate : int, float, optional
Sampling frequency (Hz).
pad : int, optional
Padding for the Fourier Transform (number of zeros added);
defaults to no padding..
pow2 : bool, optional
If True, rounds the number of points `N = len(signal) + pad` to the
nearest power of 2 greater than N.
decibel : bool, optional
If True, returns the power in decibels.
Returns
-------
freqs : array
Array of frequencies (Hz) at which the power was computed.
power : array
Power spectrum.
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
npoints = len(signal)
if pad is not None:
if pad >= 0:
npoints += pad
else:
raise ValueError("Padding must be a positive integer.")
# power of 2
if pow2:
npoints = 2 ** (np.ceil(np.log2(npoints)))
Nyq = float(sampling_rate) / 2
hpoints = npoints // 2
freqs = np.linspace(0, Nyq, hpoints)
power = np.abs(np.fft.fft(signal, npoints)) / npoints
# one-sided
power = power[:hpoints]
power[1:] *= 2
power = np.power(power, 2)
if decibel:
power = 10.0 * np.log10(power)
return utils.ReturnTuple((freqs, power), ("freqs", "power"))
[docs]def welch_spectrum(
signal=None,
sampling_rate=1000.0,
size=None,
overlap=None,
window="hanning",
window_kwargs=None,
pad=None,
decibel=True,
):
"""Compute the power spectrum of a signal using Welch's method (one-sided).
Parameters
----------
signal : array
Input signal.
sampling_rate : int, float, optional
Sampling frequency (Hz).
size : int, optional
Number of points in each Welch segment;
defaults to the equivalent of 1 second;
ignored when 'window' is an array.
overlap : int, optional
Number of points to overlap between segments; defaults to `size / 2`.
window : str, array, optional
Type of window to use.
window_kwargs : dict, optional
Additional keyword arguments to pass on window creation; ignored if
'window' is an array.
pad : int, optional
Padding for the Fourier Transform (number of zeros added);
defaults to no padding.
decibel : bool, optional
If True, returns the power in decibels.
Returns
-------
freqs : array
Array of frequencies (Hz) at which the power was computed.
power : array
Power spectrum.
Notes
-----
* Detrends each Welch segment by removing the mean.
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
length = len(signal)
sampling_rate = float(sampling_rate)
if size is None:
size = int(sampling_rate)
if window_kwargs is None:
window_kwargs = {}
if isinstance(window, six.string_types):
win = _get_window(window, size, **window_kwargs)
elif isinstance(window, np.ndarray):
win = window
size = len(win)
if size > length:
raise ValueError("Segment size must be smaller than signal length.")
if overlap is None:
overlap = size // 2
elif overlap > size:
raise ValueError("Overlap must be smaller than segment size.")
nfft = size
if pad is not None:
if pad >= 0:
nfft += pad
else:
raise ValueError("Padding must be a positive integer.")
freqs, power = ss.welch(
signal,
fs=sampling_rate,
window=win,
nperseg=size,
noverlap=overlap,
nfft=nfft,
detrend="constant",
return_onesided=True,
scaling="spectrum",
)
# compensate one-sided
power *= 2
if decibel:
power = 10.0 * np.log10(power)
return utils.ReturnTuple((freqs, power), ("freqs", "power"))
[docs]def band_power(freqs=None, power=None, frequency=None, decibel=True):
"""Compute the avearge power in a frequency band.
Parameters
----------
freqs : array
Array of frequencies (Hz) at which the power was computed.
power : array
Input power spectrum.
frequency : list, array
Pair of frequencies defining the band.
decibel : bool, optional
If True, input power is in decibels.
Returns
-------
avg_power : float
The average power in the band.
"""
# check inputs
if freqs is None:
raise TypeError("Please specify the 'freqs' array.")
if power is None:
raise TypeError("Please specify the input power spectrum.")
if len(freqs) != len(power):
raise ValueError(
"The input 'freqs' and 'power' arrays must have the same length."
)
if frequency is None:
raise TypeError("Please specify the band frequencies.")
try:
f1, f2 = frequency
except ValueError:
raise ValueError("Input 'frequency' must be a pair of frequencies.")
# make frequencies sane
if f1 > f2:
f1, f2 = f2, f1
if f1 < freqs[0]:
f1 = freqs[0]
if f2 > freqs[-1]:
f2 = freqs[-1]
# average
sel = np.nonzero(np.logical_and(f1 <= freqs, freqs <= f2))[0]
if decibel:
aux = 10 ** (power / 10.0)
avg = np.mean(aux[sel])
avg = 10.0 * np.log10(avg)
else:
avg = np.mean(power[sel])
return utils.ReturnTuple((avg,), ("avg_power",))
[docs]def signal_stats(signal=None):
"""Compute various metrics describing the signal.
Parameters
----------
signal : array
Input signal.
Returns
-------
mean : float
Mean of the signal.
median : float
Median of the signal.
min : float
Minimum signal value.
max : float
Maximum signal value.
max_amp : float
Maximum absolute signal amplitude, in relation to the mean.
range : float
Signal range (max - min).
q1 : float
First quartile of the signal.
q3 : float
Third quartile of the signal.
var : float
Signal variance (unbiased).
std_dev : float
Standard signal deviation (unbiased).
abs_dev : float
Mean absolute signal deviation around the median.
rms : float
Root-mean-square of the signal.
kurtosis : float
Signal kurtosis (unbiased).
skew : float
Signal skewness (unbiased).
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
# ensure numpy
signal = np.array(signal)
# mean
mean = np.mean(signal)
# median
median = np.median(signal)
# min
minVal = np.min(signal)
# max
maxVal = np.max(signal)
# maximum amplitude
maxAmp = np.abs(signal - mean).max()
# range
rng = maxVal - minVal
# variance
sigma2 = signal.var(ddof=1)
# standard deviation
sigma = signal.std(ddof=1)
# absolute deviation
ad = np.mean(np.abs(signal - median))
# root-mean-square
rms = np.sqrt(np.mean(signal**2))
# kurtosis
kurt = stats.kurtosis(signal, bias=False)
# skewness
skew = stats.skew(signal, bias=False)
# output
args = (mean, median, minVal, maxVal, maxAmp, rng, sigma2, sigma, ad, rms,
kurt, skew)
names = ("mean", "median", "min", "max", "max_amp", "range", "var", "std_dev",
"abs_dev", "rms", "kurtosis", "skew")
return utils.ReturnTuple(args, names)
[docs]def normalize(signal=None, ddof=1):
"""Normalize a signal to zero mean and unitary standard deviation.
Parameters
----------
signal : array
Input signal.
ddof : int, optional
Delta degrees of freedom for standard deviation computation;
the divisor is `N - ddof`, where `N` is the number of elements;
default is one.
Returns
-------
signal : array
Normalized signal.
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
# ensure numpy
signal = np.array(signal)
normalized = signal - signal.mean()
normalized /= normalized.std(ddof=ddof)
return utils.ReturnTuple((normalized,), ("signal",))
[docs]def zero_cross(signal=None, detrend=False):
"""Locate the indices where the signal crosses zero.
Parameters
----------
signal : array
Input signal.
detrend : bool, optional
If True, remove signal mean before computation.
Returns
-------
zeros : array
Indices of zero crossings.
Notes
-----
* When the signal crosses zero between samples, the first index
is returned.
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
if detrend:
signal = signal - np.mean(signal)
# zeros
df = np.diff(np.sign(signal))
zeros = np.nonzero(np.abs(df) > 0)[0]
return utils.ReturnTuple((zeros,), ("zeros",))
[docs]def find_extrema(signal=None, mode="both"):
"""Locate local extrema points in a signal.
Based on Fermat's Theorem [Ferm]_.
Parameters
----------
signal : array
Input signal.
mode : str, optional
Whether to find maxima ('max'), minima ('min'), or both ('both').
Returns
-------
extrema : array
Indices of the extrema points.
values : array
Signal values at the extrema points.
References
----------
.. [Ferm] Wikipedia, "Fermat's theorem (stationary points)",
https://en.wikipedia.org/wiki/Fermat%27s_theorem_(stationary_points)
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
if mode not in ["max", "min", "both"]:
raise ValueError("Unknwon mode %r." % mode)
aux = np.diff(np.sign(np.diff(signal)))
if mode == "both":
aux = np.abs(aux)
extrema = np.nonzero(aux > 0)[0] + 1
elif mode == "max":
extrema = np.nonzero(aux < 0)[0] + 1
elif mode == "min":
extrema = np.nonzero(aux > 0)[0] + 1
values = signal[extrema]
return utils.ReturnTuple((extrema, values), ("extrema", "values"))
[docs]def windower(
signal=None,
size=None,
step=None,
fcn=None,
fcn_kwargs=None,
kernel="boxcar",
kernel_kwargs=None,
):
"""Apply a function to a signal in sequential windows, with optional overlap.
Availabel window kernels: boxcar, triang, blackman, hamming, hann,
bartlett, flattop, parzen, bohman, blackmanharris, nuttall, barthann,
kaiser (needs beta), gaussian (needs std), general_gaussian (needs power,
width), slepian (needs width), chebwin (needs attenuation).
Parameters
----------
signal : array
Input signal.
size : int
Size of the signal window.
step : int, optional
Size of window shift; if None, there is no overlap.
fcn : callable
Function to apply to each window.
fcn_kwargs : dict, optional
Additional keyword arguments to pass to 'fcn'.
kernel : str, array, optional
Type of kernel to use; if array, use directly as the kernel.
kernel_kwargs : dict, optional
Additional keyword arguments to pass on window creation; ignored if
'kernel' is an array.
Returns
-------
index : array
Indices characterizing window locations (start of the window).
values : array
Concatenated output of calling 'fcn' on each window.
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
if fcn is None:
raise TypeError("Please specify a function to apply to each window.")
if fcn_kwargs is None:
fcn_kwargs = {}
if kernel_kwargs is None:
kernel_kwargs = {}
length = len(signal)
if isinstance(kernel, six.string_types):
# check size
if size > length:
raise ValueError("Window size must be smaller than signal length.")
win = _get_window(kernel, size, **kernel_kwargs)
elif isinstance(kernel, np.ndarray):
win = kernel
size = len(win)
# check size
if size > length:
raise ValueError("Window size must be smaller than signal length.")
if step is None:
step = size
if step <= 0:
raise ValueError("Step size must be at least 1.")
# number of windows
nb = 1 + (length - size) // step
# check signal dimensionality
if np.ndim(signal) == 2:
# time along 1st dim, tile window
nch = np.shape(signal)[1]
win = np.tile(np.reshape(win, (size, 1)), nch)
index = []
values = []
for i in range(nb):
start = i * step
stop = start + size
index.append(start)
aux = signal[start:stop] * win
# apply function
out = fcn(aux, **fcn_kwargs)
values.append(out)
# transform to numpy
index = np.array(index, dtype="int")
values = np.array(values)
return utils.ReturnTuple((index, values), ("index", "values"))
[docs]def synchronize(x=None, y=None, detrend=True):
"""Align two signals based on cross-correlation.
Parameters
----------
x : array
First input signal.
y : array
Second input signal.
detrend : bool, optional
If True, remove signal means before computation.
Returns
-------
delay : int
Delay (number of samples) of 'x' in relation to 'y';
if 'delay' < 0 , 'x' is ahead in relation to 'y';
if 'delay' > 0 , 'x' is delayed in relation to 'y'.
corr : float
Value of maximum correlation.
synch_x : array
Biggest possible portion of 'x' in synchronization.
synch_y : array
Biggest possible portion of 'y' in synchronization.
"""
# check inputs
if x is None:
raise TypeError("Please specify the first input signal.")
if y is None:
raise TypeError("Please specify the second input signal.")
n1 = len(x)
n2 = len(y)
if detrend:
x = x - np.mean(x)
y = y - np.mean(y)
# correlate
corr = np.correlate(x, y, mode="full")
d = np.arange(-n2 + 1, n1, dtype="int")
ind = np.argmax(corr)
delay = d[ind]
maxCorr = corr[ind]
# get synchronization overlap
if delay < 0:
c = min([n1, len(y[-delay:])])
synch_x = x[:c]
synch_y = y[-delay : -delay + c]
elif delay > 0:
c = min([n2, len(x[delay:])])
synch_x = x[delay : delay + c]
synch_y = y[:c]
else:
c = min([n1, n2])
synch_x = x[:c]
synch_y = y[:c]
# output
args = (delay, maxCorr, synch_x, synch_y)
names = ("delay", "corr", "synch_x", "synch_y")
return utils.ReturnTuple(args, names)
[docs]def pearson_correlation(x=None, y=None):
"""Compute the Pearson Correlation Coefficient bertween two signals.
The coefficient is given by:
.. math::
r_{xy} = \\frac{E[(X - \\mu_X) (Y - \\mu_Y)]}{\\sigma_X \\sigma_Y}
Parameters
----------
x : array
First input signal.
y : array
Second input signal.
Returns
-------
rxy : float
Pearson correlation coefficient, ranging between -1 and +1.
Raises
------
ValueError
If the input signals do not have the same length.
"""
print(
"tools.pearson_correlation is deprecated, use stats.pearson_correlation instead",
file=sys.stderr,
)
# check inputs
if x is None:
raise TypeError("Please specify the first input signal.")
if y is None:
raise TypeError("Please specify the second input signal.")
# ensure numpy
x = np.array(x)
y = np.array(y)
n = len(x)
if n != len(y):
raise ValueError("Input signals must have the same length.")
mx = np.mean(x)
my = np.mean(y)
Sxy = np.sum(x * y) - n * mx * my
Sxx = np.sum(np.power(x, 2)) - n * mx**2
Syy = np.sum(np.power(y, 2)) - n * my**2
rxy = Sxy / (np.sqrt(Sxx) * np.sqrt(Syy))
# avoid propagation of numerical errors
if rxy > 1.0:
rxy = 1.0
elif rxy < -1.0:
rxy = -1.0
return utils.ReturnTuple((rxy,), ("rxy",))
[docs]def rms_error(x=None, y=None):
"""Compute the Root-Mean-Square Error between two signals.
The error is given by:
.. math::
rmse = \\sqrt{E[(X - Y)^2]}
Parameters
----------
x : array
First input signal.
y : array
Second input signal.
Returns
-------
rmse : float
Root-mean-square error.
Raises
------
ValueError
If the input signals do not have the same length.
"""
# check inputs
if x is None:
raise TypeError("Please specify the first input signal.")
if y is None:
raise TypeError("Please specify the second input signal.")
# ensure numpy
x = np.array(x)
y = np.array(y)
n = len(x)
if n != len(y):
raise ValueError("Input signals must have the same length.")
rmse = np.sqrt(np.mean(np.power(x - y, 2)))
return utils.ReturnTuple((rmse,), ("rmse",))
[docs]def get_heart_rate(beats=None, sampling_rate=1000.0, smooth=False, size=3):
"""Compute instantaneous heart rate from an array of beat indices.
Parameters
----------
beats : array
Beat location indices.
sampling_rate : int, float, optional
Sampling frequency (Hz).
smooth : bool, optional
If True, perform smoothing on the resulting heart rate.
size : int, optional
Size of smoothing window; ignored if `smooth` is False.
Returns
-------
index : array
Heart rate location indices.
heart_rate : array
Instantaneous heart rate (bpm).
Notes
-----
* Assumes normal human heart rate to be between 40 and 200 bpm.
"""
# check inputs
if beats is None:
raise TypeError("Please specify the input beat indices.")
if len(beats) < 2:
raise ValueError("Not enough beats to compute heart rate.")
# compute heart rate
ts = beats[1:]
hr = sampling_rate * (60.0 / np.diff(beats))
# physiological limits
indx = np.nonzero(np.logical_and(hr >= 40, hr <= 200))
ts = ts[indx]
hr = hr[indx]
# smooth with moving average
if smooth and (len(hr) > 1):
hr, _ = smoother(signal=hr, kernel="boxcar", size=size, mirror=True)
return utils.ReturnTuple((ts, hr), ("index", "heart_rate"))
def _pdiff(x, p1, p2):
"""Compute the squared difference between two interpolators, given the
x-coordinates.
Parameters
----------
x : array
Array of x-coordinates.
p1 : object
First interpolator.
p2 : object
Second interpolator.
Returns
-------
diff : array
Squared differences.
"""
diff = (p1(x) - p2(x)) ** 2
return diff
[docs]def find_intersection(
x1=None, y1=None, x2=None, y2=None, alpha=1.5, xtol=1e-6, ytol=1e-6
):
"""Find the intersection points between two lines using piecewise
polynomial interpolation.
Parameters
----------
x1 : array
Array of x-coordinates of the first line.
y1 : array
Array of y-coordinates of the first line.
x2 : array
Array of x-coordinates of the second line.
y2 : array
Array of y-coordinates of the second line.
alpha : float, optional
Resolution factor for the x-axis; fraction of total number of
x-coordinates.
xtol : float, optional
Tolerance for the x-axis.
ytol : float, optional
Tolerance for the y-axis.
Returns
-------
roots : array
Array of x-coordinates of found intersection points.
values : array
Array of y-coordinates of found intersection points.
Notes
-----
* If no intersection is found, returns the closest point.
"""
# check inputs
if x1 is None:
raise TypeError("Please specify the x-coordinates of the first line.")
if y1 is None:
raise TypeError("Please specify the y-coordinates of the first line.")
if x2 is None:
raise TypeError("Please specify the x-coordinates of the second line.")
if y2 is None:
raise TypeError("Please specify the y-coordinates of the second line.")
# ensure numpy
x1 = np.array(x1)
y1 = np.array(y1)
x2 = np.array(x2)
y2 = np.array(y2)
if x1.shape != y1.shape:
raise ValueError(
"Input coordinates for the first line must have the same shape."
)
if x2.shape != y2.shape:
raise ValueError(
"Input coordinates for the second line must have the same shape."
)
# interpolate
p1 = interpolate.BPoly.from_derivatives(x1, y1[:, np.newaxis])
p2 = interpolate.BPoly.from_derivatives(x2, y2[:, np.newaxis])
# combine x intervals
x = np.r_[x1, x2]
x_min = x.min()
x_max = x.max()
npoints = int(len(np.unique(x)) * alpha)
x = np.linspace(x_min, x_max, npoints)
# initial estimates
pd = p1(x) - p2(x)
(zerocs,) = zero_cross(pd)
pd_abs = np.abs(pd)
zeros = np.nonzero(pd_abs < ytol)[0]
ind = np.unique(np.concatenate((zerocs, zeros)))
xi = x[ind]
# search for solutions
roots = set()
for v in xi:
root, _, ier, _ = optimize.fsolve(
_pdiff,
v,
args=(p1, p2),
full_output=True,
xtol=xtol,
)
if ier == 1 and x_min <= root <= x_max:
roots.add(root[0])
if len(roots) == 0:
# no solution was found => give the best from the initial estimates
aux = np.abs(pd)
bux = aux.min() * np.ones(npoints, dtype="float")
roots, _ = find_intersection(x, aux, x, bux, alpha=1.0, xtol=xtol, ytol=ytol)
# compute values
roots = list(roots)
roots.sort()
roots = np.array(roots)
values = np.mean(np.vstack((p1(roots), p2(roots))), axis=0)
return utils.ReturnTuple((roots, values), ("roots", "values"))
[docs]def finite_difference(signal=None, weights=None):
"""Apply the Finite Difference method to compute derivatives.
Parameters
----------
signal : array
Signal to differentiate.
weights : list, array
Finite difference weight coefficients.
Returns
-------
index : array
Indices from `signal` for which the derivative was computed.
derivative : array
Computed derivative.
Notes
-----
* The method assumes central differences weights.
* The method accounts for the delay introduced by the algorithm.
Raises
------
ValueError
If the number of weights is not odd.
"""
# check inputs
if signal is None:
raise TypeError("Please specify a signal to differentiate.")
if weights is None:
raise TypeError("Please specify the weight coefficients.")
N = len(weights)
if N % 2 == 0:
raise ValueError("Number of weights must be odd.")
# diff
weights = weights[::-1]
derivative = ss.lfilter(weights, [1], signal)
# trim delay
D = N - 1
D2 = D // 2
index = np.arange(D2, len(signal) - D2, dtype="int")
derivative = derivative[D:]
return utils.ReturnTuple((index, derivative), ("index", "derivative"))
def _init_dist_profile(m, n, signal):
"""Compute initial time series signal statistics for distance profile.
Implements the algorithm described in [Mueen2014]_, using the notation
from [Yeh2016_a]_.
Parameters
----------
m : int
Sub-sequence length.
n : int
Target signal length.
signal : array
Target signal.
Returns
-------
X : array
Fourier Transform (FFT) of the signal.
sigma : array
Moving standard deviation in windows of length `m`.
References
----------
.. [Mueen2014] Abdullah Mueen, Hossein Hamooni, "Trilce Estrada: Time
Series Join on Subsequence Correlation", ICDM 2014: 450-459
.. [Yeh2016_a] Chin-Chia Michael Yeh, Yan Zhu, Liudmila Ulanova,
Nurjahan Begum, Yifei Ding, Hoang Anh Dau, Diego Furtado Silva,
Abdullah Mueen, Eamonn Keogh, "Matrix Profile I: All Pairs Similarity
Joins for Time Series: A Unifying View that Includes Motifs, Discords
and Shapelets", IEEE ICDM 2016
"""
# compute signal stats
csumx = np.zeros(n + 1, dtype="float")
csumx[1:] = np.cumsum(signal)
sumx = csumx[m:] - csumx[:-m]
csumx2 = np.zeros(n + 1, dtype="float")
csumx2[1:] = np.cumsum(np.power(signal, 2))
sumx2 = csumx2[m:] - csumx2[:-m]
meanx = sumx / m
sigmax2 = (sumx2 / m) - np.power(meanx, 2)
sigma = np.sqrt(sigmax2)
# append zeros
x = np.concatenate((signal, np.zeros(n, dtype="float")))
# compute FFT
X = np.fft.fft(x)
return X, sigma
def _ditance_profile(m, n, query, X, sigma):
"""Compute the distance profile of a query sequence against a signal.
Implements the algorithm described in [Mueen2014]_, using the notation
from [Yeh2016]_.
Parameters
----------
m : int
Query sub-sequence length.
n : int
Target time series length.
query : array
Query sub-sequence.
X : array
Target time series Fourier Transform (FFT).
sigma : array
Moving standard deviation in windows of length `m`.
Returns
-------
dist : array
Distance profile (squared).
Notes
-----
* Computes distances on z-normalized data.
References
----------
.. [Mueen2014] Abdullah Mueen, Hossein Hamooni, "Trilce Estrada: Time
Series Join on Subsequence Correlation", ICDM 2014: 450-459
.. [Yeh2016] Chin-Chia Michael Yeh, Yan Zhu, Liudmila Ulanova,
Nurjahan Begum, Yifei Ding, Hoang Anh Dau, Diego Furtado Silva,
Abdullah Mueen, Eamonn Keogh, "Matrix Profile I: All Pairs Similarity
Joins for Time Series: A Unifying View that Includes Motifs, Discords
and Shapelets", IEEE ICDM 2016
"""
# normalize query
q = (query - np.mean(query)) / np.std(query)
# reverse query and append zeros
y = np.concatenate((q[::-1], np.zeros(2 * n - m, dtype="float")))
# compute dot products fast
Y = np.fft.fft(y)
Z = X * Y
z = np.fft.ifft(Z)
z = z[m - 1 : n]
# compute distances (z-normalized squared euclidean distance)
dist = 2 * m * (1 - z / (m * sigma))
return dist
[docs]def distance_profile(query=None, signal=None, metric="euclidean"):
"""Compute the distance profile of a query sequence against a signal.
Implements the algorithm described in [Mueen2014]_.
Parameters
----------
query : array
Input query signal sequence.
signal : array
Input target time series signal.
metric : str, optional
The distance metric to use; one of 'euclidean' or 'pearson'; default
is 'euclidean'.
Returns
-------
dist : array
Distance of the query sequence to every sub-sequnce in the signal.
Notes
-----
* Computes distances on z-normalized data.
References
----------
.. [Mueen2014] Abdullah Mueen, Hossein Hamooni, "Trilce Estrada: Time
Series Join on Subsequence Correlation", ICDM 2014: 450-459
"""
# check inputs
if query is None:
raise TypeError("Please specify the input query sequence.")
if signal is None:
raise TypeError("Please specify the input time series signal.")
if metric not in ["euclidean", "pearson"]:
raise ValueError("Unknown distance metric.")
# ensure numpy
query = np.array(query)
signal = np.array(signal)
m = len(query)
n = len(signal)
if m > n / 2:
raise ValueError("Time series signal is too short relative to" " query length.")
# get initial signal stats
X, sigma = _init_dist_profile(m, n, signal)
# compute distance profile
dist = _ditance_profile(m, n, query, X, sigma)
if metric == "pearson":
dist = 1 - np.abs(dist) / (2 * m)
elif metric == "euclidean":
dist = np.abs(np.sqrt(dist))
return utils.ReturnTuple((dist,), ("dist",))
[docs]def signal_self_join(signal=None, size=None, index=None, limit=None):
"""Compute the matrix profile for a self-similarity join of a time series.
Implements the algorithm described in [Yeh2016_b]_.
Parameters
----------
signal : array
Input target time series signal.
size : int
Size of the query sub-sequences.
index : list, array, optional
Starting indices for query sub-sequences; the default is to search all
sub-sequences.
limit : int, optional
Upper limit for the number of query sub-sequences; the default is to
search all sub-sequences.
Returns
-------
matrix_index : array
Matric profile index.
matrix_profile : array
Computed matrix profile (distances).
Notes
-----
* Computes euclidean distances on z-normalized data.
References
----------
.. [Yeh2016_b] Chin-Chia Michael Yeh, Yan Zhu, Liudmila Ulanova,
Nurjahan Begum, Yifei Ding, Hoang Anh Dau, Diego Furtado Silva,
Abdullah Mueen, Eamonn Keogh, "Matrix Profile I: All Pairs Similarity
Joins for Time Series: A Unifying View that Includes Motifs, Discords
and Shapelets", IEEE ICDM 2016
"""
# check inputs
if signal is None:
raise TypeError("Please specify the input time series signal.")
if size is None:
raise TypeError("Please specify the sub-sequence size.")
# ensure numpy
signal = np.array(signal)
n = len(signal)
if size > n / 2:
raise ValueError(
"Time series signal is too short relative to desired"
" sub-sequence length."
)
if size < 4:
raise ValueError("Sub-sequence length must be at least 4.")
# matrix profile length
nb = n - size + 1
# get search index
if index is None:
index = np.random.permutation(np.arange(nb, dtype="int"))
else:
index = np.array(index)
if not np.all(index < nb):
raise ValueError("Provided `index` exceeds allowable sub-sequences.")
# limit search
if limit is not None:
if limit < 1:
raise ValueError("Search limit must be at least 1.")
index = index[:limit]
# exclusion zone (to avoid query self-matches)
ezone = int(round(size / 4))
# initialization
matrix_profile = np.inf * np.ones(nb, dtype="float")
matrix_index = np.zeros(nb, dtype="int")
X, sigma = _init_dist_profile(size, n, signal)
# compute matrix profile
for idx in index:
# compute distance profile
query = signal[idx : idx + size]
dist = _ditance_profile(size, n, query, X, sigma)
dist = np.abs(np.sqrt(dist)) # to have euclidean distance
# apply exlusion zone
a = max([0, idx - ezone])
b = min([nb, idx + ezone + 1])
dist[a:b] = np.inf
# find nearest neighbors
pos = dist < matrix_profile
matrix_profile[pos] = dist[pos]
matrix_index[pos] = idx
# account for exlusion zone
neighbor = np.argmin(dist)
matrix_profile[idx] = dist[neighbor]
matrix_index[idx] = neighbor
# output
args = (matrix_index, matrix_profile)
names = ("matrix_index", "matrix_profile")
return utils.ReturnTuple(args, names)
[docs]def signal_cross_join(signal1=None, signal2=None, size=None, index=None, limit=None):
"""Compute the matrix profile for a similarity join of two time series.
Computes the nearest sub-sequence in `signal2` for each sub-sequence in
`signal1`. Implements the algorithm described in [Yeh2016_c]_.
Parameters
----------
signal1 : array
Fisrt input time series signal.
signal2 : array
Second input time series signal.
size : int
Size of the query sub-sequences.
index : list, array, optional
Starting indices for query sub-sequences; the default is to search all
sub-sequences.
limit : int, optional
Upper limit for the number of query sub-sequences; the default is to
search all sub-sequences.
Returns
-------
matrix_index : array
Matric profile index.
matrix_profile : array
Computed matrix profile (distances).
Notes
-----
* Computes euclidean distances on z-normalized data.
References
----------
.. [Yeh2016_c] Chin-Chia Michael Yeh, Yan Zhu, Liudmila Ulanova,
Nurjahan Begum, Yifei Ding, Hoang Anh Dau, Diego Furtado Silva,
Abdullah Mueen, Eamonn Keogh, "Matrix Profile I: All Pairs Similarity
Joins for Time Series: A Unifying View that Includes Motifs, Discords
and Shapelets", IEEE ICDM 2016
"""
# check inputs
if signal1 is None:
raise TypeError("Please specify the first input time series signal.")
if signal2 is None:
raise TypeError("Please specify the second input time series signal.")
if size is None:
raise TypeError("Please specify the sub-sequence size.")
# ensure numpy
signal1 = np.array(signal1)
signal2 = np.array(signal2)
n1 = len(signal1)
if size > n1 / 2:
raise ValueError(
"First time series signal is too short relative to"
" desired sub-sequence length."
)
n2 = len(signal2)
if size > n2 / 2:
raise ValueError(
"Second time series signal is too short relative to"
" desired sub-sequence length."
)
if size < 4:
raise ValueError("Sub-sequence length must be at least 4.")
# matrix profile length
nb1 = n1 - size + 1
nb2 = n2 - size + 1
# get search index
if index is None:
index = np.random.permutation(np.arange(nb2, dtype="int"))
else:
index = np.array(index)
if not np.all(index < nb2):
raise ValueError(
"Provided `index` exceeds allowable `signal2`" " sub-sequences."
)
# limit search
if limit is not None:
if limit < 1:
raise ValueError("Search limit must be at least 1.")
index = index[:limit]
# initialization
matrix_profile = np.inf * np.ones(nb1, dtype="float")
matrix_index = np.zeros(nb1, dtype="int")
X, sigma = _init_dist_profile(size, n1, signal1)
# compute matrix profile
for idx in index:
# compute distance profile
query = signal2[idx : idx + size]
dist = _ditance_profile(size, n1, query, X, sigma)
dist = np.abs(np.sqrt(dist)) # to have euclidean distance
# find nearest neighbor
pos = dist <= matrix_profile
matrix_profile[pos] = dist[pos]
matrix_index[pos] = idx
# output
args = (matrix_index, matrix_profile)
names = ("matrix_index", "matrix_profile")
return utils.ReturnTuple(args, names)
[docs]def mean_waves(data=None, size=None, step=None):
"""Extract mean samples from a data set.
Parameters
----------
data : array
An m by n array of m data samples in an n-dimensional space.
size : int
Number of samples to use for each mean sample.
step : int, optional
Number of samples to jump, controlling overlap; default is equal to
`size` (no overlap).
Returns
-------
waves : array
An k by n array of mean samples.
Notes
-----
* Discards trailing samples if they are not enough to satify the `size`
parameter.
Raises
------
ValueError
If `step` is an invalid value.
ValueError
If there are not enough samples for the given `size`.
"""
# check inputs
if data is None:
raise TypeError("Please specify an input data set.")
if size is None:
raise TypeError("Please specify the number of samples for the mean.")
if step is None:
step = size
if step < 0:
raise ValueError("The step must be a positive integer.")
# number of waves
L = len(data) - size
nb = 1 + L // step
if nb <= 0:
raise ValueError("Not enough samples for the given `size`.")
# compute
waves = [np.mean(data[i : i + size], axis=0) for i in range(0, L + 1, step)]
waves = np.array(waves)
return utils.ReturnTuple((waves,), ("waves",))
[docs]def detrend_smoothness_priors(signal, smoothing_factor=10):
""" Detrending method based on smoothness priors applied to HRV signal analysis.
Follows the approach by Tarvainen et al. [Tarivainen2002].
Parameters
----------
signal : array
Signal to be detrended.
smoothing_factor : int, float, optional
Smoothing parameter lambda. Default: 10.
Returns
-------
detrended : array
Detrended signal.
trend : array
The trend component of the signal.
References
----------
.. [Tarivainen2002] M. P. Tarvainen, P. O. Ranta-aho and P. A. Karjalainen,
"An advanced detrending method with application to HRV analysis," in IEEE
Transactions on Biomedical Engineering, vol. 49, no. 2, pp. 172-175, Feb.
2002, doi: 10.1109/10.979357.
"""
# variables
t = len(signal)
identity = eye(t)
# first computation (D2)
aux1 = np.dot(np.ones((t, 1)), np.array([[1, -2, 1]]))
d2 = spdiags(aux1.T, [0, 1, 2], t-2, t)
# second computation (z_stat)
aux2 = smoothing_factor ** 2 * d2.T * d2
inv = np.linalg.inv(csr_matrix(identity) + aux2.todense())
z_stat = csr_matrix(identity - inv) * signal
# detrending
z_trend = np.asarray(signal - z_stat)
z_detrended = np.array(signal) - z_trend
return utils.ReturnTuple((z_detrended.T, z_trend.T), ('detrended', 'trend'))