-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransformations.py
91 lines (65 loc) · 2.48 KB
/
transformations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import pywt
import numpy as np
def calc_baseline(signal):
"""
Calculate the baseline of signal.
Args:
signal (numpy 1d array): signal whose baseline should be calculated
Returns:
baseline (numpy 1d array with same size as signal): baseline of the signal
"""
ssds = np.zeros((3))
cur_lp = np.copy(signal)
iterations = 0
while True:
# Decompose 1 level
lp, hp = pywt.dwt(cur_lp, "db4")
# Shift and calculate the energy of detail/high pass coefficient
ssds = np.concatenate(([np.sum(hp ** 2)], ssds[:-1]))
# Check if we are in the local minimum of energy function of high-pass signal
if ssds[2] > ssds[1] and ssds[1] < ssds[0]:
break
cur_lp = lp[:]
iterations += 1
# Reconstruct the baseline from this level low pass signal up to the original length
baseline = cur_lp[:]
for _ in range(iterations):
baseline = pywt.idwt(baseline, np.zeros((len(baseline))), "db4")
return signal-baseline[: len(signal)]
def normalize(x):
x = (x - x.min()) / (x.max() - x.min())
return x
def powerline(signal):
wavelet_name = 'db4' # Wavelet name (you can choose a different one)
level = 4 # Decomposition level
coeffs = pywt.wavedec(signal, wavelet_name, level=level)
threshold = np.std(coeffs[-1]) * np.sqrt(2 * np.log(len(signal)))
coeffs[1:] = (pywt.threshold(c, threshold, mode='soft') for c in coeffs[1:])
denoised_ecg_signal = pywt.waverec(coeffs, wavelet_name)
return denoised_ecg_signal
def rms_transform(signal):
rms = np.zeros([len(signal[:, 0])])
tempp = np.zeros((12, len(signal[:, 0])))
for i in range(12):
tempp[i] = signal[:, i] ** 2
for i in range(len(signal[:, 0])):
rms[i] = np.sqrt(np.mean(tempp[:, i]))
return rms
def rms_transform2(signal):
rms = np.zeros([len(signal[:, 0])])
tempp = np.zeros((12, len(signal[:, 0])))
for i in range(12):
temp = normalize(calc_baseline(powerline(signal[:, i])))
tempp[i] = temp ** 2
for i in range(len(signal[:, 0])):
rms[i] = np.sqrt(np.mean(tempp[:, i]))
return rms
def all_transform(signal):
rms = np.zeros([len(signal[:, 0])])
tempp = np.zeros((12, len(signal[:, 0])))
for i in range(12):
tempp[i] = normalize(calc_baseline(powerline(signal[:, i]))) ** 2
for i in range(len(signal[:, 0])):
rms[i] = np.sqrt(np.mean(tempp[:, i]))
rms = normalize(rms)
return rms