comparison save_to_db.py @ 0:034686b5bc15 draft

planemo upload for repository https://github.com/brsynth commit 6ae809b563b40bcdb6be2e74fe2a84ddad5484ae
author tduigou
date Thu, 24 Apr 2025 09:56:36 +0000
parents
children c7a7520afb4b
comparison
equal deleted inserted replaced
-1:000000000000 0:034686b5bc15
1 import subprocess
2 import time
3 import argparse
4 import socket
5 import os
6 import re
7 import pandas as pd
8 from sqlalchemy import create_engine, inspect
9 from sqlalchemy.sql import text
10 from sqlalchemy.engine.url import make_url
11 from sqlalchemy.exc import OperationalError
12
def fix_db_uri(uri):
    """Restore '@' characters that arrive escaped as '__at__' in the DB URI."""
    # Galaxy-style parameter passing masks '@'; undo every occurrence.
    return "@".join(uri.split("__at__"))
16
def is_port_in_use(port):
    """Return True if something accepts TCP connections on localhost:*port*."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex returns an errno instead of raising; 0 means connected.
        return probe.connect_ex(("localhost", port)) == 0
    finally:
        probe.close()
21
def extract_db_name(uri):
    """Parse *uri* with SQLAlchemy and return its database-name component."""
    return make_url(uri).database
26
def start_postgres_container(db_name):
    """Ensure a PostgreSQL Docker container named *db_name* is running.

    Reuses an already-running container, restarts a stopped one, or creates
    a fresh one publishing host port 5432 (falling back to 5433 when 5432 is
    taken).  The superuser password is read from $POSTGRES_PASSWORD and
    defaults to "RK17".

    Parameters
    ----------
    db_name : str
        Database name, used verbatim as the Docker container name.
    """
    container_name = db_name

    # Anchor the name filter: a bare `name=foo` matches any container whose
    # name merely *contains* "foo" (Docker treats the filter as a regex), so
    # "db" would also match "db_backup".  `^...$` forces an exact match.
    name_filter = f"name=^{container_name}$"

    # List-form argv (shell=False) avoids shell injection through db_name,
    # which ultimately comes from a user-supplied URI.
    container_running = subprocess.run(
        ["docker", "ps", "-q", "-f", name_filter],
        capture_output=True, text=True,
    )
    if container_running.stdout.strip():
        print(f"Container '{container_name}' is already running.")
        return

    # Not running — check whether a stopped container with this name exists.
    container_exists = subprocess.run(
        ["docker", "ps", "-a", "-q", "-f", name_filter],
        capture_output=True, text=True,
    )
    if container_exists.stdout.strip():
        print(f"Starting existing container '{container_name}'...")
        subprocess.run(["docker", "start", container_name])
        print(f"PostgreSQL Docker container '{container_name}' activated.")
        return

    # No container at all: create one, dodging an occupied host port 5432.
    port = 5432 if not is_port_in_use(5432) else 5433
    postgres_password = os.getenv("POSTGRES_PASSWORD", "RK17")

    start_command = [
        "docker", "run", "--name", container_name,
        "-e", f"POSTGRES_PASSWORD={postgres_password}",
        "-p", f"{port}:5432",
        "-d", "postgres",
    ]

    try:
        subprocess.run(start_command, check=True)
        print(f"PostgreSQL Docker container '{container_name}' started on port {port}.")
    except subprocess.CalledProcessError as e:
        # Best-effort: report and continue; wait_for_db() will surface a
        # hard failure later if the database never comes up.
        print(f"Failed to start Docker container: {e}")
67
def wait_for_db(uri, timeout=60):
    """Block until a connection to *uri* succeeds; raise after *timeout* seconds."""
    engine = create_engine(uri)
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with engine.connect():
                print("Connected to database.")
                return
        except OperationalError:
            # Server not accepting connections yet — back off briefly.
            print("Database not ready, retrying...")
            time.sleep(2)
    raise Exception("Database connection failed after timeout.")
81
def _validate_identifier(name, what):
    """Guard SQL identifiers that get f-string-interpolated into statements.

    Identifiers (table/column names) cannot be bound as query parameters, so
    they are interpolated directly into SQL below; restrict them to plain
    unquoted-identifier syntax to rule out injection via CLI arguments.
    """
    if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", name):
        raise ValueError(f"Unsafe SQL {what}: {name!r}")
    return name

def _parse_file_name_mapping(file_name_mapping):
    """Parse "path:fragment,path:fragment" into {basename(path): fragment-stem}."""
    return {
        os.path.basename(path): os.path.splitext(fragment_name)[0]
        for mapping in file_name_mapping.split(",")
        for path, fragment_name in [mapping.split(":")]
    }

def _split_genbank(content, gb_file):
    """Split raw GenBank text at the ORIGIN line into (annotation, sequence)."""
    origin_match = re.search(r"^ORIGIN.*$", content, flags=re.MULTILINE)
    if not origin_match:
        raise ValueError(f"ORIGIN section not found in file: {gb_file}")
    origin_start = origin_match.start()
    return content[:origin_start].strip(), content[origin_start:].strip()

def push_gb_annotations(gb_files, sequence_column, annotation_column, db_uri, table_name, fragment_column_name, output, file_name_mapping):
    """Push GenBank file content into the database if the fragment is not already present.

    Each GenBank file is split at its ORIGIN line: the header goes into
    *annotation_column*, the sequence section into *sequence_column*, keyed by
    the fragment name looked up from *file_name_mapping*.  All inserts happen
    in one transaction; the inserted fragment names are written to *output*,
    one per line.

    Raises
    ------
    ValueError
        Unsafe identifier, missing fragment column, unmapped file name, or a
        GenBank file without an ORIGIN section.
    RuntimeError
        A fragment with the same name already exists in the table.
    """
    db_uri = fix_db_uri(db_uri)
    engine = create_engine(db_uri)
    inserted_fragments = []

    # Identifiers come from CLI arguments and are interpolated into SQL text;
    # validate them up front.
    table_name = _validate_identifier(table_name, "table name")
    fragment_column_name = _validate_identifier(fragment_column_name, "column name")
    annotation_column = _validate_identifier(annotation_column, "column name")
    sequence_column = _validate_identifier(sequence_column, "column name")

    try:
        # {base_file_name: fragment_name} for resolving each input file.
        file_name_mapping_dict = _parse_file_name_mapping(file_name_mapping)

        # engine.begin() keeps every insert in a single transaction: either
        # all fragments land in the DB or none do.
        with engine.begin() as connection:
            inspector = inspect(engine)
            columns = [col['name'] for col in inspector.get_columns(table_name)]

            if fragment_column_name not in columns:
                raise ValueError(f"Fragment column '{fragment_column_name}' not found in table '{table_name}'.")

            # Snapshot existing fragments so duplicates abort before any insert.
            all_rows = connection.execute(text(f"SELECT {fragment_column_name} FROM {table_name}")).fetchall()
            existing_fragments = {row[0] for row in all_rows}

            insert_rows = []

            for gb_file in gb_files:
                real_file_name = os.path.basename(gb_file)
                print(f"Processing file: {real_file_name}")

                fragment_name = file_name_mapping_dict.get(real_file_name)
                if not fragment_name:
                    raise ValueError(f"Fragment name not found for file '{real_file_name}' in file_name_mapping.")

                # Duplicate fragment: abort the whole run (transaction rolls back).
                if fragment_name in existing_fragments:
                    raise RuntimeError(f"Fatal Error: Fragment '{fragment_name}' already exists in DB. Stopping the process.")

                with open(gb_file, "r") as f:
                    content = f.read()

                annotation_text, sequence_text = _split_genbank(content, gb_file)

                insert_rows.append({
                    fragment_column_name: fragment_name,
                    annotation_column: annotation_text,
                    sequence_column: sequence_text,
                })
                inserted_fragments.append(fragment_name)

            # Values are bound as parameters; only validated identifiers are
            # interpolated into the statement text.
            for values in insert_rows:
                col_names = ", ".join(values.keys())
                placeholders = ", ".join([f":{key}" for key in values.keys()])
                insert_stmt = text(f"INSERT INTO {table_name} ({col_names}) VALUES ({placeholders})")
                connection.execute(insert_stmt, values)

            print(f"Inserted {len(insert_rows)} fragments.")

        # Report the inserted fragment names, one per line.
        with open(output, "w") as log_file:
            for frag in inserted_fragments:
                log_file.write(f"{frag}\n")
        print(f"Fragment names written to '{output}'.")

    except Exception as e:
        print(f"Error during GB file insertion: {e}")
        raise
169
def main():
    """CLI entry point: boot the DB container, wait for it, then push GB files."""
    parser = argparse.ArgumentParser(description="Fetch annotations from PostgreSQL database and save as JSON.")
    parser.add_argument("--input", required=True, help="Input gb files")
    parser.add_argument("--sequence_column", required=True, help="DB column contains sequence for ganbank file")
    parser.add_argument("--annotation_column", required=True, help="DB column contains head for ganbank file")
    parser.add_argument("--db_uri", required=True, help="Database URI connection string")
    parser.add_argument("--table", required=True, help="Table name in the database")
    parser.add_argument("--fragment_column", required=True, help="Fragment column name in the database")
    parser.add_argument("--output", required=True, help="Text report")
    parser.add_argument("--file_name_mapping", required=True, help="real fragments names")
    args = parser.parse_args()

    # Comma-separated file list; drop empty entries from stray commas.
    gb_file_list = [entry.strip() for entry in args.input.split(",") if entry.strip()]

    # Un-escape the URI and bring up a container named after the database.
    db_uri = fix_db_uri(args.db_uri)
    db_name = extract_db_name(db_uri)
    start_postgres_container(db_name)

    # Block until PostgreSQL accepts connections.
    wait_for_db(db_uri)

    # Insert the GenBank files and write the report.
    push_gb_annotations(
        gb_file_list,
        args.sequence_column,
        args.annotation_column,
        db_uri,
        args.table,
        args.fragment_column,
        args.output,
        args.file_name_mapping,
    )

if __name__ == "__main__":
    main()