diff --git a/src/platform/remote/comfyui/useQueuePolling.test.ts b/src/platform/remote/comfyui/useQueuePolling.test.ts index 79a3c4b6130..d13c15be149 100644 --- a/src/platform/remote/comfyui/useQueuePolling.test.ts +++ b/src/platform/remote/comfyui/useQueuePolling.test.ts @@ -63,13 +63,13 @@ describe('useQueuePolling', () => { expect(store.update).toHaveBeenCalledOnce() }) - it('does not poll when activeJobsCount > 1', async () => { + it('polls when activeJobsCount > 1', async () => { mountUseQueuePolling() store.activeJobsCount = 2 - await vi.advanceTimersByTimeAsync(16_000) + await vi.advanceTimersByTimeAsync(8_000) - expect(store.update).not.toHaveBeenCalled() + expect(store.update).toHaveBeenCalledOnce() }) it('stops polling when activeJobsCount drops to 0', async () => { diff --git a/src/platform/remote/comfyui/useQueuePolling.ts b/src/platform/remote/comfyui/useQueuePolling.ts index c66c14a25a6..8625ba622c8 100644 --- a/src/platform/remote/comfyui/useQueuePolling.ts +++ b/src/platform/remote/comfyui/useQueuePolling.ts @@ -13,7 +13,7 @@ export function useQueuePolling() { const { start, stop } = useTimeoutFn( () => { - if (queueStore.activeJobsCount !== 1 || queueStore.isLoading) return + if (queueStore.activeJobsCount < 1 || queueStore.isLoading) return delay.value = Math.min(delay.value * BACKOFF_MULTIPLIER, MAX_INTERVAL_MS) void queueStore.update() }, @@ -22,7 +22,7 @@ export function useQueuePolling() { ) function scheduleNextPoll() { - if (queueStore.activeJobsCount === 1 && !queueStore.isLoading) start() + if (queueStore.activeJobsCount >= 1 && !queueStore.isLoading) start() else stop() } diff --git a/src/schemas/apiSchema.ts b/src/schemas/apiSchema.ts index e7e40abdf71..c15d92e194b 100644 --- a/src/schemas/apiSchema.ts +++ b/src/schemas/apiSchema.ts @@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes' const zNodeType = z.string() const zJobId = z.string() export type JobId = z.infer +const zWorkflowId = z.string() export const resultItemType = z.enum(['input', 'output', 'temp']) export type ResultItemType = z.infer @@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({ value: z.number().int(), max: z.number().int(), prompt_id: zJobId, + workflow_id: zWorkflowId.optional(), node: zNodeId }) @@ -65,6 +67,7 @@ const zNodeProgressState = z.object({ state: z.enum(['pending', 'running', 'finished', 'error']), node_id: zNodeId, prompt_id: zJobId, + workflow_id: zWorkflowId.optional(), display_node_id: zNodeId.optional(), parent_node_id: zNodeId.optional(), real_node_id: zNodeId.optional() @@ -72,13 +75,15 @@ const zNodeProgressState = z.object({ const zProgressStateWsMessage = z.object({ prompt_id: zJobId, + workflow_id: zWorkflowId.optional(), nodes: z.record(zNodeId, zNodeProgressState) }) const zExecutingWsMessage = z.object({ node: zNodeId, display_node: zNodeId, - prompt_id: zJobId + prompt_id: zJobId, + workflow_id: zWorkflowId.optional() }) const zExecutedWsMessage = zExecutingWsMessage.extend({ @@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({ const zExecutionWsMessageBase = z.object({ prompt_id: zJobId, + workflow_id: zWorkflowId.optional(), timestamp: z.number().int() }) @@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({ const zProgressTextWsMessage = z.object({ nodeId: zNodeId, text: z.string(), - prompt_id: z.string().optional() + prompt_id: z.string().optional(), + workflow_id: zWorkflowId.optional() }) const zNotificationWsMessage = z.object({ diff --git a/src/stores/executionStore.test.ts b/src/stores/executionStore.test.ts index dc143d844ff..e0b041372d5 100644 --- a/src/stores/executionStore.test.ts +++ b/src/stores/executionStore.test.ts @@ -11,12 +11,22 @@ const { mockNodeExecutionIdToNodeLocatorId, mockNodeIdToNodeLocatorId, mockNodeLocatorIdToNodeExecutionId, - mockShowTextPreview + mockShowTextPreview, + mockActiveWorkflow, + mockRevokePreviewsByExecutionId } = vi.hoisted(() => ({ mockNodeExecutionIdToNodeLocatorId: vi.fn(), mockNodeIdToNodeLocatorId: vi.fn(), mockNodeLocatorIdToNodeExecutionId: vi.fn(), - mockShowTextPreview: vi.fn() + mockShowTextPreview: vi.fn(), + mockActiveWorkflow: { + current: null as null | { + activeState?: { id?: string } + initialState?: { id?: string } + path?: string + } + }, + mockRevokePreviewsByExecutionId: vi.fn() })) import type * as WorkflowStoreModule from '@/platform/workflow/management/stores/workflowStore' @@ -35,7 +45,10 @@ vi.mock('@/platform/workflow/management/stores/workflowStore', async () => { useWorkflowStore: vi.fn(() => ({ nodeExecutionIdToNodeLocatorId: mockNodeExecutionIdToNodeLocatorId, nodeIdToNodeLocatorId: mockNodeIdToNodeLocatorId, - nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId + nodeLocatorIdToNodeExecutionId: mockNodeLocatorIdToNodeExecutionId, + get activeWorkflow() { + return mockActiveWorkflow.current + } })) } }) @@ -70,9 +83,9 @@ vi.mock('@/scripts/api', () => ({ } })) -vi.mock('@/stores/imagePreviewStore', () => ({ +vi.mock('@/stores/nodeOutputStore', () => ({ useNodeOutputStore: () => ({ - revokePreviewsByExecutionId: vi.fn() + revokePreviewsByExecutionId: mockRevokePreviewsByExecutionId }) })) @@ -440,6 +453,599 @@ describe('useExecutionStore - reconcileInitializingJobs', () => { }) }) +describe('useExecutionStore - active workflow gating of progress mirror', () => { + let store: ReturnType + + function makeProgressNodes( + nodeId: string, + jobId: string + ): Record { + return { + [nodeId]: { + value: 5, + max: 10, + state: 'running', + node_id: nodeId, + prompt_id: jobId, + display_node_id: nodeId + } + } + } + + function fireProgressState( + jobId: string, + nodes: Record, + workflowId?: string + ) { + const handler = apiEventHandlers.get('progress_state') + if (!handler) throw new Error('progress_state handler not bound') + handler( + new CustomEvent('progress_state', { + detail: { nodes, prompt_id: jobId, workflow_id: workflowId } + }) + ) + } + + beforeEach(() => { + vi.clearAllMocks() + apiEventHandlers.clear() + mockActiveWorkflow.current = null + setActivePinia(createTestingPinia({ stubActions: false })) + store = useExecutionStore() + store.bindExecutionEvents() + }) + + it('updates per-job progress regardless of active workflow', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + + fireProgressState( + 'job-other', + makeProgressNodes('1', 'job-other'), + 'wf-other' + ) + + expect(store.nodeProgressStatesByJob).toHaveProperty('job-other') + }) + + it('skips preview revocation for non-active workflow messages', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + mockRevokePreviewsByExecutionId.mockClear() + + fireProgressState( + 'job-other', + makeProgressNodes('1', 'job-other'), + 'wf-other' + ) + + expect(mockRevokePreviewsByExecutionId).not.toHaveBeenCalled() + }) + + it('revokes previews for active workflow messages', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + mockRevokePreviewsByExecutionId.mockClear() + + fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active') + + expect(mockRevokePreviewsByExecutionId).toHaveBeenCalledWith('1') + }) + + it('skips global mirror when message workflow_id mismatches active workflow', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + + fireProgressState( + 'job-other', + makeProgressNodes('1', 'job-other'), + 'wf-other' + ) + + expect(store.nodeProgressStates).toEqual({}) + }) + + it('updates global mirror when message workflow_id matches active workflow', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + + fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-active') + + expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1')) + }) + + it('falls back to jobIdToWorkflowId mapping when message has no workflow_id', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + store.registerJobWorkflowIdMapping('job-other', 'wf-other') + + fireProgressState('job-other', makeProgressNodes('1', 'job-other')) + + expect(store.nodeProgressStates).toEqual({}) + }) + + it('falls back to session path mapping when message has no workflow_id and no id mapping', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + store.ensureSessionWorkflowPath('job-other', '/wf-other.json') + + fireProgressState('job-other', makeProgressNodes('1', 'job-other')) + + expect(store.nodeProgressStates).toEqual({}) + }) + + it('updates mirror when no resolution is available (preserves single-tab behaviour)', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + + fireProgressState('job-unknown', makeProgressNodes('1', 'job-unknown')) + + expect(store.nodeProgressStates).toEqual( + makeProgressNodes('1', 'job-unknown') + ) + }) + + it('updates mirror when there is no active workflow', () => { + mockActiveWorkflow.current = null + + fireProgressState('job-1', makeProgressNodes('1', 'job-1'), 'wf-1') + + expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-1')) + }) + + it('skips _executingNodeProgress on workflow_id mismatch', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + + const handler = apiEventHandlers.get('progress') + if (!handler) throw new Error('progress handler not bound') + handler( + new CustomEvent('progress', { + detail: { + value: 5, + max: 10, + prompt_id: 'job-other', + node: '1', + workflow_id: 'wf-other' + } + }) + ) + + expect(store._executingNodeProgress).toBeNull() + }) + + it('updates _executingNodeProgress on workflow_id match', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + + const handler = apiEventHandlers.get('progress') + if (!handler) throw new Error('progress handler not bound') + handler( + new CustomEvent('progress', { + detail: { + value: 7, + max: 10, + prompt_id: 'job-1', + node: '1', + workflow_id: 'wf-active' + } + }) + ) + + expect(store._executingNodeProgress).toEqual({ + value: 7, + max: 10, + prompt_id: 'job-1', + node: '1', + workflow_id: 'wf-active' + }) + }) +}) + +describe('useExecutionStore - reconcileMirrorForActiveWorkflow', () => { + let store: ReturnType + + function makeProgressNodes( + nodeId: string, + jobId: string + ): Record { + return { + [nodeId]: { + value: 5, + max: 10, + state: 'running', + node_id: nodeId, + prompt_id: jobId, + display_node_id: nodeId + } + } + } + + function fireProgressState( + jobId: string, + nodes: Record, + workflowId?: string + ) { + const handler = apiEventHandlers.get('progress_state') + if (!handler) throw new Error('progress_state handler not bound') + handler( + new CustomEvent('progress_state', { + detail: { nodes, prompt_id: jobId, workflow_id: workflowId } + }) + ) + } + + beforeEach(() => { + vi.clearAllMocks() + apiEventHandlers.clear() + mockActiveWorkflow.current = null + setActivePinia(createTestingPinia({ stubActions: false })) + store = useExecutionStore() + store.bindExecutionEvents() + }) + + it('rebuilds the mirror from the active workflow job on tab switch', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-a' }, + path: '/wf-a.json' + } + fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a') + store.registerJobWorkflowIdMapping('job-a', 'wf-a') + + mockActiveWorkflow.current = { + activeState: { id: 'wf-b' }, + path: '/wf-b.json' + } + fireProgressState('job-b', makeProgressNodes('2', 'job-b'), 'wf-b') + store.registerJobWorkflowIdMapping('job-b', 'wf-b') + + expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-b')) + + mockActiveWorkflow.current = { + activeState: { id: 'wf-a' }, + path: '/wf-a.json' + } + store.reconcileMirrorForActiveWorkflow() + + expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a')) + }) + + it('clears the mirror when the active workflow has no matching job', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-a' }, + path: '/wf-a.json' + } + fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a') + store.registerJobWorkflowIdMapping('job-a', 'wf-a') + + expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a')) + + mockActiveWorkflow.current = { + activeState: { id: 'wf-empty' }, + path: '/wf-empty.json' + } + store.reconcileMirrorForActiveWorkflow() + + expect(store.nodeProgressStates).toEqual({}) + }) + + it('clears _executingNodeProgress that belonged to a different job', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-a' }, + path: '/wf-a.json' + } + fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a') + store.registerJobWorkflowIdMapping('job-a', 'wf-a') + + const progressHandler = apiEventHandlers.get('progress') + if (!progressHandler) throw new Error('progress handler not bound') + progressHandler( + new CustomEvent('progress', { + detail: { + value: 5, + max: 10, + prompt_id: 'job-a', + node: '1', + workflow_id: 'wf-a' + } + }) + ) + + mockActiveWorkflow.current = { + activeState: { id: 'wf-empty' }, + path: '/wf-empty.json' + } + store.reconcileMirrorForActiveWorkflow() + + expect(store._executingNodeProgress).toBeNull() + }) + + it('falls back to session path mapping when workflow id is not registered', () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-a' }, + path: '/wf-a.json' + } + fireProgressState('job-a', makeProgressNodes('1', 'job-a'), 'wf-a') + store.ensureSessionWorkflowPath('job-a', '/wf-a.json') + + mockActiveWorkflow.current = { + activeState: { id: 'wf-b' }, + path: '/wf-b.json' + } + fireProgressState('job-b', makeProgressNodes('2', 'job-b'), 'wf-b') + store.ensureSessionWorkflowPath('job-b', '/wf-b.json') + + mockActiveWorkflow.current = { + activeState: { id: 'wf-a' }, + path: '/wf-a.json' + } + store.reconcileMirrorForActiveWorkflow() + + expect(store.nodeProgressStates).toEqual(makeProgressNodes('1', 'job-a')) + }) +}) + +describe('useExecutionStore - reconcileTerminalJobs', () => { + let store: ReturnType + + function makeProgressNodes( + nodeId: string, + jobId: string + ): Record { + return { + [nodeId]: { + value: 5, + max: 10, + state: 'running', + node_id: nodeId, + prompt_id: jobId, + display_node_id: nodeId + } + } + } + + function fireProgressState( + jobId: string, + nodes: Record + ) { + const handler = apiEventHandlers.get('progress_state') + if (!handler) throw new Error('progress_state handler not bound') + handler( + new CustomEvent('progress_state', { detail: { nodes, prompt_id: jobId } }) + ) + } + + function fireExecutionStart(jobId: string) { + const handler = apiEventHandlers.get('execution_start') + if (!handler) throw new Error('execution_start handler not bound') + handler( + new CustomEvent('execution_start', { detail: { prompt_id: jobId } }) + ) + } + + beforeEach(() => { + vi.clearAllMocks() + apiEventHandlers.clear() + setActivePinia(createTestingPinia({ stubActions: false })) + store = useExecutionStore() + store.bindExecutionEvents() + }) + + it('evicts a non-active terminal job without disturbing the active job', () => { + fireExecutionStart('job-old') + fireProgressState('job-old', makeProgressNodes('1', 'job-old')) + + fireExecutionStart('job-new') + fireProgressState('job-new', makeProgressNodes('2', 'job-new')) + + expect(store.activeJobId).toBe('job-new') + expect(store.nodeProgressStatesByJob).toHaveProperty('job-old') + expect(store.nodeProgressStatesByJob).toHaveProperty('job-new') + + store.reconcileTerminalJobs(new Set(['job-new']), new Set(['job-old'])) + + expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old') + expect(store.nodeProgressStatesByJob).toHaveProperty('job-new') + expect(store.activeJobId).toBe('job-new') + expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new')) + }) + + it('evicts an active terminal job and clears global mirror', () => { + fireExecutionStart('job-1') + fireProgressState('job-1', makeProgressNodes('1', 'job-1')) + + expect(store.activeJobId).toBe('job-1') + expect(Object.keys(store.nodeProgressStates)).toHaveLength(1) + + store.reconcileTerminalJobs(new Set(), new Set(['job-1'])) + + expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-1') + expect(store.activeJobId).toBeNull() + expect(store.nodeProgressStates).toEqual({}) + }) + + it('clears stale global mirror when its owner job becomes terminal', () => { + fireExecutionStart('job-old') + fireProgressState('job-old', makeProgressNodes('1', 'job-old')) + + fireExecutionStart('job-new') + expect(store.activeJobId).toBe('job-new') + expect(store.nodeProgressStates['1']?.prompt_id).toBe('job-old') + + store.reconcileTerminalJobs(new Set(['job-new']), new Set(['job-old'])) + + expect(store.nodeProgressStates).toEqual({}) + expect(store.activeJobId).toBe('job-new') + expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old') + }) + + it('skips jobs that are still active even if also in terminal set', () => { + fireExecutionStart('job-1') + fireProgressState('job-1', makeProgressNodes('1', 'job-1')) + + store.reconcileTerminalJobs(new Set(['job-1']), new Set(['job-1'])) + + expect(store.nodeProgressStatesByJob).toHaveProperty('job-1') + expect(store.activeJobId).toBe('job-1') + }) + + it('skips jobs absent from the terminal set', () => { + fireExecutionStart('job-1') + fireProgressState('job-1', makeProgressNodes('1', 'job-1')) + + store.reconcileTerminalJobs(new Set(), new Set()) + + expect(store.nodeProgressStatesByJob).toHaveProperty('job-1') + expect(store.activeJobId).toBe('job-1') + }) + + it('is idempotent for an already-cleared job', () => { + store.reconcileTerminalJobs(new Set(), new Set(['job-1'])) + store.reconcileTerminalJobs(new Set(), new Set(['job-1'])) + + expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-1') + expect(store.activeJobId).toBeNull() + }) + + it('evicts initializing-only jobs that landed in history without progress events', () => { + store.initializingJobIds = new Set(['job-init']) + + store.reconcileTerminalJobs(new Set(), new Set(['job-init'])) + + expect(store.initializingJobIds.has('job-init')).toBe(false) + }) +}) + +describe('useExecutionStore - terminal WS handlers do not clobber active job', () => { + let store: ReturnType + + function makeProgressNodes( + nodeId: string, + jobId: string + ): Record { + return { + [nodeId]: { + value: 5, + max: 10, + state: 'running', + node_id: nodeId, + prompt_id: jobId, + display_node_id: nodeId + } + } + } + + function fireProgressState( + jobId: string, + nodes: Record, + workflowId?: string + ) { + const handler = apiEventHandlers.get('progress_state') + if (!handler) throw new Error('progress_state handler not bound') + handler( + new CustomEvent('progress_state', { + detail: { nodes, prompt_id: jobId, workflow_id: workflowId } + }) + ) + } + + function fireExecutionStart(jobId: string) { + const handler = apiEventHandlers.get('execution_start') + if (!handler) throw new Error('execution_start handler not bound') + handler( + new CustomEvent('execution_start', { detail: { prompt_id: jobId } }) + ) + } + + function fireExecutionSuccess(jobId: string) { + const handler = apiEventHandlers.get('execution_success') + if (!handler) throw new Error('execution_success handler not bound') + handler( + new CustomEvent('execution_success', { detail: { prompt_id: jobId } }) + ) + } + + function fireExecutionInterrupted(jobId: string) { + const handler = apiEventHandlers.get('execution_interrupted') + if (!handler) throw new Error('execution_interrupted handler not bound') + handler( + new CustomEvent('execution_interrupted', { + detail: { prompt_id: jobId, node_id: '1', node_type: 'X', executed: [] } + }) + ) + } + + beforeEach(() => { + vi.clearAllMocks() + apiEventHandlers.clear() + mockActiveWorkflow.current = null + setActivePinia(createTestingPinia({ stubActions: false })) + store = useExecutionStore() + store.bindExecutionEvents() + }) + + it('execution_success for a non-active job does not clobber the active job mirror', () => { + fireExecutionStart('job-old') + fireProgressState('job-old', makeProgressNodes('1', 'job-old')) + + fireExecutionStart('job-new') + fireProgressState('job-new', makeProgressNodes('2', 'job-new')) + + expect(store.activeJobId).toBe('job-new') + + fireExecutionSuccess('job-old') + + expect(store.activeJobId).toBe('job-new') + expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new')) + expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old') + expect(store.nodeProgressStatesByJob).toHaveProperty('job-new') + }) + + it('execution_interrupted for a non-active job does not clobber the active job', () => { + fireExecutionStart('job-old') + fireProgressState('job-old', makeProgressNodes('1', 'job-old')) + + fireExecutionStart('job-new') + fireProgressState('job-new', makeProgressNodes('2', 'job-new')) + + fireExecutionInterrupted('job-old') + + expect(store.activeJobId).toBe('job-new') + expect(store.nodeProgressStates).toEqual(makeProgressNodes('2', 'job-new')) + expect(store.nodeProgressStatesByJob).not.toHaveProperty('job-old') + }) + + it('execution_success for the active job clears the global mirror and activeJobId', () => { + fireExecutionStart('job-1') + fireProgressState('job-1', makeProgressNodes('1', 'job-1')) + + fireExecutionSuccess('job-1') + + expect(store.activeJobId).toBeNull() + expect(store.nodeProgressStates).toEqual({}) + }) +}) + describe('useExecutionStore - progress_text startup guard', () => { let store: ReturnType @@ -447,6 +1053,7 @@ describe('useExecutionStore - progress_text startup guard', () => { nodeId: string text: string prompt_id?: string + workflow_id?: string }) { const handler = apiEventHandlers.get('progress_text') if (!handler) throw new Error('progress_text handler not bound') @@ -456,6 +1063,7 @@ describe('useExecutionStore - progress_text startup guard', () => { beforeEach(() => { vi.clearAllMocks() apiEventHandlers.clear() + mockActiveWorkflow.current = null setActivePinia(createTestingPinia({ stubActions: false })) store = useExecutionStore() store.bindExecutionEvents() @@ -488,6 +1096,50 @@ describe('useExecutionStore - progress_text startup guard', () => { expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up') }) + + it('should skip progress_text whose workflow_id mismatches active workflow', async () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + const mockNode = createMockLGraphNode({ id: 1 }) + const { useCanvasStore } = + await import('@/renderer/core/canvas/canvasStore') + useCanvasStore().canvas = { + graph: { getNodeById: vi.fn(() => mockNode) } + } as unknown as LGraphCanvas + + fireProgressText({ + nodeId: '1', + text: 'warming up', + prompt_id: 'job-other', + workflow_id: 'wf-other' + }) + + expect(mockShowTextPreview).not.toHaveBeenCalled() + }) + + it('should call showTextPreview when workflow_id matches active workflow', async () => { + mockActiveWorkflow.current = { + activeState: { id: 'wf-active' }, + path: '/wf-active.json' + } + const mockNode = createMockLGraphNode({ id: 1 }) + const { useCanvasStore } = + await import('@/renderer/core/canvas/canvasStore') + useCanvasStore().canvas = { + graph: { getNodeById: vi.fn(() => mockNode) } + } as unknown as LGraphCanvas + + fireProgressText({ + nodeId: '1', + text: 'warming up', + prompt_id: 'job-1', + workflow_id: 'wf-active' + }) + + expect(mockShowTextPreview).toHaveBeenCalledWith(mockNode, 'warming up') + }) }) describe('useExecutionErrorStore - Node Error Lookups', () => { diff --git a/src/stores/executionStore.ts b/src/stores/executionStore.ts index 1df4702858e..ff256ec10a7 100644 --- a/src/stores/executionStore.ts +++ b/src/stores/executionStore.ts @@ -1,5 +1,5 @@ import { defineStore } from 'pinia' -import { computed, ref, shallowRef } from 'vue' +import { computed, ref, shallowRef, watch } from 'vue' import { useNodeProgressText } from '@/composables/node/useNodeProgressText' import { isCloud } from '@/platform/distribution/types' @@ -273,6 +273,10 @@ export const useExecutionStore = defineStore('execution', () => { ) { const jobId = e.detail.prompt_id if (activeJobId.value) clearInitializationByJobId(activeJobId.value) + if (jobId !== activeJobId.value) { + evictTerminalJob(jobId) + return + } resetExecutionState(jobId) } @@ -288,6 +292,10 @@ export const useExecutionStore = defineStore('execution', () => { }) } const jobId = e.detail.prompt_id + if (jobId !== activeJobId.value) { + evictTerminalJob(jobId) + return + } resetExecutionState(jobId) } @@ -335,43 +343,146 @@ export const useExecutionStore = defineStore('execution', () => { } function handleProgressState(e: CustomEvent) { - const { nodes, prompt_id: jobId } = e.detail + const { nodes, prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail + const isActiveWorkflowMessage = messageMatchesActiveWorkflow( + jobId, + messageWorkflowId + ) - // Revoke previews for nodes that are starting to execute const previousForJob = nodeProgressStatesByJob.value[jobId] || {} - for (const nodeId in nodes) { - const nodeState = nodes[nodeId] - if (nodeState.state === 'running' && !previousForJob[nodeId]) { - // This node just started executing, revoke its previews - // Note that we're doing the *actual* node id instead of the display node id - // here intentionally. That way, we don't clear the preview every time a new node - // within an expanded graph starts executing. - const { revokePreviewsByExecutionId } = useNodeOutputStore() - revokePreviewsByExecutionId(nodeId) + if (isActiveWorkflowMessage) { + const { revokePreviewsByExecutionId } = useNodeOutputStore() + for (const nodeId in nodes) { + const nodeState = nodes[nodeId] + if (nodeState.state === 'running' && !previousForJob[nodeId]) { + revokePreviewsByExecutionId(nodeId) + } } } - // Update the progress states for all nodes nodeProgressStatesByJob.value = { ...nodeProgressStatesByJob.value, [jobId]: nodes } evictOldProgressJobs() - nodeProgressStates.value = nodes - - // If we have progress for the currently executing node, update it for backwards compatibility - if (executingNodeId.value && nodes[executingNodeId.value]) { - const nodeState = nodes[executingNodeId.value] - _executingNodeProgress.value = { - value: nodeState.value, - max: nodeState.max, - prompt_id: nodeState.prompt_id, - node: nodeState.display_node_id || nodeState.node_id + + if (isActiveWorkflowMessage) { + nodeProgressStates.value = nodes + + if (executingNodeId.value && nodes[executingNodeId.value]) { + const nodeState = nodes[executingNodeId.value] + _executingNodeProgress.value = { + value: nodeState.value, + max: nodeState.max, + prompt_id: nodeState.prompt_id, + node: nodeState.display_node_id || nodeState.node_id + } + } + } + } + + /** + * Determines whether a WebSocket execution message belongs to the + * currently active workflow tab. Used to gate writes to the global + * "current execution" mirror so a job initiated from another open + * workflow cannot leak its progress into the active one. + * + * Resolution order: + * 1. `workflow_id` carried on the WS message (when backend supports it). + * 2. {@link jobIdToWorkflowId} mapping populated when the job was queued + * from this tab. + * 3. {@link jobIdToSessionWorkflowPath} mapping (path-based fallback). + * + * When the workflow cannot be resolved at all (e.g. job queued in a + * different browser session), the message is treated as belonging to + * the active workflow to preserve current behaviour for the existing + * single-tab common case. + */ + function messageMatchesActiveWorkflow( + jobId: JobId, + messageWorkflowId: string | undefined + ): boolean { + const activeWorkflow = workflowStore.activeWorkflow + if (!activeWorkflow) return true + + const activeId = + activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null + + if (messageWorkflowId && activeId) { + return messageWorkflowId === activeId + } + + const mappedId = jobIdToWorkflowId.value.get(jobId) + if (mappedId && activeId) return mappedId === activeId + + const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId) + if (mappedPath && activeWorkflow.path) { + return mappedPath === activeWorkflow.path + } + + return true + } + + /** + * Rebuilds the global progress mirror to match the currently active + * workflow tab. Called when the user switches tabs so stale progress + * from the previously active workflow does not bleed into the new one. + * + * Picks the most recent job whose mapping resolves to the active + * workflow and replays its `nodeProgressStatesByJob` entry into the + * mirror; clears the mirror entirely when no such job exists. + */ + function reconcileMirrorForActiveWorkflow() { + const activeWorkflow = workflowStore.activeWorkflow + if (!activeWorkflow) return + + const activeId = + activeWorkflow.activeState?.id ?? activeWorkflow.initialState?.id ?? null + const activePath = activeWorkflow.path ?? null + + const jobIds = Object.keys(nodeProgressStatesByJob.value) + let matchedJobId: JobId | null = null + for (let i = jobIds.length - 1; i >= 0; i--) { + const jobId = jobIds[i] + const mappedId = jobIdToWorkflowId.value.get(jobId) + const mappedPath = jobIdToSessionWorkflowPath.value.get(jobId) + const idMatch = activeId !== null && mappedId === activeId + const pathMatch = activePath !== null && mappedPath === activePath + if (idMatch || pathMatch) { + matchedJobId = jobId + break } } + + if (matchedJobId) { + const nodes = nodeProgressStatesByJob.value[matchedJobId] ?? {} + nodeProgressStates.value = nodes + executionIdToLocatorCache.clear() + if ( + _executingNodeProgress.value && + _executingNodeProgress.value.prompt_id !== matchedJobId + ) { + _executingNodeProgress.value = null + } + } else { + if (Object.keys(nodeProgressStates.value).length > 0) { + nodeProgressStates.value = {} + executionIdToLocatorCache.clear() + } + _executingNodeProgress.value = null + } } + watch( + () => workflowStore.activeWorkflow, + () => { + reconcileMirrorForActiveWorkflow() + } + ) + function handleProgress(e: CustomEvent) { + const { prompt_id: jobId, workflow_id: messageWorkflowId } = e.detail + if (!messageMatchesActiveWorkflow(jobId, messageWorkflowId)) return _executingNodeProgress.value = e.detail } @@ -384,6 +495,20 @@ export const useExecutionStore = defineStore('execution', () => { } } + /** + * Routes a terminal cleanup to the correct primitive: `evictTerminalJob` + * for non-active jobs (safe for any jobId, never clobbers another running + * job's mirror) and `resetExecutionState` for the active job (clears the + * global mirror that the active job owns). + */ + function terminateJob(jobId: JobId) { + if (jobId !== activeJobId.value) { + evictTerminalJob(jobId) + return + } + resetExecutionState(jobId) + } + function handleExecutionError(e: CustomEvent) { if (isCloud) { useTelemetry()?.trackExecutionError({ @@ -404,7 +529,7 @@ export const useExecutionStore = defineStore('execution', () => { // OSS path / Cloud fallback (real runtime errors) executionErrorStore.lastExecutionError = e.detail clearInitializationByJobId(e.detail.prompt_id) - resetExecutionState(e.detail.prompt_id) + terminateJob(e.detail.prompt_id) } function handleServiceLevelError(detail: ExecutionErrorWsMessage): boolean { @@ -413,7 +538,7 @@ export const useExecutionStore = defineStore('execution', () => { return false clearInitializationByJobId(detail.prompt_id) - resetExecutionState(detail.prompt_id) + terminateJob(detail.prompt_id) executionErrorStore.lastPromptError = { type: detail.exception_type ?? 'error', message: detail.exception_type @@ -431,7 +556,7 @@ export const useExecutionStore = defineStore('execution', () => { if (!result) return false clearInitializationByJobId(detail.prompt_id) - resetExecutionState(detail.prompt_id) + terminateJob(detail.prompt_id) if (result.kind === 'nodeErrors') { executionErrorStore.lastNodeErrors = result.nodeErrors @@ -485,6 +610,103 @@ export const useExecutionStore = defineStore('execution', () => { clearInitializationByJobIds(orphaned) } + /** + * Safely evict per-job execution artifacts for a job that has reached a + * terminal state, without disturbing state belonging to a different + * currently-running job. + * + * Unlike {@link resetExecutionState}, this is safe to call for any jobId, + * including jobs that are not the {@link activeJobId}. It is the polling + * fallback for the case where a WebSocket terminal message + * (`execution_success` / `execution_error` / `execution_interrupted`) is + * dropped and per-job UI state would otherwise remain stuck. + * + * Behaviour: + * - Always removes the job's per-job entries + * ({@link nodeProgressStatesByJob}, {@link queuedJobs}, preview). + * - Clears the global "current execution" mirror + * ({@link nodeProgressStates}, {@link _executingNodeProgress}, + * {@link activeJobId}) only when those still belong to the evicted job. + * - Idempotent: calling for an already-cleared job is a no-op. + */ + function evictTerminalJob(jobId: JobId) { + if (!jobId) return + + const hadProgress = jobId in nodeProgressStatesByJob.value + if (hadProgress) { + const map = { ...nodeProgressStatesByJob.value } + delete map[jobId] + nodeProgressStatesByJob.value = map + } + + if (jobId in queuedJobs.value) { + const next = { ...queuedJobs.value } + delete next[jobId] + queuedJobs.value = next + } + + useJobPreviewStore().clearPreview(jobId) + clearInitializationByJobId(jobId) + + const isActive = activeJobId.value === jobId + const mirrorBelongsToEvicted = mirrorOwnerJobId() === jobId + + if (isActive || mirrorBelongsToEvicted) { + nodeProgressStates.value = {} + executionIdToLocatorCache.clear() + } + + if ( + _executingNodeProgress.value && + _executingNodeProgress.value.prompt_id === jobId + ) { + _executingNodeProgress.value = null + } + + if (isActive) { + activeJobId.value = null + executionErrorStore.clearPromptError() + } + } + + /** + * Returns the prompt_id that the global {@link nodeProgressStates} mirror + * currently belongs to, or `null` when the mirror is empty. + * + * The mirror is replaced wholesale on every `progress_state` message, so + * all entries within it always share a single prompt_id; reading the + * first entry is sufficient. + */ + function mirrorOwnerJobId(): JobId | null { + const first = Object.values(nodeProgressStates.value)[0] + return first?.prompt_id ?? null + } + + /** + * Reconcile per-job progress state against the authoritative job sets from + * the backend (running/pending vs. terminal). Used by the queue polling + * path to recover from dropped WebSocket terminal messages. + * + * @param activeJobIds Jobs currently in Running or Pending on the backend. + * @param terminalJobIds Jobs in History (completed/failed/cancelled). + */ + function reconcileTerminalJobs( + activeJobIds: Set, + terminalJobIds: Set + ) { + const tracked = new Set([ + ...Object.keys(nodeProgressStatesByJob.value), + ...initializingJobIds.value + ]) + if (activeJobId.value) tracked.add(activeJobId.value) + + for (const jobId of tracked) { + if (activeJobIds.has(jobId)) continue + if (!terminalJobIds.has(jobId)) continue + evictTerminalJob(jobId) + } + } + function isJobInitializing(jobId: JobId | number | undefined): boolean { if (!jobId) return false return initializingJobIds.value.has(String(jobId)) @@ -519,14 +741,27 @@ export const useExecutionStore = defineStore('execution', () => { } function handleProgressText(e: CustomEvent) { - const { nodeId, text, prompt_id } = e.detail + const { nodeId, text, prompt_id, workflow_id } = e.detail if (!text || !nodeId) return - // Filter: only accept progress for the active prompt - if (prompt_id && activeJobId.value && prompt_id !== activeJobId.value) - return + // Prefer the workflow-ownership gate when we have any signal that lets + // us resolve it (workflow_id on the message, or a registered mapping). + // Only fall back to the legacy active-prompt guard when ownership is + // unresolvable, otherwise activeJobId pointing at a different workflow's + // job would incorrectly drop messages for the visible workflow. + if (prompt_id) { + const canResolveWorkflow = + Boolean(workflow_id) || + jobIdToWorkflowId.value.has(prompt_id) || + jobIdToSessionWorkflowPath.value.has(prompt_id) + + if (canResolveWorkflow) { + if (!messageMatchesActiveWorkflow(prompt_id, workflow_id)) return + } else if (activeJobId.value && prompt_id !== activeJobId.value) { + return + } + } - // Handle execution node IDs for subgraphs const currentId = getNodeIdIfExecuting(nodeId) if (!currentId) return const node = canvasStore.canvas?.graph?.getNodeById(currentId) @@ -643,6 +878,8 @@ export const useExecutionStore = defineStore('execution', () => { clearInitializationByJobId, clearInitializationByJobIds, reconcileInitializingJobs, + reconcileTerminalJobs, + reconcileMirrorForActiveWorkflow, bindExecutionEvents, unbindExecutionEvents, storeJob, diff --git a/src/stores/queueStore.test.ts b/src/stores/queueStore.test.ts index 9b5a62d2d6c..a2c9e04f29d 100644 --- a/src/stores/queueStore.test.ts +++ b/src/stores/queueStore.test.ts @@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes' import type { TaskOutput } from '@/schemas/apiSchema' import { api } from '@/scripts/api' +import { useExecutionStore } from '@/stores/executionStore' import { TaskItemImpl, useQueueStore } from '@/stores/queueStore' // Fixture factory for JobListItem @@ -676,6 +677,34 @@ describe('useQueueStore', () => { // Should preserve array identity when history is unchanged expect(store.historyTasks).toBe(initialHistoryTasks) }) + + it('should reconcile terminal jobs when queue is empty but history is not', async () => { + const executionStore = useExecutionStore() + const reconcileSpy = vi.spyOn(executionStore, 'reconcileTerminalJobs') + const finishedJob = createHistoryJob(10, 'finished-job') + + mockGetQueue.mockResolvedValue({ Running: [], Pending: [] }) + mockGetHistory.mockResolvedValue([finishedJob]) + + await store.update() + + expect(reconcileSpy).toHaveBeenCalledTimes(1) + const [activeIds, terminalIds] = reconcileSpy.mock.calls[0] + expect(activeIds.size).toBe(0) + expect(terminalIds.has('finished-job')).toBe(true) + }) + + it('should not reconcile terminal jobs when history is empty', async () => { + const executionStore = useExecutionStore() + const reconcileSpy = vi.spyOn(executionStore, 'reconcileTerminalJobs') + + mockGetQueue.mockResolvedValue({ Running: [], Pending: [] }) + mockGetHistory.mockResolvedValue([]) + + await store.update() + + expect(reconcileSpy).not.toHaveBeenCalled() + }) }) describe('update() - maxHistoryItems limit', () => { diff --git a/src/stores/queueStore.ts b/src/stores/queueStore.ts index f0f660c1dba..4956c03c9ad 100644 --- a/src/stores/queueStore.ts +++ b/src/stores/queueStore.ts @@ -546,18 +546,28 @@ export const useQueueStore = defineStore('queue', () => { } }) - // Only reconcile when the queue fetch returned data. api.getQueue() - // returns empty Running/Pending on transient errors, which would - // incorrectly clear all initializing prompts. + const activeJobIds = new Set([ + ...queue.Running.map((j) => j.id), + ...queue.Pending.map((j) => j.id) + ]) + + // Only reconcile initializing jobs when the queue fetch returned data. + // api.getQueue() returns empty Running/Pending on transient errors, + // which would incorrectly clear all initializing prompts. const queueHasData = queue.Running.length > 0 || queue.Pending.length > 0 if (queueHasData) { - const activeJobIds = new Set([ - ...queue.Running.map((j) => j.id), - ...queue.Pending.map((j) => j.id) - ]) executionStore.reconcileInitializingJobs(activeJobIds) } + // Reconcile terminal jobs whenever history is non-empty. The last + // active job finishing legitimately produces empty Running/Pending, + // and terminal eviction is the only path that clears stuck node + // progress when WebSocket terminal messages are dropped. + if (history.length > 0) { + const terminalJobIds = new Set(history.map((j) => j.id)) + executionStore.reconcileTerminalJobs(activeJobIds, terminalJobIds) + } + // Sort by create_time descending and limit to maxItems const sortedHistory = [...history] .sort((a, b) => b.create_time - a.create_time)