nmreval/src/gui_qt/main/management.py

1320 lines
45 KiB
Python
Raw Normal View History

2022-03-24 16:35:10 +00:00
from __future__ import annotations
2022-03-08 09:27:40 +00:00
import pathlib
import re
import uuid
import numpy as np
2022-10-20 15:23:15 +00:00
from nmreval.fit import data as fit_d
from nmreval.fit.model import Model
from nmreval.fit.result import FitResult
from nmreval.fit.minimizer import FitRoutine
from nmreval.lib.colors import available_cycles
from nmreval.lib.logger import logger
2022-10-20 15:23:15 +00:00
from nmreval.math.interpol import interpolate
from nmreval.math.logfourier import logft
from nmreval.math.smooth import smooth
from nmreval.nmr.relaxation import Relaxation
2022-03-08 09:27:40 +00:00
from ..Qt import QtCore, QtWidgets
2023-07-12 18:48:28 +00:00
from ..lib import Relations
2022-03-08 09:27:40 +00:00
from ..lib.undos import *
from ..data.container import *
from ..io.filereaders import QFileReader
from ..lib.utils import busy_cursor
class GraphSignals(QtCore.QObject):
    """QObject that owns the change signal re-exported by GraphDict."""

    # emitted without arguments
    valueChanged = QtCore.pyqtSignal()
class GraphDict(OrderedDict):
    """
    Ordered mapping of graph id to graph that emits `valueChanged` on every item assignment or deletion.

    Names of datasets are resolved through the mapping handed to the constructor.
    """

    def __init__(self, data):
        """
        Args:
            data: mapping of set id to dataset; read whenever ids or names of sets are looked up
        """
        super().__init__()

        self._data = data

        # the signal lives on a helper QObject and is exposed under the same name here
        self.signals = GraphSignals()
        self.valueChanged = self.signals.valueChanged

    def __setitem__(self, key, value):
        """Store the graph, then emit `valueChanged`."""
        super().__setitem__(key, value)
        self.valueChanged.emit()

    def __delitem__(self, key):
        """Remove the graph, then emit `valueChanged`."""
        super().__delitem__(key)
        self.valueChanged.emit()

    def tree(self, key_only=False):
        """
        Return graphs together with their (set id, set name) pairs.

        Args:
            key_only: if True the result is {graph id: (title, children)}, otherwise {(graph id, title): children}
        """
        result = OrderedDict()
        for gid, graph in self.items():
            children = [(sid, self._data[sid].name) for sid in graph.sets]
            if key_only:
                result[gid] = (graph.title, children)
            else:
                result[(gid, graph.title)] = children

        return result

    def list(self):
        """Return (graph id, title) for every graph."""
        return [(gid, graph.title) for gid, graph in self.items()]

    def active(self, key: str, return_val: str = 'both'):
        """
        Return information on the sets obtained by iterating over graph `key`.

        Args:
            key: graph id; a falsy key gives an empty list
            return_val: 'both' for (id, name) tuples, 'id' or 'name' for one of them

        Raises:
            ValueError: if `return_val` is none of the three options
        """
        if not key:
            return []

        if return_val == 'both':
            def pick(item):
                return item.id, item.name
        elif return_val == 'id':
            def pick(item):
                return item.id
        elif return_val == 'name':
            def pick(item):
                return item.name
        else:
            raise ValueError(f'return_val got wrong value {return_val!r}')

        return [pick(self._data[sid]) for sid in self[key]]

    def current_sets(self, key: str):
        """Return (id, name) for every set listed in `sets` of graph `key`; empty list for a falsy key."""
        if not key:
            return []

        return [(self._data[sid].id, self._data[sid].name) for sid in self[key].sets]
class UpperManagement(QtCore.QObject):
newGraph = QtCore.pyqtSignal()
restoreGraph = QtCore.pyqtSignal(str)
deleteGraph = QtCore.pyqtSignal(str)
newData = QtCore.pyqtSignal([list, str], [list, str, bool])
deleteData = QtCore.pyqtSignal(list)
2022-03-08 09:27:40 +00:00
dataChanged = QtCore.pyqtSignal(str)
fitFinished = QtCore.pyqtSignal(list)
stopFit = QtCore.pyqtSignal()
properties_collected = QtCore.pyqtSignal(dict)
unset_state = QtCore.pyqtSignal(list)
_colors = cycle(TUColors)
2022-03-08 09:27:40 +00:00
_actions = {
'ls': (ShiftCommand, 'Left shift'),
'cut': (CutCommand, 'Cut'),
'ap': (ApodizationCommand, 'Apodization'),
'zf': (ZerofillCommand, 'Zerofill'),
2023-01-07 18:13:13 +00:00
'ph': (PhaseCommand, 'Phase correction'),
'autoph': (AutophaseCommand, 'Autophase'),
2022-03-08 09:27:40 +00:00
'bl': (BaselineCommand, 'Baseline'),
'bls': (BaselineSplineCommand, 'Baseline'),
'ft': (FourierCommand, 'Fourier'),
'ft_pake': 'FT (de-paked)',
'sort': (SortCommand, 'Sort'),
'norm': (NormCommand, 'Normalize'),
2023-01-07 18:13:13 +00:00
'center': (CenterCommand, 'Center on max'),
2022-03-08 09:27:40 +00:00
}
def __init__(self, window):
    """
    Create empty data and graph stores, the undo stack and the fit bookkeeping.

    Args:
        window: main window object, stored as `self.window`
    """
    super().__init__()

    self.window = window
    self.namespace = None
    self.counter = 0

    # fit state; worker and thread are created in start_fit
    self._fit_active = False
    self.fit_thread = None
    self.fit_worker = None

    # datasets by id; the graph dictionary looks up set names in the same mapping
    self.data = OrderedDict()
    self.current_graph = None
    self.graphs = GraphDict(self.data)

    self.undostack = QtWidgets.QUndoStack()

    # deleted sets are also taken out of their graphs
    self.deleteData.connect(self.plot_from_graph)

    # created on first use in load_files
    self._filereader = None
2022-03-08 09:27:40 +00:00
def __setitem__(self, key: str, value, **kwargs):
    """
    Store `value` under `key`, wrapping it in a container if it is not one already.

    An ExperimentContainer is stored as is and gets `key` as id. A FitResult is wrapped in a FitContainer,
    a Signal in a SignalContainer and anything else in a PointContainer; `kwargs` go to the container constructor.
    """
    if isinstance(value, ExperimentContainer):
        value.id = key
        container = value
    else:
        if isinstance(value, FitResult):
            wrapper = FitContainer
        elif isinstance(value, Signal):
            wrapper = SignalContainer
        else:
            wrapper = PointContainer
        container = wrapper(key, value, manager=self, **kwargs)

    # forward changes of a single set through the manager's own signal
    container.dataChanged.connect(lambda x: self.dataChanged.emit(x))

    self.data[key] = container
def __getitem__(self, item):
    """Return the dataset stored under id `item`."""
    dataset = self.data[item]
    return dataset
def __contains__(self, item):
    """Return True if `item` is the id of a stored dataset."""
    is_known = item in self.data
    return is_known
def __iter__(self):
    """Yield (id, dataset) pairs of all stored datasets."""
    for entry in self.data.items():
        set_id, dataset = entry
        yield set_id, dataset
@property
def active_sets(self):
    """(id, name) tuples as returned by `GraphDict.active` for the current graph."""
    return self.graphs.active(self.current_graph, return_val='both')
@property
def active_id(self):
    """Ids as returned by `GraphDict.active` for the current graph."""
    graph_key = self.current_graph
    return self.graphs.active(graph_key, return_val='id')
def get_attributes(self, graph_id: str, attr: str) -> dict[str, Any]:
    """Return {set id: value of attribute `attr`} for all sets of graph `graph_id`."""
    values = {}
    for sid in self.graphs[graph_id].sets:
        values[self.data[sid].id] = getattr(self.data[sid], attr)

    return values
2022-03-08 09:27:40 +00:00
def add(self, data, **kwargs):
    """
    Store `data` under a freshly generated uuid4 string and return that id.

    `kwargs` are passed on to `__setitem__`.
    """
    new_id = str(uuid.uuid4())
    # explicit call because item assignment syntax cannot carry keyword arguments
    self.__setitem__(new_id, data, **kwargs)

    return new_id
def load_files(self, fname: list[str], new_plot: str = None):
    """
    Read the given files and add the result as new data.

    Args:
        fname: file names handed to the file reader
        new_plot: passed to `add_new_data` as graph id
    """
    reader = self._filereader
    if reader is None:
        # the reader is created once and kept for later calls
        reader = self._filereader = QFileReader(manager=self)

    self.add_new_data(reader.readfiles(fname), new_plot)
def _load_session(self, sets: dict, graphs: dict):
    """
    Restore sets and graphs of a saved session.

    Args:
        sets: passed to `_load_sets`; its keys are the ids used inside `graphs`
        graphs: iterable of graph states; the keys 'id', 'children', 'active' and 'in_legend' are read here
    """
    new_ids = self._load_sets(sets)

    for state in graphs:
        # the stored graph id is dropped before the state is handed to the graph window
        state.pop('id')

        graph = QGraphWindow.set_state(state)
        self.graphs[graph.id] = graph
        self.restoreGraph.emit(graph.id)

        # translate stored set ids to the ids created while loading
        children = [new_ids[c] for c in state['children']]
        active = [new_ids[c] for c in state['active']]
        inactive = [c for c in children if c not in active]

        self.newData.emit(children, graph.id)
        graph.active = active

        legend_list = graph.listWidget
        legend_list.blockSignals(True)
        for row, in_legend in enumerate(state['in_legend']):
            check_state = QtCore.Qt.Checked if in_legend else QtCore.Qt.Unchecked
            try:
                legend_list.item(row).setCheckState(check_state)
            except AttributeError:
                # rows whose item lacks setCheckState (e.g. no item) are skipped on purpose
                pass
        legend_list.blockSignals(False)

        # set unchecked in tree and hide/show in plot
        self.unset_state.emit(inactive)
        self.change_visibility(active, inactive)
def _load_sets(self, sets: dict) -> dict:
    """
    Add stored sets and restore their symbol and line options.

    Args:
        sets: {stored id: (data, options)}; options may have the keys 'real' and 'imag',
            each a pair of keyword dicts for `setSymbol` and `setLine`

    Returns:
        Mapping of stored id to the id of the newly added set.
    """
    id_map = {}

    for old_id, (data, opts) in sets.items():
        extra = {}
        parent = None
        if isinstance(data, FitResult) and data.idx in id_map:
            # for fits, idx belongs to the fitted data, not the fit
            parent = id_map[data.idx]
            extra['src'] = parent

        new_id = self.add(data, **extra)
        if parent is not None:
            self.data[parent]._fits.append(new_id)

        id_map[old_id] = new_id

        for mode in ('real', 'imag'):
            if mode not in opts:
                continue
            self.data[new_id].setSymbol(**opts[mode][0], mode=mode)
            self.data[new_id].setLine(**opts[mode][1], mode=mode)

    return id_map
def add_new_data(self, data: list, gid: str):
    """
    Add loaded items and announce the new sets.

    Tuples of length two are treated as sessions (sets, graphs) and restored completely; for other tuples
    only the first element is loaded as sets. Everything else is added as a single dataset.

    Args:
        data: items as described above
        gid: graph id for the new sets; a falsy value is replaced by ''
    """
    added = []

    for entry in data:
        if not isinstance(entry, tuple):
            added.append(self.add(entry))
        elif len(entry) == 2:
            # sessions emit their own signals, so nothing is collected here
            self._load_session(entry[0], graphs=entry[1])
        else:
            added.extend(list(self._load_sets(entry[0]).values()))

    if added:
        self.newData.emit(added, gid or '')
def plots_to_graph(self, plotkeys: list, gid: str):
    """Add the plots of the sets `plotkeys` to graph `gid` and store `gid` as graph of each set."""
    plot_items = [self.data[key].plots for key in plotkeys]
    self.graphs[gid].add(plotkeys, plot_items)

    for key in plotkeys:
        self.data[key].graph = gid
@QtCore.pyqtSlot(list)
def plot_from_graph(self, key: list[str]):
    """Remove the sets with the given ids from their graphs, one `remove` call per graph."""
    by_graph = {}
    for sid in key:
        by_graph.setdefault(self.data[sid].graph, []).append(sid)

    for gid, members in by_graph.items():
        self.graphs[gid].remove(members)
2022-03-08 09:27:40 +00:00
@QtCore.pyqtSlot(list, str, str)
def move_sets(self, sets: list, dest: str, src: (str|list), pos: int = -1):
    """
    Move sets to graph `dest` and to position `pos` inside it.

    Args:
        sets: ids of the sets to move
        dest: id of the destination graph
        src: id of the source graph, either one for all sets or one per set
        pos: passed on to `move_sets` of the destination graph
    """
    sources = [src]*len(sets) if isinstance(src, str) else src

    for origin, set_id in zip(sources, sets):
        if origin == dest:
            # already in the right graph, only the position may change
            continue
        # move all plots to the same graph
        self.graphs[origin].remove(set_id)
        self.plots_to_graph([set_id], dest)

    # move to correct position
    self.graphs[dest].move_sets(sets, pos)
def select_window(self, gid: str):
    """Make the parent of graph `gid` the active sub-window; unknown ids are ignored."""
    if gid in self.graphs:
        self.window.area.setActiveSubWindow(self.graphs[gid].parent())
2022-03-08 09:27:40 +00:00
@QtCore.pyqtSlot()
@QtCore.pyqtSlot(list, str)
def copy_sets(self, sets: list = None, src: str = None):
    """
    Duplicate sets and announce the copies.

    Args:
        sets: ids of the sets to copy; defaults to the active sets of the current graph
        src: graph id emitted together with the new ids; defaults to the current graph

    Returns:
        List of ids of the copies.
    """
    if sets is None:
        sets = self.graphs[self.current_graph].active[:]
    if src is None:
        src = self.current_graph

    copied_ids = []
    for sid in sets:
        duplicate = self.data[sid].copy(full=True)
        duplicate.id = str(uuid.uuid4())
        # stored directly in the data mapping, not through __setitem__ of the manager
        self.data[duplicate.id] = duplicate
        copied_ids.append(duplicate.id)

    self.newData.emit(copied_ids, src)

    return copied_ids
@QtCore.pyqtSlot(list)
@QtCore.pyqtSlot(str)
@QtCore.pyqtSlot()
def delete_sets(self, rm_sets: list = None):
    """
    Delete sets and graphs as one undo macro named 'Delete'.

    Args:
        rm_sets: ids of sets and/or graphs; defaults to the current graph and all of its sets.
            Ids that are neither a set nor a graph are logged and skipped.
    """
    if rm_sets is None:
        rm_sets = self.graphs[self.current_graph].sets + [self.current_graph]

    graphs_to_remove = []
    sets_by_graph = {}

    self.undostack.beginMacro('Delete')

    # ids are processed back to front
    for key in reversed(rm_sets):
        if key in self.data:
            sets_by_graph.setdefault(self.data[key].graph, []).append(key)
        elif key in self.graphs:
            graphs_to_remove.append(key)
        else:
            logger.warning(f'delete_sets: {key} is not in data or graph found')

    # sets first, one command per graph, then the graphs themselves
    for gid, members in sets_by_graph.items():
        self.undostack.push(
            DeleteCommand(self.data, members, self.graphs, gid, self.newData, self.deleteData)
        )

    for gid in graphs_to_remove:
        self.undostack.push(
            DeleteGraphCommand(self.graphs, gid, self.restoreGraph, self.deleteGraph)
        )

    self.undostack.endMacro()
def delete_graph(self, gid):
    """Delete graph `gid` together with all of its sets."""
    targets = self.graphs[gid].sets + [gid]
    self.delete_sets(targets)
2022-03-08 09:27:40 +00:00
@QtCore.pyqtSlot()
def cat(self, src_sets=None):
    """
    Concatenate sets into one new set and add it to the current graph.

    The first set is copied and x, y, y_err and mask of every following set are appended to that copy.
    Name and group of the result are the distinct names and groups joined by '+' in order of first appearance.
    The value is kept if all sets share the same value and is 0.0 otherwise.

    Args:
        src_sets: ids of the sets to join; defaults to the active sets of the current graph.
            Nothing happens if it is None and there is no current graph.
    """
    if src_sets is None:
        if not self.current_graph:
            return
        src_sets = self.graphs[self.current_graph].active

    joined = None
    # dicts instead of sets keep first-seen order, so the joined name is the same on every run
    groups = {}
    names = {}
    value_set = set()

    for sid in src_sets:
        data_i = self.data[sid]
        if joined is None:
            joined = data_i.copy()
        else:
            joined.append(data_i.data.x, data_i.data.y, y_err=data_i.data.y_err, mask=data_i.data.mask)

        names[data_i.name] = None
        groups[data_i.group] = None
        value_set.add(data_i.value)

    if joined is None:
        return

    joined.group = '+'.join(groups)
    joined.name = '+'.join(names)
    joined.value = value_set.pop() if len(value_set) == 1 else 0.0

    self.newData.emit([self.add(joined)], self.current_graph)
def get_data(self, sid: str, xy_only: bool = False):
    """
    Return data for a given id.

    With xy_only the result is the list [x, y] read from the container itself.
    Otherwise it is a tuple of the list [x, y, y_err] and `mask.data`, all read from the `data` attribute
    of the container.
    """
    container = self.data[sid]

    if not xy_only:
        raw = container.data
        return [raw.x, raw.y, raw.y_err], raw.mask.data

    return [container.x, container.y]
def change_visibility(self, selected: list, deselected: list):
    """Show the sets in `selected` and hide the sets in `deselected`, each in its own graph."""
    for set_id in selected:
        graph = self.graphs[self.data[set_id].graph]
        graph.show_item([set_id])

    for set_id in deselected:
        graph = self.graphs[self.data[set_id].graph]
        graph.hide_item([set_id])
@QtCore.pyqtSlot(str, str)
def change_keys(self, identifier: str, name: str):
    """
    Rename a set or a graph.

    For a set the legend of its graph is updated too; for a graph the title is changed.

    Raises:
        KeyError: if `identifier` is neither a set nor a graph
    """
    if identifier in self.data:
        dataset = self.data[identifier]
        dataset.name = name
        self.graphs[dataset.graph].update_legend(identifier, name)
        return

    if identifier in self.graphs:
        self.graphs[identifier].title = name
        return

    raise KeyError('Unknown ID ' + str(identifier))
@QtCore.pyqtSlot(str, tuple)
def apply(self, func: str, arguments: tuple):
    """
    Apply an edit action to every set obtained by iterating over the current graph, as one undo macro.

    Args:
        func: key of `_actions`
        arguments: positional arguments passed to the undo command after the dataset

    Raises:
        KeyError: if `func` is not a key of `_actions`
    """
    action = self._actions[func]
    if not isinstance(action, tuple):
        # some entries (e.g. 'ft_pake') carry only a label and no command class;
        # unpacking such an entry would fail, so report it and do nothing
        logger.warning(f'apply: no undo command available for action {func!r}')
        return

    # undos, names displayed by undo action
    cmd, cmd_text = action

    self.undostack.beginMacro(cmd_text)
    try:
        for sid in self.graphs[self.current_graph]:
            self.undostack.push(cmd(self.data[sid], *arguments))
    finally:
        # close the macro even if a command fails, otherwise later commands would be nested into it
        self.undostack.endMacro()
def cut(self):
    """Apply the 'cut' action with the x limits of the current graph; no-op without a current graph."""
    if not self.current_graph:
        return

    x_limits, _ = self.graphs[self.current_graph].ranges
    self.apply('cut', x_limits)
2022-03-08 09:27:40 +00:00
@QtCore.pyqtSlot()
def unmask(self):
    """Replace the mask of every dataset by an all-True mask of the same shape."""
    for dataset in self.data.values():
        full_mask = np.ones_like(dataset.mask, dtype=bool)
        dataset.mask = full_mask
2023-09-07 17:52:53 +00:00
def prepare_fit(self, parameter: dict, links: list, fit_options: dict) -> bool:
    """
    Build a new FitRoutine from the given settings and store it as `self.fitter`.

    Args:
        parameter: {model id: settings}; the keys 'func', 'complex', 'data_parameter' and 'parameter'
            of the settings are read here
        links: entries (model id, parameter, model id, parameter) passed to `set_link_parameter` of the fitter
        fit_options: the keys 'limits', 'fit_mode' and 'we' are read here

    Returns:
        True if the fitter was set up. False if a fit is flagged as active or if anything failed;
        a failure is logged and shown in a message box.
    """
    if self._fit_active:
        return False

    # remembered for redo_fits
    self.__fit_options = (parameter, links, fit_options)

    self.fitter = FitRoutine()

    models = {}
    fit_limits = fit_options['limits']
    fit_mode = fit_options['fit_mode']
    we_option = fit_options['we']

    self.fitter.fitmethod = fit_mode

    # all-encompassing error catch
    try:
        for model_id, model_p in parameter.items():
            m = model_p['func']
            models[model_id] = m

            m_complex = model_p['complex']

            # sets are not in active order but in order they first appeared in fit dialog,
            # so sort the ids by their position in the list of active sets
            try:
                active_ids = self.active_id
                ordered_ids = sorted(model_p['data_parameter'], key=active_ids.index)
            except ValueError as e:
                raise Exception('Getting order failed') from e

            for set_id in ordered_ids:
                try:
                    data_i = self.data[set_id]
                except KeyError as e:
                    raise KeyError(f'{set_id} not found') from e

                try:
                    set_params = model_p['parameter'][set_id]
                except KeyError as e:
                    raise KeyError(f'No parameter found for {set_id}') from e

                if we_option.lower() == 'deltay':
                    we = data_i.y_err**2
                else:
                    we = we_option

                if m_complex is None or m_complex == 1:
                    _y = data_i.y.real
                elif m_complex == 2 and np.iscomplexobj(data_i.y):
                    _y = data_i.y.imag
                else:
                    _y = data_i.y

                _x = data_i.x

                if fit_limits == 'none':
                    inside = slice(None)
                elif fit_limits == 'x':
                    # NOTE(review): cut() reads `.ranges` from the graph, this reads `.range` - confirm
                    x_lim, _ = self.graphs[self.current_graph].range
                    inside = np.where((_x >= x_lim[0]) & (_x <= x_lim[1]))
                else:
                    inside = np.where((_x >= fit_limits[0]) & (_x <= fit_limits[1]))

                try:
                    if isinstance(we, str):
                        d = fit_d.Data(_x[inside], _y[inside], we=we, idx=set_id)
                    else:
                        # array weights have to be restricted to the same points
                        d = fit_d.Data(_x[inside], _y[inside], we=we[inside], idx=set_id)
                except Exception as e:
                    raise Exception(f'Setting data failed for {set_id}') from e

                d.set_model(m)
                try:
                    d.set_parameter(set_params[0], fun_kwargs=set_params[1])
                except Exception as e:
                    raise Exception('Setting parameter failed') from e

                self.fitter.add_data(d)

        for links_i in links:
            self.fitter.set_link_parameter((models[links_i[0]], links_i[1]),
                                           (models[links_i[2]], links_i[3]))

        return True

    except Exception as e:
        logger.error(f'Fit preparation failed: {e!r}')
        QtWidgets.QMessageBox.warning(QtWidgets.QWidget(),
                                      'Fit prep failed',
                                      f'Fit preparation failed:\n'
                                      'Message:\n'
                                      f'{e.args}:\n')

        return False
2022-03-08 09:27:40 +00:00
2023-09-07 17:52:53 +00:00
def start_fit(self):
    """Run the prepared fitter in a FitWorker on a new QThread; `end_fit` receives the result."""
    with busy_cursor():
        self.fit_worker = FitWorker(self.fitter)
        self.fit_thread = QtCore.QThread()

        worker, thread = self.fit_worker, self.fit_thread
        worker.moveToThread(thread)

        thread.started.connect(worker.run)

        # when the worker is done: handle the result, stop the thread and dispose of both objects
        worker.finished.connect(self.end_fit)
        worker.finished.connect(thread.quit)
        worker.finished.connect(worker.deleteLater)
        thread.finished.connect(thread.deleteLater)

        # the lambda looks up the worker at the time stopFit is emitted
        self.stopFit.connect(lambda: self.fit_worker.fitter.abort())

        thread.start()
@QtCore.pyqtSlot(list, bool)
def end_fit(self, result: list, success: bool):
    """
    Handle the end of a fit.

    Args:
        result: the fit results, or a list with the exception as first element if the fit failed
        success: whether the fit succeeded

    `fitFinished` is emitted with the results on success and with an empty list after a failure,
    which is also logged and shown in a message box.
    """
    if not success:
        error = result[0]
        logger.exception(error, exc_info=True)
        QtWidgets.QMessageBox.warning(QtWidgets.QWidget(), 'Fit failed',
                                      f'Fit kaput with exception: \n\n{error!r}')
        self.fitFinished.emit([])
    else:
        logger.info('Successful fit')
        self.fitFinished.emit(result)

    self._fit_active = False
@QtCore.pyqtSlot(dict)
def redo_fits(self, res: dict):
    """
    Repeat the last fit, starting from the parameter values in `res`.

    Args:
        res: {set id: fit result}; the values of its `parameter` mapping replace the stored start values
    """
    stored_models = self.__fit_options[0]

    for model_args in stored_models.values():
        start_values = model_args['parameter']
        # only existing keys are re-assigned, so the dict can be modified while iterating
        for set_id, previous in start_values.items():
            fitted = [p.value for p in res[set_id].parameter.values()]
            start_values[set_id] = (fitted, previous[1])

    if self.prepare_fit(*self.__fit_options):
        self.start_fit()
2022-03-08 09:27:40 +00:00
def make_fits(self, res: dict, opts: list, param_graph: str, show_fit: bool, parts: bool, extrapolate: list) -> None:
    """
    Turn accepted fit results into new sets and, optionally, parameter plots.

    Args:
        res: key is that of original data, value is FitResult
        opts: one (reject this fit, delete previous fits) pair per entry of `res`, in the same order
        param_graph: '-1' if no parameters are plotted, otherwise passed as graph id to `make_fit_parameter`
        show_fit: add the fit curve as new set?
        parts: add the results of `fit.sub` as dashed curves?
        extrapolate: (x minimum, x maximum, number of points); each entry may be None to keep the
            respective default. If all are None the fit keeps its x values.
    """
    f_id_list = []
    gid = ''
    tobedeleted = []
    accepted = []

    for i, (k, fit) in enumerate(res.items()):
        reject, delete_prev = opts[i]
        if reject:
            continue

        if not all(e is None for e in extrapolate):
            spacefunc = np.geomspace if fit.islog else np.linspace

            # defaults, each replaced by the respective entry of extrapolate if that is given
            xmin = fit.x.min()
            xmax = fit.x.max()
            len_data = len(fit.x_data)
            num_pts = 20*len_data-9 if len_data < 51 else 3*len_data

            if extrapolate[0] is not None:
                xmin = extrapolate[0]
            if extrapolate[1] is not None:
                xmax = extrapolate[1]
            if extrapolate[2] is not None:
                num_pts = extrapolate[2]

            _x = spacefunc(xmin, xmax, num=num_pts)
            fit = fit.with_new_x(_x)

        data_k = self.data[k]
        if delete_prev:
            tobedeleted.extend([f.id for f in data_k.get_fits()])
            data_k.set_fits([])

        # the fit gets the symbol color of the data, or its line color if the data has no symbols
        syms = data_k.plot_real.symbol
        if syms == SymbolStyle.No:
            color = data_k.plot_real.linecolor
        else:
            color = data_k.plot_real.symbolcolor

        fit.value = data_k.value
        fit.group = data_k.group
        accepted.append(fit)

        data_name = f' ({data_k.name})'

        # stays None if the fit curve itself is not added for this set
        f_id = None
        if show_fit:
            fit.name += data_name

            f_id = self.add(fit, color=color, src=k)
            f_id_list.append(f_id)
            data_k.set_fits(f_id)

        if parts:
            color_scheme = available_cycles['colorblind']
            for subfunc, col in zip(fit.sub(fit.x), cycle(color_scheme)):
                subfunc.value = data_k.value
                subfunc.group = data_k.group
                subfunc.name += data_name
                sub_f_id = self.add(subfunc, color=col, linestyle=LineStyle.Dashed, symbol=SymbolStyle.No)

                # relations need the id of the fit curve, which only exists if it was added above
                if f_id is not None:
                    self[sub_f_id].add_relation(Relations.isFitPartOf, f_id)
                    self[f_id].add_relation(Relations.hasFitPart, sub_f_id)

                f_id_list.append(sub_f_id)

        gid = data_k.graph

    self.delete_sets(tobedeleted)

    if accepted and (param_graph != '-1'):
        self.make_fit_parameter(accepted, graph_id=param_graph)

    self.newData.emit(f_id_list, gid)
2022-03-08 09:27:40 +00:00
def extend_fits(self, set_id: list, x_range: np.ndarray):
    """
    Add copies of fits, evaluated on `x_range`, together with their sub-functions.

    Args:
        set_id: ids of the sets to extend
        x_range: new x values passed to `with_new_x`

    The new sets are announced once per graph of the original sets.
    """
    per_graph = {}
    palette = available_cycles['colorblind']

    for sid in set_id:
        original = self[sid]

        fit = original.copy(full=True, keep_color=True)
        fit.data = fit.data.with_new_x(x_range)

        new_ids = per_graph.setdefault(original.graph, [])
        new_ids.append(self.add(fit))

        for subfunc, col in zip(fit.data.sub(fit.x), cycle(palette)):
            subfunc.value = fit.value
            subfunc.group = fit.group
            new_ids.append(self.add(subfunc, color=col, linestyle=LineStyle.Dashed, symbol=SymbolStyle.No))

    for graph_id, new_ids in per_graph.items():
        self.newData.emit(new_ids, graph_id)
def make_fit_parameter(self, fit_sets: list[str | FitResult], graph_id: str = None):
    """
    Add one Points set per fit parameter, holding its values over all given fits.

    Args:
        fit_sets: ids of fit sets or FitResult objects, see `_collect_fit_parameter`
        graph_id: graph id emitted with the new sets; a falsy value is replaced by ''
    """
    collected = self._collect_fit_parameter(fit_sets)
    if not collected:
        return

    new_ids = []
    for rows, label in collected.values():
        # rows are (value of set, parameter value, error); transpose to get one array per column
        columns = np.array(rows).T
        points = Points(x=columns[0], y=columns[1], y_err=columns[2], name=label)
        new_ids.append(self.add(points))

    self.newData[list, str, bool].emit(new_ids, graph_id if graph_id else '', True)
2022-03-08 09:27:40 +00:00
def save_fit_parameter(self, fname: str | pathlib.Path, fit_sets: list[str] = None):
    """
    Call `save_parameter(fname)` for every given set whose mode is 'fit'.

    Args:
        fname: passed on to `save_parameter`
        fit_sets: ids of the sets to consider; defaults to the active sets of the current graph
    """
    if fit_sets is None:
        fit_sets = list(self.active_id)

    for set_id in fit_sets:
        candidate = self.data[set_id]
        if candidate.mode == 'fit':
            candidate.data.save_parameter(fname)
def _collect_fit_parameter(self, fit_sets: list[str | FitResult]) -> dict:
    """
    Gather parameter values of several fits.

    Args:
        fit_sets: ids of sets (those with a mode other than 'fit' are skipped) or FitResult objects;
            entries of any other type are ignored

    Returns:
        {parameter key: [rows, parameter key]} where every row is [value of the fit, parameter value, error]
        and a missing error is stored as 0.
    """
    collected = {}

    for entry in fit_sets:
        if isinstance(entry, str):
            source = self.data[entry]
            if source.mode != 'fit':
                continue
        elif isinstance(entry, FitResult):
            source = entry
        else:
            continue

        for fit_key, pvalue in source.parameter.items():
            rows = collected.setdefault(fit_key, [[], fit_key])[0]
            error = pvalue.error if pvalue.error is not None else 0
            rows.append([source.value, pvalue.value, error])

    return collected
@QtCore.pyqtSlot(dict, str)
def extract_points(self, params: dict, gid: str):
    """
    Pick points from the active sets of the current graph and add them as new sets, one series per group.

    Args:
        params: the key 'xy' is removed and read as (make x sets, make y sets); the rest goes to `points`
            of every set
        gid: graph id emitted with the new sets
    """
    want_x, want_y = params.pop('xy')[0:2]

    # group -> (values of the sets, picked points of the sets)
    grouped = {}
    for sid in self.graphs[self.current_graph].active:
        data_i = self.data[sid]
        axis, picks = grouped.setdefault(data_i.group, ([], []))
        axis.append(data_i.value)
        picks.append(data_i.points(params))

    key_list = []
    for label, (axis, picks) in grouped.items():
        picks = np.array(picks)  # (number of sets, number of picks, (x, y, y_err))
        for i in range(picks.shape[1]):
            if want_x:
                key_list.append(self.add(Points(x=axis, y=picks[:, i, 0], name=label)))
            if want_y:
                key_list.append(self.add(Points(x=axis, y=picks[:, i, 1], y_err=picks[:, i, 2], name=label)))

    self.newData[list, str, bool].emit(key_list, gid, True)
2022-03-08 09:27:40 +00:00
@QtCore.pyqtSlot(list)
def get_properties(self, sid: list) -> dict:
    """
    Emit and return the properties of the last known set in `sid`.

    Unknown ids are skipped; if none is known an empty dict is emitted and returned.
    """
    props = {}
    known = (key for key in sid if key in self.data)
    for key in known:
        # every known set overwrites the previous result, so the last one wins
        props = self.data[key].get_properties()

    self.properties_collected.emit(props)

    return props
@QtCore.pyqtSlot(list, str, str, object)
def update_property(self, sid: list, key1: str, key2: str, value: Any):
    """Call `update_property(key1, key2, value)` on every set in `sid`."""
    for set_id in sid:
        target = self.data[set_id]
        target.update_property(key1, key2, value)
def create_empty(self):
    """Add a ten-point set of noisy y = x data with random errors to the current graph."""
    import numpy.random as random

    x = np.arange(10)
    # random numbers are drawn for y first and for y_err second
    y = np.arange(10) + random.rand(10)-0.5
    y_err = random.rand(10)

    new_id = self.add(Points(x=x, y=y, y_err=y_err, name='Das Sein und das Nichts'))
    self.newData.emit([new_id], self.current_graph)
@QtCore.pyqtSlot(tuple, dict, str)
def calc_mean(self, dist_params, conversion, graph):
    """
    Convert distribution parameters with `dist.convert` and add the result as a new set.

    Args:
        dist_params: (distribution, arguments); every argument is either a float or the id of a set
            whose y values are used
        conversion: keyword arguments for `dist.convert`; 'to_' is also used for the default name
        graph: graph id emitted with the new set

    x values are taken from the first argument that is a set, or 0 if there is none. Name and value
    are taken from the first argument if that is a set.
    """
    dist, args = dist_params
    parameter = []

    x = None
    # default name if the first argument is a plain number
    name = f'tau ({conversion["to_"]})'
    value = 0.

    for i, p in enumerate(args):
        if isinstance(p, float):
            parameter.append(p)
            continue

        source = self.data[p]
        if x is None:
            x = source.x
        if i == 0:
            name = source.name
            value = source.value
        parameter.append(source.y)

    if x is None:
        x = 0

    key = self.add(Points(x, dist.convert(*parameter, **conversion), name=name, value=value))
    self.newData.emit([key], graph)

    # the calling dialog gets the updated graph tree
    self.sender().update_graphs(self.graphs.tree(key_only=True))
@QtCore.pyqtSlot(list, str, bool, bool, tuple, str)
def interpolate_data(self, data_ids, mode, xlog, ylog, new_axis, dest_graph):
    """
    Interpolate sets onto a new x axis and add the results to `dest_graph`.

    Args:
        data_ids: ids of the sets to interpolate
        mode: passed as `kind` to `interpolate`
        xlog: passed on to `interpolate`
        ylog: passed on to `interpolate`
        new_axis: either (start, end, steps, logarithmic?) or a tuple whose first entry is the id of
            the set that provides the x values
        dest_graph: graph id emitted with the new sets
    """
    if len(new_axis) != 4:
        new_x = self.data[new_axis[0]].x
    else:
        start, end, steps, loggy = new_axis
        if loggy:
            new_x = np.logspace(np.log10(start), np.log10(end), steps)
        else:
            new_x = np.linspace(start, end, steps)

    new_key = [
        self.add(interpolate(self.data[ids], new_x, xlog=xlog, ylog=ylog, kind=mode, extrapolate=True))
        for ids in data_ids
    ]

    self.newData.emit(new_key, dest_graph)
2023-06-19 18:03:21 +00:00
def binning(self, digits: float):
    """Add the result of `binning(digits=digits)` of every active set to the current graph."""
    active_ids = self.graphs[self.current_graph].active
    new_data = [self.add(self.data[sid].binning(digits=digits)) for sid in active_ids]

    self.newData.emit(new_data, self.current_graph)
2023-06-15 17:57:13 +00:00
2023-07-05 15:35:31 +00:00
@QtCore.pyqtSlot(dict, str)
def addTg(self, dic: dict, dtype: str):
    """
    Add the results of an evaluation as new sets.

    Args:
        dic: for 'hodge' every value is added as a set; otherwise {id of source set: (data, lines)}.
            Unless `dtype` is 'tg' the key 'graph' is removed first and used as destination graph.
        dtype: 'tg' (sets go to the current graph), 'hodge', 'tnmh' or another type handled like 'tg'
            apart from the destination
    """
    graph_id = self.current_graph if dtype == 'tg' else dic.pop('graph')

    if dtype == 'hodge':
        set_id_list = [self.add(v) for v in dic.values()]
    else:
        set_id_list = []
        for src_id, (data, lines) in dic.items():
            # new sets inherit the line color of the set they were calculated from
            col = self[src_id].plot_real.linecolor

            if data is not None:
                set_id_list.append(self.add(data, color=col))

            if dtype == 'tnmh':
                # a single line or nothing at all is given for this type
                lines = [] if lines is None else [lines]

            for line in lines:
                line_id = self.add(line, color=col)
                self[line_id].setLine(style=LineStyle.Dashed)
                self[line_id].setSymbol(symbol=SymbolStyle.No)
                set_id_list.append(line_id)

    self.newData.emit(set_id_list, graph_id)
2023-07-05 17:51:06 +00:00
2022-03-08 09:27:40 +00:00
@QtCore.pyqtSlot(int, dict)
def smooth_data(self, npoints, param_kwargs):
    """
    Smooth the active sets of the current graph and add the results to that graph.

    Args:
        npoints: passed on to `smooth`
        param_kwargs: keyword arguments passed on to `smooth`

    A set that fails is reported in a message box and skipped.
    """
    smoothed_ids = []

    for sid in self.graphs[self.current_graph].active:
        try:
            smoothed_ids.append(self.add(smooth(self.data[sid], npoints, **param_kwargs)))
        except Exception as e:
            # deliberately broad: one failing set must not stop the others
            QtWidgets.QMessageBox().warning(self.window,
                                            'Smoothing failed!',
                                            f'Smoothing failed for {self.data[sid].name} with exception:\n{e.args}')

    if smoothed_ids:
        self.newData.emit(smoothed_ids, self.current_graph)
@QtCore.pyqtSlot()
def set_cycle(self, set_idx: list, cycle_name: str):
    """Color symbols and lines of the given sets with consecutive colors of the cycle `cycle_name`."""
    palette = cycle(available_cycles[cycle_name])

    for s_id in set_idx:
        next_color = next(palette)
        self.data[s_id].setColor(next_color, symbol=True, line=True, mode='all')
2022-03-08 09:27:40 +00:00
@QtCore.pyqtSlot(dict, tuple)
def shift_scale(self, values: dict, options: tuple):
    """
    Shift and scale sets, in place or as copies, and optionally plot the used values.

    Args:
        values: {set id: (first, second)}; both entries are passed to `shift_scale` of the set and their
            concatenation is read as (x shift, y shift, x scale, y scale)
        options: (copy_data, value_plot). If copy_data is None the sets are changed in place, otherwise
            copies are added to graph `copy_data`. If value_plot is not None, every shift or scale that
            differs from its default (0 for shifts, 1 for scales) for at least one set is added to graph
            `value_plot` as a set over the values of the sets.
    """
    copy_data, value_plot = options

    sid_list = []
    shift_y = []
    shift_x = []

    for k, v in values.items():
        d_k = self.data[k]

        if copy_data is None:
            d_k.shift_scale(v[0], v[1])
        else:
            new_data = d_k.copy(full=True)
            new_data.shift_scale(v[0], v[1])
            sid_list.append(self.add(new_data))

        shift_x.append(d_k.value)
        shift_y.append(v[0]+v[1])

    # in-place changes create no sets, and None is not a valid graph id for the signal
    if copy_data is not None:
        self.newData.emit(sid_list, copy_data)

    # without any set there is nothing to plot and the 2d indexing below would fail
    if value_plot is not None and shift_x:
        sid_list = []
        shift_y = np.array(shift_y)

        for i, (mode, default) in enumerate([('x shift', 0.), ('y shift', 0.),
                                             ('x scale', 1.), ('y scale', 1.), ]):
            if np.all(shift_y[:, i] == default):
                continue
            data = Points(shift_x, shift_y[:, i], name=mode)
            sid_list.append(self.add(data))

        self.newData.emit(sid_list, value_plot)
@QtCore.pyqtSlot(list)
def convert_sets(self, src: list):
    """
    Convert sets to another data type, or merge two sets into real and imaginary part of one set.

    Args:
        src: entries are (real set, imaginary set, graph, type) for a merge, where one of the two ids may
            be '', or (source set, graph, type) for a conversion. Type is an index into
            [Points, FID, Spectrum, BDS].

    Merges of sets with different lengths and conversions to the type a set already has are skipped
    and listed in a message box at the end.
    """
    type_list = [Points, FID, Spectrum, BDS]
    new_graph = {}
    error_list = []

    for sets in src:
        # merge: sets (real, imag, graph, type)
        # normal: sets (source set, graph, type)
        graph_id = sets[-2]
        new_type = type_list[sets[-1]]

        if len(sets) != 4:
            data = self.data[sets[0]]
            if isinstance(data.data, new_type):
                error_list.append(f'{data.name} is alreade of type {new_type.__name__}')
                continue

            new_data = new_type(data.x, np.zeros(data.x.size))
            new_data.y.real = data.y.real

        else:
            real_set, imag_set = sets[0], sets[1]

            if real_set == '':
                # only an imaginary part is given, the real part stays zero
                data = self.data[imag_set]
                new_data = new_type(data.x, np.zeros(data.x.size))
                new_data.y.imag = data.y.real
            else:
                data = self.data[real_set]
                new_data = new_type(data.x, data.y.real)
                if imag_set != '':
                    imag_data = self.data[imag_set]
                    if len(imag_data) != len(data):
                        error_list.append(f'Lengths mismatch of {data.name} ({len(data)}) and {imag_data.name} ({len(imag_data)})')
                        continue
                    new_data.y.imag = imag_data.y.real

        new_data.update(data.opts)
        new_id = self.add(data.change_type(new_data))
        new_graph.setdefault(graph_id, []).append(new_id)

    for g, s in new_graph.items():
        self.newData.emit(s, g)

    if error_list:
        err_string = "\n- ".join(error_list)
        _ = QtWidgets.QMessageBox.information(QtWidgets.QWidget(), 'Something was skipped',
                                              f'Some conversions were skipped:\n{err_string}')
def get_namespace(self):
    """
    Create a new Namespace, add the namespace of every set in every graph, store and return it.

    Every set is added under the parents ('Data', '<set name> (<graph title>)').
    """
    from ..lib.namespace import Namespace

    namespace = Namespace(basic=True, const=True, fitfuncs=True)
    self.namespace = namespace

    for graph_pos, graph in enumerate(self.graphs.values()):
        for set_pos, sid in enumerate(graph.sets):
            dataset = self.data[sid]
            entry_label = f'{dataset.name} ({graph.title})'
            namespace.add_namespace(dataset.get_namespace(graph_pos, set_pos), parents=('Data', entry_label))

    return self.namespace
@QtCore.pyqtSlot(list, list, bool)
def eval_expression(self, cmds: list, set_ids: list, overwrite: bool):
    """
    Evaluate expressions on the given sets.

    Args:
        cmds: commands passed to `eval_expression` of every set
        set_ids: ids of the sets to evaluate
        overwrite: if True the sets are changed through undo commands collected in one macro,
            otherwise the result is stored in a copy of each set

    Sets that fail are logged, skipped and listed in a message box afterwards. The sender gets
    `success` set accordingly and receives the active sets through `add_data`.
    """
    ns = self.namespace.flatten()

    if overwrite:
        self.undostack.beginMacro('Evaluate expression')

    failures = []
    try:
        for i, g in enumerate(self.graphs.values()):
            for j, sid in enumerate(g.sets):
                if sid not in set_ids:
                    continue

                data_i = self.data[sid]
                try:
                    # use a copy of original namespace
                    new_data = data_i.eval_expression(cmds, dict(ns), i=i, j=j)
                    if overwrite:
                        cmd = EvalCommand(self.data, sid, new_data, 'Evaluate expression')
                        self.undostack.push(cmd)
                    else:
                        new_id = self.copy_sets(sets=[sid])
                        self.data[new_id[0]].data = new_data
                except Exception as e:
                    failures.append((data_i, e))
                    # exception arguments are not necessarily strings, so convert before joining
                    reason = ''.join(str(a) for a in e.args)
                    logger.warning(str(data_i) + ' failed with Exception: ' + reason)
                    continue
    finally:
        # never leave the macro open, later undo commands would end up inside of it
        if overwrite:
            self.undostack.endMacro()

    if failures:
        err_msg = QtWidgets.QMessageBox(parent=self.sender())
        err_msg.setText('One or more errors occured during evaluation.')
        err_msg.setDetailedText('\n'.join(f'{d.name} failed with error: {err.args}' for d, err in failures))
        err_msg.exec()

    self.sender().success = not failures
    self.sender().add_data(self.active_sets)
@QtCore.pyqtSlot(list, dict)
def create_from_function(self, cmds: list, opts: dict):
    """
    Create a new set from commands that define x, y and y_err.

    Args:
        cmds: statements executed in order in a copy of the flattened namespace; afterwards the names
            'x', 'y' and 'y_err' are read from it
        opts: 'dtype' (index into [Points, FID, Spectrum, BDS]), 'name', 'val' and 'graph' are removed
            and used here, the rest is passed to `add`

    On failure the error is logged and shown in a message box. The sender gets `success` set accordingly.
    """
    ns = dict(self.namespace.flatten())

    dtype = [Points, FID, Spectrum, BDS][opts.pop('dtype')]

    try:
        for c in cmds:
            # NOTE(review): runs arbitrary code given by the caller - only acceptable for trusted input
            exec(c, globals(), ns)

        name = opts.pop('name')
        value = opts.pop('val')
        graph = opts.pop('graph')

        data = dtype(x=ns['x'], y=ns['y'], y_err=ns['y_err'], name=name, value=value)
        s_id = self.add(data, **opts)
        self.sender().success = True
        self.newData.emit([s_id], graph)

    except Exception as err:
        # exception arguments are not necessarily strings; joining them unconverted could raise
        # inside this handler and skip the failure report below
        reason = ', '.join(str(a) for a in err.args)
        logger.exception('Creation failed with error: ' + reason)

        err_msg = QtWidgets.QMessageBox(parent=self.sender())
        err_msg.setText('One or more errors occurred during evaluation.')
        err_msg.setDetailedText('Creation failed with error: ' + reason)
        err_msg.exec()

        self.sender().success = False
def show_statistics(self, mode):
    """
    Collect `statistic(mode)` of every active set.

    The x value of a set is its name converted to float, or its id if the name is not a number.
    The collected values are neither returned nor used further inside this method.
    """
    x = []
    y = []

    for set_id in self.active_id:
        dataset = self.data[set_id]
        try:
            x_value = float(dataset.name)
        except ValueError:
            x_value = set_id
        x.append(x_value)
        y.append(dataset.statistic(mode))
@QtCore.pyqtSlot()
def calc_magn(self):
    """Add copies of all active SignalContainer sets whose data is replaced by its `magnitude()`."""
    new_id = []

    for key in self.active_id:
        dataset = self.data[key]
        if not isinstance(dataset, SignalContainer):
            continue

        magnitude_set = dataset.copy(full=True)
        magnitude_set.data = dataset.data.magnitude()
        new_id.append(self.add(magnitude_set))

    self.newData.emit(new_id, '')
@QtCore.pyqtSlot()
def center(self):
    """Add copies of all active sets, shifted in x so that the maximum of the real part of y is at zero."""
    new_id = []

    for key in self.active_id:
        shifted = self.data[key].copy(full=True)
        position_of_max = np.argmax(shifted.y.real)
        shifted.x -= shifted.x[position_of_max]
        new_id.append(self.add(shifted))

    self.newData.emit(new_id, '')
def integrate(self, **kwargs):
    """
    Integrate or differentiate sets and add the results as copies of the originals.

    Keyword Args:
        sets: ids of the sets to process
        mode: 'i' for integration, 'd' for differentiation
        log: passed on to `integrate` or `diff` of the data
        limits: optional, passed on to `integrate` only
        graph: graph id emitted with the new sets

    Raises:
        ValueError: for any other mode, raised when the first set is processed
    """
    log = kwargs['log']
    limits = kwargs.get('limits')
    mode = kwargs['mode']

    new_sets = []
    for set_id in kwargs['sets']:
        source = self.data[set_id]

        if mode == 'd':
            result = source.data.diff(log=log)
        elif mode == 'i':
            result = source.data.integrate(log=log, limits=limits)
        else:
            raise ValueError(f'Unknown mode {mode}.')

        container = source.copy(full=True)
        container.data = result
        new_sets.append(self.add(container))

    self.newData.emit(new_sets, kwargs['graph'])
def integral_datasets(self, ranges: list, x_vals: list, y_vals: np.ndarray):
    """
    Add one Points set per range, named '<lower limit>-<upper limit>'.

    Args:
        ranges: (lower limit, upper limit) pairs, used for the names
        x_vals: x values shared by all new sets
        y_vals: y values, one entry per range
    """
    new_sets = [
        self.add(Points(x=x_vals, y=y_val_i, name=f'{range_i[0]:.4g}-{range_i[1]:.4g}'))
        for range_i, y_val_i in zip(ranges, y_vals)
    ]

    self.newData.emit(new_sets, '')
2022-03-08 09:27:40 +00:00
def bds_deriv(self):
    """Add the scaled logarithmic derivative of every active set.

    For each active set the result of ``data.diff(log=True)`` is taken and
    the real part of its y values is multiplied by ``-pi/2``. The new
    ``Points`` inherit the meta data of the source and are added with the
    line colour of the source's ``plot_imag``. The ids of the new sets are
    emitted through ``newData`` with an empty graph id.
    """
    scale = -np.pi / 2
    created = []
    for set_id in self.active_id:
        source = self.data[set_id]
        derivative = source.data.diff(log=True)

        approx = Points(x=derivative.x, y=scale * derivative.y.real)
        approx.update(source.data.meta)

        created.append(self.add(approx, color=source.plot_imag.linecolor))

    self.newData.emit(created, '')
def logft(self, **kwargs):
    """Apply the logarithmic Fourier transform to the given sets.

    Keyword arguments read:
        ft_mode: passed as ``mode`` to the transform; ``'cos'`` and ``'sin'``
            produce ``Points``, every other mode produces a ``Signal``.
        sets: iterable of set ids to transform.
        graph: graph id emitted together with the new set ids.

    Each new set takes colour, symbol and line settings as well as the meta
    data from its source set.
    """
    ft_mode = kwargs['ft_mode']

    created = []
    for set_id in kwargs['sets']:
        source = self.data[set_id]
        result_type = Points if ft_mode in ['cos', 'sin'] else Signal
        transformed = result_type(*logft(source.x, source.y, mode=ft_mode))

        new_id = self.add(transformed, color=source['color'], symbol=source['symbol'], line=source['line'])
        created.append(new_id)
        self.data[new_id].update(source.data.meta)

    self.newData.emit(created, kwargs['graph'])
def skip_points(self, offset: int, step: int, invert: bool = False, copy: bool = False):
    """Keep only every ``step``-th point of the active sets.

    A point with index ``i`` is selected when ``(i + offset) % step == 0``;
    with ``invert`` the selection is negated.

    Args:
        offset: shift added to the point index before the modulo.
        step: distance between selected points.
        invert: select the complement instead.
        copy: if true, a reduced copy is added as a new set and announced via
            ``newData`` on the current graph; otherwise only the mask of the
            existing set is changed.
    """
    for set_id in self.active_id:
        src = self.data[set_id]

        on_grid = np.mod(np.arange(offset, src.x.size+offset), step) == 0
        mask = ~on_grid if invert else on_grid

        if not copy:
            # only currently unmasked points are affected by the selection
            src.mask[src.mask] = mask
            # re-assign so that the attribute itself is set again
            src.mask = src.mask
            continue

        data = src.copy()
        temp = data.mask.copy()
        temp[temp] = mask
        data.remove(np.where(~temp))
        data.mask = np.ones(data.x.shape)

        idd = self.add(data)
        self.newData.emit([idd], self.current_graph)
@QtCore.pyqtSlot(dict)
def calc_relaxation(self, opts: dict):
    """Calculate a relaxation curve and add it as a new set.

    Keys read from ``opts``:
        pts: either four values ``(start, stop, num, logarithmic)`` describing
            a generated axis, or ``(set id, use y)`` referring to an existing
            set.
        axis1: for a generated axis, ``'t'`` / ``'invt1000'`` trigger the
            conversion of the axis with an Arrhenius (two values in
            ``t_param``) or VFT model.
        t_param: parameters of that model.
        val2: second argument handed to the relaxation function.
        spec_dens: spectral density (provides ``convert`` and ``name``).
        sd_param, cp_param: pairs ``(values, keywords)``; each value is a
            number or the id of a set whose real y values are used point by
            point.
        coup: coupling (provides ``name``).
        out: ``'t1'`` selects ``Relaxation.t1``, anything else ``t2``.
        tau_type: source type for ``spec_dens.convert``.
        graph: graph id emitted together with the new set id.
    """
    params = opts['pts']
    if len(params) == 4:
        start, stop, num, logarithmic = params
        make_axis = np.geomspace if logarithmic else np.linspace
        # two independent arrays with identical values
        _x = make_axis(start, stop, num=num)
        x1 = make_axis(start, stop, num=num)

        if opts['axis1'] in ['t', 'invt1000']:
            t_p = opts['t_param']
            if len(t_p) == 2:
                from nmreval.models import Arrhenius as Func
            else:
                from nmreval.models import VFT as Func

            _x = Func.func(x1, *t_p, invt=opts['axis1'])
    else:
        use_y = params[1]
        source = self.data[params[0]]
        x1 = source.x
        _x = source.y.real if use_y else x1

    x2 = opts['val2']
    sd = opts['spec_dens']

    def per_point(values):
        # one tuple of parameters per point of _x; constants are repeated
        return list(zip(*[self.data[p].y.real if isinstance(p, str) else [p]*len(_x) for p in values]))

    sd_param = per_point(opts['sd_param'][0])

    relax = Relaxation()
    relax.set_distribution(sd, keywords=opts['sd_param'][1])

    cp_param = per_point(opts['cp_param'][0])

    relax_func = relax.t1 if opts['out'] == 't1' else relax.t2
    y = np.zeros(_x.size)
    for i in range(_x.size):
        sd_param_i = sd_param[i]
        _x_i = sd.convert(_x[i], *sd_param_i, from_=opts['tau_type'], to_='raw')

        relax.dist_parameter = sd_param_i
        relax.set_coupling(opts['coup'], parameter=cp_param[i], keywords=opts['cp_param'][1])

        y[i] = relax_func(x2, _x_i)

    pts = Points(x1, y, name=sd.name)
    pts.meta.update(opts)
    # we do not want class instances
    pts.meta['coup'] = opts['coup'].name
    pts.meta['spec_dens'] = sd.name

    new_id = self.add(pts)
    self.newData.emit([new_id], opts['graph'])

    self.sender().update_graphs(self.graphs.list())
2022-04-03 14:42:44 +00:00
@QtCore.pyqtSlot(str, list)
def mask_value(self, idx: str, m: list):
    """Assign ``m`` as the mask of the set stored under ``idx``."""
    target = self.data[idx]
    target.mask = m
2022-04-03 14:42:44 +00:00
@QtCore.pyqtSlot(str, list)
def remove_values(self, idx: str, m: list):
    """Call ``remove(m)`` on the set stored under ``idx``."""
    target = self.data[idx]
    target.remove(m)
2022-04-03 14:42:44 +00:00
@QtCore.pyqtSlot(str, int)
def split_set(self, idx: str, row: int):
    """Split the set ``idx`` at ``row`` into two sets.

    A full copy of the set is made; ``remove(slice(row, None))`` is applied
    to the original and ``remove(slice(None, row))`` to the copy. The copy is
    added as a new set and announced via ``newData`` on the graph of the
    original.
    """
    head = self.data[idx]
    tail = head.copy(full=True)

    head.remove(slice(row, None))
    tail.remove(slice(None, row))

    tail_id = self.add(tail)
    self.newData.emit([tail_id], head.graph)
2022-03-08 09:27:40 +00:00
def set_values(self, idx: str, pos: tuple, value):
    """Forward ``pos`` and ``value`` to ``setvalues`` of the set ``idx``."""
    target = self.data[idx]
    target.setvalues(pos, value)
def append(self, idx: str):
    """Add a new row of three zeros to the set stored under ``idx``."""
    empty_row = [0.0, 0.0, 0.0]
    self.data[idx].add(empty_row)
2022-03-24 16:35:10 +00:00
def save(self, outpath: str | pathlib.Path, extension: str, strip_spaces=False):
    """Save the whole session or every active set to disk.

    Args:
        outpath: target path. A ``<label>`` placeholder in the file stem is
            replaced by the (converted) name of each set.
        extension: file-dialog filter such as ``'Text (*.dat)'``; only used
            to determine the suffix when ``outpath`` has none.
        strip_spaces: replace spaces in the file names by underscores.

    Raises:
        ValueError: if neither ``outpath`` nor ``extension`` yields a suffix.

    With the suffix ``.nmr`` the complete session is written by ``NMRWriter``.
    Otherwise each active set is saved to its own file; names that would
    collide get a running ``_<n>`` appended.
    """
    path = pathlib.Path(outpath)
    suffix = path.suffix
    if not suffix:
        m = re.match(r'[\w\s]*\(\*(\.\w+)\)', extension)
        if m is None:
            raise ValueError('No file extension detected')
        suffix = m.group(1)
        path = path.with_suffix(suffix)

    if suffix == '.nmr':
        from nmreval.io.sessionwriter import NMRWriter

        NMRWriter(self.graphs, self.data).export(path)
        return

    # characters that are removed from file names
    strip_bad_characters = str.maketrans('', '', r'/*<>\|:"')

    used_names = set()
    for set_id, set_name in self.active_sets:
        full_name = path.stem
        if '<label>' in full_name:
            full_name = full_name.replace('<label>', convert(set_name, old='tex', new='str'))

        data_i = self.data[set_id]
        if isinstance(data_i, FitContainer):
            try:
                full_name += ' fit(' + self.data[data_i.parent_set].name + ')'
            except KeyError:
                # TODO fits should have a parent set
                logger.warning(f'Fit {data_i} without valid parent : key is [{data_i.parent_set}')
                full_name += ' fit'

        if strip_spaces:
            full_name = full_name.replace(' ', '_')

        # Sanitise BEFORE checking for duplicates: names that differ only in
        # removed characters would otherwise end up in the same file and
        # silently overwrite each other.
        full_name = full_name.translate(strip_bad_characters)

        out_name = full_name
        cnt = 0
        while out_name in used_names:
            cnt += 1
            out_name = f'{full_name}_{cnt}'
        used_names.add(out_name)

        outpath_set = path.with_name(out_name+path.suffix)
        data_i.save(outpath_set)
class FitWorker(QtCore.QObject):
    """Qt object that runs a fit routine and reports the outcome by signal.

    ``finished`` is emitted with ``(results, True)`` when ``fitter.run()``
    returns, or with ``([exception], False)`` when it raises.
    """
    finished = QtCore.pyqtSignal(list, bool)

    def __init__(self, fitter):
        super().__init__()

        # object providing ``run()``; its return value is emitted as result
        self.fitter = fitter

    @QtCore.pyqtSlot()
    def run(self):
        """Run the fitter and emit ``finished``; exceptions are not re-raised."""
        try:
            outcome = (self.fitter.run(), True)
        except Exception as exc:
            # hand the error to the receiver instead of raising in the worker
            outcome = ([exc], False)

        self.finished.emit(*outcome)