Source code for qmlhc.optim.numpy_optim.mpc

# -*- coding: utf-8 -*-
"""
Short-Horizon MPC Optimizer
---------------------------
Model-Predictive Control (MPC) over a short horizon. Optimizes control-like
parameters (e.g., alpha) by rolling out a few steps ahead, minimizing the
cumulative cost with a small action penalty. Uses simple projected gradient
descent over the horizon (can be swapped for a QP solver later).

Interface:
    - initialize(params) -> state
    - step_params(model, params, context) -> (new_params, state)

Context:
    - context["rollout_fn"](model, params, horizon, context) -> (traj_info, cum_loss)
      where cum_loss already aggregates Task + Cons + Coh + action_penalty.
    - "horizon": int, number of predictive steps (default 3)
    - Optional: "project_fn"(params) to project back to feasible set.
"""

from __future__ import annotations
from typing import Any, Dict, Mapping, Tuple
import numpy as np


[docs] class HCMPCShortHorizon: """Short-horizon MPC with simple gradient descent over cumulative loss.""" def __init__(self, lr: float = 1e-2, horizon: int = 3, clip: float | None = None): self.lr = float(lr) self.horizon_default = int(horizon) self.clip = clip self._state: Dict[str, Any] = {}
[docs] def initialize(self, params: Mapping[str, Any]) -> Dict[str, Any]: self._state = {"steps": 0} return dict(self._state)
[docs] def step_params( self, model: Any, params: Mapping[str, Any], context: Mapping[str, Any] ) -> Tuple[Dict[str, Any], Dict[str, Any]]: horizon = int(context.get("horizon", self.horizon_default)) rollout_fn = context["rollout_fn"] project_fn = context.get("project_fn", None) # finite-difference on params w.r.t cumulative loss over the horizon keys = sorted(params.keys()) theta = np.concatenate([np.atleast_1d(np.asarray(params[k], dtype=float)) for k in keys]) eps = 1e-3 grad = np.zeros_like(theta) def loss_at(vec): p = {} idx = 0 for k in keys: v_like = np.atleast_1d(np.asarray(params[k], dtype=float)) n = v_like.size p[k] = np.squeeze(vec[idx: idx + n]) if n == 1 else vec[idx: idx + n].reshape(v_like.shape) idx += n _, cum_loss = rollout_fn(model, p, horizon, context) return float(cum_loss) for i in range(theta.size): e = np.zeros_like(theta); e[i] = eps lp = loss_at(theta + e) lm = loss_at(theta - e) grad[i] = (lp - lm) / (2.0 * eps) theta_new = theta - self.lr * grad if self.clip is not None: theta_new = np.clip(theta_new, -self.clip, self.clip) # rebuild params new_params: Dict[str, Any] = {} idx = 0 for k in keys: v_like = np.atleast_1d(np.asarray(params[k], dtype=float)) n = v_like.size new_params[k] = np.squeeze(theta_new[idx: idx + n]) if n == 1 else theta_new[idx: idx + n].reshape(v_like.shape) idx += n # optional projection if project_fn is not None: new_params = project_fn(new_params) self._state = { "steps": self._state.get("steps", 0) + 1, "horizon": horizon, "grad_norm": float(np.linalg.norm(grad)), } return new_params, dict(self._state)