fit with global parameter

This commit is contained in:
Dominik Demuth 2023-08-24 16:19:09 +02:00
parent 88a32ea7fd
commit 5d43ccb05d
4 changed files with 180 additions and 166 deletions

View File

@ -1,7 +1,7 @@
import numpy as np
from .model import Model
from .parameter import Parameters
from .parameter import Parameters, Parameter
class Data(object):
@ -16,7 +16,7 @@ class Data(object):
self.model = None
self.minimizer = None
self.parameter = Parameters()
self.para_keys = None
self.para_keys: list = []
self.fun_kwargs = {}
def __len__(self):
@ -123,6 +123,18 @@ class Data(object):
else:
return [p.value for p in self.minimizer.parameters[self.parameter]]
def replace_parameter(self, key: str, parameter: Parameter) -> None:
tobereplaced = None
for k, v in self.parameter.items():
if v.name == parameter.name:
tobereplaced = k
break
if tobereplaced is None:
raise KeyError(f'Global parameter {key} not found in list of parameters')
self.para_keys[self.para_keys.index(tobereplaced)] = key
self.parameter.replace_parameter(tobereplaced, key, parameter)
def cost(self, p):
"""
Cost function :math:`y-f(p, x)`

View File

@ -21,6 +21,32 @@ class FitAbortException(Exception):
pass
# COST FUNCTIONS: f(x) - y (least_square, minimize), and f(x) (ODR)
def _cost_scipy_glob(p: list[float], data: list[Data], varpars: list[str], used_pars: list[list[str]]):
# replace values
for keys, values in zip(varpars, p):
for data_i in data:
if keys in data_i.parameter:
data_i.parameter[keys].scaled_value = values
data_i.parameter[keys].namespace[keys] = data_i.parameter[keys].value
r = []
# unpack parameter and calculate y values and concatenate all
for values, p_idx in zip(data, used_pars):
actual_parameters = [values.parameter[keys].value for keys in p_idx]
r = np.r_[r, values.cost(actual_parameters)]
return r
def _cost_scipy(p, data, varpars, used_pars):
for keys, values in zip(varpars, p):
data.parameter[keys].scaled_value = values
data.parameter[keys].namespace[keys] = data.parameter[keys].value
actual_parameters = [data.parameter[keys].value for keys in used_pars]
return data.cost(actual_parameters)
class FitRoutine(object):
def __init__(self, mode='lsq'):
self._fitmethod = mode
@ -101,7 +127,8 @@ class FitRoutine(object):
for v in self.data:
linked_sender[v] = set()
self.parameter.update(v.parameter.copy())
for k, p_i in v.parameter.items():
self.parameter.add_parameter(k, p_i.copy())
# set temporary model
if v.model is None:
@ -111,6 +138,8 @@ class FitRoutine(object):
# register model
if v.model not in _found_models:
_found_models[v.model] = []
for k, p in v.model.parameter.items():
self.parameter.add_parameter(k, p)
# m_param = v.model.parameter.copy()
# self.parameter.update(m_param)
#
@ -120,38 +149,39 @@ class FitRoutine(object):
linked_sender[v.model] = set()
linked_parameter = {}
for par, par_parm, repl, repl_par in self.linked:
if isinstance(par, Data):
if isinstance(repl, Data):
linked_parameter[par.para_keys[par_parm]] = repl.para_keys[repl_par]
else:
linked_parameter[par.para_keys[par_parm]] = repl.global_parameter[repl_par]
else:
if isinstance(repl, Data):
par.global_parameter[par_parm] = repl.para_keys[repl_par]
else:
par.global_parameter[par_parm] = repl.global_parameter[repl_par]
linked_sender[repl].add(par)
linked_sender[par].add(repl)
print(_found_models)
# for par, par_parm, repl, repl_par in self.linked:
# if isinstance(par, Data):
# if isinstance(repl, Data):
# linked_parameter[par.para_keys[par_parm]] = repl.para_keys[repl_par]
# else:
# linked_parameter[par.para_keys[par_parm]] = repl.parameter[repl_par]
#
# else:
# if isinstance(repl, Data):
# par.global_parameter[par_parm] = repl.para_keys[repl_par]
# else:
# par.global_parameter[par_parm] = repl.global_parameter[repl_par]
#
# linked_sender[repl].add(par)
# linked_sender[par].add(repl)
for mm, m_data in _found_models.items():
if mm.global_parameter:
# print('has global', mm.parameter)
if mm.parameter:
for dd in m_data:
linked_sender[mm].add(dd)
linked_sender[dd].add(mm)
coupled_data = []
visited_data = []
# print('linked', linked_sender)
for s in linked_sender.keys():
if s in visited_data:
continue
sub_graph = []
self.find_paths(s, linked_sender, sub_graph, visited_data)
if sub_graph:
# print('sub', sub_graph)
coupled_data.append(sub_graph)
return coupled_data, linked_parameter
@ -177,6 +207,8 @@ class FitRoutine(object):
fit_groups, linked_parameter = self.prepare_links()
# print(fit_groups, self.linked)
for data_groups in fit_groups:
if len(data_groups) == 1 and not self.linked:
data = data_groups[0]
@ -210,7 +242,6 @@ class FitRoutine(object):
return self.result
def _prep_data(self, data):
if data.get_model() is None:
data._model = self.fit_model
self._no_own_model.append(data)
@ -237,22 +268,29 @@ class FitRoutine(object):
var = []
data_pars = []
# print(data_group)
# loopy-loop over data that belong to one fit (linked or global)
for data in data_group:
actual_pars = []
for i, (p_k, v_k) in enumerate(data.parameter.items()):
p_k_used = p_k
v_k_used = v_k
# is parameter replaced by global parameter?
for k, v in data.model.parameter.items():
data.replace_parameter(k, v)
# is parameter replaced by global parameter?
if i in data.model.global_parameter:
p_k_used = data.model.global_parameter[i]
v_k_used = self.parameter[p_k_used]
actual_pars = []
for i, p_k in enumerate(data.para_keys):
# print(i, p_k)
p_k_used = p_k
v_k_used = data.parameter[p_k]
# if i in data.model.parameter:
# p_k_used = data.model.parameter[i]
# v_k_used = self.parameter[p_k_used]
# data.parameter.add_parameter(i, data.model.parameter[i])
# links trump global parameter
if p_k_used in linked:
p_k_used = linked[p_k_used]
v_k_used = self.parameter[p_k_used]
# if p_k_used in linked:
# p_k_used = linked[p_k_used]
# v_k_used = self.parameter[p_k_used]
actual_pars.append(p_k_used)
# parameter is variable and was not found before as shared parameter
@ -262,6 +300,8 @@ class FitRoutine(object):
ub.append(v_k_used.ub / v_k_used.scale)
var.append(p_k_used)
# print('aloha, ', actual_pars)
data_pars.append(actual_pars)
return data_pars, p0, lb, ub, var
@ -272,16 +312,8 @@ class FitRoutine(object):
self._no_own_model = []
# COST FUNCTIONS: f(x) - y (least_square, minimize), and f(x) (ODR)
def __cost_scipy(self, p, data, varpars, used_pars):
for keys, values in zip(varpars, p):
data.parameter[keys].scaled_value = values
data.parameter[keys].namespace[keys] = data.parameter[keys].value
actual_parameters = [data.parameter[keys].value for keys in used_pars]
return data.cost(actual_parameters)
def __cost_odr(self, p, data, varpars, used_pars):
def __cost_odr(self, p: list[float], data: Data, varpars: list[str], used_pars: list[str]):
for keys, values in zip(varpars, p):
self.parameter[keys].scaled_value = values
@ -289,19 +321,6 @@ class FitRoutine(object):
return data.func(actual_parameters, data.x)
def __cost_scipy_glob(self, p, data, varpars, used_pars):
# replace values
for keys, values in zip(varpars, p):
self.parameter[keys].scaled_value = values
r = []
# unpack parameter and calculate y values and concatenate all
for values, p_idx in zip(data, used_pars):
actual_parameters = [self.parameter[keys].value for keys in p_idx]
r = np.r_[r, values.cost(actual_parameters)]
return r
def __cost_odr_glob(self, p, data, varpars, used_pars):
# replace values
for keys, values in zip(varpars, p):
@ -323,7 +342,7 @@ class FitRoutine(object):
if self._abort:
raise FitAbortException(f'Fit aborted by user')
return self.__cost_scipy(p, data, var, data.para_keys)
return _cost_scipy(p, data, var, data.para_keys)
with np.errstate(all='ignore'):
res = optimize.least_squares(cost, p0, bounds=(lb, ub), max_nfev=500 * len(p0))
@ -337,7 +356,7 @@ class FitRoutine(object):
self.step += 1
if self._abort:
raise FitAbortException(f'Fit aborted by user')
return self.__cost_scipy_glob(p, data, var, data_pars)
return _cost_scipy_glob(p, data, var, data_pars)
with np.errstate(all='ignore'):
res = optimize.least_squares(cost, p0, bounds=(lb, ub), max_nfev=500 * len(p0))
@ -352,7 +371,7 @@ class FitRoutine(object):
self.step += 1
if self._abort:
raise FitAbortException(f'Fit aborted by user')
return (self.__cost_scipy(p, data, var, data.para_keys)**2).sum()
return (_cost_scipy(p, data, var, data.para_keys) ** 2).sum()
with np.errstate(all='ignore'):
res = optimize.minimize(cost, p0, bounds=[(b1, b2) for (b1, b2) in zip(lb, ub)],
@ -365,7 +384,7 @@ class FitRoutine(object):
self.step += 1
if self._abort:
raise FitAbortException(f'Fit aborted by user')
return (self.__cost_scipy_glob(p, data, var, data_pars)**2).sum()
return (_cost_scipy_glob(p, data, var, data_pars) ** 2).sum()
with np.errstate(all='ignore'):
res = optimize.minimize(cost, p0, bounds=[(b1, b2) for (b1, b2) in zip(lb, ub)],
@ -438,12 +457,13 @@ class FitRoutine(object):
if err is None:
err = [0] * len(p)
print(p, var_pars, used_pars)
# update parameter values
for keys, p_value, err_value in zip(var_pars, p, err):
data.parameter[keys].scaled_value = p_value
data.parameter[keys].scaled_error = err_value
data.parameter[keys].namespace[keys] = data.parameter[keys].value
if keys in data.parameter:
data.parameter[keys].scaled_value = p_value
data.parameter[keys].scaled_error = err_value
data.parameter[keys].namespace[keys] = data.parameter[keys].value
combinations = list(product(var_pars, var_pars))
actual_parameters = []
@ -511,3 +531,4 @@ class FitRoutine(object):
partial_corr = corr
return _err, corr, partial_corr

View File

@ -25,7 +25,6 @@ class Model(object):
self.ub = [i if i is not None else inf for i in self.ub]
self.parameter = Parameters()
self.global_parameter = {}
self.is_complex = None
self._complex_part = False

View File

@ -1,99 +1,84 @@
from __future__ import annotations
from numbers import Number
import re
from itertools import count
from re import sub
from io import StringIO
import numpy as np
class Parameters(dict):
count = count()
parameter_counter = count()
namespace: dict = {}
def __init__(self):
super().__init__()
self._namespace = {}
self._mapping: dict = {}
def __str__(self):
return 'Parameters:\n' + '\n'.join([str(k)+': '+str(v) for k, v in self.items()])
def __str__(self) -> str:
return 'Parameters:\n' + '\n'.join([f'{k}: {v}' for k, v in self.items()])
def __getitem__(self, item):
if isinstance(item, (list, tuple, np.ndarray)):
values = []
for item_i in item:
values.append(super().__getitem__(item_i))
return values
def __getitem__(self, item) -> Parameter:
if item in self._mapping:
return super().__getitem__(self._mapping[item])
else:
return super().__getitem__(item)
@staticmethod
def _prep_bounds(val: list, p_len: int) -> list:
# helper function to ensure that bounds and variable are of parameter shape
if isinstance(val, (Number, bool)) or val is None:
return [val] * p_len
def __setitem__(self, key, value):
super().__setitem__(key, value)
elif len(val) == p_len:
return val
def add(self,
name: str,
value: str | float | int = None,
*,
var: bool = True,
lb: float = -np.inf, ub: float = np.inf) -> Parameter:
elif len(val) == 1:
return [val[0]] * p_len
par = Parameter(name=name, value=value, var=var, lb=lb, ub=ub)
else:
raise ValueError(f'Input {val} has wrong dimensions')
key = f'p{next(Parameters.parameter_counter)}'
def add_parameter(self, param: float | list[float], var=None, lb=None, ub=None, names=None) -> list:
if isinstance(param, (float, int)):
param = [param]
self.add_parameter(key, par)
p_len = len(param)
return par
# make list if only single value is given
var = self._prep_bounds(var, p_len)
lb = self._prep_bounds(lb, p_len)
ub = self._prep_bounds(ub, p_len)
def add_parameter(self, key: str, parameter: Parameter):
self._mapping[parameter.name] = key
self[key] = parameter
new_keys = []
for i in range(p_len):
new_idx = next(self.count)
if names is not None:
key = names[i]
else:
key = f'_p{new_idx}'
new_keys.append(key)
self[key] = Parameter(key, param[i], var=var[i], lb=lb[i], ub=ub[i])
self._mapping[parameter.name] = key
return new_keys
self[key] = parameter
parameter.eval_allowed = False
self.namespace[key] = parameter.value
parameter.namespace = self.namespace
parameter.eval_allowed = True
def add(self, name: str | Parameter, value: str | float | int = None, *, var=True, lb=-np.inf, ub=np.inf):
if isinstance(name, Parameter):
par = name
name = par.name
else:
par = Parameter(name=name, value=value, var=var, lb=lb, ub=ub)
# look for variables in expression and replace with valid names
for p in self.values():
if p._expr is not None:
expression = p._expr
for n, k in self._mapping.items():
expression = re.sub(re.escape(n), k, expression)
name = _prepare_namespace_string(name)
p._expr = expression
self[name] = par
par.eval_allowed = False
self._namespace[name] = par.value
par.namespace = self._namespace
par.eval_allowed = True
def replace_parameter(self, key_out: str, key_in: str, parameter: Parameter):
for k, v in self._mapping.items():
if v == key_out:
self._mapping[k] = key_in
break
def copy(self) -> Parameters:
new_para_dict = Parameters()
for k, v in self.items():
new_para = v.copy()
new_para_dict.add(new_para)
self.add_parameter(key_in, parameter)
del self.namespace[key_out]
# if len(p) == 0:
# return p
# max_k = int(max(p.keys(), key=lambda x: int(k[2:]))[2:])
# c = next(p.count)
# while c < max_k:
# c = next(p.count)
return new_para_dict
for p in self.values():
try:
p.value
except NameError:
expression = p._expr_disp
for n, k in self._mapping.items():
expression = re.sub(re.escape(n), k, expression)
p._expr = expression
def get_state(self):
return {k: v.get_state() for k, v in self.items()}
@ -105,25 +90,23 @@ class Parameter:
"""
def __init__(self, name: str, value: float | str, var: bool = True, lb: float = -np.inf, ub: float = np.inf):
self.scale = 1
self.var = bool(var) if var is not None else True
self.error = None if self.var is False else 0.0
self.name = name
self.function = ''
self.lb = lb if lb is not None else -np.inf
self.ub = ub if ub is not None else np.inf
self._value = None
self.namespace = None
self.eval_allowed = True
self._expr = None
self._expr_disp = None
self._value: float | None = None
self.var: bool = bool(var) if var is not None else True
self.error: None | float = None if self.var is False else 0.0
self.name: str = name
self.function: str = ""
self.lb: None | float = lb if lb is not None else -np.inf
self.ub: float | None = ub if ub is not None else np.inf
self.namespace: dict = {}
self.eval_allowed: bool = True
self._expr: None | str = None
self._expr_disp: None | str = None
if isinstance(value, str):
self._expr_disp = value
value = _prepare_namespace_string(value)
self._expr = value
self.var = False
else:
if self.lb <= value <= self.ub:
self._value = value
@ -140,23 +123,24 @@ class Parameter:
self.scale = 1.
def __str__(self) -> str:
start = ''
start = StringIO()
if self.name:
if self.function:
start = f'{self.name} ({self.function}): '
start.write(f"{self.name} ({self.function}): ")
else:
start = self.name + ': '
start.write(self.name)
start.write(": ")
if self.var:
return start + f'{self.value:.4g} +/- {self.error:.4g}, init={self.init_val}'
start.write(f"{self.value:.4g} +/- {self.error:.4g}, init={self.init_val}")
else:
ret_string = start + f'{self.value:}'
start.write(f"{self.value:}")
if self._expr is None:
ret_string += ' (fixed)'
start.write(" (fixed)")
else:
ret_string += f' (calc: {self._expr_disp})'
start.write(f" (calc: {self._expr_disp})")
return ret_string
return start.getvalue()
def __add__(self, other: Parameter | float | int) -> float:
if isinstance(other, (float, int)):
@ -168,11 +152,11 @@ class Parameter:
return self.__add__(other)
@property
def scaled_value(self):
def scaled_value(self) -> float:
return self.value / self.scale
@scaled_value.setter
def scaled_value(self, value: float):
def scaled_value(self, value: float) -> None:
self._value = value * self.scale
@property
@ -183,22 +167,21 @@ class Parameter:
return self._value
@property
def scaled_error(self):
def scaled_error(self) -> None | float:
if self.error is None:
return self.error
else:
return self.error / self.scale
@scaled_error.setter
def scaled_error(self, value):
def scaled_error(self, value) -> None:
self.error = value * self.scale
def get_state(self):
def get_state(self) -> dict:
return {slot: getattr(self, slot) for slot in self.__slots__}
@staticmethod
def set_state(state: dict):
def set_state(state: dict) -> Parameter:
par = Parameter(state.pop('value'))
for k, v in state.items():
setattr(par, k, v)
@ -206,10 +189,10 @@ class Parameter:
return par
@property
def full_name(self):
def full_name(self) -> str:
name = self.name
if self.function:
name += ' (' + self.function + ')'
name += f" ({self.function})"
return name
@ -219,9 +202,8 @@ class Parameter:
else:
val = self._value
para_copy = Parameter(name=self.name, value=val, var=self.var, lb=self.lb, ub=self.ub)
para_copy._expr = self._expr
para_copy.namespace = self.namespace
return para_copy
def _prepare_namespace_string(expr: str) -> str:
return sub('[\(\)\{.\}\\\]', '_', expr)