Source code for simkit.core.simulations

# -*- coding: utf-8 -*-
"""
This is the Simulation module. The Simulation layer takes of creating output
variables, writing data to disk, iterating over data and calculations at
each interval in the simulation and setting any parameters required to perform
the simulation. It gets all its info from the model, which in turn gets it from
each layer which gets info from the layers' sources.
"""

from __future__ import (
    absolute_import, division, print_function, unicode_literals)
from simkit.core import logging, CommonBase, Registry, UREG, Q_, Parameter
from simkit.core.exceptions import CircularDependencyError, MissingDataError
import json
import errno
import os
import sys
import numpy as np
from queue import Queue  # after pip install future
import functools
from datetime import datetime

LOGGER = logging.getLogger(__name__)


def mkdir_p(path):
    """
    http://stackoverflow.com/questions/600268/mkdir-p-functionality-in-python
    :param path: path to make recursively
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise exc


def id_maker(obj):
    """
    Makes an ID from the object's class name and the datetime now in ISO format.

    :param obj: the class from which to make the ID
    :return: ID
    """
    dtfmt = '%Y%m%d-%H%M%S'
    return '%s-%s' % (obj.__class__.__name__, datetime.now().strftime(dtfmt))


def sim_progress_hook(format_args, display_header=False):
    if isinstance(format_args, basestring):
        format_str = '---------- %s ----------\n'
    else:
        idx = format_args[0]
        fields, values = zip(*format_args[1:])
        format_str = '\r%5d' + ' %10.4g' * len(values)
        if display_header:
            units = (str(v.dimensionality) for v in values)
            units = tuple(['n/d' if u == 'dimensionless' else u
                           for u in units])
            format_args = fields + units + (idx,) + values
            format_units = ('units' + ' %10s' * len(units)) + '\n'
            fmt_header = ('index' + ' %10s' * len(fields)) + '\n'
            format_str = fmt_header + format_units + format_str
        else:
            format_args = (idx,) + values
    sys.stdout.write(format_str % format_args)


def topological_sort(dag):
    """
    topological sort

    :param dag: directed acyclic graph
    :type dag: dict

    .. seealso:: `Topographical Sorting
        <http://en.wikipedia.org/wiki/Topological_sorting>`_,
        `Directed Acyclic Graph (DAG)
        <https://en.wikipedia.org/wiki/Directed_acyclic_graph>`_
    """
    # find all edges of dag
    topsort = [node for node, edge in dag.iteritems() if not edge]
    # loop through nodes until topologically sorted
    while len(topsort) < len(dag):
        num_nodes = len(topsort)  # number of nodes
        # unsorted nodes
        for node in dag.viewkeys() - set(topsort):
            # nodes with no incoming edges
            if set(dag[node]) <= set(topsort):
                topsort.append(node)
                break
        # circular dependencies
        if len(topsort) == num_nodes:
            raise CircularDependencyError(dag.viewkeys() - set(topsort))
    return topsort


[docs]class SimParameter(Parameter): _attrs = ['ID', 'path', 'commands', 'data', 'thresholds', 'interval', 'sim_length', 'display_frequency', 'display_fields', 'write_frequency', 'write_fields']
[docs]class SimRegistry(Registry): """ Registry for simulations. """ #: meta names meta_names = ['commands'] def register(self, sim, *args, **kwargs): """ register simulation and metadata. * ``commands`` - list of methods to callable from model :param sim: new simulation """ kwargs.update(zip(self.meta_names, args)) # call super method, now meta can be passed as args or kwargs. super(SimRegistry, self).register(sim, **kwargs)
[docs]class SimBase(CommonBase): """ Meta class for simulations. """ _path_attr = 'sim_path' _file_attr = 'sim_file' _attributes = 'attrs' _deprecated = 'deprecated' _param_cls = SimParameter def __new__(mcs, name, bases, attr): # use only with Simulation subclasses if not CommonBase.get_parents(bases, SimBase): LOGGER.debug('bases:\n%r', bases) return super(SimBase, mcs).__new__(mcs, name, bases, attr) # set _meta combined from bases attr = mcs.set_meta(bases, attr) # let some attributes in subclasses be override super attributes = attr.pop(mcs._attributes, None) deprecated = attr.pop(mcs._deprecated, None) # set param file full path if simulations path and file specified or # try to set parameters from class attributes except private/magic attr = mcs.set_param_file_or_parameters(attr) # reset subclass attributes if attributes is not None: attr[mcs._attributes] = attributes if deprecated is not None: attr[mcs._deprecated] = deprecated LOGGER.debug('attibutes:\n%r', attr) return super(SimBase, mcs).__new__(mcs, name, bases, attr)
[docs]class Simulation(object): """ A class for simulations. :param simfile: Filename of simulation configuration file. :type simfile: str :param settings: keyword name of simulation parameter to use for settings :type str: Simulation attributes can be passed directly as keyword arguments directly to :class:`~simkit.core.simulations.Simulation` or in a JSON file or as class attributes in a subclass or a combination of all 3 methods. To get a list of :class:`~simkit.core.simulations.Simulation` attributes and defaults get the :attr:`~simkit.core.simulations.Simulation.attrs` attribute. Any additional settings provided as keyword arguments will override settings from file. """ __metaclass__ = SimBase attrs = { 'ID': None, 'path': os.path.join('~', 'SimKit', 'Simulations'), 'commands': ['start', 'pause'], 'data': None, 'thresholds': None, 'interval': 1 * UREG.hour, 'sim_length': 1 * UREG.year, 'display_frequency': 1, 'display_fields': None, 'write_frequency': 8760, 'write_fields': None } deprecated = { 'interval': 'interval_length', 'sim_length': 'simulation_length' } def __init__(self, simfile=None, settings=None, **kwargs): # load simfile if it's an argument if simfile is not None: # read and load JSON parameter map file as "parameters" self.param_file = simfile with open(self.param_file, 'r') as param_file: file_params = json.load(param_file) #: simulation parameters from file self.parameters = {settings: SimParameter(**params) for settings, params in file_params.iteritems()} # if not subclassed and metaclass skipped, then use kwargs if not hasattr(self, 'parameters'): #: parameter file self.param_file = None #: simulation parameters from keyword arguments self.parameters = kwargs else: # use first settings if settings is None: self.settings, self.parameters = self.parameters.items()[0] else: #: name of sim settings used for parameters self.settings = settings self.parameters = self.parameters[settings] # use any keyword arguments instead of parameters self.parameters.update(kwargs) # make pycharm happy - attributes assigned in loop by attrs self.thresholds = {} self.display_frequency = 0 self.display_fields = {} self.write_frequency = 0 self.write_fields = {} # pop deprecated attribute names for k, v in self.deprecated.iteritems(): val = self.parameters['extras'].pop(v, None) # update parameters if deprecated attr used and no new attr if val and k not in self.parameters: self.parameters[k] = val # Attributes for k, v in self.attrs.iteritems(): setattr(self, k, self.parameters.get(k, v)) # member docstrings are in documentation since attrs are generated if self.ID is None: # generate id from object class name and datetime in ISO format self.ID = id_maker(self) if self.path is not None: # expand environment variables, ~ and make absolute path self.path = os.path.expandvars(os.path.expanduser(self.path)) self.path = os.path.abspath(self.path) # convert simulation interval to Pint Quantity if isinstance(self.interval, basestring): self.interval = UREG(self.interval) elif not isinstance(self.interval, Q_): self.interval = self.interval[0] * UREG(str(self.interval[1])) # convert simulation length to Pint Quantity if isinstance(self.sim_length, basestring): self.sim_length = UREG(self.sim_length) elif not isinstance(self.sim_length, Q_): self.sim_length = self.sim_length[0] * UREG(str(self.sim_length[1])) # convert simulation length to interval units to calc total intervals sim_to_interval_units = self.sim_length.to(self.interval.units) #: total number of intervals simulated self.number_intervals = np.ceil(sim_to_interval_units / self.interval) #: interval index, start at zero self.interval_idx = 0 #: pause status self._ispaused = False #: finished status self._iscomplete = False #: initialized status self._isinitialized = False #: order of calculations self.calc_order = [] #: command queue self.cmd_queue = Queue.Queue() #: index iterator self.idx_iter = self.index_iterator() #: data loaded status self._is_data_loaded = False @property def ispaused(self): """ Pause property, read only. True if paused. """ return self._ispaused @property def iscomplete(self): """ Completion property, read only. True if finished. """ return self._iscomplete @property def isinitialized(self): """ Initialization property, read only. True if initialized. """ return self._isinitialized @property def is_data_loaded(self): """ Data loaded property, read only. True if data loaded. """ return self._is_data_loaded
[docs] def check_data(self, data): """ Check if data loaded for all sources in data layer. :param data: data layer from model :type data: :class:`~simkit.core.layer.Data` :return: dictionary of data sources and objects or `None` if not loaded """ data_objs = { data_src: data.objects.get(data_src) for data_src in data.layer } self._is_data_loaded = all(data_objs.values()) return data_objs
[docs] def initialize(self, calc_reg): """ Initialize the simulation. Organize calculations by dependency. :param calc_reg: Calculation registry. :type calc_reg: :class:`~simkit.core.calculation.CalcRegistry` """ self._isinitialized = True # TODO: if calculations are edited, loaded, added, etc. then reset self.calc_order = topological_sort(calc_reg.dependencies)
[docs] def index_iterator(self): """ Generator that resumes from same index, or restarts from sent index. """ idx = 0 # index while idx < self.number_intervals: new_idx = yield idx idx += 1 if new_idx: idx = new_idx - 1
# TODO: change start to run
[docs] def start(self, model, progress_hook=None): """ Start the simulation from time zero. :param model: Model with layers and registries containing parameters :type: :class:`~simkit.core.models.Model` :param progress_hook: A function that receives either a string or a list containing the index followed by tuples of the data or outputs names and values specified by ``write_fields`` in the simfile. :type progress_hook: function The model registries should contain the following layer registries: * :class:`~simkit.core.data_sources.DataRegistry`, * :class:`~simkit.core.formulas.FormulaRegistry`, * :class:`~simkit.core.outputs.OutputRegistry`, * :class:`~simkit.core.calculation.CalcRegistry` """ # check if data loaded data_objs = self.check_data(model.data) if not self.is_data_loaded: raise MissingDataError([ds for ds in data_objs if ds is None]) # get layer registries data_reg = model.registries['data'] formula_reg = model.registries['formulas'] out_reg = model.registries['outputs'] calc_reg = model.registries['calculations'] # initialize if not self.isinitialized: self.initialize(calc_reg) # default progress hook if not progress_hook: progress_hook = functools.partial( sim_progress_hook, display_header=True ) # start, resume or restart if self.ispaused: # if paused, then resume, do not resize outputs again. self._ispaused = False # change pause state progress_hook('resume simulation') elif self.iscomplete: # if complete, then restart, do not resize outputs again. self._iscomplete = False # change pause state progress_hook('restart simulation') self.idx_iter = self.index_iterator() else: # resize outputs # assumes that self.write_frequency is immutable # TODO: allow self.write_frequency to be changed # only resize outputs first time simulation is started # repeat output rows to self.write_frequency # put initial conditions of outputs last so it's copied when # idx == 0 progress_hook('resize outputs') # display progress for k in out_reg: if out_reg.isconstant[k]: continue # repeat rows (axis=0) out_reg[k] = out_reg[k].repeat(self.write_frequency, 0) _initial_value = out_reg.initial_value[k] if not _initial_value: continue if isinstance(_initial_value, basestring): # initial value is from data registry # assign in a scalar to a vector fills in the vector, yes! out_reg[k][-1] = data_reg[_initial_value] else: out_reg[k][-1] = _initial_value * out_reg[k].units progress_hook('start simulation') # check and/or make SimKit_Simulations and simulation ID folders mkdir_p(self.path) sim_id_path = os.path.join(self.path, self.ID) mkdir_p(sim_id_path) # header & units for save files data_fields = self.write_fields.get('data', []) # any data fields out_fields = self.write_fields.get('outputs', []) # any outputs fields save_header = tuple(data_fields + out_fields) # concatenate fields # get units as strings from data & outputs data_units = [str(data_reg[f].dimensionality) for f in data_fields] out_units = [str(out_reg[f].dimensionality) for f in out_fields] save_units = tuple(data_units + out_units) # concatenate units # string format for header & units save_str = ('%s' + ',%s' * (len(save_header) - 1)) + '\n' # format save_header = (save_str * 2) % (save_header + save_units) # header save_header = save_header[:-1] # remove trailing new line # =================== # Static calculations # =================== progress_hook('static calcs') for calc in self.calc_order: if not calc_reg.is_dynamic[calc]: calc_reg.calculator[calc].calculate( calc_reg[calc], formula_reg, data_reg, out_reg ) # ==================== # Dynamic calculations # ==================== progress_hook('dynamic calcs') # TODO: assumes that interval size and indices are same, but should # interpolate for any size interval or indices for idx_tot in self.idx_iter: self.interval_idx = idx_tot # update simulation interval counter idx = idx_tot % self.write_frequency # update properties for k, v in out_reg.isproperty.iteritems(): # set properties from previous interval at night if v: out_reg[k][idx] = out_reg[k][idx - 1] # night if any threshold exceeded if self.thresholds: night = not all(limits[0] < data_reg[data][idx] < limits[1] for data, limits in self.thresholds.iteritems()) else: night = None # daytime or always calculated outputs for calc in self.calc_order: # Determine if calculation is scheduled for this timestep # TODO: add ``start_at`` parameter combined with ``frequency`` freq = calc_reg.frequency[calc] if not freq.dimensionality: is_scheduled = (idx_tot % freq) == 0 else: # Frequency with units of time is_scheduled = ((idx_tot * self.interval) % freq) == 0 is_scheduled = ( is_scheduled and (not night or calc_reg.always_calc[calc]) ) if calc_reg.is_dynamic[calc] and is_scheduled: calc_reg.calculator[calc].calculate( calc_reg[calc], formula_reg, data_reg, out_reg, timestep=self.interval, idx=idx ) # display progress if not (idx % self.display_frequency): progress_hook(self.format_progress(idx, data_reg, out_reg)) # disp_head = False # create an index for the save file, 0 if not saving if not ((idx_tot + 1) % self.write_frequency): savenum = (idx_tot + 1) / self.write_frequency elif idx_tot == self.number_intervals - 1: # save file index should be integer! savenum = int(np.ceil((idx_tot + 1) / float(self.write_frequency))) else: savenum = 0 # not saving this iteration # save file to disk if savenum: savename = self.ID + '_' + str(savenum) + '.csv' # filename savepath = os.path.join(sim_id_path, savename) # path # create array of all data & outputs to save save_array = self.format_write(data_reg, out_reg, idx + 1) # save as csv using default format & turn comments off np.savetxt(savepath, save_array, delimiter=',', header=save_header, comments='') try: cmd = self.cmd_queue.get_nowait() except Queue.Empty: continue if cmd == 'pause': self._ispaused = True return self._iscomplete = True # change completion status
def format_progress(self, idx, data_reg, out_reg): data_fields = self.display_fields.get('data', []) # data fields data_args = [(f, data_reg[f][idx]) for f in data_fields] out_fields = self.display_fields.get('outputs', []) # outputs fields out_args = [(f, out_reg[f][idx]) for f in out_fields] return [idx] + data_args + out_args def format_write(self, data_reg, out_reg, idx=None): data_fields = self.write_fields.get('data', []) # any data fields data_args = [data_reg[f][:idx].reshape((-1, 1)) for f in data_fields] out_fields = self.write_fields.get('outputs', []) # any outputs fields out_args = [out_reg[f][:idx] for f in out_fields] return np.concatenate(data_args + out_args, axis=1)
[docs] def pause(self, progress_hook=None): """ Pause the simulation. How is this different from stopping it? Maintain info sufficient to restart simulation. Sets ``is_paused`` to True. Will this state allow analysis? changing parameters? What can you do with a paused simulation? Should be capable of saving paused simulation for loading/resuming later, that is the main usage. EG: someone else need computer, or power goes out, so on battery backup quickly pause simulation, and save. Is save automatic? Should there be a parameter for auto save changed? """ # default progress hook if progress_hook is None: progress_hook = sim_progress_hook progress_hook('simulation paused') self.cmd_queue.put('pause') self._ispaused = True
def load(self, model, progress_hook=None, *args, **kwargs): # default progress hook if progress_hook is None: progress_hook = sim_progress_hook data = kwargs.get('data', {}) if not data and args: data = args[0] for k, v in data.iteritems(): progress_hook('loading simulation for %s' % k) model.data.open(k, **v) self.check_data(model.data) def run(self, model, progress_hook=None, *args, **kwargs): # default progress hook if progress_hook is None: progress_hook = sim_progress_hook progress_hook('running simulation') self.load(model, progress_hook, *args, **kwargs) self.start(model, progress_hook)