From d6a77bb9e6048135f1893a450484e4ad563b8ea5 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 25 Mar 2026 11:41:18 -0700 Subject: [PATCH 1/4] [Query] Add PPL query cancellation support via Discover cancel button Enable cancellation of in-flight PPL queries from the Discover UI cancel button by leveraging the OpenSearch task framework. - Generate a client-side UUID (queryId) for each synchronous PPL query - Pass queryId through the full request pipeline to OpenSearch PPL API - Add POST /api/enhancements/ppl/cancel server route that lists PPL tasks via _tasks API, matches by queryId in task description, and cancels all matching tasks - Wire the existing Discover abort flow to call the cancel route on abort - Cancel all matching tasks (data + histogram queries share the same queryId) Depends on opensearch-project/sql#5254 for backend task registration. Signed-off-by: Kai Huang --- .../query_enhancements/common/constants.ts | 1 + .../query_enhancements/common/types.ts | 1 + .../query_enhancements/common/utils.ts | 46 ++- .../public/search/ppl_search_interceptor.ts | 3 + .../query_enhancements/server/routes/index.ts | 3 + .../server/routes/ppl_cancel.test.ts | 267 ++++++++++++++++++ .../server/routes/ppl_cancel.ts | 99 +++++++ .../query_enhancements/server/utils/facet.ts | 3 +- 8 files changed, 408 insertions(+), 15 deletions(-) create mode 100644 src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts create mode 100644 src/plugins/query_enhancements/server/routes/ppl_cancel.ts diff --git a/src/plugins/query_enhancements/common/constants.ts b/src/plugins/query_enhancements/common/constants.ts index 45aa36aa35e2..8613ae726b29 100644 --- a/src/plugins/query_enhancements/common/constants.ts +++ b/src/plugins/query_enhancements/common/constants.ts @@ -42,6 +42,7 @@ export const API = { INDEXES: `${BASE_API}/remote_cluster/indexes`, }, }, + PPL_CANCEL: `${BASE_API}/ppl/cancel`, PPL_GRAMMAR: `${BASE_API}/ppl/grammar`, AGENT_API: { CONFIG_EXISTS: `${BASE_API_ASSISTANT}/agent_config/_exists`, diff --git a/src/plugins/query_enhancements/common/types.ts b/src/plugins/query_enhancements/common/types.ts index 93807dda4732..03f0807d0d14 100644 --- a/src/plugins/query_enhancements/common/types.ts +++ b/src/plugins/query_enhancements/common/types.ts @@ -30,6 +30,7 @@ export interface EnhancedFetchContext { pollQueryResultsParams?: PollQueryResultsParams; timeRange?: TimeRange; options?: Record; + queryId?: string; }; } diff --git a/src/plugins/query_enhancements/common/utils.ts b/src/plugins/query_enhancements/common/utils.ts index d940260b2f1b..e510c821a866 100644 --- a/src/plugins/query_enhancements/common/utils.ts +++ b/src/plugins/query_enhancements/common/utils.ts @@ -78,6 +78,7 @@ export const fetch = (context: EnhancedFetchContext, query: Query, aggConfig?: Q pollQueryResultsParams: context.body?.pollQueryResultsParams, timeRange: context.body?.timeRange, ...(highlight && { highlight }), + ...(context.body?.queryId && { queryId: context.body.queryId }), }); return from( @@ -89,20 +90,37 @@ export const fetch = (context: EnhancedFetchContext, query: Query, aggConfig?: Q signal, }) .catch(async (error) => { - if (error.name === 'AbortError' && context.body?.pollQueryResultsParams?.queryId) { - // Cancel job - try { - await http.fetch({ - method: 'DELETE', - path: API.DATA_SOURCE.ASYNC_JOBS, - query: { - id: query.dataset?.dataSource?.id, - queryId: context.body?.pollQueryResultsParams.queryId, - }, - }); - } catch (cancelError) { - // eslint-disable-next-line no-console - console.error('Failed to cancel query:', cancelError); + if (error.name === 'AbortError') { + if (context.body?.pollQueryResultsParams?.queryId) { + // Cancel async job + try { + await http.fetch({ + method: 'DELETE', + path: API.DATA_SOURCE.ASYNC_JOBS, + query: { + id: query.dataset?.dataSource?.id, + queryId: context.body?.pollQueryResultsParams.queryId, + }, + }); + } catch (cancelError) { + // eslint-disable-next-line no-console + console.error('Failed to cancel async query:', cancelError); + } + } else if (context.body?.queryId) { + // Cancel synchronous PPL query via task cancellation + try { + await http.fetch({ + method: 'POST', + path: API.PPL_CANCEL, + body: JSON.stringify({ + queryId: context.body.queryId, + dataSourceId: query.dataset?.dataSource?.id, + }), + }); + } catch (cancelError) { + // eslint-disable-next-line no-console + console.error('Failed to cancel PPL query:', cancelError); + } } } throw error; diff --git a/src/plugins/query_enhancements/public/search/ppl_search_interceptor.ts b/src/plugins/query_enhancements/public/search/ppl_search_interceptor.ts index c5cee7251476..db2ca7410251 100644 --- a/src/plugins/query_enhancements/public/search/ppl_search_interceptor.ts +++ b/src/plugins/query_enhancements/public/search/ppl_search_interceptor.ts @@ -4,6 +4,7 @@ */ import { trimEnd } from 'lodash'; +import { v4 as uuidv4 } from 'uuid'; import { from, Observable } from 'rxjs'; import { first, switchMap } from 'rxjs/operators'; import { formatTimePickerDate, Query, UI_SETTINGS } from '../../../data/common'; @@ -54,6 +55,7 @@ export class PPLSearchInterceptor extends SearchInterceptor { strategy?: string ): Observable { const { id, ...searchRequest } = request; + const isAsync = strategy === SEARCH_STRATEGY.PPL_ASYNC; const context: EnhancedFetchContext = { http: this.deps.http, path: trimEnd(`${API.SEARCH}/${strategy}`), @@ -61,6 +63,7 @@ export class PPLSearchInterceptor extends SearchInterceptor { body: { pollQueryResultsParams: request.params?.pollQueryResultsParams, timeRange: request.params?.body?.timeRange, + ...(!isAsync && { queryId: uuidv4() }), }, }; diff --git a/src/plugins/query_enhancements/server/routes/index.ts b/src/plugins/query_enhancements/server/routes/index.ts index 049c672e2e3a..68fd458a124c 100644 --- a/src/plugins/query_enhancements/server/routes/index.ts +++ b/src/plugins/query_enhancements/server/routes/index.ts @@ -16,6 +16,7 @@ import { API, URI } from '../../common'; import { registerQueryAssistRoutes } from './query_assist'; import { registerDataSourceConnectionsRoutes } from './data_source_connection'; import { registerResourceRoutes } from './resources'; +import { registerPPLCancelRoute } from './ppl_cancel'; /** * Coerce status code to 503 for 500 errors from dependency services. Only use @@ -90,6 +91,7 @@ export function defineSearchStrategyRouteProvider(logger: Logger, router: IRoute timeRange: schema.maybe(schema.object({}, { unknowns: 'allow' })), options: schema.maybe(schema.object({}, { unknowns: 'allow' })), highlight: schema.maybe(schema.object({}, { unknowns: 'allow' })), + queryId: schema.maybe(schema.string()), }), }, }, @@ -187,6 +189,7 @@ export function defineRoutes( registerDataSourceConnectionsRoutes(router, client); registerQueryAssistRoutes(router); registerResourceRoutes(router); + registerPPLCancelRoute(router, logger); definePPLBundleRoute(logger, router); } diff --git a/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts b/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts new file mode 100644 index 000000000000..24dd867be48f --- /dev/null +++ b/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts @@ -0,0 +1,267 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { loggingSystemMock } from '../../../../core/server/mocks'; +import { registerPPLCancelRoute } from './ppl_cancel'; + +describe('registerPPLCancelRoute', () => { + let handler: any; + let logger: ReturnType['get']; + + const createResponse = () => ({ + ok: jest.fn((v) => v), + notFound: jest.fn((v) => v), + custom: jest.fn((v) => v), + }); + + const createContext = (transportRequestMock: jest.Mock) => + ({ + core: { + opensearch: { + client: { + asCurrentUser: { + transport: { + request: transportRequestMock, + }, + }, + }, + }, + }, + dataSource: { + opensearch: { + getClient: jest.fn().mockResolvedValue({ + transport: { + request: transportRequestMock, + }, + }), + }, + }, + } as any); + + const tasksResponseWithMatch = (queryId: string) => ({ + body: { + nodes: { + node1: { + tasks: { + 'node1:12345': { + description: `PPL [queryId=${queryId}]: source=test`, + action: 'indices:data/read/ppl', + }, + }, + }, + }, + }, + }); + + const tasksResponseNoMatch = { + body: { + nodes: { + node1: { + tasks: { + 'node1:99999': { + description: 'PPL [queryId=other-id]: source=test', + action: 'indices:data/read/ppl', + }, + }, + }, + }, + }, + }; + + const tasksResponseEmpty = { + body: { + nodes: {}, + }, + }; + + beforeEach(() => { + const router = { + post: jest.fn((_, h) => { + handler = h; + }), + } as any; + logger = loggingSystemMock.create().get(); + registerPPLCancelRoute(router, logger); + }); + + it('should cancel a matching PPL task successfully', async () => { + const queryId = 'test-uuid-1234'; + const transportRequest = jest + .fn() + .mockResolvedValueOnce(tasksResponseWithMatch(queryId)) + .mockResolvedValueOnce({ body: { nodes: {} } }); + + const context = createContext(transportRequest); + const req = { body: { queryId } } as any; + const res = createResponse(); + + await handler(context, req, res); + + expect(transportRequest).toHaveBeenCalledWith({ + method: 'GET', + path: '/_tasks', + querystring: { actions: '*ppl*', detailed: 'true' }, + }); + expect(transportRequest).toHaveBeenCalledWith({ + method: 'POST', + path: '/_tasks/node1:12345/_cancel', + }); + expect(res.ok).toHaveBeenCalledWith({ + body: { + cancelled: true, + queryId, + tasks: [ + expect.objectContaining({ + taskId: 'node1:12345', + nodeId: 'node1', + }), + ], + }, + }); + }); + + it('should return 404 when no matching task is found', async () => { + const queryId = 'nonexistent-uuid'; + const transportRequest = jest.fn().mockResolvedValueOnce(tasksResponseNoMatch); + + const context = createContext(transportRequest); + const req = { body: { queryId } } as any; + const res = createResponse(); + + await handler(context, req, res); + + expect(transportRequest).toHaveBeenCalledTimes(1); + expect(res.notFound).toHaveBeenCalledWith({ + body: { message: `No running PPL task found for queryId: ${queryId}` }, + }); + }); + + it('should return 404 when no tasks exist', async () => { + const queryId = 'some-uuid'; + const transportRequest = jest.fn().mockResolvedValueOnce(tasksResponseEmpty); + + const context = createContext(transportRequest); + const req = { body: { queryId } } as any; + const res = createResponse(); + + await handler(context, req, res); + + expect(res.notFound).toHaveBeenCalled(); + }); + + it('should use data source client when dataSourceId is provided', async () => { + const queryId = 'ds-uuid'; + const dataSourceId = 'ds-1'; + const transportRequest = jest + .fn() + .mockResolvedValueOnce(tasksResponseWithMatch(queryId)) + .mockResolvedValueOnce({ body: {} }); + + const context = createContext(transportRequest); + const req = { body: { queryId, dataSourceId } } as any; + const res = createResponse(); + + await handler(context, req, res); + + expect(context.dataSource.opensearch.getClient).toHaveBeenCalledWith(dataSourceId); + expect(res.ok).toHaveBeenCalled(); + }); + + it('should use core client when dataSourceId is not provided', async () => { + const queryId = 'local-uuid'; + const transportRequest = jest + .fn() + .mockResolvedValueOnce(tasksResponseWithMatch(queryId)) + .mockResolvedValueOnce({ body: {} }); + + const context = createContext(transportRequest); + const req = { body: { queryId } } as any; + const res = createResponse(); + + await handler(context, req, res); + + expect(context.dataSource.opensearch.getClient).not.toHaveBeenCalled(); + expect(res.ok).toHaveBeenCalled(); + }); + + it('should handle OpenSearch errors and coerce 500 to 503', async () => { + const queryId = 'error-uuid'; + const transportRequest = jest.fn().mockRejectedValueOnce({ + statusCode: 500, + message: 'internal server error', + }); + + const context = createContext(transportRequest); + const req = { body: { queryId } } as any; + const res = createResponse(); + + await handler(context, req, res); + + expect(res.custom).toHaveBeenCalledWith({ + statusCode: 503, + body: 'internal server error', + }); + }); + + it('should cancel all matching tasks across multiple nodes', async () => { + const queryId = 'multi-task-uuid'; + const transportRequest = jest + .fn() + .mockResolvedValueOnce({ + body: { + nodes: { + node1: { + tasks: { + 'node1:111': { + description: `PPL [queryId=${queryId}]: source=test`, + }, + }, + }, + node2: { + tasks: { + 'node2:222': { + description: `PPL [queryId=${queryId}]: source=test | stats count()`, + }, + 'node2:333': { + description: 'PPL [queryId=other]: source=other', + }, + }, + }, + }, + }, + }) + .mockResolvedValue({ body: {} }); + + const context = createContext(transportRequest); + const req = { body: { queryId } } as any; + const res = createResponse(); + + await handler(context, req, res); + + // Should cancel both matching tasks, not the non-matching one + expect(transportRequest).toHaveBeenCalledWith({ + method: 'POST', + path: '/_tasks/node1:111/_cancel', + }); + expect(transportRequest).toHaveBeenCalledWith({ + method: 'POST', + path: '/_tasks/node2:222/_cancel', + }); + expect(transportRequest).not.toHaveBeenCalledWith({ + method: 'POST', + path: '/_tasks/node2:333/_cancel', + }); + expect(res.ok).toHaveBeenCalledWith({ + body: { + cancelled: true, + queryId, + tasks: expect.arrayContaining([ + expect.objectContaining({ taskId: 'node1:111', nodeId: 'node1' }), + expect.objectContaining({ taskId: 'node2:222', nodeId: 'node2' }), + ]), + }, + }); + }); +}); diff --git a/src/plugins/query_enhancements/server/routes/ppl_cancel.ts b/src/plugins/query_enhancements/server/routes/ppl_cancel.ts new file mode 100644 index 000000000000..c29bdd222de7 --- /dev/null +++ b/src/plugins/query_enhancements/server/routes/ppl_cancel.ts @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { schema } from '@osd/config-schema'; +import { IRouter, Logger } from 'opensearch-dashboards/server'; +import { API } from '../../common'; + +export function registerPPLCancelRoute(router: IRouter, logger: Logger) { + router.post( + { + path: API.PPL_CANCEL, + validate: { + body: schema.object({ + queryId: schema.string(), + dataSourceId: schema.maybe(schema.nullable(schema.string())), + }), + }, + }, + async (context, request, response) => { + const { queryId, dataSourceId } = request.body; + try { + const client = dataSourceId + ? await context.dataSource.opensearch.getClient(dataSourceId) + : context.core.opensearch.client.asCurrentUser; + + // List all PPL tasks with detailed descriptions + const tasksResponse = await client.transport.request({ + method: 'GET', + path: '/_tasks', + querystring: { + actions: '*ppl*', + detailed: 'true', + }, + }); + + const tasksBody = tasksResponse.body || tasksResponse; + + // Search across all nodes for tasks whose description contains queryId=. + // A single PPL search fires two sequential queries (data + histogram aggregation) + // that share the same queryId, so we must cancel all matching tasks. + const targetPattern = `queryId=${queryId}`; + const matchingTasks: Array<{ taskId: string; nodeId: string }> = []; + + const nodes = tasksBody.nodes || {}; + for (const [nodeId, nodeInfo] of Object.entries(nodes) as Array<[string, any]>) { + const tasks = nodeInfo.tasks || {}; + for (const [taskId, taskInfo] of Object.entries(tasks) as Array<[string, any]>) { + if (taskInfo.description && taskInfo.description.includes(targetPattern)) { + matchingTasks.push({ taskId, nodeId }); + } + } + } + + if (matchingTasks.length === 0) { + return response.notFound({ + body: { + message: `No running PPL task found for queryId: ${queryId}`, + }, + }); + } + + // Cancel all matching tasks + const cancelResults = await Promise.all( + matchingTasks.map(async ({ taskId, nodeId }) => { + const cancelResponse = await client.transport.request({ + method: 'POST', + path: `/_tasks/${taskId}/_cancel`, + }); + logger.info( + `PPL query cancelled: queryId=${queryId}, taskId=${taskId}, nodeId=${nodeId}` + ); + return { + taskId, + nodeId, + cancelResponse: cancelResponse.body || cancelResponse, + }; + }) + ); + + return response.ok({ + body: { + cancelled: true, + queryId, + tasks: cancelResults, + }, + }); + } catch (error: any) { + logger.error(`Failed to cancel PPL query queryId=${queryId}: ${error.message}`); + const statusCode = error.statusCode === 500 ? 503 : error.statusCode || 503; + return response.custom({ + statusCode, + body: error.message, + }); + } + } + ); +} diff --git a/src/plugins/query_enhancements/server/utils/facet.ts b/src/plugins/query_enhancements/server/utils/facet.ts index ce9ffe54bb80..faf3dd073440 100644 --- a/src/plugins/query_enhancements/server/utils/facet.ts +++ b/src/plugins/query_enhancements/server/utils/facet.ts @@ -54,7 +54,7 @@ export class Facet { const query: Query = request.body.query; const dataSource = query.dataset?.dataSource; const meta = dataSource?.meta; - const { format, lang, fetchSize } = request.body; + const { format, lang, fetchSize, queryId } = request.body; const compressionHeaders = this.getCompressionHeaders(); const { highlight } = request.body; const params = { @@ -67,6 +67,7 @@ export class Facet { }), ...(lang && { lang }), ...(highlight && { highlight }), + ...(queryId && { queryId }), }, ...(format !== 'jdbc' && { format }), ...(Object.keys(compressionHeaders).length > 0 && { headers: compressionHeaders }), From a47e90ca07f37e6f30980ca4bbb3505cce12dfb2 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 25 Mar 2026 15:22:28 -0700 Subject: [PATCH 2/4] Fire-and-forget PPL cancel request to avoid blocking UI Remove await from the cancel HTTP call so the AbortError propagates immediately. The cancel is a best-effort notification to the backend and should not block the frontend from moving on. Signed-off-by: Kai Huang --- src/plugins/query_enhancements/common/utils.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/plugins/query_enhancements/common/utils.ts b/src/plugins/query_enhancements/common/utils.ts index e510c821a866..8897abf18d70 100644 --- a/src/plugins/query_enhancements/common/utils.ts +++ b/src/plugins/query_enhancements/common/utils.ts @@ -107,20 +107,21 @@ export const fetch = (context: EnhancedFetchContext, query: Query, aggConfig?: Q console.error('Failed to cancel async query:', cancelError); } } else if (context.body?.queryId) { - // Cancel synchronous PPL query via task cancellation - try { - await http.fetch({ + // Fire-and-forget: notify backend to cancel the PPL task. + // No need to await — the UI should move on immediately. + http + .fetch({ method: 'POST', path: API.PPL_CANCEL, body: JSON.stringify({ queryId: context.body.queryId, dataSourceId: query.dataset?.dataSource?.id, }), + }) + .catch((cancelError) => { + // eslint-disable-next-line no-console + console.error('Failed to cancel PPL query:', cancelError); }); - } catch (cancelError) { - // eslint-disable-next-line no-console - console.error('Failed to cancel PPL query:', cancelError); - } } } throw error; From 0216993fdeb5e478d699d0e963ec8219e44ac883 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 25 Mar 2026 15:35:28 -0700 Subject: [PATCH 3/4] Use exact PPL action name instead of wildcard in task filter Use 'cluster:admin/opensearch/ppl' instead of '*ppl*' to avoid matching unrelated tasks. The _tasks API does not support filtering by description or queryId, so client-side matching remains necessary. Signed-off-by: Kai Huang --- src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts | 2 +- src/plugins/query_enhancements/server/routes/ppl_cancel.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts b/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts index 24dd867be48f..ddb6d86d2ee8 100644 --- a/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts +++ b/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts @@ -102,7 +102,7 @@ describe('registerPPLCancelRoute', () => { expect(transportRequest).toHaveBeenCalledWith({ method: 'GET', path: '/_tasks', - querystring: { actions: '*ppl*', detailed: 'true' }, + querystring: { actions: 'cluster:admin/opensearch/ppl', detailed: 'true' }, }); expect(transportRequest).toHaveBeenCalledWith({ method: 'POST', diff --git a/src/plugins/query_enhancements/server/routes/ppl_cancel.ts b/src/plugins/query_enhancements/server/routes/ppl_cancel.ts index c29bdd222de7..5fec43a27022 100644 --- a/src/plugins/query_enhancements/server/routes/ppl_cancel.ts +++ b/src/plugins/query_enhancements/server/routes/ppl_cancel.ts @@ -30,7 +30,7 @@ export function registerPPLCancelRoute(router: IRouter, logger: Logger) { method: 'GET', path: '/_tasks', querystring: { - actions: '*ppl*', + actions: 'cluster:admin/opensearch/ppl', detailed: 'true', }, }); From 95a6f6dce32f9b24171943395981a205e33d4c1a Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 25 Mar 2026 15:52:41 -0700 Subject: [PATCH 4/4] Revert action filter back to wildcard *ppl* The exact action name is less resilient to future changes. The wildcard is sufficient since we still match by queryId in the task description. Signed-off-by: Kai Huang --- src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts | 2 +- src/plugins/query_enhancements/server/routes/ppl_cancel.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts b/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts index ddb6d86d2ee8..24dd867be48f 100644 --- a/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts +++ b/src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts @@ -102,7 +102,7 @@ describe('registerPPLCancelRoute', () => { expect(transportRequest).toHaveBeenCalledWith({ method: 'GET', path: '/_tasks', - querystring: { actions: 'cluster:admin/opensearch/ppl', detailed: 'true' }, + querystring: { actions: '*ppl*', detailed: 'true' }, }); expect(transportRequest).toHaveBeenCalledWith({ method: 'POST', diff --git a/src/plugins/query_enhancements/server/routes/ppl_cancel.ts b/src/plugins/query_enhancements/server/routes/ppl_cancel.ts index 5fec43a27022..c29bdd222de7 100644 --- a/src/plugins/query_enhancements/server/routes/ppl_cancel.ts +++ b/src/plugins/query_enhancements/server/routes/ppl_cancel.ts @@ -30,7 +30,7 @@ export function registerPPLCancelRoute(router: IRouter, logger: Logger) { method: 'GET', path: '/_tasks', querystring: { - actions: 'cluster:admin/opensearch/ppl', + actions: '*ppl*', detailed: 'true', }, });