try:
from types import EllipsisType
except ImportError:
# TODO: This is required for Python <3.10. Remove once Python 3.9 reaches EOL in October 2025
EllipsisType = type(...)
from typing import List, Optional, Tuple, Union
import numpy as np
import tiledb
from ..utils.config import CellArrConfig
# Package metadata: author, copyright holder, and license identifier.
__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"
[docs]
def create_cellarray(
    uri: str,
    shape: Optional[Tuple[Optional[int], ...]] = None,
    attr_dtype: Optional[Union[str, np.dtype]] = None,
    sparse: bool = False,
    mode: Optional[str] = None,
    config: Optional[CellArrConfig] = None,
    dim_names: Optional[List[str]] = None,
    dim_dtypes: Optional[List[Union[str, np.dtype]]] = None,
    attr_name: str = "data",
    **kwargs,
):
    """Factory function to create a new TileDB cell array.

    Builds a 1- or 2-dimensional TileDB schema with a single attribute,
    creates the array at ``uri`` and returns a wrapper object opened on it.

    Args:
        uri:
            Array URI.

        shape:
            Optional array shape. If None or contains None, the missing
            extents of integer dimensions are filled with the maximum value
            of the dimension dtype. Entries for non-integer (string)
            dimensions are ignored.

        attr_dtype:
            Data type for the attribute. Defaults to float32.

        sparse:
            Whether to create a sparse array.

        mode:
            Array open mode. Defaults to None for automatic switching.

        config:
            Optional configuration. A default ``CellArrConfig`` is used
            when omitted.

        dim_names:
            Optional list of dimension names. Defaults to ``dim_0``,
            ``dim_1``, ...

        dim_dtypes:
            Optional list of dimension dtypes. Defaults to numpy's uint32.

        attr_name:
            Name of the data attribute.

        **kwargs:
            Additional arguments for array creation. Accepted for
            compatibility; this function does not currently forward them.

    Returns:
        CellArray instance (``SparseCellArray`` when ``sparse`` is True,
        otherwise ``DenseCellArray``).

    Raises:
        ValueError: If dimensions are invalid or inputs are inconsistent.
    """
    config = config or CellArrConfig()

    # TileDB's schema objects and ``Array.create`` expect a ``tiledb.Ctx`` for
    # their ``ctx`` argument, not a ``tiledb.Config``. Build the context from
    # the configured settings; the Config itself is still what is handed to
    # the CellArray constructors via ``config_or_context``.
    if config.ctx_config:
        tiledb_config = tiledb.Config(config.ctx_config)
        tiledb_ctx = tiledb.Ctx(tiledb_config)
    else:
        tiledb_config = None
        tiledb_ctx = None

    if attr_dtype is None:
        attr_dtype = np.float32
    if isinstance(attr_dtype, str):
        attr_dtype = np.dtype(attr_dtype)

    if shape is None and dim_dtypes is None:
        raise ValueError("Either 'shape' or 'dim_dtypes' must be provided.")

    if shape is not None and len(shape) not in (1, 2):
        raise ValueError("Shape must have 1 or 2 dimensions.")

    # Set dimension dtypes, defaults to numpy uint32
    if dim_dtypes is None:
        dim_dtypes = [np.uint32] * len(shape)
    else:
        if len(dim_dtypes) not in (1, 2):
            raise ValueError("Array must have 1 or 2 dimensions.")
        dim_dtypes = [np.dtype(dt) if isinstance(dt, str) else dt for dt in dim_dtypes]

    if shape is None:
        # Derive the shape from the dtypes; string dimensions have no extent.
        shape = tuple(np.iinfo(dt).max if np.issubdtype(dt, np.integer) else None for dt in dim_dtypes)
    elif len(shape) != len(dim_dtypes):
        # Validate before the zip() below: zip silently truncates to the
        # shorter input, which would otherwise hide the mismatch.
        raise ValueError("Lengths of 'shape', 'dim_dtypes', and 'dim_names' must match.")

    if None in shape:
        # Fill unspecified extents of integer dimensions with the dtype max.
        shape = tuple(
            np.iinfo(dt).max if s is None and np.issubdtype(dt, np.integer) else s for s, dt in zip(shape, dim_dtypes)
        )

    if dim_names is None:
        dim_names = [f"dim_{i}" for i in range(len(shape))]

    if not (len(shape) == len(dim_dtypes) == len(dim_names)):
        raise ValueError("Lengths of 'shape', 'dim_dtypes', and 'dim_names' must match.")

    dims = []
    for name, s, dt in zip(dim_names, shape, dim_dtypes):
        if np.issubdtype(dt, np.integer):
            domain = (0, 0 if s == 0 else s - 1)
            if s == np.iinfo(dt).max:
                # If domain is maxed out, we cannot set a tile extent
                # or TileDB will fail on domain expansion.
                tile = None
            else:
                # Half the dimension, capped by half the tile capacity, but
                # never below 1: a size-1 dimension (1 // 2 == 0) or a tiny
                # tile capacity must not yield a zero tile extent.
                tile = max(1, min(s // 2, config.tile_capacity // 2))
            dim_dtype = dt
        else:  # Assumes string or object dtype
            domain = (None, None)
            tile = None
            dim_dtype = "ascii"

        dims.append(
            tiledb.Dim(
                name=name,
                domain=domain,
                tile=tile,
                dtype=dim_dtype,
                filters=config.coords_filters,
                ctx=tiledb_ctx,
            )
        )

    dom = tiledb.Domain(*dims, ctx=tiledb_ctx)

    attr_obj = tiledb.Attr(
        name=attr_name,
        dtype=attr_dtype,
        # Attribute-specific filters win; the "" key acts as the fallback entry.
        filters=config.attrs_filters.get(attr_name, config.attrs_filters.get("", None)),
        ctx=tiledb_ctx,
    )

    schema = tiledb.ArraySchema(
        domain=dom,
        attrs=[attr_obj],
        cell_order=config.cell_order,
        tile_order=config.tile_order,
        sparse=sparse,
        offsets_filters=config.offsets_filters,
        ctx=tiledb_ctx,
    )

    tiledb.Array.create(uri, schema, ctx=tiledb_ctx)

    # Imported here rather than at module level, as in the original code.
    from .dense import DenseCellArray
    from .sparse import SparseCellArray

    return (
        SparseCellArray(uri=uri, attr=attr_name, mode=mode, config_or_context=tiledb_config)
        if sparse
        else DenseCellArray(uri=uri, attr=attr_name, mode=mode, config_or_context=tiledb_config)
    )
[docs]
class SliceHelper:
"""Helper class for handling array slicing operations."""
[docs]
@staticmethod
def is_contiguous_indices(indices: List[int]) -> Optional[slice]:
"""Checks if a list of indices is contiguous and can be converted to a slice.
Returns None if the list is not contiguous or contains non-integers.
"""
if not indices:
return None
if not all(isinstance(i, (int, np.integer)) for i in indices):
return None
sorted_indices = sorted(list(set(indices)))
if not sorted_indices:
return None
if len(sorted_indices) == 1:
return slice(sorted_indices[0], sorted_indices[0] + 1, None)
diffs = np.diff(sorted_indices)
if np.all(diffs == 1):
return slice(sorted_indices[0], sorted_indices[-1] + 1, None)
return None
[docs]
@staticmethod
def normalize_index(
idx: Union[int, range, slice, List, str, EllipsisType],
dim_size: int,
dim_dtype: np.dtype,
) -> Union[slice, List, EllipsisType]:
"""Normalize index to handle negative indices and ensure consistency."""
is_string_dim = np.issubdtype(dim_dtype, np.str_) or np.issubdtype(dim_dtype, np.bytes_)
if is_string_dim:
if isinstance(idx, (str, bytes)):
return [idx]
if isinstance(idx, list) and all(isinstance(i, (str, bytes)) for i in idx):
return idx
if isinstance(idx, slice):
# For string dimensions, we do not normalize the slice with integer sizes
return idx
raise TypeError(f"Unsupported index type '{type(idx).__name__}' for string dimension.")
if isinstance(idx, EllipsisType):
return idx
if isinstance(idx, range):
idx = slice(idx.start, idx.stop, idx.step)
if isinstance(idx, slice):
start, stop, step = idx.start, idx.stop, idx.step
# Resolve None to full dimension slice parts
if start is None:
start = 0
if stop is None:
stop = dim_size
# Handle negative indices
if start < 0:
start += dim_size
if stop < 0:
stop += dim_size
# Clamping slice arguments to dimensions
stop = min(stop, dim_size)
start = max(0, start)
return slice(start, stop, step)
if isinstance(idx, list):
if not idx:
return []
# This check only applies to integer lists
if not all(isinstance(i, (int, np.integer)) for i in idx):
raise TypeError(
"List indices must be all integers or all strings, but got mixed types or non-string/int types."
)
norm_idx = [i if i >= 0 else dim_size + i for i in idx]
if any(i < 0 or i >= dim_size for i in norm_idx):
raise IndexError("List indices out of bounds for dimension size.")
return sorted(list(set(norm_idx)))
if isinstance(idx, (int, np.integer)):
norm_idx = int(idx)
if norm_idx < 0:
norm_idx += dim_size
if not (0 <= norm_idx < dim_size):
raise IndexError(f"Index {idx} out of bounds for dimension size.")
return slice(norm_idx, norm_idx + 1, None)
raise TypeError(f"Index type {type(idx)} not supported for normalization.")
[docs]
def create_group(output_path, group_name):
    """Create a TileDB group named ``group_name`` under ``output_path``.

    Args:
        output_path:
            Parent location of the group.

        group_name:
            Name of the group; appended to ``output_path`` after a "/".
    """
    group_uri = f"{output_path}/{group_name}"
    tiledb.group_create(group_uri)