103 changes: 103 additions & 0 deletions python/samples/agentchat_behavioral_monitor/README.md
# agentchat_behavioral_monitor

Detect vocabulary drift across repeated AgentChat runs.

When a long-running agent shifts away from earlier task vocabulary, the failure
often shows up first as a change in outputs rather than an explicit error. This
sample shows how to watch for that drift on the public AgentChat surface.

The demo is deterministic: it uses `ReplayChatCompletionClient` together with a
real `AssistantAgent`, then monitors the resulting `TaskResult.messages`
history. In production, replace the replay model with a real model client and
keep the same monitor.

This sample detects drift using a **Context Consistency Score (CCS)**: the
fraction of vocabulary from the earliest runs that is still present in the most
recent runs. A score below 0.40 indicates likely behavioral drift.

## How it works

```
Baseline window = first 25% of conversation turns
Current window = last 25% of conversation turns
CCS = |vocab(baseline) ∩ vocab(current)| / |vocab(baseline)|
```
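The windowing and score can be sketched standalone in a few lines (illustrative only; the sample's `BehavioralMonitor` adds thresholds and ghost-term tracking on top, and it uses a minimum token length of 4 rather than the 3 used here for these short demo strings):

```python
import re


def ccs(messages: list[str], frac: float = 0.25) -> float:
    """Fraction of baseline-window vocabulary still present in the current window."""
    n = len(messages)
    cut = max(1, int(n * frac))  # both windows cover 25% of the turns

    def vocab(texts: list[str]) -> set[str]:
        return set(re.findall(r"[a-z_]{3,}", " ".join(texts).lower()))

    baseline, current = vocab(messages[:cut]), vocab(messages[-cut:])
    return len(baseline & current) / len(baseline) if baseline else 1.0


print(ccs([
    "use jwt and bcrypt for auth",
    "keep jwt auth for the profile endpoint",
    "add jwt checks and endpoint rate limiting",
]))
```

With three turns each window is a single message, so the score here is the overlap between the first and last turns' vocabularies.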

A "ghost term" is a task-relevant word (`jwt`, `bcrypt`, `foreign_key`,
`redis`, etc.) that appeared in the baseline window but has disappeared from
the current window. Ghost terms are the most direct signal of forgotten
context.
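Detection itself reduces to a set difference between the two windows, restricted to the lexicon (a minimal sketch with a truncated lexicon):

```python
import re

LEXICON = {"jwt", "bcrypt", "redis", "foreign_key", "rate_limit"}


def ghost_terms(baseline_text: str, current_text: str) -> list[str]:
    """Lexicon terms seen in the baseline window but missing from the current one."""
    def words(text: str) -> set[str]:
        return set(re.findall(r"[a-z_]+", text.lower()))

    return sorted((LEXICON & words(baseline_text)) - words(current_text))


print(ghost_terms(
    "use jwt and bcrypt with redis sessions",
    "add endpoint rate limiting and input validation",
))
# → ['bcrypt', 'jwt', 'redis']
```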

## Running the demo

```bash
python main.py
```

Representative output (abridged):

```
=== AutoGen AgentChat behavioral monitor demo ===

Turn 1
CCS: 1.0
Ghost terms: []
Drift detected: False

...

Turn 3
CCS: 0.25
Ghost terms: ['bcrypt', 'foreign_key', 'jwt', 'redis']
Drift detected: True
```

## Integrating into your agent loop

```python
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.replay import ReplayChatCompletionClient
from main import BehavioralMonitor

monitor = BehavioralMonitor(
    ccs_threshold=0.40,
    min_messages=3,
)

history = []
agent = AssistantAgent(
    "assistant",
    model_client=ReplayChatCompletionClient([
        "Use jwt and bcrypt for auth.",
        "Keep jwt auth intact for the profile endpoint.",
        "Add endpoint rate limiting.",
    ]),
)

# Check after each public AgentChat run (run this inside an async function).
task_result = await agent.run(task="Use jwt and bcrypt for auth", output_task_messages=False)
result = monitor.observe_result(history, task_result)
if result["drift_detected"]:
    print("Drift at turn", result["turn"], "ghost:", result["ghost_terms"])

# Later runs keep extending the same externally owned history.
task_result = await agent.run(task="Now add a profile endpoint", output_task_messages=False)
result = monitor.observe_result(history, task_result)
```

## Parameters

| Parameter | Default | Description |
|---|---|---|
| `ccs_threshold` | 0.40 | Flag drift when CCS drops below this value |
| `min_messages` | 3 | Minimum number of tracked AgentChat results before checks run |
| `ghost_lexicon` | built-in list | Domain terms to watch for disappearance |
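The two signals combine with a logical OR: drift is flagged when CCS falls below the threshold or when any ghost term disappears. A standalone sketch of that decision rule (mirroring the sample's `drift_detected` flag):

```python
def drift_detected(ccs: float, ghost_terms: list[str], ccs_threshold: float = 0.40) -> bool:
    """Flag drift on a low consistency score or on any vanished lexicon term."""
    return ccs < ccs_threshold or bool(ghost_terms)


print(drift_detected(0.45, []))          # above threshold, no ghosts: no drift
print(drift_detected(0.45, [], 0.50))    # a stricter threshold flags the same score
print(drift_detected(0.80, ["jwt"]))     # any ghost term flags drift regardless of CCS
```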

## Connection to AutoGen issue #7265

This sample addresses the production reliability pattern discussed in
https://github.com/microsoft/autogen/issues/7265 — specifically the
ghost-lexicon pattern for detecting when long-running agent outputs silently
drift away from earlier task vocabulary.

## Related

- [compression-monitor](https://github.com/agent-morrow/compression-monitor) — the standalone toolkit this sample is adapted from
234 changes: 234 additions & 0 deletions python/samples/agentchat_behavioral_monitor/main.py
#!/usr/bin/env python
"""
agentchat_behavioral_monitor — monitor vocabulary drift across AgentChat runs.

This sample uses the public AgentChat surface end to end:

- `AssistantAgent.run()` for stateful repeated turns
- `TaskResult.messages` as the observed output surface
- `ReplayChatCompletionClient` for a deterministic, runnable demo

The replay model intentionally shifts away from earlier task vocabulary over
three runs so the monitor can flag drift without relying on a live provider.
In real deployments, replace the replay model with a production model client
and keep the same monitoring pattern.
"""

from __future__ import annotations

import asyncio
import re
from collections import Counter
from typing import Any, Dict, List, Optional, Sequence

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, TextMessage
from autogen_ext.models.replay import ReplayChatCompletionClient


GHOST_LEXICON: List[str] = [
    "jwt",
    "oauth",
    "token",
    "bearer",
    "api_key",
    "secret",
    "credential",
    "bcrypt",
    "hash",
    "salt",
    "certificate",
    "tls",
    "ssl",
    "database",
    "schema",
    "migration",
    "index",
    "foreign_key",
    "transaction",
    "redis",
    "postgres",
    "sqlite",
    "mongo",
    "vector",
    "memory",
    "context",
    "retrieval",
    "embedding",
    "chunk",
    "summarize",
    "tool_call",
    "function_call",
    "handoff",
    "termination",
    "deploy",
    "container",
    "docker",
    "kubernetes",
    "endpoint",
    "webhook",
    "rate_limit",
    "timeout",
    "retry",
]

_MIN_WORD_LEN = 4
_BASELINE_FRAC = 0.25
_CURRENT_FRAC = 0.25


def _tokenize(text: str) -> Counter:
    """Lowercase and count words of at least `_MIN_WORD_LEN` letters/underscores."""
    words = re.findall(r"[a-z_]{%d,}" % _MIN_WORD_LEN, text.lower())
    return Counter(words)


def _extract_content(msg: Any) -> str:
    """Extract text from common AgentChat message shapes."""
    if isinstance(msg, TextMessage):
        return str(msg.content or "")
    if isinstance(msg, dict):
        return str(msg.get("content", "") or "")
    content = getattr(msg, "content", None)
    if isinstance(content, list):
        return " ".join(str(part) for part in content)
    if content is not None:
        return str(content)
    return str(msg)


def _tracked_messages(task_result: TaskResult) -> List[BaseAgentEvent | BaseChatMessage]:
    """Keep only textual messages that reflect agent output."""
    tracked: List[BaseAgentEvent | BaseChatMessage] = []
    for message in task_result.messages:
        if isinstance(message, TextMessage):
            tracked.append(message)
    return tracked


class BehavioralMonitor:
    """
    Stateless consistency checker for AgentChat histories.

    Computes the Context Consistency Score (CCS): the fraction of vocabulary
    from the baseline window (earliest turns) still present in the current
    window (latest turns). A score below `ccs_threshold` indicates that
    important task vocabulary may have dropped out of the assistant's outputs.
    """

    def __init__(
        self,
        ghost_lexicon: Optional[List[str]] = None,
        ccs_threshold: float = 0.40,
        min_messages: int = 3,
    ) -> None:
        self.ghost_lexicon = ghost_lexicon or GHOST_LEXICON
        self.ccs_threshold = ccs_threshold
        self.min_messages = min_messages

    def check(self, messages: Sequence[Any]) -> Dict[str, Any]:
        n = len(messages)
        result: Dict[str, Any] = {
            "drift_detected": False,
            "ccs": 1.0,
            "ghost_terms": [],
            "turn": n,
        }

        if n < self.min_messages:
            return result

        cutoff_b = max(1, int(n * _BASELINE_FRAC))
        cutoff_c = max(1, int(n * _CURRENT_FRAC))

        baseline_text = " ".join(_extract_content(m) for m in messages[:cutoff_b])
        current_text = " ".join(_extract_content(m) for m in messages[-cutoff_c:])

        baseline_vocab = _tokenize(baseline_text)
        current_vocab = _tokenize(current_text)

        if not baseline_vocab:
            return result

        shared = sum(1 for word in baseline_vocab if word in current_vocab)
        ccs = shared / len(baseline_vocab)
        # Match lexicon terms against the raw window text so short terms such
        # as "jwt" or "tls" are not dropped by the tokenizer's minimum word length.
        ghost_terms = sorted(
            term
            for term in self.ghost_lexicon
            if re.search(rf"\b{re.escape(term)}\b", baseline_text.lower())
            and not re.search(rf"\b{re.escape(term)}\b", current_text.lower())
        )

        result["ccs"] = round(ccs, 3)
        result["ghost_terms"] = ghost_terms
        result["drift_detected"] = ccs < self.ccs_threshold or bool(ghost_terms)
        return result

    def observe_result(self, history: List[Any], task_result: TaskResult) -> Dict[str, Any]:
        """Append this run's tracked messages to `history`, then check the full history."""
        history.extend(_tracked_messages(task_result))
        return self.check(history)


async def main() -> None:
    model_client = ReplayChatCompletionClient(
        [
            (
                "Use jwt validation, bcrypt password hashing, redis-backed sessions, "
                "and preserve foreign_key constraints in the auth schema."
            ),
            (
                "Keep the PATCH /profile endpoint aligned with the existing auth flow: "
                "jwt bearer tokens, bcrypt hashes, and redis session checks still apply."
            ),
            (
                "Add PATCH /profile rate limiting with 429 responses and concise "
                "input validation. Keep the implementation focused on the endpoint."
            ),
        ]
    )

    agent = AssistantAgent(
        name="behavioral_monitor_demo",
        description="Deterministic AgentChat demo for behavioral drift monitoring.",
        model_client=model_client,
        system_message=(
            "You are a careful API architect. Answer tersely and stay aligned with "
            "the running implementation context."
        ),
    )

    tasks = [
        "Design the auth stack for a profile API. Mention jwt, bcrypt, redis, and foreign_key constraints.",
        "Extend the same system with a PATCH /profile endpoint while keeping earlier auth constraints intact.",
        "Now focus only on endpoint-level rate limiting and omit earlier auth details unless absolutely necessary.",
    ]

    monitor = BehavioralMonitor(min_messages=3)
    history: List[BaseAgentEvent | BaseChatMessage] = []

    print("=== AutoGen AgentChat behavioral monitor demo ===")
    for turn, task in enumerate(tasks, start=1):
        result = await agent.run(task=task, output_task_messages=False)
        report = monitor.observe_result(history, result)
        final_message = next(
            (
                _extract_content(message)
                for message in reversed(result.messages)
                if isinstance(message, TextMessage)
            ),
            "",
        )

        print(f"\nTurn {turn}")
        print(f"Task: {task}")
        print(f"Assistant: {final_message}")
        print(f"CCS: {report['ccs']}")
        print(f"Ghost terms: {report['ghost_terms']}")
        print(f"Drift detected: {report['drift_detected']}")

    print("\nInterpretation: the sample uses real AgentChat runs with a replay model.")
    print("Swap in a production model client to monitor live long-running conversations")
    print("through the same `TaskResult.messages`-based path.")


if __name__ == "__main__":
    asyncio.run(main())