8 Commits

10 changed files with 161 additions and 45 deletions

View File

@@ -16,7 +16,7 @@ from . import reader
 from . import system
 from . import utils
 from . import extra
-from .logging import logger
+from .logging_util import logger

 def open(

View File

@@ -5,7 +5,7 @@ from typing import Optional, Callable, Iterable
 import numpy as np

 from .checksum import checksum
-from .logging import logger
+from .logging_util import logger

 autosave_directory: Optional[str] = None
 load_autosave_data = False

View File

@@ -1,9 +1,14 @@
 import functools
 import hashlib
-from .logging import logger
+from .logging_util import logger
 from types import ModuleType, FunctionType
 import inspect
 from typing import Iterable
+import ast
+import io
+import tokenize
+import re
+import textwrap

 import numpy as np
@@ -28,19 +33,46 @@ def version(version_nr: int, calls: Iterable = ()):
     return decorator


-def strip_comments(s: str):
-    """Strips comment lines and docstring from Python source string."""
-    o = ""
-    in_docstring = False
-    for l in s.split("\n"):
-        if l.strip().startswith(("#", '"', "'")) or in_docstring:
-            in_docstring = l.strip().startswith(('"""', "'''")) + in_docstring == 1
-            continue
-        o += l + "\n"
-    return o
+def strip_comments(source: str) -> str:
+    """Removes docstrings, comments, and irrelevant whitespace from Python source code."""
+
+    # Step 1: Remove docstrings using AST
+    def remove_docstrings(node):
+        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)):
+            if (doc := ast.get_docstring(node, clean=False)):
+                first_stmt = node.body[0]
+                if isinstance(first_stmt, ast.Expr) and isinstance(first_stmt.value, ast.Constant):
+                    node.body.pop(0)  # Remove the docstring entirely
+        for child in ast.iter_child_nodes(node):
+            remove_docstrings(child)
+
+    tree = ast.parse(textwrap.dedent(source))
+    remove_docstrings(tree)
+    code_without_docstrings = ast.unparse(tree)
+
+    # Step 2: Remove comments using tokenize
+    tokens = tokenize.generate_tokens(io.StringIO(code_without_docstrings).readline)
+    result = []
+    last_lineno = -1
+    last_col = 0
+    for toknum, tokval, (srow, scol), (erow, ecol), line in tokens:
+        if toknum == tokenize.COMMENT:
+            continue
+        if srow > last_lineno:
+            last_col = 0
+        if scol > last_col:
+            result.append(" " * (scol - last_col))
+        result.append(tokval)
+        last_lineno, last_col = erow, ecol
+    code_no_comments = ''.join(result)
+
+    # Step 3: Remove empty lines (whitespace-only or truly blank)
+    return "\n".join([line for line in code_no_comments.splitlines() if line.strip() != ""])


-def checksum(*args, csum=None):
+def checksum(*args, csum=None, _seen=None):
     """
     Calculate a checksum of any object, by sha1 hash.
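The rewritten strip_comments pushes the source through an AST round-trip before stripping comments, so formatting is normalized as well. A quick sketch of what it yields, assuming the function is importable from mdevaluate's checksum module (the sample source is invented for illustration):

from mdevaluate.checksum import strip_comments

sample = '''
def f(x):
    """A docstring that must not influence the checksum."""
    # neither should this comment
    return x + 1  # nor this one
'''

print(strip_comments(sample))
# def f(x):
#     return x + 1

Because ast.unparse normalizes formatting, two stylistic variants of the same code now hash identically, which the old line-based stripper could not guarantee.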
@@ -60,7 +92,15 @@ def checksum(*args, csum=None):
         csum = hashlib.sha1()
         csum.update(str(SALT).encode())

+    if _seen is None:
+        _seen = set()
+
     for arg in args:
+        obj_id = id(arg)
+        if obj_id in _seen:
+            continue
+        _seen.add(obj_id)
         if hasattr(arg, "__checksum__"):
             logger.debug("Checksum via __checksum__: %s", str(arg))
             csum.update(str(arg.__checksum__()).encode())
@@ -77,15 +117,15 @@ def checksum(*args, csum=None):
             for key in sorted(merged):  # deterministic ordering
                 v = merged[key]
                 if v is not arg:
-                    checksum(v, csum=csum)
+                    checksum(v, csum=csum, _seen=_seen)
         elif isinstance(arg, functools.partial):
             logger.debug("Checksum via partial for %s", str(arg))
-            checksum(arg.func, csum=csum)
+            checksum(arg.func, csum=csum, _seen=_seen)
             for x in arg.args:
-                checksum(x, csum=csum)
+                checksum(x, csum=csum, _seen=_seen)
             for k in sorted(arg.keywords.keys()):
                 csum.update(k.encode())
-                checksum(arg.keywords[k], csum=csum)
+                checksum(arg.keywords[k], csum=csum, _seen=_seen)
         elif isinstance(arg, np.ndarray):
             csum.update(arg.tobytes())
         else:
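The new _seen set is a recursion guard keyed on id(arg): any object already visited in the current call is skipped instead of being hashed again. A minimal sketch of the failure mode it addresses, assuming (as the merged loop above suggests) that checksumming a function recurses into the objects it references, so mutually recursive functions previously chased each other without bound; the example names are invented:

from mdevaluate.checksum import checksum

def even(n):
    return True if n == 0 else odd(n - 1)

def odd(n):
    return False if n == 0 else even(n - 1)

checksum(even)  # terminates: the second visit to even is skipped via _seen

One side effect worth noting: since the guard keys on id(), a repeated occurrence of the very same object within one call is skipped as well, so digests of argument lists containing duplicates change with this release.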

View File

@@ -1,6 +1,6 @@
 from functools import partial, wraps
 from copy import copy
-from .logging import logger
+from .logging_util import logger
 from typing import Optional, Callable, List, Tuple

 import numpy as np

View File

@@ -18,7 +18,7 @@ def log_indices(first: int, last: int, num: int = 100) -> np.ndarray:
     return np.unique(np.int_(ls) - 1 + first)


-@autosave_data(2)
+@autosave_data(nargs=2, kwargs_keys=('selector', 'segments', 'skip', 'window', 'average', 'points',), version=1.0)
 def shifted_correlation(
     function: Callable,
     frames: Coordinates,
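The bare positional @autosave_data(2) becomes a fully spelled-out call. Reading the names, nargs presumably counts the positional arguments and kwargs_keys lists the keyword arguments that identify a cached result; that reading is an assumption, not the documented API. A caller-side sketch under that assumption (my_correlation is invented):

# Assumption: nargs = positional args hashed into the cache key,
# kwargs_keys = keyword args that participate in it.
@autosave_data(nargs=2, kwargs_keys=("selector", "segments"), version=1.0)
def my_correlation(function, frames, selector=None, segments=10):
    ...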
@@ -199,7 +199,7 @@ def msd(
         raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')


-def isf(
+def isf_raw(
     start_frame: CoordinateFrame,
     end_frame: CoordinateFrame,
     q: float = 22.7,
@@ -216,29 +216,59 @@ def isf(
         displacements = displacements_without_drift(start_frame, end_frame, trajectory)

     if axis == "all":
         distance = (displacements**2).sum(axis=1) ** 0.5
-        return np.sinc(distance * q / np.pi).mean()
+        return np.sinc(distance * q / np.pi)
     elif axis == "xy" or axis == "yx":
         distance = (displacements[:, [0, 1]]**2).sum(axis=1) ** 0.5
-        return np.real(jn(0, distance * q)).mean()
+        return np.real(jn(0, distance * q))
     elif axis == "xz" or axis == "zx":
         distance = (displacements[:, [0, 2]]**2).sum(axis=1) ** 0.5
-        return np.real(jn(0, distance * q)).mean()
+        return np.real(jn(0, distance * q))
     elif axis == "yz" or axis == "zy":
         distance = (displacements[:, [1, 2]]**2).sum(axis=1) ** 0.5
-        return np.real(jn(0, distance * q)).mean()
+        return np.real(jn(0, distance * q))
     elif axis == "x":
         distance = np.abs(displacements[:, 0])
-        return np.mean(np.cos(np.abs(q * distance)))
+        return np.cos(np.abs(q * distance))
     elif axis == "y":
         distance = np.abs(displacements[:, 1])
-        return np.mean(np.cos(np.abs(q * distance)))
+        return np.cos(np.abs(q * distance))
     elif axis == "z":
         distance = np.abs(displacements[:, 2])
-        return np.mean(np.cos(np.abs(q * distance)))
+        return np.cos(np.abs(q * distance))
     else:
         raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
+
+
+def isf(
+    start_frame: CoordinateFrame,
+    end_frame: CoordinateFrame,
+    q: float = 22.7,
+    trajectory: Coordinates = None,
+    axis: str = "all",
+) -> float:
+    """
+    Incoherent intermediate scattering function averaged over all particles.
+    See isf_raw for details.
+    """
+    return isf_raw(start_frame, end_frame, q=q, trajectory=trajectory, axis=axis).mean()
+
+
+def isf_mean_var(
+    start_frame: CoordinateFrame,
+    end_frame: CoordinateFrame,
+    q: float = 22.7,
+    trajectory: Coordinates = None,
+    axis: str = "all",
+) -> tuple[float, float]:
+    """
+    Incoherent intermediate scattering function averaged over all particles,
+    together with its variance across particles.
+    See isf_raw for details.
+    """
+    values = isf_raw(start_frame, end_frame, q=q, trajectory=trajectory, axis=axis)
+    return values.mean(), values.var()


 def rotational_autocorrelation(
     start_frame: CoordinateFrame, end_frame: CoordinateFrame, order: int = 2
 ) -> float:
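The per-particle values are now reusable: isf_raw returns the raw array, isf keeps the old scalar behavior, and isf_mean_var adds the spread. A usage sketch, where start, end, and traj stand in for a CoordinateFrame pair and a Coordinates object:

per_particle = isf_raw(start, end, q=22.7, trajectory=traj)   # ndarray, one value per particle
F = isf(start, end, q=22.7, trajectory=traj)                  # scalar mean, as before
mean, var = isf_mean_var(start, end, q=22.7, trajectory=traj)
# mean == per_particle.mean(), var == per_particle.var()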
@@ -430,10 +460,11 @@ def non_gaussian_parameter(
     end_frame: CoordinateFrame,
     trajectory: Coordinates = None,
     axis: str = "all",
+    full_output: bool = False,
 ) -> float:
-    """
+    r"""
     Calculate the non-Gaussian parameter.

-    ..math:
+    .. math::
         \alpha_2 (t) =
         \frac{3}{5}\frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1
     """
@@ -442,27 +473,41 @@ def non_gaussian_parameter(
     else:
         vectors = displacements_without_drift(start_frame, end_frame, trajectory)

     if axis == "all":
-        r = (vectors**2).sum(axis=1)
+        r2 = (vectors**2).sum(axis=1)
         dimensions = 3
     elif axis == "xy" or axis == "yx":
-        r = (vectors[:, [0, 1]]**2).sum(axis=1)
+        r2 = (vectors[:, [0, 1]]**2).sum(axis=1)
         dimensions = 2
     elif axis == "xz" or axis == "zx":
-        r = (vectors[:, [0, 2]]**2).sum(axis=1)
+        r2 = (vectors[:, [0, 2]]**2).sum(axis=1)
         dimensions = 2
     elif axis == "yz" or axis == "zy":
-        r = (vectors[:, [1, 2]]**2).sum(axis=1)
+        r2 = (vectors[:, [1, 2]]**2).sum(axis=1)
         dimensions = 2
     elif axis == "x":
-        r = vectors[:, 0] ** 2
+        r2 = vectors[:, 0] ** 2
         dimensions = 1
     elif axis == "y":
-        r = vectors[:, 1] ** 2
+        r2 = vectors[:, 1] ** 2
         dimensions = 1
     elif axis == "z":
-        r = vectors[:, 2] ** 2
+        r2 = vectors[:, 2] ** 2
         dimensions = 1
     else:
         raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')

-    return (np.mean(r**2) / ((1 + 2 / dimensions) * (np.mean(r) ** 2))) - 1
+    m2 = np.mean(r2)
+    m4 = np.mean(r2**2)
+    if m2 == 0.0:
+        if full_output:
+            return 0.0, 0.0, 0.0
+        else:
+            return 0.0
+    alpha_2 = (m4 / ((1 + 2 / dimensions) * m2**2)) - 1
+    if full_output:
+        return alpha_2, m2, m4
+    else:
+        return alpha_2
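full_output exposes the moments the function already computes, and the m2 == 0.0 guard removes the former division by zero at t = 0. A short sketch (start and end are placeholder frames):

alpha2 = non_gaussian_parameter(start, end)                           # unchanged default
alpha2, m2, m4 = non_gaussian_parameter(start, end, full_output=True)
# m2 = <r^2>, m4 = <r^4>, and alpha_2 = m4 / ((1 + 2 / d) * m2**2) - 1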

View File

@@ -182,10 +182,10 @@ def tetrahedral_order(
     )

     # Connection vectors
-    neighbors_1 -= atoms
-    neighbors_2 -= atoms
-    neighbors_3 -= atoms
-    neighbors_4 -= atoms
+    neighbors_1 = pbc_diff(neighbors_1, atoms, box=atoms.box)
+    neighbors_2 = pbc_diff(neighbors_2, atoms, box=atoms.box)
+    neighbors_3 = pbc_diff(neighbors_3, atoms, box=atoms.box)
+    neighbors_4 = pbc_diff(neighbors_4, atoms, box=atoms.box)

     # Normed Connection vectors
     neighbors_1 /= np.linalg.norm(neighbors_1, axis=-1).reshape(-1, 1)
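Plain subtraction is wrong whenever an atom and one of its neighbors sit on opposite sides of a periodic boundary; pbc_diff returns the minimum-image difference instead. A self-contained sketch of the idea for an orthorhombic box (mdevaluate's pbc_diff is the real, more general implementation):

import numpy as np

def pbc_diff_orthorhombic(a, b, box):
    """Minimum-image difference a - b for a box given by its edge lengths."""
    d = np.asarray(a) - np.asarray(b)
    return d - box * np.round(d / box)

box = np.array([10.0, 10.0, 10.0])
print(pbc_diff_orthorhombic([9.8, 0.0, 0.0], [0.1, 0.0, 0.0], box))
# [-0.3  0.  0.]  -- the naive difference would be 9.7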

View File

@@ -7,7 +7,7 @@ from numpy.typing import ArrayLike, NDArray
 from itertools import product

-from .logging import logger
+from .logging_util import logger

 if TYPE_CHECKING:
     from mdevaluate.coordinates import CoordinateFrame

View File

@@ -19,13 +19,13 @@ import MDAnalysis
 from scipy import sparse

 from .checksum import checksum
-from .logging import logger
+from .logging_util import logger
 from . import atoms
 from .coordinates import Coordinates

 CSR_ATTRS = ("data", "indices", "indptr")
 NOJUMP_MAGIC = 2016
-Group_RE = re.compile("\[ ([-+\w]+) \]")
+Group_RE = re.compile(r"\[ ([-+\w]+) \]")


 class NojumpError(Exception):
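Context for the r-prefix: "\[" is an invalid escape sequence in a normal string literal, which Python flags with a DeprecationWarning (a SyntaxWarning since 3.12); the raw string compiles to the byte-identical pattern:

import re

Group_RE = re.compile(r"\[ ([-+\w]+) \]")
print(Group_RE.match("[ SOL ]").group(1))  # SOL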

View File

@@ -14,7 +14,7 @@ from scipy.ndimage import uniform_filter1d
 from scipy.interpolate import interp1d
 from scipy.optimize import curve_fit

-from .logging import logger
+from .logging_util import logger
 from .functions import kww, kww_1e
@@ -357,6 +357,37 @@ def quick1etau(t: ArrayLike, C: ArrayLike, n: int = 7) -> float:
     return tau_est


+def quicknongaussfit(t, C, width=2):
+    """
+    Estimates the time and height of the peak of the non-Gaussian parameter.
+    C is the correlation function C(t).
+    """
+    def ffunc(t, y0, A_main, log_tau_main, sig_main):
+        # Gaussian peak on a constant offset, fitted in log10(t)
+        main_peak = A_main * np.exp(-(t - log_tau_main)**2 / (2 * sig_main**2))
+        return y0 + main_peak
+
+    # First rough estimate: the sampled time of the maximum.
+    # This is returned if the fit fails!
+    tau_est = t[np.argmax(C)]
+    nG_max = np.amax(C)
+    try:
+        with np.errstate(invalid='ignore'):
+            corr = C[t > 0]
+            time = np.log10(t[t > 0])
+            tau = time[np.argmax(corr)]
+            mask = (time > tau - width / 2) & (time < tau + width / 2)
+            time = time[mask]
+            corr = corr[mask]
+            nG_min = C[t > 0].min()
+            guess = [nG_min, nG_max - nG_min, tau, 0.6]
+            popt = curve_fit(ffunc, time, corr, p0=guess, maxfev=10000)[0]
+            tau_est = 10**popt[-2]
+            nG_max = popt[0] + popt[1]
+    except Exception:
+        pass
+    if np.isnan(tau_est):
+        tau_est = np.inf
+    return tau_est, nG_max


 def susceptibility(
     time: NDArray, correlation: NDArray, **kwargs
 ) -> tuple[NDArray, NDArray]:
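Usage sketch for the new helper: quicknongaussfit fits a Gaussian in log10(t) within width/2 decades around the maximum and returns the peak position and height (t and alpha2 are placeholder arrays, e.g. collected from shifted_correlation with non_gaussian_parameter):

tau_peak, alpha2_peak = quicknongaussfit(t, alpha2, width=2)
# falls back to t[np.argmax(alpha2)] / np.amax(alpha2) if the fit fails,
# and to np.inf if even that estimate is NaN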