Mercurial > repos > bgruening > sucos_max_score
changeset 6:f67eb93bba2a draft default tip
"planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/chemicaltoolbox/sucos commit 05dc325ce687441e5d3bdbdedcc0e3529cd5e070"
author | bgruening |
---|---|
date | Wed, 14 Apr 2021 09:28:29 +0000 |
parents | 887706e7c3d4 |
children | |
files | sucos.py sucos_cluster.py sucos_max.py utils.py |
diffstat | 4 files changed, 263 insertions(+), 108 deletions(-) [+] |
line wrap: on
line diff
--- a/sucos.py Tue Jul 28 12:11:29 2020 +0000 +++ b/sucos.py Wed Apr 14 09:28:29 2021 +0000 @@ -9,26 +9,39 @@ """ from __future__ import print_function -import argparse, os, sys, gzip + +import argparse +import os + import numpy as np -from rdkit import Chem, rdBase, RDConfig +import utils +from rdkit import Chem, RDConfig from rdkit.Chem import AllChem, rdShapeHelpers from rdkit.Chem.FeatMaps import FeatMaps -import utils - -### start function definitions ######################################### +# start function definitions ######################################### # Setting up the features to use in FeatureMap -fdef = AllChem.BuildFeatureFactory(os.path.join(RDConfig.RDDataDir, 'BaseFeatures.fdef')) +fdef = AllChem.BuildFeatureFactory( + os.path.join(RDConfig.RDDataDir, "BaseFeatures.fdef") +) fmParams = {} for k in fdef.GetFeatureFamilies(): fparams = FeatMaps.FeatMapParams() fmParams[k] = fparams -keep = ('Donor', 'Acceptor', 'NegIonizable', 'PosIonizable', 'ZnBinder', - 'Aromatic', 'Hydrophobe', 'LumpedHydrophobe') +keep = ( + "Donor", + "Acceptor", + "NegIonizable", + "PosIonizable", + "ZnBinder", + "Aromatic", + "Hydrophobe", + "LumpedHydrophobe", +) + def filterFeature(f): result = f.GetFamily() in keep @@ -37,6 +50,7 @@ utils.log("Filtered out feature type", f.GetFamily()) return result + def getRawFeatures(mol): rawFeats = fdef.GetFeaturesForMol(mol) @@ -44,7 +58,10 @@ filtered = list(filter(filterFeature, rawFeats)) return filtered -def get_FeatureMapScore(small_feats, large_feats, tani=False, score_mode=FeatMaps.FeatMapScoreMode.All): + +def get_FeatureMapScore( + small_feats, large_feats, tani=False, score_mode=FeatMaps.FeatMapScoreMode.All +): """ Generate the feature map score. @@ -58,7 +75,10 @@ for rawFeats in [small_feats, large_feats]: # filter that list down to only include the ones we're interested in featLists.append(rawFeats) - fms = [FeatMaps.FeatMap(feats=x, weights=[1] * len(x), params=fmParams) for x in featLists] + fms = [ + FeatMaps.FeatMap(feats=x, weights=[1] * len(x), params=fmParams) + for x in featLists + ] # set the score mode fms[0].scoreMode = score_mode @@ -69,24 +89,35 @@ B = len(featLists[1]) if B != fms[1].GetNumFeatures(): utils.log("Why isn't B equal to number of features...?!") - tani_score = float(c) / (A+B-c) + tani_score = float(c) / (A + B - c) return tani_score else: - fm_score = fms[0].ScoreFeats(featLists[1]) / min(fms[0].GetNumFeatures(), len(featLists[1])) + fm_score = fms[0].ScoreFeats(featLists[1]) / min( + fms[0].GetNumFeatures(), len(featLists[1]) + ) return fm_score except ZeroDivisionError: utils.log("ZeroDivisionError") return 0 if tani: - tani_score = float(c) / (A+B-c) + tani_score = float(c) / (A + B - c) return tani_score else: - fm_score = fms[0].ScoreFeats(featLists[1]) / min(fms[0].GetNumFeatures(), len(featLists[1])) + fm_score = fms[0].ScoreFeats(featLists[1]) / min( + fms[0].GetNumFeatures(), len(featLists[1]) + ) return fm_score -def get_SucosScore(ref_mol, query_mol, tani=False, ref_features=None, query_features=None, score_mode=FeatMaps.FeatMapScoreMode.All): +def get_SucosScore( + ref_mol, + query_mol, + tani=False, + ref_features=None, + query_features=None, + score_mode=FeatMaps.FeatMapScoreMode.All, +): """ This is the key function that calculates the SuCOS scores and is expected to be called from other modules. To improve performance you can pre-calculate the features and pass them in as optional parameters to avoid having @@ -107,29 +138,41 @@ query_features = getRawFeatures(query_mol) fm_score = get_FeatureMapScore(ref_features, query_features, tani, score_mode) - fm_score = np.clip(fm_score, 0, 1) + fm_score = np.float(np.clip(fm_score, 0, 1)) - try : + try: if tani: tani_sim = 1 - float(rdShapeHelpers.ShapeTanimotoDist(ref_mol, query_mol)) tani_sim = np.clip(tani_sim, 0, 1) - SuCOS_score = 0.5*fm_score + 0.5*tani_sim + SuCOS_score = 0.5 * fm_score + 0.5 * tani_sim return SuCOS_score, fm_score, tani_sim else: - protrude_dist = rdShapeHelpers.ShapeProtrudeDist(ref_mol, query_mol, allowReordering=False) + protrude_dist = rdShapeHelpers.ShapeProtrudeDist( + ref_mol, query_mol, allowReordering=False + ) protrude_dist = np.clip(protrude_dist, 0, 1) protrude_val = 1.0 - protrude_dist SuCOS_score = 0.5 * fm_score + 0.5 * protrude_val return SuCOS_score, fm_score, protrude_val - except: + except Exception: utils.log("Failed to calculate SuCOS scores. Returning 0,0,0") return 0, 0, 0 -def process(refmol_filename, inputs_filename, outputs_filename, refmol_index=None, - refmol_format=None, tani=False, score_mode=FeatMaps.FeatMapScoreMode.All): - ref_mol = utils.read_single_molecule(refmol_filename, index=refmol_index, format=refmol_format) - #utils.log("Reference mol has", ref_mol.GetNumHeavyAtoms(), "heavy atoms") +def process( + refmol_filename, + inputs_filename, + outputs_filename, + refmol_index=None, + refmol_format=None, + tani=False, + score_mode=FeatMaps.FeatMapScoreMode.All, +): + + ref_mol = utils.read_single_molecule( + refmol_filename, index=refmol_index, format=refmol_format + ) + # utils.log("Reference mol has", ref_mol.GetNumHeavyAtoms(), "heavy atoms") ref_features = getRawFeatures(ref_mol) input_file = utils.open_file_for_reading(inputs_filename) @@ -141,12 +184,18 @@ total = 0 errors = 0 for mol in suppl: - count +=1 + count += 1 if mol is None: continue - #utils.log("Mol has", str(mol.GetNumHeavyAtoms()), "heavy atoms") + # utils.log("Mol has", str(mol.GetNumHeavyAtoms()), "heavy atoms") try: - sucos_score, fm_score, val3 = get_SucosScore(ref_mol, mol, tani=tani, ref_features=ref_features, score_mode=score_mode) + sucos_score, fm_score, val3 = get_SucosScore( + ref_mol, + mol, + tani=tani, + ref_features=ref_features, + score_mode=score_mode, + ) mol.SetDoubleProp("SuCOS_Score", sucos_score) mol.SetDoubleProp("SuCOS_FeatureMap_Score", fm_score) if tani: @@ -155,9 +204,9 @@ mol.SetDoubleProp("SuCOS_Protrude_Score", val3) utils.log("Scores:", sucos_score, fm_score, val3) writer.write(mol) - total +=1 + total += 1 except ValueError as e: - errors +=1 + errors += 1 utils.log("Molecule", count, "failed to score:", e.message) input_file.close() @@ -165,41 +214,73 @@ writer.close() output_file.close() - utils.log("Completed.", total, "processed, ", count, "succeeded, ", errors, "errors") + utils.log( + "Completed.", total, "processed, ", count, "succeeded, ", errors, "errors" + ) + def parse_score_mode(value): - if value == None or value == 'all': + if value is None or value == "all": return FeatMaps.FeatMapScoreMode.All - elif value == 'closest': + elif value == "closest": return FeatMaps.FeatMapScoreMode.Closest - elif value == 'best': + elif value == "best": return FeatMaps.FeatMapScoreMode.Best else: raise ValueError(value + " is not a valid scoring mode option") -### start main execution ######################################### +# start main execution ######################################### + def main(): - parser = argparse.ArgumentParser(description='SuCOS with RDKit') - parser.add_argument('-i', '--input', help='Input file in SDF format. Can be gzipped (*.gz).') - parser.add_argument('-r', '--refmol', help='Molecule to compare against in Molfile (.mol) or SDF (.sdf) format') - parser.add_argument('--refmol-format', help="Format for the reference molecule (mol or sdf). " + - "Only needed if files don't have the expected extensions") - parser.add_argument('--refmolidx', help='Reference molecule index in SD file if not the first', type=int, default=1) - parser.add_argument('-o', '--output', help='Output file in SDF format. Can be gzipped (*.gz).') - parser.add_argument('--tanimoto', action='store_true', help='Include Tanimoto distance in score') - parser.add_argument('--score_mode', choices=['all', 'closest', 'best'], - help="choose the scoring mode for the feature map, default is 'all'.") + parser = argparse.ArgumentParser(description="SuCOS with RDKit") + parser.add_argument( + "-i", "--input", help="Input file in SDF format. Can be gzipped (*.gz)." + ) + parser.add_argument( + "-r", + "--refmol", + help="Molecule to compare against in Molfile (.mol) or SDF (.sdf) format", + ) + parser.add_argument( + "--refmol-format", + help="Format for the reference molecule (mol or sdf). " + + "Only needed if files don't have the expected extensions", + ) + parser.add_argument( + "--refmolidx", + help="Reference molecule index in SD file if not the first", + type=int, + default=1, + ) + parser.add_argument( + "-o", "--output", help="Output file in SDF format. Can be gzipped (*.gz)." + ) + parser.add_argument( + "--tanimoto", action="store_true", help="Include Tanimoto distance in score" + ) + parser.add_argument( + "--score_mode", + choices=["all", "closest", "best"], + help="choose the scoring mode for the feature map, default is 'all'.", + ) args = parser.parse_args() utils.log("SuCOS Args: ", args) score_mode = parse_score_mode(args.score_mode) - process(args.refmol, args.input, args.output, refmol_index=args.refmolidx, - refmol_format=args.refmol_format, tani=args.tanimoto, score_mode=score_mode) + process( + args.refmol, + args.input, + args.output, + refmol_index=args.refmolidx, + refmol_format=args.refmol_format, + tani=args.tanimoto, + score_mode=score_mode, + ) if __name__ == "__main__":
--- a/sucos_cluster.py Tue Jul 28 12:11:29 2020 +0000 +++ b/sucos_cluster.py Wed Apr 14 09:28:29 2021 +0000 @@ -10,15 +10,16 @@ GitHub: https://github.com/susanhleung/SuCOS Publication: https://doi.org/10.26434/chemrxiv.8100203.v1 """ +import argparse -import sucos, utils -import argparse, gzip -from rdkit import Chem import numpy as np import pandas as pd -from scipy.cluster.hierarchy import linkage, fcluster +import sucos +import utils +from rdkit import Chem +from scipy.cluster.hierarchy import fcluster, linkage -### start main execution ######################################### +# start main execution ######################################### def calc_distance_matrix(mols): @@ -44,13 +45,17 @@ if tuple1[0] == tuple2[0]: tmp.append(0.0) else: - #utils.log("Calculating SuCOS between", mol1, mol2) - sucos_score, fm_score, tani_score = sucos.get_SucosScore(tuple1[0], tuple2[0], - tani=True, ref_features=tuple1[1], query_features=tuple2[1]) + # utils.log("Calculating SuCOS between", mol1, mol2) + sucos_score, fm_score, tani_score = sucos.get_SucosScore( + tuple1[0], + tuple2[0], + tani=True, + ref_features=tuple1[1], + query_features=tuple2[1], + ) tmp.append(1.0 - sucos_score) matrix.append(tmp) - return matrix @@ -64,24 +69,25 @@ indexes = [x for x in range(0, len(matrix))] cols = [x for x in range(0, len(matrix[0]))] - #utils.log("indexes", indexes) - #utils.log("cols", cols) + # utils.log("indexes", indexes) + # utils.log("cols", cols) df = pd.DataFrame(matrix, columns=cols, index=indexes) utils.log("DataFrame:", df.shape) - #utils.log(df) + # utils.log(df) indices = np.triu_indices(df.shape[0], k=1) - #utils.log("Indices:", indices) + # utils.log("Indices:", indices) t = np.array(df)[indices] - Z = linkage(t, 'average') + Z = linkage(t, "average") lig_clusters = [] - cluster_arr = fcluster(Z, t=threshold, criterion='distance') + cluster_arr = fcluster(Z, t=threshold, criterion="distance") for i in range(np.amax(cluster_arr)): - clus = df.columns[np.argwhere(cluster_arr==i+1)] + clus = df.columns[np.argwhere(cluster_arr == i + 1)] lig_clusters.append([x[0] for x in clus.tolist()]) utils.log("Clusters", lig_clusters) return lig_clusters + def write_clusters_to_sdfs(mols, clusters, basename, gzip=False): """ Write the molecules to SDF files, 1 file for each cluster. @@ -99,7 +105,9 @@ filename = basename + str(i) + ".sdf" if gzip: filename += ".gz" - utils.log("Writing ", len(cluster), "molecules in cluster", i, "to file", filename) + utils.log( + "Writing ", len(cluster), "molecules in cluster", i, "to file", filename + ) output_file = utils.open_file_for_writing(filename) writer = Chem.SDWriter(output_file) for index in cluster: @@ -110,14 +118,26 @@ output_file.close() - def main(): - parser = argparse.ArgumentParser(description='Clustering with SuCOS and RDKit') - parser.add_argument('-i', '--input', help='Input file in SDF format. Can be gzipped (*.gz).') - parser.add_argument('-o', '--output', default="cluster", help="Base name for output files in SDF format. " + - "e.g. if value is 'output' then files like output1.sdf, output2.sdf will be created") - parser.add_argument('--gzip', action='store_true', help='Gzip the outputs generating files like output1.sdf.gz, output2.sdf.gz') - parser.add_argument('-t', '--threshold', type=float, default=0.8, help='Clustering threshold') + parser = argparse.ArgumentParser(description="Clustering with SuCOS and RDKit") + parser.add_argument( + "-i", "--input", help="Input file in SDF format. Can be gzipped (*.gz)." + ) + parser.add_argument( + "-o", + "--output", + default="cluster", + help="Base name for output files in SDF format. " + + "e.g. if value is 'output' then files like output1.sdf, output2.sdf will be created", + ) + parser.add_argument( + "--gzip", + action="store_true", + help="Gzip the outputs generating files like output1.sdf.gz, output2.sdf.gz", + ) + parser.add_argument( + "-t", "--threshold", type=float, default=0.8, help="Clustering threshold" + ) args = parser.parse_args() utils.log("SuCOS Cluster Args: ", args) @@ -131,4 +151,4 @@ if __name__ == "__main__": - main() \ No newline at end of file + main()
--- a/sucos_max.py Tue Jul 28 12:11:29 2020 +0000 +++ b/sucos_max.py Wed Apr 14 09:28:29 2021 +0000 @@ -34,12 +34,17 @@ Publication: https://doi.org/10.26434/chemrxiv.8100203.v1 """ -import sucos, utils -import argparse, gzip, os +import argparse +import os + +import sucos +import utils from rdkit import Chem -def process(inputfilename, clusterfilenames, outputfilename, filter_value, filter_field): +def process( + inputfilename, clusterfilenames, outputfilename, filter_value, filter_field +): all_clusters = {} for filename in clusterfilenames: cluster = [] @@ -49,13 +54,20 @@ for mol in suppl: i += 1 if not mol: - utils.log("WARNING: failed to generate molecule", i, "in cluster", filename) + utils.log( + "WARNING: failed to generate molecule", i, "in cluster", filename + ) continue try: features = sucos.getRawFeatures(mol) cluster.append((mol, features)) - except: - utils.log("WARNING: failed to generate features for molecule", i, "in cluster", filename) + except Exception: + utils.log( + "WARNING: failed to generate features for molecule", + i, + "in cluster", + filename, + ) cluster_file.close() all_clusters[filename] = cluster @@ -75,8 +87,10 @@ continue try: query_features = sucos.getRawFeatures(mol) - except: - utils.log("WARNING: failed to generate features for molecule", mol_num, "in input") + except Exception: + utils.log( + "WARNING: failed to generate features for molecule", mol_num, "in input" + ) continue scores_max = [0, 0, 0] scores_cum = [0, 0, 0] @@ -89,9 +103,13 @@ ref_features = entry[1] index += 1 comparisons += 1 - sucos_score, fm_score, vol_score = sucos.get_SucosScore(hit, mol, - tani=False, ref_features=ref_features, - query_features=query_features) + sucos_score, fm_score, vol_score = sucos.get_SucosScore( + hit, + mol, + tani=False, + ref_features=ref_features, + query_features=query_features, + ) if sucos_score > scores_max[0]: scores_max[0] = sucos_score @@ -104,11 +122,14 @@ scores_cum[1] += fm_score scores_cum[2] += vol_score - # utils.log("Max SuCOS:", scores[0], "FM:", scores[1], "P:", scores[2],"File:", cluster_file_name_only, "Index:", cluster_index) mol.SetDoubleProp("Max_SuCOS_Score", scores_max[0] if scores_max[0] > 0 else 0) - mol.SetDoubleProp("Max_SuCOS_FeatureMap_Score", scores_max[1] if scores_max[1] > 0 else 0) - mol.SetDoubleProp("Max_SuCOS_Protrude_Score", scores_max[2] if scores_max[2] > 0 else 0) + mol.SetDoubleProp( + "Max_SuCOS_FeatureMap_Score", scores_max[1] if scores_max[1] > 0 else 0 + ) + mol.SetDoubleProp( + "Max_SuCOS_Protrude_Score", scores_max[2] if scores_max[2] > 0 else 0 + ) if cluster_name: cluster_file_name_only = cluster_name.split(os.sep)[-1] @@ -117,8 +138,12 @@ # utils.log("Cum SuCOS:", scores[0], "FM:", scores[1], "P:", scores[2]) mol.SetDoubleProp("Cum_SuCOS_Score", scores_cum[0] if scores_cum[0] > 0 else 0) - mol.SetDoubleProp("Cum_SuCOS_FeatureMap_Score", scores_cum[1] if scores_cum[1] > 0 else 0) - mol.SetDoubleProp("Cum_SuCOS_Protrude_Score", scores_cum[2] if scores_cum[2] > 0 else 0) + mol.SetDoubleProp( + "Cum_SuCOS_FeatureMap_Score", scores_cum[1] if scores_cum[1] > 0 else 0 + ) + mol.SetDoubleProp( + "Cum_SuCOS_Protrude_Score", scores_cum[2] if scores_cum[2] > 0 else 0 + ) if filter_value and filter_field: if mol.HasProp(filter_field): @@ -136,20 +161,35 @@ utils.log("Completed", comparisons, "comparisons") -### start main execution ######################################### +# start main execution ######################################### + def main(): - parser = argparse.ArgumentParser(description='Max SuCOS scores with RDKit') - parser.add_argument('-i', '--input', help='Input file to score in SDF format. Can be gzipped (*.gz).') - parser.add_argument('-o', '--output', help='Output file in SDF format. Can be gzipped (*.gz).') - parser.add_argument('clusters', nargs='*', help="One or more SDF files with the clustered hits") - parser.add_argument('--filter-value', type=float, help='Filter out values with scores less than this.') - parser.add_argument('--filter-field', help='Field to use to filter values.') + parser = argparse.ArgumentParser(description="Max SuCOS scores with RDKit") + parser.add_argument( + "-i", + "--input", + help="Input file to score in SDF format. Can be gzipped (*.gz).", + ) + parser.add_argument( + "-o", "--output", help="Output file in SDF format. Can be gzipped (*.gz)." + ) + parser.add_argument( + "clusters", nargs="*", help="One or more SDF files with the clustered hits" + ) + parser.add_argument( + "--filter-value", + type=float, + help="Filter out values with scores less than this.", + ) + parser.add_argument("--filter-field", help="Field to use to filter values.") args = parser.parse_args() utils.log("Max SuCOS Args: ", args) - process(args.input, args.clusters, args.output, args.filter_value, args.filter_field) + process( + args.input, args.clusters, args.output, args.filter_value, args.filter_field + ) if __name__ == "__main__":
--- a/utils.py Tue Jul 28 12:11:29 2020 +0000 +++ b/utils.py Wed Apr 14 09:28:29 2021 +0000 @@ -4,40 +4,54 @@ """ from __future__ import print_function -import sys, gzip + +import gzip +import sys + from rdkit import Chem + def log(*args, **kwargs): - """Log output to STDERR - """ + """Log output to STDERR""" print(*args, file=sys.stderr, **kwargs) + def open_file_for_reading(filename): """Open the file gunzipping it if it ends with .gz.""" - if filename.lower().endswith('.gz'): - return gzip.open(filename, 'rb') + if filename.lower().endswith(".gz"): + return gzip.open(filename, "rb") else: - return open(filename, 'rb') + return open(filename, "rb") + def open_file_for_writing(filename): - if filename.lower().endswith('.gz'): - return gzip.open(filename, 'at') + if filename.lower().endswith(".gz"): + return gzip.open(filename, "at") else: - return open(filename, 'w+') + return open(filename, "w+") + def read_single_molecule(filename, index=1, format=None): """Read a single molecule as a RDKit Mol object. This can come from a file in molfile or SDF format. If SDF then you can also specify an index of the molecule that is read (default is the first) """ mol = None - if format == 'mol' or filename.lower().endswith('.mol') or filename.lower().endswith('.mol.gz'): + if ( + format == "mol" + or filename.lower().endswith(".mol") + or filename.lower().endswith(".mol.gz") + ): file = open_file_for_reading(filename) mol = Chem.MolFromMolBlock(file.read()) file.close() - elif format == 'sdf' or filename.lower().endswith('.sdf') or filename.lower().endswith('.sdf.gz'): + elif ( + format == "sdf" + or filename.lower().endswith(".sdf") + or filename.lower().endswith(".sdf.gz") + ): file = open_file_for_reading(filename) supplier = Chem.ForwardSDMolSupplier(file) - for i in range(0,index): + for i in range(0, index): if supplier.atEnd(): break mol = next(supplier) @@ -46,4 +60,4 @@ if not mol: raise ValueError("Unable to read molecule") - return mol \ No newline at end of file + return mol