# Source code for jube2.util.yaml_converter

# JUBE Benchmarking Environment
# Copyright (C) 2008-2024
# Forschungszentrum Juelich GmbH, Juelich Supercomputing Centre
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""YAML to XML converter"""

# NOTE(review): the tail of this __future__ import was truncated in the
# extracted source; "unicode_literals" and "division" are restored on the
# assumption that this module uses the same header as the rest of the
# package -- confirm against the repository.
from __future__ import (print_function,
                        unicode_literals,
                        division)

import xml.etree.ElementTree as etree
import xml.dom.minidom as DOM
# Both YAML back ends are optional at import time. The converter class
# catches the NameError that results from using a module that could not be
# imported, so a failed import is deliberately ignored here.
try:
    import ruamel.yaml
except ImportError:
    pass
try:
    import yaml
except ImportError:
    pass
import jube2.log
import jube2.conf
import jube2.util.output
import os
import copy
import jube2.util.util
# Python 2 provides StringIO.StringIO; fall back to io.BytesIO otherwise.
try:
    from StringIO import StringIO as IOStream
except ImportError:
    from io import BytesIO as IOStream

LOGGER = jube2.log.get_logger(__name__)

class YAML_Converter(object):
    """YAML to XML converter

    Loads a YAML input file, resolves ``!include`` tags and renders the
    content as an XML tree with the root element ``<jube>``. The resulting
    document is written to an in-memory buffer which can be accessed with
    :meth:`read` and released with :meth:`close`.
    """

    # Maps a tag name to the keys which are rendered as XML sub-elements
    # of that tag; every other key is rendered as an XML attribute.
    # "/" lists the keys handled on the top level when no "benchmark" key
    # is present, "/benchmark" the ones handled when it is present.
    allowed_tags = {
        "/": ["benchmark", "parameterset", "comment", "step", "fileset",
              "substituteset", "analyser", "result", "patternset",
              "selection", "include-path", "check_tags"],
        "/benchmark": ["benchmark", "parameterset", "fileset",
                       "substituteset", "patternset", "selection",
                       "include-path", "check_tags"],
        "benchmark": ["parameterset", "comment", "step", "fileset",
                      "substituteset", "analyser", "result", "patternset"],
        "analyse": ["file"],
        "analyser": ["use", "analyse"],
        "fileset": ["link", "copy", "prepare"],
        "include-path": ["path"],
        "parameterset": ["parameter"],
        "patternset": ["pattern"],
        "result": ["use", "table", "syslog", "database"],
        "selection": ["not", "only", "tag"],
        "step": ["use", "do"],
        "substituteset": ["iofile", "sub"],
        "syslog": ["key"],
        "table": ["column"],
        "database": ["key"],
    }

    def __init__(self, path, include_path=None, tags=None):
        """Scan the YAML file *path* for tags and include-pathes and
        convert it to XML.

        path: name of the YAML file to convert
        include_path: optional list of directories searched for
            ``!include`` files (the directory of *path* is always appended)
        tags: optional iterable of tags which are already active

        Raises NameError if the yaml module could not be imported.
        """
        self._path = path
        if include_path is None:
            include_path = []
        if tags is None:
            tags = set()
        self._include_path = list(include_path)
        self._include_path += [os.path.dirname(self._path)]
        self._tags = set(tags)
        try:
            yaml.add_constructor("!include", self.__yaml_include)
        except NameError:
            # NOTE(review): the URL inside this message was stripped from
            # the extracted source; the PyYAML homepage is assumed here.
            raise NameError("yaml module not available; either install it " +
                            "(https://pyyaml.org), or switch to .xml input " +
                            "files.")
        # While scanning, unresolved includes are kept as placeholder
        # strings instead of raising an error
        self._ignore_search_errors = True
        self._tags.update(self.__search_for_tags())
        old_tags = set(self._tags)
        changed = True
        counter = 0
        # It is possible to add new tags by including external files into a
        # selection block therefore the input must be scanned multiple times
        # to gather all available tags
        while changed and counter < jube2.conf.PREPROCESS_MAX_ITERATION:
            self._include_path = list(include_path) + \
                self.__search_for_include_pathes() + \
                [os.path.dirname(self._path)]
            self._tags.update(self.__search_for_tags())
            changed = bool(self._tags.difference(old_tags))
            old_tags = set(self._tags)
            counter += 1
        self._ignore_search_errors = False
        self._int_file = IOStream()
        self.__convert()

    def __load_data(self):
        """Load the YAML input file and return the parsed data"""
        # NOTE(security): yaml.Loader can construct arbitrary Python
        # objects; only use this converter on trusted input files.
        with open(self._path, "r") as file_handle:
            return yaml.load(file_handle.read(), Loader=yaml.Loader)

    def __convert(self):
        """Convert the YAML file to XML and store the result in the
        internal buffer"""
        # Check the validity of the yaml file
        with open(self._path, "r") as file_handle:
            try:
                ruamel.yaml.YAML().load(file_handle)
            except NameError:
                # ruamel.yaml is not available, skip the extra check
                pass
            except ruamel.yaml.constructor.DuplicateKeyError as error:
                # Clear the note attribute before the error is passed on
                # (presumably to shorten the printed message -- confirm)
                error.note = ""
                raise

        LOGGER.debug(" Start YAML to XML file conversion for file {0}".format(
            self._path))

        # Read the yaml file and create an xml tree
        xmltree = etree.Element('jube')
        data = self.__load_data()
        YAML_Converter.create_headtags(data, xmltree, self._include_path)
        xml = jube2.util.output.element_tree_tostring(
            xmltree, encoding="UTF-8")
        self._int_file.write(xml.encode('UTF-8'))
        LOGGER.debug(" YAML Conversion finalized")

    def read(self):
        """Read data of converted file"""
        return self._int_file.getvalue()

    def close(self):
        """Close converted file"""
        self._int_file.close()

    def __find_include_file(self, filename):
        """Search for filename in include-pathes and return resulting path

        Raises ValueError if the file exists in none of the include-pathes.
        """
        for directory in self._include_path:
            candidate = os.path.join(directory, filename)
            if os.path.exists(candidate):
                return candidate
        raise ValueError(("\"{0}\" not found in possible " +
                          "include pathes").format(filename))

    def __search_for_tags(self):
        """Search a YAML file for stored tag information"""
        tags = set()
        data = self.__load_data()
        # An empty file or a file with a list on the top level does not
        # contain a selection block
        if not isinstance(data, dict):
            return tags
        selection = data.get("selection")
        if isinstance(selection, dict) and "tag" in selection:
            if not isinstance(selection["tag"], list):
                selection["tag"] = [selection["tag"]]
            for tag in selection["tag"]:
                # Placeholders of unresolved includes are no tags
                if not tag.startswith("!include "):
                    tags.update(tag.split(jube2.conf.DEFAULT_SEPARATOR))
        return tags

    def __search_for_include_pathes(self):
        """Search a YAML file for stored include-path information"""
        include_pathes = []
        data = self.__load_data()
        # include-path is only allowed on the top level of the tree
        if isinstance(data, dict) and "include-path" in data:
            if not isinstance(data["include-path"], list):
                data["include-path"] = [data["include-path"]]
            base_dir = os.path.dirname(self._path)
            for val in self.__search_for_pathes(data["include-path"]):
                include_pathes.append(os.path.join(base_dir, val))
        return include_pathes

    def __search_for_pathes(self, data):
        """Search in given data for stored path informations

        data: list of path entries; an entry is a plain value, a nested
            list, or a dict which stores its value in "path" or "_" and
            optionally restricts it with "tag"

        Returns a flat list of all path values.
        """
        paths = []
        for path in data:
            if isinstance(path, dict):
                if "tag" in path and not jube2.util.util.valid_tags(
                        path["tag"], self._tags):
                    # Skip only this entry; the other entries stay valid.
                    # (Returning None here made the callers fail.)
                    continue
                value = path["path"] if "path" in path else path["_"]
                if not isinstance(value, list):
                    value = [value]
                paths.extend(self.__search_for_pathes(value))
            elif isinstance(path, list):
                paths.extend(self.__search_for_pathes(path))
            else:
                paths.append(path)
        return paths

    def __yaml_include(self, loader, node):
        """Constructor for the include tag

        The node value has the form "filename[:selector[:expression]]".
        If the file cannot be found, the placeholder string
        "!include <value>" is returned while scanning and a ValueError is
        raised otherwise.
        """
        yaml_node_data = node.value.split(":")
        try:
            filename = self.__find_include_file(yaml_node_data[0])
            if os.path.normpath(filename) == os.path.normpath(self._path):
                # Avoid recursive !include loops
                loader = yaml.BaseLoader
            else:
                loader = yaml.Loader
            with open(filename) as inputfile:
                try:
                    # The name "_" must be kept: the expressions evaluated
                    # below refer to the loaded data by this name
                    _ = yaml.load(inputfile.read(), Loader=loader)
                except yaml.parser.ParserError:
                    LOGGER.error(("Including data from \"{0}\" into \"{1}\" " +
                                  "raised an error.").format(filename,
                                                             self._path))
                    raise
            # NOTE(security): eval executes text taken from the input file;
            # only use this converter on trusted input files.
            if len(yaml_node_data) > 1:
                _ = eval("_" + yaml_node_data[1])
            if len(yaml_node_data) > 2:
                _ = eval(yaml_node_data[2])
            return _
        except ValueError:
            if self._ignore_search_errors:
                return "!include {0}".format(node.value)
            raise

    @staticmethod
    def create_headtags(data, parent_node, include_pathes):
        """Search for the headtags in given dictionary

        data: parsed YAML data; anything but a dict is used as the
            content of a single "benchmark" entry
        parent_node: XML element which receives the created tags
        include_pathes: list which replaces a stored "include-path" entry
        """
        if not isinstance(data, dict):
            data = {'benchmark': data}

        to_delete = list()
        # Iterate over a copy of the keys, values are replaced in the loop
        for tag in list(data):
            # Override include-path with parsed include-path
            if tag == "include-path":
                data[tag] = include_pathes
            if not isinstance(data[tag], list):
                data[tag] = [data[tag]]
            # benchmark is optional on the top level, but if it is used only
            # a limited number of options are allowed on top level
            # (listed in "/benchmark")
            if "benchmark" in data and \
                    tag in YAML_Converter.allowed_tags["/benchmark"]:
                for attr_and_tags in data[tag]:
                    YAML_Converter.create_tag(tag, attr_and_tags, parent_node)
            elif "benchmark" not in data and \
                    tag in YAML_Converter.allowed_tags["/"]:
                if tag not in YAML_Converter.allowed_tags["benchmark"]:
                    for attr_and_tags in data[tag]:
                        YAML_Converter.create_tag(
                            tag, attr_and_tags, parent_node)
                    # Handled on the top level, must not be repeated
                    # inside of the implicit benchmark
                    to_delete.append(tag)
        for tag in to_delete:
            del data[tag]
        # Without a benchmark key all remaining data forms the benchmark
        if "benchmark" not in data:
            YAML_Converter.create_tag("benchmark", data, parent_node)

    @staticmethod
    def create_tag(new_node_name, data, parent_node):
        """Create the Subtag name, search for known tags and set the given
        attributes

        new_node_name: name of the new XML tag
        data: dict of attributes and subtags, or the plain tag value
        parent_node: XML element which receives the new tag

        A list used as tag value creates one tag per list entry. The list
        itself is left unchanged.
        """
        LOGGER.debug(" Create XML tag <{0}>".format(new_node_name))
        new_node = etree.SubElement(parent_node, new_node_name)

        # Check if tag can have subtags
        if new_node_name in YAML_Converter.allowed_tags and \
                isinstance(data, dict):
            allowed_tags = YAML_Converter.allowed_tags[new_node_name]
            for key, value in data.items():
                if not isinstance(value, list):
                    value = [value]
                for val in value:
                    if key in allowed_tags:
                        # Create new subtag
                        YAML_Converter.create_tag(key, val, new_node)
                    else:
                        # Create attribute
                        new_node.set(key, str(val) if val is not None else "")
            return

        tag_value = ""
        if not isinstance(data, dict):
            # standard tag value
            tag_value = data if data is not None else ""
        else:
            for key, value in data.items():
                if key == "_":
                    # _ represents the standard tag value
                    tag_value = value if value is not None else ""
                else:
                    # Create attribute
                    new_node.set(key, str(value) if value is not None else "")

        if isinstance(tag_value, list):
            # Work on a copy: the same list object can be referenced more
            # than once (YAML anchors). An empty list creates an empty tag.
            values = list(tag_value) or [""]
            new_node.text = str(values[0])
            for extra_value in values[1:]:
                # Every further entry gets its own tag with the same
                # attributes
                new_node = copy.deepcopy(new_node)
                parent_node.append(new_node)
                new_node.text = str(extra_value)
        else:
            new_node.text = str(tag_value)

    @staticmethod
    def is_parseable_yaml_file(filename):
        """Return True if filename can be loaded as YAML and the result is
        not a plain string, False otherwise"""
        try:
            with open(filename, "r") as file_handle:
                parsed = yaml.load(file_handle.read(), Loader=yaml.Loader)
        except Exception:
            # Every failure (unreadable file, parse error, missing yaml
            # module) means: not parseable
            return False
        return not isinstance(parsed, str)