comparison cloning_simulation.py @ 16:fbb241adf6c2 draft default tip

planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/DnaCauldron/tree/master commit af45e5e0e81535ab0423b0bcff8b5b220bb9b4d0-dirty
author tduigou
date Thu, 17 Jul 2025 10:17:24 +0000
parents 16ccb36aa8e3
children
comparison
equal deleted inserted replaced
15:074a76e84b80 16:fbb241adf6c2
1 import argparse
1 import os 2 import os
3 import json
4 import zipfile
5 import pandas
2 import dnacauldron 6 import dnacauldron
3 from Bio import SeqIO 7
4 import pandas
5 import argparse
6 import zipfile
7 8
8 def cloning_simulation(files_to_assembly, domesticated_list, 9 def cloning_simulation(files_to_assembly, domesticated_list,
9 csv_file, assembly_type, topology, 10 csv_file, assembly_type, topology,
10 file_name_mapping, file_name_mapping_dom, 11 file_name_mapping, file_name_mapping_dom,
11 use_file_names_as_id, 12 use_file_names_as_id,
12 outdir_simulation, output_simulation,enzyme,outdir_gb): 13 outdir_simulation, output_simulation, enzyme, outdir_gb):
13 14
14 files_to_assembly = files_to_assembly.split(',') 15 files_to_assembly = files_to_assembly.split(',')
15 16
16 repository = dnacauldron.SequenceRepository() 17 repository = dnacauldron.SequenceRepository()
17 repository.import_records(files=files_to_assembly, 18 repository.import_records(files=files_to_assembly,
18 use_file_names_as_ids=use_file_names_as_id, 19 use_file_names_as_ids=use_file_names_as_id,
19 topology=topology) 20 topology=topology)
20 if domesticated_list: 21 if domesticated_list:
21 domesticated_files = domesticated_list.split(',') 22 domesticated_files = domesticated_list.split(',')
22 repository.import_records(files=domesticated_files, 23 repository.import_records(files=domesticated_files,
23 use_file_names_as_ids=use_file_names_as_id, 24 use_file_names_as_ids=use_file_names_as_id,
24 topology=topology) 25 topology=topology)
25 26
26 #refine the real record name dict 27 # refine the real record name dict
27 if isinstance(file_name_mapping, str): 28 if isinstance(file_name_mapping, str):
28 file_name_mapping = dict( 29 file_name_mapping = dict(
29 item.split(":") for item in file_name_mapping.split(",") 30 item.split(":") for item in file_name_mapping.split(",")
30 ) 31 )
31 real_names = { 32 real_names = {
32 os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "") 33 os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "")
33 for k, v in file_name_mapping.items() 34 for k, v in file_name_mapping.items()
34 } 35 }
35 36
36 #refine the real record name dict_dom 37 # refine the real record name dict_dom
37 if file_name_mapping_dom == "": 38 if file_name_mapping_dom == "":
38 file_name_mapping_dom={} 39 file_name_mapping_dom = {}
39 else: 40 else:
40 if isinstance(file_name_mapping_dom, str): 41 if isinstance(file_name_mapping_dom, str):
41 file_name_mapping_dom = dict( 42 file_name_mapping_dom = dict(
42 item.split(":") for item in file_name_mapping_dom.split(",") 43 item.split(":") for item in file_name_mapping_dom.split(",")
43 ) 44 )
44 dom_real_names = { 45 dom_real_names = {
45 os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "") 46 os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "")
46 for k, v in file_name_mapping_dom.items() 47 for k, v in file_name_mapping_dom.items()
47 } 48 }
48 real_names.update(dom_real_names) 49 real_names.update(dom_real_names)
49 50
50 #update the records 51 # update the records
51 52
52 for key, record in list(repository.collections["parts"].items()): 53 for key, record in list(repository.collections["parts"].items()):
53 current_id = record.id 54 current_id = record.id
54 if current_id in real_names: 55 if current_id in real_names:
55 new_id = real_names[current_id] 56 new_id = real_names[current_id]
56 record.id = new_id 57 record.id = new_id
57 record.name = new_id 58 record.name = new_id
58 record.description = new_id 59 record.description = new_id
59 repository.collections["parts"][new_id] = repository.collections["parts"].pop(key) 60 repository.collections["parts"][new_id] = repository.collections["parts"].pop(key)
60 ######################################################## 61 ########################################################
61 #print (f"repo: {vars(repository)}") 62 # print (f"repo: {vars(repository)}")
62 #any(pandas.read_csv(csv_file, index_col=0, header=None).duplicated()) 63 # any(pandas.read_csv(csv_file, index_col=0, header=None).duplicated())
63 df=pandas.read_csv(csv_file, index_col=0, header=None) 64 df = pandas.read_csv(csv_file, index_col=0, header=None)
64 if df.duplicated().any(): 65 if df.duplicated().any():
65 raise ValueError("Duplicate rows found in the data!") 66 raise ValueError("Duplicate rows found in the data!")
66 67
67 if assembly_type == "Type2sRestrictionAssembly": 68 if assembly_type == "Type2sRestrictionAssembly":
68 assembly_class = dnacauldron.Type2sRestrictionAssembly 69 assembly_class = dnacauldron.Type2sRestrictionAssembly
76 assembly_class = dnacauldron.OligoPairAnnealin 77 assembly_class = dnacauldron.OligoPairAnnealin
77 elif assembly_type == "LigaseCyclingReactionAssembly": 78 elif assembly_type == "LigaseCyclingReactionAssembly":
78 assembly_class = dnacauldron.LigaseCyclingReactionAssembly 79 assembly_class = dnacauldron.LigaseCyclingReactionAssembly
79 else: 80 else:
80 raise ValueError(f"Unsupported assembly type: {assembly_type}") 81 raise ValueError(f"Unsupported assembly type: {assembly_type}")
81 82
82 new_csvname = "assambly.csv" 83 new_csvname = "assambly.csv"
83 os.rename(csv_file, new_csvname) 84 os.rename(csv_file, new_csvname)
84 85
85 assembly_plan = dnacauldron.AssemblyPlan.from_spreadsheet( 86 assembly_plan = dnacauldron.AssemblyPlan.from_spreadsheet(
86 name="auto_from_filename", 87 name="auto_from_filename",
111 for root, dirs, files in os.walk(outdir_simulation): 112 for root, dirs, files in os.walk(outdir_simulation):
112 for file in files: 113 for file in files:
113 full_path = os.path.join(root, file) 114 full_path = os.path.join(root, file)
114 arcname = os.path.relpath(full_path, outdir_simulation) 115 arcname = os.path.relpath(full_path, outdir_simulation)
115 zipf.write(full_path, arcname) 116 zipf.write(full_path, arcname)
116 #print("Files in the zip archive:") 117 # print("Files in the zip archive:")
117 #for info in zipf.infolist(): 118 # for info in zipf.infolist():
118 #print(info.filename) 119 # print(info.filename)
119 for member in zipf.namelist(): 120 for member in zipf.namelist():
120 # Only extract actual files inside 'all_construct_records/' (not subfolders) 121 # Only extract actual files inside 'all_construct_records/' (not subfolders)
121 if member.startswith("assambly_simulation/all_construct_records/") and not member.endswith("/"): 122 if member.startswith("assambly_simulation/all_construct_records/") and not member.endswith("/"):
122 # Get the file name only (strip folder path) 123 # Get the file name only (strip folder path)
123 filename = os.path.basename(member) 124 filename = os.path.basename(member)
135 136
136 137
def parse_command_line_args():
    """Parse and return the command-line arguments for the cloning simulation.

    All flag names are kept exactly as published (including the misspelled
    ``--use_json_paramers``) so that existing wrappers keep working.

    Returns:
        argparse.Namespace: the parsed arguments.
    """
    parser = argparse.ArgumentParser(description="Domestication")

    parser.add_argument("--parts_files", required=True,
                        help="List of GenBank files (comma-separated)")
    parser.add_argument("--domesticated_seq", required=True,
                        help="Output of domestication (GenBank list)")
    parser.add_argument("--assembly_csv", required=True,
                        help="CSV assembly plan")
    parser.add_argument("--assembly_plan_name", type=str, required=False,
                        help="Type of assembly")
    parser.add_argument("--topology", type=str, required=False,
                        help='"circular" or "linear"')
    parser.add_argument("--file_name_mapping", type=str,
                        help="Mapping of Galaxy filenames to original filenames")
    parser.add_argument("--file_name_mapping_dom", type=str,
                        help="Mapping of Galaxy filenames to original domestication filenames")
    # Any string other than "true"/"True"/... parses as False.
    parser.add_argument("--use_file_names_as_id",
                        type=lambda x: x.lower() == "true", default=True,
                        help="Use file names as IDs (True/False)")
    parser.add_argument("--outdir_simulation", required=True,
                        help="Dir output for cloning simulation results")
    parser.add_argument("--output_simulation", required=True,
                        help="Zip output for cloning simulation results")
    parser.add_argument("--enzyme", type=str, required=False,
                        help="Enzyme to use")
    parser.add_argument("--outdir_gb", required=True,
                        help="Dir output constructs gb files")
    parser.add_argument("--use_json_paramers", required=True,
                        help="Use parameters from JSON: true/false")
    parser.add_argument("--json_conf", required=False,
                        help="JSON config file with assembly parameters "
                             "(assembly_plan_name, topology, enzyme)")

    return parser.parse_args()
166 171
172
if __name__ == "__main__":
    args = parse_command_line_args()

    # Decide where the assembly parameters come from: a JSON config file
    # or the individual command-line flags.
    # Normalize so "True", " true " etc. also enable JSON mode (the raw
    # string comparison was case- and whitespace-sensitive).
    use_json = args.use_json_paramers.strip().lower() == "true"
    if use_json:
        if not args.json_conf:
            raise ValueError("You must provide --json_conf when --use_json_paramers is 'true'")
        with open(args.json_conf, "r") as f:
            config_params = json.load(f)
        # Fail early with a clear message if the JSON file is incomplete,
        # instead of a bare KeyError when the values are read below.
        missing = [key for key in ("assembly_plan_name", "topology", "enzyme")
                   if key not in config_params]
        if missing:
            raise ValueError(
                "Missing keys in --json_conf file: " + ", ".join(missing)
            )
    else:
        config_params = {
            "assembly_plan_name": args.assembly_plan_name,
            "topology": args.topology,
            "enzyme": args.enzyme,
        }
    assembly_plan_name = config_params["assembly_plan_name"]
    topology = config_params["topology"]
    enzyme = config_params["enzyme"]

    cloning_simulation(
        args.parts_files, args.domesticated_seq,
        args.assembly_csv, assembly_plan_name, topology,
        args.file_name_mapping, args.file_name_mapping_dom,
        args.use_file_names_as_id, args.outdir_simulation,
        args.output_simulation, enzyme, args.outdir_gb
    )