from __future__ import annotations
import logging
from typing import Any
import numpy as np
import pandas as pd
import scipy.sparse as sp
from joblib import Parallel, delayed
from scipy.stats import norm
from tqdm import tqdm
from quadsv.detectors.base import Detector
from quadsv.kernels import Kernel, MatrixKernel
from quadsv.kernels.nufft import NUFFTKernel, _standardize_features
from quadsv.statistics import compute_null_params, spatial_q_test
from quadsv.utils import _apply_bh_correction
__all__ = ["DetectorIrregular"]
logger = logging.getLogger(__name__)
# helper function for parallel Q-stat computation
def _qstat_worker(
X_csc: sp.csc_matrix,
feature_indices: np.ndarray,
kernel_obj: Kernel,
null_params: dict[str, float | np.ndarray],
means: np.ndarray,
stds: np.ndarray,
names: list[str],
return_pval: bool,
chunk_size: int = 64,
) -> list[dict[str, str | float]]:
"""
Worker function for parallel Q-statistic computation.
Processes a batch of features using pre-computed null distribution parameters.
Handles zero-variance features gracefully.
Parameters
----------
X_csc : scipy.sparse.csc_matrix
Sparse feature matrix (N_samples, N_features_batch). Column-compressed format.
feature_indices : np.ndarray
Global indices of features in this batch (for looking up means/stds/names).
kernel_obj : Kernel
Pre-constructed kernel object (shared across workers).
null_params : dict
Pre-computed null distribution parameters from compute_null_params().
Keys: 'mean_Q', 'var_Q'.
means : np.ndarray
Global feature means (shape: N_total_features).
stds : np.ndarray
Global feature standard deviations (shape: N_total_features).
names : List[str]
Global feature names (length: N_total_features).
return_pval : bool
Whether to compute p-values for each feature.
chunk_size : int, default 64
Number of features to process in each internal batch for vectorization.
Returns
-------
list[dict[str, str or float]]
Each dict contains: {'Feature': str, 'Q': float, 'P_value': float, 'Z_score': float}
"""
results = []
n_features = len(feature_indices)
mu = null_params["mean_Q"]
sigma = np.sqrt(null_params["var_Q"])
# Fast path: pass sparse X_batch + (means, stds) straight into the kernel's
# sparsity-preserving standardized quadratic form. Avoids the (n, batch) dense
# copy that the old path allocated just to subtract the per-column mean.
use_sparse_fastpath = hasattr(kernel_obj, "xtKx_standardized")
for start in range(0, n_features, chunk_size):
end = min(start + chunk_size, n_features)
local_slice = slice(start, end)
batch_global_indices = feature_indices[start:end]
b_means = means[batch_global_indices]
b_stds = stds[batch_global_indices]
valid_mask = b_stds > 1e-9
if use_sparse_fastpath:
X_batch_sp = X_csc[:, local_slice]
Q_batch = np.atleast_1d(kernel_obj.xtKx_standardized(X_batch_sp, b_means, b_stds))
# xtKx_standardized already returns 0.0 for std<=0 columns.
P_batch = (
_pvals_from_null(Q_batch, null_params)
if return_pval
else np.full(Q_batch.shape, np.nan)
)
else:
# Fallback (e.g. raw-matrix "kernel"): densify and z-score explicitly.
X_batch = X_csc[:, local_slice].toarray()
Z_batch = np.zeros_like(X_batch)
if np.any(valid_mask):
Z_batch[:, valid_mask] = (X_batch[:, valid_mask] - b_means[valid_mask]) / b_stds[
valid_mask
]
if return_pval:
Q_batch, P_batch = spatial_q_test(
Z_batch, kernel_obj, null_params, return_pval=True, is_standardized=True
)
else:
Q_batch = spatial_q_test(
Z_batch, kernel_obj, null_params, return_pval=False, is_standardized=True
)
P_batch = np.full(np.shape(Q_batch), np.nan)
Q_batch = np.atleast_1d(Q_batch)
P_batch = np.atleast_1d(P_batch)
Z_scores = (Q_batch - mu) / sigma if sigma > 0 else np.zeros_like(Q_batch)
for k, global_idx in enumerate(batch_global_indices):
if not valid_mask[k]:
res = {"Feature": names[global_idx], "Q": 0.0, "P_value": 1.0, "Z_score": 0.0}
else:
res = {
"Feature": names[global_idx],
"Q": Q_batch[k],
"P_value": P_batch[k],
"Z_score": Z_scores[k],
}
results.append(res)
return results
def _pvals_from_null(Q: np.ndarray, null_params: dict) -> np.ndarray:
"""Apply the configured null approximation to a pre-computed Q vector.
Used by the sparse fast path in :func:`_qstat_worker`, where the quadratic
form has already been computed via
:meth:`~quadsv.kernels.MatrixKernel.xtKx_standardized` and only the p-value
stage remains. Mirrors the dispatch logic in
:func:`quadsv.statistics.spatial_q_test`.
"""
from scipy.stats import chi2 as _chi2
method = null_params.get("method", "welch")
if method == "welch":
g = null_params["scale_g"]
d = null_params["df_h"]
return _chi2.sf(Q / g, df=d)
if method == "clt":
mu = null_params["mean_Q"]
var = null_params["var_Q"]
if var <= 0:
return np.ones_like(Q, dtype=float)
z = (Q - mu) / np.sqrt(var)
return _chi2.sf(z**2, df=1)
if method == "liu":
from quadsv.statistics import (
_liu_apply,
_liu_prepare_from_cumulants,
)
coef = null_params.get("liu_coef")
if coef is None:
if "cumulants" not in null_params:
raise ValueError(
"null_params with method='liu' must contain either "
"'liu_coef' (preferred) or 'cumulants'. Build via "
"compute_null_params(kernel, method='liu')."
)
coef = _liu_prepare_from_cumulants(null_params["cumulants"])
return np.atleast_1d(_liu_apply(np.asarray(Q, dtype=float), coef))
return np.ones_like(Q, dtype=float)
# optimized R-stat worker with pre-computed K@Y
def _rstat_worker_chunked(
X_csc: sp.csc_matrix,
y_chunk_indices: np.ndarray,
x_indices: np.ndarray,
kernel_obj: Kernel,
null_params: dict[str, float],
means: np.ndarray,
stds: np.ndarray,
names: list[str],
return_pval: bool,
) -> list[dict[str, str | float]]:
"""
Optimized worker for R-statistic computation with pre-computed K@Y.
Pre-computes K@Y_chunk once, then reuses for all X features paired with Y chunk.
This avoids redundant K matrix multiplications.
Parameters
----------
X_csc : scipy.sparse.csc_matrix
Sparse feature matrix (N_samples, N_features).
y_chunk_indices : np.ndarray
Indices of Y features to process in this chunk.
x_indices : np.ndarray
Indices of all X features to pair with Y chunk.
kernel_obj : Kernel
Pre-constructed kernel object.
null_params : dict
Pre-computed null parameters: ``'var_R'`` (``trace(K²)``). ``'mean_R'``
is implicitly 0.
means : np.ndarray
Feature means for standardization.
stds : np.ndarray
Feature standard deviations.
names : List[str]
Feature names.
return_pval : bool
Whether to compute p-values.
Returns
-------
list[dict[str, str or float]]
Results for all (x, y) pairs in this chunk.
"""
results = []
sigma = np.sqrt(null_params["var_R"])
# Slice X and Y blocks as *sparse* — no densification for standardization.
# The only unavoidable densification is ``K @ Y_block`` for kernels whose
# apply is naturally dense (LU solve / BLAS matmul); done once per Y-chunk.
Y_block = X_csc[:, y_chunk_indices] # (n, n_y) sparse
X_block = X_csc[:, x_indices] # (n, n_x) sparse
y_means = means[y_chunk_indices]
y_stds = stds[y_chunk_indices]
x_means = means[x_indices]
x_stds = stds[x_indices]
y_valid = y_stds > 1e-9
x_valid = x_stds > 1e-9
if hasattr(kernel_obj, "_apply_K_dense") and hasattr(kernel_obj, "_K_column_sums"):
# Sparse-preserving cross R-test.
# R[i,j] = (x_i - μx[i])ᵀ K (y_j - μy[j]) / (σx[i] σy[j])
# = ( x_iᵀ K y_j - μx[i]·K_sumᵀy_j - μy[j]·K_sumᵀx_i
# + μx[i]·μy[j]·K_total ) / (σx[i]·σy[j])
# Every term is computed from sparse X / Y blocks and the kernel's
# cached (K·1, 1ᵀK1) moments.
K_sum, K_total = kernel_obj._K_column_sums() # (n,), scalar
KY = kernel_obj._apply_K_dense(Y_block.toarray()) # (n, n_y) dense, once
R_raw = np.asarray(X_block.T @ KY) # (n_x, n_y) sparse.T @ dense
ksum_x = np.asarray(X_block.T @ K_sum).ravel() # (n_x,)
ksum_y = np.asarray(Y_block.T @ K_sum).ravel() # (n_y,)
R_corrected = (
R_raw
- x_means[:, None] * ksum_y[None, :]
- y_means[None, :] * ksum_x[:, None]
+ np.outer(x_means, y_means) * K_total
)
sx = np.where(x_valid, x_stds, 1.0)
sy = np.where(y_valid, y_stds, 1.0)
R_block = R_corrected / (sx[:, None] * sy[None, :])
R_block[~x_valid, :] = 0.0
R_block[:, ~y_valid] = 0.0
else:
# Fallback for raw matrices without the Kernel helpers: old dense path.
Y_dense = Y_block.toarray()
Zy = np.zeros_like(Y_dense)
if np.any(y_valid):
Zy[:, y_valid] = (Y_dense[:, y_valid] - y_means[y_valid]) / y_stds[y_valid]
KZy = kernel_obj.dot(Zy) if hasattr(kernel_obj, "dot") else np.asarray(kernel_obj @ Zy)
X_dense = X_block.toarray()
Zx = np.zeros_like(X_dense)
if np.any(x_valid):
Zx[:, x_valid] = (X_dense[:, x_valid] - x_means[x_valid]) / x_stds[x_valid]
R_block = Zx.T @ KZy # (n_x, n_y)
# Vectorized p-value / z-score stage
if return_pval and sigma > 0:
Z_scores_block = R_block / sigma
P_block = 2 * norm.sf(np.abs(Z_scores_block))
else:
Z_scores_block = np.full_like(R_block, np.nan)
P_block = np.full_like(R_block, np.nan)
# Emit one result row per (x, y) pair — preserves the original output shape
# (y_idx iterates local positions within this chunk).
for xi, x_idx in enumerate(x_indices):
name_x = names[x_idx]
for yj, y_global_idx in enumerate(y_chunk_indices):
results.append(
{
"Feature_1": name_x,
"Feature_2": names[y_global_idx],
"R": R_block[xi, yj],
"Z_score": Z_scores_block[xi, yj],
"P_value": P_block[xi, yj],
}
)
return results
[docs]
class DetectorIrregular(Detector):
r"""
Detect spatial patterns on **irregular** samples (AnnData spots / cells).
Univariate (Q-test) and bivariate (R-test) kernel-based spatial statistics.
Supports two backends:
- ``backend='matrix'`` — :class:`~quadsv.MatrixKernel` (dense or implicit
sparse-precision, auto-selected by ``n``). Good up to ~10⁴ spots.
- ``backend='nufft'`` — :class:`~quadsv.NUFFTKernel`, ``O(n log n)`` quadratic
forms on arbitrary point sets. Recommended for ≥ 10⁴ spots.
The core test statistics are:
- Univariate: :math:`Q = \\mathbf{x}^T \\mathbf{K} \\mathbf{x}`
- Bivariate: :math:`R = \\mathbf{x}^T \\mathbf{K} \\mathbf{y}`
Workflow
--------
1. **Construct** with kernel method + backend + kernel hyperparameters.
2. **Setup** with :meth:`setup_data` passing the :class:`anndata.AnnData`
plus spatial source (``obsm_key`` in ``obsm``, or
``obsp_key`` for precomputed adjacency / distance).
3. **Compute** with :meth:`compute_qstat` / :meth:`compute_rstat`.
Parameters
----------
kernel_method : str, default ``'matern'``
One of ``'gaussian'``, ``'matern'``, ``'moran'``, ``'graph_laplacian'``,
``'car'``.
backend : {``'matrix'``, ``'nufft'``}, default ``'matrix'``
Kernel backend.
**kernel_params
Method- and backend-specific kernel hyperparameters. Matrix backend:
``bandwidth``, ``nu``, ``rho``, ``k_neighbors``, ``standardize``.
NUFFT backend: ``bandwidth``, ``nu``, ``rho``, ``neighbor_degree``,
plus grid controls ``grid_shape``, ``spacing``, ``unit_scale``,
``oversample``, ``eps``.
Attributes
----------
backend\_ : {``'matrix'``, ``'nufft'``}
Which backend was selected at construction.
adata : :class:`anndata.AnnData` or None
Input container set by :meth:`setup_data`.
min_cells : int or None
Minimum non-zero count per feature; set by :meth:`setup_data`.
kernel\_ : :class:`~quadsv.kernels.Kernel` or None
The built kernel; populated by :meth:`setup_data`.
kernel_method\_, kernel_params\_, n
See :class:`Detector`.
Examples
--------
>>> import anndata as ad, numpy as np
>>> from quadsv import DetectorIrregular
>>> rng = np.random.default_rng(0)
>>> adata = ad.AnnData(X=rng.standard_normal((200, 5)))
>>> adata.obsm["spatial"] = rng.standard_normal((200, 2))
>>> det = DetectorIrregular(kernel_method="car", rho=0.9, k_neighbors=8)
>>> det.setup_data(adata, min_cells=5) # doctest: +ELLIPSIS
<DetectorIrregular ...>
>>> # q = det.compute_qstat()
"""
_NUFFT_ONLY_GRID_KEYS = ("grid_shape", "spacing", "unit_scale", "oversample", "eps")
def __init__(
self,
kernel_method: str = "matern",
backend: str = "matrix",
**kernel_params: Any,
) -> None:
if backend not in ("matrix", "nufft"):
raise ValueError(f"backend must be 'matrix' or 'nufft', got {backend!r}.")
[docs]
self.backend_: str = backend
"""Which backend will build the kernel — ``'matrix'`` or ``'nufft'``."""
super().__init__(kernel_method, **kernel_params)
# Data-state attrs (populated by setup_data):
[docs]
self.adata: Any | None = None
"""Reference to the input :class:`anndata.AnnData`, set by :meth:`setup_data`."""
[docs]
self.min_cells: int | None = None
"""Minimum non-zero-count threshold applied in :meth:`setup_data`."""
def _merge_kernel_defaults(self, method: str, user_params: dict) -> dict:
"""Merge per-method defaults with user overrides.
Matrix backend uses ``k_neighbors``; NUFFT backend uses
``neighbor_degree`` (matching :class:`FFTKernel`) and exposes extra
grid-spacing controls.
"""
if self.backend_ == "matrix":
method_defaults = {
"gaussian": {"bandwidth": 2.0},
"matern": {"bandwidth": 2.0, "nu": 1.5},
"moran": {"k_neighbors": 4},
"graph_laplacian": {"k_neighbors": 4},
"car": {"rho": 0.9, "k_neighbors": 4, "standardize": False},
}
else: # nufft
method_defaults = {
"gaussian": {"bandwidth": 2.0},
"matern": {"bandwidth": 2.0, "nu": 1.5},
"moran": {"neighbor_degree": 1},
"graph_laplacian": {"neighbor_degree": 1},
"car": {"rho": 0.9, "neighbor_degree": 1},
}
defaults = method_defaults.get(method, {}).copy()
# NUFFT accepts grid controls in addition to the kernel-method params.
# None sentinels → auto-infer inside NUFFTKernel.
if self.backend_ == "nufft":
defaults.update(
{
"grid_shape": None,
"spacing": None,
"unit_scale": 1.0,
"oversample": 2.0,
"eps": 1e-6,
}
)
for k, v in user_params.items():
if k not in defaults:
raise ValueError(
f"Unknown parameter {k!r} for method {method!r} under "
f"backend={self.backend_!r}. Allowed: {sorted(defaults)}."
)
defaults[k] = v
return defaults
[docs]
def setup_data(
self,
adata: Any,
*,
obsm_key: str = "spatial",
obsp_key: str | None = None,
is_distance: bool = False,
min_cells: int = 1,
min_cells_frac: float | None = None,
) -> DetectorIrregular:
"""
Attach ``adata``, apply feature filters, build the kernel.
Parameters
----------
adata : :class:`anndata.AnnData`
Input container. Must have ``adata.obsm[obsm_key]`` (unless
``obsp_key`` is provided instead).
obsm_key : str, default ``'spatial'``
Key in ``adata.obsm`` holding ``(n_obs, 2)`` spatial coordinates.
Used when ``obsp_key`` is ``None``.
obsp_key : str, optional
If provided, build the kernel from ``adata.obsp[obsp_key]``
instead of from coordinates. Not compatible with ``backend='nufft'``.
is_distance : bool, default ``False``
When ``obsp_key`` is given: treat the matrix as pairwise distances
(``True``) or adjacency / connectivity (``False``).
min_cells : int, default 1
Minimum number of cells with non-zero value for a feature to be
tested. Clamped to ``[1, n_obs]``.
min_cells_frac : float, optional
If provided, overrides ``min_cells`` with
``max(1, int(min_cells_frac * n_obs))``.
Returns
-------
self : DetectorIrregular
"""
self.adata = adata
self.n = adata.shape[0]
if min_cells_frac is not None:
self.min_cells = max(1, int(min_cells_frac * self.n))
else:
self.min_cells = min(min_cells, self.n)
if obsp_key is not None:
if self.backend_ == "nufft":
raise ValueError("obsp_key is not supported with backend='nufft'.")
self._build_kernel_from_obsp(
obsp_key,
is_distance=is_distance,
method=self.kernel_method_,
**self.kernel_params_,
)
else:
self._build_kernel_from_obsm(obsm_key=obsm_key)
self._data_ready = True
return self
# ------------------------------------------------------------------
# Auto-tuning helpers
# ------------------------------------------------------------------
@staticmethod
def _resolve_n_jobs(n_jobs: int | str) -> int:
"""Turn a joblib-style ``n_jobs`` (``-1``, ``'auto'``, positive int)
into a concrete worker count."""
import os
if isinstance(n_jobs, str):
if n_jobs != "auto":
raise ValueError(f"n_jobs must be 'auto', -1, or a positive int; got {n_jobs!r}.")
return os.cpu_count() or 1
n_jobs = int(n_jobs)
if n_jobs == -1:
return os.cpu_count() or 1
if n_jobs < 1:
raise ValueError(f"n_jobs must be >= 1 (or -1/'auto' for all cores); got {n_jobs}.")
return n_jobs
def _auto_chunk_size(
self,
n_jobs: int = 1,
budget_bytes: int = 2 * (1 << 30),
) -> int:
"""Thin wrapper around :func:`quadsv.statistics.auto_chunk_size`.
Delegates to the shared helper so the chunk-size policy stays
in one place across :class:`DetectorIrregular`,
:class:`DetectorGrid`, and :func:`~quadsv.spatial_q_test` /
:func:`~quadsv.spatial_r_test`. See the helper's docstring for
the cache sweet-spot caps and per-feature memory model.
Parameters
----------
n_jobs : int, default 1
Number of parallel workers the caller plans to use. Callers
should pre-resolve ``-1`` / ``'auto'`` via
:meth:`_resolve_n_jobs`.
budget_bytes : int, default 2 GiB
Aggregate live-memory cap across *all* workers.
Returns
-------
int
Batch size to use inside :func:`~quadsv.spatial_q_test` /
:func:`~quadsv.spatial_r_test`.
"""
from quadsv.statistics import auto_chunk_size
if self.kernel_ is None:
# Kernel not built yet — fall back to a conservative MatrixKernel
# default. Used only by very early setup paths; normal flows call
# this after ``setup_data``.
n = self.n or 1
per_feat = 16 * n
chunk_cap = 16
n_workers = max(1, int(n_jobs))
per_worker_budget = max(per_feat, budget_bytes // n_workers)
mem_cap = int(per_worker_budget // per_feat)
return int(np.clip(min(mem_cap, chunk_cap), 8, chunk_cap))
return auto_chunk_size(self.kernel_, n_jobs=n_jobs, budget_bytes=budget_bytes)
def _build_kernel_from_obsm(self, obsm_key: str = "spatial") -> None:
"""Build the kernel over ``adata.obsm[obsm_key]`` using the
backend / method / params selected at construction. Called from
:meth:`setup_data`.
"""
if obsm_key not in self.adata.obsm:
raise KeyError(
f"adata.obsm has no key '{obsm_key}'; "
f"available: {list(self.adata.obsm.keys())}."
)
coords = np.asarray(self.adata.obsm[obsm_key], dtype=np.float64)
if coords.ndim != 2 or coords.shape[1] != 2:
raise ValueError(f"adata.obsm['{obsm_key}'] must be (n_obs, 2), got {coords.shape}.")
if coords.shape[0] != self.n:
raise ValueError(
f"coords shape {coords.shape} inconsistent with adata.shape=({self.n}, ...)."
)
logger.info(
"Building %s %sKernel over %d spots...",
self.kernel_method_,
"Matrix" if self.backend_ == "matrix" else "NUFFT",
self.n,
)
if self.backend_ == "matrix":
self.kernel_ = MatrixKernel.from_coordinates(
coords, method=self.kernel_method_, **self.kernel_params_
)
else:
self.kernel_ = NUFFTKernel(
coords=coords, method=self.kernel_method_, **self.kernel_params_
)
# NUFFTKernel resolves its own params (including any None-sentinel
# auto-infers); snapshot them back so kernel_params_ reflects reality.
self.kernel_params_ = dict(self.kernel_.params)
def _build_kernel_from_obsp( # noqa: C901
self, key: str, is_distance: bool = False, method: str = "car", **kernel_params: Any
) -> None:
"""Build a graph kernel from ``adata.obsp[key]``. Called from
:meth:`setup_data` when ``obsp_key`` is provided.
Handles 'precomputed' (matrix IS the kernel), distance matrices
(Gaussian / Matern transform), and adjacency matrices (Moran /
graph_laplacian / CAR). Isolated nodes (zero degree) are removed.
"""
if key not in self.adata.obsp:
raise KeyError(f"Matrix key '{key}' not found in adata.obsp")
matrix = self.adata.obsp[key]
if method not in list(self._available_kernels) + ["precomputed"]:
raise ValueError(
f"Method '{method}' not recognized. Must be one of {self._available_kernels} or 'precomputed'."
)
# If the user says 'precomputed', they imply the matrix IS the kernel K
if method == "precomputed":
logger.info("Using obsp['%s'] directly as kernel matrix (n_samples=%d)...", key, self.n)
self.kernel_ = MatrixKernel.from_matrix(matrix, is_precision=False)
self.kernel_params_ = kernel_params
self.kernel_method_ = method
return
logger.info(
"Building %s kernel from obsp['%s'] (is_distance=%s, n_samples=%d)...",
method,
key,
is_distance,
self.n,
)
# kernel_params already carries defaults merged in __init__; no re-merge needed.
kernel_params_ = dict(kernel_params)
# --- Distance Based Transformations ---
if is_distance:
if method == "gaussian":
bw = kernel_params_["bandwidth"]
# K = exp(-d^2 / 2bw^2)
if sp.issparse(matrix):
matrix = matrix.toarray() # Gaussian usually requires dense
K = np.exp(-(matrix**2) / (2 * bw**2))
self.kernel_ = MatrixKernel.from_matrix(K, is_precision=False)
self.kernel_params_ = {"bandwidth": bw}
elif method == "matern":
from scipy.special import gamma, kv
bw = kernel_params_["bandwidth"]
nu = kernel_params_["nu"]
if sp.issparse(matrix):
matrix = matrix.toarray()
dists = matrix.copy()
dists[dists == 0] = 1e-15
factor = (np.sqrt(2 * nu) * dists) / bw
K = (2 ** (1 - nu) / gamma(nu)) * (factor**nu) * kv(nu, factor)
np.fill_diagonal(K, 1.0)
self.kernel_ = MatrixKernel.from_matrix(K, is_precision=False)
self.kernel_params_ = {"bandwidth": bw, "nu": nu}
else:
raise ValueError(f"Method {method} not supported for distance matrices.")
# --- Connectivity Based Transformations ---
else:
# Assume the input matrix is W (adjacency/connectivity)
W = matrix
if not sp.issparse(W):
W = sp.csr_matrix(W)
# Remove isolated cells (zero-degree nodes)
row_sums_raw = np.array(W.sum(axis=1)).flatten()
keep_mask = row_sums_raw > 0
if not np.all(keep_mask):
removed = int((~keep_mask).sum())
logger.info(
"Removing %d isolated samples with zero degree from adjacency matrix...",
removed,
)
W = W[keep_mask][:, keep_mask]
self.adata = self.adata[keep_mask].copy()
self.n = W.shape[0]
self.min_cells = min(self.min_cells, self.n)
# Symmetrize and symmetric normalization
W = W.astype(float)
W_sym = 0.5 * (W + W.T)
row_sums = np.array(W_sym.sum(axis=1)).flatten()
row_sums[row_sums == 0] = 1.0
inv_D_sqrt = sp.diags(1.0 / np.sqrt(row_sums))
W_norm = inv_D_sqrt @ W_sym @ inv_D_sqrt
if method == "moran":
# Already symmetric and normalized
K = W_norm
self.kernel_ = MatrixKernel.from_matrix(K, is_precision=False)
self.kernel_params_ = {}
elif method == "graph_laplacian":
# K = I - W_norm
I = sp.eye(self.n, format="csr")
K = I - W_norm
self.kernel_ = MatrixKernel.from_matrix(K, is_precision=False)
self.kernel_params_ = {}
elif method == "car":
# This is the "Implicit" case.
# The kernel K = (I - rho*W)^-1.
# We construct M = (I - rho*W) and tell MatrixKernel it is the INVERSE.
rho = kernel_params_["rho"]
standardize = kernel_params_["standardize"]
I = sp.eye(self.n, format="csc")
M = I - rho * W_norm # M is the inverse of K
# We pass M and set is_precision=True. MatrixKernel will handle
# standardizing the precision to ensure diag(K)=1 if requested.
self.kernel_ = MatrixKernel.from_matrix(
M,
is_precision=True,
method="car",
rho=rho,
standardize=standardize,
)
self.kernel_params_ = {"rho": rho, "standardize": standardize}
else:
raise ValueError(f"Method {method} not supported for connectivity matrices.")
self.backend_ = "matrix"
def _prepare_data(
self, source: str, keys: list[str] | None, min_cells: int, layer: str | None = None
) -> tuple[sp.csr_matrix, list[str], np.ndarray, np.ndarray]:
"""
Prepare anndata feature data for testing (standardization, filtering).
Extracts features from .obs or .var, applies quality filters (min non-zero count),
handles categorical features (one-hot encoding), and computes per-feature statistics.
Parameters
----------
source : str
Feature source: 'obs' or 'var'.
keys : Optional[List[str]]
Feature names to extract. If None, uses all features in source.
min_cells : int
Minimum number of non-zero values required per feature.
layer : Optional[str]
If source='var', which layer to use. If None, uses .X.
Returns
-------
X_csc : scipy.sparse.csc_matrix
Sparse feature matrix (n_samples, n_features), column-compressed.
names : List[str]
Feature names in order matching X_csc columns.
means : np.ndarray
Per-feature means (shape: n_features).
stds : np.ndarray
Per-feature standard deviations (shape: n_features).
Notes
-----
- Categorical features in .obs are automatically one-hot encoded.
- Features with zero variance or fewer than min_cells non-zeros are filtered.
- Sparse matrices are preserved for memory efficiency.
"""
if source == "obs":
# check if keys are in obs
if keys is not None:
valid = [k for k in keys if k in self.adata.obs.columns]
if len(valid) == 0:
raise ValueError("None of the specified keys found in adata.obs.")
adata_tmp = self.adata.obs[valid].copy()
else:
raise ValueError("Keys must be provided when feature source is 'obs'.")
# check if .obs[keys] are char/categorical
if any(adata_tmp.dtypes == "object") or any(adata_tmp.dtypes == "category"):
logger.info(
"Categorical features detected in .obs[keys]; performing one-hot encoding..."
)
# one-hot encode categorical variables while keeping others unchanged
dummies = pd.get_dummies(adata_tmp)
names = dummies.columns.tolist()
X_dense = dummies.values.astype(np.float32)
means = X_dense.mean(axis=0)
stds = X_dense.std(axis=0, ddof=1)
X_csc = sp.csc_matrix(X_dense)
else:
# All numeric - no encoding needed
names = adata_tmp.columns.tolist()
X_dense = adata_tmp.values.astype(np.float32)
means = X_dense.mean(axis=0)
stds = X_dense.std(axis=0, ddof=1)
X_csc = sp.csc_matrix(X_dense)
elif source == "var":
# check if keys are in var
if keys is not None:
valid = [g for g in keys if g in self.adata.var_names]
adata_tmp = self.adata[:, valid].copy()
else:
adata_tmp = self.adata.copy()
# extract feature matrix
if layer is not None:
X = adata_tmp.layers[layer]
else:
X = adata_tmp.X
names = adata_tmp.var_names.tolist()
# compute means and stds (ddof=1 for sample std, consistent with statistics.py)
n_obs = X.shape[0]
if sp.issparse(X):
means = np.array(X.mean(axis=0)).flatten()
X2 = X.copy()
X2.data **= 2
means2 = np.array(X2.mean(axis=0)).flatten()
var = means2 - (means**2)
var[var < 0] = 0
# Bessel correction: population var -> sample var
if n_obs > 1:
var = var * n_obs / (n_obs - 1)
stds = np.sqrt(var)
X_csc = X.tocsc()
else:
means = np.mean(X, axis=0)
stds = np.std(X, axis=0, ddof=1)
X_csc = sp.csc_matrix(X)
else:
raise ValueError("Source must be either 'obs' or 'var'.")
# Filter constant features and those with too few non-zeros
to_keep = (stds > 0) & (X_csc.getnnz(axis=0) >= min_cells)
X_csc = X_csc[:, to_keep]
names = [names[i] for i in range(len(names)) if to_keep[i]]
means = means[to_keep]
stds = stds[to_keep]
return X_csc, names, means, stds
[docs]
def compute_qstat(
self,
source: str = "var",
features: list[str] | None = None,
n_jobs: int = -1,
layer: str | None = None,
return_pval: bool = True,
chunk_size: int | str = "auto",
show_progress: bool = True,
) -> pd.DataFrame:
"""
Compute univariate spatial Q-statistic for selected features.
Tests each feature for significant spatial clustering or dispersion using the
pre-built kernel. Parallelizes across features and applies Benjamini-Hochberg
multiple testing correction.
Parameters
----------
source : str, default 'var'
Feature source: 'var' (genes) or 'obs' (metadata columns).
features : Optional[List[str]]
Feature names to test. If None, tests all features in source.
n_jobs : int, default -1
Number of parallel jobs. -1 uses all available cores; 1 for sequential.
layer : Optional[str]
If source='var', which layer to use (e.g., 'raw', 'log1p'). If None, uses .X.
return_pval : bool, default True
If True, returns p-values and BH-corrected p-values. If False, returns Q only.
chunk_size : int or ``'auto'``, default ``'auto'``
Number of features each worker densifies at once (inner batch). ``'auto'``
targets ~256 MB per batch using :meth:`_auto_chunk_size`, yielding
``chunk_size ≈ clip(16, 512, 256 MB / (4 · n · 8 B))``. Override with an
integer when memory is tight or you want deterministic batching.
show_progress : bool, default True
Show a tqdm progress bar over worker chunks.
Returns
-------
df : pd.DataFrame
Results sorted by Q (descending). Columns:
- Feature: feature name
- Q: test statistic (univariate spatial variability)
- Z_score: standardized Q by null mean/std
- P_value: tail probability under null (if return_pval=True)
- P_adj: Benjamini-Hochberg adjusted p-value (if return_pval=True)
Raises
------
ValueError
If kernel not initialized, or source is invalid.
Notes
-----
Under H₀: feature has no spatial structure.
Under H₁: significant spatial signal (clustering or dispersion).
Zero-variance features are assigned Q=0, P_value=1.0.
The null-distribution approximation is auto-selected from
``self.kernel_method_`` (``'clt'`` for Moran's I, ``'welch'`` for all other
kernels) and cannot be overridden through this method. For full control
over the null method (including ``'liu'``), call
:func:`quadsv.statistics.spatial_q_test` directly.
Examples
--------
>>> detector.setup_data(adata)
>>> results = detector.compute_qstat(source='var', features=['Gene1', 'Gene2'], n_jobs=-1)
>>> top_genes = results.iloc[:10]
"""
# 1. Ensure Kernel Exists
self._require_setup()
# Resolve n_jobs first so chunk_size='auto' can divide the live-
# memory budget across the actual worker count (see
# _auto_chunk_size).
n_jobs = self._resolve_n_jobs(n_jobs)
if isinstance(chunk_size, str):
if chunk_size != "auto":
raise ValueError(f"chunk_size must be 'auto' or int, got {chunk_size!r}.")
chunk_size = self._auto_chunk_size(n_jobs=n_jobs)
# NUFFT backend takes a different code path (no dense K, different
# null rescaling). Dispatch early and delegate.
if self.backend_ == "nufft":
return self._compute_qstat_nufft(
source=source,
features=features,
n_jobs=n_jobs,
layer=layer,
return_pval=return_pval,
chunk_size=chunk_size,
show_progress=show_progress,
)
# 2. Compute Null Distribution
null_method = "clt" if self.kernel_method_ in ["moran"] else "welch"
logger.info("Computing null distribution approximation (method=%s)...", null_method)
null_params = compute_null_params(self.kernel_, method=null_method)
# 3. Prepare Data
X_csc, names, means, stds = self._prepare_data(
source=source, keys=features, min_cells=self.min_cells, layer=layer
)
# 4. Parallel Execution
n_feats = len(names)
indices = np.arange(n_feats)
chunks = np.array_split(indices, max(n_jobs * 4, 1))
logger.info(
"Testing %d features (n_jobs=%d, chunk_size=%d)...", n_feats, n_jobs, chunk_size
)
chunk_iter = chunks
if show_progress:
chunk_iter = tqdm(
chunks,
desc=f"Q ({self.kernel_method_})",
bar_format="{l_bar}{bar:30}{r_bar}{bar:-30b}",
)
results_list = Parallel(n_jobs=n_jobs)(
delayed(_qstat_worker)(
X_csc[:, chunk_idxs],
chunk_idxs,
self.kernel_,
null_params,
means,
stds,
names,
return_pval,
chunk_size,
)
for chunk_idxs in chunk_iter
)
# 5. Aggregate Results
flat_results = [item for sublist in results_list for item in sublist]
df = pd.DataFrame(flat_results).set_index("Feature")
if not return_pval:
df = df.drop(columns=["P_value"])
# 6. Multiple testing correction (Benjamini-Hochberg)
if return_pval:
_apply_bh_correction(df)
return df.sort_values(by="Q", ascending=False)
[docs]
def compute_rstat( # noqa: C901
self,
features_x: list[str] | None = None,
features_y: list[str] | None = None,
source: str = "var",
n_jobs: int = -1,
layer: str | None = None,
return_pval: bool = True,
chunk_size: int | str = "auto",
show_progress: bool = True,
) -> pd.DataFrame:
"""
Compute bivariate spatial R-statistic (cross-spatial correlation) for feature pairs.
Tests for significant spatial co-variation between pairs of features using
the pre-built kernel. Supports symmetric (all pairs within one set) or bipartite
(all X vs Y pairs) modes. Parallelizes computation and applies multiple testing correction.
Parameters
----------
features_x : Optional[List[str]]
Feature names for the first set. If None and features_y is None, uses all features (symmetric mode).
features_y : Optional[List[str]]
Feature names for the second set. If None, computes all pairwise within features_x.
If provided, computes all X vs Y pairs (bipartite mode).
source : str, default 'var'
Feature source: 'var' (genes) or 'obs' (metadata columns).
n_jobs : int, default -1
Number of parallel jobs. -1 uses all available cores; 1 for sequential.
layer : Optional[str]
If source='var', which layer to use (e.g., 'raw', 'log1p'). If None, uses .X.
return_pval : bool, default True
If True, returns p-values and BH-corrected p-values. If False, returns R only.
chunk_size : int or ``'auto'``, default ``'auto'``
Number of Y features to batch together when pre-computing ``K @ Y_chunk``.
``'auto'`` uses :meth:`_auto_chunk_size` (~256 MB per batch target);
integer values override the heuristic.
show_progress : bool, default True
Show a tqdm progress bar over the Y-chunk loop.
Returns
-------
df : pd.DataFrame
Results sorted by absolute Z_score (descending). Columns:
- Feature_1: name of first feature
- Feature_2: name of second feature
- R: test statistic (bivariate spatial correlation, range approximately [-1, 1])
- Z_score: standardized R by null mean/std
- P_value: two-tailed p-value under null (if return_pval=True)
- P_adj: Benjamini-Hochberg adjusted p-value (if return_pval=True)
Raises
------
ValueError
If kernel not initialized, features_x is None when features_y is provided, or no valid pairs generated.
Notes
-----
Under H₀: features are spatially independent.
Under H₁: significant spatial co-clustering or co-dispersion.
Unlike :func:`quadsv.statistics.spatial_r_test`, this method always returns R-statistics
for all requested feature pairs in the symmetric mode (``features_y=None``). For
``features_x=[A, B, C]``, the output contains
``(A, A), (A, B), (A, C), (B, A), (B, B), (B, C), (C, A), (C, B), (C, C)``.
P-value calculation uses a normal approximation based on Tr(K²) and is not
configurable through this method. For finer control over the null model,
call :func:`quadsv.statistics.spatial_r_test` directly.
Zero-variance features are handled gracefully (assigned R=0, P=1).
Examples
--------
>>> detector.setup_data(adata)
>>> # All pairwise correlations within gene set
>>> results = detector.compute_rstat(features_x=['Gene1', 'Gene2', 'Gene3'], n_jobs=-1)
>>> # Cross-correlation between two gene sets
>>> results = detector.compute_rstat(
... features_x=['Gene1', 'Gene2'],
... features_y=['Gene3', 'Gene4'],
... n_jobs=-1
... )
"""
self._require_setup()
n_jobs = self._resolve_n_jobs(n_jobs)
if isinstance(chunk_size, str):
if chunk_size != "auto":
raise ValueError(f"chunk_size must be 'auto' or int, got {chunk_size!r}.")
chunk_size = self._auto_chunk_size(n_jobs=n_jobs)
if self.backend_ == "nufft":
return self._compute_rstat_nufft(
features_x=features_x,
features_y=features_y,
source=source,
n_jobs=n_jobs,
layer=layer,
return_pval=return_pval,
chunk_size=chunk_size,
show_progress=show_progress,
)
# 1. Compute Null Params for R
# We need Trace(KK^T) which is Trace(K^2) for symmetric K
# compute_null_params already computes var_Q = 2*Tr(K^2).
# var_R = Tr(K^2) = var_Q / 2.
logger.info("Computing null distribution for R statistic...")
q_null = compute_null_params(self.kernel_, method="clt")
null_params = {
"mean_R": 0.0,
"var_R": q_null["var_Q"] / 2.0, # Derive from existing trace calculation
}
# 2. Prepare Data
# We load all unique features needed
if features_y is None:
unique_feats = features_x
mode = "symmetric"
else:
if features_x is None:
raise ValueError("features_x cannot be None.")
unique_feats = list(set(features_x) | set(features_y))
mode = "bipartite"
X_csc, names, means, stds = self._prepare_data(
source=source, keys=unique_feats, min_cells=self.min_cells, layer=layer
)
# Map names to indices in the prepared matrix
name_to_idx = {n: i for i, n in enumerate(names)}
# 3. Generate Y chunks (for pre-computing K@Y)
if mode == "symmetric":
valid_feats = [f for f in features_x if f in name_to_idx]
valid_y_indices = [name_to_idx[f] for f in valid_feats]
else:
valid_y = [f for f in features_y if f in name_to_idx]
valid_y_indices = [name_to_idx[f] for f in valid_y]
# Chunk Y indices for K@Y pre-computation
y_chunks = []
for start in range(0, len(valid_y_indices), chunk_size):
end = min(start + chunk_size, len(valid_y_indices))
y_chunks.append(valid_y_indices[start:end])
# 4. Generate X features list
if mode == "symmetric":
valid_x_indices = [name_to_idx[f] for f in valid_feats]
else:
valid_x = [f for f in features_x if f in name_to_idx]
valid_x_indices = [name_to_idx[f] for f in valid_x]
if len(valid_x_indices) == 0 or len(valid_y_indices) == 0:
raise ValueError("No valid features found after filtering.")
# 5. Parallel Execution: Process each Y chunk.
# n_jobs is pre-resolved at the compute_rstat entry point.
logger.info(
"Testing %d x %d pairs using %d cores with chunk_size=%d...",
len(valid_x_indices),
len(valid_y_indices),
n_jobs,
chunk_size,
)
results_list = Parallel(n_jobs=n_jobs, prefer="threads")(
delayed(_rstat_worker_chunked)(
X_csc,
y_chunk,
valid_x_indices,
self.kernel_,
null_params,
means,
stds,
names,
return_pval,
)
for y_chunk in (
tqdm(
y_chunks,
desc=f"R ({self.kernel_method_})",
bar_format="{l_bar}{bar:30}{r_bar}{bar:-30b}",
)
if show_progress
else y_chunks
)
)
# 6. Aggregate
flat_results = [item for sublist in results_list for item in sublist]
df = pd.DataFrame(flat_results)
# 7. Multiple testing correction (Benjamini-Hochberg)
if return_pval:
_apply_bh_correction(df)
return df.sort_values(by="Z_score", key=abs, ascending=False)
# ------------------------------------------------------------------
# NUFFT backend paths
# ------------------------------------------------------------------
def _prepare_features_nufft(
self, source: str, features: list[str] | None, layer: str | None
) -> tuple[sp.csc_matrix, list[str], np.ndarray, np.ndarray]:
"""Pull a sparse ``(n_spots, n_features)`` CSC + names + per-feature
mean/std. NUFFT-friendly: computes variance from sparse moments and
never materializes a full dense ``X``."""
if source == "var":
X = self.adata.X if layer is None else self.adata.layers[layer]
X_csc = X.tocsc() if sp.issparse(X) else sp.csc_matrix(X)
names = list(self.adata.var_names)
if features is not None:
selected = [g for g in features if g in names]
missing = set(features) - set(selected)
if missing:
logger.warning(
"Requested features not in adata.var_names: %s", sorted(missing)[:5]
)
idx = [names.index(g) for g in selected]
X_csc = X_csc[:, idx]
names = selected
elif source == "obs":
if features is None:
raise ValueError("source='obs' requires `features` (obs column names).")
cols = [features] if isinstance(features, str) else list(features)
missing = [c for c in cols if c not in self.adata.obs.columns]
if missing:
raise KeyError(f"obs columns missing: {missing}")
X_csc = sp.csc_matrix(self.adata.obs[cols].to_numpy(dtype=np.float64))
names = list(cols)
else:
raise ValueError(f"source must be 'var' or 'obs', got '{source}'.")
nnz_per = np.asarray((X_csc != 0).sum(axis=0)).ravel()
means = np.asarray(X_csc.mean(axis=0)).ravel()
sq = X_csc.multiply(X_csc)
sq_mean = np.asarray(sq.mean(axis=0)).ravel()
var = np.maximum(sq_mean - means**2, 0.0)
stds = np.sqrt(var)
keep = (stds > 0) & (nnz_per >= self.min_cells)
X_kept = X_csc[:, keep]
names_kept = [names[i] for i, k in enumerate(keep) if k]
return X_kept, names_kept, means[keep], stds[keep]
def _compute_qstat_nufft( # noqa: C901
self,
source: str,
features: list[str] | None,
n_jobs: int,
layer: str | None,
return_pval: bool,
chunk_size: int,
show_progress: bool = True,
) -> pd.DataFrame:
"""NUFFT dispatch for :meth:`compute_qstat`. Builds null params once
(n-point-scaled) and delegates per-feature work to
:func:`quadsv.spatial_q_test` on Path A (spectral).
The NUFFT Q-test targets the n-point operator ``K``; moments come from
:meth:`NUFFTKernel.trace` / :meth:`~NUFFTKernel.square_trace` /
:meth:`~NUFFTKernel.eigenvalues` (all already n-point-scaled by the
kernel). No ``n / (ny · nx)`` rescaling at the caller.
"""
kernel = self.kernel_
null_params: dict[str, float | np.ndarray] | None = None
if return_pval:
# Delegate to compute_null_params — it auto-falls back to
# Hutchinson-cumulant Liu when the NUFFT spectrum is
# unavailable (broad support / indefinite Λ). Moran is
# indefinite → CLT enforced there.
nm = "clt" if kernel.method == "moran" else "liu"
null_params = compute_null_params(kernel, method=nm)
logger.info("Preparing %s features (layer=%s)...", source, layer)
X_kept, names_kept, means, stds = self._prepare_features_nufft(source, features, layer)
n_feats = len(names_kept)
if n_feats == 0:
cols = ["Feature", "Q", "Z_score", "P_value", "P_adj"]
return pd.DataFrame(columns=cols)
logger.info("Testing %d features via NUFFT (n_jobs=%s)...", n_feats, n_jobs)
def _batch(batch_idx: np.ndarray) -> list[dict[str, Any]]:
# Densify one small block at a time; never materialize full X.
# Do NOT pre-standardize at irregular points — the NUFFT Q-test
# is now defined on the *grid* representation, so grid-space
# standardization (done inside _q_test_fft via the NUFFT dispatch)
# is what matches the FFT-kernel null distribution.
block = np.asarray(X_kept[:, batch_idx].todense(), dtype=np.float64)
if return_pval:
Q_arr, P_arr = spatial_q_test(block, kernel, null_params=null_params)
Q_arr = np.atleast_1d(Q_arr)
P_arr = np.atleast_1d(P_arr)
else:
Q_arr = np.atleast_1d(spatial_q_test(block, kernel, return_pval=False))
P_arr = np.full_like(Q_arr, np.nan)
# Reference trace and trace²-based Z for reporting.
if null_params is not None:
# Compute Z-score from the cached moments — prefer the
# explicit ``mean_Q/var_Q`` (CLT/Welch path), else
# ``cumulants`` (Liu path, ``c_1``/``c_2`` = trace/sq).
# Any other config yields Z_score=nan (no reference
# moments available).
if "mean_Q" in null_params and "var_Q" in null_params:
trK = float(null_params["mean_Q"])
varQ = float(null_params["var_Q"])
elif "cumulants" in null_params:
c = null_params["cumulants"]
trK = float(c[1])
varQ = 2.0 * float(c[2])
else:
trK, varQ = 0.0, 0.0
sigma = float(np.sqrt(varQ)) if varQ > 0 else 0.0
Z_arr = (Q_arr - trK) / sigma if sigma > 0 else np.zeros_like(Q_arr)
else:
Z_arr = np.full_like(Q_arr, np.nan)
out = []
for local_i, gi in enumerate(batch_idx):
out.append(
{
"Feature": names_kept[gi],
"Q": float(Q_arr[local_i]),
"Z_score": float(Z_arr[local_i]),
"P_value": float(P_arr[local_i]),
}
)
return out
idx_all = np.arange(n_feats)
batches = [idx_all[i : i + chunk_size] for i in range(0, n_feats, chunk_size)]
batch_iter = (
tqdm(batches, desc=f"Q (NUFFT, {self.kernel_method_})") if show_progress else batches
)
results = Parallel(n_jobs=n_jobs, prefer="threads")(
delayed(_batch)(batch) for batch in batch_iter
)
flat = [row for chunk in results for row in chunk]
df = pd.DataFrame(flat)
if not return_pval:
df = df.drop(columns=["P_value", "Z_score"], errors="ignore")
elif return_pval:
_apply_bh_correction(df)
return df.sort_values("Q", ascending=False).reset_index(drop=True)
def _compute_rstat_nufft(
self,
features_x: list[str] | None,
features_y: list[str] | None,
source: str,
n_jobs: int,
layer: str | None,
return_pval: bool,
chunk_size: int,
show_progress: bool = True,
) -> pd.DataFrame:
"""NUFFT dispatch for :meth:`compute_rstat`.
``var_R`` comes from :meth:`NUFFTKernel.square_trace` (already
n-point-scaled analytic default).
"""
kernel = self.kernel_
var_R = float(kernel.square_trace()) if return_pval else 0.0
null_params = {"var_R": var_R}
if features_x is None and features_y is not None:
raise ValueError("Provide features_x when features_y is specified.")
X_kept, names_kept, means_x, stds_x = self._prepare_features_nufft(
source, features_x, layer
)
if features_y is None:
X_y, names_y, means_y, stds_y = X_kept, names_kept, means_x, stds_x
else:
X_y, names_y, means_y, stds_y = self._prepare_features_nufft(source, features_y, layer)
if len(names_kept) == 0 or len(names_y) == 0:
return pd.DataFrame(
columns=["Feature_1", "Feature_2", "R", "Z_score", "P_value", "P_adj"]
)
logger.info("Testing %d x %d feature pairs via NUFFT...", len(names_kept), len(names_y))
# Densify + z-score the X block once; the NUFFT bipartite R path is
# ``Xzᵀ · kernel.Kx(Yz)``. We bypass :func:`spatial_r_test` here so
# we always get the full ``(M_x, M_y)`` cross matrix regardless of
# whether ``M_x == M_y`` (the shape-based dispatch in
# :func:`_r_test_nufft` would otherwise return a paired diagonal in
# the coincident case).
X_block = np.asarray(X_kept.todense(), dtype=np.float64)
Xz = _standardize_features(X_block)
results: list[dict[str, Any]] = []
y_chunks = [slice(i, i + chunk_size) for i in range(0, len(names_y), chunk_size)]
y_iter = (
tqdm(y_chunks, desc=f"R (NUFFT, {self.kernel_method_})") if show_progress else y_chunks
)
# Silence pyflakes about ``null_params`` now that we bypass the
# dispatch helper; ``var_R`` below is the authoritative source.
del null_params
for ysl in y_iter:
Y_block = np.asarray(X_y[:, ysl].todense(), dtype=np.float64)
Yz = _standardize_features(Y_block)
KY = kernel.Kx(Yz) # (n, M_y)
R_chunk = Xz.T @ KY # (M_x, M_y)
for i, name_x in enumerate(names_kept):
for j, name_y in enumerate(names_y[ysl]):
r = float(R_chunk[i, j])
row = {"Feature_1": name_x, "Feature_2": name_y, "R": r}
if return_pval and var_R > 0:
z = r / np.sqrt(var_R)
row["Z_score"] = z
row["P_value"] = float(2.0 * norm.sf(abs(z)))
results.append(row)
# Silence "unused" warnings for means_x/_y/stds now that grid-space
# standardization is done inside the NUFFT dispatch.
del means_x, means_y, stds_x, stds_y
df = pd.DataFrame(results)
if return_pval:
_apply_bh_correction(df)
return df.sort_values("R", ascending=False).reset_index(drop=True)