# Source code for jube2.workpackage

# JUBE Benchmarking Environment
# Copyright (C) 2008-2024
# Forschungszentrum Juelich GmbH, Juelich Supercomputing Centre
# http://www.fz-juelich.de/jsc/jube
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""The Workpackage class handles a step and its parameter space"""

from __future__ import (print_function,
                        unicode_literals,
                        division)

import multiprocessing as mp
import xml.etree.ElementTree as ET
import jube2.util.util
import jube2.util.output
import jube2.conf
import jube2.log
import jube2.parameter
import jube2.step
import os
import re
import stat
import shutil

# Module logger; used below for debug output and warnings emitted while
# running workpackages.
LOGGER = jube2.log.get_logger(__name__)


class Workpackage(object):

    """A Workpackage contains all information to run a specific step with its
    given parameterset.
    """

    # class based counter for unique id creation
    id_counter = 0

    # Environment variables which are never written to the etree
    # representation of a workpackage environment
    _IGNORED_ENV_NAMES = ("PWD", "OLDPWD", "_")

    def __init__(self, benchmark, step, local_parameter_names, parameterset,
                 workpackage_id=None, iteration=0, cycle=0):
        """Create a workpackage

        benchmark: benchmark this workpackage belongs to
        step: step which is executed by this workpackage
        local_parameter_names: names of the parameters which are stored
            together with this workpackage (see etree_repr)
        parameterset: complete parameterset of this workpackage
        workpackage_id: explicit id; if None a new id is taken from the
            class based counter
        iteration: iteration number
        cycle: current loop cycle
        """
        # set id
        if workpackage_id is None:
            self._id = Workpackage.id_counter
            Workpackage.id_counter += 1
        else:
            self._id = workpackage_id
        self._benchmark = benchmark
        self._step = step
        self._local_parameter_names = local_parameter_names
        self._parameterset = parameterset
        self._iteration = iteration
        self._parents = list()
        self._children = list()
        self._iteration_siblings = set()
        self._queued = False
        # private copy of the current process environment
        self._env = dict(os.environ)
        self._cycle = cycle
        # see allow_workpackage_dir_caching / workpackage_dir
        self._workpackage_dir_caching_enabled = False
        self._workpackage_dir_cache = None

    def etree_repr(self):
        """Return etree object representation

        The environment is stored relative to the current process
        environment: variables which are new or changed in the workpackage
        environment become <env> entries (value stored via repr), variables
        missing from the workpackage environment become <nonenv> entries.
        """
        workpackage_etree = ET.Element("workpackage")
        workpackage_etree.attrib["id"] = str(self._id)
        step_etree = ET.SubElement(workpackage_etree, "step")
        step_etree.attrib["iteration"] = str(self._iteration)
        step_etree.attrib["cycle"] = str(self._cycle)
        step_etree.text = self._step.name
        if self._local_parameter_names:
            workpackage_etree.append(
                self.local_parameterset.etree_repr(use_current_selection=True))
        if self._parents:
            parents_etree = ET.SubElement(workpackage_etree, "parents")
            parents_etree.text = ",".join(
                str(parent.id) for parent in self._parents)
        if self._iteration_siblings:
            sibling_etree = ET.SubElement(workpackage_etree,
                                          "iteration_siblings")
            sibling_etree.text = ",".join(
                str(sibling.id) for sibling in self._iteration_siblings)
        environment_etree = ET.SubElement(workpackage_etree, "environment")
        for env_name, value in self._env.items():
            if env_name in self._IGNORED_ENV_NAMES:
                continue
            if env_name not in os.environ or os.environ[env_name] != value:
                env_etree = ET.SubElement(environment_etree, "env")
                env_etree.attrib["name"] = env_name
                # use string repr to avoid special characters
                env_etree.text = repr(value)
        for env_name in os.environ:
            if env_name in self._IGNORED_ENV_NAMES:
                continue
            if env_name not in self._env:
                env_etree = ET.SubElement(environment_etree, "nonenv")
                env_etree.attrib["name"] = env_name
        return workpackage_etree

    def __repr__(self):
        return ("Workpackage(Id:{0:2d}; Step:{1}; ParentIDs:{2}; "
                "ChildIDs:{3} {4})").format(
                    self._id, self._step.name,
                    [parent.id for parent in self._parents],
                    [child.id for child in self._children],
                    self.local_parameterset)

    def __eq__(self, other):
        """Workpackages are equal if they share the same id"""
        if isinstance(other, Workpackage):
            return self.id == other.id
        return False

    def __hash__(self):
        # Must agree with __eq__: workpackages with the same id compare
        # equal and therefore have to produce the same hash, otherwise set
        # and dict lookups become inconsistent.
        return hash(self._id)

    @property
    def parameter_dict(self):
        """Return all constant parameters as a name -> value dict"""
        return {par.name: par.value for par in
                self._parameterset.constant_parameter_dict.values()}

    @property
    def env(self):
        """Return workpackage environment"""
        return self._env

    @env.setter
    def env(self, set_env):
        """Replace own environment by set_env"""
        self._env = set_env

    @property
    def cycle(self):
        """Return current loop cycle"""
        return self._cycle

    @cycle.setter
    def cycle(self, set_cycle):
        """Update loop cycle counter"""
        self._cycle = set_cycle

    def allow_workpackage_dir_caching(self):
        """Enable workpackage dir cache

        Once enabled, the first computed workpackage_dir is stored and
        returned by all later accesses.
        """
        self._workpackage_dir_caching_enabled = True
        self._workpackage_dir_cache = None

    @property
    def active(self):
        """Return True if the step's active condition, substituted with this
        workpackage's parameters, evaluates to true"""
        active = jube2.util.util.substitution(self._step.active,
                                              self.parameter_dict)
        return jube2.util.util.eval_bool(active)

    @property
    def done(self):
        """Workpackage done? (done file exists; in debug mode the _DEBUG
        variant of the done file is accepted as well)"""
        done_file = os.path.join(self.workpackage_dir,
                                 jube2.conf.WORKPACKAGE_DONE_FILENAME)
        exist = os.path.exists(done_file)
        if jube2.conf.DEBUG_MODE:
            exist = exist or os.path.exists(done_file + "_DEBUG")
        return exist

    @done.setter
    def done(self, set_done):
        """Set/reset Workpackage done

        Setting writes the current time into the done file and removes all
        per-operation marker files; resetting deletes the done file.
        """
        done_file = os.path.join(self.workpackage_dir,
                                 jube2.conf.WORKPACKAGE_DONE_FILENAME)
        if jube2.conf.DEBUG_MODE:
            done_file += "_DEBUG"
        if set_done:
            with open(done_file, "w") as fout:
                fout.write(jube2.util.util.now_str())
            self._remove_operation_info_files()
        elif os.path.exists(done_file):
            os.remove(done_file)

    @property
    def error(self):
        """Workpackage error? (error file exists)"""
        error_file = os.path.join(self.workpackage_dir,
                                  jube2.conf.WORKPACKAGE_ERROR_FILENAME)
        return os.path.exists(error_file)

    def set_error(self, set_error, msg=""):
        """Set/reset Workpackage error

        set_error: True writes msg into the error file, False removes it
        """
        error_file = os.path.join(self.workpackage_dir,
                                  jube2.conf.WORKPACKAGE_ERROR_FILENAME)
        if set_error:
            with open(error_file, "w") as fout:
                fout.write(msg)
        elif os.path.exists(error_file):
            os.remove(error_file)

    @property
    def queued(self):
        """Workpackage queued?"""
        return self._queued

    @queued.setter
    def queued(self, set_queued):
        """Set queued state"""
        self._queued = set_queued

    @property
    def started(self):
        """Workpackage started? (workpackage directory exists)"""
        return os.path.exists(self.workpackage_dir)

    def operation_done_but_pending(self, operation_number):
        """Check if an operation was executed, but the result is still
        pending (because it is a async do)

        Returns True only if the operation is marked as done, has an
        async_filename, is active and the (substituted) async file does not
        yet exist in the work dir (alternative work dir if configured).
        """
        if not self.operation_done(operation_number):
            return False
        operation = self._step.operations[operation_number]
        if operation.async_filename is None:
            return False
        parameter_dict = self.parameter_dict
        if not operation.active(parameter_dict):
            return False
        work_dir = self.work_dir
        alt_work_dir = self.alt_work_dir(parameter_dict)
        if alt_work_dir is not None:
            work_dir = alt_work_dir
        async_filename = jube2.util.util.substitution(
            operation.async_filename, parameter_dict)
        async_filename = \
            os.path.expandvars(os.path.expanduser(async_filename))
        return not os.path.exists(os.path.join(work_dir, async_filename))

    def operation_done(self, operation_number, set_done=None):
        """Mark/checks operation status

        set_done is None: return whether the operation marker file exists
        (in debug mode the _DEBUG variant is accepted as well).
        Otherwise create (True) or remove (False) the marker file and
        return set_done. Outside debug mode a real status change also
        updates the benchmark "change" timestamp.
        """
        done_file = os.path.join(self.workpackage_dir,
                                 "wp_{0}_{1:02d}".format(
                                     jube2.conf.WORKPACKAGE_DONE_FILENAME,
                                     operation_number))
        if set_done is None:
            exist = os.path.exists(done_file)
            if jube2.conf.DEBUG_MODE:
                exist = exist or os.path.exists(done_file + "_DEBUG")
            return exist

        if jube2.conf.DEBUG_MODE:
            done_file += "_DEBUG"
        elif bool(set_done) != os.path.exists(done_file):
            # the stored status actually changes
            jube2.util.util.update_timestamps(
                os.path.join(self._benchmark.bench_dir,
                             jube2.conf.TIMESTAMPS_INFO), "change")
        if set_done:
            # an empty marker file is sufficient
            with open(done_file, "w"):
                pass
        elif os.path.exists(done_file):
            os.remove(done_file)
        return set_done

    def _remove_operation_info_files(self):
        """Remove all operation info files"""
        for operation_number in range(len(self._step.operations)):
            self.operation_done(operation_number, False)

    def remove(self, remove_config_from_benchmark=False):
        """Remove all data of this workpackage and, recursively, of all of
        its children

        remove_config_from_benchmark: also unregister this workpackage from
            its benchmark (always done for the children)
        """
        # Iterate over a copy: the recursive removal unregisters children
        # from the benchmark, which may modify this list.
        for child in list(self._children):
            child.remove(remove_config_from_benchmark=True)
        shutil.rmtree(self.workpackage_dir, ignore_errors=True)
        # Remove shared folder if all workpackages of the current step were
        # removed
        if self._step.shared_link_name is not None:
            all_deleted = not any(
                workpackage.started for workpackage in
                self._benchmark.workpackages[self._step.name])
            if all_deleted:
                shared_folder = self._step.shared_folder_path(
                    self._benchmark.bench_dir, self.parameter_dict)
                shutil.rmtree(shared_folder, ignore_errors=True)
        if remove_config_from_benchmark:
            self._benchmark.remove_workpackage(self)

    def add_parent(self, workpackage):
        """Add a parent Workpackage"""
        self._parents.append(workpackage)

    @property
    def parameterset(self):
        """Return parameterset"""
        return self._parameterset

    @parameterset.setter
    def parameterset(self, set_parameterset):
        """Add set_parameterset to the current parameterset (via
        Parameterset.add_parameterset)"""
        self._parameterset.add_parameterset(set_parameterset)

    def add_children(self, workpackage):
        """Add a children workpackage"""
        self._children.append(workpackage)

    @property
    def local_parameterset(self):
        """Return a new parameterset containing only the local parameters"""
        parameterset = jube2.parameter.Parameterset()
        for name in self._local_parameter_names:
            parameterset.add_parameter(self._parameterset[name])
        return parameterset

    @property
    def parent_history(self):
        """Create a list of all parents in the history of this workpackage
        (ancestors of the parents first, then the direct parents; entries
        are not de-duplicated)"""
        history = list()
        for parent in self._parents:
            history += parent.parent_history
        history += self._parents
        return history

    @property
    def benchmark(self):
        """Return benchmark of this workpackage"""
        return self._benchmark

    @property
    def children_future(self):
        """Create a list of all children in the future of this workpackage
        (direct children first, then the future of each child)"""
        future = list(self._children)
        for child in self._children:
            future += child.children_future
        return future

    @property
    def id(self):
        """Return workpackage id"""
        return self._id

    @property
    def parents(self):
        """Return list of parent workpackages"""
        return self._parents

    @property
    def iteration_siblings(self):
        """Return set of iteration siblings"""
        return self._iteration_siblings

    @property
    def iteration(self):
        """Return workpackage iteration number"""
        return self._iteration

    @property
    def children(self):
        """Return list of child workpackages"""
        return self._children

    @property
    def step(self):
        """Return Step data"""
        return self._step

    def status(self):
        """Return "DONE", "ERROR" or "RUNNING" depending on the workpackage
        status ("DONE" takes precedence over "ERROR")"""
        if self.done:
            return "DONE"
        if self.error:
            return "ERROR"
        return "RUNNING"

    def update_status(self):
        """Update status in jube parameter (jube_wp_status)"""
        parameterset = jube2.parameter.Parameterset()
        parameterset.add_parameter(
            jube2.parameter.Parameter.create_parameter(
                "jube_wp_status", self.status(), parameter_type="string",
                update_mode=jube2.parameter.JUBE_MODE))
        self.parameterset.update_parameterset(parameterset)

    def get_jube_cycle_parameterset(self):
        """Return parameterset which contains cycle related information
        (jube_wp_cycle)"""
        parameterset = jube2.parameter.Parameterset()
        # workpackage cycle
        parameterset.add_parameter(
            jube2.parameter.Parameter.create_parameter(
                "jube_wp_cycle", str(self._cycle), parameter_type="int",
                update_mode=jube2.parameter.JUBE_MODE))
        return parameterset

    def create_relpath(self, value):
        """Create relative path representation (relative to the benchmark
        file_path_ref)"""
        return os.path.relpath(value, self._benchmark.file_path_ref)

    def create_abspath(self, value):
        """Create absolute path representation"""
        return os.path.abspath(value)

    def get_jube_parameterset(self):
        """Return parameterset which contains workpackage related information

        Contains jube_wp_id, jube_wp_padid, jube_wp_status,
        jube_wp_iteration, jube_wp_cycle, jube_wp_relpath, jube_wp_abspath,
        one jube_wp_parent_<step>_id per parent, jube_wp_envstr and
        jube_wp_envlist.
        """
        create = jube2.parameter.Parameter.create_parameter
        jube_mode = jube2.parameter.JUBE_MODE
        parameterset = jube2.parameter.Parameterset()

        # workpackage id
        parameterset.add_parameter(
            create("jube_wp_id", str(self._id), parameter_type="int",
                   update_mode=jube_mode))
        # workpackage id with padding
        parameterset.add_parameter(
            create("jube_wp_padid", jube2.util.util.id_dir("", self._id),
                   parameter_type="string", update_mode=jube_mode))
        # workpackage status
        parameterset.add_parameter(
            create("jube_wp_status", self.status(), parameter_type="string",
                   update_mode=jube_mode))
        # workpackage iteration
        parameterset.add_parameter(
            create("jube_wp_iteration", str(self._iteration),
                   parameter_type="int", update_mode=jube_mode))
        # workpackage cycle
        parameterset.add_parameterset(self.get_jube_cycle_parameterset())

        # paths: the (unsubstituted) alternative work dir wins if set
        if self._step.alt_work_dir is None:
            path = self.work_dir
        else:
            path = self._step.alt_work_dir
        # workpackage relative folder path
        parameterset.add_parameter(
            create("jube_wp_relpath", path, update_mode=jube_mode,
                   eval_helper=self.create_relpath))
        # workpackage absolute folder path
        parameterset.add_parameter(
            create("jube_wp_abspath", path, update_mode=jube_mode,
                   eval_helper=self.create_abspath))

        # parent workpackage id
        for parent in self._parents:
            parameterset.add_parameter(
                create("jube_wp_parent_{0}_id".format(parent.step.name),
                       str(parent.id), parameter_type="int",
                       update_mode=jube_mode))

        # export parameter names, sorted case-insensitively
        parameter_names = sorted(
            (parameter.name for parameter in
             self._parameterset.export_parameter_dict.values()),
            key=str.lower)

        # environment export string
        env_str = "".join("export {0}=${0}\n".format(name)
                          for name in parameter_names)
        parameterset.add_parameter(
            create("jube_wp_envstr", env_str, no_templates=True,
                   update_mode=jube_mode,
                   eval_helper=jube2.parameter.StaticParameter
                   .fix_export_string))

        # environment export list
        parameterset.add_parameter(
            create("jube_wp_envlist", ",".join(parameter_names),
                   no_templates=True, update_mode=jube_mode))
        return parameterset

    def create_workpackage_dir(self):
        """Create workpackage and work directory and link parent work dirs

        Raises RuntimeError if the directory name still contains a "$"
        (unresolved suffix).
        """
        workpackage_dir = self.workpackage_dir
        if not os.path.exists(workpackage_dir):
            if "$" in workpackage_dir:
                raise RuntimeError(("'{0}' could not be evaluated and used " +
                                    "as a workpackage directory name. " +
                                    "Please check the suffix setting.")
                                   .format(workpackage_dir))
            os.mkdir(workpackage_dir)
            work_dir = self.work_dir
            os.mkdir(work_dir)

            # Create symbolic link to parent workpackage folder
            for parent in self._parents:
                link_path = os.path.join(work_dir, parent.step.name)
                parent_path = os.path.relpath(parent.work_dir, work_dir)
                if not os.path.exists(link_path):
                    os.symlink(parent_path, link_path)

    @property
    def workpackage_dir(self):
        """Return workpackage directory

        <bench_dir id dir>_<step name>[_<substituted suffix>]; if caching is
        enabled the first computed value is reused.
        """
        if self._workpackage_dir_caching_enabled and \
                self._workpackage_dir_cache is not None:
            return self._workpackage_dir_cache

        suffix = self.step.suffix
        if suffix != "":
            # Parameter substitution
            suffix = jube2.util.util.substitution(suffix,
                                                  self.parameter_dict)
            suffix = "_" + os.path.expandvars(os.path.expanduser(suffix))
        path = "{path}_{step_name}{suffix}".format(
            path=jube2.util.util.id_dir(self._benchmark.bench_dir, self._id),
            step_name=self._step.name,
            suffix=suffix)

        if self._workpackage_dir_caching_enabled:
            self._workpackage_dir_cache = path
        return path

    @property
    def work_dir(self):
        """Return working directory (user space)"""
        return os.path.join(self.workpackage_dir, "work")

    def alt_work_dir(self, parameter_dict=None):
        """Return location of alternative working_dir

        The step's alt_work_dir is substituted, user/env expanded and joined
        to the benchmark file_path_ref. Returns None if the step has no
        alternative working dir.
        """
        if self._step.alt_work_dir is None:
            return None
        if parameter_dict is None:
            parameter_dict = self.parameter_dict
        alt_work_dir = jube2.util.util.substitution(self._step.alt_work_dir,
                                                    parameter_dict)
        alt_work_dir = os.path.expandvars(os.path.expanduser(alt_work_dir))
        return os.path.join(self._benchmark.file_path_ref, alt_work_dir)

    def _run_operations(self, parameter, work_dir, pid=None):
        """Run all available operations

        parameter: parameter dict used for substitution
        work_dir: directory used for non-shared operations
        pid: worker id in a parallel run, otherwise None

        Returns (continue_op, continue_cycle): continue_op is False if an
        async operation is pending or a shared operation has to wait for
        other workpackages; continue_cycle is False if the loop cycle was
        interrupted.
        """
        continue_op = True
        continue_cycle = True
        do_log = jube2.step.DoLog(log_dir=os.path.dirname(self.work_dir),
                                  log_file=self.step._do_log_file,
                                  initial_env=self.env, cycle=self._cycle)
        for operation_number, operation in enumerate(self._step.operations):
            # Check if the operation is activated
            if not operation.active(parameter):
                self.operation_done(operation_number, True)
            # Do nothing, if the next operation is already finished.
            # Otherwise a removed async_file will result in a new
            # pending operation, if there are two async-operations in
            # a row
            elif not self.operation_done(operation_number + 1):
                if operation.shared:
                    # wait for all other workpackages and check if shared
                    # operation already finished
                    shared_done = False
                    for workpackage in \
                            self._benchmark.workpackages[self._step.name]:
                        # All workpackages must reach the same position in
                        # the program
                        if operation_number > 0:
                            continue_op = continue_op and \
                                ((workpackage.operation_done(
                                    operation_number - 1) and
                                  (not workpackage.operation_done_but_pending(
                                      operation_number - 1))) or
                                 workpackage.done) and \
                                workpackage.cycle == self._cycle
                        # Check if another workpackage already finalized
                        # the operation, only if the operation was active
                        # for this particular workpackage
                        shared_done = shared_done or \
                            ((workpackage.operation_done(
                                operation_number + 1) or
                              workpackage.done) and
                             operation.active(workpackage.parameter_dict))

                    # If a workpackage is removed and restarted, a shared
                    # operation will not be re-executed, user should be
                    # warned
                    if shared_done and not self.operation_done(
                            operation_number):
                        LOGGER.warning(
                            "\nShared operation in {0} was already executed".
                            format(self._step.name))

                    # All older workpackages in tree must be done
                    for step_name in self._step.get_depend_history(
                            self._benchmark):
                        for workpackage in \
                                self._benchmark.workpackages[step_name]:
                            continue_op = continue_op and workpackage.done

                    if continue_op and not shared_done:
                        # remove workpackage specific parameter
                        shared_parameter = dict(parameter)
                        for jube_parameter in \
                                self.get_jube_parameterset() \
                                .all_parameter_names:
                            shared_parameter.pop(jube_parameter, None)

                        # work_dir = shared_dir
                        shared_dir = self._step.shared_folder_path(
                            self._benchmark.bench_dir, shared_parameter)

                        LOGGER.debug("====== {0} - shared ======"
                                     .format(self._step.name))
                        continue_op, continue_cycle = operation.execute(
                            parameter_dict=shared_parameter,
                            work_dir=shared_dir,
                            environment=self._env,
                            only_check_pending=self.operation_done(
                                operation_number),
                            dolog=do_log)

                        # update all workpackages
                        for workpackage in \
                                self._benchmark.workpackages[self._step.name]:
                            # if the operation wasn't active in the shared
                            # operation it must not be triggered to
                            # restart
                            if operation.active(workpackage.parameter_dict):
                                if not workpackage.started:
                                    workpackage.create_workpackage_dir()
                                workpackage.operation_done(
                                    operation_number, True)
                                if continue_op and not continue_cycle:
                                    workpackage.done = True
                            # requeue other workpackages
                            if not workpackage.queued and continue_op:
                                self._benchmark.work_stat.put(workpackage)
                        LOGGER.debug("======================={0}"
                                     .format(len(self._step.name) * "="))
                else:
                    continue_op, continue_cycle = operation.execute(
                        parameter_dict=parameter, work_dir=work_dir,
                        environment=self._env,
                        only_check_pending=self.operation_done(
                            operation_number),
                        pid=pid, dolog=do_log)
                    self.operation_done(operation_number, True)

            if not continue_op or not continue_cycle:
                break

        return continue_op, continue_cycle

    @staticmethod
    def _parallel_log_name(log_fname, proc_id):
        """Return log_fname with a "_<proc_id>" suffix before its extension

        A name whose stem already ends with that suffix is returned
        unchanged, so repeated calls within one worker are idempotent.
        """
        root, ext = os.path.splitext(log_fname)
        suffix = "_{0}".format(proc_id)
        if root.endswith(suffix):
            return log_fname
        return root + suffix + ext

    def run(self, mode='s'):
        """Run step and use current parameter space

        mode: s = serial (default); p = parallel

        Returns a dict with "id" and "step_name"; after an actual run it
        also contains "env", "cycle" and "parameterset" (relevant for
        sending results back from a parallel worker).
        """
        proc_id = None
        # create individual log files for each processor in a parallel run
        if mode == "p":
            proc_id = mp.current_process()._identity[0]
            log_fname = os.path.basename(jube2.log.LOGFILE_NAME)
            jube2.log.change_logfile_name(os.path.join(
                self.benchmark.bench_dir,
                self._parallel_log_name(log_fname, proc_id)))

        # Workpackage already done or error?
        if self.done or self.error:
            # the return value is only relevant for the parallel case, for
            # now for the serial case the return value is not used at all
            return {"id": self._id, "step_name": self._step.name}

        continue_op = True
        continue_cycle = True

        while continue_cycle and continue_op:
            stepstr = ("{0} ( iter:{2} | id:{1} | parents:{3} | cycle:{4} "
                       "| procs:{5} )").format(
                           self._step.name, self._id, self._iteration,
                           ",".join([parent.step.name + "(" +
                                     str(parent.id) + ")"
                                     for parent in self._parents]),
                           self._cycle, self._step.procs)
            stepstr = "----- {0} -----".format(stepstr)
            LOGGER.debug(stepstr)

            # --- Check if this is the first run ---
            started_before = self.started

            # --- Create directory structure ---
            if not started_before:
                self.create_workpackage_dir()

            # --- Load environment of parent steps ---
            if not started_before:
                for parent in self._parents:
                    if parent.step.export:
                        self._env.update(parent.env)

            # --- Update JUBE parameter for new cycle ---
            if self._cycle > 0:
                self.parameterset.update_parameterset(
                    self.get_jube_cycle_parameterset())

            # --- Update cycle parameter ---
            update_parameter = \
                self.parameterset.get_updatable_parameter(
                    mode=jube2.parameter.CYCLE_MODE, keep_index=True)
            if len(update_parameter) > 0:
                fixed_parameterset = self.parameterset.copy()
                for par in update_parameter:
                    fixed_parameterset.delete_parameter(par)
                # substitute until no templates are left
                change = True
                while change:
                    change = False
                    update_parameter.parameter_substitution(
                        [fixed_parameterset])
                    if update_parameter.has_templates:
                        update_parameter = list(
                            update_parameter.expand_templates())[0]
                        change = True
                update_parameter.parameter_substitution(
                    [fixed_parameterset], final_sub=True)
                self.parameterset.update_parameterset(update_parameter)
                debugstr = " updated parameter:\n"
                debugstr += jube2.util.output.text_table(
                    [("parameter", "value")] + sorted(
                        [(par.name, par.value) for par in update_parameter]),
                    use_header_line=True, indent=9, align_right=False)
                LOGGER.debug(debugstr)

            # --- Collect parameter for substitution ---
            parameter = self.parameter_dict

            if not started_before:
                # --- Collect export parameter ---
                self._env.update(
                    {par.name: par.value for par in
                     self._parameterset.export_parameter_dict.values()})

                # --- Create shared folder connection ---
                if self._cycle == 0:
                    # NOTE(review): create_shared_folder_link is not defined
                    # in this class as visible here — confirm it exists.
                    self.create_shared_folder_link(parameter)

            # --- Create alternativ working dir ---
            alt_work_dir = self.alt_work_dir(parameter)
            if alt_work_dir is not None:
                # Check if given work directory contains any remaining
                # variable
                if re.search(jube2.parameter.Parameter.parameter_regex,
                             alt_work_dir):
                    raise IOError(("Given work directory {0} contains a " +
                                   "unknown JUBE or environment variable.")
                                  .format(alt_work_dir))
                LOGGER.debug(" switch to alternativ work dir: \"{0}\""
                             .format(alt_work_dir))
                if not jube2.conf.DEBUG_MODE and \
                        not os.path.exists(alt_work_dir):
                    try:
                        os.makedirs(alt_work_dir)
                    except FileExistsError:
                        # created concurrently by another workpackage
                        pass
                    # Get group_id if available (given by JUBE_GROUP_NAME)
                    group_id = jube2.util.util.check_and_get_group_id()
                    if group_id is not None:
                        os.chown(alt_work_dir, os.getuid(), group_id)
                        os.chmod(alt_work_dir,
                                 os.stat(alt_work_dir).st_mode | stat.S_ISGID)

            # Print debug info
            if self._cycle == 0:
                debugstr = " available parameter:\n"
                debugstr += jube2.util.output.text_table(
                    [("parameter", "value")] + sorted(parameter.items()),
                    use_header_line=True, indent=9, align_right=False)
                LOGGER.debug(debugstr)

            # --- Copy files to working dir or create links ---
            if not started_before:
                # Filter for filesets in uses
                fileset_names = self._step.get_used_sets(
                    self._benchmark.filesets, parameter)
                for name in fileset_names:
                    self._benchmark.filesets[name].create(
                        work_dir=self.work_dir, parameter_dict=parameter,
                        alt_work_dir=alt_work_dir, environment=self._env,
                        file_path_ref=self._benchmark.file_path_ref)

            work_dir = self.work_dir
            if alt_work_dir is not None:
                work_dir = alt_work_dir

            # --- File substitution ---
            if not started_before:
                # Filter for substitutionsets in uses
                substituteset_names = self._step.get_used_sets(
                    self._benchmark.substitutesets, parameter)
                for name in substituteset_names:
                    self._benchmark.substitutesets[name].substitute(
                        parameter_dict=parameter, work_dir=work_dir)

            try:
                # Run all operations
                # continue_op = false means -> async operation or wait for
                # others in shared operation
                # continue_cycle = false -> loop cycle was interrupted
                continue_op, continue_cycle = \
                    self._run_operations(parameter, work_dir, pid=proc_id)

                # --- Check cycle limit ---
                if self._cycle + 1 >= self._step.cycles:
                    continue_cycle = False

                if continue_op and continue_cycle:
                    # --- Prepare additional cycle if needed ---
                    self._cycle += 1
                    self._remove_operation_info_files()
                elif continue_op:
                    # --- Write information file to mark end of work ---
                    self.done = True

            except RuntimeError as e:
                self.set_error(True, str(e))
                continue_cycle = False
                if jube2.conf.EXIT_ON_ERROR:
                    raise RuntimeError(str(e))
                LOGGER.debug(
                    "{0}\n{1}\n{2}".format(40 * "-", str(e), 40 * "-"))

        # Delete parameters, which contain a method being
        # a function of a class. This avoids excessive memory
        # usage when the data is sent back to the main process.
        # It happens here, that these parameters are static and
        # therefore not changed within this workpackage execution.
        if mode == 'p':
            # collect first, deleting while iterating is not safe
            method_parameters = [
                p for p in self._parameterset.all_parameters
                if p.search_method(propertyString="eval_helper",
                                   recursiveProperty="based_on")]
            for p in method_parameters:
                self._parameterset.delete_parameter(p)

        return {"id": self._id, "step_name": self._step.name,
                "env": self._env, "cycle": self._cycle,
                "parameterset": self._parameterset}

    @staticmethod
    def reduce_workpackage_id_counter():
        """Decrement the class based id counter by one"""
        Workpackage.id_counter -= 1