Skip to content
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## UNRELEASED

### New Features
- Add an in-process `chdb` backend. `clickhouse_connect.get_client(interface="chdb")` (and `get_async_client`) returns a client backed by the embedded ClickHouse engine via the `chdb` Python package, with no server required.

### Bug Fixes
- Async client: `ca_cert="certifi"` shorthand now resolves to `certifi.where()`, matching the sync client. Previously the async path passed the literal string to `ssl_context.load_verify_locations`, producing `FileNotFoundError`. Closes [#742](https://github.com/ClickHouse/clickhouse-connect/issues/742)
- Fix SQLAlchemy dialect rendering for `ILIKE` and `NOT ILIKE` expressions to use native ClickHouse syntax instead of the generic SQLAlchemy `lower(...) LIKE lower(...)` fallback.
Expand Down
51 changes: 51 additions & 0 deletions clickhouse_connect/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ def create_client(
limits. Only available for query operations (not inserts). Default: False
:return: ClickHouse Connect Client instance
"""
if interface == "chdb":
return _create_chdb_client(
database=database,
settings=settings,
generic_args=generic_args,
kwargs=kwargs,
)

host, username, password, port, database, interface = _parse_connection_params(
host, username, password, port, database, interface, secure, dsn, kwargs
)
Expand Down Expand Up @@ -264,6 +272,14 @@ async def create_async_client(
limits. Only available for query operations (not inserts). Default: False
:return: ClickHouse Connect AsyncClient instance
"""
if interface == "chdb":
return _create_chdb_async_client(
database=database,
settings=settings,
generic_args=generic_args,
kwargs=kwargs,
)

try:
from clickhouse_connect.driver.asyncclient import AsyncClient as _AsyncClient
except ModuleNotFoundError as ex:
Expand Down Expand Up @@ -315,3 +331,38 @@ async def create_async_client(
)
await client._initialize()
return client


def _create_chdb_client(
    *,
    database: str,
    settings: dict[str, Any] | None,
    generic_args: dict[str, Any] | None,
    kwargs: dict[str, Any],
) -> Client:
    """
    Build the synchronous client for the in-process chdb backend.

    :param database: Database name handed to ChdbClient
    :param settings: Optional settings; copied, so the caller's dict is never mutated
    :param generic_args: Optional extra arguments. Each entry is merged into the settings copy
      (overriding an existing key of the same name); a leading "ch_" is removed from the key first
    :param kwargs: Remaining keyword arguments, forwarded to ChdbClient unchanged
    :return: A new ChdbClient instance
    """
    # Imported lazily so the chdb client module is only loaded when this backend is requested.
    from clickhouse_connect.driver.chdbclient import ChdbClient

    merged = dict(settings) if settings else {}
    for key, value in (generic_args or {}).items():
        merged[key[3:] if key.startswith("ch_") else key] = value
    return ChdbClient(database=database, settings=merged, **kwargs)


def _create_chdb_async_client(
    *,
    database: str,
    settings: dict[str, Any] | None,
    generic_args: dict[str, Any] | None,
    kwargs: dict[str, Any],
):
    """
    Build the async-facing client for the in-process chdb backend.

    A synchronous client is created with `_create_chdb_client` from the same arguments and
    then wrapped in an AsyncChdbClient.

    :param database: Database name for the underlying sync client
    :param settings: Optional settings for the underlying sync client
    :param generic_args: Optional extra arguments, handled by `_create_chdb_client`
    :param kwargs: Remaining keyword arguments, handled by `_create_chdb_client`
    :return: An AsyncChdbClient wrapping the newly created sync client
    """
    # Imported lazily so the async wrapper module is only loaded when this backend is requested.
    from clickhouse_connect.driver.chdbasync import AsyncChdbClient

    wrapped = _create_chdb_client(
        database=database,
        settings=settings,
        generic_args=generic_args,
        kwargs=kwargs,
    )
    return AsyncChdbClient(wrapped)  # type: ignore[arg-type]
314 changes: 314 additions & 0 deletions clickhouse_connect/driver/chdbasync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
"""
Async wrapper around ChdbClient.

chdb has no native async API, so this client delegates each call to the wrapped
sync ChdbClient via `asyncio.get_running_loop().run_in_executor(...)`. Because
ChdbClient serializes concurrent calls on a per-client `threading.Lock`,
gather()-style concurrency on a single AsyncChdbClient does not actually run in
parallel — for true parallelism, create multiple clients.
"""

from __future__ import annotations

import asyncio
import io
from collections.abc import Generator, Iterable, Sequence
from datetime import tzinfo
from typing import TYPE_CHECKING, Any, BinaryIO

from clickhouse_connect.datatypes.base import ClickHouseType
from clickhouse_connect.driver.chdbclient import ChdbClient
from clickhouse_connect.driver.client import Client
from clickhouse_connect.driver.common import StreamContext
from clickhouse_connect.driver.external import ExternalData
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.query import QueryContext, QueryResult, TzMode
from clickhouse_connect.driver.summary import QuerySummary

if TYPE_CHECKING:
import numpy
import pandas
import polars
import pyarrow


class AsyncChdbClient(Client):
    """
    Asyncio facade over a synchronous ChdbClient for the in-process chdb backend.

    Coroutine methods hand the matching ChdbClient call to the event loop's default
    executor and await its result. Methods that only touch client-side state (client
    settings, access token, version check, query context creation) call the wrapped
    client directly without going through the executor.
    """

    def __init__(self, sync: ChdbClient):
        """
        Wrap an existing synchronous client.

        :param sync: The ChdbClient every call is delegated to
        """
        self._sync = sync
        # Copy the attributes user code commonly reads off a client object (server_version,
        # server_tz, database, etc.) so they are available on the wrapper too. The values are
        # captured once, here; Client.__init__ is not invoked.
        mirrored = (
            "server_tz",
            "server_version",
            "server_settings",
            "database",
            "uri",
            "query_limit",
            "query_retries",
            "tz_mode",
            "_tz_source",
            "_apply_server_tz",
            "_dst_safe",
            "show_clickhouse_errors",
            "protocol_version",
            "write_compression",
            "compression",
            "_read_format",
            "_write_format",
            "_transform",
        )
        for attr in mirrored:
            setattr(self, attr, getattr(sync, attr))

    @property
    def chdb_connection(self):
        """The `chdb_connection` of the wrapped sync client."""
        return self._sync.chdb_connection

    async def _run(self, func, *args, **kwargs):
        """
        Call `func(*args, **kwargs)` on the running loop's default executor and await it.

        :param func: Callable to execute off the event loop thread
        :return: Whatever `func` returns
        """
        return await asyncio.get_running_loop().run_in_executor(None, lambda: func(*args, **kwargs))

    # ---- sync passthroughs (no I/O) ----

    def set_client_setting(self, key: str, value: Any) -> None:
        """Set a client setting on the wrapped sync client."""
        self._sync.set_client_setting(key, value)

    def get_client_setting(self, key: str) -> str | None:
        """Return the wrapped sync client's value for the given client setting."""
        return self._sync.get_client_setting(key)

    def set_access_token(self, access_token: str) -> None:
        """Forward the access token to the wrapped sync client."""
        self._sync.set_access_token(access_token)

    def min_version(self, version_str: str) -> bool:
        """Return the wrapped sync client's `min_version` result for `version_str`."""
        return self._sync.min_version(version_str)

    # ---- async overrides ----

    async def _query_with_context(self, context: QueryContext) -> QueryResult:  # type: ignore[override]
        """Run the sync client's `_query_with_context` in the executor."""
        return await self._run(self._sync._query_with_context, context)

    async def query(  # type: ignore[override]
        self,
        query: str | None = None,
        parameters: Sequence | dict[str, Any] | None = None,
        settings: dict[str, Any] | None = None,
        query_formats: dict[str, str] | None = None,
        column_formats: dict[str, str | dict[str, str]] | None = None,
        encoding: str | None = None,
        use_none: bool | None = None,
        column_oriented: bool | None = None,
        use_numpy: bool | None = None,
        max_str_len: int | None = None,
        context: QueryContext | None = None,
        query_tz: str | tzinfo | None = None,
        column_tzs: dict[str, str | tzinfo] | None = None,
        external_data: ExternalData | None = None,
        transport_settings: dict[str, str] | None = None,
        tz_mode: TzMode | None = None,
    ) -> QueryResult:
        """
        Run the sync client's `query` in the executor; every argument is forwarded by keyword.

        :return: The QueryResult produced by the wrapped client
        """
        return await self._run(
            self._sync.query,
            query=query,
            parameters=parameters,
            settings=settings,
            query_formats=query_formats,
            column_formats=column_formats,
            encoding=encoding,
            use_none=use_none,
            column_oriented=column_oriented,
            use_numpy=use_numpy,
            max_str_len=max_str_len,
            context=context,
            query_tz=query_tz,
            column_tzs=column_tzs,
            external_data=external_data,
            transport_settings=transport_settings,
            tz_mode=tz_mode,
        )

    async def query_column_block_stream(self, *args, **kwargs) -> StreamContext:  # type: ignore[override]
        """Run the sync client's `query_column_block_stream` in the executor."""
        return await self._run(self._sync.query_column_block_stream, *args, **kwargs)

    async def query_row_block_stream(self, *args, **kwargs) -> StreamContext:  # type: ignore[override]
        """Run the sync client's `query_row_block_stream` in the executor."""
        return await self._run(self._sync.query_row_block_stream, *args, **kwargs)

    async def query_rows_stream(self, *args, **kwargs) -> StreamContext:  # type: ignore[override]
        """Run the sync client's `query_rows_stream` in the executor."""
        return await self._run(self._sync.query_rows_stream, *args, **kwargs)

    async def query_np(self, *args, **kwargs) -> numpy.ndarray:
        """Run the sync client's `query_np` in the executor."""
        return await self._run(self._sync.query_np, *args, **kwargs)

    async def query_np_stream(self, *args, **kwargs) -> StreamContext:  # type: ignore[override]
        """Run the sync client's `query_np_stream` in the executor."""
        return await self._run(self._sync.query_np_stream, *args, **kwargs)

    async def query_df(self, *args, **kwargs) -> pandas.DataFrame:
        """Run the sync client's `query_df` in the executor."""
        return await self._run(self._sync.query_df, *args, **kwargs)

    async def query_df_stream(self, *args, **kwargs) -> StreamContext:  # type: ignore[override]
        """Run the sync client's `query_df_stream` in the executor."""
        return await self._run(self._sync.query_df_stream, *args, **kwargs)

    async def query_arrow(self, *args, **kwargs) -> pyarrow.Table:
        """Run the sync client's `query_arrow` in the executor."""
        return await self._run(self._sync.query_arrow, *args, **kwargs)

    async def query_arrow_stream(self, *args, **kwargs) -> StreamContext:  # type: ignore[override]
        """Run the sync client's `query_arrow_stream` in the executor."""
        return await self._run(self._sync.query_arrow_stream, *args, **kwargs)

    async def query_df_arrow(self, *args, **kwargs) -> pandas.DataFrame | polars.DataFrame:
        """Run the sync client's `query_df_arrow` in the executor."""
        return await self._run(self._sync.query_df_arrow, *args, **kwargs)

    async def query_df_arrow_stream(self, *args, **kwargs) -> StreamContext:  # type: ignore[override]
        """Run the sync client's `query_df_arrow_stream` in the executor."""
        return await self._run(self._sync.query_df_arrow_stream, *args, **kwargs)

    async def command(  # type: ignore[override]
        self,
        cmd: str,
        parameters: Sequence | dict[str, Any] | None = None,
        data: str | bytes | None = None,
        settings: dict[str, Any] | None = None,
        use_database: bool = True,
        external_data: ExternalData | None = None,
        transport_settings: dict[str, str] | None = None,
    ) -> str | int | Sequence[str] | QuerySummary:
        """
        Run the sync client's `command` in the executor.

        `cmd` is forwarded positionally, everything else by keyword.

        :return: The value returned by the wrapped client's `command`
        """
        return await self._run(
            self._sync.command,
            cmd,
            parameters=parameters,
            data=data,
            settings=settings,
            use_database=use_database,
            external_data=external_data,
            transport_settings=transport_settings,
        )

    async def ping(self) -> bool:  # type: ignore[override]
        """Run the sync client's `ping` in the executor."""
        return await self._run(self._sync.ping)

    async def raw_query(  # type: ignore[override]
        self,
        query: str,
        parameters: Sequence | dict[str, Any] | None = None,
        settings: dict[str, Any] | None = None,
        fmt: str | None = None,
        use_database: bool = True,
        external_data: ExternalData | None = None,
        transport_settings: dict[str, str] | None = None,
    ) -> bytes:
        """
        Run the sync client's `raw_query` in the executor.

        `query` is forwarded positionally, everything else by keyword.

        :return: The bytes returned by the wrapped client's `raw_query`
        """
        return await self._run(
            self._sync.raw_query,
            query,
            parameters=parameters,
            settings=settings,
            fmt=fmt,
            use_database=use_database,
            external_data=external_data,
            transport_settings=transport_settings,
        )

    async def raw_stream(  # type: ignore[override]
        self,
        query: str,
        parameters: Sequence | dict[str, Any] | None = None,
        settings: dict[str, Any] | None = None,
        fmt: str | None = None,
        use_database: bool = True,
        external_data: ExternalData | None = None,
        transport_settings: dict[str, str] | None = None,
    ) -> io.IOBase:
        """
        Run the sync client's `raw_stream` in the executor.

        `query` is forwarded positionally, everything else by keyword.

        :return: The stream object returned by the wrapped client's `raw_stream`
        """
        return await self._run(
            self._sync.raw_stream,
            query,
            parameters=parameters,
            settings=settings,
            fmt=fmt,
            use_database=use_database,
            external_data=external_data,
            transport_settings=transport_settings,
        )

    async def insert(  # type: ignore[override]
        self,
        table: str | None = None,
        data=None,
        column_names: str | Iterable[str] = "*",
        database: str | None = None,
        column_types: Sequence[ClickHouseType] | None = None,
        column_type_names: Sequence[str] | None = None,
        column_oriented: bool = False,
        settings: dict[str, Any] | None = None,
        context: InsertContext | None = None,
        transport_settings: dict[str, str] | None = None,
    ) -> QuerySummary:
        """
        Run the sync client's `insert` in the executor; every argument is forwarded by keyword.

        :return: The QuerySummary produced by the wrapped client
        """
        return await self._run(
            self._sync.insert,
            table=table,
            data=data,
            column_names=column_names,
            database=database,
            column_types=column_types,
            column_type_names=column_type_names,
            column_oriented=column_oriented,
            settings=settings,
            context=context,
            transport_settings=transport_settings,
        )

    async def insert_df(self, *args, **kwargs) -> QuerySummary:  # type: ignore[override]
        """Run the sync client's `insert_df` in the executor."""
        return await self._run(self._sync.insert_df, *args, **kwargs)

    async def insert_arrow(self, *args, **kwargs) -> QuerySummary:  # type: ignore[override]
        """Run the sync client's `insert_arrow` in the executor."""
        return await self._run(self._sync.insert_arrow, *args, **kwargs)

    async def insert_df_arrow(self, *args, **kwargs) -> QuerySummary:  # type: ignore[override]
        """Run the sync client's `insert_df_arrow` in the executor."""
        return await self._run(self._sync.insert_df_arrow, *args, **kwargs)

    async def data_insert(self, context: InsertContext) -> QuerySummary:  # type: ignore[override]
        """Run the sync client's `data_insert` in the executor."""
        return await self._run(self._sync.data_insert, context)

    async def raw_insert(  # type: ignore[override]
        self,
        table: str | None = None,
        column_names: Sequence[str] | None = None,
        insert_block: str | bytes | Generator[bytes, None, None] | BinaryIO | None = None,
        settings: dict[str, Any] | None = None,
        fmt: str | None = None,
        compression: str | None = None,
        transport_settings: dict[str, str] | None = None,
    ) -> QuerySummary:
        """
        Run the sync client's `raw_insert` in the executor; every argument is forwarded by keyword.

        :return: The QuerySummary produced by the wrapped client
        """
        return await self._run(
            self._sync.raw_insert,
            table=table,
            column_names=column_names,
            insert_block=insert_block,
            settings=settings,
            fmt=fmt,
            compression=compression,
            transport_settings=transport_settings,
        )

    async def close(self) -> None:  # type: ignore[override]
        """Run the sync client's `close` in the executor."""
        await self._run(self._sync.close)

    async def close_connections(self) -> None:  # type: ignore[override]
        """Run the sync client's `close_connections` in the executor."""
        await self._run(self._sync.close_connections)

    async def __aenter__(self):
        """Enter the async context manager; returns this client."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the client on context exit; returns False so exceptions are not suppressed."""
        await self.close()
        return False

    async def create_insert_context(self, *args, **kwargs) -> InsertContext:  # type: ignore[override]
        """Run the sync client's `create_insert_context` in the executor."""
        return await self._run(self._sync.create_insert_context, *args, **kwargs)

    def create_query_context(self, *args, **kwargs) -> QueryContext:
        """Build a QueryContext via the wrapped sync client, called directly (no executor)."""
        return self._sync.create_query_context(*args, **kwargs)
Loading