-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpk_spectrum.py
204 lines (175 loc) · 6.83 KB
/
pk_spectrum.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
from enum import Enum
import numpy as np
from scipy.optimize import nnls
from openpyxl import load_workbook
Kw = 1e-14
F = 96485
LOG10 = np.log(10)
TOLERANCE = 1e-6
class TitrationModes(Enum):
VOLUMETRIC = 1
COULOMETRIC = 2
class pKSpectrum:
def __init__(self,
source_file,
mode: TitrationModes = TitrationModes.VOLUMETRIC):
self.source_file = source_file
self.mode = mode
self.sample_name = None
self.comment = None
self.date = None
self.time = None
self.sample_volume = None
self.alkaline_concentration = None
self.current = None
self.alkaline_volumes = []
self.times = []
self.ph_values = []
self.alpha_values = []
self.valid_points = 0
self.acid_peaks = []
self._load_data()
def _load_data(self):
"""
Loads data and makes a simple check
:return: None
"""
# Load workbook
wb = load_workbook(self.source_file)
ws = wb.active
# Get sample information
self.sample_name = ws['A1'].value
self.comment = ws['A2'].value
self.timestamp = ws['A3'].value
self.sample_volume = ws['A4'].value
if self.mode == TitrationModes.VOLUMETRIC:
self.alkaline_concentration = ws['A5'].value
else:
self.current = ws['A5'].value
# Get titration data
shift = 0
while True:
volume = ws[f'A{6 + shift}'].value
ph = ws[f'B{6 + shift}'].value
if self._check_number(volume) and self._check_number(ph):
self.alkaline_volumes.append(volume)
self.ph_values.append(ph)
shift += 1
else:
break
# Arrange titration data
swapped = False
while True:
for i in range(len(self.alkaline_volumes)-1):
if self.alkaline_volumes[i] > self.alkaline_volumes[i+1]:
swapped = True
self.alkaline_volumes[i], self.alkaline_volumes[i+1] = \
self.alkaline_volumes[i+1], self.alkaline_volumes[i]
self.ph_values[i], self.ph_values[i+1] = self.ph_values[i+1], self.ph_values[i]
if not swapped:
break
# Transform volume data to time if needed
if self.mode == TitrationModes.COULOMETRIC:
self.times = list(self.alkaline_volumes)
self.alkaline_volumes = []
# Check data validity
for i in range(len(self.alkaline_volumes)):
h = pow(10, -self.ph_values[i])
if self.mode == TitrationModes.VOLUMETRIC:
t = ((h - Kw / h) / self.sample_volume) * (self.alkaline_volumes[i] + self.sample_volume) + \
self.alkaline_concentration * self.alkaline_volumes[i] / self.sample_volume
else:
t = h - Kw / h + self.current * self.times[i] / F / self.sample_volume
if t >= 0:
self.alpha_values.append(t)
self.valid_points = i + 1
else:
break
def make_calculation(self, pk_start=0, pk_end=10, d_pk=0.05, integration_constant=True):
"""
Calculates pK spectrum
:param pk_start: Start pK value (float)
:param pk_end: End pK value (float)
:param d_pk: Delta pK value (float)
:param integration_constant: Use integration constant? (boolean)
:return: Peaks, calculation error
"""
# Check for the valid points
if self.valid_points < 7:
return None, np.nan
# Calculate constant step
pk_step = round((pk_end - pk_start) / d_pk) + 1
# Fill right part
if integration_constant:
shape_1 = pk_step + 2
else:
shape_1 = pk_step
right = np.zeros((self.valid_points, shape_1))
for i in range(self.valid_points):
for j in range(pk_step):
right[i, j] = d_pk / (1 + np.exp(LOG10 * (pk_start + d_pk * j - self.ph_values[i])))
# Add items for the constant calculation
if integration_constant:
right[:, -2] = 1
right[:, -1] = -1
# Solve equation
constants, residual = nnls(right, np.array(self.alpha_values))
# Remove constant from scope
if integration_constant:
constants = constants[:-2]
# Normalization
constants *= d_pk
# Truncate border artefacts
if constants[0] > TOLERANCE > constants[1]:
constants[0] = 0
if constants[-1] > TOLERANCE > constants[-2]:
constants[-1] = 0
sum_constants = constants.sum()
max_constant = constants.max(initial=0)
threshold = max_constant / 100
constants_relative = constants / sum_constants
# Peak calculation sequence
i = 0
while i < pk_step:
if constants[i] > threshold:
self.acid_peaks.append({'point_count': 0, 'concentration': 0, 'first_point': i})
while i < pk_step and constants[i] > threshold:
self.acid_peaks[-1]['point_count'] += 1
self.acid_peaks[-1]['concentration'] += constants[i]
i += 1
else:
i += 1
# Peaks exact position and height calculation
if len(self.acid_peaks) > 0:
for i in range(len(self.acid_peaks)):
t1 = 0
t2 = 0
peak = self.acid_peaks[i]
for j in range(peak['point_count']):
t1 += constants_relative[peak['first_point'] + j] * \
(pk_start + d_pk * (peak['first_point'] + j))
t2 += constants_relative[peak['first_point'] + j]
peak['mean'] = t1 / t2
for i in range(len(self.acid_peaks)):
peak = self.acid_peaks[i]
if peak['point_count'] > 0:
t1 = 0
t2 = 0
for j in range(peak['point_count']):
t1 += constants_relative[peak['first_point'] + j] * \
(pk_start + d_pk * (peak['first_point'] + j) - peak['mean']) ** 2
t2 += constants_relative[peak['first_point'] + j]
peak['interval'] = 1.96 * np.sqrt(t1 / t2) / np.sqrt(peak['point_count'])
else:
peak['interval'] = 0.
# Calculate error
error = np.sqrt(residual) / np.sqrt(pk_step - 1)
return self.acid_peaks, error
@staticmethod
def _check_number(a):
"""
Checks if argument is number
:param a: Value to check (any)
:return: Check result (boolean)
"""
return type(a) == int or type(a) == float