From ec92069a767cbe225d920c16b4b101bf46b62fe6 Mon Sep 17 00:00:00 2001 From: JPPhoto Date: Fri, 10 Apr 2026 12:23:38 -0500 Subject: [PATCH 1/5] fix queue item ordering across events and snapshots --- invokeai/app/services/events/events_common.py | 5 + .../session_queue/session_queue_common.py | 4 + .../session_queue/session_queue_sqlite.py | 17 ++- .../app/services/shared/sqlite/sqlite_util.py | 2 + .../migrations/migration_28.py | 33 +++++ .../components/StagingArea/state.test.ts | 138 ++++++++++++++++++ .../components/StagingArea/state.ts | 109 ++++++++++++-- .../frontend/web/src/services/api/schema.ts | 11 ++ .../src/services/events/setEventListeners.tsx | 5 + .../test_session_queue_status_sequence.py | 73 +++++++++ tests/test_sqlite_migrator.py | 24 +++ 11 files changed, 404 insertions(+), 17 deletions(-) create mode 100644 invokeai/app/services/shared/sqlite_migrator/migrations/migration_28.py create mode 100644 tests/app/services/session_queue/test_session_queue_status_sequence.py diff --git a/invokeai/app/services/events/events_common.py b/invokeai/app/services/events/events_common.py index bfb44eb48e8..fa862e561e9 100644 --- a/invokeai/app/services/events/events_common.py +++ b/invokeai/app/services/events/events_common.py @@ -232,6 +232,10 @@ class QueueItemStatusChangedEvent(QueueItemEventBase): __event_name__ = "queue_item_status_changed" status: QUEUE_ITEM_STATUS = Field(description="The new status of the queue item") + status_sequence: int | None = Field( + default=None, + description="A monotonically increasing version for this queue item's visible status lifecycle", + ) error_type: Optional[str] = Field(default=None, description="The error type, if any") error_message: Optional[str] = Field(default=None, description="The error message, if any") error_traceback: Optional[str] = Field(default=None, description="The error traceback, if any") @@ -256,6 +260,7 @@ def build( user_id=queue_item.user_id, session_id=queue_item.session_id, status=queue_item.status, + status_sequence=queue_item.status_sequence, error_type=queue_item.error_type, error_message=queue_item.error_message, error_traceback=queue_item.error_traceback, diff --git a/invokeai/app/services/session_queue/session_queue_common.py b/invokeai/app/services/session_queue/session_queue_common.py index 58544422119..84e21fe13b6 100644 --- a/invokeai/app/services/session_queue/session_queue_common.py +++ b/invokeai/app/services/session_queue/session_queue_common.py @@ -219,6 +219,10 @@ class SessionQueueItem(BaseModel): item_id: int = Field(description="The identifier of the session queue item") status: QUEUE_ITEM_STATUS = Field(default="pending", description="The status of this queue item") + status_sequence: int | None = Field( + default=None, + description="A monotonically increasing version for this queue item's visible status lifecycle", + ) priority: int = Field(default=0, description="The priority of this queue item") batch_id: str = Field(description="The ID of the batch associated with this queue item") origin: str | None = Field( diff --git a/invokeai/app/services/session_queue/session_queue_sqlite.py b/invokeai/app/services/session_queue/session_queue_sqlite.py index 4f46136fd79..3a07580c20e 100644 --- a/invokeai/app/services/session_queue/session_queue_sqlite.py +++ b/invokeai/app/services/session_queue/session_queue_sqlite.py @@ -63,7 +63,8 @@ def _set_in_progress_to_canceled(self) -> None: cursor.execute( """--sql UPDATE session_queue - SET status = 'canceled' + SET status = 'canceled', + status_sequence = status_sequence + 1 WHERE status = 'in_progress'; """ ) @@ -253,7 +254,7 @@ def _set_queue_item_status( cursor.execute( """--sql UPDATE session_queue - SET status = ?, error_type = ?, error_message = ?, error_traceback = ? + SET status = ?, status_sequence = status_sequence + 1, error_type = ?, error_message = ?, error_traceback = ? WHERE item_id = ? """, (status, error_type, error_message, error_traceback, item_id), @@ -435,7 +436,8 @@ def cancel_by_batch_ids( cursor.execute( f"""--sql UPDATE session_queue - SET status = 'canceled' + SET status = 'canceled', + status_sequence = status_sequence + 1 {where}; """, tuple(params), @@ -483,7 +485,8 @@ def cancel_by_destination( cursor.execute( f"""--sql UPDATE session_queue - SET status = 'canceled' + SET status = 'canceled', + status_sequence = status_sequence + 1 {where}; """, tuple(params), @@ -595,7 +598,8 @@ def cancel_by_queue_id(self, queue_id: str) -> CancelByQueueIDResult: cursor.execute( f"""--sql UPDATE session_queue - SET status = 'canceled' + SET status = 'canceled', + status_sequence = status_sequence + 1 {where}; """, tuple(params), @@ -631,7 +635,8 @@ def cancel_all_except_current(self, queue_id: str, user_id: Optional[str] = None cursor.execute( f"""--sql UPDATE session_queue - SET status = 'canceled' + SET status = 'canceled', + status_sequence = status_sequence + 1 {where}; """, tuple(params), diff --git a/invokeai/app/services/shared/sqlite/sqlite_util.py b/invokeai/app/services/shared/sqlite/sqlite_util.py index 645509f1dde..2478e8cdcae 100644 --- a/invokeai/app/services/shared/sqlite/sqlite_util.py +++ b/invokeai/app/services/shared/sqlite/sqlite_util.py @@ -30,6 +30,7 @@ from invokeai.app.services.shared.sqlite_migrator.migrations.migration_25 import build_migration_25 from invokeai.app.services.shared.sqlite_migrator.migrations.migration_26 import build_migration_26 from invokeai.app.services.shared.sqlite_migrator.migrations.migration_27 import build_migration_27 +from invokeai.app.services.shared.sqlite_migrator.migrations.migration_28 import build_migration_28 from invokeai.app.services.shared.sqlite_migrator.sqlite_migrator_impl import SqliteMigrator @@ -77,6 +78,7 @@ def init_db(config: InvokeAIAppConfig, logger: Logger, image_files: ImageFileSto migrator.register_migration(build_migration_25(app_config=config, logger=logger)) migrator.register_migration(build_migration_26(app_config=config, logger=logger)) migrator.register_migration(build_migration_27()) + migrator.register_migration(build_migration_28()) migrator.run_migrations() return db diff --git a/invokeai/app/services/shared/sqlite_migrator/migrations/migration_28.py b/invokeai/app/services/shared/sqlite_migrator/migrations/migration_28.py new file mode 100644 index 00000000000..7ddc05d278d --- /dev/null +++ b/invokeai/app/services/shared/sqlite_migrator/migrations/migration_28.py @@ -0,0 +1,33 @@ +"""Migration 28: Add per-item queue status sequencing. + +This migration adds a `status_sequence` column to `session_queue` so queue item +status updates can be ordered across asynchronous event and snapshot channels. +""" + +import sqlite3 + +from invokeai.app.services.shared.sqlite_migrator.sqlite_migrator_common import Migration + + +class Migration28Callback: + """Add a per-queue-item status sequence for cross-channel ordering.""" + + def __call__(self, cursor: sqlite3.Cursor) -> None: + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='session_queue';") + if cursor.fetchone() is None: + return + + cursor.execute("PRAGMA table_info(session_queue);") + columns = [row[1] for row in cursor.fetchall()] + + if "status_sequence" not in columns: + cursor.execute("ALTER TABLE session_queue ADD COLUMN status_sequence INTEGER DEFAULT 0;") + cursor.execute("UPDATE session_queue SET status_sequence = 0 WHERE status_sequence IS NULL;") + + +def build_migration_28() -> Migration: + return Migration( + from_version=27, + to_version=28, + callback=Migration28Callback(), + ) diff --git a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts index 1f9687e35aa..785c2ecb606 100644 --- a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts +++ b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts @@ -1037,6 +1037,144 @@ describe('StagingAreaApi', () => { expect(progressData[1]?.imageDTOs[1]).toBe(imageDTO2); }); + it('should ignore a stale pending snapshot after an item has already completed', async () => { + const imageDTO = createMockImageDTO({ image_name: 'output1.png' }); + mockApp._setImageDTO('output1.png', imageDTO); + + const completedItems = [ + createMockQueueItem({ + item_id: 1, + status: 'completed', + session: { + id: sessionId, + source_prepared_mapping: { + 'canvas_output:abc': ['prepared-1'], + }, + results: { + 'prepared-1': { image: { image_name: 'output1.png' } }, + }, + }, + }), + ]; + + const stalePendingItems = [ + createMockQueueItem({ + item_id: 1, + status: 'pending', + session: { + id: sessionId, + source_prepared_mapping: {}, + results: {}, + }, + }), + ]; + + await api.onItemsChangedEvent(completedItems); + await api.onItemsChangedEvent(stalePendingItems); + + expect(api.$items.get()).toHaveLength(1); + expect(api.$items.get()[0]?.status).toBe('completed'); + expect(api.$isPending.get()).toBe(false); + expect(api.$progressData.get()[1]?.imageDTOs).toEqual([imageDTO]); + }); + + it('should ignore a stale pending snapshot after a completed status event arrives first', async () => { + api.onQueueItemStatusChangedEvent( + createMockQueueItemStatusChangedEvent({ + item_id: 1, + destination: sessionId, + status: 'completed', + }) + ); + + await api.onItemsChangedEvent([ + createMockQueueItem({ + item_id: 1, + status: 'pending', + session: { + id: sessionId, + source_prepared_mapping: {}, + results: {}, + }, + }), + ]); + + expect(api.$items.get()).toEqual([]); + expect(api.$isPending.get()).toBe(false); + }); + + it('should ignore a stale in_progress snapshot after a completed status event arrives first', async () => { + api.onQueueItemStatusChangedEvent( + createMockQueueItemStatusChangedEvent({ + item_id: 1, + destination: sessionId, + status: 'completed', + }) + ); + + await api.onItemsChangedEvent([ + createMockQueueItem({ + item_id: 1, + status: 'in_progress', + session: { + id: sessionId, + source_prepared_mapping: {}, + results: {}, + }, + }), + ]); + + expect(api.$items.get()).toEqual([]); + expect(api.$isPending.get()).toBe(false); + }); + + it('should prefer the higher status_sequence when terminal snapshots disagree', async () => { + const completedItem = createMockQueueItem({ + item_id: 1, + status: 'completed', + }); + (completedItem as typeof completedItem & { status_sequence?: number }).status_sequence = 2; + + const failedItem = createMockQueueItem({ + item_id: 1, + status: 'failed', + }); + (failedItem as typeof failedItem & { status_sequence?: number }).status_sequence = 3; + + await api.onItemsChangedEvent([completedItem]); + await api.onItemsChangedEvent([failedItem]); + + expect(api.$items.get()).toHaveLength(1); + expect(api.$items.get()[0]?.status).toBe('failed'); + }); + + it('should ignore a lower status_sequence on stale in_progress snapshots', async () => { + const completedEvent = createMockQueueItemStatusChangedEvent({ + item_id: 1, + destination: sessionId, + status: 'completed', + }); + (completedEvent as typeof completedEvent & { status_sequence?: number }).status_sequence = 3; + + api.onQueueItemStatusChangedEvent(completedEvent); + + const staleInProgressItem = createMockQueueItem({ + item_id: 1, + status: 'in_progress', + session: { + id: sessionId, + source_prepared_mapping: {}, + results: {}, + }, + }); + (staleInProgressItem as typeof staleInProgressItem & { status_sequence?: number }).status_sequence = 2; + + await api.onItemsChangedEvent([staleInProgressItem]); + + expect(api.$items.get()).toEqual([]); + expect(api.$isPending.get()).toBe(false); + }); + it('should load all images from multiple canvas_output nodes', async () => { const imageDTO1 = createMockImageDTO({ image_name: 'output1.png' }); const imageDTO2 = createMockImageDTO({ image_name: 'output2.png' }); diff --git a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts index 834a3c475c4..5cd14e37eb2 100644 --- a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts +++ b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts @@ -64,6 +64,27 @@ export const getInitialProgressData = (itemId: number): ProgressData => ({ }); type ProgressDataMap = Record; +const TERMINAL_QUEUE_ITEM_STATUS_RANK = 2; +type QueueItemWithStatusSequence = S['SessionQueueItem'] & { status_sequence?: number | null }; +type QueueItemStatusChangedEventWithStatusSequence = S['QueueItemStatusChangedEvent'] & { status_sequence?: number | null }; + +const getQueueItemStatusRank = (status: S['SessionQueueItem']['status']): number => { + switch (status) { + case 'pending': + return 0; + case 'in_progress': + return 1; + case 'completed': + case 'failed': + case 'canceled': + return TERMINAL_QUEUE_ITEM_STATUS_RANK; + } +}; + +const getStatusSequence = (item: { status_sequence?: number | null }): number | undefined => { + return item.status_sequence ?? undefined; +}; + /** * API for managing the Canvas Staging Area - a view of the image generation queue. * Provides reactive state management for pending, in-progress, and completed images. @@ -82,6 +103,13 @@ export class StagingAreaApi { /** Generation counter to prevent stale async writes in onItemsChangedEvent */ _itemsEventGeneration = 0; + /** + * Highest lifecycle status observed for each queue item. + * Used to ignore stale query snapshots that regress terminal items back to pending/in_progress. + */ + _seenItemStatusRanks = new Map(); + _seenItemStatusSequences = new Map(); + /** Item ID of the last started item. Used for auto-switch on start. */ $lastStartedItemId = atom(null); @@ -348,6 +376,19 @@ export class StagingAreaApi { if (data.destination !== this._sessionId) { return; } + const eventWithStatusSequence = data as QueueItemStatusChangedEventWithStatusSequence; + const statusSequence = getStatusSequence(eventWithStatusSequence); + if (statusSequence !== undefined) { + const previousSequence = this._seenItemStatusSequences.get(data.item_id) ?? -1; + if (statusSequence > previousSequence) { + this._seenItemStatusSequences.set(data.item_id, statusSequence); + } + } + const nextRank = getQueueItemStatusRank(data.status); + const previousRank = this._seenItemStatusRanks.get(data.item_id) ?? -1; + if (nextRank > previousRank) { + this._seenItemStatusRanks.set(data.item_id, nextRank); + } if (data.status === 'completed') { /** * There is an unpleasant bit of indirection here. When an item is completed, and auto-switch is set to @@ -375,31 +416,75 @@ export class StagingAreaApi { const generation = ++this._itemsEventGeneration; const oldItems = this.$items.get(); + let didSubstituteStaleItem = false; + const nextItems = items.flatMap((item) => { + const itemWithStatusSequence = item as QueueItemWithStatusSequence; + const statusSequence = getStatusSequence(itemWithStatusSequence); + const previousSequence = this._seenItemStatusSequences.get(item.item_id); + const previousRank = this._seenItemStatusRanks.get(item.item_id); + let shouldAccept = false; + + if (statusSequence !== undefined) { + if (previousSequence === undefined) { + shouldAccept = true; + } else if (statusSequence > previousSequence) { + shouldAccept = true; + } else if (statusSequence === previousSequence) { + shouldAccept = previousRank === undefined || getQueueItemStatusRank(item.status) >= previousRank; + } + } else { + shouldAccept = previousRank === undefined || getQueueItemStatusRank(item.status) >= previousRank; + } - if (items === oldItems) { + if (shouldAccept) { + return [item]; + } + didSubstituteStaleItem = true; + const previousItem = oldItems.find(({ item_id }) => item_id === item.item_id); + return previousItem ? [previousItem] : []; + }); + const normalizedItems = didSubstituteStaleItem ? nextItems : items; + + for (const item of normalizedItems) { + const itemWithStatusSequence = item as QueueItemWithStatusSequence; + const statusSequence = getStatusSequence(itemWithStatusSequence); + const nextRank = getQueueItemStatusRank(item.status); + const previousRank = this._seenItemStatusRanks.get(item.item_id) ?? -1; + if (statusSequence !== undefined) { + const previousSequence = this._seenItemStatusSequences.get(item.item_id) ?? -1; + if (statusSequence > previousSequence) { + this._seenItemStatusSequences.set(item.item_id, statusSequence); + } + } + if (nextRank > previousRank) { + this._seenItemStatusRanks.set(item.item_id, nextRank); + } + } + + if (normalizedItems === oldItems) { return; } - if (items.length === 0) { + if (normalizedItems.length === 0) { // If there are no items, cannot have a selected item. this.$selectedItemId.set(null); this.$selectedImageIndex.set(0); - } else if (this.$selectedItemId.get() === null && items.length > 0) { + } else if (this.$selectedItemId.get() === null && normalizedItems.length > 0) { // If there is no selected item but there are items, select the first one. - this.$selectedItemId.set(items[0]?.item_id ?? null); + this.$selectedItemId.set(normalizedItems[0]?.item_id ?? null); this.$selectedImageIndex.set(0); } const progressData = this.$progressData.get(); for (const [id, datum] of objectEntries(progressData)) { - if (!datum || !items.find(({ item_id }) => item_id === datum.itemId)) { + if (!datum || !normalizedItems.find(({ item_id }) => item_id === datum.itemId)) { this.$progressData.setKey(id, undefined); continue; } } - for (const item of items) { + for (const item of normalizedItems) { const datum = progressData[item.item_id]; if (item.status === 'canceled' || item.status === 'failed') { @@ -467,30 +552,30 @@ export class StagingAreaApi { } const selectedItemId = this.$selectedItemId.get(); - if (selectedItemId !== null && !items.find(({ item_id }) => item_id === selectedItemId)) { + if (selectedItemId !== null && !normalizedItems.find(({ item_id }) => item_id === selectedItemId)) { // If the selected item no longer exists, select the next best item. // Prefer the next item in the list - must check oldItems to determine this const nextItemIndex = oldItems.findIndex(({ item_id }) => item_id === selectedItemId); if (nextItemIndex !== -1) { - const nextItem = items[nextItemIndex] ?? items[nextItemIndex - 1]; + const nextItem = normalizedItems[nextItemIndex] ?? normalizedItems[nextItemIndex - 1]; if (nextItem) { this.$selectedItemId.set(nextItem.item_id); this.$selectedImageIndex.set(0); } } else { // Next, if there is an in-progress item, select that. - const inProgressItem = items.find(({ status }) => status === 'in_progress'); + const inProgressItem = normalizedItems.find(({ status }) => status === 'in_progress'); if (inProgressItem) { this.$selectedItemId.set(inProgressItem.item_id); this.$selectedImageIndex.set(0); } // Finally just select the first item. - this.$selectedItemId.set(items[0]?.item_id ?? null); + this.$selectedItemId.set(normalizedItems[0]?.item_id ?? null); this.$selectedImageIndex.set(0); } } - this.$items.set(items); + this.$items.set(normalizedItems); }; onImageLoaded = (itemId: number) => { @@ -521,6 +606,8 @@ export class StagingAreaApi { /** Cleans up all state and unsubscribes from all events. */ cleanup = () => { this._itemsEventGeneration++; + this._seenItemStatusRanks.clear(); + this._seenItemStatusSequences.clear(); this.$lastStartedItemId.set(null); this.$lastCompletedItemId.set(null); this.$items.set([]); diff --git a/invokeai/frontend/web/src/services/api/schema.ts b/invokeai/frontend/web/src/services/api/schema.ts index 18216bca2c0..5d4c85cd4dd 100644 --- a/invokeai/frontend/web/src/services/api/schema.ts +++ b/invokeai/frontend/web/src/services/api/schema.ts @@ -23020,6 +23020,12 @@ export type components = { * @enum {string} */ status: "pending" | "in_progress" | "completed" | "failed" | "canceled"; + /** + * Status Sequence + * @description A monotonically increasing version for this queue item's visible status lifecycle + * @default null + */ + status_sequence: number | null; /** * Error Type * @description The error type, if any @@ -25191,6 +25197,11 @@ export type components = { * @enum {string} */ status: "pending" | "in_progress" | "completed" | "failed" | "canceled"; + /** + * Status Sequence + * @description A monotonically increasing version for this queue item's visible status lifecycle + */ + status_sequence?: number | null; /** * Priority * @description The priority of this queue item diff --git a/invokeai/frontend/web/src/services/events/setEventListeners.tsx b/invokeai/frontend/web/src/services/events/setEventListeners.tsx index 2e0ff2251eb..de772ba88a3 100644 --- a/invokeai/frontend/web/src/services/events/setEventListeners.tsx +++ b/invokeai/frontend/web/src/services/events/setEventListeners.tsx @@ -53,6 +53,8 @@ type SetEventListenersArg = { setIsConnected: (isConnected: boolean) => void; }; +type QueueItemWithStatusSequence = { status_sequence?: number | null }; + const selectModelInstalls = modelsApi.endpoints.listModelInstalls.select(); /** @@ -387,6 +389,7 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis const { item_id, status, + status_sequence, batch_status, error_type, error_message, @@ -403,6 +406,7 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis dispatch( queueApi.util.updateQueryData('getQueueItem', item_id, (draft) => { draft.status = status; + (draft as typeof draft & QueueItemWithStatusSequence).status_sequence = status_sequence; draft.started_at = started_at; draft.updated_at = updated_at; draft.completed_at = completed_at; @@ -420,6 +424,7 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis const item = draft.find((i) => i.item_id === item_id); if (item) { item.status = status; + (item as typeof item & QueueItemWithStatusSequence).status_sequence = status_sequence; item.started_at = started_at; item.updated_at = updated_at; item.completed_at = completed_at; diff --git a/tests/app/services/session_queue/test_session_queue_status_sequence.py b/tests/app/services/session_queue/test_session_queue_status_sequence.py new file mode 100644 index 00000000000..8b5d3e0ad6d --- /dev/null +++ b/tests/app/services/session_queue/test_session_queue_status_sequence.py @@ -0,0 +1,73 @@ +import uuid + +import pytest + +from invokeai.app.services.events.events_common import QueueItemStatusChangedEvent +from invokeai.app.services.invoker import Invoker +from invokeai.app.services.session_queue.session_queue_sqlite import SqliteSessionQueue +from invokeai.app.services.shared.graph import Graph, GraphExecutionState +from tests.test_nodes import PromptTestInvocation, TestEventService + + +@pytest.fixture +def session_queue(mock_invoker: Invoker) -> SqliteSessionQueue: + db = mock_invoker.services.board_records._db + queue = SqliteSessionQueue(db=db) + queue.start(mock_invoker) + return queue + + +def _insert_queue_item(session_queue: SqliteSessionQueue) -> int: + graph = Graph() + graph.add_node(PromptTestInvocation(id="prompt", prompt="test")) + session = GraphExecutionState(graph=graph) + session_json = session.model_dump_json(warnings=False, exclude_none=True) + batch_id = str(uuid.uuid4()) + with session_queue._db.transaction() as cursor: + cursor.execute( + """--sql + INSERT INTO session_queue ( + queue_id, + session, + session_id, + batch_id, + field_values, + priority, + workflow, + origin, + destination, + retried_from_item_id, + user_id + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ("default", session_json, session.id, batch_id, None, 0, None, None, None, None, "system"), + ) + return cursor.lastrowid + + +def test_status_sequence_increments_for_queue_item_lifecycle( + session_queue: SqliteSessionQueue, mock_invoker: Invoker +) -> None: + item_id = _insert_queue_item(session_queue) + + pending_item = session_queue.get_queue_item(item_id) + assert pending_item.status == "pending" + assert pending_item.status_sequence == 0 + + in_progress_item = session_queue.dequeue() + assert in_progress_item is not None + assert in_progress_item.item_id == item_id + assert in_progress_item.status == "in_progress" + assert in_progress_item.status_sequence == 1 + + completed_item = session_queue.complete_queue_item(item_id) + assert completed_item.status == "completed" + assert completed_item.status_sequence == 2 + + event_bus: TestEventService = mock_invoker.services.events + status_events = [event for event in event_bus.events if isinstance(event, QueueItemStatusChangedEvent)] + + assert len(status_events) == 2 + assert [event.status for event in status_events] == ["in_progress", "completed"] + assert [event.status_sequence for event in status_events] == [1, 2] diff --git a/tests/test_sqlite_migrator.py b/tests/test_sqlite_migrator.py index e03224b5a9a..7012cf6db27 100644 --- a/tests/test_sqlite_migrator.py +++ b/tests/test_sqlite_migrator.py @@ -250,6 +250,30 @@ def test_migrator_runs_all_migrations_file(logger: Logger) -> None: db._conn.close() +def test_migration_28_adds_status_sequence_to_session_queue(memory_db_conn: sqlite3.Connection) -> None: + from invokeai.app.services.shared.sqlite_migrator.migrations.migration_28 import Migration28Callback + + cursor = memory_db_conn.cursor() + cursor.execute( + """--sql + CREATE TABLE IF NOT EXISTS session_queue ( + item_id INTEGER PRIMARY KEY, + status TEXT NOT NULL DEFAULT 'pending' + ); + """ + ) + cursor.execute("INSERT INTO session_queue (item_id, status) VALUES (1, 'pending');") + + Migration28Callback()(cursor) + + cursor.execute("PRAGMA table_info(session_queue);") + columns = {row[1]: row for row in cursor.fetchall()} + assert "status_sequence" in columns + + cursor.execute("SELECT status_sequence FROM session_queue WHERE item_id = 1;") + assert cursor.fetchone()[0] == 0 + + def test_migrator_backs_up_db(logger: Logger) -> None: with TemporaryDirectory() as tempdir: original_db_path = Path(tempdir) / "invokeai.db" From 2b6010a91ea492e6ffeed8f5433ef5df43a63410 Mon Sep 17 00:00:00 2001 From: JPPhoto Date: Fri, 10 Apr 2026 12:32:15 -0500 Subject: [PATCH 2/5] clean up queue item status sequencing --- .../session_queue/session_queue_sqlite.py | 12 +- .../components/StagingArea/state.test.ts | 38 +++++- .../components/StagingArea/state.ts | 111 ++++++++++-------- .../src/services/events/setEventListeners.tsx | 6 +- .../test_session_queue_status_sequence.py | 21 +++- tests/test_sqlite_migrator.py | 9 +- 6 files changed, 131 insertions(+), 66 deletions(-) diff --git a/invokeai/app/services/session_queue/session_queue_sqlite.py b/invokeai/app/services/session_queue/session_queue_sqlite.py index 3a07580c20e..7cc8171e666 100644 --- a/invokeai/app/services/session_queue/session_queue_sqlite.py +++ b/invokeai/app/services/session_queue/session_queue_sqlite.py @@ -64,7 +64,7 @@ def _set_in_progress_to_canceled(self) -> None: """--sql UPDATE session_queue SET status = 'canceled', - status_sequence = status_sequence + 1 + status_sequence = COALESCE(status_sequence, 0) + 1 WHERE status = 'in_progress'; """ ) @@ -254,7 +254,7 @@ def _set_queue_item_status( cursor.execute( """--sql UPDATE session_queue - SET status = ?, status_sequence = status_sequence + 1, error_type = ?, error_message = ?, error_traceback = ? + SET status = ?, status_sequence = COALESCE(status_sequence, 0) + 1, error_type = ?, error_message = ?, error_traceback = ? WHERE item_id = ? """, (status, error_type, error_message, error_traceback, item_id), @@ -437,7 +437,7 @@ def cancel_by_batch_ids( f"""--sql UPDATE session_queue SET status = 'canceled', - status_sequence = status_sequence + 1 + status_sequence = COALESCE(status_sequence, 0) + 1 {where}; """, tuple(params), @@ -486,7 +486,7 @@ def cancel_by_destination( f"""--sql UPDATE session_queue SET status = 'canceled', - status_sequence = status_sequence + 1 + status_sequence = COALESCE(status_sequence, 0) + 1 {where}; """, tuple(params), @@ -599,7 +599,7 @@ def cancel_by_queue_id(self, queue_id: str) -> CancelByQueueIDResult: f"""--sql UPDATE session_queue SET status = 'canceled', - status_sequence = status_sequence + 1 + status_sequence = COALESCE(status_sequence, 0) + 1 {where}; """, tuple(params), @@ -636,7 +636,7 @@ def cancel_all_except_current(self, queue_id: str, user_id: Optional[str] = None f"""--sql UPDATE session_queue SET status = 'canceled', - status_sequence = status_sequence + 1 + status_sequence = COALESCE(status_sequence, 0) + 1 {where}; """, tuple(params), diff --git a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts index 785c2ecb606..76f5d76adfc 100644 --- a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts +++ b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts @@ -840,6 +840,8 @@ describe('StagingAreaApi', () => { api.$items.set([createMockQueueItem({ item_id: 1 })]); api.$lastStartedItemId.set(1); api.$lastCompletedItemId.set(1); + api._seenItemStatusRanks.set(1, 2); + api._seenItemStatusSequences.set(1, 3); api.$progressData.setKey(1, { itemId: 1, progressEvent: null, @@ -856,6 +858,8 @@ describe('StagingAreaApi', () => { expect(api.$lastStartedItemId.get()).toBe(null); expect(api.$lastCompletedItemId.get()).toBe(null); expect(api.$progressData.get()).toEqual({}); + expect(api._seenItemStatusRanks.size).toBe(0); + expect(api._seenItemStatusSequences.size).toBe(0); }); }); @@ -1132,14 +1136,14 @@ describe('StagingAreaApi', () => { const completedItem = createMockQueueItem({ item_id: 1, status: 'completed', + status_sequence: 2, }); - (completedItem as typeof completedItem & { status_sequence?: number }).status_sequence = 2; const failedItem = createMockQueueItem({ item_id: 1, status: 'failed', + status_sequence: 3, }); - (failedItem as typeof failedItem & { status_sequence?: number }).status_sequence = 3; await api.onItemsChangedEvent([completedItem]); await api.onItemsChangedEvent([failedItem]); @@ -1153,21 +1157,21 @@ describe('StagingAreaApi', () => { item_id: 1, destination: sessionId, status: 'completed', + status_sequence: 3, }); - (completedEvent as typeof completedEvent & { status_sequence?: number }).status_sequence = 3; api.onQueueItemStatusChangedEvent(completedEvent); const staleInProgressItem = createMockQueueItem({ item_id: 1, status: 'in_progress', + status_sequence: 2, session: { id: sessionId, source_prepared_mapping: {}, results: {}, }, }); - (staleInProgressItem as typeof staleInProgressItem & { status_sequence?: number }).status_sequence = 2; await api.onItemsChangedEvent([staleInProgressItem]); @@ -1175,6 +1179,32 @@ describe('StagingAreaApi', () => { expect(api.$isPending.get()).toBe(false); }); + it('should prune seen item ordering when an item leaves the staging area', async () => { + await api.onItemsChangedEvent([ + createMockQueueItem({ + item_id: 1, + status: 'completed', + status_sequence: 2, + }), + ]); + + expect(api._seenItemStatusRanks.get(1)).toBe(2); + expect(api._seenItemStatusSequences.get(1)).toBe(2); + + await api.onItemsChangedEvent([ + createMockQueueItem({ + item_id: 2, + status: 'pending', + status_sequence: 0, + }), + ]); + + expect(api._seenItemStatusRanks.has(1)).toBe(false); + expect(api._seenItemStatusSequences.has(1)).toBe(false); + expect(api._seenItemStatusRanks.get(2)).toBe(0); + expect(api._seenItemStatusSequences.get(2)).toBe(0); + }); + it('should load all images from multiple canvas_output nodes', async () => { const imageDTO1 = createMockImageDTO({ image_name: 'output1.png' }); const imageDTO2 = createMockImageDTO({ image_name: 'output2.png' }); diff --git a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts index 5cd14e37eb2..bbf92ff74c7 100644 --- a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts +++ b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts @@ -65,8 +65,6 @@ export const getInitialProgressData = (itemId: number): ProgressData => ({ type ProgressDataMap = Record; const TERMINAL_QUEUE_ITEM_STATUS_RANK = 2; -type QueueItemWithStatusSequence = S['SessionQueueItem'] & { status_sequence?: number | null }; -type QueueItemStatusChangedEventWithStatusSequence = S['QueueItemStatusChangedEvent'] & { status_sequence?: number | null }; const getQueueItemStatusRank = (status: S['SessionQueueItem']['status']): number => { switch (status) { @@ -110,6 +108,63 @@ export class StagingAreaApi { _seenItemStatusRanks = new Map(); _seenItemStatusSequences = new Map(); + _recordSeenItemOrdering = ( + itemId: number, + status: S['SessionQueueItem']['status'], + statusSequence?: number + ): void => { + if (statusSequence !== undefined) { + const previousSequence = this._seenItemStatusSequences.get(itemId) ?? -1; + if (statusSequence > previousSequence) { + this._seenItemStatusSequences.set(itemId, statusSequence); + } + } + + const nextRank = getQueueItemStatusRank(status); + const previousRank = this._seenItemStatusRanks.get(itemId) ?? -1; + if (nextRank > previousRank) { + this._seenItemStatusRanks.set(itemId, nextRank); + } + }; + + _shouldAcceptQueueItem = (item: S['SessionQueueItem']): boolean => { + const statusSequence = getStatusSequence(item); + const previousSequence = this._seenItemStatusSequences.get(item.item_id); + const previousRank = this._seenItemStatusRanks.get(item.item_id); + const nextRank = getQueueItemStatusRank(item.status); + + if (statusSequence !== undefined) { + if (previousSequence === undefined) { + return true; + } + if (statusSequence > previousSequence) { + return true; + } + if (statusSequence < previousSequence) { + return false; + } + return previousRank === undefined || nextRank >= previousRank; + } + + return previousRank === undefined || nextRank >= previousRank; + }; + + _pruneSeenItemOrdering = (items: S['SessionQueueItem'][]): void => { + const itemIds = new Set(items.map(({ item_id }) => item_id)); + + for (const itemId of this._seenItemStatusRanks.keys()) { + if (!itemIds.has(itemId)) { + this._seenItemStatusRanks.delete(itemId); + } + } + + for (const itemId of this._seenItemStatusSequences.keys()) { + if (!itemIds.has(itemId)) { + this._seenItemStatusSequences.delete(itemId); + } + } + }; + /** Item ID of the last started item. Used for auto-switch on start. */ $lastStartedItemId = atom(null); @@ -376,19 +431,7 @@ export class StagingAreaApi { if (data.destination !== this._sessionId) { return; } - const eventWithStatusSequence = data as QueueItemStatusChangedEventWithStatusSequence; - const statusSequence = getStatusSequence(eventWithStatusSequence); - if (statusSequence !== undefined) { - const previousSequence = this._seenItemStatusSequences.get(data.item_id) ?? -1; - if (statusSequence > previousSequence) { - this._seenItemStatusSequences.set(data.item_id, statusSequence); - } - } - const nextRank = getQueueItemStatusRank(data.status); - const previousRank = this._seenItemStatusRanks.get(data.item_id) ?? -1; - if (nextRank > previousRank) { - this._seenItemStatusRanks.set(data.item_id, nextRank); - } + this._recordSeenItemOrdering(data.item_id, data.status, getStatusSequence(data)); if (data.status === 'completed') { /** * There is an unpleasant bit of indirection here. When an item is completed, and auto-switch is set to @@ -416,50 +459,22 @@ export class StagingAreaApi { const generation = ++this._itemsEventGeneration; const oldItems = this.$items.get(); + const oldItemsById = new Map(oldItems.map((item) => [item.item_id, item])); let didSubstituteStaleItem = false; const nextItems = items.flatMap((item) => { - const itemWithStatusSequence = item as QueueItemWithStatusSequence; - const statusSequence = getStatusSequence(itemWithStatusSequence); - const previousSequence = this._seenItemStatusSequences.get(item.item_id); - const previousRank = this._seenItemStatusRanks.get(item.item_id); - let shouldAccept = false; - - if (statusSequence !== undefined) { - if (previousSequence === undefined) { - shouldAccept = true; - } else if (statusSequence > previousSequence) { - shouldAccept = true; - } else if (statusSequence === previousSequence) { - shouldAccept = previousRank === undefined || getQueueItemStatusRank(item.status) >= previousRank; - } - } else { - shouldAccept = previousRank === undefined || getQueueItemStatusRank(item.status) >= previousRank; - } - - if (shouldAccept) { + if (this._shouldAcceptQueueItem(item)) { return [item]; } didSubstituteStaleItem = true; - const previousItem = oldItems.find(({ item_id }) => item_id === item.item_id); + const previousItem = oldItemsById.get(item.item_id); return previousItem ? [previousItem] : []; }); const normalizedItems = didSubstituteStaleItem ? nextItems : items; for (const item of normalizedItems) { - const itemWithStatusSequence = item as QueueItemWithStatusSequence; - const statusSequence = getStatusSequence(itemWithStatusSequence); - const nextRank = getQueueItemStatusRank(item.status); - const previousRank = this._seenItemStatusRanks.get(item.item_id) ?? -1; - if (statusSequence !== undefined) { - const previousSequence = this._seenItemStatusSequences.get(item.item_id) ?? -1; - if (statusSequence > previousSequence) { - this._seenItemStatusSequences.set(item.item_id, statusSequence); - } - } - if (nextRank > previousRank) { - this._seenItemStatusRanks.set(item.item_id, nextRank); - } + this._recordSeenItemOrdering(item.item_id, item.status, getStatusSequence(item)); } + this._pruneSeenItemOrdering(normalizedItems); if (normalizedItems === oldItems) { return; diff --git a/invokeai/frontend/web/src/services/events/setEventListeners.tsx b/invokeai/frontend/web/src/services/events/setEventListeners.tsx index de772ba88a3..ff143845df2 100644 --- a/invokeai/frontend/web/src/services/events/setEventListeners.tsx +++ b/invokeai/frontend/web/src/services/events/setEventListeners.tsx @@ -53,8 +53,6 @@ type SetEventListenersArg = { setIsConnected: (isConnected: boolean) => void; }; -type QueueItemWithStatusSequence = { status_sequence?: number | null }; - const selectModelInstalls = modelsApi.endpoints.listModelInstalls.select(); /** @@ -406,7 +404,7 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis dispatch( queueApi.util.updateQueryData('getQueueItem', item_id, (draft) => { draft.status = status; - (draft as typeof draft & QueueItemWithStatusSequence).status_sequence = status_sequence; + draft.status_sequence = status_sequence; draft.started_at = started_at; draft.updated_at = updated_at; draft.completed_at = completed_at; @@ -424,7 +422,7 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis const item = draft.find((i) => i.item_id === item_id); if (item) { item.status = status; - (item as typeof item & QueueItemWithStatusSequence).status_sequence = status_sequence; + item.status_sequence = status_sequence; item.started_at = started_at; item.updated_at = updated_at; item.completed_at = completed_at; diff --git a/tests/app/services/session_queue/test_session_queue_status_sequence.py b/tests/app/services/session_queue/test_session_queue_status_sequence.py index 8b5d3e0ad6d..a06c621c298 100644 --- a/tests/app/services/session_queue/test_session_queue_status_sequence.py +++ b/tests/app/services/session_queue/test_session_queue_status_sequence.py @@ -17,7 +17,11 @@ def session_queue(mock_invoker: Invoker) -> SqliteSessionQueue: return queue -def _insert_queue_item(session_queue: SqliteSessionQueue) -> int: +def _insert_queue_item( + session_queue: SqliteSessionQueue, + queue_id: str = "default", + destination: str | None = None, +) -> int: graph = Graph() graph.add_node(PromptTestInvocation(id="prompt", prompt="test")) session = GraphExecutionState(graph=graph) @@ -41,7 +45,7 @@ def _insert_queue_item(session_queue: SqliteSessionQueue) -> int: ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, - ("default", session_json, session.id, batch_id, None, 0, None, None, None, None, "system"), + (queue_id, session_json, session.id, batch_id, None, 0, None, None, destination, None, "system"), ) return cursor.lastrowid @@ -71,3 +75,16 @@ def test_status_sequence_increments_for_queue_item_lifecycle( assert len(status_events) == 2 assert [event.status for event in status_events] == ["in_progress", "completed"] assert [event.status_sequence for event in status_events] == [1, 2] + + +def test_status_sequence_increments_for_bulk_cancel_paths(session_queue: SqliteSessionQueue) -> None: + first_item_id = _insert_queue_item(session_queue) + second_item_id = _insert_queue_item(session_queue) + + result = session_queue.cancel_all_except_current("default") + + assert result.canceled == 2 + assert session_queue.get_queue_item(first_item_id).status == "canceled" + assert session_queue.get_queue_item(first_item_id).status_sequence == 1 + assert session_queue.get_queue_item(second_item_id).status == "canceled" + assert session_queue.get_queue_item(second_item_id).status_sequence == 1 diff --git a/tests/test_sqlite_migrator.py b/tests/test_sqlite_migrator.py index 7012cf6db27..7d4a5315a80 100644 --- a/tests/test_sqlite_migrator.py +++ b/tests/test_sqlite_migrator.py @@ -265,10 +265,15 @@ def test_migration_28_adds_status_sequence_to_session_queue(memory_db_conn: sqli cursor.execute("INSERT INTO session_queue (item_id, status) VALUES (1, 'pending');") Migration28Callback()(cursor) + Migration28Callback()(cursor) + + cursor.execute("PRAGMA table_info(session_queue);") + columns = [row[1] for row in cursor.fetchall()] + assert columns.count("status_sequence") == 1 cursor.execute("PRAGMA table_info(session_queue);") - columns = {row[1]: row for row in cursor.fetchall()} - assert "status_sequence" in columns + columns_by_name = {row[1]: row for row in cursor.fetchall()} + assert "status_sequence" in columns_by_name cursor.execute("SELECT status_sequence FROM session_queue WHERE item_id = 1;") assert cursor.fetchone()[0] == 0 From 6f99424a8983e09e008bd913110a51ef1d65e618 Mon Sep 17 00:00:00 2001 From: JPPhoto Date: Fri, 10 Apr 2026 12:37:05 -0500 Subject: [PATCH 3/5] trim white-box queue sequencing tests --- .../components/StagingArea/state.test.ts | 30 ------------------- tests/test_sqlite_migrator.py | 30 ------------------- 2 files changed, 60 deletions(-) diff --git a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts index 76f5d76adfc..3e5a1272e09 100644 --- a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts +++ b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts @@ -840,8 +840,6 @@ describe('StagingAreaApi', () => { api.$items.set([createMockQueueItem({ item_id: 1 })]); api.$lastStartedItemId.set(1); api.$lastCompletedItemId.set(1); - api._seenItemStatusRanks.set(1, 2); - api._seenItemStatusSequences.set(1, 3); api.$progressData.setKey(1, { itemId: 1, progressEvent: null, @@ -858,8 +856,6 @@ describe('StagingAreaApi', () => { expect(api.$lastStartedItemId.get()).toBe(null); expect(api.$lastCompletedItemId.get()).toBe(null); expect(api.$progressData.get()).toEqual({}); - expect(api._seenItemStatusRanks.size).toBe(0); - expect(api._seenItemStatusSequences.size).toBe(0); }); }); @@ -1179,32 +1175,6 @@ describe('StagingAreaApi', () => { expect(api.$isPending.get()).toBe(false); }); - it('should prune seen item ordering when an item leaves the staging area', async () => { - await api.onItemsChangedEvent([ - createMockQueueItem({ - item_id: 1, - status: 'completed', - status_sequence: 2, - }), - ]); - - expect(api._seenItemStatusRanks.get(1)).toBe(2); - expect(api._seenItemStatusSequences.get(1)).toBe(2); - - await api.onItemsChangedEvent([ - createMockQueueItem({ - item_id: 2, - status: 'pending', - status_sequence: 0, - }), - ]); - - expect(api._seenItemStatusRanks.has(1)).toBe(false); - expect(api._seenItemStatusSequences.has(1)).toBe(false); - expect(api._seenItemStatusRanks.get(2)).toBe(0); - expect(api._seenItemStatusSequences.get(2)).toBe(0); - }); - it('should load all images from multiple canvas_output nodes', async () => { const imageDTO1 = createMockImageDTO({ image_name: 'output1.png' }); const imageDTO2 = createMockImageDTO({ image_name: 'output2.png' }); diff --git a/tests/test_sqlite_migrator.py b/tests/test_sqlite_migrator.py index 7d4a5315a80..04a907bd99d 100644 --- a/tests/test_sqlite_migrator.py +++ b/tests/test_sqlite_migrator.py @@ -249,36 +249,6 @@ def test_migrator_runs_all_migrations_file(logger: Logger) -> None: # Must manually close else we get an error on Windows db._conn.close() - -def test_migration_28_adds_status_sequence_to_session_queue(memory_db_conn: sqlite3.Connection) -> None: - from invokeai.app.services.shared.sqlite_migrator.migrations.migration_28 import Migration28Callback - - cursor = memory_db_conn.cursor() - cursor.execute( - """--sql - CREATE TABLE IF NOT EXISTS session_queue ( - item_id INTEGER PRIMARY KEY, - status TEXT NOT NULL DEFAULT 'pending' - ); - """ - ) - cursor.execute("INSERT INTO session_queue (item_id, status) VALUES (1, 'pending');") - - Migration28Callback()(cursor) - Migration28Callback()(cursor) - - cursor.execute("PRAGMA table_info(session_queue);") - columns = [row[1] for row in cursor.fetchall()] - assert columns.count("status_sequence") == 1 - - cursor.execute("PRAGMA table_info(session_queue);") - columns_by_name = {row[1]: row for row in cursor.fetchall()} - assert "status_sequence" in columns_by_name - - cursor.execute("SELECT status_sequence FROM session_queue WHERE item_id = 1;") - assert cursor.fetchone()[0] == 0 - - def test_migrator_backs_up_db(logger: Logger) -> None: with TemporaryDirectory() as tempdir: original_db_path = Path(tempdir) / "invokeai.db" From a510262cea7da35bcd826be123de1ec8708eb82e Mon Sep 17 00:00:00 2001 From: JPPhoto Date: Fri, 10 Apr 2026 12:41:43 -0500 Subject: [PATCH 4/5] format sqlite migrator tests --- tests/test_sqlite_migrator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_sqlite_migrator.py b/tests/test_sqlite_migrator.py index 04a907bd99d..e03224b5a9a 100644 --- a/tests/test_sqlite_migrator.py +++ b/tests/test_sqlite_migrator.py @@ -249,6 +249,7 @@ def test_migrator_runs_all_migrations_file(logger: Logger) -> None: # Must manually close else we get an error on Windows db._conn.close() + def test_migrator_backs_up_db(logger: Logger) -> None: with TemporaryDirectory() as tempdir: original_db_path = Path(tempdir) / "invokeai.db" From da993bfdf4a6d74157595d0986f0e52460d25c24 Mon Sep 17 00:00:00 2001 From: JPPhoto Date: Wed, 22 Apr 2026 21:58:34 -0500 Subject: [PATCH 5/5] clarify queue status sequencing intent --- .../services/session_queue/session_queue_common.py | 1 + .../controlLayers/components/StagingArea/state.ts | 2 ++ .../test_session_queue_status_sequence.py | 13 +++++++++++++ 3 files changed, 16 insertions(+) diff --git a/invokeai/app/services/session_queue/session_queue_common.py b/invokeai/app/services/session_queue/session_queue_common.py index ba9b71ea522..d87221fbbae 100644 --- a/invokeai/app/services/session_queue/session_queue_common.py +++ b/invokeai/app/services/session_queue/session_queue_common.py @@ -221,6 +221,7 @@ class SessionQueueItem(BaseModel): status: QUEUE_ITEM_STATUS = Field(default="pending", description="The status of this queue item") status_sequence: int | None = Field( default=None, + # Fallback for rows serialized before migration_28 added the DB-level default of 0. description="A monotonically increasing version for this queue item's visible status lifecycle", ) priority: int = Field(default=0, description="The priority of this queue item") diff --git a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts index bbf92ff74c7..6c16a8fdf22 100644 --- a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts +++ b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts @@ -143,6 +143,7 @@ export class StagingAreaApi { if (statusSequence < previousSequence) { return false; } + // Equal-sequence updates should be rare; if they happen, let the later terminal-vs-terminal arrival win. return previousRank === undefined || nextRank >= previousRank; } @@ -152,6 +153,7 @@ export class StagingAreaApi { _pruneSeenItemOrdering = (items: S['SessionQueueItem'][]): void => { const itemIds = new Set(items.map(({ item_id }) => item_id)); + // Evict vanished items so long-lived sessions do not grow these maps without bound. for (const itemId of this._seenItemStatusRanks.keys()) { if (!itemIds.has(itemId)) { this._seenItemStatusRanks.delete(itemId); diff --git a/tests/app/services/session_queue/test_session_queue_status_sequence.py b/tests/app/services/session_queue/test_session_queue_status_sequence.py index a06c621c298..0dcf8fd0f71 100644 --- a/tests/app/services/session_queue/test_session_queue_status_sequence.py +++ b/tests/app/services/session_queue/test_session_queue_status_sequence.py @@ -88,3 +88,16 @@ def test_status_sequence_increments_for_bulk_cancel_paths(session_queue: SqliteS assert session_queue.get_queue_item(first_item_id).status_sequence == 1 assert session_queue.get_queue_item(second_item_id).status == "canceled" assert session_queue.get_queue_item(second_item_id).status_sequence == 1 + + +def test_status_sequence_continues_after_dequeue_then_cancel(session_queue: SqliteSessionQueue) -> None: + item_id = _insert_queue_item(session_queue) + + in_progress_item = session_queue.dequeue() + assert in_progress_item is not None + assert in_progress_item.item_id == item_id + assert in_progress_item.status_sequence == 1 + + canceled_item = session_queue.cancel_queue_item(item_id) + assert canceled_item.status == "canceled" + assert canceled_item.status_sequence == 2