Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions dlt/common/libs/narwhals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import narwhals
from narwhals.typing import IntoDataFrame

if TYPE_CHECKING:
from dlt.common.libs.pyarrow import pyarrow


def df_to_arrow(df: IntoDataFrame) -> pyarrow.Table:
Comment thread
zilto marked this conversation as resolved.
Outdated
"""Converts any narwhals-compatible eager or lazy frame to a pyarrow table.
lazy frames are eagerly collected.
"""
nw_df = narwhals.from_native(df, allow_series=False)
if isinstance(nw_df, narwhals.LazyFrame):
nw_df = nw_df.collect()

return nw_df.to_arrow()
36 changes: 16 additions & 20 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
DestinationCapabilitiesContext,
adjust_schema_to_capabilities,
)
from dlt.common.libs import is_pandas_frame, is_polars_frame
from dlt.common.libs import is_arrow_object
from dlt.common.libs.narwhals import df_to_arrow
from dlt.common.metrics import DataWriterMetrics
from dlt.common.runtime.collector import Collector, NULL_COLLECTOR
from dlt.common.typing import TDataItems, TDataItem, TLoaderFileFormat
Expand All @@ -35,19 +36,6 @@
from dlt.common.libs.pyarrow import pyarrow as pa, TAnyArrowItem


def _to_arrow_table(item: Any) -> Any:
"""Convert a pandas or polars frame to a pyarrow Table; pass arrow items through."""
if is_pandas_frame(item):
from dlt.common.libs.pandas import pandas_to_arrow

return pandas_to_arrow(item)
if is_polars_frame(item):
from dlt.common.libs.polars import polars_to_arrow
Comment on lines -44 to -45
Copy link
Copy Markdown

@FBruzzesi FBruzzesi May 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice that unlike is_polars_frame, narwhals.dependencies.is_into_dataframe returns False for a polars LazyFrame (and we don't have a lazy equivalent of is_into_dataframe - we can consider adding it to be honest, @MarcoGorelli 👀)

Copy link
Copy Markdown
Collaborator Author

@zilto zilto May 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks to you and Marco for the reviews! It's saving us a lot of time actually.

@FBruzzesi it would be nice to have a type guard to that catches LazyFrame. Though, it doesn't block this PR. This polars support hasn't reach master branch and isn't released yet

@rudolfix IMO, the "eager arrow code path" shouldn't support lazyframes and raise "Received LazyFrame, call .collect() inside your @dlt.resource". Currently, we're hiding an expensive operation for minimal convenience. AFAIK, we're not returning LazyFrame objects back for downstream @dlt.transformer.

Lazy objects should be supported via the "lazy model code path" where we have Ibis expressions, SQLGlot, etc. for now.

Hopefully, we can unify both by implementing load package preparations and incremental logic via Narwhals

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think calling collect on behalf of your users would only be justified if you were doing a lot of operations lazily before reaching collect which would be difficult for the user to do outside of dlt

instead of calling collect first thing, a clear error message like your one looks like a nice solution 👍


return polars_to_arrow(item)
return item


class MaterializedEmptyList(List[Any]):
"""A list variant that will materialize tables even if empty list was yielded"""

Expand Down Expand Up @@ -384,14 +372,22 @@ def _retrieve_normalize_config(self) -> ItemsNormalizerConfiguration:
)

def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> None:
static_table_name = self._get_static_table_name(resource, meta)

items_list = items if isinstance(items, list) else [items]
items = []
for item in items_list:
if not is_arrow_object(item):
try:
item = df_to_arrow(item)
except TypeError:
raise TypeError(
f"Received unsupported type `{type(item)}`. Not supported by pyarrow nor"
" narwhals."
)

items.append(self._apply_contract_filters(item, resource, static_table_name))

static_table_name = self._get_static_table_name(resource, meta)
items = [
# 2. remove columns and rows in data contract filters
self._apply_contract_filters(_to_arrow_table(item), resource, static_table_name)
for item in items_list
]
super().write_items(resource, items, meta)

def _write_to_static_table(
Expand Down
5 changes: 3 additions & 2 deletions dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from dlt.common import logger
from dlt.common.data_types.typing import TDataType
from dlt.common.exceptions import ValueErrorWithKnownValues
from dlt.common.libs import is_arrow_object, is_pandas_frame, is_polars_frame
from dlt.common.libs import is_arrow_object
from dlt.common.libs.narwhals import narwhals
from dlt.common.jsonpath import compile_path, extract_simple_field_name
from dlt.common.typing import (
TDataItem,
Expand Down Expand Up @@ -644,7 +645,7 @@ def _get_transform(self, items: TDataItems) -> IncrementalTransform:
"""Gets transform implementation that handles particular data item type"""
# assume list is all of the same type
for item in items if isinstance(items, list) else [items]:
if is_arrow_object(item) or is_pandas_frame(item) or is_polars_frame(item):
if is_arrow_object(item) or narwhals.dependencies.is_into_dataframe(item):
return self._make_or_get_transformer(ArrowIncremental)
return self._make_or_get_transformer(JsonIncremental)
return self._make_or_get_transformer(JsonIncremental)
Expand Down
16 changes: 3 additions & 13 deletions dlt/extract/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,8 @@
from functools import wraps

from dlt.common.data_writers import TDataItemFormat
from dlt.common.libs import (
get_pandas_module,
get_polars_module,
get_pyarrow_module,
get_pydantic_module,
is_arrow_object,
is_pandas_frame,
is_polars_frame,
)
from dlt.common.libs import get_pydantic_module, is_arrow_object
from dlt.common.libs.narwhals import narwhals
from dlt.common.reflection.inspect import isgeneratorfunction
from dlt.common.schema.typing import TAnySchemaColumns, TTableSchemaColumns
from dlt.common.schema.utils import normalize_schema_name
Expand Down Expand Up @@ -63,14 +56,11 @@ def get_data_item_format(items: TDataItems) -> TDataItemFormat:
if isinstance(items, Relation):
return "model"

if get_pyarrow_module() is None and get_pandas_module() is None and get_polars_module() is None:
return "object"

# Assume all items in list are the same type
try:
if isinstance(items, list):
items = items[0]
if is_arrow_object(items) or is_pandas_frame(items) or is_polars_frame(items):
if is_arrow_object(items) or narwhals.dependencies.is_into_dataframe(items):
return "arrow"
except IndexError:
pass
Expand Down
5 changes: 3 additions & 2 deletions dlt/extract/wrappers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any

from dlt.common.libs import is_arrow_object, is_pandas_frame, is_polars_frame
from dlt.common.libs import is_arrow_object
from dlt.common.libs.narwhals import narwhals


def wrap_additional_type(data: Any) -> Any:
Expand All @@ -9,7 +10,7 @@ def wrap_additional_type(data: Any) -> Any:
if data is None:
return data

if is_arrow_object(data) or is_pandas_frame(data) or is_polars_frame(data):
if is_arrow_object(data) or narwhals.dependencies.is_into_dataframe(data):
return [data]

return data
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ dependencies = [
"sqlglot>=25.4.0",
"pywin32>=306 ; sys_platform == 'win32'",
"rich-argparse>=1.6.0",
"narwhals>=2.20.0",
]

[project.optional-dependencies]
Expand Down
64 changes: 8 additions & 56 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading