1
0
forked from IPKM/nmreval
nmreval/nmreval/data/points.py
2022-03-24 20:24:28 +01:00

688 lines
20 KiB
Python

import copy
from numbers import Number, Real
from pathlib import Path
from typing import Any, List, Optional, Tuple, TypeVar, Union
import numpy as np
try:
from scipy.integrate import simpson, cumulative_trapezoid
except ImportError:
from scipy.integrate import simps as simpson, cumtrapz as cumulative_trapezoid
from ..lib.utils import ArrayLike
from ..utils import NUMBER_RE
PointLike = TypeVar('PointLike', bound='Points')
class Points:
"""
Base class for all datatypes
Args:
x (array_like): x values
y (array_like): y values
y_err (array_like, optional): errors
"""
def __init__(self, x: ArrayLike, y: ArrayLike, y_err: Optional[ArrayLike] = None, **kwargs: Any):
self._x, self._y, self._y_err, self.mask = self._prepare_xy(x, y, y_err=y_err)
name = str(kwargs.pop('name', ''))
value = kwargs.pop('value', None)
self.meta = {
'group': '',
'name': name.split('/')[-1]
}
self.meta.update(kwargs)
if isinstance(value, str):
for m in NUMBER_RE.finditer(value):
value = float(m.group().replace('p', '.'))
try:
value = float(value)
except ValueError:
value = None
if value is None:
for m in NUMBER_RE.finditer(name):
value = float(m.group().replace('p', '.'))
if not value:
value = 0.0
self.meta['value'] = value
if self.name == '':
self.name = str(value)
@staticmethod
def _prepare_xy(x: ArrayLike, y: ArrayLike, y_err: Optional[ArrayLike] = None):
x = np.atleast_1d(x).astype(float)
if x.ndim > 1:
raise TypeError('x axis cannot be multidimensional')
y = np.atleast_1d(y)
if not np.iscomplexobj(y):
y = y.astype(float)
if x.shape != y.shape:
# remove ugly 1-length dims
x = np.squeeze(x)
y = np.squeeze(y)
if x.shape != y.shape:
raise ValueError(f'x ({x.shape}) and y ({y.shape}) have different shapes')
mask = np.ones_like(x, dtype=bool)
if y_err is None:
y_err = np.zeros(y.shape, dtype=float)
elif isinstance(y_err, Real):
y_err = np.ones(y.shape, dtype=float) * y_err
else:
y_err = np.atleast_1d(y_err)
if y_err.shape != y.shape:
raise ValueError(f'y_err ({y_err.shape}) and y ({y.shape}) have different shapes')
return x, y, y_err, mask
@property
def x(self) -> np.ndarray:
return self._x
@x.setter
def x(self, value: ArrayLike):
value = np.asarray(value, dtype=float)
if value.shape == self._x.shape:
self._x = value
elif value.shape == self._x[self.mask].shape:
self._x = value
self._y = self._y[self.mask]
self._y_err = self._y_err[self.mask]
else:
raise ValueError('Shape mismatch in setting x')
@property
def y(self) -> np.ndarray:
return self._y
@y.setter
def y(self, value: ArrayLike):
"""
Set new y values. If the length of the new array equals that of original y including masked points
this is equivalent to a simple assignment. If the length equals the masked points then masked points are
discarded.
Args:
value: array of the same length as full or masked y
Returns:
"""
value = np.asarray(value, dtype=self._y.dtype)
if value.shape == self._y.shape:
self._y = value
elif value.shape == self._y[self.mask].shape:
self._y[self.mask] = value
self._x = self._x[self.mask]
self._y_err = self._y_err[self.mask]
else:
raise ValueError('Shape mismatch in setting y')
@property
def y_err(self) -> np.ndarray:
return self._y_err
@y_err.setter
def y_err(self, value: ArrayLike):
value = np.asarray(value, dtype=float)
if value.shape == self._y_err.shape:
self._y_err = value
elif value.shape == self._y_err[self.mask].shape:
self._y[self.mask] = value
else:
raise ValueError('Shape mismatch in setting y_err')
def __len__(self) -> int:
return len(self._x[self.mask])
def __repr__(self) -> str:
return f'Point: {self.name}'
def __getitem__(self, item: str) -> Any:
try:
return self.meta[item]
except KeyError:
KeyError(f'{item} not found')
def __setitem__(self, key: str, value: Any):
self.meta[key] = value
def __delitem__(self, key: str):
del self.meta[key]
def update(self, opts: dict) -> None:
"""
Update metadata
Args:
opts:
Returns:
"""
self.meta.update(opts)
def __lshift__(self, other):
if isinstance(other, Real):
_tempx = self._x - other
ret = type(self)(_tempx, self._y)
self.update(self.meta)
return ret
else:
raise TypeError(f'{other} is not a number')
def __rshift__(self, other):
return self.__lshift__(-other)
def __ilshift__(self, other):
if isinstance(other, Real):
self._x -= other
return self
else:
raise TypeError(f'{other} is not a number')
def __irshift__(self, other):
return self.__ilshift__(-other)
def __add__(self, other):
"""
Implements self + other
"""
if isinstance(other, Number):
_tempy = self._y + other
ret = type(self)(self._x, _tempy)
ret.update(self.meta)
return ret
elif isinstance(other, self.__class__):
if np.equal(self._x, other.x).all():
_tempy = self._y + other.y
ret = type(self)(self._x, _tempy)
ret.update(self.meta)
return ret
else:
raise ValueError('Objects have different x values')
def __radd__(self, other):
"""
Implements other + self
"""
return self.__add__(other)
def __iadd__(self, other):
"""
Implements self += other
"""
if isinstance(other, Number):
self._y += other
return self
elif isinstance(other, self.__class__):
# restrict to same x values
if np.equal(self._x, other.x).all():
self._y += other.y
else:
raise ValueError('Objects have different x values')
return self
def __neg__(self):
"""
Implements -self
"""
ret = type(self)(self._x, self._y * (-1))
ret.update(self.meta)
return ret
@property
def value(self):
return self.meta['value']
@property
def group(self):
return self.meta['group']
@group.setter
def group(self, value: str):
self.meta['group'] = str(value)
@value.setter
def value(self, value: float):
self.meta['value'] = float(value)
@property
def name(self):
return self.meta['name']
@name.setter
def name(self, value):
self.meta['name'] = str(value)
@property
def length(self):
return len(self._x)
def points(self, idx: Optional[list] = None, special: Optional[str] = None,
avg_range: Tuple[int, int] = (0, 0), avg_mode: str = 'mean',
pts: Optional[list] = None) -> List[tuple]:
"""
Return (x, y) values at specific positions.
Args:
idx (list, optional) : List of indexes to evaluate.
special (str, {'max', 'min', 'absmax', 'absmin'}, optional) :
Special positions to extract.
`max` : Selects position of the maximum of y, or real part if y is complex.
'min' : Selects position of the minimum of y, or real part if y is complex.
'absmax' : Selects position at the maximum of the absolute value of y.
'absmin' : Selects position at the minimum of the absolute value of y.
avg_range (tuple of int) :
Region for average of y values. Tuple (a, b) uses ``y[i-a:i+b+1]`` around index `i`. Default is (0, 0).
avg_mode (str {'mean', 'sum', 'integral'} , optional) :
Averaging type
`mean` : Arithmetic average
`sum`: Sum over y values
'integral`: Integration over range using Simpson's rule
pts (list, optional) :
If given, points will be appended.
Returns :
list of tuples (x, y, y_err)
"""
if (idx is None) and (special is None):
raise ValueError('Either `idx` or `special` must be given')
if avg_mode not in ['mean', 'sum', 'integral']:
raise ValueError(f'Parameter `avg_mode` is `mean`, `sum`, `integral`, not `{avg_mode}`' )
if pts is None:
pts = []
for x in idx:
if isinstance(x, tuple):
x_idx = np.argmin(np.abs(self._x[self.mask] - (x[0]+x[1])/2))
left_b = np.argmin(np.abs(self._x[self.mask] - x[0]))
right_b = np.argmin(np.abs(self._x[self.mask] - x[1]))
else:
x_idx = np.argmin(np.abs(self._x[self.mask]-x))
left_b = int(max(0, x_idx - avg_range[0]))
right_b = int(min(len(self), x_idx + avg_range[1] + 1))
if left_b < right_b:
pts.append([self._x[x_idx], *self._average(avg_mode, x_idx, left_b, right_b)])
else:
pts.append([self._x[x_idx], self._y[x_idx], self._y_err[x_idx]])
if special is not None:
if special not in ['max', 'min', 'absmax', 'absmin']:
raise ValueError('Parameter "special" must be "max", "min", "absmax", "absmin".')
if special == 'max':
x_idx = np.argmax(self._y.real[self.mask])
elif special == 'min':
x_idx = np.argmax(self._y.real[self.mask])
elif special == 'absmax':
x_idx = np.argmax(np.abs(self._y[self.mask]))
else:
x_idx = np.argmin(np.abs(self._y[self.mask]))
left_b = int(max(0, x_idx - avg_range[0]))
right_b = int(min(len(self), x_idx + avg_range[1] + 1))
pts.append([self._x[self.mask][x_idx], *self._average(avg_mode, x_idx, left_b, right_b)])
return pts
def _average(self, mode: str, idx, left: int, right: int) -> Tuple[float, float]:
if mode == 'mean':
y_mean = np.mean(self._y[self.mask][left:right].real)
y_err = np.linalg.norm(self._y_err[self.mask][left:right]) / (right - left)
elif mode == 'sum':
y_mean = np.sum(self._y[self.mask][left:right].real)
y_err = np.linalg.norm(self._y_err[self.mask][left:right])
elif mode == 'integral':
y_mean = simpson(self._y[self.mask][left:right].real, x=self._x[left:right])
y_err = np.linalg.norm(cumulative_trapezoid(self._y_err[self.mask][left:right].real, x=self._x[left:right]))
else:
y_mean = self._y[self.mask][idx].real
y_err = self._y_err[self.mask][idx]
return y_mean, y_err
def concatenate(self, other):
"""
Add the values of an object of the same
Args:
other: Dataset of same type
"""
if isinstance(other, self.__class__):
self._x = np.r_[self._x, other.x]
self._y = np.r_[self._y, other.y]
self.mask = np.r_[self.mask, other.mask]
self._y_err = np.r_[self._y_err, other.y_err]
else:
raise TypeError(f'{other} is of type {type(other)}')
return self
def integrate(self, log: bool = False, limits: Tuple[float, float] = None) -> PointLike:
new_data = self.copy()
if limits is not None:
new_data.cut(*limits)
if log:
new_data.y = cumulative_trapezoid(y=new_data.y, x=np.log(new_data.x), initial=0)
else:
new_data.y = cumulative_trapezoid(y=new_data.y, x=new_data.x, initial=0)
return new_data
def diff(self, log: bool = False, limits: Optional[Tuple[float, float]] = None) -> PointLike:
"""
Calculate first derivate :math:`dy/dx` or :math:`dy/d(\ln x)`.
Args:
log (bool) : Flag if derivative is with respect to ln(x) instead of x. Default is `False`.
limits (tuple, optional): Range `(xmin, xmax)` which is differentiated.
Returns:
Copy of set with new y values.
"""
new_data = self.copy()
if limits is not None:
new_data.cut(*limits)
if log:
new_data.y = np.gradient(new_data.y, np.log(new_data.x))
else:
new_data.y = np.gradient(new_data.y, new_data.x)
new_data.remove(np.argwhere(np.isnan(new_data.y)))
return new_data
def magnitude(self) -> PointLike:
new_data = self.copy()
new_data.y.real = np.abs(self.y)
if np.iscomplexobj(new_data.y):
new_data.y.imag = 0.
return new_data
def copy(self) -> PointLike:
"""
Copy the object
Returns:
A copy of the object
"""
return copy.deepcopy(self)
def get_values(self, idx: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Return values at a given index
Args:
idx (int) : Index value
Returns:
Tuple of (`x[idx]`, `y[idx]` , `y_err[idx]`)
"""
return self._x[idx], self._y[idx], self._y_err[idx]
def set_values(self, idx: int, value: Union[list, tuple, np.ndarray]) -> PointLike:
if not (isinstance(value, (list, tuple, np.ndarray)) and (len(value) in [2, 3])):
raise TypeError('Values should be list type of length 2 or 3')
self._x[idx] = value[0]
self._y[idx] = value[1]
if len(value) == 3:
self._y_err[idx] = value[2]
else:
self._y_err[idx] = 0
return self
def set_data(self,
x: Optional[np.ndarray] = None,
y: Optional[np.ndarray] = None,
y_err: Optional[np.ndarray] = None) -> PointLike:
if x is None:
x = self._x
if y is None:
y = self._y
if y_err is not None:
y_err = self._y_err
self._x, self._y, self._y_err, self.mask = self._prepare_xy(x, y, y_err)
return self
def append(self, x: ArrayLike, y: ArrayLike, y_err: Optional[ArrayLike] = None):
x, y, y_err, mask = self._prepare_xy(x, y, y_err)
self._x = np.r_[self._x, x]
self._y = np.r_[self._y, y]
self._y_err = np.r_[self._y_err, y_err]
self.mask = np.r_[self.mask, mask]
return self
def remove(self, idx):
self._x = np.delete(self._x, idx)
self._y = np.delete(self._y, idx)
self.mask = np.delete(self.mask, idx)
self._y_err = np.delete(self._y_err, idx)
return self
def cut(self, low_lim=None, high_lim=None):
"""
Cut
Args:
low_lim:
high_lim:
Returns:
"""
if low_lim is None and high_lim is None:
raise ValueError('Two arguments are needed')
if low_lim is None:
low_lim = np.min(self._x)
if high_lim is None:
high_lim = np.max(self._x)
_mask = np.ma.masked_inside(self._x, low_lim, high_lim).mask
self._x = self._x[_mask]
self._y = self._y[_mask]
self._y_err = self._y_err[_mask]
self.mask = self.mask[_mask]
return self
def shift(self, points: int) -> PointLike:
"""
Shift indexes of y values.
Move y values `points` indexes, remove invalid indexes and fill remaining with zeros
Negative `points` shift to the left, positive `points` shift to the right
Args:
points (int) : shift value
Example:
>>> pts = Points(x=[0., 1., 2., 3.], y=[1., 2., 3., 4.])
>>> pts.shift(2)
>>> pts.x
array([0., 1., 2., 3.])
>>> pts.y
array([3., 4., 0., 0.])
Returns:
The original object with shifted values.
"""
if points > 0:
self._y = np.roll(self._y, -int(points))
self._y[-int(points)-1:] = 0
else:
self._y = np.roll(self._y, int(points))
self._y[:-int(points)] = 0
self.meta['shift'] += points
return self
def sort(self, axis: str = 'x') -> PointLike:
"""
Sort data in increased order.
Args:
axis (str, {`x`, `y`}) : Axis that determines new order. Default is `x`
"""
if axis == 'x':
sort_order = np.argsort(self._x)
elif axis == 'y':
sort_order = np.argsort(self._y)
else:
raise ValueError('Unknown sort axis, use `x` or `y`.')
self._x = self._x[sort_order]
self._y = self._y[sort_order]
self._y_err = self._y_err[sort_order]
self.mask = self.mask[sort_order]
return self
def normalize(self, mode):
if mode == 'first':
scale = self._y[0].real
elif mode == 'last':
scale = self._y[-1].real
elif mode == 'max':
scale = np.max(self._y.real)
elif mode == 'maxabs':
scale = np.max(np.abs(self._y))
elif mode == 'area':
try:
from scipy.integrate import simpson
except ImportError:
from scipy.integrate import simps as simpson
scale = simpson(self._y.real, x=self._x)
else:
raise ValueError(f'Unknown normalize mode {mode}')
self._y /= scale
self._y_err /= scale
return scale
def toarray(self, err=True):
if np.count_nonzero(self._y_err) == 0 or not err:
return np.c_[self._x, self._y]
else:
return np.c_[self._x, self._y, self._y_err]
def tohdf(self, hdffile):
grp = hdffile
grp2_name = self.name
try:
dset = grp.create_dataset(grp2_name, shape=(len(self._x),),
dtype=np.dtype([('x', 'f'), ('y', 'f')]),
compression='gzip')
except RuntimeError:
dset = grp[grp2_name]
dset['x'] = self._x
dset['y'] = self._y
dset.attrs['TITLE'] = self.group + '/' + self.name
dset.attrs['damaris_type'] = 'MeasurementResult'
def savetxt(self, fname, err=False):
path = Path(fname)
if not path.parent.exists():
path.parent.mkdir(parents=True)
header = []
for k, v in self.meta.items():
header.append('%s: %s' % (k, str(v)))
header = '\n'.join(header)
if np.all(self.mask):
np.savetxt(path, self.toarray(err=err), header=header, fmt='%.10f')
else:
with path.open('w') as f:
f.write(header)
for i, l in enumerate(self.toarray(err=err)):
if self.mask[i]:
f.write('\t'.join(map(str, l.tolist())) + '\n')
else:
f.write('#' + '\t'.join(map(str, l.tolist())) + '\n')
def get_state(self) -> dict:
ret_dic = {'x': self._x.tolist(),
'y': self._y.tolist(),
'mask': (np.where(~self.mask)[0]).tolist()}
if np.all(self._y_err == 0):
ret_dic['y_err'] = 0.0
else:
ret_dic['y_err'] = self._y_err.tolist()
ret_dic.update(self.meta)
return ret_dic
@classmethod
def set_state(cls, state: dict):
_x = state.pop('x')
_y = state.pop('y')
_yerr = state.pop('y_err')
data = cls(x=_x, y=_y, y_err=_yerr)
_m = state.pop('mask')
data.mask[_m] = False
data.meta.update(state)
return data