Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;
import static org.opensearch.sql.common.setting.Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT;
import static org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT;
import static org.opensearch.sql.common.setting.Settings.Key.PPL_SUBSEARCH_MAXOUT;
import static org.opensearch.sql.common.setting.Settings.Key.QUERY_SIZE_LIMIT;

Expand Down Expand Up @@ -127,14 +128,24 @@ public static class Builder {
* org.opensearch.sql.api.parser.PPLQueryParser} reuses the v2 {@code AstBuilder}, which gates
* Calcite-only commands (e.g. {@code visitTableCommand}) on this setting; without the default,
* those commands fail at parse time even when the cluster setting is true.
*
* <p>{@link Settings.Key#PPL_REX_MAX_MATCH_LIMIT} defaults to {@code 10} here because {@code
* AstBuilder.visitRexCommand} reads it unconditionally and unboxes to {@code int} — a {@code
* null} return from {@code getSettingValue} NPEs the planner before any operator-level
* capability check runs. The value mirrors the cluster-side default of {@code 10} registered by
* {@code OpenSearchSettings.PPL_REX_MAX_MATCH_LIMIT_SETTING}. Cluster-side overrides reach this
* map via {@link #setting(String, Object)} — the REST handler reads the live value from {@code
* OpenSearchSettings} and routes it through that existing API, keeping {@link
* UnifiedQueryContext} decoupled from any specific {@link Settings} implementation.
*/
private final Map<Settings.Key, Object> settings =
new HashMap<Settings.Key, Object>(
Map.of(
QUERY_SIZE_LIMIT, SysLimit.DEFAULT.querySizeLimit(),
PPL_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.subsearchLimit(),
PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit(),
CALCITE_ENGINE_ENABLED, true));
CALCITE_ENGINE_ENABLED, true,
PPL_REX_MAX_MATCH_LIMIT, 10));

/**
* Sets the query language frontend to be used.
Expand Down Expand Up @@ -223,6 +234,7 @@ public UnifiedQueryContext build() {
case SQL -> UnifiedSqlSpec.extended();
case PPL -> UnifiedPplSpec.create();
};

Settings settings = buildSettings();
CalcitePlanContext planContext =
CalcitePlanContext.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public void testContextCreationWithDefaults() {
"Settings should have default system limits",
SysLimit.DEFAULT,
SysLimit.fromSettings(context.getSettings()));
assertEquals(
"PPL_REX_MAX_MATCH_LIMIT default should be 10",
Integer.valueOf(10),
context.getSettings().getSettingValue(PPL_REX_MAX_MATCH_LIMIT));
}

@Test
Expand All @@ -43,10 +47,15 @@ public void testContextCreationWithCustomConfig() {
.catalog("opensearch", testSchema)
.cacheMetadata(true)
.setting("plugins.query.size_limit", 200)
.setting("plugins.ppl.rex.max_match.limit", 5)
.build();

Integer querySizeLimit = context.getSettings().getSettingValue(QUERY_SIZE_LIMIT);
assertEquals("Custom setting should be applied", Integer.valueOf(200), querySizeLimit);
assertEquals(
"Cluster-side override for PPL_REX_MAX_MATCH_LIMIT should reach the unified path",
Integer.valueOf(5),
context.getSettings().getSettingValue(PPL_REX_MAX_MATCH_LIMIT));
}

@Test(expected = IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ private BiFunction<SQLQueryRequest, RestChannel, Boolean> createSqlAnalyticsRout
if (executor == null) {
return null;
}
cached[0] = new RestUnifiedQueryAction(client, clusterService, executor);
cached[0] =
new RestUnifiedQueryAction(client, clusterService, executor, pluginSettings);
}
return cached[0];
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,17 @@ public class RestUnifiedQueryAction {
private final AnalyticsExecutionEngine analyticsEngine;
private final NodeClient client;
private final ClusterService clusterService;
private final org.opensearch.sql.common.setting.Settings pluginSettings;

public RestUnifiedQueryAction(
NodeClient client,
ClusterService clusterService,
QueryPlanExecutor<RelNode, Iterable<Object[]>> planExecutor) {
QueryPlanExecutor<RelNode, Iterable<Object[]>> planExecutor,
org.opensearch.sql.common.setting.Settings pluginSettings) {
this.client = client;
this.clusterService = clusterService;
this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor);
this.pluginSettings = pluginSettings;
}

/**
Expand Down Expand Up @@ -154,19 +157,42 @@ public void explain(
* Build a lightweight context for parsing only (index name extraction). Does not require cluster
* state or catalog schema.
*/
private static UnifiedQueryContext buildParsingContext(QueryType queryType) {
return UnifiedQueryContext.builder().language(queryType).build();
private UnifiedQueryContext buildParsingContext(QueryType queryType) {
return applyClusterOverrides(UnifiedQueryContext.builder().language(queryType)).build();
}

private UnifiedQueryContext buildContext(QueryType queryType, boolean profiling) {
return UnifiedQueryContext.builder()
.language(queryType)
.catalog(SCHEMA_NAME, OpenSearchSchemaBuilder.buildSchema(clusterService.state()))
.defaultNamespace(SCHEMA_NAME)
.profiling(profiling)
return applyClusterOverrides(
UnifiedQueryContext.builder()
.language(queryType)
.catalog(SCHEMA_NAME, OpenSearchSchemaBuilder.buildSchema(clusterService.state()))
.defaultNamespace(SCHEMA_NAME)
.profiling(profiling))
.build();
}

/**
* Routes operator-configured cluster overrides into the builder via the existing {@code
* setting(String, Object)} API, keeping {@link UnifiedQueryContext} decoupled from any specific
* {@link org.opensearch.sql.common.setting.Settings} implementation.
*
* <p>Currently scoped to {@code plugins.ppl.rex.max_match.limit} — required so the unified path
* honors {@code _cluster/settings} updates for {@code rex max_match} (CalciteRexCommandIT's
* testRexMaxMatchConfigurableLimit). Add keys here if a future PR / IT depends on cluster-side
* fidelity for one of the other planning settings.
*/
private UnifiedQueryContext.Builder applyClusterOverrides(UnifiedQueryContext.Builder builder) {
Object rexLimit =
pluginSettings.getSettingValue(
org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT);
if (rexLimit != null) {
builder.setting(
org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT.getKeyValue(),
rexLimit);
}
return builder;
}

/**
* Extract the source index name by parsing the query and visiting the AST to find the Relation
* node. Uses the context's parser which supports both PPL and SQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class TransportPPLQueryAction

private final NodeClient clientRef;
private final ClusterService clusterServiceRef;
private final org.opensearch.sql.common.setting.Settings pluginSettingsRef;

@Inject
public TransportPPLQueryAction(
Expand All @@ -82,11 +83,13 @@ public TransportPPLQueryAction(

ModulesBuilder modules = new ModulesBuilder();
modules.add(new OpenSearchPluginModule());
org.opensearch.sql.common.setting.Settings pluginSettings =
new OpenSearchSettings(clusterService.getClusterSettings());
this.pluginSettingsRef = pluginSettings;
modules.add(
b -> {
b.bind(NodeClient.class).toInstance(client);
b.bind(org.opensearch.sql.common.setting.Settings.class)
.toInstance(new OpenSearchSettings(clusterService.getClusterSettings()));
b.bind(org.opensearch.sql.common.setting.Settings.class).toInstance(pluginSettings);
b.bind(DataSourceService.class).toInstance(dataSourceService);
});
this.injector = Guice.createInjector(modules);
Expand All @@ -105,7 +108,8 @@ public void setQueryPlanExecutor(
QueryPlanExecutor<RelNode, Iterable<Object[]>> queryPlanExecutor) {
AnalyticsExecutorHolder.set(queryPlanExecutor);
this.unifiedQueryHandler =
new RestUnifiedQueryAction(clientRef, clusterServiceRef, queryPlanExecutor);
new RestUnifiedQueryAction(
clientRef, clusterServiceRef, queryPlanExecutor, pluginSettingsRef);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.Test;
import org.opensearch.analytics.exec.QueryPlanExecutor;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.transport.client.node.NodeClient;

Expand All @@ -30,7 +31,8 @@ public void setUp() {
@SuppressWarnings("unchecked")
QueryPlanExecutor<RelNode, Iterable<Object[]>> executor = mock(QueryPlanExecutor.class);
action =
new RestUnifiedQueryAction(mock(NodeClient.class), mock(ClusterService.class), executor);
new RestUnifiedQueryAction(
mock(NodeClient.class), mock(ClusterService.class), executor, mock(Settings.class));
}

@Test
Expand Down
Loading