Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion yarn-project/aztec/src/cli/aztec_start_action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { ChainConfig } from '@aztec/stdlib/config';
import { AztecNodeAdminApiSchema, AztecNodeApiSchema, AztecNodeDebugApiSchema } from '@aztec/stdlib/interfaces/client';
import { getPackageVersion } from '@aztec/stdlib/update-checker';
import { getVersioningMiddleware } from '@aztec/stdlib/versioning';
import { getOtelJsonRpcPropagationMiddleware } from '@aztec/telemetry-client';
import { getOtelJsonRpcDiagnosticsMiddleware, getOtelJsonRpcPropagationMiddleware } from '@aztec/telemetry-client';

import { createLocalNetwork } from '../local-network/index.js';
import { github, splash } from '../splash.js';
Expand Down Expand Up @@ -95,6 +95,7 @@ export async function aztecStart(options: any, userLog: LogFn, debugLogger: Logg
// Start the main JSON-RPC server
if (Object.entries(services).length > 0) {
const rpcServer = createNamespacedSafeJsonRpcServer(services, {
diagnostic: getOtelJsonRpcDiagnosticsMiddleware(),
http200OnError: false,
log: debugLogger,
middlewares: [getOtelJsonRpcPropagationMiddleware(), getVersioningMiddleware(versions, versioningOpts)],
Expand Down Expand Up @@ -126,6 +127,7 @@ export async function aztecStart(options: any, userLog: LogFn, debugLogger: Logg
}

const rpcServer = createNamespacedSafeJsonRpcServer(adminServices, {
diagnostic: getOtelJsonRpcDiagnosticsMiddleware(),
http200OnError: false,
log: debugLogger,
middlewares: adminMiddlewares,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,64 @@ describe('SafeJsonRpcServer', () => {
expectError(response, 400, 'Test state failed');
});

it('runs diagnostics around the dispatched RPC function', async () => {
const calls: string[] = [];
server = createSafeJsonRpcServer<TestStateApi>(testState, TestStateSchema, {
diagnostic: async (ctx, next) => {
calls.push(`start:${ctx.method}:${ctx.id}:${ctx.headers['x-test-header']}`);
await next();
calls.push(`end:${ctx.method}`);
},
});

const response = await request(server.getApp().callback())
.post('/')
.send({ jsonrpc: '2.0', method: 'count', params: [], id: 42 })
.set({ 'content-type': 'application/json', 'x-test-header': 'test-value' });

expect(response.status).toBe(200);
expect(response.text).toEqual(JSON.stringify({ jsonrpc, id: 42, result: 2 }));
expect(calls).toEqual(['start:count:42:test-value', 'end:count']);
});

it('runs diagnostics for each request in a batch', async () => {
const methods: string[] = [];
server = createSafeJsonRpcServer<TestStateApi>(testState, TestStateSchema, {
diagnostic: async (ctx, next) => {
methods.push(ctx.method);
await next();
},
maxBatchSize: 10,
});

const response = await sendBatch(
{ jsonrpc: '2.0', method: 'getStatus', params: [], id: 42 },
{ jsonrpc: '2.0', method: 'clear', params: [], id: 43 },
);

expect(response.status).toBe(200);
expect(methods).toEqual(['getStatus', 'clear']);
});

it('lets diagnostics observe handler failures', async () => {
const errors: string[] = [];
server = createSafeJsonRpcServer<TestStateApi>(testState, TestStateSchema, {
diagnostic: async (ctx, next) => {
try {
await next();
} catch (err) {
errors.push(`${ctx.method}:${err instanceof Error ? err.message : String(err)}`);
throw err;
}
},
});

const response = await send({ method: 'fail', params: [] });

expectError(response, 400, 'Test state failed');
expect(errors).toEqual(['fail:Test state failed']);
});

it('fails if sends invalid JSON', async () => {
const response = await send('{');
expectError(response, 400, expect.stringContaining('Parse error'));
Expand Down
34 changes: 23 additions & 11 deletions yarn-project/foundation/src/json-rpc/server/safe_json_rpc_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ export class SafeJsonRpcServer {
config: Partial<SafeJsonRpcServerConfig> = {},
/** Health check function */
private readonly healthCheck: StatusCheckFn = () => true,
/** Additional middlewares */
/** Additional Koa middlewares */
private extraMiddlewares: Application.Middleware[] = [],
/** Additional per-request diagnostics middlewares */
private diagnosticsMiddleware?: DiagnosticsMiddleware,
/** Logger */
private log = createLogger('json-rpc:server'),
) {
Expand Down Expand Up @@ -161,7 +163,7 @@ export class SafeJsonRpcServer {
};
return;
}
const resp = await this.processBatch(ctx.request.body);
const resp = await this.processBatch(ctx.request.body, ctx.request.headers);
if (Array.isArray(resp)) {
ctx.status = 200;
ctx.body = resp;
Expand All @@ -170,7 +172,7 @@ export class SafeJsonRpcServer {
ctx.body = resp;
}
} else {
const resp = await this.processRequest(ctx.request.body);
const resp = await this.processRequest(ctx.request.body, ctx.request.headers);
if ('error' in resp) {
ctx.status = this.config.http200OnError ? 200 : 400;
}
Expand All @@ -182,11 +184,11 @@ export class SafeJsonRpcServer {
return router;
}

private async processBatch(requests: any[]) {
private async processBatch(requests: any[], headers: http.IncomingHttpHeaders = {}) {
if (requests.length === 0) {
return { jsonrpc: '2.0', error: { code: -32600, message: 'Invalid Request' }, id: null };
}
const results = await Promise.allSettled(requests.map(req => this.processRequest(req)));
const results = await Promise.allSettled(requests.map(req => this.processRequest(req, headers)));
return results.map(res => {
if (res.status === 'fulfilled') {
return res.value;
Expand All @@ -197,7 +199,7 @@ export class SafeJsonRpcServer {
});
}

private async processRequest(request: any) {
private async processRequest(request: any, headers: http.IncomingHttpHeaders = {}) {
if (!request || typeof request !== 'object') {
return { jsonrpc: '2.0', error: { code: -32600, message: 'Invalid Request' }, id: null };
}
Expand All @@ -212,7 +214,16 @@ export class SafeJsonRpcServer {
return { jsonrpc, id, error: { code: -32601, message: `Method not found: ${method}` } };
} else {
try {
const result = await this.proxy.call(method, params);
let result: any;

if (this.diagnosticsMiddleware) {
await this.diagnosticsMiddleware({ id: id ?? null, method, params, headers }, async () => {
result = await this.proxy.call(method, params);
});
} else {
result = await this.proxy.call(method, params);
}

return { jsonrpc, id, result };
} catch (err: any) {
if (err && err instanceof ZodError) {
Expand Down Expand Up @@ -383,6 +394,7 @@ function makeAggregateHealthcheck(namedHandlers: NamespacedApiHandlers, log?: Lo
export type SafeJsonRpcServerOptions = Partial<
SafeJsonRpcServerConfig & {
healthCheck: StatusCheckFn;
diagnostic: DiagnosticsMiddleware;
log: Logger;
middlewares: Application.Middleware[];
}
Expand All @@ -397,20 +409,20 @@ export function createNamespacedSafeJsonRpcServer(
handlers: NamespacedApiHandlers,
options: Omit<SafeJsonRpcServerOptions, 'healthcheck'> = {},
): SafeJsonRpcServer {
const { middlewares, log } = options;
const { diagnostic, middlewares, log } = options;
const proxy = new NamespacedSafeJsonProxy(handlers);
const healthCheck = makeAggregateHealthcheck(handlers, log);
return new SafeJsonRpcServer(proxy, options, healthCheck, middlewares, log);
return new SafeJsonRpcServer(proxy, options, healthCheck, middlewares, diagnostic, log);
}

export function createSafeJsonRpcServer<T extends object = any>(
handler: T,
schema: ApiSchemaFor<T>,
options: SafeJsonRpcServerOptions = {},
) {
const { log, healthCheck, middlewares: extraMiddlewares } = options;
const { diagnostic, log, healthCheck, middlewares: extraMiddlewares } = options;
const proxy = new SafeJsonProxy(handler, schema);
return new SafeJsonRpcServer(proxy, options, healthCheck, extraMiddlewares, log);
return new SafeJsonRpcServer(proxy, options, healthCheck, extraMiddlewares, diagnostic, log);
}

/**
Expand Down
43 changes: 42 additions & 1 deletion yarn-project/telemetry-client/src/otel_propagation.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { DiagnosticsMiddleware } from '@aztec/foundation/json-rpc/server';

import { ROOT_CONTEXT, type Span, SpanKind, SpanStatusCode, propagation } from '@opentelemetry/api';
import type Koa from 'koa';

Expand All @@ -17,7 +19,7 @@ export function getOtelJsonRpcPropagationMiddleware(
const context = propagation.extract(ROOT_CONTEXT, ctx.request.headers);
const method = (ctx.request.body as any)?.method;
return tracer.startActiveSpan(
`JsonRpcServer.${method ?? 'unknown'}`,
`JsonRpcServer.${method ?? 'batch'}`,
{ kind: SpanKind.SERVER },
context,
async (span: Span): Promise<void> => {
Expand Down Expand Up @@ -48,3 +50,42 @@ export function getOtelJsonRpcPropagationMiddleware(
);
};
}

export function getOtelJsonRpcDiagnosticsMiddleware(): DiagnosticsMiddleware {
return function otelJsonRpcDiagnostics(ctx, next) {
const [namespace, method] = splitNamespace(ctx.method);
const scope = namespace ?? 'UnknownHandler';
const tracer = getTelemetryClient().getTracer(scope);
return tracer.startActiveSpan(
`${scope}.${method}`,
{ kind: SpanKind.INTERNAL, attributes: { [ATTR_JSONRPC_METHOD]: ctx.method } },
async span => {
if (ctx.id !== null) {
span.setAttribute(ATTR_JSONRPC_REQUEST_ID, ctx.id);
}

try {
await next();
span.setStatus({ code: SpanStatusCode.OK });
} catch (err) {
span.setStatus({ code: SpanStatusCode.ERROR, message: err instanceof Error ? err.message : String(err) });
if (typeof err === 'string' || err instanceof Error) {
span.recordException(err);
}
throw err;
} finally {
span.end();
}
},
);
};
}

function splitNamespace(fullMethod: string): [namespace: string | undefined, method: string] {
const idx = fullMethod.indexOf('_');
if (idx > -1) {
return [fullMethod.slice(0, idx), fullMethod.slice(idx + 1)];
} else {
return [undefined, fullMethod];
}
}
Loading