From d20e263c5c5fbddba3cb8f6b5c5faf7411a2e674 Mon Sep 17 00:00:00 2001 From: KhawarHabibKhan Date: Fri, 13 Mar 2026 23:09:47 +0500 Subject: [PATCH 1/3] fix(timer): clamp negative remain_time and fix is_timeout TOCTOU race --- rdagent/log/timer.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rdagent/log/timer.py b/rdagent/log/timer.py index a130541fb..5e1b058aa 100644 --- a/rdagent/log/timer.py +++ b/rdagent/log/timer.py @@ -56,13 +56,12 @@ def add_duration(self, duration: timedelta) -> None: def is_timeout(self) -> bool: if self.started and self.target_time is not None: self.update_remain_time() - if datetime.now() > self.target_time: - return True + return self._remain_time_duration == timedelta(0) return False def update_remain_time(self) -> None: if self.started and self.target_time is not None: - self._remain_time_duration = self.target_time - datetime.now() + self._remain_time_duration = max(self.target_time - datetime.now(), timedelta(0)) return None def remain_time(self) -> timedelta | None: From 16cfb983d2aced2ba1e0487d9ad35b3556a88be6 Mon Sep 17 00:00:00 2001 From: KhawarHabibKhan Date: Fri, 13 Mar 2026 23:10:07 +0500 Subject: [PATCH 2/3] fix(tracking): defensive clamp on MLflow remain_time metrics --- rdagent/utils/workflow/tracking.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rdagent/utils/workflow/tracking.py b/rdagent/utils/workflow/tracking.py index 598b16a77..729d3d951 100644 --- a/rdagent/utils/workflow/tracking.py +++ b/rdagent/utils/workflow/tracking.py @@ -85,10 +85,10 @@ def log_workflow_state(self) -> None: if self.loop_base.timer.started: remain_time = self.loop_base.timer.remain_time() assert remain_time is not None - mlflow.log_metric("remain_time", remain_time.total_seconds()) + mlflow.log_metric("remain_time", max(remain_time.total_seconds(), 0.0)) mlflow.log_metric( "remain_percent", - remain_time / self.loop_base.timer.all_duration * 100, + max(remain_time / self.loop_base.timer.all_duration * 100, 0.0), ) # Keep only the log_workflow_state method as it's the primary entry point now From ebda4c87de66035fea9228a87be0bad07b9020dd Mon Sep 17 00:00:00 2001 From: KhawarHabibKhan Date: Fri, 13 Mar 2026 23:10:22 +0500 Subject: [PATCH 3/3] feat(workflow): add async lock for step_n concurrency control --- rdagent/scenarios/data_science/loop.py | 4 ++-- rdagent/utils/workflow/loop.py | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/rdagent/scenarios/data_science/loop.py b/rdagent/scenarios/data_science/loop.py index bdf028312..ce5cacc15 100644 --- a/rdagent/scenarios/data_science/loop.py +++ b/rdagent/scenarios/data_science/loop.py @@ -350,10 +350,10 @@ def record(self, prev_out: dict[str, Any]): ) # backup when upper code line is killed when running self.timer.add_duration(datetime.now() - start_archive_datetime) - def _check_exit_conditions_on_step(self, loop_id: Optional[int] = None, step_id: Optional[int] = None): + async def _check_exit_conditions_on_step(self, loop_id: Optional[int] = None, step_id: Optional[int] = None): if step_id not in [self.steps.index("running"), self.steps.index("feedback")]: # pass the check for running and feedbacks since they are very likely to be finished soon. - super()._check_exit_conditions_on_step(loop_id=loop_id, step_id=step_id) + await super()._check_exit_conditions_on_step(loop_id=loop_id, step_id=step_id) @classmethod def load( diff --git a/rdagent/utils/workflow/loop.py b/rdagent/utils/workflow/loop.py index c5548529f..11ae276d1 100644 --- a/rdagent/utils/workflow/loop.py +++ b/rdagent/utils/workflow/loop.py @@ -132,6 +132,7 @@ def __init__(self) -> None: self.step_n: Optional[int] = None # remain step count self.semaphores: dict[str, asyncio.Semaphore] = {} + self._step_n_lock: asyncio.Lock = asyncio.Lock() def get_unfinished_loop_cnt(self, next_loop: int) -> int: n = 0 @@ -168,7 +169,7 @@ def close_pbar(self) -> None: self._pbar.close() del self._pbar - def _check_exit_conditions_on_step(self, loop_id: Optional[int] = None, step_id: Optional[int] = None) -> None: + async def _check_exit_conditions_on_step(self, loop_id: Optional[int] = None, step_id: Optional[int] = None) -> None: """Check if the loop should continue or terminate. Raises @@ -176,11 +177,12 @@ def _check_exit_conditions_on_step(self, loop_id: Optional[int] = None, step_id: LoopTerminationException When conditions indicate that the loop should terminate """ - # Check step count limitation - if self.step_n is not None: - if self.step_n <= 0: - raise self.LoopTerminationError("Step count reached") - self.step_n -= 1 + # Check step count limitation — guarded by lock to prevent race under parallel steps + async with self._step_n_lock: + if self.step_n is not None: + if self.step_n <= 0: + raise self.LoopTerminationError("Step count reached") + self.step_n -= 1 # Check timer timeout if self.timer.started: @@ -305,7 +307,7 @@ async def _run_step(self, li: int, force_subproc: bool = False) -> None: # it has been executed successfully self.dump(self.session_folder / f"{li}" / f"{si}_{name}") - self._check_exit_conditions_on_step(loop_id=li, step_id=si) + await self._check_exit_conditions_on_step(loop_id=li, step_id=si) else: logger.warning(f"Step forward {si} of loop {li} is skipped.") @@ -528,7 +530,7 @@ def load( def __getstate__(self) -> dict[str, Any]: res = {} for k, v in self.__dict__.items(): - if k not in ["queue", "semaphores", "_pbar"]: + if k not in ["queue", "semaphores", "_pbar", "_step_n_lock"]: res[k] = v return res @@ -536,6 +538,7 @@ def __setstate__(self, state: dict[str, Any]) -> None: self.__dict__.update(state) self.queue = asyncio.Queue() self.semaphores = {} + self._step_n_lock = asyncio.Lock() def kill_subprocesses() -> None: