All checks were successful
Build AppImage / Explore-Gitea-Actions (push) Successful in 1m47s
closes #231
367 lines
13 KiB
Python
367 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
import re
|
|
from collections import namedtuple
|
|
|
|
import numpy as np
|
|
from scipy.stats import linregress
|
|
|
|
try:
|
|
from scipy.integrate import simpson
|
|
except ImportError:
|
|
from scipy.integrate import simps as simpson
|
|
from scipy.interpolate import CubicSpline
|
|
|
|
|
|
ReferenceValue = namedtuple('Reference', ['name', 'transitions'])
|
|
Cyclohexane = ReferenceValue('Cyclohexane', [(-87.06+273.15, 79.58), (6.54+273.15, None)])
|
|
|
|
|
|
class DSCSample:
|
|
NUMBER_RE = re.compile(r'(-?\d+(?:\.\d*)?)')
|
|
DSC_RUN_RE = re.compile(r'(\d+)\) DSC')
|
|
DSC_END_RE = re.compile(r'DSC8[50]00 MANUAL TUNE')
|
|
|
|
def __init__(self, fname: str | Path):
|
|
fname = Path(fname)
|
|
if fname.exists():
|
|
self.fname = fname
|
|
else:
|
|
raise IOError(f'{fname} does not exist')
|
|
|
|
self.weight = None
|
|
self.steps = []
|
|
self.starts = []
|
|
|
|
self.read_file(fname)
|
|
|
|
def read_file(self, fname: str | Path) -> None:
|
|
fname = Path(fname)
|
|
|
|
# file contains weird deg C character in stupid ISO encoding
|
|
with fname.open('r', encoding='iso-8859-15') as f:
|
|
ii = 1
|
|
for line in f:
|
|
# iterate over file and look for different steps and start/end positions
|
|
if 'Heat from' in line:
|
|
match = DSCSample.NUMBER_RE.findall(line)
|
|
self.steps.append(('h', float(match[3]), float(match[1])+273.15, float(match[2])+273.15))
|
|
elif 'Cool from' in line:
|
|
match = DSCSample.NUMBER_RE.findall(line)
|
|
self.steps.append(('c', float(match[3]), float(match[1])+273.15, float(match[2])+273.15))
|
|
elif 'Hold for' in line:
|
|
match = DSCSample.NUMBER_RE.findall(line)
|
|
self.steps.append(('i', float(match[2])+273.15, float(match[1])*60))
|
|
|
|
# start of measurement step
|
|
elif DSCSample.DSC_RUN_RE.match(line):
|
|
match = DSCSample.DSC_RUN_RE.match(line)
|
|
r = int(match.group(1))
|
|
if r == 1:
|
|
# account for table header
|
|
self.starts.append(ii + 2)
|
|
else:
|
|
self.starts.append(ii)
|
|
|
|
elif 'Display Weight' in line:
|
|
self.weight = float(DSCSample.NUMBER_RE.search(line).group()) * 1e-3
|
|
|
|
# end of measurements
|
|
elif DSCSample.DSC_END_RE.search(line) is not None:
|
|
end_line = ii - 2
|
|
|
|
ii += 1
|
|
# there are weird empty lines in the beginning that mess up the line count
|
|
if r'\r\r\n' in repr(line):
|
|
ii += 1
|
|
|
|
self.starts.append(end_line)
|
|
|
|
@property
|
|
def cooling(self) -> dict:
|
|
ret_val = {}
|
|
for i, step in enumerate(self.steps):
|
|
if step[0] == 'c':
|
|
ret_val[i] = step[1]
|
|
|
|
return ret_val
|
|
|
|
@property
|
|
def heating(self) -> dict:
|
|
ret_val = {}
|
|
for i, step in enumerate(self.steps):
|
|
if step[0] == 'h':
|
|
ret_val[i] = step[1]
|
|
|
|
return ret_val
|
|
|
|
def get_run(self, rate: float, mode: str = 'h') -> int:
|
|
r = None
|
|
|
|
for k, v in enumerate(self.steps):
|
|
if (v[0] == mode) and (v[1] == rate):
|
|
r = k
|
|
break
|
|
|
|
if r is None:
|
|
raise ValueError(f'Rate {rate} not found')
|
|
|
|
return r
|
|
|
|
def length(self, run: int) -> int:
|
|
return self.starts[run+1] - self.starts[run] - 2
|
|
|
|
def flow_data(self, run: int, length: int = None):
|
|
start = self.starts[run]
|
|
if length is None:
|
|
length = self.length(run)
|
|
|
|
with self.fname.open('r', encoding='iso-8859-15') as f:
|
|
raw_data = np.genfromtxt(f, skip_header=start, max_rows=length-1, usecols=(4, 1, 0))
|
|
|
|
raw_data[:, 0] += 273.15
|
|
raw_data[:, 2] *= 60
|
|
|
|
return raw_data.T
|
|
|
|
def isotherm_data(self, run: int, length: int = None) -> np.ndarray:
|
|
start = self.starts[run]
|
|
if length is None:
|
|
length = self.length(run)
|
|
|
|
with self.fname.open('r', encoding='iso-8859-15') as f:
|
|
raw_data = np.genfromtxt(f, skip_header=start, max_rows=length-1, usecols=(0, 1))
|
|
|
|
raw_data[:, 0] *= 60
|
|
|
|
return raw_data.T
|
|
|
|
|
|
class DSCCalibrator:
|
|
def __init__(self):
|
|
self.sample = None
|
|
self.empty = None
|
|
self.reference = []
|
|
self.ref_list = []
|
|
|
|
def set_measurement(
|
|
self: DSCCalibrator,
|
|
fname: str | Path | DSCSample,
|
|
mode: str = 'sample',
|
|
reference: ReferenceValue = Cyclohexane
|
|
):
|
|
if mode not in ['sample', 'empty', 'reference']:
|
|
raise ValueError(f'Unknown mode {mode}, not "sample", "empty", "reference"')
|
|
if mode == 'reference' and not isinstance(reference, ReferenceValue):
|
|
raise ValueError(f'Reference measurement has no reference values')
|
|
|
|
if not isinstance(fname, DSCSample):
|
|
fname = DSCSample(fname)
|
|
|
|
if mode == 'sample':
|
|
self.sample = fname
|
|
elif mode == 'empty':
|
|
self.empty = fname
|
|
else:
|
|
self.reference.append(fname)
|
|
self.ref_list.append(reference)
|
|
|
|
return fname
|
|
|
|
def remove_reference(self, fname: str | Path):
|
|
fname = Path(fname)
|
|
i = None
|
|
for i, ref in enumerate(self.reference):
|
|
if ref.fname == fname:
|
|
self.reference.pop(i)
|
|
break
|
|
|
|
if i is not None:
|
|
self.ref_list.pop(i)
|
|
|
|
def get_calibration(self, rate: float):
|
|
real_tm = []
|
|
melts = []
|
|
calib_y = []
|
|
|
|
results = []
|
|
|
|
for data, ref_value in zip(self.reference, self.ref_list):
|
|
try:
|
|
idx = data.get_run(rate, mode='h')
|
|
except ValueError:
|
|
return None, None, []
|
|
|
|
measurement = data.flow_data(idx)
|
|
|
|
for trans_temp, enthalpy in ref_value.transitions:
|
|
real_tm.append(trans_temp)
|
|
t_low_lim, t_high_lim = trans_temp - 10, trans_temp + np.clip(rate, 20, 35)
|
|
|
|
low_border = np.argmin(np.abs(measurement[0] - t_low_lim))
|
|
high_border = np.argmin(np.abs(measurement[0] - t_high_lim))
|
|
ref_zoom = measurement[:, low_border:high_border]
|
|
|
|
if enthalpy is not None:
|
|
x_val = np.array([[ref_zoom[0, 0], 1], [ref_zoom[0, -1], 1]])
|
|
y_val = np.array([ref_zoom[1, 0], ref_zoom[1, -1]])
|
|
|
|
sol = np.linalg.solve(x_val, y_val)
|
|
ref_zoom[1] -= (ref_zoom[0] * sol[0] + sol[1])
|
|
|
|
ref_grad = np.gradient(ref_zoom[1])
|
|
crossing = np.where(np.diff(np.sign(np.abs(ref_grad)-0.001)))[0]
|
|
|
|
almost_flat = np.sort(crossing-np.argmax(ref_zoom[1]))
|
|
integ_limit = (almost_flat[np.where((almost_flat < -40))[0][-1]]+np.argmax(ref_zoom[1]),
|
|
almost_flat[np.where((almost_flat > 40))[0][0]]+np.argmax(ref_zoom[1]))
|
|
|
|
# subtract baseline around reference peak
|
|
sol = self.solve_linear_eq(integ_limit, ref_zoom)
|
|
ref_zoom[1] -= (ref_zoom[0] * sol[0] + sol[1])
|
|
|
|
# integrate over peak to calibrate y axis
|
|
# delta H in J/g: Integrate Peak over time and divide by weight
|
|
area = simpson(ref_zoom[1, integ_limit[0]:integ_limit[1]],
|
|
ref_zoom[2, integ_limit[0]:integ_limit[1]],
|
|
even='avg') * 1e-3
|
|
calib_y.append(enthalpy / (area / data.weight))
|
|
|
|
else:
|
|
ref_grad = np.gradient(ref_zoom[1])
|
|
res = linregress(ref_zoom[0, :len(ref_grad)//5], ref_zoom[1, :len(ref_grad)//5])
|
|
|
|
ref_zoom[1] -= (res.slope*ref_zoom[0] + res.intercept)
|
|
|
|
# calculate onset slope (use points at position of maximum gradient - 100/rate (+50/rate))
|
|
ref_grad = np.gradient(ref_zoom[1])
|
|
max_grad = np.argmax(ref_grad)
|
|
|
|
grad_pos = max_grad - max(1, int(160 / rate)), max_grad
|
|
|
|
sol = self.solve_linear_eq(grad_pos, ref_zoom)
|
|
onset = sol[0] * ref_zoom[0] + sol[1]
|
|
|
|
melts.append(-sol[1] / sol[0])
|
|
|
|
results.append([ref_zoom, onset, ref_zoom[:, grad_pos]])
|
|
|
|
if len(melts) > 1:
|
|
A = np.vstack([melts, np.ones(len(melts))]).T
|
|
y = np.array(real_tm)
|
|
|
|
calib_x = np.linalg.lstsq(A, y, rcond=None)[0]
|
|
else:
|
|
calib_x = [1, 0]
|
|
|
|
if calib_y:
|
|
calib_y = 1e-3 * np.mean(calib_y) * 60. / self.sample.weight / rate
|
|
else:
|
|
calib_y = 1
|
|
|
|
return calib_x, calib_y, results
|
|
|
|
@staticmethod
|
|
def solve_linear_eq(limits: tuple, ref_zoom: np.ndarray) -> np.ndarray:
|
|
x_val = np.array([[ref_zoom[0, limits[0]], 1], [ref_zoom[0, limits[1]], 1]])
|
|
y_val = np.array([ref_zoom[1, limits[0]], ref_zoom[1, limits[1]]])
|
|
sol = np.linalg.solve(x_val, y_val)
|
|
|
|
return sol
|
|
|
|
def get_data(
|
|
self: DSCCalibrator,
|
|
idx: int,
|
|
slope: str = 'iso',
|
|
limits: tuple[float, float] = None
|
|
) -> tuple[np.ndarray, np.ndarray, np.ndarray,np.ndarray | None, np.ndarray]:
|
|
if self.sample.steps[idx][0] == 'i':
|
|
raise ValueError('baseline correction is not implemented for isotherms')
|
|
|
|
if slope not in ['iso', 'curve', None]:
|
|
raise ValueError(f'Unknown argument for slope {slope}, not "iso" or "curve"')
|
|
|
|
sample_data = self.sample.flow_data(idx)
|
|
raw_sample = sample_data.copy()
|
|
empty_data = None
|
|
|
|
mode, rate, _, _ = self.sample.steps[idx]
|
|
|
|
empty_y = 0
|
|
idx_empty = 0
|
|
if self.empty is not None:
|
|
try:
|
|
idx_empty = self.empty.get_run(rate, mode=mode)
|
|
except ValueError:
|
|
raise ValueError(f'Empty measurement has no heat rate {rate} K/min')
|
|
|
|
empty_data = self.empty.flow_data(idx_empty)
|
|
|
|
empty_y = empty_data[1]
|
|
if self.sample.length(idx) != self.empty.length(idx_empty):
|
|
with np.errstate(all='ignore'):
|
|
empty_y = CubicSpline(empty_data[2]-empty_data[2, 0], empty_data[1], extrapolate=True)(sample_data[2] - sample_data[2, 0])
|
|
|
|
sample_data[1] -= empty_y
|
|
drift_value = sample_data.copy()[(2, 1), :]
|
|
sample_data = sample_data[:, np.isfinite(sample_data[1, :])]
|
|
|
|
mean_isotherms = []
|
|
for offset in [-1, 1]:
|
|
# read isotherms
|
|
isotherm_sample = self.sample.isotherm_data(idx+offset)
|
|
if self.empty is not None:
|
|
isotherm_empty = self.empty.isotherm_data(idx_empty+offset)
|
|
isotherm_sample[1] -= isotherm_empty[1, 200:-200].mean()
|
|
|
|
# get mean isotherm value
|
|
m = isotherm_sample[1, 200:-200].mean()
|
|
mean_isotherms.append(m)
|
|
|
|
if offset == -1:
|
|
drift_value = np.c_[isotherm_sample, drift_value]
|
|
else:
|
|
drift_value = np.c_[drift_value, isotherm_sample]
|
|
|
|
if slope is not None:
|
|
|
|
if slope == 'iso':
|
|
# calculate slope from difference between isotherms
|
|
m = (mean_isotherms[1] - mean_isotherms[0]) / (sample_data[2, -1] - sample_data[2, 0])
|
|
region = sample_data[1:, 200:201]
|
|
|
|
else:
|
|
# calculate mean slope of heat flow from points in the beginning
|
|
region = None
|
|
if limits is not None:
|
|
if len(limits) != 2:
|
|
raise ValueError(f'limits must be tuple of len 2, not {limits!r}')
|
|
min_lim, max_lim = min(limits), max(limits)
|
|
window = (sample_data[2, :] >= min_lim) * (sample_data[2, :] <= max_lim)
|
|
region = sample_data[1:, window]
|
|
|
|
if region.shape[1] <= 2:
|
|
# raise ValueError(f'No data inside selected time window {min_lim/60} min and {max_lim/60} min')
|
|
region = None
|
|
|
|
if region is None:
|
|
# if no limits, use all
|
|
region = sample_data[1:, 200:-200]
|
|
|
|
grad = np.gradient(region[0, :], region[1, :])
|
|
grad = grad[~np.isnan(grad)]
|
|
m = grad[(grad < grad.mean()+grad.std()/5)*(grad > grad.mean()-grad.std()/5)].mean()
|
|
|
|
offset = region[0, 0]
|
|
sample_data[1] -= m * (sample_data[2] - region[1, 0]) + offset
|
|
line = np.array([
|
|
[sample_data[2, 0], sample_data[2, -1]],
|
|
[m * (sample_data[2, 0] - region[1, 0]) + offset, m * (sample_data[2, -1] - region[1, 0]) + offset]
|
|
])
|
|
|
|
else:
|
|
line = np.array([[sample_data[2, 0], sample_data[2, -1]], [0, 0]])
|
|
|
|
return raw_sample, drift_value, sample_data, empty_data, line
|