From bffcd56cdcacdf8e93c67d7a27cff7bca4d9ce8d Mon Sep 17 00:00:00 2001 From: Sebastian Kloth Date: Wed, 27 Dec 2023 12:43:05 +0100 Subject: [PATCH] Changed gr to work with time_average and use next_neighbors --- src/mdevaluate/distribution.py | 179 +++++++++++---------------------- 1 file changed, 58 insertions(+), 121 deletions(-) diff --git a/src/mdevaluate/distribution.py b/src/mdevaluate/distribution.py index 7f0cd90..85f1e70 100644 --- a/src/mdevaluate/distribution.py +++ b/src/mdevaluate/distribution.py @@ -1,4 +1,4 @@ -from typing import Callable, Optional +from typing import Callable, Optional, Union import numpy as np from numpy.typing import ArrayLike @@ -16,7 +16,6 @@ from .coordinates import ( from .autosave import autosave_data from .utils import runningmean from .pbc import pbc_diff, pbc_points -from .logging import logger @autosave_data(nargs=2, kwargs_keys=("coordinates_b",)) @@ -33,7 +32,7 @@ def time_average( Args: function: The function that will be averaged, it has to accept exactly one argument - which is the current atom set + which is the current atom set (or two if coordinates_b is provided) coordinates: The coordinates object of the simulation coordinates_b: Additional coordinates object of the simulation skip: @@ -54,44 +53,13 @@ def time_average( return np.mean(result, axis=0) -def time_histogram(function, coordinates, bins, hist_range, pool=None): - coordinate_iter = iter(coordinates) - - if pool is not None: - _map = pool.imap - else: - _map = map - - evaluated = _map(function, coordinate_iter) - - results = [] - hist_results = [] - for num, ev in enumerate(evaluated): - results.append(ev) - - if num % 100 == 0 and num > 0: - print(num) - r = np.array(results).T - for i, row in enumerate(r): - histo, _ = np.histogram(row, bins=bins, range=hist_range) - if len(hist_results) <= i: - hist_results.append(histo) - else: - hist_results[i] += histo - results = [] - return hist_results - - -def calc_gr( - traj_a: Coordinates, - traj_b: Coordinates = None, - r_max: float = None, - delta_r: float = 0.02, - segments: int = 1000, - skip: float = 0.1, +def gr( + atoms_a: CoordinateFrame, + atoms_b: Optional[CoordinateFrame] = None, + bins: Union[int, ArrayLike] = 100, remove_intra: bool = False, - shear: bool = False, -): + **kwargs +) -> np.ndarray: r""" Compute the radial pair distribution of one or two sets of atoms. @@ -99,98 +67,67 @@ def calc_gr( g_{AB}(r) = \frac{1}{\langle \rho_B\rangle N_A}\sum\limits_{i\in A}^{N_A} \sum\limits_{j\in B}^{N_B}\frac{\delta(r_{ij} -r)}{4\pi r^2} - For use with :func:`time_average`, define bins through the use of :func:`~functools.partial`, - the atom sets are passed to :func:`time_average`, if a second set of atoms should be used - specify it as ``coordinates_b`` and it will be passed to this function. + For use with :func:`time_average`, define bins through the use of + :func:`~functools.partial`, the atom sets are passed to :func:`time_average`, if a + second set of atoms should be used specify it as ``coordinates_b`` and it will be + passed to this function. Args: atoms_a: First set of atoms, used internally - atoms_b (opt.): Second set of atoms, used internally + atoms_b (opt.): Second set of atoms, used internal bins: Bins of the radial distribution function - box (opt.): Simulations box, if not specified this is taken from ``atoms_a.box`` - kind (opt.): Can be 'inter', 'intra' or None (default). - chunksize (opt.): - For large systems (N > 1000) the distaces have to be computed in chunks so the arrays - fit into memory, this parameter controlls the size of these chunks. It should be - as large as possible, depending on the available memory. - returnx (opt.): If True the x ordinate of the histogram is returned. + remove_intra: removes contributions from intra molecular pairs """ + distinct = True + if atoms_b is None: + atoms_b = atoms_a + distinct = False + elif np.array_equal(atoms_a, atoms_b): + distinct = False - def gr_frame( - atoms_a: CoordinateFrame, - atoms_b: CoordinateFrame, - bins: ArrayLike, - remove_intra: bool = False, - ): - box = atoms_b.box - n = len(atoms_a) / np.prod(np.diag(box)) - V = 4 / 3 * np.pi * bins[-1] ** 3 - particles_in_volume = int(n * V * 1.1) - if np.all(np.diag(np.diag(box)) == box): - atoms_b = atoms_b % np.diag(box) - atoms_b_res_ids = atoms_b.residue_ids - atoms_b_tree = KDTree(atoms_b, boxsize=np.diag(box)) - else: - atoms_b_pbc, atoms_b_pbc_index = pbc_points( - atoms_b, box, thickness=bins[-1] + 0.1, index=True, shear=shear - ) - atoms_b_res_ids = atoms_b.residue_ids[atoms_b_pbc_index] - atoms_b_tree = KDTree(atoms_b_pbc) - distances, distances_index = atoms_b_tree.query( - atoms_a, particles_in_volume, distance_upper_bound=bins[-1] + 0.1 - ) - if np.array_equal(atoms_a, atoms_b): - distances = distances[:, 1:] - distances_index = distances_index[:, 1:] - - if remove_intra: - new_distances = [] - for entry in list(zip(atoms_a.residue_ids, distances, distances_index)): - mask = entry[1] < np.inf - new_distances.append( - entry[1][mask][atoms_b_res_ids[entry[2][mask]] != entry[0]] - ) - distances = np.concatenate(new_distances) - else: - distances = distances.flatten() - - hist = np.histogram(distances, bins=bins, range=(0, bins[-1]), density=False)[0] - gr = hist / len(atoms_a) - gr = gr / (4 / 3 * np.pi * bins[1:] ** 3 - 4 / 3 * np.pi * bins[:-1] ** 3) - n = len(atoms_b) / np.prod(np.diag(atoms_b.box)) - gr = gr / n - - return gr, n - - if traj_b is None: - traj_b = traj_a - - start_frame = traj_a[int(len(traj_a) * skip)] - if r_max: - upper_bound = r_max + box = atoms_b.box + if isinstance(bins, int): + upper_bound = np.min(np.diag(box)) else: - upper_bound = round(np.min(np.diag(start_frame.box)) / 2 - 0.05, 1) + upper_bound = bins[-1] - num_steps = int(upper_bound * (1 / delta_r) + 1) - bins = np.linspace(0, upper_bound, num_steps) - r = bins[1:] - (bins[1] - bins[0]) / 2 - frame_indices = np.unique( - np.int_(np.linspace(len(traj_a) * skip, len(traj_a) - 1, num=segments)) + n = len(atoms_a) / np.prod(np.diag(box)) + V = 4 / 3 * np.pi * bins[-1] ** 3 + particles_in_volume = int(n * V * 1.1) + distances, indices = next_neighbors( + atoms_a, + atoms_b, + number_of_neighbors=particles_in_volume, + distance_upper_bound=upper_bound, + distinct=distinct, + **kwargs ) - gr = [] - n = [] - for frame_index in frame_indices: - result = gr_frame( - traj_a[frame_index], traj_b[frame_index], bins, remove_intra=remove_intra - ) - gr.append(result[0]) - n.append(result[1]) - gr = np.mean(gr, axis=0) - n = np.mean(n, axis=0) - return r, gr, n + + if remove_intra: + new_distances = [] + for entry in list(zip(atoms_a.residue_ids, distances, indices)): + mask = entry[1] < np.inf + new_distances.append( + entry[1][mask][atoms_b.residue_ids[entry[2][mask]] != entry[0]] + ) + distances = np.concatenate(new_distances) + else: + distances = distances.flatten() + + hist, bins = np.histogram( + distances, bins=bins, range=(0, upper_bound), density=False + ) + hist = hist / len(atoms_a) + hist = hist / (4 / 3 * np.pi * bins[1:] ** 3 - 4 / 3 * np.pi * bins[:-1] ** 3) + n = len(atoms_b) / np.prod(np.diag(atoms_b.box)) + hist = hist / n + + return hist -def distance_distribution(atoms, bins): +def distance_distribution( + atoms: ArrayLike, bins: Optional[int, ArrayLike] +) -> np.ndarray: connection_vectors = atoms[:-1, :] - atoms[1:, :] connection_lengths = (connection_vectors**2).sum(axis=1) ** 0.5 return np.histogram(connection_lengths, bins)[0]