Mercurial > repos > dave > data_manager_selection_background
comparison data_manager/data_manager_selection_background.py @ 0:98beedb0e74b draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_selection_background commit 4854cf92350265cac72ba5bb462e08b8cd239b8b-dirty"
| author | dave |
|---|---|
| date | Tue, 24 Aug 2021 14:59:07 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:98beedb0e74b |
|---|---|
| 1 # -*- coding: utf-8 -*- | |
| 2 import argparse | |
| 3 import bz2 | |
| 4 import gzip | |
| 5 import json | |
| 6 import os | |
| 7 import shutil | |
| 8 import sys | |
| 9 import uuid | |
| 10 import zipfile | |
| 11 from urllib.request import urlretrieve | |
| 12 | |
| 13 # Nice solution to opening compressed files (zip/bz2/gz) transparently | |
| 14 # https://stackoverflow.com/a/13045892/638445 | |
| 15 | |
| 16 | |
class CompressedFile(object):
    """Base class for transparently opening a compressed file.

    Subclasses set ``magic`` to the leading signature that identifies their
    format and override :meth:`open` to return a decompressing accessor.
    """

    # Leading signature of the format; None on the base class (matches nothing).
    magic = None
    file_type = None
    mime_type = None
    proper_extension = None

    def __init__(self, f):
        # f is handed as-is to the decompressor created in open();
        # get_compressed_file() passes a filename here.
        self.f = f
        self.accessor = self.open()

    @classmethod
    def is_magic(cls, data):
        """Return True if *data* starts with this format's magic signature.

        :param data: leading content of a file, as ``bytes`` or ``str``
        :returns: bool; always False when the class defines no ``magic``
        """
        magic = cls.magic
        if magic is None:
            return False
        # startswith() refuses to mix str and bytes, so bring the signature to
        # the type of *data*. latin-1 maps code points 0-255 one-to-one onto
        # byte values, which keeps escapes such as '\x8b' intact.
        if isinstance(data, str):
            if not isinstance(magic, str):
                magic = bytes(magic).decode('latin-1')
        elif isinstance(magic, str):
            magic = magic.encode('latin-1')
        return data.startswith(magic)

    def open(self):
        """Return the decompressing accessor; the base class has none."""
        return None
| 34 | |
| 35 | |
class ZIPFile(CompressedFile):
    """Accessor for ZIP archives."""

    # Bytes, not str: the file header is sniffed in binary mode.
    magic = b'\x50\x4b\x03\x04'
    file_type = 'zip'
    mime_type = 'compressed/zip'

    def open(self):
        """Return a readable binary stream of the first file in the archive.

        A ``zipfile.ZipFile`` has no file-like ``read(size)``, so it cannot be
        passed to ``shutil.copyfileobj``; the stream of a member can.

        :raises IOError: if the archive contains no regular file
        """
        # Keep a reference so the archive stays alive while the member is read.
        self.archive = zipfile.ZipFile(self.f)
        for info in self.archive.infolist():
            if not info.is_dir():
                return self.archive.open(info)
        self.archive.close()
        raise IOError('ZIP archive %s contains no files' % (self.f,))
| 43 | |
| 44 | |
class BZ2File(CompressedFile):
    """Accessor for bzip2-compressed files."""

    # Bytes, not str: the file header is sniffed in binary mode.
    magic = b'\x42\x5a\x68'
    file_type = 'bz2'
    mime_type = 'compressed/bz2'

    def open(self):
        """Return a ``bz2.BZ2File`` reading decompressed bytes from ``self.f``."""
        return bz2.BZ2File(self.f)
| 52 | |
| 53 | |
class GZFile(CompressedFile):
    """Accessor for gzip-compressed files."""

    # Bytes, not str: the file header is sniffed in binary mode.
    magic = b'\x1f\x8b\x08'
    file_type = 'gz'
    mime_type = 'compressed/gz'

    def open(self):
        """Return a binary-mode ``gzip.GzipFile`` reading from ``self.f``.

        ``gzip.open`` accepts either a filename or an already open file
        object, matching what the ZIP and BZ2 accessors accept.
        """
        return gzip.open(self.f, 'rb')
| 61 | |
| 62 | |
# Factory: sniff the file header and build the matching accessor.
def get_compressed_file(filename):
    """Return an accessor object for *filename*, or None if it is not compressed.

    The first 1024 bytes are tested against the ZIP, BZ2 and GZ signatures,
    in that order; the first class that matches is instantiated with the
    filename.
    """
    with open(filename, 'rb') as handle:
        header = handle.read(1024)

    matching = next(
        (kind for kind in (ZIPFile, BZ2File, GZFile) if kind.is_magic(header)),
        None,
    )
    if matching is None:
        return None
    return matching(filename)
| 74 | |
| 75 | |
def url_download(url):
    """Download the reference file at *url* into the current directory.

    If the downloaded file is recognised as compressed it is decompressed and
    the compressed copy is deleted.

    :param url: full url to the reference file
    :type url: str
    :returns: name of the downloaded (and, if applicable, uncompressed) file
    :raises: IOError if the download or the decompression fails; any partial
        files are removed first
    """
    # Use the last path segment as the local name; fall back to a fixed name
    # when the url ends in '/'.
    file_name = url.split('/')[-1] or 'downloaded_file'
    uncompressed_file_name = None

    try:
        # download URL (FTP and HTTP work, probably local and data too)
        urlretrieve(url, file_name)

        # uncompress file if needed
        cf = get_compressed_file(file_name)
        if cf is not None:
            uncompressed_file_name = os.path.splitext(file_name)[0]
            if uncompressed_file_name == file_name:
                # No extension to strip: do not overwrite the archive that is
                # still being read.
                uncompressed_file_name = file_name + '.uncompressed'
            try:
                # The accessor yields bytes, so the target must be binary.
                with open(uncompressed_file_name, 'wb') as uncompressed_file:
                    shutil.copyfileobj(cf.accessor, uncompressed_file)
            finally:
                cf.accessor.close()
            os.remove(file_name)
            file_name = uncompressed_file_name
    except IOError as e:
        sys.stderr.write('Error occurred downloading reference file: %s\n' % e)
        # Remove whatever was left behind, then let the caller see the failure
        # instead of handing back the name of a file that no longer exists.
        for leftover in (file_name, uncompressed_file_name):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)
        raise
    return file_name
| 103 | |
| 104 | |
def main(argv=None):
    """Fetch the background sequences and write the data manager JSON.

    The sequences come from ``--uri`` (downloaded) or ``--dataset`` (a local
    path). They are copied to ``<dbkey>.fa`` in the current working directory,
    and a ``selection_background`` data table entry is written to ``--output``.

    :param argv: argument list to parse; defaults to ``sys.argv[1:]``
    """
    parser = argparse.ArgumentParser(description='Create data manager JSON.')
    parser.add_argument('--output', dest='output', action='store', required=True, help='JSON filename')
    # str(): a uuid.UUID object cannot be serialised by json.dump below.
    parser.add_argument('--dbkey', dest='dbkey', action='store', default=str(uuid.uuid4()), help='Data table entry unique ID')
    parser.add_argument('--label', dest='label', action='store', required=True, help='Label to display')
    parser.add_argument('--uri', dest='uri', action='store', help='URI for the sequences')
    parser.add_argument('--dataset', dest='dataset', action='store', help='Path for the sequences')

    args = parser.parse_args(argv)

    # Fail with a usage message rather than a TypeError from shutil.copy(None, ...).
    if args.uri is None and args.dataset is None:
        parser.error('one of --uri or --dataset is required')

    work_dir = os.getcwd()

    # --uri takes precedence when both sources are given.
    if args.uri is not None:
        background_fasta = url_download(args.uri)
    else:
        background_fasta = args.dataset

    table_entry = '%s.fa' % args.dbkey
    shutil.copy(background_fasta, os.path.join(work_dir, table_entry))

    # Update Data Manager JSON and write to file
    data_manager_entry = {
        'data_tables': {
            'selection_background': {'value': args.dbkey, 'label': args.label, 'path': table_entry}
        }
    }

    with open(args.output, 'w') as fh:
        json.dump(data_manager_entry, fh, sort_keys=True)
| 134 | |
| 135 | |
# Run the data manager only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
