Mercurial > repos > tduigou > create_assembly_picklists
comparison CreateAssemblyPicklists_script.py @ 0:4bde3e90ee98 draft
planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/Plateo commit 98d5e65b8008dbca117b2e0655cfdd54655fac48-dirty
| author | tduigou |
|---|---|
| date | Wed, 06 Aug 2025 08:02:58 +0000 |
| parents | |
| children | 196e13c09881 |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4bde3e90ee98 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 # coding: utf-8 | |
| 3 # Code copied from CUBA backend tools.py and create_assembly_picklists/CreateAssemblyPicklistsView.py | |
| 4 # Code modified for running in a script in Galaxy. | |
| 5 ############################################################################## | |
| 6 ############################################################################## | |
| 7 # App code | |
| 8 ## EGF Galaxy Create assembly picklists -- script | |
| 9 | |
| 10 ############################################################################## | |
| 11 # IMPORTS | |
| 12 import argparse | |
| 13 import os | |
| 14 from io import StringIO, BytesIO | |
| 15 import re | |
| 16 from base64 import b64encode, b64decode | |
| 17 from copy import deepcopy | |
| 18 import sys | |
| 19 | |
| 20 from collections import OrderedDict | |
| 21 from fuzzywuzzy import process | |
| 22 import matplotlib.pyplot as plt | |
| 23 from matplotlib.backends.backend_pdf import PdfPages | |
| 24 import pandas | |
| 25 | |
| 26 from Bio import SeqIO | |
| 27 from Bio.SeqRecord import SeqRecord | |
| 28 from Bio.Seq import Seq | |
| 29 | |
| 30 import bandwagon as bw | |
| 31 import crazydoc | |
| 32 from dnachisel.biotools import sequence_to_biopython_record | |
| 33 import dnacauldron | |
| 34 import flametree | |
| 35 from plateo import AssemblyPlan | |
| 36 from plateo.parsers import plate_from_content_spreadsheet | |
| 37 from plateo.containers import Plate4ti0960 | |
| 38 from plateo.exporters import AssemblyPicklistGenerator, picklist_to_assembly_mix_report | |
| 39 from plateo.exporters import ( | |
| 40 picklist_to_labcyte_echo_picklist_file, | |
| 41 picklist_to_tecan_evo_picklist_file, | |
| 42 plate_to_platemap_spreadsheet, | |
| 43 PlateTextPlotter, | |
| 44 ) | |
| 45 from plateo.tools import human_volume | |
| 46 from snapgene_reader import snapgene_file_to_seqrecord | |
| 47 | |
| 48 | |
| 49 ############################################################################## | |
| 50 # FUNCTIONS | |
| 51 | |
def fix_and_rename_paths(paths):
    """Undo Galaxy's single-quote escaping ("__sq__") in file paths.

    Files whose names contain the escape sequence are renamed on disk;
    the (possibly renamed) paths are returned in the original order.
    """
    renamed = []
    for original in paths:
        restored = original.replace("__sq__", "'")
        if restored != original:
            os.rename(original, restored)
        renamed.append(restored)
    return renamed
| 60 | |
| 61 | |
def did_you_mean(name, other_names, limit=5, min_score=50):
    """Return up to `limit` fuzzy matches of `name` among `other_names`.

    Only candidates with a fuzzywuzzy score >= `min_score` are kept,
    best matches first.
    """
    candidates = process.extract(name, list(other_names), limit=limit)
    return [match for (match, score) in candidates if score >= min_score]
| 65 | |
| 66 | |
def fix_ice_genbank(genbank_txt):
    """Right-pad the first (LOCUS) line of a genbank text to 80 characters.

    Some Genbank exports (e.g. from JBEI ICE) have a LOCUS line that is too
    short for Biopython's parser; padding it with spaces fixes parsing.

    Returns the text with lines re-joined by "\\n"; an empty input is
    returned unchanged (the original raised IndexError on it).
    """
    lines = genbank_txt.splitlines()
    if not lines:  # guard: empty input would make lines[0] raise IndexError
        return genbank_txt
    lines[0] += max(0, 80 - len(lines[0])) * " "
    return "\n".join(lines)
| 71 | |
| 72 | |
def write_record(record, target, fmt="genbank"):
    """Write a record (or list of records) to `target` via Biopython.

    For genbank output, record names are truncated to 20 characters to
    keep Biopython happy. `target` may be a flametree file node (anything
    with an ``open`` method) or a plain file object. Works on a deep copy,
    so the caller's records are never mutated.
    """
    record = deepcopy(record)
    if fmt == "genbank":
        as_list = record if isinstance(record, (list, tuple)) else [record]
        for single in as_list:
            single.name = single.name[:20]
    if hasattr(target, "open"):
        target = target.open("w")
    SeqIO.write(record, target, fmt)
| 85 | |
| 86 | |
def autoname_genbank_file(record):
    """Derive a genbank file name from a record id (dots become underscores)."""
    return "%s.gb" % record.id.replace(".", "_")
| 89 | |
| 90 | |
def string_to_records(string):
    """Parse a sequence string into Biopython records, detecting its format.

    Tries, in order: a raw ATGC sequence, fasta, genbank (after padding the
    LOCUS line for ICE-style exports), then snapgene.

    Returns a ``(records, format)`` tuple where ``format`` is one of
    "ATGC", "fasta", "genbank" or "snapgene".

    Raises ValueError when no format matches.
    """
    matches = re.match("([ATGC][ATGC]*)", string)
    if (matches is not None) and (matches.groups()[0] == string):
        return [SeqRecord(Seq(string))], "ATGC"

    for fmt in ("fasta", "genbank"):
        if fmt == "genbank":
            # Pad the LOCUS line so Biopython accepts ICE-style exports.
            string = fix_ice_genbank(string)
        try:
            stringio = StringIO(string)
            records = list(SeqIO.parse(stringio, fmt))
            if len(records) > 0:
                return (records, fmt)
        except Exception:
            pass
    try:
        record = snapgene_file_to_seqrecord(filecontent=StringIO(string))
        # Bug fix: return a (records, fmt) pair like every other branch,
        # so callers that unpack the result (records_from_data_file) work.
        return [record], "snapgene"
    except Exception:
        pass
    raise ValueError("Invalid sequence format")
| 118 | |
| 119 | |
def file_to_filelike_object(file_, type="byte"):
    """Return the base64-decoded content of `file_` as a file-like object.

    `file_.content` is expected to be a data URL ("data:...;base64,XXX").
    With type="byte" (the default) a BytesIO of the raw bytes is returned;
    any other value returns a StringIO of the UTF-8 decoded text.
    """
    content = b64decode(file_.content.split("base64,")[1])
    if type == "byte":
        return BytesIO(content)
    # Bug fix: StringIO requires str, not bytes — decode before wrapping
    # (the original passed b64decode's bytes straight in, raising TypeError).
    return StringIO(content.decode("utf-8"))
| 124 | |
| 125 | |
def spreadsheet_file_to_dataframe(filedict, header="infer"):
    """Load an uploaded CSV or Excel file (base64 data URL) into a DataFrame.

    The reader is chosen from the file name's extension: ".csv" uses
    pandas.read_csv, anything else pandas.read_excel.
    """
    filelike = file_to_filelike_object(filedict)
    is_csv = filedict.name.endswith(".csv")
    reader = pandas.read_csv if is_csv else pandas.read_excel
    return reader(filelike, header=header)
| 132 | |
| 133 | |
def records_from_zip_file(zip_file, use_file_names_as_ids=False):
    """Extract Biopython records from every sequence file inside a zip upload.

    Each member with extension gb/gbk/fa/dna is parsed by trying, in order:
    text formats via string_to_records, snapgene binary, then docx
    highlighting via crazydoc. Records whose id is missing or a known
    placeholder are renamed after their file name (with a 4-digit suffix
    when the file holds several records). Each record also gets
    ``file_name`` and ``zip_file_name`` attributes for traceability.

    Parameters
    ----------
    zip_file
        Uploaded file object with ``.name`` and a base64 data-URL ``.content``.
    use_file_names_as_ids
        When True and a file contains exactly one record, that record's id
        is set to the file's base name (without extension).

    Returns the flat list of records; raises ValueError for an
    unrecognized member format.
    """
    zip_name = zip_file.name
    zip_file = flametree.file_tree(file_to_filelike_object(zip_file))
    records = []
    for f in zip_file._all_files:
        ext = f._extension.lower()
        if ext in ["gb", "gbk", "fa", "dna"]:
            # First attempt: plain-text formats (ATGC / fasta / genbank).
            try:
                new_records, fmt = string_to_records(f.read())
                if not isinstance(new_records, list):
                    new_records = [new_records]
            except:
                # Fall back to binary parsers on the raw bytes.
                content_stream = BytesIO(f.read("rb"))
                try:
                    record = snapgene_file_to_seqrecord(fileobject=content_stream)
                    new_records, fmt = [record], "snapgene"
                except:
                    try:
                        # Last resort: a docx with color/bold/underline
                        # annotations parsed by crazydoc.
                        parser = crazydoc.CrazydocParser(
                            ["highlight_color", "bold", "underline"]
                        )
                        new_records = parser.parse_doc_file(content_stream)
                        fmt = "doc"
                    except:
                        raise ValueError("Format not recognized for file " + f._path)

            single_record = len(new_records) == 1
            for i, record in enumerate(new_records):
                name = record.id
                # Replace missing/placeholder ids with a name derived from
                # the member's file name.
                if name in [
                    None,
                    "",
                    "<unknown id>",
                    ".",
                    " ",
                    "<unknown name>",
                ]:
                    number = "" if single_record else ("%04d" % i)
                    name = f._name_no_extension.replace(" ", "_") + number
                record.id = name
                record.name = name
                record.file_name = f._name_no_extension
                record.zip_file_name = zip_name
                if use_file_names_as_ids and single_record:
                    basename = os.path.basename(record.file_name)
                    basename_no_extension = os.path.splitext(basename)[0]
                    record.id = basename_no_extension
            records += new_records
    return records
| 183 | |
| 184 | |
def records_from_data_file(data_file):
    """Parse one uploaded file into Biopython records, detecting the format.

    Decodes the base64 data URL in ``data_file.content`` and tries, in
    order: text formats (string_to_records), snapgene binary, docx via
    crazydoc, and finally a two-column (name, sequence) spreadsheet.

    Returns a ``(records, fmt)`` tuple where fmt is the detected format
    label; raises ValueError when nothing matches.
    """
    content = b64decode(data_file.content.split("base64,")[1])
    try:
        # Text-based formats first (ATGC / fasta / genbank).
        records, fmt = string_to_records(content.decode("utf-8"))
    except:
        try:
            records = snapgene_file_to_seqrecord(fileobject=BytesIO(content))
            records, fmt = [record], "snapgene"
        except:
            try:
                parser = crazydoc.CrazydocParser(
                    ["highlight_color", "bold", "underline"]
                )
                records = parser.parse_doc_file(BytesIO(content))
                fmt = "doc"
            except:
                try:
                    # Spreadsheet fallback: each row is (name, sequence).
                    df = spreadsheet_file_to_dataframe(data_file, header=None)
                    records = [
                        sequence_to_biopython_record(sequence=seq, id=name, name=name)
                        for name, seq in df.values
                    ]
                    fmt = "spreadsheet"
                except:
                    raise ValueError("Format not recognized for file " + data_file.name)
    if not isinstance(records, list):
        records = [records]
    return records, fmt
| 213 | |
| 214 | |
def record_to_formated_string(record, fmt="genbank", remove_descr=False):
    """Serialize a record (or list of records) and return UTF-8 bytes.

    With remove_descr=True the descriptions are blanked on a deep copy
    before writing, so the caller's records are left untouched.
    """
    if remove_descr:
        record = deepcopy(record)
        targets = record if isinstance(record, (list, tuple)) else [record]
        for single in targets:
            single.description = ""
    buffer = StringIO()
    write_record(record, buffer, fmt)
    return buffer.getvalue().encode("utf-8")
| 226 | |
| 227 | |
def records_from_data_files(data_files, use_file_names_as_ids=False):
    """Parse a list of uploaded files (including zips) into Biopython records.

    Zip files are delegated to records_from_zip_file; everything else goes
    through records_from_data_file. Each record gets ``circular``/``linear``
    flags (from the file's optional ``circular`` attribute, defaulting to
    circular) and a ``file_name`` attribute; placeholder ids/names are
    replaced with a name derived from the file name.
    """
    records = []
    for file_ in data_files:
        # Default to circular unless the upload explicitly says otherwise.
        circular = ("circular" not in file_) or file_.circular
        if file_.name.lower().endswith("zip"):
            records += records_from_zip_file(
                file_, use_file_names_as_ids=use_file_names_as_ids
            )
            continue
        recs, fmt = records_from_data_file(file_)
        single_record = len(recs) == 1
        for i, record in enumerate(recs):
            record.circular = circular
            record.linear = not circular
            name_no_extension = "".join(file_.name.split(".")[:-1])
            # Multi-record files get a 4-digit suffix per record.
            name = name_no_extension + ("" if single_record else ("%04d" % i))
            name = name.replace(" ", "_")
            UNKNOWN_IDS = [
                "None",
                "",
                "<unknown id>",
                ".",
                "EXPORTED",
                "<unknown name>",
                "Exported",
            ]
            # Sorry for this parts, it took a lot of "whatever works".
            # keep your part names under 20c and pointless, and everything
            # will be good
            if str(record.id).strip() in UNKNOWN_IDS:
                record.id = name
            if str(record.name).strip() in UNKNOWN_IDS:
                record.name = name
            record.file_name = name_no_extension
            if use_file_names_as_ids and single_record:
                # NOTE(review): records_from_data_file does not set a
                # ``source_file`` attribute on its records, so this branch
                # looks like it would raise AttributeError — confirm where
                # ``source_file`` is expected to come from.
                basename = os.path.basename(record.source_file)
                basename_no_extension = os.path.splitext(basename)[0]
                record.id = basename_no_extension
        records += recs
    return records
| 268 | |
| 269 | |
def data_to_html_data(data, datatype, filename=None):
    """Wrap raw bytes into a data-URL string: "data:<mime>;[headers]base64,...".

    `datatype` may be a shorthand (zip, genbank, fasta, pdf, xlsx) or a
    full mime type, which is used verbatim when not a known shorthand.
    An optional filename is embedded as a "headers=filename%3D..." segment.
    """
    mime_shorthands = {
        "zip": "application/zip",
        "genbank": "application/genbank",
        "fasta": "application/fasta",
        "pdf": "application/pdf",
        "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    }
    mime = mime_shorthands.get(datatype, datatype)
    prefix = "data:%s;" % mime
    headers = "" if filename is None else "headers=filename%3D" + filename + ";"
    payload = "base64,%s" % b64encode(data).decode("utf-8")
    return prefix + headers + payload
| 285 | |
| 286 | |
def zip_data_to_html_data(data):
    """Shortcut: wrap zip-archive bytes into an application/zip data URL."""
    return data_to_html_data(data=data, datatype="application/zip")
| 289 | |
| 290 | |
| 291 LADDERS = {"100_to_4k": bw.ladders.LADDER_100_to_4k} | |
| 292 | |
| 293 | |
def matplotlib_figure_to_svg_base64_data(fig, **kwargs):
    """Return a string of the form 'data:image/svg+xml;base64,XXX' where XXX
    is the base64-encoded svg version of the figure."""
    buffer = BytesIO()
    fig.savefig(buffer, format="svg", **kwargs)
    svg_txt = buffer.getvalue().decode("utf-8")
    # Drop the first 4 lines (XML declaration / doctype header) and then
    # collapse everything onto a single line.
    svg_txt = "\n".join(svg_txt.split("\n")[4:])
    svg_txt = "".join(svg_txt.split("\n"))
    encoded = b64encode(svg_txt.encode("utf-8"))
    return (b"data:image/svg+xml;base64," + encoded).decode("utf-8")
| 307 | |
| 308 | |
def matplotlib_figure_to_bitmap_base64_data(fig, fmt="png", **kwargs):
    """Return a string of the form 'data:image/<fmt>;base64,XXX' where XXX
    is the base64-encoded bitmap rendering of the figure."""
    buffer = BytesIO()
    fig.savefig(buffer, format=fmt, **kwargs)
    encoded = b64encode(buffer.getvalue())
    header = ("data:image/%s;base64," % fmt).encode("utf-8")
    return (header + encoded).decode("utf-8")
| 320 | |
| 321 | |
def figures_to_pdf_report_data(figures, filename="report.pdf"):
    """Render matplotlib figures into one PDF and return it as a dict with
    base64 'data', 'name' and 'mimetype' keys (CUBA file-payload shape)."""
    buffer = BytesIO()
    with PdfPages(buffer) as pdf:
        for figure in figures:
            pdf.savefig(figure, bbox_inches="tight")
    encoded = b64encode(buffer.getvalue()).decode("utf-8")
    return {
        "data": "data:application/pdf;base64," + encoded,
        "name": filename,
        "mimetype": "application/pdf",
    }
| 335 | |
| 336 | |
def csv_to_list(csv_string, sep=","):
    """Flatten a (possibly multi-line) separated string into a flat list of
    stripped, non-empty cells."""
    cells = []
    for line in csv_string.split("\n"):
        for element in line.split(sep):
            stripped = element.strip()
            if stripped:
                cells.append(stripped)
    return cells
| 344 | |
| 345 | |
def set_record_topology(record, topology):
    """Set the Biopython record's topology, possibly passing if already set.

    This actually sets ``record.annotations['topology']``. The ``topology``
    parameter can be "circular", "linear", "default_to_circular" (only
    applied when ``annotations['topology']`` is not already present) or
    "default_to_linear".

    Raises ValueError for any other value.
    """
    valid_topologies = (
        "circular",
        "linear",
        "default_to_circular",
        "default_to_linear",
    )
    if topology not in valid_topologies:
        raise ValueError(
            "topology (%s) should be one of %s."
            % (topology, ", ".join(valid_topologies))
        )
    default_prefix = "default_to_"
    if not topology.startswith(default_prefix):
        record.annotations["topology"] = topology
    elif "topology" not in record.annotations:
        record.annotations["topology"] = topology[len(default_prefix):]
| 372 | |
| 373 | |
| 374 ############################################################################## | |
def main():
    """Command-line entry point: generate a DNA-assembly picklist zip.

    Reads an assembly plan, part records/sizes and a source plate, computes
    per-well transfer volumes with plateo's AssemblyPicklistGenerator, and
    writes a zip archive containing a mixplate map (PDF + XLS), assembly
    reports and a machine picklist (Echo CSV or Tecan EVO GWL). Progress
    and failure states are reported on stdout ("success: True/False").
    """
    parser = argparse.ArgumentParser(description="Generate picklist for DNA assembly.")
    parser.add_argument("--parts_files", help="Directory with parts data or file with part sizes")
    parser.add_argument("--picklist", type=str, help="Path to the assembly plan CSV or Excel file")
    parser.add_argument("--source_plate", help="Source plate file (CSV or Excel)")
    parser.add_argument("--backbone_name", help="Name of the backbone")
    parser.add_argument("--result_zip", help="Name of the output zip file")
    parser.add_argument("--part_backbone_ratio", type=float, help="Part to backbone molar ratio")
    parser.add_argument("--quantity_unit", choices=["fmol", "nM", "ng"], help="Quantity unit")
    parser.add_argument("--part_quantity", type=float, help="Quantity of each part")
    parser.add_argument("--buffer_volume", type=float, help="Buffer volume in µL")
    parser.add_argument("--total_volume", type=float, help="Total reaction volume in µL")
    parser.add_argument("--dispenser", choices=["labcyte_echo", "tecan_evo"], help="Dispenser machine")

    args = parser.parse_args()

    # Parameters:
    picklist = args.picklist  # assembly plan
    # directory or can be a csv/Excel with part sizes
    if isinstance(args.parts_files, str):
        # Galaxy passes a comma-separated list of paths as one string.
        args.parts_files = args.parts_files.split(",")
    parts_dir = fix_and_rename_paths(args.parts_files)
    source_plate_path = args.source_plate
    backbone_name = args.backbone_name
    part_backbone_ratio = args.part_backbone_ratio
    result_zip_file = args.result_zip  # output file name "picklist.zip"
    ##########################################################################
    # Defaults:
    destination_plate = None
    destination_type = "new"  # this parameter is not actually used
    destination_size = 96  # this parameter is not actually used
    fill_by = "column"  # this parameter is not actually used
    quantity_unit = args.quantity_unit
    part_quantity = args.part_quantity  # 1.3
    buffer_volume = args.buffer_volume  # 0.3 # (µL)
    total_volume = args.total_volume  # 1 # (µL)
    dispenser_machine = args.dispenser
    dispenser_min_volume = 0.5  # (nL), this parameter is not actually used
    dispenser_max_volume = 5  # (µL), this parameter is not actually used
    dispenser_resolution = 2.5  # (nL), this parameter is not actually used
    dispenser_dead_volume = 8  # (µL), this parameter is not actually used
    use_file_names_as_ids = True

    # CODE
    # Load the assembly plan rows from CSV or Excel.
    if picklist.endswith(".csv"):
        # NOTE(review): `picklist` is a filesystem path (str) here, and str
        # has no .read() — this CSV branch looks broken; confirm the
        # expected input type (the Excel branch reads the path directly).
        csv = picklist.read().decode()
        rows = [line.split(",") for line in csv.split("\n") if len(line)]
    else:
        dataframe = pandas.read_excel(picklist)
        rows = [row for i, row in dataframe.iterrows()]

    # Build the plan: construct name -> list of part names, skipping header
    # rows and empty/placeholder cells ("-", "nan", "").
    assembly_plan = AssemblyPlan(
        OrderedDict(
            [
                (
                    row[0],
                    [
                        str(e).strip()
                        for e in row[1:]
                        if str(e).strip() not in ["-", "nan", ""]
                    ],
                )
                for row in rows
                if row[0] not in ["nan", "Construct name", "constructs", "construct"]
            ]
        )
    )
    # Normalize part names: spaces become underscores everywhere.
    for assembly, parts in assembly_plan.assemblies.items():
        assembly_plan.assemblies[assembly] = [part.replace(" ", "_") for part in parts]

    # Reading part infos
    if not isinstance(parts_dir, list):
        if parts_dir.endswith((".csv", ".xls", ".xlsx")):  # part sizes specified in table
            if parts_dir.endswith(".csv"):
                dataframe = pandas.read_csv(parts_dir)
            else:
                dataframe = pandas.read_excel(parts_dir)
            parts_data = {row.part: {"size": row["size"]} for i, row in dataframe.iterrows()}
        else:  # input records
            records = dnacauldron.biotools.load_records_from_files(
                files=parts_dir, use_file_names_as_ids=use_file_names_as_ids
            )
            parts_data = {rec.id.replace(" ", "_").lower(): {"record": rec} for rec in records}
            #parts_data = process_parts_with_mapping(records, args.file_name_mapping)
    # NOTE(review): when parts_dir IS a list (the common Galaxy case, since
    # fix_and_rename_paths returns a list), parts_data is never assigned
    # before this use — confirm against the upstream CUBA code.
    assembly_plan.parts_data = parts_data
    parts_without_data = assembly_plan.parts_without_data()
    if len(parts_without_data):
        # Abort early with a machine-readable failure message.
        print("success: False")
        print("message: Some parts have no provided record or data.")
        print("missing_parts: ", parts_without_data)
        sys.exit()
    # Reading protocol
    # Convert the requested per-part quantity into mol (part_mol) or
    # grams (part_g) depending on the chosen unit.
    if quantity_unit == "fmol":
        part_mol = part_quantity * 1e-15
        part_g = None
    if quantity_unit == "nM":
        part_mol = part_quantity * total_volume * 1e-15
        part_g = None
    if quantity_unit == "ng":
        part_mol = None
        part_g = part_quantity * 1e-9
        # Backbone:part molar ratio calculation is not performed in this case.
        # This ensures no change regardless of form input:
        part_backbone_ratio = 1
    print("Generating picklist")
    picklist_generator = AssemblyPicklistGenerator(
        part_mol=part_mol,
        part_g=part_g,
        complement_to=total_volume * 1e-6,  # convert uL to L
        buffer_volume=buffer_volume * 1e-6,
        volume_rounding=2.5e-9,  # not using parameter from form
        minimal_dispense_volume=5e-9,  # Echo machine's minimum dispense -
    )
    backbone_name_list = backbone_name.split(",")
    source_plate = plate_from_content_spreadsheet(source_plate_path)

    # Normalize well content names and apply the backbone ratio trick.
    for well in source_plate.iter_wells():
        if well.is_empty:
            continue
        quantities = well.content.quantities
        part, quantity = list(quantities.items())[0]
        quantities.pop(part)
        quantities[part.replace(" ", "_")] = quantity

        if part in backbone_name_list:
            # This section multiplies the backbone concentration with the
            # part:backbone molar ratio. This tricks the calculator into making
            # a picklist with the desired ratio.
            # For example, a part:backbone = 2:1 will multiply the
            # backbone concentration by 2, therefore half as much of it will be
            # added to the well.
            quantities[part.replace(" ", "_")] = quantity * part_backbone_ratio
        else:
            # Redundant with the assignment above, but kept for clarity.
            quantities[part.replace(" ", "_")] = quantity

    source_plate.name = "Source"
    if destination_plate:
        # NOTE(review): dead branch — destination_plate is hard-coded to
        # None above, and dest_filelike is unused; confirm before removing.
        dest_filelike = file_to_filelike_object(destination_plate)
        destination_plate = plate_from_content_spreadsheet(destination_plate)
    else:
        destination_plate = Plate4ti0960("Mixplate")
    # Fill the destination plate column-by-column into empty wells only.
    destination_wells = (
        well for well in destination_plate.iter_wells(direction="column") if well.is_empty
    )
    # `picklist` is rebound here: it was the plan path, now it is the
    # computed plateo picklist object.
    picklist, picklist_data = picklist_generator.make_picklist(
        assembly_plan,
        source_wells=source_plate.iter_wells(),
        destination_wells=destination_wells,
    )
    if picklist is None:
        print("success: False")
        print("message: Some parts in the assembly plan have no corresponding well.")
        print("picklist_data: ", picklist_data)
        print("missing_parts:", picklist_data.get("missing_parts", None))
        sys.exit()

    # Simulate the transfers to obtain the final plate states.
    future_plates = picklist.simulate(inplace=False)

    def text(w):
        # Label each destination well with its volume and construct name.
        txt = human_volume(w.content.volume)
        if "construct" in w.data:
            txt = "\n".join([w.data["construct"], txt])
        return txt

    plotter = PlateTextPlotter(text)
    ax, _ = plotter.plot_plate(future_plates[destination_plate], figsize=(20, 8))

    # Open the output zip as a flametree file tree.
    ziproot = flametree.file_tree(result_zip_file, replace=True)

    # MIXPLATE MAP PLOT
    ax.figure.savefig(
        ziproot._file("final_mixplate.pdf").open("wb"),
        format="pdf",
        bbox_inches="tight",
    )
    plt.close(ax.figure)
    plate_to_platemap_spreadsheet(
        future_plates[destination_plate],
        lambda w: w.data.get("construct", ""),
        filepath=ziproot._file("final_mixplate.xls").open("wb"),
    )

    # ASSEMBLY REPORT
    print("Writing report...")
    picklist_to_assembly_mix_report(
        picklist,
        ziproot._file("assembly_mix_picklist_report.pdf").open("wb"),
        data=picklist_data,
    )
    assembly_plan.write_report(ziproot._file("assembly_plan_summary.pdf").open("wb"))

    # MACHINE PICKLIST
    # Emit the dispenser-specific transfer file.
    if dispenser_machine == "labcyte_echo":
        picklist_to_labcyte_echo_picklist_file(
            picklist, ziproot._file("ECHO_picklist.csv").open("w")
        )
    else:
        picklist_to_tecan_evo_picklist_file(
            picklist, ziproot._file("EVO_picklist.gwl").open("w")
        )
    # We'll not write the input source plate.
    # raw = file_to_filelike_object(source_plate_path).read()
    # f = ziproot.copy(source_plate_path)
    # f.write(raw, mode="wb")
    ziproot._close()
    print("success: True")
| 585 | |
| 586 | |
# Script entry point (Galaxy invokes this file directly).
if __name__ == "__main__":
    main()
