Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,9 @@ add_library(
src/io/json/process_tokens.cu
src/io/json/write_json.cpp
src/io/json/write_json.cu
src/io/protobuf/builders.cu
src/io/protobuf/decode.cu
src/io/protobuf/kernels.cu
src/io/orc/aggregate_orc_metadata.cpp
src/io/orc/dict_enc.cu
src/io/orc/orc.cpp
Expand Down
55 changes: 55 additions & 0 deletions cpp/include/cudf/io/detail/protobuf.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/io/protobuf.hpp>
#include <cudf/utilities/export.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/resource_ref.hpp>

namespace CUDF_EXPORT cudf {
namespace io::protobuf::detail {

/**
* @brief Check if an encoding is compatible with the given field and data type.
*/
bool is_encoding_compatible(nested_field_descriptor const& field, cudf::data_type const& type);

/**
* @brief Validate the decode context (schema consistency, encoding compatibility, etc.).
*
* @throws cudf::logic_error if the context is invalid
*/
void validate_decode_options(decode_protobuf_options const& options);

/**
* @brief Create a view into a single field's metadata from the decode options.
*/
protobuf_field_meta_view make_field_meta_view(decode_protobuf_options const& options,
int schema_idx);

/**
* @brief Internal implementation of decode_protobuf.
*/
std::unique_ptr<cudf::column> decode_protobuf(cudf::column_view const& binary_input,
decode_protobuf_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

} // namespace io::protobuf::detail
} // namespace CUDF_EXPORT cudf
156 changes: 156 additions & 0 deletions cpp/include/cudf/io/protobuf.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/utilities/host_vector.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/export.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <cstdint>
#include <memory>
#include <vector>

namespace CUDF_EXPORT cudf {
namespace io::protobuf {

/**
* @brief Protobuf field encoding types.
*/
enum class proto_encoding : int {
DEFAULT = 0, ///< Standard varint encoding
FIXED = 1, ///< Fixed-width encoding (32-bit or 64-bit)
ZIGZAG = 2, ///< ZigZag encoding for signed integers
ENUM_STRING = 3, ///< Enum field decoded as string
};

/**
* @brief Get the integer value of a proto_encoding.
*/
CUDF_HOST_DEVICE constexpr int encoding_value(proto_encoding encoding)
{
return static_cast<int>(encoding);
}

/// Maximum protobuf field number (29-bit).
constexpr int MAX_FIELD_NUMBER = (1 << 29) - 1;

/**
* @brief Protobuf wire types.
*/
enum class proto_wire_type : int {
VARINT = 0, ///< Variable-length integer
I64BIT = 1, ///< 64-bit fixed
LEN = 2, ///< Length-delimited
SGROUP = 3, ///< Start group (deprecated)
EGROUP = 4, ///< End group (deprecated)
I32BIT = 5, ///< 32-bit fixed
};

/**
* @brief Get the integer value of a proto_wire_type.
*/
CUDF_HOST_DEVICE constexpr int wire_type_value(proto_wire_type wire_type)
{
return static_cast<int>(wire_type);
}

/// Maximum supported nesting depth for nested protobuf messages.
constexpr int MAX_NESTING_DEPTH = 10;

/**
* @brief Descriptor for a single field in a (possibly nested) protobuf schema.
*
* Fields are organized in a flat array where parent-child relationships are
* expressed via @p parent_idx. Top-level fields have `parent_idx == -1`.
*/
struct nested_field_descriptor {
int field_number; ///< Protobuf field number
int parent_idx; ///< Index of parent field in schema (-1 for top-level)
int depth; ///< Nesting depth (0 for top-level)
proto_wire_type wire_type; ///< Expected wire type
cudf::type_id output_type; ///< Output cudf type
proto_encoding encoding; ///< Encoding type
bool is_repeated; ///< Whether this field is repeated (array)
bool is_required; ///< Whether this field is required (proto2)
bool has_default_value; ///< Whether this field has a default value
};

/**
* @brief Context for decoding protobuf messages.
*
* Contains the schema (as a flat array of field descriptors), default values,
* and enum metadata needed for decoding.
*/
struct decode_protobuf_options {
std::vector<nested_field_descriptor> schema; ///< Flat array of field descriptors
std::vector<int64_t> default_ints; ///< Default integer values per field
std::vector<double> default_floats; ///< Default float values per field
std::vector<bool> default_bools; ///< Default boolean values per field
std::vector<cudf::detail::host_vector<uint8_t>>
default_strings; ///< Default string values per field
std::vector<cudf::detail::host_vector<int32_t>>
enum_valid_values; ///< Valid enum numbers per field
std::vector<std::vector<cudf::detail::host_vector<uint8_t>>>
enum_names; ///< UTF-8 enum names per field
bool fail_on_errors; ///< If true, throw on malformed messages; otherwise return nulls
};

/**
* @brief View into a single field's metadata from a decode_protobuf_options.
*/
struct protobuf_field_meta_view {
nested_field_descriptor const& schema;
cudf::data_type const output_type;
int64_t default_int;
double default_float;
bool default_bool;
cudf::detail::host_vector<uint8_t> const& default_string;
cudf::detail::host_vector<int32_t> const& enum_valid_values;
std::vector<cudf::detail::host_vector<uint8_t>> const& enum_names;
};

/**
* @brief Decode serialized protobuf messages into a struct column.
*
* Takes a LIST<UINT8> column where each row contains a serialized protobuf message,
* and decodes it into a STRUCT column according to the provided schema.
*
* Supports nested messages (up to 10 levels), repeated fields (as LIST columns),
* enum-as-string conversion, default values, and required field checking.
*
* @param binary_input LIST<INT8> or LIST<UINT8> column of serialized protobuf messages
* @param options Decoding options including schema, defaults, and enum metadata
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return A STRUCT column containing the decoded protobuf fields
*
* @throws cudf::logic_error if the schema is invalid
* @throws cudf::logic_error if fail_on_errors is true and a message cannot be decoded
*/
std::unique_ptr<cudf::column> decode_protobuf(
cudf::column_view const& binary_input,
decode_protobuf_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

} // namespace io::protobuf
} // namespace CUDF_EXPORT cudf
126 changes: 126 additions & 0 deletions cpp/src/io/protobuf/builders.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "io/protobuf/kernels.cuh"

#include <cudf/lists/detail/lists_column_factories.hpp>
#include <cudf/strings/detail/strings_column_factories.cuh>

namespace cudf::io::protobuf::detail {

std::unique_ptr<cudf::column> make_null_column(cudf::data_type dtype,
cudf::size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (num_rows == 0) { return cudf::make_empty_column(dtype); }

switch (dtype.id()) {
case cudf::type_id::BOOL8:
case cudf::type_id::INT8:
case cudf::type_id::UINT8:
case cudf::type_id::INT16:
case cudf::type_id::UINT16:
case cudf::type_id::INT32:
case cudf::type_id::UINT32:
case cudf::type_id::INT64:
case cudf::type_id::UINT64:
case cudf::type_id::FLOAT32:
case cudf::type_id::FLOAT64:
return cudf::make_fixed_width_column(dtype, num_rows, cudf::mask_state::ALL_NULL, stream, mr);
case cudf::type_id::STRING: {
rmm::device_uvector<cudf::strings::detail::string_index_pair> pairs(num_rows, stream, mr);
thrust::fill(rmm::exec_policy_nosync(stream),
pairs.data(),
pairs.end(),
cudf::strings::detail::string_index_pair{nullptr, 0});
return cudf::strings::detail::make_strings_column(pairs.data(), pairs.end(), stream, mr);
}
case cudf::type_id::LIST:
return cudf::lists::detail::make_all_nulls_lists_column(
num_rows, cudf::data_type{cudf::type_id::UINT8}, stream, mr);
case cudf::type_id::STRUCT: {
std::vector<std::unique_ptr<cudf::column>> empty_children;
auto null_mask = cudf::create_null_mask(num_rows, cudf::mask_state::ALL_NULL, stream, mr);
return cudf::make_structs_column(
num_rows, std::move(empty_children), num_rows, std::move(null_mask), stream, mr);
}
default: CUDF_FAIL("Unsupported type for null column creation");
}
}

std::unique_ptr<cudf::column> make_empty_column_safe(cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
switch (dtype.id()) {
case cudf::type_id::LIST: {
auto offsets_col =
std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::INT32},
1,
rmm::device_buffer(sizeof(int32_t), stream, mr),
rmm::device_buffer{},
0);
CUDF_CUDA_TRY(cudaMemsetAsync(
offsets_col->mutable_view().data<int32_t>(), 0, sizeof(int32_t), stream.value()));
auto child_col = std::make_unique<cudf::column>(
cudf::data_type{cudf::type_id::UINT8}, 0, rmm::device_buffer{}, rmm::device_buffer{}, 0);
return cudf::make_lists_column(
0, std::move(offsets_col), std::move(child_col), 0, rmm::device_buffer{});
}
case cudf::type_id::STRUCT: {
std::vector<std::unique_ptr<cudf::column>> empty_children;
return cudf::make_structs_column(
0, std::move(empty_children), 0, rmm::device_buffer{}, stream, mr);
}
default: return cudf::make_empty_column(dtype);
}
}

std::unique_ptr<cudf::column> make_null_list_column_with_child(
std::unique_ptr<cudf::column> child_col,
cudf::size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
rmm::device_uvector<int32_t> offsets(num_rows + 1, stream, mr);
thrust::fill(rmm::exec_policy_nosync(stream), offsets.begin(), offsets.end(), 0);
auto offsets_col = std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::INT32},
num_rows + 1,
offsets.release(),
rmm::device_buffer{},
0);
auto null_mask = cudf::create_null_mask(num_rows, cudf::mask_state::ALL_NULL, stream, mr);
return cudf::make_lists_column(
num_rows, std::move(offsets_col), std::move(child_col), num_rows, std::move(null_mask));
}

std::unique_ptr<cudf::column> make_empty_list_column(std::unique_ptr<cudf::column> element_col,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto offsets_col = std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::INT32},
1,
rmm::device_buffer(sizeof(int32_t), stream, mr),
rmm::device_buffer{},
0);
CUDF_CUDA_TRY(cudaMemsetAsync(
offsets_col->mutable_view().data<int32_t>(), 0, sizeof(int32_t), stream.value()));
return cudf::make_lists_column(
0, std::move(offsets_col), std::move(element_col), 0, rmm::device_buffer{});
}

} // namespace cudf::io::protobuf::detail
Loading
Loading