Source code for srvar.api

from __future__ import annotations

import numpy as np

from .bvar import sample_posterior_niw, simulate_var_forecast
from .data.dataset import Dataset
from .elb import apply_elb_floor
from .results import FitResult, ForecastResult
from .samplers import _fit_elb_gibbs, _fit_fsv, _fit_no_elb, _fit_svcov, _fit_svrw
from .spec import ModelSpec, PriorSpec, SamplerConfig


[docs] def fit( dataset: Dataset, model: ModelSpec, prior: PriorSpec, sampler: SamplerConfig, *, rng: np.random.Generator | None = None, ) -> FitResult: """Fit an SRVAR/BVAR model and return posterior draws. This is the primary user-facing entry point for estimating models in this package. Supported configurations ------------------------ - Conjugate BVAR with Normal-Inverse-Wishart prior (``prior.family='niw'``) - Spike-and-slab variable selection (``prior.family='ssvs'``) - Effective lower bound (ELB) data-augmentation Gibbs sampler (``model.elb.enabled``) - Stochastic volatility random-walk (SVRW) (``model.volatility.enabled``; requires NIW) Parameters ---------- dataset: Observed data (T, N). When ELB is enabled, some variables are interpreted as censored at the ELB. model: Model configuration, including lag order ``p`` and optional ELB/SV specs. prior: Prior configuration. The prior family is determined by ``prior.family``. sampler: MCMC configuration (draws, burn-in, thinning). rng: Optional NumPy RNG. Returns ------- FitResult Fit result with posterior parameters and/or stored posterior draws depending on the model configuration. Notes ----- For conjugate NIW without ELB/SV, ``FitResult.posterior`` is populated and ``beta_draws``/``sigma_draws`` are produced by direct sampling. For Gibbs samplers (ELB, SSVS, SV), burn-in and thinning are applied online and the returned ``*_draws`` arrays contain only kept draws. When ELB is enabled, the latent series is initialized by setting observations at or below the bound to a small amount below the bound (``bound - elb.init_offset``). This is a numerical initialization choice intended to avoid starting exactly at the truncation boundary. """ prior_family = prior.family.lower() if prior_family not in {"niw", "ssvs", "blasso", "dl"}: raise ValueError("only prior.family in {'niw','ssvs','blasso','dl'} is supported") eqwise_minnesota = prior.minnesota_canonical prior_mode = "minnesota_canonical" if eqwise_minnesota is not None else prior_family if rng is None: rng = np.random.default_rng() if eqwise_minnesota is not None and eqwise_minnesota.mode == "tempered": vol = model.volatility if vol is None or not vol.enabled or vol.covariance != "diagonal": raise ValueError( "minnesota_tempered currently supports only diagonal stochastic volatility" ) if model.shocks is not None and model.shocks.family != "gaussian": vol = model.volatility vol_factor = vol is not None and vol.enabled and vol.covariance == "factor" if model.elb is not None and model.elb.enabled and not vol_factor: raise ValueError( "robust shocks with ELB require factor SV (volatility.covariance='factor')" ) if model.steady_state is not None and not vol_factor: raise ValueError( "robust shocks with steady_state require factor SV (volatility.covariance='factor')" ) if vol is not None and vol.enabled and not vol_factor: raise ValueError( "robust shocks are currently supported only with factor SV " "(volatility.covariance='factor')" ) if model.volatility is not None and model.volatility.enabled: if prior_mode == "minnesota_canonical" and model.volatility.covariance in { "triangular", "factor", }: raise ValueError( "minnesota_canonical currently supports only homoskedastic models " "and diagonal stochastic volatility" ) if prior_family not in {"niw", "blasso", "dl"}: raise ValueError( "stochastic volatility currently requires prior.family in {'niw','blasso','dl'}" ) if model.volatility.covariance == "triangular": if prior_family != "niw": raise ValueError("triangular SV covariance currently requires prior.family='niw'") return _fit_svcov(dataset=dataset, model=model, prior=prior, sampler=sampler, rng=rng) if model.volatility.covariance == "factor": if prior_family != "niw": raise ValueError("FSV currently requires prior.family='niw'") return _fit_fsv(dataset=dataset, model=model, prior=prior, sampler=sampler, rng=rng) return _fit_svrw(dataset=dataset, model=model, prior=prior, sampler=sampler, rng=rng) if model.elb is None or not model.elb.enabled: return _fit_no_elb( dataset=dataset, model=model, prior=prior, sampler=sampler, prior_family=prior_mode, rng=rng, ) # Phase 3: ELB data-augmentation Gibbs # Initialize latent series: start at observed, but nudge ELB-bound observations slightly below bound # Update latent shadow rates at ELB observations return _fit_elb_gibbs( dataset=dataset, model=model, prior=prior, sampler=sampler, prior_family=prior_mode, rng=rng, )
[docs] def forecast( fit: FitResult, horizons: list[int], *, draws: int = 1000, quantile_levels: list[float] | None = None, stationarity: str = "allow", stationarity_tol: float = 1e-10, stationarity_max_draws: int | None = None, rng: np.random.Generator | None = None, ) -> ForecastResult: """Generate predictive simulations from a fitted model. Forecasts are produced by Monte Carlo simulation using posterior parameter draws. Parameters ---------- fit: Result from :func:`srvar.api.fit`. horizons: List of forecast horizons (in steps) requested by the caller. Internally, simulations are generated out to ``H = max(horizons)``. draws: Number of predictive simulation paths. quantile_levels: Quantiles to compute from the simulated draws. Defaults to ``[0.1, 0.5, 0.9]``. rng: Optional NumPy RNG. stationarity: Stationarity policy for VAR coefficient draws used in forecasting: - ``"allow"`` (default): do not enforce stability. - ``"reject"``: require stable VAR dynamics (companion eigenvalues strictly inside the unit circle), rejecting non-stationary coefficient draws. stationarity_tol: Numerical tolerance for the stability check. A draw is considered stationary if ``max(abs(eigvals)) < 1 - stationarity_tol``. stationarity_max_draws: When `fit` does not contain stored posterior draws (i.e., forecasting samples from a conjugate NIW posterior), this caps the **total number of candidate posterior draws** attempted to collect `draws` stationary draws under ``stationarity="reject"``. If not provided, defaults to ``50 * draws``. Returns ------- ForecastResult Forecast result containing: - ``draws``: simulated observations with shape ``(D, H, N)`` - ``mean``: mean across draws with shape ``(H, N)`` - ``quantiles``: dict mapping each requested quantile to an array ``(H, N)`` Notes ----- If ELB is enabled in the fitted model, returned ``draws`` are the observed (floored) draws, and ``latent_draws`` contains the unconstrained latent draws. If you call ``forecast(fit, horizons=[1, 3], ...)`` then ``result.mean[0]`` corresponds to horizon 1 and ``result.mean[2]`` corresponds to horizon 3. """ if rng is None: rng = np.random.default_rng() if quantile_levels is None: quantile_levels = [0.1, 0.5, 0.9] if not isinstance(draws, (int, np.integer)) or isinstance(draws, bool): raise ValueError("draws must be an integer") if int(draws) < 1: raise ValueError("draws must be >= 1") draws = int(draws) if not isinstance(horizons, list) or len(horizons) == 0: raise ValueError("horizons must be a non-empty list of positive integers") horizons_int: list[int] = [] for h in horizons: if not isinstance(h, (int, np.integer)) or isinstance(h, bool): raise ValueError("horizons must contain only integers") hi = int(h) if hi < 1: raise ValueError("horizons must contain only positive integers") horizons_int.append(hi) horizons = horizons_int if not isinstance(quantile_levels, list) or len(quantile_levels) == 0: raise ValueError("quantile_levels must be a non-empty list") quantiles_float: list[float] = [] for q in quantile_levels: qf = float(q) if not np.isfinite(qf) or not (0.0 < qf < 1.0): raise ValueError("quantile_levels must be finite and in (0, 1)") quantiles_float.append(qf) quantile_levels = quantiles_float hmax = int(max(horizons)) p = fit.model.p if not isinstance(stationarity, str) or not stationarity: raise ValueError("stationarity must be a non-empty string") stationarity_l = stationarity.lower() if stationarity_l not in {"allow", "reject"}: raise ValueError("stationarity must be one of: allow, reject") if not np.isfinite(float(stationarity_tol)) or float(stationarity_tol) < 0: raise ValueError("stationarity_tol must be finite and >= 0") stationarity_tol = float(stationarity_tol) if stationarity_max_draws is not None: if not isinstance(stationarity_max_draws, (int, np.integer)) or isinstance( stationarity_max_draws, bool ): raise ValueError("stationarity_max_draws must be an integer when provided") if int(stationarity_max_draws) < 1: raise ValueError("stationarity_max_draws must be >= 1") stationarity_max_draws = int(stationarity_max_draws) if fit.posterior is None and fit.beta_draws is None: raise ValueError( "fit does not contain posterior parameters or stored draws; " "this can happen if burn_in/thin leaves zero kept draws" ) if fit.model.steady_state is not None and fit.beta_draws is None: raise ValueError( "steady_state forecasting requires stored beta_draws; reduce burn_in or thin" ) base_dataset = fit.latent_dataset if fit.latent_dataset is not None else fit.dataset if base_dataset.T < p: raise ValueError("dataset is too short for requested lag order p") y_last = base_dataset.values[-p:, :] if fit.beta_draws is not None and fit.sigma_draws is not None: # sample with replacement from stored posterior draws idx: np.ndarray if stationarity_l == "reject": from .var import is_stationary stable = [ is_stationary( fit.beta_draws[i], n=fit.dataset.N, p=p, include_intercept=fit.model.include_intercept, tol=stationarity_tol, ) for i in range(int(fit.beta_draws.shape[0])) ] stable_idx = np.flatnonzero(np.asarray(stable, dtype=bool)) if stable_idx.size < 1: raise ValueError("no stationary coefficient draws available in fit.beta_draws") sel = rng.integers(0, stable_idx.size, size=draws) idx = stable_idx[sel] else: idx = rng.integers(0, fit.beta_draws.shape[0], size=draws) beta_draws = fit.beta_draws[idx] sigma_draws = fit.sigma_draws[idx] sims = np.empty((draws, hmax, fit.dataset.N), dtype=float) for d in range(draws): sims[d] = simulate_var_forecast( y_last=y_last, beta=beta_draws[d], sigma=sigma_draws[d], horizon=hmax, include_intercept=fit.model.include_intercept, rng=rng, shocks=fit.model.shocks, ) elif ( fit.beta_draws is not None and fit.h_draws is not None and fit.sigma_eta2_draws is not None and fit.q_draws is not None ): import scipy.linalg idx: np.ndarray if stationarity_l == "reject": from .var import is_stationary stable = [ is_stationary( fit.beta_draws[i], n=fit.dataset.N, p=p, include_intercept=fit.model.include_intercept, tol=stationarity_tol, ) for i in range(int(fit.beta_draws.shape[0])) ] stable_idx = np.flatnonzero(np.asarray(stable, dtype=bool)) if stable_idx.size < 1: raise ValueError("no stationary coefficient draws available in fit.beta_draws") sel = rng.integers(0, stable_idx.size, size=draws) idx = stable_idx[sel] else: idx = rng.integers(0, fit.beta_draws.shape[0], size=draws) beta_draws = fit.beta_draws[idx] h_draws = fit.h_draws[idx] sigma_eta2_draws = fit.sigma_eta2_draws[idx] q_draws = fit.q_draws[idx] sv_gamma0_draws = fit.sv_gamma0_draws[idx] if fit.sv_gamma0_draws is not None else None sv_phi_draws = fit.sv_phi_draws[idx] if fit.sv_phi_draws is not None else None sims = np.empty((draws, hmax, fit.dataset.N), dtype=float) for d in range(draws): lags = y_last.copy() h_curr = h_draws[d, -1, :].copy() sig_eta = sigma_eta2_draws[d].copy() q = q_draws[d] gamma0 = sv_gamma0_draws[d] if sv_gamma0_draws is not None else None phi = sv_phi_draws[d] if sv_phi_draws is not None else None path = np.empty((hmax, fit.dataset.N), dtype=float) for h_step in range(hmax): x_parts = [] if fit.model.include_intercept: x_parts.append(np.array([1.0], dtype=float)) for lag in range(1, p + 1): x_parts.append(lags[-lag, :]) x_row = np.concatenate(x_parts) mean = x_row @ beta_draws[d] z = rng.normal(size=fit.dataset.N) eps = z * np.exp(0.5 * h_curr) innov = scipy.linalg.solve_triangular(q, eps, lower=False, check_finite=False) y_next = mean + innov path[h_step] = y_next lags = np.vstack([lags[1:, :], y_next]) if p > 1 else y_next.reshape(1, -1) if gamma0 is not None and phi is not None: h_curr = ( gamma0 + phi * h_curr + np.sqrt(sig_eta) * rng.normal(size=fit.dataset.N) ) else: h_curr = h_curr + np.sqrt(sig_eta) * rng.normal(size=fit.dataset.N) sims[d] = path elif ( fit.beta_draws is not None and fit.h_draws is not None and fit.sigma_eta2_draws is not None and fit.lambda_draws is not None and fit.h_factor_draws is not None and fit.sigma_eta2_factor_draws is not None ): idx: np.ndarray if stationarity_l == "reject": from .var import is_stationary stable = [ is_stationary( fit.beta_draws[i], n=fit.dataset.N, p=p, include_intercept=fit.model.include_intercept, tol=stationarity_tol, ) for i in range(int(fit.beta_draws.shape[0])) ] stable_idx = np.flatnonzero(np.asarray(stable, dtype=bool)) if stable_idx.size < 1: raise ValueError("no stationary coefficient draws available in fit.beta_draws") sel = rng.integers(0, stable_idx.size, size=draws) idx = stable_idx[sel] else: idx = rng.integers(0, fit.beta_draws.shape[0], size=draws) beta_draws = fit.beta_draws[idx] h_eta_draws = fit.h_draws[idx] sigma_eta2_eta_draws = fit.sigma_eta2_draws[idx] lam_draws = fit.lambda_draws[idx] h_f_draws = fit.h_factor_draws[idx] sigma_eta2_f_draws = fit.sigma_eta2_factor_draws[idx] sims = np.empty((draws, hmax, fit.dataset.N), dtype=float) for d in range(draws): lags = y_last.copy() h_eta_curr = h_eta_draws[d, -1, :].copy() sig_eta2_eta = sigma_eta2_eta_draws[d].copy() lam = lam_draws[d] h_f_curr = h_f_draws[d, -1, :].copy() sig_eta2_f = sigma_eta2_f_draws[d].copy() k = int(h_f_curr.shape[0]) path = np.empty((hmax, fit.dataset.N), dtype=float) for h_step in range(hmax): x_parts = [] if fit.model.include_intercept: x_parts.append(np.array([1.0], dtype=float)) for lag in range(1, p + 1): x_parts.append(lags[-lag, :]) x_row = np.concatenate(x_parts) mean = x_row @ beta_draws[d] f_step = rng.normal(size=k) * np.exp(0.5 * h_f_curr) eta_step = rng.normal(size=fit.dataset.N) * np.exp(0.5 * h_eta_curr) eps = lam @ f_step + eta_step if fit.model.shocks is not None and fit.model.shocks.family != "gaussian": fam = str(fit.model.shocks.family).lower() if fam == "student_t": nu = float(fit.model.shocks.df) lam_shock = float(rng.gamma(shape=0.5 * nu, scale=2.0 / nu)) eps = eps / float(np.sqrt(lam_shock)) elif fam == "mixture_outlier": prob = float(fit.model.shocks.outlier_prob) kappa = float(fit.model.shocks.outlier_variance) is_outlier = bool(rng.uniform() < prob) eps = eps * (float(np.sqrt(kappa)) if is_outlier else 1.0) else: # pragma: no cover raise ValueError(f"unknown shocks.family: {fit.model.shocks.family}") y_next = mean + eps path[h_step] = y_next lags = np.vstack([lags[1:, :], y_next]) if p > 1 else y_next.reshape(1, -1) h_eta_curr = h_eta_curr + np.sqrt(sig_eta2_eta) * rng.normal(size=fit.dataset.N) h_f_curr = h_f_curr + np.sqrt(sig_eta2_f) * rng.normal(size=k) sims[d] = path elif ( fit.beta_draws is not None and fit.h_draws is not None and fit.sigma_eta2_draws is not None ): idx: np.ndarray if stationarity_l == "reject": from .var import is_stationary stable = [ is_stationary( fit.beta_draws[i], n=fit.dataset.N, p=p, include_intercept=fit.model.include_intercept, tol=stationarity_tol, ) for i in range(int(fit.beta_draws.shape[0])) ] stable_idx = np.flatnonzero(np.asarray(stable, dtype=bool)) if stable_idx.size < 1: raise ValueError("no stationary coefficient draws available in fit.beta_draws") sel = rng.integers(0, stable_idx.size, size=draws) idx = stable_idx[sel] else: idx = rng.integers(0, fit.beta_draws.shape[0], size=draws) beta_draws = fit.beta_draws[idx] h_draws = fit.h_draws[idx] sigma_eta2_draws = fit.sigma_eta2_draws[idx] sv_gamma0_draws = fit.sv_gamma0_draws[idx] if fit.sv_gamma0_draws is not None else None sv_phi_draws = fit.sv_phi_draws[idx] if fit.sv_phi_draws is not None else None sims = np.empty((draws, hmax, fit.dataset.N), dtype=float) for d in range(draws): lags = y_last.copy() h_curr = h_draws[d, -1, :].copy() sig_eta = sigma_eta2_draws[d].copy() gamma0 = sv_gamma0_draws[d] if sv_gamma0_draws is not None else None phi = sv_phi_draws[d] if sv_phi_draws is not None else None path = np.empty((hmax, fit.dataset.N), dtype=float) for h_step in range(hmax): x_parts = [] if fit.model.include_intercept: x_parts.append(np.array([1.0], dtype=float)) for lag in range(1, p + 1): x_parts.append(lags[-lag, :]) x_row = np.concatenate(x_parts) mean = x_row @ beta_draws[d] eps = rng.normal(size=fit.dataset.N) * np.exp(0.5 * h_curr) y_next = mean + eps path[h_step] = y_next lags = np.vstack([lags[1:, :], y_next]) if p > 1 else y_next.reshape(1, -1) if gamma0 is not None and phi is not None: h_curr = ( gamma0 + phi * h_curr + np.sqrt(sig_eta) * rng.normal(size=fit.dataset.N) ) else: h_curr = h_curr + np.sqrt(sig_eta) * rng.normal(size=fit.dataset.N) sims[d] = path else: if fit.posterior is None: raise ValueError("fit has no posterior parameters or stored draws") if stationarity_l == "reject": from .var import is_stationary max_total = ( int(50 * draws) if stationarity_max_draws is None else int(stationarity_max_draws) ) accepted_beta: list[np.ndarray] = [] accepted_sigma: list[np.ndarray] = [] attempted = 0 while len(accepted_beta) < draws and attempted < max_total: need = int(draws - len(accepted_beta)) # oversample to reduce expected loop iterations, but stay within budget batch = min(max(10, 2 * need), max_total - attempted) beta_b, sigma_b = sample_posterior_niw( mn=fit.posterior.mn, vn=fit.posterior.vn, sn=fit.posterior.sn, nun=fit.posterior.nun, draws=batch, rng=rng, ) attempted += batch for i in range(int(batch)): if is_stationary( beta_b[i], n=fit.dataset.N, p=p, include_intercept=fit.model.include_intercept, tol=stationarity_tol, ): accepted_beta.append(beta_b[i]) accepted_sigma.append(sigma_b[i]) if len(accepted_beta) == draws: break if len(accepted_beta) < draws: raise ValueError( f"stationarity='reject' could not generate {draws} stationary draws within " f"{max_total} candidate draws; try increasing stationarity_max_draws or relaxing priors" ) beta_draws = np.stack(accepted_beta) sigma_draws = np.stack(accepted_sigma) else: beta_draws, sigma_draws = sample_posterior_niw( mn=fit.posterior.mn, vn=fit.posterior.vn, sn=fit.posterior.sn, nun=fit.posterior.nun, draws=draws, rng=rng, ) sims = np.empty((draws, hmax, fit.dataset.N), dtype=float) for d in range(draws): sims[d] = simulate_var_forecast( y_last=y_last, beta=beta_draws[d], sigma=sigma_draws[d], horizon=hmax, include_intercept=fit.model.include_intercept, rng=rng, shocks=fit.model.shocks, ) mean = sims.mean(axis=0) quantiles = {q: np.quantile(sims, q=q, axis=0) for q in quantile_levels} latent_sims: np.ndarray | None = None # If ELB enabled, return observed draws (apply floor) for constrained variables if fit.model.elb is not None and fit.model.elb.enabled: latent_sims = sims.copy() applies_to_idx = [fit.dataset.variables.index(v) for v in fit.model.elb.applies_to] sims = apply_elb_floor(sims, bound=fit.model.elb.bound, indices=applies_to_idx) mean = sims.mean(axis=0) quantiles = {q: np.quantile(sims, q=q, axis=0) for q in quantile_levels} return ForecastResult( variables=list(fit.dataset.variables), horizons=list(horizons), draws=sims, latent_draws=latent_sims, mean=mean, quantiles=quantiles, )