Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ TEST_COMMON_CORE_PATHS = \
tests/load/test_dummy_client.py \
tests/extract/test_extract.py \
tests/extract/test_sources.py \
tests/pipeline/test_pipeline_state.py
tests/pipeline/test_pipeline_state.py \
--ignore tests/normalize/test_normalize_arrow.py

test-common-core:
$(call RUN_XDIST_SAFE_SPLIT,$(TEST_COMMON_CORE_PATHS))
Expand Down Expand Up @@ -257,7 +258,7 @@ test-pipeline-min:
install-pipeline-arrow:
uv sync $(UV_SYNC_ARGS) --extra duckdb --extra cli --extra parquet

TEST_PIPELINE_ARROW_PATHS = tests/pipeline/test_pipeline_extra.py
TEST_PIPELINE_ARROW_PATHS = tests/pipeline/test_pipeline_extra.py tests/normalize/test_normalize_arrow.py

test-pipeline-arrow: PYTEST_TARGET_ARGS = -k arrow
test-pipeline-arrow:
Expand Down
2 changes: 2 additions & 0 deletions dlt/common/destination/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ def update_stored_schema(
self,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
force: bool = False,
) -> Optional[TSchemaTables]:
"""Updates storage to the current schema.

Expand All @@ -605,6 +606,7 @@ def update_stored_schema(
Args:
only_tables (Sequence[str], optional): Updates only listed tables. Defaults to None.
expected_update (TSchemaTables, optional): Update that is expected to be applied to the destination
force (bool): force full schema migration regardless of previous updates
Returns:
Optional[TSchemaTables]: Returns an update that was applied at the destination.
"""
Expand Down
2 changes: 2 additions & 0 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
TColumnNames,
TypedDict,
get_args,
NotRequired,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -367,6 +368,7 @@ class _TTableSchemaBase(TTableProcessingHints, total=False):
resource: Optional[str]
table_format: Optional[TTableFormat]
file_format: Optional[TFileFormat]
variant_name: NotRequired[str]


class TTableSchema(_TTableSchemaBase, total=False):
Expand Down
13 changes: 9 additions & 4 deletions dlt/common/storages/data_item_storage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Any, List
from typing import Dict, Any, List, Tuple
from abc import ABC, abstractmethod

from dlt.common import logger
Expand All @@ -22,9 +22,7 @@ def __init__(self, writer_spec: FileWriterSpec, *args: Any) -> None:
def _get_writer(
self, load_id: str, schema_name: str, table_name: str
) -> BufferedDataWriter[DataWriter]:
# unique writer id
writer_id = f"{load_id}.{schema_name}.{table_name}"
writer = self.buffered_writers.get(writer_id, None)
writer_id, writer = self.get_active_writer(load_id, schema_name, table_name)
if not writer:
# assign a writer for each table
kwargs = {}
Expand All @@ -35,6 +33,13 @@ def _get_writer(
self.buffered_writers[writer_id] = writer
return writer

def get_active_writer(
    self, load_id: str, schema_name: str, table_name: str
) -> Tuple[str, BufferedDataWriter[DataWriter]]:
    """Looks up the buffered writer registered for a table of a schema in a load package.

    Args:
        load_id (str): Id of the load package.
        schema_name (str): Name of the schema the table belongs to.
        table_name (str): Name of the table.

    Returns:
        Tuple of the writer id (`load_id.schema_name.table_name`) and the writer found under
        that id in `buffered_writers`. The writer is None when nothing is stored under the id.
    """
    # the id is built from load id, schema name and table name
    writer_id = f"{load_id}.{schema_name}.{table_name}"
    active_writer = self.buffered_writers.get(writer_id)
    return writer_id, active_writer

def write_data_item(
self,
load_id: str,
Expand Down
32 changes: 32 additions & 0 deletions dlt/common/storages/load_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,38 @@ def remove_completed_jobs(self, load_id: str) -> None:
recursively=True,
)

def is_empty_package(self, load_id: str) -> bool:
    """Tells if load package `load_id` has nothing to load.

    A package is empty when it contains no new jobs and its package state lists no refresh
    commands (tables to drop / truncate). A package for which the applied schema update file
    was already written is being processed and is never considered empty.
    """
    applied_marker = os.path.join(
        self.get_package_path(load_id), PackageStorage.APPLIED_SCHEMA_UPDATES_FILE_NAME
    )
    # applied schema update present: package is already being processed
    if self.storage.has_file(applied_marker):
        return False

    state = self.get_load_package_state(load_id)
    tables_to_drop = state.get("dropped_tables", [])
    tables_to_truncate = state.get("truncated_tables", [])
    # any refresh command makes the package non-empty, jobs do not need to be listed
    if len(tables_to_drop) > 0 or len(tables_to_truncate) > 0:
        return False
    return len(self.list_new_jobs(load_id)) == 0

def get_schema_update_file(self, load_id: str) -> Optional[TSchemaTables]:
    """Reads the schema update file from load package `load_id` and returns its content.

    Returns:
        Optional[TSchemaTables]: Parsed content of the update file, None if the file is not
        present in the package (update already processed).

    Raises:
        FileNotFoundError: When the package folder does not exist.
    """
    package_path = self.get_package_path(load_id)
    if not self.storage.has_folder(package_path):
        raise FileNotFoundError(package_path)

    update_file_path = os.path.join(package_path, PackageStorage.SCHEMA_UPDATES_FILE_NAME)
    # no update file: nothing left to apply
    if not self.storage.has_file(update_file_path):
        return None

    pending_update: TSchemaTables = json.loads(self.storage.load(update_file_path))
    return pending_update

def delete_package(self, load_id: str, not_exists_ok: bool = False) -> None:
package_path = self.get_package_path(load_id)
if not self.storage.has_folder(package_path):
Expand Down
13 changes: 1 addition & 12 deletions dlt/common/storages/load_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,7 @@ def list_failed_jobs_in_loaded_package(self, load_id: str) -> Sequence[LoadJobIn
return self.loaded_packages.list_failed_jobs_infos(load_id)

def begin_schema_update(self, load_id: str) -> Optional[TSchemaTables]:
    """Reads the update file from normalized load package `load_id` and returns its content.

    The lookup is delegated to `get_schema_update_file` of the normalized packages storage
    so the file handling lives in one place.

    Returns:
        Optional[TSchemaTables]: Content of the update file, None if the update file is
        already processed (deleted in commit_schema_update).

    Raises:
        FileNotFoundError: Presumably propagated from the package storage when the package
            folder does not exist; confirm against `get_schema_update_file`.
    """
    return self.normalized_packages.get_schema_update_file(load_id)

def commit_schema_update(self, load_id: str, applied_update: TSchemaTables) -> None:
"""Marks schema update as processed by removing schema update file and saving the applied
Expand Down
5 changes: 4 additions & 1 deletion dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,11 @@ def update_stored_schema(
self,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
force: bool = False,
) -> Optional[TSchemaTables]:
applied_update = super().update_stored_schema(only_tables, expected_update=expected_update)
applied_update = super().update_stored_schema(
only_tables, expected_update=expected_update, force=force
)
# here we could apply tags only if any migration happened, right now we do it on each run
# NOTE: tags are applied before any data is loaded
if (
Expand Down
3 changes: 2 additions & 1 deletion dlt/destinations/impl/destination/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ def update_stored_schema(
self,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
force: bool = False,
) -> Optional[TSchemaTables]:
return super().update_stored_schema(only_tables, expected_update)
return super().update_stored_schema(only_tables, expected_update, force)

def create_load_job(
self, table: PreparedTableSchema, file_path: str, load_id: str, restore: bool = False
Expand Down
7 changes: 5 additions & 2 deletions dlt/destinations/impl/duckdb/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,9 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:
``UNION ALL BY NAME``.
"""
existing_tables = set(tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall())

# TODO: existing table schemas and sql statements can be cached so we do not have to recompute everything
# with every query
tables_with_data: set[str] = set()
for s in self.schemas.values():
tables_with_data.update(s.dlt_table_names())
Expand Down Expand Up @@ -691,8 +694,8 @@ def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DB
if not table.this:
continue
schema = table.db
# add only tables from the dataset schema
if schema or schema.lower() != self.dataset_name.lower():
# add only tables that have no schema prefix or whose schema prefix is the actual dataset
if not schema or schema.lower() == self.dataset_name.lower():
load_tables[table.name] = table.name

if load_tables:
Expand Down
3 changes: 2 additions & 1 deletion dlt/destinations/impl/dummy/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ def update_stored_schema(
self,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
force: bool = False,
) -> Optional[TSchemaTables]:
applied_update = super().update_stored_schema(only_tables, expected_update)
applied_update = super().update_stored_schema(only_tables, expected_update, force)
if self.config.fail_schema_update:
raise DestinationTransientException(
"Raise on schema update due to `fail_schema_update` config flag"
Expand Down
15 changes: 12 additions & 3 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,25 +823,34 @@ def update_stored_schema(
self,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
force: bool = False,
) -> TSchemaTables:
applied_update = super().update_stored_schema(only_tables, expected_update)
applied_update = super().update_stored_schema(only_tables, expected_update, force)

# don't store schema when used as staging
if not self.config.as_staging_destination:
# check if schema with hash exists
current_hash = self.schema.stored_version_hash
if not self._get_stored_schema_by_hash_or_newest(current_hash):
stored = self._get_stored_schema_by_hash_or_newest(current_hash)
if not stored or force:
logger.info(
f"Schema with hash {self.schema.stored_version_hash} not found in the storage."
" upgrading"
if not stored
else (
f"Schema with hash {self.schema.stored_version_hash} found in storage but"
" update is enforced (tables to truncate/drop), ensuring table dirs"
)
)
# create destination dirs for all tables
# TODO: find only tables with changes
table_names = only_tables or self.schema.tables.keys()
dirs_to_create = self.get_table_dirs(table_names)
for _, directory in zip(table_names, dirs_to_create):
self.fs_client.makedirs(directory, exist_ok=True)
self._update_schema_in_storage(self.schema)
# do not write a duplicate schema file when the hash is already stored
if not stored:
self._update_schema_in_storage(self.schema)

# we assume that expected_update == applied_update so table schemas in dest were not
# externally changed
Expand Down
8 changes: 8 additions & 0 deletions dlt/destinations/impl/lance/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ class LanceClientConfiguration(WithLocalFiles, DestinationClientDwhConfiguration
destination_type: Final[str] = dataclasses.field( # type: ignore
default="lance", init=False, repr=False, compare=False
)
# dataset_name is optional: when not set, tables are created in the root namespace
dataset_name: Final[Optional[str]] = dataclasses.field( # type: ignore
default=None, init=False, repr=False, compare=False
)
catalog_type: LanceCatalogType = "dir"

CATALOG_CREDENTIALS: ClassVar[Dict[LanceCatalogType, Any]] = {
Expand All @@ -293,6 +297,10 @@ class LanceClientConfiguration(WithLocalFiles, DestinationClientDwhConfiguration
"""Name of branch to use for read/write table operations. Uses `main` branch if not set."""
embeddings: Optional[LanceEmbeddingsConfiguration] = None
"""Optional embeddings configuration to add a vector embedding column."""
always_refresh_views: bool = False
"""Recreate the duckdb scanner views on each `dataset()` read. New rows are visible without
this (lance reads the latest dataset version on each scan); enable it to also pick up schema
changes (new columns) through an already-open connection."""

@property
def storage_options(self) -> Optional[Dict[str, str]]:
Expand Down
4 changes: 4 additions & 0 deletions dlt/destinations/impl/lance/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, List

from dlt.common.destination.exceptions import (
DestinationException,
DestinationUndefinedEntity,
DestinationTerminalException,
DestinationTransientException,
Expand Down Expand Up @@ -48,6 +49,9 @@ def raise_destination_error(f: TFun) -> TFun:
def _wrap(self: JobClientBase, *args: Any, **kwargs: Any) -> Any:
try:
return f(self, *args, **kwargs)
except DestinationException:
# already converted (eg. raised by a nested decorated call)
raise
except Exception as e:
if is_lance_undefined_entity_exception(e):
raise DestinationUndefinedEntity(e) from e
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/lance/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = False

caps.decimal_precision = (38, 18)
caps.wei_precision = (38, 0)
caps.timestamp_precision = 6
caps.supported_replace_strategies = ["truncate-and-insert"]

Expand Down
Loading
Loading