From 494e00a623d7f2e4d5f6fdfe712117484e698f32 Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 20 May 2026 10:53:52 +0900
Subject: [PATCH 1/2] fix: stream results
---
docs/DESIGN.md | 10 +-
...2026-05-20-true-streaming-query-results.md | 608 ++++++++++++++++++
...-20-true-streaming-query-results-design.md | 141 ++++
src/adbc_driver_quack.cc | 4 +-
src/duckdb_arrow_stream.cc | 145 ++++-
src/duckdb_arrow_stream.h | 7 +-
tests/duckdb_arrow_stream_test.cc | 181 +++++-
7 files changed, 1026 insertions(+), 70 deletions(-)
create mode 100644 docs/superpowers/plans/2026-05-20-true-streaming-query-results.md
create mode 100644 docs/superpowers/specs/2026-05-20-true-streaming-query-results-design.md
diff --git a/docs/DESIGN.md b/docs/DESIGN.md
index c2b217b..1495f37 100644
--- a/docs/DESIGN.md
+++ b/docs/DESIGN.md
@@ -35,8 +35,8 @@ root with CMake. There is no `rust/` or `go/` implementation directory.
and bulk ingest.
- `src/get_info_stream.cc` builds the Arrow C Stream result for
`AdbcConnectionGetInfo` with nanoarrow.
-- `src/duckdb_arrow_stream.cc` adapts DuckDB Arrow query results to the Arrow C
- Stream interface.
+- `src/duckdb_arrow_stream.cc` adapts DuckDB streaming query results to the
+ Arrow C Stream interface.
- `src/quack_uri.cc` parses `quack://...` connection URIs.
- `src/sql_escape.cc` builds escaped DuckDB SQL string literals and remote
query wrappers.
@@ -73,6 +73,12 @@ Statement execution wraps caller SQL with Quack remote execution through
creates or updates the remote table through Quack, clears Quack metadata cache,
and inserts into `remote..`.
+Result-producing statement execution and GetObjects use DuckDB streaming query
+results and convert each fetched DuckDB chunk to Arrow on demand. Callers must
+consume or release a returned Arrow stream before issuing another query on the
+same connection because DuckDB permits only one active streaming query result
+per connection.
+
## Metadata APIs
Implemented metadata support currently includes `AdbcConnectionGetInfo`.
diff --git a/docs/superpowers/plans/2026-05-20-true-streaming-query-results.md b/docs/superpowers/plans/2026-05-20-true-streaming-query-results.md
new file mode 100644
index 0000000..fc5bfa8
--- /dev/null
+++ b/docs/superpowers/plans/2026-05-20-true-streaming-query-results.md
@@ -0,0 +1,608 @@
+
+
+# True Streaming Query Results Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Replace DuckDB's materialized Arrow query API with a true chunk-fetching Arrow C Stream adapter for statement queries and GetObjects.
+
+**Architecture:** Keep streaming logic isolated in `src/duckdb_arrow_stream.*`. The helper will own a DuckDB C++ `QueryResult`, fetch one `DataChunk` per `get_next`, and convert each chunk to Arrow only when requested.
+
+**Tech Stack:** C++20, DuckDB C++ API, Arrow C Data/Stream interfaces, Googletest, CMake, Pixi validation.
+
+---
+
+## File Structure
+
+- Modify `src/duckdb_arrow_stream.h`: rename the helper declaration to `ExecuteDuckDbStreamingArrowQuery`.
+- Modify `src/duckdb_arrow_stream.cc`: replace `duckdb_query_arrow` usage with `duckdb::Connection::SendQuery`, `duckdb::QueryResult::TryFetch`, `duckdb::ArrowConverter::ToArrowSchema`, and `duckdb::ArrowConverter::ToArrowArray`.
+- Modify `src/adbc_driver_quack.cc`: update `DriverStatementExecuteQuery` and `DriverConnectionGetObjects` to call the renamed helper.
+- Modify `tests/duckdb_arrow_stream_test.cc`: update the existing test and add streaming-specific coverage.
+- Optionally modify `docs/DESIGN.md`: update the helper description from "DuckDB Arrow query results" to "DuckDB streaming query results" if the code rename makes the existing description stale.
+
+## Task 1: Rename Helper API and Update Call Sites
+
+**Files:**
+- Modify: `src/duckdb_arrow_stream.h`
+- Modify: `src/duckdb_arrow_stream.cc`
+- Modify: `src/adbc_driver_quack.cc`
+- Modify: `tests/duckdb_arrow_stream_test.cc`
+
+- [ ] **Step 1: Rename the declaration**
+
+In `src/duckdb_arrow_stream.h`, replace:
+
+```cpp
+DuckDbArrowQueryResult ExecuteDuckDbArrowQuery(duckdb_connection connection,
+ std::string_view sql,
+ ArrowArrayStream* out,
+ int64_t* rows_affected);
+```
+
+with:
+
+```cpp
+DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery(
+ duckdb_connection connection, std::string_view sql, ArrowArrayStream* out,
+ int64_t* rows_affected);
+```
+
+- [ ] **Step 2: Rename the definition**
+
+In `src/duckdb_arrow_stream.cc`, replace the function signature:
+
+```cpp
+DuckDbArrowQueryResult ExecuteDuckDbArrowQuery(duckdb_connection connection,
+ std::string_view sql,
+ ArrowArrayStream* out,
+ int64_t* rows_affected) {
+```
+
+with:
+
+```cpp
+DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery(
+ duckdb_connection connection, std::string_view sql, ArrowArrayStream* out,
+ int64_t* rows_affected) {
+```
+
+- [ ] **Step 3: Update driver call sites**
+
+In `src/adbc_driver_quack.cc`, replace both calls:
+
+```cpp
+adbc_driver_quack::ExecuteDuckDbArrowQuery(
+```
+
+with:
+
+```cpp
+adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
+```
+
+- [ ] **Step 4: Update test call sites**
+
+In `tests/duckdb_arrow_stream_test.cc`, replace:
+
+```cpp
+adbc_driver_quack::ExecuteDuckDbArrowQuery(
+```
+
+with:
+
+```cpp
+adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
+```
+
+- [ ] **Step 5: Build to confirm the mechanical rename**
+
+Run:
+
+```bash
+cmake --build build/ci-test-linux-amd64 --target adbc_driver_quack_helper_tests
+```
+
+Expected: the target either builds successfully or fails only because Task 2 has not yet replaced the helper internals.
+
+## Task 2: Replace Materialized DuckDB Arrow State With Streaming Query State
+
+**Files:**
+- Modify: `src/duckdb_arrow_stream.cc`
+
+- [ ] **Step 1: Update includes**
+
+At the top of `src/duckdb_arrow_stream.cc`, add these includes:
+
+```cpp
+#include
+#include
+#include
+#include
+#include
+```
+
+Keep the existing standard headers.
+
+- [ ] **Step 2: Change stream state ownership**
+
+Replace:
+
+```cpp
+struct DuckDbArrowStreamState {
+ duckdb_arrow result = nullptr;
+ std::string last_error;
+};
+```
+
+with:
+
+```cpp
+struct DuckDbArrowStreamState {
+ duckdb::unique_ptr result;
+ duckdb::unordered_map<
+ duckdb::idx_t, const duckdb::shared_ptr>
+ extension_types;
+ std::string last_error;
+};
+```
+
+- [ ] **Step 3: Replace DuckDB Arrow error helper**
+
+Replace `DuckDbArrowError(...)` with:
+
+```cpp
+std::string DuckDbErrorMessage(duckdb::QueryResult const* result,
+ std::string fallback) {
+ if (result == nullptr || !result->HasError()) {
+ return fallback;
+ }
+ std::string const& error = result->GetError();
+ if (error.empty()) {
+ return fallback;
+ }
+ return error;
+}
+```
+
+- [ ] **Step 4: Update release callback**
+
+Replace the body of `StreamRelease` with:
+
+```cpp
+void StreamRelease(ArrowArrayStream* stream) {
+ auto* state = GetState(stream);
+ if (state == nullptr) {
+ ResetStream(stream);
+ return;
+ }
+ delete state;
+ ResetStream(stream);
+}
+```
+
+The `duckdb::unique_ptr` destructor closes the underlying result.
+
+## Task 3: Implement Streaming Schema and Chunk Fetching
+
+**Files:**
+- Modify: `src/duckdb_arrow_stream.cc`
+
+- [ ] **Step 1: Rewrite `StreamGetSchema`**
+
+Replace the callback body with:
+
+```cpp
+int StreamGetSchema(ArrowArrayStream* stream, ArrowSchema* out) {
+ auto* state = GetState(stream);
+ if (state == nullptr || state->result == nullptr || out == nullptr) {
+ return EINVAL;
+ }
+ std::memset(out, 0, sizeof(*out));
+ try {
+ duckdb::ArrowConverter::ToArrowSchema(
+ out, state->result->types, state->result->names,
+ state->result->client_properties);
+ } catch (duckdb::Exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (std::exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (...) {
+ state->last_error = "unknown error while getting DuckDB Arrow schema";
+ return EIO;
+ }
+ state->last_error.clear();
+ return 0;
+}
+```
+
+- [ ] **Step 2: Rewrite `StreamGetNext`**
+
+Replace the callback body with:
+
+```cpp
+int StreamGetNext(ArrowArrayStream* stream, ArrowArray* out) {
+ auto* state = GetState(stream);
+ if (state == nullptr || state->result == nullptr || out == nullptr) {
+ return EINVAL;
+ }
+ std::memset(out, 0, sizeof(*out));
+
+ duckdb::ErrorData fetch_error;
+ duckdb::unique_ptr chunk;
+ if (!state->result->TryFetch(chunk, fetch_error)) {
+ state->last_error = fetch_error.Message();
+ if (state->last_error.empty()) {
+ state->last_error = "failed to fetch DuckDB result chunk";
+ }
+ return EIO;
+ }
+ if (chunk == nullptr || chunk->size() == 0) {
+ state->last_error.clear();
+ return 0;
+ }
+
+ try {
+ duckdb::ArrowConverter::ToArrowArray(
+ *chunk, out, state->result->client_properties, state->extension_types);
+ } catch (duckdb::Exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (std::exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (...) {
+ state->last_error = "unknown error while converting DuckDB chunk to Arrow";
+ return EIO;
+ }
+
+ state->last_error.clear();
+ return 0;
+}
+```
+
+- [ ] **Step 3: Keep `StreamGetLastError` unchanged**
+
+Verify it still returns `nullptr` when `last_error` is empty and a stable C string otherwise.
+
+## Task 4: Implement Streaming Query Creation
+
+**Files:**
+- Modify: `src/duckdb_arrow_stream.cc`
+
+- [ ] **Step 1: Replace materialized query execution**
+
+Inside `ExecuteDuckDbStreamingArrowQuery`, replace the `duckdb_query_arrow` block and the old `duckdb_destroy_arrow` cleanup with:
+
+```cpp
+auto* duckdb_connection_ptr =
+ reinterpret_cast(connection);
+duckdb::unique_ptr query_result;
+try {
+ query_result = duckdb_connection_ptr->SendQuery(
+ std::string(sql), duckdb::QueryResultOutputType::ALLOW_STREAMING);
+} catch (duckdb::Exception const& ex) {
+ return Error(ADBC_STATUS_IO, ex.what());
+} catch (std::exception const& ex) {
+ return Error(ADBC_STATUS_IO, ex.what());
+} catch (...) {
+ return Error(ADBC_STATUS_IO, "unknown DuckDB query error");
+}
+
+if (query_result == nullptr) {
+ return Error(ADBC_STATUS_IO, "DuckDB query returned no result");
+}
+if (query_result->HasError()) {
+ return Error(ADBC_STATUS_IO,
+ DuckDbErrorMessage(query_result.get(), "DuckDB query failed"),
+ static_cast(query_result->GetErrorType()));
+}
+```
+
+- [ ] **Step 2: Preserve row count behavior**
+
+After the error check, add:
+
+```cpp
+if (rows_affected != nullptr) {
+ *rows_affected = -1;
+}
+```
+
+- [ ] **Step 3: Handle `out == nullptr`**
+
+Keep a no-output branch that discards the result object:
+
+```cpp
+if (out == nullptr) {
+ return {};
+}
+```
+
+- [ ] **Step 4: Compute extension type conversions once**
+
+Before allocating `DuckDbArrowStreamState`, add:
+
+```cpp
+duckdb::unordered_map<
+ duckdb::idx_t, const duckdb::shared_ptr>
+ extension_types;
+try {
+ extension_types = duckdb::ArrowTypeExtensionData::GetExtensionTypes(
+ *query_result->client_properties.client_context, query_result->types);
+} catch (duckdb::Exception const& ex) {
+ return Error(ADBC_STATUS_IO, ex.what());
+} catch (std::exception const& ex) {
+ return Error(ADBC_STATUS_IO, ex.what());
+} catch (...) {
+ return Error(ADBC_STATUS_IO,
+ "unknown error while preparing DuckDB Arrow conversions");
+}
+```
+
+- [ ] **Step 5: Install stream callbacks**
+
+Replace the old state allocation with:
+
+```cpp
+auto* state = new (std::nothrow) DuckDbArrowStreamState{
+ std::move(query_result), std::move(extension_types), {}};
+if (state == nullptr) {
+ return Error(ADBC_STATUS_UNKNOWN, "failed to allocate DuckDB Arrow stream");
+}
+
+out->private_data = state;
+out->get_schema = StreamGetSchema;
+out->get_next = StreamGetNext;
+out->get_last_error = StreamGetLastError;
+out->release = StreamRelease;
+return {};
+```
+
+- [ ] **Step 6: Add active stream comment**
+
+Add this comment immediately above `SendQuery`:
+
+```cpp
+// DuckDB allows one active StreamQueryResult per connection. ADBC callers
+// must consume or release the returned ArrowArrayStream before issuing another
+// query on the same connection.
+```
+
+## Task 5: Update and Expand Helper Tests
+
+**Files:**
+- Modify: `tests/duckdb_arrow_stream_test.cc`
+
+- [ ] **Step 1: Add RAII helpers for DuckDB setup**
+
+Keep the existing simple setup if preferred. Add a local helper function near the top of the test file:
+
+```cpp
+struct DuckDbFixture {
+ duckdb_database database = nullptr;
+ duckdb_connection connection = nullptr;
+
+ DuckDbFixture() {
+ EXPECT_EQ(duckdb_open(nullptr, &database), DuckDBSuccess);
+ EXPECT_EQ(duckdb_connect(database, &connection), DuckDBSuccess);
+ }
+
+ ~DuckDbFixture() {
+ duckdb_disconnect(&connection);
+ duckdb_close(&database);
+ }
+};
+```
+
+- [ ] **Step 2: Update the existing smoke test**
+
+Replace manual open/connect/close in `ExecutesQueryAsArrowStream` with:
+
+```cpp
+DuckDbFixture fixture;
+
+ArrowArrayStream stream = {};
+int64_t rows_affected = 0;
+auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
+ fixture.connection, "SELECT 42 AS answer", &stream, &rows_affected);
+```
+
+Keep the existing schema, first-array, end-of-stream, and release assertions.
+
+- [ ] **Step 3: Add a multi-chunk streaming test**
+
+Add:
+
+```cpp
+TEST(DuckDbArrowStreamTest, FetchesLargeQueryInMultipleArrays) {
+ DuckDbFixture fixture;
+
+ ArrowArrayStream stream = {};
+ auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
+ fixture.connection, "SELECT range AS value FROM range(10000)", &stream,
+ nullptr);
+
+ ASSERT_EQ(result.status, ADBC_STATUS_OK);
+ ASSERT_NE(stream.get_next, nullptr);
+
+ int array_count = 0;
+ int64_t row_count = 0;
+ while (true) {
+ ArrowArray array = {};
+ ASSERT_EQ(stream.get_next(&stream, &array), 0);
+ if (array.release == nullptr) {
+ break;
+ }
+ ++array_count;
+ row_count += array.length;
+ array.release(&array);
+ }
+
+ EXPECT_GT(array_count, 1);
+ EXPECT_EQ(row_count, 10000);
+ stream.release(&stream);
+}
+```
+
+- [ ] **Step 4: Add a query error test**
+
+Add:
+
+```cpp
+TEST(DuckDbArrowStreamTest, ReportsQueryErrors) {
+ DuckDbFixture fixture;
+
+ ArrowArrayStream stream = {};
+ auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
+ fixture.connection, "SELECT * FROM missing_table", &stream, nullptr);
+
+ EXPECT_EQ(result.status, ADBC_STATUS_IO);
+ EXPECT_FALSE(result.message.empty());
+ EXPECT_EQ(stream.release, nullptr);
+}
+```
+
+- [ ] **Step 5: Add callback argument validation test**
+
+Add:
+
+```cpp
+TEST(DuckDbArrowStreamTest, RejectsInvalidCallbackArguments) {
+ DuckDbFixture fixture;
+
+ ArrowArrayStream stream = {};
+ auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
+ fixture.connection, "SELECT 1", &stream, nullptr);
+ ASSERT_EQ(result.status, ADBC_STATUS_OK);
+
+ ArrowSchema schema = {};
+ ArrowArray array = {};
+ EXPECT_EQ(stream.get_schema(nullptr, &schema), EINVAL);
+ EXPECT_EQ(stream.get_schema(&stream, nullptr), EINVAL);
+ EXPECT_EQ(stream.get_next(nullptr, &array), EINVAL);
+ EXPECT_EQ(stream.get_next(&stream, nullptr), EINVAL);
+
+ stream.release(&stream);
+}
+```
+
+- [ ] **Step 6: Run helper tests**
+
+Run:
+
+```bash
+cmake --build build/ci-test-linux-amd64 --target adbc_driver_quack_helper_tests
+./build/ci-test-linux-amd64/tests/adbc_driver_quack_helper_tests
+```
+
+Expected: all helper tests pass.
+
+## Task 6: Update Project Design Notes
+
+**Files:**
+- Modify: `docs/DESIGN.md`
+
+- [ ] **Step 1: Update helper description**
+
+In `docs/DESIGN.md`, replace:
+
+```markdown
+- `src/duckdb_arrow_stream.cc` adapts DuckDB Arrow query results to the Arrow C
+ Stream interface.
+```
+
+with:
+
+```markdown
+- `src/duckdb_arrow_stream.cc` adapts DuckDB streaming query results to the
+ Arrow C Stream interface.
+```
+
+- [ ] **Step 2: Add runtime note**
+
+After the paragraph that starts with `Statement execution wraps caller SQL`, add:
+
+```markdown
+Result-producing statement execution and GetObjects use DuckDB streaming query
+results and convert each fetched DuckDB chunk to Arrow on demand. Callers must
+consume or release a returned Arrow stream before issuing another query on the
+same connection because DuckDB permits only one active streaming query result
+per connection.
+```
+
+## Task 7: Full Verification
+
+**Files:**
+- No source changes expected.
+
+- [ ] **Step 1: Run CI-style build**
+
+Run:
+
+```bash
+./ci/scripts/build.sh test linux amd64
+```
+
+Expected: build succeeds without compiler warnings.
+
+- [ ] **Step 2: Run C++ tests**
+
+Run:
+
+```bash
+./ci/scripts/test.sh linux amd64
+```
+
+Expected: all C++ tests pass.
+
+- [ ] **Step 3: Run validation collection**
+
+Run:
+
+```bash
+pixi run validate --collect-only
+```
+
+Expected: validation tests collect successfully.
+
+- [ ] **Step 4: Run GetObjects validation subset**
+
+Run:
+
+```bash
+pixi run validate -k get_objects
+```
+
+Expected: GetObjects validation passes.
+
+- [ ] **Step 5: Run pre-commit outside the sandbox**
+
+Run outside the sandbox:
+
+```bash
+pre-commit run --all-files
+```
+
+Expected: all hooks pass. Include any lockfile or formatting updates produced by hooks in the final diff if they are relevant.
+
+## Self-Review
+
+- Spec coverage: the plan covers both `AdbcStatementExecuteQuery` and `DriverConnectionGetObjects`, avoids deprecated C streaming APIs, converts one chunk per `get_next`, preserves Arrow C Stream behavior, and documents DuckDB's one-active-stream constraint.
+- Placeholder scan: no task contains unresolved placeholders.
+- Type consistency: the plan consistently uses `ExecuteDuckDbStreamingArrowQuery`, `DuckDbArrowQueryResult`, `duckdb::QueryResult`, `duckdb::DataChunk`, and Arrow C Stream callbacks.
diff --git a/docs/superpowers/specs/2026-05-20-true-streaming-query-results-design.md b/docs/superpowers/specs/2026-05-20-true-streaming-query-results-design.md
new file mode 100644
index 0000000..486e303
--- /dev/null
+++ b/docs/superpowers/specs/2026-05-20-true-streaming-query-results-design.md
@@ -0,0 +1,141 @@
+
+
+# True Streaming Query Results Design
+
+## Goal
+
+Make result-producing driver APIs stream DuckDB result chunks instead of
+materializing the full result set through `duckdb_query_arrow`.
+
+This applies to both:
+
+- `AdbcStatementExecuteQuery` for SQL query execution.
+- `AdbcConnectionGetObjects` for validation metadata results.
+
+## Selected Approach
+
+Use DuckDB's C++ streaming query API:
+
+```cpp
+duckdb::Connection::SendQuery(sql, duckdb::QueryResultOutputType::ALLOW_STREAMING)
+```
+
+The returned `duckdb::QueryResult` may be a `StreamQueryResult` or a
+`MaterializedQueryResult`, depending on DuckDB and statement shape. The helper
+will not call `duckdb_query_arrow`, so it will not request DuckDB's deprecated
+materialized Arrow result API. Result consumption will use `QueryResult::Fetch`
+or `QueryResult::TryFetch` and convert each `duckdb::DataChunk` to Arrow as it
+is requested by `ArrowArrayStream::get_next`.
+
+The deprecated DuckDB C streaming APIs are intentionally not used.
+
+## Architecture
+
+Replace the internals of `src/duckdb_arrow_stream.cc` with a streaming
+implementation while keeping a narrow helper boundary for callers. The helper
+will own a `duckdb::QueryResult` in `ArrowArrayStream::private_data`, expose the
+same Arrow C Stream callbacks, and release the query result when the stream is
+released.
+
+The helper API should be renamed from `ExecuteDuckDbArrowQuery` to
+`ExecuteDuckDbStreamingArrowQuery` so call sites and tests clearly distinguish
+the new behavior from the old materializing DuckDB Arrow API. Both
+`DriverStatementExecuteQuery` and `DriverConnectionGetObjects` will call the
+new helper.
+
+## Stream State
+
+The stream state will contain:
+
+```cpp
+std::unique_ptr result;
+duckdb::unordered_map>
+ extension_types;
+std::string last_error;
+```
+
+`extension_types` is computed once after a successful query using
+`duckdb::ArrowTypeExtensionData::GetExtensionTypes`, matching DuckDB's own C
+Arrow adapter behavior.
+
+## Data Flow
+
+1. The caller builds the remote Quack SQL exactly as it does today.
+2. The streaming helper calls `SendQuery(..., ALLOW_STREAMING)`.
+3. If DuckDB returns an error result, the helper converts it to
+ `ADBC_STATUS_IO` and no stream is installed.
+4. If `rows_affected` is requested for result-producing execution, the helper
+ sets it to `-1`. It does not call APIs that require materializing the result
+ only to compute a row count.
+5. `get_schema` converts `result->types` and `result->names` with
+ `duckdb::ArrowConverter::ToArrowSchema`.
+6. `get_next` fetches one `duckdb::DataChunk`. If no chunk remains, it leaves
+ `ArrowArray::release` null. If a chunk is available, it converts that chunk
+ with `duckdb::ArrowConverter::ToArrowArray`.
+7. `release` destroys the stored `QueryResult` and clears all callbacks.
+
+## Error Handling
+
+The helper should catch `duckdb::Exception`, `std::exception`, and unknown
+exceptions around query execution, schema conversion, and chunk conversion.
+Errors from callbacks are stored in `last_error` and reported through
+`get_last_error`.
+
+Callback return codes should remain POSIX-style integers:
+
+- `EINVAL` for null stream state or null output pointers.
+- `EIO` for DuckDB query/fetch/conversion failures.
+
+Driver entry points continue to convert helper failures into `AdbcError` with
+the existing `StatusError` path.
+
+## Concurrency Constraint
+
+DuckDB documents that there can be only one active `StreamQueryResult` per
+connection and that starting another query invalidates any existing streaming
+result on that connection. The driver will not add connection-level
+synchronization in this change. It will rely on the existing ADBC stream
+lifetime contract: callers must consume or release the stream before issuing
+another query on the same connection.
+
+Tests should cover the normal stream lifecycle and the behavior expected from
+chunk-by-chunk consumption. The connection-level single-active-stream rule
+should be documented in code comments where the streaming query is created.
+
+## Testing
+
+Update the existing helper test to use the renamed streaming helper. Add tests
+that:
+
+- Fetch a simple one-row query through the Arrow C Stream callbacks.
+- Fetch a large `range(...)` query and observe more than one Arrow array.
+- Return an empty end-of-stream Arrow array with `release == nullptr`.
+- Surface syntax or catalog errors from query execution.
+- Surface invalid callback arguments with `EINVAL`.
+
+Run the CI-style build and test scripts after implementation:
+
+```bash
+./ci/scripts/build.sh test linux amd64
+./ci/scripts/test.sh linux amd64
+pixi run validate --collect-only
+pixi run validate -k get_objects
+pre-commit run --all-files
+```
+
+Codex must run `pre-commit` outside the sandbox.
diff --git a/src/adbc_driver_quack.cc b/src/adbc_driver_quack.cc
index 12e9fba..177ca65 100644
--- a/src/adbc_driver_quack.cc
+++ b/src/adbc_driver_quack.cc
@@ -1303,7 +1303,7 @@ AdbcStatusCode DriverConnectionGetObjects(
return InvalidArgument(error, "invalid GetObjects depth");
}
- auto const result = adbc_driver_quack::ExecuteDuckDbArrowQuery(
+ auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
state->connection, adbc_driver_quack::BuildRemoteQuerySql(query), out,
nullptr);
if (result.status != ADBC_STATUS_OK) {
@@ -1414,7 +1414,7 @@ AdbcStatusCode DriverStatementExecuteQuery(AdbcStatement* statement,
std::string const remote_sql =
adbc_driver_quack::BuildRemoteQuerySql(state->sql);
if (out != nullptr) {
- auto const result = adbc_driver_quack::ExecuteDuckDbArrowQuery(
+ auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
state->connection->connection, remote_sql, out, rows_affected);
if (result.status != ADBC_STATUS_OK) {
return StatusError(error, result.status, result.message,
diff --git a/src/duckdb_arrow_stream.cc b/src/duckdb_arrow_stream.cc
index 32981a7..ee4d4d7 100644
--- a/src/duckdb_arrow_stream.cc
+++ b/src/duckdb_arrow_stream.cc
@@ -16,6 +16,11 @@
#include
#include
+#include
+#include
+#include
+#include
+#include
#include
#include
#include
@@ -24,7 +29,10 @@ namespace adbc_driver_quack {
namespace {
struct DuckDbArrowStreamState {
- duckdb_arrow result = nullptr;
+ duckdb::unique_ptr result;
+ duckdb::unordered_map<
+ duckdb::idx_t, duckdb::shared_ptr const>
+ extension_types;
std::string last_error;
};
@@ -55,12 +63,13 @@ DuckDbArrowStreamState* GetState(ArrowArrayStream* stream) {
return static_cast(stream->private_data);
}
-std::string DuckDbArrowError(duckdb_arrow result, std::string fallback) {
- if (result == nullptr) {
+std::string DuckDbErrorMessage(duckdb::QueryResult const* result,
+ std::string fallback) {
+ if (result == nullptr || !result->HasError()) {
return fallback;
}
- char const* error = duckdb_query_arrow_error(result);
- if (error == nullptr || error[0] == '\0') {
+ std::string const& error = result->GetError();
+ if (error.empty()) {
return fallback;
}
return error;
@@ -72,12 +81,18 @@ int StreamGetSchema(ArrowArrayStream* stream, ArrowSchema* out) {
return EINVAL;
}
std::memset(out, 0, sizeof(*out));
- auto* schema = out;
- if (duckdb_query_arrow_schema(
- state->result, reinterpret_cast(&schema)) !=
- DuckDBSuccess) {
- state->last_error =
- DuckDbArrowError(state->result, "failed to get DuckDB Arrow schema");
+ try {
+ duckdb::ArrowConverter::ToArrowSchema(out, state->result->types,
+ state->result->names,
+ state->result->client_properties);
+ } catch (duckdb::Exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (std::exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (...) {
+ state->last_error = "unknown error while getting DuckDB Arrow schema";
return EIO;
}
state->last_error.clear();
@@ -90,14 +105,47 @@ int StreamGetNext(ArrowArrayStream* stream, ArrowArray* out) {
return EINVAL;
}
std::memset(out, 0, sizeof(*out));
- auto* array = out;
- if (duckdb_query_arrow_array(state->result,
- reinterpret_cast(&array)) !=
- DuckDBSuccess) {
- state->last_error =
- DuckDbArrowError(state->result, "failed to get DuckDB Arrow array");
+
+ duckdb::ErrorData& fetch_error = state->result->GetErrorObject();
+ duckdb::unique_ptr chunk;
+ try {
+ if (!state->result->TryFetch(chunk, fetch_error)) {
+ state->last_error = fetch_error.Message();
+ if (state->last_error.empty()) {
+ state->last_error = DuckDbErrorMessage(
+ state->result.get(), "failed to fetch DuckDB result chunk");
+ }
+ return EIO;
+ }
+ } catch (duckdb::Exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (std::exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (...) {
+ state->last_error = "unknown error while fetching DuckDB result chunk";
return EIO;
}
+ if (chunk == nullptr || chunk->size() == 0) {
+ state->last_error.clear();
+ return 0;
+ }
+
+ try {
+ duckdb::ArrowConverter::ToArrowArray(
+ *chunk, out, state->result->client_properties, state->extension_types);
+ } catch (duckdb::Exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (std::exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (...) {
+ state->last_error = "unknown error while converting DuckDB chunk to Arrow";
+ return EIO;
+ }
+
state->last_error.clear();
return 0;
}
@@ -119,45 +167,72 @@ void StreamRelease(ArrowArrayStream* stream) {
ResetStream(stream);
return;
}
- duckdb_destroy_arrow(&state->result);
delete state;
ResetStream(stream);
}
} // namespace
-DuckDbArrowQueryResult ExecuteDuckDbArrowQuery(duckdb_connection connection,
- std::string_view sql,
- ArrowArrayStream* out,
- int64_t* rows_affected) {
+DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery(
+ duckdb_connection connection, std::string_view sql, ArrowArrayStream* out,
+ int64_t* rows_affected) {
ResetStream(out);
if (connection == nullptr) {
return Error(ADBC_STATUS_INVALID_STATE, "connection is not initialized");
}
- std::string sql_storage(sql);
- duckdb_arrow result = nullptr;
- if (duckdb_query_arrow(connection, sql_storage.c_str(), &result) !=
- DuckDBSuccess) {
- std::string const message =
- DuckDbArrowError(result, "DuckDB Arrow query failed");
- duckdb_destroy_arrow(&result);
- return Error(ADBC_STATUS_IO, message);
+ auto* duckdb_connection_ptr =
+ reinterpret_cast(connection);
+ duckdb::unique_ptr query_result;
+ try {
+ // DuckDB allows one active StreamQueryResult per connection. ADBC callers
+ // must consume or release the returned ArrowArrayStream before issuing
+ // another query on the same connection.
+ query_result = duckdb_connection_ptr->SendQuery(
+ std::string(sql), duckdb::QueryResultOutputType::ALLOW_STREAMING);
+ } catch (duckdb::Exception const& ex) {
+ return Error(ADBC_STATUS_IO, ex.what());
+ } catch (std::exception const& ex) {
+ return Error(ADBC_STATUS_IO, ex.what());
+ } catch (...) {
+ return Error(ADBC_STATUS_IO, "unknown DuckDB query error");
+ }
+
+ if (query_result == nullptr) {
+ return Error(ADBC_STATUS_IO, "DuckDB query returned no result");
+ }
+ if (query_result->HasError()) {
+ return Error(ADBC_STATUS_IO,
+ DuckDbErrorMessage(query_result.get(), "DuckDB query failed"),
+ static_cast(query_result->GetErrorType()));
}
if (rows_affected != nullptr) {
- idx_t const rows_changed = duckdb_arrow_rows_changed(result);
- *rows_affected = rows_changed > 0 ? static_cast(rows_changed) : -1;
+ *rows_affected = -1;
}
if (out == nullptr) {
- duckdb_destroy_arrow(&result);
return {};
}
- auto* state = new (std::nothrow) DuckDbArrowStreamState{result, {}};
+ duckdb::unordered_map<
+ duckdb::idx_t, duckdb::shared_ptr const>
+ extension_types;
+ try {
+ extension_types = duckdb::ArrowTypeExtensionData::GetExtensionTypes(
+ *query_result->client_properties.client_context, query_result->types);
+ } catch (duckdb::Exception const& ex) {
+ return Error(ADBC_STATUS_IO, ex.what());
+ } catch (std::exception const& ex) {
+ return Error(ADBC_STATUS_IO, ex.what());
+ } catch (...) {
+ return Error(ADBC_STATUS_IO,
+ "unknown error while preparing DuckDB Arrow conversions");
+ }
+
+ auto* state = new (std::nothrow) DuckDbArrowStreamState{
+ std::move(query_result), std::move(extension_types), {}};
if (state == nullptr) {
- duckdb_destroy_arrow(&result);
return Error(ADBC_STATUS_UNKNOWN, "failed to allocate DuckDB Arrow stream");
}
diff --git a/src/duckdb_arrow_stream.h b/src/duckdb_arrow_stream.h
index 215e58e..194eb38 100644
--- a/src/duckdb_arrow_stream.h
+++ b/src/duckdb_arrow_stream.h
@@ -29,9 +29,8 @@ struct DuckDbArrowQueryResult {
int32_t vendor_code = 0;
};
-DuckDbArrowQueryResult ExecuteDuckDbArrowQuery(duckdb_connection connection,
- std::string_view sql,
- ArrowArrayStream* out,
- int64_t* rows_affected);
+DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery(
+ duckdb_connection connection, std::string_view sql, ArrowArrayStream* out,
+ int64_t* rows_affected);
} // namespace adbc_driver_quack
diff --git a/tests/duckdb_arrow_stream_test.cc b/tests/duckdb_arrow_stream_test.cc
index b0ee07b..74eb719 100644
--- a/tests/duckdb_arrow_stream_test.cc
+++ b/tests/duckdb_arrow_stream_test.cc
@@ -18,43 +18,170 @@
#include
#include
-TEST(DuckDbArrowStreamTest, ExecutesQueryAsArrowStream) {
- duckdb_database database = nullptr;
- duckdb_connection connection = nullptr;
- ASSERT_EQ(duckdb_open(nullptr, &database), DuckDBSuccess);
- ASSERT_EQ(duckdb_connect(database, &connection), DuckDBSuccess);
+#include
+struct ArrowArrayStreamGuard {
ArrowArrayStream stream = {};
+
+ ~ArrowArrayStreamGuard() { Release(); }
+
+ ArrowArrayStream* get() { return &stream; }
+
+ void Release() {
+ if (stream.release != nullptr) {
+ stream.release(&stream);
+ }
+ }
+};
+
+struct ArrowSchemaGuard {
+ ArrowSchema schema = {};
+
+ ~ArrowSchemaGuard() { Release(); }
+
+ ArrowSchema* get() { return &schema; }
+
+ void Release() {
+ if (schema.release != nullptr) {
+ schema.release(&schema);
+ }
+ }
+};
+
+struct ArrowArrayGuard {
+ ArrowArray array = {};
+
+ ~ArrowArrayGuard() { Release(); }
+
+ ArrowArray* get() { return &array; }
+
+ void Release() {
+ if (array.release != nullptr) {
+ array.release(&array);
+ }
+ }
+};
+
+class DuckDbArrowStreamTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ ASSERT_EQ(duckdb_open(nullptr, &database_), DuckDBSuccess);
+ ASSERT_EQ(duckdb_connect(database_, &connection_), DuckDBSuccess);
+ }
+
+ void TearDown() override {
+ if (connection_ != nullptr) {
+ duckdb_disconnect(&connection_);
+ }
+ if (database_ != nullptr) {
+ duckdb_close(&database_);
+ }
+ }
+
+ duckdb_connection connection() { return connection_; }
+
+ private:
+ duckdb_database database_ = nullptr;
+ duckdb_connection connection_ = nullptr;
+};
+
+TEST_F(DuckDbArrowStreamTest, ExecutesQueryAsArrowStream) {
+ ArrowArrayStreamGuard stream;
int64_t rows_affected = 0;
- auto const result = adbc_driver_quack::ExecuteDuckDbArrowQuery(
- connection, "SELECT 42 AS answer", &stream, &rows_affected);
+ auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
+ connection(), "SELECT 42 AS answer", stream.get(), &rows_affected);
EXPECT_EQ(result.status, ADBC_STATUS_OK);
EXPECT_TRUE(result.message.empty());
EXPECT_EQ(rows_affected, -1);
- ASSERT_NE(stream.get_schema, nullptr);
- ASSERT_NE(stream.get_next, nullptr);
- ASSERT_NE(stream.release, nullptr);
+ ASSERT_NE(stream.stream.get_schema, nullptr);
+ ASSERT_NE(stream.stream.get_next, nullptr);
+ ASSERT_NE(stream.stream.release, nullptr);
- ArrowSchema schema = {};
- EXPECT_EQ(stream.get_schema(&stream, &schema), 0);
- ASSERT_NE(schema.release, nullptr);
- EXPECT_STREQ(schema.name, "duckdb_query_result");
- schema.release(&schema);
+ ArrowSchemaGuard schema;
+ EXPECT_EQ(stream.stream.get_schema(stream.get(), schema.get()), 0);
+ ASSERT_NE(schema.schema.release, nullptr);
+ EXPECT_STREQ(schema.schema.name, "duckdb_query_result");
+ schema.Release();
- ArrowArray array = {};
- EXPECT_EQ(stream.get_next(&stream, &array), 0);
- EXPECT_EQ(array.length, 1);
- ASSERT_NE(array.release, nullptr);
- array.release(&array);
+ ArrowArrayGuard array;
+ EXPECT_EQ(stream.stream.get_next(stream.get(), array.get()), 0);
+ EXPECT_EQ(array.array.length, 1);
+ ASSERT_NE(array.array.release, nullptr);
+ array.Release();
+
+ ArrowArrayGuard end;
+ EXPECT_EQ(stream.stream.get_next(stream.get(), end.get()), 0);
+ EXPECT_EQ(end.array.release, nullptr);
+
+ stream.Release();
+ EXPECT_EQ(stream.stream.release, nullptr);
+}
+
+TEST_F(DuckDbArrowStreamTest, FetchesLargeQueryInMultipleArrays) {
+ ArrowArrayStreamGuard stream;
+ auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
+ connection(), "SELECT range AS value FROM range(10000)", stream.get(),
+ nullptr);
+
+ ASSERT_EQ(result.status, ADBC_STATUS_OK);
+ ASSERT_NE(stream.stream.get_next, nullptr);
+
+ int array_count = 0;
+ int64_t row_count = 0;
+ while (true) {
+ ArrowArrayGuard array;
+ ASSERT_EQ(stream.stream.get_next(stream.get(), array.get()), 0);
+ if (array.array.release == nullptr) {
+ break;
+ }
+ ++array_count;
+ row_count += array.array.length;
+ }
+
+ EXPECT_GT(array_count, 1);
+ EXPECT_EQ(row_count, 10000);
+}
+
+TEST_F(DuckDbArrowStreamTest, ReportsQueryErrors) {
+ ArrowArrayStreamGuard stream;
+ auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
+ connection(), "SELECT * FROM missing_table", stream.get(), nullptr);
+
+ EXPECT_EQ(result.status, ADBC_STATUS_IO);
+ EXPECT_FALSE(result.message.empty());
+ EXPECT_EQ(stream.stream.release, nullptr);
+}
+
+TEST_F(DuckDbArrowStreamTest, RejectsInvalidCallbackArguments) {
+ ArrowArrayStreamGuard stream;
+ auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery(
+ connection(), "SELECT 1", stream.get(), nullptr);
+ ASSERT_EQ(result.status, ADBC_STATUS_OK);
+ ASSERT_NE(stream.stream.get_schema, nullptr);
+ ASSERT_NE(stream.stream.get_next, nullptr);
+
+ {
+ ArrowSchemaGuard schema;
+ EXPECT_EQ(stream.stream.get_schema(nullptr, schema.get()), EINVAL);
+ EXPECT_EQ(schema.schema.release, nullptr);
+ }
- ArrowArray end = {};
- EXPECT_EQ(stream.get_next(&stream, &end), 0);
- EXPECT_EQ(end.release, nullptr);
+ {
+ ArrowSchemaGuard schema;
+ EXPECT_EQ(stream.stream.get_schema(stream.get(), nullptr), EINVAL);
+ EXPECT_EQ(schema.schema.release, nullptr);
+ }
- stream.release(&stream);
- EXPECT_EQ(stream.release, nullptr);
+ {
+ ArrowArrayGuard array;
+ EXPECT_EQ(stream.stream.get_next(nullptr, array.get()), EINVAL);
+ EXPECT_EQ(array.array.release, nullptr);
+ }
- duckdb_disconnect(&connection);
- duckdb_close(&database);
+ {
+ ArrowArrayGuard array;
+ EXPECT_EQ(stream.stream.get_next(stream.get(), nullptr), EINVAL);
+ EXPECT_EQ(array.array.release, nullptr);
+ }
}
From 95ca9464ff5bdb43a31b3ea5920e3e4962c9cbef Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 20 May 2026 22:38:33 +0900
Subject: [PATCH 2/2] update
---
src/duckdb_arrow_stream.cc | 103 ++++++++++++++++++++++++++++++-------
1 file changed, 85 insertions(+), 18 deletions(-)
diff --git a/src/duckdb_arrow_stream.cc b/src/duckdb_arrow_stream.cc
index ee4d4d7..ae48d75 100644
--- a/src/duckdb_arrow_stream.cc
+++ b/src/duckdb_arrow_stream.cc
@@ -19,7 +19,9 @@
#include
#include
#include
+#include
#include
+#include
#include
#include
#include
@@ -29,7 +31,11 @@ namespace adbc_driver_quack {
namespace {
struct DuckDbArrowStreamState {
+ duckdb::unique_ptr pending_result;
duckdb::unique_ptr result;
+ duckdb::vector types;
+ duckdb::vector names;
+ duckdb::ClientProperties client_properties;
duckdb::unordered_map<
duckdb::idx_t, duckdb::shared_ptr const>
extension_types;
@@ -75,16 +81,60 @@ std::string DuckDbErrorMessage(duckdb::QueryResult const* result,
return error;
}
+std::string DuckDbErrorMessage(duckdb::PendingQueryResult const* result,
+ std::string fallback) {
+ if (result == nullptr || !result->HasError()) {
+ return fallback;
+ }
+ std::string const& error = result->GetError();
+ if (error.empty()) {
+ return fallback;
+ }
+ return error;
+}
+
+int EnsureResultReady(DuckDbArrowStreamState* state) {
+ if (state->result != nullptr) {
+ return 0;
+ }
+ if (state->pending_result == nullptr) {
+ state->last_error = "DuckDB query result is not available";
+ return EINVAL;
+ }
+ try {
+ state->result = state->pending_result->Execute();
+ state->pending_result.reset();
+ } catch (duckdb::Exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (std::exception const& ex) {
+ state->last_error = ex.what();
+ return EIO;
+ } catch (...) {
+ state->last_error = "unknown DuckDB query execution error";
+ return EIO;
+ }
+ if (state->result == nullptr) {
+ state->last_error = "DuckDB query returned no result";
+ return EIO;
+ }
+ if (state->result->HasError()) {
+ state->last_error =
+ DuckDbErrorMessage(state->result.get(), "DuckDB query failed");
+ return EIO;
+ }
+ return 0;
+}
+
int StreamGetSchema(ArrowArrayStream* stream, ArrowSchema* out) {
auto* state = GetState(stream);
- if (state == nullptr || state->result == nullptr || out == nullptr) {
+ if (state == nullptr || out == nullptr) {
return EINVAL;
}
std::memset(out, 0, sizeof(*out));
try {
- duckdb::ArrowConverter::ToArrowSchema(out, state->result->types,
- state->result->names,
- state->result->client_properties);
+ duckdb::ArrowConverter::ToArrowSchema(out, state->types, state->names,
+ state->client_properties);
} catch (duckdb::Exception const& ex) {
state->last_error = ex.what();
return EIO;
@@ -101,11 +151,16 @@ int StreamGetSchema(ArrowArrayStream* stream, ArrowSchema* out) {
int StreamGetNext(ArrowArrayStream* stream, ArrowArray* out) {
auto* state = GetState(stream);
- if (state == nullptr || state->result == nullptr || out == nullptr) {
+ if (state == nullptr || out == nullptr) {
return EINVAL;
}
std::memset(out, 0, sizeof(*out));
+ int const result_status = EnsureResultReady(state);
+ if (result_status != 0) {
+ return result_status;
+ }
+
duckdb::ErrorData& fetch_error = state->result->GetErrorObject();
duckdb::unique_ptr chunk;
try {
@@ -133,8 +188,8 @@ int StreamGetNext(ArrowArrayStream* stream, ArrowArray* out) {
}
try {
- duckdb::ArrowConverter::ToArrowArray(
- *chunk, out, state->result->client_properties, state->extension_types);
+ duckdb::ArrowConverter::ToArrowArray(*chunk, out, state->client_properties,
+ state->extension_types);
} catch (duckdb::Exception const& ex) {
state->last_error = ex.what();
return EIO;
@@ -183,12 +238,12 @@ DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery(
auto* duckdb_connection_ptr =
reinterpret_cast(connection);
- duckdb::unique_ptr query_result;
+ duckdb::unique_ptr pending_result;
try {
// DuckDB allows one active StreamQueryResult per connection. ADBC callers
// must consume or release the returned ArrowArrayStream before issuing
// another query on the same connection.
- query_result = duckdb_connection_ptr->SendQuery(
+ pending_result = duckdb_connection_ptr->PendingQuery(
std::string(sql), duckdb::QueryResultOutputType::ALLOW_STREAMING);
} catch (duckdb::Exception const& ex) {
return Error(ADBC_STATUS_IO, ex.what());
@@ -198,13 +253,14 @@ DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery(
return Error(ADBC_STATUS_IO, "unknown DuckDB query error");
}
- if (query_result == nullptr) {
- return Error(ADBC_STATUS_IO, "DuckDB query returned no result");
+ if (pending_result == nullptr) {
+ return Error(ADBC_STATUS_IO, "DuckDB query returned no pending result");
}
- if (query_result->HasError()) {
- return Error(ADBC_STATUS_IO,
- DuckDbErrorMessage(query_result.get(), "DuckDB query failed"),
- static_cast(query_result->GetErrorType()));
+ if (pending_result->HasError()) {
+ return Error(
+ ADBC_STATUS_IO,
+ DuckDbErrorMessage(pending_result.get(), "DuckDB query failed"),
+ static_cast(pending_result->GetErrorType()));
}
if (rows_affected != nullptr) {
@@ -215,12 +271,15 @@ DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery(
return {};
}
+ duckdb::ClientProperties client_properties =
+ duckdb_connection_ptr->context->GetClientProperties();
+
duckdb::unordered_map<
duckdb::idx_t, duckdb::shared_ptr const>
extension_types;
try {
extension_types = duckdb::ArrowTypeExtensionData::GetExtensionTypes(
- *query_result->client_properties.client_context, query_result->types);
+ *client_properties.client_context, pending_result->types);
} catch (duckdb::Exception const& ex) {
return Error(ADBC_STATUS_IO, ex.what());
} catch (std::exception const& ex) {
@@ -230,8 +289,16 @@ DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery(
"unknown error while preparing DuckDB Arrow conversions");
}
- auto* state = new (std::nothrow) DuckDbArrowStreamState{
- std::move(query_result), std::move(extension_types), {}};
+ duckdb::vector types = pending_result->types;
+ duckdb::vector names = pending_result->names;
+ auto* state =
+ new (std::nothrow) DuckDbArrowStreamState{std::move(pending_result),
+ nullptr,
+ std::move(types),
+ std::move(names),
+ std::move(client_properties),
+ std::move(extension_types),
+ {}};
if (state == nullptr) {
return Error(ADBC_STATUS_UNKNOWN, "failed to allocate DuckDB Arrow stream");
}