Source code for biosppy.synthesizers.emg
# -*- coding: utf-8 -*-
"""
biosppy.synthesizers.emg
-------------------
This module provides methods to synthesize Electromyographic (EMG) signals.
:copyright: (c) 2015-2021 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.
"""
# Imports
import numbers
# 3rd party
import numpy as np
import warnings
import matplotlib.pyplot as plt
# local
from .. import plotting, utils
[docs]def synth_uniform(duration=10,
length=None,
sampling_rate=1000,
noise=0.01,
baseline = None,
burst_number=1,
burst_duration=1.0,
burst_location = None,
amplitude_mult = None,
random_state=None
): # Default values
"""Generates an artificial (synthetic) EMG signal of a given duration and sampling rate, with muscle activity bursts modeled
as an uniform distribution and background noise modeled as a zero-mean Gaussian process with adjustable standard deviation.
Follows the approach by Diong, Joanna [ModelEMG1], but, additionally, this function also allows to manually choose the muscle
activity burst locations, and add amplitude multipliers for each burst.
If the parameters introduced lead to superimposed burst locations, an error will be raised, and if they lead to consecutive
bursts, a warning will be raised. Warnings will also be raised, if the precision derived from the selected sampling rate is not
compatible with each burst's location or duration, or the duration of quiet periods.
Parameters
----------
duration : int, optional
Desired recording length in seconds.
sampling_rate : int, optional
The desired sampling rate (in Hz, i.e., samples/second).
length : int, optional
The desired length of the signal (in samples).
noise : float, optional
Noise level (standard deviation of the gaussian distribution from which values are sampled).
baseline : float, optional
Signal offset from zero. If no value is given, it is assumed that the baseline has already been removed.
burst_number : int, optional
Desired number of bursts of activity (active muscle periods).
burst_duration : float or list, optional
Duration of the bursts. Can be a float (each burst will have the same duration) or a list of durations for
each burst.
burst_location : list, optional
Location of the bursts (in seconds).
amplitude_mult : float or list, optional
Amplitude multiplier for the bursts. Can be a float (each burst will have the same amplitude range) or a
list of multipliers for each bursts.
random_state : None, int, numpy.random.RandomState or numpy.random.Generator, optional
Seed for the random number generator.
Returns
----------
emg : array
Vector containing the EMG signal.
t : array
Time values accoring to the provided sampling rate.
params : dict
Input parameters of the function, clean EMG and noise signals, and SNR
Examples
----------
sampling_rate = 1000
duration = 10
noise_amplitude = 0.05
bursts = 7
burst_duration = [0.5,1,0.5,0.6,1,0.5,0.5]
burst_location = [0.1,2.5,4,5.5,7,8.5,9.4]
amplitude_mult = [1,1,0.5,1.5,1,0.75,1]
emg_synth, t, params = synth_uniform(duration=duration, sampling_rate=sampling_rate, noise=noise_amplitude,
burst_number=bursts, burst_duration=burst_duration, burst_location=burst_location,
amplitude_mult=amplitude_mult)
# Get muscle activity state
activity = params["activity"]
plt.plot(t,emg_synth,label="EMG")
plt.plot(t,activity,label="Muscle activity")
plt.xlabel("Time (s)")
plt.ylabel("Amplitude (mV)")
plt.grid()
plt.title("EMG")
plt.legend()
plt.show()
References
-----------
.. [ModelEMG1] Joanna DIONG,
"PYTHON: ANALYSING EMG SIGNALS",
https://scientificallysound.org/2016/08/11/python-analysing-emg-signals-part-1/
"""
# Seed the random generator for reproducible results
# If seed is an integer, use the legacy RandomState class
if isinstance(random_state, numbers.Integral):
rng = np.random.RandomState(random_state)
# If seed is already a random number generator class return it as it is
elif isinstance(random_state, (np.random.RandomState, np.random.Generator)):
rng = random_state
# If seed is something else, use the new Generator class
else:
rng = np.random.default_rng(random_state)
if not isinstance(sampling_rate, (int, float)):
raise TypeError("Error! 'sampling_rate' value must be an integer or float.")
if sampling_rate <= 0:
raise ValueError("Error! 'sampling_rate' value must be positive.")
if sampling_rate < 100:
warnings.warn("Sampling rate values below 100 Hz might lead to signals where onsets cannot be detected properly.")
if sampling_rate < 1000:
warnings.warn("Sampling rate values below 1000 Hz will not allow to effectively capture all significant frequency components of the EMG signal.")
if not isinstance(burst_number, int):
raise TypeError("Error! 'burst_number' value must be an integer.")
if burst_number <= 0:
raise ValueError("Error! 'burst_number' value must be positive.")
if not isinstance(noise, (int, float)):
raise TypeError("Error! 'noise' value must be an integer or float.")
if noise < 0:
raise ValueError("Error! 'noise' value must be non-negative.")
if not isinstance(duration, (int, float)):
raise TypeError("Error! 'duration' value must be an integer or float.")
if duration <= 0:
raise ValueError("Error! 'duration' value must be positive.")
# Generate number of samples automatically if length is unspecified
if length is None:
length = duration * sampling_rate + 1
else:
if not isinstance(length, (int, float)):
raise TypeError("Error! 'length' value must be an integer or float.")
if length != duration * sampling_rate + 1:
raise ValueError("Error! Signal length does not match duration with the given sampling rate.")
if baseline is None:
baseline = 0
if not isinstance(baseline, (int, float)):
raise TypeError("Error! Baseline value must be an integer or float.")
if isinstance(burst_duration, (int, float)):
burst_duration = np.repeat(burst_duration, burst_number)
if not all((np.round((dur)/(1/sampling_rate),10)).is_integer() for dur in burst_duration):
warnings.warn("The precision derived from the selected sampling rate is not compatible with the burst duration. This might alter or compromise synthesization of the signals, increasing the sampling rate is recommended.")
if len(burst_duration) != burst_number:
raise ValueError("Error! 'burst_duration' cannot be longer than the value of 'burst_number'.")
total_duration_bursts = np.sum(burst_duration)
if total_duration_bursts > duration:
raise ValueError("Error! The total duration of bursts cannot exceed the total duration of the signal.")
# Number of quiet periods (in between bursts)
n_quiet = burst_number + 1
quiet_duration = np.repeat((duration - total_duration_bursts) / n_quiet, n_quiet)
if burst_location is not None:
if not (isinstance(burst_location, list) and all((isinstance(loc, int) or isinstance(loc, float)) for loc in burst_location)):
raise TypeError("Error! 'burst_location' must be a list of integers or floats.")
if len(burst_location) != burst_number:
raise ValueError("Error! 'burst_location' list cannot be longer than the value of 'burst_number'.")
if not all(np.round(((loc)/(1/sampling_rate)),10).is_integer() for loc in burst_location):
warnings.warn("The precision derived from the selected sampling rate is not compatible with the burst locations. This might alter or compromise synthesization of the signals, increasing the sampling rate or altering the burst location/duration is recommended.")
burst_location.sort()
consecutive_bursts = False
for i in range(len(burst_location)):
loc = burst_location[i]
dur = burst_duration[i]
if loc < 0 or loc + dur > duration:
raise ValueError("Error! Burst location times must be numbers greater than 0, and summed with their duration cannot be greater than the signal duration.")
else:
if i == 0: # First burst
quiet_duration[i] = loc
if burst_number == 1:
quiet_duration[i+1] = duration - (loc + dur)
else:
next_loc = burst_location[i+1]
if loc + dur >= next_loc:
raise ValueError("Error! Burst location times overlap.")
elif loc + dur == next_loc - 1/sampling_rate:
consecutive_bursts = True
elif i < burst_number - 1: # Intermediate burst
next_loc = burst_location[i+1]
if loc + dur >= next_loc:
raise ValueError("Error! Burst location times overlap.")
elif loc + dur == next_loc - 1/sampling_rate:
consecutive_bursts = True
prev_loc = burst_location[i-1]
prev_dur = burst_duration[i-1]
quiet_duration[i] = loc - (prev_loc + prev_dur)
else: # Final burst
prev_loc = burst_location[i-1]
prev_dur = burst_duration[i-1]
quiet_duration[i] = loc - (prev_loc + prev_dur)
quiet_duration[i+1] = duration - (loc + dur)
if consecutive_bursts:
warnings.warn("The defined burst locations and respective duration of each have lead to consecutive muscle activity bursts. This might hinder the distinction between bursts.")
else:
if not all(np.round((dur)/(1/sampling_rate),10).is_integer() for dur in quiet_duration):
warnings.warn("The precision derived from the selected sampling rate is not compatible with the duration of quiet periods between evenly distributed bursts. This might alter (unequal duration of quiet periods) or compromise (impossible to separate bursts in the time domain) synthesization of the signals.")
total_duration = total_duration_bursts + np.sum(quiet_duration)
if np.round(total_duration,10) != duration:
raise ValueError("Error! The total duration of bursts and quiet periods does not match the total duration of the signal.")
if amplitude_mult is None:
amplitude_mult = 1
if isinstance(amplitude_mult, (int, float)):
amplitude_mult = [amplitude_mult] * burst_number
if not ((isinstance(amplitude_mult, list))and all((isinstance(amp, int) or isinstance(amp, float)) for amp in amplitude_mult)):
raise TypeError("Error! 'amplitude_mult' must be an integer or float, or a list of this type.")
if len(amplitude_mult) != burst_number:
raise ValueError("Error! 'amplitude_mult' cannot be longer than the value of 'burst_number'.")
# Generate bursts
bursts = []
curr_samples = 0
for burst in range(burst_number):
size = int(sampling_rate * burst_duration[burst]+1)
bursts += [list(rng.uniform(-1*amplitude_mult[burst], 1*amplitude_mult[burst], size=size) + baseline)]
curr_samples += size
# Generate quiet periods
quiets = []
for quiet in range(n_quiet):
if quiet == 0:
size = int(round(sampling_rate * quiet_duration[quiet],0))
elif quiet != n_quiet - 1:
size = int(round(sampling_rate * quiet_duration[quiet]-1,0))
else:
size = int(round(sampling_rate * quiet_duration[quiet]-1,0))
# Guarantee signal length is respected (could be more or less samples due to rounding off errors, when using a sampling
# rate where the derived precision is not compatible with burst locations/duration or the duration of quiet periods):
if length != curr_samples+size:
size = length-curr_samples
curr_samples += size
quiets += [list(np.zeros((size,)) + baseline)]
# Merge muscle activity bursts and quiet periods
emg = []
activity = []
for i in range(len(quiets)):
emg += quiets[i]
activity += [list(np.zeros((len(quiets[i]),)))]
if i < len(bursts):
emg += bursts[i]
activity += [list(np.ones((len(bursts[i]),)))]
activity = np.concatenate(activity)
emg = np.array(emg)
# Add random (gaussian distributed) noise
noise_signal = rng.normal(0, noise, length)
max_noise = max(noise_signal)
if any(max_noise > amplitude_mult):
warnings.warn("Maximum burst amplitude is smaller than the maximum level of noise.")
SNR = 20*np.log10(np.sqrt(np.mean(emg**2))/np.sqrt(np.mean(noise_signal**2)))
emg_synth = emg + noise_signal
t = np.arange(0,duration+1/sampling_rate,1/sampling_rate)
params = {"duration": duration,
"length": length,
"sampling_rate": sampling_rate,
"noise": noise,
"baseline": baseline,
"burst_number": burst_number,
"burst_duration": burst_duration,
"burst_location": burst_location,
"amplitude_mult": amplitude_mult,
"random_state": rng,
"clean_signal": emg,
"noise_signal": noise_signal,
"SNR": SNR,
"activity": activity}
args = (emg_synth, t, params)
names = ("emg", "t", "params")
return utils.ReturnTuple(args, names)
def _truncated_gaussian_window(sigma,
alpha,
sampling_rate):
"""Generates a truncated Gaussian window with duration (in seconds) given by 2*sigma*alpha
Follows the approach by Ghislieri, Cerone, Knaflitz and Agostini [ModelEMG2].
If the parameters introduced don't make sense in this context, an error will raise.
Parameters
----------
sigma : float
Standard deviation of the truncated Gaussian process with zero mean.
alpha : float
Multiplier that multiplied with 'sigma' defines the time support of the truncated Gaussian process.
sampling_rate : int
The desired sampling rate (in Hz, i.e., samples/second).
Returns
-------
win : array
The truncated Gaussian window.
References
-----------
.. [ModelEMG] Marco GHISLIERI, Giacinto Luigi CERONE, Marco KNAFLITZ & Valentina AGOSTINI
"LONG SHORT-TERM MEMORY (LSTM) RECURRENT NEURAL NETWORK FOR MUSCLE ACTIVITY DETECTION"
Journal of NeuroEngineering and Rehabilitation, Vol. 18, No. 1, 2021, 3–4
"""
if not isinstance(sigma, (int, float)):
raise TypeError("Error! 'sigma' value must be an integer or float.")
if sigma <= 0:
raise ValueError("Error! 'sigma' value must be positive.")
if not isinstance(alpha, (int, float)):
raise TypeError("Error! 'alpha' value must be an integer or float.")
if alpha <= 0:
raise ValueError("Error! 'alpha' value must be positive.")
dur = 2*sigma*alpha
x = np.arange(np.round(-dur/2,5), np.round(dur/2+1/sampling_rate,5), 1/sampling_rate)
win = np.exp(-(x**2)/(2*sigma**2))
return win
[docs]def synth_gaussian(duration=10,
sampling_rate=1000,
length=None,
SNR=30,
sigma=0.1,
alpha=2.5,
baseline=None,
burst_number=1,
burst_location=None,
random_state=None):
"""Generates an artificial (synthetic) EMG signal of a given duration and sampling rate.
Follows the approach by by Ghislieri, Cerone, Knaflitz and Agostini [ModelEMG2], where muscle activity bursts are modeled as a
zero-mean Gaussian process with standard deviation equal to 10**(SNR/20) mV, and the background noise is modeled as a zero-mean
Gaussian process with standard deviation equal to 1 mV. All muscle activity bursts have the same duration, which is equal to
2*sigma*alpha (in seconds).
If the parameters introduced lead to superimposed burst locations, an error will be raised, and if they lead to consecutive
bursts, a warning will raise. Warnings will also be raised, if the precision derived from the selected sampling rate is not
compatible with each burst's location or duration, or the duration of quiet periods.
Parameters
----------
duration : int, optional
Desired recording length in seconds.
sampling_rate : int, optional
The desired sampling rate (in Hz, i.e., samples/second).
length : int, optional, optional
The desired length of the signal (in samples).
SNR : float, optional
Desired signal-to-noise ratio of the signal (in dB).
sigma : float, optional
Standard deviation of the truncated Gaussian process with zero mean used to simulate muscle activity.
alpha : float, optional
Multiplier that multiplied with the 'sigma' value defines the time support of the truncated Gaussian process, which
will be the duration of the bursts (burst_duration = 2*sigma*alpha, with default values, this duration is 0.5s).
baseline : float, optional
Signal offset from zero. If no value is given, it is assumed that the baseline has already been removed.
burst_number : int, optional
Desired number of bursts of activity (active muscle periods).
burst_location : list, optional
Location of the bursts (in seconds).
random_state : None, int, numpy.random.RandomState or numpy.random.Generator
Seed for the random number generator.
Returns
----------
emg : array
Vector containing the EMG signal.
t : array
Time values accoring to the provided sampling rate.
params : dict
Input parameters of the function, clean EMG and noise signals, and SNR
Examples
----------
sampling_rate = 1000
duration = 10
SNR = 30
sigma = 0.2
alpha = 1.25
bursts = 4
burst_location = [2,4,6,8]
output = synth_gaussian(duration=duration, sampling_rate=sampling_rate, SNR=SNR,
sigma=sigma, alpha=alpha, burst_number=bursts, burst_location=burst_location,
random_state=0)
emg_synth, t, params = output["emg"], output["t"], output["params"]
# Get muscle activity state
activity = params["activity"]
plt.figure()
plt.plot(t,emg_synth,label="EMG")
plt.plot(t,activity,label="Muscle activity")
plt.xlabel("Time (s)")
plt.ylabel("Amplitude (mV)")
plt.grid()
plt.title("EMG")
plt.legend()
plt.show()
References
-----------
.. [ModelEMG2] Marco GHISLIERI, Giacinto Luigi CERONE, Marco KNAFLITZ & Valentina AGOSTINI
"LONG SHORT-TERM MEMORY (LSTM) RECURRENT NEURAL NETWORK FOR MUSCLE ACTIVITY DETECTION"
Journal of NeuroEngineering and Rehabilitation, Vol. 18, No. 1, 2021, 3–4
"""
# Seed the random generator for reproducible results
# If seed is an integer, use the legacy RandomState class
if isinstance(random_state, numbers.Integral):
rng = np.random.RandomState(random_state)
# If seed is already a random number generator class return it as it is
elif isinstance(random_state, (np.random.Generator, np.random.RandomState)):
rng = random_state
# If seed is something else, use the new Generator class
else:
rng = np.random.default_rng(random_state)
if not isinstance(sampling_rate, (int, float)):
raise TypeError("Error! 'sampling_rate' value must be an integer or float.")
if sampling_rate <= 0:
raise ValueError("Error! 'sampling_rate' value must be positive.")
if sampling_rate < 100:
warnings.warn("Sampling rate values below 100 Hz might lead to signals where onsets cannot be detected properly.")
if sampling_rate < 1000:
warnings.warn("Sampling rate values below 1000 Hz will not allow to effectively capture all significant frequency components of the EMG signal.")
if not isinstance(burst_number, int):
raise TypeError("Error! 'burst_number' value must be an integer.")
if burst_number <= 0:
raise ValueError("Error! 'burst_number' value must be positive.")
if not isinstance(SNR, (int, float)):
raise TypeError("Error! 'SNR' value must be an integer or float.")
if not isinstance(duration, (int, float)):
raise TypeError("Error! 'duration' value must be an integer or float.")
if duration <= 0:
raise ValueError("Error! 'duration' value must be positive.")
# Generate number of samples automatically if length is unspecified
if length is None:
length = int(duration * sampling_rate + 1)
else:
if not isinstance(length, (int, float)):
raise TypeError("Error! 'length' value must be an integer or float.")
if length != duration * sampling_rate + 1:
raise ValueError("Error! Signal length does not match duration with the given sampling rate.")
# Checking for invalid inputs
if baseline is None:
baseline = 0
if not isinstance(baseline, (int, float)):
raise TypeError("Error! Baseline value must be an integer or float.")
if not isinstance(sigma, (int, float)):
raise TypeError("Error! 'sigma' value must be an integer or float.")
if sigma <= 0:
raise ValueError("Error! 'sigma' value must be positive.")
if not isinstance(alpha, (int, float)):
raise TypeError("Error! 'alpha' value must be an integer or float.")
if alpha <= 0:
raise ValueError("Error! 'alpha' value must be positive.")
burst_duration = 2*alpha*sigma
if not (np.round((burst_duration)/(1/sampling_rate),10)).is_integer():
warnings.warn("The precision derived from the selected sampling rate is not compatible with the burst duration. This might alter or compromise synthesization of the signals, increasing the sampling rate is recommended.")
if isinstance(burst_duration, (int, float)):
burst_duration = np.repeat(burst_duration, burst_number)
if len(burst_duration) != burst_number:
raise ValueError("Error! 'burst_duration' list cannot be longer than the value of 'burst_number'.")
total_duration_bursts = np.sum(burst_duration)
if total_duration_bursts > duration:
raise ValueError("Error! The total duration of bursts cannot exceed the total duration of the signal.")
# Number of quiet periods (in between bursts)
n_quiet = burst_number + 1
quiet_duration = np.repeat((duration - total_duration_bursts) / n_quiet, n_quiet)
if burst_location is not None:
if not (isinstance(burst_location, list) and all((isinstance(loc, int) or isinstance(loc, float)) for loc in burst_location)):
raise TypeError("Error! 'burst_location' must be a list of integers or floats.")
if len(burst_location) != burst_number:
raise ValueError("Error! 'burst_location' list cannot be longer than the value of 'burst_number'.")
if not all(np.round(((loc)/(1/sampling_rate)),10).is_integer() for loc in burst_location):
warnings.warn("The precision derived from the selected sampling rate is not compatible with the burst locations. This might alter or compromise synthesization of the signals, increasing the sampling rate or altering the burst location/duration is recommended.")
burst_location.sort()
consecutive_bursts = False
for i in range(len(burst_location)):
loc = burst_location[i]
dur = burst_duration[i]
if loc < 0 or loc + dur > duration:
raise ValueError("Error! Burst location times must be numbers greater than 0, and summed with their duration cannot be greater than the signal duration.")
else:
if i == 0: # First burst
quiet_duration[i] = loc
if burst_number == 1:
quiet_duration[i+1] = duration - (loc + dur)
else:
next_loc = burst_location[i+1]
if loc + dur >= next_loc:
raise ValueError("Error! Burst location times overlap.")
elif loc + dur == next_loc - 1/sampling_rate:
consecutive_bursts = True
elif i < burst_number - 1: # Intermediate burst
next_loc = burst_location[i+1]
if loc + dur >= next_loc:
raise ValueError("Error! Burst location times overlap.")
elif loc + dur == next_loc - 1/sampling_rate:
consecutive_bursts = True
prev_loc = burst_location[i-1]
prev_dur = burst_duration[i-1]
quiet_duration[i] = loc - (prev_loc + prev_dur)
else: # Final burst
prev_loc = burst_location[i-1]
prev_dur = burst_duration[i-1]
quiet_duration[i] = loc - (prev_loc + prev_dur)
quiet_duration[i+1] = duration - (loc + dur)
if consecutive_bursts:
warnings.warn("The defined burst locations and respective duration of each have lead to consecutive muscle activity bursts. This might hinder the distinction between bursts.")
else:
if not all(np.round((dur)/(1/sampling_rate),10).is_integer() for dur in quiet_duration):
warnings.warn("The precision derived from the selected sampling rate is not compatible with the duration of quiet periods between evenly distributed bursts. This might alter (unequal duration of quiet periods) or compromise (impossible to separate bursts in the time domain) synthesization of the signals.")
total_duration = total_duration_bursts + np.sum(quiet_duration)
if np.round(total_duration,10) != duration:
raise ValueError("Error! The total duration of bursts and quiet periods does not match the total duration of the signal.")
# Generate muscle activity bursts
bursts = []
curr_samples = 0
sigma_burst = 10 ** (SNR/20)
for burst in range(burst_number):
size = int(round(sampling_rate * burst_duration[burst]+1,0))
signal = rng.normal(0, sigma_burst, size)
truncated_gaussian = _truncated_gaussian_window(sigma,alpha,sampling_rate)
bursts += [list(signal*truncated_gaussian + baseline)]
curr_samples += size
# Generate quiet periods
quiets = []
for quiet in range(n_quiet):
if quiet == 0:
size = int(round(sampling_rate * quiet_duration[quiet],0))
elif quiet != n_quiet - 1:
size = int(round(sampling_rate * quiet_duration[quiet]-1,0))
else:
size = int(round(sampling_rate * quiet_duration[quiet]-1,0))
# Guarantee signal length is respected (could be more or less samples due to rounding off errors, when using a sampling
# rate where the derived precision is not compatible with burst locations/duration or the duration of quiet periods):
if length != curr_samples+size:
size = length-curr_samples
curr_samples += size
quiets += [list(np.zeros((size,)) + baseline)]
# Merge muscle activity bursts and quiet periods
emg = []
activity = []
for i in range(len(quiets)):
emg += quiets[i]
activity += [list(np.zeros((len(quiets[i]),)))]
if i < len(bursts):
emg += bursts[i]
activity += [list(np.ones((len(bursts[i]),)))]
activity = np.concatenate(activity)
emg = np.array(emg)
# Add random (gaussian distributed) noise
noise_signal = rng.normal(0, 1, length)
emg_synth = emg + noise_signal
t = np.arange(0,duration+1/sampling_rate,1/sampling_rate)
params = {"duration": duration,
"length": length,
"sampling_rate": sampling_rate,
"SNR": SNR,
"sigma": sigma,
"alpha": alpha,
"baseline": baseline,
"burst_number": burst_number,
"burst_location": burst_location,
"random_state": rng,
"clean_signal": emg,
"noise_signal": noise_signal,
"activity": activity}
args = (emg_synth, t, params)
names = ("emg", "t", "params")
return utils.ReturnTuple(args, names)