perf(web): isolate streaming text to fix main-thread jank #1111
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "@moonshot-ai/kimi-code": patch | ||
| --- | ||
|
|
||
| Keep the web chat responsive during long streaming replies by isolating live token text from the rest of the UI state, so it no longer stalls the main thread. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| <!-- apps/kimi-web/src/components/chat/StreamingBlocks.vue --> | ||
| <!-- | ||
| Renders the live (still-streaming) text/thinking blocks of the active | ||
| assistant message. This is the ONLY component that re-renders on each | ||
| `assistantDelta`: it subscribes to the fine-grained streaming store, so the | ||
| rest of the app (App, sidebar, the turn list) does not move on every token. | ||
|
|
||
| Mounted by ChatPane only for the turn that is currently streaming; unmounts | ||
| when the turn settles (the committed content in `messagesBySession` takes | ||
| over). | ||
| --> | ||
| <script setup lang="ts"> | ||
| import { computed } from 'vue'; | ||
| import Markdown from './Markdown.vue'; | ||
| import ThinkingBlock from './ThinkingBlock.vue'; | ||
| import { streamingBySession } from '../../composables/client/streamingStore'; | ||
| import type { FilePreviewRequest } from '../../types'; | ||
|
|
||
| const props = withDefaults( | ||
| defineProps<{ | ||
| sessionId: string; | ||
| turnId: string; | ||
| mobile?: boolean; | ||
| }>(), | ||
| { mobile: false }, | ||
| ); | ||
|
|
||
| const emit = defineEmits<{ | ||
| openFile: [target: FilePreviewRequest]; | ||
| openThinking: [target: { turnId: string; blockIndex: number }]; | ||
| }>(); | ||
|
|
||
| // Subscribe to this session's live blocks. Only this computed (and therefore | ||
| // only this component) is dirtied when a delta appends to the store. | ||
| const blocks = computed(() => streamingBySession[props.sessionId]?.blocks ?? []); | ||
| </script> | ||
|
|
||
| <template> | ||
| <template v-for="blk in blocks" :key="`stream-${blk.kind}-${blk.contentIndex}`"> | ||
| <ThinkingBlock | ||
| v-if="blk.kind === 'thinking'" | ||
| :text="blk.text" | ||
| :mobile="mobile" | ||
| :streaming="true" | ||
| @open="emit('openThinking', { turnId, blockIndex: blk.contentIndex })" | ||
| /> | ||
| <div v-else-if="blk.kind === 'text' && blk.text" class="msg"> | ||
| <Markdown :text="blk.text" :streaming="true" :open-file="(target) => emit('openFile', target)" /> | ||
|
Comment on lines +47 to +48
When a user refreshes or reconnects mid-reply, |
||
| </div> | ||
| </template> | ||
| </template> | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| // apps/kimi-web/src/composables/client/streamingStore.ts | ||
| // | ||
| // Fine-grained streaming-text store, kept OUTSIDE `rawState` on purpose. | ||
| // | ||
| // `assistantDelta` is the only genuinely high-frequency event (dozens to | ||
| // hundreds per second). Routing it through the immutable reducer + the coarse | ||
| // `rawState` graph makes every delta re-render the whole App and recompute the | ||
| // sidebar computeds (see the main-thread-jank investigation). Instead, deltas | ||
| // append here and only the single `StreamingBlocks` component subscribed to a | ||
| // session re-renders. | ||
| // | ||
| // Lifecycle: deltas append; `messageUpdated` (authoritative full content) and | ||
| // turn-end (`sessionStatusChanged` idle/aborted) clear the entry so the | ||
| // committed content in `messagesBySession` takes over without duplication. | ||
|
|
||
| import { reactive } from 'vue'; | ||
|
|
||
| export interface StreamingBlock { | ||
| contentIndex: number; | ||
| kind: 'text' | 'thinking'; | ||
| text: string; | ||
| } | ||
|
|
||
| export interface StreamingState { | ||
| /** id of the assistant message currently being streamed. */ | ||
| messageId: string; | ||
| /** Ordered live text/thinking blocks (always trailing in the message). */ | ||
| blocks: StreamingBlock[]; | ||
| } | ||
|
|
||
| /** | ||
| * Per-session live streaming state. A session has at most one in-flight | ||
| * assistant message (its trailing one), so a single entry per session suffices. | ||
| */ | ||
| export const streamingBySession = reactive<Record<string, StreamingState>>({}); | ||
|
|
||
| /** | ||
| * Append one `assistantDelta` to the streaming store. O(1): either mutates the | ||
| * trailing block's text in place (same contentIndex) or pushes a new block | ||
| * (new contentIndex, rare). Never touches `rawState`, so no heavy computed | ||
| * (`turns`, sidebar) is dirtied. | ||
| */ | ||
| export function appendStreamingDelta( | ||
| sessionId: string, | ||
| messageId: string, | ||
| contentIndex: number, | ||
| delta: { text?: string; thinking?: string }, | ||
| ): void { | ||
| let state = streamingBySession[sessionId]; | ||
| // A new assistant message (new step, or text resuming after a tool) starts a | ||
| // fresh entry — the previous message is already committed via messageUpdated. | ||
| if (!state || state.messageId !== messageId) { | ||
| state = streamingBySession[sessionId] = { messageId, blocks: [] }; | ||
| } | ||
|
|
||
| const kind: 'text' | 'thinking' = delta.text !== undefined ? 'text' : 'thinking'; | ||
| const chunk = delta.text ?? delta.thinking ?? ''; | ||
| if (chunk.length === 0) return; | ||
|
|
||
| const last = state.blocks.at(-1); | ||
| if (last && last.contentIndex === contentIndex && last.kind === kind) { | ||
| last.text += chunk; | ||
| } else { | ||
| state.blocks.push({ contentIndex, kind, text: chunk }); | ||
| } | ||
| } | ||
|
|
||
| /** Drop the live entry for a session (commit or turn end). */ | ||
| export function clearStreaming(sessionId: string): void { | ||
| delete streamingBySession[sessionId]; | ||
| } |
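
The append/clear lifecycle described in the header comment of `streamingStore.ts` can be walked through in isolation. The following is a minimal, framework-free sketch, not the PR's code: a plain object (`store`) stands in for Vue's `reactive` record, and `append` / `clear` are local stand-ins that mirror the logic of `appendStreamingDelta` / `clearStreaming`. The session and message ids are made up for illustration.

```typescript
// Sketch only: a plain object replaces Vue's `reactive` so this runs anywhere.
interface StreamingBlock {
  contentIndex: number;
  kind: 'text' | 'thinking';
  text: string;
}

interface StreamingState {
  messageId: string;
  blocks: StreamingBlock[];
}

const store: Record<string, StreamingState> = {};

// Local stand-in mirroring the logic of appendStreamingDelta in the diff.
function append(
  sessionId: string,
  messageId: string,
  contentIndex: number,
  delta: { text?: string; thinking?: string },
): void {
  let state: StreamingState | undefined = store[sessionId];
  // A different message id starts a fresh entry for the session.
  if (!state || state.messageId !== messageId) {
    state = store[sessionId] = { messageId, blocks: [] };
  }
  const kind: 'text' | 'thinking' = delta.text !== undefined ? 'text' : 'thinking';
  const chunk = delta.text ?? delta.thinking ?? '';
  if (chunk.length === 0) return;

  const last = state.blocks[state.blocks.length - 1];
  if (last && last.contentIndex === contentIndex && last.kind === kind) {
    last.text += chunk; // same block: extend in place
  } else {
    state.blocks.push({ contentIndex, kind, text: chunk }); // new block
  }
}

// Local stand-in mirroring clearStreaming.
function clear(sessionId: string): void {
  delete store[sessionId];
}

// A thinking block streams first, then the answer text arrives in two chunks.
append('s1', 'm1', 0, { thinking: 'Plan: ' });
append('s1', 'm1', 0, { thinking: 'greet.' });
append('s1', 'm1', 1, { text: 'Hel' });
append('s1', 'm1', 1, { text: 'lo' });
const liveBlocks = (store['s1']?.blocks ?? []).map((b) => `${b.kind}:${b.text}`);

// A delta carrying a new message id replaces the session's entry.
append('s1', 'm2', 0, { text: 'Next' });
const blocksAfterNewMessage = store['s1']?.blocks.length ?? 0;

// Commit or turn end drops the entry entirely.
clear('s1');
const hasEntryAfterClear = 's1' in store;
```

After the four deltas, `liveBlocks` holds two entries (one thinking block, one text block), which is the coalescing behavior the doc comment calls O(1) per delta. Switching message id or clearing leaves no trace of the earlier blocks, so the committed content is the only copy left to render.
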
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ import { | |
| STORAGE_KEYS, | ||
| } from '../lib/storage'; | ||
| import { createEventBatcher, isRenderEvent } from './client/eventBatcher'; | ||
| import { appendStreamingDelta, clearStreaming } from './client/streamingStore'; | ||
| import { useAppearance } from './client/useAppearance'; | ||
| import { useNotification } from './client/useNotification'; | ||
| import { useTaskPoller } from './client/useTaskPoller'; | ||
|
|
@@ -486,6 +487,7 @@ function forgetSession(sessionId: string): void { | |
| // That would make hasLoadedMessages() treat the stale empty cache as | ||
| // authoritative and skip the next snapshot fetch for this id. | ||
| enqueueEvent.flush(); | ||
| clearStreaming(sessionId); | ||
| removeSession(sessionId); | ||
| removeSessionMessages(sessionId); | ||
| delete rawState.approvalsBySession[sessionId]; | ||
|
|
@@ -639,8 +641,28 @@ function nextOptimisticMsgId(): string { | |
| // past the queue check and clobber promptIdBySession (breaking abort). | ||
| const inFlightPromptSessions = new Set<string>(); | ||
|
|
||
| // Mirror of the reducer's advanceSeq, for the one event (assistantDelta) that | ||
| // bypasses the reducer. lastSeqBySession is a resync cursor with no rendering | ||
| // dependencies, so mutating it in place is both safe and cheap. | ||
| function advanceSeqCursor(sessionId: string | undefined, seq: number | undefined): void { | ||
| if (sessionId !== undefined && seq !== undefined && seq > 0) { | ||
| const prev = rawState.lastSeqBySession[sessionId] ?? 0; | ||
| if (seq > prev) rawState.lastSeqBySession[sessionId] = seq; | ||
| } | ||
| } | ||
|
|
||
| // Helper: mutate rawState by applying a reducer on a snapshot then re-assigning fields | ||
| function applyEvent(event: ReturnType<typeof toAppEvent>, sessionId: string, seq: number): void { | ||
| // Streaming text/thinking deltas bypass the reducer entirely. Appending to the | ||
| // fine-grained streaming store is O(1) and dirties only the single | ||
| // StreamingBlocks component — instead of cloning all of `rawState` and | ||
| // re-rendering the whole App + sidebar on every token. | ||
| if (event.type === 'assistantDelta') { | ||
| appendStreamingDelta(sessionId, event.messageId, event.contentIndex, event.delta); | ||
|
Because this writes deltas outside |
||
| advanceSeqCursor(sessionId, seq); | ||
| return; | ||
| } | ||
|
|
||
| const snapshot: KimiClientState = { | ||
| sessions: rawState.sessions, | ||
| activeSessionId: rawState.activeSessionId, | ||
|
|
@@ -670,6 +692,20 @@ function applyEvent(event: ReturnType<typeof toAppEvent>, sessionId: string, seq | |
| rawState.config = next.config ?? null; | ||
| rawState.warnings = next.warnings; | ||
|
|
||
| // `messageUpdated` carries the authoritative full content of a message (tool | ||
| // slot / step end / turn end): drop the live streaming entry so the just- | ||
| // committed content takes over without rendering the same text twice. | ||
| if (event.type === 'messageUpdated') { | ||
| clearStreaming(sessionId); | ||
| } | ||
| // Turn end: release the streaming entry for the session. | ||
| if ( | ||
| event.type === 'sessionStatusChanged' && | ||
| (event.status === 'idle' || event.status === 'aborted') | ||
| ) { | ||
| clearStreaming(sessionId); | ||
| } | ||
|
|
||
| if (event.type === 'configChanged') { | ||
| rawState.defaultModel = event.config.defaultModel ?? null; | ||
| } | ||
|
|
@@ -1028,8 +1064,14 @@ async function syncSessionFromSnapshot(sessionId: string): Promise<SyncSessionRe | |
| // messagesBySession[sessionId]. The snapshot is authoritative (it already | ||
| // contains everything up to asOfSeq); applying stale queued deltas on top | ||
| // of it would duplicate text / tool output. Flushing here applies them to | ||
| // the pre-snapshot array, which the snapshot then overwrites. | ||
| // the pre-snapshot state, which the snapshot then overwrites. | ||
| enqueueEvent.flush(); | ||
| // The snapshot is authoritative for the live streaming text too: any deltas | ||
| // the flush just landed in the streaming store are superseded by the | ||
| // snapshot (and the in-flight seed below), so drop them. Without this, a | ||
| // reconnect or delta-gap resync mid-stream would render stale live chunks | ||
| // on top of the seeded snapshot. | ||
| clearStreaming(sessionId); | ||
|
|
||
| updateSession(sessionId, (s) => ({ | ||
| ...snap.session, | ||
|
|
||
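
The routing rules that the hunks above add to `applyEvent` can be summarized as a pure decision function. This is a hedged sketch rather than the PR's implementation: `EventLike`, `Route`, `routeEvent`, and `advanceCursor` are hypothetical names introduced here for illustration, and the `'busy'` status in the usage lines is an assumed placeholder for "any status other than idle or aborted".

```typescript
// Hypothetical minimal event shape; the real app events carry more fields.
interface EventLike {
  type: string;
  status?: string;
}

// Hypothetical result type describing where an event goes.
type Route =
  | { path: 'streaming-store' }
  | { path: 'reducer'; clearStreaming: boolean };

// Deltas bypass the reducer; everything else is reduced, and commit or
// turn-end events additionally drop the live streaming entry.
function routeEvent(event: EventLike): Route {
  if (event.type === 'assistantDelta') {
    return { path: 'streaming-store' };
  }
  const committed = event.type === 'messageUpdated';
  const turnEnded =
    event.type === 'sessionStatusChanged' &&
    (event.status === 'idle' || event.status === 'aborted');
  return { path: 'reducer', clearStreaming: committed || turnEnded };
}

// Mirrors the monotonic behavior of advanceSeqCursor: the cursor only moves
// forward, and missing or non-positive sequence numbers are ignored.
function advanceCursor(
  cursors: Record<string, number>,
  sessionId: string | undefined,
  seq: number | undefined,
): void {
  if (sessionId !== undefined && seq !== undefined && seq > 0) {
    const prev = cursors[sessionId] ?? 0;
    if (seq > prev) cursors[sessionId] = seq;
  }
}

const deltaRoute = routeEvent({ type: 'assistantDelta' });
const commitRoute = routeEvent({ type: 'messageUpdated' });
const idleRoute = routeEvent({ type: 'sessionStatusChanged', status: 'idle' });
// 'busy' is an assumed status value used only for this illustration.
const busyRoute = routeEvent({ type: 'sessionStatusChanged', status: 'busy' });

const cursors: Record<string, number> = {};
advanceCursor(cursors, 's1', 5);
advanceCursor(cursors, 's1', 3); // stale: ignored
advanceCursor(cursors, 's1', 0); // non-positive: ignored
advanceCursor(cursors, undefined, 9); // no session: ignored
```

Keeping the decision separate from the side effects makes the invariant easy to check: only `assistantDelta` skips the reducer, and the sequence cursor still advances for it, so a later resync starts from the right position.
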
When a user clicks a still-streaming thinking block, this emits `blockIndex: blk.contentIndex`, but the live block is no longer present in `client.turns` because `assistantDelta` now bypasses `messagesBySession`; `useDetailPanel` still reads `turn.blocks?.[blockIndex]`, so the side panel either stays closed or shows an older committed thinking block. This regresses the existing “click thinking to view full text” behavior for long live thoughts until the final `messageUpdated` arrives.
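
One possible direction for the regression described in this comment is to have the detail panel consult the live streaming blocks before falling back to the committed turn. The sketch below is an assumption-laden illustration, not a proposed patch: `resolveThinkingText` is a hypothetical helper, the `CommittedBlock` shape is guessed (the real turn block type is not shown in this diff), and it assumes `blockIndex` and `contentIndex` refer to the same position in the message.

```typescript
// Shape of a live block, as declared in streamingStore.ts.
interface LiveBlock {
  contentIndex: number;
  kind: 'text' | 'thinking';
  text: string;
}

// Assumed shape of a committed turn block; the real type is not in this diff.
interface CommittedBlock {
  kind: string;
  text?: string;
}

// Hypothetical helper: prefer the live (still-streaming) thinking block and
// fall back to the committed block at the same index once the turn settles.
function resolveThinkingText(
  liveBlocks: readonly LiveBlock[],
  committedBlocks: readonly CommittedBlock[] | undefined,
  blockIndex: number,
): string | undefined {
  const live = liveBlocks.find(
    (b) => b.kind === 'thinking' && b.contentIndex === blockIndex,
  );
  if (live) return live.text;

  const committed = committedBlocks?.[blockIndex];
  return committed?.kind === 'thinking' ? committed.text : undefined;
}

// While streaming: the live store has the block, the committed turn does not.
const whileStreaming = resolveThinkingText(
  [{ contentIndex: 0, kind: 'thinking', text: 'partial thought' }],
  [],
  0,
);

// After the final messageUpdated: the store is cleared, the turn has the block.
const afterCommit = resolveThinkingText(
  [],
  [{ kind: 'thinking', text: 'full thought' }],
  0,
);

// Nothing matches: the panel has no thinking text to show.
const noMatch = resolveThinkingText([], [{ kind: 'text', text: 'answer' }], 0);
```

Whether the panel should read the store directly or receive the live text through the `openThinking` payload is a design choice this sketch leaves open. The point is only that the lookup needs a live-first path while the turn is in flight.
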