diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index dbb81a4824c0d..d0c33a114af7c 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -77,7 +77,7 @@ indexmap = { workspace = true } itertools = { workspace = true } libc = "0.2.180" log = { workspace = true } -object_store = { workspace = true, optional = true } +object_store = { workspace = true, optional = true, features = ["tokio"] } parquet = { workspace = true, optional = true, default-features = true } recursive = { workspace = true, optional = true } sqlparser = { workspace = true, optional = true } diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index fcc2e919b6cc2..33ea7292cf26d 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -107,10 +107,10 @@ where T: AsDynArray, F: FnOnce(&[u64]) -> Result, { - // Peek at the first array to determine buffer size without fully collecting + // Keep the default RandomState path monomorphic: routing it through the + // generic custom-hasher abstraction regressed the CI aarch64 with_hashes benchmark. let mut iter = arrays.into_iter().peekable(); - // Get the required size from the first array let required_size = match iter.peek() { Some(arr) => arr.as_dyn_array().len(), None => return _internal_err!("with_hashes requires at least one array"), @@ -120,17 +120,71 @@ where let mut buffer = cell.try_borrow_mut() .map_err(|_| _internal_datafusion_err!("with_hashes cannot be called reentrantly on the same thread"))?; - // Ensure buffer has sufficient length, clearing old values buffer.clear(); buffer.resize(required_size, 0); - // Create hashes in the buffer - this consumes the iterator create_hashes(iter, random_state, &mut buffer[..required_size])?; - // Execute the callback with an immutable slice let result = callback(&buffer[..required_size])?; - // Cleanup: truncate if buffer grew too large + if buffer.capacity() > MAX_BUFFER_SIZE { + buffer.truncate(MAX_BUFFER_SIZE); + buffer.shrink_to_fit(); + } + + Ok(result) + }).map_err(|_| _internal_datafusion_err!("with_hashes cannot access thread-local storage during or after thread destruction"))? +} + +/// Creates hashes for the given arrays using a thread-local buffer and a custom +/// hash builder, then calls the provided callback with the computed hashes. +pub fn with_hashes_with_hasher( + arrays: I, + hash_builder: &S, + callback: F, +) -> Result +where + I: IntoIterator, + T: AsDynArray, + F: FnOnce(&[u64]) -> Result, + S: BuildHasher, +{ + with_hashes_with_hasher_impl(arrays, hash_builder, callback) +} + +fn with_hashes_with_hasher_impl( + arrays: I, + hash_builder: &S, + callback: F, +) -> Result +where + I: IntoIterator, + T: AsDynArray, + F: FnOnce(&[u64]) -> Result, + S: BuildHasher, +{ + let mut iter = arrays.into_iter().peekable(); + + let required_size = match iter.peek() { + Some(arr) => arr.as_dyn_array().len(), + None => return _internal_err!("with_hashes requires at least one array"), + }; + + HASH_BUFFER.try_with(|cell| { + let mut buffer = cell.try_borrow_mut() + .map_err(|_| _internal_datafusion_err!("with_hashes cannot be called reentrantly on the same thread"))?; + + buffer.clear(); + buffer.resize(required_size, 0); + + create_hashes_with_hasher_impl( + iter, + hash_builder, + &mut buffer[..required_size], + )?; + + let result = callback(&buffer[..required_size])?; + if buffer.capacity() > MAX_BUFFER_SIZE { buffer.truncate(MAX_BUFFER_SIZE); buffer.shrink_to_fit(); @@ -169,6 +223,19 @@ impl HashValue for &T { } } +#[cfg(not(feature = "force_hash_collisions"))] +// Keep custom BuildHasher leaf hashing off the default RandomState fast path. +trait BuildHasherHashValue { + fn hash_one_with_hasher(&self, state: &S) -> u64; +} + +#[cfg(not(feature = "force_hash_collisions"))] +impl BuildHasherHashValue for &T { + fn hash_one_with_hasher(&self, state: &S) -> u64 { + T::hash_one_with_hasher(self, state) + } +} + macro_rules! hash_value { ($($t:ty),+) => { $(impl HashValue for $t { @@ -178,6 +245,13 @@ macro_rules! hash_value { fn hash_write(&self, hasher: &mut impl Hasher) { Hash::hash(self, hasher) } + } + + #[cfg(not(feature = "force_hash_collisions"))] + impl BuildHasherHashValue for $t { + fn hash_one_with_hasher(&self, state: &S) -> u64 { + state.hash_one(self) + } })+ }; } @@ -193,6 +267,13 @@ macro_rules! hash_float_value { fn hash_write(&self, hasher: &mut impl Hasher) { hasher.write(&self.to_ne_bytes()) } + } + + #[cfg(not(feature = "force_hash_collisions"))] + impl BuildHasherHashValue for $t { + fn hash_one_with_hasher(&self, state: &S) -> u64 { + state.hash_one(<$i>::from_ne_bytes(self.to_ne_bytes())) + } })+ }; } @@ -210,6 +291,49 @@ fn seeded_state(seed: u64) -> foldhash::fast::SeedableRandomState { ) } +#[cfg(not(feature = "force_hash_collisions"))] +// Only recursive child hashing is shared; leaf hot loops stay split so the +// default RandomState path keeps its concrete codegen. +trait ChildHashing { + fn create_hashes(&self, arrays: I, hashes_buffer: &mut [u64]) -> Result<()> + where + I: IntoIterator, + T: AsDynArray; +} + +#[cfg(not(feature = "force_hash_collisions"))] +struct RandomStateChildHashing<'a> { + random_state: &'a RandomState, +} + +#[cfg(not(feature = "force_hash_collisions"))] +impl ChildHashing for RandomStateChildHashing<'_> { + fn create_hashes(&self, arrays: I, hashes_buffer: &mut [u64]) -> Result<()> + where + I: IntoIterator, + T: AsDynArray, + { + create_hashes(arrays, self.random_state, hashes_buffer).map(|_| ()) + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +struct BuildHasherChildHashing<'a, S> { + hash_builder: &'a S, +} + +#[cfg(not(feature = "force_hash_collisions"))] +impl ChildHashing for BuildHasherChildHashing<'_, S> { + fn create_hashes(&self, arrays: I, hashes_buffer: &mut [u64]) -> Result<()> + where + I: IntoIterator, + T: AsDynArray, + { + create_hashes_with_hasher_impl(arrays, self.hash_builder, hashes_buffer) + .map(|_| ()) + } +} + /// Builds hash values of PrimitiveArray and writes them into `hashes_buffer` /// If `rehash==true` this folds the existing hash into the hasher state /// and hashes only the new value (avoiding a separate combine step). @@ -428,6 +552,231 @@ fn hash_generic_byte_view_array( } } +#[cfg(not(feature = "force_hash_collisions"))] +// The custom BuildHasher path intentionally mirrors the leaf helpers so the +// default path does not route its tight loops through generic abstractions. +fn hash_null_with_hasher( + hash_builder: &S, + hashes_buffer: &mut [u64], + mul_col: bool, +) { + if mul_col { + hashes_buffer.iter_mut().for_each(|hash| { + *hash = combine_hashes(hash_builder.hash_one(1), *hash); + }) + } else { + hashes_buffer.iter_mut().for_each(|hash| { + *hash = hash_builder.hash_one(1); + }) + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_array_primitive_with_hasher( + array: &PrimitiveArray, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) where + T: ArrowPrimitiveType, + S: BuildHasher, +{ + assert_eq!( + hashes_buffer.len(), + array.len(), + "hashes_buffer and array should be of equal length" + ); + + if array.null_count() == 0 { + if rehash { + for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { + *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); + } + } else { + for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { + *hash = value.hash_one_with_hasher(hash_builder); + } + } + } else if rehash { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = combine_hashes( + value.hash_one_with_hasher(hash_builder), + hashes_buffer[i], + ); + } + } else { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = value.hash_one_with_hasher(hash_builder); + } + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_array_with_hasher( + array: &T, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) where + T: ArrayAccessor, + T::Item: BuildHasherHashValue, + S: BuildHasher, +{ + assert_eq!( + hashes_buffer.len(), + array.len(), + "hashes_buffer and array should be of equal length" + ); + + if array.null_count() == 0 { + if rehash { + for (i, hash) in hashes_buffer.iter_mut().enumerate() { + let value = unsafe { array.value_unchecked(i) }; + *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); + } + } else { + for (i, hash) in hashes_buffer.iter_mut().enumerate() { + let value = unsafe { array.value_unchecked(i) }; + *hash = value.hash_one_with_hasher(hash_builder); + } + } + } else if rehash { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = combine_hashes( + value.hash_one_with_hasher(hash_builder), + hashes_buffer[i], + ); + } + } else { + for i in array.nulls().unwrap().valid_indices() { + let value = unsafe { array.value_unchecked(i) }; + hashes_buffer[i] = value.hash_one_with_hasher(hash_builder); + } + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +#[inline(never)] +fn hash_string_view_array_inner_with_hasher< + T: ByteViewType, + S: BuildHasher, + const HAS_NULLS: bool, + const HAS_BUFFERS: bool, + const REHASH: bool, +>( + array: &GenericByteViewArray, + hash_builder: &S, + hashes_buffer: &mut [u64], +) { + assert_eq!( + hashes_buffer.len(), + array.len(), + "hashes_buffer and array should be of equal length" + ); + + let buffers = array.data_buffers(); + let view_bytes = |view_len: u32, view: u128| { + let view = ByteView::from(view); + let offset = view.offset as usize; + unsafe { + let data = buffers.get_unchecked(view.buffer_index as usize); + data.get_unchecked(offset..offset + view_len as usize) + } + }; + + let hashes_and_views = hashes_buffer.iter_mut().zip(array.views().iter()); + for (i, (hash, &v)) in hashes_and_views.enumerate() { + if HAS_NULLS && array.is_null(i) { + continue; + } + let view_len = v as u32; + if !HAS_BUFFERS || view_len <= 12 { + if REHASH { + *hash = combine_hashes(v.hash_one_with_hasher(hash_builder), *hash); + } else { + *hash = v.hash_one_with_hasher(hash_builder); + } + continue; + } + let value = view_bytes(view_len, v); + if REHASH { + *hash = combine_hashes(value.hash_one_with_hasher(hash_builder), *hash); + } else { + *hash = value.hash_one_with_hasher(hash_builder); + } + } +} + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_generic_byte_view_array_with_hasher( + array: &GenericByteViewArray, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) { + match ( + array.null_count() != 0, + !array.data_buffers().is_empty(), + rehash, + ) { + (false, false, false) => { + for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { + *hash = view.hash_one_with_hasher(hash_builder); + } + } + (false, false, true) => { + for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { + *hash = combine_hashes(view.hash_one_with_hasher(hash_builder), *hash); + } + } + (false, true, false) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (false, true, true) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, false, false) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, false, true) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, true, false) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + (true, true, true) => { + hash_string_view_array_inner_with_hasher::( + array, + hash_builder, + hashes_buffer, + ) + } + } +} + /// Hash dictionary array with compile-time specialization for null handling. /// /// Uses const generics to eliminate runtim branching in the hot loop: @@ -438,12 +787,13 @@ fn hash_generic_byte_view_array( #[inline(never)] fn hash_dictionary_inner< K: ArrowDictionaryKeyType, + C: ChildHashing + ?Sized, const HAS_NULL_KEYS: bool, const HAS_NULL_VALUES: bool, const MULTI_COL: bool, >( array: &DictionaryArray, - random_state: &RandomState, + child_hashing: &C, hashes_buffer: &mut [u64], ) -> Result<()> { // Hash each dictionary value once, and then use that computed @@ -451,7 +801,7 @@ fn hash_dictionary_inner< // redundant hashing for large dictionary elements (e.g. strings) let dict_values = array.values(); let mut dict_hashes = vec![0; dict_values.len()]; - create_hashes([dict_values], random_state, &mut dict_hashes)?; + child_hashing.create_hashes([dict_values], &mut dict_hashes)?; if HAS_NULL_KEYS { for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) { @@ -485,7 +835,7 @@ fn hash_dictionary_inner< #[cfg(not(feature = "force_hash_collisions"))] fn hash_dictionary( array: &DictionaryArray, - random_state: &RandomState, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], multi_col: bool, ) -> Result<()> { @@ -495,44 +845,44 @@ fn hash_dictionary( // Dispatcher based on null presence and multi-column mode // Should reduce branching within hot loops match (has_null_keys, has_null_values, multi_col) { - (false, false, false) => hash_dictionary_inner::( + (false, false, false) => hash_dictionary_inner::( array, - random_state, + child_hashing, hashes_buffer, ), - (false, false, true) => hash_dictionary_inner::( + (false, false, true) => hash_dictionary_inner::( array, - random_state, + child_hashing, hashes_buffer, ), - (false, true, false) => hash_dictionary_inner::( + (false, true, false) => hash_dictionary_inner::( array, - random_state, + child_hashing, hashes_buffer, ), - (false, true, true) => hash_dictionary_inner::( + (false, true, true) => hash_dictionary_inner::( array, - random_state, + child_hashing, hashes_buffer, ), - (true, false, false) => hash_dictionary_inner::( + (true, false, false) => hash_dictionary_inner::( array, - random_state, + child_hashing, hashes_buffer, ), - (true, false, true) => hash_dictionary_inner::( + (true, false, true) => hash_dictionary_inner::( array, - random_state, + child_hashing, hashes_buffer, ), - (true, true, false) => hash_dictionary_inner::( + (true, true, false) => hash_dictionary_inner::( array, - random_state, + child_hashing, hashes_buffer, ), - (true, true, true) => hash_dictionary_inner::( + (true, true, true) => hash_dictionary_inner::( array, - random_state, + child_hashing, hashes_buffer, ), } @@ -541,7 +891,7 @@ fn hash_dictionary( #[cfg(not(feature = "force_hash_collisions"))] fn hash_struct_array( array: &StructArray, - random_state: &RandomState, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { let nulls = array.nulls(); @@ -549,7 +899,7 @@ fn hash_struct_array( // Create hashes for each row that combines the hashes over all the column at that row. let mut values_hashes = vec![0u64; row_len]; - create_hashes(array.columns(), random_state, &mut values_hashes)?; + child_hashing.create_hashes(array.columns(), &mut values_hashes)?; // Separate paths to avoid allocating Vec when there are no nulls if let Some(nulls) = nulls { @@ -571,7 +921,7 @@ fn hash_struct_array( #[cfg(not(feature = "force_hash_collisions"))] fn hash_map_array( array: &MapArray, - random_state: &RandomState, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { let nulls = array.nulls(); @@ -590,7 +940,7 @@ fn hash_map_array( .iter() .map(|col| col.slice(first_offset, entries_len)) .collect(); - create_hashes(&sliced_columns, random_state, &mut values_hashes)?; + child_hashing.create_hashes(&sliced_columns, &mut values_hashes)?; // Combine the hashes for entries on each row with each other and previous hash for that row // Adjust indices by first_offset since values_hashes is sliced starting from first_offset @@ -622,7 +972,7 @@ fn hash_map_array( #[cfg(not(feature = "force_hash_collisions"))] fn hash_list_array( array: &GenericListArray, - random_state: &RandomState, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> where @@ -633,11 +983,10 @@ where let last_offset = array.value_offsets().last().cloned().unwrap_or_default(); let value_bytes_len = (last_offset - first_offset).as_usize(); let mut values_hashes = vec![0u64; value_bytes_len]; - create_hashes( + child_hashing.create_hashes( [array .values() .slice(first_offset.as_usize(), value_bytes_len)], - random_state, &mut values_hashes, )?; @@ -673,7 +1022,7 @@ where #[cfg(not(feature = "force_hash_collisions"))] fn hash_list_view_array( array: &GenericListViewArray, - random_state: &RandomState, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> where @@ -684,7 +1033,7 @@ where let sizes = array.value_sizes(); let nulls = array.nulls(); let mut values_hashes = vec![0u64; values.len()]; - create_hashes([values], random_state, &mut values_hashes)?; + child_hashing.create_hashes([values], &mut values_hashes)?; if let Some(nulls) = nulls { for (i, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() { if nulls.is_valid(i) { @@ -712,7 +1061,7 @@ where #[cfg(not(feature = "force_hash_collisions"))] fn hash_union_array( array: &UnionArray, - random_state: &RandomState, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { let DataType::Union(union_fields, _mode) = array.data_type() else { @@ -722,12 +1071,12 @@ fn hash_union_array( if array.is_dense() { // Dense union: children only contain values of their type, so they're already compact. // Use the default hashing approach which is efficient for dense unions. - hash_union_array_default(array, union_fields, random_state, hashes_buffer) + hash_union_array_default(array, union_fields, child_hashing, hashes_buffer) } else { // Sparse union: each child has the same length as the union array. // Optimization: only hash the elements that are actually referenced by type_ids, // instead of hashing all K*N elements (where K = num types, N = array length). - hash_sparse_union_array(array, union_fields, random_state, hashes_buffer) + hash_sparse_union_array(array, union_fields, child_hashing, hashes_buffer) } } @@ -744,7 +1093,7 @@ fn hash_union_array( fn hash_union_array_default( array: &UnionArray, union_fields: &UnionFields, - random_state: &RandomState, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { let mut child_hashes: HashMap> = @@ -754,7 +1103,7 @@ fn hash_union_array_default( for (type_id, _field) in union_fields.iter() { let child = array.child(type_id); let mut child_hash_buffer = vec![0; child.len()]; - create_hashes([child], random_state, &mut child_hash_buffer)?; + child_hashing.create_hashes([child], &mut child_hash_buffer)?; child_hashes.insert(type_id, child_hash_buffer); } @@ -785,7 +1134,7 @@ fn hash_union_array_default( fn hash_sparse_union_array( array: &UnionArray, union_fields: &UnionFields, - random_state: &RandomState, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { use std::collections::HashMap; @@ -796,7 +1145,7 @@ fn hash_sparse_union_array( return hash_union_array_default( array, union_fields, - random_state, + child_hashing, hashes_buffer, ); } @@ -824,7 +1173,7 @@ fn hash_sparse_union_array( // Hash the filtered array let mut filtered_hashes = vec![0u64; filtered.len()]; - create_hashes([&filtered], random_state, &mut filtered_hashes)?; + child_hashing.create_hashes([&filtered], &mut filtered_hashes)?; // Scatter hashes back to correct positions for (hash, &idx) in filtered_hashes.iter().zip(indices.iter()) { @@ -840,14 +1189,14 @@ fn hash_sparse_union_array( #[cfg(not(feature = "force_hash_collisions"))] fn hash_fixed_list_array( array: &FixedSizeListArray, - random_state: &RandomState, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], ) -> Result<()> { let values = array.values(); let value_length = array.value_length() as usize; let nulls = array.nulls(); let mut values_hashes = vec![0u64; values.len()]; - create_hashes([values], random_state, &mut values_hashes)?; + child_hashing.create_hashes([values], &mut values_hashes)?; if let Some(nulls) = nulls { for i in 0..array.len() { if nulls.is_valid(i) { @@ -875,11 +1224,12 @@ fn hash_fixed_list_array( #[cfg(not(feature = "force_hash_collisions"))] fn hash_run_array_inner< R: RunEndIndexType, + C: ChildHashing + ?Sized, const HAS_NULL_VALUES: bool, const REHASH: bool, >( array: &RunArray, - random_state: &RandomState, + child_hashing: &C, hashes_buffer: &mut [u64], ) -> Result<()> { // We find the relevant runs that cover potentially sliced arrays, so we can only hash those @@ -906,11 +1256,8 @@ fn hash_run_array_inner< end_physical_index - start_physical_index, ); let mut values_hashes = vec![0u64; sliced_values.len()]; - create_hashes( - std::slice::from_ref(&sliced_values), - random_state, - &mut values_hashes, - )?; + child_hashing + .create_hashes(std::slice::from_ref(&sliced_values), &mut values_hashes)?; let mut start_in_slice = 0; for (adjusted_physical_index, &absolute_run_end) in run_ends_values @@ -946,24 +1293,26 @@ fn hash_run_array_inner< #[cfg(not(feature = "force_hash_collisions"))] fn hash_run_array( array: &RunArray, - random_state: &RandomState, + child_hashing: &impl ChildHashing, hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { let has_null_values = array.values().null_count() != 0; match (has_null_values, rehash) { - (false, false) => { - hash_run_array_inner::(array, random_state, hashes_buffer) - } + (false, false) => hash_run_array_inner::( + array, + child_hashing, + hashes_buffer, + ), (false, true) => { - hash_run_array_inner::(array, random_state, hashes_buffer) + hash_run_array_inner::(array, child_hashing, hashes_buffer) } (true, false) => { - hash_run_array_inner::(array, random_state, hashes_buffer) + hash_run_array_inner::(array, child_hashing, hashes_buffer) } (true, true) => { - hash_run_array_inner::(array, random_state, hashes_buffer) + hash_run_array_inner::(array, child_hashing, hashes_buffer) } } } @@ -977,6 +1326,10 @@ fn hash_single_array( hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { + // Nested types recurse through ChildHashing; primitive/string/binary cases + // stay on the dedicated RandomState fast path above. + let child_hashing = RandomStateChildHashing { random_state }; + downcast_primitive_array! { array => hash_array_primitive(array, random_state, hashes_buffer, rehash), DataType::Null => hash_null(random_state, hashes_buffer, rehash), @@ -992,43 +1345,43 @@ fn hash_single_array( hash_array(&array, random_state, hashes_buffer, rehash) } DataType::Dictionary(_, _) => downcast_dictionary_array! { - array => hash_dictionary(array, random_state, hashes_buffer, rehash)?, + array => hash_dictionary(array, &child_hashing, hashes_buffer, rehash)?, _ => unreachable!() } DataType::Struct(_) => { let array = as_struct_array(array)?; - hash_struct_array(array, random_state, hashes_buffer)?; + hash_struct_array(array, &child_hashing, hashes_buffer)?; } DataType::List(_) => { let array = as_list_array(array)?; - hash_list_array(array, random_state, hashes_buffer)?; + hash_list_array(array, &child_hashing, hashes_buffer)?; } DataType::LargeList(_) => { let array = as_large_list_array(array)?; - hash_list_array(array, random_state, hashes_buffer)?; + hash_list_array(array, &child_hashing, hashes_buffer)?; } DataType::ListView(_) => { let array = as_list_view_array(array)?; - hash_list_view_array(array, random_state, hashes_buffer)?; + hash_list_view_array(array, &child_hashing, hashes_buffer)?; } DataType::LargeListView(_) => { let array = as_large_list_view_array(array)?; - hash_list_view_array(array, random_state, hashes_buffer)?; + hash_list_view_array(array, &child_hashing, hashes_buffer)?; } DataType::Map(_, _) => { let array = as_map_array(array)?; - hash_map_array(array, random_state, hashes_buffer)?; + hash_map_array(array, &child_hashing, hashes_buffer)?; } DataType::FixedSizeList(_,_) => { let array = as_fixed_size_list_array(array)?; - hash_fixed_list_array(array, random_state, hashes_buffer)?; + hash_fixed_list_array(array, &child_hashing, hashes_buffer)?; } DataType::Union(_, _) => { let array = as_union_array(array)?; - hash_union_array(array, random_state, hashes_buffer)?; + hash_union_array(array, &child_hashing, hashes_buffer)?; } DataType::RunEndEncoded(_, _) => downcast_run_array! { - array => hash_run_array(array, random_state, hashes_buffer, rehash)?, + array => hash_run_array(array, &child_hashing, hashes_buffer, rehash)?, _ => unreachable!() } _ => { @@ -1042,6 +1395,81 @@ fn hash_single_array( Ok(()) } +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_single_array_with_hasher( + array: &dyn Array, + hash_builder: &S, + hashes_buffer: &mut [u64], + rehash: bool, +) -> Result<()> { + // The custom-hasher path reuses the shared structural combiners but keeps + // its own leaf dispatch and rehash semantics. + let child_hashing = BuildHasherChildHashing { hash_builder }; + + downcast_primitive_array! { + array => hash_array_primitive_with_hasher(array, hash_builder, hashes_buffer, rehash), + DataType::Null => hash_null_with_hasher(hash_builder, hashes_buffer, rehash), + DataType::Boolean => hash_array_with_hasher(&as_boolean_array(array)?, hash_builder, hashes_buffer, rehash), + DataType::Utf8 => hash_array_with_hasher(&as_string_array(array)?, hash_builder, hashes_buffer, rehash), + DataType::Utf8View => hash_generic_byte_view_array_with_hasher(as_string_view_array(array)?, hash_builder, hashes_buffer, rehash), + DataType::LargeUtf8 => hash_array_with_hasher(&as_largestring_array(array), hash_builder, hashes_buffer, rehash), + DataType::Binary => hash_array_with_hasher(&as_generic_binary_array::(array)?, hash_builder, hashes_buffer, rehash), + DataType::BinaryView => hash_generic_byte_view_array_with_hasher(as_binary_view_array(array)?, hash_builder, hashes_buffer, rehash), + DataType::LargeBinary => hash_array_with_hasher(&as_generic_binary_array::(array)?, hash_builder, hashes_buffer, rehash), + DataType::FixedSizeBinary(_) => { + let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap(); + hash_array_with_hasher(&array, hash_builder, hashes_buffer, rehash) + } + DataType::Dictionary(_, _) => downcast_dictionary_array! { + array => hash_dictionary(array, &child_hashing, hashes_buffer, rehash)?, + _ => unreachable!() + } + DataType::Struct(_) => { + let array = as_struct_array(array)?; + hash_struct_array(array, &child_hashing, hashes_buffer)?; + } + DataType::List(_) => { + let array = as_list_array(array)?; + hash_list_array(array, &child_hashing, hashes_buffer)?; + } + DataType::LargeList(_) => { + let array = as_large_list_array(array)?; + hash_list_array(array, &child_hashing, hashes_buffer)?; + } + DataType::ListView(_) => { + let array = as_list_view_array(array)?; + hash_list_view_array(array, &child_hashing, hashes_buffer)?; + } + DataType::LargeListView(_) => { + let array = as_large_list_view_array(array)?; + hash_list_view_array(array, &child_hashing, hashes_buffer)?; + } + DataType::Map(_, _) => { + let array = as_map_array(array)?; + hash_map_array(array, &child_hashing, hashes_buffer)?; + } + DataType::FixedSizeList(_,_) => { + let array = as_fixed_size_list_array(array)?; + hash_fixed_list_array(array, &child_hashing, hashes_buffer)?; + } + DataType::Union(_, _) => { + let array = as_union_array(array)?; + hash_union_array(array, &child_hashing, hashes_buffer)?; + } + DataType::RunEndEncoded(_, _) => downcast_run_array! { + array => hash_run_array(array, &child_hashing, hashes_buffer, rehash)?, + _ => unreachable!() + } + _ => { + return _internal_err!( + "Unsupported data type in hasher: {}", + array.data_type() + ); + } + } + Ok(()) +} + /// Test version of `hash_single_array` that forces all hashes to collide to zero. #[cfg(feature = "force_hash_collisions")] fn hash_single_array( @@ -1056,6 +1484,19 @@ fn hash_single_array( Ok(()) } +#[cfg(feature = "force_hash_collisions")] +fn hash_single_array_with_hasher( + _array: &dyn Array, + _hash_builder: &S, + hashes_buffer: &mut [u64], + _rehash: bool, +) -> Result<()> { + for hash in hashes_buffer.iter_mut() { + *hash = 0 + } + Ok(()) +} + /// Something that can be returned as a `&dyn Array`. /// /// We want `create_hashes` to accept either `&dyn Array` or `ArrayRef`, @@ -1093,6 +1534,28 @@ impl AsDynArray for &ArrayRef { } } +fn create_hashes_with_hasher_impl<'a, I, T, S>( + arrays: I, + hash_builder: &S, + hashes_buffer: &'a mut [u64], +) -> Result<&'a mut [u64]> +where + I: IntoIterator, + T: AsDynArray, + S: BuildHasher, +{ + for (i, array) in arrays.into_iter().enumerate() { + let rehash = i >= 1; + hash_single_array_with_hasher( + array.as_dyn_array(), + hash_builder, + hashes_buffer, + rehash, + )?; + } + Ok(hashes_buffer) +} + /// Creates hash values for every row, based on the values in the columns. /// /// The number of rows to hash is determined by `hashes_buffer.len()`. @@ -1106,16 +1569,31 @@ where I: IntoIterator, T: AsDynArray, { + // Keep the default RandomState path concrete for the same benchmark reason as with_hashes. for (i, array) in arrays.into_iter().enumerate() { - // combine hashes with `combine_hashes` for all columns besides the first let rehash = i >= 1; hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?; } Ok(hashes_buffer) } +/// Creates hash values for every row using a caller-provided hash builder. +pub fn create_hashes_with_hasher<'a, I, T, S>( + arrays: I, + hash_builder: &S, + hashes_buffer: &'a mut [u64], +) -> Result<&'a mut [u64]> +where + I: IntoIterator, + T: AsDynArray, + S: BuildHasher, +{ + create_hashes_with_hasher_impl(arrays, hash_builder, hashes_buffer) +} + #[cfg(test)] mod tests { + use std::hash::{BuildHasherDefault, Hasher}; use std::sync::Arc; use arrow::array::*; @@ -1124,6 +1602,21 @@ mod tests { use super::*; + #[derive(Default)] + struct TestHasher(u64); + + impl Hasher for TestHasher { + fn finish(&self) -> u64 { + self.0 + } + + fn write(&mut self, bytes: &[u8]) { + for byte in bytes { + self.0 = self.0.wrapping_mul(37).wrapping_add(u64::from(*byte)); + } + } + } + #[test] fn create_hashes_for_decimal_array() -> Result<()> { let array = vec![1, 2, 3, 4] @@ -1890,6 +2383,90 @@ mod tests { ); } + #[test] + #[cfg(not(feature = "force_hash_collisions"))] + fn test_create_hashes_large_utf8_multi_column_rehash_matches_combine() { + let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4])); + let large_utf8: ArrayRef = Arc::new(LargeStringArray::from(vec![ + Some("repeat"), + None, + Some("repeat"), + Some("different"), + ])); + let random_state = RandomState::with_seed(0); + + let mut first_col_hashes = vec![0; int_array.len()]; + create_hashes([&int_array], &random_state, &mut first_col_hashes).unwrap(); + + let mut combined_hashes = vec![0; int_array.len()]; + create_hashes( + [&int_array, &large_utf8], + &random_state, + &mut combined_hashes, + ) + .unwrap(); + + let large_utf8 = large_utf8 + .as_any() + .downcast_ref::() + .unwrap(); + let expected = (0..large_utf8.len()) + .map(|i| { + if large_utf8.is_valid(i) { + combine_hashes( + large_utf8.value(i).hash_one(&random_state), + first_col_hashes[i], + ) + } else { + first_col_hashes[i] + } + }) + .collect::>(); + + assert_eq!(combined_hashes, expected); + } + + #[test] + #[cfg(not(feature = "force_hash_collisions"))] + fn test_create_hashes_with_custom_hasher() { + let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 1, 4])); + let hash_builder = BuildHasherDefault::::default(); + + let mut custom_hashes = vec![0; array.len()]; + create_hashes_with_hasher([&array], &hash_builder, &mut custom_hashes).unwrap(); + + let random_state = RandomState::with_seed(0); + let mut default_hashes = vec![0; array.len()]; + create_hashes([&array], &random_state, &mut default_hashes).unwrap(); + + assert_eq!(custom_hashes[0], custom_hashes[2]); + assert_ne!(custom_hashes[0], custom_hashes[1]); + assert_ne!(custom_hashes, default_hashes); + } + + #[test] + fn test_with_hashes_with_custom_hasher() { + let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + let str_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"])); + let hash_builder = BuildHasherDefault::::default(); + + let mut expected_hashes = vec![0; int_array.len()]; + create_hashes_with_hasher( + [&int_array, &str_array], + &hash_builder, + &mut expected_hashes, + ) + .unwrap(); + + let actual_hashes = + with_hashes_with_hasher([&int_array, &str_array], &hash_builder, |hashes| { + Ok(hashes.to_vec()) + }) + .unwrap(); + + assert_eq!(actual_hashes, expected_hashes); + } + #[test] #[cfg(not(feature = "force_hash_collisions"))] fn create_hashes_for_sparse_union_arrays() {