from __future__ import annotations import pathlib import re import uuid import numpy as np from nmreval.fit import data as fit_d from nmreval.fit.model import Model from nmreval.fit.result import FitResult from nmreval.fit.minimizer import FitRoutine from nmreval.lib.colors import available_cycles from nmreval.lib.logger import logger from nmreval.math.interpol import interpolate from nmreval.math.logfourier import logft from nmreval.math.smooth import smooth from nmreval.nmr.relaxation import Relaxation from ..Qt import QtCore, QtWidgets from ..lib import Relations from ..lib.undos import * from ..data.container import * from ..io.filereaders import QFileReader from ..lib.utils import busy_cursor class GraphSignals(QtCore.QObject): valueChanged = QtCore.pyqtSignal() class GraphDict(OrderedDict): def __init__(self, data): super().__init__() self._data = data self.signals = GraphSignals() self.valueChanged = self.signals.valueChanged def __setitem__(self, key, value): super().__setitem__(key, value) self.valueChanged.emit() def __delitem__(self, key): super().__delitem__(key) self.valueChanged.emit() def tree(self, key_only=False): ret_val = OrderedDict() for k, g in self.items(): if key_only: ret_val[k] = (g.title, [(s, self._data[s].name) for s in g.sets]) else: ret_val[(k, g.title)] = [(s, self._data[s].name) for s in g.sets] return ret_val def list(self): return [(k, v.title) for k, v in self.items()] def active(self, key: str, return_val: str = 'both'): if not key: return [] else: if return_val == 'both': return [(self._data[i].id, self._data[i].name) for i in self[key]] elif return_val == 'id': return [self._data[i].id for i in self[key]] elif return_val == 'name': return [self._data[i].name for i in self[key]] else: raise ValueError(f'return_val got wrong value {return_val!r}') def current_sets(self, key: str): if key: return [(self._data[i].id, self._data[i].name) for i in self[key].sets] else: return [] class UpperManagement(QtCore.QObject): newGraph = QtCore.pyqtSignal() restoreGraph = QtCore.pyqtSignal(str) deleteGraph = QtCore.pyqtSignal(str) newData = QtCore.pyqtSignal([list, str], [list, str, bool]) deleteData = QtCore.pyqtSignal(list) dataChanged = QtCore.pyqtSignal(str) fitFinished = QtCore.pyqtSignal(list) stopFit = QtCore.pyqtSignal() properties_collected = QtCore.pyqtSignal(dict) unset_state = QtCore.pyqtSignal(list) _colors = cycle(TUColors) _actions = { 'ls': (ShiftCommand, 'Left shift'), 'cut': (CutCommand, 'Cut'), 'ap': (ApodizationCommand, 'Apodization'), 'zf': (ZerofillCommand, 'Zerofill'), 'ph': (PhaseCommand, 'Phase correction'), 'autoph': (AutophaseCommand, 'Autophase'), 'bl': (BaselineCommand, 'Baseline'), 'bls': (BaselineSplineCommand, 'Baseline'), 'ft': (FourierCommand, 'Fourier'), 'ft_pake': 'FT (de-paked)', 'sort': (SortCommand, 'Sort'), 'norm': (NormCommand, 'Normalize'), 'center': (CenterCommand, 'Center on max'), } def __init__(self, window): super().__init__() self._fit_active = False self.fit_thread = None self.fit_worker = None self.counter = 0 self.data = OrderedDict() self.window = window self.current_graph = None self.graphs = GraphDict(self.data) self.namespace = None self.undostack = QtWidgets.QUndoStack() self.deleteData.connect(self.plot_from_graph) self._filereader = None def __setitem__(self, key: str, value, **kwargs): if isinstance(value, ExperimentContainer): item = value item.id = key elif isinstance(value, FitResult): item = FitContainer(key, value, manager=self, **kwargs) elif isinstance(value, Signal): item = SignalContainer(key, value, manager=self, **kwargs) else: item = PointContainer(key, value, manager=self, **kwargs) item.dataChanged.connect(lambda x: self.dataChanged.emit(x)) self.data[key] = item def __getitem__(self, item): return self.data[item] def __contains__(self, item): return item in self.data def __iter__(self): for k, v in self.data.items(): yield k, v @property def active_sets(self): return self.graphs.active(self.current_graph) @property def active_id(self): return self.graphs.active(self.current_graph, return_val='id') def get_attributes(self, graph_id: str, attr: str) -> dict[str, Any]: return {self.data[i].id: getattr(self.data[i], attr) for i in self.graphs[graph_id].sets} def add(self, data, **kwargs): _id = str(uuid.uuid4()) self.__setitem__(_id, data, **kwargs) return _id def load_files(self, fname: list[str], new_plot: str = None): if self._filereader is None: self._filereader = QFileReader(manager=self) ret_dic = self._filereader.readfiles(fname) self.add_new_data(ret_dic, new_plot) def _load_session(self, sets: dict, graphs: dict): sid = self._load_sets(sets) for g in graphs: _ = g.pop('id') graph = QGraphWindow.set_state(g) self.graphs[graph.id] = graph self.restoreGraph.emit(graph.id) children = [sid[c] for c in g['children']] active = [sid[c] for c in g['active']] inactive = [k for k in children if k not in active] self.newData.emit(children, graph.id) graph.active = active graph.listWidget.blockSignals(True) for i, l in enumerate(g['in_legend']): try: graph.listWidget.item(i).setCheckState(QtCore.Qt.Checked if l else QtCore.Qt.Unchecked) except AttributeError: pass graph.listWidget.blockSignals(False) # set unchecked in tree and hide/show in plot self.unset_state.emit(inactive) self.change_visibility(active, inactive) def _load_sets(self, sets: dict) -> dict: sid = {} for _id, (data, opts) in sets.items(): if isinstance(data, FitResult): # for fits, _id belongs to the fitted data, not the fit src_id = data.idx if src_id in sid: new_id = self.add(data, src=sid[src_id]) self.data[sid[src_id]]._fits.append(new_id) else: new_id = self.add(data) else: new_id = self.add(data) sid[_id] = new_id for m in ['real', 'imag']: if m in opts: self.data[new_id].setSymbol(**opts[m][0], mode=m) self.data[new_id].setLine(**opts[m][1], mode=m) return sid def add_new_data(self, data: list, gid: str): sid = [] for d in data: if isinstance(d, tuple): if len(d) == 2: self._load_session(d[0], graphs=d[1]) else: sid.extend(list(self._load_sets(d[0]).values())) else: sid.append(self.add(d)) if sid: gid = '' if not gid else gid self.newData.emit(sid, gid) def plots_to_graph(self, plotkeys: list, gid: str): self.graphs[gid].add(plotkeys, [self.data[k].plots for k in plotkeys]) for k in plotkeys: self.data[k].graph = gid @QtCore.pyqtSlot(list) def plot_from_graph(self, key: list[str]): sort_graph = {} for sid in key: v = self.data[sid].graph if v not in sort_graph: sort_graph[v] = [] sort_graph[v].append(sid) for gid, sets in sort_graph.items(): self.graphs[gid].remove(sets) @QtCore.pyqtSlot(list, str, str) def move_sets(self, sets: list, dest: str, src: (str|list), pos: int = -1): if isinstance(src, str): src = [src]*len(sets) for graph_id, set_id in zip(src, sets): # move all plots to the same graph if graph_id != dest: self.graphs[graph_id].remove(set_id) self.plots_to_graph([set_id], dest) # move to correct position self.graphs[dest].move_sets(sets, pos) def select_window(self, gid: str): for key, plot in self.graphs.items(): if key == gid: self.window.area.setActiveSubWindow(plot.parent()) @QtCore.pyqtSlot() @QtCore.pyqtSlot(list, str) def copy_sets(self, sets: list = None, src: str = None): if sets is None: sets = self.graphs[self.current_graph].active[:] if src is None: src = self.current_graph new_ids = [] for s in sets: copy_of_s = self.data[s].copy(full=True) copy_of_s.id = str(uuid.uuid4()) new_ids.append(copy_of_s.id) self.data[copy_of_s.id] = copy_of_s self.newData.emit(new_ids, src) return new_ids @QtCore.pyqtSlot(list) @QtCore.pyqtSlot(str) @QtCore.pyqtSlot() def delete_sets(self, rm_sets: list = None): rm_graphs = [] if rm_sets is None: rm_sets = self.graphs[self.current_graph].sets + [self.current_graph] self.undostack.beginMacro('Delete') rm_set_by_graph = {} for k in rm_sets[::-1]: if k in self.data: parent_graph = self.data[k].graph if parent_graph not in rm_set_by_graph: rm_set_by_graph[parent_graph] = [] rm_set_by_graph[parent_graph].append(k) elif k in self.graphs: rm_graphs.append(k) else: logger.warning(f'delete_sets: {k} is not in data or graph found') for gid, sid_list in rm_set_by_graph.items(): cmd = DeleteCommand(self.data, sid_list, self.graphs, gid, self.newData, self.deleteData) self.undostack.push(cmd) for k in rm_graphs: cmd = DeleteGraphCommand(self.graphs, k, self.restoreGraph, self.deleteGraph) self.undostack.push(cmd) self.undostack.endMacro() def delete_graph(self, gid): self.delete_sets(self.graphs[gid].sets + [gid]) @QtCore.pyqtSlot() def cat(self, src_sets=None): joined = None group_set = set() name_set = set() value_set = set() if src_sets is None: if self.current_graph: src_sets = self.graphs[self.current_graph].active else: return for sid in src_sets: data_i = self.data[sid] if joined is None: joined = data_i.copy() else: joined.append(data_i.data.x, data_i.data.y, y_err=data_i.data.y_err, mask=data_i.data.mask) name_set.add(data_i.name) group_set.add(data_i.group) value_set.add(data_i.value) if joined is not None: joined.group = '+'.join(group_set) joined.name = '+'.join(name_set) if len(value_set) == 1: joined.value = value_set.pop() else: joined.value = 0.0 self.newData.emit([self.add(joined)], self.current_graph) def get_data(self, sid: str, xy_only: bool = False): """ Return data for a given id. Return value is tuple of [x, y, y_err] and mask if xy_only is False, [x, y] if true. """ d = self.data[sid] if xy_only: return [d.x, d.y] return [d.data.x, d.data.y, d.data.y_err], d.data.mask.data def change_visibility(self, selected: list, deselected: list): """Change status of list of ids after status change in datawidget""" for item_list, func in [(selected, 'show_item'), (deselected, 'hide_item')]: grouping = {} for s in item_list: g = self.data[s].graph if g not in grouping: grouping[g] = [] grouping[g].append(s) for k, v in grouping.items(): getattr(self.graphs[k], func)(v) @QtCore.pyqtSlot(str, str) def change_keys(self, identifier: str, name: str): if identifier in self.data: d = self.data[identifier] d.name = name self.graphs[d.graph].update_legend(identifier, name) elif identifier in self.graphs: self.graphs[identifier].title = name else: raise KeyError('Unknown ID ' + str(identifier)) @QtCore.pyqtSlot(str, tuple) def apply(self, func: str, arguments: tuple): # undos, names displayed by undo action cmd, cmd_text = self._actions[func] self.undostack.beginMacro(cmd_text) for sid in self.graphs[self.current_graph]: single_undo = cmd(self.data[sid], *arguments) self.undostack.push(single_undo) self.undostack.endMacro() def cut(self): if self.current_graph: xlim, _ = self.graphs[self.current_graph].ranges self.apply('cut', xlim) @QtCore.pyqtSlot() def unmask(self): for d in self.data.values(): d.mask = np.ones_like(d.mask, dtype=bool) def prepare_fit(self, parameter: dict, links: list, fit_options: dict) -> bool: if self._fit_active: return False self.__fit_options = (parameter, links, fit_options) self.fitter = FitRoutine() models = {} fit_limits = fit_options['limits'] fit_mode = fit_options['fit_mode'] we_option = fit_options['we'] self.fitter.fitmethod = fit_mode # all-encompassing error catch try: for model_id, model_p in parameter.items(): m = model_p['func'] models[model_id] = m m_complex = model_p['complex'] # sets are not in active order but in order they first appeared in fit dialog # iterate over order of set id in active order and access parameter inside loop # instead of directly looping try: list_ids = list(model_p['data_parameter'].keys()) set_order = [self.active_id.index(i) for i in list_ids] except ValueError as e: raise Exception('Getting order failed') from e for pos in set_order: set_id = list_ids[pos] try: data_i = self.data[set_id] except KeyError as e: raise KeyError(f'{set_id} not found') from e try: set_params = model_p['data_parameter'][set_id] except KeyError as e: raise KeyError(f'No parameter found for {set_id}') from e if we_option.lower() == 'deltay': we = data_i.y_err**2 else: we = we_option if m_complex is None or m_complex == 1: _y = data_i.y.real elif m_complex == 2 and np.iscomplexobj(data_i.y): _y = data_i.y.imag else: _y = data_i.y _x = data_i.x if fit_limits == 'none': inside = slice(None) elif fit_limits == 'x': x_lim, _ = self.graphs[self.current_graph].ranges inside = np.where((_x >= x_lim[0]) & (_x <= x_lim[1])) else: inside = np.where((_x >= fit_limits[0]) & (_x <= fit_limits[1])) try: if isinstance(we, str): d = fit_d.Data(_x[inside], _y[inside], we=we, idx=set_id) else: d = fit_d.Data(_x[inside], _y[inside], we=we[inside], idx=set_id) except Exception as e: raise Exception(f'Setting data failed for {set_id}') d.set_model(m) try: d.set_parameter(set_params[0], fun_kwargs=set_params[1]) except Exception as e: raise Exception('Setting parameter failed') from e self.fitter.add_data(d) for links_i in links: self.fitter.set_link_parameter((models[links_i[0]], links_i[1]), (models[links_i[2]], links_i[3])) return True except Exception as e: logger.error('Fit preparation failed', *e.args) QtWidgets.QMessageBox.warning(QtWidgets.QWidget(), 'Fit prep failed', f'Fit preparation failed:\n' 'Message:\n' f'{e.args}:\n') return False def start_fit(self): with busy_cursor(): self.fit_worker = FitWorker(self.fitter) self.fit_thread = QtCore.QThread() self.fit_worker.moveToThread(self.fit_thread) self.fit_thread.started.connect(self.fit_worker.run) self.fit_worker.finished.connect(self.end_fit) self.fit_worker.finished.connect(self.fit_thread.quit) self.fit_worker.finished.connect(self.fit_worker.deleteLater) self.fit_thread.finished.connect(self.fit_thread.deleteLater) self.stopFit.connect(lambda: self.fit_worker.fitter.abort()) self.fit_thread.start() @QtCore.pyqtSlot(list, bool) def end_fit(self, result: list, success: bool): if success: logger.info('Successful fit') self.fitFinished.emit(result) else: e = result[0] logger.exception(e, exc_info=True) QtWidgets.QMessageBox.warning(QtWidgets.QWidget(), 'Fit failed', f'Fit kaput with exception: \n\n{e!r}') self.fitFinished.emit([]) self._fit_active = False @QtCore.pyqtSlot(dict) def redo_fits(self, res: dict): models = self.__fit_options[0] for single_model, model_args in models.items(): parameter = model_args['parameter'] for set_id, set_parameter in parameter.items(): new_values = [v.value for v in res[set_id].parameter.values()] parameter[set_id] = (new_values, set_parameter[1]) if self.prepare_fit(*self.__fit_options): self.start_fit() def make_fits(self, res: dict, opts: list, param_graph: str, show_fit: bool, parts: bool, extrapolate: list) -> None: """ Args: res: key is that of original data, value is FitResult opts: (ignore this fits, delete previous fits) param_graph: None if no parameter to plot, '' for new graph, or id of existig graph show_fit: plot fit curve? parts: key is that of original data, value is list of subplots extrapolate: """ f_id_list = [] gid = '' tobedeleted = [] accepted = [] for i, (k, fit) in enumerate(res.items()): reject, delete_prev = opts[i] if reject: continue if not all(e is None for e in extrapolate): spacefunc = np.geomspace if fit.islog else np.linspace xmin = fit.x.min() xmax = fit.x.max() len_data = len(fit.x_data) num_pts = 20*len_data-9 if len_data < 51 else 3*len_data if extrapolate[0] is not None: xmin = extrapolate[0] if extrapolate[1] is not None: xmax = extrapolate[1] if extrapolate[2] is not None: num_pts = extrapolate[2] _x = spacefunc(xmin, xmax, num=num_pts) fit = fit.with_new_x(_x) data_k = self.data[k] if delete_prev: tobedeleted.extend([f.id for f in data_k.get_fits()]) data_k.set_fits([]) syms = data_k.plot_real.symbol if syms == SymbolStyle.No: color = data_k.plot_real.linecolor else: color = data_k.plot_real.symbolcolor fit.value = data_k.value fit.group = data_k.group accepted.append(fit) data_name = f' ({data_k.name})' if show_fit: fit.name += data_name f_id = self.add(fit, color=color, src=k) f_id_list.append(f_id) data_k.set_fits(f_id) if parts: color_scheme = available_cycles['colorblind'] for subfunc, col in zip(fit.sub(fit.x), cycle(color_scheme)): subfunc.value = data_k.value subfunc.group = data_k.group subfunc.name += data_name sub_f_id = self.add(subfunc, color=col, linestyle=LineStyle.Dashed, symbol=SymbolStyle.No) self[sub_f_id].add_relation(Relations.isFitPartOf, f_id) self[f_id].add_relation(Relations.hasFitPart, sub_f_id) f_id_list.append(sub_f_id) gid = data_k.graph self.delete_sets(tobedeleted) if accepted and (param_graph != '-1'): self.make_fit_parameter(accepted, graph_id=param_graph) self.newData.emit(f_id_list, gid) def extend_fits(self, set_id: list, x_range: np.ndarray): graphs = {} for sid in set_id: data = self[sid] fit = data.copy(full=True, keep_color=True) fit.data = fit.data.with_new_x(x_range) graph_id = data.graph if graph_id not in graphs: graphs[graph_id] = [] graphs[graph_id].append(self.add(fit)) color_scheme = available_cycles['colorblind'] for subfunc, col in zip(fit.data.sub(fit.x), cycle(color_scheme)): subfunc.value = fit.value subfunc.group = fit.group graphs[graph_id].append(self.add(subfunc, color=col, linestyle=LineStyle.Dashed, symbol=SymbolStyle.No)) for k, v in graphs.items(): self.newData.emit(v, k) def make_fit_parameter(self, fit_sets: list[str | FitResult], graph_id: str = None): fit_dict = self._collect_fit_parameter(fit_sets) if fit_dict: p_id_list = [] for v in fit_dict.values(): xy = np.array(v[0]).T p_id_list.append(self.add(Points(x=xy[0], y=xy[1], y_err=xy[2], name=v[1]))) if not graph_id: graph_id = '' self.newData[list, str, bool].emit(p_id_list, graph_id, True) def save_fit_parameter(self, fname: str | pathlib.Path, fit_sets: list[str] = None): if fit_sets is None: fit_sets = [s for s in self.active_id] for set_id in fit_sets: data = self.data[set_id] if data.mode != 'fit': continue data.data.save_parameter(fname) def _collect_fit_parameter(self, fit_sets: list[str | FitResult]) -> dict: fit_dict = {} for set_id in fit_sets: if isinstance(set_id, str): data = self.data[set_id] if data.mode != 'fit': continue elif isinstance(set_id, FitResult): data = set_id else: continue for fit_key, pvalue in data.parameter.items(): if fit_key not in fit_dict: fit_dict[fit_key] = [[], fit_key] err = 0 if pvalue.error is None else pvalue.error fit_dict[fit_key][0].append([data.value, pvalue.value, err]) return fit_dict @QtCore.pyqtSlot(dict, str) def extract_points(self, params: dict, gid: str): xy_mode = params.pop('xy') _active = self.graphs[self.current_graph].active new_datasets = {} for sid in _active: data_i = self.data[sid] if data_i.group not in new_datasets: new_datasets[data_i.group] = [], [] new_x_axis, _temp = new_datasets[data_i.group] new_x_axis.append(data_i.value) _temp.append(data_i.points(params)) key_list = [] for label, (new_x_axis, _temp) in new_datasets.items(): _temp = np.array(_temp) # (number of sets, number of picks, (x, y, y_err)) num_pts = _temp.shape[1] for i in range(num_pts): if xy_mode[0]: key = self.add(Points(x=new_x_axis, y=_temp[:, i, 0], name=label)) key_list.append(key) if xy_mode[1]: key = self.add(Points(x=new_x_axis, y=_temp[:, i, 1], y_err=_temp[:, i, 2], name=label)) key_list.append(key) self.newData[list, str, bool].emit(key_list, gid, True) @QtCore.pyqtSlot(list) def get_properties(self, sid: list) -> dict: props = {} for key in sid: if key not in self.data: continue props = self.data[key].get_properties() self.properties_collected.emit(props) return props @QtCore.pyqtSlot(list, str, str, object) def update_property(self, sid: list, key1: str, key2: str, value: Any): for s in sid: self.data[s].update_property(key1, key2, value) def create_empty(self): import numpy.random as random dat = Points(x=np.arange(10), y=np.arange(10) + random.rand(10)-0.5, y_err=random.rand(10), name='Das Sein und das Nichts') idd = self.add(dat) self.newData.emit([idd], self.current_graph) @QtCore.pyqtSlot(tuple, dict, str) def calc_mean(self, dist_params, conversion, graph): dist, args = dist_params parameter = [] x = None name = 'tau (%s)' % {conversion["to_"]} value = 0. for i, p in enumerate(args): if isinstance(p, float): parameter.append(p) else: if x is None: x = self.data[p].x if i == 0: name = self.data[p].name value = self.data[p].value parameter.append(self.data[p].y) if x is None: x = 0 key = self.add(Points(x, dist.convert(*parameter, **conversion), name=name, value=value)) self.newData.emit([key], graph) self.sender().update_graphs(self.graphs.tree(key_only=True)) @QtCore.pyqtSlot(list, str, bool, bool, tuple, str) def interpolate_data(self, data_ids, mode, xlog, ylog, new_axis, dest_graph): if len(new_axis) == 4: start, end, steps, loggy = new_axis if loggy: new_x = np.logspace(np.log10(start), np.log10(end), steps) else: new_x = np.linspace(start, end, steps) else: new_x = self.data[new_axis[0]].x new_key = [] for ids in data_ids: k = self.add(interpolate(self.data[ids], new_x, xlog=xlog, ylog=ylog, kind=mode, extrapolate=True)) new_key.append(k) self.newData.emit(new_key, dest_graph) def binning(self, digits: float): _active = self.graphs[self.current_graph].active new_data = [] for sid in _active: key = self.add(self.data[sid].binning(digits=digits)) new_data.append(key) self.newData.emit(new_data, self.current_graph) @QtCore.pyqtSlot(dict, str) def addTg(self, dic: dict, dtype: str): graph_id = self.current_graph if dtype == 'tg' else dic.pop('graph') set_id_list = [] if dtype == 'hodge': for v in dic.values(): set_id_list.append(self.add(v)) else: for k, (data, lines) in dic.items(): p: ExperimentContainer = self[k] col = p.plot_real.linecolor if data is not None: set_id_list.append(self.add(data, color=col)) if dtype == 'tnmh': if lines is not None: lines = [lines] else: lines = [] for line in lines: set_id = self.add(line, color=col) self[set_id].setLine(style=LineStyle.Dashed) self[set_id].setSymbol(symbol=SymbolStyle.No) set_id_list.append(set_id) self.newData.emit(set_id_list, graph_id) @QtCore.pyqtSlot(int, dict) def smooth_data(self, npoints, param_kwargs): _active = self.graphs[self.current_graph].active new_data = [] for sid in _active: try: key = self.add(smooth(self.data[sid], npoints, **param_kwargs)) new_data.append(key) except Exception as e: QtWidgets.QMessageBox().warning(self.window, 'Smoothing failed!', f'Smoothing failed for {self.data[sid].name} with exception:\n{e.args}') if new_data: self.newData.emit(new_data, self.current_graph) @QtCore.pyqtSlot() def set_cycle(self, set_idx: list, cycle_name: str): col = cycle(available_cycles[cycle_name]) for s_id in set_idx: self.data[s_id].setColor(next(col), symbol=True, line=True, mode='all') @QtCore.pyqtSlot(dict, tuple) def shift_scale(self, values: dict, options: tuple): copy_data, value_plot = options sid_list = [] shift_y = [] shift_x = [] for k, v in values.items(): d_k = self.data[k] if copy_data is None: d_k.shift_scale(v[0], v[1]) else: new_data = d_k.copy(full=True) new_data.shift_scale(v[0], v[1]) sid = self.add(new_data) sid_list.append(sid) shift_x.append(d_k.value) shift_y.append(v[0]+v[1]) self.newData.emit(sid_list, copy_data) if value_plot is not None: sid_list = [] shift_y = np.array(shift_y) for i, (mode, default) in enumerate([('x shift', 0.), ('y shift', 0.), ('x scale', 1.), ('y scale', 1.), ]): if np.all(shift_y[:, i] == default): continue data = Points(shift_x, shift_y[:, i], name=mode) sid_list.append(self.add(data)) self.newData.emit(sid_list, value_plot) @QtCore.pyqtSlot(list) def convert_sets(self, src: list): new_graph = {} error_list = [] for sets in src: # merge: sets (real, imag, graph, type) # normal: sets (source set, graph, type) graph_id = sets[-2] new_type = [Points, FID, Spectrum, BDS][sets[-1]] if len(sets) == 4: real_set, imag_set = sets[0], sets[1] if real_set != '': data = self.data[real_set] new_data = new_type(data.x, data.y.real) if imag_set != '': imag_data = self.data[imag_set] if len(imag_data) == len(data): new_data.y.imag = imag_data.y.real else: error_list.append(f'Lengths mismatch of {data.name} ({len(data)}) and {imag_data.name} ({len(imag_data)})') continue else: data = self.data[imag_set] new_data = new_type(data.x, np.zeros(data.x.size)) new_data.y.imag = data.y.real else: data = self.data[sets[0]] if isinstance(data.data, new_type): error_list.append(f'{data.name} is alreade of type {new_type.__name__}') continue new_data = new_type(data.x, np.zeros(data.x.size)) new_data.y.real = data.y.real new_data.update(data.opts) new_id = self.add(data.change_type(new_data)) if graph_id not in new_graph: new_graph[graph_id] = [] new_graph[graph_id].append(new_id) for g, s in new_graph.items(): self.newData.emit(s, g) if error_list: err_string = "\n- ".join(error_list) _ = QtWidgets.QMessageBox.information(QtWidgets.QWidget(), 'Something was skipped', f'Some conversions were skipped:\n{err_string}') def get_namespace(self): from ..lib.namespace import Namespace self.namespace = Namespace(basic=True, const=True, fitfuncs=True) for i, g in enumerate(self.graphs.values()): for j, sid in enumerate(g.sets): sets = self.data[sid] self.namespace.add_namespace(sets.get_namespace(i, j), parents=('Data', f'{sets.name} ({g.title})')) return self.namespace @QtCore.pyqtSlot(list, list, bool) def eval_expression(self, cmds: list, set_ids: list, overwrite: bool): ns = self.namespace.flatten() if overwrite: self.undostack.beginMacro('Evaluate expression') failures = [] for i, g in enumerate(self.graphs.values()): for j, sid in enumerate(g.sets): if sid not in set_ids: continue data_i = self.data[sid] try: # use a copy of original namespace new_data = data_i.eval_expression(cmds, dict(ns), i=i, j=j) if overwrite: cmd = EvalCommand(self.data, sid, new_data, 'Evaluate expression') self.undostack.push(cmd) else: new_id = self.copy_sets(sets=[sid]) self.data[new_id[0]].data = new_data except Exception as e: failures.append((data_i, e)) logger.warning(str(data_i) + ' failed with Exception: ' + ''.join(e.args)) continue if overwrite: self.undostack.endMacro() if failures: err_msg = QtWidgets.QMessageBox(parent=self.sender()) err_msg.setText('One or more errors occured during evaluation.') err_msg.setDetailedText('\n'.join(f'{d.name} failed with error: {err.args}' for d, err in failures)) err_msg.exec() self.sender().success = not failures self.sender().add_data(self.active_sets) @QtCore.pyqtSlot(list, dict) def create_from_function(self, cmds: list, opts: dict): ns = dict(self.namespace.flatten()) dtype = [Points, FID, Spectrum, BDS][opts.pop('dtype')] try: for c in cmds: exec(c, globals(), ns) name = opts.pop('name') value = opts.pop('val') graph = opts.pop('graph') data = dtype(x=ns['x'], y=ns['y'], y_err=ns['y_err'], name=name, value=value) s_id = self.add(data, **opts) self.sender().success = True self.newData.emit([s_id], graph) except Exception as err: logger.exception('Creation failed with error: ' + ', '.join(err.args)) err_msg = QtWidgets.QMessageBox(parent=self.sender()) err_msg.setText('One or more errors occurred during evaluation.') err_msg.setDetailedText('Creation failed with error: ' + ', '.join(err.args)) err_msg.exec() self.sender().success = False def show_statistics(self, mode): x, y, = [], [] for i in self.active_id: _temp = self.data[i] try: x.append(float(_temp.name)) except ValueError: x.append(i) y.append(_temp.statistic(mode)) @QtCore.pyqtSlot() def calc_magn(self): new_id = [] for k in self.active_id: dataset = self.data[k] if isinstance(dataset, SignalContainer): new_value = dataset.copy(full=True) new_value.data = dataset.data.magnitude() new_id.append(self.add(new_value)) self.newData.emit(new_id, '') @QtCore.pyqtSlot() def center(self): new_id = [] for k in self.active_id: new_value = self.data[k].copy(full=True) new_value.x -= new_value.x[np.argmax(new_value.y.real)] new_id.append(self.add(new_value)) self.newData.emit(new_id, '') def integrate(self, **kwargs): new_sets = [] log = kwargs['log'] limits = kwargs.get('limits') mode = kwargs['mode'] for set_id in kwargs['sets']: data_i = self.data[set_id] if mode == 'i': new_data = data_i.data.integrate(log=log, limits=limits) elif mode == 'd': new_data = data_i.data.diff(log=log) else: raise ValueError(f'Unknown mode {mode}.') new_container = data_i.copy(full=True) new_container.data = new_data new_sets.append(self.add(new_container)) self.newData.emit(new_sets, kwargs['graph']) def integral_datasets(self, ranges: list, x_vals: list, y_vals: np.ndarry): new_sets = [] for range_i, y_val_i in zip(ranges, y_vals): new_sets.append(self.add(Points(x=x_vals, y=y_val_i, name=f'{range_i[0]:.4g}-{range_i[1]:.4g}'))) self.newData.emit(new_sets, '') def bds_deriv(self): new_sets = [] for set_id in self.active_id: data_i = self.data[set_id] diff = data_i.data.diff(log=True) new_data = Points(x=diff.x, y=-np.pi/2*diff.y.real) new_data.update(data_i.data.meta) new_sets.append(self.add(new_data, color=data_i.plot_imag.linecolor)) self.newData.emit(new_sets, '') def logft(self, **kwargs): new_sets = [] ft_mode = kwargs['ft_mode'] for set_id in kwargs['sets']: data_i = self.data[set_id] if ft_mode in ['cos', 'sin']: new_data = Points(*logft(data_i.x, data_i.y, mode=ft_mode)) else: new_data = Signal(*logft(data_i.x, data_i.y, mode=ft_mode)) new_sets.append(self.add(new_data, color=data_i['color'], symbol=data_i['symbol'], line=data_i['line'])) self.data[new_sets[-1]].update(data_i.data.meta) self.newData.emit(new_sets, kwargs['graph']) def skip_points(self, offset: int, step: int, invert: bool = False, copy: bool = False): for k in self.active_id: src = self.data[k] if invert: mask = np.mod(np.arange(offset, src.x.size+offset), step) != 0 else: mask = np.mod(np.arange(offset, src.x.size+offset), step) == 0 if copy: data = src.copy() temp = data.mask.copy() temp[temp] = mask data.remove(np.where(~temp)) data.mask = np.ones(data.x.shape) idd = self.add(data) self.newData.emit([idd], self.current_graph) else: src.mask[src.mask] = mask src.mask = src.mask @QtCore.pyqtSlot(dict) def calc_relaxation(self, opts: dict): params = opts['pts'] if len(params) == 4: if params[3]: _x = np.geomspace(params[0], params[1], num=params[2]) x1 = np.geomspace(params[0], params[1], num=params[2]) else: _x = np.linspace(params[0], params[1], num=params[2]) x1 = np.linspace(params[0], params[1], num=params[2]) if opts['axis1'] in ['t', 'invt1000']: t_p = opts['t_param'] if len(t_p) == 2: from nmreval.models import Arrhenius as Func else: from nmreval.models import VFT as Func _x = Func.func(x1, *t_p, invt=opts['axis1']) else: if params[1]: x1 = self.data[params[0]].x _x = self.data[params[0]].y.real else: _x = x1 = self.data[params[0]].x x2 = opts['val2'] sd = opts['spec_dens'] sd_param = list(zip(*[self.data[p].y.real if isinstance(p, str) else [p]*len(_x) for p in opts['sd_param'][0]])) relax = Relaxation() relax.set_distribution(sd, keywords=opts['sd_param'][1]) cp_param = list(zip(*[self.data[p].y.real if isinstance(p, str) else [p]*len(_x) for p in opts['cp_param'][0]])) # relax.set_coupling(opts['coup'], parameter=cp_param, keywords=opts['cp_param'][1]) relax_func = relax.t1 if opts['out'] == 't1' else relax.t2 y = np.zeros(_x.size) for i in range(_x.size): _x_i = sd.convert(_x[i], *sd_param[i], from_=opts['tau_type'], to_='raw') relax.dist_parameter = sd_param[i] relax.set_coupling(opts['coup'], parameter=cp_param[i], keywords=opts['cp_param'][1]) y[i] = relax_func(x2, _x_i) pts = Points(x1, y, name=sd.name) pts.meta.update(opts) # we do not want class instances pts.meta['coup'] = opts['coup'].name pts.meta['spec_dens'] = sd.name self.newData.emit([self.add(pts)], opts['graph']) self.sender().update_graphs(self.graphs.list()) @QtCore.pyqtSlot(str, list) def mask_value(self, idx: str, m: list): self.data[idx].mask = m @QtCore.pyqtSlot(str, list) def remove_values(self, idx: str, m: list): self.data[idx].remove(m) @QtCore.pyqtSlot(str, int) def split_set(self, idx: str, row: int): selected_data = self.data[idx] popular_front_of_judea = selected_data.copy(full=True) selected_data.remove(slice(row, None)) popular_front_of_judea.remove(slice(None, row)) new_id = self.add(popular_front_of_judea) self.newData.emit([new_id], selected_data.graph) def set_values(self, idx: str, pos: tuple, value): self.data[idx].setvalues(pos, value) def append(self, idx: str): self.data[idx].add([0.0, 0.0, 0.0]) def save(self, outpath: str | pathlib.Path, extension: str, strip_spaces=False): path = pathlib.Path(outpath) suffix = path.suffix if not suffix: m = re.match(r'[\w\s]*\(\*(\.\w+)\)', extension) if m: suffix = m.group(1) path = path.with_suffix(suffix) else: raise ValueError('No file extension detected') if suffix == '.nmr': from nmreval.io.sessionwriter import NMRWriter NMRWriter(self.graphs, self.data).export(path) return real_outnames = [] for set_id, set_name in self.active_sets: full_name = path.stem if '