"""
General-purpose helpers: grid/coordinate generators, periodic-domain distance
matrices, Visium I/O, and the Benjamini–Hochberg correction used by every
detector / comparator.
"""
from __future__ import annotations
import json
import logging
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
import scipy.sparse as sp
__all__ = [
# Grid & coordinate helpers
"get_rect_coords",
"get_visium_coords",
"convert_visium_to_physical",
"compute_torus_distance_matrix",
# Visium I/O
"VISIUM_V1_SPOT_SPACING_UM",
"visium_hex_spacing_um",
"load_visium_sample",
"visium_to_grid",
]
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Benjamini–Hochberg (private; shared by detectors / comparators)
# ---------------------------------------------------------------------------
def _apply_bh_correction(df: pd.DataFrame) -> None:
"""Apply Benjamini–Hochberg FDR correction to ``df['P_value']`` in place.
Writes adjusted p-values into a new ``P_adj`` column. Non-finite p-values
(NaN / inf) are left as NaN. Shared helper used by
:class:`quadsv.DetectorIrregular`, :class:`quadsv.DetectorGrid`, and the
comparator pipeline.
Parameters
----------
df : pandas.DataFrame
Must contain a ``P_value`` column. Modified in place to add ``P_adj``.
"""
pvals = df["P_value"].to_numpy()
valid_mask = np.isfinite(pvals)
m = valid_mask.sum()
if m == 0:
return
p_sorted_idx = np.argsort(pvals[valid_mask])
p_sorted = pvals[valid_mask][p_sorted_idx]
ranks = np.arange(1, m + 1)
bh_vals = p_sorted * m / ranks
bh_adj = np.minimum.accumulate(bh_vals[::-1])[::-1]
bh_adj = np.clip(bh_adj, 0, 1)
p_adj = np.full_like(pvals, np.nan)
p_adj_indices = np.where(valid_mask)[0][p_sorted_idx]
p_adj[p_adj_indices] = bh_adj
df["P_adj"] = p_adj
# ---------------------------------------------------------------------------
# Grid & coordinate generators
# ---------------------------------------------------------------------------
[docs]
def get_rect_coords(n_rows: int = 32, n_cols: int = 32) -> tuple[np.ndarray, tuple[int, int]]:
"""Generate rectangular grid coordinates with unit spacing.
Parameters
----------
n_rows, n_cols : int
Grid dimensions.
Returns
-------
coords : np.ndarray
``(n, 2)`` row-major ``[[y, x], ...]`` with ``n = n_rows · n_cols``.
grid_dims : tuple of int
``(n_rows, n_cols)``.
Examples
--------
>>> coords, dims = get_rect_coords(n_rows=10, n_cols=10)
>>> coords.shape
(100, 2)
>>> dims
(10, 10)
"""
y = np.arange(n_rows)
x = np.arange(n_cols)
yy, xx = np.meshgrid(y, x, indexing="ij")
return np.column_stack([yy.ravel(), xx.ravel()]), (n_rows, n_cols)
[docs]
def get_visium_coords(n_rows: int = 78, n_cols: int = 64) -> tuple[np.ndarray, tuple[int, int]]:
"""Generate Visium-like hexagonal array indices.
Offset-column layout: even rows use even ``array_col`` indices, odd rows
use odd indices. Each row holds ``n_cols`` spots.
Parameters
----------
n_rows : int, default 78
Number of array rows (78 for a full Visium v1 slide).
n_cols : int, default 64
Spots per row.
Returns
-------
coords : np.ndarray
``(n_rows · n_cols, 2)`` integer ``[[row, col], ...]``.
grid_dims : tuple of int
``(n_rows, n_cols)``.
See Also
--------
convert_visium_to_physical : Map these indices to physical ``(y, x)``.
Examples
--------
>>> coords, dims = get_visium_coords(n_rows=78, n_cols=64)
>>> coords.shape[0]
4992
"""
coords = []
for r in range(n_rows):
start_col = 0 if r % 2 == 0 else 1
for i in range(n_cols):
coords.append([r, start_col + 2 * i])
return np.array(coords), (n_rows, n_cols)
[docs]
def convert_visium_to_physical(coords: np.ndarray) -> np.ndarray:
"""Convert integer Visium ``(row, col)`` indices to physical ``(y, x)``.
Hexagonal geometry with equilateral triangles: ``Δcol = 2 → Δx = 1`` and
``Δrow = 1 → Δy = √3/2``.
Parameters
----------
coords : np.ndarray
``(n, 2)`` ``[[row, col], ...]`` (integers, from
:func:`get_visium_coords` or ``adata.obs[['array_row','array_col']]``).
Returns
-------
np.ndarray
``(n, 2)`` ``[[y, x], ...]`` with ``y = row · √3/2``, ``x = col / 2``.
Examples
--------
>>> coords = np.array([[0, 0], [0, 2], [1, 1]])
>>> convert_visium_to_physical(coords)
array([[0. , 0. ],
[0. , 1. ],
[0.8660254, 0.5 ]])
"""
rows = coords[:, 0]
cols = coords[:, 1]
phys_y = rows * np.sqrt(3) / 2.0
phys_x = cols * 0.5
return np.column_stack([phys_y, phys_x])
[docs]
def compute_torus_distance_matrix(
phys_coords: np.ndarray, domain_dims: tuple[float, float]
) -> np.ndarray:
"""Pairwise Euclidean distances on a rectangular torus.
Each pair's distance is the minimum over direct and wrap-around offsets:
``d_wrapped = min(|Δp|, D - |Δp|)`` per axis.
Parameters
----------
phys_coords : np.ndarray
``(n, 2)`` ``[[y, x], ...]``.
domain_dims : tuple of float
Periodic domain extent ``(height, width)``.
Returns
-------
np.ndarray
``(n, n)`` pairwise distances.
Examples
--------
>>> coords = np.array([[0.0, 0.0], [1.0, 0.0], [9.9, 0.0]])
>>> dists = compute_torus_distance_matrix(coords, (10.0, 10.0))
>>> round(dists[0, 2], 2)
0.1
"""
domain_h, domain_w = domain_dims
diff_y = np.abs(phys_coords[:, None, 0] - phys_coords[None, :, 0])
diff_x = np.abs(phys_coords[:, None, 1] - phys_coords[None, :, 1])
wrapped_dy = np.minimum(diff_y, domain_h - diff_y)
wrapped_dx = np.minimum(diff_x, domain_w - diff_x)
return np.sqrt(wrapped_dy**2 + wrapped_dx**2)
# ---------------------------------------------------------------------------
# Visium I/O and hex → regular-grid rasterization
# ---------------------------------------------------------------------------
#: Physical center-to-center distance between Visium v1 (6.5 mm capture) spots, in μm.
[docs]
VISIUM_V1_SPOT_SPACING_UM: float = 100.0
[docs]
def visium_hex_spacing_um(
spot_spacing_um: float = VISIUM_V1_SPOT_SPACING_UM,
grid: str = "dense",
) -> tuple[float, float]:
"""Physical ``(dy, dx)`` per grid cell for a Visium hex raster.
Parameters
----------
spot_spacing_um : float, default 100.0
Center-to-center distance between adjacent spots (100 μm for Visium v1).
grid : {'dense', 'collapsed'}, default 'dense'
Rasterization mode — see :func:`visium_to_grid`.
Returns
-------
tuple of float
``(dy, dx)`` in micrometres per grid cell.
Raises
------
ValueError
If ``grid`` is unknown.
"""
dy = spot_spacing_um * np.sqrt(3.0) / 2.0
if grid == "dense":
dx = spot_spacing_um / 2.0
elif grid == "collapsed":
dx = spot_spacing_um
else:
raise ValueError(f"grid must be 'dense' or 'collapsed', got '{grid}'.")
return float(dy), float(dx)
def _read_tissue_positions(spatial_dir: Path) -> pd.DataFrame:
"""Read ``tissue_positions_list.csv`` (SR v1) or ``tissue_positions.csv`` (v2)."""
candidates = [
spatial_dir / "tissue_positions_list.csv", # Space Ranger < 2.0, no header
spatial_dir / "tissue_positions.csv", # Space Ranger >= 2.0, with header
]
for path in candidates:
if path.exists():
break
else:
raise FileNotFoundError(f"No tissue_positions[_list].csv found in {spatial_dir}.")
if path.name == "tissue_positions_list.csv":
return pd.read_csv(
path,
header=None,
names=[
"barcode",
"in_tissue",
"array_row",
"array_col",
"pxl_row_in_fullres",
"pxl_col_in_fullres",
],
)
return pd.read_csv(path)
[docs]
def load_visium_sample( # noqa: C901
path: str | Path,
matrix_path: str | Path | None = None,
in_tissue_only: bool = True,
) -> "anndata.AnnData": # noqa: F821, UP037
"""Load a Visium Space Ranger output directory as :class:`anndata.AnnData`.
Accepts either the flat layout (``<path>/<sample>_filtered_feature_bc_matrix.h5``
+ ``<path>/spatial/``) or the canonical Space Ranger ``outs/`` layout
(``<path>/filtered_feature_bc_matrix.h5`` + ``<path>/spatial/``); auto-detects.
Parameters
----------
path : str or Path
Directory holding the filtered matrix and ``spatial/`` subfolder.
matrix_path : str or Path, optional
Explicit path to the filtered ``.h5`` matrix. Defaults to
``<path>/filtered_feature_bc_matrix.h5`` or ``*_filtered_feature_bc_matrix.h5``.
in_tissue_only : bool, default True
Restrict to spots with ``in_tissue == 1``.
Returns
-------
anndata.AnnData
``adata.obs`` has ``in_tissue``, ``array_row``, ``array_col``,
``pxl_row_in_fullres``, ``pxl_col_in_fullres``.
``adata.obsm['spatial']`` holds ``(pxl_col, pxl_row)`` full-resolution
pixel coords. ``adata.uns['spatial']`` stores ``scalefactors_json.json``
and the source path.
Raises
------
FileNotFoundError
If the matrix or spatial folder cannot be located.
ImportError
If :mod:`anndata` / :mod:`scanpy` are not installed.
"""
try:
import anndata # noqa: F401
import scanpy as sc
except ImportError as e: # pragma: no cover
raise ImportError(
"load_visium_sample requires scanpy + anndata. "
"Install with `pip install 'quadsv[spatial]'` or `pip install scanpy`."
) from e
path = Path(path)
if not path.is_dir():
raise FileNotFoundError(f"{path} is not a directory.")
spatial_dir = path / "spatial"
if not spatial_dir.is_dir():
alt = path / "outs" / "spatial"
if alt.is_dir():
spatial_dir = alt
path = path / "outs"
else:
raise FileNotFoundError(f"No spatial/ subfolder found under {path}.")
if matrix_path is None:
mp = path / "filtered_feature_bc_matrix.h5"
if not mp.exists():
matches = sorted(path.glob("*_filtered_feature_bc_matrix.h5"))
if not matches:
raise FileNotFoundError(f"No filtered_feature_bc_matrix.h5 found under {path}.")
mp = matches[0]
else:
mp = Path(matrix_path)
logger.info("Reading Visium matrix: %s", mp)
adata = sc.read_10x_h5(mp)
adata.var_names_make_unique()
tp = _read_tissue_positions(spatial_dir).set_index("barcode")
missing = set(adata.obs_names) - set(tp.index)
if missing:
warnings.warn(
f"{len(missing)} barcodes in matrix lack entries in tissue_positions; "
"they will be dropped.",
UserWarning,
stacklevel=2,
)
adata = adata[adata.obs_names.isin(tp.index)].copy()
tp = tp.reindex(adata.obs_names)
for col in ("in_tissue", "array_row", "array_col", "pxl_row_in_fullres", "pxl_col_in_fullres"):
adata.obs[col] = tp[col].astype(int if col == "in_tissue" else float).values
adata.obsm["spatial"] = np.column_stack(
[tp["pxl_col_in_fullres"].to_numpy(), tp["pxl_row_in_fullres"].to_numpy()]
)
scalefactors_path = spatial_dir / "scalefactors_json.json"
if scalefactors_path.exists():
with scalefactors_path.open() as f:
scalefactors = json.load(f)
else:
scalefactors = {}
adata.uns["spatial"] = {"scalefactors": scalefactors, "path": str(path)}
if in_tissue_only:
adata = adata[adata.obs["in_tissue"].to_numpy().astype(bool)].copy()
logger.info(
"Loaded Visium sample: %d spots (%d in tissue), %d genes.",
adata.n_obs,
int(adata.obs["in_tissue"].sum()) if "in_tissue" in adata.obs else adata.n_obs,
adata.n_vars,
)
return adata
def _fill_nearest_hex_neighbor(grid: np.ndarray, mask: np.ndarray) -> np.ndarray:
"""Fill empty cells of a Visium hex raster from same-row neighbours.
For the alternating-parity layout, empty cells at ``(r, c)`` with
``c % 2 != r % 2`` have two real spot neighbours on the same row at
``(r, c±1)``. Average them (fall back to one side at edges).
"""
out = grid.copy()
ny, nx = mask.shape
empty = np.argwhere(~mask)
if empty.size == 0:
return out
for r, c in empty:
left = out[..., r, c - 1] if c - 1 >= 0 and mask[r, c - 1] else None
right = out[..., r, c + 1] if c + 1 < nx and mask[r, c + 1] else None
if left is not None and right is not None:
out[..., r, c] = 0.5 * (left + right)
elif left is not None:
out[..., r, c] = left
elif right is not None:
out[..., r, c] = right
return out
[docs]
def visium_to_grid( # noqa: C901
adata: "anndata.AnnData", # noqa: F821, UP037
genes: list[str] | None = None,
layer: str | None = None,
grid: str = "dense",
fill: str = "nearest",
spot_spacing_um: float = VISIUM_V1_SPOT_SPACING_UM,
max_row: int | None = None,
max_col: int | None = None,
) -> tuple[np.ndarray, tuple[float, float]]:
"""Rasterize a Visium ``adata`` onto a regular rectangular grid.
Rasterization modes:
- ``grid='dense'`` (default): ``(78, 128)`` array filled from real spots on
half the cells; remaining cells imputed from their two nearest hex
neighbours on the same row. Physical spacing ``(100·√3/2, 50)`` μm.
Exact hex geometry preserved.
- ``grid='collapsed'``: ``(78, 64)`` array using ``array_col // 2`` as
column index. Physical spacing ``(100·√3/2, 100)`` μm. Faster, but
drops the 50 μm horizontal offset between alternating rows
(≤5 % geometric distortion).
Parameters
----------
adata : anndata.AnnData
Must carry ``adata.obs['array_row']`` and ``adata.obs['array_col']``
(e.g. from :func:`load_visium_sample`).
genes : list of str, optional
Gene subset. ``None`` → all ``adata.var_names``.
layer : str, optional
``adata.layers`` key. ``None`` → ``adata.X``.
grid : {'dense', 'collapsed'}, default 'dense'
fill : {'nearest', 'zero'}, default 'nearest'
Empty-cell handling. ``'nearest'`` averages the two row-neighbour spots
(avoids FFT aliasing); ``'zero'`` leaves cells at 0. Ignored when
``grid='collapsed'``.
spot_spacing_um : float, default 100.0
max_row, max_col : int, optional
Pad output to a common size; defaults to the maxima observed in ``adata``.
Returns
-------
grid_arr : np.ndarray
``(n_genes, ny, nx)`` float64, ready for
:func:`quadsv.power_spectrum_2d`.
spacing_um : tuple of float
``(dy, dx)`` in μm.
Raises
------
KeyError
If ``array_row`` / ``array_col`` are absent from ``adata.obs``.
ValueError
If ``grid`` / ``fill`` are unknown, or ``layer`` is missing.
"""
if "array_row" not in adata.obs or "array_col" not in adata.obs:
raise KeyError(
"adata.obs must contain 'array_row' and 'array_col'. "
"Load the sample with quadsv.utils.load_visium_sample first."
)
if grid not in ("dense", "collapsed"):
raise ValueError(f"grid must be 'dense' or 'collapsed', got '{grid}'.")
if fill not in ("nearest", "zero"):
raise ValueError(f"fill must be 'nearest' or 'zero', got '{fill}'.")
rows = adata.obs["array_row"].to_numpy().astype(int)
cols = adata.obs["array_col"].to_numpy().astype(int)
ny = int(rows.max() + 1) if max_row is None else max_row
if grid == "dense":
nx = int(cols.max() + 1) if max_col is None else max_col
else: # collapsed: use array_col // 2 as column index
nx = int(cols.max() // 2 + 1) if max_col is None else max_col
if layer is None:
X = adata.X
else:
if layer not in adata.layers:
raise ValueError(f"layer '{layer}' not found in adata.layers.")
X = adata.layers[layer]
if sp.issparse(X):
X = X.toarray()
X = np.asarray(X, dtype=np.float64)
if genes is not None:
gene_idx = [adata.var_names.get_loc(g) for g in genes]
X = X[:, gene_idx]
n_genes = X.shape[1]
out = np.zeros((n_genes, ny, nx), dtype=np.float64)
filled = np.zeros((ny, nx), dtype=bool)
if grid == "dense":
for i in range(X.shape[0]):
r, c = rows[i], cols[i]
if 0 <= r < ny and 0 <= c < nx:
out[:, r, c] = X[i]
filled[r, c] = True
if fill == "nearest":
out = _fill_nearest_hex_neighbor(out, filled)
else: # collapsed
for i in range(X.shape[0]):
r, c = rows[i], cols[i] // 2
if 0 <= r < ny and 0 <= c < nx:
out[:, r, c] = X[i]
spacing = visium_hex_spacing_um(spot_spacing_um=spot_spacing_um, grid=grid)
logger.info(
"Rasterized %d genes x %d spots onto %s grid of shape (%d, %d), spacing=%s μm.",
n_genes,
adata.n_obs,
grid,
ny,
nx,
spacing,
)
return out, spacing