Moved package files to src
This commit is contained in:
102
src/mdevaluate/__init__.py
Normal file
102
src/mdevaluate/__init__.py
Normal file
@@ -0,0 +1,102 @@
|
||||
import os
|
||||
from glob import glob
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from . import atoms
|
||||
from . import coordinates
|
||||
from . import correlation
|
||||
from . import distribution
|
||||
from . import functions
|
||||
from . import pbc
|
||||
from . import autosave
|
||||
from . import reader
|
||||
from .logging import logger
|
||||
|
||||
|
||||
def open(
|
||||
directory="",
|
||||
topology="*.tpr",
|
||||
trajectory="*.xtc",
|
||||
cached=False,
|
||||
nojump=False,
|
||||
index_file=None,
|
||||
charges=None,
|
||||
masses=None,
|
||||
):
|
||||
"""
|
||||
Open a simulation from a directory.
|
||||
|
||||
Args:
|
||||
directory: Directory of the simulation.
|
||||
topology (opt.):
|
||||
Descriptor of the topology file (tpr or gro). By default a tpr file is
|
||||
used, if there is exactly one in the directoy.
|
||||
trajectory (opt.): Descriptor of the trajectory (xtc file).
|
||||
cached (opt.):
|
||||
If the trajectory reader should be cached. Can be True, an integer or None.
|
||||
If this is True maxsize is 128, otherwise this is used as maxsize for
|
||||
the cache, None means infinite cache (this is a potential memory leak!).
|
||||
nojump (opt.): If nojump matrixes should be generated. They will alwyas be loaded if present
|
||||
|
||||
Returns:
|
||||
A Coordinate object of the simulation.
|
||||
|
||||
Example:
|
||||
Open a simulation located in '/path/to/sim', where the trajectory is
|
||||
located in a sub-directory '/path/to/sim/out' and named for Example
|
||||
'nojump_traj.xtc'. All read frames will be cached in memory.
|
||||
|
||||
>>> open('/path/to/sim', trajectory='out/nojump*.xtc', cached=None)
|
||||
|
||||
The file descriptors can use unix style pathname expansion to define the filenames.
|
||||
The default patterns use the recursive placeholder `**` which matches the base or
|
||||
any subdirctory, thus files in subdirectories with matching file type will be found too.
|
||||
For example: 'out/nojump*.xtc' would match xtc files in a subdirectory `out` that
|
||||
start with `nojump` and end with `.xtc`.
|
||||
|
||||
For more details see: https://docs.python.org/3/library/glob.html
|
||||
"""
|
||||
top_glob = glob(os.path.join(directory, topology), recursive=True)
|
||||
if top_glob is not None and len(top_glob) == 1:
|
||||
(top_file,) = top_glob
|
||||
logger.info("Loading topology: {}".format(top_file))
|
||||
if index_file is not None:
|
||||
index_glob = glob(os.path.join(directory, index_file), recursive=True)
|
||||
if index_glob is not None:
|
||||
index_file = index_glob[0]
|
||||
else:
|
||||
index_file = None
|
||||
else:
|
||||
raise FileNotFoundError("Topology file could not be identified.")
|
||||
|
||||
traj_glob = glob(os.path.join(directory, trajectory), recursive=True)
|
||||
if traj_glob is not None and len(traj_glob) == 1:
|
||||
traj_file = traj_glob[0]
|
||||
logger.info("Loading trajectory: {}".format(traj_file))
|
||||
else:
|
||||
raise FileNotFoundError("Trajectory file could not be identified.")
|
||||
|
||||
atom_set, frames = reader.open_with_mdanalysis(
|
||||
top_file,
|
||||
traj_file,
|
||||
cached=cached,
|
||||
index_file=index_file,
|
||||
charges=charges,
|
||||
masses=masses,
|
||||
)
|
||||
coords = coordinates.Coordinates(frames, atom_subset=atom_set)
|
||||
if nojump:
|
||||
try:
|
||||
frames.nojump_matrixes
|
||||
except reader.NojumpError:
|
||||
reader.generate_nojump_matrixes(coords)
|
||||
return coords
|
||||
|
||||
|
||||
def open_energy(file):
|
||||
"""Reads an gromacs energy file and output the data in a pandas DataFrame.
|
||||
Args:
|
||||
file: Filename of the energy file
|
||||
"""
|
||||
return pd.DataFrame(reader.energy_reader(file).data_dict)
|
||||
286
src/mdevaluate/atoms.py
Normal file
286
src/mdevaluate/atoms.py
Normal file
@@ -0,0 +1,286 @@
|
||||
import re
|
||||
|
||||
from .pbc import pbc_diff
|
||||
from .checksum import checksum
|
||||
import numpy as np
|
||||
|
||||
import scipy
|
||||
|
||||
if scipy.version.version >= "0.17.0":
|
||||
from scipy.spatial import cKDTree as KDTree
|
||||
else:
|
||||
from scipy.spatial import KDTree
|
||||
|
||||
|
||||
def compare_regex(list, exp):
|
||||
"""
|
||||
Compare a list of strings with a regular expression.
|
||||
"""
|
||||
if not exp.endswith("$"):
|
||||
exp += "$"
|
||||
regex = re.compile(exp)
|
||||
return np.array([regex.match(s) is not None for s in list])
|
||||
|
||||
|
||||
class Atoms:
|
||||
"""
|
||||
Basic container class for atom information.
|
||||
|
||||
Args:
|
||||
atoms: N tuples of residue id, residue name and atom name.
|
||||
indices (optional): Dictionary of named atom index groups.
|
||||
|
||||
Attributes:
|
||||
residue_ids: Indices of the atoms residues
|
||||
residue_names: Names of the atoms residues
|
||||
atom_names: Names of the atoms
|
||||
indices: Dictionary of named atom index groups, if specified
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, atoms, indices=None, masses=None, charges=None, reader=None):
|
||||
self.residue_ids, self.residue_names, self.atom_names = atoms.T
|
||||
self.residue_ids = np.array([int(m) for m in self.residue_ids])
|
||||
self.indices = indices
|
||||
self.masses = masses
|
||||
self.charges = charges
|
||||
self.reader = reader
|
||||
|
||||
def subset(self, *args, **kwargs):
|
||||
"""
|
||||
Return a subset of these atoms with all atoms selected.
|
||||
|
||||
All arguments are passed to the :meth:`AtomSubset.subset` method directly.
|
||||
|
||||
"""
|
||||
return AtomSubset(self).subset(*args, **kwargs)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.atom_names)
|
||||
|
||||
|
||||
class AtomMismatch(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AtomSubset:
|
||||
def __init__(self, atoms, selection=None, description=""):
|
||||
"""
|
||||
Args:
|
||||
atoms: Base atom object
|
||||
selection (opt.): Selected atoms
|
||||
description (opt.): Descriptive string of the subset.
|
||||
"""
|
||||
if selection is None:
|
||||
selection = np.ones(len(atoms), dtype="bool")
|
||||
self.selection = selection
|
||||
self.atoms = atoms
|
||||
self.description = description
|
||||
|
||||
def subset(self, atom_name=None, residue_name=None, residue_id=None, indices=None):
|
||||
"""
|
||||
Return a subset of the system. The selection is specified by one or more of
|
||||
the keyworss below. Names are matched as a regular expression with `re.match`.
|
||||
|
||||
Args:
|
||||
atom_name: Specification of the atom name
|
||||
residue_name: Specification of the resiude name
|
||||
residue_id: Residue ID or list of IDs
|
||||
indices: List of atom indices
|
||||
"""
|
||||
new_subset = self
|
||||
if atom_name is not None:
|
||||
new_subset &= AtomSubset(
|
||||
self.atoms,
|
||||
selection=compare_regex(self.atoms.atom_names, atom_name),
|
||||
description=atom_name,
|
||||
)
|
||||
|
||||
if residue_name is not None:
|
||||
new_subset &= AtomSubset(
|
||||
self.atoms,
|
||||
selection=compare_regex(self.atoms.residue_names, residue_name),
|
||||
description=residue_name,
|
||||
)
|
||||
|
||||
if residue_id is not None:
|
||||
if np.iterable(residue_id):
|
||||
selection = np.zeros(len(self.selection), dtype="bool")
|
||||
selection[np.in1d(self.atoms.residue_ids, residue_id)] = True
|
||||
new_subset &= AtomSubset(self.atoms, selection)
|
||||
else:
|
||||
new_subset &= AtomSubset(
|
||||
self.atoms, self.atoms.residue_ids == residue_id
|
||||
)
|
||||
|
||||
if indices is not None:
|
||||
selection = np.zeros(len(self.selection), dtype="bool")
|
||||
selection[indices] = True
|
||||
new_subset &= AtomSubset(self.atoms, selection)
|
||||
return new_subset
|
||||
|
||||
@property
|
||||
def atom_names(self):
|
||||
return self.atoms.atom_names[self.selection]
|
||||
|
||||
@property
|
||||
def residue_names(self):
|
||||
return self.atoms.residue_names[self.selection]
|
||||
|
||||
@property
|
||||
def residue_ids(self):
|
||||
return self.atoms.residue_ids[self.selection]
|
||||
|
||||
@property
|
||||
def indices(self):
|
||||
return np.where(self.selection)
|
||||
|
||||
def __getitem__(self, slice):
|
||||
if isinstance(slice, str):
|
||||
indices = self.atoms.indices[slice]
|
||||
return self.atoms.subset()[indices] & self
|
||||
|
||||
return self.subset(indices=self.indices[0].__getitem__(slice))
|
||||
|
||||
def __and__(self, other):
|
||||
if self.atoms != other.atoms:
|
||||
raise AtomMismatch
|
||||
selection = self.selection & other.selection
|
||||
description = "{}_{}".format(self.description, other.description).strip("_")
|
||||
return AtomSubset(self.atoms, selection, description)
|
||||
|
||||
def __or__(self, other):
|
||||
if self.atoms != other.atoms:
|
||||
raise AtomMismatch
|
||||
selection = self.selection | other.selection
|
||||
description = "{}_{}".format(self.description, other.description).strip("_")
|
||||
return AtomSubset(self.atoms, selection, description)
|
||||
|
||||
def __invert__(self):
|
||||
selection = ~self.selection
|
||||
return AtomSubset(self.atoms, selection, self.description)
|
||||
|
||||
def __repr__(self):
|
||||
return "Subset of Atoms ({} of {})".format(
|
||||
len(self.atoms.residue_names[self.selection]), len(self.atoms)
|
||||
)
|
||||
|
||||
@property
|
||||
def summary(self):
|
||||
return "\n".join(
|
||||
[
|
||||
"{}{} {}".format(resid, resname, atom_names)
|
||||
for resid, resname, atom_names in zip(
|
||||
self.residue_ids, self.residue_names, self.atom_names
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
def __checksum__(self):
|
||||
return checksum(self.description)
|
||||
|
||||
|
||||
def center_of_mass(position, mass=None):
|
||||
if mass is not None:
|
||||
return 1 / mass.sum() * (mass * position).sum(axis=0)
|
||||
else:
|
||||
return 1 / len(position) * position.sum(axis=0)
|
||||
|
||||
|
||||
def gyration_radius(position):
|
||||
r"""
|
||||
Calculates a list of all radii of gyration of all molecules given in the coordinate frame,
|
||||
weighted with the masses of the individual atoms.
|
||||
|
||||
Args:
|
||||
position: Coordinate frame object
|
||||
|
||||
..math::
|
||||
R_G = \left(\frac{\sum_{i=1}^{n} m_i |\vec{r_i} - \vec{r_{COM}}|^2 }{\sum_{i=1}^{n} m_i }
|
||||
\rigth)^{\frac{1}{2}}
|
||||
"""
|
||||
|
||||
gyration_radii = np.array([])
|
||||
|
||||
for resid in np.unique(position.residue_ids):
|
||||
pos = position.whole[position.residue_ids == resid]
|
||||
mass = position.masses[position.residue_ids == resid][:, np.newaxis]
|
||||
COM = center_of_mass(pos, mass)
|
||||
r_sq = ((pbc_diff(pos, COM, pos.box.diagonal())) ** 2).sum(1)[:, np.newaxis]
|
||||
g_radius = ((r_sq * mass).sum() / mass.sum()) ** (0.5)
|
||||
|
||||
gyration_radii = np.append(gyration_radii, g_radius)
|
||||
|
||||
return gyration_radii
|
||||
|
||||
|
||||
def layer_of_atoms(
|
||||
atoms, thickness, plane_offset=np.array([0, 0, 0]), plane_normal=np.array([1, 0, 0])
|
||||
):
|
||||
p_ = atoms - plane_offset
|
||||
distance = np.dot(p_, plane_normal)
|
||||
|
||||
return abs(distance) <= thickness
|
||||
|
||||
|
||||
def distance_to_atoms(ref, atoms, box=None):
|
||||
"""Get the minimal distance from atoms to ref.
|
||||
The result is an array of with length == len(atoms)
|
||||
"""
|
||||
out = np.empty(atoms.shape[0])
|
||||
for i, atom in enumerate(atoms):
|
||||
diff = (pbc_diff(atom, ref, box) ** 2).sum(axis=1).min()
|
||||
out[i] = np.sqrt(diff)
|
||||
return out
|
||||
|
||||
|
||||
def distance_to_atoms_cKDtree(ref, atoms, box=None, thickness=None):
|
||||
"""
|
||||
Get the minimal distance from atoms to ref.
|
||||
The result is an array of with length == len(atoms)
|
||||
Can be faster than distance_to_atoms.
|
||||
Thickness defaults to box/5. If this is too small results may be wrong.
|
||||
If box is not given then periodic boundary conditions are not applied!
|
||||
"""
|
||||
if thickness == None:
|
||||
thickness = box / 5
|
||||
if box is not None:
|
||||
start_coords = np.copy(atoms) % box
|
||||
all_frame_coords = pbc_points(ref, box, thickness=thickness)
|
||||
else:
|
||||
start_coords = atoms
|
||||
all_frame_coords = ref
|
||||
|
||||
tree = spatial.cKDTree(all_frame_coords)
|
||||
return tree.query(start_coords)[0]
|
||||
|
||||
|
||||
def next_neighbors(
|
||||
atoms,
|
||||
query_atoms=None,
|
||||
number_of_neighbors=1,
|
||||
distance_upper_bound=np.inf,
|
||||
distinct=False,
|
||||
):
|
||||
"""
|
||||
Find the N next neighbors of a set of atoms.
|
||||
|
||||
Args:
|
||||
atoms: The reference atoms and also the atoms which are queried if `query_atoms` is net provided
|
||||
query_atoms (opt.): If this is not None, these atoms will be queried
|
||||
number_of_neighbors (int, opt.): Number of neighboring atoms to find
|
||||
distance_upper_bound (float, opt.): Upper bound of the distance between neighbors
|
||||
distinct (bool, opt.): If this is true, the atoms and query atoms are taken as distinct sets of atoms
|
||||
"""
|
||||
tree = KDTree(atoms)
|
||||
dnn = 0
|
||||
if query_atoms is None:
|
||||
query_atoms = atoms
|
||||
elif not distinct:
|
||||
dnn = 1
|
||||
dist, indices = tree.query(
|
||||
query_atoms,
|
||||
number_of_neighbors + dnn,
|
||||
distance_upper_bound=distance_upper_bound,
|
||||
)
|
||||
return indices[:, dnn:]
|
||||
193
src/mdevaluate/autosave.py
Normal file
193
src/mdevaluate/autosave.py
Normal file
@@ -0,0 +1,193 @@
|
||||
import os
|
||||
import numpy as np
|
||||
import functools
|
||||
import inspect
|
||||
|
||||
from .checksum import checksum
|
||||
from .logging import logger
|
||||
|
||||
autosave_directory = None
|
||||
load_autosave_data = False
|
||||
verbose_print = True
|
||||
user_autosave_directory = os.path.join(os.environ["HOME"], ".mdevaluate/autosave")
|
||||
|
||||
|
||||
def notify(msg):
|
||||
if verbose_print:
|
||||
logger.info(msg)
|
||||
else:
|
||||
logger.debug(msg)
|
||||
|
||||
|
||||
def enable(dir, load_data=True, verbose=True):
|
||||
"""
|
||||
Enable auto saving results of functions decorated with :func:`autosave_data`.
|
||||
|
||||
Args:
|
||||
dir: Directory where the data should be saved.
|
||||
load_data (opt., bool): If data should also be loaded.
|
||||
"""
|
||||
global autosave_directory, load_autosave_data, verbose_print
|
||||
verbose_print = verbose
|
||||
# absolute = os.path.abspath(dir)
|
||||
# os.makedirs(absolute, exist_ok=True)
|
||||
autosave_directory = dir
|
||||
load_autosave_data = load_data
|
||||
notify("Enabled autosave in directory: {}".format(autosave_directory))
|
||||
|
||||
|
||||
def disable():
|
||||
"""Disable autosave."""
|
||||
global autosave_directory, load_autosave_data
|
||||
autosave_directory = None
|
||||
load_autosave_data = False
|
||||
|
||||
|
||||
class disabled:
|
||||
"""
|
||||
A context manager that disbales the autosave module within its context.
|
||||
|
||||
Example:
|
||||
import mdevaluate as md
|
||||
md.autosave.enable('data')
|
||||
with md.autosave.disabled():
|
||||
# Autosave functionality is disabled within this context.
|
||||
md.correlation.shifted_correlation(
|
||||
...
|
||||
)
|
||||
|
||||
# After the context is exited, autosave will work as before.
|
||||
"""
|
||||
|
||||
def __enter__(self):
|
||||
self._autosave_directory = autosave_directory
|
||||
disable()
|
||||
|
||||
def __exit__(self, *args):
|
||||
enable(self._autosave_directory)
|
||||
|
||||
|
||||
def get_directory(reader):
|
||||
"""Get the autosave directory for a trajectory reader."""
|
||||
outdir = os.path.dirname(reader.filename)
|
||||
savedir = os.path.join(outdir, autosave_directory)
|
||||
if not os.path.exists(savedir):
|
||||
try:
|
||||
os.makedirs(savedir)
|
||||
except PermissionError:
|
||||
pass
|
||||
if not os.access(savedir, os.W_OK):
|
||||
savedir = os.path.join(user_autosave_directory, savedir.lstrip("/"))
|
||||
logger.info(
|
||||
"Switched autosave directory to {}, since original location is not writeable.".format(
|
||||
savedir
|
||||
)
|
||||
)
|
||||
os.makedirs(savedir, exist_ok=True)
|
||||
return savedir
|
||||
|
||||
|
||||
def get_filename(function, checksum, description, *args):
|
||||
"""Get the autosave filename for a specific function call."""
|
||||
func_desc = function.__name__
|
||||
for arg in args:
|
||||
if hasattr(arg, "__name__"):
|
||||
func_desc += "_{}".format(arg.__name__)
|
||||
elif isinstance(arg, functools.partial):
|
||||
func_desc += "_{}".format(arg.func.__name__)
|
||||
|
||||
if hasattr(arg, "frames"):
|
||||
savedir = get_directory(arg.frames)
|
||||
|
||||
if hasattr(arg, "description") and arg.description != "":
|
||||
description += "_{}".format(arg.description)
|
||||
filename = "{}_{}.npz".format(func_desc.strip("_"), description.strip("_"))
|
||||
return os.path.join(savedir, filename)
|
||||
|
||||
|
||||
def verify_file(filename, checksum):
|
||||
"""Verify if the file matches the function call."""
|
||||
file_checksum = 0
|
||||
if os.path.exists(filename):
|
||||
data = np.load(filename, allow_pickle=True)
|
||||
if "checksum" in data:
|
||||
file_checksum = data["checksum"]
|
||||
return file_checksum == checksum
|
||||
|
||||
|
||||
def save_data(filename, checksum, data):
|
||||
"""Save data and checksum to a file."""
|
||||
notify("Saving result to file: {}".format(filename))
|
||||
try:
|
||||
data = np.array(data)
|
||||
except ValueError:
|
||||
arr = np.empty((len(data),), dtype=object)
|
||||
arr[:] = data
|
||||
data = arr
|
||||
|
||||
np.savez(filename, checksum=checksum, data=data)
|
||||
|
||||
|
||||
def load_data(filename):
|
||||
"""Load data from a npz file."""
|
||||
notify("Loading result from file: {}".format(filename))
|
||||
fdata = np.load(filename, allow_pickle=True)
|
||||
if "data" in fdata:
|
||||
return fdata["data"]
|
||||
else:
|
||||
data = tuple(fdata[k] for k in sorted(fdata) if ("arr" in k))
|
||||
save_data(filename, fdata["checksum"], data)
|
||||
return data
|
||||
|
||||
|
||||
def autosave_data(nargs, kwargs_keys=None, version=None):
|
||||
"""
|
||||
Enable autosaving of results for a function.
|
||||
|
||||
Args:
|
||||
nargs: Number of args which are relevant for the calculation.
|
||||
kwargs_keys (opt.): List of keyword arguments which are relevant for the calculation.
|
||||
version (opt.):
|
||||
An optional version number of the decorated function, which replaces the checksum of
|
||||
the function code, hence the checksum does not depend on the function code.
|
||||
"""
|
||||
|
||||
def decorator_function(function):
|
||||
# make sure too include names of positional arguments in kwargs_keys,
|
||||
# sice otherwise they will be ignored if passed via keyword.
|
||||
# nonlocal kwargs_keys
|
||||
posargs_keys = list(inspect.signature(function).parameters)[:nargs]
|
||||
|
||||
@functools.wraps(function)
|
||||
def autosave(*args, **kwargs):
|
||||
description = kwargs.pop("description", "")
|
||||
autoload = kwargs.pop("autoload", True) and load_autosave_data
|
||||
if autosave_directory is not None:
|
||||
relevant_args = list(args[:nargs])
|
||||
if kwargs_keys is not None:
|
||||
for key in [*posargs_keys, *kwargs_keys]:
|
||||
if key in kwargs:
|
||||
relevant_args.append(kwargs[key])
|
||||
|
||||
if version is None:
|
||||
csum = legacy_csum = checksum(function, *relevant_args)
|
||||
else:
|
||||
csum = checksum(version, *relevant_args)
|
||||
legacy_csum = checksum(function, *relevant_args)
|
||||
|
||||
filename = get_filename(function, csum, description, *relevant_args)
|
||||
if autoload and (
|
||||
verify_file(filename, csum) or verify_file(filename, legacy_csum)
|
||||
):
|
||||
result = load_data(filename)
|
||||
else:
|
||||
result = function(*args, **kwargs)
|
||||
save_data(filename, csum, result)
|
||||
else:
|
||||
result = function(*args, **kwargs)
|
||||
|
||||
return result
|
||||
|
||||
return autosave
|
||||
|
||||
return decorator_function
|
||||
91
src/mdevaluate/checksum.py
Executable file
91
src/mdevaluate/checksum.py
Executable file
@@ -0,0 +1,91 @@
|
||||
import functools
|
||||
import hashlib
|
||||
from .logging import logger
|
||||
from types import ModuleType, FunctionType
|
||||
import inspect
|
||||
|
||||
import numpy as np
|
||||
|
||||
# This variable is used within the checksum function to salt the sha1 sum.
|
||||
# May be changed to force a different checksum for similar objects.
|
||||
SALT = 42
|
||||
|
||||
|
||||
def version(version_nr, calls=[]):
|
||||
"""Function decorator that assigns a custom checksum to a function."""
|
||||
|
||||
def decorator(func):
|
||||
cs = checksum(func.__name__, version_nr, *calls)
|
||||
func.__checksum__ = lambda: cs
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapped(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapped
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def strip_comments(s):
|
||||
"""Strips comment lines and docstring from Python source string."""
|
||||
o = ""
|
||||
in_docstring = False
|
||||
for l in s.split("\n"):
|
||||
if l.strip().startswith(("#", '"', "'")) or in_docstring:
|
||||
in_docstring = l.strip().startswith(('"""', "'''")) + in_docstring == 1
|
||||
continue
|
||||
o += l + "\n"
|
||||
return o
|
||||
|
||||
|
||||
def checksum(*args, csum=None):
|
||||
"""
|
||||
Calculate a checksum of any object, by sha1 hash.
|
||||
|
||||
Input for the hash are some salt bytes and the byte encoding of a string
|
||||
that depends on the object and its type:
|
||||
|
||||
- If a method __checksum__ is available, it's return value is converted to bytes
|
||||
- str or bytes are used as sha1 input directly
|
||||
- modules use the __name__ attribute
|
||||
- functions use the function code and any closures of the function
|
||||
- functools.partial uses the checksum of the function and any arguments, that were defined
|
||||
- numpy.ndarray uses bytes representation of the array (arr.tobytes())
|
||||
- Anything else is converted to a str
|
||||
"""
|
||||
if csum is None:
|
||||
csum = hashlib.sha1()
|
||||
csum.update(str(SALT).encode())
|
||||
|
||||
for arg in args:
|
||||
if hasattr(arg, "__checksum__"):
|
||||
logger.debug("Checksum via __checksum__: %s", str(arg))
|
||||
csum.update(str(arg.__checksum__()).encode())
|
||||
elif isinstance(arg, bytes):
|
||||
csum.update(arg)
|
||||
elif isinstance(arg, str):
|
||||
csum.update(arg.encode())
|
||||
elif isinstance(arg, ModuleType):
|
||||
csum.update(arg.__name__.encode())
|
||||
elif isinstance(arg, FunctionType):
|
||||
csum.update(strip_comments(inspect.getsource(arg)).encode())
|
||||
c = inspect.getclosurevars(arg)
|
||||
for v in {**c.nonlocals, **c.globals}.values():
|
||||
if v is not arg:
|
||||
checksum(v, csum=csum)
|
||||
elif isinstance(arg, functools.partial):
|
||||
logger.debug("Checksum via partial for %s", str(arg))
|
||||
checksum(arg.func, csum=csum)
|
||||
for x in arg.args:
|
||||
checksum(x, csum=csum)
|
||||
for k in sorted(arg.keywords.keys()):
|
||||
csum.update(k.encode())
|
||||
checksum(arg.keywords[k], csum=csum)
|
||||
elif isinstance(arg, np.ndarray):
|
||||
csum.update(arg.tobytes())
|
||||
else:
|
||||
logger.debug("Checksum via str for %s", str(arg))
|
||||
csum.update(str(arg).encode())
|
||||
|
||||
return int.from_bytes(csum.digest(), "big")
|
||||
37
src/mdevaluate/cli.py
Normal file
37
src/mdevaluate/cli.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import argparse
|
||||
from . import logging
|
||||
from . import open as md_open
|
||||
|
||||
|
||||
def run(*args, **kwargs):
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"xtcfile",
|
||||
help="The xtc file to index.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--tpr", help="The tprfile of the trajectory.", dest="tpr", default=None
|
||||
)
|
||||
parser.add_argument(
|
||||
"--nojump",
|
||||
help="Generate Nojump Matrices, requires a tpr file.",
|
||||
dest="nojump",
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--debug",
|
||||
help="Set logging level to debug.",
|
||||
dest="debug",
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
args = parser.parse_args()
|
||||
if args.debug:
|
||||
logging.setlevel("DEBUG")
|
||||
|
||||
md_open("", trajectory=args.xtcfile, topology=args.tpr, nojump=args.nojump)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run()
|
||||
593
src/mdevaluate/coordinates.py
Executable file
593
src/mdevaluate/coordinates.py
Executable file
@@ -0,0 +1,593 @@
|
||||
from functools import partial, lru_cache, wraps
|
||||
from copy import copy
|
||||
from .logging import logger
|
||||
|
||||
import numpy as np
|
||||
from scipy.spatial import cKDTree, KDTree
|
||||
|
||||
from .atoms import AtomSubset
|
||||
from .pbc import whole, nojump, pbc_diff
|
||||
from .utils import mask2indices, singledispatchmethod
|
||||
from .checksum import checksum
|
||||
|
||||
|
||||
class UnknownCoordinatesMode(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def rotate_axis(coords, axis):
|
||||
"""
|
||||
Rotate a set of coordinates to a given axis.
|
||||
"""
|
||||
axis = np.array(axis) / np.linalg.norm(axis)
|
||||
zaxis = np.array([0, 0, 1])
|
||||
if (axis == zaxis).sum() == 3:
|
||||
return coords
|
||||
rotation_axis = np.cross(axis, zaxis)
|
||||
rotation_axis = rotation_axis / np.linalg.norm(rotation_axis)
|
||||
|
||||
theta = np.arccos(axis @ zaxis / np.linalg.norm(axis))
|
||||
|
||||
# return theta/pi, rotation_axis
|
||||
|
||||
ux, uy, uz = rotation_axis
|
||||
cross_matrix = np.array([[0, -uz, uy], [uz, 0, -ux], [-uy, ux, 0]])
|
||||
rotation_matrix = (
|
||||
np.cos(theta) * np.identity(len(axis))
|
||||
+ (1 - np.cos(theta))
|
||||
* rotation_axis.reshape(-1, 1)
|
||||
@ rotation_axis.reshape(1, -1)
|
||||
+ np.sin(theta) * cross_matrix
|
||||
)
|
||||
|
||||
if len(coords.shape) == 2:
|
||||
rotated = np.array([rotation_matrix @ xyz for xyz in coords])
|
||||
else:
|
||||
rotated = rotation_matrix @ coords
|
||||
return rotated
|
||||
|
||||
|
||||
def spherical_radius(frame, origin=None):
|
||||
"""
|
||||
Transform a frame of cartesian coordinates into the sperical radius.
|
||||
If origin=None the center of the box is taken as the coordinates origin.
|
||||
"""
|
||||
if origin is None:
|
||||
origin = frame.box.diagonal() / 2
|
||||
return ((frame - origin) ** 2).sum(axis=-1) ** 0.5
|
||||
|
||||
|
||||
def polar_coordinates(x, y):
|
||||
"""Convert cartesian to polar coordinates."""
|
||||
radius = (x**2 + y**2) ** 0.5
|
||||
phi = np.arctan2(y, x)
|
||||
return radius, phi
|
||||
|
||||
|
||||
def spherical_coordinates(x, y, z):
|
||||
"""Convert cartesian to spherical coordinates."""
|
||||
xy, phi = polar_coordinates(x, y)
|
||||
radius = (x**2 + y**2 + z**2) ** 0.5
|
||||
theta = np.arccos(z / radius)
|
||||
return radius, phi, theta
|
||||
|
||||
|
||||
def radial_selector(frame, coordinates, rmin, rmax):
|
||||
"""
|
||||
Return a selection of all atoms with radius in the interval [rmin, rmax].
|
||||
"""
|
||||
crd = coordinates[frame.step]
|
||||
rad, _ = polar_coordinates(crd[:, 0], crd[:, 1])
|
||||
selector = (rad >= rmin) & (rad <= rmax)
|
||||
return mask2indices(selector)
|
||||
|
||||
|
||||
def spatial_selector(frame, transform, rmin, rmax):
|
||||
"""
|
||||
Select a subset of atoms which have a radius between rmin and rmax.
|
||||
Coordinates are filtered by the condition::
|
||||
|
||||
rmin <= transform(frame) <= rmax
|
||||
|
||||
Args:
|
||||
frame: The coordinates of the actual trajectory
|
||||
transform:
|
||||
A function that transforms the coordinates of the frames into
|
||||
the one-dimensional spatial coordinate (e.g. radius).
|
||||
rmin: Minimum value of the radius
|
||||
rmax: Maximum value of the radius
|
||||
"""
|
||||
r = transform(frame)
|
||||
selector = (rmin <= r) & (rmax >= r)
|
||||
return mask2indices(selector)
|
||||
|
||||
|
||||
class CoordinateFrame(np.ndarray):
|
||||
_known_modes = ("pbc", "whole", "nojump")
|
||||
|
||||
@property
|
||||
def box(self):
|
||||
return np.array(self.coordinates.frames[self.step].triclinic_dimensions)
|
||||
|
||||
@property
|
||||
def volume(self):
|
||||
return self.box.diagonal().cumprod()[-1]
|
||||
|
||||
@property
|
||||
def time(self):
|
||||
return self.coordinates.frames[self.step].time
|
||||
|
||||
@property
|
||||
def masses(self):
|
||||
return self.coordinates.atoms.masses[self.coordinates.atom_subset.selection]
|
||||
|
||||
@property
|
||||
def charges(self):
|
||||
return self.coordinates.atoms.charges[self.coordinates.atom_subset.selection]
|
||||
|
||||
@property
|
||||
def residue_ids(self):
|
||||
return self.coordinates.atom_subset.residue_ids
|
||||
|
||||
@property
|
||||
def residue_names(self):
|
||||
return self.coordinates.atom_subset.residue_names
|
||||
|
||||
@property
|
||||
def atom_names(self):
|
||||
return self.coordinates.atom_subset.atom_names
|
||||
|
||||
@property
|
||||
def indices(self):
|
||||
return self.coordinates.atom_subset.indices
|
||||
|
||||
@property
|
||||
def selection(self):
|
||||
return self.coordinates.atom_subset.selection
|
||||
|
||||
@property
|
||||
def whole(self):
|
||||
frame = whole(self)
|
||||
frame.mode = "whole"
|
||||
return frame
|
||||
|
||||
@property
|
||||
def pbc(self):
|
||||
frame = self % self.box.diagonal()
|
||||
frame.mode = "pbc"
|
||||
return frame
|
||||
|
||||
@property
|
||||
def nojump(self):
|
||||
if self.mode != "nojump":
|
||||
if self.mode is not None:
|
||||
logger.warn(
|
||||
"Combining Nojump with other Coordinate modes is not supported and "
|
||||
"may cause unexpected results."
|
||||
)
|
||||
frame = nojump(self)
|
||||
frame.mode = "nojump"
|
||||
return frame
|
||||
else:
|
||||
return self
|
||||
|
||||
def __new__(
|
||||
subtype,
|
||||
shape,
|
||||
dtype=float,
|
||||
buffer=None,
|
||||
offset=0,
|
||||
strides=None,
|
||||
order=None,
|
||||
coordinates=None,
|
||||
step=None,
|
||||
box=None,
|
||||
mode=None,
|
||||
):
|
||||
obj = np.ndarray.__new__(subtype, shape, dtype, buffer, offset, strides)
|
||||
|
||||
obj.coordinates = coordinates
|
||||
obj.step = step
|
||||
obj.mode = mode
|
||||
return obj
|
||||
|
||||
def __array_finalize__(self, obj):
|
||||
if obj is None:
|
||||
return
|
||||
|
||||
self.coordinates = getattr(obj, "coordinates", None)
|
||||
self.step = getattr(obj, "step", None)
|
||||
self.mode = getattr(obj, "mode", None)
|
||||
if hasattr(obj, "reference"):
|
||||
self.reference = getattr(obj, "reference")
|
||||
|
||||
|
||||
class Coordinates:
|
||||
"""
|
||||
Coordinates represent trajectory data, which is used for evaluation functions.
|
||||
|
||||
Atoms may be selected by specifing a atom_subset or a atom_filter.
|
||||
"""
|
||||
|
||||
def get_mode(self, mode):
|
||||
if self.atom_subset is not None:
|
||||
return Coordinates(
|
||||
frames=self.frames, atom_subset=self.atom_subset, mode=mode
|
||||
)[self._slice]
|
||||
else:
|
||||
return Coordinates(
|
||||
frames=self.frames, atom_filter=self.atom_filter, mode=mode
|
||||
)[self._slice]
|
||||
|
||||
@property
|
||||
def pbc(self):
|
||||
return self.get_mode("pbc")
|
||||
|
||||
@property
|
||||
def whole(self):
|
||||
return self.get_mode("whole")
|
||||
|
||||
@property
|
||||
def nojump(self):
|
||||
return self.get_mode("nojump")
|
||||
|
||||
@property
|
||||
def mode(self):
|
||||
return self._mode
|
||||
|
||||
@mode.setter
|
||||
def mode(self, val):
|
||||
if val in CoordinateFrame._known_modes:
|
||||
logger.warn(
|
||||
"Changing the Coordinates mode directly is deprecated. Use Coordinates.%s instead, which returns a copy.",
|
||||
val,
|
||||
)
|
||||
self._mode = val
|
||||
else:
|
||||
raise UnknownCoordinatesMode("No such mode: {}".format(val))
|
||||
|
||||
def __init__(
|
||||
self, frames, atom_filter=None, atom_subset: AtomSubset = None, mode=None
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
frames: The trajectory reader
|
||||
atom_filter (opt.): A mask which selects a subset of the system
|
||||
atom_subset (opt.): A AtomSubset that selects a subset of the system
|
||||
mode (opt.): PBC mode of the Coordinates, can be pbc, whole or nojump.
|
||||
|
||||
Note:
|
||||
The caching in Coordinates is deprecated, use the CachedReader or the function open
|
||||
from the reader module instead.
|
||||
"""
|
||||
self._mode = mode
|
||||
self.frames = frames
|
||||
self._slice = slice(None)
|
||||
assert (
|
||||
atom_filter is None or atom_subset is None
|
||||
), "Cannot use both: subset and filter"
|
||||
|
||||
if atom_filter is not None:
|
||||
self.atom_filter = atom_filter
|
||||
self.atom_subset = None
|
||||
elif atom_subset is not None:
|
||||
self.atom_filter = atom_subset.selection
|
||||
self.atom_subset = atom_subset
|
||||
self.atoms = atom_subset.atoms
|
||||
else:
|
||||
self.atom_filter = np.ones(shape=(len(frames[0].coordinates),), dtype=bool)
|
||||
self.atom_subset = None
|
||||
|
||||
def get_frame(self, fnr):
|
||||
"""Returns the fnr-th frame."""
|
||||
try:
|
||||
if self.atom_filter is not None:
|
||||
frame = (
|
||||
self.frames[fnr].positions[self.atom_filter].view(CoordinateFrame)
|
||||
)
|
||||
else:
|
||||
frame = self.frames.__getitem__(fnr).positions.view(CoordinateFrame)
|
||||
frame.coordinates = self
|
||||
frame.step = fnr
|
||||
if self.mode is not None:
|
||||
frame = getattr(frame, self.mode)
|
||||
except EOFError:
|
||||
raise IndexError
|
||||
|
||||
return frame
|
||||
|
||||
def clear_cache(self):
|
||||
"""Clears the frame cache, if it is enabled."""
|
||||
if hasattr(self.get_frame, "clear_cache"):
|
||||
self.get_frame.clear_cache()
|
||||
|
||||
def __iter__(self):
|
||||
for i in range(len(self))[self._slice]:
|
||||
yield self[i]
|
||||
|
||||
@singledispatchmethod
|
||||
def __getitem__(self, item):
|
||||
return self.get_frame(item)
|
||||
|
||||
@__getitem__.register(slice)
|
||||
def _(self, item):
|
||||
sliced = copy(self)
|
||||
sliced._slice = item
|
||||
return sliced
|
||||
|
||||
def __len__(self):
|
||||
return len(self.frames)
|
||||
|
||||
def __checksum__(self):
|
||||
return checksum(self.frames, self.atom_filter, self._slice, self.mode)
|
||||
|
||||
def __repr__(self):
|
||||
return "Coordinates <{}>: {}".format(self.frames.filename, self.atom_subset)
|
||||
|
||||
@wraps(AtomSubset.subset)
|
||||
def subset(self, **kwargs):
|
||||
return Coordinates(
|
||||
self.frames, atom_subset=self.atom_subset.subset(**kwargs), mode=self._mode
|
||||
)
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return self.atom_subset.description
|
||||
|
||||
@description.setter
|
||||
def description(self, desc):
|
||||
self.atom_subset.description = desc
|
||||
|
||||
|
||||
class MeanCoordinates(Coordinates):
|
||||
def __init__(self, frames, atom_filter=None, mean=1):
|
||||
super().__init__(frames, atom_filter)
|
||||
self.mean = mean
|
||||
assert mean >= 1, "Mean must be positive"
|
||||
|
||||
def __getitem__(self, item):
|
||||
frame = super().__getitem__(item)
|
||||
for i in range(item + 1, item + self.mean):
|
||||
frame += super().__getitem__(i)
|
||||
|
||||
return frame / self.mean
|
||||
|
||||
def len(self):
|
||||
return len(super() - self.mean + 1)
|
||||
|
||||
|
||||
class CoordinatesMap:
|
||||
def __init__(self, coordinates, function):
|
||||
self.coordinates = coordinates
|
||||
self.frames = self.coordinates.frames
|
||||
self.atom_subset = self.coordinates.atom_subset
|
||||
self.function = function
|
||||
if isinstance(function, partial):
|
||||
self._description = self.function.func.__name__
|
||||
else:
|
||||
self._description = self.function.__name__
|
||||
|
||||
def __iter__(self):
|
||||
for frame in self.coordinates:
|
||||
step = frame.step
|
||||
frame = self.function(frame)
|
||||
if not isinstance(frame, CoordinateFrame):
|
||||
frame = frame.view(CoordinateFrame)
|
||||
frame.coordinates = self
|
||||
frame.step = step
|
||||
yield frame
|
||||
|
||||
def __getitem__(self, item):
|
||||
if isinstance(item, slice):
|
||||
return self.__class__(self.coordinates[item], self.function)
|
||||
else:
|
||||
frame = self.function(self.coordinates.__getitem__(item))
|
||||
if not isinstance(frame, CoordinateFrame):
|
||||
frame = frame.view(CoordinateFrame)
|
||||
frame.coordinates = self
|
||||
frame.step = item
|
||||
return frame
|
||||
|
||||
def __len__(self):
|
||||
return len(self.coordinates.frames)
|
||||
|
||||
def __checksum__(self):
|
||||
return checksum(self.coordinates, self.function)
|
||||
|
||||
@wraps(Coordinates.subset)
|
||||
def subset(self, **kwargs):
|
||||
return CoordinatesMap(self.coordinates.subset(**kwargs), self.function)
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return "{}_{}".format(self._description, self.coordinates.description)
|
||||
|
||||
@description.setter
|
||||
def description(self, desc):
|
||||
self._description = desc
|
||||
|
||||
@property
|
||||
def nojump(self):
|
||||
return CoordinatesMap(self.coordinates.nojump, self.function)
|
||||
|
||||
@property
|
||||
def whole(self):
|
||||
return CoordinatesMap(self.coordinates.whole, self.function)
|
||||
|
||||
@property
|
||||
def pbc(self):
|
||||
return CoordinatesMap(self.coordinates.pbc, self.function)
|
||||
|
||||
|
||||
class CoordinatesFilter:
|
||||
@property
|
||||
def atom_subset(self):
|
||||
pass
|
||||
|
||||
def __init__(self, coordinates, atom_filter):
|
||||
self.coordinates = coordinates
|
||||
self.atom_filter = atom_filter
|
||||
|
||||
def __getitem__(self, item):
|
||||
if isinstance(item, slice):
|
||||
sliced = copy(self)
|
||||
sliced.coordinates = self.coordinates[item]
|
||||
return sliced
|
||||
else:
|
||||
frame = self.coordinates[item]
|
||||
return frame[self.atom_filter]
|
||||
|
||||
|
||||
class CoordinatesKDTree:
|
||||
"""
|
||||
A KDTree of coordinates frames. The KDtrees are cached by a :func:`functools.lru_cache`.
|
||||
Uses :class:`scipy.spatial.cKDTree` by default, since it's significantly faster.
|
||||
Make sure to use scipy 0.17 or later or switch to the normal KDTree, since cKDTree has
|
||||
a memory leak in earlier versions.
|
||||
"""
|
||||
|
||||
def clear_cache(self):
|
||||
"""Clear the LRU cache."""
|
||||
self._get_tree_at_index.cache_clear()
|
||||
|
||||
@property
|
||||
def cache_info(self):
|
||||
"""Return info about the state of the cache."""
|
||||
return self._get_tree_at_index.cache_info()
|
||||
|
||||
def _get_tree_at_index(self, index):
|
||||
frame = self.frames[index]
|
||||
return self.kdtree(frame[self.selector(frame)])
|
||||
|
||||
def __init__(self, frames, selector=None, boxsize=None, maxcache=128, ckdtree=True):
|
||||
"""
|
||||
Args:
|
||||
frames: Trajectory of the simulation, can be Coordinates object or reader
|
||||
selector: Selector function that selects a subset of each frame
|
||||
maxcache: Maxsize of the :func:`~functools.lru_cache`
|
||||
ckdtree: Use :class:`~scipy.spatial.cKDTree` or :class:`~scipy.spatial.KDTree` if False
|
||||
"""
|
||||
if selector is not None:
|
||||
self.selector = selector
|
||||
else:
|
||||
self.selector = lambda x: slice(None)
|
||||
self.frames = frames
|
||||
self.kdtree = cKDTree if ckdtree else KDTree
|
||||
if boxsize is not None:
|
||||
self.kdtree = partial(self.kdtree, boxsize=boxsize)
|
||||
self._get_tree_at_index = lru_cache(maxsize=maxcache)(self._get_tree_at_index)
|
||||
|
||||
def __getitem__(self, index):
|
||||
return self._get_tree_at_index(index)
|
||||
|
||||
def __checksum__(self):
|
||||
return checksum(self.selector, self.frames)
|
||||
|
||||
def __eq__(self, other):
|
||||
return super().__eq__(other)
|
||||
|
||||
|
||||
def map_coordinates(func):
|
||||
@wraps(func)
|
||||
def wrapped(coordinates, **kwargs):
|
||||
return CoordinatesMap(coordinates, partial(func, **kwargs))
|
||||
|
||||
return wrapped
|
||||
|
||||
|
||||
@map_coordinates
|
||||
def centers_of_mass(c, *, masses=None):
|
||||
"""
|
||||
|
||||
A- 1
|
||||
B- 2
|
||||
A- 1
|
||||
C 3
|
||||
A-
|
||||
B-
|
||||
A-
|
||||
C
|
||||
A-
|
||||
B-
|
||||
A-
|
||||
C
|
||||
|
||||
|
||||
Example:
|
||||
rd = XTCReader('t.xtc')
|
||||
coordinates = Coordinates(rd)
|
||||
com = centers_of_mass(coordinates, (1.0, 2.0, 1.0, 3.0))
|
||||
|
||||
"""
|
||||
# At first, regroup our array
|
||||
number_of_masses = len(masses)
|
||||
number_of_coordinates, number_of_dimensions = c.shape
|
||||
number_of_new_coordinates = number_of_coordinates // number_of_masses
|
||||
grouped_masses = c.reshape(
|
||||
number_of_new_coordinates, number_of_masses, number_of_dimensions
|
||||
)
|
||||
|
||||
return np.average(grouped_masses, axis=1, weights=masses)
|
||||
|
||||
|
||||
@map_coordinates
|
||||
def pore_coordinates(coordinates, origin, sym_axis="z"):
|
||||
"""
|
||||
Map coordinates of a pore simulation so the pore has cylindrical symmetry.
|
||||
|
||||
Args:
|
||||
coordinates: Coordinates of the simulation
|
||||
origin: Origin of the pore which will be the coordinates origin after mapping
|
||||
sym_axis (opt.): Symmtery axis of the pore, may be a literal direction
|
||||
'x', 'y' or 'z' or an array of shape (3,)
|
||||
"""
|
||||
if sym_axis in ("x", "y", "z"):
|
||||
rot_axis = np.zeros(shape=(3,))
|
||||
rot_axis[["x", "y", "z"].index(sym_axis)] = 1
|
||||
else:
|
||||
rot_axis = sym_axis
|
||||
|
||||
return rotate_axis(coordinates - origin, rot_axis)
|
||||
|
||||
|
||||
@map_coordinates
|
||||
def vectors(coordinates, atoms_a, atoms_b, normed=False):
|
||||
"""
|
||||
Compute the vectors between the atoms of two subsets.
|
||||
|
||||
Args:
|
||||
coordinates: The Coordinates object the atoms will be taken from
|
||||
atoms_a: Mask or indices of the first atom subset
|
||||
atoms_b: Mask or indices of the second atom subset
|
||||
normed (opt.): If the vectors should be normed
|
||||
box (opt.): If not None, the vectors are calcualte with PBC
|
||||
|
||||
The defintion of atoms_a/b can be any possible subript of a numpy array.
|
||||
They can, for example, be given as a masking array of bool values with the
|
||||
same length as the frames of the coordinates. Or they can be a list of
|
||||
indices selecting the atoms of these indices from each frame.
|
||||
|
||||
It is possible to compute the mean of several atoms before calculating the vectors,
|
||||
by using a two-dimensional list of indices. The following code computes the vectors
|
||||
between atoms 0, 3, 6 and the mean coordinate of atoms 1, 4, 7 and 2, 5, 8::
|
||||
|
||||
>>> inds_a = [0, 3, 6]
|
||||
>>> inds_b = [[1, 4, 7], [2, 5, 8]]
|
||||
>>> vectors(coords, inds_a, inds_b)
|
||||
array([
|
||||
coords[0] - (coords[1] + coords[2])/2,
|
||||
coords[3] - (coords[4] + coords[5])/2,
|
||||
coords[6] - (coords[7] + coords[8])/2,
|
||||
])
|
||||
"""
|
||||
box = coordinates.box
|
||||
coords_a = coordinates[atoms_a]
|
||||
if len(coords_a.shape) > 2:
|
||||
coords_a = coords_a.mean(axis=0)
|
||||
coords_b = coordinates[atoms_b]
|
||||
if len(coords_b.shape) > 2:
|
||||
coords_b = coords_b.mean(axis=0)
|
||||
vectors = pbc_diff(coords_a, coords_b, box=box)
|
||||
norm = np.linalg.norm(vectors, axis=-1).reshape(-1, 1) if normed else 1
|
||||
vectors.reference = coords_a
|
||||
return vectors / norm
|
||||
424
src/mdevaluate/correlation.py
Normal file
424
src/mdevaluate/correlation.py
Normal file
@@ -0,0 +1,424 @@
|
||||
import numpy as np
|
||||
from scipy.special import legendre
|
||||
from itertools import chain
|
||||
import dask.array as darray
|
||||
from pathos.pools import ProcessPool
|
||||
from functools import partial
|
||||
|
||||
from .autosave import autosave_data
|
||||
from .utils import filon_fourier_transformation, coherent_sum, histogram
|
||||
from .pbc import pbc_diff
|
||||
|
||||
|
||||
def set_has_counter(func):
|
||||
func.has_counter = True
|
||||
return func
|
||||
|
||||
|
||||
def log_indices(first, last, num=100):
|
||||
ls = np.logspace(0, np.log10(last - first + 1), num=num)
|
||||
return np.unique(np.int_(ls) - 1 + first)
|
||||
|
||||
|
||||
def correlation(function, frames):
|
||||
iterator = iter(frames)
|
||||
start_frame = next(iterator)
|
||||
return map(lambda f: function(start_frame, f), chain([start_frame], iterator))
|
||||
|
||||
|
||||
def subensemble_correlation(selector_function, correlation_function=correlation):
|
||||
def c(function, frames):
|
||||
iterator = iter(frames)
|
||||
start_frame = next(iterator)
|
||||
selector = selector_function(start_frame)
|
||||
subensemble = map(lambda f: f[selector], chain([start_frame], iterator))
|
||||
return correlation_function(function, subensemble)
|
||||
|
||||
return c
|
||||
|
||||
|
||||
def multi_subensemble_correlation(selector_function):
|
||||
"""
|
||||
selector_function has to expect a frame and to
|
||||
return either valid indices (as with subensemble_correlation)
|
||||
or a multidimensional array whose entries are valid indices
|
||||
|
||||
e.g. slice(10,100,2)
|
||||
|
||||
e.g. [1,2,3,4,5]
|
||||
|
||||
e.g. [[[0,1],[2],[3]],[[4],[5],[6]] -> shape: 2,3 with
|
||||
list of indices of varying length
|
||||
|
||||
e.g. [slice(1653),slice(1653,None,3)]
|
||||
|
||||
e.g. [np.ones(len_of_frames, bool)]
|
||||
|
||||
in general using slices is the most efficient.
|
||||
if the selections are small subsets of a frame or when many subsets are empty
|
||||
using indices will be more efficient than using masks.
|
||||
"""
|
||||
|
||||
@set_has_counter
|
||||
def cmulti(function, frames):
|
||||
iterator = iter(frames)
|
||||
start_frame = next(iterator)
|
||||
selectors = np.asarray(selector_function(start_frame))
|
||||
sel_shape = selectors.shape
|
||||
if sel_shape[-1] == 0:
|
||||
selectors = np.asarray(selectors, int)
|
||||
if selectors.dtype != object:
|
||||
sel_shape = sel_shape[:-1]
|
||||
f_values = np.zeros(
|
||||
sel_shape + function(start_frame, start_frame).shape,
|
||||
)
|
||||
count = np.zeros(sel_shape, dtype=int)
|
||||
is_first_frame_loop = True
|
||||
|
||||
def cc(act_frame):
|
||||
nonlocal is_first_frame_loop
|
||||
for index in np.ndindex(sel_shape):
|
||||
sel = selectors[index]
|
||||
sf_sel = start_frame[sel]
|
||||
if is_first_frame_loop:
|
||||
count[index] = len(sf_sel)
|
||||
f_values[index] = (
|
||||
function(sf_sel, act_frame[sel]) if count[index] != 0 else 0
|
||||
)
|
||||
is_first_frame_loop = False
|
||||
return np.asarray(f_values.copy())
|
||||
|
||||
return map(cc, chain([start_frame], iterator)), count
|
||||
|
||||
return cmulti
|
||||
|
||||
|
||||
@autosave_data(2)
|
||||
def shifted_correlation(
|
||||
function,
|
||||
frames,
|
||||
selector=None,
|
||||
multi_selector=None,
|
||||
segments=10,
|
||||
skip=0.1,
|
||||
window=0.5,
|
||||
average=True,
|
||||
points=100,
|
||||
nodes=1,
|
||||
):
|
||||
"""
|
||||
Calculate the time series for a correlation function.
|
||||
|
||||
The times at which the correlation is calculated are determined by
|
||||
a logarithmic distribution.
|
||||
|
||||
Args:
|
||||
function: The function that should be correlated
|
||||
frames: The coordinates of the simulation data
|
||||
selector (opt.):
|
||||
A function that returns the indices depending on
|
||||
the staring frame for which particles the
|
||||
correlation should be calculated.
|
||||
Can not be used with multi_selector.
|
||||
multi_selector (opt.):
|
||||
A function that returns multiple lists of indices depending on
|
||||
the staring frame for which particles the
|
||||
correlation should be calculated.
|
||||
Can not be used with selector.
|
||||
segments (int, opt.):
|
||||
The number of segments the time window will be
|
||||
shifted
|
||||
skip (float, opt.):
|
||||
The fraction of the trajectory that will be skipped
|
||||
at the beginning, if this is None the start index
|
||||
of the frames slice will be used, which defaults
|
||||
to 0.1.
|
||||
window (float, opt.):
|
||||
The fraction of the simulation the time series will
|
||||
cover
|
||||
average (bool, opt.):
|
||||
If True, returns averaged correlation function
|
||||
points (int, opt.):
|
||||
The number of timeshifts for which the correlation
|
||||
should be calculated
|
||||
nodes (int, opt.):
|
||||
Number of nodes used for parallelization
|
||||
|
||||
Returns:
|
||||
tuple:
|
||||
A list of length N that contains the timeshiftes of the frames at which
|
||||
the time series was calculated and a numpy array of shape (segments, N)
|
||||
that holds the (non-avaraged) correlation data
|
||||
|
||||
Example:
|
||||
Calculating the mean square displacement of a coordinates object named ``coords``:
|
||||
|
||||
>>> time, data = shifted_correlation(msd, coords)
|
||||
"""
|
||||
|
||||
def get_correlation(frames, start_frame, index, shifted_idx):
|
||||
if len(index) == 0:
|
||||
correlation = np.zeros(len(shifted_idx))
|
||||
else:
|
||||
start = frames[start_frame][index]
|
||||
correlation = np.array(
|
||||
[function(start, frames[frame][index]) for frame in shifted_idx]
|
||||
)
|
||||
return correlation
|
||||
|
||||
def apply_selector(start_frame, frames, idx, selector=None, multi_selector=None):
|
||||
shifted_idx = idx + start_frame
|
||||
if selector is None and multi_selector is None:
|
||||
index = np.arange(len(frames[start_frame]))
|
||||
return get_correlation(frames, start_frame, index, shifted_idx)
|
||||
|
||||
elif selector is not None and multi_selector is not None:
|
||||
raise ValueError(
|
||||
"selector and multi_selector can not be used at the same time"
|
||||
)
|
||||
|
||||
elif selector is not None:
|
||||
index = selector(frames[start_frame])
|
||||
return get_correlation(frames, start_frame, index, shifted_idx)
|
||||
|
||||
elif multi_selector is not None:
|
||||
indices = multi_selector(frames[start_frame])
|
||||
correlation = []
|
||||
for index in indices:
|
||||
correlation.append(
|
||||
get_correlation(
|
||||
frames, start_frame, index, shifted_idx
|
||||
)
|
||||
)
|
||||
return correlation
|
||||
|
||||
if 1 - skip < window:
|
||||
window = 1 - skip
|
||||
|
||||
start_frames = np.unique(
|
||||
np.linspace(
|
||||
len(frames) * skip,
|
||||
len(frames) * (1 - window),
|
||||
num=segments,
|
||||
endpoint=False,
|
||||
dtype=int,
|
||||
)
|
||||
)
|
||||
|
||||
num_frames = int(len(frames) * window)
|
||||
ls = np.logspace(0, np.log10(num_frames + 1), num=points)
|
||||
idx = np.unique(np.int_(ls) - 1)
|
||||
t = np.array([frames[i].time for i in idx]) - frames[0].time
|
||||
|
||||
if nodes == 1:
|
||||
result = np.array(
|
||||
[
|
||||
apply_selector(start_frame, frames=frames, idx=idx,
|
||||
selector=selector,
|
||||
multi_selector=multi_selector)
|
||||
for start_frame in start_frames
|
||||
]
|
||||
)
|
||||
else:
|
||||
pool = ProcessPool(nodes=nodes)
|
||||
# Use try finally instead of a context manager to ensure the pool is
|
||||
# restarted in case of working in a jupyter-notebook,
|
||||
# otherwise the kernel has to be restarted.
|
||||
try:
|
||||
result = np.array(
|
||||
pool.map(
|
||||
partial(apply_selector,
|
||||
frames=frames,
|
||||
idx=idx,
|
||||
selector=selector,
|
||||
multi_selector=multi_selector), start_frames
|
||||
)
|
||||
)
|
||||
finally:
|
||||
pool.terminate()
|
||||
pool.restart()
|
||||
|
||||
if average:
|
||||
clean_result = []
|
||||
for entry in result:
|
||||
if np.all(entry == 0):
|
||||
continue
|
||||
else:
|
||||
clean_result.append(entry)
|
||||
result = np.array(clean_result)
|
||||
result = np.average(result, axis=0)
|
||||
return t, result
|
||||
|
||||
|
||||
def msd(start, frame):
|
||||
"""
|
||||
Mean square displacement
|
||||
"""
|
||||
vec = start - frame
|
||||
return (vec**2).sum(axis=1).mean()
|
||||
|
||||
|
||||
def isf(start, frame, q, box=None):
|
||||
"""
|
||||
Incoherent intermediate scattering function. To specify q, use
|
||||
water_isf = functools.partial(isf, q=22.77) # q has the value 22.77 nm^-1
|
||||
|
||||
:param q: length of scattering vector
|
||||
"""
|
||||
vec = start - frame
|
||||
distance = (vec**2).sum(axis=1) ** 0.5
|
||||
return np.sinc(distance * q / np.pi).mean()
|
||||
|
||||
|
||||
def rotational_autocorrelation(onset, frame, order=2):
|
||||
"""
|
||||
Compute the rotational autocorrelation of the legendre polynomial for the
|
||||
given vectors.
|
||||
|
||||
Args:
|
||||
onset, frame: CoordinateFrames of vectors
|
||||
order (opt.): Order of the legendre polynomial.
|
||||
|
||||
Returns:
|
||||
Scalar value of the correlation function.
|
||||
"""
|
||||
scalar_prod = (onset * frame).sum(axis=-1)
|
||||
poly = legendre(order)
|
||||
return poly(scalar_prod).mean()
|
||||
|
||||
|
||||
def van_hove_self(start, end, bins):
|
||||
r"""
|
||||
Compute the self part of the Van Hove autocorrelation function.
|
||||
|
||||
..math::
|
||||
G(r, t) = \sum_i \delta(|\vec r_i(0) - \vec r_i(t)| - r)
|
||||
"""
|
||||
vec = start - end
|
||||
delta_r = ((vec) ** 2).sum(axis=-1) ** 0.5
|
||||
return 1 / len(start) * histogram(delta_r, bins)[0]
|
||||
|
||||
|
||||
def van_hove_distinct(
|
||||
onset, frame, bins, box=None, use_dask=True, comp=False, bincount=True
|
||||
):
|
||||
r"""
|
||||
Compute the distinct part of the Van Hove autocorrelation function.
|
||||
|
||||
..math::
|
||||
G(r, t) = \sum_{i, j} \delta(|\vec r_i(0) - \vec r_j(t)| - r)
|
||||
"""
|
||||
if box is None:
|
||||
box = onset.box.diagonal()
|
||||
dimension = len(box)
|
||||
N = len(onset)
|
||||
if use_dask:
|
||||
onset = darray.from_array(onset, chunks=(500, dimension)).reshape(
|
||||
1, N, dimension
|
||||
)
|
||||
frame = darray.from_array(frame, chunks=(500, dimension)).reshape(
|
||||
N, 1, dimension
|
||||
)
|
||||
dist = ((pbc_diff(onset, frame, box) ** 2).sum(axis=-1) ** 0.5).ravel()
|
||||
if np.diff(bins).std() < 1e6:
|
||||
dx = bins[0] - bins[1]
|
||||
hist = darray.bincount((dist // dx).astype(int), minlength=(len(bins) - 1))
|
||||
else:
|
||||
hist = darray.histogram(dist, bins=bins)[0]
|
||||
return hist.compute() / N
|
||||
else:
|
||||
if comp:
|
||||
dx = bins[1] - bins[0]
|
||||
minlength = len(bins) - 1
|
||||
|
||||
def f(x):
|
||||
d = (pbc_diff(x, frame, box) ** 2).sum(axis=-1) ** 0.5
|
||||
return np.bincount((d // dx).astype(int), minlength=minlength)[
|
||||
:minlength
|
||||
]
|
||||
|
||||
hist = sum(f(x) for x in onset)
|
||||
else:
|
||||
dist = (
|
||||
pbc_diff(onset.reshape(1, -1, 3), frame.reshape(-1, 1, 3), box) ** 2
|
||||
).sum(axis=-1) ** 0.5
|
||||
hist = histogram(dist, bins=bins)[0]
|
||||
return hist / N
|
||||
|
||||
|
||||
def overlap(onset, frame, crds_tree, radius):
|
||||
"""
|
||||
Compute the overlap with a reference configuration defined in a CoordinatesTree.
|
||||
|
||||
Args:
|
||||
onset: Initial frame, this is only used to get the frame index
|
||||
frame: The current configuration
|
||||
crds_tree: A CoordinatesTree of the reference configurations
|
||||
radius: The cutoff radius for the overlap
|
||||
|
||||
This function is intended to be used with :func:`shifted_correlation`.
|
||||
As usual the first two arguments are used internally and the remaining ones
|
||||
should be defined with :func:`functools.partial`.
|
||||
|
||||
If the overlap of a subset of the system should be calculated, this has to be
|
||||
defined through a selection of the reference configurations in the CoordinatesTree.
|
||||
|
||||
Example:
|
||||
>>> shifted_correlation(
|
||||
... partial(overlap, crds_tree=CoordinatesTree(traj), radius=0.11),
|
||||
... traj
|
||||
... )
|
||||
"""
|
||||
tree = crds_tree[onset.step]
|
||||
return (tree.query(frame)[0] <= radius).sum() / tree.n
|
||||
|
||||
|
||||
def susceptibility(time, correlation, **kwargs):
|
||||
"""
|
||||
Calculate the susceptibility of a correlation function.
|
||||
|
||||
Args:
|
||||
time: Timesteps of the correlation data
|
||||
correlation: Value of the correlation function
|
||||
**kwargs (opt.):
|
||||
Additional keyword arguments will be passed to :func:`filon_fourier_transformation`.
|
||||
"""
|
||||
frequencies, fourier = filon_fourier_transformation(
|
||||
time, correlation, imag=False, **kwargs
|
||||
)
|
||||
return frequencies, frequencies * fourier
|
||||
|
||||
|
||||
def coherent_scattering_function(onset, frame, q):
|
||||
"""
|
||||
Calculate the coherent scattering function.
|
||||
"""
|
||||
box = onset.box.diagonal()
|
||||
dimension = len(box)
|
||||
|
||||
def scfunc(x, y):
|
||||
sqdist = 0
|
||||
for i in range(dimension):
|
||||
d = x[i] - y[i]
|
||||
if d > box[i] / 2:
|
||||
d -= box[i]
|
||||
if d < -box[i] / 2:
|
||||
d += box[i]
|
||||
sqdist += d**2
|
||||
x = sqdist**0.5 * q
|
||||
if x == 0:
|
||||
return 1.0
|
||||
else:
|
||||
return np.sin(x) / x
|
||||
|
||||
return coherent_sum(scfunc, onset.pbc, frame.pbc) / len(onset)
|
||||
|
||||
|
||||
def non_gaussian(onset, frame):
|
||||
"""
|
||||
Calculate the Non-Gaussian parameter :
|
||||
..math:
|
||||
\alpha_2 (t) = \frac{3}{5}\frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1
|
||||
"""
|
||||
r_2 = ((frame - onset) ** 2).sum(axis=-1)
|
||||
return 3 / 5 * (r_2**2).mean() / r_2.mean() ** 2 - 1
|
||||
515
src/mdevaluate/distribution.py
Normal file
515
src/mdevaluate/distribution.py
Normal file
@@ -0,0 +1,515 @@
|
||||
import numpy as np
|
||||
|
||||
from .coordinates import rotate_axis, polar_coordinates, spherical_coordinates
|
||||
from .atoms import next_neighbors
|
||||
from .autosave import autosave_data
|
||||
from .utils import runningmean
|
||||
from .pbc import pbc_diff, pbc_points
|
||||
from .logging import logger
|
||||
from scipy import spatial
|
||||
|
||||
|
||||
@autosave_data(nargs=2, kwargs_keys=("coordinates_b",), version="time_average-1")
|
||||
def time_average(function, coordinates, coordinates_b=None, pool=None):
|
||||
"""
|
||||
Compute the time average of a function.
|
||||
|
||||
Args:
|
||||
function:
|
||||
The function that will be averaged, it has to accept exactly one argument
|
||||
which is the current atom set
|
||||
coordinates: The coordinates object of the simulation
|
||||
pool (multiprocessing.Pool, opt.):
|
||||
A multiprocessing pool which will be used for cocurrent calculation of the
|
||||
averaged function
|
||||
|
||||
"""
|
||||
if pool is not None:
|
||||
_map = pool.imap
|
||||
else:
|
||||
_map = map
|
||||
|
||||
number_of_averages = 0
|
||||
result = 0
|
||||
|
||||
if coordinates_b is not None:
|
||||
if coordinates._slice != coordinates_b._slice:
|
||||
logger.warning("Different slice for coordinates and coordinates_b.")
|
||||
coordinate_iter = (iter(coordinates), iter(coordinates_b))
|
||||
else:
|
||||
coordinate_iter = (iter(coordinates),)
|
||||
|
||||
evaluated = _map(function, *coordinate_iter)
|
||||
|
||||
for ev in evaluated:
|
||||
number_of_averages += 1
|
||||
result += ev
|
||||
if number_of_averages % 100 == 0:
|
||||
logger.debug("time_average: %d", number_of_averages)
|
||||
|
||||
return result / number_of_averages
|
||||
|
||||
|
||||
def time_histogram(function, coordinates, bins, hist_range, pool=None):
|
||||
coordinate_iter = iter(coordinates)
|
||||
|
||||
if pool is not None:
|
||||
_map = pool.imap
|
||||
else:
|
||||
_map = map
|
||||
|
||||
evaluated = _map(function, coordinate_iter)
|
||||
|
||||
results = []
|
||||
hist_results = []
|
||||
for num, ev in enumerate(evaluated):
|
||||
results.append(ev)
|
||||
|
||||
if num % 100 == 0 and num > 0:
|
||||
print(num)
|
||||
r = np.array(results).T
|
||||
for i, row in enumerate(r):
|
||||
histo, _ = np.histogram(row, bins=bins, range=hist_range)
|
||||
if len(hist_results) <= i:
|
||||
hist_results.append(histo)
|
||||
else:
|
||||
hist_results[i] += histo
|
||||
results = []
|
||||
return hist_results
|
||||
|
||||
|
||||
def rdf(
|
||||
atoms_a,
|
||||
atoms_b=None,
|
||||
bins=None,
|
||||
box=None,
|
||||
kind=None,
|
||||
chunksize=50000,
|
||||
returnx=False,
|
||||
**kwargs
|
||||
):
|
||||
r"""
|
||||
Compute the radial pair distribution of one or two sets of atoms.
|
||||
|
||||
.. math::
|
||||
g_{AB}(r) = \frac{1}{\langle \rho_B\rangle N_A}\sum\limits_{i\in A}^{N_A}
|
||||
\sum\limits_{j\in B}^{N_B}\frac{\delta(r_{ij} -r)}{4\pi r^2}
|
||||
|
||||
For use with :func:`time_average`, define bins through the use of :func:`~functools.partial`,
|
||||
the atom sets are passed to :func:`time_average`, if a second set of atoms should be used
|
||||
specify it as ``coordinates_b`` and it will be passed to this function.
|
||||
|
||||
Args:
|
||||
atoms_a: First set of atoms, used internally
|
||||
atoms_b (opt.): Second set of atoms, used internally
|
||||
bins: Bins of the radial distribution function
|
||||
box (opt.): Simulations box, if not specified this is taken from ``atoms_a.box``
|
||||
kind (opt.): Can be 'inter', 'intra' or None (default).
|
||||
chunksize (opt.):
|
||||
For large systems (N > 1000) the distaces have to be computed in chunks so the arrays
|
||||
fit into memory, this parameter controlls the size of these chunks. It should be
|
||||
as large as possible, depending on the available memory.
|
||||
returnx (opt.): If True the x ordinate of the histogram is returned.
|
||||
"""
|
||||
assert bins is not None, "Bins of the pair distribution have to be defined."
|
||||
assert kind in [
|
||||
"intra",
|
||||
"inter",
|
||||
None,
|
||||
], "Argument kind must be one of the following: intra, inter, None."
|
||||
if box is None:
|
||||
box = atoms_a.box.diagonal()
|
||||
if atoms_b is None:
|
||||
atoms_b = atoms_a
|
||||
nr_of_atoms = len(atoms_a)
|
||||
indices = np.triu_indices(nr_of_atoms, k=1)
|
||||
else:
|
||||
nr_a, dim = atoms_a.shape
|
||||
nr_b, dim = atoms_b.shape
|
||||
indices = np.array([(i, j) for i in range(nr_a) for j in range(nr_b)]).T
|
||||
|
||||
# compute the histogram in chunks for large systems
|
||||
hist = 0
|
||||
nr_of_samples = 0
|
||||
for chunk in range(0, len(indices[0]), chunksize):
|
||||
sl = slice(chunk, chunk + chunksize)
|
||||
diff = pbc_diff(atoms_a[indices[0][sl]], atoms_b[indices[1][sl]], box)
|
||||
dist = (diff**2).sum(axis=1) ** 0.5
|
||||
if kind == "intra":
|
||||
mask = (
|
||||
atoms_a.residue_ids[indices[0][sl]]
|
||||
== atoms_b.residue_ids[indices[1][sl]]
|
||||
)
|
||||
dist = dist[mask]
|
||||
elif kind == "inter":
|
||||
mask = (
|
||||
atoms_a.residue_ids[indices[0][sl]]
|
||||
!= atoms_b.residue_ids[indices[1][sl]]
|
||||
)
|
||||
dist = dist[mask]
|
||||
|
||||
nr_of_samples += len(dist)
|
||||
hist += np.histogram(dist, bins)[0]
|
||||
|
||||
volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
|
||||
density = nr_of_samples / np.prod(box)
|
||||
res = hist / volume / density
|
||||
if returnx:
|
||||
return np.vstack((runningmean(bins, 2), res))
|
||||
else:
|
||||
return res
|
||||
|
||||
|
||||
def pbc_tree_rdf(
|
||||
atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs
|
||||
):
|
||||
if box is None:
|
||||
box = atoms_a.box.diagonal()
|
||||
all_coords = pbc_points(atoms_b, box, thickness=np.amax(bins) + 0.1)
|
||||
to_tree = spatial.cKDTree(all_coords)
|
||||
dist = to_tree.query(
|
||||
pbc_diff(atoms_a, box=box),
|
||||
k=len(atoms_b),
|
||||
distance_upper_bound=np.amax(bins) + 0.1,
|
||||
)[0].flatten()
|
||||
dist = dist[dist < np.inf]
|
||||
hist = np.histogram(dist, bins)[0]
|
||||
volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
|
||||
res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b) - exclude)
|
||||
if returnx:
|
||||
return np.vstack((runningmean(bins, 2), res))
|
||||
else:
|
||||
return res
|
||||
|
||||
|
||||
def pbc_spm_rdf(
|
||||
atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs
|
||||
):
|
||||
if box is None:
|
||||
box = atoms_a.box
|
||||
all_coords = pbc_points(atoms_b, box, thickness=np.amax(bins) + 0.1)
|
||||
to_tree = spatial.cKDTree(all_coords)
|
||||
if all_coords.nbytes / 1024**3 * len(atoms_a) < 2:
|
||||
from_tree = spatial.cKDTree(pbc_diff(atoms_a, box=box))
|
||||
dist = to_tree.sparse_distance_matrix(
|
||||
from_tree, max_distance=np.amax(bins) + 0.1, output_type="ndarray"
|
||||
)
|
||||
dist = np.asarray(dist.tolist())[:, 2]
|
||||
hist = np.histogram(dist, bins)[0]
|
||||
else:
|
||||
chunksize = int(
|
||||
2 * len(atoms_a) / (all_coords.nbytes / 1024**3 * len(atoms_a))
|
||||
)
|
||||
hist = 0
|
||||
for chunk in range(0, len(atoms_a), chunksize):
|
||||
sl = slice(chunk, chunk + chunksize)
|
||||
from_tree = spatial.cKDTree(pbc_diff(atoms_a[sl], box=box))
|
||||
dist = to_tree.sparse_distance_matrix(
|
||||
from_tree, max_distance=np.amax(bins) + 0.1, output_type="ndarray"
|
||||
)
|
||||
dist = np.asarray(dist.tolist())[:, 2]
|
||||
hist += np.histogram(dist, bins)[0]
|
||||
|
||||
volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
|
||||
res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b) - exclude)
|
||||
if returnx:
|
||||
return np.vstack((runningmean(bins, 2), res))
|
||||
else:
|
||||
return res
|
||||
|
||||
|
||||
@autosave_data(nargs=2, kwargs_keys=("to_coords", "times"))
|
||||
def fast_averaged_rdf(from_coords, bins, to_coords=None, times=10, exclude=0, **kwargs):
|
||||
if to_coords is None:
|
||||
to_coords = from_coords
|
||||
exclude = 1
|
||||
# first find timings for the different rdf functions
|
||||
import time
|
||||
|
||||
# only consider sparse matrix for this condition
|
||||
if (len(from_coords[0]) * len(to_coords[0]) <= 3000 * 2000) & (
|
||||
len(from_coords[0]) / len(to_coords[0]) > 5
|
||||
):
|
||||
funcs = [rdf, pbc_tree_rdf, pbc_spm_rdf]
|
||||
else:
|
||||
funcs = [rdf, pbc_tree_rdf]
|
||||
timings = []
|
||||
for f in funcs:
|
||||
start = time.time()
|
||||
f(
|
||||
from_coords[0],
|
||||
atoms_b=to_coords[0],
|
||||
bins=bins,
|
||||
box=np.diag(from_coords[0].box),
|
||||
)
|
||||
end = time.time()
|
||||
timings.append(end - start)
|
||||
timings = np.array(timings)
|
||||
timings[0] = (
|
||||
2 * timings[0]
|
||||
) # statistics for the other functions is twice as good per frame
|
||||
logger.debug("rdf function timings: " + str(timings))
|
||||
rdffunc = funcs[np.argmin(timings)]
|
||||
logger.debug("rdf function used: " + str(rdffunc))
|
||||
if rdffunc == rdf:
|
||||
times = times * 2 # duplicate times for same statistics
|
||||
|
||||
frames = np.array(range(0, len(from_coords), int(len(from_coords) / times)))[:times]
|
||||
out = np.zeros(len(bins) - 1)
|
||||
for j, i in enumerate(frames):
|
||||
logger.debug("multi_radial_pair_distribution: %d/%d", j, len(frames))
|
||||
out += rdffunc(
|
||||
from_coords[i],
|
||||
to_coords[i],
|
||||
bins,
|
||||
box=np.diag(from_coords[i].box),
|
||||
exclude=exclude,
|
||||
)
|
||||
return out / len(frames)
|
||||
|
||||
|
||||
def distance_distribution(atoms, bins):
|
||||
connection_vectors = atoms[:-1, :] - atoms[1:, :]
|
||||
connection_lengths = (connection_vectors**2).sum(axis=1) ** 0.5
|
||||
return np.histogram(connection_lengths, bins)[0]
|
||||
|
||||
|
||||
def tetrahedral_order(atoms, reference_atoms=None):
|
||||
if reference_atoms is None:
|
||||
reference_atoms = atoms
|
||||
indices = next_neighbors(reference_atoms, query_atoms=atoms, number_of_neighbors=4)
|
||||
neighbors = reference_atoms[indices]
|
||||
neighbors_1, neighbors_2, neighbors_3, neighbors_4 = (
|
||||
neighbors[:, 0, :],
|
||||
neighbors[:, 1, :],
|
||||
neighbors[:, 2, :],
|
||||
neighbors[:, 3, :],
|
||||
)
|
||||
|
||||
# Connection vectors
|
||||
neighbors_1 -= atoms
|
||||
neighbors_2 -= atoms
|
||||
neighbors_3 -= atoms
|
||||
neighbors_4 -= atoms
|
||||
|
||||
# Normed Connection vectors
|
||||
neighbors_1 /= np.linalg.norm(neighbors_1, axis=-1).reshape(-1, 1)
|
||||
neighbors_2 /= np.linalg.norm(neighbors_2, axis=-1).reshape(-1, 1)
|
||||
neighbors_3 /= np.linalg.norm(neighbors_3, axis=-1).reshape(-1, 1)
|
||||
neighbors_4 /= np.linalg.norm(neighbors_4, axis=-1).reshape(-1, 1)
|
||||
|
||||
a_1_2 = ((neighbors_1 * neighbors_2).sum(axis=1) + 1 / 3) ** 2
|
||||
a_1_3 = ((neighbors_1 * neighbors_3).sum(axis=1) + 1 / 3) ** 2
|
||||
a_1_4 = ((neighbors_1 * neighbors_4).sum(axis=1) + 1 / 3) ** 2
|
||||
|
||||
a_2_3 = ((neighbors_2 * neighbors_3).sum(axis=1) + 1 / 3) ** 2
|
||||
a_2_4 = ((neighbors_2 * neighbors_4).sum(axis=1) + 1 / 3) ** 2
|
||||
|
||||
a_3_4 = ((neighbors_3 * neighbors_4).sum(axis=1) + 1 / 3) ** 2
|
||||
|
||||
q = 1 - 3 / 8 * (a_1_2 + a_1_3 + a_1_4 + a_2_3 + a_2_4 + a_3_4)
|
||||
|
||||
return q
|
||||
|
||||
|
||||
def tetrahedral_order_distribution(atoms, reference_atoms=None, bins=None):
|
||||
assert bins is not None, "Bin edges of the distribution have to be specified."
|
||||
Q = tetrahedral_order(atoms, reference_atoms=reference_atoms)
|
||||
return np.histogram(Q, bins=bins)[0]
|
||||
|
||||
|
||||
def radial_density(
|
||||
atoms, bins, symmetry_axis=(0, 0, 1), origin=(0, 0, 0), height=1, returnx=False
|
||||
):
|
||||
"""
|
||||
Calculate the radial density distribution.
|
||||
|
||||
This function is meant to be used with time_average.
|
||||
|
||||
Args:
|
||||
atoms:
|
||||
Set of coordinates.
|
||||
bins:
|
||||
Bin specification that is passed to numpy.histogram. This needs to be
|
||||
a list of bin edges if the function is used within time_average.
|
||||
symmetry_axis (opt.):
|
||||
Vector of the symmetry axis, around which the radial density is calculated,
|
||||
default is z-axis.
|
||||
origin (opt.):
|
||||
Origin of the rotational symmetry, e.g. center of the pore.
|
||||
height (opt.):
|
||||
Height of the pore, necessary for correct normalization of the density.
|
||||
returnx (opt.):
|
||||
If True, the x ordinate of the distribution is returned.
|
||||
"""
|
||||
cartesian = rotate_axis(atoms - origin, symmetry_axis)
|
||||
radius, _ = polar_coordinates(cartesian[:, 0], cartesian[:, 1])
|
||||
hist = np.histogram(radius, bins=bins)[0]
|
||||
volume = np.pi * (bins[1:] ** 2 - bins[:-1] ** 2) * height
|
||||
res = hist / volume
|
||||
if returnx:
|
||||
return np.vstack((runningmean(bins, 2), res))
|
||||
else:
|
||||
return res
|
||||
|
||||
|
||||
def shell_density(
|
||||
atoms,
|
||||
shell_radius,
|
||||
bins,
|
||||
shell_thickness=0.5,
|
||||
symmetry_axis=(0, 0, 1),
|
||||
origin=(0, 0, 0),
|
||||
):
|
||||
"""
|
||||
Compute the density distribution on a cylindrical shell.
|
||||
|
||||
Args:
|
||||
atoms: The coordinates of the atoms
|
||||
shell_radius: Inner radius of the shell
|
||||
bins: Histogramm bins, this has to be a two-dimensional list of bins: [angle, z]
|
||||
shell_thickness (opt.): Thicknes of the shell, default is 0.5
|
||||
symmetry_axis (opt.): The symmtery axis of the pore, the coordinates will be
|
||||
rotated such that this axis is the z-axis
|
||||
origin (opt.): Origin of the pore, the coordinates will be moved such that this
|
||||
is the new origin.
|
||||
|
||||
Returns:
|
||||
Two-dimensional density distribution of the atoms in the defined shell.
|
||||
"""
|
||||
cartesian = rotate_axis(atoms - origin, symmetry_axis)
|
||||
radius, theta = polar_coordinates(cartesian[:, 0], cartesian[:, 1])
|
||||
shell_indices = (shell_radius <= radius) & (
|
||||
radius <= shell_radius + shell_thickness
|
||||
)
|
||||
hist = np.histogram2d(theta[shell_indices], cartesian[shell_indices, 2], bins)[0]
|
||||
|
||||
return hist
|
||||
|
||||
|
||||
def spatial_density(atoms, bins, weights=None):
|
||||
"""
|
||||
Compute the spatial density distribution.
|
||||
"""
|
||||
density, _ = np.histogramdd(atoms, bins=bins, weights=weights)
|
||||
return density
|
||||
|
||||
|
||||
def mixing_ratio_distribution(
|
||||
atoms_a,
|
||||
atoms_b,
|
||||
bins_ratio,
|
||||
bins_density,
|
||||
weights_a=None,
|
||||
weights_b=None,
|
||||
weights_ratio=None,
|
||||
):
|
||||
"""
|
||||
Compute the distribution of the mixing ratio of two sets of atoms.
|
||||
"""
|
||||
|
||||
density_a, _ = time_average
|
||||
density_b, _ = np.histogramdd(atoms_b, bins=bins_density, weights=weights_b)
|
||||
mixing_ratio = density_a / (density_a + density_b)
|
||||
good_inds = (density_a != 0) & (density_b != 0)
|
||||
hist, _ = np.histogram(
|
||||
mixing_ratio[good_inds], bins=bins_ratio, weights=weights_ratio
|
||||
)
|
||||
return hist
|
||||
|
||||
|
||||
def next_neighbor_distribution(
|
||||
atoms, reference=None, number_of_neighbors=4, bins=None, normed=True
|
||||
):
|
||||
"""
|
||||
Compute the distribution of next neighbors with the same residue name.
|
||||
"""
|
||||
assert bins is not None, "Bins have to be specified."
|
||||
if reference is None:
|
||||
reference = atoms
|
||||
nn = next_neighbors(
|
||||
reference, query_atoms=atoms, number_of_neighbors=number_of_neighbors
|
||||
)
|
||||
resname_nn = reference.residue_names[nn]
|
||||
count_nn = (resname_nn == atoms.residue_names.reshape(-1, 1)).sum(axis=1)
|
||||
return np.histogram(count_nn, bins=bins, normed=normed)[0]
|
||||
|
||||
|
||||
def hbonds(
|
||||
D,
|
||||
H,
|
||||
A,
|
||||
box,
|
||||
DA_lim=0.35,
|
||||
HA_lim=0.35,
|
||||
min_cos=np.cos(30 * np.pi / 180),
|
||||
full_output=False,
|
||||
):
|
||||
"""
|
||||
Compute h-bond pairs
|
||||
|
||||
Args:
|
||||
D: Set of coordinates for donators.
|
||||
H: Set of coordinates for hydrogen atoms. Should have the same
|
||||
length as D.
|
||||
A: Set of coordinates for acceptors.
|
||||
DA_lim (opt.): Minimum distance beteen donator and acceptor.
|
||||
HA_lim (opt.): Minimum distance beteen hydrogen and acceptor.
|
||||
min_cos (opt.): Minimum cosine for the HDA angle. Default is
|
||||
equivalent to a maximum angle of 30 degree.
|
||||
full_output (opt.): Returns additionally the cosine of the
|
||||
angles and the DA distances
|
||||
|
||||
Return:
|
||||
List of (D,A)-pairs in hbonds.
|
||||
"""
|
||||
|
||||
def dist_DltA(D, H, A, box, max_dist=0.35):
|
||||
ppoints, pind = pbc_points(D, box, thickness=max_dist + 0.1, index=True)
|
||||
Dtree = spatial.cKDTree(ppoints)
|
||||
Atree = spatial.cKDTree(A)
|
||||
pairs = Dtree.sparse_distance_matrix(Atree, max_dist, output_type="ndarray")
|
||||
pairs = np.asarray(pairs.tolist())
|
||||
pairs = np.int_(pairs[pairs[:, 2] > 0][:, :2])
|
||||
pairs[:, 0] = pind[pairs[:, 0]]
|
||||
return pairs
|
||||
|
||||
def dist_AltD(D, H, A, box, max_dist=0.35):
|
||||
ppoints, pind = pbc_points(A, box, thickness=max_dist + 0.1, index=True)
|
||||
Atree = spatial.cKDTree(ppoints)
|
||||
Dtree = spatial.cKDTree(D)
|
||||
pairs = Atree.sparse_distance_matrix(Dtree, max_dist, output_type="ndarray")
|
||||
pairs = np.asarray(pairs.tolist())
|
||||
pairs = np.int_(pairs[pairs[:, 2] > 0][:, :2])
|
||||
pairs = pairs[:, ::-1]
|
||||
pairs[:, 1] = pind[pairs[:, 1]]
|
||||
return pairs
|
||||
|
||||
if len(D) <= len(A):
|
||||
pairs = dist_DltA(D, H, A, box, DA_lim)
|
||||
else:
|
||||
pairs = dist_AltD(D, H, A, box, DA_lim)
|
||||
|
||||
vDH = pbc_diff(D[pairs[:, 0]], H[pairs[:, 0]], box)
|
||||
vDA = pbc_diff(D[pairs[:, 0]], A[pairs[:, 1]], box)
|
||||
vHA = pbc_diff(H[pairs[:, 0]], A[pairs[:, 1]], box)
|
||||
angles_cos = np.clip(
|
||||
np.einsum("ij,ij->i", vDH, vDA)
|
||||
/ np.linalg.norm(vDH, axis=1)
|
||||
/ np.linalg.norm(vDA, axis=1),
|
||||
-1,
|
||||
1,
|
||||
)
|
||||
is_bond = (
|
||||
(angles_cos >= min_cos)
|
||||
& (np.sum(vHA**2, axis=-1) <= HA_lim**2)
|
||||
& (np.sum(vDA**2, axis=-1) <= DA_lim**2)
|
||||
)
|
||||
if full_output:
|
||||
return (
|
||||
pairs[is_bond],
|
||||
angles_cos[is_bond],
|
||||
np.sum(vDA[is_bond] ** 2, axis=-1) ** 0.5,
|
||||
)
|
||||
else:
|
||||
return pairs[is_bond]
|
||||
43
src/mdevaluate/functions.py
Normal file
43
src/mdevaluate/functions.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
def kww(t, A, τ, β):
|
||||
return A * np.exp(-((t / τ) ** β))
|
||||
|
||||
|
||||
def kww_1e(A, τ, β):
|
||||
return τ * (-np.log(1 / (np.e * A))) ** (1 / β)
|
||||
|
||||
|
||||
def cole_davidson(w, A, b, t0):
|
||||
P = np.arctan(w * t0)
|
||||
return A * np.cos(P) ** b * np.sin(b * P)
|
||||
|
||||
|
||||
def cole_cole(w, A, b, t0):
|
||||
return (
|
||||
A
|
||||
* (w * t0) ** b
|
||||
* np.sin(np.pi * b / 2)
|
||||
/ (1 + 2 * (w * t0) ** b * np.cos(np.pi * b / 2) + (w * t0) ** (2 * b))
|
||||
)
|
||||
|
||||
|
||||
def havriliak_negami(ω, A, β, α, τ):
|
||||
r"""
|
||||
Imaginary part of the Havriliak-Negami function.
|
||||
|
||||
.. math::
|
||||
\chi_{HN}(\omega) = \Im\left(\frac{A}{(1 + (i\omega\tau)^\alpha)^\beta}\right)
|
||||
"""
|
||||
return -(A / (1 + (1j * ω * τ) ** α) ** β).imag
|
||||
|
||||
|
||||
# fits decay of correlation times, e.g. with distance to pore walls
|
||||
def colen(d, X, t8, A):
|
||||
return t8 * np.exp(A * np.exp(-d / X))
|
||||
|
||||
|
||||
# fits decay of the plateau height of the overlap function, e.g. with distance to pore walls
|
||||
def colenQ(d, X, Qb, g):
|
||||
return (1 - Qb) * np.exp(-((d / X) ** g)) + Qb
|
||||
29
src/mdevaluate/logging.py
Normal file
29
src/mdevaluate/logging.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger("mdevaluate")
|
||||
logger.setLevel(logging.INFO)
|
||||
stream_handler = logging.StreamHandler()
|
||||
stream_handler.setLevel(logging.INFO)
|
||||
logger.addHandler(stream_handler)
|
||||
|
||||
formatter = logging.Formatter(
|
||||
"{levelname[0]}{levelname[1]}{levelname[2]}[{asctime}]:{funcName}: {message}",
|
||||
style="{",
|
||||
)
|
||||
stream_handler.setFormatter(formatter)
|
||||
|
||||
|
||||
def setlevel(level, file=None):
|
||||
"""
|
||||
Change the level of logging. If `file` is specified, logs are written to this file.
|
||||
"""
|
||||
if isinstance(level, str):
|
||||
level = getattr(logging, level.upper())
|
||||
logger.setLevel(level)
|
||||
if file is not None:
|
||||
handler = logging.FileHandler(file)
|
||||
handler.setLevel(level)
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
else:
|
||||
stream_handler.setLevel(level)
|
||||
275
src/mdevaluate/pbc.py
Normal file
275
src/mdevaluate/pbc.py
Normal file
@@ -0,0 +1,275 @@
|
||||
from collections import OrderedDict
|
||||
|
||||
import numpy as np
|
||||
|
||||
from scipy.spatial import cKDTree
|
||||
from itertools import product
|
||||
|
||||
from .logging import logger
|
||||
|
||||
|
||||
def pbc_diff_old(v1, v2, box):
|
||||
"""
|
||||
Calculate the difference of two vestors, considering optional boundary conditions.
|
||||
"""
|
||||
if box is None:
|
||||
v = v1 - v2
|
||||
else:
|
||||
v = v1 % box - v2 % box
|
||||
v -= (v > box / 2) * box
|
||||
v += (v < -box / 2) * box
|
||||
|
||||
return v
|
||||
|
||||
|
||||
def pbc_diff(v1, v2=None, box=None):
|
||||
if box is None:
|
||||
out = v1 - v2
|
||||
elif len(getattr(box, "shape", [])) == 1:
|
||||
out = pbc_diff_rect(v1, v2, box)
|
||||
elif len(getattr(box, "shape", [])) == 2:
|
||||
out = pbc_diff_tric(v1, v2, box)
|
||||
else:
|
||||
raise NotImplementedError("cannot handle box")
|
||||
return out
|
||||
|
||||
|
||||
def pbc_diff_rect(v1, v2, box):
|
||||
"""
|
||||
Calculate the difference of two vectors, considering periodic boundary conditions.
|
||||
"""
|
||||
if v2 is None:
|
||||
v = v1
|
||||
else:
|
||||
v = v1 - v2
|
||||
|
||||
s = v / box
|
||||
v = box * (s - s.round())
|
||||
return v
|
||||
|
||||
|
||||
def pbc_diff_tric(v1, v2=None, box=None):
|
||||
"""
|
||||
difference vector for arbitrary pbc
|
||||
|
||||
Args:
|
||||
box_matrix: CoordinateFrame.box
|
||||
"""
|
||||
if len(box.shape) == 1:
|
||||
box = np.diag(box)
|
||||
if v1.shape == (3,):
|
||||
v1 = v1.reshape((1, 3)) # quick 'n dirty
|
||||
if v2.shape == (3,):
|
||||
v2 = v2.reshape((1, 3))
|
||||
if box is not None:
|
||||
r3 = np.subtract(v1, v2)
|
||||
r2 = np.subtract(
|
||||
r3,
|
||||
(np.rint(np.divide(r3[:, 2], box[2][2])))[:, np.newaxis]
|
||||
* box[2][np.newaxis, :],
|
||||
)
|
||||
r1 = np.subtract(
|
||||
r2,
|
||||
(np.rint(np.divide(r2[:, 1], box[1][1])))[:, np.newaxis]
|
||||
* box[1][np.newaxis, :],
|
||||
)
|
||||
v = np.subtract(
|
||||
r1,
|
||||
(np.rint(np.divide(r1[:, 0], box[0][0])))[:, np.newaxis]
|
||||
* box[0][np.newaxis, :],
|
||||
)
|
||||
else:
|
||||
v = v1 - v2
|
||||
return v
|
||||
|
||||
|
||||
def pbc_dist(a1, a2, box=None):
|
||||
return ((pbc_diff(a1, a2, box) ** 2).sum(axis=1)) ** 0.5
|
||||
|
||||
|
||||
def pbc_extend(c, box):
|
||||
"""
|
||||
in: c is frame, box is frame.box
|
||||
out: all atoms in frame and their perio. image (shape => array(len(c)*27,3))
|
||||
"""
|
||||
c = np.asarray(c)
|
||||
if c.shape == (3,):
|
||||
c = c.reshape((1, 3)) # quick 'n dirty
|
||||
comb = np.array(
|
||||
[np.asarray(i) for i in product([0, -1, 1], [0, -1, 1], [0, -1, 1])]
|
||||
)
|
||||
b_matrices = comb[:, :, np.newaxis] * box[np.newaxis, :, :]
|
||||
b_vectors = b_matrices.sum(axis=1)[np.newaxis, :, :]
|
||||
return c[:, np.newaxis, :] + b_vectors
|
||||
|
||||
|
||||
def pbc_kdtree(v1, box, leafsize=32, compact_nodes=False, balanced_tree=False):
|
||||
"""
|
||||
kd_tree with periodic images
|
||||
box - whole matrix
|
||||
rest: optional optimization
|
||||
"""
|
||||
r0 = cKDTree(
|
||||
pbc_extend(v1, box).reshape((-1, 3)), leafsize, compact_nodes, balanced_tree
|
||||
)
|
||||
return r0
|
||||
|
||||
|
||||
def pbc_kdtree_query(v1, v2, box, n=1):
|
||||
"""
|
||||
kd_tree query with periodic images
|
||||
"""
|
||||
r0, r1 = pbc_kdtree(v1, box).query(v2, n)
|
||||
r1 = r1 // 27
|
||||
return r0, r1
|
||||
|
||||
|
||||
def pbc_backfold_rect(act_frame, box_matrix):
|
||||
"""
|
||||
mimics "trjconv ... -pbc atom -ur rect"
|
||||
|
||||
folds coords of act_frame in cuboid
|
||||
|
||||
"""
|
||||
af = np.asarray(act_frame)
|
||||
if af.shape == (3,):
|
||||
act_frame = act_frame.reshape((1, 3)) # quick 'n dirty
|
||||
b = box_matrix
|
||||
c = np.diag(b) / 2
|
||||
af = pbc_diff(np.zeros((1, 3)), af - c, b)
|
||||
return af + c
|
||||
|
||||
|
||||
def pbc_backfold_compact(act_frame, box_matrix):
|
||||
"""
|
||||
mimics "trjconv ... -pbc atom -ur compact"
|
||||
|
||||
folds coords of act_frame in wigner-seitz-cell (e.g. dodecahedron)
|
||||
"""
|
||||
c = act_frame
|
||||
box = box_matrix
|
||||
ctr = box.sum(0) / 2
|
||||
c = np.asarray(c)
|
||||
shape = c.shape
|
||||
if shape == (3,):
|
||||
c = c.reshape((1, 3))
|
||||
shape = (1, 3) # quick 'n dirty
|
||||
comb = np.array(
|
||||
[np.asarray(i) for i in product([0, -1, 1], [0, -1, 1], [0, -1, 1])]
|
||||
)
|
||||
b_matrices = comb[:, :, np.newaxis] * box[np.newaxis, :, :]
|
||||
b_vectors = b_matrices.sum(axis=1)[np.newaxis, :, :]
|
||||
sc = c[:, np.newaxis, :] + b_vectors
|
||||
w = np.argsort((((sc) - ctr) ** 2).sum(2), 1)[:, 0]
|
||||
return sc[range(shape[0]), w]
|
||||
|
||||
|
||||
def whole(frame):
|
||||
"""
|
||||
Apply ``-pbc whole`` to a CoordinateFrame.
|
||||
"""
|
||||
residue_ids = frame.coordinates.atom_subset.residue_ids
|
||||
box = frame.box.diagonal()
|
||||
|
||||
# make sure, residue_ids are sorted, then determine indices at which the res id changes
|
||||
# kind='stable' assures that any existent ordering is preserved
|
||||
logger.debug("Using first atom as reference for whole.")
|
||||
sort_ind = residue_ids.argsort(kind="stable")
|
||||
i = np.concatenate([[0], np.where(np.diff(residue_ids[sort_ind]) > 0)[0] + 1])
|
||||
coms = frame[sort_ind[i]][residue_ids - 1]
|
||||
|
||||
cor = np.zeros_like(frame)
|
||||
cd = frame - coms
|
||||
n, d = np.where(cd > box / 2 * 0.9)
|
||||
cor[n, d] = -box[d]
|
||||
n, d = np.where(cd < -box / 2 * 0.9)
|
||||
cor[n, d] = box[d]
|
||||
|
||||
return frame + cor
|
||||
|
||||
|
||||
NOJUMP_CACHESIZE = 128
|
||||
|
||||
|
||||
def nojump(frame, usecache=True):
|
||||
"""
|
||||
Return the nojump coordinates of a frame, based on a jump matrix.
|
||||
"""
|
||||
selection = frame.selection
|
||||
reader = frame.coordinates.frames
|
||||
if usecache:
|
||||
if not hasattr(reader, "_nojump_cache"):
|
||||
reader._nojump_cache = OrderedDict()
|
||||
# make sure to use absolute (non negative) index
|
||||
abstep = frame.step % len(frame.coordinates)
|
||||
i0s = [x for x in reader._nojump_cache if x <= abstep]
|
||||
if len(i0s) > 0:
|
||||
i0 = max(i0s)
|
||||
delta = reader._nojump_cache[i0]
|
||||
i0 += 1
|
||||
else:
|
||||
i0 = 0
|
||||
delta = 0
|
||||
|
||||
delta = (
|
||||
delta
|
||||
+ np.array(
|
||||
np.vstack(
|
||||
[m[i0 : abstep + 1].sum(axis=0) for m in reader.nojump_matrixes]
|
||||
).T
|
||||
)
|
||||
* frame.box.diagonal()
|
||||
)
|
||||
|
||||
reader._nojump_cache[abstep] = delta
|
||||
while len(reader._nojump_cache) > NOJUMP_CACHESIZE:
|
||||
reader._nojump_cache.popitem(last=False)
|
||||
delta = delta[selection, :]
|
||||
else:
|
||||
delta = (
|
||||
np.array(
|
||||
np.vstack(
|
||||
[
|
||||
m[: frame.step + 1, selection].sum(axis=0)
|
||||
for m in reader.nojump_matrixes
|
||||
]
|
||||
).T
|
||||
)
|
||||
* frame.box.diagonal()
|
||||
)
|
||||
return frame - delta
|
||||
|
||||
|
||||
def pbc_points(coordinates, box, thickness=0, index=False, shear=False):
|
||||
"""
|
||||
Returns the points their first periodic images. Does not fold
|
||||
them back into the box.
|
||||
Thickness 0 means all 27 boxes. Positive means the box+thickness.
|
||||
Negative values mean that less than the box is returned.
|
||||
index=True also returns the indices with indices of images being their
|
||||
originals values.
|
||||
"""
|
||||
if shear:
|
||||
box[2, 0] = box[2, 0] % box[0, 0]
|
||||
# Shifts the box images in the other directions if they moved more than
|
||||
# half the box length
|
||||
if box[2, 0] > box[2, 2] / 2:
|
||||
box[2, 0] = box[2, 0] - box[0, 0]
|
||||
|
||||
grid = np.array(
|
||||
[[i, j, k] for k in [-1, 0, 1] for j in [1, 0, -1] for i in [-1, 0, 1]]
|
||||
)
|
||||
indices = np.tile(np.arange(len(coordinates)), 27)
|
||||
coordinates_pbc = np.concatenate([coordinates + v @ box for v in grid], axis=0)
|
||||
size = np.diag(box)
|
||||
|
||||
if thickness != 0:
|
||||
mask = np.all(coordinates_pbc > -thickness, axis=1)
|
||||
coordinates_pbc = coordinates_pbc[mask]
|
||||
indices = indices[mask]
|
||||
mask = np.all(coordinates_pbc < size + thickness, axis=1)
|
||||
coordinates_pbc = coordinates_pbc[mask]
|
||||
indices = indices[mask]
|
||||
if index:
|
||||
return coordinates_pbc, indices
|
||||
return coordinates_pbc
|
||||
329
src/mdevaluate/reader.py
Executable file
329
src/mdevaluate/reader.py
Executable file
@@ -0,0 +1,329 @@
|
||||
"""
|
||||
Module that provides different readers for trajectory files.
|
||||
|
||||
It also provides a common interface layer between the file IO packages,
|
||||
namely pygmx and mdanalysis, and mdevaluate.
|
||||
"""
|
||||
from .checksum import checksum
|
||||
from .logging import logger
|
||||
from . import atoms
|
||||
|
||||
from functools import lru_cache
|
||||
from collections import namedtuple
|
||||
import os
|
||||
from os import path
|
||||
from array import array
|
||||
from zipfile import BadZipFile
|
||||
import builtins
|
||||
import re
|
||||
import itertools
|
||||
|
||||
import numpy as np
|
||||
import MDAnalysis as mdanalysis
|
||||
from scipy import sparse
|
||||
|
||||
|
||||
class NojumpError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class WrongTopologyError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def open_with_mdanalysis(
|
||||
topology, trajectory, cached=False, index_file=None, charges=None, masses=None
|
||||
):
|
||||
"""Open the topology and trajectory with mdanalysis."""
|
||||
uni = mdanalysis.Universe(topology, trajectory, convert_units=False)
|
||||
if cached is not False:
|
||||
if cached is True:
|
||||
maxsize = 128
|
||||
else:
|
||||
maxsize = cached
|
||||
reader = CachedReader(uni.trajectory, maxsize)
|
||||
else:
|
||||
reader = BaseReader(uni.trajectory)
|
||||
reader.universe = uni
|
||||
if topology.endswith(".tpr"):
|
||||
charges = uni.atoms.charges
|
||||
masses = uni.atoms.masses
|
||||
elif topology.endswith(".gro"):
|
||||
charges = charges
|
||||
masses = masses
|
||||
else:
|
||||
raise WrongTopologyError('Topology file should end with ".tpr" or ".gro"')
|
||||
indices = None
|
||||
if index_file:
|
||||
indices = load_indices(index_file)
|
||||
|
||||
atms = atoms.Atoms(
|
||||
np.stack((uni.atoms.resids, uni.atoms.resnames, uni.atoms.names), axis=1),
|
||||
charges=charges,
|
||||
masses=masses,
|
||||
indices=indices,
|
||||
).subset()
|
||||
|
||||
return atms, reader
|
||||
|
||||
|
||||
group_re = re.compile("\[ ([-+\w]+) \]")
|
||||
|
||||
|
||||
def load_indices(index_file):
|
||||
indices = {}
|
||||
index_array = None
|
||||
with open(index_file) as idx_file:
|
||||
for line in idx_file:
|
||||
m = group_re.search(line)
|
||||
if m is not None:
|
||||
group_name = m.group(1)
|
||||
index_array = indices.get(group_name, [])
|
||||
indices[group_name] = index_array
|
||||
else:
|
||||
elements = line.strip().split("\t")
|
||||
elements = [x.split(" ") for x in elements]
|
||||
elements = itertools.chain(*elements) # make a flat iterator
|
||||
elements = [x for x in elements if x != ""]
|
||||
index_array += [int(x) - 1 for x in elements]
|
||||
return indices
|
||||
|
||||
|
||||
def is_writeable(fname):
|
||||
"""Test if a directory is actually writeable, by writing a temporary file."""
|
||||
fdir = os.path.dirname(fname)
|
||||
ftmp = os.path.join(fdir, str(np.random.randint(999999999)))
|
||||
while os.path.exists(ftmp):
|
||||
ftmp = os.path.join(fdir, str(np.random.randint(999999999)))
|
||||
|
||||
if os.access(fdir, os.W_OK):
|
||||
try:
|
||||
with builtins.open(ftmp, "w"):
|
||||
pass
|
||||
os.remove(ftmp)
|
||||
return True
|
||||
except PermissionError:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def nojump_load_filename(reader):
|
||||
directory, fname = path.split(reader.filename)
|
||||
full_path = path.join(directory, ".{}.nojump.npz".format(fname))
|
||||
if not is_writeable(directory):
|
||||
user_data_dir = os.path.join("/data/", os.environ["HOME"].split("/")[-1])
|
||||
full_path_fallback = os.path.join(
|
||||
os.path.join(user_data_dir, ".mdevaluate/nojump"),
|
||||
directory.lstrip("/"),
|
||||
".{}.nojump.npz".format(fname),
|
||||
)
|
||||
if os.path.exists(full_path_fallback):
|
||||
return full_path_fallback
|
||||
if os.path.exists(fname) or is_writeable(directory):
|
||||
return full_path
|
||||
else:
|
||||
user_data_dir = os.path.join("/data/", os.environ["HOME"].split("/")[-1])
|
||||
full_path_fallback = os.path.join(
|
||||
os.path.join(user_data_dir, ".mdevaluate/nojump"),
|
||||
directory.lstrip("/"),
|
||||
".{}.nojump.npz".format(fname),
|
||||
)
|
||||
return full_path
|
||||
|
||||
|
||||
def nojump_save_filename(reader):
|
||||
directory, fname = path.split(reader.filename)
|
||||
full_path = path.join(directory, ".{}.nojump.npz".format(fname))
|
||||
if is_writeable(directory):
|
||||
return full_path
|
||||
else:
|
||||
user_data_dir = os.path.join("/data/", os.environ["HOME"].split("/")[-1])
|
||||
full_path_fallback = os.path.join(
|
||||
os.path.join(user_data_dir, ".mdevaluate/nojump"),
|
||||
directory.lstrip("/"),
|
||||
".{}.nojump.npz".format(fname),
|
||||
)
|
||||
logger.info(
|
||||
"Saving nojump to {}, since original location is not writeable.".format(
|
||||
full_path_fallback
|
||||
)
|
||||
)
|
||||
os.makedirs(os.path.dirname(full_path_fallback), exist_ok=True)
|
||||
return full_path_fallback
|
||||
|
||||
|
||||
CSR_ATTRS = ("data", "indices", "indptr")
|
||||
NOJUMP_MAGIC = 2016
|
||||
|
||||
|
||||
def parse_jumps(trajectory):
|
||||
prev = trajectory[0].whole
|
||||
box = prev.box.diagonal()
|
||||
SparseData = namedtuple("SparseData", ["data", "row", "col"])
|
||||
jump_data = (
|
||||
SparseData(data=array("b"), row=array("l"), col=array("l")),
|
||||
SparseData(data=array("b"), row=array("l"), col=array("l")),
|
||||
SparseData(data=array("b"), row=array("l"), col=array("l")),
|
||||
)
|
||||
|
||||
for i, curr in enumerate(trajectory):
|
||||
if i % 500 == 0:
|
||||
logger.debug("Parse jumps Step: %d", i)
|
||||
delta = ((curr - prev) / box).round().astype(np.int8)
|
||||
prev = curr
|
||||
for d in range(3):
|
||||
(col,) = np.where(delta[:, d] != 0)
|
||||
jump_data[d].col.extend(col)
|
||||
jump_data[d].row.extend([i] * len(col))
|
||||
jump_data[d].data.extend(delta[col, d])
|
||||
|
||||
return jump_data
|
||||
|
||||
|
||||
def generate_nojump_matrixes(trajectory):
|
||||
"""
|
||||
Create the matrixes with pbc jumps for a trajectory.
|
||||
"""
|
||||
logger.info("generate Nojump Matrixes for: {}".format(trajectory))
|
||||
|
||||
jump_data = parse_jumps(trajectory)
|
||||
N = len(trajectory)
|
||||
M = len(trajectory[0])
|
||||
|
||||
trajectory.frames.nojump_matrixes = tuple(
|
||||
sparse.csr_matrix((np.array(m.data), (m.row, m.col)), shape=(N, M))
|
||||
for m in jump_data
|
||||
)
|
||||
save_nojump_matrixes(trajectory.frames)
|
||||
|
||||
|
||||
def save_nojump_matrixes(reader, matrixes=None):
|
||||
if matrixes is None:
|
||||
matrixes = reader.nojump_matrixes
|
||||
data = {"checksum": checksum(NOJUMP_MAGIC, checksum(reader))}
|
||||
for d, mat in enumerate(matrixes):
|
||||
data["shape"] = mat.shape
|
||||
for attr in CSR_ATTRS:
|
||||
data["{}_{}".format(attr, d)] = getattr(mat, attr)
|
||||
|
||||
np.savez(nojump_save_filename(reader), **data)
|
||||
|
||||
|
||||
def load_nojump_matrixes(reader):
|
||||
zipname = nojump_load_filename(reader)
|
||||
try:
|
||||
data = np.load(zipname, allow_pickle=True)
|
||||
except (AttributeError, BadZipFile, OSError):
|
||||
# npz-files can be corrupted, propably a bug for big arrays saved with savez_compressed?
|
||||
logger.info("Removing zip-File: %s", zipname)
|
||||
os.remove(nojump_load_filename(reader))
|
||||
return
|
||||
try:
|
||||
if data["checksum"] == checksum(NOJUMP_MAGIC, checksum(reader)):
|
||||
reader.nojump_matrixes = tuple(
|
||||
sparse.csr_matrix(
|
||||
tuple(data["{}_{}".format(attr, d)] for attr in CSR_ATTRS),
|
||||
shape=data["shape"],
|
||||
)
|
||||
for d in range(3)
|
||||
)
|
||||
logger.info(
|
||||
"Loaded Nojump Matrixes: {}".format(nojump_load_filename(reader))
|
||||
)
|
||||
else:
|
||||
logger.info("Invlaid Nojump Data: {}".format(nojump_load_filename(reader)))
|
||||
except KeyError:
|
||||
logger.info("Removing zip-File: %s", zipname)
|
||||
os.remove(nojump_load_filename(reader))
|
||||
return
|
||||
|
||||
|
||||
def correct_nojump_matrixes_for_whole(trajectory):
|
||||
reader = trajectory.frames
|
||||
frame = trajectory[0]
|
||||
box = frame.box.diagonal()
|
||||
cor = ((frame - frame.whole) / box).round().astype(np.int8)
|
||||
for d in range(3):
|
||||
reader.nojump_matrixes[d][0] = cor[:, d]
|
||||
save_nojump_matrixes(reader)
|
||||
|
||||
|
||||
def energy_reader(file):
|
||||
"""Reads a gromacs energy file with mdanalysis and returns an auxiliary file.
|
||||
Args:
|
||||
file: Filename of the energy file
|
||||
"""
|
||||
return mdanalysis.auxiliary.EDR.EDRReader(file)
|
||||
|
||||
|
||||
class BaseReader:
|
||||
"""Base class for trajectory readers."""
|
||||
|
||||
@property
|
||||
def filename(self):
|
||||
return self.rd.filename
|
||||
|
||||
@property
|
||||
def nojump_matrixes(self):
|
||||
if self._nojump_matrixes is None:
|
||||
raise NojumpError("Nojump Data not available: {}".format(self.filename))
|
||||
return self._nojump_matrixes
|
||||
|
||||
@nojump_matrixes.setter
|
||||
def nojump_matrixes(self, mats):
|
||||
self._nojump_matrixes = mats
|
||||
|
||||
def __init__(self, rd):
|
||||
"""
|
||||
Args:
|
||||
filename: Trajectory file to open.
|
||||
reindex (bool, opt.): If True, regenerate the index file if necessary.
|
||||
"""
|
||||
self.rd = rd
|
||||
self._nojump_matrixes = None
|
||||
if path.exists(nojump_load_filename(self)):
|
||||
load_nojump_matrixes(self)
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self.rd[item]
|
||||
|
||||
def __len__(self):
|
||||
return len(self.rd)
|
||||
|
||||
def __checksum__(self):
|
||||
if hasattr(self.rd, "cache"):
|
||||
# Has an pygmx reader
|
||||
return checksum(self.filename, str(self.rd.cache))
|
||||
elif hasattr(self.rd, "_xdr"):
|
||||
# Has an mdanalysis reader
|
||||
cache = array("L", self.rd._xdr.offsets.tobytes())
|
||||
return checksum(self.filename, str(cache))
|
||||
|
||||
|
||||
class CachedReader(BaseReader):
|
||||
"""A reader that has a least-recently-used cache for frames."""
|
||||
|
||||
@property
|
||||
def cache_info(self):
|
||||
"""Get Information about the lru cache."""
|
||||
return self._get_item.cache_info()
|
||||
|
||||
def clear_cache(self):
|
||||
"""Clear the cache of the frames."""
|
||||
self._get_item.cache_clear()
|
||||
|
||||
def __init__(self, rd, maxsize):
|
||||
"""
|
||||
Args:
|
||||
filename (str): Trajectory file that will be opened.
|
||||
maxsize: Maximum size of the lru_cache or None for infinite cache.
|
||||
"""
|
||||
super().__init__(rd)
|
||||
self._get_item = lru_cache(maxsize=maxsize)(self._get_item)
|
||||
|
||||
def _get_item(self, item):
|
||||
"""Buffer function for lru_cache, since __getitem__ can not be cached."""
|
||||
return super().__getitem__(item)
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self._get_item(item)
|
||||
386
src/mdevaluate/utils.py
Normal file
386
src/mdevaluate/utils.py
Normal file
@@ -0,0 +1,386 @@
|
||||
"""
|
||||
Collection of utility functions.
|
||||
"""
|
||||
import functools
|
||||
from types import FunctionType
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from .functions import kww, kww_1e
|
||||
from scipy.ndimage import uniform_filter1d
|
||||
|
||||
from scipy.interpolate import interp1d
|
||||
from scipy.optimize import curve_fit
|
||||
|
||||
from .logging import logger
|
||||
|
||||
|
||||
def five_point_stencil(xdata, ydata):
|
||||
"""
|
||||
Calculate the derivative dy/dx with a five point stencil.
|
||||
This algorith is only valid for equally distributed x values.
|
||||
|
||||
Args:
|
||||
xdata: x values of the data points
|
||||
ydata: y values of the data points
|
||||
|
||||
Returns:
|
||||
Values where the derivative was estimated and the value of the derivative at these points.
|
||||
|
||||
This algorithm is only valid for values on a regular grid, for unevenly distributed
|
||||
data it is only an approximation, albeit a quite good one.
|
||||
|
||||
See: https://en.wikipedia.org/wiki/Five-point_stencil
|
||||
"""
|
||||
return xdata[2:-2], (
|
||||
(-ydata[4:] + 8 * ydata[3:-1] - 8 * ydata[1:-3] + ydata[:-4])
|
||||
/ (3 * (xdata[4:] - xdata[:-4]))
|
||||
)
|
||||
|
||||
|
||||
def filon_fourier_transformation(
|
||||
time,
|
||||
correlation,
|
||||
frequencies=None,
|
||||
derivative="linear",
|
||||
imag=True,
|
||||
):
|
||||
"""
|
||||
Fourier-transformation for slow varrying functions. The filon algorithmus is
|
||||
described in detail in ref [Blochowicz]_, ch. 3.2.3.
|
||||
|
||||
Args:
|
||||
time: List of times where the correlation function was sampled.
|
||||
correlation: Values of the correlation function.
|
||||
frequencies (opt.):
|
||||
List of frequencies where the fourier transformation will be calculated.
|
||||
If None the frequencies will be choosen based on the input times.
|
||||
derivative (opt.):
|
||||
Approximation algorithmus for the derivative of the correlation function.
|
||||
Possible values are: 'linear', 'stencil' or a list of derivatives.
|
||||
imag (opt.): If imaginary part of the integral should be calculated.
|
||||
|
||||
If frequencies are not explicitly given they will be evenly placed on a log scale
|
||||
in the interval [1/tmax, 0.1/tmin] where tmin and tmax are the smallest respectively
|
||||
the biggest time (greater than 0) of the provided times. The frequencies are cut off
|
||||
at high values by one decade, since the fourier transformation deviates quite strongly
|
||||
in this regime.
|
||||
|
||||
.. [Blochowicz]
|
||||
T. Blochowicz, Broadband dielectric spectroscopy in neat and binary
|
||||
molecular glass formers, Ph.D. thesis, Universität Bayreuth (2003)
|
||||
"""
|
||||
if frequencies is None:
|
||||
f_min = 1 / max(time)
|
||||
f_max = 0.05 ** (1.2 - max(correlation)) / min(time[time > 0])
|
||||
frequencies = 2 * np.pi * np.logspace(np.log10(f_min), np.log10(f_max), num=60)
|
||||
frequencies.reshape(1, -1)
|
||||
|
||||
if derivative == "linear":
|
||||
derivative = (np.diff(correlation) / np.diff(time)).reshape(-1, 1)
|
||||
elif derivative == "stencil":
|
||||
_, derivative = five_point_stencil(time, correlation)
|
||||
time = ((time[2:-1] * time[1:-2]) ** 0.5).reshape(-1, 1)
|
||||
derivative = derivative.reshape(-1, 1)
|
||||
elif np.iterable(derivative) and len(time) is len(derivative):
|
||||
derivative.reshape(-1, 1)
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
'Invalid approximation method {}. Possible values are "linear", "stencil" or a list of values.'
|
||||
)
|
||||
time = time.reshape(-1, 1)
|
||||
|
||||
integral = (
|
||||
np.cos(frequencies * time[1:]) - np.cos(frequencies * time[:-1])
|
||||
) / frequencies**2
|
||||
fourier = (derivative * integral).sum(axis=0)
|
||||
|
||||
if imag:
|
||||
integral = (
|
||||
1j
|
||||
* (np.sin(frequencies * time[1:]) - np.sin(frequencies * time[:-1]))
|
||||
/ frequencies**2
|
||||
)
|
||||
fourier = (
|
||||
fourier
|
||||
+ (derivative * integral).sum(axis=0)
|
||||
+ 1j * correlation[0] / frequencies
|
||||
)
|
||||
|
||||
return (
|
||||
frequencies.reshape(
|
||||
-1,
|
||||
),
|
||||
fourier,
|
||||
)
|
||||
|
||||
|
||||
def mask2indices(mask):
|
||||
"""
|
||||
Return the selected indices of an array mask.
|
||||
If the mask is two-dimensional, the indices will be calculated for the second axis.
|
||||
|
||||
Example:
|
||||
>>> mask2indices([True, False, True, False])
|
||||
array([0, 2])
|
||||
>>> mask2indices([[True, True, False], [True, False, True]])
|
||||
array([[0, 1], [0, 2]])
|
||||
"""
|
||||
mask = np.array(mask)
|
||||
if len(mask.shape) == 1:
|
||||
indices = np.where(mask)
|
||||
else:
|
||||
indices = np.array([np.where(m) for m in mask])
|
||||
return indices
|
||||
|
||||
|
||||
def superpose(x1, y1, x2, y2, N=100, damping=1.0):
|
||||
if x2[0] == 0:
|
||||
x2 = x2[1:]
|
||||
y2 = y2[1:]
|
||||
|
||||
reg1 = x1 < x2[0]
|
||||
reg2 = x2 > x1[-1]
|
||||
x_ol = np.logspace(
|
||||
np.log10(max(x1[~reg1][0], x2[~reg2][0]) + 0.001),
|
||||
np.log10(min(x1[~reg1][-1], x2[~reg2][-1]) - 0.001),
|
||||
(sum(~reg1) + sum(~reg2)) / 2,
|
||||
)
|
||||
|
||||
def w(x):
|
||||
A = x_ol.min()
|
||||
B = x_ol.max()
|
||||
return (np.log10(B / x) / np.log10(B / A)) ** damping
|
||||
|
||||
xdata = np.concatenate((x1[reg1], x_ol, x2[reg2]))
|
||||
y1_interp = interp1d(x1[~reg1], y1[~reg1])
|
||||
y2_interp = interp1d(x2[~reg2], y2[~reg2])
|
||||
ydata = np.concatenate(
|
||||
(
|
||||
y1[x1 < x2.min()],
|
||||
w(x_ol) * y1_interp(x_ol) + (1 - w(x_ol)) * y2_interp(x_ol),
|
||||
y2[x2 > x1.max()],
|
||||
)
|
||||
)
|
||||
return xdata, ydata
|
||||
|
||||
|
||||
def runningmean(data, nav):
|
||||
"""
|
||||
Compute the running mean of a 1-dimenional array.
|
||||
|
||||
Args:
|
||||
data: Input data of shape (N, )
|
||||
nav: Number of points over which the data will be averaged
|
||||
|
||||
Returns:
|
||||
Array of shape (N-(nav-1), )
|
||||
"""
|
||||
return np.convolve(data, np.ones((nav,)) / nav, mode="valid")
|
||||
|
||||
|
||||
def moving_average(A, n=3):
|
||||
"""
|
||||
Compute the running mean of an array.
|
||||
Uses the second axis if it is of higher dimensionality.
|
||||
|
||||
Args:
|
||||
data: Input data of shape (N, )
|
||||
n: Number of points over which the data will be averaged
|
||||
|
||||
Returns:
|
||||
Array of shape (N-(n-1), )
|
||||
|
||||
Supports 2D-Arrays.
|
||||
Slower than runningmean for small n but faster for large n.
|
||||
"""
|
||||
k1 = int(n / 2)
|
||||
k2 = int((n - 1) / 2)
|
||||
if k2 == 0:
|
||||
if A.ndim > 1:
|
||||
return uniform_filter1d(A, n)[:, k1:]
|
||||
return uniform_filter1d(A, n)[k1:]
|
||||
if A.ndim > 1:
|
||||
return uniform_filter1d(A, n)[:, k1:-k2]
|
||||
return uniform_filter1d(A, n)[k1:-k2]
|
||||
|
||||
|
||||
def coherent_sum(func, coord_a, coord_b):
|
||||
"""
|
||||
Perform a coherent sum over two arrays :math:`A, B`.
|
||||
|
||||
.. math::
|
||||
\\frac{1}{N_A N_B}\\sum_i\\sum_j f(A_i, B_j)
|
||||
|
||||
For numpy arrays this is equal to::
|
||||
|
||||
N, d = x.shape
|
||||
M, d = y.shape
|
||||
coherent_sum(f, x, y) == f(x.reshape(N, 1, d), x.reshape(1, M, d)).sum()
|
||||
|
||||
Args:
|
||||
func: The function is called for each two items in both arrays, this should return a scalar value.
|
||||
coord_a, coord_b: The two arrays.
|
||||
|
||||
"""
|
||||
if isinstance(func, FunctionType):
|
||||
func = numba.jit(func, nopython=True, cache=True)
|
||||
|
||||
def cohsum(coord_a, coord_b):
|
||||
res = 0
|
||||
for i in range(len(coord_a)):
|
||||
for j in range(len(coord_b)):
|
||||
res += func(coord_a[i], coord_b[j])
|
||||
return res
|
||||
|
||||
return cohsum(coord_a, coord_b)
|
||||
|
||||
|
||||
def coherent_histogram(func, coord_a, coord_b, bins, distinct=False):
|
||||
"""
|
||||
Compute a coherent histogram over two arrays, equivalent to coherent_sum.
|
||||
For numpy arrays ofthis is equal to::
|
||||
|
||||
N, d = x.shape
|
||||
M, d = y.shape
|
||||
bins = np.arange(1, 5, 0.1)
|
||||
coherent_histogram(f, x, y, bins) == histogram(f(x.reshape(N, 1, d), x.reshape(1, M, d)), bins=bins)
|
||||
|
||||
Args:
|
||||
func: The function is called for each two items in both arrays, this should return a scalar value.
|
||||
coord_a, coord_b: The two arrays.
|
||||
bins: The bins used for the histogram must be distributed regular on a linear scale.
|
||||
|
||||
"""
|
||||
if isinstance(func, FunctionType):
|
||||
func = numba.jit(func, nopython=True, cache=True)
|
||||
|
||||
assert np.isclose(
|
||||
np.diff(bins).mean(), np.diff(bins)
|
||||
).all(), "A regular distribution of bins is required."
|
||||
hmin = bins[0]
|
||||
hmax = bins[-1]
|
||||
N = len(bins) - 1
|
||||
dh = (hmax - hmin) / N
|
||||
|
||||
def cohsum(coord_a, coord_b):
|
||||
res = np.zeros((N,))
|
||||
for i in range(len(coord_a)):
|
||||
for j in range(len(coord_b)):
|
||||
if not (distinct and i == j):
|
||||
h = func(coord_a[i], coord_b[j])
|
||||
if hmin <= h < hmax:
|
||||
res[int((h - hmin) / dh)] += 1
|
||||
return res
|
||||
|
||||
return cohsum(coord_a, coord_b)
|
||||
|
||||
|
||||
def Sq_from_gr(r, gr, q, ρ):
|
||||
r"""
|
||||
Compute the static structure factor as fourier transform of the pair correlation function. [Yarnell]_
|
||||
|
||||
.. math::
|
||||
S(q) - 1 = \\frac{4\\pi \\rho}{q}\\int\\limits_0^\\infty (g(r) - 1)\\,r \\sin(qr) dr
|
||||
|
||||
Args:
|
||||
r: Radii of the pair correlation function
|
||||
gr: Values of the pair correlation function
|
||||
q: List of q values
|
||||
ρ: Average number density
|
||||
|
||||
.. [Yarnell]
|
||||
Yarnell, J. L., Katz, M. J., Wenzel, R. G., & Koenig, S. H. (1973). Physical Review A, 7(6), 2130–2144.
|
||||
http://doi.org/10.1017/CBO9781107415324.004
|
||||
|
||||
"""
|
||||
ydata = ((gr - 1) * r).reshape(-1, 1) * np.sin(r.reshape(-1, 1) * q.reshape(1, -1))
|
||||
return np.trapz(x=r, y=ydata, axis=0) * (4 * np.pi * ρ / q) + 1
|
||||
|
||||
|
||||
def Fqt_from_Grt(data, q):
|
||||
"""
|
||||
Calculate the ISF from the van Hove function for a given q value by fourier transform.
|
||||
|
||||
.. math::
|
||||
F_q(t) = \\int\\limits_0^\\infty dr \\; G(r, t) \\frac{\\sin(qr)}{qr}
|
||||
|
||||
Args:
|
||||
data:
|
||||
Input data can be a pandas dataframe with columns 'r', 'time' and 'G'
|
||||
or an array of shape (N, 3), of tuples (r, t, G).
|
||||
q: Value of q.
|
||||
|
||||
Returns:
|
||||
If input data was a dataframe the result will be returned as one too, else two arrays
|
||||
will be returned, which will contain times and values of Fq(t) respectively.
|
||||
|
||||
"""
|
||||
if isinstance(data, pd.DataFrame):
|
||||
df = data.copy()
|
||||
else:
|
||||
df = pd.DataFrame(data, columns=["r", "time", "G"])
|
||||
df["isf"] = df["G"] * np.sinc(q / np.pi * df["r"])
|
||||
isf = df.groupby("time")["isf"].sum()
|
||||
if isinstance(data, pd.DataFrame):
|
||||
return pd.DataFrame({"time": isf.index, "isf": isf.values, "q": q})
|
||||
else:
|
||||
return isf.index, isf.values
|
||||
|
||||
|
||||
def singledispatchmethod(func):
|
||||
"""A decorator to define a genric instance method, analogue to functools.singledispatch."""
|
||||
dispatcher = functools.singledispatch(func)
|
||||
|
||||
def wrapper(*args, **kw):
|
||||
return dispatcher.dispatch(args[1].__class__)(*args, **kw)
|
||||
|
||||
wrapper.register = dispatcher.register
|
||||
functools.update_wrapper(wrapper, func)
|
||||
return wrapper
|
||||
|
||||
|
||||
def histogram(data, bins):
|
||||
"""Compute the histogram of the given data. Uses numpy.bincount function, if possible."""
|
||||
dbins = np.diff(bins)
|
||||
dx = dbins.mean()
|
||||
if bins.min() == 0 and dbins.std() < 1e-6:
|
||||
logger.debug("Using numpy.bincount for histogramm compuation.")
|
||||
hist = np.bincount((data // dx).astype(int), minlength=len(dbins))[: len(dbins)]
|
||||
else:
|
||||
hist = np.histogram(data, bins=bins)[0]
|
||||
|
||||
return hist, runningmean(bins, 2)
|
||||
|
||||
|
||||
def quick1etau(t, C, n=7):
|
||||
"""
|
||||
Estimate the time for a correlation function that goes from 1 to 0 to decay to 1/e.
|
||||
|
||||
If successful, returns tau as fine interpolation with a kww fit.
|
||||
The data is reduce to points around 1/e to remove short and long times from the kww fit!
|
||||
t is the time
|
||||
C is C(t) the correlation function
|
||||
n is the minimum number of points around 1/e required
|
||||
"""
|
||||
# first rough estimate, the closest time. This is returned if the interpolation fails!
|
||||
tau_est = t[np.argmin(np.fabs(C - np.exp(-1)))]
|
||||
# reduce the data to points around 1/e
|
||||
k = 0.1
|
||||
mask = (C < np.exp(-1) + k) & (C > np.exp(-1) - k)
|
||||
while np.sum(mask) < n:
|
||||
k += 0.01
|
||||
mask = (C < np.exp(-1) + k) & (C > np.exp(-1) - k)
|
||||
if k + np.exp(-1) > 1.0:
|
||||
break
|
||||
# if enough points are found, try a curve fit, else and in case of failing keep using the estimate
|
||||
if np.sum(mask) >= n:
|
||||
try:
|
||||
with np.errstate(invalid="ignore"):
|
||||
fit, _ = curve_fit(
|
||||
kww, t[mask], C[mask], p0=[0.9, tau_est, 0.9], maxfev=100000
|
||||
)
|
||||
tau_est = kww_1e(*fit)
|
||||
except:
|
||||
pass
|
||||
return tau_est
|
||||
Reference in New Issue
Block a user