From 1884a7d5eb875a2fcdce4f225953f7ae9e3e2ead Mon Sep 17 00:00:00 2001 From: Jing chen He Date: Tue, 9 Jun 2026 22:01:44 -0700 Subject: [PATCH] fix(java): expose updatedFragmentOffsets on Update operation for RewriteColumns commits --- java/lance-jni/src/error.rs | 6 ++ java/lance-jni/src/transaction.rs | 77 ++++++++++++++++- .../main/java/org/lance/operation/Update.java | 64 +++++++++++++- .../java/org/lance/operation/UpdateTest.java | 84 +++++++++++++++++++ 4 files changed, 224 insertions(+), 7 deletions(-) diff --git a/java/lance-jni/src/error.rs b/java/lance-jni/src/error.rs index cdb922a3cef..e27abd4fda6 100644 --- a/java/lance-jni/src/error.rs +++ b/java/lance-jni/src/error.rs @@ -236,6 +236,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: std::io::Error) -> Self { + Self::io_error(err.to_string()) + } +} + impl From for Error { fn from(err: Utf8Error) -> Self { Self::input_error(err.to_string()) diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index 6bc1948ae6a..ce0f3f2416b 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -19,7 +19,7 @@ use jni::sys::{jboolean, jint, jlong}; use lance::dataset::CommitBuilder; use lance::dataset::transaction::{ DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder, - UpdateMap, UpdateMapEntry, UpdateMode, + UpdateMap, UpdateMapEntry, UpdateMode, UpdatedFragmentOffsets, }; use lance::io::ObjectStoreParams; use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore; @@ -433,7 +433,7 @@ fn convert_to_java_operation_inner<'local>( fields_for_preserving_frag_bitmap, update_mode, inserted_rows_filter: _, - updated_fragment_offsets: _, + updated_fragment_offsets, } => { let removed_ids: Vec> = removed_fragment_ids .iter() @@ -457,9 +457,44 @@ fn convert_to_java_operation_inner<'local>( &[JValue::Object(&update_mode)], )? 
.l()?;
+            // Serialize updated_fragment_offsets to Java Map<Long, byte[]>.
+            // Values are portable RoaringBitmap bytes so the JNI boundary stays O(bitmap size)
+            // rather than O(n rows). Empty HashMap when None so the Java constructor always
+            // receives a non-null map. A per-iteration local frame (capacity 4: Long + byte[] +
+            // put return + slack) bounds local-ref growth for large offset maps.
+            let java_offsets_map = {
+                let java_map = env.new_object("java/util/HashMap", "()V", &[])?;
+                if let Some(UpdatedFragmentOffsets(ref map)) = updated_fragment_offsets {
+                    for (frag_id, bitmap) in map {
+                        // `serialized_size` is the exact number of bytes `serialize_into`
+                        // writes, so pre-sizing avoids repeated Vec growth for big bitmaps.
+                        let mut buf: Vec<u8> = Vec::with_capacity(bitmap.serialized_size());
+                        bitmap.serialize_into(&mut buf)?;
+                        env.with_local_frame(4, |env| {
+                            let java_key = env.new_object(
+                                "java/lang/Long",
+                                "(J)V",
+                                &[JValue::Long(*frag_id as i64)],
+                            )?;
+                            // `byte_array_from_slice` allocates the Java byte[] and copies the
+                            // `&[u8]` into it in one call, so the `unsafe` u8 -> i8 slice
+                            // reinterpretation is not needed.
+                            let java_arr = env.byte_array_from_slice(&buf)?;
+                            env.call_method(
+                                &java_map,
+                                "put",
+                                "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
+                                &[JValue::Object(&java_key), JValue::Object(&*java_arr)],
+                            )?;
+                            Ok::<JObject, Error>(JObject::null())
+                        })?;
+                    }
+                }
+                java_map
+            };
             Ok(env.new_object(
                 "org/lance/operation/Update",
-                "(Ljava/util/List;Ljava/util/List;Ljava/util/List;[J[JLjava/util/Optional;)V",
+                "(Ljava/util/List;Ljava/util/List;Ljava/util/List;[J[JLjava/util/Optional;Ljava/util/Map;)V",
                 &[
                     JValue::Object(&removed_fragment_ids_obj),
                     JValue::Object(&updated_fragments_obj),
@@ -467,6 +502,7 @@ fn convert_to_java_operation_inner<'local>(
                     JValueGen::Object(&fields_modified),
                     JValueGen::Object(&fields_for_preserving_frag_bitmap),
                     JValue::Object(&update_mode_optional),
+                    JValue::Object(&java_offsets_map),
                 ],
             )?)
}
@@ -1232,6 +1268,39 @@ fn convert_to_rust_operation(
                 update_mode.extract_object(env)
             })?;
 
+            let updated_fragment_offsets = {
+                let offsets_obj = env
+                    .call_method(
+                        java_operation,
+                        "updatedFragmentOffsets",
+                        "()Ljava/util/Map;",
+                        &[],
+                    )?
+                    .l()?;
+                if offsets_obj.is_null() {
+                    None
+                } else {
+                    let jmap = JMap::from_env(env, &offsets_obj)?;
+                    let mut iter = jmap.iter(env)?;
+                    let mut offsets: HashMap<u64, RoaringBitmap> = HashMap::new();
+                    // Per-entry local frame: key/value refs are released every iteration.
+                    while env.with_local_frame(4, |env| {
+                        let Some((key, value)) = iter.next(env)? else {
+                            return Ok::<bool, Error>(false);
+                        };
+                        let frag_id = env.call_method(&key, "longValue", "()J", &[])?.j()? as u64;
+                        let buf: Vec<u8> = env.convert_byte_array(JByteArray::from(value))?;
+                        offsets.insert(frag_id, RoaringBitmap::deserialize_from(buf.as_slice())?);
+                        Ok(true)
+                    })? {}
+                    if offsets.is_empty() {
+                        None
+                    } else {
+                        Some(UpdatedFragmentOffsets(offsets))
+                    }
+                }
+            };
+
             Operation::Update {
                 removed_fragment_ids,
                 updated_fragments,
@@ -1241,7 +1310,7 @@ fn convert_to_rust_operation(
                 fields_for_preserving_frag_bitmap,
                 update_mode,
                 inserted_rows_filter: None,
-                updated_fragment_offsets: None,
+                updated_fragment_offsets,
             }
         }
         "DataReplacement" => {
diff --git a/java/src/main/java/org/lance/operation/Update.java b/java/src/main/java/org/lance/operation/Update.java
index f886942b4b9..721bbd84b47 100644
--- a/java/src/main/java/org/lance/operation/Update.java
+++ b/java/src/main/java/org/lance/operation/Update.java
@@ -20,6 +20,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -31,19 +32,30 @@ public class Update implements Operation {
   private final long[] fieldsForPreservingFragBitmap;
   private final Optional<UpdateMode> updateMode;
 
+  /**
+   * Per-fragment matched row offsets serialized as portable RoaringBitmap bytes (little-endian,
+   * spec-compliant).
Keys are fragment ids; values are the serialized bitmap for the local physical + * row offsets (0-based) within the fragment whose columns were rewritten. Empty map means the + * caller did not supply offsets and the partial last_updated refresh in build_manifest will not + * activate. + */ + private final Map updatedFragmentOffsets; + private Update( List removedFragmentIds, List updatedFragments, List newFragments, long[] fieldsModified, long[] fieldsForPreservingFragBitmap, - Optional updateMode) { + Optional updateMode, + Map updatedFragmentOffsets) { this.removedFragmentIds = removedFragmentIds; this.updatedFragments = updatedFragments; this.newFragments = newFragments; this.fieldsModified = fieldsModified; this.fieldsForPreservingFragBitmap = fieldsForPreservingFragBitmap; this.updateMode = updateMode; + this.updatedFragmentOffsets = updatedFragmentOffsets; } public static Builder builder() { @@ -74,6 +86,10 @@ public Optional updateMode() { return updateMode; } + public Map updatedFragmentOffsets() { + return updatedFragmentOffsets; + } + @Override public String name() { return "Update"; @@ -87,6 +103,7 @@ public String toString() { .add("fieldsModified", fieldsModified) .add("fieldsForPreservingFragBitmap", fieldsForPreservingFragBitmap) .add("updateMode", updateMode) + .add("updatedFragmentOffsets", updatedFragmentOffsets) .toString(); } @@ -100,7 +117,32 @@ public boolean equals(Object o) { && Objects.equals(newFragments, that.newFragments) && Arrays.equals(fieldsModified, that.fieldsModified) && Arrays.equals(fieldsForPreservingFragBitmap, that.fieldsForPreservingFragBitmap) - && Objects.equals(updateMode, that.updateMode); + && Objects.equals(updateMode, that.updateMode) + && offsetMapsEqual(updatedFragmentOffsets, that.updatedFragmentOffsets); + } + + /** Deep-equality for {@code Map}: keys by value, arrays by content. 
*/
+  private static boolean offsetMapsEqual(Map<Long, byte[]> a, Map<Long, byte[]> b) {
+    if (a == b) return true;
+    if (a.size() != b.size()) return false;
+    for (Map.Entry<Long, byte[]> entry : a.entrySet()) {
+      // containsKey guard: a key missing from b must not equal a key mapped to null in a
+      // (Arrays.equals(null, null) is true, so b.get alone cannot tell the two apart).
+      Long key = entry.getKey();
+      if (!b.containsKey(key) || !Arrays.equals(entry.getValue(), b.get(key))) return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int mapHash = 0; // order-independent: sum of (key hash XOR bitmap-content hash)
+    for (Map.Entry<Long, byte[]> entry : updatedFragmentOffsets.entrySet()) {
+      mapHash += Long.hashCode(entry.getKey()) ^ Arrays.hashCode(entry.getValue());
+    }
+    int h = Objects.hash(removedFragmentIds, updatedFragments, newFragments, updateMode);
+    h = 31 * h + Arrays.hashCode(fieldsModified);
+    h = 31 * h + Arrays.hashCode(fieldsForPreservingFragBitmap);
+    return 31 * h + mapHash;
   }
 
   public enum UpdateMode {
@@ -115,6 +157,7 @@ public static class Builder {
     private long[] fieldsModified = new long[0];
     private long[] fieldsForPreservingFragBitmap = new long[0];
     private Optional<UpdateMode> updateMode = Optional.empty();
+    private Map<Long, byte[]> updatedFragmentOffsets = Collections.emptyMap();
 
     private Builder() {}
 
@@ -148,6 +191,20 @@ public Builder updateMode(Optional<UpdateMode> updateMode) {
       return this;
     }
 
+    /**
+     * Set the per-fragment matched row offsets for a RewriteColumns commit.
+     *
+     * <p>Keys are fragment ids; values are portable RoaringBitmap bytes (little-endian,
+     * spec-compliant serialization) encoding the local physical row offsets (0-based) within the
+     * fragment that matched the update_columns hash join. When non-empty and update mode is
+     * RewriteColumns with stable row IDs enabled, build_manifest will call the partial last_updated
+     * refresh for those offsets only.
+     */
+    public Builder updatedFragmentOffsets(Map<Long, byte[]> updatedFragmentOffsets) {
+      this.updatedFragmentOffsets = updatedFragmentOffsets;
+      return this;
+    }
+
     public Update build() {
       return new Update(
           removedFragmentIds,
@@ -155,7 +212,8 @@ public Update build() {
           newFragments,
           fieldsModified,
           fieldsForPreservingFragBitmap,
-          updateMode);
+          updateMode,
+          updatedFragmentOffsets);
     }
   }
 }
diff --git a/java/src/test/java/org/lance/operation/UpdateTest.java b/java/src/test/java/org/lance/operation/UpdateTest.java
index bb39a5f4d12..120689e8798 100644
--- a/java/src/test/java/org/lance/operation/UpdateTest.java
+++ b/java/src/test/java/org/lance/operation/UpdateTest.java
@@ -36,10 +36,14 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class UpdateTest extends OperationTestBase {
@@ -104,6 +108,86 @@ void testUpdate(@TempDir Path tempDir) throws Exception {
     }
   }
 
+  @Test
+  void testUpdatedFragmentOffsetsRoundTrip(@TempDir Path tempDir) throws Exception {
+    String datasetPath = tempDir.resolve("testUpdatedFragmentOffsetsRoundTrip").toString();
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      TestUtils.SimpleTestDataset testDataset =
+          new TestUtils.SimpleTestDataset(allocator,
datasetPath);
+      dataset = testDataset.createEmptyDataset();
+
+      // Append an initial fragment so we have a real fragment id.
+      FragmentMetadata fragmentMeta = testDataset.createNewFragment(10);
+      try (Transaction appendTxn =
+          new Transaction.Builder()
+              .readVersion(dataset.version())
+              .operation(
+                  Append.builder().fragments(Collections.singletonList(fragmentMeta)).build())
+              .build()) {
+        new CommitBuilder(dataset).execute(appendTxn).close();
+      }
+
+      dataset = Dataset.open(datasetPath, allocator);
+      long fragmentId = dataset.getFragments().get(0).getId();
+      FragmentMetadata newFragment = testDataset.createNewFragment(10);
+
+      // Build Update with non-empty updatedFragmentOffsets. Values are portable RoaringBitmap
+      // bytes encoding {1, 3, 5}: cookie(4) + containerCount(4) + key(2) + card-1(2) +
+      // offset(4) + elems(6). Offset = 16 (start of container data from beginning of stream).
+      Map<Long, byte[]> offsets = new HashMap<>();
+      offsets.put(
+          fragmentId,
+          new byte[] {
+            (byte) 0x3A,
+            (byte) 0x30,
+            (byte) 0x00,
+            (byte) 0x00, // cookie = 12346
+            (byte) 0x01,
+            (byte) 0x00,
+            (byte) 0x00,
+            (byte) 0x00, // 1 container
+            (byte) 0x00,
+            (byte) 0x00, // container key 0
+            (byte) 0x02,
+            (byte) 0x00, // cardinality - 1 = 2
+            (byte) 0x10,
+            (byte) 0x00,
+            (byte) 0x00,
+            (byte) 0x00, // offset = 16
+            (byte) 0x01,
+            (byte) 0x00, // element 1
+            (byte) 0x03,
+            (byte) 0x00, // element 3
+            (byte) 0x05,
+            (byte) 0x00 // element 5
+          });
+
+      try (Transaction updateTxn =
+          new Transaction.Builder()
+              .readVersion(dataset.version())
+              .operation(
+                  Update.builder()
+                      .removedFragmentIds(Collections.singletonList(fragmentId))
+                      .newFragments(Collections.singletonList(newFragment))
+                      .updateMode(Optional.of(UpdateMode.RewriteRows))
+                      .updatedFragmentOffsets(offsets)
+                      .build())
+              .build()) {
+        try (Dataset committed = new CommitBuilder(dataset).execute(updateTxn)) {
+          // Read the committed transaction back (exercises the IntoJava JNI path).
+          try (Transaction readTx = committed.readTransaction().orElseThrow()) {
+            assertInstanceOf(Update.class, readTx.operation());
+            Update readOp = (Update) readTx.operation();
+
+            Map<Long, byte[]> readOffsets = readOp.updatedFragmentOffsets();
+            assertEquals(1, readOffsets.size());
+            assertArrayEquals(offsets.get(fragmentId), readOffsets.get(fragmentId));
+          }
+        }
+      }
+    }
+  }
+
   @Test
   void testUpdateColumns(@TempDir Path tempDir) throws Exception {
     String datasetPath = tempDir.resolve("testUpdateColumns").toString();