# -*- coding: utf-8 -*-
"""
biosppy.signals.ecg
-------------------
This module provides methods to process Electrocardiographic (ECG) signals.
Implemented code assumes a single-channel Lead I like ECG signal.
:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.
"""
# Imports
# compat
from __future__ import absolute_import, division, print_function
from six.moves import range, zip
# 3rd party
import math
import numpy as np
import scipy.signal as ss
import matplotlib.pyplot as plt
from scipy import stats, integrate
# local
from . import tools as st
from .. import plotting, utils
from biosppy.inter_plotting import ecg as inter_plotting
from scipy.signal import argrelextrema
[docs]def ecg(signal=None, sampling_rate=1000.0, units=None, path=None, show=True, interactive=False):
"""Process a raw ECG signal and extract relevant signal features using
default parameters.
Parameters
----------
signal : array
Raw ECG signal.
sampling_rate : int, float, optional
Sampling frequency (Hz).
units : str, optional
The units of the input signal. If specified, the plot will have the
y-axis labeled with the corresponding units.
path : str, optional
If provided, the plot will be saved to the specified file.
show : bool, optional
If True, show a summary plot.
interactive : bool, optional
If True, shows an interactive plot.
Returns
-------
ts : array
Signal time axis reference (seconds).
filtered : array
Filtered ECG signal.
rpeaks : array
R-peak location indices.
templates_ts : array
Templates time axis reference (seconds).
templates : array
Extracted heartbeat templates.
heart_rate_ts : array
Heart rate time axis reference (seconds).
heart_rate : array
Instantaneous heart rate (bpm).
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
# ensure numpy
signal = np.array(signal)
sampling_rate = float(sampling_rate)
# filter signal
order = int(1.5 * sampling_rate)
filtered, _, _ = st.filter_signal(
signal=signal,
ftype="FIR",
band="bandpass",
order=order,
frequency=[0.67, 45],
sampling_rate=sampling_rate,
)
filtered = filtered - np.mean(filtered) # remove DC offset
# segment
(rpeaks,) = hamilton_segmenter(signal=filtered, sampling_rate=sampling_rate)
# correct R-peak locations
(rpeaks,) = correct_rpeaks(
signal=filtered, rpeaks=rpeaks, sampling_rate=sampling_rate, tol=0.05
)
# extract templates
templates, rpeaks = extract_heartbeats(
signal=filtered,
rpeaks=rpeaks,
sampling_rate=sampling_rate,
before=0.2,
after=0.4,
)
# compute heart rate
hr_idx, hr = st.get_heart_rate(
beats=rpeaks, sampling_rate=sampling_rate, smooth=True, size=3
)
# get time vectors
length = len(signal)
T = (length - 1) / sampling_rate
ts = np.linspace(0, T, length, endpoint=True)
ts_hr = ts[hr_idx]
ts_tmpl = np.linspace(-0.2, 0.4, templates.shape[1], endpoint=False)
# plot
if show:
if interactive:
inter_plotting.plot_ecg(
ts=ts,
raw=signal,
filtered=filtered,
rpeaks=rpeaks,
templates_ts=ts_tmpl,
templates=templates,
heart_rate_ts=ts_hr,
heart_rate=hr,
path=path,
show=True,
)
else:
plotting.plot_ecg(
ts=ts,
raw=signal,
filtered=filtered,
rpeaks=rpeaks,
templates_ts=ts_tmpl,
templates=templates,
heart_rate_ts=ts_hr,
heart_rate=hr,
units=units,
path=path,
show=True,
)
# output
args = (ts, filtered, rpeaks, ts_tmpl, templates, ts_hr, hr)
names = (
"ts",
"filtered",
"rpeaks",
"templates_ts",
"templates",
"heart_rate_ts",
"heart_rate",
)
return utils.ReturnTuple(args, names)
def _extract_heartbeats(signal=None, rpeaks=None, before=200, after=400):
"""Extract heartbeat templates from an ECG signal, given a list of
R-peak locations.
Parameters
----------
signal : array
Input ECG signal.
rpeaks : array
R-peak location indices.
before : int, optional
Number of samples to include before the R peak.
after : int, optional
Number of samples to include after the R peak.
Returns
-------
templates : array
Extracted heartbeat templates.
rpeaks : array
Corresponding R-peak location indices of the extracted heartbeat
templates.
"""
R = np.sort(rpeaks)
length = len(signal)
templates = []
newR = []
for r in R:
a = r - before
if a < 0:
continue
b = r + after
if b > length:
break
templates.append(signal[a:b])
newR.append(r)
templates = np.array(templates)
newR = np.array(newR, dtype="int")
return templates, newR
[docs]def compare_segmentation(
reference=None, test=None, sampling_rate=1000.0, offset=0, minRR=None, tol=0.05
):
"""Compare the segmentation performance of a list of R-peak positions
against a reference list.
Parameters
----------
reference : array
Reference R-peak location indices.
test : array
Test R-peak location indices.
sampling_rate : int, float, optional
Sampling frequency (Hz).
offset : int, optional
Constant a priori offset (number of samples) between reference and
test R-peak locations.
minRR : float, optional
Minimum admissible RR interval (seconds).
tol : float, optional
Tolerance between corresponding reference and test R-peak
locations (seconds).
Returns
-------
TP : int
Number of true positive R-peaks.
FP : int
Number of false positive R-peaks.
performance : float
Test performance; TP / len(reference).
acc : float
Accuracy rate; TP / (TP + FP).
err : float
Error rate; FP / (TP + FP).
match : list
Indices of the elements of 'test' that match to an R-peak
from 'reference'.
deviation : array
Absolute errors of the matched R-peaks (seconds).
mean_deviation : float
Mean error (seconds).
std_deviation : float
Standard deviation of error (seconds).
mean_ref_ibi : float
Mean of the reference interbeat intervals (seconds).
std_ref_ibi : float
Standard deviation of the reference interbeat intervals (seconds).
mean_test_ibi : float
Mean of the test interbeat intervals (seconds).
std_test_ibi : float
Standard deviation of the test interbeat intervals (seconds).
"""
# check inputs
if reference is None:
raise TypeError(
"Please specify an input reference list of R-peak \
locations."
)
if test is None:
raise TypeError(
"Please specify an input test list of R-peak \
locations."
)
if minRR is None:
minRR = np.inf
sampling_rate = float(sampling_rate)
# ensure numpy
reference = np.array(reference)
test = np.array(test)
# convert to samples
minRR = minRR * sampling_rate
tol = tol * sampling_rate
TP = 0
FP = 0
matchIdx = []
dev = []
for i, r in enumerate(test):
# deviation to closest R in reference
ref = reference[np.argmin(np.abs(reference - (r + offset)))]
error = np.abs(ref - (r + offset))
if error < tol:
TP += 1
matchIdx.append(i)
dev.append(error)
else:
if len(matchIdx) > 0:
bdf = r - test[matchIdx[-1]]
if bdf < minRR:
# false positive, but removable with RR interval check
pass
else:
FP += 1
else:
FP += 1
# convert deviations to time
dev = np.array(dev, dtype="float")
dev /= sampling_rate
nd = len(dev)
if nd == 0:
mdev = np.nan
sdev = np.nan
elif nd == 1:
mdev = np.mean(dev)
sdev = 0.0
else:
mdev = np.mean(dev)
sdev = np.std(dev, ddof=1)
# interbeat interval
th1 = 1.5 # 40 bpm
th2 = 0.3 # 200 bpm
rIBI = np.diff(reference)
rIBI = np.array(rIBI, dtype="float")
rIBI /= sampling_rate
good = np.nonzero((rIBI < th1) & (rIBI > th2))[0]
rIBI = rIBI[good]
nr = len(rIBI)
if nr == 0:
rIBIm = np.nan
rIBIs = np.nan
elif nr == 1:
rIBIm = np.mean(rIBI)
rIBIs = 0.0
else:
rIBIm = np.mean(rIBI)
rIBIs = np.std(rIBI, ddof=1)
tIBI = np.diff(test[matchIdx])
tIBI = np.array(tIBI, dtype="float")
tIBI /= sampling_rate
good = np.nonzero((tIBI < th1) & (tIBI > th2))[0]
tIBI = tIBI[good]
nt = len(tIBI)
if nt == 0:
tIBIm = np.nan
tIBIs = np.nan
elif nt == 1:
tIBIm = np.mean(tIBI)
tIBIs = 0.0
else:
tIBIm = np.mean(tIBI)
tIBIs = np.std(tIBI, ddof=1)
# output
perf = float(TP) / len(reference)
acc = float(TP) / (TP + FP)
err = float(FP) / (TP + FP)
args = (
TP,
FP,
perf,
acc,
err,
matchIdx,
dev,
mdev,
sdev,
rIBIm,
rIBIs,
tIBIm,
tIBIs,
)
names = (
"TP",
"FP",
"performance",
"acc",
"err",
"match",
"deviation",
"mean_deviation",
"std_deviation",
"mean_ref_ibi",
"std_ref_ibi",
"mean_test_ibi",
"std_test_ibi",
)
return utils.ReturnTuple(args, names)
[docs]def correct_rpeaks(signal=None, rpeaks=None, sampling_rate=1000.0, tol=0.05):
"""Correct R-peak locations to the maximum within a tolerance.
Parameters
----------
signal : array
ECG signal.
rpeaks : array
R-peak location indices.
sampling_rate : int, float, optional
Sampling frequency (Hz).
tol : int, float, optional
Correction tolerance (seconds).
Returns
-------
rpeaks : array
Cerrected R-peak location indices.
Notes
-----
* The tolerance is defined as the time interval :math:`[R-tol, R+tol[`.
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
if rpeaks is None:
raise TypeError("Please specify the input R-peaks.")
tol = int(tol * sampling_rate)
length = len(signal)
newR = []
for r in rpeaks:
a = r - tol
if a < 0:
continue
b = r + tol
if b > length:
break
newR.append(a + np.argmax(signal[a:b]))
newR = sorted(list(set(newR)))
newR = np.array(newR, dtype="int")
return utils.ReturnTuple((newR,), ("rpeaks",))
[docs]def ssf_segmenter(
signal=None, sampling_rate=1000.0, threshold=20, before=0.03, after=0.01
):
"""ECG R-peak segmentation based on the Slope Sum Function (SSF).
Parameters
----------
signal : array
Input filtered ECG signal.
sampling_rate : int, float, optional
Sampling frequency (Hz).
threshold : float, optional
SSF threshold.
before : float, optional
Search window size before R-peak candidate (seconds).
after : float, optional
Search window size after R-peak candidate (seconds).
Returns
-------
rpeaks : array
R-peak location indices.
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
# convert to samples
winB = int(before * sampling_rate)
winA = int(after * sampling_rate)
Rset = set()
length = len(signal)
# diff
dx = np.diff(signal)
dx[dx >= 0] = 0
dx = dx**2
# detection
(idx,) = np.nonzero(dx > threshold)
idx0 = np.hstack(([0], idx))
didx = np.diff(idx0)
# search
sidx = idx[didx > 1]
for item in sidx:
a = item - winB
if a < 0:
a = 0
b = item + winA
if b > length:
continue
r = np.argmax(signal[a:b]) + a
Rset.add(r)
# output
rpeaks = list(Rset)
rpeaks.sort()
rpeaks = np.array(rpeaks, dtype="int")
return utils.ReturnTuple((rpeaks,), ("rpeaks",))
[docs]def christov_segmenter(signal=None, sampling_rate=1000.0):
"""ECG R-peak segmentation algorithm.
Follows the approach by Christov [Chri04]_.
Parameters
----------
signal : array
Input filtered ECG signal.
sampling_rate : int, float, optional
Sampling frequency (Hz).
Returns
-------
rpeaks : array
R-peak location indices.
References
----------
.. [Chri04] Ivaylo I. Christov, "Real time electrocardiogram QRS
detection using combined adaptive threshold", BioMedical Engineering
OnLine 2004, vol. 3:28, 2004
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
length = len(signal)
# algorithm parameters
v100ms = int(0.1 * sampling_rate)
v50ms = int(0.050 * sampling_rate)
v300ms = int(0.300 * sampling_rate)
v350ms = int(0.350 * sampling_rate)
v200ms = int(0.2 * sampling_rate)
v1200ms = int(1.2 * sampling_rate)
M_th = 0.4 # paper is 0.6
# Pre-processing
# 1. Moving averaging filter for power-line interference suppression:
# averages samples in one period of the powerline
# interference frequency with a first zero at this frequency.
b = np.ones(int(0.02 * sampling_rate)) / 50.0
a = [1]
X = ss.filtfilt(b, a, signal)
# 2. Moving averaging of samples in 28 ms interval for electromyogram
# noise suppression a filter with first zero at about 35 Hz.
b = np.ones(int(sampling_rate / 35.0)) / 35.0
X = ss.filtfilt(b, a, X)
X, _, _ = st.filter_signal(
signal=X,
ftype="butter",
band="lowpass",
order=7,
frequency=40.0,
sampling_rate=sampling_rate,
)
X, _, _ = st.filter_signal(
signal=X,
ftype="butter",
band="highpass",
order=7,
frequency=9.0,
sampling_rate=sampling_rate,
)
k, Y, L = 1, [], len(X)
for n in range(k + 1, L - k):
Y.append(X[n] ** 2 - X[n - k] * X[n + k])
Y = np.array(Y)
Y[Y < 0] = 0
# Complex lead
# Y = abs(scipy.diff(X)) # 1-lead
# 3. Moving averaging of a complex lead (the sintesis is
# explained in the next section) in 40 ms intervals a filter
# with first zero at about 25 Hz. It is suppressing the noise
# magnified by the differentiation procedure used in the
# process of the complex lead sintesis.
b = np.ones(int(sampling_rate / 25.0)) / 25.0
Y = ss.lfilter(b, a, Y)
# Init
MM = M_th * np.max(Y[: int(5 * sampling_rate)]) * np.ones(5)
MMidx = 0
M = np.mean(MM)
slope = np.linspace(1.0, 0.6, int(sampling_rate))
Rdec = 0
R = 0
RR = np.zeros(5)
RRidx = 0
Rm = 0
QRS = []
Rpeak = []
current_sample = 0
skip = False
F = np.mean(Y[:v350ms])
# Go through each sample
while current_sample < len(Y):
if QRS:
# No detection is allowed 200 ms after the current one. In
# the interval QRS to QRS+200ms a new value of M5 is calculated: newM5 = 0.6*max(Yi)
if current_sample <= QRS[-1] + v200ms:
Mnew = M_th * max(Y[QRS[-1] : QRS[-1] + v200ms])
# The estimated newM5 value can become quite high, if
# steep slope premature ventricular contraction or artifact
# appeared, and for that reason it is limited to newM5 = 1.1*M5 if newM5 > 1.5* M5
# The MM buffer is refreshed excluding the oldest component, and including M5 = newM5.
Mnew = Mnew if Mnew <= 1.5 * MM[MMidx - 1] else 1.1 * MM[MMidx - 1]
MM[MMidx] = Mnew
MMidx = np.mod(MMidx + 1, 5)
# M is calculated as an average value of MM.
Mtemp = np.mean(MM)
M = Mtemp
skip = True
# M is decreased in an interval 200 to 1200 ms following
# the last QRS detection at a low slope, reaching 60 % of its
# refreshed value at 1200 ms.
elif (
current_sample >= QRS[-1] + v200ms
and current_sample < QRS[-1] + v1200ms
):
M = Mtemp * slope[current_sample - QRS[-1] - v200ms]
# After 1200 ms M remains unchanged.
# R = 0 V in the interval from the last detected QRS to 2/3 of the expected Rm.
if current_sample >= QRS[-1] and current_sample < QRS[-1] + (2 / 3.0) * Rm:
R = 0
# In the interval QRS + Rm * 2/3 to QRS + Rm, R decreases
# 1.4 times slower then the decrease of the previously discussed
# steep slope threshold (M in the 200 to 1200 ms interval).
elif (
current_sample >= QRS[-1] + (2 / 3.0) * Rm
and current_sample < QRS[-1] + Rm
):
R += Rdec
# After QRS + Rm the decrease of R is stopped
# MFR = M + F + R
MFR = M + F + R
# QRS or beat complex is detected if Yi = MFR
if not skip and Y[current_sample] >= MFR:
QRS += [current_sample]
Rpeak += [QRS[-1] + np.argmax(Y[QRS[-1] : QRS[-1] + v300ms])]
if len(QRS) >= 2:
# A buffer with the 5 last RR intervals is updated at any new QRS detection.
RR[RRidx] = QRS[-1] - QRS[-2]
RRidx = np.mod(RRidx + 1, 5)
skip = False
# With every signal sample, F is updated adding the maximum
# of Y in the latest 50 ms of the 350 ms interval and
# subtracting maxY in the earliest 50 ms of the interval.
if current_sample >= v350ms:
Y_latest50 = Y[current_sample - v50ms : current_sample]
Y_earliest50 = Y[current_sample - v350ms : current_sample - v300ms]
F += (max(Y_latest50) - max(Y_earliest50)) / 1000.0
# Rm is the mean value of the buffer RR.
Rm = np.mean(RR)
current_sample += 1
rpeaks = []
for i in Rpeak:
a, b = i - v100ms, i + v100ms
if a < 0:
a = 0
if b > length:
b = length
rpeaks.append(np.argmax(signal[a:b]) + a)
rpeaks = sorted(list(set(rpeaks)))
rpeaks = np.array(rpeaks, dtype="int")
return utils.ReturnTuple((rpeaks,), ("rpeaks",))
[docs]def engzee_segmenter(signal=None, sampling_rate=1000.0, threshold=0.48):
"""ECG R-peak segmentation algorithm.
Follows the approach by Engelse and Zeelenberg [EnZe79]_ with the
modifications by Lourenco *et al.* [LSLL12]_.
Parameters
----------
signal : array
Input filtered ECG signal.
sampling_rate : int, float, optional
Sampling frequency (Hz).
threshold : float, optional
Detection threshold.
Returns
-------
rpeaks : array
R-peak location indices.
References
----------
.. [EnZe79] W. Engelse and C. Zeelenberg, "A single scan algorithm for
QRS detection and feature extraction", IEEE Comp. in Cardiology,
vol. 6, pp. 37-42, 1979
.. [LSLL12] A. Lourenco, H. Silva, P. Leite, R. Lourenco and A. Fred,
"Real Time Electrocardiogram Segmentation for Finger Based ECG
Biometrics", BIOSIGNALS 2012, pp. 49-54, 2012
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
# algorithm parameters
changeM = int(0.75 * sampling_rate)
Miterate = int(1.75 * sampling_rate)
v250ms = int(0.25 * sampling_rate)
v1200ms = int(1.2 * sampling_rate)
v1500ms = int(1.5 * sampling_rate)
v180ms = int(0.18 * sampling_rate)
p10ms = int(np.ceil(0.01 * sampling_rate))
p20ms = int(np.ceil(0.02 * sampling_rate))
err_kill = int(0.01 * sampling_rate)
inc = 1
mmth = threshold
mmp = 0.2
# Differentiator (1)
y1 = [signal[i] - signal[i - 4] for i in range(4, len(signal))]
# Low pass filter (2)
c = [1, 4, 6, 4, 1, -1, -4, -6, -4, -1]
y2 = np.array([np.dot(c, y1[n - 9 : n + 1]) for n in range(9, len(y1))])
y2_len = len(y2)
# vars
MM = mmth * max(y2[:Miterate]) * np.ones(3)
MMidx = 0
Th = np.mean(MM)
NN = mmp * min(y2[:Miterate]) * np.ones(2)
NNidx = 0
ThNew = np.mean(NN)
update = False
nthfpluss = []
rpeaks = []
# Find nthf+ point
while True:
# If a previous intersection was found, continue the analysis from there
if update:
if inc * changeM + Miterate < y2_len:
a = (inc - 1) * changeM
b = inc * changeM + Miterate
Mnew = mmth * max(y2[a:b])
Nnew = mmp * min(y2[a:b])
elif y2_len - (inc - 1) * changeM > v1500ms:
a = (inc - 1) * changeM
Mnew = mmth * max(y2[a:])
Nnew = mmp * min(y2[a:])
if len(y2) - inc * changeM > Miterate:
MM[MMidx] = Mnew if Mnew <= 1.5 * MM[MMidx - 1] else 1.1 * MM[MMidx - 1]
NN[NNidx] = (
Nnew
if abs(Nnew) <= 1.5 * abs(NN[NNidx - 1])
else 1.1 * NN[NNidx - 1]
)
MMidx = np.mod(MMidx + 1, len(MM))
NNidx = np.mod(NNidx + 1, len(NN))
Th = np.mean(MM)
ThNew = np.mean(NN)
inc += 1
update = False
if nthfpluss:
lastp = nthfpluss[-1] + 1
if lastp < (inc - 1) * changeM:
lastp = (inc - 1) * changeM
y22 = y2[lastp : inc * changeM + err_kill]
# find intersection with Th
try:
nthfplus = np.intersect1d(
np.nonzero(y22 > Th)[0], np.nonzero(y22 < Th)[0] - 1
)[0]
except IndexError:
if inc * changeM > len(y2):
break
else:
update = True
continue
# adjust index
nthfplus += int(lastp)
# if a previous R peak was found:
if rpeaks:
# check if intersection is within the 200-1200 ms interval. Modification: 300 ms -> 200 bpm
if nthfplus - rpeaks[-1] > v250ms and nthfplus - rpeaks[-1] < v1200ms:
pass
# if new intersection is within the <200ms interval, skip it. Modification: 300 ms -> 200 bpm
elif nthfplus - rpeaks[-1] < v250ms:
nthfpluss += [nthfplus]
continue
# no previous intersection, find the first one
else:
try:
aux = np.nonzero(
y2[(inc - 1) * changeM : inc * changeM + err_kill] > Th
)[0]
bux = (
np.nonzero(y2[(inc - 1) * changeM : inc * changeM + err_kill] < Th)[
0
]
- 1
)
nthfplus = int((inc - 1) * changeM) + np.intersect1d(aux, bux)[0]
except IndexError:
if inc * changeM > len(y2):
break
else:
update = True
continue
nthfpluss += [nthfplus]
# Define 160ms search region
windowW = np.arange(nthfplus, nthfplus + v180ms)
# Check if the condition y2[n] < Th holds for a specified
# number of consecutive points (experimentally we found this number to be at least 10 points)"
i, f = windowW[0], windowW[-1] if windowW[-1] < len(y2) else -1
hold_points = np.diff(np.nonzero(y2[i:f] < ThNew)[0])
cont = 0
for hp in hold_points:
if hp == 1:
cont += 1
if cont == p10ms - 1: # -1 is because diff eats a sample
max_shift = p20ms # looks for X's max a bit to the right
if nthfpluss[-1] > max_shift:
rpeaks += [np.argmax(signal[i - max_shift : f]) + i - max_shift]
else:
rpeaks += [np.argmax(signal[i:f]) + i]
break
else:
cont = 0
rpeaks = sorted(list(set(rpeaks)))
rpeaks = np.array(rpeaks, dtype="int")
return utils.ReturnTuple((rpeaks,), ("rpeaks",))
[docs]def gamboa_segmenter(signal=None, sampling_rate=1000.0, tol=0.002):
"""ECG R-peak segmentation algorithm.
Follows the approach by Gamboa.
Parameters
----------
signal : array
Input filtered ECG signal.
sampling_rate : int, float, optional
Sampling frequency (Hz).
tol : float, optional
Tolerance parameter.
Returns
-------
rpeaks : array
R-peak location indices.
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
# convert to samples
v_100ms = int(0.1 * sampling_rate)
v_300ms = int(0.3 * sampling_rate)
hist, edges = np.histogram(signal, 100, density=True)
TH = 0.01
F = np.cumsum(hist)
v0 = edges[np.nonzero(F > TH)[0][0]]
v1 = edges[np.nonzero(F < (1 - TH))[0][-1]]
nrm = max([abs(v0), abs(v1)])
norm_signal = signal / float(nrm)
d2 = np.diff(norm_signal, 2)
b = np.nonzero((np.diff(np.sign(np.diff(-d2)))) == -2)[0] + 2
b = np.intersect1d(b, np.nonzero(-d2 > tol)[0])
if len(b) < 3:
rpeaks = []
else:
b = b.astype("float")
rpeaks = []
previous = b[0]
for i in b[1:]:
if i - previous > v_300ms:
previous = i
rpeaks.append(np.argmax(signal[int(i) : int(i + v_100ms)]) + i)
rpeaks = sorted(list(set(rpeaks)))
rpeaks = np.array(rpeaks, dtype="int")
return utils.ReturnTuple((rpeaks,), ("rpeaks",))
[docs]def hamilton_segmenter(signal=None, sampling_rate=1000.0):
"""ECG R-peak segmentation algorithm.
Follows the approach by Hamilton [Hami02]_.
Parameters
----------
signal : array
Input filtered ECG signal.
sampling_rate : int, float, optional
Sampling frequency (Hz).
Returns
-------
rpeaks : array
R-peak location indices.
References
----------
.. [Hami02] P.S. Hamilton, "Open Source ECG Analysis Software
Documentation", E.P.Limited, 2002
"""
# check inputs
if signal is None:
raise TypeError("Please specify an input signal.")
sampling_rate = float(sampling_rate)
length = len(signal)
dur = length / sampling_rate
# algorithm parameters
v1s = int(1.0 * sampling_rate)
v100ms = int(0.1 * sampling_rate)
TH_elapsed = np.ceil(0.36 * sampling_rate)
sm_size = int(0.08 * sampling_rate)
init_ecg = 8 # seconds for initialization
if dur < init_ecg:
init_ecg = int(dur)
# filtering
filtered, _, _ = st.filter_signal(
signal=signal,
ftype="butter",
band="lowpass",
order=4,
frequency=25.0,
sampling_rate=sampling_rate,
)
filtered, _, _ = st.filter_signal(
signal=filtered,
ftype="butter",
band="highpass",
order=4,
frequency=3.0,
sampling_rate=sampling_rate,
)
# diff
dx = np.abs(np.diff(filtered, 1) * sampling_rate)
# smoothing
dx, _ = st.smoother(signal=dx, kernel="hamming", size=sm_size, mirror=True)
# buffers
qrspeakbuffer = np.zeros(init_ecg)
noisepeakbuffer = np.zeros(init_ecg)
peak_idx_test = np.zeros(init_ecg)
noise_idx = np.zeros(init_ecg)
rrinterval = sampling_rate * np.ones(init_ecg)
a, b = 0, v1s
all_peaks, _ = st.find_extrema(signal=dx, mode="max")
for i in range(init_ecg):
peaks, values = st.find_extrema(signal=dx[a:b], mode="max")
try:
ind = np.argmax(values)
except ValueError:
pass
else:
# peak amplitude
qrspeakbuffer[i] = values[ind]
# peak location
peak_idx_test[i] = peaks[ind] + a
a += v1s
b += v1s
# thresholds
ANP = np.median(noisepeakbuffer)
AQRSP = np.median(qrspeakbuffer)
TH = 0.475
DT = ANP + TH * (AQRSP - ANP)
DT_vec = []
indexqrs = 0
indexnoise = 0
indexrr = 0
npeaks = 0
offset = 0
beats = []
# detection rules
# 1 - ignore all peaks that precede or follow larger peaks by less than 200ms
lim = int(np.ceil(0.2 * sampling_rate))
diff_nr = int(np.ceil(0.045 * sampling_rate))
bpsi, bpe = offset, 0
for f in all_peaks:
DT_vec += [DT]
# 1 - Checking if f-peak is larger than any peak following or preceding it by less than 200 ms
peak_cond = np.array(
(all_peaks > f - lim) * (all_peaks < f + lim) * (all_peaks != f)
)
peaks_within = all_peaks[peak_cond]
if peaks_within.any() and (max(dx[peaks_within]) > dx[f]):
continue
# 4 - If the peak is larger than the detection threshold call it a QRS complex, otherwise call it noise
if dx[f] > DT:
# 2 - look for both positive and negative slopes in raw signal
if f < diff_nr:
diff_now = np.diff(signal[0 : f + diff_nr])
elif f + diff_nr >= len(signal):
diff_now = np.diff(signal[f - diff_nr : len(dx)])
else:
diff_now = np.diff(signal[f - diff_nr : f + diff_nr])
diff_signer = diff_now[diff_now > 0]
if len(diff_signer) == 0 or len(diff_signer) == len(diff_now):
continue
# RR INTERVALS
if npeaks > 0:
# 3 - in here we check point 3 of the Hamilton paper
# that is, we check whether our current peak is a valid R-peak.
prev_rpeak = beats[npeaks - 1]
elapsed = f - prev_rpeak
# if the previous peak was within 360 ms interval
if elapsed < TH_elapsed:
# check current and previous slopes
if prev_rpeak < diff_nr:
diff_prev = np.diff(signal[0 : prev_rpeak + diff_nr])
elif prev_rpeak + diff_nr >= len(signal):
diff_prev = np.diff(signal[prev_rpeak - diff_nr : len(dx)])
else:
diff_prev = np.diff(
signal[prev_rpeak - diff_nr : prev_rpeak + diff_nr]
)
slope_now = max(diff_now)
slope_prev = max(diff_prev)
if slope_now < 0.5 * slope_prev:
# if current slope is smaller than half the previous one, then it is a T-wave
continue
if dx[f] < 3.0 * np.median(qrspeakbuffer): # avoid retarded noise peaks
beats += [int(f) + bpsi]
else:
continue
if bpe == 0:
rrinterval[indexrr] = beats[npeaks] - beats[npeaks - 1]
indexrr += 1
if indexrr == init_ecg:
indexrr = 0
else:
if beats[npeaks] > beats[bpe - 1] + v100ms:
rrinterval[indexrr] = beats[npeaks] - beats[npeaks - 1]
indexrr += 1
if indexrr == init_ecg:
indexrr = 0
elif dx[f] < 3.0 * np.median(qrspeakbuffer):
beats += [int(f) + bpsi]
else:
continue
npeaks += 1
qrspeakbuffer[indexqrs] = dx[f]
peak_idx_test[indexqrs] = f
indexqrs += 1
if indexqrs == init_ecg:
indexqrs = 0
if dx[f] <= DT:
# 4 - not valid
# 5 - If no QRS has been detected within 1.5 R-to-R intervals,
# there was a peak that was larger than half the detection threshold,
# and the peak followed the preceding detection by at least 360 ms,
# classify that peak as a QRS complex
tf = f + bpsi
# RR interval median
RRM = np.median(rrinterval) # initial values are good?
if len(beats) >= 2:
elapsed = tf - beats[npeaks - 1]
if elapsed >= 1.5 * RRM and elapsed > TH_elapsed:
if dx[f] > 0.5 * DT:
beats += [int(f) + offset]
# RR INTERVALS
if npeaks > 0:
rrinterval[indexrr] = beats[npeaks] - beats[npeaks - 1]
indexrr += 1
if indexrr == init_ecg:
indexrr = 0
npeaks += 1
qrspeakbuffer[indexqrs] = dx[f]
peak_idx_test[indexqrs] = f
indexqrs += 1
if indexqrs == init_ecg:
indexqrs = 0
else:
noisepeakbuffer[indexnoise] = dx[f]
noise_idx[indexnoise] = f
indexnoise += 1
if indexnoise == init_ecg:
indexnoise = 0
else:
noisepeakbuffer[indexnoise] = dx[f]
noise_idx[indexnoise] = f
indexnoise += 1
if indexnoise == init_ecg:
indexnoise = 0
# Update Detection Threshold
ANP = np.median(noisepeakbuffer)
AQRSP = np.median(qrspeakbuffer)
DT = ANP + 0.475 * (AQRSP - ANP)
beats = np.array(beats)
r_beats = []
thres_ch = 0.85
adjacency = 0.05 * sampling_rate
for i in beats:
error = [False, False]
if i - lim < 0:
window = signal[0 : i + lim]
add = 0
elif i + lim >= length:
window = signal[i - lim : length]
add = i - lim
else:
window = signal[i - lim : i + lim]
add = i - lim
# meanval = np.mean(window)
w_peaks, _ = st.find_extrema(signal=window, mode="max")
w_negpeaks, _ = st.find_extrema(signal=window, mode="min")
zerdiffs = np.where(np.diff(window) == 0)[0]
w_peaks = np.concatenate((w_peaks, zerdiffs))
w_negpeaks = np.concatenate((w_negpeaks, zerdiffs))
pospeaks = sorted(zip(window[w_peaks], w_peaks), reverse=True)
negpeaks = sorted(zip(window[w_negpeaks], w_negpeaks))
try:
twopeaks = [pospeaks[0]]
except IndexError:
twopeaks = []
try:
twonegpeaks = [negpeaks[0]]
except IndexError:
twonegpeaks = []
# getting positive peaks
for i in range(len(pospeaks) - 1):
if abs(pospeaks[0][1] - pospeaks[i + 1][1]) > adjacency:
twopeaks.append(pospeaks[i + 1])
break
try:
posdiv = abs(twopeaks[0][0] - twopeaks[1][0])
except IndexError:
error[0] = True
# getting negative peaks
for i in range(len(negpeaks) - 1):
if abs(negpeaks[0][1] - negpeaks[i + 1][1]) > adjacency:
twonegpeaks.append(negpeaks[i + 1])
break
try:
negdiv = abs(twonegpeaks[0][0] - twonegpeaks[1][0])
except IndexError:
error[1] = True
# choosing type of R-peak
n_errors = sum(error)
try:
if not n_errors:
if posdiv > thres_ch * negdiv:
# pos noerr
r_beats.append(twopeaks[0][1] + add)
else:
# neg noerr
r_beats.append(twonegpeaks[0][1] + add)
elif n_errors == 2:
if abs(twopeaks[0][1]) > abs(twonegpeaks[0][1]):
# pos allerr
r_beats.append(twopeaks[0][1] + add)
else:
# neg allerr
r_beats.append(twonegpeaks[0][1] + add)
elif error[0]:
# pos poserr
r_beats.append(twopeaks[0][1] + add)
else:
# neg negerr
r_beats.append(twonegpeaks[0][1] + add)
except IndexError:
continue
rpeaks = sorted(list(set(r_beats)))
rpeaks = np.array(rpeaks, dtype="int")
return utils.ReturnTuple((rpeaks,), ("rpeaks",))
[docs]def ASI_segmenter(signal=None, sampling_rate=1000.0, Pth=5.0):
"""ECG R-peak segmentation algorithm.
Parameters
----------
signal : array
Input ECG signal.
sampling_rate : int, float, optional
Sampling frequency (Hz).
Pth : int, float, optional
Free parameter used in exponential decay
Returns
-------
rpeaks : array
R-peak location indices.
References
----------
Modification by Tiago Rodrigues, based on:
[R. Gutiérrez-rivas 2015] Novel Real-Time Low-Complexity QRS Complex Detector
Based on Adaptive Thresholding. Vol. 15,no. 10, pp. 6036–6043, 2015.
[D. Sadhukhan] R-Peak Detection Algorithm for Ecg using Double Difference
And RRInterval Processing. Procedia Technology, vol. 4, pp. 873–877, 2012.
"""
N = round(3 * sampling_rate / 128)
Nd = N - 1
Rmin = 0.26
rpeaks = []
i = 1
tf = len(signal)
Ramptotal = 0
# Double derivative squared
diff_ecg = [signal[i] - signal[i - Nd] for i in range(Nd, len(signal))]
ddiff_ecg = [diff_ecg[i] - diff_ecg[i - 1] for i in range(1, len(diff_ecg))]
squar = np.square(ddiff_ecg)
# Integrate moving window
b = np.array(np.ones(N))
a = [1]
processed_ecg = ss.lfilter(b, a, squar)
# R-peak finder FSM
while i < tf - sampling_rate: # ignore last second of recording
# State 1: looking for maximum
tf1 = round(i + Rmin * sampling_rate)
Rpeakamp = 0
while i < tf1:
# Rpeak amplitude and position
if processed_ecg[i] > Rpeakamp:
Rpeakamp = processed_ecg[i]
rpeakpos = i + 1
i += 1
Ramptotal = (19 / 20) * Ramptotal + (1 / 20) * Rpeakamp
rpeaks.append(rpeakpos)
# State 2: waiting state
d = tf1 - rpeakpos
tf2 = i + round(0.2 * 250 - d)
while i <= tf2:
i += 1
# State 3: decreasing threshold
Thr = Ramptotal
while processed_ecg[i] < Thr:
Thr = Thr * math.exp(-Pth / sampling_rate)
i += 1
return utils.ReturnTuple((rpeaks,), ("rpeaks",))
[docs]def getQPositions(ecg_proc=None, show=False):
"""Different ECG Waves (Q, R, S, ...) are not present or are not so clear to identify in all ECG signals (I II III V1 V2 V3, ...)
For Q wave we suggest to use signals I, aVL . Avoid II, III, V1, V2, V3, V4, aVR, aVF
Parameters
----------
signal : object
object return by the function ecg.
show : bool, optional
If True, show a plot of the Q Positions on every signal sample/template.
Returns
-------
Q_positions : array
Array with all Q positions on the signal
Q_start_ positions : array
Array with all Q start positions on the signal
"""
templates_ts = ecg_proc["templates_ts"]
template_r_position = np.argmin(np.abs(templates_ts - 0)) # R peak on the template is always on time instant 0 seconds
Q_positions = []
Q_start_positions = []
Q_positions_template = []
Q_start_positions_template = []
for n, each in enumerate(ecg_proc["templates"]):
# Get Q Position
template_left = each[0 : template_r_position + 1]
mininums_from_template_left = argrelextrema(template_left, np.less)
try:
Q_position = ecg_proc["rpeaks"][n] - (
template_r_position - mininums_from_template_left[0][-1]
)
Q_positions.append(Q_position)
Q_positions_template.append(mininums_from_template_left[0][-1])
except:
pass
# Get Q start position
template_Q_left = each[0 : mininums_from_template_left[0][-1] + 1]
maximum_from_template_Q_left = argrelextrema(template_Q_left, np.greater)
try:
Q_start_position = (
ecg_proc["rpeaks"][n]
- template_r_position
+ maximum_from_template_Q_left[0][-1]
)
Q_start_positions.append(Q_start_position)
Q_start_positions_template.append(maximum_from_template_Q_left[0][-1])
except:
pass
if show:
plt.figure()
plt.plot(ecg_proc["templates"].T)
plt.axvline(x=template_r_position, color="r", label="R peak")
plt.axvline(x=Q_positions_template[0],color="yellow",label="Q positions")
for position in range(1,len(Q_positions_template)):
plt.axvline(
x=Q_positions_template[position],
color="yellow",
)
plt.axvline(x=Q_start_positions_template[0],color="green",label="Q Start positions")
for position in range(1,len(Q_start_positions_template)):
plt.axvline(
x=Q_start_positions_template[position],
color="green",
)
plt.legend()
plt.show()
Q_positions = np.array(Q_positions)
Q_start_positions = np.array(Q_start_positions)
return utils.ReturnTuple((Q_positions, Q_start_positions,), ("Q_positions","Q_start_positions",))
[docs]def getSPositions(ecg_proc=None, show=False):
"""Different ECG Waves (Q, R, S, ...) are not present or are not so clear to identify in all ECG signals (I II III V1 V2 V3, ...)
For S wave we suggest to use signals V1, V2, V3. Avoid I, V5, V6, aVR, aVL
Parameters
----------
signal : object
object return by the function ecg.
show : bool, optional
If True, show a plot of the S Positions on every signal sample/template.
Returns
-------
S_positions : array
Array with all S positions on the signal
S_end_ positions : array
Array with all S end positions on the signal
"""
templates_ts = ecg_proc["templates_ts"]
template_r_position = np.argmin(np.abs(templates_ts - 0)) # R peak on the template is always on time instant 0 seconds
S_positions = []
S_end_positions = []
S_positions_template = []
S_end_positions_template = []
template_size = len(ecg_proc["templates"][0])
for n, each in enumerate(ecg_proc["templates"]):
# Get S Position
template_right = each[template_r_position : template_size + 1]
mininums_from_template_right = argrelextrema(template_right, np.less)
try:
S_position = ecg_proc["rpeaks"][n] + mininums_from_template_right[0][0]
S_positions.append(S_position)
S_positions_template.append(template_r_position + mininums_from_template_right[0][0])
except:
pass
# Get S end position
maximums_from_template_right = argrelextrema(template_right, np.greater)
try:
S_end_position = ecg_proc["rpeaks"][n] + maximums_from_template_right[0][0]
S_end_positions.append(S_end_position)
S_end_positions_template.append(template_r_position + maximums_from_template_right[0][0])
except:
pass
if show:
plt.figure()
plt.plot(ecg_proc["templates"].T)
plt.axvline(x=template_r_position, color="r", label="R peak")
plt.axvline(x=S_positions_template[0],color="yellow",label="S positions")
for position in range(1,len(S_positions_template)):
plt.axvline(
x=S_positions_template[position],
color="yellow",
)
plt.axvline(x=S_end_positions_template[0],color="green",label="S end positions")
for position in range(1,len(S_end_positions_template)):
plt.axvline(
x=S_end_positions_template[position],
color="green",
)
plt.legend()
plt.show()
S_positions = np.array(S_positions)
S_end_positions = np.array(S_end_positions)
return utils.ReturnTuple((S_positions, S_end_positions,), ("S_positions","S_end_positions",))
[docs]def getPPositions(ecg_proc=None, show=False):
"""Different ECG Waves (Q, R, S, ...) are not present or are not so clear to identify in all ECG signals (I II III V1 V2 V3, ...)
For P wave we suggest to use signals II, V1, aVF . Avoid I, III, V1, V2, V3, V4, V5, AVL
Parameters
----------
signal : object
object return by the function ecg.
show : bool, optional
If True, show a plot of the P Positions on every signal sample/template.
Returns
-------
P_positions : array
Array with all P positions on the signal
P_start_ positions : array
Array with all P start positions on the signal
P_end_ positions : array
Array with all P end positions on the signal
"""
templates_ts = ecg_proc["templates_ts"]
# R peak on the template is always on time instant 0 seconds
template_r_position = np.argmin(np.abs(templates_ts - 0))
# the P wave end is approximately 0.04 seconds before the R peak
template_p_position_max = np.argmin(np.abs(templates_ts - (-0.04)))
P_positions = []
P_start_positions = []
P_end_positions = []
P_positions_template = []
P_start_positions_template = []
P_end_positions_template = []
for n, each in enumerate(ecg_proc["templates"]):
# Get P position
template_left = each[0 : template_p_position_max + 1]
max_from_template_left = np.argmax(template_left)
# print("P Position=" + str(max_from_template_left))
P_position = (
ecg_proc["rpeaks"][n] - template_r_position + max_from_template_left
)
P_positions.append(P_position)
P_positions_template.append(max_from_template_left)
# Get P start position
template_P_left = each[0 : max_from_template_left + 1]
mininums_from_template_left = argrelextrema(template_P_left, np.less)
# print("P start position=" + str(mininums_from_template_left[0][-1]))
try:
P_start_position = (
ecg_proc["rpeaks"][n]
- template_r_position
+ mininums_from_template_left[0][-1]
)
P_start_positions.append(P_start_position)
P_start_positions_template.append(mininums_from_template_left[0][-1])
except:
pass
# Get P end position
template_P_right = each[max_from_template_left : template_p_position_max + 1]
mininums_from_template_right = argrelextrema(template_P_right, np.less)
try:
P_end_position = (
ecg_proc["rpeaks"][n]
- template_r_position
+ max_from_template_left
+ mininums_from_template_right[0][0]
)
P_end_positions.append(P_end_position)
P_end_positions_template.append(max_from_template_left + mininums_from_template_right[0][0])
except:
pass
if show:
plt.figure()
plt.plot(ecg_proc["templates"].T)
plt.axvline(x=template_r_position, color="r", label="R peak")
plt.axvline(x=P_positions_template[0],color="yellow",label="P positions")
for position in range(1,len(P_positions_template)):
plt.axvline(
x=P_positions_template[position],
color="yellow",
)
plt.axvline(x=P_start_positions_template[0],color="green",label="P starts")
for position in range(1,len(P_start_positions_template)):
plt.axvline(
x=P_start_positions_template[position],
color="green",
)
plt.axvline(x=P_end_positions_template[0],color="green",label="P ends")
for position in range(1,len(P_end_positions_template)):
plt.axvline(
x=P_end_positions_template[position],
color="green",
)
plt.legend()
plt.show()
P_positions = np.array(P_positions)
P_start_positions = np.array(P_start_positions)
P_end_positions = np.array(P_end_positions)
return utils.ReturnTuple((P_positions, P_start_positions, P_end_positions,), ("P_positions","P_start_positions","P_end_positions",))
[docs]def getTPositions(ecg_proc=None, show=False):
"""Different ECG Waves (Q, R, S, ...) are not present or are not so clear to identify in all ECG signals (I II III V1 V2 V3, ...)
For T wave we suggest to use signals V4, v5 (II, V3 have good results, but in less accuracy) . Avoid I, V1, V2, aVR, aVL
Parameters
----------
signal : object
object return by the function ecg.
show : bool, optional
If True, show a plot of the T Positions on every signal sample/template.
Returns
-------
T_positions : array
Array with all T positions on the signal
T_start_ positions : array
Array with all T start positions on the signal
T_end_ positions : array
Array with all T end positions on the signal
"""
templates_ts = ecg_proc["templates_ts"]
# R peak on the template is always on time instant 0 seconds
template_r_position = np.argmin(np.abs(templates_ts - 0))
# the T wave start is approximately 0.14 seconds after R-peak
template_T_position_min = np.argmin(np.abs(templates_ts - 0.14))
T_positions = []
T_start_positions = []
T_end_positions = []
T_positions_template = []
T_start_positions_template = []
T_end_positions_template = []
for n, each in enumerate(ecg_proc["templates"]):
# Get T position
template_right = each[template_T_position_min:]
max_from_template_right = np.argmax(template_right)
# print("T Position=" + str(template_T_position_min + max_from_template_right))
T_position = (
ecg_proc["rpeaks"][n]
- template_r_position
+ template_T_position_min
+ max_from_template_right
)
T_positions.append(T_position)
T_positions_template.append(template_T_position_min + max_from_template_right)
# Get T start position
template_T_left = each[
template_r_position : template_T_position_min + max_from_template_right
]
min_from_template_T_left = argrelextrema(template_T_left, np.less)
try:
T_start_position = ecg_proc["rpeaks"][n] + min_from_template_T_left[0][-1]
T_start_positions.append(T_start_position)
T_start_positions_template.append(template_r_position + min_from_template_T_left[0][-1])
except:
pass
# Get T end position
template_T_right = each[template_T_position_min + max_from_template_right :]
mininums_from_template_T_right = argrelextrema(template_T_right, np.less)
try:
T_end_position = (
ecg_proc["rpeaks"][n]
- template_r_position
+ template_T_position_min
+ max_from_template_right
+ mininums_from_template_T_right[0][0]
)
T_end_positions.append(T_end_position)
T_end_positions_template.append(template_T_position_min+ max_from_template_right+ mininums_from_template_T_right[0][0])
except:
pass
if show:
plt.figure()
plt.plot(ecg_proc["templates"].T)
plt.axvline(x=template_r_position, color="r", label="R peak")
plt.axvline(x=T_positions_template[0],color="yellow",label="T positions")
for position in range(1,len(T_positions_template)):
plt.axvline(
x=T_positions_template[position],
color="yellow",
)
plt.axvline(x=T_start_positions_template[0],color="green",label="T starts")
for position in range(1,len(T_start_positions_template)):
plt.axvline(
x=T_start_positions_template[position],
color="green",
)
plt.axvline(x=T_end_positions_template[0],color="green",label="T ends")
for position in range(1,len(T_end_positions_template)):
plt.axvline(
x=T_end_positions_template[position],
color="green",
)
plt.legend()
plt.show()
T_positions = np.array(T_positions)
T_start_positions = np.array(T_start_positions)
T_end_positions = np.array(T_end_positions)
return utils.ReturnTuple((T_positions, T_start_positions, T_end_positions,), ("T_positions","T_start_positions","T_end_positions",))
[docs]def bSQI(detector_1, detector_2, fs=1000.0, mode="simple", search_window=150):
"""Comparison of the output of two detectors.
Parameters
----------
detector_1 : array
Output of the first detector.
detector_2 : array
Output of the second detector.
fs: int, optional
Sampling rate, in Hz.
mode : str, optional
If 'simple', return only the percentage of beats detected by both. If 'matching', return the peak matching degree.
If 'n_double' returns the number of matches divided by the sum of all minus the matches.
search_window : int, optional
Search window around each peak, in ms.
Returns
-------
bSQI : float
Performance of both detectors.
"""
if detector_1 is None or detector_2 is None:
raise TypeError("Input Error, check detectors outputs")
search_window = int(search_window / 1000 * fs)
both = 0
for i in detector_1:
for j in range(max([0, i - search_window]), i + search_window):
if j in detector_2:
both += 1
break
if mode == "simple":
return (both / len(detector_1)) * 100
elif mode == "matching":
return (2 * both) / (len(detector_1) + len(detector_2))
elif mode == "n_double":
return both / (len(detector_1) + len(detector_2) - both)
[docs]def sSQI(signal):
"""Return the skewness of the signal
Parameters
----------
signal : array
ECG signal.
Returns
-------
skewness : float
Skewness value.
"""
if signal is None:
raise TypeError("Please specify an input signal")
return stats.skew(signal)
[docs]def kSQI(signal, fisher=True):
"""Return the kurtosis of the signal
Parameters
----------
signal : array
ECG signal.
fisher : bool, optional
If True,Fisher’s definition is used (normal ==> 0.0). If False, Pearson’s definition is used (normal ==> 3.0).
Returns
-------
kurtosis : float
Kurtosis value.
"""
if signal is None:
raise TypeError("Please specify an input signal")
return stats.kurtosis(signal, fisher=fisher)
[docs]def pSQI(signal, f_thr=0.01):
"""Return the flatline percentage of the signal
Parameters
----------
signal : array
ECG signal.
f_thr : float, optional
Flatline threshold, in mV / sample
Returns
-------
flatline_percentage : float
Percentage of signal where the absolute value of the derivative is lower then the threshold.
"""
if signal is None:
raise TypeError("Please specify an input signal")
diff = np.diff(signal)
length = len(diff)
flatline = np.where(abs(diff) < f_thr)[0]
return (len(flatline) / length) * 100
[docs]def fSQI(
ecg_signal,
fs=1000.0,
nseg=1024,
num_spectrum=[5, 20],
dem_spectrum=None,
mode="simple",
):
"""Returns the ration between two frequency power bands.
Parameters
----------
ecg_signal : array
ECG signal.
fs : float, optional
ECG sampling frequency, in Hz.
nseg : int, optional
Frequency axis resolution.
num_spectrum : array, optional
Frequency bandwidth for the ratio's numerator, in Hz.
dem_spectrum : array, optional
Frequency bandwidth for the ratio's denominator, in Hz. If None, then the whole spectrum is used.
mode : str, optional
If 'simple' just do the ration, if is 'bas', then do 1 - num_power.
Returns
-------
Ratio : float
Ratio between two powerbands.
"""
def power_in_range(f_range, f, Pxx_den):
_indexes = np.where((f >= f_range[0]) & (f <= f_range[1]))[0]
_power = integrate.trapz(Pxx_den[_indexes], f[_indexes])
return _power
if ecg_signal is None:
raise TypeError("Please specify an input signal")
f, Pxx_den = ss.welch(ecg_signal, fs, nperseg=nseg)
num_power = power_in_range(num_spectrum, f, Pxx_den)
if dem_spectrum is None:
dem_power = power_in_range([0, float(fs / 2.0)], f, Pxx_den)
else:
dem_power = power_in_range(dem_spectrum, f, Pxx_den)
if mode == "simple":
return num_power / dem_power
elif mode == "bas":
return 1 - num_power / dem_power
[docs]def ZZ2018(
signal, detector_1, detector_2, fs=1000, search_window=100, nseg=1024, mode="simple"
):
import numpy as np
""" Signal quality estimator. Designed for signal with a lenght of 10 seconds.
Follows the approach by Zhao *et la.* [Zhao18]_.
Parameters
----------
signal : array
Input ECG signal in mV.
detector_1 : array
Input of the first R peak detector.
detector_2 : array
Input of the second R peak detector.
fs : int, float, optional
Sampling frequency (Hz).
search_window : int, optional
Search window around each peak, in ms.
nseg : int, optional
Frequency axis resolution.
mode : str, optional
If 'simple', simple heurisitc. If 'fuzzy', employ a fuzzy classifier.
Returns
-------
noise : str
Quality classification.
References
----------
.. [Zhao18] Zhao, Z., & Zhang, Y. (2018).
SQI quality evaluation mechanism of single-lead ECG signal based on simple heuristic fusion and fuzzy comprehensive evaluation.
Frontiers in Physiology, 9, 727.
"""
if len(detector_1) == 0 or len(detector_2) == 0:
return "Unacceptable"
## compute indexes
qsqi = bSQI(
detector_1, detector_2, fs=fs, mode="matching", search_window=search_window
)
psqi = fSQI(signal, fs=fs, nseg=nseg, num_spectrum=[5, 15], dem_spectrum=[5, 40])
ksqi = kSQI(signal)
bassqi = fSQI(
signal, fs=fs, nseg=nseg, num_spectrum=[0, 1], dem_spectrum=[0, 40], mode="bas"
)
if mode == "simple":
## First stage rules (0 = unqualified, 1 = suspicious, 2 = optimal)
## qSQI rules
if qsqi > 0.90:
qsqi_class = 2
elif qsqi < 0.60:
qsqi_class = 0
else:
qsqi_class = 1
## pSQI rules
import numpy as np
## Get the maximum bpm
if len(detector_1) > 1:
RR_max = 60000.0 / (1000.0 / fs * np.min(np.diff(detector_1)))
else:
RR_max = 1
if RR_max < 130:
l1, l2, l3 = 0.5, 0.8, 0.4
else:
l1, l2, l3 = 0.4, 0.7, 0.3
if psqi > l1 and psqi < l2:
pSQI_class = 2
elif psqi > l3 and psqi < l1:
pSQI_class = 1
else:
pSQI_class = 0
## kSQI rules
if ksqi > 5:
kSQI_class = 2
else:
kSQI_class = 0
## basSQI rules
if bassqi >= 0.95:
basSQI_class = 2
elif bassqi < 0.9:
basSQI_class = 0
else:
basSQI_class = 1
class_matrix = np.array([qsqi_class, pSQI_class, kSQI_class, basSQI_class])
n_optimal = len(np.where(class_matrix == 2)[0])
n_suspics = len(np.where(class_matrix == 1)[0])
n_unqualy = len(np.where(class_matrix == 0)[0])
if (
n_unqualy >= 3
or (n_unqualy == 2 and n_suspics >= 1)
or (n_unqualy == 1 and n_suspics == 3)
):
return "Unacceptable"
elif n_optimal >= 3 and n_unqualy == 0:
return "Excellent"
else:
return "Barely acceptable"
elif mode == "fuzzy":
# Transform qSQI range from [0, 1] to [0, 100]
qsqi = qsqi * 100.0
# UqH (Excellent)
if qsqi <= 80:
UqH = 0
elif qsqi >= 90:
UqH = qsqi / 100.0
else:
UqH = 1.0 / (1 + (1 / np.power(0.3 * (qsqi - 80), 2)))
# UqI (Barely acceptable)
UqI = 1.0 / (1 + np.power((qsqi - 75) / 7.5, 2))
# UqJ (unacceptable)
if qsqi <= 55:
UqJ = 1
else:
UqJ = 1.0 / (1 + np.power((qsqi - 55) / 5.0, 2))
# Get R1
R1 = np.array([UqH, UqI, UqJ])
# pSQI
# UpH
if psqi <= 0.25:
UpH = 0
elif psqi >= 0.35:
UpH = 1
else:
UpH = 0.1 * (psqi - 0.25)
# UpI
if psqi < 0.18:
UpI = 0
elif psqi >= 0.32:
UpI = 0
elif psqi >= 0.18 and psqi < 0.22:
UpI = 25 * (psqi - 0.18)
elif psqi >= 0.22 and psqi < 0.28:
UpI = 1
else:
UpI = 25 * (0.32 - psqi)
# UpJ
if psqi < 0.15:
UpJ = 1
elif psqi > 0.25:
UpJ = 0
else:
UpJ = 0.1 * (0.25 - psqi)
# Get R2
R2 = np.array([UpH, UpI, UpJ])
# kSQI
# Get R3
if ksqi > 5:
R3 = np.array([1, 0, 0])
else:
R3 = np.array([0, 0, 1])
# basSQI
# UbH
if bassqi <= 90:
UbH = 0
elif bassqi >= 95:
UbH = bassqi / 100.0
else:
UbH = 1.0 / (1 + (1 / np.power(0.8718 * (bassqi - 90), 2)))
# UbI
if bassqi <= 85:
UbI = 1
else:
UbI = 1.0 / (1 + np.power((bassqi - 85) / 5.0, 2))
# UbJ
UbJ = 1.0 / (1 + np.power((bassqi - 95) / 2.5, 2))
# R4
R4 = np.array([UbH, UbI, UbJ])
# evaluation matrix R
R = np.vstack([R1, R2, R3, R4])
# weight vector W
W = np.array([0.4, 0.4, 0.1, 0.1])
S = np.array(
[np.sum((R[:, 0] * W)), np.sum((R[:, 1] * W)), np.sum((R[:, 2] * W))]
)
# classify
V = np.sum(np.power(S, 2) * [1, 2, 3]) / np.sum(np.power(S, 2))
if V < 1.5:
return "Excellent"
elif V >= 2.40:
return "Unnacceptable"
else:
return "Barely acceptable"