Source code for biosppy.signals.ppg

# -*- coding: utf-8 -*-
"""
biosppy.signals.ppg
-------------------

This module provides methods to process Photoplethysmogram (PPG) signals.

:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.
"""

# Imports
# compat
from __future__ import absolute_import, division, print_function
from six.moves import range

# 3rd party
import numpy as np
import scipy.signal as ss
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde

# local
from . import tools as st
from .. import plotting, utils


[docs]def ppg(signal=None, sampling_rate=1000., units=None, show=True): """Process a raw PPG signal and extract relevant signal features using default parameters. Parameters ---------- signal : array Raw PPG signal. sampling_rate : int, float, optional Sampling frequency (Hz). units : str, optional The units of the input signal. If specified, the plot will have the y-axis labeled with the corresponding units. show : bool, optional If True, show a summary plot. Returns ------- ts : array Signal time axis reference (seconds). filtered : array Filtered PPG signal. peaks : array Indices of PPG pulse peaks. templates_ts : array Templates time axis reference (seconds). templates : array Extracted heartbeat templates. heart_rate_ts : array Heart rate time axis reference (seconds). heart_rate : array Instantaneous heart rate (bpm). """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") # ensure numpy signal = np.array(signal) sampling_rate = float(sampling_rate) # filter signal filtered, _, _ = st.filter_signal(signal=signal, ftype='butter', band='bandpass', order=4, frequency=[1, 8], sampling_rate=sampling_rate) # find peaks peaks, _ = find_onsets_elgendi2013(signal=filtered, sampling_rate=sampling_rate) # extract templates onsets, peaks, segments_loc = ppg_segmentation(filtered, sampling_rate, peaks) templates_ts, templates = _extract_templates(filtered, sampling_rate, onsets, peaks, segments_loc) # compute heart rate hr_idx, hr = st.get_heart_rate(beats=onsets, sampling_rate=sampling_rate, smooth=True, size=3) # get time vectors length = len(signal) T = (length - 1) / sampling_rate ts = np.linspace(0, T, length, endpoint=False) ts_hr = ts[hr_idx] # plot if show: plotting.plot_ppg(ts=ts, raw=signal, filtered=filtered, peaks=peaks, templates_ts=templates_ts, templates=templates, heart_rate_ts=ts_hr, heart_rate=hr, units=units, path=None, show=True) # output args = (ts, filtered, peaks, templates_ts, templates, ts_hr, hr) names = ('ts', 'filtered', 'peaks', 'templates_ts', 'templates', 'heart_rate_ts', 'heart_rate') return utils.ReturnTuple(args, names)
[docs]def find_onsets_elgendi2013(signal=None, sampling_rate=1000., peakwindow=0.111, beatwindow=0.667, beatoffset=0.02, mindelay=0.3): """ Determines onsets of PPG pulses. Parameters ---------- signal : array Input filtered PPG signal. sampling_rate : int, float, optional Sampling frequency (Hz). peakwindow : float Parameter W1 on referenced article Optimized at 0.111 beatwindow : float Parameter W2 on referenced article Optimized at 0.667 beatoffset : float Parameter beta on referenced article Optimized at 0.2 mindelay : float Minimum delay between peaks. Avoids false positives Returns ---------- onsets : array Indices of PPG pulse onsets. params : dict Input parameters of the function References ---------- - Elgendi M, Norton I, Brearley M, Abbott D, Schuurmans D (2013) Systolic Peak Detection in Acceleration Photoplethysmograms Measured from Emergency Responders in Tropical Conditions. PLoS ONE 8(10): e76585. doi:10.1371/journal.pone.0076585. Notes --------------------- Optimal ranges for signal filtering (from Elgendi et al. 2013): "Optimization of the beat detector’s spectral window for the lower frequency resulted in a value within 0.5– 1 Hz with the higher frequency within 7–15 Hz" All the number references below between curly brackets {...} by the code refer to the line numbers of code in "Table 2 Algorithm IV: DETECTOR (PPG signal, F1, F2, W1, W2, b)" from Elgendi et al. 2013 for a better comparison of the algorithm """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") # Create copy of signal (not to modify the original object) signal_copy = np.copy(signal) # Truncate to zero and square # {3, 4} signal_copy[signal_copy < 0] = 0 squared_signal = signal_copy ** 2 # Calculate peak detection threshold # {5} ma_peak_kernel = int(np.rint(peakwindow * sampling_rate)) ma_peak, _ = st.smoother(squared_signal, kernel="boxcar", size=ma_peak_kernel) # {6} ma_beat_kernel = int(np.rint(beatwindow * sampling_rate)) ma_beat, _ = st.smoother(squared_signal, kernel="boxcar", size=ma_beat_kernel) # Calculate threshold value # {7, 8, 9} thr1 = ma_beat + beatoffset * np.mean(squared_signal) # Identify start and end of PPG waves. # {10-16} waves = ma_peak > thr1 beg_waves = np.where(np.logical_and(np.logical_not(waves[0:-1]), waves[1:]))[0] end_waves = np.where(np.logical_and(waves[0:-1], np.logical_not(waves[1:])))[0] # Throw out wave-ends that precede first wave-start. end_waves = end_waves[end_waves > beg_waves[0]] # Identify systolic peaks within waves (ignore waves that are too short). num_waves = min(beg_waves.size, end_waves.size) # {18} min_len = int(np.rint(peakwindow * sampling_rate)) min_delay = int(np.rint(mindelay * sampling_rate)) onsets = [0] # {19} for i in range(num_waves): beg = beg_waves[i] end = end_waves[i] len_wave = end - beg # {20, 22, 23} if len_wave < min_len: continue # Find local maxima and their prominence within wave span. # {21} data = signal_copy[beg:end] locmax, props = ss.find_peaks(data, prominence=(None, None)) # If more than one peak if locmax.size > 0: # Identify most prominent local maximum. peak = beg + locmax[np.argmax(props["prominences"])] # Enforce minimum delay between onsets. if peak - onsets[-1] > min_delay: onsets.append(peak) onsets.pop(0) onsets = np.array(onsets, dtype='int') # output params = {'signal': signal, 'sampling_rate': sampling_rate, 'peakwindow': peakwindow, 'beatwindow': beatwindow, 'beatoffset': beatoffset, 'mindelay': mindelay} args = (onsets, params) names = ('onsets', 'params') return utils.ReturnTuple(args, names)
[docs]def find_onsets_kavsaoglu2016( signal=None, sampling_rate=1000.0, alpha=0.2, k=4, init_bpm=90, min_delay=0.6, max_BPM=150, ): """ Determines onsets of PPG pulses. Parameters ---------- signal : array Input filtered PPG signal. sampling_rate : int, float, optional Sampling frequency (Hz). alpha : float, optional Low-pass filter factor. Avoids abrupt changes of BPM. k : int, float, optional Number of segments by pulse. Width of each segment = Period of pulse according to current BPM / k init_bpm : int, float, optional Initial BPM. Higher value results in a smaller segment width. min_delay : float Minimum delay between peaks as percentage of current BPM pulse period. Avoids false positives max_bpm : int, float, optional Maximum BPM. Maximum value accepted as valid BPM. Returns ---------- onsets : array Indices of PPG pulse onsets. window_marks : array Indices of segments window boundaries. params : dict Input parameters of the function References ---------- - Kavsaoğlu, Ahmet & Polat, Kemal & Bozkurt, Mehmet. (2016). An innovative peak detection algorithm for photoplethysmography signals: An adaptive segmentation method. TURKISH JOURNAL OF ELECTRICAL ENGINEERING & COMPUTER SCIENCES. 24. 1782-1796. 10.3906/elk-1310-177. Notes --------------------- This algorithm is an adaption of the one described on Kavsaoğlu et al. (2016). This version takes into account a minimum delay between peaks and builds upon the adaptive segmentation by using a low-pass filter for BPM changes. This way, even if the algorithm wrongly detects a peak, the BPM value will stay relatively constant so the next pulse can be correctly segmented. """ # check inputs if signal is None: raise TypeError("Please specify an input signal.") if alpha <= 0 or alpha > 1: raise TypeError("The value of alpha must be in the range: ]0, 1].") if k <= 0: raise TypeError("The number of divisions by pulse should be greater than 0.") if init_bpm <= 0: raise TypeError("Provide a valid BPM value for initial estimation.") if min_delay < 0 or min_delay > 1: raise TypeError( "The minimum delay percentage between peaks must be between 0 and 1" ) if max_BPM >= 248: raise TypeError("The maximum BPM must assure the person is alive") # current bpm bpm = init_bpm # current segment window width window = int(sampling_rate * (60 / bpm) / k) # onsets array onsets = [] # window marks array - stores the boundaries of each segment window_marks = [] # buffer for peak indices idx_buffer = [-1, -1, -1] # buffer to store the previous 3 values for onset detection min_buffer = [0, 0, 0] # signal pointer i = 0 while i + window < len(signal): # remove oldest values idx_buffer.pop(0) min_buffer.pop(0) # add the index of the minimum value of the current segment to buffer idx_buffer.append(int(i + np.argmin(signal[i : i + window]))) # add the minimum value of the current segment to buffer min_buffer.append(signal[idx_buffer[-1]]) if ( # the buffer has to be filled with valid values idx_buffer[0] > -1 # the center value of the buffer must be smaller than its neighbours and (min_buffer[1] < min_buffer[0] and min_buffer[1] <= min_buffer[2]) # if an onset was previously detected, guarantee that the new onset respects the minimum delay, minimum BPM and maximum BPM and ( len(onsets) == 0 or ( (idx_buffer[1] - onsets[-1]) / sampling_rate >= min_delay * 60 / bpm and (idx_buffer[1] - onsets[-1]) / sampling_rate > 60 / max_BPM ) ) ): # store the onset onsets.append(idx_buffer[1]) # if more than one onset was detected, update the bpm and the segment width if len(onsets) > 1: # calculate new bpm from the latest two onsets new_bpm = int(60 * sampling_rate / (onsets[-1] - onsets[-2])) # update the bpm value bpm = alpha * new_bpm + (1 - alpha) * bpm # update the segment window width window = int(sampling_rate * (60 / bpm) / k) # update the signal pointer i += window # store window segment boundaries index window_marks.append(i) onsets = np.array(onsets, dtype="int") window_marks = np.array(window_marks, dtype="int") # output params = { "signal": signal, "sampling_rate": sampling_rate, "alpha": alpha, "k": k, "init_bpm": init_bpm, "min_delay": min_delay, "max_bpm": max_BPM, } args = (onsets, window_marks, params) names = ("onsets", "window_marks", "params") return utils.ReturnTuple(args, names)
[docs]def ppg_segmentation(signal=None, sampling_rate=1000., peaks=None, selection=False, peak_threshold=None): """Segments a filtered PPG signal. Segmentation filtering is achieved by taking into account segments selected by peak height and pulse morphology. Parameters ---------- signal : array Filtered PPG signal. sampling_rate : int, float, optional Sampling frequency (Hz). peaks : array List of PPG systolic peaks. selection : bool, optional If True, performs selection with peak height and pulse morphology. peak_threshold : int, float, optional If `selection` is True, selects peaks with height greater than defined threshold. Returns ------- onsets : array Indices of PPG pulse onsets (i.e., start of beats) of the selected segments. peaks : array List of PPG systolic peaks of the selected segments. segments_loc : array Start and end indices for each selected pulse segment. """ # check inputs if signal is None or peaks is None: raise TypeError("Please check inputs.") # ensure input format signal = np.array(signal) sampling_rate = float(sampling_rate) # find onsets onsets = [] minima = (np.diff(np.sign(np.diff(signal))) > 0).nonzero()[0] for ind in peaks: onsets.append(minima[minima < ind].max()) onsets = np.array(onsets, dtype='int') # raise error if onset detection failed if len(onsets) == 0: raise TypeError("No onsets detected.") # assign start and end of each segment segments_loc = np.vstack((onsets[:-1], onsets[1:])).T # segment selection by morphology if selection: segments_sel = [] for ind in range(segments_loc.shape[0]): # search segments with at least 4 max+min (standard waveform) segment = signal[segments_loc[ind, 0]: segments_loc[ind, 1]] if len(np.where(np.diff(np.sign(np.diff(segment))))[0]) >= 4: segments_sel.append(ind) segments_loc = segments_loc[segments_sel] onsets = onsets[segments_sel] peaks = peaks[segments_sel] # segment selection by height if peak_threshold is not None: segments_sel = [] for ind in range(segments_loc.shape[0]): # search segments with peak higher than threshold segment = signal[segments_loc[ind, 0]: segments_loc[ind, 1]] if max(segment) > peak_threshold: segments_sel.append(ind) segments_loc = segments_loc[segments_sel] onsets = onsets[segments_sel] peaks = peaks[segments_sel] # output args = (onsets, peaks, segments_loc) names = ('onsets', 'peaks', 'segments_loc') return utils.ReturnTuple(args, names)
def _extract_templates(signal=None, sampling_rate=1000., onsets=None, peaks=None, segments_loc=None): """Extracts the templates from the PPG signal, which are aligned with their systolic peaks. To achieve this, the segments are padded with NaNs. Should be used in combination with signals.ppg.ppg_segmentation. Parameters ---------- signal : array Filtered PPG signal. sampling_rate : int, float, optional Sampling frequency (Hz). onsets : array List of onsets (i.e., start of beats) of the PPG waves. peaks : array List of PPG systolic peaks. segments_loc : array Start and end indices for each selected pulse segment. Returns ------- templates_ts : array Time axis common to all templates. templates : array List of templates aligned with the systolic peaks. """ # initialize output templates = [] # find the longest onset-peak duration shifts = peaks - onsets max_shift = np.max(peaks - onsets) # left padding max_len = 0 for i in range(len(segments_loc)): segment = signal[segments_loc[i, 0]: segments_loc[i, 1]] segment = np.pad(segment, max_shift - shifts[i], mode='constant', constant_values=(np.nan,)) templates.append(segment) # find the largest segment if len(segment) > max_len: max_len = len(segment) # right padding for index, segment in enumerate(templates): templates[index] = np.pad(segment, (0, max_len - len(segment)), mode='constant', constant_values=(np.nan,)) templates = np.asarray(templates).T # time vector templates_ts = np.arange(-max_shift, max_len - max_shift) / sampling_rate # output args = (templates_ts, templates) names = ('templates_ts', 'templates') return utils.ReturnTuple(args, names)