nmreval/src/gui_qt/main/management.py
2023-09-07 19:52:53 +02:00

1301 lines
44 KiB
Python

from __future__ import annotations
import pathlib
import re
import uuid
import numpy as np
from nmreval.fit import data as fit_d
from nmreval.fit.model import Model
from nmreval.fit.result import FitResult
from nmreval.fit.minimizer import FitRoutine
from nmreval.lib.colors import available_cycles
from nmreval.lib.logger import logger
from nmreval.math.interpol import interpolate
from nmreval.math.logfourier import logft
from nmreval.math.smooth import smooth
from nmreval.nmr.relaxation import Relaxation
from ..Qt import QtCore, QtWidgets
from ..lib import Relations
from ..lib.undos import *
from ..data.container import *
from ..io.filereaders import QFileReader
from ..lib.utils import busy_cursor
class GraphSignals(QtCore.QObject):
valueChanged = QtCore.pyqtSignal()
class GraphDict(OrderedDict):
def __init__(self, data):
super().__init__()
self._data = data
self.signals = GraphSignals()
self.valueChanged = self.signals.valueChanged
def __setitem__(self, key, value):
super().__setitem__(key, value)
self.valueChanged.emit()
def __delitem__(self, key):
super().__delitem__(key)
self.valueChanged.emit()
def tree(self, key_only=False):
ret_val = OrderedDict()
for k, g in self.items():
if key_only:
ret_val[k] = (g.title, [(s, self._data[s].name) for s in g.sets])
else:
ret_val[(k, g.title)] = [(s, self._data[s].name) for s in g.sets]
return ret_val
def list(self):
return [(k, v.title) for k, v in self.items()]
def active(self, key: str, return_val: str = 'both'):
if not key:
return []
else:
if return_val == 'both':
return [(self._data[i].id, self._data[i].name) for i in self[key]]
elif return_val == 'id':
return [self._data[i].id for i in self[key]]
elif return_val == 'name':
return [self._data[i].name for i in self[key]]
else:
raise ValueError(f'return_val got wrong value {return_val!r}')
def current_sets(self, key: str):
if key:
return [(self._data[i].id, self._data[i].name) for i in self[key].sets]
else:
return []
class UpperManagement(QtCore.QObject):
newGraph = QtCore.pyqtSignal()
restoreGraph = QtCore.pyqtSignal(str)
deleteGraph = QtCore.pyqtSignal(str)
newData = QtCore.pyqtSignal([list, str], [list, str, bool])
deleteData = QtCore.pyqtSignal(list)
dataChanged = QtCore.pyqtSignal(str)
fitFinished = QtCore.pyqtSignal(list)
stopFit = QtCore.pyqtSignal()
properties_collected = QtCore.pyqtSignal(dict)
unset_state = QtCore.pyqtSignal(list)
_colors = cycle(TUColors)
_actions = {
'ls': (ShiftCommand, 'Left shift'),
'cut': (CutCommand, 'Cut'),
'ap': (ApodizationCommand, 'Apodization'),
'zf': (ZerofillCommand, 'Zerofill'),
'ph': (PhaseCommand, 'Phase correction'),
'autoph': (AutophaseCommand, 'Autophase'),
'bl': (BaselineCommand, 'Baseline'),
'bls': (BaselineSplineCommand, 'Baseline'),
'ft': (FourierCommand, 'Fourier'),
'ft_pake': 'FT (de-paked)',
'sort': (SortCommand, 'Sort'),
'norm': (NormCommand, 'Normalize'),
'center': (CenterCommand, 'Center on max'),
}
def __init__(self, window):
super().__init__()
self._fit_active = False
self.fit_thread = None
self.fit_worker = None
self.counter = 0
self.data = OrderedDict()
self.window = window
self.current_graph = None
self.graphs = GraphDict(self.data)
self.namespace = None
self.undostack = QtWidgets.QUndoStack()
self.deleteData.connect(self.plot_from_graph)
self._filereader = None
def __setitem__(self, key: str, value, **kwargs):
if isinstance(value, ExperimentContainer):
item = value
item.id = key
elif isinstance(value, FitResult):
item = FitContainer(key, value, manager=self, **kwargs)
elif isinstance(value, Signal):
item = SignalContainer(key, value, manager=self, **kwargs)
else:
item = PointContainer(key, value, manager=self, **kwargs)
item.dataChanged.connect(lambda x: self.dataChanged.emit(x))
self.data[key] = item
def __getitem__(self, item):
return self.data[item]
def __contains__(self, item):
return item in self.data
def __iter__(self):
for k, v in self.data.items():
yield k, v
@property
def active_sets(self):
return self.graphs.active(self.current_graph)
@property
def active_id(self):
return self.graphs.active(self.current_graph, return_val='id')
def get_attributes(self, graph_id: str, attr: str) -> dict[str, Any]:
return {self.data[i].id: getattr(self.data[i], attr) for i in self.graphs[graph_id].sets}
def add(self, data, **kwargs):
_id = str(uuid.uuid4())
self.__setitem__(_id, data, **kwargs)
return _id
def load_files(self, fname: list[str], new_plot: str = None):
if self._filereader is None:
self._filereader = QFileReader(manager=self)
ret_dic = self._filereader.readfiles(fname)
self.add_new_data(ret_dic, new_plot)
def _load_session(self, sets: dict, graphs: dict):
sid = self._load_sets(sets)
for g in graphs:
_ = g.pop('id')
graph = QGraphWindow.set_state(g)
self.graphs[graph.id] = graph
self.restoreGraph.emit(graph.id)
children = [sid[c] for c in g['children']]
active = [sid[c] for c in g['active']]
inactive = [k for k in children if k not in active]
self.newData.emit(children, graph.id)
graph.active = active
graph.listWidget.blockSignals(True)
for i, l in enumerate(g['in_legend']):
try:
graph.listWidget.item(i).setCheckState(QtCore.Qt.Checked if l else QtCore.Qt.Unchecked)
except AttributeError:
pass
graph.listWidget.blockSignals(False)
# set unchecked in tree and hide/show in plot
self.unset_state.emit(inactive)
self.change_visibility(active, inactive)
def _load_sets(self, sets: dict) -> dict:
sid = {}
for _id, (data, opts) in sets.items():
if isinstance(data, FitResult):
# for fits, _id belongs to the fitted data, not the fit
src_id = data.idx
if src_id in sid:
new_id = self.add(data, src=sid[src_id])
self.data[sid[src_id]]._fits.append(new_id)
else:
new_id = self.add(data)
else:
new_id = self.add(data)
sid[_id] = new_id
for m in ['real', 'imag']:
if m in opts:
self.data[new_id].setSymbol(**opts[m][0], mode=m)
self.data[new_id].setLine(**opts[m][1], mode=m)
return sid
def add_new_data(self, data: list, gid: str):
sid = []
for d in data:
if isinstance(d, tuple):
if len(d) == 2:
self._load_session(d[0], graphs=d[1])
else:
sid.extend(list(self._load_sets(d[0]).values()))
else:
sid.append(self.add(d))
if sid:
gid = '' if not gid else gid
self.newData.emit(sid, gid)
def plots_to_graph(self, plotkeys: list, gid: str):
self.graphs[gid].add(plotkeys, [self.data[k].plots for k in plotkeys])
for k in plotkeys:
self.data[k].graph = gid
@QtCore.pyqtSlot(list)
def plot_from_graph(self, key: list[str]):
sort_graph = {}
for sid in key:
v = self.data[sid].graph
if v not in sort_graph:
sort_graph[v] = []
sort_graph[v].append(sid)
for gid, sets in sort_graph.items():
self.graphs[gid].remove(sets)
@QtCore.pyqtSlot(list, str, str)
def move_sets(self, sets: list, dest: str, src: (str|list), pos: int = -1):
if isinstance(src, str):
src = [src]*len(sets)
for graph_id, set_id in zip(src, sets):
# move all plots to the same graph
if graph_id != dest:
self.graphs[graph_id].remove(set_id)
self.plots_to_graph([set_id], dest)
# move to correct position
self.graphs[dest].move_sets(sets, pos)
def select_window(self, gid: str):
for key, plot in self.graphs.items():
if key == gid:
self.window.area.setActiveSubWindow(plot.parent())
@QtCore.pyqtSlot()
@QtCore.pyqtSlot(list, str)
def copy_sets(self, sets: list = None, src: str = None):
if sets is None:
sets = self.graphs[self.current_graph].active[:]
if src is None:
src = self.current_graph
new_ids = []
for s in sets:
copy_of_s = self.data[s].copy(full=True)
copy_of_s.id = str(uuid.uuid4())
new_ids.append(copy_of_s.id)
self.data[copy_of_s.id] = copy_of_s
self.newData.emit(new_ids, src)
return new_ids
@QtCore.pyqtSlot(list)
@QtCore.pyqtSlot(str)
@QtCore.pyqtSlot()
def delete_sets(self, rm_sets: list = None):
rm_graphs = []
if rm_sets is None:
rm_sets = self.graphs[self.current_graph].sets + [self.current_graph]
self.undostack.beginMacro('Delete')
rm_set_by_graph = {}
for k in rm_sets[::-1]:
if k in self.data:
parent_graph = self.data[k].graph
if parent_graph not in rm_set_by_graph:
rm_set_by_graph[parent_graph] = []
rm_set_by_graph[parent_graph].append(k)
elif k in self.graphs:
rm_graphs.append(k)
else:
logger.warning(f'delete_sets: {k} is not in data or graph found')
for gid, sid_list in rm_set_by_graph.items():
cmd = DeleteCommand(self.data, sid_list, self.graphs, gid, self.newData, self.deleteData)
self.undostack.push(cmd)
for k in rm_graphs:
cmd = DeleteGraphCommand(self.graphs, k, self.restoreGraph, self.deleteGraph)
self.undostack.push(cmd)
self.undostack.endMacro()
def delete_graph(self, gid):
self.delete_sets(self.graphs[gid].sets + [gid])
@QtCore.pyqtSlot()
def cat(self, src_sets=None):
joined = None
group_set = set()
name_set = set()
value_set = set()
if src_sets is None:
if self.current_graph:
src_sets = self.graphs[self.current_graph].active
else:
return
for sid in src_sets:
data_i = self.data[sid]
if joined is None:
joined = data_i.copy()
else:
joined.append(data_i.data.x, data_i.data.y, y_err=data_i.data.y_err, mask=data_i.data.mask)
name_set.add(data_i.name)
group_set.add(data_i.group)
value_set.add(data_i.value)
if joined is not None:
joined.group = '+'.join(group_set)
joined.name = '+'.join(name_set)
if len(value_set) == 1:
joined.value = value_set.pop()
else:
joined.value = 0.0
self.newData.emit([self.add(joined)], self.current_graph)
def get_data(self, sid: str, xy_only: bool = False):
"""
Return data for a given id.
Return value is tuple of [x, y, y_err] and mask if xy_only is False, [x, y] if true.
"""
d = self.data[sid]
if xy_only:
return [d.x, d.y]
return [d.data.x, d.data.y, d.data.y_err], d.data.mask.data
def change_visibility(self, selected: list, deselected: list):
"""Change status of list of ids after status change in datawidget"""
for s in selected:
self.graphs[self.data[s].graph].show_item([s])
for d in deselected:
self.graphs[self.data[d].graph].hide_item([d])
@QtCore.pyqtSlot(str, str)
def change_keys(self, identifier: str, name: str):
if identifier in self.data:
d = self.data[identifier]
d.name = name
self.graphs[d.graph].update_legend(identifier, name)
elif identifier in self.graphs:
self.graphs[identifier].title = name
else:
raise KeyError('Unknown ID ' + str(identifier))
@QtCore.pyqtSlot(str, tuple)
def apply(self, func: str, arguments: tuple):
# undos, names displayed by undo action
cmd, cmd_text = self._actions[func]
self.undostack.beginMacro(cmd_text)
for sid in self.graphs[self.current_graph]:
single_undo = cmd(self.data[sid], *arguments)
self.undostack.push(single_undo)
self.undostack.endMacro()
def cut(self):
if self.current_graph:
xlim, _ = self.graphs[self.current_graph].ranges
self.apply('cut', xlim)
@QtCore.pyqtSlot()
def unmask(self):
for d in self.data.values():
d.mask = np.ones_like(d.mask, dtype=bool)
def prepare_fit(self, parameter: dict, links: list, fit_options: dict) -> bool:
if self._fit_active:
return False
self.__fit_options = (parameter, links, fit_options)
self.fitter = FitRoutine()
models = {}
fit_limits = fit_options['limits']
fit_mode = fit_options['fit_mode']
we_option = fit_options['we']
self.fitter.fitmethod = fit_mode
# all-encompassing error catch
try:
for model_id, model_p in parameter.items():
m = Model(model_p['func'])
models[model_id] = m
m_complex = model_p['complex']
# sets are not in active order but in order they first appeared in fit dialog
# iterate over order of set id in active order and access parameter inside loop
# instead of directly looping
list_ids = list(model_p['parameter'].keys())
set_order = [self.active_id.index(i) for i in list_ids]
for pos in set_order:
set_id = list_ids[pos]
data_i = self.data[set_id]
set_params = model_p['parameter'][set_id]
if we_option.lower() == 'deltay':
we = data_i.y_err**2
else:
we = we_option
if m_complex is None or m_complex == 1:
_y = data_i.y.real
elif m_complex == 2 and np.iscomplexobj(data_i.y):
_y = data_i.y.imag
else:
_y = data_i.y
_x = data_i.x
if fit_limits == 'none':
inside = slice(None)
elif fit_limits == 'x':
x_lim, _ = self.graphs[self.current_graph].ranges
inside = np.where((_x >= x_lim[0]) & (_x <= x_lim[1]))
else:
inside = np.where((_x >= fit_limits[0]) & (_x <= fit_limits[1]))
if isinstance(we, str):
d = fit_d.Data(_x[inside], _y[inside], we=we, idx=set_id)
else:
d = fit_d.Data(_x[inside], _y[inside], we=we[inside], idx=set_id)
d.set_model(m)
d.set_parameter(set_params[0], var=model_p['var'],
lb=model_p['lb'], ub=model_p['ub'],
fun_kwargs=set_params[1])
self.fitter.add_data(d)
model_globs = model_p['glob']
if model_globs:
m.set_global_parameter(**model_p['glob'])
for links_i in links:
self.fitter.set_link_parameter((models[links_i[0]], links_i[1]),
(models[links_i[2]], links_i[3]))
return True
except Exception as e:
logger.error('Fit preparation failed', *e.args)
QtWidgets.QMessageBox.warning(QtWidgets.QWidget(),
'Fit prep failed',
f'Fit preparation failed with message\n{e.args}')
return False
def start_fit(self):
with busy_cursor():
self.fit_worker = FitWorker(self.fitter)
self.fit_thread = QtCore.QThread()
self.fit_worker.moveToThread(self.fit_thread)
self.fit_thread.started.connect(self.fit_worker.run)
self.fit_worker.finished.connect(self.end_fit)
self.fit_worker.finished.connect(self.fit_thread.quit)
self.fit_worker.finished.connect(self.fit_worker.deleteLater)
self.fit_thread.finished.connect(self.fit_thread.deleteLater)
self.stopFit.connect(lambda: self.fit_worker.fitter.abort())
self.fit_thread.start()
@QtCore.pyqtSlot(list, bool)
def end_fit(self, result: list, success: bool):
if success:
logger.info('Successful fit')
self.fitFinished.emit(result)
else:
e = result[0]
logger.exception(e, exc_info=True)
QtWidgets.QMessageBox.warning(QtWidgets.QWidget(), 'Fit failed',
f'Fit kaput with exception: \n\n{e!r}')
self.fitFinished.emit([])
self._fit_active = False
@QtCore.pyqtSlot(dict)
def redo_fits(self, res: dict):
models = self.__fit_options[0]
for single_model, model_args in models.items():
parameter = model_args['parameter']
for set_id, set_parameter in parameter.items():
new_values = [v.value for v in res[set_id].parameter.values()]
parameter[set_id] = (new_values, set_parameter[1])
if self.prepare_fit(*self.__fit_options):
self.start_fit()
def make_fits(self, res: dict, opts: list, param_graph: str, show_fit: bool, parts: bool, extrapolate: list) -> None:
"""
Args:
res: key is that of original data, value is FitResult
opts: (ignore this fits, delete previous fits)
param_graph: None if no parameter to plot, '' for new graph, or id of existig graph
show_fit: plot fit curve?
parts: key is that of original data, value is list of subplots
extrapolate:
"""
f_id_list = []
gid = ''
tobedeleted = []
accepted = []
for i, (k, fit) in enumerate(res.items()):
reject, delete_prev = opts[i]
if reject:
continue
if not all(e is None for e in extrapolate):
spacefunc = np.geomspace if fit.islog else np.linspace
xmin = fit.x.min()
xmax = fit.x.max()
len_data = len(fit.x_data)
num_pts = 20*len_data-9 if len_data < 51 else 3*len_data
if extrapolate[0] is not None:
xmin = extrapolate[0]
if extrapolate[1] is not None:
xmax = extrapolate[1]
if extrapolate[2] is not None:
num_pts = extrapolate[2]
_x = spacefunc(xmin, xmax, num=num_pts)
fit = fit.with_new_x(_x)
data_k = self.data[k]
if delete_prev:
tobedeleted.extend([f.id for f in data_k.get_fits()])
data_k.set_fits([])
syms = data_k.plot_real.symbol
if syms == SymbolStyle.No:
color = data_k.plot_real.linecolor
else:
color = data_k.plot_real.symbolcolor
fit.value = data_k.value
fit.group = data_k.group
accepted.append(fit)
data_name = f' ({data_k.name})'
if show_fit:
fit.name += data_name
f_id = self.add(fit, color=color, src=k)
f_id_list.append(f_id)
data_k.set_fits(f_id)
if parts:
color_scheme = available_cycles['colorblind']
for subfunc, col in zip(fit.sub(fit.x), cycle(color_scheme)):
subfunc.value = data_k.value
subfunc.group = data_k.group
subfunc.name += data_name
sub_f_id = self.add(subfunc, color=col, linestyle=LineStyle.Dashed, symbol=SymbolStyle.No)
self[sub_f_id].add_relation(Relations.isFitPartOf, f_id)
self[f_id].add_relation(Relations.hasFitPart, sub_f_id)
f_id_list.append(sub_f_id)
gid = data_k.graph
self.delete_sets(tobedeleted)
if accepted and (param_graph != '-1'):
self.make_fit_parameter(accepted, graph_id=param_graph)
self.newData.emit(f_id_list, gid)
def extend_fits(self, set_id: list, x_range: np.ndarray):
graphs = {}
for sid in set_id:
data = self[sid]
fit = data.copy(full=True, keep_color=True)
fit.data = fit.data.with_new_x(x_range)
graph_id = data.graph
if graph_id not in graphs:
graphs[graph_id] = []
graphs[graph_id].append(self.add(fit))
color_scheme = available_cycles['colorblind']
for subfunc, col in zip(fit.data.sub(fit.x), cycle(color_scheme)):
subfunc.value = fit.value
subfunc.group = fit.group
graphs[graph_id].append(self.add(subfunc, color=col, linestyle=LineStyle.Dashed, symbol=SymbolStyle.No))
for k, v in graphs.items():
self.newData.emit(v, k)
def make_fit_parameter(self, fit_sets: list[str | FitResult], graph_id: str = None):
fit_dict = self._collect_fit_parameter(fit_sets)
if fit_dict:
p_id_list = []
for v in fit_dict.values():
xy = np.array(v[0]).T
p_id_list.append(self.add(Points(x=xy[0], y=xy[1], y_err=xy[2], name=v[1])))
if not graph_id:
graph_id = ''
self.newData[list, str, bool].emit(p_id_list, graph_id, True)
def save_fit_parameter(self, fname: str | pathlib.Path, fit_sets: list[str] = None):
if fit_sets is None:
fit_sets = [s for s in self.active_id]
for set_id in fit_sets:
data = self.data[set_id]
if data.mode != 'fit':
continue
data.data.save_parameter(fname)
def _collect_fit_parameter(self, fit_sets: list[str | FitResult]) -> dict:
fit_dict = {}
for set_id in fit_sets:
if isinstance(set_id, str):
data = self.data[set_id]
if data.mode != 'fit':
continue
elif isinstance(set_id, FitResult):
data = set_id
else:
continue
for fit_key, pvalue in data.parameter.items():
if fit_key not in fit_dict:
fit_dict[fit_key] = [[], fit_key]
err = 0 if pvalue.error is None else pvalue.error
fit_dict[fit_key][0].append([data.value, pvalue.value, err])
return fit_dict
@QtCore.pyqtSlot(dict, str)
def extract_points(self, params: dict, gid: str):
xy_mode = params.pop('xy')
_active = self.graphs[self.current_graph].active
new_datasets = {}
for sid in _active:
data_i = self.data[sid]
if data_i.group not in new_datasets:
new_datasets[data_i.group] = [], []
new_x_axis, _temp = new_datasets[data_i.group]
new_x_axis.append(data_i.value)
_temp.append(data_i.points(params))
key_list = []
for label, (new_x_axis, _temp) in new_datasets.items():
_temp = np.array(_temp) # (number of sets, number of picks, (x, y, y_err))
num_pts = _temp.shape[1]
for i in range(num_pts):
if xy_mode[0]:
key = self.add(Points(x=new_x_axis, y=_temp[:, i, 0], name=label))
key_list.append(key)
if xy_mode[1]:
key = self.add(Points(x=new_x_axis, y=_temp[:, i, 1], y_err=_temp[:, i, 2], name=label))
key_list.append(key)
self.newData[list, str, bool].emit(key_list, gid, True)
@QtCore.pyqtSlot(list)
def get_properties(self, sid: list) -> dict:
props = {}
for key in sid:
if key not in self.data:
continue
props = self.data[key].get_properties()
self.properties_collected.emit(props)
return props
@QtCore.pyqtSlot(list, str, str, object)
def update_property(self, sid: list, key1: str, key2: str, value: Any):
for s in sid:
self.data[s].update_property(key1, key2, value)
def create_empty(self):
import numpy.random as random
dat = Points(x=np.arange(10), y=np.arange(10) + random.rand(10)-0.5, y_err=random.rand(10),
name='Das Sein und das Nichts')
idd = self.add(dat)
self.newData.emit([idd], self.current_graph)
@QtCore.pyqtSlot(tuple, dict, str)
def calc_mean(self, dist_params, conversion, graph):
dist, args = dist_params
parameter = []
x = None
name = 'tau (%s)' % {conversion["to_"]}
value = 0.
for i, p in enumerate(args):
if isinstance(p, float):
parameter.append(p)
else:
if x is None:
x = self.data[p].x
if i == 0:
name = self.data[p].name
value = self.data[p].value
parameter.append(self.data[p].y)
if x is None:
x = 0
key = self.add(Points(x, dist.convert(*parameter, **conversion), name=name, value=value))
self.newData.emit([key], graph)
self.sender().update_graphs(self.graphs.tree(key_only=True))
@QtCore.pyqtSlot(list, str, bool, bool, tuple, str)
def interpolate_data(self, data_ids, mode, xlog, ylog, new_axis, dest_graph):
if len(new_axis) == 4:
start, end, steps, loggy = new_axis
if loggy:
new_x = np.logspace(np.log10(start), np.log10(end), steps)
else:
new_x = np.linspace(start, end, steps)
else:
new_x = self.data[new_axis[0]].x
new_key = []
for ids in data_ids:
k = self.add(interpolate(self.data[ids], new_x, xlog=xlog, ylog=ylog, kind=mode, extrapolate=True))
new_key.append(k)
self.newData.emit(new_key, dest_graph)
def binning(self, digits: float):
_active = self.graphs[self.current_graph].active
new_data = []
for sid in _active:
key = self.add(self.data[sid].binning(digits=digits))
new_data.append(key)
self.newData.emit(new_data, self.current_graph)
@QtCore.pyqtSlot(dict, str)
def addTg(self, dic: dict, dtype: str):
graph_id = self.current_graph if dtype == 'tg' else dic.pop('graph')
set_id_list = []
if dtype == 'hodge':
for v in dic.values():
set_id_list.append(self.add(v))
else:
for k, (data, lines) in dic.items():
p: ExperimentContainer = self[k]
col = p.plot_real.linecolor
if data is not None:
set_id_list.append(self.add(data, color=col))
if dtype == 'tnmh':
if lines is not None:
lines = [lines]
else:
lines = []
for line in lines:
set_id = self.add(line, color=col)
self[set_id].setLine(style=LineStyle.Dashed)
self[set_id].setSymbol(symbol=SymbolStyle.No)
set_id_list.append(set_id)
self.newData.emit(set_id_list, graph_id)
@QtCore.pyqtSlot(int, dict)
def smooth_data(self, npoints, param_kwargs):
_active = self.graphs[self.current_graph].active
new_data = []
for sid in _active:
try:
key = self.add(smooth(self.data[sid], npoints, **param_kwargs))
new_data.append(key)
except Exception as e:
QtWidgets.QMessageBox().warning(self.window,
'Smoothing failed!',
f'Smoothing failed for {self.data[sid].name} with exception:\n{e.args}')
if new_data:
self.newData.emit(new_data, self.current_graph)
@QtCore.pyqtSlot()
def set_cycle(self, set_idx: list, cycle_name: str):
col = cycle(available_cycles[cycle_name])
for s_id in set_idx:
self.data[s_id].setColor(next(col), symbol=True, line=True, mode='all')
@QtCore.pyqtSlot(dict, tuple)
def shift_scale(self, values: dict, options: tuple):
copy_data, value_plot = options
sid_list = []
shift_y = []
shift_x = []
for k, v in values.items():
d_k = self.data[k]
if copy_data is None:
d_k.shift_scale(v[0], v[1])
else:
new_data = d_k.copy(full=True)
new_data.shift_scale(v[0], v[1])
sid = self.add(new_data)
sid_list.append(sid)
shift_x.append(d_k.value)
shift_y.append(v[0]+v[1])
self.newData.emit(sid_list, copy_data)
if value_plot is not None:
sid_list = []
shift_y = np.array(shift_y)
for i, (mode, default) in enumerate([('x shift', 0.), ('y shift', 0.),
('x scale', 1.), ('y scale', 1.), ]):
if np.all(shift_y[:, i] == default):
continue
data = Points(shift_x, shift_y[:, i], name=mode)
sid_list.append(self.add(data))
self.newData.emit(sid_list, value_plot)
@QtCore.pyqtSlot(list)
def convert_sets(self, src: list):
new_graph = {}
error_list = []
for sets in src:
# merge: sets (real, imag, graph, type)
# normal: sets (source set, graph, type)
graph_id = sets[-2]
new_type = [Points, FID, Spectrum, BDS][sets[-1]]
if len(sets) == 4:
real_set, imag_set = sets[0], sets[1]
if real_set != '':
data = self.data[real_set]
new_data = new_type(data.x, data.y.real)
if imag_set != '':
imag_data = self.data[imag_set]
if len(imag_data) == len(data):
new_data.y.imag = imag_data.y.real
else:
error_list.append(f'Lengths mismatch of {data.name} ({len(data)}) and {imag_data.name} ({len(imag_data)})')
continue
else:
data = self.data[imag_set]
new_data = new_type(data.x, np.zeros(data.x.size))
new_data.y.imag = data.y.real
else:
data = self.data[sets[0]]
if isinstance(data.data, new_type):
error_list.append(f'{data.name} is alreade of type {new_type.__name__}')
continue
new_data = new_type(data.x, np.zeros(data.x.size))
new_data.y.real = data.y.real
new_data.update(data.opts)
new_id = self.add(data.change_type(new_data))
if graph_id not in new_graph:
new_graph[graph_id] = []
new_graph[graph_id].append(new_id)
for g, s in new_graph.items():
self.newData.emit(s, g)
if error_list:
err_string = "\n- ".join(error_list)
_ = QtWidgets.QMessageBox.information(QtWidgets.QWidget(), 'Something was skipped',
f'Some conversions were skipped:\n{err_string}')
def get_namespace(self):
from ..lib.namespace import Namespace
self.namespace = Namespace(basic=True, const=True, fitfuncs=True)
for i, g in enumerate(self.graphs.values()):
for j, sid in enumerate(g.sets):
sets = self.data[sid]
self.namespace.add_namespace(sets.get_namespace(i, j), parents=('Data', f'{sets.name} ({g.title})'))
return self.namespace
@QtCore.pyqtSlot(list, list, bool)
def eval_expression(self, cmds: list, set_ids: list, overwrite: bool):
ns = self.namespace.flatten()
if overwrite:
self.undostack.beginMacro('Evaluate expression')
failures = []
for i, g in enumerate(self.graphs.values()):
for j, sid in enumerate(g.sets):
if sid not in set_ids:
continue
data_i = self.data[sid]
try:
# use a copy of original namespace
new_data = data_i.eval_expression(cmds, dict(ns), i=i, j=j)
if overwrite:
cmd = EvalCommand(self.data, sid, new_data, 'Evaluate expression')
self.undostack.push(cmd)
else:
new_id = self.copy_sets(sets=[sid])
self.data[new_id[0]].data = new_data
except Exception as e:
failures.append((data_i, e))
logger.warning(str(data_i) + ' failed with Exception: ' + ''.join(e.args))
continue
if overwrite:
self.undostack.endMacro()
if failures:
err_msg = QtWidgets.QMessageBox(parent=self.sender())
err_msg.setText('One or more errors occured during evaluation.')
err_msg.setDetailedText('\n'.join(f'{d.name} failed with error: {err.args}' for d, err in failures))
err_msg.exec()
self.sender().success = not failures
self.sender().add_data(self.active_sets)
@QtCore.pyqtSlot(list, dict)
def create_from_function(self, cmds: list, opts: dict):
ns = dict(self.namespace.flatten())
dtype = [Points, FID, Spectrum, BDS][opts.pop('dtype')]
try:
for c in cmds:
exec(c, globals(), ns)
name = opts.pop('name')
value = opts.pop('val')
graph = opts.pop('graph')
data = dtype(x=ns['x'], y=ns['y'], y_err=ns['y_err'], name=name, value=value)
s_id = self.add(data, **opts)
self.sender().success = True
self.newData.emit([s_id], graph)
except Exception as err:
logger.exception('Creation failed with error: ' + ', '.join(err.args))
err_msg = QtWidgets.QMessageBox(parent=self.sender())
err_msg.setText('One or more errors occurred during evaluation.')
err_msg.setDetailedText('Creation failed with error: ' + ', '.join(err.args))
err_msg.exec()
self.sender().success = False
def show_statistics(self, mode):
x, y, = [], []
for i in self.active_id:
_temp = self.data[i]
try:
x.append(float(_temp.name))
except ValueError:
x.append(i)
y.append(_temp.statistic(mode))
@QtCore.pyqtSlot()
def calc_magn(self):
new_id = []
for k in self.active_id:
dataset = self.data[k]
if isinstance(dataset, SignalContainer):
new_value = dataset.copy(full=True)
new_value.data = dataset.data.magnitude()
new_id.append(self.add(new_value))
self.newData.emit(new_id, '')
@QtCore.pyqtSlot()
def center(self):
new_id = []
for k in self.active_id:
new_value = self.data[k].copy(full=True)
new_value.x -= new_value.x[np.argmax(new_value.y.real)]
new_id.append(self.add(new_value))
self.newData.emit(new_id, '')
def integrate(self, **kwargs):
new_sets = []
log = kwargs['log']
limits = kwargs.get('limits')
mode = kwargs['mode']
for set_id in kwargs['sets']:
data_i = self.data[set_id]
if mode == 'i':
new_data = data_i.data.integrate(log=log, limits=limits)
elif mode == 'd':
new_data = data_i.data.diff(log=log)
else:
raise ValueError(f'Unknown mode {mode}.')
new_container = data_i.copy(full=True)
new_container.data = new_data
new_sets.append(self.add(new_container))
self.newData.emit(new_sets, kwargs['graph'])
def integral_datasets(self, ranges: list, x_vals: list, y_vals: np.ndarry):
new_sets = []
for range_i, y_val_i in zip(ranges, y_vals):
new_sets.append(self.add(Points(x=x_vals, y=y_val_i, name=f'{range_i[0]:.4g}-{range_i[1]:.4g}')))
self.newData.emit(new_sets, '')
def bds_deriv(self):
new_sets = []
for set_id in self.active_id:
data_i = self.data[set_id]
diff = data_i.data.diff(log=True)
new_data = Points(x=diff.x, y=-np.pi/2*diff.y.real)
new_data.update(data_i.data.meta)
new_sets.append(self.add(new_data, color=data_i.plot_imag.linecolor))
self.newData.emit(new_sets, '')
def logft(self, **kwargs):
new_sets = []
ft_mode = kwargs['ft_mode']
for set_id in kwargs['sets']:
data_i = self.data[set_id]
if ft_mode in ['cos', 'sin']:
new_data = Points(*logft(data_i.x, data_i.y, mode=ft_mode))
else:
new_data = Signal(*logft(data_i.x, data_i.y, mode=ft_mode))
new_sets.append(self.add(new_data, color=data_i['color'], symbol=data_i['symbol'], line=data_i['line']))
self.data[new_sets[-1]].update(data_i.data.meta)
self.newData.emit(new_sets, kwargs['graph'])
def skip_points(self, offset: int, step: int, invert: bool = False, copy: bool = False):
for k in self.active_id:
src = self.data[k]
if invert:
mask = np.mod(np.arange(offset, src.x.size+offset), step) != 0
else:
mask = np.mod(np.arange(offset, src.x.size+offset), step) == 0
if copy:
data = src.copy()
temp = data.mask.copy()
temp[temp] = mask
data.remove(np.where(~temp))
data.mask = np.ones(data.x.shape)
idd = self.add(data)
self.newData.emit([idd], self.current_graph)
else:
src.mask[src.mask] = mask
src.mask = src.mask
@QtCore.pyqtSlot(dict)
def calc_relaxation(self, opts: dict):
params = opts['pts']
if len(params) == 4:
if params[3]:
_x = x1 = np.geomspace(params[0], params[1], num=params[2])
else:
_x = x1 = np.linspace(params[0], params[1], num=params[2])
if opts['axis1'] in ['t', 'invt1000']:
t_p = opts['t_param']
if len(t_p) == 2:
from nmreval.models import Arrhenius as Func
else:
from nmreval.models import VFT as Func
_x = Func.func(x1, *t_p, invt=opts['axis1'])
else:
if params[1]:
x1 = self.data[params[0]].x
_x = self.data[params[0]].y.real
else:
_x = x1 = self.data[params[0]].x
x2 = opts['val2']
sd = opts['spec_dens']
sd_param = [self.data[p].y.real if isinstance(p, str) else p for p in opts['sd_param'][0]]
sd.convert(_x, *sd_param, from_=opts['tau_type'], to_='raw')
relax = Relaxation()
relax.set_distribution(sd, parameter=sd_param, keywords=opts['sd_param'][1])
cp_param = [self.data[p].y.real if isinstance(p, str) else p for p in opts['cp_param'][0]]
relax.set_coupling(opts['coup'], parameter=cp_param, keywords=opts['cp_param'][1])
if opts['out'] == 't1':
y = relax.t1(x2, _x)
else:
y = relax.t2(x2, _x)
pts = Points(x1, y, name=sd.name)
pts.meta.update(opts)
# we do not want class instances
pts.meta['coup'] = opts['coup'].name
pts.meta['spec_dens'] = sd.name
self.newData.emit([self.add(pts)], opts['graph'])
self.sender().update_graphs(self.graphs.list())
@QtCore.pyqtSlot(str, list)
def mask_value(self, idx: str, m: list):
self.data[idx].mask = m
@QtCore.pyqtSlot(str, list)
def remove_values(self, idx: str, m: list):
self.data[idx].remove(m)
@QtCore.pyqtSlot(str, int)
def split_set(self, idx: str, row: int):
selected_data = self.data[idx]
popular_front_of_judea = selected_data.copy(full=True)
selected_data.remove(slice(row, None))
popular_front_of_judea.remove(slice(None, row))
new_id = self.add(popular_front_of_judea)
self.newData.emit([new_id], selected_data.graph)
def set_values(self, idx: str, pos: tuple, value):
self.data[idx].setvalues(pos, value)
def append(self, idx: str):
self.data[idx].add([0.0, 0.0, 0.0])
def save(self, outpath: str | pathlib.Path, extension: str, strip_spaces=False):
path = pathlib.Path(outpath)
suffix = path.suffix
if not suffix:
m = re.match(r'[\w\s]*\(\*(\.\w+)\)', extension)
if m:
suffix = m.group(1)
path = path.with_suffix(suffix)
else:
raise ValueError('No file extension detected')
if suffix == '.nmr':
from nmreval.io.sessionwriter import NMRWriter
NMRWriter(self.graphs, self.data).export(path)
return
real_outnames = []
for set_id, set_name in self.active_sets:
full_name = path.stem
if '<label>' in full_name:
full_name = full_name.replace('<label>', convert(set_name, old='tex', new='str'))
data_i = self.data[set_id]
if isinstance(data_i, FitContainer):
try:
full_name += ' fit(' + self.data[data_i.parent_set].name + ')'
except KeyError:
# TODO fits should have a parent set
logger.warning(f'Fit {data_i} without valid parent : key is [{data_i.parent_set}')
full_name += ' fit'
if strip_spaces:
full_name = full_name.replace(' ', '_')
if full_name in real_outnames:
cnt = 1
while (full_name + '_' + str(cnt)) in real_outnames:
cnt += 1
real_outnames.append(full_name + '_' + str(cnt))
else:
real_outnames.append(full_name)
out_name = real_outnames[-1]
bad_character = r'/*<>\|:"'
for c in bad_character:
out_name = out_name.replace(c, '')
outpath_set = path.with_name(out_name+path.suffix)
data_i.save(outpath_set)
class FitWorker(QtCore.QObject):
finished = QtCore.pyqtSignal(list, bool)
def __init__(self, fitter):
super().__init__()
self.fitter = fitter
@QtCore.pyqtSlot()
def run(self):
try:
res = self.fitter.run()
success = True
except Exception as e:
res = [e]
success = False
self.finished.emit(res, success)