Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ const std::string kVeloxSplitPreloadPerDriver = "spark.gluten.sql.columnar.backe
// Config key for the hash-probe dynamic-filter pushdown switch.
// NOTE(review): presumably a boolean toggle, judging by the ".enabled" suffix — confirm at the read site.
const std::string kHashProbeDynamicFilterPushdownEnabled =
"spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled";

// Config key read as a bool by the Substrait-to-Velox plan converter for joins marked "isBHJ=".
// When the value is true, the converter retrieves an already-built HashTableBuilder from the
// ObjectStore (looked up via the join's hash table id) instead of building a new table.
// NOTE(review): unlike the neighbouring keys, this one does not use the
// "spark.gluten.sql.columnar.backend.velox." prefix — confirm the key matches what the producer side sets.
const std::string kHashTableBuildOncePerExecutor = "spark.gluten.velox.buildHashTableOncePerExecutor.enabled";
// Fallback value passed to the config lookup for kHashTableBuildOncePerExecutor.
const bool kHashTableBuildOncePerExecutorDefault = true;

// Config key for the maximum size used by hash-probe bloom-filter pushdown.
// NOTE(review): the unit (bytes vs. entries) is not visible here — confirm at the read site.
const std::string kHashProbeBloomFilterPushdownMaxSize =
"spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize";

Expand Down
34 changes: 21 additions & 13 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,24 +399,32 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
} else if (
sJoin.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) {
bool hashTableBuildOncePerExecutorEnabled =
veloxCfg_->get<bool>(kHashTableBuildOncePerExecutor, kHashTableBuildOncePerExecutorDefault);

std::string hashTableId = sJoin.hashtableid();

std::shared_ptr<core::OpaqueHashTable> opaqueSharedHashTable = nullptr;
bool joinHasNullKeys = false;

try {
auto hashTableBuilder = ObjectStore::retrieve<gluten::HashTableBuilder>(getJoin(hashTableId));
joinHasNullKeys = hashTableBuilder->joinHasNullKeys();
auto originalShared = hashTableBuilder->hashTable();
opaqueSharedHashTable = std::shared_ptr<core::OpaqueHashTable>(
originalShared, reinterpret_cast<core::OpaqueHashTable*>(originalShared.get()));

LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId;
} catch (const std::exception& e) {
LOG(WARNING)
<< "Error retrieving HashTable from ObjectStore: " << e.what()
<< ". Falling back to building new table. To ensure correct results, please verify that spark.gluten.velox.buildHashTableOncePerExecutor.enabled is set to false.";
opaqueSharedHashTable = nullptr;
if (hashTableBuildOncePerExecutorEnabled) {
std::cout << "the hashTableBuildOncePerExecutorEnabled is set" << "\n";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need the log?

try {
auto hashTableBuilder = ObjectStore::retrieve<gluten::HashTableBuilder>(getJoin(hashTableId));
joinHasNullKeys = hashTableBuilder->joinHasNullKeys();
auto originalShared = hashTableBuilder->hashTable();
opaqueSharedHashTable = std::shared_ptr<core::OpaqueHashTable>(
originalShared, reinterpret_cast<core::OpaqueHashTable*>(originalShared.get()));

LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId;
} catch (const std::exception& e) {
throw GlutenException(
"Error retrieving HashTable from ObjectStore: " + std::string(e.what()) +
" You can set spark.gluten.velox.buildHashTableOncePerExecutor.enabled"
" to false as workaround.");
}
} else {
std::cout << "the hashTableBuildOncePerExecutorEnabled is false" << "\n";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use LOG(INFO) here instead of std::cout.

}

// Create HashJoinNode node
Expand Down
Loading