Source code for srvar.elb

from __future__ import annotations

from dataclasses import dataclass, field

import numpy as np
import scipy.stats

from .linalg import solve_psd


[docs] @dataclass(frozen=True, slots=True) class ElbSpec: """Effective lower bound (ELB) configuration. When ELB is enabled, selected observed series are treated as censored at an upper bound (the effective lower bound). During estimation, latent shadow values are sampled for observations at/below the bound. Parameters ---------- bound: The ELB level. Observations at or below ``bound + tol`` are treated as constrained. applies_to: Names of variables (as they appear in :class:`srvar.data.dataset.Dataset`) to which the ELB constraint applies. tol: Numerical tolerance when identifying bound observations. enabled: Whether to enable ELB handling. """ bound: float applies_to: list[str] = field(default_factory=list) tol: float = 1e-8 init_offset: float = 0.05 enabled: bool = True def __post_init__(self) -> None: if not np.isfinite(self.bound): raise ValueError("elb.bound must be finite") if self.tol < 0 or not np.isfinite(self.tol): raise ValueError("elb.tol must be finite and >= 0") if self.init_offset < 0 or not np.isfinite(self.init_offset): raise ValueError("elb.init_offset must be finite and >= 0") if self.enabled and len(self.applies_to) < 1: raise ValueError("elb.applies_to must not be empty when enabled")
def truncnorm_rvs_upper(*, mean: float, sd: float, upper: float, rng: np.random.Generator) -> float:
    """Draw one sample from ``N(mean, sd**2)`` truncated above at ``upper``.

    Raises
    ------
    ValueError
        If ``sd`` is not a positive finite number.
    """
    if not (np.isfinite(sd) and sd > 0):
        raise ValueError("sd must be positive")
    # scipy expresses the truncation bounds in standardized (z-score) units.
    b_std = (upper - mean) / sd
    draw = scipy.stats.truncnorm.rvs(a=-np.inf, b=b_std, loc=mean, scale=sd, random_state=rng)
    return float(draw)
def apply_elb_floor(values: np.ndarray, *, bound: float, indices: list[int]) -> np.ndarray:
    """Apply an ELB floor to simulation draws.

    Used during forecasting to map latent shadow-rate draws back into
    observed rate draws by enforcing ``y >= bound`` on the constrained
    variables (the positions in ``indices`` along the last axis).

    Parameters
    ----------
    values:
        Array of draws with ``ndim >= 2``; the last axis indexes variables.
    bound:
        The ELB level used as the floor.
    indices:
        Last-axis positions to clamp. An empty list leaves values unchanged.

    Returns
    -------
    np.ndarray
        A float copy of ``values`` with the selected columns floored.
    """
    out = np.array(values, dtype=float)  # np.array copies by default
    if out.ndim < 2:
        raise ValueError("values must have ndim >= 2")
    if indices:
        out[..., indices] = np.maximum(out[..., indices], bound)
    return out
def _x_row(y: np.ndarray, *, t: int, p: int, include_intercept: bool) -> np.ndarray: parts: list[np.ndarray] = [] if include_intercept: parts.append(np.array([1.0], dtype=float)) for lag in range(1, p + 1): parts.append(y[t - lag]) return np.concatenate(parts)
def sample_shadow_value(
    *,
    y: np.ndarray,
    t: int,
    j: int,
    p: int,
    beta: np.ndarray,
    sigma: np.ndarray,
    upper: float,
    include_intercept: bool,
    rng: np.random.Generator,
) -> float:
    """Sample y[t, j] from its full conditional (truncated above at `upper`).

    This conditional accounts for the fact that y[t, j] enters as a
    regressor in the next p likelihood terms.

    Assumptions:
      - Standard VAR(p): y_t ~ N(x_t beta, Sigma)
      - The only constraint is: y[t, j] <= upper

    Parameters
    ----------
    y:
        (T, N) data matrix; entry (t, j) is the value being resampled.
    t, j:
        Time index and variable index of the shadow value.
    p:
        VAR lag order (>= 1).
    beta:
        (K, N) coefficient matrix with K = intercept + N * p rows.
    sigma:
        (N, N) residual covariance Sigma.
    upper:
        Upper truncation point (the ELB).
    include_intercept:
        Whether the first row of ``beta`` is an intercept.
    rng:
        Generator used for the truncated-normal draw.

    Returns:
        new value for y[t, j]
    """
    y = np.asarray(y, dtype=float)
    beta = np.asarray(beta, dtype=float)
    sigma = np.asarray(sigma, dtype=float)
    t_max, n = y.shape
    if not (0 <= j < n):
        raise ValueError("j out of range")
    if not (0 <= t < t_max):
        raise ValueError("t out of range")
    if p < 1:
        raise ValueError("p must be >= 1")
    # (Sigma^{-1})[:, j], used for the time-t likelihood term below.
    e_j = np.zeros(n, dtype=float)
    e_j[j] = 1.0
    sinv_ej = solve_psd(sigma, e_j)
    sinv_jj = float(sinv_ej[j])
    # Accumulate the Gaussian conditional as precision `a` and
    # precision-weighted mean `b`: the conditional is N(b/a, 1/a).
    a = 0.0
    b = 0.0
    # Likelihood terms involving y[t, j]: time t itself (if t >= p has a
    # likelihood term at all) and the next p periods, where y[t, j] appears
    # as a lagged regressor. Clip to the sample range.
    s_start = max(p, t)
    s_end = min(t_max - 1, t + p)
    # cache current y[t, j]
    y_curr = float(y[t, j])
    for s in range(s_start, s_end + 1):
        x_s = _x_row(y, t=s, p=p, include_intercept=include_intercept)
        mu_s = x_s @ beta
        if s == t:
            # Likelihood term for time t itself (only exists if t >= p)
            # mu_t does not depend on y[t, j] in a VAR, since regressors are lagged.
            # With u = y_t minus its j-th entry, the residual is
            # u - mu_t + e_j * y[t, j]; expanding the quadratic form gives
            # precision Sigma^{-1}_jj and linear term -(Sigma^{-1} (u - mu_t))_j.
            u = y[s].copy()
            u[j] = 0.0
            u = u - mu_s
            sinv_u = solve_psd(sigma, u)
            a += sinv_jj
            b += -float(sinv_u[j])
            continue
        # future terms where y[t, j] appears as a lagged regressor
        lag = s - t
        if not (1 <= lag <= p):
            continue
        # Row of beta multiplied by y[t, j] in mu_s (intercept offset k0).
        k0 = 1 if include_intercept else 0
        idx = k0 + (lag - 1) * n + j
        d = beta[idx, :]
        # Residual at s as a function of y[t, j]: r(y) = r_base - d * y,
        # where r_base is the residual with y[t, j]'s contribution removed.
        r_curr = y[s] - mu_s
        r_base = r_curr + d * y_curr
        sinv_d = solve_psd(sigma, d)
        sinv_r = solve_psd(sigma, r_base)
        # Quadratic expansion: a += d' Sigma^{-1} d, b += d' Sigma^{-1} r_base.
        a += float(d @ sinv_d)
        b += float(d @ sinv_r)
    if a <= 0.0 or not np.isfinite(a):
        raise RuntimeError("non-positive or invalid conditional precision")
    var = 1.0 / a
    mean = b / a
    return truncnorm_rvs_upper(mean=mean, sd=float(np.sqrt(var)), upper=upper, rng=rng)
def sample_shadow_value_svrw(
    *,
    y: np.ndarray,
    h: np.ndarray,
    t: int,
    j: int,
    p: int,
    beta: np.ndarray,
    upper: float,
    include_intercept: bool,
    rng: np.random.Generator,
) -> float:
    """Sample y[t, j] from its full conditional under diagonal stochastic volatility.

    Same structure as :func:`sample_shadow_value`, but the code treats the
    residuals as independent across equations with time-varying precision
    ``exp(-h[s, i])`` for equation ``i`` at time ``s`` (i.e. residual
    variance ``exp(h[s, i])``). The only constraint is ``y[t, j] <= upper``.

    Parameters
    ----------
    y:
        (T, N) data matrix.
    h:
        (T, N) log-volatility matrix, same shape as ``y``.
    t, j, p, beta, upper, include_intercept, rng:
        As in :func:`sample_shadow_value`.

    Returns
    -------
    float
        New value for ``y[t, j]``.
    """
    y = np.asarray(y, dtype=float)
    ht = np.asarray(h, dtype=float)
    beta = np.asarray(beta, dtype=float)
    t_max, n = y.shape
    if ht.shape != y.shape:
        raise ValueError("h must have the same shape as y")
    if not (0 <= j < n):
        raise ValueError("j out of range")
    if not (0 <= t < t_max):
        raise ValueError("t out of range")
    if p < 1:
        raise ValueError("p must be >= 1")
    # Accumulate conditional precision `a` and precision-weighted mean `b`;
    # the conditional is N(b/a, 1/a) truncated above at `upper`.
    a = 0.0
    b = 0.0
    # Likelihood terms that involve y[t, j], clipped to the sample range.
    s_start = max(p, t)
    s_end = min(t_max - 1, t + p)
    y_curr = float(y[t, j])
    for s in range(s_start, s_end + 1):
        x_s = _x_row(y, t=s, p=p, include_intercept=include_intercept)
        mu_s = x_s @ beta
        if s == t:
            # Own likelihood term: with a diagonal covariance the conditional
            # of y[t, j] given the mean is simply N(mu_j, exp(h[t, j])).
            w = float(np.exp(-ht[s, j]))
            a += w
            b += w * float(mu_s[j])
            continue
        lag = s - t
        if not (1 <= lag <= p):
            continue
        # Coefficient row multiplied by y[t, j] in mu_s (intercept offset k0).
        k0 = 1 if include_intercept else 0
        idx = k0 + (lag - 1) * n + j
        # Diagonal covariance: each equation i contributes independently.
        for i in range(n):
            d = float(beta[idx, i])
            if d == 0.0:
                continue  # y[t, j] does not enter equation i at this lag
            w = float(np.exp(-ht[s, i]))
            # Residual with y[t, j]'s contribution removed: r(y) = r_base - d*y.
            r_base = float(y[s, i] - mu_s[i] + d * y_curr)
            a += w * d * d
            b += w * d * r_base
    if a <= 0.0 or not np.isfinite(a):
        raise RuntimeError("non-positive or invalid conditional precision")
    var = 1.0 / a
    mean = b / a
    return truncnorm_rvs_upper(mean=mean, sd=float(np.sqrt(var)), upper=upper, rng=rng)
def sample_shadow_value_svcov(
    *,
    y: np.ndarray,
    h: np.ndarray,
    q: np.ndarray,
    t: int,
    j: int,
    p: int,
    beta: np.ndarray,
    upper: float,
    include_intercept: bool,
    rng: np.random.Generator,
) -> float:
    """Sample y[t, j] from its full conditional under triangular SV covariance.

    This is the ELB shadow-rate update for a VAR(p) with stochastic
    volatility where the reduced-form residual covariance is:

        Sigma_t = Q^{-1} diag(exp(h_t)) (Q^{-1})'

    with Q upper-triangular and ones on the diagonal.

    Parameters
    ----------
    y:
        (T, N) data matrix.
    h:
        (T, N) log-volatility matrix, same shape as ``y``.
    q:
        (N, N) triangular rotation matrix Q.
    t, j, p, beta, upper, include_intercept, rng:
        As in :func:`sample_shadow_value`.

    Returns
    -------
    float
        New value for ``y[t, j]``.
    """
    y = np.asarray(y, dtype=float)
    ht = np.asarray(h, dtype=float)
    qt = np.asarray(q, dtype=float)
    beta = np.asarray(beta, dtype=float)
    t_max, n = y.shape
    if ht.shape != y.shape:
        raise ValueError("h must have the same shape as y")
    if qt.shape != (n, n):
        raise ValueError("q must have shape (N, N)")
    if not (0 <= j < n):
        raise ValueError("j out of range")
    if not (0 <= t < t_max):
        raise ValueError("t out of range")
    if p < 1:
        raise ValueError("p must be >= 1")
    # Accumulate conditional precision `a` and precision-weighted mean `b`;
    # the conditional is N(b/a, 1/a) truncated above at `upper`.
    a = 0.0
    b = 0.0
    # Likelihood terms that involve y[t, j], clipped to the sample range.
    s_start = max(p, t)
    s_end = min(t_max - 1, t + p)
    y_curr = float(y[t, j])
    for s in range(s_start, s_end + 1):
        x_s = _x_row(y, t=s, p=p, include_intercept=include_intercept)
        mu_s = x_s @ beta
        # Time-s precision: Sigma_s^{-1} = Q' diag(exp(-h_s)) Q, which follows
        # from Sigma_s = Q^{-1} diag(exp(h_s)) (Q^{-1})' in the docstring.
        inv_lambda = np.exp(-ht[s, :])
        k_s = qt.T @ (inv_lambda[:, None] * qt)
        if s == t:
            # Own likelihood term; same expansion as in sample_shadow_value,
            # with Sigma^{-1} replaced by the time-varying precision k_s.
            u = y[s].copy()
            u[j] = 0.0
            u = u - mu_s
            sinv_u = k_s @ u
            a += float(k_s[j, j])
            b += -float(sinv_u[j])
            continue
        lag = s - t
        if not (1 <= lag <= p):
            continue
        # Coefficient row multiplied by y[t, j] in mu_s (intercept offset k0).
        k0 = 1 if include_intercept else 0
        idx = k0 + (lag - 1) * n + j
        d = beta[idx, :]
        # Residual with y[t, j]'s contribution removed: r(y) = r_base - d*y.
        r_curr = y[s] - mu_s
        r_base = r_curr + d * y_curr
        sinv_d = k_s @ d
        sinv_r = k_s @ r_base
        # Quadratic expansion: a += d' k_s d, b += d' k_s r_base.
        a += float(d @ sinv_d)
        b += float(d @ sinv_r)
    if a <= 0.0 or not np.isfinite(a):
        raise RuntimeError("non-positive or invalid conditional precision")
    var = 1.0 / a
    mean = b / a
    return truncnorm_rvs_upper(mean=mean, sd=float(np.sqrt(var)), upper=upper, rng=rng)
def sample_shadow_value_fsv(
    *,
    y: np.ndarray,
    h_eta: np.ndarray,
    factor_mean: np.ndarray,
    t: int,
    j: int,
    p: int,
    beta: np.ndarray,
    upper: float,
    include_intercept: bool,
    rng: np.random.Generator,
) -> float:
    """Sample y[t, j] from its full conditional under factor SV (given factors/loadings).

    This is the ELB shadow-rate update for a VAR(p) with factor stochastic
    volatility, conditioning on the current factor contribution to the mean.
    The conditional likelihood is treated as diagonal given factors/loadings:

        y_t = x_t beta + factor_mean_t + eta_t
        eta_t ~ N(0, diag(exp(h_eta_t)))

    where ``factor_mean_t`` corresponds to ``Lambda f_t`` evaluated at the
    current Gibbs state. The only constraint is: ``y[t, j] <= upper``.

    Parameters
    ----------
    y:
        (T, N) data matrix.
    h_eta:
        (T, N) idiosyncratic log-volatility matrix, same shape as ``y``.
    factor_mean:
        (T, N) factor contribution to the mean, same shape as ``y``.
    t, j, p, beta, upper, include_intercept, rng:
        As in :func:`sample_shadow_value`.

    Returns
    -------
    float
        New value for ``y[t, j]``.
    """
    y = np.asarray(y, dtype=float)
    ht = np.asarray(h_eta, dtype=float)
    fm = np.asarray(factor_mean, dtype=float)
    beta = np.asarray(beta, dtype=float)
    t_max, n = y.shape
    if ht.shape != y.shape:
        raise ValueError("h_eta must have the same shape as y")
    if fm.shape != y.shape:
        raise ValueError("factor_mean must have the same shape as y")
    if not (0 <= j < n):
        raise ValueError("j out of range")
    if not (0 <= t < t_max):
        raise ValueError("t out of range")
    if p < 1:
        raise ValueError("p must be >= 1")
    # Accumulate conditional precision `a` and precision-weighted mean `b`;
    # the conditional is N(b/a, 1/a) truncated above at `upper`.
    a = 0.0
    b = 0.0
    # Likelihood terms that involve y[t, j], clipped to the sample range.
    s_start = max(p, t)
    s_end = min(t_max - 1, t + p)
    y_curr = float(y[t, j])
    for s in range(s_start, s_end + 1):
        x_s = _x_row(y, t=s, p=p, include_intercept=include_intercept)
        # Mean includes the (conditioned-on) factor contribution at time s.
        mu_s = x_s @ beta + fm[s]
        if s == t:
            # Own likelihood term: given factors, equation j is univariate
            # normal with precision exp(-h_eta[t, j]).
            w = float(np.exp(-ht[s, j]))
            a += w
            b += w * float(mu_s[j])
            continue
        lag = s - t
        if not (1 <= lag <= p):
            continue
        # Coefficient row multiplied by y[t, j] in mu_s (intercept offset k0).
        k0 = 1 if include_intercept else 0
        idx = k0 + (lag - 1) * n + j
        # Diagonal conditional covariance: each equation i contributes independently.
        for i in range(n):
            d = float(beta[idx, i])
            if d == 0.0:
                continue  # y[t, j] does not enter equation i at this lag
            w = float(np.exp(-ht[s, i]))
            # Residual with y[t, j]'s contribution removed: r(y) = r_base - d*y.
            r_base = float(y[s, i] - mu_s[i] + d * y_curr)
            a += w * d * d
            b += w * d * r_base
    if a <= 0.0 or not np.isfinite(a):
        raise RuntimeError("non-positive or invalid conditional precision")
    var = 1.0 / a
    mean = b / a
    return truncnorm_rvs_upper(mean=mean, sd=float(np.sqrt(var)),
                               upper=upper, rng=rng)