import numpy as np
import scipy.sparse as sp
from scipy.stats import chi2, ncx2, norm
from typing import Optional, Union, Tuple
from tqdm import tqdm
from quadsv.kernels import Kernel, SpatialKernel
_DELTA = 1e-10
[docs]
def liu_sf(
t: Union[float, np.ndarray],
lambs: np.ndarray,
dofs: Optional[np.ndarray] = None,
deltas: Optional[np.ndarray] = None,
kurtosis: bool = False
) -> Union[float, np.ndarray]:
"""
Liu approximation to linear combination of noncentral chi-squared variables.
Approximates the tail probability Pr(Q > t) for a weighted sum of
noncentral chi-squared random variables. This is the default p-value
computation method when exact kernel eigenvalues are known.
Parameters
----------
t : float or np.ndarray
Test statistic value(s). Can be scalar or array.
lambs : np.ndarray
Eigenvalues of the kernel matrix, shape (n_evals,).
dofs : np.ndarray, optional
Degrees of freedom for each eigenvalue. Default: ones (chi-squared).
deltas : np.ndarray, optional
Non-centrality parameters. Default: zeros (central chi-squared).
kurtosis : bool, default False
If True, uses kurtosis-based approximation for edge case.
Returns
-------
float or np.ndarray
Tail probability Pr(Q > t). Same shape as input `t`.
Notes
-----
Uses moment-based approximation with chi-squared mixture distribution.
Numerically stable for a wide range of eigenvalue spectra.
"""
if dofs is None:
dofs = np.ones_like(lambs)
if deltas is None:
deltas = np.zeros_like(lambs)
t = np.asarray(t, float)
lambs = np.asarray(lambs, float)
dofs = np.asarray(dofs, float)
deltas = np.asarray(deltas, float)
# Calculate moments of weights
lambs_pow = {i: lambs**i for i in range(1, 5)}
c = {
i: np.sum(lambs_pow[i] * dofs) + i * np.sum(lambs_pow[i] * deltas) for i in range(1, 5)
}
s1 = c[3] / (np.sqrt(c[2]) ** 3 + _DELTA)
s2 = c[4] / (c[2] ** 2 + _DELTA)
s12 = s1**2
if s12 > s2:
a = 1 / (s1 - np.sqrt(s12 - s2))
delta_x = s1 * a**3 - a**2
dof_x = a**2 - 2 * delta_x
else:
delta_x = 0
if kurtosis:
a = 1 / np.sqrt(s2)
dof_x = 1 / s2
else:
a = 1 / (s1 + _DELTA)
dof_x = 1 / (s12 + _DELTA)
mu_q = c[1]
sigma_q = np.sqrt(2 * c[2])
mu_x = dof_x + delta_x
sigma_x = np.sqrt(2 * (dof_x + 2 * delta_x))
t_star = (t - mu_q) / (sigma_q + _DELTA)
tfinal = t_star * sigma_x + mu_x
q = ncx2.sf(tfinal, dof_x, np.maximum(delta_x, 1e-9))
return q
[docs]
def compute_null_params(
kernel: Kernel,
method: str = 'welch',
k_eigen: Optional[int] = None
) -> dict[str, Union[float, np.ndarray]]:
"""
Pre-compute null distribution parameters for spatial tests.
Call this ONCE before running parallel tests on thousands of features.
Caches the expensive computations (traces, eigenvalues) for reuse.
Parameters
----------
kernel : Kernel
The spatial kernel object (SpatialKernel, FFTKernel, or compatible).
method : {'clt', 'welch', 'liu'}, default 'welch'
Null approximation method:
- 'clt': Central Limit Theorem (Z-score normal approximation)
- 'welch': Welch-Satterthwaite moment matching (fast, uses traces)
- 'liu': Liu eigenvalue-based approximation (accurate tail, slower)
k_eigen : int, optional
Number of top eigenvalues to compute if method='liu' and kernel is sparse.
If None, computes all available eigenvalues.
Returns
-------
dict[str, Union[float, np.ndarray]]
Parameters keyed by null_approx method:
- 'method': The method used
- For 'liu': 'eigenvalues' (np.ndarray of kernel eigenvalues)
- For 'welch'/'clt': 'mean_Q', 'var_Q', and for 'welch' also 'scale_g', 'df_h'
Raises
------
AssertionError
If method is not one of 'clt', 'welch', 'liu'.
Examples
--------
>>> kernel = SpatialKernel.from_coordinates(coords, method='gaussian')
>>> params = compute_null_params(kernel, method='welch')
>>> Q, pval = spatial_q_test(data, kernel, null_params=params)
"""
params = {'method': method}
assert method in ['clt', 'welch', 'liu'], "Method must be 'clt', 'welch', or 'liu'."
if method == 'liu':
# Requires eigenvalues
# If kernel is implicit/sparse, we might only get top k
vals = kernel.eigenvalues(k=k_eigen)
# Filter numerical noise
params['eigenvalues'] = vals[np.abs(vals) > 1e-9]
else:
# Compute trace and trace squared
tr_K = kernel.trace()
tr_K2 = kernel.square_trace()
params['mean_Q'] = tr_K
params['var_Q'] = 2 * tr_K2
if method == 'welch':
# Pre-calculate Welch-Satterthwaite parameters
if params['var_Q'] > 0:
params['scale_g'] = params['var_Q'] / (2 * params['mean_Q'])
params['df_h'] = (2 * params['mean_Q']**2) / params['var_Q']
else:
params['scale_g'] = 1.0
params['df_h'] = 1.0
return params
return params
[docs]
def spatial_q_test(
Xn: Union[np.ndarray, sp.spmatrix],
kernel: Kernel,
null_params: Optional[dict] = None,
return_pval: bool = True,
is_standardized: bool = False,
chunk_size: int = -1,
show_progress: bool = False
) -> Union[float, np.ndarray, Tuple[Union[float, np.ndarray], Union[float, np.ndarray]]]:
"""
Univariate spatial Q-test for detecting spatial variability.
Tests whether a spatial variable exhibits significant clustering or dispersion
using the specified kernel weighting scheme. Supports both single features and
batch processing with sparse matrices.
Parameters
----------
Xn : np.ndarray or scipy.sparse matrix
Input data array of shape (N,) for single feature or (N, M) for M features.
Can be dense numpy array or sparse matrix (CSC/CSR format recommended).
Should be standardized before calling unless is_standardized=True.
kernel : Kernel
Pre-constructed kernel object (Kernel, SpatialKernel, FFTKernel, or scipy.sparse matrix).
null_params : dict, optional
Pre-computed null distribution parameters from compute_null_params().
If None, computed on-the-fly using 'welch' method (only accurate when kernel is positive semi-definite).
return_pval : bool, default True
If True, returns (Q, pval) tuple; if False, returns Q only.
is_standardized : bool, default False
If True, skips Z-score standardization internally (assumes input is N(0,1)).
chunk_size : int, default -1
Number of features to process in each chunk. If -1, processes all features at once.
Useful for large feature sets to reduce memory usage. Must be <= M.
show_progress : bool, default False
If True, displays a progress bar during chunk processing.
Returns
-------
Q : float or np.ndarray
Test statistic value(s). Shape (M,) if input was 2D, scalar if input was 1D.
pval : float or np.ndarray, optional
Tail probability under null hypothesis. Only returned if return_pval=True.
Same shape as Q.
Raises
------
ValueError
If kernel dimensions don't match data size or if params is None and kernel is not a Kernel object.
Notes
-----
Under H₀: data is spatially independent.
Under H₁: mean-shift present.
The test statistic Q = x^T K x where K is the kernel matrix,
follows approximately a chi-squared mixture distribution:
$$Q \\sim \\sum_{i=1}^{n} \\lambda_i \\chi^2_{1}$$
where $\\lambda_i$ are the kernel eigenvalues.
By default, we approximate the null using Welch-Satterthwaite moment matching.
For more accurate tail probabilities, set null_params = {'method': 'liu'} or using
null_params = compute_null_params(method = 'liu').
Examples
--------
>>> coords = np.random.randn(100, 2)
>>> kernel = SpatialKernel.from_coordinates(coords, method='gaussian')
>>> data = np.random.randn(100)
>>> Q, pval = spatial_q_test(data, kernel)
>>> # Sparse matrix example
>>> from scipy.sparse import csr_matrix
>>> sparse_data = csr_matrix(np.random.randn(100, 1000))
>>> Q, pval = spatial_q_test(sparse_data, kernel, chunk_size=100, show_progress=True)
"""
# Handle sparse matrices
is_sparse = sp.issparse(Xn)
if is_sparse:
# Get shape from sparse matrix
n, M = Xn.shape if Xn.ndim == 2 else (Xn.shape[0], 1)
if Xn.ndim == 1 or M == 1:
Xn = Xn.reshape(-1, 1) if hasattr(Xn, 'reshape') else Xn
M = 1
else:
Xn = np.asarray(Xn).astype(float)
# Detect batch mode
# If ndim=1 -> (N, 1). If ndim=2 -> (N, M)
if Xn.ndim == 1:
Xn = Xn.reshape(-1, 1)
n, M = Xn.shape
# Determine chunking
if chunk_size == -1 or chunk_size >= M:
chunk_size = M
# Process in chunks
n_chunks = int(np.ceil(M / chunk_size))
Q_results = []
iterator = range(n_chunks)
if show_progress and n_chunks > 1:
iterator = tqdm(iterator, desc="Processing features", total=n_chunks,
bar_format='{l_bar}{bar:30}{r_bar}{bar:-30b}')
for chunk_idx in iterator:
start_idx = chunk_idx * chunk_size
end_idx = min(start_idx + chunk_size, M)
# Extract chunk
Xn_chunk = Xn[:, start_idx:end_idx]
# 1. Preprocessing (Standardization)
if is_standardized:
z = Xn_chunk
else:
# Convert sparse to dense for standardization (centering destroys sparsity anyway)
if is_sparse:
Xn_chunk = Xn_chunk.toarray()
# Compute stats along axis 0 (samples)
means = np.mean(Xn_chunk, axis=0)
stds = np.std(Xn_chunk, axis=0, ddof=1)
# Handle constant features (std=0) to avoid NaNs
valid_mask = stds > 1e-12
z = np.zeros_like(Xn_chunk)
if np.any(valid_mask):
z[:, valid_mask] = (Xn_chunk[:, valid_mask] - means[valid_mask]) / stds[valid_mask]
# 2. Compute Statistic Q = z^T K z
# Use optimized class method if available
if hasattr(kernel, 'xtKx'):
# This now supports (N, M) inputs and returns (M,)
Q_chunk = kernel.xtKx(z)
else:
# Fallback for raw matrices
if sp.issparse(kernel):
Kz = kernel.dot(z)
else:
Kz = np.dot(kernel, z)
# Efficient diagonal of z.T @ Kz without full matrix mult
# sum(z_ij * (Kz)_ij) along axis 0
Q_chunk = np.sum(z * Kz, axis=0)
Q_results.append(Q_chunk)
# Concatenate results
Q = np.concatenate([np.atleast_1d(q) for q in Q_results])
# Unwrap if M=1
if M == 1 and Q.size == 1:
Q = Q.item()
if not return_pval:
return Q
# 3. Compute P-value (Vectorized)
if null_params is None:
if not hasattr(kernel, 'trace'):
raise ValueError("If params is None, kernel must be a Kernel object.")
null_params = compute_null_params(kernel, method='welch')
null_approx_method = 'welch'
else:
null_approx_method = null_params.get('method', 'welch')
# Ensure params exist
if len(null_params) == 1 and hasattr(kernel, 'trace'):
null_params.update(compute_null_params(kernel, method=null_approx_method))
# P-value logic
if null_approx_method == 'clt':
mu_Q = null_params['mean_Q']
var_Q = null_params['var_Q']
if var_Q > 0:
z_score = (Q - mu_Q) / np.sqrt(var_Q)
pval = chi2.sf(z_score**2, df=1)
else:
pval = np.ones_like(Q, dtype=float)
elif null_approx_method == 'welch':
g = null_params['scale_g']
d = null_params['df_h']
pval = chi2.sf(Q / g, df=d)
elif null_approx_method == 'liu':
lambs = null_params['eigenvalues']
# filter numerical noise
lambs = lambs[np.abs(lambs) > _DELTA]
# liu_sf likely needs a loop if not vectorized, or use np.vectorize
# Assuming liu_sf handles array inputs or we map it:
if M > 1: # batched inputs
pval = np.array([liu_sf(q_val, lambs) for q_val in Q])
else:
pval = liu_sf(Q, lambs)
else:
pval = np.ones_like(Q, dtype=float)
# Unwrap if M=1
if M == 1 and pval.size == 1:
pval = pval.item()
return Q, pval
[docs]
def spatial_r_test(
Xn: np.ndarray,
Yn: np.ndarray,
kernel: Kernel,
null_params: Optional[dict] = None,
return_pval: bool = True,
is_standardized: bool = False
) -> Union[float, np.ndarray, Tuple[Union[float, np.ndarray], Union[float, np.ndarray]]]:
"""
Bivariate spatial R-test for correlation between two spatial variables.
Computes the pairwise spatial statistic R = x^T K y, testing for spatial
association between two variables. Supports batch processing.
Parameters
----------
Xn : np.ndarray
First input data vector or batch. Shape (N,) or (N, M).
Yn : np.ndarray
Second input data vector or batch. Shape (N,) or (N, M) matching Xn.
kernel : Kernel
Pre-constructed kernel object compatible with xtKy() method.
null_params : dict, optional
Pre-computed null distribution parameters. Should include 'var_R'.
If None, computed on-the-fly from kernel traces.
return_pval : bool, default True
If True, returns (R, pval) tuple; if False, returns R only.
is_standardized : bool, default False
If True, skips Z-score standardization internally.
Returns
-------
R : float or np.ndarray
Test statistic value(s). Shape (M,) if input was 2D, scalar if input was 1D.
pval : float or np.ndarray, optional
Tail probability under null hypothesis (two-tailed test). Only returned if return_pval=True.
Based on Normal approximation.
Raises
------
ValueError
If Xn and Yn shapes don't match or kernel dimensions are incompatible.
Notes
-----
Under H₀: the two variables are spatially uncorrelated.
The test statistic R = x^T K y is approximated as Normal under the null:
$$R \\sim N(0, \\text{Trace}(K^2))$$
P-value is computed as two-tailed: 2 × Pr(|R| > |r_obs|).
Examples
--------
>>> coords = np.random.randn(100, 2)
>>> kernel = SpatialKernel.from_coordinates(coords, method='gaussian')
>>> x_data = np.random.randn(100)
>>> y_data = np.random.randn(100)
>>> R, pval = spatial_r_test(x_data, y_data, kernel)
"""
Xn = np.asarray(Xn); Yn = np.asarray(Yn)
if Xn.ndim == 1: Xn = Xn.reshape(-1, 1)
if Yn.ndim == 1: Yn = Yn.reshape(-1, 1)
n, M = Xn.shape
assert Xn.shape == Yn.shape, "Xn and Yn shapes must match"
assert n == kernel.n, "Kernel dimensions must match data size"
# 1. Preprocessing (Standardization)
if is_standardized:
Zx, Zy = Xn, Yn
else:
# Standardize Xn
means = np.mean(Xn, axis=0)
stds = np.std(Xn, axis=0, ddof=1)
valid_mask = stds > 1e-12
Zx = np.zeros_like(Xn)
if np.any(valid_mask):
Zx[:, valid_mask] = (Xn[:, valid_mask] - means[valid_mask]) / stds[valid_mask]
# Standardize Yn
means = np.mean(Yn, axis=0)
stds = np.std(Yn, axis=0, ddof=1)
valid_mask = stds > 1e-12
Zy = np.zeros_like(Yn)
if np.any(valid_mask):
Zy[:, valid_mask] = (Yn[:, valid_mask] - means[valid_mask]) / stds[valid_mask]
# 2. Compute R = Zx.T @ K @ Zy
# We use the kernel's optimized multiplication
# K @ Zy
if hasattr(kernel, 'is_implicit') and kernel.is_implicit:
# Implicit solve: M^-1 * Zy
if sp.issparse(kernel._K):
# Check for cached LU factorization
if kernel._lu is None:
from scipy.sparse.linalg import splu
kernel._lu = splu(kernel._K.tocsc())
K_Zy = kernel._lu.solve(Zy)
else:
# Check for cached LU factorization
if kernel._lu is None:
from scipy.linalg import lu_factor, lu_solve
kernel._lu = lu_factor(kernel._K)
K_Zy = lu_solve(kernel._lu, Zy)
else:
# Explicit dot
K_Zy = kernel._K.dot(Zy)
# Dot product: Zx.T @ (K @ Zy)
# We only want the paired elements (diagonal of the result matrix)
# sum(Zx_ij * (K_Zy)_ij) along axis 0
R = np.sum(Zx * K_Zy, axis=0)
# Unwrap if M=1
if M == 1 and R.size == 1:
R = R.item()
if not return_pval:
return R
# 3. P-value (Normal Approximation)
# R ~ N(0, Tr(K K^T))
if null_params is None:
# Compute on fly (expensive)
tr_K2 = kernel.square_trace()
var_R = tr_K2
else:
var_R = null_params.get('var_R', 1.0)
sigma = np.sqrt(var_R)
# Two-sided p-value for Normal distribution
if sigma > 0:
z_score = R / sigma
pval = 2 * norm.sf(np.abs(z_score))
else:
pval = np.ones_like(R) if isinstance(R, np.ndarray) else 1.0
return R, pval