From 608cdb12ebaf0ad00d43c75b9c2da9ba25c045cd Mon Sep 17 00:00:00 2001 From: Sebastian Kloth Date: Thu, 28 Dec 2023 14:10:16 +0100 Subject: [PATCH] Added type hints, refactored and deleted some functions --- src/mdevaluate/__init__.py | 1 + src/mdevaluate/utils.py | 191 +++++++++++++++++-------------------- 2 files changed, 86 insertions(+), 106 deletions(-) diff --git a/src/mdevaluate/__init__.py b/src/mdevaluate/__init__.py index 2b8ef14..2687fda 100644 --- a/src/mdevaluate/__init__.py +++ b/src/mdevaluate/__init__.py @@ -13,6 +13,7 @@ from . import pbc from . import autosave from . import reader from . import system +from . import utils from .extra import free_energy_landscape, chill from .logging import logger diff --git a/src/mdevaluate/utils.py b/src/mdevaluate/utils.py index 38334db..cf21285 100644 --- a/src/mdevaluate/utils.py +++ b/src/mdevaluate/utils.py @@ -4,20 +4,21 @@ Collection of utility functions. import functools from time import time as pytime from subprocess import run -from types import FunctionType +from typing import Callable, Optional, Union import numpy as np +from numpy.typing import ArrayLike, NDArray import pandas as pd -from .functions import kww, kww_1e from scipy.ndimage import uniform_filter1d from scipy.interpolate import interp1d from scipy.optimize import curve_fit from .logging import logger +from .functions import kww, kww_1e -def five_point_stencil(xdata, ydata): +def five_point_stencil(xdata: ArrayLike, ydata: ArrayLike) -> ArrayLike: """ Calculate the derivative dy/dx with a five point stencil. This algorith is only valid for equally distributed x values. @@ -42,28 +43,28 @@ def five_point_stencil(xdata, ydata): def filon_fourier_transformation( - time, - correlation, - frequencies=None, - derivative="linear", - imag=True, -): + time: NDArray, + correlation: NDArray, + frequencies: Optional[NDArray] = None, + derivative: Union[str, NDArray] = "linear", + imag: bool = True, +) -> tuple[NDArray, NDArray]: """ - Fourier-transformation for slow varrying functions. The filon algorithmus is + Fourier-transformation for slow varying functions. The filon algorithm is described in detail in ref [Blochowicz]_, ch. 3.2.3. Args: - time: List of times where the correlation function was sampled. + time: List of times when the correlation function was sampled. correlation: Values of the correlation function. frequencies (opt.): List of frequencies where the fourier transformation will be calculated. - If None the frequencies will be choosen based on the input times. + If None the frequencies will be chosen based on the input times. derivative (opt.): - Approximation algorithmus for the derivative of the correlation function. + Approximation algorithm for the derivative of the correlation function. Possible values are: 'linear', 'stencil' or a list of derivatives. imag (opt.): If imaginary part of the integral should be calculated. - If frequencies are not explicitly given they will be evenly placed on a log scale + If frequencies are not explicitly given, they will be evenly placed on a log scale in the interval [1/tmax, 0.1/tmin] where tmin and tmax are the smallest respectively the biggest time (greater than 0) of the provided times. The frequencies are cut off at high values by one decade, since the fourier transformation deviates quite @@ -85,7 +86,7 @@ def filon_fourier_transformation( _, derivative = five_point_stencil(time, correlation) time = ((time[2:-1] * time[1:-2]) ** 0.5).reshape(-1, 1) derivative = derivative.reshape(-1, 1) - elif np.iterable(derivative) and len(time) is len(derivative): + elif isinstance(derivative, NDArray) and len(time) is len(derivative): derivative.reshape(-1, 1) else: raise NotImplementedError( @@ -111,15 +112,12 @@ def filon_fourier_transformation( + 1j * correlation[0] / frequencies ) - return ( - frequencies.reshape( - -1, - ), - fourier, - ) + return frequencies.reshape(-1), fourier -def superpose(x1, y1, x2, y2, N=100, damping=1.0): +def superpose( + x1: NDArray, y1: NDArray, x2: NDArray, y2: NDArray, damping: float = 1.0 +) -> tuple[NDArray, NDArray]: if x2[0] == 0: x2 = x2[1:] y2 = y2[1:] @@ -127,12 +125,12 @@ def superpose(x1, y1, x2, y2, N=100, damping=1.0): reg1 = x1 < x2[0] reg2 = x2 > x1[-1] x_ol = np.logspace( - np.log10(max(x1[~reg1][0], x2[~reg2][0]) + 0.001), - np.log10(min(x1[~reg1][-1], x2[~reg2][-1]) - 0.001), - (sum(~reg1) + sum(~reg2)) / 2, + np.log10(np.max(x1[~reg1][0], x2[~reg2][0]) + 0.001), + np.log10(np.min(x1[~reg1][-1], x2[~reg2][-1]) - 0.001), + (np.sum(~reg1) + np.sum(~reg2)) / 2, ) - def w(x): + def w(x: NDArray) -> NDArray: A = x_ol.min() B = x_ol.max() return (np.log10(B / x) / np.log10(B / A)) ** damping @@ -150,21 +148,7 @@ def superpose(x1, y1, x2, y2, N=100, damping=1.0): return xdata, ydata -def runningmean(data, nav): - """ - Compute the running mean of a 1-dimenional array. - - Args: - data: Input data of shape (N, ) - nav: Number of points over which the data will be averaged - - Returns: - Array of shape (N-(nav-1), ) - """ - return np.convolve(data, np.ones((nav,)) / nav, mode="valid") - - -def moving_average(A, n=3): +def moving_average(data: NDArray, n: int = 3) -> NDArray: """ Compute the running mean of an array. Uses the second axis if it is of higher dimensionality. @@ -177,27 +161,30 @@ def moving_average(A, n=3): Array of shape (N-(n-1), ) Supports 2D-Arrays. - Slower than runningmean for small n but faster for large n. """ k1 = int(n / 2) k2 = int((n - 1) / 2) if k2 == 0: - if A.ndim > 1: - return uniform_filter1d(A, n)[:, k1:] - return uniform_filter1d(A, n)[k1:] - if A.ndim > 1: - return uniform_filter1d(A, n)[:, k1:-k2] - return uniform_filter1d(A, n)[k1:-k2] + if data.ndim > 1: + return uniform_filter1d(data, n)[:, k1:] + return uniform_filter1d(data, n)[k1:] + if data.ndim > 1: + return uniform_filter1d(data, n)[:, k1:-k2] + return uniform_filter1d(data, n)[k1:-k2] -def coherent_sum(func, coord_a, coord_b): +def coherent_sum( + func: Callable[[ArrayLike, ArrayLike], float], + coord_a: ArrayLike, + coord_b: ArrayLike, +) -> float: """ Perform a coherent sum over two arrays :math:`A, B`. .. math:: \\frac{1}{N_A N_B}\\sum_i\\sum_j f(A_i, B_j) - For numpy arrays this is equal to:: + For numpy arrays, this is equal to:: N, d = x.shape M, d = y.shape @@ -206,24 +193,27 @@ def coherent_sum(func, coord_a, coord_b): Args: func: The function is called for each two items in both arrays, this should return a scalar value. - coord_a, coord_b: The two arrays. + coord_a: First array. + coord_b: Second array. """ - - def cohsum(coord_a, coord_b): - res = 0 - for i in range(len(coord_a)): - for j in range(len(coord_b)): - res += func(coord_a[i], coord_b[j]) - return res - - return cohsum(coord_a, coord_b) + res = 0 + for i in range(len(coord_a)): + for j in range(len(coord_b)): + res += func(coord_a[i], coord_b[j]) + return res -def coherent_histogram(func, coord_a, coord_b, bins, distinct=False): +def coherent_histogram( + func: Callable[[ArrayLike, ArrayLike], float], + coord_a: ArrayLike, + coord_b: ArrayLike, + bins: ArrayLike, + distinct: bool = False, +) -> NDArray: """ Compute a coherent histogram over two arrays, equivalent to coherent_sum. - For numpy arrays ofthis is equal to:: + For numpy arrays, this is equal to:: N, d = x.shape M, d = y.shape @@ -235,9 +225,11 @@ def coherent_histogram(func, coord_a, coord_b, bins, distinct=False): Args: func: The function is called for each two items in both arrays, this should return a scalar value. - coord_a, coord_b: The two arrays. - bins: The bins used for the histogram must be distributed regular on a linear + coord_a: First array. + coord_b: Second array. + bins: The bins used for the histogram must be distributed regularly on a linear scale. + distinct: Only calculate distinct part. """ assert np.isclose( @@ -248,32 +240,29 @@ def coherent_histogram(func, coord_a, coord_b, bins, distinct=False): N = len(bins) - 1 dh = (hmax - hmin) / N - def cohsum(coord_a, coord_b): - res = np.zeros((N,)) - for i in range(len(coord_a)): - for j in range(len(coord_b)): - if not (distinct and i == j): - h = func(coord_a[i], coord_b[j]) - if hmin <= h < hmax: - res[int((h - hmin) / dh)] += 1 - return res - - return cohsum(coord_a, coord_b) + res = np.zeros((N,)) + for i in range(len(coord_a)): + for j in range(len(coord_b)): + if not (distinct and i == j): + h = func(coord_a[i], coord_b[j]) + if hmin <= h < hmax: + res[int((h - hmin) / dh)] += 1 + return res -def Sq_from_gr(r, gr, q, ρ): +def Sq_from_gr(r: NDArray, gr: NDArray, q: NDArray, n: float) -> NDArray: r""" Compute the static structure factor as fourier transform of the pair correlation function. [Yarnell]_ .. math:: - S(q) - 1 = \\frac{4\\pi \\rho}{q}\\int\\limits_0^\\infty (g(r) - 1)\\,r \\sin(qr) dr + S(q)-1 = \\frac{4\\pi\\rho}{q}\\int\\limits_0^\\infty (g(r)-1)\\,r \\sin(qr) dr Args: r: Radii of the pair correlation function gr: Values of the pair correlation function q: List of q values - ρ: Average number density + n: Average number density .. [Yarnell] Yarnell, J. L., Katz, M. J., Wenzel, R. G., & Koenig, S. H. (1973). Physical @@ -282,10 +271,12 @@ def Sq_from_gr(r, gr, q, ρ): """ ydata = ((gr - 1) * r).reshape(-1, 1) * np.sin(r.reshape(-1, 1) * q.reshape(1, -1)) - return np.trapz(x=r, y=ydata, axis=0) * (4 * np.pi * ρ / q) + 1 + return np.trapz(x=r, y=ydata, axis=0) * (4 * np.pi * n / q) + 1 -def Fqt_from_Grt(data, q): +def Fqt_from_Grt( + data: Union[pd.DataFrame, ArrayLike], q: ArrayLike +) -> Union[pd.DataFrame, tuple[NDArray, NDArray]]: """ Calculate the ISF from the van Hove function for a given q value by fourier transform. @@ -317,7 +308,7 @@ def Fqt_from_Grt(data, q): return isf.index, isf.values -def singledispatchmethod(func): +def singledispatchmethod(func: Callable) -> Callable: """ A decorator to define a genric instance method, analogue to functools.singledispatch. @@ -332,22 +323,7 @@ def singledispatchmethod(func): return wrapper -def histogram(data, bins): - """ - Compute the histogram of the given data. Uses numpy.bincount function, if possible. - """ - dbins = np.diff(bins) - dx = dbins.mean() - if bins.min() == 0 and dbins.std() < 1e-6: - logger.debug("Using numpy.bincount for histogramm compuation.") - hist = np.bincount((data // dx).astype(int), minlength=len(dbins))[: len(dbins)] - else: - hist = np.histogram(data, bins=bins)[0] - - return hist, runningmean(bins, 2) - - -def quick1etau(t, C, n=7): +def quick1etau(t: ArrayLike, C: ArrayLike, n: int = 7) -> float: """ Estimate the time for a correlation function that goes from 1 to 0 to decay to 1/e. @@ -381,15 +357,15 @@ def quick1etau(t, C, n=7): return tau_est -def susceptibility(time, correlation, **kwargs): +def susceptibility( + time: NDArray, correlation: NDArray, **kwargs +) -> tuple[NDArray, NDArray]: """ Calculate the susceptibility of a correlation function. Args: time: Timesteps of the correlation data correlation: Value of the correlation function - **kwargs (opt.): - Additional keyword arguments will be passed to :func:`filon_fourier_transformation`. """ frequencies, fourier = filon_fourier_transformation( time, correlation, imag=False, **kwargs @@ -397,7 +373,7 @@ def susceptibility(time, correlation, **kwargs): return frequencies, frequencies * fourier -def read_gro(file): +def read_gro(file: str) -> tuple[pd.DataFrame, NDArray, str]: with open(file, "r") as f: lines = f.readlines() description = lines[0].splitlines()[0] @@ -438,7 +414,9 @@ def read_gro(file): return atoms_DF, box, description -def write_gro(file, atoms_DF, box, description): +def write_gro( + file: str, atoms_DF: pd.DataFrame, box: NDArray, description: str +) -> None: with open(file, "w") as f: f.write(f"{description} \n") f.write(f"{len(atoms_DF)}\n") @@ -456,7 +434,7 @@ def write_gro(file, atoms_DF, box, description): ) -def fibonacci_sphere(samples=1000): +def fibonacci_sphere(samples: int = 1000) -> NDArray: points = [] phi = np.pi * (np.sqrt(5.0) - 1.0) # golden angle in radians @@ -471,7 +449,7 @@ def fibonacci_sphere(samples=1000): return np.array(points) -def timing(function): +def timing(function: Callable) -> Callable: @functools.wraps(function) def wrap(*args, **kw): start_time = pytime() @@ -483,7 +461,8 @@ def timing(function): return wrap -def cleanup_h5(hdf5_file) -> None: + +def cleanup_h5(hdf5_file: str) -> None: hdf5_temp_file = f"{hdf5_file[:-3]}_temp.h5" run( [ @@ -496,4 +475,4 @@ def cleanup_h5(hdf5_file) -> None: hdf5_temp_file, ] ) - run(["mv", hdf5_temp_file, hdf5_file]) \ No newline at end of file + run(["mv", hdf5_temp_file, hdf5_file])