diff --git a/api/schemas.py b/api/schemas.py index 614d0ee..81231fa 100644 --- a/api/schemas.py +++ b/api/schemas.py @@ -100,6 +100,7 @@ class SearchResponse(BaseModel): retrieved_at: datetime cached: bool = False request_id: str | None = None + stage_timings: dict | None = None # per-stage timing in ms class CompareResponse(BaseModel): diff --git a/api/services.py b/api/services.py index 6d203e8..783bfb1 100644 --- a/api/services.py +++ b/api/services.py @@ -154,6 +154,7 @@ def search(self, request: SearchRequest) -> SearchResponse: related_provisions=response_data.get("related_provisions", []), response_format=request.format, retrieved_at=datetime.now(UTC), + stage_timings=getattr(self.orchestrator, "_last_timing", None), ) def _assess_confidence( diff --git a/benchmark/__init__.py b/benchmark/__init__.py new file mode 100644 index 0000000..e02abfc --- /dev/null +++ b/benchmark/__init__.py @@ -0,0 +1 @@ + diff --git a/benchmark/adapters/__init__.py b/benchmark/adapters/__init__.py new file mode 100644 index 0000000..e02abfc --- /dev/null +++ b/benchmark/adapters/__init__.py @@ -0,0 +1 @@ + diff --git a/benchmark/adapters/hector_adapter.py b/benchmark/adapters/hector_adapter.py new file mode 100644 index 0000000..7edc002 --- /dev/null +++ b/benchmark/adapters/hector_adapter.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 +""" +HECTOR Benchmark Adapter + +Translates HECTOR-specific API responses into the format expected +by the benchmark CLI. Used for profiling stage timing breakdowns. + +Usage: + python benchmark/adapters/hector_adapter.py --host localhost --port 8000 --query "What is Section 302 IPC?" +""" + +import argparse +import json +import os +import sys +import time + +import requests + + +def profile_query(host: str, port: int, query: str, api_key: str | None = None) -> dict: + """Send a single query and return detailed timing breakdown.""" + url = f"http://{host}:{port}/v1/search" + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + payload = { + "query": query, + "top_k": 10, + "response_format": "detailed", + } + + t0 = time.perf_counter() + resp = requests.post(url, json=payload, headers=headers, timeout=60) + total_ms = (time.perf_counter() - t0) * 1000 + resp.raise_for_status() + data = resp.json() + + # Extract stage timings from response + stage_timings = data.get("stage_timings", {}) + + return { + "query": query, + "total_http_ms": round(total_ms, 1), + "stage_timings": stage_timings, + "num_results": data.get("total_results", 0), + "num_items": len(data.get("items", [])), + "num_citations": len(data.get("citations", [])), + "route": data.get("route", ""), + "confidence_level": data.get("confidence_level", ""), + "cached": data.get("cached", False), + } + + +def main(): + parser = argparse.ArgumentParser(description="HECTOR Benchmark Adapter") + parser.add_argument("--host", default=os.getenv("HECTOR_API_HOST", "localhost")) + parser.add_argument("--port", type=int, default=int(os.getenv("HECTOR_API_PORT", 8000))) + parser.add_argument("--query", required=True, help="Query to profile") + parser.add_argument("--api-key", default=os.getenv("HECTOR_API_KEY")) + + args = parser.parse_args() + + result = profile_query(args.host, args.port, args.query, args.api_key) + print(json.dumps(result, indent=2)) + + # Print timing breakdown + timings = result.get("stage_timings", {}) + if timings: + print(f"\nStage Timing Breakdown:") + print(f" Routing: {timings.get('route_ms', 0):.1f}ms") + print(f" Normalization: {timings.get('normalize_ms', 0):.1f}ms") + print(f" Generation: {timings.get('generate_ms', 0):.1f}ms") + print(f" Verification: {timings.get('verify_ms', 0):.1f}ms") + print(f" Total: {timings.get('total_ms', 0):.1f}ms") + + # Identify bottleneck + stages = { + "routing": timings.get("route_ms", 0), + "normalization": timings.get("normalize_ms", 0), + "generation": timings.get("generate_ms", 0), + "verification": timings.get("verify_ms", 0), + } + bottleneck = max(stages, key=stages.get) + print(f"\n Bottleneck: {bottleneck} ({stages[bottleneck]:.1f}ms)") + + +if __name__ == "__main__": + main() diff --git a/benchmark/configs/quick_profile.yaml b/benchmark/configs/quick_profile.yaml new file mode 100644 index 0000000..0cb7c7c --- /dev/null +++ b/benchmark/configs/quick_profile.yaml @@ -0,0 +1,30 @@ +# HECTOR Quick Profile Configuration +# Profile-only mode — no load testing. ~30s runtime. +# Use for fast iteration on retrieval/reranker tuning. + +target: + url: "http://localhost:8000" + timeout_s: 30 + +rag: + endpoint: "/v1/search" + method: "POST" + collection_names: ["indian_law_bns"] + top_k: 10 + response_format: "detailed" + +profiling: + enabled: true + warmup_requests: 3 + profile_requests: 10 + capture_stage_timings: true + +aiperf: + enabled: false + +input: + file: "benchmark/queries.jsonl" + +output: + dir: "benchmark/results" + experiment_name: "quick_profile" diff --git a/benchmark/configs/single_run.yaml b/benchmark/configs/single_run.yaml new file mode 100644 index 0000000..a64900c --- /dev/null +++ b/benchmark/configs/single_run.yaml @@ -0,0 +1,34 @@ +# HECTOR Single Run Configuration +# One concurrency level with profiling + optional load test. ~2 min. +# Use for regression checks and baseline measurements. + +target: + url: "http://localhost:8000" + timeout_s: 60 + +rag: + endpoint: "/v1/search" + method: "POST" + collection_names: ["indian_law_bns"] + top_k: 10 + response_format: "detailed" + +profiling: + enabled: true + warmup_requests: 5 + profile_requests: 20 + capture_stage_timings: true + +aiperf: + enabled: true + concurrency: 5 + iterations: 3 + duration_s: 60 + sleep_between_points_s: 0 + +input: + file: "benchmark/queries.jsonl" + +output: + dir: "benchmark/results" + experiment_name: "single_run" diff --git a/benchmark/configs/sweep.yaml b/benchmark/configs/sweep.yaml new file mode 100644 index 0000000..bb8bb48 --- /dev/null +++ b/benchmark/configs/sweep.yaml @@ -0,0 +1,34 @@ +# HECTOR Sweep Configuration +# Multi-axis sweep across concurrency × top_k. For bottleneck analysis. +# Each axis accepts a list — Cartesian product of all combinations. + +target: + url: "http://localhost:8000" + timeout_s: 60 + +rag: + endpoint: "/v1/search" + method: "POST" + collection_names: ["indian_law_bns"] + top_k: [5, 10, 20] + response_format: "detailed" + +profiling: + enabled: true + warmup_requests: 3 + profile_requests: 10 + capture_stage_timings: true + +aiperf: + enabled: true + concurrency: [1, 5, 10, 20] + iterations: 3 + duration_s: 30 + sleep_between_points_s: 5 + +input: + file: "benchmark/queries.jsonl" + +output: + dir: "benchmark/results" + experiment_name: "sweep" diff --git a/benchmark/queries.jsonl b/benchmark/queries.jsonl new file mode 100644 index 0000000..1f65c52 --- /dev/null +++ b/benchmark/queries.jsonl @@ -0,0 +1,25 @@ +{"query": "What is the punishment for murder under Section 302 IPC?"} +{"query": "Which BNS section replaces IPC Section 302?"} +{"query": "Explain the difference between culpable homicide and murder"} +{"query": "What are the rights of an arrested person?"} +{"query": "What is Section 376 IPC and its BNS equivalent?"} +{"query": "Explain the concept of defamation under Indian law"} +{"query": "What are the essentials of a valid contract?"} +{"query": "IPC Section 420 corresponds to which BNS section?"} +{"query": "What is the doctrine of basic structure?"} +{"query": "What are the grounds for divorce under Hindu Marriage Act?"} +{"query": "What is Section 144 CrPC and when can it be imposed?"} +{"query": "Explain the concept of precedent in Indian legal system"} +{"query": "What are the essential elements of crime?"} +{"query": "What is the punishment for robbery under IPC?"} +{"query": "What are the fundamental rights under Part III of the Constitution?"} +{"query": "What is the concept of estoppel under Indian Evidence Act?"} +{"query": "What are the ingredients of Section 304B IPC dowry death?"} +{"query": "Explain the right to privacy under Indian Constitution"} +{"query": "What is the punishment for criminal conspiracy under IPC?"} +{"query": "What is the difference between Section 300 and Section 304 IPC?"} +{"query": "What are the provisions for anticipatory bail under CrPC?"} +{"query": "What is the role of the Attorney General of India?"} +{"query": "Explain the concept of bail in criminal cases"} +{"query": "What is the difference between IPC and BNS?"} +{"query": "What are the provisions related to anticipatory bail?"} diff --git a/benchmark/rag_benchmark.py b/benchmark/rag_benchmark.py new file mode 100644 index 0000000..20d2fb1 --- /dev/null +++ b/benchmark/rag_benchmark.py @@ -0,0 +1,587 @@ +#!/usr/bin/env python3 +""" +HECTOR Performance Benchmark CLI + +Config-driven performance benchmarking for HECTOR's RAG pipeline. +Profiles per-stage timing, measures throughput under load, and +identifies bottlenecks. + +Usage: + python benchmark/rag_benchmark.py -c benchmark/configs/quick_profile.yaml + python benchmark/rag_benchmark.py -c benchmark/configs/single_run.yaml + python benchmark/rag_benchmark.py -c benchmark/configs/sweep.yaml +""" + +import argparse +import csv +import json +import os +import statistics +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +import requests +import yaml + + +# --------------------------------------------------------------------------- +# Config models +# --------------------------------------------------------------------------- + +@dataclass +class TargetConfig: + url: str = "http://localhost:8000" + timeout_s: int = 60 + + +@dataclass +class RagConfig: + endpoint: str = "/v1/search" + method: str = "POST" + collection_names: list[str] = field(default_factory=lambda: ["indian_law_bns"]) + top_k: int | list[int] = 10 + response_format: str = "detailed" + + +@dataclass +class ProfilingConfig: + enabled: bool = True + warmup_requests: int = 3 + profile_requests: int = 20 + capture_stage_timings: bool = True + + +@dataclass +class AiperfConfig: + enabled: bool = False + concurrency: int | list[int] = 5 + iterations: int = 3 + duration_s: int = 60 + sleep_between_points_s: int = 0 + + +@dataclass +class InputConfig: + file: str = "benchmark/queries.jsonl" + + +@dataclass +class OutputConfig: + dir: str = "benchmark/results" + experiment_name: str = "benchmark" + + +@dataclass +class BenchConfig: + target: TargetConfig = field(default_factory=TargetConfig) + rag: RagConfig = field(default_factory=RagConfig) + profiling: ProfilingConfig = field(default_factory=ProfilingConfig) + aiperf: AiperfConfig = field(default_factory=AiperfConfig) + input: InputConfig = field(default_factory=InputConfig) + output: OutputConfig = field(default_factory=OutputConfig) + + +def load_config(path: str) -> BenchConfig: + """Load YAML config into BenchConfig.""" + with open(path, "r") as f: + raw = yaml.safe_load(f) + + cfg = BenchConfig() + if "target" in raw: + cfg.target = TargetConfig(**raw["target"]) + if "rag" in raw: + cfg.rag = RagConfig(**raw["rag"]) + if "profiling" in raw: + cfg.profiling = ProfilingConfig(**raw["profiling"]) + if "aiperf" in raw: + cfg.aiperf = AiperfConfig(**raw["aiperf"]) + if "input" in raw: + cfg.input = InputConfig(**raw["input"]) + if "output" in raw: + cfg.output = OutputConfig(**raw["output"]) + return cfg + + +# --------------------------------------------------------------------------- +# Query loader +# --------------------------------------------------------------------------- + +def load_queries(path: str) -> list[str]: + """Load queries from a JSONL file.""" + queries = [] + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + obj = json.loads(line) + if "query" in obj: + queries.append(obj["query"]) + return queries + + +# --------------------------------------------------------------------------- +# HECTOR API client +# --------------------------------------------------------------------------- + +def query_hector( + url: str, + query: str, + top_k: int = 10, + timeout: int = 60, + api_key: str | None = None, +) -> dict[str, Any]: + """Send a single query to HECTOR and return timing + response data.""" + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + payload = { + "query": query, + "top_k": top_k, + "response_format": "detailed", + } + + t0 = time.perf_counter() + try: + resp = requests.post(url, json=payload, headers=headers, timeout=timeout) + elapsed_ms = (time.perf_counter() - t0) * 1000 + resp.raise_for_status() + data = resp.json() + return { + "status": "success", + "latency_ms": round(elapsed_ms, 1), + "num_results": data.get("total_results", 0), + "num_items": len(data.get("items", [])), + "num_citations": len(data.get("citations", [])), + "answer_length": len(data.get("generated_response", "")), + "route": data.get("route", ""), + "cached": data.get("cached", False), + "confidence_level": data.get("confidence_level", ""), + } + except Exception as e: + elapsed_ms = (time.perf_counter() - t0) * 1000 + return { + "status": "error", + "latency_ms": round(elapsed_ms, 1), + "error": str(e), + } + + +# --------------------------------------------------------------------------- +# Profiling phase +# --------------------------------------------------------------------------- + +def run_profiling( + cfg: BenchConfig, + queries: list[str], + api_key: str | None = None, +) -> dict[str, Any]: + """Run profiling phase — sequential queries with per-stage timing.""" + if not cfg.profiling.enabled: + return {"skipped": True} + + url = cfg.target.url + cfg.rag.endpoint + top_k = cfg.rag.top_k if isinstance(cfg.rag.top_k, int) else cfg.rag.top_k[0] + warmup = cfg.profiling.warmup_requests + profile_n = cfg.profiling.profile_requests + + print(f"\n [PROFILING] Warmup: {warmup} | Profile: {profile_n}") + + # Warmup + for i in range(warmup): + q = queries[i % len(queries)] + query_hector(url, q, top_k, cfg.target.timeout_s, api_key) + print(f" warmup {i+1}/{warmup} done") + + # Profile + results = [] + for i in range(profile_n): + q = queries[i % len(queries)] + r = query_hector(url, q, top_k, cfg.target.timeout_s, api_key) + r["query_index"] = i + r["query"] = q[:80] + results.append(r) + status_icon = "+" if r["status"] == "success" else "x" + print(f" profile {i+1}/{profile_n} [{status_icon}] {r['latency_ms']:.0f}ms") + + # Compute stats + latencies = [r["latency_ms"] for r in results if r["status"] == "success"] + errors = [r for r in results if r["status"] == "error"] + + stats = {} + if latencies: + latencies_sorted = sorted(latencies) + stats = { + "total_requests": len(results), + "successful": len(latencies), + "errors": len(errors), + "avg_latency_ms": round(statistics.mean(latencies), 1), + "median_latency_ms": round(statistics.median(latencies), 1), + "p95_latency_ms": round(latencies_sorted[int(len(latencies_sorted) * 0.95)], 1) if len(latencies_sorted) >= 2 else round(latencies_sorted[-1], 1), + "p99_latency_ms": round(latencies_sorted[int(len(latencies_sorted) * 0.99)], 1) if len(latencies_sorted) >= 2 else round(latencies_sorted[-1], 1), + "min_latency_ms": round(min(latencies), 1), + "max_latency_ms": round(max(latencies), 1), + "std_dev_ms": round(statistics.stdev(latencies), 1) if len(latencies) > 1 else 0, + } + + return {"stats": stats, "results": results, "errors": errors} + + +# --------------------------------------------------------------------------- +# Load testing phase +# --------------------------------------------------------------------------- + +def run_load_test( + cfg: BenchConfig, + queries: list[str], + concurrency: int, + api_key: str | None = None, +) -> dict[str, Any]: + """Run load test phase — concurrent queries at specified concurrency.""" + if not cfg.aiperf.enabled: + return {"skipped": True} + + url = cfg.target.url + cfg.rag.endpoint + top_k = cfg.rag.top_k if isinstance(cfg.rag.top_k, int) else cfg.rag.top_k[0] + iterations = cfg.aiperf.iterations + duration_s = cfg.aiperf.duration_s + + print(f"\n [LOAD TEST] Concurrency: {concurrency} | Iterations: {iterations} | Duration: {duration_s}s") + + all_results = [] + start_time = time.time() + query_idx = 0 + + def worker(): + nonlocal query_idx + while time.time() - start_time < duration_s: + idx = query_idx % len(queries) + query_idx += 1 + r = query_hector(url, queries[idx], top_k, cfg.target.timeout_s, api_key) + all_results.append(r) + + # Launch concurrent workers + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = [executor.submit(worker) for _ in range(concurrency)] + for f in futures: + f.result() + + elapsed = time.time() - start_time + latencies = [r["latency_ms"] for r in all_results if r["status"] == "success"] + errors = [r for r in all_results if r["status"] == "error"] + + stats = {} + if latencies: + latencies_sorted = sorted(latencies) + throughput = len(latencies) / elapsed if elapsed > 0 else 0 + stats = { + "concurrency": concurrency, + "elapsed_s": round(elapsed, 1), + "total_requests": len(all_results), + "successful": len(latencies), + "errors": len(errors), + "error_rate": round(len(errors) / len(all_results) * 100, 1) if all_results else 0, + "throughput_qps": round(throughput, 2), + "avg_latency_ms": round(statistics.mean(latencies), 1), + "median_latency_ms": round(statistics.median(latencies), 1), + "p95_latency_ms": round(latencies_sorted[int(len(latencies_sorted) * 0.95)], 1) if len(latencies_sorted) >= 2 else round(latencies_sorted[-1], 1), + "p99_latency_ms": round(latencies_sorted[int(len(latencies_sorted) * 0.99)], 1) if len(latencies_sorted) >= 2 else round(latencies_sorted[-1], 1), + "min_latency_ms": round(min(latencies), 1), + "max_latency_ms": round(max(latencies), 1), + } + + return {"stats": stats, "results": all_results} + + +# --------------------------------------------------------------------------- +# Sweep +# --------------------------------------------------------------------------- + +def run_sweep( + cfg: BenchConfig, + queries: list[str], + api_key: str | None = None, +) -> list[dict[str, Any]]: + """Run sweep across all axis combinations.""" + concurrencies = cfg.aiperf.concurrency if isinstance(cfg.aiperf.concurrency, list) else [cfg.aiperf.concurrency] + top_ks = cfg.rag.top_k if isinstance(cfg.rag.top_k, list) else [cfg.rag.top_k] + + sweep_results = [] + total_points = len(concurrencies) * len(top_ks) + point_idx = 0 + + for cr in concurrencies: + for tk in top_ks: + point_idx += 1 + print(f"\n{'='*60}") + print(f" SWEEP POINT {point_idx}/{total_points}: concurrency={cr}, top_k={tk}") + print(f"{'='*60}") + + # Temporarily override config + orig_top_k = cfg.rag.top_k + orig_concurrency = cfg.aiperf.concurrency + orig_enabled = cfg.aiperf.enabled + orig_iterations = cfg.aiperf.iterations + orig_duration = cfg.aiperf.duration_s + + cfg.rag.top_k = tk + cfg.aiperf.concurrency = cr + cfg.aiperf.enabled = True + cfg.aiperf.iterations = 1 + cfg.aiperf.duration_s = 15 # Shorter for sweep points + + result = run_load_test(cfg, queries, cr, api_key) + result["top_k"] = tk + result["concurrency"] = cr + sweep_results.append(result) + + # Restore config + cfg.rag.top_k = orig_top_k + cfg.aiperf.concurrency = orig_concurrency + cfg.aiperf.enabled = orig_enabled + cfg.aiperf.iterations = orig_iterations + cfg.aiperf.duration_s = orig_duration + + if cfg.aiperf.sleep_between_points_s > 0: + time.sleep(cfg.aiperf.sleep_between_points_s) + + return sweep_results + + +# --------------------------------------------------------------------------- +# Report generation +# --------------------------------------------------------------------------- + +def generate_report( + cfg: BenchConfig, + profiling: dict[str, Any], + load_results: list[dict[str, Any]] | dict[str, Any] | None, + sweep_results: list[dict[str, Any]] | None, +) -> dict[str, Any]: + """Generate unified benchmark report.""" + report = { + "config": { + "target": cfg.target.url, + "top_k": cfg.rag.top_k, + "collection": cfg.rag.collection_names, + }, + "profiling": profiling.get("stats", {}), + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + } + + if load_results and isinstance(load_results, dict) and "stats" in load_results: + report["load_test"] = load_results["stats"] + + if sweep_results: + report["sweep"] = [ + { + "concurrency": s.get("concurrency"), + "top_k": s.get("top_k"), + **(s.get("stats", {})), + } + for s in sweep_results + if s.get("stats") + ] + + return report + + +def save_results( + report: dict[str, Any], + profiling: dict[str, Any], + load_results: dict[str, Any] | None, + sweep_results: list[dict[str, Any]] | None, + output_dir: str, + experiment_name: str, +) -> Path: + """Save all results to disk.""" + run_dir = Path(output_dir) / f"{experiment_name}_run_{int(time.time())}" + run_dir.mkdir(parents=True, exist_ok=True) + + # Main report + with open(run_dir / "report.json", "w", encoding="utf-8") as f: + json.dump(report, f, indent=2, ensure_ascii=False) + + # Profiling details + if profiling.get("results"): + with open(run_dir / "profiling_results.json", "w", encoding="utf-8") as f: + json.dump(profiling["results"], f, indent=2, ensure_ascii=False) + + # Load test details + if load_results and load_results.get("results"): + with open(run_dir / "load_test_results.json", "w", encoding="utf-8") as f: + json.dump(load_results["results"], f, indent=2, ensure_ascii=False) + + # Sweep results + if sweep_results: + with open(run_dir / "sweep_results.json", "w", encoding="utf-8") as f: + json.dump(sweep_results, f, indent=2, ensure_ascii=False) + + # CSV for easy analysis + csv_path = run_dir / "sweep_results.csv" + with open(csv_path, "w", newline="", encoding="utf-8") as f: + if sweep_results and sweep_results[0].get("stats"): + writer = csv.DictWriter(f, fieldnames=list(sweep_results[0]["stats"].keys())) + writer.writeheader() + for s in sweep_results: + if s.get("stats"): + writer.writerow(s["stats"]) + + # Human-readable report + with open(run_dir / "report.md", "w", encoding="utf-8") as f: + f.write(_format_markdown_report(report)) + + return run_dir + + +def _format_markdown_report(report: dict[str, Any]) -> str: + """Format report as markdown.""" + lines = [ + "# HECTOR Performance Benchmark Report", + "", + f"**Target:** {report.get('config', {}).get('target', 'unknown')}", + f"**Timestamp:** {report.get('timestamp', 'unknown')}", + f"**Top-K:** {report.get('config', {}).get('top_k', 'unknown')}", + "", + ] + + # Profiling + prof = report.get("profiling", {}) + if prof and prof.get("total_requests"): + lines.extend([ + "## Profiling Results", + "", + f"| Metric | Value |", + ...]) + for key, val in prof.items(): + lines.append(f"| {key} | {val} |") + lines.append("") + + # Load test + load = report.get("load_test", {}) + if load and load.get("total_requests"): + lines.extend([ + "## Load Test Results", + "", + "| Metric | Value |", + "|--------|-------|", + ]) + for key, val in load.items(): + lines.append(f"| {key} | {val} |") + lines.append("") + + # Sweep + sweep = report.get("sweep", []) + if sweep: + lines.extend([ + "## Sweep Results", + "", + "| Concurrency | Top-K | Throughput (QPS) | Avg Latency (ms) | P95 Latency (ms) | Errors |", + "|-------------|-------|-------------------|-------------------|-------------------|--------|", + ]) + for s in sweep: + lines.append( + f"| {s.get('concurrency', '-')} | {s.get('top_k', '-')} | " + f"{s.get('throughput_qps', '-')} | {s.get('avg_latency_ms', '-')} | " + f"{s.get('p95_latency_ms', '-')} | {s.get('errors', '-')} |" + ) + lines.append("") + + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser( + description="HECTOR Performance Benchmark", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "-c", "--config", + required=True, + help="Path to YAML config file", + ) + parser.add_argument( + "--api-key", + default=os.getenv("HECTOR_API_KEY"), + help="HECTOR API key", + ) + parser.add_argument( + "--version", + action="version", + version="HECTOR Benchmark 1.0.0", + ) + + args = parser.parse_args() + + # Load config + cfg = load_config(args.config) + print(f"\n{'='*60}") + print(f" HECTOR Performance Benchmark") + print(f"{'='*60}") + print(f" Target: {cfg.target.url}") + print(f" Config: {args.config}") + print(f" Experiment: {cfg.output.experiment_name}") + print(f"{'='*60}") + + # Load queries + queries = load_queries(cfg.input.file) + print(f" Queries loaded: {len(queries)}") + + # Phase 1: Profiling + profiling = {"skipped": True} + if cfg.profiling.enabled: + profiling = run_profiling(cfg, queries, args.api_key) + if profiling.get("stats"): + s = profiling["stats"] + print(f"\n [PROFILING DONE]") + print(f" Avg: {s.get('avg_latency_ms', 0):.0f}ms | " + f"P95: {s.get('p95_latency_ms', 0):.0f}ms | " + f"Throughput: {s.get('successful', 0) / max(profiling.get('results', [{}]).__len__(), 1):.1f} QPS") + + # Phase 2: Load test (single point or sweep) + load_results = None + sweep_results = None + + if cfg.aiperf.enabled: + concurrencies = cfg.aiperf.concurrency if isinstance(cfg.aiperf.concurrency, list) else [cfg.aiperf.concurrency] + top_ks = cfg.rag.top_k if isinstance(cfg.rag.top_k, list) else [cfg.rag.top_k] + + if len(concurrencies) > 1 or len(top_ks) > 1: + sweep_results = run_sweep(cfg, queries, args.api_key) + else: + load_results = run_load_test(cfg, queries, concurrencies[0], args.api_key) + if load_results.get("stats"): + s = load_results["stats"] + print(f"\n [LOAD TEST DONE]") + print(f" Throughput: {s.get('throughput_qps', 0):.1f} QPS | " + f"Avg: {s.get('avg_latency_ms', 0):.0f}ms | " + f"P95: {s.get('p95_latency_ms', 0):.0f}ms | " + f"Errors: {s.get('error_rate', 0):.1f}%") + + # Generate report + report = generate_report(cfg, profiling, load_results, sweep_results) + + # Save results + run_dir = save_results(report, profiling, load_results, sweep_results, + cfg.output.dir, cfg.output.experiment_name) + + print(f"\n{'='*60}") + print(f" BENCHMARK COMPLETE") + print(f"{'='*60}") + print(f" Results saved to: {run_dir}") + print(f" Report: {run_dir / 'report.md'}") + print(f" JSON: {run_dir / 'report.json'}") + print(f"{'='*60}\n") + + +if __name__ == "__main__": + main() diff --git a/benchmark/sweep_comparison.py b/benchmark/sweep_comparison.py new file mode 100644 index 0000000..dd30490 --- /dev/null +++ b/benchmark/sweep_comparison.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +""" +Sweep Comparison Table + +Reads sweep_results.json and prints a formatted comparison table +showing how concurrency and top_k affect throughput and latency. + +Usage: + python benchmark/sweep_comparison.py benchmark/results/sweep_run_XXX/sweep_results.json +""" + +import argparse +import json +import sys +from pathlib import Path + + +def main(): + parser = argparse.ArgumentParser(description="Sweep Comparison Table") + parser.add_argument("sweep_json", help="Path to sweep_results.json") + args = parser.parse_args() + + with open(args.sweep_json, "r", encoding="utf-8") as f: + data = json.load(f) + + if not data: + print("No sweep results found.") + sys.exit(1) + + # Filter points with valid stats + points = [p for p in data if p.get("stats") and p["stats"].get("throughput_qps")] + + if not points: + print("No valid data points in sweep results.") + sys.exit(1) + + # Sort by concurrency then top_k + points.sort(key=lambda p: (p.get("concurrency", 0), p.get("top_k", 0))) + + print(f"\n{'='*85}") + print(f" SWEEP RESULTS COMPARISON") + print(f"{'='*85}") + print(f" {'Concurrency':>12s} {'Top-K':>6s} {'QPS':>8s} {'Avg(ms)':>8s} {'P95(ms)':>8s} {'Err%':>6s} {'Success':>8s}") + print(f" {'-'*12} {'-'*6} {'-'*8} {'-'*8} {'-'*8} {'-'*6} {'-'*8}") + + for p in points: + s = p["stats"] + cr = p.get("concurrency", s.get("concurrency", "-")) + tk = p.get("top_k", "-") + qps = s.get("throughput_qps", 0) + avg = s.get("avg_latency_ms", 0) + p95 = s.get("p95_latency_ms", 0) + err = s.get("error_rate", 0) + ok = s.get("successful", 0) + + print(f" {cr:>12d} {tk:>6d} {qps:>8.1f} {avg:>8.0f} {p95:>8.0f} {err:>5.1f}% {ok:>8d}") + + print(f"{'='*85}") + + # Find optimal point + optimal = max(points, key=lambda p: p["stats"].get("throughput_qps", 0)) + s = optimal["stats"] + print(f"\n Optimal: concurrency={optimal.get('concurrency')}, top_k={optimal.get('top_k')}") + print(f" Throughput: {s.get('throughput_qps', 0):.1f} QPS") + print(f" P95 Latency: {s.get('p95_latency_ms', 0):.0f}ms") + print(f" Error Rate: {s.get('error_rate', 0):.1f}%") + print() + + +if __name__ == "__main__": + main() diff --git a/core/orchestrator.py b/core/orchestrator.py index dea9c26..8927435 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -1,3 +1,5 @@ +import time + from core.router import HectorRouter from data.hybrid_retriever import HectorHybridRetriever @@ -26,28 +28,37 @@ def execute(self, query): 4. Verify (if enabled) """ # Step 1: Routing & Diagnostic + t0 = time.perf_counter() intent = self.router.get_route(query) if not isinstance(intent, dict): intent = self.router._fallback_intent() route = intent.get("route", "GENERAL") + route_ms = (time.perf_counter() - t0) * 1000 # Step 2: Legal Normalization (only for research) + t1 = time.perf_counter() normalized_query = query mappings = [] if route == "LEGAL_RESEARCH": normalized_query, mappings = self.router.normalize_query(query) + normalize_ms = (time.perf_counter() - t1) * 1000 # Step 3: Intelligence Generation + t2 = time.perf_counter() try: response, sources = self._generate_strategic_response( route, normalized_query, intent, mappings ) except Exception as e: return f"Strategic failure: {str(e)}" + generate_ms = (time.perf_counter() - t2) * 1000 # Step 4: Verification for Legal Research + verify_ms = 0 if route == "LEGAL_RESEARCH" and self.enable_verification and sources: + t3 = time.perf_counter() verification = self.verifier.verify_response(response, sources) + verify_ms = (time.perf_counter() - t3) * 1000 if verification.get("needs_correction"): response = verification.get("verified_response", response) @@ -60,8 +71,23 @@ def execute(self, query): ) response += verification_note + total_ms = (time.perf_counter() - t0) * 1000 + + # Attach timing metadata (used by benchmark and API response) + self._last_timing = { + "route_ms": round(route_ms, 1), + "normalize_ms": round(normalize_ms, 1), + "generate_ms": round(generate_ms, 1), + "verify_ms": round(verify_ms, 1), + "total_ms": round(total_ms, 1), + } + return response + def get_last_timing(self) -> dict: + """Return timing from the last execute() call.""" + return getattr(self, "_last_timing", {}) + def _generate_strategic_response(self, route, query, intent, mappings=None): """Internal logic to fetch either legal data or general scaling advice.""" hector_msg = intent.get("hector_response", "") diff --git a/requirements.txt b/requirements.txt index e578b97..59f6623 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,6 +32,7 @@ tqdm~=4.67.0 httpx~=0.28.0 requests~=2.32.0 regex~=2024.11.0 +pyyaml~=6.0.0 # --- Testing --- pytest~=8.3.0 diff --git a/tests/test_perf_benchmark.py b/tests/test_perf_benchmark.py new file mode 100644 index 0000000..43873d4 --- /dev/null +++ b/tests/test_perf_benchmark.py @@ -0,0 +1,249 @@ +""" +Performance regression tests for HECTOR's RAG pipeline. + +These tests validate the benchmarking framework itself and verify +that core operations complete within acceptable time bounds. +Tests run WITHOUT a live HECTOR server (mocked API). +""" + +import json +import os +import sys +import tempfile +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from benchmark.rag_benchmark import ( + BenchConfig, + AiperfConfig, + InputConfig, + OutputConfig, + ProfilingConfig, + RagConfig, + TargetConfig, + load_config, + load_queries, +) +from benchmark.adapters.hector_adapter import profile_query + + +# --------------------------------------------------------------------------- +# Config loading +# --------------------------------------------------------------------------- + +class TestBenchmarkConfig: + """Tests for benchmark YAML config loading.""" + + def test_load_quick_profile(self): + """Quick profile config loads without error.""" + cfg = load_config("benchmark/configs/quick_profile.yaml") + assert cfg.target.url == "http://localhost:8000" + assert cfg.profiling.enabled is True + assert cfg.aiperf.enabled is False + + def test_load_single_run(self): + """Single run config loads with aiperf enabled.""" + cfg = load_config("benchmark/configs/single_run.yaml") + assert cfg.aiperf.enabled is True + assert cfg.aiperf.concurrency == 5 + + def test_load_sweep(self): + """Sweep config loads with list-valued axes.""" + cfg = load_config("benchmark/configs/sweep.yaml") + assert isinstance(cfg.aiperf.concurrency, list) + assert isinstance(cfg.rag.top_k, list) + assert len(cfg.aiperf.concurrency) >= 3 + assert len(cfg.rag.top_k) >= 3 + + def test_config_defaults(self): + """BenchConfig has sensible defaults.""" + cfg = BenchConfig() + assert cfg.target.timeout_s == 60 + assert cfg.rag.top_k == 10 + assert cfg.profiling.warmup_requests == 3 + assert cfg.output.dir == "benchmark/results" + + +# --------------------------------------------------------------------------- +# Query loading +# --------------------------------------------------------------------------- + +class TestQueryLoading: + """Tests for JSONL query loading.""" + + def test_load_queries(self): + """Loads queries from benchmark/queries.jsonl.""" + queries = load_queries("benchmark/queries.jsonl") + assert len(queries) >= 20 + assert all(isinstance(q, str) for q in queries) + assert all(len(q) > 5 for q in queries) + + def test_load_queries_empty_file(self): + """Empty JSONL returns empty list.""" + with tempfile.TemporaryDirectory() as tmpdir: + p = Path(tmpdir) / "empty.jsonl" + p.write_text("", encoding="utf-8") + queries = load_queries(str(p)) + assert queries == [] + + def test_load_queries_skips_blank_lines(self): + """Blank lines are skipped.""" + with tempfile.TemporaryDirectory() as tmpdir: + p = Path(tmpdir) / "mixed.jsonl" + p.write_text('{"query": "test1"}\n\n\n{"query": "test2"}\n', encoding="utf-8") + queries = load_queries(str(p)) + assert queries == ["test1", "test2"] + + +# --------------------------------------------------------------------------- +# Metric computation (from evaluation module) +# --------------------------------------------------------------------------- + +class TestBenchmarkMetrics: + """Tests for performance metric computation.""" + + def test_latency_percentiles(self): + """Percentile computation is correct.""" + latencies = list(range(1, 101)) # 1..100 + latencies_sorted = sorted(latencies) + n = len(latencies_sorted) + p50 = latencies_sorted[int(n * 0.5)] + p95 = latencies_sorted[int(n * 0.95)] + + assert p50 >= 50 # ~50th percentile + assert p95 >= 95 # ~95th percentile + + def test_throughput_calculation(self): + """Throughput is requests / elapsed_seconds.""" + total_requests = 100 + elapsed_s = 20.0 + throughput = total_requests / elapsed_s + assert throughput == 5.0 + + def test_error_rate_calculation(self): + """Error rate is errors / total * 100.""" + errors = 5 + total = 100 + error_rate = errors / total * 100 + assert error_rate == 5.0 + + +# --------------------------------------------------------------------------- +# Adapter tests +# --------------------------------------------------------------------------- + +class TestHectorAdapter: + """Tests for the HECTOR benchmark adapter.""" + + def test_adapter_is_importable(self): + """hector_adapter module is importable.""" + from benchmark.adapters import hector_adapter + assert hasattr(hector_adapter, "profile_query") + + def test_adapter_handles_connection_error(self): + """Adapter handles connection errors gracefully.""" + with patch("benchmark.adapters.hector_adapter.requests.post") as mock_post: + mock_post.side_effect = ConnectionError("Connection refused") + with pytest.raises(ConnectionError): + profile_query("localhost", 99999, "test query") + + +# --------------------------------------------------------------------------- +# CLI entry points +# --------------------------------------------------------------------------- + +class TestBenchmarkCli: + """Tests for benchmark CLI modules.""" + + def test_rag_benchmark_importable(self): + """rag_benchmark module is importable.""" + import benchmark.rag_benchmark as mod + assert hasattr(mod, "main") + assert hasattr(mod, "run_profiling") + assert hasattr(mod, "run_load_test") + assert hasattr(mod, "run_sweep") + assert hasattr(mod, "generate_report") + assert hasattr(mod, "save_results") + + def test_sweep_comparison_importable(self): + """sweep_comparison module is importable.""" + import benchmark.sweep_comparison as mod + assert hasattr(mod, "main") + + def test_report_generation(self): + """generate_report produces valid structure.""" + import benchmark.rag_benchmark as mod + + cfg = BenchConfig() + profiling = { + "stats": {"avg_latency_ms": 100, "p95_latency_ms": 200}, + "results": [], + } + report = mod.generate_report(cfg, profiling, None, None) + + assert "config" in report + assert "profiling" in report + assert "timestamp" in report + assert report["profiling"]["avg_latency_ms"] == 100 + + def test_report_with_load_test(self): + """generate_report includes load test data.""" + import benchmark.rag_benchmark as mod + + cfg = BenchConfig() + profiling = {"stats": {}, "results": []} + load_results = { + "stats": { + "concurrency": 5, + "throughput_qps": 10.5, + "p95_latency_ms": 300, + }, + "results": [], + } + report = mod.generate_report(cfg, profiling, load_results, None) + + assert "load_test" in report + assert report["load_test"]["throughput_qps"] == 10.5 + + +# --------------------------------------------------------------------------- +# Regression thresholds +# --------------------------------------------------------------------------- + +class TestPerformanceThresholds: + """Verify core operations complete within acceptable time bounds.""" + + def test_config_load_is_fast(self): + """Config loading completes in < 100ms.""" + t0 = time.perf_counter() + for _ in range(10): + load_config("benchmark/configs/quick_profile.yaml") + elapsed = (time.perf_counter() - t0) * 1000 + assert elapsed < 100, f"Config loading took {elapsed:.0f}ms (>100ms)" + + def test_query_loading_is_fast(self): + """Query loading completes in < 50ms.""" + t0 = time.perf_counter() + for _ in range(10): + load_queries("benchmark/queries.jsonl") + elapsed = (time.perf_counter() - t0) * 1000 + assert elapsed < 50, f"Query loading took {elapsed:.0f}ms (>50ms)" + + def test_report_generation_is_fast(self): + """Report generation completes in < 200ms.""" + import benchmark.rag_benchmark as mod + + cfg = BenchConfig() + profiling = { + "stats": {"avg_latency_ms": 100, "p95_latency_ms": 200}, + "results": [{"latency_ms": 100, "status": "success"}] * 20, + } + t0 = time.perf_counter() + report = mod.generate_report(cfg, profiling, None, None) + elapsed = (time.perf_counter() - t0) * 1000 + assert elapsed < 200, f"Report generation took {elapsed:.0f}ms (>200ms)"