Source code for srvar.sv

from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Literal

import numpy as np
import scipy.linalg
import scipy.special
import scipy.stats

from .linalg import cholesky_jitter, solve_psd, symmetrize


[docs] @dataclass(frozen=True, slots=True) class VolatilitySpec: """Stochastic volatility configuration. When enabled in :class:`srvar.spec.ModelSpec`, estimation uses a diagonal stochastic volatility random-walk (SVRW) model for time-varying variances. By default, residual covariance is diagonal (independent shocks). Setting ``covariance='triangular'`` enables a triangular factorization: ``Sigma_t = Q^{-1} diag(exp(h_t)) (Q^{-1})'`` where ``Q`` is upper-triangular with ones on the diagonal. This yields a full residual covariance matrix with time-varying variances and a time-invariant correlation structure. Setting ``covariance='factor'`` enables factor stochastic volatility (FSV): ``eps_t = Lambda f_t + eta_t`` ``f_t ~ N(0, diag(exp(h_f,t)))`` ``eta_t ~ N(0, diag(exp(h_eta,t)))`` ``Sigma_t = Lambda diag(exp(h_f,t)) Lambda' + diag(exp(h_eta,t))`` This yields a full, time-varying covariance matrix with scalable low-rank structure. Parameters ---------- enabled: Whether stochastic volatility is enabled. dynamics: Volatility state dynamics. ``"rw"`` uses a random-walk prior: ``h_t = h_{t-1} + eta_t`` ``"ar1"`` uses an AR(1) prior: ``h_t = gamma0 + phi * h_{t-1} + eta_t`` with ``phi`` and ``gamma0`` sampled in the Gibbs routine. covariance: Covariance structure. ``"diagonal"`` is independent shocks. ``"triangular"`` uses an upper-triangular factor ``Q`` with ones on the diagonal (a CCCM-style multivariate SV factorization). ``"factor"`` enables factor SV (FSV). q_prior_var: Prior variance for the off-diagonal elements of ``Q`` when ``covariance='triangular'``. k_factors: Number of latent factors for ``covariance='factor'``. loading_prior_var: Prior variance for free elements of the factor loading matrix ``Lambda`` when ``covariance='factor'``. store_factor_draws: If True, store latent factor draws ``f_t`` in the fit result. This can be memory intensive (scales with ``draws × T × k_factors``), so the default is False. 
epsilon: Small positive constant used in the transform ``log(e_t^2 + epsilon)`` to avoid ``log(0)``. h0_prior_mean: Prior mean for the initial log-variance state ``h0``. h0_prior_var: Prior variance for the initial log-variance state ``h0``. sigma_eta_prior_nu0, sigma_eta_prior_s0: Prior hyperparameters for the innovation variance of the log-volatility random walk. phi_prior_mean, phi_prior_var: Prior mean/variance for the AR(1) coefficient ``phi`` when ``dynamics='ar1'``. A normal prior is used with truncation to ``(-1, 1)``. gamma0_prior_mean, gamma0_prior_var: Prior mean/variance for the AR(1) intercept ``gamma0`` when ``dynamics='ar1'``. """ enabled: bool = True dynamics: Literal["rw", "ar1"] = "rw" covariance: Literal["diagonal", "triangular", "factor"] = "diagonal" q_prior_var: float = 1.0 k_factors: int = 1 loading_prior_var: float = 1.0 store_factor_draws: bool = False epsilon: float = 1e-4 h0_prior_mean: float = 1e-6 h0_prior_var: float = 10.0 sigma_eta_prior_nu0: float = 1.0 sigma_eta_prior_s0: float = 0.01 phi_prior_mean: float = 0.95 phi_prior_var: float = 0.1 gamma0_prior_mean: float = 0.0 gamma0_prior_var: float = 10.0 def __post_init__(self) -> None: if self.dynamics not in {"rw", "ar1"}: raise ValueError("dynamics must be one of {'rw','ar1'}") if self.covariance not in {"diagonal", "triangular", "factor"}: raise ValueError("covariance must be one of {'diagonal','triangular','factor'}") if self.covariance == "triangular" and ( self.q_prior_var <= 0 or not np.isfinite(self.q_prior_var) ): raise ValueError("q_prior_var must be positive and finite when covariance='triangular'") if self.covariance == "factor": if not isinstance(self.k_factors, (int, np.integer)) or isinstance(self.k_factors, bool): raise ValueError("k_factors must be an integer when covariance='factor'") if int(self.k_factors) < 1: raise ValueError("k_factors must be >= 1 when covariance='factor'") if self.loading_prior_var <= 0 or not np.isfinite(self.loading_prior_var): raise 
ValueError( "loading_prior_var must be positive and finite when covariance='factor'" ) if self.epsilon <= 0 or not np.isfinite(self.epsilon): raise ValueError("epsilon must be positive") if self.h0_prior_var <= 0 or not np.isfinite(self.h0_prior_var): raise ValueError("h0_prior_var must be positive") if self.sigma_eta_prior_nu0 <= 0 or not np.isfinite(self.sigma_eta_prior_nu0): raise ValueError("sigma_eta_prior_nu0 must be positive") if self.sigma_eta_prior_s0 <= 0 or not np.isfinite(self.sigma_eta_prior_s0): raise ValueError("sigma_eta_prior_s0 must be positive") if self.dynamics == "ar1": for name in [ "phi_prior_mean", "phi_prior_var", "gamma0_prior_mean", "gamma0_prior_var", ]: v = float(getattr(self, name)) if not np.isfinite(v): raise ValueError(f"{name} must be finite when dynamics='ar1'") if self.phi_prior_var <= 0: raise ValueError("phi_prior_var must be positive when dynamics='ar1'") if self.gamma0_prior_var <= 0: raise ValueError("gamma0_prior_var must be positive when dynamics='ar1'")
_KSC_PI = np.array([0.0073, 0.10556, 0.00002, 0.04395, 0.34001, 0.24566, 0.2575], dtype=float) _KSC_MI = ( np.array([-10.12999, -3.97281, -8.56686, 2.77786, 0.61942, 1.79518, -1.08819], dtype=float) - 1.2704 ) _KSC_SIGI = np.array([5.79596, 2.61369, 5.17950, 0.16735, 0.64009, 0.34023, 1.26261], dtype=float) _KSC_SQRTSIGI = np.sqrt(_KSC_SIGI) _LOG_KSC_PI = np.log(_KSC_PI) _LOG_SQRT_2PI = 0.5 * np.log(2.0 * np.pi) try: import numba as _nb _HAVE_NUMBA = True except Exception: # pragma: no cover _nb = None _HAVE_NUMBA = False def _numba_enabled() -> bool: v = os.getenv("SRVAR_USE_NUMBA", "0").strip().lower() return v in {"1", "true", "yes", "on"} if _HAVE_NUMBA: @_nb.njit(cache=True) def _sample_mixture_indicators_numba( y: np.ndarray, ht: np.ndarray, u: np.ndarray, ksc_mi: np.ndarray, ksc_sqrtsigi: np.ndarray, log_ksc_pi: np.ndarray, ) -> np.ndarray: t = y.shape[0] out = np.empty(t, dtype=np.int64) log_sqrtsigi = np.log(ksc_sqrtsigi) for i in range(t): maxv = -1.0e300 log_q = np.empty(7, dtype=np.float64) for k in range(7): loc = ht[i] + ksc_mi[k] z = (y[i] - loc) / ksc_sqrtsigi[k] loglik = -_LOG_SQRT_2PI - log_sqrtsigi[k] - 0.5 * z * z v = log_ksc_pi[k] + loglik log_q[k] = v if v > maxv: maxv = v s = 0.0 for k in range(7): s += np.exp(log_q[k] - maxv) c = 0.0 target = u[i] chosen = 0 for k in range(7): c += np.exp(log_q[k] - maxv) / s if target <= c: chosen = k break out[i] = chosen return out
[docs] def log_e2_star(e: np.ndarray, *, epsilon: float) -> np.ndarray: """Compute the log-squared residual transform used for SV mixture sampling. Given residuals ``e_t``, this returns ``log(e_t^2 + epsilon)``. The small ``epsilon`` avoids ``log(0)`` and stabilizes sampling when residuals are extremely small. """ v = np.asarray(e, dtype=float) return np.log(v * v + float(epsilon))
[docs] def sample_mixture_indicators( *, y_star: np.ndarray, h: np.ndarray, rng: np.random.Generator ) -> np.ndarray: """Sample KSC mixture indicators for the log-chi-square approximation. This implements the standard Kim-Shephard-Chib (KSC) 7-component Gaussian mixture approximation for the observation equation arising from: y*_t = log(e_t^2) with e_t ~ N(0, exp(h_t)) Conditional on a discrete indicator ``s_t`` taking values in {0, ..., 6}, the model is: y*_t = h_t + m_{s_t} + u_t, u_t ~ N(0, v_{s_t}) where ``(pi, m, v)`` are fixed mixture weights/means/variances. Notes: The constant ``1.2704`` used in the mixture mean vector definition corresponds to ``E[log(chi^2_1)]`` and is used to match the centered form commonly reported for the KSC approximation. References: Kim, S., Shephard, N., & Chib, S. (1998). Stochastic volatility: Likelihood inference and comparison with ARCH models. Returns: Integer array of shape (T,) with entries in {0, ..., 6}. """ y = np.asarray(y_star, dtype=float).reshape(-1) ht = np.asarray(h, dtype=float).reshape(-1) if y.shape != ht.shape: raise ValueError("y_star and h must have the same shape") if _HAVE_NUMBA and _numba_enabled(): u = rng.random(y.shape[0]) return _sample_mixture_indicators_numba( y, ht, u, _KSC_MI, _KSC_SQRTSIGI, _LOG_KSC_PI ).astype(int) loc = ht[:, None] + _KSC_MI[None, :] z = (y[:, None] - loc) / _KSC_SQRTSIGI[None, :] loglik = -_LOG_SQRT_2PI - np.log(_KSC_SQRTSIGI)[None, :] - 0.5 * z * z log_q = _LOG_KSC_PI[None, :] + loglik log_q = log_q - scipy.special.logsumexp(log_q, axis=1, keepdims=True) q = np.exp(log_q) u = rng.random(y.shape[0]) cdf = np.cumsum(q, axis=1) return (u[:, None] < cdf).argmax(axis=1).astype(int)
[docs] def sample_h_svrw( *, y_star: np.ndarray, h: np.ndarray, sigma_eta2: float, h0: float, rng: np.random.Generator, ) -> np.ndarray: """Sample a full path of log-volatilities under an SV random-walk prior. This updates ``h = (h_1, ..., h_T)`` in the SVRW model y*_t = log(e_t^2) \approx h_t + m_{s_t} + u_t h_t = h_{t-1} + eta_t, eta_t ~ N(0, sigma_eta2) using the KSC mixture approximation and a precision-based Gaussian sampler for the resulting banded linear system. Args: y_star: Transformed residuals (T,). h: Current log-volatility values (T,). sigma_eta2: Innovation variance for the random walk. h0: Initial state value used in the prior for h_1. rng: NumPy RNG. Returns: Updated log-volatility path with shape (T,). """ y = np.asarray(y_star, dtype=float).reshape(-1) ht = np.asarray(h, dtype=float).reshape(-1) if y.shape != ht.shape: raise ValueError("y_star and h must have the same shape") if sigma_eta2 <= 0 or not np.isfinite(sigma_eta2): raise ValueError("sigma_eta2 must be positive") t = y.shape[0] s = sample_mixture_indicators(y_star=y, h=ht, rng=rng) dconst = _KSC_MI[s] omega = _KSC_SIGI[s] inv_omega = 1.0 / omega inv_sig = 1.0 / float(sigma_eta2) diag_kh = (2.0 * inv_sig) * np.ones(t, dtype=float) diag_kh[-1] = inv_sig sub_kh = (-inv_sig) * np.ones(t - 1, dtype=float) diag_ph = diag_kh + inv_omega ab = np.zeros((2, t), dtype=float) ab[0, :] = diag_ph ab[1, :-1] = sub_kh c = scipy.linalg.cholesky_banded(ab, lower=True, check_finite=False) rhs = (y - dconst) * inv_omega rhs[0] = rhs[0] + h0 * inv_sig hhat = scipy.linalg.cho_solve_banded((c, True), rhs, check_finite=False) z = rng.standard_normal(t) ab_t = np.zeros((2, t), dtype=float) ab_t[0, 1:] = c[1, :-1] ab_t[1, :] = c[0, :] delta = scipy.linalg.solve_banded((0, 1), ab_t, z, check_finite=False) return hhat + delta
[docs] def sample_h0( *, h1: float, sigma_eta2: float, prior_mean: float, prior_var: float, rng: np.random.Generator ) -> float: if sigma_eta2 <= 0 or not np.isfinite(sigma_eta2): raise ValueError("sigma_eta2 must be positive") if prior_var <= 0 or not np.isfinite(prior_var): raise ValueError("prior_var must be positive") kh0 = (1.0 / sigma_eta2) + (1.0 / prior_var) h0hat = ((prior_mean / prior_var) + (h1 / sigma_eta2)) / kh0 sd = float(np.sqrt(1.0 / kh0)) return float(h0hat + sd * rng.normal())
def _truncnorm_rvs( *, mean: float, sd: float, lower: float, upper: float, rng: np.random.Generator, ) -> float: if sd <= 0 or not np.isfinite(sd): raise ValueError("sd must be positive") if not np.isfinite(lower) or not np.isfinite(upper) or not (lower < upper): raise ValueError("invalid truncation bounds") a = (lower - mean) / sd b = (upper - mean) / sd return float(scipy.stats.truncnorm.rvs(a=a, b=b, loc=mean, scale=sd, random_state=rng))
def sample_h_ar1(
    *,
    y_star: np.ndarray,
    h: np.ndarray,
    sigma_eta2: float,
    h0: float,
    gamma0: float,
    phi: float,
    rng: np.random.Generator,
) -> np.ndarray:
    """Sample log-volatilities under an AR(1) state equation.

    State equation:

        h_t = gamma0 + phi * h_{t-1} + eta_t,  eta_t ~ N(0, sigma_eta2)

    Observation equation uses the KSC mixture approximation as in
    :func:`sample_h_svrw`.

    Args:
        y_star: Transformed residuals (T,).
        h: Current log-volatility values (T,).
        sigma_eta2: Innovation variance of the AR(1) state equation.
        h0: Initial state value used in the prior for h_1.
        gamma0: AR(1) intercept.
        phi: AR(1) coefficient, must lie in (-1, 1).
        rng: NumPy RNG.

    Returns:
        Updated log-volatility path with shape (T,).
    """
    y = np.asarray(y_star, dtype=float).reshape(-1)
    ht = np.asarray(h, dtype=float).reshape(-1)
    if y.shape != ht.shape:
        raise ValueError("y_star and h must have the same shape")
    if sigma_eta2 <= 0 or not np.isfinite(sigma_eta2):
        raise ValueError("sigma_eta2 must be positive")
    if not np.isfinite(h0) or not np.isfinite(gamma0) or not np.isfinite(phi):
        raise ValueError("h0, gamma0, and phi must be finite")
    if not (-1.0 < float(phi) < 1.0):
        raise ValueError("phi must be in (-1, 1)")
    t = int(y.shape[0])
    # Mixture indicators fix the per-period observation offset and variance.
    s = sample_mixture_indicators(y_star=y, h=ht, rng=rng)
    dconst = _KSC_MI[s]
    omega = _KSC_SIGI[s]
    inv_omega = 1.0 / omega
    inv_sig = 1.0 / float(sigma_eta2)
    # Prior precision A' A / sigma_eta2 for the AR(1) difference operator A:
    # diagonal (1 + phi^2)/sigma, except 1/sigma in the last period (no
    # successor term) — and also in the first when T == 1.
    diag_kh = (1.0 + float(phi) ** 2) * inv_sig * np.ones(t, dtype=float)
    diag_kh[-1] = inv_sig
    if t == 1:
        diag_kh[0] = inv_sig
    sub_kh = (-float(phi) * inv_sig) * np.ones(max(t - 1, 0), dtype=float)
    # Posterior precision adds the observation precision on the diagonal.
    diag_ph = diag_kh + inv_omega
    # Lower-banded storage: row 0 = main diagonal, row 1 = subdiagonal.
    ab = np.zeros((2, t), dtype=float)
    ab[0, :] = diag_ph
    if t > 1:
        ab[1, :-1] = sub_kh
    c = scipy.linalg.cholesky_banded(ab, lower=True, check_finite=False)
    rhs = (y - dconst) * inv_omega
    # Deterministic part of the state equation: r_t = gamma0 (+ phi * h0 for
    # the first period), then apply A' to fold it into the rhs.
    r = np.full(t, float(gamma0), dtype=float)
    r[0] = float(gamma0) + float(phi) * float(h0)
    at_r = np.empty(t, dtype=float)
    if t == 1:
        at_r[0] = r[0]
    else:
        # A' r: each period nets its own term minus phi times the successor.
        at_r[0] = r[0] - float(phi) * r[1]
        if t > 2:
            at_r[1:-1] = r[1:-1] - float(phi) * r[2:]
        at_r[-1] = r[-1]
    rhs = rhs + inv_sig * at_r
    hhat = scipy.linalg.cho_solve_banded((c, True), rhs, check_finite=False)
    # Draw N(0, K^{-1}) noise by back-substituting C' delta = z with the
    # banded Cholesky factor C of the posterior precision K.
    z = rng.standard_normal(t)
    ab_t = np.zeros((2, t), dtype=float)
    if t > 1:
        ab_t[0, 1:] = c[1, :-1]  # superdiagonal of C'
    ab_t[1, :] = c[0, :]  # main diagonal of C'
    delta = scipy.linalg.solve_banded((0, 1), ab_t, z, check_finite=False)
    return hhat + delta
[docs] def sample_h0_ar1( *, h1: float, sigma_eta2: float, gamma0: float, phi: float, prior_mean: float, prior_var: float, rng: np.random.Generator, ) -> float: if sigma_eta2 <= 0 or not np.isfinite(sigma_eta2): raise ValueError("sigma_eta2 must be positive") if prior_var <= 0 or not np.isfinite(prior_var): raise ValueError("prior_var must be positive") if ( not np.isfinite(h1) or not np.isfinite(gamma0) or not np.isfinite(phi) or not np.isfinite(prior_mean) ): raise ValueError("inputs must be finite") k = (float(phi) ** 2) / float(sigma_eta2) + 1.0 / float(prior_var) h0hat = ( (float(prior_mean) / float(prior_var)) + (float(phi) * (float(h1) - float(gamma0)) / float(sigma_eta2)) ) / k sd = float(np.sqrt(1.0 / k)) return float(h0hat + sd * rng.normal())
[docs] def sample_sigma_eta2_ar1( *, h: np.ndarray, h0: float, gamma0: float, phi: float, nu0: float, s0: float, rng: np.random.Generator, ) -> float: ht = np.asarray(h, dtype=float).reshape(-1) if nu0 <= 0 or not np.isfinite(nu0): raise ValueError("nu0 must be positive") if s0 <= 0 or not np.isfinite(s0): raise ValueError("s0 must be positive") if not np.isfinite(h0) or not np.isfinite(gamma0) or not np.isfinite(phi): raise ValueError("h0, gamma0, and phi must be finite") prev = np.concatenate([np.array([float(h0)], dtype=float), ht[:-1]]) eh = ht - (float(gamma0) + float(phi) * prev) shape = float(nu0 + ht.shape[0] / 2.0) scale = float(s0 + 0.5 * float(np.sum(eh * eh))) return float(1.0 / rng.gamma(shape=shape, scale=1.0 / scale))
[docs] def sample_ar1_params( *, h: np.ndarray, h0: float, sigma_eta2: float, phi_prior_mean: float, phi_prior_var: float, gamma0_prior_mean: float, gamma0_prior_var: float, rng: np.random.Generator, phi_bounds: tuple[float, float] = (-0.999, 0.999), ) -> tuple[float, float]: """Sample (gamma0, phi) for the AR(1) volatility state equation.""" ht = np.asarray(h, dtype=float).reshape(-1) if ht.size < 1: raise ValueError("h must be non-empty") if sigma_eta2 <= 0 or not np.isfinite(sigma_eta2): raise ValueError("sigma_eta2 must be positive") if phi_prior_var <= 0 or gamma0_prior_var <= 0: raise ValueError("prior variances must be > 0") if not np.isfinite(h0): raise ValueError("h0 must be finite") for name, v in [ ("phi_prior_mean", phi_prior_mean), ("phi_prior_var", phi_prior_var), ("gamma0_prior_mean", gamma0_prior_mean), ("gamma0_prior_var", gamma0_prior_var), ]: if not np.isfinite(float(v)): raise ValueError(f"{name} must be finite") phi_lo, phi_hi = float(phi_bounds[0]), float(phi_bounds[1]) if not (-1.0 < phi_lo < phi_hi < 1.0): raise ValueError("phi_bounds must be inside (-1, 1)") y = ht x = np.concatenate([np.array([float(h0)], dtype=float), ht[:-1]]) phi = float(phi_prior_mean) for _ in range(2): k_g0 = (1.0 / float(gamma0_prior_var)) + (float(y.shape[0]) / float(sigma_eta2)) g0_hat = ( (float(gamma0_prior_mean) / float(gamma0_prior_var)) + (1.0 / float(sigma_eta2)) * float(np.sum(y - phi * x)) ) / k_g0 gamma0 = float(g0_hat + np.sqrt(1.0 / k_g0) * rng.normal()) k_phi = (1.0 / float(phi_prior_var)) + (1.0 / float(sigma_eta2)) * float(np.sum(x * x)) phi_hat = ( (float(phi_prior_mean) / float(phi_prior_var)) + (1.0 / float(sigma_eta2)) * float(np.sum(x * (y - gamma0))) ) / k_phi sd_phi = float(np.sqrt(1.0 / k_phi)) phi = _truncnorm_rvs(mean=float(phi_hat), sd=sd_phi, lower=phi_lo, upper=phi_hi, rng=rng) return float(gamma0), float(phi)
[docs] def sample_sigma_eta2( *, h: np.ndarray, h0: float, nu0: float, s0: float, rng: np.random.Generator, ) -> float: ht = np.asarray(h, dtype=float).reshape(-1) if nu0 <= 0 or not np.isfinite(nu0): raise ValueError("nu0 must be positive") if s0 <= 0 or not np.isfinite(s0): raise ValueError("s0 must be positive") eh = ht - np.concatenate([np.array([h0], dtype=float), ht[:-1]]) shape = float(nu0 + ht.shape[0] / 2.0) scale = float(s0 + 0.5 * float(np.sum(eh * eh))) return float(1.0 / rng.gamma(shape=shape, scale=1.0 / scale))
[docs] def sample_beta_svrw( *, x: np.ndarray, y: np.ndarray, m0: np.ndarray, v0: np.ndarray, h: np.ndarray, rng: np.random.Generator, jitter: float = 1e-6, ) -> np.ndarray: xt = np.asarray(x, dtype=float) yt = np.asarray(y, dtype=float) m0t = np.asarray(m0, dtype=float) v0t = np.asarray(v0, dtype=float) ht = np.asarray(h, dtype=float) if xt.ndim != 2 or yt.ndim != 2: raise ValueError("x and y must be 2D") if ht.ndim != 2: raise ValueError("h must be 2D") if yt.shape != ht.shape: raise ValueError("y and h must have the same shape") t, k = xt.shape _t2, n = yt.shape if m0t.shape != (k, n): raise ValueError("m0 must have shape (K, N)") if v0t.shape != (k, k): raise ValueError("v0 must have shape (K, K)") if jitter <= 0 or not np.isfinite(jitter): raise ValueError("jitter must be positive") inv_v0 = solve_psd(v0t, np.eye(k, dtype=float)) beta = np.empty((k, n), dtype=float) for i in range(n): w = np.exp(-ht[:, i]) xtwx = xt.T @ (w[:, None] * xt) ktheta = symmetrize(inv_v0 + xtwx + jitter * np.eye(k, dtype=float)) rhs = inv_v0 @ m0t[:, i] + xt.T @ (w * yt[:, i]) thetahat = solve_psd(ktheta, rhs) chol = cholesky_jitter(ktheta) z = rng.standard_normal(k) theta = thetahat + scipy.linalg.solve_triangular(chol.T, z, lower=False, check_finite=False) beta[:, i] = theta return beta