From ee5da5168f39c15def1e8d4b605010974bcc038a Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Thu, 9 Apr 2026 15:27:17 +0100 Subject: [PATCH 1/3] Throw exception if not get the pre-built hash table --- cpp/velox/config/VeloxConfig.h | 3 ++ cpp/velox/substrait/SubstraitToVeloxPlan.cc | 34 +++++++++++++-------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index cabe3252827d..f14cc37608e6 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -79,6 +79,9 @@ const std::string kVeloxSplitPreloadPerDriver = "spark.gluten.sql.columnar.backe const std::string kHashProbeDynamicFilterPushdownEnabled = "spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled"; +const std::string kHashTableBuildOncePerExecutor = "spark.gluten.velox.buildHashTableOncePerExecutor.enabled"; +const bool kHashTableBuildOncePerExecutorDefault = true; + const std::string kHashProbeBloomFilterPushdownMaxSize = "spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize"; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index e9a2417d92d6..318235dc6196 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -399,24 +399,32 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } else if ( sJoin.has_advanced_extension() && SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) { + bool hashTableBuildOncePerExecutorEnabled = + veloxCfg_->get(kHashTableBuildOncePerExecutor, kHashTableBuildOncePerExecutorDefault); + std::string hashTableId = sJoin.hashtableid(); std::shared_ptr opaqueSharedHashTable = nullptr; bool joinHasNullKeys = false; - try { - auto hashTableBuilder = ObjectStore::retrieve(getJoin(hashTableId)); - joinHasNullKeys = hashTableBuilder->joinHasNullKeys(); - auto originalShared = hashTableBuilder->hashTable(); - opaqueSharedHashTable = std::shared_ptr( - originalShared, reinterpret_cast(originalShared.get())); - - LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId; - } catch (const std::exception& e) { - LOG(WARNING) - << "Error retrieving HashTable from ObjectStore: " << e.what() - << ". Falling back to building new table. To ensure correct results, please verify that spark.gluten.velox.buildHashTableOncePerExecutor.enabled is set to false."; - opaqueSharedHashTable = nullptr; + if (hashTableBuildOncePerExecutorEnabled) { + std::cout << "the hashTableBuildOncePerExecutorEnabled is set" << "\n"; + try { + auto hashTableBuilder = ObjectStore::retrieve(getJoin(hashTableId)); + joinHasNullKeys = hashTableBuilder->joinHasNullKeys(); + auto originalShared = hashTableBuilder->hashTable(); + opaqueSharedHashTable = std::shared_ptr( + originalShared, reinterpret_cast(originalShared.get())); + + LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId; + } catch (const std::exception& e) { + throw GlutenException( + "Error retrieving HashTable from ObjectStore: " + std::string(e.what()) + + " You can set spark.gluten.velox.buildHashTableOncePerExecutor.enabled" + " to false as workaround."); + } + } else { + std::cout << "the hashTableBuildOncePerExecutorEnabled is false" << "\n"; } // Create HashJoinNode node From b7f8ecd1fa052c4d961e975448742d7d72ef6035 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Fri, 10 Apr 2026 10:18:52 +0100 Subject: [PATCH 2/3] Fix failed unit tests --- cpp/velox/config/VeloxConfig.h | 3 --- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 6 ++---- .../apache/gluten/execution/JoinExecTransformer.scala | 9 +++++++++ 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index f14cc37608e6..cabe3252827d 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -79,9 +79,6 @@ const std::string kVeloxSplitPreloadPerDriver = "spark.gluten.sql.columnar.backe const std::string kHashProbeDynamicFilterPushdownEnabled = "spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled"; -const std::string kHashTableBuildOncePerExecutor = "spark.gluten.velox.buildHashTableOncePerExecutor.enabled"; -const bool kHashTableBuildOncePerExecutorDefault = true; - const std::string kHashProbeBloomFilterPushdownMaxSize = "spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize"; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 318235dc6196..5eb7e3c14df1 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -400,7 +400,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: sJoin.has_advanced_extension() && SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) { bool hashTableBuildOncePerExecutorEnabled = - veloxCfg_->get(kHashTableBuildOncePerExecutor, kHashTableBuildOncePerExecutorDefault); + SubstraitParser::configSetInOptimization( + sJoin.advanced_extension(), "isHashTableBuildOncePerExecutor="); std::string hashTableId = sJoin.hashtableid(); @@ -408,7 +409,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: bool joinHasNullKeys = false; if (hashTableBuildOncePerExecutorEnabled) { - std::cout << "the hashTableBuildOncePerExecutorEnabled is set" << "\n"; try { auto hashTableBuilder = ObjectStore::retrieve(getJoin(hashTableId)); joinHasNullKeys = hashTableBuilder->joinHasNullKeys(); @@ -423,8 +423,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: " You can set spark.gluten.velox.buildHashTableOncePerExecutor.enabled" " to false as workaround."); } - } else { - std::cout << "the hashTableBuildOncePerExecutorEnabled is false" << "\n"; } // Create HashJoinNode node diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala index 149a0ca729eb..7fe567c93e84 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala @@ -17,6 +17,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.expression._ import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.sql.shims.SparkShimLoader @@ -291,6 +292,14 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { .append("buildHashTableId=") .append(buildHashTableId) .append("\n") + .append("isHashTableBuildOncePerExecutor=") + .append( + if ( + BackendsApiManager.getSettings.enableHashTableBuildOncePerExecutor() && + GlutenConfig.get.enableColumnarBroadcastExchange + ) 1 + else 0) + .append("\n") .append("isExistenceJoin=") .append(if (joinType.isInstanceOf[ExistenceJoin]) 1 else 0) .append("\n") From 7f4806e6c766625afa6272a1b4f6d052595ebf3c Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Fri, 10 Apr 2026 10:56:02 +0100 Subject: [PATCH 3/3] Fix compile issue --- .../gluten/execution/JoinExecTransformer.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala index 7fe567c93e84..629d7c9958c1 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala @@ -282,6 +282,12 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { // isBHJ: 0 for SHJ, 1 for BHJ // isNullAwareAntiJoin: 0 for false, 1 for true // buildHashTableId: the unique id for the hash table of build plan + val isHashTableBuildOncePerExecutor = + if ( + BackendsApiManager.getSettings.enableHashTableBuildOncePerExecutor() && + GlutenConfig.get.enableColumnarBroadcastExchange + ) { 1 } + else 0 joinParametersStr .append("isBHJ=") .append(isBHJ) @@ -293,12 +299,7 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { .append(buildHashTableId) .append("\n") .append("isHashTableBuildOncePerExecutor=") - .append( - if ( - BackendsApiManager.getSettings.enableHashTableBuildOncePerExecutor() && - GlutenConfig.get.enableColumnarBroadcastExchange - ) 1 - else 0) + .append(isHashTableBuildOncePerExecutor) .append("\n") .append("isExistenceJoin=") .append(if (joinType.isInstanceOf[ExistenceJoin]) 1 else 0)