Source code for quadsv.statistics

from __future__ import annotations

import numpy as np
import scipy.sparse as sp
from scipy.stats import chi2, ncx2, norm
from tqdm import tqdm

from quadsv.kernels import Kernel

__all__ = [
    "auto_chunk_size",
    "compute_null_params",
    "effective_rank",
    "gene_pattern_diversity",
    "within_group_pattern_diversity",
    "liu_sf",
    "spatial_q_test",
    "spatial_r_test",
]

_DELTA = 1e-10


# Default live-memory budget for :func:`auto_chunk_size` — 2 GiB. On an
# 8-core host with joblib parallelism this keeps aggregate peak RAM
# around 16 GiB (2 GiB × 8), comfortable on most modern laptops.
_DEFAULT_CHUNK_BUDGET = 2 * (1 << 30)


# ---------------------------------------------------------------------------
# Effective rank / spectral-pattern diversity
# ---------------------------------------------------------------------------


[docs] def effective_rank( cov: np.ndarray, weights: np.ndarray | None = None, ) -> float: r"""Effective rank (participation ratio) of a covariance matrix. Computes .. math:: K_\mathrm{eff} \;=\; \frac{\big(\sum_k \lambda_k\big)^2}{\sum_k \lambda_k^2} where :math:`\lambda_k` are the eigenvalues of ``cov`` (or, when ``weights`` is given, of :math:`W^{1/2} \mathrm{cov}\, W^{1/2}` with :math:`W = \mathrm{diag}(w)`). The result is bounded by 1 (rank-1, all variance on a single direction) and ``K = cov.shape[0]`` (uniformly spread, all eigenvalues equal). It coincides with the standard inverse Herfindahl index of the (normalised) eigenvalue distribution and quantifies the "effective number of independent components" of a quadratic-form statistic :math:`T^2 = X^\top \mathrm{cov} X` where :math:`X \sim \mathcal{N}(0,I)`. Parameters ---------- cov : np.ndarray Symmetric ``(K, K)`` covariance matrix. Negative eigenvalues from numerical noise are clipped to 0. weights : np.ndarray, optional Non-negative weights of length ``K``. When provided, returns the effective rank of the weighted form :math:`W^{1/2} \mathrm{cov}\,W^{1/2}`, useful for analysing how a frequency-weighted L2 statistic actually distributes its sensitivity across eigen-directions. Returns ------- float Effective rank in ``[1, K]``. Returns ``nan`` when the trace is non-positive (degenerate covariance). Examples -------- >>> import numpy as np >>> effective_rank(np.eye(10)) 10.0 >>> # Rank-1 outer product >>> v = np.zeros(10); v[0] = 1.0 >>> effective_rank(np.outer(v, v)) 1.0 """ if cov.ndim != 2 or cov.shape[0] != cov.shape[1]: raise ValueError(f"cov must be a square 2D matrix, got shape {cov.shape}.") K = cov.shape[0] if weights is None: eigvals = np.linalg.eigvalsh(cov) else: w = np.asarray(weights, dtype=float) if w.shape != (K,): raise ValueError(f"weights must have length K={K}, got shape {w.shape}.") if np.any(w < 0): raise ValueError("weights must be non-negative.") sqW = np.sqrt(w) M = (sqW[:, None] * cov) * sqW[None, :] eigvals = np.linalg.eigvalsh(M) eigvals = np.maximum(eigvals, 0.0) s = float(eigvals.sum()) if s <= 0: return float("nan") return float(s * s / float((eigvals**2).sum()))
[docs] def gene_pattern_diversity( spectra: np.ndarray, weights: np.ndarray | None = None, *, eps: float = 1e-12, ) -> float: r"""Spatial-pattern diversity across genes within a single sample. Quantifies how heterogeneous the per-gene spatial-frequency profiles are within one sample. Computes the cross-gene covariance of the log-spectra, .. math:: \hat\Sigma_\mathrm{genes} \;=\; \frac{1}{G - 1} \sum_g \big(\ell_g - \bar\ell\big)\big(\ell_g - \bar\ell\big)^\top where :math:`\ell_g \in \mathbb{R}^K` is gene :math:`g`'s radially-binned log-spectrum and :math:`\bar\ell` the mean across genes, then returns ``effective_rank(Σ_genes, weights)``. Interpretation: - **Low diversity** (:math:`K_\mathrm{eff} \approx 1`): most genes share the same spatial-frequency profile — the sample's spatial patterns collapse onto a single dominant mode (e.g. all "smooth" or all "punctate"). - **High diversity** (:math:`K_\mathrm{eff} \approx K`): genes vary widely in their spatial structure — the sample carries a rich mix of spatial scales. Parameters ---------- spectra : np.ndarray ``(n_genes, K)`` raw spectrum matrix (typically a single sample's radially-binned spectrum). Log is taken internally with an ``eps`` floor. weights : np.ndarray, optional Per-bin weights, same semantics as :func:`effective_rank`. eps : float, default 1e-12 Floor for ``log(spectra)`` to handle exact-zero bins. """ if spectra.ndim != 2: raise ValueError(f"spectra must be (n_genes, K), got shape {spectra.shape}.") log_s = np.log(np.maximum(spectra, eps)) centred = log_s - log_s.mean(axis=0, keepdims=True) G = log_s.shape[0] cov = (centred.T @ centred) / max(G - 1, 1) return effective_rank(cov, weights=weights)
[docs] def within_group_pattern_diversity( spectra: np.ndarray, groups: np.ndarray, weights: np.ndarray | None = None, *, eps: float = 1e-12, ) -> float: r"""Spatial-pattern diversity of the within-group residual covariance. For a cohort of samples partitioned into two groups, computes the pooled-across-genes within-group covariance of log-spectra (the same estimator used by ``log_l2 + null='wald'`` in the comparator), then returns its effective rank. Interpretation: - **Low diversity** (:math:`K_\mathrm{eff} \approx 1`): within-group sample-to-sample variation aligns with one spatial-frequency direction. Wald-type tests on this cohort effectively reduce to a 1-DoF test → high power per direction but very sensitive to estimation noise in that single eigenvalue. - **High diversity** (:math:`K_\mathrm{eff} \approx K`): noise spreads over many directions; Wald's analytic null is more accurate (CLT smoothing). Parameters ---------- spectra : np.ndarray ``(n_samples, n_genes, K)`` spectrum tensor (raw, not logged). groups : np.ndarray ``(n_samples,)`` with exactly two distinct labels. weights : np.ndarray, optional Per-bin weights, same semantics as :func:`effective_rank`. eps : float, default 1e-12 Floor for ``log(spectra)``. Returns ------- float Effective rank of the pooled within-group covariance. """ if spectra.ndim != 3: raise ValueError(f"spectra must be (n_samples, n_genes, K), got shape {spectra.shape}.") groups = np.asarray(groups) uniq = np.unique(groups) if uniq.size != 2: raise ValueError(f"groups must contain exactly two distinct labels, got {uniq}.") g_int = (groups == uniq[1]).astype(int) a_mask = g_int == 0 log_a = np.log(np.maximum(spectra[a_mask], eps)) log_b = np.log(np.maximum(spectra[~a_mask], eps)) res_a = log_a - log_a.mean(axis=0, keepdims=True) res_b = log_b - log_b.mean(axis=0, keepdims=True) res = np.concatenate([res_a, res_b], axis=0) n_total, G, K = res.shape df = max(int(a_mask.sum()) + int((~a_mask).sum()) - 2, 1) res_flat = res.reshape(n_total * G, K) Sigma = (res_flat.T @ res_flat) / (G * df) return effective_rank(Sigma, weights=weights)
[docs] def auto_chunk_size( kernel: Kernel, n_jobs: int = 1, budget_bytes: int = _DEFAULT_CHUNK_BUDGET, ) -> int: """Pick a per-backend-optimal ``chunk_size`` for the Q / R test. The returned value is used by :func:`spatial_q_test` / :func:`spatial_r_test` (and by :meth:`DetectorGrid.compute_qstat` / :meth:`DetectorIrregular.compute_qstat`) to split a multi-feature batch into chunks. It is the smaller of two caps: 1. **Cache sweet-spot cap** — empirical sweep of per-feature time vs ``chunk`` at ``n ∈ {30k, 100k, 300k, 1M}``: .. list-table:: :header-rows: 1 :widths: 50 50 * - Backend - chunk cap * - :class:`~quadsv.FFTKernel` - 32 * - :class:`~quadsv.NUFFTKernel` - 64 * - MatrixKernel (any sub-type) - 16 (``n < 200k``); 8 (``n ≥ 200k``) Matrix backends don't vectorise over RHS columns (scipy CSR SpMV, SuperLU triangular solve), and the chunk size cap is determined empirically for best per-feature speed under the given memory constraints. FFT / NUFFT *do* benefit from BLAS / ``n_transf`` batching, but their complex workspace spills L3 past the listed cap (15× slowdown for FFT at chunk=512, 1.9× for NUFFT at chunk=256). 2. **Memory cap** — ``budget_bytes / n_jobs // per_feat``, where ``per_feat`` is the backend-specific transient bytes per feature: - MatrixKernel dense / sparse: ``16 · n`` - MatrixKernel precision-stored CAR: ``24 · n`` - FFTKernel: ``24 · n`` - NUFFTKernel: ``16 · ny·nx + 8 · n`` Parameters ---------- kernel : Kernel The backend kernel the chunk will operate on. n_jobs : int, default 1 Number of parallel workers the caller plans to use. The ``budget_bytes`` is divided by ``n_jobs`` so aggregate live memory stays bounded. budget_bytes : int, default 2 GiB Aggregate live-memory cap across *all* workers. Returns ------- int A ``chunk_size`` in ``[8, chunk_cap]``. The lower bound of 8 ensures measurements stay meaningful even when a single feature consumes most of the per-worker budget. """ # Lazy imports to avoid circular dependency with the FFT / NUFFT modules. from quadsv.kernels.fft import FFTKernel from quadsv.kernels.nufft import NUFFTKernel if isinstance(kernel, FFTKernel): n = kernel.n per_feat = max(1, 24 * n) chunk_cap = 32 elif isinstance(kernel, NUFFTKernel): ny, nx = kernel.grid_shape n = kernel.n per_feat = max(1, 16 * ny * nx + 8 * n) chunk_cap = 64 else: # MatrixKernel family. Precision-stored kernels carry an extra # LU-solve workspace on top of the RHS + output buffer. n = int(getattr(kernel, "n", 0)) or 1 stores_precision = bool(getattr(kernel, "stores_precision", False)) per_feat = (24 if stores_precision else 16) * n # Sparse sweet spot shifts from 16 → 8 once the CSR kernel or # LU factor itself fills L3 (~200k for k≈4 nbrs, rho≈0.9). chunk_cap = 16 if n < 200_000 else 8 n_workers = max(1, int(n_jobs)) per_worker_budget = max(per_feat, budget_bytes // n_workers) mem_cap = int(per_worker_budget // per_feat) return int(np.clip(min(mem_cap, chunk_cap), 8, chunk_cap))
def _liu_prepare_from_cumulants( c: dict[int, float], kurtosis: bool = False, n: int | None = None, ) -> dict[str, float]: r"""Pure-math core: Liu shifted-chi² fit from cumulants ``c_1..c_4``. Called by both :func:`_liu_prepare` (from an explicit eigenvalue spectrum) and :func:`_hutchinson_cumulants` (from probe estimates), so the shifted-chi² algebra lives in one place. The input ``c`` is a mapping ``{1: c_1, 2: c_2, 3: c_3, 4: c_4}`` with ``c_p = trace(K^p)`` (any contributions from non-unit ``dofs`` / non-zero ``deltas`` must already be folded into these sums). Parameters ---------- c : dict[int, float] Spectral cumulants ``{1: c_1, 2: c_2, 3: c_3, 4: c_4}``. kurtosis : bool, default False Use the kurtosis-based edge-case approximation when Liu's discriminant ``s_1² − s_2`` is non-positive. n : int, optional Sample size. When provided, ``sigma_Q`` is set from the finite-:math:`n` Dirichlet(1/2) variance ``Var[Q] = 2·(m·c_2 − c_1²)/(m+2)`` with ``m = n-1`` — matching the ``dirichlet_correction=True`` branch of :func:`compute_null_params`. Without ``n`` (default) the large-:math:`n` limit ``sigma_Q = sqrt(2·c_2)`` is used, which overestimates ``Var[Q]`` when the spectrum is broad (:math:`c_1^2 \approx m \cdot c_2`) — e.g. CAR on a dense grid — and collapses Liu's tail probability to zero. Passing ``n = kernel.n`` recovers the Welch variance. Returns -------- dict[str, float] Liu coefficients for the shifted-chi² approximation, with keys: - ``'mu_Q'`` : float — the mean of the original Q statistic. - ``'sigma_Q'`` : float — the standard deviation of the original Q statistic (with optional finite-``n`` Dirichlet(1/2) correction). - ``'mu_x'`` : float — the mean of the fitted shifted-χ² variable X. - ``'sigma_x'`` : float — the standard deviation of X. - ``'dof_x'`` : float — the degrees of freedom of X. - ``'delta_x'`` : float — the non-centrality parameter of X. Consumers (e.g. :func:`spatial_q_test`) read only these coefficients for the final p-value calculation; the input cumulants are not needed beyond this point. """ s1 = c[3] / (np.sqrt(c[2]) ** 3 + _DELTA) s2 = c[4] / (c[2] ** 2 + _DELTA) s12 = s1**2 if s12 > s2: denom = s1 - np.sqrt(s12 - s2) if abs(denom) < _DELTA: # Catastrophic cancellation — fall back to the kurtosis path. delta_x = 0.0 dof_x = 1.0 / (s2 + _DELTA) else: a = 1.0 / denom delta_x = s1 * a**3 - a**2 dof_x = a**2 - 2.0 * delta_x else: delta_x = 0.0 if kurtosis: dof_x = 1.0 / (s2 + _DELTA) else: dof_x = 1.0 / (s12 + _DELTA) dof_x = max(dof_x, _DELTA) delta_x = max(delta_x, 0.0) # sigma_Q: Dirichlet(1/2)-corrected for the z-scored ratio Q when n # is provided. See :func:`compute_null_params` Notes for the full # derivation of ``Var[Q] = 2·(m·c_2 − c_1²)/(m+2)``. if n is not None and n >= 2: m = n - 1 var_Q = 2.0 * max(m * c[2] - c[1] ** 2, 0.0) / (m + 2) else: var_Q = 2.0 * c[2] return { "mu_Q": float(c[1]), "sigma_Q": float(np.sqrt(max(var_Q, 0.0))), "mu_x": float(dof_x + delta_x), "sigma_x": float(np.sqrt(2 * (dof_x + 2 * delta_x))), "dof_x": float(dof_x), "delta_x": float(delta_x), } def _liu_prepare( lambs: np.ndarray, dofs: np.ndarray | None = None, deltas: np.ndarray | None = None, kurtosis: bool = False, n: int | None = None, ) -> dict[str, float]: """Precompute Liu coefficients from the kernel eigenvalue spectrum. Thin wrapper — builds ``c_1..c_4`` from the weighted eigenvalues and calls :func:`_liu_prepare_from_cumulants`. Parameters ---------- lambs : np.ndarray Eigenvalues of ``K``, shape ``(n_evals,)``. dofs, deltas : np.ndarray, optional Per-eigenvalue degrees of freedom and non-centrality parameters. Default to central chi-squared (ones, zeros). kurtosis : bool, default False Use the kurtosis-based edge-case approximation. n : int, optional Sample size for the Dirichlet(1/2) variance correction. See :func:`_liu_prepare_from_cumulants` for details. """ lambs = np.asarray(lambs, dtype=float) if dofs is None: dofs = np.ones_like(lambs) else: dofs = np.asarray(dofs, dtype=float) if deltas is None: deltas = np.zeros_like(lambs) else: deltas = np.asarray(deltas, dtype=float) lambs_pow = {i: lambs**i for i in range(1, 5)} c = { i: float(np.sum(lambs_pow[i] * dofs) + i * np.sum(lambs_pow[i] * deltas)) for i in range(1, 5) } return _liu_prepare_from_cumulants(c, kurtosis=kurtosis, n=n) def _hutchinson_cumulants( kernel: Kernel, n_probes: int = 60, rng_seed: int = 0, use_analytic_c12: bool = True, ) -> dict[int, float]: r"""Estimate ``c_p = tr(K^p)``, ``p = 1..4`` using random probes. General probe form. For iid ``v`` with ``E[vvᵀ] = I``: :math:`c_p = \mathbb{E}[\mathbf{v}^\top \mathbf{K}^p \mathbf{v}]`. Two matvecs per probe yield :math:`\mathbf{u}_s = \mathbf{K}\mathbf{v}_s` and :math:`\mathbf{w}_s = \mathbf{K}^2 \mathbf{v}_s`, from which all four cumulants fall out as inner products: .. math:: \hat c_1 &= \tfrac{1}{m}\textstyle\sum_s \mathbf{v}_s^\top \mathbf{u}_s, &\hat c_2 &= \tfrac{1}{m}\textstyle\sum_s \|\mathbf{u}_s\|^2, \\ \hat c_3 &= \tfrac{1}{m}\textstyle\sum_s \mathbf{u}_s^\top \mathbf{w}_s, &\hat c_4 &= \tfrac{1}{m}\textstyle\sum_s \|\mathbf{w}_s\|^2. Here we use **iid Rademacher** probes (``±1`` with equal probability), which has strictly smaller variance on :math:`v^\top A v` than :math:`\mathcal{N}(0, I)` probes at fixed probe count: .. math:: \mathrm{Var}_{\text{Rad}}[v^\top A v] &= 2 \sum_{i \neq j} A_{ij}^2 = 2\bigl(\|A\|_F^2 - \|\mathrm{diag}(A)\|^2\bigr), \\ \mathrm{Var}_{\mathcal{N}}[v^\top A v] &= 2 \|A\|_F^2. ``K`` centering is inherited from :meth:`~quadsv.kernels.Kernel.Kx` (which applies ``H`` on both sides whenever ``centering=True``). Analytic substitutions listed below all read from backend-specific :meth:`trace` / :meth:`square_trace` methods that already embed the centering correction (``-s₁/n`` for ``c_1``; ``-2·s₂/n + s₁²/n²`` for ``c_2``). Backend-specific fast paths --------------------------- *FFTKernel* — **full spectrum always, all four cumulants analytic** ``c_p = Σ_k λ̃(k)^p`` is computed analytically using the ``n`` Fourier modes (``O(n)``) cached from :meth:`~quadsv.fft.FFTKernel.eigenvalues`. *MatrixKernel / NUFFTKernel — ``use_analytic_c12=True``* (default) ``c_1`` from :meth:`trace`, ``c_2`` from :meth:`square_trace` (both exact: diagonal-sum / Frobenius² on ``MatrixKernel``, coord-invariant ``(n/n')·Σλ`` / doubled-grid linear-conv ``λᵀΨλ`` on ``NUFFTKernel``). ``c_3``, ``c_4`` always come from the Rademacher probe estimator (closed-form formula are often too expensive). *MatrixKernel with ``stores_precision=True``* (CAR, Graph Laplacian) The stored object is the precision :math:`K^{-1}`, and the kernel's :meth:`trace` / :meth:`square_trace` are themselves Hutchinson estimators (``±1`` Rademacher probes through an LU solve on the precision). We forward the current ``n_probes`` so the precision-side Hutchinson budget tracks this caller's budget; ``c_3`` / ``c_4`` use our Rademacher probes as usual. *``use_analytic_c12=False``* All four cumulants from the same Rademacher probes — useful for diagnostics or when the analytic paths are known to be unreliable. Parameters ---------- kernel : Kernel Must expose ``Kx(v)``. ``FFTKernel`` takes the fast path above. n_probes : int, default 60 Probe count for the ``c_3`` / ``c_4`` estimator — also forwarded to :meth:`trace` / :meth:`square_trace` on precision-stored ``MatrixKernel``. ``m=60`` lands Liu p-values within ``~5%`` of the eigenvalue-exact baseline; ``m=120`` within ``~0.2%``. Cost scales as ``2·n_probes`` kernel matvecs (plus the same count of LU solves when precision-stored). rng_seed : int, default 0 Seed for reproducible probe draws. use_analytic_c12 : bool, default True If ``True`` (default), substitute analytic ``c_1`` / ``c_2`` on ``MatrixKernel`` / ``NUFFTKernel`` as described above; on ``FFTKernel`` the full-spectrum fast path runs regardless. Set ``False`` to force the pure-probe estimator (still skips ``FFTKernel``'s fast path). Returns ------- dict[int, float] ``{1: c_1, 2: c_2, 3: c_3, 4: c_4}`` of the input kernel ``K``. """ # ------------------------------------------------------------------ # FFTKernel fast path — full spectrum is O(n) and exact for all c_p. # ------------------------------------------------------------------ from quadsv.kernels.fft import FFTKernel # lazy to avoid circular import if isinstance(kernel, FFTKernel): # ``return_full_layout=True`` unpacks the rfft2 half-spectrum to # the full ``ny·nx`` layout and zeroes the DC bin when # ``centering=True`` (see FFTKernel.eigenvalues docstring). lam = np.asarray(kernel.eigenvalues(return_full_layout=True), dtype=float) sig = lam[np.abs(lam) > _DELTA] return {p: float(np.sum(sig**p)) for p in (1, 2, 3, 4)} # ------------------------------------------------------------------ # General path — probe c_3, c_4; analytic / Hutchinson-via-trace # for c_1, c_2 depending on the backend. # ------------------------------------------------------------------ rng = np.random.default_rng(rng_seed) V_flat = rng.choice(np.array([-1.0, 1.0]), size=(int(kernel.n), int(n_probes))) def _apply(x_flat: np.ndarray) -> np.ndarray: return np.asarray(kernel.Kx(x_flat)) U = _apply(V_flat) W = _apply(U) c1_probe = float(np.mean(np.sum(V_flat * U, axis=0))) c2_probe = float(np.mean(np.sum(U * U, axis=0))) c3 = float(np.mean(np.sum(U * W, axis=0))) c4 = float(np.mean(np.sum(W * W, axis=0))) c1, c2 = c1_probe, c2_probe if use_analytic_c12 and hasattr(kernel, "trace") and hasattr(kernel, "square_trace"): is_precision_stored = bool(getattr(kernel, "stores_precision", False)) try: if is_precision_stored: # MatrixKernel with stored precision: ``trace`` / # ``square_trace`` are themselves Hutchinson estimators # through an LU solve. Forward ``n_probes`` so the # internal cache uses our probe budget. c1 = float(kernel.trace(n_probes=n_probes)) c2 = float(kernel.square_trace(n_probes=n_probes)) else: # Analytic paths. All three backends (MatrixKernel # non-precision, FFTKernel, NUFFTKernel) return # deterministic analytic cumulants from a no-arg call. c1 = float(kernel.trace()) c2 = float(kernel.square_trace()) except (ValueError, NotImplementedError): # Any analytic path that refuses (e.g. indefinite-Λ # cancellation) silently falls back to probe estimates. c1, c2 = c1_probe, c2_probe return {1: c1, 2: c2, 3: c3, 4: c4} def _liu_apply(t: float | np.ndarray, coef: dict[str, float]) -> np.ndarray: """Evaluate ``Pr(Q > t)`` from cached Liu coefficients. ``coef`` is the dict returned by :func:`_liu_prepare`. Broadcasts across array ``t`` in a single :func:`scipy.stats.ncx2.sf` call. """ t = np.asarray(t, dtype=float) t_star = (t - coef["mu_Q"]) / (coef["sigma_Q"] + _DELTA) tfinal = t_star * coef["sigma_x"] + coef["mu_x"] return ncx2.sf(tfinal, coef["dof_x"], max(coef["delta_x"], 1e-9))
[docs] def liu_sf( t: float | np.ndarray, lambs: np.ndarray, dofs: np.ndarray | None = None, deltas: np.ndarray | None = None, kurtosis: bool = False, n: int | None = None, ) -> float | np.ndarray: """ Liu approximation to a linear combination of non-central chi-squared variables. One-shot convenience wrapper equivalent to ``_liu_apply(t, _liu_prepare(lambs, ...))``. For multi-feature workloads, prefer the split form: call :func:`_liu_prepare` once on the spectrum and :func:`_liu_apply` for each Q-batch (:func:`compute_null_params` already caches the coefficients under ``null_params['liu_coef']``, so :func:`spatial_q_test` does this automatically). Parameters ---------- t : float or np.ndarray Test statistic value(s). Array input is broadcast efficiently through a single :func:`scipy.stats.ncx2.sf` call. lambs : np.ndarray Eigenvalues of the kernel matrix, shape ``(n_evals,)``. dofs : np.ndarray, optional Per-eigenvalue degrees of freedom. Default: ones (chi-squared). deltas : np.ndarray, optional Non-centrality parameters. Default: zeros (central). kurtosis : bool, default False If True, use the kurtosis-based edge-case approximation. n : int, optional Sample size. When provided, applies the Dirichlet(1/2) variance correction ``Var[Q] = 2·(m·c_2 - c_1²)/(m+2)`` with ``m = n-1`` for the z-scored ratio ``Q = XᵀK̃X/σ̂²``. Essential for broad-spectrum PSD kernels (CAR, graph_laplacian) on dense grids, where the large-:math:`n` limit ``2·c_2`` overestimates ``Var[Q]`` and collapses the tail to zero. Default ``None`` keeps the original large-:math:`n` behavior for back-compat with callers supplying a raw eigenvalue mixture. Returns ------- float or np.ndarray Tail probability ``Pr(Q > t)`` with the same shape as ``t``. """ coef = _liu_prepare(lambs, dofs=dofs, deltas=deltas, kurtosis=kurtosis, n=n) return _liu_apply(t, coef)
[docs] def compute_null_params( kernel: Kernel, method: str = "welch", k_eigen: int | None = None, dirichlet_correction: bool = True, liu_n_probes: int | None = None, ) -> dict[str, float | np.ndarray]: r""" Pre-compute null distribution parameters for spatial tests. Call this ONCE before running parallel tests on thousands of features. Caches the expensive computations (traces, cumulants, shifted-χ² fit) for reuse across both Q-tests and R-tests. Parameters ---------- kernel : Kernel The spatial kernel object (MatrixKernel, FFTKernel, NUFFTKernel, or compatible). method : {'clt', 'welch', 'liu'}, default 'welch' Null approximation method for the **Q-test**. The R-test entry ``var_R = trace(K̃²)`` is always populated alongside, regardless of ``method`` — R-tests use a Normal approximation and only need this one moment. - 'clt': Central Limit Theorem (Z-score normal approximation) - 'welch': Welch-Satterthwaite moment matching (fast, uses traces) - 'liu': Liu eigenvalue-based approximation (accurate tail, slower) k_eigen : int, optional Number of top eigenvalues to compute if method='liu' and kernel is sparse. If None, computes all available eigenvalues. Ignored when ``liu_n_probes`` is set (eigenvalues are bypassed entirely). liu_n_probes : int, optional If set, bypass the eigendecomposition for ``method='liu'`` and estimate the four spectral cumulants ``c_p = trace(K̃^p)``, ``p = 1..4``, directly from the kernel via Hutchinson probing (:func:`_hutchinson_cumulants`). Cost drops from :math:`O(n^3)` (dense eigensolve) or :math:`O(r^3)` (reduced Toeplitz-M) to :math:`2 \cdot n_\mathrm{probes}` kernel matvecs, at the cost of :math:`O(n_\mathrm{probes}^{-1/2})` Monte-Carlo error in each cumulant. Rule of thumb: ``n_probes = 60`` gives Liu p-values within ``~5\%`` of the eigenvalue-exact baseline; ``n_probes = 120`` within ``~0.2\%``. When ``None`` (default), the eigenvalue path is used. dirichlet_correction : bool, default True When ``True``: use the finite-``n`` Dirichlet(1/2) ratio ``Var[Q] = 2 · (m · trace((HKH)²) - trace(HKH)²) / (m+2)`` with ``m = n-1``. When ``False``: drop the ``mean²`` term to the large-``n`` limit ``Var[Q] = 2 · trace((HKH)²)``, a monotonic upper bound that slightly overestimates ``Var[Q]`` at finite ``n``. See **Notes** for the derivation. Returns ------- dict[str, float or np.ndarray] Always populated (regardless of ``method``): - ``'method'`` : str — the Q-test approximation selected. - ``'var_R'`` : float — ``trace(K̃²)``, the null variance of ``R`` (used by :func:`spatial_r_test`). Method-specific additions: - ``method='liu'`` (default for FFT / NUFFT kernels): * ``'cumulants'`` : ``dict {1: c_1, 2: c_2, 3: c_3, 4: c_4}`` with ``c_p = trace(K̃^p)``. Computed from the full eigendecomposition when available (``liu_n_probes is None``) or from :math:`2m` Hutchinson probes otherwise. * ``'liu_coef'`` : ``dict`` with cached Liu coefficients ``{'mu_Q', 'sigma_Q', 'mu_x', 'sigma_x', 'dof_x', 'delta_x'}`` derived from ``cumulants`` once; consumed by :func:`spatial_q_test` so per-feature p-values reduce to a pure :math:`t`-broadcast. - ``method='welch'`` (default for MatrixKernel Q-tests): * ``'mean_Q'`` : ``trace(K)`` * ``'var_Q'`` : ``2 · trace(K²)`` * ``'scale_g'`` : Welch scale parameter ``var_Q / (2 · mean_Q)`` * ``'df_h'`` : Welch df ``2 · mean_Q² / var_Q`` - ``method='clt'``: ``'mean_Q'``, ``'var_Q'`` only. Consumers (``spatial_q_test`` / ``spatial_r_test``) read only the keys their approximation needs; the dict is safe to reuse across calls. Raises ------ AssertionError If method is not one of 'clt', 'welch', 'liu'. Examples -------- >>> kernel = MatrixKernel.from_coordinates(coords, method='gaussian') >>> params = compute_null_params(kernel, method='welch') >>> Q, pval = spatial_q_test(data, kernel, null_params=params) >>> R, r_pval = spatial_r_test(x, y, kernel, null_params=params) Notes ----- :func:`spatial_q_test` standardizes its input as :math:`Z = (X - \bar{X}\,\mathbf{1}) / \sigma`, so the realized quadratic form is .. math:: Q \;=\; Z^{\top} K Z \;=\; X^{\top}\, H K H\, X / \sigma^{2}, \qquad H = I - \mathbf{1}\mathbf{1}^{\top} / n. Null moments are therefore for the double-centered operator :math:`\tilde{K} = H K H`, not raw :math:`K`. :math:`Q` is additionally a *ratio* of quadratic forms — the denominator :math:`\sigma^{2} = X^{\top} H X / (n-1)` is a random variable correlated with the numerator. ``dirichlet_correction=True`` applies the finite-:math:`n` correction derived from the Dirichlet(1/2) distribution of :math:`Y_i = X_i^{2} / \sum_j X_j^{2}`: .. math:: \mathrm{Var}[Q] \;=\; \frac{2 \bigl[\, m \cdot \mathrm{tr}(\tilde K^{2}) - \mathrm{tr}(\tilde K)^{2} \,\bigr]}{m + 2}, \qquad m = n - 1. With ``dirichlet_correction=False`` the finite term drops out and :math:`\mathrm{Var}[Q] = 2\,\mathrm{tr}(\tilde K^{2})` (large-:math:`n` limit). """ params = {"method": method} assert method in ["clt", "welch", "liu"], "Method must be 'clt', 'welch', or 'liu'." # Moran's I kernel is indefinite (non-PSD) — its eigenvalues span both # signs, and its trace is ≈ 0 by construction. Welch / Liu are both # PSD-assuming moment-matching schemes (Welch needs ``mean_Q > 0``; # Liu fits a shifted χ² to a Σλ·χ²₁ mixture that only makes sense when # the λ are non-negative). Force ``'clt'`` for Moran — a direct Normal # approximation on the CLT-limit distribution of the standardized Q — # across all three backends. kernel_method = getattr(kernel, "method", None) if kernel_method == "moran" and method != "clt": raise ValueError( f"Moran's I kernel is indefinite; only null_method='clt' is " f"supported for the Q-test. Got method={method!r}." ) # Centered traces can be computed cheaply from two additional numbers: # s1 = 𝟏ᵀ K 𝟏, s2 = ‖K·𝟏‖² = 𝟏ᵀ K² 𝟏 # via a single K·𝟏 application (see `Kernel._ones_stats`), giving # trace(HKH) = trace(K) − s1/n # trace((HKH)²) = trace(K²) − 2·s2/n + s1²/n² n = int(kernel.n) tr_HKH = float(kernel.trace()) tr_HKH_sq = float(kernel.square_trace()) params["var_R"] = tr_HKH_sq if method == "liu": # Liu's method is entirely determined by the four spectral # cumulants c_1..c_4. We ALWAYS store them under ``cumulants`` # and the derived shifted-χ² fit under ``liu_coef``; callers # consuming Liu p-values read only ``liu_coef``. # # Two paths produce the cumulants: # - ``liu_n_probes is not None``: Hutchinson — 2·m matvecs. # - ``liu_n_probes is None``: try the kernel's full # eigendecomposition first, fall back to Hutchinson if the # kernel can't produce a full spectrum (NUFFT with broad # support or indefinite Λ → ``NotImplementedError``). if liu_n_probes is not None: c = _hutchinson_cumulants(kernel, n_probes=int(liu_n_probes)) else: try: vals = kernel.eigenvalues(k=k_eigen) sig = vals[np.abs(vals) > 1e-9] c = {p: float(np.sum(sig**p)) for p in (1, 2, 3, 4)} except NotImplementedError: c = _hutchinson_cumulants(kernel, n_probes=60) params["cumulants"] = c # Pass ``n`` for the Dirichlet(1/2) variance correction — matches # the Welch branch below when ``dirichlet_correction=True``. # Broad-spectrum PSD kernels (CAR, graph_laplacian) have # ``c_1² ≈ m · c_2``; the large-n limit ``2·c_2`` then # overestimates ``Var[Q]`` by up to an order of magnitude, # collapsing Liu's tail probability to zero. liu_n = n if dirichlet_correction else None params["liu_coef"] = _liu_prepare_from_cumulants(c, n=liu_n) else: # Q-test CLT / Welch moments. mean_Q = tr_HKH if dirichlet_correction: m = max(n - 1, 1) var_Q = 2.0 * (m * tr_HKH_sq - tr_HKH**2) / (m + 2) else: # Large-n limit — drops the (m·sq − mean²) cancellation that # amplifies ``sq``-errors for broad-spectrum PSD kernels. var_Q = 2.0 * tr_HKH_sq var_Q = max(var_Q, 0.0) # numerical safety params["mean_Q"] = float(mean_Q) params["var_Q"] = float(var_Q) if method == "welch": # Pre-calculate Welch-Satterthwaite parameters. if var_Q > 0 and mean_Q > 0: params["scale_g"] = var_Q / (2.0 * mean_Q) params["df_h"] = (2.0 * mean_Q**2) / var_Q else: params["scale_g"] = 1.0 params["df_h"] = 1.0 return params
def _q_test_matrix( # noqa: C901 Xn: np.ndarray | sp.spmatrix, kernel: Kernel, null_params: dict | None = None, return_pval: bool = True, is_standardized: bool = False, ) -> float | np.ndarray | tuple[float | np.ndarray, float | np.ndarray]: """Single-batch Q-test on a MatrixKernel (no chunking). Parallel to :func:`quadsv.kernels.fft._q_test_fft` / :func:`quadsv.kernels.nufft._q_test_nufft`: takes whatever batch size is handed in and processes it in one call. The chunking loop lives in :func:`spatial_q_test`, which dispatches here per chunk. """ is_sparse = sp.issparse(Xn) if is_sparse: n, M = Xn.shape if Xn.ndim == 2 else (Xn.shape[0], 1) if Xn.ndim == 1 or M == 1: Xn = Xn.reshape(-1, 1) M = 1 else: Xn = np.asarray(Xn, dtype=float) if Xn.ndim == 1: Xn = Xn.reshape(-1, 1) n, M = Xn.shape # Fast path: sparse Xn + unstandardized + kernel exposes xtKx_standardized. # Uses the (K·1, 1ᵀK1) expansion so sparse Xn never needs densification. if is_sparse and not is_standardized and hasattr(kernel, "xtKx_standardized"): col_sum = np.asarray(Xn.sum(axis=0)).ravel() means = col_sum / n sq_sum = np.asarray(Xn.multiply(Xn).sum(axis=0)).ravel() var = (sq_sum - n * means**2) / max(n - 1, 1) var[var < 0] = 0.0 stds = np.sqrt(var) Q = kernel.xtKx_standardized(Xn, means, stds) else: if is_standardized: z = Xn else: if is_sparse: Xn = Xn.toarray() means = np.mean(Xn, axis=0) stds = np.std(Xn, axis=0, ddof=1) valid_mask = stds > 1e-12 z = np.zeros_like(Xn) if np.any(valid_mask): z[:, valid_mask] = (Xn[:, valid_mask] - means[valid_mask]) / stds[valid_mask] if hasattr(kernel, "xtKx"): Q = kernel.xtKx(z) else: # Fallback for raw matrices. Kz = kernel.dot(z) if sp.issparse(kernel) else np.dot(kernel, z) Q = np.sum(z * Kz, axis=0) Q = np.atleast_1d(np.asarray(Q, dtype=float)) if M == 1 and Q.size == 1: Q = Q.item() if not return_pval: return Q # P-value from cached null_params (pre-resolved by spatial_q_test). kernel_method = getattr(kernel, "method", None) null_approx_method = null_params.get("method", "welch") if null_params else "welch" if kernel_method == "moran" and null_approx_method != "clt": raise ValueError( f"Moran's I kernel is indefinite; only null_method='clt' is " f"supported for the Q-test. Got method={null_approx_method!r}." ) if null_approx_method == "clt": mu_Q = null_params["mean_Q"] var_Q = null_params["var_Q"] if var_Q > 0: z_score = (np.atleast_1d(Q) - mu_Q) / np.sqrt(var_Q) pval = chi2.sf(z_score**2, df=1) else: pval = np.ones(max(np.atleast_1d(Q).size, 1), dtype=float) elif null_approx_method == "welch": g = null_params["scale_g"] d = null_params["df_h"] pval = chi2.sf(np.atleast_1d(Q) / g, df=d) elif null_approx_method == "liu": coef = null_params.get("liu_coef") if coef is None: if "cumulants" not in null_params: raise ValueError( "null_params with method='liu' must contain either " "'liu_coef' (preferred) or 'cumulants'. Build via " "compute_null_params(kernel, method='liu')." ) n_kernel = int(getattr(kernel, "n", 0)) or None coef = _liu_prepare_from_cumulants(null_params["cumulants"], n=n_kernel) pval = _liu_apply(np.atleast_1d(Q), coef) else: pval = np.ones(max(np.atleast_1d(Q).size, 1), dtype=float) pval = np.atleast_1d(pval) if M == 1 and pval.size == 1: pval = pval.item() return Q, pval def _chunk_last_axis(X, start: int, end: int): """Slice ``X`` along its trailing (feature) axis. Works on numpy arrays and ``scipy.sparse`` matrices alike.""" if sp.issparse(X) or X.ndim == 2: return X[:, start:end] return X[:, :, start:end] def _feature_count(X, is_fft: bool) -> int: """Return the number of features ``M`` in the trailing axis of ``X``. FFTKernel input shape is ``(ny, nx)`` / ``(ny, nx, M)``; everything else is ``(n,)`` / ``(n, M)``. """ if is_fft: return X.shape[2] if X.ndim == 3 else 1 if sp.issparse(X): return X.shape[1] if X.ndim == 2 else 1 return X.shape[1] if X.ndim == 2 else 1 def _resolve_chunk_size( chunk_size: int | str, kernel: Kernel, M: int, n_jobs: int = 1, ) -> int: """Turn ``chunk_size='auto' | -1 | int`` into a concrete batch size. ``'auto'`` → :func:`auto_chunk_size`. ``-1`` or ``>= M`` → ``M`` (no chunking). Everything else is coerced to a positive int. """ if isinstance(chunk_size, str): if chunk_size != "auto": raise ValueError(f"chunk_size must be 'auto', -1, or int, got {chunk_size!r}.") resolved = auto_chunk_size(kernel, n_jobs=n_jobs) elif int(chunk_size) == -1: resolved = M else: resolved = max(1, int(chunk_size)) return max(1, min(resolved, M))
[docs] def spatial_q_test( # noqa: C901 Xn: np.ndarray | sp.spmatrix, kernel: Kernel, null_params: dict | None = None, return_pval: bool = True, is_standardized: bool = False, chunk_size: int | str = "auto", show_progress: bool = False, ) -> float | np.ndarray | tuple[float | np.ndarray, float | np.ndarray]: """ Univariate spatial Q-test for detecting spatial variability. Top-level chunking wrapper — splits the feature batch along the trailing axis into blocks of ``chunk_size`` features, dispatches each block to the backend-specific per-chunk helper (:func:`quadsv.kernels.fft._q_test_fft`, :func:`quadsv.kernels.nufft._q_test_nufft`, or :func:`_q_test_matrix`), and concatenates the results. The per-chunk helpers do **not** handle chunking themselves. Parameters ---------- Xn : np.ndarray or scipy.sparse matrix Input data of shape ``(n,)`` / ``(n, M)`` for MatrixKernel and NUFFTKernel, or ``(ny, nx)`` / ``(ny, nx, M)`` for FFTKernel. Can be dense numpy array or sparse matrix (CSC/CSR recommended) for MatrixKernel; FFT/NUFFT paths require dense input. kernel : Kernel Pre-constructed :class:`~quadsv.kernels.Kernel` (``MatrixKernel`` / ``FFTKernel`` / ``NUFFTKernel``) or a raw dense / sparse kernel matrix. null_params : dict, optional Pre-computed null distribution parameters from :func:`compute_null_params`. Resolved once at the top level if ``None`` and shared across chunks (no redundant recomputation). return_pval : bool, default True If True, returns ``(Q, pval)``; else returns ``Q`` only. is_standardized : bool, default False If True, skips Z-score standardization internally. chunk_size : int or ``'auto'``, default ``'auto'`` Number of features processed per per-chunk dispatch call. ``'auto'`` defers to :func:`auto_chunk_size` (backend-specific cache sweet spot under a 2 GiB live-memory budget); ``-1`` processes the full batch in a single call. For a cross-backend cost model see :doc:`/guides/scaling`. show_progress : bool, default False If True, displays a tqdm bar over chunks (only when ``M > chunk_size``). Returns ------- Q : float or np.ndarray Test statistic value(s). Shape ``(M,)`` for 2-D / 3-D inputs, scalar for 1-D. pval : float or np.ndarray, optional Tail probability under null hypothesis; returned only if ``return_pval=True``. Same shape as Q. Notes ----- Under H₀: data is spatially independent. Under H₁: mean shift present. The test statistic ``Q = xᵀ K x`` approximates a chi-squared mixture under the null; see :doc:`/guides/theory` and :doc:`/guides/scaling`. Examples -------- >>> coords = np.random.randn(100, 2) >>> kernel = MatrixKernel.from_coordinates(coords, method='gaussian') >>> data = np.random.randn(100) >>> Q, pval = spatial_q_test(data, kernel) >>> # Sparse-matrix batch of features (auto-chunked): >>> from scipy.sparse import csr_matrix >>> sparse_data = csr_matrix(np.random.randn(100, 1000)) >>> Q, pval = spatial_q_test(sparse_data, kernel, show_progress=True) """ # Lazy imports — avoid circular dependency with the FFT / NUFFT modules. from quadsv.kernels.fft import FFTKernel, _q_test_fft from quadsv.kernels.nufft import NUFFTKernel, _q_test_nufft is_fft = isinstance(kernel, FFTKernel) is_nufft = isinstance(kernel, NUFFTKernel) is_matrix_path = not (is_fft or is_nufft) # Resolve null_params once (cached across chunks). kernel_method = getattr(kernel, "method", None) if ( return_pval and null_params is not None and "method" in null_params and len(null_params) == 1 ): # User passed only {'method': ...} — flesh out the full param set. null_params = {**null_params, **compute_null_params(kernel, method=null_params["method"])} elif return_pval and null_params is None and is_matrix_path: if not hasattr(kernel, "square_trace"): # A raw dense / sparse kernel matrix can't produce null moments # on its own — the caller must supply ``null_params`` or wrap # the matrix in a :class:`~quadsv.MatrixKernel`. raise ValueError( "spatial_q_test received a raw kernel matrix without " "null_params; pass a Kernel object or provide " "null_params=compute_null_params(kernel)." ) default_method = "clt" if kernel_method == "moran" else "welch" null_params = compute_null_params(kernel, method=default_method) # Determine M on the trailing axis. M = _feature_count(Xn, is_fft=is_fft) resolved_chunk = _resolve_chunk_size(chunk_size, kernel, M) if is_fft: def _dispatch(X): return _q_test_fft( X, kernel, null_params=null_params, return_pval=return_pval, is_standardized=is_standardized, ) elif is_nufft: def _dispatch(X): return _q_test_nufft( X, kernel, null_params=null_params, return_pval=return_pval, is_standardized=is_standardized, ) else: def _dispatch(X): return _q_test_matrix( X, kernel, null_params=null_params, return_pval=return_pval, is_standardized=is_standardized, ) # Single-batch shortcut. if resolved_chunk >= M: return _dispatch(Xn) # Chunk loop. starts = list(range(0, M, resolved_chunk)) iterator = starts if show_progress and len(starts) > 1: iterator = tqdm( starts, desc="Q-test chunks", total=len(starts), bar_format="{l_bar}{bar:30}{r_bar}{bar:-30b}", ) Q_parts: list[np.ndarray] = [] P_parts: list[np.ndarray] = [] for start in iterator: end = min(start + resolved_chunk, M) block = _chunk_last_axis(Xn, start, end) result = _dispatch(block) if return_pval: Q_b, P_b = result Q_parts.append(np.atleast_1d(Q_b)) P_parts.append(np.atleast_1d(P_b)) else: Q_parts.append(np.atleast_1d(result)) Q = np.concatenate(Q_parts) if return_pval: return Q, np.concatenate(P_parts) return Q
def _r_test_matrix( # noqa: C901 Xn: np.ndarray | sp.spmatrix, Yn: np.ndarray | sp.spmatrix, kernel: Kernel, null_params: dict | None = None, return_pval: bool = True, is_standardized: bool = False, ) -> float | np.ndarray | tuple[float | np.ndarray, float | np.ndarray]: """Single-batch R-test on a MatrixKernel (no chunking). Parallel to :func:`quadsv.kernels.fft._r_test_fft` / :func:`quadsv.kernels.nufft._r_test_nufft`: takes whatever batch size is handed in and processes it in one call. The chunking loop lives in :func:`spatial_r_test`, which dispatches here per chunk. """ # Normalize shapes; preserve sparsity of inputs. def _prep(A): if sp.issparse(A): return A.reshape(-1, 1) if A.ndim == 1 else A arr = np.asarray(A, dtype=float) return arr.reshape(-1, 1) if arr.ndim == 1 else arr Xn, Yn = _prep(Xn), _prep(Yn) if Xn.shape != Yn.shape: raise ValueError(f"Xn and Yn shapes must match, got {Xn.shape} vs {Yn.shape}.") n, M = Xn.shape if n != kernel.n: raise ValueError(f"Kernel.n={kernel.n} does not match data rows {n}.") def _standardize(A): """Z-score A (sparse or dense) column-wise with ddof=1. Returns dense.""" if sp.issparse(A): col_sum = np.asarray(A.sum(axis=0)).ravel() means = col_sum / n sq_sum = np.asarray(A.multiply(A).sum(axis=0)).ravel() var = (sq_sum - n * means**2) / max(n - 1, 1) var[var < 0] = 0.0 stds = np.sqrt(var) Z = A.toarray() - means else: means = np.mean(A, axis=0) stds = np.std(A, axis=0, ddof=1) Z = A - means valid = stds > 1e-12 Z[:, ~valid] = 0.0 if np.any(valid): Z[:, valid] /= stds[valid] return Z if is_standardized: Zx = np.asarray(Xn.toarray() if sp.issparse(Xn) else Xn, dtype=float) Zy = np.asarray(Yn.toarray() if sp.issparse(Yn) else Yn, dtype=float) else: Zx = _standardize(Xn) Zy = _standardize(Yn) # R = diag(Zx^T K Zy) via the kernel's public bilinear primitive. R = np.atleast_1d(np.asarray(kernel.xtKy(Zx, Zy))) if M == 1 and R.size == 1: R = R.item() if not return_pval: return R # P-value (Normal Approximation). Both X, Y are z-scored before # R = Zₓᵀ K Zᵧ, so R ~ N(0, trace((HKH)²)) — NOT trace(K²). if null_params is not None and "var_R" in null_params: var_R = float(null_params["var_R"]) else: var_R = float(kernel.square_trace()) sigma = np.sqrt(var_R) if sigma > 0: z_score = R / sigma pval = 2 * norm.sf(np.abs(z_score)) else: pval = np.ones_like(R) if isinstance(R, np.ndarray) else 1.0 return R, pval
[docs] def spatial_r_test( # noqa: C901 Xn: np.ndarray | sp.spmatrix, Yn: np.ndarray | sp.spmatrix, kernel: Kernel, null_params: dict | None = None, return_pval: bool = True, is_standardized: bool = False, chunk_size: int | str = "auto", show_progress: bool = False, ) -> float | np.ndarray | tuple[float | np.ndarray, float | np.ndarray]: """ Bivariate spatial R-test for correlation between two spatial variables. Top-level chunking wrapper — splits the paired feature batch along the trailing axis into blocks of ``chunk_size`` features, dispatches each block to the backend-specific per-chunk helper (:func:`quadsv.kernels.fft._r_test_fft`, :func:`quadsv.kernels.nufft._r_test_nufft`, or :func:`_r_test_matrix`), and concatenates the results. The per-chunk helpers do **not** handle chunking themselves. Parameters ---------- Xn : np.ndarray or scipy.sparse matrix First input. Shape ``(n,)`` / ``(n, M)`` for MatrixKernel and NUFFTKernel, ``(ny, nx)`` / ``(ny, nx, M)`` for FFTKernel. Yn : np.ndarray or scipy.sparse matrix Second input, same shape as ``Xn`` (paired R-test). For ``NUFFTKernel`` a bipartite mode with ``M_x != M_y`` is passed through without chunking. kernel : Kernel Pre-constructed :class:`~quadsv.kernels.Kernel`. null_params : dict, optional Pre-computed null parameters; only ``'var_R'`` is consumed. Resolved once at the top level if ``None`` and shared across chunks. return_pval : bool, default True If True, returns ``(R, pval)``; else returns ``R`` only. is_standardized : bool, default False If True, skips Z-score standardization internally. chunk_size : int or ``'auto'``, default ``'auto'`` Number of feature pairs processed per per-chunk dispatch call. ``'auto'`` defers to :func:`auto_chunk_size`; ``-1`` processes the full batch in a single call. For a cross-backend cost model see :doc:`/guides/scaling`. show_progress : bool, default False If True, displays a tqdm bar over chunks (only when ``M > chunk_size``). Returns ------- R : float or np.ndarray Test statistic value(s). Shape ``(M,)`` for 2-D / 3-D inputs, scalar for 1-D. For bipartite NUFFT input (``M_x != M_y``), shape ``(M_x, M_y)``. pval : float or np.ndarray, optional Two-tailed tail probability under null hypothesis; returned only if ``return_pval=True``. Notes ----- Under H₀, ``R = xᵀ K y`` is approximated as :math:`\\mathcal{N}(0, \\mathrm{trace}((HKH)^2))` on z-scored inputs. See :doc:`/guides/theory` and :doc:`/guides/scaling`. Examples -------- >>> coords = np.random.randn(100, 2) >>> kernel = MatrixKernel.from_coordinates(coords, method='gaussian') >>> x_data = np.random.randn(100) >>> y_data = np.random.randn(100) >>> R, pval = spatial_r_test(x_data, y_data, kernel) """ # Lazy imports — avoid circular dependency with the FFT / NUFFT modules. from quadsv.kernels.fft import FFTKernel, _r_test_fft from quadsv.kernels.nufft import NUFFTKernel, _r_test_nufft is_fft = isinstance(kernel, FFTKernel) is_nufft = isinstance(kernel, NUFFTKernel) # Resolve var_R once (cached across chunks). if null_params is None: null_params = {"var_R": float(kernel.square_trace())} elif "var_R" not in null_params: null_params = {**null_params, "var_R": float(kernel.square_trace())} Mx = _feature_count(Xn, is_fft=is_fft) My = _feature_count(Yn, is_fft=is_fft) if is_fft: def _dispatch(X, Y): return _r_test_fft( X, Y, kernel, null_params=null_params, return_pval=return_pval, is_standardized=is_standardized, ) elif is_nufft: def _dispatch(X, Y): return _r_test_nufft( X, Y, kernel, null_params=null_params, return_pval=return_pval, is_standardized=is_standardized, ) else: def _dispatch(X, Y): return _r_test_matrix( X, Y, kernel, null_params=null_params, return_pval=return_pval, is_standardized=is_standardized, ) # Bipartite NUFFT (M_x != M_y) doesn't fit the paired-chunk pattern — # pass the full batch through and let the backend handle it. if Mx != My: return _dispatch(Xn, Yn) M = Mx resolved_chunk = _resolve_chunk_size(chunk_size, kernel, M) # Single-batch shortcut. if resolved_chunk >= M: return _dispatch(Xn, Yn) # Chunk loop. starts = list(range(0, M, resolved_chunk)) iterator = starts if show_progress and len(starts) > 1: iterator = tqdm( starts, desc="R-test chunks", total=len(starts), bar_format="{l_bar}{bar:30}{r_bar}{bar:-30b}", ) R_parts: list[np.ndarray] = [] P_parts: list[np.ndarray] = [] for start in iterator: end = min(start + resolved_chunk, M) Xblock = _chunk_last_axis(Xn, start, end) Yblock = _chunk_last_axis(Yn, start, end) result = _dispatch(Xblock, Yblock) if return_pval: R_b, P_b = result R_parts.append(np.atleast_1d(R_b)) P_parts.append(np.atleast_1d(P_b)) else: R_parts.append(np.atleast_1d(result)) R = np.concatenate(R_parts) if return_pval: return R, np.concatenate(P_parts) return R