=pod

=head1 NAME

Bio::EnsEMBL::Funcgen::RunnableDB::SetupAlignmentPipeline

=head1 DESCRIPTION

'SetupAlignmentPipeline' does all the setup required before the alignment is
run: it checks that the input fastq files exist and splits them into chunks
for the individual alignment jobs.

This Runnable CAN be run multiple times in parallel!
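
The job parameters (input and output directories, set name, etc.) come from the
parent Bio::EnsEMBL::Funcgen::RunnableDB::Alignment class. As a rough, hypothetical
illustration only (the key names are assumptions, not a definitive spec), a seeding
input_id might look like:

  # Hypothetical example -- the real keys are defined by the parent Alignment RunnableDB
  {
    'set_name'         => 'SomeSet',
    'fastq_chunk_size' => 8000000,  # lines per chunk; this module's default if omitted
  }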

=cut

package Bio::EnsEMBL::Funcgen::RunnableDB::SetupAlignmentPipeline;

use warnings;
use strict;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Funcgen::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Utils::Exception qw(throw warning stack_trace_dump);
use Bio::EnsEMBL::Funcgen::Utils::EFGUtils qw(is_gzipped);
use Data::Dumper;

use base ('Bio::EnsEMBL::Funcgen::RunnableDB::Alignment');

#TODO... Maybe use and update the tracking database...
sub fetch_input {   # fetch parameters...
  my $self = shift @_;

  $self->SUPER::fetch_input();

  #Magic default number: lines per fastq chunk (4 lines per read, so 2M reads)
  my $fastq_chunk_size = 8000000;
  if ($self->param("fastq_chunk_size")) {
    $fastq_chunk_size = $self->param("fastq_chunk_size");
  }
  $self->_fastq_chunk_size($fastq_chunk_size);

  #Sets up the output dir for this experiment_name
  my $output_dir = $self->_output_dir();
  if (! -d $output_dir) {
    system("mkdir -p $output_dir") && throw("Couldn't create output directory $output_dir");
  }

  my $input_dir = $self->_input_dir();
  if (! -d $input_dir) { throw("Couldn't find input directory $input_dir"); }
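
  # Expected layout (illustrative, inferred from the scanning code below):
  #   <input_dir>/<replicate_number>/<file>.fastq[.gz|.bz2]
  # e.g. <input_dir>/1/lane1.fastq.gz and <input_dir>/2/lane1.fastq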
  opendir(DIR, $input_dir) or throw("Couldn't open input directory $input_dir");
  my @dirs = grep(/^\d/, readdir(DIR));
  closedir(DIR);

  if (scalar(@dirs) == 0) { throw("No replicates found in $input_dir"); }

  my @input_files;
  my @replicates;
  foreach my $dir (@dirs) {
    #TODO: maybe use some other code for replicates? (e.g. Rep\d )
    if ($dir =~ /^(\d)$/) {
      my $replicate = $1;

      opendir(DIR, $input_dir."/".$replicate) or throw("Couldn't open replicate directory $input_dir/$replicate");
      my @files = grep(/\.fastq/, readdir(DIR));
      closedir(DIR);

      if (scalar(@files) == 0) { throw("No files for replicate $replicate"); }

      my $file_count = 0;
      for my $file (@files) {
        push @input_files, {
          path       => $input_dir."/".$replicate."/".$file,
          replicate  => $replicate,
          file_index => $file_count++,
        };
      }

      push @replicates, $replicate;
    } else { warn "Invalid replicate $dir ignored"; }
  }

  $self->_input_files(\@input_files);
  $self->_replicates(\@replicates);

  return 1;
}

sub run {
  my $self = shift @_;

  my $fastq_chunk_size = $self->_fastq_chunk_size();

  my $set_name = $self->_set_name();

  foreach my $file_info (@{$self->_input_files()}) {
    my $file       = $file_info->{'path'};
    my $replicate  = $file_info->{'replicate'};
    my $file_index = $file_info->{'file_index'};

    # Choose a decompression command based on the file suffix
    my $cmd;

    if ($file =~ /^(.*\.fastq)\.gz$/) {
      $cmd = "gunzip -c";
    }
    elsif ($file =~ /^(.*\.fastq)\.bz2$/) {
      $cmd = "bunzip2 -c";
    }
    else {
      $cmd = "cat";
    }

    $cmd .= ' '.$file.' | split -d -a 4 -l '.$fastq_chunk_size.' - '.$self->_output_dir().'/'.$set_name.'_'.$replicate.'_'.$file_index.'_';
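    # Illustrative only (assuming, say, set_name 'mySet', replicate 1, file_index 0
    # and the default chunk size), the assembled command is roughly:
    #   gunzip -c <input_dir>/1/lane1.fastq.gz | split -d -a 4 -l 8000000 - <output_dir>/mySet_1_0_
    # which writes chunk files named mySet_1_0_0000, mySet_1_0_0001, ...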

    if (system($cmd) != 0) { throw("Problems running $cmd"); }
  }

  return 1;
}

sub write_output {  # Create the relevant jobs
  my $self = shift @_;

  my $set_name = $self->_set_name;

  my (@align_output_ids, @merge_output_ids);

  opendir(DIR, $self->_output_dir()) or throw("Couldn't open output directory ".$self->_output_dir());
  for my $split_file ( grep(/^${set_name}_\d+_\d+_\d+$/, readdir(DIR)) ) {
    my $output = eval($self->input_id);
    $output->{input_file} = $split_file;
    push @align_output_ids, $output;
  }
  closedir(DIR);
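
  # Illustrative only: each element of @align_output_ids is this job's own
  # (eval'd) input_id hash with the chunk file added, e.g. roughly:
  #   { %{eval($self->input_id)}, input_file => 'mySet_1_0_0003' }
  # These flow down branch 1 below, one alignment job per chunk.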

  # merge data for each replicate
  for my $rep (@{$self->_replicates}) {
    my $output = eval($self->input_id);
    $output->{replicate} = $rep;
    push @merge_output_ids, $output;
  }

  # files to align
  $self->dataflow_output_id(\@align_output_ids, 1);

  # merge data across replicates
  $self->dataflow_output_id(\@merge_output_ids, 2);

  return 1;
}

#Private getter / setter to the fastq chunk size
sub _fastq_chunk_size {
  return $_[0]->_getter_setter('fastq_chunk_size', $_[1]);
}

#Private getter / setter to the output_ids list
sub _output_ids {
  return $_[0]->_getter_setter('output_ids', $_[1]);
}

#Private getter / setter to the replicates list
sub _replicates {
  return $_[0]->_getter_setter('replicates', $_[1]);
}

#Private getter / setter to the input_files list
sub _input_files {
  return $_[0]->_getter_setter('input_files', $_[1]);
}

1;