Source code for srvar.analysis.hd

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Literal

import numpy as np

from ..bvar import sample_posterior_niw
from ..linalg import cholesky_jitter
from ..results import FitResult, HistoricalDecompositionResult
from ..var import design_matrix, is_stationary
from .irf import _ar_matrices_from_beta, _parse_ordering, _parse_stationarity, _select_draw_indices


def _parse_quantiles(quantile_levels: list[float] | None) -> list[float]:
    """Validate quantile levels, defaulting to [0.1, 0.5, 0.9] when None."""
    if quantile_levels is None:
        return [0.1, 0.5, 0.9]
    if not isinstance(quantile_levels, list) or len(quantile_levels) < 1:
        raise ValueError("quantile_levels must be a non-empty list")

    qs: list[float] = []
    for q in quantile_levels:
        qf = float(q)
        if not np.isfinite(qf) or not (0.0 < qf < 1.0):
            raise ValueError("quantile_levels must be finite and in (0, 1)")
        qs.append(qf)
    return qs
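
# Illustrative behaviour of `_parse_quantiles` (a sketch only, not executed
# at import time):
#
#     _parse_quantiles(None)          # -> [0.1, 0.5, 0.9]
#     _parse_quantiles([0.05, 0.95])  # -> [0.05, 0.95]
#     _parse_quantiles([1.5])         # raises ValueError: must lie in (0, 1)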


def _permute_beta(
    *,
    beta: np.ndarray,
    order_idx: np.ndarray,
    n: int,
    p: int,
    include_intercept: bool,
) -> np.ndarray:
    """Reorder a stacked VAR coefficient matrix to a new variable ordering.

    Both the equation (column) axis and each lag's regressor block are
    permuted by `order_idx`; the intercept row, if present, is permuted too.
    """
    b = np.asarray(beta, dtype=float)
    k_expected = (1 if include_intercept else 0) + int(n) * int(p)
    if b.shape != (k_expected, int(n)):
        raise ValueError("beta has wrong shape for VAR(p)")

    idx = np.asarray(order_idx, dtype=int).reshape(-1)
    if idx.shape != (int(n),):
        raise ValueError("order_idx has wrong shape")

    blocks: list[np.ndarray] = []
    base = 0
    if include_intercept:
        c = b[0, :][idx]
        blocks.append(c.reshape(1, -1))
        base = 1

    for lag in range(int(p)):
        block = b[base + lag * int(n) : base + (lag + 1) * int(n), :]
        blocks.append(block[np.ix_(idx, idx)])

    return np.vstack(blocks)

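# A minimal sketch of what `_permute_beta` does for a VAR(1) with an
# intercept and the ordering reversed (order_idx = [1, 0]); the numbers are
# illustrative only:
#
#     beta = np.array([[0.10, 0.20],   # intercept row (one entry per equation)
#                      [0.90, 0.30],   # lag-1 coefficients of the 1st variable
#                      [0.00, 0.50]])  # lag-1 coefficients of the 2nd variable
#     _permute_beta(beta=beta, order_idx=np.array([1, 0]), n=2, p=1,
#                   include_intercept=True)
#     # -> [[0.20, 0.10],
#     #     [0.50, 0.00],
#     #     [0.30, 0.90]]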

def historical_decomposition_cholesky(
    fit: FitResult,
    *,
    draws: int | None = 200,
    ordering: Sequence[str] | None = None,
    shock_scale: Literal["one_sd", "unit"] = "one_sd",
    quantile_levels: list[float] | None = None,
    stationarity: str = "allow",
    stationarity_tol: float = 1e-10,
    stationarity_max_draws: int | None = None,
    use_latent: bool | None = None,
    rng: np.random.Generator | None = None,
) -> HistoricalDecompositionResult:
    """Historical decomposition from Cholesky-identified structural shocks.

    This decomposes the observed (or latent) dataset into:

    - a shock-free baseline path conditional on the initial lags
    - additive contributions from each structural shock at each date

    Notes
    -----
    - For ELB models, the VAR is defined on the latent shadow series. If
      `use_latent` is not specified, this function defaults to using
      `fit.latent_dataset` when available.
    - For volatility models, shocks are identified using the time-varying
      covariance state `Sigma_t` implied by `h_draws` (and `q_draws` for
      triangular covariance).
    """
    if rng is None:
        rng = np.random.default_rng()

    if draws is not None:
        if not isinstance(draws, (int, np.integer)) or isinstance(draws, bool):
            raise ValueError("draws must be an integer when provided")
        if int(draws) < 1:
            raise ValueError("draws must be >= 1")
        draws = int(draws)

    q_list = _parse_quantiles(quantile_levels)
    vars_out, order_idx = _parse_ordering(list(fit.dataset.variables), ordering)
    st_policy, st_tol, st_max = _parse_stationarity(
        stationarity=stationarity,
        stationarity_tol=stationarity_tol,
        stationarity_max_draws=stationarity_max_draws,
    )

    elb_enabled = bool(fit.model.elb is not None and getattr(fit.model.elb, "enabled", False))
    if use_latent is None:
        use_latent = elb_enabled
    if use_latent:
        if fit.latent_dataset is None:
            raise ValueError("use_latent=True requires fit.latent_dataset")
        dataset = fit.latent_dataset
    else:
        dataset = fit.dataset

    y_raw = np.asarray(dataset.values, dtype=float)
    if y_raw.ndim != 2:
        raise ValueError("dataset.values must be 2D")

    p = int(fit.model.p)
    t, n = y_raw.shape
    if t <= p:
        raise ValueError("dataset must have T > p for historical decomposition")
    if n != int(fit.dataset.N):
        raise ValueError("dataset.values has inconsistent number of variables")

    idx = np.asarray(order_idx, dtype=int)
    y = y_raw[:, idx]

    volatility_enabled = bool(
        fit.model.volatility is not None and getattr(fit.model.volatility, "enabled", False)
    )
    if volatility_enabled and shock_scale == "unit":
        raise ValueError("shock_scale='unit' is not supported for volatility models")

    metadata: dict[str, Any] = {
        "ordering": list(vars_out),
        "shock_scale": str(shock_scale),
        "stationarity": st_policy,
        "stationarity_tol": float(st_tol),
        "use_latent": bool(use_latent),
        "p": int(p),
    }

    # Collect posterior draws: prefer stored MCMC draws, otherwise sample
    # from the NIW posterior (homoskedastic models only).
    if fit.beta_draws is not None:
        idx_draws = _select_draw_indices(
            fit=fit, draws=draws, stationarity=st_policy, stationarity_tol=st_tol, rng=rng
        )
        beta_draws = np.asarray(fit.beta_draws[idx_draws], dtype=float)
        sigma_draws = (
            np.asarray(fit.sigma_draws[idx_draws], dtype=float)
            if fit.sigma_draws is not None
            else None
        )
        h_draws = (
            np.asarray(fit.h_draws[idx_draws], dtype=float) if fit.h_draws is not None else None
        )
        q_draws = (
            np.asarray(fit.q_draws[idx_draws], dtype=float) if fit.q_draws is not None else None
        )
        lambda_draws = (
            np.asarray(fit.lambda_draws[idx_draws], dtype=float)
            if fit.lambda_draws is not None
            else None
        )
        h_factor_draws = (
            np.asarray(fit.h_factor_draws[idx_draws], dtype=float)
            if fit.h_factor_draws is not None
            else None
        )
        metadata["draw_source"] = "stored"
    else:
        if fit.posterior is None:
            raise ValueError("fit has no stored beta_draws and no posterior parameters")
        if volatility_enabled:
            raise ValueError(
                "volatility models require stored beta_draws for historical decomposition"
            )
        d_out = int(1000 if draws is None else draws)
        metadata["draw_source"] = "posterior"
        if st_policy == "allow":
            beta_draws, sigma_draws = sample_posterior_niw(
                mn=fit.posterior.mn,
                vn=fit.posterior.vn,
                sn=fit.posterior.sn,
                nun=fit.posterior.nun,
                draws=d_out,
                rng=rng,
            )
        else:
            # Rejection sampling: keep drawing batches until d_out stationary
            # draws are accepted or the candidate budget is exhausted.
            max_total = int(50 * d_out) if st_max is None else int(st_max)
            accepted_beta: list[np.ndarray] = []
            accepted_sigma: list[np.ndarray] = []
            attempted = 0
            while len(accepted_beta) < d_out and attempted < max_total:
                need = int(d_out - len(accepted_beta))
                batch = min(max(10, 2 * need), max_total - attempted)
                beta_b, sigma_b = sample_posterior_niw(
                    mn=fit.posterior.mn,
                    vn=fit.posterior.vn,
                    sn=fit.posterior.sn,
                    nun=fit.posterior.nun,
                    draws=batch,
                    rng=rng,
                )
                attempted += batch
                for i in range(int(batch)):
                    if is_stationary(
                        beta_b[i],
                        n=n,
                        p=p,
                        include_intercept=fit.model.include_intercept,
                        tol=st_tol,
                    ):
                        accepted_beta.append(beta_b[i])
                        accepted_sigma.append(sigma_b[i])
                        if len(accepted_beta) == d_out:
                            break
            if len(accepted_beta) < d_out:
                raise ValueError(
                    f"stationarity='reject' could not generate {d_out} stationary draws within "
                    f"{max_total} candidate draws; try increasing stationarity_max_draws"
                )
            beta_draws = np.stack(accepted_beta)
            sigma_draws = np.stack(accepted_sigma)
        h_draws = None
        q_draws = None
        lambda_draws = None
        h_factor_draws = None

    d_used = int(beta_draws.shape[0])
    t_eff = int(t - p)
    baseline_draws = np.empty((d_used, t_eff, n), dtype=float)
    shock_draws = np.empty((d_used, t_eff, n), dtype=float)
    contrib_draws = np.empty((d_used, t_eff, n, n), dtype=float)
    max_abs_err = 0.0

    for d in range(d_used):
        beta_perm = _permute_beta(
            beta=beta_draws[d],
            order_idx=idx,
            n=n,
            p=p,
            include_intercept=fit.model.include_intercept,
        )
        a_mats = _ar_matrices_from_beta(
            beta=beta_perm, n=n, p=p, include_intercept=fit.model.include_intercept
        )
        c = beta_perm[0, :] if fit.model.include_intercept else np.zeros(n, dtype=float)
        x, y_tgt = design_matrix(y, p, include_intercept=fit.model.include_intercept)
        resid = y_tgt - x @ beta_perm  # (T-p, N)

        # Recover structural shocks eps_t from the reduced-form residuals via
        # the (possibly time-varying) Cholesky impact matrix.
        if not volatility_enabled:
            if sigma_draws is None:
                raise ValueError(
                    "missing covariance draws required for homoskedastic decomposition"
                )
            sigma = np.asarray(sigma_draws[d], dtype=float)[np.ix_(idx, idx)]
            impact0 = cholesky_jitter(sigma)
            if shock_scale == "unit":
                diag = np.diag(impact0)
                if np.any(diag <= 0):
                    raise ValueError(
                        "cannot use shock_scale='unit' when Cholesky diagonal is non-positive"
                    )
                impact0 = impact0 / diag.reshape(1, -1)
            eps = np.linalg.solve(impact0, resid.T).T
            impacts_t: np.ndarray | None = impact0[None, :, :]
        else:
            if h_draws is None:
                raise ValueError("missing volatility state draws required for decomposition")
            h = np.asarray(h_draws[d], dtype=float)
            if h.shape == (t, n):
                h_eff = h[p:, :]
            elif h.shape == (t_eff, n):
                h_eff = h
            else:
                raise ValueError("h_draws must have shape (T, N) or (T - p, N)")

            cov_mode = getattr(fit.model.volatility, "covariance", "diagonal")
            if cov_mode == "triangular":
                if q_draws is None:
                    raise ValueError("triangular volatility decomposition requires fit.q_draws")
                q = np.asarray(q_draws[d], dtype=float)
                if q.shape != (n, n):
                    raise ValueError("q_draws must have shape (N, N)")

                import scipy.linalg

                inv_q = scipy.linalg.solve_triangular(
                    q, np.eye(n, dtype=float), lower=False, check_finite=False
                )
                eps = np.empty_like(resid, dtype=float)
                impacts = np.empty((t_eff, n, n), dtype=float)
                for tt in range(t_eff):
                    ht = h_eff[tt, :]
                    d_sqrt = np.exp(0.5 * ht)
                    a = inv_q * d_sqrt.reshape(1, -1)
                    sigma_t = (a @ a.T)[np.ix_(idx, idx)]
                    l_t = cholesky_jitter(sigma_t)
                    impacts[tt] = l_t
                    eps[tt] = np.linalg.solve(l_t, resid[tt])
                impacts_t = impacts
            elif cov_mode == "factor":
                if lambda_draws is None or h_factor_draws is None:
                    raise ValueError(
                        "factor SV decomposition requires fit.lambda_draws and fit.h_factor_draws"
                    )
                lam = np.asarray(lambda_draws[d], dtype=float)
                if lam.ndim != 2:
                    raise ValueError("lambda_draws must have shape (D, N, k)")
                if lam.shape[0] != n:
                    raise ValueError("lambda_draws has wrong N dimension")
                k = int(lam.shape[1])
                hf = np.asarray(h_factor_draws[d], dtype=float)
                if hf.shape == (t, k):
                    hf_eff = hf[p:, :]
                elif hf.shape == (t_eff, k):
                    hf_eff = hf
                else:
                    raise ValueError("h_factor_draws must have shape (T, k) or (T - p, k)")
                lam_perm = lam[idx, :]  # permute to match y/resid ordering
                h_eta_perm = h_eff[:, idx]
                eps = np.empty_like(resid, dtype=float)
                impacts = np.empty((t_eff, n, n), dtype=float)
                for tt in range(t_eff):
                    sigma_t = (
                        lam_perm @ np.diag(np.exp(hf_eff[tt, :])) @ lam_perm.T
                        + np.diag(np.exp(h_eta_perm[tt, :]))
                    )
                    sigma_t = 0.5 * (sigma_t + sigma_t.T)
                    l_t = cholesky_jitter(sigma_t)
                    impacts[tt] = l_t
                    eps[tt] = np.linalg.solve(l_t, resid[tt])
                impacts_t = impacts
            else:
                scale = np.exp(0.5 * h_eff[:, idx])
                eps = resid / scale
                impacts = np.zeros((t_eff, n, n), dtype=float)
                for i in range(n):
                    impacts[:, i, i] = scale[:, i]
                impacts_t = impacts

        shock_draws[d] = eps

        # Shock-free baseline: propagate the initial p lags through the VAR
        # with the intercept but no innovations.
        baseline = np.empty((t, n), dtype=float)
        baseline[:p, :] = y[:p, :]
        for tt in range(p, t):
            acc = c.copy()
            for lag in range(1, p + 1):
                acc += a_mats[lag - 1] @ baseline[tt - lag]
            baseline[tt] = acc
        baseline_draws[d] = baseline[p:, :]

        # Per-shock contributions: column j of contrib[tt] is the cumulative
        # effect of structural shock j on the variables at date tt.
        contrib = np.zeros((t, n, n), dtype=float)
        for tt in range(p, t):
            acc = np.zeros((n, n), dtype=float)
            for lag in range(1, p + 1):
                acc += a_mats[lag - 1] @ contrib[tt - lag]
            if impacts_t is None:
                raise RuntimeError("internal error: impacts_t is None")
            if impacts_t.shape[0] == 1:
                imp = impacts_t[0]
            else:
                imp = impacts_t[tt - p]
            innov = imp * eps[tt - p].reshape(1, -1)
            contrib[tt] = acc + innov
        contrib_draws[d] = contrib[p:, :, :]

        # Sanity check: baseline plus contributions must reconstruct the data.
        recon = baseline[p:, :] + contrib[p:, :, :].sum(axis=2)
        err = float(np.max(np.abs(recon - y[p:, :])))
        max_abs_err = max(max_abs_err, err)

    mean = contrib_draws.mean(axis=0)
    quants = {q: np.quantile(contrib_draws, q=q, axis=0) for q in q_list}
    metadata["reconstruction_max_abs_error"] = float(max_abs_err)

    return HistoricalDecompositionResult(
        variables=list(vars_out),
        shocks=list(vars_out),
        time_index=dataset.time_index[p:],
        baseline_draws=baseline_draws,
        shock_draws=shock_draws,
        contributions_draws=contrib_draws,
        mean=mean,
        quantiles=quants,
        identification="cholesky",
        metadata=metadata,
    )
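
# A hypothetical end-to-end usage sketch. `fit` stands for a FitResult
# obtained from this package's estimation step, and the variable names in
# `ordering` are placeholders:
#
#     hd = historical_decomposition_cholesky(
#         fit,
#         draws=500,
#         ordering=["gdp", "infl", "rate"],   # hypothetical ordering
#         quantile_levels=[0.16, 0.5, 0.84],
#     )
#     # Additivity of the decomposition: the baseline plus the sum over
#     # shocks of the contributions reconstructs the (reordered) data, up to
#     # the error reported in hd.metadata["reconstruction_max_abs_error"].
#     recon = hd.baseline_draws + hd.contributions_draws.sum(axis=3)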