1
0
forked from IPKM/nmreval
nmreval/nmreval/gui_qt/main/management.py
2022-03-24 17:35:10 +01:00

1073 lines
35 KiB
Python

from __future__ import annotations
import pathlib
import re
import uuid
from typing import List
from ...fit import data as fit_d
from ...fit.model import Model
from ...fit.result import FitResult
from ...fit.minimizer import FitRoutine
from ...math.interpol import interpolate
from ...math.logfourier import logft
from ...math.smooth import smooth
from ...nmr.relaxation import Relaxation
from ..lib.undos import *
from ..data.container import *
from ..io.filereaders import QFileReader
from ..lib.utils import busy_cursor
class GraphSignals(QtCore.QObject):
valueChanged = QtCore.pyqtSignal()
class GraphDict(OrderedDict):
def __init__(self, data):
super().__init__()
self._data = data
self.signals = GraphSignals()
self.valueChanged = self.signals.valueChanged
def __setitem__(self, key, value):
super().__setitem__(key, value)
self.valueChanged.emit()
def __delitem__(self, key):
super().__delitem__(key)
self.valueChanged.emit()
def tree(self, key_only=False):
ret_val = OrderedDict()
for k, g in self.items():
if key_only:
ret_val[k] = (g.title, [(s, self._data[s].name) for s in g.sets])
else:
ret_val[(k, g.title)] = [(s, self._data[s].name) for s in g.sets]
return ret_val
def list(self):
return [(k, v.title) for k, v in self.items()]
def active(self, key: str):
if key:
return [(self._data[i].id, self._data[i].name) for i in self[key]]
else:
return []
def current_sets(self, key: str):
if key:
return [(self._data[i].id, self._data[i].name) for i in self[key].sets]
else:
return []
class UpperManagement(QtCore.QObject):
newGraph = QtCore.pyqtSignal()
restoreGraph = QtCore.pyqtSignal(str)
deleteGraph = QtCore.pyqtSignal(str)
newData = QtCore.pyqtSignal(list, str)
deleteData = QtCore.pyqtSignal(str)
dataChanged = QtCore.pyqtSignal(str)
fitFinished = QtCore.pyqtSignal(list)
stopFit = QtCore.pyqtSignal()
properties_collected = QtCore.pyqtSignal(dict)
unset_state = QtCore.pyqtSignal(list)
_colors = cycle(Colors)
_actions = {
'ls': (ShiftCommand, 'Left shift'),
'cut': (CutCommand, 'Cut'),
'ap': (ApodizationCommand, 'Apodization'),
'zf': (ZerofillCommand, 'Zerofill'),
'ph': (PhaseCommand, 'Phase'),
'bl': (BaselineCommand, 'Baseline'),
'bls': (BaselineSplineCommand, 'Baseline'),
'ft': (FourierCommand, 'Fourier'),
'ft_pake': 'FT (de-paked)',
'sort': (SortCommand, 'Sort'),
'norm': (NormCommand, 'Normalize'),
'center': (CenterCommand, 'Center on max')
}
def __init__(self, window):
super().__init__()
self._fit_active = False
self.fit_thread = None
self.fit_worker = None
self.counter = 0
self.data = OrderedDict()
self.window = window
self.current_graph = ''
self.graphs = GraphDict(self.data)
self.namespace = None
self.undostack = QtWidgets.QUndoStack()
self.deleteData.connect(self.plot_from_graph)
def __setitem__(self, key: str, value, **kwargs):
if isinstance(value, ExperimentContainer):
item = value
item.id = key
elif isinstance(value, FitResult):
item = FitContainer(key, value, manager=self, **kwargs)
elif isinstance(value, Signal):
item = SignalContainer(key, value, manager=self, **kwargs)
else:
item = PointContainer(key, value, manager=self, **kwargs)
item.dataChanged.connect(lambda x: self.dataChanged.emit(x))
self.data[key] = item
def __getitem__(self, item):
return self.data[item]
def __contains__(self, item):
return item in self.data
def __iter__(self):
for k, v in self.data.items():
yield k, v
@property
def active_sets(self):
return self.graphs.active(self.current_graph)
def add(self, data, **kwargs):
_id = str(uuid.uuid4())
self.__setitem__(_id, data, **kwargs)
return _id
def load_files(self, fname: List[str], new_plot: str = None):
ret_dic = QFileReader(manager=self).readfiles(fname)
self.add_new_data(ret_dic, new_plot)
def _load_session(self, sets: dict, graphs: dict):
sid = self._load_sets(sets)
for g in graphs:
_ = g.pop('id')
graph = QGraphWindow.set_state(g)
self.graphs[graph.id] = graph
self.restoreGraph.emit(graph.id)
children = [sid[c] for c in g['children']]
active = [sid[c] for c in g['active']]
inactive = [k for k in children if k not in active]
self.newData.emit(children, graph.id)
graph.active = active
graph.listWidget.blockSignals(True)
for i, l in enumerate(g['in_legend']):
graph.listWidget.item(i).setCheckState(l)
graph.listWidget.blockSignals(False)
# set unchecked in tree and hide/show in plot
self.unset_state.emit(inactive)
self.change_visibility(active, inactive)
def _load_sets(self, sets: dict) -> dict:
sid = {}
for _id, (data, opts) in sets.items():
if isinstance(data, FitResult):
# for fits, _id belongs to the fitted data, not the fit
src_id = data.idx
if src_id in sid:
new_id = self.add(data, src=sid[src_id])
self.data[sid[src_id]]._fits.append(new_id)
else:
new_id = self.add(data)
else:
new_id = self.add(data)
sid[_id] = new_id
for m in ['real', 'imag']:
if m in opts:
self.data[new_id].setSymbol(**opts[m][0], mode=m)
self.data[new_id].setLine(**opts[m][1], mode=m)
return sid
def add_new_data(self, data: list, gid: str):
sid = []
for d in data:
if isinstance(d, tuple):
if len(d) == 2:
self._load_session(d[0], graphs=d[1])
else:
sid.extend(list(self._load_sets(d[0]).values()))
else:
sid.append(self.add(d))
if sid:
gid = '' if not gid else gid
self.newData.emit(sid, gid)
def plots_to_graph(self, plotkeys: list, gid: str):
self.graphs[gid].add(plotkeys, [self.data[k].plots for k in plotkeys])
for k in plotkeys:
self.data[k].graph = gid
@QtCore.pyqtSlot(str)
def plot_from_graph(self, key: str):
self.graphs[self.data[key].graph].remove(key)
@QtCore.pyqtSlot(list, str, str)
def move_sets(self, sets: list, dest: str, src, pos: int = -1):
if isinstance(src, str):
src = [src]*len(sets)
for graph_id, set_id in zip(src, sets):
# move all plots to the same graph
if graph_id != dest:
self.graphs[graph_id].remove(set_id)
self.plots_to_graph([set_id], dest)
# move to correct position
self.graphs[dest].move_sets(sets, pos)
@QtCore.pyqtSlot()
@QtCore.pyqtSlot(list, str)
def copy_sets(self, sets: list = None, src: str = None):
if sets is None:
sets = self.graphs[self.current_graph].active[:]
if src is None:
src = self.current_graph
new_ids = []
for s in sets:
copy_of_s = self.data[s].copy(full=True)
copy_of_s.id = str(uuid.uuid4())
new_ids.append(copy_of_s.id)
self.data[copy_of_s.id] = copy_of_s
self.newData.emit(new_ids, src)
return new_ids
@QtCore.pyqtSlot(list)
@QtCore.pyqtSlot(str)
def delete_sets(self, rm_sets: list = None):
rm_graphs = []
if rm_sets is None:
rm_sets = self.graphs[self.current_graph].sets + [self.current_graph]
self.undostack.beginMacro('Delete')
for k in rm_sets[::-1]:
if k in self.data:
cmd = DeleteCommand(self.data, k, self.newData, self.deleteData)
self.undostack.push(cmd)
else:
rm_graphs.append(k)
for k in rm_graphs:
cmd = DeleteGraphCommand(self.graphs, k, self.restoreGraph, self.deleteGraph)
self.undostack.push(cmd)
self.undostack.endMacro()
@QtCore.pyqtSlot()
def cat(self, src_sets=None):
joined = None
group_set = set()
name_set = set()
value_set = set()
if src_sets is None:
src_sets = self.graphs[self.current_graph].active
for sid in src_sets:
data_i = self.data[sid]
if joined is None:
joined = data_i.copy()
else:
joined.append(data_i.x, data_i.y, data_i.y_err)
name_set.add(data_i.name)
group_set.add(data_i.group)
value_set.add(data_i.value)
if joined is not None:
joined.group = '/'.join(group_set)
joined.name = '/'.join(name_set)
if len(value_set) == 1:
joined.value = value_set.pop()
else:
joined.value = 0.0
self.newData.emit([self.add(joined)], self.current_graph)
def get_data(self, sid: str, xy_only=False):
"""
Return data for a given id.
Return value is tuple of [x, y, y_err] and mask if xy_only is False, [x, y] if true.
"""
d = self.data[sid]
if xy_only:
return [d.x, d.y]
return [d.data.x, d.data.y, d.data.y_err], d.data.mask.data
def change_visibility(self, selected: list, deselected: list):
"""Change status of list of ids after status change in datawidget"""
for s in selected:
self.graphs[self.data[s].graph].show_item([s])
for d in deselected:
self.graphs[self.data[d].graph].hide_item([d])
@QtCore.pyqtSlot(str, str)
def change_keys(self, identifier: str, name: str):
if identifier in self.data:
d = self.data[identifier]
d.name = name
self.graphs[d.graph].update_legend(identifier, name)
elif identifier in self.graphs:
self.graphs[identifier].title = name
else:
raise KeyError('Unknown ID ' + str(identifier))
@QtCore.pyqtSlot(str, tuple)
def apply(self, func: str, arguments: tuple):
# undos, names displayed by undo action
cmd, cmd_text = self._actions[func]
self.undostack.beginMacro(cmd_text)
for sid in self.graphs[self.current_graph]:
single_undo = cmd(self.data[sid], *arguments)
self.undostack.push(single_undo)
self.undostack.endMacro()
def cut(self):
xlim, _ = self.graphs[self.current_graph].ranges
self.apply('cut', xlim)
@QtCore.pyqtSlot()
def unmask(self):
for d in self.data.values():
d.mask = np.ones_like(d.mask, dtype=bool)
def start_fit(self, parameter: dict, links: list, fit_options: dict):
if self._fit_active:
return
self.__fit_options = (parameter, links, fit_options)
fitter = FitRoutine()
models = {}
fit_limits = fit_options['limits']
fit_mode = fit_options['fit_mode']
we = fit_options['we']
for model_id, model_p in parameter.items():
m = Model(model_p['func'])
models[model_id] = m
m_complex = model_p['complex']
m.set_complex(m_complex)
for set_id, set_params in model_p['parameter'].items():
data_i = self.data[set_id]
if we == 'Deltay':
we = data_i.y_err**2
if m_complex is None or m_complex == 'real':
_y = data_i.y.real
elif m_complex == 'imag' and np.iscomplexobj(self.data[set_id].y):
_y = data_i.y.imag
else:
_y = data_i.y
_x = data_i.x
if fit_limits == 'none':
inside = slice(None)
elif fit_limits == 'x':
x_lim, _ = self.graphs[self.current_graph].ranges
inside = np.where((_x >= x_lim[0]) & (_x <= x_lim[1]))
else:
inside = np.where((_x >= fit_limits[0]) & (_x <= fit_limits[1]))
if isinstance(we, str):
d = fit_d.Data(_x[inside], _y[inside], we=we, idx=set_id)
else:
d = fit_d.Data(_x[inside], _y[inside], we=we[inside], idx=set_id)
d.set_model(m)
d.set_parameter(set_params[0], var=model_p['var'],
lb=model_p['lb'], ub=model_p['ub'],
fun_kwargs=set_params[1])
fitter.add_data(d)
model_globs = model_p['glob']
if model_globs:
m.set_global_parameter(**model_p['glob'])
for links_i in links:
fitter.set_link_parameter((models[links_i[0]], links_i[1]),
(models[links_i[2]], links_i[3]))
with busy_cursor():
self.fit_worker = FitWorker(fitter, fit_mode)
self.fit_thread = QtCore.QThread()
self.fit_worker.moveToThread(self.fit_thread)
self.fit_thread.started.connect(self.fit_worker.run)
self.fit_worker.finished.connect(self.end_fit)
self.fit_worker.finished.connect(self.fit_thread.quit)
self.fit_worker.finished.connect(self.fit_worker.deleteLater)
self.fit_thread.finished.connect(self.fit_thread.deleteLater)
self.stopFit.connect(lambda: self.fit_worker.fitter.abort())
self.fit_thread.start()
@QtCore.pyqtSlot(list, bool)
def end_fit(self, result: list, success: bool):
print('FIT FINISHED')
if success:
self.fitFinished.emit(result)
else:
QtWidgets.QMessageBox.warning(QtWidgets.QWidget(), 'Fit failed',
'Fit kaput with exception: \n' + "\n".join(result[0]))
self.fitFinished.emit([])
self._fit_active = False
@QtCore.pyqtSlot(dict)
def redo_fits(self, res: dict):
models = self.__fit_options[0]
for single_model, model_args in models.items():
parameter = model_args['parameter']
for set_id, set_parameter in parameter.items():
new_values = [v.value for v in res[set_id].parameter.values()]
parameter[set_id] = (new_values, set_parameter[1])
self.start_fit(*self.__fit_options)
@QtCore.pyqtSlot(dict, list)
def make_fits(self, res: dict, opts: list):
f_id_list = []
gid = ''
subplots = opts.pop(-1)
param_graph = opts.pop(-1)
tobedeleted = []
for i, (k, fit) in enumerate(res.items()):
reject, delete_prev = opts[i]
if reject:
continue
data_k = self.data[k]
if delete_prev:
tobedeleted.extend([f.id for f in data_k.get_fits()])
data_k.set_fits([])
syms = data_k.plot_real.symbol
if syms == SymbolStyle.No:
color = data_k.plot_real.linecolor
else:
color = data_k.plot_real.symbolcolor
fit.value = data_k.value
fit.group = data_k.group
f_id = self.add(fit, color=color, src=k)
f_id_list.append(f_id)
data_k.set_fits(f_id)
gid = data_k.graph
self.delete_sets(tobedeleted)
if f_id_list:
self.newData.emit(f_id_list, gid)
self.make_fit_parameter(f_id_list, graph_id=param_graph)
def make_fit_parameter(self, fit_sets: List[str], graph_id: str = None):
fit_dict = self._collect_fit_parameter(fit_sets)
if fit_dict:
p_id_list = []
for v in fit_dict.values():
xy = np.array(v[0]).T
p_id_list.append(self.add(Points(x=xy[0], y=xy[1], y_err=xy[2], name=v[1])))
if not graph_id:
graph_id = ''
self.newData.emit(p_id_list, graph_id)
def save_fit_parameter(self, fname: str | pathlib.Path, fit_sets: List[str] = None):
if fit_sets is None:
fit_sets = [s for (s, _) in self.active_sets]
for set_id in fit_sets:
data = self.data[set_id]
if data.mode != 'fit':
continue
data.data.save_parameter(fname)
def _collect_fit_parameter(self, fit_sets: List[str]) -> dict:
fit_dict = {}
for set_id in fit_sets:
data = self.data[set_id]
if data.mode != 'fit':
continue
for key, pvalue in data.parameter.items():
name = pvalue.full_name
fit_key = key + data.model_name
if fit_key not in fit_dict:
fit_dict[fit_key] = [[], name]
err = 0 if pvalue.error is None else pvalue.error
fit_dict[fit_key][0].append([data.value, pvalue.value, err])
return fit_dict
@QtCore.pyqtSlot(dict, str)
def extract_points(self, params: dict, gid: str):
xy_mode = params.pop('xy')
_active = self.graphs[self.current_graph].active
new_datasets = {}
for sid in _active:
data_i = self.data[sid]
if data_i.group not in new_datasets:
new_datasets[data_i.group] = [], []
new_x_axis, _temp = new_datasets[data_i.group]
new_x_axis.append(data_i.value)
_temp.append(data_i.points(params))
key_list = []
for label, (new_x_axis, _temp) in new_datasets.items():
_temp = np.array(_temp) # (number of sets, number of picks, (x, y, y_err))
num_pts = _temp.shape[1]
for i in range(num_pts):
if xy_mode[0]:
key = self.add(Points(x=new_x_axis, y=_temp[:, i, 0], name=label))
key_list.append(key)
if xy_mode[1]:
key = self.add(Points(x=new_x_axis, y=_temp[:, i, 1], y_err=_temp[:, i, 2], name=label))
key_list.append(key)
self.newData.emit(key_list, gid)
@QtCore.pyqtSlot(list)
def get_properties(self, sid: list) -> dict:
props = {}
for key in sid:
if key not in self.data:
continue
props = self.data[key].get_properties()
self.properties_collected.emit(props)
return props
@QtCore.pyqtSlot(list, str, str, object)
def update_property(self, sid: list, key1: str, key2: str, value: Any):
for s in sid:
self.data[s].update_property(key1, key2, value)
def create_empty(self):
import numpy.random as random
dat = Points(x=np.arange(10), y=np.arange(10) + random.rand(10)-0.5, y_err=random.rand(10),
name='Das Sein und das Nichts')
idd = self.add(dat)
self.newData.emit([idd], self.current_graph)
@QtCore.pyqtSlot(tuple, dict, str)
def calc_mean(self, dist_params, conversion, graph):
dist, args = dist_params
parameter = []
x = None
name = 'tau (%s)' % {conversion["to_"]}
value = 0.
for i, p in enumerate(args):
if isinstance(p, float):
parameter.append(p)
else:
if x is None:
x = self.data[p].x
if i == 0:
name = self.data[p].name
value = self.data[p].value
parameter.append(self.data[p].y)
if x is None:
x = 0
key = self.add(Points(x, dist.convert(*parameter, **conversion), name=name, value=value))
self.newData.emit([key], graph)
self.sender().update_graphs(self.graphs.tree(key_only=True))
@QtCore.pyqtSlot(list, str, bool, bool, tuple, str)
def interpolate_data(self, data_ids, mode, xlog, ylog, new_axis, dest_graph):
if len(new_axis) == 4:
start, end, steps, loggy = new_axis
if loggy:
new_x = np.logspace(np.log10(start), np.log10(end), steps)
else:
new_x = np.linspace(start, end, steps)
else:
new_x = self.data[new_axis[0]].x
new_key = []
for ids in data_ids:
k = self.add(interpolate(self.data[ids], new_x, xlog=xlog, ylog=ylog, kind=mode, extrapolate=True))
new_key.append(k)
self.newData.emit(new_key, dest_graph)
@QtCore.pyqtSlot(int, dict)
def smooth_data(self, npoints, param_kwargs):
_active = self.graphs[self.current_graph].active
new_data = []
for sid in _active:
try:
key = self.add(smooth(self.data[sid], npoints, **param_kwargs))
new_data.append(key)
except Exception as e:
QtWidgets.QMessageBox().warning(self.window,
'Smoothing failed!',
f'Smoothing failed for {self.data[sid].name} with exception:\n{e.args}')
if new_data:
self.newData.emit(new_data, self.current_graph)
@QtCore.pyqtSlot()
def update_color(self):
UpperManagement._colors = cycle(Colors)
for i in self.active:
self.data[i].color = next(UpperManagement._colors)
@QtCore.pyqtSlot(dict, tuple)
def shift_scale(self, values: dict, options: tuple):
copy_data, value_plot = options
sid_list = []
shift_y = []
shift_x = []
for k, v in values.items():
d_k = self.data[k]
if copy_data is None:
d_k.x = d_k.x*v[1][0] + v[0][0]
d_k.y = d_k.y*v[1][1] + v[0][1]
else:
new_data = d_k.copy(full=True)
new_data.update({'shift': v[0], 'scale': v[1]})
new_data.data.x = new_data.x*v[1][0] + v[0][0]
new_data.y = new_data.y*v[1][1] + v[0][1]
sid = self.add(new_data)
sid_list.append(sid)
shift_x.append(d_k.value)
shift_y.append(v[0]+v[1])
self.newData.emit(sid_list, copy_data)
if value_plot is not None:
sid_list = []
shift_y = np.array(shift_y)
for i, (mode, default) in enumerate([('x shift', 0.), ('y shift', 0.),
('x scale', 1.), ('y scale', 1.), ]):
if np.all(shift_y[:, i] == default):
continue
data = Points(shift_x, shift_y[:, i], name=mode)
sid_list.append(self.add(data))
self.newData.emit(sid_list, value_plot)
@QtCore.pyqtSlot(list)
def convert_sets(self, src: list):
new_graph = {}
error_list = []
for sets in src:
# merge: sets (real, imag, graph, type)
# normal: sets (source set, graph, type)
graph_id = sets[-2]
new_type = [Points, FID, Spectrum, BDS][sets[-1]]
if len(sets) == 4:
real_set, imag_set = sets[0], sets[1]
if real_set != '':
data = self.data[real_set]
new_data = new_type(data.x, data.y.real)
if imag_set != '':
imag_data = self.data[imag_set]
if len(imag_data) == len(data):
new_data.y.imag = imag_data.y.real
else:
error_list.append(f'Lengths mismatch of {data.name} ({len(data)}) and {imag_data.name} ({len(imag_data)})')
continue
else:
data = self.data[imag_set]
new_data = new_type(data.x, np.zeros(data.x.size))
new_data.y.imag = data.y.real
else:
data = self.data[sets[0]]
if isinstance(data.data, new_type):
error_list.append(f'{data.name} is alreade of type {new_type.__name__}')
continue
new_data = new_type(data.x, np.zeros(data.x.size))
new_data.y.real = data.y.real
new_data.update(data.opts)
new_id = self.add(data.change_type(new_data))
if graph_id not in new_graph:
new_graph[graph_id] = []
new_graph[graph_id].append(new_id)
for g, s in new_graph.items():
self.newData.emit(s, g)
if error_list:
err_string = "\n- ".join(error_list)
_ = QtWidgets.QMessageBox.information(QtWidgets.QWidget(), 'Something was skipped',
f'Some conversions were skipped:\n{err_string}')
def get_namespace(self):
from ..lib.namespace import Namespace
self.namespace = Namespace(basic=True, const=True, fitfuncs=True)
for i, g in enumerate(self.graphs.values()):
for j, sid in enumerate(g.sets):
sets = self.data[sid]
self.namespace.add_namespace(sets.get_namespace(i, j), parents=('Data', f'{sets.name} ({g.title})'))
return self.namespace
@QtCore.pyqtSlot(list, list, bool)
def eval_expression(self, cmds: list, set_ids: list, overwrite: bool):
ns = self.namespace.flatten()
if overwrite:
self.undostack.beginMacro('Evaluate expression')
failures = []
for sid in set_ids:
data_i = self.data[sid]
try:
# use a copy of original namespace
new_data = data_i.eval_expression(cmds, dict(ns))
if overwrite:
cmd = EvalCommand(self.data, sid, new_data, 'Evaluate expression')
self.undostack.push(cmd)
else:
new_id = self.copy_sets(sets=[sid])
self.data[new_id[0]].data = new_data
except Exception as e:
failures.append((data_i, e))
print(str(data_i) + ' failed with Exception: ' + ''.join(e.args))
continue
if overwrite:
self.undostack.endMacro()
if failures:
err_msg = QtWidgets.QMessageBox(parent=self.sender())
err_msg.setText('One or more errors occured during evaluation.')
err_msg.setDetailedText('\n'.join(f'{d.name} failed with error: {err.args}' for d, err in failures))
err_msg.exec()
self.sender().success = not failures
self.sender().add_data(self.active_sets)
@QtCore.pyqtSlot(list, dict)
def create_from_function(self, cmds: list, opts: dict):
ns = dict(self.namespace.flatten())
dtype = [Points, FID, Spectrum, BDS][opts.pop('dtype')]
try:
for c in cmds:
exec(c, globals(), ns)
name = opts.pop('name')
value = opts.pop('val')
graph = opts.pop('graph')
data = dtype(x=ns['x'], y=ns['y'], y_err=ns['y_err'], name=name, value=value)
s_id = self.add(data, **opts)
self.sender().success = True
self.newData.emit([s_id], graph)
except Exception as err:
print('Creation failed with error: ' + ', '.join(err.args))
err_msg = QtWidgets.QMessageBox(parent=self.sender())
err_msg.setText('One or more errors occured during evaluation.')
err_msg.setDetailedText('Creation failed with error: ' + ', '.join(err.args))
err_msg.exec()
self.sender().success = False
def show_statistics(self, mode):
x, y, = [], []
for i, _ in self.active_sets:
_temp = self.data[i]
try:
x.append(float(_temp.name))
except ValueError:
x.append(i)
y.append(_temp.statistic(mode))
@QtCore.pyqtSlot()
def calc_magn(self):
new_id = []
for k, _ in self.active_sets:
dataset = self.data[k]
if isinstance(dataset, SignalContainer):
new_value = dataset.copy(full=True)
new_value.data = dataset.data.magnitude()
new_id.append(self.add(new_value))
self.newData.emit(new_id, '')
@QtCore.pyqtSlot()
def center(self):
new_id = []
for k, _ in self.active_sets:
new_value = self.data[k].copy(full=True)
new_value.x -= new_value.x[np.argmax(new_value.y.real)]
new_id.append(self.add(new_value))
self.newData.emit(new_id, '')
def integrate(self, **kwargs):
new_sets = []
log = kwargs['log']
limits = kwargs.get('limits')
mode = kwargs['mode']
for set_id in kwargs['sets']:
data_i = self.data[set_id]
if mode == 'i':
new_data = data_i.data.integrate(log=log, limits=limits)
elif mode == 'd':
new_data = data_i.data.diff(log=log)
else:
raise ValueError(f'Unknown mode {mode}.')
new_container = data_i.copy(full=True)
new_container.data = new_data
new_sets.append(self.add(new_container))
self.newData.emit(new_sets, kwargs['graph'])
def bds_deriv(self):
new_sets = []
for (set_id, _) in self.active_sets:
data_i = self.data[set_id]
diff = data_i.data.diff(log=True)
new_data = Points(x=diff.x, y=-np.pi/2*diff.y.real)
new_data.update(data_i.data.meta)
new_sets.append(self.add(new_data, color=data_i.plot_imag.linecolor))
self.newData.emit(new_sets, '')
def logft(self, **kwargs):
new_sets = []
ft_mode = kwargs['ft_mode']
for set_id in kwargs['sets']:
data_i = self.data[set_id]
if ft_mode in ['cos', 'sin']:
new_data = Points(*logft(data_i.x, data_i.y, mode=ft_mode))
else:
new_data = Signal(*logft(data_i.x, data_i.y, mode=ft_mode))
new_sets.append(self.add(new_data, color=data_i['color'], symbol=data_i['symbol'], line=data_i['line']))
self.data[new_sets[-1]].update(data_i.data.meta)
self.newData.emit(new_sets, kwargs['graph'])
def skip_points(self, offset: int, step: int, invert: bool = False, copy: bool = False):
for k, _ in self.active_sets:
src = self.data[k]
if invert:
mask = np.mod(np.arange(offset, src.x.size+offset), step) != 0
else:
mask = np.mod(np.arange(offset, src.x.size+offset), step) == 0
if copy:
data = src.copy()
temp = data.mask.copy()
temp[temp] = mask
data.remove(np.where(~temp))
data.mask = np.ones(data.x.shape)
idd = self.add(data)
self.newData.emit([idd], self.current_graph)
else:
src.mask[src.mask] = mask
src.mask = src.mask
@QtCore.pyqtSlot(dict)
def calc_relaxation(self, opts: dict):
params = opts['pts']
if len(params) == 4:
if params[3]:
_x = x1 = np.geomspace(params[0], params[1], num=params[2])
else:
_x = x1 = np.linspace(params[0], params[1], num=params[2])
if opts['axis1'] in ['t', 'invt1000']:
t_p = opts['t_param']
if len(t_p) == 2:
from ...models import Arrhenius as Func
else:
from ...models import VFT as Func
_x = Func.func(x1, *t_p, invt=opts['axis1'])
else:
if params[1]:
x1 = self.data[params[0]].x
_x = self.data[params[0]].y.real
else:
_x = x1 = self.data[params[0]].x
x2 = opts['val2']
sd = opts['spec_dens']
sd_param = [self.data[p].y.real if isinstance(p, str) else p for p in opts['sd_param'][0]]
sd.convert(_x, *sd_param, from_=opts['tau_type'], to_='raw')
relax = Relaxation()
relax.set_distribution(sd, parameter=sd_param, keywords=opts['sd_param'][1])
cp_param = [self.data[p].y.real if isinstance(p, str) else p for p in opts['cp_param'][0]]
relax.set_coupling(opts['coup'], parameter=cp_param, keywords=opts['cp_param'][1])
if opts['out'] == 't1':
y = relax.t1(x2, _x)
else:
y = relax.t2(x2, _x)
pts = Points(x1, y, name=sd.name)
pts.meta.update(opts)
# we do not want class instances
pts.meta['coup'] = opts['coup'].name
pts.meta['spec_dens'] = sd.name
self.newData.emit([self.add(pts)], opts['graph'])
self.sender().update_graphs(self.graphs.list())
def mask_value(self, idx: str, m: list):
self.data[idx].mask = m
def remove_values(self, idx: str, m: list):
self.data[idx].remove(m)
def set_values(self, idx: str, pos: tuple, value):
self.data[idx].setvalues(pos, value)
def append(self, idx: str):
self.data[idx].add([0.0, 0.0, 0.0])
def save(self, outpath: str | pathlib.Path, extension: str, strip_spaces=False):
path = pathlib.Path(outpath)
suffix = path.suffix
if not suffix:
m = re.match(r'[\w\s]*\(\*(\.\w+)\)', extension)
if m:
suffix = m.group(1)
path = path.with_suffix(suffix)
else:
raise ValueError('No file extension detected')
if suffix == '.nmr':
from ...io.sessionwriter import NMRWriter
NMRWriter(self.graphs, self.data).export(path)
return
real_outnames = []
for set_id, set_name in self.active_sets:
full_name = path.stem
if '<label>' in full_name:
full_name = full_name.replace('<label>', convert(set_name, old='tex', new='str'))
data_i = self.data[set_id]
if isinstance(data_i, FitContainer):
full_name += ' fit(' + self.data[data_i.parent_set].name + ')'
if strip_spaces:
full_name = full_name.replace(' ', '_')
if full_name in real_outnames:
cnt = 1
while (full_name + '_' + str(cnt)) in real_outnames:
cnt += 1
real_outnames.append(full_name + '_' + str(cnt))
else:
real_outnames.append(full_name)
outpath_set = path.with_name(real_outnames[-1]+path.suffix)
data_i.save(outpath_set)
class FitWorker(QtCore.QObject):
finished = QtCore.pyqtSignal(list, bool)
def __init__(self, fitter, mode):
super().__init__()
self.fitter = fitter
self.mode = mode
@QtCore.pyqtSlot()
def run(self):
try:
res = self.fitter.run(mode=self.mode)
success = True
except Exception as e:
res = [e.args]
success = False
self.finished.emit(res, success)