# Source code for moca.wigoperations.query
"""Perform query on bigwig files
"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from builtins import object
import warnings
import numpy as np
import pyBigWig
from ..helpers import MocaException
class WigReader(object):
    """Class for reading and querying wigfiles

    Wraps a handle obtained from ``pyBigWig.open`` and exposes per-base score
    lookups for genomic intervals.  Instances can be used as context managers
    so that the underlying handle is released deterministically::

        with WigReader(path) as reader:
            scores = reader.query(intervals)
    """

    def __init__(self, wig_location):
        """
        Arguments
        ---------
        wig_location: Path to bigwig

        Raises
        ------
        MocaException
            If ``pyBigWig.open`` raises while opening ``wig_location``.
        """
        self.wig_location = wig_location
        try:
            self.wig = pyBigWig.open(self.wig_location)
        except Exception as e:
            raise MocaException('Error reading wig file: {}'.format(e))

    def __enter__(self):
        """Return the reader itself so it can be bound in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the handle on leaving a ``with`` block; never swallow errors."""
        self.close()
        return False

    def close(self):
        """Release the underlying bigwig handle.

        Safe to call more than once; after the first call ``self.wig`` is
        ``None`` and the reader can no longer be queried.
        """
        wig = getattr(self, 'wig', None)
        if wig is not None:
            wig.close()
            self.wig = None

    def query(self, intervals):
        """ Query regions for scores

        Arguments
        ---------
        intervals: list of tuples
            A list of tuples with the following format: (chr, chrStart, chrEnd, strand)

        Returns
        -------
        scores: np.array
            A numpy array containing scores for each tuple. Intervals whose
            chromosome is absent from the bigwig are skipped (with a
            ``UserWarning``) and contribute no row. When all intervals have
            the same width the result is a regular 2-D array; otherwise it
            is a 1-D object array holding one list of scores per interval.

        Raises
        ------
        MocaException
            If an interval's start or end lies beyond the chromosome length.
        """
        scores = []
        # Fetch the chromosome sizes once; they do not change between intervals.
        chrom_lengths = self.get_chromosomes
        for chrom, chromStart, chromEnd, strand in intervals:
            # Direct dict membership test: no need to materialise the keys.
            if chrom not in chrom_lengths:
                warnings.warn('Chromosome {} does not appear in the bigwig'.format(chrom), UserWarning)
                continue
            chrom_length = chrom_lengths[chrom]
            # Convert each coordinate exactly once, keeping the original
            # order: validate the start before even parsing the end.
            start = int(chromStart)
            if start > chrom_length:
                raise MocaException('Chromsome start point exceeds chromosome length: {}>{}'.format(chromStart, chrom_length))
            end = int(chromEnd)
            if end > chrom_length:
                raise MocaException('Chromsome end point exceeds chromosome length: {}>{}'.format(chromEnd, chrom_length))
            score = self.wig.values(chrom, start, end)
            if strand == '-':
                # Minus-strand intervals are reported in reverse order.
                score.reverse()
            scores.append(score)
        return self._as_array(scores)

    @staticmethod
    def _as_array(scores):
        """Convert a list of per-interval score lists into a numpy array.

        Rows of unequal length cannot form a rectangular array, and recent
        NumPy releases refuse to build one implicitly. In that case an
        explicit 1-D object array with one list per interval is returned.
        """
        if len(set(len(score) for score in scores)) > 1:
            ragged = np.empty(len(scores), dtype=object)
            for index, score in enumerate(scores):
                ragged[index] = score
            return ragged
        return np.array(scores)

    @property
    def get_chromosomes(self):
        """Return list of chromsome and their sizes
        as in the wig file

        Returns
        -------
        chroms: dict
            Dictionary with {"chr": "Length"} format
        """
        return self.wig.chroms()