Mercurial > repos > holtgrewe > ngs_roi
diff ctd2galaxy.py @ 0:61d9bdb6d519 draft
Uploaded
| author | holtgrewe |
|---|---|
| date | Thu, 18 Apr 2013 08:03:38 -0400 |
| parents | |
| children | 170e48a55078 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/ctd2galaxy.py Thu Apr 18 08:03:38 2013 -0400 @@ -0,0 +1,527 @@ +#!/usr/bin/env python +"""Conversion of the CTD format into Galaxy XML. + +The CTD parser should be reusable but is not in its own module since it is +only used here at the moment. +""" + +import argparse +import operator +import sys +import xml.sax +import xml.sax.saxutils + +class CTDFormatException(Exception): + """Raised when there is a format error in CTD.""" + + +class CLIElement(object): + """Represents a <clielement> tag. + + :ivar option_identifier: with parameters (e.g. --param), empty if argument. + :type option_identifier: str + :ivar is_list: whether the element is a list. + :type is_list: bool + :ivar param_node: link to ParametersNode, set after parsing, None if unset + :ivar is_list: w or not this element is a list. + :type is_list: bool + """ + + def __init__(self, option_identifier='', mapping_path='', is_list=False): + """Initialize object.""" + self.option_identifier = option_identifier + self.param_node = None # Link to ParametersNode, set after parsing. + self.mapping_path = mapping_path + self.is_list = is_list + + def __str__(self): + """String representaiton of CLIElement.""" + t = (self.option_identifier, self.mapping_path, self.is_list) + return 'CLIElement(%s, %s, %s)' % tuple(map(repr, list(t))) + + +class ParametersNode(object): + """Represents a <NODE> tag inside the <PARAMETERS> tags. + + :ivar name: name attribute of the node + :ivar description: text for description attribute of the node + :ivar value: value attribute of the node + :ivar type_: type attribute of the node + :ivar tags: tags attribute of the node + :ivar supported_formats: supported_format attribute of the node + :ivar restrictions: restrictions attribute of the node + :ivar path: the path to the node + :ivar path: list of strings + :ivar parent: link to the parent of the node + :ivar children: children of the node + :type children: dict with name to node mapping + :ivar cli_element: CLIElement that this parameter is mapped to. + """ + + def __init__(self, kind='', name='', description='', value='', type_='', tags='', + restrictions='', supported_formats=''): + """Initialize the object.""" + self.kind = kind + self.name = name + self.description = description + self.value = value + self.type_ = type_ + self.tags = tags + self.supported_formats = supported_formats + self.restrictions = restrictions + self.path = None # root if is None + self.parent = None # not set, usually a list + self.children = {} + self.cli_element = None + + def computePath(self, is_root=True, path=[]): + """Compute path entry from parent links. + + :param is_root: whether or not this is the root node + :type is_root: bool + :param path: path to this node, excluding root + :type path: list of strings + """ + self.path = list(path) + if not is_root: + self.path.append(self.name) + if not self.children: + return # nothing to do: early exit. + for name, child in self.children.items(): + child.computePath(False, self.path) + + def applyFunc(self, f): + """Apply f to self and all children.""" + f(self) + for c in self.children.values(): + c.applyFunc(f) + + def find(self, path): + """Return ParametersNode object at the path below the node.""" + if not path: + return self + if not self.children.get(path[0]): + return None + return self.children[path[0]].find(path[1:]) + + def __str__(self): + """Return string representation.""" + t = (self.name, self.description, self.value, self.type_, self.tags, + self.supported_formats, self.children, self.path) + return 'ParametersNode(%s, %s, %s, %s, %s, %s, %s, path=%s)' % tuple(map(repr, t)) + + def __repr__(self): + """Return programmatic representation, same as __str__().""" + return str(self) + + +class Tool(object): + """Represents the top-level <tool> tag from a CTD file. + + :ivar name: name attribute value + :type name: str + :ivar executable_name: executableName attribute value + :type executable_name: str + :ivar version: version attribute value + :type version: str + :ivar description: description attribute value + :type description: str + :ivar manual: manual attribute value + :type manual: str + :ivar doc_url: docurl attribute value + :type doc_url: str + :ivar category: category attribute value + :type category: str + :ivar cli_elements: list of CLIElement objects + :ivar parameters: root parameters node + :type parameters: ParametersNode + """ + + def __init__(self, name='', executable_name='', version='', + description='', manual='', doc_url='', + category=''): + self.name = name + self.executable_name = executable_name + self.version = version + self.description = description + self.manual = manual + self.doc_url = doc_url + self.category = category + self.cli_elements = [] + self.parameters = None + + def parsingDone(self): + """Called after parsing is done. + + The method will compute the paths of the parameter nodes and link the + CLIElement objects in self.cli_elements to the ParameterNode objects. + """ + self.parameters.computePath() + for ce in self.cli_elements: + if not ce.option_identifier: + continue # Skip arguments + path = ce.mapping_path.split('.') + node = self.parameters.find(path) + if not node: + raise CTDFormatException('Unknown parameter %s' % '.'.join(path)) + ce.param_node = node + node.cli_element = ce + + def __str__(self): + t = (self.name, self.executable_name, self.version, self.description, + self.manual, self.doc_url, self.category) + return 'Tool(%s, %s, %s, %s, %s, %s, %s)' % tuple(map(repr, list(t))) + + + +class CTDHandler(xml.sax.handler.ContentHandler): + def __init__(self): + self.result = None + # A stack of tag names that are currently open. + self.stack = [] + # The current parameter to append nodes below. + self.parameter_node = None + + def startElement(self, name, attrs): + """Handle start of element.""" + # Maintain a stack of open tags. + self.stack.append(name) + # Handle the individual cases. The innermost tag is self.stack[-1]. + if self.stack == ['tool']: + # Create the top level Tool object. + self.tool = Tool() + self.result = self.tool + elif self.stack == ['tool', 'cli', 'clielement']: + # Create a new CLIElement object for a <clieelement> tag. + if not attrs.get('isList'): + raise CTDFormatException('No attribute isList in <clielement>.') + if attrs.get('optionIdentifier') is None: + raise CTDFormatException('no attribute optionIdentifier in <clielement>.') + is_list = (attrs.get('isList') == 'false') + option_identifier = attrs.get('optionIdentifier') + self.tool.cli_elements.append(CLIElement(option_identifier=option_identifier, is_list=is_list)) + elif self.stack == ['tool', 'cli', 'clielement', 'mapping']: + # Handle a <mapping> sub entry of a <clieelement> tag. + if not attrs.get('referenceName'): + raise CTDFormatException('no attribute referenceName in <mapping>') + self.tool.cli_elements[-1].mapping_path = attrs['referenceName'] + elif self.stack == ['tool', 'PARAMETERS']: + # Handle the <PARAMETERS> entry by creating a new top parameters node. + self.tool.parameters = ParametersNode(kind='node', name='<root>') + self.parameter_node = self.tool.parameters + elif self.stack[:2] == ['tool', 'PARAMETERS'] and self.stack[-1] == 'NODE': + # Create a new node ParametersNode for the <PARAMETERS> entry. + if not attrs.get('name'): + raise CTDFormatException('no attribute name in <NODE>') + name = attrs.get('name') + node = ParametersNode(kind='node', name=name) + node.parent = self.parameter_node + self.parameter_node.children[name] = node + self.parameter_node = node + elif self.stack[:2] == ['tool', 'PARAMETERS'] and self.stack[-1] == 'ITEM': + # Create a new item ParametersNode for the <ITEM> entry. + if not attrs.get('name'): + raise CTDFormatException('no attribute name in <ITEM>') + name = attrs.get('name') + value = attrs.get('value') + type_ = attrs.get('type') + tags = attrs.get('tags') + description = attrs.get('description') + restrictions = attrs.get('restrictions') + supported_formats = attrs.get('supported_formats') + child = ParametersNode( + kind='item', name=name, description=description, value=value, + type_=type_, tags=tags, supported_formats=supported_formats, + restrictions=restrictions) + self.parameter_node.children[name] = child + + def endElement(self, name): + """Handle closing tag.""" + # Maintain stack. + self.stack.pop() + # Go up one node in the parameters tree if </NODE> + if name == 'NODE': + self.parameter_node = self.parameter_node.parent + + def characters(self, content): + """Handle characters in XML file.""" + if self.stack == ['tool', 'name']: + self.tool.name += content + elif self.stack == ['tool', 'executableName']: + self.tool.executable_name += content + elif self.stack == ['tool', 'version']: + self.tool.version += content + elif self.stack == ['tool', 'description']: + self.tool.description += content + elif self.stack == ['tool', 'manual']: + self.tool.manual += content + elif self.stack == ['tool', 'docurl']: + self.tool.doc_url += content + elif self.stack == ['tool', 'category']: + self.tool.category += content + + +class CTDParser(object): + """Parser for CTD files.""" + + def __init__(self): + self.handler = CTDHandler() + + def parse(self, path): + # Parse XML into Tool object. + parser = xml.sax.make_parser() + parser.setContentHandler(self.handler) + parser.parse(path) + # Compute paths for tool's parameters. + self.handler.result.parsingDone() + return self.handler.result + + +class XMLWriter(object): + """Base class for XML writers. + + + :ivar result: list of strings that are joined for the final XML + :ivar indent_level: int with the indentation level + """ + + def __init__(self): + self.result = [] + self.indent_level = 0 + + def indent(self): + """Return indentation whitespace.""" + return ' ' * self.indent_level + + def appendTag(self, tag, text='', args={}): + """Append a tag to self.result with text content only or no content at all.""" + e = xml.sax.saxutils.quoteattr + args_str = ' '.join('%s=%s' % (key, e(str(value))) for key, value in args.items()) + if args_str: + args_str = ' '+ args_str + vals = {'indent': self.indent(), + 'tag': tag, + 'text': text.strip(), + 'args': args_str} + if text: + self.result.append('%(indent)s<%(tag)s%(args)s>%(text)s</%(tag)s>\n' % vals) + else: + self.result.append('%(indent)s<%(tag)s%(args)s />\n' % vals) + + def openTag(self, tag, args={}): + """Append an opening tag to self.result.""" + e = xml.sax.saxutils.quoteattr + args_str = ' '.join('%s=%s' % (key, e(str(value))) for key, value in args.items()) + if args_str: + args_str = ' ' + args_str + vals = {'indent': self.indent(), + 'tag': tag, + 'args': args_str} + self.result.append('%(indent)s<%(tag)s%(args)s>\n' % vals) + + def closeTag(self, tag): + """Append a closing tag to self.result.""" + vals = {'indent': self.indent(), 'tag': tag} + self.result.append('%(indent)s</%(tag)s>\n' % vals) + + def handleParameters(self, node): + """Recursion for appending tags for ParametersNode.""" + for pn in node.children.values(): + if pn.kind == 'item': + args = {'name': pn.name, + 'value': pn.value, + 'type': pn.type_, + 'description': pn.description, + 'restrictions': pn.restrictions, + 'tags': pn.tags} + self.appendTag('ITEM', args=args) + else: # node.kind == 'node' + args = {'name': pn.name, + 'description': pn.description} + self.openTag('NODE', args=args) + self.indent_level += 1 + self.handleParameters(pn) + self.indent_level -= 1 + self.closeTag('NODE') + + +class CTDWriter(XMLWriter): + """Write a Tool to CTD format.""" + + def run(self, tool, f): + """Write the given Tool to file f.""" + self.result.append('<?xml version="1.0" encoding="UTF-8"?>\n') + self.openTag('tool') + self.indent_level += 1 + self.appendTag('name', tool.name) + self.appendTag('executableName', tool.executable_name) + self.appendTag('version', tool.version) + self.appendTag('description', tool.description) + self.appendTag('manual', tool.manual) + self.appendTag('docurl', tool.doc_url) + self.appendTag('category', tool.category) + # <cli> and <clielement> group + self.openTag('cli') + self.indent_level += 1 + for ce in tool.cli_elements: + self.openTag('clielement', args={'optionIdentifier': ce.option_identifier, + 'isList': {True: 'true', False: 'false'}[ce.is_list]}) + self.indent_level += 1 + self.appendTag('mapping', args={'referenceName': ce.mapping_path}) + self.indent_level -= 1 + self.closeTag('clielement') + self.indent_level -= 1 + self.closeTag('cli') + # <PARAMETERS>, <NODE>, <ITEM> group + self.openTag('PARAMETERS', args={'version': 1.4, + 'xsi:noNamespaceSchemaLocation': 'http://open-ms.sourceforge.net/schemas/Param_1_4.xsd', + 'xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance'}) + self.indent_level += 1 + self.handleParameters(tool.parameters) + self.indent_level -= 1 + self.closeTag('PARAMETERS') + self.indent_level -= 1 + self.closeTag('tool') + # Write result + for x in self.result: + f.write(x) + + +class GalaxyWriter(XMLWriter): + """Write a Tool to the Galaxy format.""" + + def run(self, tool, f): + """Write the given Tool to file f.""" + self.result.append('<?xml version="1.0" encoding="UTF-8"?>\n') + self.openTag('tool', {'id': tool.executable_name, 'name': tool.name}) + self.indent_level += 1 + self.addCommandTag(tool) + self.appendTag('description', text=tool.description) + self.openTag('inputs') + self.indent_level += 1 + tool.parameters.applyFunc(lambda x: self.addInputParam(x)) + self.indent_level -= 1 + self.closeTag('inputs') + self.openTag('outputs') + self.indent_level += 1 + tool.parameters.applyFunc(lambda x: self.addOutputParam(x)) + self.indent_level -= 1 + self.closeTag('outputs') + self.openTag('stdio') + self.indent_level += 1 + self.appendTag('exit_code', args={'range': '1:', 'level': 'fatal'}) + self.appendTag('exit_code', args={'range': ':-1', 'level': 'fatal'}) + self.indent_level -= 1 + self.closeTag('stdio') + self.indent_level -= 1 + self.closeTag('tool') + # Write result + for x in self.result: + f.write(x) + + def addInputParam(self, param_node): + """Add a ParametersNode object if it is to go to <inputs>.""" + if param_node.tags and 'output file' in param_node.tags.split(','): + return # Skip output files + if param_node.kind != 'item': + return # Skip if not item. + args = {} + if param_node.tags and 'input file' in param_node.tags.split(','): + args['type'] = 'data' + args['format'] = ','.join([x.replace('*', '').replace('.', '') + for x in param_node.supported_formats.split(',')]) + args['name'] = '_'.join(param_node.path).replace('-', '_').replace('.', '_') + args['label'] = param_node.description + args['type'] = 'data' + self.appendTag('param', args=args) + else: + TYPE_MAP = { + 'string': 'text', + 'double': 'float', + 'int': 'integer' + } + args['type'] = TYPE_MAP[param_node.type_] + args['name'] = '_'.join(param_node.path).replace('-', '_').replace('.', '_') + args['label'] = param_node.description + if param_node.type_ == 'string' and param_node.restrictions and \ + sorted(param_node.restrictions.split(',')) == ['false', 'true']: + args['type'] = 'boolean' + if param_node.value == 'true': + args['checked'] = 'true' + args['truevalue'] = param_node.cli_element.option_identifier + args['falsevalue'] = '' + self.appendTag('param', args=args) + return + args['value'] = param_node.value + if param_node.type_ == 'string' and param_node.restrictions: + args['type'] = 'select' + self.openTag('param', args=args) + self.indent_level += 1 + for v in param_node.restrictions.split(','): + self.appendTag('option', v, {'value': v}) + self.indent_level -= 1 + self.closeTag('param') + else: + self.appendTag('param', args=args) + + def addOutputParam(self, param_node): + """Add a ParametersNode object if it is to go to <inputs>.""" + if not param_node.tags or not 'output file' in param_node.tags.split(','): + return # Only add for output files. + args = {} + if '.' in param_node.supported_formats: + args['format'] = param_node.supported_formats.split(',')[0].split('.')[-1] + else: + args['format'] = param_node.supported_formats.split(',')[0].split('*')[-1] + args['name'] = '_'.join(param_node.path).replace('-', '_').replace('.', '_') + args['label'] = param_node.description + self.appendTag('data', args=args) + + def addCommandTag(self, tool): + """Write <command> tag to self.result.""" + lst = [] + for ce in tool.cli_elements: + bool_param = False + if ce.param_node.type_ == 'string' and ce.param_node.restrictions and \ + sorted(ce.param_node.restrictions.split(',')) == ['false', 'true']: + bool_param = True + if not bool_param and ce.option_identifier: + lst.append(ce.option_identifier) + # The path mapping is not ideal but should work OK. + lst.append('$' + ce.mapping_path.replace('-', '_').replace('.', '_')) + txt = [tool.executable_name] + lst + self.appendTag('command', text=' '.join(txt)) + + +def main(): + """Main function.""" + # Setup argument parser. + parser = argparse.ArgumentParser(description='Convert CTD to Galaxy XML') + parser.add_argument('-i', '--in-file', metavar='FILE', + help='CTD file to read.', dest='in_file', + required=True) + parser.add_argument('-o', '--out-file', metavar='FILE', + help='File to write. Output type depends on extension.', + dest='out_file', required=True) + + args = parser.parse_args() + + # Parse input. + sys.stderr.write('Parsing %s...\n' % args.in_file) + ctd_parser = CTDParser() + tool = ctd_parser.parse(args.in_file) + + # Write output. + sys.stderr.write('Writing to %s...\n' % args.out_file) + if args.out_file.endswith('.ctd'): + writer = CTDWriter() + else: + writer = GalaxyWriter() + with open(args.out_file, 'wb') as f: + writer.run(tool, f) + + return 0 + + +if __name__ == '__main__': + sys.exit(main())
