
"""Build the `GenomicArrayDatset`.

This modules provides tools for converting genomic data from BigWig format to TileDB.
It supports parallel processing for handling large collection of genomic datasets.

Example:

    .. code-block:: python

        import numpy as np
        import pandas as pd
        import pyBigWig as bw
        import tempfile

        from genomicarrays import build_genomicarray, MatrixOptions

        # Create a temporary directory
        tempdir = tempfile.mkdtemp()

        # Read BigWig objects
        bw1 = bw.open("path/to/object1.bw", "r")
        # or just provide the path
        bw2 = "path/to/object2.bw"

        features = pd.DataFrame({
            "seqnames": ["chr1", "chr1"],
            "starts": [1000, 2000],
            "ends": [1500, 2500]
        })

        # Path to the genome fasta, used to fill in interval sequences
        genome_fasta = "path/to/genome.fa"

        # Build GenomicArray
        dataset = build_genomicarray(
            features=features,
            output_path=tempdir,
            files=[bw1, bw2],
            genome_fasta=genome_fasta,
            matrix_options=MatrixOptions(dtype=np.float32),
        )
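
    A sketch of the same build, aggregating each interval into a fixed-length
    summary instead of per-base coverage. This assumes ``FeatureAnnotationOptions``
    is exported alongside ``MatrixOptions``; ``aggregate_function`` and
    ``expected_agg_function_length`` are the option fields this module reads:

    .. code-block:: python

        from genomicarrays import FeatureAnnotationOptions

        dataset = build_genomicarray(
            features=features,
            output_path=tempdir,
            files=[bw1, bw2],
            genome_fasta=genome_fasta,
            # hypothetical choice: mean coverage per interval
            feature_annotation_options=FeatureAnnotationOptions(
                aggregate_function=np.nanmean,
                expected_agg_function_length=1,
            ),
        )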
"""

import os
import warnings
from multiprocessing import Pool
from typing import Union

import pandas as pd
from cellarr import buildutils_tiledb_frame as utf
from pyfaidx import Fasta

from . import build_options as bopt
from . import buildutils_tiledb_array as uta
from . import utils_bw as ubw
from .GenomicArrayDataset import GenomicArrayDataset

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"


# TODO: Accept files as a dictionary with names to each dataset.
[docs] def build_genomicarray( files: list, output_path: str, features: Union[str, pd.DataFrame], genome_fasta: str, # genome: Union[str, pd.DataFrame] = "hg38", sample_metadata: Union[pd.DataFrame, str] = None, sample_metadata_options: bopt.SampleMetadataOptions = bopt.SampleMetadataOptions(), matrix_options: bopt.MatrixOptions = bopt.MatrixOptions(), feature_annotation_options: bopt.FeatureAnnotationOptions = bopt.FeatureAnnotationOptions(), optimize_tiledb: bool = True, num_threads: int = 1, ): """Generate the `GenomicArrayDatset`. All files are expected to be consistent and any modifications to make them consistent is outside the scope of this function and package. Args: files: List of file paths to `BigWig` files. output_path: Path to where the output TileDB files should be stored. features: A :py:class:`~pandas.DataFrame` containing the input genomic intervals.. Alternatively, may provide path to the file containing a list of intervals. In this case, the first row is expected to contain the column names, "seqnames", "starts" and "ends". Additionally, the file may contain a column `sequences`, to specify the sequence string for each region. Otherwise, provide a link to the fasta file using the ``genome_fasta`` parameter. genome_fasta: Path to a fasta file containing the sequence information. Sequence information will be updated for each region in ``features`` if the DataFrame/path does not contain a column `sequences`. sample_metadata: A :py:class:`~pandas.DataFrame` containing the sample metadata for each file in ``files``. Hences the number of rows in the dataframe must match the number of ``files``. Alternatively, may provide path to the file containing a concatenated sample metadata across all BigWig files. In this case, the first row is expected to contain the column names. Additionally, the order of rows is expected to be in the same order as the input list of ``files``. Defaults to `None`, in which case, we create a simple sample metadata dataframe containing the list of datasets, aka each BigWig files. Each dataset is named as ``sample_{i}`` where `i` refers to the index position of the object in ``files``. sample_metadata_options: Optional parameters when generating ``sample_metadata`` store. matrix_options: Optional parameters when generating ``matrix`` store. feature_annotation_options: Optional parameters when generating ``feature_annotation`` store. optimize_tiledb: Whether to run TileDB's vaccum and consolidation (may take long). num_threads: Number of threads. Defaults to 1. """ if not os.path.isdir(output_path): raise ValueError("'output_path' must be a directory.") #### ## Process genome information #### # if isinstance(genome, str): # chrom_sizes = pd.read_csv( # "https://hgdownload.soe.ucsc.edu/goldenpath/hg38/bigZips/hg38.chrom.sizes", # sep="\t", # header=None, # names=["chrom", "length"], # ) # elif isinstance(genome, pd.DataFrame): # chrom_sizes = genome # if "chrom" not in chrom_sizes: # raise ValueError("genome does not contain column: 'chrom'.") # if "length" not in chrom_sizes: # raise ValueError("genome does not contain column: 'length'.") # else: # raise TypeError( # "'genome' is not an expected type (either 'str' or 'Dataframe')." 
# ) #### ## Writing the features aka interval regions #### if isinstance(features, str): input_intervals = pd.read_csv(features, header=0) elif isinstance(features, pd.DataFrame): input_intervals = features.copy() required_cols = {"seqnames", "starts", "ends"} if not required_cols.issubset(input_intervals.columns): missing = required_cols - set(input_intervals.columns) raise ValueError(f"Missing required columns: {missing}") else: raise TypeError("'input_intervals' is not an expected type (either 'str' or 'Dataframe').") # append start index for each interval input_intervals["widths"] = input_intervals["ends"] - input_intervals["starts"] total_length = None outsize_per_feature = 1 if feature_annotation_options.aggregate_function is None: total_length = int(input_intervals["widths"].sum()) counter = input_intervals["widths"].shift(1) counter[0] = 0 input_intervals["genarr_feature_start_index"] = counter.cumsum().astype(int) else: outsize_per_feature = feature_annotation_options.expected_agg_function_length counter = [outsize_per_feature] * len(input_intervals) total_length = len(input_intervals) * outsize_per_feature counter[0] = 0 input_intervals["genarr_feature_start_index"] = pd.Series(counter).cumsum().astype(int) ends = input_intervals["genarr_feature_start_index"].shift(-1) ends.iloc[-1] = total_length input_intervals["genarr_feature_end_index"] = ends.astype(int) if not feature_annotation_options.skip: if "sequence" not in input_intervals.columns: gen_fa = Fasta(genome_fasta, as_raw=True) sequences = [] for row in input_intervals.itertuples(): seq = gen_fa.get_seq(row.seqnames, row.starts, row.ends) sequences.append(seq) input_intervals["sequences"] = sequences _col_types = utf.infer_column_types(input_intervals, feature_annotation_options.column_types) if "genarr_feature_index" not in input_intervals.columns: input_intervals["genarr_feature_index"] = range(0, len(input_intervals)) _feature_output_uri = f"{output_path}/{feature_annotation_options.tiledb_store_name}" utf.create_tiledb_frame_from_dataframe(_feature_output_uri, input_intervals, column_types=_col_types) if optimize_tiledb: uta.optimize_tiledb_array(_feature_output_uri) #### ## Writing the sample metadata file #### _samples = [] for idx, _ in enumerate(files): _samples.append(f"sample_{idx + 1}") if sample_metadata is None: warnings.warn( "Sample metadata is not provided, each dataset in 'files' is considered a sample", UserWarning, ) sample_metadata = pd.DataFrame({"genarr_sample": _samples}) elif isinstance(sample_metadata, str): sample_metadata = pd.read_csv(sample_metadata, header=0) if "genarr_sample" not in sample_metadata.columns: sample_metadata["genarr_sample"] = _samples elif isinstance(sample_metadata, pd.DataFrame): if "genarr_sample" not in sample_metadata.columns: sample_metadata["genarr_sample"] = _samples else: raise TypeError("'sample_metadata' is not an expected type.") if not sample_metadata_options.skip: _col_types = utf.infer_column_types(sample_metadata, sample_metadata_options.column_types) _sample_output_uri = f"{output_path}/{sample_metadata_options.tiledb_store_name}" utf.create_tiledb_frame_from_dataframe(_sample_output_uri, sample_metadata, column_types=_col_types) if optimize_tiledb: uta.optimize_tiledb_array(_sample_output_uri) #### ## Writing the genomic ranges file #### if not matrix_options.skip: _cov_uri = f"{output_path}/{matrix_options.tiledb_store_name}" uta.create_tiledb_array( _cov_uri, matrix_attr_name=matrix_options.matrix_attr_name, x_dim_dtype=feature_annotation_options.dtype, 
y_dim_dtype=sample_metadata_options.dtype, matrix_dim_dtype=matrix_options.dtype, x_dim_length=total_length, y_dim_length=len(files), is_sparse=False, ) all_bws_options = [ ( _cov_uri, input_intervals, bwpath, idx, feature_annotation_options.aggregate_function, total_length, outsize_per_feature, ) for idx, bwpath in enumerate(files) ] if num_threads > 1: with Pool(num_threads) as p: p.map(_wrapper_extract_bwinfo, all_bws_options) else: for opt in all_bws_options: _wrapper_extract_bwinfo(opt) if optimize_tiledb: uta.optimize_tiledb_array(_cov_uri) return GenomicArrayDataset( dataset_path=output_path, sample_metadata_uri=sample_metadata_options.tiledb_store_name, feature_annotation_uri=feature_annotation_options.tiledb_store_name, matrix_tdb_uri=matrix_options.tiledb_store_name, )
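
# Illustration of the feature index layout computed above (a worked example,
# not code executed by this module). With no aggregate function, intervals are
# laid end-to-end along the matrix's feature dimension using their widths:
#
#     starts = [1000, 2000], ends = [1500, 2500]  ->  widths = [500, 500]
#     genarr_feature_start_index = [0, 500]
#     genarr_feature_end_index   = [500, 1000]
#     total_length               = 1000
#
# With an aggregate function, each interval instead occupies
# `expected_agg_function_length` slots, so interval `i` maps to
# [i * outsize_per_feature, (i + 1) * outsize_per_feature).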

def _write_intervals_to_tiledb(outpath, intervals, bwpath, bwidx, agg_func, total_length, outsize_per_feature):
    """Wrapper to extract the data for the given intervals from the bigwig file and write the output to the tiledb
    file."""
    data = ubw.wrapper_extract_bw_values(
        bw_path=bwpath,
        intervals=intervals,
        agg_func=agg_func,
        total_length=total_length,
        outsize_per_feature=outsize_per_feature,
    )

    if data is not None and len(data) > 0:
        uta.write_frame_intervals_to_tiledb(outpath, data=data, y_idx=bwidx)


def _wrapper_extract_bwinfo(args):
    """Wrapper for multiprocessing multiple files and intervals."""
    counts_uri, input_intervals, bwpath, idx, agg_func, total_length, outsize_per_feature = args
    return _write_intervals_to_tiledb(
        counts_uri, input_intervals, bwpath, idx, agg_func, total_length, outsize_per_feature
    )
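

# A minimal sketch of reading the coverage matrix back without the
# `GenomicArrayDataset` wrapper, using TileDB's Python API directly. The store
# lives under `output_path` with the name configured via
# `MatrixOptions.tiledb_store_name`; "coverage" below is a hypothetical
# placeholder, not a documented default:
#
#     import tiledb
#
#     with tiledb.open(f"{output_path}/coverage") as arr:
#         # dense slice: first 100 feature positions of the first sample
#         chunk = arr[0:100, 0]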