diff --git a/delphi/polismath/regression/comparer.py b/delphi/polismath/regression/comparer.py index 1646460fd..50ab43857 100644 --- a/delphi/polismath/regression/comparer.py +++ b/delphi/polismath/regression/comparer.py @@ -59,13 +59,20 @@ def __init__( # E.g., {0: 1, 1: -1} means PC1 unchanged, PC2 flipped self._pca_sign_flips: Dict[str, Dict[int, int]] = {} - def compare_with_golden(self, dataset_name: str, benchmark: bool = True) -> Dict: + def compare_with_golden( + self, + dataset_name: str, + benchmark: bool = False, + skip_intermediate_stages: bool = False, + ) -> Dict: """ Compare current implementation with golden snapshot. Args: dataset_name: Name of the dataset ('biodiversity' or 'vw') - benchmark: If True, compare timing information (default: True) + benchmark: If True, compare timing information (default: False) + skip_intermediate_stages: If True, skip stages 1-4 and only compute + full recompute + data export. Saves time for large datasets. Returns: Dictionary containing comparison results @@ -139,13 +146,17 @@ def compare_with_golden(self, dataset_name: str, benchmark: bool = True) -> Dict if benchmark: logger.info("Computing all stages with benchmarking...") current_results = compute_all_stages_with_benchmark( - dataset_name, votes_dict, metadata["fixed_timestamp"] + dataset_name, votes_dict, metadata["fixed_timestamp"], + skip_intermediate_stages=skip_intermediate_stages, ) current_stages = current_results["stages"] current_timing_stats = current_results["timing_stats"] else: logger.info("Computing all stages...") - current_results = compute_all_stages(dataset_name, votes_dict, metadata["fixed_timestamp"]) + current_results = compute_all_stages( + dataset_name, votes_dict, metadata["fixed_timestamp"], + skip_intermediate_stages=skip_intermediate_stages, + ) current_stages = current_results["stages"] current_timing_stats = {} diff --git a/delphi/polismath/regression/utils.py b/delphi/polismath/regression/utils.py index d06e68bf5..7a86877e8 100644 --- a/delphi/polismath/regression/utils.py +++ b/delphi/polismath/regression/utils.py @@ -37,7 +37,12 @@ def compute_file_md5(filepath: str) -> str: return "error_computing_md5" -def compute_all_stages(dataset_name: str, votes_dict: Dict, fixed_timestamp: int) -> Dict[str, Dict[str, Any]]: +def compute_all_stages( + dataset_name: str, + votes_dict: Dict, + fixed_timestamp: int, + skip_intermediate_stages: bool = False, +) -> Dict[str, Dict[str, Any]]: """ Compute all conversation stages with timing information. @@ -50,6 +55,9 @@ def compute_all_stages(dataset_name: str, votes_dict: Dict, fixed_timestamp: int votes_dict: Dictionary containing votes data with format: {'votes': [...], 'lastVoteTimestamp': timestamp} fixed_timestamp: Fixed timestamp for reproducibility + skip_intermediate_stages: If True, skip stages 1-4 (empty, load-only, + PCA-only, PCA+clustering) and only compute the full recompute and + data export. Saves significant time for large datasets. Returns: Dictionary with two keys: @@ -59,73 +67,74 @@ def compute_all_stages(dataset_name: str, votes_dict: Dict, fixed_timestamp: int stages = {} timings = {} - # Stage 1: Empty conversation (with fixed timestamp) - start_time = time.perf_counter() - conv_empty = Conversation(dataset_name, last_updated=fixed_timestamp) - timings["empty"] = time.perf_counter() - start_time - stages["empty"] = conv_empty.to_dict() - - # Stage 2: After loading votes (no recompute) - conv = Conversation(dataset_name, last_updated=fixed_timestamp) - start_time = time.perf_counter() - conv = conv.update_votes(votes_dict, recompute=False) - timings["after_load_no_compute"] = time.perf_counter() - start_time - - # Validation: Ensure votes were actually loaded - if conv.participant_count == 0 or conv.comment_count == 0: - raise ValueError( - f"Failed to load votes! participant_count={conv.participant_count}, " - f"comment_count={conv.comment_count}" - ) + if not skip_intermediate_stages: + # Stage 1: Empty conversation (with fixed timestamp) + start_time = time.perf_counter() + conv_empty = Conversation(dataset_name, last_updated=fixed_timestamp) + timings["empty"] = time.perf_counter() - start_time + stages["empty"] = conv_empty.to_dict() - stages["after_load_no_compute"] = conv.to_dict() - - # DEBUG: Capture the matrix that goes into PCA (only when DEBUG logging is enabled) - if logger.isEnabledFor(logging.DEBUG): - debug_info = {} - try: - # Get the clean matrix that PCA will use - if hasattr(conv, '_get_clean_matrix'): - clean_matrix = conv._get_clean_matrix() - # Save first 5x5 section of the matrix for debugging - if not clean_matrix.empty: - debug_info["pca_input_matrix_sample"] = { - "shape": list(clean_matrix.shape), - "rows_first_10": list(clean_matrix.index[:10]), - "cols_first_10": list(clean_matrix.columns[:10]), - "sample_5x5": clean_matrix.iloc[:5, :5].to_dict(), - "dtype": str(clean_matrix.dtypes.iloc[0] if len(clean_matrix.dtypes) > 0 else "unknown") - } - # Check for NaN values - nan_info = { - "total_cells": clean_matrix.size, - "nan_count": clean_matrix.isna().sum().sum(), - "nan_percentage": (clean_matrix.isna().sum().sum() / clean_matrix.size * 100) if clean_matrix.size > 0 else 0 - } - debug_info["nan_info"] = nan_info - - # Save debug info to .test_outputs/debug directory - debug_dir = Path(__file__).parent.parent / ".test_outputs" / "debug" - debug_dir.mkdir(parents=True, exist_ok=True) - debug_path = debug_dir / f"pca_debug_{dataset_name}.json" - with open(debug_path, "w") as f: - json.dump(debug_info, f, indent=2, default=str) - logger.debug(f"Saved PCA debug info to {debug_path}") - except Exception as e: - logger.error(f"Debug capture failed: {e}") - - # Stage 3: After PCA computation only - start_time = time.perf_counter() - conv._compute_pca() - timings["after_pca"] = time.perf_counter() - start_time - stages["after_pca"] = conv.to_dict() + # Stage 2: After loading votes (no recompute) + conv = Conversation(dataset_name, last_updated=fixed_timestamp) + start_time = time.perf_counter() + conv = conv.update_votes(votes_dict, recompute=False) + timings["after_load_no_compute"] = time.perf_counter() - start_time + + # Validation: Ensure votes were actually loaded + if conv.participant_count == 0 or conv.comment_count == 0: + raise ValueError( + f"Failed to load votes! participant_count={conv.participant_count}, " + f"comment_count={conv.comment_count}" + ) + + stages["after_load_no_compute"] = conv.to_dict() + + # DEBUG: Capture the matrix that goes into PCA (only when DEBUG logging is enabled) + if logger.isEnabledFor(logging.DEBUG): + debug_info = {} + try: + # Get the clean matrix that PCA will use + if hasattr(conv, '_get_clean_matrix'): + clean_matrix = conv._get_clean_matrix() + # Save first 5x5 section of the matrix for debugging + if not clean_matrix.empty: + debug_info["pca_input_matrix_sample"] = { + "shape": list(clean_matrix.shape), + "rows_first_10": list(clean_matrix.index[:10]), + "cols_first_10": list(clean_matrix.columns[:10]), + "sample_5x5": clean_matrix.iloc[:5, :5].to_dict(), + "dtype": str(clean_matrix.dtypes.iloc[0] if len(clean_matrix.dtypes) > 0 else "unknown") + } + # Check for NaN values + nan_info = { + "total_cells": clean_matrix.size, + "nan_count": clean_matrix.isna().sum().sum(), + "nan_percentage": (clean_matrix.isna().sum().sum() / clean_matrix.size * 100) if clean_matrix.size > 0 else 0 + } + debug_info["nan_info"] = nan_info + + # Save debug info to .test_outputs/debug directory + debug_dir = Path(__file__).parent.parent / ".test_outputs" / "debug" + debug_dir.mkdir(parents=True, exist_ok=True) + debug_path = debug_dir / f"pca_debug_{dataset_name}.json" + with open(debug_path, "w") as f: + json.dump(debug_info, f, indent=2, default=str) + logger.debug(f"Saved PCA debug info to {debug_path}") + except Exception as e: + logger.error(f"Debug capture failed: {e}") + + # Stage 3: After PCA computation only + start_time = time.perf_counter() + conv._compute_pca() + timings["after_pca"] = time.perf_counter() - start_time + stages["after_pca"] = conv.to_dict() - # Stage 4: After PCA + clustering - start_time = time.perf_counter() - conv._compute_pca() - conv._compute_clusters() - timings["after_clustering"] = time.perf_counter() - start_time - stages["after_clustering"] = conv.to_dict() + # Stage 4: After PCA + clustering + start_time = time.perf_counter() + conv._compute_pca() + conv._compute_clusters() + timings["after_clustering"] = time.perf_counter() - start_time + stages["after_clustering"] = conv.to_dict() # Stage 5: Full recompute (includes repness and participant_info) conv_full = Conversation(dataset_name, last_updated=fixed_timestamp) @@ -159,7 +168,8 @@ def compute_all_stages_with_benchmark( dataset_name: str, votes_dict: Dict, fixed_timestamp: int, - n_runs: int = 3 + n_runs: int = 3, + skip_intermediate_stages: bool = False, ) -> Dict[str, Any]: """ Compute all conversation stages multiple times and collect timing statistics. @@ -173,6 +183,8 @@ def compute_all_stages_with_benchmark( votes_dict: Dictionary containing votes data fixed_timestamp: Fixed timestamp for reproducibility n_runs: Number of times to run the computation (default: 3) + skip_intermediate_stages: If True, skip stages 1-4 (passed through to + compute_all_stages). Returns: Dictionary with: @@ -187,7 +199,10 @@ def compute_all_stages_with_benchmark( logger.info(f"Running {n_runs} iterations for benchmarking...") for i in range(n_runs): - result = compute_all_stages(dataset_name, votes_dict, fixed_timestamp) + result = compute_all_stages( + dataset_name, votes_dict, fixed_timestamp, + skip_intermediate_stages=skip_intermediate_stages, + ) if stages is None or i == n_runs - 1: # Keep the last run's stages stages = result["stages"] diff --git a/delphi/tests/test_regression.py b/delphi/tests/test_regression.py index df4823d50..fae3af4a6 100644 --- a/delphi/tests/test_regression.py +++ b/delphi/tests/test_regression.py @@ -71,8 +71,10 @@ def test_conversation_regression(dataset_name): # and different implementations may produce equivalent results with opposite signs comparer = ConversationComparer(ignore_pca_sign_flip=True) - # Run comparison - result = comparer.compare_with_golden(dataset_name) + # Run comparison — skip intermediate stages (empty, load-only, PCA-only, + # PCA+clustering) since this test only checks overall_match. The stage-level + # test below exercises intermediate stages individually. + result = comparer.compare_with_golden(dataset_name, skip_intermediate_stages=True) # Check for errors if "error" in result: