diff --git a/java/src/main/java/org/lance/ipc/LanceScanner.java b/java/src/main/java/org/lance/ipc/LanceScanner.java index 3a413e0ccfd..a6db09f2ea8 100644 --- a/java/src/main/java/org/lance/ipc/LanceScanner.java +++ b/java/src/main/java/org/lance/ipc/LanceScanner.java @@ -146,6 +146,44 @@ public ArrowReader scanBatches() { } } + /** + * Populate a caller-provided {@link ArrowArrayStream} with this scan's results, using the C Data + * Interface release callback to return ownership. + * + *

Unlike {@link #scanBatches()}, no Java Arrow {@link ArrowReader} is created: the caller + * supplies a stream that they allocated (typically from their own {@link + * org.apache.arrow.memory.BufferAllocator}), and Lance writes the C struct directly into it. This + * lets a downstream consumer drive the read loop with their own Arrow runtime, which is required + * when the caller and Lance are loaded by different classloaders / different Arrow versions. + * + *

The caller owns the stream and is responsible for closing it. The release callback installed + * on the C struct routes back through Lance's native side. + * + *

Example: + * + *

{@code
+   * try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(callerAllocator)) {
+   *   scanner.exportArrowStream(stream);
+   *   try (ArrowReader reader = Data.importArrayStream(callerAllocator, stream)) {
+   *     while (reader.loadNextBatch()) {
+   *       VectorSchemaRoot batch = reader.getVectorSchemaRoot();
+   *       // ...
+   *     }
+   *   }
+   * }
+   * }
+ * + * @param stream the caller-allocated stream to populate + * @throws IOException if the native scan fails to start + */ + public void exportArrowStream(ArrowArrayStream stream) throws IOException { + Preconditions.checkNotNull(stream); + try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { + Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed"); + openStream(stream.memoryAddress()); + } + } + private native void openStream(long streamAddress) throws IOException; @Override diff --git a/java/src/test/java/org/lance/ScannerTest.java b/java/src/test/java/org/lance/ScannerTest.java index 00434034b64..82452385e1a 100644 --- a/java/src/test/java/org/lance/ScannerTest.java +++ b/java/src/test/java/org/lance/ScannerTest.java @@ -22,6 +22,8 @@ import org.lance.ipc.ScanOptions; import org.lance.ipc.ScanStats; +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; import org.apache.arrow.dataset.scanner.Scanner; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -158,6 +160,51 @@ void testDatasetScannerSchema(@TempDir Path tempDir) throws Exception { } } + @Test + void testDatasetScannerExportArrowStream(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("dataset_scanner_export_stream").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + int totalRows = 40; + int batchRows = 20; + try (Dataset dataset = testDataset.write(1, totalRows)) { + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .batchSize(batchRows) + .columns(Arrays.asList("id")) + .build())) { + // Caller allocates the C stream from their own allocator; the scanner only fills the + // C struct. This is the path callers loaded by a different classloader use to avoid + // sharing Java Arrow vector classes with Lance. + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream); + try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + int index = 0; + while (reader.loadNextBatch()) { + List fieldVectors = root.getFieldVectors(); + assertEquals(1, fieldVectors.size()); + FieldVector fieldVector = fieldVectors.get(0); + assertEquals( + ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID()); + assertEquals(batchRows, fieldVector.getValueCount()); + IntVector vector = (IntVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + assertEquals(index, vector.get(i)); + index++; + } + } + assertEquals(totalRows, index); + } + } + } + } + } + } + @Test void testDatasetScannerCountRows(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("dataset_scanner_count").toString();