Source code for quadsv.detector

from typing import Optional, Union, Any, List

import numpy as np
import pandas as pd
import scipy.sparse as sp
from scipy.stats import norm
import scanpy as sc
from joblib import Parallel, delayed
from tqdm import tqdm

from quadsv.statistics import compute_null_params, spatial_q_test, spatial_r_test
from quadsv.kernels import Kernel, SpatialKernel


# helper function for parallel Q-stat computation
def _qstat_worker(
    X_csc: sp.csc_matrix,
    feature_indices: np.ndarray,
    kernel_obj: Kernel,
    null_params: dict[str, Union[float, np.ndarray]],
    means: np.ndarray,
    stds: np.ndarray,
    names: List[str],
    return_pval: bool,
    chunk_size: int = 64
) -> List[dict[str, Union[str, float]]]:
    """
    Worker function for parallel Q-statistic computation.
    
    Processes a batch of features using pre-computed null distribution parameters.
    Handles zero-variance features gracefully.
    
    Parameters
    ----------
    X_csc : scipy.sparse.csc_matrix
        Sparse feature matrix (N_samples, N_features_batch). Column-compressed format.
    feature_indices : np.ndarray
        Global indices of features in this batch (for looking up means/stds/names).
    kernel_obj : Kernel
        Pre-constructed kernel object (shared across workers).
    null_params : dict
        Pre-computed null distribution parameters from compute_null_params().
        Keys: 'mean_Q', 'var_Q'.
    means : np.ndarray
        Global feature means (shape: N_total_features).
    stds : np.ndarray
        Global feature standard deviations (shape: N_total_features).
    names : List[str]
        Global feature names (length: N_total_features).
    return_pval : bool
        Whether to compute p-values for each feature.
    chunk_size : int, default 64
        Number of features to process in each internal batch for vectorization.
    
    Returns
    -------
    List[dict[str, Union[str, float]]]
        Each dict contains: {'Feature': str, 'Q': float, 'P_value': float, 'Z_score': float}
    """
    results = []
    
    # Process in chunks (e.g., 64 features at a time)
    BATCH_SIZE = chunk_size
    # n_features is the number of features in THIS worker's chunk
    n_features = len(feature_indices) 
    
    mu = null_params['mean_Q']
    sigma = np.sqrt(null_params['var_Q'])
    
    for start in range(0, n_features, BATCH_SIZE):
        end = min(start + BATCH_SIZE, n_features)
        
        # 1. Determine Indices
        # 'local_slice' is for indexing X_csc (which is already a subset/chunk)
        local_slice = slice(start, end)
        
        # 'batch_global_indices' is for looking up global metadata (means, stds, names)
        batch_global_indices = feature_indices[start:end]
        
        # 2. Extract Batch: Convert sparse subset to dense (N, batch_size)
        # We index X_csc using LOCAL indices (0..chunk_size), not global IDs
        X_batch = X_csc[:, local_slice].toarray()
        
        # 3. Standardize Batch using GLOBAL statistics
        b_means = means[batch_global_indices]
        b_stds = stds[batch_global_indices]
        
        # Prevent division by zero
        valid_mask = b_stds > 1e-9
        
        Z_batch = np.zeros_like(X_batch)
        # Broadcasting: (N, M) - (M,) / (M,)
        if np.any(valid_mask):
            Z_batch[:, valid_mask] = (X_batch[:, valid_mask] - b_means[valid_mask]) / b_stds[valid_mask]

        # 4. Vectorized Q-test
        if return_pval:
            Q_batch, P_batch = spatial_q_test(Z_batch, kernel_obj, null_params, 
                                              return_pval=True, is_standardized=True)
        else:
            Q_batch = spatial_q_test(Z_batch, kernel_obj, null_params, 
                                     return_pval=False, is_standardized=True)
            P_batch = np.full(Q_batch.shape, np.nan)
            
        # Ensure outputs are arrays even if batch size is 1
        Q_batch = np.atleast_1d(Q_batch)
        P_batch = np.atleast_1d(P_batch)
        
        # 5. Compute Z-scores relative to null distribution
        Z_scores = (Q_batch - mu) / sigma if sigma > 0 else np.zeros_like(Q_batch)

        # 6. Pack results
        for k, global_idx in enumerate(batch_global_indices):
            # If the feature had 0 variance, force null results
            if not valid_mask[k]:
                res = {
                    'Feature': names[global_idx], 
                    'Q': 0.0, 
                    'P_value': 1.0, 
                    'Z_score': 0.0
                }
            else:
                res = {
                    'Feature': names[global_idx], 
                    'Q': Q_batch[k], 
                    'P_value': P_batch[k], 
                    'Z_score': Z_scores[k]
                }
            results.append(res)
            
    return results

# optimized R-stat worker with pre-computed K@Y
def _rstat_worker_chunked(
    X_csc: sp.csc_matrix,
    y_chunk_indices: np.ndarray,
    x_indices: np.ndarray,
    kernel_obj: Kernel,
    null_params: dict[str, float],
    means: np.ndarray,
    stds: np.ndarray,
    names: List[str],
    return_pval: bool
) -> List[dict[str, Union[str, float]]]:
    """
    Optimized worker for R-statistic computation with pre-computed K@Y.
    
    Pre-computes K@Y_chunk once, then reuses for all X features paired with Y chunk.
    This avoids redundant K matrix multiplications.
    
    Parameters
    ----------
    X_csc : scipy.sparse.csc_matrix
        Sparse feature matrix (N_samples, N_features).
    y_chunk_indices : np.ndarray
        Indices of Y features to process in this chunk.
    x_indices : np.ndarray
        Indices of all X features to pair with Y chunk.
    kernel_obj : Kernel
        Pre-constructed kernel object.
    null_params : dict
        Pre-computed null parameters: 'mean_R', 'var_R'.
    means : np.ndarray
        Feature means for standardization.
    stds : np.ndarray
        Feature standard deviations.
    names : List[str]
        Feature names.
    return_pval : bool
        Whether to compute p-values.
    
    Returns
    -------
    List[dict[str, Union[str, float]]]
        Results for all (x, y) pairs in this chunk.
    """
    results = []
    sigma = np.sqrt(null_params['var_R'])
    
    # 1. Extract and standardize Y chunk
    Y_chunk = X_csc[:, y_chunk_indices].toarray()  # (N, n_y)
    y_means = means[y_chunk_indices]
    y_stds = stds[y_chunk_indices]
    y_valid = y_stds > 1e-9
    
    Zy_chunk = np.zeros_like(Y_chunk)
    if np.any(y_valid):
        Zy_chunk[:, y_valid] = (Y_chunk[:, y_valid] - y_means[y_valid]) / y_stds[y_valid]
    
    # 2. Pre-compute K @ Zy_chunk (ONCE for entire chunk)
    # Handle implicit vs explicit kernels
    if hasattr(kernel_obj, 'is_implicit') and kernel_obj.is_implicit:
        # Implicit solve: M^-1 @ Zy_chunk
        if sp.issparse(kernel_obj._K):
            from scipy.sparse.linalg import splu
            if kernel_obj._lu is None:
                kernel_obj._lu = splu(kernel_obj._K.tocsc())
            KZy_chunk = kernel_obj._lu.solve(Zy_chunk)
        else:
            from scipy.linalg import lu_factor, lu_solve
            if kernel_obj._lu is None:
                kernel_obj._lu = lu_factor(kernel_obj._K)
            KZy_chunk = lu_solve(kernel_obj._lu, Zy_chunk)
    else:
        # Explicit kernel
        KZy_chunk = kernel_obj._K.dot(Zy_chunk)  # (N, n_y)
    
    # 3. Process all X features against this Y chunk
    n_x = len(x_indices)
    n_y = len(y_chunk_indices)
    
    for x_idx in x_indices:
        # Extract X feature
        x_col = X_csc[:, x_idx].toarray().flatten()  # (N,)
        
        # Standardize X
        x_mean = means[x_idx]
        x_std = stds[x_idx]
        
        if x_std > 1e-9:
            zx = (x_col - x_mean) / x_std
        else:
            zx = np.zeros_like(x_col)
        
        # Compute R for all pairs (x, y_i) in chunk
        # R_i = zx^T @ (K @ zy_i) = sum(zx * (K@zy_i))
        # Broadcast: (N,) * (N, n_y) -> (N, n_y), then sum(axis=0) -> (n_y,)
        R_batch = np.sum(zx[:, np.newaxis] * KZy_chunk, axis=0)  # (n_y,)
        
        # P-values (Normal approximation)
        if return_pval:
            Z_scores = R_batch / sigma if sigma > 0 else np.zeros_like(R_batch)
            P_batch = 2 * norm.sf(np.abs(Z_scores))
        else:
            Z_scores = np.full_like(R_batch, np.nan)
            P_batch = np.full_like(R_batch, np.nan)
        
        # Pack results for this X feature
        for y_idx, y_global_idx in enumerate(y_chunk_indices):
            name_x = names[x_idx]
            name_y = names[y_global_idx]
            
            res = {
                'Feature_1': name_x,
                'Feature_2': name_y,
                'R': R_batch[y_idx],
                'Z_score': Z_scores[y_idx],
                'P_value': P_batch[y_idx]
            }
            results.append(res)
    
    return results


[docs] class PatternDetector: """ Detect spatial or graph-based feature variability using the quadratic-form statistic. This class implements univariate (Q-test) and bivariate (R-test) spatial statistics for feature screening using kernel-based methods. Supports multiple kernel types (Gaussian, Matérn, Moran's I, graph Laplacian, CAR) and integrates with scanpy/AnnData. The core test statistic is: - Univariate: $Q = \\mathbf{x}^T \\mathbf{K} \\mathbf{x}$ for standardized feature vector $\\mathbf{x}$ and kernel $\\mathbf{K}$. - Bivariate: $R = \\mathbf{x}^T \\mathbf{K} \\mathbf{y}$ for cross-spatial correlation. Parameters ---------- adata : scanpy.AnnData Annotated data matrix containing gene expression or other features. min_cells : int, default 1 Minimum number of non-zero observations required for a feature to be tested. min_cells_frac : float, optional If provided, overrides min_cells to be this fraction of total cells. Attributes ---------- adata : scanpy.AnnData Reference to the input data object. n : int Number of samples (cells/observations). kernel_ : Kernel or None Active kernel object (set by build_kernel_* methods). kernel_method_ : str or None Method used to construct the current kernel. kernel_params_ : dict or None Parameters used to construct the current kernel. Methods ------- build_kernel_from_coordinates(coords, method='car', **kernel_params) Construct kernel from spatial coordinates. build_kernel_from_obsp(key, is_distance=False, method='car', **kernel_params) Construct kernel from precomputed distance/adjacency matrix. compute_qstat(source='var', features=None, n_jobs=-1, layer=None, return_pval=True) Compute univariate spatial statistics. compute_rstat(features_x=None, features_y=None, source='var', n_jobs=-1, layer=None, return_pval=True) Compute bivariate spatial statistics. Examples -------- >>> import anndata >>> adata = anndata.read_h5ad('data.h5ad') >>> detector = PatternDetector(adata, min_cells=10) >>> detector.build_kernel_from_coordinates(adata.obsm['spatial'], method='gaussian', bandwidth=2.0) >>> q_results = detector.compute_qstat(source='var', n_jobs=4) """ _available_kernels = ['gaussian', 'matern', 'moran', 'graph_laplacian', 'car']
[docs] def __init__( self, adata: Any, min_cells: int = 1, min_cells_frac: Optional[float] = None ) -> None: """ Initializes the PatternDetector with an AnnData object. Parameters ---------- adata : scanpy.AnnData Annotated data matrix. min_cells : int, default 1 Minimum number of cells with non-zero expression for feature inclusion. min_cells_frac : float, optional If provided, overrides min_cells to be max(1, int(min_cells_frac * n_cells)). """ self.adata = adata self.n = adata.shape[0] if min_cells_frac is not None: self.min_cells = max(1, int(min_cells_frac * self.n)) else: self.min_cells = min(min_cells, self.n) # Minimum cells for feature inclusion self.kernel_ = None # Stores the active Kernel object self.kernel_method_ = None # Stores the method used to build the kernel self.kernel_params_ = None # Stores parameters used to build the kernel
def _get_default_params(self, method: str) -> dict[str, Any]: """ Returns default parameters for specific kernel methods. Parameters ---------- method : str Kernel method name. Should be one of _available_kernels. Returns ------- dict[str, Any] Method defaults: bandwidth (gaussian/matern), nu (matern), neighbor_degree (moran/graph_laplacian/car), rho (car). """ method_defaults = { 'gaussian': {'bandwidth': 2.0}, 'matern': {'bandwidth': 2.0, 'nu': 1.5}, 'moran': {'k_neighbors': 4}, 'graph_laplacian': {'k_neighbors': 4}, 'car': {'rho': 0.9, 'k_neighbors': 4, 'standardize': False}, } return method_defaults.get(method, {})
[docs] def build_kernel_from_coordinates( self, coords: np.ndarray, method: str = 'car', **kernel_params: Any ) -> None: """ Builds a spatial kernel from an array of coordinates. Constructs distance-based kernel matrix from sample coordinates using specified method. Updates self.kernel_ and self.kernel_params_. Parameters ---------- coords : np.ndarray Coordinate array of shape (n_samples, n_dims). Typically 2D spatial coordinates. method : str, default 'car' Kernel construction method. Must be one of: 'gaussian', 'matern', 'moran', 'graph_laplacian', 'car'. kernel_params : dict, optional Additional kernel-specific parameters (e.g., bandwidth, rho, k_neighbors, nu). Raises ------ ValueError If method not in available kernels or coordinate shape mismatch. Notes ----- Common parameters by method: - gaussian: bandwidth (float, default 1.0) - matern: bandwidth (float, default 1.0), nu (float, default 1.5) - moran: k_neighbors (int, default 10) - graph_laplacian: k_neighbors (int, default 10) - car: rho (float in [0, 1), default 0.9), k_neighbors (int, default 10) Examples -------- >>> coords = np.random.randn(1000, 2) >>> detector.build_kernel_from_coordinates(coords, method='gaussian', bandwidth=1.5) """ coords = np.asarray(coords) if method not in self._available_kernels: raise ValueError(f"Method '{method}' not recognized. Must be one of {self._available_kernels}.") if coords.shape[0] != self.n: raise ValueError(f"Coordinate shape {coords.shape} does not match adata shape ({self.n},).") print(f"Building {method} kernel from coordinates (n_samples={self.n})...") if not kernel_params: # Use default parameters if none provided kernel_params_ = self._get_default_params(method).copy() else: # Update provided params with defaults for missing keys kernel_params_ = self._get_default_params(method).copy() for key, value in kernel_params.items(): if key in kernel_params_: kernel_params_[key] = value else: raise ValueError(f"Unknown parameter '{key}' for method '{method}'") self.kernel_ = SpatialKernel.from_coordinates( coords, method=method, **kernel_params_ ) self.kernel_method_ = method self.kernel_params_ = kernel_params_
[docs] def build_kernel_from_obsp( self, key: str, is_distance: bool = False, method: str = 'car', **kernel_params: Any ) -> None: """ Builds a graph kernel from a precomputed matrix in adata.obsp. Performs necessary transformations (e.g., distance to Gaussian weights, adjacency to graph Laplacian) before initializing the kernel object. Isolated nodes (zero degree) are automatically removed. Parameters ---------- key : str Key in adata.obsp containing the precomputed matrix. is_distance : bool, default False If True, matrix is treated as distances. Otherwise treated as adjacency/connectivity. method : str, default 'car' Kernel construction method. Must be one of: 'gaussian', 'matern', 'moran', 'graph_laplacian', 'car', or 'precomputed'. - 'precomputed': Uses obsp[key] directly as kernel matrix K (no transformation). - Other methods: Apply distance/adjacency transformations. kernel_params : dict, optional Additional kernel-specific parameters (e.g., bandwidth, rho, nu). Raises ------ KeyError If key not found in adata.obsp. ValueError If method not recognized or invalid for matrix type (e.g., Gaussian on adjacency). Notes ----- For distance matrices (is_distance=True): - gaussian: $K = \\exp(-d^2 / (2 \\sigma^2))$ where $\\sigma$ is bandwidth. - matern: Matérn kernel with nu and bandwidth parameters. For adjacency matrices (is_distance=False): - Symmetric normalization: $W_{norm} = D^{-1/2} W D^{-1/2}$. - moran: Uses W_norm directly as kernel. - graph_laplacian: $K = I - W_{norm}$. - car: $K = (I - \\rho W_{norm})^{-1}$ (implicit kernel). Isolated nodes (degree=0) are automatically removed from the data. Examples -------- >>> detector.build_kernel_from_obsp('distances', is_distance=True, method='gaussian', bandwidth=5.0) """ if key not in self.adata.obsp: raise KeyError(f"Matrix key '{key}' not found in adata.obsp") matrix = self.adata.obsp[key] if method not in self._available_kernels + ['precomputed']: raise ValueError(f"Method '{method}' not recognized. Must be one of {self._available_kernels} or 'precomputed'.") # If the user says 'precomputed', they imply the matrix IS the kernel K if method == 'precomputed': print(f"Using obsp['{key}'] directly as kernel matrix (n_samples={self.n})...") self.kernel_ = SpatialKernel.from_matrix(matrix, is_inverse=False) self.kernel_params_ = kernel_params self.kernel_method_ = method return print(f"Building {method} kernel from obsp['{key}'] (is_distance={is_distance}, n_samples={self.n})...") # Set kernel configuration parameters self.kernel_method_ = method if not kernel_params: # Use default parameters if none provided kernel_params_ = self._get_default_params(method).copy() else: # Update provided params with defaults for missing keys kernel_params_ = self._get_default_params(method).copy() for key, value in kernel_params.items(): if key in kernel_params_: kernel_params_[key] = value else: raise ValueError(f"Unknown parameter '{key}' for method '{method}'") # --- Distance Based Transformations --- if is_distance: if method == 'gaussian': bw = kernel_params_['bandwidth'] # K = exp(-d^2 / 2bw^2) if sp.issparse(matrix): matrix = matrix.toarray() # Gaussian usually requires dense K = np.exp(-matrix**2 / (2 * bw**2)) self.kernel_ = SpatialKernel.from_matrix(K, is_inverse=False) self.kernel_params_ = {'bandwidth': bw} elif method == 'matern': from scipy.special import kv, gamma bw = kernel_params_['bandwidth'] nu = kernel_params_['nu'] if sp.issparse(matrix): matrix = matrix.toarray() dists = matrix.copy() dists[dists == 0] = 1e-15 factor = (np.sqrt(2 * nu) * dists) / bw K = (2**(1 - nu) / gamma(nu)) * (factor**nu) * kv(nu, factor) np.fill_diagonal(K, 1.0) self.kernel_ = SpatialKernel.from_matrix(K, is_inverse=False) self.kernel_params_ = {'bandwidth': bw, 'nu': nu} else: raise ValueError(f"Method {method} not supported for distance matrices.") # --- Connectivity Based Transformations --- else: # Assume the input matrix is W (adjacency/connectivity) W = matrix if not sp.issparse(W): W = sp.csr_matrix(W) # Remove isolated cells (zero-degree nodes) row_sums_raw = np.array(W.sum(axis=1)).flatten() keep_mask = row_sums_raw > 0 if not np.all(keep_mask): removed = int((~keep_mask).sum()) print(f"Removing {removed} isolated samples with zero degree from adjacency matrix...") W = W[keep_mask][:, keep_mask] self.adata = self.adata[keep_mask].copy() self.n = W.shape[0] self.min_cells = min(self.min_cells, self.n) # Symmetrize and symmetric normalization W = W.astype(float) W_sym = 0.5 * (W + W.T) row_sums = np.array(W_sym.sum(axis=1)).flatten() row_sums[row_sums == 0] = 1.0 inv_D_sqrt = sp.diags(1.0 / np.sqrt(row_sums)) W_norm = inv_D_sqrt @ W_sym @ inv_D_sqrt if method == 'moran': # Already symmetric and normalized K = W_norm self.kernel_ = SpatialKernel.from_matrix(K, is_inverse=False) self.kernel_params_ = {} elif method == 'graph_laplacian': # K = I - W_norm I = sp.eye(self.n, format='csr') K = I - W_norm self.kernel_ = SpatialKernel.from_matrix(K, is_inverse=False) self.kernel_params_ = {} elif method == 'car': # This is the "Implicit" case. # The kernel K = (I - rho*W)^-1. # We construct M = (I - rho*W) and tell SpatialKernel it is the INVERSE. rho = kernel_params_['rho'] standardize = kernel_params_['standardize'] I = sp.eye(self.n, format='csc') M = I - rho * W_norm # M is the inverse of K # We pass M and set is_inverse=True. SpatialKernel will handle # standardizing the precision to ensure diag(K)=1 if requested. self.kernel_ = SpatialKernel.from_matrix( M, is_inverse=True, method='car', rho=rho, standardize=standardize, ) self.kernel_params_ = {'rho': rho, 'standardize': standardize} else: raise ValueError(f"Method {method} not supported for connectivity matrices.")
def _prepare_data( self, source: str, keys: Optional[List[str]], min_cells: int, layer: Optional[str] = None ) -> tuple[sp.csr_matrix, List[str], np.ndarray, np.ndarray]: """ Prepare anndata feature data for testing (standardization, filtering). Extracts features from .obs or .var, applies quality filters (min non-zero count), handles categorical features (one-hot encoding), and computes per-feature statistics. Parameters ---------- source : str Feature source: 'obs' or 'var'. keys : Optional[List[str]] Feature names to extract. If None, uses all features in source. min_cells : int Minimum number of non-zero values required per feature. layer : Optional[str] If source='var', which layer to use. If None, uses .X. Returns ------- X_csc : scipy.sparse.csc_matrix Sparse feature matrix (n_samples, n_features), column-compressed. names : List[str] Feature names in order matching X_csc columns. means : np.ndarray Per-feature means (shape: n_features). stds : np.ndarray Per-feature standard deviations (shape: n_features). Notes ----- - Categorical features in .obs are automatically one-hot encoded. - Features with zero variance or fewer than min_cells non-zeros are filtered. - Sparse matrices are preserved for memory efficiency. """ if source == 'obs': # check if keys are in obs if keys is not None: valid = [k for k in keys if k in self.adata.obs.columns] if len(valid) == 0: raise ValueError("None of the specified keys found in adata.obs.") adata_tmp = self.adata.obs[valid].copy() else: raise ValueError("Keys must be provided when feature source is 'obs'.") # check if .obs[keys] are char/categorical if any(adata_tmp.dtypes == 'object') or any(adata_tmp.dtypes == 'category'): print("Categorical features detected in .obs[keys]; performing one-hot encoding...") # one-hot encode categorical variables while keeping others unchanged dummies = pd.get_dummies(adata_tmp) names = dummies.columns.tolist() X_dense = dummies.values.astype(np.float32) means = X_dense.mean(axis=0) stds = X_dense.std(axis=0) X_csc = sp.csc_matrix(X_dense) else: # All numeric - no encoding needed names = adata_tmp.columns.tolist() X_dense = adata_tmp.values.astype(np.float32) means = X_dense.mean(axis=0) stds = X_dense.std(axis=0) X_csc = sp.csc_matrix(X_dense) elif source == 'var': # check if keys are in var if keys is not None: valid = [g for g in keys if g in self.adata.var_names] adata_tmp = self.adata[:, valid].copy() else: adata_tmp = self.adata.copy() # extract feature matrix if layer is not None: X = adata_tmp.layers[layer] else: X = adata_tmp.X names = adata_tmp.var_names.tolist() # compute means and stds if sp.issparse(X): means = np.array(X.mean(axis=0)).flatten() X2 = X.copy(); X2.data **= 2 means2 = np.array(X2.mean(axis=0)).flatten() var = means2 - (means ** 2) var[var < 0] = 0 stds = np.sqrt(var) X_csc = X.tocsc() else: means = np.mean(X, axis=0) stds = np.std(X, axis=0) X_csc = sp.csc_matrix(X) else: raise ValueError("Source must be either 'obs' or 'var'.") # Filter constant features and those with too few non-zeros to_keep = (stds > 0) & (X_csc.getnnz(axis=0) >= min_cells) X_csc = X_csc[:, to_keep] names = [names[i] for i in range(len(names)) if to_keep[i]] means = means[to_keep] stds = stds[to_keep] return X_csc, names, means, stds def _apply_bh_correction(self, df): """Applies Benjamini-Hochberg correction to P_value column in place.""" pvals = df['P_value'].to_numpy() valid_mask = np.isfinite(pvals) m = valid_mask.sum() if m == 0: return p_sorted_idx = np.argsort(pvals[valid_mask]) p_sorted = pvals[valid_mask][p_sorted_idx] ranks = np.arange(1, m + 1) bh_vals = (p_sorted * m / ranks) bh_adj = np.minimum.accumulate(bh_vals[::-1])[::-1] bh_adj = np.clip(bh_adj, 0, 1) p_adj = np.full_like(pvals, np.nan) p_adj_indices = np.where(valid_mask)[0][p_sorted_idx] p_adj[p_adj_indices] = bh_adj df['P_adj'] = p_adj
[docs] def compute_qstat( self, source: str = 'var', features: Optional[List[str]] = None, n_jobs: int = -1, layer: Optional[str] = None, return_pval: bool = True, chunk_size: int = 64 ) -> pd.DataFrame: """ Compute univariate spatial Q-statistic for selected features. Tests each feature for significant spatial clustering or dispersion using the pre-built kernel. Parallelizes across features and applies Benjamini-Hochberg multiple testing correction. Parameters ---------- source : str, default 'var' Feature source: 'var' (genes) or 'obs' (metadata columns). features : Optional[List[str]] Feature names to test. If None, tests all features in source. n_jobs : int, default -1 Number of parallel jobs. -1 uses all available cores; 1 for sequential. layer : Optional[str] If source='var', which layer to use (e.g., 'raw', 'log1p'). If None, uses .X. return_pval : bool, default True If True, returns p-values and BH-corrected p-values. If False, returns Q only. chunk_size : int, default 64 Number of features to process in each internal batch for vectorization. If n_jobs > 1, each worker processes chunk_size features at a time. May be adjusted for memory/speed tradeoff. Returns ------- df : pd.DataFrame Results sorted by Q (descending). Columns: - Feature: feature name - Q: test statistic (univariate spatial variability) - Z_score: standardized Q by null mean/std - P_value: tail probability under null (if return_pval=True) - P_adj: Benjamini-Hochberg adjusted p-value (if return_pval=True) Raises ------ ValueError If kernel not initialized, or source is invalid. Notes ----- Under H₀: feature has no spatial structure. Under H₁: significant spatial signal (clustering or dispersion). Zero-variance features are assigned Q=0, P_value=1.0. Examples -------- >>> detector.build_kernel_from_coordinates(adata.obsm['spatial']) >>> results = detector.compute_qstat(source='var', features=['Gene1', 'Gene2'], n_jobs=-1) >>> top_genes = results.iloc[:10] """ # 1. Ensure Kernel Exists if self.kernel_ is None: raise ValueError(f"Kernel not initialized. Please run 'build_kernel_from_coordinates' or 'build_kernel_from_obsp' first.") # 2. Compute Null Distribution null_method = 'clt' if self.kernel_method_ in ['moran'] else 'welch' print(f"Computing null distribution approximation (method={null_method})...") null_params = compute_null_params(self.kernel_, method=null_method) # 3. Prepare Data X_csc, names, means, stds = self._prepare_data( source=source, keys=features, min_cells=self.min_cells, layer=layer ) # 4. Parallel Execution n_feats = len(names) if n_jobs == -1: import os n_jobs = os.cpu_count() or 1 indices = np.arange(n_feats) chunks = np.array_split(indices, max(n_jobs * 4, 1)) print(f"Testing {n_feats} features using {n_jobs} cores...") results_list = Parallel(n_jobs=n_jobs)( delayed(_qstat_worker)( X_csc[:, chunk_idxs], chunk_idxs, self.kernel_, null_params, means, stds, names, return_pval, chunk_size ) for chunk_idxs in tqdm(chunks, desc=f"Q ({self.kernel_method_})", bar_format='{l_bar}{bar:30}{r_bar}{bar:-30b}') ) # 5. Aggregate Results flat_results = [item for sublist in results_list for item in sublist] df = pd.DataFrame(flat_results).set_index('Feature') if not return_pval: df = df.drop(columns=['P_value']) # 6. Multiple testing correction (Benjamini-Hochberg) if return_pval: self._apply_bh_correction(df) return df.sort_values(by='Q', ascending=False)
[docs] def compute_rstat( self, features_x: Optional[List[str]] = None, features_y: Optional[List[str]] = None, source: str = 'var', n_jobs: int = -1, layer: Optional[str] = None, return_pval: bool = True, chunk_size: int = 64 ) -> pd.DataFrame: """ Compute bivariate spatial R-statistic (cross-spatial correlation) for feature pairs. Tests for significant spatial co-variation between pairs of features using the pre-built kernel. Supports symmetric (all pairs within one set) or bipartite (all X vs Y pairs) modes. Parallelizes computation and applies multiple testing correction. Parameters ---------- features_x : Optional[List[str]] Feature names for the first set. If None and features_y is None, uses all features (symmetric mode). features_y : Optional[List[str]] Feature names for the second set. If None, computes all pairwise within features_x. If provided, computes all X vs Y pairs (bipartite mode). source : str, default 'var' Feature source: 'var' (genes) or 'obs' (metadata columns). n_jobs : int, default -1 Number of parallel jobs. -1 uses all available cores; 1 for sequential. layer : Optional[str] If source='var', which layer to use (e.g., 'raw', 'log1p'). If None, uses .X. return_pval : bool, default True If True, returns p-values and BH-corrected p-values. If False, returns R only. chunk_size : int, default 64 Number of Y features to process together, pre-computing K@Y_chunk. Larger values use more memory but fewer K matrix multiplications. Returns ------- df : pd.DataFrame Results sorted by |Z_score| (descending). Columns: - Feature_1: name of first feature - Feature_2: name of second feature - R: test statistic (bivariate spatial correlation, range approximately [-1, 1]) - Z_score: standardized R by null mean/std - P_value: two-tailed p-value under null (if return_pval=True) - P_adj: Benjamini-Hochberg adjusted p-value (if return_pval=True) Raises ------ ValueError If kernel not initialized, features_x is None when features_y is provided, or no valid pairs generated. Notes ----- Under H₀: features are spatially independent. Under H₁: significant spatial co-clustering or co-dispersion. Different than quadsv.statistics.spatial_r_test, this function always returns R-statistics of all requested feature pairs in the symmetric mode (features_y = None). Thus, for features_x = [A, B, C], the output will contain (A, A), (A, B), (A, C), (B, A), (B, B), (B, C), (C, A), (C, B), (C, C). P-value calculation uses Normal approximation based on Trace(K²). Zero-variance features are handled gracefully (assigned R=0, P=1). Examples -------- >>> detector.build_kernel_from_coordinates(adata.obsm['spatial']) >>> # All pairwise correlations within gene set >>> results = detector.compute_rstat(features_x=['Gene1', 'Gene2', 'Gene3'], n_jobs=-1) >>> # Cross-correlation between two gene sets >>> results = detector.compute_rstat( ... features_x=['Gene1', 'Gene2'], ... features_y=['Gene3', 'Gene4'], ... n_jobs=-1 ... ) """ if self.kernel_ is None: raise ValueError("Kernel not initialized.") # 1. Compute Null Params for R # We need Trace(KK^T) which is Trace(K^2) for symmetric K # compute_null_params already computes var_Q = 2*Tr(K^2). # var_R = Tr(K^2) = var_Q / 2. print("Computing null distribution for R statistic...") q_null = compute_null_params(self.kernel_, method='clt') null_params = { 'mean_R': 0.0, 'var_R': q_null['var_Q'] / 2.0 # Derive from existing trace calculation } # 2. Prepare Data # We load all unique features needed if features_y is None: unique_feats = features_x mode = 'symmetric' else: if features_x is None: raise ValueError("features_x cannot be None.") unique_feats = list(set(features_x) | set(features_y)) mode = 'bipartite' X_csc, names, means, stds = self._prepare_data( source=source, keys=unique_feats, min_cells=self.min_cells, layer=layer ) # Map names to indices in the prepared matrix name_to_idx = {n: i for i, n in enumerate(names)} # 3. Generate Y chunks (for pre-computing K@Y) if mode == 'symmetric': valid_feats = [f for f in features_x if f in name_to_idx] valid_y_indices = [name_to_idx[f] for f in valid_feats] else: valid_y = [f for f in features_y if f in name_to_idx] valid_y_indices = [name_to_idx[f] for f in valid_y] # Chunk Y indices for K@Y pre-computation y_chunks = [] for start in range(0, len(valid_y_indices), chunk_size): end = min(start + chunk_size, len(valid_y_indices)) y_chunks.append(valid_y_indices[start:end]) # 4. Generate X features list if mode == 'symmetric': valid_x_indices = [name_to_idx[f] for f in valid_feats] else: valid_x = [f for f in features_x if f in name_to_idx] valid_x_indices = [name_to_idx[f] for f in valid_x] if len(valid_x_indices) == 0 or len(valid_y_indices) == 0: raise ValueError("No valid features found after filtering.") # 5. Parallel Execution: Process each Y chunk if n_jobs == -1: import os n_jobs = os.cpu_count() or 1 print(f"Testing {len(valid_x_indices)} x {len(valid_y_indices)} pairs using {n_jobs} cores with chunk_size={chunk_size}...") results_list = Parallel(n_jobs=n_jobs, prefer="threads")( delayed(_rstat_worker_chunked)( X_csc, y_chunk, valid_x_indices, self.kernel_, null_params, means, stds, names, return_pval ) for y_chunk in tqdm(y_chunks, desc=f"R ({self.kernel_method_})", bar_format='{l_bar}{bar:30}{r_bar}{bar:-30b}') ) # 6. Aggregate flat_results = [item for sublist in results_list for item in sublist] df = pd.DataFrame(flat_results) # 7. Multiple testing correction (Benjamini-Hochberg) if return_pval: self._apply_bh_correction(df) return df.sort_values(by='Z_score', key=abs, ascending=False)