diff CreateAssemblyPicklists_script.py @ 0:4bde3e90ee98 draft

planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/Plateo commit 98d5e65b8008dbca117b2e0655cfdd54655fac48-dirty
author tduigou
date Wed, 06 Aug 2025 08:02:58 +0000
parents
children 196e13c09881
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CreateAssemblyPicklists_script.py	Wed Aug 06 08:02:58 2025 +0000
@@ -0,0 +1,588 @@
+#!/usr/bin/env python
+# coding: utf-8
+# Code copied from CUBA backend tools.py and create_assembly_picklists/CreateAssemblyPicklistsView.py
+# Code modified for running in a script in Galaxy.
+##############################################################################
+##############################################################################
+# App code
+## EGF Galaxy Create assembly picklists -- script
+
+##############################################################################
+# IMPORTS
+import argparse
+import os
+from io import StringIO, BytesIO
+import re
+from base64 import b64encode, b64decode
+from copy import deepcopy
+import sys
+
+from collections import OrderedDict
+from fuzzywuzzy import process
+import matplotlib.pyplot as plt
+from matplotlib.backends.backend_pdf import PdfPages
+import pandas
+
+from Bio import SeqIO
+from Bio.SeqRecord import SeqRecord
+from Bio.Seq import Seq
+
+import bandwagon as bw
+import crazydoc
+from dnachisel.biotools import sequence_to_biopython_record
+import dnacauldron
+import flametree
+from plateo import AssemblyPlan
+from plateo.parsers import plate_from_content_spreadsheet
+from plateo.containers import Plate4ti0960
+from plateo.exporters import AssemblyPicklistGenerator, picklist_to_assembly_mix_report
+from plateo.exporters import (
+    picklist_to_labcyte_echo_picklist_file,
+    picklist_to_tecan_evo_picklist_file,
+    plate_to_platemap_spreadsheet,
+    PlateTextPlotter,
+)
+from plateo.tools import human_volume
+from snapgene_reader import snapgene_file_to_seqrecord
+
+
+##############################################################################
+# FUNCTIONS
+
def fix_and_rename_paths(paths):
    """Undo Galaxy's "__sq__" escaping (single quote) in file paths.

    Any path containing the escape sequence is renamed on disk to its
    unescaped form; the (possibly renamed) paths are returned in order.
    """
    renamed = []
    for original in paths:
        unescaped = original.replace("__sq__", "'")
        if unescaped != original:
            # Rename on disk so downstream readers find the real file.
            os.rename(original, unescaped)
        renamed.append(unescaped)
    return renamed
+
+
def did_you_mean(name, other_names, limit=5, min_score=50):
    """Return up to `limit` fuzzy matches of `name` among `other_names`
    whose fuzzywuzzy score is at least `min_score`."""
    matches = process.extract(name, list(other_names), limit=limit)
    return [candidate for candidate, score in matches if score >= min_score]
+
+
def fix_ice_genbank(genbank_txt):
    """Right-pad the first line of a Genbank text to 80 characters.

    Works around short LOCUS lines (as produced by ICE exports, per the
    function name) that some parsers reject.
    """
    lines = genbank_txt.splitlines()
    lines[0] = lines[0].ljust(80)
    return "\n".join(lines)
+
+
def write_record(record, target, fmt="genbank"):
    """Write a record (or list of records) to `target` via Biopython.

    Operates on a deep copy so the caller's record is untouched. For
    genbank output, names are truncated to 20 characters (format limit).
    Accepts flametree-style targets exposing an ``open`` method.
    """
    record = deepcopy(record)
    if fmt == "genbank":
        as_list = record if isinstance(record, (list, tuple)) else [record]
        for rec in as_list:
            # Long LOCUS names break downstream genbank parsers.
            rec.name = rec.name[:20]
    if hasattr(target, "open"):
        target = target.open("w")
    SeqIO.write(record, target, fmt)
+
+
def autoname_genbank_file(record):
    """Derive a .gb filename from the record id (dots become underscores)."""
    return "{}.gb".format(record.id.replace(".", "_"))
+
+
def string_to_records(string):
    """Parse a sequence string (plain ATGC, fasta, genbank or snapgene).

    Returns a tuple ``(records, fmt)`` where ``records`` is a list of
    Biopython SeqRecords and ``fmt`` is the detected format ("ATGC",
    "fasta", "genbank" or "snapgene"), so this can also be used as a
    format detector. Raises ValueError when no format matches.
    """
    matches = re.match("([ATGC][ATGC]*)", string)
    if (matches is not None) and (matches.groups()[0] == string):
        return [SeqRecord(Seq(string))], "ATGC"

    for fmt in ("fasta", "genbank"):
        if fmt == "genbank":
            # Pad short LOCUS lines before attempting genbank parsing.
            string = fix_ice_genbank(string)
        try:
            stringio = StringIO(string)
            records = list(SeqIO.parse(stringio, fmt))
            if len(records) > 0:
                return (records, fmt)
        except Exception:
            pass
    try:
        record = snapgene_file_to_seqrecord(filecontent=StringIO(string))
        # Bug fix: return the (records, fmt) pair like every other success
        # path — callers unpack two values (`records, fmt = ...`), so the
        # previous bare `return [record]` broke the snapgene branch.
        return [record], "snapgene"
    except Exception:
        pass
    raise ValueError("Invalid sequence format")
+
+
def file_to_filelike_object(file_, type="byte"):
    """Decode a base64 data-URL file object into a BytesIO (default) or,
    when `type` is not "byte", a StringIO."""
    payload = file_.content.split("base64,")[1]
    wrapper = BytesIO if type == "byte" else StringIO
    return wrapper(b64decode(payload))
+
+
def spreadsheet_file_to_dataframe(filedict, header="infer"):
    """Load a CSV or Excel file dict (base64 content) into a DataFrame."""
    stream = file_to_filelike_object(filedict)
    if filedict.name.endswith(".csv"):
        reader = pandas.read_csv
    else:
        reader = pandas.read_excel
    return reader(stream, header=header)
+
+
def records_from_zip_file(zip_file, use_file_names_as_ids=False):
    """Extract Biopython records from every sequence file in a zip archive.

    For each file with a gb/gbk/fa/dna extension, tries in order:
    text formats via string_to_records, binary snapgene, then crazydoc
    (styled-text documents). Raises ValueError for files matching none.
    Records with placeholder ids are renamed after the file name
    (suffixed with an index for multi-record files). When
    `use_file_names_as_ids` is true, single-record files get the file
    name (without extension) as record id.
    """
    zip_name = zip_file.name
    zip_file = flametree.file_tree(file_to_filelike_object(zip_file))
    records = []
    for f in zip_file._all_files:
        ext = f._extension.lower()
        if ext in ["gb", "gbk", "fa", "dna"]:
            try:
                # Try plain-text sequence formats first.
                new_records, fmt = string_to_records(f.read())
                if not isinstance(new_records, list):
                    new_records = [new_records]
            except:
                content_stream = BytesIO(f.read("rb"))
                try:
                    # Binary snapgene (.dna) file?
                    record = snapgene_file_to_seqrecord(fileobject=content_stream)
                    new_records, fmt = [record], "snapgene"
                except:
                    try:
                        # Last resort: styled-text document via crazydoc.
                        parser = crazydoc.CrazydocParser(
                            ["highlight_color", "bold", "underline"]
                        )
                        new_records = parser.parse_doc_file(content_stream)
                        fmt = "doc"
                    except:
                        raise ValueError("Format not recognized for file " + f._path)

            single_record = len(new_records) == 1
            for i, record in enumerate(new_records):
                name = record.id
                if name in [
                    None,
                    "",
                    "<unknown id>",
                    ".",
                    " ",
                    "<unknown name>",
                ]:
                    # Placeholder id: fall back to the file name, plus an
                    # index when the file holds several records.
                    number = "" if single_record else ("%04d" % i)
                    name = f._name_no_extension.replace(" ", "_") + number
                record.id = name
                record.name = name
                record.file_name = f._name_no_extension
                record.zip_file_name = zip_name
                if use_file_names_as_ids and single_record:
                    basename = os.path.basename(record.file_name)
                    basename_no_extension = os.path.splitext(basename)[0]
                    record.id = basename_no_extension
            records += new_records
    return records
+
+
def records_from_data_file(data_file):
    """Parse one uploaded data file (base64 content) into records.

    Returns ``(records, fmt)``. Parsers are tried in order — text
    formats via string_to_records, snapgene, crazydoc, then a
    (name, sequence) spreadsheet — and ValueError is raised when all
    fail.
    """
    content = b64decode(data_file.content.split("base64,")[1])
    try:
        records, fmt = string_to_records(content.decode("utf-8"))
    except:
        try:
            record = snapgene_file_to_seqrecord(fileobject=BytesIO(content))
            records, fmt = [record], "snapgene"
        except:
            try:
                parser = crazydoc.CrazydocParser(
                    ["highlight_color", "bold", "underline"]
                )
                records = parser.parse_doc_file(BytesIO(content))
                fmt = "doc"
            except:
                try:
                    # Spreadsheet with one (name, sequence) pair per row.
                    df = spreadsheet_file_to_dataframe(data_file, header=None)
                    records = [
                        sequence_to_biopython_record(sequence=seq, id=name, name=name)
                        for name, seq in df.values
                    ]
                    fmt = "spreadsheet"
                except:
                    raise ValueError("Format not recognized for file " + data_file.name)
    if not isinstance(records, list):
        records = [records]
    return records, fmt
+
+
def record_to_formated_string(record, fmt="genbank", remove_descr=False):
    """Serialize a record (or list of records) to UTF-8 bytes.

    When `remove_descr` is true, descriptions are blanked on a deep copy
    so the caller's records are left untouched.
    """
    if remove_descr:
        record = deepcopy(record)
        as_list = record if isinstance(record, (list, tuple)) else [record]
        for rec in as_list:
            rec.description = ""
    buffer = StringIO()
    write_record(record, buffer, fmt)
    return buffer.getvalue().encode("utf-8")
+
+
def records_from_data_files(data_files, use_file_names_as_ids=False):
    """Parse a list of uploaded files (zip archives or single files).

    Sets per-record circularity flags from the file's `circular`
    attribute and replaces placeholder ids/names with a name derived
    from the file name.
    """
    records = []
    for file_ in data_files:
        # Defaults to circular when the file object has no 'circular' key.
        circular = ("circular" not in file_) or file_.circular
        if file_.name.lower().endswith("zip"):
            records += records_from_zip_file(
                file_, use_file_names_as_ids=use_file_names_as_ids
            )
            continue
        recs, fmt = records_from_data_file(file_)
        single_record = len(recs) == 1
        for i, record in enumerate(recs):
            record.circular = circular
            record.linear = not circular
            name_no_extension = "".join(file_.name.split(".")[:-1])
            name = name_no_extension + ("" if single_record else ("%04d" % i))
            name = name.replace(" ", "_")
            UNKNOWN_IDS = [
                "None",
                "",
                "<unknown id>",
                ".",
                "EXPORTED",
                "<unknown name>",
                "Exported",
            ]
            # Sorry for this parts, it took a lot of "whatever works".
            # keep your part names under 20c and pointless, and everything
            # will be good
            if str(record.id).strip() in UNKNOWN_IDS:
                record.id = name
            if str(record.name).strip() in UNKNOWN_IDS:
                record.name = name
            record.file_name = name_no_extension
            if use_file_names_as_ids and single_record:
                # NOTE(review): `record.source_file` is never assigned in
                # this module (records_from_data_file sets `file_name`
                # only), so this branch may raise AttributeError — confirm
                # against the original CUBA backend.
                basename = os.path.basename(record.source_file)
                basename_no_extension = os.path.splitext(basename)[0]
                record.id = basename_no_extension
        records += recs
    return records
+
+
def data_to_html_data(data, datatype, filename=None):
    """Encode raw bytes as a base64 data-URL string.

    `datatype` may be a shorthand ("zip", "genbank", "fasta", "pdf",
    "xlsx") or a full MIME type; an optional filename header is embedded
    before the payload.
    """
    mime_shorthands = {
        "zip": "application/zip",
        "genbank": "application/genbank",
        "fasta": "application/fasta",
        "pdf": "application/pdf",
        "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    }
    mimetype = mime_shorthands.get(datatype, datatype)
    prefix = "data:%s;" % mimetype
    header = "" if filename is None else "headers=filename%3D" + filename + ";"
    payload = "base64,%s" % b64encode(data).decode("utf-8")
    return prefix + header + payload
+
+
def zip_data_to_html_data(data):
    """Shortcut: encode zip bytes as an application/zip data-URL."""
    return data_to_html_data(data=data, datatype="application/zip")
+
+
+LADDERS = {"100_to_4k": bw.ladders.LADDER_100_to_4k}
+
+
def matplotlib_figure_to_svg_base64_data(fig, **kwargs):
    """Return a 'data:image/svg+xml;base64,XXX' string where XXX is the
    base64-encoded SVG rendering of `fig`."""
    buffer = BytesIO()
    fig.savefig(buffer, format="svg", **kwargs)
    svg_txt = buffer.getvalue().decode("utf-8")
    # Drop the XML/doctype preamble (first four lines), then flatten
    # the remaining markup onto a single line.
    svg_txt = "\n".join(svg_txt.split("\n")[4:])
    svg_txt = svg_txt.replace("\n", "")
    encoded = b64encode(svg_txt.encode("utf-8"))
    return (b"data:image/svg+xml;base64," + encoded).decode("utf-8")
+
+
def matplotlib_figure_to_bitmap_base64_data(fig, fmt="png", **kwargs):
    """Return a 'data:image/<fmt>;base64,XXX' string where XXX is the
    base64-encoded bitmap rendering of `fig`."""
    buffer = BytesIO()
    fig.savefig(buffer, format=fmt, **kwargs)
    encoded = b64encode(buffer.getvalue()).decode("utf-8")
    return "data:image/%s;base64,%s" % (fmt, encoded)
+
+
def figures_to_pdf_report_data(figures, filename="report.pdf"):
    """Bundle matplotlib figures into a single in-memory PDF.

    Returns a dict with base64 'data' (data-URL), 'name' and 'mimetype'
    keys, suitable for returning to a web client.
    """
    pdf_buffer = BytesIO()
    with PdfPages(pdf_buffer) as pdf:
        for figure in figures:
            pdf.savefig(figure, bbox_inches="tight")
    encoded = b64encode(pdf_buffer.getvalue()).decode("utf-8")
    return {
        "data": "data:application/pdf;base64," + encoded,
        "name": filename,
        "mimetype": "application/pdf",
    }
+
+
def csv_to_list(csv_string, sep=","):
    """Flatten a (possibly multi-line) separated string into a flat list
    of stripped, non-empty elements."""
    elements = []
    for line in csv_string.split("\n"):
        for raw in line.split(sep):
            item = raw.strip()
            if item:
                elements.append(item)
    return elements
+
+
def set_record_topology(record, topology):
    """Set ``record.annotations['topology']``.

    `topology` must be "circular", "linear", "default_to_circular" or
    "default_to_linear". The "default_to_*" variants only take effect
    when no topology annotation is present yet; otherwise the existing
    value is kept. Raises ValueError for any other value.
    """
    valid_topologies = [
        "circular",
        "linear",
        "default_to_circular",
        "default_to_linear",
    ]
    if topology not in valid_topologies:
        raise ValueError(
            "topology (%s) should be one of %s."
            % (topology, ", ".join(valid_topologies))
        )
    prefix = "default_to_"
    if topology.startswith(prefix):
        # Only fill in the default when nothing is set yet.
        record.annotations.setdefault("topology", topology[len(prefix):])
    else:
        record.annotations["topology"] = topology
+
+
+##############################################################################
def main():
    """Command-line entry point: build an assembly picklist zip.

    Reads an assembly plan (CSV/Excel), part data (sequence records or a
    part-size table) and a source plate spreadsheet, then writes a zip
    containing the mixplate map/plot, assembly reports and a dispenser
    picklist (Labcyte Echo CSV or Tecan EVO gwl). Progress/status is
    printed to stdout; on unrecoverable input problems the script prints
    "success: False" with details and exits.
    """
    parser = argparse.ArgumentParser(description="Generate picklist for DNA assembly.")
    parser.add_argument("--parts_files", help="Directory with parts data or file with part sizes")
    parser.add_argument("--picklist", type=str, help="Path to the assembly plan CSV or Excel file")
    parser.add_argument("--source_plate", help="Source plate file (CSV or Excel)")
    parser.add_argument("--backbone_name", help="Name of the backbone")
    parser.add_argument("--result_zip", help="Name of the output zip file")
    parser.add_argument("--part_backbone_ratio", type=float, help="Part to backbone molar ratio")
    parser.add_argument("--quantity_unit", choices=["fmol", "nM", "ng"], help="Quantity unit")
    parser.add_argument("--part_quantity", type=float, help="Quantity of each part")
    parser.add_argument("--buffer_volume", type=float, help="Buffer volume in µL")
    parser.add_argument("--total_volume", type=float, help="Total reaction volume in µL")
    parser.add_argument("--dispenser", choices=["labcyte_echo", "tecan_evo"], help="Dispenser machine")

    args = parser.parse_args()

    # Parameters:
    picklist = args.picklist  # path to the assembly plan file
    # directory or can be a csv/Excel with part sizes
    if isinstance(args.parts_files, str):
        args.parts_files = args.parts_files.split(",")
    parts_dir = fix_and_rename_paths(args.parts_files)
    source_plate_path = args.source_plate
    backbone_name = args.backbone_name
    part_backbone_ratio = args.part_backbone_ratio
    result_zip_file = args.result_zip  # output file name "picklist.zip"
    ##########################################################################
    # Defaults:
    destination_plate = None
    destination_type = "new"  # this parameter is not actually used
    destination_size = 96  # this parameter is not actually used
    fill_by = "column"  # this parameter is not actually used
    quantity_unit = args.quantity_unit
    part_quantity = args.part_quantity  # 1.3
    buffer_volume = args.buffer_volume  # 0.3  # (µL)
    total_volume = args.total_volume  # 1  # (µL)
    dispenser_machine = args.dispenser
    dispenser_min_volume = 0.5  # (nL), this parameter is not actually used
    dispenser_max_volume = 5  # (µL), this parameter is not actually used
    dispenser_resolution = 2.5  # (nL), this parameter is not actually used
    dispenser_dead_volume = 8  # (µL), this parameter is not actually used
    use_file_names_as_ids = True

    # CODE
    # Parse the assembly plan into rows of [construct, part, part, ...].
    if picklist.endswith(".csv"):
        # Bug fix: `picklist` is a file path (argparse string), not a file
        # object — it must be opened and read. The previous code called
        # `picklist.read().decode()`, which raised AttributeError for
        # every CSV assembly plan.
        with open(picklist, "rb") as csv_file:
            csv = csv_file.read().decode()
        rows = [line.split(",") for line in csv.split("\n") if len(line)]
    else:
        dataframe = pandas.read_excel(picklist)
        rows = [row for i, row in dataframe.iterrows()]

    # Build the plan, skipping header rows and empty cells ("-", "nan").
    assembly_plan = AssemblyPlan(
        OrderedDict(
            [
                (
                    row[0],
                    [
                        str(e).strip()
                        for e in row[1:]
                        if str(e).strip() not in ["-", "nan", ""]
                    ],
                )
                for row in rows
                if row[0] not in ["nan", "Construct name", "constructs", "construct"]
            ]
        )
    )
    # Part names are normalized with underscores throughout the script.
    for assembly, parts in assembly_plan.assemblies.items():
        assembly_plan.assemblies[assembly] = [part.replace(" ", "_") for part in parts]

    # Reading part infos
    if not isinstance(parts_dir, list):
        if parts_dir.endswith((".csv", ".xls", ".xlsx")):  # part sizes specified in table
            if parts_dir.endswith(".csv"):
                dataframe = pandas.read_csv(parts_dir)
            else:
                dataframe = pandas.read_excel(parts_dir)
            parts_data = {row.part: {"size": row["size"]} for i, row in dataframe.iterrows()}
    else:  # input records
        records = dnacauldron.biotools.load_records_from_files(
            files=parts_dir, use_file_names_as_ids=use_file_names_as_ids
        )
        parts_data = {rec.id.replace(" ", "_").lower(): {"record": rec} for rec in records}
    assembly_plan.parts_data = parts_data
    parts_without_data = assembly_plan.parts_without_data()
    if len(parts_without_data):
        print("success: False")
        print("message: Some parts have no provided record or data.")
        print("missing_parts: ", parts_without_data)
        sys.exit()
    # Reading protocol: convert the requested quantity to mol or g.
    if quantity_unit == "fmol":
        part_mol = part_quantity * 1e-15
        part_g = None
    if quantity_unit == "nM":
        # nmol/L * µL = 1e-9 mol/L * 1e-6 L = 1e-15 mol
        part_mol = part_quantity * total_volume * 1e-15
        part_g = None
    if quantity_unit == "ng":
        part_mol = None
        part_g = part_quantity * 1e-9
        # Backbone:part molar ratio calculation is not performed in this case.
        # This ensures no change regardless of form input:
        part_backbone_ratio = 1
    print("Generating picklist")
    picklist_generator = AssemblyPicklistGenerator(
        part_mol=part_mol,
        part_g=part_g,
        complement_to=total_volume * 1e-6,  # convert uL to L
        buffer_volume=buffer_volume * 1e-6,
        volume_rounding=2.5e-9,  # not using parameter from form
        minimal_dispense_volume=5e-9,  # Echo machine's minimum dispense -
    )
    backbone_name_list = backbone_name.split(",")
    source_plate = plate_from_content_spreadsheet(source_plate_path)

    # Normalize part names in the source plate and scale the backbone
    # concentration by the requested part:backbone molar ratio.
    for well in source_plate.iter_wells():
        if well.is_empty:
            continue
        quantities = well.content.quantities
        part, quantity = list(quantities.items())[0]
        quantities.pop(part)
        quantities[part.replace(" ", "_")] = quantity

        if part in backbone_name_list:
            # This section multiplies the backbone concentration with the
            # part:backbone molar ratio. This tricks the calculator into making
            # a picklist with the desired ratio.
            # For example, a part:backbone = 2:1 will multiply the
            # backbone concentration by 2, therefore half as much of it will be
            # added to the well.
            quantities[part.replace(" ", "_")] = quantity * part_backbone_ratio
        else:
            quantities[part.replace(" ", "_")] = quantity

    source_plate.name = "Source"
    if destination_plate:
        # NOTE(review): dead code — destination_plate is hardcoded to None
        # above; kept for parity with the original CUBA tool.
        dest_filelike = file_to_filelike_object(destination_plate)
        destination_plate = plate_from_content_spreadsheet(destination_plate)
    else:
        destination_plate = Plate4ti0960("Mixplate")
    destination_wells = (
        well for well in destination_plate.iter_wells(direction="column") if well.is_empty
    )
    picklist, picklist_data = picklist_generator.make_picklist(
        assembly_plan,
        source_wells=source_plate.iter_wells(),
        destination_wells=destination_wells,
    )
    if picklist is None:
        print("success: False")
        print("message: Some parts in the assembly plan have no corresponding well.")
        print("picklist_data: ", picklist_data)
        print("missing_parts:", picklist_data.get("missing_parts", None))
        sys.exit()

    future_plates = picklist.simulate(inplace=False)

    def text(w):
        # Well label: volume, plus the construct name when known.
        txt = human_volume(w.content.volume)
        if "construct" in w.data:
            txt = "\n".join([w.data["construct"], txt])
        return txt

    plotter = PlateTextPlotter(text)
    ax, _ = plotter.plot_plate(future_plates[destination_plate], figsize=(20, 8))

    ziproot = flametree.file_tree(result_zip_file, replace=True)

    # MIXPLATE MAP PLOT
    ax.figure.savefig(
        ziproot._file("final_mixplate.pdf").open("wb"),
        format="pdf",
        bbox_inches="tight",
    )
    plt.close(ax.figure)
    plate_to_platemap_spreadsheet(
        future_plates[destination_plate],
        lambda w: w.data.get("construct", ""),
        filepath=ziproot._file("final_mixplate.xls").open("wb"),
    )

    # ASSEMBLY REPORT
    print("Writing report...")
    picklist_to_assembly_mix_report(
        picklist,
        ziproot._file("assembly_mix_picklist_report.pdf").open("wb"),
        data=picklist_data,
    )
    assembly_plan.write_report(ziproot._file("assembly_plan_summary.pdf").open("wb"))

    # MACHINE PICKLIST
    if dispenser_machine == "labcyte_echo":
        picklist_to_labcyte_echo_picklist_file(
            picklist, ziproot._file("ECHO_picklist.csv").open("w")
        )
    else:
        picklist_to_tecan_evo_picklist_file(
            picklist, ziproot._file("EVO_picklist.gwl").open("w")
        )
    # We'll not write the input source plate.
    # raw = file_to_filelike_object(source_plate_path).read()
    # f = ziproot.copy(source_plate_path)
    # f.write(raw, mode="wb")
    ziproot._close()
    print("success: True")


if __name__ == "__main__":
    main()