Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ add_library(
src/BloomFilterJni.cpp
src/CaseWhenJni.cpp
src/CastStringJni.cpp
src/CharsetDecodeJni.cpp
src/DateTimeUtilsJni.cpp
src/DecimalUtilsJni.cpp
src/DeviceAttrJni.cpp
Expand Down Expand Up @@ -227,6 +228,7 @@ add_library(
src/cast_string.cu
src/cast_string_to_datetime.cu
src/cast_string_to_float.cu
src/charset_decode.cu
src/datetime_rebase.cu
src/datetime_truncate.cu
src/decimal_utils.cu
Expand Down
71 changes: 71 additions & 0 deletions src/main/cpp/scripts/GenerateGbkTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;

/**
* Generates the GBK-to-Unicode lookup table used by charset_decode.cu.
*
* Usage:
* javac GenerateGbkTable.java
* java GenerateGbkTable > ../src/gbk_to_unicode_table.inc
*
* The output is a C array initializer suitable for #include into a
* constexpr uint16_t array. Each entry maps a GBK double-byte pair
* (first_byte in [0x81,0xFE], second_byte in [0x40,0xFE]) to its
* Unicode codepoint via: index = (first - 0x81) * 191 + (second - 0x40).
* Invalid/unmappable pairs are set to 0xFFFD (U+FFFD REPLACEMENT CHARACTER).
*
* The ground truth is Java's GBK CharsetDecoder, which matches the behavior
* of Spark's CPU-side StringDecode expression.
*/
public class GenerateGbkTable {
  public static void main(String[] args) throws Exception {
    // REPORT mode throws on malformed/unmappable pairs; we catch and emit 0xFFFD.
    // NOTE: for in-range double-byte pairs this yields the same table as REPLACE
    // mode would; single-byte cases (e.g. standalone 0x80) are outside the table
    // range and are handled by the kernel, not here.
    CharsetDecoder decoder = Charset.forName("GBK").newDecoder()
        .onMalformedInput(CodingErrorAction.REPORT)
        .onUnmappableCharacter(CodingErrorAction.REPORT);

    System.out.println("// GBK to Unicode lookup table.");
    System.out.println("// Generated by GenerateGbkTable.java using Java's GBK CharsetDecoder.");
    System.out.println("// Index: (first_byte - 0x81) * 191 + (second_byte - 0x40)");
    System.out.println("// Entries: 126 * 191 = 24066");

    // One output line per lead byte; entries are comma-separated hex literals.
    for (int lead = 0x81; lead <= 0xFE; lead++) {
      StringBuilder row = new StringBuilder();
      for (int trail = 0x40; trail <= 0xFE; trail++) {
        if (trail > 0x40) {
          row.append(' ');
        }
        row.append(String.format("0x%04X,", decodePair(decoder, lead, trail)));
      }
      System.out.println(" " + row.toString());
    }
  }

  /**
   * Decodes a single (lead, trail) GBK byte pair with the given decoder.
   *
   * @return the decoded BMP codepoint, or 0xFFFD when the pair is malformed,
   *         unmappable, or decodes to anything other than exactly one char
   */
  private static int decodePair(CharsetDecoder decoder, int lead, int trail) {
    byte[] pair = {(byte) lead, (byte) trail};
    try {
      decoder.reset();
      CharBuffer decoded = decoder.decode(ByteBuffer.wrap(pair));
      return (decoded.length() == 1) ? decoded.charAt(0) : 0xFFFD;
    } catch (Exception e) {
      return 0xFFFD;
    }
  }
}
42 changes: 42 additions & 0 deletions src/main/cpp/src/CharsetDecodeJni.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "charset_decode.hpp"
#include "cudf_jni_apis.hpp"

#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/memory_resource.hpp>

extern "C" {

/**
 * @brief JNI entry point for CharsetDecode.decodeNative.
 *
 * Interprets `input_column` as a native cudf::column_view handle, decodes it
 * with the requested charset, and returns the handle of the newly created
 * string column (ownership transferred to the Java side). Returns 0 on error.
 */
JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_CharsetDecode_decodeNative(
  JNIEnv* env, jclass, jlong input_column, jint charset)
{
  JNI_NULL_CHECK(env, input_column, "input column is null", 0);
  JNI_TRY
  {
    cudf::jni::auto_set_device(env);
    auto const* input_view  = reinterpret_cast<cudf::column_view const*>(input_column);
    auto const charset_kind = static_cast<spark_rapids_jni::charset_type>(charset);
    auto decoded            = spark_rapids_jni::decode_charset(*input_view,
                                                    charset_kind,
                                                    cudf::get_default_stream(),
                                                    cudf::get_current_device_resource_ref());
    return cudf::jni::release_as_jlong(std::move(decoded));
  }
  JNI_CATCH(env, 0);
}

} // extern "C"
226 changes: 226 additions & 0 deletions src/main/cpp/src/charset_decode.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "charset_decode.hpp"
#include "nvtx_ranges.hpp"

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/utilities/error.hpp>

#include <rmm/device_uvector.hpp>

#include <mutex>
#include <stdexcept>

namespace spark_rapids_jni {

namespace {

// GBK double-byte encoding constants.
// A GBK two-byte character has its lead byte in [0x81, 0xFE] and its trail
// byte in [0x40, 0xFE]; the lookup table below is indexed as
//   (lead - GBK_FIRST_BYTE_MIN) * GBK_SECOND_RANGE + (trail - GBK_SECOND_BYTE_MIN).
constexpr uint8_t GBK_FIRST_BYTE_MIN = 0x81;
constexpr uint8_t GBK_FIRST_BYTE_MAX = 0xFE;
constexpr uint8_t GBK_SECOND_BYTE_MIN = 0x40;
constexpr uint8_t GBK_SECOND_BYTE_MAX = 0xFE;
constexpr int GBK_FIRST_RANGE = GBK_FIRST_BYTE_MAX - GBK_FIRST_BYTE_MIN + 1; // 126
constexpr int GBK_SECOND_RANGE = GBK_SECOND_BYTE_MAX - GBK_SECOND_BYTE_MIN + 1; // 191
constexpr int GBK_TABLE_SIZE = GBK_FIRST_RANGE * GBK_SECOND_RANGE; // 24066
// U+FFFD REPLACEMENT CHARACTER, emitted for malformed or unmappable input.
constexpr uint16_t UNICODE_REPLACEMENT = 0xFFFD;

// Generated by src/main/cpp/scripts/GenerateGbkTable.java using Java's GBK CharsetDecoder.
// To regenerate: javac GenerateGbkTable.java && java GenerateGbkTable > gbk_to_unicode_table.inc
static constexpr uint16_t gbk_to_unicode_host[] = {
#include "gbk_to_unicode_table.inc"
};

// Cached device copy of the GBK lookup table, uploaded once by get_gbk_table().
// NOTE(review): these statics are initialized once and never freed; this relies
// on the CUDA context and RMM allocator living for the whole process lifetime
// (the established pattern in this project). If the allocator were ever torn
// down and re-created in-process, the cached device pointer would dangle —
// confirm that assumption holds for any new deployment mode.
static std::once_flag gbk_table_init_flag;
static std::unique_ptr<rmm::device_uvector<uint16_t>> gbk_table_device;

/**
 * @brief Returns a device pointer to the GBK_TABLE_SIZE-entry GBK-to-Unicode
 * lookup table, uploading it from host memory on first use.
 *
 * Thread-safe: std::call_once guarantees exactly one upload even under
 * concurrent callers. The device copy is cached for the remainder of the
 * process and intentionally never freed (see note on the statics above).
 *
 * @param stream CUDA stream used for the one-time host-to-device copy
 * @return Device pointer to the table, valid for the process lifetime
 */
uint16_t const* get_gbk_table(rmm::cuda_stream_view stream)
{
  // synchronize() ensures the memcpy completes before any kernel on any stream
  // can use the table pointer returned below.
  std::call_once(gbk_table_init_flag, [stream]() {
    // Allocate on the caller's stream; the synchronize below makes the filled
    // table visible to work submitted on any stream afterwards.
    gbk_table_device = std::make_unique<rmm::device_uvector<uint16_t>>(GBK_TABLE_SIZE, stream);
    cudf::detail::cuda_memcpy_async(
      cudf::device_span<uint16_t>(gbk_table_device->data(), GBK_TABLE_SIZE),
      cudf::host_span<uint16_t const>(gbk_to_unicode_host, GBK_TABLE_SIZE),
      stream);
    stream.synchronize();
  });
  return gbk_table_device->data();
}
Comment on lines +53 to +69
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Static device table singleton — dangling pointer risk on CUDA/RMM re-init

gbk_table_device and gbk_table_init_flag are file-scope statics. std::once_flag is never reset. If CUDA or RMM is torn down and re-initialized within the same process (a common pattern in unit-test suites and JVM-embedded workloads), the sequence is:

  1. First run: table is allocated with the stream from the first call.
  2. RMM allocator is reset / CUDA context is destroyed (test teardown).
  3. gbk_table_device's internal device pointer is now dangling.
  4. Next call to decode_gbk: call_once sees the flag already set, skips initialization, and returns the dangling pointer.
  5. The GPU kernel reads from freed device memory → silent corruption or crash.

Additionally, at process exit, the static destructor of gbk_table_device issues a cudaFree after CUDA is shut down, which can surface as a CUDA error in test logs.

Consider managing this table through the existing cudf resource lifecycle, or at minimum validating the device pointer on each call:

// Option: use a function-local static that ties lifetime to RMM resource
// Or check before returning:
CUDF_EXPECTS(gbk_table_device != nullptr, "GBK table not initialized");

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged. This is a known pattern in the project — other static singletons like the bloom filter hash seed table (bloom_filter.cu) follow the same approach. In spark-rapids-jni, the CUDA context and RMM allocator are not torn down and re-initialized within an executor process lifetime.

If this becomes a project-wide concern, it should be addressed holistically rather than in this PR alone.


/**
 * @brief Encode a Unicode codepoint (BMP only) as UTF-8.
 *
 * @param codepoint Codepoint to encode; must be <= 0xFFFF (BMP)
 * @param output Destination for the UTF-8 bytes (up to 3), or nullptr to only
 *               measure the encoded length without writing
 * @return Number of UTF-8 bytes the codepoint occupies (1, 2, or 3)
 */
__device__ __forceinline__ int codepoint_to_utf8(uint32_t codepoint, char* output)
{
  // 1 byte: U+0000..U+007F (ASCII passthrough)
  if (codepoint < 0x80) {
    if (output != nullptr) { output[0] = static_cast<char>(codepoint); }
    return 1;
  }
  // 2 bytes: U+0080..U+07FF
  if (codepoint < 0x800) {
    if (output != nullptr) {
      output[0] = static_cast<char>(0xC0 | (codepoint >> 6));
      output[1] = static_cast<char>(0x80 | (codepoint & 0x3F));
    }
    return 2;
  }
  // 3 bytes: U+0800..U+FFFF (covers U+FFFD, the replacement character)
  if (output != nullptr) {
    output[0] = static_cast<char>(0xE0 | (codepoint >> 12));
    output[1] = static_cast<char>(0x80 | ((codepoint >> 6) & 0x3F));
    output[2] = static_cast<char>(0x80 | (codepoint & 0x3F));
  }
  return 3;
}

/**
* @brief Functor for two-pass GBK to UTF-8 string decoding.
*
* Used with cudf::strings::detail::make_strings_children.
* Pass 1 (d_chars==nullptr): compute output byte sizes.
* Pass 2 (d_chars!=nullptr): write UTF-8 output bytes.
*/
struct gbk_decode_fn {
  uint8_t const* input_data;               // flattened byte data of all rows
  cudf::size_type const* input_offsets;    // row boundaries into input_data
  uint16_t const* gbk_table;               // GBK double-byte -> BMP codepoint table

  cudf::size_type* d_sizes;                // pass 1 output: per-row byte sizes
  char* d_chars;                           // pass 2 output: UTF-8 bytes (nullptr in pass 1)
  cudf::detail::input_offsetalator d_offsets;

  __device__ void operator()(cudf::size_type idx)
  {
    auto const row_begin = input_offsets[idx];
    auto const row_len   = input_offsets[idx + 1] - row_begin;

    char* dst              = d_chars ? d_chars + d_offsets[idx] : nullptr;
    cudf::size_type nbytes = 0;

    cudf::size_type pos = 0;
    while (pos < row_len) {
      uint8_t const b = input_data[row_begin + pos];

      if (b <= 0x7F) {
        // ASCII: single byte, identical in UTF-8.
        if (dst) { *dst++ = static_cast<char>(b); }
        nbytes += 1;
        pos += 1;
        continue;
      }

      // NOTE: lead byte 0x80 is intentionally NOT special-cased: the build JDK's
      // GBK decoder returns U+FFFD for a standalone 0x80 (verified in review),
      // so it falls through to the replacement path below.
      if (b >= GBK_FIRST_BYTE_MIN && b <= GBK_FIRST_BYTE_MAX && (pos + 1) < row_len) {
        // Potential GBK double-byte sequence.
        uint8_t const trail = input_data[row_begin + pos + 1];
        if (trail >= GBK_SECOND_BYTE_MIN && trail != 0x7F) {
          // Java's GBK decoder consumes any second byte >= 0x40 (except 0x7F)
          // as a pair. Bytes 0x40-0xFE (minus 0x7F) are table lookups; 0xFF is
          // consumed as a pair but always maps to U+FFFD (outside table range).
          uint16_t cp = UNICODE_REPLACEMENT;
          if (trail <= GBK_SECOND_BYTE_MAX) {
            cp = gbk_table[(b - GBK_FIRST_BYTE_MIN) * GBK_SECOND_RANGE +
                           (trail - GBK_SECOND_BYTE_MIN)];
          }
          int const width = codepoint_to_utf8(cp, dst);
          if (dst) { dst += width; }
          nbytes += width;
          pos += 2;
          continue;
        }
        // Trail byte < 0x40 or == 0x7F: not consumed as a pair. Emit a
        // replacement for the lead byte only and re-examine the trail byte on
        // the next iteration (matches Java REPLACE-mode streaming behavior).
      }

      // Invalid lead byte, truncated pair, or rejected trail -> U+FFFD.
      int const width = codepoint_to_utf8(UNICODE_REPLACEMENT, dst);
      if (dst) { dst += width; }
      nbytes += width;
      pos += 1;
    }

    if (!d_chars) { d_sizes[idx] = nbytes; }
  }
};

/**
 * @brief Decode a binary column (LIST<UINT8>) of GBK-encoded bytes into a
 * UTF-8 STRING column, matching the behavior of Java's GBK CharsetDecoder
 * (and therefore Spark's CPU-side StringDecode for GBK).
 *
 * Malformed or unmappable sequences become U+FFFD. Null rows are preserved by
 * copying the input null mask onto the result.
 *
 * NOTE(review): row offsets are read via list_col.offsets().data<size_type>()
 * without accounting for a parent column offset — this assumes `input` is not
 * a sliced view; confirm callers never pass one (or use offsets_begin()).
 *
 * @param input LIST<UINT8> column (Spark BinaryType) holding GBK bytes
 * @param stream CUDA stream for kernels and allocations
 * @param mr Device memory resource for the returned column
 * @return STRING column of UTF-8 text, one row per input row
 * @throws std::invalid_argument if input is not LIST<UINT8> or the child column is nullable
 */
std::unique_ptr<cudf::column> decode_gbk(cudf::column_view const& input,
                                         rmm::cuda_stream_view stream,
                                         rmm::device_async_resource_ref mr)
{
  auto const num_rows = input.size();
  if (num_rows == 0) { return cudf::make_empty_column(cudf::type_id::STRING); }

  // Validate the expected BinaryType representation: LIST<UINT8> with a
  // non-nullable child.
  CUDF_EXPECTS(input.type().id() == cudf::type_id::LIST,
               "Input must be LIST type (BinaryType)",
               std::invalid_argument);
  cudf::lists_column_view list_col(input);
  auto const child = list_col.child();
  auto const offsets = list_col.offsets();
  CUDF_EXPECTS(child.type().id() == cudf::type_id::UINT8,
               "Input must be LIST<UINT8> (BinaryType)",
               std::invalid_argument);
  CUDF_EXPECTS(
    !child.nullable(), "Child column of binary column must be non-nullable", std::invalid_argument);

  // One-time upload of the GBK lookup table (synchronized internally).
  auto const* gbk_table = get_gbk_table(stream);

  // Two-pass build: make_strings_children invokes the functor once with
  // d_chars == nullptr (size pass) and once with the chars buffer (write pass).
  auto [new_offsets, new_chars] =
    cudf::strings::detail::make_strings_children(gbk_decode_fn{child.data<uint8_t>(),
                                                               offsets.data<cudf::size_type>(),
                                                               gbk_table,
                                                               nullptr,
                                                               nullptr,
                                                               cudf::detail::input_offsetalator{}},
                                                 num_rows,
                                                 stream,
                                                 mr);

  // Carry over the input's validity: null binary rows become null strings.
  return cudf::make_strings_column(num_rows,
                                   std::move(new_offsets),
                                   new_chars.release(),
                                   input.null_count(),
                                   cudf::copy_bitmask(input, stream, mr));
}

} // anonymous namespace

/**
 * @brief Decode a binary column into a UTF-8 STRING column using the
 * requested source charset.
 *
 * @param input LIST<UINT8> column (Spark BinaryType) holding the encoded bytes
 * @param charset Source charset to decode from; only GBK is supported today
 * @param stream CUDA stream for device work
 * @param mr Device memory resource for the returned column
 * @return STRING column of UTF-8 text
 * @throws cudf::logic_error for unsupported charsets
 */
std::unique_ptr<cudf::column> decode_charset(cudf::column_view const& input,
                                             charset_type charset,
                                             rmm::cuda_stream_view stream,
                                             rmm::device_async_resource_ref mr)
{
  SRJ_FUNC_RANGE();
  if (charset == charset_type::GBK) { return decode_gbk(input, stream, mr); }
  CUDF_FAIL("Unsupported charset type for decode");
}

} // namespace spark_rapids_jni
Loading
Loading