diff --git a/docs/dqx/docs/guide/quality_checks_apply.mdx b/docs/dqx/docs/guide/quality_checks_apply.mdx
index 6c4b45837..3fab57d47 100644
--- a/docs/dqx/docs/guide/quality_checks_apply.mdx
+++ b/docs/dqx/docs/guide/quality_checks_apply.mdx
@@ -43,7 +43,7 @@ The end-to-end `apply_checks_and_save_in_table` and `apply_checks_by_metadata_an
The engine ensures that the specified `column`, `columns`, `filter`, or sql 'expression' fields can be resolved in the input DataFrame. If any of these fields are invalid, the check evaluation is skipped, and the results include the check failure with a message identifying the invalid fields and `skipped=True` in the result struct. You can suppress these entries entirely or identify them downstream — see [Suppressing skipped check entries](/docs/guide/additional_configuration#suppressing-skipped-check-entries).
The engine will raise an error if you try to apply checks with invalid definition (e.g. wrong syntax).
-In addition, you can also perform a standalone syntax validation of the checks as described [here](/docs/guide/quality_checks_definition#validating-syntax-of-quality-checks).
+You can also validate checks on their own before applying them, as described in [Validating quality checks](/docs/guide/quality_checks_definition#validating-quality-checks).
You can apply quality checks to streaming pipelines using the same methods as for batch processing.
You can either use the end-to-end methods or manage the input stream and output directly with native Spark APIs (e.g. `spark.readStream` and `writeStream`).
diff --git a/docs/dqx/docs/guide/quality_checks_definition.mdx b/docs/dqx/docs/guide/quality_checks_definition.mdx
index ff11678d8..056d92482 100644
--- a/docs/dqx/docs/guide/quality_checks_definition.mdx
+++ b/docs/dqx/docs/guide/quality_checks_definition.mdx
@@ -794,16 +794,24 @@ In addition to specifying variables during the load or save process, you can def
For technical details and configuration examples, see [Default Variables](/docs/guide/additional_configuration#defining-default-variables-for-substitution) in the Additional Configuration guide.
-## Validating syntax of quality checks
+## Validating quality checks
-You can validate the syntax of checks loaded from a storage system or checks defined programmatically before applying them.
-This validation ensures that the checks are correctly defined and can be interpreted by the DQX engine.
+You can validate checks loaded from a storage system or checks defined declaratively (metadata) before applying them.
+DQX performs two complementary kinds of validation:
-The validation cannot be used for checks defined programmatically using DQX classes.
+- **Syntax (structural) validation** ensures each check is correctly defined and can be interpreted by the DQX engine,
+  for example that the function exists and the provided arguments match its signature. Structural problems are reported
+  as errors in the returned `ChecksValidationStatus`.
+- **Semantic (ruleset-level) validation** inspects the ruleset as a whole and detects:
+  - **Duplicate rules**: two rules with the same function, arguments, criticality and filter.
+  - **Conflicting rules**: two rules targeting the same function and column(s) but with different arguments
+    (e.g. two `is_in_range` checks on the same column with different thresholds).
+
+Validation cannot be used for checks defined programmatically using DQX classes.
When checks are defined programmatically with DQX classes, syntax validation is unnecessary because the application will fail to interpret them if the DQX objects are constructed incorrectly.
-Validating quality rules are typically done as part of the CI/CD process to ensure checks are ready to use in the application.
+Validating quality rules is typically done as part of the CI/CD process to ensure checks are ready to use in the application.
@@ -842,3 +850,48 @@ Validating quality rules are typically done as part of the CI/CD process to ensu
- 'checks_location': file or table location of the quality checks
+
+### Controlling semantic validation
+
+Semantic validation runs automatically inside `validate_checks`, `load_checks` and `save_checks`. Its behavior is
+controlled by the `semantic_validation_mode` parameter:
+
+| Mode | Behavior |
+| --- | --- |
+| `"warn"` (default) | Log a warning for each duplicate or conflicting rule and continue. |
+| `"fail"` | Raise a `ValueError` listing all issues found. |
+| `None` | Skip semantic validation entirely. |
+
+Duplicate and conflicting rules are not necessarily invalid (a duplicate is redundant, a conflict may be intentional),
+so the default mode only warns. Use `"fail"` to enforce a clean ruleset, for example in a CI/CD pipeline.
+
+
+Checks that use raw Spark SQL expressions (via the `sql_expression` function) are not deeply inspected — only
+structured metadata (function name, column, arguments, criticality and filter) is compared.
+
+
+```python
+import yaml
+from databricks.labs.dqx.engine import DQEngine
+from databricks.labs.dqx.checks_semantic_validator import ChecksSemanticValidationMode
+
+checks = yaml.safe_load("""
+- criticality: error
+  check:
+    function: is_not_null
+    arguments:
+      column: col1
+- criticality: error
+  check:
+    function: is_not_null
+    arguments:
+      column: col1
+""")
+
+# Fail fast on duplicate or conflicting rules (e.g. in CI/CD)
+DQEngine.validate_checks(checks, semantic_validation_mode=ChecksSemanticValidationMode.FAIL)
+
+# Skip semantic validation when loading checks
+engine = DQEngine(ws)  # ws: an existing databricks.sdk WorkspaceClient instance
+engine.load_checks(config=..., semantic_validation_mode=None)
+```
diff --git a/docs/dqx/docs/guide/quality_checks_storage.mdx b/docs/dqx/docs/guide/quality_checks_storage.mdx
index 87cefe482..9c25f80ec 100644
--- a/docs/dqx/docs/guide/quality_checks_storage.mdx
+++ b/docs/dqx/docs/guide/quality_checks_storage.mdx
@@ -222,8 +222,9 @@ If you create checks as a list of DQRule objects, you can convert them using the
-Wrong types, unknown arguments, and missing required check function parameters are reported by `DQEngine.validate_checks`.
-`load_checks` and `save_checks` (except delta storage) methods do not validate the returned/provided metadata. Call `validate_checks` after load / before save when you want to catch problems before apply (for example hand edited YAML/JSON or checks written to a table without going through DQX).
+Wrong types, unknown arguments, and missing required check function parameters (syntax validation) are reported by `DQEngine.validate_checks`.
+`load_checks` and `save_checks` do not run syntax validation on the returned/provided metadata, so call `validate_checks` after load / before save when you want to catch syntax problems before apply (for example hand-edited YAML/JSON or checks written to a table without going through DQX).
+They do, however, run semantic (ruleset-level) validation to detect duplicate and conflicting rules; this is controlled by the `semantic_validation_mode` parameter (`"warn"` by default, `"fail"` to raise, or `None` to skip).
For field semantics and validation details, see [Quality checks definition](/docs/guide/quality_checks_definition).
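The three `semantic_validation_mode` values described above can be sketched as a small dispatch function. This is a simplified illustration, not the DQX implementation: `surface_issues` is a hypothetical name, the error message wording is an assumption, and the list of issue strings is assumed to have been computed already.

```python
from __future__ import annotations

import logging

logger = logging.getLogger(__name__)


def surface_issues(issues: list[str], mode: str | None = "warn") -> None:
    """Surface semantic issues according to the mode: 'warn', 'fail' or None."""
    if mode is None:
        return  # semantic validation is skipped entirely
    if mode not in ("warn", "fail"):
        raise ValueError(f"Unsupported semantic validation mode: {mode!r}")
    if not issues:
        return  # clean ruleset, nothing to report
    if mode == "fail":
        # Raise once, listing every issue found.
        raise ValueError("Semantic validation failed:\n" + "\n".join(issues))
    for issue in issues:
        logger.warning(issue)  # 'warn': log each issue and continue
```

In a CI/CD job, `"fail"` turns redundant or contradictory rules into a hard error, while the default `"warn"` keeps existing pipelines running and only logs.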
diff --git a/docs/dqx/docs/reference/engine.mdx b/docs/dqx/docs/reference/engine.mdx
index 50d0a45a2..01d050bea 100644
--- a/docs/dqx/docs/reference/engine.mdx
+++ b/docs/dqx/docs/reference/engine.mdx
@@ -59,11 +59,11 @@ The following table outlines the available methods of the `DQEngine` and their f
| `apply_checks_by_metadata_and_save_in_table` | Applies quality checks defined as a dictionary, writes results to valid and invalid Delta table(s) with result columns, and optionally writes summary metrics to a Delta table. Checks can be passed directly as metadata/dict or loaded automatically from a storage location. By default, checks are applied to the entire input table. Incremental processing is supported using streaming with the AvailableNow trigger for batch-style execution, along with checkpointing to ensure consistency across runs. | `input_config`: `InputConfig` object with the data location (e.g. a table) and options for reading the input data; `output_config`: (optional) `OutputConfig` object with the table name, output mode, and options for the output data (supports `partition_by` or `cluster_by`, only one applies). Optional only when `quarantine_config` is provided, in which case valid records are not written (quarantine-only mode). At least one of `output_config` or `quarantine_config` must be provided; `checks`: (optional) List of checks defined as dictionary. If not provided, `checks_location` must be provided; `quarantine_config`: (optional) `OutputConfig` object with the table name, output mode, and options for the quarantine data (supports `partition_by` or `cluster_by`, only one applies) - if provided, data will be split; `metrics_config`: (optional) `OutputConfig` object with the table name, output mode, and options for the summary metrics; `custom_check_functions`: (optional) Dictionary with custom check functions; `ref_dfs`: (optional) Reference DataFrames to use in the checks, if applicable; `checks_location`: (optional) location of the checks. If `checks` is provided, used only for reporting in the summary metrics table. If `checks` is not provided, used for loading checks from the storage. At least one of `checks` or `checks_location` must be provided; `run_config_name`: (optional) Name of the run configuration to use when loading checks from a table (defaults to "default"). | No |
| `apply_checks_and_save_in_tables` | Applies quality checks persisted in a storage to multiple tables and writes results to valid and invalid Delta table(s) with result columns. By default, checks are applied to the entire input table. Incremental processing is supported using streaming with the AvailableNow trigger for batch-style execution, along with checkpointing to ensure consistency across runs. | `run_configs`: list of run config objects (`RunConfig`) containing input config (`InputConfig`), output config (`OutputConfig`), quarantine config (`OutputConfig`, if provided data will be split), 'checks_location', and if provided 'reference_tables' and 'custom_check_functions'; `max_parallelism`: (optional) Maximum number of tables to check in parallel (defaults to the number of CPU cores). | No |
| `apply_checks_and_save_in_tables_for_patterns` | Applies quality checks persisted in a storage to multiple tables matching provided wildcard patterns and writes results to valid and invalid Delta table(s) with result columns. Skip output and quarantine tables based on specified suffixes. By default, checks are applied to the entire input table. Incremental processing is supported using streaming with the AvailableNow trigger for batch-style execution, along with checkpointing to ensure consistency across runs. | `patterns`: List of table names or filesystem-style wildcards (e.g. 'schema.*') to include (if None, all tables are included); ; `exclude_patterns`: (optional) List of table names or filesystem-style wildcards (e.g., '*_dq_output') to exclude, useful if wanting to exclude existing output or quarantine tables; `checks_location`: Location of the checks files (e.g. absolute workspace or volume directory or delta table), for file based locations, checks are expected to be found under 'checks_location/input_table_name.yml'; `exclude_matched`:(optional) Whether to exclude matched tables (default False); `run_config_template`: (optional) Run configuration template to use for all tables (skip location in the 'input_config', 'output_config', and 'quarantine_config' fields as it is derived from patterns, skip 'checks_location' of the run config as it is derived separately, autogenerate 'input_config' and 'output_config' if not provided, use 'reference_tables' and 'custom_check_functions' if provided; `max_parallelism`: (optional) Maximum number of tables to check in parallel (defaults to the number of CPU cores); `output_table_suffix`: (optional) Suffix to append to the output table name (default "_dq_output"); `quarantine_table_suffix`: (optional) Suffix to append to the quarantine table name (default "_dq_quarantine"). | No |
-| `validate_checks` | Validates declarative checks (list of dict metadata): expected shape, argument types where the check function has annotations, unknown argument names, and required parameters of each check function’s signature. | `checks`: List of checks to validate; `custom_check_functions`: (optional) Dictionary of custom check functions that can be used; `validate_custom_check_functions`: (optional) If True, validates custom check functions (defaults to True). | Yes |
+| `validate_checks` | Validates declarative checks (list of dict metadata): expected shape, argument types where the check function has annotations, unknown argument names, and required parameters of each check function’s signature. Also runs semantic (ruleset-level) validation to detect duplicate and conflicting rules. | `checks`: List of checks to validate; `custom_check_functions`: (optional) Dictionary of custom check functions that can be used; `validate_custom_check_functions`: (optional) If True, validates custom check functions (defaults to True); `semantic_validation_mode`: (optional) how to surface duplicate/conflicting rules — `"warn"` (default, log), `"fail"` (raise), or `None` (skip). | Yes |
| `get_invalid` | Retrieves records from the DataFrame that violate data quality checks (records with warnings and errors). | `df`: Input DataFrame. | Yes |
| `get_valid` | Retrieves records from the DataFrame that pass all data quality checks. | `df`: Input DataFrame. | Yes |
-| `load_checks` | Loads quality rules (checks) from storage backend. Multiple storage backends are supported including tables, files, workspace files, or installation-managed sources inferred from run config. | `config`: Configuration for loading checks from a storage backend, e.g., `FileChecksStorageConfig` (local YAML/JSON file or workspace file), `WorkspaceFileChecksStorageConfig` (workspace file with absolute path), `VolumeFileChecksStorageConfig` (Unity Catalog Volume YAML/JSON), `TableChecksStorageConfig` (table), `InstallationChecksStorageConfig` (installation-managed backend using `checks_location` in run config); `variables`: (optional) dictionary of variables for [variable substitution](/docs/guide/quality_checks_definition/#variable-substitution). | Yes (only with `FileChecksStorageConfig`) |
-| `save_checks` | Saves quality rules (checks) to a storage backend. Multiple storage backends are supported including tables, files, workspace files, or installation-managed targets inferred from run config. Variables are resolved before computing fingerprints and persisting. | `checks`: List of checks defined as dictionary; `config`: Configuration for saving checks in a storage backend, e.g., `FileChecksStorageConfig` (local YAML/JSON file or workspace file), `WorkspaceFileChecksStorageConfig` (workspace file with absolute path), `VolumeFileChecksStorageConfig` (Unity Catalog Volume YAML/JSON), `TableChecksStorageConfig` (table), `InstallationChecksStorageConfig` (installation-managed backend using `checks_location` in run config); `variables`: (optional) dictionary of variables for [variable substitution](/docs/guide/quality_checks_definition/#variable-substitution). | Yes (only with `FileChecksStorageConfig`) |
+| `load_checks` | Loads quality rules (checks) from storage backend. Multiple storage backends are supported including tables, files, workspace files, or installation-managed sources inferred from run config. | `config`: Configuration for loading checks from a storage backend, e.g., `FileChecksStorageConfig` (local YAML/JSON file or workspace file), `WorkspaceFileChecksStorageConfig` (workspace file with absolute path), `VolumeFileChecksStorageConfig` (Unity Catalog Volume YAML/JSON), `TableChecksStorageConfig` (table), `InstallationChecksStorageConfig` (installation-managed backend using `checks_location` in run config); `variables`: (optional) dictionary of variables for [variable substitution](/docs/guide/quality_checks_definition/#variable-substitution); `semantic_validation_mode`: (optional) how to surface duplicate/conflicting rules after loading — `"warn"` (default, log), `"fail"` (raise), or `None` (skip). | Yes (only with `FileChecksStorageConfig`) |
+| `save_checks` | Saves quality rules (checks) to a storage backend. Multiple storage backends are supported including tables, files, workspace files, or installation-managed targets inferred from run config. Variables are resolved before computing fingerprints and persisting. | `checks`: List of checks defined as dictionary; `config`: Configuration for saving checks in a storage backend, e.g., `FileChecksStorageConfig` (local YAML/JSON file or workspace file), `WorkspaceFileChecksStorageConfig` (workspace file with absolute path), `VolumeFileChecksStorageConfig` (Unity Catalog Volume YAML/JSON), `TableChecksStorageConfig` (table), `InstallationChecksStorageConfig` (installation-managed backend using `checks_location` in run config); `variables`: (optional) dictionary of variables for [variable substitution](/docs/guide/quality_checks_definition/#variable-substitution); `semantic_validation_mode`: (optional) how to surface duplicate/conflicting rules before saving — `"warn"` (default, log), `"fail"` (raise), or `None` (skip). | Yes (only with `FileChecksStorageConfig`) |
| `save_results_in_table` | Saves DataFrames as tables using Unity Catalog table references or storage paths. Supports both batch and streaming writes. For streaming DataFrames, returns a StreamingQuery that can be used to monitor or wait for completion. For batch DataFrames, data is written synchronously and None is returned. | `output_df`: (optional) DataFrame containing the output data (batch or streaming); `quarantine_df`: (optional) DataFrame containing invalid data (batch or streaming); `observation`: (optional) Spark Observation tracking summary metrics; `output_config`: `OutputConfig` with location (table name or storage path), mode, format, options, and optional trigger (supports `partition_by` or `cluster_by`, only one applies;); `quarantine_config`: (optional) `OutputConfig` with location (table name or storage path), mode, format, options, and optional trigger (supports `partition_by` or `cluster_by`, only one applies;); `metrics_config`: (optional) `OutputConfig` with location for summary metrics; `rule_set_fingerprint`: (optional) SHA-256 fingerprint of the rule set used for this run, included in summary metrics when metrics_config is provided; `run_config_name`: Name of the run config to use; `install_folder`: (optional) Installation folder where DQX is installed (only required for custom folder); `assume_user`: (optional) If True, assume user installation, otherwise global. | No |
| `save_summary_metrics` | Saves quality checking summary metrics to a Delta table. | `observed_metrics`: `dict[str, Any]` Collected summary metrics from Spark Observation; `metrics_config`: `OutputConfig` object with the table name, output mode, and options for the summary metrics data; `input_config`: (optional) `InputConfig` object with the table name for reading the input data; `output_config`: (optional) `OutputConfig` object with the table name for the output data (supports `partition_by` or `cluster_by`, only one applies); `quarantine_config`: (optional) `OutputConfig` object with the table name for the quarantine data (supports `partition_by` or `cluster_by`, only one applies); `checks_location`: (optional) Location where checks are stored; `rule_set_fingerprint`: (optional) SHA-256 fingerprint of the rule set used for this run. | No |
| `get_streaming_metrics_listener` | Gets a streaming metrics listener for writing metrics to an output table. Only required when using streaming DataFrames. | `metrics_config`: `OutputConfig` object with the table name, output mode, and options for the summary metrics data; `input_config`: (optional) `InputConfig` object with the table name for reading the input data; `output_config`: (optional) `OutputConfig` object with the table name for the output data (supports `partition_by` or `cluster_by`, only one applies); `quarantine_config`: (optional) `OutputConfig` object with the table name for the quarantine data (supports `partition_by` or `cluster_by`, only one applies); `checks_location`: (optional) checks location; `rule_set_fingerprint`: (optional) SHA-256 fingerprint of the rule set used for this run; `target_query_id`: (optional) Query ID of the specific streaming query to monitor, if provided, metrics will be collected only for this query. | No |
diff --git a/src/databricks/labs/dqx/checks_semantic_validator.py b/src/databricks/labs/dqx/checks_semantic_validator.py
new file mode 100644
index 000000000..335acd71c
--- /dev/null
+++ b/src/databricks/labs/dqx/checks_semantic_validator.py
@@ -0,0 +1,352 @@
+"""Semantic (ruleset-level) validation for DQ checks."""
+
+from __future__ import annotations
+
+import json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class ChecksSemanticValidationMode:
+    """Controls how semantic validation issues are surfaced."""
+
+    WARN = "warn"  # Log warnings but continue
+    FAIL = "fail"  # Raise an exception if any issues are found
+
+
+class ChecksSemanticValidator:
+    """Provides semantic validation for a collection of DQ rules.
+
+    Detects ruleset-level issues such as:
+    - Duplicate rules: two rules with the same function, arguments, criticality, and filter.
+    - Conflicting rules: two rules targeting the same function and column but with
+      different arguments (e.g. two *is_in_range* checks with different thresholds).
+
+    Note:
+        Rules that use raw Spark SQL expressions (via the *sql_expression* function)
+        are not deeply inspected; only structured metadata (function name, column,
+        arguments) is compared. Document this limitation when such checks are used.
+
+    Usage::
+
+        # Just get a list of issues:
+        issues = ChecksSemanticValidator.validate_ruleset(checks)
+
+        # Or apply with configurable behavior:
+        ChecksSemanticValidator.apply(checks, mode=ChecksSemanticValidationMode.WARN)
+        ChecksSemanticValidator.apply(checks, mode=ChecksSemanticValidationMode.FAIL)
+    """
+
+    @staticmethod
+    def _inner_block(check: dict) -> dict | None:
+        """Return the check block, handling both the nested and flat forms.
+
+        The nested form wraps the definition under a *check* key; the flat form
+        places *function*/*arguments* at the top level. Returns None for anything
+        that is not a dict (malformed input is reported by structural validation).
+        """
+        if not isinstance(check, dict):
+            return None
+        inner = check.get("check", check)
+        return inner if isinstance(inner, dict) else None
+
+    @staticmethod
+    def _get_function(check: dict) -> str | None:
+        """Extract the function name from a check dict."""
+        inner = ChecksSemanticValidator._inner_block(check)
+        return inner.get("function") if inner is not None else None
+
+    @staticmethod
+    def _get_arguments(check: dict) -> dict:
+        """Extract the arguments dict from a check dict.
+
+        Returns an empty dict for malformed checks (e.g. a non-dict *check* block
+        or non-dict *arguments*); structural validation reports those separately.
+        """
+        inner = ChecksSemanticValidator._inner_block(check)
+        if inner is None:
+            return {}
+        arguments = inner.get("arguments", {})
+        return arguments if isinstance(arguments, dict) else {}
+
+    @staticmethod
+    def _get_for_each_column(check: dict) -> object:
+        """Extract the *for_each_column* value (a list of columns or list of column groups).
+
+        Returns None when absent. This value lives in the check block alongside
+        *function* and *arguments*, so it must be part of a rule's identity.
+        """
+        inner = ChecksSemanticValidator._inner_block(check)
+        return inner.get("for_each_column") if inner is not None else None
+
+    @staticmethod
+    def _get_filter(check: dict) -> object:
+        """Extract the rule *filter*.
+
+        DQX accepts *filter* either at the top level of the check or nested inside
+        the check block; the top-level value takes precedence when both are present.
+        """
+        if not isinstance(check, dict):
+            return None
+        if check.get("filter") is not None:
+            return check.get("filter")
+        inner = ChecksSemanticValidator._inner_block(check)
+        return inner.get("filter") if inner is not None else None
+
+    @staticmethod
+    def _sorted_columns(columns: list) -> list:
+        """Sort a column list by a stable serialized key (handles names and nested groups)."""
+        return sorted(columns, key=lambda item: json.dumps(item, sort_keys=True, default=str))
+
+    @staticmethod
+    def _normalize_columns(value: object) -> object:
+        """Return an order-insensitive canonical form for a column list.
+
+        Column targeting (*columns*, *for_each_column*) is order-independent, so two
+        rules listing the same columns in a different order are the same rule. Lists
+        are sorted by a stable serialized key; non-list values are returned unchanged.
+        """
+        if isinstance(value, list):
+            return ChecksSemanticValidator._sorted_columns(value)
+        return value
+
+    @staticmethod
+    def _normalize_arguments(arguments: dict) -> dict:
+        """Return a copy of *arguments* with the column-targeting list order normalized.
+
+        Column order is not semantically significant, so the plural *columns* argument
+        is sorted to a canonical order. This keeps duplicate and conflict detection
+        consistent when comparing arguments.
+        """
+        normalized = dict(arguments)
+        if "columns" in normalized:
+            normalized["columns"] = ChecksSemanticValidator._normalize_columns(normalized["columns"])
+        return normalized
+
+    @staticmethod
+    def _make_hashable(value: object) -> object:
+        """Convert a value into a stable, hashable form so it can be used as a dict key.
+
+        Lists, tuples and dicts (which may appear in malformed checks, e.g. a list-valued
+        *filter* or nested *columns*) are converted to tuples so that building rule keys
+        never raises ``TypeError: unhashable type``. The validator should report or skip
+        malformed checks, not crash on them; structural validation flags them separately.
+        """
+        if isinstance(value, (list, tuple)):
+            return tuple(ChecksSemanticValidator._make_hashable(item) for item in value)
+        if isinstance(value, dict):
+            return tuple(
+                sorted(
+                    ((str(k), ChecksSemanticValidator._make_hashable(v)) for k, v in value.items()),
+                    key=lambda kv: kv[0],
+                )
+            )
+        return value
+
+    @staticmethod
+    def _full_key(check: dict) -> tuple | None:
+        """Return a hashable key representing a rule's complete identity.
+
+        Two rules with the same full key are exact duplicates.
+        Key: (function, arguments + for_each_column, criticality, filter)
+        """
+        function = ChecksSemanticValidator._get_function(check)
+        if not function:
+            return None
+        arguments = ChecksSemanticValidator._get_arguments(check)
+        criticality = check.get("criticality", "error")
+        filter_expr = ChecksSemanticValidator._get_filter(check)
+        for_each_column = ChecksSemanticValidator._normalize_columns(
+            ChecksSemanticValidator._get_for_each_column(check)
+        )
+        # Normalize the column-targeting argument so that reordered column lists are
+        # treated as the same rule (column order is not semantically significant).
+        normalized_arguments = ChecksSemanticValidator._normalize_arguments(arguments)
+        # Serialize the targeting parts (arguments + for_each_column) to a stable,
+        # hashable string so that list- or dict-valued values (e.g. is_in_list
+        # allowed=[...], for_each_column=[...]) do not raise "unhashable type" when
+        # the key is used in a dict/set, and so rules targeting different columns via
+        # for_each_column are not collapsed into the same identity.
+        identity = {"arguments": normalized_arguments, "for_each_column": for_each_column}
+        identity_key = json.dumps(identity, sort_keys=True, default=str)
+        # Hash the free-form components so a malformed (list/dict-valued) criticality or
+        # filter cannot make the key unhashable and crash the validator.
+        return (
+            function,
+            identity_key,
+            ChecksSemanticValidator._make_hashable(criticality),
+            ChecksSemanticValidator._make_hashable(filter_expr),
+        )
+
+    @staticmethod
+    def _conflict_key(check: dict) -> tuple | None:
+        """Return a key grouping rules that target the same function and column(s).
+
+        Used to detect rules that share a function and column but differ in
+        other arguments (e.g. conflicting thresholds). Handles both the singular
+        *column*/*col_name* arguments and the plural *columns* argument. Returns
+        None if the check has no identifiable column to compare against.
+        """
+        function = ChecksSemanticValidator._get_function(check)
+        if not function:
+            return None
+        arguments = ChecksSemanticValidator._get_arguments(check)
+        column = arguments.get("col_name") or arguments.get("column") or arguments.get("columns")
+        if not column:
+            return None
+        if isinstance(column, list):
+            # Column order is not semantically significant; normalize so reordered
+            # lists group together.
+            column = ChecksSemanticValidator._sorted_columns(column)
+        # Make the column component hashable so malformed/nested column values cannot
+        # crash the validator when the key is used in a dict.
+        return (function, ChecksSemanticValidator._make_hashable(column))
+
+    @staticmethod
+    def _duplicate_issue(idx: int, check: dict, seen: dict[tuple, int]) -> str | None:
+        """Record *check* in *seen* and return a duplicate-issue message if it repeats an earlier rule."""
+        key = ChecksSemanticValidator._full_key(check)
+        if key is None:
+            return None
+        if key not in seen:
+            seen[key] = idx
+            return None
+        return (
+            f"Duplicate rule detected: rule at index {idx} is identical to "
+            f"rule at index {seen[key]} (function: '{key[0]}')."
+        )
+
+    @staticmethod
+    def detect_duplicates(checks: list[dict]) -> list[str]:
+        """Detect rules that are completely identical.
+
+        Two rules are duplicates when they share the same function, arguments,
+        criticality, and filter expression.
+
+        Args:
+            checks: The ruleset to inspect.
+
+        Returns:
+            A list of issue message strings, empty if no duplicates found.
+        """
+        seen: dict[tuple, int] = {}
+        issues: list[str] = []
+
+        for idx, check in enumerate(checks):
+            try:
+                issue = ChecksSemanticValidator._duplicate_issue(idx, check, seen)
+            except TypeError as exc:
+                # Best-effort validation: never let a malformed/unhashable check crash the
+                # validator. Skip it here; structural validation reports such checks.
+                logger.warning(f"Skipping duplicate detection for check at index {idx}: {exc}")
+                continue
+            if issue:
+                issues.append(issue)
+
+        return issues
+
+    @staticmethod
+    def _conflict_issue(idx: int, check: dict, seen: dict[tuple, tuple[int, dict]]) -> str | None:
+        """Record *check* in *seen* and return a conflict-issue message if it clashes with an earlier rule."""
+        conflict_key = ChecksSemanticValidator._conflict_key(check)
+        if conflict_key is None:
+            return None
+        # Compare normalized arguments so that rules differing only by column order
+        # are treated as identical (a duplicate), not as a conflict.
+        arguments = ChecksSemanticValidator._normalize_arguments(ChecksSemanticValidator._get_arguments(check))
+        if conflict_key not in seen:
+            seen[conflict_key] = (idx, arguments)
+            return None
+        prev_idx, prev_arguments = seen[conflict_key]
+        if arguments == prev_arguments:
+            return None
+        function, column = conflict_key
+        column_label = ", ".join(str(c) for c in column) if isinstance(column, tuple) else str(column)
+        return (
+            f"Conflicting rules detected: rule at index {idx} and rule at index {prev_idx} "
+            f"both apply '{function}' to column '{column_label}' but with different arguments "
+            f"(index {prev_idx}: {prev_arguments}, index {idx}: {arguments})."
+        )
+
+    @staticmethod
+    def detect_conflicts(checks: list[dict]) -> list[str]:
+        """Detect rules targeting the same function and column with different arguments.
+
+        For example, two *is_in_range* checks on *age* with different min/max
+        thresholds would be flagged, as this is likely a misconfiguration.
+
+        Args:
+            checks: The ruleset to inspect.
+
+        Returns:
+            A list of issue message strings, empty if no conflicts found.
+        """
+        seen: dict[tuple, tuple[int, dict]] = {}
+        issues: list[str] = []
+
+        for idx, check in enumerate(checks):
+            try:
+                issue = ChecksSemanticValidator._conflict_issue(idx, check, seen)
+            except TypeError as exc:
+                # Best-effort validation: never let a malformed/unhashable check crash the
+                # validator. Skip it here; structural validation reports such checks.
+                logger.warning(f"Skipping conflict detection for check at index {idx}: {exc}")
+                continue
+            if issue:
+                issues.append(issue)
+
+        return issues
+
+ @staticmethod
+ def validate_ruleset(checks: list[dict]) -> list[str]:
+ """Run all semantic checks and return a combined list of issue messages.
+
+ Args:
+ checks: The ruleset to inspect.
+
+ Returns:
+ A list of issue strings. Empty list means the ruleset is semantically clean.
+ """
+ issues: list[str] = []
+ issues.extend(ChecksSemanticValidator.detect_duplicates(checks))
+ issues.extend(ChecksSemanticValidator.detect_conflicts(checks))
+ return issues
+
+ @staticmethod
+ def apply(checks: list[dict], mode: str | None = ChecksSemanticValidationMode.WARN) -> None:
+ """Run semantic validation and surface issues according to the chosen mode.
+
+ This is the main entry point, called from *validate_checks*, *save_checks*,
+ and *load_checks*; *mode* controls how any issues are surfaced.
+
+ Args:
+ checks: The ruleset to inspect.
+ mode: One of *ChecksSemanticValidationMode.WARN* (default),
+ *ChecksSemanticValidationMode.FAIL*, or *None*. In WARN mode, issues are
+ logged as warnings and execution continues. In FAIL mode, a *ValueError*
+ is raised listing all issues found. When *None*, semantic validation is
+ skipped entirely.
+
+ Raises:
+ ValueError: If *mode* is FAIL and any semantic issues are detected.
+ ValueError: If an unsupported mode value is passed.
+ """
+ if mode is None:
+ return
+
+ if mode not in (ChecksSemanticValidationMode.WARN, ChecksSemanticValidationMode.FAIL):
+ raise ValueError(f"Unsupported semantic validation mode: '{mode}'. Use 'warn' or 'fail'.")
+
+ issues = ChecksSemanticValidator.validate_ruleset(checks)
+ if not issues:
+ return
+
+ if mode == ChecksSemanticValidationMode.WARN:
+ for issue in issues:
+ logger.warning(f"Semantic validation: {issue}")
+ else:
+ raise ValueError(
+ "Semantic validation failed with the following issues:\n"
+ + "\n".join(f" - {issue}" for issue in issues)
+ )
diff --git a/src/databricks/labs/dqx/engine.py b/src/databricks/labs/dqx/engine.py
index 42e72be92..09575ea29 100644
--- a/src/databricks/labs/dqx/engine.py
+++ b/src/databricks/labs/dqx/engine.py
@@ -55,6 +55,7 @@
from databricks.labs.dqx.errors import InvalidCheckError, InvalidConfigError, InvalidParameterError
from databricks.labs.dqx.utils import list_tables, safe_strip_file_from_path, resolve_variables, VariableValue
from databricks.labs.dqx.io import is_one_time_trigger
+from databricks.labs.dqx.checks_semantic_validator import ChecksSemanticValidator, ChecksSemanticValidationMode
logger = logging.getLogger(__name__)
@@ -299,22 +300,43 @@ def validate_checks(
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
validate_custom_check_functions: bool = True,
+ semantic_validation_mode: str | None = ChecksSemanticValidationMode.WARN,
) -> ChecksValidationStatus:
"""
- Validate checks defined as metadata to ensure they conform to the expected structure and types.
+ Validate checks defined as metadata to ensure they conform to the expected
+ structure and types, and are semantically consistent as a ruleset.
- This method validates the presence of required keys, the existence and callability of functions,
- and the types of arguments passed to those functions.
+ Structural validation checks for required keys, callable functions, and
+ correct argument types. Semantic validation detects duplicate rules and
+ rules that apply the same function to the same column(s) with different
+ arguments (e.g. two is_in_range checks on one column with different thresholds).
+
+ Note:
+ Rules using raw Spark SQL expressions are not parsed during semantic
+ validation; only their structured metadata (function, arguments) is compared.
Args:
checks: List of checks to apply to the DataFrame. Each check should be a dictionary.
- custom_check_functions: Optional dictionary with custom check functions (e.g., *globals()* of the calling module).
+ custom_check_functions: Optional dictionary with custom check functions
+ (e.g., *globals()* of the calling module).
validate_custom_check_functions: If True, validate custom check functions.
+ semantic_validation_mode: Controls how semantic issues are surfaced.
+ Use *ChecksSemanticValidationMode.WARN* (default) to log warnings,
+ *ChecksSemanticValidationMode.FAIL* to raise on any issue, or
+ *None* to skip semantic validation entirely.
Returns:
- ChecksValidationStatus indicating the validation result.
+ ChecksValidationStatus indicating the structural validation result.
+
+ Raises:
+ ValueError: If semantic_validation_mode is FAIL and issues are found, or if the mode is unsupported.
"""
- return ChecksValidator.validate_checks(checks, custom_check_functions, validate_custom_check_functions)
+ status = ChecksValidator.validate_checks(checks, custom_check_functions, validate_custom_check_functions)
+
+ if semantic_validation_mode is not None:
+ ChecksSemanticValidator.apply(checks, mode=semantic_validation_mode)
+
+ return status
def get_invalid(self, df: DataFrame) -> DataFrame:
"""
@@ -1206,25 +1228,35 @@ def validate_checks(
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
validate_custom_check_functions: bool = True,
+ semantic_validation_mode: str | None = ChecksSemanticValidationMode.WARN,
) -> ChecksValidationStatus:
"""
Validate checks defined as metadata to ensure they conform to the expected structure and types.
This method validates the presence of required keys, the existence and callability of functions,
- and the types of arguments passed to those functions.
+ and the types of arguments passed to those functions. It also runs semantic validation across the
+ ruleset to detect duplicate and conflicting rules; these issues are logged or raised, not returned.
Args:
checks: List of checks to apply to the DataFrame. Each check should be a dictionary.
custom_check_functions: Optional dictionary with custom check functions (e.g., *globals()* of the calling module).
validate_custom_check_functions: If True, validate custom check functions.
+ semantic_validation_mode: Controls how semantic issues are surfaced.
+ Use *ChecksSemanticValidationMode.WARN* (default) to log warnings,
+ *ChecksSemanticValidationMode.FAIL* to raise on any issue, or
+ *None* to skip semantic validation entirely.
Returns:
ChecksValidationStatus indicating the validation result.
+
+ Raises:
+ ValueError: If semantic_validation_mode is FAIL and issues are found, or if the mode is unsupported.
"""
return DQEngineCore.validate_checks(
checks=checks,
custom_check_functions=custom_check_functions,
validate_custom_check_functions=validate_custom_check_functions,
+ semantic_validation_mode=semantic_validation_mode,
)
def get_invalid(self, df: DataFrame) -> DataFrame:
@@ -1368,7 +1400,10 @@ def save_results_in_table(
@telemetry_logger("engine", "load_checks")
def load_checks(
- self, config: BaseChecksStorageConfig, variables: dict[str, VariableValue] | None = None
+ self,
+ config: BaseChecksStorageConfig,
+ variables: dict[str, VariableValue] | None = None,
+ semantic_validation_mode: str | None = ChecksSemanticValidationMode.WARN,
) -> list[dict]:
"""Load DQ rules (checks) from the storage backend described by *config*.
@@ -1394,17 +1429,25 @@ def load_checks(
config: Configuration object describing the storage backend.
variables: Optional mapping of placeholder names to replacement values. Replaces placeholders
in all string values of the check definitions before returning.
+ semantic_validation_mode: Controls semantic validation behavior after loading.
+ Use *ChecksSemanticValidationMode.WARN* (default) to log warnings and continue,
+ *ChecksSemanticValidationMode.FAIL* to raise if issues are found, or
+ *None* to skip semantic validation entirely.
Returns:
List of DQ rules (checks) represented as dictionaries.
Raises:
InvalidConfigError: If the configuration type is unsupported.
+ ValueError: If semantic_validation_mode is FAIL and issues are found, or if the mode is unsupported.
"""
handler = self._checks_handler_factory.create(config)
checks = handler.load(config)
merged_variables = self._merge_variables(variables)
- return resolve_variables(checks=checks, variables=merged_variables)
+ resolved = resolve_variables(checks=checks, variables=merged_variables)
+ if semantic_validation_mode is not None:
+ ChecksSemanticValidator.apply(resolved, mode=semantic_validation_mode)
+ return resolved
def _merge_variables(self, per_call: dict[str, VariableValue] | None) -> dict[str, VariableValue] | None:
"""Merge engine-level default variables with per-call overrides.
@@ -1426,6 +1469,7 @@ def save_checks(
checks: list[dict],
config: BaseChecksStorageConfig,
variables: dict[str, VariableValue] | None = None,
+ semantic_validation_mode: str | None = ChecksSemanticValidationMode.WARN,
) -> None:
"""Persist DQ rules (checks) to the storage backend described by *config*.
@@ -1452,15 +1496,22 @@ def save_checks(
config: Configuration object describing the storage backend and write options.
variables: Optional mapping of placeholder names to replacement values. Replaces placeholders
in all string values of the check definitions before saving.
+ semantic_validation_mode: Controls semantic validation behavior before saving.
+ Use *ChecksSemanticValidationMode.WARN* (default) to log warnings and continue,
+ *ChecksSemanticValidationMode.FAIL* to abort saving if issues are found, or
+ *None* to skip semantic validation entirely.
Returns:
None
Raises:
InvalidConfigError: If the configuration type is unsupported.
+ ValueError: If semantic_validation_mode is FAIL and issues are found, or if the mode is unsupported.
"""
merged_variables = self._merge_variables(variables)
resolved_checks = resolve_variables(checks=checks, variables=merged_variables)
+ if semantic_validation_mode is not None:
+ ChecksSemanticValidator.apply(resolved_checks, mode=semantic_validation_mode)
handler = self._checks_handler_factory.create(config)
handler.save(resolved_checks, config)
diff --git a/tests/integration/test_checks_semantic_validation.py b/tests/integration/test_checks_semantic_validation.py
new file mode 100644
index 000000000..acd5eebf2
--- /dev/null
+++ b/tests/integration/test_checks_semantic_validation.py
@@ -0,0 +1,75 @@
+import pytest
+
+from databricks.labs.dqx.config import WorkspaceFileChecksStorageConfig
+from databricks.labs.dqx.engine import DQEngine
+from databricks.labs.dqx.checks_semantic_validator import ChecksSemanticValidationMode
+
+DUPLICATE_CHECKS = [
+ {"criticality": "error", "check": {"function": "is_not_null", "arguments": {"column": "col1"}}},
+ {"criticality": "error", "check": {"function": "is_not_null", "arguments": {"column": "col1"}}},
+]
+
+CONFLICTING_CHECKS = [
+ {
+ "criticality": "error",
+ "check": {"function": "is_in_range", "arguments": {"column": "col1", "min_limit": 0, "max_limit": 100}},
+ },
+ {
+ "criticality": "error",
+ "check": {"function": "is_in_range", "arguments": {"column": "col1", "min_limit": 0, "max_limit": 50}},
+ },
+]
+
+
+def test_validate_checks_fails_on_duplicate_rules(ws, spark):
+ dq_engine = DQEngine(ws, spark)
+ with pytest.raises(ValueError, match="Semantic validation failed"):
+ dq_engine.validate_checks(DUPLICATE_CHECKS, semantic_validation_mode=ChecksSemanticValidationMode.FAIL)
+
+
+def test_validate_checks_fails_on_conflicting_rules(ws, spark):
+ dq_engine = DQEngine(ws, spark)
+ with pytest.raises(ValueError, match="Semantic validation failed"):
+ dq_engine.validate_checks(CONFLICTING_CHECKS, semantic_validation_mode=ChecksSemanticValidationMode.FAIL)
+
+
+def test_load_checks_fails_on_duplicate_rules(ws, spark, installation_ctx):
+ installation_ctx.installation.save(installation_ctx.config)
+ install_dir = installation_ctx.installation.install_folder()
+ checks_path = f"{install_dir}/{installation_ctx.config.get_run_config().checks_location}"
+
+ dq_engine = DQEngine(ws, spark)
+ config = WorkspaceFileChecksStorageConfig(location=checks_path)
+ # Persist a ruleset with duplicates without triggering validation on save.
+ dq_engine.save_checks(DUPLICATE_CHECKS, config=config, semantic_validation_mode=None)
+
+ with pytest.raises(ValueError, match="Semantic validation failed"):
+ dq_engine.load_checks(config=config, semantic_validation_mode=ChecksSemanticValidationMode.FAIL)
+
+
+def test_save_checks_fails_on_duplicate_rules(ws, spark, installation_ctx):
+ installation_ctx.installation.save(installation_ctx.config)
+ install_dir = installation_ctx.installation.install_folder()
+ checks_path = f"{install_dir}/{installation_ctx.config.get_run_config().checks_location}"
+
+ dq_engine = DQEngine(ws, spark)
+ config = WorkspaceFileChecksStorageConfig(location=checks_path)
+
+ with pytest.raises(ValueError, match="Semantic validation failed"):
+ dq_engine.save_checks(
+ DUPLICATE_CHECKS, config=config, semantic_validation_mode=ChecksSemanticValidationMode.FAIL
+ )
+
+
+def test_save_checks_fails_on_conflicting_rules(ws, spark, installation_ctx):
+ installation_ctx.installation.save(installation_ctx.config)
+ install_dir = installation_ctx.installation.install_folder()
+ checks_path = f"{install_dir}/{installation_ctx.config.get_run_config().checks_location}"
+
+ dq_engine = DQEngine(ws, spark)
+ config = WorkspaceFileChecksStorageConfig(location=checks_path)
+
+ with pytest.raises(ValueError, match="Semantic validation failed"):
+ dq_engine.save_checks(
+ CONFLICTING_CHECKS, config=config, semantic_validation_mode=ChecksSemanticValidationMode.FAIL
+ )
diff --git a/tests/unit/test_checks_semantic_validator.py b/tests/unit/test_checks_semantic_validator.py
new file mode 100644
index 000000000..58963e2d4
--- /dev/null
+++ b/tests/unit/test_checks_semantic_validator.py
@@ -0,0 +1,432 @@
+"""Unit tests for ChecksSemanticValidator."""
+
+import logging
+import pytest
+
+from databricks.labs.dqx.checks_semantic_validator import ChecksSemanticValidator, ChecksSemanticValidationMode
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_check(function: str, column: str, criticality: str = "error", filter_expr=None, **arguments) -> dict:
+ """Build a DQX-style check dict."""
+ check: dict = {
+ "check": {
+ "function": function,
+ "arguments": {"column": column, **arguments},
+ },
+ "criticality": criticality,
+ }
+ if filter_expr is not None:
+ check["filter"] = filter_expr
+ return check
+
+
+# ---------------------------------------------------------------------------
+# detect_duplicates
+# ---------------------------------------------------------------------------
+
+
+def test_no_duplicates_returns_empty():
+ checks = [
+ _make_check("is_not_null", "age"),
+ _make_check("is_not_null", "name"),
+ ]
+ assert not ChecksSemanticValidator.detect_duplicates(checks)
+
+
+def test_identical_rules_flagged_as_duplicate():
+ checks = [
+ _make_check("is_not_null", "age"),
+ _make_check("is_not_null", "age"),
+ ]
+ issues = ChecksSemanticValidator.detect_duplicates(checks)
+ assert len(issues) == 1
+ assert "Duplicate rule detected" in issues[0]
+ assert "index 1" in issues[0]
+ assert "index 0" in issues[0]
+
+
+def test_duplicate_with_same_criticality_and_filter():
+ checks = [
+ _make_check("is_not_null", "age", criticality="warn", filter_expr="status = 'ACTIVE'"),
+ _make_check("is_not_null", "age", criticality="warn", filter_expr="status = 'ACTIVE'"),
+ ]
+ issues = ChecksSemanticValidator.detect_duplicates(checks)
+ assert len(issues) == 1
+
+
+def test_different_criticality_not_duplicate():
+ checks = [
+ _make_check("is_not_null", "age", criticality="error"),
+ _make_check("is_not_null", "age", criticality="warn"),
+ ]
+ assert not ChecksSemanticValidator.detect_duplicates(checks)
+
+
+def test_different_filter_not_duplicate():
+ checks = [
+ _make_check("is_not_null", "age", filter_expr="status = 'ACTIVE'"),
+ _make_check("is_not_null", "age", filter_expr="status = 'INACTIVE'"),
+ ]
+ assert not ChecksSemanticValidator.detect_duplicates(checks)
+
+
+def test_multiple_duplicates_all_flagged():
+ checks = [
+ _make_check("is_not_null", "age"),
+ _make_check("is_not_null", "age"),
+ _make_check("is_not_null", "age"),
+ ]
+ issues = ChecksSemanticValidator.detect_duplicates(checks)
+ assert len(issues) == 2
+
+
+def test_list_valued_arguments_do_not_crash_duplicate_detection():
+ """List-valued arguments must not raise 'unhashable type: list' (regression)."""
+ checks = [
+ _make_check("is_in_list", "status", allowed=["A", "B", "C"]),
+ _make_check("is_in_list", "status", allowed=["A", "B", "C"]),
+ ]
+ issues = ChecksSemanticValidator.detect_duplicates(checks)
+ assert len(issues) == 1
+ assert "Duplicate rule detected" in issues[0]
+
+
+def test_list_valued_arguments_with_different_lists_not_duplicate():
+ checks = [
+ _make_check("is_in_list", "status", allowed=["A", "B"]),
+ _make_check("is_in_list", "status", allowed=["A", "C"]),
+ ]
+ assert not ChecksSemanticValidator.detect_duplicates(checks)
+
+
+def test_for_each_column_same_columns_is_duplicate():
+ checks = [
+ {"criticality": "error", "check": {"function": "is_not_null", "for_each_column": ["a", "b"], "arguments": {}}},
+ {"criticality": "error", "check": {"function": "is_not_null", "for_each_column": ["a", "b"], "arguments": {}}},
+ ]
+ issues = ChecksSemanticValidator.detect_duplicates(checks)
+ assert len(issues) == 1
+ assert "Duplicate rule detected" in issues[0]
+
+
+def test_for_each_column_different_columns_not_duplicate():
+ """for_each_column is part of a rule's identity; different column groups are distinct rules."""
+ checks = [
+ {"criticality": "error", "check": {"function": "is_not_null", "for_each_column": ["a", "b"], "arguments": {}}},
+ {"criticality": "error", "check": {"function": "is_not_null", "for_each_column": ["c", "d"], "arguments": {}}},
+ ]
+ assert not ChecksSemanticValidator.detect_duplicates(checks)
+
+
+def test_for_each_column_reordered_is_duplicate():
+ """Column order in for_each_column is not significant, so reordered lists are duplicates."""
+ checks = [
+ {"criticality": "error", "check": {"function": "is_not_null", "for_each_column": ["a", "b"], "arguments": {}}},
+ {"criticality": "error", "check": {"function": "is_not_null", "for_each_column": ["b", "a"], "arguments": {}}},
+ ]
+ assert len(ChecksSemanticValidator.detect_duplicates(checks)) == 1
+
+
+def test_reordered_columns_argument_is_duplicate():
+ """Column order in the plural 'columns' argument is not significant."""
+ checks = [
+ {"criticality": "error", "check": {"function": "is_unique", "arguments": {"columns": ["a", "b"]}}},
+ {"criticality": "error", "check": {"function": "is_unique", "arguments": {"columns": ["b", "a"]}}},
+ ]
+ assert len(ChecksSemanticValidator.detect_duplicates(checks)) == 1
+
+
+def test_nested_filter_same_is_duplicate():
+ checks = [
+ {"criticality": "error", "check": {"function": "is_not_null", "arguments": {"column": "x"}, "filter": "a > 0"}},
+ {"criticality": "error", "check": {"function": "is_not_null", "arguments": {"column": "x"}, "filter": "a > 0"}},
+ ]
+ issues = ChecksSemanticValidator.detect_duplicates(checks)
+ assert len(issues) == 1
+
+
+def test_nested_filter_different_not_duplicate():
+ """A filter nested inside the check block is part of a rule's identity."""
+ checks = [
+ {"criticality": "error", "check": {"function": "is_not_null", "arguments": {"column": "x"}, "filter": "a > 0"}},
+ {"criticality": "error", "check": {"function": "is_not_null", "arguments": {"column": "x"}, "filter": "a < 0"}},
+ ]
+ assert not ChecksSemanticValidator.detect_duplicates(checks)
+
+
+def test_flat_form_checks_detected_as_duplicate():
+ """Checks in the flat form (no nested 'check' block) are also compared."""
+ checks = [
+ {"function": "is_not_null", "arguments": {"column": "x"}},
+ {"function": "is_not_null", "arguments": {"column": "x"}},
+ ]
+ issues = ChecksSemanticValidator.detect_duplicates(checks)
+ assert len(issues) == 1
+
+
+# ---------------------------------------------------------------------------
+# detect_conflicts
+# ---------------------------------------------------------------------------
+
+
+def test_no_conflicts_returns_empty():
+ checks = [
+ _make_check("is_in_range", "age", min=0, max=120),
+ _make_check("is_in_range", "score", min=0, max=100),
+ ]
+ assert not ChecksSemanticValidator.detect_conflicts(checks)
+
+
+def test_same_function_same_column_different_args_flagged():
+ checks = [
+ _make_check("is_in_range", "age", min=0, max=120),
+ _make_check("is_in_range", "age", min=18, max=65),
+ ]
+ issues = ChecksSemanticValidator.detect_conflicts(checks)
+ assert len(issues) == 1
+ assert "Conflicting rules detected" in issues[0]
+ assert "is_in_range" in issues[0]
+ assert "age" in issues[0]
+
+
+def test_same_function_same_column_same_args_no_conflict():
+ """Identical args on same column/function is a duplicate, not a conflict."""
+ checks = [
+ _make_check("is_in_range", "age", min=0, max=120),
+ _make_check("is_in_range", "age", min=0, max=120),
+ ]
+ assert not ChecksSemanticValidator.detect_conflicts(checks)
+
+
+def test_check_without_column_skipped_in_conflict_detection():
+ checks = [
+ {"check": {"function": "sql_expression", "arguments": {"expression": "age > 0"}}, "criticality": "error"},
+ {"check": {"function": "sql_expression", "arguments": {"expression": "age > 18"}}, "criticality": "error"},
+ ]
+ assert not ChecksSemanticValidator.detect_conflicts(checks)
+
+
+def test_plural_columns_same_columns_different_args_flagged():
+ """Conflict detection handles the plural 'columns' argument (e.g. is_unique)."""
+ checks = [
+ {
+ "criticality": "error",
+ "check": {"function": "is_unique", "arguments": {"columns": ["a", "b"], "nulls_distinct": True}},
+ },
+ {
+ "criticality": "error",
+ "check": {"function": "is_unique", "arguments": {"columns": ["a", "b"], "nulls_distinct": False}},
+ },
+ ]
+ issues = ChecksSemanticValidator.detect_conflicts(checks)
+ assert len(issues) == 1
+ assert "is_unique" in issues[0]
+
+
+def test_plural_columns_different_column_sets_not_conflict():
+ checks = [
+ {"criticality": "error", "check": {"function": "is_unique", "arguments": {"columns": ["a", "b"]}}},
+ {"criticality": "error", "check": {"function": "is_unique", "arguments": {"columns": ["c", "d"]}}},
+ ]
+ assert not ChecksSemanticValidator.detect_conflicts(checks)
+
+
+def test_plural_columns_reordered_same_set_flagged_as_conflict():
+ """Reordered 'columns' lists target the same set, so differing args still conflict."""
+ checks = [
+ {
+ "criticality": "error",
+ "check": {"function": "is_unique", "arguments": {"columns": ["a", "b"], "nulls_distinct": True}},
+ },
+ {
+ "criticality": "error",
+ "check": {"function": "is_unique", "arguments": {"columns": ["b", "a"], "nulls_distinct": False}},
+ },
+ ]
+ assert len(ChecksSemanticValidator.detect_conflicts(checks)) == 1
+
+
+def test_reordered_columns_identical_args_not_conflict():
+ """Reordered columns with otherwise identical args are a duplicate, not a conflict."""
+ checks = [
+ {
+ "criticality": "error",
+ "check": {"function": "is_unique", "arguments": {"columns": ["a", "b"], "nulls_distinct": True}},
+ },
+ {
+ "criticality": "error",
+ "check": {"function": "is_unique", "arguments": {"columns": ["b", "a"], "nulls_distinct": True}},
+ },
+ ]
+ assert not ChecksSemanticValidator.detect_conflicts(checks)
+ assert len(ChecksSemanticValidator.detect_duplicates(checks)) == 1
+
+
+def test_malformed_checks_do_not_crash_validation():
+ """Malformed checks (non-dict check block or arguments) must not raise (regression).
+
+ Structural validation reports these separately; semantic validation should skip them.
+ """
+ checks = [
+ {"criticality": "warn", "check": "not_a_dict"},
+ {"criticality": "warn", "check": {"function": "dummy_func", "arguments": "not_a_dict"}},
+ ]
+ assert not ChecksSemanticValidator.validate_ruleset(checks)
+
+
+@pytest.mark.parametrize(
+ "checks",
+ [
+ # list-valued filter
+ [
+ {
+ "criticality": "error",
+ "check": {"function": "is_not_null", "arguments": {"column": "x"}},
+ "filter": ["a"],
+ },
+ {
+ "criticality": "error",
+ "check": {"function": "is_not_null", "arguments": {"column": "x"}},
+ "filter": ["a"],
+ },
+ ],
+ # list-valued criticality
+ [
+ {"criticality": ["error"], "check": {"function": "is_not_null", "arguments": {"column": "x"}}},
+ {"criticality": ["error"], "check": {"function": "is_not_null", "arguments": {"column": "x"}}},
+ ],
+ # nested list inside the 'columns' argument (conflict path)
+ [
+ {"criticality": "error", "check": {"function": "f", "arguments": {"columns": [["a", "b"]], "x": 1}}},
+ {"criticality": "error", "check": {"function": "f", "arguments": {"columns": [["a", "b"]], "x": 2}}},
+ ],
+ # dict-valued argument
+ [
+ {"criticality": "error", "check": {"function": "f", "arguments": {"column": "x", "opts": {"k": [1, 2]}}}},
+ {"criticality": "error", "check": {"function": "f", "arguments": {"column": "x", "opts": {"k": [1, 2]}}}},
+ ],
+ ],
+)
+def test_unhashable_values_do_not_crash_validation(checks):
+ """List/dict-valued fields must not raise 'unhashable type' when building rule keys."""
+ # None of these entry points may raise on unhashable values (FAIL mode is not exercised here).
+ ChecksSemanticValidator.validate_ruleset(checks)
+ ChecksSemanticValidator.apply(checks, mode=ChecksSemanticValidationMode.WARN)
+ ChecksSemanticValidator.detect_duplicates(checks)
+ ChecksSemanticValidator.detect_conflicts(checks)
+
+
+def test_unhashable_value_beyond_normalization_is_skipped(caplog):
+ """A value that stays unhashable (e.g. a set) is skipped and logged, not raised."""
+ checks = [
+ {"criticality": "error", "check": {"function": "is_not_null", "arguments": {"column": "x"}}, "filter": {"a"}},
+ {"criticality": "error", "check": {"function": "is_not_null", "arguments": {"column": "x"}}, "filter": {"a"}},
+ ]
+ with caplog.at_level(logging.WARNING, logger="databricks.labs.dqx.checks_semantic_validator"):
+ # Must not raise; the malformed checks are skipped.
+ assert not ChecksSemanticValidator.detect_duplicates(checks)
+ assert not ChecksSemanticValidator.detect_conflicts(checks)
+ assert any("Skipping" in r.message for r in caplog.records)
+
+
+# ---------------------------------------------------------------------------
+# validate_ruleset
+# ---------------------------------------------------------------------------
+
+
+def test_validate_ruleset_combines_both():
+ checks = [
+ _make_check("is_not_null", "age"),
+ _make_check("is_not_null", "age"), # duplicate
+ _make_check("is_in_range", "score", min=0, max=100),
+ _make_check("is_in_range", "score", min=0, max=50), # conflict
+ ]
+ issues = ChecksSemanticValidator.validate_ruleset(checks)
+ assert len(issues) == 2
+ assert any("Duplicate" in i for i in issues)
+ assert any("Conflicting" in i for i in issues)
+
+
+def test_validate_ruleset_clean_returns_empty():
+ checks = [
+ _make_check("is_not_null", "age"),
+ _make_check("is_not_null", "name"),
+ _make_check("is_in_range", "score", min=0, max=100),
+ ]
+ assert not ChecksSemanticValidator.validate_ruleset(checks)
+
+
+# ---------------------------------------------------------------------------
+# apply — WARN mode
+# ---------------------------------------------------------------------------
+
+
+def test_apply_warn_mode_logs_and_does_not_raise(caplog):
+ checks = [
+ _make_check("is_not_null", "age"),
+ _make_check("is_not_null", "age"),
+ ]
+ with caplog.at_level(logging.WARNING, logger="databricks.labs.dqx.checks_semantic_validator"):
+ ChecksSemanticValidator.apply(checks, mode=ChecksSemanticValidationMode.WARN)
+
+ assert any("Duplicate" in r.message for r in caplog.records)
+
+
+def test_apply_warn_mode_clean_ruleset_no_logs(caplog):
+ checks = [_make_check("is_not_null", "age")]
+ with caplog.at_level(logging.WARNING, logger="databricks.labs.dqx.checks_semantic_validator"):
+ ChecksSemanticValidator.apply(checks, mode=ChecksSemanticValidationMode.WARN)
+ assert caplog.records == []
+
+
+# ---------------------------------------------------------------------------
+# apply — FAIL mode
+# ---------------------------------------------------------------------------
+
+
+def test_apply_fail_mode_raises_on_duplicate():
+ checks = [
+ _make_check("is_not_null", "age"),
+ _make_check("is_not_null", "age"),
+ ]
+ with pytest.raises(ValueError, match="Semantic validation failed"):
+ ChecksSemanticValidator.apply(checks, mode=ChecksSemanticValidationMode.FAIL)
+
+
+def test_apply_fail_mode_raises_on_conflict():
+ checks = [
+ _make_check("is_in_range", "age", min=0, max=120),
+ _make_check("is_in_range", "age", min=18, max=65),
+ ]
+ with pytest.raises(ValueError, match="Semantic validation failed"):
+ ChecksSemanticValidator.apply(checks, mode=ChecksSemanticValidationMode.FAIL)
+
+
+def test_apply_fail_mode_clean_ruleset_does_not_raise():
+ checks = [_make_check("is_not_null", "age")]
+ ChecksSemanticValidator.apply(checks, mode=ChecksSemanticValidationMode.FAIL) # should not raise
+
+
+def test_apply_invalid_mode_raises():
+ with pytest.raises(ValueError, match="Unsupported semantic validation mode"):
+ ChecksSemanticValidator.apply([], mode="invalid")
+
+
+# ---------------------------------------------------------------------------
+# apply — None mode (skip validation)
+# ---------------------------------------------------------------------------
+
+
+def test_apply_none_mode_skips_validation():
+ """Passing mode=None should skip all semantic checks entirely."""
+ checks = [
+ _make_check("is_not_null", "age"),
+ _make_check("is_not_null", "age"), # would normally be a duplicate
+ ]
+ # Should not raise: apply() returns before inspecting the checks when mode is None.
+ ChecksSemanticValidator.apply(checks, mode=None)
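
Reviewer note: the bodies of `_normalize_arguments`, `_duplicate_issue`, and `_conflict_key` are not visible in this part of the diff, so the following is only a minimal, self-contained sketch of the idea the tests above exercise, not the PR's implementation. The names `to_hashable`, `normalize_arguments`, and `find_issues` are hypothetical. It assumes that list and dict values are converted to tuples so they can serve as dict keys, and that the order of the `columns` list is not significant. Values that stay unhashable (for example a set) are not handled here; the PR appears to cover that case by catching `TypeError` and skipping the check.

```python
from typing import Any


def to_hashable(value: Any) -> Any:
    """Recursively turn dicts and lists into tuples so the result can be a dict key."""
    if isinstance(value, dict):
        pairs = sorted(value.items(), key=lambda kv: str(kv[0]))
        return tuple((k, to_hashable(v)) for k, v in pairs)
    if isinstance(value, (list, tuple)):
        return tuple(to_hashable(v) for v in value)
    return value


def normalize_arguments(arguments: dict) -> dict:
    """Return a copy in which the 'columns' list is order-insensitive (assumption)."""
    normalized = dict(arguments)
    columns = normalized.get("columns")
    if isinstance(columns, list):
        normalized["columns"] = sorted(columns, key=repr)
    return normalized


def find_issues(checks: list) -> list:
    """Hypothetical single-pass duplicate and conflict detection."""
    seen_rules: dict = {}    # full rule identity -> first index
    seen_targets: dict = {}  # (function, target columns) -> (first index, arguments)
    issues = []
    for idx, check in enumerate(checks):
        block = check.get("check", check)  # nested or flat form
        if not isinstance(block, dict) or not isinstance(block.get("arguments"), dict):
            continue  # malformed; left to structural validation
        function = block.get("function")
        args = normalize_arguments(block["arguments"])

        # Duplicate: same function, arguments, criticality, and filter.
        rule_key = to_hashable((function, args, check.get("criticality"), check.get("filter")))
        if rule_key in seen_rules:
            issues.append(f"duplicate: index {idx} repeats index {seen_rules[rule_key]}")
        else:
            seen_rules[rule_key] = idx

        # Conflict: same function and target column(s), different arguments.
        target = args.get("column", args.get("columns"))
        if target is None:
            continue  # e.g. sql_expression checks have no column to compare
        first_idx, first_args = seen_targets.setdefault(to_hashable((function, target)), (idx, args))
        if first_args != args:
            issues.append(f"conflict: index {idx} vs index {first_idx} on '{function}'")
    return issues


checks = [
    {"criticality": "error", "check": {"function": "is_unique", "arguments": {"columns": ["a", "b"]}}},
    {"criticality": "error", "check": {"function": "is_unique", "arguments": {"columns": ["b", "a"]}}},
    {"criticality": "error", "check": {"function": "is_in_range", "arguments": {"column": "age", "min_limit": 0, "max_limit": 120}}},
    {"criticality": "error", "check": {"function": "is_in_range", "arguments": {"column": "age", "min_limit": 18, "max_limit": 65}}},
]
issues = find_issues(checks)
```

With this input the sketch reports the reordered `is_unique` rule as a duplicate and the second `is_in_range` rule as a conflict, which mirrors the behaviour asserted in `test_reordered_columns_argument_is_duplicate` and `test_same_function_same_column_different_args_flagged`.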