Source code for jube2.result
# JUBE Benchmarking Environment
# Copyright (C) 2008-2024
# Forschungszentrum Juelich GmbH, Juelich Supercomputing Centre
# http://www.fz-juelich.de/jsc/jube
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Resulttype definition"""
from __future__ import (print_function,
unicode_literals,
division)
import jube2.util.util
import xml.etree.ElementTree as ET
import re
import jube2.log
LOGGER = jube2.log.get_logger(__name__)
[docs]class Result(object):
"""A generic result type"""
[docs] class ResultData(object):
"""A gerneric result data type"""
def __init__(self, name):
self._name = name
@property
def name(self):
"""Return the result name"""
return self._name
[docs] def create_result(self, show=True, filename=None, **kwargs):
"""Create result output"""
raise NotImplementedError("")
[docs] def add_result_data(self, result_data):
"""Add additional result data"""
raise NotImplementedError("")
def __eq__(self, other):
return self.name == other.name
def __init__(self, name, res_filter=None):
self._use = set()
self._name = name
self._res_filter = res_filter
self._result_dir = None
self._benchmark = None
@property
def name(self):
"""Return the result name"""
return self._name
@property
def benchmark(self):
"""Return the benchmark"""
return self._benchmark
@property
def result_dir(self):
"""Return the result_dir"""
return self._result_dir
@result_dir.setter
def result_dir(self, result_dir):
"""Set the result_dir"""
self._result_dir = result_dir
@benchmark.setter
def benchmark(self, benchmark):
"""Set the benchmark"""
self._benchmark = benchmark
[docs] def add_uses(self, use_names):
"""Add an addtional analyser name"""
for use_name in use_names:
if use_name in self._use:
raise ValueError(("Element \"{0}\" can only be used once")
.format(use_name))
self._use.add(use_name)
[docs] def create_result_data(self):
"""Create result representation"""
raise NotImplementedError("")
def _analyse_data(self):
"""Load analyse data out of given analysers"""
for analyser_name in self._use:
analyser = self._benchmark.analyser[analyser_name]
analyse = analyser.analyse_result
# Ignore empty analyse results
if analyse is None:
LOGGER.debug(("No data found for analyser \"{0}\" "
"in benchmark run {1}. "
"Run analyse step automatically.")
.format(analyser_name, self._benchmark.id))
self._benchmark.analyse(show_info=False,
specific_analyser_name=analyser_name)
analyse = \
self._benchmark.analyser[analyser_name].analyse_result
# Check if analyse is still empty
if analyse is None:
LOGGER.warning(("No data found for analyser \"{0}\" "
"in benchmark run {1}.")
.format(analyser_name, self._benchmark.id))
continue
# Create workpackage chains
wp_chains = list()
all_wps = set()
for ids in [analyse[stepname].keys() for stepname in analyse]:
all_wps.update(set(map(int, ids)))
# Find workpackages without children (or at least no childen in
# the given analyser)
last_wps = set()
for id in all_wps:
child_ids = set([wp.id for wp in self._benchmark.
workpackage_by_id(id).children_future])
if not child_ids.intersection(all_wps):
last_wps.add(id)
while (len(last_wps) > 0):
next_id = last_wps.pop()
# Create new chain
wp_chains.append(list())
# Add all parents to the chain
for wp in self._benchmark.workpackage_by_id(next_id).\
parent_history:
if wp.id not in wp_chains[-1]:
wp_chains[-1].append(wp.id)
# Add wp itself to the chain
wp_chains[-1].append(next_id)
# Create output datasets by combining analyse and parameter data
for chain in wp_chains:
analyse_dict = dict()
for wp_id in chain:
workpackage = self._benchmark.workpackage_by_id(wp_id)
# add analyse data
if (wp_id in all_wps):
analyse_dict.update(
analyse[workpackage.step.name][wp_id])
# add parameter
parameter_dict = dict()
for par in workpackage.parameterset:
value = \
jube2.util.util.convert_type(par.parameter_type,
par.value, stop=False)
# add suffix to the parameter name
if (par.name + "_" + workpackage.step.name
not in parameter_dict):
parameter_dict[par.name + "_" +
workpackage.step.name] = value
# parmater without suffix is used for the last WP in
# the chain
if wp_id == chain[-1]:
parameter_dict[par.name] = value
analyse_dict.update(parameter_dict)
# Add jube additional information
analyse_dict.update({
"jube_res_analyser": analyser_name,
})
# If res_filter is set, only show matching result lines
if self._res_filter is not None:
res_filter = jube2.util.util.substitution(
self._res_filter, analyse_dict)
if not jube2.util.util.eval_bool(res_filter):
continue
yield analyse_dict
def _load_units(self, pattern_names):
"""Load units"""
units = dict()
alt_pattern_names = list(pattern_names)
for i, pattern_name in enumerate(alt_pattern_names):
for option in ["first", "last", "min", "max", "avg", "sum", "std"]:
matcher = re.match("^(.+)_{0}$".format(option), pattern_name)
if matcher:
alt_pattern_names[i] = matcher.group(1)
for analyser_name in self._use:
if analyser_name not in self._benchmark.analyser:
raise RuntimeError(
"<analyser name=\"{0}\"> not found".format(analyser_name))
patternset_names = \
self._benchmark.analyser[analyser_name].use.copy()
for analyse_files in \
self._benchmark.analyser[analyser_name].analyser.values():
for analyse_file in analyse_files:
for use in analyse_file.use:
patternset_names.add(use)
for patternset_name in patternset_names:
patternset = self._benchmark.patternsets[patternset_name]
for i, pattern_name in enumerate(pattern_names):
alt_pattern_name = alt_pattern_names[i]
if (pattern_name in patternset) or \
(alt_pattern_name in patternset):
pattern = patternset[pattern_name]
if pattern is None:
pattern = patternset[alt_pattern_name]
if (pattern.unit is not None) and (pattern.unit != ""):
units[pattern_name] = pattern.unit
for parameterset_name in self._benchmark.parametersets:
parameterset = self._benchmark.parametersets[parameterset_name]
for parameter in parameterset:
if (parameter.unit is not None) and (parameter.unit != ""):
units[parameter.name] = parameter.unit
return units
[docs] def etree_repr(self):
"""Return etree object representation"""
result_etree = ET.Element("result")
if self._result_dir is not None:
result_etree.attrib["result_dir"] = self._result_dir
for use in self._use:
use_etree = ET.SubElement(result_etree, "use")
use_etree.text = use
return result_etree