Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/dvsim/runtime/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,21 @@ def _prepare_launch(self, job: JobSpec) -> None:
self._make_job_output_directory(job)

def _finish_job(
self, handle: JobHandle, exit_code: int, runtime: float | None
self, handle: JobHandle, exit_code: int | None, runtime: float | None
) -> tuple[JobStatus, JobStatusInfo | None]:
"""Determine the outcome of a job that ran to completion, and parse extra log info.

Updates the handle with any extracted job runtime & simulation time info.

Args:
handle: The handle to the job that finished.
exit_code: The exit code if the job finished gracefully, or None if it was terminated
before exiting (this case is reported as a timeout).
runtime: The amount of time taken to complete the job, if known.

Returns:
A tuple containing the final job status and optionally an additional context/reason
object describing the job completion.

"""
if handle.spec.dry_run:
return JobStatus.PASSED, None
Expand All @@ -241,6 +251,11 @@ def _finish_job(
handle.simulated_time.set(*simulated_time.get())

# Determine the final status from the logs and exit code.
if exit_code is None:
return JobStatus.KILLED, JobStatusInfo(
message=f"Job timed out after {handle.spec.timeout_mins} minutes"
)

status, reason = log_results.get_status_from_logs()
if status is not None:
return status, reason
Expand Down
12 changes: 6 additions & 6 deletions src/dvsim/runtime/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,17 @@ async def _monitor_job(self, handle: LocalJobHandle) -> None:
]
status = JobStatus.KILLED
reason = None
exit_code = None

try:
exit_code = await asyncio.wait_for(
handle.process.wait(), timeout=handle.spec.timeout_secs
)
runtime = time.monotonic() - handle.start_time
status, reason = self._finish_job(handle, exit_code, runtime)
except asyncio.TimeoutError:
await self._kill_job(handle)
status = JobStatus.KILLED
timeout_message = f"Job timed out after {handle.spec.timeout_mins} minutes"
reason = JobStatusInfo(message=timeout_message)
finally:
runtime = time.monotonic() - handle.start_time

# Explicitly cancel reader tasks and wait for them to finish before closing the log
# file. We first give them a second to finish naturally to reduce log loss.
with contextlib.suppress(asyncio.TimeoutError):
Expand All @@ -131,6 +129,8 @@ async def _monitor_job(self, handle: LocalJobHandle) -> None:
task.cancel()
await asyncio.gather(*reader_tasks, return_exceptions=True)

status, reason = self._finish_job(handle, exit_code, runtime)

if handle.log_file:
handle.log_file.close()
if handle.kill_requested:
Expand Down Expand Up @@ -322,7 +322,6 @@ async def _kill_job(self, handle: LocalJobHandle) -> None:
return

if proc.returncode is None:
handle.kill_requested = True
try:
self._send_kill_signal(proc, signal.SIGTERM)
except ProcessLookupError:
Expand Down Expand Up @@ -366,6 +365,7 @@ async def kill_many(self, handles: Iterable[JobHandle]) -> None:
msg = f"Local backend expected handle of type LocalJobHandle, not `{type(handle)}`."
raise TypeError(msg)
if handle.process and not handle.kill_requested and handle.process.returncode is None:
handle.kill_requested = True
tasks.append(asyncio.create_task(self._kill_job(handle)))

if tasks:
Expand Down
Loading