Skip to content
Open
6 changes: 3 additions & 3 deletions src/platform/remote/comfyui/useQueuePolling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ describe('useQueuePolling', () => {
expect(store.update).toHaveBeenCalledOnce()
})

it('does not poll when activeJobsCount > 1', async () => {
it('polls when activeJobsCount > 1', async () => {
mountUseQueuePolling()

store.activeJobsCount = 2
await vi.advanceTimersByTimeAsync(16_000)
await vi.advanceTimersByTimeAsync(8_000)

expect(store.update).not.toHaveBeenCalled()
expect(store.update).toHaveBeenCalledOnce()
})

it('stops polling when activeJobsCount drops to 0', async () => {
Expand Down
4 changes: 2 additions & 2 deletions src/platform/remote/comfyui/useQueuePolling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export function useQueuePolling() {

const { start, stop } = useTimeoutFn(
() => {
if (queueStore.activeJobsCount !== 1 || queueStore.isLoading) return
if (queueStore.activeJobsCount < 1 || queueStore.isLoading) return
delay.value = Math.min(delay.value * BACKOFF_MULTIPLIER, MAX_INTERVAL_MS)
void queueStore.update()
},
Expand All @@ -22,7 +22,7 @@ export function useQueuePolling() {
)

function scheduleNextPoll() {
if (queueStore.activeJobsCount === 1 && !queueStore.isLoading) start()
if (queueStore.activeJobsCount >= 1 && !queueStore.isLoading) start()
else stop()
}

Expand Down
11 changes: 9 additions & 2 deletions src/schemas/apiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes'
const zNodeType = z.string()
const zJobId = z.string()
export type JobId = z.infer<typeof zJobId>
const zWorkflowId = z.string()
export const resultItemType = z.enum(['input', 'output', 'temp'])
export type ResultItemType = z.infer<typeof resultItemType>

Expand Down Expand Up @@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({
value: z.number().int(),
max: z.number().int(),
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
node: zNodeId
})

Expand All @@ -65,20 +67,23 @@ const zNodeProgressState = z.object({
state: z.enum(['pending', 'running', 'finished', 'error']),
node_id: zNodeId,
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
display_node_id: zNodeId.optional(),
parent_node_id: zNodeId.optional(),
real_node_id: zNodeId.optional()
})

const zProgressStateWsMessage = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
nodes: z.record(zNodeId, zNodeProgressState)
})

const zExecutingWsMessage = z.object({
node: zNodeId,
display_node: zNodeId,
prompt_id: zJobId
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: since executing now carries workflow_id in the schema, the API dispatch path should preserve that metadata too. It currently dispatches only display_node || node, so downstream listeners cannot apply the same active-workflow gate as the other execution events and background executions can still clear active node progress or trigger redraws for the visible canvas.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right — the schema now describes a richer executing payload but api.ts still strips it down to display_node || node at line 731-734, and the type system at line 218-220 hard-codes executing to NodeId. Plumbing the full payload through requires changes in api.ts (dispatch + ApiToEventType), the handleExecuting handler signature, and existing test mocks. That's a wider structural change than I want to bundle into this PR (which is already covering schema + eviction + active-workflow gating).

Out of scope here, but I'll include this in the follow-up issue alongside the executed write gating (3178588877) and the queue/history race (3178588879). The current handleExecuting is partially safe today because it only mutates activeJob.value.nodes (which is gated through activeJobId indirection) — but you're correct that listeners can't apply the new workflow gate downstream until the dispatch carries the metadata. Tracked for the follow-up.

prompt_id: zJobId,
workflow_id: zWorkflowId.optional()
})

const zExecutedWsMessage = zExecutingWsMessage.extend({
Expand All @@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({

const zExecutionWsMessageBase = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
timestamp: z.number().int()
})

Expand Down Expand Up @@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({
const zProgressTextWsMessage = z.object({
nodeId: zNodeId,
text: z.string(),
prompt_id: z.string().optional()
prompt_id: z.string().optional(),
workflow_id: zWorkflowId.optional()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
})

const zNotificationWsMessage = z.object({
Expand Down
Loading
Loading