diff --git a/src/dvsim/runtime/backend.py b/src/dvsim/runtime/backend.py index 1c6f355d..7f22203d 100644 --- a/src/dvsim/runtime/backend.py +++ b/src/dvsim/runtime/backend.py @@ -218,11 +218,21 @@ def _prepare_launch(self, job: JobSpec) -> None: self._make_job_output_directory(job) def _finish_job( - self, handle: JobHandle, exit_code: int, runtime: float | None + self, handle: JobHandle, exit_code: int | None, runtime: float | None ) -> tuple[JobStatus, JobStatusInfo | None]: """Determine the outcome of a job that ran to completion, and parse extra log info. Updates the handle with any extracted job runtime & simulation time info. + + Args: + handle: The handle to the job that finished. + exit_code: The exit code if the job finished gracefully, or None if it was terminated. + runtime: The amount of time taken to complete the job, if known. + + Returns: + A tuple containing the final job status and optionally an additional context/reason + object describing the job completion. + """ if handle.spec.dry_run: return JobStatus.PASSED, None @@ -241,6 +251,11 @@ def _finish_job( handle.simulated_time.set(*simulated_time.get()) # Determine the final status from the logs and exit code. + if exit_code is None: + return JobStatus.KILLED, JobStatusInfo( + message=f"Job timed out after {handle.spec.timeout_mins} minutes" + ) + status, reason = log_results.get_status_from_logs() if status is not None: return status, reason diff --git a/src/dvsim/runtime/local.py b/src/dvsim/runtime/local.py index 8d7a128f..e01e82fe 100644 --- a/src/dvsim/runtime/local.py +++ b/src/dvsim/runtime/local.py @@ -109,19 +109,17 @@ async def _monitor_job(self, handle: LocalJobHandle) -> None: ] status = JobStatus.KILLED reason = None + exit_code = None try: exit_code = await asyncio.wait_for( handle.process.wait(), timeout=handle.spec.timeout_secs ) - runtime = time.monotonic() - handle.start_time - status, reason = self._finish_job(handle, exit_code, runtime) except asyncio.TimeoutError: await self._kill_job(handle) - status = JobStatus.KILLED - timeout_message = f"Job timed out after {handle.spec.timeout_mins} minutes" - reason = JobStatusInfo(message=timeout_message) finally: + runtime = time.monotonic() - handle.start_time + # Explicitly cancel reader tasks and wait for them to finish before closing the log # file. We first give them a second to finish naturally to reduce log loss. with contextlib.suppress(asyncio.TimeoutError): @@ -131,6 +129,8 @@ async def _monitor_job(self, handle: LocalJobHandle) -> None: task.cancel() await asyncio.gather(*reader_tasks, return_exceptions=True) + status, reason = self._finish_job(handle, exit_code, runtime) + if handle.log_file: handle.log_file.close() if handle.kill_requested: @@ -322,7 +322,6 @@ async def _kill_job(self, handle: LocalJobHandle) -> None: return if proc.returncode is None: - handle.kill_requested = True try: self._send_kill_signal(proc, signal.SIGTERM) except ProcessLookupError: @@ -366,6 +365,7 @@ async def kill_many(self, handles: Iterable[JobHandle]) -> None: msg = f"Local backend expected handle of type LocalJobHandle, not `{type(handle)}`." raise TypeError(msg) if handle.process and not handle.kill_requested and handle.process.returncode is None: + handle.kill_requested = True tasks.append(asyncio.create_task(self._kill_job(handle))) if tasks: