diff --git a/.cursor/skills/engineering-tracker/SKILL.md b/.cursor/skills/engineering-tracker/SKILL.md
index 6433d5f..85ba2eb 100644
--- a/.cursor/skills/engineering-tracker/SKILL.md
+++ b/.cursor/skills/engineering-tracker/SKILL.md
@@ -10,7 +10,7 @@ description: >-
## Purpose
-Maintain a living record of the project's evolution in `docs/CHANGELOG.md`. This serves as:
+Maintain a living record of the project's evolution in `CHANGELOG.md` (root of the repository). This serves as:
- A learning journal for the author
- A reference for blog post writing
- A history of architectural decisions
diff --git a/.cursor/skills/example-readme-writer/SKILL.md b/.cursor/skills/example-readme-writer/SKILL.md
index da8ecf5..9616ada 100644
--- a/.cursor/skills/example-readme-writer/SKILL.md
+++ b/.cursor/skills/example-readme-writer/SKILL.md
@@ -48,7 +48,7 @@ Use this order by default for example READMEs:
```markdown
# Pattern NN: [Title]
-> One-sentence value proposition.
+> One-sentence value proposition -- mention the key pattern(s) introduced.
Short positioning paragraph:
- where this pattern fits in the series
@@ -57,58 +57,85 @@ Short positioning paragraph:
## Quick Start
-Show the fastest runnable path first.
+Fastest runnable path: .env setup, docker compose, curl. Include the
+verification request here so there's no need for a separate Verification section.
## What You Get Back
-Show the response shape, output artifact, or visible success criteria.
+Response shape or visible success criteria.
## At a Glance
-Compact table: agents, runtime, ports, endpoints, prerequisites, observability.
+Compact table: agents, graph topology, runtime, ports, data sources, observability.
## The Problem
-What breaks in the simpler approach? Why does this pattern exist?
+2-4 sentences. State the limitation(s) of the previous pattern. No comparison
+tables unless the comparison genuinely adds value that prose cannot.
## Architecture
-Use Mermaid when the topology or flow is not obvious. This is the important part, we are focusing on the architecture and flow not the code.
-
-
+Mermaid diagram + a paragraph explaining WHY it's structured this way (e.g. why
+two containers, why parallel), not just describing what exists.
## Key Concepts
-Short bullets only. Keep the deep explanation lower in the doc.
+4 bullets max, one line each, em-dashes not colons. If it reads like AI marketing
+copy, rewrite it shorter.
## Implementation Walkthrough
-Explain the important code in steps, with annotated snippets.
-Do not explain the code that is not part of the example, just present the main idea and flow.
+Link to source files so the reader can jump directly. Show code only when it's
+the actual working snippet that teaches the architecture (e.g. the MCP tool
+definition). For everything else, reference the file and explain the idea in
+prose. Never show pseudo-code or comment-only code blocks.
+
+## Connect Your MCP Client / Integration
+(if applicable -- combine all client tools into one section, CLI first, GUI last)
+
+## Local Development
-## What You Should See
+uv sync, test, lint commands.
-Expected logs, traces, response shape, or runtime behavior.
+## Exercises
-## Verification
+2 items max. One simple extension, one architectural extension. One sentence each.
-One or two concrete requests plus expected success and failure behavior.
+## Trade-offs
+Table of advantages vs. limitations. End with the bridge to the next pattern.
+
+## Further Reading
+
+Only link docs for technologies introduced in this pattern.
```
-You may merge `What You Get Back` and `At a Glance` when the example is very small, but do not move `Quick Start` far down the page unless the user explicitly wants a tutorial-first flow.
+**Sections to skip:**
+
+- **What You Should See** -- skip if Quick Start already shows expected output
+- **Verification** -- never duplicate Quick Start with the same curl commands
## README Quality Rules
-- **GitHub-first**: treat the README like a landing page first and a reference document second.
+- **GitHub-first**: treat the README like a landing page, not a reference manual.
- **Self-contained**: a developer should understand the pattern without opening five other files.
- **Use case consistency**: keep the crypto intelligence platform story and team evolution intact.
- **Progressive narrative**: explain what limitation motivates the next pattern.
- **Code-to-doc fidelity**: verify names, ports, env vars, response fields, validation, failure modes, and Docker UX against the implementation.
- **No time bombs**: avoid hardcoded years or other values that drift over time unless the code also hardcodes them.
-- **No duplication**: do not explain the same concept twice at two different depths unless the second time adds new information.
-- **Show payoff early**: include an output example, trace snippet, or success criteria near the top half of the page.
-- **Keep code snippets selective**: show the parts that teach the architecture; do not dump full files.
+- **Say it once**: if a concept appears in Quick Start, don't repeat it in a Verification section. Every section must add information that no other section covers.
+- **Show payoff early**: include an output example or success criteria near the top half of the page.
+
+## Writing Style Rules
+
+- **Reference files, don't duplicate code**: link to source files (`[file.py](path)`) so readers can jump directly. Only inline a code snippet when it's the actual working code and it teaches the architecture. Never show pseudo-code, comment-only blocks, or partial extracts that don't compile.
+- **Architecture explanation helps, not just describes**: when mentioning infrastructure (containers, ports, networks), explain WHY it's structured that way, not just WHAT exists.
+- **Key Concepts are tight**: 4 bullets max. One line each with em-dash separators. Cut any bullet that restates the architecture diagram.
+- **The Problem is concise**: 2-4 sentences stating the limitation. No comparison tables unless truly needed.
+- **Exercises are short**: 2 items max. One sentence each. One simple extension, one architectural.
+- **Further Reading is scoped**: only link docs for technologies introduced by this specific pattern.
+- **Integration guides are combined**: don't split Claude Code / Cursor / Claude Desktop into separate sections. One section, multiple examples, developer-workflow order (CLI tools first, GUI apps last).
+- **No AI tone**: avoid marketing-speak, over-explanation, and restating the obvious. If a sentence doesn't add information, delete it.
## Fidelity Checklist
diff --git a/.cursor/skills/langgraph-example-implementation/SKILL.md b/.cursor/skills/langgraph-example-implementation/SKILL.md
index 9a725e2..d403e9e 100644
--- a/.cursor/skills/langgraph-example-implementation/SKILL.md
+++ b/.cursor/skills/langgraph-example-implementation/SKILL.md
@@ -45,7 +45,9 @@ Do not use it to:
- Use `verbose_log()` in meaningful places
- Keep one shared `LANGSMITH_PROJECT` across the repo and differentiate examples with run tags and metadata, not extra per-example env vars
- For public graph entrypoints, pass a `build_langsmith_run_config(...)` result into `invoke()` or `ainvoke()`
+- Prefer one small shared runtime helper when multiple transport adapters need the same timeout or LangSmith run config
- Use FastAPI `lifespan` instead of startup/shutdown decorators
+- Avoid module-level side effects in transport modules; initialize tracing, graphs, and clients in lifespan or explicit lazy helpers
- Expose `/health` returning `{"status": "ok"}`
- Keep modules focused and easy to test in isolation
@@ -94,6 +96,7 @@ class AgentState(TypedDict, total=False):
Guidance:
- Use `Required[...]` for fields the entrypoint must provide
- Keep field names business-oriented and easy to inspect in traces
+- Prefer typed fields for machine-consumed downstream data such as generated query lists instead of encoding them into free-text blobs
- Do not overload state with framework-specific objects unless necessary
- Keep internal graph state separate from public API schemas when those concerns differ
@@ -134,9 +137,11 @@ async def agent_node(state: AgentState) -> dict[str, str]:
Guidance:
- Keep one clear responsibility per node
- Return only the state updates produced by that node
+- Prefer structured LLM output when downstream nodes need deterministic machine-readable fields
- If the node uses tools, log both the action and a concise outcome
- Catch provider/tool failures only when graceful degradation is part of the example goal
- Keep prompts, parsing, and tool orchestration inside the node or its helper module, not in FastAPI handlers
+- When two nodes share the same mechanics but differ in prompts or fallback queries, extract the shared mechanics into a helper module and keep the node-specific reasoning local
## Graph Template
@@ -232,12 +237,14 @@ Guidance:
- Keep LangSmith project selection simple: reuse the repo-wide `LANGSMITH_PROJECT` and rely on tags plus metadata for per-example filtering
- Include stable tags such as `example:...`, `pattern:...`, `env:...`, `runtime:...`, and `provider:...`
- Add structured error handling when the pattern needs resilience or external dependencies
+- Add an explicit timeout boundary for public HTTP or MCP entrypoints that call external providers
- Keep endpoint handlers thin; they should validate, call the graph or service layer, and map output
## Protocol-Specific Notes
- For direct tools, keep setup inside the node or helper module that owns the capability.
-- For MCP-based tools, isolate client or connection lifecycle in a dedicated module and prefer explicit typed schemas for exposed workflows.
+- For MCP-based tools, isolate client or connection lifecycle in a dedicated module, avoid import-time initialization, and prefer explicit typed schemas for exposed workflows.
+- When REST and MCP expose the same workflow, keep shared execution config in one place and document any intentional response-shape asymmetry.
- For A2A-compatible agents, keep message-based state with a `messages` key and isolate transport-specific request or response handling at the boundary.
- Add streaming only when the example is explicitly about streaming or the UX needs incremental progress.
- Use compose `command:` overrides to run extra services from the same image when only the entrypoint changes.
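
Review note (illustrative, not part of the patch): the guidance above asks for an explicit timeout boundary on public HTTP or MCP entrypoints and for one small shared runtime helper across transports. A minimal sketch of such a helper, using only the standard library, might look like the following. The names `run_with_timeout`, `PipelineTimeoutError`, and `DEFAULT_TIMEOUT_S` are hypothetical and do not come from the repository; the 120-second default mirrors the timeout mentioned for Pattern 02 later in this diff.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

# Hypothetical default; the Pattern 02 README in this diff mentions a 120s boundary.
DEFAULT_TIMEOUT_S = 120.0


class PipelineTimeoutError(RuntimeError):
    """Raised when the wrapped call exceeds its time budget."""


async def run_with_timeout(
    call: Callable[[], Awaitable[Any]],
    timeout_s: float = DEFAULT_TIMEOUT_S,
) -> Any:
    """Await call() and convert a timeout into one domain-level error.

    Both a REST handler and an MCP tool could share this helper so the
    timeout value lives in a single place.
    """
    try:
        return await asyncio.wait_for(call(), timeout=timeout_s)
    except asyncio.TimeoutError as exc:
        raise PipelineTimeoutError(f"pipeline exceeded {timeout_s}s") from exc
```

A transport adapter would pass a zero-argument callable, for example `lambda: graph.ainvoke(payload, config=run_config)`, and map `PipelineTimeoutError` to its own error shape (an HTTP 504 or an MCP tool error). That mapping is an assumption about how the adapters are structured, not something this diff shows.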
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index 90d80ff..f1b0287 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -27,7 +27,7 @@
- [ ] `README.md` follows the documentation template
- [ ] Architecture diagram (Mermaid) included
- [ ] "Running the Example" section with exact commands
-- [ ] `docs/CHANGELOG.md` updated
+- [ ] `CHANGELOG.md` updated
### Testing
- [ ] Tests added/updated in `tests/`
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e74dc69..4b54bae 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,35 @@
All notable changes to this project are documented here.
+## [2026-03-30] Pattern 02: MCP Tool Integration -- Architecture Redesign
+
+### Added
+- Parallel fan-out/fan-in graph: research_planner -> [news_scanner | project_profiler | community_analyst] -> intelligence_compiler
+- Research planner extracts `project_name` and `coin_ticker` via LLM, generates tailored `NEWS_QUERIES` and `COMMUNITY_QUERIES` for downstream nodes
+- CoinGecko retry with exponential backoff (3 attempts, handles 429 rate limits and 5xx errors)
+- Project profiler ticker fallback resolution (search by name, then by ticker symbol)
+- News scanner fires 3-4 targeted queries per run and deduplicates results by URL
+- Community analyst uses DuckDuckGo with site:reddit.com and Twitter-focused queries for social sentiment
+
+### Changed
+- P02 architecture now exposes one outcome-oriented MCP tool (`research_crypto_project` wraps the full pipeline) instead of raw API wrappers as MCP tools
+- Sequential graph (5 steps, ~61s) replaced with parallel graph (3 steps, ~51s)
+- Community analyst no longer calls CoinGecko -- eliminated data duplication with project profiler
+- All agent prompts improved with anti-hallucination rules and explicit output structure
+- Intelligence compiler prompt demands source attribution and "Data not available" instead of guessing
+
+### Architecture Decisions
+- **Parallel over sequential**: news, profiler, and community have zero data dependencies -- running them in parallel is strictly better. LangGraph native `add_edge` fan-out/fan-in used (no Send API needed)
+- **Data source ownership**: each node owns exactly one external data source. Profiler owns CoinGecko (market + dev data). News and community own DuckDuckGo (with different query strategies). Compiler is LLM-only synthesis
+- **LLM query generation over regex**: research planner uses the LLM to understand user intent and generate search-engine-optimized queries, replacing brittle `_normalize_query` regex
+- **Retry over fail-fast for external APIs**: CoinGecko free tier has aggressive rate limits; retry with backoff prevents silent data loss that was observed in production traces
+- **Outcome-oriented MCP**: the MCP server exposes `research_crypto_project` (the full pipeline result) rather than raw API wrappers -- this is the Software 3.0 lesson: expose capabilities, not plumbing
+
+### Dependencies
+- No new dependencies added; removed `langchain-mcp-adapters` (MCP client no longer needed inside agents)
+
+---
+
## [2026-03-29] Pattern 02: MCP Tool Integration -- complete
### Summary
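
Review note (illustrative, not part of the patch): the changelog entry above describes a CoinGecko retry with exponential backoff (3 attempts, retrying on 429 and 5xx). The sketch below shows one way that policy could be written. It is deliberately transport-agnostic: `fetch` is a hypothetical callable returning `(status_code, json_body)`, and `fetch_with_retry`, `RETRYABLE`, and the 1-second base delay are assumptions rather than the repository's actual client code.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Optional

# Status codes treated as transient: rate limiting plus any server error.
RETRYABLE = {429} | set(range(500, 600))


async def fetch_with_retry(
    fetch: Callable[[], Awaitable[tuple[int, dict]]],
    attempts: int = 3,
    base_delay_s: float = 1.0,
) -> Optional[dict]:
    """Call fetch() up to `attempts` times, doubling the delay between retries.

    Returns the body on HTTP 200, or None when the call fails with a
    non-retryable status or every attempt is exhausted.
    """
    for attempt in range(attempts):
        status, body = await fetch()
        if status not in RETRYABLE:
            return body if status == 200 else None
        if attempt < attempts - 1:
            # Delays grow as base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay_s * 2**attempt)
    return None
```

Returning `None` instead of raising keeps the caller free to degrade gracefully, which matches the partial-output behavior the changelog describes; a caller that prefers fail-fast could raise instead.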
diff --git a/README.md b/README.md
index 2ac0379..ae2ab01 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@
*From a single LangGraph pipeline to enterprise-grade, cloud-deployed agent architectures.*
-[Curriculum](docs/curriculum.md) · [Vision & Roadmap](docs/vision.md) · [Blog](https://ai.ksopyla.com) · [Changelog](docs/CHANGELOG.md)
+[Curriculum](docs/curriculum.md) · [Vision & Roadmap](docs/vision.md) · [Blog](https://ai.ksopyla.com) · [Changelog](CHANGELOG.md)
@@ -27,7 +27,7 @@ This project closes that gap with **9 design patterns**, each solving a named ar
## The Approach
-This is a set of exmaples you can call it **Design Patterns** presenting practical design and implementation for multi-agent systems. Each pattern:
+This is a set of examples -- call them **Design Patterns** -- presenting practical design and implementation for multi-agent systems. Each pattern:
- Solves a **real architectural problem** that the previous pattern cannot handle
- Has clear **"when to use / when to avoid"** criteria
@@ -89,7 +89,7 @@ Team 2 moves to an external partner. Implicit trust is gone -- JWT authenticatio
02 |
MCP Tool Integration |
Standardized tool access for agents & AI clients |
-MCP servers/clients, Claude Code integration |
+FastMCP, outcome-oriented tools, parallel fan-out/fan-in |
| 03 |
@@ -242,9 +242,9 @@ agent-patterns-lab/
│ └── src/agent_common/ # LLM config, tracing, MCP, A2A, auth helpers
├── docs/
│ ├── curriculum.md # Technical pattern-by-pattern breakdown
-│ ├── vision.md # Full narrative, philosophy, and roadmap
-│ └── CHANGELOG.md
+│ └── vision.md # Full narrative, philosophy, and roadmap
├── infra/ # Docker base images, Azure Bicep templates
+├── CHANGELOG.md # What changed and when
└── .github/ # CI/CD workflows, PR templates
```
@@ -294,7 +294,7 @@ uv run pre-commit install --install-hooks --hook-type pre-commit --hook-type com
- **[Full Curriculum](docs/curriculum.md)** -- detailed technical breakdown of each pattern with architecture diagrams
- **[Vision & Roadmap](docs/vision.md)** -- the complete narrative, architectural philosophy, and future direction
- **[Blog](https://ai.ksopyla.com)** -- in-depth write-ups for each pattern
-- **[Changelog](docs/CHANGELOG.md)** -- what changed and when
+- **[Changelog](CHANGELOG.md)** -- what changed and when
## License
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
deleted file mode 100644
index 8524cdd..0000000
--- a/docs/CHANGELOG.md
+++ /dev/null
@@ -1,51 +0,0 @@
-# Changelog
-
-All notable changes to this project are documented here.
-
-## [2026-03-30] Pattern 02: MCP Tool Integration -- Architecture Redesign
-
-### Added
-- Parallel fan-out/fan-in graph: research_planner -> [news_scanner | project_profiler | community_analyst] -> intelligence_compiler
-- Research planner extracts `project_name` and `coin_ticker` via LLM, generates tailored `NEWS_QUERIES` and `COMMUNITY_QUERIES` for downstream nodes
-- CoinGecko retry with exponential backoff (3 attempts, handles 429 rate limits and 5xx errors)
-- Project profiler ticker fallback resolution (search by name, then by ticker symbol)
-- News scanner fires 3-4 targeted queries per run and deduplicates results by URL
-- Community analyst uses DuckDuckGo with site:reddit.com and Twitter-focused queries for social sentiment
-- `docs/CHANGELOG.md` created for tracking project evolution
-
-### Changed
-- P02 architecture from outcome-oriented MCP tool (one `research_crypto_project` tool wraps the full pipeline) instead of raw API wrappers as MCP tools
-- Sequential graph (5 steps, ~61s) replaced with parallel graph (3 steps, ~51s)
-- Community analyst no longer calls CoinGecko -- eliminated data duplication with project profiler
-- All agent prompts improved with anti-hallucination rules and explicit output structure
-- Intelligence compiler prompt demands source attribution and "Data not available" instead of guessing
-
-### Architecture Decisions
-- **Parallel over sequential**: news, profiler, and community have zero data dependencies -- running them in parallel is strictly better. LangGraph native `add_edge` fan-out/fan-in used (no Send API needed)
-- **Data source ownership**: each node owns exactly one external data source. Profiler owns CoinGecko (market + dev data). News and community own DuckDuckGo (with different query strategies). Compiler is LLM-only synthesis
-- **LLM query generation over regex**: research planner uses the LLM to understand user intent and generate search-engine-optimized queries, replacing brittle `_normalize_query` regex
-- **Retry over fail-fast for external APIs**: CoinGecko free tier has aggressive rate limits; retry with backoff prevents silent data loss that was observed in production traces
-- **Outcome-oriented MCP**: the MCP server exposes `research_crypto_project` (the full pipeline result) rather than raw API wrappers -- this is the Software 3.0 lesson: expose capabilities, not plumbing
-
-### Dependencies
-- No new dependencies added; removed `langchain-mcp-adapters` (MCP client no longer needed inside agents)
-
-## [2026-03-29] Pattern 02: MCP Tool Integration -- Initial Implementation
-
-### Added
-- 5-agent intelligence pipeline: Research Planner, News Scanner, Project Profiler, Community Analyst, Intelligence Compiler
-- FastAPI REST entry point (`POST /run`) and FastMCP server entry point (`research_crypto_project` tool)
-- CoinGecko API client (`src/coingecko.py`) with search, info, and price endpoints
-- DuckDuckGo web search integration for news scanning
-- Multi-container Docker Compose (agent :8000, MCP server :8001)
-- LangSmith tracing with tagged runs (pattern, example, provider, runtime, env)
-- Unit tests (10), API tests (7), e2e test (1) -- 23 total, 99% coverage
-
-## [2026-03-28] Pattern 01: Orchestrator Pipeline
-
-### Added
-- 3-agent pipeline: Research Planner, News Scanner, Intelligence Compiler
-- LangGraph StateGraph with TypedDict state
-- FastAPI entry point with LangSmith tracing
-- Docker Compose single-container deployment
-- Shared library `libs/common/` with `agent_common` (LLM, tracing utilities)
diff --git a/docs/curriculum.md b/docs/curriculum.md
index dcebb71..f4741b8 100644
--- a/docs/curriculum.md
+++ b/docs/curriculum.md
@@ -154,11 +154,11 @@ graph TD
| Agent | Role | Data Source |
|-------|------|-------------|
-| Research Planner | Creates research plan | None (LLM only) |
-| News Scanner | Web search for news | DuckDuckGo (direct) |
-| Project Profiler | Project info, team, roadmap | CoinGecko API (direct httpx) |
-| Community Analyst | Community and developer activity | CoinGecko API (direct httpx) |
-| Intelligence Compiler | Synthesizes all outputs | None (LLM only) |
+| Research Planner | Extracts project identifiers, generates tailored search queries | None (LLM only) |
+| News Scanner | Web search for news, partnerships, announcements | DuckDuckGo (direct) |
+| Project Profiler | Market data, developer stats, project fundamentals | CoinGecko API (direct httpx, retry with backoff) |
+| Community Analyst | Social sentiment from Reddit, X/Twitter | DuckDuckGo (site-restricted queries) |
+| Intelligence Compiler | Synthesizes all outputs into structured report | None (LLM only) |
**Architecture:**
@@ -166,10 +166,17 @@ graph TD
graph TD
ClaudeDesktop["Claude Desktop\n/ Claude Code"] -->|"MCP: research_crypto_project()"| MCP["crypto-intelligence\nMCP Server (:8001)"]
User["User\n(POST /run)"] --> FastAPI["Agent Service\n(FastAPI :8000)"]
- FastAPI --> Pipeline["LangGraph Pipeline\n(5 agents)"]
+ FastAPI --> Pipeline["LangGraph Pipeline"]
MCP --> Pipeline
- Pipeline --> DDG["DuckDuckGo"]
- Pipeline --> CoinGecko["CoinGecko API"]
+ subgraph parallel ["Parallel Fan-Out / Fan-In"]
+ Planner["Research Planner"] --> NS["News Scanner\n(DuckDuckGo)"]
+ Planner --> PP["Project Profiler\n(CoinGecko)"]
+ Planner --> CA["Community Analyst\n(DuckDuckGo)"]
+ NS --> Compiler["Intelligence\nCompiler"]
+ PP --> Compiler
+ CA --> Compiler
+ end
+ Pipeline --> parallel
```
**Key concepts:**
@@ -177,8 +184,10 @@ graph TD
- Expose agent capability via MCP, not raw API wrappers (outcome-oriented tools)
- MCP server with `FastMCP` wrapping the full LangGraph pipeline as one tool
- Two entry points to the same graph: REST (`POST /run`) and MCP (`research_crypto_project`)
+- Parallel fan-out/fan-in: planner → [news | profiler | community] → compiler (LangGraph native `add_edge` fan-out)
+- Data source ownership: each research node owns exactly one external source (no duplication)
+- LLM-driven query generation: planner extracts `project_name`/`coin_ticker` and generates `NEWS_QUERIES`/`COMMUNITY_QUERIES` for downstream nodes
- Software 3.0 principle: the "UI" is Claude Desktop, not a bespoke chat widget
-- Agents call data sources directly (CoinGecko via httpx, DuckDuckGo via langchain) -- MCP is for the external interface, not internal plumbing
- Multi-container Docker Compose (agent REST service + MCP server)
- Claude Desktop / Claude Code / Cursor integration via MCP config
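
Review note (illustrative, not part of the patch): the key concepts above describe a planner that fans out to three independent research nodes and a compiler that fans in. The sketch below shows the same control flow with plain `asyncio.gather` as a stand-in; it is not the LangGraph `add_edge` wiring the curriculum refers to, and every function body is a placeholder for the real LLM and API calls.

```python
import asyncio


async def planner(user_input: str) -> dict:
    # Placeholder for the LLM planner: emits typed fields for downstream nodes.
    return {"project_name": user_input, "news_queries": [f"{user_input} news"]}


async def news_scanner(plan: dict) -> dict:
    return {"news": f"news for {plan['project_name']}"}


async def project_profiler(plan: dict) -> dict:
    return {"profile": f"profile for {plan['project_name']}"}


async def community_analyst(plan: dict) -> dict:
    return {"community": f"community for {plan['project_name']}"}


async def compiler(parts: list[dict]) -> dict:
    # Fan-in: merge each node's partial state, then synthesize one report.
    merged: dict = {}
    for part in parts:
        merged.update(part)
    merged["report"] = " | ".join(
        merged[key] for key in ("news", "profile", "community")
    )
    return merged


async def run_pipeline(user_input: str) -> dict:
    plan = await planner(user_input)
    # Fan-out: the three nodes share no data, so they can run concurrently.
    parts = await asyncio.gather(
        news_scanner(plan), project_profiler(plan), community_analyst(plan)
    )
    return await compiler(list(parts))
```

Each node returns only its own key, which is what lets the merge step stay a simple dictionary update; that mirrors the "data source ownership" idea without claiming to reproduce the project's actual state schema.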
diff --git a/examples/02-mcp-tool-integration/README.md b/examples/02-mcp-tool-integration/README.md
index 349ba81..974d037 100644
--- a/examples/02-mcp-tool-integration/README.md
+++ b/examples/02-mcp-tool-integration/README.md
@@ -1,8 +1,8 @@
# Pattern 02: MCP Tool Integration
-> Expose your agent pipeline as an MCP server so Claude Desktop, Cursor, or Claude Code can call `research_crypto_project` and get a full intelligence report -- one tool, one protocol, no custom UI.
+> Expose your agent pipeline as an MCP tool and run research agents in parallel -- one protocol, one tool call, full intelligence report.
-`Pattern 02 of 9`. Expands Team 1 (Intelligence) from 3 agents to the full 5-agent lineup and adds a second entry point: alongside `POST /run` (REST), the same pipeline is now accessible as an MCP tool. This is the Software 3.0 lesson -- the "UI" is Claude Desktop, not a bespoke chat widget.
+`Pattern 02 of 9`. Expands Team 1 (Intelligence) from 3 agents to the full 5-agent lineup with a **parallel fan-out/fan-in** architecture and adds a second entry point: alongside `POST /run` (REST), the same pipeline is now accessible as an MCP tool. This is the Software 3.0 lesson -- the "UI" is Claude Desktop, not a bespoke chat widget.
Useful context:
- [Curriculum](../../docs/curriculum.md)
@@ -32,11 +32,11 @@ curl -X POST http://localhost:8000/run \
-d '{"input": "Research the Arbitrum crypto project"}'
```
-The MCP entry point is at `localhost:8001/sse` -- connect Claude Desktop or Claude Code (see [Claude Desktop Integration](#claude-desktop-integration) below).
+The MCP entry point is at `localhost:8001/sse` -- connect Claude Code, Cursor, or Claude Desktop (see [Connect Your MCP Client](#connect-your-mcp-client) below).
## What You Get Back
-Both entry points (REST and MCP) run the same 5-agent pipeline and return the same intelligence report:
+Both entry points (REST and MCP) run the same 5-agent pipeline and produce the same final intelligence report. The REST API returns the full intermediate artifact set for debugging, while the MCP tool returns only the final `report`, because MCP tools should expose outcomes rather than internal pipeline state:
```json
{
@@ -48,7 +48,7 @@ Both entry points (REST and MCP) run the same 5-agent pipeline and return the sa
}
```
-Via MCP, Claude Desktop receives the `report` field directly -- a complete analysis in one tool call.
+Via MCP, Claude Desktop receives the `report` field directly -- a complete analysis in one tool call. This asymmetry is intentional: REST is optimized for developer inspection, while MCP is optimized for outcome-oriented tool use.
## At a Glance
@@ -57,27 +57,20 @@ Via MCP, Claude Desktop receives the `report` field directly -- a complete analy
| Pattern role | Introduces MCP -- expose agent capabilities to AI clients |
| Team | Team 1: Intelligence (full 5-agent lineup) |
| Agents | Research Planner, News Scanner, Project Profiler, Community Analyst, Intelligence Compiler |
+| Graph | Parallel fan-out/fan-in: planner → [news \| profiler \| community] → compiler |
| Entry points | REST: `POST /run` (:8000) · MCP: `research_crypto_project` tool (:8001) |
-| MCP tool | `research_crypto_project(project_name)` -- runs the full pipeline |
-| Data sources | DuckDuckGo (web search), CoinGecko free API (project data, prices) |
+| MCP tool | `research_crypto_project(query)` -- runs the full pipeline |
+| Data sources | News Scanner: DuckDuckGo · Project Profiler: CoinGecko API · Community Analyst: DuckDuckGo (site-restricted) |
| Runtime | Agent container (FastAPI) + MCP server container (same image, different command) |
-| Input validation | `input` must be 3-500 characters (REST); `project_name` is free-text (MCP) |
+| Input validation | `input` must be 3-500 characters (REST); `query` is free-text (MCP) |
+| Timeout behavior | Both entry points execute synchronously with a 120s timeout boundary |
| Observability | `VERBOSE=true` logs to stderr; hosted LangSmith tracing when `LANGSMITH_API_KEY` is set |
## The Problem
-In Pattern 01, the pipeline is locked behind `POST /run` -- a REST endpoint. This is Software 2.0 thinking: you build a custom client, you call a custom API.
+Pattern 01 has two limitations. First, the pipeline is locked behind `POST /run` -- Claude Desktop, Cursor, and other AI clients can't call it. MCP fixes this by exposing the agent's capability as a standard protocol tool that any MCP client discovers automatically.
-Claude Desktop can't use it. Cursor can't use it. Another AI agent can't use it. The only way to trigger the pipeline is a bespoke HTTP call.
-
-MCP changes the interface:
-
-| Aspect | Pattern 01 (REST only) | Pattern 02 (REST + MCP) |
-|--------|----------------------|------------------------|
-| Claude Desktop | Can't access | Calls `research_crypto_project` via MCP |
-| Custom client | `curl POST /run` | Still works |
-| What's exposed | REST endpoint | Agent **capability** as a protocol tool |
-| Discovery | Read the docs, know the URL | MCP client discovers tools automatically |
+Second, the three research agents run sequentially even though they have zero data dependencies on each other. Parallel fan-out cuts wall-clock time by running them concurrently.
## Architecture
@@ -87,71 +80,58 @@ graph TD
User["User\n(POST /run)"] --> FastAPI["Agent Service\n(FastAPI :8000)"]
FastAPI --> Pipeline["LangGraph Pipeline"]
MCP --> Pipeline
- subgraph agents ["5-Agent Pipeline"]
- Planner["Research Planner"]
+ subgraph agents ["Parallel Fan-Out / Fan-In Pipeline"]
+ Planner["Research Planner\n(extracts project_name, coin_ticker,\ngenerates search queries)"]
Scanner["News Scanner\n(DuckDuckGo)"]
- Profiler["Project Profiler\n(CoinGecko)"]
- Analyst["Community Analyst\n(CoinGecko)"]
+ Profiler["Project Profiler\n(CoinGecko API)"]
+ Analyst["Community Analyst\n(DuckDuckGo, site-restricted)"]
Compiler["Intelligence Compiler"]
Planner --> Scanner
- Scanner --> Profiler
- Profiler --> Analyst
+ Planner --> Profiler
+ Planner --> Analyst
+ Scanner --> Compiler
+ Profiler --> Compiler
Analyst --> Compiler
end
Pipeline --> agents
Pipeline -.->|"optional traces"| LangSmith["LangSmith"]
```
-Two containers, one graph. The MCP server and the FastAPI agent share the same Docker image -- only the `command` differs.
+The MCP server and REST API share the same Docker image but run as **separate containers** -- one serves `POST /run` on `:8000`, the other serves the MCP SSE endpoint on `:8001`. This way a developer can use either entry point (or both) without any code changes. Both invoke the same compiled LangGraph, where three research nodes fan out in parallel after the planner and fan in at the compiler.
## Key Concepts
-- **Expose capability, not plumbing**: the MCP tool is `research_crypto_project`, not `get_coin_price`. The internal orchestration is hidden.
-- **Two entry points, one graph**: REST and MCP both call `build_graph().ainvoke()`. Same result, different protocols.
-- **MCP server with `FastMCP`**: a few lines to wrap the pipeline as a discoverable tool.
-- **Data sources are internal**: agents call CoinGecko and DuckDuckGo directly (httpx, langchain). MCP is for the external interface, not internal data fetching.
-- **Graceful degradation**: CoinGecko failures, search failures, and LLM failures produce partial output, not crashes.
+- **Outcome-oriented MCP tool** -- expose `research_crypto_project` (the full pipeline), not raw API wrappers like `get_coin_price`
+- **Parallel fan-out/fan-in** -- three research nodes run concurrently via LangGraph `add_edge`; compiler waits for all three
+- **Data source ownership** -- each node owns one external source (DuckDuckGo or CoinGecko), no duplication
+- **Graceful degradation** -- CoinGecko retry with backoff; search and LLM failures produce partial output, not crashes
+- **Synchronous execution boundary** -- REST and MCP both wait for the full pipeline result and fail fast after 120 seconds instead of hanging indefinitely
## Implementation Walkthrough
-### Step 1: Build the MCP server that exposes the agent pipeline
+### MCP Server
-The entire MCP server is small -- it builds the graph and exposes one tool:
+The full implementation is in [`src/mcp_servers/crypto_intelligence.py`](src/mcp_servers/crypto_intelligence.py) -- ~30 lines. It builds the graph once and wraps it as a single MCP tool:
```python
-from mcp.server.fastmcp import FastMCP
-from src.agents.graph import build_graph
-
mcp = FastMCP("crypto-intelligence", host="0.0.0.0", port=8000)
_graph = build_graph()
@mcp.tool()
-async def research_crypto_project(project_name: str) -> str:
- """Research a cryptocurrency project and produce a structured intelligence report."""
- result = await _graph.ainvoke({"input": f"Research the {project_name} crypto project"})
+async def research_crypto_project(query: str) -> str:
+ result = await _graph.ainvoke({"input": query})
return result.get("report", "")
```
-Claude Desktop sees one tool: `research_crypto_project(project_name)`. It doesn't see or care about the 5 agents, CoinGecko, or DuckDuckGo behind it.
+### Parallel Graph
-### Step 2: Agents call data sources directly
+The graph uses LangGraph's native fan-out: after `research_planner`, three edges fire simultaneously to `news_scanner`, `project_profiler`, and `community_analyst`. The compiler waits for all three via fan-in. See [`src/agents/graph.py`](src/agents/graph.py) for the wiring and the [LangGraph branching docs](https://langchain-ai.github.io/langgraph/how-tos/branching/) for the pattern.
-No MCP wrappers around APIs. The Project Profiler calls CoinGecko directly:
-
-```python
-from src.coingecko import search_coins, get_coin_info, get_coin_price
-
-async def project_profiler_node(state: AgentState) -> dict[str, str]:
- search_results = await search_coins(query)
- coin_info = await get_coin_info(coin_id)
- coin_price = await get_coin_price(coin_id)
- # LLM analyzes the data
- return {"profile": str(response.content)}
-```
+The planner extracts `project_name` and `coin_ticker` via LLM and generates search queries so downstream nodes don't pass raw user input to external APIs. See [`src/agents/research_planner.py`](src/agents/research_planner.py).
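
The fan-out/fan-in shape can be sketched without LangGraph. The example below uses `asyncio.gather` as a stand-in for the graph's parallel edges; it is an analogy under that assumption, not how `src/agents/graph.py` is written. The node functions are hypothetical stubs that return fixed strings instead of calling an LLM or an external API.

```python
import asyncio


async def research_planner(user_input: str) -> dict:
    """Hypothetical planner stub: derives a project name from the raw input."""
    return {"project_name": user_input.strip().title()}


async def news_scanner(state: dict) -> dict:
    return {"news": f"news for {state['project_name']}"}


async def project_profiler(state: dict) -> dict:
    return {"profile": f"profile for {state['project_name']}"}


async def community_analyst(state: dict) -> dict:
    return {"community": f"community for {state['project_name']}"}


async def run_pipeline(user_input: str) -> dict:
    state = await research_planner(user_input)
    # Fan-out: start all three branches concurrently.
    # Fan-in: gather returns only after every branch has finished.
    updates = await asyncio.gather(
        news_scanner(state),
        project_profiler(state),
        community_analyst(state),
    )
    for update in updates:
        state.update(update)  # each branch writes its own key, so nothing collides
    return state


final_state = asyncio.run(run_pipeline("  solana "))
print(sorted(final_state))
```

Because every branch writes a distinct key, merging the partial results needs no conflict handling, which mirrors the one-output-per-node layout of the shared state.
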
-### Step 3: Docker Compose with two entry points
+### Docker Compose
-Both containers use the same image -- only the command differs:
+Both containers use the same image -- only the command differs. See [`docker-compose.yml`](docker-compose.yml):
```yaml
services:
@@ -164,10 +144,16 @@ services:
ports: ["8000:8000"]
```
-### Claude Desktop Integration
+## Connect Your MCP Client
+
+With `docker compose up` running, connect from your tool of choice:
-With Docker running, add this to `%APPDATA%\Claude\claude_desktop_config.json`:
+**Claude Code:**
+```bash
+claude mcp add --transport sse crypto-intelligence http://localhost:8001/sse
+```
+**Cursor** -- add to your project's `.cursor/mcp.json`:
```json
{
"mcpServers": {
@@ -178,48 +164,18 @@ With Docker running, add this to `%APPDATA%\Claude\claude_desktop_config.json`:
}
```
-Restart Claude Desktop. Ask: *"Research the Solana crypto project"*. Claude calls `research_crypto_project("Solana")` and gets a complete intelligence report.
-
-### Claude Code Integration
-
-```bash
-claude mcp add --transport sse crypto-intelligence http://localhost:8001/sse
-```
-
-## What You Should See
-
-With `VERBOSE=true`, container logs show the pipeline executing (same output whether triggered via REST or MCP):
-
-```text
-[14:32:01.234] [System] FastAPI application started
-[14:32:05.234] [ResearchPlanner] Planning research for: Research the Arbitrum crypto project
-[14:32:07.891] [NewsScanner] Searching for: Research the Arbitrum crypto project
-[14:32:10.445] [NewsScanner] Got 8 search results
-[14:32:13.234] [NewsScanner] Analysis complete (623 chars)
-[14:32:13.235] [ProjectProfiler] Profiling: Research the Arbitrum crypto project
-[14:32:13.567] [CoinGecko] search_coins('Arbitrum') -> 5 results
-[14:32:14.123] [CoinGecko] get_coin_info('arbitrum') -> Arbitrum
-[14:32:14.456] [CoinGecko] get_coin_price('arbitrum') -> $1.23
-[14:32:17.890] [CommunityAnalyst] Got community/developer data
-[14:32:21.234] [IntelligenceCompiler] Report generated (1247 chars)
-```
-
-## Verification
-
-```bash
-# REST entry point
-curl http://localhost:8000/health
-curl -X POST http://localhost:8000/run \
- -H "Content-Type: application/json" \
- -d '{"input": "Research the Solana crypto project"}'
-
-# Validation (expect 422)
-curl -X POST http://localhost:8000/run \
- -H "Content-Type: application/json" \
- -d '{"input": "ab"}'
+**Claude Desktop** -- add to `%APPDATA%\Claude\claude_desktop_config.json` (macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`):
+```json
+{
+  "mcpServers": {
+    "crypto-intelligence": {
+      "url": "http://localhost:8001/sse"
+    }
+  }
+}
```
-For MCP: connect Claude Desktop or Claude Code (see above), then ask about any crypto project.
+Then ask: *"Research the Solana crypto project"*.
## Local Development
@@ -232,25 +188,28 @@ uv run python scripts/linting/run_mypy.py
## Exercises
-1. **Add a second MCP tool**: Expose `get_crypto_price(project_name)` as a lightweight MCP tool that skips the full pipeline and returns just the current price via CoinGecko.
-2. **Use the official CoinGecko MCP**: Replace direct httpx calls with the official `@coingecko/coingecko-mcp` server (requires a free CoinGecko API key). Observe how the MCP server's `research_crypto_project` tool doesn't change -- internal data access is hidden from MCP clients.
-3. **Test with Claude Desktop**: Connect to the MCP server and research different crypto projects. Compare the output with `POST /run`.
+1. **Add a lightweight MCP tool**: Expose `get_crypto_price(project_name)` that skips the full pipeline and returns just the current price via CoinGecko.
+2. **Add a fourth parallel branch**: Create a `tokenomics_analyst` node that fans out alongside the other three research nodes.
## Trade-offs
| Advantage | Limitation |
|-----------|-----------|
| Any MCP client gets the full agent capability | MCP server runs the full pipeline per call (cost/latency) |
+| Parallel execution cuts wall-clock time vs. sequential | Three concurrent DuckDuckGo/CoinGecko calls may hit rate limits faster |
| Claude Desktop is the "UI" -- no custom frontend | Streaming partial results is not supported (Pattern 06 adds this) |
| Same graph, two entry points -- no code duplication | Two containers for the same image |
-| Internal data sources are hidden from clients | CoinGecko rate limits apply (30 req/min free tier) |
+| REST exposes intermediate artifacts for debugging; MCP exposes only the final report | Entry points are intentionally asymmetric, so clients see different response shapes |
+| Internal data sources are hidden from clients | CoinGecko rate limits apply (30 req/min free tier; retry with backoff mitigates) |
+| Timeout prevents hung requests from running forever | Background jobs / `202 Accepted` polling are not implemented in this pattern to keep focus on MCP integration |
+
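The "retry with backoff" mitigation named in the table can be sketched as follows. This is an illustrative example under stated assumptions, not the project's `src/coingecko.py`: `TransientError`, `retry_with_backoff`, and `flaky_call` are hypothetical names, and the delays are shortened so the example runs quickly.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical error standing in for a 429 or 5xx response."""


async def retry_with_backoff(call, max_retries=3, base_delay=0.01):
    """Retry `call` on TransientError, doubling the delay after each failure."""
    delays = []
    for attempt in range(max_retries):
        try:
            return await call(), delays
        except TransientError:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error to the caller
            delay = base_delay * (2 ** attempt)
            delays.append(delay)
            await asyncio.sleep(delay)


attempts = {"count": 0}


async def flaky_call():
    """Hypothetical request that fails twice, then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TransientError("rate limited")
    return {"status": "ok"}


result, delays = asyncio.run(retry_with_backoff(flaky_call))
print(result, delays)
```

Backoff spreads retries out over time, but it does not raise the rate limit itself; a burst of parallel requests can still exhaust the per-minute quota.
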
+Both entry points currently execute the pipeline synchronously. In a production system with longer-running research, a background-task or job-queue design such as `POST /run -> 202 Accepted -> GET /tasks/{id}` would be reasonable, but that extra lifecycle machinery would distract from the MCP lesson here.
This last limitation -- every request starts from scratch -- is the reason [Pattern 03](../03-persistent-memory/README.md) exists.
## Further Reading
-- [MCP Specification](https://modelcontextprotocol.io/)
+- [FastMCP (MCP Python SDK)](https://github.com/modelcontextprotocol/python-sdk) -- the `FastMCP` class used in this pattern
- [MCP Server Design Best Practices](https://www.philschmid.de/mcp-best-practices) -- expose outcomes, not raw APIs
-- [MCP Python SDK](https://github.com/modelcontextprotocol/python-sdk)
-- [CoinGecko Free API](https://docs.coingecko.com/reference/introduction)
-- [CoinGecko Official MCP](https://www.npmjs.com/package/@coingecko/coingecko-mcp) -- alternative to direct API calls
+- [LangGraph: Parallel Branch Execution](https://langchain-ai.github.io/langgraph/how-tos/branching/) -- the fan-out/fan-in pattern used in this graph
+- [MCP Specification](https://modelcontextprotocol.io/)
diff --git a/examples/02-mcp-tool-integration/src/agents/community_analyst.py b/examples/02-mcp-tool-integration/src/agents/community_analyst.py
index 48dfb16..4f1c5ab 100644
--- a/examples/02-mcp-tool-integration/src/agents/community_analyst.py
+++ b/examples/02-mcp-tool-integration/src/agents/community_analyst.py
@@ -1,24 +1,24 @@
"""Community Analyst agent -- analyzes social media sentiment and community activity.
-Reads: state["project_name"], state["coin_ticker"], state["plan"]
+Reads: state["project_name"], state["coin_ticker"], state["community_queries"]
Writes: state["community"]
Uses DuckDuckGo with site-restricted queries (reddit.com, twitter/X keywords)
to gauge community sentiment. Does NOT call CoinGecko -- that data source is
-owned exclusively by project_profiler.
+owned exclusively by project_profiler. Shared search mechanics live in
+src.agents.web_search.
"""
from __future__ import annotations
-import re
from datetime import UTC, datetime
from agent_common.llm import get_chat_model
from agent_common.tracing import verbose_log
-from langchain_community.tools import DuckDuckGoSearchResults
from langchain_core.messages import HumanMessage, SystemMessage
from src.agents.state import AgentState
+from src.agents.web_search import format_search_results, run_search_queries
SYSTEM_PROMPT = """\
You are a crypto community and sentiment analyst. You receive web search results \
@@ -37,29 +37,10 @@
justification. If data is insufficient, rate as "Insufficient Data" and explain why."""
-def _extract_plan_queries(plan: str) -> list[str]:
- """Pull queries from the COMMUNITY_QUERIES: section of the research plan."""
- match = re.search(
- r"COMMUNITY_QUERIES:\s*\n(.*?)(?:\n\s*\n|\Z)",
- plan,
- re.DOTALL | re.IGNORECASE,
- )
- if not match:
- return []
- lines = match.group(1).strip().splitlines()
- queries: list[str] = []
- for line in lines:
- cleaned = re.sub(r"^[\s\-\d.*•]+", "", line).strip().strip('"').strip("'")
- if cleaned:
- queries.append(cleaned)
- return queries
-
-
-def _build_queries(project_name: str, ticker: str, plan: str) -> list[str]:
- """Build social-focused search queries from plan and fallback templates."""
- plan_queries = _extract_plan_queries(plan)
- if plan_queries:
- return plan_queries[:4]
+def _build_queries(project_name: str, ticker: str, community_queries: list[str]) -> list[str]:
+ """Build social-focused queries from planner output and fallback templates."""
+ if community_queries:
+ return community_queries[:4]
current_year = datetime.now(UTC).year
return [
@@ -69,52 +50,20 @@ def _build_queries(project_name: str, ticker: str, plan: str) -> list[str]:
]
-def _deduplicate_results(all_results: list[dict[str, str]]) -> list[dict[str, str]]:
- """Remove duplicate search results by URL."""
- seen_urls: set[str] = set()
- unique: list[dict[str, str]] = []
- for item in all_results:
- url = item.get("link", "")
- if url and url not in seen_urls:
- seen_urls.add(url)
- unique.append(item)
- return unique
-
-
async def community_analyst_node(state: AgentState) -> dict[str, str]:
"""Analyze community sentiment using social-focused web searches."""
project_name = state.get("project_name", state["input"])
ticker = state.get("coin_ticker", "")
- plan = state.get("plan", "")
+ community_queries = state.get("community_queries", [])
verbose_log("CommunityAnalyst", f"Analyzing community for: {project_name} ({ticker})")
- queries = _build_queries(project_name, ticker, plan)
+ queries = _build_queries(project_name, ticker, community_queries)
verbose_log("CommunityAnalyst", f"Running {len(queries)} social search queries")
- all_results: list[dict[str, str]] = []
- search = DuckDuckGoSearchResults(
- max_results=5, # type: ignore[call-arg]
- output_format="list",
- )
-
- for query in queries:
- try:
- raw = await search.ainvoke(query)
- if isinstance(raw, list):
- all_results.extend(raw)
- verbose_log("CommunityAnalyst", f" [{query[:50]}] → {len(raw)} results")
- except Exception as exc:
- verbose_log("CommunityAnalyst", f" [{query[:50]}] search failed: {exc}")
-
- unique_results = _deduplicate_results(all_results)
- verbose_log(
- "CommunityAnalyst",
- f"Total: {len(all_results)} raw → {len(unique_results)} unique results",
- )
-
- results_text = (
- "\n".join(f"- [{r.get('title', 'N/A')}]({r.get('link', '')}): {r.get('snippet', '')}" for r in unique_results)
- or "[No social media results found]"
+ search_results = await run_search_queries(queries, "CommunityAnalyst")
+ results_text = format_search_results(
+ search_results,
+ empty_message="[No social media results found]",
)
try:
diff --git a/examples/02-mcp-tool-integration/src/agents/news_scanner.py b/examples/02-mcp-tool-integration/src/agents/news_scanner.py
index e2b8997..36ef710 100644
--- a/examples/02-mcp-tool-integration/src/agents/news_scanner.py
+++ b/examples/02-mcp-tool-integration/src/agents/news_scanner.py
@@ -1,25 +1,23 @@
"""News Scanner agent -- searches the web for recent news about a crypto project.
-Reads: state["project_name"], state["coin_ticker"], state["plan"]
+Reads: state["project_name"], state["coin_ticker"], state["news_queries"]
Writes: state["news"]
-Uses DuckDuckGo web search directly (not through MCP). Fires multiple targeted
-queries built from the project name and plan, then deduplicates results before
-passing them to the LLM for analysis. Focuses on: news, partnerships,
-announcements, events, sentiment from finance/crypto portals.
+Uses DuckDuckGo web search directly (not through MCP). It receives typed search
+queries from research_planner and falls back to simple templates when those are
+missing. Shared search mechanics live in src.agents.web_search.
"""
from __future__ import annotations
-import re
from datetime import UTC, datetime
from agent_common.llm import get_chat_model
from agent_common.tracing import verbose_log
-from langchain_community.tools import DuckDuckGoSearchResults
from langchain_core.messages import HumanMessage, SystemMessage
from src.agents.state import AgentState
+from src.agents.web_search import format_search_results, run_search_queries
SYSTEM_PROMPT = """\
You are a crypto news analyst. You receive raw web search results about a crypto project.
@@ -40,29 +38,10 @@
If search results are thin, say so explicitly — do NOT fabricate information."""
-def _extract_plan_queries(plan: str) -> list[str]:
- """Pull queries from the NEWS_QUERIES: section of the research plan."""
- match = re.search(
- r"NEWS_QUERIES:\s*\n(.*?)(?:\n\s*\n|\nCOMMUNITY_QUERIES:|\Z)",
- plan,
- re.DOTALL | re.IGNORECASE,
- )
- if not match:
- return []
- lines = match.group(1).strip().splitlines()
- queries: list[str] = []
- for line in lines:
- cleaned = re.sub(r"^[\s\-\d.*•]+", "", line).strip().strip('"').strip("'")
- if cleaned:
- queries.append(cleaned)
- return queries
-
-
-def _build_queries(project_name: str, ticker: str, plan: str) -> list[str]:
- """Build search queries from plan and fallback templates."""
- plan_queries = _extract_plan_queries(plan)
- if plan_queries:
- return plan_queries[:4]
+def _build_queries(project_name: str, ticker: str, news_queries: list[str]) -> list[str]:
+ """Build search queries from planner output and fallback templates."""
+ if news_queries:
+ return news_queries[:4]
current_year = datetime.now(UTC).year
return [
@@ -73,52 +52,20 @@ def _build_queries(project_name: str, ticker: str, plan: str) -> list[str]:
]
-def _deduplicate_results(all_results: list[dict[str, str]]) -> list[dict[str, str]]:
- """Remove duplicate search results by URL."""
- seen_urls: set[str] = set()
- unique: list[dict[str, str]] = []
- for item in all_results:
- url = item.get("link", "")
- if url and url not in seen_urls:
- seen_urls.add(url)
- unique.append(item)
- return unique
-
-
async def news_scanner_node(state: AgentState) -> dict[str, str]:
"""Search the web for crypto project news and analyze results."""
project_name = state.get("project_name", state["input"])
ticker = state.get("coin_ticker", "")
- plan = state.get("plan", "")
+ news_queries = state.get("news_queries", [])
verbose_log("NewsScanner", f"Searching news for: {project_name} ({ticker})")
- queries = _build_queries(project_name, ticker, plan)
+ queries = _build_queries(project_name, ticker, news_queries)
verbose_log("NewsScanner", f"Running {len(queries)} search queries")
- all_results: list[dict[str, str]] = []
- search = DuckDuckGoSearchResults(
- max_results=5, # type: ignore[call-arg]
- output_format="list",
- )
-
- for query in queries:
- try:
- raw = await search.ainvoke(query)
- if isinstance(raw, list):
- all_results.extend(raw)
- verbose_log("NewsScanner", f" [{query[:50]}] → {len(raw)} results")
- except Exception as exc:
- verbose_log("NewsScanner", f" [{query[:50]}] search failed: {exc}")
-
- unique_results = _deduplicate_results(all_results)
- verbose_log(
- "NewsScanner",
- f"Total: {len(all_results)} raw → {len(unique_results)} unique results",
- )
-
- results_text = (
- "\n".join(f"- [{r.get('title', 'N/A')}]({r.get('link', '')}): {r.get('snippet', '')}" for r in unique_results)
- or "[No search results found]"
+ search_results = await run_search_queries(queries, "NewsScanner")
+ results_text = format_search_results(
+ search_results,
+ empty_message="[No search results found]",
)
try:
diff --git a/examples/02-mcp-tool-integration/src/agents/research_planner.py b/examples/02-mcp-tool-integration/src/agents/research_planner.py
index ec3e2bc..983743d 100644
--- a/examples/02-mcp-tool-integration/src/agents/research_planner.py
+++ b/examples/02-mcp-tool-integration/src/agents/research_planner.py
@@ -1,77 +1,83 @@
"""Research Planner agent -- orchestrates the research pipeline.
Reads: state["input"]
-Writes: state["plan"], state["project_name"], state["coin_ticker"]
+Writes: state["plan"], state["project_name"], state["coin_ticker"],
+ state["news_queries"], state["community_queries"]
The planner is the first node in the graph. It analyzes the user request,
-identifies the crypto project, and produces a research plan with tailored
-queries that downstream nodes (news_scanner, project_profiler,
-community_analyst) will use. This eliminates raw user input being passed
-directly to external APIs and search engines.
+identifies the crypto project, and produces a research plan with typed query
+lists that downstream nodes use directly. This eliminates raw user input being
+passed to external APIs and removes the need for regex parsing of LLM text.
"""
from __future__ import annotations
-import re
-
from agent_common.llm import get_chat_model
from agent_common.tracing import verbose_log
from langchain_core.messages import HumanMessage, SystemMessage
+from pydantic import BaseModel, Field
from src.agents.state import AgentState
SYSTEM_PROMPT = """\
You are a crypto project research planner. Given a user query, do three things:
-1. **Identify the project.** On the FIRST line write exactly:
- PROJECT_NAME:
- On the SECOND line write exactly:
- COIN_TICKER:
-
+1. **Identify the project** with its official project name and ticker symbol.
2. **Create a focused research plan** (numbered list, one sentence per area):
a. Recent news, announcements, partnerships, events — positive and negative signals.
b. Project fundamentals via CoinGecko: market cap, price, volume, exchanges, \
team, genesis date, categories.
c. Community and social sentiment: X/Twitter buzz, Reddit discussions, \
Telegram activity, overall retail mood.
-
-3. **Generate tailored search queries** for downstream research agents.
- Write a section headed "NEWS_QUERIES:" with 3-4 web search queries \
-optimised for finding recent project news, partnerships, and announcements.
- Write a section headed "COMMUNITY_QUERIES:" with 3-4 web search queries \
-optimised for social media sentiment (use site:reddit.com or X/twitter keywords).
+3. **Generate tailored search queries** for downstream research agents:
+ - 3-4 news queries optimized for recent partnerships, announcements, and project updates
+ - 3-4 community queries optimized for Reddit, X/Twitter, and social sentiment
Keep the plan concise and actionable. Do NOT include price predictions."""
-def _extract_field(text: str, label: str) -> str:
- """Pull a 'LABEL: value' field from the plan text."""
- pattern = rf"^{re.escape(label)}:\s*(.+)$"
- match = re.search(pattern, text, re.MULTILINE | re.IGNORECASE)
- return match.group(1).strip() if match else ""
+class ResearchPlan(BaseModel):
+    """Structured planner output used by downstream research nodes."""
+
+    project_name: str = Field(description="Official project name")
+    coin_ticker: str = Field(description="Ticker symbol in uppercase, for example ETH or SOL")
+    plan: str = Field(description="Concise numbered research plan covering news, fundamentals, and community")
+    news_queries: list[str] = Field(description="Three to four targeted web search queries for recent project news")
+    community_queries: list[str] = Field(
+        description="Three to four targeted web search queries for community and social sentiment"
+    )
-async def research_planner_node(state: AgentState) -> dict[str, str]:
+async def research_planner_node(state: AgentState) -> dict[str, str | list[str]]:
"""Create a structured research plan and extract project identifiers."""
user_input = state["input"]
verbose_log("ResearchPlanner", f"Planning research for: {user_input[:100]}")
- llm = get_chat_model()
- response = await llm.ainvoke(
+ llm = get_chat_model().with_structured_output(ResearchPlan)
+ result = await llm.ainvoke(
[
SystemMessage(content=SYSTEM_PROMPT),
HumanMessage(content=user_input),
]
)
- plan = str(response.content)
-
- project_name = _extract_field(plan, "PROJECT_NAME") or user_input.strip()
- coin_ticker = _extract_field(plan, "COIN_TICKER") or ""
+ project_name = result.project_name.strip() or user_input.strip()
+ coin_ticker = result.coin_ticker.strip().upper()
+ news_queries = [query.strip() for query in result.news_queries if query.strip()]
+ community_queries = [query.strip() for query in result.community_queries if query.strip()]
verbose_log(
"ResearchPlanner",
- f"Identified project={project_name!r}, ticker={coin_ticker!r}, plan={len(plan)} chars",
+ (
+ f"Identified project={project_name!r}, ticker={coin_ticker!r}, "
+ f"news_queries={len(news_queries)}, community_queries={len(community_queries)}"
+ ),
)
- return {"plan": plan, "project_name": project_name, "coin_ticker": coin_ticker}
+ return {
+ "plan": result.plan,
+ "project_name": project_name,
+ "coin_ticker": coin_ticker,
+ "news_queries": news_queries,
+ "community_queries": community_queries,
+ }
diff --git a/examples/02-mcp-tool-integration/src/agents/state.py b/examples/02-mcp-tool-integration/src/agents/state.py
index 3715c40..46545e3 100644
--- a/examples/02-mcp-tool-integration/src/agents/state.py
+++ b/examples/02-mcp-tool-integration/src/agents/state.py
@@ -4,7 +4,8 @@
research_planner → [news_scanner, project_profiler, community_analyst] (parallel)
→ intelligence_compiler
-research_planner populates: plan, project_name, coin_ticker
+research_planner populates: plan, project_name, coin_ticker, news_queries,
+ community_queries
news_scanner populates: news
project_profiler populates: profile
community_analyst populates: community
@@ -23,6 +24,8 @@ class AgentState(TypedDict, total=False):
plan: str
project_name: str
coin_ticker: str
+ news_queries: list[str]
+ community_queries: list[str]
# Parallel research branch outputs
news: str
diff --git a/examples/02-mcp-tool-integration/src/agents/web_search.py b/examples/02-mcp-tool-integration/src/agents/web_search.py
new file mode 100644
index 0000000..6c16d7a
--- /dev/null
+++ b/examples/02-mcp-tool-integration/src/agents/web_search.py
@@ -0,0 +1,62 @@
+"""Shared DuckDuckGo search helpers for research agents."""
+
+from __future__ import annotations
+
+from agent_common.tracing import verbose_log
+from langchain_community.tools import DuckDuckGoSearchResults
+
+
+def _deduplicate_results(all_results: list[dict[str, str]]) -> list[dict[str, str]]:
+    """Remove duplicate search results by URL."""
+    seen_urls: set[str] = set()
+    unique_results: list[dict[str, str]] = []
+
+    for item in all_results:
+        url = item.get("link", "")
+        if url and url not in seen_urls:
+            seen_urls.add(url)
+            unique_results.append(item)
+
+    return unique_results
+
+
+async def run_search_queries(
+    queries: list[str],
+    agent_name: str,
+    max_results: int = 5,
+) -> list[dict[str, str]]:
+    """Run multiple searches and deduplicate results by URL."""
+    all_results: list[dict[str, str]] = []
+    search = DuckDuckGoSearchResults(
+        max_results=max_results,  # type: ignore[call-arg]
+        output_format="list",
+    )
+
+    for query in queries:
+        try:
+            raw = await search.ainvoke(query)
+            if isinstance(raw, list):
+                all_results.extend(raw)
+                verbose_log(agent_name, f" [{query[:50]}] -> {len(raw)} results")
+        except Exception as exc:
+            verbose_log(agent_name, f" [{query[:50]}] search failed: {exc}")
+
+    unique_results = _deduplicate_results(all_results)
+    verbose_log(
+        agent_name,
+        f"Total: {len(all_results)} raw -> {len(unique_results)} unique results",
+    )
+    return unique_results
+
+
+def format_search_results(
+    results: list[dict[str, str]],
+    empty_message: str = "[No results found]",
+) -> str:
+    """Format search results as a markdown list for LLM consumption."""
+    return (
+        "\n".join(
+            f"- [{item.get('title', 'N/A')}]({item.get('link', '')}): {item.get('snippet', '')}" for item in results
+        )
+        or empty_message
+    )
diff --git a/examples/02-mcp-tool-integration/src/app.py b/examples/02-mcp-tool-integration/src/app.py
index 253eaaf..83195fc 100644
--- a/examples/02-mcp-tool-integration/src/app.py
+++ b/examples/02-mcp-tool-integration/src/app.py
@@ -3,21 +3,23 @@
This is the Software 2.0 entry point (POST /run). The Software 3.0 entry point
is the MCP server in src/mcp_servers/crypto_intelligence.py, which exposes
the same pipeline as an MCP tool that any AI client can call.
+
+Both entry points run the pipeline synchronously with a timeout boundary.
"""
from __future__ import annotations
+import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
-from typing import cast
-from agent_common.tracing import build_langsmith_run_config, setup_tracing, verbose_log
+from agent_common.tracing import setup_tracing, verbose_log
from fastapi import FastAPI
from fastapi.responses import JSONResponse
-from langchain_core.runnables import RunnableConfig
from pydantic import BaseModel, Field
from src.agents.graph import build_graph
+from src.runtime import PIPELINE_TIMEOUT_SECONDS, build_pipeline_run_config
@asynccontextmanager
@@ -52,18 +54,6 @@ class RunResponse(BaseModel):
community: str
-def _pipeline_run_config() -> RunnableConfig:
- """Build trace metadata for the public pipeline invocation."""
- return cast(
- RunnableConfig,
- build_langsmith_run_config(
- example_name="02-mcp-tool-integration",
- pattern_slug="mcp-tool-integration",
- run_name="pattern-02-mcp-tool-integration",
- ),
- )
-
-
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok"}
@@ -74,9 +64,21 @@ async def run(request: RunRequest) -> RunResponse | JSONResponse:
verbose_log("System", f"Received request: {request.input[:100]}")
try:
-        result = await app.state.graph.ainvoke(
-            {"input": request.input},
-            config=_pipeline_run_config(),
+        result = await asyncio.wait_for(
+            app.state.graph.ainvoke(
+                {"input": request.input},
+                config=build_pipeline_run_config(),
+            ),
+            timeout=PIPELINE_TIMEOUT_SECONDS,
+        )
+    except TimeoutError:
+        verbose_log("System", f"Pipeline timed out after {PIPELINE_TIMEOUT_SECONDS:.0f}s")
+        return JSONResponse(
+            status_code=504,
+            content={
+                "error": "pipeline_timeout",
+                "detail": f"Pipeline timed out after {PIPELINE_TIMEOUT_SECONDS:.0f}s",
+            },
)
except Exception as exc:
verbose_log("System", f"Pipeline failed: {exc}")
diff --git a/examples/02-mcp-tool-integration/src/coingecko.py b/examples/02-mcp-tool-integration/src/coingecko.py
index ea5b096..f4eab9d 100644
--- a/examples/02-mcp-tool-integration/src/coingecko.py
+++ b/examples/02-mcp-tool-integration/src/coingecko.py
@@ -26,27 +26,30 @@ async def _get(path: str, params: dict[str, str] | None = None) -> dict: # type
url = f"{COINGECKO_BASE}{path}"
last_exc: Exception | None = None
-    for attempt in range(_MAX_RETRIES):
-        try:
-            async with httpx.AsyncClient(timeout=15.0) as client:
+    async with httpx.AsyncClient(timeout=15.0) as client:
+        for attempt in range(_MAX_RETRIES):
+            try:
                 resp = await client.get(url, params=params or {})
                 resp.raise_for_status()
                 return resp.json()  # type: ignore[no-any-return]
-        except (httpx.HTTPStatusError, httpx.ConnectError, httpx.ReadTimeout) as exc:
-            last_exc = exc
-            is_client_error = (
-                isinstance(exc, httpx.HTTPStatusError)
-                and exc.response.status_code < 500
-                and exc.response.status_code != 429
-            )
-            if is_client_error:
-                raise
-            delay = _BASE_DELAY * (2**attempt)
-            verbose_log(
-                "CoinGecko",
-                f"Request to {path} failed (attempt {attempt + 1}/{_MAX_RETRIES}): {exc!r} — retrying in {delay:.1f}s",
-            )
-            await asyncio.sleep(delay)
+            except (httpx.HTTPStatusError, httpx.ConnectError, httpx.ReadTimeout) as exc:
+                last_exc = exc
+                is_client_error = (
+                    isinstance(exc, httpx.HTTPStatusError)
+                    and exc.response.status_code < 500
+                    and exc.response.status_code != 429
+                )
+                if is_client_error:
+                    raise
+                delay = _BASE_DELAY * (2**attempt)
+                verbose_log(
+                    "CoinGecko",
+                    (
+                        f"Request to {path} failed (attempt {attempt + 1}/{_MAX_RETRIES}): "
+                        f"{exc!r} — retrying in {delay:.1f}s"
+                    ),
+                )
+                await asyncio.sleep(delay)
assert last_exc is not None
raise last_exc
diff --git a/examples/02-mcp-tool-integration/src/mcp_servers/crypto_intelligence.py b/examples/02-mcp-tool-integration/src/mcp_servers/crypto_intelligence.py
index 72514f1..55e7e5c 100644
--- a/examples/02-mcp-tool-integration/src/mcp_servers/crypto_intelligence.py
+++ b/examples/02-mcp-tool-integration/src/mcp_servers/crypto_intelligence.py
@@ -10,21 +10,51 @@
from __future__ import annotations
+import asyncio
+from collections.abc import AsyncIterator
+from contextlib import asynccontextmanager
+from typing import Any
+
from agent_common.tracing import setup_tracing, verbose_log
from mcp.server.fastmcp import FastMCP
from src.agents.graph import build_graph
-
-setup_tracing()
+from src.runtime import PIPELINE_TIMEOUT_SECONDS, build_pipeline_run_config
mcp = FastMCP(
"crypto-intelligence",
host="0.0.0.0",
port=8000,
)
-app = mcp.sse_app()
+_graph: Any | None = None
+_tracing_initialized = False
+
+
+def _ensure_runtime_initialized() -> Any:
+    """Initialize tracing and graph lazily for the MCP server."""
+    global _graph, _tracing_initialized
+
+    if not _tracing_initialized:
+        setup_tracing()
+        _tracing_initialized = True
+
+    if _graph is None:
+        _graph = build_graph()
+        verbose_log("MCP", "MCP server runtime initialized")
-_graph = build_graph()
+    return _graph
+
+
+@asynccontextmanager
+async def mcp_lifespan(_: object) -> AsyncIterator[None]:
+    _ensure_runtime_initialized()
+    verbose_log("MCP", "MCP server started")
+    yield
+    verbose_log("MCP", "MCP server shutting down")
+
+
+app = mcp.sse_app()
+app.router.lifespan_context = mcp_lifespan
@mcp.tool()
@@ -47,13 +77,28 @@ async def research_crypto_project(query: str) -> str:
A structured intelligence report with executive summary, market snapshot,
key findings, recent developments, community health, risk factors, and outlook.
"""
-    verbose_log("MCP", f"research_crypto_project({query!r:.80}) -- starting pipeline")
-
-    result = await _graph.ainvoke({"input": query})
+    graph = _ensure_runtime_initialized()
+    preview = repr(query[:80])
+    verbose_log("MCP", f"research_crypto_project({preview}) -- starting pipeline")
+
+    try:
+        result = await asyncio.wait_for(
+            graph.ainvoke(
+                {"input": query},
+                config=build_pipeline_run_config(),
+            ),
+            timeout=PIPELINE_TIMEOUT_SECONDS,
+        )
+    except TimeoutError:
+        message = f"Pipeline timed out after {PIPELINE_TIMEOUT_SECONDS:.0f}s"
+        verbose_log("MCP", message)
+        return f"[Pipeline timeout] {message}"
+    except Exception as exc:
+        verbose_log("MCP", f"research_crypto_project failed: {exc}")
+        return f"[Pipeline failed] {type(exc).__name__}: {exc}"
report: str = result.get("report", "")
verbose_log("MCP", f"research_crypto_project -- complete ({len(report)} chars)")
-
return report
diff --git a/examples/02-mcp-tool-integration/src/runtime.py b/examples/02-mcp-tool-integration/src/runtime.py
new file mode 100644
index 0000000..7dd6330
--- /dev/null
+++ b/examples/02-mcp-tool-integration/src/runtime.py
@@ -0,0 +1,22 @@
+"""Shared runtime configuration for public pipeline entry points."""
+
+from __future__ import annotations
+
+from typing import cast
+
+from agent_common.tracing import build_langsmith_run_config
+from langchain_core.runnables import RunnableConfig
+
+PIPELINE_TIMEOUT_SECONDS = 120.0
+
+
+def build_pipeline_run_config() -> RunnableConfig:
+    """Build trace metadata for public pipeline invocations."""
+    return cast(
+        RunnableConfig,
+        build_langsmith_run_config(
+            example_name="02-mcp-tool-integration",
+            pattern_slug="mcp-tool-integration",
+            run_name="pattern-02-mcp-tool-integration",
+        ),
+    )
diff --git a/examples/02-mcp-tool-integration/tests/api/test_app_api.py b/examples/02-mcp-tool-integration/tests/api/test_app_api.py
index b221ad3..c56b802 100644
--- a/examples/02-mcp-tool-integration/tests/api/test_app_api.py
+++ b/examples/02-mcp-tool-integration/tests/api/test_app_api.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+import asyncio
from dataclasses import dataclass
from typing import Any
@@ -35,6 +36,16 @@ async def ainvoke(
raise RuntimeError("LLM provider unreachable")
+class _SlowGraph:
+ async def ainvoke(
+ self,
+ payload: dict[str, str],
+ config: dict[str, Any] | None = None,
+ ) -> dict[str, Any]:
+ await asyncio.sleep(0.05)
+ return {"report": "late"}
+
+
def _make_client(monkeypatch: pytest.MonkeyPatch, graph: object) -> TestClient:
"""Create a TestClient with a pre-injected graph on app.state."""
monkeypatch.setattr(app_module, "build_graph", lambda: graph)
@@ -61,7 +72,7 @@ def test_run_endpoint_executes_graph(monkeypatch: pytest.MonkeyPatch) -> None:
)
monkeypatch.setattr(
app_module,
- "_pipeline_run_config",
+ "build_pipeline_run_config",
lambda: {"run_name": "test-run", "tags": ["example:02-mcp-tool-integration"]},
)
@@ -116,3 +127,15 @@ def test_run_endpoint_returns_502_on_pipeline_failure(monkeypatch: pytest.Monkey
data = response.json()
assert data["error"] == "pipeline_failed"
assert "LLM provider unreachable" in data["detail"]
+
+
+def test_run_endpoint_returns_504_on_pipeline_timeout(monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.setattr(app_module, "PIPELINE_TIMEOUT_SECONDS", 0.01)
+
+ with _make_client(monkeypatch, _SlowGraph()) as client:
+ response = client.post("/run", json={"input": "Research Arbitrum"})
+
+ assert response.status_code == 504
+ data = response.json()
+ assert data["error"] == "pipeline_timeout"
+ assert "timed out" in data["detail"]
diff --git a/examples/02-mcp-tool-integration/tests/e2e/test_pipeline_graph.py b/examples/02-mcp-tool-integration/tests/e2e/test_pipeline_graph.py
index ab981b5..1273b45 100644
--- a/examples/02-mcp-tool-integration/tests/e2e/test_pipeline_graph.py
+++ b/examples/02-mcp-tool-integration/tests/e2e/test_pipeline_graph.py
@@ -17,13 +17,15 @@
async def test_graph_executes_all_five_nodes(monkeypatch: pytest.MonkeyPatch) -> None:
call_order: list[str] = []
- async def fake_research_planner(state: dict[str, str]) -> dict[str, str]:
+ async def fake_research_planner(state: dict[str, str]) -> dict[str, str | list[str]]:
call_order.append("research_planner")
assert state["input"] == "Research Arbitrum"
return {
- "plan": "NEWS_QUERIES:\n- Arbitrum news\nCOMMUNITY_QUERIES:\n- Arbitrum reddit",
+ "plan": "1. News\n2. Profile\n3. Community",
"project_name": "Arbitrum",
"coin_ticker": "ARB",
+ "news_queries": ["Arbitrum news"],
+ "community_queries": ["Arbitrum reddit"],
}
async def fake_news_scanner(state: dict[str, str]) -> dict[str, str]:
diff --git a/examples/02-mcp-tool-integration/tests/unit/test_agent_nodes.py b/examples/02-mcp-tool-integration/tests/unit/test_agent_nodes.py
index eaa8a6b..dd8cad2 100644
--- a/examples/02-mcp-tool-integration/tests/unit/test_agent_nodes.py
+++ b/examples/02-mcp-tool-integration/tests/unit/test_agent_nodes.py
@@ -3,7 +3,7 @@
from __future__ import annotations
from dataclasses import dataclass
-from unittest.mock import AsyncMock, patch
+from unittest.mock import AsyncMock
import pytest
from src.agents import (
@@ -20,7 +20,7 @@ class _DummyResponse:
content: str
-class _DummyModel:
+class _DummyTextModel:
def __init__(self, response_text: str) -> None:
self._response_text = response_text
self.calls: list[list[object]] = []
@@ -30,32 +30,65 @@ async def ainvoke(self, messages: list[object]) -> _DummyResponse:
return _DummyResponse(content=self._response_text)
+class _DummyStructuredModel:
+ def __init__(self, response: research_planner.ResearchPlan) -> None:
+ self._response = response
+ self.calls: list[list[object]] = []
+
+ async def ainvoke(self, messages: list[object]) -> research_planner.ResearchPlan:
+ self.calls.append(messages)
+ return self._response
+
+
+class _DummyPlannerModel:
+ def __init__(self, response: research_planner.ResearchPlan) -> None:
+ self.schemas: list[type[research_planner.ResearchPlan]] = []
+ self.structured_model = _DummyStructuredModel(response)
+
+ def with_structured_output(
+ self,
+ schema: type[research_planner.ResearchPlan],
+ ) -> _DummyStructuredModel:
+ self.schemas.append(schema)
+ return self.structured_model
+
+
# --- Research Planner ---
@pytest.mark.asyncio
-async def test_research_planner_extracts_project_identifiers(monkeypatch: pytest.MonkeyPatch) -> None:
- plan_text = (
- "PROJECT_NAME: Arbitrum\n"
- "COIN_TICKER: ARB\n\n"
- "1. Recent news\n2. Project fundamentals\n3. Community activity\n\n"
- "NEWS_QUERIES:\n- Arbitrum latest news 2026\n\n"
- "COMMUNITY_QUERIES:\n- Arbitrum site:reddit.com"
+async def test_research_planner_returns_structured_output(monkeypatch: pytest.MonkeyPatch) -> None:
+ planner_output = research_planner.ResearchPlan(
+ project_name="Arbitrum",
+ coin_ticker="arb",
+ plan="1. Recent news\n2. Project fundamentals\n3. Community activity",
+ news_queries=["Arbitrum latest news 2026", "Arbitrum partnership announcement"],
+ community_queries=["Arbitrum site:reddit.com", "Arbitrum twitter sentiment"],
)
- model = _DummyModel(plan_text)
+ model = _DummyPlannerModel(planner_output)
monkeypatch.setattr(research_planner, "get_chat_model", lambda: model)
result = await research_planner.research_planner_node({"input": "Research Arbitrum"})
- assert result["plan"] == plan_text
+ assert result["plan"] == planner_output.plan
assert result["project_name"] == "Arbitrum"
assert result["coin_ticker"] == "ARB"
- assert len(model.calls) == 1
+ assert result["news_queries"] == planner_output.news_queries
+ assert result["community_queries"] == planner_output.community_queries
+ assert model.schemas == [research_planner.ResearchPlan]
+ assert len(model.structured_model.calls) == 1
@pytest.mark.asyncio
-async def test_research_planner_falls_back_on_missing_fields(monkeypatch: pytest.MonkeyPatch) -> None:
- model = _DummyModel("Just a plain plan without structured fields")
+async def test_research_planner_falls_back_on_blank_project_name(monkeypatch: pytest.MonkeyPatch) -> None:
+ planner_output = research_planner.ResearchPlan(
+ project_name=" ",
+ coin_ticker="",
+ plan="1. News\n2. Profile\n3. Community",
+ news_queries=["Arbitrum latest news"],
+ community_queries=["Arbitrum reddit"],
+ )
+ model = _DummyPlannerModel(planner_output)
monkeypatch.setattr(research_planner, "get_chat_model", lambda: model)
result = await research_planner.research_planner_node({"input": "Research Arbitrum"})
@@ -68,72 +101,48 @@ async def test_research_planner_falls_back_on_missing_fields(monkeypatch: pytest
@pytest.mark.asyncio
-async def test_news_scanner_uses_project_name(monkeypatch: pytest.MonkeyPatch) -> None:
- model = _DummyModel("Arbitrum announced Orbit chains. TVL exceeded $10B.")
+async def test_news_scanner_uses_planner_queries(monkeypatch: pytest.MonkeyPatch) -> None:
+ model = _DummyTextModel("Arbitrum announced Orbit chains. TVL exceeded $10B.")
monkeypatch.setattr(news_scanner, "get_chat_model", lambda: model)
+ run_search_queries = AsyncMock(
+ return_value=[
+ {"title": "Arbitrum news", "snippet": "Orbit chains launched", "link": "https://example.com/1"},
+ ]
+ )
+ monkeypatch.setattr(news_scanner, "run_search_queries", run_search_queries)
- mock_search = AsyncMock()
- mock_search.ainvoke = AsyncMock(
- return_value=[{"title": "Arbitrum news", "snippet": "Orbit chains launched", "link": "https://example.com/1"}]
+ result = await news_scanner.news_scanner_node(
+ {
+ "input": "Research Arbitrum",
+ "project_name": "Arbitrum",
+ "coin_ticker": "ARB",
+ "news_queries": ["Arbitrum latest news 2026"],
+ }
)
- with patch.object(news_scanner, "DuckDuckGoSearchResults", return_value=mock_search):
- result = await news_scanner.news_scanner_node(
- {
- "input": "Research Arbitrum",
- "project_name": "Arbitrum",
- "coin_ticker": "ARB",
- "plan": "NEWS_QUERIES:\n- Arbitrum latest news 2026",
- }
- )
assert "Orbit chains" in result["news"]
+ run_search_queries.assert_awaited_once_with(["Arbitrum latest news 2026"], "NewsScanner")
assert len(model.calls) == 1
@pytest.mark.asyncio
-async def test_news_scanner_degrades_on_search_failure(monkeypatch: pytest.MonkeyPatch) -> None:
- model = _DummyModel("Unable to find recent news due to search unavailability.")
- monkeypatch.setattr(news_scanner, "get_chat_model", lambda: model)
-
- mock_search = AsyncMock()
- mock_search.ainvoke = AsyncMock(side_effect=RuntimeError("Search API down"))
- with patch.object(news_scanner, "DuckDuckGoSearchResults", return_value=mock_search):
- result = await news_scanner.news_scanner_node(
- {
- "input": "Research Arbitrum",
- "project_name": "Arbitrum",
- "coin_ticker": "ARB",
- "plan": "",
- }
- )
-
- assert "news" in result
- assert len(model.calls) == 1
-
-
-@pytest.mark.asyncio
-async def test_news_scanner_deduplicates_results(monkeypatch: pytest.MonkeyPatch) -> None:
- model = _DummyModel("Deduplicated analysis.")
+async def test_news_scanner_degrades_on_empty_search_results(monkeypatch: pytest.MonkeyPatch) -> None:
+ model = _DummyTextModel("Unable to find recent news due to search unavailability.")
monkeypatch.setattr(news_scanner, "get_chat_model", lambda: model)
+ run_search_queries = AsyncMock(return_value=[])
+ monkeypatch.setattr(news_scanner, "run_search_queries", run_search_queries)
- mock_search = AsyncMock()
- mock_search.ainvoke = AsyncMock(
- return_value=[
- {"title": "Same article", "snippet": "Content", "link": "https://example.com/same"},
- {"title": "Same article copy", "snippet": "Content", "link": "https://example.com/same"},
- ]
+ result = await news_scanner.news_scanner_node(
+ {
+ "input": "Research Arbitrum",
+ "project_name": "Arbitrum",
+ "coin_ticker": "ARB",
+ "news_queries": [],
+ }
)
- with patch.object(news_scanner, "DuckDuckGoSearchResults", return_value=mock_search):
- result = await news_scanner.news_scanner_node(
- {
- "input": "Research Arbitrum",
- "project_name": "Arbitrum",
- "coin_ticker": "ARB",
- "plan": "NEWS_QUERIES:\n- query one\n- query two",
- }
- )
assert "news" in result
+ assert len(model.calls) == 1
# --- Project Profiler ---
@@ -141,7 +150,7 @@ async def test_news_scanner_deduplicates_results(monkeypatch: pytest.MonkeyPatch
@pytest.mark.asyncio
async def test_project_profiler_uses_project_name(monkeypatch: pytest.MonkeyPatch) -> None:
- model = _DummyModel("Arbitrum is an L2 optimistic rollup. Price: $1.23, Market cap: $4.5B")
+ model = _DummyTextModel("Arbitrum is an L2 optimistic rollup. Price: $1.23, Market cap: $4.5B")
monkeypatch.setattr(project_profiler, "get_chat_model", lambda: model)
monkeypatch.setattr(
@@ -174,7 +183,7 @@ async def test_project_profiler_uses_project_name(monkeypatch: pytest.MonkeyPatc
@pytest.mark.asyncio
async def test_project_profiler_degrades_on_api_failure(monkeypatch: pytest.MonkeyPatch) -> None:
- model = _DummyModel("Limited profile available due to data source issues.")
+ model = _DummyTextModel("Limited profile available due to data source issues.")
monkeypatch.setattr(project_profiler, "get_chat_model", lambda: model)
monkeypatch.setattr(
@@ -205,50 +214,49 @@ async def test_project_profiler_degrades_on_api_failure(monkeypatch: pytest.Monk
assert len(model.calls) == 1
-# --- Community Analyst (now uses DuckDuckGo, not CoinGecko) ---
+# --- Community Analyst ---
@pytest.mark.asyncio
async def test_community_analyst_uses_social_search(monkeypatch: pytest.MonkeyPatch) -> None:
- model = _DummyModel("Community Health: Strong. Active Reddit discussions and positive Twitter sentiment.")
+ model = _DummyTextModel("Community Health: Strong. Active Reddit discussions and positive Twitter sentiment.")
monkeypatch.setattr(community_analyst, "get_chat_model", lambda: model)
-
- mock_search = AsyncMock()
- mock_search.ainvoke = AsyncMock(
+ run_search_queries = AsyncMock(
return_value=[
{"title": "Arbitrum Reddit", "snippet": "Great community", "link": "https://reddit.com/r/arbitrum/1"},
]
)
- with patch.object(community_analyst, "DuckDuckGoSearchResults", return_value=mock_search):
- result = await community_analyst.community_analyst_node(
- {
- "input": "Research Arbitrum",
- "project_name": "Arbitrum",
- "coin_ticker": "ARB",
- "plan": "COMMUNITY_QUERIES:\n- Arbitrum site:reddit.com",
- }
- )
+ monkeypatch.setattr(community_analyst, "run_search_queries", run_search_queries)
+
+ result = await community_analyst.community_analyst_node(
+ {
+ "input": "Research Arbitrum",
+ "project_name": "Arbitrum",
+ "coin_ticker": "ARB",
+ "community_queries": ["Arbitrum site:reddit.com"],
+ }
+ )
assert "Strong" in result["community"]
+ run_search_queries.assert_awaited_once_with(["Arbitrum site:reddit.com"], "CommunityAnalyst")
assert len(model.calls) == 1
@pytest.mark.asyncio
-async def test_community_analyst_degrades_on_search_failure(monkeypatch: pytest.MonkeyPatch) -> None:
- model = _DummyModel("Community analysis unavailable due to search issues.")
+async def test_community_analyst_degrades_on_empty_search_results(monkeypatch: pytest.MonkeyPatch) -> None:
+ model = _DummyTextModel("Community analysis unavailable due to search issues.")
monkeypatch.setattr(community_analyst, "get_chat_model", lambda: model)
+ run_search_queries = AsyncMock(return_value=[])
+ monkeypatch.setattr(community_analyst, "run_search_queries", run_search_queries)
- mock_search = AsyncMock()
- mock_search.ainvoke = AsyncMock(side_effect=RuntimeError("Search API down"))
- with patch.object(community_analyst, "DuckDuckGoSearchResults", return_value=mock_search):
- result = await community_analyst.community_analyst_node(
- {
- "input": "Research Arbitrum",
- "project_name": "Arbitrum",
- "coin_ticker": "ARB",
- "plan": "",
- }
- )
+ result = await community_analyst.community_analyst_node(
+ {
+ "input": "Research Arbitrum",
+ "project_name": "Arbitrum",
+ "coin_ticker": "ARB",
+ "community_queries": [],
+ }
+ )
assert "community" in result
assert len(model.calls) == 1
@@ -259,7 +267,7 @@ async def test_community_analyst_degrades_on_search_failure(monkeypatch: pytest.
@pytest.mark.asyncio
async def test_intelligence_compiler_produces_report(monkeypatch: pytest.MonkeyPatch) -> None:
- model = _DummyModel("## Executive Summary\nArbitrum is a leading L2 scaling solution.")
+ model = _DummyTextModel("## Executive Summary\nArbitrum is a leading L2 scaling solution.")
monkeypatch.setattr(intelligence_compiler, "get_chat_model", lambda: model)
state = {
diff --git a/examples/02-mcp-tool-integration/tests/unit/test_state.py b/examples/02-mcp-tool-integration/tests/unit/test_state.py
index 8d664f4..ab6018a 100644
--- a/examples/02-mcp-tool-integration/tests/unit/test_state.py
+++ b/examples/02-mcp-tool-integration/tests/unit/test_state.py
@@ -16,6 +16,8 @@ def test_state_all_fields() -> None:
"plan": "1. News\n2. Team",
"project_name": "Arbitrum",
"coin_ticker": "ARB",
+ "news_queries": ["Arbitrum latest news 2026"],
+ "community_queries": ["Arbitrum site:reddit.com"],
"news": "Partnership announced",
"profile": "L2 scaling solution",
"community": "Strong community health",
@@ -23,5 +25,7 @@ def test_state_all_fields() -> None:
}
assert state["project_name"] == "Arbitrum"
assert state["coin_ticker"] == "ARB"
+ assert state["news_queries"] == ["Arbitrum latest news 2026"]
+ assert state["community_queries"] == ["Arbitrum site:reddit.com"]
assert state["profile"] == "L2 scaling solution"
assert state["community"] == "Strong community health"
diff --git a/examples/02-mcp-tool-integration/tests/unit/test_web_search.py b/examples/02-mcp-tool-integration/tests/unit/test_web_search.py
new file mode 100644
index 0000000..c5d78ec
--- /dev/null
+++ b/examples/02-mcp-tool-integration/tests/unit/test_web_search.py
@@ -0,0 +1,64 @@
+"""Unit tests for shared web search helpers."""
+
+from __future__ import annotations
+
+from unittest.mock import AsyncMock
+
+import pytest
+from src.agents import web_search
+
+
+@pytest.mark.asyncio
+async def test_run_search_queries_deduplicates_urls(monkeypatch: pytest.MonkeyPatch) -> None:
+ mock_search = AsyncMock()
+ mock_search.ainvoke = AsyncMock(
+ side_effect=[
+ [
+ {"title": "Same article", "snippet": "Content", "link": "https://example.com/same"},
+ {"title": "Unique article", "snippet": "More", "link": "https://example.com/unique"},
+ ],
+ [
+ {"title": "Same article copy", "snippet": "Content", "link": "https://example.com/same"},
+ ],
+ ]
+ )
+ monkeypatch.setattr(web_search, "DuckDuckGoSearchResults", lambda **_: mock_search)
+
+ results = await web_search.run_search_queries(["query one", "query two"], "NewsScanner")
+
+ assert results == [
+ {"title": "Same article", "snippet": "Content", "link": "https://example.com/same"},
+ {"title": "Unique article", "snippet": "More", "link": "https://example.com/unique"},
+ ]
+
+
+@pytest.mark.asyncio
+async def test_run_search_queries_ignores_search_failures(monkeypatch: pytest.MonkeyPatch) -> None:
+ mock_search = AsyncMock()
+ mock_search.ainvoke = AsyncMock(
+ side_effect=[
+ RuntimeError("Search API down"),
+ [{"title": "Recovered", "snippet": "Working again", "link": "https://example.com/recovered"}],
+ ]
+ )
+ monkeypatch.setattr(web_search, "DuckDuckGoSearchResults", lambda **_: mock_search)
+
+ results = await web_search.run_search_queries(["first", "second"], "CommunityAnalyst")
+
+ assert results == [
+ {"title": "Recovered", "snippet": "Working again", "link": "https://example.com/recovered"},
+ ]
+
+
+def test_format_search_results_formats_markdown_list() -> None:
+ formatted = web_search.format_search_results(
+ [{"title": "Arbitrum news", "snippet": "Orbit chains launched", "link": "https://example.com/1"}]
+ )
+
+ assert formatted == "- [Arbitrum news](https://example.com/1): Orbit chains launched"
+
+
+def test_format_search_results_handles_empty_results() -> None:
+ assert web_search.format_search_results([], empty_message="[No social media results found]") == (
+ "[No social media results found]"
+ )
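Review note: the body of `web_search.run_search_queries` is not part of this excerpt, but the tests above pin down two behaviours: results are deduplicated by `link` across queries, and a failing query does not discard results from the others. The sketch below is one way to get that behaviour, under stated assumptions. `run_queries_deduplicated` and the injected `search` callable are hypothetical and stand in for the real helper and the DuckDuckGo tool.

```python
import asyncio
from collections.abc import Awaitable, Callable

SearchResult = dict[str, str]
# Hypothetical search callable: takes a query string, returns a list of results.
SearchFn = Callable[[str], Awaitable[list[SearchResult]]]


async def run_queries_deduplicated(
    queries: list[str],
    search: SearchFn,
) -> list[SearchResult]:
    """Run queries in order, skip failed searches, keep the first result per link."""
    seen_links: set[str] = set()
    merged: list[SearchResult] = []
    for query in queries:
        try:
            results = await search(query)
        except Exception:
            # One failing query should not discard results from the others.
            continue
        for item in results:
            link = item.get("link", "")
            if link and link in seen_links:
                continue  # same URL already collected from an earlier query
            if link:
                seen_links.add(link)
            merged.append(item)
    return merged
```

Injecting the search callable keeps the sketch testable without network access. The tests in the PR reach a similar result by monkeypatching `DuckDuckGoSearchResults` on the module.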