Merged
2 changes: 1 addition & 1 deletion materializationengine/celery_worker.py
@@ -71,7 +71,7 @@ def create_celery(app=None):
"result_expires": 86400, # results expire in broker after 1 day
"redis_socket_connect_timeout": 10,
"broker_transport_options": {
"visibility_timeout": 8000,
"visibility_timeout": 21600,
"socket_timeout": 20,
"socket_connect_timeout": 20,
}, # timeout (s) for tasks to be sent back to broker queue
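Context for reviewers: this worker acks late, so the Redis `visibility_timeout` has to outlast the longest task or the broker will redeliver a task that is still running. The sketch below only illustrates how the new numbers nest; the constant names are ours, not this repo's.

```python
# Illustrative only: the ordering invariant the new settings establish.
SOFT_TIME_LIMIT = 21_000     # Celery raises SoftTimeLimitExceeded in the task
HARD_TIME_LIMIT = 21_300     # the worker force-kills the task 300 s later
VISIBILITY_TIMEOUT = 21_600  # Redis re-queues unacked tasks after 6 h

# A late-acking task must finish or be killed before the broker
# considers it lost; otherwise it would run twice.
assert SOFT_TIME_LIMIT < HARD_TIME_LIMIT < VISIBILITY_TIMEOUT
```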
3 changes: 3 additions & 0 deletions materializationengine/config.py
@@ -78,6 +78,9 @@ class BaseConfig:
if "DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES" in os.environ
else None
)
# this one should help with memory during optimize if it is still a problem,
# but has not been tested on the mesh worker nodes. im not sure how spilling to
# disk on those will work out of the box
DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES = (
int(os.environ["DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES"])
if "DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES" in os.environ
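For reference, the deltalake Python package exposes a `max_spill_size` parameter on its z-order optimizer, which is presumably where this value ends up. A minimal sketch, assuming a helper along these lines (the function is ours, not this repo's):

```python
from deltalake import DeltaTable

def zorder_with_spill_cap(uri: str, columns: list[str], max_spill: int | None) -> None:
    """Z-order a Delta table, capping the optimizer's spill-to-disk if configured."""
    dt = DeltaTable(uri)
    kwargs = {"max_spill_size": max_spill} if max_spill is not None else {}
    dt.optimize.z_order(columns, **kwargs)
```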
23 changes: 9 additions & 14 deletions materializationengine/workflows/deltalake_export.py
@@ -246,7 +246,7 @@ def _spatial_col_rank(col_name: str) -> int:
     if spatial_candidates:
         spatial_candidates.sort(key=lambda c: _spatial_col_rank(c[0]))
         col, owning_table = spatial_candidates[0]
-        # Spatial index — partition on Morton code, z-order on coordinates
+        # Spatial index — partition on Morton code, z-order on Morton column
         # NOTE: using the uniform range approach here as the percentile approach
         # won't work without some extra tooling, as the morton column doesn't
         # exist in the db
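For context on the comment above: a Morton (Z-order) code interleaves the bits of the x, y, and z coordinates, so values that are close numerically are close spatially, which is what lets a single column serve both as a uniform-range partition key and as a sort key. A toy version, not this repo's implementation:

```python
def morton3(x: int, y: int, z: int, bits: int = 21) -> int:
    """Interleave coordinate bits: ... z1 y1 x1 z0 y0 x0 (illustrative only)."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (3 * i)      # x bit i -> position 3i
        code |= ((y >> i) & 1) << (3 * i + 1)  # y bit i -> position 3i+1
        code |= ((z >> i) & 1) << (3 * i + 2)  # z bit i -> position 3i+2
    return code
```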
@@ -255,7 +255,7 @@ def _spatial_col_rank(col_name: str) -> int:
             partition_by=f"{col}_morton",
             partition_strategy="uniform_range",
             n_partitions="auto",
-            zorder_columns=[f"{col}_x", f"{col}_y", f"{col}_z"],
+            zorder_columns=[f"{col}_morton"],
             bloom_filter_columns=[],
             source_geometry_column=col,
             source_table=owning_table,
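Z-ordering on the Morton column itself, rather than on the three raw coordinates, avoids recomputing an interleaving the table already stores: sorting by the precomputed key is already the space-filling-curve order. Illustrative, reusing the toy morton3 above:

```python
points = [(10, 4, 7), (2, 9, 1), (5, 5, 5)]
# Sorting by the precomputed Morton key *is* the z-order over (x, y, z).
points.sort(key=lambda p: morton3(*p))
```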
@@ -1334,6 +1334,8 @@ def _build_frozen_db_connection_string(
     name="deltalake:write_deltalake_table",
     bind=True,
     acks_late=True,
+    soft_time_limit=21000,
+    time_limit=21300,
 )
 def write_deltalake_table(
     self,
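The 300 s gap between the two limits is the cleanup window: Celery raises SoftTimeLimitExceeded inside the task at 21000 s, then hard-kills it at 21300 s. A hedged sketch of that pattern (the app and helper names are hypothetical, not this repo's code):

```python
from celery.exceptions import SoftTimeLimitExceeded

@app.task(bind=True, acks_late=True, soft_time_limit=21000, time_limit=21300)
def export_task(self, uri):
    try:
        run_export(uri)               # hypothetical long-running export
    except SoftTimeLimitExceeded:
        remove_partial_output(uri)    # hypothetical cleanup helper
        raise
```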
@@ -1465,7 +1467,7 @@ def write_deltalake_table(
         )
         return

-    # --- Partial-export detection (task 8.4) ---
+    # --- Existing Delta Lake detection ---
     for spec in resolved_specs:
         lake_name = spec.partition_by or "flat"
         uri = f"{output_uri_base}/{lake_name}"
@@ -1489,17 +1491,10 @@ def write_deltalake_table(
             )

         if existing_rows is not None:
-            celery_logger.info(
-                "Existing Delta Lake found for table %s (v%d) at %s: "
-                "%d rows (expected %d)",
-                table_name,
-                version,
-                uri,
-                existing_rows,
-                row_count,
-            )
-            celery_logger.info(
-                "Assuming existing Delta Lake is correct and skipping export for this spec."
+            raise RuntimeError(
+                f"Delta Lake for table {table_name!r} already exists at "
+                f"{uri} with {existing_rows} rows (matches expected count). "
+                f"Delete the existing Delta Lake before re-exporting."
             )

     # --- Estimate bytes per row and resolve partition counts / bounds ---
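With the silent-skip path gone, re-running an export against an existing lake now fails loudly, and the operator removes the old lake out of band. A minimal sketch of the detection half, assuming the deltalake package (the helper is ours, not this repo's):

```python
from deltalake import DeltaTable

def existing_row_count(uri: str) -> int | None:
    """Row count of the Delta table at uri, or None if no table exists there."""
    try:
        dt = DeltaTable(uri)
    except Exception:  # no _delta_log at this URI
        return None
    return dt.to_pyarrow_dataset().count_rows()
```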
6 changes: 1 addition & 5 deletions tests/test_deltalake_export.py
@@ -108,11 +108,7 @@ def test_spatial_index_produces_morton_spec(self):
         assert len(specs) == 1
         assert specs[0].partition_by == "pt_position_morton"
         assert specs[0].partition_strategy == "uniform_range"
-        assert specs[0].zorder_columns == [
-            "pt_position_x",
-            "pt_position_y",
-            "pt_position_z",
-        ]
+        assert specs[0].zorder_columns == ["pt_position_morton"]
         assert specs[0].source_geometry_column == "pt_position"

     def test_multiple_indexes(self):