Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions docs/superpowers/plans/2026-05-19-quack-transactions-prepare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<!--
Copyright (c) 2026 ADBC Drivers Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Quack Transactions And Prepare-Only Statements

**Goal:** Implement ADBC transaction callbacks and parameterless statement prepare support for the Quack driver, while keeping bind parameters unsupported.

## Summary

- Implement ADBC connection transactions over the existing attached Quack catalog/session.
- Implement parameterless `AdbcStatementPrepare` for SQL statements while keeping bind parameters unsupported.
- Adjust bound-stream state handling so SQL and bound streams can coexist until execution, where unsupported parameter binding fails explicitly.

## Key Changes

- In `src/adbc_driver_quack.cc`, add transaction state to `ConnectionState`: autocommit enabled by default plus active remote transaction tracking.
- Add `DriverConnectionCommit` and `DriverConnectionRollback`, export `AdbcConnectionCommit` and `AdbcConnectionRollback`, and wire both callbacks in `InitDriver`.
- Extend `DriverConnectionSetOption`/`GetOption` for `ADBC_CONNECTION_OPTION_AUTOCOMMIT`, returning `"true"`/`"false"` and rejecting commit/rollback while autocommit is enabled.
- When autocommit is disabled, send remote `BEGIN TRANSACTION`; after commit or rollback, send `COMMIT`/`ROLLBACK` and begin a fresh remote transaction so repeated commit/rollback calls while autocommit is disabled remain valid.
- Implement `DriverStatementPrepare` without binding: require initialized statement, initialized connection, and non-empty `state->sql`; set a `prepared` flag.
- Change bound-stream behavior:
- `DriverStatementBindStream` stores the stream but does not clear `state->sql`.
- Setting `ADBC_INGEST_OPTION_TARGET_TABLE` clears `state->sql`, since that statement is now bulk ingest.
- `DriverStatementExecuteQuery` with `has_bound_stream && !state->sql.empty()` returns `ADBC_STATUS_NOT_IMPLEMENTED` for parameterized SQL execution.
- `DriverStatementExecuteQuery` with `has_bound_stream && state->sql.empty()` keeps the current bulk-ingest path.
- Keep `DriverStatementBind` returning `ADBC_STATUS_NOT_IMPLEMENTED`, and do not implement parameter schema.

## Validation Changes

- Update `validation/tests/quack.py`:
- `connection_transactions=True`
- `statement_prepare=True`
- leave `statement_bind=False`
- leave `statement_get_parameter_schema=False`
- Do not add `validation/queries/type/bind/*.txtcase` overrides unless a validation run shows bind tests are still collected despite `statement_bind=False`.

## Tests

- Update C++ symbol tests to expect exported/populated `ConnectionCommit` and `ConnectionRollback`.
- Replace the existing C++ "prepare is unsupported" assertion with prepare state tests:
- uninitialized statement returns `ADBC_STATUS_INVALID_STATE`;
- initialized statement with no SQL returns `ADBC_STATUS_INVALID_STATE`;
- initialized SQL statement can be prepared;
- bind remains `ADBC_STATUS_NOT_IMPLEMENTED`.
- Add C++ state-routing tests for bound streams:
- `BindStream` after `SetSqlQuery` preserves SQL and execution fails with `ADBC_STATUS_NOT_IMPLEMENTED`;
- setting `ADBC_INGEST_OPTION_TARGET_TABLE` clears SQL so the statement uses the bulk-ingest path.
- Run:
- `./ci/scripts/build.sh test linux amd64`
- `./ci/scripts/test.sh linux amd64`
- `pixi run validate -k "transaction_toggle or prepare"`
- `pixi run validate --collect-only` to confirm bind cases remain skipped or marked unsupported
- `pre-commit run --all-files` outside the sandbox.

## Assumptions

- Prepared statements mean parameterless `prepare()` support only for now.
- No Quack protocol changes are included.
- Bind support remains blocked until Quack exposes a typed remote parameter mechanism.
104 changes: 51 additions & 53 deletions pixi.lock

Large diffs are not rendered by default.

125 changes: 121 additions & 4 deletions src/adbc_driver_quack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ AdbcStatusCode DriverDatabaseSetOption(AdbcDatabase* database, char const* key,
AdbcStatusCode DriverDatabaseRelease(AdbcDatabase* database, AdbcError* error);
AdbcStatusCode DriverConnectionInit(AdbcConnection* connection,
AdbcDatabase* database, AdbcError* error);
AdbcStatusCode DriverConnectionCommit(AdbcConnection* connection,
AdbcError* error);
AdbcStatusCode DriverConnectionGetInfo(AdbcConnection* connection,
uint32_t const* info_codes,
size_t info_codes_length,
Expand All @@ -60,6 +62,8 @@ AdbcStatusCode DriverConnectionNew(AdbcConnection* connection,
AdbcError* error);
AdbcStatusCode DriverConnectionRelease(AdbcConnection* connection,
AdbcError* error);
AdbcStatusCode DriverConnectionRollback(AdbcConnection* connection,
AdbcError* error);
AdbcStatusCode DriverConnectionSetOption(AdbcConnection* connection,
char const* key, char const* value,
AdbcError* error);
Expand Down Expand Up @@ -98,6 +102,8 @@ struct ConnectionState {
duckdb_connection connection = nullptr;
uint64_t next_ingest_id = 0;
bool initialized = false;
bool autocommit = true;
bool transaction_active = false;
};

struct StatementState {
Expand All @@ -110,6 +116,7 @@ struct StatementState {
bool ingest_temporary = false;
ArrowArrayStream bound_stream = {};
bool has_bound_stream = false;
bool prepared = false;
};

struct DriverState {
Expand Down Expand Up @@ -640,6 +647,7 @@ void CloseConnectionState(ConnectionState* state) {
duckdb_close(&state->database);
}
state->initialized = false;
state->transaction_active = false;
}

AdbcStatusCode RunDuckDbQuery(ConnectionState* state, std::string const& sql,
Expand Down Expand Up @@ -890,7 +898,7 @@ AdbcStatusCode ExecuteBulkIngest(StatementState* state, int64_t* rows_affected,

RunDuckDbQuery(state->connection, "DROP TABLE IF EXISTS " + quoted_data,
nullptr);
if (status == ADBC_STATUS_OK) {
if (status == ADBC_STATUS_OK && state->connection->autocommit) {
RunDuckDbQuery(state->connection, "COMMIT", nullptr);
}
if (status == ADBC_STATUS_OK && rows_affected != nullptr) {
Expand Down Expand Up @@ -1023,12 +1031,14 @@ AdbcStatusCode InitDriver(int version, void* raw_driver, AdbcError* error) {
driver->DatabaseSetOption = DriverDatabaseSetOption;
driver->DatabaseRelease = DriverDatabaseRelease;

driver->ConnectionCommit = DriverConnectionCommit;
driver->ConnectionInit = DriverConnectionInit;
driver->ConnectionGetInfo = DriverConnectionGetInfo;
driver->ConnectionGetOption = DriverConnectionGetOption;
driver->ConnectionGetObjects = DriverConnectionGetObjects;
driver->ConnectionNew = DriverConnectionNew;
driver->ConnectionRelease = DriverConnectionRelease;
driver->ConnectionRollback = DriverConnectionRollback;
driver->ConnectionSetOption = DriverConnectionSetOption;

driver->StatementBind = DriverStatementBind;
Expand Down Expand Up @@ -1166,6 +1176,51 @@ AdbcStatusCode DriverConnectionRelease(AdbcConnection* connection,
return Ok(error);
}

static AdbcStatusCode BeginRemoteTransaction(ConnectionState* state,
AdbcError* error) {
AdbcStatusCode const status =
RunRemoteQuery(state, "BEGIN TRANSACTION", error);
if (status == ADBC_STATUS_OK) {
state->transaction_active = true;
}
return status;
}

static AdbcStatusCode FinishRemoteTransaction(ConnectionState* state,
char const* sql,
AdbcError* error) {
if (state->autocommit) {
return InvalidState(error,
"connection is in autocommit mode; cannot finish a "
"manual transaction");
}

AdbcStatusCode status = RunRemoteQuery(state, sql, error);
if (status != ADBC_STATUS_OK) {
return status;
}
state->transaction_active = false;
return BeginRemoteTransaction(state, error);
}

AdbcStatusCode DriverConnectionCommit(AdbcConnection* connection,
AdbcError* error) {
ConnectionState* state = GetConnection(connection);
if (state == nullptr || !state->initialized) {
return InvalidState(error, "connection is not initialized");
}
return FinishRemoteTransaction(state, "COMMIT", error);
}

AdbcStatusCode DriverConnectionRollback(AdbcConnection* connection,
AdbcError* error) {
ConnectionState* state = GetConnection(connection);
if (state == nullptr || !state->initialized) {
return InvalidState(error, "connection is not initialized");
}
return FinishRemoteTransaction(state, "ROLLBACK", error);
}

AdbcStatusCode DriverConnectionGetInfo(AdbcConnection* connection,
uint32_t const* info_codes,
size_t info_codes_length,
Expand Down Expand Up @@ -1215,6 +1270,9 @@ AdbcStatusCode DriverConnectionGetOption(AdbcConnection* connection,
if (status != ADBC_STATUS_OK) {
return status;
}
} else if (std::strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
option_value = state->autocommit ? ADBC_OPTION_VALUE_ENABLED
: ADBC_OPTION_VALUE_DISABLED;
} else {
return NotFound(error, std::string{"connection option not found: "} + key);
}
Expand Down Expand Up @@ -1272,6 +1330,30 @@ AdbcStatusCode DriverConnectionSetOption(AdbcConnection* connection,
std::string const sql = "USE " + QuoteIdentifier(value);
return RunRemoteQueryAllowNotFound(state, sql, error);
}
if (std::strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
if (!state->autocommit && state->transaction_active) {
AdbcStatusCode const status = RunRemoteQuery(state, "COMMIT", error);
if (status != ADBC_STATUS_OK) {
return status;
}
state->transaction_active = false;
}
state->autocommit = true;
return Ok(error);
}
if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
if (state->autocommit) {
AdbcStatusCode const status = BeginRemoteTransaction(state, error);
if (status != ADBC_STATUS_OK) {
return status;
}
}
state->autocommit = false;
return Ok(error);
}
return InvalidArgument(error, "invalid autocommit option value");
}

return NotImplemented(error, "unsupported connection option");
}
Expand Down Expand Up @@ -1300,6 +1382,7 @@ AdbcStatusCode DriverStatementSetSqlQuery(AdbcStatement* statement,
}
ReleaseBoundStream(state);
state->sql = query;
state->prepared = false;
return Ok(error);
}

Expand All @@ -1315,6 +1398,10 @@ AdbcStatusCode DriverStatementExecuteQuery(AdbcStatement* statement,
return InvalidState(error, "connection is not initialized");
}
if (state->has_bound_stream) {
if (!state->sql.empty()) {
return NotImplemented(error,
"parameterized SQL execution is not implemented");
}
if (out != nullptr) {
return InvalidArgument(error,
"bulk ingest does not produce a result stream");
Expand Down Expand Up @@ -1344,8 +1431,20 @@ AdbcStatusCode DriverStatementExecuteQuery(AdbcStatement* statement,
return status;
}

AdbcStatusCode DriverStatementPrepare(AdbcStatement*, AdbcError* error) {
return NotImplemented(error, "parameterized statements are not implemented");
AdbcStatusCode DriverStatementPrepare(AdbcStatement* statement,
AdbcError* error) {
StatementState* state = GetStatement(statement);
if (state == nullptr) {
return InvalidState(error, "statement is not initialized");
}
if (state->connection == nullptr || !state->connection->initialized) {
return InvalidState(error, "connection is not initialized");
}
if (state->sql.empty()) {
return InvalidState(error, "SQL query is not set");
}
state->prepared = true;
return Ok(error);
}

AdbcStatusCode DriverStatementBind(AdbcStatement*, ArrowArray*, ArrowSchema*,
Expand All @@ -1364,7 +1463,6 @@ AdbcStatusCode DriverStatementBindStream(AdbcStatement* statement,
return InvalidArgument(error, "Arrow stream must not be null");
}
ReleaseBoundStream(state);
state->sql.clear();
state->bound_stream = *stream;
state->has_bound_stream = true;
stream->release = nullptr;
Expand Down Expand Up @@ -1396,6 +1494,8 @@ AdbcStatusCode DriverStatementSetOption(AdbcStatement* statement,
}
if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
state->ingest_target_table = value;
state->sql.clear();
state->prepared = false;
return Ok(error);
}
if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_CATALOG) == 0) {
Expand Down Expand Up @@ -1462,13 +1562,30 @@ ADBC_EXPORT AdbcStatusCode AdbcConnectionRelease(AdbcConnection* connection,
return DriverConnectionRelease(connection, error);
}

ADBC_EXPORT AdbcStatusCode AdbcConnectionCommit(AdbcConnection* connection,
AdbcError* error) {
return DriverConnectionCommit(connection, error);
}

ADBC_EXPORT AdbcStatusCode AdbcConnectionRollback(AdbcConnection* connection,
AdbcError* error) {
return DriverConnectionRollback(connection, error);
}

ADBC_EXPORT AdbcStatusCode AdbcConnectionSetOption(AdbcConnection* connection,
char const* key,
char const* value,
AdbcError* error) {
return DriverConnectionSetOption(connection, key, value, error);
}

ADBC_EXPORT AdbcStatusCode AdbcConnectionGetOption(AdbcConnection* connection,
char const* key, char* value,
size_t* length,
AdbcError* error) {
return DriverConnectionGetOption(connection, key, value, length, error);
}

ADBC_EXPORT AdbcStatusCode AdbcConnectionGetInfo(AdbcConnection* connection,
uint32_t const* info_codes,
size_t info_codes_length,
Expand Down
Loading
Loading