forked from IPKM/nmreval
324 lines
12 KiB
Python
324 lines
12 KiB
Python
from pathlib import Path
|
|
import re
|
|
from collections import namedtuple
|
|
from typing import Union
|
|
|
|
import numpy as np
|
|
try:
|
|
from scipy.integrate import simpson
|
|
except ImportError:
|
|
from scipy.integrate import simps as simpson
|
|
from scipy.interpolate import interp1d
|
|
|
|
ReferenceValue = namedtuple('Reference', ['name', 'transitions'])
|
|
Cyclohexane = ReferenceValue('Cyclohexane', [(-87.06+273.15, 79.58), (6.54+273.15, None)])
|
|
|
|
|
|
class DSCSample:
|
|
NUMBER_RE = re.compile(r'(-?\d+(?:\.\d*)?)')
|
|
DSC_RUN_RE = re.compile(r'(\d+)\) DSC')
|
|
DSC_END_RE = re.compile(r'DSC8[50]00 MANUAL TUNE')
|
|
|
|
def __init__(self, fname: Union[str, Path]):
|
|
fname = Path(fname)
|
|
if fname.exists():
|
|
self.fname = fname
|
|
else:
|
|
raise IOError(f'{fname} does not exist')
|
|
|
|
self.weight = None
|
|
self.steps = []
|
|
self.starts = []
|
|
|
|
self.read_file(fname)
|
|
|
|
def read_file(self, fname: Union[str, Path]):
|
|
fname = Path(fname)
|
|
|
|
# file contains weird deg C character in stupiod ISO encoding
|
|
with fname.open('r', encoding='iso-8859-15') as f:
|
|
ii = 1
|
|
for line in f:
|
|
# iterate over file and look for different steps and start/end positions
|
|
if 'Heat from' in line:
|
|
match = DSCSample.NUMBER_RE.findall(line)
|
|
self.steps.append(('h', float(match[3]), float(match[1])+273.15, float(match[2])+273.15))
|
|
|
|
elif 'Cool from' in line:
|
|
match = DSCSample.NUMBER_RE.findall(line)
|
|
self.steps.append(('c', float(match[3]), float(match[1])+273.15, float(match[2])+273.15))
|
|
|
|
elif 'Hold for' in line:
|
|
match = DSCSample.NUMBER_RE.findall(line)
|
|
self.steps.append(('i', float(match[2])+273.15, float(match[1])*60))
|
|
|
|
# start of measurement step
|
|
elif DSCSample.DSC_RUN_RE.match(line):
|
|
match = DSCSample.DSC_RUN_RE.match(line)
|
|
r = int(match.group(1))
|
|
if r == 1:
|
|
# account for table header
|
|
self.starts.append(ii + 2)
|
|
else:
|
|
self.starts.append(ii)
|
|
|
|
elif 'Display Weight' in line:
|
|
self.weight = float(DSCSample.NUMBER_RE.search(line).group()) * 1e-3
|
|
|
|
# end of measurements
|
|
elif DSCSample.DSC_END_RE.search(line) is not None:
|
|
end_line = ii - 2
|
|
|
|
ii += 1
|
|
# there are weird empty lines in the beginning that mess up the line count
|
|
if r'\r\r\n' in repr(line):
|
|
ii += 1
|
|
|
|
self.starts.append(end_line)
|
|
|
|
@property
|
|
def cooling(self) -> dict:
|
|
ret_val = {}
|
|
for i, step in enumerate(self.steps):
|
|
if step[0] == 'c':
|
|
ret_val[i] = step[1]
|
|
|
|
return ret_val
|
|
|
|
@property
|
|
def heating(self) -> dict:
|
|
ret_val = {}
|
|
for i, step in enumerate(self.steps):
|
|
if step[0] == 'h':
|
|
ret_val[i] = step[1]
|
|
|
|
return ret_val
|
|
|
|
def get_run(self, rate: float, mode: str = 'h') -> int:
|
|
r = None
|
|
|
|
for k, v in enumerate(self.steps):
|
|
if (v[0] == mode) and (v[1] == rate):
|
|
r = k
|
|
break
|
|
|
|
if r is None:
|
|
raise ValueError(f'Rate {rate} not found')
|
|
|
|
return r
|
|
|
|
def length(self, run: int) -> int:
|
|
return self.starts[run+1] - self.starts[run] - 2
|
|
|
|
def flow_data(self, run: int, length: int = None):
|
|
start = self.starts[run]
|
|
if length is None:
|
|
length = self.length(run)
|
|
|
|
with self.fname.open('r', encoding='iso-8859-15') as f:
|
|
raw_data = np.genfromtxt(f, skip_header=start, max_rows=length-1, usecols=(4, 1, 0))
|
|
|
|
raw_data[:, 0] += 273.15
|
|
raw_data[:, 2] *= 60
|
|
|
|
return raw_data.T
|
|
|
|
def isotherm_data(self, run: int, length: int = None) -> np.ndarray:
|
|
start = self.starts[run]
|
|
if length is None:
|
|
length = self.length(run)
|
|
|
|
with self.fname.open('r', encoding='iso-8859-15') as f:
|
|
raw_data = np.genfromtxt(f, skip_header=start, max_rows=length-1, usecols=(0, 1))
|
|
|
|
raw_data[:, 0] *= 60
|
|
|
|
return raw_data.T
|
|
|
|
|
|
class DSCCalibrator:
|
|
def __init__(self):
|
|
self.sample = None
|
|
self.empty = None
|
|
self.reference =[]
|
|
self.ref_list = []
|
|
|
|
def set_measurement(self,
|
|
fname: Union[str, Path, DSCSample], mode: str = 'sample',
|
|
reference: ReferenceValue = Cyclohexane):
|
|
if mode not in ['sample', 'empty', 'reference']:
|
|
raise ValueError(f'Unknown mode {mode}, not "sample", "empty", "reference"')
|
|
if mode == 'reference' and not isinstance(reference, ReferenceValue):
|
|
raise ValueError(f'Reference measurement has no reference values')
|
|
|
|
if not isinstance(fname, DSCSample):
|
|
fname = DSCSample(fname)
|
|
|
|
if mode == 'sample':
|
|
self.sample = fname
|
|
elif mode == 'empty':
|
|
self.empty = fname
|
|
else:
|
|
self.reference.append(fname)
|
|
self.ref_list.append(reference)
|
|
|
|
return fname
|
|
|
|
def remove_reference(self, fname: Union[str, Path]):
|
|
fname = Path(fname)
|
|
i = None
|
|
for i, ref in enumerate(self.reference):
|
|
if ref.fname == fname:
|
|
self.reference.pop(i)
|
|
break
|
|
|
|
if i is not None:
|
|
self.ref_list.pop(i)
|
|
|
|
def get_calibration(self, rate: float):
|
|
real_tm = []
|
|
melts = []
|
|
calib_y = []
|
|
|
|
results = []
|
|
|
|
for data, ref_value in zip(self.reference, self.ref_list):
|
|
try:
|
|
idx = data.get_run(rate, mode='h')
|
|
except ValueError:
|
|
return None, None, []
|
|
|
|
measurement = data.flow_data(idx)
|
|
|
|
for trans_temp, enthalpy in ref_value.transitions:
|
|
real_tm.append(trans_temp)
|
|
t_low_lim, t_high_lim = trans_temp - 10, trans_temp + 20
|
|
|
|
low_border = np.argmin(np.abs(measurement[0] - t_low_lim))
|
|
high_border = np.argmin(np.abs(measurement[0] - t_high_lim))
|
|
ref_zoom = measurement[:, low_border:high_border]
|
|
x_val = np.array([[ref_zoom[0, 0], 1], [ref_zoom[0, -1], 1]])
|
|
y_val = np.array([ref_zoom[1, 0], ref_zoom[1, -1]])
|
|
|
|
sol = np.linalg.solve(x_val, y_val)
|
|
ref_zoom[1] -= (ref_zoom[0] * sol[0] + sol[1])
|
|
peak_max = ref_zoom[:, np.argmax(ref_zoom[1])]
|
|
integ_limit = (np.argmin(np.abs(ref_zoom[0] - peak_max[0] + 3)),
|
|
np.argmin(np.abs(ref_zoom[0] - peak_max[0] - 3)))
|
|
|
|
# substract baseline around reference peaks
|
|
x_val = np.array([[ref_zoom[0, integ_limit[0]], 1], [ref_zoom[0, integ_limit[1]], 1]])
|
|
y_val = np.array([ref_zoom[1, integ_limit[0]], ref_zoom[1, integ_limit[1]]])
|
|
|
|
sol = np.linalg.solve(x_val, y_val)
|
|
ref_zoom[1] -= (ref_zoom[0] * sol[0] + sol[1])
|
|
|
|
# calculate onset slope (use points at position of maximum gradient - 100/rate (+50/rate))
|
|
ref_grad = np.gradient(ref_zoom[1])
|
|
max_grad = np.argmax(ref_grad)
|
|
|
|
grad_pos = max_grad-int(50 / rate), max_grad
|
|
|
|
x_val = np.array([[ref_zoom[0, grad_pos[0]], 1],
|
|
[ref_zoom[0, grad_pos[1]], 1]])
|
|
y_val = np.array([ref_zoom[1, grad_pos[0]],
|
|
ref_zoom[1,grad_pos[1]]])
|
|
sol = np.linalg.solve(x_val, y_val)
|
|
onset = sol[0] * ref_zoom[0] + sol[1]
|
|
|
|
melts.append(-sol[1] / sol[0])
|
|
|
|
if enthalpy is not None:
|
|
# integrate over peak to calibrate y axis
|
|
# delta H in J/g: Integrate Peak over time and divide by weight
|
|
area = simpson(ref_zoom[1, integ_limit[0]:integ_limit[1]],
|
|
ref_zoom[2, integ_limit[0]:integ_limit[1]],
|
|
even='avg') * 1e-3
|
|
calib_y.append(enthalpy / (area / data.weight))
|
|
|
|
results.append([ref_zoom, onset, ref_zoom[:, grad_pos]])
|
|
|
|
if len(melts) > 1:
|
|
A = np.vstack([melts, np.ones(len(melts))]).T
|
|
y = np.array(real_tm)
|
|
|
|
calib_x = np.linalg.lstsq(A, y, rcond=None)[0]
|
|
else:
|
|
calib_x = [1, 0]
|
|
|
|
if calib_y:
|
|
calib_y = 1e-3 * np.mean(calib_y) * 60. / self.sample.weight / rate
|
|
else:
|
|
calib_y = 1
|
|
|
|
return calib_x, calib_y, results
|
|
|
|
def get_data(self, idx, slope='iso'):
|
|
if self.sample.steps[idx][0] == 'i':
|
|
raise ValueError('baseline correction is not implemented for isotherms')
|
|
|
|
if slope not in ['iso', 'curve', None]:
|
|
raise ValueError(f'Unknown argument for slope {slope}, not "iso" or "curve"')
|
|
|
|
sample_data = self.sample.flow_data(idx)
|
|
raw_sample = sample_data.copy()
|
|
empty_data = None
|
|
|
|
mode, rate, _, _ = self.sample.steps[idx]
|
|
|
|
empty_y = 0
|
|
idx_empty = 0
|
|
if self.empty is not None:
|
|
try:
|
|
idx_empty = self.empty.get_run(rate, mode=mode)
|
|
except ValueError:
|
|
raise ValueError(f'Empty measurement has no heat rate {rate} K/min')
|
|
|
|
empty_data = self.empty.flow_data(idx_empty)
|
|
|
|
empty_y = empty_data[1]
|
|
if self.sample.length(idx) != self.empty.length(idx_empty):
|
|
with np.errstate(all='ignore'):
|
|
empty_y = interp1d(empty_data[0], empty_data[1], fill_value='extrapolate')(sample_data[0])
|
|
|
|
sample_data[1] -= empty_y
|
|
drift_value = sample_data.copy()[(2, 1), :]
|
|
|
|
mean_isotherms = []
|
|
for offset in [-1, 1]:
|
|
# read isotherms
|
|
isotherm_sample = self.sample.isotherm_data(idx+offset)
|
|
if self.empty is not None:
|
|
isotherm_empty = self.empty.isotherm_data(idx_empty+offset)
|
|
isotherm_sample[1] -= isotherm_empty[1, 200:-200].mean()
|
|
|
|
# get mean isotherm value
|
|
m = isotherm_sample[1, 200:-200].mean()
|
|
mean_isotherms.append(m)
|
|
|
|
if offset == -1:
|
|
drift_value = np.c_[isotherm_sample, drift_value]
|
|
else:
|
|
drift_value = np.c_[drift_value, isotherm_sample]
|
|
|
|
if slope is not None:
|
|
if slope == 'iso':
|
|
# calculate slope from difference between isotherms
|
|
m = (mean_isotherms[1] - mean_isotherms[0]) / (sample_data[2, -1] - sample_data[2, 0])
|
|
offset = sample_data[1, 200]
|
|
else:
|
|
# calculate mean slope of heat flow from points in the beginning
|
|
offset = sample_data[1, 200]
|
|
grad = np.gradient(sample_data[1, :], sample_data[2, :])
|
|
m = grad[(grad < grad.mean()+grad.std()/5)*(grad > grad.mean()-grad.std()/5)].mean()
|
|
|
|
sample_data[1] -= m * (sample_data[2] - sample_data[2, 200]) + offset
|
|
line = np.array([[sample_data[2, 0], sample_data[2, -1]],
|
|
[m * (sample_data[2, 200] - sample_data[2, 200]) + offset,
|
|
m * (sample_data[2, -1] - sample_data[2, 200]) + offset]])
|
|
|
|
else:
|
|
line = np.array([[sample_data[2, 0], sample_data[2, -1]], [0, 0]])
|
|
|
|
return raw_sample, drift_value, sample_data, empty_data, line
|