Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/plugins/query_enhancements/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export const API = {
INDEXES: `${BASE_API}/remote_cluster/indexes`,
},
},
PPL_CANCEL: `${BASE_API}/ppl/cancel`,
PPL_GRAMMAR: `${BASE_API}/ppl/grammar`,
AGENT_API: {
CONFIG_EXISTS: `${BASE_API_ASSISTANT}/agent_config/_exists`,
Expand Down
1 change: 1 addition & 0 deletions src/plugins/query_enhancements/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export interface EnhancedFetchContext {
pollQueryResultsParams?: PollQueryResultsParams;
timeRange?: TimeRange;
options?: Record<string, unknown>;
queryId?: string;
};
}

Expand Down
47 changes: 33 additions & 14 deletions src/plugins/query_enhancements/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export const fetch = (context: EnhancedFetchContext, query: Query, aggConfig?: Q
pollQueryResultsParams: context.body?.pollQueryResultsParams,
timeRange: context.body?.timeRange,
...(highlight && { highlight }),
...(context.body?.queryId && { queryId: context.body.queryId }),
});

return from(
Expand All @@ -89,20 +90,38 @@ export const fetch = (context: EnhancedFetchContext, query: Query, aggConfig?: Q
signal,
})
.catch(async (error) => {
if (error.name === 'AbortError' && context.body?.pollQueryResultsParams?.queryId) {
// Cancel job
try {
await http.fetch({
method: 'DELETE',
path: API.DATA_SOURCE.ASYNC_JOBS,
query: {
id: query.dataset?.dataSource?.id,
queryId: context.body?.pollQueryResultsParams.queryId,
},
});
} catch (cancelError) {
// eslint-disable-next-line no-console
console.error('Failed to cancel query:', cancelError);
if (error.name === 'AbortError') {
if (context.body?.pollQueryResultsParams?.queryId) {
// Cancel async job
try {
await http.fetch({
method: 'DELETE',
path: API.DATA_SOURCE.ASYNC_JOBS,
query: {
id: query.dataset?.dataSource?.id,
queryId: context.body?.pollQueryResultsParams.queryId,
},
});
} catch (cancelError) {
// eslint-disable-next-line no-console
console.error('Failed to cancel async query:', cancelError);
}
} else if (context.body?.queryId) {
// Fire-and-forget: notify backend to cancel the PPL task.
// No need to await — the UI should move on immediately.
http
.fetch({
method: 'POST',
path: API.PPL_CANCEL,
body: JSON.stringify({
queryId: context.body.queryId,
dataSourceId: query.dataset?.dataSource?.id,
}),
})
.catch((cancelError) => {
// eslint-disable-next-line no-console
console.error('Failed to cancel PPL query:', cancelError);
});
}
}
throw error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

import { trimEnd } from 'lodash';
import { v4 as uuidv4 } from 'uuid';
import { from, Observable } from 'rxjs';
import { first, switchMap } from 'rxjs/operators';
import { formatTimePickerDate, Query, UI_SETTINGS } from '../../../data/common';
Expand Down Expand Up @@ -54,13 +55,15 @@ export class PPLSearchInterceptor extends SearchInterceptor {
strategy?: string
): Observable<IOpenSearchDashboardsSearchResponse> {
const { id, ...searchRequest } = request;
const isAsync = strategy === SEARCH_STRATEGY.PPL_ASYNC;
const context: EnhancedFetchContext = {
http: this.deps.http,
path: trimEnd(`${API.SEARCH}/${strategy}`),
signal,
body: {
pollQueryResultsParams: request.params?.pollQueryResultsParams,
timeRange: request.params?.body?.timeRange,
...(!isAsync && { queryId: uuidv4() }),
},
};

Expand Down
3 changes: 3 additions & 0 deletions src/plugins/query_enhancements/server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { API, URI } from '../../common';
import { registerQueryAssistRoutes } from './query_assist';
import { registerDataSourceConnectionsRoutes } from './data_source_connection';
import { registerResourceRoutes } from './resources';
import { registerPPLCancelRoute } from './ppl_cancel';

/**
* Coerce status code to 503 for 500 errors from dependency services. Only use
Expand Down Expand Up @@ -90,6 +91,7 @@ export function defineSearchStrategyRouteProvider(logger: Logger, router: IRoute
timeRange: schema.maybe(schema.object({}, { unknowns: 'allow' })),
options: schema.maybe(schema.object({}, { unknowns: 'allow' })),
highlight: schema.maybe(schema.object({}, { unknowns: 'allow' })),
queryId: schema.maybe(schema.string()),
}),
},
},
Expand Down Expand Up @@ -187,6 +189,7 @@ export function defineRoutes(
registerDataSourceConnectionsRoutes(router, client);
registerQueryAssistRoutes(router);
registerResourceRoutes(router);
registerPPLCancelRoute(router, logger);

definePPLBundleRoute(logger, router);
}
267 changes: 267 additions & 0 deletions src/plugins/query_enhancements/server/routes/ppl_cancel.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { loggingSystemMock } from '../../../../core/server/mocks';
import { registerPPLCancelRoute } from './ppl_cancel';

describe('registerPPLCancelRoute', () => {
let handler: any;
let logger: ReturnType<typeof loggingSystemMock.create>['get'];

const createResponse = () => ({
ok: jest.fn((v) => v),
notFound: jest.fn((v) => v),
custom: jest.fn((v) => v),
});

const createContext = (transportRequestMock: jest.Mock) =>
({
core: {
opensearch: {
client: {
asCurrentUser: {
transport: {
request: transportRequestMock,
},
},
},
},
},
dataSource: {
opensearch: {
getClient: jest.fn().mockResolvedValue({
transport: {
request: transportRequestMock,
},
}),
},
},
} as any);

const tasksResponseWithMatch = (queryId: string) => ({
body: {
nodes: {
node1: {
tasks: {
'node1:12345': {
description: `PPL [queryId=${queryId}]: source=test`,
action: 'indices:data/read/ppl',
},
},
},
},
},
});

const tasksResponseNoMatch = {
body: {
nodes: {
node1: {
tasks: {
'node1:99999': {
description: 'PPL [queryId=other-id]: source=test',
action: 'indices:data/read/ppl',
},
},
},
},
},
};

const tasksResponseEmpty = {
body: {
nodes: {},
},
};

beforeEach(() => {
const router = {
post: jest.fn((_, h) => {
handler = h;
}),
} as any;
logger = loggingSystemMock.create().get();
registerPPLCancelRoute(router, logger);
});

it('should cancel a matching PPL task successfully', async () => {
const queryId = 'test-uuid-1234';
const transportRequest = jest
.fn()
.mockResolvedValueOnce(tasksResponseWithMatch(queryId))
.mockResolvedValueOnce({ body: { nodes: {} } });

const context = createContext(transportRequest);
const req = { body: { queryId } } as any;
const res = createResponse();

await handler(context, req, res);

expect(transportRequest).toHaveBeenCalledWith({
method: 'GET',
path: '/_tasks',
querystring: { actions: '*ppl*', detailed: 'true' },
});
expect(transportRequest).toHaveBeenCalledWith({
method: 'POST',
path: '/_tasks/node1:12345/_cancel',
});
expect(res.ok).toHaveBeenCalledWith({
body: {
cancelled: true,
queryId,
tasks: [
expect.objectContaining({
taskId: 'node1:12345',
nodeId: 'node1',
}),
],
},
});
});

it('should return 404 when no matching task is found', async () => {
const queryId = 'nonexistent-uuid';
const transportRequest = jest.fn().mockResolvedValueOnce(tasksResponseNoMatch);

const context = createContext(transportRequest);
const req = { body: { queryId } } as any;
const res = createResponse();

await handler(context, req, res);

expect(transportRequest).toHaveBeenCalledTimes(1);
expect(res.notFound).toHaveBeenCalledWith({
body: { message: `No running PPL task found for queryId: ${queryId}` },
});
});

it('should return 404 when no tasks exist', async () => {
const queryId = 'some-uuid';
const transportRequest = jest.fn().mockResolvedValueOnce(tasksResponseEmpty);

const context = createContext(transportRequest);
const req = { body: { queryId } } as any;
const res = createResponse();

await handler(context, req, res);

expect(res.notFound).toHaveBeenCalled();
});

it('should use data source client when dataSourceId is provided', async () => {
const queryId = 'ds-uuid';
const dataSourceId = 'ds-1';
const transportRequest = jest
.fn()
.mockResolvedValueOnce(tasksResponseWithMatch(queryId))
.mockResolvedValueOnce({ body: {} });

const context = createContext(transportRequest);
const req = { body: { queryId, dataSourceId } } as any;
const res = createResponse();

await handler(context, req, res);

expect(context.dataSource.opensearch.getClient).toHaveBeenCalledWith(dataSourceId);
expect(res.ok).toHaveBeenCalled();
});

it('should use core client when dataSourceId is not provided', async () => {
const queryId = 'local-uuid';
const transportRequest = jest
.fn()
.mockResolvedValueOnce(tasksResponseWithMatch(queryId))
.mockResolvedValueOnce({ body: {} });

const context = createContext(transportRequest);
const req = { body: { queryId } } as any;
const res = createResponse();

await handler(context, req, res);

expect(context.dataSource.opensearch.getClient).not.toHaveBeenCalled();
expect(res.ok).toHaveBeenCalled();
});

it('should handle OpenSearch errors and coerce 500 to 503', async () => {
const queryId = 'error-uuid';
const transportRequest = jest.fn().mockRejectedValueOnce({
statusCode: 500,
message: 'internal server error',
});

const context = createContext(transportRequest);
const req = { body: { queryId } } as any;
const res = createResponse();

await handler(context, req, res);

expect(res.custom).toHaveBeenCalledWith({
statusCode: 503,
body: 'internal server error',
});
});

it('should cancel all matching tasks across multiple nodes', async () => {
const queryId = 'multi-task-uuid';
const transportRequest = jest
.fn()
.mockResolvedValueOnce({
body: {
nodes: {
node1: {
tasks: {
'node1:111': {
description: `PPL [queryId=${queryId}]: source=test`,
},
},
},
node2: {
tasks: {
'node2:222': {
description: `PPL [queryId=${queryId}]: source=test | stats count()`,
},
'node2:333': {
description: 'PPL [queryId=other]: source=other',
},
},
},
},
},
})
.mockResolvedValue({ body: {} });

const context = createContext(transportRequest);
const req = { body: { queryId } } as any;
const res = createResponse();

await handler(context, req, res);

// Should cancel both matching tasks, not the non-matching one
expect(transportRequest).toHaveBeenCalledWith({
method: 'POST',
path: '/_tasks/node1:111/_cancel',
});
expect(transportRequest).toHaveBeenCalledWith({
method: 'POST',
path: '/_tasks/node2:222/_cancel',
});
expect(transportRequest).not.toHaveBeenCalledWith({
method: 'POST',
path: '/_tasks/node2:333/_cancel',
});
expect(res.ok).toHaveBeenCalledWith({
body: {
cancelled: true,
queryId,
tasks: expect.arrayContaining([
expect.objectContaining({ taskId: 'node1:111', nodeId: 'node1' }),
expect.objectContaining({ taskId: 'node2:222', nodeId: 'node2' }),
]),
},
});
});
});
Loading
Loading