Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion dlt/common/destination/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,38 @@ class DestinationClientConfiguration(BaseConfiguration):

__recommended_sections__: ClassVar[Sequence[str]] = (known_sections.DESTINATION, "")

def physical_location(self) -> str:
"""Returns a non-secret physical location identity, or "" when unavailable."""
return ""

# TODO: If we ever clean up fingerprinting across all destinations, consider making
# the default `digest128(self.physical_location())`. This will break telemetry
# semantics, so it must be a deliberate cutover.
def fingerprint(self) -> str:
Comment thread
rudolfix marked this conversation as resolved.
"""Returns a destination fingerprint which is a hash of selected configuration fields. ie. host in case of connection string"""
"""Returns a destination fingerprint derived from selected configuration fields."""
return ""

def can_read_from(self, other: "DestinationClientConfiguration") -> bool:
"""Returns True if `self` can read data from `other`.
In case of SQL engines it is an ability to SELECT / JOIN
"""
if not isinstance(other, DestinationClientConfiguration):
return False
if self.destination_type != other.destination_type:
return False
self_loc = self.physical_location()
other_loc = other.physical_location()
if self_loc and other_loc and self_loc == other_loc:
return True
return False

def can_write_from(self, other: "DestinationClientConfiguration") -> bool:
"""Returns true if `self` can write data from `other`
In case of SQL engines it is an ability to INSERT FROM
"""
# in most destinations, ability to read is also the same as abilty to write
return self.can_read_from(other)

def __str__(self) -> str:
"""Return displayable destination location"""
return str(self.credentials)
Expand Down
14 changes: 8 additions & 6 deletions dlt/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from dlt.common.destination.client import JobClientBase, SupportsOpenTables, WithStateSync
from dlt.common.schema import Schema
from dlt.common.typing import Self
from dlt.common.warnings import Dlt100DeprecationWarning, deprecated
from dlt.common.schema.typing import (
C_DLT_LOAD_ID,
C_DLT_LOADS_TABLE_LOAD_ID,
Expand Down Expand Up @@ -497,13 +498,14 @@ def get_dataset_sql_client(dataset: dlt.Dataset) -> SqlClientBase[Any]:
raise SqlClientNotAvailable("dataset", dataset.dataset_name, client.config.destination_type)


@deprecated(
"Use `destination_client.config.can_join_with(other.destination_client.config)` instead.",
category=Dlt100DeprecationWarning,
stacklevel=2,
)
def is_same_physical_destination(dataset1: dlt.Dataset, dataset2: dlt.Dataset) -> bool:
Comment thread
Travior marked this conversation as resolved.
"""Check if both datasets are at the same physical destination.

This is done by comparing the fingerprint of both destination configs. There
are potential false positive if two different config give access to the same destination.
"""
return str(dataset1.destination_client.config) == str(dataset2.destination_client.config)
"""Check if both datasets are at the same physical destination."""
return dataset1.destination_client.config.can_read_from(dataset2.destination_client.config)


def _get_dataset_schema_from_destination_using_schema_name(
Expand Down
20 changes: 20 additions & 0 deletions dlt/destinations/impl/athena/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dlt.common.configuration import configspec
from dlt.common.destination.client import DestinationClientDwhWithStagingConfiguration
from dlt.common.configuration.specs import AwsCredentials
from dlt.common.utils import digest128
from dlt.destinations.impl.athena.utils import is_s3_tables_catalog


Expand Down Expand Up @@ -60,6 +61,25 @@ def to_connector_params(self, use_catalog_name: bool = True) -> Dict[str, Any]:
def _is_s3_tables_catalog(self) -> bool:
return is_s3_tables_catalog(self.aws_data_catalog)

def physical_location(self) -> str:
"""Returns region/catalog, or "" when region is unavailable."""
# athena catalog names are case-insensitive, AWS docs spell the default `AwsDataCatalog`
catalog = (self.aws_data_catalog or DEFAULT_AWS_DATA_CATALOG).lower()
region = None
if self.credentials:
region = self.credentials.region_name

if region:
return f"{region}/{catalog}"
return ""

def fingerprint(self) -> str:
"""Returns a fingerprint of the physical Athena location."""
physical_location = self.physical_location()
if physical_location:
return digest128(physical_location)
return ""

def __str__(self) -> str:
"""Return displayable destination location"""
if self.staging_config:
Expand Down
18 changes: 13 additions & 5 deletions dlt/destinations/impl/bigquery/configuration.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import dataclasses
from typing import ClassVar, List, Final, Optional, Union
from typing import ClassVar, List, Optional, Union

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import GcpServiceAccountCredentials, GcpOAuthCredentials
from dlt.common.utils import digest128

from dlt.common.destination.client import DestinationClientDwhWithStagingConfiguration
from dlt.common.utils import digest128


@configspec
class BigQueryClientConfiguration(DestinationClientDwhWithStagingConfiguration):
destination_type: Final[str] = dataclasses.field(default="bigquery", init=False, repr=False, compare=False) # type: ignore
destination_type: str = dataclasses.field(
default="bigquery", init=False, repr=False, compare=False
)
credentials: Union[GcpServiceAccountCredentials, GcpOAuthCredentials] = None
location: str = "US"
project_id: Optional[str] = None
Expand Down Expand Up @@ -39,7 +40,14 @@ def get_location(self) -> str:
return self.location

def fingerprint(self) -> str:
"""Returns a fingerprint of project_id"""
"""Returns a fingerprint of the credentials project id."""
if self.credentials and self.credentials.project_id:
return digest128(self.credentials.project_id)
return ""

def physical_location(self) -> str:
"""Returns configured project id, falling back to credentials."""
project_id = self.project_id
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked again and bigquery can join based on location. so here just return self.location.
https://docs.cloud.google.com/bigquery/docs/locations#specify_locations

so now it looks like self.location() is just right! and we can adopt this name everywhere like in the original ticket.

if not project_id and self.credentials:
project_id = self.credentials.project_id
return project_id or ""
8 changes: 7 additions & 1 deletion dlt/destinations/impl/clickhouse/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,13 @@ class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration
]

def fingerprint(self) -> str:
"""Returns a fingerprint of the host part of a connection string."""
"""Returns a fingerprint of the configured host."""
if self.credentials and self.credentials.host:
return digest128(self.credentials.host)
return ""

def physical_location(self) -> str:
"""Returns host:port."""
if self.credentials and self.credentials.host:
return f"{self.credentials.host}:{self.credentials.port}"
return ""
15 changes: 11 additions & 4 deletions dlt/destinations/impl/databricks/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import dataclasses
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, Final, List, Optional, Union, cast
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, List, Optional, Union, cast
from urllib.parse import urlparse

from dlt.common import logger
Expand All @@ -20,7 +20,6 @@
if TYPE_CHECKING:
from zerobus import ArrowStreamConfigurationOptions, IPCCompression


DATABRICKS_APPLICATION_ID = "dltHub_dlt"
DEFAULT_DATABRICKS_INSERT_API: TDatabricksInsertApi = "copy_into"
# ZSTD was fastest in my benchmarks out of the three `ipc_compression` options
Expand Down Expand Up @@ -237,7 +236,9 @@ def _coerce_ipc_compression(ipc_compression: Union[str, IPCCompression]) -> IPCC

@configspec
class DatabricksClientConfiguration(DestinationClientDwhWithStagingConfiguration):
destination_type: Final[str] = dataclasses.field(default="databricks", init=False, repr=False, compare=False) # type: ignore[misc]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this "Final" makes sense even with mypy complaining. you removed it in several other places.

destination_type: str = dataclasses.field(
default="databricks", init=False, repr=False, compare=False
)
credentials: DatabricksCredentials = None
staging_credentials_name: Optional[str] = None
"If set, credentials with given name will be used in copy command"
Expand Down Expand Up @@ -287,7 +288,13 @@ def on_resolved(self) -> None:
)

def fingerprint(self) -> str:
"""Returns a fingerprint of host part of a connection string"""
"""Returns a fingerprint of the server hostname."""
if self.credentials and self.credentials.server_hostname:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could reuse physical_location

return digest128(self.credentials.server_hostname)
return ""

def physical_location(self) -> str:
"""Returns the server hostname."""
if self.credentials and self.credentials.server_hostname:
return self.credentials.server_hostname
return ""
14 changes: 11 additions & 3 deletions dlt/destinations/impl/dremio/configuration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import dataclasses
from typing import Final, Optional, Any, Dict, ClassVar, List
from typing import Optional, Any, Dict, ClassVar, List

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import ConnectionStringCredentials
Expand Down Expand Up @@ -32,13 +32,21 @@ def db_kwargs(self) -> Dict[str, Any]:

@configspec
class DremioClientConfiguration(DestinationClientDwhWithStagingConfiguration):
destination_type: Final[str] = dataclasses.field(default="dremio", init=False, repr=False, compare=False) # type: ignore[misc]
destination_type: str = dataclasses.field(
default="dremio", init=False, repr=False, compare=False
)
credentials: DremioCredentials = None
staging_data_source: str = None
"""The name of the staging data source"""

def fingerprint(self) -> str:
"""Returns a fingerprint of host part of a connection string"""
"""Returns a fingerprint of the configured host."""
if self.credentials and self.credentials.host:
return digest128(self.credentials.host)
return ""

def physical_location(self) -> str:
"""Returns host:port."""
if self.credentials and self.credentials.host:
return f"{self.credentials.host}:{self.credentials.port}"
return ""
6 changes: 6 additions & 0 deletions dlt/destinations/impl/duckdb/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,11 @@ def __init__(
)
self.create_indexes = create_indexes

def physical_location(self) -> str:
"""Returns the database file path or ':memory:'."""
if self.credentials and self.credentials.database:
Comment thread
rudolfix marked this conversation as resolved.
return self.credentials.database
return ""

def on_resolved(self) -> None:
self.credentials.database = self.make_location(self.credentials.database, DUCK_DB_NAME_PAT)
38 changes: 35 additions & 3 deletions dlt/destinations/impl/ducklake/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
FilesystemConfigurationWithLocalFiles,
WithLocalFiles,
)
from dlt.common.utils import digest128
from dlt.destinations.impl.duckdb.configuration import DuckDbConnectionPool, DuckDbBaseCredentials
from dlt.destinations.impl.duckdb.factory import _set_duckdb_raw_capabilities

Expand Down Expand Up @@ -144,11 +143,44 @@ class DuckLakeClientConfiguration(WithLocalFiles, DestinationClientDwhWithStagin
"""When true, attaches with `AUTOMATIC_MIGRATION true` so DuckDB migrates an older DuckLake catalog schema on attach."""

def fingerprint(self) -> str:
"""Use fingerprint of underlying storage. This is precise to bucket level"""
if self.credentials.storage is None:
"""Returns a fingerprint of the underlying storage."""
if not self.credentials or self.credentials.storage is None:
return ""
return self.credentials.storage.fingerprint()

def physical_location(self) -> str:
"""Returns credential-free catalog identity which locates the ducklake."""
if not self.credentials or not self.credentials.catalog:
return ""

catalog = self.credentials.catalog
drivername = catalog.drivername or ""
# attach statement converts `postgresql` to duckdb-known `postgres`
if drivername == "postgresql":
drivername = "postgres"

# TODO: motherduck catalog has no non-secret account identity
if drivername == "md":
return ""

# file catalogs: the database file is the lake, attach name is just an alias
if drivername in ("duckdb", "sqlite"):
if catalog.database:
return f"{drivername}://{catalog.database}"
return ""

# sql catalogs host one lake per metadata schema which defaults to ducklake name
if catalog.host and catalog.database:
metadata_schema = (
self.credentials.metadata_schema
or self.credentials.ducklake_name
or DEFAULT_DUCKLAKE_NAME
)
# NOTE: ports must be specified (or not) consistently across configs to match
port_str = f":{catalog.port}" if catalog.port else ""
return f"{drivername}://{catalog.host}{port_str}/{catalog.database}#{metadata_schema}"
return ""

def on_resolved(self) -> None:
# redirect local catalog database file to `local_dir`
if self.credentials.catalog.drivername in ("duckdb", "sqlite"):
Expand Down
15 changes: 15 additions & 0 deletions dlt/destinations/impl/fabric/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dlt.common.configuration.specs import AzureServicePrincipalCredentials
from dlt.common.destination.client import DestinationClientDwhWithStagingConfiguration
from dlt.common.exceptions import MissingDependencyException
from dlt.common.utils import digest128
from dlt import version

_AZURE_STORAGE_EXTRA = f"{version.DLT_PKG_NAME}[az]"
Expand Down Expand Up @@ -165,5 +166,19 @@ class FabricClientConfiguration(DestinationClientDwhWithStagingConfiguration):
Both have UTF-8 encoding. LongAsMax=yes is automatically configured.
"""

def physical_location(self) -> str:
"""Returns host:port."""
if self.credentials and self.credentials.host:
port = self.credentials.port or 1433
return f"{self.credentials.host}:{port}"
return ""

def fingerprint(self) -> str:
"""Returns a fingerprint of the physical Fabric location."""
physical_location = self.physical_location()
if physical_location:
return digest128(physical_location)
return ""


__all__ = ["FabricCredentials", "FabricClientConfiguration"]
37 changes: 34 additions & 3 deletions dlt/destinations/impl/filesystem/configuration.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import dataclasses

import os
from typing import Dict, Final, Optional, Type
from typing import Dict, Optional, Type
from urllib.parse import urlparse

from dlt.common.typing import DictStrAny, DictStrOptionalStr

Expand All @@ -10,6 +11,7 @@
from dlt.common.configuration.specs.hf_credentials import HfCredentials
from dlt.common.destination.client import (
CredentialsConfiguration,
DestinationClientConfiguration,
DestinationClientStagingConfiguration,
)
from dlt.common.storages import FilesystemConfigurationWithLocalFiles
Expand All @@ -19,8 +21,10 @@


@configspec
class FilesystemDestinationClientConfiguration(FilesystemConfigurationWithLocalFiles, DestinationClientStagingConfiguration): # type: ignore[misc]
destination_type: Final[str] = dataclasses.field( # type: ignore[misc]
class FilesystemDestinationClientConfiguration( # type: ignore[misc]
FilesystemConfigurationWithLocalFiles, DestinationClientStagingConfiguration
):
destination_type: str = dataclasses.field(
default="filesystem", init=False, repr=False, compare=False
)
current_datetime: Optional[TCurrentDateTime] = None
Expand All @@ -44,6 +48,33 @@ class FilesystemDestinationClientConfiguration(FilesystemConfigurationWithLocalF
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
return super().resolve_credentials_type()

def physical_location(self) -> str:
"""Returns scheme://netloc for remote filesystems, or the absolute local path."""
if not self.bucket_url:
return ""

if self.is_local_path(self.bucket_url):
return self.make_local_path(self.make_file_url(self.bucket_url))

url = urlparse(self.bucket_url)
if url.scheme == "file":
return self.make_local_path(self.bucket_url)
return f"{url.scheme}://{url.netloc}"

def can_write_from(self, other: DestinationClientConfiguration) -> bool:
"""Filesystem does not have an engine that can write. `dlt` is that engine,
and setting False here we enforce it's usage
"""
return False

def can_read_from(self, other: DestinationClientConfiguration) -> bool:
# filesystem tables are queried through a local engine (e.g. DuckDB) that
# can access multiple storage backends in a single query, so join
# compatibility is determined by the engine, not by the storage location.

# until auto ATTACH is implemented, storage location must be used
return super().can_read_from(other)

def on_resolved(self) -> None:
# Validate layout and show unused placeholders
_, layout_placeholders = check_layout(self.layout, self.extra_placeholders)
Expand Down
Loading
Loading