Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions agentenv-mcp/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual environments
.venv/
venv/
ENV/

# IDE
.idea/
.vscode/
*.swp
*.swo

# Testing
.pytest_cache/
.coverage
htmlcov/

# Logs
*.log

# UV
.uv/
uv.lock

# Ruff
.ruff_cache/
136 changes: 136 additions & 0 deletions agentenv-mcp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# AgentEnv MCP

MCP (Model Context Protocol) integration for AgentGym environments using FastMCP.

## Overview

This package provides a modular framework for creating RL environments that expose their action space as MCP tools. Agents connect via FastMCP client to discover and execute tools.

### Architecture

```text
┌─────────────────┐ MCP/SSE ┌─────────────────┐
│ MCPAgent │◄────────────────►│ MCP Server │
│ (FastMCP │ │ (FastMCP) │
│ Client) │ │ │
│ │ list_tools() │ @mcp.tool() │
│ │ call_tool() │ - up() │
│ │ │ - down() │
│ │ │ - left() │
│ │ │ - right() │
└─────────────────┘ └─────────────────┘
```

## Installation

```bash
cd agentenv-mcp
uv sync
```

## Quick Start

### 1. Start the MCP Server

```bash
# Run directional navigation server
uv run python -m agentenv_mcp.mcp_servers.directional

# Or via CLI
uv run agentenv-mcp server --port 8001
```

### 2. Run an Agent

```bash
# Run agent demo
uv run agentenv-mcp agent --server-url http://localhost:8001/sse --steps 10
```

### 3. End-to-End Test

```bash
uv run python test_e2e.py
```

## Project Structure

```text
agentenv-mcp/
├── agentenv_mcp/
│ ├── __init__.py # Package exports
│ ├── client.py # FastMCP client wrapper
│ ├── agent.py # Agent for MCP interaction
│ ├── environment.py # Gym-like environment wrapper
│ ├── logging_config.py # Centralized logging
│ ├── cli.py # CLI commands
│ ├── mcp_servers/ # MCP server implementations
│ │ ├── __init__.py
│ │ ├── base.py # Base server classes
│ │ └── directional.py # Directional navigation example
│ └── rewards/ # Reward calculators
│ ├── __init__.py
│ └── base.py
├── test_e2e.py # End-to-end test
├── pyproject.toml
└── README.md
```

## Creating Custom MCP Servers

Extend `BaseMCPServer` to create new environments:

```python
from agentenv_mcp.mcp_servers.base import BaseMCPServer, MCPServerState
from dataclasses import dataclass

@dataclass
class MyState(MCPServerState):
value: int = 0

class MyServer(BaseMCPServer):
def _create_default_state(self) -> MyState:
return MyState()

def _register_tools(self) -> None:
@self.mcp.tool()
def increment() -> str:
self.state.value += 1
return f"Value: {self.state.value}"

@self.mcp.tool()
def decrement() -> str:
self.state.value -= 1
return f"Value: {self.state.value}"
```

## Using the Agent

```python
import asyncio
from agentenv_mcp import MCPAgent

async def main():
agent = MCPAgent("http://localhost:8001/sse", max_steps=50)

# Discover tools
tools = await agent.discover_tools()
print(f"Available tools: {tools}")

# Run episode
trajectory = await agent.run_episode(policy="random")
print(f"Total reward: {trajectory.total_reward()}")

asyncio.run(main())
```

## Dependencies

- **fastmcp**: FastMCP library for MCP server and client
- **fastapi**: API framework (for future HTTP endpoints)
- **uvicorn**: ASGI server
- **pydantic**: Data validation

## License

MIT
28 changes: 28 additions & 0 deletions agentenv-mcp/agentenv_mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
AgentEnv MCP - Model Context Protocol integration for AgentGym.

This package provides MCP server implementations and a FastMCP client
for building RL environments with tool-based action spaces.

Key Components:
- mcp_servers: Extensible MCP server implementations
- client: FastMCP client for connecting to MCP servers
- agent: Agent for interacting with MCP environments
- environment: Gym-like environment wrapper
"""

# Package version; keep in sync with pyproject.toml.
__version__ = "0.1.0"

# Re-export the public API at package top level so callers can write
# `from agentenv_mcp import MCPAgent` instead of importing submodules.
from .agent import AgentTrajectory, MCPAgent
from .client import MCPClient
from .environment import MCPEnvironment
from .logging_config import get_logger, setup_logging

# Explicit public API surface for `from agentenv_mcp import *` and doc tools.
__all__ = [
    "MCPClient",
    "MCPAgent",
    "AgentTrajectory",
    "MCPEnvironment",
    "setup_logging",
    "get_logger",
]
165 changes: 165 additions & 0 deletions agentenv-mcp/agentenv_mcp/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""
Agent for interacting with MCP environments.

Provides a simple agent that can explore MCP servers by calling tools.
"""

import random
from dataclasses import dataclass, field
from typing import Any

from .client import MCPClient
from .logging_config import get_logger

logger = get_logger("agent")


@dataclass
class AgentTrajectory:
    """Step-by-step record of an agent's run through an environment.

    Three parallel lists hold, for step i, the action taken, the
    observation received, and the scalar reward assigned to that step.
    """

    actions: list[str] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)
    rewards: list[float] = field(default_factory=list)

    def add_step(self, action: str, observation: str, reward: float = 0.0) -> None:
        """Append one (action, observation, reward) transition."""
        for record, value in (
            (self.actions, action),
            (self.observations, observation),
            (self.rewards, reward),
        ):
            record.append(value)

    def total_reward(self) -> float:
        """Sum of every per-step reward recorded so far."""
        return sum(self.rewards)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the trajectory, plus its reward total, to a plain dict."""
        summary: dict[str, Any] = {
            "actions": self.actions,
            "observations": self.observations,
            "rewards": self.rewards,
        }
        summary["total_reward"] = self.total_reward()
        return summary


class MCPAgent:
    """
    Agent that interacts with MCP servers.

    Uses FastMCP client to discover and execute tools. Tool names form the
    action space; calling a tool yields a textual observation.
    """

    def __init__(self, server_url: str, max_steps: int = 50):
        """
        Initialize the agent.

        Args:
            server_url: URL of MCP server to connect to
            max_steps: Maximum steps per episode
        """
        self.client = MCPClient(server_url)
        self.max_steps = max_steps
        self.trajectory = AgentTrajectory()
        self._tools: list[str] = []
        logger.info(f"Created MCPAgent for {server_url}")

    @property
    def tools(self) -> list[str]:
        """
        Tool names discovered so far (empty until discover_tools() runs).

        Returns a copy so callers cannot mutate the agent's internal list.
        """
        return list(self._tools)

    async def discover_tools(self) -> list[str]:
        """
        Discover available tools from the MCP server.

        Returns:
            List of available tool names
        """
        self._tools = await self.client.list_tools()
        logger.info(f"Discovered tools: {self._tools}")
        return self._tools

    async def step(self, action: str) -> str:
        """
        Execute an action (tool call) on the MCP server.

        Args:
            action: Tool name to execute

        Returns:
            Observation from the tool
        """
        observation = await self.client.call_tool(action)
        # Heuristic shaping: small positive reward for any successful call,
        # small penalty when the client reports an error. Assumes the client
        # signals failures with an "Error"-prefixed string — TODO confirm
        # against MCPClient.call_tool.
        reward = 0.1 if not observation.startswith("Error") else -0.1
        self.trajectory.add_step(action, observation, reward)
        logger.debug(f"Step: {action} -> {observation[:50]}...")
        return observation

    async def reset(self) -> str:
        """
        Reset the environment.

        Discards the current trajectory and, if the server exposes a
        "reset" tool, invokes it.

        Returns:
            Initial observation
        """
        self.trajectory = AgentTrajectory()
        if "reset" in self._tools:
            return await self.client.call_tool("reset")
        return "Environment ready"

    async def run_episode(self, policy: str = "random") -> AgentTrajectory:
        """
        Run a complete episode with the given policy.

        Args:
            policy: Policy to use ("random" for random actions; any other
                value falls back to always choosing the first tool)

        Returns:
            Complete trajectory
        """
        await self.discover_tools()
        await self.reset()

        # "reset" is a control tool, not a regular action.
        action_space = [t for t in self._tools if t != "reset"]

        if not action_space:
            logger.warning("No actions available")
            return self.trajectory

        logger.info(f"Starting episode with {len(action_space)} available actions")

        for step in range(self.max_steps):
            if policy == "random":
                action = random.choice(action_space)
            else:
                action = action_space[0]  # Default: first action

            observation = await self.step(action)
            logger.info(f"Step {step + 1}: {action} -> {observation}")

            # Simple termination check: the server signals completion in
            # free text rather than a structured flag.
            if "done" in observation.lower() or "goal" in observation.lower():
                logger.info("Episode completed (goal reached)")
                break

        return self.trajectory


async def run_agent_demo(server_url: str, num_steps: int = 10) -> dict[str, Any]:
    """
    Run a demo of the agent interacting with an MCP server.

    Args:
        server_url: URL of MCP server
        num_steps: Number of steps to run

    Returns:
        Trajectory data
    """
    demo_agent = MCPAgent(server_url, max_steps=num_steps)
    trajectory = await demo_agent.run_episode(policy="random")

    logger.info(f"Demo complete. Total reward: {trajectory.total_reward()}")
    return {
        "trajectory": trajectory.to_dict(),
        "tools": demo_agent._tools,
        "server_url": server_url,
    }
Loading