Merge branch 'mdeval_dev'

# Conflicts:
#	.gitignore
#	doc/conf.py
#	examples/plot_chi4.py
#	examples/plot_isf.py
#	examples/plot_spatialisf.py
#	examples/plot_temperature.py
#	src/mdevaluate/cli.py
#	src/mdevaluate/correlation.py
#	src/mdevaluate/distribution.py
#	src/mdevaluate/functions.py
#	test/test_atoms.py
This commit is contained in:
2024-01-16 16:54:54 +01:00
49 changed files with 2231 additions and 3310 deletions

View File

@@ -1,43 +1,52 @@
import os
from glob import glob
from typing import Optional
import pandas as pd
from . import atoms
from . import autosave
from . import checksum
from . import coordinates
from . import correlation
from . import distribution
from . import functions
from . import pbc
from . import autosave
from . import reader
from . import system
from . import utils
from . import extra
from .logging import logger
def open(
directory="",
topology="*.tpr",
trajectory="*.xtc",
cached=False,
nojump=False,
index_file=None,
charges=None,
masses=None,
):
directory: str = "",
topology: str = "*.tpr",
trajectory: str = "*.xtc",
nojump: bool = False,
index_file: Optional[str] = None,
charges: Optional[list[float]] = None,
masses: Optional[list[float]] = None,
) -> coordinates.Coordinates:
"""
Open a simulation from a directory.
Args:
directory: Directory of the simulation.
topology (opt.):
Descriptor of the topology file (tpr or gro). By default a tpr file is
Descriptor of the topology file (tpr or gro). By default, a tpr file is
used, if there is exactly one in the directoy.
trajectory (opt.): Descriptor of the trajectory (xtc file).
cached (opt.):
If the trajectory reader should be cached. Can be True, an integer or None.
If this is True maxsize is 128, otherwise this is used as maxsize for
the cache, None means infinite cache (this is a potential memory leak!).
nojump (opt.): If nojump matrixes should be generated. They will alwyas be loaded if present
trajectory (opt.): Descriptor of the trajectory (xtc or trr file).
nojump (opt.):
If nojump matrices should be generated. They will alwyas be loaded
if present
index_file (opt.): Descriptor of the index file (ndx file).
charges (opt.):
List with charges for each atom. It Has to be the same length as the number
of atoms in the system. Only used if topology file is a gro file.
masses (opt.):
List with masses for each atom. It Has to be the same length as the number
of atoms in the system. Only used if topology file is a gro file.
Returns:
A Coordinate object of the simulation.
@@ -45,13 +54,14 @@ def open(
Example:
Open a simulation located in '/path/to/sim', where the trajectory is
located in a sub-directory '/path/to/sim/out' and named for Example
'nojump_traj.xtc'. All read frames will be cached in memory.
'nojump_traj.xtc'.
>>> open('/path/to/sim', trajectory='out/nojump*.xtc', cached=None)
>>> open('/path/to/sim', trajectory='out/nojump*.xtc')
The file descriptors can use unix style pathname expansion to define the filenames.
The default patterns use the recursive placeholder `**` which matches the base or
any subdirctory, thus files in subdirectories with matching file type will be found too.
any subdirctory, thus files in subdirectories with matching file type will be found
too.
For example: 'out/nojump*.xtc' would match xtc files in a subdirectory `out` that
start with `nojump` and end with `.xtc`.
@@ -80,7 +90,6 @@ def open(
atom_set, frames = reader.open_with_mdanalysis(
top_file,
traj_file,
cached=cached,
index_file=index_file,
charges=charges,
masses=masses,
@@ -88,15 +97,17 @@ def open(
coords = coordinates.Coordinates(frames, atom_subset=atom_set)
if nojump:
try:
frames.nojump_matrixes
frames.nojump_matrices
except reader.NojumpError:
reader.generate_nojump_matrixes(coords)
reader.generate_nojump_matrices(coords)
return coords
def open_energy(file):
"""Reads an gromacs energy file and output the data in a pandas DataFrame.
def open_energy(file: str) -> pd.DataFrame:
"""Reads a gromacs energy file and output the data in a pandas DataFrame.
Args:
file: Filename of the energy file
Returns:
A DataFrame with the different energies doe each time.
"""
return pd.DataFrame(reader.energy_reader(file).data_dict)

View File

@@ -1,25 +1,16 @@
import re
from .pbc import pbc_diff
from .checksum import checksum
import numpy as np
import scipy
if scipy.version.version >= "0.17.0":
from scipy.spatial import cKDTree as KDTree
else:
from scipy.spatial import KDTree
from .checksum import checksum
def compare_regex(list, exp):
def compare_regex(str_list: list[str], exp: str) -> np.ndarray:
"""
Compare a list of strings with a regular expression.
"""
if not exp.endswith("$"):
exp += "$"
regex = re.compile(exp)
return np.array([regex.match(s) is not None for s in list])
return np.array([regex.match(s) is not None for s in str_list])
class Atoms:
@@ -83,10 +74,10 @@ class AtomSubset:
the keyworss below. Names are matched as a regular expression with `re.match`.
Args:
atom_name: Specification of the atom name
residue_name: Specification of the resiude name
residue_id: Residue ID or list of IDs
indices: List of atom indices
atom_name: Specification of the atom name
residue_name: Specification of the resiude name
residue_id: Residue ID or list of IDs
indices: List of atom indices
"""
new_subset = self
if atom_name is not None:
@@ -180,107 +171,6 @@ class AtomSubset:
return checksum(self.description)
def center_of_mass(position, mass=None):
if mass is not None:
return 1 / mass.sum() * (mass * position).sum(axis=0)
else:
return 1 / len(position) * position.sum(axis=0)
def gyration_radius(position):
r"""
Calculates a list of all radii of gyration of all molecules given in the coordinate frame,
weighted with the masses of the individual atoms.
Args:
position: Coordinate frame object
..math::
R_G = \left(\frac{\sum_{i=1}^{n} m_i |\vec{r_i} - \vec{r_{COM}}|^2 }{\sum_{i=1}^{n} m_i }
\rigth)^{\frac{1}{2}}
"""
gyration_radii = np.array([])
for resid in np.unique(position.residue_ids):
pos = position.whole[position.residue_ids == resid]
mass = position.masses[position.residue_ids == resid][:, np.newaxis]
COM = center_of_mass(pos, mass)
r_sq = ((pbc_diff(pos, COM, pos.box.diagonal())) ** 2).sum(1)[:, np.newaxis]
g_radius = ((r_sq * mass).sum() / mass.sum()) ** (0.5)
gyration_radii = np.append(gyration_radii, g_radius)
return gyration_radii
def layer_of_atoms(
atoms, thickness, plane_offset=np.array([0, 0, 0]), plane_normal=np.array([1, 0, 0])
):
p_ = atoms - plane_offset
distance = np.dot(p_, plane_normal)
return abs(distance) <= thickness
def distance_to_atoms(ref, atoms, box=None):
"""Get the minimal distance from atoms to ref.
The result is an array of with length == len(atoms)
"""
out = np.empty(atoms.shape[0])
for i, atom in enumerate(atoms):
diff = (pbc_diff(atom, ref, box) ** 2).sum(axis=1).min()
out[i] = np.sqrt(diff)
return out
def distance_to_atoms_cKDtree(ref, atoms, box=None, thickness=None):
"""
Get the minimal distance from atoms to ref.
The result is an array of with length == len(atoms)
Can be faster than distance_to_atoms.
Thickness defaults to box/5. If this is too small results may be wrong.
If box is not given then periodic boundary conditions are not applied!
"""
if thickness == None:
thickness = box / 5
if box is not None:
start_coords = np.copy(atoms) % box
all_frame_coords = pbc_points(ref, box, thickness=thickness)
else:
start_coords = atoms
all_frame_coords = ref
tree = spatial.cKDTree(all_frame_coords)
return tree.query(start_coords)[0]
def next_neighbors(
atoms,
query_atoms=None,
number_of_neighbors=1,
distance_upper_bound=np.inf,
distinct=False,
):
"""
Find the N next neighbors of a set of atoms.
Args:
atoms: The reference atoms and also the atoms which are queried if `query_atoms` is net provided
query_atoms (opt.): If this is not None, these atoms will be queried
number_of_neighbors (int, opt.): Number of neighboring atoms to find
distance_upper_bound (float, opt.): Upper bound of the distance between neighbors
distinct (bool, opt.): If this is true, the atoms and query atoms are taken as distinct sets of atoms
"""
tree = KDTree(atoms)
dnn = 0
if query_atoms is None:
query_atoms = atoms
elif not distinct:
dnn = 1
dist, indices = tree.query(
query_atoms,
number_of_neighbors + dnn,
distance_upper_bound=distance_upper_bound,
)
return indices[:, dnn:]

View File

@@ -1,31 +1,33 @@
import os
import numpy as np
import functools
import inspect
from typing import Optional, Callable, Iterable
import numpy as np
from .checksum import checksum
from .logging import logger
autosave_directory = None
autosave_directory: Optional[str] = None
load_autosave_data = False
verbose_print = True
user_autosave_directory = os.path.join(os.environ["HOME"], ".mdevaluate/autosave")
def notify(msg):
def notify(msg: str):
if verbose_print:
logger.info(msg)
else:
logger.debug(msg)
def enable(dir, load_data=True, verbose=True):
def enable(dir: str, load_data: bool = True, verbose: bool = True):
"""
Enable auto saving results of functions decorated with :func:`autosave_data`.
Enable auto saving results of functions decorated with: func: `autosave_data`.
Args:
dir: Directory where the data should be saved.
load_data (opt., bool): If data should also be loaded.
verbose (opt., bool): If autosave should be verbose.
"""
global autosave_directory, load_autosave_data, verbose_print
verbose_print = verbose
@@ -79,9 +81,8 @@ def get_directory(reader):
if not os.access(savedir, os.W_OK):
savedir = os.path.join(user_autosave_directory, savedir.lstrip("/"))
logger.info(
"Switched autosave directory to {}, since original location is not writeable.".format(
savedir
)
"Switched autosave directory to {}, "
"since original location is not writeable.".format(savedir)
)
os.makedirs(savedir, exist_ok=True)
return savedir
@@ -140,20 +141,24 @@ def load_data(filename):
return data
def autosave_data(nargs, kwargs_keys=None, version=None):
def autosave_data(
nargs: int, kwargs_keys: Optional[Iterable[str]] = None, version: Optional[str] = None
) -> Callable:
"""
Enable autosaving of results for a function.
Args:
nargs: Number of args which are relevant for the calculation.
kwargs_keys (opt.): List of keyword arguments which are relevant for the calculation.
kwargs_keys (opt.):
List of keyword arguments which are relevant for the calculation.
version (opt.):
An optional version number of the decorated function, which replaces the checksum of
the function code, hence the checksum does not depend on the function code.
An optional version number of the decorated function, which replaces the
checksum of the function code, hence the checksum does not depend on the
function code.
"""
def decorator_function(function):
# make sure too include names of positional arguments in kwargs_keys,
# make sure to include names of positional arguments in kwargs_keys,
# sice otherwise they will be ignored if passed via keyword.
# nonlocal kwargs_keys
posargs_keys = list(inspect.signature(function).parameters)[:nargs]

View File

@@ -3,6 +3,7 @@ import hashlib
from .logging import logger
from types import ModuleType, FunctionType
import inspect
from typing import Iterable
import numpy as np
@@ -11,7 +12,7 @@ import numpy as np
SALT = 42
def version(version_nr, calls=[]):
def version(version_nr: int, calls: Iterable = ()):
"""Function decorator that assigns a custom checksum to a function."""
def decorator(func):
@@ -27,7 +28,7 @@ def version(version_nr, calls=[]):
return decorator
def strip_comments(s):
def strip_comments(s: str):
"""Strips comment lines and docstring from Python source string."""
o = ""
in_docstring = False
@@ -43,14 +44,15 @@ def checksum(*args, csum=None):
"""
Calculate a checksum of any object, by sha1 hash.
Input for the hash are some salt bytes and the byte encoding of a string
Inputs for the hash are some salt bytes and the byte encoding of a string
that depends on the object and its type:
- If a method __checksum__ is available, it's return value is converted to bytes
- If a method __checksum__ is available, its return value is converted to bytes
- str or bytes are used as sha1 input directly
- modules use the __name__ attribute
- functions use the function code and any closures of the function
- functools.partial uses the checksum of the function and any arguments, that were defined
- functools.partial uses the checksum of the function and any arguments, that were
defined
- numpy.ndarray uses bytes representation of the array (arr.tobytes())
- Anything else is converted to a str
"""

View File

@@ -1,37 +0,0 @@
import argparse
from . import logging
from . import open as md_open
def run(*args, **kwargs):
parser = argparse.ArgumentParser()
parser.add_argument(
"xtcfile",
help="The xtc file to index.",
)
parser.add_argument(
"--tpr", help="The tprfile of the trajectory.", dest="tpr", default=None
)
parser.add_argument(
"--nojump",
help="Generate Nojump Matrices, requires a tpr file.",
dest="nojump",
action="store_true",
default=False,
)
parser.add_argument(
"--debug",
help="Set logging level to debug.",
dest="debug",
action="store_true",
default=False,
)
args = parser.parse_args()
if args.debug:
logging.setlevel("DEBUG")
md_open("", trajectory=args.xtcfile, topology=args.tpr, nojump=args.nojump)
if __name__ == "__main__":
run()

View File

@@ -1,13 +1,15 @@
from functools import partial, lru_cache, wraps
from functools import partial, wraps
from copy import copy
from .logging import logger
from typing import Optional, Callable, List, Tuple
import numpy as np
from scipy.spatial import cKDTree, KDTree
from numpy.typing import ArrayLike, NDArray
from scipy.spatial import KDTree
from .atoms import AtomSubset
from .pbc import whole, nojump, pbc_diff
from .utils import mask2indices, singledispatchmethod
from .pbc import whole, nojump, pbc_diff, pbc_points
from .utils import singledispatchmethod
from .checksum import checksum
@@ -15,94 +17,7 @@ class UnknownCoordinatesMode(Exception):
pass
def rotate_axis(coords, axis):
"""
Rotate a set of coordinates to a given axis.
"""
axis = np.array(axis) / np.linalg.norm(axis)
zaxis = np.array([0, 0, 1])
if (axis == zaxis).sum() == 3:
return coords
rotation_axis = np.cross(axis, zaxis)
rotation_axis = rotation_axis / np.linalg.norm(rotation_axis)
theta = np.arccos(axis @ zaxis / np.linalg.norm(axis))
# return theta/pi, rotation_axis
ux, uy, uz = rotation_axis
cross_matrix = np.array([[0, -uz, uy], [uz, 0, -ux], [-uy, ux, 0]])
rotation_matrix = (
np.cos(theta) * np.identity(len(axis))
+ (1 - np.cos(theta))
* rotation_axis.reshape(-1, 1)
@ rotation_axis.reshape(1, -1)
+ np.sin(theta) * cross_matrix
)
if len(coords.shape) == 2:
rotated = np.array([rotation_matrix @ xyz for xyz in coords])
else:
rotated = rotation_matrix @ coords
return rotated
def spherical_radius(frame, origin=None):
"""
Transform a frame of cartesian coordinates into the sperical radius.
If origin=None the center of the box is taken as the coordinates origin.
"""
if origin is None:
origin = frame.box.diagonal() / 2
return ((frame - origin) ** 2).sum(axis=-1) ** 0.5
def polar_coordinates(x, y):
"""Convert cartesian to polar coordinates."""
radius = (x**2 + y**2) ** 0.5
phi = np.arctan2(y, x)
return radius, phi
def spherical_coordinates(x, y, z):
"""Convert cartesian to spherical coordinates."""
xy, phi = polar_coordinates(x, y)
radius = (x**2 + y**2 + z**2) ** 0.5
theta = np.arccos(z / radius)
return radius, phi, theta
def radial_selector(frame, coordinates, rmin, rmax):
"""
Return a selection of all atoms with radius in the interval [rmin, rmax].
"""
crd = coordinates[frame.step]
rad, _ = polar_coordinates(crd[:, 0], crd[:, 1])
selector = (rad >= rmin) & (rad <= rmax)
return mask2indices(selector)
def spatial_selector(frame, transform, rmin, rmax):
"""
Select a subset of atoms which have a radius between rmin and rmax.
Coordinates are filtered by the condition::
rmin <= transform(frame) <= rmax
Args:
frame: The coordinates of the actual trajectory
transform:
A function that transforms the coordinates of the frames into
the one-dimensional spatial coordinate (e.g. radius).
rmin: Minimum value of the radius
rmax: Maximum value of the radius
"""
r = transform(frame)
selector = (rmin <= r) & (rmax >= r)
return mask2indices(selector)
class CoordinateFrame(np.ndarray):
class CoordinateFrame(NDArray):
_known_modes = ("pbc", "whole", "nojump")
@property
@@ -184,7 +99,7 @@ class CoordinateFrame(np.ndarray):
box=None,
mode=None,
):
obj = np.ndarray.__new__(subtype, shape, dtype, buffer, offset, strides)
obj = NDArray.__new__(subtype, shape, dtype, buffer, offset, strides)
obj.coordinates = coordinates
obj.step = step
@@ -206,7 +121,7 @@ class Coordinates:
"""
Coordinates represent trajectory data, which is used for evaluation functions.
Atoms may be selected by specifing a atom_subset or a atom_filter.
Atoms may be selected by specifying an atom_subset or an atom_filter.
"""
def get_mode(self, mode):
@@ -239,7 +154,8 @@ class Coordinates:
def mode(self, val):
if val in CoordinateFrame._known_modes:
logger.warn(
"Changing the Coordinates mode directly is deprecated. Use Coordinates.%s instead, which returns a copy.",
"Changing the Coordinates mode directly is deprecated. "
"Use Coordinates.%s instead, which returns a copy.",
val,
)
self._mode = val
@@ -339,23 +255,6 @@ class Coordinates:
self.atom_subset.description = desc
class MeanCoordinates(Coordinates):
def __init__(self, frames, atom_filter=None, mean=1):
super().__init__(frames, atom_filter)
self.mean = mean
assert mean >= 1, "Mean must be positive"
def __getitem__(self, item):
frame = super().__getitem__(item)
for i in range(item + 1, item + self.mean):
frame += super().__getitem__(i)
return frame / self.mean
def len(self):
return len(super() - self.mean + 1)
class CoordinatesMap:
def __init__(self, coordinates, function):
self.coordinates = coordinates
@@ -367,6 +266,7 @@ class CoordinatesMap:
self._description = self.function.func.__name__
else:
self._description = self.function.__name__
self._slice = slice(None)
def __iter__(self):
for frame in self.coordinates:
@@ -420,124 +320,131 @@ class CoordinatesMap:
return CoordinatesMap(self.coordinates.pbc, self.function)
class CoordinatesFilter:
@property
def atom_subset(self):
pass
def __init__(self, coordinates, atom_filter):
self.coordinates = coordinates
self.atom_filter = atom_filter
def __getitem__(self, item):
if isinstance(item, slice):
sliced = copy(self)
sliced.coordinates = self.coordinates[item]
return sliced
else:
frame = self.coordinates[item]
return frame[self.atom_filter]
class CoordinatesKDTree:
def rotate_axis(coords: ArrayLike, axis: ArrayLike) -> NDArray:
"""
A KDTree of coordinates frames. The KDtrees are cached by a :func:`functools.lru_cache`.
Uses :class:`scipy.spatial.cKDTree` by default, since it's significantly faster.
Make sure to use scipy 0.17 or later or switch to the normal KDTree, since cKDTree has
a memory leak in earlier versions.
Rotate a set of coordinates to a given axis.
"""
axis = np.array(axis) / np.linalg.norm(axis)
zaxis = np.array([0, 0, 1])
if (axis == zaxis).sum() == 3:
return coords
rotation_axis = np.cross(axis, zaxis)
rotation_axis = rotation_axis / np.linalg.norm(rotation_axis)
def clear_cache(self):
"""Clear the LRU cache."""
self._get_tree_at_index.cache_clear()
theta = np.arccos(axis @ zaxis / np.linalg.norm(axis))
@property
def cache_info(self):
"""Return info about the state of the cache."""
return self._get_tree_at_index.cache_info()
# return theta/pi, rotation_axis
def _get_tree_at_index(self, index):
frame = self.frames[index]
return self.kdtree(frame[self.selector(frame)])
ux, uy, uz = rotation_axis
cross_matrix = np.array([[0, -uz, uy], [uz, 0, -ux], [-uy, ux, 0]])
rotation_matrix = (
np.cos(theta) * np.identity(len(axis))
+ (1 - np.cos(theta))
* rotation_axis.reshape(-1, 1)
@ rotation_axis.reshape(1, -1)
+ np.sin(theta) * cross_matrix
)
def __init__(self, frames, selector=None, boxsize=None, maxcache=128, ckdtree=True):
"""
Args:
frames: Trajectory of the simulation, can be Coordinates object or reader
selector: Selector function that selects a subset of each frame
maxcache: Maxsize of the :func:`~functools.lru_cache`
ckdtree: Use :class:`~scipy.spatial.cKDTree` or :class:`~scipy.spatial.KDTree` if False
"""
if selector is not None:
self.selector = selector
else:
self.selector = lambda x: slice(None)
self.frames = frames
self.kdtree = cKDTree if ckdtree else KDTree
if boxsize is not None:
self.kdtree = partial(self.kdtree, boxsize=boxsize)
self._get_tree_at_index = lru_cache(maxsize=maxcache)(self._get_tree_at_index)
def __getitem__(self, index):
return self._get_tree_at_index(index)
def __checksum__(self):
return checksum(self.selector, self.frames)
def __eq__(self, other):
return super().__eq__(other)
if len(coords.shape) == 2:
rotated = np.array([rotation_matrix @ xyz for xyz in coords])
else:
rotated = rotation_matrix @ coords
return rotated
def map_coordinates(func):
def spherical_radius(
frame: CoordinateFrame, origin: Optional[ArrayLike] = None
) -> NDArray:
"""
Transform a frame of cartesian coordinates into the spherical radius.
If origin=None, the center of the box is taken as the coordinates' origin.
"""
if origin is None:
origin = frame.box.diagonal() / 2
return ((frame - origin) ** 2).sum(axis=-1) ** 0.5
def polar_coordinates(x: ArrayLike, y: ArrayLike) -> (NDArray, NDArray):
"""Convert cartesian to polar coordinates."""
radius = (x**2 + y**2) ** 0.5
phi = np.arctan2(y, x)
return radius, phi
def spherical_coordinates(
x: ArrayLike, y: ArrayLike, z: ArrayLike
) -> (NDArray, NDArray, NDArray):
"""Convert cartesian to spherical coordinates."""
xy, phi = polar_coordinates(x, y)
radius = (x**2 + y**2 + z**2) ** 0.5
theta = np.arccos(z / radius)
return radius, phi, theta
def selector_radial_cylindrical(
atoms: CoordinateFrame,
r_min: float,
r_max: float,
origin: Optional[ArrayLike] = None,
) -> NDArray:
box = atoms.box
atoms = atoms % np.diag(box)
if origin is None:
origin = [box[0, 0] / 2, box[1, 1] / 2, box[2, 2] / 2]
r_vec = (atoms - origin)[:, :2]
r = np.linalg.norm(r_vec, axis=1)
index = np.argwhere((r >= r_min) * (r < r_max))
return index.flatten()
def map_coordinates(
func: Callable[[CoordinateFrame, ...], NDArray]
) -> Callable[..., CoordinatesMap]:
@wraps(func)
def wrapped(coordinates, **kwargs):
def wrapped(coordinates: Coordinates, **kwargs) -> CoordinatesMap:
return CoordinatesMap(coordinates, partial(func, **kwargs))
return wrapped
@map_coordinates
def centers_of_mass(c, *, masses=None):
"""
A- 1
B- 2
A- 1
C 3
A-
B-
A-
C
A-
B-
A-
C
Example:
rd = XTCReader('t.xtc')
coordinates = Coordinates(rd)
com = centers_of_mass(coordinates, (1.0, 2.0, 1.0, 3.0))
"""
# At first, regroup our array
number_of_masses = len(masses)
number_of_coordinates, number_of_dimensions = c.shape
number_of_new_coordinates = number_of_coordinates // number_of_masses
grouped_masses = c.reshape(
number_of_new_coordinates, number_of_masses, number_of_dimensions
)
return np.average(grouped_masses, axis=1, weights=masses)
def center_of_masses(
frame: CoordinateFrame, atom_indices=None, shear: bool = False
) -> NDArray:
if atom_indices is None:
atom_indices = list(range(len(frame)))
res_ids = frame.residue_ids[atom_indices]
masses = frame.masses[atom_indices]
if shear:
coords = frame[atom_indices]
box = frame.box
sort_ind = res_ids.argsort(kind="stable")
i = np.concatenate([[0], np.where(np.diff(res_ids[sort_ind]) > 0)[0] + 1])
coms = coords[sort_ind[i]][res_ids - min(res_ids)]
cor = pbc_diff(coords, coms, box)
coords = coms + cor
else:
coords = frame.whole[atom_indices]
mask = np.bincount(res_ids)[1:] != 0
positions = np.array(
[
np.bincount(res_ids, weights=c * masses)[1:]
/ np.bincount(res_ids, weights=masses)[1:]
for c in coords.T
]
).T[mask]
return np.array(positions)
@map_coordinates
def pore_coordinates(coordinates, origin, sym_axis="z"):
def pore_coordinates(
frame: CoordinateFrame, origin: ArrayLike, sym_axis: str = "z"
) -> NDArray:
"""
Map coordinates of a pore simulation so the pore has cylindrical symmetry.
Args:
coordinates: Coordinates of the simulation
frame: Coordinates of the simulation
origin: Origin of the pore which will be the coordinates origin after mapping
sym_axis (opt.): Symmtery axis of the pore, may be a literal direction
'x', 'y' or 'z' or an array of shape (3,)
@@ -547,30 +454,34 @@ def pore_coordinates(coordinates, origin, sym_axis="z"):
rot_axis[["x", "y", "z"].index(sym_axis)] = 1
else:
rot_axis = sym_axis
return rotate_axis(coordinates - origin, rot_axis)
return rotate_axis(frame - origin, rot_axis)
@map_coordinates
def vectors(coordinates, atoms_a, atoms_b, normed=False):
def vectors(
frame: CoordinateFrame,
atom_indices_a: ArrayLike,
atom_indices_b: ArrayLike,
normed: bool = False,
) -> NDArray:
"""
Compute the vectors between the atoms of two subsets.
Args:
coordinates: The Coordinates object the atoms will be taken from
atoms_a: Mask or indices of the first atom subset
atoms_b: Mask or indices of the second atom subset
frame: The Coordinates object the atoms will be taken from
atom_indices_a: Mask or indices of the first atom subset
atom_indices_b: Mask or indices of the second atom subset
normed (opt.): If the vectors should be normed
box (opt.): If not None, the vectors are calcualte with PBC
The defintion of atoms_a/b can be any possible subript of a numpy array.
The definition of atoms_a/b can be any possible subript of a numpy array.
They can, for example, be given as a masking array of bool values with the
same length as the frames of the coordinates. Or they can be a list of
indices selecting the atoms of these indices from each frame.
same length as the frames of the coordinates.
Or there can be a list of indices selecting the atoms of these indices from each
frame.
It is possible to compute the mean of several atoms before calculating the vectors,
It is possible to compute the means of several atoms before calculating the vectors,
by using a two-dimensional list of indices. The following code computes the vectors
between atoms 0, 3, 6 and the mean coordinate of atoms 1, 4, 7 and 2, 5, 8::
between atoms 0, 3, 6 and the mean coordinate of atoms 1, 4, 7 and 2, 5, 8:
>>> inds_a = [0, 3, 6]
>>> inds_b = [[1, 4, 7], [2, 5, 8]]
@@ -581,14 +492,213 @@ def vectors(coordinates, atoms_a, atoms_b, normed=False):
coords[6] - (coords[7] + coords[8])/2,
])
"""
box = coordinates.box
coords_a = coordinates[atoms_a]
box = frame.box
coords_a = frame[atom_indices_a]
if len(coords_a.shape) > 2:
coords_a = coords_a.mean(axis=0)
coords_b = coordinates[atoms_b]
coords_b = frame[atom_indices_b]
if len(coords_b.shape) > 2:
coords_b = coords_b.mean(axis=0)
vectors = pbc_diff(coords_a, coords_b, box=box)
norm = np.linalg.norm(vectors, axis=-1).reshape(-1, 1) if normed else 1
vectors.reference = coords_a
return vectors / norm
vec = pbc_diff(coords_a, coords_b, box=box)
if normed:
vec /= np.linalg.norm(vec, axis=-1).reshape(-1, 1)
vec.reference = coords_a
return vec
@map_coordinates
def dipole_vector(
frame: CoordinateFrame, atom_indices: ArrayLike, normed: bool = None
) -> NDArray:
coords = frame.whole[atom_indices]
res_ids = frame.residue_ids[atom_indices]
charges = frame.charges[atom_indices]
mask = np.bincount(res_ids)[1:] != 0
dipoles = np.array(
[np.bincount(res_ids, weights=c * charges)[1:] for c in coords.T]
).T[mask]
dipoles = np.array(dipoles)
if normed:
dipoles /= np.linalg.norm(dipoles, axis=-1).reshape(-1, 1)
return dipoles
@map_coordinates
def sum_dipole_vector(
coordinates: CoordinateFrame,
atom_indices: ArrayLike,
normed: bool = True,
) -> NDArray:
coords = coordinates.whole[atom_indices]
charges = coordinates.charges[atom_indices]
dipole = np.array([c * charges for c in coords.T]).T
if normed:
dipole /= np.linalg.norm(dipole)
return dipole
@map_coordinates
def normal_vectors(
frame: CoordinateFrame,
atom_indices_a: ArrayLike,
atom_indices_b: ArrayLike,
atom_indices_c: ArrayLike,
normed: bool = True,
) -> NDArray:
coords_a = frame[atom_indices_a]
coords_b = frame[atom_indices_b]
coords_c = frame[atom_indices_c]
box = frame.box
vectors_a = pbc_diff(coords_a, coords_b, box=box)
vectors_b = pbc_diff(coords_a, coords_c, box=box)
vec = np.cross(vectors_a, vectors_b)
if normed:
vec /= np.linalg.norm(vec, axis=-1).reshape(-1, 1)
return vec
def displacements_without_drift(
start_frame: CoordinateFrame, end_frame: CoordinateFrame, trajectory: Coordinates
) -> np.array:
start_all = trajectory[start_frame.step]
frame_all = trajectory[end_frame.step]
displacements = (
start_frame
- end_frame
- (np.average(start_all, axis=0) - np.average(frame_all, axis=0))
)
return displacements
@map_coordinates
def cylindrical_coordinates(
frame: CoordinateFrame, origin: ArrayLike = None
) -> NDArray:
if origin is None:
origin = np.diag(frame.box) / 2
x = frame[:, 0] - origin[0]
y = frame[:, 1] - origin[1]
z = frame[:, 2]
radius = (x**2 + y**2) ** 0.5
phi = np.arctan2(y, x)
return np.array([radius, phi, z]).T
def layer_of_atoms(
atoms: CoordinateFrame,
thickness: float,
plane_normal: ArrayLike,
plane_offset: Optional[ArrayLike] = np.array([0, 0, 0]),
) -> np.array:
if plane_offset is None:
np.array([0, 0, 0])
atoms = atoms - plane_offset
distance = np.dot(atoms, plane_normal)
return np.abs(distance) <= thickness
def next_neighbors(
atoms: CoordinateFrame,
query_atoms: Optional[CoordinateFrame] = None,
number_of_neighbors: int = 1,
distance_upper_bound: float = np.inf,
distinct: bool = False,
**kwargs
) -> Tuple[List, List]:
"""
Find the N next neighbors of a set of atoms.
Args:
atoms:
The reference atoms and also the atoms which are queried if `query_atoms`
is net provided
query_atoms (opt.): If this is not None, these atoms will be queried
number_of_neighbors (int, opt.): Number of neighboring atoms to find
distance_upper_bound (float, opt.):
Upper bound of the distance between neighbors
distinct (bool, opt.):
If this is true, the atoms and query atoms are taken as distinct sets of
atoms
"""
dnn = 0
if query_atoms is None:
query_atoms = atoms
dnn = 1
elif not distinct:
dnn = 1
box = atoms.box
if np.all(np.diag(np.diag(box)) == box):
atoms = atoms % np.diag(box)
tree = KDTree(atoms, boxsize=np.diag(box))
distances, indices = tree.query(
query_atoms,
number_of_neighbors + dnn,
distance_upper_bound=distance_upper_bound,
)
distances = distances[:, dnn:]
indices = indices[:, dnn:]
distances_new = []
indices_new = []
for dist, ind in zip(distances, indices):
distances_new.append(dist[dist <= distance_upper_bound])
indices_new.append(ind[dist <= distance_upper_bound])
return distances_new, indices_new
else:
atoms_pbc, atoms_pbc_index = pbc_points(
query_atoms, box, thickness=distance_upper_bound + 0.1, index=True, **kwargs
)
tree = KDTree(atoms_pbc)
distances, indices = tree.query(
query_atoms,
number_of_neighbors + dnn,
distance_upper_bound=distance_upper_bound,
)
distances = distances[:, dnn:]
indices = indices[:, dnn:]
distances_new = []
indices_new = []
for dist, ind in zip(distances, indices):
distances_new.append(dist[dist <= distance_upper_bound])
indices_new.append(atoms_pbc_index[ind[dist <= distance_upper_bound]])
return distances_new, indices_new
def number_of_neighbors(
atoms: CoordinateFrame,
query_atoms: Optional[CoordinateFrame] = None,
r_max: float = 1,
distinct: bool = False,
**kwargs
) -> Tuple[List, List]:
"""
Find the N next neighbors of a set of atoms.
Args:
atoms:
The reference atoms and also the atoms which are queried if `query_atoms`
is net provided
query_atoms (opt.): If this is not None, these atoms will be queried
r_max (float, opt.):
Upper bound of the distance between neighbors
distinct (bool, opt.):
If this is true, the atoms and query atoms are taken as distinct sets of
atoms
"""
dnn = 0
if query_atoms is None:
query_atoms = atoms
dnn = 1
elif not distinct:
dnn = 1
box = atoms.box
if np.all(np.diag(np.diag(box)) == box):
atoms = atoms % np.diag(box)
tree = KDTree(atoms, boxsize=np.diag(box))
else:
atoms_pbc = pbc_points(query_atoms, box, thickness=r_max + 0.1, **kwargs)
tree = KDTree(atoms_pbc)
num_of_neighbors = tree.query_ball_point(query_atoms, r_max, return_length=True)
return num_of_neighbors - dnn

View File

@@ -1,111 +1,34 @@
from typing import Callable, Optional
import numpy as np
from numpy.typing import ArrayLike
from scipy.special import legendre
from itertools import chain
import dask.array as darray
from pathos.pools import ProcessPool
from functools import partial
from scipy.spatial import KDTree
from .autosave import autosave_data
from .utils import filon_fourier_transformation, coherent_sum, histogram
from .pbc import pbc_diff
from .utils import coherent_sum
from .pbc import pbc_diff, pbc_points
from .coordinates import Coordinates, CoordinateFrame, displacements_without_drift
def set_has_counter(func):
    """Flag *func* as also yielding a counter (used by multi_subensemble_correlation)."""
    setattr(func, "has_counter", True)
    return func
def log_indices(first: int, last: int, num: int = 100) -> np.ndarray:
    """
    Return up to `num` unique, roughly log-spaced indices from `first` to
    `last` (inclusive).

    Args:
        first: First index of the range.
        last: Last index of the range.
        num (opt.): Number of sample points before duplicate removal.

    Returns:
        Sorted array of unique integer indices.
    """
    # Sample log-spaced points in [1, last - first + 1], then shift so the
    # sequence starts at `first`; rounding duplicates are removed by unique().
    # (The stray duplicate, untyped signature line left by the merge has been
    # removed.)
    ls = np.logspace(0, np.log10(last - first + 1), num=num)
    return np.unique(np.int_(ls) - 1 + first)
def correlation(function, frames):
    """
    Lazily correlate every frame (the first one included) against the first
    frame of the iterable, yielding function(first, frame) for each frame.
    """
    frame_iter = iter(frames)
    reference = next(frame_iter)

    def _against_reference(frame):
        return function(reference, frame)

    return map(_against_reference, chain([reference], frame_iter))
def subensemble_correlation(selector_function, correlation_function=correlation):
    """
    Build a correlation driver that restricts every frame to the subset of
    atoms selected (once) from the starting frame.
    """

    def restricted(function, frames):
        frame_iter = iter(frames)
        first = next(frame_iter)
        # The selection is fixed on the first frame and then applied to all.
        chosen = selector_function(first)
        subset_frames = (frame[chosen] for frame in chain([first], frame_iter))
        return correlation_function(function, subset_frames)

    return restricted
def multi_subensemble_correlation(selector_function):
    """
    selector_function has to expect a frame and to
    return either valid indices (as with subensemble_correlation)
    or a multidimensional array whose entries are valid indices

    e.g. slice(10,100,2)

    e.g. [1,2,3,4,5]

    e.g. [[[0,1],[2],[3]],[[4],[5],[6]] -> shape: 2,3 with
    list of indices of varying length

    e.g. [slice(1653),slice(1653,None,3)]

    e.g. [np.ones(len_of_frames, bool)]

    in general using slices is the most efficient.
    if the selections are small subsets of a frame or when many subsets are empty
    using indices will be more efficient than using masks.
    """

    # Mark the returned callable so callers (e.g. shifted_correlation) know it
    # yields a (map, count) pair instead of a plain map.
    @set_has_counter
    def cmulti(function, frames):
        iterator = iter(frames)
        start_frame = next(iterator)
        selectors = np.asarray(selector_function(start_frame))
        sel_shape = selectors.shape
        if sel_shape[-1] == 0:
            # Empty selection: force an integer dtype so indexing stays valid.
            selectors = np.asarray(selectors, int)
        if selectors.dtype != object:
            # Homogeneous index arrays: the last axis holds the indices
            # themselves and is not part of the selection-grid shape.
            sel_shape = sel_shape[:-1]
        # One correlation value (of whatever shape `function` returns) per
        # selection in the grid.
        f_values = np.zeros(
            sel_shape + function(start_frame, start_frame).shape,
        )
        count = np.zeros(sel_shape, dtype=int)
        is_first_frame_loop = True

        def cc(act_frame):
            # Correlate the current frame against the cached start frame for
            # every selection in the (possibly multidimensional) grid.
            nonlocal is_first_frame_loop
            for index in np.ndindex(sel_shape):
                sel = selectors[index]
                sf_sel = start_frame[sel]
                if is_first_frame_loop:
                    # Record the subset sizes once, on the first evaluation.
                    count[index] = len(sf_sel)
                f_values[index] = (
                    function(sf_sel, act_frame[sel]) if count[index] != 0 else 0
                )
            is_first_frame_loop = False
            # Copy so later frames do not overwrite results already yielded.
            return np.asarray(f_values.copy())

        return map(cc, chain([start_frame], iterator)), count

    return cmulti
@autosave_data(2)
def shifted_correlation(
function,
frames,
selector=None,
multi_selector=None,
segments=10,
skip=0.1,
window=0.5,
average=True,
points=100,
nodes=1,
):
function: Callable,
frames: Coordinates,
selector: Optional[Callable] = None,
segments: int = 10,
skip: float = 0.1,
window: float = 0.5,
average: bool = True,
points: int = 100,
) -> (np.ndarray, np.ndarray):
"""
Calculate the time series for a correlation function.
@@ -113,18 +36,12 @@ def shifted_correlation(
a logarithmic distribution.
Args:
function: The function that should be correlated
frames: The coordinates of the simulation data
function: The function that should be correlated
frames: The coordinates of the simulation data
selector (opt.):
A function that returns the indices depending on
the starting frame for which particles the
correlation should be calculated.
Can not be used with multi_selector.
multi_selector (opt.):
A function that returns multiple lists of indices depending on
the starting frame for which particles the
correlation should be calculated.
Can not be used with selector.
segments (int, opt.):
The number of segments the time window will be
shifted
@@ -141,9 +58,6 @@ def shifted_correlation(
points (int, opt.):
The number of timeshifts for which the correlation
should be calculated
nodes (int, opt.):
Number of nodes used for parallelization
Returns:
tuple:
A list of length N that contains the timeshifts of the frames at which
@@ -151,12 +65,18 @@ def shifted_correlation(
that holds the (non-averaged) correlation data
Example:
Calculating the mean square displacement of a coordinates object named ``coords``:
Calculating the mean square displacement of a coordinate object
named ``coords``:
>>> time, data = shifted_correlation(msd, coords)
"""
def get_correlation(frames, start_frame, index, shifted_idx):
def get_correlation(
frames: CoordinateFrame,
start_frame: CoordinateFrame,
index: np.ndarray,
shifted_idx: np.ndarray,
) -> np.ndarray:
if len(index) == 0:
correlation = np.zeros(len(shifted_idx))
else:
@@ -166,29 +86,50 @@ def shifted_correlation(
)
return correlation
def apply_selector(start_frame, frames, idx, selector=None, multi_selector=None):
def apply_selector(
start_frame: CoordinateFrame,
frames: CoordinateFrame,
idx: np.ndarray,
selector: Optional[Callable] = None,
):
shifted_idx = idx + start_frame
if selector is None and multi_selector is None:
if selector is None:
index = np.arange(len(frames[start_frame]))
return get_correlation(frames, start_frame, index, shifted_idx)
elif selector is not None and multi_selector is not None:
raise ValueError(
"selector and multi_selector can not be used at the same time"
)
elif selector is not None:
else:
index = selector(frames[start_frame])
return get_correlation(frames, start_frame, index, shifted_idx)
if len(index) == 0:
return np.zeros(len(shifted_idx))
elif multi_selector is not None:
indices = multi_selector(frames[start_frame])
correlation = []
for index in indices:
correlation.append(
get_correlation(frames, start_frame, index, shifted_idx)
)
return correlation
elif (
isinstance(index[0], int)
or isinstance(index[0], bool)
or isinstance(index[0], np.integer)
or isinstance(index[0], np.bool_)
):
return get_correlation(frames, start_frame, index, shifted_idx)
else:
correlations = []
for ind in index:
if len(ind) == 0:
correlations.append(np.zeros(len(shifted_idx)))
elif (
isinstance(ind[0], int)
or isinstance(ind[0], bool)
or isinstance(ind[0], np.integer)
or isinstance(ind[0], np.bool_)
):
correlations.append(
get_correlation(frames, start_frame, ind, shifted_idx)
)
else:
raise ValueError(
"selector has more than two dimensions or does not "
"contain int or bool types"
)
return correlations
if 1 - skip < window:
window = 1 - skip
@@ -208,40 +149,12 @@ def shifted_correlation(
idx = np.unique(np.int_(ls) - 1)
t = np.array([frames[i].time for i in idx]) - frames[0].time
if nodes == 1:
result = np.array(
[
apply_selector(
start_frame,
frames=frames,
idx=idx,
selector=selector,
multi_selector=multi_selector,
)
for start_frame in start_frames
]
)
else:
pool = ProcessPool(nodes=nodes)
# Use try finally instead of a context manager to ensure the pool is
# restarted in case of working in a jupyter-notebook,
# otherwise the kernel has to be restarted.
try:
result = np.array(
pool.map(
partial(
apply_selector,
frames=frames,
idx=idx,
selector=selector,
multi_selector=multi_selector,
),
start_frames,
)
)
finally:
pool.terminate()
pool.restart()
result = np.array(
[
apply_selector(start_frame, frames=frames, idx=idx, selector=selector)
for start_frame in start_frames
]
)
if average:
clean_result = []
@@ -255,58 +168,121 @@ def shifted_correlation(
return t, result
def msd(start, frame):
def msd(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
trajectory: Coordinates = None,
axis: str = "all",
) -> float:
"""
Mean square displacement
"""
vec = start - frame
return (vec**2).sum(axis=1).mean()
if trajectory is None:
displacements = start_frame - end_frame
else:
displacements = displacements_without_drift(start_frame, end_frame, trajectory)
if axis == "all":
return (displacements**2).sum(axis=1).mean()
elif axis == "x":
return (displacements[:, 0] ** 2).mean()
elif axis == "y":
return (displacements[:, 1] ** 2).mean()
elif axis == "z":
return (displacements[:, 2] ** 2).mean()
else:
raise ValueError('Parameter axis has to be ether "all", "x", "y", or "z"!')
def isf(start, frame, q, box=None):
def isf(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
q: float = 22.7,
trajectory: Coordinates = None,
axis: str = "all",
) -> float:
"""
Incoherent intermediate scattering function. To specify q, use
water_isf = functools.partial(isf, q=22.77) # q has the value 22.77 nm^-1
:param q: length of scattering vector
"""
vec = start - frame
distance = (vec**2).sum(axis=1) ** 0.5
return np.sinc(distance * q / np.pi).mean()
if trajectory is None:
displacements = start_frame - end_frame
else:
displacements = displacements_without_drift(start_frame, end_frame, trajectory)
if axis == "all":
distance = (displacements**2).sum(axis=1) ** 0.5
return np.sinc(distance * q / np.pi).mean()
elif axis == "x":
distance = np.abs(displacements[:, 0])
return np.mean(np.cos(np.abs(q * distance)))
elif axis == "y":
distance = np.abs(displacements[:, 1])
return np.mean(np.cos(np.abs(q * distance)))
elif axis == "z":
distance = np.abs(displacements[:, 2])
return np.mean(np.cos(np.abs(q * distance)))
else:
raise ValueError('Parameter axis has to be ether "all", "x", "y", or "z"!')
def rotational_autocorrelation(onset, frame, order=2):
def rotational_autocorrelation(
start_frame: CoordinateFrame, end_frame: CoordinateFrame, order: int = 2
) -> float:
"""
Compute the rotational autocorrelation of the legendre polynomial for the
given vectors.
Args:
onset, frame: CoordinateFrames of vectors
start_frame, end_frame: CoordinateFrames of vectors
order (opt.): Order of the legendre polynomial.
Returns:
Scalar value of the correlation function.
"""
scalar_prod = (onset * frame).sum(axis=-1)
scalar_prod = (start_frame * end_frame).sum(axis=-1)
poly = legendre(order)
return poly(scalar_prod).mean()
def van_hove_self(start, end, bins):
def van_hove_self(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
bins: ArrayLike,
trajectory: Coordinates = None,
axis: str = "all",
) -> np.ndarray:
r"""
Compute the self part of the Van Hove autocorrelation function.
Compute the self-part of the Van Hove autocorrelation function.
..math::
G(r, t) = \sum_i \delta(|\vec r_i(0) - \vec r_i(t)| - r)
"""
vec = start - end
delta_r = ((vec) ** 2).sum(axis=-1) ** 0.5
return 1 / len(start) * histogram(delta_r, bins)[0]
if trajectory is None:
vectors = start_frame - end_frame
else:
vectors = displacements_without_drift(start_frame, end_frame, trajectory)
if axis == "all":
delta_r = (vectors**2).sum(axis=1) ** 0.5
elif axis == "x":
delta_r = np.abs(vectors[:, 0])
elif axis == "y":
delta_r = np.abs(vectors[:, 1])
elif axis == "z":
delta_r = np.abs(vectors[:, 2])
else:
raise ValueError('Parameter axis has to be ether "all", "x", "y", or "z"!')
hist = np.histogram(delta_r, bins, range=(bins[0], bins[-1]))[0]
hist = hist / (bins[1:] - bins[:-1])
return hist / len(start_frame)
def van_hove_distinct(
onset, frame, bins, box=None, use_dask=True, comp=False, bincount=True
):
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
bins: ArrayLike,
box: ArrayLike = None,
use_dask: bool = True,
comp: bool = False,
) -> np.ndarray:
r"""
Compute the distinct part of the Van Hove autocorrelation function.
@@ -314,17 +290,19 @@ def van_hove_distinct(
G(r, t) = \sum_{i, j} \delta(|\vec r_i(0) - \vec r_j(t)| - r)
"""
if box is None:
box = onset.box.diagonal()
box = start_frame.box.diagonal()
dimension = len(box)
N = len(onset)
N = len(start_frame)
if use_dask:
onset = darray.from_array(onset, chunks=(500, dimension)).reshape(
start_frame = darray.from_array(start_frame, chunks=(500, dimension)).reshape(
1, N, dimension
)
frame = darray.from_array(frame, chunks=(500, dimension)).reshape(
end_frame = darray.from_array(end_frame, chunks=(500, dimension)).reshape(
N, 1, dimension
)
dist = ((pbc_diff(onset, frame, box) ** 2).sum(axis=-1) ** 0.5).ravel()
dist = (
(pbc_diff(start_frame, end_frame, box) ** 2).sum(axis=-1) ** 0.5
).ravel()
if np.diff(bins).std() < 1e6:
dx = bins[0] - bins[1]
hist = darray.bincount((dist // dx).astype(int), minlength=(len(bins) - 1))
@@ -337,32 +315,40 @@ def van_hove_distinct(
minlength = len(bins) - 1
def f(x):
d = (pbc_diff(x, frame, box) ** 2).sum(axis=-1) ** 0.5
d = (pbc_diff(x, end_frame, box) ** 2).sum(axis=-1) ** 0.5
return np.bincount((d // dx).astype(int), minlength=minlength)[
:minlength
]
hist = sum(f(x) for x in onset)
hist = sum(f(x) for x in start_frame)
else:
dist = (
pbc_diff(onset.reshape(1, -1, 3), frame.reshape(-1, 1, 3), box) ** 2
pbc_diff(
start_frame.reshape(1, -1, 3), end_frame.reshape(-1, 1, 3), box
)
** 2
).sum(axis=-1) ** 0.5
hist = histogram(dist, bins=bins)[0]
hist = np.histogram(dist, bins=bins)[0]
return hist / N
def overlap(onset, frame, crds_tree, radius):
def overlap(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
radius: float = 0.1,
mode: str = "self",
) -> float:
"""
Compute the overlap with a reference configuration defined in a CoordinatesTree.
Args:
onset: Initial frame, this is only used to get the frame index
frame: The current configuration
crds_tree: A CoordinatesTree of the reference configurations
start_frame: Initial frame, this is only used to get the frame index
end_frame: The current configuration
radius: The cutoff radius for the overlap
mode: Select between "indifferent", "self" or "distinct" part of the overlap
This function is intended to be used with :func:`shifted_correlation`.
As usual the first two arguments are used internally and the remaining ones
As usual, the first two arguments are used internally, and the remaining ones
should be defined with :func:`functools.partial`.
If the overlap of a subset of the system should be calculated, this has to be
@@ -374,31 +360,30 @@ def overlap(onset, frame, crds_tree, radius):
... traj
... )
"""
tree = crds_tree[onset.step]
return (tree.query(frame)[0] <= radius).sum() / tree.n
def susceptibility(time, correlation, **kwargs):
"""
Calculate the susceptibility of a correlation function.
Args:
time: Timesteps of the correlation data
correlation: Value of the correlation function
**kwargs (opt.):
Additional keyword arguments will be passed to :func:`filon_fourier_transformation`.
"""
frequencies, fourier = filon_fourier_transformation(
time, correlation, imag=False, **kwargs
start_PBC, index_PBC = pbc_points(
start_frame, start_frame.box, index=True, thickness=2 * radius
)
return frequencies, frequencies * fourier
start_tree = KDTree(start_PBC)
dist, index_dist = start_tree.query(end_frame, 1, distance_upper_bound=radius)
if mode == "indifferent":
return np.sum(dist <= radius) / len(start_frame)
index_dist = index_PBC[index_dist]
index = np.arange(len(start_frame))
index = index[dist <= radius]
index_dist = index_dist[dist <= radius]
if mode == "self":
return np.sum(index == index_dist) / len(start_frame)
elif mode == "distinct":
return np.sum(index != index_dist) / len(start_frame)
def coherent_scattering_function(onset, frame, q):
def coherent_scattering_function(
start_frame: CoordinateFrame, end_frame: CoordinateFrame, q: float
) -> np.ndarray:
"""
Calculate the coherent scattering function.
"""
box = onset.box.diagonal()
box = start_frame.box.diagonal()
dimension = len(box)
def scfunc(x, y):
@@ -416,14 +401,38 @@ def coherent_scattering_function(onset, frame, q):
else:
return np.sin(x) / x
return coherent_sum(scfunc, onset.pbc, frame.pbc) / len(onset)
return coherent_sum(scfunc, start_frame.pbc, end_frame.pbc) / len(start_frame)
def non_gaussian(onset, frame):
def non_gaussian_parameter(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
trajectory: Coordinates = None,
axis: str = "all",
) -> float:
"""
Calculate the Non-Gaussian parameter :
Calculate the non-Gaussian parameter.
..math:
\alpha_2 (t) = \frac{3}{5}\frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1
\alpha_2 (t) =
\frac{3}{5}\frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1
"""
r_2 = ((frame - onset) ** 2).sum(axis=-1)
return 3 / 5 * (r_2**2).mean() / r_2.mean() ** 2 - 1
if trajectory is None:
vectors = start_frame - end_frame
else:
vectors = displacements_without_drift(start_frame, end_frame, trajectory)
if axis == "all":
r = (vectors**2).sum(axis=1)
dimensions = 3
elif axis == "x":
r = vectors[:, 0] ** 2
dimensions = 1
elif axis == "y":
r = vectors[:, 1] ** 2
dimensions = 1
elif axis == "z":
r = vectors[:, 2] ** 2
dimensions = 1
else:
raise ValueError('Parameter axis has to be ether "all", "x", "y", or "z"!')
return (np.mean(r**2) / ((1 + 2 / dimensions) * (np.mean(r) ** 2))) - 1

View File

@@ -1,93 +1,101 @@
from typing import Callable, Optional, Union, Tuple, List
import numpy as np
from .coordinates import rotate_axis, polar_coordinates, spherical_coordinates
from .atoms import next_neighbors
from .autosave import autosave_data
from .utils import runningmean
from .pbc import pbc_diff, pbc_points
from .logging import logger
from numpy.typing import ArrayLike, NDArray
from scipy import spatial
from scipy.spatial import KDTree
from scipy.sparse.csgraph import connected_components
from .coordinates import (
rotate_axis,
polar_coordinates,
Coordinates,
CoordinateFrame,
next_neighbors,
number_of_neighbors,
)
from .autosave import autosave_data
from .pbc import pbc_diff, pbc_points
@autosave_data(nargs=2, kwargs_keys=("coordinates_b",), version="time_average-1")
def time_average(function, coordinates, coordinates_b=None, pool=None):
@autosave_data(nargs=2, kwargs_keys=("coordinates_b",))
def time_average(
function: Callable,
coordinates: Coordinates,
coordinates_b: Optional[Coordinates] = None,
skip: float = 0.1,
segments: int = 100,
) -> NDArray:
"""
Compute the time average of a function.
Args:
function:
The function that will be averaged, it has to accept exactly one argument
which is the current atom set
which is the current atom set (or two if coordinates_b is provided)
coordinates: The coordinates object of the simulation
pool (multiprocessing.Pool, opt.):
A multiprocessing pool which will be used for cocurrent calculation of the
averaged function
coordinates_b: Additional coordinates object of the simulation
skip:
segments:
"""
if pool is not None:
_map = pool.imap
frame_indices = np.unique(
np.int_(
np.linspace(len(coordinates) * skip, len(coordinates) - 1, num=segments)
)
)
if coordinates_b is None:
result = [function(coordinates[frame_index]) for frame_index in frame_indices]
else:
_map = map
result = [
function(coordinates[frame_index], coordinates_b[frame_index])
for frame_index in frame_indices
]
return np.mean(result, axis=0)
number_of_averages = 0
result = 0
if coordinates_b is not None:
if coordinates._slice != coordinates_b._slice:
logger.warning("Different slice for coordinates and coordinates_b.")
coordinate_iter = (iter(coordinates), iter(coordinates_b))
@autosave_data(nargs=2, kwargs_keys=("coordinates_b",))
def time_distribution(
function: Callable,
coordinates: Coordinates,
coordinates_b: Optional[Coordinates] = None,
skip: float = 0,
segments: int = 100,
) -> Tuple[NDArray, List]:
"""
Compute the time distribution of a function.
Args:
function:
The function that will be averaged, it has to accept exactly one argument
which is the current atom set (or two if coordinates_b is provided)
coordinates: The coordinates object of the simulation
coordinates_b: Additional coordinates object of the simulation
skip:
segments:
"""
frame_indices = np.unique(
np.int_(
np.linspace(len(coordinates) * skip, len(coordinates) - 1, num=segments)
)
)
times = np.array([coordinates[frame_index].time for frame_index in frame_indices])
if coordinates_b is None:
result = [function(coordinates[frame_index]) for frame_index in frame_indices]
else:
coordinate_iter = (iter(coordinates),)
evaluated = _map(function, *coordinate_iter)
for ev in evaluated:
number_of_averages += 1
result += ev
if number_of_averages % 100 == 0:
logger.debug("time_average: %d", number_of_averages)
return result / number_of_averages
def time_histogram(function, coordinates, bins, hist_range, pool=None):
coordinate_iter = iter(coordinates)
if pool is not None:
_map = pool.imap
else:
_map = map
evaluated = _map(function, coordinate_iter)
results = []
hist_results = []
for num, ev in enumerate(evaluated):
results.append(ev)
if num % 100 == 0 and num > 0:
print(num)
r = np.array(results).T
for i, row in enumerate(r):
histo, _ = np.histogram(row, bins=bins, range=hist_range)
if len(hist_results) <= i:
hist_results.append(histo)
else:
hist_results[i] += histo
results = []
return hist_results
result = [
function(coordinates[frame_index], coordinates_b[frame_index])
for frame_index in frame_indices
]
return times, result
def rdf(
atoms_a,
atoms_b=None,
bins=None,
box=None,
kind=None,
chunksize=50000,
returnx=False,
atoms_a: CoordinateFrame,
atoms_b: Optional[CoordinateFrame] = None,
bins: Optional[ArrayLike] = None,
remove_intra: bool = False,
**kwargs
):
) -> NDArray:
r"""
Compute the radial pair distribution of one or two sets of atoms.
@@ -95,189 +103,76 @@ def rdf(
g_{AB}(r) = \frac{1}{\langle \rho_B\rangle N_A}\sum\limits_{i\in A}^{N_A}
\sum\limits_{j\in B}^{N_B}\frac{\delta(r_{ij} -r)}{4\pi r^2}
For use with :func:`time_average`, define bins through the use of :func:`~functools.partial`,
the atom sets are passed to :func:`time_average`, if a second set of atoms should be used
specify it as ``coordinates_b`` and it will be passed to this function.
For use with :func:`time_average`, define bins through the use of
:func:`~functools.partial`, the atom sets are passed to :func:`time_average`, if a
second set of atoms should be used specify it as ``coordinates_b`` and it will be
passed to this function.
Args:
atoms_a: First set of atoms, used internally
atoms_b (opt.): Second set of atoms, used internally
atoms_b (opt.): Second set of atoms, used internally
bins: Bins of the radial distribution function
box (opt.): Simulations box, if not specified this is taken from ``atoms_a.box``
kind (opt.): Can be 'inter', 'intra' or None (default).
chunksize (opt.):
For large systems (N > 1000) the distaces have to be computed in chunks so the arrays
fit into memory, this parameter controlls the size of these chunks. It should be
as large as possible, depending on the available memory.
returnx (opt.): If True the x ordinate of the histogram is returned.
remove_intra: removes contributions from intra molecular pairs
"""
assert bins is not None, "Bins of the pair distribution have to be defined."
assert kind in [
"intra",
"inter",
None,
], "Argument kind must be one of the following: intra, inter, None."
if box is None:
box = atoms_a.box.diagonal()
distinct = True
if atoms_b is None:
atoms_b = atoms_a
nr_of_atoms = len(atoms_a)
indices = np.triu_indices(nr_of_atoms, k=1)
else:
nr_a, dim = atoms_a.shape
nr_b, dim = atoms_b.shape
indices = np.array([(i, j) for i in range(nr_a) for j in range(nr_b)]).T
distinct = False
elif np.array_equal(atoms_a, atoms_b):
distinct = False
if bins is None:
bins = np.arange(0, 1, 0.01)
# compute the histogram in chunks for large systems
hist = 0
nr_of_samples = 0
for chunk in range(0, len(indices[0]), chunksize):
sl = slice(chunk, chunk + chunksize)
diff = pbc_diff(atoms_a[indices[0][sl]], atoms_b[indices[1][sl]], box)
dist = (diff**2).sum(axis=1) ** 0.5
if kind == "intra":
mask = (
atoms_a.residue_ids[indices[0][sl]]
== atoms_b.residue_ids[indices[1][sl]]
particles_in_volume = int(
np.max(number_of_neighbors(atoms_a, query_atoms=atoms_b, r_max=bins[-1])) * 1.1
)
distances, indices = next_neighbors(
atoms_a,
atoms_b,
number_of_neighbors=particles_in_volume,
distance_upper_bound=bins[-1],
distinct=distinct,
**kwargs
)
if remove_intra:
new_distances = []
for entry in list(zip(atoms_a.residue_ids, distances, indices)):
mask = entry[1] < np.inf
new_distances.append(
entry[1][mask][atoms_b.residue_ids[entry[2][mask]] != entry[0]]
)
dist = dist[mask]
elif kind == "inter":
mask = (
atoms_a.residue_ids[indices[0][sl]]
!= atoms_b.residue_ids[indices[1][sl]]
)
dist = dist[mask]
nr_of_samples += len(dist)
hist += np.histogram(dist, bins)[0]
volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
density = nr_of_samples / np.prod(box)
res = hist / volume / density
if returnx:
return np.vstack((runningmean(bins, 2), res))
distances = np.concatenate(new_distances)
else:
return res
distances = [d for dist in distances for d in dist]
hist, bins = np.histogram(distances, bins=bins, range=(0, bins[-1]), density=False)
hist = hist / len(atoms_a)
hist = hist / (4 / 3 * np.pi * bins[1:] ** 3 - 4 / 3 * np.pi * bins[:-1] ** 3)
n = len(atoms_b) / np.prod(np.diag(atoms_b.box))
hist = hist / n
return hist
def pbc_tree_rdf(
    atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs
):
    """
    Compute a radial pair distribution function with a KD-tree built over
    periodic images of ``atoms_b``.

    Args:
        atoms_a: Atoms from which distances are measured.
        atoms_b (opt.): Atoms to which distances are measured.
        bins: Bin edges of the radial distribution function.
        box (opt.): Simulation box; taken from ``atoms_a.box`` if omitted.
        exclude (opt.): Number of pairs per atom excluded from the
            normalization (e.g. the self pair when both sets are identical).
        returnx (opt.): If True, also return the bin centers.
        **kwargs: Currently unused; accepted for interface compatibility.
    """
    if box is None:
        box = atoms_a.box.diagonal()
    # Replicate atoms_b beyond the box boundaries so the non-periodic KD-tree
    # also finds neighbors across periodic boundaries.
    all_coords = pbc_points(atoms_b, box, thickness=np.amax(bins) + 0.1)
    to_tree = spatial.cKDTree(all_coords)
    dist = to_tree.query(
        pbc_diff(atoms_a, box=box),
        k=len(atoms_b),
        distance_upper_bound=np.amax(bins) + 0.1,
    )[0].flatten()
    # query() pads missing neighbors with inf; drop those entries.
    dist = dist[dist < np.inf]
    hist = np.histogram(dist, bins)[0]
    # Normalize by the spherical-shell volumes and the ideal-gas pair density.
    volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
    res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b) - exclude)
    if returnx:
        return np.vstack((runningmean(bins, 2), res))
    else:
        return res
def pbc_spm_rdf(
    atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs
):
    """
    Compute a radial pair distribution function via a sparse distance matrix
    between ``atoms_a`` and the periodic images of ``atoms_b``.

    Parameters and normalization are the same as in :func:`pbc_tree_rdf`;
    large systems are processed in chunks of ``atoms_a`` to bound memory use.
    """
    if box is None:
        box = atoms_a.box
    # Replicate atoms_b beyond the box boundaries so the non-periodic KD-tree
    # also finds neighbors across periodic boundaries.
    all_coords = pbc_points(atoms_b, box, thickness=np.amax(bins) + 0.1)
    to_tree = spatial.cKDTree(all_coords)
    # Heuristic memory estimate (GiB-scaled); below ~2 the full sparse
    # distance matrix fits comfortably and one pass suffices.
    if all_coords.nbytes / 1024**3 * len(atoms_a) < 2:
        from_tree = spatial.cKDTree(pbc_diff(atoms_a, box=box))
        dist = to_tree.sparse_distance_matrix(
            from_tree, max_distance=np.amax(bins) + 0.1, output_type="ndarray"
        )
        # Entries are (i, j, distance); keep only the distance column.
        dist = np.asarray(dist.tolist())[:, 2]
        hist = np.histogram(dist, bins)[0]
    else:
        # Chunk atoms_a so each sparse matrix stays within the same budget.
        chunksize = int(
            2 * len(atoms_a) / (all_coords.nbytes / 1024**3 * len(atoms_a))
        )
        hist = 0
        for chunk in range(0, len(atoms_a), chunksize):
            sl = slice(chunk, chunk + chunksize)
            from_tree = spatial.cKDTree(pbc_diff(atoms_a[sl], box=box))
            dist = to_tree.sparse_distance_matrix(
                from_tree, max_distance=np.amax(bins) + 0.1, output_type="ndarray"
            )
            dist = np.asarray(dist.tolist())[:, 2]
            hist += np.histogram(dist, bins)[0]
    # Normalize by the spherical-shell volumes and the ideal-gas pair density.
    volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
    res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b) - exclude)
    if returnx:
        return np.vstack((runningmean(bins, 2), res))
    else:
        return res
@autosave_data(nargs=2, kwargs_keys=("to_coords", "times"))
def fast_averaged_rdf(from_coords, bins, to_coords=None, times=10, exclude=0, **kwargs):
    """
    Compute a time-averaged radial distribution function, choosing the fastest
    of the available rdf implementations by timing each on the first frame.

    Args:
        from_coords: Coordinates object of the first atom set.
        bins: Bin edges of the radial distribution function.
        to_coords (opt.): Coordinates object of the second atom set; defaults
            to ``from_coords`` (with the self pair excluded).
        times (opt.): Number of frames averaged over.
        exclude (opt.): Pairs per atom excluded from the normalization.
        **kwargs: Currently unused; accepted for interface compatibility.
    """
    if to_coords is None:
        to_coords = from_coords
        exclude = 1
    # first find timings for the different rdf functions
    import time

    # only consider sparse matrix for this condition
    if (len(from_coords[0]) * len(to_coords[0]) <= 3000 * 2000) & (
        len(from_coords[0]) / len(to_coords[0]) > 5
    ):
        funcs = [rdf, pbc_tree_rdf, pbc_spm_rdf]
    else:
        funcs = [rdf, pbc_tree_rdf]
    timings = []
    for f in funcs:
        start = time.time()
        f(
            from_coords[0],
            atoms_b=to_coords[0],
            bins=bins,
            box=np.diag(from_coords[0].box),
        )
        end = time.time()
        timings.append(end - start)
    timings = np.array(timings)
    timings[0] = (
        2 * timings[0]
    )  # statistics for the other functions is twice as good per frame
    logger.debug("rdf function timings: " + str(timings))
    # Pick the implementation with the smallest (scaled) runtime.
    rdffunc = funcs[np.argmin(timings)]
    logger.debug("rdf function used: " + str(rdffunc))
    if rdffunc == rdf:
        times = times * 2  # duplicate times for same statistics
    # Evenly spaced frame indices over the whole trajectory, `times` of them.
    frames = np.array(range(0, len(from_coords), int(len(from_coords) / times)))[:times]
    out = np.zeros(len(bins) - 1)
    for j, i in enumerate(frames):
        logger.debug("multi_radial_pair_distribution: %d/%d", j, len(frames))
        out += rdffunc(
            from_coords[i],
            to_coords[i],
            bins,
            box=np.diag(from_coords[i].box),
            exclude=exclude,
        )
    return out / len(frames)
def distance_distribution(
    atoms: "CoordinateFrame", bins: Union[int, ArrayLike]
) -> NDArray:
    """
    Histogram the distances between consecutive atoms in `atoms`.

    Args:
        atoms: Atom positions; consecutive rows are treated as connected.
        bins: Bin specification passed to `np.histogram`.

    Returns:
        Histogram of the connection lengths.
    """
    # The stray duplicate, untyped signature line left by the merge has been
    # removed; behavior is unchanged.
    connection_vectors = atoms[:-1, :] - atoms[1:, :]
    connection_lengths = (connection_vectors**2).sum(axis=1) ** 0.5
    return np.histogram(connection_lengths, bins)[0]
def tetrahedral_order(atoms, reference_atoms=None):
def tetrahedral_order(
atoms: CoordinateFrame, reference_atoms: CoordinateFrame = None
) -> NDArray:
if reference_atoms is None:
reference_atoms = atoms
indices = next_neighbors(reference_atoms, query_atoms=atoms, number_of_neighbors=4)
indices = next_neighbors(
reference_atoms,
query_atoms=atoms,
number_of_neighbors=4,
)[1]
neighbors = reference_atoms[indices]
neighbors_1, neighbors_2, neighbors_3, neighbors_4 = (
neighbors[:, 0, :],
@@ -312,15 +207,22 @@ def tetrahedral_order(atoms, reference_atoms=None):
return q
def tetrahedral_order_distribution(
    atoms: "CoordinateFrame",
    reference_atoms: Optional["CoordinateFrame"] = None,
    bins: Optional[ArrayLike] = None,
) -> NDArray:
    """
    Histogram the tetrahedral order parameter of `atoms`.

    Args:
        atoms: Atoms for which the order parameter is computed.
        reference_atoms (opt.): Neighbor set used by `tetrahedral_order`;
            defaults to `atoms` inside that function.
        bins: Bin edges of the distribution (mandatory).

    Returns:
        Histogram of the order parameter values.
    """
    # The stray duplicate, untyped signature line left by the merge has been
    # removed; behavior is unchanged.
    assert bins is not None, "Bin edges of the distribution have to be specified."
    Q = tetrahedral_order(atoms, reference_atoms=reference_atoms)
    return np.histogram(Q, bins=bins)[0]
def radial_density(
atoms, bins, symmetry_axis=(0, 0, 1), origin=(0, 0, 0), height=1, returnx=False
):
atoms: CoordinateFrame,
bins: Optional[ArrayLike] = None,
symmetry_axis: ArrayLike = (0, 0, 1),
origin: Optional[ArrayLike] = None,
) -> NDArray:
"""
Calculate the radial density distribution.
@@ -329,7 +231,7 @@ def radial_density(
Args:
atoms:
Set of coordinates.
bins:
bins (opt.):
Bin specification that is passed to numpy.histogram. This needs to be
a list of bin edges if the function is used within time_average.
symmetry_axis (opt.):
@@ -337,30 +239,27 @@ def radial_density(
default is z-axis.
origin (opt.):
Origin of the rotational symmetry, e.g. center of the pore.
height (opt.):
Height of the pore, necessary for correct normalization of the density.
returnx (opt.):
If True, the x ordinate of the distribution is returned.
"""
if origin is None:
origin = np.diag(atoms.box) / 2
if bins is None:
bins = np.arange(0, np.min(np.diag(atoms.box) / 2), 0.01)
length = np.diag(atoms.box) * symmetry_axis
cartesian = rotate_axis(atoms - origin, symmetry_axis)
radius, _ = polar_coordinates(cartesian[:, 0], cartesian[:, 1])
hist = np.histogram(radius, bins=bins)[0]
volume = np.pi * (bins[1:] ** 2 - bins[:-1] ** 2) * height
res = hist / volume
if returnx:
return np.vstack((runningmean(bins, 2), res))
else:
return res
volume = np.pi * (bins[1:] ** 2 - bins[:-1] ** 2) * length
return hist / volume
def shell_density(
atoms,
shell_radius,
bins,
shell_thickness=0.5,
symmetry_axis=(0, 0, 1),
origin=(0, 0, 0),
):
atoms: CoordinateFrame,
shell_radius: float,
bins: ArrayLike,
shell_thickness: float = 0.5,
symmetry_axis: ArrayLike = (0, 0, 1),
origin: Optional[ArrayLike] = None,
) -> NDArray:
"""
Compute the density distribution on a cylindrical shell.
@@ -377,6 +276,8 @@ def shell_density(
Returns:
Two-dimensional density distribution of the atoms in the defined shell.
"""
if origin is None:
origin = np.diag(atoms.box) / 2
cartesian = rotate_axis(atoms - origin, symmetry_axis)
radius, theta = polar_coordinates(cartesian[:, 0], cartesian[:, 1])
shell_indices = (shell_radius <= radius) & (
@@ -387,40 +288,13 @@ def shell_density(
return hist
def spatial_density(atoms, bins, weights=None):
    """
    Histogram atom positions onto a spatial grid, optionally weighted.
    """
    hist, _edges = np.histogramdd(atoms, bins=bins, weights=weights)
    return hist
def mixing_ratio_distribution(
    atoms_a,
    atoms_b,
    bins_ratio,
    bins_density,
    weights_a=None,
    weights_b=None,
    weights_ratio=None,
):
    """
    Compute the distribution of the local mixing ratio of two sets of atoms.

    Both sets are binned onto the same spatial grid; for every cell the ratio
    rho_a / (rho_a + rho_b) is computed and histogrammed over `bins_ratio`.

    Args:
        atoms_a: Positions of the first atom set.
        atoms_b: Positions of the second atom set.
        bins_ratio: Bin edges for the mixing-ratio histogram.
        bins_density: Spatial bin specification passed to `np.histogramdd`.
        weights_a (opt.): Weights for the first set.
        weights_b (opt.): Weights for the second set.
        weights_ratio (opt.): Weights for the ratio histogram.

    Returns:
        Histogram of the mixing ratio over the populated cells.
    """
    # density_a was left mangled by the merge ("= time_average"); it must
    # mirror the density_b computation below.
    density_a, _ = np.histogramdd(atoms_a, bins=bins_density, weights=weights_a)
    density_b, _ = np.histogramdd(atoms_b, bins=bins_density, weights=weights_b)
    mixing_ratio = density_a / (density_a + density_b)
    # Only cells populated by both species contribute to the distribution.
    good_inds = (density_a != 0) & (density_b != 0)
    hist, _ = np.histogram(
        mixing_ratio[good_inds], bins=bins_ratio, weights=weights_ratio
    )
    return hist
def next_neighbor_distribution(
atoms, reference=None, number_of_neighbors=4, bins=None, normed=True
):
atoms: CoordinateFrame,
reference: Optional[CoordinateFrame] = None,
number_of_neighbors: int = 4,
bins: Optional[ArrayLike] = None,
normed: bool = True,
) -> NDArray:
"""
Compute the distribution of next neighbors with the same residue name.
"""
@@ -429,34 +303,34 @@ def next_neighbor_distribution(
reference = atoms
nn = next_neighbors(
reference, query_atoms=atoms, number_of_neighbors=number_of_neighbors
)
)[1]
resname_nn = reference.residue_names[nn]
count_nn = (resname_nn == atoms.residue_names.reshape(-1, 1)).sum(axis=1)
return np.histogram(count_nn, bins=bins, normed=normed)[0]
def hbonds(
D,
H,
A,
box,
DA_lim=0.35,
HA_lim=0.35,
min_cos=np.cos(30 * np.pi / 180),
full_output=False,
):
atoms: CoordinateFrame,
donator_indices: ArrayLike,
hydrogen_indices: ArrayLike,
acceptor_indices: ArrayLike,
DA_lim: float = 0.35,
HA_lim: float = 0.35,
max_angle_deg: float = 30,
full_output: bool = False,
) -> Union[NDArray, tuple[NDArray, NDArray, NDArray]]:
"""
Compute h-bond pairs
Args:
D: Set of coordinates for donators.
H: Set of coordinates for hydrogen atoms. Should have the same
atoms: Set of all coordinates for a frame.
donator_indices: Set of indices for donators.
hydrogen_indices: Set of indices for hydrogen atoms. Should have the same
length as D.
A: Set of coordinates for acceptors.
DA_lim (opt.): Minimum distance beteen donator and acceptor.
HA_lim (opt.): Minimum distance beteen hydrogen and acceptor.
min_cos (opt.): Minimum cosine for the HDA angle. Default is
equivalent to a maximum angle of 30 degree.
acceptor_indices: Set of indices for acceptors.
DA_lim (opt.): Minimum distance between donator and acceptor.
HA_lim (opt.): Minimum distance between hydrogen and acceptor.
max_angle_deg (opt.): Maximum angle in degree for the HDA angle.
full_output (opt.): Returns additionally the cosine of the
angles and the DA distances
@@ -464,8 +338,10 @@ def hbonds(
List of (D,A)-pairs in hbonds.
"""
def dist_DltA(D, H, A, box, max_dist=0.35):
ppoints, pind = pbc_points(D, box, thickness=max_dist + 0.1, index=True)
def dist_DltA(
D: CoordinateFrame, A: CoordinateFrame, max_dist: float = 0.35
) -> NDArray:
ppoints, pind = pbc_points(D, thickness=max_dist + 0.1, index=True)
Dtree = spatial.cKDTree(ppoints)
Atree = spatial.cKDTree(A)
pairs = Dtree.sparse_distance_matrix(Atree, max_dist, output_type="ndarray")
@@ -474,8 +350,10 @@ def hbonds(
pairs[:, 0] = pind[pairs[:, 0]]
return pairs
def dist_AltD(D, H, A, box, max_dist=0.35):
ppoints, pind = pbc_points(A, box, thickness=max_dist + 0.1, index=True)
def dist_AltD(
D: CoordinateFrame, A: CoordinateFrame, max_dist: float = 0.35
) -> NDArray:
ppoints, pind = pbc_points(A, thickness=max_dist + 0.1, index=True)
Atree = spatial.cKDTree(ppoints)
Dtree = spatial.cKDTree(D)
pairs = Atree.sparse_distance_matrix(Dtree, max_dist, output_type="ndarray")
@@ -485,10 +363,15 @@ def hbonds(
pairs[:, 1] = pind[pairs[:, 1]]
return pairs
D = atoms[donator_indices]
H = atoms[hydrogen_indices]
A = atoms[acceptor_indices]
min_cos = np.cos(max_angle_deg * np.pi / 180)
box = D.box
if len(D) <= len(A):
pairs = dist_DltA(D, H, A, box, DA_lim)
pairs = dist_DltA(D, A, DA_lim)
else:
pairs = dist_AltD(D, H, A, box, DA_lim)
pairs = dist_AltD(D, A, DA_lim)
vDH = pbc_diff(D[pairs[:, 0]], H[pairs[:, 0]], box)
vDA = pbc_diff(D[pairs[:, 0]], A[pairs[:, 1]], box)
@@ -513,3 +396,45 @@ def hbonds(
)
else:
return pairs[is_bond]
def calc_cluster_sizes(atoms: CoordinateFrame, r_max: float = 0.35) -> NDArray:
    """
    Compute the sizes of clusters of atoms that are connected by pairwise
    distances below ``r_max`` (periodic boundary conditions included).

    Args:
        atoms: Coordinate frame of the atoms to cluster.
        r_max (opt.): Maximum distance for two atoms to count as connected.

    Returns:
        One-dimensional integer array with the size of every cluster.
    """
    frame_PBC, indices_PBC = pbc_points(atoms, thickness=r_max + 0.1, index=True)
    tree = KDTree(frame_PBC)
    matrix = tree.sparse_distance_matrix(tree, r_max, output_type="ndarray")
    # Map contacts between periodic images back onto the original atoms to
    # build an adjacency matrix; zero-distance self pairs are skipped.
    new_matrix = np.zeros((len(atoms), len(atoms)))
    for entry in matrix:
        if entry[2] > 0:
            new_matrix[indices_PBC[entry[0]], indices_PBC[entry[1]]] = 1
    n_components, labels = connected_components(new_matrix, directed=False)
    # bincount over the component labels yields every cluster size at once,
    # replacing the previous quadratic Python loop over label values.
    return np.bincount(labels, minlength=n_components)
def gyration_radius(position: CoordinateFrame) -> NDArray:
    r"""
    Calculates a list of all radii of gyration of all molecules given in the coordinate
    frame, weighted with the masses of the individual atoms.

    Args:
        position: Coordinate frame object

    ..math::
        R_G = \left(\frac{\sum_{i=1}^{n} m_i |\vec{r_i}
        - \vec{r_{COM}}|^2 }{\sum_{i=1}^{n} m_i }
        \right)^{\frac{1}{2}}
    """
    gyration_radii = np.array([])

    for resid in np.unique(position.residue_ids):
        pos = position.whole[position.residue_ids == resid]
        mass = position.masses[position.residue_ids == resid][:, np.newaxis]
        # Bug fix: the center of mass must be computed from this residue's
        # coordinates (pos), not the whole frame (position) — the old code
        # broadcast per-residue masses against all atoms and raised a
        # ValueError for frames with more than one residue.
        COM = 1 / mass.sum() * (mass * pos).sum(axis=0)
        r_sq = ((pbc_diff(pos, COM, pos.box.diagonal())) ** 2).sum(1)[:, np.newaxis]
        g_radius = ((r_sq * mass).sum() / mass.sum()) ** 0.5
        gyration_radii = np.append(gyration_radii, g_radius)

    return gyration_radii

View File

@@ -0,0 +1,3 @@
from . import chill
from . import free_energy_landscape
from . import water

View File

@@ -0,0 +1,199 @@
from typing import Tuple, Callable
import numpy as np
from numpy.typing import ArrayLike, NDArray
import pandas as pd
from scipy import sparse
from scipy.spatial import KDTree
from scipy.special import sph_harm
from mdevaluate.coordinates import CoordinateFrame, Coordinates
from mdevaluate.pbc import pbc_points
def a_ij(atoms: ArrayLike, N: int = 4, l: int = 3) -> tuple[NDArray, NDArray]:
tree = KDTree(atoms)
dist, indices = tree.query(atoms, N + 1)
indices = indices[:, 1:]
vecs = atoms[:, np.newaxis, :] - atoms[indices]
vecs /= np.linalg.norm(vecs, axis=-1)[..., np.newaxis]
theta = np.arctan2(vecs[..., 1], vecs[..., 0]) + np.pi
phi = np.arccos(vecs[..., 2])
qijlm = sph_harm(
np.arange(-l, l + 1)[np.newaxis, np.newaxis, :],
l,
theta[..., np.newaxis],
phi[..., np.newaxis],
)
qilm = np.average(qijlm, axis=1)
qil = np.sum(qilm * np.conj(qilm), axis=-1) ** 0.5
aij = (
np.sum(qilm[:, np.newaxis, :] * np.conj(qilm[indices]), axis=-1)
/ qil[:, np.newaxis]
/ qil[indices]
)
return np.real(aij), indices
def classify_ice(
    aij: NDArray, indices: NDArray, neighbors: NDArray, indexSOL: NDArray
) -> NDArray:
    """
    Assign an ice-type label to every selected molecule based on its bond
    order values.

    Labels: 0 cubic ice, 1 hexagonal ice, 2 ice interface, 3 clathrate,
    4 clathrate interface, 5 liquid (default for everything else).

    Args:
        aij: Bond-order correlations, shape (n, N), as returned by a_ij.
        indices: Neighbor indices, shape (n, N).
        neighbors: Number of neighbors within the cutoff for every particle.
        indexSOL: Indices of the particles that should be classified.

    Returns:
        Array of labels for the particles listed in ``indexSOL``.
    """
    staggered = np.sum(aij <= -0.8, axis=1)
    eclipsed = np.sum((aij >= -0.35) & (aij <= 0.25), axis=1)
    labels = np.full(len(aij), 5)
    for i in indexSOL:
        # only four-coordinated molecules can be classified as ice
        if neighbors[i] != 4:
            continue
        if staggered[i] == 4:
            labels[i] = 0
        elif staggered[i] == 3 and eclipsed[i] == 1:
            labels[i] = 1
        elif staggered[i] == 3:
            if any(staggered[j] >= 2 for j in indices[i]):
                labels[i] = 2
        elif staggered[i] == 2:
            if any(staggered[j] >= 3 for j in indices[i]):
                labels[i] = 2
        elif eclipsed[i] == 4:
            labels[i] = 3
        elif eclipsed[i] == 3:
            labels[i] = 4
    return labels[indexSOL]
def count_ice_types(iceTypes: NDArray) -> NDArray:
    """
    Count how many molecules fall into each of the six structure categories.

    Args:
        iceTypes: Labels as produced by classify_ice.

    Returns:
        Counts in the order: cubic, hexagonal, interface, clathrate,
        clathrate_interface, liquid.
    """
    return np.array([np.sum(iceTypes == label) for label in range(6)])
def selector_ice(
    start_frame: CoordinateFrame,
    traj: Coordinates,
    chosen_ice_types: ArrayLike,
    combined: bool = True,
) -> NDArray:
    """
    Select oxygen atoms of a frame whose local structure matches the
    requested ice types (labels as produced by classify_ice: 0 cubic,
    1 hexagonal, 2 interface, 3 clathrate, 4 clathrate interface, 5 liquid).

    Args:
        start_frame: Frame whose step selects the oxygen coordinates.
        traj: Trajectory the frame belongs to.
        chosen_ice_types: Ice-type labels to select.
        combined: If True, return one flat index array over all requested
            types; otherwise one index array per requested type.

    Returns:
        Indices (into the OW subset of the frame) of matching molecules.
    """
    oxygen = traj.subset(atom_name="OW")
    atoms = oxygen[start_frame.step]
    # wrap coordinates into the primary box before building periodic images
    atoms = atoms % np.diag(atoms.box)
    atoms_PBC = pbc_points(atoms, thickness=1)
    aij, indices = a_ij(atoms_PBC)
    tree = KDTree(atoms_PBC)
    neighbors = tree.query_ball_point(atoms_PBC, 0.35, return_length=True)
    # Locate the original atoms inside the periodic-image array by value.
    # NOTE(review): assumes pbc_points keeps the primary atoms as one
    # contiguous block starting at the first original atom — TODO confirm.
    index_SOL = atoms_PBC.tolist().index(atoms[0].tolist())
    index_SOL = np.arange(index_SOL, index_SOL + len(atoms))
    ice_Types = classify_ice(aij, indices, neighbors, index_SOL)
    index = []
    if combined is True:
        for i, ice_Type in enumerate(ice_Types):
            if ice_Type in chosen_ice_types:
                index.append(i)
    else:
        for entry in chosen_ice_types:
            index_entry = []
            for i, ice_Type in enumerate(ice_Types):
                if ice_Type == entry:
                    index_entry.append(i)
            index.append(np.array(index_entry))
    return np.array(index)
def ice_types(trajectory: Coordinates, segments: int = 10000) -> pd.DataFrame:
    """
    Time series of the number of water molecules in each structure category
    (cubic, hexagonal, ice interface, clathrate, clathrate interface, liquid).

    Args:
        trajectory: Trajectory containing SOL water (OW oxygens are used).
        segments: Number of frames to analyse, spread over the trajectory.

    Returns:
        DataFrame with one row per analysed frame: the frame time plus the
        six category counts.
    """

    def ice_types_distribution(frame: CoordinateFrame, selector: Callable) -> NDArray:
        # classify all atoms (including periodic images), then count only the
        # molecules picked by the selector
        atoms_PBC = pbc_points(frame, thickness=1)
        aij, indices = a_ij(atoms_PBC)
        tree = KDTree(atoms_PBC)
        neighbors = tree.query_ball_point(atoms_PBC, 0.35, return_length=True)
        index = selector(frame, atoms_PBC)
        ice_types_data = classify_ice(aij, indices, neighbors, index)
        ice_parts_data = count_ice_types(ice_types_data)
        return ice_parts_data

    def selector(frame: CoordinateFrame, atoms_PBC: ArrayLike) -> NDArray:
        # NOTE(review): assumes the primary SOL atoms form one contiguous
        # block inside the periodic-image array — TODO confirm pbc_points
        # ordering.
        atoms_SOL = traj.subset(residue_name="SOL")[frame.step]
        index = atoms_PBC.tolist().index(atoms_SOL[0].tolist())
        index = np.arange(index, index + len(atoms_SOL))
        return np.array(index)

    traj = trajectory.subset(atom_name="OW")
    frame_indices = np.unique(np.int_(np.linspace(0, len(traj) - 1, num=segments)))
    result = np.array(
        [
            [
                traj[frame_index].time,
                *ice_types_distribution(traj[frame_index], selector),
            ]
            for frame_index in frame_indices
        ]
    )
    return pd.DataFrame(
        {
            "time": result[:, 0],
            "cubic": result[:, 1],
            "hexagonal": result[:, 2],
            "ice_interface": result[:, 3],
            "clathrate": result[:, 4],
            "clathrate_interface": result[:, 5],
            "liquid": result[:, 6],
        }
    )
def ice_clusters_traj(
    traj: Coordinates, segments: int = 10000, skip: float = 0.1
) -> list:
    """
    Find clusters of ice-like molecules (cubic, hexagonal or interface type)
    for a set of frames spread over the trajectory.

    Args:
        traj: Trajectory containing water (OW oxygens are used internally).
        segments: Number of frames to analyse.
        skip: Fraction of the trajectory skipped at the beginning.

    Returns:
        List of (time, clusters) tuples; every cluster is a list of
        (index, position) pairs and only clusters with more than one
        molecule are reported.
    """

    def ice_clusters(frame: CoordinateFrame, traj: Coordinates) -> Tuple[float, list]:
        # ice types 0, 1, 2: cubic, hexagonal and interface molecules
        selection = selector_ice(frame, traj, [0, 1, 2])
        if len(selection) == 0:
            return frame.time, []
        else:
            ice = frame[selection]
            ice_PBC, indices_PBC = pbc_points(
                ice, box=frame.box, thickness=0.5, index=True
            )
            ice_tree = KDTree(ice_PBC)
            ice_matrix = ice_tree.sparse_distance_matrix(
                ice_tree, 0.35, output_type="ndarray"
            )
            # adjacency of the original molecules; zero-distance self pairs
            # are skipped
            new_ice_matrix = np.zeros((len(ice), len(ice)))
            for entry in ice_matrix:
                if entry[2] > 0:
                    new_ice_matrix[indices_PBC[entry[0]], indices_PBC[entry[1]]] = 1
            n_components, labels = sparse.csgraph.connected_components(
                new_ice_matrix, directed=False
            )
            clusters = []
            selection = np.array(selection)
            for i in range(0, np.max(labels) + 1):
                # single-molecule "clusters" are not reported
                if len(ice[labels == i]) > 1:
                    clusters.append(
                        list(zip(selection[labels == i], ice[labels == i].tolist()))
                    )
            return frame.time, clusters

    frame_indices = np.unique(
        np.int_(np.linspace(len(traj) * skip, len(traj) - 1, num=segments))
    )
    all_clusters = [
        ice_clusters(traj[frame_index], traj) for frame_index in frame_indices
    ]
    return all_clusters

View File

@@ -0,0 +1,206 @@
from functools import partial
import numpy as np
from numpy.typing import ArrayLike, NDArray
from numpy.polynomial.polynomial import Polynomial as Poly
import math
from scipy.spatial import KDTree
import pandas as pd
import multiprocessing as mp
from ..coordinates import Coordinates
def occupation_matrix(
    trajectory: Coordinates,
    edge_length: float = 0.05,
    segments: int = 1000,
    skip: float = 0.1,
    nodes: int = 8,
) -> pd.DataFrame:
    """
    Build a time-averaged occupation histogram of particle positions on a
    regular grid spanning the simulation box.

    Args:
        trajectory: Trajectory to analyse.
        edge_length (opt.): Edge length of a grid voxel.
        segments (opt.): Number of frames to accumulate.
        skip (opt.): Fraction of the trajectory skipped at the beginning.
        nodes (opt.): Number of worker processes.

    Returns:
        DataFrame with columns x, y, z (voxel centers) and occupation;
        empty voxels are dropped.
    """
    frame_indices = np.unique(
        np.int_(np.linspace(len(trajectory) * skip, len(trajectory) - 1, num=segments))
    )
    box = trajectory[0].box
    x_bins = np.arange(0, box[0][0] + edge_length, edge_length)
    y_bins = np.arange(0, box[1][1] + edge_length, edge_length)
    z_bins = np.arange(0, box[2][2] + edge_length, edge_length)
    bins = [x_bins, y_bins, z_bins]
    # Trajectory is split for parallel computing.
    # Bug fix: the chunks must contain the selected frame indices themselves;
    # the old code chunked np.arange(len(frame_indices)), so the first frames
    # of the trajectory were histogrammed regardless of `skip`.
    size = math.ceil(len(frame_indices) / nodes)
    indices = [
        frame_indices[i : i + size] for i in range(0, len(frame_indices), size)
    ]
    # Context manager guarantees the worker pool is torn down even if a
    # worker raises (the old bare close() leaked processes on error).
    with mp.Pool(nodes) as pool:
        results = pool.map(
            partial(_calc_histogram, trajectory=trajectory, bins=bins), indices
        )
    matbin = np.sum(results, axis=0)
    x = (x_bins[:-1] + x_bins[1:]) / 2
    y = (y_bins[:-1] + y_bins[1:]) / 2
    z = (z_bins[:-1] + z_bins[1:]) / 2
    coords = np.array(np.meshgrid(x, y, z, indexing="ij"))
    coords = np.array([x.flatten() for x in coords])
    matbin_new = matbin.flatten()
    occupation_df = pd.DataFrame(
        {"x": coords[0], "y": coords[1], "z": coords[2], "occupation": matbin_new}
    )
    occupation_df = occupation_df.query("occupation != 0").reset_index(drop=True)
    return occupation_df
def _calc_histogram(
indices: ArrayLike, trajectory: Coordinates, bins: ArrayLike
) -> NDArray:
matbin = None
for index in range(0, len(indices), 1000):
try:
current_indices = indices[index : index + 1000]
except IndexError:
current_indices = indices[index:]
frames = np.concatenate(np.array([trajectory.pbc[i] for i in current_indices]))
hist, _ = np.histogramdd(frames, bins=bins)
if matbin is None:
matbin = hist
else:
matbin += hist
return matbin
def find_maxima(
    occupation_df: pd.DataFrame, box: ArrayLike, edge_length: float = 0.05
) -> pd.DataFrame:
    """
    Mark voxels that are local maxima of the occupation on the periodic grid.

    A voxel is a maximum if its occupation is strictly larger than that of
    all its (up to 26) grid neighbors. The result depends on the scan order:
    once a voxel is marked as a maximum, all its neighbors are immediately
    marked non-maxima and are skipped later.

    Args:
        occupation_df: DataFrame with columns x, y, z, occupation.
        box: Box lengths, used for periodic neighbor lookup.
        edge_length (opt.): Grid spacing used to build the occupation grid.

    Returns:
        Copy of ``occupation_df`` with an additional boolean column 'maxima'.
    """
    maxima_df = occupation_df.copy()
    # None marks "not decided yet"; the column ends up boolean
    maxima_df["maxima"] = None
    points = np.array(maxima_df[["x", "y", "z"]])
    tree = KDTree(points, boxsize=box)
    # radius covers the full diagonal neighborhood of a voxel (sqrt(3)*edge),
    # padded slightly against floating-point round-off
    all_neighbors = tree.query_ball_point(
        points, edge_length * 3 ** (1 / 2) + edge_length / 100
    )
    for i in range(len(maxima_df)):
        if maxima_df.loc[i, "maxima"] is not None:
            continue
        neighbors = np.array(all_neighbors[i])
        neighbors = neighbors[neighbors != i]
        if len(neighbors) == 0:
            # isolated voxel: trivially a maximum
            maxima_df.loc[i, "maxima"] = True
        elif (
            maxima_df.loc[neighbors, "occupation"].max()
            < maxima_df.loc[i, "occupation"]
        ):
            maxima_df.loc[neighbors, "maxima"] = False
            maxima_df.loc[i, "maxima"] = True
        else:
            maxima_df.loc[i, "maxima"] = False
    return maxima_df
def _calc_energies(
    maxima_indices: ArrayLike,
    maxima_df: pd.DataFrame,
    bins: ArrayLike,
    box: NDArray,
    T: float,
) -> NDArray:
    """
    Radially resolved free-energy profile around the given occupation maxima.

    For every maximum, the energy of surrounding voxels is estimated as
    -T * log(occupation / occupation_at_maximum) and averaged per distance
    bin over all maxima.

    Args:
        maxima_indices: Row indices of the maxima in ``maxima_df``.
        maxima_df: DataFrame with x, y, z, occupation columns.
        bins: Radial bin edges; bins[-1] is the cutoff distance.
        box: Box lengths for periodic distance lookup.
        T: Temperature (sets the energy scale).

    Returns:
        Mean energy per radial bin; bins without voxels yield NaN (0/0).
    """
    points = np.array(maxima_df[["x", "y", "z"]])
    tree = KDTree(points, boxsize=box)
    maxima = maxima_df.loc[maxima_indices, ["x", "y", "z"]]
    maxima_occupations = np.array(maxima_df.loc[maxima_indices, "occupation"])
    # widest neighborhood among all maxima determines k for the batched query
    num_of_neighbors = np.max(
        tree.query_ball_point(maxima, bins[-1], return_length=True)
    )
    distances, indices = tree.query(
        maxima, k=num_of_neighbors, distance_upper_bound=bins[-1]
    )
    all_energy_hist = []
    all_occupied_bins_hist = []
    # Single-maximum case: query() returns 1-D arrays, so the loop below
    # would iterate over scalars; this branch handles it separately.
    if distances.ndim == 1:
        current_distances = distances[1:][distances[1:] <= bins[-1]]
        current_indices = indices[1:][distances[1:] <= bins[-1]]
        energy = (
            -np.log(maxima_df.loc[current_indices, "occupation"] / maxima_occupations)
            * T
        )
        energy_hist = np.histogram(current_distances, bins=bins, weights=energy)[0]
        occupied_bins_hist = np.histogram(current_distances, bins=bins)[0]
        result = energy_hist / occupied_bins_hist
        return result
    for i, maxima_occupation in enumerate(maxima_occupations):
        # first entry is the maximum itself (distance 0) and is skipped
        current_distances = distances[i, 1:][distances[i, 1:] <= bins[-1]]
        current_indices = indices[i, 1:][distances[i, 1:] <= bins[-1]]
        energy = (
            -np.log(maxima_df.loc[current_indices, "occupation"] / maxima_occupation)
            * T
        )
        energy_hist = np.histogram(current_distances, bins=bins, weights=energy)[0]
        occupied_bins_hist = np.histogram(current_distances, bins=bins)[0]
        all_energy_hist.append(energy_hist)
        all_occupied_bins_hist.append(occupied_bins_hist)
    result = np.sum(all_energy_hist, axis=0) / np.sum(all_occupied_bins_hist, axis=0)
    return result
def add_distances(
occupation_df: pd.DataFrame, pore_geometry: str, origin: ArrayLike
) -> pd.DataFrame:
distance_df = occupation_df.copy()
if pore_geometry == "cylindrical":
distance_df["distance"] = (
(distance_df["x"] - origin[0]) ** 2 + (distance_df["y"] - origin[1]) ** 2
) ** (1 / 2)
elif pore_geometry == "slit":
distance_df["distance"] = np.abs(distance_df["z"] - origin[2])
else:
raise ValueError(
f"pore_geometry is {pore_geometry}, should either be "
f"'cylindrical' or 'slit'"
)
return distance_df
def distance_resolved_energies(
    maxima_df: pd.DataFrame,
    distance_bins: ArrayLike,
    r_bins: ArrayLike,
    box: NDArray,
    temperature: float,
) -> pd.DataFrame:
    """
    Free-energy profiles around occupation maxima, resolved by the maxima's
    distance to the pore (column 'distance' added by ``add_distances``).

    Args:
        maxima_df: DataFrame with maxima marked ('maxima' column) and a
            'distance' column.
        distance_bins: Bins grouping the maxima by their pore distance.
        r_bins: Radial bins for the energy profile around each maximum.
        box: Box lengths for periodic distance lookup.
        temperature: Temperature used as the energy scale.

    Returns:
        DataFrame with columns d (pore distance), r (radius) and energy.
    """
    results = []
    for i in range(len(distance_bins) - 1):
        # select the maxima that fall into this pore-distance slab
        maxima_indices = np.array(
            maxima_df.index[
                (maxima_df["distance"] >= distance_bins[i])
                * (maxima_df["distance"] < distance_bins[i + 1])
                * (maxima_df["maxima"] == True)
            ]
        )
        results.append(
            _calc_energies(maxima_indices, maxima_df, r_bins, box, temperature)
        )
    # bin centers for both axes; rows are ordered d-major, r-minor to match
    # the flattened results array
    distances = (distance_bins[:-1] + distance_bins[1:]) / 2
    radii = (r_bins[:-1] + r_bins[1:]) / 2
    d = np.array([d for d in distances for r in radii])
    r = np.array([r for d in distances for r in radii])
    result = np.array(results).flatten()
    return pd.DataFrame({"d": d, "r": r, "energy": result})
def find_energy_maxima(
    energy_df: pd.DataFrame, r_min: float, r_max: float
) -> pd.DataFrame:
    """
    For every pore-distance group, fit a parabola to the energy profile in
    [r_min, r_max] and report the maximum of the fit on that interval.

    Args:
        energy_df: DataFrame with columns d, r, energy (see
            distance_resolved_energies).
        r_min: Lower bound of the fit window.
        r_max: Upper bound of the fit window.

    Returns:
        DataFrame with one row per distance: columns d and energy.
    """
    rows = []
    sample_points = np.linspace(r_min, r_max, 1000)
    for d, group in energy_df.groupby("d"):
        r_values = np.array(group["r"])
        e_values = np.array(group["energy"])
        window = (r_values >= r_min) * (r_values <= r_max)
        parabola = Poly.fit(r_values[window], e_values[window], deg=2)
        rows.append((d, np.max(parabola(sample_points))))
    return pd.DataFrame(rows, columns=["d", "energy"])

View File

@@ -0,0 +1,419 @@
from functools import partial
from typing import Tuple, Callable, Optional
import numpy as np
from numpy.typing import NDArray, ArrayLike
import pandas as pd
from scipy.spatial import KDTree
from ..distribution import hbonds
from ..pbc import pbc_points
from ..correlation import shifted_correlation, overlap
from ..coordinates import Coordinates, CoordinateFrame
def tanaka_zeta(
    trajectory: Coordinates, angle: float = 30, segments: int = 100, skip: float = 0.1
) -> pd.DataFrame:
    """
    Distribution of the Tanaka structural order parameter zeta and its
    coarse-grained variant zeta_cg for water.

    zeta of a molecule is the gap between its nearest non-hydrogen-bonded
    neighbor and its farthest hydrogen-bonded neighbor; zeta_cg averages
    zeta over all neighbors within 0.35 nm.

    Args:
        trajectory: Trajectory containing SOL water (OW and HW atoms).
        angle (opt.): Maximum HDA angle in degrees for the hydrogen bonds.
        segments (opt.): Number of frames to analyse.
        skip (opt.): Fraction of the trajectory skipped at the beginning.

    Returns:
        DataFrame with columns zeta, result (zeta distribution) and
        result_cg (zeta_cg distribution), both normalized to unit mean.
    """
    frame_indices = np.unique(
        np.int_(np.linspace(len(trajectory) * skip, len(trajectory) - 1, num=segments))
    )
    sel = trajectory.atom_subset.selection
    # oxygen (acceptor) indices relative to the trajectory's own selection
    A = np.where(
        trajectory.subset(atom_name="OW", residue_name="SOL").atom_subset.selection[sel]
    )[0]
    # every oxygen donates via two hydrogens, so duplicate each index
    D = np.vstack([A] * 2).T.reshape((-1,))
    H = np.where(
        trajectory.subset(atom_name="HW.", residue_name="SOL").atom_subset.selection[
            sel
        ]
    )[0]
    zeta_dist = []
    zeta_cg_dist = []
    for frame_index in frame_indices:
        D_frame = trajectory[frame_index][D]
        H_frame = trajectory[frame_index][H]
        A_frame = trajectory[frame_index][A]
        box = trajectory[frame_index].box
        # NOTE(review): this calls hbonds with the pre-merge signature
        # (D, H, A, box, min_cos=...); the merged distribution.hbonds takes
        # (atoms, donator_indices, ...) with max_angle_deg — confirm which
        # version is imported here.
        pairs = hbonds(
            D_frame, H_frame, A_frame, box, min_cos=np.cos(angle / 180 * np.pi)
        )
        # map duplicated donor indices back to molecule indices and
        # deduplicate the (donor, acceptor) pairs
        pairs[:, 0] = np.int_((pairs[:, 0] / 2))
        pairs = np.sort(pairs, axis=1)
        pairs = np.unique(pairs, axis=0)
        pairs = pairs.tolist()
        A_PBC, A_index = pbc_points(A_frame, box, thickness=0.7, index=True)
        A_tree = KDTree(A_PBC)
        dist, dist_index = A_tree.query(A_frame, 16, distance_upper_bound=0.7)
        dist_index = A_index[dist_index]
        zeta = []
        for i, indices in enumerate(dist_index):
            dist_hbond = []
            dist_non_hbond = []
            for j, index in enumerate(indices):
                if j != 0:
                    # neighbor is hydrogen-bonded if the sorted pair occurs
                    # in the hbond pair list
                    if np.sort([indices[0], index]).tolist() in pairs:
                        dist_hbond.append(dist[i, j])
                    else:
                        dist_non_hbond.append(dist[i, j])
            try:
                zeta.append(np.min(dist_non_hbond) - np.max(dist_hbond))
            except ValueError:
                # no h-bonded or no non-h-bonded neighbor found
                zeta.append(0)
        zeta = np.array(zeta)
        # coarse-grain: average zeta over all neighbors within 0.35 nm
        dist, dist_index = A_tree.query(A_frame, 16, distance_upper_bound=0.7)
        dist_index = A_index[dist_index]
        dist_index = np.array(
            [indices[dist[i] <= 0.35] for i, indices in enumerate(dist_index)]
        )
        zeta_cg = np.array([np.mean(zeta[indices]) for indices in dist_index])
        bins = np.linspace(-0.1, 0.2, 301)
        zeta_dist.append(np.histogram(zeta, bins=bins)[0])
        zeta_cg_dist.append(np.histogram(zeta_cg, bins=bins)[0])
    z = bins[1:] - (bins[1] - bins[0]) / 2
    zeta_dist = np.mean(zeta_dist, axis=0)
    zeta_dist = zeta_dist / np.mean(zeta_dist)
    zeta_cg_dist = np.mean(zeta_cg_dist, axis=0)
    zeta_cg_dist = zeta_cg_dist / np.mean(zeta_cg_dist)
    return pd.DataFrame({"zeta": z, "result": zeta_dist, "result_cg": zeta_cg_dist})
def chi_four_trans(
    trajectory: Coordinates, skip: float = 0.1, segments: int = 10000
) -> pd.DataFrame:
    """
    Dynamic susceptibility chi_4 of translational motion, computed from the
    variance of the overlap function over time origins.

    Args:
        trajectory: Trajectory to analyse (nojump coordinates are used).
        skip (opt.): Fraction of the trajectory skipped at the beginning.
        segments (opt.): Number of time origins.

    Returns:
        DataFrame with columns time and chi (the t=0 point is dropped).
    """
    nojump_traj = trajectory.nojump
    num_particles = len(trajectory[0])
    times, overlap_values = shifted_correlation(
        partial(overlap, radius=0.1),
        nojump_traj,
        skip=skip,
        segments=segments,
        average=False,
    )
    # chi_4(t) = Var[Q(t)] / N
    susceptibility = overlap_values.var(axis=0)[1:] / num_particles
    return pd.DataFrame({"time": times[1:], "chi": susceptibility})
def tanaka_correlation_map(
    trajectory: Coordinates,
    data_chi_four_trans: pd.DataFrame,
    angle: float = 30,
    segments: int = 100,
    skip: float = 0.1,
) -> pd.DataFrame:
    """
    Correlation map between the coarse-grained Tanaka order parameter
    zeta_cg (time-averaged over tau_4) and the maximal displacement delta_r
    of every water molecule during tau_4, where tau_4 is the position of the
    chi_4 maximum.

    Args:
        trajectory: Trajectory containing SOL water.
        data_chi_four_trans: Output of chi_four_trans, used to locate tau_4.
        angle (opt.): Maximum HDA angle in degrees for hydrogen bonds.
        segments (opt.): Number of analysed time origins.
        skip (opt.): Fraction of the trajectory skipped at the beginning.

    Returns:
        DataFrame with columns zeta_cg and delta_r (one entry per molecule
        and time origin).
    """

    def tanaka_zeta_cg(
        trajectory: Coordinates,
        angle: float = 30,
        segments: int = 1000,
        skip: float = 0.1,
    ) -> Tuple[NDArray, NDArray]:
        # per-frame coarse-grained zeta values; see tanaka_zeta for the
        # definition of the order parameter
        frame_indices = np.unique(
            np.int_(
                np.linspace(len(trajectory) * skip, len(trajectory) - 1, num=segments)
            )
        )
        sel = trajectory.atom_subset.selection
        A = np.where(
            trajectory.subset(atom_name="OW", residue_name="SOL").atom_subset.selection[
                sel
            ]
        )[0]
        D = np.vstack([A] * 2).T.reshape((-1,))
        H = np.where(
            trajectory.subset(
                atom_name="HW.", residue_name="SOL"
            ).atom_subset.selection[sel]
        )[0]
        zeta_cg = []
        times = []
        for frame_index in frame_indices:
            D_frame = trajectory[frame_index][D]
            H_frame = trajectory[frame_index][H]
            A_frame = trajectory[frame_index][A]
            box = trajectory[frame_index].box
            # NOTE(review): hbonds is called with the pre-merge signature
            # (D, H, A, box, min_cos=...) — confirm against the merged
            # distribution.hbonds interface.
            pairs = hbonds(
                D_frame, H_frame, A_frame, box, min_cos=np.cos(angle / 180 * np.pi)
            )
            pairs[:, 0] = np.int_((pairs[:, 0] / 2))
            pairs = np.sort(pairs, axis=1)
            pairs = np.unique(pairs, axis=0)
            pairs = pairs.tolist()
            A_PBC, A_index = pbc_points(A_frame, box, thickness=0.7, index=True)
            A_tree = KDTree(A_PBC)
            dist, dist_index = A_tree.query(A_frame, 16, distance_upper_bound=0.7)
            dist_index = A_index[dist_index]
            zeta = []
            for i, indices in enumerate(dist_index):
                dist_hbond = []
                dist_non_hbond = []
                for j, index in enumerate(indices):
                    if j != 0:
                        if np.sort([indices[0], index]).tolist() in pairs:
                            dist_hbond.append(dist[i, j])
                        else:
                            dist_non_hbond.append(dist[i, j])
                try:
                    zeta.append(np.min(dist_non_hbond) - np.max(dist_hbond))
                except ValueError:
                    zeta.append(0)
            zeta = np.array(zeta)
            dist_index = np.array(
                [indices[dist[i] <= 0.35] for i, indices in enumerate(dist_index)]
            )
            zeta_cg.append(np.array([np.mean(zeta[indices]) for indices in dist_index]))
            times.append(trajectory[frame_index].time)
        return np.array(times), np.array(zeta_cg)

    def delta_r_max(
        trajectory: Coordinates, frame: CoordinateFrame, tau_4: float
    ) -> NDArray:
        # maximal displacement of every particle within the window tau_4
        dt = trajectory[1].time - trajectory[0].time
        index_start = frame.step
        index_end = index_start + int(tau_4 / dt) + 1
        frame_indices = np.arange(index_start, index_end + 1)
        end_cords = np.array([trajectory[frame_index] for frame_index in frame_indices])
        vectors = trajectory[index_start] - end_cords
        delta_r = np.linalg.norm(vectors, axis=-1)
        delta_r = np.max(delta_r, axis=0)
        return delta_r

    # locate tau_4 as the maximum of a quartic fit to the top of chi_4(t)
    d = np.array(data_chi_four_trans[["time", "chi"]])
    mask = d[:, 1] >= 0.7 * np.max(d[:, 1])
    fit = np.polyfit(d[mask, 0], d[mask, 1], 4)
    p = np.poly1d(fit)
    # Bug fix: np.linspace requires an integer sample count; the old call
    # passed the float 1e6, which raises a TypeError at runtime.
    x_inter = np.linspace(d[mask, 0][0], d[mask, 0][-1], 1000000)
    y_inter = p(x_inter)
    tau_4 = x_inter[y_inter == np.max(y_inter)]
    oxygens = trajectory.nojump.subset(atom_name="OW")
    window = tau_4 / trajectory[-1].time
    start_frames = np.unique(
        np.linspace(
            len(trajectory) * skip,
            len(trajectory) * (1 - window),
            num=segments,
            endpoint=False,
            dtype=int,
        )
    )
    times, zeta_cg = tanaka_zeta_cg(trajectory, angle=angle)
    # average zeta_cg over the tau_4 window following every start frame
    zeta_cg_mean = np.array(
        [
            np.mean(
                zeta_cg[
                    (times >= trajectory[start_frame].time)
                    * (times <= (trajectory[start_frame].time + tau_4))
                ],
                axis=0,
            )
            for start_frame in start_frames
        ]
    ).flatten()
    delta_r = np.array(
        [
            delta_r_max(oxygens, oxygens[start_frame], tau_4)
            for start_frame in start_frames
        ]
    ).flatten()
    return pd.DataFrame({"zeta_cg": zeta_cg_mean, "delta_r": delta_r})
def LSI_atom(distances: ArrayLike) -> NDArray:
    """
    Local structure index (LSI) of a single molecule.

    All neighbor distances up to 0.37 nm plus the next entry of ``distances``
    form the shell; the LSI is the (population) variance of the gaps between
    consecutive shell distances. Input is expected sorted ascending, as
    returned by a KDTree query.
    """
    cutoff_count = int(np.sum(distances <= 0.37))
    shell = list(distances[distances <= 0.37])
    # include the first neighbor beyond the cutoff (IndexError if there is
    # none, matching the original behavior)
    shell.append(distances[cutoff_count])
    gaps = np.diff(shell)
    return np.mean((np.mean(gaps) - gaps) ** 2)
def LSI(
    trajectory: Coordinates, segments: int = 10000, skip: float = 0
) -> pd.DataFrame:
    """
    Distribution of the local structure index (LSI), averaged over frames.

    Args:
        trajectory: Trajectory of the (oxygen) positions to analyse.
        segments (opt.): Number of frames to average over.
        skip (opt.): Fraction of the trajectory skipped at the beginning.

    Returns:
        DataFrame with columns I (bin centers) and P (mean distribution).
    """

    def LSI_distribution(
        frame: CoordinateFrame, bins: NDArray, selector: Optional[Callable] = None
    ) -> NDArray:
        # LSI of every (selected) molecule of one frame, histogrammed
        atoms_PBC = pbc_points(frame, frame.box, thickness=0.7)
        atoms_tree = KDTree(atoms_PBC)
        if selector:
            index = selector(frame)
        else:
            index = np.arange(len(frame))
        dist, _ = atoms_tree.query(frame[index], 50, distance_upper_bound=0.6)
        distances = dist[:, 1:]
        LSI_values = np.array([LSI_atom(distance) for distance in distances])
        dist = np.histogram(LSI_values, bins=bins, density=True)[0]
        return dist

    bins = np.linspace(0, 0.007, 201)
    I = bins[1:] - (bins[1] - bins[0]) / 2
    frame_indices = np.unique(
        np.int_(np.linspace(len(trajectory) * skip, len(trajectory) - 1, num=segments))
    )
    # Bug fix: LSI_distribution takes (frame, bins, selector); the old call
    # passed an extra `trajectory` argument, shifting every parameter and
    # raising a TypeError at runtime.
    distributions = np.array(
        [
            LSI_distribution(trajectory[frame_index], bins, selector=None)
            for frame_index in frame_indices
        ]
    )
    P = np.mean(distributions, axis=0)
    return pd.DataFrame({"I": I, "P": P})
def HDL_LDL_positions(
    frame: CoordinateFrame, selector: Optional[Callable] = None
) -> Tuple[NDArray, NDArray]:
    """
    Split the molecules of a frame into high-density (HDL) and low-density
    (LDL) classes by their local structure index.

    Molecules with LSI >= 0.0013 are classified as LDL, all others as HDL.

    Args:
        frame: Coordinate frame of the (oxygen) positions.
        selector (opt.): Callable mapping the frame to the indices to
            classify; all molecules are used if omitted.

    Returns:
        Tuple (pos_HDL, pos_LDL) of the positions of both classes.
    """
    atoms_PBC = pbc_points(frame, frame.box, thickness=0.7)
    atoms_tree = KDTree(atoms_PBC)
    if selector:
        index = selector(frame)
    else:
        index = range(len(frame))
    # nearest-neighbor distances (first column is the molecule itself)
    dist = atoms_tree.query(frame[index], 50, distance_upper_bound=0.6)[0]
    distances = dist[:, 1:]
    LSI_values = np.array([LSI_atom(distance) for distance in distances])
    LDL = LSI_values >= 0.0013
    HDL = LSI_values < 0.0013
    pos_HDL = frame[index][HDL]
    pos_LDL = frame[index][LDL]
    return pos_HDL, pos_LDL
def HDL_LDL_gr(
    trajectory: Coordinates, segments: int = 10000, skip: float = 0.1
) -> pd.DataFrame:
    """
    Partial radial distribution functions of high-density (HDL) and
    low-density (LDL) classified water molecules.

    Args:
        trajectory: Trajectory of the (oxygen) positions to analyse.
        segments (opt.): Number of frames to average over.
        skip (opt.): Fraction of the trajectory skipped at the beginning.

    Returns:
        DataFrame with columns r, gr_ALL, gr_HDL, gr_LDL and gr_MIX
        (HDL-LDL pairing).
    """

    def gr_frame(
        frame: CoordinateFrame, trajectory: Coordinates, bins: ArrayLike
    ) -> NDArray:
        atoms_ALL = frame
        # Bug fix: HDL_LDL_positions(frame, selector) expects an optional
        # selector callable as second argument; passing the trajectory there
        # broke the classification.
        atoms_HDL, atoms_LDL = HDL_LDL_positions(frame)
        atoms_PBC_ALL = pbc_points(atoms_ALL, frame.box)
        atoms_PBC_LDL = pbc_points(atoms_LDL, frame.box)
        atoms_PBC_HDL = pbc_points(atoms_HDL, frame.box)
        tree_ALL = KDTree(atoms_PBC_ALL)
        tree_LDL = KDTree(atoms_PBC_LDL)
        tree_HDL = KDTree(atoms_PBC_HDL)
        dist_ALL_ALL, _ = tree_ALL.query(
            atoms_ALL, len(frame) // 2, distance_upper_bound=bins[-1] + 0.1
        )
        dist_HDL_HDL, _ = tree_HDL.query(
            atoms_HDL, len(frame) // 2, distance_upper_bound=bins[-1] + 0.1
        )
        dist_LDL_LDL, _ = tree_LDL.query(
            atoms_LDL, len(frame) // 2, distance_upper_bound=bins[-1] + 0.1
        )
        dist_HDL_LDL, _ = tree_LDL.query(
            atoms_HDL, len(frame) // 2, distance_upper_bound=bins[-1] + 0.1
        )
        # same-species queries include the molecule itself in column 0
        dist_ALL_ALL = dist_ALL_ALL[:, 1:].flatten()
        dist_HDL_HDL = dist_HDL_HDL[:, 1:].flatten()
        dist_LDL_LDL = dist_LDL_LDL[:, 1:].flatten()
        dist_HDL_LDL = dist_HDL_LDL.flatten()
        hist_ALL_ALL = np.histogram(
            dist_ALL_ALL, bins=bins, range=(0, bins[-1]), density=False
        )[0]
        hist_HDL_HDL = np.histogram(
            dist_HDL_HDL, bins=bins, range=(0, bins[-1]), density=False
        )[0]
        hist_LDL_LDL = np.histogram(
            dist_LDL_LDL, bins=bins, range=(0, bins[-1]), density=False
        )[0]
        hist_HDL_LDL = np.histogram(
            dist_HDL_LDL, bins=bins, range=(0, bins[-1]), density=False
        )[0]
        return np.array(
            [
                hist_ALL_ALL / len(atoms_ALL),
                hist_HDL_HDL / len(atoms_HDL),
                hist_LDL_LDL / len(atoms_LDL),
                hist_HDL_LDL / len(atoms_HDL),
            ]
        )

    start_frame = trajectory[int(len(trajectory) * skip)]
    upper_bound = round(np.min(np.diag(start_frame.box)) / 2 - 0.05, 1)
    # Bug fix: np.linspace requires an integer number of samples; the old
    # float expression raised a TypeError.
    bins = np.linspace(0, upper_bound, int(round(upper_bound * 500)) + 1)
    frame_indices = np.unique(
        np.int_(np.linspace(len(trajectory) * skip, len(trajectory) - 1, num=segments))
    )
    gr = []
    for frame_index in frame_indices:
        hists = gr_frame(trajectory[frame_index], trajectory, bins)
        gr.append(hists)
    gr = np.mean(gr, axis=0)
    # normalize by the shell volumes
    gr = gr / (4 / 3 * np.pi * bins[1:] ** 3 - 4 / 3 * np.pi * bins[:-1] ** 3)
    r = bins[1:] - (bins[1] - bins[0]) / 2
    # Bug fix: the ALL-ALL column was the literal list [0] instead of gr[0],
    # which breaks the DataFrame construction (length mismatch).
    return pd.DataFrame(
        {"r": r, "gr_ALL": gr[0], "gr_HDL": gr[1], "gr_LDL": gr[2], "gr_MIX": gr[3]}
    )
def HDL_LDL_concentration(
    trajectory: Coordinates, segments: int = 10000, skip: float = 0.1
) -> pd.DataFrame:
    """
    Distributions of the local HDL and LDL concentrations of water.

    For every HDL (LDL) molecule the fraction of HDL (LDL) molecules among
    its neighbors within 0.5 nm is computed; the distributions of these
    fractions are averaged over frames.

    Args:
        trajectory: Trajectory of the (oxygen) positions to analyse.
        segments (opt.): Number of frames to average over.
        skip (opt.): Fraction of the trajectory skipped at the beginning.

    Returns:
        DataFrame with columns x (concentration bin centers), x_HDL, x_LDL.
    """

    def HDL_LDL_concentration_frame(
        frame: CoordinateFrame, bins: ArrayLike
    ) -> Tuple[NDArray, NDArray]:
        # Bug fix: HDL_LDL_positions(frame, selector) expects an optional
        # selector callable; passing the trajectory there broke the call.
        atoms_HDL, atoms_LDL = HDL_LDL_positions(frame)
        atoms_PBC_HDL = pbc_points(atoms_HDL, frame.box, thickness=0.61)
        atoms_PBC_LDL = pbc_points(atoms_LDL, frame.box, thickness=0.61)
        tree_LDL = KDTree(atoms_PBC_LDL)
        tree_HDL = KDTree(atoms_PBC_HDL)
        dist_HDL_HDL, _ = tree_HDL.query(atoms_HDL, 31, distance_upper_bound=0.6)
        dist_HDL_LDL, _ = tree_LDL.query(atoms_HDL, 30, distance_upper_bound=0.6)
        # the central particle counts toward its own species
        HDL_near_HDL = np.sum(dist_HDL_HDL <= 0.5, axis=-1)
        LDL_near_HDL = np.sum(dist_HDL_LDL <= 0.5, axis=-1)
        x_HDL = HDL_near_HDL / (HDL_near_HDL + LDL_near_HDL)
        x_HDL_dist = np.histogram(x_HDL, bins=bins, range=(0, bins[-1]), density=True)[
            0
        ]
        dist_LDL_LDL, _ = tree_LDL.query(atoms_LDL, 31, distance_upper_bound=0.6)
        dist_LDL_HDL, _ = tree_HDL.query(atoms_LDL, 30, distance_upper_bound=0.6)
        # the central particle counts toward its own species
        LDL_near_LDL = np.sum(dist_LDL_LDL <= 0.5, axis=-1)
        HDL_near_LDL = np.sum(dist_LDL_HDL <= 0.5, axis=-1)
        x_LDL = LDL_near_LDL / (LDL_near_LDL + HDL_near_LDL)
        x_LDL_dist = np.histogram(x_LDL, bins=bins, range=(0, bins[-1]), density=True)[
            0
        ]
        return x_HDL_dist, x_LDL_dist

    bins = np.linspace(0, 1, 21)
    x = bins[1:] - (bins[1] - bins[0]) / 2
    frame_indices = np.unique(
        np.int_(np.linspace(len(trajectory) * skip, len(trajectory) - 1, num=segments))
    )
    # Bug fix: the inner function takes (frame, bins); the extra `trajectory`
    # argument in the old call raised a TypeError.
    local_concentration_dist = np.array(
        [
            HDL_LDL_concentration_frame(trajectory[frame_index], bins)
            for frame_index in frame_indices
        ]
    )
    x_HDL = np.mean(local_concentration_dist[:, 0], axis=0)
    x_LDL = np.mean(local_concentration_dist[:, 1], axis=0)
    return pd.DataFrame({"x": x, "x_HDL": x_HDL, "x_LDL": x_LDL})

View File

@@ -1,43 +1,93 @@
import numpy as np
from numpy.typing import ArrayLike
from scipy.special import gamma as spgamma
from scipy.integrate import quad as spquad
def kww(t, A, τ, β):
return A * np.exp(-((t / τ) ** β))
def kww(t: ArrayLike, A: float, tau: float, beta: float) -> ArrayLike:
return A * np.exp(-((t / tau) ** beta))
def kww_1e(A, τ, β):
return τ * (-np.log(1 / (np.e * A))) ** (1 / β)
def kww_1e(A: float, tau: float, beta: float) -> float:
return tau * (-np.log(1 / (np.e * A))) ** (1 / beta)
def cole_davidson(w, A, b, t0):
P = np.arctan(w * t0)
return A * np.cos(P) ** b * np.sin(b * P)
def cole_davidson(omega: ArrayLike, A: float, beta: float, tau: float) -> ArrayLike:
P = np.arctan(omega * tau)
return A * np.cos(P) ** beta * np.sin(beta * P)
def cole_cole(w, A, b, t0):
def cole_cole(omega: ArrayLike, A: float, beta: float, tau: float) -> ArrayLike:
return (
A
* (w * t0) ** b
* np.sin(np.pi * b / 2)
/ (1 + 2 * (w * t0) ** b * np.cos(np.pi * b / 2) + (w * t0) ** (2 * b))
* (omega * tau) ** beta
* np.sin(np.pi * beta / 2)
/ (
1
+ 2 * (omega * tau) ** beta * np.cos(np.pi * beta / 2)
+ (omega * tau) ** (2 * beta)
)
)
def havriliak_negami(ω, A, β, α, τ):
def havriliak_negami(
omega: ArrayLike, A: float, beta: float, alpha: float, tau: float
) -> ArrayLike:
r"""
Imaginary part of the Havriliak-Negami function.
.. math::
\chi_{HN}(\omega) = \Im\left(\frac{A}{(1 + (i\omega\tau)^\alpha)^\beta}\right)
"""
return -(A / (1 + (1j * ω * τ) ** α) ** β).imag
return -(A / (1 + (1j * omega * tau) ** alpha) ** beta).imag
# fits decay of correlation times, e.g. with distance to pore walls
def colen(d, X, t8, A):
return t8 * np.exp(A * np.exp(-d / X))
def colen(d: ArrayLike, X: float, tau_pc: float, A: float) -> ArrayLike:
return tau_pc * np.exp(A * np.exp(-d / X))
# fits decay of the plateau height of the overlap function, e.g. with distance to pore walls
def colenQ(d, X, Qb, g):
# fits decay of the plateau height of the overlap function,
# e.g. with distance to pore walls
def colenQ(d: ArrayLike, X: float, Qb: float, g: float) -> ArrayLike:
return (1 - Qb) * np.exp(-((d / X) ** g)) + Qb
def vft(T: ArrayLike, tau_0: float, B: float, T_inf: float) -> ArrayLike:
    """Vogel-Fulcher-Tammann law: tau(T) = tau_0 * exp(B / (T - T_inf))."""
    exponent = B / (T - T_inf)
    return tau_0 * np.exp(exponent)
def arrhenius(T: ArrayLike, tau_0: float, E_a: float) -> ArrayLike:
    """Arrhenius law: tau(T) = tau_0 * exp(E_a / T), E_a in temperature units."""
    boltzmann_factor = np.exp(E_a / T)
    return tau_0 * boltzmann_factor
def MLfit(t: ArrayLike, tau: float, A: float, alpha: float) -> ArrayLike:
    """
    Fit function based on the Mittag-Leffler function:
    A * E_alpha(-(t / tau) ** alpha).

    Args:
        t: Time values.
        tau: Relaxation time.
        A: Amplitude.
        alpha: Stretching exponent of the Mittag-Leffler function.
    """

    def MLf(z: ArrayLike, a: float) -> ArrayLike:
        """Mittag-Leffler function"""
        z = np.atleast_1d(z)
        # closed forms for the degenerate orders
        if a == 0:
            return 1 / (1 - z)
        elif a == 1:
            return np.exp(z)
        elif a > 1 or all(z > 0):
            # truncated power series; NOTE(review): convergence for large |z|
            # is not guaranteed with 100 terms — confirm the intended domain
            k = np.arange(100)
            return np.polynomial.polynomial.polyval(z, 1 / spgamma(a * k + 1))

        # a helper for tricky case, from Gorenflo, Loutchko & Luchko
        def _MLf(z: float, a: float) -> ArrayLike:
            if z < 0:
                # integral representation for negative arguments
                f = lambda x: (
                    np.exp(-x * (-z) ** (1 / a))
                    * x ** (a - 1)
                    * np.sin(np.pi * a)
                    / (x ** (2 * a) + 2 * x**a * np.cos(np.pi * a) + 1)
                )
                return 1 / np.pi * spquad(f, 0, np.inf)[0]
            elif z == 0:
                return 1
            else:
                return MLf(z, a)

        return np.vectorize(_MLf)(z, a)

    return A * MLf(-((t / tau) ** alpha), alpha)

View File

@@ -1,68 +1,57 @@
from __future__ import annotations
from collections import OrderedDict
from typing import Optional, Union, TYPE_CHECKING
import numpy as np
from numpy.typing import ArrayLike, NDArray
from scipy.spatial import cKDTree
from itertools import product
from .logging import logger
if TYPE_CHECKING:
from mdevaluate.coordinates import CoordinateFrame
def pbc_diff(
    coords_a: NDArray, coords_b: NDArray, box: Optional[NDArray] = None
) -> NDArray:
    """
    Difference vectors between two coordinate sets, optionally applying
    periodic boundary conditions.

    Args:
        coords_a: First coordinate set.
        coords_b: Second coordinate set.
        box (opt.): Either a 1-d array (rectangular box, edge lengths) or a
            2-d box matrix (triclinic box). If None, the plain difference
            is returned.

    Raises:
        NotImplementedError: If the box argument has neither 1 nor 2 dimensions.
    """
    if box is None:
        out = coords_a - coords_b
    elif len(getattr(box, "shape", [])) == 1:
        # Rectangular box: minimum-image convention per axis.
        out = pbc_diff_rect(coords_a, coords_b, box)
    elif len(getattr(box, "shape", [])) == 2:
        # Full box matrix: triclinic minimum image.
        out = pbc_diff_tric(coords_a, coords_b, box)
    else:
        raise NotImplementedError("cannot handle box")
    return out
def pbc_diff_rect(coords_a: NDArray, coords_b: NDArray, box: NDArray) -> NDArray:
    """
    Difference vectors in a rectangular box under the minimum-image convention.

    Args:
        coords_a: First coordinate set.
        coords_b: Second coordinate set.
        box: Edge lengths of the rectangular box, shape (3,).
    """
    v = coords_a - coords_b
    # Wrap each component into [-box/2, box/2) by removing whole box lengths.
    s = v / box
    v = box * (s - np.round(s))
    return v
def pbc_diff_tric(v1, v2=None, box=None):
def pbc_diff_tric(coords_a: NDArray, coords_b: NDArray, box: NDArray) -> NDArray:
"""
difference vector for arbitrary pbc
Difference vector for arbitrary pbc
Args:
box_matrix: CoordinateFrame.box
"""
if len(box.shape) == 1:
box = np.diag(box)
if v1.shape == (3,):
v1 = v1.reshape((1, 3)) # quick 'n dirty
if v2.shape == (3,):
v2 = v2.reshape((1, 3))
if coords_a.shape == (3,):
coords_a = coords_a.reshape((1, 3)) # quick 'n dirty
if coords_b.shape == (3,):
coords_b = coords_b.reshape((1, 3))
if box is not None:
r3 = np.subtract(v1, v2)
r3 = np.subtract(coords_a, coords_b)
r2 = np.subtract(
r3,
(np.rint(np.divide(r3[:, 2], box[2][2])))[:, np.newaxis]
@@ -79,68 +68,17 @@ def pbc_diff_tric(v1, v2=None, box=None):
* box[0][np.newaxis, :],
)
else:
v = v1 - v2
v = coords_a - coords_b
return v
def pbc_dist(
    atoms_a: NDArray, atoms_b: NDArray, box: Optional[NDArray] = None
) -> ArrayLike:
    """
    Pairwise euclidean distances between two coordinate sets, honoring
    periodic boundary conditions if a box is given.

    Args:
        atoms_a: First coordinate set, shape (N, 3).
        atoms_b: Second coordinate set, shape (N, 3).
        box (opt.): Box description forwarded to :func:`pbc_diff`.
    """
    return ((pbc_diff(atoms_a, atoms_b, box) ** 2).sum(axis=1)) ** 0.5
def pbc_extend(c, box):
    """
    Return every point together with its 26 periodic images.

    Args:
        c: Frame coordinates, shape (N, 3); a single (3,) point is promoted.
        box: Full box matrix, shape (3, 3).

    Returns:
        Array of shape (N, 27, 3).
    """
    c = np.asarray(c)
    if c.shape == (3,):
        c = c.reshape((1, 3))  # promote a single point to a one-row frame
    # All 27 integer shift triples (0/-1/+1 per axis), original box first.
    shifts = np.array([np.asarray(i) for i in product([0, -1, 1], repeat=3)])
    image_offsets = (shifts[:, :, np.newaxis] * box[np.newaxis, :, :]).sum(axis=1)
    return c[:, np.newaxis, :] + image_offsets[np.newaxis, :, :]
def pbc_kdtree(v1, box, leafsize=32, compact_nodes=False, balanced_tree=False):
    """
    Build a cKDTree over the points together with all their periodic images.

    Args:
        v1: Point coordinates.
        box: Full box matrix.
        leafsize, compact_nodes, balanced_tree: Optional cKDTree tuning
            parameters, passed through unchanged.
    """
    extended = pbc_extend(v1, box).reshape((-1, 3))
    return cKDTree(extended, leafsize, compact_nodes, balanced_tree)
def pbc_kdtree_query(v1, v2, box, n=1):
    """
    Nearest-neighbor query on the periodic kd-tree.

    Returns distances and indices; each original point has 27 image copies
    in the tree, so indices are divided by 27 to map back to the originals.
    """
    distances, indices = pbc_kdtree(v1, box).query(v2, n)
    return distances, indices // 27
def pbc_backfold_rect(act_frame, box_matrix):
    """
    mimics "trjconv ... -pbc atom -ur rect"

    Folds coordinates of act_frame into the cuboid box.

    Args:
        act_frame: Coordinates, shape (N, 3); a single (3,) point is promoted.
        box_matrix: Full box matrix; only its diagonal is used here.
    """
    af = np.asarray(act_frame)
    if af.shape == (3,):
        # BUG FIX: the reshaped array must be kept in `af` (the variable used
        # below); previously the result was assigned to `act_frame` and lost.
        af = af.reshape((1, 3))  # quick 'n dirty
    b = box_matrix
    c = np.diag(b) / 2
    # Minimum-image displacement from the box center, then shift back.
    af = pbc_diff(np.zeros((1, 3)), af - c, b)
    return af + c
def pbc_backfold_compact(act_frame, box_matrix):
def pbc_backfold_compact(act_frame: NDArray, box_matrix: NDArray) -> NDArray:
"""
mimics "trjconv ... -pbc atom -ur compact"
@@ -160,11 +98,11 @@ def pbc_backfold_compact(act_frame, box_matrix):
b_matrices = comb[:, :, np.newaxis] * box[np.newaxis, :, :]
b_vectors = b_matrices.sum(axis=1)[np.newaxis, :, :]
sc = c[:, np.newaxis, :] + b_vectors
w = np.argsort((((sc) - ctr) ** 2).sum(2), 1)[:, 0]
w = np.argsort(((sc - ctr) ** 2).sum(2), 1)[:, 0]
return sc[range(shape[0]), w]
def whole(frame):
def whole(frame: CoordinateFrame) -> CoordinateFrame:
"""
Apply ``-pbc whole`` to a CoordinateFrame.
"""
@@ -191,7 +129,7 @@ def whole(frame):
NOJUMP_CACHESIZE = 128
def nojump(frame, usecache=True):
def nojump(frame: CoordinateFrame, usecache: bool = True) -> CoordinateFrame:
"""
Return the nojump coordinates of a frame, based on a jump matrix.
"""
@@ -215,7 +153,7 @@ def nojump(frame, usecache=True):
delta
+ np.array(
np.vstack(
[m[i0 : abstep + 1].sum(axis=0) for m in reader.nojump_matrixes]
[m[i0 : abstep + 1].sum(axis=0) for m in reader.nojump_matrices]
).T
)
* frame.box.diagonal()
@@ -231,7 +169,7 @@ def nojump(frame, usecache=True):
np.vstack(
[
m[: frame.step + 1, selection].sum(axis=0)
for m in reader.nojump_matrixes
for m in reader.nojump_matrices
]
).T
)
@@ -240,15 +178,23 @@ def nojump(frame, usecache=True):
return frame - delta
def pbc_points(coordinates, box, thickness=0, index=False, shear=False):
def pbc_points(
coordinates: ArrayLike,
box: Optional[NDArray] = None,
thickness: Optional[float] = None,
index: bool = False,
shear: bool = False,
) -> Union[NDArray, tuple[NDArray, NDArray]]:
"""
Returns the points their first periodic images. Does not fold
them back into the box.
Thickness 0 means all 27 boxes. Positive means the box+thickness.
Negative values mean that less than the box is returned.
index=True also returns the indices with indices of images being their
originals values.
original values.
"""
if box is None:
box = coordinates.box
if shear:
box[2, 0] = box[2, 0] % box[0, 0]
# Shifts the box images in the other directions if they moved more than
@@ -263,7 +209,7 @@ def pbc_points(coordinates, box, thickness=0, index=False, shear=False):
coordinates_pbc = np.concatenate([coordinates + v @ box for v in grid], axis=0)
size = np.diag(box)
if thickness != 0:
if thickness is not None:
mask = np.all(coordinates_pbc > -thickness, axis=1)
coordinates_pbc = coordinates_pbc[mask]
indices = indices[mask]

View File

@@ -2,13 +2,8 @@
Module that provides different readers for trajectory files.
It also provides a common interface layer between the file IO packages,
namely pygmx and mdanalysis, and mdevaluate.
namely mdanalysis, and mdevaluate.
"""
from .checksum import checksum
from .logging import logger
from . import atoms
from functools import lru_cache
from collections import namedtuple
import os
from os import path
@@ -19,9 +14,19 @@ import re
import itertools
import numpy as np
import MDAnalysis as mdanalysis
import numpy.typing as npt
import MDAnalysis
from scipy import sparse
from .checksum import checksum
from .logging import logger
from . import atoms
from .coordinates import Coordinates
CSR_ATTRS = ("data", "indices", "indptr")
NOJUMP_MAGIC = 2016
Group_RE = re.compile("\[ ([-+\w]+) \]")
class NojumpError(Exception):
pass
@@ -31,19 +36,50 @@ class WrongTopologyError(Exception):
pass
class BaseReader:
    """Base class for trajectory readers."""

    @property
    def filename(self):
        # Delegate to the wrapped trajectory reader.
        return self.rd.filename

    @property
    def nojump_matrices(self):
        # Sparse jump matrices (one per spatial dimension); only available
        # after they have been generated or loaded from disk.
        if self._nojump_matrices is None:
            raise NojumpError("Nojump Data not available: {}".format(self.filename))
        return self._nojump_matrices

    @nojump_matrices.setter
    def nojump_matrices(self, mats):
        self._nojump_matrices = mats

    def __init__(self, rd):
        """
        Args:
            rd: The wrapped trajectory reader (an MDAnalysis reader object).
        """
        self.rd = rd
        self._nojump_matrices = None
        # Eagerly load previously generated nojump data if present on disk.
        if path.exists(nojump_load_filename(self)):
            load_nojump_matrices(self)

    def __getitem__(self, item):
        # Frame access is forwarded to the wrapped reader.
        return self.rd[item]

    def __len__(self):
        return len(self.rd)

    def __checksum__(self):
        # Checksum over the filename and the XDR frame offsets, so a changed
        # trajectory file invalidates cached derived data.
        cache = array("L", self.rd._xdr.offsets.tobytes())
        return checksum(self.filename, str(cache))
def open_with_mdanalysis(
topology, trajectory, cached=False, index_file=None, charges=None, masses=None
):
topology: str,
trajectory: str,
index_file: str = None,
charges: npt.ArrayLike = None,
masses: npt.ArrayLike = None,
) -> (atoms.Atoms, BaseReader):
"""Open the topology and trajectory with mdanalysis."""
uni = mdanalysis.Universe(topology, trajectory, convert_units=False)
if cached is not False:
if cached is True:
maxsize = 128
else:
maxsize = cached
reader = CachedReader(uni.trajectory, maxsize)
else:
reader = BaseReader(uni.trajectory)
uni = MDAnalysis.Universe(topology, trajectory, convert_units=False)
reader = BaseReader(uni.trajectory)
reader.universe = uni
if topology.endswith(".tpr"):
charges = uni.atoms.charges
@@ -67,15 +103,12 @@ def open_with_mdanalysis(
return atms, reader
group_re = re.compile("\[ ([-+\w]+) \]")
def load_indices(index_file):
def load_indices(index_file: str):
indices = {}
index_array = None
with open(index_file) as idx_file:
for line in idx_file:
m = group_re.search(line)
m = Group_RE.search(line)
if m is not None:
group_name = m.group(1)
index_array = indices.get(group_name, [])
@@ -89,7 +122,7 @@ def load_indices(index_file):
return indices
def is_writeable(fname):
def is_writeable(fname: str):
"""Test if a directory is actually writeable, by writing a temporary file."""
fdir = os.path.dirname(fname)
ftmp = os.path.join(fdir, str(np.random.randint(999999999)))
@@ -107,7 +140,7 @@ def is_writeable(fname):
return False
def nojump_load_filename(reader):
def nojump_load_filename(reader: BaseReader):
directory, fname = path.split(reader.filename)
full_path = path.join(directory, ".{}.nojump.npz".format(fname))
if not is_writeable(directory):
@@ -123,7 +156,7 @@ def nojump_load_filename(reader):
return full_path
else:
user_data_dir = os.path.join("/data/", os.environ["HOME"].split("/")[-1])
full_path_fallback = os.path.join(
full_path = os.path.join(
os.path.join(user_data_dir, ".mdevaluate/nojump"),
directory.lstrip("/"),
".{}.nojump.npz".format(fname),
@@ -131,7 +164,7 @@ def nojump_load_filename(reader):
return full_path
def nojump_save_filename(reader):
def nojump_save_filename(reader: BaseReader):
directory, fname = path.split(reader.filename)
full_path = path.join(directory, ".{}.nojump.npz".format(fname))
if is_writeable(directory):
@@ -152,11 +185,7 @@ def nojump_save_filename(reader):
return full_path_fallback
CSR_ATTRS = ("data", "indices", "indptr")
NOJUMP_MAGIC = 2016
def parse_jumps(trajectory):
def parse_jumps(trajectory: Coordinates):
prev = trajectory[0].whole
box = prev.box.diagonal()
SparseData = namedtuple("SparseData", ["data", "row", "col"])
@@ -180,28 +209,28 @@ def parse_jumps(trajectory):
return jump_data
def generate_nojump_matrices(trajectory: Coordinates):
    """
    Create the matrices with pbc jumps for a trajectory.

    One sparse (frames x atoms) matrix is built per spatial dimension and
    attached to the trajectory's reader, then persisted to disk.
    """
    logger.info("generate Nojump matrices for: {}".format(trajectory))
    jump_data = parse_jumps(trajectory)
    N = len(trajectory)  # number of frames
    M = len(trajectory[0])  # number of atoms
    trajectory.frames.nojump_matrices = tuple(
        sparse.csr_matrix((np.array(m.data), (m.row, m.col)), shape=(N, M))
        for m in jump_data
    )
    save_nojump_matrices(trajectory.frames)
def save_nojump_matrixes(reader, matrixes=None):
if matrixes is None:
matrixes = reader.nojump_matrixes
def save_nojump_matrices(reader: BaseReader, matrices: npt.ArrayLike = None):
if matrices is None:
matrices = reader.nojump_matrices
data = {"checksum": checksum(NOJUMP_MAGIC, checksum(reader))}
for d, mat in enumerate(matrixes):
for d, mat in enumerate(matrices):
data["shape"] = mat.shape
for attr in CSR_ATTRS:
data["{}_{}".format(attr, d)] = getattr(mat, attr)
@@ -209,18 +238,19 @@ def save_nojump_matrixes(reader, matrixes=None):
np.savez(nojump_save_filename(reader), **data)
def load_nojump_matrixes(reader):
def load_nojump_matrices(reader: BaseReader):
zipname = nojump_load_filename(reader)
try:
data = np.load(zipname, allow_pickle=True)
except (AttributeError, BadZipFile, OSError):
# npz-files can be corrupted, propably a bug for big arrays saved with savez_compressed?
# npz-files can be corrupted, probably a bug for big arrays saved with
# savez_compressed?
logger.info("Removing zip-File: %s", zipname)
os.remove(nojump_load_filename(reader))
return
try:
if data["checksum"] == checksum(NOJUMP_MAGIC, checksum(reader)):
reader.nojump_matrixes = tuple(
reader.nojump_matrices = tuple(
sparse.csr_matrix(
tuple(data["{}_{}".format(attr, d)] for attr in CSR_ATTRS),
shape=data["shape"],
@@ -228,7 +258,7 @@ def load_nojump_matrixes(reader):
for d in range(3)
)
logger.info(
"Loaded Nojump Matrixes: {}".format(nojump_load_filename(reader))
"Loaded Nojump matrices: {}".format(nojump_load_filename(reader))
)
else:
logger.info("Invlaid Nojump Data: {}".format(nojump_load_filename(reader)))
@@ -238,92 +268,19 @@ def load_nojump_matrixes(reader):
return
def correct_nojump_matrices_for_whole(trajectory: Coordinates):
    """
    Adjust the first row of the nojump matrices so that they are consistent
    with the (pbc-)whole representation of the first frame, then save them.
    """
    reader = trajectory.frames
    frame = trajectory[0]
    box = frame.box.diagonal()
    # Integer box shifts between the raw and the whole first frame.
    cor = ((frame - frame.whole) / box).round().astype(np.int8)
    for d in range(3):
        reader.nojump_matrices[d][0] = cor[:, d]
    save_nojump_matrices(reader)
def energy_reader(file: str):
    """Reads a gromacs energy file with mdanalysis and returns an auxiliary file.

    Args:
        file: Filename of the energy file
    """
    return MDAnalysis.auxiliary.EDR.EDRReader(file)
class BaseReader:
    """Base class for trajectory readers."""

    @property
    def filename(self):
        # Delegate to the wrapped trajectory reader.
        return self.rd.filename

    @property
    def nojump_matrixes(self):
        # Sparse jump matrices (one per spatial dimension); only available
        # after they have been generated or loaded from disk.
        if self._nojump_matrixes is None:
            raise NojumpError("Nojump Data not available: {}".format(self.filename))
        return self._nojump_matrixes

    @nojump_matrixes.setter
    def nojump_matrixes(self, mats):
        self._nojump_matrixes = mats

    def __init__(self, rd):
        """
        Args:
            rd: The wrapped trajectory reader object.
        """
        self.rd = rd
        self._nojump_matrixes = None
        # Eagerly load previously generated nojump data if present on disk.
        if path.exists(nojump_load_filename(self)):
            load_nojump_matrixes(self)

    def __getitem__(self, item):
        # Frame access is forwarded to the wrapped reader.
        return self.rd[item]

    def __len__(self):
        return len(self.rd)

    def __checksum__(self):
        # Checksum derived from the underlying reader's frame index so a
        # changed trajectory invalidates cached derived data.
        if hasattr(self.rd, "cache"):
            # Has an pygmx reader
            return checksum(self.filename, str(self.rd.cache))
        elif hasattr(self.rd, "_xdr"):
            # Has an mdanalysis reader
            cache = array("L", self.rd._xdr.offsets.tobytes())
            return checksum(self.filename, str(cache))
class CachedReader(BaseReader):
    """A reader that has a least-recently-used cache for frames."""

    @property
    def cache_info(self):
        """Get Information about the lru cache."""
        return self._get_item.cache_info()

    def clear_cache(self):
        """Clear the cache of the frames."""
        self._get_item.cache_clear()

    def __init__(self, rd, maxsize):
        """
        Args:
            rd: The wrapped trajectory reader object.
            maxsize: Maximum size of the lru_cache or None for infinite cache.
        """
        super().__init__(rd)
        # Shadow the class method with a per-instance, lru_cache-wrapped bound
        # method, so each reader instance gets its own frame cache.
        self._get_item = lru_cache(maxsize=maxsize)(self._get_item)

    def _get_item(self, item):
        """Buffer function for lru_cache, since __getitem__ can not be cached."""
        return super().__getitem__(item)

    def __getitem__(self, item):
        return self._get_item(item)
return MDAnalysis.auxiliary.EDR.EDRReader(file)

58
src/mdevaluate/system.py Normal file
View File

@@ -0,0 +1,58 @@
import abc
from dataclasses import dataclass, field
from subprocess import run
from typing import Iterable
import pandas as pd
from tables import NoSuchNodeError
@dataclass(kw_only=True)
class MDSystem(abc.ABC):
    """
    Abstract base class for a simulated system whose analysis results are
    stored in an HDF5 file at ``<system_dir>/out/results.h5``.
    """

    # When True, callers are expected to only read stored results.
    load_only_results: bool = False
    # Filled in by concrete subclasses; not an __init__ argument.
    system_dir: str = field(init=False)

    @abc.abstractmethod
    def _add_description(self, data: pd.DataFrame) -> pd.DataFrame:
        """Attach system-identifying columns to a result DataFrame."""
        pass

    def save_results(self, data: pd.DataFrame, key: str) -> None:
        """Describe the DataFrame and write it to the results file under key."""
        described = self._add_description(data)
        described.to_hdf(
            f"{self.system_dir}/out/results.h5",
            key=key,
            complevel=9,
            complib="blosc",
        )

    def load_results(self, key: str) -> pd.DataFrame:
        """Read the object stored under key; raise if it is not a DataFrame."""
        result = pd.read_hdf(f"{self.system_dir}/out/results.h5", key=key)
        if not isinstance(result, pd.DataFrame):
            raise TypeError("Result is not a DataFrame!")
        return result

    def cleanup_results(self) -> None:
        """Repack the results file with ptrepack to reclaim deleted space."""
        source = f"{self.system_dir}/out/results.h5"
        target = f"{self.system_dir}/out/results_temp.h5"
        run(
            [
                "ptrepack",
                "--chunkshape=auto",
                "--propindexes",
                "--complevel=9",
                "--complib=blosc",
                source,
                target,
            ]
        )
        run(["mv", target, source])
def load_and_concat_data(systems: Iterable[MDSystem], key: str, verbose: bool = False):
    """
    Load the results stored under key from several systems and concatenate
    them into one DataFrame; systems without that result are skipped.
    """
    collected = []
    for system in systems:
        try:
            frame = system.load_results(key=key)
        except (FileNotFoundError, KeyError, NoSuchNodeError):
            continue
        collected.append(frame)
        if verbose:
            print(f"Load {system}")
    return pd.concat(collected, ignore_index=True)

View File

@@ -2,20 +2,23 @@
Collection of utility functions.
"""
import functools
from types import FunctionType
from time import time as pytime
from subprocess import run
from typing import Callable, Optional, Union
import numpy as np
from numpy.typing import ArrayLike, NDArray
import pandas as pd
from .functions import kww, kww_1e
from scipy.ndimage import uniform_filter1d
from scipy.interpolate import interp1d
from scipy.optimize import curve_fit
from .logging import logger
from .functions import kww, kww_1e
def five_point_stencil(xdata, ydata):
def five_point_stencil(xdata: ArrayLike, ydata: ArrayLike) -> ArrayLike:
"""
Calculate the derivative dy/dx with a five point stencil.
This algorith is only valid for equally distributed x values.
@@ -25,7 +28,8 @@ def five_point_stencil(xdata, ydata):
ydata: y values of the data points
Returns:
Values where the derivative was estimated and the value of the derivative at these points.
Values where the derivative was estimated and the value of the derivative at
these points.
This algorithm is only valid for values on a regular grid, for unevenly distributed
data it is only an approximation, albeit a quite good one.
@@ -39,32 +43,32 @@ def five_point_stencil(xdata, ydata):
def filon_fourier_transformation(
time,
correlation,
frequencies=None,
derivative="linear",
imag=True,
):
time: NDArray,
correlation: NDArray,
frequencies: Optional[NDArray] = None,
derivative: Union[str, NDArray] = "linear",
imag: bool = True,
) -> tuple[NDArray, NDArray]:
"""
Fourier-transformation for slow varrying functions. The filon algorithmus is
Fourier-transformation for slow varying functions. The filon algorithm is
described in detail in ref [Blochowicz]_, ch. 3.2.3.
Args:
time: List of times where the correlation function was sampled.
time: List of times when the correlation function was sampled.
correlation: Values of the correlation function.
frequencies (opt.):
List of frequencies where the fourier transformation will be calculated.
If None the frequencies will be choosen based on the input times.
If None the frequencies will be chosen based on the input times.
derivative (opt.):
Approximation algorithmus for the derivative of the correlation function.
Approximation algorithm for the derivative of the correlation function.
Possible values are: 'linear', 'stencil' or a list of derivatives.
imag (opt.): If imaginary part of the integral should be calculated.
If frequencies are not explicitly given they will be evenly placed on a log scale
If frequencies are not explicitly given, they will be evenly placed on a log scale
in the interval [1/tmax, 0.1/tmin] where tmin and tmax are the smallest respectively
the biggest time (greater than 0) of the provided times. The frequencies are cut off
at high values by one decade, since the fourier transformation deviates quite strongly
in this regime.
at high values by one decade, since the fourier transformation deviates quite
strongly in this regime.
.. [Blochowicz]
T. Blochowicz, Broadband dielectric spectroscopy in neat and binary
@@ -82,11 +86,12 @@ def filon_fourier_transformation(
_, derivative = five_point_stencil(time, correlation)
time = ((time[2:-1] * time[1:-2]) ** 0.5).reshape(-1, 1)
derivative = derivative.reshape(-1, 1)
elif np.iterable(derivative) and len(time) is len(derivative):
elif isinstance(derivative, NDArray) and len(time) is len(derivative):
derivative.reshape(-1, 1)
else:
raise NotImplementedError(
'Invalid approximation method {}. Possible values are "linear", "stencil" or a list of values.'
'Invalid approximation method {}. Possible values are "linear", "stencil" '
"or a list of values."
)
time = time.reshape(-1, 1)
@@ -107,34 +112,12 @@ def filon_fourier_transformation(
+ 1j * correlation[0] / frequencies
)
return (
frequencies.reshape(
-1,
),
fourier,
)
return frequencies.reshape(-1), fourier
def mask2indices(mask):
    """
    Return the selected indices of an array mask.

    For a 1-d mask the raw ``np.where`` result (a one-tuple of an index
    array) is returned; for a 2-d mask the indices are computed per row and
    stacked into one array.
    """
    arr = np.array(mask)
    if arr.ndim == 1:
        return np.where(arr)
    return np.array([np.where(row) for row in arr])
def superpose(x1, y1, x2, y2, N=100, damping=1.0):
def superpose(
x1: NDArray, y1: NDArray, x2: NDArray, y2: NDArray, damping: float = 1.0
) -> tuple[NDArray, NDArray]:
if x2[0] == 0:
x2 = x2[1:]
y2 = y2[1:]
@@ -142,12 +125,12 @@ def superpose(x1, y1, x2, y2, N=100, damping=1.0):
reg1 = x1 < x2[0]
reg2 = x2 > x1[-1]
x_ol = np.logspace(
np.log10(max(x1[~reg1][0], x2[~reg2][0]) + 0.001),
np.log10(min(x1[~reg1][-1], x2[~reg2][-1]) - 0.001),
(sum(~reg1) + sum(~reg2)) / 2,
np.log10(np.max(x1[~reg1][0], x2[~reg2][0]) + 0.001),
np.log10(np.min(x1[~reg1][-1], x2[~reg2][-1]) - 0.001),
(np.sum(~reg1) + np.sum(~reg2)) / 2,
)
def w(x):
def w(x: NDArray) -> NDArray:
A = x_ol.min()
B = x_ol.max()
return (np.log10(B / x) / np.log10(B / A)) ** damping
@@ -165,21 +148,7 @@ def superpose(x1, y1, x2, y2, N=100, damping=1.0):
return xdata, ydata
def runningmean(data, nav):
    """
    Compute the running mean of a 1-dimensional array.

    Args:
        data: Input data of shape (N, )
        nav: Number of points over which the data will be averaged

    Returns:
        Array of shape (N-(nav-1), )
    """
    window = np.ones((nav,)) / nav
    return np.convolve(data, window, mode="valid")
def moving_average(data: NDArray, n: int = 3) -> NDArray:
    """
    Compute the running mean of an array via a uniform filter.
    Uses the second axis if it is of higher dimensionality.

    Args:
        data: Input data of shape (N,), or (M, N) for 2D input.
        n: Number of points over which the data will be averaged.

    Returns:
        Array of shape (N-(n-1), ); edge samples dominated by the filter's
        boundary handling are trimmed off.

    Supports 2D-Arrays.
    """
    k1 = int(n / 2)
    k2 = int((n - 1) / 2)
    if k2 == 0:
        # n <= 2: nothing to trim on the right side
        if data.ndim > 1:
            return uniform_filter1d(data, n)[:, k1:]
        return uniform_filter1d(data, n)[k1:]
    if data.ndim > 1:
        return uniform_filter1d(data, n)[:, k1:-k2]
    return uniform_filter1d(data, n)[k1:-k2]
def coherent_sum(
    func: Callable[[ArrayLike, ArrayLike], float],
    coord_a: ArrayLike,
    coord_b: ArrayLike,
) -> float:
    """
    Perform a coherent sum over two arrays :math:`A, B`.

    .. math::
      \\sum_i\\sum_j f(A_i, B_j)

    For numpy arrays, this is equal to::

        N, d = x.shape
        M, d = y.shape
        coherent_sum(f, x, y) == f(x.reshape(N, 1, d), x.reshape(1, M, d)).sum()

    Args:
        func: The function is called for each two items in both arrays, this should
            return a scalar value.
        coord_a: First array.
        coord_b: Second array.
    """
    res = 0
    for i in range(len(coord_a)):
        for j in range(len(coord_b)):
            res += func(coord_a[i], coord_b[j])
    return res
def coherent_histogram(func, coord_a, coord_b, bins, distinct=False):
def coherent_histogram(
func: Callable[[ArrayLike, ArrayLike], float],
coord_a: ArrayLike,
coord_b: ArrayLike,
bins: ArrayLike,
distinct: bool = False,
) -> NDArray:
"""
Compute a coherent histogram over two arrays, equivalent to coherent_sum.
For numpy arrays ofthis is equal to::
For numpy arrays, this is equal to::
N, d = x.shape
M, d = y.shape
bins = np.arange(1, 5, 0.1)
coherent_histogram(f, x, y, bins) == histogram(f(x.reshape(N, 1, d), x.reshape(1, M, d)), bins=bins)
coherent_histogram(f, x, y, bins) == histogram(
f(x.reshape(N, 1, d), x.reshape(1, M, d)), bins=bins
)
Args:
func: The function is called for each two items in both arrays, this should return a scalar value.
coord_a, coord_b: The two arrays.
bins: The bins used for the histogram must be distributed regular on a linear scale.
func: The function is called for each two items in both arrays, this should
return a scalar value.
coord_a: First array.
coord_b: Second array.
bins: The bins used for the histogram must be distributed regularly on a linear
scale.
distinct: Only calculate distinct part.
"""
if isinstance(func, FunctionType):
func = numba.jit(func, nopython=True, cache=True)
assert np.isclose(
np.diff(bins).mean(), np.diff(bins)
).all(), "A regular distribution of bins is required."
@@ -263,44 +240,46 @@ def coherent_histogram(func, coord_a, coord_b, bins, distinct=False):
N = len(bins) - 1
dh = (hmax - hmin) / N
def cohsum(coord_a, coord_b):
res = np.zeros((N,))
for i in range(len(coord_a)):
for j in range(len(coord_b)):
if not (distinct and i == j):
h = func(coord_a[i], coord_b[j])
if hmin <= h < hmax:
res[int((h - hmin) / dh)] += 1
return res
return cohsum(coord_a, coord_b)
res = np.zeros((N,))
for i in range(len(coord_a)):
for j in range(len(coord_b)):
if not (distinct and i == j):
h = func(coord_a[i], coord_b[j])
if hmin <= h < hmax:
res[int((h - hmin) / dh)] += 1
return res
def Sq_from_gr(r: NDArray, gr: NDArray, q: NDArray, n: float) -> NDArray:
    r"""
    Compute the static structure factor as fourier transform of the pair correlation
    function. [Yarnell]_

    .. math::
        S(q)-1 = \\frac{4\\pi\\rho}{q}\\int\\limits_0^\\infty (g(r)-1)\\,r \\sin(qr) dr

    Args:
        r: Radii of the pair correlation function
        gr: Values of the pair correlation function
        q: List of q values
        n: Average number density

    .. [Yarnell]
        Yarnell, J. L., Katz, M. J., Wenzel, R. G., & Koenig, S. H. (1973). Physical
        Review A, 7(6), 2130-2144.
        http://doi.org/10.1017/CBO9781107415324.004
    """
    # Integrand (g(r)-1) * r * sin(q r), evaluated on the (r, q) grid.
    ydata = ((gr - 1) * r).reshape(-1, 1) * np.sin(r.reshape(-1, 1) * q.reshape(1, -1))
    return np.trapz(x=r, y=ydata, axis=0) * (4 * np.pi * n / q) + 1
def Fqt_from_Grt(data, q):
def Fqt_from_Grt(
data: Union[pd.DataFrame, ArrayLike], q: ArrayLike
) -> Union[pd.DataFrame, tuple[NDArray, NDArray]]:
"""
Calculate the ISF from the van Hove function for a given q value by fourier transform.
Calculate the ISF from the van Hove function for a given q value by fourier
transform.
.. math::
F_q(t) = \\int\\limits_0^\\infty dr \\; G(r, t) \\frac{\\sin(qr)}{qr}
@@ -312,8 +291,9 @@ def Fqt_from_Grt(data, q):
q: Value of q.
Returns:
If input data was a dataframe the result will be returned as one too, else two arrays
will be returned, which will contain times and values of Fq(t) respectively.
If input data was a dataframe the result will be returned as one too, else two
arrays will be returned, which will contain times and values of Fq(t)
respectively.
"""
if isinstance(data, pd.DataFrame):
@@ -328,8 +308,11 @@ def Fqt_from_Grt(data, q):
return isf.index, isf.values
def singledispatchmethod(func):
"""A decorator to define a genric instance method, analogue to functools.singledispatch."""
def singledispatchmethod(func: Callable) -> Callable:
"""
A decorator to define a genric instance method, analogue to
functools.singledispatch.
"""
dispatcher = functools.singledispatch(func)
def wrapper(*args, **kw):
@@ -340,25 +323,13 @@ def singledispatchmethod(func):
return wrapper
def histogram(data, bins):
    """
    Compute the histogram of the given data.

    Falls back to numpy.histogram, but uses the faster numpy.bincount when
    the bins start at zero and are (numerically) equally spaced.
    """
    widths = np.diff(bins)
    dx = widths.mean()
    regular_from_zero = bins.min() == 0 and widths.std() < 1e-6
    if regular_from_zero:
        logger.debug("Using numpy.bincount for histogramm compuation.")
        nbins = len(widths)
        hist = np.bincount((data // dx).astype(int), minlength=nbins)[:nbins]
    else:
        hist = np.histogram(data, bins=bins)[0]
    # Bin centers via a two-point running mean over the edges.
    return hist, runningmean(bins, 2)
def quick1etau(t, C, n=7):
def quick1etau(t: ArrayLike, C: ArrayLike, n: int = 7) -> float:
"""
Estimate the time for a correlation function that goes from 1 to 0 to decay to 1/e.
If successful, returns tau as fine interpolation with a kww fit.
The data is reduce to points around 1/e to remove short and long times from the kww fit!
The data is reduce to points around 1/e to remove short and long times from the kww
fit!
t is the time
C is C(t) the correlation function
n is the minimum number of points around 1/e required
@@ -384,3 +355,124 @@ def quick1etau(t, C, n=7):
except:
pass
return tau_est
def susceptibility(
    time: NDArray, correlation: NDArray, **kwargs
) -> tuple[NDArray, NDArray]:
    """
    Calculate the susceptibility of a correlation function.

    Args:
        time: Timesteps of the correlation data
        correlation: Value of the correlation function
    """
    freqs, transformed = filon_fourier_transformation(
        time, correlation, imag=False, **kwargs
    )
    return freqs, freqs * transformed
def read_gro(file: str) -> tuple[pd.DataFrame, NDArray, str]:
    """
    Parse a gromacs .gro file.

    Returns:
        A DataFrame with one row per atom, the 3x3 box matrix and the
        description line (first line of the file).
    """
    with open(file, "r") as fh:
        lines = fh.readlines()
    description = lines[0].splitlines()[0]
    fields = lines[-1].split()
    if len(fields) == 3:
        # Rectangular box: only the diagonal is given.
        box = np.array(
            [[fields[0], 0, 0], [0, fields[1], 0], [0, 0, fields[2]]], dtype=float
        )
    else:
        # Triclinic box: gro order is xx yy zz xy xz yx yz zx zy.
        box = np.array(
            [
                [fields[0], fields[3], fields[4]],
                [fields[5], fields[1], fields[6]],
                [fields[7], fields[8], fields[2]],
            ],
            dtype=float,
        )
    # Fixed-width columns per the gro format: %5d%-5s%5s%5d%8.3f%8.3f%8.3f
    columns = np.genfromtxt(
        file,
        delimiter=[5, 5, 5, 5, 8, 8, 8],
        dtype="i8,U5,U5,i8,f8,f8,f8",
        skip_header=2,
        skip_footer=1,
        unpack=True,
    )
    frame = pd.DataFrame(
        {
            "residue_id": columns[0],
            "residue_name": columns[1],
            "atom_name": columns[2],
            "atom_id": columns[3],
            "pos_x": columns[4],
            "pos_y": columns[5],
            "pos_z": columns[6],
        }
    )
    return frame, box, description
def write_gro(
    file: str, atoms_DF: pd.DataFrame, box: NDArray, description: str
) -> None:
    """
    Write a gromacs .gro file from an atom DataFrame, a 3x3 box matrix and a
    description line.
    """
    with open(file, "w") as out:
        out.write(f"{description} \n")
        out.write(f"{len(atoms_DF)}\n")
        # One fixed-width record per atom: %5d%-5s%5s%5d%8.3f%8.3f%8.3f
        for _, row in atoms_DF.iterrows():
            out.write(
                f"{row['residue_id']:>5}{row['residue_name']:<5}"
                f"{row['atom_name']:>5}{row['atom_id']:>5}"
                f"{row['pos_x']:8.3f}{row['pos_y']:8.3f}"
                f"{row['pos_z']:8.3f}\n"
            )
        # Box line: diagonal first, then the off-diagonal elements.
        out.write(
            f"{box[0,0]:10.5f}{box[1,1]:10.5f}{box[2,2]:10.5f}"
            f"{box[0,1]:10.5f}{box[0,2]:10.5f}{box[1,0]:10.5f}"
            f"{box[1,2]:10.5f}{box[2,0]:10.5f}{box[2,1]:10.5f}\n"
        )
def fibonacci_sphere(samples: int = 1000) -> NDArray:
    """
    Return `samples` points approximately evenly distributed on the unit
    sphere, using the golden-angle (Fibonacci) spiral construction.
    """
    golden_angle = np.pi * (np.sqrt(5.0) - 1.0)  # golden angle in radians
    points = []
    for idx in range(samples):
        y = 1 - (idx / float(samples - 1)) * 2  # y runs from 1 down to -1
        ring_radius = np.sqrt(1 - y * y)  # circle radius at height y
        angle = golden_angle * idx
        points.append((np.cos(angle) * ring_radius, y, np.sin(angle) * ring_radius))
    return np.array(points)
def timing(function: Callable) -> Callable:
    """Decorator that prints the wall-clock runtime of each call."""

    @functools.wraps(function)
    def wrap(*args, **kw):
        started = pytime()
        result = function(*args, **kw)
        elapsed = pytime() - started
        print(f"Finished in {int(elapsed // 60)} min " f"{int(elapsed % 60)} s")
        return result

    return wrap
def cleanup_h5(hdf5_file: str) -> None:
    """
    Repack an HDF5 file in place with ptrepack (blosc, level 9) to reclaim
    space from deleted nodes, then move the repacked file over the original.
    """
    temp_file = f"{hdf5_file[:-3]}_temp.h5"
    repack_cmd = [
        "ptrepack",
        "--chunkshape=auto",
        "--propindexes",
        "--complevel=9",
        "--complib=blosc",
        hdf5_file,
        temp_file,
    ]
    run(repack_cmd)
    run(["mv", temp_file, hdf5_file])