Used black for formatting.

sebastiankloth 2023-06-27 10:26:23 +02:00
parent a2164507d5
commit ef125c2a89
14 changed files with 860 additions and 553 deletions
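black rewrites every file into one canonical style: strings are normalized to double quotes, argument lists that exceed the 88-column limit are exploded one per line with a trailing comma, and operator spacing is unified. A minimal before/after sketch of the rewrite pattern seen throughout this commit (snippet condensed for illustration):

# before
def open(directory='', topology='*.tpr', trajectory='*.xtc', cached=False,
         nojump=False):
    ...

# after running `black .`
def open(
    directory="",
    topology="*.tpr",
    trajectory="*.xtc",
    cached=False,
    nojump=False,
):
    ...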

View File

@ -11,11 +11,19 @@ from . import autosave
from . import reader
from .logging import logger
__version__ = '23.4'
__version__ = "23.6"
def open(directory='', topology='*.tpr', trajectory='*.xtc', cached=False,
nojump=False, index_file=None, charges=None, masses=None):
def open(
directory="",
topology="*.tpr",
trajectory="*.xtc",
cached=False,
nojump=False,
index_file=None,
charges=None,
masses=None,
):
"""
Open a simulation from a directory.
@ -51,8 +59,8 @@ def open(directory='', topology='*.tpr', trajectory='*.xtc', cached=False,
"""
top_glob = glob(os.path.join(directory, topology), recursive=True)
if top_glob is not None and len(top_glob) == 1:
top_file, = top_glob
logger.info('Loading topology: {}'.format(top_file))
(top_file,) = top_glob
logger.info("Loading topology: {}".format(top_file))
if index_file is not None:
index_glob = glob(os.path.join(directory, index_file), recursive=True)
if index_glob is not None:
@ -60,18 +68,22 @@ def open(directory='', topology='*.tpr', trajectory='*.xtc', cached=False,
else:
index_file = None
else:
raise FileNotFoundError('Topology file could not be identified.')
raise FileNotFoundError("Topology file could not be identified.")
traj_glob = glob(os.path.join(directory, trajectory), recursive=True)
if traj_glob is not None and len(traj_glob) == 1:
traj_file = traj_glob[0]
logger.info('Loading trajectory: {}'.format(traj_file))
logger.info("Loading trajectory: {}".format(traj_file))
else:
raise FileNotFoundError('Trajectory file could not be identified.')
raise FileNotFoundError("Trajectory file could not be identified.")
atom_set, frames = reader.open_with_mdanalysis(
top_file, traj_file, cached=cached, index_file=index_file,
charges=charges, masses=masses
top_file,
traj_file,
cached=cached,
index_file=index_file,
charges=charges,
masses=masses,
)
coords = coordinates.Coordinates(frames, atom_subset=atom_set)
if nojump:
@ -81,11 +93,12 @@ def open(directory='', topology='*.tpr', trajectory='*.xtc', cached=False,
reader.generate_nojump_matrixes(coords)
return coords
def open_energy(file, energies=None):
"""Reads an gromacs energy file and output the data in a pandas DataFrame.
Args:
file: Filename of the energy file
energies (opt.): Specify energies to extract from the energy file
"""
df = reader.energy_reader(file, energies=energies)
return df
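A hedged usage sketch of the two entry points (paths invented; `energies` is assumed to take a list of term names, as the docstring suggests):

import mdevaluate as md

coords = md.open("/data/sim", topology="*.tpr", trajectory="*.xtc", nojump=True)
df = md.open_energy("/data/sim/energy.edr", energies=["Temperature", "Pressure"])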

View File

@ -6,20 +6,23 @@ from .checksum import checksum
import numpy as np
import scipy
if scipy.version.version >= '0.17.0':
if scipy.version.version >= "0.17.0":
from scipy.spatial import cKDTree as KDTree
else:
from scipy.spatial import KDTree
def compare_regex(list, exp):
"""
Compare a list of strings with a regular expression.
"""
if not exp.endswith('$'):
exp += '$'
if not exp.endswith("$"):
exp += "$"
regex = re.compile(exp)
return np.array([regex.match(s) is not None for s in list])
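Appending "$" anchors the pattern at the end of the name, so a selection like "OW" does not silently match "OW1" as well. A self-contained sketch of that behavior (atom names invented):

import re
import numpy as np

names = ["OW", "OW1", "HW1"]
exp = "OW"
if not exp.endswith("$"):
    exp += "$"
regex = re.compile(exp)
print(np.array([regex.match(s) is not None for s in names]))  # [ True False False]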
class Atoms:
"""
Basic container class for atom information.
@ -62,8 +65,7 @@ class AtomMismatch(Exception):
class AtomSubset:
def __init__(self, atoms, selection=None, description=''):
def __init__(self, atoms, selection=None, description=""):
"""
Args:
atoms: Base atom object
@ -71,7 +73,7 @@ class AtomSubset:
description (opt.): Descriptive string of the subset.
"""
if selection is None:
selection = np.ones(len(atoms), dtype='bool')
selection = np.ones(len(atoms), dtype="bool")
self.selection = selection
self.atoms = atoms
self.description = description
@ -92,26 +94,28 @@ class AtomSubset:
new_subset &= AtomSubset(
self.atoms,
selection=compare_regex(self.atoms.atom_names, atom_name),
description=atom_name
description=atom_name,
)
if residue_name is not None:
new_subset &= AtomSubset(
self.atoms,
selection=compare_regex(self.atoms.residue_names, residue_name),
description=residue_name
description=residue_name,
)
if residue_id is not None:
if np.iterable(residue_id):
selection = np.zeros(len(self.selection), dtype='bool')
selection = np.zeros(len(self.selection), dtype="bool")
selection[np.in1d(self.atoms.residue_ids, residue_id)] = True
new_subset &= AtomSubset(self.atoms, selection)
else:
new_subset &= AtomSubset(self.atoms, self.atoms.residue_ids == residue_id)
new_subset &= AtomSubset(
self.atoms, self.atoms.residue_ids == residue_id
)
if indices is not None:
selection = np.zeros(len(self.selection), dtype='bool')
selection = np.zeros(len(self.selection), dtype="bool")
selection[indices] = True
new_subset &= AtomSubset(self.atoms, selection)
return new_subset
@ -142,15 +146,15 @@ class AtomSubset:
def __and__(self, other):
if self.atoms != other.atoms:
raise AtomMismatch
selection = (self.selection & other.selection)
description = '{}_{}'.format(self.description, other.description).strip('_')
selection = self.selection & other.selection
description = "{}_{}".format(self.description, other.description).strip("_")
return AtomSubset(self.atoms, selection, description)
def __or__(self, other):
if self.atoms != other.atoms:
raise AtomMismatch
selection = (self.selection | other.selection)
description = '{}_{}'.format(self.description, other.description).strip('_')
selection = self.selection | other.selection
description = "{}_{}".format(self.description, other.description).strip("_")
return AtomSubset(self.atoms, selection, description)
def __invert__(self):
@ -158,14 +162,20 @@ class AtomSubset:
return AtomSubset(self.atoms, selection, self.description)
def __repr__(self):
return 'Subset of Atoms ({} of {})'.format(len(self.atoms.residue_names[self.selection]),
len(self.atoms))
return "Subset of Atoms ({} of {})".format(
len(self.atoms.residue_names[self.selection]), len(self.atoms)
)
@property
def summary(self):
return "\n".join(["{}{} {}".format(resid, resname, atom_names)
for resid, resname, atom_names in zip(self.residue_ids, self.residue_names, self.atom_names)
])
return "\n".join(
[
"{}{} {}".format(resid, resname, atom_names)
for resid, resname, atom_names in zip(
self.residue_ids, self.residue_names, self.atom_names
)
]
)
def __checksum__(self):
return checksum(self.description)
@ -191,26 +201,23 @@ def gyration_radius(position):
\right)^{\frac{1}{2}}
"""
gyration_radii = np.array([])
for resid in np.unique(position.residue_ids):
pos = position.whole[position.residue_ids==resid]
mass = position.masses[position.residue_ids==resid][:,np.newaxis]
COM = center_of_mass(pos,mass)
r_sq = ((pbc_diff(pos,COM,pos.box.diagonal()))**2).sum(1)[:,np.newaxis]
g_radius = ((r_sq*mass).sum()/mass.sum())**(0.5)
pos = position.whole[position.residue_ids == resid]
mass = position.masses[position.residue_ids == resid][:, np.newaxis]
COM = center_of_mass(pos, mass)
r_sq = ((pbc_diff(pos, COM, pos.box.diagonal())) ** 2).sum(1)[:, np.newaxis]
g_radius = ((r_sq * mass).sum() / mass.sum()) ** (0.5)
gyration_radii = np.append(gyration_radii,g_radius)
gyration_radii = np.append(gyration_radii, g_radius)
return gyration_radii
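Per residue this evaluates the mass-weighted radius of gyration, r_g = (sum_i m_i (r_i - r_COM)^2 / sum_i m_i)^(1/2). A standalone numpy sketch for a single water-like residue (coordinates and masses invented, periodic images ignored for brevity):

import numpy as np

pos = np.array([[0.0, 0.0, 0.0], [0.1, 0.0, 0.0], [0.0, 0.1, 0.0]])
mass = np.array([16.0, 1.0, 1.0])[:, np.newaxis]
com = (pos * mass).sum(axis=0) / mass.sum()
r_sq = ((pos - com) ** 2).sum(axis=1)[:, np.newaxis]
g_radius = ((r_sq * mass).sum() / mass.sum()) ** 0.5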
def layer_of_atoms(atoms,
thickness,
plane_offset=np.array([0, 0, 0]),
plane_normal=np.array([1, 0, 0])):
def layer_of_atoms(
atoms, thickness, plane_offset=np.array([0, 0, 0]), plane_normal=np.array([1, 0, 0])
):
p_ = atoms - plane_offset
distance = np.dot(p_, plane_normal)
@ -227,6 +234,7 @@ def distance_to_atoms(ref, atoms, box=None):
out[i] = np.sqrt(diff)
return out
def distance_to_atoms_cKDtree(ref, atoms, box=None, thickness=None):
"""
Get the minimal distance from atoms to ref.
@ -236,10 +244,10 @@ def distance_to_atoms_cKDtree(ref, atoms, box=None, thickness=None):
If box is not given then periodic boundary conditions are not applied!
"""
if thickness is None:
thickness = box/5
thickness = box / 5
if box is not None:
start_coords = np.copy(atoms)%box
all_frame_coords = pbc_points(ref, box, thickness = thickness)
start_coords = np.copy(atoms) % box
all_frame_coords = pbc_points(ref, box, thickness=thickness)
else:
start_coords = atoms
all_frame_coords = ref
@ -248,7 +256,13 @@ def distance_to_atoms_cKDtree(ref, atoms, box=None, thickness=None):
return tree.query(start_coords)[0]
def next_neighbors(atoms, query_atoms=None, number_of_neighbors=1, distance_upper_bound=np.inf, distinct=False):
def next_neighbors(
atoms,
query_atoms=None,
number_of_neighbors=1,
distance_upper_bound=np.inf,
distinct=False,
):
"""
Find the N next neighbors of a set of atoms.
@ -265,6 +279,9 @@ def next_neighbors(atoms, query_atoms=None, number_of_neighbors=1, distance_uppe
query_atoms = atoms
elif not distinct:
dnn = 1
dist, indices = tree.query(query_atoms, number_of_neighbors + dnn,
distance_upper_bound=distance_upper_bound)
dist, indices = tree.query(
query_atoms,
number_of_neighbors + dnn,
distance_upper_bound=distance_upper_bound,
)
return indices[:, dnn:]
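The dnn offset exists because, when the query atoms are the tree atoms themselves, each point's nearest hit is itself at distance zero; one extra neighbor is therefore requested and the first column discarded. A standalone cKDTree sketch (random points, no periodic box):

import numpy as np
from scipy.spatial import cKDTree

atoms = np.random.rand(100, 3)
tree = cKDTree(atoms)
dnn = 1  # skip the self-match in column 0
dist, indices = tree.query(atoms, 1 + dnn)
nearest = indices[:, dnn:]  # shape (100, 1): index of the true nearest neighbor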

View File

@ -9,7 +9,7 @@ from .logging import logger
autosave_directory = None
load_autosave_data = False
verbose_print = True
user_autosave_directory = os.path.join(os.environ['HOME'], '.mdevaluate/autosave')
user_autosave_directory = os.path.join(os.environ["HOME"], ".mdevaluate/autosave")
def notify(msg):
@ -33,7 +33,7 @@ def enable(dir, load_data=True, verbose=True):
# os.makedirs(absolute, exist_ok=True)
autosave_directory = dir
load_autosave_data = load_data
notify('Enabled autosave in directory: {}'.format(autosave_directory))
notify("Enabled autosave in directory: {}".format(autosave_directory))
def disable():
@ -58,6 +58,7 @@ class disabled:
# After the context is exited, autosave will work as before.
"""
def __enter__(self):
self._autosave_directory = autosave_directory
disable()
@ -76,8 +77,12 @@ def get_directory(reader):
except PermissionError:
pass
if not os.access(savedir, os.W_OK):
savedir = os.path.join(user_autosave_directory, savedir.lstrip('/'))
logger.info('Switched autosave directory to {}, since original location is not writeable.'.format(savedir))
savedir = os.path.join(user_autosave_directory, savedir.lstrip("/"))
logger.info(
"Switched autosave directory to {}, since original location is not writeable.".format(
savedir
)
)
os.makedirs(savedir, exist_ok=True)
return savedir
@ -86,17 +91,17 @@ def get_filename(function, checksum, description, *args):
"""Get the autosave filename for a specific function call."""
func_desc = function.__name__
for arg in args:
if hasattr(arg, '__name__'):
func_desc += '_{}'.format(arg.__name__)
if hasattr(arg, "__name__"):
func_desc += "_{}".format(arg.__name__)
elif isinstance(arg, functools.partial):
func_desc += '_{}'.format(arg.func.__name__)
func_desc += "_{}".format(arg.func.__name__)
if hasattr(arg, 'frames'):
if hasattr(arg, "frames"):
savedir = get_directory(arg.frames)
if hasattr(arg, 'description') and arg.description != '':
description += '_{}'.format(arg.description)
filename = '{}_{}.npz'.format(func_desc.strip('_'), description.strip('_'))
if hasattr(arg, "description") and arg.description != "":
description += "_{}".format(arg.description)
filename = "{}_{}.npz".format(func_desc.strip("_"), description.strip("_"))
return os.path.join(savedir, filename)
@ -105,14 +110,14 @@ def verify_file(filename, checksum):
file_checksum = 0
if os.path.exists(filename):
data = np.load(filename, allow_pickle=True)
if 'checksum' in data:
file_checksum = data['checksum']
if "checksum" in data:
file_checksum = data["checksum"]
return file_checksum == checksum
def save_data(filename, checksum, data):
"""Save data and checksum to a file."""
notify('Saving result to file: {}'.format(filename))
notify("Saving result to file: {}".format(filename))
try:
data = np.array(data)
except ValueError:
@ -125,13 +130,13 @@ def save_data(filename, checksum, data):
def load_data(filename):
"""Load data from a npz file."""
notify('Loading result from file: {}'.format(filename))
notify("Loading result from file: {}".format(filename))
fdata = np.load(filename, allow_pickle=True)
if 'data' in fdata:
return fdata['data']
if "data" in fdata:
return fdata["data"]
else:
data = tuple(fdata[k] for k in sorted(fdata) if ('arr' in k))
save_data(filename, fdata['checksum'], data)
data = tuple(fdata[k] for k in sorted(fdata) if ("arr" in k))
save_data(filename, fdata["checksum"], data)
return data
@ -146,6 +151,7 @@ def autosave_data(nargs, kwargs_keys=None, version=None):
An optional version number of the decorated function, which replaces the checksum of
the function code, hence the checksum does not depend on the function code.
"""
def decorator_function(function):
# make sure to include names of positional arguments in kwargs_keys,
# since otherwise they will be ignored if passed via keyword.
@ -154,8 +160,8 @@ def autosave_data(nargs, kwargs_keys=None, version=None):
@functools.wraps(function)
def autosave(*args, **kwargs):
description = kwargs.pop('description', '')
autoload = kwargs.pop('autoload', True) and load_autosave_data
description = kwargs.pop("description", "")
autoload = kwargs.pop("autoload", True) and load_autosave_data
if autosave_directory is not None:
relevant_args = list(args[:nargs])
if kwargs_keys is not None:
@ -170,7 +176,9 @@ def autosave_data(nargs, kwargs_keys=None, version=None):
legacy_csum = checksum(function, *relevant_args)
filename = get_filename(function, csum, description, *relevant_args)
if autoload and (verify_file(filename, csum) or verify_file(filename, legacy_csum)):
if autoload and (
verify_file(filename, csum) or verify_file(filename, legacy_csum)
):
result = load_data(filename)
else:
result = function(*args, **kwargs)
@ -179,5 +187,7 @@ def autosave_data(nargs, kwargs_keys=None, version=None):
result = function(*args, **kwargs)
return result
return autosave
return decorator_function
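A hedged sketch of how the decorator is applied (function and argument names invented): the first nargs positional arguments plus the listed kwargs enter the checksum, and callers may pass the extra description/autoload keywords that the wrapper pops before the decorated function runs.

@autosave_data(nargs=2, kwargs_keys=("bins",), version="my_analysis-1")
def my_analysis(function, coordinates, bins=None):
    ...

# my_analysis(msd, coords, bins=bins, description="oxygen", autoload=True)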

View File

@ -1,4 +1,3 @@
import functools
import hashlib
from .logging import logger
@ -14,6 +13,7 @@ SALT = 42
def version(version_nr, calls=[]):
"""Function decorator that assigns a custom checksum to a function."""
def decorator(func):
cs = checksum(func.__name__, version_nr, *calls)
func.__checksum__ = lambda: cs
@ -23,18 +23,19 @@ def version(version_nr, calls=[]):
return func(*args, **kwargs)
return wrapped
return decorator
def strip_comments(s):
"""Strips comment lines and docstring from Python source string."""
o = ''
o = ""
in_docstring = False
for l in s.split('\n'):
if l.strip().startswith(('#', '"', "'")) or in_docstring:
for l in s.split("\n"):
if l.strip().startswith(("#", '"', "'")) or in_docstring:
in_docstring = l.strip().startswith(('"""', "'''")) + in_docstring == 1
continue
o += l + '\n'
o += l + "\n"
return o
@ -58,8 +59,8 @@ def checksum(*args, csum=None):
csum.update(str(SALT).encode())
for arg in args:
if hasattr(arg, '__checksum__'):
logger.debug('Checksum via __checksum__: %s', str(arg))
if hasattr(arg, "__checksum__"):
logger.debug("Checksum via __checksum__: %s", str(arg))
csum.update(str(arg.__checksum__()).encode())
elif isinstance(arg, bytes):
csum.update(arg)
@ -74,7 +75,7 @@ def checksum(*args, csum=None):
if v is not arg:
checksum(v, csum=csum)
elif isinstance(arg, functools.partial):
logger.debug('Checksum via partial for %s', str(arg))
logger.debug("Checksum via partial for %s", str(arg))
checksum(arg.func, csum=csum)
for x in arg.args:
checksum(x, csum=csum)
@ -84,8 +85,7 @@ def checksum(*args, csum=None):
elif isinstance(arg, np.ndarray):
csum.update(arg.tobytes())
else:
logger.debug('Checksum via str for %s', str(arg))
logger.debug("Checksum via str for %s", str(arg))
csum.update(str(arg).encode())
return int.from_bytes(csum.digest(), 'big')
return int.from_bytes(csum.digest(), "big")
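Because SALT is fixed, the digest is reproducible across sessions, which is what makes it usable as a cache key in the autosave module. An illustrative call (arguments arbitrary):

import numpy as np

cs = checksum("msd", 23, np.arange(10))
# identical inputs yield the identical integer on every run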

View File

@ -6,30 +6,32 @@ from . import open as md_open
def run(*args, **kwargs):
parser = argparse.ArgumentParser()
parser.add_argument(
'xtcfile',
help='The xtc file to index.',
"xtcfile",
help="The xtc file to index.",
)
parser.add_argument(
'--tpr',
help='The tprfile of the trajectory.',
dest='tpr', default=None
"--tpr", help="The tprfile of the trajectory.", dest="tpr", default=None
)
parser.add_argument(
'--nojump',
help='Generate Nojump Matrices, requires a tpr file.',
dest='nojump', action='store_true', default=False
"--nojump",
help="Generate Nojump Matrices, requires a tpr file.",
dest="nojump",
action="store_true",
default=False,
)
parser.add_argument(
'--debug',
help='Set logging level to debug.',
dest='debug', action='store_true', default=False
"--debug",
help="Set logging level to debug.",
dest="debug",
action="store_true",
default=False,
)
args = parser.parse_args()
if args.debug:
logging.setlevel('DEBUG')
logging.setlevel("DEBUG")
md_open('', trajectory=args.xtcfile, topology=args.tpr, nojump=args.nojump)
md_open("", trajectory=args.xtcfile, topology=args.tpr, nojump=args.nojump)
if __name__ == '__main__':
if __name__ == "__main__":
run()

View File

@ -31,14 +31,14 @@ def rotate_axis(coords, axis):
# return theta/pi, rotation_axis
ux, uy, uz = rotation_axis
cross_matrix = np.array([
[0, -uz, uy],
[uz, 0, -ux],
[-uy, ux, 0]
])
rotation_matrix = np.cos(theta) * np.identity(len(axis)) \
+ (1 - np.cos(theta)) * rotation_axis.reshape(-1, 1) @ rotation_axis.reshape(1, -1) \
cross_matrix = np.array([[0, -uz, uy], [uz, 0, -ux], [-uy, ux, 0]])
rotation_matrix = (
np.cos(theta) * np.identity(len(axis))
+ (1 - np.cos(theta))
* rotation_axis.reshape(-1, 1)
@ rotation_axis.reshape(1, -1)
+ np.sin(theta) * cross_matrix
)
if len(coords.shape) == 2:
rotated = np.array([rotation_matrix @ xyz for xyz in coords])
@ -54,12 +54,12 @@ def spherical_radius(frame, origin=None):
"""
if origin is None:
origin = frame.box.diagonal() / 2
return ((frame - origin)**2).sum(axis=-1)**0.5
return ((frame - origin) ** 2).sum(axis=-1) ** 0.5
def polar_coordinates(x, y):
"""Convert cartesian to polar coordinates."""
radius = (x**2 + y**2)**0.5
radius = (x**2 + y**2) ** 0.5
phi = np.arctan2(y, x)
return radius, phi
@ -67,7 +67,7 @@ def polar_coordinates(x, y):
def spherical_coordinates(x, y, z):
"""Convert cartesian to spherical coordinates."""
xy, phi = polar_coordinates(x, y)
radius = (x**2 + y**2 + z**2)**0.5
radius = (x**2 + y**2 + z**2) ** 0.5
theta = np.arccos(z / radius)
return radius, phi, theta
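A quick worked check of the conversion chain for a point on the symmetry axis:

import numpy as np

x, y, z = 0.0, 0.0, 2.0
radius = (x**2 + y**2 + z**2) ** 0.5  # 2.0
phi = np.arctan2(y, x)                # 0.0
theta = np.arccos(z / radius)         # 0.0, i.e. along the z-axis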
@ -103,8 +103,7 @@ def spatial_selector(frame, transform, rmin, rmax):
class CoordinateFrame(np.ndarray):
_known_modes = ('pbc', 'whole', 'nojump')
_known_modes = ("pbc", "whole", "nojump")
@property
def box(self):
@ -149,28 +148,41 @@ class CoordinateFrame(np.ndarray):
@property
def whole(self):
frame = whole(self)
frame.mode = 'whole'
frame.mode = "whole"
return frame
@property
def pbc(self):
frame = self % self.box.diagonal()
frame.mode = 'pbc'
frame.mode = "pbc"
return frame
@property
def nojump(self):
if self.mode != 'nojump':
if self.mode != "nojump":
if self.mode is not None:
logger.warn('Combining Nojump with other Coordinate modes is not supported and may cause unexpected results.')
logger.warn(
"Combining Nojump with other Coordinate modes is not supported and may cause unexpected results."
)
frame = nojump(self)
frame.mode = 'nojump'
frame.mode = "nojump"
return frame
else:
return self
def __new__(subtype, shape, dtype=float, buffer=None, offset=0, strides=None, order=None,
coordinates=None, step=None, box=None, mode=None):
def __new__(
subtype,
shape,
dtype=float,
buffer=None,
offset=0,
strides=None,
order=None,
coordinates=None,
step=None,
box=None,
mode=None,
):
obj = np.ndarray.__new__(subtype, shape, dtype, buffer, offset, strides)
obj.coordinates = coordinates
@ -182,11 +194,11 @@ class CoordinateFrame(np.ndarray):
if obj is None:
return
self.coordinates = getattr(obj, 'coordinates', None)
self.step = getattr(obj, 'step', None)
self.mode = getattr(obj, 'mode', None)
if hasattr(obj, 'reference'):
self.reference = getattr(obj, 'reference')
self.coordinates = getattr(obj, "coordinates", None)
self.step = getattr(obj, "step", None)
self.mode = getattr(obj, "mode", None)
if hasattr(obj, "reference"):
self.reference = getattr(obj, "reference")
class Coordinates:
@ -198,21 +210,25 @@ class Coordinates:
def get_mode(self, mode):
if self.atom_subset is not None:
return Coordinates(frames=self.frames, atom_subset=self.atom_subset, mode=mode)[self._slice]
return Coordinates(
frames=self.frames, atom_subset=self.atom_subset, mode=mode
)[self._slice]
else:
return Coordinates(frames=self.frames, atom_filter=self.atom_filter, mode=mode)[self._slice]
return Coordinates(
frames=self.frames, atom_filter=self.atom_filter, mode=mode
)[self._slice]
@property
def pbc(self):
return self.get_mode('pbc')
return self.get_mode("pbc")
@property
def whole(self):
return self.get_mode('whole')
return self.get_mode("whole")
@property
def nojump(self):
return self.get_mode('nojump')
return self.get_mode("nojump")
@property
def mode(self):
@ -221,12 +237,17 @@ class Coordinates:
@mode.setter
def mode(self, val):
if val in CoordinateFrame._known_modes:
logger.warn('Changing the Coordinates mode directly is deprecated. Use Coordinates.%s instead, which returns a copy.', val)
logger.warn(
"Changing the Coordinates mode directly is deprecated. Use Coordinates.%s instead, which returns a copy.",
val,
)
self._mode = val
else:
raise UnknownCoordinatesMode('No such mode: {}'.format(val))
raise UnknownCoordinatesMode("No such mode: {}".format(val))
def __init__(self, frames, atom_filter=None, atom_subset: AtomSubset=None, mode=None):
def __init__(
self, frames, atom_filter=None, atom_subset: AtomSubset = None, mode=None
):
"""
Args:
frames: The trajectory reader
@ -241,7 +262,9 @@ class Coordinates:
self._mode = mode
self.frames = frames
self._slice = slice(None)
assert atom_filter is None or atom_subset is None, "Cannot use both: subset and filter"
assert (
atom_filter is None or atom_subset is None
), "Cannot use both: subset and filter"
if atom_filter is not None:
self.atom_filter = atom_filter
@ -258,7 +281,9 @@ class Coordinates:
"""Returns the fnr-th frame."""
try:
if self.atom_filter is not None:
frame = self.frames[fnr].positions[self.atom_filter].view(CoordinateFrame)
frame = (
self.frames[fnr].positions[self.atom_filter].view(CoordinateFrame)
)
else:
frame = self.frames.__getitem__(fnr).positions.view(CoordinateFrame)
frame.coordinates = self
@ -272,7 +297,7 @@ class Coordinates:
def clear_cache(self):
"""Clears the frame cache, if it is enabled."""
if hasattr(self.get_frame, 'clear_cache'):
if hasattr(self.get_frame, "clear_cache"):
self.get_frame.clear_cache()
def __iter__(self):
@ -300,7 +325,9 @@ class Coordinates:
@wraps(AtomSubset.subset)
def subset(self, **kwargs):
return Coordinates(self.frames, atom_subset=self.atom_subset.subset(**kwargs), mode=self._mode)
return Coordinates(
self.frames, atom_subset=self.atom_subset.subset(**kwargs), mode=self._mode
)
@property
def description(self):
@ -312,7 +339,6 @@ class Coordinates:
class MeanCoordinates(Coordinates):
def __init__(self, frames, atom_filter=None, mean=1):
super().__init__(frames, atom_filter)
self.mean = mean
@ -330,7 +356,6 @@ class MeanCoordinates(Coordinates):
class CoordinatesMap:
def __init__(self, coordinates, function):
self.coordinates = coordinates
self.frames = self.coordinates.frames
@ -374,7 +399,7 @@ class CoordinatesMap:
@property
def description(self):
return '{}_{}'.format(self._description, self.coordinates.description)
return "{}_{}".format(self._description, self.coordinates.description)
@description.setter
def description(self, desc):
@ -392,8 +417,8 @@ class CoordinatesMap:
def pbc(self):
return CoordinatesMap(self.coordinates.pbc, self.function)
class CoordinatesFilter:
@property
def atom_subset(self):
pass
@ -465,6 +490,7 @@ def map_coordinates(func):
@wraps(func)
def wrapped(coordinates, **kwargs):
return CoordinatesMap(coordinates, partial(func, **kwargs))
return wrapped
@ -496,13 +522,15 @@ def centers_of_mass(c, *, masses=None):
number_of_masses = len(masses)
number_of_coordinates, number_of_dimensions = c.shape
number_of_new_coordinates = number_of_coordinates // number_of_masses
grouped_masses = c.reshape(number_of_new_coordinates, number_of_masses, number_of_dimensions)
grouped_masses = c.reshape(
number_of_new_coordinates, number_of_masses, number_of_dimensions
)
return np.average(grouped_masses, axis=1, weights=masses)
@map_coordinates
def pore_coordinates(coordinates, origin, sym_axis='z'):
def pore_coordinates(coordinates, origin, sym_axis="z"):
"""
Map coordinates of a pore simulation so the pore has cylindrical symmetry.
@ -512,9 +540,9 @@ def pore_coordinates(coordinates, origin, sym_axis='z'):
sym_axis (opt.): Symmtery axis of the pore, may be a literal direction
'x', 'y' or 'z' or an array of shape (3,)
"""
if sym_axis in ('x', 'y', 'z'):
if sym_axis in ("x", "y", "z"):
rot_axis = np.zeros(shape=(3,))
rot_axis[['x', 'y', 'z'].index(sym_axis)] = 1
rot_axis[["x", "y", "z"].index(sym_axis)] = 1
else:
rot_axis = sym_axis
@ -560,4 +588,4 @@ def vectors(coordinates, atoms_a, atoms_b, normed=False, box=None):
vectors = pbc_diff(coords_a, coords_b, box=box)
norm = np.linalg.norm(vectors, axis=-1).reshape(-1, 1) if normed else 1
vectors.reference = coords_a
return vectors / norm
return vectors / norm

View File

@ -34,59 +34,78 @@ def subensemble_correlation(selector_function, correlation_function=correlation)
selector = selector_function(start_frame)
subensemble = map(lambda f: f[selector], chain([start_frame], iterator))
return correlation_function(function, subensemble)
return c
def multi_subensemble_correlation(selector_function):
"""
selector_function has to expect a frame and to
return either valid indices (as with subensemble_correlation)
or a multidimensional array whose entries are valid indices
e.g. slice(10,100,2)
e.g. [1,2,3,4,5]
e.g. [[[0,1],[2],[3]],[[4],[5],[6]]] -> shape: 2,3 with
list of indices of varying length
e.g. [slice(1653),slice(1653,None,3)]
e.g. [np.ones(len_of_frames, bool)]
in general using slices is the most efficient.
if the selections are small subsets of a frame or when many subsets are empty
using indices will be more efficient than using masks.
"""
@set_has_counter
def cmulti(function, frames):
iterator = iter(frames)
start_frame = next(iterator)
selectors = np.asarray(selector_function(start_frame))
sel_shape = selectors.shape
if sel_shape[-1] == 0: selectors = np.asarray(selectors,int)
if (selectors.dtype != object): sel_shape = sel_shape[:-1]
f_values = np.zeros(sel_shape + function(start_frame,start_frame).shape,)
count = np.zeros(sel_shape, dtype=int)
if sel_shape[-1] == 0:
selectors = np.asarray(selectors, int)
if selectors.dtype != object:
sel_shape = sel_shape[:-1]
f_values = np.zeros(
sel_shape + function(start_frame, start_frame).shape,
)
count = np.zeros(sel_shape, dtype=int)
is_first_frame_loop = True
def cc(act_frame):
nonlocal is_first_frame_loop
for index in np.ndindex(sel_shape):
sel = selectors[index]
sf_sel = start_frame[sel]
if is_first_frame_loop:
count[index] = len(sf_sel)
f_values[index] = function(sf_sel, act_frame[sel]) if count[index] != 0 else 0
count[index] = len(sf_sel)
f_values[index] = (
function(sf_sel, act_frame[sel]) if count[index] != 0 else 0
)
is_first_frame_loop = False
return np.asarray(f_values.copy())
return map(cc, chain([start_frame], iterator)), count
return cmulti
@autosave_data(2)
def shifted_correlation(function, frames, selector=None, segments=10,
skip=0.1, window=0.5, average=True, points=100,
nodes=8):
def shifted_correlation(
function,
frames,
selector=None,
segments=10,
skip=0.1,
window=0.5,
average=True,
points=100,
nodes=8,
):
"""
Calculate the time series for a correlation function.
@ -130,6 +149,7 @@ def shifted_correlation(function, frames, selector=None, segments=10,
>>> time, data = shifted_correlation(msd, coords)
"""
def get_correlation(start_frame, idx, selector=None):
shifted_idx = idx + start_frame
if selector:
@ -140,17 +160,23 @@ def shifted_correlation(function, frames, selector=None, segments=10,
return np.zeros(len(idx))
else:
start = frames[start_frame][index]
correlation = np.array([ function(start, frames[frame][index])
for frame in shifted_idx ])
correlation = np.array(
[function(start, frames[frame][index]) for frame in shifted_idx]
)
return correlation
if 1-skip < window:
window = 1-skip
if 1 - skip < window:
window = 1 - skip
start_frames = np.unique(np.linspace(len(frames)*skip,
len(frames)*(1-window),
num=segments, endpoint=False,
dtype=int))
start_frames = np.unique(
np.linspace(
len(frames) * skip,
len(frames) * (1 - window),
num=segments,
endpoint=False,
dtype=int,
)
)
num_frames = int(len(frames) * window)
ls = np.logspace(0, np.log10(num_frames + 1), num=points)
@ -158,14 +184,17 @@ def shifted_correlation(function, frames, selector=None, segments=10,
t = np.array([frames[i].time for i in idx]) - frames[0].time
if nodes == 1:
result = np.array([get_correlation(start_frame, idx=idx,
selector=selector)
for start_frame in start_frames])
result = np.array(
[
get_correlation(start_frame, idx=idx, selector=selector)
for start_frame in start_frames
]
)
else:
pool = ProcessPool(nodes=nodes)
result = np.array(pool.map(partial(get_correlation, idx=idx,
selector=selector),
start_frames))
result = np.array(
pool.map(partial(get_correlation, idx=idx, selector=selector), start_frames)
)
pool.terminate()
pool.restart()
@ -186,7 +215,7 @@ def msd(start, frame):
Mean square displacement
"""
vec = start - frame
return (vec ** 2).sum(axis=1).mean()
return (vec**2).sum(axis=1).mean()
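Worked example: if every particle moves by the same vector d, the mean squared displacement equals |d|^2.

import numpy as np

start = np.zeros((5, 3))
frame = start + np.array([0.3, 0.0, 0.4])
vec = start - frame
print((vec**2).sum(axis=1).mean())  # 0.25 == 0.3**2 + 0.4**2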
def isf(start, frame, q, box=None):
@ -197,7 +226,7 @@ def isf(start, frame, q, box=None):
:param q: length of scattering vector
"""
vec = start - frame
distance = (vec ** 2).sum(axis=1) ** .5
distance = (vec**2).sum(axis=1) ** 0.5
return np.sinc(distance * q / np.pi).mean()
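The np.sinc call relies on numpy's normalized sinc, sinc(x) = sin(pi x)/(pi x), so sinc(d q / pi) equals sin(q d)/(q d), the isotropic average of exp(i q . r). A one-line check:

import numpy as np

d, q = 0.7, 22.0
assert np.isclose(np.sinc(d * q / np.pi), np.sin(q * d) / (q * d))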
@ -226,11 +255,13 @@ def van_hove_self(start, end, bins):
G(r, t) = \sum_i \delta(|\vec r_i(0) - \vec r_i(t)| - r)
"""
vec = start - end
delta_r = ((vec)**2).sum(axis=-1)**.5
delta_r = ((vec) ** 2).sum(axis=-1) ** 0.5
return 1 / len(start) * histogram(delta_r, bins)[0]
def van_hove_distinct(onset, frame, bins, box=None, use_dask=True, comp=False, bincount=True):
def van_hove_distinct(
onset, frame, bins, box=None, use_dask=True, comp=False, bincount=True
):
r"""
Compute the distinct part of the Van Hove autocorrelation function.
@ -242,9 +273,13 @@ def van_hove_distinct(onset, frame, bins, box=None, use_dask=True, comp=False, b
dimension = len(box)
N = len(onset)
if use_dask:
onset = darray.from_array(onset, chunks=(500, dimension)).reshape(1, N, dimension)
frame = darray.from_array(frame, chunks=(500, dimension)).reshape(N, 1, dimension)
dist = ((pbc_diff(onset, frame, box)**2).sum(axis=-1)**0.5).ravel()
onset = darray.from_array(onset, chunks=(500, dimension)).reshape(
1, N, dimension
)
frame = darray.from_array(frame, chunks=(500, dimension)).reshape(
N, 1, dimension
)
dist = ((pbc_diff(onset, frame, box) ** 2).sum(axis=-1) ** 0.5).ravel()
if np.diff(bins).std() < 1e6:
dx = bins[0] - bins[1]
hist = darray.bincount((dist // dx).astype(int), minlength=(len(bins) - 1))
@ -253,16 +288,20 @@ def van_hove_distinct(onset, frame, bins, box=None, use_dask=True, comp=False, b
return hist.compute() / N
else:
if comp:
dx = bins[1] - bins[0]
minlength = len(bins) - 1
def f(x):
d = (pbc_diff(x, frame, box)**2).sum(axis=-1)**0.5
return np.bincount((d // dx).astype(int), minlength=minlength)[:minlength]
d = (pbc_diff(x, frame, box) ** 2).sum(axis=-1) ** 0.5
return np.bincount((d // dx).astype(int), minlength=minlength)[
:minlength
]
hist = sum(f(x) for x in onset)
else:
dist = (pbc_diff(onset.reshape(1, -1, 3), frame.reshape(-1, 1, 3), box)**2).sum(axis=-1)**0.5
dist = (
pbc_diff(onset.reshape(1, -1, 3), frame.reshape(-1, 1, 3), box) ** 2
).sum(axis=-1) ** 0.5
hist = histogram(dist, bins=bins)[0]
return hist / N
@ -304,9 +343,12 @@ def susceptibility(time, correlation, **kwargs):
**kwargs (opt.):
Additional keyword arguments will be passed to :func:`filon_fourier_transformation`.
"""
frequencies, fourier = filon_fourier_transformation(time, correlation, imag=False, **kwargs)
frequencies, fourier = filon_fourier_transformation(
time, correlation, imag=False, **kwargs
)
return frequencies, frequencies * fourier
def coherent_scattering_function(onset, frame, q):
"""
Calculate the coherent scattering function.
@ -331,11 +373,12 @@ def coherent_scattering_function(onset, frame, q):
return coherent_sum(scfunc, onset.pbc, frame.pbc) / len(onset)
def non_gaussian(onset, frame):
"""
Calculate the Non-Gaussian parameter:
.. math::
\alpha_2 (t) = \frac{3}{5}\frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1
"""
r_2 = ((frame - onset)**2).sum(axis=-1)
return 3 / 5 * (r_2**2).mean() / r_2.mean()**2 - 1
r_2 = ((frame - onset) ** 2).sum(axis=-1)
return 3 / 5 * (r_2**2).mean() / r_2.mean() ** 2 - 1

View File

@ -9,7 +9,7 @@ from .logging import logger
from scipy import spatial
@autosave_data(nargs=2, kwargs_keys=('coordinates_b',), version='time_average-1')
@autosave_data(nargs=2, kwargs_keys=("coordinates_b",), version="time_average-1")
def time_average(function, coordinates, coordinates_b=None, pool=None):
"""
Compute the time average of a function.
@ -45,7 +45,7 @@ def time_average(function, coordinates, coordinates_b=None, pool=None):
number_of_averages += 1
result += ev
if number_of_averages % 100 == 0:
logger.debug('time_average: %d', number_of_averages)
logger.debug("time_average: %d", number_of_averages)
return result / number_of_averages
@ -78,7 +78,16 @@ def time_histogram(function, coordinates, bins, hist_range, pool=None):
return hist_results
def rdf(atoms_a, atoms_b=None, bins=None, box=None, kind=None, chunksize=50000, returnx=False, **kwargs):
def rdf(
atoms_a,
atoms_b=None,
bins=None,
box=None,
kind=None,
chunksize=50000,
returnx=False,
**kwargs
):
r"""
Compute the radial pair distribution of one or two sets of atoms.
@ -102,8 +111,12 @@ def rdf(atoms_a, atoms_b=None, bins=None, box=None, kind=None, chunksize=50000,
as large as possible, depending on the available memory.
returnx (opt.): If True the x ordinate of the histogram is returned.
"""
assert bins is not None, 'Bins of the pair distribution have to be defined.'
assert kind in ['intra', 'inter', None], 'Argument kind must be one of the following: intra, inter, None.'
assert bins is not None, "Bins of the pair distribution have to be defined."
assert kind in [
"intra",
"inter",
None,
], "Argument kind must be one of the following: intra, inter, None."
if box is None:
box = atoms_a.box.diagonal()
if atoms_b is None:
@ -121,18 +134,24 @@ def rdf(atoms_a, atoms_b=None, bins=None, box=None, kind=None, chunksize=50000,
for chunk in range(0, len(indices[0]), chunksize):
sl = slice(chunk, chunk + chunksize)
diff = pbc_diff(atoms_a[indices[0][sl]], atoms_b[indices[1][sl]], box)
dist = (diff**2).sum(axis=1)**0.5
if kind == 'intra':
mask = atoms_a.residue_ids[indices[0][sl]] == atoms_b.residue_ids[indices[1][sl]]
dist = (diff**2).sum(axis=1) ** 0.5
if kind == "intra":
mask = (
atoms_a.residue_ids[indices[0][sl]]
== atoms_b.residue_ids[indices[1][sl]]
)
dist = dist[mask]
elif kind == 'inter':
mask = atoms_a.residue_ids[indices[0][sl]] != atoms_b.residue_ids[indices[1][sl]]
elif kind == "inter":
mask = (
atoms_a.residue_ids[indices[0][sl]]
!= atoms_b.residue_ids[indices[1][sl]]
)
dist = dist[mask]
nr_of_samples += len(dist)
hist += np.histogram(dist, bins)[0]
volume = 4 / 3 * np.pi * (bins[1:]**3 - bins[:-1]**3)
volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
density = nr_of_samples / np.prod(box)
res = hist / volume / density
if returnx:
@ -141,87 +160,117 @@ def rdf(atoms_a, atoms_b=None, bins=None, box=None, kind=None, chunksize=50000,
return res
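A hedged call sketch (plain ndarrays are assumed to be accepted when box is given and kind is None; units follow the trajectory, typically nm):

import numpy as np

bins = np.arange(0.0, 1.5, 0.01)
box = np.array([3.0, 3.0, 3.0])
atoms = np.random.rand(1000, 3) * box
g = rdf(atoms, bins=bins, box=box)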
def pbc_tree_rdf(atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs):
def pbc_tree_rdf(
atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs
):
if box is None:
box = atoms_a.box.diagonal()
all_coords = pbc_points(atoms_b, box, thickness=np.amax(bins)+0.1)
all_coords = pbc_points(atoms_b, box, thickness=np.amax(bins) + 0.1)
to_tree = spatial.cKDTree(all_coords)
dist = to_tree.query(pbc_diff(atoms_a,box=box),k=len(atoms_b), distance_upper_bound=np.amax(bins)+0.1)[0].flatten()
dist = to_tree.query(
pbc_diff(atoms_a, box=box),
k=len(atoms_b),
distance_upper_bound=np.amax(bins) + 0.1,
)[0].flatten()
dist = dist[dist < np.inf]
hist = np.histogram(dist, bins)[0]
volume = 4/3*np.pi*(bins[1:]**3-bins[:-1]**3)
res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b)-exclude)
volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b) - exclude)
if returnx:
return np.vstack((runningmean(bins, 2), res))
else:
return res
def pbc_spm_rdf(atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs):
def pbc_spm_rdf(
atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs
):
if box is None:
box = atoms_a.box
all_coords = pbc_points(atoms_b, box, thickness=np.amax(bins)+0.1)
all_coords = pbc_points(atoms_b, box, thickness=np.amax(bins) + 0.1)
to_tree = spatial.cKDTree(all_coords)
if all_coords.nbytes/1024**3 * len(atoms_a) < 2:
from_tree = spatial.cKDTree(pbc_diff(atoms_a,box=box))
dist = to_tree.sparse_distance_matrix(from_tree, max_distance=np.amax(bins)+0.1, output_type='ndarray')
dist = np.asarray(dist.tolist())[:,2]
if all_coords.nbytes / 1024**3 * len(atoms_a) < 2:
from_tree = spatial.cKDTree(pbc_diff(atoms_a, box=box))
dist = to_tree.sparse_distance_matrix(
from_tree, max_distance=np.amax(bins) + 0.1, output_type="ndarray"
)
dist = np.asarray(dist.tolist())[:, 2]
hist = np.histogram(dist, bins)[0]
else:
chunksize = int(2 * len(atoms_a) / (all_coords.nbytes/1024**3 * len(atoms_a)))
chunksize = int(
2 * len(atoms_a) / (all_coords.nbytes / 1024**3 * len(atoms_a))
)
hist = 0
for chunk in range(0, len(atoms_a), chunksize):
sl = slice(chunk, chunk + chunksize)
from_tree = spatial.cKDTree(pbc_diff(atoms_a[sl],box=box))
dist = to_tree.sparse_distance_matrix(from_tree, max_distance=np.amax(bins)+0.1, output_type='ndarray')
dist = np.asarray(dist.tolist())[:,2]
from_tree = spatial.cKDTree(pbc_diff(atoms_a[sl], box=box))
dist = to_tree.sparse_distance_matrix(
from_tree, max_distance=np.amax(bins) + 0.1, output_type="ndarray"
)
dist = np.asarray(dist.tolist())[:, 2]
hist += np.histogram(dist, bins)[0]
volume = 4/3*np.pi*(bins[1:]**3-bins[:-1]**3)
res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b)-exclude)
volume = 4 / 3 * np.pi * (bins[1:] ** 3 - bins[:-1] ** 3)
res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b) - exclude)
if returnx:
return np.vstack((runningmean(bins, 2), res))
else:
return res
@autosave_data(nargs=2, kwargs_keys=('to_coords','times'))
@autosave_data(nargs=2, kwargs_keys=("to_coords", "times"))
def fast_averaged_rdf(from_coords, bins, to_coords=None, times=10, exclude=0, **kwargs):
if to_coords is None:
to_coords = from_coords
exclude = 1
# first find timings for the different rdf functions
import time
# only consider sparse matrix for this condition
if (len(from_coords[0])*len(to_coords[0]) <= 3000 * 2000 ) & (len(from_coords[0])/len(to_coords[0]) > 5 ):
if (len(from_coords[0]) * len(to_coords[0]) <= 3000 * 2000) & (
len(from_coords[0]) / len(to_coords[0]) > 5
):
funcs = [rdf, pbc_tree_rdf, pbc_spm_rdf]
else:
funcs = [rdf, pbc_tree_rdf]
timings = []
for f in funcs:
start = time.time()
f(from_coords[0], atoms_b=to_coords[0], bins=bins, box=np.diag(from_coords[0].box))
f(
from_coords[0],
atoms_b=to_coords[0],
bins=bins,
box=np.diag(from_coords[0].box),
)
end = time.time()
timings.append(end-start)
timings.append(end - start)
timings = np.array(timings)
timings[0] = 2*timings[0] # statistics for the other functions is twice as good per frame
logger.debug('rdf function timings: ' + str(timings))
timings[0] = (
2 * timings[0]
) # statistics for the other functions is twice as good per frame
logger.debug("rdf function timings: " + str(timings))
rdffunc = funcs[np.argmin(timings)]
logger.debug('rdf function used: ' + str(rdffunc))
logger.debug("rdf function used: " + str(rdffunc))
if rdffunc == rdf:
times = times*2 # duplicate times for same statistics
times = times * 2 # duplicate times for same statistics
frames = np.array(range(0, len(from_coords), int(len(from_coords)/times)))[:times]
out = np.zeros(len(bins)-1)
frames = np.array(range(0, len(from_coords), int(len(from_coords) / times)))[:times]
out = np.zeros(len(bins) - 1)
for j, i in enumerate(frames):
logger.debug('multi_radial_pair_distribution: %d/%d', j, len(frames))
out += rdffunc(from_coords[i], to_coords[i], bins, box=np.diag(from_coords[i].box), exclude=exclude)
return out/len(frames)
logger.debug("multi_radial_pair_distribution: %d/%d", j, len(frames))
out += rdffunc(
from_coords[i],
to_coords[i],
bins,
box=np.diag(from_coords[i].box),
exclude=exclude,
)
return out / len(frames)
def distance_distribution(atoms, bins):
connection_vectors = atoms[:-1, :] - atoms[1:, :]
connection_lengths = (connection_vectors**2).sum(axis=1)**.5
connection_lengths = (connection_vectors**2).sum(axis=1) ** 0.5
return np.histogram(connection_lengths, bins)[0]
@ -230,8 +279,12 @@ def tetrahedral_order(atoms, reference_atoms=None):
reference_atoms = atoms
indices = next_neighbors(reference_atoms, query_atoms=atoms, number_of_neighbors=4)
neighbors = reference_atoms[indices]
neighbors_1, neighbors_2, neighbors_3, neighbors_4 = \
neighbors[:, 0, :], neighbors[:, 1, :], neighbors[:, 2, :], neighbors[:, 3, :]
neighbors_1, neighbors_2, neighbors_3, neighbors_4 = (
neighbors[:, 0, :],
neighbors[:, 1, :],
neighbors[:, 2, :],
neighbors[:, 3, :],
)
# Connection vectors
neighbors_1 -= atoms
@ -245,14 +298,14 @@ def tetrahedral_order(atoms, reference_atoms=None):
neighbors_3 /= np.linalg.norm(neighbors_3, axis=-1).reshape(-1, 1)
neighbors_4 /= np.linalg.norm(neighbors_4, axis=-1).reshape(-1, 1)
a_1_2 = ((neighbors_1 * neighbors_2).sum(axis=1) + 1 / 3)**2
a_1_3 = ((neighbors_1 * neighbors_3).sum(axis=1) + 1 / 3)**2
a_1_4 = ((neighbors_1 * neighbors_4).sum(axis=1) + 1 / 3)**2
a_1_2 = ((neighbors_1 * neighbors_2).sum(axis=1) + 1 / 3) ** 2
a_1_3 = ((neighbors_1 * neighbors_3).sum(axis=1) + 1 / 3) ** 2
a_1_4 = ((neighbors_1 * neighbors_4).sum(axis=1) + 1 / 3) ** 2
a_2_3 = ((neighbors_2 * neighbors_3).sum(axis=1) + 1 / 3)**2
a_2_4 = ((neighbors_2 * neighbors_4).sum(axis=1) + 1 / 3)**2
a_2_3 = ((neighbors_2 * neighbors_3).sum(axis=1) + 1 / 3) ** 2
a_2_4 = ((neighbors_2 * neighbors_4).sum(axis=1) + 1 / 3) ** 2
a_3_4 = ((neighbors_3 * neighbors_4).sum(axis=1) + 1 / 3)**2
a_3_4 = ((neighbors_3 * neighbors_4).sum(axis=1) + 1 / 3) ** 2
q = 1 - 3 / 8 * (a_1_2 + a_1_3 + a_1_4 + a_2_3 + a_2_4 + a_3_4)
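This is the usual tetrahedral order normalization: in a perfect tetrahedron every pair of bond vectors has cos(psi) = -1/3, all six terms vanish and q = 1, while for uncorrelated directions the expectation of each squared term is 4/9, giving an average q of 0:

q_tet = 1 - 3 / 8 * 6 * (-1 / 3 + 1 / 3) ** 2  # 1.0
q_gas = 1 - 3 / 8 * 6 * (4 / 9)                # 0.0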
@ -260,12 +313,14 @@ def tetrahedral_order(atoms, reference_atoms=None):
def tetrahedral_order_distribution(atoms, reference_atoms=None, bins=None):
assert bins is not None, 'Bin edges of the distribution have to be specified.'
assert bins is not None, "Bin edges of the distribution have to be specified."
Q = tetrahedral_order(atoms, reference_atoms=reference_atoms)
return np.histogram(Q, bins=bins)[0]
def radial_density(atoms, bins, symmetry_axis=(0, 0, 1), origin=(0, 0, 0), height=1, returnx=False):
def radial_density(
atoms, bins, symmetry_axis=(0, 0, 1), origin=(0, 0, 0), height=1, returnx=False
):
"""
Calculate the radial density distribution.
@ -290,7 +345,7 @@ def radial_density(atoms, bins, symmetry_axis=(0, 0, 1), origin=(0, 0, 0), heigh
cartesian = rotate_axis(atoms - origin, symmetry_axis)
radius, _ = polar_coordinates(cartesian[:, 0], cartesian[:, 1])
hist = np.histogram(radius, bins=bins)[0]
volume = np.pi * (bins[1:]**2 - bins[:-1]**2) * height
volume = np.pi * (bins[1:] ** 2 - bins[:-1] ** 2) * height
res = hist / volume
if returnx:
return np.vstack((runningmean(bins, 2), res))
@ -298,8 +353,14 @@ def radial_density(atoms, bins, symmetry_axis=(0, 0, 1), origin=(0, 0, 0), heigh
return res
def shell_density(atoms, shell_radius, bins, shell_thickness=0.5,
symmetry_axis=(0, 0, 1), origin=(0, 0, 0)):
def shell_density(
atoms,
shell_radius,
bins,
shell_thickness=0.5,
symmetry_axis=(0, 0, 1),
origin=(0, 0, 0),
):
"""
Compute the density distribution on a cylindrical shell.
@ -316,9 +377,11 @@ def shell_density(atoms, shell_radius, bins, shell_thickness=0.5,
Returns:
Two-dimensional density distribution of the atoms in the defined shell.
"""
cartesian = rotate_axis(atoms-origin, symmetry_axis)
cartesian = rotate_axis(atoms - origin, symmetry_axis)
radius, theta = polar_coordinates(cartesian[:, 0], cartesian[:, 1])
shell_indices = (shell_radius <= radius) & (radius <= shell_radius + shell_thickness)
shell_indices = (shell_radius <= radius) & (
radius <= shell_radius + shell_thickness
)
hist = np.histogram2d(theta[shell_indices], cartesian[shell_indices, 2], bins)[0]
return hist
@ -332,34 +395,56 @@ def spatial_density(atoms, bins, weights=None):
return density
def mixing_ratio_distribution(atoms_a, atoms_b, bins_ratio, bins_density,
weights_a=None, weights_b=None, weights_ratio=None):
def mixing_ratio_distribution(
atoms_a,
atoms_b,
bins_ratio,
bins_density,
weights_a=None,
weights_b=None,
weights_ratio=None,
):
"""
Compute the distribution of the mixing ratio of two sets of atoms.
"""
density_a, _ = np.histogramdd(atoms_a, bins=bins_density, weights=weights_a)
density_b, _ = np.histogramdd(atoms_b, bins=bins_density, weights=weights_b)
mixing_ratio = density_a/(density_a + density_b)
mixing_ratio = density_a / (density_a + density_b)
good_inds = (density_a != 0) & (density_b != 0)
hist, _ = np.histogram(mixing_ratio[good_inds], bins=bins_ratio, weights=weights_ratio)
hist, _ = np.histogram(
mixing_ratio[good_inds], bins=bins_ratio, weights=weights_ratio
)
return hist
def next_neighbor_distribution(atoms, reference=None, number_of_neighbors=4, bins=None, normed=True):
def next_neighbor_distribution(
atoms, reference=None, number_of_neighbors=4, bins=None, normed=True
):
"""
Compute the distribution of next neighbors with the same residue name.
"""
assert bins is not None, 'Bins have to be specified.'
assert bins is not None, "Bins have to be specified."
if reference is None:
reference = atoms
nn = next_neighbors(reference, query_atoms=atoms, number_of_neighbors=number_of_neighbors)
nn = next_neighbors(
reference, query_atoms=atoms, number_of_neighbors=number_of_neighbors
)
resname_nn = reference.residue_names[nn]
count_nn = (resname_nn == atoms.residue_names.reshape(-1, 1)).sum(axis=1)
return np.histogram(count_nn, bins=bins, normed=normed)[0]
def hbonds(D, H, A, box, DA_lim=0.35, HA_lim=0.35, min_cos=np.cos(30*np.pi/180), full_output=False):
def hbonds(
D,
H,
A,
box,
DA_lim=0.35,
HA_lim=0.35,
min_cos=np.cos(30 * np.pi / 180),
full_output=False,
):
"""
Compute h-bond pairs
@ -380,42 +465,51 @@ def hbonds(D, H, A, box, DA_lim=0.35, HA_lim=0.35, min_cos=np.cos(30*np.pi/180),
"""
def dist_DltA(D, H, A, box, max_dist=0.35):
ppoints, pind = pbc_points(D, box, thickness=max_dist+0.1, index=True)
ppoints, pind = pbc_points(D, box, thickness=max_dist + 0.1, index=True)
Dtree = spatial.cKDTree(ppoints)
Atree = spatial.cKDTree(A)
pairs = Dtree.sparse_distance_matrix(Atree, max_dist, output_type='ndarray')
pairs = Dtree.sparse_distance_matrix(Atree, max_dist, output_type="ndarray")
pairs = np.asarray(pairs.tolist())
pairs = np.int_(pairs[pairs[:,2] > 0][:,:2])
pairs[:,0] = pind[pairs[:,0]]
pairs = np.int_(pairs[pairs[:, 2] > 0][:, :2])
pairs[:, 0] = pind[pairs[:, 0]]
return pairs
def dist_AltD(D, H, A, box, max_dist=0.35):
ppoints, pind = pbc_points(A, box, thickness=max_dist+0.1, index=True)
ppoints, pind = pbc_points(A, box, thickness=max_dist + 0.1, index=True)
Atree = spatial.cKDTree(ppoints)
Dtree = spatial.cKDTree(D)
pairs = Atree.sparse_distance_matrix(Dtree, max_dist, output_type='ndarray')
pairs = Atree.sparse_distance_matrix(Dtree, max_dist, output_type="ndarray")
pairs = np.asarray(pairs.tolist())
pairs = np.int_(pairs[pairs[:,2] > 0][:,:2])
pairs = pairs[:,::-1]
pairs[:,1] = pind[pairs[:,1]]
pairs = np.int_(pairs[pairs[:, 2] > 0][:, :2])
pairs = pairs[:, ::-1]
pairs[:, 1] = pind[pairs[:, 1]]
return pairs
if len(D) <= len(A):
pairs = dist_DltA(D,H,A,box,DA_lim)
pairs = dist_DltA(D, H, A, box, DA_lim)
else:
pairs = dist_AltD(D,H,A,box,DA_lim)
pairs = dist_AltD(D, H, A, box, DA_lim)
vDH = pbc_diff(D[pairs[:,0]], H[pairs[:,0]], box)
vDA = pbc_diff(D[pairs[:,0]], A[pairs[:,1]], box)
vHA = pbc_diff(H[pairs[:,0]], A[pairs[:,1]], box)
angles_cos = np.clip(np.einsum('ij,ij->i', vDH, vDA)/
np.linalg.norm(vDH,axis=1)/
np.linalg.norm(vDA,axis=1), -1, 1)
is_bond = ((angles_cos >= min_cos) &
(np.sum(vHA**2,axis=-1) <= HA_lim**2) &
(np.sum(vDA**2,axis=-1) <= DA_lim**2))
vDH = pbc_diff(D[pairs[:, 0]], H[pairs[:, 0]], box)
vDA = pbc_diff(D[pairs[:, 0]], A[pairs[:, 1]], box)
vHA = pbc_diff(H[pairs[:, 0]], A[pairs[:, 1]], box)
angles_cos = np.clip(
np.einsum("ij,ij->i", vDH, vDA)
/ np.linalg.norm(vDH, axis=1)
/ np.linalg.norm(vDA, axis=1),
-1,
1,
)
is_bond = (
(angles_cos >= min_cos)
& (np.sum(vHA**2, axis=-1) <= HA_lim**2)
& (np.sum(vDA**2, axis=-1) <= DA_lim**2)
)
if full_output:
return pairs[is_bond], angles_cos[is_bond], np.sum(vDA[is_bond]**2,axis=-1)**.5
return (
pairs[is_bond],
angles_cos[is_bond],
np.sum(vDA[is_bond] ** 2, axis=-1) ** 0.5,
)
else:
return pairs[is_bond]
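In summary, a pair counts as bonded when the donor-acceptor and hydrogen-acceptor distances stay below their cutoffs and the D->H vector lies within the angular cone around D->A; with the default min_cos that cone is 30 degrees:

import numpy as np

min_cos = np.cos(30 * np.pi / 180)  # ~0.866; cos(angle) must not fall below this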

View File

@ -2,20 +2,25 @@ import numpy as np
def kww(t, A, τ, β):
return A * np.exp(-(t / τ)**β)
return A * np.exp(-((t / τ) ** β))
def kww_1e(A, τ, β):
return τ * (-np.log(1 / (np.e * A)))**(1 / β)
return τ * (-np.log(1 / (np.e * A))) ** (1 / β)
def cole_davidson(w, A, b, t0):
P = np.arctan(w * t0)
return A * np.cos(P)**b * np.sin(b * P)
return A * np.cos(P) ** b * np.sin(b * P)
def cole_cole(w, A, b, t0):
return A * (w * t0)**b * np.sin(np.pi * b / 2) / (1 + 2 * (w * t0)**b * np.cos(np.pi * b / 2) + (w * t0)**(2 * b))
return (
A
* (w * t0) ** b
* np.sin(np.pi * b / 2)
/ (1 + 2 * (w * t0) ** b * np.cos(np.pi * b / 2) + (w * t0) ** (2 * b))
)
def havriliak_negami(ω, A, β, α, τ):
@ -25,14 +30,14 @@ def havriliak_negami(ω, A, β, α, τ):
.. math::
\chi_{HN}(\omega) = \Im\left(\frac{A}{(1 + (i\omega\tau)^\alpha)^\beta}\right)
"""
return -(A / (1 + (1j * ω * τ)**α)**β).imag
return -(A / (1 + (1j * ω * τ) ** α) ** β).imag
# fits decay of correlation times, e.g. with distance to pore walls
def colen(d, X, t8, A):
return t8 * np.exp(A*np.exp(-d/X))
return t8 * np.exp(A * np.exp(-d / X))
# fits decay of the plateau height of the overlap function, e.g. with distance to pore walls
def colenQ(d, X, Qb, g):
return (1-Qb)*np.exp(-(d/X)**g)+Qb
return (1 - Qb) * np.exp(-((d / X) ** g)) + Qb
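A hedged fitting sketch for the stretched exponential (synthetic data; scipy is assumed available):

import numpy as np
from scipy.optimize import curve_fit

t = np.logspace(-2, 3, 50)
data = 0.9 * np.exp(-((t / 12.0) ** 0.6))
popt, _ = curve_fit(kww, t, data, p0=(1.0, 10.0, 0.5))
tau_1e = kww_1e(*popt)  # time at which the fitted curve has decayed to 1/e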

View File

@ -1,12 +1,15 @@
import logging
logger = logging.getLogger('mdevaluate')
logger = logging.getLogger("mdevaluate")
logger.setLevel(logging.INFO)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
logger.addHandler(stream_handler)
formatter = logging.Formatter('{levelname[0]}{levelname[1]}{levelname[2]}[{asctime}]:{funcName}: {message}', style='{')
formatter = logging.Formatter(
"{levelname[0]}{levelname[1]}{levelname[2]}[{asctime}]:{funcName}: {message}",
style="{",
)
stream_handler.setFormatter(formatter)

View File

@ -8,6 +8,7 @@ from itertools import product
from .logging import logger
def pbc_diff_old(v1, v2, box):
"""
Calculate the difference of two vectors, considering optional boundary conditions.
@ -21,16 +22,19 @@ def pbc_diff_old(v1, v2, box):
return v
def pbc_diff(v1, v2=None, box=None):
if box is None:
out = v1 - v2
elif len(getattr(box, 'shape', [])) == 1:
elif len(getattr(box, "shape", [])) == 1:
out = pbc_diff_rect(v1, v2, box)
elif len(getattr(box, 'shape', [])) == 2:
elif len(getattr(box, "shape", [])) == 2:
out = pbc_diff_tric(v1, v2, box)
else: raise NotImplementedError("cannot handle box")
else:
raise NotImplementedError("cannot handle box")
return out
def pbc_diff_rect(v1, v2, box):
"""
Calculate the difference of two vectors, considering periodic boundary conditions.
@ -38,7 +42,7 @@ def pbc_diff_rect(v1, v2, box):
if v2 is None:
v = v1
else:
v = v1 -v2
v = v1 - v2
s = v / box
v = box * (s - s.round())
@ -52,71 +56,92 @@ def pbc_diff_tric(v1, v2=None, box=None):
Args:
box_matrix: CoordinateFrame.box
"""
if len(box.shape) == 1: box = np.diag(box)
if v1.shape == (3,): v1 = v1.reshape((1,3)) #quick 'n dirty
if v2.shape == (3,): v2 = v2.reshape((1,3))
if len(box.shape) == 1:
box = np.diag(box)
if v1.shape == (3,):
v1 = v1.reshape((1, 3)) # quick 'n dirty
if v2.shape == (3,):
v2 = v2.reshape((1, 3))
if box is not None:
r3 = np.subtract(v1, v2)
r2 = np.subtract(r3, (np.rint(np.divide(r3[:,2],box[2][2])))[:,np.newaxis] * box[2][np.newaxis,:])
r1 = np.subtract(r2, (np.rint(np.divide(r2[:,1],box[1][1])))[:,np.newaxis] * box[1][np.newaxis,:])
v = np.subtract(r1, (np.rint(np.divide(r1[:,0],box[0][0])))[:,np.newaxis] * box[0][np.newaxis,:])
r2 = np.subtract(
r3,
(np.rint(np.divide(r3[:, 2], box[2][2])))[:, np.newaxis]
* box[2][np.newaxis, :],
)
r1 = np.subtract(
r2,
(np.rint(np.divide(r2[:, 1], box[1][1])))[:, np.newaxis]
* box[1][np.newaxis, :],
)
v = np.subtract(
r1,
(np.rint(np.divide(r1[:, 0], box[0][0])))[:, np.newaxis]
* box[0][np.newaxis, :],
)
else:
v = v1 - v2
return v
def pbc_dist(a1,a2,box = None):
return ((pbc_diff(a1,a2,box)**2).sum(axis=1))**0.5
def pbc_dist(a1, a2, box=None):
return ((pbc_diff(a1, a2, box) ** 2).sum(axis=1)) ** 0.5
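pbc_diff_rect applies the minimum-image convention for rectangular boxes: scale the difference into box fractions, round away whole periods, scale back. A worked check (box and points invented):

import numpy as np

box = np.array([3.0, 3.0, 3.0])
v = np.array([[2.9, 0.1, 0.0]]) - np.array([[0.1, 2.9, 0.0]])  # [[2.8, -2.8, 0.0]]
s = v / box
print(box * (s - s.round()))  # [[-0.2  0.2  0. ]] -- nearest periodic image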
def pbc_extend(c, box):
"""
in: c is frame, box is frame.box
out: all atoms in frame and their periodic images (shape => array(len(c)*27,3))
"""
c=np.asarray(c)
if c.shape == (3,): c = c.reshape((1,3)) #quick 'n dirty
comb = np.array([np.asarray(i) for i in product([0,-1,1],[0,-1,1],[0,-1,1])])
b_matrices = comb[:,:,np.newaxis]*box[np.newaxis,:,:]
b_vectors = b_matrices.sum(axis=1)[np.newaxis,:,:]
return (c[:,np.newaxis,:]+b_vectors)
c = np.asarray(c)
if c.shape == (3,):
c = c.reshape((1, 3)) # quick 'n dirty
comb = np.array(
[np.asarray(i) for i in product([0, -1, 1], [0, -1, 1], [0, -1, 1])]
)
b_matrices = comb[:, :, np.newaxis] * box[np.newaxis, :, :]
b_vectors = b_matrices.sum(axis=1)[np.newaxis, :, :]
return c[:, np.newaxis, :] + b_vectors
def pbc_kdtree(v1,box, leafsize = 32, compact_nodes = False, balanced_tree = False):
def pbc_kdtree(v1, box, leafsize=32, compact_nodes=False, balanced_tree=False):
"""
kd_tree with periodic images
box - whole matrix
rest: optional optimization
"""
r0 = cKDTree(pbc_extend(v1,box).reshape((-1,3)),leafsize ,compact_nodes ,balanced_tree)
r0 = cKDTree(
pbc_extend(v1, box).reshape((-1, 3)), leafsize, compact_nodes, balanced_tree
)
return r0
def pbc_kdtree_query(v1,v2,box,n = 1):
def pbc_kdtree_query(v1, v2, box, n=1):
"""
kd_tree query with periodic images
"""
r0, r1 = pbc_kdtree(v1,box).query(v2,n)
r0, r1 = pbc_kdtree(v1, box).query(v2, n)
r1 = r1 // 27
return r0, r1
def pbc_backfold_rect(act_frame,box_matrix):
def pbc_backfold_rect(act_frame, box_matrix):
"""
mimics "trjconv ... -pbc atom -ur rect"
folds coords of act_frame in cuboid
"""
af=np.asarray(act_frame)
if af.shape == (3,): act_frame = act_frame.reshape((1,3)) #quick 'n dirty
af = np.asarray(act_frame)
if af.shape == (3,):
act_frame = act_frame.reshape((1, 3)) # quick 'n dirty
b = box_matrix
c = np.diag(b)/2
af = pbc_diff(np.zeros((1,3)),af-c,b)
c = np.diag(b) / 2
af = pbc_diff(np.zeros((1, 3)), af - c, b)
return af + c
def pbc_backfold_compact(act_frame,box_matrix):
def pbc_backfold_compact(act_frame, box_matrix):
"""
mimics "trjconv ... -pbc atom -ur compact"
@ -124,18 +149,20 @@ def pbc_backfold_compact(act_frame,box_matrix):
"""
c = act_frame
box = box_matrix
ctr = box.sum(0)/2
c=np.asarray(c)
ctr = box.sum(0) / 2
c = np.asarray(c)
shape = c.shape
if shape == (3,):
c = c.reshape((1,3))
shape = (1,3) #quick 'n dirty
comb = np.array([np.asarray(i) for i in product([0,-1,1],[0,-1,1],[0,-1,1])])
b_matrices = comb[:,:,np.newaxis]*box[np.newaxis,:,:]
b_vectors = b_matrices.sum(axis=1)[np.newaxis,:,:]
sc = c[:,np.newaxis,:]+b_vectors
w = np.argsort((((sc)-ctr)**2).sum(2),1)[:,0]
return sc[range(shape[0]),w]
c = c.reshape((1, 3))
shape = (1, 3) # quick 'n dirty
comb = np.array(
[np.asarray(i) for i in product([0, -1, 1], [0, -1, 1], [0, -1, 1])]
)
b_matrices = comb[:, :, np.newaxis] * box[np.newaxis, :, :]
b_vectors = b_matrices.sum(axis=1)[np.newaxis, :, :]
sc = c[:, np.newaxis, :] + b_vectors
w = np.argsort((((sc) - ctr) ** 2).sum(2), 1)[:, 0]
return sc[range(shape[0]), w]
def whole(frame):
@ -147,8 +174,8 @@ def whole(frame):
# make sure residue_ids are sorted, then determine indices at which the res id changes
# kind='stable' ensures that any existing ordering is preserved
logger.debug('Using first atom as reference for whole.')
sort_ind = residue_ids.argsort(kind='stable')
logger.debug("Using first atom as reference for whole.")
sort_ind = residue_ids.argsort(kind="stable")
i = np.concatenate([[0], np.where(np.diff(residue_ids[sort_ind]) > 0)[0] + 1])
coms = frame[sort_ind[i]][residue_ids - 1]
@ -172,7 +199,7 @@ def nojump(frame, usecache=True):
selection = frame.selection
reader = frame.coordinates.frames
if usecache:
if not hasattr(reader, '_nojump_cache'):
if not hasattr(reader, "_nojump_cache"):
reader._nojump_cache = OrderedDict()
# make sure to use absolute (non negative) index
abstep = frame.step % len(frame.coordinates)
@ -185,23 +212,38 @@ def nojump(frame, usecache=True):
i0 = 0
delta = 0
delta = delta + np.array(np.vstack(
[m[i0:abstep + 1].sum(axis=0) for m in reader.nojump_matrixes]
).T) * frame.box.diagonal()
delta = (
delta
+ np.array(
np.vstack(
[m[i0 : abstep + 1].sum(axis=0) for m in reader.nojump_matrixes]
).T
)
* frame.box.diagonal()
)
reader._nojump_cache[abstep] = delta
while len(reader._nojump_cache) > NOJUMP_CACHESIZE:
reader._nojump_cache.popitem(last=False)
delta = delta[selection, :]
else:
delta = np.array(np.vstack(
[m[:frame.step + 1, selection].sum(axis=0) for m in reader.nojump_matrixes]
).T) * frame.box.diagonal()
delta = (
np.array(
np.vstack(
[
m[: frame.step + 1, selection].sum(axis=0)
for m in reader.nojump_matrixes
]
).T
)
* frame.box.diagonal()
)
return frame - delta
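The nojump machinery in one dimension, as a self-contained sketch: round((curr - prev)/L) counts box crossings per frame, and the cumulative count times L is subtracted from the folded coordinate, exactly as the sparse-matrix sums above do per axis:

import numpy as np

L = 2.0                                       # box length
folded = np.array([1.8, 0.1, 0.3, 1.9, 0.2])  # wrapped 1D trajectory
jumps = np.rint(np.diff(folded) / L)          # -1: crossed the upper boundary
delta = np.concatenate([[0.0], np.cumsum(jumps)]) * L
print(folded - delta)                         # [1.8 2.1 2.3 1.9 2.2], continuous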
def pbc_points(coordinates, box, thickness=0, index=False, shear=False,
extra_image=False):
def pbc_points(
coordinates, box, thickness=0, index=False, shear=False, extra_image=False
):
"""
Returns the points and their first periodic images. Does not fold
them back into the box.
@ -213,21 +255,28 @@ def pbc_points(coordinates, box, thickness=0, index=False, shear=False,
if shear:
box[2, 0] = box[2, 0] % box[0, 0]
if shear or extra_image:
grid = np.array([[i, j, k] for k in [-2, -1, 0, 1, 2]
for j in [2, 1, 0, -1, -2] for i in [-2, -1, 0, 1, 2]])
grid = np.array(
[
[i, j, k]
for k in [-2, -1, 0, 1, 2]
for j in [2, 1, 0, -1, -2]
for i in [-2, -1, 0, 1, 2]
]
)
indices = np.tile(np.arange(len(coordinates)), 125)
else:
grid = np.array([[i, j, k] for k in [-1, 0, 1] for j in [1, 0, -1]
for i in [-1, 0, 1]])
grid = np.array(
[[i, j, k] for k in [-1, 0, 1] for j in [1, 0, -1] for i in [-1, 0, 1]]
)
indices = np.tile(np.arange(len(coordinates)), 27)
coordinates_pbc = np.concatenate([coordinates+v@box for v in grid], axis=0)
coordinates_pbc = np.concatenate([coordinates + v @ box for v in grid], axis=0)
size = np.diag(box)
if thickness != 0:
mask = np.all(coordinates_pbc > -thickness, axis=1)
coordinates_pbc = coordinates_pbc[mask]
indices = indices[mask]
mask = np.all(coordinates_pbc < size+thickness, axis=1)
mask = np.all(coordinates_pbc < size + thickness, axis=1)
coordinates_pbc = coordinates_pbc[mask]
indices = indices[mask]
if index:

View File

@ -31,11 +31,14 @@ import re
class NojumpError(Exception):
pass
class WrongTopologyError(Exception):
pass
def open_with_mdanalysis(topology, trajectory, cached=False, index_file=None,
charges=None, masses=None):
def open_with_mdanalysis(
topology, trajectory, cached=False, index_file=None, charges=None, masses=None
):
"""Open a the topology and trajectory with mdanalysis."""
uni = mdanalysis.Universe(topology, trajectory, convert_units=False)
if cached is not False:
@ -47,10 +50,10 @@ def open_with_mdanalysis(topology, trajectory, cached=False, index_file=None,
else:
reader = BaseReader(uni.trajectory)
reader.universe = uni
if topology.endswith('.tpr'):
if topology.endswith(".tpr"):
charges = uni.atoms.charges
masses = uni.atoms.masses
elif topology.endswith('.gro'):
elif topology.endswith(".gro"):
charges = charges
masses = masses
else:
@ -60,13 +63,17 @@ def open_with_mdanalysis(topology, trajectory, cached=False, index_file=None,
indices = load_indices(index_file)
atms = atoms.Atoms(
np.stack((uni.atoms.resids, uni.atoms.resnames, uni.atoms.names), axis=1),
charges=charges, masses=masses, indices=indices
).subset()
np.stack((uni.atoms.resids, uni.atoms.resnames, uni.atoms.names), axis=1),
charges=charges,
masses=masses,
indices=indices,
).subset()
return atms, reader
group_re = re.compile('\[ ([-+\w]+) \]')
group_re = re.compile("\[ ([-+\w]+) \]")
def load_indices(index_file):
indices = {}
@ -79,13 +86,14 @@ def load_indices(index_file):
index_array = indices.get(group_name, [])
indices[group_name] = index_array
else:
elements = line.strip().split('\t')
elements = [x.split(' ') for x in elements]
elements = line.strip().split("\t")
elements = [x.split(" ") for x in elements]
elements = itertools.chain(*elements) # make a flat iterator
elements = [x for x in elements if x != '']
elements = [x for x in elements if x != ""]
index_array += [int(x) - 1 for x in elements]
return indices
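A usage sketch for load_indices with a small made-up index file (GROMACS .ndx groups are 1-based, hence the int(x) - 1 above; file name and group are illustrative, import path assumed):

from mdevaluate.reader import load_indices  # import path assumed

with open("demo.ndx", "w") as f:
    f.write("[ water ]\n1 2 3\n4 5\n")
print(load_indices("demo.ndx"))  # {'water': [0, 1, 2, 3, 4]}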
def is_writeable(fname):
"""Test if a directory is actually writeable, by writing a temporary file."""
fdir = os.path.dirname(fname)
@ -95,7 +103,7 @@ def is_writeable(fname):
if os.access(fdir, os.W_OK):
try:
with builtins.open(ftmp, 'w'):
with builtins.open(ftmp, "w"):
pass
os.remove(ftmp)
return True
@ -106,68 +114,70 @@ def is_writeable(fname):
def nojump_load_filename(reader):
directory, fname = path.split(reader.filename)
full_path = path.join(directory, '.{}.nojump.npz'.format(fname))
full_path = path.join(directory, ".{}.nojump.npz".format(fname))
if not is_writeable(directory):
user_data_dir = os.path.join("/data/",
os.environ['HOME'].split("/")[-1])
user_data_dir = os.path.join("/data/", os.environ["HOME"].split("/")[-1])
full_path_fallback = os.path.join(
os.path.join(user_data_dir, '.mdevaluate/nojump'),
directory.lstrip('/'),
'.{}.nojump.npz'.format(fname)
os.path.join(user_data_dir, ".mdevaluate/nojump"),
directory.lstrip("/"),
".{}.nojump.npz".format(fname),
)
if os.path.exists(full_path_fallback):
return full_path_fallback
if os.path.exists(fname) or is_writeable(directory):
return full_path
else:
user_data_dir = os.path.join("/data/",
os.environ['HOME'].split("/")[-1])
user_data_dir = os.path.join("/data/", os.environ["HOME"].split("/")[-1])
full_path_fallback = os.path.join(
os.path.join(user_data_dir, '.mdevaluate/nojump'),
directory.lstrip('/'),
'.{}.nojump.npz'.format(fname)
os.path.join(user_data_dir, ".mdevaluate/nojump"),
directory.lstrip("/"),
".{}.nojump.npz".format(fname),
)
return full_path
def nojump_save_filename(reader):
directory, fname = path.split(reader.filename)
full_path = path.join(directory, '.{}.nojump.npz'.format(fname))
full_path = path.join(directory, ".{}.nojump.npz".format(fname))
if is_writeable(directory):
return full_path
else:
user_data_dir = os.path.join("/data/",
os.environ['HOME'].split("/")[-1])
user_data_dir = os.path.join("/data/", os.environ["HOME"].split("/")[-1])
full_path_fallback = os.path.join(
os.path.join(user_data_dir, '.mdevaluate/nojump'),
directory.lstrip('/'),
'.{}.nojump.npz'.format(fname)
os.path.join(user_data_dir, ".mdevaluate/nojump"),
directory.lstrip("/"),
".{}.nojump.npz".format(fname),
)
logger.info(
"Saving nojump to {}, since original location is not writeable.".format(
full_path_fallback
)
)
logger.info('Saving nojump to {}, since original location is not writeable.'.format(full_path_fallback))
os.makedirs(os.path.dirname(full_path_fallback), exist_ok=True)
return full_path_fallback
CSR_ATTRS = ('data', 'indices', 'indptr')
CSR_ATTRS = ("data", "indices", "indptr")
NOJUMP_MAGIC = 2016
def parse_jumps(trajectory):
prev = trajectory[0].whole
box = prev.box.diagonal()
SparseData = namedtuple('SparseData', ['data', 'row', 'col'])
SparseData = namedtuple("SparseData", ["data", "row", "col"])
jump_data = (
SparseData(data=array('b'), row=array('l'), col=array('l')),
SparseData(data=array('b'), row=array('l'), col=array('l')),
SparseData(data=array('b'), row=array('l'), col=array('l'))
SparseData(data=array("b"), row=array("l"), col=array("l")),
SparseData(data=array("b"), row=array("l"), col=array("l")),
SparseData(data=array("b"), row=array("l"), col=array("l")),
)
for i, curr in enumerate(trajectory):
if i % 500 == 0:
logger.debug('Parse jumps Step: %d', i)
logger.debug("Parse jumps Step: %d", i)
delta = ((curr - prev) / box).round().astype(np.int8)
prev = curr
for d in range(3):
col, = np.where(delta[:, d] != 0)
(col,) = np.where(delta[:, d] != 0)
jump_data[d].col.extend(col)
jump_data[d].row.extend([i] * len(col))
jump_data[d].data.extend(delta[col, d])
@ -179,14 +189,15 @@ def generate_nojump_matrixes(trajectory):
"""
Create the matrixes with pbc jumps for a trajectory.
"""
logger.info('generate Nojump Matrixes for: {}'.format(trajectory))
logger.info("generate Nojump Matrixes for: {}".format(trajectory))
jump_data = parse_jumps(trajectory)
N = len(trajectory)
M = len(trajectory[0])
trajectory.frames.nojump_matrixes = tuple(
sparse.csr_matrix((np.array(m.data), (m.row, m.col)), shape=(N, M)) for m in jump_data
sparse.csr_matrix((np.array(m.data), (m.row, m.col)), shape=(N, M))
for m in jump_data
)
save_nojump_matrixes(trajectory.frames)
@ -194,11 +205,11 @@ def generate_nojump_matrixes(trajectory):
def save_nojump_matrixes(reader, matrixes=None):
if matrixes is None:
matrixes = reader.nojump_matrixes
data = {'checksum': checksum(NOJUMP_MAGIC, checksum(reader))}
data = {"checksum": checksum(NOJUMP_MAGIC, checksum(reader))}
for d, mat in enumerate(matrixes):
data['shape'] = mat.shape
data["shape"] = mat.shape
for attr in CSR_ATTRS:
data['{}_{}'.format(attr, d)] = getattr(mat, attr)
data["{}_{}".format(attr, d)] = getattr(mat, attr)
np.savez(nojump_save_filename(reader), **data)
@ -209,23 +220,25 @@ def load_nojump_matrixes(reader):
data = np.load(zipname, allow_pickle=True)
except (AttributeError, BadZipFile, OSError):
# npz-files can be corrupted, probably a bug for big arrays saved with savez_compressed?
logger.info('Removing zip-File: %s', zipname)
logger.info("Removing zip-File: %s", zipname)
os.remove(nojump_load_filename(reader))
return
try:
if data['checksum'] == checksum(NOJUMP_MAGIC, checksum(reader)):
if data["checksum"] == checksum(NOJUMP_MAGIC, checksum(reader)):
reader.nojump_matrixes = tuple(
sparse.csr_matrix(
tuple(data['{}_{}'.format(attr, d)] for attr in CSR_ATTRS),
shape=data['shape']
tuple(data["{}_{}".format(attr, d)] for attr in CSR_ATTRS),
shape=data["shape"],
)
for d in range(3)
)
logger.info('Loaded Nojump Matrixes: {}'.format(nojump_load_filename(reader)))
logger.info(
"Loaded Nojump Matrixes: {}".format(nojump_load_filename(reader))
)
else:
logger.info('Invalid Nojump Data: {}'.format(nojump_load_filename(reader)))
logger.info("Invalid Nojump Data: {}".format(nojump_load_filename(reader)))
except KeyError:
logger.info('Removing zip-File: %s', zipname)
logger.info("Removing zip-File: %s", zipname)
os.remove(nojump_load_filename(reader))
return
@ -242,18 +255,28 @@ def correct_nojump_matrixes_for_whole(trajectory):
def energy_reader(file, energies=None):
"""Reads an gromacs energy file and output the data in a pandas DataFrame.
Args:
file: Filename of the energy file
energies (opt.): Specify energies to extract from the energy file
Args:
file: Filename of the energy file
energies (opt.): Specify energies to extract from the energy file
"""
if energies is None:
energies = np.arange(1, 100).astype('str')
energies = np.arange(1, 100).astype("str")
directory = file.rsplit("/", 1)[0]
ps = subprocess.Popen(("echo", *energies), stdout=subprocess.PIPE)
try:
subprocess.run(("gmx", "energy", "-f", file, "-o",
f"{directory}/tmp.xvg", "-quiet", "-nobackup"),
stdin=ps.stdout)
subprocess.run(
(
"gmx",
"energy",
"-f",
file,
"-o",
f"{directory}/tmp.xvg",
"-quiet",
"-nobackup",
),
stdin=ps.stdout,
)
except FileNotFoundError:
print("No GROMACS found!")
ps.wait()
@ -271,9 +294,9 @@ def energy_reader(file, energies=None):
data = np.loadtxt(f"{directory}/tmp.xvg", skiprows=header)
df = pd.DataFrame({"Time":data[:,0]})
df = pd.DataFrame({"Time": data[:, 0]})
for i, label in enumerate(labels):
tmp_df = pd.DataFrame({label:data[:,i+1]})
tmp_df = pd.DataFrame({label: data[:, i + 1]})
df = pd.concat([df, tmp_df], axis=1)
subprocess.run(("rm", f"{directory}/tmp.xvg"))
return df
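A usage sketch for energy_reader (assumes GROMACS is installed so gmx is on the PATH; file name and energy terms are illustrative, import path assumed). Note that the rsplit above requires a path with a directory component, hence the "./" prefix:

from mdevaluate.reader import energy_reader  # import path assumed

df = energy_reader("./ener.edr", energies=["Potential", "Temperature"])
print(df.columns)  # 'Time' plus one column per extracted energy term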
@ -289,7 +312,7 @@ class BaseReader:
@property
def nojump_matrixes(self):
if self._nojump_matrixes is None:
raise NojumpError('Nojump Data not available: {}'.format(self.filename))
raise NojumpError("Nojump Data not available: {}".format(self.filename))
return self._nojump_matrixes
@nojump_matrixes.setter
@ -314,12 +337,12 @@ class BaseReader:
return len(self.rd)
def __checksum__(self):
if hasattr(self.rd, 'cache'):
if hasattr(self.rd, "cache"):
# Has an pygmx reader
return checksum(self.filename, str(self.rd.cache))
elif hasattr(self.rd, '_xdr'):
elif hasattr(self.rd, "_xdr"):
# Has an mdanalysis reader
cache = array('L', self.rd._xdr.offsets.tobytes())
cache = array("L", self.rd._xdr.offsets.tobytes())
return checksum(self.filename, str(cache))
@ -353,7 +376,6 @@ class CachedReader(BaseReader):
class DelayedReader(BaseReader):
@property
def filename(self):
if self.rd is not None:
@ -376,4 +398,3 @@ class DelayedReader(BaseReader):
def __getitem__(self, frame):
return self._get_item(frame)

View File

@ -33,14 +33,18 @@ def five_point_stencil(xdata, ydata):
See: https://en.wikipedia.org/wiki/Five-point_stencil
"""
return xdata[2:-2], (
(-ydata[4:] + 8 * ydata[3:-1] - 8 * ydata[1:-3] + ydata[:-4]) /
(3 * (xdata[4:] - xdata[:-4]))
(-ydata[4:] + 8 * ydata[3:-1] - 8 * ydata[1:-3] + ydata[:-4])
/ (3 * (xdata[4:] - xdata[:-4]))
)
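A quick sanity check of the five-point stencil against a known derivative (self-contained; the stencil body is copied from above):

import numpy as np

def five_point_stencil(xdata, ydata):
    return xdata[2:-2], (
        (-ydata[4:] + 8 * ydata[3:-1] - 8 * ydata[1:-3] + ydata[:-4])
        / (3 * (xdata[4:] - xdata[:-4]))
    )

x = np.linspace(0, np.pi, 101)
xs, dy = five_point_stencil(x, np.sin(x))
print(np.allclose(dy, np.cos(xs), atol=1e-6))  # True, error is O(h**4)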
def filon_fourier_transformation(time, correlation,
frequencies=None, derivative='linear', imag=True,
):
def filon_fourier_transformation(
time,
correlation,
frequencies=None,
derivative="linear",
imag=True,
):
"""
Fourier transformation for slowly varying functions. The Filon algorithm is
described in detail in ref. [Blochowicz]_, ch. 3.2.3.
@ -68,17 +72,15 @@ def filon_fourier_transformation(time, correlation,
"""
if frequencies is None:
f_min = 1 / max(time)
f_max = 0.05**(1.2 - max(correlation)) / min(time[time > 0])
frequencies = 2 * np.pi * np.logspace(
np.log10(f_min), np.log10(f_max), num=60
)
f_max = 0.05 ** (1.2 - max(correlation)) / min(time[time > 0])
frequencies = 2 * np.pi * np.logspace(np.log10(f_min), np.log10(f_max), num=60)
frequencies.reshape(1, -1)
if derivative == 'linear':
if derivative == "linear":
derivative = (np.diff(correlation) / np.diff(time)).reshape(-1, 1)
elif derivative == 'stencil':
elif derivative == "stencil":
_, derivative = five_point_stencil(time, correlation)
time = ((time[2:-1] * time[1:-2])**.5).reshape(-1, 1)
time = ((time[2:-1] * time[1:-2]) ** 0.5).reshape(-1, 1)
derivative = derivative.reshape(-1, 1)
elif np.iterable(derivative) and len(time) == len(derivative):
derivative.reshape(-1, 1)
@ -88,14 +90,29 @@ def filon_fourier_transformation(time, correlation,
)
time = time.reshape(-1, 1)
integral = (np.cos(frequencies * time[1:]) - np.cos(frequencies * time[:-1])) / frequencies**2
integral = (
np.cos(frequencies * time[1:]) - np.cos(frequencies * time[:-1])
) / frequencies**2
fourier = (derivative * integral).sum(axis=0)
if imag:
integral = 1j * (np.sin(frequencies * time[1:]) - np.sin(frequencies * time[:-1])) / frequencies**2
fourier = fourier + (derivative * integral).sum(axis=0) + 1j * correlation[0] / frequencies
integral = (
1j
* (np.sin(frequencies * time[1:]) - np.sin(frequencies * time[:-1]))
/ frequencies**2
)
fourier = (
fourier
+ (derivative * integral).sum(axis=0)
+ 1j * correlation[0] / frequencies
)
return frequencies.reshape(-1,), fourier
return (
frequencies.reshape(
-1,
),
fourier,
)
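A usage sketch for the Filon transform (import path assumed): for an exponential decay with tau = 1, the magnitude of the imaginary part of the spectrum should peak near omega = 1/tau, as for a Debye process:

import numpy as np
from mdevaluate.utils import filon_fourier_transformation  # path assumed

t = np.logspace(-3, 3, 301)
C = np.exp(-t)  # correlation function with tau = 1
omega, S = filon_fourier_transformation(t, C)
print(omega[np.argmax(np.abs(S.imag))])  # roughly 1 = 1/tau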
def mask2indices(mask):
@ -127,22 +144,24 @@ def superpose(x1, y1, x2, y2, N=100, damping=1.0):
x_ol = np.logspace(
np.log10(max(x1[~reg1][0], x2[~reg2][0]) + 0.001),
np.log10(min(x1[~reg1][-1], x2[~reg2][-1]) - 0.001),
(sum(~reg1) + sum(~reg2)) / 2
(sum(~reg1) + sum(~reg2)) // 2,  # np.logspace needs an integer num
)
def w(x):
A = x_ol.min()
B = x_ol.max()
return (np.log10(B / x) / np.log10(B / A))**damping
return (np.log10(B / x) / np.log10(B / A)) ** damping
xdata = np.concatenate((x1[reg1], x_ol, x2[reg2]))
y1_interp = interp1d(x1[~reg1], y1[~reg1])
y2_interp = interp1d(x2[~reg2], y2[~reg2])
ydata = np.concatenate((
y1[x1 < x2.min()],
w(x_ol) * y1_interp(x_ol) + (1 - w(x_ol)) * y2_interp(x_ol),
y2[x2 > x1.max()]
))
ydata = np.concatenate(
(
y1[x1 < x2.min()],
w(x_ol) * y1_interp(x_ol) + (1 - w(x_ol)) * y2_interp(x_ol),
y2[x2 > x1.max()],
)
)
return xdata, ydata
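A usage sketch for superpose, blending a short-time and a long-time measurement of the same decay over their overlap region (values illustrative, import path assumed; the function body outside the hunk above is assumed to define the overlap masks):

import numpy as np
from mdevaluate.utils import superpose  # import path assumed

x1 = np.logspace(0, 2, 60)  # short-time curve
x2 = np.logspace(1, 3, 60)  # long-time curve, one decade of overlap
x, y = superpose(x1, np.exp(-x1 / 500), x2, np.exp(-x2 / 500))
# y follows curve 1 below the overlap, curve 2 above, smoothly weighted between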
@ -157,9 +176,10 @@ def runningmean(data, nav):
Returns:
Array of shape (N-(nav-1), )
"""
return np.convolve(data, np.ones((nav,)) / nav, mode='valid')
return np.convolve(data, np.ones((nav,)) / nav, mode="valid")
def moving_average(A,n=3):
def moving_average(A, n=3):
"""
Compute the running mean of an array.
Uses the second axis if it is of higher dimensionality.
@ -174,15 +194,15 @@ def moving_average(A,n=3):
Supports 2D-Arrays.
Slower than runningmean for small n but faster for large n.
"""
k1 = int(n/2)
k2 = int((n-1)/2)
k1 = int(n / 2)
k2 = int((n - 1) / 2)
if k2 == 0:
if A.ndim > 1:
return uniform_filter1d(A,n)[:,k1:]
return uniform_filter1d(A,n)[k1:]
return uniform_filter1d(A, n)[:, k1:]
return uniform_filter1d(A, n)[k1:]
if A.ndim > 1:
return uniform_filter1d(A,n)[:,k1:-k2]
return uniform_filter1d(A,n)[k1:-k2]
return uniform_filter1d(A, n)[:, k1:-k2]
return uniform_filter1d(A, n)[k1:-k2]
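runningmean and moving_average agree for odd window sizes away from the edges; a self-contained check of that equivalence:

import numpy as np
from scipy.ndimage import uniform_filter1d

A = np.arange(10, dtype=float)
n = 3
print(np.convolve(A, np.ones(n) / n, mode="valid"))      # runningmean
print(uniform_filter1d(A, n)[n // 2 : -((n - 1) // 2)])  # moving_average, same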
def coherent_sum(func, coord_a, coord_b):
@ -235,7 +255,9 @@ def coherent_histogram(func, coord_a, coord_b, bins, distinct=False):
if isinstance(func, FunctionType):
func = numba.jit(func, nopython=True, cache=True)
assert np.isclose(np.diff(bins).mean(), np.diff(bins)).all(), 'A regular distribution of bins is required.'
assert np.isclose(
np.diff(bins).mean(), np.diff(bins)
).all(), "A regular distribution of bins is required."
hmin = bins[0]
hmax = bins[-1]
N = len(bins) - 1
@ -297,11 +319,11 @@ def Fqt_from_Grt(data, q):
if isinstance(data, pd.DataFrame):
df = data.copy()
else:
df = pd.DataFrame(data, columns=['r', 'time', 'G'])
df['isf'] = df['G'] * np.sinc(q / np.pi * df['r'])
isf = df.groupby('time')['isf'].sum()
df = pd.DataFrame(data, columns=["r", "time", "G"])
df["isf"] = df["G"] * np.sinc(q / np.pi * df["r"])
isf = df.groupby("time")["isf"].sum()
if isinstance(data, pd.DataFrame):
return pd.DataFrame({'time': isf.index, 'isf': isf.values, 'q': q})
return pd.DataFrame({"time": isf.index, "isf": isf.values, "q": q})
else:
return isf.index, isf.values
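The np.sinc factor above is the isotropic average of exp(iq.r): np.sinc(x) is sin(pi*x)/(pi*x), so np.sinc(q / np.pi * r) equals sin(q*r)/(q*r). A one-line check:

import numpy as np

q, r = 2.0, 1.5
print(np.sinc(q / np.pi * r), np.sin(q * r) / (q * r))  # both ~0.04704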
@ -312,6 +334,7 @@ def singledispatchmethod(func):
def wrapper(*args, **kw):
return dispatcher.dispatch(args[1].__class__)(*args, **kw)
wrapper.register = dispatcher.register
functools.update_wrapper(wrapper, func)
return wrapper
@ -323,7 +346,7 @@ def histogram(data, bins):
dx = dbins.mean()
if bins.min() == 0 and dbins.std() < 1e-6:
logger.debug("Using numpy.bincount for histogramm compuation.")
hist = np.bincount((data // dx).astype(int), minlength=len(dbins))[:len(dbins)]
hist = np.bincount((data // dx).astype(int), minlength=len(dbins))[: len(dbins)]
else:
hist = np.histogram(data, bins=bins)[0]
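A self-contained check that the bincount shortcut matches np.histogram for uniform bins starting at zero (edge effects at bin borders aside):

import numpy as np

data = np.random.default_rng(0).uniform(0, 1, 1000)
bins = np.linspace(0, 1, 11)
dx = np.diff(bins).mean()
hist = np.bincount((data // dx).astype(int), minlength=10)[:10]
print(np.array_equal(hist, np.histogram(data, bins=bins)[0]))  # True here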
@ -341,20 +364,22 @@ def quick1etau(t, C, n=7):
n is the minimum number of points around 1/e required
"""
# first rough estimate, the closest time. This is returned if the interpolation fails!
tau_est = t[np.argmin(np.fabs(C-np.exp(-1)))]
tau_est = t[np.argmin(np.fabs(C - np.exp(-1)))]
# reduce the data to points around 1/e
k = 0.1
mask = (C < np.exp(-1)+k) & (C > np.exp(-1)-k)
mask = (C < np.exp(-1) + k) & (C > np.exp(-1) - k)
while np.sum(mask) < n:
k += 0.01
mask = (C < np.exp(-1)+k) & (C > np.exp(-1)-k)
mask = (C < np.exp(-1) + k) & (C > np.exp(-1) - k)
if k + np.exp(-1) > 1.0:
break
# if enough points are found, try a curve fit; otherwise, or if the fit fails, keep the estimate
if np.sum(mask) >= n:
try:
with np.errstate(invalid='ignore'):
fit, _ = curve_fit(kww, t[mask], C[mask], p0=[0.9, tau_est, 0.9], maxfev=100000)
with np.errstate(invalid="ignore"):
fit, _ = curve_fit(
kww, t[mask], C[mask], p0=[0.9, tau_est, 0.9], maxfev=100000
)
tau_est = kww_1e(*fit)
except Exception:  # keep the rough estimate if the fit fails
pass
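The rough estimate that quick1etau falls back on, as a self-contained sketch (the KWW refinement above only sharpens this value):

import numpy as np

t = np.logspace(-2, 2, 200)
C = np.exp(-((t / 5.0) ** 0.7))  # stretched exponential with tau = 5
print(t[np.argmin(np.fabs(C - np.exp(-1)))])  # ~4.9, close to tau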

View File

@ -2,26 +2,23 @@ from setuptools import setup
def get_version(module):
version = ''
version = ""
with open(module) as f:
for line in f:
if '__version__' in line:
version = line.split('=')[-1].strip("' \n\t")
if "__version__" in line:
version = line.split("=")[-1].strip("' \n\t")
break
return version
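A usage sketch for get_version (with the double quote added to the strip set above, it handles the black-formatted __version__ line; the file name is illustrative):

with open("demo_module.py", "w") as f:
    f.write('__version__ = "23.6"\n')
print(get_version("demo_module.py"))  # 23.6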
setup(
name='mdevaluate',
description='Collection of python utilities for md simulations',
author_email='niels.mueller@physik.tu-darmstadt.de',
packages=['mdevaluate',],
entry_points={
'console_scripts': [
'index-xtc = mdevaluate.cli:run'
]
},
version='23.4',
requires=['numpy', 'scipy'],
name="mdevaluate",
description="Collection of python utilities for md simulations",
author_email="niels.mueller@physik.tu-darmstadt.de",
packages=[
"mdevaluate",
],
entry_points={"console_scripts": ["index-xtc = mdevaluate.cli:run"]},
version="23.6",
requires=["numpy", "scipy"],
)