import filecmp
import os
from functools import partial
from tempfile import NamedTemporaryFile

import numpy as np
import pandas as pd
from scipy.optimize import curve_fit

import mdevaluate as md
import store

from .utils import (data_frame, traj_slice, set_correlation_defaults,
                    collect_short, excess_entropy, enhanced_bins)


def openNojump(path, maxcache=500):
    trajectory = md.open(path, trajectory='nojump.xtc', cached=maxcache)
    return trajectory


def savetxt_newer(filename, newData):
    """Write newData to filename, but only if the content has changed."""
    if not os.path.isfile(filename):
        np.savetxt(filename, newData)
        return
    f = NamedTemporaryFile()
    np.savetxt(f.name, newData)
    if not filecmp.cmp(filename, f.name):
        np.savetxt(filename, newData)
    f.close()


def isf(trajectory, q, nojump=True, nav=3, outdir=None, **kwargs):
    """
    Compute the incoherent intermediate scattering function and meta analyses.

    Meta analyses include variance in time, correlation time and susceptibility.
    """
    kwargs = set_correlation_defaults(kwargs)
    kwargs['average'] = False
    description = 'isf'
    if 'description' in kwargs:
        description += '_' + kwargs.pop('description')
    if nojump:
        trajectory = trajectory.nojump
    t, S = md.correlation.shifted_correlation(
        partial(md.correlation.isf, q=q), trajectory, description=description, **kwargs)
    N = len(trajectory[0])
    time = md.utils.runningmean(t[1:], nav)
    χ4 = md.utils.runningmean(N * S.var(axis=0)[1:], nav)
    S = S.mean(axis=0)
    freq, suscept = md.correlation.susceptibility(t, S)
    result = {
        'isf': data_frame(time=t, cor=S),
        'isf_var': data_frame(time=time, var=χ4),
        'isf_sus': data_frame(freq=freq, sus=suscept)
    }
    try:
        m = (S < 0.7)
        tau = t[np.absolute(S - 1 / np.e).argmin()]
        fit, _ = curve_fit(md.functions.kww, t[m], S[m], p0=(1, tau, 1), maxfev=5000)
        result['isf_tau'] = md.functions.kww_1e(*fit)
    except RuntimeError:
        md.logger.info('Could not estimate tau: %s', trajectory)
    return result


# Wraps around isf; determines the window and skip parameters automatically
# from a rough estimate of the correlation time.
def isfwater(trajectory, storeparam, **kwargs):
    kwargs['segments'] = 10000000 // len(trajectory[0])
    t, S = md.correlation.shifted_correlation(
        partial(md.correlation.isf, q=22.7), trajectory,
        segments=min(5, kwargs['segments'] // 50),
        window=0.5, skip=0.1, average=True, description='rough'
    )
    mask = (S < 0.9) & (S > 0.1)
    with np.errstate(invalid='ignore'):
        fit, _ = curve_fit(md.functions.kww, t[mask], S[mask])
    tau_est = md.functions.kww_1e(*fit)
    time = trajectory[-1].time - trajectory[0].time
    kwargs['window'] = max(min(500 * tau_est / time, 0.5), 0.01)
    kwargs['skip'] = max(0.01, min(0.1, 2 * tau_est / time))
    return isf(trajectory, q=22.7, nojump=False, **kwargs)


def csf(trajectory, q, nav=3, **kwargs):
    """Compute the coherent intermediate scattering function and its variance."""
    kwargs = set_correlation_defaults(kwargs)
    kwargs['average'] = False
    t, S = md.correlation.shifted_correlation(
        partial(md.correlation.coherent_scattering_function, q=q),
        trajectory.pbc, **kwargs
    )
    csf_variance = data_frame(
        time=md.utils.runningmean(t, nav),
        χ4=md.utils.runningmean(S.var(axis=0), nav)
    )
    return {'csf': data_frame(time=t, csf=S.mean(axis=0)), 'csf_variance': csf_variance}
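# Usage sketch (not part of the evaluation API): how the scattering analyses
# above are typically driven. The trajectory path and the 'OW' atom name are
# placeholder assumptions about the data layout; q = 22.7 is the water value
# hard-coded in isfwater() above.
def _example_isf_water(path):
    """Sketch: incoherent scattering analysis for water oxygens."""
    traj = openNojump(path)
    oxygens = traj.subset(atom_name='OW')
    return isf(oxygens, q=22.7)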
def msd(trajectory, **kwargs):
    """Compute the mean squared displacement."""
    kwargs = set_correlation_defaults(kwargs)
    kwargs.setdefault('average', True)

    @collect_short
    def calc(trajectory, **kwargs):
        t, S = md.correlation.shifted_correlation(
            md.correlation.msd, trajectory.nojump, **kwargs
        )
        return {'msd': data_frame(time=t, msd=S)}

    res = calc(trajectory, **kwargs)
    # Mean diffusivity, calculated over the last half-decade of the time axis.
    t = res['msd'].time.values
    S = res['msd'].msd.values
    m = t > (t.max() / 3.16)
    res['D'] = (S[m] / (6 * t[m])).mean()
    return res


def tetrahedral_order(trajectory, other=None, skip=0.1, nr_averages=500, normed=True, **kwargs):
    if other is None:
        other = trajectory
    bins = np.arange(-3, 1, 0.01)
    q = md.utils.runningmean(bins, 2)
    sl = traj_slice(len(trajectory), skip, nr_averages)
    res = md.distribution.time_average(
        partial(md.distribution.tetrahedral_order_distribution, bins=bins),
        trajectory.pbc[sl], coordinates_b=other.pbc[sl], **kwargs
    )
    N = len(trajectory[0]) if normed else 1
    res /= N
    # Mean order parameter Q and tetrahedral entropy S_Q of the distribution.
    tet_mean = data_frame(Q=sum(res * q), S_Q=sum(res * np.log(1 - q)))
    return {'tetrahedral_order': data_frame(q=q, distribution=res),
            'tetrahedral_mean': tet_mean}


def vector(trajectory, atoms=None):
    """
    Calculate the vectors between two types of atoms in the trajectory.

    The names of the atoms between which the vectors should be calculated can
    either be given as a list of length 2, or the trajectory should contain
    exactly two atom types.
    """
    if atoms is None:
        atoms = sorted(list(set(trajectory.atom_subset.atom_names)))
    assert len(atoms) == 2, 'Need to specify exactly two atom types for vector calculation.'
    at_a, at_b = atoms
    box = trajectory[0].box.diagonal()
    atoms_a = trajectory.atom_subset.atom_names == at_a
    atoms_b = trajectory.atom_subset.atom_names == at_b
    return md.coordinates.vectors(trajectory, atoms_a=atoms_a, atoms_b=atoms_b,
                                  normed=True, box=box)


def water_dipole(trajectory):
    """Create a coordinates map of normed water dipole vectors."""
    sel = trajectory.atom_subset.selection
    atoms_a = np.where(trajectory.subset(atom_name='OW').atom_subset.selection[sel])[0]
    atoms_b = np.where(trajectory.subset(atom_name='HW.').atom_subset.selection[sel])[0]
    atoms_b = atoms_b.reshape(len(atoms_a), 2).T
    box = trajectory[0].box.diagonal()
    vec = md.coordinates.vectors(trajectory, atoms_a=atoms_a, atoms_b=atoms_b,
                                 normed=True, box=box)
    vec.description = 'water-dipole'
    return vec


def water_OH_bonds(trajectory):
    """Create a coordinates map of normed OH vectors for water molecules."""
    sel = trajectory.atom_subset.selection
    atoms_a = np.where(trajectory.subset(atom_name='OW').atom_subset.selection[sel])[0]
    atoms_b = np.where(trajectory.subset(atom_name='HW.').atom_subset.selection[sel])[0]
    atoms_a = np.vstack([atoms_a] * 2).T.reshape(atoms_b.shape)
    box = trajectory[0].box.diagonal()
    vec = md.coordinates.vectors(trajectory, atoms_a=atoms_a, atoms_b=atoms_b,
                                 normed=True, box=box)
    vec.description = 'water-OH-bonds'
    return vec
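# Usage sketch connecting the vector maps above with oaf() below; the
# trajectory path is a placeholder, and order=2 yields the F2 correlation
# commonly used for reorientation dynamics.
def _example_water_f2(path):
    """Sketch: F2 orientational autocorrelation of the water dipole."""
    traj = openNojump(path)
    dipoles = water_dipole(traj)
    return oaf(dipoles, order=2)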
def oaf(trajectory, order, nav=1, **kwargs):
    """
    Calculate the orientational autocorrelation function and meta analyses.

    Use in conjunction with a coordinates map, like water_dipole, to map atom
    coordinates to the desired vectors. Meta analyses include the function's
    variance in time, the correlation time and the susceptibility. Results are
    keyed F{order} (i.e. F1, F2), with meta analyses named accordingly.

    Args:
        trajectory: The correlated vectors; passed automatically in automatic
            evaluations.
        order: Order of the Legendre polynomial that is used, typically 1 or 2.
        nav (opt.): Width of the running average applied before calculating
            the variance.
        **kwargs: Any keyword arguments are passed on to the
            shifted_correlation function.
    """
    kwargs = set_correlation_defaults(kwargs)
    fdesc = 'F{}'.format(order)
    description = fdesc
    if 'description' in kwargs:
        description += '_' + kwargs.pop('description')

    # collect_short should apply to the basic computations only; additional
    # meta analyses, like tau and susceptibility, are computed from the result.
    @collect_short
    def calc_oaf(vectors, order, nav=1, **kwargs):
        kwargs['average'] = False
        t, F = md.correlation.shifted_correlation(
            partial(md.correlation.rotational_autocorrelation, order=order),
            vectors, description=description, **kwargs
        )
        result = {fdesc: data_frame(time=t, cor=F.mean(axis=0))}
        Na = len(vectors[0])
        result[fdesc + '_var'] = data_frame(
            time=md.utils.runningmean(t, nav),
            var=md.utils.runningmean(Na * F.var(axis=0), nav)
        )
        return result

    res = calc_oaf(trajectory, order, nav=nav, **kwargs)
    t, F = res[fdesc].time.values, res[fdesc].cor.values
    if F.min() < 0.3:
        res[fdesc + '_tau'] = md.utils.quick1etau(t, F)
    freq, sus = md.correlation.susceptibility(t, F)
    res[fdesc + '_sus'] = data_frame(freq=freq, sus=sus)
    return res


def rdf(trajectory, other=None, skip=0.01, nr_averages=100, q=None, kind=None, **kwargs):
    """
    Compute radial pair-distribution function, static structure factor and pair entropy.

    The quantities can be calculated for one or two sets of atoms. The bins of
    g(r) are determined automatically, with rmax = L/2 and a bin width of 0.001.

    Args:
        trajectory: First set of atoms.
        other (opt.): Second set of atoms.
        skip (opt.): Part of the trajectory to skip at the beginning.
        nr_averages (opt.): How many averages should be taken.
        q (opt.): Scattering vectors of S(q). Default is 2π/L ≤ q < 2π/0.1,
            with Δq = π/L.
        kind (opt.): Passed on to md.distribution.rdf.
    """
    bins = np.arange(0.0, trajectory[0].box.diagonal().min() / 2, 0.001)
    r = md.utils.runningmean(bins, 2)
    sl = traj_slice(len(trajectory), skip, nr_averages)
    if other is not None:
        kwargs['coordinates_b'] = other.pbc[sl]
    res = md.distribution.time_average(
        partial(md.distribution.rdf, bins=bins, kind=kind),
        trajectory.pbc[sl], **kwargs
    )
    L = trajectory[0].box.diagonal().min()
    if q is None:
        q = 2 * np.pi * np.arange(1 / L, 10, 0.5 / L)
    Nb = len(other[0]) if other is not None else len(trajectory[0])
    ρ = Nb / trajectory[0].volume
    Sq = md.utils.Sq_from_gr(r, res, q, ρ)
    ρ = len(trajectory[0]) / trajectory[0].volume
    S_excess = excess_entropy(r, res, ρ)
    return {'rdf': data_frame(r=r, rdf=res),
            'structure_factor': data_frame(q=q, Sq=Sq),
            'excess_entropy': S_excess}
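# Usage sketch for the structural analysis above; the path and the 'OW' atom
# name are placeholders. rdf() returns g(r), S(q) and the two-body excess
# entropy in one call.
def _example_water_structure(path):
    """Sketch: oxygen-oxygen structure of water."""
    traj = openNojump(path)
    oxygens = traj.subset(atom_name='OW')
    return rdf(oxygens)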