data_manager/nextclade_dm.py @ 0:6e64cb3d2b1d (draft)

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_nextclade commit 3d6dabd066dcbe31cfa38fbfac340e253d8a984d
author iuc
date Sat, 30 Jul 2022 08:09:07 +0000
#!/usr/bin/env python
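"""Galaxy data manager for Nextclade datasets.

Queries ``nextclade dataset list`` for the available datasets, downloads the
requested releases with ``nextclade dataset get`` and records each download as
a row of the Nextclade data table in the JSON file exchanged with Galaxy.
"""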

import argparse
import datetime
import json
import operator
import pathlib
import subprocess
import sys
from typing import List


def parse_date(d: str) -> datetime.datetime:
    # Parses the publication date from the nextclade release tags or user input into a datetime object.
    date = None
    try:
        date = datetime.datetime.strptime(d, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        date = datetime.datetime.strptime(d, "%Y-%m-%d")
    return date


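# Unique data table key for a release: "<dataset name>_<release tag>".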
def entry_to_tag(entry: dict) -> str:
    return (
        entry["attributes"]["name"]["value"] + "_" + entry["attributes"]["tag"]["value"]
    )


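# Query `nextclade dataset list --json` (including old and incompatible releases)
# and flatten each entry into the fields needed for the data table.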
def get_database_list() -> List[dict]:
    list_cmd = [
        "nextclade",
        "dataset",
        "list",
        "--json",
        "--include-old",
        "--include-incompatible",
    ]
    list_proc = subprocess.run(list_cmd, capture_output=True, check=True)
    database_list = json.loads(list_proc.stdout)
    entry_list = []
    for db_entry in database_list:
        attributes = db_entry["attributes"]
        entry = {
            "value": entry_to_tag(db_entry),
            "database_name": attributes["name"]["value"],
            "description": attributes["name"]["valueFriendly"],
            "date": datetime.datetime.fromisoformat(
                attributes["tag"]["value"].replace("Z", "")
            ),
            "tag": attributes["tag"]["value"],
            "min_nextclade_version": db_entry["compatibility"]["nextcladeCli"]["min"],
        }
        entry_list.append(entry)
    return entry_list


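# Select the releases of dataset `name` within [start_date, end_date], skipping
# tags already present in the data table. The early `break` on start_date relies
# on `releases` being ordered newest-first, the same assumption the --latest
# branch in the main block makes.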
def filter_by_date(
    existing_release_tags: List[str],
    name: str,
    releases: list,
    start_date: datetime.datetime = None,
    end_date: datetime.datetime = None,
) -> List[dict]:
    ret = []
    for release in releases:
        if (
            release["database_name"] != name
            or release["value"] in existing_release_tags
        ):
            continue
        if start_date and release["date"] < start_date:
            break
        if not end_date or release["date"] <= end_date:
            ret.append(release)

    return ret


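# Download one dataset release with `nextclade dataset get` into a directory
# named "<name>_<tag>" (colons in the tag replaced to keep the name filesystem-safe).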
def download_and_unpack(name: str, release: str, output_directory: str) -> pathlib.Path:
    download_cmd = [
        "nextclade",
        "dataset",
        "get",
        "--name",
        name,
        "--tag",
        release,
        "--output-dir",
    ]
    output_path = pathlib.Path(output_directory) / (
        name + "_" + release.replace(":", "-")
    )
    download_cmd.append(str(output_path))
    subprocess.run(download_cmd, check=True)
    return output_path


def comma_split(args: str) -> List[str]:
    return args.split(",")


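# Entry point used by the Galaxy data manager wrapper: parse the options, decide
# which releases to fetch, download them, and write the resulting data table
# entries back to the JSON file Galaxy passed in as `galaxy_config`.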
if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--testmode", default=False, action="store_true")
    parser.add_argument("--latest", default=False, action="store_true")
    parser.add_argument("--start_date", type=parse_date)
    parser.add_argument("--end_date", type=parse_date)
    parser.add_argument("--known_revisions", type=comma_split)
    parser.add_argument("--datasets", type=comma_split, default=["sars-cov-2"])
    parser.add_argument("datatable_name", default="nextclade")
    parser.add_argument("galaxy_config")
    args = parser.parse_args()

    # --known_revisions is populated from the existing Galaxy data table by the wrapper
    if args.known_revisions is not None:
        existing_release_tags = set(args.known_revisions)
    else:
        existing_release_tags = set()

    releases_available = get_database_list()
    if args.testmode:
        # In test mode, only report what would be fetched and exit.
        releases = []
        for name in args.datasets:
            releases.extend(
                filter_by_date(
                    [],
                    name,
                    releases_available,
                    start_date=args.start_date,
                    end_date=args.end_date,
                )
            )
        for release in releases:
            print(
                release["value"],
                release["description"],
                release["date"].isoformat(),
                release["min_nextclade_version"],
            )
        sys.exit(0)

    with open(args.galaxy_config) as fh:
        config = json.load(fh)

    output_directory = config.get("output_data", [{}])[0].get("extra_files_path", None)

    data_manager_dict = {"data_tables": {args.datatable_name: []}}

    releases = []
    if args.latest:
        for dataset in args.datasets:
            for release in releases_available:
                if release["database_name"] == dataset:
                    if release["value"] not in existing_release_tags:
                        # add the latest release for this dataset, but only if we don't already have it
                        releases.append(release)
                    break
    else:
        for dataset in args.datasets:
            releases_for_ds = filter_by_date(
                existing_release_tags,
                dataset,
                releases_available,
                start_date=args.start_date,
                end_date=args.end_date,
            )
            releases.extend(releases_for_ds)

    for release in releases:
        fname = download_and_unpack(
            release["database_name"], release["tag"], output_directory
        )
        if fname is not None:
            data_manager_dict["data_tables"][args.datatable_name].append(
                {
                    "value": release["value"],
                    "database_name": release["database_name"],
                    "description": release["description"],
                    "min_nextclade_version": release["min_nextclade_version"],
                    "date": release["date"].isoformat(),  # ISO 8601 is easily sortable
                    "path": str(fname),  # already anchored under output_directory
                }
            )
    data_manager_dict["data_tables"][args.datatable_name].sort(
        key=operator.itemgetter("value"), reverse=True
    )
    with open(args.galaxy_config, "w") as fh:
        json.dump(data_manager_dict, fh, indent=2, sort_keys=True)