diff --git a/docs/DESIGN.md b/docs/DESIGN.md index c2b217b..1495f37 100644 --- a/docs/DESIGN.md +++ b/docs/DESIGN.md @@ -35,8 +35,8 @@ root with CMake. There is no `rust/` or `go/` implementation directory. and bulk ingest. - `src/get_info_stream.cc` builds the Arrow C Stream result for `AdbcConnectionGetInfo` with nanoarrow. -- `src/duckdb_arrow_stream.cc` adapts DuckDB Arrow query results to the Arrow C - Stream interface. +- `src/duckdb_arrow_stream.cc` adapts DuckDB streaming query results to the + Arrow C Stream interface. - `src/quack_uri.cc` parses `quack://...` connection URIs. - `src/sql_escape.cc` builds escaped DuckDB SQL string literals and remote query wrappers. @@ -73,6 +73,12 @@ Statement execution wraps caller SQL with Quack remote execution through creates or updates the remote table through Quack, clears Quack metadata cache, and inserts into `remote..`. +Result-producing statement execution and GetObjects use DuckDB streaming query +results and convert each fetched DuckDB chunk to Arrow on demand. Callers must +consume or release a returned Arrow stream before issuing another query on the +same connection because DuckDB permits only one active streaming query result +per connection. + ## Metadata APIs Implemented metadata support currently includes `AdbcConnectionGetInfo`. diff --git a/docs/superpowers/plans/2026-05-20-true-streaming-query-results.md b/docs/superpowers/plans/2026-05-20-true-streaming-query-results.md new file mode 100644 index 0000000..fc5bfa8 --- /dev/null +++ b/docs/superpowers/plans/2026-05-20-true-streaming-query-results.md @@ -0,0 +1,608 @@ + + +# True Streaming Query Results Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. 
+ +**Goal:** Replace DuckDB's materialized Arrow query API with a true chunk-fetching Arrow C Stream adapter for statement queries and GetObjects. + +**Architecture:** Keep streaming logic isolated in `src/duckdb_arrow_stream.*`. The helper will own a DuckDB C++ `QueryResult`, fetch one `DataChunk` per `get_next`, and convert each chunk to Arrow only when requested. + +**Tech Stack:** C++20, DuckDB C++ API, Arrow C Data/Stream interfaces, Googletest, CMake, Pixi validation. + +--- + +## File Structure + +- Modify `src/duckdb_arrow_stream.h`: rename the helper declaration to `ExecuteDuckDbStreamingArrowQuery`. +- Modify `src/duckdb_arrow_stream.cc`: replace `duckdb_query_arrow` usage with `duckdb::Connection::SendQuery`, `duckdb::QueryResult::TryFetch`, `duckdb::ArrowConverter::ToArrowSchema`, and `duckdb::ArrowConverter::ToArrowArray`. +- Modify `src/adbc_driver_quack.cc`: update `DriverStatementExecuteQuery` and `DriverConnectionGetObjects` to call the renamed helper. +- Modify `tests/duckdb_arrow_stream_test.cc`: update the existing test and add streaming-specific coverage. +- Optionally modify `docs/DESIGN.md`: update the helper description from "DuckDB Arrow query results" to "DuckDB streaming query results" if the code rename makes the existing description stale. 
+ +## Task 1: Rename Helper API and Update Call Sites + +**Files:** +- Modify: `src/duckdb_arrow_stream.h` +- Modify: `src/duckdb_arrow_stream.cc` +- Modify: `src/adbc_driver_quack.cc` +- Modify: `tests/duckdb_arrow_stream_test.cc` + +- [ ] **Step 1: Rename the declaration** + +In `src/duckdb_arrow_stream.h`, replace: + +```cpp +DuckDbArrowQueryResult ExecuteDuckDbArrowQuery(duckdb_connection connection, + std::string_view sql, + ArrowArrayStream* out, + int64_t* rows_affected); +``` + +with: + +```cpp +DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery( + duckdb_connection connection, std::string_view sql, ArrowArrayStream* out, + int64_t* rows_affected); +``` + +- [ ] **Step 2: Rename the definition** + +In `src/duckdb_arrow_stream.cc`, replace the function signature: + +```cpp +DuckDbArrowQueryResult ExecuteDuckDbArrowQuery(duckdb_connection connection, + std::string_view sql, + ArrowArrayStream* out, + int64_t* rows_affected) { +``` + +with: + +```cpp +DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery( + duckdb_connection connection, std::string_view sql, ArrowArrayStream* out, + int64_t* rows_affected) { +``` + +- [ ] **Step 3: Update driver call sites** + +In `src/adbc_driver_quack.cc`, replace both calls: + +```cpp +adbc_driver_quack::ExecuteDuckDbArrowQuery( +``` + +with: + +```cpp +adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( +``` + +- [ ] **Step 4: Update test call sites** + +In `tests/duckdb_arrow_stream_test.cc`, replace: + +```cpp +adbc_driver_quack::ExecuteDuckDbArrowQuery( +``` + +with: + +```cpp +adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( +``` + +- [ ] **Step 5: Build to confirm the mechanical rename** + +Run: + +```bash +cmake --build build/ci-test-linux-amd64 --target adbc_driver_quack_helper_tests +``` + +Expected: the target either builds successfully or fails only because Task 2 has not yet replaced the helper internals. 
+ +## Task 2: Replace Materialized DuckDB Arrow State With Streaming Query State + +**Files:** +- Modify: `src/duckdb_arrow_stream.cc` + +- [ ] **Step 1: Update includes** + +At the top of `src/duckdb_arrow_stream.cc`, add these includes: + +```cpp +#include +#include +#include +#include +#include +``` + +Keep the existing standard headers. + +- [ ] **Step 2: Change stream state ownership** + +Replace: + +```cpp +struct DuckDbArrowStreamState { + duckdb_arrow result = nullptr; + std::string last_error; +}; +``` + +with: + +```cpp +struct DuckDbArrowStreamState { + duckdb::unique_ptr<duckdb::QueryResult> result; + duckdb::unordered_map< + duckdb::idx_t, const duckdb::shared_ptr<duckdb::ArrowTypeExtensionData>> + extension_types; + std::string last_error; +}; +``` + +- [ ] **Step 3: Replace DuckDB Arrow error helper** + +Replace `DuckDbArrowError(...)` with: + +```cpp +std::string DuckDbErrorMessage(duckdb::QueryResult const* result, + std::string fallback) { + if (result == nullptr || !result->HasError()) { + return fallback; + } + std::string const& error = result->GetError(); + if (error.empty()) { + return fallback; + } + return error; +} +``` + +- [ ] **Step 4: Update release callback** + +Replace the body of `StreamRelease` with: + +```cpp +void StreamRelease(ArrowArrayStream* stream) { + auto* state = GetState(stream); + if (state == nullptr) { + ResetStream(stream); + return; + } + delete state; + ResetStream(stream); +} +``` + +The `duckdb::unique_ptr` destructor closes the underlying result. 
+ +## Task 3: Implement Streaming Schema and Chunk Fetching + +**Files:** +- Modify: `src/duckdb_arrow_stream.cc` + +- [ ] **Step 1: Rewrite `StreamGetSchema`** + +Replace the callback body with: + +```cpp +int StreamGetSchema(ArrowArrayStream* stream, ArrowSchema* out) { + auto* state = GetState(stream); + if (state == nullptr || state->result == nullptr || out == nullptr) { + return EINVAL; + } + std::memset(out, 0, sizeof(*out)); + try { + duckdb::ArrowConverter::ToArrowSchema( + out, state->result->types, state->result->names, + state->result->client_properties); + } catch (duckdb::Exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (std::exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (...) { + state->last_error = "unknown error while getting DuckDB Arrow schema"; + return EIO; + } + state->last_error.clear(); + return 0; +} +``` + +- [ ] **Step 2: Rewrite `StreamGetNext`** + +Replace the callback body with: + +```cpp +int StreamGetNext(ArrowArrayStream* stream, ArrowArray* out) { + auto* state = GetState(stream); + if (state == nullptr || state->result == nullptr || out == nullptr) { + return EINVAL; + } + std::memset(out, 0, sizeof(*out)); + + duckdb::ErrorData fetch_error; + duckdb::unique_ptr<duckdb::DataChunk> chunk; + if (!state->result->TryFetch(chunk, fetch_error)) { + state->last_error = fetch_error.Message(); + if (state->last_error.empty()) { + state->last_error = "failed to fetch DuckDB result chunk"; + } + return EIO; + } + if (chunk == nullptr || chunk->size() == 0) { + state->last_error.clear(); + return 0; + } + + try { + duckdb::ArrowConverter::ToArrowArray( + *chunk, out, state->result->client_properties, state->extension_types); + } catch (duckdb::Exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (std::exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (...) 
{ + state->last_error = "unknown error while converting DuckDB chunk to Arrow"; + return EIO; + } + + state->last_error.clear(); + return 0; +} +``` + +- [ ] **Step 3: Keep `StreamGetLastError` unchanged** + +Verify it still returns `nullptr` when `last_error` is empty and a stable C string otherwise. + +## Task 4: Implement Streaming Query Creation + +**Files:** +- Modify: `src/duckdb_arrow_stream.cc` + +- [ ] **Step 1: Replace materialized query execution** + +Inside `ExecuteDuckDbStreamingArrowQuery`, replace the `duckdb_query_arrow` block and the old `duckdb_destroy_arrow` cleanup with: + +```cpp +auto* duckdb_connection_ptr = + reinterpret_cast<duckdb::Connection*>(connection); +duckdb::unique_ptr<duckdb::QueryResult> query_result; +try { + query_result = duckdb_connection_ptr->SendQuery( + std::string(sql), duckdb::QueryResultOutputType::ALLOW_STREAMING); +} catch (duckdb::Exception const& ex) { + return Error(ADBC_STATUS_IO, ex.what()); +} catch (std::exception const& ex) { + return Error(ADBC_STATUS_IO, ex.what()); +} catch (...) 
{ + return Error(ADBC_STATUS_IO, "unknown DuckDB query error"); +} + +if (query_result == nullptr) { + return Error(ADBC_STATUS_IO, "DuckDB query returned no result"); +} +if (query_result->HasError()) { + return Error(ADBC_STATUS_IO, + DuckDbErrorMessage(query_result.get(), "DuckDB query failed"), + static_cast<int32_t>(query_result->GetErrorType())); +} +``` + +- [ ] **Step 2: Preserve row count behavior** + +After the error check, add: + +```cpp +if (rows_affected != nullptr) { + *rows_affected = -1; +} +``` + +- [ ] **Step 3: Handle `out == nullptr`** + +Keep a no-output branch that discards the result object: + +```cpp +if (out == nullptr) { + return {}; +} +``` + +- [ ] **Step 4: Compute extension type conversions once** + +Before allocating `DuckDbArrowStreamState`, add: + +```cpp +duckdb::unordered_map< + duckdb::idx_t, const duckdb::shared_ptr<duckdb::ArrowTypeExtensionData>> + extension_types; +try { + extension_types = duckdb::ArrowTypeExtensionData::GetExtensionTypes( + *query_result->client_properties.client_context, query_result->types); +} catch (duckdb::Exception const& ex) { + return Error(ADBC_STATUS_IO, ex.what()); +} catch (std::exception const& ex) { + return Error(ADBC_STATUS_IO, ex.what()); +} catch (...) { + return Error(ADBC_STATUS_IO, + "unknown error while preparing DuckDB Arrow conversions"); +} +``` + +- [ ] **Step 5: Install stream callbacks** + +Replace the old state allocation with: + +```cpp +auto* state = new (std::nothrow) DuckDbArrowStreamState{ + std::move(query_result), std::move(extension_types), {}}; +if (state == nullptr) { + return Error(ADBC_STATUS_UNKNOWN, "failed to allocate DuckDB Arrow stream"); +} + +out->private_data = state; +out->get_schema = StreamGetSchema; +out->get_next = StreamGetNext; +out->get_last_error = StreamGetLastError; +out->release = StreamRelease; +return {}; +``` + +- [ ] **Step 6: Add active stream comment** + +Add this comment immediately above `SendQuery`: + +```cpp +// DuckDB allows one active StreamQueryResult per connection. 
ADBC callers +// must consume or release the returned ArrowArrayStream before issuing another +// query on the same connection. +``` + +## Task 5: Update and Expand Helper Tests + +**Files:** +- Modify: `tests/duckdb_arrow_stream_test.cc` + +- [ ] **Step 1: Add RAII helpers for DuckDB setup** + +Add a local RAII fixture struct near the top of the test file (the existing manual setup may be kept instead if preferred, but the snippets below assume this fixture): + +```cpp +struct DuckDbFixture { + duckdb_database database = nullptr; + duckdb_connection connection = nullptr; + + DuckDbFixture() { + EXPECT_EQ(duckdb_open(nullptr, &database), DuckDBSuccess); + EXPECT_EQ(duckdb_connect(database, &connection), DuckDBSuccess); + } + + ~DuckDbFixture() { + duckdb_disconnect(&connection); + duckdb_close(&database); + } +}; +``` + +- [ ] **Step 2: Update the existing smoke test** + +Replace manual open/connect/close in `ExecutesQueryAsArrowStream` with: + +```cpp +DuckDbFixture fixture; + +ArrowArrayStream stream = {}; +int64_t rows_affected = 0; +auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( + fixture.connection, "SELECT 42 AS answer", &stream, &rows_affected); +``` + +Keep the existing schema, first-array, end-of-stream, and release assertions. 
+ +- [ ] **Step 3: Add a multi-chunk streaming test** + +Add: + +```cpp +TEST(DuckDbArrowStreamTest, FetchesLargeQueryInMultipleArrays) { + DuckDbFixture fixture; + + ArrowArrayStream stream = {}; + auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( + fixture.connection, "SELECT range AS value FROM range(10000)", &stream, + nullptr); + + ASSERT_EQ(result.status, ADBC_STATUS_OK); + ASSERT_NE(stream.get_next, nullptr); + + int array_count = 0; + int64_t row_count = 0; + while (true) { + ArrowArray array = {}; + ASSERT_EQ(stream.get_next(&stream, &array), 0); + if (array.release == nullptr) { + break; + } + ++array_count; + row_count += array.length; + array.release(&array); + } + + EXPECT_GT(array_count, 1); + EXPECT_EQ(row_count, 10000); + stream.release(&stream); +} +``` + +- [ ] **Step 4: Add a query error test** + +Add: + +```cpp +TEST(DuckDbArrowStreamTest, ReportsQueryErrors) { + DuckDbFixture fixture; + + ArrowArrayStream stream = {}; + auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( + fixture.connection, "SELECT * FROM missing_table", &stream, nullptr); + + EXPECT_EQ(result.status, ADBC_STATUS_IO); + EXPECT_FALSE(result.message.empty()); + EXPECT_EQ(stream.release, nullptr); +} +``` + +- [ ] **Step 5: Add callback argument validation test** + +Add: + +```cpp +TEST(DuckDbArrowStreamTest, RejectsInvalidCallbackArguments) { + DuckDbFixture fixture; + + ArrowArrayStream stream = {}; + auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( + fixture.connection, "SELECT 1", &stream, nullptr); + ASSERT_EQ(result.status, ADBC_STATUS_OK); + + ArrowSchema schema = {}; + ArrowArray array = {}; + EXPECT_EQ(stream.get_schema(nullptr, &schema), EINVAL); + EXPECT_EQ(stream.get_schema(&stream, nullptr), EINVAL); + EXPECT_EQ(stream.get_next(nullptr, &array), EINVAL); + EXPECT_EQ(stream.get_next(&stream, nullptr), EINVAL); + + stream.release(&stream); +} +``` + +- [ ] **Step 6: Run helper tests** + +Run: + 
+```bash +cmake --build build/ci-test-linux-amd64 --target adbc_driver_quack_helper_tests +./build/ci-test-linux-amd64/tests/adbc_driver_quack_helper_tests +``` + +Expected: all helper tests pass. + +## Task 6: Update Project Design Notes + +**Files:** +- Modify: `docs/DESIGN.md` + +- [ ] **Step 1: Update helper description** + +In `docs/DESIGN.md`, replace: + +```markdown +- `src/duckdb_arrow_stream.cc` adapts DuckDB Arrow query results to the Arrow C + Stream interface. +``` + +with: + +```markdown +- `src/duckdb_arrow_stream.cc` adapts DuckDB streaming query results to the + Arrow C Stream interface. +``` + +- [ ] **Step 2: Add runtime note** + +After the paragraph that starts with `Statement execution wraps caller SQL`, add: + +```markdown +Result-producing statement execution and GetObjects use DuckDB streaming query +results and convert each fetched DuckDB chunk to Arrow on demand. Callers must +consume or release a returned Arrow stream before issuing another query on the +same connection because DuckDB permits only one active streaming query result +per connection. +``` + +## Task 7: Full Verification + +**Files:** +- No source changes expected. + +- [ ] **Step 1: Run CI-style build** + +Run: + +```bash +./ci/scripts/build.sh test linux amd64 +``` + +Expected: build succeeds without compiler warnings. + +- [ ] **Step 2: Run C++ tests** + +Run: + +```bash +./ci/scripts/test.sh linux amd64 +``` + +Expected: all C++ tests pass. + +- [ ] **Step 3: Run validation collection** + +Run: + +```bash +pixi run validate --collect-only +``` + +Expected: validation tests collect successfully. + +- [ ] **Step 4: Run GetObjects validation subset** + +Run: + +```bash +pixi run validate -k get_objects +``` + +Expected: GetObjects validation passes. + +- [ ] **Step 5: Run pre-commit outside the sandbox** + +Run outside the sandbox: + +```bash +pre-commit run --all-files +``` + +Expected: all hooks pass. 
Include any lockfile or formatting updates produced by hooks in the final diff if they are relevant. + +## Self-Review + +- Spec coverage: the plan covers both `AdbcStatementExecuteQuery` and `DriverConnectionGetObjects`, avoids deprecated C streaming APIs, converts one chunk per `get_next`, preserves Arrow C Stream behavior, and documents DuckDB's one-active-stream constraint. +- Placeholder scan: no task contains unresolved placeholders. +- Type consistency: the plan consistently uses `ExecuteDuckDbStreamingArrowQuery`, `DuckDbArrowQueryResult`, `duckdb::QueryResult`, `duckdb::DataChunk`, and Arrow C Stream callbacks. diff --git a/docs/superpowers/specs/2026-05-20-true-streaming-query-results-design.md b/docs/superpowers/specs/2026-05-20-true-streaming-query-results-design.md new file mode 100644 index 0000000..486e303 --- /dev/null +++ b/docs/superpowers/specs/2026-05-20-true-streaming-query-results-design.md @@ -0,0 +1,141 @@ + + +# True Streaming Query Results Design + +## Goal + +Make result-producing driver APIs stream DuckDB result chunks instead of +materializing the full result set through `duckdb_query_arrow`. + +This applies to both: + +- `AdbcStatementExecuteQuery` for SQL query execution. +- `AdbcConnectionGetObjects` for validation metadata results. + +## Selected Approach + +Use DuckDB's C++ streaming query API: + +```cpp +duckdb::Connection::SendQuery(sql, duckdb::QueryResultOutputType::ALLOW_STREAMING) +``` + +The returned `duckdb::QueryResult` may be a `StreamQueryResult` or a +`MaterializedQueryResult`, depending on DuckDB and statement shape. The helper +will not call `duckdb_query_arrow`, so it will not request DuckDB's deprecated +materialized Arrow result API. Result consumption will use `QueryResult::Fetch` +or `QueryResult::TryFetch` and convert each `duckdb::DataChunk` to Arrow as it +is requested by `ArrowArrayStream::get_next`. + +The deprecated DuckDB C streaming APIs are intentionally not used. 
+ +## Architecture + +Replace the internals of `src/duckdb_arrow_stream.cc` with a streaming +implementation while keeping a narrow helper boundary for callers. The helper +will own a `duckdb::QueryResult` in `ArrowArrayStream::private_data`, expose the +same Arrow C Stream callbacks, and release the query result when the stream is +released. + +The helper API should be renamed from `ExecuteDuckDbArrowQuery` to +`ExecuteDuckDbStreamingArrowQuery` so call sites and tests clearly distinguish +the new behavior from the old materializing DuckDB Arrow API. Both +`DriverStatementExecuteQuery` and `DriverConnectionGetObjects` will call the +new helper. + +## Stream State + +The stream state will contain: + +```cpp +std::unique_ptr<duckdb::QueryResult> result; +duckdb::unordered_map<duckdb::idx_t, + const duckdb::shared_ptr<duckdb::ArrowTypeExtensionData>> + extension_types; +std::string last_error; +``` + +`extension_types` is computed once after a successful query using +`duckdb::ArrowTypeExtensionData::GetExtensionTypes`, matching DuckDB's own C +Arrow adapter behavior. + +## Data Flow + +1. The caller builds the remote Quack SQL exactly as it does today. +2. The streaming helper calls `SendQuery(..., ALLOW_STREAMING)`. +3. If DuckDB returns an error result, the helper converts it to + `ADBC_STATUS_IO` and no stream is installed. +4. If `rows_affected` is requested for result-producing execution, the helper + sets it to `-1`. It does not call APIs that require materializing the result + only to compute a row count. +5. `get_schema` converts `result->types` and `result->names` with + `duckdb::ArrowConverter::ToArrowSchema`. +6. `get_next` fetches one `duckdb::DataChunk`. If no chunk remains, it leaves + `ArrowArray::release` null. If a chunk is available, it converts that chunk + with `duckdb::ArrowConverter::ToArrowArray`. +7. `release` destroys the stored `QueryResult` and clears all callbacks. 
+ +## Error Handling + +The helper should catch `duckdb::Exception`, `std::exception`, and unknown +exceptions around query execution, schema conversion, and chunk conversion. +Errors from callbacks are stored in `last_error` and reported through +`get_last_error`. + +Callback return codes should remain POSIX-style integers: + +- `EINVAL` for null stream state or null output pointers. +- `EIO` for DuckDB query/fetch/conversion failures. + +Driver entry points continue to convert helper failures into `AdbcError` with +the existing `StatusError` path. + +## Concurrency Constraint + +DuckDB documents that there can be only one active `StreamQueryResult` per +connection and that starting another query invalidates any existing streaming +result on that connection. The driver will not add connection-level +synchronization in this change. It will rely on the existing ADBC stream +lifetime contract: callers must consume or release the stream before issuing +another query on the same connection. + +Tests should cover the normal stream lifecycle and the behavior expected from +chunk-by-chunk consumption. The connection-level single-active-stream rule +should be documented in code comments where the streaming query is created. + +## Testing + +Update the existing helper test to use the renamed streaming helper. Add tests +that: + +- Fetch a simple one-row query through the Arrow C Stream callbacks. +- Fetch a large `range(...)` query and observe more than one Arrow array. +- Return an empty end-of-stream Arrow array with `release == nullptr`. +- Surface syntax or catalog errors from query execution. +- Surface invalid callback arguments with `EINVAL`. + +Run the CI-style build and test scripts after implementation: + +```bash +./ci/scripts/build.sh test linux amd64 +./ci/scripts/test.sh linux amd64 +pixi run validate --collect-only +pixi run validate -k get_objects +pre-commit run --all-files +``` + +Codex must run `pre-commit` outside the sandbox. 
diff --git a/src/adbc_driver_quack.cc b/src/adbc_driver_quack.cc index 12e9fba..177ca65 100644 --- a/src/adbc_driver_quack.cc +++ b/src/adbc_driver_quack.cc @@ -1303,7 +1303,7 @@ AdbcStatusCode DriverConnectionGetObjects( return InvalidArgument(error, "invalid GetObjects depth"); } - auto const result = adbc_driver_quack::ExecuteDuckDbArrowQuery( + auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( state->connection, adbc_driver_quack::BuildRemoteQuerySql(query), out, nullptr); if (result.status != ADBC_STATUS_OK) { @@ -1414,7 +1414,7 @@ AdbcStatusCode DriverStatementExecuteQuery(AdbcStatement* statement, std::string const remote_sql = adbc_driver_quack::BuildRemoteQuerySql(state->sql); if (out != nullptr) { - auto const result = adbc_driver_quack::ExecuteDuckDbArrowQuery( + auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( state->connection->connection, remote_sql, out, rows_affected); if (result.status != ADBC_STATUS_OK) { return StatusError(error, result.status, result.message, diff --git a/src/duckdb_arrow_stream.cc b/src/duckdb_arrow_stream.cc index 32981a7..ae48d75 100644 --- a/src/duckdb_arrow_stream.cc +++ b/src/duckdb_arrow_stream.cc @@ -16,6 +16,13 @@ #include #include +#include +#include +#include +#include +#include +#include +#include #include #include #include @@ -24,7 +31,14 @@ namespace adbc_driver_quack { namespace { struct DuckDbArrowStreamState { - duckdb_arrow result = nullptr; + duckdb::unique_ptr pending_result; + duckdb::unique_ptr result; + duckdb::vector types; + duckdb::vector names; + duckdb::ClientProperties client_properties; + duckdb::unordered_map< + duckdb::idx_t, duckdb::shared_ptr const> + extension_types; std::string last_error; }; @@ -55,29 +69,80 @@ DuckDbArrowStreamState* GetState(ArrowArrayStream* stream) { return static_cast(stream->private_data); } -std::string DuckDbArrowError(duckdb_arrow result, std::string fallback) { - if (result == nullptr) { +std::string 
DuckDbErrorMessage(duckdb::QueryResult const* result, + std::string fallback) { + if (result == nullptr || !result->HasError()) { return fallback; } - char const* error = duckdb_query_arrow_error(result); - if (error == nullptr || error[0] == '\0') { + std::string const& error = result->GetError(); + if (error.empty()) { return fallback; } return error; } +std::string DuckDbErrorMessage(duckdb::PendingQueryResult const* result, + std::string fallback) { + if (result == nullptr || !result->HasError()) { + return fallback; + } + std::string const& error = result->GetError(); + if (error.empty()) { + return fallback; + } + return error; +} + +int EnsureResultReady(DuckDbArrowStreamState* state) { + if (state->result != nullptr) { + return 0; + } + if (state->pending_result == nullptr) { + state->last_error = "DuckDB query result is not available"; + return EINVAL; + } + try { + state->result = state->pending_result->Execute(); + state->pending_result.reset(); + } catch (duckdb::Exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (std::exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (...) 
{ + state->last_error = "unknown DuckDB query execution error"; + return EIO; + } + if (state->result == nullptr) { + state->last_error = "DuckDB query returned no result"; + return EIO; + } + if (state->result->HasError()) { + state->last_error = + DuckDbErrorMessage(state->result.get(), "DuckDB query failed"); + return EIO; + } + return 0; +} + int StreamGetSchema(ArrowArrayStream* stream, ArrowSchema* out) { auto* state = GetState(stream); - if (state == nullptr || state->result == nullptr || out == nullptr) { + if (state == nullptr || out == nullptr) { return EINVAL; } std::memset(out, 0, sizeof(*out)); - auto* schema = out; - if (duckdb_query_arrow_schema( - state->result, reinterpret_cast(&schema)) != - DuckDBSuccess) { - state->last_error = - DuckDbArrowError(state->result, "failed to get DuckDB Arrow schema"); + try { + duckdb::ArrowConverter::ToArrowSchema(out, state->types, state->names, + state->client_properties); + } catch (duckdb::Exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (std::exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (...) 
{ + state->last_error = "unknown error while getting DuckDB Arrow schema"; return EIO; } state->last_error.clear(); @@ -86,18 +151,56 @@ int StreamGetSchema(ArrowArrayStream* stream, ArrowSchema* out) { int StreamGetNext(ArrowArrayStream* stream, ArrowArray* out) { auto* state = GetState(stream); - if (state == nullptr || state->result == nullptr || out == nullptr) { + if (state == nullptr || out == nullptr) { return EINVAL; } std::memset(out, 0, sizeof(*out)); - auto* array = out; - if (duckdb_query_arrow_array(state->result, - reinterpret_cast(&array)) != - DuckDBSuccess) { - state->last_error = - DuckDbArrowError(state->result, "failed to get DuckDB Arrow array"); + + int const result_status = EnsureResultReady(state); + if (result_status != 0) { + return result_status; + } + + duckdb::ErrorData& fetch_error = state->result->GetErrorObject(); + duckdb::unique_ptr chunk; + try { + if (!state->result->TryFetch(chunk, fetch_error)) { + state->last_error = fetch_error.Message(); + if (state->last_error.empty()) { + state->last_error = DuckDbErrorMessage( + state->result.get(), "failed to fetch DuckDB result chunk"); + } + return EIO; + } + } catch (duckdb::Exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (std::exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (...) { + state->last_error = "unknown error while fetching DuckDB result chunk"; + return EIO; + } + if (chunk == nullptr || chunk->size() == 0) { + state->last_error.clear(); + return 0; + } + + try { + duckdb::ArrowConverter::ToArrowArray(*chunk, out, state->client_properties, + state->extension_types); + } catch (duckdb::Exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (std::exception const& ex) { + state->last_error = ex.what(); + return EIO; + } catch (...) 
{ + state->last_error = "unknown error while converting DuckDB chunk to Arrow"; return EIO; } + state->last_error.clear(); return 0; } @@ -119,45 +222,84 @@ void StreamRelease(ArrowArrayStream* stream) { ResetStream(stream); return; } - duckdb_destroy_arrow(&state->result); delete state; ResetStream(stream); } } // namespace -DuckDbArrowQueryResult ExecuteDuckDbArrowQuery(duckdb_connection connection, - std::string_view sql, - ArrowArrayStream* out, - int64_t* rows_affected) { +DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery( + duckdb_connection connection, std::string_view sql, ArrowArrayStream* out, + int64_t* rows_affected) { ResetStream(out); if (connection == nullptr) { return Error(ADBC_STATUS_INVALID_STATE, "connection is not initialized"); } - std::string sql_storage(sql); - duckdb_arrow result = nullptr; - if (duckdb_query_arrow(connection, sql_storage.c_str(), &result) != - DuckDBSuccess) { - std::string const message = - DuckDbArrowError(result, "DuckDB Arrow query failed"); - duckdb_destroy_arrow(&result); - return Error(ADBC_STATUS_IO, message); + auto* duckdb_connection_ptr = + reinterpret_cast(connection); + duckdb::unique_ptr pending_result; + try { + // DuckDB allows one active StreamQueryResult per connection. ADBC callers + // must consume or release the returned ArrowArrayStream before issuing + // another query on the same connection. + pending_result = duckdb_connection_ptr->PendingQuery( + std::string(sql), duckdb::QueryResultOutputType::ALLOW_STREAMING); + } catch (duckdb::Exception const& ex) { + return Error(ADBC_STATUS_IO, ex.what()); + } catch (std::exception const& ex) { + return Error(ADBC_STATUS_IO, ex.what()); + } catch (...) 
{ + return Error(ADBC_STATUS_IO, "unknown DuckDB query error"); + } + + if (pending_result == nullptr) { + return Error(ADBC_STATUS_IO, "DuckDB query returned no pending result"); + } + if (pending_result->HasError()) { + return Error( + ADBC_STATUS_IO, + DuckDbErrorMessage(pending_result.get(), "DuckDB query failed"), + static_cast(pending_result->GetErrorType())); } if (rows_affected != nullptr) { - idx_t const rows_changed = duckdb_arrow_rows_changed(result); - *rows_affected = rows_changed > 0 ? static_cast(rows_changed) : -1; + *rows_affected = -1; } if (out == nullptr) { - duckdb_destroy_arrow(&result); return {}; } - auto* state = new (std::nothrow) DuckDbArrowStreamState{result, {}}; + duckdb::ClientProperties client_properties = + duckdb_connection_ptr->context->GetClientProperties(); + + duckdb::unordered_map< + duckdb::idx_t, duckdb::shared_ptr const> + extension_types; + try { + extension_types = duckdb::ArrowTypeExtensionData::GetExtensionTypes( + *client_properties.client_context, pending_result->types); + } catch (duckdb::Exception const& ex) { + return Error(ADBC_STATUS_IO, ex.what()); + } catch (std::exception const& ex) { + return Error(ADBC_STATUS_IO, ex.what()); + } catch (...) 
{ + return Error(ADBC_STATUS_IO, + "unknown error while preparing DuckDB Arrow conversions"); + } + + duckdb::vector types = pending_result->types; + duckdb::vector names = pending_result->names; + auto* state = + new (std::nothrow) DuckDbArrowStreamState{std::move(pending_result), + nullptr, + std::move(types), + std::move(names), + std::move(client_properties), + std::move(extension_types), + {}}; if (state == nullptr) { - duckdb_destroy_arrow(&result); return Error(ADBC_STATUS_UNKNOWN, "failed to allocate DuckDB Arrow stream"); } diff --git a/src/duckdb_arrow_stream.h b/src/duckdb_arrow_stream.h index 215e58e..194eb38 100644 --- a/src/duckdb_arrow_stream.h +++ b/src/duckdb_arrow_stream.h @@ -29,9 +29,8 @@ struct DuckDbArrowQueryResult { int32_t vendor_code = 0; }; -DuckDbArrowQueryResult ExecuteDuckDbArrowQuery(duckdb_connection connection, - std::string_view sql, - ArrowArrayStream* out, - int64_t* rows_affected); +DuckDbArrowQueryResult ExecuteDuckDbStreamingArrowQuery( + duckdb_connection connection, std::string_view sql, ArrowArrayStream* out, + int64_t* rows_affected); } // namespace adbc_driver_quack diff --git a/tests/duckdb_arrow_stream_test.cc b/tests/duckdb_arrow_stream_test.cc index b0ee07b..74eb719 100644 --- a/tests/duckdb_arrow_stream_test.cc +++ b/tests/duckdb_arrow_stream_test.cc @@ -18,43 +18,170 @@ #include #include -TEST(DuckDbArrowStreamTest, ExecutesQueryAsArrowStream) { - duckdb_database database = nullptr; - duckdb_connection connection = nullptr; - ASSERT_EQ(duckdb_open(nullptr, &database), DuckDBSuccess); - ASSERT_EQ(duckdb_connect(database, &connection), DuckDBSuccess); +#include +struct ArrowArrayStreamGuard { ArrowArrayStream stream = {}; + + ~ArrowArrayStreamGuard() { Release(); } + + ArrowArrayStream* get() { return &stream; } + + void Release() { + if (stream.release != nullptr) { + stream.release(&stream); + } + } +}; + +struct ArrowSchemaGuard { + ArrowSchema schema = {}; + + ~ArrowSchemaGuard() { Release(); } + + ArrowSchema* 
get() { return &schema; } + + void Release() { + if (schema.release != nullptr) { + schema.release(&schema); + } + } +}; + +struct ArrowArrayGuard { + ArrowArray array = {}; + + ~ArrowArrayGuard() { Release(); } + + ArrowArray* get() { return &array; } + + void Release() { + if (array.release != nullptr) { + array.release(&array); + } + } +}; + +class DuckDbArrowStreamTest : public ::testing::Test { + protected: + void SetUp() override { + ASSERT_EQ(duckdb_open(nullptr, &database_), DuckDBSuccess); + ASSERT_EQ(duckdb_connect(database_, &connection_), DuckDBSuccess); + } + + void TearDown() override { + if (connection_ != nullptr) { + duckdb_disconnect(&connection_); + } + if (database_ != nullptr) { + duckdb_close(&database_); + } + } + + duckdb_connection connection() { return connection_; } + + private: + duckdb_database database_ = nullptr; + duckdb_connection connection_ = nullptr; +}; + +TEST_F(DuckDbArrowStreamTest, ExecutesQueryAsArrowStream) { + ArrowArrayStreamGuard stream; int64_t rows_affected = 0; - auto const result = adbc_driver_quack::ExecuteDuckDbArrowQuery( - connection, "SELECT 42 AS answer", &stream, &rows_affected); + auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( + connection(), "SELECT 42 AS answer", stream.get(), &rows_affected); EXPECT_EQ(result.status, ADBC_STATUS_OK); EXPECT_TRUE(result.message.empty()); EXPECT_EQ(rows_affected, -1); - ASSERT_NE(stream.get_schema, nullptr); - ASSERT_NE(stream.get_next, nullptr); - ASSERT_NE(stream.release, nullptr); + ASSERT_NE(stream.stream.get_schema, nullptr); + ASSERT_NE(stream.stream.get_next, nullptr); + ASSERT_NE(stream.stream.release, nullptr); - ArrowSchema schema = {}; - EXPECT_EQ(stream.get_schema(&stream, &schema), 0); - ASSERT_NE(schema.release, nullptr); - EXPECT_STREQ(schema.name, "duckdb_query_result"); - schema.release(&schema); + ArrowSchemaGuard schema; + EXPECT_EQ(stream.stream.get_schema(stream.get(), schema.get()), 0); + ASSERT_NE(schema.schema.release, 
nullptr); + EXPECT_STREQ(schema.schema.name, "duckdb_query_result"); + schema.Release(); - ArrowArray array = {}; - EXPECT_EQ(stream.get_next(&stream, &array), 0); - EXPECT_EQ(array.length, 1); - ASSERT_NE(array.release, nullptr); - array.release(&array); + ArrowArrayGuard array; + EXPECT_EQ(stream.stream.get_next(stream.get(), array.get()), 0); + EXPECT_EQ(array.array.length, 1); + ASSERT_NE(array.array.release, nullptr); + array.Release(); + + ArrowArrayGuard end; + EXPECT_EQ(stream.stream.get_next(stream.get(), end.get()), 0); + EXPECT_EQ(end.array.release, nullptr); + + stream.Release(); + EXPECT_EQ(stream.stream.release, nullptr); +} + +TEST_F(DuckDbArrowStreamTest, FetchesLargeQueryInMultipleArrays) { + ArrowArrayStreamGuard stream; + auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( + connection(), "SELECT range AS value FROM range(10000)", stream.get(), + nullptr); + + ASSERT_EQ(result.status, ADBC_STATUS_OK); + ASSERT_NE(stream.stream.get_next, nullptr); + + int array_count = 0; + int64_t row_count = 0; + while (true) { + ArrowArrayGuard array; + ASSERT_EQ(stream.stream.get_next(stream.get(), array.get()), 0); + if (array.array.release == nullptr) { + break; + } + ++array_count; + row_count += array.array.length; + } + + EXPECT_GT(array_count, 1); + EXPECT_EQ(row_count, 10000); +} + +TEST_F(DuckDbArrowStreamTest, ReportsQueryErrors) { + ArrowArrayStreamGuard stream; + auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( + connection(), "SELECT * FROM missing_table", stream.get(), nullptr); + + EXPECT_EQ(result.status, ADBC_STATUS_IO); + EXPECT_FALSE(result.message.empty()); + EXPECT_EQ(stream.stream.release, nullptr); +} + +TEST_F(DuckDbArrowStreamTest, RejectsInvalidCallbackArguments) { + ArrowArrayStreamGuard stream; + auto const result = adbc_driver_quack::ExecuteDuckDbStreamingArrowQuery( + connection(), "SELECT 1", stream.get(), nullptr); + ASSERT_EQ(result.status, ADBC_STATUS_OK); + 
ASSERT_NE(stream.stream.get_schema, nullptr); + ASSERT_NE(stream.stream.get_next, nullptr); + + { + ArrowSchemaGuard schema; + EXPECT_EQ(stream.stream.get_schema(nullptr, schema.get()), EINVAL); + EXPECT_EQ(schema.schema.release, nullptr); + } - ArrowArray end = {}; - EXPECT_EQ(stream.get_next(&stream, &end), 0); - EXPECT_EQ(end.release, nullptr); + { + ArrowSchemaGuard schema; + EXPECT_EQ(stream.stream.get_schema(stream.get(), nullptr), EINVAL); + EXPECT_EQ(schema.schema.release, nullptr); + } - stream.release(&stream); - EXPECT_EQ(stream.release, nullptr); + { + ArrowArrayGuard array; + EXPECT_EQ(stream.stream.get_next(nullptr, array.get()), EINVAL); + EXPECT_EQ(array.array.release, nullptr); + } - duckdb_disconnect(&connection); - duckdb_close(&database); + { + ArrowArrayGuard array; + EXPECT_EQ(stream.stream.get_next(stream.get(), nullptr), EINVAL); + EXPECT_EQ(array.array.release, nullptr); + } }