Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions services/gastown/container/src/agent-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,19 @@ async function createMayorWorkspace(rigId: string): Promise<string> {
return createLightweightWorkspace('mayor', rigId);
}

/**
* Ensure the mayor workdir exists on disk for a given town, creating
* a lightweight git-initialized workspace if needed.
*
* Used by `prewarmMayorSDK`, which runs before `runAgent` and so cannot
* rely on `createMayorWorkspace` having been called yet — without this,
* `ensureSDKServer` would throw `ENOENT` from `process.chdir(workdir)`
* and the prewarm benefit would never materialize on cold containers.
*/
export async function ensureMayorWorkspaceForTown(townId: string): Promise<string> {
return createMayorWorkspace(`mayor-${townId}`);
}

/**
* Write the mayor's system prompt to AGENTS.md in the workspace.
*
Expand Down
33 changes: 32 additions & 1 deletion services/gastown/container/src/control-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
registerEventSink,
refreshTokenForAllAgents,
listAgents,
awaitHydration,
} from './process-manager';
import { log } from './logger';
import { startHeartbeat, stopHeartbeat, notifyContainerReady } from './heartbeat';
Expand Down Expand Up @@ -262,7 +263,19 @@ app.post('/refresh-token', async c => {
if (!body || typeof body !== 'object' || !('token' in body) || typeof body.token !== 'string') {
return c.json({ error: 'Missing or invalid token field' }, 400);
}
process.env.GASTOWN_CONTAINER_TOKEN = body.token;
// Capture the new token into a local so it survives the await below.
const newToken = body.token;

// Wait for boot hydration to release the global sdkServerLock before
// we mutate process.env or serialise N agent restarts through it.
// Without this gate, a mid-hydration token refresh can cause
// buildPrewarmEnv to pick up a different token than the one hydration
// captured locally — matching the PATCH /agents/:id/model handler
// which also gates first.
await awaitHydration();

// Now safe to assign: hydration is done, no concurrent env readers.
process.env.GASTOWN_CONTAINER_TOKEN = newToken;

const activeAgents = listAgents().filter(a => a.status === 'running' || a.status === 'starting');
log.info('refresh_token.received', {
Expand Down Expand Up @@ -312,6 +325,15 @@ app.post('/agents/start', async c => {
return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400);
}

// Wait for boot hydration to release the global sdkServerLock. The
// control server starts accepting requests immediately at boot, before
// bootHydration finishes resuming registry agents and prewarming the
// mayor — without this gate, fresh dispatches queue behind every
// serialised SDK spawn and the DO-side AbortSignal.timeout(60s) fires
// before they ever get the lock, surfacing as the
// "startAgentInContainer EXCEPTION TimeoutError" pattern.
await awaitHydration();

// Persist the organization ID as a standalone env var so it survives
// config rebuilds (e.g. model hot-swap). The env var is the primary
// source of truth; KILO_CONFIG_CONTENT extraction is the fallback.
Expand Down Expand Up @@ -386,6 +408,15 @@ app.patch('/agents/:agentId/model', async c => {
return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400);
}

// Model hot-swap restarts the SDK server (see updateAgentModel) and
// contends for the same global sdkServerLock that boot hydration is
// holding. Wait for hydration to drain BEFORE the env mutations
// below: concurrent PATCH requests landing during hydration would
// otherwise race on process.env writes before any of them holds the
// SDK lock, and the env visible to the eventual `kilo serve` spawn
// would be non-deterministic.
await awaitHydration();

// Update org billing context from the request body if provided.
if (parsed.data.organizationId) {
process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId;
Expand Down
165 changes: 163 additions & 2 deletions services/gastown/container/src/process-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@ import { describe, it, expect, vi } from 'vitest';
vi.mock('@kilocode/sdk', () => ({
createKilo: vi.fn(),
}));
// Mock workspace helpers to return a path that actually exists on the
// test runner so ensureSDKServer's process.chdir doesn't ENOENT.
const TEST_WORKSPACE = process.cwd();
vi.mock('./agent-runner', () => ({
runAgent: vi.fn(),
buildKiloConfigContent: vi.fn(),
buildKiloConfigContent: vi.fn(
(kilocodeToken: string, model: string, smallModel: string, organizationId?: string) =>
JSON.stringify({ kilocodeToken, model, smallModel, organizationId })
),
resolveGitCredentials: vi.fn(),
writeMayorSystemPromptToAgentsMd: vi.fn(),
ensureMayorWorkspaceForTown: vi.fn(async (_townId: string) => TEST_WORKSPACE),
mayorWorkdirForTown: vi.fn((_townId: string) => TEST_WORKSPACE),
}));
vi.mock('./control-server', () => ({
getCurrentTownConfig: vi.fn(() => ({})),
Expand All @@ -24,7 +32,8 @@ vi.mock('./token-refresh', () => ({
refreshTokenIfNearExpiry: vi.fn(),
}));

const { applyModelToSession, withStartAgentLock } = await import('./process-manager');
const { applyModelToSession, withStartAgentLock, awaitHydration, bootHydration } =
await import('./process-manager');

type PromptCall = {
path: { id: string };
Expand Down Expand Up @@ -178,3 +187,155 @@ describe('withStartAgentLock', () => {
expect(result).toBe('ok');
});
});

describe('awaitHydration', () => {
it('resolves immediately before any bootHydration call', async () => {
// Module-init state must not block /agents/start in test/dev contexts
// where bootHydration never runs.
let resolved = false;
void awaitHydration().then(() => {
resolved = true;
});
await new Promise(r => setTimeout(r, 0));
expect(resolved).toBe(true);
});

it('prewarms mayor SDK with env that mirrors buildAgentEnv (mayor tools require GASTOWN_AGENT_ROLE/AGENT_ID/TOWN_ID)', async () => {
// Without these env vars in the snapshot kilo serve takes at spawn,
// GastownPlugin (plugin/index.ts) treats the prewarmed mayor as a
// rig agent (or fails the createMayorClientFromEnv guard) and the
// server boots with NO mayor tools. ensureSDKServer's cache hit on
// the next /agents/start hands back that defective server.
const { createKilo } = (await import('@kilocode/sdk')) as unknown as {
createKilo: ReturnType<typeof vi.fn>;
};

const prev = {
apiUrl: process.env.GASTOWN_API_URL,
townId: process.env.GASTOWN_TOWN_ID,
token: process.env.GASTOWN_CONTAINER_TOKEN,
};
process.env.GASTOWN_API_URL = 'http://test.invalid';
process.env.GASTOWN_TOWN_ID = 'town-prewarm';
process.env.GASTOWN_CONTAINER_TOKEN = 'tok-prewarm';

let capturedEnv: Record<string, string | undefined> | null = null;
createKilo.mockImplementationOnce(() => {
// Snapshot the keys plugin/index.ts and plugin/client.ts read.
capturedEnv = {
GASTOWN_AGENT_ID: process.env.GASTOWN_AGENT_ID,
GASTOWN_AGENT_ROLE: process.env.GASTOWN_AGENT_ROLE,
GASTOWN_TOWN_ID: process.env.GASTOWN_TOWN_ID,
GASTOWN_API_URL: process.env.GASTOWN_API_URL,
GASTOWN_CONTAINER_TOKEN: process.env.GASTOWN_CONTAINER_TOKEN,
KILO_CONFIG_CONTENT: process.env.KILO_CONFIG_CONTENT,
};
return Promise.resolve({
client: {} as unknown,
server: { url: 'http://127.0.0.1:9999/', close: () => {} },
});
});

const originalFetch = globalThis.fetch;
globalThis.fetch = vi.fn(async (input: Parameters<typeof fetch>[0]) => {
const url = typeof input === 'string' ? input : input instanceof URL ? input.href : input.url;
if (url.includes('/container-registry')) {
return new Response(JSON.stringify({ data: [] }), { status: 200 });
}
if (url.includes('/mayor-id')) {
return new Response(
JSON.stringify({
success: true,
agentId: 'mayor-agent-1',
model: 'anthropic/claude-sonnet-4.6',
smallModel: 'anthropic/claude-haiku-4.5',
kilocodeToken: 'kc-tok',
organizationId: null,
}),
{ status: 200 }
);
}
// db-snapshot etc: 404 -> fresh start
return new Response('not found', { status: 404 });
}) as unknown as typeof fetch;

try {
await bootHydration();
const env = capturedEnv as Record<string, string | undefined> | null;
expect(env).not.toBeNull();
expect(env).toMatchObject({
GASTOWN_AGENT_ID: 'mayor-agent-1',
GASTOWN_AGENT_ROLE: 'mayor',
GASTOWN_TOWN_ID: 'town-prewarm',
GASTOWN_CONTAINER_TOKEN: 'tok-prewarm',
});
expect(env?.KILO_CONFIG_CONTENT).toBeTruthy();
} finally {
globalThis.fetch = originalFetch;
if (prev.apiUrl !== undefined) process.env.GASTOWN_API_URL = prev.apiUrl;
else delete process.env.GASTOWN_API_URL;
if (prev.townId !== undefined) process.env.GASTOWN_TOWN_ID = prev.townId;
else delete process.env.GASTOWN_TOWN_ID;
if (prev.token !== undefined) process.env.GASTOWN_CONTAINER_TOKEN = prev.token;
else delete process.env.GASTOWN_CONTAINER_TOKEN;
}
});

it('blocks awaiters while bootHydration is in flight and releases them when it returns', async () => {
// Drive bootHydration into its registry-fetch path with a fetch
// stub that we can hold open from the test, so we can observe a
// real "in flight" window for the gate.
const prev = {
apiUrl: process.env.GASTOWN_API_URL,
townId: process.env.GASTOWN_TOWN_ID,
token: process.env.GASTOWN_CONTAINER_TOKEN,
};
process.env.GASTOWN_API_URL = 'http://test.invalid';
process.env.GASTOWN_TOWN_ID = 'town-test';
process.env.GASTOWN_CONTAINER_TOKEN = 'tok-test';

// Use a single barrier the fetch stub awaits so every call (registry
// fetch + prewarm endpoints) holds the gate until we release it,
// and each call gets its own Response (avoids "body already read").
let releaseFetch!: () => void;
const fetchBarrier = new Promise<void>(resolve => {
releaseFetch = resolve;
});
const originalFetch = globalThis.fetch;
globalThis.fetch = vi.fn(async (input: Parameters<typeof fetch>[0]) => {
await fetchBarrier;
const url = typeof input === 'string' ? input : input instanceof URL ? input.href : input.url;
if (url.includes('/container-registry')) {
return new Response(JSON.stringify({ data: [] }), { status: 200 });
}
return new Response(JSON.stringify({ success: true, agentId: null }), { status: 200 });
}) as unknown as typeof fetch;

try {
const hydrationPromise = bootHydration();
let awaiterResolved = false;
void awaitHydration().then(() => {
awaiterResolved = true;
});

// Yield to let the registry fetch start. The gate is now held
// until the fetch resolves.
await new Promise(r => setTimeout(r, 10));
expect(awaiterResolved).toBe(false);

releaseFetch();
await hydrationPromise;
// After bootHydration returns, the gate must release any awaiters.
await new Promise(r => setTimeout(r, 0));
expect(awaiterResolved).toBe(true);
} finally {
globalThis.fetch = originalFetch;
if (prev.apiUrl !== undefined) process.env.GASTOWN_API_URL = prev.apiUrl;
else delete process.env.GASTOWN_API_URL;
if (prev.townId !== undefined) process.env.GASTOWN_TOWN_ID = prev.townId;
else delete process.env.GASTOWN_TOWN_ID;
if (prev.token !== undefined) process.env.GASTOWN_CONTAINER_TOKEN = prev.token;
else delete process.env.GASTOWN_CONTAINER_TOKEN;
}
});
});
Loading