from __future__ import annotations import pathlib import re import uuid from nmreval.fit import data as fit_d from nmreval.fit.model import Model from nmreval.fit.result import FitResult from nmreval.fit.minimizer import FitRoutine from nmreval.lib.colors import available_cycles from nmreval.math.interpol import interpolate from nmreval.math.logfourier import logft from nmreval.math.smooth import smooth from nmreval.nmr.relaxation import Relaxation from ..Qt import QtCore, QtWidgets from ..lib.undos import * from ..data.container import * from ..io.filereaders import QFileReader from ..lib.utils import busy_cursor class GraphSignals(QtCore.QObject): valueChanged = QtCore.pyqtSignal() class GraphDict(OrderedDict): def __init__(self, data): super().__init__() self._data = data self.signals = GraphSignals() self.valueChanged = self.signals.valueChanged def __setitem__(self, key, value): super().__setitem__(key, value) self.valueChanged.emit() def __delitem__(self, key): super().__delitem__(key) self.valueChanged.emit() def tree(self, key_only=False): ret_val = OrderedDict() for k, g in self.items(): if key_only: ret_val[k] = (g.title, [(s, self._data[s].name) for s in g.sets]) else: ret_val[(k, g.title)] = [(s, self._data[s].name) for s in g.sets] return ret_val def list(self): return [(k, v.title) for k, v in self.items()] def active(self, key: str): if key: return [(self._data[i].id, self._data[i].name) for i in self[key]] else: return [] def current_sets(self, key: str): if key: return [(self._data[i].id, self._data[i].name) for i in self[key].sets] else: return [] class UpperManagement(QtCore.QObject): newGraph = QtCore.pyqtSignal() restoreGraph = QtCore.pyqtSignal(str) deleteGraph = QtCore.pyqtSignal(str) newData = QtCore.pyqtSignal(list, str) deleteData = QtCore.pyqtSignal(str) dataChanged = QtCore.pyqtSignal(str) fitFinished = QtCore.pyqtSignal(list) stopFit = QtCore.pyqtSignal() properties_collected = QtCore.pyqtSignal(dict) unset_state = QtCore.pyqtSignal(list) _colors = cycle(TUColors) _actions = { 'ls': (ShiftCommand, 'Left shift'), 'cut': (CutCommand, 'Cut'), 'ap': (ApodizationCommand, 'Apodization'), 'zf': (ZerofillCommand, 'Zerofill'), 'ph': (PhaseCommand, 'Phase correction'), 'autoph': (AutophaseCommand, 'Autophase'), 'bl': (BaselineCommand, 'Baseline'), 'bls': (BaselineSplineCommand, 'Baseline'), 'ft': (FourierCommand, 'Fourier'), 'ft_pake': 'FT (de-paked)', 'sort': (SortCommand, 'Sort'), 'norm': (NormCommand, 'Normalize'), 'center': (CenterCommand, 'Center on max'), } def __init__(self, window): super().__init__() self._fit_active = False self.fit_thread = None self.fit_worker = None self.counter = 0 self.data = OrderedDict() self.window = window self.current_graph = '' self.graphs = GraphDict(self.data) self.namespace = None self.undostack = QtWidgets.QUndoStack() self.deleteData.connect(self.plot_from_graph) self._filereader = None def __setitem__(self, key: str, value, **kwargs): if isinstance(value, ExperimentContainer): item = value item.id = key elif isinstance(value, FitResult): item = FitContainer(key, value, manager=self, **kwargs) elif isinstance(value, Signal): item = SignalContainer(key, value, manager=self, **kwargs) else: item = PointContainer(key, value, manager=self, **kwargs) item.dataChanged.connect(lambda x: self.dataChanged.emit(x)) self.data[key] = item def __getitem__(self, item): return self.data[item] def __contains__(self, item): return item in self.data def __iter__(self): for k, v in self.data.items(): yield k, v @property def active_sets(self): return self.graphs.active(self.current_graph) def get_attributes(self, graph_id: str, attr: str) -> dict[str, Any]: return {self.data[i].id: getattr(self.data[i], attr) for i in self.graphs[graph_id].sets} def add(self, data, **kwargs): _id = str(uuid.uuid4()) self.__setitem__(_id, data, **kwargs) return _id def load_files(self, fname: list[str], new_plot: str = None): if self._filereader is None: self._filereader = QFileReader(manager=self) ret_dic = self._filereader.readfiles(fname) self.add_new_data(ret_dic, new_plot) def _load_session(self, sets: dict, graphs: dict): sid = self._load_sets(sets) for g in graphs: _ = g.pop('id') graph = QGraphWindow.set_state(g) self.graphs[graph.id] = graph self.restoreGraph.emit(graph.id) children = [sid[c] for c in g['children']] active = [sid[c] for c in g['active']] inactive = [k for k in children if k not in active] self.newData.emit(children, graph.id) graph.active = active graph.listWidget.blockSignals(True) for i, l in enumerate(g['in_legend']): graph.listWidget.item(i).setCheckState(l) graph.listWidget.blockSignals(False) # set unchecked in tree and hide/show in plot self.unset_state.emit(inactive) self.change_visibility(active, inactive) def _load_sets(self, sets: dict) -> dict: sid = {} for _id, (data, opts) in sets.items(): if isinstance(data, FitResult): # for fits, _id belongs to the fitted data, not the fit src_id = data.idx if src_id in sid: new_id = self.add(data, src=sid[src_id]) self.data[sid[src_id]]._fits.append(new_id) else: new_id = self.add(data) else: new_id = self.add(data) sid[_id] = new_id for m in ['real', 'imag']: if m in opts: self.data[new_id].setSymbol(**opts[m][0], mode=m) self.data[new_id].setLine(**opts[m][1], mode=m) return sid def add_new_data(self, data: list, gid: str): sid = [] for d in data: if isinstance(d, tuple): if len(d) == 2: self._load_session(d[0], graphs=d[1]) else: sid.extend(list(self._load_sets(d[0]).values())) else: sid.append(self.add(d)) if sid: gid = '' if not gid else gid self.newData.emit(sid, gid) def plots_to_graph(self, plotkeys: list, gid: str): self.graphs[gid].add(plotkeys, [self.data[k].plots for k in plotkeys]) for k in plotkeys: self.data[k].graph = gid @QtCore.pyqtSlot(str) def plot_from_graph(self, key: str): self.graphs[self.data[key].graph].remove(key) @QtCore.pyqtSlot(list, str, str) def move_sets(self, sets: list, dest: str, src: (str|list), pos: int = -1): if isinstance(src, str): src = [src]*len(sets) for graph_id, set_id in zip(src, sets): # move all plots to the same graph if graph_id != dest: self.graphs[graph_id].remove(set_id) self.plots_to_graph([set_id], dest) # move to correct position self.graphs[dest].move_sets(sets, pos) def select_window(self, gid: str): for key, plot in self.graphs.items(): if key == gid: self.window.area.setActiveSubWindow(plot.parent()) @QtCore.pyqtSlot() @QtCore.pyqtSlot(list, str) def copy_sets(self, sets: list = None, src: str = None): if sets is None: sets = self.graphs[self.current_graph].active[:] if src is None: src = self.current_graph new_ids = [] for s in sets: copy_of_s = self.data[s].copy(full=True) copy_of_s.id = str(uuid.uuid4()) new_ids.append(copy_of_s.id) self.data[copy_of_s.id] = copy_of_s self.newData.emit(new_ids, src) return new_ids @QtCore.pyqtSlot(list) @QtCore.pyqtSlot(str) def delete_sets(self, rm_sets: list = None): rm_graphs = [] if rm_sets is None: rm_sets = self.graphs[self.current_graph].sets + [self.current_graph] self.undostack.beginMacro('Delete') for k in rm_sets[::-1]: if k in self.data: cmd = DeleteCommand(self.data, k, self.newData, self.deleteData) self.undostack.push(cmd) else: rm_graphs.append(k) for k in rm_graphs: cmd = DeleteGraphCommand(self.graphs, k, self.restoreGraph, self.deleteGraph) self.undostack.push(cmd) self.undostack.endMacro() def delete_graph(self, gid): self.delete_sets(self.graphs[gid].sets + [gid]) @QtCore.pyqtSlot() def cat(self, src_sets=None): joined = None group_set = set() name_set = set() value_set = set() if src_sets is None: if self.current_graph: src_sets = self.graphs[self.current_graph].active else: return for sid in src_sets: data_i = self.data[sid] if joined is None: joined = data_i.copy() else: joined.append(data_i.x, data_i.y, data_i.y_err) name_set.add(data_i.name) group_set.add(data_i.group) value_set.add(data_i.value) if joined is not None: joined.group = '/'.join(group_set) joined.name = '/'.join(name_set) if len(value_set) == 1: joined.value = value_set.pop() else: joined.value = 0.0 self.newData.emit([self.add(joined)], self.current_graph) def get_data(self, sid: str, xy_only: bool = False): """ Return data for a given id. Return value is tuple of [x, y, y_err] and mask if xy_only is False, [x, y] if true. """ d = self.data[sid] if xy_only: return [d.x, d.y] return [d.data.x, d.data.y, d.data.y_err], d.data.mask.data def change_visibility(self, selected: list, deselected: list): """Change status of list of ids after status change in datawidget""" for s in selected: self.graphs[self.data[s].graph].show_item([s]) for d in deselected: self.graphs[self.data[d].graph].hide_item([d]) @QtCore.pyqtSlot(str, str) def change_keys(self, identifier: str, name: str): if identifier in self.data: d = self.data[identifier] d.name = name self.graphs[d.graph].update_legend(identifier, name) elif identifier in self.graphs: self.graphs[identifier].title = name else: raise KeyError('Unknown ID ' + str(identifier)) @QtCore.pyqtSlot(str, tuple) def apply(self, func: str, arguments: tuple): # undos, names displayed by undo action cmd, cmd_text = self._actions[func] self.undostack.beginMacro(cmd_text) for sid in self.graphs[self.current_graph]: single_undo = cmd(self.data[sid], *arguments) self.undostack.push(single_undo) self.undostack.endMacro() def cut(self): if self.current_graph: xlim, _ = self.graphs[self.current_graph].ranges self.apply('cut', xlim) @QtCore.pyqtSlot() def unmask(self): for d in self.data.values(): d.mask = np.ones_like(d.mask, dtype=bool) def start_fit(self, parameter: dict, links: list, fit_options: dict): if self._fit_active: return self.__fit_options = (parameter, links, fit_options) fitter = FitRoutine() models = {} fit_limits = fit_options['limits'] fit_mode = fit_options['fit_mode'] we = fit_options['we'] for model_id, model_p in parameter.items(): m = Model(model_p['func']) models[model_id] = m m_complex = model_p['complex'] for set_id, set_params in model_p['parameter'].items(): data_i = self.data[set_id] if we.lower() == 'deltay': we = data_i.y_err**2 if m_complex is None or m_complex == 1: _y = data_i.y.real elif m_complex == 2 and np.iscomplexobj(data_i.y): _y = data_i.y.imag else: _y = data_i.y _x = data_i.x if fit_limits == 'none': inside = slice(None) elif fit_limits == 'x': x_lim, _ = self.graphs[self.current_graph].ranges inside = np.where((_x >= x_lim[0]) & (_x <= x_lim[1])) else: inside = np.where((_x >= fit_limits[0]) & (_x <= fit_limits[1])) if isinstance(we, str): d = fit_d.Data(_x[inside], _y[inside], we=we, idx=set_id) else: d = fit_d.Data(_x[inside], _y[inside], we=we[inside], idx=set_id) d.set_model(m) d.set_parameter(set_params[0], var=model_p['var'], lb=model_p['lb'], ub=model_p['ub'], fun_kwargs=set_params[1]) fitter.add_data(d) model_globs = model_p['glob'] if model_globs: m.set_global_parameter(**model_p['glob']) for links_i in links: fitter.set_link_parameter((models[links_i[0]], links_i[1]), (models[links_i[2]], links_i[3])) with busy_cursor(): self.fit_worker = FitWorker(fitter, fit_mode) self.fit_thread = QtCore.QThread() self.fit_worker.moveToThread(self.fit_thread) self.fit_thread.started.connect(self.fit_worker.run) self.fit_worker.finished.connect(self.end_fit) self.fit_worker.finished.connect(self.fit_thread.quit) self.fit_worker.finished.connect(self.fit_worker.deleteLater) self.fit_thread.finished.connect(self.fit_thread.deleteLater) self.stopFit.connect(lambda: self.fit_worker.fitter.abort()) self.fit_thread.start() @QtCore.pyqtSlot(list, bool) def end_fit(self, result: list, success: bool): print('FIT FINISHED') if success: self.fitFinished.emit(result) else: QtWidgets.QMessageBox.warning(QtWidgets.QWidget(), 'Fit failed', 'Fit kaput with exception: \n' + "\n".join(result[0])) self.fitFinished.emit([]) self._fit_active = False @QtCore.pyqtSlot(dict) def redo_fits(self, res: dict): models = self.__fit_options[0] for single_model, model_args in models.items(): parameter = model_args['parameter'] for set_id, set_parameter in parameter.items(): new_values = [v.value for v in res[set_id].parameter.values()] parameter[set_id] = (new_values, set_parameter[1]) self.start_fit(*self.__fit_options) def make_fits(self, res: dict, opts: list, param_graph: str, show_fit: bool, parts: dict) -> None: """ Args: res: key is that of original data, value is FitResult opts: (ignore this fits, delete previous fits) param_graph: None if no parameter to plot, '' for new graph, or id of existig graph show_fit: plot fit curve? parts: key is that of original data, value is list of subplots """ f_id_list = [] gid = '' tobedeleted = [] accepted = [] for i, (k, fit) in enumerate(res.items()): reject, delete_prev = opts[i] if reject: continue data_k = self.data[k] if delete_prev: tobedeleted.extend([f.id for f in data_k.get_fits()]) data_k.set_fits([]) syms = data_k.plot_real.symbol if syms == SymbolStyle.No: color = data_k.plot_real.linecolor else: color = data_k.plot_real.symbolcolor fit.value = data_k.value fit.group = data_k.group accepted.append(fit) data_name = f' ({data_k.name})' if show_fit: fit.name += data_name f_id = self.add(fit, color=color, src=k) f_id_list.append(f_id) data_k.set_fits(f_id) gid = data_k.graph if k in parts and show_fit: color_scheme = available_cycles['colorblind'] for subfunc, col in zip(parts[k], cycle(color_scheme)): subfunc.value = data_k.value subfunc.group = data_k.group subfunc.name += data_name sub_f_id = self.add(subfunc, color=col, linestyle=LineStyle.Dashed, symbol=SymbolStyle.No) f_id_list.append(sub_f_id) self.delete_sets(tobedeleted) if accepted and (param_graph != '-1'): self.make_fit_parameter(accepted, graph_id=param_graph) self.newData.emit(f_id_list, gid) def make_fit_parameter(self, fit_sets: list[str | FitResult], graph_id: str = None): fit_dict = self._collect_fit_parameter(fit_sets) if fit_dict: p_id_list = [] for v in fit_dict.values(): xy = np.array(v[0]).T p_id_list.append(self.add(Points(x=xy[0], y=xy[1], y_err=xy[2], name=v[1]))) if not graph_id: graph_id = '' self.newData.emit(p_id_list, graph_id) def save_fit_parameter(self, fname: str | pathlib.Path, fit_sets: list[str] = None): if fit_sets is None: fit_sets = [s for (s, _) in self.active_sets] for set_id in fit_sets: data = self.data[set_id] if data.mode != 'fit': continue data.data.save_parameter(fname) def _collect_fit_parameter(self, fit_sets: list[str | FitResult]) -> dict: fit_dict = {} for set_id in fit_sets: if isinstance(set_id, str): data = self.data[set_id] if data.mode != 'fit': continue elif isinstance(set_id, FitResult): data = set_id else: continue for key, pvalue in data.parameter.items(): name = pvalue.full_name fit_key = key + data.model_name if fit_key not in fit_dict: fit_dict[fit_key] = [[], name] err = 0 if pvalue.error is None else pvalue.error fit_dict[fit_key][0].append([data.value, pvalue.value, err]) return fit_dict @QtCore.pyqtSlot(dict, str) def extract_points(self, params: dict, gid: str): xy_mode = params.pop('xy') _active = self.graphs[self.current_graph].active new_datasets = {} for sid in _active: data_i = self.data[sid] if data_i.group not in new_datasets: new_datasets[data_i.group] = [], [] new_x_axis, _temp = new_datasets[data_i.group] new_x_axis.append(data_i.value) _temp.append(data_i.points(params)) key_list = [] for label, (new_x_axis, _temp) in new_datasets.items(): _temp = np.array(_temp) # (number of sets, number of picks, (x, y, y_err)) num_pts = _temp.shape[1] for i in range(num_pts): if xy_mode[0]: key = self.add(Points(x=new_x_axis, y=_temp[:, i, 0], name=label)) key_list.append(key) if xy_mode[1]: key = self.add(Points(x=new_x_axis, y=_temp[:, i, 1], y_err=_temp[:, i, 2], name=label)) key_list.append(key) self.newData.emit(key_list, gid) @QtCore.pyqtSlot(list) def get_properties(self, sid: list) -> dict: props = {} for key in sid: if key not in self.data: continue props = self.data[key].get_properties() self.properties_collected.emit(props) return props @QtCore.pyqtSlot(list, str, str, object) def update_property(self, sid: list, key1: str, key2: str, value: Any): for s in sid: self.data[s].update_property(key1, key2, value) def create_empty(self): import numpy.random as random dat = Points(x=np.arange(10), y=np.arange(10) + random.rand(10)-0.5, y_err=random.rand(10), name='Das Sein und das Nichts') idd = self.add(dat) self.newData.emit([idd], self.current_graph) @QtCore.pyqtSlot(tuple, dict, str) def calc_mean(self, dist_params, conversion, graph): dist, args = dist_params parameter = [] x = None name = 'tau (%s)' % {conversion["to_"]} value = 0. for i, p in enumerate(args): if isinstance(p, float): parameter.append(p) else: if x is None: x = self.data[p].x if i == 0: name = self.data[p].name value = self.data[p].value parameter.append(self.data[p].y) if x is None: x = 0 key = self.add(Points(x, dist.convert(*parameter, **conversion), name=name, value=value)) self.newData.emit([key], graph) self.sender().update_graphs(self.graphs.tree(key_only=True)) @QtCore.pyqtSlot(list, str, bool, bool, tuple, str) def interpolate_data(self, data_ids, mode, xlog, ylog, new_axis, dest_graph): if len(new_axis) == 4: start, end, steps, loggy = new_axis if loggy: new_x = np.logspace(np.log10(start), np.log10(end), steps) else: new_x = np.linspace(start, end, steps) else: new_x = self.data[new_axis[0]].x new_key = [] for ids in data_ids: k = self.add(interpolate(self.data[ids], new_x, xlog=xlog, ylog=ylog, kind=mode, extrapolate=True)) new_key.append(k) self.newData.emit(new_key, dest_graph) @QtCore.pyqtSlot(int, dict) def smooth_data(self, npoints, param_kwargs): _active = self.graphs[self.current_graph].active new_data = [] for sid in _active: try: key = self.add(smooth(self.data[sid], npoints, **param_kwargs)) new_data.append(key) except Exception as e: QtWidgets.QMessageBox().warning(self.window, 'Smoothing failed!', f'Smoothing failed for {self.data[sid].name} with exception:\n{e.args}') if new_data: self.newData.emit(new_data, self.current_graph) @QtCore.pyqtSlot() def set_cycle(self, set_idx: list, cycle_name: str): col = cycle(available_cycles[cycle_name]) for s_id in set_idx: self.data[s_id].setColor(next(col), symbol=True, line=True, mode='all') @QtCore.pyqtSlot(dict, tuple) def shift_scale(self, values: dict, options: tuple): copy_data, value_plot = options sid_list = [] shift_y = [] shift_x = [] for k, v in values.items(): d_k = self.data[k] if copy_data is None: d_k.x = d_k.x*v[1][0] + v[0][0] d_k.y = d_k.y*v[1][1] + v[0][1] else: new_data = d_k.copy(full=True) new_data.update({'shift': v[0], 'scale': v[1]}) new_data.data.x = new_data.x*v[1][0] + v[0][0] new_data.y = new_data.y*v[1][1] + v[0][1] sid = self.add(new_data) sid_list.append(sid) shift_x.append(d_k.value) shift_y.append(v[0]+v[1]) self.newData.emit(sid_list, copy_data) if value_plot is not None: sid_list = [] shift_y = np.array(shift_y) for i, (mode, default) in enumerate([('x shift', 0.), ('y shift', 0.), ('x scale', 1.), ('y scale', 1.), ]): if np.all(shift_y[:, i] == default): continue data = Points(shift_x, shift_y[:, i], name=mode) sid_list.append(self.add(data)) self.newData.emit(sid_list, value_plot) @QtCore.pyqtSlot(list) def convert_sets(self, src: list): new_graph = {} error_list = [] for sets in src: # merge: sets (real, imag, graph, type) # normal: sets (source set, graph, type) graph_id = sets[-2] new_type = [Points, FID, Spectrum, BDS][sets[-1]] if len(sets) == 4: real_set, imag_set = sets[0], sets[1] if real_set != '': data = self.data[real_set] new_data = new_type(data.x, data.y.real) if imag_set != '': imag_data = self.data[imag_set] if len(imag_data) == len(data): new_data.y.imag = imag_data.y.real else: error_list.append(f'Lengths mismatch of {data.name} ({len(data)}) and {imag_data.name} ({len(imag_data)})') continue else: data = self.data[imag_set] new_data = new_type(data.x, np.zeros(data.x.size)) new_data.y.imag = data.y.real else: data = self.data[sets[0]] if isinstance(data.data, new_type): error_list.append(f'{data.name} is alreade of type {new_type.__name__}') continue new_data = new_type(data.x, np.zeros(data.x.size)) new_data.y.real = data.y.real new_data.update(data.opts) new_id = self.add(data.change_type(new_data)) if graph_id not in new_graph: new_graph[graph_id] = [] new_graph[graph_id].append(new_id) for g, s in new_graph.items(): self.newData.emit(s, g) if error_list: err_string = "\n- ".join(error_list) _ = QtWidgets.QMessageBox.information(QtWidgets.QWidget(), 'Something was skipped', f'Some conversions were skipped:\n{err_string}') def get_namespace(self): from ..lib.namespace import Namespace self.namespace = Namespace(basic=True, const=True, fitfuncs=True) for i, g in enumerate(self.graphs.values()): for j, sid in enumerate(g.sets): sets = self.data[sid] self.namespace.add_namespace(sets.get_namespace(i, j), parents=('Data', f'{sets.name} ({g.title})')) return self.namespace @QtCore.pyqtSlot(list, list, bool) def eval_expression(self, cmds: list, set_ids: list, overwrite: bool): ns = self.namespace.flatten() if overwrite: self.undostack.beginMacro('Evaluate expression') failures = [] for sid in set_ids: data_i = self.data[sid] try: # use a copy of original namespace new_data = data_i.eval_expression(cmds, dict(ns)) if overwrite: cmd = EvalCommand(self.data, sid, new_data, 'Evaluate expression') self.undostack.push(cmd) else: new_id = self.copy_sets(sets=[sid]) self.data[new_id[0]].data = new_data except Exception as e: failures.append((data_i, e)) print(str(data_i) + ' failed with Exception: ' + ''.join(e.args)) continue if overwrite: self.undostack.endMacro() if failures: err_msg = QtWidgets.QMessageBox(parent=self.sender()) err_msg.setText('One or more errors occured during evaluation.') err_msg.setDetailedText('\n'.join(f'{d.name} failed with error: {err.args}' for d, err in failures)) err_msg.exec() self.sender().success = not failures self.sender().add_data(self.active_sets) @QtCore.pyqtSlot(list, dict) def create_from_function(self, cmds: list, opts: dict): ns = dict(self.namespace.flatten()) dtype = [Points, FID, Spectrum, BDS][opts.pop('dtype')] try: for c in cmds: exec(c, globals(), ns) name = opts.pop('name') value = opts.pop('val') graph = opts.pop('graph') data = dtype(x=ns['x'], y=ns['y'], y_err=ns['y_err'], name=name, value=value) s_id = self.add(data, **opts) self.sender().success = True self.newData.emit([s_id], graph) except Exception as err: print('Creation failed with error: ' + ', '.join(err.args)) err_msg = QtWidgets.QMessageBox(parent=self.sender()) err_msg.setText('One or more errors occured during evaluation.') err_msg.setDetailedText('Creation failed with error: ' + ', '.join(err.args)) err_msg.exec() self.sender().success = False def show_statistics(self, mode): x, y, = [], [] for i, _ in self.active_sets: _temp = self.data[i] try: x.append(float(_temp.name)) except ValueError: x.append(i) y.append(_temp.statistic(mode)) @QtCore.pyqtSlot() def calc_magn(self): new_id = [] for k, _ in self.active_sets: dataset = self.data[k] if isinstance(dataset, SignalContainer): new_value = dataset.copy(full=True) new_value.data = dataset.data.magnitude() new_id.append(self.add(new_value)) self.newData.emit(new_id, '') @QtCore.pyqtSlot() def center(self): new_id = [] for k, _ in self.active_sets: new_value = self.data[k].copy(full=True) new_value.x -= new_value.x[np.argmax(new_value.y.real)] new_id.append(self.add(new_value)) self.newData.emit(new_id, '') def integrate(self, **kwargs): new_sets = [] log = kwargs['log'] limits = kwargs.get('limits') mode = kwargs['mode'] for set_id in kwargs['sets']: data_i = self.data[set_id] if mode == 'i': new_data = data_i.data.integrate(log=log, limits=limits) elif mode == 'd': new_data = data_i.data.diff(log=log) else: raise ValueError(f'Unknown mode {mode}.') new_container = data_i.copy(full=True) new_container.data = new_data new_sets.append(self.add(new_container)) self.newData.emit(new_sets, kwargs['graph']) def integral_datasets(self, ranges: list, x_vals: list, y_vals: np.ndarry): new_sets = [] for range_i, y_val_i in zip(ranges, y_vals): new_sets.append(self.add(Points(x=x_vals, y=y_val_i, name=f'{range_i[0]:.4g}-{range_i[1]:.4g}'))) self.newData.emit(new_sets, '') def bds_deriv(self): new_sets = [] for (set_id, _) in self.active_sets: data_i = self.data[set_id] diff = data_i.data.diff(log=True) new_data = Points(x=diff.x, y=-np.pi/2*diff.y.real) new_data.update(data_i.data.meta) new_sets.append(self.add(new_data, color=data_i.plot_imag.linecolor)) self.newData.emit(new_sets, '') def logft(self, **kwargs): new_sets = [] ft_mode = kwargs['ft_mode'] for set_id in kwargs['sets']: data_i = self.data[set_id] if ft_mode in ['cos', 'sin']: new_data = Points(*logft(data_i.x, data_i.y, mode=ft_mode)) else: new_data = Signal(*logft(data_i.x, data_i.y, mode=ft_mode)) new_sets.append(self.add(new_data, color=data_i['color'], symbol=data_i['symbol'], line=data_i['line'])) self.data[new_sets[-1]].update(data_i.data.meta) self.newData.emit(new_sets, kwargs['graph']) def skip_points(self, offset: int, step: int, invert: bool = False, copy: bool = False): for k, _ in self.active_sets: src = self.data[k] if invert: mask = np.mod(np.arange(offset, src.x.size+offset), step) != 0 else: mask = np.mod(np.arange(offset, src.x.size+offset), step) == 0 if copy: data = src.copy() temp = data.mask.copy() temp[temp] = mask data.remove(np.where(~temp)) data.mask = np.ones(data.x.shape) idd = self.add(data) self.newData.emit([idd], self.current_graph) else: src.mask[src.mask] = mask src.mask = src.mask @QtCore.pyqtSlot(dict) def calc_relaxation(self, opts: dict): params = opts['pts'] if len(params) == 4: if params[3]: _x = x1 = np.geomspace(params[0], params[1], num=params[2]) else: _x = x1 = np.linspace(params[0], params[1], num=params[2]) if opts['axis1'] in ['t', 'invt1000']: t_p = opts['t_param'] if len(t_p) == 2: from nmreval.models import Arrhenius as Func else: from nmreval.models import VFT as Func _x = Func.func(x1, *t_p, invt=opts['axis1']) else: if params[1]: x1 = self.data[params[0]].x _x = self.data[params[0]].y.real else: _x = x1 = self.data[params[0]].x x2 = opts['val2'] sd = opts['spec_dens'] sd_param = [self.data[p].y.real if isinstance(p, str) else p for p in opts['sd_param'][0]] sd.convert(_x, *sd_param, from_=opts['tau_type'], to_='raw') relax = Relaxation() relax.set_distribution(sd, parameter=sd_param, keywords=opts['sd_param'][1]) cp_param = [self.data[p].y.real if isinstance(p, str) else p for p in opts['cp_param'][0]] relax.set_coupling(opts['coup'], parameter=cp_param, keywords=opts['cp_param'][1]) if opts['out'] == 't1': y = relax.t1(x2, _x) else: y = relax.t2(x2, _x) pts = Points(x1, y, name=sd.name) pts.meta.update(opts) # we do not want class instances pts.meta['coup'] = opts['coup'].name pts.meta['spec_dens'] = sd.name self.newData.emit([self.add(pts)], opts['graph']) self.sender().update_graphs(self.graphs.list()) @QtCore.pyqtSlot(str, list) def mask_value(self, idx: str, m: list): self.data[idx].mask = m @QtCore.pyqtSlot(str, list) def remove_values(self, idx: str, m: list): self.data[idx].remove(m) @QtCore.pyqtSlot(str, int) def split_set(self, idx: str, row: int): selected_data = self.data[idx] popular_front_of_judea = selected_data.copy(full=True) selected_data.remove(slice(row, None)) popular_front_of_judea.remove(slice(None, row)) new_id = self.add(popular_front_of_judea) self.newData.emit([new_id], selected_data.graph) def set_values(self, idx: str, pos: tuple, value): self.data[idx].setvalues(pos, value) def append(self, idx: str): self.data[idx].add([0.0, 0.0, 0.0]) def save(self, outpath: str | pathlib.Path, extension: str, strip_spaces=False): path = pathlib.Path(outpath) suffix = path.suffix if not suffix: m = re.match(r'[\w\s]*\(\*(\.\w+)\)', extension) if m: suffix = m.group(1) path = path.with_suffix(suffix) else: raise ValueError('No file extension detected') if suffix == '.nmr': from nmreval.io.sessionwriter import NMRWriter NMRWriter(self.graphs, self.data).export(path) return real_outnames = [] for set_id, set_name in self.active_sets: full_name = path.stem if '