Files
python-store/store/eval.py
2022-04-11 15:38:09 +02:00

404 lines
14 KiB
Python
Executable File

import os
import filecmp
import re
from tempfile import NamedTemporaryFile
import argparse
from psutil import virtual_memory, cpu_count
import inspect
import yaml
from glob import glob
from multiprocessing.pool import Pool
import traceback
from datetime import datetime, timedelta
import functools
from enum import Enum
import time
from pathlib import Path
from sqlalchemy.exc import OperationalError
import pydoc
import numpy as np
import mdevaluate as md
from mdevaluate.logging import logger
from . import store
from . import config
def locate(fname, namespace=''):
    """Locate an evaluation function by dotted name.

    The name is first looked up below *namespace*; if nothing (truthy) is
    found there, the default ``store.analyse`` module is searched.
    """
    candidates = (namespace + '.' + fname, 'store.analyse.' + fname)
    found = None
    for path in candidates:
        found = pydoc.locate(path)
        if found:
            break
    return found
def open_sim(directory, maxcache=None):
    """Open the trajectory of a simulation directory.

    A pre-computed ``nojump.xtc`` in *directory* is preferred; otherwise the
    raw ``out/*.xtc`` trajectory is opened with on-the-fly nojump treatment.

    Raises:
        FileNotFoundError: If no trajectory could be opened.
    """
    has_nojump = len(glob(os.path.join(directory, 'nojump.xtc'))) == 1
    if has_nojump:
        trajectory = md.open(directory, trajectory='nojump.xtc', cached=maxcache, reindex=True)
    else:
        trajectory = md.open(directory, trajectory='out/*.xtc', cached=maxcache, reindex=True, nojump=True)
    if trajectory is None:
        raise FileNotFoundError('Can not open trajectory.')
    return trajectory
def open_energy(directory):
    """Open the energy files (``out/*.edr``) of a simulation directory."""
    edr_pattern = os.path.join(directory, 'out/*.edr')
    return md.open_energy(edr_pattern)
def dataframe_to_txt(fname, df):
    """Save a dataframe to textfile.

    The column names are written as a single space-separated header line,
    followed by the raw values of the dataframe.
    """
    np.savetxt(fname, df.values, header=' '.join(df.columns))
class RunState:
    """State of a single evaluation run, rendered as a colored terminal glyph."""

    # State constants; RunState('OK') resolves the name via getattr.
    OK = 0
    ERROR = 1
    UNKNOWN = 2

    def __init__(self, st, timing=None):
        """
        Args:
            st: State name ('OK', 'ERROR', 'UNKNOWN') or one of the
                numeric state constants.
            timing: Optional duration displayed next to a successful run.

        Raises:
            AttributeError: If *st* is a string that is not a state name.
        """
        if isinstance(st, str):
            self.state = getattr(self, st)
        else:
            self.state = st
        self.timing = timing

    def __str__(self):
        if self.state == self.OK:
            s = '\x1b[0;32m\u2713\x1b[0m'  # green check mark
            if self.timing is not None:
                s += '({})'.format(self.timing)
            return s
        elif self.state == self.ERROR:
            return '\x1b[0;31m\u2717\x1b[0m'  # red cross
        # BUG FIX: unrecognized states previously fell through and returned
        # None, which made str() raise TypeError. Treat anything that is not
        # OK/ERROR (including UNKNOWN) as unknown.
        return '\x1b[0;33m?\x1b[0m'  # yellow question mark
@functools.total_ordering
class Report:
    """Collects run states and errors per analysed system.

    Reports are compared by their error count (``total_ordering`` derives the
    remaining comparisons from ``__lt__`` and ``__eq__``).
    """

    def __init__(self):
        # Maps system name -> {'runs': [(name, RunState), ...], 'errors': [...]}.
        self.systems = {}
        self.current_system = None
        self.err_count = 0

    def system(self, sys):
        """Select *sys* as the current system, creating an empty entry if new."""
        self.systems.setdefault(sys, {'runs': [], 'errors': []})
        self.current_system = sys

    @property
    def current(self):
        """The entry of the currently selected system."""
        return self.systems[self.current_system]

    @property
    def nerrors(self):
        """Total number of collected errors over all systems."""
        return sum(len(entry['errors']) for entry in self.systems.values())

    def apply(self, func, *args, err_count=1, **kwargs):
        """Call *func* and record its outcome.

        Returns the function's result, or None when it raised (the exception
        is recorded, not re-raised).
        """
        if isinstance(func, functools.partial):
            fname = func.func.__name__
        else:
            fname = func.__name__
        try:
            start = datetime.now()
            result = func(*args, **kwargs)
            elapsed = timedelta(seconds=round((datetime.now() - start).total_seconds()))
        except Exception as exc:
            self.runerror(fname)
            self.error(exc, err_count=err_count)
        else:
            self.runok(fname, timing=elapsed)
            return result

    def runok(self, name, timing=None):
        """Record a successful run, optionally with its duration."""
        self.current['runs'].append((name, RunState('OK', timing)))

    def runerror(self, name):
        """Record a failed run."""
        self.current['runs'].append((name, RunState('ERROR')))

    def unknown(self, name):
        """Record a run whose function could not be located."""
        self.current['runs'].append((name, RunState('UNKNOWN')))

    def error(self, e, err_count=1):
        """Record an exception; print the traceback unless err_count is 0."""
        if err_count > 0:
            traceback.print_exc()
        self.current['errors'].append(e)
        self.err_count += err_count

    def __repr__(self):
        parts = []
        for name, entry in self.systems.items():
            parts.append('--= {} =--\n'.format(name))
            for run, state in entry['runs']:
                parts.append('{}:{} '.format(run, state))
            parts.append('\n')
            for err in entry['errors']:
                parts.append(str(err) + '\n')
            parts.append('\n')
        return ''.join(parts)

    def __eq__(self, other):
        return self.systems == other.systems

    def __lt__(self, other):
        return self.err_count < other.err_count
class Loader(yaml.Loader):
    """YAML loader with two custom tags: ``!include`` and ``!calc``."""
    def include(self, node):
        # Resolve the included path relative to the current working directory
        # (NOT relative to the including file) and load it with this Loader,
        # so nested !include/!calc tags keep working.
        fname = os.path.abspath(self.construct_scalar(node))
        with open(fname, 'r') as f:
            return yaml.load(f, Loader)
    def calc(self, node):
        # NOTE(security): evaluates arbitrary Python from the YAML file.
        # Only use with trusted eval.yaml files.
        return eval(self.construct_scalar(node))
# Register the custom tags on the Loader class.
Loader.add_constructor('!include', Loader.include)
Loader.add_constructor('!calc', Loader.calc)
def find_user():
    """Return the login name from the USER or LOGNAME environment variable."""
    user = os.environ.get('USER')
    if not user:
        user = os.environ.get('LOGNAME')
    return user
def run_eval(yamlfile, debug=False, txtout=None, autosavedir=None):
    """
    Read an eval.yaml file and run the specified functions and store the result.

    Args:
        yamlfile: Path to the eval.yaml file describing the evaluations.
        debug: If True, print what would be evaluated instead of running the
            evaluation functions (dry run).
        txtout: Optional output directory for human readable ``*.dat`` files.
        autosavedir: Optional directory for mdevaluate autosave files.

    Returns:
        A Report collecting the run states and errors of all evaluations.
    """
    report = Report()
    with open(yamlfile) as f:
        yaml_dict = yaml.load(f.read(), Loader)
    directory = os.path.dirname(yamlfile)
    user = find_user()
    namespace = yaml_dict.pop('namespace', '')
    report.system(directory)
    # Per-file overrides of the global evaluation configuration.
    for k, v in yaml_dict.pop('config', {}).items():
        config['eval'][k] = str(v)
    temperature = yaml_dict['simulation_params'].pop('T', None)
    if temperature is None:
        # Fall back to a temperature encoded in the directory name, e.g. "…/240K/…".
        m = re.match(r'.*[^\d](\d+)K.*', directory)
        if m is not None:
            temperature = int(m.group(1))
    if debug:
        print('Evaluation for', directory)
        print('User:', user)
        print('T:', temperature)
    # dont overdo caching, this allows several parallel analysis with caching
    MAXCACHE = int(min(3000, (virtual_memory().total / 1024**2 / 5 / cpu_count() / 0.4) // 10 * 10))
    if autosavedir is not None:
        md.autosave.enable(autosavedir, verbose=True)
    if 'trajectory-open' in yaml_dict:
        fopen = locate(yaml_dict['trajectory-open'], namespace)
        if fopen is None:
            raise ValueError("Trajectory loader couldn't be located: {}".format(yaml_dict['trajectory-open']))
    else:
        fopen = open_sim
    traj = report.apply(fopen, directory, maxcache=config['eval'].getint('maxcache', MAXCACHE))
    if 'energy-open' in yaml_dict:
        eopen = locate(yaml_dict['energy-open'], namespace)
        eopen_err_count = 1
        # BUG FIX: this previously tested `fopen` instead of `eopen`, so a
        # misspelled energy loader was silently accepted.
        if eopen is None:
            raise ValueError("Energy loader couldn't be located: {}".format(yaml_dict['energy-open']))
    else:
        eopen = open_energy
        # Missing energy files are not an error for the default loader.
        eopen_err_count = 0
    energyfile = report.apply(eopen, directory, err_count=eopen_err_count)
    if config.getboolean('eval', 'require_trajectory') and traj is None:
        return report
    if traj is None and energyfile is None:
        return report
    logger.info('Running evaluations for: %s', directory)
    for evaluation in yaml_dict['evaluations']:
        subset = evaluation['subset']
        selection = subset.pop('selection')
        if 'coordinates-map' in subset:
            cmap = subset.pop('coordinates-map')
            crds_map = locate(cmap, namespace)
            if crds_map is None:
                report.unknown(cmap)
                continue
            subtraj = crds_map(traj.subset(**subset)) if traj is not None else None
        else:
            subtraj = traj.subset(**subset) if traj is not None else None
        # BUG FIX: subtraj is None for energy-only evaluations (traj is None
        # but an energy file was found); setting the selection then crashed.
        if subtraj is not None:
            subtraj.selection = selection
        other = evaluation.pop('other', None)
        if other is not None and traj is not None:
            selection += '-' + other.pop('selection')
            if 'coordinates-map' in other:
                cmap = other.pop('coordinates-map')
                crds_map = locate(cmap, namespace)
                if crds_map is None:
                    report.unknown(cmap)
                    continue
                othertraj = crds_map(traj.subset(**other))
            else:
                othertraj = traj.subset(**other)
        else:
            othertraj = None
        functions = evaluation.pop('functions')
        report.system('{}@{}'.format(selection, directory))
        if debug:
            print('*****')
            print('subset:', subset)
            print('selection:', selection)
            print('subtraj:', subtraj)
        for func in functions:
            # Entries are either plain names or single-item dicts {name: params}.
            if isinstance(func, dict):
                if len(func) == 1:
                    (func, params), = func.items()
                else:
                    logger.info('Function definition is unclear: %s', str(func))
                    # BUG FIX: previously fell through with `func` still a dict
                    # and `params` unbound, crashing below; skip the entry.
                    continue
            else:
                params = {}
            if othertraj is not None:
                params['other'] = othertraj
            # Locate the function: First under a specified namespace, then in the default module
            f = locate(namespace + '.' + func) or locate('store.analyse.' + func)
            if f is not None:
                if debug:
                    # Dry run: report the function as OK without executing it.
                    print('---')
                    print('function:', f, func)
                    print('params:', params)
                    report.runok(func)
                else:
                    logger.info('Run function %s', func)
                    # Only pass the data the function actually accepts.
                    func_args = inspect.signature(f).parameters
                    if 'trajectory' in func_args:
                        params['trajectory'] = subtraj
                    if 'energyfile' in func_args:
                        params['energyfile'] = energyfile
                    res = report.apply(f, **params)
                    if not isinstance(res, dict):
                        res = {func: res}
                    # Strip the bulky inputs before storing the parameters.
                    params.pop('trajectory', None)
                    params.pop('other', None)
                    params.pop('energyfile', None)
                    for obs, data in res.items():
                        if txtout is not None:
                            dataframe_to_txt(os.path.join(txtout, '{}.dat'.format(obs)), data)
                        store.update(
                            obs, data, directory=directory, user=user, selection=selection, T=temperature,
                            simulation_params=yaml_dict.get('simulation_params', {}),
                            evaluation_params=params
                        )
            else:
                report.unknown(func)
                logger.info('Function %s was not found. Namespace was: %s', func, namespace)
    return report
def recursive_analysis(basedir, processes=None, debug=False, txtout=None, autosavedir=None):
    """
    Run analysis functions recursively for basedir on several processes.

    Args:
        basedir: Glob pattern of base directories searched for eval.yaml files.
        processes: Number of worker processes. ``False`` runs synchronously in
            this process; ``None`` uses the multiprocessing default.
        debug, txtout, autosavedir: Passed through to :func:`run_eval`.
    """
    logger.info('Starting recursive analysis in directory: {}'.format(basedir))
    reports = []

    def collect_reports(rep):
        # apply_async callback: collect the Report of a finished task.
        reports.append(rep)

    def catch_error(err):
        # apply_async error callback: print the worker's traceback.
        traceback.print_exception(type(err), err, err.__traceback__)

    yaml_files = [str(y) for p in glob(basedir) for y in Path(p).glob('**/eval.yaml')]
    logger.info('Finished walking directories:')
    for y in yaml_files:
        logger.info(y)
    # Evaluation is synchronous only if processes=False.
    if processes is False:
        for y in yaml_files:
            print(y)
            try:
                reports.append(run_eval(y, debug=debug, txtout=txtout, autosavedir=autosavedir))
            except FileNotFoundError:
                print('Skipping evaluation...')
    else:
        with Pool(processes=processes) as pool:
            for y in yaml_files:
                # Stagger submissions so the workers do not all open their
                # trajectories at the same time.
                time.sleep(5)
                pool.apply_async(run_eval, args=(y,), kwds={'debug': debug, 'txtout': txtout, 'autosavedir': autosavedir},
                                 callback=collect_reports, error_callback=catch_error)
            # close() + join() must happen inside the with-block: the context
            # exit calls terminate(), which would kill unfinished workers.
            pool.close()
            pool.join()
    print('#*' * 22)
    print('Finished analysis!: {}'.format(datetime.strftime(datetime.now(), '%c')))
    print('#*' * 22)
    for rep in sorted(reports):
        print(rep)
    if len(yaml_files) != len(reports):
        print('#### Error: {} / {} tasks were reported.'.format(len(reports), len(yaml_files)))
        for y in yaml_files:
            root = os.path.dirname(y)
            if not any(root in str(rep) for rep in reports):
                print('Task not reported: {}'.format(root))
# ===========================================
def cli(args=None):
    """Command line interface: analyse one simulation or a directory tree.

    Args:
        args: Optional list of command line arguments; defaults to sys.argv.
    """
    parser = argparse.ArgumentParser(description='Analyse a certain simulation')
    parser.add_argument('--recursive', '-r', action='store_true', default=False,
                        help='Perform a recursive evaluation of the directory.')
    parser.add_argument('-d', default=None, help='simulation directory; default cwd')
    parser.add_argument('-o', default=None,
                        help='default None; output directory for human readable *.dat, if this is not a path ending with "/" then the last part will be a common prefix')
    parser.add_argument('--autosave', '-a', default=None, help='Autosave directory')
    parser.add_argument('--verbose', '-v', default=False, action='store_true',
                        help='Be verbose, i.e. set logging level to DEBUG, default is INFO')
    parser.add_argument('--processes', '-np', default=None, type=int,
                        help='Number of sub-processes for the recursive evaluation.')
    # BUG FIX: the args parameter was previously ignored (parse_args() always
    # read sys.argv), so cli() could not be driven programmatically.
    args = parser.parse_args(args)
    if args.verbose:
        md.logging.setlevel('DEBUG')
    simdir = args.d if args.d is not None else os.getcwd()
    simdir = os.path.abspath(simdir)
    outdir = args.o
    if outdir is not None:
        outdir = os.path.abspath(outdir)
        # abspath strips a trailing slash; restore it, since it marks
        # "directory" (vs. common-prefix) semantics for -o.
        # (endswith also avoids an IndexError on an empty -o argument.)
        if args.o.endswith('/'):
            outdir = outdir + '/'
    if args.recursive:
        recursive_analysis(simdir, txtout=outdir, autosavedir=args.autosave, processes=args.processes)
    else:
        yamlfile = os.path.join(simdir, 'eval.yaml')
        if os.path.exists(yamlfile):
            rep = run_eval(yamlfile, txtout=outdir, autosavedir=args.autosave)
            print(rep)
        else:
            print('eval.yaml not found, exiting')
            # quit() is only defined when the site module is loaded;
            # raising SystemExit directly has the same effect and always works.
            raise SystemExit