diff --git a/src/mdevaluate/extra/free_energy_landscape.py b/src/mdevaluate/extra/free_energy_landscape.py index af558fa..5190d5c 100644 --- a/src/mdevaluate/extra/free_energy_landscape.py +++ b/src/mdevaluate/extra/free_energy_landscape.py @@ -1,4 +1,5 @@ from functools import partial +from typing import Optional import numpy as np from numpy.typing import ArrayLike, NDArray @@ -11,6 +12,41 @@ import multiprocessing as mp from ..coordinates import Coordinates +def _pbc_points_reduced( + coordinates: ArrayLike, + pore_geometry: str, + box: Optional[NDArray] = None, + thickness: Optional[float] = None, +) -> tuple[NDArray, NDArray]: + if box is None: + box = coordinates.box + if pore_geometry == "cylindrical": + grid = np.array([[i, j, k] for k in [-1, 0, 1] for j in [0] for i in [0]]) + indices = np.tile(np.arange(len(coordinates)), 3) + elif pore_geometry == "slit": + grid = np.array( + [[i, j, k] for k in [0] for j in [1, 0, -1] for i in [-1, 0, 1]] + ) + indices = np.tile(np.arange(len(coordinates)), 9) + else: + raise ValueError( + f"pore_geometry is {pore_geometry}, should either be " + f"'cylindrical' or 'slit'" + ) + coordinates_pbc = np.concatenate([coordinates + v @ box for v in grid], axis=0) + size = np.diag(box) + + if thickness is not None: + mask = np.all(coordinates_pbc > -thickness, axis=1) + coordinates_pbc = coordinates_pbc[mask] + indices = indices[mask] + mask = np.all(coordinates_pbc < size + thickness, axis=1) + coordinates_pbc = coordinates_pbc[mask] + indices = indices[mask] + + return coordinates_pbc, indices + + def occupation_matrix( trajectory: Coordinates, edge_length: float = 0.05, @@ -72,15 +108,24 @@ def _calc_histogram( def find_maxima( - occupation_df: pd.DataFrame, box: ArrayLike, edge_length: float = 0.05 + occupation_df: pd.DataFrame, box: ArrayLike, radius: float, pore_geometry: str ) -> pd.DataFrame: maxima_df = occupation_df.copy() maxima_df["maxima"] = None points = np.array(maxima_df[["x", "y", "z"]]) - tree = KDTree(points, boxsize=box) - all_neighbors = tree.query_ball_point( - points, edge_length * 3 ** (1 / 2) + edge_length / 100 - ) + if np.all(np.diag(np.diag(box)) == box): + tree = KDTree(points, boxsize=box) + all_neighbors = tree.query_ball_point(points, radius) + else: + points_pbc, points_pbc_index = _pbc_points_reduced( + points, + pore_geometry, + box, + thickness=radius + 0.01, + ) + tree = KDTree(points_pbc) + all_neighbors = tree.query_ball_point(points, radius) + all_neighbors = points_pbc_index[all_neighbors] for i in range(len(maxima_df)): if maxima_df.loc[i, "maxima"] is not None: continue @@ -104,23 +149,48 @@ def _calc_energies( maxima_df: pd.DataFrame, bins: ArrayLike, box: NDArray, + pore_geometry: str, T: float, + nodes: int = 8, ) -> NDArray: points = np.array(maxima_df[["x", "y", "z"]]) - tree = KDTree(points, boxsize=box) + if np.all(np.diag(np.diag(box)) == box): + tree = KDTree(points, boxsize=box) + else: + points_pbc, points_pbc_index = _pbc_points_reduced( + points, + pore_geometry, + box, + thickness=bins[-1] + 0.01, + ) + tree = KDTree(points_pbc) maxima = maxima_df.loc[maxima_indices, ["x", "y", "z"]] maxima_occupations = np.array(maxima_df.loc[maxima_indices, "occupation"]) num_of_neighbors = np.max( tree.query_ball_point(maxima, bins[-1], return_length=True) ) - distances, indices = tree.query( - maxima, k=num_of_neighbors, distance_upper_bound=bins[-1] - ) + split_maxima = [] + for i in range(0, len(maxima), 1000): + split_maxima.append(maxima[i : i + 1000]) + + distances = [] + indices = [] + for maxima in split_maxima: + distances_step, indices_step = tree.query( + maxima, k=num_of_neighbors, distance_upper_bound=bins[-1], workers=nodes + ) + distances.append(distances_step) + indices.append(indices_step) + distances = np.concatenate(distances) + indices = np.concatenate(indices) all_energy_hist = [] all_occupied_bins_hist = [] if distances.ndim == 1: current_distances = distances[1:][distances[1:] <= bins[-1]] - current_indices = indices[1:][distances[1:] <= bins[-1]] + if np.all(np.diag(np.diag(box)) == box): + current_indices = indices[1:][distances[1:] <= bins[-1]] + else: + current_indices = points_pbc_index[indices[1:][distances[1:] <= bins[-1]]] energy = ( -np.log(maxima_df.loc[current_indices, "occupation"] / maxima_occupations) * T @@ -131,8 +201,12 @@ def _calc_energies( return result for i, maxima_occupation in enumerate(maxima_occupations): current_distances = distances[i, 1:][distances[i, 1:] <= bins[-1]] - current_indices = indices[i, 1:][distances[i, 1:] <= bins[-1]] - + if np.all(np.diag(np.diag(box)) == box): + current_indices = indices[i, 1:][distances[i, 1:] <= bins[-1]] + else: + current_indices = points_pbc_index[ + indices[i, 1:][distances[i, 1:] <= bins[-1]] + ] energy = ( -np.log(maxima_df.loc[current_indices, "occupation"] / maxima_occupation) * T @@ -168,9 +242,12 @@ def distance_resolved_energies( distance_bins: ArrayLike, r_bins: ArrayLike, box: NDArray, + pore_geometry: str, temperature: float, + nodes: int = 8, ) -> pd.DataFrame: results = [] + distances = [] for i in range(len(distance_bins) - 1): maxima_indices = np.array( maxima_df.index[ @@ -179,11 +256,22 @@ def distance_resolved_energies( * (maxima_df["maxima"] == True) ] ) - results.append( - _calc_energies(maxima_indices, maxima_df, r_bins, box, temperature) - ) + try: + results.append( + _calc_energies( + maxima_indices, + maxima_df, + r_bins, + box, + pore_geometry, + temperature, + nodes, + ) + ) + distances.append((distance_bins[i] + distance_bins[i + 1]) / 2) + except ValueError: + pass - distances = (distance_bins[:-1] + distance_bins[1:]) / 2 radii = (r_bins[:-1] + r_bins[1:]) / 2 d = np.array([d for d in distances for r in radii]) r = np.array([r for d in distances for r in radii]) diff --git a/test/test_free_energy_landscape.py b/test/test_free_energy_landscape.py index 9835b23..a964078 100644 --- a/test/test_free_energy_landscape.py +++ b/test/test_free_energy_landscape.py @@ -39,15 +39,21 @@ def test_get_fel(trajectory): OW = trajectory.subset(atom_name="OW") - box = np.diag(trajectory[0].box) - box_voxels = (box // [0.05, 0.05, 0.05] + [1, 1, 1]) * [0.05, 0.05, 0.05] + box = trajectory[0].box + box_voxels = (np.diag(box) // [0.05, 0.05, 0.05] + [1, 1, 1]) * [0.05, 0.05, 0.05] occupation_matrix = fel.occupation_matrix(OW, skip=0, segments=1000) - maxima_matrix = fel.find_maxima(occupation_matrix, box=box_voxels, edge_length=0.05) - maxima_matrix = fel.add_distances(maxima_matrix, "cylindrical", box / 2) - r_bins = np.arange(0, 2, 0.02) + radius_maxima = 0.05 * 3 ** (1 / 2) + 0.05 / 100 + maxima_matrix = fel.find_maxima( + occupation_matrix, + box=box_voxels, + radius=radius_maxima, + pore_geometry="cylindrical" + ) + maxima_matrix = fel.add_distances(maxima_matrix, "cylindrical", np.diag(box) / 2) + r_bins = np.arange(0, 1, 0.02) distance_bins = np.arange(0.05, 2.05, 0.1) energy_df = fel.distance_resolved_energies( - maxima_matrix, distance_bins, r_bins, box, 225 + maxima_matrix, distance_bins, r_bins, box, "cylindrical", 225 ) result = fel.find_energy_maxima(energy_df, r_min=0.05, r_max=0.15)