From 10c7a945f64ff6e52462dc058b4f45a281748abf Mon Sep 17 00:00:00 2001 From: Coby Geralnik Date: Wed, 27 May 2026 11:56:52 +0300 Subject: [PATCH 1/3] Implement SQLAlchemy reflection for the cc_sqlalchemy dialect The cc_sqlalchemy dialect did not support SQLAlchemy reflection (MetaData.reflect / Inspector multi-table reflection), which broke sqlacodegen and any tool that calls `dialect.get_columns()` directly. This change fills in the missing dialect methods so reflection works end-to-end against a ClickHouse server. Changes: 1. dialect.py: add `ClickHouseDialect.get_columns()`. Previously only `ChInspector.get_columns()` existed, but SQLAlchemy's reflection path goes through `Dialect.get_multi_columns` -> `Dialect.get_columns` and never touches `Inspector.get_columns` on its own. Without a dialect implementation, `MetaData.reflect()` raised `NotImplementedError` from the SQLAlchemy base class. `get_pk_constraint()` / `get_primary_keys()` now return the actual primary key columns derived from `system.columns.is_in_primary_key` (which mirrors MergeTree's ORDER BY / PRIMARY KEY) instead of empty lists. This lets sqlacodegen generate declarative classes instead of bare `Table(...)` definitions for any MergeTree table. 2. inspector.py: promote `get_columns` and `get_pk_constraint` to module-level functions so the dialect can call the same logic. `ChInspector.reflect_table()` now applies the PK constraint to reflected columns (it was building columns with no PK info, so even direct `Table('asset', md, autoload_with=engine)` reflection lost the primary key). 3. datatypes/sqltypes.py: replace `python_type = None` on UDT-based types with concrete Python types. 
SQLAlchemy's contract for `TypeEngine.python_type` is that it either returns a class or raises `NotImplementedError`; returning `None` makes any consumer that does `python_type.__module__` / `__name__` crash with `AttributeError: 'NoneType' object has no attribute '__module__'` (sqlacodegen, and anything else that walks python_type for annotations or metadata). - UUID -> uuid.UUID - IPv4 / IPv6 -> ipaddress.IPv4Address / ipaddress.IPv6Address - Nothing -> type(None) - Point -> tuple - Ring / Polygon / MultiPolygon -> list - LineString / MultiLineString -> list - JSON -> dict - Nested -> list - (Simple)AggregateFunction -> str 4. datatypes/sqltypes.py: `Array` now subclasses `sqlalchemy.types.ARRAY` (alongside `ChSqlaType`) and exposes `item_type` as a regular instance attribute plus `dimensions = 1`. Two effects: - `isinstance(col.type, sqlalchemy.types.ARRAY)` now matches CH arrays, which lets sqlacodegen render `Mapped[list[T]]` annotations for single-dim arrays without special-casing. - `item_type` is mutable so sqlacodegen's `fix_column_types` adaptation pass (which reassigns `new_coltype.item_type`) works. `dimensions = 1` reflects CH's type system: every Array is one-dimensional and nested arrays (`Array(Array(String))`) are represented via the inner item type, not via a dimension count. Tests: - tests/integration_tests/test_sqlalchemy/test_reflect.py: `test_metadata_reflect_and_primary_keys` exercises the `Dialect.get_columns` reflection path via `MetaData.reflect()` and asserts composite primary key reflection from a MergeTree ORDER BY clause, both via `MetaData.reflect()` and via direct `Table(autoload_with=...)`. End-to-end effect: `MetaData.reflect()` now succeeds, and `sqlacodegen <url>` produces a complete, importable Python module with declarative ORM classes, composite primary keys, and typed `Mapped[...]` annotations against a real ClickHouse schema. No changes to the runtime client paths. 
--- CHANGELOG.md | 3 + .../cc_sqlalchemy/datatypes/sqltypes.py | 55 +++++++------- clickhouse_connect/cc_sqlalchemy/dialect.py | 9 ++- clickhouse_connect/cc_sqlalchemy/inspector.py | 72 ++++++++++++------- .../test_sqlalchemy/test_reflect.py | 28 ++++++++ 5 files changed, 113 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fbde5cd..cf7cf189 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,9 @@ - Async client: retry stale keep-alive resets surfaced by aiohttp as `ClientOSError` or `ClientConnectionResetError`, fixing large async inserts on killed pooled connections. Closes [#763](https://github.com/ClickHouse/clickhouse-connect/issues/763). - Async client: do not retry aiohttp timeout, connector, or fingerprint errors as these can indicate the request was already delivered or a config issue, not a stale connection. - Sync client: also retry stale keep-alive `BrokenPipeError` (in addition to `ConnectionResetError`), matching the async behavior. +- SQLAlchemy: implement reflection on the dialect itself so `MetaData.reflect()` and `Inspector.get_multi_columns()` work. `get_pk_constraint()` / `get_primary_keys()` now derive primary key columns from `system.columns.is_in_primary_key` instead of returning empty lists. +- SQLAlchemy: UDT-based types (`UUID`, `IPv4`/`IPv6`, `JSON`, `Nested`, geometry types, `AggregateFunction`, etc.) now return concrete `python_type` classes instead of `None`, matching SQLAlchemy's `TypeEngine.python_type` contract. +- SQLAlchemy: `Array` now subclasses `sqlalchemy.types.ARRAY` and exposes `item_type`. 
## 1.1.0, 2026-05-26 diff --git a/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py b/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py index 94d8e67f..161700c4 100644 --- a/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py +++ b/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py @@ -1,7 +1,17 @@ +import ipaddress +import uuid from collections.abc import Sequence from enum import Enum as PyEnum from sqlalchemy.exc import ArgumentError +from sqlalchemy.types import ( + ARRAY, + Float, + Integer, + Interval, + Numeric, + UserDefinedType, +) from sqlalchemy.types import ( Boolean as SqlaBoolean, ) @@ -11,18 +21,11 @@ from sqlalchemy.types import ( DateTime as SqlaDateTime, ) -from sqlalchemy.types import ( - Float, - Integer, - Interval, - Numeric, - UserDefinedType, -) from sqlalchemy.types import ( String as SqlaString, ) -from clickhouse_connect.cc_sqlalchemy.datatypes.base import ChSqlaType, schema_types +from clickhouse_connect.cc_sqlalchemy.datatypes.base import ChSqlaType, schema_types, sqla_type_from_name from clickhouse_connect.datatypes.base import EMPTY_TYPE_DEF, LC_TYPE_DEF, NULLABLE_TYPE_DEF, TypeDef from clickhouse_connect.datatypes.numeric import Enum8 as ChEnum8 from clickhouse_connect.datatypes.numeric import Enum16 as ChEnum16 @@ -209,43 +212,43 @@ def __init__(self, size: int = -1, type_def: TypeDef = None): class IPv4(ChSqlaType, UserDefinedType): - python_type = None + python_type = ipaddress.IPv4Address class IPv6(ChSqlaType, UserDefinedType): - python_type = None + python_type = ipaddress.IPv6Address class UUID(ChSqlaType, UserDefinedType): - python_type = None + python_type = uuid.UUID class Nothing(ChSqlaType, UserDefinedType): - python_type = None + python_type = type(None) class Point(ChSqlaType, UserDefinedType): - python_type = None + python_type = tuple class Ring(ChSqlaType, UserDefinedType): - python_type = None + python_type = list class Polygon(ChSqlaType, UserDefinedType): - python_type = None + python_type = list 
class MultiPolygon(ChSqlaType, UserDefinedType): - python_type = None + python_type = list class LineString(ChSqlaType, UserDefinedType): - python_type = None + python_type = list class MultiLineString(ChSqlaType, UserDefinedType): - python_type = None + python_type = list class Date(ChSqlaType, SqlaDate): @@ -412,8 +415,9 @@ def __new__(cls, element: ChSqlaType | type[ChSqlaType]): return element.__class__(type_def=TypeDef(wrappers, orig.keys, orig.values)) -class Array(ChSqlaType, UserDefinedType): +class Array(ChSqlaType, ARRAY): python_type = list + dimensions = 1 def __init__(self, element: ChSqlaType | type[ChSqlaType] = None, type_def: TypeDef = None): """ @@ -425,7 +429,10 @@ def __init__(self, element: ChSqlaType | type[ChSqlaType] = None, type_def: Type if callable(element): element = element() type_def = TypeDef(values=(element.name,)) - super().__init__(type_def) + ChSqlaType.__init__(self, type_def) + # Set item_type directly; calling ARRAY.__init__ would reject nested Array(Array(T)), + # which CH supports natively (CH expresses dimensions via nesting, not a dim count). 
+ self.item_type = sqla_type_from_name(type_def.values[0]) class Map(ChSqlaType, UserDefinedType): @@ -489,7 +496,7 @@ class JSON(ChSqlaType, UserDefinedType): Note this isn't currently supported for insert/select, only table definitions """ - python_type = None + python_type = dict class Nested(ChSqlaType, UserDefinedType): @@ -497,11 +504,11 @@ class Nested(ChSqlaType, UserDefinedType): Note this isn't currently supported for insert/select, only table definitions """ - python_type = None + python_type = list class SimpleAggregateFunction(ChSqlaType, UserDefinedType): - python_type = None + python_type = str def __init__( self, @@ -532,7 +539,7 @@ class AggregateFunction(ChSqlaType, UserDefinedType): Note this isn't currently supported for insert/select, only table definitions """ - python_type = None + python_type = str def __init__(self, *params, type_def: TypeDef = None): """ diff --git a/clickhouse_connect/cc_sqlalchemy/dialect.py b/clickhouse_connect/cc_sqlalchemy/dialect.py index 4e47b96d..3c6bae0d 100644 --- a/clickhouse_connect/cc_sqlalchemy/dialect.py +++ b/clickhouse_connect/cc_sqlalchemy/dialect.py @@ -5,7 +5,7 @@ from clickhouse_connect import dbapi from clickhouse_connect.cc_sqlalchemy import dialect_name, ischema_names -from clickhouse_connect.cc_sqlalchemy.inspector import ChInspector, get_table_metadata +from clickhouse_connect.cc_sqlalchemy.inspector import ChInspector, get_columns, get_pk_constraint, get_table_metadata from clickhouse_connect.cc_sqlalchemy.sql import full_table from clickhouse_connect.cc_sqlalchemy.sql.compiler import ChStatementCompiler from clickhouse_connect.cc_sqlalchemy.sql.ddlcompiler import ChDDLCompiler @@ -94,11 +94,14 @@ def get_table_names(self, connection, schema=None, **kw): cmd += " FROM " + quote_identifier(schema) return [row.name for row in connection.execute(text(cmd))] + def get_columns(self, connection, table_name, schema=None, **kw): + return get_columns(connection, table_name, schema) + def 
get_primary_keys(self, connection, table_name, schema=None, **kw): - return [] + return get_pk_constraint(connection, table_name, schema)["constrained_columns"] def get_pk_constraint(self, connection, table_name, schema=None, **kw): - return {"constrained_columns": [], "name": None} + return get_pk_constraint(connection, table_name, schema) def get_foreign_keys(self, connection, table_name, schema=None, **kw): return [] diff --git a/clickhouse_connect/cc_sqlalchemy/inspector.py b/clickhouse_connect/cc_sqlalchemy/inspector.py index 68875208..80508651 100644 --- a/clickhouse_connect/cc_sqlalchemy/inspector.py +++ b/clickhouse_connect/cc_sqlalchemy/inspector.py @@ -121,6 +121,47 @@ def get_dictionary_metadata(connection, table_name: str, schema: str | None = No return metadata +def get_pk_constraint(connection, table_name: str, schema: str | None = None) -> dict[str, Any]: + database = _database_name(connection, schema) + result_set = connection.execute( + text( + "SELECT name FROM system.columns WHERE database = :database AND table = :table_name AND is_in_primary_key = 1 ORDER BY position" + ), + {"database": database, "table_name": table_name}, + ) + return {"constrained_columns": [row.name for row in result_set], "name": None} + + +def get_columns(connection, table_name: str, schema: str | None = None) -> list[dict[str, Any]]: + table_metadata = get_table_metadata(connection, table_name, schema) + if table_metadata.engine == "Dictionary": + return get_dictionary_columns(connection, table_name, schema) + table_id = full_table(table_name, schema) + result_set = connection.execute(text(f"DESCRIBE TABLE {table_id}")) + if not result_set: + raise NoResultFound(f"Table {table_id} does not exist") + columns = [] + for row in result_set: + sqla_type = sqla_type_from_name(row.type.replace("\n", "")) + col = { + "name": row.name, + "type": sqla_type, + "nullable": sqla_type.nullable, + "autoincrement": False, + "comment": row.comment or None, + "clickhouse_codec": 
row.codec_expression or None, + "clickhouse_ttl": text(row.ttl_expression) if row.ttl_expression else None, + } + if row.default_type == "DEFAULT" and row.default_expression: + col["server_default"] = text(row.default_expression) + elif row.default_type == "MATERIALIZED" and row.default_expression: + col["clickhouse_materialized"] = text(row.default_expression) + elif row.default_type == "ALIAS" and row.default_expression: + col["clickhouse_alias"] = text(row.default_expression) + columns.append(col) + return columns + + class ChInspector(Inspector): def reflect_table( self, @@ -137,12 +178,15 @@ def reflect_table( else: reflected_columns = self.get_columns(table.name, schema) + pk_columns = set(get_pk_constraint(self.bind, table.name, schema)["constrained_columns"]) for col in reflected_columns: name = col.pop("name") if (include_columns and name not in include_columns) or (exclude_columns and name in exclude_columns): continue col_type = col.pop("type") col_args = {key: value for key, value in col.items() if value is not None} + if name in pk_columns: + col_args["primary_key"] = True table.append_column(sa_schema.Column(name, col_type, **col_args)) if table_metadata.engine == "Dictionary": dictionary_metadata = get_dictionary_metadata(self.bind, table.name, schema) @@ -157,30 +201,4 @@ def reflect_table( table.kwargs["clickhouse_engine"] = table.engine def get_columns(self, table_name, schema=None, **_kwargs): - table_metadata = get_table_metadata(self.bind, table_name, schema) - if table_metadata.engine == "Dictionary": - return get_dictionary_columns(self.bind, table_name, schema) - table_id = full_table(table_name, schema) - result_set = self.bind.execute(text(f"DESCRIBE TABLE {table_id}")) - if not result_set: - raise NoResultFound(f"Table {table_id} does not exist") - columns = [] - for row in result_set: - sqla_type = sqla_type_from_name(row.type.replace("\n", "")) - col = { - "name": row.name, - "type": sqla_type, - "nullable": sqla_type.nullable, - 
"autoincrement": False, - "comment": row.comment or None, - "clickhouse_codec": row.codec_expression or None, - "clickhouse_ttl": text(row.ttl_expression) if row.ttl_expression else None, - } - if row.default_type == "DEFAULT" and row.default_expression: - col["server_default"] = text(row.default_expression) - elif row.default_type == "MATERIALIZED" and row.default_expression: - col["clickhouse_materialized"] = text(row.default_expression) - elif row.default_type == "ALIAS" and row.default_expression: - col["clickhouse_alias"] = text(row.default_expression) - columns.append(col) - return columns + return get_columns(self.bind, table_name, schema) diff --git a/tests/integration_tests/test_sqlalchemy/test_reflect.py b/tests/integration_tests/test_sqlalchemy/test_reflect.py index 311a7675..c1f5177f 100644 --- a/tests/integration_tests/test_sqlalchemy/test_reflect.py +++ b/tests/integration_tests/test_sqlalchemy/test_reflect.py @@ -70,3 +70,31 @@ def test_get_table_names(test_engine: Engine, test_db: str): assert isinstance(system_tables, list) assert "columns" in system_tables assert "fake_table" not in system_tables + + +def test_metadata_reflect_and_primary_keys(test_engine: Engine, test_db: str): + """Dialect-level reflection. MetaData.reflect() exercises the + Dialect.get_multi_columns -> Dialect.get_columns path (not + Inspector.get_columns), which previously raised NotImplementedError. + Primary key columns derived from system.columns.is_in_primary_key must + also land on the reflected Table, both via MetaData.reflect() and via a + direct Table(autoload_with=...) 
call.""" + common.set_setting("invalid_setting_action", "drop") + with test_engine.begin() as conn: + conn.execute(text(f"DROP TABLE IF EXISTS {test_db}.reflect_pk_test")) + conn.execute( + text( + f"CREATE TABLE {test_db}.reflect_pk_test (org_id UInt32, id UInt64, payload String) ENGINE MergeTree ORDER BY (org_id, id)" + ) + ) + + metadata = db.MetaData(schema=test_db) + metadata.reflect(bind=test_engine, only=["reflect_pk_test"]) + table = metadata.tables[f"{test_db}.reflect_pk_test"] + + assert {c.name for c in table.columns} == {"org_id", "id", "payload"} + assert [c.name for c in table.primary_key.columns] == ["org_id", "id"] + + # Direct autoload should also surface the composite PK. + table2 = db.Table("reflect_pk_test", db.MetaData(schema=test_db), autoload_with=test_engine) + assert [c.name for c in table2.primary_key.columns] == ["org_id", "id"] From 3d8341275a4da466049d83c729cd7fff58cded0c Mon Sep 17 00:00:00 2001 From: Coby Geralnik Date: Fri, 29 May 2026 22:33:25 +0300 Subject: [PATCH 2/3] Address review: drop PK reflection, set Array.as_tuple Per review on #766, the dialect no longer reflects a primary key. ClickHouse PRIMARY KEY / ORDER BY is a sparse index, not a uniqueness guarantee, so get_primary_keys / get_pk_constraint return empty results and the identity key is left for application code to declare. Removes the is_in_primary_key query and the PK-application path in reflect_table. Also set Array.as_tuple = False. Array bypasses ARRAY.__init__ to allow nested arrays, but as_tuple has no class-level default and ARRAY.hashable reads it, so select(arr).unique() raised AttributeError before. 
--- CHANGELOG.md | 6 +++--- .../cc_sqlalchemy/datatypes/sqltypes.py | 2 ++ clickhouse_connect/cc_sqlalchemy/dialect.py | 6 +++--- clickhouse_connect/cc_sqlalchemy/inspector.py | 14 -------------- .../test_sqlalchemy/test_reflect.py | 15 ++++++++------- 5 files changed, 16 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf7cf189..e2693be6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ ### Bug Fixes - A `datetime` bound to a server-side `{name:DateTime64(...)}` placeholder now keeps its sub-second precision instead of being truncated to seconds. The declared parameter type drives this, so no `_64` name suffix or manual `DT64Param` wrapper is needed, and it applies through `Array` and `Tuple` hints. Plain `DateTime` binds are unchanged. Closes [#739](https://github.com/ClickHouse/clickhouse-connect/issues/739). - Strip `--` line comments that have no following space when classifying queries, so a DDL with a leading `--sql`-style comment is routed as a command instead of raising `StreamFailureError`. Closes [#499](https://github.com/ClickHouse/clickhouse-connect/issues/499). +- SQLAlchemy: implement reflection on the dialect itself so `MetaData.reflect()` and `Inspector.get_multi_columns()` work. +- SQLAlchemy: UDT-based types (`UUID`, `IPv4`/`IPv6`, `JSON`, `Nested`, geometry types, `AggregateFunction`, etc.) now return concrete `python_type` classes instead of `None`, matching SQLAlchemy's `TypeEngine.python_type` contract. +- SQLAlchemy: `Array` now subclasses `sqlalchemy.types.ARRAY` and exposes `item_type`. ## 1.1.1, 2026-05-27 @@ -17,9 +20,6 @@ - Async client: retry stale keep-alive resets surfaced by aiohttp as `ClientOSError` or `ClientConnectionResetError`, fixing large async inserts on killed pooled connections. Closes [#763](https://github.com/ClickHouse/clickhouse-connect/issues/763). 
- Async client: do not retry aiohttp timeout, connector, or fingerprint errors as these can indicate the request was already delivered or a config issue, not a stale connection. - Sync client: also retry stale keep-alive `BrokenPipeError` (in addition to `ConnectionResetError`), matching the async behavior. -- SQLAlchemy: implement reflection on the dialect itself so `MetaData.reflect()` and `Inspector.get_multi_columns()` work. `get_pk_constraint()` / `get_primary_keys()` now derive primary key columns from `system.columns.is_in_primary_key` instead of returning empty lists. -- SQLAlchemy: UDT-based types (`UUID`, `IPv4`/`IPv6`, `JSON`, `Nested`, geometry types, `AggregateFunction`, etc.) now return concrete `python_type` classes instead of `None`, matching SQLAlchemy's `TypeEngine.python_type` contract. -- SQLAlchemy: `Array` now subclasses `sqlalchemy.types.ARRAY` and exposes `item_type`. ## 1.1.0, 2026-05-26 diff --git a/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py b/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py index 161700c4..53561505 100644 --- a/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py +++ b/clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py @@ -432,7 +432,9 @@ def __init__(self, element: ChSqlaType | type[ChSqlaType] = None, type_def: Type ChSqlaType.__init__(self, type_def) # Set item_type directly; calling ARRAY.__init__ would reject nested Array(Array(T)), # which CH supports natively (CH expresses dimensions via nesting, not a dim count). + # as_tuple has no class-level default, so set it here to satisfy ARRAY result processing. 
self.item_type = sqla_type_from_name(type_def.values[0]) + self.as_tuple = False class Map(ChSqlaType, UserDefinedType): diff --git a/clickhouse_connect/cc_sqlalchemy/dialect.py b/clickhouse_connect/cc_sqlalchemy/dialect.py index 3c6bae0d..368e8042 100644 --- a/clickhouse_connect/cc_sqlalchemy/dialect.py +++ b/clickhouse_connect/cc_sqlalchemy/dialect.py @@ -5,7 +5,7 @@ from clickhouse_connect import dbapi from clickhouse_connect.cc_sqlalchemy import dialect_name, ischema_names -from clickhouse_connect.cc_sqlalchemy.inspector import ChInspector, get_columns, get_pk_constraint, get_table_metadata +from clickhouse_connect.cc_sqlalchemy.inspector import ChInspector, get_columns, get_table_metadata from clickhouse_connect.cc_sqlalchemy.sql import full_table from clickhouse_connect.cc_sqlalchemy.sql.compiler import ChStatementCompiler from clickhouse_connect.cc_sqlalchemy.sql.ddlcompiler import ChDDLCompiler @@ -98,10 +98,10 @@ def get_columns(self, connection, table_name, schema=None, **kw): return get_columns(connection, table_name, schema) def get_primary_keys(self, connection, table_name, schema=None, **kw): - return get_pk_constraint(connection, table_name, schema)["constrained_columns"] + return [] def get_pk_constraint(self, connection, table_name, schema=None, **kw): - return get_pk_constraint(connection, table_name, schema) + return {"constrained_columns": [], "name": None} def get_foreign_keys(self, connection, table_name, schema=None, **kw): return [] diff --git a/clickhouse_connect/cc_sqlalchemy/inspector.py b/clickhouse_connect/cc_sqlalchemy/inspector.py index 80508651..e566fec9 100644 --- a/clickhouse_connect/cc_sqlalchemy/inspector.py +++ b/clickhouse_connect/cc_sqlalchemy/inspector.py @@ -121,17 +121,6 @@ def get_dictionary_metadata(connection, table_name: str, schema: str | None = No return metadata -def get_pk_constraint(connection, table_name: str, schema: str | None = None) -> dict[str, Any]: - database = _database_name(connection, schema) - 
result_set = connection.execute( - text( - "SELECT name FROM system.columns WHERE database = :database AND table = :table_name AND is_in_primary_key = 1 ORDER BY position" - ), - {"database": database, "table_name": table_name}, - ) - return {"constrained_columns": [row.name for row in result_set], "name": None} - - def get_columns(connection, table_name: str, schema: str | None = None) -> list[dict[str, Any]]: table_metadata = get_table_metadata(connection, table_name, schema) if table_metadata.engine == "Dictionary": @@ -178,15 +167,12 @@ def reflect_table( else: reflected_columns = self.get_columns(table.name, schema) - pk_columns = set(get_pk_constraint(self.bind, table.name, schema)["constrained_columns"]) for col in reflected_columns: name = col.pop("name") if (include_columns and name not in include_columns) or (exclude_columns and name in exclude_columns): continue col_type = col.pop("type") col_args = {key: value for key, value in col.items() if value is not None} - if name in pk_columns: - col_args["primary_key"] = True table.append_column(sa_schema.Column(name, col_type, **col_args)) if table_metadata.engine == "Dictionary": dictionary_metadata = get_dictionary_metadata(self.bind, table.name, schema) diff --git a/tests/integration_tests/test_sqlalchemy/test_reflect.py b/tests/integration_tests/test_sqlalchemy/test_reflect.py index c1f5177f..0e07c836 100644 --- a/tests/integration_tests/test_sqlalchemy/test_reflect.py +++ b/tests/integration_tests/test_sqlalchemy/test_reflect.py @@ -72,13 +72,13 @@ def test_get_table_names(test_engine: Engine, test_db: str): assert "fake_table" not in system_tables -def test_metadata_reflect_and_primary_keys(test_engine: Engine, test_db: str): +def test_metadata_reflect(test_engine: Engine, test_db: str): """Dialect-level reflection. MetaData.reflect() exercises the Dialect.get_multi_columns -> Dialect.get_columns path (not Inspector.get_columns), which previously raised NotImplementedError. 
- Primary key columns derived from system.columns.is_in_primary_key must - also land on the reflected Table, both via MetaData.reflect() and via a - direct Table(autoload_with=...) call.""" + The dialect does not reflect a primary key: ClickHouse PRIMARY KEY / + ORDER BY is not a uniqueness guarantee, so the identity key is left for + application code to declare explicitly.""" common.set_setting("invalid_setting_action", "drop") with test_engine.begin() as conn: conn.execute(text(f"DROP TABLE IF EXISTS {test_db}.reflect_pk_test")) @@ -93,8 +93,9 @@ def test_metadata_reflect_and_primary_keys(test_engine: Engine, test_db: str): table = metadata.tables[f"{test_db}.reflect_pk_test"] assert {c.name for c in table.columns} == {"org_id", "id", "payload"} - assert [c.name for c in table.primary_key.columns] == ["org_id", "id"] + assert list(table.primary_key.columns) == [] - # Direct autoload should also surface the composite PK. + # Direct autoload should also populate columns without a reflected PK. 
table2 = db.Table("reflect_pk_test", db.MetaData(schema=test_db), autoload_with=test_engine) - assert [c.name for c in table2.primary_key.columns] == ["org_id", "id"] + assert {c.name for c in table2.columns} == {"org_id", "id", "payload"} + assert list(table2.primary_key.columns) == [] From 78fd6aa6100805fda4cbd1f2c416d9668f3e6b23 Mon Sep 17 00:00:00 2001 From: Coby Geralnik Date: Thu, 4 Jun 2026 12:57:05 +0300 Subject: [PATCH 3/3] Add test for user-declared primary key surviving reflection --- .../test_sqlalchemy/test_reflect.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/integration_tests/test_sqlalchemy/test_reflect.py b/tests/integration_tests/test_sqlalchemy/test_reflect.py index 0e07c836..6da2669f 100644 --- a/tests/integration_tests/test_sqlalchemy/test_reflect.py +++ b/tests/integration_tests/test_sqlalchemy/test_reflect.py @@ -99,3 +99,26 @@ def test_metadata_reflect(test_engine: Engine, test_db: str): table2 = db.Table("reflect_pk_test", db.MetaData(schema=test_db), autoload_with=test_engine) assert {c.name for c in table2.columns} == {"org_id", "id", "payload"} assert list(table2.primary_key.columns) == [] + + +def test_user_declared_primary_key(test_engine: Engine, test_db: str): + """A user-declared primary key on a pre-declared column survives reflection.""" + common.set_setting("invalid_setting_action", "drop") + with test_engine.begin() as conn: + conn.execute(text(f"DROP TABLE IF EXISTS {test_db}.reflect_user_pk_test")) + conn.execute( + text( + f"CREATE TABLE {test_db}.reflect_user_pk_test (org_id UInt32, id UInt64, payload String) " + "ENGINE MergeTree ORDER BY (org_id, id)" + ) + ) + + table = db.Table( + "reflect_user_pk_test", + db.MetaData(schema=test_db), + db.Column("org_id", UInt32, primary_key=True), + db.Column("id", db.BigInteger, primary_key=True), + autoload_with=test_engine, + ) + assert [c.name for c in table.primary_key.columns] == ["org_id", "id"] + assert {c.name for c in table.columns} == 
{"org_id", "id", "payload"}