# -*- coding: utf-8 -*-
"""
This is the SimKit :mod:`~simkit.core.models` module that contains
definitions for the :class:`~simkit.core.models.Model` class.
The SimKit model contains five layers:
:class:`~simkit.core.layers.Data`,
:class:`~simkit.core.layers.Formulas`,
:class:`~simkit.core.layers.Calculations`,
:class:`~simkit.core.layers.Outputs` and
:class:`~simkit.core.layers.Simulations`. The
:class:`~simkit.core.layers.Data` layer organizes
:ref:`data-sources` by providing methods to add and load data for SimKit.
The :class:`~simkit.core.layers.Formulas` layer loads
:ref:`formulas` used by :class:`~simkit.core.layers.Calculations`
calculations. The :class:`~simkit.core.layers.Outputs` layer
organizes the calculated outputs for use in other calculations. Finally the
:class:`~simkit.core.layers.Simulations` layer organizes
options such as how long the simulation should run and takes care of actually
running the simulation.
"""
import importlib
import json
import os
import copy
from simkit.core import logging, _listify, CommonBase, Parameter
LOGGER = logging.getLogger(__name__)
LAYERS_MOD = '.layers'
LAYERS_PKG = 'simkit.core'
LAYER_CLS_NAMES = {'data': 'Data', 'calculations': 'Calculations',
'formulas': 'Formulas', 'outputs': 'Outputs',
'simulations': 'Simulations'}
[docs]class ModelParameter(Parameter):
_attrs = ['layer', 'module', 'package', 'path', 'sources']
[docs]class ModelBase(CommonBase):
"""
Base model meta class. If model has class attributes "modelpath" and
"modelfile" then layer class names and model configuration will be read from
the file on that path. Otherwise layer class names will be read from the
class attributes.
"""
_path_attr = 'modelpath'
_file_attr = 'modelfile'
_param_cls = ModelParameter
_layers_cls_attr = 'layer_cls_names'
_layers_mod_attr = 'layers_mod'
_layers_pkg_attr = 'layers_pkg'
_cmd_layer_attr = 'cmd_layer_name'
_attr_default = {
_layers_cls_attr: LAYER_CLS_NAMES, _layers_mod_attr: LAYERS_MOD,
_layers_pkg_attr: LAYERS_PKG, _cmd_layer_attr: 'simulations'
}
def __new__(mcs, name, bases, attr):
# use only with Model subclasses
if not CommonBase.get_parents(bases, ModelBase):
return super(ModelBase, mcs).__new__(mcs, name, bases, attr)
attr = mcs.set_meta(bases, attr)
# set param file full path if data source path and file specified or
# try to set parameters from class attributes except private/magic
attr = mcs.set_param_file_or_parameters(attr)
# set default meta attributes
meta = attr[mcs._meta_attr]
for ma, dflt in mcs._attr_default.iteritems():
a = getattr(meta, ma, None)
if a is None:
setattr(meta, ma, dflt)
return super(ModelBase, mcs).__new__(mcs, name, bases, attr)
[docs]class Model(object):
"""
A class for models. SimKit is a subclass of the :class:`Model` class.
:param modelfile: The name of the JSON file with model data.
:type modelfile: str
"""
__metaclass__ = ModelBase
def __init__(self, modelfile=None):
meta = getattr(self, ModelBase._meta_attr)
parameters = getattr(self, ModelBase._param_attr)
# load modelfile if it's an argument
if modelfile is not None:
#: model file
self.param_file = os.path.abspath(modelfile)
LOGGER.debug('modelfile: %s', modelfile)
else:
modelfile = self.param_file
# check meta class for model if declared inline
if parameters:
# TODO: separate model and parameters according to comments in #78
#: dictionary of the model
self.model = model = copy.deepcopy(parameters)
else:
#: dictionary of the model
self.model = model = None
# layer attributes initialized in meta class or _initialize()
# for k, v in layer_cls_names.iteritems():
# setattr(self, k, v)
# XXX: this seems bad to initialize attributes outside of constructor
#: dictionary of model layer classes
self.layers = {}
#: state of model, initialized or uninitialized
self._state = 'uninitialized'
# need either model file or model and layer class names to initialize
ready_to_initialize = ((modelfile is not None or model is not None) and
meta.layer_cls_names is not None)
if ready_to_initialize:
self._initialize() # initialize using modelfile or model
@property
def state(self):
"""
current state of the model
"""
return self._state
def _load(self, layer=None):
"""
Load or update all or part of :attr:`model`.
:param layer: Optionally load only specified layer.
:type layer: str
"""
# open model file for reading and convert JSON object to dictionary
# read and load JSON parameter map file as "parameters"
with open(self.param_file, 'r') as param_file:
file_params = json.load(param_file)
for layer, params in file_params.iteritems():
# update parameters from file
self.parameters[layer] = ModelParameter(**params)
# if layer argument spec'd then only update/load spec'd layer
if not layer or not self.model:
# update/load model if layer not spec'd or if no model exists yet
# TODO: separate model and parameters according to comments in #78
self.model = copy.deepcopy(self.parameters)
else:
# convert non-sequence to tuple
layers = _listify(layer)
# update/load layers
for layer in layers:
self.model[layer] = copy.deepcopy(self.parameters[layer])
def _update(self, layer=None):
"""
Update layers in model.
"""
meta = getattr(self, ModelBase._meta_attr)
if not layer:
layers = self.layers
else:
# convert non-sequence to tuple
layers = _listify(layer)
for layer in layers:
# relative path to layer files from model file
path = os.path.abspath(os.path.join(meta.modelpath, layer))
getattr(self, layer).load(path)
def _initialize(self):
"""
Initialize model and layers.
"""
meta = getattr(self, ModelBase._meta_attr)
# read modelfile, convert JSON and load/update model
if self.param_file is not None:
self._load()
LOGGER.debug('model:\n%r', self.model)
# initialize layers
# FIXME: move import inside loop for custom layers in different modules
mod = importlib.import_module(meta.layers_mod, meta.layers_pkg)
src_model = {}
for layer, value in self.model.iteritems():
# from layers module get the layer's class definition
layer_cls = getattr(mod, meta.layer_cls_names[layer]) # class def
self.layers[layer] = layer_cls # add layer class def to model
# check if model layers are classes
src_value = {} # layer value generated from source classes
for src in value['sources']:
# check if source has keyword arguments
try:
src, kwargs = src
except (TypeError, ValueError):
kwargs = {} # no key work arguments
# skip if not a source class
if isinstance(src, basestring):
continue
# generate layer value from source class
src_value[src.__name__] = {'module': src.__module__,
'package': None}
# update layer keyword arguments
src_value[src.__name__].update(kwargs)
# use layer values generated from source class
if src_value:
value = src_model[layer] = src_value
else:
srcmod, srcpkg = value.get('module'), value.get('package')
try:
value = dict(value['sources'])
except ValueError:
value = dict.fromkeys(value['sources'], {})
for src in value.viewkeys():
if srcmod is not None:
value[src]['module'] = srcmod
if srcpkg is not None:
value[src]['package'] = srcpkg
# set layer attribute with model data
setattr(self, layer, layer_cls(value))
# update model with layer values generated from source classes
if src_model:
self.model.update(src_model)
self._update()
self._state = 'initialized'
[docs] def load(self, modelfile, layer=None):
"""
Load or update a model or layers in a model.
:param modelfile: The name of the json file to load.
:type modelfile: str
:param layer: Optionally load only specified layer.
:type layer: str
"""
# read modelfile, convert JSON and load/update model
self.param_file = modelfile
self._load(layer)
self._update(layer)
[docs] def edit(self, layer, item, delete=False):
"""
Edit model.
:param layer: Layer of model to edit
:type layer: str
:param item: Items to edit.
:type item: dict
:param delete: Flag to return
:class:`~simkit.core.layers.Layer` to delete item.
:type delete: bool
"""
# get layer attribute with model data
if hasattr(self, layer):
layer_obj = getattr(self, layer)
else:
raise AttributeError('missing layer: %s', layer)
if delete:
return layer_obj
# iterate over items and edit layer
for k, v in item.iteritems():
if k in layer_obj.layer:
layer_obj.edit(k, v) # edit layer
else:
raise AttributeError('missing layer item: %s', k)
# update model data
if k in self.model[layer]:
self.model[layer][k].update(v)
else:
raise AttributeError('missing model layer item: %s', k)
[docs] def add(self, layer, items):
"""
Add items in model.
"""
for k in items.iterkeys():
if k in self.model[layer]:
raise Exception('item %s is already in layer %s' % (k, layer))
self.model[layer].update(items)
# this should also update Layer.layer, the layer data
# same as calling layer constructor
# so now just need to add items to the layer
for k, v in items.iteritems():
getattr(self, layer).add(k, v['module'], v.get('package'))
[docs] def delete(self, layer, items):
"""
Delete items in model.
"""
# Use edit to get the layer obj containing item
items = _listify(items) # make items a list if it's not
layer_obj = self.edit(layer, dict.fromkeys(items), delete=True)
for k in items:
if k in layer_obj.layer:
layer_obj.delete(k)
else:
raise AttributeError('item %s missing from layer %s' %
(k, layer))
# don't need to pop items from self.model, because, self.layer
# points to the same object as the item in model!
# for example:
# (Pdb) id(self.model['data']) # same ID as layer in data
# 125639560L
# (Pdb) id(self.data.layer) # same ID as data in model
# 125639560L
[docs] def save(self, modelfile, layer=None):
"""
Save a model file.
:param modelfile: The name of the json file to save.
:type modelfile: str
:param layer: Optionally save only specified layer.
:type layer: str
"""
if layer:
obj = {layer: self.model[layer]}
else:
obj = self.model
with open(modelfile, 'w') as fp:
json.dump(obj, fp, indent=2, sort_keys=True)
@property
def registries(self):
return {layer: getattr(self, layer).reg
for layer in self.layers}
@property
def cmd_layer(self):
meta = getattr(self, ModelBase._meta_attr)
return getattr(self, meta.cmd_layer_name, NotImplemented)
@property
def commands(self):
return self.cmd_layer.reg.commands
[docs] def command(self, cmd, progress_hook=None, *args, **kwargs):
"""
Execute a model command.
:param cmd: Name of the command.
:param progress_hook: A function to which progress updates are passed.
"""
cmds = cmd.split(None, 1) # split commands and simulations
sim_names = cmds[1:] # simulations
if not sim_names:
sim_names = self.cmd_layer.reg.iterkeys()
for sim_name in sim_names:
sim_cmd = getattr(self.cmd_layer.reg[sim_name], cmd)
sim_cmd(self, progress_hook=progress_hook, *args, **kwargs)