"""Job Processor Module
"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import os
import warnings
import re
from ..helpers import ConfigurationParser
from ..helpers import run_job
from ..helpers import xstr
from ..helpers import get_cpu_count
from ..helpers.filename import touch
from ..wigoperations import WigReader
import numpy as np
[docs]class Pipeline(ConfigurationParser):
"""Generic class to run pipelines
Parameters
----------
config_file: string
Optional file input to load all configurations
"""
def __init__(self, config_file=None):
super(Pipeline, self).__init__(config_file)
self.commands_run = list()
#TODO This can be removed if config_file is optional
if not os.path.isfile(xstr(config_file)):
#TODO This should raise a warning and no xception
#raise MocaException('Config file {} not found'.format(config_file))
warnings.warn('No configuration file supplied. Defaults will be used.', UserWarning)
self.cpu_cores = get_cpu_count()
self.meme_default_params = '-dna -mod zoops -nmotifs 5 -minw 6 -maxw 30 -revcomp -nostatus -maxsize 1000000'
self.meme_strargs = None
self.meme_location = 'meme'
self.fimo_default_params = ''
self.fimo_strargs = None
self.fimo_location = 'fimo'
self.shuffler_location = 'fasta-shuffle-letters'
self.centrimo_args = None
self.centrimo_location = 'centrimo'
self.memechip_default_params = '-dna -meme-mod zoops -meme-nmotifs 5 -meme-minw 6 -meme-maxw 30 -meme-maxsize 1000000 -meme-p {}'.format(self.cpu_cores)
self.memechip_args = None
self.memechip_location = 'meme-chip'
self.commands_run = []
[docs] def can_run_meme_parallel(self, cmd):
stdout, stderr, exitcode = run_job(cmd=cmd, cwd='/tmp')
if '-p <np> given but Parallel MEME not configured.' in stderr:
return False
return True
[docs] def run_meme(self, fasta_in, out_dir=None, strargs=None):
"""Run meme
Run meme on a given input fasta
Parameters
---------
strargs: string
A concatenated string containing parameters as would be passed to standalone meme
Defualt parametes used: '-dna -revcomp -maxsize 1000000 -nmotifs 5 -p 4'
To modify: Pipeline.meme_default_params
fasta_in: string
Location of the fasta file
out_dir: string
Location to write all meme analysis output
Returns
-------
meme_out: string
Location of meme output
"""
self.meme_strargs = strargs
if not self.meme_strargs:
self.meme_strargs = self.meme_default_params
meme_binary = self.get_binary_path('meme').strip()
if not meme_binary or meme_binary == '':
# Use meme from envirnonment
meme_binary = 'meme'
else:
# Use absolute path meme
meme_binary += '/meme'
self.meme_location = meme_binary
if not out_dir:
out_dir = os.path.join(os.path.dirname(fasta_in), 'meme_out')
out_dir = os.path.abspath(out_dir)
regex = re.compile(r'-p.*')
print('#################'+self.meme_strargs)
if self.can_run_meme_parallel('{} -p 2'.format(meme_binary)) and not regex.findall(self.meme_strargs):
self.meme_strargs += ' -p {}'.format(self.cpu_cores)
cmd = '{} {} -oc {} {}'.format(self.meme_location, self.meme_strargs,
out_dir, os.path.abspath(fasta_in))
stdout, stderr, exitcode = run_job(cmd=cmd,
cwd=os.path.dirname(out_dir))
output = {'out_dir': out_dir, 'stdout': stdout,
'stderr': stderr, 'exitcode': exitcode,
'cmd': cmd}
self.commands_run.append({'cmd': cmd, 'metadata': output})
return output
[docs] def run_fimo(self, motif_file, motif_num, sequence_file, out_dir=None, strargs=None):
"""Run fimo to find out locations where motif occurs
Parameters
---------
motif_file: str
Path to meme.txt
motif_num: int
Motif number to investigate
sequence_file: str
Path to sequence file
out_dir: str
Location to output fimo results
strargs: str
string arguments as would be passed to fimo commandline
"""
#TODO This code is same as in the above fmethod. Make this a separate method?
self.fimo_strargs = strargs
if not out_dir:
out_dir = os.path.join(os.path.dirname(motif_file), 'fimo_out')
self.fimo_strargs = xstr(self.fimo_strargs) + ' --motif {} -oc {}'.format(motif_num, os.path.abspath(out_dir))
fimo_binary = self.get_binary_path('meme')
if not fimo_binary or fimo_binary == '':
# Use meme from envirnonment
fimo_binary = 'fimo'
else:
# Use absolute path meme
fimo_binary += '/fimo'
self.fimo_location = fimo_binary
cmd = '{}{} {} {}'.format(self.fimo_location, self.fimo_strargs,
os.path.abspath(motif_file),
os.path.abspath(sequence_file))
stdout, stderr, exitcode = run_job(cmd=cmd,
cwd=os.path.dirname(out_dir))
output = {'out_dir': out_dir, 'stdout': stdout,
'stderr': stderr, 'exitcode': exitcode,
'cmd': cmd}
self.commands_run.append({'cmd': cmd, 'metadata': output})
return output
[docs] def run_fasta_shuffler(self, fasta_in, fasta_out):
"""Run fasta-dinucleotide-shuffle to generate random fasta"""
shuffler_binary = self.get_binary_path('meme')
if not shuffler_binary or shuffler_binary == '':
# Use meme from envirnonment
shuffler_binary = 'fasta-shuffle-letters'
else:
# Use absolute path meme
shuffler_binary += '/fasta-shuffle-letters'
self.shuffler_location = shuffler_binary
cmd = '{} -kmer 2 -dna {} '.format(self.shuffler_location,
os.path.abspath(fasta_in))
stdout, stderr, exitcode = run_job(cmd=cmd,
cwd=os.path.dirname(fasta_out))
with open(os.path.abspath(fasta_out), 'w') as f:
f.write(stdout)
output = {'stdout': stdout,
'stderr': stderr,
'exitcode': exitcode,
'cmd': cmd}
return output
[docs] def run_centrimo(self, fasta_in, meme_file, out_dir):
"""Run centrimo
Parameters
----------
fasta_in: string
Path to input fasta
meme_file: string
Path to MEME's motif file
out_dir: string
Output directory
Returns
-------
output: dict
A dictionary with 'stderr,stdout,cmd,exitcode,out_dir'
"""
centrimo_binary = self.get_binary_path('meme').strip()
if not centrimo_binary or centrimo_binary=='':
centrimo_binary = 'centrimo'
else:
centrimo_binary += '/centrimo'
self.centrimo_location = centrimo_binary
if not out_dir:
out_dir = os.path.join(os.path.abspath(os.path.join(os.path.dirname(fasta_in), os.pardir)), 'centrimo_out')
else:
out_dir = os.path.abspath(out_dir)
cmd = '{} -oc {} {} {}'.format(self.centrimo_location, out_dir, os.path.abspath(fasta_in), os.path.abspath(meme_file))
stdout, stderr, exitcode = run_job(cmd=cmd,
cwd=os.path.dirname(out_dir))
output = {'out_dir': out_dir, 'stdout': stdout,
'stderr': stderr, 'exitcode': exitcode,
'cmd': cmd}
self.commands_run.append({'cmd': cmd, 'metadata': output})
return output
[docs] def run_memechip(self, fasta_in, out_dir=None, strargs=None):
"""Run meme-chip
Run meme-chip on a given input fasta
Parameters
---------
strargs: string
A concatenated string containing parameters as would be passed to standalone meme
Defualt parametes used: '-dna -revcomp -maxsize 1000000 -nmotifs 5 -p 4'
To modify: Pipeline.meme_default_params
fasta_in: string
Location of the fasta file
out_dir: string
Location to write all meme analysis output
Returns
-------
output: dict
A dictionary with 'stderr,stdout,cmd,exitcode,out_dir'
"""
self.memechip_strargs = strargs
if not self.memechip_strargs:
self.memechip_strargs = self.memechip_default_params
meme_binary = self.get_binary_path('meme').strip()
if not meme_binary or meme_binary == '':
# Use meme from envirnonment
meme_binary = 'meme-chip'
else:
# Use absolute path meme
meme_binary += '/meme-chip'
self.meme_location = meme_binary
if not out_dir:
out_dir = os.path.join(os.path.dirname(fasta_in), 'memechip_out')
out_dir = os.path.abspath(out_dir)
cmd = '{} {} -oc {} {}'.format(self.meme_location, self.memechip_strargs,
out_dir, os.path.abspath(fasta_in))
stdout, stderr, exitcode = run_job(cmd=cmd,
cwd=os.path.dirname(out_dir))
output = {'out_dir': out_dir, 'stdout': stdout,
'stderr': stderr, 'exitcode': exitcode,
'cmd': cmd}
self.commands_run.append({'cmd': cmd, 'metadata': output})
return output
@staticmethod
[docs] def save_conservation_scores(intervals, wig_file, out_directory, out_prefix='phylop'):
"""Extract and save conservation scores
Parameters
----------
intervals: list
list of tuple
wig_file: string
Wig file location
out_directory: string
Out file directory
out_prefix: string
Out file prefix
"""
wig = WigReader(wig_file)
conservation_scores = wig.query(intervals)
if np.any(conservation_scores):
conservation_scores_mean = np.nanmean(conservation_scores, axis=0)
np.savetxt(os.path.join(out_directory, '{}.raw.txt'.format(out_prefix)),
conservation_scores, fmt='%.4f')
np.savetxt(os.path.join(out_directory, '{}.mean.txt'.format(out_prefix)),
conservation_scores_mean, fmt='%.4f')
else:
touch(os.path.join(out_directory, '{}.mean.txt'.format(out_prefix)))
touch(os.path.join(out_directory, '{}.raw.txt'.format(out_prefix)))
return os.path.join(out_directory, '{}.mean.txt'.format(out_prefix))
@property
def get_meme_default_params(self):
return self.meme_default_params
@property
def get_memechip_default_params(self):
return self.memechip_default_params
@property
def get_fimo_default_params(self):
return self.fimo_default_params