import argparse
import getpass
import os
import pickle
from collections.abc import Iterable
from contextlib import contextmanager
from logging import info

import pandas as pd
import sqlalchemy as sql
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.pool import NullPool

from .utils import numlike, number_shorthand, lazy_eq
from . import config

Base = declarative_base()
# NullPool opens and closes a DB-API connection for every checkout instead of
# keeping connections pooled.
Engine = sql.create_engine(config['store']['db_file'], poolclass=NullPool)
Session = sql.orm.sessionmaker(bind=Engine)


@contextmanager
def _session_scope(session=None):
    """
    Provide a session for the duration of a ``with`` block.

    If *session* is given it is yielded unchanged and left open, since the
    caller owns it. Otherwise a new :data:`Session` is created and closed when
    the block exits, even if the block raises.
    """
    if session is not None:
        yield session
        return
    session = Session()
    try:
        yield session
    finally:
        session.close()


def init_db():
    """No-op placeholder; the tables are created when this module is imported."""


class FloatAttribute(Base):
    """A numeric name/value parameter of a :class:`Simulation`."""
    __tablename__ = 'floatattributes'
    id = sql.Column(sql.Integer, primary_key=True)
    name = sql.Column(sql.String)
    value = sql.Column(sql.Float)
    simulation_id = sql.Column(sql.Integer, sql.ForeignKey('simulations.id'))

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __repr__(self):
        return '<{classname}({name}={value})>'.format(
            classname=self.__class__.__name__, name=self.name, value=self.value
        )


class StringAttribute(Base):
    """A non-numeric name/value parameter of a :class:`Simulation`."""
    __tablename__ = 'stringattributes'
    id = sql.Column(sql.Integer, primary_key=True)
    name = sql.Column(sql.String)
    value = sql.Column(sql.String)
    simulation_id = sql.Column(sql.Integer, sql.ForeignKey('simulations.id'))

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __repr__(self):
        return '<{classname}({name}={value})>'.format(
            classname=self.__class__.__name__, name=self.name, value=self.value
        )


class Parameter(Base):
    """A numeric name/value parameter of an :class:`Evaluation`."""
    __tablename__ = 'parameters'
    id = sql.Column(sql.Integer, primary_key=True)
    name = sql.Column(sql.String)
    value = sql.Column(sql.Float)
    evaluation_id = sql.Column(sql.Integer, sql.ForeignKey('evaluations.id'))

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __repr__(self):
        return '<{classname}({name}={value})>'.format(
            classname=self.__class__.__name__, name=self.name, value=self.value
        )


class Simulation(Base):
    """A simulation record: directory, user, temperature and extra parameters."""
    __tablename__ = 'simulations'
    id = sql.Column(sql.Integer, primary_key=True)
    directory = sql.Column(sql.String)
    user = sql.Column(sql.String)
    temperature = sql.Column(sql.Float)
    # Extra parameters, split by type: number-like values are stored as
    # FloatAttribute, everything else as StringAttribute (see add_parameter).
    float_params = sql.orm.relationship(FloatAttribute)
    string_params = sql.orm.relationship(StringAttribute)
    evaluations = sql.orm.relationship(
        'Evaluation', order_by='Evaluation.id', back_populates='simulation'
    )

    def add_attribute(self, name, value):
        """Attach a numeric attribute *name* = *value* to the simulation."""
        # There is no ``attributes`` relationship; float attributes are kept
        # in ``float_params``.
        self.float_params.append(FloatAttribute(name, value))

    def add_parameter(self, name, value):
        """Attach a parameter, stored as float if ``numlike(value)`` else as string."""
        if numlike(value):
            self.float_params.append(FloatAttribute(name, value))
        else:
            self.string_params.append(StringAttribute(name, value))

    def copy(self):
        """Return a new, unsaved Simulation with the same values and parameters."""
        params = {p.name: p.value for p in self.float_params + self.string_params}
        return Simulation(self.directory, self.user, self.temperature, **params)

    def delete(self, session=None):
        """
        Mark the simulation, its evaluations and its parameters for deletion.

        Args:
            session (opt.): Session to delete in. Defaults to the session this
                object is attached to, or a new session if it is detached.

        The deletion only takes effect once the session is committed, which is
        the caller's responsibility.
        """
        if session is None:
            session = sql.orm.object_session(self)
        if session is None:
            session = Session()
        # No delete cascade is configured on the relationships, so the
        # children are removed explicitly.
        for e in self.evaluations:
            e.delete(session=session)
        for p in self.float_params + self.string_params:
            session.delete(p)
        session.delete(self)

    def __init__(self, directory, user, temperature, **parameters):
        self.directory = directory or ''
        self.user = user or ''
        self.temperature = temperature
        for name, value in parameters.items():
            self.add_parameter(name, value)

    def __repr__(self):
        attrs = []
        for attr, val in self.__dict__.items():
            if attr[0] != '_' and attr != 'type':
                attrs.append('{}={}'.format(attr, val))
        return '<{classname}({attributes})>'.format(
            classname=self.__class__.__name__, attributes=', '.join(attrs)
        )


# Dataframes can only be unpickled with the same version of Pandas used to
# store them, so they are stored as plain dicts and rebuilt on load.
class PandasPickler:
    """
    Pickler for ``sql.PickleType`` that stores DataFrames as dicts.

    Every dict is converted to a DataFrame on load, so a plain dict stored
    through this pickler comes back as a DataFrame.
    """

    @staticmethod
    def dumps(obj, protocol=None):
        if isinstance(obj, pd.DataFrame):
            obj = obj.to_dict()
        return pickle.dumps(obj, protocol=protocol)

    @staticmethod
    def loads(buf):
        # NOTE(review): pickle.loads executes arbitrary code from the stored
        # bytes; only use this with trusted databases.
        obj = pickle.loads(buf)
        if isinstance(obj, dict):
            obj = pd.DataFrame(obj)
        return obj


class Evaluation(Base):
    """Evaluated data of one observable for a :class:`Simulation`."""
    __tablename__ = 'evaluations'
    id = sql.Column(sql.Integer, primary_key=True)
    simulation_id = sql.Column(sql.Integer, sql.ForeignKey('simulations.id'))
    observable = sql.Column(sql.String)
    selection = sql.Column(sql.String)
    data = sql.Column(sql.PickleType(comparator=lazy_eq, pickler=PandasPickler))
    parameters = sql.orm.relationship('Parameter')
    simulation = sql.orm.relationship('Simulation', back_populates='evaluations')

    @property
    def dataframe(self):
        """
        Return a dataframe representing the evaluated data and meta data of the evaluation.

        The data rows are extended by the columns ``selection``, ``directory``,
        ``user``, ``T`` and one column per evaluation and simulation parameter.
        """
        if self.data is None:
            df = pd.DataFrame()
        elif numlike(self.data):
            df = pd.DataFrame({self.observable: self.data}, index=[0])
        else:
            df = self.data.copy()
        df['selection'] = self.selection
        for p in self.parameters:
            df[p.name] = p.value
        df['directory'] = self.simulation.directory
        df['user'] = self.simulation.user
        df['T'] = self.simulation.temperature
        for a in self.simulation.float_params:
            df[a.name] = a.value
        for a in self.simulation.string_params:
            df[a.name] = a.value
        return df

    def add_parameter(self, name, value):
        """Attach a numeric parameter *name* = *value* to the evaluation."""
        self.parameters.append(Parameter(name, value))

    def copy(self, simulation=None):
        """
        Return a new, unsaved Evaluation with the same data and parameters.

        Args:
            simulation (opt.): Simulation of the copy, defaults to the
                simulation of this evaluation.
        """
        return Evaluation(
            self.observable, simulation or self.simulation, self.data,
            selection=self.selection,
            parameters={p.name: p.value for p in self.parameters},
        )

    def delete(self, session=None):
        """
        Mark the evaluation and its parameters for deletion.

        Args:
            session (opt.): Session to delete in. Defaults to the session this
                object is attached to, or a new session if it is detached.

        The deletion only takes effect once the session is committed, which is
        the caller's responsibility.
        """
        if session is None:
            session = sql.orm.object_session(self)
        if session is None:
            session = Session()
        for p in self.parameters:
            session.delete(p)
        session.delete(self)

    def __init__(self, observable, simulation, data, selection='', parameters=None):
        self.observable = observable
        self.simulation = simulation
        self.data = data
        self.selection = selection
        for name, value in (parameters or {}).items():
            self.add_parameter(name, value)

    def __repr__(self):
        attrs = []
        for attr, val in self.__dict__.items():
            if attr[0] != '_' and attr not in ['type', 'data']:
                attrs.append('{}={}'.format(attr, val))
        return '<{classname}({attributes})>'.format(
            classname=self.__class__.__name__, attributes=', '.join(attrs)
        )


Base.metadata.create_all(Engine)


def list_query(args, comparator):
    """
    OR-combine ``comparator(a)`` for a single string or each item of *args*.

    Args:
        args: A string or an iterable of values.
        comparator: Callable building a criterion, e.g. ``Column.like``.
    """
    if isinstance(args, str):
        args = [args]
    return sql.or_(*[comparator(a) for a in args])


def float_query(value, dbattr):
    """
    Build a criterion comparing the float column *dbattr* with *value*.

    Args:
        value: A number-like value (as judged by ``numlike``) for an exact
            match, a string ``'low-high'`` for an inclusive range, or an
            iterable of values matching any of them.
        dbattr: The column to compare.

    Raises:
        ValueError: If *value* can not be interpreted in one of these ways.
    """
    if numlike(value):
        return dbattr == float(value)
    if isinstance(value, str):
        limits = value.split('-')
        if len(limits) != 2:
            raise ValueError('Could not parse value={}'.format(value))
        try:
            low, high = map(float, limits)
        except ValueError as err:
            raise ValueError('Could not parse value={}'.format(value)) from err
        return sql.and_(low <= dbattr, dbattr <= high)
    if isinstance(value, Iterable):
        return dbattr.in_(value)
    raise ValueError('Could not parse {} as float value.'.format(value))


def query_simulation(directory=None, user=None, T=None, session=None, **attributes):
    """
    Query the database for simulations.

    Args:
        directory: Simulation directory as a string, or a list of the former.
        user: Username or list of usernames.
        T: Temperature to query, can be a number, a list of numbers or a
            string giving a range.
        session (opt.): Session to query in, a new one is created if None.
        **attributes: Any attributes of the simulations as keyword arguments.

    String values are compared with the SQL statement LIKE, which allows the
    use of the wildcard '%'. A list of values matches any of the values.
    Attributes for which ``number_shorthand`` is true are compared as floats
    (see :func:`float_query`), all others as strings with LIKE.

    Example:
        query_simulation(['%/sys1', 'sys2'], T='100-200')
    """
    if session is None:
        session = Session()
    query = session.query(Simulation)
    if directory is not None:
        query = query.filter(list_query(directory, Simulation.directory.like))
    if user is not None:
        query = query.filter(list_query(user, Simulation.user.like))
    if T is not None:
        query = query.filter(float_query(T, Simulation.temperature))
    for name, value in attributes.items():
        # One alias per attribute so each filter joins its own attribute row.
        if number_shorthand(value):
            alias = sql.orm.aliased(FloatAttribute)
            query = query.join(alias).filter(
                sql.and_(alias.name == name, float_query(value, alias.value))
            )
        else:
            alias = sql.orm.aliased(StringAttribute)
            query = query.join(alias).filter(
                sql.and_(alias.name == name, list_query(value, alias.value.like))
            )
    return query


def query_evaluation(observable, simulation, selection=None, session=None, **parameters):
    """
    Query the database for evaluations of the given simulations.

    Args:
        observable: Name of the evaluated observable.
        simulation: A Simulation or an iterable of simulations, e.g. a query
            returned by :func:`query_simulation`.
        selection (opt.): Selection of the evaluation, a string or a list of
            strings compared with SQL LIKE (wildcard '%').
        session (opt.): Session to query in, a new one is created if None.
        **parameters: Evaluation parameters as keyword arguments; each value
            is compared with :func:`float_query`, so it may be a number, a
            range string like '100-200' or a list of numbers.

    Example:
        query_evaluation('isf', query_simulation(T='100-200'), selection='%O%')
    """
    if session is None:
        session = Session()
    if not isinstance(simulation, Iterable):
        simulation = [simulation]
    query = session.query(Evaluation).filter(
        Evaluation.simulation_id.in_([s.id for s in simulation]),
        Evaluation.observable == observable
    )
    if selection is not None:
        query = query.filter(list_query(selection, Evaluation.selection.like))
    for name, value in parameters.items():
        alias = sql.orm.aliased(Parameter)
        query = query.join(alias).filter(
            sql.and_(alias.name == name, float_query(value, alias.value))
        )
    return query


def get(observable, directory=None, user=None, T=None, selection=None,
        evaluation_params=None, simulation_params=None):
    """
    Get evaluation data from the database.

    Args:
        observable: The observable of the evaluation, or a list of observables.
        directory: The simulation directory
        user: User of the simulation
        T: Temperature of the simulation
        selection: Selection of the evaluation
        evaluation_params: A dict of parameters of the evaluation
        simulation_params: A dict of parameters of the simulation

    Returns:
        The concatenated dataframes (see :attr:`Evaluation.dataframe`) of all
        matching evaluations, sorted by directory, selection and temperature,
        or None if no simulation or evaluation matched.
    """
    evaluation_params = {} if evaluation_params is None else evaluation_params
    simulation_params = {} if simulation_params is None else simulation_params
    if isinstance(observable, str):
        observable = [observable]

    with _session_scope() as session:
        sim_query = query_simulation(directory, user=user, T=T, session=session,
                                     **simulation_params)
        if sim_query.count() == 0:
            return None
        results = []
        for obs in observable:
            evals = query_evaluation(obs, sim_query, selection=selection,
                                     session=session, **evaluation_params)
            results.extend(ev.dataframe for ev in evals)

    if not results:
        return None
    df = pd.concat(results, ignore_index=True)
    return df.sort_values(by=['directory', 'selection', 'T'])


def merge(observables, *args, **kwargs):
    """
    Get the merged datasets of several observables.

    Args:
        observables: List of observables
        *args, **kwargs: Additional arguments, passed to :func:`get`

    Returns:
        The datasets merged on their common columns, sorted by directory,
        selection and temperature, or None if any observable has no data.
    """
    if len(observables) > 1:
        left = get(observables[0], *args, **kwargs)
        right = merge(observables[1:], *args, **kwargs)
        if left is None or right is None:
            return None
        df = pd.merge(left, right)
        # get() produces a 'directory' column, there is no 'system' column.
        return df.sort_values(by=['directory', 'selection', 'T'])
    return get(observables[0], *args, **kwargs)


def update(observable, data, simulation=None, directory=None, user=None, T=None,
           selection=None, evaluation_params=None, simulation_params=None):
    """
    Update an existing evaluation record or insert a new one into the database.

    Args:
        observable: The observable of the evaluation
        data: The data of the evaluation
        simulation: The evaluated simulation; if None it is looked up (or
            created) from directory, user, T and simulation_params.
        directory: The simulation directory
        user: User of the simulation
        T: Temperature of the simulation
        selection: Selection of the evaluation
        evaluation_params: A dict of parameters of the evaluation
        simulation_params: A dict of parameters of the simulation

    Raises:
        ValueError: If more than one simulation or evaluation record matches.
    """
    evaluation_params = {} if evaluation_params is None else evaluation_params
    simulation_params = {} if simulation_params is None else simulation_params

    with _session_scope() as session:
        if simulation is None:
            sim_query = query_simulation(directory, user=user, T=T, session=session,
                                         **simulation_params)
            qu_count = sim_query.count()
            if qu_count == 0:
                # The new simulation is saved together with the evaluation.
                simulation = Simulation(directory, user, T, **simulation_params)
            elif qu_count == 1:
                simulation = sim_query.first()
            else:
                raise ValueError(
                    'Found multiple simulation records while updating: {}'.format(sim_query.all())
                )

        eval_query = query_evaluation(observable, simulation, selection=selection,
                                      session=session, **evaluation_params)
        qu_count = eval_query.count()
        if qu_count == 0:
            session.add(Evaluation(observable, simulation, data, selection=selection,
                                   parameters=evaluation_params))
        elif qu_count == 1:
            eval_query.first().data = data
        else:
            raise ValueError('Found more than one record in database while updating.')
        session.commit()


def delete(*args, **kwargs):
    """
    Delete evaluations from the database.

    All arguments are passed to :func:`query_evaluation`; every matching
    evaluation is deleted together with its parameters and the deletion is
    committed. A ``session`` given as keyword is used and left open.
    """
    with _session_scope(kwargs.pop('session', None)) as session:
        # Query and delete in the same session; objects can not be deleted
        # through a session they are not attached to.
        for item in query_evaluation(*args, session=session, **kwargs).all():
            item.delete(session=session)
        session.commit()


def observables(directory=None, user=None, T=None, **parameters):
    """
    List all observables evaluated for the simulations matching the arguments.

    The arguments are passed to :func:`query_simulation`; without arguments
    the observables of all simulations are returned.
    """
    with _session_scope(parameters.pop('session', None)) as session:
        query = query_simulation(directory, user=user, T=T, session=session, **parameters)
        return {ev.observable for sim in query for ev in sim.evaluations}


def systems(observable=None, user=None):
    """
    List all simulation directories, if observable is not None only those with this observable.
    """
    with _session_scope() as session:
        query = query_simulation(user=user, session=session)
        if observable is not None:
            query = query.join(Evaluation).filter(Evaluation.observable == observable)
        return {sim.directory for sim in query}


def clean_leaves():
    """Delete attribute and parameter rows whose foreign key is NULL."""
    with _session_scope() as session:
        for cls in (StringAttribute, FloatAttribute):
            session.query(cls).filter(cls.simulation_id.is_(None)).delete()
        session.query(Parameter).filter(Parameter.evaluation_id.is_(None)).delete()
        # TODO: Delete evaluation leaves, requires joint query with parameter
        session.commit()


def dump_db(dbfile, user=None, **kwargs):
    """
    Dump the database into a new database.

    Args:
        dbfile: Path of the new SQLite database file, or a full SQLAlchemy
            database URL (anything containing '://').
        user (opt.): If None, the database of the current user is dumped.
        **kwargs: Any keyword arguments are passed to query_simulation
    """
    if user is None:
        try:
            user = os.getlogin()
        except OSError:
            # os.getlogin() fails without a controlling terminal (cron jobs,
            # containers); fall back to the environment / password database.
            user = getpass.getuser()
    if '://' not in dbfile:
        dbfile = 'sqlite:///' + dbfile
    dump_engine = sql.create_engine(dbfile)
    Base.metadata.create_all(dump_engine)
    dump_session = sql.orm.sessionmaker(bind=dump_engine)()
    try:
        with _session_scope(kwargs.pop('session', None)) as session:
            for sim in query_simulation(user=user, session=session, **kwargs):
                dump_sim = sim.copy()
                # Add the simulation itself, so simulations without any
                # evaluation are dumped as well.
                dump_session.add(dump_sim)
                for ev in sim.evaluations:
                    dump_session.add(ev.copy(simulation=dump_sim))
            dump_session.commit()
    finally:
        dump_session.close()
        dump_engine.dispose()


def dump_cli(args=None):
    """
    Command line interface to dump the database to a file.

    Args:
        args (opt.): Argument list to parse, defaults to ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description='CLI tool to create a databse dump.')
    parser.add_argument('dbfile', help='The database file for the dump.')
    parser.add_argument(
        '--user', '-u',
        help='The user for which the database is dumped, default is the current user.'
    )
    args = parser.parse_args(args)
    dump_db(args.dbfile, user=args.user)


def delete_dirs(dirpattern):
    """
    Delete all simulations whose directory matches *dirpattern*.

    Args:
        dirpattern: SQL LIKE pattern (or list of patterns) of the directory.

    The simulations are deleted with their evaluations and parameters and the
    deletion is committed.
    """
    with _session_scope() as session:
        for sim in query_simulation(directory=dirpattern, session=session).all():
            sim.delete(session=session)
        session.commit()