Source code for genomicarrays.utils_bw

from typing import Optional, Tuple

import numpy as np
import pandas as pd
import pyBigWig as bw

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"


[docs] def extract_bw_values( bw_path: str, chrom: str, ) -> Tuple[np.ndarray, int]: bwfile = bw.open(bw_path) if chrom not in bwfile.chroms(): return None, None chrom_length = bwfile.chroms(chrom) data = bwfile.values(chrom, 0, chrom_length) return np.array(data), chrom_length
# def extract_aspd(): # df_data = pd.DataFrame(data, columns=["start", "end", "value"]) # df_data["chrom"] = row.chrom # data_nnz = df_data[df_data["value"] > 0] # data_nnz["tmp_group"] = ( # data_nnz["value"] != data_nnz["value"].shift() # ).cumsum() # results.append( # data_nnz.groupby("tmp_group").agg( # {"start": "first", "end": "last", "value": "first"} # ) # )
[docs] def wrapper_extract_bw_values( bw_path: str, intervals: pd.DataFrame, agg_func: Optional[callable], val_dtype: np.dtype = np.float32, total_length: int = None, outsize_per_feature: int = 1, ) -> np.ndarray: print("outsize_per_feature", outsize_per_feature) if total_length is None: total_length = len(intervals) if agg_func is not None: return extract_bw_values_as_vec( bw_path=bw_path, intervals=intervals, agg_func=agg_func, val_dtype=val_dtype, total_length=total_length, outsize_per_feature=outsize_per_feature, ) else: return extract_bw_intervals_as_vec( bw_path=bw_path, intervals=intervals, val_dtype=val_dtype, total_length=total_length )
[docs] def extract_bw_values_as_vec( bw_path: str, intervals: pd.DataFrame, total_length: int, agg_func: Optional[callable] = None, val_dtype: np.dtype = np.float32, outsize_per_feature: int = 1, ) -> np.ndarray: """Extract data from BigWig for a given region and apply the aggregate function. Args: bw_path: Path to the BigWig file. intervals: List of intervals to extract. agg_func: Aggregate function to apply. Defaults to None. val_dtype: Dtype of the resulting array. total_length: Size of all the regions. outsize_per_feature: Expected length of output after applying the ``agg_func``. Returns: A vector with length as number of intervals X outsize_per_feature, a value if the file contains the data for the corresponding region or ``np.nan`` if the region is not measured. """ bwfile = bw.open(bw_path) results = _get_empty_array(total_length, val_dtype=val_dtype) for i, row in enumerate(intervals.itertuples()): start_idx = i * outsize_per_feature if row.seqnames in bwfile.chroms(): try: data = bwfile.values(row.seqnames, row.starts, row.ends, numpy=True) if data is not None and len(data) != 0: results[start_idx : start_idx + outsize_per_feature] = agg_func(data).flatten() except Exception as _: pass return np.array(results, dtype=val_dtype)
def _get_empty_array(size, val_dtype): out_array = np.empty(size, dtype=val_dtype) out_array.fill(np.nan) return out_array
[docs] def extract_bw_intervals_as_vec( bw_path: str, intervals: pd.DataFrame, total_length: int, val_dtype: np.dtype = np.float32, ) -> np.ndarray: """Extract data from BigWig for a given region. Args: bw_path: Path to the BigWig file. intervals: List of intervals to extract. total_length: Size of all the regions. val_dtype: Dtype of the resulting array. Returns: A vector with length as the number of intervals, a value if the file contains the data for the corresponding region or ``np.nan`` if the region is not measured. """ bwfile = bw.open(bw_path) out_array = _get_empty_array(total_length, val_dtype=val_dtype) for row in intervals.itertuples(): if row.seqnames in bwfile.chroms(): try: data = bwfile.intervals(row.seqnames, row.starts, row.ends) tmp_out = _get_empty_array(row.ends - row.starts, val_dtype=val_dtype) if data is not None and len(data) != 0: for d in data: _strt = max(0, d[0] - row.starts) _end = min(d[1] - row.starts, row.ends) tmp_out[_strt:_end] = d[2] out_array[row.genarr_feature_index_start : row.genarr_feature_index_start + row.ends - row.starts] = ( tmp_out ) except Exception as _: pass return out_array