diff --git a/newton/tests/thirdparty/unittest_parallel.py b/newton/tests/thirdparty/unittest_parallel.py index 2bb635823c..37b8333d55 100644 --- a/newton/tests/thirdparty/unittest_parallel.py +++ b/newton/tests/thirdparty/unittest_parallel.py @@ -16,6 +16,7 @@ import tempfile import time import unittest +from concurrent.futures.process import BrokenProcessPool # NVIDIA Modification from contextlib import contextmanager from io import StringIO @@ -40,6 +41,7 @@ # The following variables are NVIDIA Modifications START_DIRECTORY = os.path.dirname(__file__) # The directory to start test discovery +_SUITE_TIMEOUT = 3600 # Wall-clock limit for parallel execution / per-suite limit in isolated fallback def main(argv=None): @@ -230,15 +232,119 @@ def main(argv=None): test_manager = ParallelTestManager(manager, args, temp_dir) results = pool.map(test_manager.run_tests, test_suites) else: - # NVIDIA Modification added concurrent.futures - with concurrent.futures.ProcessPoolExecutor( - max_workers=process_count, - mp_context=multiprocessing.get_context(method="spawn"), - initializer=initialize_test_process, - initargs=(manager.Lock(), shared_index, args, temp_dir), - ) as executor: - test_manager = ParallelTestManager(manager, args, temp_dir) - results = list(executor.map(test_manager.run_tests, test_suites, timeout=2400)) + # NVIDIA Modification: concurrent.futures with crash handling + # and per-suite isolated fallback (see #1847). + results = [] + parallel_failed = False + parallel_fail_reason = "unknown" + + try: + with concurrent.futures.ProcessPoolExecutor( + max_workers=process_count, + mp_context=multiprocessing.get_context(method="spawn"), + initializer=initialize_test_process, + initargs=(manager.Lock(), shared_index, args, temp_dir), + ) as executor: + test_manager = ParallelTestManager(manager, args, temp_dir) + for result in executor.map(test_manager.run_tests, test_suites, timeout=_SUITE_TIMEOUT): + results.append(result) + + except TimeoutError: + pending_index = len(results) + total = len(test_suites) + suite_name = _get_suite_name(test_suites[pending_index]) if pending_index < total else "unknown" + print( + f"Warning: Parallel execution timed out (total timeout={_SUITE_TIMEOUT}s). " + f"Next pending result was suite " + f"{pending_index + 1}/{total} ({suite_name}), " + f"but a different suite may be the actual blocker. " + f"Switching to isolated single-process fallback.", + file=sys.stderr, + ) + parallel_failed = True + parallel_fail_reason = "timed out" + except BrokenProcessPool: + print( + "Warning: Process pool broken during parallel execution. " + "Switching to isolated single-process fallback.", + file=sys.stderr, + ) + parallel_failed = True + parallel_fail_reason = "process pool broken" + except Exception as e: + print( + f"Warning: Process pool error: {e}. Switching to isolated single-process fallback.", + file=sys.stderr, + ) + parallel_failed = True + parallel_fail_reason = str(e) + + # Fallback to isolated single-process execution if parallel failed. + # Skip fallback in CI/CD environments to respect job timeouts. + in_ci = os.environ.get("CI") or os.environ.get("GITHUB_ACTIONS") or os.environ.get("GITLAB_CI") + if parallel_failed and in_ci: + parser.exit( + status=1, + message=f"Error: Parallel execution failed ({parallel_fail_reason}) " + f"in CI/CD environment. Skipping single-process fallback " + f"due to job timeout constraints.\n", + ) + elif parallel_failed: + print( + "Running all tests in isolated single-process mode...", + file=sys.stderr, + ) + results = [] + for i, suite in enumerate(test_suites): + try: + with concurrent.futures.ProcessPoolExecutor( + max_workers=1, + mp_context=multiprocessing.get_context(method="spawn"), + initializer=initialize_test_process, + initargs=(manager.Lock(), shared_index, args, temp_dir), + ) as executor: + test_manager = ParallelTestManager(manager, args, temp_dir) + future = executor.submit(test_manager.run_tests, suite) + try: + result = future.result(timeout=_SUITE_TIMEOUT) + results.append(result) + except TimeoutError: + suite_name = _get_suite_name(suite) + print( + f"Warning: Isolated test suite {i + 1}/{len(test_suites)} " + f"({suite_name}) timed out (timeout={_SUITE_TIMEOUT}s). " + f"Marking tests as crashed.", + file=sys.stderr, + ) + results.append( + create_crash_result( + suite, + reason=f"Process timed out (timeout={_SUITE_TIMEOUT}s)", + ) + ) + except BrokenProcessPool: + print( + f"Warning: Process crashed or was terminated unexpectedly " + f"in isolated execution for test suite " + f"{i + 1}/{len(test_suites)}. Marking tests as crashed.", + file=sys.stderr, + ) + results.append(create_crash_result(suite)) + except Exception as e: + print( + f"Warning: Error in isolated test suite " + f"{i + 1}/{len(test_suites)}: {e}. " + f"Marking tests as crashed.", + file=sys.stderr, + ) + results.append(create_crash_result(suite)) + except Exception as e: + print( + f"Warning: Failed to create isolated process for test suite " + f"{i + 1}/{len(test_suites)}: {e}. Marking tests as crashed.", + file=sys.stderr, + ) + results.append(create_crash_result(suite)) else: # This entire path is an NVIDIA Modification @@ -411,6 +517,39 @@ def _iter_test_cases(test_suite): yield from _iter_test_cases(suite) +def _get_suite_name(test_suite): + """Return a human-readable name for a test suite (e.g. 'TestTileMatmul'). + + This entire function is an NVIDIA modification. + """ + first_test = next(_iter_test_cases(test_suite), None) + return type(first_test).__name__ if first_test is not None else "unknown" + + +def create_crash_result(test_suite, reason="Process crashed or was terminated unexpectedly"): + """Create a result indicating the process failed while running this test suite. + + This entire function is an NVIDIA modification. + """ + crash_errors = [] + + for test in _iter_test_cases(test_suite): + error_msg = f"{reason} while running this test suite (unknown which test caused the failure): {test}" + crash_errors.append( + "\n".join( + [ + unittest.TextTestResult.separator1, + str(test), + unittest.TextTestResult.separator2, + error_msg, + ] + ) + ) + + # Same format as run_tests: (test_count, errors, failures, skipped, expected_failures, unexpected_successes, test_records) + return (test_suite.countTestCases(), crash_errors, [], 0, 0, 0, []) + + class ParallelTestManager: # Manager proxy calls can fail with ConnectionError, TypeError, or OSError # due to a TOCTOU race in Connection.send() where GC can close the