diff --git a/src/daemon/host.ts b/src/daemon/host.ts
index c59a6f25..60c40670 100644
--- a/src/daemon/host.ts
+++ b/src/daemon/host.ts
@@ -1,8 +1,9 @@
+import { randomUUID } from 'node:crypto';
 import fs from 'node:fs/promises';
 import net from 'node:net';
 import path from 'node:path';
 import { loadDaemonConfig, type ServerDefinition } from '../config.js';
-import { writeJsonFile } from '../fs-json.js';
+import { readJsonFile, withFileLock, writeJsonFile } from '../fs-json.js';
 import { isKeepAliveServer } from '../lifecycle.js';
 import { createRuntime, type Runtime } from '../runtime.js';
 import { collectConfigLayers, statConfigMtime } from './config-layers.js';
@@ -83,7 +84,6 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
     logPath: options.logPath,
   });
 
-  await prepareSocket(options.socketPath);
   await fs.mkdir(path.dirname(options.metadataPath), { recursive: true });
 
   const configMtimeMs = await statConfigMtime(options.configPath);
@@ -188,29 +188,130 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
     });
   });
 
-  await new Promise<void>((resolve, reject) => {
-    server.once('error', reject);
-    server.listen(options.socketPath, () => {
-      server.off('error', reject);
-      resolve();
+  // Probe, bind and metadata write all run inside the `.bind` file lock; `claimed` records whether this process took the socket.
+  let claimed = false;
+  await withFileLock(`${options.metadataPath}.bind`, async () => {
+    const live = await probeLiveDaemon(options.socketPath);
+    if (live && (await metadataMatches(options.metadataPath, live))) {
+      // A responsive daemon already owns both the socket and the metadata file; leave it alone.
+      return;
+    }
+    await prepareSocket(options.socketPath);
+    await new Promise<void>((resolve, reject) => {
+      server.once('error', reject);
+      server.listen(options.socketPath, () => {
+        server.off('error', reject);
+        resolve();
+      });
     });
+    await writeJsonFile(options.metadataPath, {
+      pid: process.pid,
+      socketPath: options.socketPath,
+      configPath: options.configPath,
+      configLayers,
+      startedAt: Date.now(),
+      logPath: options.logPath ?? null,
+      configMtimeMs,
+    });
+    claimed = true;
   });
 
-  await writeJsonFile(options.metadataPath, {
-    pid: process.pid,
-    socketPath: options.socketPath,
-    configPath: options.configPath,
-    configLayers,
-    startedAt: Date.now(),
-    logPath: options.logPath ?? null,
-    configMtimeMs,
-  });
+  if (!claimed) {
+    logEvent(logContext, 'Daemon already running for this config; exiting without rebinding.');
+    server.close();
+    await runtime.close().catch(() => {});
+    await disposeLogContext(logContext).catch(() => {});
+    process.exit(0);
+  }
 
   process.once('SIGINT', shutdown);
   process.once('SIGTERM', shutdown);
   process.once('SIGQUIT', shutdown);
 }
 
+const DAEMON_PROBE_TIMEOUT_MS = 2_000;
+
+/** Reports whether a daemon answers a `status` request on `socketPath`, names that same socket, and reports a live pid. */
+export async function isDaemonResponding(socketPath: string): Promise<boolean> {
+  return (await probeLiveDaemon(socketPath)) !== null;
+}
+
+/** Returns the probed status only when it names `socketPath` and its pid passes `isProcessAlive`; otherwise null. */
+async function probeLiveDaemon(socketPath: string): Promise<StatusResult | null> {
+  const status = await probeDaemonStatus(socketPath);
+  if (!status || status.socketPath !== socketPath || !isProcessAlive(status.pid)) {
+    return null;
+  }
+  return status;
+}
+
+/** Checks whether the JSON at `metadataPath` records the same pid and socket as `live`; a read failure counts as a mismatch. */
+export async function metadataMatches(
+  metadataPath: string,
+  live: Pick<StatusResult, 'pid' | 'socketPath'>
+): Promise<boolean> {
+  try {
+    const existing = await readJsonFile<{ pid?: number; socketPath?: string }>(metadataPath);
+    return existing?.pid === live.pid && existing?.socketPath === live.socketPath;
+  } catch {
+    return false;
+  }
+}
+
+/** Signal-0 liveness check: EPERM is treated as alive, any other `process.kill` failure as dead. */
+function isProcessAlive(pid: number): boolean {
+  if (!Number.isInteger(pid) || pid <= 0) {
+    return false;
+  }
+  try {
+    process.kill(pid, 0);
+    return true;
+  } catch (error) {
+    return (error as NodeJS.ErrnoException).code === 'EPERM';
+  }
+}
+
+/** Connects to `socketPath`, sends one `status` request, and resolves with the parsed result, or null on error, idle timeout, or an unusable reply. */
+async function probeDaemonStatus(socketPath: string): Promise<StatusResult | null> {
+  return await new Promise<StatusResult | null>((resolve) => {
+    const probe = net.createConnection(socketPath);
+    // Decode as UTF-8 at the stream level so a multi-byte character split across chunks is not corrupted.
+    probe.setEncoding('utf8');
+    let buffer = '';
+    let settled = false;
+    const finish = (status: StatusResult | null): void => {
+      if (settled) {
+        return;
+      }
+      settled = true;
+      probe.removeAllListeners();
+      probe.destroy();
+      resolve(status);
+    };
+    const parse = (): StatusResult | null => {
+      try {
+        const response = JSON.parse(buffer.trim()) as DaemonResponse;
+        return response.ok && response.result ? response.result : null;
+      } catch {
+        return null;
+      }
+    };
+    probe.setTimeout(DAEMON_PROBE_TIMEOUT_MS, () => finish(null));
+    probe.once('connect', () => {
+      probe.write(JSON.stringify({ id: randomUUID(), method: 'status', params: {} } satisfies DaemonRequest));
+    });
+    probe.on('data', (chunk) => {
+      buffer += chunk.toString();
+      const status = parse();
+      if (status) {
+        finish(status);
+      }
+    });
+    probe.once('end', () => finish(parse()));
+    probe.once('error', () => finish(null));
+  });
+}
+
 async function prepareSocket(socketPath: string): Promise<void> {
   if (process.platform === 'win32') {
     return;
diff --git a/tests/daemon-host.test.ts b/tests/daemon-host.test.ts
index f1dd4634..0a485ef8 100644
--- a/tests/daemon-host.test.ts
+++ b/tests/daemon-host.test.ts
@@ -1,6 +1,11 @@
-import { describe, expect, it, vi } from 'vitest';
+import { randomUUID } from 'node:crypto';
+import fs from 'node:fs/promises';
+import net from 'node:net';
+import os from 'node:os';
+import path from 'node:path';
+import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
 import type { ServerDefinition } from '../src/config.js';
-import { __testProcessRequest } from '../src/daemon/host.js';
+import { __testProcessRequest, isDaemonResponding, metadataMatches } from '../src/daemon/host.js';
 import type { DaemonRequest } from '../src/daemon/protocol.js';
 import type { Runtime } from '../src/runtime.js';
 
@@ -111,6 +116,116 @@ describe('daemon host request handling', () => {
   });
 });
 
+// The socket probe suite is skipped on Windows.
+const describeUnixSocket = process.platform === 'win32' ? describe.skip : describe;
+
+describeUnixSocket('isDaemonResponding', () => {
+  const servers: net.Server[] = [];
+  const connections: net.Socket[] = [];
+  const socketPaths: string[] = [];
+
+  function socketPath(): string {
+    const p = path.join(os.tmpdir(), `mcporter-probe-${randomUUID().slice(0, 8)}.sock`);
+    socketPaths.push(p);
+    return p;
+  }
+
+  function listen(server: net.Server, p: string): Promise<void> {
+    servers.push(server);
+    server.on('connection', (socket) => connections.push(socket));
+    return new Promise<void>((resolve, reject) => {
+      // Surface a listen failure as a rejected promise instead of a test that hangs until timeout.
+      server.once('error', reject);
+      server.listen(p, () => {
+        server.off('error', reject);
+        resolve();
+      });
+    });
+  }
+
+  afterEach(async () => {
+    for (const socket of connections.splice(0)) {
+      socket.destroy();
+    }
+    for (const server of servers.splice(0)) {
+      await new Promise<void>((resolve) => server.close(() => resolve()));
+    }
+    for (const p of socketPaths.splice(0)) {
+      await fs.rm(p, { force: true }).catch(() => {});
+    }
+  });
+
+  // Fake daemon: replies to any incoming data with an ok response carrying `result`, then ends the socket.
+  function statusServer(result: Record<string, unknown>): net.Server {
+    return net.createServer((socket) => {
+      socket.on('data', () => socket.end(JSON.stringify({ id: '1', ok: true, result })));
+    });
+  }
+
+  it('returns true when the socket answers status with a matching socket and live pid', async () => {
+    const p = socketPath();
+    await listen(statusServer({ pid: process.pid, socketPath: p }), p);
+    expect(await isDaemonResponding(p)).toBe(true);
+  });
+
+  it('returns false when the socket accepts but never responds (hung daemon)', async () => {
+    const p = socketPath();
+    await listen(
+      net.createServer((socket) => socket.pause()),
+      p
+    );
+    expect(await isDaemonResponding(p)).toBe(false);
+  }, 5_000);
+
+  it('returns false when status reports a different socket (foreign listener)', async () => {
+    const p = socketPath();
+    await listen(statusServer({ pid: process.pid, socketPath: '/some/other/daemon.sock' }), p);
+    expect(await isDaemonResponding(p)).toBe(false);
+  });
+
+  it('returns false when status reports a dead pid', async () => {
+    const p = socketPath();
+    await listen(statusServer({ pid: 2_147_483_646, socketPath: p }), p);
+    expect(await isDaemonResponding(p)).toBe(false);
+  });
+
+  it('returns false when nothing is listening', async () => {
+    expect(await isDaemonResponding(socketPath())).toBe(false);
+  });
+});
+
+describe('metadataMatches', () => {
+  let metadataPath: string;
+  const live = { pid: 4321, socketPath: '/tmp/daemon.sock' };
+
+  beforeEach(() => {
+    metadataPath = path.join(os.tmpdir(), `mcporter-meta-${randomUUID().slice(0, 8)}.json`);
+  });
+
+  afterEach(async () => {
+    await fs.rm(metadataPath, { force: true }).catch(() => {});
+  });
+
+  it('matches when pid and socket agree', async () => {
+    await fs.writeFile(metadataPath, JSON.stringify({ pid: 4321, socketPath: '/tmp/daemon.sock' }), 'utf8');
+    expect(await metadataMatches(metadataPath, live)).toBe(true);
+  });
+
+  it('does not match a different pid', async () => {
+    await fs.writeFile(metadataPath, JSON.stringify({ pid: 9999, socketPath: '/tmp/daemon.sock' }), 'utf8');
+    expect(await metadataMatches(metadataPath, live)).toBe(false);
+  });
+
+  it('does not match when metadata is missing', async () => {
+    expect(await metadataMatches(metadataPath, live)).toBe(false);
+  });
+
+  it('does not match when metadata is corrupt', async () => {
+    await fs.writeFile(metadataPath, '{ not json', 'utf8');
+    expect(await metadataMatches(metadataPath, live)).toBe(false);
+  });
+});
+
 function createRuntimeDouble(): Pick {
   return {
     callTool: vi.fn().mockResolvedValue({ ok: true }),
diff --git a/tests/daemon.integration.test.ts b/tests/daemon.integration.test.ts
index 8293bca3..a8bdcece 100644
--- a/tests/daemon.integration.test.ts
+++ b/tests/daemon.integration.test.ts
@@ -1,4 +1,4 @@
-import { execFile } from 'node:child_process';
+import { type ChildProcess, execFile, spawn } from 'node:child_process';
 import fs from 'node:fs/promises';
 import { createRequire } from 'node:module';
 import os from 'node:os';
@@ -173,4 +173,116 @@ await new Promise((resolve) => {
       await fs.rm(tempDir, { recursive: true, force: true }).catch(() => {});
     }
   }, 40_000);
+
+  it('refuses duplicate binds when foreground starts race outside the client lock', async () => {
+    await ensureDistBuilt();
+    const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'mcporter-daemon-bind-'));
+    const scriptPath = path.join(tempDir, 'bind-server.mjs');
+    const configPath = path.join(tempDir, 'mcporter.bind.json');
+
+    const serverSource = `import { McpServer } from '${MCP_SERVER_MODULE}';
+import { StdioServerTransport } from '${STDIO_SERVER_MODULE}';
+const server = new McpServer({ name: 'bind-e2e', version: '1.0.0' });
+server.registerTool('ping', { title: 'ping', description: 'ping', inputSchema: {} }, async () => ({
+  content: [{ type: 'text', text: 'pong' }],
+}));
+await server.connect(new StdioServerTransport());
+await new Promise(() => {});
+`;
+    await fs.writeFile(scriptPath, serverSource, 'utf8');
+    await fs.writeFile(
+      configPath,
+      JSON.stringify({
+        mcpServers: {
+          'bind-e2e': { description: 'bind race server', command: 'node', args: [scriptPath], lifecycle: 'keep-alive' },
+        },
+      }),
+      'utf8'
+    );
+
+    const children: ChildProcess[] = [];
+    try {
+      await runCli(['daemon', 'stop'], configPath).catch(() => {});
+      // Launch four foreground daemons back to back; exactly one is expected to still be running afterwards.
+      for (let i = 0; i < 4; i++) {
+        children.push(
+          spawn(process.execPath, [CLI_ENTRY, '--config', configPath, 'daemon', 'start', '--foreground'], {
+            env: { ...process.env, MCPORTER_NO_FORCE_EXIT: '1' },
+            stdio: 'ignore',
+          })
+        );
+      }
+      await new Promise((resolve) => setTimeout(resolve, 4_000));
+      const alive = children.filter((child) => child.exitCode === null && child.signalCode === null);
+      expect(alive).toHaveLength(1);
+    } finally {
+      for (const child of children) {
+        child.kill('SIGKILL');
+      }
+      await runCli(['daemon', 'stop'], configPath).catch(() => {});
+      await fs.rm(tempDir, { recursive: true, force: true }).catch(() => {});
+    }
+  }, 40_000);
+
+  it('rebinds when a live daemon owns the socket but metadata is missing', async () => {
+    await ensureDistBuilt();
+    const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'mcporter-daemon-meta-'));
+    const scriptPath = path.join(tempDir, 'meta-server.mjs');
+    const configPath = path.join(tempDir, 'mcporter.meta.json');
+    const metadataPath = path.join(tempDir, 'daemon.json');
+
+    const serverSource = `import { McpServer } from '${MCP_SERVER_MODULE}';
+import { StdioServerTransport } from '${STDIO_SERVER_MODULE}';
+const server = new McpServer({ name: 'meta-e2e', version: '1.0.0' });
+server.registerTool('ping', { title: 'ping', description: 'ping', inputSchema: {} }, async () => ({
+  content: [{ type: 'text', text: 'pong' }],
+}));
+await server.connect(new StdioServerTransport());
+await new Promise(() => {});
+`;
+    await fs.writeFile(scriptPath, serverSource, 'utf8');
+    await fs.writeFile(
+      configPath,
+      JSON.stringify({
+        mcpServers: {
+          'meta-e2e': { description: 'meta server', command: 'node', args: [scriptPath], lifecycle: 'keep-alive' },
+        },
+      }),
+      'utf8'
+    );
+
+    // Pin only the metadata path (not the socket, to stay under the unix socket length limit).
+    const env = { ...process.env, MCPORTER_NO_FORCE_EXIT: '1', MCPORTER_DAEMON_METADATA: metadataPath };
+    const children: ChildProcess[] = [];
+    const startForeground = (): ChildProcess => {
+      const child = spawn(process.execPath, [CLI_ENTRY, '--config', configPath, 'daemon', 'start', '--foreground'], {
+        env,
+        stdio: 'ignore',
+      });
+      children.push(child);
+      return child;
+    };
+
+    try {
+      const first = startForeground();
+      const firstPid = JSON.parse(await readFileWithRetries(metadataPath, 50)).pid as number;
+      expect(firstPid).toBe(first.pid);
+
+      // Remove the metadata while the first daemon is still running, then start a second daemon.
+      await fs.rm(metadataPath, { force: true });
+
+      const replacement = startForeground();
+      await new Promise((resolve) => setTimeout(resolve, 5_000));
+
+      const ownerPid = JSON.parse(await readFileWithRetries(metadataPath, 50)).pid as number;
+      expect(ownerPid).toBe(replacement.pid);
+      expect(replacement.exitCode).toBeNull();
+    } finally {
+      for (const child of children) {
+        child.kill('SIGKILL');
+      }
+      await runCli(['daemon', 'stop'], configPath, { MCPORTER_DAEMON_METADATA: metadataPath }).catch(() => {});
+      await fs.rm(tempDir, { recursive: true, force: true }).catch(() => {});
+    }
+  }, 40_000);
 });