Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 68 additions & 29 deletions src/databricks/labs/dqx/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@
from databricks.labs.dqx.errors import InvalidCheckError, InvalidConfigError, InvalidParameterError
from databricks.labs.dqx.utils import list_tables, safe_strip_file_from_path, resolve_variables, VariableValue
from databricks.labs.dqx.io import is_one_time_trigger

from .semantic_validator import SemanticValidator, SemanticValidationMode
Comment thread
mwojtyczka marked this conversation as resolved.
Outdated
logger = logging.getLogger(__name__)


class DQEngineCore(DQEngineCoreBase):
"""Core engine to apply data quality checks to a DataFrame.

Expand Down Expand Up @@ -291,26 +290,47 @@ def apply_checks_by_metadata_and_split(
return good_df, bad_df

@staticmethod
def validate_checks(
    checks: list[dict],
    custom_check_functions: dict[str, Callable] | None = None,
    validate_custom_check_functions: bool = True,
    semantic_validation_mode: str | None = SemanticValidationMode.WARN,
) -> ChecksValidationStatus:
    """
    Validate checks defined as metadata to ensure they conform to the expected
    structure and types, and are semantically consistent as a ruleset.

    Structural validation checks for required keys, callable functions, and
    correct argument types. Semantic validation detects duplicate rules and
    similar rules with conflicting arguments (e.g. two is_in_range checks on
    the same column with different thresholds).

    Note:
        Rules using raw Spark SQL expressions are not deeply inspected during
        semantic validation — only structured metadata is compared.

    Args:
        checks: List of checks to apply to the DataFrame. Each check should be a dictionary.
        custom_check_functions: Optional dictionary with custom check functions
            (e.g., *globals()* of the calling module).
        validate_custom_check_functions: If True, validate custom check functions.
        semantic_validation_mode: Controls how semantic issues are surfaced.
            Use ``SemanticValidationMode.WARN`` (default) to log warnings,
            ``SemanticValidationMode.FAIL`` to raise on any issue, or
            ``None`` to skip semantic validation entirely.

    Returns:
        ChecksValidationStatus indicating the structural validation result.

    Raises:
        ValueError: If semantic_validation_mode is FAIL and issues are found.
    """
    # Structural validation always runs first; its status is what the caller receives.
    status = ChecksValidator.validate_checks(checks, custom_check_functions, validate_custom_check_functions)

    # The semantic pass is opt-out (None disables it). Its return value is not
    # folded into the structural status returned below.
    if semantic_validation_mode is not None:
        SemanticValidator.apply(checks, mode=semantic_validation_mode)

    return status

def get_invalid(self, df: DataFrame) -> DataFrame:
"""
Expand Down Expand Up @@ -1187,44 +1207,55 @@ def save_results_in_table(
)

@telemetry_logger("engine", "load_checks")
def load_checks(
    self,
    config: BaseChecksStorageConfig,
    variables: dict[str, VariableValue] | None = None,
    semantic_validation_mode: str | None = SemanticValidationMode.WARN,
) -> list[dict]:
    """Load DQ rules (checks) from the storage backend described by *config*.

    This method delegates to a storage handler selected by the factory
    based on the concrete type of *config* and returns the parsed list
    of checks (as dictionaries) ready for *apply_checks_by_metadata*.

    Supported storage configurations include, for example:
    - *FileChecksStorageConfig* (local file);
    - *WorkspaceFileChecksStorageConfig* (Databricks workspace file);
    - *TableChecksStorageConfig* (table-backed storage);
    - *LakebaseChecksStorageConfig* (Lakebase table);
    - *InstallationChecksStorageConfig* (installation directory);
    - *VolumeFileChecksStorageConfig* (Unity Catalog volume file);

    Per-call *variables* are merged with engine-level defaults from
    *ExtraParams.variables* (per-call values take precedence on conflict).

    **Security note:** variable values substituted into **sql_expression** checks are
    not sanitized. Callers must ensure that variable values come from trusted sources.

    Args:
        config: Configuration object describing the storage backend.
        variables: Optional mapping of placeholder names to replacement values. Replaces placeholders
            in all string values of the check definitions before returning.
        semantic_validation_mode: Controls semantic validation behavior after loading.
            Use ``SemanticValidationMode.WARN`` (default) to log warnings and continue,
            ``SemanticValidationMode.FAIL`` to raise if issues are found, or
            ``None`` to skip semantic validation entirely.

    Returns:
        List of DQ rules (checks) represented as dictionaries.

    Raises:
        InvalidConfigError: If the configuration type is unsupported.
        ValueError: If semantic_validation_mode is FAIL and issues are found.
    """
    handler = self._checks_handler_factory.create(config)
    checks = handler.load(config)
    merged_variables = self._merge_variables(variables)
    resolved_checks = resolve_variables(checks=checks, variables=merged_variables)

    # Validate the resolved checks (placeholders already substituted), so the
    # semantic pass sees the same definitions the caller receives.
    if semantic_validation_mode is not None:
        SemanticValidator.apply(resolved_checks, mode=semantic_validation_mode)

    return resolved_checks

def _merge_variables(self, per_call: dict[str, VariableValue] | None) -> dict[str, VariableValue] | None:
"""Merge engine-level default variables with per-call overrides.
Expand All @@ -1241,46 +1272,54 @@ def _merge_variables(self, per_call: dict[str, VariableValue] | None) -> dict[st
return {**defaults, **per_call}

@telemetry_logger("engine", "save_checks")
def save_checks(
    self,
    checks: list[dict],
    config: BaseChecksStorageConfig,
    variables: dict[str, VariableValue] | None = None,
    semantic_validation_mode: str | None = SemanticValidationMode.WARN,
) -> None:
    """Persist DQ rules (checks) to the storage backend described by *config*.

    The appropriate storage handler is resolved from the configuration
    type and used to write the provided checks. Any write semantics
    (e.g., append/overwrite) are controlled by fields on *config*
    such as *mode* where applicable.

    Supported storage configurations include, for example:
    - *FileChecksStorageConfig* (local file);
    - *WorkspaceFileChecksStorageConfig* (Databricks workspace file);
    - *TableChecksStorageConfig* (table-backed storage);
    - *LakebaseChecksStorageConfig* (Lakebase table);
    - *InstallationChecksStorageConfig* (installation directory);
    - *VolumeFileChecksStorageConfig* (Unity Catalog volume file);

    Per-call *variables* are merged with engine-level defaults from
    *ExtraParams.variables* (per-call values take precedence on conflict).
    Variables are resolved before computing fingerprints and persisting,
    ensuring that stored checks and their fingerprints are consistent.

    Args:
        checks: List of DQ rules (checks) to save (as dictionaries).
        config: Configuration object describing the storage backend and write options.
        variables: Optional mapping of placeholder names to replacement values. Replaces placeholders
            in all string values of the check definitions before saving.
        semantic_validation_mode: Controls semantic validation behavior before saving.
            Use ``SemanticValidationMode.WARN`` (default) to log warnings and continue,
            ``SemanticValidationMode.FAIL`` to abort saving if issues are found, or
            ``None`` to skip semantic validation entirely.

    Returns:
        None

    Raises:
        InvalidConfigError: If the configuration type is unsupported.
        ValueError: If semantic_validation_mode is FAIL and issues are found.
    """
    merged_variables = self._merge_variables(variables)
    resolved_checks = resolve_variables(checks=checks, variables=merged_variables)

    # Semantic validation runs before the handler is created, so an exception
    # raised here prevents anything from being written.
    if semantic_validation_mode is not None:
        SemanticValidator.apply(resolved_checks, mode=semantic_validation_mode)

    handler = self._checks_handler_factory.create(config)
    handler.save(resolved_checks, config)

Expand Down
Loading