forked from IPKM/nmreval
First commit
nmreval/data/__init__.py (Executable file, 5 lines)
@@ -0,0 +1,5 @@
from .points import Points
from .signals import Signal
from .bds import BDS
from .dsc import DSC
from .nmr import FID, Spectrum
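Everything re-exported above can be imported directly from the subpackage; a trivial sketch (assuming the project is importable as `nmreval`):

from nmreval.data import Points, Signal, BDS, DSC, FID, Spectrum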
nmreval/data/bds.py (Normal file, 47 lines)
@@ -0,0 +1,47 @@
from typing import Any

import numpy as np
from scipy.signal import savgol_filter

from . import Points
from .signals import Signal


class BDS(Signal):
    """
    Extension of Signal for dielectric spectroscopy purposes.
    """

    def __init__(self, x, y, **kwargs: Any):
        super().__init__(x, y, **kwargs)

        if np.all(self._x > 0) and not np.allclose(np.diff(self._x), self._x[1] - self._x[0]):
            # logarithmically spaced frequencies: store the ratio between neighboring points
            self.dx = self.x[1] / self.x[0]
        else:
            self.dx = self._x[1] - self._x[0]

    def __repr__(self) -> str:
        return f"{self.meta['mode']}: {self.name}"

    def derivative(self, window_length=9) -> Points:
        """
        Calculates the derivative :math:`-\\frac{\\pi}{2}\\frac{d\\epsilon'}{d\\ln\\omega}`.
        To reduce noise, a Savitzky-Golay filter of second order
        (:func:`scipy.signal.savgol_filter`) is used for differentiation.

        Args:
            window_length (int): Window size of the Savitzky-Golay filter. Default is 9.

        Returns:
            Points: New Points instance with the derivative as y values.

        References:
            Wübbenhorst, M.; van Turnhout, J.: Analysis of complex dielectric spectra.
            I. One-dimensional derivative techniques and three-dimensional modelling.
            J. Non-Cryst. Solids 305 (2002) https://doi.org/10.1016/s0022-3093(02)01086-4

        """
        y = -savgol_filter(self.y[self.mask].real, window_length, 2, deriv=1) * np.pi / 2
        data = Points(x=self.x[self.mask], y=y)
        data.update(self.meta)

        return data
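A hypothetical usage sketch for BDS.derivative (not part of the commit; the toy permittivity data and the `mode` metadata entry are invented for illustration):

import numpy as np
from nmreval.data import BDS

freq = np.logspace(-2, 6, 81)                    # log-spaced frequency axis
eps = 2.2 + 1.0 / (1 + 1j * freq / 1e2)          # toy Debye-like permittivity
bds = BDS(freq, np.array([eps.real, -eps.imag]), name='toy', mode='eps')

deriv = bds.derivative(window_length=9)          # Points with -pi/2 * d(eps')/d(ln w)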
nmreval/data/dsc.py (Normal file, 6 lines)
@@ -0,0 +1,6 @@
from .points import Points


class DSC(Points):
    def __init__(self, x, y, **kwargs):
        super().__init__(x, y, **kwargs)
nmreval/data/nmr.py (Normal file, 179 lines)
@@ -0,0 +1,179 @@
import warnings
from typing import Union

import numpy as np

from .signals import Signal


class FID(Signal):
    def __init__(self, x, y, **kwargs):
        super().__init__(x, y, **kwargs)

        self.meta['apod'] = []
        self.meta['zf'] = 0

    def __repr__(self):
        return f"FID: {self.name}"

    def baseline(self, percentage=0.12):
        self._y -= self._y[int(-percentage * self.length):].mean()

        return self

    def apod(self, p, method=None):
        try:
            self._y *= method.apod(self._x, *p)
            self.meta['apod'].append((str(method), p))
        except (KeyError, AttributeError):
            print(f'Unknown apodization {method}')

    def zerofill(self, pad: int = 1):
        _max_x = self._x.max()

        factor = 2**pad
        if factor < 1:
            # negative pad: truncate to a fraction of the current length
            self._x = self._x[:int(self.length*factor)]
            self._y = self._y[:int(self.length*factor)]
            self._y_err = self._y_err[:int(self.length*factor)]
            self.mask = self.mask[:int(self.length*factor)]

        elif factor > 1:
            # positive pad: append zeros and extend the time axis
            add_length = int((factor-1) * self.length)
            self._y = np.concatenate((self._y, np.zeros(add_length)))
            self._y_err = np.concatenate((self._y_err, np.zeros(add_length)))
            self.mask = np.concatenate((self.mask, np.ones(add_length, dtype=bool)))

            _temp_x = np.arange(1, add_length+1) * self.dx + _max_x
            self._x = np.concatenate((self._x, _temp_x))

        self.meta['zf'] += pad

        return self

    def fourier(self):
        ft = np.fft.fftshift(np.fft.fft(self._y)) / self.dx
        freq = np.fft.fftshift(np.fft.fftfreq(self.length, self.dx))

        spec = Spectrum(freq, ft, dx=self.dx)
        spec.update(self.meta)

        return spec

    def fft_depake(self, scale: bool = False):
        """
        Calculate the de-convoluted (de-Paked) spectrum.

        References:
            McCabe, M.A.; Wassall, S.R.:
            Rapid deconvolution of NMR powder spectra by weighted fast Fourier transformation.
            Solid State Nucl. Magn. Reson. 10 (1997) 53-61.
        """
        _y = self._y + 0
        # perform FFT de-Pakeing
        _y *= np.sqrt(self._x)

        # build halves
        right = np.fft.fftshift(np.fft.fft(_y)) * (1 + 1j)
        left = np.fft.fftshift(np.fft.fft(_y)) * (1 - 1j)

        freq = np.fft.fftshift(np.fft.fftfreq(self.length, self.dx))

        if scale:
            # fourier transform should be multiplied with \sqrt(|omega|),
            # but lines at omega=0 are suppressed, so sometimes not a good idea
            right *= np.sqrt(np.abs(freq))
            left *= np.sqrt(np.abs(freq))

        # build depaked spectrum
        new_spc = np.concatenate([left[:(self.length//2)], right[(self.length//2):]])

        spec = Spectrum(2*freq, new_spc, dx=self.dx)
        spec.update(self.meta)

        return spec

    def convert(self):
        spec = Spectrum(self._x, self._y, dx=self.dx)
        spec.update(self.meta)

        return spec

    def _calc_noise(self):
        # noise per channel, so divide by sqrt(2)
        self._y_err = np.std(self._y[-int(0.1*self.length):]) / 1.41
        return self._y_err * np.ones(self.length)

    def shift(self, points: Union[int, float], mode: str = 'pts'):
        """
        Shift the FID.

        Args:
            points: Shift value, either in points (indices) or in units of x, depending on `mode`.
            mode (str): If 'pts', `points` is used directly as an index offset;
                otherwise it is converted to points via the dwell time `dx` first.
        """
        if mode == 'pts':
            super().shift(points)
        else:
            pts = int(points // self.dx)
            super().shift(pts)


class Spectrum(Signal):
    def __init__(self, x, y, dx=None, **kwargs):
        super().__init__(x, y, **kwargs)
        if dx is None:
            if 2 * abs(self._x[0])/len(self._x) - abs(self._x[1]-self._x[0]) < 1e-9:
                # frequency axis looks zero-centered: infer dwell time from the spectral width
                self.dx = 1 / (2 * abs(self._x[0]))
            else:
                self.dx = 1
        else:
            self.dx = dx

    def __repr__(self):
        return f"Spectrum: {self.name}"

    def baseline(self, percentage: float = 0.12):
        corr = np.mean([self._y[int(-percentage*self.length):], self._y[:int(percentage*self.length)]])
        self._y -= corr

        return self

    def fourier(self):
        ft = np.fft.ifft(np.fft.ifftshift(self._y * self.dx))
        t = np.arange(len(ft)) * self.dx
        shifted = np.fft.ifftshift(self._x)[0]
        if shifted != 0:
            warnings.warn(f'Shift of {shifted}: Spectrum is not zero-centered and FFT may give wrong results!')
            ft *= np.exp(1j * shifted * 2*np.pi*t)

        fid = FID(t, ft)
        fid.update(self.meta)

        return fid

    def convert(self):
        fid = FID(self._x, self._y)
        fid.update(self.meta)

        return fid

    def _calc_noise(self):
        step = int(0.05 * self.length)
        start = np.argmin(abs(self._x - max(self._x.min(), -2e5)))
        stop = np.argmin(abs(self._x - min(self._x.max(), 2e5)))
        if start > stop:
            stop, start = start, stop
        stop -= step
        self._y_err = np.inf
        for i in range(start, stop):
            # scan regions to find minimum noise (undisturbed by signals)
            self._y_err = min(np.std(self._y[i:i + step]) / np.sqrt(2), self._y_err)

        return self._y_err * np.ones(self.length)

    def shift(self, points, mode='pts'):
        if mode == 'pts':
            super().shift(points)
        else:
            return self
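A hypothetical round trip through the FID methods above, with a synthetic decaying signal invented for illustration (not part of the commit):

import numpy as np
from nmreval.data import FID

t = np.arange(1024) * 1e-5                                   # 10 us dwell time
fid = FID(t, np.exp(-t / 2e-3) * np.exp(2j * np.pi * 5e3 * t), name='example')

fid.baseline()           # subtract the mean of the last 12 % of points
fid.zerofill(1)          # pad with zeros to twice the length
spec = fid.fourier()     # Spectrum instance on a frequency axis
spec.baseline()
back = spec.fourier()    # inverse transform back to an FID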
nmreval/data/points.py (Normal file, 665 lines)
@@ -0,0 +1,665 @@
import copy
from numbers import Number, Real
from pathlib import Path
from typing import Any, List, Optional, Tuple, TypeVar, Union

import numpy as np
try:
    from scipy.integrate import simpson, cumulative_trapezoid
except ImportError:
    from scipy.integrate import simps as simpson, cumtrapz as cumulative_trapezoid

from ..lib.utils import ArrayLike
from ..utils import NUMBER_RE

PointLike = TypeVar('PointLike', bound='Points')

"""
This is a test for all and everything
"""


class Points:
    """
    Base class for all datatypes.

    Args:
        x (array_like): x values
        y (array_like): y values
        y_err (array_like, optional): errors
    """

    def __init__(self, x: ArrayLike, y: ArrayLike, y_err: Optional[ArrayLike] = None, **kwargs: Any):
        self._x, self._y, self._y_err, self.mask = self._prepare_xy(x, y, y_err=y_err)

        name = str(kwargs.pop('name', ''))
        value = kwargs.pop('value', None)
        self.meta = {
            'group': '',
            'name': name.split('/')[-1]
        }
        self.meta.update(kwargs)

        if isinstance(value, str):
            # extract a number from strings like '310p2'
            for m in NUMBER_RE.finditer(value):
                value = float(m.group().replace('p', '.'))

            try:
                value = float(value)
            except ValueError:
                value = None

        if value is None:
            # fall back to a number found in the name
            for m in NUMBER_RE.finditer(name):
                value = float(m.group().replace('p', '.'))
            if not value:
                value = 0.0

        self.meta['value'] = value
        if self.name == '':
            self.name = str(value)

    @staticmethod
    def _prepare_xy(x: ArrayLike, y: ArrayLike, y_err: Optional[ArrayLike] = None):
        x = np.atleast_1d(x).astype(float)
        if x.ndim > 1:
            raise TypeError('x axis cannot be multidimensional')

        y = np.atleast_1d(y)
        if not np.iscomplexobj(y):
            y = y.astype(float)

        if x.shape != y.shape:
            # remove ugly 1-length dims
            x = np.squeeze(x)
            y = np.squeeze(y)
            if x.shape != y.shape:
                raise ValueError(f'x ({x.shape}) and y ({y.shape}) have different shapes')

        mask = np.ones_like(x, dtype=bool)

        if y_err is None:
            y_err = np.zeros(y.shape, dtype=float)
        elif isinstance(y_err, Real):
            y_err = np.ones(y.shape, dtype=float) * y_err
        else:
            y_err = np.atleast_1d(y_err)
            if y_err.shape != y.shape:
                raise ValueError(f'y_err ({y_err.shape}) and y ({y.shape}) have different shapes')

        return x, y, y_err, mask

    @property
    def x(self) -> np.ndarray:
        return self._x

    @x.setter
    def x(self, value: ArrayLike):
        value = np.asarray(value, dtype=float)
        if value.shape == self._x.shape:
            self._x = value
        elif value.shape == self._x[self.mask].shape:
            # new values match only the unmasked points: discard masked points
            self._x = value
            self._y = self._y[self.mask]
            self._y_err = self._y_err[self.mask]
            self.mask = np.ones_like(value, dtype=bool)
        else:
            raise ValueError('Shape mismatch in setting x')

    @property
    def y(self) -> np.ndarray:
        return self._y

    @y.setter
    def y(self, value: ArrayLike):
        """
        Set new y values. If the length of the new array equals that of the original y
        (including masked points), this is equivalent to a simple assignment. If the length
        equals the number of unmasked points, masked points are discarded.

        Args:
            value: array of the same length as full or masked y
        """
        value = np.asarray(value, dtype=self._y.dtype)
        if value.shape == self._y.shape:
            self._y = value
        elif value.shape == self._y[self.mask].shape:
            # new values match only the unmasked points: discard masked points
            self._y = value
            self._x = self._x[self.mask]
            self._y_err = self._y_err[self.mask]
            self.mask = np.ones_like(self._x, dtype=bool)
        else:
            raise ValueError('Shape mismatch in setting y')

    @property
    def y_err(self) -> np.ndarray:
        return self._y_err

    @y_err.setter
    def y_err(self, value: ArrayLike):
        value = np.asarray(value, dtype=float)
        if value.shape == self._y_err.shape:
            self._y_err = value
        elif value.shape == self._y_err[self.mask].shape:
            self._y_err[self.mask] = value
        else:
            raise ValueError('Shape mismatch in setting y_err')

    def __len__(self) -> int:
        return len(self._x[self.mask])

    def __repr__(self) -> str:
        return f'Point: {self.name}'

    def __getitem__(self, item: str) -> Any:
        try:
            return self.meta[item]
        except KeyError:
            raise KeyError(f'{item} not found')

    def __setitem__(self, key: str, value: Any):
        self.meta[key] = value

    def __delitem__(self, key: str):
        del self.meta[key]

    def update(self, opts: dict) -> None:
        """
        Update metadata.

        Args:
            opts: dictionary of metadata entries merged into `meta`
        """
        self.meta.update(opts)

    def __lshift__(self, other):
        if isinstance(other, Real):
            _tempx = self._x - other
            ret = type(self)(_tempx, self._y)
            ret.update(self.meta)

            return ret
        else:
            raise TypeError(f'{other} is not a number')

    def __rshift__(self, other):
        return self.__lshift__(-other)

    def __ilshift__(self, other):
        if isinstance(other, Real):
            self._x -= other
            return self
        else:
            raise TypeError(f'{other} is not a number')

    def __irshift__(self, other):
        return self.__ilshift__(-other)

    def __add__(self, other):
        """
        Implements self + other
        """
        if isinstance(other, Number):
            _tempy = self._y + other
            ret = type(self)(self._x, _tempy)
            ret.update(self.meta)
            return ret
        elif isinstance(other, self.__class__):
            if np.equal(self._x, other.x).all():
                _tempy = self._y + other.y
                ret = type(self)(self._x, _tempy)
                ret.update(self.meta)

                return ret
            else:
                raise ValueError('Objects have different x values')

    def __radd__(self, other):
        """
        Implements other + self
        """
        return self.__add__(other)

    def __iadd__(self, other):
        """
        Implements self += other
        """
        if isinstance(other, Number):
            self._y += other
            return self
        elif isinstance(other, self.__class__):
            # restrict to same x values
            if np.equal(self._x, other.x).all():
                self._y += other.y
            else:
                raise ValueError('Objects have different x values')
            return self

    def __neg__(self):
        """
        Implements -self
        """
        ret = type(self)(self._x, self._y * (-1))
        ret.update(self.meta)

        return ret

    @property
    def value(self):
        return self.meta['value']

    @property
    def group(self):
        return self.meta['group']

    @group.setter
    def group(self, value: str):
        self.meta['group'] = str(value)

    @value.setter
    def value(self, value: float):
        self.meta['value'] = float(value)

    @property
    def name(self):
        return self.meta['name']

    @name.setter
    def name(self, value):
        self.meta['name'] = str(value)

    @property
    def length(self):
        return len(self._x)

    def points(self, idx: Optional[list] = None, special: Optional[str] = None,
               avg_range: Tuple[int, int] = (0, 0), avg_mode: str = 'mean',
               pts: Optional[list] = None) -> List[tuple]:
        """
        Return (x, y) values at specific positions.

        Args:
            idx (list, optional): List of indexes to evaluate.
            special (str, {'max', 'min', 'absmax', 'absmin'}, optional):
                Special positions to extract.

                'max': position of the maximum of y (real part if y is complex).

                'min': position of the minimum of y (real part if y is complex).

                'absmax': position of the maximum of the absolute value of y.

                'absmin': position of the minimum of the absolute value of y.

            avg_range (tuple of int):
                Region for averaging of y values. Tuple (a, b) uses ``y[i-a:i+b+1]``
                around index `i`. Default is (0, 0).
            avg_mode (str, {'mean', 'sum', 'integral'}, optional):
                Averaging type.

                'mean': arithmetic average.

                'sum': sum over y values.

                'integral': integration over the range using Simpson's rule.

            pts (list, optional):
                If given, points will be appended to this list.

        Returns:
            list of tuples (x, y, y_err)
        """

        if (idx is None) and (special is None):
            raise ValueError('Either `idx` or `special` must be given')

        if avg_mode not in ['mean', 'sum', 'integral']:
            raise ValueError(f'Parameter `avg_mode` is `mean`, `sum`, `integral`, not `{avg_mode}`')

        if pts is None:
            pts = []

        if idx is not None:
            for x in idx:
                if isinstance(x, tuple):
                    x_idx = np.argmin(np.abs(self._x[self.mask] - (x[0]+x[1])/2))
                    left_b = np.argmin(np.abs(self._x[self.mask] - x[0]))
                    right_b = np.argmin(np.abs(self._x[self.mask] - x[1]))
                else:
                    x_idx = np.argmin(np.abs(self._x[self.mask] - x))
                    left_b = int(max(0, x_idx - avg_range[0]))
                    right_b = int(min(len(self), x_idx + avg_range[1] + 1))

                if left_b < right_b:
                    pts.append([self._x[x_idx], *self._average(avg_mode, x_idx, left_b, right_b)])
                else:
                    pts.append([self._x[x_idx], self._y[x_idx], self._y_err[x_idx]])

        if special is not None:
            if special not in ['max', 'min', 'absmax', 'absmin']:
                raise ValueError('Parameter "special" must be "max", "min", "absmax", "absmin".')

            if special == 'max':
                x_idx = np.argmax(self._y.real[self.mask])
            elif special == 'min':
                x_idx = np.argmin(self._y.real[self.mask])
            elif special == 'absmax':
                x_idx = np.argmax(np.abs(self._y[self.mask]))
            else:
                x_idx = np.argmin(np.abs(self._y[self.mask]))

            left_b = int(max(0, x_idx - avg_range[0]))
            right_b = int(min(len(self), x_idx + avg_range[1] + 1))

            pts.append([self._x[self.mask][x_idx], *self._average(avg_mode, x_idx, left_b, right_b)])

        return pts

    def _average(self, mode: str, idx, left: int, right: int) -> Tuple[float, float]:
        if mode == 'mean':
            y_mean = np.mean(self._y[self.mask][left:right].real)
            y_err = np.linalg.norm(self._y_err[self.mask][left:right]) / (right - left)

        elif mode == 'sum':
            y_mean = np.sum(self._y[self.mask][left:right].real)
            y_err = np.linalg.norm(self._y_err[self.mask][left:right])

        elif mode == 'integral':
            y_mean = simpson(self._y[self.mask][left:right].real, x=self._x[self.mask][left:right])
            y_err = np.linalg.norm(cumulative_trapezoid(self._y_err[self.mask][left:right].real,
                                                        x=self._x[self.mask][left:right]))

        else:
            y_mean = self._y[self.mask][idx].real
            y_err = self._y_err[self.mask][idx]

        return y_mean, y_err

    def concatenate(self, other):
        """
        Append the values of another dataset of the same type.

        Args:
            other: Dataset of same type
        """
        if isinstance(other, self.__class__):
            self._x = np.r_[self._x, other.x]
            self._y = np.r_[self._y, other.y]
            self.mask = np.r_[self.mask, other.mask]
            self._y_err = np.r_[self._y_err, other.y_err]
        else:
            raise TypeError(f'{other} is of type {type(other)}')

        return self

    def integrate(self, log: bool = False, limits: Optional[Tuple[float, float]] = None) -> PointLike:
        new_data = self.copy()

        if limits is not None:
            new_data.cut(*limits)

        if log:
            new_data.y = cumulative_trapezoid(y=new_data.y, x=np.log(new_data.x), initial=0)
        else:
            new_data.y = cumulative_trapezoid(y=new_data.y, x=new_data.x, initial=0)

        return new_data

    def diff(self, log: bool = False, limits: Optional[Tuple[float, float]] = None) -> PointLike:
        """
        Differentiate with :func:`numpy.gradient`.

        Args:
            log: If True, differentiate with respect to ln(x) instead of x.
            limits: Optional (low, high) range applied with `cut` before differentiation.

        Returns:
            A new instance containing the derivative.
        """

        new_data = self.copy()

        if limits is not None:
            new_data.cut(*limits)

        if log:
            new_data.y = np.gradient(new_data.y, np.log(new_data.x))
        else:
            new_data.y = np.gradient(new_data.y, new_data.x)

        return new_data

    def magnitude(self) -> PointLike:
        new_data = self.copy()
        new_data.y.real = np.abs(self.y)
        if np.iscomplexobj(new_data.y):
            new_data.y.imag = 0.

        return new_data

    def copy(self) -> PointLike:
        """
        Copy the object.

        Returns:
            A copy of the object
        """
        return copy.deepcopy(self)

    def get_values(self, idx: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Return values at a given index.

        Args:
            idx (int): Index value

        Returns:
            Tuple of (`x[idx]`, `y[idx]`, `y_err[idx]`)
        """
        return self._x[idx], self._y[idx], self._y_err[idx]

    def set_values(self, idx: int, value: Union[list, tuple, np.ndarray]) -> PointLike:
        if not (isinstance(value, (list, tuple, np.ndarray)) and (len(value) in [2, 3])):
            raise TypeError('Values should be list type of length 2 or 3')

        self._x[idx] = value[0]
        self._y[idx] = value[1]

        if len(value) == 3:
            self._y_err[idx] = value[2]
        else:
            self._y_err[idx] = 0

        return self

    def set_data(self,
                 x: Optional[np.ndarray] = None,
                 y: Optional[np.ndarray] = None,
                 y_err: Optional[np.ndarray] = None) -> PointLike:
        if x is None:
            x = self._x
        if y is None:
            y = self._y
        if y_err is None:
            y_err = self._y_err

        self._x, self._y, self._y_err, self.mask = self._prepare_xy(x, y, y_err)

        return self

    def append(self, x: ArrayLike, y: ArrayLike, y_err: Optional[ArrayLike] = None):
        x, y, y_err, mask = self._prepare_xy(x, y, y_err)

        self._x = np.r_[self._x, x]
        self._y = np.r_[self._y, y]
        self._y_err = np.r_[self._y_err, y_err]
        self.mask = np.r_[self.mask, mask]

        return self

    def remove(self, idx):
        self._x = np.delete(self._x, idx)
        self._y = np.delete(self._y, idx)
        self.mask = np.delete(self.mask, idx)
        self._y_err = np.delete(self._y_err, idx)

        return self

    def cut(self, *limits):
        if len(limits) != 2:
            raise ValueError('Two arguments are needed')

        low_lim, high_lim = limits
        if low_lim is None:
            low_lim = np.min(self._x)

        if high_lim is None:
            high_lim = np.max(self._x)

        _mask = np.ma.masked_inside(self._x, low_lim, high_lim).mask

        self._x = self._x[_mask]
        self._y = self._y[_mask]
        self._y_err = self._y_err[_mask]
        self.mask = self.mask[_mask]

        return self

    def shift(self, points: int) -> PointLike:
        """
        Move y values by `points` indices; values shifted over the edge are dropped and the
        vacated positions are filled with zeros. Positive `points` shifts y towards lower
        indices (to the left), negative `points` towards higher indices (to the right).

        Args:
            points (int): shift value

        Example:
            >>> pts = Points(x=[0., 1., 2., 3.], y=[1., 2., 3., 4.])
            >>> pts.shift(2)
            >>> pts.x
            array([0., 1., 2., 3.])
            >>> pts.y
            array([3., 4., 0., 0.])

        Returns:
            The original object with shifted values.
        """
        if points > 0:
            self._y = np.roll(self._y, -int(points))
            self._y[-int(points):] = 0
        else:
            self._y = np.roll(self._y, int(points))
            self._y[:-int(points)] = 0

        self.meta['shift'] = self.meta.get('shift', 0) + points

        return self

    def sort(self, axis=0):
        if axis == 0:
            sort_order = np.argsort(self._x)
        else:
            sort_order = np.argsort(self._y)
        self._x = self._x[sort_order]
        self._y = self._y[sort_order]
        self._y_err = self._y_err[sort_order]
        self.mask = self.mask[sort_order]

        return self

    def normalize(self, mode):
        if mode == 'first':
            scale = self._y[0].real
        elif mode == 'last':
            scale = self._y[-1].real
        elif mode == 'max':
            scale = np.max(self._y.real)
        elif mode == 'maxabs':
            scale = np.max(np.abs(self._y))
        elif mode == 'area':
            scale = simpson(self._y.real, x=self._x)
        else:
            raise ValueError(f'Unknown normalize mode {mode}')

        self._y /= scale
        self._y_err /= scale

        return scale

    def toarray(self, err=True):
        if np.count_nonzero(self._y_err) == 0 or not err:
            return np.c_[self._x, self._y]
        else:
            return np.c_[self._x, self._y, self._y_err]

    def tohdf(self, hdffile):
        grp = hdffile
        grp2_name = self.name

        try:
            dset = grp.create_dataset(grp2_name, shape=(len(self._x),),
                                      dtype=np.dtype([('x', 'f'), ('y', 'f')]),
                                      compression='gzip')
        except RuntimeError:
            dset = grp[grp2_name]

        dset['x'] = self._x
        dset['y'] = self._y
        dset.attrs['TITLE'] = self.group + '/' + self.name
        dset.attrs['damaris_type'] = 'MeasurementResult'

    def savetxt(self, fname, err=False):
        path = Path(fname)

        if not path.parent.exists():
            path.parent.mkdir(parents=True)

        header = []
        for k, v in self.meta.items():
            header.append('%s: %s' % (k, str(v)))
        header = '\n'.join(header)

        if np.all(self.mask):
            np.savetxt(path, self.toarray(err=err), header=header, fmt='%.10f')
        else:
            with path.open('w') as f:
                f.write(header + '\n')
                for i, l in enumerate(self.toarray(err=err)):
                    if self.mask[i]:
                        f.write('\t'.join(map(str, l.tolist())) + '\n')
                    else:
                        # masked points are written as comments
                        f.write('#' + '\t'.join(map(str, l.tolist())) + '\n')

    def get_state(self) -> dict:
        ret_dic = {'x': self._x.tolist(),
                   'y': self._y.tolist(),
                   'mask': (np.where(~self.mask)[0]).tolist()}

        if np.all(self._y_err == 0):
            ret_dic['y_err'] = 0.0
        else:
            ret_dic['y_err'] = self._y_err.tolist()

        ret_dic.update(self.meta)

        return ret_dic

    @classmethod
    def set_state(cls, state: dict):
        _x = state.pop('x')
        _y = state.pop('y')
        _yerr = state.pop('y_err')
        data = cls(x=_x, y=_y, y_err=_yerr)
        _m = state.pop('mask')
        data.mask[_m] = False
        data.meta.update(state)

        return data
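An illustrative sketch of the Points container defined above (all data, the name, and the output path are invented; the exact value parsed from the name depends on NUMBER_RE):

import numpy as np
from nmreval.data import Points

pts = Points(x=np.linspace(0, 10, 101), y=np.linspace(0, 10, 101)**2,
             y_err=0.5, name='sample_300p0K')

pts.cut(2, 8)                                     # keep 2 <= x <= 8
scale = pts.normalize('max')                      # divide y and y_err by max(real(y))
peak = pts.points(special='max', avg_range=(1, 1), avg_mode='mean')
integ = pts.integrate(limits=(2, 8))              # cumulative integral as a new object
pts.savetxt('/tmp/sample.dat', err=True)          # metadata written as header lines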
nmreval/data/signals.py (Normal file, 100 lines)
@@ -0,0 +1,100 @@
import numpy as np

from .points import Points


class Signal(Points):
    """
    Extension of Points for complex y values.
    """

    def __init__(self, x, y, **kwargs):
        y = np.atleast_1d(y)
        if (y.ndim == 1) or y.shape[0] == 1:
            y = y.astype(complex)
        else:
            # two rows are interpreted as real and imaginary part
            y = y[0, :] + 1j * y[1, :]

        super().__init__(x, y, **kwargs)

        self.dx = kwargs.get('dx', abs(self._x[1] - self._x[0]))
        self.meta.update({'phase': [], 'shift': 0})

    def __repr__(self):
        return 'Signal: ' + self.name

    def swap(self):
        """
        Swap real and imaginary part of y.
        """
        self._y = self._y.imag + 1j * self._y.real

        return self

    def phase(self, method: str = 'manual', **kwargs):
        if method == 'manual':
            self.manual_phase(**kwargs)
        elif method == 'simple':
            self.simple_phase(**kwargs)
        else:
            raise KeyError(f'Not implemented method {method}')

        return self

    def manual_phase(self, ph0: float = 0., ph1: float = 0., pvt: float = 0):
        self._y *= np.exp(1j * ph0 * np.pi / 180.)
        if ph1 != 0:
            x = (self._x - pvt) / max(self._x)
            self._y *= np.exp(1j * x * ph1 * np.pi / 180.)
        self.meta['phase'].append((ph0, ph1, pvt))

        return self

    def simple_phase(self, pos: int = 0, avg: int = 0):
        ph = np.mean(np.angle(self._y)[pos-avg:pos+avg+1])
        self._y *= np.exp(-1j*ph)

        self.meta['phase'].append((ph, 0, 0))

        return self

    def baseline_spline(self, line):
        self._y -= line

        return self

    def divide(self, part):
        if part:
            self.cut(0.01*self._x[0], self._x[int(self.length//2)+1])
        else:
            self.cut(self._x[int(self.length//2)], 10*self._x[-1])

        return self

    def toarray(self, err=False):
        if not err or np.all(self._y_err == 0.):
            return np.c_[self._x, self._y.real, self._y.imag]
        else:
            return np.c_[self._x, self._y.real, self._y.imag, self._y_err]

    def tohdf(self, hdffile):
        grp = hdffile
        try:
            grp2 = grp.create_group(self.name)
            is_empty = True
        except ValueError:
            is_empty = False
            grp2 = grp[self.name]
        grp2.attrs['damaris_type'] = 'Accumulation'
        grp2.attrs['TITLE'] = self.group + '/' + self.name
        if is_empty:
            grp2.create_dataset('accu_data', data=np.c_[self._y.real, self._y.imag],
                                compression='gzip')
            idx = grp2.create_dataset('indices', (1,), dtype=np.dtype([('dwelltime', 'f')]))
            idx['dwelltime'] = self.dx
        else:
            grp2['accu_data'][...] = np.c_[self._y.real, self._y.imag]
            grp2['indices']['dwelltime'] = self.dx
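A hypothetical phasing sketch for the Signal class above (synthetic values, not part of the commit):

import numpy as np
from nmreval.data import Signal

t = np.arange(256) * 2e-5
sig = Signal(t, np.exp(-t / 1e-3) * np.exp(1j * 0.7), name='example')

sig.phase(method='manual', ph0=-40.1)       # zeroth-order correction in degrees
sig.phase(method='simple', pos=2, avg=2)    # rotate so points around index 2 are real
sig.swap()                                  # exchange real and imaginary channels
print(sig.toarray().shape)                  # (256, 3): x, real(y), imag(y)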