Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 148 additions & 9 deletions newton/tests/thirdparty/unittest_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tempfile
import time
import unittest
from concurrent.futures.process import BrokenProcessPool # NVIDIA Modification
from contextlib import contextmanager
from io import StringIO

Expand All @@ -40,6 +41,7 @@

# The following variables are NVIDIA Modifications
START_DIRECTORY = os.path.dirname(__file__) # The directory to start test discovery
_SUITE_TIMEOUT = 3600 # Wall-clock limit for parallel execution / per-suite limit in isolated fallback


def main(argv=None):
Expand Down Expand Up @@ -230,15 +232,119 @@ def main(argv=None):
test_manager = ParallelTestManager(manager, args, temp_dir)
results = pool.map(test_manager.run_tests, test_suites)
else:
# NVIDIA Modification added concurrent.futures
with concurrent.futures.ProcessPoolExecutor(
max_workers=process_count,
mp_context=multiprocessing.get_context(method="spawn"),
initializer=initialize_test_process,
initargs=(manager.Lock(), shared_index, args, temp_dir),
) as executor:
test_manager = ParallelTestManager(manager, args, temp_dir)
results = list(executor.map(test_manager.run_tests, test_suites, timeout=2400))
# NVIDIA Modification: concurrent.futures with crash handling
# and per-suite isolated fallback (see #1847).
results = []
parallel_failed = False
parallel_fail_reason = "unknown"

try:
with concurrent.futures.ProcessPoolExecutor(
max_workers=process_count,
mp_context=multiprocessing.get_context(method="spawn"),
initializer=initialize_test_process,
initargs=(manager.Lock(), shared_index, args, temp_dir),
) as executor:
test_manager = ParallelTestManager(manager, args, temp_dir)
for result in executor.map(test_manager.run_tests, test_suites, timeout=_SUITE_TIMEOUT):
results.append(result)

except TimeoutError:
pending_index = len(results)
total = len(test_suites)
suite_name = _get_suite_name(test_suites[pending_index]) if pending_index < total else "unknown"
print(
f"Warning: Parallel execution timed out (total timeout={_SUITE_TIMEOUT}s). "
f"Next pending result was suite "
f"{pending_index + 1}/{total} ({suite_name}), "
f"but a different suite may be the actual blocker. "
f"Switching to isolated single-process fallback.",
file=sys.stderr,
)
parallel_failed = True
parallel_fail_reason = "timed out"
Comment on lines +241 to +265
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

In Python's concurrent.futures documentation, does Executor.map(timeout=...) apply the timeout per item, or from the original Executor.map() call?

💡 Result:

Web search failed: Server error: no LLM provider could handle the message


🌐 Web query:

Python concurrent.futures Executor.map timeout semantics per item vs total duration

💡 Result:

Executor.map(..., timeout=T) is a single overall deadline, not “T seconds per item”.

  • Python sets a deadline at the moment you call Executor.map(). The iterator it returns will raise TimeoutError when you call __next__() and the next result isn’t ready by T seconds after that original map() call. So each next() effectively only gets the remaining time until the same global deadline. [1]
  • If you never iterate the returned iterator (e.g., you don’t loop it, list() it, etc.), then __next__() is never called and no timeout is triggered. [2]
  • This timeout does not cancel/stop running work; it only stops waiting for (ordered) results. Also, because map() yields results in input order, one slow early item can block yielding later items even if they’re already finished. [1]

If you want per-item timeouts, use submit() and then future.result(timeout=...) per future (or manage futures with as_completed()), rather than relying on Executor.map(timeout=...). [1]

Sources: Python docs for Executor.map() semantics [1]; discussion/example showing timeout only occurs during iteration [2].


🏁 Script executed:

# First, let's examine the actual code in the file to understand context
cat -n newton/tests/thirdparty/unittest_parallel.py | sed -n '240,270p'

Repository: newton-physics/newton

Length of output: 2127


🏁 Script executed:

# Let's also check if there are any comments in the code about timeout intent
rg -A 5 -B 5 "executor.map.*timeout" newton/tests/thirdparty/unittest_parallel.py

Repository: newton-physics/newton

Length of output: 741


🏁 Script executed:

# Check the definition of _SUITE_TIMEOUT to understand the intent
rg "_SUITE_TIMEOUT" newton/tests/thirdparty/unittest_parallel.py

Repository: newton-physics/newton

Length of output: 684


The timeout comment and warning message are misleading about the timeout scope.

Line 249's executor.map(timeout=_SUITE_TIMEOUT) applies a wall-clock deadline to the entire parallel phase (not per-suite), which matches the _SUITE_TIMEOUT comment "Wall-clock limit for parallel execution". However, the warning message at line 257 states "Next pending result was suite ... but a different suite may be the actual blocker," which is inconsistent with wall-clock timeout semantics—under a global deadline, which suite is slow doesn't matter; the entire parallel phase has exhausted its time budget. If the intent is truly a per-suite wall-clock cap for parallel execution (to match the isolated fallback's per-suite behavior), clarify the code comment and revise the warning message to reflect that. Otherwise, update the warning to remove the misleading reference to specific suites being blockers when the timeout is global.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@newton/tests/thirdparty/unittest_parallel.py` around lines 241 - 265, The
warning message incorrectly implies a per-suite blocker even though
executor.map(timeout=_SUITE_TIMEOUT) enforces a wall-clock limit on the whole
parallel phase; update the code to either implement a true per-suite timeout
(submit each suite via executor.submit and enforce per-future timeouts with
as_completed/future.result(timeout=_SUITE_TIMEOUT)) or, more simply, change the
warning text printed in the except TimeoutError block (the print that references
pending_index and suite_name) to remove the misleading “different suite may be
the actual blocker” wording and state that the parallel phase hit the global
wall-clock timeout; refer to executor.map, _SUITE_TIMEOUT,
ParallelTestManager.run_tests, pending_index and suite_name to locate the
relevant code to change.

except BrokenProcessPool:
print(
"Warning: Process pool broken during parallel execution. "
"Switching to isolated single-process fallback.",
file=sys.stderr,
)
parallel_failed = True
parallel_fail_reason = "process pool broken"
except Exception as e:
print(
f"Warning: Process pool error: {e}. Switching to isolated single-process fallback.",
file=sys.stderr,
)
parallel_failed = True
parallel_fail_reason = str(e)

# Fallback to isolated single-process execution if parallel failed.
# Skip fallback in CI/CD environments to respect job timeouts.
in_ci = os.environ.get("CI") or os.environ.get("GITHUB_ACTIONS") or os.environ.get("GITLAB_CI")
if parallel_failed and in_ci:
parser.exit(
status=1,
message=f"Error: Parallel execution failed ({parallel_fail_reason}) "
f"in CI/CD environment. Skipping single-process fallback "
f"due to job timeout constraints.\n",
)
elif parallel_failed:
print(
"Running all tests in isolated single-process mode...",
file=sys.stderr,
)
results = []
for i, suite in enumerate(test_suites):
try:
with concurrent.futures.ProcessPoolExecutor(
max_workers=1,
mp_context=multiprocessing.get_context(method="spawn"),
initializer=initialize_test_process,
initargs=(manager.Lock(), shared_index, args, temp_dir),
) as executor:
test_manager = ParallelTestManager(manager, args, temp_dir)
future = executor.submit(test_manager.run_tests, suite)
try:
result = future.result(timeout=_SUITE_TIMEOUT)
results.append(result)
except TimeoutError:
suite_name = _get_suite_name(suite)
print(
f"Warning: Isolated test suite {i + 1}/{len(test_suites)} "
f"({suite_name}) timed out (timeout={_SUITE_TIMEOUT}s). "
f"Marking tests as crashed.",
file=sys.stderr,
)
results.append(
create_crash_result(
suite,
reason=f"Process timed out (timeout={_SUITE_TIMEOUT}s)",
)
)
except BrokenProcessPool:
print(
f"Warning: Process crashed or was terminated unexpectedly "
f"in isolated execution for test suite "
f"{i + 1}/{len(test_suites)}. Marking tests as crashed.",
file=sys.stderr,
)
results.append(create_crash_result(suite))
except Exception as e:
print(
f"Warning: Error in isolated test suite "
f"{i + 1}/{len(test_suites)}: {e}. "
f"Marking tests as crashed.",
file=sys.stderr,
)
results.append(create_crash_result(suite))
except Exception as e:
print(
f"Warning: Failed to create isolated process for test suite "
f"{i + 1}/{len(test_suites)}: {e}. Marking tests as crashed.",
file=sys.stderr,
)
results.append(create_crash_result(suite))
else:
# This entire path is an NVIDIA Modification

Expand Down Expand Up @@ -411,6 +517,39 @@ def _iter_test_cases(test_suite):
yield from _iter_test_cases(suite)


def _get_suite_name(test_suite):
    """Return a human-readable name for a test suite (e.g. 'TestTileMatmul').

    This entire function is an NVIDIA modification.
    """
    # Pull the first test case out of the (possibly nested) suite; an empty
    # suite yields nothing, in which case we fall back to a placeholder.
    first = next(_iter_test_cases(test_suite), None)
    if first is None:
        return "unknown"
    return type(first).__name__


def create_crash_result(test_suite, reason="Process crashed or was terminated unexpectedly"):
    """Create a result indicating the process failed while running this test suite.

    This entire function is an NVIDIA modification.
    """
    # Build one synthetic error entry per test case, formatted the same way
    # unittest's text runner renders failures so downstream reporting stays
    # uniform with genuine test errors.
    crash_errors = [
        "\n".join(
            [
                unittest.TextTestResult.separator1,
                str(test),
                unittest.TextTestResult.separator2,
                f"{reason} while running this test suite (unknown which test caused the failure): {test}",
            ]
        )
        for test in _iter_test_cases(test_suite)
    ]

    # Same format as run_tests: (test_count, errors, failures, skipped, expected_failures, unexpected_successes, test_records)
    return (test_suite.countTestCases(), crash_errors, [], 0, 0, 0, [])


class ParallelTestManager:
# Manager proxy calls can fail with ConnectionError, TypeError, or OSError
# due to a TOCTOU race in Connection.send() where GC can close the
Expand Down
Loading