Compare commits
10 Commits: mdeval_dev...feature/co
| Author | SHA1 | Date |
|---|---|---|
|  | 0ffce2f17a |  |
|  | 0eff84910b |  |
|  | dae2d6ed95 |  |
|  | ec4094cd92 |  |
|  | 00043637e9 |  |
|  | 7585e598dc |  |
|  | 6d8b86c1ef |  |
|  | a2a0ae8d7b |  |
|  | 90bd90a608 |  |
|  | 67d3e70a66 |  |
@@ -16,7 +16,7 @@ from . import reader
 from . import system
 from . import utils
 from . import extra
-from .logging import logger
+from .logging_util import logger
 
 
 def open(
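The only change here (and in several files below) is the rename of the internal logging module to `logging_util`. A plausible motivation, not stated in the diff, is that a package-local module named `logging` is easily confused with or shadowed by the standard library's `logging`. A hypothetical minimal sketch of what `logging_util` presumably provides:

```python
# logging_util.py -- speculative sketch; the real module may also configure
# handlers, formats, and levels for the whole package.
import logging

logger = logging.getLogger("mdevaluate")
logger.addHandler(logging.NullHandler())  # let the application choose handlers
```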
@@ -5,7 +5,7 @@ from typing import Optional, Callable, Iterable
 
 import numpy as np
 from .checksum import checksum
-from .logging import logger
+from .logging_util import logger
 
 autosave_directory: Optional[str] = None
 load_autosave_data = False
@@ -1,9 +1,14 @@
 import functools
 import hashlib
-from .logging import logger
+from .logging_util import logger
 from types import ModuleType, FunctionType
 import inspect
 from typing import Iterable
+import ast
+import io
+import tokenize
+import re
+import textwrap
 
 import numpy as np
 
@@ -28,19 +33,46 @@ def version(version_nr: int, calls: Iterable = ()):
     return decorator
 
 
-def strip_comments(s: str):
-    """Strips comment lines and docstring from Python source string."""
-    o = ""
-    in_docstring = False
-    for l in s.split("\n"):
-        if l.strip().startswith(("#", '"', "'")) or in_docstring:
-            in_docstring = l.strip().startswith(('"""', "'''")) + in_docstring == 1
-            continue
-        o += l + "\n"
-    return o
+def strip_comments(source: str) -> str:
+    """Removes docstrings, comments, and irrelevant whitespace from Python source code."""
+
+    # Step 1: Remove docstrings using AST
+    def remove_docstrings(node):
+        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)):
+            if (doc := ast.get_docstring(node, clean=False)):
+                first_stmt = node.body[0]
+                if isinstance(first_stmt, ast.Expr) and isinstance(first_stmt.value, ast.Constant):
+                    node.body.pop(0)  # Remove the docstring entirely
+        for child in ast.iter_child_nodes(node):
+            remove_docstrings(child)
+
+    tree = ast.parse(textwrap.dedent(source))
+    remove_docstrings(tree)
+    code_without_docstrings = ast.unparse(tree)
+
+    # Step 2: Remove comments using tokenize
+    tokens = tokenize.generate_tokens(io.StringIO(code_without_docstrings).readline)
+    result = []
+    last_lineno = -1
+    last_col = 0
+
+    for toknum, tokval, (srow, scol), (erow, ecol), line in tokens:
+        if toknum == tokenize.COMMENT:
+            continue
+        if srow > last_lineno:
+            last_col = 0
+        if scol > last_col:
+            result.append(" " * (scol - last_col))
+        result.append(tokval)
+        last_lineno, last_col = erow, ecol
+
+    code_no_comments = ''.join(result)
+
+    # Step 3: Remove empty lines (whitespace-only or truly blank)
+    return "\n".join([line for line in code_no_comments.splitlines() if line.strip() != ""])
 
 
-def checksum(*args, csum=None):
+def checksum(*args, csum=None, _seen=None):
     """
     Calculate a checksum of any object, by sha1 hash.
 
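Reviewer note: the rewritten `strip_comments` normalizes source before hashing, so checksums ignore edits that cannot change behavior. A small sanity sketch (hypothetical snippets, assuming the function is importable): two sources differing only in comments and docstrings should normalize to the same text.

```python
# Two versions of the same function: one commented and documented, one bare.
src_a = '''
def f(x):
    """Doubles x."""
    return x * 2  # fast path
'''
src_b = '''
def f(x):
    return x * 2
'''

# Both reduce to the same canonical source, so they hash identically.
assert strip_comments(src_a) == strip_comments(src_b)
```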
@@ -60,7 +92,15 @@ def checksum(*args, csum=None):
     csum = hashlib.sha1()
     csum.update(str(SALT).encode())
 
+    if _seen is None:
+        _seen = set()
+
     for arg in args:
+        obj_id = id(arg)
+        if obj_id in _seen:
+            continue
+        _seen.add(obj_id)
+
         if hasattr(arg, "__checksum__"):
             logger.debug("Checksum via __checksum__: %s", str(arg))
             csum.update(str(arg.__checksum__()).encode())
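Reviewer note: the new `_seen` set threads through the recursive calls and skips any object whose `id` was already hashed. The point, inferred from the diff and shown with a synthetic example, is termination on reference cycles such as mutually recursive closures (run from a file so `inspect.getsource` works):

```python
# Without the guard, checksum(f) -> checksum(g) -> checksum(f) -> ... would
# recurse forever, because each closure refers to the other.
def make_pair():
    def f():
        return g()
    def g():
        return f()
    return f

f = make_pair()
checksum(f)  # terminates: when g's closure leads back to f, id(f) is in _seen
```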
@@ -73,17 +113,19 @@ def checksum(*args, csum=None):
         elif isinstance(arg, FunctionType):
             csum.update(strip_comments(inspect.getsource(arg)).encode())
             c = inspect.getclosurevars(arg)
-            for v in {**c.nonlocals, **c.globals}.values():
+            merged = {**c.nonlocals, **c.globals}
+            for key in sorted(merged):  # deterministic ordering
+                v = merged[key]
                 if v is not arg:
-                    checksum(v, csum=csum)
+                    checksum(v, csum=csum, _seen=_seen)
         elif isinstance(arg, functools.partial):
             logger.debug("Checksum via partial for %s", str(arg))
-            checksum(arg.func, csum=csum)
+            checksum(arg.func, csum=csum, _seen=_seen)
             for x in arg.args:
-                checksum(x, csum=csum)
+                checksum(x, csum=csum, _seen=_seen)
             for k in sorted(arg.keywords.keys()):
                 csum.update(k.encode())
-                checksum(arg.keywords[k], csum=csum)
+                checksum(arg.keywords[k], csum=csum, _seen=_seen)
         elif isinstance(arg, np.ndarray):
             csum.update(arg.tobytes())
         else:
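Reviewer note: sorting the merged closure/global names before hashing makes the checksum independent of dict insertion order. A minimal illustration of the hazard being fixed:

```python
# Same items, different insertion order: value iteration order differs, which
# would feed bytes to the hash in a run-dependent order.
a = {"x": 1, "y": 2}
b = {"y": 2, "x": 1}
assert list(a.values()) != list(b.values())
# Iterating over sorted keys is stable regardless of insertion order.
assert [a[k] for k in sorted(a)] == [b[k] for k in sorted(b)]
```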
@@ -1,6 +1,6 @@
 from functools import partial, wraps
 from copy import copy
-from .logging import logger
+from .logging_util import logger
 from typing import Optional, Callable, List, Tuple
 
 import numpy as np
@@ -18,7 +18,7 @@ def log_indices(first: int, last: int, num: int = 100) -> np.ndarray:
     return np.unique(np.int_(ls) - 1 + first)
 
 
-@autosave_data(2)
+@autosave_data(nargs=2, kwargs_keys=('selector', 'segments', 'skip', 'window', 'average', 'points',), version=1.0)
 def shifted_correlation(
     function: Callable,
     frames: Coordinates,
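Reviewer note: the decorator call now names its arguments. Judging from the names alone (the decorator's implementation is not part of this diff), `nargs` and `kwargs_keys` select which positional and keyword arguments enter the cache key, and `version` invalidates stale autosave files. A speculative sketch of that contract, not the project's implementation:

```python
from functools import wraps

# Hypothetical shape of autosave_data -- an assumption for illustration only.
def autosave_data_sketch(nargs, kwargs_keys=(), version=None):
    def decorator(function):
        @wraps(function)
        def wrapper(*args, **kwargs):
            # Only the first `nargs` positionals, the listed kwargs, and the
            # version would take part in the cache key.
            key = checksum(*args[:nargs],
                           *(kwargs.get(k) for k in kwargs_keys),
                           version)
            # ... load from / save to autosave_directory under `key` ...
            return function(*args, **kwargs)
        return wrapper
    return decorator
```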
@@ -199,7 +199,7 @@ def msd(
         raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
 
 
-def isf(
+def isf_raw(
     start_frame: CoordinateFrame,
     end_frame: CoordinateFrame,
     q: float = 22.7,
@@ -216,29 +216,59 @@ def isf(
         displacements = displacements_without_drift(start_frame, end_frame, trajectory)
     if axis == "all":
         distance = (displacements**2).sum(axis=1) ** 0.5
-        return np.sinc(distance * q / np.pi).mean()
+        return np.sinc(distance * q / np.pi)
     elif axis == "xy" or axis == "yx":
         distance = (displacements[:, [0, 1]]**2).sum(axis=1) ** 0.5
-        return np.real(jn(0, distance * q)).mean()
+        return np.real(jn(0, distance * q))
     elif axis == "xz" or axis == "zx":
         distance = (displacements[:, [0, 2]]**2).sum(axis=1) ** 0.5
-        return np.real(jn(0, distance * q)).mean()
+        return np.real(jn(0, distance * q))
     elif axis == "yz" or axis == "zy":
         distance = (displacements[:, [1, 2]]**2).sum(axis=1) ** 0.5
-        return np.real(jn(0, distance * q)).mean()
+        return np.real(jn(0, distance * q))
     elif axis == "x":
         distance = np.abs(displacements[:, 0])
-        return np.mean(np.cos(np.abs(q * distance)))
+        return np.cos(np.abs(q * distance))
     elif axis == "y":
         distance = np.abs(displacements[:, 1])
-        return np.mean(np.cos(np.abs(q * distance)))
+        return np.cos(np.abs(q * distance))
     elif axis == "z":
         distance = np.abs(displacements[:, 2])
-        return np.mean(np.cos(np.abs(q * distance)))
+        return np.cos(np.abs(q * distance))
     else:
         raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
 
 
+def isf(
+    start_frame: CoordinateFrame,
+    end_frame: CoordinateFrame,
+    q: float = 22.7,
+    trajectory: Coordinates = None,
+    axis: str = "all",
+) -> float:
+    """
+    Incoherent intermediate scattering function averaged over all particles.
+    See isf_raw for details.
+    """
+    return isf_raw(start_frame, end_frame, q=q, trajectory=trajectory, axis=axis).mean()
+
+
+def isf_mean_var(
+    start_frame: CoordinateFrame,
+    end_frame: CoordinateFrame,
+    q: float = 22.7,
+    trajectory: Coordinates = None,
+    axis: str = "all",
+) -> tuple[float, float]:
+    """
+    Incoherent intermediate scattering function averaged over all particles, and
+    its variance.
+    See isf_raw for details.
+    """
+    values = isf_raw(start_frame, end_frame, q=q, trajectory=trajectory, axis=axis)
+    return values.mean(), values.var()
+
+
 def rotational_autocorrelation(
     start_frame: CoordinateFrame, end_frame: CoordinateFrame, order: int = 2
 ) -> float:
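Reviewer note: the refactor separates the per-particle values (`isf_raw`) from the reductions (`isf`, `isf_mean_var`), so callers can build other statistics from the same array. For `axis="all"` the per-particle value is the 3D orientation average of exp(iq·r); since `np.sinc` is the normalized sinc, the expression equals sin(qd)/(qd), which a quick check confirms:

```python
import numpy as np

# np.sinc(x) = sin(pi*x)/(pi*x), so np.sinc(d*q/np.pi) == sin(q*d)/(q*d).
d, q = 0.31, 22.7
assert np.isclose(np.sinc(d * q / np.pi), np.sin(q * d) / (q * d))
```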
@@ -430,10 +460,11 @@ def non_gaussian_parameter(
     end_frame: CoordinateFrame,
     trajectory: Coordinates = None,
     axis: str = "all",
+    full_output=False,
 ) -> float:
-    """
+    r"""
     Calculate the non-Gaussian parameter.
-    ..math:
+    .. math::
       \alpha_2 (t) =
       \frac{3}{5}\frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1
     """
@@ -442,27 +473,41 @@ def non_gaussian_parameter(
     else:
         vectors = displacements_without_drift(start_frame, end_frame, trajectory)
     if axis == "all":
-        r = (vectors**2).sum(axis=1)
+        r2 = (vectors**2).sum(axis=1)
         dimensions = 3
     elif axis == "xy" or axis == "yx":
-        r = (vectors[:, [0, 1]]**2).sum(axis=1)
+        r2 = (vectors[:, [0, 1]]**2).sum(axis=1)
         dimensions = 2
     elif axis == "xz" or axis == "zx":
-        r = (vectors[:, [0, 2]]**2).sum(axis=1)
+        r2 = (vectors[:, [0, 2]]**2).sum(axis=1)
         dimensions = 2
     elif axis == "yz" or axis == "zy":
-        r = (vectors[:, [1, 2]]**2).sum(axis=1)
+        r2 = (vectors[:, [1, 2]]**2).sum(axis=1)
         dimensions = 2
     elif axis == "x":
-        r = vectors[:, 0] ** 2
+        r2 = vectors[:, 0] ** 2
         dimensions = 1
     elif axis == "y":
-        r = vectors[:, 1] ** 2
+        r2 = vectors[:, 1] ** 2
         dimensions = 1
     elif axis == "z":
-        r = vectors[:, 2] ** 2
+        r2 = vectors[:, 2] ** 2
         dimensions = 1
     else:
         raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
 
-
-    return (np.mean(r**2) / ((1 + 2 / dimensions) * (np.mean(r) ** 2))) - 1
+    m2 = np.mean(r2)
+    m4 = np.mean(r2**2)
+    if m2 == 0.0:
+        if full_output:
+            return 0.0, 0.0, 0.0
+        else:
+            return 0.0
+
+    alpha_2 = (m4 / ((1 + 2 / dimensions) * m2**2)) - 1
+    if full_output:
+        return alpha_2, m2, m4
+    else:
+        return alpha_2
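Reviewer note: as a sanity check on the new `m2`/`m4` formula (synthetic data, not a trajectory): for Gaussian displacements in d dimensions, ⟨r⁴⟩ = (1 + 2/d)⟨r²⟩², so α₂ should vanish up to sampling noise.

```python
import numpy as np

rng = np.random.default_rng(0)
vectors = rng.normal(size=(1_000_000, 3))  # ideal Gaussian displacements, d = 3
r2 = (vectors**2).sum(axis=1)
m2, m4 = np.mean(r2), np.mean(r2**2)
alpha_2 = (m4 / ((1 + 2 / 3) * m2**2)) - 1
print(alpha_2)  # ~0, as expected for Gaussian statistics
```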
@@ -182,10 +182,10 @@ def tetrahedral_order(
     )
 
     # Connection vectors
-    neighbors_1 -= atoms
-    neighbors_2 -= atoms
-    neighbors_3 -= atoms
-    neighbors_4 -= atoms
+    neighbors_1 = pbc_diff(neighbors_1, atoms, box=atoms.box)
+    neighbors_2 = pbc_diff(neighbors_2, atoms, box=atoms.box)
+    neighbors_3 = pbc_diff(neighbors_3, atoms, box=atoms.box)
+    neighbors_4 = pbc_diff(neighbors_4, atoms, box=atoms.box)
 
     # Normed Connection vectors
     neighbors_1 /= np.linalg.norm(neighbors_1, axis=-1).reshape(-1, 1)
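Reviewer note: replacing the plain subtraction with `pbc_diff` matters whenever a neighbor sits across a periodic boundary: the raw difference can be nearly a full box length even though the physical separation is small. A minimal minimum-image sketch (assuming an orthorhombic box given as edge lengths; the project's `pbc_diff` may be more general):

```python
import numpy as np

def pbc_diff_sketch(a, b, box):
    # Minimum-image convention: wrap each component into [-box/2, box/2).
    d = a - b
    return d - box * np.round(d / box)

box = np.array([10.0, 10.0, 10.0])
a, b = np.array([9.8, 0.0, 0.0]), np.array([0.1, 0.0, 0.0])
print(a - b)                       # [9.7, 0, 0] -- spurious long vector
print(pbc_diff_sketch(a, b, box))  # [-0.3, 0, 0] -- the physical separation
```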
@@ -7,7 +7,7 @@ from numpy.typing import ArrayLike, NDArray
 
 from itertools import product
 
-from .logging import logger
+from .logging_util import logger
 
 if TYPE_CHECKING:
     from mdevaluate.coordinates import CoordinateFrame
@@ -19,13 +19,13 @@ import MDAnalysis
 from scipy import sparse
 
 from .checksum import checksum
-from .logging import logger
+from .logging_util import logger
 from . import atoms
 from .coordinates import Coordinates
 
 CSR_ATTRS = ("data", "indices", "indptr")
 NOJUMP_MAGIC = 2016
-Group_RE = re.compile("\[ ([-+\w]+) \]")
+Group_RE = re.compile(r"\[ ([-+\w]+) \]")
 
 
 class NojumpError(Exception):
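Reviewer note: the raw-string prefix silences the invalid-escape warnings newer Python versions emit for `\[` and `\w` in ordinary strings; the matched pattern is unchanged. For reference, the group captures the name inside a bracketed section header (group name below is made up):

```python
import re

Group_RE = re.compile(r"\[ ([-+\w]+) \]")
match = Group_RE.search("[ SOL-water ]")
print(match.group(1))  # SOL-water
```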
@@ -14,7 +14,7 @@ from scipy.ndimage import uniform_filter1d
 from scipy.interpolate import interp1d
 from scipy.optimize import curve_fit
 
-from .logging import logger
+from .logging_util import logger
 from .functions import kww, kww_1e
 
 
@@ -357,6 +357,37 @@ def quick1etau(t: ArrayLike, C: ArrayLike, n: int = 7) -> float:
     return tau_est
 
 
+def quicknongaussfit(t, C, width=2):
+    """
+    Estimates the time and height of the peak of the non-Gaussian parameter.
+    C is the correlation function C(t).
+    """
+    def ffunc(t, y0, A_main, log_tau_main, sig_main):
+        main_peak = A_main * np.exp(-(t - log_tau_main)**2 / (2 * sig_main**2))
+        return y0 + main_peak
+
+    # First rough estimate: the closest time. This is returned if the fit fails!
+    tau_est = t[np.argmax(C)]
+    nG_max = np.amax(C)
+    try:
+        with np.errstate(invalid="ignore"):
+            corr = C[t > 0]
+            time = np.log10(t[t > 0])
+            tau = time[np.argmax(corr)]
+            mask = (time > tau - width / 2) & (time < tau + width / 2)
+            time = time[mask]
+            corr = corr[mask]
+            nG_min = C[t > 0].min()
+            guess = [nG_min, nG_max - nG_min, tau, 0.6]
+            popt = curve_fit(ffunc, time, corr, p0=guess, maxfev=10000)[0]
+            tau_est = 10 ** popt[-2]
+            nG_max = popt[0] + popt[1]
+    except Exception:
+        pass
+    if np.isnan(tau_est):
+        tau_est = np.inf
+    return tau_est, nG_max
+
+
 def susceptibility(
     time: NDArray, correlation: NDArray, **kwargs
 ) -> tuple[NDArray, NDArray]:
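Reviewer note: a quick synthetic check of the new fit (made-up data shaped like a non-Gaussian peak): a Gaussian bump in log10(t) centered at t = 100 with height 0.3 over a 0.02 baseline should be recovered closely.

```python
import numpy as np

t = np.logspace(-1, 5, 200)
C = 0.02 + 0.3 * np.exp(-(np.log10(t) - 2.0)**2 / (2 * 0.6**2))
tau_est, nG_max = quicknongaussfit(t, C)
print(tau_est, nG_max)  # ~100 and ~0.32
```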