Source code for quadsv.kernels

import numpy as np
import warnings
import scipy.sparse as sp
from scipy.spatial.distance import pdist, squareform
from scipy.linalg import inv, solve, lu_factor, lu_solve
from scipy.sparse.linalg import spsolve, splu
from sklearn.neighbors import NearestNeighbors
from abc import ABC, abstractmethod
from scipy.special import kv, gamma
from tqdm import tqdm
from typing import Optional, Union, Tuple, Any

[docs] class Kernel(ABC): """ Abstract base class for spatial kernels. Handles dense, sparse, and implicit (operator-based) kernels. Implements a dual-mode system that switches between explicit (dense) and implicit (sparse) computation based on problem size. Attributes ---------- n : int Number of observations (samples). method : str The kernel method ('gaussian', 'matern', 'moran', 'graph_laplacian', 'car'). is_implicit : bool If True, uses sparse solver for implicit kernels (N > 5000). If False, uses dense matrix operations. params : dict Additional kernel parameters (bandwidth, nu, rho, etc.). """
[docs] def __init__(self, n: int, method: str = 'gaussian', **kwargs) -> None: """ Initialize the Kernel. Parameters ---------- n : int Number of observations. method : str, default 'gaussian' Kernel method to use. **kwargs : dict Additional kernel-specific parameters. """ self.n = n self.method = method self.params = kwargs # Threshold for switching between dense realization and implicit solving self.implicit_threshold = 5000 self.is_implicit = False self._lu = None # Cache for sparse LU factorization if needed # _K can be the kernel matrix OR the inverse/precision matrix depending on is_implicit self._K = self._build_kernel() self.spectrum = None # Lazy-evaluated eigenvalues cache
@abstractmethod def _build_kernel(self): """Constructs the kernel matrix or its inverse operator.""" pass def _format_params(self): """Format kernel params safely without dumping large arrays/matrices.""" if not self.params: return "None" parts = [] for k, v in self.params.items(): try: if isinstance(v, np.ndarray): parts.append(f"{k}=array(shape={v.shape}, dtype={v.dtype})") elif sp.issparse(v): parts.append(f"{k}=sparse(shape={v.shape}, nnz={v.nnz})") else: parts.append(f"{k}={v}") except Exception: parts.append(f"{k}=?") return ", ".join(parts) def __repr__(self): return ( f"<Kernel method={self.method} n={self.n} implicit={self.is_implicit} " f"threshold={self.implicit_threshold} params={{ {self._format_params()} }}>" ) def __str__(self): return ( "Kernel\n" f"- Method: {self.method}\n" f"- Samples: {self.n}\n" f"- Implicit: {self.is_implicit} (threshold={self.implicit_threshold})\n" f"- Params: {self._format_params()}" )
[docs] def realization(self) -> np.ndarray: """ Return the realized (N, N) kernel matrix. Returns ------- np.ndarray Dense (N, N) kernel matrix. Warnings -------- If is_implicit is True, this forces expensive dense inversion of the precision matrix. Use xtKx() and trace() for implicit kernels instead. """ if self.is_implicit: # _K is M = K^-1. We need to invert it. if sp.issparse(self._K): return inv(self._K.toarray()) else: return inv(self._K) return self._K
[docs] def eigenvalues(self, k: Optional[int] = None) -> np.ndarray: """ Compute the k largest eigenvalues of the kernel matrix. Parameters ---------- k : int, optional Number of largest eigenvalues to return. If None, returns all. Returns ------- np.ndarray Eigenvalues sorted in descending order, shape (k,) or (n,). """ if self.spectrum is not None: # check if we have enough cached if k is None and len(self.spectrum) == self.n: return self.spectrum elif k is not None and len(self.spectrum) >= k: return self.spectrum[-k:] if self.is_implicit: # Implicit case with kernel inverse: Use sparse methods from scipy.sparse.linalg import eigsh k = k if k is not None else max(6, self.n - 2) vals, _ = eigsh(self._K, k=k, which='SM') # Smallest magnitude of K^-1 = largest of K vals = np.real(1.0 / vals) else: # Handle kernel matrix directly if sp.issparse(self._K): from scipy.sparse.linalg import eigsh k = k if k is not None else max(6, self.n - 2) vals, _ = eigsh(self._K, k=k, which='LM') vals = np.real(vals) else: vals = np.linalg.eigvalsh(self._K) # ascending order if k is not None: vals = np.sort(vals)[-k:] self.spectrum = vals return vals
[docs] def xtKx(self, x: Union[np.ndarray, sp.spmatrix]) -> Union[float, np.ndarray]: """ Efficiently compute the quadratic form x^T K x. Handles both single vectors and batches. Uses implicit solvers for large N (implicit_threshold) to avoid dense matrix operations. Supports sparse input matrices without densification. Parameters ---------- x : np.ndarray or scipy.sparse matrix Input vector of shape (N,) or batch of vectors (N, M). Can be dense numpy array or sparse matrix (CSC/CSR format). Returns ------- float or np.ndarray Quadratic form value(s). Scalar if input was 1D, shape (M,) if input was 2D. Notes ----- For implicit kernels, uses sparse solve instead of matrix inversion. For sparse input, maintains sparsity throughout computation where possible. """ # Handle sparse input is_sparse_input = sp.issparse(x) # Allow x to be a matrix (N, n_vectors) or vector (N,) # If vector, reshape to (N, 1) to ensure matrix math works if is_sparse_input: if x.ndim == 1 or (hasattr(x, 'shape') and len(x.shape) == 1): x_in = x.reshape(-1, 1) else: x_in = x n_cols = x_in.shape[1] else: x_in = x if x.ndim > 1 else x.reshape(-1, 1) n_cols = x_in.shape[1] if self.is_implicit: # Case: CAR model (Large N) # We want x^T M^-1 x # Solve My = x if sp.issparse(self._K): # Cache LU factorization for efficiency if self._lu is None: self._lu = splu(self._K.tocsc()) if is_sparse_input: # Convert to dense for solver (most efficient for sparse solvers) y = self._lu.solve(x_in.toarray()) else: y = self._lu.solve(x_in) else: # Cache LU factorization for efficiency if self._lu is None: self._lu = lu_factor(self._K) if is_sparse_input: y = lu_solve(self._lu, x_in.toarray()) else: y = lu_solve(self._lu, x_in) # Compute x^T @ y efficiently if is_sparse_input: # For sparse x, use multiply and sum result = np.asarray(x_in.multiply(y).sum(axis=0)).flatten() else: result_matrix = x_in.T.dot(y) # (M, M) result = np.diag(result_matrix) if n_cols > 1 else result_matrix.item() else: # Standard Case: K is realized as a dense or sparse matrix if is_sparse_input: # Sparse @ Dense matrix multiplication Kx = self._K.dot(x_in.toarray() if hasattr(x_in, 'toarray') else x_in) # x^T @ Kx with sparse x result = np.asarray(x_in.multiply(Kx).sum(axis=0)).flatten() else: Kx = self._K.dot(x_in) result_matrix = x_in.T.dot(Kx) # (M, M) result = np.diag(result_matrix) if n_cols > 1 else result_matrix.item() # Return appropriate shape if n_cols > 1: return result if isinstance(result, np.ndarray) else np.diag(result) else: return result if np.isscalar(result) else result.item()
def _get_rvs_trace_cache(self, n_vectors=15): """Generate random vectors for trace estimation caching.""" if not self.is_implicit: raise RuntimeError("Trace caching is only for implicit kernels.") # Check if cache exists if hasattr(self, '_trace_rvs_cache'): if self._trace_rvs_cache['n_vectors'] == n_vectors: return self._trace_rvs_cache else: warnings.warn("Updating trace random vectors cache with different n_vectors.", RuntimeWarning) # Hutchinson's trick random vectors rvs = np.random.choice([-1, 1], size=(self.n, n_vectors)) # Batched Solve: Solve M * Y = rvs # spsolve can handle multiple RHS if passed as dense 2D array if sp.issparse(self._K): if self._lu is None: self._lu = splu(self._K.tocsc()) Y = self._lu.solve(rvs) # Ensure Y is 2D even if n_vectors=1 if Y.ndim == 1: Y = Y.reshape(-1, 1) else: if self._lu is None: self._lu = lu_factor(self._K) Y = lu_solve(self._lu, rvs) # Cache for future use self._trace_rvs_cache = {'n_vectors': n_vectors, 'rvs': rvs, 'Y': Y} return self._trace_rvs_cache
[docs] def trace(self) -> float: """ Compute the trace of the kernel matrix Tr(K). For implicit kernels, uses Hutchinson's trick with random vectors for efficient O(N) estimation instead of O(N³) eigendecomposition. Returns ------- float Trace of the kernel matrix. """ if self.is_implicit: # Trace estimation using Hutchinson's trick n_vectors = 15 cache = self._get_rvs_trace_cache(n_vectors) # Hutchinson's estimator is (1/n_vectors) * sum(v_i^T * y_i) rvs = cache['rvs'] Y = cache['Y'] return np.sum(rvs * Y) / n_vectors elif sp.issparse(self._K): return self._K.diagonal().sum() else: return np.trace(self._K)
[docs] def square_trace(self) -> float: """ Compute the trace of the squared kernel Tr(K²). Used for variance estimation in statistical tests. For implicit kernels, uses Hutchinson's trick for efficient O(N) estimation. Returns ------- float Trace of K squared. """ if self.is_implicit: # Trace(K^2) estimation n_vectors = 15 cache = self._get_rvs_trace_cache(n_vectors) # Trace(K^2) ~= (1/m) * sum( ||K v_i||^2 ) # Since Y = K * rvs, we just need sum(Y^2) Y = cache['Y'] return np.sum(Y**2) / n_vectors elif sp.issparse(self._K): return self._K.power(2).sum() else: return np.sum(self._K**2)
def _compute_inv_diag(self, M): """Compute diagonal of K = M^{-1} using batched solves to save memory. For sparse M, uses batched splu solves on chunks of the identity matrix. This avoids allocating a dense N x N inverse matrix. """ n = self.n if sp.issparse(M): # Factorize once if self._lu is None: self._lu = splu(M.tocsc()) lu = self._lu # Result array for the diagonal diag_vals = np.zeros(n) # Determine batch size (100-1000 is usually optimal for cache locality) batch_size = 128 with tqdm(total=n, desc="Computing diagonal of K", bar_format='{l_bar}{bar:10}{r_bar}{bar:-10b}') as pbar: for i in range(0, n, batch_size): end = min(i + batch_size, n) current_batch_size = end - i # Create the RHS block: A slice of the Identity matrix. # This corresponds to columns i through end of I. # Shape: (n, current_batch_size) # We construct it directly to save memory. b = np.zeros((n, current_batch_size)) # Fill the specific rows that correspond to the diagonal 1s # For the k-th column in this batch (which corresponds to global column i+k), # the 1 is at row i+k. b[i:end, :] = np.eye(current_batch_size) # Solve M * x = b => x = M^{-1} * b x = lu.solve(b) # We only need the diagonal elements of M^{-1}. # In the result x (shape n, batch), the diagonal elements of M^{-1} # are located at x[i, 0], x[i+1, 1], ..., x[end-1, end-1-i] # This corresponds to the diagonal of the square block starting at row i. diag_vals[i:end] = x[i:end, :].diagonal() # Update the progress bar by the actual number of items processed in this batch pbar.update(current_batch_size) return diag_vals else: # Dense case: Direct inversion (slow, should work with kernel matrix directly) warnings.warn( "Dense precision inversion invoked. Consider using the kernel matrix directly.", RuntimeWarning ) Minv = inv(M) return np.diag(Minv) def _standardize_precision(self, M): """Scale precision M so that covariance K has unit diagonal, without forming dense K when implicit.""" diag_K = self._compute_inv_diag(M).copy() diag_K[diag_K <= 0] = 1e-12 s = 1.0 / np.sqrt(diag_K) if sp.issparse(M): S_inv = sp.diags(1.0 / s) return S_inv @ M @ S_inv else: S_inv = np.diag(1.0 / s) return S_inv @ M @ S_inv
[docs] def __getstate__(self): """ Custom pickling behavior: exclude unpicklable SuperLU objects. """ state = self.__dict__.copy() # Remove the cached LU factorization because SuperLU objects cannot be pickled. # Workers will re-compute this locally. state['_lu'] = None return state
[docs] def __setstate__(self, state): """ Restore state and ensure _lu is reset to None. """ self.__dict__.update(state) # Ensure _lu is explicitly None upon restoration self._lu = None
[docs] class SpatialKernel(Kernel): """ Concrete implementation of Spatial Kernel. Can be built from raw coordinates OR from precomputed matrices. Attributes ---------- data : np.ndarray Raw data (coordinates or matrix) used to construct the kernel. mode : {'coords', 'precomputed', 'precomputed_inverse'} How the kernel was initialized. """ _available_kernels = ['gaussian', 'matern', 'moran', 'graph_laplacian', 'car']
[docs] def __init__(self, data: np.ndarray, mode: str = 'coords', method: str = 'matern', **kwargs) -> None: """ Internal constructor. Use .from_coordinates() or .from_matrix() instead. Parameters ---------- data : np.ndarray Input data (coordinates or kernel matrix). mode : {'coords', 'precomputed', 'precomputed_inverse'}, default 'coords' Interpretation of the data. method : str, default 'matern' Kernel method to use. **kwargs : dict Additional kernel parameters. """ self.data = data self.mode = mode assert mode in ['coords', 'precomputed', 'precomputed_inverse'], \ "Invalid mode. Must be 'coords', 'precomputed', or 'precomputed_inverse'." assert method in self._available_kernels + ['precomputed'], f"Unknown kernel method: {method}." # Update kernel parameters from defaults defaults = self._get_default_params(method).copy() if kwargs: for key, value in kwargs.items(): if key in defaults: defaults[key] = value else: raise ValueError(f"Unknown parameter '{key}' for method '{method}'") n = data.shape[0] super().__init__(n, method=method, **defaults)
def _get_default_params(self, method: str) -> dict[str, Any]: """ Returns default parameters for specific kernel methods. Parameters ---------- method : str Kernel method name. Should be one of _available_kernels. Returns ------- dict[str, Any] Method defaults: bandwidth (gaussian/matern), nu (matern), neighbor_degree (moran/graph_laplacian/car), rho (car). """ method_defaults = { 'gaussian': {'bandwidth': 2.0}, 'matern': {'bandwidth': 2.0, 'nu': 1.5}, 'moran': {'k_neighbors': 4}, 'graph_laplacian': {'k_neighbors': 4}, 'car': {'rho': 0.9, 'k_neighbors': 4, 'standardize': False}, } return method_defaults.get(method, {})
[docs] @classmethod def from_coordinates( cls, coords: np.ndarray, method: str = 'matern', **kwargs ) -> 'SpatialKernel': """ Build kernel from spatial coordinates. Parameters ---------- coords : np.ndarray Array of spatial coordinates, shape (N, D). method : str, default 'matern' Kernel method. Must be one of 'gaussian', 'matern', 'moran', 'graph_laplacian', 'car'. **kwargs : dict Additional kernel parameters (bandwidth, nu, rho, k_neighbors, etc.). Returns ------- SpatialKernel Initialized kernel object. Examples -------- >>> coords = np.random.randn(100, 2) >>> kernel = SpatialKernel.from_coordinates(coords, method='gaussian', bandwidth=1.0) """ assert method in cls._available_kernels, f"Unknown kernel method for coordinates: {method}." return cls(coords, mode='coords', method=method, **kwargs)
[docs] @classmethod def from_matrix( cls, matrix: Union[np.ndarray, sp.spmatrix], is_inverse: bool = False, method: str = 'precomputed', **kwargs ) -> 'SpatialKernel': """ Build kernel from a precomputed kernel matrix or its inverse. Parameters ---------- matrix : np.ndarray or scipy.sparse matrix Kernel matrix (N, N) or its inverse (precision matrix). is_inverse : bool, default False If True, matrix is treated as the inverse (precision) matrix K^-1. method : str, default 'precomputed' The logical kernel method (e.g., 'car' for precision matrices). **kwargs : dict Additional parameters. Returns ------- SpatialKernel Initialized kernel object. Examples -------- >>> K = np.array([[2, -1], [-1, 2]]) # kernel matrix >>> kernel = SpatialKernel.from_matrix(K, is_inverse=False) """ mode = 'precomputed_inverse' if is_inverse else 'precomputed' return cls(matrix, mode=mode, method=method, **kwargs)
def _build_kernel(self): method = self.method # ========================================== # 1. PREPARE RAW INPUTS (Dists or Weights) # ========================================== # Case A: Coordinates provided -> Compute Dists or W from scratch if self.mode == 'coords': coords = self.data if method in ['gaussian', 'matern']: # Compute dense distance matrix dists = squareform(pdist(coords, metric='euclidean')) W = None elif method in ['moran', 'graph_laplacian', 'car']: # Compute sparse adjacency graph k = self.params['k_neighbors'] nbrs = NearestNeighbors(n_neighbors=k, algorithm='ball_tree').fit(coords) W = nbrs.kneighbors_graph(coords, mode='connectivity').astype(float) dists = None else: raise ValueError(f"Unknown method for coordinates: {method}") # Case B: Precomputed Kernel provided elif self.mode == 'precomputed': return self.data # Case C: Precomputed Inverse Kernel provided elif self.mode == 'precomputed_inverse': M = self.data standardize = self.params.get('standardize', False) # If small, realize dense K; else keep implicit precision if self.n <= self.implicit_threshold: try: M_dense = M.toarray() if sp.issparse(M) else M K_dense = inv(M_dense) except Exception: M_dense = M.toarray() if sp.issparse(M) else M K_dense = np.linalg.pinv(M_dense) if standardize: diag_K = np.diag(np.asarray(K_dense)).copy() diag_K[diag_K <= 0] = 1e-12 s = 1.0 / np.sqrt(diag_K) S = np.diag(s) K_dense = S @ K_dense @ S return 0.5 * (K_dense + K_dense.T) else: self.is_implicit = True if standardize: M = self._standardize_precision(M) return M # ========================================== # 2. CONSTRUCT KERNEL FROM INPUTS # ========================================== # --- Distance Based --- if method in ['gaussian', 'matern']: bw = self.params['bandwidth'] if method == 'gaussian': K = np.exp(-dists**2 / (2 * bw**2)) elif method == 'matern': nu = self.params['nu'] length_scale = bw # Avoid div by zero dists_safe = dists.copy() dists_safe[dists_safe == 0] = 1e-15 factor = (np.sqrt(2 * nu) * dists_safe) / length_scale K = (2**(1 - nu) / gamma(nu)) * (factor**nu) * kv(nu, factor) np.fill_diagonal(K, 1.0) return K # --- Graph Based --- elif method in ['moran', 'graph_laplacian', 'car']: # Symmetrize and apply symmetric normalization: D^{-1/2} W D^{-1/2} if W is None: raise ValueError("Graph weights (W) required for graph kernels.") # Ensure float W = W.astype(float) # Symmetrize first W_sym = 0.5 * (W + W.T) # Zero out self-loops W_sym.setdiag(0) # Degree-based symmetric normalization row_sums = np.array(W_sym.sum(axis=1)).flatten() row_sums[row_sums == 0] = 1.0 inv_D_sqrt = sp.diags(1.0 / np.sqrt(row_sums)) W_norm = inv_D_sqrt @ W_sym @ inv_D_sqrt if method == 'moran': # Already symmetric and normalized return W_norm elif method == 'graph_laplacian': I = sp.eye(self.n, format='csr') return I - W_norm elif method == 'car': rho = self.params['rho'] standardize = self.params['standardize'] I = sp.eye(self.n, format='csc') # M = (I - rho * W_norm) is the inverse of the CAR kernel M = I - rho * W_norm if self.n > self.implicit_threshold: self.is_implicit = True if standardize: M = self._standardize_precision(M) return M else: try: K_dense = inv(M.toarray()) except: K_dense = np.linalg.pinv(M.toarray()) if standardize: diag_K = np.diag(np.asarray(K_dense)).copy() diag_K[diag_K <= 0] = 1e-12 s = 1.0 / np.sqrt(diag_K) S = np.diag(s) K_dense = S @ K_dense @ S return 0.5 * (K_dense + K_dense.T) else: raise ValueError(f"Unknown kernel method: {method}") def __repr__(self): # Describe input data succinctly if self.mode == 'coords': coords = self.data data_desc = f"coords shape={getattr(coords, 'shape', '?')}" elif self.mode == 'precomputed': M = self.data if sp.issparse(M): data_desc = f"matrix shape={M.shape} sparse nnz={M.nnz}" else: data_desc = f"matrix shape={getattr(M, 'shape', '?')} dense" elif self.mode == 'precomputed_inverse': M = self.data if sp.issparse(M): data_desc = f"precision shape={M.shape} sparse nnz={M.nnz}" else: data_desc = f"precision shape={getattr(M, 'shape', '?')} dense" else: data_desc = "data=?" return ( f"<SpatialKernel method={self.method} mode={self.mode} n={self.n} " f"implicit={self.is_implicit} data={data_desc} params={{ {self._format_params()} }}>" ) def __str__(self): # Human-friendly multi-line summary lines = [ "SpatialKernel", f"- Method: {self.method}", f"- Mode: {self.mode}", f"- Samples: {self.n}", f"- Implicit: {self.is_implicit} (threshold={self.implicit_threshold})", ] # Add a brief data description try: if self.mode == 'coords': coords = self.data lines.append(f"- Data: coords shape={getattr(coords, 'shape', '?')}") else: M = self.data if sp.issparse(M): kind = 'precision' if self.mode == 'precomputed_inverse' else 'matrix' lines.append(f"- Data: {kind} shape={M.shape} sparse nnz={M.nnz}") else: kind = 'precision' if self.mode == 'precomputed_inverse' else 'matrix' lines.append(f"- Data: {kind} shape={getattr(M, 'shape', '?')} dense") except Exception: lines.append("- Data: ?") lines.append(f"- Params: {self._format_params()}") return "\n".join(lines)