Source code for biosppy.signals.abp

# -*- coding: utf-8 -*-
"""
biosppy.signals.abp
-------------------

This module provides methods to process Arterial Blood Pressure (ABP) signals.


:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.
"""

# Imports
# compat
from __future__ import absolute_import, division, print_function
from six.moves import range

# 3rd party
import numpy as np

# local
from . import tools as st
from .. import plotting, utils


[docs]def abp(signal=None, sampling_rate=1000.0, show=True): """Process a raw ABP signal and extract relevant signal features using default parameters. Parameters ---------- signal : array Raw ABP signal. sampling_rate : int, float, optional Sampling frequency (Hz). show : bool, optional If True, show a summary plot. Returns ------- ts : array Signal time axis reference (seconds). filtered : array Filtered ABP signal. onsets : array Indices of ABP pulse onsets. heart_rate_ts : array Heart rate time axis reference (seconds). heart_rate : array Instantaneous heart rate (bpm). """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") # ensure numpy signal = np.array(signal) sampling_rate = float(sampling_rate) # filter signal filtered, _, _ = st.filter_signal( signal=signal, ftype="butter", band="bandpass", order=4, frequency=[1, 8], sampling_rate=sampling_rate, ) # find onsets (onsets,) = find_onsets_zong2003(signal=filtered, sampling_rate=sampling_rate) # compute heart rate hr_idx, hr = st.get_heart_rate( beats=onsets, sampling_rate=sampling_rate, smooth=True, size=3 ) # get time vectors length = len(signal) T = (length - 1) / sampling_rate ts = np.linspace(0, T, length, endpoint=False) ts_hr = ts[hr_idx] # plot if show: plotting.plot_abp( ts=ts, raw=signal, filtered=filtered, onsets=onsets, heart_rate_ts=ts_hr, heart_rate=hr, path=None, show=True, ) # output args = (ts, filtered, onsets, ts_hr, hr) names = ("ts", "filtered", "onsets", "heart_rate_ts", "heart_rate") return utils.ReturnTuple(args, names)
[docs]def find_onsets_zong2003( signal=None, sampling_rate=1000.0, sm_size=None, size=None, alpha=2.0, wrange=None, d1_th=0, d2_th=None, ): """Determine onsets of ABP pulses. Skips corrupted signal parts. Based on the approach by Zong *et al.* [Zong03]_. Parameters ---------- signal : array Input filtered ABP signal. sampling_rate : int, float, optional Sampling frequency (Hz). sm_size : int, optional Size of smoother kernel (seconds). Defaults to 0.25 size : int, optional Window to search for maxima (seconds). Defaults to 5 alpha : float, optional Normalization parameter. Defaults to 2.0 wrange : int, optional The window in which to search for a peak (seconds). Defaults to 0.1 d1_th : int, optional Smallest allowed difference between maxima and minima. Defaults to 0 d2_th : int, optional Smallest allowed time between maxima and minima (seconds), Defaults to 0.15 Returns ------- onsets : array Indices of ABP pulse onsets. References ---------- .. [Zong03] W Zong, T Heldt, GB Moody and RG Mark, "An Open-source Algorithm to Detect Onset of Arterial Blood Pressure Pulses", IEEE Comp. in Cardiology, vol. 30, pp. 259-262, 2003 """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") # parameters sm_size = 0.25 if not sm_size else sm_size sm_size = int(sm_size * sampling_rate) size = 5 if not size else size size = int(size * sampling_rate) wrange = 0.1 if not wrange else wrange wrange = int(wrange * sampling_rate) d2_th = 0.15 if not d2_th else d2_th d2_th = int(d2_th * sampling_rate) length = len(signal) # slope sum function dy = np.diff(signal) dy[dy < 0] = 0 ssf, _ = st.smoother(signal=dy, kernel="boxcar", size=sm_size, mirror=True) # main loop start = 0 stop = size if stop > length: stop = length idx = [] while True: sq = np.copy(signal[start:stop]) sq -= sq.mean() # sq = sq[1:] ss = 25 * ssf[start:stop] sss = 100 * np.diff(ss) sss[sss < 0] = 0 sss = sss - alpha * np.mean(sss) # find maxima pk, pv = st.find_extrema(signal=sss, mode="max") pk = pk[np.nonzero(pv > 0)] pk += wrange dpidx = pk # analyze between maxima of 2nd derivative of ss detected = False for i in range(1, len(dpidx) + 1): try: v, u = dpidx[i - 1], dpidx[i] except IndexError: v, u = dpidx[-1], -1 s = sq[v:u] Mk, Mv = st.find_extrema(signal=s, mode="max") mk, mv = st.find_extrema(signal=s, mode="min") try: M = Mk[np.argmax(Mv)] m = mk[np.argmax(mv)] except ValueError: continue if (s[M] - s[m] > d1_th) and (m - M > d2_th): idx += [v + start] detected = True # next round continues from previous detected beat if detected: start = idx[-1] + wrange else: start += size # stop condition if start > length: break # update stop stop += size if stop > length: stop = length idx = np.array(idx, dtype="int") return utils.ReturnTuple((idx,), ("onsets",))