comparison variant_effect_predictor/Bio/EnsEMBL/Funcgen/RunnableDB/SetupAlignmentPipeline.pm @ 0:1f6dce3d34e0

Uploaded
author mahtabm
date Thu, 11 Apr 2013 02:01:53 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:1f6dce3d34e0
1 =pod
2
3 =head1 NAME
4
5 Bio::EnsEMBL::Funcgen::RunnableDB::SetupAlignmentPipeline
6
7 =head1 DESCRIPTION
8
9 'SetupAlignmentPipeline' Does all the setup before the Alignment is run
10 Checks for existence of input files, etc...
11 This Runnable CAN be run multiple times in parallel!
12
13 =cut
14
15 package Bio::EnsEMBL::Funcgen::RunnableDB::SetupAlignmentPipeline;
16
17 use warnings;
18 use strict;
19 use Bio::EnsEMBL::DBSQL::DBAdaptor;
20 use Bio::EnsEMBL::Funcgen::DBSQL::DBAdaptor;
21 use Bio::EnsEMBL::Utils::Exception qw(throw warning stack_trace_dump);
22 use Bio::EnsEMBL::Funcgen::Utils::EFGUtils qw(is_gzipped);
23 use Data::Dumper;
24
25 use base ('Bio::EnsEMBL::Funcgen::RunnableDB::Alignment');
26
#TODO... Maybe use and update the tracking database...

# fetch_input
#
# Gathers everything the later stages need:
#   * the fastq chunk size (param 'fastq_chunk_size', default 8000000 lines),
#   * the output directory, which is created when missing,
#   * the list of fastq files found under each replicate sub-directory of the
#     input directory.
#
# Replicate sub-directories must be named with a single digit; any other
# entry starting with a digit is reported with a warning and skipped.
#
# Stores the results through _fastq_chunk_size, _input_files and _replicates.
# Each element of the input file list is a hash with keys 'path', 'replicate'
# and 'file_index' (0-based position of the file within its replicate).
#
# Returns 1. Throws when a directory cannot be created/found/opened, when no
# replicate is found, or when a replicate contains no fastq file.
sub fetch_input {
    my $self = shift @_;

    $self->SUPER::fetch_input();

    # Magic default number, used unless the parameter provides a true value
    my $fastq_chunk_size = $self->param("fastq_chunk_size") || 8000000;
    $self->_fastq_chunk_size($fastq_chunk_size);

    # Sets up the output dir for this experiment_name
    my $output_dir = $self->_output_dir();
    if (!-d $output_dir) {
        # List form of system: no shell is involved, so spaces or shell
        # metacharacters in the path cannot alter the command.
        system('mkdir', '-p', $output_dir) == 0
            or throw("Couldn't create output directory $output_dir");
    }

    my $input_dir = $self->_input_dir();
    if (!-d $input_dir) { throw(" Couldn't find input directory $input_dir"); }

    opendir(my $input_dh, $input_dir)
        or throw("Couldn't open input directory $input_dir: $!");
    # readdir order is unspecified; sort so that repeated runs agree
    my @dirs = sort grep { /^\d/ } readdir($input_dh);
    closedir($input_dh);

    if (scalar(@dirs) == 0) { throw("No replicates found in $input_dir"); }

    my @input_files;
    my @replicates;
    foreach my $dir (@dirs) {
        #TODO: maybe use some other code for replicates? (e.g. Rep\d )
        my ($replicate) = $dir =~ /^(\d)$/;
        if (!defined $replicate) {
            warn "Invalid replicate $dir ignored";
            next;
        }

        my $replicate_dir = $input_dir."/".$replicate;
        opendir(my $replicate_dh, $replicate_dir)
            or throw("Couldn't open replicate directory $replicate_dir: $!");
        # Escaped dot: only names containing a literal ".fastq" qualify.
        # Sorted so that file_index is stable from one run to the next.
        my @files = sort grep { /\.fastq/ } readdir($replicate_dh);
        closedir($replicate_dh);

        if (scalar(@files) == 0) { throw("No files for replicate $replicate"); }

        my $file_count = 0;
        for my $file (@files) {
            push @input_files, {
                path       => $replicate_dir."/".$file,
                replicate  => $replicate,
                file_index => $file_count++,
            };
        }

        push @replicates, $replicate;
    }

    $self->_input_files(\@input_files);
    $self->_replicates(\@replicates);

    return 1;
}
84
# run
#
# Splits every input fastq file into chunks of _fastq_chunk_size lines.
# Chunks are written to the output directory and named
#   <set_name>_<replicate>_<file_index>_<NNNN>
# where NNNN is the 4-digit numeric suffix produced by 'split -d -a 4'.
#
# Files ending in .gz are streamed through 'gunzip -c', files ending in .bz2
# through 'bunzip2 -c', anything else through 'cat'.
#
# Returns 1. Throws when the shell pipeline exits with a non-zero status.
# NOTE(review): the status returned by system() for a pipeline is that of its
# last command (split), so a failing decompressor may go unnoticed.
sub run {
    my $self = shift @_;

    my $fastq_chunk_size = $self->_fastq_chunk_size();
    my $set_name         = $self->_set_name();
    my $output_dir       = $self->_output_dir();

    # Wraps a string in single quotes so /bin/sh treats it as one literal
    # word, whatever spaces or metacharacters it contains.
    my $shell_quote = sub {
        my ($word) = @_;
        $word =~ s/'/'\\''/g;
        return "'".$word."'";
    };

    foreach my $file_info (@{$self->_input_files()}) {
        my $file       = $file_info->{'path'};
        my $replicate  = $file_info->{'replicate'};
        my $file_index = $file_info->{'file_index'};

        # Pick the reader from the compression suffix alone (literal dots)
        my $reader;
        if ($file =~ /\.gz$/) {
            $reader = "gunzip -c";
        }
        elsif ($file =~ /\.bz2$/) {
            $reader = "bunzip2 -c";
        }
        else {
            $reader = "cat";
        }

        my $chunk_prefix = $output_dir.'/'.$set_name."_".$replicate.'_'.$file_index.'_';

        my $cmd = join(' ',
            $reader, $shell_quote->($file),
            '|',
            'split -d -a 4 -l', $shell_quote->($fastq_chunk_size),
            '-', $shell_quote->($chunk_prefix),
        );

        if (system($cmd) != 0) { throw("Problems running $cmd"); }
    }

    return 1;
}
118
119
# write_output
#
# Creates the downstream jobs:
#   * branch 1: one output id per chunk file found in the output directory
#     whose name is <set_name>_<digits>_<digits>_<digits>; each id is a copy
#     of this job's input_id hash with 'input_file' set to the chunk name;
#   * branch 2: this job's own input_id, unchanged.
#
# Returns 1. Throws when the output directory cannot be opened or when the
# input_id string does not evaluate to a hash reference.
#
# NOTE(review): an earlier version also built one id per replicate (with a
# 'replicate' key) but never dataflowed it; that unused list was dropped and
# branch 2 still receives the plain input_id. Confirm that is the intent.
sub write_output {
    my $self = shift @_;

    my $set_name   = $self->_set_name;
    my $output_dir = $self->_output_dir();
    my $input_id   = $self->input_id;

    opendir(my $output_dh, $output_dir)
        or throw("Couldn't open output directory $output_dir: $!");
    # \Q..\E: the set name is matched literally, not as a regex fragment.
    # Sorted so the order of the output ids does not depend on readdir.
    my @split_files = sort grep { /^\Q$set_name\E_\d+_\d+_\d+$/ } readdir($output_dh);
    closedir($output_dh);

    # files to align
    my @align_output_ids;
    for my $split_file (@split_files) {
        # A fresh evaluation per file gives every output id its own hash.
        # NOTE(review): string eval of input_id; it must never carry
        # untrusted content.
        my $output     = eval($input_id);
        my $eval_error = $@;
        if ($eval_error || ref($output) ne 'HASH') {
            throw("Couldn't evaluate input_id '$input_id' as a hash: $eval_error");
        }
        $output->{input_file} = $split_file;
        push @align_output_ids, $output;
    }
    $self->dataflow_output_id(\@align_output_ids, 1);

    # merge data across replicates
    $self->dataflow_output_id($input_id, 2);

    return 1;
}
152
# Private accessor for the fastq chunk size.
# Forwards the optional new value to _getter_setter under the key
# 'fastq_chunk_size' and returns whatever _getter_setter returns.
sub _fastq_chunk_size {
    my ($self, $chunk_size) = @_;
    return $self->_getter_setter('fastq_chunk_size', $chunk_size);
}
157
# Private accessor for the output_ids list.
# Forwards the optional new value to _getter_setter under the key
# 'output_ids' and returns whatever _getter_setter returns.
sub _output_ids {
    my ($self, $output_ids) = @_;
    return $self->_getter_setter('output_ids', $output_ids);
}
162
# Private accessor for the replicates list.
# Forwards the optional new value to _getter_setter under the key
# 'replicates' and returns whatever _getter_setter returns.
sub _replicates {
    my ($self, $replicates) = @_;
    return $self->_getter_setter('replicates', $replicates);
}
167
# Private accessor for the input files list.
# Forwards the optional new value to _getter_setter under the key
# 'input_files' and returns whatever _getter_setter returns.
sub _input_files {
    my ($self, $input_files) = @_;
    return $self->_getter_setter('input_files', $input_files);
}
171
172 1;