Changed gr to work with time_average and use next_neighbors

This commit is contained in:
Sebastian Kloth 2023-12-27 12:43:05 +01:00
parent cc3768925a
commit bffcd56cdc

View File

@ -1,4 +1,4 @@
from typing import Callable, Optional from typing import Callable, Optional, Union
import numpy as np import numpy as np
from numpy.typing import ArrayLike from numpy.typing import ArrayLike
@ -16,7 +16,6 @@ from .coordinates import (
from .autosave import autosave_data from .autosave import autosave_data
from .utils import runningmean from .utils import runningmean
from .pbc import pbc_diff, pbc_points from .pbc import pbc_diff, pbc_points
from .logging import logger
@autosave_data(nargs=2, kwargs_keys=("coordinates_b",)) @autosave_data(nargs=2, kwargs_keys=("coordinates_b",))
@ -33,7 +32,7 @@ def time_average(
Args: Args:
function: function:
The function that will be averaged, it has to accept exactly one argument The function that will be averaged, it has to accept exactly one argument
which is the current atom set which is the current atom set (or two if coordinates_b is provided)
coordinates: The coordinates object of the simulation coordinates: The coordinates object of the simulation
coordinates_b: Additional coordinates object of the simulation coordinates_b: Additional coordinates object of the simulation
skip: skip:
@ -54,44 +53,13 @@ def time_average(
return np.mean(result, axis=0) return np.mean(result, axis=0)
def time_histogram(function, coordinates, bins, hist_range, pool=None): def gr(
coordinate_iter = iter(coordinates) atoms_a: CoordinateFrame,
atoms_b: Optional[CoordinateFrame] = None,
if pool is not None: bins: Union[int, ArrayLike] = 100,
_map = pool.imap
else:
_map = map
evaluated = _map(function, coordinate_iter)
results = []
hist_results = []
for num, ev in enumerate(evaluated):
results.append(ev)
if num % 100 == 0 and num > 0:
print(num)
r = np.array(results).T
for i, row in enumerate(r):
histo, _ = np.histogram(row, bins=bins, range=hist_range)
if len(hist_results) <= i:
hist_results.append(histo)
else:
hist_results[i] += histo
results = []
return hist_results
def calc_gr(
traj_a: Coordinates,
traj_b: Coordinates = None,
r_max: float = None,
delta_r: float = 0.02,
segments: int = 1000,
skip: float = 0.1,
remove_intra: bool = False, remove_intra: bool = False,
shear: bool = False, **kwargs
): ) -> np.ndarray:
r""" r"""
Compute the radial pair distribution of one or two sets of atoms. Compute the radial pair distribution of one or two sets of atoms.
@ -99,98 +67,67 @@ def calc_gr(
g_{AB}(r) = \frac{1}{\langle \rho_B\rangle N_A}\sum\limits_{i\in A}^{N_A} g_{AB}(r) = \frac{1}{\langle \rho_B\rangle N_A}\sum\limits_{i\in A}^{N_A}
\sum\limits_{j\in B}^{N_B}\frac{\delta(r_{ij} -r)}{4\pi r^2} \sum\limits_{j\in B}^{N_B}\frac{\delta(r_{ij} -r)}{4\pi r^2}
For use with :func:`time_average`, define bins through the use of :func:`~functools.partial`, For use with :func:`time_average`, define bins through the use of
the atom sets are passed to :func:`time_average`, if a second set of atoms should be used :func:`~functools.partial`, the atom sets are passed to :func:`time_average`, if a
specify it as ``coordinates_b`` and it will be passed to this function. second set of atoms should be used specify it as ``coordinates_b`` and it will be
passed to this function.
Args: Args:
atoms_a: First set of atoms, used internally atoms_a: First set of atoms, used internally
atoms_b (opt.): Second set of atoms, used internally atoms_b (opt.): Second set of atoms, used internal
bins: Bins of the radial distribution function bins: Bins of the radial distribution function
box (opt.): Simulations box, if not specified this is taken from ``atoms_a.box`` remove_intra: removes contributions from intra molecular pairs
kind (opt.): Can be 'inter', 'intra' or None (default).
chunksize (opt.):
For large systems (N > 1000) the distaces have to be computed in chunks so the arrays
fit into memory, this parameter controlls the size of these chunks. It should be
as large as possible, depending on the available memory.
returnx (opt.): If True the x ordinate of the histogram is returned.
""" """
distinct = True
if atoms_b is None:
atoms_b = atoms_a
distinct = False
elif np.array_equal(atoms_a, atoms_b):
distinct = False
def gr_frame( box = atoms_b.box
atoms_a: CoordinateFrame, if isinstance(bins, int):
atoms_b: CoordinateFrame, upper_bound = np.min(np.diag(box))
bins: ArrayLike,
remove_intra: bool = False,
):
box = atoms_b.box
n = len(atoms_a) / np.prod(np.diag(box))
V = 4 / 3 * np.pi * bins[-1] ** 3
particles_in_volume = int(n * V * 1.1)
if np.all(np.diag(np.diag(box)) == box):
atoms_b = atoms_b % np.diag(box)
atoms_b_res_ids = atoms_b.residue_ids
atoms_b_tree = KDTree(atoms_b, boxsize=np.diag(box))
else:
atoms_b_pbc, atoms_b_pbc_index = pbc_points(
atoms_b, box, thickness=bins[-1] + 0.1, index=True, shear=shear
)
atoms_b_res_ids = atoms_b.residue_ids[atoms_b_pbc_index]
atoms_b_tree = KDTree(atoms_b_pbc)
distances, distances_index = atoms_b_tree.query(
atoms_a, particles_in_volume, distance_upper_bound=bins[-1] + 0.1
)
if np.array_equal(atoms_a, atoms_b):
distances = distances[:, 1:]
distances_index = distances_index[:, 1:]
if remove_intra:
new_distances = []
for entry in list(zip(atoms_a.residue_ids, distances, distances_index)):
mask = entry[1] < np.inf
new_distances.append(
entry[1][mask][atoms_b_res_ids[entry[2][mask]] != entry[0]]
)
distances = np.concatenate(new_distances)
else:
distances = distances.flatten()
hist = np.histogram(distances, bins=bins, range=(0, bins[-1]), density=False)[0]
gr = hist / len(atoms_a)
gr = gr / (4 / 3 * np.pi * bins[1:] ** 3 - 4 / 3 * np.pi * bins[:-1] ** 3)
n = len(atoms_b) / np.prod(np.diag(atoms_b.box))
gr = gr / n
return gr, n
if traj_b is None:
traj_b = traj_a
start_frame = traj_a[int(len(traj_a) * skip)]
if r_max:
upper_bound = r_max
else: else:
upper_bound = round(np.min(np.diag(start_frame.box)) / 2 - 0.05, 1) upper_bound = bins[-1]
num_steps = int(upper_bound * (1 / delta_r) + 1) n = len(atoms_a) / np.prod(np.diag(box))
bins = np.linspace(0, upper_bound, num_steps) V = 4 / 3 * np.pi * bins[-1] ** 3
r = bins[1:] - (bins[1] - bins[0]) / 2 particles_in_volume = int(n * V * 1.1)
frame_indices = np.unique( distances, indices = next_neighbors(
np.int_(np.linspace(len(traj_a) * skip, len(traj_a) - 1, num=segments)) atoms_a,
atoms_b,
number_of_neighbors=particles_in_volume,
distance_upper_bound=upper_bound,
distinct=distinct,
**kwargs
) )
gr = []
n = [] if remove_intra:
for frame_index in frame_indices: new_distances = []
result = gr_frame( for entry in list(zip(atoms_a.residue_ids, distances, indices)):
traj_a[frame_index], traj_b[frame_index], bins, remove_intra=remove_intra mask = entry[1] < np.inf
) new_distances.append(
gr.append(result[0]) entry[1][mask][atoms_b.residue_ids[entry[2][mask]] != entry[0]]
n.append(result[1]) )
gr = np.mean(gr, axis=0) distances = np.concatenate(new_distances)
n = np.mean(n, axis=0) else:
return r, gr, n distances = distances.flatten()
hist, bins = np.histogram(
distances, bins=bins, range=(0, upper_bound), density=False
)
hist = hist / len(atoms_a)
hist = hist / (4 / 3 * np.pi * bins[1:] ** 3 - 4 / 3 * np.pi * bins[:-1] ** 3)
n = len(atoms_b) / np.prod(np.diag(atoms_b.box))
hist = hist / n
return hist
def distance_distribution(atoms, bins): def distance_distribution(
atoms: ArrayLike, bins: Optional[int, ArrayLike]
) -> np.ndarray:
connection_vectors = atoms[:-1, :] - atoms[1:, :] connection_vectors = atoms[:-1, :] - atoms[1:, :]
connection_lengths = (connection_vectors**2).sum(axis=1) ** 0.5 connection_lengths = (connection_vectors**2).sum(axis=1) ** 0.5
return np.histogram(connection_lengths, bins)[0] return np.histogram(connection_lengths, bins)[0]