comparison CreateAssemblyPicklists_script.py @ 0:4bde3e90ee98 draft

planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/Plateo commit 98d5e65b8008dbca117b2e0655cfdd54655fac48-dirty
author tduigou
date Wed, 06 Aug 2025 08:02:58 +0000
parents
children 196e13c09881
comparison
equal deleted inserted replaced
-1:000000000000 0:4bde3e90ee98
1 #!/usr/bin/env python
2 # coding: utf-8
3 # Code copied from CUBA backend tools.py and create_assembly_picklists/CreateAssemblyPicklistsView.py
4 # Code modified for running in a script in Galaxy.
5 ##############################################################################
6 ##############################################################################
7 # App code
8 ## EGF Galaxy Create assembly picklists -- script
9
10 ##############################################################################
11 # IMPORTS
12 import argparse
13 import os
14 from io import StringIO, BytesIO
15 import re
16 from base64 import b64encode, b64decode
17 from copy import deepcopy
18 import sys
19
20 from collections import OrderedDict
21 from fuzzywuzzy import process
22 import matplotlib.pyplot as plt
23 from matplotlib.backends.backend_pdf import PdfPages
24 import pandas
25
26 from Bio import SeqIO
27 from Bio.SeqRecord import SeqRecord
28 from Bio.Seq import Seq
29
30 import bandwagon as bw
31 import crazydoc
32 from dnachisel.biotools import sequence_to_biopython_record
33 import dnacauldron
34 import flametree
35 from plateo import AssemblyPlan
36 from plateo.parsers import plate_from_content_spreadsheet
37 from plateo.containers import Plate4ti0960
38 from plateo.exporters import AssemblyPicklistGenerator, picklist_to_assembly_mix_report
39 from plateo.exporters import (
40 picklist_to_labcyte_echo_picklist_file,
41 picklist_to_tecan_evo_picklist_file,
42 plate_to_platemap_spreadsheet,
43 PlateTextPlotter,
44 )
45 from plateo.tools import human_volume
46 from snapgene_reader import snapgene_file_to_seqrecord
47
48
49 ##############################################################################
50 # FUNCTIONS
51
52 def fix_and_rename_paths(paths):
53 fixed_paths = []
54 for path in paths:
55 new_path = path.replace("__sq__", "'")
56 if new_path != path:
57 os.rename(path, new_path)
58 fixed_paths.append(new_path)
59 return fixed_paths
60
61
62 def did_you_mean(name, other_names, limit=5, min_score=50): # test
63 results = process.extract(name, list(other_names), limit=limit)
64 return [e for (e, score) in results if score >= min_score]
65
66
67 def fix_ice_genbank(genbank_txt):
68 lines = genbank_txt.splitlines()
69 lines[0] += max(0, 80 - len(lines[0])) * " "
70 return "\n".join(lines)
71
72
73 def write_record(record, target, fmt="genbank"):
74 """Write a record as genbank, fasta, etc. via Biopython, with fixes"""
75 record = deepcopy(record)
76 if fmt == "genbank":
77 if isinstance(record, (list, tuple)):
78 for r in record:
79 r.name = r.name[:20]
80 else:
81 record.name = record.name[:20]
82 if hasattr(target, "open"):
83 target = target.open("w")
84 SeqIO.write(record, target, fmt)
85
86
87 def autoname_genbank_file(record):
88 return record.id.replace(".", "_") + ".gb"
89
90
91 def string_to_records(string):
92 """Convert a string of a fasta, genbank... into a simple ATGC string.
93
94 Can also be used to detect a format.
95 """
96 matches = re.match("([ATGC][ATGC]*)", string)
97 # print("============", len(matches.groups()[0]), len(string))
98 # print (matches.groups()[0] == string)
99 if (matches is not None) and (matches.groups()[0] == string):
100 return [SeqRecord(Seq(string))], "ATGC"
101
102 for fmt in ("fasta", "genbank"):
103 if fmt == "genbank":
104 string = fix_ice_genbank(string)
105 try:
106 stringio = StringIO(string)
107 records = list(SeqIO.parse(stringio, fmt))
108 if len(records) > 0:
109 return (records, fmt)
110 except:
111 pass
112 try:
113 record = snapgene_file_to_seqrecord(filecontent=StringIO(string))
114 return [record]
115 except:
116 pass
117 raise ValueError("Invalid sequence format")
118
119
120 def file_to_filelike_object(file_, type="byte"):
121 content = file_.content.split("base64,")[1]
122 filelike = BytesIO if (type == "byte") else StringIO
123 return filelike(b64decode(content))
124
125
126 def spreadsheet_file_to_dataframe(filedict, header="infer"):
127 filelike = file_to_filelike_object(filedict)
128 if filedict.name.endswith(".csv"):
129 return pandas.read_csv(filelike, header=header)
130 else:
131 return pandas.read_excel(filelike, header=header)
132
133
134 def records_from_zip_file(zip_file, use_file_names_as_ids=False):
135 zip_name = zip_file.name
136 zip_file = flametree.file_tree(file_to_filelike_object(zip_file))
137 records = []
138 for f in zip_file._all_files:
139 ext = f._extension.lower()
140 if ext in ["gb", "gbk", "fa", "dna"]:
141 try:
142 new_records, fmt = string_to_records(f.read())
143 if not isinstance(new_records, list):
144 new_records = [new_records]
145 except:
146 content_stream = BytesIO(f.read("rb"))
147 try:
148 record = snapgene_file_to_seqrecord(fileobject=content_stream)
149 new_records, fmt = [record], "snapgene"
150 except:
151 try:
152 parser = crazydoc.CrazydocParser(
153 ["highlight_color", "bold", "underline"]
154 )
155 new_records = parser.parse_doc_file(content_stream)
156 fmt = "doc"
157 except:
158 raise ValueError("Format not recognized for file " + f._path)
159
160 single_record = len(new_records) == 1
161 for i, record in enumerate(new_records):
162 name = record.id
163 if name in [
164 None,
165 "",
166 "<unknown id>",
167 ".",
168 " ",
169 "<unknown name>",
170 ]:
171 number = "" if single_record else ("%04d" % i)
172 name = f._name_no_extension.replace(" ", "_") + number
173 record.id = name
174 record.name = name
175 record.file_name = f._name_no_extension
176 record.zip_file_name = zip_name
177 if use_file_names_as_ids and single_record:
178 basename = os.path.basename(record.file_name)
179 basename_no_extension = os.path.splitext(basename)[0]
180 record.id = basename_no_extension
181 records += new_records
182 return records
183
184
185 def records_from_data_file(data_file):
186 content = b64decode(data_file.content.split("base64,")[1])
187 try:
188 records, fmt = string_to_records(content.decode("utf-8"))
189 except:
190 try:
191 record = snapgene_file_to_seqrecord(fileobject=BytesIO(content))
192 records, fmt = [record], "snapgene"
193 except:
194 try:
195 parser = crazydoc.CrazydocParser(
196 ["highlight_color", "bold", "underline"]
197 )
198 records = parser.parse_doc_file(BytesIO(content))
199 fmt = "doc"
200 except:
201 try:
202 df = spreadsheet_file_to_dataframe(data_file, header=None)
203 records = [
204 sequence_to_biopython_record(sequence=seq, id=name, name=name)
205 for name, seq in df.values
206 ]
207 fmt = "spreadsheet"
208 except:
209 raise ValueError("Format not recognized for file " + data_file.name)
210 if not isinstance(records, list):
211 records = [records]
212 return records, fmt
213
214
215 def record_to_formated_string(record, fmt="genbank", remove_descr=False):
216 if remove_descr:
217 record = deepcopy(record)
218 if isinstance(record, (list, tuple)):
219 for r in record:
220 r.description = ""
221 else:
222 record.description = ""
223 fileobject = StringIO()
224 write_record(record, fileobject, fmt)
225 return fileobject.getvalue().encode("utf-8")
226
227
228 def records_from_data_files(data_files, use_file_names_as_ids=False):
229 records = []
230 for file_ in data_files:
231 circular = ("circular" not in file_) or file_.circular
232 if file_.name.lower().endswith("zip"):
233 records += records_from_zip_file(
234 file_, use_file_names_as_ids=use_file_names_as_ids
235 )
236 continue
237 recs, fmt = records_from_data_file(file_)
238 single_record = len(recs) == 1
239 for i, record in enumerate(recs):
240 record.circular = circular
241 record.linear = not circular
242 name_no_extension = "".join(file_.name.split(".")[:-1])
243 name = name_no_extension + ("" if single_record else ("%04d" % i))
244 name = name.replace(" ", "_")
245 UNKNOWN_IDS = [
246 "None",
247 "",
248 "<unknown id>",
249 ".",
250 "EXPORTED",
251 "<unknown name>",
252 "Exported",
253 ]
254 # Sorry for this parts, it took a lot of "whatever works".
255 # keep your part names under 20c and pointless, and everything
256 # will be good
257 if str(record.id).strip() in UNKNOWN_IDS:
258 record.id = name
259 if str(record.name).strip() in UNKNOWN_IDS:
260 record.name = name
261 record.file_name = name_no_extension
262 if use_file_names_as_ids and single_record:
263 basename = os.path.basename(record.source_file)
264 basename_no_extension = os.path.splitext(basename)[0]
265 record.id = basename_no_extension
266 records += recs
267 return records
268
269
270 def data_to_html_data(data, datatype, filename=None):
271 """Data types: zip, genbank, fasta, pdf"""
272 datatype = {
273 "zip": "application/zip",
274 "genbank": "application/genbank",
275 "fasta": "application/fasta",
276 "pdf": "application/pdf",
277 "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
278 }.get(datatype, datatype)
279 datatype = "data:%s;" % datatype
280 data64 = "base64,%s" % b64encode(data).decode("utf-8")
281 headers = ""
282 if filename is not None:
283 headers += "headers=filename%3D" + filename + ";"
284 return datatype + headers + data64
285
286
287 def zip_data_to_html_data(data):
288 return data_to_html_data(data, "application/zip")
289
290
291 LADDERS = {"100_to_4k": bw.ladders.LADDER_100_to_4k}
292
293
294 def matplotlib_figure_to_svg_base64_data(fig, **kwargs):
295 """Return a string of the form 'data:image/svg+xml;base64,XXX' where XXX
296 is the base64-encoded svg version of the figure."""
297 output = BytesIO()
298 fig.savefig(output, format="svg", **kwargs)
299 svg_txt = output.getvalue().decode("utf-8")
300 svg_txt = "\n".join(svg_txt.split("\n")[4:])
301 svg_txt = "".join(svg_txt.split("\n"))
302
303 content = b64encode(svg_txt.encode("utf-8"))
304 result = (b"data:image/svg+xml;base64," + content).decode("utf-8")
305
306 return result
307
308
309 def matplotlib_figure_to_bitmap_base64_data(fig, fmt="png", **kwargs):
310 """Return a string of the form 'data:image/png;base64,XXX' where XXX
311 is the base64-encoded svg version of the figure."""
312 output = BytesIO()
313 fig.savefig(output, format=fmt, **kwargs)
314 bitmap = output.getvalue()
315 content = b64encode(bitmap)
316 result = (b"data:image/%s;base64,%s" % (fmt.encode("utf-8"), content)).decode(
317 "utf-8"
318 )
319 return result
320
321
322 def figures_to_pdf_report_data(figures, filename="report.pdf"):
323 pdf_io = BytesIO()
324 with PdfPages(pdf_io) as pdf:
325 for fig in figures:
326 pdf.savefig(fig, bbox_inches="tight")
327 return {
328 "data": (
329 "data:application/pdf;base64,"
330 + b64encode(pdf_io.getvalue()).decode("utf-8")
331 ),
332 "name": filename,
333 "mimetype": "application/pdf",
334 }
335
336
337 def csv_to_list(csv_string, sep=","):
338 return [
339 element.strip()
340 for line in csv_string.split("\n")
341 for element in line.split(sep)
342 if len(element.strip())
343 ]
344
345
346 def set_record_topology(record, topology):
347 """Set the Biopython record's topology, possibly passing if already set.
348
349 This actually sets the ``record.annotations['topology']``.The ``topology``
350 parameter can be "circular", "linear", "default_to_circular" (will default
351 to circular if ``annotations['topology']`` is not already set) or
352 "default_to_linear".
353 """
354 valid_topologies = [
355 "circular",
356 "linear",
357 "default_to_circular",
358 "default_to_linear",
359 ]
360 if topology not in valid_topologies:
361 raise ValueError(
362 "topology (%s) should be one of %s."
363 % (topology, ", ".join(valid_topologies))
364 )
365 annotations = record.annotations
366 default_prefix = "default_to_"
367 if topology.startswith(default_prefix):
368 if "topology" not in annotations:
369 annotations["topology"] = topology[len(default_prefix) :]
370 else:
371 annotations["topology"] = topology
372
373
374 ##############################################################################
375 def main():
376
377 parser = argparse.ArgumentParser(description="Generate picklist for DNA assembly.")
378 parser.add_argument("--parts_files", help="Directory with parts data or file with part sizes")
379 parser.add_argument("--picklist", type=str, help="Path to the assembly plan CSV or Excel file")
380 parser.add_argument("--source_plate", help="Source plate file (CSV or Excel)")
381 parser.add_argument("--backbone_name", help="Name of the backbone")
382 parser.add_argument("--result_zip", help="Name of the output zip file")
383 parser.add_argument("--part_backbone_ratio", type=float, help="Part to backbone molar ratio")
384 parser.add_argument("--quantity_unit", choices=["fmol", "nM", "ng"], help="Quantity unit")
385 parser.add_argument("--part_quantity", type=float, help="Quantity of each part")
386 parser.add_argument("--buffer_volume", type=float, help="Buffer volume in µL")
387 parser.add_argument("--total_volume", type=float, help="Total reaction volume in µL")
388 parser.add_argument("--dispenser", choices=["labcyte_echo", "tecan_evo"], help="Dispenser machine")
389
390 args = parser.parse_args()
391
392 # Parameters:
393 picklist = args.picklist # assembly plan
394 # directory or can be a csv/Excel with part sizes
395 if isinstance(args.parts_files, str):
396 args.parts_files = args.parts_files.split(",")
397 parts_dir = fix_and_rename_paths(args.parts_files)
398 source_plate_path = args.source_plate
399 backbone_name = args.backbone_name
400 part_backbone_ratio = args.part_backbone_ratio
401 result_zip_file = args.result_zip # output file name "picklist.zip"
402 ##############################################################################
403 # Defaults:
404 destination_plate = None
405 destination_type = "new" # this parameter is not actually used
406 destination_size = 96 # this parameter is not actually used
407 fill_by = "column" # this parameter is not actually used
408 quantity_unit = args.quantity_unit
409 part_quantity = args.part_quantity # 1.3
410 buffer_volume = args.buffer_volume # 0.3 # (µL)
411 total_volume = args.total_volume # 1 # (µL)
412 dispenser_machine = args.dispenser
413 dispenser_min_volume = 0.5 # (nL), this parameter is not actually used
414 dispenser_max_volume = 5 # (µL), this parameter is not actually used
415 dispenser_resolution = 2.5 # (nL), this parameter is not actually used
416 dispenser_dead_volume = 8 # (µL), this parameter is not actually used
417 use_file_names_as_ids = True
418
419 # CODE
420 if picklist.endswith(".csv"):
421 csv = picklist.read().decode()
422 rows = [line.split(",") for line in csv.split("\n") if len(line)]
423 else:
424 dataframe = pandas.read_excel(picklist)
425 rows = [row for i, row in dataframe.iterrows()]
426
427 assembly_plan = AssemblyPlan(
428 OrderedDict(
429 [
430 (
431 row[0],
432 [
433 str(e).strip()
434 for e in row[1:]
435 if str(e).strip() not in ["-", "nan", ""]
436 ],
437 )
438 for row in rows
439 if row[0] not in ["nan", "Construct name", "constructs", "construct"]
440 ]
441 )
442 )
443 for assembly, parts in assembly_plan.assemblies.items():
444 assembly_plan.assemblies[assembly] = [part.replace(" ", "_") for part in parts]
445
446 # Reading part infos
447 if not isinstance(parts_dir, list):
448 if parts_dir.endswith((".csv", ".xls", ".xlsx")): # part sizes specified in table
449 if parts_dir.endswith(".csv"):
450 dataframe = pandas.read_csv(parts_dir)
451 else:
452 dataframe = pandas.read_excel(parts_dir)
453 parts_data = {row.part: {"size": row["size"]} for i, row in dataframe.iterrows()}
454 else: # input records
455 records = dnacauldron.biotools.load_records_from_files(
456 files=parts_dir, use_file_names_as_ids=use_file_names_as_ids
457 )
458 parts_data = {rec.id.replace(" ", "_").lower(): {"record": rec} for rec in records}
459 #parts_data = process_parts_with_mapping(records, args.file_name_mapping)
460 assembly_plan.parts_data = parts_data
461 parts_without_data = assembly_plan.parts_without_data()
462 if len(parts_without_data):
463 print("success: False")
464 print("message: Some parts have no provided record or data.")
465 print("missing_parts: ", parts_without_data)
466 sys.exit()
467 # Reading protocol
468 if quantity_unit == "fmol":
469 part_mol = part_quantity * 1e-15
470 part_g = None
471 if quantity_unit == "nM":
472 part_mol = part_quantity * total_volume * 1e-15
473 part_g = None
474 if quantity_unit == "ng":
475 part_mol = None
476 part_g = part_quantity * 1e-9
477 # Backbone:part molar ratio calculation is not performed in this case.
478 # This ensures no change regardless of form input:
479 part_backbone_ratio = 1
480 print("Generating picklist")
481 picklist_generator = AssemblyPicklistGenerator(
482 part_mol=part_mol,
483 part_g=part_g,
484 complement_to=total_volume * 1e-6, # convert uL to L
485 buffer_volume=buffer_volume * 1e-6,
486 volume_rounding=2.5e-9, # not using parameter from form
487 minimal_dispense_volume=5e-9, # Echo machine's minimum dispense -
488 )
489 backbone_name_list = backbone_name.split(",")
490 source_plate = plate_from_content_spreadsheet(source_plate_path)
491
492 for well in source_plate.iter_wells():
493 if well.is_empty:
494 continue
495 quantities = well.content.quantities
496 part, quantity = list(quantities.items())[0]
497 quantities.pop(part)
498 quantities[part.replace(" ", "_")] = quantity
499
500 if part in backbone_name_list:
501 # This section multiplies the backbone concentration with the
502 # part:backbone molar ratio. This tricks the calculator into making
503 # a picklist with the desired ratio.
504 # For example, a part:backbone = 2:1 will multiply the
505 # backbone concentration by 2, therefore half as much of it will be
506 # added to the well.
507 quantities[part.replace(" ", "_")] = quantity * part_backbone_ratio
508 else:
509 quantities[part.replace(" ", "_")] = quantity
510
511 source_plate.name = "Source"
512 if destination_plate:
513 dest_filelike = file_to_filelike_object(destination_plate)
514 destination_plate = plate_from_content_spreadsheet(destination_plate)
515 else:
516 destination_plate = Plate4ti0960("Mixplate")
517 destination_wells = (
518 well for well in destination_plate.iter_wells(direction="column") if well.is_empty
519 )
520 picklist, picklist_data = picklist_generator.make_picklist(
521 assembly_plan,
522 source_wells=source_plate.iter_wells(),
523 destination_wells=destination_wells,
524 )
525 if picklist is None:
526 print("success: False")
527 print("message: Some parts in the assembly plan have no corresponding well.")
528 print("picklist_data: ", picklist_data)
529 print("missing_parts:", picklist_data.get("missing_parts", None))
530 sys.exit()
531
532 future_plates = picklist.simulate(inplace=False)
533
534
535 def text(w):
536 txt = human_volume(w.content.volume)
537 if "construct" in w.data:
538 txt = "\n".join([w.data["construct"], txt])
539 return txt
540
541
542 plotter = PlateTextPlotter(text)
543 ax, _ = plotter.plot_plate(future_plates[destination_plate], figsize=(20, 8))
544
545 ziproot = flametree.file_tree(result_zip_file, replace=True)
546
547 # MIXPLATE MAP PLOT
548 ax.figure.savefig(
549 ziproot._file("final_mixplate.pdf").open("wb"),
550 format="pdf",
551 bbox_inches="tight",
552 )
553 plt.close(ax.figure)
554 plate_to_platemap_spreadsheet(
555 future_plates[destination_plate],
556 lambda w: w.data.get("construct", ""),
557 filepath=ziproot._file("final_mixplate.xls").open("wb"),
558 )
559
560 # ASSEMBLY REPORT
561 print("Writing report...")
562 picklist_to_assembly_mix_report(
563 picklist,
564 ziproot._file("assembly_mix_picklist_report.pdf").open("wb"),
565 data=picklist_data,
566 )
567 assembly_plan.write_report(ziproot._file("assembly_plan_summary.pdf").open("wb"))
568
569 # MACHINE PICKLIST
570
571 if dispenser_machine == "labcyte_echo":
572 picklist_to_labcyte_echo_picklist_file(
573 picklist, ziproot._file("ECHO_picklist.csv").open("w")
574 )
575 else:
576 picklist_to_tecan_evo_picklist_file(
577 picklist, ziproot._file("EVO_picklist.gwl").open("w")
578 )
579 # We'll not write the input source plate.
580 # raw = file_to_filelike_object(source_plate_path).read()
581 # f = ziproot.copy(source_plate_path)
582 # f.write(raw, mode="wb")
583 ziproot._close()
584 print("success: True")
585
586
587 if __name__ == "__main__":
588 main()