from __future__ import annotations
from dataclasses import dataclass
from typing import Literal
import numpy as np
from .elb import ElbSpec
from .sv import VolatilitySpec
from .var import design_matrix
@dataclass(frozen=True, slots=True)
class MuSSVSSpec:
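    """Spike-and-slab (SSVS) hyperparameters for the steady-state means.

    Mirrors :class:`SSVSSpec` for the steady-state block: ``spike_var`` and
    ``slab_var`` are the prior variances of the excluded (spike) and included
    (slab) components, and ``inclusion_prob`` is the prior inclusion
    probability. Used via :attr:`SteadyStateSpec.ssvs`.
    """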
spike_var: float = 1e-4
slab_var: float = 100.0
inclusion_prob: float = 0.5
def __post_init__(self) -> None:
if self.spike_var <= 0 or not np.isfinite(self.spike_var):
raise ValueError("spike_var must be finite and > 0")
if self.slab_var <= 0 or not np.isfinite(self.slab_var):
raise ValueError("slab_var must be finite and > 0")
if not (0.0 < float(self.inclusion_prob) < 1.0):
raise ValueError("inclusion_prob must be in (0, 1)")
@dataclass(frozen=True, slots=True)
class SteadyStateSpec:
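    """Steady-state prior configuration.

    ``mu0`` is the prior mean vector for the steady state (shape ``(N,)``),
    ``v0_mu`` its prior variance (a positive scalar or a vector of shape
    ``(N,)``), and ``ssvs`` an optional spike-and-slab layer over the
    steady-state means.
    """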
mu0: np.ndarray # (N,)
v0_mu: float | np.ndarray
ssvs: MuSSVSSpec | None = None
def __post_init__(self) -> None:
m = np.asarray(self.mu0, dtype=float).reshape(-1)
if m.ndim != 1 or m.shape[0] < 1:
raise ValueError("mu0 must be a 1D array of shape (N,)")
if np.any(~np.isfinite(m)):
raise ValueError("mu0 must be finite")
if isinstance(self.v0_mu, (float, int, np.floating, np.integer)) and not isinstance(
self.v0_mu, bool
):
v_scalar = float(self.v0_mu)
if not np.isfinite(v_scalar) or v_scalar <= 0:
raise ValueError("v0_mu must be finite and > 0")
else:
v_vec = np.asarray(self.v0_mu, dtype=float).reshape(-1)
if v_vec.shape != (m.shape[0],):
raise ValueError("v0_mu must be a scalar or have shape (N,)")
if np.any(~np.isfinite(v_vec)) or np.any(v_vec <= 0):
raise ValueError("v0_mu must be finite and > 0")
@dataclass(frozen=True, slots=True)
class ShockSpec:
"""Innovation (shock) distribution configuration.
Parameters
----------
family:
Shock family identifier.
- ``"gaussian"``: standard Gaussian innovations.
- ``"student_t"``: multivariate Student-t innovations via a scale-mixture of normals.
- ``"mixture_outlier"``: two-component Gaussian mixture with an outlier variance inflation.
df:
Degrees of freedom for ``family="student_t"``. Must be finite and > 2.
outlier_prob:
Outlier probability for ``family="mixture_outlier"``. Must be in (0, 1).
outlier_variance:
Variance inflation factor for outliers in ``family="mixture_outlier"``. Must be finite
and > 1.0. When an outlier occurs, innovations have covariance ``outlier_variance * Sigma``.
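
    Examples
    --------
    A minimal construction sketch (parameter values are illustrative):

    >>> ShockSpec(family="student_t", df=5.0)
    ShockSpec(family='student_t', df=5.0, outlier_prob=0.05, outlier_variance=10.0)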
"""
family: Literal["gaussian", "student_t", "mixture_outlier"] = "gaussian"
df: float = 7.0
outlier_prob: float = 0.05
outlier_variance: float = 10.0
def __post_init__(self) -> None:
fam = str(self.family).lower()
if fam not in {"gaussian", "student_t", "mixture_outlier"}:
raise ValueError("family must be one of: gaussian, student_t, mixture_outlier")
if fam == "student_t":
nu = float(self.df)
if not np.isfinite(nu) or nu <= 2.0:
raise ValueError("df must be finite and > 2 for student_t")
if fam == "mixture_outlier":
p = float(self.outlier_prob)
if not (0.0 < p < 1.0):
raise ValueError("outlier_prob must be in (0, 1) for mixture_outlier")
kappa = float(self.outlier_variance)
if not np.isfinite(kappa) or kappa <= 1.0:
raise ValueError("outlier_variance must be finite and > 1 for mixture_outlier")
@dataclass(frozen=True, slots=True)
class ModelSpec:
"""Model configuration for VAR/SRVAR estimation.
Parameters
----------
p:
VAR lag order.
include_intercept:
Whether to include an intercept term in the VAR design matrix.
steady_state:
Optional steady-state configuration.
elb:
Optional effective-lower-bound (ELB) configuration. When enabled, specified
variables are treated as censored at the bound and latent shadow values are
sampled during estimation.
volatility:
Optional stochastic volatility configuration. When enabled, the model uses a
stochastic volatility specification for time-varying variances (random-walk or
AR(1) dynamics). Residual covariance can be diagonal (independent shocks) or a
triangular factorization with time-invariant correlations.
Notes
-----
``ModelSpec`` is intentionally small and immutable. The heavy lifting is done in
:func:`srvar.api.fit`.
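
    Examples
    --------
    A homoskedastic Gaussian VAR(2) with the default intercept:

    >>> ModelSpec(p=2).include_intercept
    True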
"""
p: int
include_intercept: bool = True
steady_state: SteadyStateSpec | None = None
elb: ElbSpec | None = None
volatility: VolatilitySpec | None = None
shocks: ShockSpec | None = None
def __post_init__(self) -> None:
if self.p < 1:
raise ValueError("p must be >= 1")
if self.steady_state is not None and not self.include_intercept:
raise ValueError("steady_state requires include_intercept=True")
@dataclass(frozen=True, slots=True)
class NIWPrior:
"""Normal-Inverse-Wishart (NIW) prior parameters.
This prior is used for conjugate Bayesian VAR estimation.
Notes
-----
Shapes follow the conventions used throughout the toolkit:
- ``m0`` has shape ``(K, N)`` where ``K = (1 if include_intercept else 0) + N * p``
- ``v0`` has shape ``(K, K)``
- ``s0`` has shape ``(N, N)``
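
    For example, with ``N = 2`` variables, ``p = 1`` lag, and an intercept,
    ``K = 1 + 2 * 1 = 3``, so ``m0`` is ``(3, 2)``, ``v0`` is ``(3, 3)``, and
    ``s0`` is ``(2, 2)``.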
"""
m0: np.ndarray # (K, N)
v0: np.ndarray # (K, K)
s0: np.ndarray # (N, N)
nu0: float
@dataclass(frozen=True, slots=True)
class SSVSSpec:
"""Hyperparameters for stochastic search variable selection (SSVS).
The SSVS implementation uses a spike-and-slab Gaussian prior over coefficient
rows (predictor-specific inclusion indicators).
Parameters
----------
spike_var:
Prior variance for excluded predictors (spike component).
slab_var:
Prior variance for included predictors (slab component).
inclusion_prob:
Prior inclusion probability for each predictor.
intercept_slab_var:
Optional slab variance override for the intercept term.
fix_intercept:
If True and an intercept is included in the model, the intercept is always
included.
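
    Examples
    --------
    The defaults give a narrow spike, a wide slab, and an even prior inclusion
    probability:

    >>> SSVSSpec().inclusion_prob
    0.5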
"""
spike_var: float = 1e-4
slab_var: float = 100.0
inclusion_prob: float = 0.5
intercept_slab_var: float | None = None
fix_intercept: bool = True
    def __post_init__(self) -> None:
        if self.spike_var <= 0 or not np.isfinite(self.spike_var):
            raise ValueError("spike_var must be finite and > 0")
        if self.slab_var <= 0 or not np.isfinite(self.slab_var):
            raise ValueError("slab_var must be finite and > 0")
        if not (0.0 < self.inclusion_prob < 1.0):
            raise ValueError("inclusion_prob must be in (0, 1)")
        if self.intercept_slab_var is not None and (
            self.intercept_slab_var <= 0 or not np.isfinite(self.intercept_slab_var)
        ):
            raise ValueError("intercept_slab_var must be finite and > 0")
@dataclass(frozen=True, slots=True)
class BLassoSpec:
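    """Hyperparameters for the Bayesian Lasso (BLasso) shrinkage prior.

    ``mode`` selects a single global shrinkage parameter (``"global"``) or
    per-coefficient shrinkage (``"adaptive"``). The ``a0_*`` / ``b0_*`` pairs
    are hyperprior parameters for the corresponding shrinkage blocks, and
    ``tau_init`` / ``lambda_init`` are initial values for the latent scale and
    shrinkage parameters. All values must be finite and strictly positive.
    """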
mode: Literal["global", "adaptive"] = "global"
a0_global: float = 1.0
b0_global: float = 1.0
a0_c: float = 1.0
b0_c: float = 1.0
a0_L: float = 1.0
b0_L: float = 1.0
tau_init: float = 1e4
lambda_init: float = 2.0
def __post_init__(self) -> None:
if self.mode not in {"global", "adaptive"}:
raise ValueError("mode must be one of: global, adaptive")
for name in [
"a0_global",
"b0_global",
"a0_c",
"b0_c",
"a0_L",
"b0_L",
"tau_init",
"lambda_init",
]:
v = float(getattr(self, name))
if not np.isfinite(v) or v <= 0:
raise ValueError(f"{name} must be finite and > 0")
@dataclass(frozen=True, slots=True)
class DLSpec:
"""Hyperparameters for the Dirichlet–Laplace (DL) shrinkage prior.
This is a global–local shrinkage prior implemented to match the MATLAB reference
code in ``functions/parfor_vintages.m`` (model mnemonic "DirLap").
Parameters
----------
abeta:
Dirichlet concentration parameter (named ``abeta`` in the MATLAB code).
dl_scaler:
Small positive initialization scale used for the latent DL variables
(named ``DL_scaler`` in the MATLAB code).
"""
abeta: float = 0.5
dl_scaler: float = 0.1
def __post_init__(self) -> None:
for name in ["abeta", "dl_scaler"]:
v = float(getattr(self, name))
if not np.isfinite(v) or v <= 0:
raise ValueError(f"{name} must be finite and > 0")
@dataclass(frozen=True, slots=True)
class MinnesotaCanonicalSpec:
"""Equation-wise canonical Minnesota prior metadata.
Parameters
----------
sigma2:
Scale variances used in the Minnesota construction (length ``N``).
inv_v0_vec:
Equation-wise prior precision vector over ``vec(beta)`` in Fortran / column-major order.
The block of ``K`` coefficients for equation ``j`` occupies
``inv_v0_vec[j*K:(j+1)*K]``.
mode:
Equation-wise Minnesota variant label. ``"canonical"`` is the exact canonical
construction; ``"tempered"`` is the experimental geometric bridge between the
legacy and canonical variance maps.
tempered_alpha:
Blend weight for ``mode="tempered"``. ``0.0`` reproduces the legacy coefficient
variances and ``1.0`` reproduces the canonical coefficient variances.
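
    For example, with ``K = 3`` regressors and ``N = 2`` equations,
    ``inv_v0_vec`` has length ``6`` and the block for equation ``j = 1`` is
    ``inv_v0_vec[3:6]``.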
"""
sigma2: np.ndarray
inv_v0_vec: np.ndarray
mode: Literal["canonical", "tempered"] = "canonical"
tempered_alpha: float | None = None
def __post_init__(self) -> None:
sigma2 = np.asarray(self.sigma2, dtype=float).reshape(-1)
inv_v0_vec = np.asarray(self.inv_v0_vec, dtype=float).reshape(-1)
if sigma2.shape[0] < 1:
raise ValueError("sigma2 must be a non-empty 1D array")
if np.any(~np.isfinite(sigma2)) or np.any(sigma2 <= 0):
raise ValueError("sigma2 must be finite and > 0")
if inv_v0_vec.shape[0] < 1:
raise ValueError("inv_v0_vec must be a non-empty 1D array")
if np.any(~np.isfinite(inv_v0_vec)) or np.any(inv_v0_vec <= 0):
raise ValueError("inv_v0_vec must be finite and > 0")
mode = str(self.mode).lower()
if mode not in {"canonical", "tempered"}:
raise ValueError("mode must be one of: canonical, tempered")
if mode == "canonical":
if self.tempered_alpha is not None:
raise ValueError("tempered_alpha must be omitted when mode='canonical'")
return
if self.tempered_alpha is None:
raise ValueError("tempered_alpha is required when mode='tempered'")
alpha = float(self.tempered_alpha)
if not np.isfinite(alpha) or not (0.0 <= alpha <= 1.0):
raise ValueError("tempered_alpha must be finite and in [0, 1]")
def _validate_minnesota_hyperparameters(
*,
p: int,
y: np.ndarray,
n: int | None,
lambda1: float,
lambda2: float,
lambda3: float,
lambda4: float,
own_lag_means: np.ndarray | list[float] | None,
own_lag_mean: float,
) -> tuple[np.ndarray, int]:
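    """Validate shared Minnesota hyperparameters and return the data array and ``N``."""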
v = np.asarray(y, dtype=float)
if v.ndim != 2:
raise ValueError("y must be a 2D array of shape (T, N)")
t, n_y = v.shape
if n is None:
n = int(n_y)
if int(n) != int(n_y):
raise ValueError("n must match y.shape[1]")
if p < 1:
raise ValueError("p must be >= 1")
if t <= p:
raise ValueError("T must be > p")
if lambda1 <= 0:
raise ValueError("lambda1 must be > 0")
if lambda2 <= 0:
raise ValueError("lambda2 must be > 0")
if lambda3 < 0:
raise ValueError("lambda3 must be >= 0")
if lambda4 <= 0:
raise ValueError("lambda4 must be > 0")
if own_lag_means is not None and own_lag_mean != 0.0:
raise ValueError("specify at most one of own_lag_means and own_lag_mean")
return v, int(n)
def _estimate_minnesota_sigma2(
*,
y: np.ndarray,
p: int,
include_intercept: bool,
min_sigma2: float,
) -> np.ndarray:
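    """Estimate per-variable residual variances from univariate AR(p) fits, floored at ``min_sigma2``."""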
v = np.asarray(y, dtype=float)
n = int(v.shape[1])
sigma2 = np.empty(n, dtype=float)
for i in range(n):
xi, yi = design_matrix(v[:, [i]], p, include_intercept=include_intercept)
b, *_ = np.linalg.lstsq(xi, yi, rcond=None)
resid = yi - xi @ b
denom = max(int(resid.shape[0] - xi.shape[1]), 1)
s2 = float((resid.T @ resid)[0, 0] / denom)
sigma2[i] = max(s2, float(min_sigma2))
return sigma2
def _build_minnesota_prior_mean(
*,
n: int,
p: int,
include_intercept: bool,
own_lag_means: np.ndarray | list[float] | None,
own_lag_mean: float,
) -> np.ndarray:
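    """Build the ``(K, N)`` Minnesota prior mean with optional nonzero own-first-lag entries."""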
k = (1 if include_intercept else 0) + n * p
m0 = np.zeros((k, n), dtype=float)
base = 1 if include_intercept else 0
if own_lag_means is not None:
olm = np.asarray(own_lag_means, dtype=float).reshape(-1)
if olm.shape != (n,):
raise ValueError("own_lag_means must have shape (N,)")
for j in range(n):
m0[base + j, j] = float(olm[j])
elif own_lag_mean != 0.0:
for j in range(n):
m0[base + j, j] = float(own_lag_mean)
return m0
def _canonical_minnesota_variances(
*,
n: int,
p: int,
include_intercept: bool,
sigma2: np.ndarray,
lambda1: float,
lambda2: float,
lambda3: float,
lambda4: float,
) -> np.ndarray:
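    """Return the ``(K, N)`` canonical Minnesota prior coefficient variances."""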
sigma2_arr = np.asarray(sigma2, dtype=float).reshape(-1)
if sigma2_arr.shape != (n,):
raise ValueError("sigma2 must have shape (N,)")
k = (1 if include_intercept else 0) + n * p
var = np.zeros((k, n), dtype=float)
if include_intercept:
var[0, :] = float((lambda1 * lambda4) ** 2) * sigma2_arr
base = 1 if include_intercept else 0
for lag in range(1, p + 1):
lag_scale = float((lambda1**2) / (lag ** (2.0 * lambda3)))
for pred_idx in range(n):
row = base + (lag - 1) * n + pred_idx
for eq_idx in range(n):
if pred_idx == eq_idx:
var[row, eq_idx] = lag_scale
else:
var[row, eq_idx] = (
lag_scale * (lambda2**2) * sigma2_arr[eq_idx] / sigma2_arr[pred_idx]
)
return var
def _tempered_minnesota_variances(
*,
legacy_var: np.ndarray,
canonical_var: np.ndarray,
alpha: float,
) -> np.ndarray:
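    """Blend variances geometrically: ``legacy**(1 - alpha) * canonical**alpha``."""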
legacy_var_arr = np.asarray(legacy_var, dtype=float)
canonical_var_arr = np.asarray(canonical_var, dtype=float)
if legacy_var_arr.shape != canonical_var_arr.shape:
raise ValueError("legacy_var and canonical_var must have the same shape")
if np.any(~np.isfinite(legacy_var_arr)) or np.any(legacy_var_arr <= 0):
raise ValueError("legacy_var must be finite and > 0")
if np.any(~np.isfinite(canonical_var_arr)) or np.any(canonical_var_arr <= 0):
raise ValueError("canonical_var must be finite and > 0")
alpha_f = float(alpha)
if not np.isfinite(alpha_f) or not (0.0 <= alpha_f <= 1.0):
raise ValueError("alpha must be finite and in [0, 1]")
return legacy_var_arr * np.power(canonical_var_arr / legacy_var_arr, alpha_f)
@dataclass(frozen=True, slots=True)
class PriorSpec:
"""Prior specification wrapper.
This object selects the prior family via ``family`` and carries the required
parameter blocks.
Parameters
----------
family:
Prior family identifier. Supported values include ``"niw"``, ``"ssvs"``,
``"blasso"``, and ``"dl"``.
niw:
        NIW parameter block. This is required for every family because the
        shrinkage families (SSVS, Bayesian Lasso, Dirichlet–Laplace) reuse a
        NIW-like structure with a modified coefficient covariance ``V0``.
ssvs:
Optional SSVS hyperparameters (required when ``family='ssvs'``).
blasso:
Optional Bayesian Lasso hyperparameters (required when ``family='blasso'``).
dl:
Optional Dirichlet–Laplace hyperparameters (required when ``family='dl'``).
minnesota_canonical:
Optional equation-wise Minnesota metadata. When present, the prior is fit
via the explicit equation-wise Minnesota path rather than the legacy NIW
matrix-normal path. ``mode="canonical"`` is the exact canonical
construction; ``mode="tempered"`` is the experimental bridge between the
legacy and canonical variance maps.
"""
family: str
niw: NIWPrior
ssvs: SSVSSpec | None = None
blasso: BLassoSpec | None = None
dl: DLSpec | None = None
minnesota_canonical: MinnesotaCanonicalSpec | None = None
@staticmethod
def niw_default(*, k: int, n: int) -> PriorSpec:
"""Construct a simple default NIW prior.
This uses a zero prior mean for coefficients and relatively weak
regularization.
Parameters
----------
k:
Number of regressors ``K``.
n:
Number of endogenous variables ``N``.
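
        Examples
        --------
        >>> prior = PriorSpec.niw_default(k=3, n=2)
        >>> prior.family
        'niw'
        >>> prior.niw.v0.shape
        (3, 3)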
"""
m0 = np.zeros((k, n), dtype=float)
v0 = 10.0 * np.eye(k, dtype=float)
s0 = np.eye(n, dtype=float)
nu0 = float(n + 2)
return PriorSpec(family="niw", niw=NIWPrior(m0=m0, v0=v0, s0=s0, nu0=nu0))
@staticmethod
def niw_minnesota(
*,
p: int,
y: np.ndarray,
n: int | None = None,
include_intercept: bool = True,
lambda1: float = 0.1,
lambda2: float = 0.5,
lambda3: float = 1.0,
lambda4: float = 100.0,
own_lag_means: np.ndarray | list[float] | None = None,
own_lag_mean: float = 0.0,
min_sigma2: float = 1e-12,
) -> PriorSpec:
"""Backward-compatible alias for the legacy Minnesota-style NIW prior.
        This method preserves the historical ``srvar-toolkit`` behaviour. To make
        the legacy semantics explicit at call sites, prefer
        :meth:`PriorSpec.niw_minnesota_legacy`.
Notes
-----
The current implementation is a legacy Minnesota-style NIW construction,
not a full canonical Minnesota prior. In particular, it uses one scalar
cross-variable shrinkage weight derived from ``lambda2`` rather than
equation-specific own-vs-cross shrinkage.
"""
return PriorSpec.niw_minnesota_legacy(
p=p,
y=y,
n=n,
include_intercept=include_intercept,
lambda1=lambda1,
lambda2=lambda2,
lambda3=lambda3,
lambda4=lambda4,
own_lag_means=own_lag_means,
own_lag_mean=own_lag_mean,
min_sigma2=min_sigma2,
)
@staticmethod
def niw_minnesota_legacy(
*,
p: int,
y: np.ndarray,
n: int | None = None,
include_intercept: bool = True,
lambda1: float = 0.1,
lambda2: float = 0.5,
lambda3: float = 1.0,
lambda4: float = 100.0,
own_lag_means: np.ndarray | list[float] | None = None,
own_lag_mean: float = 0.0,
min_sigma2: float = 1e-12,
) -> PriorSpec:
"""Construct the legacy Minnesota-style NIW prior used by this toolkit.
This preserves the historical ``srvar-toolkit`` prior semantics. It is a
Minnesota-style NIW construction with lag decay and a scalar
cross-variable shrinkage weight, but it is not a full canonical
Minnesota prior.
Parameters
----------
p:
VAR lag order.
y:
Data array used to estimate scaling variances (T, N).
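        n:
            Number of endogenous variables ``N``. If omitted, ``N`` is inferred
            from ``y.shape[1]``.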
include_intercept:
Whether the resulting prior is compatible with a VAR that includes an
intercept.
lambda1, lambda2, lambda3, lambda4:
Legacy Minnesota-style hyperparameters. ``lambda2`` is collapsed into
a scalar cross-variable weight shared across equations.
own_lag_means / own_lag_mean:
Optional prior mean(s) for own first lag.
min_sigma2:
Floor for estimated variances used in scaling.
Returns
-------
PriorSpec
A ``PriorSpec`` instance with ``family='niw'``.
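
        Examples
        --------
        A shape-only sketch on simulated data:

        >>> import numpy as np
        >>> y = np.random.default_rng(0).standard_normal((50, 2))
        >>> prior = PriorSpec.niw_minnesota_legacy(p=2, y=y)
        >>> prior.niw.m0.shape
        (5, 2)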
"""
v, n = _validate_minnesota_hyperparameters(
p=p,
y=y,
n=n,
lambda1=lambda1,
lambda2=lambda2,
lambda3=lambda3,
lambda4=lambda4,
own_lag_means=own_lag_means,
own_lag_mean=own_lag_mean,
)
sigma2 = _estimate_minnesota_sigma2(
y=v,
p=p,
include_intercept=include_intercept,
min_sigma2=min_sigma2,
)
k = (1 if include_intercept else 0) + n * p
m0 = _build_minnesota_prior_mean(
n=n,
p=p,
include_intercept=include_intercept,
own_lag_means=own_lag_means,
own_lag_mean=own_lag_mean,
)
v0 = np.zeros((k, k), dtype=float)
if include_intercept:
v0[0, 0] = float((lambda1 * lambda4) ** 2)
if n == 1:
cross_weight = 1.0
else:
cross_weight = float((1.0 + (n - 1) * (lambda2**2)) / n)
base = 1 if include_intercept else 0
for lag in range(1, p + 1):
lag_scale = float((lambda1**2) / (lag ** (2.0 * lambda3)))
for i in range(n):
idx = base + (lag - 1) * n + i
v0[idx, idx] = float(lag_scale * cross_weight / sigma2[i])
s0 = np.diag(sigma2)
nu0 = float(n + 2)
return PriorSpec(family="niw", niw=NIWPrior(m0=m0, v0=v0, s0=s0, nu0=nu0))
@staticmethod
def niw_minnesota_canonical(
*,
p: int,
y: np.ndarray,
n: int | None = None,
include_intercept: bool = True,
lambda1: float = 0.1,
lambda2: float = 0.5,
lambda3: float = 1.0,
lambda4: float = 100.0,
own_lag_means: np.ndarray | list[float] | None = None,
own_lag_mean: float = 0.0,
min_sigma2: float = 1e-12,
) -> PriorSpec:
"""Construct an explicit canonical Minnesota prior on the equation-wise path.
This constructor preserves the standard own-vs-cross variance distinction by
storing per-equation prior precisions rather than collapsing everything into a
shared NIW row covariance. The resulting fit path is currently supported for
homoskedastic models and diagonal stochastic-volatility models.
Notes
-----
The exact equation-wise coefficient prior is carried in
``prior.minnesota_canonical.inv_v0_vec``. The companion ``prior.niw.v0``
block is a row-averaged diagonal summary kept for shape compatibility with the
rest of the toolkit.
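
        Examples
        --------
        The equation-wise precisions have length ``K * N``:

        >>> import numpy as np
        >>> y = np.random.default_rng(0).standard_normal((50, 2))
        >>> prior = PriorSpec.niw_minnesota_canonical(p=1, y=y)
        >>> prior.minnesota_canonical.inv_v0_vec.shape
        (6,)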
"""
v, n = _validate_minnesota_hyperparameters(
p=p,
y=y,
n=n,
lambda1=lambda1,
lambda2=lambda2,
lambda3=lambda3,
lambda4=lambda4,
own_lag_means=own_lag_means,
own_lag_mean=own_lag_mean,
)
sigma2 = _estimate_minnesota_sigma2(
y=v,
p=p,
include_intercept=include_intercept,
min_sigma2=min_sigma2,
)
var = _canonical_minnesota_variances(
n=n,
p=p,
include_intercept=include_intercept,
sigma2=sigma2,
lambda1=lambda1,
lambda2=lambda2,
lambda3=lambda3,
lambda4=lambda4,
)
m0 = _build_minnesota_prior_mean(
n=n,
p=p,
include_intercept=include_intercept,
own_lag_means=own_lag_means,
own_lag_mean=own_lag_mean,
)
# The exact canonical prior is equation-wise. Keep a row-average summary in
# NIW.v0 for compatibility with code that expects a (K, K) block.
v0_summary = np.diag(np.mean(var, axis=1))
niw = NIWPrior(
m0=m0,
v0=v0_summary,
s0=np.diag(sigma2),
nu0=2.0,
)
canonical = MinnesotaCanonicalSpec(
sigma2=sigma2,
inv_v0_vec=1.0 / var.reshape(-1, order="F"),
)
return PriorSpec(
family="niw",
niw=niw,
minnesota_canonical=canonical,
)
@staticmethod
def niw_minnesota_tempered(
*,
p: int,
y: np.ndarray,
n: int | None = None,
include_intercept: bool = True,
lambda1: float = 0.1,
lambda2: float = 0.5,
lambda3: float = 1.0,
lambda4: float = 100.0,
alpha: float = 0.25,
own_lag_means: np.ndarray | list[float] | None = None,
own_lag_mean: float = 0.0,
min_sigma2: float = 1e-12,
) -> PriorSpec:
"""Construct an experimental tempered bridge between legacy and canonical Minnesota.
This keeps the equation-wise sampling path used by
:meth:`PriorSpec.niw_minnesota_canonical`, but geometrically blends the
legacy and canonical coefficient variances:
- ``alpha=0.0`` reproduces the legacy variance map
- ``alpha=1.0`` reproduces the canonical variance map
Notes
-----
This is an explicit experimental mitigation path. It does not redefine the
semantics of :meth:`PriorSpec.niw_minnesota_canonical`.
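
        Examples
        --------
        >>> import numpy as np
        >>> y = np.random.default_rng(0).standard_normal((50, 2))
        >>> prior = PriorSpec.niw_minnesota_tempered(p=1, y=y, alpha=0.25)
        >>> prior.minnesota_canonical.mode
        'tempered'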
"""
alpha_f = float(alpha)
if not np.isfinite(alpha_f) or not (0.0 <= alpha_f <= 1.0):
raise ValueError("alpha must be finite and in [0, 1]")
legacy = PriorSpec.niw_minnesota_legacy(
p=p,
y=y,
n=n,
include_intercept=include_intercept,
lambda1=lambda1,
lambda2=lambda2,
lambda3=lambda3,
lambda4=lambda4,
own_lag_means=own_lag_means,
own_lag_mean=own_lag_mean,
min_sigma2=min_sigma2,
)
canonical = PriorSpec.niw_minnesota_canonical(
p=p,
y=y,
n=n,
include_intercept=include_intercept,
lambda1=lambda1,
lambda2=lambda2,
lambda3=lambda3,
lambda4=lambda4,
own_lag_means=own_lag_means,
own_lag_mean=own_lag_mean,
min_sigma2=min_sigma2,
)
canonical_meta = canonical.minnesota_canonical
if canonical_meta is None:
raise RuntimeError("canonical Minnesota metadata missing")
n_eq = canonical.niw.m0.shape[1]
legacy_diag = np.diag(np.asarray(legacy.niw.v0, dtype=float))
legacy_var = np.repeat(legacy_diag.reshape(-1, 1), repeats=n_eq, axis=1)
canonical_var = 1.0 / np.asarray(canonical_meta.inv_v0_vec, dtype=float).reshape(
legacy_var.shape,
order="F",
)
tempered_var = _tempered_minnesota_variances(
legacy_var=legacy_var,
canonical_var=canonical_var,
alpha=alpha_f,
)
v0_summary = np.diag(np.mean(tempered_var, axis=1))
niw = NIWPrior(
m0=np.asarray(canonical.niw.m0, dtype=float).copy(),
v0=v0_summary,
s0=np.asarray(canonical.niw.s0, dtype=float).copy(),
nu0=float(canonical.niw.nu0),
)
tempered = MinnesotaCanonicalSpec(
sigma2=np.asarray(canonical_meta.sigma2, dtype=float).copy(),
inv_v0_vec=1.0 / tempered_var.reshape(-1, order="F"),
mode="tempered",
tempered_alpha=alpha_f,
)
return PriorSpec(
family="niw",
niw=niw,
minnesota_canonical=tempered,
)
@staticmethod
def from_ssvs(
*,
k: int,
n: int,
include_intercept: bool = True,
m0: np.ndarray | None = None,
s0: np.ndarray | None = None,
nu0: float | None = None,
spike_var: float = 1e-4,
slab_var: float = 100.0,
inclusion_prob: float = 0.5,
intercept_slab_var: float | None = None,
fix_intercept: bool = True,
) -> PriorSpec:
"""Construct a prior specification for SSVS estimation.
Parameters
----------
k:
Number of regressors ``K``.
n:
Number of endogenous variables ``N``.
include_intercept:
Whether the corresponding model includes an intercept.
m0, s0, nu0:
Optional NIW blocks. If omitted, sensible defaults are used.
spike_var, slab_var, inclusion_prob:
SSVS hyperparameters.
intercept_slab_var:
Optional slab variance override for the intercept term.
fix_intercept:
Whether to force inclusion of the intercept row.
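
        Examples
        --------
        >>> prior = PriorSpec.from_ssvs(k=5, n=2)
        >>> prior.family
        'ssvs'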
"""
if k < 1:
raise ValueError("k must be >= 1")
if n < 1:
raise ValueError("n must be >= 1")
m0a: np.ndarray
if m0 is None:
m0a = np.zeros((k, n), dtype=float)
else:
m0a = np.asarray(m0, dtype=float)
if m0a.shape != (k, n):
raise ValueError("m0 must have shape (K, N)")
if s0 is None:
s0a = np.eye(n, dtype=float)
else:
s0a = np.asarray(s0, dtype=float)
if s0a.shape != (n, n):
raise ValueError("s0 must have shape (N, N)")
nu0a = float(n + 2) if nu0 is None else float(nu0)
niw = NIWPrior(
m0=m0a,
v0=np.eye(k, dtype=float),
s0=s0a,
nu0=nu0a,
)
spec = SSVSSpec(
spike_var=float(spike_var),
slab_var=float(slab_var),
inclusion_prob=float(inclusion_prob),
intercept_slab_var=None if intercept_slab_var is None else float(intercept_slab_var),
fix_intercept=bool(fix_intercept and include_intercept),
)
return PriorSpec(family="ssvs", niw=niw, ssvs=spec)
@staticmethod
def from_blasso(
*,
k: int,
n: int,
mode: Literal["global", "adaptive"] = "global",
include_intercept: bool = True,
m0: np.ndarray | None = None,
s0: np.ndarray | None = None,
nu0: float | None = None,
a0_global: float = 1.0,
b0_global: float = 1.0,
a0_c: float = 1.0,
b0_c: float = 1.0,
a0_L: float = 1.0,
b0_L: float = 1.0,
tau_init: float = 1e4,
lambda_init: float = 2.0,
) -> PriorSpec:
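        """Construct a prior specification for Bayesian Lasso estimation.

        ``k`` and ``n`` are the regressor and variable counts. ``m0``, ``s0``,
        and ``nu0`` are optional NIW blocks with the same defaults as
        :meth:`PriorSpec.from_ssvs`; the remaining arguments are
        :class:`BLassoSpec` hyperparameters. ``include_intercept`` is accepted
        for API symmetry but does not affect the constructed prior.
        """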
if k < 1:
raise ValueError("k must be >= 1")
if n < 1:
raise ValueError("n must be >= 1")
m0a: np.ndarray
if m0 is None:
m0a = np.zeros((k, n), dtype=float)
else:
m0a = np.asarray(m0, dtype=float)
if m0a.shape != (k, n):
raise ValueError("m0 must have shape (K, N)")
if s0 is None:
s0a = np.eye(n, dtype=float)
else:
s0a = np.asarray(s0, dtype=float)
if s0a.shape != (n, n):
raise ValueError("s0 must have shape (N, N)")
nu0a = float(n + 2) if nu0 is None else float(nu0)
niw = NIWPrior(
m0=m0a,
v0=np.eye(k, dtype=float),
s0=s0a,
nu0=nu0a,
)
spec = BLassoSpec(
mode=mode,
a0_global=float(a0_global),
b0_global=float(b0_global),
a0_c=float(a0_c),
b0_c=float(b0_c),
a0_L=float(a0_L),
b0_L=float(b0_L),
tau_init=float(tau_init),
lambda_init=float(lambda_init),
)
        _ = bool(include_intercept)  # intentionally unused; kept for API symmetry with from_ssvs
return PriorSpec(family="blasso", niw=niw, blasso=spec)
@staticmethod
def from_dl(
*,
k: int,
n: int,
include_intercept: bool = True,
m0: np.ndarray | None = None,
s0: np.ndarray | None = None,
nu0: float | None = None,
abeta: float = 0.5,
dl_scaler: float = 0.1,
) -> PriorSpec:
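        """Construct a prior specification for Dirichlet–Laplace (DL) estimation.

        ``k`` and ``n`` are the regressor and variable counts. ``m0``, ``s0``,
        and ``nu0`` are optional NIW blocks; ``abeta`` and ``dl_scaler`` are
        the :class:`DLSpec` hyperparameters. ``include_intercept`` is accepted
        for API symmetry but does not affect the constructed prior.
        """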
if k < 1:
raise ValueError("k must be >= 1")
if n < 1:
raise ValueError("n must be >= 1")
m0a: np.ndarray
if m0 is None:
m0a = np.zeros((k, n), dtype=float)
else:
m0a = np.asarray(m0, dtype=float)
if m0a.shape != (k, n):
raise ValueError("m0 must have shape (K, N)")
if s0 is None:
s0a = np.eye(n, dtype=float)
else:
s0a = np.asarray(s0, dtype=float)
if s0a.shape != (n, n):
raise ValueError("s0 must have shape (N, N)")
nu0a = float(n + 2) if nu0 is None else float(nu0)
niw = NIWPrior(
m0=m0a,
v0=np.eye(k, dtype=float),
s0=s0a,
nu0=nu0a,
)
spec = DLSpec(abeta=float(abeta), dl_scaler=float(dl_scaler))
        _ = bool(include_intercept)  # intentionally unused; kept for API symmetry with from_ssvs
return PriorSpec(family="dl", niw=niw, dl=spec)
@dataclass(frozen=True, slots=True)
class SamplerConfig:
"""MCMC configuration for Gibbs samplers.
Parameters
----------
draws:
Total number of iterations to run.
burn_in:
Number of initial iterations to discard.
thin:
Keep every ``thin``-th draw after burn-in.
Notes
-----
For conjugate NIW estimation (no ELB/SV), draws are sampled directly and then
burn-in/thinning is applied post hoc. For iterative samplers (ELB, SSVS, SV),
burn-in/thinning is applied online.
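
    Examples
    --------
    Number of retained draws after burn-in and thinning:

    >>> cfg = SamplerConfig(draws=2000, burn_in=500, thin=2)
    >>> (cfg.draws - cfg.burn_in) // cfg.thin
    750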
"""
draws: int = 2000
burn_in: int = 500
thin: int = 1
def __post_init__(self) -> None:
if self.draws < 1:
raise ValueError("draws must be >= 1")
if self.burn_in < 0:
raise ValueError("burn_in must be >= 0")
if self.thin < 1:
raise ValueError("thin must be >= 1")