from_protobuf kernel (part0): framework, API, and schema validation #4373
thirtiseven merged 30 commits into NVIDIA:main
Conversation
Greptile Summary
This PR establishes the full framework for a GPU-accelerated protobuf decoder: public C++ API, JNI bridge, schema validation, and a stub decode path.
Confidence Score: 5/5
Safe to merge; all prior P0/P1 issues are resolved, and the one remaining finding is a minor P2 resource-safety note in the JNI helpers. The major structural bugs from earlier review rounds (wrong column type for zero-row repeated scalars, zero-child STRUCT assembly, missing repeated+required and enum sort checks) are all fixed and tested. The only new finding is a low-probability resource leak in jni_byte_array_to_vector / jni_int_array_to_vector if make_host_vector throws under OOM, a P2 hardening item that does not block merge.
Important Files Changed
src/main/cpp/src/ProtobufJni.cpp: minor RAII hardening for JNI array element release.
Sequence Diagram
sequenceDiagram
participant Java as Protobuf.java
participant JNI as ProtobufJni.cpp
participant Validate as validate_decode_context()
participant Stub as decode_protobuf_to_struct()
participant Builders as protobuf_builders.cu
Java->>JNI: decodeToStruct(binaryInputView, 15 arrays...)
JNI->>JNI: null-check all inputs
JNI->>JNI: validate array lengths match num_fields
JNI->>JNI: build nested_field_descriptor[] + context
JNI->>Stub: decode_protobuf_to_struct(input, context, stream, mr)
Stub->>Validate: validate_decode_context(context)
Validate-->>Stub: OK / throws invalid_argument
alt num_fields == 0
Stub->>Stub: return empty STRUCT<> with input null mask
else num_rows == 0
Stub->>Builders: make_empty_struct/list/column_safe per top-level field
Builders-->>Stub: typed empty columns
Stub->>Stub: return STRUCT<...> with 0 rows
else normal path (stub)
Stub->>Builders: make_null_column_with_schema per top-level field
Builders->>Builders: recurse into STRUCT children / wrap repeated in LIST
Builders-->>Stub: all-null columns with correct type hierarchy
Stub->>Stub: propagate input null mask → return STRUCT<...>
end
Stub-->>JNI: unique_ptr<cudf::column>
JNI-->>Java: jlong handle → ColumnVector
Reviews (22): Last reviewed commit: "Apply suggestions from code review"
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
ttnghia left a comment
Can you put the new files (except the *Jni.cpp file) into a separate folder, please, such as src/protobuf/*? This organizes code by module and gives a better structural view of the project. We should do the same to reorganize other modules soon.
src/main/cpp/src/protobuf.hpp
Outdated
namespace spark_rapids_jni {

// Encoding constants
enum class proto_encoding : int {
For better encapsulation, please wrap all the protobuf code in a sub-namespace, namespace protobuf. That way we can more easily identify which module code belongs to, for example protobuf::proto_encoding. It will make the code cleaner and less confusing.
src/main/cpp/src/protobuf.hpp
Outdated
std::unique_ptr<cudf::column> decode_protobuf_to_struct(cudf::column_view const& binary_input,
                                                        ProtobufDecodeContext const& context,
                                                        rmm::cuda_stream_view stream);
Missing a memory resource parameter.
src/main/cpp/src/protobuf.hpp
Outdated
}
}

inline void validate_decode_context(ProtobufDecodeContext const& context)
Do not define inline functions in a public header. Put them in a source file and leave only the declaration here.
Moved to protobuf.cu
src/main/cpp/src/protobuf.hpp
Outdated
throw std::invalid_argument(std::string("protobuf decode context: ") + name +
                            " size mismatch with schema (" + std::to_string(actual) + " vs " +
                            std::to_string(num_fields) + ")");
Use CUDF_EXPECTS(..., std::invalid_argument) for error checks.
struct ProtobufFieldMetaView {
  nested_field_descriptor const& schema;
  cudf::data_type const& output_type;
Dangling reference: this binds to a temporary at protobuf.cu:225.
- cudf::data_type const& output_type;
+ cudf::data_type const output_type;
 * given an array of schema indices and the schema itself.
 * Returns an empty vector if the max field number exceeds the threshold.
 */
inline std::vector<int> build_index_lookup_table(nested_field_descriptor const* schema,
build_index_lookup_table and build_field_lookup_table have identical algorithm bodies, differing only in the field-number accessor. Consider extracting their content into a shared function.
Nice catch, done.
constexpr int ERR_REPEATED_COUNT_MISMATCH = 12;

// Maximum supported nesting depth for recursive struct decoding.
constexpr int MAX_NESTED_STRUCT_DECODE_DEPTH = 10;
This seems to be a duplicate of MAX_NESTING_DEPTH = 10 in protobuf.hpp, right?
Oh, nice catch!
src/main/cpp/src/ProtobufJni.cpp
Outdated
// Convert default string values
std::vector<std::vector<uint8_t>> default_string_values;
default_string_values.reserve(num_fields);
for (int i = 0; i < num_fields; ++i) {
  jbyteArray byte_arr = static_cast<jbyteArray>(env->GetObjectArrayElement(default_strings, i));
  if (env->ExceptionCheck()) { return 0; }
  if (byte_arr == nullptr) {
    default_string_values.emplace_back();
  } else {
    jsize len    = env->GetArrayLength(byte_arr);
    jbyte* bytes = env->GetByteArrayElements(byte_arr, nullptr);
    if (bytes == nullptr) {
      env->DeleteLocalRef(byte_arr);
      return 0;
    }
    default_string_values.emplace_back(reinterpret_cast<uint8_t*>(bytes),
                                       reinterpret_cast<uint8_t*>(bytes) + len);
    env->ReleaseByteArrayElements(byte_arr, bytes, JNI_ABORT);
    env->DeleteLocalRef(byte_arr);
  }
}

// Convert enum valid values
std::vector<std::vector<int32_t>> enum_values;
enum_values.reserve(num_fields);
for (int i = 0; i < num_fields; ++i) {
  jintArray int_arr = static_cast<jintArray>(env->GetObjectArrayElement(enum_valid_values, i));
  if (env->ExceptionCheck()) { return 0; }
  if (int_arr == nullptr) {
    enum_values.emplace_back();
  } else {
    jsize len  = env->GetArrayLength(int_arr);
    jint* ints = env->GetIntArrayElements(int_arr, nullptr);
    if (ints == nullptr) {
      env->DeleteLocalRef(int_arr);
      return 0;
    }
    enum_values.emplace_back(ints, ints + len);
    env->ReleaseIntArrayElements(int_arr, ints, JNI_ABORT);
    env->DeleteLocalRef(int_arr);
  }
}
Nit: this array-to-vector conversion pattern is repeated almost exactly for default_string_values and enum_values. Can we extract a common function for both?
#include <thrust/fill.h>
#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/remove.h>
#include <thrust/scan.h>
#include <thrust/sort.h>
#include <thrust/transform.h>
#include <thrust/unique.h>
This still seems to be a CUDA header with a lot of device code and kernels, so I don't see how it differs from protobuf_device_helpers.cuh.
Good point. I moved the device code and kernels to protobuf_kernels.
CUDF_CUDA_TRY(cudaMemcpyAsync(
  d_default.data(), default_bytes.data(), def_len, cudaMemcpyHostToDevice, stream.value()));
Using memcpy from pageable memory is known to have limitations. Can we pass default_bytes to this function as a pinned memory vector instead?
It turned out to be quite a large refactor, but I think it's done.
auto const blocks = static_cast<int>((num_items + THREADS_PER_BLOCK - 1u) / THREADS_PER_BLOCK);
rmm::device_uvector<int32_t> d_valid_enums(valid_enums.size(), stream, mr);
CUDF_CUDA_TRY(cudaMemcpyAsync(d_valid_enums.data(),
Copy from pageable memory. Can we pass valid_enums as a pinned memory vector instead?
Yes, I did it along with the default_bytes change.
namespace spark_rapids_jni::protobuf {

using namespace detail;
Changed to the namespace detail { way.
CUDF_EXPECTS(context.default_ints.size() == num_fields,
             "protobuf decode context: default_ints size mismatch with schema (" +
               std::to_string(context.default_ints.size()) + " vs " +
               std::to_string(num_fields) + ")",
             std::invalid_argument);
CUDF_EXPECTS(context.default_floats.size() == num_fields,
             "protobuf decode context: default_floats size mismatch with schema (" +
               std::to_string(context.default_floats.size()) + " vs " +
               std::to_string(num_fields) + ")",
             std::invalid_argument);
CUDF_EXPECTS(context.default_bools.size() == num_fields,
             "protobuf decode context: default_bools size mismatch with schema (" +
               std::to_string(context.default_bools.size()) + " vs " +
               std::to_string(num_fields) + ")",
             std::invalid_argument);
CUDF_EXPECTS(context.default_strings.size() == num_fields,
             "protobuf decode context: default_strings size mismatch with schema (" +
               std::to_string(context.default_strings.size()) + " vs " +
               std::to_string(num_fields) + ")",
             std::invalid_argument);
CUDF_EXPECTS(context.enum_valid_values.size() == num_fields,
             "protobuf decode context: enum_valid_values size mismatch with schema (" +
               std::to_string(context.enum_valid_values.size()) + " vs " +
               std::to_string(num_fields) + ")",
             std::invalid_argument);
CUDF_EXPECTS(context.enum_names.size() == num_fields,
             "protobuf decode context: enum_names size mismatch with schema (" +
               std::to_string(context.enum_names.size()) + " vs " +
               std::to_string(num_fields) + ")",
             std::invalid_argument);
These calls are expensive, as the strings are concatenated to form the error messages unconditionally, on every call. The better approach, which only constructs the error message on failure, is:
if (!cond) {
  CUDF_FAIL(<err_msg>);
}
             ")",
             std::invalid_argument);

std::set<std::pair<int, int>> seen_field_numbers;
Using an unordered hash table is better, since each insertion into a sorted table is more expensive.
- std::set<std::pair<int, int>> seen_field_numbers;
+ std::unordered_set<std::pair<int, int>> seen_field_numbers;  // plus #include <unordered_set>
Done. Now using std::unordered_set<uint64_t> seen_field_numbers, because std::pair does not have a default hash function.
I used to think std::set was basically faster than std::unordered_set for small data sizes, because of the big constant in the hash table's O(1) and possible collisions. It turns out I was wrong. Thanks!
if (num_rows == 0 || field_indices.empty()) { return; }

bool has_required = false;
auto h_is_required = cudf::detail::make_host_vector<uint8_t>(field_indices.size(), stream);
We need make_pinned_vector specifically; do not use make_host_vector. make_host_vector is a generic factory that can allocate from either the pinned pool or pageable (normal host) memory based on an internal threshold. By default, cudf's threshold for using the pinned pool is 0, which means it allocates from pageable memory, so the resulting host_vector is no different from a std::vector.
- auto h_is_required = cudf::detail::make_host_vector<uint8_t>(field_indices.size(), stream);
+ auto h_is_required = cudf::detail::make_pinned_vector<uint8_t>(field_indices.size(), stream);
Note: this applies to all places using make_host_vector.
Thanks, updated.
rmm::device_uvector<int32_t> invalid_rows(num_items, stream, mr);
thrust::transform(rmm::exec_policy_nosync(stream),
                  thrust::make_counting_iterator(0),
                  thrust::make_counting_iterator(num_items),
                  invalid_rows.begin(),
                  [item_invalid = item_invalid.data(), top_row_indices] __device__(int idx) {
                    return item_invalid[idx] ? top_row_indices[idx] : -1;
                  });

auto valid_end =
  thrust::remove(rmm::exec_policy_nosync(stream), invalid_rows.begin(), invalid_rows.end(), -1);
thrust::sort(rmm::exec_policy_nosync(stream), invalid_rows.begin(), valid_end);
auto unique_end =
  thrust::unique(rmm::exec_policy_nosync(stream), invalid_rows.begin(), valid_end);
thrust::for_each(rmm::exec_policy_nosync(stream),
                 invalid_rows.begin(),
                 unique_end,
                 [row_invalid = row_invalid.data()] __device__(int32_t row_idx) {
                   row_invalid[row_idx] = true;
                 });
This seems overkill: it uses transform → remove → sort → unique → for_each (5 passes) when a single scatter suffices. Bool writes are idempotent, so no dedup is needed:
#include <cuda/atomic>
- rmm::device_uvector<int32_t> invalid_rows(num_items, stream, mr);
- thrust::transform(...); // pass 1
- thrust::remove(...); // pass 2
- thrust::sort(...); // pass 3
- thrust::unique(...); // pass 4
- thrust::for_each(...); // pass 5
thrust::for_each(rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(num_items),
[item_invalid, top_row_indices, row_invalid] __device__(int idx) {
if (item_invalid[idx]) {
cuda::atomic_ref<bool, cuda::thread_scope_device> ref(row_invalid[top_row_indices[idx]]);
ref.store(true, cuda::memory_order_relaxed);
}
});
int32_t const* top_row_indices,
int* error_flag,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
Similarly, this function returns no memory, so mr seems redundant.
  }
}
template <typename LocationProvider>
CUDF_KERNEL void copy_varlen_data_kernel(uint8_t const* message_data,
Does this kernel copy medium/large strings? If so, generating the src_ptr, dst_ptr, and sizes arrays with a thrust::for_each kernel and then using cub batch memcpy would be much faster.
Yes, that can happen. Updated as you suggested.
if (!has_required) { return; }

auto d_is_required = cudf::detail::make_device_uvector_async(
  h_is_required, stream, rmm::mr::get_current_device_resource());
Lines 89, 151, 153 and protobuf_kernels.cuh (line 570): our convention mandates using cudf::get_current_device_resource_ref().
Why is the premerge CI failure introduced by NVIDIA/spark-rapids#14525?
Sorry, I pasted the wrong PR link. It should be #4430.
Co-authored-by: Nghia Truong <7416935+ttnghia@users.noreply.github.com>
build
Resolve conflicts from part0 (PR NVIDIA#4373) merge. Take main's reviewed conventions: detail namespace, snake_case, exec_policy_nosync, cub::DeviceMemcpy, pinned vectors, no-brace style. Keep dev branch's full implementation code. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
First PR in a series adding a GPU-accelerated protobuf decoder to cuIO. Establishes the public API, schema validation, JNI bridge, Python/Cython bindings, and a stub decode_protobuf() entry point. The stub returns correctly-typed all-null columns for each schema field; actual data extraction is added in follow-up PRs.
Includes:
- C++ public API (protobuf.hpp): decode_protobuf_options, nested_field_descriptor, typed proto_encoding/proto_wire_type enums, validate_decode_options()
- Shared CUDA infrastructure (device_helpers, host_helpers, kernels, types)
- Java API (ProtobufSchemaDescriptor) and JNI bridge (ProtobufJni.cpp)
- Python/Cython bindings (pylibcudf.io.protobuf)
- 7 C++ tests covering output shape, type structure, and null propagation
Migrated from NVIDIA/spark-rapids-jni#4373.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add GPU protobuf decoder: framework, API, and schema validation [1/4]
Summary
First PR in a four-part series adding a GPU-accelerated protobuf decoder to spark-rapids-jni. This PR establishes the public API, schema validation, JNI bridge, shared CUDA infrastructure, and a stub decode_protobuf_to_struct entry point.
The decoder converts LIST<INT8/UINT8> columns (one serialized protobuf message per row) into nested cuDF STRUCT columns. The stub in this PR returns correctly-typed all-null columns for each schema field; actual data extraction is added in follow-up PRs.
What is included
C++ public API (protobuf.hpp): nested_field_descriptor, ProtobufDecodeContext, typed proto_encoding / proto_wire_type enums, validate_decode_context() with comprehensive schema invariant checks (field numbers, parent-child topology, depth, wire type / encoding / output type compatibility, repeated && required rejection, sorted enum_valid_values).
Java schema API (Protobuf.java + ProtobufSchemaDescriptor.java): immutable schema descriptor with defensive deep copies, full validation mirroring C++, Serializable with re-validation on deserialization. Public decodeToStruct() method with PERMISSIVE / FAILFAST mode support.
JNI bridge (ProtobufJni.cpp): converts 15 Java arrays to a C++ ProtobufDecodeContext with null checks, field number range validation, wire type validation, and proper DeleteLocalRef for all JNI local references, including the triple-nested enum_names.
Shared CUDA header (protobuf_common.cuh): types, device helpers (read_varint, skip_field, decode_tag), LocationProvider structs, template extraction kernels, and forward declarations. Included in full so that follow-up PRs do not need to modify this header, eliminating merge conflicts across the PR chain.
Stub decode (protobuf.cu): validates the context, handles empty-schema and zero-row edge cases with correct nested type construction, propagates the input null mask to the output struct, and assembles a STRUCT with all-null children via recursive make_null_column_with_schema (respects nested STRUCT children and repeated LIST wrapping).
Column utilities (protobuf_builders.cu): make_null_column (all types), make_empty_column_safe, make_null_list_column_with_child, make_empty_list_column.
Test coverage (26 tests)
Schema validation (13 tests): repeated+default rejection, repeated+required rejection, struct/list default rejection, enum metadata requirements, duplicate field numbers, child-parent constraints, encoding compatibility, depth limit, serialization roundtrip.
Output shape and null semantics (13 tests): empty schema, single/multi-field schemas, multiple rows, null input row propagation (verified with isNull assertions), all-null input, zero-row with flat/nested/repeated schemas (including grandchild type verification), repeated scalar LIST wrapping, input validation.
Follow-up PRs
scan_all_fields_kernel, batched extraction, all scalar types, defaults, required fields. Each follow-up inserts decode logic into the column_map section of protobuf.cu without modifying existing code. protobuf_common.cuh is frozen after this PR.
Review guide
protobuf.hpp: start here. The API contract: nested_field_descriptor, ProtobufDecodeContext, validate_decode_context().
ProtobufSchemaDescriptor.java: Java-side validation mirror. Check defensive copies and deserialization re-validation.
ProtobufJni.cpp: focus on local reference cleanup in the enum_names triple-nested loop.
protobuf.cu: stub decode. Verify that an empty schema returns num_rows with 0 children; zero-row builds correct nested types; the null mask is propagated from input; make_null_column_with_schema recursively builds STRUCT children and LIST wrapping.
protobuf_common.cuh: for this PR, focus on types and make_empty_struct_column_with_schema / find_child_field_indices. The rest (device helpers, template kernels, LocationProviders) is infrastructure for follow-up PRs.
Tests: ProtobufSchemaDescriptorTest for validation, ProtobufTest for output shape and null semantics.
protobuf.hpp— Start here. The API contract:nested_field_descriptor,ProtobufDecodeContext,validate_decode_context().ProtobufSchemaDescriptor.java— Java-side validation mirror. Check defensive copies and deserialization re-validation.ProtobufJni.cpp— Focus on local reference cleanup in theenum_namestriple-nested loop.protobuf.cu— Stub decode. Verify: empty-schema returnsnum_rowswith 0 children; zero-row builds correct nested types; null mask is propagated from input;make_null_column_with_schemarecursively builds STRUCT children and LIST wrapping.protobuf_common.cuh— For this PR, focus on types andmake_empty_struct_column_with_schema/find_child_field_indices. The rest (device helpers, template kernels, LocationProviders) is infrastructure for follow-up PRs.ProtobufSchemaDescriptorTestfor validation,ProtobufTestfor output shape and null semantics.