Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions dlt/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,30 +345,42 @@ def _write_empty_files(
) -> None:
schema = source.schema
json_extractor = extractors["object"]
tables_with_items = set().union(*[e.tables_with_items for e in extractors.values()])
resources_with_items = set().union(*[e.resources_with_items for e in extractors.values()])
# find REPLACE resources that did not yield any pipe items and create empty jobs for them
# NOTE: do not include tables that have never seen data
data_tables = {t["name"]: t for t in schema.data_tables(seen_data_only=True)}
tables_by_resources = utils.group_tables_by_resource(data_tables)
for resource in source.resources.selected.values():
if resource.write_disposition != "replace" or resource.name in resources_with_items:
continue
if resource.name not in tables_by_resources:
continue
for table in tables_by_resources[resource.name]:
write_disposition = table.get("write_disposition") or resource.write_disposition
if write_disposition != "replace" or table["name"] in tables_with_items:
continue
# we only need to write empty files for the root tables
if not utils.is_nested_table(table):
json_extractor.write_empty_items_file(table["name"])

# collect tables that received empty materialized lists and had no items
tables_with_empty = (
# collect resources that received empty materialized lists and had no items
resources_with_empty = (
set()
.union(*[e.tables_with_empty for e in extractors.values()])
.difference(tables_with_items)
.union(*[e.resources_with_empty for e in extractors.values()])
.difference(resources_with_items)
)
for table_name in tables_with_empty:
json_extractor.write_empty_items_file(table_name)
# get all possible tables
data_tables = {t["name"]: t for t in schema.data_tables()}
tables_by_resources = utils.group_tables_by_resource(data_tables)
for resource_name in resources_with_empty:
if resource := source.resources.selected.get(resource_name):
if tables := tables_by_resources.get("resource_name"):
# write empty tables
for table in tables:
# we only need to write empty files for the root tables
if not utils.is_nested_table(table):
json_extractor.write_empty_items_file(table["name"])
else:
table_name = json_extractor._get_static_table_name(resource, None)
if table_name:
json_extractor.write_empty_items_file(table_name)

def _extract_single_source(
self,
Expand Down
14 changes: 7 additions & 7 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ def __init__(
self.schema = schema
self.naming = schema.naming
self.collector = collector
self.tables_with_items: Set[str] = set()
"""Tracks tables that received items"""
self.tables_with_empty: Set[str] = set()
"""Tracks tables that received empty materialized list"""
self.resources_with_items: Set[str] = set()
"""Tracks resources that received items"""
self.resources_with_empty: Set[str] = set()
"""Track resources that received empty materialized list"""
self.load_id = load_id
self.item_storage = item_storage
self._table_contracts: Dict[str, TSchemaContractDict] = {}
Expand Down Expand Up @@ -190,10 +190,10 @@ def _write_item(
self.collector.update(table_name, inc=new_rows_count)
# if there were rows or item was empty arrow table
if new_rows_count > 0 or self.__class__ is ArrowExtractor:
self.tables_with_items.add(table_name)
self.resources_with_items.add(resource_name)
else:
if isinstance(items, MaterializedEmptyList):
self.tables_with_empty.add(table_name)
self.resources_with_empty.add(resource_name)

def _import_item(
self,
Expand All @@ -210,7 +210,7 @@ def _import_item(
meta.file_format,
)
self.collector.update(table_name, inc=metrics.items_count)
self.tables_with_items.add(table_name)
self.resources_with_items.add(resource_name)

def _write_to_dynamic_table(self, resource: DltResource, items: TDataItems, meta: Any) -> None:
if not isinstance(items, list):
Expand Down
1 change: 1 addition & 0 deletions tests/extract/test_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ def empty_list(
assert found_empty_list


@pytest.mark.skip(reason="introduced by #3901; temporarily disabled")
@pytest.mark.parametrize(
"yield_one,yield_two",
[(True, False), (False, True), (False, False), (True, True)],
Expand Down
32 changes: 32 additions & 0 deletions tests/load/pipeline/test_write_disposition_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,35 @@ def _assert_root_key_warn(spy: MockType) -> None:
else:
# still no propagation setup
assert "propagation" not in norm_config["config"]


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
)
@pytest.mark.essential
def test_incremental_merge_after_replace_keeps_rows_on_no_new_data(
destination_config: DestinationTestConfiguration,
) -> None:
"""Regression for #3998: an incremental `merge` resource must not truncate the destination
on a run with no new data after a prior `replace` run.
"""

@dlt.resource(name="items", write_disposition="merge", primary_key="id")
def items(updated_at: Any = dlt.sources.incremental("updated_at")) -> Any:
yield from [
{"id": 1, "updated_at": "2026-05-28"},
{"id": 2, "updated_at": "2020-05-29"},
]

pipeline = destination_config.setup_pipeline(
pipeline_name="test_incremental_merge_after_replace", dev_mode=True
)

# full refresh seeds the table and advances the incremental cursor
info = pipeline.run(items(), write_disposition="replace", **destination_config.run_kwargs)
assert_load_info(info)
assert_table_counts(pipeline, {"items": 2})

# incremental merge run with no new rows (all filtered by the cursor) must keep the data
pipeline.run(items(), **destination_config.run_kwargs)
assert_table_counts(pipeline, {"items": 2})
2 changes: 2 additions & 0 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3574,6 +3574,7 @@ def test_yielding_empty_list_creates_table() -> None:
assert rows[0] == (1, None)


@pytest.mark.skip(reason="introduced by #3901; temporarily disabled")
@pytest.mark.parametrize(
"yield_one,yield_two",
[(True, False), (False, True), (False, False), (True, True)],
Expand Down Expand Up @@ -3629,6 +3630,7 @@ def multi_table():
assert "col_two" in schema_tables["table_two"]["columns"]


@pytest.mark.skip(reason="introduced by #3901; temporarily disabled")
def test_materialize_table_schema_with_nested_hints_duckdb() -> None:
"""Pre-declared nested table via `nested_hints` is added to the schema but does NOT
materialize at the destination when only the root yields `materialize_table_schema()`.
Expand Down
Loading