Source code for jube2.analyser

# JUBE Benchmarking Environment
# Copyright (C) 2008-2024
# Forschungszentrum Juelich GmbH, Juelich Supercomputing Centre
# http://www.fz-juelich.de/jsc/jube
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""The Analyser class handles the analyse process"""

from __future__ import (print_function,
                        unicode_literals,
                        division)

import glob
import math
import os
import re
import xml.etree.ElementTree as ET

import jube2.conf
import jube2.log
import jube2.parameter
import jube2.pattern
import jube2.util.output
import jube2.util.util

LOGGER = jube2.log.get_logger(__name__)


class Analyser(object):

    """The Analyser handles the analyse process and stores all important
    data to run a new analyse."""

    class AnalyseFile(object):

        """A file which should be analysed"""

        def __init__(self, path):
            # Path of the file to scan; may contain parameter substitution
            # markers and glob wildcards, resolved during analyse()
            self._path = path
            # Names of file specific patternsets
            self._use = set()

        def add_uses(self, use_names):
            """Add additional patternset names.

            Raises ValueError if a name was already added before."""
            for use_name in use_names:
                if use_name in self._use:
                    raise ValueError(("Element \"{0}\" can only be used once")
                                     .format(use_name))
                self._use.add(use_name)

        def __eq__(self, other):
            # FIX: return NotImplemented for foreign types instead of
            # raising AttributeError when accessing other.use (this path is
            # reachable via the membership test inside add_analyse)
            if not isinstance(other, Analyser.AnalyseFile):
                return NotImplemented
            # Equal if both path and the set of used patternsets match
            return (self._path == other.path) and (self._use == other.use)

        def __repr__(self):
            return "AnalyseFile({0})".format(self._path)

        @property
        def use(self):
            """Return uses"""
            return self._use

        @property
        def path(self):
            """Get file path"""
            return self._path

        def etree_repr(self):
            """Return etree object representation"""
            file_etree = ET.Element("file")
            file_etree.text = self._path
            if len(self._use) > 0:
                file_etree.attrib["use"] = \
                    jube2.conf.DEFAULT_SEPARATOR.join(self._use)
            return file_etree

    def __init__(self, name, reduce_iteration=True):
        self._name = name
        # Names of analyser wide patternsets
        self._use = set()
        # step_name -> list of AnalyseFile
        self._analyse = dict()
        self._benchmark = None
        self._analyse_result = None
        # Reduce multiple workpackage iterations to a single result line
        self._reduce_iteration = reduce_iteration

    @property
    def benchmark(self):
        """Get benchmark information"""
        return self._benchmark

    @benchmark.setter
    def benchmark(self, benchmark):
        """Set benchmark information"""
        self._benchmark = benchmark

    @property
    def use(self):
        """Return uses"""
        return self._use

    @property
    def analyser(self):
        """Return analyse dict"""
        return self._analyse

    @property
    def analyse_result(self):
        """Return analyse result"""
        return self._analyse_result

    @analyse_result.setter
    def analyse_result(self, analyse_result):
        """Set analyse result"""
        self._analyse_result = analyse_result

    def add_analyse(self, step_name, analyse_file):
        """Add an additional analyse file"""
        if step_name not in self._analyse:
            self._analyse[step_name] = list()
        if (analyse_file not in self._analyse[step_name]) and \
                (analyse_file is not None):
            self._analyse[step_name].append(analyse_file)

    def add_uses(self, use_names):
        """Add additional patternset names.

        Raises ValueError if a name was already added before."""
        for use_name in use_names:
            if use_name in self._use:
                raise ValueError(("Element \"{0}\" can only be used once")
                                 .format(use_name))
            self._use.add(use_name)

    @property
    def name(self):
        """Get analyser name"""
        return self._name

    def etree_repr(self):
        """Return etree object representation"""
        analyser_etree = ET.Element("analyser")
        analyser_etree.attrib["name"] = self._name
        analyser_etree.attrib["reduce"] = str(self._reduce_iteration)
        for use in self._use:
            use_etree = ET.SubElement(analyser_etree, "use")
            use_etree.text = use
        for step_name in self._analyse:
            analyse_etree = ET.SubElement(analyser_etree, "analyse")
            analyse_etree.attrib["step"] = step_name
            for fileobj in self._analyse[step_name]:
                analyse_etree.append(fileobj.etree_repr())
        return analyser_etree

    def _combine_and_check_patternsets(self, patternset, uses):
        """Combine patternsets given by uses and check compatibility.

        Every used patternset must exist in the benchmark and must not
        redefine a pattern name incompatibly."""
        for use in uses:
            if use not in self._benchmark.patternsets:
                raise RuntimeError(("<patternset name=\"{0}\"> used but not " +
                                    "found").format(use))
            if not patternset.is_compatible(self._benchmark.patternsets[use]):
                incompatible_names = patternset.get_incompatible_pattern(
                    self._benchmark.patternsets[use])
                raise RuntimeError(("Cannot use patternset \"{0}\" " +
                                    "in analyser \"{1}\", because there are " +
                                    "incompatible pattern name combinations: "
                                    "{2}")
                                   .format(use, self._name,
                                           ",".join(incompatible_names)))
            patternset.add_patternset(self._benchmark.patternsets[use])

    def analyse(self):
        """Run the analyser.

        Scans all configured files of all (started) workpackages of the
        configured steps and stores the resulting pattern values in
        ``self._analyse_result`` as
        stepname -> workpackage_id -> patternname -> value."""
        LOGGER.debug("Run analyser \"{0}\"".format(self._name))
        if self._benchmark is None:
            raise RuntimeError("No benchmark found using analyser {0}"
                               .format(self._name))

        result = dict()

        # Combine all patternsets
        patternset = jube2.pattern.Patternset()
        self._combine_and_check_patternsets(patternset, self._use)

        # Print debug info
        debugstr = "  available pattern:\n"
        debugstr += \
            jube2.util.output.text_table(
                [("pattern", "value")] +
                sorted([(par.name, par.value) for par in
                        patternset.pattern_storage]),
                use_header_line=True, indent=9,
                align_right=False)
        debugstr += "\n  available derived pattern:\n"
        debugstr += \
            jube2.util.output.text_table(
                [("pattern", "value")] +
                sorted([(par.name, par.value) for par in
                        patternset.derived_pattern_storage]),
                use_header_line=True, indent=9,
                align_right=False)
        LOGGER.debug(debugstr)

        for stepname in self._analyse:
            result[stepname] = dict()
            LOGGER.debug("  analyse step \"{0}\"".format(stepname))
            if stepname not in self._benchmark.steps:
                raise RuntimeError(("Could not find <step name=\"{0}\"> "
                                    "when using analyser \"{1}\"").format(
                                        stepname, self._name))
            step = self._benchmark.steps[stepname]
            workpackages = set(self._benchmark.workpackages[stepname])
            while len(workpackages) > 0:
                root_workpackage = workpackages.pop()
                match_dict = dict()
                # Global patternset to store all existing pattern (e.g. from
                # individual file uses), necessary to evaluate default pattern
                # and derived pattern
                global_patternset = patternset.copy()
                result[stepname][root_workpackage.id] = dict()
                # Should multiple iterations be reduced to a single result
                # line?
                if self._reduce_iteration:
                    siblings = set(root_workpackage.iteration_siblings)
                else:
                    siblings = set([root_workpackage])
                while len(siblings) > 0:
                    workpackage = siblings.pop()
                    # Each sibling is handled here; avoid analysing it again
                    # as its own root
                    if workpackage in workpackages:
                        workpackages.remove(workpackage)

                    # Ignore workpackages not started yet
                    if not workpackage.started:
                        continue

                    parameter = \
                        dict([[par.name, par.value] for par in
                              workpackage.parameterset.
                              constant_parameter_dict.values()])

                    for file_obj in self._analyse[stepname]:
                        # Determine directory containing the file to scan
                        if step.alt_work_dir is not None:
                            file_path = step.alt_work_dir
                            file_path = jube2.util.util.substitution(
                                file_path, parameter)
                            file_path = \
                                os.path.expandvars(
                                    os.path.expanduser(file_path))
                            file_path = os.path.join(
                                self._benchmark.file_path_ref, file_path)
                        else:
                            file_path = workpackage.work_dir

                        # Filename may contain parameters and glob wildcards
                        filename = \
                            jube2.util.util.substitution(file_obj.path,
                                                         parameter)
                        filename = \
                            os.path.expandvars(os.path.expanduser(filename))
                        file_path = os.path.join(file_path, filename)

                        for path in glob.glob(file_path):
                            # scan files
                            LOGGER.debug(("    scan file {0}").format(path))
                            new_result_dict, match_dict = \
                                self._analyse_file(path, patternset,
                                                   global_patternset,
                                                   workpackage.parameterset,
                                                   match_dict, file_obj.use)
                            result[stepname][root_workpackage.id].update(
                                new_result_dict)

                # Set default pattern values if available and necessary
                new_result_dict = result[stepname][root_workpackage.id]
                for pattern in global_patternset.pattern_storage:
                    if (pattern.default_value is not None) and \
                            (pattern.name not in new_result_dict):
                        default = pattern.default_value
                        # Convert default value to the pattern content type
                        if pattern.content_type == "int":
                            if default == "nan":
                                default = float("nan")
                            else:
                                default = int(float(default))
                        elif pattern.content_type == "float":
                            default = float(default)

                        new_result_dict[pattern.name] = default
                        new_result_dict[pattern.name + "_cnt"] = 0
                        new_result_dict[pattern.name + "_first"] = default
                        new_result_dict[pattern.name + "_last"] = default
                        if pattern.content_type in ["int", "float"]:
                            new_result_dict.update(
                                {pattern.name + "_sum": default,
                                 pattern.name + "_min": default,
                                 pattern.name + "_max": default,
                                 pattern.name + "_avg": default,
                                 pattern.name + "_sum2": default ** 2,
                                 pattern.name + "_std": 0})

                # Evaluate derived pattern
                new_result_dict = self._eval_derived_pattern(
                    global_patternset, root_workpackage.parameterset,
                    result[stepname][root_workpackage.id])
                result[stepname][root_workpackage.id].update(
                    new_result_dict)
        self._analyse_result = result

    def _eval_derived_pattern(self, patternset, parameterset, result_dict):
        """Evaluate all derived pattern in patternset using parameterset
        and result_dict"""
        # Expose already scanned results as parameters for the derived
        # pattern substitution
        resultset = jube2.parameter.Parameterset()
        for name in result_dict:
            resultset.add_parameter(
                jube2.parameter.Parameter.create_parameter(
                    name, value=str(result_dict[name])))

        # Get jube patternset
        jube_pattern = jube2.pattern.get_jube_pattern()

        # calculate derived pattern
        patternset.derived_pattern_substitution(
            [parameterset, resultset, jube_pattern.pattern_storage])

        new_result_dict = dict()
        # Convert content type
        for par in patternset.derived_pattern_storage:
            if par.mode not in jube2.conf.ALLOWED_SCRIPTTYPES:
                new_result_dict[par.name] = \
                    jube2.util.util.convert_type(par.content_type,
                                                 par.value, stop=False)
        return new_result_dict

    def _analyse_file(self, file_path, patternset, global_patternset,
                      parameterset, match_dict=None, additional_uses=None):
        """Scan given file with given pattern and produce a result dict.

        Returns (result_dict, match_dict); match_dict accumulates the
        running statistics (first/last/min/max/sum/cnt/avg/std) across
        multiple calls for the same workpackage."""
        if additional_uses is None:
            additional_uses = set()
        if match_dict is None:
            match_dict = dict()
        if not os.path.isfile(file_path):
            return dict(), match_dict

        local_patternset = patternset.copy()

        # Add file specific uses
        self._combine_and_check_patternsets(local_patternset,
                                            additional_uses)
        self._combine_and_check_patternsets(global_patternset,
                                            additional_uses)

        # Unique pattern/parameter check
        if (not parameterset.is_compatible(
                local_patternset.pattern_storage)) or \
           (not parameterset.is_compatible(
                local_patternset.derived_pattern_storage)):
            incompatible_names = parameterset.get_incompatible_parameter(
                local_patternset.pattern_storage)
            incompatible_names.update(
                parameterset.get_incompatible_parameter(
                    local_patternset.derived_pattern_storage))
            raise RuntimeError(("A pattern and a parameter (\"{0}\") "
                                "using the same name in "
                                "analyser \"{1}\"").format(
                                    ",".join(incompatible_names),
                                    self._name))

        # Get jube patternset
        jube_pattern = jube2.pattern.get_jube_pattern()

        # Do pattern substitution
        local_patternset.pattern_substitution(
            [parameterset, jube_pattern.pattern_storage])
        patternlist = [p for p in local_patternset.pattern_storage]

        # FIX: use a context manager so the file handle cannot leak when a
        # broken pattern raises below (the old code only closed the handle
        # at the very end of the scan)
        with open(file_path, "r") as file_handle:
            # Read file content
            data = file_handle.read()

        for pattern in patternlist:
            if pattern.name not in match_dict:
                match_dict[pattern.name] = dict()
            try:
                mode = re.MULTILINE
                if pattern.dotall:
                    mode += re.DOTALL
                regex = re.compile(pattern.value, mode)
            except re.error as ree:
                raise RuntimeError(("Error inside pattern \"{0}\" : " +
                                    "\"{1}\" : {2}")
                                   .format(pattern.name, pattern.value, ree))

            # Run regular expression
            matches = regex.findall(data)

            # If there are different groups reduce result shape
            if regex.groups > 1:
                match_list = list()
                for match in matches:
                    match_list = match_list + list(match)
            else:
                match_list = matches

            # Remove empty matches
            match_list = [match for match in match_list if match != ""]

            # Convert to pattern type; non-convertible matches are skipped
            # with a warning
            new_match_list = list()
            for match in match_list:
                try:
                    if pattern.content_type == "int":
                        if match == "nan":
                            new_match_list.append(float("nan"))
                        else:
                            new_match_list.append(int(float(match)))
                    elif pattern.content_type == "float":
                        new_match_list.append(float(match))
                    else:
                        new_match_list.append(match)
                except ValueError:
                    LOGGER.warning(("\"{0}\" cannot be represented " +
                                    "as a \"{1}\"")
                                   .format(match, pattern.content_type))
            match_list = new_match_list

            if len(match_list) > 0:
                # First match is default
                if "first" not in match_dict[pattern.name]:
                    match_dict[pattern.name]["first"] = match_list[0]
                for match in match_list:
                    if pattern.content_type in ["int", "float"]:
                        if "min" in match_dict[pattern.name]:
                            match_dict[pattern.name]["min"] = \
                                min(match_dict[pattern.name]["min"], match)
                        else:
                            match_dict[pattern.name]["min"] = match
                        if "max" in match_dict[pattern.name]:
                            match_dict[pattern.name]["max"] = \
                                max(match_dict[pattern.name]["max"], match)
                        else:
                            match_dict[pattern.name]["max"] = match
                        if "sum" in match_dict[pattern.name]:
                            match_dict[pattern.name]["sum"] += match
                        else:
                            match_dict[pattern.name]["sum"] = match
                        try:
                            if "sum2" in match_dict[pattern.name]:
                                match_dict[pattern.name]["sum2"] += \
                                    match ** 2
                            else:
                                match_dict[pattern.name]["sum2"] = \
                                    match ** 2
                        except OverflowError:
                            LOGGER.warning(
                                "Squared sum cannot be represented, " +
                                "numerical result out of range.")
                            match_dict[pattern.name]["sum2"] = math.nan
                    if "cnt" in match_dict[pattern.name]:
                        match_dict[pattern.name]["cnt"] += 1
                    else:
                        match_dict[pattern.name]["cnt"] = 1
                if pattern.content_type in ["int", "float"]:
                    if match_dict[pattern.name]["cnt"] > 0:
                        match_dict[pattern.name]["avg"] = \
                            (match_dict[pattern.name]["sum"] /
                             match_dict[pattern.name]["cnt"])
                    if match_dict[pattern.name]["cnt"] > 1:
                        # Sample standard deviation via the shifted
                        # sum-of-squares formula
                        try:
                            match_dict[pattern.name]["std"] = math.sqrt(
                                (abs(match_dict[pattern.name]["sum2"] -
                                     (match_dict[pattern.name]["sum"] ** 2 /
                                      match_dict[pattern.name]["cnt"])) /
                                 (match_dict[pattern.name]["cnt"] - 1)))
                        except OverflowError:
                            match_dict[pattern.name]["std"] = 0
                    else:
                        match_dict[pattern.name]["std"] = 0

                match_dict[pattern.name]["last"] = match_list[-1]

        info_str = "      file \"{0}\" scanned pattern found:\n".format(
            os.path.basename(file_path))
        info_str += jube2.util.output.text_table(
            [(_name, ", ".join(["{0}:{1}".format(key, con)
                                for key, con in value.items()]))
             for _name, value in match_dict.items()],
            indent=9, align_right=True, auto_linebreak=True)
        LOGGER.debug(info_str)

        # Create result dict
        result_dict = dict()
        for pattern_name in match_dict:
            for option in match_dict[pattern_name]:
                if option == "first":
                    # The first match also becomes the plain pattern value
                    result_dict[pattern_name] = \
                        match_dict[pattern_name][option]
                name = "{0}_{1}".format(pattern_name, option)
                result_dict[name] = match_dict[pattern_name][option]

        return result_dict, match_dict

    def analyse_etree_repr(self):
        """Create an etree representation of a analyse dict:
        stepname -> workpackage_id -> filename -> patternname -> value
        """
        etree = list()
        if self._analyse_result is None:
            return etree
        for stepname in self._analyse_result:
            step_etree = ET.Element("step")
            step_etree.attrib["name"] = stepname
            for workpackage_id in self._analyse_result[stepname]:
                workpackage_etree = ET.SubElement(step_etree, "workpackage")
                workpackage_etree.attrib["id"] = str(workpackage_id)
                for pattern in self._analyse_result[stepname][
                        workpackage_id]:
                    # Note: bools are typed as "string" because
                    # type(...) is int/float is an exact check
                    if type(self._analyse_result[stepname][workpackage_id]
                            [pattern]) is int:
                        content_type = "int"
                    elif type(self._analyse_result[stepname][
                            workpackage_id][pattern]) is float:
                        content_type = "float"
                    else:
                        content_type = "string"
                    pattern_etree = ET.SubElement(workpackage_etree,
                                                  "pattern")
                    pattern_etree.attrib["name"] = pattern
                    pattern_etree.attrib["type"] = content_type
                    pattern_etree.text = \
                        str(self._analyse_result[stepname][workpackage_id]
                            [pattern])
            etree.append(step_etree)
        return etree