Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions delphi/polismath/regression/comparer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,20 @@ def __init__(
# E.g., {0: 1, 1: -1} means PC1 unchanged, PC2 flipped
self._pca_sign_flips: Dict[str, Dict[int, int]] = {}

def compare_with_golden(self, dataset_name: str, benchmark: bool = True) -> Dict:
def compare_with_golden(
self,
dataset_name: str,
benchmark: bool = False,
skip_intermediate_stages: bool = False,
) -> Dict:
"""
Compare current implementation with golden snapshot.

Args:
dataset_name: Name of the dataset ('biodiversity' or 'vw')
benchmark: If True, compare timing information (default: True)
benchmark: If True, compare timing information (default: False)
skip_intermediate_stages: If True, skip stages 1-4 and only compute
full recompute + data export. Saves time for large datasets.

Returns:
Dictionary containing comparison results
Expand Down Expand Up @@ -139,13 +146,17 @@ def compare_with_golden(self, dataset_name: str, benchmark: bool = True) -> Dict
if benchmark:
logger.info("Computing all stages with benchmarking...")
current_results = compute_all_stages_with_benchmark(
dataset_name, votes_dict, metadata["fixed_timestamp"]
dataset_name, votes_dict, metadata["fixed_timestamp"],
skip_intermediate_stages=skip_intermediate_stages,
)
current_stages = current_results["stages"]
current_timing_stats = current_results["timing_stats"]
else:
logger.info("Computing all stages...")
current_results = compute_all_stages(dataset_name, votes_dict, metadata["fixed_timestamp"])
current_results = compute_all_stages(
dataset_name, votes_dict, metadata["fixed_timestamp"],
skip_intermediate_stages=skip_intermediate_stages,
)
current_stages = current_results["stages"]
current_timing_stats = {}

Expand Down
151 changes: 83 additions & 68 deletions delphi/polismath/regression/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ def compute_file_md5(filepath: str) -> str:
return "error_computing_md5"


def compute_all_stages(dataset_name: str, votes_dict: Dict, fixed_timestamp: int) -> Dict[str, Dict[str, Any]]:
def compute_all_stages(
dataset_name: str,
votes_dict: Dict,
fixed_timestamp: int,
skip_intermediate_stages: bool = False,
) -> Dict[str, Dict[str, Any]]:
"""
Compute all conversation stages with timing information.

Expand All @@ -50,6 +55,9 @@ def compute_all_stages(dataset_name: str, votes_dict: Dict, fixed_timestamp: int
votes_dict: Dictionary containing votes data with format:
{'votes': [...], 'lastVoteTimestamp': timestamp}
fixed_timestamp: Fixed timestamp for reproducibility
skip_intermediate_stages: If True, skip stages 1-4 (empty, load-only,
PCA-only, PCA+clustering) and only compute the full recompute and
data export. Saves significant time for large datasets.

Returns:
Dictionary with two keys:
Expand All @@ -59,73 +67,74 @@ def compute_all_stages(dataset_name: str, votes_dict: Dict, fixed_timestamp: int
stages = {}
timings = {}

# Stage 1: Empty conversation (with fixed timestamp)
start_time = time.perf_counter()
conv_empty = Conversation(dataset_name, last_updated=fixed_timestamp)
timings["empty"] = time.perf_counter() - start_time
stages["empty"] = conv_empty.to_dict()

# Stage 2: After loading votes (no recompute)
conv = Conversation(dataset_name, last_updated=fixed_timestamp)
start_time = time.perf_counter()
conv = conv.update_votes(votes_dict, recompute=False)
timings["after_load_no_compute"] = time.perf_counter() - start_time

# Validation: Ensure votes were actually loaded
if conv.participant_count == 0 or conv.comment_count == 0:
raise ValueError(
f"Failed to load votes! participant_count={conv.participant_count}, "
f"comment_count={conv.comment_count}"
)
if not skip_intermediate_stages:
# Stage 1: Empty conversation (with fixed timestamp)
start_time = time.perf_counter()
conv_empty = Conversation(dataset_name, last_updated=fixed_timestamp)
timings["empty"] = time.perf_counter() - start_time
stages["empty"] = conv_empty.to_dict()
Comment on lines +70 to +75
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gating stages 1–4 behind a single if not skip_intermediate_stages: introduces a very large indented block, making future edits to the stage pipeline easier to get wrong (e.g., accidentally moving shared logic into/out of the gated section). A more maintainable structure is to compute stage 5+ unconditionally, and put stages 1–4 into a small helper (or early-return pattern that appends optional stages), so the core pipeline remains at one indentation level.

Copilot uses AI. Check for mistakes.

stages["after_load_no_compute"] = conv.to_dict()

# DEBUG: Capture the matrix that goes into PCA (only when DEBUG logging is enabled)
if logger.isEnabledFor(logging.DEBUG):
debug_info = {}
try:
# Get the clean matrix that PCA will use
if hasattr(conv, '_get_clean_matrix'):
clean_matrix = conv._get_clean_matrix()
# Save first 5x5 section of the matrix for debugging
if not clean_matrix.empty:
debug_info["pca_input_matrix_sample"] = {
"shape": list(clean_matrix.shape),
"rows_first_10": list(clean_matrix.index[:10]),
"cols_first_10": list(clean_matrix.columns[:10]),
"sample_5x5": clean_matrix.iloc[:5, :5].to_dict(),
"dtype": str(clean_matrix.dtypes.iloc[0] if len(clean_matrix.dtypes) > 0 else "unknown")
}
# Check for NaN values
nan_info = {
"total_cells": clean_matrix.size,
"nan_count": clean_matrix.isna().sum().sum(),
"nan_percentage": (clean_matrix.isna().sum().sum() / clean_matrix.size * 100) if clean_matrix.size > 0 else 0
}
debug_info["nan_info"] = nan_info

# Save debug info to .test_outputs/debug directory
debug_dir = Path(__file__).parent.parent / ".test_outputs" / "debug"
debug_dir.mkdir(parents=True, exist_ok=True)
debug_path = debug_dir / f"pca_debug_{dataset_name}.json"
with open(debug_path, "w") as f:
json.dump(debug_info, f, indent=2, default=str)
logger.debug(f"Saved PCA debug info to {debug_path}")
except Exception as e:
logger.error(f"Debug capture failed: {e}")

# Stage 3: After PCA computation only
start_time = time.perf_counter()
conv._compute_pca()
timings["after_pca"] = time.perf_counter() - start_time
stages["after_pca"] = conv.to_dict()
# Stage 2: After loading votes (no recompute)
conv = Conversation(dataset_name, last_updated=fixed_timestamp)
start_time = time.perf_counter()
conv = conv.update_votes(votes_dict, recompute=False)
timings["after_load_no_compute"] = time.perf_counter() - start_time

# Validation: Ensure votes were actually loaded
if conv.participant_count == 0 or conv.comment_count == 0:
raise ValueError(
f"Failed to load votes! participant_count={conv.participant_count}, "
f"comment_count={conv.comment_count}"
)

stages["after_load_no_compute"] = conv.to_dict()

# DEBUG: Capture the matrix that goes into PCA (only when DEBUG logging is enabled)
if logger.isEnabledFor(logging.DEBUG):
debug_info = {}
try:
# Get the clean matrix that PCA will use
if hasattr(conv, '_get_clean_matrix'):
clean_matrix = conv._get_clean_matrix()
# Save first 5x5 section of the matrix for debugging
if not clean_matrix.empty:
debug_info["pca_input_matrix_sample"] = {
"shape": list(clean_matrix.shape),
"rows_first_10": list(clean_matrix.index[:10]),
"cols_first_10": list(clean_matrix.columns[:10]),
"sample_5x5": clean_matrix.iloc[:5, :5].to_dict(),
"dtype": str(clean_matrix.dtypes.iloc[0] if len(clean_matrix.dtypes) > 0 else "unknown")
}
# Check for NaN values
nan_info = {
"total_cells": clean_matrix.size,
"nan_count": clean_matrix.isna().sum().sum(),
"nan_percentage": (clean_matrix.isna().sum().sum() / clean_matrix.size * 100) if clean_matrix.size > 0 else 0
}
debug_info["nan_info"] = nan_info

# Save debug info to .test_outputs/debug directory
debug_dir = Path(__file__).parent.parent / ".test_outputs" / "debug"
debug_dir.mkdir(parents=True, exist_ok=True)
debug_path = debug_dir / f"pca_debug_{dataset_name}.json"
with open(debug_path, "w") as f:
json.dump(debug_info, f, indent=2, default=str)
logger.debug(f"Saved PCA debug info to {debug_path}")
except Exception as e:
logger.error(f"Debug capture failed: {e}")

# Stage 3: After PCA computation only
start_time = time.perf_counter()
conv._compute_pca()
timings["after_pca"] = time.perf_counter() - start_time
stages["after_pca"] = conv.to_dict()

# Stage 4: After PCA + clustering
start_time = time.perf_counter()
conv._compute_pca()
conv._compute_clusters()
timings["after_clustering"] = time.perf_counter() - start_time
stages["after_clustering"] = conv.to_dict()
# Stage 4: After PCA + clustering
start_time = time.perf_counter()
conv._compute_pca()
conv._compute_clusters()
timings["after_clustering"] = time.perf_counter() - start_time
stages["after_clustering"] = conv.to_dict()

# Stage 5: Full recompute (includes repness and participant_info)
conv_full = Conversation(dataset_name, last_updated=fixed_timestamp)
Expand Down Expand Up @@ -159,7 +168,8 @@ def compute_all_stages_with_benchmark(
dataset_name: str,
votes_dict: Dict,
fixed_timestamp: int,
n_runs: int = 3
n_runs: int = 3,
skip_intermediate_stages: bool = False,
) -> Dict[str, Any]:
Comment on lines +171 to 173
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compute_all_stages_with_benchmark() now supports skip_intermediate_stages, which changes the set of timing keys produced by compute_all_stages(). If the benchmarking aggregation code assumes a fixed set of stage keys across runs (and/or always-present optional stages like full_data_export), this can lead to KeyErrors or inconsistent stats. Make the aggregation robust to missing/conditional stage keys (e.g., aggregate over the union of keys and skip/None-fill missing ones) or ensure compute_all_stages() always emits consistent timing keys when benchmarking is enabled.

Copilot uses AI. Check for mistakes.
"""
Compute all conversation stages multiple times and collect timing statistics.
Expand All @@ -173,6 +183,8 @@ def compute_all_stages_with_benchmark(
votes_dict: Dictionary containing votes data
fixed_timestamp: Fixed timestamp for reproducibility
n_runs: Number of times to run the computation (default: 3)
skip_intermediate_stages: If True, skip stages 1-4 (passed through to
compute_all_stages).

Returns:
Dictionary with:
Expand All @@ -187,7 +199,10 @@ def compute_all_stages_with_benchmark(

logger.info(f"Running {n_runs} iterations for benchmarking...")
for i in range(n_runs):
result = compute_all_stages(dataset_name, votes_dict, fixed_timestamp)
result = compute_all_stages(
dataset_name, votes_dict, fixed_timestamp,
skip_intermediate_stages=skip_intermediate_stages,
)
if stages is None or i == n_runs - 1:
# Keep the last run's stages
stages = result["stages"]
Expand Down
6 changes: 4 additions & 2 deletions delphi/tests/test_regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ def test_conversation_regression(dataset_name):
# and different implementations may produce equivalent results with opposite signs
comparer = ConversationComparer(ignore_pca_sign_flip=True)

# Run comparison
result = comparer.compare_with_golden(dataset_name)
# Run comparison — skip intermediate stages (empty, load-only, PCA-only,
# PCA+clustering) since this test only checks overall_match. The stage-level
# test below exercises intermediate stages individually.
result = comparer.compare_with_golden(dataset_name, skip_intermediate_stages=True)

Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change introduces a new execution path (skip_intermediate_stages=True) that bypasses stages 1–4. Add a targeted assertion in the test suite that explicitly verifies the skipped-stage behavior (e.g., that intermediate stage keys are absent and that the remaining stages still match golden), so failures in the new control-flow won’t be masked by only checking overall_match.

Suggested change
# Verify that no stage-level keys are present when skip_intermediate_stages=True.
# This ensures that the new control-flow genuinely skips emitting intermediate
# stage results, rather than only affecting overall_match.
stage_keys = [key for key in result.keys() if "stage" in key.lower()]
assert not stage_keys, (
f"Expected no stage-level results when skip_intermediate_stages=True, "
f"but found keys: {stage_keys}"
)

Copilot uses AI. Check for mistakes.
# Check for errors
if "error" in result:
Expand Down
Loading