"""
A logical, read-only coordinator for TileDB-backed multi-dimensional datasets.
This class synchronizes slicing and metadata retrieval across multiple out-of-core
components:
- Assays: A dictionary of `cellarr-array` objects (Dense or Sparse).
- Row Data: An aligned `cellarr-frame` for row-wise annotations.
- Column Data: An aligned `cellarr-frame` for column-wise annotations.
CellArraySE maintains data on disk, performing synchronized "lazy" slices that
return standard in-memory `summarizedexperiment.SummarizedExperiment` objects
only when requested.
"""
from functools import cached_property
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from cellarr_array import SparseCellArray
from cellarr_array.core import CellArray
from cellarr_frame import CellArrayFrame
from summarizedexperiment import SummarizedExperiment
__author__ = "chanjd"
__copyright__ = "chanjd"
__license__ = "MIT"
def _get_frame_index(frame: CellArrayFrame) -> pd.Index:
    """Build the row index of ``frame`` without reading any of its columns.

    Two layouts are handled:

    - Sparse frames expose explicit index values (integer or string) through
      ``frame.index``; byte labels are decoded to ``str``.
    - Dense frames expose no index values, so a 0-based style ``RangeIndex``
      is derived from the array's non-empty domain.

    Note:
        Only single ``CellArrayFrame`` objects with one dimension are
        supported for now.

    Args:
        frame:
            The frame whose index should be materialised.

    Returns:
        A ``pd.Index`` of labels for sparse frames, or a ``pd.RangeIndex``
        for dense frames (empty when the array holds no data).
    """
    index_frame = frame.index

    if index_frame.empty:
        # Dense frame: nothing stored in the index, fall back to the domain
        # that has actually been written.
        with frame.open_array(mode="r") as tdb_array:
            domain = tdb_array.nonempty_domain()
        if not (domain and domain[0]):
            return pd.RangeIndex(start=0, stop=0)
        first, last = domain[0]
        # The non-empty domain is inclusive on both ends.
        return pd.RangeIndex(start=first, stop=last + 1)

    # Sparse frame: labels may come back as bytes and are decoded to strings.
    label_column = frame.index_names[0]
    labels = []
    for raw in index_frame[label_column]:
        labels.append(raw.decode() if isinstance(raw, bytes) else raw)
    return pd.Index(labels, name=label_column)
#: Type alias for subset keys accepted by ``__getitem__`` and ``slice()``.
#:
#: Supported types:
#: - ``int``: Single index (e.g., ``5`` or ``-1`` for the last element)
#: - ``slice``: Range of indices (e.g., ``slice(0, 10)`` or ``0:10`` in brackets);
#:   slices carrying a step are rejected when the key is resolved
#: - ``List[int]``: Multiple indices (e.g., ``[0, 2, 4]``)
#: - ``str``: Single name matching row_names/col_names
#: - ``List[str]``: Multiple names (e.g., ``["gene1", "gene2"]``)
#: - ``None``: Select all elements in that dimension
SubsetKey = int | slice | List[int] | str | List[str] | None
def _validate_input(assays: Dict[str, CellArray], row_data: CellArrayFrame, col_data: CellArrayFrame):
    """Check that the assays and both metadata frames fit together.

    The checks run in this order:

    1. ``assays`` is a ``dict`` and is not empty.
    2. Every value in ``assays`` is a ``CellArray``.
    3. ``row_data`` and ``col_data`` are ``CellArrayFrame`` instances.
    4. Every assay has shape ``(len(row index), len(column index))``, which
       also forces all assays to share one shape.

    Args:
        assays:
            Mapping of assay name to assay handle.
        row_data:
            Frame holding row annotations.
        col_data:
            Frame holding column annotations.

    Raises:
        TypeError:
            If inputs are not the expected types.
        ValueError:
            If assays is empty or shapes don't match.
    """
    # --- container checks ---
    if not isinstance(assays, dict):
        raise TypeError(f"Assays must be a dictionary, got {type(assays)}.")
    if len(assays) == 0:
        raise ValueError("The 'assays' dictionary cannot be empty.")

    # --- element type checks ---
    for assay_name, handle in assays.items():
        if not isinstance(handle, CellArray):
            raise TypeError(f"Assay '{assay_name}' must be a CellArray instance, got {type(handle)}.")

    for label, frame in (("row_data", row_data), ("col_data", col_data)):
        if not isinstance(frame, CellArrayFrame):
            raise TypeError(f"{label} must be a CellArrayFrame instance, got {type(frame)}.")

    # --- shape checks: the frame indices define the expected assay shape ---
    expected_shape = (
        len(_get_frame_index(row_data)),
        len(_get_frame_index(col_data)),
    )
    for assay_name, handle in assays.items():
        if handle.shape != expected_shape:
            raise ValueError(f"Assay '{assay_name}' shape {handle.shape} != {expected_shape}.")
class CellArraySE:
    """Read-only coordinator for TileDB-backed assays and aligned metadata frames.

    The object keeps references to the assay handles and to the row/column
    metadata frames and produces in-memory
    ``summarizedexperiment.SummarizedExperiment`` objects on demand, either
    through :meth:`slice` or through bracket notation (``se[rows, cols]``).

    Shape, index names and column listings are cached on first access, so the
    handles are expected not to change after construction.

    Attributes:
        assays: Mapping of assay name to its ``CellArray`` handle.
        row_data: ``CellArrayFrame`` with row annotations.
        col_data: ``CellArrayFrame`` with column annotations.
    """

    # Integer types accepted as positional keys (plain ints and NumPy scalars).
    _INTEGER_TYPES = (int, np.integer)

    def __init__(
        self,
        assays: Dict[str, CellArray],
        row_data: CellArrayFrame,
        col_data: CellArrayFrame,
    ):
        """Initialize the SE coordinator with existing TileDB-backed handles.

        Args:
            assays:
                Dictionary mapping assay names to CellArray objects.
                All assays must have the same shape.
            row_data:
                CellArrayFrame containing row metadata.
                Number of rows must match assay row count.
            col_data:
                CellArrayFrame containing column metadata.
                Number of rows must match assay column count.

        Raises:
            TypeError:
                If ``assays`` is not a dict of CellArray objects, or a
                metadata frame is not a CellArrayFrame.
            ValueError:
                If ``assays`` is empty or shapes don't match.
        """
        _validate_input(assays, row_data, col_data)
        self.assays = assays
        self.row_data = row_data
        self.col_data = col_data

    # --- Shape & Dim Getters ---

    @cached_property
    def shape(self) -> Tuple[int, int]:
        """Number of rows and columns as (n_rows, n_cols)."""
        # All assays were validated to share one shape, so the first is enough.
        first_assay = next(iter(self.assays.values()))
        return first_assay.shape

    @property
    def dims(self) -> Tuple[int, int]:
        """Alias for shape."""
        return self.shape

    # --- Index Accessors ---

    @cached_property
    def row_names(self) -> pd.Index:
        """Index values for rows metadata table."""
        return _get_frame_index(self.row_data)

    @cached_property
    def col_names(self) -> pd.Index:
        """Index values for columns metadata table."""
        return _get_frame_index(self.col_data)

    # --- Metadata Discovery ---

    @cached_property
    def assay_names(self) -> List[str]:
        """Names of available assays."""
        return list(self.assays.keys())

    @cached_property
    def row_columns(self) -> List[str]:
        """Column names of the metadata fields for row_data."""
        return self.row_data.column_names

    @cached_property
    def col_columns(self) -> List[str]:
        """Column names of the metadata fields for column_data."""
        return self.col_data.column_names

    def _get_assay(self, assay_name: str) -> CellArray:
        """Return the handle for ``assay_name`` or raise a descriptive KeyError."""
        if assay_name not in self.assays:
            raise KeyError(f"Assay '{assay_name}' not found.")
        return self.assays[assay_name]

    def is_sparse(self, assay_name: str) -> bool:
        """Check if an assay is sparse.

        Args:
            assay_name:
                Name of the assay to check.

        Returns:
            True if the assay is a SparseCellArray, False otherwise.

        Raises:
            KeyError:
                If the assay name does not exist.
        """
        return isinstance(self._get_assay(assay_name), SparseCellArray)

    def get_assay_type(self, assay_name: str) -> np.dtype:
        """Get the data type of an assay matrix.

        Args:
            assay_name:
                Name of the assay.

        Returns:
            NumPy dtype of the assay's first attribute.

        Raises:
            KeyError:
                If the assay name does not exist.
        """
        handle = self._get_assay(assay_name)
        # Only the schema is consulted; the array is closed again on exit.
        with handle.open_array(mode="r") as tdb_array:
            return tdb_array.schema.attr(0).dtype

    # --- cellarr-se Object Summary ---

    def show(self, n: int = 5):
        """Display a summary of the experiment structure and metadata.

        Args:
            n:
                Number of rows to display from row_data and col_data.
                Defaults to 5.
        """
        print(f"CellArraySE Object | {self.shape[0]} rows x {self.shape[1]} cols")
        print(f"Assays: {', '.join(self.assay_names)}")
        print("\n--- Row Data ---")
        # Use row_names for slicing to support string-indexed frames
        row_subset = list(self.row_names[:n])
        print(self.row_data[row_subset])
        print("\n--- Column Data ---")
        col_subset = list(self.col_names[:n])
        print(self.col_data[col_subset])

    # --- Subsetting/Slicing ---

    @staticmethod
    def _is_string_indexed(names: pd.Index) -> bool:
        """Return True when ``names`` is non-empty and its first label is a str."""
        return len(names) > 0 and isinstance(names[0], str)

    @staticmethod
    def _normalize_position(position, dim_size: int) -> int:
        """Resolve one (possibly negative) position to a bounds-checked int."""
        resolved = int(position)
        if resolved < 0:
            resolved += dim_size
        if resolved < 0 or resolved >= dim_size:
            raise IndexError(f"Index {resolved} out of bounds for dimension of size {dim_size}.")
        return resolved

    @staticmethod
    def _name_to_position(name: str, names: pd.Index):
        """Look up the position of ``name`` in ``names``; KeyError if absent."""
        if name not in names:
            raise KeyError(f"Name '{name}' not found.")
        return names.get_loc(name)

    def _resolve_key_to_indices(
        self,
        key: SubsetKey,
        names: pd.Index,
        dim_size: int,
    ) -> List[int]:
        """Convert any subset key type to a list of integer positions.

        This normalizes the various ways users can specify subsets (int, slice,
        name, list) into a consistent list of integer indices. These indices
        are then used to slice both metadata frames and assay matrices.

        Args:
            key:
                The subset specification. Can be:

                - None: select all
                - int / NumPy integer: single position (supports negative indexing)
                - slice: range of positions (no step support); bounds are
                  clamped to the dimension like built-in sequence slicing
                - str: single name lookup
                - List[int]: multiple positions
                - List[str]: multiple name lookups
            names:
                Index of the dimension (row_names or col_names) for name lookups.
            dim_size:
                Size of the dimension for bounds checking and negative index resolution.

        Returns:
            List of integer positions.

        Raises:
            IndexError:
                If a position is out of bounds or a slice has a step.
            KeyError:
                If a name is not found in the index.
            TypeError:
                If the key type is not supported or a list mixes types.
        """
        if key is None:
            return list(range(dim_size))

        if isinstance(key, self._INTEGER_TYPES):
            return [self._normalize_position(key, dim_size)]

        if isinstance(key, slice):
            if key.step is not None:
                raise IndexError("Slice steps (strides) are not supported.")
            # slice.indices() resolves None/negative bounds and clamps them to
            # [0, dim_size], so out-of-range slices never yield invalid positions.
            start, stop, _ = key.indices(dim_size)
            return list(range(start, stop))

        if isinstance(key, str):
            return [self._name_to_position(key, names)]

        if isinstance(key, list):
            if not key:
                return []
            first = key[0]
            if isinstance(first, self._INTEGER_TYPES):
                expected = self._INTEGER_TYPES
            elif isinstance(first, str):
                expected = str
            else:
                expected = type(first)
            if not all(isinstance(item, expected) for item in key):
                raise TypeError("List elements must all be the same type, got mixed types.")
            if expected is self._INTEGER_TYPES:
                return [self._normalize_position(item, dim_size) for item in key]
            if expected is str:
                return [self._name_to_position(item, names) for item in key]
            raise TypeError(f"List elements must be int or str, got {type(first)}.")

        raise TypeError(f"Unsupported key type: {type(key)}. Expected int, slice, str, List[int], or List[str].")

    def _subset_frame(
        self,
        handle: CellArrayFrame,
        subset: SubsetKey = None,
        query: Optional[str] = None,
        columns: Optional[List[str]] = None,
        names: Optional[pd.Index] = None,
        dim_size: Optional[int] = None,
    ) -> pd.DataFrame:
        """Subset a CellArrayFrame using either positional/name-based subset or TileDB query.

        Args:
            handle: The CellArrayFrame to subset.
            subset: Positional or name-based subset key.
            query: TileDB query condition string.
            columns: Columns to select from the frame.
            names: Index of the frame, used for name lookups. Read from
                ``handle`` when omitted and a subset has to be resolved.
            dim_size: Dimension size for bounds checks. Defaults to
                ``len(names)`` when omitted.

        Returns:
            Subsetted DataFrame.

        Raises:
            ValueError: If both ``subset`` and ``query`` are given.
        """
        if subset is not None and query is not None:
            raise ValueError("Cannot specify both 'subset' and 'query'. Use one or the other.")

        # Query-based filtering
        if query is not None:
            if columns is not None:
                return handle[query, columns]
            return handle[query]

        # No subset - return all rows
        if subset is None:
            if columns is not None:
                return handle[:, columns]
            return handle[:]

        # The defaults are None; derive what the resolver needs from the frame
        # itself instead of failing on len(None) / None arithmetic.
        if names is None:
            names = _get_frame_index(handle)
        if dim_size is None:
            dim_size = len(names)

        indices = self._resolve_key_to_indices(subset, names, dim_size)

        # String-indexed frames are addressed by label, so translate the
        # resolved positions back to names.
        if self._is_string_indexed(names):
            if indices and isinstance(indices[0], self._INTEGER_TYPES):
                indices = [names[i] for i in indices]

        if columns is not None:
            return handle[indices, columns]
        return handle[indices]

    def _subset_dimension(
        self,
        handle: CellArrayFrame,
        subset: SubsetKey,
        query: Optional[str],
        columns: Optional[List[str]],
        names: pd.Index,
        dim_size: int,
    ) -> Tuple[pd.DataFrame, List[int]]:
        """Subset one metadata frame and compute the matching assay positions.

        Returns:
            ``(frame, positions)`` where ``positions`` index the assay axis
            that corresponds to ``handle``.
        """
        if query is None:
            # Resolve positions straight from the key rather than from the
            # returned DataFrame index, which is reset to 0-based for dense
            # frames and cannot be trusted.
            positions = self._resolve_key_to_indices(subset, names, dim_size)
            frame = self._subset_frame(
                handle,
                subset=subset,
                columns=columns,
                names=names,
                dim_size=dim_size,
            )
            return frame, positions

        # Query path: subset the frame first, then map the preserved string
        # labels of the result back to positions.
        frame = self._subset_frame(
            handle,
            query=query,
            columns=columns,
            names=names,
            dim_size=dim_size,
        )
        positions = [names.get_loc(label) for label in frame.index.tolist()]
        return frame, positions

    def slice(
        self,
        row_subset: SubsetKey = None,
        col_subset: SubsetKey = None,
        row_query: Optional[str] = None,
        col_query: Optional[str] = None,
        assays: Optional[List[str]] = None,
        row_columns: Optional[List[str]] = None,
        col_columns: Optional[List[str]] = None,
    ) -> SummarizedExperiment:
        """Slice the CellArraySE to produce an in-memory SummarizedExperiment.

        This method provides full control over subsetting, including TileDB query
        support. For simple positional/name-based access, use bracket notation
        instead (e.g., ``se[0:10, 0:5]``).

        Args:
            row_subset: Row subset key. Accepted types:

                - ``int``: Single index (e.g., ``5``, ``-1`` for last)
                - ``slice``: Range (e.g., ``slice(0, 10)``)
                - ``List[int]``: Multiple indices (e.g., ``[0, 2, 4]``)
                - ``str``: Single name matching row_names
                - ``List[str]``: Multiple names
                - ``None``: Select all rows (default)
            col_subset: Column subset key. Same types as row_subset.
            row_query: TileDB query string for row filtering (e.g.,
                ``"gene_type == 'protein'"``). Mutually exclusive with
                row_subset. Requires a string-indexed row frame.
            col_query: TileDB query string for column filtering. Mutually
                exclusive with col_subset. Requires a string-indexed column frame.
            assays: List of assay names to include. If None, includes all assays.
            row_columns: List of row metadata columns to include. If None, includes all.
            col_columns: List of column metadata columns to include. If None, includes all.

        Returns:
            SummarizedExperiment with the requested subset of data.

        Raises:
            ValueError: If both subset and query are specified for the same
                dimension, or a query targets a frame that is not string-indexed.
            KeyError: If assay name or row/column name is not found.
            IndexError: If index is out of bounds.

        Examples:
            Basic positional subsetting::

                subset = se.slice(row_subset=slice(0, 100), col_subset=slice(0, 50))

            Select specific assays and metadata columns::

                subset = se.slice(
                    row_subset=[0, 1, 2],
                    col_subset=slice(0, 10),
                    assays=["counts"],
                    row_columns=["gene_id", "gene_name"],
                )

            Filter using TileDB query strings::

                # Get protein-coding genes from liver samples
                subset = se.slice(
                    row_query="gene_type == 'protein'",
                    col_query="tissue == 'liver'",
                )

            Combine a row query with a positional column subset::

                subset = se.slice(
                    row_query="gene_type == 'protein'",
                    col_subset=[0, 1, 2],
                    assays=["counts", "tpm"],
                )
        """
        # All validation happens up front, before any frame or assay is read.
        if assays is not None:
            for name in assays:
                if name not in self.assays:
                    raise KeyError(f"Assay '{name}' not found. Available: {self.assay_names}")

        if row_subset is not None and row_query is not None:
            raise ValueError("Cannot specify both 'subset' and 'query'. Use one or the other.")
        if col_subset is not None and col_query is not None:
            raise ValueError("Cannot specify both 'subset' and 'query'. Use one or the other.")

        # The query path maps result labels back to positions, which only works
        # when the frame carries string labels; otherwise use positional slicing.
        if row_query is not None and not self._is_string_indexed(self.row_names):
            raise ValueError("row_query requires a string-indexed row frame.")
        if col_query is not None and not self._is_string_indexed(self.col_names):
            raise ValueError("col_query requires a string-indexed column frame.")

        df_row, row_indices = self._subset_dimension(
            self.row_data, row_subset, row_query, row_columns, self.row_names, self.shape[0]
        )
        df_col, col_indices = self._subset_dimension(
            self.col_data, col_subset, col_query, col_columns, self.col_names, self.shape[1]
        )

        assay_names_to_use = assays if assays is not None else self.assay_names
        sub_assays = {name: self.assays[name][row_indices, col_indices] for name in assay_names_to_use}

        return SummarizedExperiment(
            assays=sub_assays,
            row_data=df_row,
            column_data=df_col,
        )

    def __getitem__(self, key: Tuple[SubsetKey, SubsetKey]) -> SummarizedExperiment:
        """Subset using bracket notation: ``se[rows, cols]``.

        This method provides simple positional and name-based subsetting. For
        advanced filtering with TileDB query strings, use :meth:`slice` instead.

        Supported key types (see :data:`SubsetKey`):

        - ``int``: Single index (e.g., ``se[0, 5]``); NumPy integers also work
        - ``slice``: Range (e.g., ``se[0:10, 0:5]``)
        - ``List[int]``: Multiple indices (e.g., ``se[[0, 1, 2], [3, 4]]``)
        - ``str``: Single name (e.g., ``se["gene1", "sample1"]``)
        - ``List[str]``: Multiple names (e.g., ``se[["gene1", "gene2"], ["s1", "s2"]]``)

        Note:
            Query-based filtering is **not** supported via bracket notation.
            Use ``se.slice(row_query="...", col_query="...")`` for TileDB queries.

        Args:
            key: A 2-tuple of (row_key, col_key).

        Returns:
            SummarizedExperiment with the requested subset.

        Raises:
            ValueError: If key is not a 2-tuple.
            TypeError: If key types are not supported.
            IndexError: If indices are out of bounds.
            KeyError: If names are not found.

        Examples:
            ::

                # Slice by position
                subset = se[0:100, 0:50]
                # Single indices
                single = se[5, 3]
                # List of indices
                subset = se[[0, 2, 4], [1, 3]]
                # For query-based filtering, use slice():
                subset = se.slice(row_query="gene_type == 'protein'")
        """
        if not isinstance(key, tuple) or len(key) != 2:
            raise ValueError(
                "Slicing requires a 2-dimensional tuple (e.g., se[0:10, 0:5]). "
                "For query-based filtering, use se.slice(row_query=..., col_query=...)."
            )
        row_key, col_key = key

        # Reject unsupported key types early with an axis-specific message.
        valid_types = (*self._INTEGER_TYPES, slice, str, list, type(None))
        if not isinstance(row_key, valid_types):
            raise TypeError(f"Row key must be int, slice, str, or list, got {type(row_key)}.")
        if not isinstance(col_key, valid_types):
            raise TypeError(f"Column key must be int, slice, str, or list, got {type(col_key)}.")

        return self.slice(row_subset=row_key, col_subset=col_key)

    def __repr__(self) -> str:
        """String representation showing shape and assay names."""
        return f"<CellArraySE: {self.shape[0]}x{self.shape[1]} | {', '.join(self.assay_names)}>"