diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py
index 2e118e0860..cd5b6e51df 100644
--- a/easybuild/tools/run.py
+++ b/easybuild/tools/run.py
@@ -46,6 +46,7 @@
 import subprocess
 import tempfile
+import threading
 import time
 from collections import namedtuple
 from datetime import datetime
 
@@ -356,11 +357,95 @@ def _answer_question(stdout, proc, qa_patterns, qa_wait_patterns):
     return match_found
 
 
+def _read_pipe(pipe, output):
+    """Helper function to read from a pipe and store output in a list.
+
+    Intended as thread target: reads lines until ``pipe.readline`` returns b'',
+    then appends everything that was read to ``output`` as a single bytes value.
+
+    :param pipe: pipe to read from
+    :param output: list to store output in
+    """
+    chunks = []
+    for line in iter(pipe.readline, b''):
+        _log.debug(f"Captured: {line.decode(errors='ignore').rstrip()}")
+        chunks.append(line)
+    # join once at the end, rather than growing a bytes value line by line
+    output.append(b''.join(chunks))
+
+
+def read_process_pipe(proc, pipe_name, start=None, timeout=None):
+    """Read from a pipe of a process using a separate thread to avoid blocking and implement a timeout.
+
+    If the timeout is exceeded (either before reading starts, or while reading),
+    an attempt is made to terminate the process before the error is raised.
+
+    :param proc: process to read from
+    :param pipe_name: name of the pipe to read from (stdout or stderr)
+    :param start: time when the process was started (used to calculate timeout)
+    :param timeout: timeout in seconds (default: None = no timeout)
+
+    :return: data read from pipe
+
+    :raises EasyBuildError: when reading from pipe takes longer than specified timeout
+    """
+    pipe = getattr(proc, pipe_name, None)
+    if pipe is None:
+        raise EasyBuildError(f"Pipe '{pipe_name}' not found in process '{proc}'. This is probably a bug.")
+
+    error_msg = "Unexpected timeout error during read_process_pipe"
+    current_timeout = None
+    if start is not None and timeout is not None:
+        # time that is left before the overall timeout for the command is exceeded
+        current_timeout = timeout - (time.time() - start)
+        error_msg = f"Timeout during `{proc.args}` after {timeout} seconds"
+
+    output = []
+    timed_out = current_timeout is not None and current_timeout <= 0
+    if not timed_out:
+        # daemon thread: a reader that is stuck on the pipe must never block interpreter exit
+        reader = threading.Thread(target=_read_pipe, args=(pipe, output), daemon=True)
+        reader.start()
+        reader.join(current_timeout)
+        timed_out = reader.is_alive()
+
+    if timed_out:
+        _log.warning(error_msg)
+        # do not leave the command running in the background after giving up on it
+        try:
+            terminate_process(proc)
+        except subprocess.TimeoutExpired as exc:
+            _log.warning(f"Failed to terminate process '{proc.args}': {exc}")
+        raise EasyBuildError(error_msg)
+
+    return output[0]
+
+
+def terminate_process(proc, timeout=20):
+    """
+    Terminate specified process (subprocess.Popen instance).
+    Attempt to terminate the process using proc.terminate(), and if that fails, use proc.kill().
+
+    :param proc: process to terminate
+    :param timeout: timeout in seconds to wait for process to terminate
+
+    :raises subprocess.TimeoutExpired: if process does not terminate within specified timeout
+    """
+    proc.terminate()
+    try:
+        proc.wait(timeout=timeout)
+    except subprocess.TimeoutExpired:
+        _log.warning(f"Process did not terminate after {timeout} seconds, sending SIGKILL")
+
+        proc.kill()
+        proc.wait(timeout=timeout)
+
+
 @run_shell_cmd_cache
 def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=None, hidden=False,
                   in_dry_run=False, verbose_dry_run=False, work_dir=None, use_bash=True,
                   output_file=True, stream_output=None, asynchronous=False, task_id=None, with_hooks=True,
-                  qa_patterns=None, qa_wait_patterns=None, qa_timeout=100):
+                  timeout=None, qa_patterns=None, qa_wait_patterns=None, qa_timeout=100):
     """
     Run specified (interactive) shell command, and capture output + exit code.
 
@@ -378,6 +463,7 @@ def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=N
     :param asynchronous: indicate that command is being run asynchronously
     :param task_id: task ID for specified shell command (included in return value)
     :param with_hooks: trigger pre/post run_shell_cmd hooks (if defined)
+    :param timeout: timeout in seconds for command execution
     :param qa_patterns: list of 2-tuples with patterns for questions + corresponding answers
     :param qa_wait_patterns: list of strings with patterns for non-questions
     :param qa_timeout: amount of seconds to wait until more output is produced when there is no matching question
@@ -524,16 +610,14 @@ def to_cmd_str(cmd):
         time_no_match = 0
         prev_stdout = ''
 
+        # collect output piece-wise, while checking for questions to answer (if qa_patterns is provided)
+        start = time.time()
         while exit_code is None:
-            # collect output line by line, while checking for questions to answer (if qa_patterns is provided)
-            for line in iter(proc.stdout.readline, b''):
-                _log.debug(f"Captured stdout: {line.decode(errors='ignore').rstrip()}")
-                stdout += line
+            stdout += read_process_pipe(proc, 'stdout', start=start, timeout=timeout)
 
             # note: we assume that there won't be any questions in stderr output
             if split_stderr:
-                for line in iter(proc.stderr.readline, b''):
-                    stderr += line
+                stderr += read_process_pipe(proc, 'stderr', start=start, timeout=timeout)
 
             if qa_patterns:
                 # only check for question patterns if additional output is available
@@ -565,7 +649,13 @@ def to_cmd_str(cmd):
         if split_stderr:
             stderr += proc.stderr.read() or b''
     else:
-        (stdout, stderr) = proc.communicate(input=stdin)
+        try:
+            (stdout, stderr) = proc.communicate(input=stdin, timeout=timeout)
+        except subprocess.TimeoutExpired:
+            error_msg = f"Timeout during `{cmd}` after {timeout} seconds"
+            _log.warning(error_msg)
+            terminate_process(proc)
+            raise EasyBuildError(error_msg)
 
     # return output as a regular string rather than a byte sequence (and non-UTF-8 characters get stripped out)
     # getpreferredencoding normally gives 'utf-8' but can be ASCII (ANSI_X3.4-1968)
diff --git a/test/framework/run.py b/test/framework/run.py
index 1c5cf3a422..1198d45a97 100644
--- a/test/framework/run.py
+++ b/test/framework/run.py
@@ -1726,6 +1726,41 @@ def test_run_shell_cmd_eof_stdin(self):
         self.assertEqual(res.exit_code, 0, "Non-streaming output: Command timed out")
         self.assertEqual(res.output, inp)
 
+    def test_run_shell_cmd_timeout(self):
+        """Test use of run_shell_cmd with a timeout."""
+        cmd = 'sleep 1; echo hello'
+        # Failure on process timeout
+        with self.mocked_stdout_stderr():
+            self.assertErrorRegex(
+                EasyBuildError, "Timeout during `.*` after .* seconds",
+                run_shell_cmd, cmd, timeout=.5
+            )
+
+        # Success
+        with self.mocked_stdout_stderr():
+            res = run_shell_cmd(cmd, timeout=3)
+        self.assertEqual(res.exit_code, 0)
+        self.assertEqual(res.output, "hello\n")
+
+    def test_run_shell_cmd_timeout_stream(self):
+        """Test use of run_shell_cmd with a timeout while streaming output."""
+        data = '0'*128
+        # Failure on process timeout
+        cmd = f'for i in {{1..20}}; do echo {data} && sleep 0.1; done'
+        with self.mocked_stdout_stderr():
+            self.assertErrorRegex(
+                EasyBuildError, "Timeout during `.*` after .* seconds",
+                run_shell_cmd, cmd, timeout=.5, stream_output=True
+            )
+
+        # Success
+        cmd = 'sleep .5 && echo hello'
+        with self.mocked_stdout_stderr():
+            res = run_shell_cmd(cmd, timeout=1.5, stream_output=True)
+
+        self.assertEqual(res.exit_code, 0)
+        self.assertEqual(res.output, "hello\n")
+
     def test_run_cmd_async(self):
         """Test asynchronously running of a shell command via run_cmd + complete_cmd."""
 