Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/web/src/app/api/chat/link-account/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ async function reprocessLinkedMessage(
message,
platformIntegration,
user,
state: bot.getState(),
});
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import {
} from '@/lib/bot/request-logging';
import { parseBotCallbackStep } from '@/lib/bot/step-budget';
import { runBotAgent, type BotAgentMessageLike } from '@/lib/bot/agent-runner';
import { getRehydratedBotRequestMessageState } from '@/lib/bot/message-state';
import { botPlatforms } from '@/lib/bot/platforms';
import { getPlatformIntegrationById } from '@/lib/bot/platform-helpers';
import { findUserById } from '@/lib/user';
import type { Thread } from 'chat';
import type { Message } from 'chat';
import { type Thread } from 'chat';

type ExecutionCallbackPayload = {
sessionId: string;
Expand Down Expand Up @@ -183,6 +185,7 @@ async function continueBotAgentAfterCallback(params: {
requestRow: Awaited<ReturnType<typeof getBotRequest>>;
platformIntegration: PlatformIntegration;
thread: Thread;
message: Message;
continuationPrompt: string;
completedStepCount: number;
}) {
Expand All @@ -195,23 +198,8 @@ async function continueBotAgentAfterCallback(params: {
return await botPlatforms.require(params.platformIntegration.platform).withAuthContext({
platformIntegration: params.platformIntegration,
fn: async () => {
const originalMessage = await Promise.resolve(
params.thread.adapter.fetchMessage?.(
params.thread.id,
params.requestRow.platform_message_id
) ?? null
).catch(error => {
console.warn('[BotSessionCallback] Failed to fetch original platform message:', {
error,
platform: params.platformIntegration.platform,
threadId: params.thread.id,
messageId: params.requestRow.platform_message_id,
});
return null;
});

const callbackMessage: BotAgentMessageLike = {
author: originalMessage?.author ?? {
author: params.message.author ?? {
fullName: 'Cloud Agent Callback',
isBot: false,
isMe: false,
Expand Down Expand Up @@ -402,6 +390,7 @@ async function handleCompletedCallback(
requestRow: NonNullable<Awaited<ReturnType<typeof getBotRequest>>>,
platformIntegration: PlatformIntegration,
thread: Thread,
message: Message,
completedStepCount: number,
trackedCallbackSession: BotRequestCloudAgentSession | undefined
) {
Expand Down Expand Up @@ -642,6 +631,7 @@ ${cloudAgentResultsForPrompt}`;
requestRow,
platformIntegration,
thread,
message,
continuationPrompt,
completedStepCount,
});
Expand Down Expand Up @@ -882,7 +872,13 @@ export async function POST(
requestRow.platform_integration_id
);
await bot.initialize();
const thread = bot.thread(requestRow.platform_thread_id);
const { thread, message } = await getRehydratedBotRequestMessageState(botRequestId);
Comment thread
RSO marked this conversation as resolved.
Outdated

logCallback('Resolved callback chat context', {
botRequestId,
threadId: thread.id,
messageId: message?.id,
});

if (childSessionStatus && trackedCallbackSession) {
try {
Expand Down Expand Up @@ -934,6 +930,7 @@ export async function POST(
requestRow,
platformIntegration,
thread,
message,
completedStepCount,
trackedCallbackSession
);
Expand Down
39 changes: 1 addition & 38 deletions apps/web/src/lib/bot-identity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as z from 'zod';
import { NEXTAUTH_SECRET } from '@/lib/config.server';
import { botIdentityRedisKey } from '@/lib/redis-keys';
import { PLATFORM } from '@/lib/integrations/core/constants';
import { serializedMessageSchema, serializedThreadSchema } from '@/lib/bot/message-state';

const CHAT_SDK_CACHE_KEY_PREFIX = 'chat-sdk:cache:';
const LINK_ACCOUNT_CONTEXT_KEY_PREFIX = 'link-account-context:';
Expand Down Expand Up @@ -152,44 +153,6 @@ const platformIdentitySchema = z.object({
userId: z.string(),
});

const serializedThreadShape = z.looseObject({
_type: z.literal('chat:Thread'),
adapterName: z.string(),
channelId: z.string(),
id: z.string(),
isDM: z.boolean(),
});

const serializedThreadSchema = z.custom<SerializedThread>(
value => serializedThreadShape.safeParse(value).success
);

const serializedMessageShape = z.looseObject({
_type: z.literal('chat:Message'),
attachments: z.array(z.unknown()),
author: z.object({
userId: z.string(),
userName: z.string(),
fullName: z.string(),
isBot: z.union([z.boolean(), z.literal('unknown')]),
isMe: z.boolean(),
}),
formatted: z.unknown(),
id: z.string(),
metadata: z.object({
dateSent: z.iso.datetime(),
edited: z.boolean(),
editedAt: z.iso.datetime().optional(),
}),
raw: z.unknown(),
text: z.string(),
threadId: z.string(),
});

const serializedMessageSchema = z.custom<SerializedMessage>(
value => serializedMessageShape.safeParse(value).success
);

const linkTokenPayloadSchema = z.object({
identity: platformIdentitySchema,
contextKey: z.string(),
Expand Down
8 changes: 5 additions & 3 deletions apps/web/src/lib/bot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ declare global {

globalThis.__botTestMentionHandler ??= null;

let mockState: { kind: string } | undefined;

function getMockState() {
return { kind: 'state' };
mockState ??= { kind: 'state' };
return mockState;
}

function getMockSlackAdapter() {
Expand Down Expand Up @@ -188,8 +191,6 @@ const mockedCanKiloUserAccessPlatformIntegration = jest.mocked(
const mockedGetPlatformIntegration = jest.mocked(getPlatformIntegration);
const mockedFindUserById = jest.mocked(findUserById);
const mockedProcessLinkedMessage = jest.mocked(processLinkedMessage);
const mockState = getMockState();

function makeThread() {
return { id: 'thread-1', adapter: { name: 'slack' } };
}
Expand Down Expand Up @@ -259,6 +260,7 @@ describe('bot mention authorization', () => {
message,
platformIntegration: integration,
user,
state: mockState,
});
});
});
Expand Down
8 changes: 7 additions & 1 deletion apps/web/src/lib/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ function createKiloBot(
}

try {
await processLinkedMessage({ thread, message, platformIntegration, user });
await processLinkedMessage({
thread,
message,
platformIntegration,
user,
state: chatBot.getState(),
});
} catch (error) {
console.error('[Bot] Unhandled error in message handler:', error);
await thread.post({ markdown: 'Sorry, something went wrong while processing your message.' });
Expand Down
112 changes: 112 additions & 0 deletions apps/web/src/lib/bot/message-state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import 'server-only';
import * as z from 'zod';
import {
ThreadImpl,
Message,
type SerializedMessage,
type SerializedThread,
type StateAdapter,
type Thread,
} from 'chat';
import { bot } from '@/lib/bot';

const BOT_REQUEST_MESSAGE_STATE_KEY_PREFIX = 'bot-request-message-state:';
const BOT_REQUEST_MESSAGE_STATE_TTL_MS = 24 * 60 * 60 * 1000;

type BotRequestMessageState = {
thread: SerializedThread;
message: SerializedMessage;
};

const serializedThreadShape = z.looseObject({
_type: z.literal('chat:Thread'),
adapterName: z.string(),
channelId: z.string(),
id: z.string(),
isDM: z.boolean(),
});

export const serializedThreadSchema = z.custom<SerializedThread>(
value => serializedThreadShape.safeParse(value).success
);

const serializedMessageShape = z.looseObject({
_type: z.literal('chat:Message'),
attachments: z.array(z.unknown()),
author: z.object({
userId: z.string(),
userName: z.string(),
fullName: z.string(),
isBot: z.union([z.boolean(), z.literal('unknown')]),
isMe: z.boolean(),
}),
formatted: z.unknown(),
id: z.string(),
metadata: z.object({
dateSent: z.iso.datetime(),
edited: z.boolean(),
editedAt: z.iso.datetime().optional(),
}),
raw: z.unknown(),
text: z.string(),
threadId: z.string(),
});

export const serializedMessageSchema = z.custom<SerializedMessage>(
value => serializedMessageShape.safeParse(value).success
);

const botRequestMessageStateSchema = z.object({
thread: serializedThreadSchema,
message: serializedMessageSchema,
});

function botRequestMessageStateKey(botRequestId: string): string {
return `${BOT_REQUEST_MESSAGE_STATE_KEY_PREFIX}${botRequestId}`;
}

export async function storeBotRequestMessageState({
state,
botRequestId,
thread,
message,
}: {
state: StateAdapter;
botRequestId: string;
thread: Thread;
message: Message;
}): Promise<void> {
await state.set<BotRequestMessageState>(
botRequestMessageStateKey(botRequestId),
{
thread: thread.toJSON(),
message: message.toJSON(),
},
BOT_REQUEST_MESSAGE_STATE_TTL_MS
);
}

export async function getBotRequestMessageState(
state: StateAdapter,
botRequestId: string
): Promise<BotRequestMessageState | null> {
const value = await state.get<unknown>(botRequestMessageStateKey(botRequestId));
if (!value) {
return null;
}

return botRequestMessageStateSchema.parse(value);
}

export async function getRehydratedBotRequestMessageState(botRequestId: string) {
const stored = await getBotRequestMessageState(bot.getState(), botRequestId);

if (!stored) {
throw new Error('Could not find message state for botRequest ' + botRequestId);
}

return {
thread: ThreadImpl.fromJSON(stored.thread),
message: Message.fromJSON(stored.message),
};
}
11 changes: 10 additions & 1 deletion apps/web/src/lib/bot/run.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { createBotRequest, updateBotRequest } from '@/lib/bot/request-logging';
import { runBotAgent } from '@/lib/bot/agent-runner';
import { extractAndUploadImages } from '@/lib/bot/images';
import { storeBotRequestMessageState } from '@/lib/bot/message-state';
import type { PlatformIntegration, User } from '@kilocode/db';
import type { Message, Thread } from 'chat';
import type { Message, StateAdapter, Thread } from 'chat';
import { captureException } from '@sentry/nextjs';

export async function processLinkedMessage({
thread,
message,
platformIntegration,
user,
state,
}: {
thread: Thread;
message: Message;
platformIntegration: PlatformIntegration;
user: User;
state: StateAdapter;
}) {
await thread.startTyping('Thinking...');

Expand All @@ -30,6 +33,12 @@ export async function processLinkedMessage({
userMessage: message.text,
modelUsed: undefined,
});
await storeBotRequestMessageState({
Comment thread
RSO marked this conversation as resolved.
state,
botRequestId,
thread,
message,
});
} catch (error) {
captureException(error, {
tags: { component: 'kilo-bot', op: 'create-bot-request' },
Expand Down
Loading