Mercurial > repos > tduigou > cloning_simulation
comparison cloning_simulation.py @ 16:fbb241adf6c2 draft default tip
planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/DnaCauldron/tree/master commit af45e5e0e81535ab0423b0bcff8b5b220bb9b4d0-dirty
author | tduigou |
---|---|
date | Thu, 17 Jul 2025 10:17:24 +0000 |
parents | 16ccb36aa8e3 |
children |
comparison
equal
deleted
inserted
replaced
15:074a76e84b80 | 16:fbb241adf6c2 |
---|---|
1 import argparse | |
1 import os | 2 import os |
3 import json | |
4 import zipfile | |
5 import pandas | |
2 import dnacauldron | 6 import dnacauldron |
3 from Bio import SeqIO | 7 |
4 import pandas | |
5 import argparse | |
6 import zipfile | |
7 | 8 |
8 def cloning_simulation(files_to_assembly, domesticated_list, | 9 def cloning_simulation(files_to_assembly, domesticated_list, |
9 csv_file, assembly_type, topology, | 10 csv_file, assembly_type, topology, |
10 file_name_mapping, file_name_mapping_dom, | 11 file_name_mapping, file_name_mapping_dom, |
11 use_file_names_as_id, | 12 use_file_names_as_id, |
12 outdir_simulation, output_simulation,enzyme,outdir_gb): | 13 outdir_simulation, output_simulation, enzyme, outdir_gb): |
13 | 14 |
14 files_to_assembly = files_to_assembly.split(',') | 15 files_to_assembly = files_to_assembly.split(',') |
15 | 16 |
16 repository = dnacauldron.SequenceRepository() | 17 repository = dnacauldron.SequenceRepository() |
17 repository.import_records(files=files_to_assembly, | 18 repository.import_records(files=files_to_assembly, |
18 use_file_names_as_ids=use_file_names_as_id, | 19 use_file_names_as_ids=use_file_names_as_id, |
19 topology=topology) | 20 topology=topology) |
20 if domesticated_list: | 21 if domesticated_list: |
21 domesticated_files = domesticated_list.split(',') | 22 domesticated_files = domesticated_list.split(',') |
22 repository.import_records(files=domesticated_files, | 23 repository.import_records(files=domesticated_files, |
23 use_file_names_as_ids=use_file_names_as_id, | 24 use_file_names_as_ids=use_file_names_as_id, |
24 topology=topology) | 25 topology=topology) |
25 | 26 |
26 #refine the real record name dict | 27 # refine the real record name dict |
27 if isinstance(file_name_mapping, str): | 28 if isinstance(file_name_mapping, str): |
28 file_name_mapping = dict( | 29 file_name_mapping = dict( |
29 item.split(":") for item in file_name_mapping.split(",") | 30 item.split(":") for item in file_name_mapping.split(",") |
30 ) | 31 ) |
31 real_names = { | 32 real_names = { |
32 os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "") | 33 os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "") |
33 for k, v in file_name_mapping.items() | 34 for k, v in file_name_mapping.items() |
34 } | 35 } |
35 | 36 |
36 #refine the real record name dict_dom | 37 # refine the real record name dict_dom |
37 if file_name_mapping_dom == "": | 38 if file_name_mapping_dom == "": |
38 file_name_mapping_dom={} | 39 file_name_mapping_dom = {} |
39 else: | 40 else: |
40 if isinstance(file_name_mapping_dom, str): | 41 if isinstance(file_name_mapping_dom, str): |
41 file_name_mapping_dom = dict( | 42 file_name_mapping_dom = dict( |
42 item.split(":") for item in file_name_mapping_dom.split(",") | 43 item.split(":") for item in file_name_mapping_dom.split(",") |
43 ) | 44 ) |
44 dom_real_names = { | 45 dom_real_names = { |
45 os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "") | 46 os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "") |
46 for k, v in file_name_mapping_dom.items() | 47 for k, v in file_name_mapping_dom.items() |
47 } | 48 } |
48 real_names.update(dom_real_names) | 49 real_names.update(dom_real_names) |
49 | 50 |
50 #update the records | 51 # update the records |
51 | 52 |
52 for key, record in list(repository.collections["parts"].items()): | 53 for key, record in list(repository.collections["parts"].items()): |
53 current_id = record.id | 54 current_id = record.id |
54 if current_id in real_names: | 55 if current_id in real_names: |
55 new_id = real_names[current_id] | 56 new_id = real_names[current_id] |
56 record.id = new_id | 57 record.id = new_id |
57 record.name = new_id | 58 record.name = new_id |
58 record.description = new_id | 59 record.description = new_id |
59 repository.collections["parts"][new_id] = repository.collections["parts"].pop(key) | 60 repository.collections["parts"][new_id] = repository.collections["parts"].pop(key) |
60 ######################################################## | 61 ######################################################## |
61 #print (f"repo: {vars(repository)}") | 62 # print (f"repo: {vars(repository)}") |
62 #any(pandas.read_csv(csv_file, index_col=0, header=None).duplicated()) | 63 # any(pandas.read_csv(csv_file, index_col=0, header=None).duplicated()) |
63 df=pandas.read_csv(csv_file, index_col=0, header=None) | 64 df = pandas.read_csv(csv_file, index_col=0, header=None) |
64 if df.duplicated().any(): | 65 if df.duplicated().any(): |
65 raise ValueError("Duplicate rows found in the data!") | 66 raise ValueError("Duplicate rows found in the data!") |
66 | 67 |
67 if assembly_type == "Type2sRestrictionAssembly": | 68 if assembly_type == "Type2sRestrictionAssembly": |
68 assembly_class = dnacauldron.Type2sRestrictionAssembly | 69 assembly_class = dnacauldron.Type2sRestrictionAssembly |
76 assembly_class = dnacauldron.OligoPairAnnealing | 77 assembly_class = dnacauldron.OligoPairAnnealing |
77 elif assembly_type == "LigaseCyclingReactionAssembly": | 78 elif assembly_type == "LigaseCyclingReactionAssembly": |
78 assembly_class = dnacauldron.LigaseCyclingReactionAssembly | 79 assembly_class = dnacauldron.LigaseCyclingReactionAssembly |
79 else: | 80 else: |
80 raise ValueError(f"Unsupported assembly type: {assembly_type}") | 81 raise ValueError(f"Unsupported assembly type: {assembly_type}") |
81 | 82 |
82 new_csvname = "assambly.csv" | 83 new_csvname = "assambly.csv" |
83 os.rename(csv_file, new_csvname) | 84 os.rename(csv_file, new_csvname) |
84 | 85 |
85 assembly_plan = dnacauldron.AssemblyPlan.from_spreadsheet( | 86 assembly_plan = dnacauldron.AssemblyPlan.from_spreadsheet( |
86 name="auto_from_filename", | 87 name="auto_from_filename", |
111 for root, dirs, files in os.walk(outdir_simulation): | 112 for root, dirs, files in os.walk(outdir_simulation): |
112 for file in files: | 113 for file in files: |
113 full_path = os.path.join(root, file) | 114 full_path = os.path.join(root, file) |
114 arcname = os.path.relpath(full_path, outdir_simulation) | 115 arcname = os.path.relpath(full_path, outdir_simulation) |
115 zipf.write(full_path, arcname) | 116 zipf.write(full_path, arcname) |
116 #print("Files in the zip archive:") | 117 # print("Files in the zip archive:") |
117 #for info in zipf.infolist(): | 118 # for info in zipf.infolist(): |
118 #print(info.filename) | 119 # print(info.filename) |
119 for member in zipf.namelist(): | 120 for member in zipf.namelist(): |
120 # Only extract actual files inside 'all_construct_records/' (not subfolders) | 121 # Only extract actual files inside 'all_construct_records/' (not subfolders) |
121 if member.startswith("assambly_simulation/all_construct_records/") and not member.endswith("/"): | 122 if member.startswith("assambly_simulation/all_construct_records/") and not member.endswith("/"): |
122 # Get the file name only (strip folder path) | 123 # Get the file name only (strip folder path) |
123 filename = os.path.basename(member) | 124 filename = os.path.basename(member) |
135 | 136 |
136 | 137 |
137 def parse_command_line_args(): | 138 def parse_command_line_args(): |
138 parser = argparse.ArgumentParser(description="Cloning simulation") | 139 parser = argparse.ArgumentParser(description="Cloning simulation") |
139 | 140 |
140 parser.add_argument("--parts_files", required=True, | 141 parser.add_argument("--parts_files", required=True, |
141 help="List of GenBank files (Comma-separated)") | 142 help="List of GenBank files (Comma-separated)") |
142 parser.add_argument("--domesticated_seq", required=True, | 143 parser.add_argument("--domesticated_seq", required=True, |
143 help="output of domestication (genbank list)") | 144 help="output of domestication (genbank list)") |
144 parser.add_argument("--assembly_csv", required=True, | 145 parser.add_argument("--assembly_csv", required=True, |
145 help="csv assembly") | 146 help="csv assembly") |
146 parser.add_argument('--assembly_plan_name', type=str, | 147 parser.add_argument('--assembly_plan_name', type=str, required=False, |
147 help='type of assembly') | 148 help='type of assembly') |
148 parser.add_argument('--topology', type=str, | 149 parser.add_argument('--topology', type=str, required=False, |
149 help='"circular" or "linear"') | 150 help='"circular" or "linear"') |
150 parser.add_argument('--file_name_mapping', type=str, | 151 parser.add_argument('--file_name_mapping', type=str, |
151 help='Mapping of Galaxy filenames to original filenames') | 152 help='Mapping of Galaxy filenames to original filenames') |
152 parser.add_argument('--file_name_mapping_dom', type=str, | 153 parser.add_argument('--file_name_mapping_dom', type=str, |
153 help='Mapping of Galaxy filenames to original domestication filenames') | 154 help='Mapping of Galaxy filenames to original domestication filenames') |
154 parser.add_argument("--use_file_names_as_id", type=lambda x: x.lower() == 'true', default=True, | 155 parser.add_argument("--use_file_names_as_id", type=lambda x: x.lower() == 'true', default=True, |
155 help="Use file names as IDs (True/False)") | 156 help="Use file names as IDs (True/False)") |
156 parser.add_argument("--outdir_simulation", required=True, | 157 parser.add_argument("--outdir_simulation", required=True, |
157 help="dir output for cloning simulation results") | 158 help="dir output for cloning simulation results") |
158 parser.add_argument("--output_simulation", required=True, | 159 parser.add_argument("--output_simulation", required=True, |
159 help="zip output for cloning simulation results") | 160 help="zip output for cloning simulation results") |
160 parser.add_argument('--enzyme', type=str, | 161 parser.add_argument('--enzyme', type=str, required=False, |
161 help='enzyme to use') | 162 help='enzyme to use') |
162 parser.add_argument("--outdir_gb", required=True, | 163 parser.add_argument("--outdir_gb", required=True, |
163 help="dir output constructs gb files") | 164 help="dir output constructs gb files") |
165 parser.add_argument("--use_json_paramers", required=True, | |
166 help="Use parameters from JSON: true/false") | |
167 parser.add_argument("--json_conf", required=False, | |
168 help="JSON config file with DB parameters") | |
164 | 169 |
165 return parser.parse_args() | 170 return parser.parse_args() |
166 | 171 |
172 | |
167 if __name__ == "__main__": | 173 if __name__ == "__main__": |
168 args = parse_command_line_args() | 174 args = parse_command_line_args() |
169 | 175 |
176 #json param checking | |
177 config_params = {} | |
178 use_json = args.use_json_paramers == 'true' | |
179 if use_json: | |
180 if not args.json_conf: | |
181 raise ValueError("You must provide --json_conf when --use_json_paramers is 'true'") | |
182 with open(args.json_conf, "r") as f: | |
183 config_params = json.load(f) | |
184 else: | |
185 config_params = { | |
186 "assembly_plan_name": args.assembly_plan_name, | |
187 "topology": args.topology, | |
188 "enzyme": args.enzyme | |
189 } | |
190 assembly_plan_name = config_params["assembly_plan_name"] | |
191 topology = config_params["topology"] | |
192 enzyme = config_params["enzyme"] | |
193 | |
170 cloning_simulation( | 194 cloning_simulation( |
171 args.parts_files, args.domesticated_seq, | 195 args.parts_files, args.domesticated_seq, |
172 args.assembly_csv, args.assembly_plan_name, args.topology, | 196 args.assembly_csv, assembly_plan_name, topology, |
173 args.file_name_mapping, args.file_name_mapping_dom, | 197 args.file_name_mapping, args.file_name_mapping_dom, |
174 args.use_file_names_as_id, args.outdir_simulation, | 198 args.use_file_names_as_id, args.outdir_simulation, |
175 args.output_simulation, args.enzyme, args.outdir_gb | 199 args.output_simulation, enzyme, args.outdir_gb |
176 ) | 200 ) |