Files
python-store/store/analyse.py
2022-04-11 10:59:06 +02:00

276 lines
10 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from functools import partial
import numpy as np
import pandas as pd
from scipy.optimize import curve_fit
import mdevaluate as md
import store
from .utils import data_frame, traj_slice, set_correlation_defaults, collect_short, excess_entropy, enhanced_bins
def openNojump(path, maxcache=500):
trajectory = md.open(path, trajectory='nojump.xtc', cached=maxcache)
return trajectory
def savetxt_newer(filename, newData):
if not os.path.isfile(filename):
np.savetxt(filename, newData)
return
f = NamedTemporaryFile()
np.savetxt(f.name, newData)
if not filecmp.cmp(filename, f.name):
np.savetxt(filename, newData)
f.close()
return
def isf(trajectory, q, nojump=True, nav=3, outdir=None, **kwargs):
"""
Compute the incoherent intermediate scattering function and and meta analyses.
Meta analyses include variance in time, correlation time and susceptibility.
"""
kwargs = set_correlation_defaults(kwargs)
kwargs['average'] = False
description = 'isf'
if 'description' in kwargs:
description += '_' + kwargs.pop('description')
if nojump:
trajectory = trajectory.nojump
t, S = md.correlation.shifted_correlation(
partial(md.correlation.isf, q=q), trajectory,
description=description, **kwargs)
N = len(trajectory[0])
time = md.utils.runningmean(t[1:], nav)
χ4 = md.utils.runningmean(N * S.var(axis=0)[1:], nav)
S = S.mean(axis=0)
freq, suscept = md.correlation.susceptibility(t, S)
result = {
'isf': data_frame(time=t, cor=S),
'isf_var': data_frame(time=time, var=χ4),
'isf_sus': data_frame(freq=freq, sus=suscept)
}
try:
m = (S < 0.7)
tau = t[np.absolute(S - 1 / np.e).argmin()]
fit, _ = curve_fit(md.functions.kww, t[m], S[m], p0=(1, tau, 1), maxfev=5000)
tau_1e = md.functions.kww_1e(*fit)
result['isf_tau'] = tau_1e
except RuntimeError:
md.logger.info('Could not estimate tau: %s', trajectory)
return result
# wraps around isf, determines automatic
def isfwater(trajectory, storeparam, **kwargs):
kwargs['segments'] = 10000000 // len(trajectory[0])
t, S = md.correlation.shifted_correlation(partial(md.correlation.isf, q=22.7), trajectory,
segments=min(5, kwargs['segments'] // 50), window=0.5, skip=0.1,
average=True, description='rough')
mask = (S < 0.9) & (S > 0.1)
with np.errstate(invalid='ignore'):
fit, _ = curve_fit(md.functions.kww, t[mask], S[mask])
tau_est = md.functions.kww_1e(*fit)
time = trajectory[-1].time - trajectory[0].time
kwargs['window'] = max(min(500 * tau_est / time, 0.5), 0.01)
kwargs['skip'] = max(0.01, min(0.1, 2 * tau_est / time))
return isf(trajectory, storeparam, q=22.7, nojump=False, **kwargs)
def csf(trajectory, q, nav=3, **kwargs):
"""Compute the coherent intermediate scattering function and its variance."""
kwargs = set_correlation_defaults(kwargs)
kwargs['average'] = False
t, S = md.correlation.shifted_correlation(
partial(md.correlation.coherent_scattering_function, q=q),
trajectory.pbc,
**kwargs
)
csf_variance = data_frame(
time=md.utils.runningmean(t, nav),
χ4=md.utils.runningmean(S.var(axis=0), nav)
)
return {'csf': data_frame(time=t, csf=S.mean(axis=0)), 'csf_variance': csf_variance}
def msd(trajectory, **kwargs):
"""Compute the mean squared displacement."""
kwargs = set_correlation_defaults(kwargs)
kwargs.setdefault('average', True)
@collect_short
def calc(trajectory, **kwargs):
t, S = md.correlation.shifted_correlation(
md.correlation.msd,
trajectory.nojump,
**kwargs
)
return {'msd': data_frame(time=t, msd=S)}
res = calc(trajectory, **kwargs)
# Mean diffusivity, calculated over the last half-decade.
t = res['msd'].time.values
S = res['msd'].msd.values
m = t > (t.max() / 3.16)
D = (S[m] / (6 * t[m])).mean()
res['D'] = D
return res
def tetrahedral_order(trajectory, other=None, skip=0.1, nr_averages=500, normed=True, **kwargs):
if other is None:
other = trajectory
bins = np.arange(-3, 1, 0.01)
q = md.utils.runningmean(bins, 2)
sl = traj_slice(len(trajectory), skip, nr_averages)
res = md.distribution.time_average(
partial(md.distribution.tetrahedral_order_distribution, bins=bins),
trajectory.pbc[sl],
coordinates_b=other.pbc[sl],
**kwargs
)
N = len(trajectory[0]) if normed else 1
res /= N
tet_mean = data_frame(**{'<Q>': sum(res * q), 'S_Q': sum(res * np.log(1 - q))})
return {'tetrahedral_order': data_frame(q=q, distribution=res),
'tetrahedral_mean': tet_mean}
def vector(trajectory, atoms=None):
"""
Calcuate the vectors between two types of atoms in the trajectory.
The names of the atoms between which the vectors should be calculated can be either given
as a list of length 2, or the trajectory should contain excactly two atom types.
"""
if atoms is None:
atoms = sorted(list(set(trajectory.atom_subset.atom_names)))
assert len(atoms) == 2, 'Need to specifiy excactly two atom types for vector calculation.'
at_a, at_b = atoms
box = trajectory[0].box.diagonal()
atoms_a = trajectory.atom_subset.atom_names == at_a
atoms_b = trajectory.atom_subset.atom_names == at_b
return md.coordinates.vectors(trajectory, atoms_a=atoms_a, atoms_b=atoms_b, normed=True, box=box)
def water_dipole(trajectory):
"""Create a cordinates map of normed water dipole vectors."""
sel = trajectory.atom_subset.selection
atoms_a = np.where(trajectory.subset(atom_name='OW').atom_subset.selection[sel])[0]
atoms_b = np.where(trajectory.subset(atom_name='HW.').atom_subset.selection[sel])[0]
atoms_b = atoms_b.reshape(len(atoms_a), 2).T
box = trajectory[0].box.diagonal()
vec = md.coordinates.vectors(trajectory, atoms_a=atoms_a, atoms_b=atoms_b, normed=True, box=box)
vec.description = 'water-dipole'
return vec
def water_OH_bonds(trajectory):
"""Create a coordinates map of normed OH vectors for water molecules."""
sel = trajectory.atom_subset.selection
atoms_a = np.where(trajectory.subset(atom_name='OW').atom_subset.selection[sel])[0]
atoms_b = np.where(trajectory.subset(atom_name='HW.').atom_subset.selection[sel])[0]
atoms_a = np.vstack([atoms_a] * 2).T.reshape(atoms_b.shape)
box = trajectory[0].box.diagonal()
vec = md.coordinates.vectors(trajectory, atoms_a=atoms_a, atoms_b=atoms_b, normed=True, box=box)
vec.description = 'water-OH-bonds'
return vec
def oaf(trajectory, order, nav=1, **kwargs):
"""
Calculate the orientational autocorrelation function and meta analyses.
Use in conjunction with a coordinates map, like water_dipole, to map atom coordinates to the
desired vectors. Meta anaylsis include the functions variance in time, correlation time and
the susceptibility. Results are called F{order} (i.e. F1, F2) and accordingly.
Args:
trajectory: The correlated vectore, will be passed automatically in automatic evaluations.
order: Order of the used legendre polynomial, typically 1 or 2.
nav (opt.): Running average bevor calculating variance
**kwargs: Any keyowrd arguments are passed to the time_average function.
"""
kwargs = set_correlation_defaults(kwargs)
fdesc = 'F{}'.format(order)
description = fdesc
if 'description' in kwargs:
description += '_' + kwargs.pop('description')
# Collect short should apply to the basic compuations only,
# additional meta analyses, like tau and susceptibility, are computed from the result.
@collect_short
def calc_oaf(vectors, order, nav=1, **kwargs):
kwargs['average'] = False
t, F = md.correlation.shifted_correlation(
partial(md.correlation.rotational_autocorrelation, order=order),
vectors,
description=description,
**kwargs
)
result = {fdesc: data_frame(time=t, cor=F.mean(axis=0))}
Na = len(vectors[0])
result[fdesc + '_var'] = data_frame(
time=md.utils.runningmean(t, nav), var=md.utils.runningmean(Na * F.var(axis=0), nav)
)
return result
res = calc_oaf(trajectory, order, nav=nav, **kwargs)
t, F = res[fdesc].time.values, res[fdesc].cor.values
if F.min() < 0.3:
res[fdesc + '_tau'] = md.utils.quick1etau(t, F)
freq, sus = md.correlation.susceptibility(t, F)
res[fdesc + '_sus'] = data_frame(freq=freq, sus=sus)
return res
def rdf(trajectory, other=None, skip=0.01, nr_averages=100, q=None, kind=None, **kwargs):
"""
Compute radial pair-distribution function, static structure factor and pair entropy.
The quantities can be calculated for one ore two sets of atoms. The bins of g(r) are
determined automatically, with rmax = L/2 and variable bins with a maximal width of 0.01.
Args:
trajectory: First set of atoms
other (opt.): Second set of atoms.
skip (opt.): Part of the trajectory to skip at the beginning
nr_averages (opt.): How many averages should be taken
q (opt.): Scattering vectors of S(q). Default is 2π/(L/2) < q <2π/0.1, with Δq=0.2.
"""
bins = np.arange(0.0, trajectory[0].box.diagonal().min() / 2, 0.001)
r = md.utils.runningmean(bins, 2)
sl = traj_slice(len(trajectory), skip, nr_averages)
if other is not None:
kwargs['coordinates_b'] = other.pbc[sl]
res = md.distribution.time_average(
partial(md.distribution.rdf, bins=bins, kind=kind),
trajectory.pbc[sl],
**kwargs
)
L = trajectory[0].box.diagonal().min()
if q is None:
q = 2 * np.pi * np.arange(1 / L, 10, 0.5 / L)
Nb = len(other[0]) if other is not None else len(trajectory[0])
ρ = Nb / trajectory[0].volume
Sq = md.utils.Sq_from_gr(r, res, q, ρ)
ρ = len(trajectory[0]) / trajectory[0].volume
S_excess = excess_entropy(r, res, ρ)
return {'rdf': data_frame(r=r, rdf=res), 'structure_factor': data_frame(q=q, Sq=Sq),
'excess_entropy': S_excess}