Mercurial > repos > holtgrewe > ngs_roi
view ctd2galaxy.py @ 2:08cb79ffac4c draft
Uploaded
| author | holtgrewe |
|---|---|
| date | Mon, 06 May 2013 12:46:46 -0400 |
| parents | 61d9bdb6d519 |
| children | 170e48a55078 |
line wrap: on
line source
#!/usr/bin/env python
"""Conversion of the CTD format into Galaxy XML.

The CTD parser should be reusable but is not in its own module since it is
only used here at the moment.
"""

import argparse
import operator
import sys
import xml.sax
import xml.sax.handler
import xml.sax.saxutils


def _is_boolean_param(param_node):
    """Return True if param_node is a string item restricted to true/false.

    :param param_node: the node to check, may be None
    :type param_node: ParametersNode or None
    """
    if param_node is None:
        return False
    if param_node.type_ != 'string' or not param_node.restrictions:
        return False
    return sorted(param_node.restrictions.split(',')) == ['false', 'true']


def _galaxy_name(name):
    """Return name with '-' and '.' replaced by '_'."""
    return name.replace('-', '_').replace('.', '_')


def _split_tags(param_node):
    """Return the comma-separated tags of param_node as a list."""
    if not param_node.tags:
        return []
    return param_node.tags.split(',')


class CTDFormatException(Exception):
    """Raised when there is a format error in CTD."""


class CLIElement(object):
    """Represents a <clielement> tag.

    :ivar option_identifier: with parameters (e.g. --param), empty if
                             argument.
    :type option_identifier: str
    :ivar mapping_path: value of the referenceName attribute of the
                        <mapping> child, a '.'-separated parameter path
    :type mapping_path: str
    :ivar param_node: link to ParametersNode, set after parsing, None if
                      unset
    :ivar is_list: whether or not this element is a list.
    :type is_list: bool
    """

    def __init__(self, option_identifier='', mapping_path='', is_list=False):
        """Initialize object."""
        self.option_identifier = option_identifier
        self.param_node = None  # Link to ParametersNode, set after parsing.
        self.mapping_path = mapping_path
        self.is_list = is_list

    def __str__(self):
        """String representation of CLIElement."""
        t = (self.option_identifier, self.mapping_path, self.is_list)
        return 'CLIElement(%s, %s, %s)' % tuple(map(repr, t))


class ParametersNode(object):
    """Represents a <NODE> or <ITEM> tag inside the <PARAMETERS> tags.

    :ivar kind: 'node' for <NODE> (and the root), 'item' for <ITEM>
    :ivar name: name attribute of the node
    :ivar description: text for description attribute of the node
    :ivar value: value attribute of the node
    :ivar type_: type attribute of the node
    :ivar tags: tags attribute of the node
    :ivar supported_formats: supported_format attribute of the node
    :ivar restrictions: restrictions attribute of the node
    :ivar path: the path to the node
    :type path: list of strings
    :ivar parent: link to the parent of the node
    :ivar children: children of the node
    :type children: dict with name to node mapping
    :ivar cli_element: CLIElement that this parameter is mapped to.
    """

    def __init__(self, kind='', name='', description='', value='', type_='',
                 tags='', restrictions='', supported_formats=''):
        """Initialize the object."""
        self.kind = kind
        self.name = name
        self.description = description
        self.value = value
        self.type_ = type_
        self.tags = tags
        self.supported_formats = supported_formats
        self.restrictions = restrictions
        self.path = None  # Set by computePath().
        self.parent = None  # None for the root or when not linked yet.
        self.children = {}
        self.cli_element = None

    def computePath(self, is_root=True, path=None):
        """Compute path entry from parent links.

        :param is_root: whether or not this is the root node
        :type is_root: bool
        :param path: path to this node, excluding root; None means empty
        :type path: list of strings
        """
        # Copy so that siblings never share (and extend) the same list.
        self.path = list(path) if path else []
        if not is_root:
            self.path.append(self.name)
        for child in self.children.values():
            child.computePath(False, self.path)

    def applyFunc(self, f):
        """Apply f to self and all children."""
        f(self)
        for c in self.children.values():
            c.applyFunc(f)

    def find(self, path):
        """Return ParametersNode object at the path below the node.

        :param path: names of the nodes to descend into
        :type path: list of strings
        :returns: the node found or None if there is no such node
        """
        if not path:
            return self
        child = self.children.get(path[0])
        if not child:
            return None
        return child.find(path[1:])

    def __str__(self):
        """Return string representation."""
        t = (self.name, self.description, self.value, self.type_, self.tags,
             self.supported_formats, self.children, self.path)
        return 'ParametersNode(%s, %s, %s, %s, %s, %s, %s, path=%s)' % tuple(map(repr, t))

    def __repr__(self):
        """Return programmatic representation, same as __str__()."""
        return str(self)


class Tool(object):
    """Represents the top-level <tool> tag from a CTD file.

    :ivar name: name attribute value
    :type name: str
    :ivar executable_name: executableName attribute value
    :type executable_name: str
    :ivar version: version attribute value
    :type version: str
    :ivar description: description attribute value
    :type description: str
    :ivar manual: manual attribute value
    :type manual: str
    :ivar doc_url: docurl attribute value
    :type doc_url: str
    :ivar category: category attribute value
    :type category: str
    :ivar cli_elements: list of CLIElement objects
    :ivar parameters: root parameters node
    :type parameters: ParametersNode
    """

    def __init__(self, name='', executable_name='', version='',
                 description='', manual='', doc_url='', category=''):
        """Initialize the object."""
        self.name = name
        self.executable_name = executable_name
        self.version = version
        self.description = description
        self.manual = manual
        self.doc_url = doc_url
        self.category = category
        self.cli_elements = []
        self.parameters = None

    def parsingDone(self):
        """Called after parsing is done.

        The method will compute the paths of the parameter nodes and link
        the CLIElement objects in self.cli_elements to the ParameterNode
        objects.

        :raises CTDFormatException: if there is no parameters tree or an
            option refers to an unknown parameter
        """
        if self.parameters is None:
            raise CTDFormatException('No <PARAMETERS> element found.')
        self.parameters.computePath()
        for ce in self.cli_elements:
            if not ce.option_identifier:
                continue  # Skip arguments
            path = ce.mapping_path.split('.')
            node = self.parameters.find(path)
            if not node:
                raise CTDFormatException('Unknown parameter %s' % '.'.join(path))
            ce.param_node = node
            node.cli_element = ce

    def __str__(self):
        """Return string representation."""
        t = (self.name, self.executable_name, self.version, self.description,
             self.manual, self.doc_url, self.category)
        return 'Tool(%s, %s, %s, %s, %s, %s, %s)' % tuple(map(repr, t))


class CTDHandler(xml.sax.handler.ContentHandler):
    """SAX content handler that builds a Tool from CTD XML events.

    :ivar result: the Tool built so far, None before <tool> is seen
    :ivar tool: same object as result
    :ivar stack: names of the currently open tags, outermost first
    :ivar parameter_node: ParametersNode below which new nodes are added
    """

    # Maps the tag names below <tool> that carry text to Tool attributes.
    _TEXT_FIELDS = {
        'name': 'name',
        'executableName': 'executable_name',
        'version': 'version',
        'description': 'description',
        'manual': 'manual',
        'docurl': 'doc_url',
        'category': 'category',
    }

    def __init__(self):
        """Initialize the handler with empty state."""
        xml.sax.handler.ContentHandler.__init__(self)
        self.result = None
        self.tool = None
        # A stack of tag names that are currently open.
        self.stack = []
        # The current parameter to append nodes below.
        self.parameter_node = None

    def startElement(self, name, attrs):
        """Handle start of element.

        :raises CTDFormatException: if a required attribute is missing
        """
        # Maintain a stack of open tags.
        self.stack.append(name)
        # Handle the individual cases.  The innermost tag is self.stack[-1].
        if self.stack == ['tool']:
            # Create the top level Tool object.
            self.tool = Tool()
            self.result = self.tool
        elif self.stack == ['tool', 'cli', 'clielement']:
            # Create a new CLIElement object for a <clielement> tag.
            if not attrs.get('isList'):
                raise CTDFormatException('No attribute isList in <clielement>.')
            if attrs.get('optionIdentifier') is None:
                raise CTDFormatException('no attribute optionIdentifier in <clielement>.')
            # The element is a list exactly if isList="true"; this matches
            # what CTDWriter emits for is_list.
            is_list = (attrs.get('isList') == 'true')
            option_identifier = attrs.get('optionIdentifier')
            self.tool.cli_elements.append(
                CLIElement(option_identifier=option_identifier, is_list=is_list))
        elif self.stack == ['tool', 'cli', 'clielement', 'mapping']:
            # Handle a <mapping> sub entry of a <clielement> tag.
            if not attrs.get('referenceName'):
                raise CTDFormatException('no attribute referenceName in <mapping>')
            self.tool.cli_elements[-1].mapping_path = attrs['referenceName']
        elif self.stack == ['tool', 'PARAMETERS']:
            # Handle the <PARAMETERS> entry by creating a new top parameters
            # node.
            self.tool.parameters = ParametersNode(kind='node', name='<root>')
            self.parameter_node = self.tool.parameters
        elif self.stack[:2] == ['tool', 'PARAMETERS'] and self.stack[-1] == 'NODE':
            # Create a new node ParametersNode for the <NODE> entry.
            if not attrs.get('name'):
                raise CTDFormatException('no attribute name in <NODE>')
            name = attrs.get('name')
            # Keep the description since the writers emit it for <NODE>.
            node = ParametersNode(kind='node', name=name,
                                  description=attrs.get('description') or '')
            node.parent = self.parameter_node
            self.parameter_node.children[name] = node
            self.parameter_node = node
        elif self.stack[:2] == ['tool', 'PARAMETERS'] and self.stack[-1] == 'ITEM':
            # Create a new item ParametersNode for the <ITEM> entry.
            if not attrs.get('name'):
                raise CTDFormatException('no attribute name in <ITEM>')
            name = attrs.get('name')
            child = ParametersNode(
                kind='item', name=name,
                description=attrs.get('description'),
                value=attrs.get('value'),
                type_=attrs.get('type'),
                tags=attrs.get('tags'),
                supported_formats=attrs.get('supported_formats'),
                restrictions=attrs.get('restrictions'))
            self.parameter_node.children[name] = child

    def endElement(self, name):
        """Handle closing tag."""
        # Maintain stack.
        self.stack.pop()
        # Go up one node in the parameters tree if </NODE>
        if name == 'NODE':
            self.parameter_node = self.parameter_node.parent

    def characters(self, content):
        """Handle characters in XML file.

        Text directly below one of the known children of <tool> is appended
        to the corresponding Tool attribute; all other text is ignored.
        """
        if len(self.stack) != 2 or self.stack[0] != 'tool':
            return
        attr = self._TEXT_FIELDS.get(self.stack[1])
        if attr is not None:
            setattr(self.tool, attr, getattr(self.tool, attr) + content)


class CTDParser(object):
    """Parser for CTD files."""

    def __init__(self):
        """Create the parser with a fresh CTDHandler."""
        self.handler = CTDHandler()

    def parse(self, path):
        """Parse the CTD file at path and return the resulting Tool.

        :param path: passed on to the SAX parser's parse() method
        :returns: the parsed Tool with parameter paths computed
        :raises CTDFormatException: if the document has no top-level <tool>
            element or the Tool cannot be finalized
        """
        # Parse XML into Tool object.
        parser = xml.sax.make_parser()
        parser.setContentHandler(self.handler)
        parser.parse(path)
        tool = self.handler.result
        if tool is None:
            raise CTDFormatException('No top-level <tool> element found.')
        # Compute paths for tool's parameters.
        tool.parsingDone()
        return tool


class XMLWriter(object):
    """Base class for XML writers.

    :ivar result: list of strings that are joined for the final XML
    :ivar indent_level: int with the indentation level
    """

    def __init__(self):
        """Initialize with empty result and no indentation."""
        self.result = []
        self.indent_level = 0

    def indent(self):
        """Return indentation whitespace."""
        return ' ' * self.indent_level

    def _formatArgs(self, args):
        """Return args as XML attribute string with a leading space.

        Attributes whose value is None are left out so that missing CTD
        attributes are not written as the string "None".  Returns '' if
        there is nothing to write.
        """
        if not args:
            return ''
        quote = xml.sax.saxutils.quoteattr
        parts = ['%s=%s' % (key, quote(str(value)))
                 for key, value in args.items() if value is not None]
        if not parts:
            return ''
        return ' ' + ' '.join(parts)

    def appendTag(self, tag, text='', args=None):
        """Append a tag to self.result with text content only or no content at all.

        :param tag: name of the tag
        :param text: text content; surrounding whitespace is stripped and
                     XML special characters are escaped
        :param args: dict with the attributes of the tag, or None
        """
        content = (text or '').strip()
        vals = {'indent': self.indent(),
                'tag': tag,
                'text': xml.sax.saxutils.escape(content),
                'args': self._formatArgs(args)}
        if content:
            self.result.append('%(indent)s<%(tag)s%(args)s>%(text)s</%(tag)s>\n' % vals)
        else:
            self.result.append('%(indent)s<%(tag)s%(args)s />\n' % vals)

    def openTag(self, tag, args=None):
        """Append an opening tag to self.result.

        :param tag: name of the tag
        :param args: dict with the attributes of the tag, or None
        """
        vals = {'indent': self.indent(),
                'tag': tag,
                'args': self._formatArgs(args)}
        self.result.append('%(indent)s<%(tag)s%(args)s>\n' % vals)

    def closeTag(self, tag):
        """Append a closing tag to self.result."""
        vals = {'indent': self.indent(), 'tag': tag}
        self.result.append('%(indent)s</%(tag)s>\n' % vals)

    def handleParameters(self, node):
        """Recursion for appending tags for ParametersNode."""
        for pn in node.children.values():
            if pn.kind == 'item':
                args = {'name': pn.name,
                        'value': pn.value,
                        'type': pn.type_,
                        'description': pn.description,
                        'restrictions': pn.restrictions,
                        'tags': pn.tags,
                        # Written because the CTD parser reads it as well.
                        'supported_formats': pn.supported_formats}
                self.appendTag('ITEM', args=args)
            else:  # pn.kind == 'node'
                args = {'name': pn.name, 'description': pn.description}
                self.openTag('NODE', args=args)
                self.indent_level += 1
                self.handleParameters(pn)
                self.indent_level -= 1
                self.closeTag('NODE')


class CTDWriter(XMLWriter):
    """Write a Tool to CTD format."""

    def run(self, tool, f):
        """Write the given Tool to file f.

        :param tool: the Tool to write
        :param f: object with a write() method accepting strings
        """
        self.result.append('<?xml version="1.0" encoding="UTF-8"?>\n')
        self.openTag('tool')
        self.indent_level += 1
        self.appendTag('name', tool.name)
        self.appendTag('executableName', tool.executable_name)
        self.appendTag('version', tool.version)
        self.appendTag('description', tool.description)
        self.appendTag('manual', tool.manual)
        self.appendTag('docurl', tool.doc_url)
        self.appendTag('category', tool.category)
        # <cli> and <clielement> group
        self.openTag('cli')
        self.indent_level += 1
        for ce in tool.cli_elements:
            self.openTag('clielement',
                         args={'optionIdentifier': ce.option_identifier,
                               'isList': 'true' if ce.is_list else 'false'})
            self.indent_level += 1
            self.appendTag('mapping', args={'referenceName': ce.mapping_path})
            self.indent_level -= 1
            self.closeTag('clielement')
        self.indent_level -= 1
        self.closeTag('cli')
        # <PARAMETERS>, <NODE>, <ITEM> group
        self.openTag('PARAMETERS', args={
            'version': 1.4,
            'xsi:noNamespaceSchemaLocation': 'http://open-ms.sourceforge.net/schemas/Param_1_4.xsd',
            'xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance'})
        self.indent_level += 1
        self.handleParameters(tool.parameters)
        self.indent_level -= 1
        self.closeTag('PARAMETERS')
        self.indent_level -= 1
        self.closeTag('tool')
        # Write result
        for x in self.result:
            f.write(x)


class GalaxyWriter(XMLWriter):
    """Write a Tool to the Galaxy format."""

    def run(self, tool, f):
        """Write the given Tool to file f.

        :param tool: the Tool to write
        :param f: object with a write() method accepting strings
        """
        self.result.append('<?xml version="1.0" encoding="UTF-8"?>\n')
        self.openTag('tool', {'id': tool.executable_name, 'name': tool.name})
        self.indent_level += 1
        self.addCommandTag(tool)
        self.appendTag('description', text=tool.description)
        self.openTag('inputs')
        self.indent_level += 1
        tool.parameters.applyFunc(self.addInputParam)
        self.indent_level -= 1
        self.closeTag('inputs')
        self.openTag('outputs')
        self.indent_level += 1
        tool.parameters.applyFunc(self.addOutputParam)
        self.indent_level -= 1
        self.closeTag('outputs')
        self.openTag('stdio')
        self.indent_level += 1
        self.appendTag('exit_code', args={'range': '1:', 'level': 'fatal'})
        self.appendTag('exit_code', args={'range': ':-1', 'level': 'fatal'})
        self.indent_level -= 1
        self.closeTag('stdio')
        self.indent_level -= 1
        self.closeTag('tool')
        # Write result
        for x in self.result:
            f.write(x)

    def addInputParam(self, param_node):
        """Add a ParametersNode object if it is to go to <inputs>.

        Nodes that are not items and items tagged 'output file' are skipped.

        :raises KeyError: if the item's type is not string, double or int
        """
        tags = _split_tags(param_node)
        if 'output file' in tags:
            return  # Skip output files
        if param_node.kind != 'item':
            return  # Skip if not item.
        args = {}
        name = _galaxy_name('_'.join(param_node.path))
        if 'input file' in tags:
            # A missing supported_formats attribute yields an empty format.
            formats = (param_node.supported_formats or '').split(',')
            args['type'] = 'data'
            args['format'] = ','.join(
                [x.replace('*', '').replace('.', '') for x in formats])
            args['name'] = name
            args['label'] = param_node.description
            self.appendTag('param', args=args)
            return
        TYPE_MAP = {
            'string': 'text',
            'double': 'float',
            'int': 'integer'
        }
        args['type'] = TYPE_MAP[param_node.type_]
        args['name'] = name
        args['label'] = param_node.description
        if _is_boolean_param(param_node):
            args['type'] = 'boolean'
            if param_node.value == 'true':
                args['checked'] = 'true'
            # The flag itself is the true value; items that are not mapped
            # to any CLI element have no flag to emit.
            ce = param_node.cli_element
            args['truevalue'] = ce.option_identifier if ce is not None else ''
            args['falsevalue'] = ''
            self.appendTag('param', args=args)
            return
        args['value'] = param_node.value
        if param_node.type_ == 'string' and param_node.restrictions:
            args['type'] = 'select'
            self.openTag('param', args=args)
            self.indent_level += 1
            for v in param_node.restrictions.split(','):
                self.appendTag('option', v, {'value': v})
            self.indent_level -= 1
            self.closeTag('param')
        else:
            self.appendTag('param', args=args)

    def addOutputParam(self, param_node):
        """Add a ParametersNode object if it is to go to <outputs>."""
        if 'output file' not in _split_tags(param_node):
            return  # Only add for output files.
        args = {}
        # A missing supported_formats attribute yields an empty format.
        formats = param_node.supported_formats or ''
        first = formats.split(',')[0]
        if '.' in formats:
            args['format'] = first.split('.')[-1]
        else:
            args['format'] = first.split('*')[-1]
        args['name'] = _galaxy_name('_'.join(param_node.path))
        args['label'] = param_node.description
        self.appendTag('data', args=args)

    def addCommandTag(self, tool):
        """Write <command> tag to self.result."""
        lst = []
        for ce in tool.cli_elements:
            # param_node is None for arguments (see Tool.parsingDone()), such
            # elements are never boolean.
            bool_param = _is_boolean_param(ce.param_node)
            # Boolean parameters expand to their flag through truevalue, so
            # the flag must not be written a second time here.
            if not bool_param and ce.option_identifier:
                lst.append(ce.option_identifier)
            # The path mapping is not ideal but should work OK.
            lst.append('$' + _galaxy_name(ce.mapping_path))
        txt = [tool.executable_name] + lst
        self.appendTag('command', text=' '.join(txt))


def main():
    """Main function.

    Reads the CTD file given with --in-file and writes it to --out-file, as
    CTD if the output name ends in '.ctd' and as Galaxy XML otherwise.

    :returns: 0, to be used as the exit code
    """
    # Setup argument parser.
    parser = argparse.ArgumentParser(description='Convert CTD to Galaxy XML')
    parser.add_argument('-i', '--in-file', metavar='FILE',
                        help='CTD file to read.', dest='in_file',
                        required=True)
    parser.add_argument('-o', '--out-file', metavar='FILE',
                        help='File to write. Output type depends on extension.',
                        dest='out_file', required=True)
    args = parser.parse_args()

    # Parse input.
    sys.stderr.write('Parsing %s...\n' % args.in_file)
    ctd_parser = CTDParser()
    tool = ctd_parser.parse(args.in_file)

    # Write output.
    sys.stderr.write('Writing to %s...\n' % args.out_file)
    if args.out_file.endswith('.ctd'):
        writer = CTDWriter()
    else:
        writer = GalaxyWriter()
    # Text mode: the writers produce strings, not bytes.
    with open(args.out_file, 'w') as f:
        writer.run(tool, f)

    return 0


if __name__ == '__main__':
    sys.exit(main())
