Added type hints, refactored and deleted some functions

This commit is contained in:
Sebastian Kloth 2023-12-28 14:10:16 +01:00
parent f2806ca3ca
commit 608cdb12eb
2 changed files with 86 additions and 106 deletions

View File

@ -13,6 +13,7 @@ from . import pbc
from . import autosave from . import autosave
from . import reader from . import reader
from . import system from . import system
from . import utils
from .extra import free_energy_landscape, chill from .extra import free_energy_landscape, chill
from .logging import logger from .logging import logger

View File

@ -4,20 +4,21 @@ Collection of utility functions.
import functools import functools
from time import time as pytime from time import time as pytime
from subprocess import run from subprocess import run
from types import FunctionType from typing import Callable, Optional, Union
import numpy as np import numpy as np
from numpy.typing import ArrayLike, NDArray
import pandas as pd import pandas as pd
from .functions import kww, kww_1e
from scipy.ndimage import uniform_filter1d from scipy.ndimage import uniform_filter1d
from scipy.interpolate import interp1d from scipy.interpolate import interp1d
from scipy.optimize import curve_fit from scipy.optimize import curve_fit
from .logging import logger from .logging import logger
from .functions import kww, kww_1e
def five_point_stencil(xdata, ydata): def five_point_stencil(xdata: ArrayLike, ydata: ArrayLike) -> ArrayLike:
""" """
Calculate the derivative dy/dx with a five point stencil. Calculate the derivative dy/dx with a five point stencil.
This algorith is only valid for equally distributed x values. This algorith is only valid for equally distributed x values.
@ -42,28 +43,28 @@ def five_point_stencil(xdata, ydata):
def filon_fourier_transformation( def filon_fourier_transformation(
time, time: NDArray,
correlation, correlation: NDArray,
frequencies=None, frequencies: Optional[NDArray] = None,
derivative="linear", derivative: Union[str, NDArray] = "linear",
imag=True, imag: bool = True,
): ) -> tuple[NDArray, NDArray]:
""" """
Fourier-transformation for slow varrying functions. The filon algorithmus is Fourier-transformation for slow varying functions. The filon algorithm is
described in detail in ref [Blochowicz]_, ch. 3.2.3. described in detail in ref [Blochowicz]_, ch. 3.2.3.
Args: Args:
time: List of times where the correlation function was sampled. time: List of times when the correlation function was sampled.
correlation: Values of the correlation function. correlation: Values of the correlation function.
frequencies (opt.): frequencies (opt.):
List of frequencies where the fourier transformation will be calculated. List of frequencies where the fourier transformation will be calculated.
If None the frequencies will be choosen based on the input times. If None the frequencies will be chosen based on the input times.
derivative (opt.): derivative (opt.):
Approximation algorithmus for the derivative of the correlation function. Approximation algorithm for the derivative of the correlation function.
Possible values are: 'linear', 'stencil' or a list of derivatives. Possible values are: 'linear', 'stencil' or a list of derivatives.
imag (opt.): If imaginary part of the integral should be calculated. imag (opt.): If imaginary part of the integral should be calculated.
If frequencies are not explicitly given they will be evenly placed on a log scale If frequencies are not explicitly given, they will be evenly placed on a log scale
in the interval [1/tmax, 0.1/tmin] where tmin and tmax are the smallest respectively in the interval [1/tmax, 0.1/tmin] where tmin and tmax are the smallest respectively
the biggest time (greater than 0) of the provided times. The frequencies are cut off the biggest time (greater than 0) of the provided times. The frequencies are cut off
at high values by one decade, since the fourier transformation deviates quite at high values by one decade, since the fourier transformation deviates quite
@ -85,7 +86,7 @@ def filon_fourier_transformation(
_, derivative = five_point_stencil(time, correlation) _, derivative = five_point_stencil(time, correlation)
time = ((time[2:-1] * time[1:-2]) ** 0.5).reshape(-1, 1) time = ((time[2:-1] * time[1:-2]) ** 0.5).reshape(-1, 1)
derivative = derivative.reshape(-1, 1) derivative = derivative.reshape(-1, 1)
elif np.iterable(derivative) and len(time) is len(derivative): elif isinstance(derivative, NDArray) and len(time) is len(derivative):
derivative.reshape(-1, 1) derivative.reshape(-1, 1)
else: else:
raise NotImplementedError( raise NotImplementedError(
@ -111,15 +112,12 @@ def filon_fourier_transformation(
+ 1j * correlation[0] / frequencies + 1j * correlation[0] / frequencies
) )
return ( return frequencies.reshape(-1), fourier
frequencies.reshape(
-1,
),
fourier,
)
def superpose(x1, y1, x2, y2, N=100, damping=1.0): def superpose(
x1: NDArray, y1: NDArray, x2: NDArray, y2: NDArray, damping: float = 1.0
) -> tuple[NDArray, NDArray]:
if x2[0] == 0: if x2[0] == 0:
x2 = x2[1:] x2 = x2[1:]
y2 = y2[1:] y2 = y2[1:]
@ -127,12 +125,12 @@ def superpose(x1, y1, x2, y2, N=100, damping=1.0):
reg1 = x1 < x2[0] reg1 = x1 < x2[0]
reg2 = x2 > x1[-1] reg2 = x2 > x1[-1]
x_ol = np.logspace( x_ol = np.logspace(
np.log10(max(x1[~reg1][0], x2[~reg2][0]) + 0.001), np.log10(np.max(x1[~reg1][0], x2[~reg2][0]) + 0.001),
np.log10(min(x1[~reg1][-1], x2[~reg2][-1]) - 0.001), np.log10(np.min(x1[~reg1][-1], x2[~reg2][-1]) - 0.001),
(sum(~reg1) + sum(~reg2)) / 2, (np.sum(~reg1) + np.sum(~reg2)) / 2,
) )
def w(x): def w(x: NDArray) -> NDArray:
A = x_ol.min() A = x_ol.min()
B = x_ol.max() B = x_ol.max()
return (np.log10(B / x) / np.log10(B / A)) ** damping return (np.log10(B / x) / np.log10(B / A)) ** damping
@ -150,21 +148,7 @@ def superpose(x1, y1, x2, y2, N=100, damping=1.0):
return xdata, ydata return xdata, ydata
def runningmean(data, nav): def moving_average(data: NDArray, n: int = 3) -> NDArray:
"""
Compute the running mean of a 1-dimenional array.
Args:
data: Input data of shape (N, )
nav: Number of points over which the data will be averaged
Returns:
Array of shape (N-(nav-1), )
"""
return np.convolve(data, np.ones((nav,)) / nav, mode="valid")
def moving_average(A, n=3):
""" """
Compute the running mean of an array. Compute the running mean of an array.
Uses the second axis if it is of higher dimensionality. Uses the second axis if it is of higher dimensionality.
@ -177,27 +161,30 @@ def moving_average(A, n=3):
Array of shape (N-(n-1), ) Array of shape (N-(n-1), )
Supports 2D-Arrays. Supports 2D-Arrays.
Slower than runningmean for small n but faster for large n.
""" """
k1 = int(n / 2) k1 = int(n / 2)
k2 = int((n - 1) / 2) k2 = int((n - 1) / 2)
if k2 == 0: if k2 == 0:
if A.ndim > 1: if data.ndim > 1:
return uniform_filter1d(A, n)[:, k1:] return uniform_filter1d(data, n)[:, k1:]
return uniform_filter1d(A, n)[k1:] return uniform_filter1d(data, n)[k1:]
if A.ndim > 1: if data.ndim > 1:
return uniform_filter1d(A, n)[:, k1:-k2] return uniform_filter1d(data, n)[:, k1:-k2]
return uniform_filter1d(A, n)[k1:-k2] return uniform_filter1d(data, n)[k1:-k2]
def coherent_sum(func, coord_a, coord_b): def coherent_sum(
func: Callable[[ArrayLike, ArrayLike], float],
coord_a: ArrayLike,
coord_b: ArrayLike,
) -> float:
""" """
Perform a coherent sum over two arrays :math:`A, B`. Perform a coherent sum over two arrays :math:`A, B`.
.. math:: .. math::
\\frac{1}{N_A N_B}\\sum_i\\sum_j f(A_i, B_j) \\frac{1}{N_A N_B}\\sum_i\\sum_j f(A_i, B_j)
For numpy arrays this is equal to:: For numpy arrays, this is equal to::
N, d = x.shape N, d = x.shape
M, d = y.shape M, d = y.shape
@ -206,24 +193,27 @@ def coherent_sum(func, coord_a, coord_b):
Args: Args:
func: The function is called for each two items in both arrays, this should func: The function is called for each two items in both arrays, this should
return a scalar value. return a scalar value.
coord_a, coord_b: The two arrays. coord_a: First array.
coord_b: Second array.
""" """
res = 0
def cohsum(coord_a, coord_b): for i in range(len(coord_a)):
res = 0 for j in range(len(coord_b)):
for i in range(len(coord_a)): res += func(coord_a[i], coord_b[j])
for j in range(len(coord_b)): return res
res += func(coord_a[i], coord_b[j])
return res
return cohsum(coord_a, coord_b)
def coherent_histogram(func, coord_a, coord_b, bins, distinct=False): def coherent_histogram(
func: Callable[[ArrayLike, ArrayLike], float],
coord_a: ArrayLike,
coord_b: ArrayLike,
bins: ArrayLike,
distinct: bool = False,
) -> NDArray:
""" """
Compute a coherent histogram over two arrays, equivalent to coherent_sum. Compute a coherent histogram over two arrays, equivalent to coherent_sum.
For numpy arrays ofthis is equal to:: For numpy arrays, this is equal to::
N, d = x.shape N, d = x.shape
M, d = y.shape M, d = y.shape
@ -235,9 +225,11 @@ def coherent_histogram(func, coord_a, coord_b, bins, distinct=False):
Args: Args:
func: The function is called for each two items in both arrays, this should func: The function is called for each two items in both arrays, this should
return a scalar value. return a scalar value.
coord_a, coord_b: The two arrays. coord_a: First array.
bins: The bins used for the histogram must be distributed regular on a linear coord_b: Second array.
bins: The bins used for the histogram must be distributed regularly on a linear
scale. scale.
distinct: Only calculate distinct part.
""" """
assert np.isclose( assert np.isclose(
@ -248,32 +240,29 @@ def coherent_histogram(func, coord_a, coord_b, bins, distinct=False):
N = len(bins) - 1 N = len(bins) - 1
dh = (hmax - hmin) / N dh = (hmax - hmin) / N
def cohsum(coord_a, coord_b): res = np.zeros((N,))
res = np.zeros((N,)) for i in range(len(coord_a)):
for i in range(len(coord_a)): for j in range(len(coord_b)):
for j in range(len(coord_b)): if not (distinct and i == j):
if not (distinct and i == j): h = func(coord_a[i], coord_b[j])
h = func(coord_a[i], coord_b[j]) if hmin <= h < hmax:
if hmin <= h < hmax: res[int((h - hmin) / dh)] += 1
res[int((h - hmin) / dh)] += 1 return res
return res
return cohsum(coord_a, coord_b)
def Sq_from_gr(r, gr, q, ρ): def Sq_from_gr(r: NDArray, gr: NDArray, q: NDArray, n: float) -> NDArray:
r""" r"""
Compute the static structure factor as fourier transform of the pair correlation Compute the static structure factor as fourier transform of the pair correlation
function. [Yarnell]_ function. [Yarnell]_
.. math:: .. math::
S(q) - 1 = \\frac{4\\pi \\rho}{q}\\int\\limits_0^\\infty (g(r) - 1)\\,r \\sin(qr) dr S(q)-1 = \\frac{4\\pi\\rho}{q}\\int\\limits_0^\\infty (g(r)-1)\\,r \\sin(qr) dr
Args: Args:
r: Radii of the pair correlation function r: Radii of the pair correlation function
gr: Values of the pair correlation function gr: Values of the pair correlation function
q: List of q values q: List of q values
ρ: Average number density n: Average number density
.. [Yarnell] .. [Yarnell]
Yarnell, J. L., Katz, M. J., Wenzel, R. G., & Koenig, S. H. (1973). Physical Yarnell, J. L., Katz, M. J., Wenzel, R. G., & Koenig, S. H. (1973). Physical
@ -282,10 +271,12 @@ def Sq_from_gr(r, gr, q, ρ):
""" """
ydata = ((gr - 1) * r).reshape(-1, 1) * np.sin(r.reshape(-1, 1) * q.reshape(1, -1)) ydata = ((gr - 1) * r).reshape(-1, 1) * np.sin(r.reshape(-1, 1) * q.reshape(1, -1))
return np.trapz(x=r, y=ydata, axis=0) * (4 * np.pi * ρ / q) + 1 return np.trapz(x=r, y=ydata, axis=0) * (4 * np.pi * n / q) + 1
def Fqt_from_Grt(data, q): def Fqt_from_Grt(
data: Union[pd.DataFrame, ArrayLike], q: ArrayLike
) -> Union[pd.DataFrame, tuple[NDArray, NDArray]]:
""" """
Calculate the ISF from the van Hove function for a given q value by fourier Calculate the ISF from the van Hove function for a given q value by fourier
transform. transform.
@ -317,7 +308,7 @@ def Fqt_from_Grt(data, q):
return isf.index, isf.values return isf.index, isf.values
def singledispatchmethod(func): def singledispatchmethod(func: Callable) -> Callable:
""" """
A decorator to define a genric instance method, analogue to A decorator to define a genric instance method, analogue to
functools.singledispatch. functools.singledispatch.
@ -332,22 +323,7 @@ def singledispatchmethod(func):
return wrapper return wrapper
def histogram(data, bins): def quick1etau(t: ArrayLike, C: ArrayLike, n: int = 7) -> float:
"""
Compute the histogram of the given data. Uses numpy.bincount function, if possible.
"""
dbins = np.diff(bins)
dx = dbins.mean()
if bins.min() == 0 and dbins.std() < 1e-6:
logger.debug("Using numpy.bincount for histogramm compuation.")
hist = np.bincount((data // dx).astype(int), minlength=len(dbins))[: len(dbins)]
else:
hist = np.histogram(data, bins=bins)[0]
return hist, runningmean(bins, 2)
def quick1etau(t, C, n=7):
""" """
Estimate the time for a correlation function that goes from 1 to 0 to decay to 1/e. Estimate the time for a correlation function that goes from 1 to 0 to decay to 1/e.
@ -381,15 +357,15 @@ def quick1etau(t, C, n=7):
return tau_est return tau_est
def susceptibility(time, correlation, **kwargs): def susceptibility(
time: NDArray, correlation: NDArray, **kwargs
) -> tuple[NDArray, NDArray]:
""" """
Calculate the susceptibility of a correlation function. Calculate the susceptibility of a correlation function.
Args: Args:
time: Timesteps of the correlation data time: Timesteps of the correlation data
correlation: Value of the correlation function correlation: Value of the correlation function
**kwargs (opt.):
Additional keyword arguments will be passed to :func:`filon_fourier_transformation`.
""" """
frequencies, fourier = filon_fourier_transformation( frequencies, fourier = filon_fourier_transformation(
time, correlation, imag=False, **kwargs time, correlation, imag=False, **kwargs
@ -397,7 +373,7 @@ def susceptibility(time, correlation, **kwargs):
return frequencies, frequencies * fourier return frequencies, frequencies * fourier
def read_gro(file): def read_gro(file: str) -> tuple[pd.DataFrame, NDArray, str]:
with open(file, "r") as f: with open(file, "r") as f:
lines = f.readlines() lines = f.readlines()
description = lines[0].splitlines()[0] description = lines[0].splitlines()[0]
@ -438,7 +414,9 @@ def read_gro(file):
return atoms_DF, box, description return atoms_DF, box, description
def write_gro(file, atoms_DF, box, description): def write_gro(
file: str, atoms_DF: pd.DataFrame, box: NDArray, description: str
) -> None:
with open(file, "w") as f: with open(file, "w") as f:
f.write(f"{description} \n") f.write(f"{description} \n")
f.write(f"{len(atoms_DF)}\n") f.write(f"{len(atoms_DF)}\n")
@ -456,7 +434,7 @@ def write_gro(file, atoms_DF, box, description):
) )
def fibonacci_sphere(samples=1000): def fibonacci_sphere(samples: int = 1000) -> NDArray:
points = [] points = []
phi = np.pi * (np.sqrt(5.0) - 1.0) # golden angle in radians phi = np.pi * (np.sqrt(5.0) - 1.0) # golden angle in radians
@ -471,7 +449,7 @@ def fibonacci_sphere(samples=1000):
return np.array(points) return np.array(points)
def timing(function): def timing(function: Callable) -> Callable:
@functools.wraps(function) @functools.wraps(function)
def wrap(*args, **kw): def wrap(*args, **kw):
start_time = pytime() start_time = pytime()
@ -483,7 +461,8 @@ def timing(function):
return wrap return wrap
def cleanup_h5(hdf5_file) -> None:
def cleanup_h5(hdf5_file: str) -> None:
hdf5_temp_file = f"{hdf5_file[:-3]}_temp.h5" hdf5_temp_file = f"{hdf5_file[:-3]}_temp.h5"
run( run(
[ [
@ -496,4 +475,4 @@ def cleanup_h5(hdf5_file) -> None:
hdf5_temp_file, hdf5_temp_file,
] ]
) )
run(["mv", hdf5_temp_file, hdf5_file]) run(["mv", hdf5_temp_file, hdf5_file])