Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/platform/remote/comfyui/useQueuePolling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ describe('useQueuePolling', () => {
expect(store.update).toHaveBeenCalledOnce()
})

it('does not poll when activeJobsCount > 1', async () => {
it('polls when activeJobsCount > 1', async () => {
mountUseQueuePolling()

store.activeJobsCount = 2
await vi.advanceTimersByTimeAsync(16_000)
await vi.advanceTimersByTimeAsync(8_000)

expect(store.update).not.toHaveBeenCalled()
expect(store.update).toHaveBeenCalledOnce()
})

it('stops polling when activeJobsCount drops to 0', async () => {
Expand Down
4 changes: 2 additions & 2 deletions src/platform/remote/comfyui/useQueuePolling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export function useQueuePolling() {

const { start, stop } = useTimeoutFn(
() => {
if (queueStore.activeJobsCount !== 1 || queueStore.isLoading) return
if (queueStore.activeJobsCount < 1 || queueStore.isLoading) return
delay.value = Math.min(delay.value * BACKOFF_MULTIPLIER, MAX_INTERVAL_MS)
void queueStore.update()
},
Expand All @@ -22,7 +22,7 @@ export function useQueuePolling() {
)

function scheduleNextPoll() {
if (queueStore.activeJobsCount === 1 && !queueStore.isLoading) start()
if (queueStore.activeJobsCount >= 1 && !queueStore.isLoading) start()
else stop()
}

Expand Down
11 changes: 9 additions & 2 deletions src/schemas/apiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { LinkReleaseTriggerAction } from '@/types/searchBoxTypes'
const zNodeType = z.string()
const zJobId = z.string()
export type JobId = z.infer<typeof zJobId>
const zWorkflowId = z.string()
export const resultItemType = z.enum(['input', 'output', 'temp'])
export type ResultItemType = z.infer<typeof resultItemType>

Expand Down Expand Up @@ -56,6 +57,7 @@ const zProgressWsMessage = z.object({
value: z.number().int(),
max: z.number().int(),
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
node: zNodeId
})

Expand All @@ -65,20 +67,23 @@ const zNodeProgressState = z.object({
state: z.enum(['pending', 'running', 'finished', 'error']),
node_id: zNodeId,
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
display_node_id: zNodeId.optional(),
parent_node_id: zNodeId.optional(),
real_node_id: zNodeId.optional()
})

const zProgressStateWsMessage = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
nodes: z.record(zNodeId, zNodeProgressState)
})

const zExecutingWsMessage = z.object({
node: zNodeId,
display_node: zNodeId,
prompt_id: zJobId
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: since executing now carries workflow_id in the schema, the API dispatch path should preserve that metadata too. It currently dispatches only display_node || node, so downstream listeners cannot apply the same active-workflow gate as the other execution events and background executions can still clear active node progress or trigger redraws for the visible canvas.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right — the schema now describes a richer executing payload but api.ts still strips it down to display_node || node at line 731-734, and the type system at line 218-220 hard-codes executing to NodeId. Plumbing the full payload through requires changes in api.ts (dispatch + ApiToEventType), the handleExecuting handler signature, and existing test mocks. That's a wider structural change than I want to bundle into this PR (which is already covering schema + eviction + active-workflow gating).

Out of scope here, but I'll include this in the follow-up issue alongside the executed write gating (3178588877) and the queue/history race (3178588879). The current handleExecuting is partially safe today because it only mutates activeJob.value.nodes (which is gated through activeJobId indirection) — but you're correct that listeners can't apply the new workflow gate downstream until the dispatch carries the metadata. Tracked for the follow-up.

prompt_id: zJobId,
workflow_id: zWorkflowId.optional()
})

const zExecutedWsMessage = zExecutingWsMessage.extend({
Expand All @@ -88,6 +93,7 @@ const zExecutedWsMessage = zExecutingWsMessage.extend({

const zExecutionWsMessageBase = z.object({
prompt_id: zJobId,
workflow_id: zWorkflowId.optional(),
timestamp: z.number().int()
})

Expand Down Expand Up @@ -115,7 +121,8 @@ const zExecutionErrorWsMessage = zExecutionWsMessageBase.extend({
const zProgressTextWsMessage = z.object({
nodeId: zNodeId,
text: z.string(),
prompt_id: z.string().optional()
prompt_id: z.string().optional(),
workflow_id: zWorkflowId.optional()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
})

const zNotificationWsMessage = z.object({
Expand Down
Loading
Loading