"""
Module for optimization OModel.
"""
import logging
from typing import Any, TYPE_CHECKING
from collections import OrderedDict
from ams.utils.misc import elapsed
import cvxpy as cp
from ams.opt.optzbase import ensure_symbols, ensure_mats_and_parsed
if TYPE_CHECKING:
from ams.routines.routine import RoutineBase
logger = logging.getLogger(__name__)
[docs]
class OModelBase:
    """
    Template class for optimization models.

    It defines the three-stage life cycle (``parse`` -> ``evaluate`` ->
    ``finalize``) and the registries that collect cvxpy objects assigned
    as attributes. Every stage here only flips its status flag; the
    ``_evaluate_*`` hooks are no-ops returning True.
    """

    def __init__(self, routine: "RoutineBase") -> None:
        # owning routine and the assembled problem (none yet)
        self.rtn = routine
        self.prob = None
        # registries populated by ``_register_attribute``
        self.exprs = OrderedDict()
        self.params = OrderedDict()
        self.vars = OrderedDict()
        self.constrs = OrderedDict()
        self.obj = None
        # life-cycle status flags
        self.parsed = False
        self.evaluated = False
        self.finalized = False

    @property
    def initialized(self):
        """
        Return the initialization status.

        The value is the ``and`` combination of the ``parsed``,
        ``evaluated`` and ``finalized`` flags.
        """
        return self.parsed and self.evaluated and self.finalized

    def parse(self, force=False):
        """
        Mark the model as parsed and return the flag.

        ``force`` is accepted for interface compatibility and is not used.
        """
        self.parsed = True
        return self.parsed

    def _evaluate_params(self):
        """Hook for parameter evaluation; no-op returning True."""
        return True

    def _evaluate_vars(self):
        """Hook for decision variable evaluation; no-op returning True."""
        return True

    def _evaluate_constrs(self):
        """Hook for constraint evaluation; no-op returning True."""
        return True

    def _evaluate_obj(self):
        """Hook for objective evaluation; no-op returning True."""
        return True

    def _evaluate_exprs(self):
        """Hook for expression evaluation; no-op returning True."""
        return True

    def _evaluate_exprcs(self):
        """Hook for ExpressionCalc evaluation; no-op returning True."""
        return True

    def evaluate(self, force=False):
        """
        Run all evaluation hooks in sequence and mark the model evaluated.

        ``force`` is accepted for interface compatibility and is not used.
        """
        # NOTE: the order of the hooks is kept as params, vars, exprs,
        # constrs, obj, exprcs
        hooks = (
            self._evaluate_params,
            self._evaluate_vars,
            self._evaluate_exprs,
            self._evaluate_constrs,
            self._evaluate_obj,
            self._evaluate_exprcs,
        )
        for hook in hooks:
            hook()
        self.evaluated = True
        return self.evaluated

    def finalize(self, force=False):
        """
        Mark the model as finalized and return True.

        ``force`` is accepted for interface compatibility and is not used.
        """
        self.finalized = True
        return True

    def init(self, force=False):
        """
        Run ``parse``, ``evaluate`` and ``finalize`` in order.

        Parameters
        ----------
        force : bool, optional
            Passed positionally to each stage.

        Returns
        -------
        bool
            The ``initialized`` status after all stages ran.
        """
        for stage in (self.parse, self.evaluate, self.finalize):
            stage(force)
        return self.initialized

    @property
    def class_name(self):
        """
        Return the class name.
        """
        return self.__class__.__name__

    def _register_attribute(self, key, value):
        """
        Register a pair of attributes to OModel instance.

        Called within ``__setattr__``. The value is stored in the first
        registry whose cvxpy type it matches; values of any other type
        are ignored.
        """
        # Registries are looked up by name only on a match, because this
        # method also runs while ``__init__`` is still creating them.
        dispatch = (
            (cp.Variable, "vars"),
            (cp.Constraint, "constrs"),
            (cp.Parameter, "params"),
            (cp.Expression, "exprs"),
        )
        for kind, registry in dispatch:
            if isinstance(value, kind):
                getattr(self, registry)[key] = value
                break

    def __setattr__(self, name: str, value: Any):
        # set the attribute first, then offer it to the registries
        super().__setattr__(name, value)
        self._register_attribute(name, value)

    def update(self, params):
        """
        Placeholder for parameter updates; does nothing and returns True.
        """
        return True

    def __repr__(self) -> str:
        owner = self.rtn.class_name
        own = self.__class__.__name__
        address = hex(id(self))
        return f'{owner}.{own} at {address}'
[docs]
class OModel(OModelBase):
    """
    Base class for optimization models.

    ``class_name``, ``_register_attribute``, ``__setattr__`` and
    ``__repr__`` are inherited from ``OModelBase``.

    Parameters
    ----------
    routine : RoutineBase
        Routine to be modeled.

    Attributes
    ----------
    prob : cvxpy.Problem
        Optimization model.
    exprs : OrderedDict
        Expressions registry.
    params : OrderedDict
        Parameters registry.
    vars : OrderedDict
        Decision variables registry.
    constrs : OrderedDict
        Constraints registry.
    obj : Objective
        Objective function.
    initialized : bool
        Flag indicating if the model is initialized.
    parsed : bool
        Flag indicating if the model is parsed.
    evaluated : bool
        Flag indicating if the model is evaluated.
    finalized : bool
        Flag indicating if the model is finalized.
    """

    def __init__(self, routine: "RoutineBase") -> None:
        OModelBase.__init__(self, routine)

    @ensure_symbols
    def parse(self, force=False):
        """
        Parse the optimization model from the symbolic description.

        This method should be called after the routine symbols are generated
        `self.rtn.syms.generate_symbols()`. It parses the following components
        of the optimization model: expressions, parameters, decision
        variables, constraints, ExpressionCalcs, and the objective function.

        Parameters
        ----------
        force : bool, optional
            Flag indicating if to force the parsing.

        Returns
        -------
        bool
            True if the parsing is successful. False if the routine has no
            objective function and is not ``DCPF0``.

        Raises
        ------
        Exception
            If parsing an expression or the objective fails.
        """
        if self.parsed and not force:
            logger.debug("Model is already parsed.")
            return self.parsed

        t, _ = elapsed()
        logger.warning(f'Parsing OModel for <{self.rtn.class_name}>')

        # --- expressions ---
        for key, val in self.rtn.exprs.items():
            try:
                val.parse()
            except Exception as e:
                raise Exception(f"Failed to parse Expression <{key}>.\n{e}") from e

        # --- RParams and Services as parameters ---
        for val in self.rtn.params.values():
            if not val.no_parse:
                val.parse()

        # --- decision variables ---
        for val in self.rtn.vars.values():
            val.parse()

        # --- constraints ---
        for val in self.rtn.constrs.values():
            val.parse()

        # --- ExpressionCalcs ---
        for val in self.rtn.exprcs.values():
            val.parse()

        # --- objective ---
        if self.rtn.obj is not None:
            try:
                self.rtn.obj.parse()
            except Exception as e:
                raise Exception(f"Failed to parse Objective <{self.rtn.obj.name}>.\n{e}") from e
        elif self.rtn.class_name not in ['DCPF0']:
            # A missing objective is reported as a failed parse, except for
            # the DCPF0 routine, which is allowed to have none.
            logger.warning(f"{self.rtn.class_name} has no objective function!")
            self.parsed = False
            return self.parsed

        _, s = elapsed(t)
        logger.debug(f" -> Parsed in {s}")
        self.parsed = True
        return self.parsed

    def _evaluate_params(self):
        """
        Evaluate the parameters and expose each ``optz`` as an attribute.

        Raises
        ------
        Exception
            If evaluating or registering any parameter fails.
        """
        for key, val in self.rtn.params.items():
            try:
                val.evaluate()
                setattr(self, key, val.optz)
            except Exception as e:
                raise Exception(f"Failed to evaluate Param <{key}>.\n{e}") from e

    def _evaluate_vars(self):
        """
        Evaluate the decision variables and expose each ``optz`` as an attribute.

        Raises
        ------
        Exception
            If evaluating or registering any variable fails.
        """
        for key, val in self.rtn.vars.items():
            try:
                val.evaluate()
                setattr(self, key, val.optz)
            except Exception as e:
                raise Exception(f"Failed to evaluate Var <{key}>.\n{e}") from e

    def _evaluate_constrs(self):
        """
        Evaluate the constraints and expose each ``optz`` as an attribute.

        Raises
        ------
        Exception
            If evaluating or registering any constraint fails.
        """
        for key, val in self.rtn.constrs.items():
            try:
                val.evaluate()
                setattr(self, key, val.optz)
            except Exception as e:
                raise Exception(f"Failed to evaluate Constr <{key}>.\n{e}") from e

    def _evaluate_obj(self):
        """
        Evaluate the objective function, if the routine has one.
        """
        # NOTE: since we already have the attribute `obj`,
        # we can update it rather than setting it
        if self.rtn.obj is not None:
            self.rtn.obj.evaluate()
            self.obj = self.rtn.obj.optz

    def _evaluate_exprs(self):
        """
        Evaluate the expressions and expose each ``optz`` as an attribute.

        Raises
        ------
        Exception
            If evaluating or registering any expression fails.
        """
        for key, val in self.rtn.exprs.items():
            try:
                val.evaluate()
                setattr(self, key, val.optz)
            except Exception as e:
                raise Exception(f"Failed to evaluate Expression <{key}>.\n{e}") from e

    def _evaluate_exprcs(self):
        """
        Evaluate the ExpressionCalcs.

        Unlike the other evaluation steps, the results are not set as
        attributes of this model.

        Raises
        ------
        Exception
            If evaluating any ExpressionCalc fails.
        """
        for key, val in self.rtn.exprcs.items():
            try:
                val.evaluate()
            except Exception as e:
                raise Exception(f"Failed to evaluate ExpressionCalc <{key}>.\n{e}") from e

    @ensure_mats_and_parsed
    def evaluate(self, force=False):
        """
        Evaluate the optimization model.

        This method should be called after `self.parse()`. It evaluates the following
        components of the optimization model: parameters, decision variables,
        expressions, constraints, objective function, and ExpressionCalcs.

        Parameters
        ----------
        force : bool, optional
            Flag indicating if to force the evaluation

        Returns
        -------
        bool
            Returns True if the evaluation is successful.

        Raises
        ------
        Exception
            If any component fails to evaluate.
        """
        if self.evaluated and not force:
            logger.debug("Model is already evaluated.")
            return self.evaluated

        logger.warning(f"Evaluating OModel for <{self.rtn.class_name}>")
        t, _ = elapsed()

        # NOTE: should evaluate in sequence
        self._evaluate_params()
        self._evaluate_vars()
        self._evaluate_exprs()
        self._evaluate_constrs()
        self._evaluate_obj()
        self._evaluate_exprcs()

        self.evaluated = True
        _, s = elapsed(t)
        logger.debug(f" -> Evaluated in {s}")
        return self.evaluated

    def finalize(self, force=False):
        """
        Finalize the optimization model.

        This method should be called after `self.evaluate()`. It assembles the
        optimization problem from the evaluated components.

        Parameters
        ----------
        force : bool, optional
            Flag indicating if to force the finalization.

        Returns
        -------
        bool
            Returns True if the finalization is successful.
        """
        # NOTE: for power flow type, we skip the finalization
        if self.rtn.class_name in ['DCPF0']:
            self.finalized = True
            return self.finalized

        if self.finalized and not force:
            logger.debug("Model is already finalized.")
            return self.finalized

        logger.warning(f"Finalizing OModel for <{self.rtn.class_name}>")
        t, _ = elapsed()

        # Collect constraints that are not disabled. The ``None`` guard must
        # run before ``is_disabled`` is read; otherwise a ``None`` entry
        # raises AttributeError instead of being skipped.
        constrs_add = [
            val.optz
            for val in self.rtn.constrs.values()
            if val is not None and not val.is_disabled
        ]

        # Construct the problem using cvxpy.Problem
        self.prob = cp.Problem(self.obj, constrs_add)

        _, s = elapsed(t)
        logger.debug(f" -> Finalized in {s}")
        self.finalized = True
        self._log_dpp_diagnostic()
        return self.finalized

    def _log_dpp_diagnostic(self):
        """
        Log DPP-compliance for re-solve perf.

        DPP compliance is a per-problem property — one non-DPP sub-term
        spoils caching for every other parameter change. Logging this
        at finalize lets routine authors spot when an accidental
        non-DPP construct (e.g., ``c2 * pg ** 2`` with ``c2`` promoted
        to ``cp.Parameter``) silently kills warm-resolve performance.
        """
        # Best effort only: a failing check is logged and never propagates.
        try:
            is_dpp = self.prob.is_dcp(dpp=True)
        except Exception as exc:
            logger.debug(f"DPP check failed for <{self.rtn.class_name}>: {exc}")
            return

        n_param = len(self.prob.parameters())
        logger.debug(
            f"DPP diagnostic <{self.rtn.class_name}>: dpp={is_dpp}, "
            f"params={n_param}, vars={len(self.prob.variables())}"
        )
        if not is_dpp and n_param > 0:
            logger.info(
                f"<{self.rtn.class_name}> has parameters but is not DPP — "
                f"warm re-solves will re-canonicalize. Check non-DPP terms."
            )

    def init(self, force=False):
        """
        Set up the optimization model from the symbolic description.

        This method initializes the optimization model by parsing decision variables,
        constraints, and the objective function from the associated routine.

        Parameters
        ----------
        force : bool, optional
            Flag indicating if to force the OModel initialization.
            If True, following methods will be called by force: `self.parse()`,
            `self.evaluate()`, `self.finalize()`

        Returns
        -------
        bool
            Returns True if the setup is successful, False otherwise.
        """
        if self.initialized and not force:
            logger.debug("OModel is already initialized.")
            return self.initialized

        t, _ = elapsed()

        # Auto-prep: ensure pycode for this routine is up-to-date and
        # wire generated e_fn callables before parse/evaluate. Idempotent
        # — runs the cache-check on every init call but only regenerates
        # when md5 / version markers change. ``_link_pycode`` is the
        # routine's internal hook for the OModel lifecycle, intentionally
        # underscore-prefixed because it's not a stable user-facing API.
        self.rtn._link_pycode()  # pylint: disable=protected-access

        self.parse(force=force)
        self.evaluate(force=force)
        self.finalize(force=force)

        _, s = elapsed(t)
        logger.debug(f"OModel for <{self.rtn.class_name}> initialized in {s}")
        return self.initialized

    def update(self, params):
        """
        Update the Parameter values.

        Parameters
        ----------
        params : list
            List of parameters to be updated; ``update()`` is called on
            each item.

        Returns
        -------
        bool
            Always True.
        """
        for param in params:
            param.update()
        return True