From ec281f9c5e096c00927a15a5850898b1c031ea5b Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 4 Jun 2026 14:13:34 +0800 Subject: [PATCH 1/3] feat: add distributed cleanup old versions --- docs/src/compaction.md | 116 +++++++++- lance_ray/__init__.py | 3 + lance_ray/cleanup.py | 240 +++++++++++++++++++++ tests/test_cleanup.py | 470 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 827 insertions(+), 2 deletions(-) create mode 100644 lance_ray/cleanup.py create mode 100644 tests/test_cleanup.py diff --git a/docs/src/compaction.md b/docs/src/compaction.md index 52fbacdf..64b51aad 100644 --- a/docs/src/compaction.md +++ b/docs/src/compaction.md @@ -1,8 +1,10 @@ -# Distributed Compaction +# Distributed Compaction and Cleanup As Lance datasets evolve over time (e.g., frequent appends / overwrites), they can accumulate many small fragments. Compaction rewrites fragments into fewer, larger fragments to improve scan and query performance. -Lance-Ray provides a distributed compaction workflow backed by Ray workers. +Lance-Ray provides distributed maintenance workflows backed by Ray workers. +Compaction can split one table into multiple compaction tasks; cleanup runs +old-version cleanup across namespace tables. ## `compact_files` @@ -66,6 +68,83 @@ This function lists tables under `database` via the namespace API and runs `comp **Returns:** A list of dictionaries, one per table, with keys `table_id` (the full table identifier) and `metrics` (the compaction result, or `None` if no compaction was needed). +## `cleanup_old_versions` + +```python +cleanup_old_versions( + uri=None, + *, + table_id=None, + older_than=None, + retain_versions=None, + delete_unverified=False, + error_if_tagged_old_versions=True, + delete_rate_limit=None, + storage_options=None, + namespace_impl=None, + namespace_properties=None, +) +``` + +Clean old dataset versions in a single Lance table. This delegates deletion +planning and safety checks to Lance core. + +**Parameters:** + +- `uri`: Dataset URI to clean (either `uri` OR `namespace_impl` + `table_id` required) +- `table_id`: Table identifier as a list of strings (requires `namespace_impl`) +- `older_than`: Optional `datetime.timedelta`; versions older than this may be removed +- `retain_versions`: Optional number of latest versions to retain +- `delete_unverified`: Delete unverified files without the default age guard. Only use this when no other process is writing to the dataset. +- `error_if_tagged_old_versions`: Raise if tagged versions match the cleanup policy (default: `True`) +- `delete_rate_limit`: Optional maximum delete operations per second +- `storage_options`: Optional storage configuration dictionary +- `namespace_impl`: Namespace implementation type (e.g., `"rest"`, `"dir"`) +- `namespace_properties`: Properties for connecting to the namespace + +**Returns:** `CleanupStats`. + +## `cleanup_database_old_versions` + +```python +cleanup_database_old_versions( + *, + database, + namespace_impl, + namespace_properties=None, + older_than=None, + retain_versions=None, + delete_unverified=False, + error_if_tagged_old_versions=True, + delete_rate_limit=None, + num_workers=4, + storage_options=None, + ray_remote_args=None, +) +``` + +Clean old versions for all tables under a database (namespace). This function +lists tables under `database` via the namespace API and runs one table cleanup +task per table using a Ray Pool. + +**Parameters:** + +- `database`: Database (namespace) identifier as a list of path segments, e.g. `['my_database']` +- `namespace_impl`: Namespace implementation type (e.g., `"rest"`, `"dir"`) +- `namespace_properties`: Properties for connecting to the namespace +- `older_than`: Optional `datetime.timedelta`; versions older than this may be removed +- `retain_versions`: Optional number of latest versions to retain +- `delete_unverified`: Delete unverified files without the default age guard. Only use this when no other process is writing to the datasets. +- `error_if_tagged_old_versions`: Raise if tagged versions match the cleanup policy (default: `True`) +- `delete_rate_limit`: Optional maximum delete operations per second per table +- `num_workers`: Number of Ray workers across tables (default: 4) +- `storage_options`: Optional storage configuration dictionary +- `ray_remote_args`: Optional kwargs for Ray remote tasks + +**Returns:** A list of dictionaries, one per table, with keys `table_id` and +`stats`. `stats` is a plain dictionary with the cleanup counters returned by +Lance. + ## Examples ### Compact a single table by URI @@ -112,3 +191,36 @@ results = lr.compact_database( for item in results: print(item["table_id"], item["metrics"]) ``` + +### Clean old versions for one table + +```python +from datetime import timedelta +import lance_ray as lr + +stats = lr.cleanup_old_versions( + uri="/path/to/table.lance", + older_than=timedelta(days=7), + retain_versions=3, +) +print(stats) +``` + +### Clean old versions across a database + +```python +from datetime import timedelta +import lance_ray as lr + +results = lr.cleanup_database_old_versions( + database=["my_db"], + namespace_impl="dir", + namespace_properties={"root": "/path/to/tables"}, + older_than=timedelta(days=7), + retain_versions=3, + num_workers=4, +) + +for item in results: + print(item["table_id"], item["stats"]) +``` diff --git a/lance_ray/__init__.py b/lance_ray/__init__.py index b448a0ae..38bcb674 100644 --- a/lance_ray/__init__.py +++ b/lance_ray/__init__.py @@ -8,6 +8,7 @@ __version__ = "0.4.2" __author__ = "LanceDB Devs" __email__ = "dev@lancedb.com" +from .cleanup import cleanup_database_old_versions, cleanup_old_versions from .compaction import compact_database, compact_files # Main imports @@ -42,6 +43,8 @@ "optimize_indices", "compact_files", "compact_database", + "cleanup_old_versions", + "cleanup_database_old_versions", "LanceFragmentWriter", "LanceFragmentCommitter", ] diff --git a/lance_ray/cleanup.py b/lance_ray/cleanup.py new file mode 100644 index 00000000..b9c283e2 --- /dev/null +++ b/lance_ray/cleanup.py @@ -0,0 +1,240 @@ +import logging +from datetime import timedelta +from typing import Any, Optional + +import lance +from lance.lance import CleanupStats +from ray.util.multiprocessing import Pool + +from .utils import ( + get_namespace_kwargs, + get_or_create_namespace, + validate_uri_or_namespace, +) + +logger = logging.getLogger(__name__) + +CLEANUP_STATS_FIELDS = ( + "bytes_removed", + "old_versions", + "data_files_removed", + "transaction_files_removed", + "index_files_removed", + "deletion_files_removed", +) + + +def _cleanup_stats_to_dict(stats: CleanupStats) -> dict[str, Any]: + return {field: getattr(stats, field) for field in CLEANUP_STATS_FIELDS} + + +def _resolve_dataset( + uri: Optional[str], + storage_options: Optional[dict[str, str]], + namespace_impl: Optional[str], + namespace_properties: Optional[dict[str, str]], + table_id: Optional[list[str]], +) -> tuple[str, dict[str, Any], dict[str, Any]]: + merged_storage_options: dict[str, Any] = {} + if storage_options: + merged_storage_options.update(storage_options) + + namespace = get_or_create_namespace(namespace_impl, namespace_properties) + if namespace is not None and table_id is not None: + from lance_namespace import DescribeTableRequest + + describe_response = namespace.describe_table(DescribeTableRequest(id=table_id)) + uri = describe_response.location + if describe_response.storage_options: + merged_storage_options.update(describe_response.storage_options) + + if uri is None: + raise ValueError( + "Namespace table resolution did not return a dataset location." + ) + + namespace_kwargs = get_namespace_kwargs( + namespace_impl, namespace_properties, table_id + ) + return uri, merged_storage_options, namespace_kwargs + + +def cleanup_old_versions( + uri: Optional[str] = None, + *, + table_id: Optional[list[str]] = None, + older_than: Optional[timedelta] = None, + retain_versions: Optional[int] = None, + delete_unverified: bool = False, + error_if_tagged_old_versions: bool = True, + delete_rate_limit: Optional[int] = None, + storage_options: Optional[dict[str, str]] = None, + namespace_impl: Optional[str] = None, + namespace_properties: Optional[dict[str, str]] = None, +) -> CleanupStats: + """Clean up old versions in one Lance dataset. + + This delegates file safety and deletion policy to Lance core. Use + ``cleanup_database_old_versions`` to run cleanup across many namespace tables + with Ray workers. + """ + validate_uri_or_namespace(uri, namespace_impl, table_id) + + dataset_uri, merged_storage_options, namespace_kwargs = _resolve_dataset( + uri, + storage_options, + namespace_impl, + namespace_properties, + table_id, + ) + + logger.info("Cleaning up old versions for dataset %s", dataset_uri) + dataset = lance.LanceDataset( + dataset_uri, + storage_options=merged_storage_options, + **namespace_kwargs, + ) + stats = dataset.cleanup_old_versions( + older_than=older_than, + retain_versions=retain_versions, + delete_unverified=delete_unverified, + error_if_tagged_old_versions=error_if_tagged_old_versions, + delete_rate_limit=delete_rate_limit, + ) + logger.info( + "Cleanup completed for dataset %s: bytes_removed=%s, old_versions=%s", + dataset_uri, + stats.bytes_removed, + stats.old_versions, + ) + return stats + + +def _handle_cleanup_table( + *, + older_than: Optional[timedelta], + retain_versions: Optional[int], + delete_unverified: bool, + error_if_tagged_old_versions: bool, + delete_rate_limit: Optional[int], + storage_options: Optional[dict[str, str]], + namespace_impl: str, + namespace_properties: Optional[dict[str, str]], +): + def func(table_id: list[str]) -> dict[str, Any]: + try: + logger.info("Cleaning up old versions for table %s", table_id) + stats = cleanup_old_versions( + uri=None, + table_id=table_id, + older_than=older_than, + retain_versions=retain_versions, + delete_unverified=delete_unverified, + error_if_tagged_old_versions=error_if_tagged_old_versions, + delete_rate_limit=delete_rate_limit, + storage_options=storage_options, + namespace_impl=namespace_impl, + namespace_properties=namespace_properties, + ) + logger.info("Cleanup completed for table %s: %s", table_id, stats) + return { + "status": "success", + "table_id": table_id, + "stats": _cleanup_stats_to_dict(stats), + } + except Exception as e: + logger.exception("Cleanup failed for table %s: %s", table_id, e) + return { + "status": "error", + "table_id": table_id, + "error": str(e), + } + + return func + + +def cleanup_database_old_versions( + *, + database: list[str], + namespace_impl: str, + namespace_properties: Optional[dict[str, str]] = None, + older_than: Optional[timedelta] = None, + retain_versions: Optional[int] = None, + delete_unverified: bool = False, + error_if_tagged_old_versions: bool = True, + delete_rate_limit: Optional[int] = None, + num_workers: int = 4, + storage_options: Optional[dict[str, str]] = None, + ray_remote_args: Optional[dict[str, Any]] = None, +) -> list[dict[str, Any]]: + """Clean up old versions for every table under a namespace database.""" + if not database: + raise ValueError("'database' must be a non-empty list of path segments.") + if not namespace_impl: + raise ValueError( + "'namespace_impl' is required when using cleanup_database_old_versions." + ) + if num_workers <= 0: + raise ValueError("'num_workers' must be positive.") + + from lance_namespace import ListTablesRequest + + namespace = get_or_create_namespace(namespace_impl, namespace_properties) + if namespace is None: + raise RuntimeError( + "Failed to create namespace from namespace_impl and namespace_properties." + ) + + all_tables: list[str] = [] + page_token: Optional[str] = None + limit = 500 + + while True: + request = ListTablesRequest( + id=database, + page_token=page_token, + limit=limit, + include_declared=False, + ) + response = namespace.list_tables(request) + all_tables.extend(response.tables) + page_token = getattr(response, "page_token", None) + if not page_token: + break + + if not all_tables: + logger.info("No tables found under database %s, nothing to clean up.", database) + return [] + + table_ids = [database + [table_name] for table_name in all_tables] + processes = min(num_workers, len(table_ids)) + pool = Pool(processes=processes, ray_remote_args=ray_remote_args) + task_handler = _handle_cleanup_table( + older_than=older_than, + retain_versions=retain_versions, + delete_unverified=delete_unverified, + error_if_tagged_old_versions=error_if_tagged_old_versions, + delete_rate_limit=delete_rate_limit, + storage_options=storage_options, + namespace_impl=namespace_impl, + namespace_properties=namespace_properties, + ) + + try: + results = pool.map_async(task_handler, table_ids, chunksize=1).get() + except Exception as e: + raise RuntimeError(f"Failed to complete distributed cleanup: {e}") from e + finally: + pool.close() + pool.join() + + failed_results = [result for result in results if result["status"] == "error"] + if failed_results: + error_messages = [ + f"{result['table_id']}: {result['error']}" for result in failed_results + ] + raise RuntimeError(f"Cleanup failed: {'; '.join(error_messages)}") + + return [ + {"table_id": result["table_id"], "stats": result["stats"]} for result in results + ] diff --git a/tests/test_cleanup.py b/tests/test_cleanup.py new file mode 100644 index 00000000..e1bd2eae --- /dev/null +++ b/tests/test_cleanup.py @@ -0,0 +1,470 @@ +"""Tests for old-version cleanup helpers.""" + +from datetime import timedelta +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import lance_ray as lr +import pytest + + +def make_cleanup_stats(**overrides): + values = { + "bytes_removed": 0, + "old_versions": 0, + "data_files_removed": 0, + "transaction_files_removed": 0, + "index_files_removed": 0, + "deletion_files_removed": 0, + } + values.update(overrides) + return SimpleNamespace(**values) + + +def cleanup_stats_dict(**overrides): + values = { + "bytes_removed": 0, + "old_versions": 0, + "data_files_removed": 0, + "transaction_files_removed": 0, + "index_files_removed": 0, + "deletion_files_removed": 0, + } + values.update(overrides) + return values + + +def test_cleanup_old_versions_passes_options_to_lance_dataset(monkeypatch): + captured = {} + stats = make_cleanup_stats( + bytes_removed=10, + old_versions=2, + data_files_removed=3, + transaction_files_removed=4, + index_files_removed=5, + deletion_files_removed=6, + ) + + class FakeDataset: + def __init__(self, uri, **kwargs): + captured["dataset"] = {"uri": uri, **kwargs} + + def cleanup_old_versions(self, **kwargs): + captured["cleanup"] = kwargs + return stats + + monkeypatch.setattr("lance_ray.cleanup.lance.LanceDataset", FakeDataset) + + result = lr.cleanup_old_versions( + uri="s3://bucket/table.lance", + older_than=timedelta(days=7), + retain_versions=3, + delete_unverified=True, + error_if_tagged_old_versions=False, + delete_rate_limit=100, + storage_options={"region": "us-west-2"}, + ) + + assert result is stats + assert captured["dataset"] == { + "uri": "s3://bucket/table.lance", + "storage_options": {"region": "us-west-2"}, + } + assert captured["cleanup"] == { + "older_than": timedelta(days=7), + "retain_versions": 3, + "delete_unverified": True, + "error_if_tagged_old_versions": False, + "delete_rate_limit": 100, + } + + +def test_cleanup_old_versions_resolves_namespace_storage_options(monkeypatch): + captured = {} + stats = make_cleanup_stats() + + class FakeDataset: + def __init__(self, uri, **kwargs): + captured["dataset"] = {"uri": uri, **kwargs} + + def cleanup_old_versions(self, **kwargs): + return stats + + namespace = MagicMock() + namespace.describe_table.return_value = SimpleNamespace( + location="s3://bucket/ns/table.lance", + storage_options={"secret_access_key": "namespace-secret"}, + ) + + monkeypatch.setattr("lance_ray.cleanup.lance.LanceDataset", FakeDataset) + monkeypatch.setattr( + "lance_ray.cleanup.get_or_create_namespace", + lambda namespace_impl, namespace_properties: namespace, + ) + monkeypatch.setattr( + "lance_ray.cleanup.get_namespace_kwargs", + lambda namespace_impl, namespace_properties, table_id: { + "namespace_client": "fake-namespace", + "table_id": table_id, + }, + ) + + result = lr.cleanup_old_versions( + table_id=["db", "table"], + storage_options={ + "access_key_id": "user-access", + "secret_access_key": "user-secret", + }, + namespace_impl="dir", + namespace_properties={"root": "/tmp/tables"}, + ) + + assert result is stats + assert captured["dataset"] == { + "uri": "s3://bucket/ns/table.lance", + "storage_options": { + "access_key_id": "user-access", + "secret_access_key": "namespace-secret", + }, + "namespace_client": "fake-namespace", + "table_id": ["db", "table"], + } + request = namespace.describe_table.call_args.args[0] + assert request.id == ["db", "table"] + + +def test_cleanup_old_versions_missing_namespace_location_raises(monkeypatch): + namespace = MagicMock() + namespace.describe_table.return_value = SimpleNamespace( + location=None, + storage_options=None, + ) + + monkeypatch.setattr( + "lance_ray.cleanup.get_or_create_namespace", + lambda namespace_impl, namespace_properties: namespace, + ) + + with pytest.raises(ValueError, match="dataset location"): + lr.cleanup_old_versions( + table_id=["db", "table"], + namespace_impl="dir", + namespace_properties={"root": "/tmp/tables"}, + ) + + +def test_cleanup_database_old_versions_empty_database_raises(): + with pytest.raises(ValueError, match="database.*non-empty"): + lr.cleanup_database_old_versions(database=[], namespace_impl="dir") + + +def test_cleanup_database_old_versions_missing_namespace_impl_raises(): + with pytest.raises(ValueError, match="namespace_impl.*required"): + lr.cleanup_database_old_versions(database=["db"], namespace_impl="") + + +def test_cleanup_database_old_versions_empty_tables_returns_empty_list(): + namespace = MagicMock() + namespace.list_tables.return_value = SimpleNamespace(tables=[], page_token=None) + + with patch( + "lance_ray.cleanup.get_or_create_namespace", + return_value=namespace, + ): + assert ( + lr.cleanup_database_old_versions( + database=["db"], + namespace_impl="dir", + namespace_properties={"root": "/tmp/tables"}, + ) + == [] + ) + + +def test_cleanup_database_old_versions_runs_tables_in_pool(monkeypatch): + captured = {"cleanup_calls": []} + stats_by_table = { + ("db", "table_a"): make_cleanup_stats(bytes_removed=1, old_versions=1), + ("db", "table_b"): make_cleanup_stats(bytes_removed=2, old_versions=2), + } + + namespace = MagicMock() + namespace.list_tables.return_value = SimpleNamespace( + tables=["table_a", "table_b"], + page_token=None, + ) + + class FakeAsyncResult: + def __init__(self, results): + self._results = results + + def get(self): + return self._results + + class FakePool: + def __init__(self, processes, ray_remote_args=None): + captured["pool"] = { + "processes": processes, + "ray_remote_args": ray_remote_args, + } + + def map_async(self, func, items, chunksize=1): + captured["map"] = {"items": items, "chunksize": chunksize} + return FakeAsyncResult([func(item) for item in items]) + + def close(self): + captured["closed"] = True + + def join(self): + captured["joined"] = True + + def fake_cleanup_old_versions(**kwargs): + captured["cleanup_calls"].append(kwargs) + return stats_by_table[tuple(kwargs["table_id"])] + + monkeypatch.setattr( + "lance_ray.cleanup.get_or_create_namespace", + lambda namespace_impl, namespace_properties: namespace, + ) + monkeypatch.setattr("lance_ray.cleanup.Pool", FakePool) + monkeypatch.setattr( + "lance_ray.cleanup.cleanup_old_versions", + fake_cleanup_old_versions, + ) + + results = lr.cleanup_database_old_versions( + database=["db"], + namespace_impl="dir", + namespace_properties={"root": "/tmp/tables"}, + older_than=timedelta(days=3), + retain_versions=5, + delete_unverified=True, + error_if_tagged_old_versions=False, + delete_rate_limit=50, + num_workers=8, + storage_options={"region": "us-west-2"}, + ray_remote_args={"num_cpus": 1}, + ) + + assert captured["pool"] == { + "processes": 2, + "ray_remote_args": {"num_cpus": 1}, + } + assert captured["map"] == { + "items": [["db", "table_a"], ["db", "table_b"]], + "chunksize": 1, + } + assert captured["closed"] is True + assert captured["joined"] is True + assert [tuple(call["table_id"]) for call in captured["cleanup_calls"]] == [ + ("db", "table_a"), + ("db", "table_b"), + ] + assert all( + call["older_than"] == timedelta(days=3) for call in captured["cleanup_calls"] + ) + assert all(call["retain_versions"] == 5 for call in captured["cleanup_calls"]) + assert all(call["delete_unverified"] is True for call in captured["cleanup_calls"]) + assert all( + call["error_if_tagged_old_versions"] is False + for call in captured["cleanup_calls"] + ) + assert all(call["delete_rate_limit"] == 50 for call in captured["cleanup_calls"]) + assert all( + call["storage_options"] == {"region": "us-west-2"} + for call in captured["cleanup_calls"] + ) + assert all(call["namespace_impl"] == "dir" for call in captured["cleanup_calls"]) + assert all( + call["namespace_properties"] == {"root": "/tmp/tables"} + for call in captured["cleanup_calls"] + ) + assert results == [ + { + "table_id": ["db", "table_a"], + "stats": cleanup_stats_dict(bytes_removed=1, old_versions=1), + }, + { + "table_id": ["db", "table_b"], + "stats": cleanup_stats_dict(bytes_removed=2, old_versions=2), + }, + ] + + +def test_cleanup_database_old_versions_paginates_tables(monkeypatch): + captured = {"requests": []} + stats = make_cleanup_stats(bytes_removed=1, old_versions=1) + namespace = MagicMock() + + def list_tables(request): + captured["requests"].append(request) + if request.page_token is None: + return SimpleNamespace(tables=["table_a"], page_token="next") + return SimpleNamespace(tables=["table_b"], page_token=None) + + namespace.list_tables.side_effect = list_tables + + class FakeAsyncResult: + def __init__(self, results): + self._results = results + + def get(self): + return self._results + + class FakePool: + def __init__(self, processes, ray_remote_args=None): + pass + + def map_async(self, func, items, chunksize=1): + return FakeAsyncResult([func(item) for item in items]) + + def close(self): + pass + + def join(self): + pass + + monkeypatch.setattr( + "lance_ray.cleanup.get_or_create_namespace", + lambda namespace_impl, namespace_properties: namespace, + ) + monkeypatch.setattr("lance_ray.cleanup.Pool", FakePool) + monkeypatch.setattr( + "lance_ray.cleanup.cleanup_old_versions", + lambda **kwargs: stats, + ) + + results = lr.cleanup_database_old_versions( + database=["db"], + namespace_impl="dir", + ) + + assert [request.page_token for request in captured["requests"]] == [None, "next"] + assert [request.id for request in captured["requests"]] == [["db"], ["db"]] + assert [request.include_declared for request in captured["requests"]] == [ + False, + False, + ] + assert results == [ + { + "table_id": ["db", "table_a"], + "stats": cleanup_stats_dict(bytes_removed=1, old_versions=1), + }, + { + "table_id": ["db", "table_b"], + "stats": cleanup_stats_dict(bytes_removed=1, old_versions=1), + }, + ] + + +def test_cleanup_database_old_versions_raises_on_table_failure(monkeypatch): + namespace = MagicMock() + namespace.list_tables.return_value = SimpleNamespace( + tables=["table_a", "table_b", "table_c"], + page_token=None, + ) + + class FakeAsyncResult: + def __init__(self, results): + self._results = results + + def get(self): + return self._results + + class FakePool: + def __init__(self, processes, ray_remote_args=None): + pass + + def map_async(self, func, items, chunksize=1): + return FakeAsyncResult([func(item) for item in items]) + + def close(self): + pass + + def join(self): + pass + + monkeypatch.setattr( + "lance_ray.cleanup.get_or_create_namespace", + lambda namespace_impl, namespace_properties: namespace, + ) + monkeypatch.setattr("lance_ray.cleanup.Pool", FakePool) + + def fake_cleanup_old_versions(**kwargs): + if kwargs["table_id"] == ["db", "table_b"]: + return make_cleanup_stats() + raise RuntimeError(f"boom {kwargs['table_id'][-1]}") + + monkeypatch.setattr( + "lance_ray.cleanup.cleanup_old_versions", + fake_cleanup_old_versions, + ) + + with pytest.raises( + RuntimeError, + match=r"Cleanup failed: .*table_a.*boom table_a.*table_c.*boom table_c", + ): + lr.cleanup_database_old_versions( + database=["db"], + namespace_impl="dir", + ) + + +def test_cleanup_database_old_versions_invalid_num_workers_raises(): + with pytest.raises(ValueError, match="num_workers.*positive"): + lr.cleanup_database_old_versions( + database=["db"], + namespace_impl="dir", + num_workers=0, + ) + + +def test_cleanup_database_old_versions_namespace_creation_failure_raises(monkeypatch): + monkeypatch.setattr( + "lance_ray.cleanup.get_or_create_namespace", + lambda namespace_impl, namespace_properties: None, + ) + + with pytest.raises(RuntimeError, match="Failed to create namespace"): + lr.cleanup_database_old_versions( + database=["db"], + namespace_impl="dir", + ) + + +def test_cleanup_database_old_versions_pool_get_failure_raises(monkeypatch): + namespace = MagicMock() + namespace.list_tables.return_value = SimpleNamespace( + tables=["table_a"], + page_token=None, + ) + + class FakeAsyncResult: + def get(self): + raise RuntimeError("ray unavailable") + + class FakePool: + def __init__(self, processes, ray_remote_args=None): + self.closed = False + self.joined = False + + def map_async(self, func, items, chunksize=1): + return FakeAsyncResult() + + def close(self): + self.closed = True + + def join(self): + self.joined = True + + monkeypatch.setattr( + "lance_ray.cleanup.get_or_create_namespace", + lambda namespace_impl, namespace_properties: namespace, + ) + monkeypatch.setattr("lance_ray.cleanup.Pool", FakePool) + + with pytest.raises(RuntimeError, match="Failed to complete distributed cleanup"): + lr.cleanup_database_old_versions( + database=["db"], + namespace_impl="dir", + ) From c9d4a8589c07386065a969467f927a3080cf4165 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 4 Jun 2026 16:18:37 +0800 Subject: [PATCH 2/3] fix: harden distributed cleanup with validation, docs, and tests Follow-up review hardening on top of the distributed cleanup feature. - Guard retain_versions <= 0 in both cleanup_old_versions and cleanup_database_old_versions. retain_versions=0 otherwise triggers a Rust PanicException in Lance core (a BaseException) that escapes the worker's `except Exception`, aborting the whole batch with a generic error and breaking the documented per-table aggregation contract. - Expand docstrings to the compaction.py bar: full Args/Returns/Raises, the deliberate parallel-aggregate vs serial-fail-fast contrast, num_workers semantics, the non-atomic/destructive Warning, the 7-day delete_unverified guard, and the conditional two-week older_than default. - Extract the page-size literal into the named _LIST_TABLES_PAGE_SIZE, annotate _handle_cleanup_table's return type, and tighten _cleanup_stats_to_dict to dict[str, int]. - Document the parallel/aggregate behavior, non-atomic warning, and stat field names in docs/src/compaction.md. - Strengthen tests: uri-vs-namespace validation, retain_versions rejection, safe-default pinning (incl. num_workers=4), 3-page pagination, exact partial-failure message, failure-path pool close/join and __cause__ chaining, and a CleanupStats field drift guard. --- docs/src/compaction.md | 19 +++- lance_ray/cleanup.py | 138 ++++++++++++++++++++++-- tests/test_cleanup.py | 240 +++++++++++++++++++++++++++++++++-------- 3 files changed, 343 insertions(+), 54 deletions(-) diff --git a/docs/src/compaction.md b/docs/src/compaction.md index 64b51aad..3b0519a2 100644 --- a/docs/src/compaction.md +++ b/docs/src/compaction.md @@ -95,7 +95,7 @@ planning and safety checks to Lance core. - `table_id`: Table identifier as a list of strings (requires `namespace_impl`) - `older_than`: Optional `datetime.timedelta`; versions older than this may be removed - `retain_versions`: Optional number of latest versions to retain -- `delete_unverified`: Delete unverified files without the default age guard. Only use this when no other process is writing to the dataset. +- `delete_unverified`: Delete unverified files without Lance's default (7-day) age guard. Only use this when no other process is writing to the dataset. - `error_if_tagged_old_versions`: Raise if tagged versions match the cleanup policy (default: `True`) - `delete_rate_limit`: Optional maximum delete operations per second - `storage_options`: Optional storage configuration dictionary @@ -127,6 +127,18 @@ Clean old versions for all tables under a database (namespace). This function lists tables under `database` via the namespace API and runs one table cleanup task per table using a Ray Pool. +Unlike `compact_database` (which processes tables serially and fails fast on the +first error), cleanup runs tables **in parallel** and **aggregates** per-table +errors: every table is attempted, and a single error summarizing all failures is +raised only after the pool finishes. `num_workers` therefore bounds concurrency +*across tables*, not within a single table. + +!!! warning + + This operation is destructive and **not atomic**. Tables are cleaned eagerly + by workers, so when it raises for a failed table, other tables may already + have had old versions deleted. + **Parameters:** - `database`: Database (namespace) identifier as a list of path segments, e.g. `['my_database']` @@ -134,7 +146,7 @@ task per table using a Ray Pool. - `namespace_properties`: Properties for connecting to the namespace - `older_than`: Optional `datetime.timedelta`; versions older than this may be removed - `retain_versions`: Optional number of latest versions to retain -- `delete_unverified`: Delete unverified files without the default age guard. Only use this when no other process is writing to the datasets. +- `delete_unverified`: Delete unverified files without Lance's default (7-day) age guard. Only use this when no other process is writing to the datasets. - `error_if_tagged_old_versions`: Raise if tagged versions match the cleanup policy (default: `True`) - `delete_rate_limit`: Optional maximum delete operations per second per table - `num_workers`: Number of Ray workers across tables (default: 4) @@ -143,7 +155,8 @@ task per table using a Ray Pool. **Returns:** A list of dictionaries, one per table, with keys `table_id` and `stats`. `stats` is a plain dictionary with the cleanup counters returned by -Lance. +Lance: `bytes_removed`, `old_versions`, `data_files_removed`, +`transaction_files_removed`, `index_files_removed`, and `deletion_files_removed`. ## Examples diff --git a/lance_ray/cleanup.py b/lance_ray/cleanup.py index b9c283e2..0e3975b0 100644 --- a/lance_ray/cleanup.py +++ b/lance_ray/cleanup.py @@ -1,4 +1,5 @@ import logging +from collections.abc import Callable from datetime import timedelta from typing import Any, Optional @@ -23,8 +24,12 @@ "deletion_files_removed", ) +# Page size used when listing tables under a database via the namespace API. +_LIST_TABLES_PAGE_SIZE = 500 -def _cleanup_stats_to_dict(stats: CleanupStats) -> dict[str, Any]: + +def _cleanup_stats_to_dict(stats: CleanupStats) -> dict[str, int]: + """Convert ``CleanupStats`` to a plain, Ray-serializable dict of counters.""" return {field: getattr(stats, field) for field in CLEANUP_STATS_FIELDS} @@ -74,11 +79,48 @@ def cleanup_old_versions( ) -> CleanupStats: """Clean up old versions in one Lance dataset. - This delegates file safety and deletion policy to Lance core. Use - ``cleanup_database_old_versions`` to run cleanup across many namespace tables - with Ray workers. + This delegates file safety and deletion policy to Lance core. Use + :func:`cleanup_database_old_versions` to run cleanup across many namespace + tables with Ray workers. + + Args: + uri: The URI of the Lance dataset to clean. Either ``uri`` OR + (``namespace_impl`` + ``table_id``) must be provided. + table_id: The table identifier as a list of strings. Must be provided + together with ``namespace_impl``. + older_than: Optional ``datetime.timedelta``; only versions older than + this are removed. Delegated to Lance core, which defaults to two + weeks only when both ``older_than`` and ``retain_versions`` are unset. + retain_versions: Optional number of latest versions to retain. + delete_unverified: Delete unverified files without Lance's default age + guard (which otherwise retains files leftover from a failed + transaction until they are at least 7 days old). Only set this when + no other process is writing to the dataset; otherwise in-flight files + of a concurrent writer may be deleted, corrupting the dataset. + error_if_tagged_old_versions: Raise if tagged versions match the cleanup + policy (default: ``True``). + delete_rate_limit: Optional maximum delete operations per second. + storage_options: Storage options for the dataset. Merged with any + options vended by the namespace (namespace values take precedence). + Pass credentials here rather than inline in ``uri``: ``uri`` is + logged and may appear in error messages. + namespace_impl: The namespace implementation type (e.g. ``"rest"``, + ``"dir"``). Used together with ``table_id`` to resolve the dataset + location and vend credentials. + namespace_properties: Properties for connecting to the namespace. + + Returns: + :class:`~lance.lance.CleanupStats` with statistics from the cleanup. + + Raises: + ValueError: If neither ``uri`` nor (``namespace_impl`` + ``table_id``) + is provided, if both are provided, if ``retain_versions`` is not + positive, or if namespace resolution does not return a dataset + location. """ validate_uri_or_namespace(uri, namespace_impl, table_id) + if retain_versions is not None and retain_versions <= 0: + raise ValueError("'retain_versions' must be positive when provided.") dataset_uri, merged_storage_options, namespace_kwargs = _resolve_dataset( uri, @@ -120,7 +162,21 @@ def _handle_cleanup_table( storage_options: Optional[dict[str, str]], namespace_impl: str, namespace_properties: Optional[dict[str, str]], -): +) -> Callable[[list[str]], dict[str, Any]]: + """Build the per-table cleanup task executed by Ray workers. + + Returns a callable that runs :func:`cleanup_old_versions` for a single + ``table_id`` and never raises: every outcome is captured as a Ray- + serializable dict so failures can be aggregated across tables. The dict is + either:: + + {"status": "success", "table_id": list[str], "stats": dict[str, int]} + + or:: + + {"status": "error", "table_id": list[str], "error": str} + """ + def func(table_id: list[str]) -> dict[str, Any]: try: logger.info("Cleaning up old versions for table %s", table_id) @@ -167,7 +223,72 @@ def cleanup_database_old_versions( storage_options: Optional[dict[str, str]] = None, ray_remote_args: Optional[dict[str, Any]] = None, ) -> list[dict[str, Any]]: - """Clean up old versions for every table under a namespace database.""" + """Clean up old versions for every table under a namespace database. + + Lists all tables under ``database`` via the namespace API, then runs + :func:`cleanup_old_versions` on each table. Unlike :func:`compact_database` + (which compacts tables serially and fails fast on the first error), cleanup + runs tables **in parallel** across a Ray ``Pool`` and **aggregates** per-table + errors: every table is attempted, and a single ``RuntimeError`` summarizing + all failures is raised only after the pool finishes. Accordingly, + ``num_workers`` here bounds concurrency *across tables*, not within a single + table. + + Args: + database: The database (namespace) identifier as a list of path segments, + e.g. ``["my_database"]``. All tables under this namespace are cleaned. + namespace_impl: The namespace implementation type (e.g. ``"rest"``, + ``"dir"``). Required for resolving table locations and credentials. + namespace_properties: Properties for connecting to the namespace. + older_than: Optional ``datetime.timedelta``; only versions older than + this are removed (applied to every table). Lance defaults to two + weeks only when both ``older_than`` and ``retain_versions`` are unset. + retain_versions: Optional number of latest versions to retain per table. + delete_unverified: Delete unverified files without Lance's default + (7-day) age guard. Only set this when no other process is writing to + the datasets; otherwise a concurrent writer's files may be deleted. + error_if_tagged_old_versions: Raise if tagged versions match the cleanup + policy (default: ``True``). + delete_rate_limit: Optional maximum delete operations per second *per + table* (each table runs in its own worker, so the aggregate rate may + be up to ``num_workers`` times this value). + num_workers: Maximum number of Ray workers across tables (default: 4). + Capped at the number of tables found. + storage_options: Storage options for the datasets. + ray_remote_args: Options for Ray tasks (e.g. ``num_cpus``, ``resources``). + + Returns: + A list of dicts, one per table, with keys: + - ``"table_id"``: ``list[str]`` – full table identifier + (``database`` + table name). + - ``"stats"``: ``dict[str, int]`` – the cleanup counters returned by + Lance (see :data:`CLEANUP_STATS_FIELDS`). + + Raises: + ValueError: If ``database`` is empty, ``namespace_impl`` is missing, + ``num_workers`` is not positive, or ``retain_versions`` is not + positive. + RuntimeError: If the namespace cannot be created, if the distributed + cleanup cannot complete, or if any table's cleanup fails (the message + aggregates every failed table). + + Warning: + This operation is destructive and **not atomic**. Tables are cleaned + eagerly by workers, so when a ``RuntimeError`` is raised for a failed + table, other tables may already have had old versions deleted. + + Example: + >>> results = cleanup_database_old_versions( + ... database=["my_db"], + ... namespace_impl="dir", + ... namespace_properties={"root": "/path/to/tables"}, + ... older_than=timedelta(days=7), + ... retain_versions=3, + ... num_workers=4, + ... ) + >>> for item in results: + ... print(item["table_id"], item["stats"]) + """ if not database: raise ValueError("'database' must be a non-empty list of path segments.") if not namespace_impl: @@ -176,6 +297,8 @@ def cleanup_database_old_versions( ) if num_workers <= 0: raise ValueError("'num_workers' must be positive.") + if retain_versions is not None and retain_versions <= 0: + raise ValueError("'retain_versions' must be positive when provided.") from lance_namespace import ListTablesRequest @@ -187,13 +310,12 @@ def cleanup_database_old_versions( all_tables: list[str] = [] page_token: Optional[str] = None - limit = 500 while True: request = ListTablesRequest( id=database, page_token=page_token, - limit=limit, + limit=_LIST_TABLES_PAGE_SIZE, include_declared=False, ) response = namespace.list_tables(request) diff --git a/tests/test_cleanup.py b/tests/test_cleanup.py index e1bd2eae..18f05d4a 100644 --- a/tests/test_cleanup.py +++ b/tests/test_cleanup.py @@ -6,9 +6,11 @@ import lance_ray as lr import pytest +from lance_ray.cleanup import _LIST_TABLES_PAGE_SIZE, CLEANUP_STATS_FIELDS def make_cleanup_stats(**overrides): + """Build a stand-in for ``lance.lance.CleanupStats`` (attribute access).""" values = { "bytes_removed": 0, "old_versions": 0, @@ -22,16 +24,35 @@ def make_cleanup_stats(**overrides): def cleanup_stats_dict(**overrides): - values = { - "bytes_removed": 0, - "old_versions": 0, - "data_files_removed": 0, - "transaction_files_removed": 0, - "index_files_removed": 0, - "deletion_files_removed": 0, - } - values.update(overrides) - return values + """The dict form of ``make_cleanup_stats`` — kept in lock-step with it.""" + return dict(vars(make_cleanup_stats(**overrides))) + + +class FakeAsyncResult: + """Stand-in for the ``AsyncResult`` returned by ``Pool.map_async``.""" + + def __init__(self, results): + self._results = results + + def get(self): + return self._results + + +def test_cleanup_stats_fields_match_lance_cleanup_stats(): + """Guard that every field we read off ``CleanupStats`` actually exists. + + ``_cleanup_stats_to_dict`` does ``getattr(stats, field)`` for each name in + ``CLEANUP_STATS_FIELDS``; if Lance renamed or removed one of those + attributes this would raise ``AttributeError`` at runtime. This asserts the + field list is a subset of the real attributes so such a drift fails loudly + here instead. (It intentionally does not assert equality: Lance adding a new + counter we don't expose is fine and shouldn't break this test.) + """ + from lance.lance import CleanupStats + + real_fields = {attr for attr in dir(CleanupStats) if not attr.startswith("_")} + missing = set(CLEANUP_STATS_FIELDS) - real_fields + assert not missing, f"CLEANUP_STATS_FIELDS not present on CleanupStats: {missing}" def test_cleanup_old_versions_passes_options_to_lance_dataset(monkeypatch): @@ -153,6 +174,59 @@ def test_cleanup_old_versions_missing_namespace_location_raises(monkeypatch): ) +def test_cleanup_old_versions_requires_uri_or_namespace(): + with pytest.raises(ValueError, match="Must provide either"): + lr.cleanup_old_versions() + + +def test_cleanup_old_versions_rejects_uri_and_namespace_together(): + with pytest.raises(ValueError, match="Cannot provide both"): + lr.cleanup_old_versions( + uri="s3://bucket/table.lance", + table_id=["db", "table"], + namespace_impl="dir", + ) + + +@pytest.mark.parametrize("retain_versions", [0, -1]) +def test_cleanup_old_versions_rejects_nonpositive_retain_versions(retain_versions): + # retain_versions=0 triggers a Rust PanicException in Lance core (a + # BaseException that escapes our worker's `except Exception`); guard early. + with pytest.raises(ValueError, match="retain_versions.*positive"): + lr.cleanup_old_versions( + uri="s3://bucket/table.lance", + retain_versions=retain_versions, + ) + + +def test_cleanup_old_versions_uses_safe_defaults(monkeypatch): + # Pin the safety/policy defaults: a regression flipping delete_unverified to + # True or error_if_tagged_old_versions to False must fail here. + captured = {} + stats = make_cleanup_stats() + + class FakeDataset: + def __init__(self, uri, **kwargs): + captured["dataset"] = {"uri": uri, **kwargs} + + def cleanup_old_versions(self, **kwargs): + captured["cleanup"] = kwargs + return stats + + monkeypatch.setattr("lance_ray.cleanup.lance.LanceDataset", FakeDataset) + + result = lr.cleanup_old_versions(uri="s3://bucket/table.lance") + + assert result is stats + assert captured["cleanup"] == { + "older_than": None, + "retain_versions": None, + "delete_unverified": False, + "error_if_tagged_old_versions": True, + "delete_rate_limit": None, + } + + def test_cleanup_database_old_versions_empty_database_raises(): with pytest.raises(ValueError, match="database.*non-empty"): lr.cleanup_database_old_versions(database=[], namespace_impl="dir") @@ -194,13 +268,6 @@ def test_cleanup_database_old_versions_runs_tables_in_pool(monkeypatch): page_token=None, ) - class FakeAsyncResult: - def __init__(self, results): - self._results = results - - def get(self): - return self._results - class FakePool: def __init__(self, processes, ray_remote_args=None): captured["pool"] = { @@ -296,21 +363,21 @@ def test_cleanup_database_old_versions_paginates_tables(monkeypatch): stats = make_cleanup_stats(bytes_removed=1, old_versions=1) namespace = MagicMock() + # Three pages, so an intermediate page_token must be threaded through more + # than one iteration before the loop terminates. + pages = { + None: (["table_a"], "p2"), + "p2": (["table_b"], "p3"), + "p3": (["table_c"], None), + } + def list_tables(request): captured["requests"].append(request) - if request.page_token is None: - return SimpleNamespace(tables=["table_a"], page_token="next") - return SimpleNamespace(tables=["table_b"], page_token=None) + tables, next_token = pages[request.page_token] + return SimpleNamespace(tables=tables, page_token=next_token) namespace.list_tables.side_effect = list_tables - class FakeAsyncResult: - def __init__(self, results): - self._results = results - - def get(self): - return self._results - class FakePool: def __init__(self, processes, ray_remote_args=None): pass @@ -339,12 +406,21 @@ def join(self): namespace_impl="dir", ) - assert [request.page_token for request in captured["requests"]] == [None, "next"] - assert [request.id for request in captured["requests"]] == [["db"], ["db"]] + assert [request.page_token for request in captured["requests"]] == [ + None, + "p2", + "p3", + ] + assert [request.id for request in captured["requests"]] == [["db"], ["db"], ["db"]] assert [request.include_declared for request in captured["requests"]] == [ False, False, + False, ] + # Every page request uses the configured page size. + assert [request.limit for request in captured["requests"]] == [ + _LIST_TABLES_PAGE_SIZE + ] * 3 assert results == [ { "table_id": ["db", "table_a"], @@ -354,6 +430,10 @@ def join(self): "table_id": ["db", "table_b"], "stats": cleanup_stats_dict(bytes_removed=1, old_versions=1), }, + { + "table_id": ["db", "table_c"], + "stats": cleanup_stats_dict(bytes_removed=1, old_versions=1), + }, ] @@ -364,13 +444,6 @@ def test_cleanup_database_old_versions_raises_on_table_failure(monkeypatch): page_token=None, ) - class FakeAsyncResult: - def __init__(self, results): - self._results = results - - def get(self): - return self._results - class FakePool: def __init__(self, processes, ray_remote_args=None): pass @@ -400,14 +473,17 @@ def fake_cleanup_old_versions(**kwargs): fake_cleanup_old_versions, ) - with pytest.raises( - RuntimeError, - match=r"Cleanup failed: .*table_a.*boom table_a.*table_c.*boom table_c", - ): + # The aggregated message lists only the failed tables, by full table_id, + # in input order, joined by "; ". The successful table_b is omitted. + with pytest.raises(RuntimeError) as excinfo: lr.cleanup_database_old_versions( database=["db"], namespace_impl="dir", ) + assert str(excinfo.value) == ( + "Cleanup failed: ['db', 'table_a']: boom table_a; " + "['db', 'table_c']: boom table_c" + ) def test_cleanup_database_old_versions_invalid_num_workers_raises(): @@ -419,6 +495,70 @@ def test_cleanup_database_old_versions_invalid_num_workers_raises(): ) +@pytest.mark.parametrize("retain_versions", [0, -1]) +def test_cleanup_database_old_versions_rejects_nonpositive_retain_versions( + retain_versions, +): + with pytest.raises(ValueError, match="retain_versions.*positive"): + lr.cleanup_database_old_versions( + database=["db"], + namespace_impl="dir", + retain_versions=retain_versions, + ) + + +def test_cleanup_database_old_versions_uses_safe_defaults(monkeypatch): + # Pin the per-table safety/policy defaults AND the num_workers=4 default + # (observable only with >4 tables, since processes = min(num_workers, n)). + captured = {"calls": []} + namespace = MagicMock() + namespace.list_tables.return_value = SimpleNamespace( + tables=["t1", "t2", "t3", "t4", "t5"], + page_token=None, + ) + + class FakePool: + def __init__(self, processes, ray_remote_args=None): + captured["processes"] = processes + captured["ray_remote_args"] = ray_remote_args + + def map_async(self, func, items, chunksize=1): + return FakeAsyncResult([func(item) for item in items]) + + def close(self): + pass + + def join(self): + pass + + def fake_cleanup_old_versions(**kwargs): + captured["calls"].append(kwargs) + return make_cleanup_stats() + + monkeypatch.setattr( + "lance_ray.cleanup.get_or_create_namespace", + lambda namespace_impl, namespace_properties: namespace, + ) + monkeypatch.setattr("lance_ray.cleanup.Pool", FakePool) + monkeypatch.setattr( + "lance_ray.cleanup.cleanup_old_versions", + fake_cleanup_old_versions, + ) + + lr.cleanup_database_old_versions(database=["db"], namespace_impl="dir") + + assert captured["processes"] == 4 + assert captured["ray_remote_args"] is None + assert len(captured["calls"]) == 5 + for call in captured["calls"]: + assert call["older_than"] is None + assert call["retain_versions"] is None + assert call["delete_unverified"] is False + assert call["error_if_tagged_old_versions"] is True + assert call["delete_rate_limit"] is None + assert call["storage_options"] is None + + def test_cleanup_database_old_versions_namespace_creation_failure_raises(monkeypatch): monkeypatch.setattr( "lance_ray.cleanup.get_or_create_namespace", @@ -439,7 +579,9 @@ def test_cleanup_database_old_versions_pool_get_failure_raises(monkeypatch): page_token=None, ) - class FakeAsyncResult: + captured = {} + + class FailingAsyncResult: def get(self): raise RuntimeError("ray unavailable") @@ -447,9 +589,10 @@ class FakePool: def __init__(self, processes, ray_remote_args=None): self.closed = False self.joined = False + captured["pool"] = self def map_async(self, func, items, chunksize=1): - return FakeAsyncResult() + return FailingAsyncResult() def close(self): self.closed = True @@ -463,8 +606,19 @@ def join(self): ) monkeypatch.setattr("lance_ray.cleanup.Pool", FakePool) - with pytest.raises(RuntimeError, match="Failed to complete distributed cleanup"): + with pytest.raises( + RuntimeError, match="Failed to complete distributed cleanup" + ) as excinfo: lr.cleanup_database_old_versions( database=["db"], namespace_impl="dir", ) + + # The original cause is preserved via `raise ... from e`. + assert isinstance(excinfo.value.__cause__, RuntimeError) + assert str(excinfo.value.__cause__) == "ray unavailable" + + # The pool must be released even when map_async().get() raises (the finally + # block), otherwise the Ray pool would leak on failure. + assert captured["pool"].closed is True + assert captured["pool"].joined is True From 242ae02adc688641872dd3323ff5070cd025033a Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 4 Jun 2026 16:18:47 +0800 Subject: [PATCH 3/3] chore: ignore .codegraph index directory --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index e8fece73..a93ad0b3 100644 --- a/.gitignore +++ b/.gitignore @@ -205,3 +205,6 @@ cython_debug/ marimo/_static/ marimo/_lsp/ __marimo__/ + +# CodeGraph index (local dev tooling) +.codegraph/