Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 108 additions & 16 deletions src/daemon/host.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { randomUUID } from 'node:crypto';
import fs from 'node:fs/promises';
import net from 'node:net';
import path from 'node:path';
import { loadDaemonConfig, type ServerDefinition } from '../config.js';
import { writeJsonFile } from '../fs-json.js';
import { readJsonFile, withFileLock, writeJsonFile } from '../fs-json.js';
import { isKeepAliveServer } from '../lifecycle.js';
import { createRuntime, type Runtime } from '../runtime.js';
import { collectConfigLayers, statConfigMtime } from './config-layers.js';
Expand Down Expand Up @@ -83,7 +84,6 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
logPath: options.logPath,
});

await prepareSocket(options.socketPath);
await fs.mkdir(path.dirname(options.metadataPath), { recursive: true });
const configMtimeMs = await statConfigMtime(options.configPath);

Expand Down Expand Up @@ -188,29 +188,121 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
});
});

await new Promise<void>((resolve, reject) => {
server.once('error', reject);
server.listen(options.socketPath, () => {
server.off('error', reject);
resolve();
let claimed = false;
await withFileLock(`${options.metadataPath}.bind`, async () => {
const live = await probeLiveDaemon(options.socketPath);
if (live && (await metadataMatches(options.metadataPath, live))) {
return;
}
await prepareSocket(options.socketPath);
await new Promise<void>((resolve, reject) => {
server.once('error', reject);
server.listen(options.socketPath, () => {
server.off('error', reject);
resolve();
});
});
await writeJsonFile(options.metadataPath, {
pid: process.pid,
socketPath: options.socketPath,
configPath: options.configPath,
configLayers,
startedAt: Date.now(),
logPath: options.logPath ?? null,
configMtimeMs,
});
claimed = true;
});

await writeJsonFile(options.metadataPath, {
pid: process.pid,
socketPath: options.socketPath,
configPath: options.configPath,
configLayers,
startedAt: Date.now(),
logPath: options.logPath ?? null,
configMtimeMs,
});
if (!claimed) {
logEvent(logContext, 'Daemon already running for this config; exiting without rebinding.');
server.close();
await runtime.close().catch(() => {});
await disposeLogContext(logContext).catch(() => {});
process.exit(0);
}

process.once('SIGINT', shutdown);
process.once('SIGTERM', shutdown);
process.once('SIGQUIT', shutdown);
}

const DAEMON_PROBE_TIMEOUT_MS = 2_000;

export async function isDaemonResponding(socketPath: string): Promise<boolean> {
return (await probeLiveDaemon(socketPath)) !== null;
}

async function probeLiveDaemon(socketPath: string): Promise<StatusResult | null> {
const status = await probeDaemonStatus(socketPath);
if (!status || status.socketPath !== socketPath || !isProcessAlive(status.pid)) {
return null;
}
return status;
}

export async function metadataMatches(
metadataPath: string,
live: Pick<StatusResult, 'pid' | 'socketPath'>
): Promise<boolean> {
try {
const existing = await readJsonFile<{ pid?: number; socketPath?: string }>(metadataPath);
return existing?.pid === live.pid && existing?.socketPath === live.socketPath;
} catch {
return false;
}
}

function isProcessAlive(pid: number): boolean {
if (!Number.isInteger(pid) || pid <= 0) {
return false;
}
try {
process.kill(pid, 0);
return true;
} catch (error) {
return (error as NodeJS.ErrnoException).code === 'EPERM';
}
}

async function probeDaemonStatus(socketPath: string): Promise<StatusResult | null> {
return await new Promise<StatusResult | null>((resolve) => {
const probe = net.createConnection(socketPath);
let buffer = '';
let settled = false;
const finish = (status: StatusResult | null): void => {
if (settled) {
return;
}
settled = true;
probe.removeAllListeners();
probe.destroy();
resolve(status);
};
const parse = (): StatusResult | null => {
try {
const response = JSON.parse(buffer.trim()) as DaemonResponse<StatusResult>;
return response.ok && response.result ? response.result : null;
} catch {
return null;
}
};
probe.setTimeout(DAEMON_PROBE_TIMEOUT_MS, () => finish(null));
probe.once('connect', () => {
probe.write(JSON.stringify({ id: randomUUID(), method: 'status', params: {} } satisfies DaemonRequest));
});
probe.on('data', (chunk) => {
buffer += chunk.toString();
const status = parse();
if (status) {
finish(status);
}
});
probe.once('end', () => finish(parse()));
probe.once('error', () => finish(null));
});
}

async function prepareSocket(socketPath: string): Promise<void> {
if (process.platform === 'win32') {
return;
Expand Down
110 changes: 108 additions & 2 deletions tests/daemon-host.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { describe, expect, it, vi } from 'vitest';
import { randomUUID } from 'node:crypto';
import fs from 'node:fs/promises';
import net from 'node:net';
import os from 'node:os';
import path from 'node:path';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import type { ServerDefinition } from '../src/config.js';
import { __testProcessRequest } from '../src/daemon/host.js';
import { __testProcessRequest, isDaemonResponding, metadataMatches } from '../src/daemon/host.js';
import type { DaemonRequest } from '../src/daemon/protocol.js';
import type { Runtime } from '../src/runtime.js';

Expand Down Expand Up @@ -111,6 +116,107 @@ describe('daemon host request handling', () => {
});
});

const describeUnixSocket = process.platform === 'win32' ? describe.skip : describe;

describeUnixSocket('isDaemonResponding', () => {
const servers: net.Server[] = [];
const connections: net.Socket[] = [];
const socketPaths: string[] = [];

function socketPath(): string {
const p = path.join(os.tmpdir(), `mcporter-probe-${randomUUID().slice(0, 8)}.sock`);
socketPaths.push(p);
return p;
}

function listen(server: net.Server, p: string): Promise<void> {
servers.push(server);
server.on('connection', (socket) => connections.push(socket));
return new Promise((resolve) => server.listen(p, () => resolve()));
}

afterEach(async () => {
for (const socket of connections.splice(0)) {
socket.destroy();
}
for (const server of servers.splice(0)) {
await new Promise<void>((resolve) => server.close(() => resolve()));
}
for (const p of socketPaths.splice(0)) {
await fs.rm(p, { force: true }).catch(() => {});
}
});

function statusServer(result: Record<string, unknown>): net.Server {
return net.createServer((socket) => {
socket.on('data', () => socket.end(JSON.stringify({ id: '1', ok: true, result })));
});
}

it('returns true when the socket answers status with a matching socket and live pid', async () => {
const p = socketPath();
await listen(statusServer({ pid: process.pid, socketPath: p }), p);
expect(await isDaemonResponding(p)).toBe(true);
});

it('returns false when the socket accepts but never responds (hung daemon)', async () => {
const p = socketPath();
await listen(
net.createServer((socket) => socket.pause()),
p
);
expect(await isDaemonResponding(p)).toBe(false);
}, 5_000);

it('returns false when status reports a different socket (foreign listener)', async () => {
const p = socketPath();
await listen(statusServer({ pid: process.pid, socketPath: '/some/other/daemon.sock' }), p);
expect(await isDaemonResponding(p)).toBe(false);
});

it('returns false when status reports a dead pid', async () => {
const p = socketPath();
await listen(statusServer({ pid: 2_147_483_646, socketPath: p }), p);
expect(await isDaemonResponding(p)).toBe(false);
});

it('returns false when nothing is listening', async () => {
expect(await isDaemonResponding(socketPath())).toBe(false);
});
});

describe('metadataMatches', () => {
let metadataPath: string;
const live = { pid: 4321, socketPath: '/tmp/daemon.sock' };

beforeEach(async () => {
metadataPath = path.join(os.tmpdir(), `mcporter-meta-${randomUUID().slice(0, 8)}.json`);
});

afterEach(async () => {
await fs.rm(metadataPath, { force: true }).catch(() => {});
});

it('matches when pid and socket agree', async () => {
await fs.writeFile(metadataPath, JSON.stringify({ pid: 4321, socketPath: '/tmp/daemon.sock' }), 'utf8');
expect(await metadataMatches(metadataPath, live)).toBe(true);
});

it('does not match a different pid', async () => {
await fs.writeFile(metadataPath, JSON.stringify({ pid: 9999, socketPath: '/tmp/daemon.sock' }), 'utf8');
expect(await metadataMatches(metadataPath, live)).toBe(false);
});

it('does not match when metadata is missing', async () => {
expect(await metadataMatches(metadataPath, live)).toBe(false);
});

it('does not match when metadata is corrupt', async () => {
await fs.writeFile(metadataPath, '{ not json', 'utf8');
expect(await metadataMatches(metadataPath, live)).toBe(false);
});
});

function createRuntimeDouble(): Pick<Runtime, 'callTool' | 'listTools'> {
return {
callTool: vi.fn().mockResolvedValue({ ok: true }),
Expand Down
112 changes: 111 additions & 1 deletion tests/daemon.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { execFile } from 'node:child_process';
import { type ChildProcess, execFile, spawn } from 'node:child_process';
import fs from 'node:fs/promises';
import { createRequire } from 'node:module';
import os from 'node:os';
Expand Down Expand Up @@ -173,4 +173,114 @@ await new Promise((resolve) => {
await fs.rm(tempDir, { recursive: true, force: true }).catch(() => {});
}
}, 40_000);

it('refuses duplicate binds when foreground starts race outside the client lock', async () => {
await ensureDistBuilt();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'mcporter-daemon-bind-'));
const scriptPath = path.join(tempDir, 'bind-server.mjs');
const configPath = path.join(tempDir, 'mcporter.bind.json');

const serverSource = `import { McpServer } from '${MCP_SERVER_MODULE}';
import { StdioServerTransport } from '${STDIO_SERVER_MODULE}';
const server = new McpServer({ name: 'bind-e2e', version: '1.0.0' });
server.registerTool('ping', { title: 'ping', description: 'ping', inputSchema: {} }, async () => ({
content: [{ type: 'text', text: 'pong' }],
}));
await server.connect(new StdioServerTransport());
await new Promise(() => {});
`;
await fs.writeFile(scriptPath, serverSource, 'utf8');
await fs.writeFile(
configPath,
JSON.stringify({
mcpServers: {
'bind-e2e': { description: 'bind race server', command: 'node', args: [scriptPath], lifecycle: 'keep-alive' },
},
}),
'utf8'
);

const children: ChildProcess[] = [];
try {
await runCli(['daemon', 'stop'], configPath).catch(() => {});
for (let i = 0; i < 4; i++) {
children.push(
spawn(process.execPath, [CLI_ENTRY, '--config', configPath, 'daemon', 'start', '--foreground'], {
env: { ...process.env, MCPORTER_NO_FORCE_EXIT: '1' },
stdio: 'ignore',
})
);
}
await new Promise((resolve) => setTimeout(resolve, 4_000));
const alive = children.filter((child) => child.exitCode === null && child.signalCode === null);
expect(alive).toHaveLength(1);
} finally {
for (const child of children) {
child.kill('SIGKILL');
}
await runCli(['daemon', 'stop'], configPath).catch(() => {});
await fs.rm(tempDir, { recursive: true, force: true }).catch(() => {});
}
}, 40_000);

it('rebinds when a live daemon owns the socket but metadata is missing', async () => {
await ensureDistBuilt();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'mcporter-daemon-meta-'));
const scriptPath = path.join(tempDir, 'meta-server.mjs');
const configPath = path.join(tempDir, 'mcporter.meta.json');
const metadataPath = path.join(tempDir, 'daemon.json');

const serverSource = `import { McpServer } from '${MCP_SERVER_MODULE}';
import { StdioServerTransport } from '${STDIO_SERVER_MODULE}';
const server = new McpServer({ name: 'meta-e2e', version: '1.0.0' });
server.registerTool('ping', { title: 'ping', description: 'ping', inputSchema: {} }, async () => ({
content: [{ type: 'text', text: 'pong' }],
}));
await server.connect(new StdioServerTransport());
await new Promise(() => {});
`;
await fs.writeFile(scriptPath, serverSource, 'utf8');
await fs.writeFile(
configPath,
JSON.stringify({
mcpServers: {
'meta-e2e': { description: 'meta server', command: 'node', args: [scriptPath], lifecycle: 'keep-alive' },
},
}),
'utf8'
);

// Pin only the metadata path (not the socket, to stay under the unix socket length limit).
const env = { ...process.env, MCPORTER_NO_FORCE_EXIT: '1', MCPORTER_DAEMON_METADATA: metadataPath };
const children: ChildProcess[] = [];
const startForeground = (): ChildProcess => {
const child = spawn(process.execPath, [CLI_ENTRY, '--config', configPath, 'daemon', 'start', '--foreground'], {
env,
stdio: 'ignore',
});
children.push(child);
return child;
};

try {
const first = startForeground();
const firstPid = JSON.parse(await readFileWithRetries(metadataPath, 50)).pid as number;
expect(firstPid).toBe(first.pid);

await fs.rm(metadataPath, { force: true });

const replacement = startForeground();
await new Promise((resolve) => setTimeout(resolve, 5_000));

const ownerPid = JSON.parse(await readFileWithRetries(metadataPath, 50)).pid as number;
expect(ownerPid).toBe(replacement.pid);
expect(replacement.exitCode).toBeNull();
} finally {
for (const child of children) {
child.kill('SIGKILL');
}
await runCli(['daemon', 'stop'], configPath, { MCPORTER_DAEMON_METADATA: metadataPath }).catch(() => {});
await fs.rm(tempDir, { recursive: true, force: true }).catch(() => {});
}
}, 40_000);
});
Loading