Source code for quadsv.kernels.nufft

"""
Non-uniform FFT (NUFFT) spectra, kernel and spatial tests for irregular data

When data sit on a regular grid (e.g., a rasterized Visium slide),
:func:`quadsv.power_spectrum_2d` computes :math:`|\\hat{x}(k)|^2` with a plain
2D FFT. For data whose spatial coordinates are **irregular** — e.g.,
imaging-based in situ platforms, Slide-seq, or a Visium slide read straight
from ``adata.obsm['spatial']`` without rasterization — :func:`power_spectrum_2d_nufft`
evaluates the type-1 NUFFT

.. math::

   \\hat c(k_y, k_x) = \\sum_{j=1}^{n} c_j \\,
       \\exp\\!\\bigl[-i(k_y\\,y_j + k_x\\,x_j)\\bigr]

on the same uniform ``(ny, nx)`` k-space grid that :func:`power_spectrum_2d`
would produce for a rasterized input of the same physical extent, and returns
:math:`|\\hat c|^2` in the scipy FFT layout (DC at ``[0, 0]``). Anything
downstream — :func:`quadsv.comparators.multisample.radial_bin_spectrum`,
:class:`quadsv.ComparatorIrregular` — works identically.

Notation (shared across this module)
------------------------------------

Dimensions:

- ``n``: number of spots (on the irregular grid).
- ``(ny, nx)``: internal uniform k-grid dimensions; ``n' = ny · nx``.
- ``(dy, dx)``: **physical** spacing per k-grid cell, same unit as the spatial coordinates
  after multiplying ``unit_scale``.
- ``unit_scale``: multiplier that converts the input coordinates ``S`` to the same unit as
  ``(dy, dx)`` (e.g., 0.35 if ``S`` are in pixels at 0.35 μm/pixel). Samples from different
  slides and platforms may ship coordinates in different units; this parameter harmonizes them
  onto the same **physical** unit for the internal k-grid and all downstream spectra and tests.

Vectors and matrices:

- ``S``: the ``n × 2`` spatial coordinate matrix of the irregular points, ordered as ``(y, x)``.
- ``K``: the ``n × n`` translation-invariant kernel at the irregular points.
- ``K'``: the ``n' × n'`` grid kernel with FFT eigenvalues
  ``λ(k) = F(K')(k)``.
- ``U``: the ``n × n'`` type-2 NUFFT evaluation matrix; the band-limited
  approximation is ``K ≈ (1/n') · U · diag(λ) · Uᴴ``.
- ``x̂ = Uᴴ x``: type-1 NUFFT of a length-``N`` signal onto the k-grid ``(ny, nx)`` (vectorized).

"""

from __future__ import annotations

import logging

import finufft
import numpy as np
import scipy.fft
import scipy.sparse as sp
from scipy.stats import chi2, norm

from quadsv.kernels.base import Kernel
from quadsv.kernels.fft import FFTKernel

__all__ = [
    "power_spectrum_2d_nufft",
    "NUFFTKernel",
]

logger = logging.getLogger(__name__)


def _infer_grid_from_coords(
    coords: np.ndarray,
    unit_scale: float = 1.0,
    oversample: float = 2.0,
    padding: float = 1.05,
    min_side: int = 32,
    max_side: int = 1024,
) -> tuple[tuple[int, int], tuple[float, float]]:
    """Pick ``(grid_shape, spacing)`` from coords alone, with no kernel input.

    The k-grid only needs to resolve the signal's sampling Nyquist, which is
    set by the median nearest-neighbor spacing of the coordinates. Finer than
    that is wasted work (aliasing kicks in anyway). Coarser misses kernel
    spectral content. ``oversample=2.0`` is a safe default.

    Returns ``(grid_shape, spacing)`` rounded to FFT-friendly sizes (multiples
    of 8).
    """
    from scipy.spatial import cKDTree

    scaled = np.asarray(coords, dtype=np.float64) * float(unit_scale)
    if scaled.ndim != 2 or scaled.shape[1] != 2:
        raise ValueError(f"coords must be (n, 2), got {scaled.shape}.")
    L_y = float(scaled[:, 0].max() - scaled[:, 0].min()) * padding
    L_x = float(scaled[:, 1].max() - scaled[:, 1].min()) * padding
    if L_y <= 0 or L_x <= 0:
        raise ValueError("coords have zero extent along one or both axes.")
    # Median 1-NN distance — robust proxy for the sampling scale.
    nn = cKDTree(scaled).query(scaled, k=2)[0][:, 1]
    d_nn = float(np.median(nn[nn > 0]))
    spacing_target = d_nn / oversample

    def _round_up(n: float) -> int:
        return int(min(max_side, max(min_side, 8 * int(np.ceil(n / 8)))))

    ny = _round_up(L_y / spacing_target)
    nx = _round_up(L_x / spacing_target)
    return (ny, nx), (L_y / ny, L_x / nx)


def _resolve_k_neighbors_on_coords(
    coords: np.ndarray,
    k_neighbors: int,
    grid_shape: tuple[int, int],
    spacing: tuple[float, float],
    unit_scale: float = 1.0,
) -> int:
    r"""Map ``k_neighbors`` on irregular coords to a grid-ring ``neighbor_degree``.

    Graph-based kernels (``moran`` / ``graph_laplacian`` / ``car``) on
    :class:`MatrixKernel` use a mutual-``k``-nearest-neighbor graph on the
    *actual* coord set — the connectivity of pair ``(i, j)`` depends on
    whether ``y_i`` and ``y_j`` are among each other's ``k`` closest points
    in coordinate space. :class:`FFTKernel`, by contrast, builds a
    translation-invariant kernel on the internal ``(ny, nx)`` lattice and
    routes ``k_neighbors`` through :meth:`FFTKernel._k_neighbors_to_degree`,
    which counts *grid cells* within a ring of the origin — entirely
    divorced from the coord set's density.

    :class:`NUFFTKernel` bridges the two: it builds the FFT grid kernel
    but applies it at the irregular coords. The correct semantic for
    ``k_neighbors`` on this path is ``MatrixKernel``'s — "nearby in
    coord space". This helper enforces that semantic by picking the grid
    ring whose Euclidean radius is closest to the *median k-th nearest
    neighbor distance of the coord set*. Points within that radius
    couple through the kernel; points farther apart do not.

    Parameters
    ----------
    coords : np.ndarray
        ``(n, 2)`` coord array in the same unit as ``coords`` passed to
        :class:`NUFFTKernel.__init__`.
    k_neighbors : int
        Target mutual-k-NN count, in the coord set (not grid cells).
    grid_shape : tuple[int, int]
        Internal FFT grid ``(ny, nx)``.
    spacing : tuple[float, float]
        Internal FFT grid spacing ``(dy, dx)`` in physical units.
    unit_scale : float, default 1.0
        Multiplier so ``coords * unit_scale`` shares its unit with
        ``spacing``. Mirrors :class:`NUFFTKernel`'s ``unit_scale``.

    Returns
    -------
    int
        ``neighbor_degree`` (≥ 1) to forward to :class:`FFTKernel`.

    Raises
    ------
    ValueError
        If ``k_neighbors`` is out of ``[1, n-1]``.
    """
    from scipy.spatial import cKDTree

    k_neighbors = int(k_neighbors)
    n = int(coords.shape[0])
    if k_neighbors < 1 or k_neighbors >= n:
        raise ValueError(f"k_neighbors must be in [1, {n - 1}], got {k_neighbors}.")

    # Median k-th NN distance in the same physical units as ``spacing``.
    coords_phys = np.asarray(coords, dtype=np.float64) * float(unit_scale)
    # ``k=k_neighbors + 1``: the 0-th "neighbor" is the point itself.
    dists, _ = cKDTree(coords_phys).query(coords_phys, k=k_neighbors + 1)
    r_k = float(np.median(dists[:, k_neighbors]))
    r_k_sq = r_k * r_k

    # Unique ring radii² on the square-topology torus grid (mirrors
    # :meth:`FFTKernel._precompute_square_dists` /
    # :meth:`FFTKernel._unique_ring_distances` for the ``topology='square'``
    # case NUFFTKernel always instantiates).
    ny, nx = int(grid_shape[0]), int(grid_shape[1])
    dy, dx = float(spacing[0]), float(spacing[1])
    y = np.arange(ny) * dy
    x = np.arange(nx) * dx
    y = np.minimum(y, (ny * dy) - y)
    x = np.minimum(x, (nx * dx) - x)
    yy, xx = np.meshgrid(y, x, indexing="ij")
    d_sq = (yy**2 + xx**2).ravel()
    flat = np.sort(d_sq)
    tol = 1e-6 * max(1.0, float(flat[-1]))
    keep = np.concatenate([[True], np.diff(flat) > tol])
    unique_dists_sq = flat[keep]  # ascending, starts at 0 (self)

    if unique_dists_sq.size <= 1:
        return 1  # degenerate (e.g. 2×2 grid); FFTKernel will clamp anyway.

    # Closest non-zero ring to the coord-set median k-NN distance.
    # ``argmin`` on a slice starting at index 1 keeps the self-ring out.
    diffs = np.abs(unique_dists_sq[1:] - r_k_sq)
    return int(np.argmin(diffs)) + 1


[docs] def power_spectrum_2d_nufft( coords: np.ndarray, values: np.ndarray, grid_shape: tuple[int, int], spacing: tuple[float, float], unit_scale: float = 1.0, eps: float = 1e-6, center_coords: bool = True, ) -> np.ndarray: """ Compute the 2D power spectrum via type-1 NUFFT This function computes the power spectrum :math:`P(k) = |\\hat{c}(k)|^2` of one or more non-uniform spatial signals via a type-1 NUFFT. The output has the same ``(ny, nx)`` layout as :func:`quadsv.kernels.fft.power_spectrum_2d` with ``fft_solver='fft2'``: DC at ``[0, 0]``, Nyquist at ``[ny/2, nx/2]`` (when dimensions are even). Parameters ---------- coords : np.ndarray Non-uniform spatial coordinates, shape ``(n, 2)`` in the order ``(y, x)``. Values outside the physical domain implied by ``grid_shape`` and ``spacing`` are folded into ``[-π, π)`` by finufft. values : np.ndarray Signal strengths at each coordinate. Shape ``(n,)`` for a single feature, or ``(n, M)`` for ``M`` stacked features (e.g., genes) on the same coordinates. Real-valued; promoted to complex internally. grid_shape : tuple[int, int] ``(ny, nx)`` of the target uniform k-space grid. Match whatever grid you use for rasterized samples so the two paths produce comparable spectra. spacing : tuple[float, float] ``(dy, dx)`` physical spacing per cell of the target grid, in the same unit as ``unit_scale * coords``. Together with ``grid_shape`` this defines the physical domain extent ``(ny · dy, nx · dx)``. unit_scale : float, default 1.0 Multiplier applied to ``coords`` before scaling into ``[-π, π)``. Use this to convert per-sample coordinate units into the common unit of ``spacing`` (e.g., 0.35 if ``coords`` are in Visium full-res pixels at 0.35 μm/pixel and ``spacing`` is in μm). eps : float, default 1e-6 NUFFT tolerance forwarded to finufft. center_coords : bool, default True If True, subtract the mean of ``coords`` before scaling — avoids wrapping artefacts when coordinates are stored with an arbitrary origin offset (e.g., Visium pixel coordinates start at a few thousand). Power spectra are translation-invariant so recentering does not change the result. Returns ------- np.ndarray Power spectrum. Shape ``(ny, nx)`` for 1D ``values`` or ``(ny, nx, M)`` for 2D ``values``, with DC at index ``[0, 0]``. Raises ------ ImportError If :mod:`finufft` is not installed. ValueError If input shapes are inconsistent. Examples -------- >>> import numpy as np >>> coords = np.random.default_rng(0).uniform(0, 100, size=(500, 2)) >>> vals = np.random.default_rng(1).standard_normal(500) >>> P = power_spectrum_2d_nufft(coords, vals, grid_shape=(32, 32), spacing=(4.0, 4.0)) >>> P.shape (32, 32) """ if coords.ndim != 2 or coords.shape[1] != 2: raise ValueError(f"coords must have shape (n, 2), got {coords.shape}.") if values.shape[0] != coords.shape[0]: raise ValueError( f"values first dim {values.shape[0]} must match coords n={coords.shape[0]}." ) ny, nx = grid_shape dy, dx = spacing if ny <= 0 or nx <= 0 or dy <= 0 or dx <= 0: raise ValueError(f"grid_shape and spacing must be positive, got {grid_shape}, {spacing}.") y = coords[:, 0].astype(np.float64) * unit_scale x = coords[:, 1].astype(np.float64) * unit_scale if center_coords: y = y - y.mean() x = x - x.mean() # Physical domain extents implied by the target uniform grid. Ly = ny * dy Lx = nx * dx # Scale into finufft's [-π, π) window so that mode index k (centred # at zero, range [-n/2, (n-1)/2]) corresponds to physical frequency # k / L cycles per unit length — matching np.fft.fftfreq(n, d). y_scaled = y * (2.0 * np.pi / Ly) x_scaled = x * (2.0 * np.pi / Lx) # Batched transforms: finufft accepts shape (n_tr, M) for c. squeeze = values.ndim == 1 if squeeze: c = values.astype(np.complex128, copy=False) else: # finufft expects (n_tr, N_points). c = np.ascontiguousarray(values.T.astype(np.complex128, copy=False)) # type-1 NUFFT: nonuniform points -> uniform k-space grid. # Output shape: (ny, nx) or (n_tr, ny, nx). DC at CENTRE ([ny//2, nx//2]). f_hat = finufft.nufft2d1(y_scaled, x_scaled, c, n_modes=(ny, nx), eps=eps, isign=-1) # Power spectrum. power = (f_hat.real**2 + f_hat.imag**2).astype(np.float64) # Move DC from the centre to [0, 0] so the layout matches scipy.fft.fft2. power = np.fft.ifftshift(power, axes=(-2, -1)) if squeeze: return power # Put the feature axis back at the end to match power_spectrum_2d(x=(ny, nx, M)). return np.moveaxis(power, 0, -1)
# --------------------------------------------------------------------------- # NUFFTKernel: translation-invariant kernel on irregular spatial points # ---------------------------------------------------------------------------
[docs] class NUFFTKernel(Kernel): """ Spatial kernel over **irregular** 2D coordinates evaluated via NUFFTs. Parallels :class:`quadsv.fft.FFTKernel` (which requires a regular grid) and implements the :class:`~quadsv.kernels.Kernel` interface so it plugs into :func:`quadsv.statistics.spatial_q_test` / :func:`quadsv.statistics.spatial_r_test` the same way. The band-limited approximation of the ``n × n`` irregular-point operator is ``K ≈ (1/n') · U · diag(λ) · Uᴴ``, where ``U`` is the ``n × n'`` type-2 NUFFT matrix and ``λ = F(K')`` is the grid kernel's spectrum. Under this approximation, Parseval's identity gives the fast quadratic form ``xᵀ K x = (1/n') Σ_k λ(k) |x̂(k)|²`` with ``x̂ = Uᴴ x`` (a single type-1 NUFFT). The matrix-vector primitive :meth:`Kx` uses the companion two-shot NUFFT ``K z = (1/n') · U · (λ ⊙ Uᴴ z)`` and backs the Hutchinson-based cumulant estimator (:func:`quadsv.statistics._hutchinson_cumulants`) and the bipartite R-test cross matrix. :func:`quadsv.spatial_q_test` always uses the k-space Parseval path (:meth:`xtKx`); :func:`quadsv.spatial_r_test` dispatches on shape — paired diagonal for ``M_x == M_y`` via :meth:`xtKy`, full ``(M_x, M_y)`` cross matrix via :meth:`Kx` otherwise. The matmul counterparts (:meth:`xtKx_matmul` / :meth:`xtKy_matmul`) are exposed for callers that want to compute the same form directly at the ``n`` irregular points — they agree with the spectral path to NUFFT precision (``eps``) on both regular and irregular coordinates. Parameters ---------- coords : np.ndarray Spot coordinates of shape ``(n, 2)`` in order ``(y, x)``. grid_shape : tuple[int, int], optional ``(ny, nx)`` of the internal uniform k-grid. If ``None`` (default), auto-inferred from ``coords``: the grid is sized to cover the bounding box and to resolve the sampling Nyquist set by the median nearest-neighbor distance (fully coordinate-driven, kernel-agnostic). Override only when you know you need a finer or coarser grid. spacing : tuple[float, float], optional ``(dy, dx)`` physical spacing per k-grid cell (same unit as ``coords`` after ``unit_scale``). If ``None`` (default), auto-inferred alongside ``grid_shape``. When both are supplied, users are responsible for ensuring ``ny · dy``, ``nx · dx`` covers the coordinate extent. method : str, default ``'matern'`` Kernel method forwarded to :class:`FFTKernel`. One of ``'gaussian'``, ``'matern'``, ``'moran'``, ``'graph_laplacian'``, ``'car'``. unit_scale : float, default 1.0 Multiplier applied to ``coords`` so they share the same unit as ``spacing`` (e.g. ``0.35`` if coords are in pixels at 0.35 μm/pixel). oversample : float, default 2.0 Auto-grid oversampling factor above the sampling Nyquist. Used only when ``grid_shape`` / ``spacing`` are auto-derived. Larger values give a finer k-grid (more accurate, slower); 2.0 is safe for all tested kernels. eps : float, default 1e-6 NUFFT tolerance forwarded to finufft. workers : int, optional Forwarded to :mod:`scipy.fft` (used by :meth:`Kx_grid`) and reserved for future finufft parallelism. ``None`` uses the SciPy default. **kwargs Method-specific kernel hyperparameters forwarded to the internal :class:`FFTKernel`. ``bandwidth`` / ``nu`` for ``gaussian`` / ``matern``; ``rho`` for ``car``; plus a *coord-aware* ``k_neighbors`` for the graph methods (``moran`` / ``graph_laplacian`` / ``car``). Note the ``k_neighbors``-to-``neighbor_degree`` mapping differs from that of :class:`FFTKernel` due to an oversampled internal grid. ``neighbor_degree`` is chosen so the internal-grid-ring cutoff matches the median k-th nearest-neighbor distance among the coords — matching :class:`~quadsv.kernels.MatrixKernel`'s mutual-k-NN graph semantic up to the band-limit of the internal grid. See :func:`_resolve_k_neighbors_on_coords` for the mapping detail. Pass ``neighbor_degree`` directly to bypass this and use the raw grid-ring semantic as in :class:`FFTKernel`. Attributes ---------- coords : np.ndarray Original ``(n, 2)`` coordinates. n : int Number of spots ``n``. grid_shape : tuple[int, int] Internal k-grid shape ``(ny, nx)``. spacing : tuple[float, float] Physical spacing per k-grid cell ``(dy, dx)``. method : str Kernel method name. params : dict Resolved kernel hyperparameters (snapshot of the internal FFT kernel). workers : int or None scipy.fft worker count used by :meth:`Kx_grid`. stores_precision : bool Always ``False`` — NUFFTKernel never holds an ``n × n`` matrix. """ _available_kernels = ["gaussian", "matern", "moran", "graph_laplacian", "car"] def __init__( # noqa: C901 self, coords: np.ndarray, grid_shape: tuple[int, int] | None = None, spacing: tuple[float, float] | None = None, method: str = "matern", unit_scale: float = 1.0, oversample: float = 2.0, eps: float = 1e-6, workers: int | None = None, *, centering: bool = True, **kwargs, ) -> None: """Construct a translation-invariant kernel over irregular 2D coordinates. See the class docstring for a full parameter / attribute reference; in brief: Parameters ---------- coords : np.ndarray ``(n, 2)`` spot coordinates in ``(y, x)`` order. grid_shape, spacing : tuple or None Internal k-grid shape and per-cell spacing. Both optional; auto-inferred from ``coords`` when either is missing. method : str, default ``'matern'`` Kernel method forwarded to the internal :class:`FFTKernel`. unit_scale : float, default 1.0 Coord → physical-unit multiplier so ``coords * unit_scale`` is in the same unit as ``spacing``. oversample : float, default 2.0 Auto-grid oversampling above the sampling Nyquist. eps : float, default 1e-6 NUFFT tolerance. workers : int, optional scipy.fft worker count used by :meth:`Kx_grid`. **kwargs Method-specific kernel hyperparameters. ``bandwidth`` / ``nu`` for ``gaussian`` / ``matern``; ``rho`` for ``car``; and a coord-aware ``k_neighbors`` (resolved to a grid-ring cutoff matching the median k-NN distance of the coords, see the class docstring) for ``moran`` / ``graph_laplacian`` / ``car``. Pass ``neighbor_degree`` directly to override and use the internal grid's ring semantic. Raises ------ ValueError If ``coords`` has the wrong shape, ``method`` is unknown, or ``grid_shape`` / ``spacing`` are invalid. """ coords = np.asarray(coords, dtype=np.float64) if coords.ndim != 2 or coords.shape[1] != 2: raise ValueError(f"coords must be shape (n, 2), got {coords.shape}.") if method not in self._available_kernels: raise ValueError(f"method must be one of {self._available_kernels}, got '{method}'.") # Auto-derive grid_shape / spacing from coords alone when either is missing. # Coordinate-driven: picks a k-grid that resolves the sampling Nyquist. if grid_shape is None or spacing is None: auto_gs, auto_sp = _infer_grid_from_coords( coords, unit_scale=unit_scale, oversample=oversample ) if grid_shape is None: grid_shape = auto_gs if spacing is None: spacing = auto_sp logger.info( "NUFFTKernel auto-inferred grid_shape=%s spacing=%s from %d coords.", grid_shape, spacing, coords.shape[0], ) ny, nx = int(grid_shape[0]), int(grid_shape[1]) if ny < 4 or nx < 4: raise ValueError(f"grid_shape must be at least (4, 4), got ({ny}, {nx}).") dy, dx = float(spacing[0]), float(spacing[1]) if dy <= 0 or dx <= 0: raise ValueError(f"spacing must be positive, got ({dy}, {dx}).") super().__init__(centering=centering)
[docs] self.coords: np.ndarray = coords
[docs] self.n: int = coords.shape[0]
[docs] self.grid_shape: tuple[int, int] = (ny, nx)
[docs] self.spacing: tuple[float, float] = (dy, dx)
[docs] self.method: str = method
self._unit_scale: float = float(unit_scale) self._eps: float = float(eps)
[docs] self.workers: int | None = workers
[docs] self.stores_precision: bool = False
# Graph-kernel ``k_neighbors`` semantic: on irregular coords the # user intends the MatrixKernel-style mutual-k-NN graph on the # actual coord set, not the grid-cell-counting ring that # FFTKernel's ``_k_neighbors_to_degree`` would produce. # Resolve it here using the coord density so the grid-ring # cutoff matches the median k-th NN distance among the coords. if method in ("moran", "graph_laplacian", "car") and "k_neighbors" in kwargs: if "neighbor_degree" in kwargs: raise ValueError( "Pass either 'k_neighbors' (coord-k-NN semantic) or " "'neighbor_degree' (grid-ring semantic), not both." ) k_nn = kwargs.pop("k_neighbors") kwargs["neighbor_degree"] = _resolve_k_neighbors_on_coords( coords, k_neighbors=int(k_nn), grid_shape=(ny, nx), spacing=(dy, dx), unit_scale=unit_scale, ) # Stash the original request so callers / tests can inspect it. self._k_neighbors_requested: int | None = int(k_nn) else: self._k_neighbors_requested = None # Internal FFTKernel holds the eigenvalue spectrum on the k-grid. We # use fft2 (full spectrum) so the ifftshift trick aligns NUFFT output # with the scipy FFT layout (DC at [0, 0]). # Internal FFTKernel is always kept on the *raw* spectrum — we own # centering at the NUFFT level (the constant mode of K on scattered # coords isn't the FFT grid's DC mode in general). self._fft_kernel = FFTKernel( shape=(ny, nx), spacing=(dy, dx), topology="square", method=method, fft_solver="fft2", workers=workers, centering=False, **kwargs, )
[docs] self.params: dict = dict(self._fft_kernel.params)
if self._k_neighbors_requested is not None: self.params["k_neighbors"] = self._k_neighbors_requested # Pre-scale coords into finufft's [-π, π) window. Centered so we avoid # origin-offset phase artefacts. y = coords[:, 0] * self._unit_scale x = coords[:, 1] * self._unit_scale self._y_mean = float(y.mean()) self._x_mean = float(x.mean()) self._y_scaled = (y - self._y_mean) * (2.0 * np.pi / (ny * dy)) self._x_scaled = (x - self._x_mean) * (2.0 * np.pi / (nx * dx)) # ------------------------------------------------------------------ def __repr__(self) -> str: # pragma: no cover return ( f"<NUFFTKernel method={self.method} n={self.n} " f"grid={self.grid_shape} spacing={self.spacing} " f"params={self.params}>" ) def __str__(self) -> str: # pragma: no cover return ( f"NUFFTKernel\n" f"- Method: {self.method}\n" f"- Number of spots: {self.n}\n" f"- k-grid: {self.grid_shape} at spacing {self.spacing}\n" f"- Params: {self.params}" ) # ------------------------------------------------------------------ _TOEPLITZ_R_THRESHOLD: int = 2000 _TOEPLITZ_LAM_TOL: float = 1e-5 def _coord_phi(self) -> np.ndarray: """Coord-density function ``φ(k)`` on k-grid (DC at [0, 0]). ``φ(Δ) = (1/n) · Σ_i exp(-iΔ·y_i)`` — one type-1 NUFFT of the all-ones vector. Symmetrized so ``φ[k] = conj(φ[-k mod N])`` holds exactly (fixes Nyquist self-conjugate bins that NUFFT samples as complex on even-length grids; needed for strict Hermiticity of the induced ``G_{k,k'} = n·φ(k'-k)``). Cached on the instance (coord-only). """ cache = getattr(self, "_phi_cache", None) if cache is not None: return cache ones = np.ones(self.n, dtype=float) phi_hat_centered = self._nufft_type1(ones).squeeze() # finufft DC-centered phi = np.fft.ifftshift(phi_hat_centered) / self.n # DC at [0, 0] ny, nx = phi.shape ii = np.arange(ny)[:, None] jj = np.arange(nx)[None, :] phi = 0.5 * (phi + np.conj(phi[(-ii) % ny, (-jj) % nx])) self._phi_cache = phi return phi def _eigvals_toeplitz_M(self, lam_tol: float | None = None) -> np.ndarray | None: """Full-spectrum eigenvalues via the reduced Toeplitz-``G`` matrix. Uses the identity (for PSD ``Λ``) that non-zero eigvals of ``K_n = (1/n') U Λ Uᴴ`` equal non-zero eigvals of ``M = (1/n') Λ^{1/2} G Λ^{1/2}`` with ``G_{k,k'} = n·φ(k'-k)`` (Toeplitz in k-space, built once from ``_coord_phi``). Truncate ``Λ`` to its support — pick the ``r`` modes with ``|λ(k)| > lam_tol · max|λ|`` — form the dense ``r × r`` ``M_r`` by indexed ``φ`` lookups, ``eigvalsh``. For ``centering=True`` we apply the rank-1 shift ``G → G_H = G − (1/n) · (Uᴴ𝟏)(Uᴴ𝟏)ᴴ = G − n · conj(φ) φᵀ``, which accounts for the y-space ``H = I − 𝟏𝟏ᵀ/n`` projection ((HU)ᴴ(HU) = UᴴHU = G − (1/n)(Uᴴ𝟏)(Uᴴ𝟏)ᴴ). Returns ``None`` when the formula doesn't apply cleanly: - Indefinite ``Λ`` (``moran``, anti-phase eigenmodes) — the signed square root makes ``M`` non-Hermitian. - Broad spectrum with ``r > _TOEPLITZ_R_THRESHOLD`` — dense ``r × r`` eigvalsh becomes the bottleneck; callers should route through cumulant-based Liu (:func:`compute_null_params(..., liu_n_probes=...)`) instead of asking for the full spectrum. Returns ------- np.ndarray or None Eigenvalues (descending) or ``None`` to signal "no full spectrum available for this configuration". """ lam = self._fft_kernel.spectrum.reshape(self.grid_shape) # PSD detection — gaussian/matern/car/graph_laplacian are PSD by # construction; their FFT spectra carry sub-1% numerical noise # that shouldn't disqualify Toeplitz-M. Only flag truly # indefinite kernels (moran, sign-balanced Λ). psd_methods = ("gaussian", "matern", "car", "graph_laplacian") lam_abs_max = float(np.abs(lam).max()) if self.method in psd_methods: if lam_abs_max > 0: # Sanity clip: even PSD kernels shouldn't show a negative # lobe bigger than a few percent of the peak. if lam.min() < -0.05 * lam_abs_max: return None elif lam.min() < -1e-3 * lam_abs_max: return None # truly indefinite lam_flat = lam.ravel() lam_max = float(np.abs(lam_flat).max()) if lam_max == 0: return np.zeros(self.n) tol_rel = self._TOEPLITZ_LAM_TOL if lam_tol is None else float(lam_tol) mask_flat = np.abs(lam_flat) > tol_rel * lam_max r = int(mask_flat.sum()) if r > self._TOEPLITZ_R_THRESHOLD: return None # too broad for dense eigvalsh on r × r reduced M ny, nx = self.grid_shape nprime = ny * nx mask = mask_flat.reshape(ny, nx) iy, ix = np.where(mask) lam_r_sqrt = np.sqrt(np.maximum(lam_flat[mask_flat], 0.0)) phi = self._coord_phi() dy = (iy[None, :] - iy[:, None]) % ny dx = (ix[None, :] - ix[:, None]) % nx G_r = self.n * phi[dy, dx] if self.centering: # (HU)ᴴ(HU) = G − (1/n)(Uᴴ𝟏)(Uᴴ𝟏)ᴴ = G − n · conj(φ_r) φ_rᵀ. # Rank-1 outer product on the restricted mode set. phi_r = phi[iy, ix] G_r = G_r - self.n * np.outer(np.conj(phi_r), phi_r) M_r = (lam_r_sqrt[:, None] * G_r * lam_r_sqrt[None, :]) / nprime M_r = 0.5 * (M_r + M_r.conj().T) vals = np.linalg.eigvalsh(M_r) vals = np.real(vals) # For PSD λ the operator ``K_n = (1/n') U Λ Uᴴ`` and ``HKH`` are # both PSD — any negative eigenvalue from ``eigvalsh`` is pure # finite-precision noise on the near-null subspace. Clip to 0. vals = np.maximum(vals, 0.0) return np.sort(vals)[::-1]
[docs] def eigenvalues(self, k: int | None = None, return_full_layout: bool = False) -> np.ndarray: """Eigenvalues of the ``n × n`` irregular-point operator. Returns the spectrum of the realized operator ``K_n`` (raw) or ``H K_n H`` (centered) at the irregular coordinates — **not** the internal FFT-grid spectrum. Purely matrix-free: no dense ``n × n`` construction. Two paths: - **``k`` given — Lanczos top-k.** Wrap :meth:`Kx` as a :class:`~scipy.sparse.linalg.LinearOperator` and call :func:`~scipy.sparse.linalg.eigsh(which='LM')` for the top ``k`` eigenvalues by magnitude. Cost: ``O(k × nufft)``. Not cached. - **``k=None`` — Toeplitz-M (analytic).** See :meth:`_eigvals_toeplitz_M`. Applies when ``Λ`` is PSD and the support size ``r = #{k : |λ(k)| > 10⁻⁵ · max|λ|}`` is below ``_TOEPLITZ_R_THRESHOLD`` (default 2000). Exact to NUFFT ``eps``; cost dominated by ``O(r³)`` eigvalsh on a dense ``r × r`` reduced matrix. Cached per ``centering`` mode. Full-spectrum requests that fall outside Toeplitz-M's reach (indefinite ``Λ`` like Moran, or broad-support kernels like CAR at strong coupling) raise ``NotImplementedError`` rather than falling back to an approximate density reconstruction. For Liu's method, use :func:`compute_null_params(..., method='liu', liu_n_probes=...)` to get cumulant-based Liu directly from :math:`2m` matvecs. Parameters ---------- k : int, optional Top-``k`` eigenvalues by magnitude (Lanczos). ``None`` returns the full spectrum via Toeplitz-M. return_full_layout : bool, default False Kept for API compatibility with :meth:`FFTKernel.eigenvalues`; ignored. Returns ------- np.ndarray Eigenvalues in descending order. Length ``k`` when ``k`` is given, ``n`` for the full-spectrum path. """ del return_full_layout # kept only for API compatibility cache_key = "_spectrum_centered" if self.centering else "_spectrum_raw" n = self.n # Top-k: Lanczos. Not cached. if k is not None: from scipy.sparse.linalg import LinearOperator, eigsh def _matvec(v: np.ndarray) -> np.ndarray: return np.asarray(self.Kx(np.asarray(v, dtype=float))) op = LinearOperator(shape=(n, n), matvec=_matvec, dtype=float) k_req = min(int(k), n - 2) vals, _ = eigsh(op, k=k_req, which="LM") return np.sort(np.real(vals))[::-1] # Full spectrum via Toeplitz-M (cached). cached = getattr(self, cache_key, None) if cached is not None and len(cached) == n: return cached toep = self._eigvals_toeplitz_M() if toep is None: raise NotImplementedError( "Full NUFFT spectrum unavailable for this configuration " "(indefinite spectrum or broad spectral support: r > " f"{self._TOEPLITZ_R_THRESHOLD}). Use " "`compute_null_params(..., method='liu', liu_n_probes=60)` " "for Liu's approximation via Hutchinson-estimated " "cumulants, or `eigenvalues(k=...)` for a Lanczos " "top-k." ) pad = max(0, n - len(toep)) spectrum = np.concatenate([toep, np.zeros(pad)]) if pad else toep[:n] setattr(self, cache_key, spectrum) return spectrum
# ------------------------------------------------------------------ # Path A primitives — k-space Parseval (default for xtKx / xtKy) # ------------------------------------------------------------------ def _nufft_type1(self, x: np.ndarray) -> np.ndarray: """Type-1 NUFFT of ``x`` onto the k-grid at the cached scaled coords. Computes ``x̂(k) = Σ_j x_j exp(-i k · r̃_j)`` where ``r̃_j`` are the mean-centered, ``2π/(n·d)``-scaled versions of the input coordinates. Shared primitive for :meth:`xtKx` (takes ``|·|²``), :meth:`xtKy` (complex inner product), :meth:`Kx`, and :meth:`Kx_grid`. Parameters ---------- x : np.ndarray ``(n,)`` or ``(n, M)``. Returns ------- np.ndarray Complex ``(M, ny, nx)`` (always 3-D; ``M=1`` for 1-D input). DC at the array centre (finufft convention); callers that want the scipy-FFT layout must ``ifftshift`` along the last two axes. """ if x.shape[0] != self.n: raise ValueError(f"x first dim {x.shape[0]} does not match n={self.n}.") ny, nx = self.grid_shape x_in = x[:, None] if x.ndim == 1 else x c = np.ascontiguousarray(x_in.T.astype(np.complex128)) # (M, N) return finufft.nufft2d1( self._y_scaled, self._x_scaled, c, n_modes=(ny, nx), eps=self._eps, isign=-1, ) # (M, ny, nx)
[docs] def xtKx(self, x: np.ndarray) -> float | np.ndarray: """Quadratic form ``xᵀ K x`` via **k-space Parseval**. Implements the default path: .. math:: x^T K x \\;=\\; \\frac{1}{n'} \\sum_k \\lambda(k) \\, |\\hat x(k)|^{2} using one type-1 NUFFT of ``x`` and an elementwise Parseval sum. Only the real power spectrum ``|x̂|²`` of shape ``(ny, nx)`` is materialized — no ``ifft2``, no spatial-grid copy of ``x``. Parameters ---------- x : np.ndarray ``(n,)`` for one feature or ``(n, M)`` for ``M`` features. Returns ------- float or np.ndarray Scalar if ``x`` is 1-D, shape ``(M,)`` otherwise. See Also -------- xtKx_matmul : Compute ``xᵀ · Kx`` via the length-``n`` matrix product. """ ny, nx = self.grid_shape # HKH quadratic form = raw K on H x. Center per-feature (column mean). if self.centering: if x.ndim == 1: x = x - x.mean() else: x = x - x.mean(axis=0, keepdims=True) x_hat_centered = self._nufft_type1(x) # (M, ny, nx) power = x_hat_centered.real**2 + x_hat_centered.imag**2 # (M, ny, nx) # Spectrum is stored in scipy FFT layout (DC at [0,0]); fftshift → centered # to match the NUFFT output before multiplying. lam = np.fft.fftshift(self._fft_kernel.spectrum.reshape(ny, nx)) Q = np.sum(lam[None, :, :] * power, axis=(1, 2)) / (ny * nx) if x.ndim == 1: return float(Q[0]) return Q.astype(np.float64)
[docs] def xtKy(self, x: np.ndarray, y: np.ndarray) -> float | np.ndarray: """Bilinear form ``xᵀ K y`` via **cross Parseval**. Implements the default path: .. math:: x^T K y \\;=\\; \\frac{1}{n'} \\sum_k \\lambda(k) \\, \\overline{\\hat x(k)}\\, \\hat y(k). Paired same-``M`` convention — returns the diagonal of ``Xᵀ K Y`` (shape ``(M,)``) for batched inputs, scalar for 1-D inputs. For the bipartite ``(M_x, M_y)`` cross matrix use :meth:`xtKy_matmul` (or build it explicitly via ``X.T @ self.Kx(Y)``). Parameters ---------- x, y : np.ndarray ``(n,)`` or ``(n, M)`` — must share shape. Returns ------- float or np.ndarray Scalar for 1-D inputs; ``(M,)`` for batched. """ if x.shape[0] != self.n or y.shape[0] != self.n: raise ValueError( f"x, y first dim must equal n={self.n}; got {x.shape[0]}, {y.shape[0]}." ) if x.shape != y.shape: raise ValueError(f"x and y must share shape; got {x.shape} vs {y.shape}.") ny, nx = self.grid_shape if self.centering: # x^T HKH y = (Hx)^T K (Hy) — subtract per-feature means on both sides. if x.ndim == 1: x = x - x.mean() y = y - y.mean() else: x = x - x.mean(axis=0, keepdims=True) y = y - y.mean(axis=0, keepdims=True) x_hat = self._nufft_type1(x) # (M, ny, nx) complex y_hat = self._nufft_type1(y) lam = np.fft.fftshift(self._fft_kernel.spectrum.reshape(ny, nx)) cross = np.real(np.conj(x_hat) * y_hat) * lam[None, :, :] R = np.sum(cross, axis=(1, 2)) / (ny * nx) if x.ndim == 1: return float(R[0]) return R.astype(np.float64)
# ------------------------------------------------------------------ # Path B primitives — n-point vector via NUFFT round-trip # ------------------------------------------------------------------
[docs] def Kx(self, z: np.ndarray) -> np.ndarray: """Matrix–vector product ``K z`` at the ``n`` irregular coordinates. Implements the band-limited apply .. math:: K z \\;\\approx\\; \\tfrac{1}{n'} \\, U \\bigl(\\lambda \\odot U^{\\mathsf H} z\\bigr), evaluated as type-1 NUFFT → elementwise multiply by ``λ(k) / n'`` → type-2 NUFFT. Output length ``n``, same shape as ``z``. Base primitive for :meth:`xtKx_matmul`, :meth:`xtKy_matmul`, the Hutchinson cumulant estimator used by Liu's null approximation, and the bipartite R-test in :class:`DetectorIrregular`. Parameters ---------- z : np.ndarray ``(n,)`` or ``(n, M)``. Returns ------- np.ndarray Same shape as ``z``. """ if z.shape[0] != self.n: raise ValueError(f"z first dim {z.shape[0]} does not match n={self.n}.") ny, nx = self.grid_shape lam_centred = np.fft.fftshift(self._fft_kernel.spectrum.reshape(ny, nx)) squeeze = z.ndim == 1 # HKH z = H · K · H z: center input pre-NUFFT, center output post. if self.centering: if squeeze: z = z - z.mean() else: z = z - z.mean(axis=0, keepdims=True) z_hat = self._nufft_type1(z) # (M, ny, nx) complex, DC centred out_k = np.ascontiguousarray( (lam_centred[None, :, :] * z_hat / (ny * nx)).astype(np.complex128) ) Kz = finufft.nufft2d2( self._y_scaled, self._x_scaled, out_k, eps=self._eps, isign=+1, ) # (M, n) Kz = np.real(Kz).T # (n, M) if self.centering: if squeeze: Kz = Kz - Kz.mean(axis=0, keepdims=True) else: Kz = Kz - Kz.mean(axis=0, keepdims=True) return Kz[:, 0] if squeeze else Kz
[docs] def xtKx_matmul(self, x: np.ndarray) -> float | np.ndarray: """Quadratic form ``xᵀ K x`` via **direct matmul**. Computes ``Q_B = xᵀ · self.Kx(x)`` end-to-end at the ``n`` irregular points. Sparse-aware on ``x`` (``x.multiply(Kx).sum``). ~2× the NUFFT work of :meth:`xtKx` per feature; agrees with it to NUFFT precision on regular grids and to the torus-BC band (~1–2 %) on irregular ones. Parameters ---------- x : np.ndarray or scipy.sparse matrix ``(n,)`` or ``(n, M)``. Returns ------- float or np.ndarray Scalar for 1-D input; ``(M,)`` for batched. """ if sp.issparse(x): if x.ndim == 1 or (x.shape[1] == 1 and x.shape[0] == self.n): x_sp = x.reshape(-1, 1) squeeze = True else: x_sp = x squeeze = False Kx_dense = self.Kx(x_sp.toarray()) result = np.asarray(x_sp.multiply(Kx_dense).sum(axis=0)).ravel() return float(result[0]) if squeeze else result arr = np.asarray(x, dtype=float) Kx_dense = self.Kx(arr) if arr.ndim == 1: return float(np.dot(arr, Kx_dense)) return np.sum(arr * Kx_dense, axis=0).astype(np.float64)
[docs] def xtKy_matmul(self, x: np.ndarray | sp.spmatrix, y: np.ndarray) -> float | np.ndarray: """Bilinear form ``xᵀ K y`` via **direct matmul**. Returns the paired ``(M,)`` diagonal of ``Xᵀ K Y`` (sparse-aware on ``x``). For the full ``(M_x, M_y)`` bipartite cross matrix build it explicitly as ``X.T @ self.Kx(Y)`` — that's what :class:`DetectorIrregular` does for ``compute_rstat`` when the two feature blocks have different widths. Parameters ---------- x, y : np.ndarray or scipy.sparse matrix ``(n,)`` or ``(n, M)``. Returns ------- float or np.ndarray Scalar for 1-D inputs; ``(M,)`` for batched. """ Ky = self.Kx(y) if sp.issparse(x): x_in = x.reshape(-1, 1) if x.ndim == 1 else x Ky_2d = Ky[:, None] if Ky.ndim == 1 else Ky result = np.asarray(x_in.multiply(Ky_2d).sum(axis=0)).ravel() return float(result[0]) if result.size == 1 else result x_arr = np.asarray(x, dtype=float) if x_arr.ndim == 1 and Ky.ndim == 1: return float(np.dot(x_arr, Ky)) x_mat = x_arr.reshape(-1, 1) if x_arr.ndim == 1 else x_arr Ky_mat = Ky.reshape(-1, 1) if Ky.ndim == 1 else Ky return np.sum(x_mat * Ky_mat, axis=0).astype(np.float64)
[docs] def Kx_grid(self, x: np.ndarray) -> np.ndarray: """Grid-domain companion of :meth:`Kx` — ``(ny, nx)`` spatial output. Whereas :meth:`Kx` returns the length-``n`` apply at the irregular coordinates, :meth:`Kx_grid` returns the apply evaluated on the internal uniform grid. Pipeline: type-1 NUFFT → undo the coordinate-centering phase (needed here because we keep complex coefficients; the square-magnitude and adjoint-round-trip paths of :meth:`xtKx` / :meth:`Kx` absorb it automatically) → multiply by ``λ(k)`` → ``ifftshift`` → ``ifft2`` → real. Parameters ---------- x : np.ndarray ``(n,)`` or ``(n, M)``. Returns ------- np.ndarray Real ``(ny, nx)`` or ``(ny, nx, M)`` in the scipy FFT layout (DC at ``[0, 0]``). """ ny, nx = self.grid_shape dy, dx = self.spacing squeeze = x.ndim == 1 x_hat_centered = self._nufft_type1(x) # (M, ny, nx), modes ∈ [-n/2, n/2-1] m_y = np.arange(ny) - ny // 2 m_x = np.arange(nx) - nx // 2 phase = ( np.exp(-1j * m_y * self._y_mean * 2.0 * np.pi / (ny * dy))[:, None] * np.exp(-1j * m_x * self._x_mean * 2.0 * np.pi / (nx * dx))[None, :] ) # Apply spectrum + undo centering phase in one pass. lam_centred = np.fft.fftshift(self._fft_kernel.spectrum.reshape(ny, nx)) weighted = x_hat_centered * phase[None, :, :] * lam_centred[None, :, :] # Shift DC to [0, 0] and inverse-FFT to spatial grid. weighted_shifted = np.fft.ifftshift(weighted, axes=(-2, -1)) Kx_grid = np.real( scipy.fft.ifft2(weighted_shifted, axes=(-2, -1), workers=self.workers) ) # (M, ny, nx) out = np.moveaxis(Kx_grid, 0, -1) # (ny, nx, M) return out[..., 0] if squeeze else out
# ------------------------------------------------------------------ # Null-moment estimators — doubled-grid linear-convolution analytic # (default) and Rademacher Hutchinson probe (opt-in second opinion). # ------------------------------------------------------------------ def _coord_power_spectrum_doubled(self) -> np.ndarray: """``|φ(j)|²`` on a doubled ``(2·ny, 2·nx)`` k-grid (DC-at-[0,0]). The analytic ``trace(K²)`` formula .. math:: \\operatorname{tr}(K_n^2) \\;=\\; \\frac{n^2}{n'^2} \\sum_{k,k'} \\lambda(k)\\,\\lambda(k')\\,|\\varphi(k'-k)|^{2} sums over differences ``Δ = k' - k`` that range in ``{-(ny-1), ..., ny-1}`` (per dim). Evaluating the sum as a 2D FFT convolution on the native ``(ny, nx)`` grid is a *circular* convolution at period ``n'``, which silently wraps values of ``|φ|²`` beyond ``ny/2`` — a valid approximation only when coords coincide exactly with the k-grid (``|φ|² = δ``, the regular-grid collapse). For irregular coords or a typical oversampled NUFFT grid (``n' > n``), ``|φ(j)|²`` is *not* periodic at ``n'`` and the wraparound over-estimates ``tr(K²)`` — up to ~45% on broad-spectrum kernels like CAR. This method evaluates ``|φ|²`` on a doubled ``(2·ny, 2·nx)`` grid via a separate type-1 NUFFT of the all-ones vector onto the doubled mode-range. Paired with zero-padding of ``λ`` to the same doubled layout in :meth:`square_trace`, the doubled-grid FFT convolution then realizes a true linear convolution on the ``Δ ∈ [-(ny-1), ny-1]`` support with no wraparound. Cost: one extra type-1 NUFFT on a ``2n'``-point mode grid (same coords); cached per instance (coord-only). Returns ------- np.ndarray ``(2·ny, 2·nx)`` real, non-negative, with ``|φ(0)|² = 1`` at ``[0, 0]`` (DC-at-origin layout). """ cache = getattr(self, "_phi2_doubled_cache", None) if cache is not None: return cache ny, nx = self.grid_shape ny2, nx2 = 2 * ny, 2 * nx ones = np.ones(self.n, dtype=complex) # One type-1 NUFFT of the ones vector onto the doubled mode grid. # Match _nufft_type1's (isign=-1, eps=self._eps) convention. phi_hat_centered = finufft.nufft2d1( self._y_scaled, self._x_scaled, ones, n_modes=(ny2, nx2), eps=self._eps, isign=-1, ) # (ny2, nx2) complex, finufft DC-centered phi = np.fft.ifftshift(phi_hat_centered) / self.n # DC at [0, 0] # Symmetrize so φ[k] = conj(φ[-k mod 2N]) holds exactly — fixes # the Nyquist-row self-conjugate bins that NUFFT samples as # complex on even-length grids. Mirrors :meth:`_coord_phi`. ii = np.arange(ny2)[:, None] jj = np.arange(nx2)[None, :] phi = 0.5 * (phi + np.conj(phi[(-ii) % ny2, (-jj) % nx2])) phi2 = np.abs(phi) ** 2 self._phi2_doubled_cache = phi2 return phi2
[docs] def trace(self) -> float: """``trace(K)`` (raw) or ``trace(HKH)`` (centered). Closed-form ``(n/n') · Σ_k λ(k)`` — exact because the diagonal of ``G = UᴴU`` is ``n`` regardless of coord arrangement, so ``trace(K_n) = (n/n') · trace(K_grid)`` is independent of the coord layout. Adjusts by ``-s₁/n`` when ``centering=True`` (``s₁ = 𝟏ᵀ K 𝟏`` via a single ``K·𝟏`` apply in :meth:`Kernel._ones_stats`). """ ny, nx = self.grid_shape raw = float(self._fft_kernel.trace() * self.n / (ny * nx)) if not self.centering: return raw s1, _ = self._ones_stats() return raw - s1 / self.n
[docs] def square_trace(self) -> float: """``trace(K²)`` (raw) or ``trace((HKH)²)`` (centered). Closed-form ``(n²/n'²) · λᵀ Ψ λ`` with Toeplitz ``Ψ_{k,k'} = |φ(k'-k)|²``, evaluated as a *linear* (non-circular) 2D convolution of ``|φ|²`` with ``λ`` via a doubled-grid FFT in ``O(n' log n')``. ``φ(j) = (1/n) Σ_i exp(-ij·y_i)`` is evaluated on a ``(2·ny, 2·nx)`` mode grid by a separate type-1 NUFFT of the ones vector (see :meth:`_coord_power_spectrum_doubled`), and ``λ`` is zero-padded to the same doubled layout so the FFT convolution does not wrap values of ``|φ|²`` across the ``n'``-period — a silent bias of up to ~45% on broad-spectrum kernels (CAR, graph_laplacian) on the typical oversampled NUFFT grid. On a regular grid where coords coincide with the k-grid, ``|φ|² = δ`` and the formula collapses to ``(n/n')² · Σ_k λ(k)²``. Adjusts by ``-2·s₂/n + s₁²/n²`` when ``centering=True``. Observed band-limit residuals vs. explicit ``Kx(I)`` truth: ≲ ``1e-7`` on Gaussian / Matern, ``~0.1 %`` on CAR, ``~1 %`` on graph_laplacian, and ``~0.05–1.2 %`` on Moran (indefinite ``Λ``) — accurate across regular, irregular, and clustered coord layouts. """ ny, nx = self.grid_shape nprime = ny * nx lam = self._fft_kernel.spectrum.reshape(ny, nx) phi2_d = self._coord_power_spectrum_doubled() # (2·ny, 2·nx) # Zero-pad ``λ`` to the doubled grid, preserving DC-at-origin. # Embedding the fftshift'd (DC-centered) ``λ`` at offset # ``(ny - ny//2, nx - nx//2)`` of the doubled centered array # aligns its DC bin with the doubled grid's DC and leaves the # freshly-introduced higher-frequency bins zero — the analytic # identity only needs ``λ`` supported on the original k-grid. lam_centered = np.fft.fftshift(lam) lam_pad_centered = np.zeros_like(phi2_d) sy, sx = ny - ny // 2, nx - nx // 2 lam_pad_centered[sy : sy + ny, sx : sx + nx] = lam_centered lam_pad = np.fft.ifftshift(lam_pad_centered) # Linear conv (2·ny, 2·nx). With ``lam_pad`` zero outside the # original support, the circular wraparound at period 2·n' only # reaches indices where ``lam_pad`` vanishes and contributes # nothing to the final sum. lam_f = np.fft.fft2(lam_pad) phi2_f = np.fft.fft2(phi2_d) conv = np.fft.ifft2(lam_f * phi2_f).real # (|φ|² ⋆ λ)(k) # Element-wise product with ``lam_pad`` auto-restricts the outer # sum to the original k-grid support. raw = (self.n**2 / nprime**2) * float(np.sum(lam_pad * conv)) # ``trace(K²) = Σᵢ μᵢ² ≥ 0`` for any real symmetric ``K``. Negative # values here are FFT-cancellation noise on indefinite ``Λ`` # (e.g. Moran), not a valid result — clip to zero. raw = max(raw, 0.0) if not self.centering: return raw s1, s2 = self._ones_stats() return max(raw - 2.0 * s2 / self.n + s1**2 / (self.n**2), 0.0)
# --------------------------------------------------------------------------- # Q-test and R-test on irregular spatial coordinates via NUFFT # --------------------------------------------------------------------------- def _standardize_features(X: np.ndarray) -> np.ndarray: """Z-score each column (ddof=1), leaving constant columns as zeros. Matches :func:`quadsv.statistics.spatial_q_test`'s convention. Used by the NUFFT dispatch to standardize at the ``n`` irregular points before the type-1 NUFFT. """ mu = X.mean(axis=0, keepdims=True) sd = X.std(axis=0, keepdims=True, ddof=1) out = np.zeros_like(X, dtype=float) valid = sd > 1e-12 np.divide(X - mu, sd, out=out, where=valid) return out def _q_test_nufft( # noqa: C901 Xn: np.ndarray, kernel: NUFFTKernel, null_params: dict | None = None, return_pval: bool = True, is_standardized: bool = False, ) -> float | np.ndarray | tuple[float, float] | tuple[np.ndarray, np.ndarray]: """ Spatial Q-test on irregular 2D coordinates. Computes ``Q = xᵀ K x`` via the k-space Parseval identity ``Q = (1/n') Σ_k λ(k) · |ẑ(k)|²`` (one type-1 NUFFT of ``z`` + a Parseval sum; see :meth:`NUFFTKernel.xtKx`). The length-``n`` matmul counterpart :meth:`NUFFTKernel.xtKx_matmul` computes the same form to NUFFT precision and is exposed for callers that prefer the direct round-trip; :func:`spatial_q_test` always uses the spectral path. Null moments route through :func:`quadsv.statistics.compute_null_params`, which on graph kernels defaults to the empirical moment estimator over ``HKH``-centered probes (see :meth:`NUFFTKernel.trace` / :meth:`NUFFTKernel.square_trace`). Standardization at the ``n`` irregular points is applied internally unless ``is_standardized=True``. Parameters ---------- Xn : np.ndarray ``(n,)`` or ``(n, M)``. kernel : NUFFTKernel null_params : dict, optional Pre-built moments (see :func:`quadsv.compute_null_params`). Read keys depend on the null approximation selected via ``null_params['method']``: ``'mean_Q'`` / ``'var_Q'`` for CLT, ``'scale_g'`` / ``'df_h'`` (or ``'mean_Q'`` / ``'var_Q'`` as fallback) for Welch, and ``'liu_coef'`` (preferred) or ``'cumulants'`` for Liu. Pass ``None`` to auto-build. return_pval : bool, default True is_standardized : bool, default False Returns ------- Q : float or np.ndarray pval : float or np.ndarray, optional """ Xn = np.asarray(Xn, dtype=float) batched = Xn.ndim == 2 X_in = Xn if batched else Xn[:, None] if X_in.shape[0] != kernel.n: raise ValueError(f"Xn first dim {X_in.shape[0]} does not match kernel.n={kernel.n}.") z = X_in if is_standardized else _standardize_features(X_in) # Spectral Parseval path — agrees with xtKx_matmul to NUFFT precision # (verified on both regular and irregular grids). Keeping only one path # here avoids a stateful switch on the kernel object. Q_arr = np.atleast_1d(kernel.xtKx(z)).ravel() if not return_pval: return Q_arr if batched else float(Q_arr[0]) # Dispatch on the user-selected null approximation. This mirrors the # MatrixKernel path in `spatial_q_test`: the caller picks one of # {'clt', 'welch', 'liu'} via ``null_params['method']``; defaults keep # backward-compatible behavior — CLT for Moran (its kernel is # indefinite so Welch/Liu are degenerate) and Liu for the PSD kernels. if null_params is not None and "method" in null_params: null_approx = str(null_params["method"]) else: null_approx = "clt" if kernel.method == "moran" else "liu" if kernel.method == "moran" and null_approx != "clt": raise ValueError( f"Moran's I kernel is indefinite; only null_method='clt' is " f"supported for the Q-test. Got method={null_approx!r}." ) def _get_mean_var() -> tuple[float, float]: """Mean/var of Q under H0 — from user-supplied params or recompute.""" if null_params is not None and "mean_Q" in null_params and "var_Q" in null_params: return float(null_params["mean_Q"]), float(null_params["var_Q"]) # Route through compute_null_params to pick up the H-centering + # finite-n ratio correction; for NUFFT graph kernels the # ``'empirical'`` default on trace()/square_trace() ensures the # corrections capture the spreading-kernel smoothing too. from quadsv.statistics import compute_null_params p = compute_null_params(kernel, method="clt") return float(p["mean_Q"]), float(p["var_Q"]) if null_approx == "clt": mean_Q, var_Q = _get_mean_var() sigma = float(np.sqrt(var_Q)) if sigma <= 1e-12: pvals = np.ones_like(Q_arr) else: z_scores = (Q_arr - mean_Q) / sigma pvals = chi2.sf(z_scores**2, df=1) elif null_approx == "welch": # Welch-Satterthwaite: Q ~ g · χ²(df=h) with g = var / (2·mean), # h = 2·mean² / var. Requires mean > 0 (PSD kernel). if null_params is not None and "scale_g" in null_params and "df_h" in null_params: g = float(null_params["scale_g"]) h = float(null_params["df_h"]) else: mean_Q, var_Q = _get_mean_var() if mean_Q <= 0 or var_Q <= 0: pvals = np.ones_like(Q_arr) g = h = None else: g = var_Q / (2.0 * mean_Q) h = 2.0 * mean_Q**2 / var_Q if g is not None: pvals = chi2.sf(Q_arr / g, df=h) elif null_approx == "liu": from quadsv.statistics import ( _hutchinson_cumulants, _liu_apply, _liu_prepare, _liu_prepare_from_cumulants, ) # Prefer cached Liu coefficients from compute_null_params; derive # from caller-supplied ``cumulants`` otherwise. ``n`` is passed # for the Dirichlet(1/2) variance correction — essential on # broad-spectrum PSD kernels (CAR / graph_laplacian) where # c_1² ≈ m·c_2 would otherwise inflate sigma_Q by O(10×). n_kernel = int(kernel.n) coef = None if null_params is None else null_params.get("liu_coef") if coef is None and null_params is not None and "cumulants" in null_params: coef = _liu_prepare_from_cumulants(null_params["cumulants"], n=n_kernel) if coef is None: # ``null_params`` empty or just ``{"method": "liu"}`` — build # the coef from the kernel directly. Any other caller-supplied # keys without ``liu_coef`` / ``cumulants`` is considered # malformed and raises. if null_params is not None and null_params.keys() - {"method"}: raise ValueError( "null_params with method='liu' must contain either " "'liu_coef' (preferred) or 'cumulants'. Build via " "compute_null_params(kernel, method='liu')." ) # Try the exact Toeplitz-M eigendecomposition; if that's # unavailable (broad-support or indefinite Λ), fall back # to Hutchinson-estimated cumulants. try: evals = kernel.eigenvalues(return_full_layout=True) if evals.min() < -0.1: raise ValueError( "Kernel has significant negative eigenvalues; " "Liu's method may be invalid." ) sig_evals = evals[evals > 1e-9] coef = _liu_prepare(sig_evals, n=n_kernel) except NotImplementedError: c = _hutchinson_cumulants(kernel, n_probes=60) coef = _liu_prepare_from_cumulants(c, n=n_kernel) pvals = np.atleast_1d(_liu_apply(Q_arr, coef)) else: raise ValueError(f"Unknown null approximation method: {null_approx!r}") if batched: return Q_arr, pvals return float(Q_arr[0]), float(pvals[0]) def _r_test_nufft( Xn: np.ndarray, Yn: np.ndarray, kernel: NUFFTKernel, null_params: dict | None = None, return_pval: bool = True, is_standardized: bool = False, ) -> float | np.ndarray | tuple[float, float] | tuple[np.ndarray, np.ndarray]: """ Spatial R-test on irregular 2D coordinates. Dispatches purely on the input shape: - Paired (``M_x == M_y``) — returns the ``(M,)`` diagonal of ``Xᵀ K Y`` via cross Parseval ``R = (1/n') Σ_k λ(k) · conj(x̂(k)) · ŷ(k)`` (one pair of type-1 NUFFTs; see :meth:`NUFFTKernel.xtKy`). - Bipartite (``M_x != M_y``) — returns the full ``(M_x, M_y)`` cross matrix via ``Xᵀ · self.Kx(Y)``. In either case ``var_R = kernel.square_trace()`` (the default ``centering=True`` returns ``trace((HKH)²)``, which is exactly ``Var[Xᵀ K Y]`` on z-scored inputs). Parameters ---------- Xn, Yn : np.ndarray ``(n,)`` or ``(n, M)``. kernel : NUFFTKernel null_params : dict, optional ``{'var_R': ...}`` in the n-point-operator units. return_pval : bool, default True is_standardized : bool, default False """ Xn = np.asarray(Xn, dtype=float) Yn = np.asarray(Yn, dtype=float) if Xn.ndim == 1: Xn = Xn[:, None] if Yn.ndim == 1: Yn = Yn[:, None] if Xn.shape[0] != kernel.n or Yn.shape[0] != kernel.n: raise ValueError( f"Xn, Yn first dim must equal kernel.n={kernel.n}; " f"got {Xn.shape[0]}, {Yn.shape[0]}." ) Xz = Xn if is_standardized else _standardize_features(Xn) Yz = Yn if is_standardized else _standardize_features(Yn) if Xn.shape[1] == Yn.shape[1]: # Paired diagonal via cross Parseval — one type-1 NUFFT per column. R = np.atleast_1d(kernel.xtKy(Xz, Yz)) else: # Bipartite (M_x, M_y) cross matrix via NUFFT round-trip on Y. KY = kernel.Kx(Yz) # (n, M_y) R = Xz.T @ KY # (M_x, M_y) if not return_pval: return R.squeeze() if R.size > 1 else float(R) if null_params is not None and "var_R" in null_params: var_R = float(null_params["var_R"]) else: # kernel.square_trace() returns trace((HKH)²) by default (centering=True), # which is exactly Var[R] for Zₓᵀ K Zᵧ with both X, Y z-scored. var_R = float(kernel.square_trace()) sigma = float(np.sqrt(max(var_R, 1e-30))) z_scores = R / sigma pvals = 2.0 * norm.sf(np.abs(z_scores)) return R.squeeze(), pvals.squeeze()