from typing import List, Optional, Tuple, Union
import numpy as np
import tiledb
from .config import CellArrConfig
__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"
[docs]
def create_cellarray(
    uri: str,
    shape: Optional[Tuple[Optional[int], ...]] = None,
    attr_dtype: Optional[Union[str, np.dtype]] = None,
    sparse: bool = False,
    mode: Optional[str] = None,
    config: Optional["CellArrConfig"] = None,
    dim_names: Optional[List[str]] = None,
    dim_dtypes: Optional[List[Union[str, np.dtype]]] = None,
    attr_name: str = "data",
    **kwargs,
):
    """Factory function to create a new TileDB cell array.

    The array schema is written to ``uri`` and the array is then wrapped in
    a ``DenseCellArray`` or ``SparseCellArray`` depending on ``sparse``.

    Args:
        uri:
            Array URI.

        shape:
            Optional array shape (1 or 2 dimensions). If None, or if an
            entry is None, the size of that dimension is taken from the
            maximum value of the matching integer dimension dtype.

        attr_dtype:
            Data type for the attribute. Defaults to float32.

        sparse:
            Whether to create a sparse array.

        mode:
            Array open mode, forwarded to the returned array object.
            Defaults to None.

        config:
            Optional configuration. A default ``CellArrConfig`` is used
            when omitted.

        dim_names:
            Optional list of dimension names. Defaults to
            ``dim_0``, ``dim_1``, ...

        dim_dtypes:
            Optional list of dimension dtypes. Defaults to uint32 for
            every dimension.

        attr_name:
            Name of the data attribute.

        **kwargs:
            Accepted for call-site compatibility; not used by this
            function.

    Returns:
        A ``SparseCellArray`` if ``sparse`` is True, otherwise a
        ``DenseCellArray``, opened on the newly created array.

    Raises:
        ValueError:
            If neither ``shape`` nor ``dim_dtypes`` is given, if the
            array is not 1D or 2D, if the lengths of ``shape``,
            ``dim_dtypes`` and ``dim_names`` disagree, if a dimension
            size is not positive, or if a missing size cannot be inferred
            because the dimension dtype is not an integer type.
    """
    config = config or CellArrConfig()

    if attr_dtype is None:
        attr_dtype = np.float32
    elif isinstance(attr_dtype, str):
        attr_dtype = np.dtype(attr_dtype)

    # Require either shape or dim_dtypes
    if shape is None and dim_dtypes is None:
        raise ValueError("Either 'shape' or 'dim_dtypes' must be provided.")

    if shape is not None and len(shape) not in (1, 2):
        raise ValueError("Only 1D and 2D arrays are supported.")

    # Set dimension dtypes, defaults to numpy uint32
    if dim_dtypes is None:
        dim_dtypes = [np.uint32] * len(shape)
    else:
        if len(dim_dtypes) not in (1, 2):
            raise ValueError("Only 1D and 2D arrays are supported.")
        dim_dtypes = [np.dtype(dt) if isinstance(dt, str) else dt for dt in dim_dtypes]

    # Every dimension size is inferred from its dtype when no shape is given.
    if shape is None:
        shape = (None,) * len(dim_dtypes)

    # Set dimension names
    if dim_names is None:
        dim_names = [f"dim_{i}" for i in range(len(shape))]

    # Validate lengths BEFORE zipping: zip() silently truncates to the
    # shortest input, which would otherwise drop dimensions unnoticed.
    if not (len(shape) == len(dim_dtypes) == len(dim_names)):
        raise ValueError("Lengths of 'shape', 'dim_dtypes', and 'dim_names' must match.")

    # Fill in unspecified sizes and reject sizes TileDB cannot represent.
    resolved_shape = []
    for name, size, dt in zip(dim_names, shape, dim_dtypes):
        if size is None:
            if not np.issubdtype(dt, np.integer):
                raise ValueError(
                    f"Cannot infer the size of dimension '{name}' from non-integer dtype "
                    f"'{dt}'; provide an explicit size in 'shape'."
                )
            size = np.iinfo(dt).max
        elif size <= 0:
            # The domain is (0, size - 1), so a non-positive size is invalid.
            raise ValueError(f"Size of dimension '{name}' must be positive, got {size}.")
        resolved_shape.append(size)
    shape = tuple(resolved_shape)

    # One context shared by the domain and the schema.
    ctx = tiledb.Ctx(config.ctx_config)

    dom = tiledb.Domain(
        *[
            tiledb.Dim(name=name, domain=(0, s - 1), tile=min(s, config.tile_capacity), dtype=dt)
            for name, s, dt in zip(dim_names, shape, dim_dtypes)
        ],
        ctx=ctx,
    )

    # Attribute-specific filters win; the "" key acts as the fallback entry.
    attr = tiledb.Attr(
        name=attr_name,
        dtype=attr_dtype,
        filters=config.attrs_filters.get(attr_name, config.attrs_filters.get("", None)),
    )

    schema = tiledb.ArraySchema(
        domain=dom,
        attrs=[attr],
        cell_order=config.cell_order,
        tile_order=config.tile_order,
        sparse=sparse,
        coords_filters=config.coords_filters,
        offsets_filters=config.offsets_filters,
        ctx=ctx,
    )

    tiledb.Array.create(uri, schema)

    # Import here to avoid circular imports
    from .DenseCellArray import DenseCellArray
    from .SparseCellArray import SparseCellArray

    # Return appropriate array type
    if sparse:
        return SparseCellArray(uri, attr=attr_name, mode=mode)
    return DenseCellArray(uri, attr=attr_name, mode=mode)
[docs]
class SliceHelper:
    """Static utilities for turning user-supplied indices into a canonical form."""

    @staticmethod
    def is_contiguous_indices(indices: List[int]) -> Optional[slice]:
        """Return the equivalent slice when ``indices`` increase by exactly one.

        Args:
            indices:
                Positions to inspect.

        Returns:
            ``slice(first, last + 1)`` if every consecutive difference is 1
            (a single index also qualifies); None for an empty input or
            any other pattern.
        """
        if indices and np.all(np.diff(indices) == 1):
            return slice(indices[0], indices[-1] + 1, None)
        return None

    @staticmethod
    def normalize_index(idx: Union[int, slice, List[int]], dim_size: int) -> Union[slice, List[int]]:
        """Resolve negative positions against ``dim_size`` and bounds-check them.

        Args:
            idx:
                An integer, a slice, a range (treated as a slice) or a
                list of integers.

            dim_size:
                Length of the dimension being indexed.

        Returns:
            A slice for integer, range and slice input (the step is passed
            through unchanged); a list of non-negative integers for list
            input.

        Raises:
            IndexError:
                If any resolved position lies outside the dimension.
        """
        # A range carries the same start/stop/step triple as a slice.
        if isinstance(idx, range):
            idx = slice(idx.start, idx.stop, idx.step)

        if isinstance(idx, slice):
            lo = 0 if idx.start is None else idx.start
            hi = dim_size if idx.stop is None else idx.stop

            # Negative bounds count back from the end of the dimension.
            if lo < 0:
                lo += dim_size
            if hi < 0:
                hi += dim_size

            # Both bounds may equal dim_size (an empty or end-anchored slice).
            if not 0 <= lo <= dim_size:
                raise IndexError(f"Start index {lo} out of bounds for dimension size {dim_size}")
            if not 0 <= hi <= dim_size:
                raise IndexError(f"Stop index {hi} out of bounds for dimension size {dim_size}")
            return slice(lo, hi, idx.step)

        if isinstance(idx, list):
            resolved = [pos if pos >= 0 else dim_size + pos for pos in idx]
            if not all(0 <= pos < dim_size for pos in resolved):
                raise IndexError(f"List indices {idx} out of bounds for dimension size {dim_size}")
            return resolved

        # Anything else is treated as a single integer position.
        position = idx if idx >= 0 else dim_size + idx
        if not 0 <= position < dim_size:
            raise IndexError(f"Index {idx} out of bounds for dimension size {dim_size}")
        return slice(position, position + 1, None)
[docs]
def create_group(output_path, group_name):
    """Create a TileDB group named ``group_name`` directly under ``output_path``.

    Args:
        output_path:
            Parent location; joined to the group name with a single ``/``.

        group_name:
            Name of the group to create.
    """
    group_uri = f"{output_path}/{group_name}"
    tiledb.group_create(group_uri)