Mercurial > repos > iuc > data_manager_nextclade
comparison data_manager/nextclade_dm.py @ 0:6e64cb3d2b1d draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_nextclade commit 3d6dabd066dcbe31cfa38fbfac340e253d8a984d
author | iuc |
---|---|
date | Sat, 30 Jul 2022 08:09:07 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:6e64cb3d2b1d |
---|---|
1 #!/usr/bin/env python | |
2 | |
3 import argparse | |
4 import datetime | |
5 import json | |
6 import operator | |
7 import pathlib | |
8 import subprocess | |
9 import sys | |
10 from typing import List | |
11 | |
12 | |
def parse_date(d: str) -> datetime.datetime:
    """Convert a date string into a naive ``datetime`` object.

    Two layouts are accepted: the full timestamp form used by nextclade
    release tags (``YYYY-MM-DDTHH:MM:SSZ``) and the plain calendar date
    form (``YYYY-MM-DD``) that a user may supply.

    Raises:
        ValueError: if *d* matches neither layout.
    """
    strptime = datetime.datetime.strptime
    try:
        return strptime(d, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        # Not a full timestamp; fall back to a date without a time part.
        return strptime(d, "%Y-%m-%d")
21 | |
22 | |
def entry_to_tag(entry: dict) -> str:
    """Build the ``<name>_<tag>`` identifier for a nextclade dataset entry.

    The name and tag are read from ``entry["attributes"]["name"]["value"]``
    and ``entry["attributes"]["tag"]["value"]`` respectively.
    """
    parts = (
        entry["attributes"]["name"]["value"],
        entry["attributes"]["tag"]["value"],
    )
    return "_".join(parts)
27 | |
28 | |
def get_database_list() -> List[dict]:
    """Return one flat record per dataset release reported by nextclade.

    Runs ``nextclade dataset list --json --include-old
    --include-incompatible`` and converts every element of the JSON output
    into a dict with the keys ``value``, ``database_name``, ``description``,
    ``date``, ``tag`` and ``min_nextclade_version``.  Records are returned
    in the order in which nextclade lists them.

    Raises:
        subprocess.CalledProcessError: if nextclade exits with a non-zero
            status (``check=True``).
    """
    completed = subprocess.run(
        [
            "nextclade",
            "dataset",
            "list",
            "--json",
            "--include-old",
            "--include-incompatible",
        ],
        capture_output=True,
        check=True,
    )
    return [
        {
            "value": entry_to_tag(raw),
            "database_name": raw["attributes"]["name"]["value"],
            "description": raw["attributes"]["name"]["valueFriendly"],
            # The tag doubles as the release timestamp; dropping the "Z"
            # lets fromisoformat() parse it, yielding a naive datetime.
            "date": datetime.datetime.fromisoformat(
                raw["attributes"]["tag"]["value"].replace("Z", "")
            ),
            "tag": raw["attributes"]["tag"]["value"],
            "min_nextclade_version": raw["compatibility"]["nextcladeCli"]["min"],
        }
        for raw in json.loads(completed.stdout)
    ]
55 | |
56 | |
def filter_by_date(
    existing_release_tags: List[str],
    name: str,
    releases: list,
    start_date: datetime.datetime = None,
    end_date: datetime.datetime = None,
) -> List[dict]:
    """Select the releases of one dataset that fall inside a date window.

    Args:
        existing_release_tags: ``value`` identifiers that are already
            installed; releases with one of these values are skipped.  Any
            container supporting ``in`` works.
        name: dataset name; only releases whose ``database_name`` equals
            it are considered.
        releases: release records as produced by ``get_database_list``
            (each needs ``database_name``, ``value`` and ``date``).
        start_date: inclusive lower bound, or ``None`` for no lower bound.
        end_date: inclusive upper bound, or ``None`` for no upper bound.

    Returns:
        The matching release records, in the order they appear in
        *releases*.
    """
    ret = []
    for release in releases:
        if (
            release["database_name"] != name
            or release["value"] in existing_release_tags
        ):
            continue
        if start_date is not None and release["date"] < start_date:
            # Skip instead of stopping the scan: stopping is only correct
            # when the input is sorted newest-first, and would silently
            # drop in-range releases for any other ordering.
            continue
        if end_date is None or release["date"] <= end_date:
            ret.append(release)

    return ret
77 | |
78 | |
def download_and_unpack(name: str, release: str, output_directory: str) -> pathlib.Path:
    """Fetch one dataset release with ``nextclade dataset get``.

    The dataset is written to ``<output_directory>/<name>_<release>``,
    where every ``:`` in the release tag is replaced by ``-`` in the
    directory name (the tag passed to nextclade is left untouched).

    Returns:
        The path of the directory handed to nextclade as ``--output-dir``.

    Raises:
        subprocess.CalledProcessError: if nextclade exits with a non-zero
            status (``check=True``).
    """
    directory_name = "_".join((name, release.replace(":", "-")))
    target = pathlib.Path(output_directory) / directory_name
    subprocess.run(
        [
            "nextclade",
            "dataset",
            "get",
            "--name",
            name,
            "--tag",
            release,
            "--output-dir",
            str(target),
        ],
        check=True,
    )
    return target
96 | |
97 | |
def comma_split(args: str) -> List[str]:
    """Split a comma-separated command-line value into its items.

    Used as an argparse ``type`` callable.  Items are returned verbatim:
    no whitespace stripping, and an empty string yields ``[""]``.
    """
    separator = ","
    return args.split(separator)
100 | |
101 | |
if __name__ == "__main__":
    # Script entry point: decide which nextclade dataset releases to
    # install, download them, and write the resulting data table entries
    # back into the Galaxy config JSON file.

    parser = argparse.ArgumentParser()
    parser.add_argument("--testmode", default=False, action="store_true")
    parser.add_argument("--latest", default=False, action="store_true")
    parser.add_argument("--start_date", type=parse_date)
    parser.add_argument("--end_date", type=parse_date)
    parser.add_argument("--known_revisions", type=comma_split)
    parser.add_argument("--datasets", type=comma_split, default=["sars-cov-2"])
    # NOTE: this positional has no nargs="?", so it is required and the
    # default below is never used by argparse.
    parser.add_argument("datatable_name", default="nextclade")
    parser.add_argument("galaxy_config")
    args = parser.parse_args()

    # known-revisions is populated from the Galaxy data table by the wrapper
    if args.known_revisions is not None:
        existing_release_tags = set(args.known_revisions)
    else:
        existing_release_tags = set()

    releases_available = get_database_list()
    if args.testmode:
        # Test mode only reports what matches the date window; nothing is
        # downloaded and already-known revisions are not excluded.
        releases = []
        for name in args.datasets:
            releases.extend(
                filter_by_date(
                    [],
                    name,
                    releases_available,
                    start_date=args.start_date,
                    end_date=args.end_date,
                )
            )
        for release in releases:
            print(
                release["value"],
                release["description"],
                release["date"].isoformat(),
                release["min_nextclade_version"],
            )
        sys.exit(0)

    with open(args.galaxy_config) as fh:
        config = json.load(fh)

    output_directory = config.get("output_data", [{}])[0].get("extra_files_path", None)

    data_manager_dict = {"data_tables": {args.datatable_name: []}}

    releases = []
    if args.latest:
        for dataset in args.datasets:
            for release in releases_available:
                if release["database_name"] == dataset:
                    if release["value"] not in existing_release_tags:
                        # add the latest release for this dataset, but only if we don't already have it
                        releases.append(release)
                    # NOTE(review): taking the first match assumes nextclade
                    # lists the newest release of a dataset first - confirm.
                    break
    else:
        for dataset in args.datasets:
            releases_for_ds = filter_by_date(
                existing_release_tags,
                dataset,
                releases_available,
                start_date=args.start_date,
                end_date=args.end_date,
            )
            releases.extend(releases_for_ds)

    if releases and output_directory is None:
        # Without a target directory the download would fail with an
        # obscure TypeError from pathlib; report the real problem instead.
        sys.exit(
            "No extra_files_path found in the Galaxy config; cannot download datasets"
        )

    for release in releases:
        fname = download_and_unpack(
            release["database_name"], release["tag"], output_directory
        )
        data_manager_dict["data_tables"][args.datatable_name].append(
            {
                "value": release["value"],
                "database_name": release["database_name"],
                "description": release["description"],
                "min_nextclade_version": release["min_nextclade_version"],
                "date": release["date"].isoformat(),  # ISO 8601 is easily sortable
                # download_and_unpack() already returns a path that
                # includes output_directory; joining it again would
                # duplicate the prefix whenever the directory is relative.
                "path": str(fname),
            }
        )
    data_manager_dict["data_tables"][args.datatable_name].sort(
        key=operator.itemgetter("value"), reverse=True
    )
    # The config file passed in by Galaxy is overwritten with the result.
    with open(args.galaxy_config, "w") as fh:
        json.dump(data_manager_dict, fh, indent=2, sort_keys=True)