Source code for biosppy.signals.eda

# -*- coding: utf-8 -*-
"""
biosppy.signals.eda
-------------------

This module provides methods to process Electrodermal Activity (EDA)
signals, also known as Galvanic Skin Response (GSR).

:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.
"""

# Imports
# compat
from __future__ import absolute_import, division, print_function
from six.moves import range

# 3rd party
import numpy as np
from scipy import interpolate

# local
from . import tools as st
from .. import plotting, utils


[docs]def eda(signal=None, sampling_rate=1000., units=None, path=None, show=True): """Process a raw EDA signal and extract relevant signal features using default parameters. Parameters ---------- signal : array Raw EDA signal. sampling_rate : int, float, optional Sampling frequency (Hz). units : str, optional The units of the input signal. If specified, the plot will have the y-axis labeled with the corresponding units. path : str, optional If provided, the plot will be saved to the specified file. show : bool, optional If True, show a summary plot. Returns ------- ts : array Signal time axis reference (seconds). filtered : array Filtered EDA signal. edr : array Electrodermal response (EDR) signal. edl : array Electrodermal level (EDL) signal. onsets : array Indices of SCR pulse onsets. peaks : array Indices of the SCR peaks. amplitudes : array SCR pulse amplitudes. """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") # ensure numpy signal = np.array(signal) sampling_rate = float(sampling_rate) # filter signal aux, _, _ = st.filter_signal( signal=signal, ftype="butter", band="lowpass", order=4, frequency=5, sampling_rate=sampling_rate, ) # smooth sm_size = int(0.75 * sampling_rate) filtered, _ = st.smoother(signal=aux, kernel="boxzen", size=sm_size, mirror=True) # get SCR info onsets, peaks, amplitudes, phasic_rate, rise_times, half_rec, six_rec = eda_events(signal=filtered, sampling_rate=sampling_rate, min_amplitude=0.1, size=0.9) # get time vectors length = len(signal) t = (length - 1) / sampling_rate ts = np.linspace(0, t, length, endpoint=True) # get EDR and EDL edl_signal, edr_signal = biosppy_decomposition(signal=filtered, sampling_rate=sampling_rate, method="onsets", onsets=onsets) # plot if show: plotting.plot_eda( ts=ts, raw=signal, filtered=filtered, edr=edr_signal, edl=edl_signal, onsets=onsets, peaks=peaks, amplitudes=amplitudes, units=units, path=path, show=True, ) # output args = (ts, filtered, edr_signal, edl_signal, onsets, peaks, amplitudes, phasic_rate, rise_times, half_rec, six_rec) names = ("ts", "filtered", "edr", "edl", "onsets", "peaks", "amplitudes", "phasic_rate", "rise_times", "half_rec", "six_rec") return utils.ReturnTuple(args, names)
[docs]def eda_events(signal=None, sampling_rate=1000., method="emotiphai", **kwargs): """Returns characteristic EDA events. Parameters ---------- signal : array Input signal. sampling_rate : int, float, optional Sampling frequency (Hz). method : str, optional Method to compute eda events: 'emotiphai', 'kbk' or 'basic'. kwargs : dict, optional Method parameters. Returns ------- onsets : array Signal EDR events onsets. peaks : array Signal EDR events peaks. amps : array Signal EDR events Amplitudes. phasic_rate : array Signal EDR events rate in 60s. rise_times : array Rise times, i.e. onset-peak time difference. half_rec : array Half Recovery times, i.e. time between peak and 63% amplitude. six_rec : array 63 % recovery times, i.e. time between peak and 50% amplitude. """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") # ensure numpy signal = np.array(signal) # compute onsets, peaks and amplitudes if method == "emotiphai": onsets, peaks, amps = emotiphai_eda(signal=signal, sampling_rate=sampling_rate, **kwargs) elif method == "kbk": onsets, peaks, amps = kbk_scr(signal=signal, sampling_rate=sampling_rate, **kwargs) elif method == "basic": onsets, peaks, amps = basic_scr(signal=signal) else: raise TypeError("Please specify a supported method.") # compute phasic rate try: phasic_rate = sampling_rate * (60. / np.diff(peaks)) except Exception as e: print(e) phasic_rate = None # compute rise times rise_times = (peaks - onsets) / sampling_rate # to seconds # compute half and 63% recovery times half_rec, six_rec = rec_times(signal=signal, sampling_rate=sampling_rate, onsets=onsets, peaks=peaks) args = (onsets, peaks, amps, phasic_rate, rise_times, half_rec, six_rec) names = ("onsets", "peaks", "amplitudes", "phasic_rate", "rise_times", "half_rec", "six_rec") return utils.ReturnTuple(args, names)
[docs]def biosppy_decomposition(signal=None, sampling_rate=1000.0, method="smoother", onsets=None, **kwargs): """Extracts EDL and EDR signals using either a smoothing filter or onsets' interpolation. Parameters ---------- signal : array Input filtered EDA signal. sampling_rate : int, float, optional Sampling frequency (Hz). method: str, optional Method to compute the edl signal: "smoother" to compute a smoothing filter; "onsets" to obtain edl by onsets' interpolation. onsets : array, optional List of onsets for the interpolation method. kwargs : dict, optional window_size : Size of the smoother kernel (seconds). Returns ------- edl : array Electrodermal level (EDL) signal. edr : array Electrodermal response (EDR) signal. References ---------- .. [KiBK04] K.H. Kim, S.W. Bang, and S.R. Kim, "Emotion recognition system using short-term monitoring of physiological signals", Med. Biol. Eng. Comput., vol. 42, pp. 419-427, 2004 """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") if method == "onsets" and onsets is None: raise TypeError("Please specify 'onsets' to use the onset " "interpolation method.") # smooth method if method == "smoother": window_size = kwargs['window_size'] if 'window_size' in kwargs else 10.0 size = int(window_size * sampling_rate) edl_signal, _ = st.smoother(signal=signal, kernel="bartlett", size=size, mirror=True) # interpolation method elif method == "onsets": # get time vectors length = len(signal) t = (length - 1) / sampling_rate ts = np.linspace(0, t, length, endpoint=True) # extract edl edl_on = np.hstack((ts[0], ts[onsets], ts[-1])) edl_amp = np.hstack((signal[0], signal[onsets], signal[-1])) f = interpolate.interp1d(edl_on, edl_amp) edl_signal = f(ts) else: raise TypeError("Please specify a supported method.") # differentiation df = np.diff(signal) # smooth size = int(1.0 * sampling_rate) edr_signal, _ = st.smoother(signal=df, kernel="bartlett", size=size, mirror=True) # output args = (edl_signal, edr_signal) names = ("edl", "edr") return utils.ReturnTuple(args, names)
[docs]def cvx_decomposition(signal=None, sampling_rate=1000.0, tau0=2., tau1=0.7, delta_knot=10., alpha=8e-4, gamma=1e-2, solver=None, options={'reltol': 1e-9}): """Performs EDA decomposition using the cvxEDA algorithm. This function was originally developed by Luca Citi and Alberto Greco. You can find the original code and repository at: https://github.com/lciti/cvxEDA If you use this function in your work, please cite the original authors as follows: A Greco, G Valenza, A Lanata, EP Scilingo, and L Citi "cvxEDA: a Convex Optimization Approach to Electrodermal Activity Processing" IEEE Transactions on Biomedical Engineering, 2015. This function is used under the terms of the GNU General Public License v3.0 (GPLv3). You should comply with the GPLv3 if you use this code (see 'License' section below). Copyright (C) 2014-2015 Luca Citi, Alberto Greco Parameters ---------- signal : array Observed EDA signal (we recommend normalizing it: y = zscore(y)) sampling_rate : int, float Sampling frequency (Hz). tau0 : float Slow time constant of the Bateman function tau1 : float Fast time constant of the Bateman function delta_knot: float Time between knots of the tonic spline function alpha: float Penalization for the sparse SMNA driver gamma : float Penalization for the tonic spline coefficients solver : ndarray Sparse QP solver to be used, see cvxopt.solvers.qp options : dict solver options, see: http://cvxopt.org/userguide/coneprog.html#algorithm-parameters Returns ------- edr : array Phasic component smna : array Sparse SMNA driver of phasic component edl : array Tonic component tonic_coeff : array Coefficients of tonic spline linear_drift : array Offset and slope of the linear drift term res : array Model residuals obj : array Value of objective function being minimized (eq 15 of paper) References ---------- .. [cvxEDA] A Greco, G Valenza, A Lanata, EP Scilingo, and L Citi "cvxEDA: a Convex Optimization Approach to Electrodermal Activity Processing" IEEE Transactions on Biomedical Engineering, 2015. DOI: 10.1109/TBME.2015.2474131 .. [Figner2011] Figner, Bernd & Murphy, Ryan. (2011). Using skin conductance in judgment and decision making research. A Handbook of Process Tracing Methods for Decision Research. License ------- The cvxEDA function is distributed under the GNU General Public License v3.0 (GPLv3). For details, please see the full license text at: https://www.gnu.org/licenses/gpl-3.0.en.html This code is provided as-is, without any warranty or support from the original authors. Notes ----- Changes from original code: - 'y' -> 'signal' - 'delta' -> 1. / 'sampling_rate' """ # try to import cvxopt try: import cvxopt as cv except ImportError: raise ImportError("The 'cvxopt' module is required for this function " "to run. Please install it first (`pip install " "cvxopt`).") # check inputs if signal is None: raise TypeError("Please specify an input signal.") n = len(signal) y = cv.matrix(signal) delta = 1. / sampling_rate # sampling interval in seconds # bateman ARMA model a1 = 1. / min(tau1, tau0) # a1 > a0 a0 = 1. / max(tau1, tau0) ar = np.array([(a1 * delta + 2.) * (a0 * delta + 2.), 2. * a1 * a0 * delta ** 2 - 8., (a1 * delta - 2.) * (a0 * delta - 2.)]) / ((a1 - a0) * delta ** 2) ma = np.array([1., 2., 1.]) # matrices for ARMA model i = np.arange(2, n) A = cv.spmatrix(np.tile(ar, (n - 2, 1)), np.c_[i, i, i], np.c_[i, i - 1, i - 2], (n, n)) M = cv.spmatrix(np.tile(ma, (n - 2, 1)), np.c_[i, i, i], np.c_[i, i - 1, i - 2], (n, n)) # spline delta_knot_s = int(round(delta_knot / delta)) spl = np.r_[np.arange(1., delta_knot_s), np.arange(delta_knot_s, 0., -1.)] # order 1 spl = np.convolve(spl, spl, 'full') spl /= max(spl) # matrix of spline regressors i = np.c_[np.arange(-(len(spl) // 2), (len(spl) + 1) // 2)] + np.r_[np.arange(0, n, delta_knot_s)] nB = i.shape[1] j = np.tile(np.arange(nB), (len(spl), 1)) p = np.tile(spl, (nB, 1)).T valid = (i >= 0) & (i < n) B = cv.spmatrix(p[valid], i[valid], j[valid]) # trend C = cv.matrix(np.c_[np.ones(n), np.arange(1., n + 1.) / n]) nC = C.size[1] # Solve the problem: # .5*(M*q + B*l + C*d - y)^2 + alpha*sum(A,1)*p + .5*gamma*l'*l # s.t. A*q >= 0 old_options = cv.solvers.options.copy() cv.solvers.options.clear() cv.solvers.options.update(options) if solver == 'conelp': # Use conelp z = lambda m, n: cv.spmatrix([], [], [], (m, n)) G = cv.sparse([[-A, z(2, n), M, z(nB + 2, n)], [z(n + 2, nC), C, z(nB + 2, nC)], [z(n, 1), -1, 1, z(n + nB + 2, 1)], [z(2 * n + 2, 1), -1, 1, z(nB, 1)], [z(n + 2, nB), B, z(2, nB), cv.spmatrix(1.0, range(nB), range(nB))]]) h = cv.matrix([z(n, 1), .5, .5, y, .5, .5, z(nB, 1)]) c = cv.matrix([(cv.matrix(alpha, (1, n)) * A).T, z(nC, 1), 1, gamma, z(nB, 1)]) res = cv.solvers.conelp(c, G, h, dims={'l': n, 'q': [n + 2, nB + 2], 's': []}) obj = res['primal objective'] else: # Use qp Mt, Ct, Bt = M.T, C.T, B.T H = cv.sparse([[Mt * M, Ct * M, Bt * M], [Mt * C, Ct * C, Bt * C], [Mt * B, Ct * B, Bt * B + gamma * cv.spmatrix(1.0, range(nB), range(nB))]]) f = cv.matrix([(cv.matrix(alpha, (1, n)) * A).T - Mt * y, -(Ct * y), -(Bt * y)]) res = cv.solvers.qp(H, f, cv.spmatrix(-A.V, A.I, A.J, (n, len(f))), cv.matrix(0., (n, 1)), solver=solver) obj = res['primal objective'] + .5 * (y.T * y) cv.solvers.options.clear() cv.solvers.options.update(old_options) l = res['x'][-nB:] d = res['x'][n:n + nC] t = B * l + C * d q = res['x'][:n] p = A * q r = M * q e = y - r - t # output args = list(np.array(a).ravel() for a in (r, p, t, l, d, e, obj)) names = ("edr", "smna", "edl", "tonic_coeff", "linear_drift", "res", "obj") return utils.ReturnTuple(args, names)
[docs]def basic_scr(signal=None): """Basic method to extract Skin Conductivity Responses (SCR) from an EDA signal. Follows the approach in [Gamb08]_. Parameters ---------- signal : array Input filtered EDA signal. Returns ------- onsets : array Indices of the SCR onsets. peaks : array Indices of the SCR peaks. amplitudes : array SCR pulse amplitudes. References ---------- .. [Gamb08] Hugo Gamboa, "Multi-modal Behavioral Biometrics Based on HCI and Electrophysiology", PhD thesis, Instituto Superior T{\'e}cnico, 2008 """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") # find extrema pi, _ = st.find_extrema(signal=signal, mode="max") ni, _ = st.find_extrema(signal=signal, mode="min") # sanity check if len(pi) == 0 or len(ni) == 0: raise ValueError("Could not find SCR pulses.") # pair vectors if ni[0] > pi[0]: ni = ni[1:] if pi[-1] < ni[-1]: pi = pi[:-1] if len(pi) > len(ni): pi = pi[:-1] li = min(len(pi), len(ni)) i1 = pi[:li] i3 = ni[:li] # indices i0 = np.array((i1 + i3) / 2.0, dtype=int) if i0[0] < 0: i0[0] = 0 # amplitude a = signal[i0] - signal[i3] # output args = (i3, i0, a) names = ("onsets", "peaks", "amplitudes") return utils.ReturnTuple(args, names)
[docs]def kbk_scr(signal=None, sampling_rate=1000.0, min_amplitude=0.1): """KBK method to extract Skin Conductivity Responses (SCR) from an EDA signal. Follows the approach by Kim *et al.* [KiBK04]_. Parameters ---------- signal : array Input filtered EDA signal. sampling_rate : int, float, optional Sampling frequency (Hz). min_amplitude : float, optional Minimum threshold by which to exclude SCRs. Returns ------- onsets : array Indices of the SCR onsets. peaks : array Indices of the SCR peaks. amplitudes : array SCR pulse amplitudes. References ---------- .. [KiBK04] K.H. Kim, S.W. Bang, and S.R. Kim, "Emotion recognition system using short-term monitoring of physiological signals", Med. Biol. Eng. Comput., vol. 42, pp. 419-427, 2004 """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") # extract edr signal df = biosppy_decomposition(signal, sampling_rate=sampling_rate)['edr'] # zero crosses (zeros,) = st.zero_cross(signal=df, detrend=False) if np.all(df[: zeros[0]] > 0): zeros = zeros[1:] if np.all(df[zeros[-1]:] > 0): zeros = zeros[:-1] scrs, amps, ZC, peaks = [], [], [], [] for i in range(0, len(zeros) - 1, 2): scrs += [df[zeros[i]: zeros[i + 1]]] ZC += [zeros[i]] ZC += [zeros[i + 1]] peaks += [zeros[i] + np.argmax(df[zeros[i]: zeros[i + 1]])] amps += [signal[peaks[-1]] - signal[ZC[-2]]] # exclude SCRs with small amplitude thr = min_amplitude * np.max(amps) idx = np.where(amps > thr) scrs = np.array(scrs, dtype=np.object)[idx] amps = np.array(amps)[idx] ZC = np.array(ZC)[np.array(idx) * 2] peaks = np.array(peaks, dtype=int)[idx] onsets = ZC[0].astype(int) # output args = (onsets, peaks, amps) names = ("onsets", "peaks", "amplitudes") return utils.ReturnTuple(args, names)
[docs]def emotiphai_eda(signal=None, sampling_rate=1000., min_amplitude=0.1, filt=True, size=1.): """Returns characteristic EDA events. Parameters ---------- signal : array Input signal. sampling_rate : int, float, optional Sampling frequency (Hz). min_amplitude : float, optional Minimum threshold by which to exclude SCRs. filt: bool, optional Whether to filter signal to remove noise and low amplitude events. size: float Size of the filter in seconds Returns ------- onsets : array Indices of the SCR onsets. peaks : array Indices of the SCR peaks. amplitudes : array SCR pulse amplitudes. """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") # smooth if filt: try: if sampling_rate > 1: signal, _, _ = st.filter_signal(signal=signal, ftype='butter', band='lowpass', order=4, frequency=2, sampling_rate=sampling_rate) except Exception as e: print(e, "Error filtering EDA") # smooth try: sm_size = int(size * sampling_rate) signal, _ = st.smoother(signal=signal, kernel='boxzen', size=sm_size, mirror=True) except Exception as e: print(e) # extract onsets, peaks and amplitudes onsets, peaks, amps = [], [], [] zeros = st.find_extrema(signal=signal, mode='min')[0] # get zeros for z in range(len(zeros)): if z == len(zeros) - 1: # last zero s = signal[zeros[z]:] # signal amplitude between event else: s = signal[zeros[z]:zeros[z + 1]] # signal amplitude between event pk = st.find_extrema(signal=s, mode='max')[0] # get pk between events for p in pk: if (s[p] - s[0]) > min_amplitude: # only count events with minimum amplitude peaks += [zeros[z] + p] onsets += [zeros[z]] amps += [s[p] - s[0]] # convert to array onsets, peaks, amps = np.array(onsets), np.array(peaks), np.array(amps) # output args = (onsets, peaks, amps) names = ("onsets", "peaks", "amplitudes") return utils.ReturnTuple(args, names)
[docs]def rec_times(signal=None, sampling_rate=1000., onsets=None, peaks=None): """Returns EDA recovery times. Parameters ---------- signal : array Input signal. sampling_rate : int, float, optional Sampling frequency (Hz). onsets : array Indices of the SCR onsets. peaks : array Indices of the SCR peaks. Returns ------- half_rec : list Half Recovery times, i.e. time between peak and 50% amplitude. six_rec : list 63 % recovery times, i.e. time between peak and 63% amplitude. """ # ensure input format peaks = np.array(peaks, dtype=int) onsets = np.array(onsets, dtype=int) amps = np.array(signal[peaks[:]] - signal[onsets[:]]) # SCR amplitudes li = min(len(onsets), len(peaks)) half_rec, six_rec = [], [] for i in range(li): # iterate over onset half_rec_amp = 0.5 * amps[i] + signal[onsets][i] six_rec_amp = 0.37 * amps[i] + signal[onsets][i] try: wind = np.array(signal[peaks[i]:onsets[i + 1]]) except: wind = np.array(signal[peaks[i]:]) # last peak to end of signal half_rec_idx = np.argwhere(wind <= half_rec_amp) six_rec_idx = np.argwhere(wind <= six_rec_amp) if len(half_rec_idx) > 0: half_rec += [half_rec_idx[0][0] / sampling_rate] else: half_rec += [None] if len(six_rec_idx) > 0: six_rec += [six_rec_idx[0][0] / sampling_rate] else: six_rec += [None] # convert to numpy half_rec = np.array(half_rec) six_rec = np.array(six_rec) # output names = ("half_rec", "six_rec") args = (half_rec, six_rec) return utils.ReturnTuple(args, names)