Source code for cellarr_array.core.base

from abc import ABC, abstractmethod
from contextlib import contextmanager

try:
    from types import EllipsisType
except ImportError:
    # TODO: This is required for Python <3.10. Remove once Python 3.9 reaches EOL in October 2025
    EllipsisType = type(...)
from typing import Any, List, Literal, Optional, Tuple, Union

import numpy as np
import tiledb
from scipy import sparse

from ..utils.config import ConsolidationConfig
from .helpers import SliceHelper

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"


[docs] class CellArray(ABC): """Abstract base class for TileDB array operations."""
[docs] def __init__( self, uri: Optional[str] = None, tiledb_array_obj: Optional[tiledb.Array] = None, attr: str = "data", mode: Optional[Literal["r", "w", "d", "m"]] = None, config_or_context: Optional[Union[tiledb.Config, tiledb.Ctx]] = None, validate: bool = True, ): """Initialize the object. Args: uri: URI to the array. Required if 'tiledb_array_obj' is not provided. tiledb_array_obj: Optional, an already opened ``tiledb.Array`` instance. If provided, 'uri' can be None, and 'config_or_context' is ignored. attr: Attribute to access. Defaults to "data". mode: Open the array object in read 'r', write 'w', modify 'm' mode, or delete 'd' mode. Defaults to None for automatic mode switching. If 'tiledb_array_obj' is provided, this mode should ideally match the mode of the provided array or be None. config_or_context: Optional config or context object. Ignored if 'tiledb_array_obj' is provided, as context will be derived from the object. Defaults to None. validate: Whether to validate the attributes. Defaults to True. """ self._array_passed_in = False self._opened_array_external = None self._ctx = None if tiledb_array_obj is not None: if not isinstance(tiledb_array_obj, tiledb.Array): raise ValueError("'tiledb_array_obj' must be a tiledb.Array instance.") if not tiledb_array_obj.isopen: # Option 1: Raise error raise ValueError("If 'tiledb_array_obj' is provided, it must be an open tiledb.Array instance.") # Option 2: Try to reopen (less safe as we don't know original intent) # try: # tiledb_array_obj.reopen() # except tiledb.TileDBError as e: # raise ValueError( # f"Provided 'tiledb_array_obj' is closed and could not be reopened: {e}" # ) self.uri = tiledb_array_obj.uri self._array_passed_in = True self._opened_array_external = tiledb_array_obj # infer mode if possible, or require it matches if mode is not None and tiledb_array_obj.mode != mode: # we could try to reopen with the desired mode raise ValueError( f"Provided array mode '{tiledb_array_obj.mode}' does not match requested mode '{mode}'.", "Re-open the external array with the desired mode or pass matching mode.", ) self._mode = tiledb_array_obj.mode self._ctx = tiledb_array_obj.ctx elif uri is not None: self.uri = uri self._mode = mode self._array_passed_in = False self._opened_array_external = None if config_or_context is None: self._ctx = None elif isinstance(config_or_context, tiledb.Config): self._ctx = tiledb.Ctx(config_or_context) elif isinstance(config_or_context, tiledb.Ctx): self._ctx = config_or_context else: raise TypeError("'config_or_context' must be a TileDB Config or Ctx object.") else: raise ValueError("Either 'uri' or 'tiledb_array_obj' must be provided.") self._shape = None self._ndim = None self._dim_names = None self._attr_names = None self._nonempty_domain = None if validate: self._validate(attr=attr) self._attr = attr
def _validate(self, attr): with self.open_array(mode="r") as A: schema = A.schema if schema.ndim > 2: raise ValueError("Only 1D and 2D arrays are supported.") current_attr_names = [schema.attr(i).name for i in range(schema.nattr)] if attr not in current_attr_names: raise ValueError( f"Attribute '{attr}' does not exist in the array. Available attributes: {current_attr_names}." ) @property def mode(self) -> Optional[str]: """Get current array mode. If an external array is used, this is its open mode.""" if self._array_passed_in and self._opened_array_external is not None: return self._opened_array_external.mode return self._mode @mode.setter def mode(self, value: Optional[str]): """Set array mode for subsequent operations if not using an external array. This action does not affect an already passed-in external array's mode. """ if self._array_passed_in: # To change mode of an external array, user must reopen it and pass it again. current_ext_mode = self._opened_array_external.mode if self._opened_array_external else "unknown" if value != current_ext_mode: raise ValueError( f"Cannot change mode of an externally managed array (current: {current_ext_mode}). " "Re-open the external array with the new mode and re-initialize CellArray." ) if value is not None and value not in ["r", "w", "m", "d"]: raise ValueError("Mode must be one of: None, 'r', 'w', 'm', 'd'") self._mode = value @property def dim_names(self) -> List[str]: """Get dimension names of the array.""" if self._dim_names is None: with self.open_array(mode="r") as A: self._dim_names = [dim.name for dim in A.schema.domain] return self._dim_names @property def attr_names(self) -> List[str]: """Get attribute names of the array.""" if self._attr_names is None: with self.open_array(mode="r") as A: self._attr_names = [A.schema.attr(i).name for i in range(A.schema.nattr)] return self._attr_names @property def shape(self) -> Tuple[int, ...]: if self._shape is None: with self.open_array(mode="r") as A: self._shape = tuple(int(dim.domain[1] - dim.domain[0] + 1) for dim in A.schema.domain) return self._shape @property def nonempty_domain(self) -> Optional[Tuple[Any, ...]]: if self._nonempty_domain is None: with self.open_array(mode="r") as A: # nonempty_domain() can return None if the array is empty. ned = A.nonempty_domain() if ned is None: self._nonempty_domain = None else: self._nonempty_domain = tuple(ned) if isinstance(ned[0], tuple) else (ned,) return self._nonempty_domain @property def ndim(self) -> int: """Get number of dimensions.""" if self._ndim is None: with self.open_array(mode="r") as A: self._ndim = A.schema.ndim # self._ndim = len(self.shape) return self._ndim
[docs] @contextmanager def open_array(self, mode: Optional[str] = None): """Context manager for array operations. Uses the externally provided array if available, otherwise opens from URI. Args: mode: Desired mode for the operation ('r', 'w', 'm', 'd'). If an external array is used, this mode must be compatible with (or same as) the mode the external array was opened with. If None, uses the CellArray's default mode. """ if self._array_passed_in and self._opened_array_external is not None: if not self._opened_array_external.isopen: # Attempt to reopen if closed. This assumes the user might have closed it # and expects CellArr to reopen it if still possible. try: self._opened_array_external.reopen() except Exception as e: raise tiledb.TileDBError( f"Externally provided array is closed and could not be reopened: {e}" ) from e effective_mode = mode if mode is not None else self._opened_array_external.mode current_external_mode = self._opened_array_external.mode if effective_mode == "r" and current_external_mode not in ["r", "w", "m"]: # Read ops ok on write/modify modes pass elif effective_mode in ["w", "d"] and current_external_mode != effective_mode: raise tiledb.TileDBError( f"Requested operation mode '{effective_mode}' is incompatible with the " f"externally provided array's mode '{current_external_mode}'. " "Ensure the external array is opened in a compatible mode." ) # DO NOT close self._opened_array_external here; its lifecycle is managed by the user. yield self._opened_array_external else: effective_mode = mode if mode is not None else self.mode effective_mode = effective_mode if effective_mode is not None else "r" array = tiledb.open(self.uri, mode=effective_mode, ctx=self._ctx) try: yield array finally: array.close()
[docs] def __getitem__(self, key: Union[slice, EllipsisType, Tuple[Union[slice, List[int]], ...], EllipsisType]): """Get item implementation that routes to either direct slicing or multi_index based on the type of indices provided. Args: key: Slice or list of indices for each dimension in the array. """ if not isinstance(key, tuple): key = (key,) if len(key) > self.ndim: raise IndexError(f"Invalid number of dimensions: got {len(key)}, expected {self.ndim}") # Normalize all indices normalized_key = tuple(SliceHelper.normalize_index(idx, self.shape[i]) for i, idx in enumerate(key)) num_ellipsis = sum(isinstance(i, EllipsisType) for i in normalized_key) if num_ellipsis > 1: raise IndexError(f"Found more than 1 Ellipsis (...) in key: {normalized_key}") # Check if we can use direct slicing use_direct = all(isinstance(idx, (slice, EllipsisType)) for idx in normalized_key) if use_direct: return self._direct_slice(normalized_key) else: if num_ellipsis > 0: raise IndexError(f"tiledb does not support ellipsis in multi-index access: {normalized_key}") return self._multi_index(normalized_key)
@abstractmethod def _direct_slice(self, key: Tuple[Union[slice, EllipsisType], ...]) -> np.ndarray: """Implementation for direct slicing.""" pass @abstractmethod def _multi_index(self, key: Tuple[Union[slice, List[int]], ...]) -> np.ndarray: """Implementation for multi-index access.""" pass
[docs] def vacuum(self) -> None: """Remove deleted fragments from the array.""" tiledb.vacuum(self.uri)
[docs] def consolidate(self, config: Optional[ConsolidationConfig] = None) -> None: """Consolidate array fragments. Args: config: Optional consolidation configuration. """ if config is None: config = ConsolidationConfig() consolidation_cfg = tiledb.Config() consolidation_cfg["sm.consolidation.steps"] = config.steps consolidation_cfg["sm.consolidation.step_min_frags"] = config.step_min_frags consolidation_cfg["sm.consolidation.step_max_frags"] = config.step_max_frags consolidation_cfg["sm.consolidation.buffer_size"] = config.buffer_size consolidation_cfg["sm.mem.total_budget"] = config.total_budget tiledb.consolidate(self.uri, config=consolidation_cfg) if config.vacuum_after: self.vacuum()
[docs] @abstractmethod def write_batch(self, data: Union[np.ndarray, sparse.spmatrix], start_row: int, **kwargs) -> None: """Write a batch of data to the array starting at the specified row. Args: data: Data to write (numpy array for dense, scipy sparse matrix for sparse). start_row: Starting row index for writing. **kwargs: Additional arguments for write operation. """ pass