diff --git a/contrib/slurm-benchmark.py b/contrib/slurm-benchmark.py index d91e298b8..ae724665a 100755 --- a/contrib/slurm-benchmark.py +++ b/contrib/slurm-benchmark.py @@ -14,6 +14,8 @@ import os import sys +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + import benchexec.benchexec import benchexec.tools import benchexec.util @@ -43,6 +45,12 @@ def create_argument_parser(self): action="store_true", help="Use SLURM to execute benchmarks.", ) + slurm_args.add_argument( + "--slurm-array", + dest="slurm_array", + action="store_true", + help="Use SLURM array jobs to execute benchmarks.", + ) slurm_args.add_argument( "--singularity", dest="singularity", @@ -61,13 +69,65 @@ def create_argument_parser(self): dest="retry", type=int, default="0", - help="Retry killed jobs this many times. Use -1 for unbounded retry attempts.", + help="Retry killed jobs this many times. Use -1 for unbounded retry attempts (cannot be used with --slurm-array).", + ) + + slurm_args.add_argument( + "--aggregation-factor", + dest="aggregation_factor", + type=int, + default="10", + help="Aggregation factor for batch jobs (this many tasks will run in a single SLURM job).", + ) + slurm_args.add_argument( + "--batch-size", + dest="batch_size", + type=int, + default="5000", + help="Split run sets into batches of at most this size. 
Helpful in avoiding errors with script sizes.", + ) + slurm_args.add_argument( + "--parallelization", + dest="concurrency_factor", + type=int, + default="4", + help="Run this many tasks at once in one job.", + ) + slurm_args.add_argument( + "--overtime-factor", + dest="overtime_factor", + type=float, + default="1.1", + help="Factor which by to scale timelimits to overapproximate CPU time limit with walltime limit.", + ) + slurm_args.add_argument( + "--continue-interrupted", + dest="continue_interrupted", + action="store_true", + help="Continue a previously interrupted job.", + ) + slurm_args.add_argument( + "--copy-tool", + dest="copy_tool", + action="store_true", + help="Make a copy of the tool folder in the container.", + ) + slurm_args.add_argument( + "--generate-only", + dest="generate_only", + action="store_true", + help="Only generate the SLURM array description, don't run it.", ) return parser def load_executor(self): - if self.config.slurm: + if self.config.slurm_array: + from slurm import arrayexecutor as executor + elif self.config.slurm: + logging.error( + "Single-job-based SLURM-integration is no longer supported. Use --slurm-array instead." + ) from slurm import slurmexecutor as executor else: logging.warning( diff --git a/contrib/slurm/README-old.md b/contrib/slurm/README-old.md new file mode 100644 index 000000000..035c8ee44 --- /dev/null +++ b/contrib/slurm/README-old.md @@ -0,0 +1,94 @@ + +# BenchExec Extension for Benchmarking via SLURM + +> [!CAUTION] +> This, single-job-based SLURM integration is no longer maintained. For the maintained, array-based version's documentation, see [README.md](./README.md) + +This Python script extends BenchExec, a benchmarking framework, to facilitate benchmarking via SLURM, optionally using a Singularity container. + +In case of problems, please tag in an [issue](https://github.com/sosy-lab/benchexec/issues/new/choose): [Levente Bajczi](https://github.com/leventeBajczi) (@leventeBajczi). 
+ +## Preliminaries + +* [SLURM](https://slurm.schedmd.com/documentation.html) is an open-source job scheduling and workload management system used primarily in high-performance computing (HPC) environments. +* [Singularity](https://docs.sylabs.io/guides/latest/user-guide/) is a containerization platform designed for scientific and high-performance computing (HPC) workloads, providing users with a reproducible and portable environment for running applications and workflows. + +## Requirements + +* SLURM, tested with `slurm 22.05.7`, should work within `22.x.x` +* Singularity (optional), tested with `singularity-ce version 4.0.1`, should work within `4.x.x` + +## Usage +1. Run the script with Python 3: + ``` + python3 $BENCHEXEC_FOLDER/contrib/slurm-benchmark.py [options] + ``` + Options: + - `--slurm`: Use SLURM to execute benchmarks. Will revert to regular (local) benchexec if not given. + - `--singularity `: Specify the path to the Singularity .sif file to use. See usage later. + - `--scratchdir `: Specify the directory for temporary files. The script will use this parameter to create temporary directories for file storage per-run, which get discarded later. By default, this is the CWD, which might result in temporary files being generated by the thousands in the working directory. On some systems, this must be on the same mount, or even under the same hierarchy as the current directory. Must exist, be writable, and be a directory. + - `--retry-killed `: Retry killed jobs (e.g., due to SLURM errors) this many times. Use -1 for unbounded retry attempts. + - `-N `: Specify the factor of parallelism, i.e., how many instances to start at a time. Tested with up to `1000`, probably works with much higher values as well. + +## Overview of the Workflow + +This works similarly to BenchExec, however, instead of delegating each run to `runexec`, it delegates to `srun` from SLURM. + +1. 
If the `--singularity` option is given, the script wraps the command to run in a container. This is useful for dependency management (in most HPC environments, arbitrary package installations are frowned upon). For a simple container, use the following: + + ```singularity + BootStrap: docker + From: ubuntu:22.04 + + %post + apt -y update + apt -y install openjdk-17-jre-headless libgomp1 libmpfr-dev fuse-overlayfs + ``` + + Use `singularity build [--remote / --fakeroot] --fix-perms .sif .def` to build the container. + + Notice the `fuse-overlayfs` package. That is mandatory for the overlay filesystem to work properly. + + The script parameterizes `singularity exec` with the following params: + * `-B $PWD:/lower`: Bind the working directory to `/lower` (could be read-only) + * `--no-home`: Do not bind the home directory + * `-B {tempdir}:/overlay`: Bind the temporary directory to `/overlay` (must be writeable) + * `--fusemount "container:fuse-overlayfs -o lowerdir=/lower -o upperdir=/overlay/upper -o workdir=/overlay/work $HOME"`: mount an overlay filesystem at $HOME, where modifications go in the temp dir but files can be read from the current dir + + We also wrap this command inside the container using `bash -c "{command} && echo 0 > exitcode || echo $? > exitcode` to save the exitcode of the process, _and_ always have 0 as the exitcode of a completed run. Otherwise, we cannot differentiate between a FAILURE happening due to SLURM-issues (e.g., transport failures), or a simply failing command. Otherwise, retrying would not work. + +2. Currently, the following parameters are passed to `srun` (calculated from the benchmark's parameters): + * `-t ` CPU timelimit (generally, SLURM will round up to nearest minute) + * `-c ` number of cpus + * `--threads-per-core=1` only use one thread per core + * `--mem-per-cpu ` memory allocaiton in MBs per cpu + * `--ntasks=1` number of tasks per node + +3. 
The script parses the resulting job ID, and after the job finishes, runs `seff` to gather resource usage data: + * Exit code + * Status + * CPU time [s] + * Wall time [s] + * Memory [MB] + +## Limitations + +Currently, there are the following limitations compared to local benchexec: + +1. No advanced resource constraining / monitoring: only CPU time, CPU core and memory limits are handled, and only CPU time, wall time, and memory usage are monitored. +2. No exotic paths in the command are handled: only the current working directory and its children are visible in the container +3. The user on the host and the container should not differ (due to using $HOME in the commands). +4. Without singularity, no constraint is placed on the resulting files of the runs: this will populate the current directory with all the output files of all the runs. +5. For timed-out runs, where SLURM terminated the run, no CPU time values are available. +6. The executor only works with hyperthreading disabled, due to the inability to query nodes about the number of threads per core. Assuming it's always 2 is risky, as it may not hold true universally. Consequently, because we can only request whole cores from SLURM instead of threads, we must divide the requested number of threads by the threads-per-core value, which is unknown if hyperthreading could be enabled. +7. Cancelling a benchmark run (by sending SIGINT) could be delayed up to a few minutes depending on the SLURM configuration. \ No newline at end of file diff --git a/contrib/slurm/README.md b/contrib/slurm/README.md index dff49d3d1..69e51b513 100644 --- a/contrib/slurm/README.md +++ b/contrib/slurm/README.md @@ -11,7 +11,10 @@ SPDX-License-Identifier: Apache-2.0 --> # BenchExec Extension for Benchmarking via SLURM -This Python script extends BenchExec, a benchmarking framework, to facilitate benchmarking via SLURM, optionally using a Singularity container. 
+> [!IMPORTANT] +> The previous, single-job-based SLURM integration is no longer maintained. For its documentation, see [README-old.md](./README-old.md) + +This Python script extends BenchExec, a benchmarking framework, to facilitate benchmarking via SLURM array jobs using Singularity containers. In case of problems, please tag in an [issue](https://github.com/sosy-lab/benchexec/issues/new/choose): [Levente Bajczi](https://github.com/leventeBajczi) (@leventeBajczi). @@ -23,7 +26,8 @@ In case of problems, please tag in an [issue](https://github.com/sosy-lab/benche ## Requirements * SLURM, tested with `slurm 22.05.7`, should work within `22.x.x` -* Singularity (optional), tested with `singularity-ce version 4.0.1`, should work within `4.x.x` +* Singularity, tested with `singularity-ce version 4.0.1`, should work within `4.x.x` +* cgroup support is required ## Usage 1. Run the script with Python 3: @@ -31,61 +35,55 @@ In case of problems, please tag in an [issue](https://github.com/sosy-lab/benche python3 $BENCHEXEC_FOLDER/contrib/slurm-benchmark.py [options] ``` Options: - - `--slurm`: Use SLURM to execute benchmarks. Will revert to regular (local) benchexec if not given. + - `--slurm-array`: Use SLURM array jobs to execute benchmarks. Will revert to regular (local) benchexec if not given. - `--singularity `: Specify the path to the Singularity .sif file to use. See usage later. - `--scratchdir `: Specify the directory for temporary files. The script will use this parameter to create temporary directories for file storage per-run, which get discarded later. By default, this is the CWD, which might result in temporary files being generated by the thousands in the working directory. On some systems, this must be on the same mount, or even under the same hierarchy as the current directory. Must exist, be writable, and be a directory. - `--retry-killed `: Retry killed jobs (e.g., due to SLURM errors) this many times. Use -1 for unbounded retry attempts. 
- - `-N `: Specify the factor of parallelism, i.e., how many instances to start at a time. Tested with up to `1000`, probably works with much higher values as well. + - `-N `: Specify the factor of parallelism, i.e., how many jobs to submit at a time. Tested with up to `1000`, probably works with much higher values as well. + - `--aggregation-factor`: Put this many jobs into a single job of the array. + - `--batch-size`: Allow this many runs inside a runcollection to be submitted. Lower values might hurt responsiveness, higher values might cause problems with script sizes. Suggested size is around a few thousand. + - `--parallelization`: Execute this many jobs in parallel inside a job of the array. ## Overview of the Workflow -This works similarly to BenchExec, however, instead of delegating each run to `runexec`, it delegates to `srun` from SLURM. +This works similarly to BenchExec, however, instead of delegating each run directly to `runexec`, it creates a hierarchy of run infos and an array job description for SLURM, which is then executed using `sbatch`. `runexec` is still used to measure and limit resources. -1. If the `--singularity` option is given, the script wraps the command to run in a container. This is useful for dependency management (in most HPC environments, arbitrary package installations are frowned upon). For a simple container, use the following: +1. The script wraps the command to run in a container. This is useful for dependency management (in most HPC environments, arbitrary package installations are frowned upon). 
For a simple container, use the following: ```singularity - BootStrap: docker - From: ubuntu:22.04 - - %post - apt -y update - apt -y install openjdk-17-jre-headless libgomp1 libmpfr-dev fuse-overlayfs + BootStrap: docker + From: ubuntu:24.04 + + %post + apt -y update + apt -y install + apt -y install software-properties-common + add-apt-repository ppa:sosy-lab/benchmarking + apt -y install benchexec fuse-overlayfs + mkdir /work + mkdir /upper ``` - Use `singularity build [--remote / --fakeroot] --fix-perms .sif .def` to build the container. + Use `singularity build [--remote / --fakeroot] --fix-perms .sif .def` to build the container. A remote service (e.g., [sylabs](https://cloud.sylabs.io/builder)) may be used if root permissions are missing. - Notice the `fuse-overlayfs` package. That is mandatory for the overlay filesystem to work properly. + Notice the `fuse-overlayfs` and `benchexec` packages. That is mandatory for the overlay filesystem to work properly and for `runexec` to exist in the container. The script parameterizes `singularity exec` with the following params: - * `-B $PWD:/lower`: Bind the working directory to `/lower` (could be read-only) + * `-B "/sys/fs/cgroup:/sys/fs/cgroup"`: Bind the cgroup hierarchy for use inside the container + * `-B {basedir}`: Bind the "base directory" (directory of the .sif file) (can be read-only) + * `-B {workdir}:/lower`: Bind the current directory to `/lower` (can be read-only) * `--no-home`: Do not bind the home directory * `-B {tempdir}:/overlay`: Bind the temporary directory to `/overlay` (must be writeable) - * `--fusemount "container:fuse-overlayfs -o lowerdir=/lower -o upperdir=/overlay/upper -o workdir=/overlay/work $HOME"`: mount an overlay filesystem at $HOME, where modifications go in the temp dir but files can be read from the current dir - - We also wrap this command inside the container using `bash -c "{command} && echo 0 > exitcode || echo $? 
> exitcode` to save the exitcode of the process, _and_ always have 0 as the exitcode of a completed run. Otherwise, we cannot differentiate between a FAILURE happening due to SLURM-issues (e.g., transport failures), or a simply failing command. Otherwise, retrying would not work. + * `--fusemount "container:fuse-overlayfs -o lowerdir=/lower -o upperdir=/overlay/upper -o workdir=/overlay/work {workdir}"`: mount an overlay filesystem at {workdir} under {basedir}, where modifications go in the temp dir -2. Currently, the following parameters are passed to `srun` (calculated from the benchmark's parameters): - * `-t ` CPU timelimit (generally, SLURM will round up to nearest minute) - * `-c ` number of cpus - * `--threads-per-core=1` only use one thread per core - * `--mem-per-cpu ` memory allocaiton in MBs per cpu - * `--ntasks=1` number of tasks per node +2. A `--batch-size`-sized portion of the runs is organized into bins of size `--aggregation-factor`. Each bin will correspond to a job in the array. Inside each bin, `--parallelization`-many `runexec` instances can be started with exact resource allocations and usage reporting. Output files and output log are stored inside the temp dir. If an error is encountered (most commonly this is due to `fuse` locking up and causing a TIMEOUT without any logs being ready) the run is put into a second-chance queue to be run again, at most `--retry-killed` times. -3. The script parses the resulting job ID, and after the job finishes, runs `seff` to gather resource usage data: - * Exit code - * Status - * CPU time [s] - * Wall time [s] - * Memory [MB] +3. The script parses the resource usage and status of each run, as it would with regular `runexec`. ## Limitations Currently, there are the following limitations compared to local benchexec: -1. No advanced resource constraining / monitoring: only CPU time, CPU core and memory limits are handled, and only CPU time, wall time, and memory usage are monitored. -2. 
No exotic paths in the command are handled: only the current working directory and its children are visible in the container -3. The user on the host and the container should not differ (due to using $HOME in the commands). -4. Without singularity, no constraint is placed on the resulting files of the runs: this will populate the current directory with all the output files of all the runs. -5. For timed-out runs, where SLURM terminated the run, no CPU time values are available. -6. The executor only works with hyperthreading disabled, due to the inability to query nodes about the number of threads per core. Assuming it's always 2 is risky, as it may not hold true universally. Consequently, because we can only request whole cores from SLURM instead of threads, we must divide the requested number of threads by the threads-per-core value, which is unknown if hyperthreading could be enabled. -7. Cancelling a benchmark run (by sending SIGINT) could be delayed up to a few minutes depending on the SLURM configuration. \ No newline at end of file +1. No exotic paths in the command are handled: only the directory of the `.sif` file and its children are visible in the container. +1. The executor only works with hyperthreading disabled, due to the inability to query nodes about the number of threads per core. Assuming it's always 2 is risky, as it may not hold true universally. Consequently, because we can only request whole cores from SLURM instead of threads, we must divide the requested number of threads by the threads-per-core value, which is unknown if hyperthreading could be enabled. +1. `fuse` sometimes locks up (more precisely: is in an uninterruptible state) for the entire duration of a job. My guess is the underlying lustre file system does not like it when the same path is overlayed from hundreds of nodes at the same time. As a mitigation, we re-run timed out jobs (not runs!). 
\ No newline at end of file diff --git a/contrib/slurm/arrayexecutor.py b/contrib/slurm/arrayexecutor.py new file mode 100644 index 000000000..736dba2ab --- /dev/null +++ b/contrib/slurm/arrayexecutor.py @@ -0,0 +1,552 @@ +# This file is part of BenchExec, a framework for reliable benchmarking: +# https://github.com/sosy-lab/benchexec +# +# SPDX-FileCopyrightText: 2007-2020 Dirk Beyer +# SPDX-FileCopyrightText: 2024 Levente Bajczi +# SPDX-FileCopyrightText: Critical Systems Research Group +# SPDX-FileCopyrightText: Budapest University of Technology and Economics +# +# SPDX-License-Identifier: Apache-2.0 +import glob +import logging +import math +import os +import re +import shlex +import shutil +import subprocess +import sys +import tempfile +import time +import zipfile + +from benchexec import tooladapter +from benchexec.tablegenerator import parse_results_file +from benchexec.util import ProcessExitCode, relative_path +from contrib.slurm.utils import ( + version_in_container, + get_system_info_srun, +) + +sys.dont_write_bytecode = True # prevent creation of .pyc files + +STOPPED_BY_INTERRUPT = False +singularity = None + + +def init(config, benchmark): + global singularity + assert ( + benchmark.config.singularity + ), "Singularity is required for array-based SLURM jobs." 
+ singularity = benchmark.config.singularity + + tool_locator = tooladapter.create_tool_locator(config) + benchmark.tool.version = version_in_container(singularity, benchmark.tool_module) + benchmark.executable = benchmark.tool.executable(tool_locator) + try: + benchmark.tool_version = benchmark.tool.version(benchmark.executable) + except Exception as e: + logging.warning( + "could not determine version due to error: %s", + e, + ) + + +def get_system_info(): + return get_system_info_srun(singularity) + + +def execute_benchmark(benchmark, output_handler): + if benchmark.config.use_hyperthreading: + sys.exit( + "SLURM can only work properly without hyperthreading enabled, by passing the --no-hyperthreading option. See README.md for details." + ) + + if not benchmark.config.scratchdir: + sys.exit("No scratchdir present. Please specify using --scratchdir .") + elif not os.path.exists(benchmark.config.scratchdir): + os.makedirs(benchmark.config.scratchdir) + logging.debug(f"Created scratchdir: {benchmark.config.scratchdir}") + elif not os.path.isdir(benchmark.config.scratchdir): + sys.exit( + f"Scratchdir {benchmark.config.scratchdir} not a directory. Please specify using --scratchdir ." 
+ ) + + # First we execute the tests + runs = [] + for runSet in benchmark.run_sets: + if STOPPED_BY_INTERRUPT: + break + + if not runSet.should_be_executed(): + output_handler.output_for_skipping_run_set(runSet) + + elif not runSet.runs: + output_handler.output_for_skipping_run_set( + runSet, "because it has no files" + ) + + else: + output_handler.output_before_run_set(runSet) + if benchmark.config.continue_interrupted: + runs.extend(filter_previous_results(runSet, benchmark, output_handler)) + else: + runs.extend(runSet.runs) + + for i in range(0, len(runs), benchmark.config.batch_size): + if not STOPPED_BY_INTERRUPT: + chunk = runs[i : min(i + benchmark.config.batch_size, len(runs))] + execute_batch(chunk, benchmark, output_handler) + + # Second we set the outputs + for runSet in benchmark.run_sets: + if STOPPED_BY_INTERRUPT: + break + + if not runSet.should_be_executed(): + output_handler.output_for_skipping_run_set(runSet) + + elif not runSet.runs: + output_handler.output_for_skipping_run_set( + runSet, "because it has no files" + ) + + else: + output_handler.output_after_run_set(runSet) + + time.sleep(5) + + output_handler.output_after_benchmark(STOPPED_BY_INTERRUPT) + + +sbatch_pattern = re.compile(r"Submitted batch job (\d+)") + + +def filter_previous_results(run_set, benchmark, output_handler): + prefix_base = f"{benchmark.config.output_path}{benchmark.name}." + files = list( + filter( + lambda file: file != benchmark.log_zip, + glob.glob(f"{prefix_base}*.logfiles.zip"), + ) + ) + if files and len(files) > 0: + prefix = str(max(files, key=os.path.getmtime))[0 : -(len(".logfiles.zip"))] + else: + logging.warning("No logfile zip found. Giving up recovery.") + return run_set.runs + logging.info(f"Logfile zip found with prefix {prefix}. Attempting recovery.") + + logfile_zip = prefix + ".logfiles.zip" + file_zip = prefix + ".files.zip" + + if not os.path.isfile(file_zip): + logging.warning(f"No {file_zip} found. 
Giving up recovery.") + return run_set.runs + + with zipfile.ZipFile(logfile_zip, "r") as logfile_zip_ref: + + with zipfile.ZipFile(file_zip, "r") as file_zip_ref: + + xml_filename_base = prefix + ".results." + run_set.name + xml = xml_filename_base + ".xml" + xml_bz2 = xml_filename_base + ".xml.bz2" + if os.path.exists(xml): + result_file = xml + elif os.path.exists(xml_bz2): + result_file = xml_bz2 + else: + logging.warning( + ".xml or .xml.bz2 must exist for previous run. Giving up recovery." + ) + return run_set.runs + + previous_results = parse_results_file(result_file) + + old_version = previous_results.get("version") + new_version = benchmark.tool_version + if old_version != new_version: + logging.warning( + f"Mismatch in tool version: old version={old_version}, current version: {new_version}" + ) + return run_set.runs + + old_options = previous_results.get("options") + new_options = " ".join(run_set.options) + if old_options != new_options: + logging.warning( + f"Mismatch in tool options: old options='{old_options}', current options: '{new_options}'" + ) + return run_set.runs + + previous_runs = {} + for elem in previous_results: + if elem.tag == "run": + values = {} + for col in elem: + if col.tag == "column": + if "walltime" == col.get("title"): + values["walltime"] = float( + str(col.get("value"))[:-1] + ) # ends in 's' + elif "cputime" == col.get("title"): + values["cputime"] = float( + str(col.get("value"))[:-1] + ) # ends in 's' + elif "memory" == col.get("title"): + values["memory"] = int( + str(col.get("value"))[:-1] + ) # ends in 'B' + elif "returnvalue" == col.get("title"): + values["exitcode"] = ProcessExitCode.create( + value=int(col.get("value")) + ) + elif "exitsignal" == col.get("title"): + values["exitcode"] = ProcessExitCode.create( + signal=int(col.get("value")) + ) + elif "terminationreason" == col.get("title"): + values["terminationreason"] = col.get("value") + # I think 'name' and 'properties' are enough to uniquely identify runs, but 
this should probably be more extensible + if values != {}: + previous_runs[(elem.get("name"), elem.get("properties"))] = ( + values + ) + + missing_runs = [] + for run in run_set.runs: + props = " ".join(sorted([prop.name for prop in run.properties])) + name = relative_path(run.identifier, result_file) + key = (name, props) + if key in previous_runs: + old_log = str( + os.path.join( + str(os.path.basename(logfile_zip))[0 : -(len(".zip"))], + run_set.real_name + + "." + + os.path.basename(run.identifier) + + ".log", + ) + ) + if old_log in logfile_zip_ref.namelist(): + with logfile_zip_ref.open(old_log) as zipped_log, open( + run.log_file, "wb" + ) as target_log: + shutil.copyfileobj(zipped_log, target_log) + + old_files_prefix = ( + str( + os.path.join( + str(os.path.basename(file_zip))[0 : -(len(".zip"))], + run_set.real_name, + os.path.basename(run.identifier), + ) + ) + + "/" + ) + + files_in_zip = [ + f + for f in file_zip_ref.namelist() + if f.startswith(old_files_prefix) + ] + if files_in_zip and len(files_in_zip) > 0: + os.makedirs(run.result_files_folder, exist_ok=True) + for file_in_zip in files_in_zip: + if not file_in_zip.endswith("/"): + with file_zip_ref.open( + file_in_zip + ) as source_file, open( + os.path.join( + run.result_files_folder, + os.path.basename(file_in_zip), + ), + "wb", + ) as target_file: + shutil.copyfileobj(source_file, target_file) + + run.cmdline() # we need to call this, because it sets the _cmdline value + run.set_result(previous_runs[key]) + output_handler.output_after_run(run) + else: + logging.warning( + f"Old files directory {old_files_prefix} does not exist. Skipping run {name}." + ) + missing_runs.append(run) + else: + logging.warning( + f"Old log {old_log} does not exist. Skipping run {name}." + ) + missing_runs.append(run) + else: + logging.warning( + f"Run with key {key} not found in results. Skipping run {name}." 
+ ) + missing_runs.append(run) + + logging.info( + f"Successfully recovered {len(run_set.runs) - len(missing_runs)} runs, still missing {len(missing_runs)} more." + ) + return missing_runs + + +def execute_batch( + runs, + benchmark, + output_handler, + counter=0, +): + global STOPPED_BY_INTERRUPT + number_of_bins = int(len(runs) / benchmark.config.aggregation_factor) + 1 + + with tempfile.TemporaryDirectory(dir=benchmark.config.scratchdir, delete=not benchmark.config.generate_only) as tempdir: + batch_lines = ["#!/bin/bash"] + + for setting in get_resource_limits(benchmark, benchmark.config.concurrency_factor, math.ceil(benchmark.config.aggregation_factor * 1.0 / benchmark.config.concurrency_factor)): + batch_lines.extend(["\n#SBATCH " + str(setting)]) + + batch_lines.extend( + [f"\n#SBATCH --array=0-{number_of_bins - 1}%{benchmark.num_of_threads}"] + ) + batch_lines.extend(["\n\nTMPDIR=$(mktemp -d)\nchmod 777 $TMPDIR"]) + + bins = {} + # put all runs into a queue + for i, run in enumerate(runs): + if i % number_of_bins not in bins: + bins[i % number_of_bins] = [] + bins[i % number_of_bins].append((i, run)) + + batch_lines.extend(["\n\ncase $SLURM_ARRAY_TASK_ID in"]) + for bin in bins: + batch_lines.extend(["\n" + str(bin) + ") "]) + taskfile_name = f"bin{str(bin)}.tasks" + taskfile = os.path.join(tempdir, taskfile_name) + with open(taskfile, "w") as f: + task_lines = [] + for i, run in bins[bin]: + task_lines.extend( + [str( + get_run_cli( + benchmark, + run.cmdline(), + os.path.join(tempdir, str(i)), + ) + )+ "\n" + ] + ) + f.writelines(task_lines) + batch_lines.extend( + f'\n while read -r x; do /bin/sh -c "$x" & done < {taskfile}' + ) + batch_lines.extend("\n wait") + batch_lines.extend(["\n;;"]) + + batch_lines.extend(["\nesac"]) + + batchfile = os.path.join(tempdir, "array.sbatch") + with open(batchfile, "w") as f: + f.writelines(batch_lines) + + if benchmark.config.generate_only: + return + + try: + sbatch_cmd = ["sbatch", "--wait", str(batchfile)] + 
logging.debug("Command to run: %s", shlex.join(sbatch_cmd)) + sbatch_result = subprocess.run( + sbatch_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + except KeyboardInterrupt: + STOPPED_BY_INTERRUPT = True + + if STOPPED_BY_INTERRUPT: + logging.debug("Canceling sbatch job if already started") + if sbatch_result and sbatch_result.stdout: + for line in sbatch_result.stdout.splitlines(): + jobid_match = sbatch_pattern.search(str(line)) + if jobid_match: + jobid = int(jobid_match.group(1)) + logging.debug(f"Canceling sbatch job #{jobid}") + subprocess.run(["scancel", str(jobid)]) + + time.sleep(5) + + missing_runs = [] + success_runs = [] + for bin in bins: + for i, run in bins[bin]: + try: + result = get_run_result( + os.path.join(tempdir, str(i)), + run, + benchmark.result_files_patterns + + ["*witness*"], # e.g., deagle uses mismatched naming + ) + success_runs.append((run, result)) + except Exception as e: + logging.warning("could not set result due to error: %s", e) + if counter < benchmark.config.retry or benchmark.config.retry < 0: + missing_runs.append(run) + else: + if not STOPPED_BY_INTERRUPT: + logging.debug("preserving log(s) due to error with run") + for file in glob.glob(f"{tempdir}/logs/*_{bin}.out"): + os.makedirs( + benchmark.result_files_folder, exist_ok=True + ) + shutil.copy( + file, + os.path.join( + benchmark.result_files_folder, + os.path.basename(file) + ".error", + ), + ) + + time.sleep(10) + + for run, result in success_runs: + try: + run.set_result(result) + output_handler.output_after_run(run) + except Exception as e: + logging.warning( + "could not set result due to error, and won't retry: %s", e + ) + + if len(missing_runs) > 0 and not STOPPED_BY_INTERRUPT: + logging.info( + f"Retrying {len(missing_runs)} runs due to errors. 
Current retry count for this batch: {counter}" + ) + execute_batch(missing_runs, benchmark, output_handler, counter + 1) + + +def stop(): + global STOPPED_BY_INTERRUPT + STOPPED_BY_INTERRUPT = True + + +def get_resource_limits(benchmark, parallel_factor=1, sequential_factor=1): + timelimit = int( + max( + int(benchmark.rlimits.cputime if benchmark.rlimits.cputime else 0), + int(benchmark.rlimits.walltime if benchmark.rlimits.walltime else 0), + int( + benchmark.rlimits.cputime_hard if benchmark.rlimits.cputime_hard else 0 + ), + ) * benchmark.config.overtime_factor * sequential_factor + ) + assert timelimit > 0, "Either cputime, cputime_hard, or walltime should be given." + cpus = benchmark.rlimits.cpu_cores * parallel_factor + memory = ( + benchmark.rlimits.memory * parallel_factor + ) + + srun_timelimit_h = int(timelimit / 3600) + srun_timelimit_m = int((timelimit % 3600) / 60) + srun_timelimit_s = int(timelimit % 60) + srun_timelimit = ( + f"{srun_timelimit_h:02d}:{srun_timelimit_m:02d}:{srun_timelimit_s:02d}" + ) + + ret = [ + "--time=" + str(srun_timelimit), + "--cpus-per-task=" + str(cpus), + "--mem=" + str(int(memory / 1000000)) + "M", + "--threads-per-core=1", # --use_hyperthreading=False is always given here + "--mincpus=" + str(cpus), + "--ntasks=1", + ] + return ret + + +def get_run_cli(benchmark, args, resultdir): + os.makedirs(resultdir) + cli = [] + basedir = os.path.abspath(os.path.dirname(singularity)) + + base_cmd = ["srun", "--exclusive", *get_resource_limits(benchmark, 1, 1)] + + base_cmd.extend( + [ + "singularity", + "exec", + "-B", + "/sys/fs/cgroup:/sys/fs/cgroup:ro", + "-B", + f"{basedir}:/lower:ro", + "-B", + f"{resultdir}:/results:rw", + "--no-home", + "--fusemount", + f"container:fuse-overlayfs -o lowerdir=/lower -o upperdir=/tmp -o workdir=/tmp {basedir}", + singularity, + ] + ) + base_cmd.extend( + [ + "bash", # bash is needed for ${PIPESTATUS[0]} + "-c", + f"cd {os.getcwd()}; " + "start=$(date +%s.%N); " + "CG_CPU=$(awk -F: \"$2 ~ 
/cpu/ {print $3;exit}\" /proc/self/cgroup); " + "CG_MEM=$(awk -F: \"$2 ~ /memory/ {print $3;exit}\" /proc/self/cgroup); " + "BASE_CPU=\"/sys/fs/cgroup/cpu$CG_CPU\"; " + "BASE_MEM=\"/sys/fs/cgroup/memory$CG_MEM\"; " + "before_cpu=$(cat $BASE_CPU/cpuacct.usage 2>/dev/null); " + f"{shlex.join(['echo', 'Running command: ', *args])}; " + f"{shlex.join(args)} 2>&1 | tee /results/log; " + "rv=${PIPESTATUS[0]}; " + "end=$(date +%s.%N); " + "after_cpu=$(cat $BASE_CPU/cpuacct.usage 2>/dev/null); " + "mem=$(cat $BASE_MEM/memory.max_usage_in_bytes 2>/dev/null || cat $BASE_MEM/memory.usage_in_bytes 2>/dev/null); " + "awk -v start=\"$start\" -v end=\"$end\" -v before_cpu=\"$before_cpu\" -v after_cpu=\"$after_cpu\" -v mem=\"$mem\" -v rv=\"$rv\" \"BEGIN { walltime=end-start;cputime=(after_cpu-before_cpu)/1e9; printf \\\"walltime=%.3fs\\ncputime=%.3fs\\nmemory=%dB\\nreturnvalue=%d\\n\\\", walltime, cputime, mem, rv }\"' >/results/output.log", + ] + ) + + cli = shlex.join(base_cmd) + logging.debug("Command to run: %s", cli) + + return cli + + +def get_run_result(tempdir, run, result_files_patterns): + runexec_log = f"{tempdir}/log" + tmp_log = f"{tempdir}/output.log" + + data_dict = {} + with open(runexec_log, "r") as file: + for line in file: + line = line.strip() + if line and "=" in line: + key, value = line.split("=", 1) + data_dict[key.strip()] = value.strip() + + ret = {} + if "walltime" in data_dict: + ret["walltime"] = float(data_dict["walltime"][:-1]) # ends in 's' + if "cputime" in data_dict: + ret["cputime"] = float(data_dict["cputime"][:-1]) # ends in 's' + if "memory" in data_dict: + ret["memory"] = int(data_dict["memory"][:-1]) # ends in 'B' + if "returnvalue" in data_dict: + ret["exitcode"] = ProcessExitCode.create(value=int(data_dict["returnvalue"])) + if "exitsignal" in data_dict: + ret["exitcode"] = ProcessExitCode.create(signal=int(data_dict["exitsignal"])) + if "terminationreason" in data_dict: + ret["terminationreason"] = data_dict["terminationreason"] + + 
shutil.copy(tmp_log, run.log_file) + + if os.path.exists(tempdir): + os.makedirs(run.result_files_folder, exist_ok=True) + for result_files_pattern in result_files_patterns: + for file_name in glob.glob(f"{tempdir}/{result_files_pattern}"): + if os.path.isfile(file_name): + shutil.copy(file_name, run.result_files_folder) + shutil.rmtree(tempdir) + + return ret diff --git a/contrib/slurm/utils.py b/contrib/slurm/utils.py new file mode 100644 index 000000000..ffd897506 --- /dev/null +++ b/contrib/slurm/utils.py @@ -0,0 +1,96 @@ +# This file is part of BenchExec, a framework for reliable benchmarking: +# https://github.com/sosy-lab/benchexec +# +# SPDX-FileCopyrightText: 2024 Levente Bajczi +# SPDX-FileCopyrightText: Critical Systems Research Group +# SPDX-FileCopyrightText: Budapest University of Technology and Economics +# +# SPDX-License-Identifier: Apache-2.0 +import functools +import json +import logging +import subprocess + +from benchexec.systeminfo import SystemInfo + + +def version_in_container(singularity, tool_module): + version_printer = f"""from benchexec import tooladapter +from benchexec.model import load_tool_info +class Config(): + pass + +config = Config() +config.container = False +config.tool_directory = "." 
+locator = tooladapter.create_tool_locator(config)
+tool = load_tool_info("{tool_module}", config)[1]
+executable = tool.executable(locator)
+print(tool.version(executable))"""
+
+    @functools.lru_cache()
+    def version_from_tool_in_container(executable):
+        try:
+            with open(".get_version.py", "w") as script:
+                script.write(version_printer)
+            process = subprocess.run(
+                [
+                    "singularity",
+                    "exec",
+                    singularity,
+                    "python3",
+                    ".get_version.py",
+                ],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.DEVNULL,
+                stdin=subprocess.DEVNULL,
+                universal_newlines=True,
+            )
+            if process.stdout:
+                return process.stdout.strip()
+
+        except Exception as e:
+            logging.warning(
+                "could not determine version (in container) due to error: %s", e
+            )
+        return ""
+
+    return version_from_tool_in_container
+
+
+def get_system_info_srun(singularity):
+    try:
+        process = subprocess.run(
+            [
+                "srun",
+                "-t",
+                "1",
+                "singularity",
+                "exec",
+                singularity,
+                "python3",
+                "-c",
+                "import benchexec.systeminfo; "
+                "import json; "
+                "print(json.dumps(benchexec.systeminfo.SystemInfo().__dict__))",
+            ],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.DEVNULL,
+            stdin=subprocess.DEVNULL,
+            universal_newlines=True,
+        )
+        if process.stdout:
+            actual_sysinfo = json.loads(process.stdout.strip())
+            blank_sysinfo = SystemInfo()
+            blank_sysinfo.hostname = str(actual_sysinfo["hostname"]) + " (sample)"
+            blank_sysinfo.os = actual_sysinfo["os"]
+            blank_sysinfo.cpu_max_frequency = actual_sysinfo["cpu_max_frequency"]
+            blank_sysinfo.cpu_number_of_cores = actual_sysinfo["cpu_number_of_cores"]
+            blank_sysinfo.cpu_model = actual_sysinfo["cpu_model"]
+            blank_sysinfo.cpu_turboboost = actual_sysinfo["cpu_turboboost"]
+            blank_sysinfo.memory = actual_sysinfo["memory"]
+            return blank_sysinfo
+
+    except Exception as e:
+        logging.warning("could not determine system info due to error: %s", e)
+    return None