Source code for lsst.sims.ocs.database.socs_db

from __future__ import absolute_import
from builtins import object
import collections
from datetime import datetime
import logging
import numpy
import os
from sqlalchemy import create_engine, desc, exc, MetaData

from lsst.sims.ocs.setup import LoggingLevel
from . import tables
from lsst.sims.ocs.utilities import expand_path, get_hostname, get_user, get_version
from lsst.sims.ocs.utilities.socs_exceptions import SocsDatabaseError

__all__ = ["SocsDatabase"]

class SocsDatabase(object):
    """Main class for simulation database interaction.

    This class is responsible for interacting with the main SQLite simulation database.
    This consists of a machine session tracking database and individual database files
    for each run session of the simulation.

    Attributes
    ----------
    db_dialect : str
        The flavor of the database to use. Options: sqlite.
    metadata : sqlalchemy.MetaData
        The instance for holding the relevant tables.
    engine : sqlalchemy.engine.Engine
        The instance of the database engine.
    sqlite_save_path : str
        A path to save all resulting database files for SQLite.
    sqlite_session_save_path : str
        A path to save the SQLite session tracking database.
    session_engine : sqlalchemy.engine.Engine
        The session specific instance of the database engine. SQLite only.
    session_metadata : sqlalchemy.MetaData
        The instance for holding the session specific tables. SQLite only.
    session_start : int
        A new starting session Id for counting new simulations.
    """

    def __init__(self, sqlite_save_path=None, session_id_start=None, sqlite_session_save_path=None):
        """Initialize the class.

        Parameters
        ----------
        sqlite_save_path : str
            A path to save all resulting database files for SQLite.
        session_id_start : int
            A new starting session Id for counting new simulations.
        sqlite_session_save_path : str
            A path to save the SQLite session tracking database.
        """
        self.log = logging.getLogger("database.SocsDatabase")
        self.db_dialect = "sqlite"
        self.session_id = -1
        self.session_start = session_id_start if session_id_start is not None else 2000
        self.metadata = MetaData()
        self.engine = None
        self.sqlite_save_path = sqlite_save_path
        self.sqlite_session_save_path = sqlite_session_save_path

        # Parameters for SQLite operations
        self.session_engine = None
        self.session_metadata = MetaData()

        self.session_tracking = tables.create_session(self.metadata, autoincrement=False)
        sqlite_session_tracking_db = "{}_sessions.db".format(get_hostname())
        self.engine = self._make_engine(sqlite_session_tracking_db, self.sqlite_session_save_path)

        # Parameter for holding data lists
        self.data_list = collections.defaultdict(list)

    @property
    def data_empty(self):
        """bool: Is the internal data list empty?"""
        return len(self.data_list) == 0

    def _create_tables(self, metadata=None, use_autoincrement=True, session_id_start=2000):
        """Create all the relevant tables.

        Parameters
        ----------
        metadata : sqlalchemy.MetaData
            The instance for holding the relevant tables.
        use_autoincrement : bool
            A flag to set auto increment behavior on the Session table.
        session_id_start : int
            A new starting session Id for counting new simulations.
        """
        if metadata is None:
            metadata = self.metadata

        self.session = tables.create_session(metadata, use_autoincrement, session_id_start)
        self.field = tables.create_field(metadata)
        self.target_history = tables.create_target_history(metadata)
        self.observation_history = tables.create_observation_history(metadata)
        self.slew_history = tables.create_slew_history(metadata)
        self.slew_initial_state = tables.create_slew_initial_state(metadata)
        self.slew_final_state = tables.create_slew_final_state(metadata)
        self.slew_activities = tables.create_slew_activities(metadata)
        self.slew_maxspeeds = tables.create_slew_maxspeeds(metadata)
        self.target_exposures = tables.create_target_exposures(metadata)
        self.observation_exposures = tables.create_observation_exposures(metadata)
        self.scheduled_downtime = tables.create_scheduled_downtime(metadata)
        self.unscheduled_downtime = tables.create_unscheduled_downtime(metadata)
        self.proposal = tables.create_proposal(metadata)
        self.proposal_field = tables.create_proposal_field(metadata)
        self.observation_proposal_history = tables.create_observation_proposal_history(metadata)
        self.target_proposal_history = tables.create_target_proposal_history(metadata)
        self.config = tables.create_config(metadata)
        self.summary_all_props = tables.create_summary_all_props(metadata, self.observation_history,
                                                                 self.slew_history,
                                                                 self.slew_initial_state,
                                                                 self.proposal,
                                                                 self.observation_proposal_history,
                                                                 self.field)

    def _make_engine(self, sqlite_db=None, alternate_save_path=None):
        """Create the engine for database interactions.

        Parameters
        ----------
        sqlite_db : str
            The name of the database file for SQLite.
        alternate_save_path : str, optional
            Specify an alternate path to save the database.
        """
        save_path = None
        if self.sqlite_save_path is not None:
            save_path = expand_path(self.sqlite_save_path)
        if alternate_save_path is not None:
            save_path = expand_path(alternate_save_path)
        if save_path is not None:
            sqlite_db = os.path.join(save_path, sqlite_db)
        return create_engine("sqlite:///{}".format(sqlite_db))
    def create_db(self):
        """Create the database tables.

        This function does the actual work of creating all the relevant database tables.
        This creates the session tracking database with the Session table.
        """
        self.metadata.create_all(self.engine)
    def new_session(self, run_comment):
        """Log a new session to the database and return the ID.

        This function logs a new session to the database and returns the session ID. This writes
        an entry to the session tracking database. Then a session ID specific database is created
        and the information is replicated in that Session table. Since SQLite auto increment
        values always start at one, an offset is applied to make the session IDs commensurate
        with OpSim3 style ones.

        Parameters
        ----------
        run_comment : str
            The startup comment for the simulation run.

        Returns
        -------
        int
            The session ID for this simulation run.
        """
        hostname = get_hostname()
        user = get_user()
        version = get_version()
        date = datetime.utcnow()

        # Get the session ID from the tracking file unless it was just created.
        conn = self.engine.connect()
        select = self.session_tracking.select().order_by(desc(self.session_tracking.c.sessionId)).limit(1)
        result = conn.execute(select)
        row = result.fetchone()
        try:
            self.session_id = int(row[0]) + 1
        except TypeError:
            self.session_id = self.session_start

        insert = self.session_tracking.insert()
        result = conn.execute(insert, sessionId=self.session_id, sessionUser=user, sessionHost=hostname,
                              sessionDate=date, version=version, runComment=run_comment)

        # Create the database for the given session ID.
        sqlite_session_db = "{}_{}.db".format(get_hostname(), self.session_id)
        self.session_engine = self._make_engine(sqlite_session_db)
        self._create_tables(self.session_metadata, use_autoincrement=False)
        self.session_metadata.create_all(self.session_engine)

        insert = self.session.insert()
        conn = self.session_engine.connect()
        result = conn.execute(insert, sessionId=self.session_id, sessionUser=user, sessionHost=hostname,
                              sessionDate=date, version=version, runComment=run_comment)

        return self.session_id
    def append_data(self, table_name, table_data):
        """Collect information for the provided table.

        Parameters
        ----------
        table_name : str
            The attribute name holding the sqlalchemy.Table instance.
        table_data : topic
            The Scheduler topic data instance.
        """
        write_func = getattr(tables, "write_{}".format(table_name))
        result = write_func(table_data, self.session_id)
        self.data_list[table_name].append(result)
    def clear_data(self):
        """Clear all stored data lists."""
        self.data_list.clear()
        self.log.log(LoggingLevel.EXTENSIVE.value, "After clearing: {}".format(self.data_list))
    def _get_conn(self):
        """Get the DB connection.

        Returns
        -------
        sqlalchemy.engine.Connection
            The DB connection for the associated type.
        """
        e = self.session_engine
        return e.connect()
    def write(self):
        """Write collected information into the database."""
        conn = self._get_conn()
        db_errors = []
        for table_name, table_data in self.data_list.items():
            try:
                self.log.log(LoggingLevel.EXTENSIVE.value, "Writing {} data into DB.".format(table_name))
                self.log.log(LoggingLevel.EXTENSIVE.value, "Length of data: {}".format(len(table_data)))
                tbl = getattr(self, table_name)
                conn.execute(tbl.insert(), table_data)
            except exc.IntegrityError as err:
                self.log.error("Database insertion failed for {}!".format(table_name))
                # Collect the failed rows into per-column arrays so they can be
                # dumped to a NumPy archive for later inspection.
                output = collections.defaultdict(list)
                for values in table_data:
                    for k, v in values.items():
                        output[k].append(v)
                for k, v in output.items():
                    output[k] = numpy.array(v)
                filename = "{}_{}.npz".format(table_name, self.session_id)
                numpy.savez(filename, **output)
                self.log.error("Dumping information into {}".format(filename))
                db_errors.append(str(err))
        if len(db_errors):
            raise SocsDatabaseError(os.linesep.join(db_errors))
    def write_table(self, table_name, table_data):
        """Write the provided data directly into the given table.

        Parameters
        ----------
        table_name : str
            The attribute name holding the sqlalchemy.Table instance.
        table_data : list[topic]
            A set of Scheduler topic data instances.
        """
        conn = self._get_conn()
        tbl = getattr(self, table_name)
        conn.execute(tbl.insert(), table_data)
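# The block below is a minimal usage sketch, not part of the original module.
# It assumes the lsst.sims.ocs packages imported above are available and that a
# Scheduler topic instance (here called `target_topic`) exists; the table name
# "target_history" matches one of the tables created in _create_tables above.
if __name__ == "__main__":
    db = SocsDatabase(sqlite_save_path="output")
    db.create_db()  # create the session tracking database
    session_id = db.new_session("example run")  # create the per-session database

    # Collect Scheduler topic data in memory, then flush it in one call.
    # db.append_data("target_history", target_topic)
    # if not db.data_empty:
    #     db.write()
    #     db.clear_data()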