view CreateAssemblyPicklists_script.py @ 9:c3951e41e488 draft default tip

planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/Plateo commit ed922f834f891e6048385803c3004465551c911d-dirty
author tduigou
date Thu, 07 Aug 2025 12:37:19 +0000
parents 196e13c09881
children
line wrap: on
line source

#!/usr/bin/env python
# coding: utf-8
# Code copied from CUBA backend tools.py and create_assembly_picklists/CreateAssemblyPicklistsView.py
# Code modified for running in a script in Galaxy.
##############################################################################
##############################################################################
# App code
## EGF Galaxy Create assembly picklists -- script

##############################################################################
# IMPORTS
import argparse
import os
from io import StringIO, BytesIO
import re
from base64 import b64encode, b64decode
from copy import deepcopy
import sys

from collections import OrderedDict
from fuzzywuzzy import process
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import pandas

from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq

import bandwagon as bw
import crazydoc
from dnachisel.biotools import sequence_to_biopython_record
import dnacauldron
import flametree
from plateo import AssemblyPlan
from plateo.parsers import plate_from_content_spreadsheet
from plateo.containers import Plate4ti0960
from plateo.exporters import AssemblyPicklistGenerator, picklist_to_assembly_mix_report
from plateo.exporters import (
    picklist_to_labcyte_echo_picklist_file,
    picklist_to_tecan_evo_picklist_file,
    plate_to_platemap_spreadsheet,
    PlateTextPlotter,
)
from plateo.tools import human_volume
from snapgene_reader import snapgene_file_to_seqrecord


##############################################################################
# FUNCTIONS

def fix_and_rename_paths(paths):
    fixed_paths = []
    for path in paths:
        new_path = path.replace("__sq__", "'")
        if new_path != path:
            os.rename(path, new_path)
        fixed_paths.append(new_path)
    return fixed_paths


def parse_optional_float(x):
    if x == '':
        return None
    return float(x)


def did_you_mean(name, other_names, limit=5, min_score=50):  # test
    results = process.extract(name, list(other_names), limit=limit)
    return [e for (e, score) in results if score >= min_score]


def fix_ice_genbank(genbank_txt):
    lines = genbank_txt.splitlines()
    lines[0] += max(0, 80 - len(lines[0])) * " "
    return "\n".join(lines)


def write_record(record, target, fmt="genbank"):
    """Write a record as genbank, fasta, etc. via Biopython, with fixes"""
    record = deepcopy(record)
    if fmt == "genbank":
        if isinstance(record, (list, tuple)):
            for r in record:
                r.name = r.name[:20]
        else:
            record.name = record.name[:20]
    if hasattr(target, "open"):
        target = target.open("w")
    SeqIO.write(record, target, fmt)


def autoname_genbank_file(record):
    return record.id.replace(".", "_") + ".gb"


def string_to_records(string):
    """Convert a string of a fasta, genbank... into a simple ATGC string.

    Can also be used to detect a format.
    """
    matches = re.match("([ATGC][ATGC]*)", string)
    # print("============", len(matches.groups()[0]), len(string))
    # print (matches.groups()[0] == string)
    if (matches is not None) and (matches.groups()[0] == string):
        return [SeqRecord(Seq(string))], "ATGC"

    for fmt in ("fasta", "genbank"):
        if fmt == "genbank":
            string = fix_ice_genbank(string)
        try:
            stringio = StringIO(string)
            records = list(SeqIO.parse(stringio, fmt))
            if len(records) > 0:
                return (records, fmt)
        except:
            pass
    try:
        record = snapgene_file_to_seqrecord(filecontent=StringIO(string))
        return [record]
    except:
        pass
    raise ValueError("Invalid sequence format")


def file_to_filelike_object(file_, type="byte"):
    content = file_.content.split("base64,")[1]
    filelike = BytesIO if (type == "byte") else StringIO
    return filelike(b64decode(content))


def spreadsheet_file_to_dataframe(filedict, header="infer"):
    filelike = file_to_filelike_object(filedict)
    if filedict.name.endswith(".csv"):
        return pandas.read_csv(filelike, header=header)
    else:
        return pandas.read_excel(filelike, header=header)


def records_from_zip_file(zip_file, use_file_names_as_ids=False):
    zip_name = zip_file.name
    zip_file = flametree.file_tree(file_to_filelike_object(zip_file))
    records = []
    for f in zip_file._all_files:
        ext = f._extension.lower()
        if ext in ["gb", "gbk", "fa", "dna"]:
            try:
                new_records, fmt = string_to_records(f.read())
                if not isinstance(new_records, list):
                    new_records = [new_records]
            except:
                content_stream = BytesIO(f.read("rb"))
                try:
                    record = snapgene_file_to_seqrecord(fileobject=content_stream)
                    new_records, fmt = [record], "snapgene"
                except:
                    try:
                        parser = crazydoc.CrazydocParser(
                            ["highlight_color", "bold", "underline"]
                        )
                        new_records = parser.parse_doc_file(content_stream)
                        fmt = "doc"
                    except:
                        raise ValueError("Format not recognized for file " + f._path)

            single_record = len(new_records) == 1
            for i, record in enumerate(new_records):
                name = record.id
                if name in [
                    None,
                    "",
                    "<unknown id>",
                    ".",
                    " ",
                    "<unknown name>",
                ]:
                    number = "" if single_record else ("%04d" % i)
                    name = f._name_no_extension.replace(" ", "_") + number
                record.id = name
                record.name = name
                record.file_name = f._name_no_extension
                record.zip_file_name = zip_name
                if use_file_names_as_ids and single_record:
                    basename = os.path.basename(record.file_name)
                    basename_no_extension = os.path.splitext(basename)[0]
                    record.id = basename_no_extension
            records += new_records
    return records


def records_from_data_file(data_file):
    content = b64decode(data_file.content.split("base64,")[1])
    try:
        records, fmt = string_to_records(content.decode("utf-8"))
    except:
        try:
            record = snapgene_file_to_seqrecord(fileobject=BytesIO(content))
            records, fmt = [record], "snapgene"
        except:
            try:
                parser = crazydoc.CrazydocParser(
                    ["highlight_color", "bold", "underline"]
                )
                records = parser.parse_doc_file(BytesIO(content))
                fmt = "doc"
            except:
                try:
                    df = spreadsheet_file_to_dataframe(data_file, header=None)
                    records = [
                        sequence_to_biopython_record(sequence=seq, id=name, name=name)
                        for name, seq in df.values
                    ]
                    fmt = "spreadsheet"
                except:
                    raise ValueError("Format not recognized for file " + data_file.name)
    if not isinstance(records, list):
        records = [records]
    return records, fmt


def record_to_formated_string(record, fmt="genbank", remove_descr=False):
    if remove_descr:
        record = deepcopy(record)
        if isinstance(record, (list, tuple)):
            for r in record:
                r.description = ""
        else:
            record.description = ""
    fileobject = StringIO()
    write_record(record, fileobject, fmt)
    return fileobject.getvalue().encode("utf-8")


def records_from_data_files(data_files, use_file_names_as_ids=False):
    records = []
    for file_ in data_files:
        circular = ("circular" not in file_) or file_.circular
        if file_.name.lower().endswith("zip"):
            records += records_from_zip_file(
                file_, use_file_names_as_ids=use_file_names_as_ids
            )
            continue
        recs, fmt = records_from_data_file(file_)
        single_record = len(recs) == 1
        for i, record in enumerate(recs):
            record.circular = circular
            record.linear = not circular
            name_no_extension = "".join(file_.name.split(".")[:-1])
            name = name_no_extension + ("" if single_record else ("%04d" % i))
            name = name.replace(" ", "_")
            UNKNOWN_IDS = [
                "None",
                "",
                "<unknown id>",
                ".",
                "EXPORTED",
                "<unknown name>",
                "Exported",
            ]
            # Sorry for this parts, it took a lot of "whatever works".
            # keep your part names under 20c and pointless, and everything
            # will be good
            if str(record.id).strip() in UNKNOWN_IDS:
                record.id = name
            if str(record.name).strip() in UNKNOWN_IDS:
                record.name = name
            record.file_name = name_no_extension
            if use_file_names_as_ids and single_record:
                basename = os.path.basename(record.source_file)
                basename_no_extension = os.path.splitext(basename)[0]
                record.id = basename_no_extension
        records += recs
    return records


def data_to_html_data(data, datatype, filename=None):
    """Data types: zip, genbank, fasta, pdf"""
    datatype = {
        "zip": "application/zip",
        "genbank": "application/genbank",
        "fasta": "application/fasta",
        "pdf": "application/pdf",
        "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    }.get(datatype, datatype)
    datatype = "data:%s;" % datatype
    data64 = "base64,%s" % b64encode(data).decode("utf-8")
    headers = ""
    if filename is not None:
        headers += "headers=filename%3D" + filename + ";"
    return datatype + headers + data64


def zip_data_to_html_data(data):
    return data_to_html_data(data, "application/zip")


LADDERS = {"100_to_4k": bw.ladders.LADDER_100_to_4k}


def matplotlib_figure_to_svg_base64_data(fig, **kwargs):
    """Return a string of the form 'data:image/svg+xml;base64,XXX' where XXX
    is the base64-encoded svg version of the figure."""
    output = BytesIO()
    fig.savefig(output, format="svg", **kwargs)
    svg_txt = output.getvalue().decode("utf-8")
    svg_txt = "\n".join(svg_txt.split("\n")[4:])
    svg_txt = "".join(svg_txt.split("\n"))

    content = b64encode(svg_txt.encode("utf-8"))
    result = (b"data:image/svg+xml;base64," + content).decode("utf-8")

    return result


def matplotlib_figure_to_bitmap_base64_data(fig, fmt="png", **kwargs):
    """Return a string of the form 'data:image/png;base64,XXX' where XXX
    is the base64-encoded svg version of the figure."""
    output = BytesIO()
    fig.savefig(output, format=fmt, **kwargs)
    bitmap = output.getvalue()
    content = b64encode(bitmap)
    result = (b"data:image/%s;base64,%s" % (fmt.encode("utf-8"), content)).decode(
        "utf-8"
    )
    return result


def figures_to_pdf_report_data(figures, filename="report.pdf"):
    pdf_io = BytesIO()
    with PdfPages(pdf_io) as pdf:
        for fig in figures:
            pdf.savefig(fig, bbox_inches="tight")
    return {
        "data": (
            "data:application/pdf;base64,"
            + b64encode(pdf_io.getvalue()).decode("utf-8")
        ),
        "name": filename,
        "mimetype": "application/pdf",
    }


def csv_to_list(csv_string, sep=","):
    return [
        element.strip()
        for line in csv_string.split("\n")
        for element in line.split(sep)
        if len(element.strip())
    ]


def set_record_topology(record, topology):
    """Set the Biopython record's topology, possibly passing if already set.

    This actually sets the ``record.annotations['topology']``.The ``topology``
    parameter can be "circular", "linear", "default_to_circular" (will default
    to circular if ``annotations['topology']`` is not already set) or
    "default_to_linear".
    """
    valid_topologies = [
        "circular",
        "linear",
        "default_to_circular",
        "default_to_linear",
    ]
    if topology not in valid_topologies:
        raise ValueError(
            "topology (%s) should be one of %s."
            % (topology, ", ".join(valid_topologies))
        )
    annotations = record.annotations
    default_prefix = "default_to_"
    if topology.startswith(default_prefix):
        if "topology" not in annotations:
            annotations["topology"] = topology[len(default_prefix) :]
    else:
        annotations["topology"] = topology


##############################################################################
def main():

    parser = argparse.ArgumentParser(description="Generate picklist for DNA assembly.")
    parser.add_argument("--parts_files", help="Directory with parts data or file with part sizes")
    parser.add_argument("--picklist", type=str, help="Path to the assembly plan CSV or Excel file")
    parser.add_argument("--source_plate", help="Source plate file (CSV or Excel)")
    parser.add_argument("--backbone_name", required=False, help="Name of the backbone")
    parser.add_argument("--result_zip", help="Name of the output zip file")
    parser.add_argument("--part_backbone_ratio", type=parse_optional_float, required=False, help="Part to backbone molar ratio")
    parser.add_argument("--quantity_unit", choices=["fmol", "nM", "ng"], help="Quantity unit")
    parser.add_argument("--part_quantity", type=float, help="Quantity of each part")
    parser.add_argument("--buffer_volume", type=float, help="Buffer volume in µL")
    parser.add_argument("--total_volume", type=float, help="Total reaction volume in µL")
    parser.add_argument("--dispenser", choices=["labcyte_echo", "tecan_evo"], help="Dispenser machine")

    args = parser.parse_args()

    # Parameters:
    picklist = args.picklist  # assembly plan
    # directory or can be a csv/Excel with part sizes
    if isinstance(args.parts_files, str):
        args.parts_files = args.parts_files.split(",")
    parts_dir = fix_and_rename_paths(args.parts_files)
    source_plate_path = args.source_plate
    backbone_name = args.backbone_name
    part_backbone_ratio = args.part_backbone_ratio
    result_zip_file = args.result_zip  # output file name "picklist.zip"
    ##############################################################################
    # Defaults:
    destination_plate = None
    destination_type = "new"  # this parameter is not actually used
    destination_size = 96  # this parameter is not actually used
    fill_by = "column"  # this parameter is not actually used
    quantity_unit = args.quantity_unit
    part_quantity = args.part_quantity # 1.3
    buffer_volume = args.buffer_volume # 0.3  # (µL)
    total_volume = args.total_volume # 1  # (µL)
    dispenser_machine = args.dispenser
    dispenser_min_volume = 0.5  # (nL), this parameter is not actually used
    dispenser_max_volume = 5  # (µL), this parameter is not actually used
    dispenser_resolution = 2.5  # (nL), this parameter is not actually used
    dispenser_dead_volume = 8  # (µL), this parameter is not actually used
    use_file_names_as_ids = True

    # CODE
    if picklist.endswith(".csv"):
        csv = picklist.read().decode()
        rows = [line.split(",") for line in csv.split("\n") if len(line)]
    else:
        dataframe = pandas.read_excel(picklist)
        rows = [row for i, row in dataframe.iterrows()]

    assembly_plan = AssemblyPlan(
        OrderedDict(
            [
                (
                    row[0],
                    [
                        str(e).strip()
                        for e in row[1:]
                        if str(e).strip() not in ["-", "nan", ""]
                    ],
                )
                for row in rows
                if row[0] not in ["nan", "Construct name", "constructs", "construct"]
            ]
        )
    )
    for assembly, parts in assembly_plan.assemblies.items():
        assembly_plan.assemblies[assembly] = [part.replace(" ", "_") for part in parts]

    # Reading part infos
    if not isinstance(parts_dir, list):
        if parts_dir.endswith((".csv", ".xls", ".xlsx")):  # part sizes specified in table
            if parts_dir.endswith(".csv"):
                dataframe = pandas.read_csv(parts_dir)
            else:
                dataframe = pandas.read_excel(parts_dir)
            parts_data = {row.part: {"size": row["size"]} for i, row in dataframe.iterrows()}
    else:  # input records
        records = dnacauldron.biotools.load_records_from_files(
            files=parts_dir, use_file_names_as_ids=use_file_names_as_ids
        )
        parts_data = {rec.id.replace(" ", "_").lower(): {"record": rec} for rec in records}
        #parts_data = process_parts_with_mapping(records, args.file_name_mapping)
    assembly_plan.parts_data = parts_data
    parts_without_data = assembly_plan.parts_without_data()
    if len(parts_without_data):
        print("success: False")
        print("message: Some parts have no provided record or data.")
        print("missing_parts: ", parts_without_data)
        sys.exit()
    # Reading protocol
    if quantity_unit == "fmol":
        part_mol = part_quantity * 1e-15
        part_g = None
    if quantity_unit == "nM":
        part_mol = part_quantity * total_volume * 1e-15
        part_g = None
    if quantity_unit == "ng":
        part_mol = None
        part_g = part_quantity * 1e-9
        # Backbone:part molar ratio calculation is not performed in this case.
        # This ensures no change regardless of form input:
        part_backbone_ratio = 1
    print("Generating picklist")
    picklist_generator = AssemblyPicklistGenerator(
        part_mol=part_mol,
        part_g=part_g,
        complement_to=total_volume * 1e-6,  # convert uL to L
        buffer_volume=buffer_volume * 1e-6,
        volume_rounding=2.5e-9,  # not using parameter from form
        minimal_dispense_volume=5e-9,  # Echo machine's minimum dispense -
    )
    if backbone_name != '' and backbone_name != 'Non':
        backbone_name_list = backbone_name.split(",")
    source_plate = plate_from_content_spreadsheet(source_plate_path)

    for well in source_plate.iter_wells():
        if well.is_empty:
            continue
        quantities = well.content.quantities
        part, quantity = list(quantities.items())[0]
        quantities.pop(part)
        quantities[part.replace(" ", "_")] = quantity

        if backbone_name != '' and backbone_name != 'Non':
            if part in backbone_name_list:
                # This section multiplies the backbone concentration with the
                # part:backbone molar ratio. This tricks the calculator into making
                # a picklist with the desired ratio.
                # For example, a part:backbone = 2:1 will multiply the
                # backbone concentration by 2, therefore half as much of it will be
                # added to the well.
                quantities[part.replace(" ", "_")] = quantity * part_backbone_ratio
            else:
                quantities[part.replace(" ", "_")] = quantity

    source_plate.name = "Source"
    if destination_plate:
        dest_filelike = file_to_filelike_object(destination_plate)
        destination_plate = plate_from_content_spreadsheet(destination_plate)
    else:
        destination_plate = Plate4ti0960("Mixplate")
    destination_wells = (
        well for well in destination_plate.iter_wells(direction="column") if well.is_empty
    )
    picklist, picklist_data = picklist_generator.make_picklist(
        assembly_plan,
        source_wells=source_plate.iter_wells(),
        destination_wells=destination_wells,
    )
    if picklist is None:
        print("success: False")
        print("message: Some parts in the assembly plan have no corresponding well.")
        print("picklist_data: ", picklist_data)
        print("missing_parts:", picklist_data.get("missing_parts", None))
        sys.exit()

    future_plates = picklist.simulate(inplace=False)


    def text(w):
        txt = human_volume(w.content.volume)
        if "construct" in w.data:
            txt = "\n".join([w.data["construct"], txt])
        return txt


    plotter = PlateTextPlotter(text)
    ax, _ = plotter.plot_plate(future_plates[destination_plate], figsize=(20, 8))

    ziproot = flametree.file_tree(result_zip_file, replace=True)

    # MIXPLATE MAP PLOT
    ax.figure.savefig(
        ziproot._file("final_mixplate.pdf").open("wb"),
        format="pdf",
        bbox_inches="tight",
    )
    plt.close(ax.figure)
    plate_to_platemap_spreadsheet(
        future_plates[destination_plate],
        lambda w: w.data.get("construct", ""),
        filepath=ziproot._file("final_mixplate.xls").open("wb"),
    )

    # ASSEMBLY REPORT
    print("Writing report...")
    picklist_to_assembly_mix_report(
        picklist,
        ziproot._file("assembly_mix_picklist_report.pdf").open("wb"),
        data=picklist_data,
    )
    assembly_plan.write_report(ziproot._file("assembly_plan_summary.pdf").open("wb"))

    # MACHINE PICKLIST

    if dispenser_machine == "labcyte_echo":
        picklist_to_labcyte_echo_picklist_file(
            picklist, ziproot._file("ECHO_picklist.csv").open("w")
        )
    else:
        picklist_to_tecan_evo_picklist_file(
            picklist, ziproot._file("EVO_picklist.gwl").open("w")
        )
    # We'll not write the input source plate.
    # raw = file_to_filelike_object(source_plate_path).read()
    # f = ziproot.copy(source_plate_path)
    # f.write(raw, mode="wb")
    ziproot._close()
    print("success: True")
    

if __name__ == "__main__":
    main()