Moved functions from mdeval_skloth and removed deprecated code

Sebastian Kloth 2023-11-05 15:06:28 +01:00
parent 4b139db471
commit 1aa5eed106
10 changed files with 655 additions and 516 deletions

pyproject.toml

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "mdevaluate"
version = "23.7"
version = "23.11"
dependencies = [
"mdanalysis",
"pandas",

src/mdevaluate/__init__.py

@@ -11,6 +11,8 @@ from . import functions
from . import pbc
from . import autosave
from . import reader
from . import chill
from . import system
from .logging import logger

src/mdevaluate/chill.py (new file, 79 lines)

@@ -0,0 +1,79 @@
import numpy as np
from scipy.spatial import KDTree
from scipy.special import sph_harm
def a_ij(atoms, N=4, l=3):
    # Correlate the local bond order parameters q_lm (built from spherical
    # harmonics over the N nearest neighbors) between neighboring atoms,
    # as in the CHILL/CHILL+ ice classification scheme.
    tree = KDTree(atoms)
    dist, indices = tree.query(atoms, N + 1)
    indices = indices[:, 1:]  # drop each atom itself (distance zero)
    vecs = atoms[:, np.newaxis, :] - atoms[indices]
    vecs /= np.linalg.norm(vecs, axis=-1)[..., np.newaxis]
    theta = np.arctan2(vecs[..., 1], vecs[..., 0]) + np.pi  # azimuthal angle
    phi = np.arccos(vecs[..., 2])  # polar angle
qijlm = sph_harm(
np.arange(-l, l + 1)[np.newaxis, np.newaxis, :],
l,
theta[..., np.newaxis],
phi[..., np.newaxis],
)
qilm = np.average(qijlm, axis=1)
qil = np.sum(qilm * np.conj(qilm), axis=-1) ** 0.5
aij = (
np.sum(qilm[:, np.newaxis, :] * np.conj(qilm[indices]), axis=-1)
/ qil[:, np.newaxis]
/ qil[indices]
)
return np.real(aij), indices
def number_of_neighbors(atoms):
    tree = KDTree(atoms)
    dist, _ = tree.query(atoms, 10, distance_upper_bound=0.35)
    # Neighbors beyond the 0.35 upper bound are reported with infinite distance,
    # so the 0.4 threshold below counts exactly the neighbors within the cutoff.
    return np.array([len(distance[distance < 0.4]) - 1 for distance in dist])
def classify_ice(aij, indices, neighbors, indexSOL):
    # Bond counts per atom: staggered (a_ij <= -0.8) and eclipsed
    # (-0.35 <= a_ij <= 0.25) bonds, following CHILL/CHILL+.
    staggeredBonds = np.sum(aij <= -0.8, axis=1)
    eclipsedBonds = np.sum((aij >= -0.35) & (aij <= 0.25), axis=1)
    # Type codes: 0 cubic ice, 1 hexagonal ice, 2 interface, 3 clathrate,
    # 4 clathrate interface, 5 liquid (default).
    iceTypes = np.full(len(aij), 5)
    for i in indexSOL:
        if neighbors[i] != 4:
            continue
        elif staggeredBonds[i] == 4:
            iceTypes[i] = 0
        elif staggeredBonds[i] == 3 and eclipsedBonds[i] == 1:
            iceTypes[i] = 1
        elif staggeredBonds[i] == 3:
            for j in indices[i]:
                if staggeredBonds[j] >= 2:
                    iceTypes[i] = 2
                    break
        elif staggeredBonds[i] == 2:
            for j in indices[i]:
                if staggeredBonds[j] >= 3:
                    iceTypes[i] = 2
                    break
        elif eclipsedBonds[i] == 4:
            iceTypes[i] = 3
        elif eclipsedBonds[i] == 3:
            iceTypes[i] = 4
    iceTypes = iceTypes[indexSOL]
    return iceTypes
def ice_parts(iceTypes):
cubic = len(iceTypes[iceTypes == 0])
hexagonal = len(iceTypes[iceTypes == 1])
interface = len(iceTypes[iceTypes == 2])
clathrate = len(iceTypes[iceTypes == 3])
clathrate_interface = len(iceTypes[iceTypes == 4])
liquid = len(iceTypes[iceTypes == 5])
return np.array(
[cubic, hexagonal, interface, clathrate, clathrate_interface, liquid]
)
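The new module exposes the full classification pipeline. A minimal usage sketch; the random coordinates stand in for real oxygen positions (in nm), so the resulting labels are physically meaningless and only the call sequence is the point:

```python
import numpy as np
from mdevaluate.chill import a_ij, classify_ice, ice_parts, number_of_neighbors

oxygens = np.random.uniform(0.0, 3.0, size=(500, 3))  # placeholder positions
aij, indices = a_ij(oxygens)              # bond order correlations per neighbor
neighbors = number_of_neighbors(oxygens)  # neighbor counts within 0.35 nm
ice_types = classify_ice(aij, indices, neighbors, np.arange(len(oxygens)))
# Per-class counts: cubic, hexagonal, interface, clathrate,
# clathrate interface, liquid
print(ice_parts(ice_types))
```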

src/mdevaluate/cli.py (deleted)

@@ -1,37 +0,0 @@
import argparse
from . import logging
from . import open as md_open
def run(*args, **kwargs):
parser = argparse.ArgumentParser()
parser.add_argument(
"xtcfile",
help="The xtc file to index.",
)
parser.add_argument(
"--tpr", help="The tprfile of the trajectory.", dest="tpr", default=None
)
parser.add_argument(
"--nojump",
help="Generate Nojump Matrices, requires a tpr file.",
dest="nojump",
action="store_true",
default=False,
)
parser.add_argument(
"--debug",
help="Set logging level to debug.",
dest="debug",
action="store_true",
default=False,
)
args = parser.parse_args()
if args.debug:
logging.setlevel("DEBUG")
md_open("", trajectory=args.xtcfile, topology=args.tpr, nojump=args.nojump)
if __name__ == "__main__":
run()

src/mdevaluate/coordinates.py

@@ -339,23 +339,6 @@ class Coordinates:
self.atom_subset.description = desc
class MeanCoordinates(Coordinates):
def __init__(self, frames, atom_filter=None, mean=1):
super().__init__(frames, atom_filter)
self.mean = mean
assert mean >= 1, "Mean must be positive"
def __getitem__(self, item):
frame = super().__getitem__(item)
for i in range(item + 1, item + self.mean):
frame += super().__getitem__(i)
return frame / self.mean
def len(self):
return len(super() - self.mean + 1)
class CoordinatesMap:
def __init__(self, coordinates, function):
self.coordinates = coordinates
@@ -419,74 +402,6 @@ class CoordinatesMap:
return CoordinatesMap(self.coordinates.pbc, self.function)
class CoordinatesFilter:
@property
def atom_subset(self):
pass
def __init__(self, coordinates, atom_filter):
self.coordinates = coordinates
self.atom_filter = atom_filter
def __getitem__(self, item):
if isinstance(item, slice):
sliced = copy(self)
sliced.coordinates = self.coordinates[item]
return sliced
else:
frame = self.coordinates[item]
return frame[self.atom_filter]
class CoordinatesKDTree:
"""
A KDTree of coordinates frames. The KDtrees are cached by a :func:`functools.lru_cache`.
Uses :class:`scipy.spatial.cKDTree` by default, since it's significantly faster.
Make sure to use scipy 0.17 or later or switch to the normal KDTree, since cKDTree has
a memory leak in earlier versions.
"""
def clear_cache(self):
"""Clear the LRU cache."""
self._get_tree_at_index.cache_clear()
@property
def cache_info(self):
"""Return info about the state of the cache."""
return self._get_tree_at_index.cache_info()
def _get_tree_at_index(self, index):
frame = self.frames[index]
return self.kdtree(frame[self.selector(frame)])
def __init__(self, frames, selector=None, boxsize=None, maxcache=128, ckdtree=True):
"""
Args:
frames: Trajectory of the simulation, can be Coordinates object or reader
selector: Selector function that selects a subset of each frame
maxcache: Maxsize of the :func:`~functools.lru_cache`
ckdtree: Use :class:`~scipy.spatial.cKDTree` or :class:`~scipy.spatial.KDTree` if False
"""
if selector is not None:
self.selector = selector
else:
self.selector = lambda x: slice(None)
self.frames = frames
self.kdtree = cKDTree if ckdtree else KDTree
if boxsize is not None:
self.kdtree = partial(self.kdtree, boxsize=boxsize)
self._get_tree_at_index = lru_cache(maxsize=maxcache)(self._get_tree_at_index)
def __getitem__(self, index):
return self._get_tree_at_index(index)
def __checksum__(self):
return checksum(self.selector, self.frames)
def __eq__(self, other):
return super().__eq__(other)
def map_coordinates(func):
@wraps(func)
def wrapped(coordinates, **kwargs):
@@ -496,38 +411,35 @@ def map_coordinates(func):
@map_coordinates
def centers_of_mass(c, *, masses=None):
def center_of_masses(coordinates, atoms, shear: bool = False):
"""
A- 1
B- 2
A- 1
C 3
A-
B-
A-
C
A-
B-
A-
C
Example:
rd = XTCReader('t.xtc')
coordinates = Coordinates(rd)
com = centers_of_mass(coordinates, (1.0, 2.0, 1.0, 3.0))
"""
# At first, regroup our array
number_of_masses = len(masses)
number_of_coordinates, number_of_dimensions = c.shape
number_of_new_coordinates = number_of_coordinates // number_of_masses
grouped_masses = c.reshape(
number_of_new_coordinates, number_of_masses, number_of_dimensions
)
return np.average(grouped_masses, axis=1, weights=masses)
res_ids = coordinates.residue_ids[atoms]
masses = coordinates.masses[atoms]
if shear:
coords = coordinates[atoms]
box = coords.box
sort_ind = res_ids.argsort(kind="stable")
i = np.concatenate([[0], np.where(np.diff(res_ids[sort_ind]) > 0)[0] + 1])
coms = coords[sort_ind[i]][res_ids - min(res_ids)]
        cor = pbc_diff(coords, coms, box)
coords = coms + cor
else:
coords = coordinates.whole[atoms]
mask = np.bincount(res_ids)[1:] != 0
positions = np.array(
[
np.bincount(res_ids, weights=c * masses)[1:]
/ np.bincount(res_ids, weights=masses)[1:]
for c in coords.T
]
).T[mask]
return np.array(positions)
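A usage sketch for the new `center_of_masses`; the path and the `atom_names`-based selection are assumptions for illustration, not part of this commit, and `map_coordinates` turns the call into a lazily evaluated `CoordinatesMap`:

```python
import numpy as np
import mdevaluate as md
from mdevaluate.coordinates import center_of_masses

traj = md.open("/path/to/sim")  # placeholder directory
# Assumed frame metadata: select all atoms of the water molecules.
water = np.where(np.isin(traj[0].atom_names, ["OW", "HW1", "HW2"]))[0]
water_com = center_of_masses(traj, atoms=water)
print(water_com[0].shape)  # (number of water molecules, 3)
```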
@map_coordinates
@@ -587,7 +499,83 @@ def vectors(coordinates, atoms_a, atoms_b, normed=False):
coords_b = coordinates[atoms_b]
if len(coords_b.shape) > 2:
coords_b = coords_b.mean(axis=0)
vectors = pbc_diff(coords_a, coords_b, box=box)
norm = np.linalg.norm(vectors, axis=-1).reshape(-1, 1) if normed else 1
vectors.reference = coords_a
return vectors / norm
    vec = pbc_diff(coords_a, coords_b, box=box)
    if normed:
        vec /= np.linalg.norm(vec, axis=-1).reshape(-1, 1)
    vec.reference = coords_a
    return vec
@map_coordinates
def dipole_vector(coordinates, atoms, normed=None):
coords = coordinates.whole[atoms]
res_ids = coordinates.residue_ids[atoms]
charges = coordinates.charges[atoms]
mask = np.bincount(res_ids)[1:] != 0
dipoles = np.array(
[np.bincount(res_ids, weights=c * charges)[1:] for c in coords.T]
).T[mask]
dipoles = np.array(dipoles)
if normed:
dipoles /= np.linalg.norm(dipoles, axis=-1).reshape(-1, 1)
return dipoles
@map_coordinates
def sum_dipole_vector(
coordinates: CoordinateFrame,
atoms: list[int],
normed: bool = True,
):
coords = coordinates.whole[atoms]
charges = coordinates.charges[atoms]
    # Sum the per-atom contributions q_i * r_i into a single total dipole vector.
    dipole = np.sum(coords * charges[:, np.newaxis], axis=0)
if normed:
dipole /= np.linalg.norm(dipole)
return dipole
@map_coordinates
def normal_vectors(
coordinates: CoordinateFrame,
atoms_a: list[int],
atoms_b: list[int],
atoms_c: list[int],
normed: bool = True,
):
coords_a = coordinates[atoms_a]
coords_b = coordinates[atoms_b]
coords_c = coordinates[atoms_c]
box = coordinates.box
vectors_a = pbc_diff(coords_a, coords_b, box=box)
vectors_b = pbc_diff(coords_a, coords_c, box=box)
vec = np.cross(vectors_a, vectors_b)
if normed:
vec /= np.linalg.norm(vec, axis=-1).reshape(-1, 1)
vec.reference = coords_a
return vec
def displacements_without_drift(
start_frame: CoordinateFrame, end_frame: CoordinateFrame, trajectory: Coordinates
) -> np.ndarray:
start_all = trajectory[start_frame.step]
frame_all = trajectory[end_frame.step]
displacements = (
start_frame
- end_frame
- (np.average(start_all, axis=0) - np.average(frame_all, axis=0))
)
return displacements
@map_coordinates
def cylindrical_coordinates(frame, origin=None):
if origin is None:
origin = np.diag(frame.box) / 2
x = frame[:, 0] - origin[0]
y = frame[:, 1] - origin[1]
z = frame[:, 2]
radius = (x**2 + y**2) ** 0.5
phi = np.arctan2(y, x)
return np.array([radius, phi, z]).T

src/mdevaluate/correlation.py

@@ -1,18 +1,17 @@
from typing import Callable
import numpy as np
from numpy.typing import ArrayLike
from scipy.special import legendre
from itertools import chain
import dask.array as darray
from pathos.pools import ProcessPool
from functools import partial
from scipy.spatial import KDTree
from .autosave import autosave_data
from .utils import filon_fourier_transformation, coherent_sum, histogram
from .pbc import pbc_diff
def set_has_counter(func):
func.has_counter = True
return func
from .utils import coherent_sum, histogram
from .pbc import pbc_diff, pbc_points
from .coordinates import Coordinates, CoordinateFrame, displacements_without_drift
def log_indices(first, last, num=100):
@@ -26,85 +25,17 @@ def correlation(function, frames):
return map(lambda f: function(start_frame, f), chain([start_frame], iterator))
def subensemble_correlation(selector_function, correlation_function=correlation):
def c(function, frames):
iterator = iter(frames)
start_frame = next(iterator)
selector = selector_function(start_frame)
subensemble = map(lambda f: f[selector], chain([start_frame], iterator))
return correlation_function(function, subensemble)
return c
def multi_subensemble_correlation(selector_function):
"""
selector_function has to expect a frame and to
return either valid indices (as with subensemble_correlation)
or a multidimensional array whose entries are valid indices
e.g. slice(10,100,2)
e.g. [1,2,3,4,5]
e.g. [[[0,1],[2],[3]],[[4],[5],[6]] -> shape: 2,3 with
list of indices of varying length
e.g. [slice(1653),slice(1653,None,3)]
e.g. [np.ones(len_of_frames, bool)]
in general using slices is the most efficient.
if the selections are small subsets of a frame or when many subsets are empty
using indices will be more efficient than using masks.
"""
@set_has_counter
def cmulti(function, frames):
iterator = iter(frames)
start_frame = next(iterator)
selectors = np.asarray(selector_function(start_frame))
sel_shape = selectors.shape
if sel_shape[-1] == 0:
selectors = np.asarray(selectors, int)
if selectors.dtype != object:
sel_shape = sel_shape[:-1]
f_values = np.zeros(
sel_shape + function(start_frame, start_frame).shape,
)
count = np.zeros(sel_shape, dtype=int)
is_first_frame_loop = True
def cc(act_frame):
nonlocal is_first_frame_loop
for index in np.ndindex(sel_shape):
sel = selectors[index]
sf_sel = start_frame[sel]
if is_first_frame_loop:
count[index] = len(sf_sel)
f_values[index] = (
function(sf_sel, act_frame[sel]) if count[index] != 0 else 0
)
is_first_frame_loop = False
return np.asarray(f_values.copy())
return map(cc, chain([start_frame], iterator)), count
return cmulti
@autosave_data(2)
def shifted_correlation(
function,
frames,
selector=None,
multi_selector=None,
segments=10,
skip=0.1,
window=0.5,
average=True,
points=100,
nodes=1,
function: Callable,
frames: Coordinates,
selector: ArrayLike = None,
multi_selector: ArrayLike = None,
segments: int = 10,
skip: float = 0.1,
window: float = 0.5,
average: bool = True,
points: int = 100,
):
"""
Calculate the time series for a correlation function.
@@ -141,9 +72,6 @@ def shifted_correlation(
points (int, opt.):
The number of timeshifts for which the correlation
should be calculated
nodes (int, opt.):
Number of nodes used for parallelization
Returns:
tuple:
A list of length N that contains the timeshiftes of the frames at which
@@ -186,9 +114,7 @@ def shifted_correlation(
correlation = []
for index in indices:
correlation.append(
get_correlation(
frames, start_frame, index, shifted_idx
)
get_correlation(frames, start_frame, index, shifted_idx)
)
return correlation
@@ -210,33 +136,18 @@ def shifted_correlation(
idx = np.unique(np.int_(ls) - 1)
t = np.array([frames[i].time for i in idx]) - frames[0].time
if nodes == 1:
result = np.array(
[
apply_selector(start_frame, frames=frames, idx=idx,
selector=selector,
multi_selector=multi_selector)
for start_frame in start_frames
]
)
else:
pool = ProcessPool(nodes=nodes)
# Use try finally instead of a context manager to ensure the pool is
# restarted in case of working in a jupyter-notebook,
# otherwise the kernel has to be restarted.
try:
result = np.array(
pool.map(
partial(apply_selector,
apply_selector(
start_frame,
frames=frames,
idx=idx,
selector=selector,
multi_selector=multi_selector), start_frames
multi_selector=multi_selector,
)
for start_frame in start_frames
]
)
finally:
pool.terminate()
pool.restart()
if average:
clean_result = []
@@ -250,24 +161,62 @@ def shifted_correlation(
return t, result
def msd(start, frame):
def msd(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
trajectory: Coordinates = None,
axis: str = "all",
) -> float:
"""
Mean square displacement
"""
vec = start - frame
return (vec**2).sum(axis=1).mean()
if trajectory is None:
displacements = start_frame - end_frame
else:
displacements = displacements_without_drift(start_frame, end_frame, trajectory)
if axis == "all":
return (displacements**2).sum(axis=1).mean()
elif axis == "x":
return (displacements[:, 0] ** 2).mean()
elif axis == "y":
return (displacements[:, 1] ** 2).mean()
elif axis == "z":
return (displacements[:, 2] ** 2).mean()
else:
        raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
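Together with `shifted_correlation`, the extended signature allows drift-corrected or single-axis MSDs. A sketch, with a placeholder trajectory path:

```python
from functools import partial
import mdevaluate as md
from mdevaluate.correlation import shifted_correlation, msd

traj = md.open("/path/to/sim")  # placeholder directory
# Passing trajectory= subtracts the center-of-mass drift; axis="z" would
# restrict the MSD to a single component.
times, msd_values = shifted_correlation(
    partial(msd, trajectory=traj), traj, segments=20
)
```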
def isf(start, frame, q, box=None):
def isf(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
q: float = 22.7,
trajectory: Coordinates = None,
axis: str = "all",
) -> float:
"""
Incoherent intermediate scattering function. To specify q, use
water_isf = functools.partial(isf, q=22.77) # q has the value 22.77 nm^-1
:param q: length of scattering vector
"""
vec = start - frame
distance = (vec**2).sum(axis=1) ** 0.5
if trajectory is None:
displacements = start_frame - end_frame
else:
displacements = displacements_without_drift(start_frame, end_frame, trajectory)
if axis == "all":
distance = (displacements**2).sum(axis=1) ** 0.5
return np.sinc(distance * q / np.pi).mean()
elif axis == "x":
distance = np.abs(displacements[:, 0])
return np.mean(np.cos(np.abs(q * distance)))
elif axis == "y":
distance = np.abs(displacements[:, 1])
return np.mean(np.cos(np.abs(q * distance)))
elif axis == "z":
distance = np.abs(displacements[:, 2])
return np.mean(np.cos(np.abs(q * distance)))
else:
        raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
def rotational_autocorrelation(onset, frame, order=2):
@@ -287,16 +236,37 @@ def rotational_autocorrelation(onset, frame, order=2):
return poly(scalar_prod).mean()
def van_hove_self(start, end, bins):
def van_hove_self(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
bins: ArrayLike,
trajectory: Coordinates = None,
axis: str = "all",
):
r"""
Compute the self part of the Van Hove autocorrelation function.
..math::
G(r, t) = \sum_i \delta(|\vec r_i(0) - \vec r_i(t)| - r)
"""
vec = start - end
delta_r = ((vec) ** 2).sum(axis=-1) ** 0.5
return 1 / len(start) * histogram(delta_r, bins)[0]
if trajectory is None:
vectors = start_frame - end_frame
else:
vectors = displacements_without_drift(start_frame, end_frame, trajectory)
if axis == "all":
delta_r = (vectors**2).sum(axis=1) ** 0.5
elif axis == "x":
delta_r = np.abs(vectors[:, 0])
elif axis == "y":
delta_r = np.abs(vectors[:, 1])
elif axis == "z":
delta_r = np.abs(vectors[:, 2])
else:
        raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
hist = np.histogram(delta_r, bins)[0]
r = (bins[1:] + bins[:-1]) / 2
return r, hist / len(start_frame)
def van_hove_distinct(
@@ -346,14 +316,18 @@ def van_hove_distinct(
return hist / N
def overlap(onset, frame, crds_tree, radius):
def overlap(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
radius: float = 0.1,
mode: str = "self",
):
"""
    Compute the overlap of a configuration with a reference configuration.
Args:
onset: Initial frame, this is only used to get the frame index
frame: The current configuration
crds_tree: A CoordinatesTree of the reference configurations
radius: The cutoff radius for the overlap
This function is intended to be used with :func:`shifted_correlation`.
@@ -369,24 +343,21 @@ def overlap(onset, frame, crds_tree, radius):
... traj
... )
"""
tree = crds_tree[onset.step]
return (tree.query(frame)[0] <= radius).sum() / tree.n
def susceptibility(time, correlation, **kwargs):
"""
Calculate the susceptibility of a correlation function.
Args:
time: Timesteps of the correlation data
correlation: Value of the correlation function
**kwargs (opt.):
Additional keyword arguments will be passed to :func:`filon_fourier_transformation`.
"""
frequencies, fourier = filon_fourier_transformation(
time, correlation, imag=False, **kwargs
start_PBC, index_PBC = pbc_points(
start_frame, start_frame.box, index=True, thickness=2 * radius
)
return frequencies, frequencies * fourier
start_tree = KDTree(start_PBC)
dist, index_dist = start_tree.query(end_frame, 1, distance_upper_bound=radius)
if mode == "indifferent":
return np.sum(dist <= radius) / len(start_frame)
index_dist = index_PBC[index_dist]
index = np.arange(len(start_frame))
index = index[dist <= radius]
index_dist = index_dist[dist <= radius]
if mode == "self":
return np.sum(index == index_dist) / len(start_frame)
elif mode == "distinct":
return np.sum(index != index_dist) / len(start_frame)
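The rewritten `overlap` no longer needs a precomputed `CoordinatesTree`; it builds the periodic KDTree itself, so it drops straight into `shifted_correlation`. A sketch, again with a placeholder path:

```python
from functools import partial
import mdevaluate as md
from mdevaluate.correlation import shifted_correlation, overlap

traj = md.open("/path/to/sim")  # placeholder directory
# Self part of the overlap function Q(t) with a 0.1 nm cutoff.
times, Q_self = shifted_correlation(
    partial(overlap, radius=0.1, mode="self"), traj, segments=20
)
```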
def coherent_scattering_function(onset, frame, q):
@@ -414,11 +385,35 @@ def coherent_scattering_function(onset, frame, q):
return coherent_sum(scfunc, onset.pbc, frame.pbc) / len(onset)
def non_gaussian(onset, frame):
def non_gaussian_parameter(
start_frame: CoordinateFrame,
end_frame: CoordinateFrame,
trajectory: Coordinates = None,
axis: str = "all",
) -> float:
"""
Calculate the Non-Gaussian parameter :
..math:
\alpha_2 (t) = \frac{3}{5}\frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1
\alpha_2 (t) =
\frac{3}{5}\frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1
"""
r_2 = ((frame - onset) ** 2).sum(axis=-1)
return 3 / 5 * (r_2**2).mean() / r_2.mean() ** 2 - 1
if trajectory is None:
vectors = start_frame - end_frame
else:
vectors = displacements_without_drift(start_frame, end_frame, trajectory)
if axis == "all":
r = (vectors**2).sum(axis=1)
dimensions = 3
elif axis == "x":
r = vectors[:, 0] ** 2
dimensions = 1
elif axis == "y":
r = vectors[:, 1] ** 2
dimensions = 1
elif axis == "z":
r = vectors[:, 2] ** 2
dimensions = 1
else:
        raise ValueError('Parameter axis has to be either "all", "x", "y", or "z"!')
return (np.mean(r**2) / ((1 + 2 / dimensions) * (np.mean(r) ** 2))) - 1

src/mdevaluate/distribution.py

@@ -1,12 +1,14 @@
import numpy as np
from numpy.typing import ArrayLike
from scipy import spatial
from scipy.spatial import KDTree
from .coordinates import rotate_axis, polar_coordinates, spherical_coordinates
from .coordinates import rotate_axis, polar_coordinates, Coordinates, CoordinateFrame
from .atoms import next_neighbors
from .autosave import autosave_data
from .utils import runningmean
from .pbc import pbc_diff, pbc_points
from .logging import logger
from scipy import spatial
@autosave_data(nargs=2, kwargs_keys=("coordinates_b",), version="time_average-1")
@@ -78,15 +80,15 @@ def time_histogram(function, coordinates, bins, hist_range, pool=None):
return hist_results
def rdf(
atoms_a,
atoms_b=None,
bins=None,
box=None,
kind=None,
chunksize=50000,
returnx=False,
**kwargs
def calc_gr(
traj_a: Coordinates,
traj_b: Coordinates = None,
r_max: float = None,
delta_r: float = 0.02,
segments: int = 1000,
skip: float = 0.1,
remove_intra: bool = False,
shear: bool = False,
):
r"""
Compute the radial pair distribution of one or two sets of atoms.
@@ -111,161 +113,78 @@ def rdf(
as large as possible, depending on the available memory.
returnx (opt.): If True the x ordinate of the histogram is returned.
"""
assert bins is not None, "Bins of the pair distribution have to be defined."
assert kind in [
"intra",
"inter",
None,
], "Argument kind must be one of the following: intra, inter, None."
if box is None:
box = atoms_a.box.diagonal()
if atoms_b is None:
atoms_b = atoms_a
nr_of_atoms = len(atoms_a)
indices = np.triu_indices(nr_of_atoms, k=1)
else:
nr_a, dim = atoms_a.shape
nr_b, dim = atoms_b.shape
indices = np.array([(i, j) for i in range(nr_a) for j in range(nr_b)]).T
# compute the histogram in chunks for large systems
hist = 0
nr_of_samples = 0
for chunk in range(0, len(indices[0]), chunksize):
sl = slice(chunk, chunk + chunksize)
diff = pbc_diff(atoms_a[indices[0][sl]], atoms_b[indices[1][sl]], box)
dist = (diff**2).sum(axis=1) ** 0.5
if kind == "intra":
mask = (
atoms_a.residue_ids[indices[0][sl]]
== atoms_b.residue_ids[indices[1][sl]]
)
dist = dist[mask]
elif kind == "inter":
mask = (
atoms_a.residue_ids[indices[0][sl]]
!= atoms_b.residue_ids[indices[1][sl]]
)
dist = dist[mask]
nr_of_samples += len(dist)
hist += np.histogram(dist, bins)[0]
volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
density = nr_of_samples / np.prod(box)
res = hist / volume / density
if returnx:
return np.vstack((runningmean(bins, 2), res))
else:
return res
def pbc_tree_rdf(
atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs
):
if box is None:
box = atoms_a.box.diagonal()
all_coords = pbc_points(atoms_b, box, thickness=np.amax(bins) + 0.1)
to_tree = spatial.cKDTree(all_coords)
dist = to_tree.query(
pbc_diff(atoms_a, box=box),
k=len(atoms_b),
distance_upper_bound=np.amax(bins) + 0.1,
)[0].flatten()
dist = dist[dist < np.inf]
hist = np.histogram(dist, bins)[0]
volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b) - exclude)
if returnx:
return np.vstack((runningmean(bins, 2), res))
else:
return res
def pbc_spm_rdf(
atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs
):
if box is None:
box = atoms_a.box
all_coords = pbc_points(atoms_b, box, thickness=np.amax(bins) + 0.1)
to_tree = spatial.cKDTree(all_coords)
if all_coords.nbytes / 1024**3 * len(atoms_a) < 2:
from_tree = spatial.cKDTree(pbc_diff(atoms_a, box=box))
dist = to_tree.sparse_distance_matrix(
from_tree, max_distance=np.amax(bins) + 0.1, output_type="ndarray"
)
dist = np.asarray(dist.tolist())[:, 2]
hist = np.histogram(dist, bins)[0]
else:
chunksize = int(
2 * len(atoms_a) / (all_coords.nbytes / 1024**3 * len(atoms_a))
)
hist = 0
for chunk in range(0, len(atoms_a), chunksize):
sl = slice(chunk, chunk + chunksize)
from_tree = spatial.cKDTree(pbc_diff(atoms_a[sl], box=box))
dist = to_tree.sparse_distance_matrix(
from_tree, max_distance=np.amax(bins) + 0.1, output_type="ndarray"
)
dist = np.asarray(dist.tolist())[:, 2]
hist += np.histogram(dist, bins)[0]
volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b) - exclude)
if returnx:
return np.vstack((runningmean(bins, 2), res))
else:
return res
@autosave_data(nargs=2, kwargs_keys=("to_coords", "times"))
def fast_averaged_rdf(from_coords, bins, to_coords=None, times=10, exclude=0, **kwargs):
if to_coords is None:
to_coords = from_coords
exclude = 1
# first find timings for the different rdf functions
import time
# only consider sparse matrix for this condition
if (len(from_coords[0]) * len(to_coords[0]) <= 3000 * 2000) & (
len(from_coords[0]) / len(to_coords[0]) > 5
def gr_frame(
atoms_a: CoordinateFrame,
atoms_b: CoordinateFrame,
bins: ArrayLike,
remove_intra: bool = False,
):
funcs = [rdf, pbc_tree_rdf, pbc_spm_rdf]
box = atoms_b.box
n = len(atoms_a) / np.prod(np.diag(box))
V = 4 / 3 * np.pi * bins[-1] ** 3
particles_in_volume = int(n * V * 1.1)
if np.all(np.diag(np.diag(box)) == box):
atoms_b = atoms_b % np.diag(box)
atoms_b_res_ids = atoms_b.residue_ids
atoms_b_tree = KDTree(atoms_b, boxsize=np.diag(box))
else:
funcs = [rdf, pbc_tree_rdf]
timings = []
for f in funcs:
start = time.time()
f(
from_coords[0],
atoms_b=to_coords[0],
bins=bins,
box=np.diag(from_coords[0].box),
atoms_b_pbc, atoms_b_pbc_index = pbc_points(
atoms_b, box, thickness=bins[-1] + 0.1, index=True, shear=shear
)
end = time.time()
timings.append(end - start)
timings = np.array(timings)
timings[0] = (
2 * timings[0]
) # statistics for the other functions is twice as good per frame
logger.debug("rdf function timings: " + str(timings))
rdffunc = funcs[np.argmin(timings)]
logger.debug("rdf function used: " + str(rdffunc))
if rdffunc == rdf:
times = times * 2 # duplicate times for same statistics
atoms_b_res_ids = atoms_b.residue_ids[atoms_b_pbc_index]
atoms_b_tree = KDTree(atoms_b_pbc)
distances, distances_index = atoms_b_tree.query(
atoms_a, particles_in_volume, distance_upper_bound=bins[-1] + 0.1
)
if np.array_equal(atoms_a, atoms_b):
distances = distances[:, 1:]
distances_index = distances_index[:, 1:]
frames = np.array(range(0, len(from_coords), int(len(from_coords) / times)))[:times]
out = np.zeros(len(bins) - 1)
for j, i in enumerate(frames):
logger.debug("multi_radial_pair_distribution: %d/%d", j, len(frames))
out += rdffunc(
from_coords[i],
to_coords[i],
bins,
box=np.diag(from_coords[i].box),
exclude=exclude,
if remove_intra:
new_distances = []
for entry in list(zip(atoms_a.residue_ids, distances, distances_index)):
mask = entry[1] < np.inf
new_distances.append(
entry[1][mask][atoms_b_res_ids[entry[2][mask]] != entry[0]]
)
return out / len(frames)
distances = np.concatenate(new_distances)
else:
distances = distances.flatten()
hist = np.histogram(distances, bins=bins, range=(0, bins[-1]), density=False)[0]
gr = hist / len(atoms_a)
gr = gr / (4 / 3 * np.pi * bins[1:] ** 3 - 4 / 3 * np.pi * bins[:-1] ** 3)
n = len(atoms_b) / np.prod(np.diag(atoms_b.box))
gr = gr / n
return gr, n
if traj_b is None:
traj_b = traj_a
start_frame = traj_a[int(len(traj_a) * skip)]
if r_max:
upper_bound = r_max
else:
upper_bound = round(np.min(np.diag(start_frame.box)) / 2 - 0.05, 1)
num_steps = int(upper_bound * (1 / delta_r) + 1)
bins = np.linspace(0, upper_bound, num_steps)
r = bins[1:] - (bins[1] - bins[0]) / 2
frame_indices = np.unique(
np.int_(np.linspace(len(traj_a) * skip, len(traj_a) - 1, num=segments))
)
gr = []
n = []
for frame_index in frame_indices:
result = gr_frame(
traj_a[frame_index], traj_b[frame_index], bins, remove_intra=remove_intra
)
gr.append(result[0])
n.append(result[1])
gr = np.mean(gr, axis=0)
n = np.mean(n, axis=0)
return r, gr, n
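A usage sketch for the new `calc_gr`; the path is a placeholder and the `subset` call is an assumed selection API, not shown in this diff:

```python
import mdevaluate as md
from mdevaluate.distribution import calc_gr

traj = md.open("/path/to/sim")         # placeholder directory
oxygens = traj.subset(atom_name="OW")  # assumed selection API
# Oxygen-oxygen g(r) up to 1.0 nm, averaged over up to 100 frames
# after skipping the first 10 % of the trajectory.
r, gr, n = calc_gr(oxygens, r_max=1.0, delta_r=0.01, segments=100)
```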
def distance_distribution(atoms, bins):

src/mdevaluate/functions.py

@@ -1,36 +1,42 @@
import numpy as np
from scipy.special import gamma as spgamma
from scipy.integrate import quad as spquad
def kww(t, A, τ, β):
return A * np.exp(-((t / τ) ** β))
def kww(t, A, tau, beta):
return A * np.exp(-((t / tau) ** beta))
def kww_1e(A, τ, β):
return τ * (-np.log(1 / (np.e * A))) ** (1 / β)
def kww_1e(A, tau, beta):
return tau * (-np.log(1 / (np.e * A))) ** (1 / beta)
def cole_davidson(w, A, b, t0):
P = np.arctan(w * t0)
return A * np.cos(P) ** b * np.sin(b * P)
def cole_davidson(omega, A, beta, tau):
P = np.arctan(omega * tau)
return A * np.cos(P) ** beta * np.sin(beta * P)
def cole_cole(w, A, b, t0):
def cole_cole(omega, A, beta, tau):
return (
A
* (w * t0) ** b
* np.sin(np.pi * b / 2)
/ (1 + 2 * (w * t0) ** b * np.cos(np.pi * b / 2) + (w * t0) ** (2 * b))
* (omega * tau) ** beta
* np.sin(np.pi * beta / 2)
/ (
1
+ 2 * (omega * tau) ** beta * np.cos(np.pi * beta / 2)
+ (omega * tau) ** (2 * beta)
)
)
def havriliak_negami(ω, A, β, α, τ):
def havriliak_negami(omega, A, beta, alpha, tau):
r"""
Imaginary part of the Havriliak-Negami function.
.. math::
\chi_{HN}(\omega) = \Im\left(\frac{A}{(1 + (i\omega\tau)^\alpha)^\beta}\right)
"""
return -(A / (1 + (1j * ω * τ) ** α) ** β).imag
return -(A / (1 + (1j * omega * tau) ** alpha) ** beta).imag
# fits decay of correlation times, e.g. with distance to pore walls
@@ -38,6 +44,47 @@ def colen(d, X, t8, A):
return t8 * np.exp(A * np.exp(-d / X))
# fits decay of the plateau height of the overlap function, e.g. with distance to pore walls
# fits decay of the plateau height of the overlap function,
# e.g. with distance to pore walls
def colenQ(d, X, Qb, g):
return (1 - Qb) * np.exp(-((d / X) ** g)) + Qb
def vft(T, tau_0, B, T_inf):
return tau_0 * np.exp(B / (T - T_inf))
def arrhenius(T, tau_0, E_a):
return tau_0 * np.exp(E_a / T)
def MLfit(t, tau, A, alpha):
def MLf(z, a):
"""Mittag-Leffler function"""
z = np.atleast_1d(z)
if a == 0:
return 1 / (1 - z)
elif a == 1:
return np.exp(z)
elif a > 1 or all(z > 0):
k = np.arange(100)
return np.polynomial.polynomial.polyval(z, 1 / spgamma(a * k + 1))
# a helper for tricky case, from Gorenflo, Loutchko & Luchko
def _MLf(z, a):
if z < 0:
f = lambda x: (
np.exp(-x * (-z) ** (1 / a))
* x ** (a - 1)
* np.sin(np.pi * a)
/ (x ** (2 * a) + 2 * x**a * np.cos(np.pi * a) + 1)
)
return 1 / np.pi * spquad(f, 0, np.inf)[0]
elif z == 0:
return 1
else:
return MLf(z, a)
return np.vectorize(_MLf)(z, a)
return A * MLf(-((t / tau) ** alpha), alpha)
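The renamed fit functions work directly with `scipy.optimize.curve_fit`; a self-contained sketch on synthetic data:

```python
import numpy as np
from scipy.optimize import curve_fit
from mdevaluate.functions import kww, kww_1e

t = np.logspace(-2, 3, 60)
C = kww(t, 1.0, 10.0, 0.6) + np.random.normal(0.0, 0.01, t.size)  # noisy decay
(A, tau, beta), _ = curve_fit(kww, t, C, p0=(1.0, 1.0, 0.5))
tau_1e = kww_1e(A, tau, beta)  # time at which the fit decays to 1/e
```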

src/mdevaluate/system.py (new file, 44 lines)

@@ -0,0 +1,44 @@
import abc
from dataclasses import dataclass, field
from subprocess import run
import pandas as pd
@dataclass(kw_only=True)
class MDSystem(abc.ABC):
load_only_results: bool = False
system_dir: str = field(init=False)
@abc.abstractmethod
def _add_description(self, data: pd.DataFrame) -> pd.DataFrame:
pass
def save_results(self, data: pd.DataFrame, key: str) -> None:
data = self._add_description(data)
hdf5_file = f"{self.system_dir}/out/results.h5"
data.to_hdf(hdf5_file, key=key, complevel=9, complib="blosc")
def load_results(self, key: str) -> pd.DataFrame:
hdf5_file = f"{self.system_dir}/out/results.h5"
data = pd.read_hdf(hdf5_file, key=key)
if isinstance(data, pd.DataFrame):
return data
else:
raise TypeError("Result is not a DataFrame!")
def cleanup_results(self) -> None:
hdf5_file = f"{self.system_dir}/out/results.h5"
hdf5_temp_file = f"{self.system_dir}/out/results_temp.h5"
run(
[
"ptrepack",
"--chunkshape=auto",
"--propindexes",
"--complevel=9",
"--complib=blosc",
hdf5_file,
hdf5_temp_file,
]
)
run(["mv", hdf5_temp_file, hdf5_file])

src/mdevaluate/utils.py

@@ -25,7 +25,8 @@ def five_point_stencil(xdata, ydata):
ydata: y values of the data points
Returns:
Values where the derivative was estimated and the value of the derivative at these points.
Values where the derivative was estimated and the value of the derivative at
these points.
This algorithm is only valid for values on a regular grid, for unevenly distributed
data it is only an approximation, albeit a quite good one.
@@ -63,8 +64,8 @@ def filon_fourier_transformation(
If frequencies are not explicitly given they will be evenly placed on a log scale
in the interval [1/tmax, 0.1/tmin] where tmin and tmax are the smallest respectively
the biggest time (greater than 0) of the provided times. The frequencies are cut off
at high values by one decade, since the fourier transformation deviates quite strongly
in this regime.
at high values by one decade, since the fourier transformation deviates quite
strongly in this regime.
.. [Blochowicz]
T. Blochowicz, Broadband dielectric spectroscopy in neat and binary
@@ -86,20 +87,21 @@ def filon_fourier_transformation(
derivative.reshape(-1, 1)
else:
raise NotImplementedError(
'Invalid approximation method {}. Possible values are "linear", "stencil" or a list of values.'
'Invalid approximation method {}. Possible values are "linear", "stencil" '
'or a list of values.'
)
time = time.reshape(-1, 1)
integral = (
np.cos(frequencies * time[1:]) - np.cos(frequencies * time[:-1])
) / frequencies**2
) / frequencies ** 2
fourier = (derivative * integral).sum(axis=0)
if imag:
integral = (
1j
* (np.sin(frequencies * time[1:]) - np.sin(frequencies * time[:-1]))
/ frequencies**2
/ frequencies ** 2
)
fourier = (
fourier
@@ -219,12 +221,11 @@ def coherent_sum(func, coord_a, coord_b):
coherent_sum(f, x, y) == f(x.reshape(N, 1, d), x.reshape(1, M, d)).sum()
Args:
func: The function is called for each two items in both arrays, this should return a scalar value.
func: The function is called for each two items in both arrays, this should
return a scalar value.
coord_a, coord_b: The two arrays.
"""
if isinstance(func, FunctionType):
func = numba.jit(func, nopython=True, cache=True)
def cohsum(coord_a, coord_b):
res = 0
@@ -244,17 +245,18 @@ def coherent_histogram(func, coord_a, coord_b, bins, distinct=False):
N, d = x.shape
M, d = y.shape
bins = np.arange(1, 5, 0.1)
coherent_histogram(f, x, y, bins) == histogram(f(x.reshape(N, 1, d), x.reshape(1, M, d)), bins=bins)
coherent_histogram(f, x, y, bins) == histogram(
f(x.reshape(N, 1, d), x.reshape(1, M, d)), bins=bins
)
Args:
func: The function is called for each two items in both arrays, this should return a scalar value.
func: The function is called for each two items in both arrays, this should
return a scalar value.
coord_a, coord_b: The two arrays.
        bins: The bins used for the histogram must be distributed regularly on a linear scale.
        bins: The bins used for the histogram must be distributed regularly on a
            linear scale.
"""
if isinstance(func, FunctionType):
func = numba.jit(func, nopython=True, cache=True)
assert np.isclose(
np.diff(bins).mean(), np.diff(bins)
).all(), "A regular distribution of bins is required."
@@ -278,7 +280,8 @@ def coherent_histogram(func, coord_a, coord_b, bins, distinct=False):
def Sq_from_gr(r, gr, q, ρ):
r"""
Compute the static structure factor as fourier transform of the pair correlation function. [Yarnell]_
Compute the static structure factor as fourier transform of the pair correlation
function. [Yarnell]_
.. math::
S(q) - 1 = \\frac{4\\pi \\rho}{q}\\int\\limits_0^\\infty (g(r) - 1)\\,r \\sin(qr) dr
@@ -290,7 +293,8 @@ def Sq_from_gr(r, gr, q, ρ):
ρ: Average number density
.. [Yarnell]
        Yarnell, J. L., Katz, M. J., Wenzel, R. G., & Koenig, S. H. (1973). Physical Review A, 7(6), 2130–2144.
        Yarnell, J. L., Katz, M. J., Wenzel, R. G., & Koenig, S. H. (1973). Physical
        Review A, 7(6), 2130–2144.
http://doi.org/10.1017/CBO9781107415324.004
"""
@@ -300,7 +304,8 @@ def Sq_from_gr(r, gr, q, ρ):
def Fqt_from_Grt(data, q):
"""
Calculate the ISF from the van Hove function for a given q value by fourier transform.
Calculate the ISF from the van Hove function for a given q value by fourier
transform.
.. math::
F_q(t) = \\int\\limits_0^\\infty dr \\; G(r, t) \\frac{\\sin(qr)}{qr}
@@ -312,8 +317,9 @@ def Fqt_from_Grt(data, q):
q: Value of q.
Returns:
If input data was a dataframe the result will be returned as one too, else two arrays
will be returned, which will contain times and values of Fq(t) respectively.
If input data was a dataframe the result will be returned as one too, else two
arrays will be returned, which will contain times and values of Fq(t)
respectively.
"""
if isinstance(data, pd.DataFrame):
@@ -329,7 +335,10 @@ def Fqt_from_Grt(data, q):
def singledispatchmethod(func):
"""A decorator to define a genric instance method, analogue to functools.singledispatch."""
"""
A decorator to define a genric instance method, analogue to
functools.singledispatch.
"""
dispatcher = functools.singledispatch(func)
def wrapper(*args, **kw):
@@ -341,7 +350,9 @@ def singledispatchmethod(func):
def histogram(data, bins):
"""Compute the histogram of the given data. Uses numpy.bincount function, if possible."""
"""
Compute the histogram of the given data. Uses numpy.bincount function, if possible.
"""
dbins = np.diff(bins)
dx = dbins.mean()
if bins.min() == 0 and dbins.std() < 1e-6:
@@ -358,7 +369,8 @@ def quick1etau(t, C, n=7):
Estimate the time for a correlation function that goes from 1 to 0 to decay to 1/e.
If successful, returns tau as fine interpolation with a kww fit.
    The data is reduced to points around 1/e to remove short and long times from the kww fit!
    The data is reduced to points around 1/e to remove short and long times from the kww
    fit!
t is the time
C is C(t) the correlation function
n is the minimum number of points around 1/e required
@@ -384,3 +396,93 @@
except:
pass
return tau_est
def susceptibility(time, correlation, **kwargs):
"""
Calculate the susceptibility of a correlation function.
Args:
time: Timesteps of the correlation data
correlation: Value of the correlation function
**kwargs (opt.):
Additional keyword arguments will be passed to :func:`filon_fourier_transformation`.
"""
frequencies, fourier = filon_fourier_transformation(
time, correlation, imag=False, **kwargs
)
return frequencies, frequencies * fourier
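`susceptibility` moved here from correlation.py unchanged; it pairs with the output of `shifted_correlation`. A sketch on a synthetic stretched-exponential decay:

```python
import numpy as np
from mdevaluate.utils import susceptibility

t = np.logspace(-3, 2, 100)
C = np.exp(-((t / 1.0) ** 0.7))  # synthetic KWW-like correlation decay
freq, chi = susceptibility(t, C)
```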
def read_gro(file):
with open(file, "r") as f:
lines = f.readlines()
description = lines[0].splitlines()[0]
boxsize = lines[-1]
box = boxsize.split()
if len(box) == 3:
box = np.array([[box[0], 0, 0], [0, box[1], 0], [0, 0, box[2]]], dtype=float)
else:
box = np.array(
[
[box[0], box[3], box[4]],
[box[5], box[1], box[6]],
[box[7], box[8], box[2]],
],
dtype=float,
)
atomdata = np.genfromtxt(
file,
delimiter=[5, 5, 5, 5, 8, 8, 8],
dtype="i8,U5,U5,i8,f8,f8,f8",
skip_header=2,
skip_footer=1,
unpack=True,
)
atoms_DF = pd.DataFrame(
{
"residue_id": atomdata[0],
"residue_name": atomdata[1],
"atom_name": atomdata[2],
"atom_id": atomdata[3],
"pos_x": atomdata[4],
"pos_y": atomdata[5],
"pos_z": atomdata[6],
}
)
return atoms_DF, box, description
def write_gro(file, atoms_DF, box, description):
with open(file, "w") as f:
f.write(f"{description} \n")
f.write(f"{len(atoms_DF)}\n")
for i, atom in atoms_DF.iterrows():
f.write(
f"{atom['residue_id']:>5}{atom['residue_name']:<5}"
f"{atom['atom_name']:>5}{atom['atom_id']:>5}"
f"{atom['pos_x']:8.3f}{atom['pos_y']:8.3f}"
f"{atom['pos_z']:8.3f}\n"
)
f.write(
f"{box[0,0]:10.5f}{box[1,1]:10.5f}{box[2,2]:10.5f}"
f"{box[0,1]:10.5f}{box[0,2]:10.5f}{box[1,0]:10.5f}"
f"{box[1,2]:10.5f}{box[2,0]:10.5f}{box[2,1]:10.5f}\n"
)
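A round-trip sketch for the new gro helpers; the file names are placeholders:

```python
from mdevaluate.utils import read_gro, write_gro

atoms, box, description = read_gro("conf.gro")
atoms["pos_z"] += 0.1  # e.g. shift all atoms by 0.1 nm in z
write_gro("shifted.gro", atoms, box, description)
```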
def fibonacci_sphere(samples=1000):
points = []
phi = np.pi * (np.sqrt(5.0) - 1.0) # golden angle in radians
for i in range(samples):
y = 1 - (i / float(samples - 1)) * 2 # y goes from 1 to -1
radius = np.sqrt(1 - y * y) # radius at y
theta = phi * i # golden angle increment
x = np.cos(theta) * radius
z = np.sin(theta) * radius
points.append((x, y, z))
return np.array(points)
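`fibonacci_sphere` yields nearly uniform unit vectors, e.g. as directions for isotropically averaged scattering vectors; a short check:

```python
import numpy as np
from mdevaluate.utils import fibonacci_sphere

directions = fibonacci_sphere(samples=100)
assert np.allclose(np.linalg.norm(directions, axis=1), 1.0)  # unit vectors
q_vectors = 22.7 * directions  # e.g. |q| = 22.7 nm^-1
```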