From 342245484ccc887b4a3422a5ee82564d1502a608 Mon Sep 17 00:00:00 2001 From: nadheesh Date: Thu, 18 Jun 2026 14:40:01 +0530 Subject: [PATCH 1/5] Paginate TraceFetcher.fetch_traces and add deterministic trace sampling --- evaluation-job/main.py | 13 +- evaluation-job/test_main.py | 87 ++++++++++ .../src/amp_evaluation/runner.py | 85 +++++----- .../src/amp_evaluation/trace/__init__.py | 4 +- .../src/amp_evaluation/trace/fetcher.py | 97 +++++++++-- libs/amp-evaluation/tests/test_eval_runner.py | 87 ++++++++++ .../tests/test_trace_fetcher.py | 155 ++++++++++++++++++ 7 files changed, 474 insertions(+), 54 deletions(-) create mode 100644 libs/amp-evaluation/tests/test_trace_fetcher.py diff --git a/evaluation-job/main.py b/evaluation-job/main.py index cef14d79b..ca0132d3d 100644 --- a/evaluation-job/main.py +++ b/evaluation-job/main.py @@ -555,6 +555,13 @@ def main() -> None: ) sys.exit(1) + if not 0 < args.sampling_rate <= 1: + logger.error( + "Invalid --sampling-rate: %s. Expected a value in (0, 1]", + args.sampling_rate, + ) + sys.exit(1) + # Parse evaluators JSON try: evaluators_config = json.loads(args.evaluators) @@ -654,7 +661,11 @@ def main() -> None: ) # Run evaluation - result = monitor.run(start_time=args.trace_start, end_time=args.trace_end) + result = monitor.run( + start_time=args.trace_start, + end_time=args.trace_end, + sample_rate=args.sampling_rate, + ) # Fail if there were errors (e.g. trace fetching failed) if result.errors: diff --git a/evaluation-job/test_main.py b/evaluation-job/test_main.py index d7449c8bc..5bbabce57 100644 --- a/evaluation-job/test_main.py +++ b/evaluation-job/test_main.py @@ -855,6 +855,93 @@ def test_full_flow_with_realistic_evaluators(self, mock_builtin, mock_publish): # publish_scores was called mock_publish.assert_called_once() + # sampling_rate default (1.0) is forwarded to Monitor.run() + mock_monitor_instance.run.assert_called_once_with( + start_time="2026-01-15T10:00:00Z", + end_time="2026-01-15T11:00:00Z", + sample_rate=1.0, + ) + + @patch("main.publish_scores", return_value=True) + @patch("main.builtin") + def test_custom_sampling_rate_forwarded_to_monitor_run(self, mock_builtin, mock_publish): + """A non-default --sampling-rate is forwarded to Monitor.run().""" + from main import main + + mock_run_result = MagicMock() + mock_run_result.traces_evaluated = 5 + mock_run_result.duration_seconds = 2.5 + mock_run_result.success = True + mock_run_result.errors = [] + mock_run_result.scores = { + "Latency": _make_evaluator_summary("Latency", "trace", [], {"mean": 0.9}), + } + + mock_monitor_instance = MagicMock() + mock_monitor_instance.run.return_value = mock_run_result + + evaluators = [ + { + "identifier": "latency_performance", + "displayName": "Latency", + "config": {"level": "trace"}, + } + ] + argv = self._make_argv(evaluators) + ["--sampling-rate", "0.3"] + + with ( + patch.object(sys, "argv", argv), + patch.dict( + "os.environ", + { + "IDP_TOKEN_URL": "http://thunder:8090/oauth2/token", + "IDP_CLIENT_ID": "test-client", + "IDP_CLIENT_SECRET": "test-secret", + }, + ), + patch("main.TraceFetcher"), + patch("main.Monitor", return_value=mock_monitor_instance), + pytest.raises(SystemExit) as exc_info, + ): + main() + + assert exc_info.value.code == 0 + mock_monitor_instance.run.assert_called_once_with( + start_time="2026-01-15T10:00:00Z", + end_time="2026-01-15T11:00:00Z", + sample_rate=0.3, + ) + + @pytest.mark.parametrize("bad_rate", ["0", "1.5", "-0.2"]) + def test_out_of_range_sampling_rate_exits(self, bad_rate): + """Should exit with code 1 when --sampling-rate is outside (0, 1].""" + from main import main + + evaluators = [ + { + "identifier": "latency_performance", + "displayName": "Latency", + "config": {"level": "trace"}, + } + ] + argv = self._make_argv(evaluators) + ["--sampling-rate", bad_rate] + + with ( + patch.object(sys, "argv", argv), + patch.dict( + "os.environ", + { + "IDP_TOKEN_URL": "http://thunder:8090/oauth2/token", + "IDP_CLIENT_ID": "test-client", + "IDP_CLIENT_SECRET": "test-secret", + }, + ), + pytest.raises(SystemExit) as exc_info, + ): + main() + + assert exc_info.value.code == 1 + def test_missing_idp_credentials_exits(self): """Should exit with code 1 when IDP credentials are not set.""" from main import main diff --git a/libs/amp-evaluation/src/amp_evaluation/runner.py b/libs/amp-evaluation/src/amp_evaluation/runner.py index aca756832..3caca0490 100644 --- a/libs/amp-evaluation/src/amp_evaluation/runner.py +++ b/libs/amp-evaluation/src/amp_evaluation/runner.py @@ -37,13 +37,13 @@ result = monitor.run() """ -from typing import List, Dict, Literal, Optional, Any, Union, TYPE_CHECKING +from typing import List, Dict, Iterable, Literal, Optional, Any, Union, TYPE_CHECKING from dataclasses import dataclass, field from datetime import datetime from abc import ABC, abstractmethod import logging -from .trace import Trace, parse_trace_for_evaluation, TraceFetcher, TraceLoader +from .trace import Trace, parse_trace_for_evaluation, TraceFetcher, TraceLoader, sample_traces from .trace.fetcher import OTELTrace, _safe_request_error from .evaluators.base import BaseEvaluator, validate_unique_evaluator_names from .evaluators.params import EvalMode @@ -277,7 +277,7 @@ def _get_fetcher(self) -> Any: return self._fetcher_instance - def _fetch_traces(self, start_time: str, end_time: str) -> List[OTELTrace]: + def _fetch_traces(self, start_time: str, end_time: str) -> Iterable[OTELTrace]: """Unified interface to fetch traces.""" fetcher = self._get_fetcher() @@ -334,11 +334,11 @@ def evaluate_trace( def _evaluate_traces( self, - traces: List[Trace], + traces: Iterable[Trace], tasks: Optional[Dict[str, Task]] = None, trial_info: Optional[Dict[str, str]] = None, ) -> RunResult: - """Internal method to evaluate a list of traces.""" + """Internal method to evaluate traces from a (possibly lazy) iterable.""" from .dataset import generate_id run_id = generate_id("run") @@ -356,20 +356,31 @@ def _evaluate_traces( ) scores_by_evaluator: Dict[str, List[EvaluatorScore]] = {e.name: [] for e in self._evaluators} - total_traces = len(traces) evaluator_names = [e.name for e in self._evaluators] logger.info( - "Starting evaluation: %d trace(s) x %d evaluator(s) %s", - total_traces, + "Starting evaluation: %d evaluator(s) %s", len(self._evaluators), evaluator_names, ) - for idx, trace in enumerate(traces, 1): + trace_iter = iter(traces) + idx = 0 + while True: + try: + trace = next(trace_iter) + except StopIteration: + break + except Exception as e: + error_msg = f"Error fetching/parsing trace: {e}" + result.errors.append(error_msg) + logger.error(error_msg) + break + + idx += 1 task = tasks.get(trace.trace_id) if tasks else None trial_id = trial_info.get(trace.trace_id) if trial_info else None - logger.info("Evaluating trace %d/%d trace_id=%s", idx, total_traces, trace.trace_id) + logger.info("Evaluating trace %d trace_id=%s", idx, trace.trace_id) try: trace_scores = self.evaluate_trace(trace, task, trial_id=trial_id) @@ -664,9 +675,11 @@ def _fetch_and_match_traces( expected_count = len(dataset.tasks) * self.trials_per_task - fetched_traces = self._fetch_traces( - start_time=fetch_start.isoformat(), - end_time=fetch_end.isoformat(), + fetched_traces = list( + self._fetch_traces( + start_time=fetch_start.isoformat(), + end_time=fetch_end.isoformat(), + ) ) logger.info(f"Fetched {len(fetched_traces)} traces from trace service (expected: {expected_count})") @@ -756,43 +769,33 @@ def run( start_time: Optional[str] = None, end_time: Optional[str] = None, traces: Optional[List[Trace]] = None, + sample_rate: Optional[float] = None, **kwargs: Any, ) -> RunResult: """ Run monitor evaluation. Provide traces directly OR specify time range to fetch. - """ - eval_traces: List[Trace] = [] - if traces: - eval_traces = traces - else: - try: - fetched = self._fetch_traces( - start_time=start_time or "", - end_time=end_time or "", - ) - for trace in fetched: - try: - eval_traces.append(parse_trace_for_evaluation(trace)) - except Exception as parse_error: - logger.error(f"Error parsing trace: {parse_error}") - continue - - except Exception as e: - error_msg = f"Failed to fetch traces: {_safe_request_error(e)}" - logger.error(error_msg, exc_info=True) + Args: + sample_rate: Optional fraction (0, 1] of fetched traces to evaluate, + deterministically sampled by traceId. Ignored if `traces` is provided. + """ - from .dataset import generate_id + def _iter_parsed_traces() -> Iterable[Trace]: + fetched = self._fetch_traces( + start_time=start_time or "", + end_time=end_time or "", + ) + if sample_rate is not None: + fetched = sample_traces(fetched, sample_rate) + for otel_trace in fetched: + try: + yield parse_trace_for_evaluation(otel_trace) + except Exception as parse_error: + logger.error(f"Error parsing trace: {parse_error}") - return RunResult( - run_id=generate_id("run"), - eval_mode=EvalMode.MONITOR, - started_at=datetime.now(), - completed_at=datetime.now(), - errors=[error_msg], - ) + eval_traces: Iterable[Trace] = traces if traces else _iter_parsed_traces() run_result = self._evaluate_traces( traces=eval_traces, diff --git a/libs/amp-evaluation/src/amp_evaluation/trace/__init__.py b/libs/amp-evaluation/src/amp_evaluation/trace/__init__.py index 1a99c95b8..300dce204 100644 --- a/libs/amp-evaluation/src/amp_evaluation/trace/__init__.py +++ b/libs/amp-evaluation/src/amp_evaluation/trace/__init__.py @@ -13,6 +13,7 @@ ... parse_trace_for_evaluation, # Parser ... parse_traces_for_evaluation, ... TraceFetcher, TraceLoader, # Fetch traces from platform or files + ... sample_traces, # Deterministic client-side sampling ... ) """ @@ -56,7 +57,7 @@ ) # Fetcher -from .fetcher import TraceFetcher, TraceLoader +from .fetcher import TraceFetcher, TraceLoader, sample_traces __all__ = [ @@ -95,4 +96,5 @@ # Fetchers "TraceFetcher", "TraceLoader", + "sample_traces", ] diff --git a/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py b/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py index 5cf751c65..b22453d7c 100644 --- a/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py +++ b/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py @@ -33,10 +33,11 @@ from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import List, Optional, Dict, Any, Callable +from typing import List, Optional, Dict, Any, Callable, Iterable, Iterator, Tuple from amp_evaluation.trace.models import ToolDefinition from pathlib import Path +import hashlib import json import logging import requests @@ -424,18 +425,16 @@ def _get_auth_headers(self) -> Dict[str, str]: token = self.token_provider() return {"Authorization": f"Bearer {token}"} - def fetch_traces(self, start_time: str, end_time: str) -> List[OTELTrace]: + def _fetch_page( + self, start_time: str, end_time: str, limit: int, sort_order: str = "asc" + ) -> Tuple[List[OTELTrace], int]: """ - Fetch traces from the trace service using /traces/export endpoint. - - Args: - start_time: Start time in ISO 8601 format (e.g., "2025-12-16T06:58:02.433Z") - end_time: End time in ISO 8601 format + Fetch a single page from /traces/export. Returns: - List of Trace objects with OTEL/AMP attributes + Tuple of (traces in this page, totalCount reported by the API for the + full start_time..end_time range). """ - try: headers = self._get_auth_headers() response = requests.get( @@ -447,6 +446,8 @@ def fetch_traces(self, start_time: str, end_time: str) -> List[OTELTrace]: "project": self.project, "agent": self.agent, "environment": self.environment, + "limit": str(limit), + "sortOrder": sort_order, }, headers=headers, timeout=self.timeout, @@ -454,14 +455,63 @@ def fetch_traces(self, start_time: str, end_time: str) -> List[OTELTrace]: response.raise_for_status() data = response.json() - # Parse TraceExportResponse traces_data = data.get("traces", []) - return [_parse_trace(t) for t in traces_data] + return [_parse_trace(t) for t in traces_data], data.get("totalCount", len(traces_data)) except requests.exceptions.RequestException as e: logger.error("Failed to fetch traces: %s", _safe_request_error(e)) raise + def fetch_traces( + self, + start_time: str, + end_time: str, + page_size: int = 1000, + max_traces: Optional[int] = None, + ) -> Iterator[OTELTrace]: + """ + Fetch traces from the trace service using /traces/export endpoint. + + /traces/export caps each response at `page_size` traces (max 1000) and has no + cursor, so this walks the time window in ascending order, re-querying from the + last seen trace's startTime until the range is exhausted. Traces are yielded + lazily as each page comes in. + + Args: + start_time: Start time in ISO 8601 format (e.g., "2025-12-16T06:58:02.433Z") + end_time: End time in ISO 8601 format + page_size: Max traces to request per page (API max is 1000) + max_traces: Optional cap on the total number of traces to yield + + Yields: + Trace objects with OTEL/AMP attributes, in ascending startTime order + """ + seen_ids: set = set() + cursor_start = start_time + yielded = 0 + + while True: + page, _total_count = self._fetch_page(cursor_start, end_time, limit=page_size, sort_order="asc") + if not page: + return + + new_traces = [t for t in page if t.traceId not in seen_ids] + if not new_traces: + # Entire page already seen: a startTime tie spans more than one page. + return + + for trace in new_traces: + seen_ids.add(trace.traceId) + yield trace + yielded += 1 + if max_traces is not None and yielded >= max_traces: + return + + if len(page) < page_size: + return + + cursor_start = page[-1].startTime + def fetch_trace_by_id(self, trace_id: str) -> Optional[OTELTrace]: """ Fetch a single trace by its ID using /trace endpoint. @@ -532,6 +582,31 @@ def health_check(self) -> bool: return False +# ============================================================================ +# Sampling +# ============================================================================ + + +def sample_traces(traces: Iterable[OTELTrace], sample_rate: float) -> Iterator[OTELTrace]: + """ + Deterministically sample traces by hashing traceId, so the same trace is always + included or excluded for a given sample_rate regardless of fetch order or page + boundaries. Composes with any iterable (fetched lazily or already in memory). + + Args: + traces: Traces to sample from + sample_rate: Fraction to keep, in (0, 1] + """ + if not 0 < sample_rate <= 1: + raise ValueError(f"sample_rate must be in (0, 1], got {sample_rate}") + + threshold = int(sample_rate * 10_000) + for trace in traces: + digest = hashlib.sha256(trace.traceId.encode()).hexdigest() + if int(digest, 16) % 10_000 < threshold: + yield trace + + # ============================================================================ # Trace Loader (for loading traces from files) # ============================================================================ diff --git a/libs/amp-evaluation/tests/test_eval_runner.py b/libs/amp-evaluation/tests/test_eval_runner.py index 96649e5a4..8a62c142a 100644 --- a/libs/amp-evaluation/tests/test_eval_runner.py +++ b/libs/amp-evaluation/tests/test_eval_runner.py @@ -176,6 +176,93 @@ def solo_eval(trace: Trace) -> EvalResult: assert runner.evaluator_names == ["solo_eval"] +# ============================================================================ +# TESTS: Monitor fetch -> sample -> evaluate pipeline (lazy, via trace_fetcher) +# ============================================================================ + + +class _FakeFetcher: + """Stands in for a TraceFetcher: records the call args and yields lazily, + like the real generator-based TraceFetcher.fetch_traces does.""" + + def __init__(self, otel_traces): + self._otel_traces = otel_traces + self.calls = [] + + def fetch_traces(self, start_time, end_time): + self.calls.append((start_time, end_time)) + return iter(self._otel_traces) + + +@pytest.fixture +def otel_traces(): + """Raw OTELTrace objects (pre-parse), as a fetcher would return them.""" + loader = TraceLoader(file_path=str(FIXTURES_DIR / "sample_traces.json")) + return loader.load_traces() + + +class TestMonitorFetchAndSamplePipeline: + """Monitor.run() should fetch via the configured fetcher, optionally sample, + then parse/evaluate lazily -- without requiring an explicit `traces=` list.""" + + def test_fetches_via_time_range_and_evaluates_all(self, otel_traces): + evaluated_ids = [] + + @evaluator(name="record_trace_id") + def record_trace_id(trace: Trace) -> EvalResult: + evaluated_ids.append(trace.trace_id) + return EvalResult(score=1.0) + + fetcher = _FakeFetcher(otel_traces) + runner = Monitor(evaluators=[record_trace_id], trace_fetcher=fetcher) + + result = runner.run(start_time="2026-02-01T00:00:00Z", end_time="2026-02-10T00:00:00Z") + + assert fetcher.calls == [("2026-02-01T00:00:00Z", "2026-02-10T00:00:00Z")] + assert result.traces_evaluated == len(otel_traces) + assert sorted(evaluated_ids) == sorted(t.traceId for t in otel_traces) + + def test_sample_rate_filters_traces_before_evaluation(self, otel_traces): + from amp_evaluation.trace.fetcher import sample_traces as deterministic_sample + + expected_ids = {t.traceId for t in deterministic_sample(otel_traces, sample_rate=0.5)} + # Sanity: the fixture set is small, so make sure the sample isn't trivially + # "all" or "none" -- otherwise this test wouldn't actually exercise filtering. + assert 0 < len(expected_ids) < len(otel_traces) + + evaluated_ids = [] + + @evaluator(name="record_trace_id") + def record_trace_id(trace: Trace) -> EvalResult: + evaluated_ids.append(trace.trace_id) + return EvalResult(score=1.0) + + fetcher = _FakeFetcher(otel_traces) + runner = Monitor(evaluators=[record_trace_id], trace_fetcher=fetcher) + + result = runner.run(start_time="s", end_time="e", sample_rate=0.5) + + assert set(evaluated_ids) == expected_ids + assert result.traces_evaluated == len(expected_ids) + + def test_explicit_traces_param_bypasses_fetcher_and_sampling(self, sample_traces): + evaluated_ids = [] + + @evaluator(name="record_trace_id") + def record_trace_id(trace: Trace) -> EvalResult: + evaluated_ids.append(trace.trace_id) + return EvalResult(score=1.0) + + fetcher = _FakeFetcher(otel_traces=[]) + runner = Monitor(evaluators=[record_trace_id], trace_fetcher=fetcher) + + result = runner.run(traces=sample_traces, sample_rate=0.1) + + assert fetcher.calls == [] # fetcher never invoked + assert result.traces_evaluated == len(sample_traces) + assert sorted(evaluated_ids) == sorted(t.trace_id for t in sample_traces) + + # ============================================================================ # REGRESSION TESTS: Bugs found during code review # ============================================================================ diff --git a/libs/amp-evaluation/tests/test_trace_fetcher.py b/libs/amp-evaluation/tests/test_trace_fetcher.py new file mode 100644 index 000000000..9da3c1d80 --- /dev/null +++ b/libs/amp-evaluation/tests/test_trace_fetcher.py @@ -0,0 +1,155 @@ +# Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). +# +# WSO2 LLC. licenses this file to you under the Apache License, +# Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Unit tests for TraceFetcher pagination and sample_traces. +""" + +from typing import List +from unittest.mock import patch, MagicMock + +import pytest + +from amp_evaluation.trace.fetcher import TraceFetcher, sample_traces + + +def _raw_trace(trace_id: str, start_time: str) -> dict: + return { + "traceId": trace_id, + "rootSpanId": f"{trace_id}-root", + "rootSpanName": "root", + "startTime": start_time, + "endTime": start_time, + "spans": [], + } + + +def _make_fetcher() -> TraceFetcher: + return TraceFetcher( + base_url="http://localhost:8001", + organization="org", + project="proj", + agent="agent", + environment="dev", + token_provider=lambda: "test-token", + ) + + +def _mock_response(traces: List[dict], total_count: int) -> MagicMock: + resp = MagicMock() + resp.raise_for_status.return_value = None + resp.json.return_value = {"traces": traces, "totalCount": total_count, "truncated": False} + return resp + + +class TestFetchTracesPagination: + def test_paginates_across_multiple_pages(self): + fetcher = _make_fetcher() + + page1 = [_raw_trace(f"t{i}", f"2026-01-01T00:00:{i:02d}Z") for i in range(0, 50)] + page2 = [_raw_trace(f"t{i}", f"2026-01-01T00:01:{i - 50:02d}Z") for i in range(50, 100)] + page3 = [_raw_trace(f"t{i}", f"2026-01-01T00:02:{i - 100:02d}Z") for i in range(100, 125)] + + responses = [ + _mock_response(page1, total_count=125), + _mock_response(page2, total_count=125), + _mock_response(page3, total_count=125), + ] + + with patch("requests.get", side_effect=responses) as mock_get: + result = list( + fetcher.fetch_traces(start_time="2026-01-01T00:00:00Z", end_time="2026-01-01T01:00:00Z", page_size=50) + ) + + assert [t.traceId for t in result] == [f"t{i}" for i in range(125)] + assert mock_get.call_count == 3 + # last call advanced the cursor to the last trace's startTime of the prior page + assert mock_get.call_args_list[1].kwargs["params"]["startTime"] == page1[-1]["startTime"] + assert mock_get.call_args_list[2].kwargs["params"]["startTime"] == page2[-1]["startTime"] + + def test_stops_when_page_smaller_than_page_size(self): + fetcher = _make_fetcher() + page = [_raw_trace("t0", "2026-01-01T00:00:00Z")] + + with patch("requests.get", return_value=_mock_response(page, total_count=1)) as mock_get: + result = list(fetcher.fetch_traces(start_time="s", end_time="e", page_size=50)) + + assert [t.traceId for t in result] == ["t0"] + assert mock_get.call_count == 1 + + def test_dedupes_traces_tied_at_page_boundary(self): + """If multiple traces share the exact boundary startTime, the next page's + re-query (from that same startTime) may re-return them. They must not be + yielded twice, and a page that returns nothing new must stop iteration.""" + fetcher = _make_fetcher() + + tie_time = "2026-01-01T00:00:00Z" + page1 = [_raw_trace("t0", tie_time), _raw_trace("t1", tie_time)] + # Re-querying from tie_time returns the same two traces again (no new ones). + page2 = [_raw_trace("t0", tie_time), _raw_trace("t1", tie_time)] + + with patch("requests.get", side_effect=[_mock_response(page1, 2), _mock_response(page2, 2)]) as mock_get: + result = list(fetcher.fetch_traces(start_time="s", end_time="e", page_size=2)) + + assert [t.traceId for t in result] == ["t0", "t1"] + assert mock_get.call_count == 2 + + def test_max_traces_stops_early(self): + fetcher = _make_fetcher() + page = [_raw_trace(f"t{i}", f"2026-01-01T00:00:{i:02d}Z") for i in range(10)] + + with patch("requests.get", return_value=_mock_response(page, total_count=10)): + result = list(fetcher.fetch_traces(start_time="s", end_time="e", page_size=10, max_traces=3)) + + assert [t.traceId for t in result] == ["t0", "t1", "t2"] + + def test_empty_page_stops_iteration(self): + fetcher = _make_fetcher() + + with patch("requests.get", return_value=_mock_response([], total_count=0)) as mock_get: + result = list(fetcher.fetch_traces(start_time="s", end_time="e")) + + assert result == [] + assert mock_get.call_count == 1 + + +class TestSampleTraces: + def test_deterministic_across_runs(self): + traces = [MagicMock(traceId=f"trace-{i}") for i in range(500)] + + first = [t.traceId for t in sample_traces(traces, sample_rate=0.3)] + second = [t.traceId for t in sample_traces(traces, sample_rate=0.3)] + + assert first == second + assert len(first) > 0 + + def test_approximate_rate_over_large_set(self): + traces = [MagicMock(traceId=f"trace-{i}") for i in range(5000)] + + kept = list(sample_traces(traces, sample_rate=0.2)) + + ratio = len(kept) / len(traces) + assert 0.15 < ratio < 0.25 + + def test_sample_rate_one_keeps_everything(self): + traces = [MagicMock(traceId=f"trace-{i}") for i in range(20)] + assert list(sample_traces(traces, sample_rate=1.0)) == traces + + def test_invalid_sample_rate_raises(self): + with pytest.raises(ValueError): + list(sample_traces([], sample_rate=0)) + with pytest.raises(ValueError): + list(sample_traces([], sample_rate=1.5)) From 2feed7ac7111b6bbf32e97aefcc3989f909ea7dc Mon Sep 17 00:00:00 2001 From: nadheesh Date: Mon, 22 Jun 2026 10:17:37 +0530 Subject: [PATCH 2/5] Make TraceFetcher page size configurable and memory-bound --- evaluation-job/main.py | 8 +++- evaluation-job/test_main.py | 6 ++- .../src/amp_evaluation/trace/fetcher.py | 20 +++++++--- .../tests/test_trace_fetcher.py | 39 +++++++++++++++++++ 4 files changed, 65 insertions(+), 8 deletions(-) diff --git a/evaluation-job/main.py b/evaluation-job/main.py index ca0132d3d..0dbc9529d 100644 --- a/evaluation-job/main.py +++ b/evaluation-job/main.py @@ -55,6 +55,11 @@ PUBLISH_MAX_RETRIES = 3 PUBLISH_INITIAL_BACKOFF = 2 # seconds +# Max traces per /traces/export page. Kept small and deliberate -- each page is fully +# parsed into memory at once, so this bounds peak memory regardless of how many traces +# match the monitor's time window. Not exposed as a CLI flag; tune this constant directly. +TRACE_FETCH_PAGE_SIZE = 10 + class OAuth2TokenManager: """Manages OAuth2 client_credentials tokens with caching.""" @@ -529,7 +534,7 @@ def main() -> None: logger.info("Configured LLM client to route through OpenAI-compatible gateway at %s", llm_api_base) logger.info( - "Starting monitor evaluation monitor=%s organization=%s project=%s agent=%s env=%s time_range=%s..%s sampling=%.1f", + "Starting monitor evaluation monitor=%s organization=%s project=%s agent=%s env=%s time_range=%s..%s sampling=%.2f", args.monitor_name, args.organization, args.project, @@ -653,6 +658,7 @@ def main() -> None: agent=args.agent, environment=args.environment, token_provider=token_manager.get_token, + page_size=TRACE_FETCH_PAGE_SIZE, ) monitor = Monitor( diff --git a/evaluation-job/test_main.py b/evaluation-job/test_main.py index 5bbabce57..b7327d7ae 100644 --- a/evaluation-job/test_main.py +++ b/evaluation-job/test_main.py @@ -36,6 +36,7 @@ validate_time_format, publish_scores, OAuth2TokenManager, + TRACE_FETCH_PAGE_SIZE, _eval_template, _load_custom_code_evaluator, ) @@ -834,7 +835,7 @@ def test_full_flow_with_realistic_evaluators(self, mock_builtin, mock_publish): "IDP_CLIENT_SECRET": "test-secret", }, ), - patch("main.TraceFetcher"), + patch("main.TraceFetcher") as mock_trace_fetcher, patch("main.Monitor", return_value=mock_monitor_instance), pytest.raises(SystemExit) as exc_info, ): @@ -862,6 +863,9 @@ def test_full_flow_with_realistic_evaluators(self, mock_builtin, mock_publish): sample_rate=1.0, ) + # page_size is set on the TraceFetcher via the in-code constant, not a CLI flag + assert mock_trace_fetcher.call_args.kwargs["page_size"] == TRACE_FETCH_PAGE_SIZE + @patch("main.publish_scores", return_value=True) @patch("main.builtin") def test_custom_sampling_rate_forwarded_to_monitor_run(self, mock_builtin, mock_publish): diff --git a/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py b/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py index b22453d7c..7ef271a32 100644 --- a/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py +++ b/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py @@ -386,6 +386,7 @@ def __init__( environment: str, token_provider: Optional[Callable[[], str]] = None, timeout: int = 30, + page_size: int = 10, ): """ Initialize trace fetcher. @@ -398,6 +399,8 @@ def __init__( environment: Environment name (required) token_provider: Callable that returns a JWT token for authentication (required) timeout: Request timeout in seconds + page_size: Default max traces per /traces/export page (memory-bound; API + allows up to 1000). Override per-call via fetch_traces(page_size=...). """ if not base_url: raise ValueError("base_url is required") @@ -419,6 +422,7 @@ def __init__( self.environment = environment self.token_provider = token_provider self.timeout = timeout + self.page_size = page_size def _get_auth_headers(self) -> Dict[str, str]: """Get authorization headers with a fresh JWT token.""" @@ -466,26 +470,30 @@ def fetch_traces( self, start_time: str, end_time: str, - page_size: int = 1000, + page_size: Optional[int] = None, max_traces: Optional[int] = None, ) -> Iterator[OTELTrace]: """ Fetch traces from the trace service using /traces/export endpoint. - /traces/export caps each response at `page_size` traces (max 1000) and has no - cursor, so this walks the time window in ascending order, re-querying from the - last seen trace's startTime until the range is exhausted. Traces are yielded - lazily as each page comes in. + /traces/export caps each response at `page_size` traces (API allows up to 1000) + and has no cursor, so this walks the time window in ascending order, re-querying + from the last seen trace's startTime until the range is exhausted. Traces are + yielded lazily as each page comes in, so at most `page_size` parsed trace objects + (each with nested spans/payloads) are held in memory at once — keep this small + rather than maxing it out at 1000. Args: start_time: Start time in ISO 8601 format (e.g., "2025-12-16T06:58:02.433Z") end_time: End time in ISO 8601 format - page_size: Max traces to request per page (API max is 1000) + page_size: Max traces to request per page (memory-bound; API allows up to + 1000). Defaults to the page_size set on this fetcher instance. max_traces: Optional cap on the total number of traces to yield Yields: Trace objects with OTEL/AMP attributes, in ascending startTime order """ + page_size = page_size if page_size is not None else self.page_size seen_ids: set = set() cursor_start = start_time yielded = 0 diff --git a/libs/amp-evaluation/tests/test_trace_fetcher.py b/libs/amp-evaluation/tests/test_trace_fetcher.py index 9da3c1d80..694813eaa 100644 --- a/libs/amp-evaluation/tests/test_trace_fetcher.py +++ b/libs/amp-evaluation/tests/test_trace_fetcher.py @@ -125,6 +125,45 @@ def test_empty_page_stops_iteration(self): assert result == [] assert mock_get.call_count == 1 + def test_constructor_page_size_used_when_not_overridden(self): + fetcher = TraceFetcher( + base_url="http://localhost:8001", + organization="org", + project="proj", + agent="agent", + environment="dev", + token_provider=lambda: "test-token", + page_size=25, + ) + page = [_raw_trace("t0", "2026-01-01T00:00:00Z")] + + with patch("requests.get", return_value=_mock_response(page, total_count=1)) as mock_get: + list(fetcher.fetch_traces(start_time="s", end_time="e")) + + assert mock_get.call_args.kwargs["params"]["limit"] == "25" + + def test_per_call_page_size_overrides_constructor_default(self): + fetcher = TraceFetcher( + base_url="http://localhost:8001", + organization="org", + project="proj", + agent="agent", + environment="dev", + token_provider=lambda: "test-token", + page_size=25, + ) + page = [_raw_trace("t0", "2026-01-01T00:00:00Z")] + + with patch("requests.get", return_value=_mock_response(page, total_count=1)) as mock_get: + list(fetcher.fetch_traces(start_time="s", end_time="e", page_size=5)) + + assert mock_get.call_args.kwargs["params"]["limit"] == "5" + + def test_default_page_size_is_small(self): + """The constructor default must stay memory-bound, not the API's 1000 max.""" + fetcher = _make_fetcher() + assert fetcher.page_size == 10 + class TestSampleTraces: def test_deterministic_across_runs(self): From a0649d95197e33bb52ea167a9f96a9ee32a0a668 Mon Sep 17 00:00:00 2001 From: nadheesh Date: Mon, 22 Jun 2026 10:18:04 +0530 Subject: [PATCH 3/5] Add sampling rate slider to monitor create form --- .../eval/src/CreateMonitor.Component.tsx | 6 ++-- .../workspaces/pages/eval/src/form/schema.ts | 4 +-- .../src/subComponents/CreateMonitorForm.tsx | 28 +++++++++++++++++++ 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/console/workspaces/pages/eval/src/CreateMonitor.Component.tsx b/console/workspaces/pages/eval/src/CreateMonitor.Component.tsx index a0794a752..787dd8b5c 100644 --- a/console/workspaces/pages/eval/src/CreateMonitor.Component.tsx +++ b/console/workspaces/pages/eval/src/CreateMonitor.Component.tsx @@ -82,7 +82,7 @@ export const CreateMonitorComponent: React.FC = () => { const samplingRatePercent = sourceMonitor.samplingRate !== undefined ? Math.min(100, Math.max(0, Math.round(sourceMonitor.samplingRate * 100))) - : 25; + : 100; return { displayName, name: slugifyMonitorName(displayName), @@ -107,7 +107,7 @@ export const CreateMonitorComponent: React.FC = () => { traceStart: defaultTimeRange.start, traceEnd: defaultTimeRange.end, intervalMinutes: 60, - samplingRate: 25, + samplingRate: 100, evaluators: [], }; }, [duplicateFrom, sourceMonitor, defaultTimeRange, envId]); @@ -150,7 +150,7 @@ export const CreateMonitorComponent: React.FC = () => { ? values.traceStart.toISOString() : undefined, traceEnd: values.traceEnd ? values.traceEnd.toISOString() : undefined, - samplingRate: (values.samplingRate ?? 0) / 100, + samplingRate: (values.samplingRate ?? 100) / 100, }; createMonitor(payload, { diff --git a/console/workspaces/pages/eval/src/form/schema.ts b/console/workspaces/pages/eval/src/form/schema.ts index f828175fb..a60fd781c 100644 --- a/console/workspaces/pages/eval/src/form/schema.ts +++ b/console/workspaces/pages/eval/src/form/schema.ts @@ -84,9 +84,9 @@ export const createMonitorSchema = z .refine( (value) => value === undefined || - (Number.isInteger(value) && value >= 0 && value <= 100), + (Number.isInteger(value) && value > 0 && value <= 100), { - message: "Sampling rate must be between 0 and 100", + message: "Sampling rate must be between 1 and 100", }, ) .optional(), diff --git a/console/workspaces/pages/eval/src/subComponents/CreateMonitorForm.tsx b/console/workspaces/pages/eval/src/subComponents/CreateMonitorForm.tsx index 7c849693f..db5cb29d5 100644 --- a/console/workspaces/pages/eval/src/subComponents/CreateMonitorForm.tsx +++ b/console/workspaces/pages/eval/src/subComponents/CreateMonitorForm.tsx @@ -23,6 +23,7 @@ import { Form, MenuItem, Select, + Slider, TextField, Typography, } from "@wso2/oxygen-ui"; @@ -31,6 +32,8 @@ import type { MonitorType } from "@agent-management-platform/types"; import type { CreateMonitorFormValues } from "../form/schema"; import { getMonitorTypeFieldPatch } from "../utils/monitorFormUtils"; +const SAMPLING_RATE_MARKS = [{ value: 100, label: "100%" }]; + export interface EnvironmentOption { name: string; displayName?: string; @@ -244,6 +247,31 @@ export function CreateMonitorForm({ + + + + onFieldChange( + "samplingRate", + Array.isArray(sliderValue) ? sliderValue[0] : sliderValue, + ) + } + /> + + {errors.samplingRate ?? + "Percentage of matching traces to evaluate"} + + + ); From 4b10b278cab8b8dc6feeeefaaffa644e7464398d Mon Sep 17 00:00:00 2001 From: nadheesh Date: Mon, 22 Jun 2026 10:31:37 +0530 Subject: [PATCH 4/5] Add compare view for monitors --- .../workspaces/core-ui/src/Route/Route.tsx | 5 + .../workspaces/core-ui/src/pages/index.tsx | 2 + .../types/src/routes/generated-route.map.ts | 5 + .../libs/types/src/routes/routes.map.ts | 5 + .../eval/src/CompareMonitor.Component.tsx | 612 ++++++++++++++++++ .../pages/eval/src/ViewMonitor.Component.tsx | 128 +++- console/workspaces/pages/eval/src/index.ts | 8 + .../subComponents/AgentPerformanceCard.tsx | 39 +- .../subComponents/EvaluationSummaryCard.tsx | 4 +- .../pages/eval/src/utils/monitorScoreUtils.ts | 60 ++ 10 files changed, 824 insertions(+), 44 deletions(-) create mode 100644 console/workspaces/pages/eval/src/CompareMonitor.Component.tsx create mode 100644 console/workspaces/pages/eval/src/utils/monitorScoreUtils.ts diff --git a/console/workspaces/core-ui/src/Route/Route.tsx b/console/workspaces/core-ui/src/Route/Route.tsx index 099626d8e..72dfe41c7 100644 --- a/console/workspaces/core-ui/src/Route/Route.tsx +++ b/console/workspaces/core-ui/src/Route/Route.tsx @@ -56,6 +56,7 @@ import { LazyCreateMonitorComponent, LazyViewMonitorComponent, LazyEditMonitorComponent, + LazyCompareMonitorComponent, } from "../pages"; import { LoadingFallback } from "../components/LoadingFallback"; import { relativeRouteMap } from "@agent-management-platform/types"; @@ -548,6 +549,10 @@ export function RootRouter() { path={monitorBase + "/" + monitorRoutes.children.edit.path} element={} /> + } + /> } diff --git a/console/workspaces/core-ui/src/pages/index.tsx b/console/workspaces/core-ui/src/pages/index.tsx index f2628ecfa..25bcc10d9 100644 --- a/console/workspaces/core-ui/src/pages/index.tsx +++ b/console/workspaces/core-ui/src/pages/index.tsx @@ -85,6 +85,8 @@ export const LazyEditMonitorComponent = evalMetadata.pages.organization.editMonitor.component as FC; export const LazyViewMonitorComponent = evalMetadata.pages.organization.viewMonitor.component as FC; +export const LazyCompareMonitorComponent = + evalMetadata.pages.organization.compareMonitor.component as FC; // LLM Providers export const LazyLLMProvidersOrg = llmProvidersMetadata.levels!.organization as FC; diff --git a/console/workspaces/libs/types/src/routes/generated-route.map.ts b/console/workspaces/libs/types/src/routes/generated-route.map.ts index 98043000e..cf5ac3869 100644 --- a/console/workspaces/libs/types/src/routes/generated-route.map.ts +++ b/console/workspaces/libs/types/src/routes/generated-route.map.ts @@ -304,6 +304,11 @@ export const generatedRouteMap = { "path": "/org/:orgId/project/:projectId/agents/:agentId/environment/:envId/evaluation/monitor/edit/:monitorId", "wildPath": "/org/:orgId/project/:projectId/agents/:agentId/environment/:envId/evaluation/monitor/edit/:monitorId/*", "children": {} + }, + "compare": { + "path": "/org/:orgId/project/:projectId/agents/:agentId/environment/:envId/evaluation/monitor/compare/:monitorId", + "wildPath": "/org/:orgId/project/:projectId/agents/:agentId/environment/:envId/evaluation/monitor/compare/:monitorId/*", + "children": {} } } } diff --git a/console/workspaces/libs/types/src/routes/routes.map.ts b/console/workspaces/libs/types/src/routes/routes.map.ts index 82275919f..02e766f3c 100644 --- a/console/workspaces/libs/types/src/routes/routes.map.ts +++ b/console/workspaces/libs/types/src/routes/routes.map.ts @@ -323,6 +323,11 @@ export const rootRouteMap: AppRoute = { path: 'edit/:monitorId', index: true, children: {}, + }, + compare: { + path: 'compare/:monitorId', + index: true, + children: {}, } }, } diff --git a/console/workspaces/pages/eval/src/CompareMonitor.Component.tsx b/console/workspaces/pages/eval/src/CompareMonitor.Component.tsx new file mode 100644 index 000000000..7e7a7eb35 --- /dev/null +++ b/console/workspaces/pages/eval/src/CompareMonitor.Component.tsx @@ -0,0 +1,612 @@ +/** + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React, { useMemo, useState } from "react"; +import { formatTraceWindow, PageLayout } from "@agent-management-platform/views"; +import { + Alert, + Card, + CardContent, + Chip, + Grid, + Menu, + MenuItem, + Select, + Skeleton, + Stack, + Typography, + useTheme, +} from "@wso2/oxygen-ui"; +import { ChevronDown } from "@wso2/oxygen-ui-icons-react"; +import { + generatePath, + useNavigate, + useParams, + useSearchParams, +} from "react-router-dom"; +import { + absoluteRouteMap, + type EvaluationLevel, + type EvaluatorScoreSummary, + type MonitorResponse, + TraceListTimeRange, +} from "@agent-management-platform/types"; +import { + useGetMonitor, + useListMonitors, + useMonitorScores, +} from "@agent-management-platform/api-client"; +import AgentPerformanceCard, { + type RadarDataPoint, + type RadarDefinition, +} from "./subComponents/AgentPerformanceCard"; +import EvaluationSummaryCard from "./subComponents/EvaluationSummaryCard"; +import ScoreChip from "./subComponents/ScoreChip"; +import { LEVEL_CONFIG, levelChipSx } from "./subComponents/levelConfig"; +import { + computeAverageScore, + computeLevelSummaries, + getMean, +} from "./utils/monitorScoreUtils"; + +const MONITOR_TIME_RANGE_OPTIONS = [ + { value: TraceListTimeRange.ONE_DAY, label: "Last 1 Day" }, + { value: TraceListTimeRange.THREE_DAYS, label: "Last 3 Days" }, + { value: TraceListTimeRange.SEVEN_DAYS, label: "Last 7 Days" }, + { value: TraceListTimeRange.THIRTY_DAYS, label: "Last 30 Days" }, +]; + +interface CompareRadarPoint extends RadarDataPoint { + source: number | null; + target: number | null; + _isNoDataSource: boolean; + _isNoDataTarget: boolean; + _scoredCountSource: number; + _totalCountSource: number; + _scoredCountTarget: number; + _totalCountTarget: number; +} + +/** Per-monitor score query params: past monitors use their own fixed trace window. */ +function buildScoreQueryParams( + monitor: MonitorResponse | undefined, + timeRange: TraceListTimeRange, +) { + if (monitor?.type === "past" && monitor.traceStart && monitor.traceEnd) { + return { startTime: monitor.traceStart, endTime: monitor.traceEnd }; + } + return { timeRange }; +} + +/** + * Shows each monitor's actual comparison window inline next to its name — + * a fixed date range for past monitors, or its own time-range picker for + * future monitors — so a past-vs-future comparison never hides that the two + * sides are looking at different time semantics. + */ +function MonitorTimeBadge({ + monitor, + timeRange, + onTimeRangeChange, +}: { + monitor: MonitorResponse | undefined; + timeRange: TraceListTimeRange; + onTimeRangeChange: (value: TraceListTimeRange) => void; +}) { + if (monitor?.type === "past" && monitor.traceStart && monitor.traceEnd) { + return ( + + {formatTraceWindow(monitor.traceStart, monitor.traceEnd)} + + ); + } + + return ( + + ); +} + +export const CompareMonitorComponent: React.FC = () => { + const { orgId, projectId, agentId, envId, monitorId } = useParams<{ + orgId: string; + projectId: string; + agentId: string; + envId: string; + monitorId: string; + }>(); + const [searchParams, setSearchParams] = useSearchParams(); + const navigate = useNavigate(); + const targetMonitorId = searchParams.get("with") ?? ""; + const theme = useTheme(); + const palette = theme.vars?.palette; + + const sourceTimeRange = useMemo( + () => + (searchParams.get("sourceTimeRange") as TraceListTimeRange) || + TraceListTimeRange.SEVEN_DAYS, + [searchParams], + ); + const targetTimeRange = useMemo( + () => + (searchParams.get("targetTimeRange") as TraceListTimeRange) || + TraceListTimeRange.SEVEN_DAYS, + [searchParams], + ); + + const handleSourceTimeRangeChange = (value: TraceListTimeRange) => { + const next = new URLSearchParams(searchParams); + next.set("sourceTimeRange", value); + setSearchParams(next, { replace: true }); + }; + const handleTargetTimeRangeChange = (value: TraceListTimeRange) => { + const next = new URLSearchParams(searchParams); + next.set("targetTimeRange", value); + setSearchParams(next, { replace: true }); + }; + + // ── Target monitor picker — change who we're comparing against ────────── + const [targetAnchorEl, setTargetAnchorEl] = useState( + null, + ); + const { data: candidateMonitors } = useListMonitors( + { orgName: orgId ?? "", projName: projectId ?? "", agentName: agentId ?? "" }, + { environmentName: envId }, + ); + const otherMonitors = useMemo( + () => + (candidateMonitors?.monitors ?? []).filter( + (m) => m.name !== monitorId && m.name !== targetMonitorId, + ), + [candidateMonitors, monitorId, targetMonitorId], + ); + const handleOpenTargetMenu = (event: React.MouseEvent) => { + setTargetAnchorEl(event.currentTarget); + }; + const handleCloseTargetMenu = () => { + setTargetAnchorEl(null); + }; + const handleChangeTarget = (nextTargetName: string) => { + handleCloseTargetMenu(); + const next = new URLSearchParams(searchParams); + next.set("with", nextTargetName); + setSearchParams(next, { replace: true }); + }; + + // ── Source monitor picker — source lives in the URL path, not a query + // param, so swapping it means navigating to a new compare URL instead of + // just updating search params. Shares the same candidate list as target. + const [sourceAnchorEl, setSourceAnchorEl] = useState( + null, + ); + const handleOpenSourceMenu = (event: React.MouseEvent) => { + setSourceAnchorEl(event.currentTarget); + }; + const handleCloseSourceMenu = () => { + setSourceAnchorEl(null); + }; + const handleChangeSource = (nextSourceName: string) => { + handleCloseSourceMenu(); + if (!orgId || !projectId || !agentId || !envId) { + return; + } + navigate({ + pathname: generatePath( + absoluteRouteMap.children.org.children.projects.children.agents + .children.environment.children.evaluation.children.monitor.children + .compare.path, + { orgId, projectId, agentId, envId, monitorId: nextSourceName }, + ), + search: searchParams.toString(), + }); + }; + + const sourceParams = useMemo( + () => ({ + monitorName: monitorId ?? "", + orgName: orgId ?? "", + projName: projectId ?? "", + agentName: agentId ?? "", + }), + [monitorId, orgId, projectId, agentId], + ); + + const targetParams = useMemo( + () => ({ + monitorName: targetMonitorId, + orgName: orgId ?? "", + projName: projectId ?? "", + agentName: agentId ?? "", + }), + [targetMonitorId, orgId, projectId, agentId], + ); + + const { data: sourceMonitor, isLoading: isSourceMonitorLoading } = + useGetMonitor(sourceParams); + const { data: targetMonitor, isLoading: isTargetMonitorLoading } = + useGetMonitor(targetParams); + + const sourceScoreQuery = useMemo( + () => buildScoreQueryParams(sourceMonitor, sourceTimeRange), + [sourceMonitor, sourceTimeRange], + ); + const targetScoreQuery = useMemo( + () => buildScoreQueryParams(targetMonitor, targetTimeRange), + [targetMonitor, targetTimeRange], + ); + + const { data: sourceScores, isLoading: isSourceScoresLoading } = + useMonitorScores(sourceParams, sourceScoreQuery); + const { data: targetScores, isLoading: isTargetScoresLoading } = + useMonitorScores(targetParams, targetScoreQuery); + + const isLoading = + isSourceMonitorLoading || + isTargetMonitorLoading || + isSourceScoresLoading || + isTargetScoresLoading; + + const sourceEvaluators = useMemo( + () => sourceScores?.evaluators ?? [], + [sourceScores], + ); + const targetEvaluators = useMemo( + () => targetScores?.evaluators ?? [], + [targetScores], + ); + + const sourceName = sourceMonitor?.displayName ?? sourceMonitor?.name ?? "Monitor A"; + const targetName = targetMonitor?.displayName ?? targetMonitor?.name ?? "Monitor B"; + + const sourceColor = palette?.primary.main ?? "#3f8cff"; + // theme.palette.secondary is a neutral grey in this design system (not a + // visible accent), so use a fixed, high-contrast color for the comparison + // series instead — same approach as the fixed level colors in levelConfig.ts. + const targetColor = "#f59e0b"; + + // ── Union radar dataset: every evaluator either monitor has ────────────── + const radarChartData = useMemo(() => { + const byNameSource = new Map( + sourceEvaluators.map((e) => [e.evaluatorName, e]), + ); + const byNameTarget = new Map( + targetEvaluators.map((e) => [e.evaluatorName, e]), + ); + const names = Array.from( + new Set([...byNameSource.keys(), ...byNameTarget.keys()]), + ); + + return names.map((name) => { + const a = byNameSource.get(name); + const b = byNameTarget.get(name); + const meanA = a ? getMean(a) : undefined; + const meanB = b ? getMean(b) : undefined; + const sourceValue = a ? (meanA ?? 0) * 100 : null; + const targetValue = b ? (meanB ?? 0) * 100 : null; + const level: EvaluationLevel = (a ?? b)?.level ?? "trace"; + + return { + metric: name, + current: sourceValue ?? targetValue ?? 0, + _isNoData: a ? meanA === null : true, + _scoredCount: a ? a.count - a.skippedCount : 0, + _totalCount: a ? a.count : 0, + _level: level, + source: sourceValue, + target: targetValue, + _isNoDataSource: a ? meanA === null : false, + _isNoDataTarget: b ? meanB === null : false, + _scoredCountSource: a ? a.count - a.skippedCount : 0, + _totalCountSource: a ? a.count : 0, + _scoredCountTarget: b ? b.count - b.skippedCount : 0, + _totalCountTarget: b ? b.count : 0, + }; + }); + }, [sourceEvaluators, targetEvaluators]); + + const radars = useMemo( + () => [ + { + dataKey: "source", + name: sourceName, + stroke: sourceColor, + fill: sourceColor, + fillOpacity: 0.2, + strokeWidth: 2, + connectNulls: true, + }, + { + dataKey: "target", + name: targetName, + stroke: targetColor, + fill: targetColor, + fillOpacity: 0.15, + strokeWidth: 2, + connectNulls: true, + }, + ], + [sourceName, targetName, sourceColor, targetColor], + ); + + const sourceLevelSummaries = useMemo( + () => computeLevelSummaries(sourceEvaluators), + [sourceEvaluators], + ); + const targetLevelSummaries = useMemo( + () => computeLevelSummaries(targetEvaluators), + [targetEvaluators], + ); + const sourceAverageScore = useMemo( + () => computeAverageScore(sourceEvaluators), + [sourceEvaluators], + ); + const targetAverageScore = useMemo( + () => computeAverageScore(targetEvaluators), + [targetEvaluators], + ); + + const backHref = useMemo(() => { + if (!orgId || !projectId || !agentId || !envId || !monitorId) { + return "#"; + } + return generatePath( + absoluteRouteMap.children.org.children.projects.children.agents.children + .environment.children.evaluation.children.monitor.children.view.path, + { orgId, projectId, agentId, envId, monitorId }, + ); + }, [orgId, projectId, agentId, envId, monitorId]); + + if (!targetMonitorId) { + return ( + + + No comparison monitor selected. Go back and choose a monitor to + compare against. + + + ); + } + + return ( + + + } + onDelete={handleOpenSourceMenu} + onClick={handleOpenSourceMenu} + sx={{ fontWeight: 600 }} + /> + + {otherMonitors.length === 0 ? ( + No other monitors to compare + ) : ( + otherMonitors.map((m) => ( + handleChangeSource(m.name)}> + {m.displayName ?? m.name} + + )) + )} + + + + + vs + + + } + onDelete={handleOpenTargetMenu} + onClick={handleOpenTargetMenu} + sx={{ fontWeight: 600 }} + /> + + {otherMonitors.length === 0 ? ( + No other monitors to compare + ) : ( + otherMonitors.map((m) => ( + handleChangeTarget(m.name)}> + {m.displayName ?? m.name} + + )) + )} + + + + + } + > + + {isLoading ? ( + + + + + + + + + + + + ) : ( + <> + + + { + if (!active || !payload?.length) return null; + const dataPoint = payload[0]?.payload as + | CompareRadarPoint + | undefined; + if (!dataPoint) return null; + const cfg = LEVEL_CONFIG[dataPoint._level]; + + const rows = [ + { + label: sourceName, + color: sourceColor, + value: dataPoint.source, + isNoData: dataPoint._isNoDataSource, + scored: dataPoint._scoredCountSource, + total: dataPoint._totalCountSource, + }, + { + label: targetName, + color: targetColor, + value: dataPoint.target, + isNoData: dataPoint._isNoDataTarget, + scored: dataPoint._scoredCountTarget, + total: dataPoint._totalCountTarget, + }, + ].filter((row) => row.value !== null); + + return ( + + + + + + {dataPoint.metric} + + + + {rows.map((row) => ( + + + + {row.label} + + {row.isNoData ? ( + + ) : ( + + )} + + {row.isNoData + ? "all skipped" + : `(${row.scored}/${row.total} ${cfg.unit})`} + + + ))} + + + + ); + }} + /> + + + + + + + + + + )} + + + ); +}; + +export default CompareMonitorComponent; diff --git a/console/workspaces/pages/eval/src/ViewMonitor.Component.tsx b/console/workspaces/pages/eval/src/ViewMonitor.Component.tsx index e90a423ed..b3bcc8060 100644 --- a/console/workspaces/pages/eval/src/ViewMonitor.Component.tsx +++ b/console/workspaces/pages/eval/src/ViewMonitor.Component.tsx @@ -16,14 +16,16 @@ * under the License. */ -import React, { useCallback, useMemo } from "react"; +import React, { useCallback, useMemo, useState } from "react"; import { formatTraceWindow, PageLayout } from "@agent-management-platform/views"; import { + Button, Chip, CircularProgress, Grid, IconButton, InputAdornment, + Menu, MenuItem, Select, Skeleton, @@ -31,11 +33,18 @@ import { Typography, useTheme, } from "@wso2/oxygen-ui"; -import { Clock, RefreshCcw, Timer } from "@wso2/oxygen-ui-icons-react"; +import { + ChevronDown, + Clock, + GitCompare, + RefreshCcw, + Timer, +} from "@wso2/oxygen-ui-icons-react"; import { generatePath, Route, Routes, + useNavigate, useParams, useSearchParams, } from "react-router-dom"; @@ -43,7 +52,6 @@ import { absoluteRouteMap, relativeRouteMap, type EvaluationLevel, - type EvaluatorScoreSummary, TraceListTimeRange, } from "@agent-management-platform/types"; import AgentPerformanceCard, { @@ -58,10 +66,16 @@ import ScoreBreakdownCard from "./subComponents/ScoreBreakdownCard"; import { useGetMonitor, useGroupedScores, + useListMonitors, useMonitorScores, } from "@agent-management-platform/api-client"; import { useQueryClient } from "@tanstack/react-query"; import MonitorRunList from "./subComponents/MonitorRunList"; +import { + computeAverageScore, + computeLevelSummaries, + getMean, +} from "./utils/monitorScoreUtils"; const MONITOR_TIME_RANGE_OPTIONS = [ { value: TraceListTimeRange.ONE_DAY, label: "Last 1 Day" }, @@ -70,20 +84,54 @@ const MONITOR_TIME_RANGE_OPTIONS = [ { value: TraceListTimeRange.THIRTY_DAYS, label: "Last 30 Days" }, ]; -/** Extract the numeric mean from an evaluator's aggregations map. */ -const getMean = (e: EvaluatorScoreSummary): number | null => { - const v = e.aggregations?.["mean"]; - return typeof v === "number" ? v : null; -}; - export const ViewMonitorComponent: React.FC = () => { const { orgId, projectId, agentId, envId, monitorId } = useParams(); const theme = useTheme(); const palette = theme.vars?.palette; const queryClient = useQueryClient(); + const navigate = useNavigate(); const [searchParams, setSearchParams] = useSearchParams(); + // ── Compare button / monitor picker ─────────────────────────────────── + const [compareAnchorEl, setCompareAnchorEl] = + useState(null); + const { data: compareCandidates } = useListMonitors( + { orgName: orgId ?? "", projName: projectId ?? "", agentName: agentId ?? "" }, + { environmentName: envId }, + ); + const otherMonitors = useMemo( + () => (compareCandidates?.monitors ?? []).filter((m) => m.name !== monitorId), + [compareCandidates, monitorId], + ); + const handleOpenCompareMenu = useCallback( + (event: React.MouseEvent) => { + setCompareAnchorEl(event.currentTarget); + }, + [], + ); + const handleCloseCompareMenu = useCallback(() => { + setCompareAnchorEl(null); + }, []); + const handleCompareWith = useCallback( + (targetMonitorName: string) => { + handleCloseCompareMenu(); + if (!orgId || !projectId || !agentId || !envId || !monitorId) { + return; + } + navigate({ + pathname: generatePath( + absoluteRouteMap.children.org.children.projects.children.agents + .children.environment.children.evaluation.children.monitor.children + .compare.path, + { orgId, projectId, agentId, envId, monitorId }, + ), + search: `?with=${encodeURIComponent(targetMonitorName)}`, + }); + }, + [agentId, envId, handleCloseCompareMenu, monitorId, navigate, orgId, projectId], + ); + const timeRange = useMemo( () => (searchParams.get("timeRange") as TraceListTimeRange) || @@ -156,32 +204,15 @@ export const ViewMonitorComponent: React.FC = () => { const hasLlmLevel = levelsPresent.has("llm"); // ── EvaluationSummaryCard — per-level breakdown ─────────────────────────── - const levelSummaries = useMemo(() => { - const levelOrder: EvaluationLevel[] = ["trace", "agent", "llm"]; - return levelOrder - .filter((lvl) => levelsPresent.has(lvl)) - .map((lvl) => { - const group = evaluators.filter((e) => e.level === lvl); - return { - level: lvl, - evaluatorCount: group.length, - uniqueCount: Math.max(...group.map((e) => e.count), 0), - totalEvaluations: group.reduce((s, e) => s + e.count, 0), - skippedCount: group.reduce((s, e) => s + e.skippedCount, 0), - }; - }); - }, [evaluators, levelsPresent]); + const levelSummaries = useMemo( + () => computeLevelSummaries(evaluators), + [evaluators], + ); - const averageScore = useMemo(() => { - const means = evaluators - .map(getMean) - .filter((m): m is number => m !== null); - if (means.length === 0) { - return null; - } - const sum = means.reduce((acc, m) => acc + m, 0); - return sum / means.length; - }, [evaluators]); + const averageScore = useMemo( + () => computeAverageScore(evaluators), + [evaluators], + ); // ── PerformanceByEvaluatorCard ─────────────────────────────────────────── const evaluatorInfoList = useMemo( @@ -420,6 +451,35 @@ export const ViewMonitorComponent: React.FC = () => { ))} )} + + + {otherMonitors.length === 0 ? ( + + No other monitors in this environment + + ) : ( + otherMonitors.map((m) => ( + handleCompareWith(m.name)} + > + {m.displayName ?? m.name} + + )) + )} + React.ReactNode); // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -52,16 +58,28 @@ export interface RadarDataPoint { _scoredCount: number; _totalCount: number; _level: EvaluationLevel; + // Additional series (e.g. a second monitor's score) can be attached under + // their own dataKey; consumers index into RadarDataPoint dynamically. + [key: string]: unknown; } +export type RadarTooltipContent = (props: { + active?: boolean; + payload?: Array<{ value?: number; payload?: RadarDataPoint }>; +}) => React.ReactNode; + interface AgentPerformanceCardProps { radarChartData: RadarDataPoint[]; radars: RadarDefinition[]; + title?: string; + renderTooltipContent?: RadarTooltipContent; } const AgentPerformanceCard: React.FC = ({ radarChartData, radars, + title = "Agent Performance", + renderTooltipContent, }) => { const theme = useTheme(); const isDark = theme.palette.mode === "dark"; @@ -76,7 +94,7 @@ const AgentPerformanceCard: React.FC = ({ justifyContent="space-between" alignItems="center" > - Agent Performance + {title} {radarChartData.length === 0 ? ( = ({ > ; - }) => { + content={ + renderTooltipContent ?? + (({ + active, + payload, + }: { + active?: boolean; + payload?: Array<{ value?: number; payload?: RadarDataPoint }>; + }) => { if (!active || !payload?.length) return null; const point = payload[0]; const dataPoint = point.payload; @@ -169,7 +189,8 @@ const AgentPerformanceCard: React.FC = ({ ); - }} + }) + } /> )} diff --git a/console/workspaces/pages/eval/src/subComponents/EvaluationSummaryCard.tsx b/console/workspaces/pages/eval/src/subComponents/EvaluationSummaryCard.tsx index f307d4a7d..a8d2fe0f2 100644 --- a/console/workspaces/pages/eval/src/subComponents/EvaluationSummaryCard.tsx +++ b/console/workspaces/pages/eval/src/subComponents/EvaluationSummaryCard.tsx @@ -44,11 +44,13 @@ export interface LevelSummary { interface EvaluationSummaryCardProps { levels: LevelSummary[]; averageScore: number | null; + title?: string; } const EvaluationSummaryCard: React.FC = ({ levels, averageScore, + title = "Evaluation Summary", }) => { const theme = useTheme(); const isDark = theme.palette.mode === "dark"; @@ -61,7 +63,7 @@ const EvaluationSummaryCard: React.FC = ({ return ( - Evaluation Summary + {title} {levels.length === 0 ? ( e.level)); + return levelOrder + .filter((lvl) => levelsPresent.has(lvl)) + .map((lvl) => { + const group = evaluators.filter((e) => e.level === lvl); + return { + level: lvl, + evaluatorCount: group.length, + uniqueCount: Math.max(...group.map((e) => e.count), 0), + totalEvaluations: group.reduce((s, e) => s + e.count, 0), + skippedCount: group.reduce((s, e) => s + e.skippedCount, 0), + }; + }); +} + +/** Averages the mean score across all evaluators that have a numeric mean. */ +export function computeAverageScore( + evaluators: EvaluatorScoreSummary[], +): number | null { + const means = evaluators.map(getMean).filter((m): m is number => m !== null); + if (means.length === 0) { + return null; + } + return means.reduce((acc, m) => acc + m, 0) / means.length; +} From 59452e6c4095412ccaa9864925f8453133b8c65d Mon Sep 17 00:00:00 2001 From: nadheesh Date: Mon, 22 Jun 2026 16:57:05 +0530 Subject: [PATCH 5/5] Fix trace-fetch tie truncation and address PR review feedback --- .../eval/src/CompareMonitor.Component.tsx | 47 +++++--- .../eval/src/CreateMonitor.Component.tsx | 2 +- evaluation-job/main.py | 8 +- .../src/amp_evaluation/trace/fetcher.py | 71 +++++++++++- .../tests/test_trace_fetcher.py | 105 ++++++++++++++++-- 5 files changed, 196 insertions(+), 37 deletions(-) diff --git a/console/workspaces/pages/eval/src/CompareMonitor.Component.tsx b/console/workspaces/pages/eval/src/CompareMonitor.Component.tsx index 7e7a7eb35..cdd688fb6 100644 --- a/console/workspaces/pages/eval/src/CompareMonitor.Component.tsx +++ b/console/workspaces/pages/eval/src/CompareMonitor.Component.tsx @@ -71,6 +71,14 @@ const MONITOR_TIME_RANGE_OPTIONS = [ { value: TraceListTimeRange.THIRTY_DAYS, label: "Last 30 Days" }, ]; +/** Guard a raw query-param value before treating it as a TraceListTimeRange. */ +function isValidTimeRange(value: string | null): value is TraceListTimeRange { + return ( + value !== null && + (Object.values(TraceListTimeRange) as string[]).includes(value) + ); +} + interface CompareRadarPoint extends RadarDataPoint { source: number | null; target: number | null; @@ -147,18 +155,14 @@ export const CompareMonitorComponent: React.FC = () => { const theme = useTheme(); const palette = theme.vars?.palette; - const sourceTimeRange = useMemo( - () => - (searchParams.get("sourceTimeRange") as TraceListTimeRange) || - TraceListTimeRange.SEVEN_DAYS, - [searchParams], - ); - const targetTimeRange = useMemo( - () => - (searchParams.get("targetTimeRange") as TraceListTimeRange) || - TraceListTimeRange.SEVEN_DAYS, - [searchParams], - ); + const sourceTimeRange = useMemo(() => { + const raw = searchParams.get("sourceTimeRange"); + return isValidTimeRange(raw) ? raw : TraceListTimeRange.SEVEN_DAYS; + }, [searchParams]); + const targetTimeRange = useMemo(() => { + const raw = searchParams.get("targetTimeRange"); + return isValidTimeRange(raw) ? raw : TraceListTimeRange.SEVEN_DAYS; + }, [searchParams]); const handleSourceTimeRangeChange = (value: TraceListTimeRange) => { const next = new URLSearchParams(searchParams); @@ -286,9 +290,9 @@ export const CompareMonitorComponent: React.FC = () => { const sourceColor = palette?.primary.main ?? "#3f8cff"; // theme.palette.secondary is a neutral grey in this design system (not a - // visible accent), so use a fixed, high-contrast color for the comparison - // series instead — same approach as the fixed level colors in levelConfig.ts. - const targetColor = "#f59e0b"; + // visible accent), so use the warning token (amber) as the high-contrast + // comparison series, falling back to a fixed hex if the token is unavailable. + const targetColor = palette?.warning?.main ?? "#f59e0b"; // ── Union radar dataset: every evaluator either monitor has ────────────── const radarChartData = useMemo(() => { @@ -307,8 +311,12 @@ export const CompareMonitorComponent: React.FC = () => { const b = byNameTarget.get(name); const meanA = a ? getMean(a) : undefined; const meanB = b ? getMean(b) : undefined; - const sourceValue = a ? (meanA ?? 0) * 100 : null; - const targetValue = b ? (meanB ?? 0) * 100 : null; + // Keep null (not 0) when an evaluator is absent or fully skipped, so the + // radar bridges the axis (connectNulls) instead of plotting a misleading + // 0 score. The _isNoData* flags below still distinguish "absent" from + // "all skipped" for the tooltip. + const sourceValue = meanA != null ? meanA * 100 : null; + const targetValue = meanB != null ? meanB * 100 : null; const level: EvaluationLevel = (a ?? b)?.level ?? "trace"; return { @@ -518,7 +526,10 @@ export const CompareMonitorComponent: React.FC = () => { scored: dataPoint._scoredCountTarget, total: dataPoint._totalCountTarget, }, - ].filter((row) => row.value !== null); + // Keep a row when the monitor has the evaluator: either it has + // a score (value set) or it ran but fully skipped (isNoData). + // Drop only evaluators the monitor doesn't have at all. + ].filter((row) => row.value !== null || row.isNoData); return ( diff --git a/console/workspaces/pages/eval/src/CreateMonitor.Component.tsx b/console/workspaces/pages/eval/src/CreateMonitor.Component.tsx index 787dd8b5c..72756bc6e 100644 --- a/console/workspaces/pages/eval/src/CreateMonitor.Component.tsx +++ b/console/workspaces/pages/eval/src/CreateMonitor.Component.tsx @@ -81,7 +81,7 @@ export const CreateMonitorComponent: React.FC = () => { const displayName = `${sourceMonitor.displayName} (Copy)`; const samplingRatePercent = sourceMonitor.samplingRate !== undefined - ? Math.min(100, Math.max(0, Math.round(sourceMonitor.samplingRate * 100))) + ? Math.min(100, Math.max(1, Math.round(sourceMonitor.samplingRate * 100))) : 100; return { displayName, diff --git a/evaluation-job/main.py b/evaluation-job/main.py index 0dbc9529d..554d6e06c 100644 --- a/evaluation-job/main.py +++ b/evaluation-job/main.py @@ -140,6 +140,12 @@ def configure_logging() -> None: logging.basicConfig(level=logging.INFO, handlers=[handler]) logging.getLogger(__name__).setLevel(level) + # The openai/anthropic SDKs (used internally by any_llm) log retry attempts at + # INFO via their own package loggers, not through evaluation-job's logger above. + logging.getLogger("openai").setLevel(logging.WARNING) + logging.getLogger("anthropic").setLevel(logging.WARNING) + logging.getLogger("httpx").setLevel(logging.WARNING) + def parse_args() -> argparse.Namespace: """Parse command-line arguments for monitor execution.""" @@ -188,7 +194,7 @@ def parse_args() -> argparse.Namespace: "--sampling-rate", type=float, default=1.0, - help="Sampling rate for traces (0.0-1.0), default: 1.0", + help="Sampling rate for traces (> 0 and <= 1), default: 1.0", ) parser.add_argument( diff --git a/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py b/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py index 7ef271a32..18235c44e 100644 --- a/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py +++ b/libs/amp-evaluation/src/amp_evaluation/trace/fetcher.py @@ -32,7 +32,7 @@ """ from dataclasses import dataclass, field -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import List, Optional, Dict, Any, Callable, Iterable, Iterator, Tuple from amp_evaluation.trace.models import ToolDefinition @@ -88,6 +88,11 @@ def _parse_timestamp(raw_timestamp: Any) -> Optional[datetime]: return None +def _format_iso_z(dt: datetime) -> str: + """Serialise a datetime as a UTC ISO-8601 string with a trailing 'Z'.""" + return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") + + # ============================================================================ # OTEL/AMP Attribute Models (matching /traces/export API response) # ============================================================================ @@ -494,6 +499,9 @@ def fetch_traces( Trace objects with OTEL/AMP attributes, in ascending startTime order """ page_size = page_size if page_size is not None else self.page_size + if page_size <= 0: + raise ValueError("page_size must be a positive integer") + seen_ids: set = set() cursor_start = start_time yielded = 0 @@ -504,22 +512,75 @@ def fetch_traces( return new_traces = [t for t in page if t.traceId not in seen_ids] + if not new_traces: - # Entire page already seen: a startTime tie spans more than one page. - return + # No new traces at this cursor. A SHORT page means the window is + # genuinely exhausted. A FULL page of entirely-already-seen traces + # means the cursor is stalled on a cluster of traces the store can't + # advance past (more traces share one stored time than fit in a page). + if len(page) < page_size: + return + + next_cursor = self._step_past_stall(page[-1].startTime, end_time, page_size, seen_ids) + if next_cursor is None: + return # nothing resolvable after the stall — stop (never loop) + # Stepping forward skips any still-unseen traces sharing the stalled + # timestamp; surface it so the (intentional) loss is observable. + logger.warning( + "More than page_size (%d) traces share the stored time around %s; " + "stepping the cursor past it — remaining traces at that time are skipped.", + page_size, + page[-1].startTime, + ) + cursor_start = next_cursor + continue for trace in new_traces: + if max_traces is not None and yielded >= max_traces: + return seen_ids.add(trace.traceId) yield trace yielded += 1 - if max_traces is not None and yielded >= max_traces: - return if len(page) < page_size: return cursor_start = page[-1].startTime + def _step_past_stall(self, stalled_ts: str, end_time: str, page_size: int, seen_ids: set) -> Optional[str]: + """ + Return a cursor that advances past a stalled timestamp — the case where a full + page is entirely already-seen traces because more traces share one stored time + than fit in `page_size`. + + The export API's time filter/sort is delegated to an external store whose + resolution is unknown, so a fixed increment could be rounded back to the same + time and loop. Instead, grow the step (1µs, 10µs, … geometric) until a fetch at + the stepped cursor surfaces at least one unseen trace. Returns that cursor, or + None if nothing resolvable remains (window exhausted, unparseable, or step + capped) so the caller stops rather than looping. Note: Python datetime is + microsecond-resolution, so traces distinct only at sub-microsecond precision + collapse into the stall and are skipped along with it. + """ + base = _parse_timestamp(stalled_ts) + if base is None: + return None + end_dt = _parse_timestamp(end_time) + + step_us = 1 + for _ in range(15): # 1µs grown ×10 each time spans well past any real window + candidate_dt = base + timedelta(microseconds=step_us) + if end_dt is not None and candidate_dt > end_dt: + return None + candidate = _format_iso_z(candidate_dt) + page, _ = self._fetch_page(candidate, end_time, limit=page_size, sort_order="asc") + if not page: + return None + if any(t.traceId not in seen_ids for t in page): + return candidate # the cursor advanced far enough to surface new traces + step_us *= 10 # store rounded the step away — grow it and retry + return None + def fetch_trace_by_id(self, trace_id: str) -> Optional[OTELTrace]: """ Fetch a single trace by its ID using /trace endpoint. diff --git a/libs/amp-evaluation/tests/test_trace_fetcher.py b/libs/amp-evaluation/tests/test_trace_fetcher.py index 694813eaa..940149638 100644 --- a/libs/amp-evaluation/tests/test_trace_fetcher.py +++ b/libs/amp-evaluation/tests/test_trace_fetcher.py @@ -23,7 +23,7 @@ import pytest -from amp_evaluation.trace.fetcher import TraceFetcher, sample_traces +from amp_evaluation.trace.fetcher import TraceFetcher, sample_traces, _parse_timestamp def _raw_trace(trace_id: str, start_time: str) -> dict: @@ -55,6 +55,32 @@ def _mock_response(traces: List[dict], total_count: int) -> MagicMock: return resp +def _api_simulator(all_traces: List[dict], resolution_us: int = 1): + """Faithfully simulate /traces/export as a `requests.get` side_effect. + + Filters `startTime` to [cursor, end] using parsed datetimes (like a real + time-based store), sorts ascending, and caps at `limit`. `resolution_us` + truncates both trace timestamps and the cursor before comparing, so + `resolution_us=1000` models a millisecond-resolution store (used to prove the + self-correcting cursor step escapes a rounded-away increment). + """ + + def _bucket(dt): + epoch_us = int(dt.timestamp() * 1_000_000) + return (epoch_us // resolution_us) * resolution_us + + ordered = sorted(all_traces, key=lambda t: _parse_timestamp(t["startTime"])) + + def _get(url, params=None, headers=None, timeout=None, **kwargs): + cursor = _bucket(_parse_timestamp(params["startTime"])) + end = _bucket(_parse_timestamp(params["endTime"])) + limit = int(params["limit"]) + matching = [t for t in ordered if cursor <= _bucket(_parse_timestamp(t["startTime"])) <= end] + return _mock_response(matching[:limit], total_count=len(matching)) + + return _get + + class TestFetchTracesPagination: def test_paginates_across_multiple_pages(self): fetcher = _make_fetcher() @@ -91,21 +117,76 @@ def test_stops_when_page_smaller_than_page_size(self): assert mock_get.call_count == 1 def test_dedupes_traces_tied_at_page_boundary(self): - """If multiple traces share the exact boundary startTime, the next page's - re-query (from that same startTime) may re-return them. They must not be - yielded twice, and a page that returns nothing new must stop iteration.""" + """A startTime tie that exactly fills the page (and is the whole window) is + yielded once each and terminates — no duplicates, no loop.""" fetcher = _make_fetcher() + tie = "2026-01-01T00:00:00Z" + traces = [_raw_trace("t0", tie), _raw_trace("t1", tie)] - tie_time = "2026-01-01T00:00:00Z" - page1 = [_raw_trace("t0", tie_time), _raw_trace("t1", tie_time)] - # Re-querying from tie_time returns the same two traces again (no new ones). - page2 = [_raw_trace("t0", tie_time), _raw_trace("t1", tie_time)] - - with patch("requests.get", side_effect=[_mock_response(page1, 2), _mock_response(page2, 2)]) as mock_get: - result = list(fetcher.fetch_traces(start_time="s", end_time="e", page_size=2)) + with patch("requests.get", side_effect=_api_simulator(traces)): + result = list( + fetcher.fetch_traces( + start_time="2026-01-01T00:00:00Z", + end_time="2026-02-01T00:00:00Z", + page_size=2, + ) + ) assert [t.traceId for t in result] == ["t0", "t1"] - assert mock_get.call_count == 2 + + def test_tie_exceeding_page_size_skips_only_tail(self): + """When more than page_size traces share one startTime, the cursor steps past + it: the unseen tail at that timestamp is dropped, but every later trace is + still returned (no silent truncation of the rest) and it terminates.""" + fetcher = _make_fetcher() + t2 = "2026-01-01T00:00:02.000000Z" + t3 = "2026-01-01T00:00:03.000000Z" + traces = [_raw_trace(f"a{i}", t2) for i in range(5)] + [_raw_trace(f"b{i}", t3) for i in range(2)] + + with patch("requests.get", side_effect=_api_simulator(traces, resolution_us=1)): + result = list( + fetcher.fetch_traces( + start_time="2026-01-01T00:00:00Z", + end_time="2026-02-01T00:00:00Z", + page_size=2, + ) + ) + + ids = [t.traceId for t in result] + # First page_size of the tie are seen; the rest of the T2 tail (a2,a3,a4) are + # skipped; both T3 traces are preserved. + assert ids == ["a0", "a1", "b0", "b1"] + + def test_coarse_store_step_grows_and_terminates(self): + """If the store resolves time at millisecond granularity, a 1µs step rounds + back to the tie; the step must grow until it advances — and terminate.""" + fetcher = _make_fetcher() + # Three traces within the same millisecond bucket (sub-ms apart) + one later. + traces = [ + _raw_trace("a0", "2026-01-01T00:00:02.000100Z"), + _raw_trace("a1", "2026-01-01T00:00:02.000200Z"), + _raw_trace("a2", "2026-01-01T00:00:02.000300Z"), + _raw_trace("b0", "2026-01-01T00:00:05.000000Z"), + ] + + with patch("requests.get", side_effect=_api_simulator(traces, resolution_us=1000)): + result = list( + fetcher.fetch_traces( + start_time="2026-01-01T00:00:00Z", + end_time="2026-02-01T00:00:00Z", + page_size=2, + ) + ) + + ids = [t.traceId for t in result] + # a0,a1 seen (first page of the ms-tie); a2 is the dropped tail; b0 preserved. + assert "b0" in ids # did not silently truncate everything after the tie + assert ids == ["a0", "a1", "b0"] + + def test_page_size_must_be_positive(self): + fetcher = _make_fetcher() + with pytest.raises(ValueError): + list(fetcher.fetch_traces(start_time="s", end_time="e", page_size=0)) def test_max_traces_stops_early(self): fetcher = _make_fetcher()