Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3c25016
framework code for protocol buffer decoder kernels
thirtiseven Mar 16, 2026
3a47f5a
style
thirtiseven Mar 16, 2026
5800387
address comments
thirtiseven Mar 17, 2026
6992e9a
address comments
thirtiseven Mar 17, 2026
0f76877
address comment
thirtiseven Mar 17, 2026
b619524
port enum refactor
thirtiseven Mar 17, 2026
d6997db
address comments
thirtiseven Mar 17, 2026
c1fed48
address comments
thirtiseven Mar 17, 2026
d25fba1
address comments
thirtiseven Mar 17, 2026
0e82ca8
fix compile
thirtiseven Mar 17, 2026
47dc867
address comments and cudf
thirtiseven Mar 18, 2026
e4f06f9
style
thirtiseven Mar 18, 2026
cbe889a
style
thirtiseven Mar 18, 2026
e23f3dd
address comments
thirtiseven Mar 18, 2026
d48d790
address comments
thirtiseven Mar 18, 2026
c50c87c
address comments and self-check
thirtiseven Mar 19, 2026
221f07e
cudf sync
thirtiseven Mar 19, 2026
2caf5d8
address comments
thirtiseven Mar 20, 2026
e0a990c
cudf
thirtiseven Mar 20, 2026
d8b3ede
address comments
thirtiseven Mar 23, 2026
b56aef4
address comments, use pinned memory
thirtiseven Mar 24, 2026
ba011f7
Merge remote-tracking branch 'origin/main' into protobuf_pr0_framework
thirtiseven Apr 2, 2026
456da81
address comments
thirtiseven Apr 2, 2026
e748ea7
using get_current_device_resource_ref
thirtiseven Apr 3, 2026
1ab75a7
ci
thirtiseven Apr 3, 2026
176dfba
Update src/main/cpp/src/protobuf/protobuf_kernels.cu
ttnghia Apr 3, 2026
1f7f4ff
Update src/main/cpp/src/protobuf/protobuf_kernels.cu
ttnghia Apr 3, 2026
a8cd81c
Update src/main/cpp/src/protobuf/protobuf_kernels.cu
ttnghia Apr 3, 2026
aa9c1ca
Update src/main/cpp/src/protobuf/protobuf_kernels.cuh
ttnghia Apr 3, 2026
0cc87ee
Apply suggestions from code review
ttnghia Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ add_library(
src/NativeParquetJni.cpp
src/NumberConverterJni.cpp
src/ParseURIJni.cpp
src/ProtobufJni.cpp
src/RegexRewriteUtilsJni.cpp
src/RowConversionJni.cpp
src/SparkResourceAdaptorJni.cpp
Expand Down Expand Up @@ -254,6 +255,9 @@ add_library(
src/multiply.cu
src/number_converter.cu
src/parse_uri.cu
src/protobuf.cu
src/protobuf_kernels.cu
src/protobuf_builders.cu
src/regex_rewrite_utils.cu
src/row_conversion.cu
src/round_float.cu
Expand Down
289 changes: 289 additions & 0 deletions src/main/cpp/src/ProtobufJni.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "cudf_jni_apis.hpp"
#include "dtype_utils.hpp"
#include "protobuf.hpp"

#include <cudf/column/column_view.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/traits.hpp>

extern "C" {

/**
 * JNI entry point for com.nvidia.spark.rapids.jni.Protobuf.decodeToStruct.
 *
 * Converts a flattened, array-per-attribute schema description coming from Java into a
 * spark_rapids_jni::ProtobufDecodeContext and calls
 * spark_rapids_jni::decode_protobuf_to_struct on the input column.
 *
 * Every per-field array below must have the same length as field_numbers; entry i of each
 * array describes field i.
 *
 * @param env                JNI environment.
 * @param binary_input_view  Native address that is reinterpreted as a cudf::column_view const*.
 * @param field_numbers      Per-field protobuf field number; checked against
 *                           [1, spark_rapids_jni::MAX_FIELD_NUMBER].
 * @param parent_indices     Per-field parent index: -1, or the index of an earlier field.
 * @param depth_levels       Per-field depth: 0 when the parent index is -1, otherwise the
 *                           parent's depth plus one.
 * @param wire_types         Per-field wire type; must be VARINT, I64BIT, LEN or I32BIT.
 * @param output_type_ids    Per-field value that is cast to cudf::type_id.
 * @param encodings          Per-field encoding value, stored in the field descriptor as-is.
 * @param is_repeated        Per-field flag (non-zero means true).
 * @param is_required        Per-field flag (non-zero means true).
 * @param has_default_value  Per-field flag (non-zero means true).
 * @param default_ints       Per-field default, copied into a vector of int64_t.
 * @param default_floats     Per-field default, copied into a vector of double.
 * @param default_bools      Per-field default (non-zero means true).
 * @param default_strings    Per-field byte[]; a null element becomes an empty byte vector.
 * @param enum_valid_values  Per-field int[]; a null element becomes an empty vector.
 * @param enum_names         Per-field byte[][]; a null element becomes an empty list of names.
 * @param fail_on_errors     Forwarded to the decode context as a bool.
 * @return The decode result released as a jlong handle; the error paths visible in this
 *         function pass or return 0.
 */
JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_Protobuf_decodeToStruct(JNIEnv* env,
jclass,
jlong binary_input_view,
jintArray field_numbers,
jintArray parent_indices,
jintArray depth_levels,
jintArray wire_types,
jintArray output_type_ids,
jintArray encodings,
jbooleanArray is_repeated,
jbooleanArray is_required,
jbooleanArray has_default_value,
jlongArray default_ints,
jdoubleArray default_floats,
jbooleanArray default_bools,
jobjectArray default_strings,
jobjectArray enum_valid_values,
jobjectArray enum_names,
jboolean fail_on_errors)
{
// Reject null arguments up front, one check per argument so the message names the offender.
// NOTE(review): JNI_NULL_CHECK is defined elsewhere; presumably it raises a Java exception
// and returns the last macro argument (0) -- confirm against cudf_jni_apis.hpp.
JNI_NULL_CHECK(env, binary_input_view, "binary_input_view is null", 0);
JNI_NULL_CHECK(env, field_numbers, "field_numbers is null", 0);
JNI_NULL_CHECK(env, parent_indices, "parent_indices is null", 0);
JNI_NULL_CHECK(env, depth_levels, "depth_levels is null", 0);
JNI_NULL_CHECK(env, wire_types, "wire_types is null", 0);
JNI_NULL_CHECK(env, output_type_ids, "output_type_ids is null", 0);
JNI_NULL_CHECK(env, encodings, "encodings is null", 0);
JNI_NULL_CHECK(env, is_repeated, "is_repeated is null", 0);
JNI_NULL_CHECK(env, is_required, "is_required is null", 0);
JNI_NULL_CHECK(env, has_default_value, "has_default_value is null", 0);
JNI_NULL_CHECK(env, default_ints, "default_ints is null", 0);
JNI_NULL_CHECK(env, default_floats, "default_floats is null", 0);
JNI_NULL_CHECK(env, default_bools, "default_bools is null", 0);
JNI_NULL_CHECK(env, default_strings, "default_strings is null", 0);
JNI_NULL_CHECK(env, enum_valid_values, "enum_valid_values is null", 0);
JNI_NULL_CHECK(env, enum_names, "enum_names is null", 0);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each JNI_NULL_CHECK macro expands to a lot of code. Can we check all of these variables with a single macro? Like this:

```
auto const is_input_valid = binary_input_view && field_numbers && .......;
JNI_NULL_CHECK(env, is_input_valid, ...);
```

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


// The rest of the function runs between JNI_TRY and the closing JNI_CATCH(env, 0).
// NOTE(review): presumably this pair translates C++ exceptions into Java exceptions and
// returns 0 -- confirm against the macro definitions.
JNI_TRY
{
cudf::jni::auto_set_device(env);
// The Java side passes the column view as a raw native address.
auto const* input = reinterpret_cast<cudf::column_view const*>(binary_input_view);

// Wrap the Java primitive arrays so they can be indexed and sized from native code.
cudf::jni::native_jintArray n_field_numbers(env, field_numbers);
cudf::jni::native_jintArray n_parent_indices(env, parent_indices);
cudf::jni::native_jintArray n_depth_levels(env, depth_levels);
cudf::jni::native_jintArray n_wire_types(env, wire_types);
cudf::jni::native_jintArray n_output_type_ids(env, output_type_ids);
cudf::jni::native_jintArray n_encodings(env, encodings);
cudf::jni::native_jbooleanArray n_is_repeated(env, is_repeated);
cudf::jni::native_jbooleanArray n_is_required(env, is_required);
cudf::jni::native_jbooleanArray n_has_default(env, has_default_value);
cudf::jni::native_jlongArray n_default_ints(env, default_ints);
cudf::jni::native_jdoubleArray n_default_floats(env, default_floats);
cudf::jni::native_jbooleanArray n_default_bools(env, default_bools);

// field_numbers defines the field count; every other per-field array is compared against it.
int num_fields = n_field_numbers.size();

// Validate array sizes
// The three object arrays are not wrapped, so their lengths come from GetArrayLength.
if (n_parent_indices.size() != num_fields || n_depth_levels.size() != num_fields ||
n_wire_types.size() != num_fields || n_output_type_ids.size() != num_fields ||
n_encodings.size() != num_fields || n_is_repeated.size() != num_fields ||
n_is_required.size() != num_fields || n_has_default.size() != num_fields ||
n_default_ints.size() != num_fields || n_default_floats.size() != num_fields ||
n_default_bools.size() != num_fields ||
env->GetArrayLength(default_strings) != num_fields ||
env->GetArrayLength(enum_valid_values) != num_fields ||
env->GetArrayLength(enum_names) != num_fields) {
JNI_THROW_NEW(env,
cudf::jni::ILLEGAL_ARG_EXCEPTION_CLASS,
"All field arrays must have the same length",
0);
}

// Validate schema topology and wire types:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decode_protobuf_to_struct should call validate_decode_context on its own.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the duplicated validation here.

// - parent index must be -1 or a prior field index
// - depth must be 0 for top-level and parent_depth + 1 for children
// - wire type must be one of {0, 1, 2, 5}
// - field number must be in [1, spark_rapids_jni::MAX_FIELD_NUMBER]
for (int i = 0; i < num_fields; ++i) {
auto const parent_idx = n_parent_indices[i];
auto const depth = n_depth_levels[i];
auto const wire_type = n_wire_types[i];

if (n_field_numbers[i] <= 0 || n_field_numbers[i] > spark_rapids_jni::MAX_FIELD_NUMBER) {
JNI_THROW_NEW(env,
cudf::jni::ILLEGAL_ARG_EXCEPTION_CLASS,
"field_numbers must be in range [1, 2^29-1]",
0);
}

// Accept only the four wire types named in the error message below.
if (!(wire_type ==
spark_rapids_jni::wire_type_value(spark_rapids_jni::proto_wire_type::VARINT) ||
wire_type ==
spark_rapids_jni::wire_type_value(spark_rapids_jni::proto_wire_type::I64BIT) ||
wire_type ==
spark_rapids_jni::wire_type_value(spark_rapids_jni::proto_wire_type::LEN) ||
wire_type ==
spark_rapids_jni::wire_type_value(spark_rapids_jni::proto_wire_type::I32BIT))) {
JNI_THROW_NEW(env,
cudf::jni::ILLEGAL_ARG_EXCEPTION_CLASS,
"wire_types must be one of {VARINT,I64BIT,LEN,I32BIT}",
0);
}

// A parent must appear before its child. Since i < num_fields, the `>= i` test already
// implies the `>= num_fields` test.
if (parent_idx < -1 || parent_idx >= num_fields || parent_idx >= i) {
JNI_THROW_NEW(env,
cudf::jni::ILLEGAL_ARG_EXCEPTION_CLASS,
"parent_indices must be -1 or a valid prior field index",
0);
}

if (parent_idx == -1) {
if (depth != 0) {
JNI_THROW_NEW(
env, cudf::jni::ILLEGAL_ARG_EXCEPTION_CLASS, "top-level fields must have depth 0", 0);
}
} else {
// parent_idx is in [0, i) here, so this lookup is in bounds.
auto const parent_depth = n_depth_levels[parent_idx];
if (depth != parent_depth + 1) {
JNI_THROW_NEW(env,
cudf::jni::ILLEGAL_ARG_EXCEPTION_CLASS,
"child depth must equal parent depth + 1",
0);
}
}
}

// Build schema descriptors
// One nested_field_descriptor per field; the jboolean flags are converted to bool.
// NOTE(review): output_type_ids is cast to cudf::type_id without a range check in this
// function -- confirm it is validated elsewhere.
std::vector<spark_rapids_jni::nested_field_descriptor> schema;
schema.reserve(num_fields);
for (int i = 0; i < num_fields; ++i) {
schema.push_back({n_field_numbers[i],
n_parent_indices[i],
n_depth_levels[i],
n_wire_types[i],
static_cast<cudf::type_id>(n_output_type_ids[i]),
n_encodings[i],
n_is_repeated[i] != 0,
n_is_required[i] != 0,
n_has_default[i] != 0});
}

// Build output types
// Same type ids as above, this time wrapped as cudf::data_type.
std::vector<cudf::data_type> schema_output_types;
schema_output_types.reserve(num_fields);
for (int i = 0; i < num_fields; ++i) {
schema_output_types.emplace_back(static_cast<cudf::type_id>(n_output_type_ids[i]));
}

// Convert boolean arrays
std::vector<bool> default_bool_values;
default_bool_values.reserve(num_fields);
for (int i = 0; i < num_fields; ++i) {
default_bool_values.push_back(n_default_bools[i] != 0);
}

// Convert default values
std::vector<int64_t> default_int_values(n_default_ints.begin(), n_default_ints.end());
std::vector<double> default_float_values(n_default_floats.begin(), n_default_floats.end());

// Convert default string values
// default_strings is an array of byte[]; a null element yields an empty byte vector.
std::vector<std::vector<uint8_t>> default_string_values;
default_string_values.reserve(num_fields);
for (int i = 0; i < num_fields; ++i) {
jbyteArray byte_arr = static_cast<jbyteArray>(env->GetObjectArrayElement(default_strings, i));
// A Java exception is pending: return and let it surface on the Java side.
if (env->ExceptionCheck()) { return 0; }
if (byte_arr == nullptr) {
default_string_values.emplace_back();
} else {
jsize len = env->GetArrayLength(byte_arr);
jbyte* bytes = env->GetByteArrayElements(byte_arr, nullptr);
// Element access failed; drop the local reference before returning.
if (bytes == nullptr) {
env->DeleteLocalRef(byte_arr);
return 0;
}
default_string_values.emplace_back(reinterpret_cast<uint8_t*>(bytes),
reinterpret_cast<uint8_t*>(bytes) + len);
// JNI_ABORT: the buffer was only read, so nothing needs to be copied back.
env->ReleaseByteArrayElements(byte_arr, bytes, JNI_ABORT);
// Release the local reference on each iteration instead of holding one per field.
env->DeleteLocalRef(byte_arr);
}
}

// Convert enum valid values
// Same pattern as the default strings, with int[] elements instead of byte[].
std::vector<std::vector<int32_t>> enum_values;
enum_values.reserve(num_fields);
for (int i = 0; i < num_fields; ++i) {
jintArray int_arr = static_cast<jintArray>(env->GetObjectArrayElement(enum_valid_values, i));
if (env->ExceptionCheck()) { return 0; }
if (int_arr == nullptr) {
enum_values.emplace_back();
} else {
jsize len = env->GetArrayLength(int_arr);
jint* ints = env->GetIntArrayElements(int_arr, nullptr);
if (ints == nullptr) {
env->DeleteLocalRef(int_arr);
return 0;
}
enum_values.emplace_back(ints, ints + len);
env->ReleaseIntArrayElements(int_arr, ints, JNI_ABORT);
env->DeleteLocalRef(int_arr);
}
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This byte-array-to-vector conversion pattern is repeated almost exactly for default_string_values and enum_values. Can we extract a common function for both?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


// Convert enum names (byte[][][]). For each field:
// - null => not an enum-as-string field
// - byte[][] where each byte[] is UTF-8 enum name, ordered with enum_values[field]
// A null field entry and a null name entry both become empty vectors.
std::vector<std::vector<std::vector<uint8_t>>> enum_name_values;
enum_name_values.reserve(num_fields);
for (int i = 0; i < num_fields; ++i) {
jobjectArray names_arr = static_cast<jobjectArray>(env->GetObjectArrayElement(enum_names, i));
// A Java exception is pending: return and let it surface on the Java side.
if (env->ExceptionCheck()) { return 0; }
if (names_arr == nullptr) {
enum_name_values.emplace_back();
} else {
jsize num_names = env->GetArrayLength(names_arr);
std::vector<std::vector<uint8_t>> names_for_field;
names_for_field.reserve(num_names);
for (jsize j = 0; j < num_names; ++j) {
jbyteArray name_bytes = static_cast<jbyteArray>(env->GetObjectArrayElement(names_arr, j));
// On every early exit below, release the local references acquired so far.
if (env->ExceptionCheck()) {
env->DeleteLocalRef(names_arr);
return 0;
}
if (name_bytes == nullptr) {
names_for_field.emplace_back();
} else {
jsize len = env->GetArrayLength(name_bytes);
jbyte* bytes = env->GetByteArrayElements(name_bytes, nullptr);
if (bytes == nullptr) {
env->DeleteLocalRef(name_bytes);
env->DeleteLocalRef(names_arr);
return 0;
}
names_for_field.emplace_back(reinterpret_cast<uint8_t*>(bytes),
reinterpret_cast<uint8_t*>(bytes) + len);
// JNI_ABORT: the buffer was only read, so nothing needs to be copied back.
env->ReleaseByteArrayElements(name_bytes, bytes, JNI_ABORT);
env->DeleteLocalRef(name_bytes);
}
}
enum_name_values.push_back(std::move(names_for_field));
env->DeleteLocalRef(names_arr);
}
}

// Hand all converted vectors to the decode context; the locals are moved-from afterwards.
spark_rapids_jni::ProtobufDecodeContext context{std::move(schema),
std::move(schema_output_types),
std::move(default_int_values),
std::move(default_float_values),
std::move(default_bool_values),
std::move(default_string_values),
std::move(enum_values),
std::move(enum_name_values),
static_cast<bool>(fail_on_errors)};

// Decode on cudf's default stream.
auto result =
spark_rapids_jni::decode_protobuf_to_struct(*input, context, cudf::get_default_stream());

// Return the result to Java as a jlong handle.
return cudf::jni::release_as_jlong(result);
}
JNI_CATCH(env, 0);
}

} // extern "C"
Loading
Loading