import warnings
from typing import List, Literal, Optional, Union
import numpy as np
import pandas as pd
import tiledb
from cellarr_array import SparseCellArray
from .base import CellArrayFrame
__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"
class SparseCellArrayFrame(CellArrayFrame):
"""Handler for sparse dataframes using a 2D sparse TileDB array.
This class wraps a `cellarr_array.SparseCellArray` instance, assuming
it's a 2D sparse array with string/object data.
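
    Example:
        A minimal usage sketch; ``"./sparse_frame"`` is a placeholder URI and the
        example assumes a compatible 2D sparse array has already been created there::

            frame = SparseCellArrayFrame(uri="./sparse_frame", mode="r")
            print(frame.shape)
            df = frame.read_dataframe()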
"""
def __init__(
self,
uri: Optional[str] = None,
tiledb_array_obj: Optional[tiledb.Array] = None,
mode: Optional[Literal["r", "w", "d", "m"]] = None,
config_or_context: Optional[Union[tiledb.Config, tiledb.Ctx]] = None,
):
"""Initialize the object.
Args:
uri:
URI to the array.
tiledb_array_obj:
Optional, an already opened ``tiledb.Array`` instance.
mode:
Default open mode.
config_or_context:
Optional config or context object.
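
        Example:
            A sketch of constructing the handler by URI; ``"./sparse_frame"`` is a
            placeholder, and an already opened ``tiledb.Array`` could be passed via
            ``tiledb_array_obj`` instead::

                frame = SparseCellArrayFrame(
                    uri="./sparse_frame",
                    mode="r",
                    config_or_context=tiledb.Ctx(),
                )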
"""
super().__init__(uri=uri, tiledb_array_obj=tiledb_array_obj, mode=mode, config_or_context=config_or_context)
self._array = SparseCellArray(
uri=self.uri,
tiledb_array_obj=tiledb_array_obj,
attr="value",
mode=mode,
config_or_context=self._ctx,
return_sparse=False,
)
def write_dataframe(self, df: pd.DataFrame, **kwargs) -> None:
"""Write a sparse pandas DataFrame to a 2D sparse TileDB array.
The DataFrame is converted to a coordinate format (row_idx, col_idx, value).
Args:
            df:
                The pandas DataFrame to write. ``NaN`` cells are treated as empty and skipped.
**kwargs:
Additional arguments for the write operation.
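
        Example:
            A sketch, assuming the array's dimensions are compatible with the
            DataFrame's index and column labels; the URI and column names are
            placeholders::

                import pandas as pd

                df = pd.DataFrame({"a": [1.0, None], "b": [None, 2.0]})
                frame = SparseCellArrayFrame(uri="./sparse_frame", mode="w")
                frame.write_dataframe(df)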
"""
if df is None:
return
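        # Stack to long (row, col) -> value format; NaN cells are dropped so only
        # populated entries are written as sparse coordinates.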
sdf = df.stack(future_stack=True).dropna()
if sdf.empty:
return
coords = sdf.index.to_frame()
rows = coords.iloc[:, 0].to_numpy()
cols = coords.iloc[:, 1].to_numpy()
values = sdf.to_numpy(dtype=str)
with self._array.open_array(mode="w") as A:
A[rows, cols] = values
def read_dataframe(
self,
        subset: Optional[Union[slice, int, str, List]] = None,
columns: Optional[List[str]] = None,
query: Optional[str] = None,
**kwargs,
) -> pd.DataFrame:
"""Read a pandas DataFrame from the TileDB array.
Args:
            subset:
                A slice, a single row label, or a list of row labels selecting rows.
columns:
A list of column names to read.
query:
A TileDB query condition string.
**kwargs:
Additional arguments for the read operation.
Returns:
The pandas DataFrame.
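
        Example:
            A sketch; the URI and column names are placeholders, and the sliced read
            assumes an integer row dimension::

                frame = SparseCellArrayFrame(uri="./sparse_frame", mode="r")
                full = frame.read_dataframe()
                part = frame.read_dataframe(subset=slice(0, 10), columns=["a", "b"])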
"""
if columns is not None:
final_columns = pd.Index(columns)
else:
final_columns = self.columns
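        # Work out the expected row index up front so the result can be reindexed
        # even when some requested rows have no stored cells.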
final_index = None
row_dim_dtype = self._array.dim_dtypes[0]
if subset is None:
if query is None:
final_index = self.index
elif isinstance(subset, slice):
if np.issubdtype(row_dim_dtype, np.integer):
start = subset.start if subset.start is not None else 0
if subset.stop is not None:
final_index = pd.RangeIndex(
start=start, stop=subset.stop, step=subset.step if subset.step is not None else 1
)
else:
all_rows = self.index
final_index = all_rows[all_rows.slice_indexer(start, None, subset.step)]
else:
all_rows = self.index
final_index = all_rows[subset]
subset = final_index.tolist()
elif isinstance(subset, (int, str)):
final_index = pd.Index([subset])
elif isinstance(subset, list):
final_index = pd.Index(subset)
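        # Build the key for the underlying 2D sparse array: either a TileDB query
        # condition string or (rows, columns) coordinates.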
if query:
slice_key = query
elif subset is not None and columns is not None:
slice_key = (subset, columns)
elif subset is not None:
slice_key = (subset, slice(None))
elif columns is not None:
slice_key = (slice(None), columns)
else:
slice_key = (slice(None), slice(None))
data = self._array[slice_key]
if not data or not data["value"].size:
return pd.DataFrame(index=final_index, columns=final_columns)
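        # The read returns dimension coordinates plus the "value" attribute; string
        # dimensions may come back as bytes and need decoding.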
rows = data[self._array.dim_names[0]]
cols = data[self._array.dim_names[1]]
values = data["value"]
if len(rows) > 0 and isinstance(rows[0], bytes):
rows = [r.decode() for r in rows]
if len(cols) > 0 and isinstance(cols[0], bytes):
cols = [c.decode() for c in cols]
s = pd.Series(values, index=[rows, cols])
s.index.names = self._array.dim_names
df = s.unstack()
df_index_to_use = final_index if final_index is not None else df.index
df = df.reindex(index=df_index_to_use, columns=final_columns)
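        # Values are stored as strings; opportunistically restore numeric dtypes for
        # the index, the columns, and each column's values.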
try:
df.index = pd.to_numeric(df.index)
except (ValueError, TypeError):
pass
try:
df.columns = pd.to_numeric(df.columns)
except (ValueError, TypeError):
pass
for col in df.columns:
try:
df[col] = pd.to_numeric(df[col])
except (ValueError, TypeError):
pass
if columns:
df = df[columns]
df.index.name = None
df.columns.name = None
return df
def get_shape(self) -> tuple:
"""Get the shape based on the non-empty domain for sparse arrays."""
with self._array.open_array(mode="r") as A:
non_empty = A.nonempty_domain()
if non_empty is None:
return (0, 0)
rows_ned, cols_ned = non_empty
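            # For string dimensions the non-empty domain gives no row/column count,
            # so count the distinct coordinate values instead.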
if np.issubdtype(self._array.dim_dtypes[0], np.str_):
n_rows = len(A.unique_dim_values(self._array.dim_names[0]))
else:
n_rows = rows_ned[1] + 1 if isinstance(rows_ned[1], (int, np.integer)) else 0
if np.issubdtype(self._array.dim_dtypes[1], np.str_):
n_cols = len(A.unique_dim_values(self._array.dim_names[1]))
else:
n_cols = cols_ned[1] + 1 if isinstance(cols_ned[1], (int, np.integer)) else 0
return (n_rows, n_cols)
def append_dataframe(self, df: pd.DataFrame, row_offset: Optional[int] = None) -> None:
"""Append data points from a pandas DataFrame to the sparse TileDB array.
        If ``row_offset`` is provided, the row indices of the appended data are
        shifted by that amount. Offset handling assumes an integer row dimension.
Args:
df:
The pandas DataFrame to write.
            row_offset:
                Offset added to the row coordinates before writing. If ``None``, it is
                inferred from the current number of rows (integer row dimensions only).
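
        Example:
            A sketch, assuming an integer row dimension so the offset can be applied;
            ``first_batch`` and ``second_batch`` stand in for any pandas DataFrames::

                frame = SparseCellArrayFrame(uri="./sparse_frame", mode="w")
                frame.write_dataframe(first_batch)
                # rows of ``second_batch`` are shifted past the current extent
                frame.append_dataframe(second_batch)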
"""
if df.empty:
return
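        # Infer the offset from the current number of rows when the row dimension
        # is integer-typed; otherwise append coordinates as-is.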
if row_offset is None:
row_dim_type = self._array.dim_dtypes[0]
if np.issubdtype(row_dim_type, np.integer):
current_shape = self.get_shape()
if current_shape[0] is not None and current_shape[0] > 0:
row_offset = current_shape[0]
else:
row_offset = 0
else:
row_offset = 0
sdf = df.stack(future_stack=True).dropna()
if sdf.empty:
return
coords = sdf.index.to_frame()
rows = coords.iloc[:, 0].to_numpy()
cols = coords.iloc[:, 1].to_numpy()
values = sdf.to_numpy(dtype=str)
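        # Shift row coordinates by the offset (only meaningful for integer row dimensions).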
rows_adjusted = rows
if row_offset != 0:
row_dim_type = self._array.dim_dtypes[0]
if np.issubdtype(row_dim_type, np.integer):
rows_adjusted = rows + row_offset
else:
print(f"Warning: Row offset {row_offset} ignored for non-integer row dimension.")
with self._array.open_array(mode="w") as A:
A[rows_adjusted, cols] = values
def __getitem__(self, key):
"""Optimized slicing for the DataFrame."""
if isinstance(key, str): # Column selection, e.g., df['col_A']
return self.read_dataframe(columns=[key])
if isinstance(key, list): # Column selection, e.g., df[['col_A', 'col_B']]
return self.read_dataframe(columns=key)
if isinstance(key, (slice, int)): # Row selection, e.g., df[0:10] or df[3]
return self.read_dataframe(subset=key)
if isinstance(key, tuple): # Row and column selection, e.g., df[0:10, ['col_A']]
rows, cols = key
cols_list = cols if isinstance(cols, list) else [cols]
# Support positional indexing for columns
if cols_list and all(isinstance(c, int) for c in cols_list):
all_cols = self.columns
try:
cols_list = [all_cols[i] for i in cols_list]
except IndexError:
raise IndexError("Column index out of bounds")
return self.read_dataframe(subset=rows, columns=cols_list)
raise TypeError(f"Unsupported key type for slicing: {type(key)}")
@property
def shape(self) -> tuple:
"""Get the shape (unique rows, unique columns) of the dataframe."""
return self.get_shape()
@property
def columns(self) -> pd.Index:
"""Get the column names (unique values from 2nd dim) of the dataframe."""
with self._array.open_array("r") as A:
cols = A.unique_dim_values(self._array.dim_names[1])
decoded_cols = [c.decode() if isinstance(c, bytes) else c for c in cols]
try:
return pd.Index(pd.to_numeric(decoded_cols))
except (ValueError, TypeError):
return pd.Index(decoded_cols)
@property
def index(self) -> pd.Index:
"""Get the row index (unique values from 1st dim) of the dataframe."""
with self._array.open_array("r") as A:
rows = A.unique_dim_values(self._array.dim_names[0])
decoded_rows = [r.decode() if isinstance(r, bytes) else r for r in rows]
try:
return pd.Index(pd.to_numeric(decoded_rows))
except (ValueError, TypeError):
return pd.Index(decoded_rows)