diff --git a/python/samples/agentchat_behavioral_monitor/README.md b/python/samples/agentchat_behavioral_monitor/README.md new file mode 100644 index 000000000000..e861e00edb4d --- /dev/null +++ b/python/samples/agentchat_behavioral_monitor/README.md @@ -0,0 +1,103 @@ +# agentchat_behavioral_monitor + +Detect vocabulary drift across repeated AgentChat runs. + +When a long-running agent shifts away from earlier task vocabulary, the failure +often shows up first as a change in outputs rather than an explicit error. This +sample shows how to watch for that drift on the public AgentChat surface. + +The demo is deterministic: it uses `ReplayChatCompletionClient` together with a +real `AssistantAgent`, then monitors the resulting `TaskResult.messages` +history. In production, replace the replay model with a real model client and +keep the same monitor. + +This sample detects drift using **Ghost Consistency Score (CCS)**: the fraction +of vocabulary from the earliest runs still present in the most recent runs. A +score below 0.40 indicates likely behavioral drift. + +## How it works + +``` +Baseline window = first 25% of conversation turns +Current window = last 25% of conversation turns +CCS = |vocab(baseline) ∩ vocab(current)| / |vocab(baseline)| +``` + +A "ghost term" is a task-relevant word (`jwt`, `bcrypt`, `foreign_key`, +`redis`, etc.) +that appeared in the baseline window but has disappeared from the current +window. Ghost terms are the most direct signal of forgotten context. + +## Running the demo + +```bash +python main.py +``` + +Expected output: + +``` +=== AutoGen AgentChat behavioral monitor demo === + +Turn 1 +CCS: 1.0 +Ghost terms: [] +Drift detected: False + +Turn 3 +CCS: 0.25 +Ghost terms: ['bcrypt', 'foreign_key', 'jwt', 'redis'] +Drift detected: True +``` + +## Integrating into your agent loop + +```python +from autogen_agentchat.agents import AssistantAgent +from autogen_ext.models.replay import ReplayChatCompletionClient +from main import BehavioralMonitor + +monitor = BehavioralMonitor( + ccs_threshold=0.40, + min_messages=3, +) + +history = [] +agent = AssistantAgent( + "assistant", + model_client=ReplayChatCompletionClient([ + "Use jwt and bcrypt for auth.", + "Keep jwt auth intact for the profile endpoint.", + "Add endpoint rate limiting.", + ]), +) + +# Check after each public AgentChat run +task_result = await agent.run(task="Use jwt and bcrypt for auth", output_task_messages=False) +result = monitor.observe_result(history, task_result) +if result["drift_detected"]: + print("Drift at turn", result["turn"], "ghost:", result["ghost_terms"]) + +# Later runs keep extending the same external history +task_result = await agent.run(task="Now add a profile endpoint", output_task_messages=False) +result = monitor.observe_result(history, task_result) +``` + +## Parameters + +| Parameter | Default | Description | +|---|---|---| +| `ccs_threshold` | 0.40 | Flag drift when CCS drops below this value | +| `min_messages` | 3 | Minimum number of tracked AgentChat results before checks run | +| `ghost_lexicon` | built-in list | Domain terms to watch for disappearance | + +## Connection to AutoGen issue #7265 + +This sample addresses the production reliability pattern discussed in +https://github.com/microsoft/autogen/issues/7265 — specifically the +ghost-lexicon pattern for detecting when long-running agent outputs silently +drift away from earlier task vocabulary. + +## Related + +- [compression-monitor](https://github.com/agent-morrow/compression-monitor) — the standalone toolkit this sample is adapted from diff --git a/python/samples/agentchat_behavioral_monitor/main.py b/python/samples/agentchat_behavioral_monitor/main.py new file mode 100644 index 000000000000..0523f8bb05bf --- /dev/null +++ b/python/samples/agentchat_behavioral_monitor/main.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python +""" +agentchat_behavioral_monitor — monitor vocabulary drift across AgentChat runs. + +This sample uses the public AgentChat surface end to end: + +- `AssistantAgent.run()` for stateful repeated turns +- `TaskResult.messages` as the observed output surface +- `ReplayChatCompletionClient` for a deterministic, runnable demo + +The replay model intentionally shifts away from earlier task vocabulary over +three runs so the monitor can flag drift without relying on a live provider. +In real deployments, replace the replay model with a production model client +and keep the same monitoring pattern. +""" + +from __future__ import annotations + +import asyncio +import re +from collections import Counter +from typing import Any, Dict, List, Optional, Sequence + +from autogen_agentchat.agents import AssistantAgent +from autogen_agentchat.base import TaskResult +from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, TextMessage +from autogen_ext.models.replay import ReplayChatCompletionClient + + +GHOST_LEXICON: List[str] = [ + "jwt", + "oauth", + "token", + "bearer", + "api_key", + "secret", + "credential", + "bcrypt", + "hash", + "salt", + "certificate", + "tls", + "ssl", + "database", + "schema", + "migration", + "index", + "foreign_key", + "transaction", + "redis", + "postgres", + "sqlite", + "mongo", + "vector", + "memory", + "context", + "retrieval", + "embedding", + "chunk", + "summarize", + "tool_call", + "function_call", + "handoff", + "termination", + "deploy", + "container", + "docker", + "kubernetes", + "endpoint", + "webhook", + "rate_limit", + "timeout", + "retry", +] + +_MIN_WORD_LEN = 4 +_BASELINE_FRAC = 0.25 +_CURRENT_FRAC = 0.25 + + +def _tokenize(text: str) -> Counter: + words = re.findall(r"[a-z_]{%d,}" % _MIN_WORD_LEN, text.lower()) + return Counter(words) + + +def _extract_content(msg: Any) -> str: + """Extract text from common AgentChat message shapes.""" + if isinstance(msg, TextMessage): + return str(msg.content or "") + if isinstance(msg, dict): + return str(msg.get("content", "") or "") + content = getattr(msg, "content", None) + if isinstance(content, list): + return " ".join(str(part) for part in content) + if content is not None: + return str(content) + return str(msg) + + +def _tracked_messages(task_result: TaskResult) -> List[BaseAgentEvent | BaseChatMessage]: + """Keep only textual messages that reflect agent output.""" + tracked: List[BaseAgentEvent | BaseChatMessage] = [] + for message in task_result.messages: + if isinstance(message, TextMessage): + tracked.append(message) + return tracked + + +class BehavioralMonitor: + """ + Stateless consistency checker for AgentChat histories. + + Computes Ghost Consistency Score (CCS) — the fraction of vocabulary from + the baseline window (earliest turns) still present in the current window + (latest turns). A score below `ccs_threshold` indicates that important + task vocabulary may have dropped out of the assistant's outputs. + """ + + def __init__( + self, + ghost_lexicon: Optional[List[str]] = None, + ccs_threshold: float = 0.40, + min_messages: int = 3, + ) -> None: + self.ghost_lexicon = ghost_lexicon or GHOST_LEXICON + self.ccs_threshold = ccs_threshold + self.min_messages = min_messages + + def check(self, messages: Sequence[Any]) -> Dict[str, Any]: + n = len(messages) + result: Dict[str, Any] = { + "drift_detected": False, + "ccs": 1.0, + "ghost_terms": [], + "turn": n, + } + + if n < self.min_messages: + return result + + cutoff_b = max(1, int(n * _BASELINE_FRAC)) + cutoff_c = max(1, int(n * _CURRENT_FRAC)) + + baseline_text = " ".join(_extract_content(m) for m in messages[:cutoff_b]) + current_text = " ".join(_extract_content(m) for m in messages[-cutoff_c:]) + + baseline_vocab = _tokenize(baseline_text) + current_vocab = _tokenize(current_text) + + if not baseline_vocab: + return result + + shared = sum(1 for word in baseline_vocab if word in current_vocab) + ccs = shared / len(baseline_vocab) + ghost_terms = [ + term + for term in self.ghost_lexicon + if baseline_vocab.get(term, 0) > 0 and current_vocab.get(term, 0) == 0 + ] + + result["ccs"] = round(ccs, 3) + result["ghost_terms"] = ghost_terms + result["drift_detected"] = ccs < self.ccs_threshold or bool(ghost_terms) + return result + + def observe_result(self, history: List[Any], task_result: TaskResult) -> Dict[str, Any]: + history.extend(_tracked_messages(task_result)) + return self.check(history) + + +async def main() -> None: + model_client = ReplayChatCompletionClient( + [ + ( + "Use jwt validation, bcrypt password hashing, redis-backed sessions, " + "and preserve foreign_key constraints in the auth schema." + ), + ( + "Keep the PATCH /profile endpoint aligned with the existing auth flow: " + "jwt bearer tokens, bcrypt hashes, and redis session checks still apply." + ), + ( + "Add PATCH /profile rate limiting with 429 responses and concise " + "input validation. Keep the implementation focused on the endpoint." + ), + ] + ) + + agent = AssistantAgent( + name="behavioral_monitor_demo", + description="Deterministic AgentChat demo for behavioral drift monitoring.", + model_client=model_client, + system_message=( + "You are a careful API architect. Answer tersely and stay aligned with " + "the running implementation context." + ), + ) + + tasks = [ + "Design the auth stack for a profile API. Mention jwt, bcrypt, redis, and foreign_key constraints.", + "Extend the same system with a PATCH /profile endpoint while keeping earlier auth constraints intact.", + "Now focus only on endpoint-level rate limiting and omit earlier auth details unless absolutely necessary.", + ] + + monitor = BehavioralMonitor(min_messages=3) + history: List[BaseAgentEvent | BaseChatMessage] = [] + + print("=== AutoGen AgentChat behavioral monitor demo ===") + for turn, task in enumerate(tasks, start=1): + result = await agent.run(task=task, output_task_messages=False) + report = monitor.observe_result(history, result) + final_message = next( + ( + _extract_content(message) + for message in reversed(result.messages) + if isinstance(message, TextMessage) + ), + "", + ) + + print(f"\nTurn {turn}") + print(f"Task: {task}") + print(f"Assistant: {final_message}") + print(f"CCS: {report['ccs']}") + print(f"Ghost terms: {report['ghost_terms']}") + print(f"Drift detected: {report['drift_detected']}") + + print("\nInterpretation: the sample uses real AgentChat runs with a replay model.") + print("Swap in a production model client to monitor live long-running conversations") + print("through the same `TaskResult.messages`-based path.") + + +if __name__ == "__main__": + asyncio.run(main())