1
0
forked from IPKM/nmreval
nmreval/src/nmreval/math/smooth.py

191 lines
4.5 KiB
Python

import numpy as np
import numpy.polynomial.polynomial as poly
from scipy import signal as signal
__all__ = [
'smooth',
'loess',
'savgol',
'running_max',
'running_min',
'running_var',
'running_std',
'running_median',
'running_mean',
'running_sum',
]
def loess(x, y, window_size: int, it: int = 0, deg: int = 2):
# ULTRA LANGSAM !!!
it = max(it, 0)
if window_size >= len(x):
raise ValueError('Window is too large for data')
if deg not in [1, 2]:
raise ValueError('Loess supports only degrees 1 and 2.')
if np.iscomplexobj(y):
_y = np.array([y.real, y.imag])
else:
_y = np.array([y])
new_y = np.zeros_like(_y)
for i, x_i in enumerate(x):
dist = np.abs(x - x_i)
idx = np.argsort(dist)[:window_size]
x_w = x[idx]
y_w = y[idx]
dist = dist[idx]
weight = (1 - np.abs(dist / np.max(dist))**3)**3
p = poly.polyfit(x_w, y_w, deg, w=weight)
for _ in range(it):
y_fit = poly.polyval(x_w, p)
resid = np.abs(y_fit - y_w)
mad = np.median(resid)
ww = (resid / 6. / mad)**2
ww = np.clip(ww, 0, 1)
if np.all(ww > 0.97):
break
robust_wt = (1 - np.clip(ww, 0, 1))**2
p = poly.polyfit(x_w, y_w, deg, w=weight*robust_wt)
new_y[:, i] = poly.polyval(x_i, p)
if new_y.shape[0] == 2:
new_y = new_y[0] + 1j * new_y[1]
new_y = new_y.reshape(x.shape)
return new_y
def savgol(x, y, window_size: int, deg: int = 2, mode: str = 'mirror'):
if window_size % 2 == 0:
print('number of points must be odd, increased by one')
window_size += 1
if window_size > len(x):
raise ValueError('Window is bigger than size of x')
if deg >= window_size:
raise ValueError(f'Polynomial order {deg} is larger than window size {window_size}')
new_y = np.zeros_like(y)
new_y += signal.savgol_filter(y.real, window_size, deg, mode=mode)
if np.iscomplexobj(y):
new_y += 1j * signal.savgol_filter(y.imag, window_size, deg, mode=mode)
return new_y
def running_mean(x, y, window_size: int):
return _running_func(np.nanmean, x, y, window_size)
def running_median(x, y, window_size: int):
return _running_func(np.nanmedian, x, y, window_size)
def running_std(x, y, window_size: int):
return _running_func(np.nanstd, x, y, window_size)
def running_var(x, y, window_size: int):
return _running_func(np.nanvar, x, y, window_size)
def running_max(x, y, window_size):
return _running_func(np.nanmax, x, y, window_size)
def running_min(x, y, window_size):
return _running_func(np.nanmin, x, y, window_size)
def running_sum(x, y, window_size):
return _running_func(np.nansum, x, y, window_size)
def _running_func(func, x, y, window_size):
x = np.asarray(x)
y = np.asarray(y)
if x.ndim != 1 or y.ndim != 1:
raise ValueError('Only 1-dim arrays can be moved')
if not (0 < window_size < y.shape[-1]):
raise ValueError('Window should be positive and smaller then arrays')
new_x = np.mean(_moving_window(x, window_size), axis=-1)
new_y = func(_moving_window(y, window_size), axis=-1)
return new_x, new_y
def _moving_window(arr, nn):
shapes = arr.shape[:-1] + (arr.shape[-1]-nn+1, nn) # create shape (..., len(arr)-n+1, n)
strides = arr.strides + (arr.strides[-1], ) # extend strides
return np.lib.stride_tricks.as_strided(arr, shapes, strides)
_funcs = {
'loess': loess,
'savgol': savgol,
'mean': running_mean,
'median': running_median,
'std': running_std,
'var': running_var,
'max': running_max,
'min': running_min,
'sum': running_sum,
}
def smooth(
data: 'Data',
window_size: int,
mode: str = 'mean',
logx: bool = False,
logy: bool = False,
**kwargs
):
try:
func = _funcs[mode]
except KeyError:
raise ValueError(f'Unknown mode {mode}')
_x, _y = data.x, data.y
if logx:
_x = np.log10(_x)
if logy:
_y = np.log10(_y)
new_values = func(_x, _y, window_size, **kwargs)
if isinstance(new_values, tuple):
new_x, new_y = new_values
else:
new_x = _x
new_y = new_values
if logx:
new_x = 10**new_x
if logy:
new_y = 10**new_y
new_data = data.copy()
new_data.set_data(x=new_x, y=new_y, y_err=np.zeros_like(new_y))
return new_data