refactor: split session startup from streaming to enforce add-cancel-end ordering #4545

lvhan028 wants to merge 3 commits into InternLM:main from
Conversation
Pull request overview
Refactors session lifecycle handling so session creation/startup is separated from streaming decode, aiming to enforce a consistent add → cancel → end ordering across backends (notably PyTorch MP engine).
Changes:
- Introduce `async_start_session(session_id)` as an explicit pre-streaming lifecycle step across engine instances (TM is a no-op; PyTorch performs `ADD_SESSION`).
- Add a per-session `_lifecycle_lock` to serialize startup/abort/close operations without reducing cross-session concurrency.
- Update `AsyncEngine.generate()` to call `handle.async_start_session()` before starting streaming inference.
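The intended add → cancel → end ordering can be sketched as a minimal asyncio model; the class, method, and event names below are illustrative stand-ins, not the actual lmdeploy API:

```python
import asyncio


class SessionHandle:
    """Toy stand-in for an engine instance handle (illustrative, not lmdeploy's)."""

    def __init__(self, session_id):
        self.session_id = session_id
        self.events = []  # records lifecycle ordering for inspection
        self._lifecycle_lock = asyncio.Lock()  # serializes startup/abort/close per session

    async def async_start_session(self):
        # Explicit pre-streaming step: the PyTorch backend would send
        # ADD_SESSION here, while TurboMind treats it as a no-op.
        async with self._lifecycle_lock:
            self.events.append('add')

    async def stream(self):
        # Decode streaming stays lock-free once startup is done.
        self.events.append('stream')

    async def async_cancel(self):
        async with self._lifecycle_lock:
            self.events.append('cancel')

    async def async_end(self):
        async with self._lifecycle_lock:
            self.events.append('end')


async def main():
    h = SessionHandle(0)
    await h.async_start_session()  # add must come first
    await h.stream()
    await h.async_cancel()
    await h.async_end()            # end always last
    return h.events


events = asyncio.run(main())
print(events)  # ['add', 'stream', 'cancel', 'end']
```

Only the lifecycle operations take the per-session lock, so concurrent sessions (and the decode loop itself) are unaffected.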
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| lmdeploy/turbomind/turbomind.py | Adds no-op async_start_session for TurboMind instances. |
| lmdeploy/serve/managers/session_manager.py | Adds per-session lifecycle lock and wraps abort/close with it. |
| lmdeploy/serve/core/async_engine.py | Calls async_start_session during request startup and updates comments around session end handling. |
| lmdeploy/pytorch/engine/mp_engine/base_worker.py | Exposes async_start_session through worker and instance pool RPC surface. |
| lmdeploy/pytorch/engine/mp_engine/base.py | Moves “session exists” side-effect out of streaming into explicit startup call. |
| lmdeploy/pytorch/engine/engine_instance.py | Implements async_start_session (ADD_SESSION) and removes implicit ADD_SESSION from streaming. |
| lmdeploy/pytorch/engine/base.py | Adds async_start_session to EngineInstanceBase interface. |
```python
async with session.request_handle() as handle:
    if session.epoch is not None and session.epoch != self.epoch:
        logger.info(f'[generate] session {session_id} got aborted before starting inference, '
                    f'session.epoch={session.epoch}, async_engine.epoch={self.epoch}')
        metrics_processor.increase_failed_requests('abort')
        yield GenOut(response='',
                     history_token_len=0,
                     input_token_len=len(input_ids),
                     generate_token_len=0,
                     finish_reason='abort',
                     token_ids=[])
        return
    # Serialize same-session lifecycle operations during startup only.
    # Once request startup is complete, decode streaming remains lock-free.
    async with session._lifecycle_lock:
        if session.epoch is not None and session.epoch != self.epoch:
```
Potential deadlock due to lock ordering: generate() acquires session.request_handle() (which sets session._handle/_active) before taking session._lifecycle_lock. If another task calls session.async_close() between those steps, it can take _lifecycle_lock and then await _active, while generate() is blocked waiting for _lifecycle_lock, leaving _active unset forever. To avoid this, take _lifecycle_lock before entering request_handle(), or ensure async_close() never waits on _active while holding _lifecycle_lock.
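The second option the comment suggests (never wait on `_active` while holding `_lifecycle_lock`) can be sketched as a minimal model; the class and attribute names mirror the review comment but are illustrative, not lmdeploy's code:

```python
import asyncio


class Session:
    """Toy model of the lock-ordering hazard described above (illustrative)."""

    def __init__(self):
        self._lifecycle_lock = asyncio.Lock()
        self._active = asyncio.Event()
        self._handle = None


async def close_safe(session):
    # Safe shape: capture the event while holding the lock, but wait on it
    # only after releasing the lock, so a generate() blocked on the lock can
    # still run and eventually set the event.
    active = None
    async with session._lifecycle_lock:
        if session._handle is not None:
            active = session._active
    if active is not None:
        await active.wait()


async def fake_generate(session):
    # Stands in for generate(): enters the handle, then takes the lock and
    # signals that streaming has started.
    session._handle = object()
    async with session._lifecycle_lock:
        session._active.set()


async def main():
    s = Session()
    s._handle = object()                       # a request is mid-startup
    closer = asyncio.create_task(close_safe(s))
    await asyncio.sleep(0)                     # closer captures the event, then waits
    await fake_generate(s)                     # the lock is free, so this proceeds
    await asyncio.wait_for(closer, timeout=1)  # closer finishes: no deadlock
    return True


ok = asyncio.run(main())
print('closed without deadlock:', ok)
```

Had `close_safe` awaited the event inside the `async with`, `fake_generate` could never acquire the lock to set it, reproducing the deadlock the comment describes.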
```python
async with self._lifecycle_lock:
    logger.info(f'[session] Ending session {self.session_id}')
    # Already closed/reset; keep end idempotent.
    if self._session_mgr is None:
        return
    if self._handle is None and self.step == 0:
        return
    if self._handle is not None:
        await self._active.wait()
    async with self.request_handle() as handle:
        try:
            await handle.async_end(self.session_id)
        except (Exception, asyncio.CancelledError, GeneratorExit) as e:
            logger.exception(f'[async_close] exception caught: {e}')
    self.reset()
```
Holding _lifecycle_lock while awaiting _active can deadlock with AsyncEngine.generate(): generate() enters request_handle() (sets _handle/_active) and then waits for _lifecycle_lock, while async_close() can acquire _lifecycle_lock first and then wait on _active, preventing generate() from ever progressing to release the handle. Consider releasing _lifecycle_lock before await self._active.wait() (e.g., capture the event under the lock, then await outside), or adjust lock acquisition order so generate() takes _lifecycle_lock before request_handle().
Suggested change:

```diff
-async with self._lifecycle_lock:
-    logger.info(f'[session] Ending session {self.session_id}')
-    # Already closed/reset; keep end idempotent.
-    if self._session_mgr is None:
-        return
-    if self._handle is None and self.step == 0:
-        return
-    if self._handle is not None:
-        await self._active.wait()
-    async with self.request_handle() as handle:
-        try:
-            await handle.async_end(self.session_id)
-        except (Exception, asyncio.CancelledError, GeneratorExit) as e:
-            logger.exception(f'[async_close] exception caught: {e}')
-    self.reset()
+while True:
+    active = None
+    async with self._lifecycle_lock:
+        logger.info(f'[session] Ending session {self.session_id}')
+        # Already closed/reset; keep end idempotent.
+        if self._session_mgr is None:
+            return
+        if self._handle is None and self.step == 0:
+            return
+        if self._handle is not None:
+            active = self._active
+        else:
+            async with self.request_handle() as handle:
+                try:
+                    await handle.async_end(self.session_id)
+                except (Exception, asyncio.CancelledError, GeneratorExit) as e:
+                    logger.exception(f'[async_close] exception caught: {e}')
+            self.reset()
+            return
+    await active.wait()
```
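The suggested shape loops because session state can change between releasing the lock and waking from the wait, so each iteration must re-check under the lock. A minimal runnable model of that retry shape (illustrative names, not the real lmdeploy code):

```python
import asyncio


class Session:
    """Toy model of the suggested close loop (illustrative only)."""

    def __init__(self):
        self._lifecycle_lock = asyncio.Lock()
        self._active = asyncio.Event()
        self._handle = object()  # pretend a request is in flight
        self.closed = False

    async def finish_request(self):
        # Simulates generate() completing: signal waiters, then drop the handle.
        self._active.set()
        self._handle = None

    async def async_close(self):
        while True:
            active = None
            async with self._lifecycle_lock:
                if self.closed:
                    return
                if self._handle is not None:
                    # Capture the event; wait on it only after the lock is released.
                    active = self._active
                else:
                    self.closed = True  # stands in for async_end() + reset()
                    return
            await active.wait()  # lock released here, so other tasks can progress


async def main():
    s = Session()
    closer = asyncio.create_task(s.async_close())
    await asyncio.sleep(0)       # closer captures the event and waits outside the lock
    await s.finish_request()     # request ends: event set, handle cleared
    await asyncio.wait_for(closer, timeout=1)
    return s.closed


closed = asyncio.run(main())
print(closed)  # True
```

On the second loop iteration the handle is gone, so the close path runs and the loop exits; no task ever awaits while holding the lock.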
No description provided.