# JUBE Benchmarking Environment
# Copyright (C) 2008-2024
# Forschungszentrum Juelich GmbH, Juelich Supercomputing Centre
# http://www.fz-juelich.de/jsc/jube
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Step contains the commands for steps"""
from __future__ import (print_function,
unicode_literals,
division)
import subprocess
import os
import re
import time
import xml.etree.ElementTree as ET
import jube2.util.util
import jube2.conf
import jube2.log
import jube2.parameter
LOGGER = jube2.log.get_logger(__name__)
class Step(object):
    """A Step represents one execution step. It contains a list of
    Do-operations and multiple parametersets, substitutionsets and filesets.
    A Step is a template for Workpackages.
    """

    def __init__(self, name, depend, iterations=1, alt_work_dir=None,
                 shared_name=None, export=False, max_wps="0",
                 active="true", suffix="", cycles=1, procs=1,
                 do_log_file=None):
        self._name = name
        self._use = list()
        self._operations = list()
        self._iterations = iterations
        self._depend = depend
        self._alt_work_dir = alt_work_dir
        self._shared_name = shared_name
        self._export = export
        self._max_wps = max_wps
        self._active = active
        self._suffix = suffix
        self._cycles = cycles
        self._procs = procs
        self._do_log_file = do_log_file

    def etree_repr(self):
        """Return etree object representation"""
        step_etree = ET.Element("step")
        step_etree.attrib["name"] = self._name
        if len(self._depend) > 0:
            step_etree.attrib["depend"] = \
                jube2.conf.DEFAULT_SEPARATOR.join(self._depend)
        if self._alt_work_dir is not None:
            step_etree.attrib["work_dir"] = self._alt_work_dir
        if self._shared_name is not None:
            step_etree.attrib["shared"] = self._shared_name
        if self._active != "true":
            step_etree.attrib["active"] = self._active
        if self._suffix != "":
            step_etree.attrib["suffix"] = self._suffix
        if self._export:
            step_etree.attrib["export"] = "true"
        if self._max_wps != "0":
            step_etree.attrib["max_async"] = self._max_wps
        if self._iterations > 1:
            step_etree.attrib["iterations"] = str(self._iterations)
        if self._cycles > 1:
            step_etree.attrib["cycles"] = str(self._cycles)
        if self._procs != 1:
            step_etree.attrib["procs"] = str(self._procs)
        # use identity check for None instead of "!= None"
        if self._do_log_file is not None:
            step_etree.attrib["do_log_file"] = str(self._do_log_file)
        for use in self._use:
            use_etree = ET.SubElement(step_etree, "use")
            use_etree.text = jube2.conf.DEFAULT_SEPARATOR.join(use)
        for operation in self._operations:
            step_etree.append(operation.etree_repr())
        return step_etree

    def __repr__(self):
        return "{0}".format(vars(self))

    def add_operation(self, operation):
        """Add operation"""
        self._operations.append(operation)

    def add_uses(self, use_names):
        """Add use"""
        for use_name in use_names:
            # every set name may appear only once over all <use> entries
            if any([use_name in use_list for use_list in self._use]):
                raise ValueError(("Element \"{0}\" can only be used once")
                                 .format(use_name))
        self._use.append(use_names)

    @property
    def name(self):
        """Return step name"""
        return self._name

    @property
    def active(self):
        """Return active state"""
        return self._active

    @property
    def export(self):
        """Return export behaviour"""
        return self._export

    @property
    def iterations(self):
        """Return iterations"""
        return self._iterations

    @property
    def cycles(self):
        """Return number of cycles"""
        return self._cycles

    @property
    def procs(self):
        """Return number of procs"""
        return self._procs

    @property
    def shared_link_name(self):
        """Return shared link name"""
        return self._shared_name

    @property
    def max_wps(self):
        """Return maximum number of simultaneous workpackages"""
        return self._max_wps

    @property
    def do_log_file(self):
        """Return do log file name"""
        return self._do_log_file

    def get_used_sets(self, available_sets, parameter_dict=None):
        """Get list of all used sets, which can be found in available_sets"""
        set_names = list()
        if parameter_dict is None:
            parameter_dict = dict()
        for use in self._use:
            for name in use:
                # set names may contain parameters which must be substituted
                name = jube2.util.util.substitution(name, parameter_dict)
                if (name in available_sets) and (name not in set_names):
                    set_names.append(name)
        return set_names

    def shared_folder_path(self, benchdir, parameter_dict=None):
        """Return shared folder name; empty string if step is not shared"""
        if self._shared_name is not None:
            if parameter_dict is not None:
                shared_name = jube2.util.util.substitution(self._shared_name,
                                                           parameter_dict)
            else:
                shared_name = self._shared_name
            return os.path.join(benchdir,
                                "{0}_{1}".format(self._name, shared_name))
        else:
            return ""

    def get_jube_parameterset(self):
        """Return parameterset which contains step related
        information"""
        parameterset = jube2.parameter.Parameterset()
        # step name
        parameterset.add_parameter(
            jube2.parameter.Parameter.
            create_parameter("jube_step_name", self._name,
                             update_mode=jube2.parameter.JUBE_MODE))
        # iterations
        parameterset.add_parameter(
            jube2.parameter.Parameter.
            create_parameter("jube_step_iterations", str(self._iterations),
                             parameter_type="int",
                             update_mode=jube2.parameter.JUBE_MODE))
        # cycles
        parameterset.add_parameter(
            jube2.parameter.Parameter.
            create_parameter("jube_step_cycles", str(self._cycles),
                             parameter_type="int",
                             update_mode=jube2.parameter.JUBE_MODE))
        # default workpackage cycle, will be overwritten by specific
        # workpackage cycle
        parameterset.add_parameter(
            jube2.parameter.Parameter.
            create_parameter("jube_wp_cycle", "0", parameter_type="int",
                             update_mode=jube2.parameter.JUBE_MODE))
        return parameterset

    def create_workpackages(self, benchmark, global_parameterset,
                            local_parameterset=None, used_sets=None,
                            iteration_base=0, parents=None,
                            incompatible_parameters=None):
        """Create workpackages for current step using given
        benchmark context.

        Recurses whenever newly used parametersets are found, so that
        template expansion is repeated on the extended parameter space.
        Returns the list of newly created workpackages.
        """
        if used_sets is None:
            used_sets = set()
        update_parameters = jube2.parameter.Parameterset()
        if local_parameterset is None:
            # first (non-recursive) call: seed the global parameterset with
            # benchmark and step specific JUBE parameters
            local_parameterset = jube2.parameter.Parameterset()
            global_parameterset.add_parameterset(
                benchmark.get_jube_parameterset())
            global_parameterset.add_parameterset(self.get_jube_parameterset())
            update_parameters.add_parameterset(
                global_parameterset.get_updatable_parameter(
                    jube2.parameter.STEP_MODE))
            for parameter in update_parameters:
                incompatible_parameters.discard(parameter.name)
        if parents is None:
            parents = list()
        new_workpackages = list()
        # Create parameter dictionary for substitution
        parameter_dict = \
            {par.name: par.value for par in
             global_parameterset.constant_parameter_dict.values()}
        # Filter for parametersets in uses
        parameterset_names = \
            set(self.get_used_sets(benchmark.parametersets, parameter_dict))
        new_sets_found = len(parameterset_names.difference(used_sets)) > 0
        if new_sets_found:
            parameterset_names = parameterset_names.difference(used_sets)
            used_sets = used_sets.union(parameterset_names)
        for parameterset_name in parameterset_names:
            # The parametersets in a single step must be compatible
            if not local_parameterset.is_compatible(
                    benchmark.parametersets[parameterset_name]):
                incompatible_names = \
                    local_parameterset.get_incompatible_parameter(
                        benchmark.parametersets[parameterset_name])
                raise ValueError(("Cannot use parameterset '{0}' in " +
                                  "step '{1}'.\nParameter '{2}' is/are " +
                                  "already defined by a different " +
                                  "parameterset.")
                                 .format(parameterset_name, self.name,
                                         ",".join(incompatible_names)))
            local_parameterset.add_parameterset(
                benchmark.parametersets[parameterset_name])
        # Combine local and history parameterset
        if local_parameterset.is_compatible(
                global_parameterset, update_mode=jube2.parameter.USE_MODE):
            update_parameters.add_parameterset(
                local_parameterset.get_updatable_parameter(
                    jube2.parameter.USE_MODE))
            for parameter in update_parameters:
                incompatible_parameters.discard(parameter.name)
            global_parameterset = \
                local_parameterset.copy().add_parameterset(
                    global_parameterset)
        else:
            incompatible_names = \
                local_parameterset.get_incompatible_parameter(
                    global_parameterset,
                    update_mode=jube2.parameter.USE_MODE)
            LOGGER.debug("Incompatible parameterset combination found " +
                         "between current and parent steps. \nParameter " +
                         "'{0}' is/are already defined different.".format(
                             ",".join(incompatible_names)))
            return new_workpackages
        # update parameters
        global_parameterset.update_parameterset(update_parameters)
        # Set tag-mode evaluation helper function to allow access to tag list
        # during parameter evaluation
        for parameter in global_parameterset.all_parameters:
            if parameter.mode == "tag":
                parameter.eval_helper = \
                    lambda tag: tag if tag in benchmark.tags else ""
        # Expand templates (repeat until substitution creates no new ones)
        parametersets = [global_parameterset]
        change = True
        while change:
            change = False
            new_parametersets = list()
            for parameterset in parametersets:
                parameterset.parameter_substitution()
                # Maybe new templates were created
                if parameterset.has_templates:
                    LOGGER.debug("Expand parameter templates:\n{0}".format(
                        "\n".join("    \"{0}\": {1}".format(i, j.value)
                                  for i, j in parameterset.
                                  template_parameter_dict.items())))
                    new_parametersets += \
                        [new_parameterset for new_parameterset in
                         parameterset.expand_templates()]
                    change = True
                else:
                    new_parametersets += [parameterset]
            parametersets = new_parametersets
        # Create workpackages
        for parameterset in parametersets:
            workpackage_parameterset = local_parameterset.copy()
            workpackage_parameterset.update_parameterset(parameterset)
            if new_sets_found:
                # recurse: freshly used sets may pull in more sets/templates
                new_workpackages += \
                    self.create_workpackages(benchmark, parameterset,
                                             workpackage_parameterset,
                                             used_sets, iteration_base,
                                             parents,
                                             incompatible_parameters.copy())
            else:
                # Check if all incompatible_parameters were updated
                if len(incompatible_parameters) > 0:
                    return new_workpackages
                # Create new workpackage
                # NOTE(review): relies on jube2.workpackage being imported
                # elsewhere in the package; no direct import in this module
                created_workpackages = list()
                for iteration in range(self.iterations):
                    workpackage = jube2.workpackage.Workpackage(
                        benchmark=benchmark,
                        step=self,
                        parameterset=parameterset.copy(),
                        local_parameter_names=[
                            par.name for par in workpackage_parameterset],
                        iteration=iteration_base * self.iterations + iteration,
                        cycle=0)
                    # --- Link parent workpackages ---
                    for parent in parents:
                        workpackage.add_parent(parent)
                    # --- Add workpackage JUBE parameterset ---
                    workpackage.parameterset.add_parameterset(
                        workpackage.get_jube_parameterset())
                    # --- Final parameter substitution ---
                    workpackage.parameterset.parameter_substitution(
                        final_sub=True)
                    # --- Check parameter type ---
                    for parameter in workpackage.parameterset:
                        if not parameter.is_template:
                            jube2.util.util.convert_type(
                                parameter.parameter_type, parameter.value)
                    # --- Enable workpackage dir cache ---
                    workpackage.allow_workpackage_dir_caching()
                    if workpackage.active:
                        created_workpackages.append(workpackage)
                    else:
                        # inactive workpackage: release the reserved id again
                        jube2.workpackage.Workpackage.\
                            reduce_workpackage_id_counter()
                for workpackage in created_workpackages:
                    workpackage.iteration_siblings.update(
                        set(created_workpackages))
                new_workpackages += created_workpackages
        return new_workpackages

    @property
    def alt_work_dir(self):
        """Return alternative work directory"""
        return self._alt_work_dir

    @property
    def use(self):
        """Return parameters and substitutions"""
        return self._use

    @property
    def suffix(self):
        """Return directory suffix"""
        return self._suffix

    @property
    def operations(self):
        """Return operations"""
        return self._operations

    @property
    def depend(self):
        """Return dependencies"""
        return self._depend

    def get_depend_history(self, benchmark):
        """Creates a set of all dependent steps in history for given
        benchmark"""
        depend_history = set()
        for step_name in self._depend:
            if step_name not in depend_history:
                depend_history.add(step_name)
                depend_history.update(
                    benchmark.steps[step_name].get_depend_history(benchmark))
        return depend_history
class Operation(object):
    """The Operation-class represents a single instruction, which will be
    executed in a shell environment.
    """

    def __init__(self, do, async_filename=None, stdout_filename=None,
                 stderr_filename=None, active="true", shared=False,
                 work_dir=None, break_filename=None, error_filename=None):
        self._do = do
        self._error_filename = error_filename
        self._async_filename = async_filename
        self._break_filename = break_filename
        self._stdout_filename = stdout_filename
        self._stderr_filename = stderr_filename
        self._active = active
        self._shared = shared
        self._work_dir = work_dir

    @property
    def stdout_filename(self):
        """Get stdout filename"""
        return self._stdout_filename

    @property
    def stderr_filename(self):
        """Get stderr filename"""
        return self._stderr_filename

    @property
    def error_filename(self):
        """Get error filename"""
        return self._error_filename

    @property
    def async_filename(self):
        """Get async filename"""
        return self._async_filename

    @property
    def shared(self):
        """Shared operation?"""
        return self._shared

    def active(self, parameter_dict):
        """Return active status of the current operation depending on the
        given parameter_dict"""
        active_str = jube2.util.util.substitution(self._active, parameter_dict)
        return jube2.util.util.eval_bool(active_str)

    def execute(self, parameter_dict, work_dir, only_check_pending=False,
                environment=None, pid=None, dolog=None):
        """Execute the operation. work_dir must be set to the given context
        path. The parameter_dict used for inline substitution.
        If only_check_pending is set to True, the operation will not be
        executed, only the async_file will be checked.
        Return operation status:
           True  => operation finished
           False => operation pending
        """
        if not self.active(parameter_dict):
            return True
        if environment is not None:
            env = environment
        else:
            env = os.environ
        if not only_check_pending:
            # Inline substitution
            do = jube2.util.util.substitution(self._do, parameter_dict)
            # Remove leading and trailing ; because otherwise ;; will cause
            # trouble when adding ; env
            do = do.strip(";")
            if (not jube2.conf.DEBUG_MODE) and (do.strip() != ""):
                # Change stdout
                if self._stdout_filename is not None:
                    stdout_filename = jube2.util.util.substitution(
                        self._stdout_filename, parameter_dict)
                    stdout_filename = \
                        os.path.expandvars(os.path.expanduser(stdout_filename))
                else:
                    stdout_filename = "stdout"
                stdout_path = os.path.join(work_dir, stdout_filename)
                stdout = open(stdout_path, "a")
                # Change stderr
                if self._stderr_filename is not None:
                    stderr_filename = jube2.util.util.substitution(
                        self._stderr_filename, parameter_dict)
                    stderr_filename = \
                        os.path.expandvars(os.path.expanduser(stderr_filename))
                else:
                    stderr_filename = "stderr"
                stderr_path = os.path.join(work_dir, stderr_filename)
                stderr = open(stderr_path, "a")
        # Use operation specific work directory
        if self._work_dir is not None and len(self._work_dir) > 0:
            new_work_dir = jube2.util.util.substitution(
                self._work_dir, parameter_dict)
            new_work_dir = os.path.expandvars(os.path.expanduser(new_work_dir))
            work_dir = os.path.join(work_dir, new_work_dir)
            if re.search(jube2.parameter.Parameter.parameter_regex, work_dir):
                raise IOError(("Given work directory {0} contains a unknown " +
                               "JUBE or environment variable.").format(
                                   work_dir))
            # Create directory if it does not exist
            if not jube2.conf.DEBUG_MODE and not os.path.exists(work_dir):
                try:
                    os.makedirs(work_dir)
                except FileExistsError:
                    # concurrent workpackage may have created it already
                    pass
        if not only_check_pending:
            if pid is not None:
                env_file_name = jube2.conf.ENVIRONMENT_INFO.replace(
                    '.', '_{}.'.format(pid))
            else:
                env_file_name = jube2.conf.ENVIRONMENT_INFO
            abs_info_file_path = \
                os.path.abspath(os.path.join(work_dir, env_file_name))
            # Select unix shell
            shell = jube2.conf.STANDARD_SHELL
            if "JUBE_EXEC_SHELL" in os.environ:
                alt_shell = os.environ["JUBE_EXEC_SHELL"].strip()
                if len(alt_shell) > 0:
                    shell = alt_shell
            # Execute "do"
            LOGGER.debug(">>> {0}".format(do))
            # use the same "do.strip()" condition as for the file handle
            # creation above; "do != ''" would hit a NameError on
            # stdout_path/stderr_path for whitespace-only commands
            if (not jube2.conf.DEBUG_MODE) and (do.strip() != ""):
                LOGGER.debug("    stdout: {0}".format(
                    os.path.abspath(stdout_path)))
                LOGGER.debug("    stderr: {0}".format(
                    os.path.abspath(stderr_path)))
                try:
                    if jube2.conf.VERBOSE_LEVEL > 1:
                        stdout_handle = subprocess.PIPE
                    else:
                        stdout_handle = stdout
                    if dolog is not None:
                        dolog.store_do(do=do, shell=shell,
                                       work_dir=os.path.abspath(work_dir),
                                       parameter_dict=parameter_dict,
                                       shared=self.shared)
                    sub = subprocess.Popen(
                        [shell, "-c",
                         "{0} && env > \"{1}\"".format(do,
                                                       abs_info_file_path)],
                        cwd=work_dir, stdout=stdout_handle,
                        stderr=stderr, shell=False,
                        env=env)
                except OSError:
                    stdout.close()
                    stderr.close()
                    raise RuntimeError(("Error (returncode <> 0) while " +
                                        "running \"{0}\" in " +
                                        "directory \"{1}\"")
                                       .format(do, os.path.abspath(work_dir)))
                # stdout verbose output
                if jube2.conf.VERBOSE_LEVEL > 1:
                    while True:
                        read_out = sub.stdout.read(
                            jube2.conf.VERBOSE_STDOUT_READ_CHUNK_SIZE)
                        if (not read_out):
                            break
                        else:
                            try:
                                print(read_out.decode(errors="ignore"), end="")
                            except TypeError:
                                print(read_out.decode("utf-8", "ignore"),
                                      end="")
                            try:
                                stdout.write(read_out)
                            except TypeError:
                                try:
                                    stdout.write(read_out.decode(
                                        errors="ignore"))
                                except TypeError:
                                    stdout.write(read_out.decode("utf-8",
                                                                 "ignore"))
                            time.sleep(jube2.conf.VERBOSE_STDOUT_POLL_SLEEP)
                sub.communicate()
                returncode = sub.wait()
                # Close filehandles
                stdout.close()
                stderr.close()
                env = Operation.read_process_environment(work_dir, pid=pid)
                # Read and store new environment
                if (environment is not None) and (returncode == 0):
                    environment.clear()
                    environment.update(env)
                if returncode != 0:
                    if os.path.isfile(stderr_path):
                        stderr = open(stderr_path, "r")
                        stderr_msg = stderr.readlines()
                        stderr.close()
                    else:
                        stderr_msg = ""
                    try:
                        raise RuntimeError(
                            ("Error (returncode <> 0) while running \"{0}\" " +
                             "in directory \"{1}\"\nMessage in \"{2}\":" +
                             "{3}\n{4}").format(
                                 do,
                                 os.path.abspath(work_dir),
                                 os.path.abspath(stderr_path),
                                 "\n..." if len(stderr_msg) >
                                 jube2.conf.ERROR_MSG_LINES else "",
                                 "\n".join(stderr_msg[
                                     -jube2.conf.ERROR_MSG_LINES:])))
                    except UnicodeDecodeError:
                        # stderr content may not be decodable; fall back to a
                        # message without the captured output
                        raise RuntimeError(
                            ("Error (returncode <> 0) while running \"{0}\" " +
                             "in directory \"{1}\"").format(
                                 do,
                                 os.path.abspath(work_dir)))
        continue_op = True
        continue_cycle = True
        # Check if further execution was skipped
        if self._break_filename is not None:
            break_filename = jube2.util.util.substitution(
                self._break_filename, parameter_dict)
            break_filename = \
                os.path.expandvars(os.path.expanduser(break_filename))
            if os.path.exists(os.path.join(work_dir, break_filename)):
                LOGGER.debug(("\"{0}\" was found, workpackage execution and "
                              " further loop continuation was stopped.")
                             .format(break_filename))
                continue_cycle = False
        # Waiting to continue
        if self._async_filename is not None:
            async_filename = jube2.util.util.substitution(
                self._async_filename, parameter_dict)
            async_filename = \
                os.path.expandvars(os.path.expanduser(async_filename))
            if not os.path.exists(os.path.join(work_dir, async_filename)):
                LOGGER.debug("Waiting for file \"{0}\" ..."
                             .format(async_filename))
                if jube2.conf.DEBUG_MODE:
                    LOGGER.debug("  skip waiting")
                else:
                    continue_op = False
        # Search for error file
        if self._error_filename is not None:
            error_filename = jube2.util.util.substitution(
                self._error_filename, parameter_dict)
            error_filename = \
                os.path.expandvars(os.path.expanduser(error_filename))
            if os.path.exists(os.path.join(work_dir, error_filename)):
                LOGGER.debug("Checking for error file \"{0}\" ..."
                             .format(error_filename))
                if jube2.conf.DEBUG_MODE:
                    LOGGER.debug("  skip error")
                else:
                    do = jube2.util.util.substitution(self._do, parameter_dict)
                    raise RuntimeError(("Error file \"{0}\" found after " +
                                        "running the command \"{1}\".").format(
                                            error_filename, do))
        return continue_op, continue_cycle

    def etree_repr(self):
        """Return etree object representation"""
        do_etree = ET.Element("do")
        do_etree.text = self._do
        if self._async_filename is not None:
            do_etree.attrib["done_file"] = self._async_filename
        if self._error_filename is not None:
            do_etree.attrib["error_file"] = self._error_filename
        if self._break_filename is not None:
            do_etree.attrib["break_file"] = self._break_filename
        if self._stdout_filename is not None:
            do_etree.attrib["stdout"] = self._stdout_filename
        if self._stderr_filename is not None:
            do_etree.attrib["stderr"] = self._stderr_filename
        if self._active != "true":
            do_etree.attrib["active"] = self._active
        if self._shared:
            do_etree.attrib["shared"] = "true"
        if self._work_dir is not None:
            do_etree.attrib["work_dir"] = self._work_dir
        return do_etree

    def __repr__(self):
        return self._do

    @staticmethod
    def read_process_environment(work_dir, remove_after_read=True, pid=None):
        """Read standard environment info file in given directory."""
        env = dict()
        last = None
        if pid is not None:
            env_file_name = jube2.conf.ENVIRONMENT_INFO.replace(
                '.', '_{}.'.format(pid))
        else:
            env_file_name = jube2.conf.ENVIRONMENT_INFO
        env_file_path = os.path.join(work_dir, env_file_name)
        if os.path.isfile(env_file_path):
            env_file = open(env_file_path, "r")
            for line in env_file:
                line = line.rstrip()
                matcher = re.match(r"^(\S.*?)=(.*?)$", line)
                if matcher:
                    env[matcher.group(1)] = matcher.group(2)
                    last = matcher.group(1)
                elif last is not None:
                    # continuation line of a multi-line environment value
                    env[last] += "\n" + line
            env_file.close()
            if remove_after_read:
                os.remove(env_file_path)
        return env
class DoLog(object):
    """A DoLog class containing the operations and information for setting up
    the do log."""

    def __init__(self, log_dir, log_file, initial_env, cycle=0):
        self._log_dir = log_dir
        # use identity check for None instead of "!= None"
        if log_file is not None:
            if log_file[-1] == '/':
                raise ValueError(
                    "The path of do_log_file is ending with / which is a invalid file path.")
        self._log_file = log_file
        self._initial_env = initial_env
        self._work_dir = None
        self._cycle = cycle
        self._log_path = None

    # NOTE: the original defined "log_path" twice; the duplicate
    # (docstring "Get log directory") was dead code and has been removed.
    @property
    def log_path(self):
        """Get log path"""
        return self._log_path

    @property
    def log_file(self):
        """Get log file"""
        return self._log_file

    @property
    def work_dir(self):
        """Get last work directory"""
        return self._work_dir

    @property
    def initial_env(self):
        """Get initial env"""
        return self._initial_env

    def initialiseFile(self, shell):
        """Initialise file if not yet existent."""
        fdologout = open(self.log_path, 'a')
        fdologout.write('#!'+shell+'\n\n')
        # record the initial environment so the do log can be replayed
        for envVarName, envVarValue in self.initial_env.items():
            fdologout.write('set '+envVarName+"='" +
                            envVarValue.replace('\n', '\\n')+"'\n")
        fdologout.write('\n')
        fdologout.close()

    def store_do(self, do, shell, work_dir, parameter_dict=None, shared=False):
        """Store the current execution directive to the do log and set up the
        environment if file does not yet exist."""
        if self._log_file is None:
            return
        if self._log_path is None:
            if parameter_dict:
                new_log_file = jube2.util.util.substitution(
                    self._log_file, parameter_dict)
                new_log_file = os.path.expandvars(
                    os.path.expanduser(new_log_file))
                self._log_file = new_log_file
            if re.search(jube2.parameter.Parameter.parameter_regex,
                         self._log_file):
                raise IOError(("Given do_log_file path {0} contains a unknown " +
                               "JUBE or environment variable.").format(
                                   self._log_file))
            # absolute path is used verbatim; a bare filename goes into the
            # log directory; any other relative path is resolved against cwd
            if self._log_file[0] == '/':
                self._log_path = self._log_file
            elif '/' not in self._log_file:
                self._log_path = os.path.join(self._log_dir, self._log_file)
            else:
                self._log_path = os.path.join(os.getcwd(), self._log_file)
        # create directory if not yet existent
        if not os.path.exists(os.path.dirname(self.log_path)):
            os.makedirs(os.path.dirname(self.log_path))
        if not os.path.exists(self.log_path):
            self.initialiseFile(shell)
        fdologout = open(self.log_path, 'a')
        # emit a cd only when the work directory changed since the last entry
        if work_dir != self.work_dir:
            fdologout.write('cd '+work_dir+'\n')
            self._work_dir = work_dir
        fdologout.write(do)
        if shared:
            fdologout.write(' # shared execution')
        fdologout.write('\n')
        fdologout.close()