try:
from types import EllipsisType
except ImportError:
# TODO: This is required for Python <3.10. Remove once Python 3.9 reaches EOL in October 2025
EllipsisType = type(...)
from typing import List, Optional, Tuple, Union
import numpy as np
import tiledb
from ..utils.config import CellArrConfig
__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"
[docs]
def create_cellarray(
    uri: str,
    shape: Optional[Tuple[Optional[int], ...]] = None,
    attr_dtype: Optional[Union[str, np.dtype]] = None,
    sparse: bool = False,
    mode: Optional[str] = None,
    config: Optional[CellArrConfig] = None,
    dim_names: Optional[List[str]] = None,
    dim_dtypes: Optional[List[Union[str, np.dtype]]] = None,
    attr_name: str = "data",
    **kwargs,
):
    """Factory function to create a new TileDB cell array.

    The array is created on disk at ``uri`` and then opened through the
    matching wrapper class (dense or sparse).

    Args:
        uri:
            Array URI.

        shape:
            Optional array shape (1 or 2 dimensions). If None, or if an
            entry is None, the maximum value of the corresponding integer
            dimension dtype is used for that extent.

        attr_dtype:
            Data type for the attribute. Defaults to float32.

        sparse:
            Whether to create a sparse array.

        mode:
            Array open mode, forwarded unchanged to the returned wrapper.
            Defaults to None.

        config:
            Optional configuration. A default ``CellArrConfig`` is used
            when omitted.

        dim_names:
            Optional list of dimension names. Defaults to ``dim_0``,
            ``dim_1``.

        dim_dtypes:
            Optional list of dimension dtypes. Defaults to numpy's uint32.

        attr_name:
            Name of the data attribute.

        **kwargs:
            Accepted for call compatibility; not consumed by this function.

    Returns:
        A ``SparseCellArray`` if ``sparse`` is True, otherwise a
        ``DenseCellArray``.

    Raises:
        ValueError: If dimensions are invalid or inputs are inconsistent.
    """
    config = config or CellArrConfig()

    # TileDB schema objects take a ``tiledb.Ctx`` for their ``ctx`` argument,
    # not a ``tiledb.Config``. Keep both: the context is used to build the
    # schema, the config is handed to the wrapper class exactly as before.
    tiledb_config = tiledb.Config(config.ctx_config) if config.ctx_config else None
    tiledb_ctx = tiledb.Ctx(tiledb_config) if tiledb_config is not None else None

    if attr_dtype is None:
        attr_dtype = np.float32
    if isinstance(attr_dtype, str):
        attr_dtype = np.dtype(attr_dtype)

    if shape is None and dim_dtypes is None:
        raise ValueError("Either 'shape' or 'dim_dtypes' must be provided.")

    if shape is not None and len(shape) not in (1, 2):
        raise ValueError("Shape must have 1 or 2 dimensions.")

    # Set dimension dtypes, defaults to numpy uint32
    if dim_dtypes is None:
        dim_dtypes = [np.uint32] * len(shape)
    else:
        if len(dim_dtypes) not in (1, 2):
            raise ValueError("Array must have 1 or 2 dimensions.")
        dim_dtypes = [np.dtype(dt) if isinstance(dt, str) else dt for dt in dim_dtypes]

    if shape is None:
        shape = (None,) * len(dim_dtypes)
    elif len(shape) != len(dim_dtypes):
        # Check before zipping below: zip() would silently drop the extra
        # entries and the array would be created with the wrong rank.
        raise ValueError("Lengths of 'shape', 'dim_dtypes', and 'dim_names' must match.")

    # Replace missing extents with the largest value the dimension can hold.
    shape = tuple(
        np.iinfo(dt).max if extent is None and np.issubdtype(dt, np.integer) else extent
        for extent, dt in zip(shape, dim_dtypes)
    )
    if any(extent is None for extent in shape):
        raise ValueError(
            "Cannot infer the extent of a non-integer dimension; provide an explicit 'shape' entry for it."
        )

    if dim_names is None:
        dim_names = [f"dim_{i}" for i in range(len(shape))]

    # Validate all input lengths
    if not (len(shape) == len(dim_dtypes) == len(dim_names)):
        raise ValueError("Lengths of 'shape', 'dim_dtypes', and 'dim_names' must match.")

    dom = tiledb.Domain(
        *[
            tiledb.Dim(
                name=name,
                # supporting empty dimensions
                domain=(0, 0 if s == 0 else s - 1),
                # A tile extent of 0 is invalid, so never go below 1
                # (covers s in {0, 1} and a tile_capacity below 2).
                tile=max(1, min(s // 2, config.tile_capacity // 2)),
                dtype=dt,
            )
            for name, s, dt in zip(dim_names, shape, dim_dtypes)
        ],
        ctx=tiledb_ctx,
    )

    attr_obj = tiledb.Attr(
        name=attr_name,
        dtype=attr_dtype,
        # Per-attribute filters win; the "" key acts as the fallback entry.
        filters=config.attrs_filters.get(attr_name, config.attrs_filters.get("", None)),
        ctx=tiledb_ctx,
    )

    schema = tiledb.ArraySchema(
        domain=dom,
        attrs=[attr_obj],
        cell_order=config.cell_order,
        tile_order=config.tile_order,
        sparse=sparse,
        coords_filters=config.coords_filters,
        offsets_filters=config.offsets_filters,
        ctx=tiledb_ctx,
    )

    tiledb.Array.create(uri, schema, ctx=tiledb_ctx)

    # Import here to avoid circular imports
    from .dense import DenseCellArray
    from .sparse import SparseCellArray

    return (
        SparseCellArray(uri=uri, attr=attr_name, mode=mode, config_or_context=tiledb_config)
        if sparse
        else DenseCellArray(uri=uri, attr=attr_name, mode=mode, config_or_context=tiledb_config)
    )
[docs]
class SliceHelper:
    """Helper class for handling array slicing operations."""

    @staticmethod
    def is_contiguous_indices(indices: List[int]) -> Optional[slice]:
        """Return a slice equivalent to ``indices`` if they are contiguous.

        The indices are de-duplicated and sorted first, so order and
        repeats in the input do not matter.

        Args:
            indices:
                Integer positions to inspect.

        Returns:
            ``slice(min, max + 1)`` when the unique sorted indices form a
            run of consecutive integers, otherwise None. An empty input
            yields None.
        """
        if not indices:
            return None

        unique_sorted = sorted(set(indices))

        # np.diff of a single element is empty and np.all(empty) is True,
        # so a one-element input falls through to slice(x, x + 1).
        if np.all(np.diff(unique_sorted) == 1):
            return slice(unique_sorted[0], unique_sorted[-1] + 1, None)

        return None

    @staticmethod
    def normalize_index(
        idx: Union[int, range, slice, List[int], EllipsisType], dim_size: int
    ) -> Union[slice, List[int], EllipsisType]:
        """Normalize index to handle negative indices and ensure consistency.

        Args:
            idx:
                Index along one dimension. Ranges are converted to slices
                with the same start/stop/step. Negative values count from
                the end of the dimension.

            dim_size:
                Size of the dimension being indexed.

        Returns:
            - ``Ellipsis`` unchanged.
            - For slices and ranges: a slice whose start and stop are both
              clamped to ``[0, dim_size]``; step is passed through.
            - For lists: the sorted, de-duplicated non-negative indices.
            - For integers: a one-element slice ``slice(i, i + 1)``.

        Raises:
            IndexError: If an integer or list entry is out of bounds, or a
                slice starts past the end of a non-empty dimension.
            TypeError: If ``idx`` is of an unsupported type.
        """
        if isinstance(idx, EllipsisType):
            return idx

        # Convert ranges to slices
        if isinstance(idx, range):
            idx = slice(idx.start, idx.stop, idx.step)

        if isinstance(idx, slice):
            step = idx.step

            # Resolve None to full dimension slice parts
            start = 0 if idx.start is None else idx.start
            stop = dim_size if idx.stop is None else idx.stop

            # Handle negative indices
            if start < 0:
                start += dim_size
            if stop < 0:
                stop += dim_size

            # A start beyond the end of a non-empty dimension is an error.
            # start == dim_size is tolerated for a forward (or default)
            # step because arr[dim_size:] is a legitimate empty slice.
            if dim_size > 0 and start >= dim_size:
                is_empty_tail = start == dim_size and (step is None or step > 0)
                if not is_empty_tail:
                    raise IndexError(
                        f"Start index {idx.start if idx.start is not None else 'None'} results in {start}, which is out of bounds for dimension size {dim_size}."
                    )

            # Clamp both ends into [0, dim_size]. The lower clamp on stop
            # matters for e.g. slice(0, -20) on size 10, which is still
            # negative after the dim_size adjustment above.
            start = max(0, start)
            stop = max(0, min(stop, dim_size))

            return slice(start, stop, step)

        elif isinstance(idx, list):
            if not idx:
                return []

            norm_idx = [i if i >= 0 else dim_size + i for i in idx]
            if any(i < 0 or i >= dim_size for i in norm_idx):
                oob_indices = [orig_i for orig_i, norm_i in zip(idx, norm_idx) if not (0 <= norm_i < dim_size)]
                raise IndexError(
                    f"List indices {oob_indices} (original values) are out of bounds for dimension size {dim_size}."
                )

            # TileDB multi_index usually returns data sorted by coordinates
            return sorted(set(norm_idx))

        elif isinstance(idx, (int, np.integer)):
            norm_idx = int(idx)
            if norm_idx < 0:
                norm_idx += dim_size

            if not (0 <= norm_idx < dim_size):
                raise IndexError(f"Index {idx} out of bounds for dimension size {dim_size}")

            return slice(norm_idx, norm_idx + 1, None)

        else:
            raise TypeError(f"Index type {type(idx)} not supported for normalization.")
[docs]
def create_group(output_path, group_name):
    """Create a TileDB group at ``<output_path>/<group_name>``.

    Args:
        output_path:
            Parent location under which the group is created.

        group_name:
            Name of the group, appended to ``output_path`` with a ``/``.
    """
    group_uri = f"{output_path}/{group_name}"
    tiledb.group_create(group_uri)