| 0 | 1 '''Integration tests for the GCMS project''' | 
|  | 2 | 
|  | 3 from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611 | 
|  | 4 from GCMS import library_lookup, combine_output | 
|  | 5 from GCMS.rankfilter_GCMS import rankfilter | 
|  | 6 import os.path | 
|  | 7 import sys | 
|  | 8 import unittest | 
|  | 9 import re | 
|  | 10 | 
|  | 11 | 
|  | 12 class IntegrationTest(unittest.TestCase): | 
|  | 13     def test_library_lookup(self): | 
|  | 14         ''' | 
|  | 15         Run main for data/NIST_tabular and compare produced files with references determined earlier. | 
|  | 16         ''' | 
|  | 17         # Create out folder | 
|  | 18         outdir = "output/" #tempfile.mkdtemp(prefix='test_library_lookup') | 
|  | 19         if not os.path.exists(outdir): | 
|  | 20             os.makedirs(outdir) | 
|  | 21         outfile_base = os.path.join(outdir, 'produced_library_lookup') | 
|  | 22         outfile_txt = outfile_base + '.txt' | 
|  | 23 | 
|  | 24         #Build up arguments and run | 
|  | 25         input_txt = resource_filename(__name__, "data/NIST_tabular.txt") | 
|  | 26         library = resource_filename(__name__, "data/RIDB_subset.txt") | 
|  | 27         regress_model = resource_filename(__name__, "data/ridb_poly_regression.txt") | 
|  | 28         sys.argv = ['test', | 
|  | 29                     library, | 
|  | 30                     input_txt, | 
|  | 31                     'Capillary', | 
|  | 32                     'Semi-standard non-polar', | 
|  | 33                     outfile_txt, | 
|  | 34                     'HP-5', | 
|  | 35                     regress_model] | 
|  | 36         # Execute main function with arguments provided through sys.argv | 
|  | 37         library_lookup.main() | 
|  | 38         #Compare with reference files | 
|  | 39         reference_txt = resource_filename(__name__, 'reference/produced_library_lookup.txt') | 
|  | 40 | 
|  | 41         #read both the reference file  and actual output files | 
|  | 42         expected = _read_file(reference_txt) | 
|  | 43         actual = _read_file(outfile_txt) | 
|  | 44 | 
|  | 45         #convert the read in files to lists we can compare | 
|  | 46         expected = expected.split() | 
|  | 47         actual = actual.split() | 
|  | 48 | 
|  | 49         for exp, act in zip(expected, actual): | 
|  | 50             if re.match('\\d+\\.\\d+', exp): | 
|  | 51                 exp = float(exp) | 
|  | 52                 act = float(act) | 
|  | 53                 self.assertAlmostEqual(exp, act, places=5) | 
|  | 54             else: | 
|  | 55                 # compare values | 
|  | 56                 self.failUnlessEqual(expected, actual) | 
|  | 57 | 
|  | 58 | 
|  | 59     def test_combine_output_simple(self): | 
|  | 60         ''' | 
|  | 61         Run main for data/NIST_tabular and compare produced files with references determined earlier. | 
|  | 62         ''' | 
|  | 63         # Create out folder | 
|  | 64         outdir = "output/" #tempfile.mkdtemp(prefix='test_library_lookup') | 
|  | 65         if not os.path.exists(outdir): | 
|  | 66             os.makedirs(outdir) | 
|  | 67         outfile_base = os.path.join(outdir, 'produced_combine_output') | 
|  | 68         outfile_single_txt = outfile_base + '_single.txt' | 
|  | 69         outfile_multi_txt = outfile_base + '_multi.txt' | 
|  | 70 | 
|  | 71         #Build up arguments and run | 
|  | 72         input_rankfilter = resource_filename(__name__, "data/Rankfilter.txt") | 
|  | 73         input_caslookup = resource_filename(__name__, "data/Caslookup.txt") | 
|  | 74         sys.argv = ['test', | 
|  | 75                     input_rankfilter, | 
|  | 76                     input_caslookup, | 
|  | 77                     outfile_single_txt, | 
|  | 78                     outfile_multi_txt] | 
|  | 79         # Execute main function with arguments provided through sys.argv | 
|  | 80         combine_output.main() | 
|  | 81         #Compare with reference files | 
|  | 82         # reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt') | 
|  | 83         # reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt') | 
|  | 84         # self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt)) | 
|  | 85         # self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt)) | 
|  | 86 | 
|  | 87         #Clean up | 
|  | 88         #shutil.rmtree(tempdir) | 
|  | 89 | 
|  | 90 | 
|  | 91 | 
|  | 92     def def_test_rank_filter_advanced(self): | 
|  | 93         ''' | 
|  | 94         Run main of RankFilter | 
|  | 95         ''' | 
|  | 96         # Create out folder | 
|  | 97         outdir = "output/integration/" | 
|  | 98         if not os.path.exists(outdir): | 
|  | 99             os.makedirs(outdir) | 
|  | 100 | 
|  | 101         #Build up arguments and run | 
|  | 102         input_txt = resource_filename(__name__, "data/integration/RankFilterInput_conf.txt") | 
|  | 103         sys.argv = ['test', | 
|  | 104                     input_txt] | 
|  | 105         # Execute main function with arguments provided through sys.argv | 
|  | 106         rankfilter.main() | 
|  | 107         #Compare with reference files | 
|  | 108 | 
|  | 109     def def_test_library_lookup_advanced(self): | 
|  | 110         ''' | 
|  | 111         Run main for data/NIST_tabular and compare produced files with references determined earlier. | 
|  | 112         ''' | 
|  | 113         # Create out folder | 
|  | 114         outdir = "output/integration/" | 
|  | 115         if not os.path.exists(outdir): | 
|  | 116             os.makedirs(outdir) | 
|  | 117         outfile_base = os.path.join(outdir, 'produced_library_lookup_ADVANCED') | 
|  | 118         outfile_txt = outfile_base + '.txt' | 
|  | 119 | 
|  | 120         #Build up arguments and run | 
|  | 121         input_txt = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt") | 
|  | 122         library = resource_filename(__name__, "data/integration/Library_RI_DB_capillary_columns-noDuplicates.txt") | 
|  | 123         regress_model = resource_filename(__name__, "data/integration/regression_MODEL_for_columns.txt") | 
|  | 124         sys.argv = ['test', | 
|  | 125                     library, | 
|  | 126                     input_txt, | 
|  | 127                     'Capillary', | 
|  | 128                     'Semi-standard non-polar', | 
|  | 129                     outfile_txt, | 
|  | 130                     'DB-5', | 
|  | 131                     regress_model] | 
|  | 132         # Execute main function with arguments provided through sys.argv | 
|  | 133         library_lookup.main() | 
|  | 134 | 
|  | 135 | 
|  | 136 | 
|  | 137     def test_combine_output_advanced(self): | 
|  | 138         ''' | 
|  | 139         Variant on test case above, but a bit more complex as some of the centrotypes have | 
|  | 140         different NIST hits which should give them different RI values. This test also | 
|  | 141         runs not only the combine output, but the other two preceding steps as well, | 
|  | 142         so it ensures the integration also works on the current code of all three tools. | 
|  | 143         ''' | 
|  | 144 | 
|  | 145         # Run RankFilter | 
|  | 146         self.def_test_rank_filter_advanced() | 
|  | 147 | 
|  | 148         # Run library CAS RI lookup | 
|  | 149         self.def_test_library_lookup_advanced() | 
|  | 150 | 
|  | 151         outdir = "output/integration/" | 
|  | 152         outfile_base = os.path.join(outdir, 'produced_combine_output') | 
|  | 153         outfile_single_txt = outfile_base + '_single.txt' | 
|  | 154         outfile_multi_txt = outfile_base + '_multi.txt' | 
|  | 155 | 
|  | 156         #Build up arguments and run | 
|  | 157         input_rankfilter = resource_filename(__name__, "output/integration/produced_rank_filter_out.txt") | 
|  | 158         input_caslookup = resource_filename(__name__, "output/integration/produced_library_lookup_ADVANCED.txt") | 
|  | 159         sys.argv = ['test', | 
|  | 160                     input_rankfilter, | 
|  | 161                     input_caslookup, | 
|  | 162                     outfile_single_txt, | 
|  | 163                     outfile_multi_txt] | 
|  | 164         # Execute main function with arguments provided through sys.argv | 
|  | 165         combine_output.main() | 
|  | 166         #Compare with reference files | 
|  | 167 #        reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt') | 
|  | 168 #        reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt') | 
|  | 169 #        self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt)) | 
|  | 170 #        self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt)) | 
|  | 171 | 
|  | 172         # Check 1: output single should have one record per centrotype: | 
|  | 173 | 
|  | 174 | 
|  | 175         # Check 2: output single has more records than output single: | 
|  | 176         combine_result_single_items =  combine_output._process_data(outfile_single_txt) | 
|  | 177         combine_result_multi_items =  combine_output._process_data(outfile_multi_txt) | 
|  | 178         self.assertGreater(len(combine_result_single_items['Centrotype']), | 
|  | 179                            len(combine_result_multi_items['Centrotype'])) | 
|  | 180 | 
|  | 181 | 
|  | 182         # Check 3: library_lookup RI column, centrotype column, ri_svr column are correct: | 
|  | 183         caslookup_items = combine_output._process_data(input_caslookup) | 
|  | 184         rankfilter_items = combine_output._process_data(input_rankfilter) | 
|  | 185 | 
|  | 186         # check that the caslookup RI column is correctly maintained in its original order in | 
|  | 187         # the combined file: | 
|  | 188         ri_caslookup = caslookup_items['RI'] | 
|  | 189         ri_combine_single = combine_result_single_items['RI'] | 
|  | 190         self.assertListEqual(ri_caslookup, ri_combine_single) | 
|  | 191 | 
|  | 192         # check the centrotype column's integrity: | 
|  | 193         centrotype_caslookup = caslookup_items['Centrotype'] | 
|  | 194         centrotype_combine_single = combine_result_single_items['Centrotype'] | 
|  | 195         centrotype_rankfilter = _get_centrotype_rankfilter(rankfilter_items['ID']) | 
|  | 196         self.assertListEqual(centrotype_caslookup, centrotype_combine_single) | 
|  | 197         self.assertListEqual(centrotype_caslookup, centrotype_rankfilter) | 
|  | 198 | 
|  | 199         # integration and integrity checks: | 
|  | 200         file_NIST = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt") | 
|  | 201         file_NIST_items = combine_output._process_data(file_NIST) | 
|  | 202         # check that rank filter output has exactly the same ID items as the original NIST input file: | 
|  | 203         self.assertListEqual(file_NIST_items['ID'], rankfilter_items['ID']) | 
|  | 204         # check the same for the CAS column: | 
|  | 205         self.assertListEqual(_get_strippedcas(file_NIST_items['CAS']), rankfilter_items['CAS']) | 
|  | 206         # now check the NIST CAS column against the cas lookup results: | 
|  | 207         cas_NIST = _get_processedcas(file_NIST_items['CAS']) | 
|  | 208         self.assertListEqual(cas_NIST, caslookup_items['CAS']) | 
|  | 209         # now check the CAS of the combined result. If all checks are OK, it means the CAS column's order | 
|  | 210         # and values remained stable throughout all steps: | 
|  | 211         self.assertListEqual(rankfilter_items['CAS'], combine_result_single_items['CAS']) | 
|  | 212 | 
|  | 213         # check that the rankfilter RIsvr column is correctly maintained in its original order in | 
|  | 214         # the combined file: | 
|  | 215         risvr_rankfilter = rankfilter_items['RIsvr'] | 
|  | 216         risvr_combine_single = combine_result_single_items['RIsvr'] | 
|  | 217         self.assertListEqual(risvr_rankfilter, risvr_combine_single) | 
|  | 218 | 
|  | 219 | 
|  | 220 | 
|  | 221 | 
def _get_centrotype_rankfilter(id_list):
    '''
    returns the list of centrotype ids given a list of ID in the
    form e.g. 74-1.0-564-1905200-7, where the numbers before the
    first "-" are the centrotype id
    @param id_list: iterable of compound ID strings
    @return: list with, for each ID, the part before its first "-"
             (the whole ID when it contains no "-")
    '''
    # a comprehension avoids the Python 2 only xrange() index loop
    return [compound_id.split('-')[0] for compound_id in id_list]
|  | 235 | 
|  | 236 | 
def _get_processedcas(cas_list):
    '''
    returns the list cas numbers in the form C64175 instead of 64-17-5
    @param cas_list: iterable of CAS number strings, e.g. " 64-17-5"
    @return: list with, for each CAS number, "C" followed by the number
             without its "-" separators and surrounding white space
    '''
    # a comprehension avoids the Python 2 only xrange() index loop
    return ['C' + cas.replace('-', '').strip() for cas in cas_list]
|  | 248 | 
def _get_strippedcas(cas_list):
    '''
    removes the surrounding (leading and trailing) white space
    from each cas number, e.g. " 64-17-5" becomes "64-17-5"
    @param cas_list: iterable of CAS number strings
    @return: list of the stripped CAS number strings, in the same order
    '''
    # a comprehension avoids the Python 2 only xrange() index loop
    return [cas.strip() for cas in cas_list]
|  | 260 | 
|  | 261 | 
def _read_file(filename):
    '''
    Return the complete contents of a file as one string
    @param filename: path of the file to read
    '''
    with open(filename) as source:
        contents = source.read()
    return contents