From 4f17cfe87631dabb5cfce91102350aec9629de9e Mon Sep 17 00:00:00 2001 From: Sebastian Kloth Date: Fri, 5 Jan 2024 16:36:34 +0100 Subject: [PATCH] Added type hints and removed number_of_next_neighbors --- src/mdevaluate/extra/chill.py | 138 +++++++++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 9 deletions(-) diff --git a/src/mdevaluate/extra/chill.py b/src/mdevaluate/extra/chill.py index dfb0bc5..f626915 100644 --- a/src/mdevaluate/extra/chill.py +++ b/src/mdevaluate/extra/chill.py @@ -1,9 +1,17 @@ +from typing import Tuple, Callable + import numpy as np +from numpy.typing import ArrayLike, NDArray +import pandas as pd +from scipy import sparse from scipy.spatial import KDTree from scipy.special import sph_harm +from mdevaluate.coordinates import CoordinateFrame, Coordinates +from mdevaluate.pbc import pbc_points -def a_ij(atoms, N=4, l=3): + +def a_ij(atoms: ArrayLike, N: int = 4, l: int = 3) -> tuple[NDArray, NDArray]: tree = KDTree(atoms) dist, indices = tree.query(atoms, N + 1) @@ -30,13 +38,9 @@ def a_ij(atoms, N=4, l=3): return np.real(aij), indices -def number_of_neighbors(atoms): - tree = KDTree(atoms) - dist, _ = tree.query(atoms, 10, distance_upper_bound=0.35) - return np.array([len(distance[distance < 0.4]) - 1 for distance in dist]) - - -def classify_ice(aij, indices, neighbors, indexSOL): +def classify_ice( + aij: NDArray, indices: NDArray, neighbors: NDArray, indexSOL: NDArray +) -> NDArray: staggerdBonds = np.sum(aij <= -0.8, axis=1) eclipsedBonds = np.sum((aij >= -0.35) & (aij <= 0.25), axis=1) @@ -67,7 +71,7 @@ def classify_ice(aij, indices, neighbors, indexSOL): return iceTypes -def ice_parts(iceTypes): +def count_ice_types(iceTypes: NDArray) -> NDArray: cubic = len(iceTypes[iceTypes == 0]) hexagonal = len(iceTypes[iceTypes == 1]) interface = len(iceTypes[iceTypes == 2]) @@ -77,3 +81,119 @@ def ice_parts(iceTypes): return np.array( [cubic, hexagonal, interface, clathrate, clathrate_interface, liquid] ) + + +def selector_ice( + start_frame: CoordinateFrame, + traj: Coordinates, + chosen_ice_types: ArrayLike, + combined: bool = True, +) -> NDArray: + oxygen = traj.subset(atom_name="OW") + atoms = oxygen[start_frame.step] + atoms = atoms % np.diag(atoms.box) + atoms_PBC = pbc_points(atoms, thickness=1) + aij, indices = a_ij(atoms_PBC) + tree = KDTree(atoms_PBC) + neighbors = tree.query_ball_point(atoms_PBC, 0.35, return_length=True) + index_SOL = atoms_PBC.tolist().index(atoms[0].tolist()) + index_SOL = np.arange(index_SOL, index_SOL + len(atoms)) + ice_Types = classify_ice(aij, indices, neighbors, index_SOL) + index = [] + if combined is True: + for i, ice_Type in enumerate(ice_Types): + if ice_Type in chosen_ice_types: + index.append(i) + else: + for entry in chosen_ice_types: + index_entry = [] + for i, ice_Type in enumerate(ice_Types): + if ice_Type == entry: + index_entry.append(i) + index.append(np.array(index_entry)) + return np.array(index) + + +def ice_types(trajectory: Coordinates, segments: int = 10000) -> pd.DataFrame: + def ice_types_distribution(frame: CoordinateFrame, selector: Callable) -> NDArray: + atoms_PBC = pbc_points(frame, thickness=1) + aij, indices = a_ij(atoms_PBC) + tree = KDTree(atoms_PBC) + neighbors = tree.query_ball_point(atoms_PBC, 0.35, return_length=True) + index = selector(frame, atoms_PBC) + ice_types_data = classify_ice(aij, indices, neighbors, index) + ice_parts_data = count_ice_types(ice_types_data) + return ice_parts_data + + def selector(frame: CoordinateFrame, atoms_PBC: ArrayLike) -> NDArray: + atoms_SOL = traj.subset(residue_name="SOL")[frame.step] + index = atoms_PBC.tolist().index(atoms_SOL[0].tolist()) + index = np.arange(index, index + len(atoms_SOL)) + return np.array(index) + + traj = trajectory.subset(atom_name="OW") + + frame_indices = np.unique(np.int_(np.linspace(0, len(traj) - 1, num=segments))) + + result = np.array( + [ + [ + traj[frame_index].time, + *ice_types_distribution(traj[frame_index], selector), + ] + for frame_index in frame_indices + ] + ) + + return pd.DataFrame( + { + "time": result[:, 0], + "cubic": result[:, 1], + "hexagonal": result[:, 2], + "ice_interface": result[:, 3], + "clathrate": result[:, 4], + "clathrate_interface": result[:, 5], + "liquid": result[:, 6], + } + ) + + +def ice_clusters_traj( + traj: Coordinates, segments: int = 10000, skip: float = 0.1 +) -> list: + def ice_clusters(frame: CoordinateFrame, traj: Coordinates) -> Tuple[float, list]: + selection = selector_ice(frame, traj, [0, 1, 2]) + if len(selection) == 0: + return frame.time, [] + else: + ice = frame[selection] + ice_PBC, indices_PBC = pbc_points( + ice, box=frame.box, thickness=0.5, index=True + ) + ice_tree = KDTree(ice_PBC) + ice_matrix = ice_tree.sparse_distance_matrix( + ice_tree, 0.35, output_type="ndarray" + ) + new_ice_matrix = np.zeros((len(ice), len(ice))) + for entry in ice_matrix: + if entry[2] > 0: + new_ice_matrix[indices_PBC[entry[0]], indices_PBC[entry[1]]] = 1 + n_components, labels = sparse.csgraph.connected_components( + new_ice_matrix, directed=False + ) + clusters = [] + selection = np.array(selection) + for i in range(0, np.max(labels) + 1): + if len(ice[labels == i]) > 1: + clusters.append( + list(zip(selection[labels == i], ice[labels == i].tolist())) + ) + return frame.time, clusters + + frame_indices = np.unique( + np.int_(np.linspace(len(traj) * skip, len(traj) - 1, num=segments)) + ) + all_clusters = [ + ice_clusters(traj[frame_index], traj) for frame_index in frame_indices + ] + return all_clusters