Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions dlt/destinations/impl/sqlalchemy/db_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
DatabaseTerminalException,
DatabaseTransientException,
LoadClientNotConnected,
DatabaseException,
)
from dlt.common.destination.dataset import DBApiCursor
from dlt.common.typing import TFun
Expand Down Expand Up @@ -201,7 +200,7 @@ def _ensure_transaction(self) -> Iterator[DBTransaction]:
def has_dataset(self) -> bool:
with self._ensure_transaction():
schema_names = self.engine.dialect.get_schema_names(self._current_connection) # type: ignore[attr-defined]
return self.dataset_name in schema_names
return self._dialect_caps.dataset_exists(schema_names, self.dataset_name)

def _sqlite_dataset_filename(self, dataset_name: str) -> str:
current_file_path = Path(self.database_name)
Expand Down Expand Up @@ -276,15 +275,12 @@ def with_alternative_dataset_name(
def create_dataset(self) -> None:
if self.dialect_name == "sqlite":
return self._sqlite_create_dataset(self.dataset_name)
self.execute_sql(sa.schema.CreateSchema(self.dataset_name))
self._dialect_caps.create_dataset(self)

def drop_dataset(self) -> None:
if self.dialect_name == "sqlite":
return self._sqlite_drop_dataset(self.dataset_name)
try:
self.execute_sql(sa.schema.DropSchema(self.dataset_name, cascade=True))
except DatabaseException: # Try again in case cascade is not supported
self.execute_sql(sa.schema.DropSchema(self.dataset_name))
self._dialect_caps.drop_dataset(self)

def truncate_tables(self, *tables: str) -> None:
# TODO: alchemy doesn't have a construct for TRUNCATE TABLE
Expand Down
65 changes: 63 additions & 2 deletions dlt/destinations/impl/sqlalchemy/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
for dialects that are not built-in. See register_dialect_capabilities.
"""

from typing import Any, Dict, Optional, Type
from typing import Any, Dict, List, Optional, Type, TYPE_CHECKING

import sqlalchemy as sa # noqa

from dlt.common.destination.capabilities import DataTypeMapper, DestinationCapabilitiesContext
from dlt.common.destination.typing import PreparedTableSchema
from dlt.destinations.exceptions import DatabaseException, DatabaseTerminalException

if TYPE_CHECKING:
from dlt.destinations.impl.sqlalchemy.db_api_client import SqlalchemyClient


_GENERIC_UNDEFINED_RELATION_PATTERNS = [
Expand Down Expand Up @@ -58,6 +62,8 @@ class DialectCapabilities:
* adapt_table -- modify an sa.Table object before it is materialized
(e.g. reorder columns for StarRocks)
* is_undefined_relation -- detect "table/schema not found" errors for the dialect
* dataset_exists / create_dataset / drop_dataset -- adapt schema (dataset) lifecycle
for dialects that do not support bare CREATE SCHEMA (e.g. Oracle)

The sqlglot_dialect property maps backend names to sqlglot dialect names. Override
it in subclasses or add entries to SQLGLOT_DIALECTS for non-obvious mappings.
Expand Down Expand Up @@ -127,6 +133,26 @@ def is_undefined_relation(self, e: Exception) -> Optional[bool]:
return True
return None

def dataset_exists(self, schema_names: List[str], dataset_name: str) -> bool:
"""Return True if the dataset (schema) exists among the schemas reported by the database.

Args:
schema_names: Schema names as returned by the dialect's get_schema_names.
dataset_name: Name of the dataset (schema) dlt is looking for.
"""
return dataset_name in schema_names

def create_dataset(self, client: "SqlalchemyClient") -> None:
"""Create the dataset (schema) identified by client.dataset_name."""
client.execute_sql(sa.schema.CreateSchema(client.dataset_name))

def drop_dataset(self, client: "SqlalchemyClient") -> None:
"""Drop the dataset (schema) identified by client.dataset_name and all objects it contains."""
try:
client.execute_sql(sa.schema.DropSchema(client.dataset_name, cascade=True))
except DatabaseException: # Try again in case cascade is not supported
client.execute_sql(sa.schema.DropSchema(client.dataset_name))


DIALECT_CAPS_REGISTRY: Dict[str, Type[DialectCapabilities]] = {}
"""Maps dialect / backend name to the DialectCapabilities class that handles it."""
Expand Down Expand Up @@ -214,7 +240,13 @@ def type_mapper_class(self) -> Type[DataTypeMapper]:


class OracleDialectCapabilities(DialectCapabilities):
"""Capabilities for Oracle."""
"""Capabilities for Oracle.

In Oracle a schema is owned by a database user and cannot be created with a bare
`CREATE SCHEMA` statement (that fails with ORA-02420). dlt therefore treats the dataset
as an existing schema (user) that must be created in advance and only manages the tables
within it.
"""

def is_undefined_relation(self, e: Exception) -> Optional[bool]:
msg = str(e).lower()
Expand All @@ -223,6 +255,35 @@ def is_undefined_relation(self, e: Exception) -> Optional[bool]:
return True
return super().is_undefined_relation(e)

def dataset_exists(self, schema_names: List[str], dataset_name: str) -> bool:
# Oracle folds unquoted identifiers to upper case, so match case-insensitively
folded = dataset_name.casefold()
return any(name.casefold() == folded for name in schema_names)

def create_dataset(self, client: "SqlalchemyClient") -> None:
# Oracle has no bare CREATE SCHEMA (schemas are users); the schema must already exist
if client.has_dataset():
return
raise DatabaseTerminalException(
Exception(
f"Oracle schema (user) '{client.dataset_name}' does not exist and cannot be"
" created by dlt. In Oracle a schema is owned by a database user and must be"
' created in advance, e.g. `CREATE USER "'
f"{client.dataset_name}"
'" IDENTIFIED BY ...` with the appropriate quota and grants (CREATE SESSION,'
" CREATE TABLE, ...). The staging dataset (named '<dataset_name>_staging') used by"
" merge and replace write dispositions must be created the same way. Create the"
" schema(s) manually and run the pipeline again."
)
)

def drop_dataset(self, client: "SqlalchemyClient") -> None:
# Oracle cannot DROP SCHEMA (that would require DROP USER, a DBA privilege the loader
# rarely has and which dlt does not own); drop the tables within the schema instead
table_names = sa.inspect(client.engine).get_table_names(schema=client.dataset_name)
if table_names:
client.drop_tables(*table_names)


def _format_mysql_datetime_literal(v: Any, precision: int = 6, no_tz: bool = False) -> str:
from dlt.common.data_writers.escape import format_datetime_literal
Expand Down
13 changes: 11 additions & 2 deletions docs/website/docs/dlt-ecosystem/destinations/sqlalchemy.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,13 @@ Please report issues with particular dialects. We'll try to make them work.
* Trino does not support PRIMARY/UNIQUE constraints

### Oracle limitations
* In Oracle, regular (non-DBA, non-SYS/SYSOPS) users are assigned one schema on user creation, and usually cannot create other schemas. For features requiring staging datasets you should either ensure schema creation rights for the DB user or exactly specify existing schema to be used for staging dataset. See [staging dataset documentation](../staging.md#staging-dataset) for more details
* In Oracle a schema is owned by a database user, and there is no bare `CREATE SCHEMA` statement (it requires `CREATE SCHEMA AUTHORIZATION <user> ...`, which fails with `ORA-02420` otherwise). dlt therefore does not try to create the schema on Oracle. Your `dataset_name` must point to an existing schema (user). If it does not exist, dlt raises a clear error instead of the cryptic `ORA-02420`. Create the schema in advance, e.g.:
```sql
CREATE USER my_dataset IDENTIFIED BY "..." DEFAULT TABLESPACE my_ts QUOTA UNLIMITED ON my_ts;
GRANT CREATE SESSION, CREATE TABLE TO my_dataset;
```
* Regular (non-DBA, non-SYS/SYSOPS) users are assigned one schema on user creation and usually cannot create other schemas. For features requiring staging datasets you should either grant schema creation rights to the DB user or point the staging dataset (named `<dataset_name>_staging`) at an existing schema. The staging schema must be pre-created the same way as above. See [staging dataset documentation](../staging.md#staging-dataset) for more details.
* `dev_mode` and dropping a dataset do not drop the Oracle schema (that would require `DROP USER`, a DBA privilege). dlt drops the tables inside the schema instead and leaves the schema (user) in place.


### Adapting destination for a dialect
Expand Down Expand Up @@ -424,14 +430,17 @@ register_dialect_capabilities("my_dialect", MyDialectCapabilities)

After registration, any pipeline using a `my_dialect://` connection URL will automatically use the custom capabilities. No additional configuration is needed.

The `DialectCapabilities` class supports four extension points:
The `DialectCapabilities` class supports these extension points:

| Method | Description |
| --- | --- |
| `adjust_capabilities` | Modify destination capabilities (identifier lengths, timestamp precision, sqlglot dialect, etc.) |
| `type_mapper_class` | Return a custom `DataTypeMapper` subclass for the dialect |
| `adapt_table` | Modify `sa.Table` objects before they are created or used for loading (e.g. reorder columns for StarRocks) |
| `is_undefined_relation` | Classify exceptions as "table/schema not found" errors for the dialect |
| `dataset_exists` | Decide whether a dataset (schema) exists from the schema names reported by the database (e.g. case-insensitive match on Oracle) |
| `create_dataset` | Create the dataset (schema), or skip/raise for dialects without a bare `CREATE SCHEMA` (e.g. Oracle) |
| `drop_dataset` | Drop the dataset (schema), or drop the tables within it for dialects that cannot drop schemas (e.g. Oracle) |

:::tip
Passing `type_mapper=` directly to `dlt.destinations.sqlalchemy()` always takes precedence over the registered dialect capabilities. Use direct passing for one-off overrides and registration for reusable dialect support.
Expand Down
105 changes: 104 additions & 1 deletion tests/load/sqlalchemy/test_sqlalchemy_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

from typing import Optional, Set, Type
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest

Expand Down Expand Up @@ -211,6 +211,109 @@ def test_oracle_dialect_caps() -> None:
assert dc.is_undefined_relation(Exception("syntax error")) is None


def test_base_dataset_exists_is_case_sensitive() -> None:
"""The generic dataset_exists matches schema names exactly."""
dc = DialectCapabilities()
assert dc.dataset_exists(["my_schema", "other"], "my_schema") is True
# case must match for the generic implementation
assert dc.dataset_exists(["MY_SCHEMA"], "my_schema") is False
assert dc.dataset_exists(["my_schema"], "missing") is False


def test_oracle_dataset_exists_is_case_insensitive() -> None:
"""Oracle folds unquoted identifiers to upper case, so existence is matched case-insensitively."""
dc = OracleDialectCapabilities("oracle")
# Oracle reports schema/user names upper-cased; a lower-case dataset_name still matches
assert dc.dataset_exists(["MY_SCHEMA", "OTHER"], "my_schema") is True
assert dc.dataset_exists(["MY_SCHEMA"], "MY_SCHEMA") is True
assert dc.dataset_exists(["MY_SCHEMA"], "missing") is False


def test_base_create_dataset_emits_create_schema() -> None:
"""The generic create_dataset issues a CREATE SCHEMA statement."""
dc = DialectCapabilities("postgresql")
client = MagicMock()
client.dataset_name = "my_schema"

dc.create_dataset(client)

assert client.execute_sql.call_count == 1
stmt = client.execute_sql.call_args[0][0]
assert isinstance(stmt, sa.schema.CreateSchema)


def test_base_drop_dataset_emits_drop_schema() -> None:
"""The generic drop_dataset issues a DROP SCHEMA statement."""
dc = DialectCapabilities("postgresql")
client = MagicMock()
client.dataset_name = "my_schema"

dc.drop_dataset(client)

assert client.execute_sql.call_count == 1
stmt = client.execute_sql.call_args[0][0]
assert isinstance(stmt, sa.schema.DropSchema)


def test_oracle_create_dataset_raises_when_schema_missing() -> None:
"""Oracle cannot CREATE SCHEMA; a missing schema raises a clear terminal error and emits no DDL."""
dc = OracleDialectCapabilities("oracle")
client = MagicMock()
client.dataset_name = "MY_SCHEMA"
client.has_dataset.return_value = False

with pytest.raises(DatabaseTerminalException, match="does not exist and cannot be"):
dc.create_dataset(client)
# no CREATE SCHEMA must be emitted
client.execute_sql.assert_not_called()


def test_oracle_create_dataset_noop_when_schema_exists() -> None:
"""When the Oracle schema (user) already exists, create_dataset is a no-op."""
dc = OracleDialectCapabilities("oracle")
client = MagicMock()
client.dataset_name = "MY_SCHEMA"
client.has_dataset.return_value = True

dc.create_dataset(client)

client.execute_sql.assert_not_called()


def test_oracle_drop_dataset_drops_tables_not_schema() -> None:
"""Oracle cannot DROP SCHEMA; drop_dataset drops the tables within the schema instead."""
dc = OracleDialectCapabilities("oracle")
client = MagicMock()
client.dataset_name = "MY_SCHEMA"

inspector = MagicMock()
inspector.get_table_names.return_value = ["t1", "t2"]

with patch.object(sa, "inspect", return_value=inspector):
dc.drop_dataset(client)

inspector.get_table_names.assert_called_once_with(schema="MY_SCHEMA")
client.drop_tables.assert_called_once_with("t1", "t2")
# no DROP SCHEMA must be emitted
client.execute_sql.assert_not_called()


def test_oracle_drop_dataset_noop_when_empty() -> None:
"""drop_dataset does nothing when the Oracle schema has no tables."""
dc = OracleDialectCapabilities("oracle")
client = MagicMock()
client.dataset_name = "MY_SCHEMA"

inspector = MagicMock()
inspector.get_table_names.return_value = []

with patch.object(sa, "inspect", return_value=inspector):
dc.drop_dataset(client)

client.drop_tables.assert_not_called()
client.execute_sql.assert_not_called()


def test_sqlglot_dialect_explicit_mappings() -> None:
"""SQLGLOT_DIALECTS maps backend names that differ from their sqlglot dialect."""
for backend, expected in [
Expand Down