5 Commits

11 changed files with 193 additions and 287 deletions

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "mdevaluate" name = "mdevaluate"
version = "24.06" version = "24.02"
dependencies = [ dependencies = [
"mdanalysis", "mdanalysis",
"pandas", "pandas",

View File

@ -16,7 +16,7 @@ from . import reader
from . import system from . import system
from . import utils from . import utils
from . import extra from . import extra
from .logging import logger from .logging_util import logger
def open( def open(

View File

@ -5,7 +5,7 @@ from typing import Optional, Callable, Iterable
import numpy as np import numpy as np
from .checksum import checksum from .checksum import checksum
from .logging import logger from .logging_util import logger
autosave_directory: Optional[str] = None autosave_directory: Optional[str] = None
load_autosave_data = False load_autosave_data = False

View File

@ -1,9 +1,14 @@
import functools import functools
import hashlib import hashlib
from .logging import logger from .logging_util import logger
from types import ModuleType, FunctionType from types import ModuleType, FunctionType
import inspect import inspect
from typing import Iterable from typing import Iterable
import ast
import io
import tokenize
import re
import textwrap
import numpy as np import numpy as np
@ -28,16 +33,43 @@ def version(version_nr: int, calls: Iterable = ()):
return decorator return decorator
def strip_comments(s: str): def strip_comments(source: str) -> str:
"""Strips comment lines and docstring from Python source string.""" """Removes docstrings, comments, and irrelevant whitespace from Python source code."""
o = ""
in_docstring = False # Step 1: Remove docstrings using AST
for l in s.split("\n"): def remove_docstrings(node):
if l.strip().startswith(("#", '"', "'")) or in_docstring: if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)):
in_docstring = l.strip().startswith(('"""', "'''")) + in_docstring == 1 if (doc := ast.get_docstring(node, clean=False)):
first_stmt = node.body[0]
if isinstance(first_stmt, ast.Expr) and isinstance(first_stmt.value, ast.Constant):
node.body.pop(0) # Remove the docstring entirely
for child in ast.iter_child_nodes(node):
remove_docstrings(child)
tree = ast.parse(textwrap.dedent(source))
remove_docstrings(tree)
code_without_docstrings = ast.unparse(tree)
# Step 2: Remove comments using tokenize
tokens = tokenize.generate_tokens(io.StringIO(code_without_docstrings).readline)
result = []
last_lineno = -1
last_col = 0
for toknum, tokval, (srow, scol), (erow, ecol), line in tokens:
if toknum == tokenize.COMMENT:
continue continue
o += l + "\n" if srow > last_lineno:
return o last_col = 0
if scol > last_col:
result.append(" " * (scol - last_col))
result.append(tokval)
last_lineno, last_col = erow, ecol
code_no_comments = ''.join(result)
# Step 3: Remove empty lines (whitespace-only or truly blank)
return "\n".join([line for line in code_no_comments.splitlines() if line.strip() != ""])
def checksum(*args, csum=None): def checksum(*args, csum=None):
@ -73,7 +105,9 @@ def checksum(*args, csum=None):
elif isinstance(arg, FunctionType): elif isinstance(arg, FunctionType):
csum.update(strip_comments(inspect.getsource(arg)).encode()) csum.update(strip_comments(inspect.getsource(arg)).encode())
c = inspect.getclosurevars(arg) c = inspect.getclosurevars(arg)
for v in {**c.nonlocals, **c.globals}.values(): merged = {**c.nonlocals, **c.globals}
for key in sorted(merged): # deterministic ordering
v = merged[key]
if v is not arg: if v is not arg:
checksum(v, csum=csum) checksum(v, csum=csum)
elif isinstance(arg, functools.partial): elif isinstance(arg, functools.partial):

View File

@ -1,6 +1,6 @@
from functools import partial, wraps from functools import partial, wraps
from copy import copy from copy import copy
from .logging import logger from .logging_util import logger
from typing import Optional, Callable, List, Tuple from typing import Optional, Callable, List, Tuple
import numpy as np import numpy as np

View File

@ -13,95 +13,9 @@ from .pbc import pbc_diff, pbc_points
from .coordinates import Coordinates, CoordinateFrame, displacements_without_drift from .coordinates import Coordinates, CoordinateFrame, displacements_without_drift
def _is_multi_selector(selection): def log_indices(first: int, last: int, num: int = 100) -> np.ndarray:
if len(selection) == 0: ls = np.logspace(0, np.log10(last - first + 1), num=num)
return False return np.unique(np.int_(ls) - 1 + first)
elif (
isinstance(selection[0], int)
or isinstance(selection[0], bool)
or isinstance(selection[0], np.integer)
or isinstance(selection[0], np.bool_)
):
return False
else:
for indices in selection:
if len(indices) == 0:
continue
elif (
isinstance(indices[0], int)
or isinstance(indices[0], bool)
or isinstance(indices[0], np.integer)
or isinstance(indices[0], np.bool_)
):
return True
else:
raise ValueError(
"selector has more than two dimensions or does not "
"contain int or bool types"
)
def _calc_correlation(
frames: Coordinates,
start_frame: CoordinateFrame,
function: Callable,
selection: np.ndarray,
shifted_idx: np.ndarray,
) -> np.ndarray:
if len(selection) == 0:
correlation = np.zeros(len(shifted_idx))
else:
start = start_frame[selection]
correlation = np.array(
[
function(start, frames[frame_index][selection])
for frame_index in shifted_idx
]
)
return correlation
def _calc_correlation_multi(
frames: Coordinates,
start_frame: CoordinateFrame,
function: Callable,
selection: np.ndarray,
shifted_idx: np.ndarray,
) -> np.ndarray:
correlations = np.zeros((len(selection), len(shifted_idx)))
for i, frame_index in enumerate(shifted_idx):
frame = frames[frame_index]
for j, current_selection in enumerate(selection):
if len(selection) == 0:
correlations[j, i] = 0
else:
correlations[j, i] = function(
start_frame[current_selection], frame[current_selection]
)
return correlations
def _average_correlation(result):
averaged_result = []
for n in range(result.shape[1]):
clean_result = []
for entry in result[:, n]:
if np.all(entry == 0):
continue
else:
clean_result.append(entry)
averaged_result.append(np.average(np.array(clean_result), axis=0))
return np.array(averaged_result)
def _average_correlation_multi(result):
clean_result = []
for entry in result:
if np.all(entry == 0):
continue
else:
clean_result.append(entry)
return np.average(np.array(clean_result), axis=0)
@autosave_data(2) @autosave_data(2)
@ -114,82 +28,113 @@ def shifted_correlation(
window: float = 0.5, window: float = 0.5,
average: bool = True, average: bool = True,
points: int = 100, points: int = 100,
) -> tuple[np.ndarray, np.ndarray]: ) -> (np.ndarray, np.ndarray):
"""Compute a time-dependent correlation function for a given trajectory.
To improve statistics, multiple (possibly overlapping) windows will be
layed over the whole trajectory and the correlation is computed for them separately.
The start frames of the windows are spaced linearly over the valid region of
the trajectory (skipping frames in the beginning given by skip parameter).
The points within each window are spaced logarithmically.
Only a certain subset of the given atoms may be selected for each window
individually using a selector function.
Note that this function is specifically optimized for multi selectors, which select
multiple selection sets per window, for which the correlation is to be computed
separately.
Arguments
---------
function:
The (correlation) function to evaluate.
Should be of the form (CoordinateFrame, CoordinateFrame) -> float
frames:
Trajectory to evaluate on
selector: (optional)
Selection function so select only certain selection sets for each start frame.
Should be of the form
(CoordinateFrame) -> list[A]
where A is something you can index an ndarray with.
For example a list of indices or a bool array.
Must return the same number of selection sets for every frame.
segments:
Number of start frames
skip:
Percentage of trajectory to skip from the start
window:
Length of each segment given as percentage of trajectory
average:
Whether to return averaged results.
See below for details on the returned ndarray.
points:
Number of points per segment
Returns
-------
times: ndarray
1d array of time differences to start frame
result: ndarray
2d ndarray of averaged (or non-averaged) correlations.
When average==True (default) the returned array will be of the shape (S, P)
where S is the number of selection sets and P the number of points per window.
For selection sets that where empty for all start frames all data points will be
zero.
When average==False the returned array will be of shape (W, S) with
dtype=object. The elements are either ndarrays of shape (P,) containing the
correlation data for the specific window and selection set or None if the
corresponding selection set was empty.
W is the number of segments (windows).
S and P are the same as for average==True.
""" """
Calculate the time series for a correlation function.
The times at which the correlation is calculated are determined by
a logarithmic distribution.
Args:
function: The function that should be correlated
frames: The coordinates of the simulation data
selector (opt.):
A function that returns the indices depending on
the staring frame for which particles the
correlation should be calculated.
segments (int, opt.):
The number of segments the time window will be
shifted
skip (float, opt.):
The fraction of the trajectory that will be skipped
at the beginning, if this is None the start index
of the frames slice will be used, which defaults
to 0.1.
window (float, opt.):
The fraction of the simulation the time series will
cover
average (bool, opt.):
If True, returns averaged correlation function
points (int, opt.):
The number of timeshifts for which the correlation
should be calculated
Returns:
tuple:
A list of length N that contains the timeshiftes of the frames at which
the time series was calculated and a numpy array of shape (segments, N)
that holds the (non-avaraged) correlation data
Example:
Calculating the mean square displacement of a coordinate object
named ``coords``:
>>> time, data = shifted_correlation(msd, coords)
"""
def get_correlation(
frames: CoordinateFrame,
start_frame: CoordinateFrame,
index: np.ndarray,
shifted_idx: np.ndarray,
) -> np.ndarray:
if len(index) == 0:
correlation = np.zeros(len(shifted_idx))
else:
start = frames[start_frame][index]
correlation = np.array(
[function(start, frames[frame][index]) for frame in shifted_idx]
)
return correlation
def apply_selector(
start_frame: CoordinateFrame,
frames: CoordinateFrame,
idx: np.ndarray,
selector: Optional[Callable] = None,
):
shifted_idx = idx + start_frame
if selector is None:
index = np.arange(len(frames[start_frame]))
return get_correlation(frames, start_frame, index, shifted_idx)
else:
index = selector(frames[start_frame])
if len(index) == 0:
return np.zeros(len(shifted_idx))
elif (
isinstance(index[0], int)
or isinstance(index[0], bool)
or isinstance(index[0], np.integer)
or isinstance(index[0], np.bool_)
):
return get_correlation(frames, start_frame, index, shifted_idx)
else:
correlations = []
for ind in index:
if len(ind) == 0:
correlations.append(np.zeros(len(shifted_idx)))
elif (
isinstance(ind[0], int)
or isinstance(ind[0], bool)
or isinstance(ind[0], np.integer)
or isinstance(ind[0], np.bool_)
):
correlations.append(
get_correlation(frames, start_frame, ind, shifted_idx)
)
else:
raise ValueError(
"selector has more than two dimensions or does not "
"contain int or bool types"
)
return correlations
if 1 - skip < window: if 1 - skip < window:
window = 1 - skip window = 1 - skip
start_frame_indices = np.unique( start_frames = np.unique(
np.linspace( np.linspace(
len(frames) * skip, len(frames) * skip,
len(frames) * (1 - window), len(frames) * (1 - window),
@ -199,44 +144,28 @@ def shifted_correlation(
) )
) )
num_frames_per_window = int(len(frames) * window) num_frames = int(len(frames) * window)
logspaced_indices = np.logspace(0, np.log10(num_frames_per_window + 1), num=points) ls = np.logspace(0, np.log10(num_frames + 1), num=points)
logspaced_indices = np.unique(np.int_(logspaced_indices) - 1) idx = np.unique(np.int_(ls) - 1)
logspaced_time = ( t = np.array([frames[i].time for i in idx]) - frames[0].time
np.array([frames[i].time for i in logspaced_indices]) - frames[0].time
)
if selector is None: result = np.array(
multi_selector = False [
else: apply_selector(start_frame, frames=frames, idx=idx, selector=selector)
selection = selector(frames[0]) for start_frame in start_frames
multi_selector = _is_multi_selector(selection) ]
result = []
for start_frame_index in start_frame_indices:
shifted_idx = logspaced_indices + start_frame_index
start_frame = frames[start_frame_index]
if selector is None:
selection = np.arange(len(start_frame))
else:
selection = selector(start_frame)
if multi_selector:
result_segment = _calc_correlation_multi(
frames, start_frame, function, selection, shifted_idx
) )
else:
result_segment = _calc_correlation(
frames, start_frame, function, selection, shifted_idx
)
result.append(result_segment)
result = np.array(result)
if average: if average:
if multi_selector: clean_result = []
result = _average_correlation_multi(result) for entry in result:
if np.all(entry == 0):
continue
else: else:
result = _average_correlation(result) clean_result.append(entry)
return logspaced_time, result result = np.array(clean_result)
result = np.average(result, axis=0)
return t, result
def msd( def msd(
@ -255,11 +184,11 @@ def msd(
if axis == "all": if axis == "all":
return (displacements**2).sum(axis=1).mean() return (displacements**2).sum(axis=1).mean()
elif axis == "xy" or axis == "yx": elif axis == "xy" or axis == "yx":
return (displacements[:, [0, 1]] ** 2).sum(axis=1).mean() return (displacements[:, [0, 1]]**2).sum(axis=1).mean()
elif axis == "xz" or axis == "zx": elif axis == "xz" or axis == "zx":
return (displacements[:, [0, 2]] ** 2).sum(axis=1).mean() return (displacements[:, [0, 2]]**2).sum(axis=1).mean()
elif axis == "yz" or axis == "zy": elif axis == "yz" or axis == "zy":
return (displacements[:, [1, 2]] ** 2).sum(axis=1).mean() return (displacements[:, [1, 2]]**2).sum(axis=1).mean()
elif axis == "x": elif axis == "x":
return (displacements[:, 0] ** 2).mean() return (displacements[:, 0] ** 2).mean()
elif axis == "y": elif axis == "y":
@ -289,13 +218,13 @@ def isf(
distance = (displacements**2).sum(axis=1) ** 0.5 distance = (displacements**2).sum(axis=1) ** 0.5
return np.sinc(distance * q / np.pi).mean() return np.sinc(distance * q / np.pi).mean()
elif axis == "xy" or axis == "yx": elif axis == "xy" or axis == "yx":
distance = (displacements[:, [0, 1]] ** 2).sum(axis=1) ** 0.5 distance = (displacements[:, [0, 1]]**2).sum(axis=1) ** 0.5
return np.real(jn(0, distance * q)).mean() return np.real(jn(0, distance * q)).mean()
elif axis == "xz" or axis == "zx": elif axis == "xz" or axis == "zx":
distance = (displacements[:, [0, 2]] ** 2).sum(axis=1) ** 0.5 distance = (displacements[:, [0, 2]]**2).sum(axis=1) ** 0.5
return np.real(jn(0, distance * q)).mean() return np.real(jn(0, distance * q)).mean()
elif axis == "yz" or axis == "zy": elif axis == "yz" or axis == "zy":
distance = (displacements[:, [1, 2]] ** 2).sum(axis=1) ** 0.5 distance = (displacements[:, [1, 2]]**2).sum(axis=1) ** 0.5
return np.real(jn(0, distance * q)).mean() return np.real(jn(0, distance * q)).mean()
elif axis == "x": elif axis == "x":
distance = np.abs(displacements[:, 0]) distance = np.abs(displacements[:, 0])
@ -349,11 +278,11 @@ def van_hove_self(
if axis == "all": if axis == "all":
delta_r = (vectors**2).sum(axis=1) ** 0.5 delta_r = (vectors**2).sum(axis=1) ** 0.5
elif axis == "xy" or axis == "yx": elif axis == "xy" or axis == "yx":
delta_r = (vectors[:, [0, 1]] ** 2).sum(axis=1) ** 0.5 delta_r = (vectors[:, [0, 1]]**2).sum(axis=1) ** 0.5
elif axis == "xz" or axis == "zx": elif axis == "xz" or axis == "zx":
delta_r = (vectors[:, [0, 2]] ** 2).sum(axis=1) ** 0.5 delta_r = (vectors[:, [0, 2]]**2).sum(axis=1) ** 0.5
elif axis == "yz" or axis == "zy": elif axis == "yz" or axis == "zy":
delta_r = (vectors[:, [1, 2]] ** 2).sum(axis=1) ** 0.5 delta_r = (vectors[:, [1, 2]]**2).sum(axis=1) ** 0.5
elif axis == "x": elif axis == "x":
delta_r = np.abs(vectors[:, 0]) delta_r = np.abs(vectors[:, 0])
elif axis == "y": elif axis == "y":
@ -502,9 +431,9 @@ def non_gaussian_parameter(
trajectory: Coordinates = None, trajectory: Coordinates = None,
axis: str = "all", axis: str = "all",
) -> float: ) -> float:
""" r"""
Calculate the non-Gaussian parameter. Calculate the non-Gaussian parameter.
..math: .. math:
\alpha_2 (t) = \alpha_2 (t) =
\frac{3}{5}\frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1 \frac{3}{5}\frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1
""" """
@ -516,13 +445,13 @@ def non_gaussian_parameter(
r = (vectors**2).sum(axis=1) r = (vectors**2).sum(axis=1)
dimensions = 3 dimensions = 3
elif axis == "xy" or axis == "yx": elif axis == "xy" or axis == "yx":
r = (vectors[:, [0, 1]] ** 2).sum(axis=1) r = (vectors[:, [0, 1]]**2).sum(axis=1)
dimensions = 2 dimensions = 2
elif axis == "xz" or axis == "zx": elif axis == "xz" or axis == "zx":
r = (vectors[:, [0, 2]] ** 2).sum(axis=1) r = (vectors[:, [0, 2]]**2).sum(axis=1)
dimensions = 2 dimensions = 2
elif axis == "yz" or axis == "zy": elif axis == "yz" or axis == "zy":
r = (vectors[:, [1, 2]] ** 2).sum(axis=1) r = (vectors[:, [1, 2]]**2).sum(axis=1)
dimensions = 2 dimensions = 2
elif axis == "x": elif axis == "x":
r = vectors[:, 0] ** 2 r = vectors[:, 0] ** 2

View File

@ -7,7 +7,7 @@ from numpy.typing import ArrayLike, NDArray
from itertools import product from itertools import product
from .logging import logger from .logging_util import logger
if TYPE_CHECKING: if TYPE_CHECKING:
from mdevaluate.coordinates import CoordinateFrame from mdevaluate.coordinates import CoordinateFrame

View File

@ -19,13 +19,13 @@ import MDAnalysis
from scipy import sparse from scipy import sparse
from .checksum import checksum from .checksum import checksum
from .logging import logger from .logging_util import logger
from . import atoms from . import atoms
from .coordinates import Coordinates from .coordinates import Coordinates
CSR_ATTRS = ("data", "indices", "indptr") CSR_ATTRS = ("data", "indices", "indptr")
NOJUMP_MAGIC = 2016 NOJUMP_MAGIC = 2016
Group_RE = re.compile("\[ ([-+\w]+) \]") Group_RE = re.compile(r"\[ ([-+\w]+) \]")
class NojumpError(Exception): class NojumpError(Exception):

View File

@ -14,7 +14,7 @@ from scipy.ndimage import uniform_filter1d
from scipy.interpolate import interp1d from scipy.interpolate import interp1d
from scipy.optimize import curve_fit from scipy.optimize import curve_fit
from .logging import logger from .logging_util import logger
from .functions import kww, kww_1e from .functions import kww, kww_1e
@ -177,7 +177,7 @@ def coherent_sum(
func: Callable[[ArrayLike, ArrayLike], float], func: Callable[[ArrayLike, ArrayLike], float],
coord_a: ArrayLike, coord_a: ArrayLike,
coord_b: ArrayLike, coord_b: ArrayLike,
) -> NDArray: ) -> float:
""" """
Perform a coherent sum over two arrays :math:`A, B`. Perform a coherent sum over two arrays :math:`A, B`.

View File

@ -1,57 +0,0 @@
import os
import pytest
import mdevaluate
from mdevaluate import correlation
import numpy as np
@pytest.fixture
def trajectory(request):
return mdevaluate.open(os.path.join(os.path.dirname(__file__), "data/water"))
def test_shifted_correlation(trajectory):
test_array = np.array([100, 82, 65, 49, 39, 29, 20, 13, 7])
OW = trajectory.subset(atom_name="OW")
t, result = correlation.shifted_correlation(
correlation.isf, OW, segments=10, skip=0.1, points=10
)
assert (np.array(result * 100, dtype=int) == test_array).all()
def test_shifted_correlation_no_average(trajectory):
t, result = correlation.shifted_correlation(
correlation.isf, trajectory, segments=10, skip=0.1, points=5, average=False
)
assert result.shape == (10, 5)
def test_shifted_correlation_selector(trajectory):
test_array = np.array([100, 82, 64, 48, 37, 28, 19, 11, 5])
def selector(frame):
index = np.argwhere((frame[:, 0] >= 0) * (frame[:, 0] < 1))
return index.flatten()
OW = trajectory.subset(atom_name="OW")
t, result = correlation.shifted_correlation(
correlation.isf, OW, segments=10, skip=0.1, points=10, selector=selector
)
assert (np.array(result * 100, dtype=int) == test_array).all()
def test_shifted_correlation_multi_selector(trajectory):
def selector(frame):
indices = []
for i in range(3):
x = frame[:, 0].flatten()
index = np.argwhere((x >= i) * (x < i + 1))
indices.append(index.flatten())
return indices
OW = trajectory.subset(atom_name="OW")
t, result = correlation.shifted_correlation(
correlation.isf, OW, segments=10, skip=0.1, points=10, selector=selector
)
assert result.shape == (3, 9)