18 Commits

Author SHA1 Message Date
robrobo d97863e356 added fake _xdr attributes in case the trajectory was constructed by other means than reading a gromacs xtc file 2026-04-11 17:22:39 +02:00
robrobo ddf7d42c27 extended the __checksum__ handling to cover edge cases that caused problems 2026-04-11 17:21:57 +02:00
robrobo 715deea1e0 norm in quick fit 2025-10-16 17:44:30 +02:00
robrobo 6b7641f152 stop reading many frames to get times for shifted_correlation 2025-09-05 14:47:53 +02:00
robrobo cd7097ad46 remove print 2025-08-14 16:43:56 +02:00
robrobo a0ca2d8657 Merge branch 'fix/nojump_and_npt_caching' into feature/compatibility_robin 2025-08-14 16:35:12 +02:00
robrobo 9ff3badab1 have to wrap delta with np.array to make sure it is ndarray and result stays CoordinateFrame 2025-08-14 16:33:37 +02:00
robrobo 0f47475f22 quick version of center_of_masses for equal weights 2025-08-14 15:20:16 +02:00
robrobo f6ff7606ad decorator for autosave_data now consumes an autosave_dir_overwrite kwarg. this can enable autosave by itself and takes precedence over the enable(dir) value; it is provided the same way as the description 2025-08-09 18:43:50 +02:00
robrobo 96c624efee Merge branch 'fix/nojump_and_npt_caching' into feature/compatibility_robin 2025-08-09 16:11:42 +02:00
robrobo 492098fe01 apply selection and scaling with current box after delta in jumps has been cached or calculated directly. this should fix using nojump on NPT simulations 2025-08-09 16:11:24 +02:00
robrobo accb43d7e6 Merge branch 'refactor_logging' into feature/compatibility_robin 2025-08-09 13:58:15 +02:00
robrobo 07b14a6cd6 Merge branch 'main' into feature/compatibility_robin 2025-08-09 13:57:29 +02:00
robrobo 9f6af2af11 corrected spelling 2025-07-17 20:21:43 +02:00
robrobo 0ffce2f17a added full_output option to non gaussian parameter so that correct statistics can be done afterwards 2025-07-17 18:45:11 +02:00
robrobo 0eff84910b simplified quick non gaussian fit function so that it is more robust 2025-07-17 18:44:20 +02:00
robrobo dae2d6ed95 created wrappers around the isf to allow access to more raw data and the variance for accurate chi4 calculation 2025-07-17 12:50:49 +02:00
robrobo ec4094cd92 added quick estimation for peak tau and height of non Gaussian 2025-07-17 12:34:43 +02:00
7 changed files with 168 additions and 41 deletions
+3 -1
@@ -166,8 +166,10 @@ def autosave_data(
@functools.wraps(function)
def autosave(*args, **kwargs):
description = kwargs.pop("description", "")
autosave_dir_overwrite = kwargs.pop("autosave_dir_overwrite", None)
autosave_dir = autosave_dir_overwrite if autosave_dir_overwrite is not None else autosave_directory
autoload = kwargs.pop("autoload", True) and load_autosave_data
if autosave_directory is not None:
if autosave_dir is not None:
relevant_args = list(args[:nargs])
if kwargs_keys is not None:
for key in [*posargs_keys, *kwargs_keys]:
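A minimal call-site sketch (hypothetical trajectory object; shifted_correlation is decorated with autosave_data further down in this changeset):
from functools import partial
# autosave_dir_overwrite is popped by the autosave wrapper, enables caching even
# when no directory was enabled globally, and takes precedence over the enabled one.
t, S = shifted_correlation(
    partial(isf, q=22.7),
    trajectory,                                  # hypothetical Coordinates object
    description="water_isf",
    autosave_dir_overwrite="./autosave_cache",
)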
+14 -2
@@ -102,8 +102,20 @@ def checksum(*args, csum=None, _seen=None):
_seen.add(obj_id)
if hasattr(arg, "__checksum__"):
logger.debug("Checksum via __checksum__: %s", str(arg))
csum.update(str(arg.__checksum__()).encode())
method = getattr(arg, "__checksum__")
if callable(method) and not isinstance(arg, type):
logger.debug("Checksum via __checksum__: %s", str(arg))
csum.update(str(method()).encode())
elif isinstance(arg, type):
try:
src = inspect.getsource(arg)
csum.update(strip_comments(src).encode())
logger.debug("Checksum via class source for %s", arg.__name__)
except (OSError, TypeError):
csum.update(arg.__name__.encode())
logger.debug("Checksum via class name for %s", arg.__name__)
else:
logger.debug("Skipping unbound __checksum__ on %s", type(arg))
elif isinstance(arg, bytes):
csum.update(arg)
elif isinstance(arg, str):
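For illustration, a hypothetical class whose instances take the callable __checksum__ branch; passing the class object itself now falls back to hashing its source via inspect.getsource:
class TaggedSelection:
    # hypothetical example, not part of the repository
    def __init__(self, indices):
        self.indices = tuple(indices)

    def __checksum__(self):
        # must be stable across runs so cached results can be found again
        return self.indices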
+28
@@ -434,6 +434,34 @@ def center_of_masses(
]
).T[mask]
return np.array(positions)
@map_coordinates
def center_of_atoms(
frame: CoordinateFrame, atom_indices=None, shear: bool = False
) -> NDArray:
if atom_indices is None:
atom_indices = list(range(len(frame)))
res_ids = frame.residue_ids[atom_indices]
if shear:
coords = frame[atom_indices]
box = frame.box
sort_ind = res_ids.argsort(kind="stable")
i = np.concatenate([[0], np.where(np.diff(res_ids[sort_ind]) > 0)[0] + 1])
coms = coords[sort_ind[i]][res_ids - min(res_ids)]
cor = pbc_diff(coords, coms, box)
coords = coms + cor
else:
coords = frame.whole[atom_indices]
mask = np.bincount(res_ids)[1:] != 0
positions = np.array(
[
np.bincount(res_ids, weights=c)[1:]
/ np.bincount(res_ids)[1:]
for c in coords.T
]
).T[mask]
return np.array(positions)
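The per-residue averaging in center_of_atoms is an np.bincount trick over residue ids; a tiny NumPy illustration with toy data:
import numpy as np

res_ids = np.array([1, 1, 2, 2, 2])                       # residue id per atom
coords = np.array([[0.0, 0.0], [1.0, 0.0],
                   [0.0, 1.0], [1.0, 1.0], [2.0, 1.0]])   # toy 2D coordinates
counts = np.bincount(res_ids)[1:]                          # atoms per residue: [2, 3]
centers = np.array(
    [np.bincount(res_ids, weights=c)[1:] / counts for c in coords.T]
).T                                                        # [[0.5, 0.0], [1.0, 1.0]]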
@map_coordinates
+64 -18
@@ -18,7 +18,7 @@ def log_indices(first: int, last: int, num: int = 100) -> np.ndarray:
return np.unique(np.int_(ls) - 1 + first)
@autosave_data(2)
@autosave_data(nargs=2, kwargs_keys=('selector', 'segments', 'skip', 'window', 'average', 'points',), version=1.0)
def shifted_correlation(
function: Callable,
frames: Coordinates,
@@ -147,7 +147,8 @@ def shifted_correlation(
num_frames = int(len(frames) * window)
ls = np.logspace(0, np.log10(num_frames + 1), num=points)
idx = np.unique(np.int_(ls) - 1)
t = np.array([frames[i].time for i in idx]) - frames[0].time
dt = round(frames[1].time - frames[0].time, 6)  # round to suppress floating point noise in the frame spacing
t = idx * dt
result = np.array(
[
@@ -199,7 +200,7 @@ def msd(
raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
def isf(
def isf_raw(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
q: float = 22.7,
@@ -216,29 +217,59 @@ def isf(
displacements = displacements_without_drift(start_frame, end_frame, trajectory)
if axis == "all":
distance = (displacements**2).sum(axis=1) ** 0.5
return np.sinc(distance * q / np.pi).mean()
return np.sinc(distance * q / np.pi)
elif axis == "xy" or axis == "yx":
distance = (displacements[:, [0, 1]]**2).sum(axis=1) ** 0.5
return np.real(jn(0, distance * q)).mean()
return np.real(jn(0, distance * q))
elif axis == "xz" or axis == "zx":
distance = (displacements[:, [0, 2]]**2).sum(axis=1) ** 0.5
return np.real(jn(0, distance * q)).mean()
return np.real(jn(0, distance * q))
elif axis == "yz" or axis == "zy":
distance = (displacements[:, [1, 2]]**2).sum(axis=1) ** 0.5
return np.real(jn(0, distance * q)).mean()
return np.real(jn(0, distance * q))
elif axis == "x":
distance = np.abs(displacements[:, 0])
return np.mean(np.cos(np.abs(q * distance)))
return np.cos(np.abs(q * distance))
elif axis == "y":
distance = np.abs(displacements[:, 1])
return np.mean(np.cos(np.abs(q * distance)))
return np.cos(np.abs(q * distance))
elif axis == "z":
distance = np.abs(displacements[:, 2])
return np.mean(np.cos(np.abs(q * distance)))
return np.cos(np.abs(q * distance))
else:
raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
def isf(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
q: float = 22.7,
trajectory: Coordinates = None,
axis: str = "all",
) -> float:
"""
Incoherent intermediate scattering function averaged over all particles.
See isf_raw for details.
"""
return isf_raw(start_frame, end_frame, q=q, trajectory=trajectory, axis=axis).mean()
def isf_mean_var(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
q: float = 22.7,
trajectory: Coordinates = None,
axis: str = "all",
) -> tuple[float, float]:
"""
Mean and variance over all particles of the incoherent intermediate scattering function.
See isf_raw for details.
"""
values = isf_raw(start_frame, end_frame, q=q, trajectory=trajectory, axis=axis)
return values.mean(), values.var()
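Sketch of the intended chi4 workflow (assumed, based on the commit message; start_frame, end_frame and trajectory are placeholders for real CoordinateFrame / Coordinates objects):
f_s, var_f = isf_mean_var(start_frame, end_frame, q=22.7, trajectory=trajectory)
n_particles = len(isf_raw(start_frame, end_frame, q=22.7, trajectory=trajectory))
chi4_estimate = n_particles * var_f   # N * Var(F_s); assumed chi4 convention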
def rotational_autocorrelation(
start_frame: CoordinateFrame, end_frame: CoordinateFrame, order: int = 2
) -> float:
@@ -430,6 +461,7 @@ def non_gaussian_parameter(
end_frame: CoordinateFrame,
trajectory: Coordinates = None,
axis: str = "all",
full_output: bool = False,
) -> float:
r"""
Calculate the non-Gaussian parameter.
@@ -442,27 +474,41 @@ def non_gaussian_parameter(
else:
vectors = displacements_without_drift(start_frame, end_frame, trajectory)
if axis == "all":
r = (vectors**2).sum(axis=1)
r2 = (vectors**2).sum(axis=1)
dimensions = 3
elif axis == "xy" or axis == "yx":
r = (vectors[:, [0, 1]]**2).sum(axis=1)
r2 = (vectors[:, [0, 1]]**2).sum(axis=1)
dimensions = 2
elif axis == "xz" or axis == "zx":
r = (vectors[:, [0, 2]]**2).sum(axis=1)
r2 = (vectors[:, [0, 2]]**2).sum(axis=1)
dimensions = 2
elif axis == "yz" or axis == "zy":
r = (vectors[:, [1, 2]]**2).sum(axis=1)
r2 = (vectors[:, [1, 2]]**2).sum(axis=1)
dimensions = 2
elif axis == "x":
r = vectors[:, 0] ** 2
r2 = vectors[:, 0] ** 2
dimensions = 1
elif axis == "y":
r = vectors[:, 1] ** 2
r2 = vectors[:, 1] ** 2
dimensions = 1
elif axis == "z":
r = vectors[:, 2] ** 2
r2 = vectors[:, 2] ** 2
dimensions = 1
else:
raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
m2 = np.mean(r2)
m4 = np.mean(r2**2)
if m2 == 0.0:
if full_output:
return 0.0, 0.0, 0.0
else:
return 0.0
alpha_2 = (m4 / ((1 + 2 / dimensions) * m2**2)) - 1
if full_output:
return alpha_2, m2, m4
else:
return alpha_2
return (np.mean(r**2) / ((1 + 2 / dimensions) * (np.mean(r) ** 2))) - 1
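Assumed call site for the new flag; returning the raw moments lets segment-wise statistics be pooled correctly afterwards:
alpha_2, m2, m4 = non_gaussian_parameter(
    start_frame, end_frame, trajectory=trajectory, full_output=True
)
# pool m2 and m4 over segments first, then form alpha_2 from the pooled moments;
# averaging alpha_2 values directly would give a biased result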
+9 -20
@@ -149,32 +149,21 @@ def nojump(frame: CoordinateFrame, usecache: bool = True) -> CoordinateFrame:
i0 = 0
delta = 0
delta = (
delta
+ np.array(
np.vstack(
[m[i0 : abstep + 1].sum(axis=0) for m in reader.nojump_matrices]
).T
)
@ frame.box
)
delta = (delta
+ np.vstack(
[m[i0 : abstep + 1].sum(axis=0) for m in reader.nojump_matrices]
).T)
reader._nojump_cache[abstep] = delta
while len(reader._nojump_cache) > NOJUMP_CACHESIZE:
reader._nojump_cache.popitem(last=False)
delta = delta[selection, :]
else:
delta = (
np.array(
np.vstack(
[
m[: frame.step + 1, selection].sum(axis=0)
for m in reader.nojump_matrices
]
delta = np.vstack(
[m[: frame.step + 1, selection].sum(axis=0) for m in reader.nojump_matrices]
).T
)
@ frame.box
)
delta = delta[selection, :]
delta = np.array(delta @ frame.box)
return frame - delta
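Toy illustration of why the box product now happens per frame (assumed reasoning, following the commit message): the cached jump counts are box-independent, so each NPT frame scales them with its own box.
import numpy as np

jumps = np.array([[1, 0, -1],
                  [0, 2, 0]], dtype=float)   # toy summed jump counts per particle
box_t = np.diag([3.0, 3.0, 4.5])             # box of the current (NPT) frame
delta = np.array(jumps @ box_t)              # displacement subtracted from the frame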
+14
@@ -23,6 +23,8 @@ from .logging_util import logger
from . import atoms
from .coordinates import Coordinates
from unittest.mock import MagicMock
CSR_ATTRS = ("data", "indices", "indptr")
NOJUMP_MAGIC = 2016
Group_RE = re.compile(r"\[ ([-+\w]+) \]")
@@ -240,7 +242,18 @@ def generate_nojump_matrices(trajectory: Coordinates):
save_nojump_matrices(trajectory.frames)
def _ensure_xdr(reader: BaseReader):
"""Patch missing _xdr attribute for non-XDR readers (e.g. LAMMPS DumpReader)
with a stable mock so checksums are consistent across runs."""
if not hasattr(reader.rd, '_xdr'):
mock_xdr = MagicMock()
mock_xdr.offsets = np.arange(len(reader))
print(f"Adding mock _xdr attribute for to reader of length {len(reader)}.")
reader.rd._xdr = mock_xdr
def save_nojump_matrices(reader: BaseReader, matrices: npt.ArrayLike = None):
_ensure_xdr(reader)
if matrices is None:
matrices = reader.nojump_matrices
data = {"checksum": checksum(NOJUMP_MAGIC, checksum(reader))}
@@ -253,6 +266,7 @@ def save_nojump_matrices(reader: BaseReader, matrices: npt.ArrayLike = None):
def load_nojump_matrices(reader: BaseReader):
_ensure_xdr(reader)
zipname = nojump_load_filename(reader)
try:
data = np.load(zipname, allow_pickle=True)
+36
@@ -334,6 +334,11 @@ def quick1etau(t: ArrayLike, C: ArrayLike, n: int = 7) -> float:
C is C(t) the correlation function
n is the minimum number of points around 1/e required
"""
# norm, if t=0 provided
if t[0] == 0:
C = C / C[0]  # normalise without mutating the caller's array
C, t = C[t > 0], t[t > 0]  # make sure t = 0 is dropped
# first rough estimate, the closest time. This is returned if the interpolation fails!
tau_est = t[np.argmin(np.fabs(C - np.exp(-1)))]
# reduce the data to points around 1/e
@@ -357,6 +362,37 @@ def quick1etau(t: ArrayLike, C: ArrayLike, n: int = 7) -> float:
return tau_est
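Small self-check of the added normalisation (toy data; the exact value also depends on the interpolation step outside this hunk):
import numpy as np

t = np.linspace(0.0, 50.0, 101)
C = 2.0 * np.exp(-t / 10.0)      # unnormalised decay, C(0) = 2
tau = quick1etau(t, C)           # C is divided by C(0) and t = 0 dropped; tau ~ 10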
def quicknongaussfit(t, C, width=2):
"""
Estimates the time and height of the peak in the non-Gaussian function.
C is C(t) the correlation function
"""
def ffunc(t, y0, A_main, log_tau_main, sig_main):
main_peak = A_main * np.exp(-(t - log_tau_main) ** 2 / (2 * sig_main**2))
return y0 + main_peak
# first rough estimate, the closest time. This is returned if the interpolation fails!
tau_est = t[np.argmax(C)]
nG_max = np.amax(C)
try:
with np.errstate(invalid='ignore'):
corr = C[t > 0]
time = np.log10(t[t > 0])
tau = time[np.argmax(corr)]
mask = (time > tau - width / 2) & (time < tau + width / 2)
time = time[mask]
corr = corr[mask]
nG_min = C[t > 0].min()
guess = [nG_min, nG_max - nG_min, tau, 0.6]
popt = curve_fit(ffunc, time, corr, p0=guess, maxfev=10000)[0]
tau_est = 10**popt[-2]
nG_max = popt[0] + popt[1]
except Exception:
pass
if np.isnan(tau_est):
tau_est = np.inf
return tau_est, nG_max
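Quick synthetic check of the peak estimator (toy data):
import numpy as np

t = np.logspace(-1, 4, 200)
C = 0.8 * np.exp(-(np.log10(t) - 2.0) ** 2 / (2 * 0.6**2))   # peak at t = 100
tau_peak, height = quicknongaussfit(t, C)
# expected: tau_peak close to 100, height close to 0.8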
def susceptibility(
time: NDArray, correlation: NDArray, **kwargs
) -> tuple[NDArray, NDArray]: