Source code for srvar.analysis.fevd

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Literal

import numpy as np

from ..results import FEVDResult, FitResult, IRFResult
from .irf import irf_cholesky


def _parse_horizons(horizons: int | Sequence[int]) -> list[int]:
    if isinstance(horizons, (int, np.integer)) and not isinstance(horizons, bool):
        hmax = int(horizons)
        if hmax < 1:
            raise ValueError("horizons must be >= 1")
        return list(range(1, hmax + 1))

    if not isinstance(horizons, Sequence) or isinstance(horizons, (str, bytes)):
        raise ValueError("horizons must be an int or a sequence of ints")

    hs: list[int] = []
    for h in horizons:
        if not isinstance(h, (int, np.integer)) or isinstance(h, bool):
            raise ValueError("horizons must contain only integers")
        hi = int(h)
        if hi < 1:
            raise ValueError("horizons must contain only positive integers")
        hs.append(hi)

    if len(hs) < 1:
        raise ValueError("horizons must be non-empty")
    if len(set(hs)) != len(hs):
        raise ValueError("horizons must not contain duplicates")
    return sorted(hs)


def _parse_quantiles(quantile_levels: list[float] | None) -> list[float]:
    if quantile_levels is None:
        return [0.1, 0.5, 0.9]
    if not isinstance(quantile_levels, list) or len(quantile_levels) < 1:
        raise ValueError("quantile_levels must be a non-empty list")

    qs: list[float] = []
    for q in quantile_levels:
        qf = float(q)
        if not np.isfinite(qf) or not (0.0 < qf < 1.0):
            raise ValueError("quantile_levels must be finite and in (0, 1)")
        qs.append(qf)
    return qs


[docs] def fevd_from_irf( irf: IRFResult, *, horizons: int | Sequence[int] | None = None, quantile_levels: list[float] | None = None, ) -> FEVDResult: """Compute FEVD from a structural IRF. Horizon definition ------------------ For forecast horizon ``h`` (steps ahead, ``h >= 1``), FEVD uses the cumulative sum of squared structural impulse responses from IRF horizons ``0..h-1``. Notes ----- - This requires an orthogonal (structural) IRF. Reduced-form IRFs are not supported. - The IRF must include all integer horizons ``0..(max(horizons)-1)``. """ if irf.identification == "reduced_form": raise ValueError("FEVD requires a structural IRF; reduced-form identification is not valid") if not isinstance(irf.horizons, list) or len(irf.horizons) < 1: raise ValueError("irf.horizons must be a non-empty list") if horizons is None: default_hmax = max(1, int(max(irf.horizons))) h_list = _parse_horizons(default_hmax) else: h_list = _parse_horizons(horizons) q_list = _parse_quantiles(quantile_levels) hmax = int(max(h_list)) max_lag = hmax - 1 irf_h = list(irf.horizons) idx = {int(h): i for i, h in enumerate(irf_h)} if 0 not in idx: raise ValueError("FEVD requires IRF horizons to include 0 (impact response)") missing = [k for k in range(max_lag + 1) if k not in idx] if missing: raise ValueError( f"FEVD requires IRF horizons to include all integers 0..{max_lag}; missing: {missing}" ) draws = np.asarray(irf.draws, dtype=float) if draws.ndim != 4: raise ValueError("irf.draws must have shape (D, H, N, N)") if draws.shape[1] != len(irf_h): raise ValueError("irf.draws second dimension must match len(irf.horizons)") n = len(irf.variables) s = len(irf.shocks) if draws.shape[2:] != (n, s): raise ValueError("irf.draws has wrong shape relative to variables/shocks") needed = [idx[k] for k in range(max_lag + 1)] theta = draws[:, needed, :, :] # (D, L, N, S), where L = max_lag+1 cum_sq = np.cumsum(theta**2, axis=1) # (D, L, N, S) sel = np.asarray([h - 1 for h in h_list], dtype=int) num = cum_sq[:, sel, :, :] # (D, H_out, N, S) denom = num.sum(axis=3, keepdims=True) # (D, H_out, N, 1) out = np.full_like(num, np.nan, dtype=float) valid = np.isfinite(denom) & (denom > 0) np.divide(num, denom, out=out, where=valid) mean = np.nanmean(out, axis=0) quants = {q: np.nanquantile(out, q=q, axis=0) for q in q_list} metadata: dict[str, Any] = dict(irf.metadata) if irf.metadata is not None else {} metadata.update( { "fevd_horizon_definition": "h steps ahead uses IRF horizons 0..h-1", "source": "irf", } ) zero_var = int(np.count_nonzero(~valid)) if zero_var: metadata["zero_variance_cells"] = zero_var return FEVDResult( variables=list(irf.variables), shocks=list(irf.shocks), horizons=list(h_list), draws=out, mean=mean, quantiles=quants, identification=str(irf.identification), metadata=metadata, )
[docs] def fevd_cholesky( fit: FitResult, *, horizons: int | Sequence[int] = 24, draws: int | None = None, ordering: Sequence[str] | None = None, shock_scale: Literal["one_sd", "unit"] = "one_sd", quantile_levels: list[float] | None = None, stationarity: str = "allow", stationarity_tol: float = 1e-10, stationarity_max_draws: int | None = None, rng: np.random.Generator | None = None, ) -> FEVDResult: """Convenience wrapper: compute Cholesky FEVD from a fitted model.""" h_list = _parse_horizons(horizons) max_lag = int(max(h_list) - 1) irf = irf_cholesky( fit, horizons=max_lag, draws=draws, ordering=ordering, shock_scale=shock_scale, quantile_levels=[0.5], stationarity=stationarity, stationarity_tol=stationarity_tol, stationarity_max_draws=stationarity_max_draws, rng=rng, ) return fevd_from_irf(irf, horizons=h_list, quantile_levels=quantile_levels)