# Source: nmreval/src/nmreval/fit/minimizer.py
# Commit 80d9c7098c by Dominik Demuth — "207-noncomplex-fits (#244)"
# Build AppImage / Explore-Gitea-Actions (push): successful in 1m51s
# Co-authored-by: Dominik Demuth <dominik.demuth@physik.tu-darmstadt.de>
# Reviewed-on: #244; closes #207 (2024-02-11 17:40:50 +00:00)
# (Repository page metadata: 560 lines, 19 KiB, Python)
from __future__ import annotations
import warnings
from itertools import product
import numpy as np
import scipy.linalg as la
from scipy import optimize
import scipy.odr as odr
from . import EPS
from .data import Data
from .model import Model
from .parameter import Parameters
from .result import FitResultCreator
__all__ = ['FitRoutine', 'FitAbortException']
from ..lib.logger import logger
class FitAbortException(Exception):
    """Raised from inside a cost function to stop a running fit after the user requested an abort."""
    pass
# COST FUNCTIONS: f(x) - y (least_square, minimize), and f(x) (ODR)
def _cost_scipy_glob(p: list[float], data: list[Data], varpars: list[str], used_pars: list[list[str]]):
# replace values
for keys, values in zip(varpars, p):
for data_i in data:
if keys in data_i.parameter.keys():
# TODO move this to scaled_value setter
data_i.parameter[keys].scaled_value = values
data_i.parameter[keys].namespace[keys] = data_i.parameter[keys].value
r = []
# unpack parameter and calculate y values and concatenate all
for values, p_idx in zip(data, used_pars):
actual_parameters = [values.parameter[keys].value for keys in p_idx]
r = np.r_[r, values.cost(actual_parameters)]
return r
def _cost_scipy(p, data, varpars, used_pars):
for keys, values in zip(varpars, p):
data.parameter[keys].scaled_value = values
data.parameter[keys].namespace[keys] = data.parameter[keys].value
actual_parameters = [data.parameter[keys].value for keys in used_pars]
return data.cost(actual_parameters)
def _cost_odr(p: list[float], data: Data, varpars: list[str], used_pars: list[str], fitmode: int=0):
for keys, values in zip(varpars, p):
data.parameter[keys].scaled_value = values
data.parameter[keys].namespace[keys] = data.parameter[keys].value
actual_parameters = [data.parameter[keys].value for keys in used_pars]
return data.func(actual_parameters, data.x)
def _cost_odr_glob(p: list[float], data: list[Data], var_pars: list[str], used_pars: list[list[str]]):
    """Concatenated model evaluation f(x) for a global ODR fit over several datasets.

    ``used_pars`` holds one key list per dataset (type hint fixed: it was
    annotated ``list[str]`` although each element is itself a list of keys,
    cf. the per-dataset unpacking below).
    """
    # push the optimizer's current values into every dataset
    for data_i in data:
        _update_parameter(data_i, var_pars, p)

    # collect per-dataset model values and join them once: a single
    # concatenate avoids the quadratic copying of repeated np.r_
    values = []
    for data_i, keys_i in zip(data, used_pars):
        actual_parameters = [data_i.parameter[k].value for k in keys_i]
        values.append(np.asarray(data_i.func(actual_parameters, data_i.x), dtype=float))
    return np.concatenate(values) if values else np.array([])
def _update_parameter(data: Data, varied_keys: list[str], parameter: list[float]):
for keys, values in zip(varied_keys, parameter):
if keys in data.parameter.keys():
data.parameter[keys].scaled_value = values
data.parameter[keys].namespace[keys] = data.parameter[keys].value
class FitRoutine(object):
    """Coordinates fitting of one or more datasets with scipy or ODR backends.

    Datasets (`Data`) are registered with `add_data`, models with `set_model`,
    and parameter linkages with `set_link_parameter`. `run` groups datasets
    that share models or linked parameters and fits each group with the
    selected method ('lsq' = least squares, 'nm' = Nelder-Mead, 'odr').
    """

    def __init__(self, mode='lsq'):
        # default fit method: 'lsq', 'nm', or 'odr'
        self.fitmethod = mode
        self.data = []
        # shared fallback model used by datasets without an own model
        self.fit_model = None
        # datasets that received the fallback model only for the duration of
        # a run; cleared again in unprep_run
        self._no_own_model = []
        self.result = []
        # linkage tuples: (dismissed model/data, param, replacement model, param)
        self.linked = []
        self._abort = False
        # iteration counter, incremented by the solver cost closures
        self.step = 0

    def add_data(self, x, y=None, we=None, idx=None):
        """Register a dataset and return the created/used `Data` object.

        `x` may be a ready-made `Data`, a 2d array with (x, y) columns, or
        the x values with `y` given separately; `we` is the weighting and
        `idx` an external identifier.

        Raises ValueError if neither a `Data`/2d-array nor `y` is supplied.
        """
        if isinstance(x, Data):
            d = x
        else:
            _x = np.asarray(x)
            if _x.ndim == 2 and _x.shape[1] == 2:
                # two-column input: unpack into x and y
                d = Data(*x, we=we)
            elif y is None:
                raise ValueError('First argument must be of type Data, 2d-array, '
                                 'or second argument must be given')
            else:
                d = Data(x, y, we=we, idx=idx)
        if idx is not None:
            d.idx = idx
        d.minimizer = self
        self.data.append(d)
        # placeholder keeps the result list aligned with the data list
        self.result.append(None)
        return d

    def remove_data(self, data):
        """Remove a registered dataset and its result slot.

        Raises IndexError if `data` was never added.
        """
        try:
            idx = self.data.index(data)
            _ = self.data.pop(idx)
            self.result.pop(idx)
        except ValueError:
            raise IndexError(f'Data {data} not found')

    def set_model(self, func, *args, idx=None, **kwargs):
        """Set the fit model.

        With `idx` given, the model is attached to every dataset with that
        idx; otherwise it becomes the shared default model. Returns the
        current default model in both cases.
        """
        if isinstance(func, Model):
            model = func
        else:
            model = Model(func, *args, **kwargs)
        if idx is not None:
            for i, data in enumerate(self.data):
                if data.idx == idx:
                    data.set_model(model)
        else:
            self.fit_model = model
        return self.fit_model

    def set_link_parameter(self, dismissed_param: tuple[Model | Data, str], replacement: tuple[Model, str]):
        """Link a parameter so it is replaced by another model's/dataset's parameter.

        Both arguments are (model-or-data, parameter name) pairs; if the
        replacement belongs to a `Model` its parameter must be global.
        """
        if isinstance(replacement[0], Model):
            if replacement[1] not in replacement[0].parameter:
                raise KeyError(f'Parameter {replacement[1]} of '
                               f'model {replacement[0]} is not global')
        if isinstance(dismissed_param[0], Model):
            warnings.warn(f'Replaced parameter {dismissed_param[1]} in {dismissed_param[0]} '
                          f'becomes global with linkage.')
        self.linked.append((*dismissed_param, *replacement))

    def prepare_links(self):
        """Build the groups of datasets that must be fitted together.

        Constructs an undirected graph between datasets and models (edges from
        shared global parameters and explicit linkages), then returns
        (connected components of Data nodes, mapping of linked data keys).
        """
        self._no_own_model = []
        _found_models = {}
        linked_sender = {}
        for v in self.data:
            linked_sender[v] = set()
            # set temporary model
            if v.model is None:
                v.model = self.fit_model
                self._no_own_model.append(v)
            # register model
            if v.model not in _found_models:
                _found_models[v.model] = []
            _found_models[v.model].append(v)
            if v.model not in linked_sender:
                linked_sender[v.model] = set()

        linked_parameter = {}
        for dismiss_model, dismiss_param, replace_model, replace_param in self.linked:
            # linkage couples both endpoints into the same fit group
            linked_sender[replace_model].add(dismiss_model)
            linked_sender[replace_model].add(replace_model)
            replace_key = replace_model.parameter.get_key(replace_param)
            dismiss_key = dismiss_model.parameter.get_key(dismiss_param)
            if isinstance(replace_model, Data):
                linked_parameter[dismiss_key] = replace_key
            else:
                p = dismiss_model.set_global_parameter(dismiss_param, replace_key)
                # display name of the replacing parameter
                p._expr_disp = replace_param

        for mm, m_data in _found_models.items():
            # models with global parameters couple all their datasets
            if mm.parameter:
                for dd in m_data:
                    linked_sender[mm].add(dd)
                    linked_sender[dd].add(mm)

        coupled_data = []
        visited_data = []
        for s in linked_sender.keys():
            if s in visited_data:
                continue
            sub_graph = []
            self.find_paths(s, linked_sender, sub_graph, visited_data)
            if sub_graph:
                coupled_data.append(sub_graph)
        return coupled_data, linked_parameter

    def find_paths(self, start, graph, coupled_nodes=None, visited_nodes=None):
        """Depth-first search collecting all `Data` nodes reachable from `start`."""
        visited_nodes.append(start)
        if isinstance(start, Data):
            coupled_nodes.append(start)
        for neighbor in graph[start]:
            if neighbor in visited_nodes:
                continue
            self.find_paths(neighbor, graph, coupled_nodes, visited_nodes)

    def abort(self):
        """Request that the running fit stops at the next cost evaluation."""
        logger.info('Fit aborted by user')
        self._abort = True

    def run(self, mode: str = None):
        """Fit all registered datasets and return the list of results.

        `mode` overrides `self.fitmethod` ('lsq', 'nm', 'odr'). Groups with a
        single, unlinked dataset use the single-dataset solvers; all other
        groups are fitted globally. Fixed-only fits (no varied parameters)
        produce results without running a solver.
        """
        self._abort = False
        if mode is None:
            mode = self.fitmethod
        fit_groups, linked_parameter = self.prepare_links()
        for data_groups in fit_groups:
            if len(data_groups) == 1 and not self.linked:
                data = data_groups[0]
                # get variable parameter for fitter
                p0_k, lb_k, ub_k, var_pars_k = self._prep_data(data)
                if p0_k is None:
                    # nothing varies: record the fixed parameters as result
                    self.make_results(data, data.para_keys, var_pars_k, data.para_keys, (len(data.para_keys), len(data.para_keys)),
                                      err=None, corr=None, partial_corr=None)
                else:
                    if mode == 'lsq':
                        self._least_squares_single(data, p0_k, lb_k, ub_k, var_pars_k)
                    elif mode == 'nm':
                        self._nm_single(data, p0_k, lb_k, ub_k, var_pars_k)
                    elif mode == 'odr':
                        # ODR takes no bounds
                        self._odr_single(data, p0_k, var_pars_k)
            else:
                data_pars, p0, lb, ub, var_pars = self._prep_global(data_groups, linked_parameter)
                if not p0:
                    # nothing varies in the whole group
                    for data_k, p_k in zip(data_groups, data_pars):
                        self.make_results(data_k, p_k, [], p_k, (len(p_k), len(p_k)),
                                          err=None, corr=None, partial_corr=None)
                else:
                    if mode == 'lsq':
                        self._least_squares_global(data_groups, p0, lb, ub, var_pars, data_pars)
                    elif mode == 'nm':
                        self._nm_global(data_groups, p0, lb, ub, var_pars, data_pars)
                    elif mode == 'odr':
                        self._odr_global(data_groups, p0, var_pars, data_pars)
        self.unprep_run()
        for r in self.result:
            r.pprint()
        return self.result

    def make_preview(self, x: np.ndarray) -> list[np.ndarray]:
        """Evaluate each fit group at `x` with the current parameter values.

        NOTE(review): only the first dataset of each group is evaluated here —
        presumably each group shares one model; confirm against callers.
        """
        y_pred = []
        fit_groups, linked_parameter = self.prepare_links()
        for data_groups in fit_groups:
            data = data_groups[0]
            actual_parameters = [p.value for p in data.parameter.values()]
            y_pred.append(data.func(actual_parameters, x))
        return y_pred

    def _prep_data(self, data):
        """Attach the fallback model if needed, then return the dataset's variable-parameter setup."""
        if data.get_model() is None:
            data._model = self.fit_model
            self._no_own_model.append(data)
        # data.parameter.prepare_bounds()
        return self._prep_parameter(data.parameter)

    @staticmethod
    def _prep_parameter(parameter: Parameters):
        """Collect scaled start values, bounds, and keys of all variable parameters.

        Returns (p0, lb, ub, keys); the first three are None if no parameter varies.
        """
        vals = []
        var_pars = []
        for p_k, v_k in parameter.items():
            if v_k.var:
                # bounds are scaled consistently with the start value
                vals.append([v_k.scaled_value, v_k.lb / v_k.scale, v_k.ub / v_k.scale])
                var_pars.append(p_k)
        if vals:
            pp, lb, ub = zip(*vals)
        else:
            pp = lb = ub = None
        return pp, lb, ub, var_pars

    @staticmethod
    def _prep_global(data_group: list[Data], linked):
        """Collect start values, bounds, and keys for a linked/global fit group.

        Returns (per-dataset key lists, p0, lb, ub, varied keys); shared
        parameters enter the varied list only once.
        NOTE(review): `linked` is currently unused inside this method.
        """
        p0 = []
        lb = []
        ub = []
        var = []
        data_pars = []
        # loopy-loop over data that belong to one fit (linked or global)
        for data in data_group:
            # is parameter replaced by global parameter?
            for k, v in data.model.parameter.items():
                data.replace_parameter(k, v)
            # data.parameter.prepare_bounds()
            actual_pars = []
            for i, p_k in enumerate(data.para_keys):
                p_k_used = p_k
                v_k_used = data.parameter[p_k]
                actual_pars.append(p_k_used)
                # parameter is variable and was not found before as shared parameter
                if v_k_used.var and p_k_used not in var:
                    var.append(p_k_used)
                    p0.append(v_k_used.scaled_value)
                    lb.append(v_k_used.lb / v_k_used.scale)
                    ub.append(v_k_used.ub / v_k_used.scale)
            data_pars.append(actual_pars)
        return data_pars, p0, lb, ub, var

    def unprep_run(self):
        """Undo temporary model assignments and reset the parameter registry."""
        for d in self._no_own_model:
            d._model = None
        self._no_own_model = []
        Parameters.reset()

    def _least_squares_single(self, data, p0, lb, ub, var):
        """Fit one dataset with scipy.optimize.least_squares and store its result."""
        self.step = 0

        def cost(p):
            self.step += 1
            if self._abort:
                raise FitAbortException(f'Fit aborted by user')
            return _cost_scipy(p, data, var, data.para_keys)

        with np.errstate(all='ignore'):
            res = optimize.least_squares(cost, p0, bounds=(lb, ub), max_nfev=500 * len(p0))
        err, corr, partial_corr = _calc_error(res.jac, np.sum(res.fun**2), *res.jac.shape)
        self.make_results(data, res.x, var, data.para_keys, res.jac.shape,
                          err=err, corr=corr, partial_corr=partial_corr)

    def _least_squares_global(self, data, p0, lb, ub, var, data_pars):
        """Fit a coupled group of datasets with least_squares; one result per dataset."""
        def cost(p):
            self.step += 1
            if self._abort:
                raise FitAbortException(f'Fit aborted by user')
            return _cost_scipy_glob(p, data, var, data_pars)

        with np.errstate(all='ignore'):
            res = optimize.least_squares(cost, p0, bounds=(lb, ub), max_nfev=500 * len(p0))
        err, corr, partial_corr = _calc_error(res.jac, np.sum(res.fun**2), *res.jac.shape)
        for v, var_pars_k in zip(data, data_pars):
            self.make_results(v, res.x, var, var_pars_k, res.jac.shape,
                              err=err, corr=corr, partial_corr=partial_corr)

    def _nm_single(self, data, p0, lb, ub, var):
        """Fit one dataset with Nelder-Mead on the summed squared residuals (no error estimates)."""
        def cost(p):
            self.step += 1
            if self._abort:
                raise FitAbortException(f'Fit aborted by user')
            return (_cost_scipy(p, data, var, data.para_keys) ** 2).sum()

        with np.errstate(all='ignore'):
            res = optimize.minimize(cost, p0, bounds=[(b1, b2) for (b1, b2) in zip(lb, ub)],
                                    method='Nelder-Mead', options={'maxiter': 500 * len(p0)})
        self.make_results(data, res.x, var, data.para_keys, (len(data), len(p0)))

    def _nm_global(self, data, p0, lb, ub, var, data_pars):
        """Fit a coupled group with Nelder-Mead; one result per dataset (no error estimates)."""
        def cost(p):
            self.step += 1
            if self._abort:
                raise FitAbortException(f'Fit aborted by user')
            return (_cost_scipy_glob(p, data, var, data_pars) ** 2).sum()

        with np.errstate(all='ignore'):
            res = optimize.minimize(cost, p0, bounds=[(b1, b2) for (b1, b2) in zip(lb, ub)],
                                    method='Nelder-Mead', options={'maxiter': 500 * len(p0)})
        for v, var_pars_k in zip(data, data_pars):
            self.make_results(v, res.x, var, var_pars_k, (sum(len(d) for d in data), len(p0)))

    def _odr_single(self, data, p0, var_pars):
        """Fit one dataset with orthogonal distance regression and store its result."""
        odr_data = odr.Data(data.x, data.y)

        def func(p, _):
            self.step += 1
            if self._abort:
                raise FitAbortException(f'Fit aborted by user')
            return _cost_odr(p, data, var_pars, data.para_keys)

        odr_model = odr.Model(func)
        corr, partial_corr, res = self._odr_fit(odr_data, odr_model, p0)
        self.make_results(data, res.beta, var_pars, data.para_keys, (len(data), len(p0)),
                          err=res.sd_beta, corr=corr, partial_corr=partial_corr)

    def _odr_fit(self, odr_data, odr_model, p0):
        """Run ODR and derive correlation and partial-correlation matrices from its output."""
        o = odr.ODR(odr_data, odr_model, beta0=p0)
        res = o.run()
        corr = res.cov_beta / (res.sd_beta[:, None] * res.sd_beta[None, :]) * res.res_var
        try:
            corr_inv = np.linalg.inv(corr)
            corr_inv_diag = np.diag(np.sqrt(1 / np.diag(corr_inv)))
            partial_corr = -1. * np.dot(np.dot(corr_inv_diag, corr_inv), corr_inv_diag)  # Partial correlation matrix
            partial_corr[np.diag_indices_from(partial_corr)] = 1.
        except np.linalg.LinAlgError:
            # singular correlation matrix: fall back to plain correlation
            partial_corr = corr
        return corr, partial_corr, res

    def _odr_global(self, data, p0, var, data_pars):
        """Fit a coupled group with ODR on the concatenated x/y arrays; one result per dataset."""
        def func(p, _):
            self.step += 1
            if self._abort:
                raise FitAbortException(f'Fit aborted by user')
            return _cost_odr_glob(p, data, var, data_pars)

        x = []
        y = []
        for d in data:
            x = np.r_[x, d.x]
            y = np.r_[y, d.y]
        odr_data = odr.Data(x, y)
        odr_model = odr.Model(func)
        corr, partial_corr, res = self._odr_fit(odr_data, odr_model, p0)
        for v, var_pars_k in zip(data, data_pars):
            self.make_results(v, res.beta, var, var_pars_k, (sum(len(d) for d in data), len(p0)),
                              err=res.sd_beta, corr=corr, partial_corr=partial_corr)

    def make_results(
            self,
            data: Data,
            p: list[float],
            var_pars: list[str],
            used_pars: list[str],
            shape: tuple[int, int],
            err: list[float] = None,
            corr: np.ndarray = None,
            partial_corr: np.ndarray = None,
    ):
        """Write fitted values/errors back into `data` and build its fit result.

        `p` and `err` are ordered like `var_pars`; `used_pars` selects and
        orders the parameters belonging to this dataset. `shape` is
        (nobs, nvar). The full correlation matrices are reduced to the
        rows/columns of `used_pars` before being stored.
        """
        if err is None:
            err = [0] * len(p)
        # update parameter values
        for keys, p_value, err_value in zip(var_pars, p, err):
            if keys in data.parameter.keys():
                data.parameter[keys].scaled_value = p_value
                data.parameter[keys].scaled_error = err_value
                data.parameter[keys].namespace[keys] = data.parameter[keys].value

        combinations = list(product(var_pars, var_pars))
        actual_parameters = []
        corr_idx = []
        for i, p_i in enumerate(used_pars):
            actual_parameters.append(data.parameter[p_i])
            for j, p_j in enumerate(used_pars):
                try:
                    # find the position of the parameter combinations
                    corr_idx.append(combinations.index((p_i, p_j)))
                except ValueError:
                    pass

        # reshape the correlation matrices
        if corr is None or not corr_idx:
            actual_corr = None
            actual_pcorr = None
        else:
            indexes = np.unravel_index(corr_idx, corr.shape)
            dim_one = int(np.sqrt(len(corr_idx)))
            actual_corr = corr[indexes].reshape((dim_one, -1))
            actual_pcorr = partial_corr[indexes].reshape((dim_one, -1))

        idx = self.data.index(data)
        model = data.get_model()
        self.result[idx] = FitResultCreator.make_with_model(
            model=model,
            x_orig=data.x,
            y_orig=data.y,
            p=actual_parameters,
            fun_kwargs=data.fun_kwargs,
            we=data.we_string,
            idx=data.idx,
            nobs=shape[0],
            nvar=shape[1],
            corr=actual_corr,
            pcorr=actual_pcorr,
            data_mode=data.complex_type,
        )
        return self.result
def _calc_error(jac, chi, nobs, nvars):
    """Derive parameter errors and (partial) correlation matrices from a Jacobian.

    The covariance is obtained from the SVD of `jac` (same approach as
    scipy's curve_fit), scaled by chi / (nobs - nvars). If the SVD fails,
    errors are zero and the correlation matrix is all zeros.
    """
    # copy of scipy.curve_fit to calculate covariance
    pcov = None
    # noinspection PyTupleAssignmentBalance
    try:
        _, sv, v_t = la.svd(jac, full_matrices=False)
    except ValueError as e:
        # this may be issue #39: On entry to DGESSD parameter had an illegal value
        # catch this exception and ignore error calculation
        logger.error(f'Error calculation failed with {e.args}')
    else:
        # discard singular values below the numerical-rank threshold
        cutoff = EPS * max(jac.shape) * sv[0]
        sv = sv[sv > cutoff]
        v_t = v_t[:sv.size]
        pcov = (v_t.T / sv ** 2) @ v_t * chi / (nobs - nvars)

    if pcov is None:
        _err = np.zeros(nvars)
        corr = np.zeros((nvars, nvars))
    else:
        _err = np.sqrt(np.diag(pcov))
        corr = (pcov / (_err[:, None] * _err[None, :])).astype(np.float64)

    try:
        inv_corr = np.linalg.inv(corr)
        scale = np.diag(np.sqrt(1 / np.diag(inv_corr)))
        partial_corr = -(scale @ inv_corr @ scale)  # Partial correlation matrix
        partial_corr[np.diag_indices_from(partial_corr)] = 1.
    except np.linalg.LinAlgError:
        # singular correlation matrix: fall back to plain correlation
        partial_corr = corr
    return _err, corr, partial_corr