From 3baa483ad6703fcdb61ca569a8d1ab6220f5d510 Mon Sep 17 00:00:00 2001 From: supmo668 Date: Tue, 2 Dec 2025 18:49:37 -0800 Subject: [PATCH] feat(agentenv-mcp): Add MCP environment integration with FastMCP Adds a modular framework for creating RL environments that expose their action space as MCP tools using FastMCP for both server and client. Key features: - mcp_servers/ submodule with extensible base classes - DirectionalMCPServer as example implementation - FastMCP client for tool discovery and execution - MCPAgent for environment interaction - Centralized logging configuration - End-to-end test demonstrating full workflow Architecture: - MCP servers register tools via @mcp.tool() decorators - Agent connects via FastMCP Client to list_tools() and call_tool() - Environment provides Gym-like interface (reset/step) Usage: uv run agentenv-mcp server --port 8001 uv run agentenv-mcp agent --server-url http://localhost:8001/sse --- agentenv-mcp/.gitignore | 47 +++++ agentenv-mcp/README.md | 136 +++++++++++++++ agentenv-mcp/agentenv_mcp/__init__.py | 28 +++ agentenv-mcp/agentenv_mcp/agent.py | 165 ++++++++++++++++++ agentenv-mcp/agentenv_mcp/cli.py | 95 ++++++++++ agentenv-mcp/agentenv_mcp/client.py | 117 +++++++++++++ agentenv-mcp/agentenv_mcp/environment.py | 145 +++++++++++++++ agentenv-mcp/agentenv_mcp/logging_config.py | 69 ++++++++ .../agentenv_mcp/mcp_servers/__init__.py | 15 ++ agentenv-mcp/agentenv_mcp/mcp_servers/base.py | 95 ++++++++++ .../agentenv_mcp/mcp_servers/directional.py | 128 ++++++++++++++ agentenv-mcp/agentenv_mcp/rewards/__init__.py | 9 + agentenv-mcp/agentenv_mcp/rewards/base.py | 66 +++++++ agentenv-mcp/pyproject.toml | 32 ++++ agentenv-mcp/test_e2e.py | 127 ++++++++++++++ 15 files changed, 1274 insertions(+) create mode 100644 agentenv-mcp/.gitignore create mode 100644 agentenv-mcp/README.md create mode 100644 agentenv-mcp/agentenv_mcp/__init__.py create mode 100644 agentenv-mcp/agentenv_mcp/agent.py create mode 100644 agentenv-mcp/agentenv_mcp/cli.py create mode 100644 agentenv-mcp/agentenv_mcp/client.py create mode 100644 agentenv-mcp/agentenv_mcp/environment.py create mode 100644 agentenv-mcp/agentenv_mcp/logging_config.py create mode 100644 agentenv-mcp/agentenv_mcp/mcp_servers/__init__.py create mode 100644 agentenv-mcp/agentenv_mcp/mcp_servers/base.py create mode 100644 agentenv-mcp/agentenv_mcp/mcp_servers/directional.py create mode 100644 agentenv-mcp/agentenv_mcp/rewards/__init__.py create mode 100644 agentenv-mcp/agentenv_mcp/rewards/base.py create mode 100644 agentenv-mcp/pyproject.toml create mode 100644 agentenv-mcp/test_e2e.py diff --git a/agentenv-mcp/.gitignore b/agentenv-mcp/.gitignore new file mode 100644 index 00000000..e6f22491 --- /dev/null +++ b/agentenv-mcp/.gitignore @@ -0,0 +1,47 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +.venv/ +venv/ +ENV/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Testing +.pytest_cache/ +.coverage +htmlcov/ + +# Logs +*.log + +# UV +.uv/ +uv.lock + +# Ruff +.ruff_cache/ diff --git a/agentenv-mcp/README.md b/agentenv-mcp/README.md new file mode 100644 index 00000000..f5236c68 --- /dev/null +++ b/agentenv-mcp/README.md @@ -0,0 +1,136 @@ +# AgentEnv MCP + +MCP (Model Context Protocol) integration for AgentGym environments using FastMCP. + +## Overview + +This package provides a modular framework for creating RL environments that expose their action space as MCP tools. Agents connect via FastMCP client to discover and execute tools. + +### Architecture + +```text +┌─────────────────┐ MCP/SSE ┌─────────────────┐ +│ MCPAgent │◄────────────────►│ MCP Server │ +│ (FastMCP │ │ (FastMCP) │ +│ Client) │ │ │ +│ │ list_tools() │ @mcp.tool() │ +│ │ call_tool() │ - up() │ +│ │ │ - down() │ +│ │ │ - left() │ +│ │ │ - right() │ +└─────────────────┘ └─────────────────┘ +``` + +## Installation + +```bash +cd agentenv-mcp +uv sync +``` + +## Quick Start + +### 1. Start the MCP Server + +```bash +# Run directional navigation server +uv run python -m agentenv_mcp.mcp_servers.directional + +# Or via CLI +uv run agentenv-mcp server --port 8001 +``` + +### 2. Run an Agent + +```bash +# Run agent demo +uv run agentenv-mcp agent --server-url http://localhost:8001/sse --steps 10 +``` + +### 3. End-to-End Test + +```bash +uv run python test_e2e.py +``` + +## Project Structure + +```text +agentenv-mcp/ +├── agentenv_mcp/ +│ ├── __init__.py # Package exports +│ ├── client.py # FastMCP client wrapper +│ ├── agent.py # Agent for MCP interaction +│ ├── environment.py # Gym-like environment wrapper +│ ├── logging_config.py # Centralized logging +│ ├── cli.py # CLI commands +│ ├── mcp_servers/ # MCP server implementations +│ │ ├── __init__.py +│ │ ├── base.py # Base server classes +│ │ └── directional.py # Directional navigation example +│ └── rewards/ # Reward calculators +│ ├── __init__.py +│ └── base.py +├── test_e2e.py # End-to-end test +├── pyproject.toml +└── README.md +``` + +## Creating Custom MCP Servers + +Extend `BaseMCPServer` to create new environments: + +```python +from agentenv_mcp.mcp_servers.base import BaseMCPServer, MCPServerState +from dataclasses import dataclass + +@dataclass +class MyState(MCPServerState): + value: int = 0 + +class MyServer(BaseMCPServer): + def _create_default_state(self) -> MyState: + return MyState() + + def _register_tools(self) -> None: + @self.mcp.tool() + def increment() -> str: + self.state.value += 1 + return f"Value: {self.state.value}" + + @self.mcp.tool() + def decrement() -> str: + self.state.value -= 1 + return f"Value: {self.state.value}" +``` + +## Using the Agent + +```python +import asyncio +from agentenv_mcp import MCPAgent + +async def main(): + agent = MCPAgent("http://localhost:8001/sse", max_steps=50) + + # Discover tools + tools = await agent.discover_tools() + print(f"Available tools: {tools}") + + # Run episode + trajectory = await agent.run_episode(policy="random") + print(f"Total reward: {trajectory.total_reward()}") + +asyncio.run(main()) +``` + +## Dependencies + +- **fastmcp**: FastMCP library for MCP server and client +- **fastapi**: API framework (for future HTTP endpoints) +- **uvicorn**: ASGI server +- **pydantic**: Data validation + +## License + +MIT diff --git a/agentenv-mcp/agentenv_mcp/__init__.py b/agentenv-mcp/agentenv_mcp/__init__.py new file mode 100644 index 00000000..ead3d3ae --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/__init__.py @@ -0,0 +1,28 @@ +""" +AgentEnv MCP - Model Context Protocol integration for AgentGym. + +This package provides MCP server implementations and a FastMCP client +for building RL environments with tool-based action spaces. + +Key Components: +- mcp_servers: Extensible MCP server implementations +- client: FastMCP client for connecting to MCP servers +- agent: Agent for interacting with MCP environments +- environment: Gym-like environment wrapper +""" + +__version__ = "0.1.0" + +from .agent import AgentTrajectory, MCPAgent +from .client import MCPClient +from .environment import MCPEnvironment +from .logging_config import get_logger, setup_logging + +__all__ = [ + "MCPClient", + "MCPAgent", + "AgentTrajectory", + "MCPEnvironment", + "setup_logging", + "get_logger", +] diff --git a/agentenv-mcp/agentenv_mcp/agent.py b/agentenv-mcp/agentenv_mcp/agent.py new file mode 100644 index 00000000..62494737 --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/agent.py @@ -0,0 +1,165 @@ +""" +Agent for interacting with MCP environments. + +Provides a simple agent that can explore MCP servers by calling tools. +""" + +import random +from dataclasses import dataclass, field +from typing import Any + +from .client import MCPClient +from .logging_config import get_logger + +logger = get_logger("agent") + + +@dataclass +class AgentTrajectory: + """Records agent's trajectory through the environment.""" + + actions: list[str] = field(default_factory=list) + observations: list[str] = field(default_factory=list) + rewards: list[float] = field(default_factory=list) + + def add_step(self, action: str, observation: str, reward: float = 0.0) -> None: + """Add a step to the trajectory.""" + self.actions.append(action) + self.observations.append(observation) + self.rewards.append(reward) + + def total_reward(self) -> float: + """Get total accumulated reward.""" + return sum(self.rewards) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary.""" + return { + "actions": self.actions, + "observations": self.observations, + "rewards": self.rewards, + "total_reward": self.total_reward(), + } + + +class MCPAgent: + """ + Agent that interacts with MCP servers. + + Uses FastMCP client to discover and execute tools. + """ + + def __init__(self, server_url: str, max_steps: int = 50): + """ + Initialize the agent. + + Args: + server_url: URL of MCP server to connect to + max_steps: Maximum steps per episode + """ + self.client = MCPClient(server_url) + self.max_steps = max_steps + self.trajectory = AgentTrajectory() + self._tools: list[str] = [] + logger.info(f"Created MCPAgent for {server_url}") + + async def discover_tools(self) -> list[str]: + """ + Discover available tools from the MCP server. + + Returns: + List of available tool names + """ + self._tools = await self.client.list_tools() + logger.info(f"Discovered tools: {self._tools}") + return self._tools + + async def step(self, action: str) -> str: + """ + Execute an action (tool call) on the MCP server. + + Args: + action: Tool name to execute + + Returns: + Observation from the tool + """ + observation = await self.client.call_tool(action) + reward = 0.1 if not observation.startswith("Error") else -0.1 + self.trajectory.add_step(action, observation, reward) + logger.debug(f"Step: {action} -> {observation[:50]}...") + return observation + + async def reset(self) -> str: + """ + Reset the environment. + + Returns: + Initial observation + """ + self.trajectory = AgentTrajectory() + if "reset" in self._tools: + return await self.client.call_tool("reset") + return "Environment ready" + + async def run_episode(self, policy: str = "random") -> AgentTrajectory: + """ + Run a complete episode with the given policy. + + Args: + policy: Policy to use ("random" for random actions) + + Returns: + Complete trajectory + """ + await self.discover_tools() + await self.reset() + + # Filter out reset tool from action space + action_space = [t for t in self._tools if t != "reset"] + + if not action_space: + logger.warning("No actions available") + return self.trajectory + + logger.info(f"Starting episode with {len(action_space)} available actions") + + for step in range(self.max_steps): + if policy == "random": + action = random.choice(action_space) + else: + action = action_space[0] # Default: first action + + observation = await self.step(action) + logger.info(f"Step {step + 1}: {action} -> {observation}") + + # Simple termination check + if "done" in observation.lower() or "goal" in observation.lower(): + logger.info("Episode completed (goal reached)") + break + + return self.trajectory + + +async def run_agent_demo(server_url: str, num_steps: int = 10) -> dict[str, Any]: + """ + Run a demo of the agent interacting with an MCP server. + + Args: + server_url: URL of MCP server + num_steps: Number of steps to run + + Returns: + Trajectory data + """ + agent = MCPAgent(server_url, max_steps=num_steps) + trajectory = await agent.run_episode(policy="random") + + result = { + "trajectory": trajectory.to_dict(), + "tools": agent._tools, + "server_url": server_url, + } + + logger.info(f"Demo complete. Total reward: {trajectory.total_reward()}") + return result diff --git a/agentenv-mcp/agentenv_mcp/cli.py b/agentenv-mcp/agentenv_mcp/cli.py new file mode 100644 index 00000000..287cfaab --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/cli.py @@ -0,0 +1,95 @@ +""" +CLI for agentenv-mcp. + +Provides commands for running MCP servers and agents. +""" + +import argparse +import asyncio +import sys + +from .logging_config import get_logger, setup_logging + +logger = get_logger("cli") + + +def run_server(args: argparse.Namespace) -> None: + """Run an MCP server.""" + from .mcp_servers.directional import DirectionalMCPServer + + setup_logging() + logger.info(f"Starting server on {args.host}:{args.port}") + + server = DirectionalMCPServer() + server.run(transport="sse", host=args.host, port=args.port) + + +async def run_agent_async(args: argparse.Namespace) -> None: + """Run the agent.""" + from .agent import run_agent_demo + + setup_logging() + logger.info(f"Running agent against {args.server_url}") + + result = await run_agent_demo(args.server_url, num_steps=args.steps) + + print("\n=== Agent Demo Results ===") + print(f"Server: {result['server_url']}") + print(f"Tools discovered: {result['tools']}") + print(f"Total reward: {result['trajectory']['total_reward']:.2f}") + print(f"Steps taken: {len(result['trajectory']['actions'])}") + print("\nTrajectory:") + for i, (action, obs) in enumerate( + zip( + result["trajectory"]["actions"], + result["trajectory"]["observations"], + ) + ): + print(f" {i + 1}. {action}: {obs}") + + +def run_agent(args: argparse.Namespace) -> None: + """Run the agent (sync wrapper).""" + asyncio.run(run_agent_async(args)) + + +def main() -> None: + """Main CLI entry point.""" + parser = argparse.ArgumentParser( + description="AgentEnv MCP - MCP integration for AgentGym" + ) + subparsers = parser.add_subparsers(dest="command", help="Available commands") + + # Server command + server_parser = subparsers.add_parser("server", help="Run MCP server") + server_parser.add_argument( + "--host", default="0.0.0.0", help="Host to bind to (default: 0.0.0.0)" + ) + server_parser.add_argument( + "--port", type=int, default=8001, help="Port to bind to (default: 8001)" + ) + server_parser.set_defaults(func=run_server) + + # Agent command + agent_parser = subparsers.add_parser("agent", help="Run agent demo") + agent_parser.add_argument( + "--server-url", + default="http://localhost:8001/sse", + help="MCP server URL (default: http://localhost:8001/sse)", + ) + agent_parser.add_argument( + "--steps", type=int, default=10, help="Number of steps (default: 10)" + ) + agent_parser.set_defaults(func=run_agent) + + args = parser.parse_args() + + if args.command is None: + parser.print_help() + sys.exit(1) + + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/agentenv-mcp/agentenv_mcp/client.py b/agentenv-mcp/agentenv_mcp/client.py new file mode 100644 index 00000000..7c1bed3c --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/client.py @@ -0,0 +1,117 @@ +""" +FastMCP Client for connecting to MCP servers. + +Uses FastMCP Client to connect to MCP servers and provides +a unified interface for the agent to interact with MCP tools. +""" + +import asyncio +from typing import Any + +from fastmcp import Client + +from .logging_config import get_logger + +logger = get_logger("client") + + +class MCPClient: + """ + Client for connecting to MCP servers using FastMCP. + + Handles connection management and tool execution. + """ + + def __init__(self, server_url: str): + """ + Initialize MCP client. + + Args: + server_url: URL of MCP server (e.g., "http://localhost:8001/sse") + """ + self.server_url = server_url + self.client = Client(server_url) + self._tools: dict[str, Any] = {} + self._connected = False + logger.info(f"Created MCPClient for {server_url}") + + async def connect(self) -> bool: + """ + Connect to the MCP server and discover available tools. + + Returns: + True if connection successful + """ + try: + async with self.client: + tools = await self.client.list_tools() + self._tools = {tool.name: tool for tool in tools} + self._connected = True + logger.info(f"Connected. Available tools: {list(self._tools.keys())}") + return True + except Exception as e: + logger.error(f"Connection failed: {e}") + return False + + async def list_tools(self) -> list[str]: + """ + Get list of available tools from the MCP server. + + Returns: + List of tool names + """ + async with self.client: + tools = await self.client.list_tools() + return [tool.name for tool in tools] + + async def call_tool( + self, tool_name: str, arguments: dict[str, Any] | None = None + ) -> str: + """ + Execute a tool on the MCP server. + + Args: + tool_name: Name of the tool to call + arguments: Arguments to pass to the tool + + Returns: + Tool execution result as string + """ + try: + async with self.client: + result = await self.client.call_tool(tool_name, arguments or {}) + # Extract text content from result + if hasattr(result, "content") and result.content: + if isinstance(result.content, list): + return " ".join( + getattr(item, "text", str(item)) for item in result.content + ) + return str(result.content) + return str(result) + except Exception as e: + logger.error(f"Tool call failed: {tool_name} - {e}") + return f"Error: {e}" + + def call_tool_sync( + self, tool_name: str, arguments: dict[str, Any] | None = None + ) -> str: + """ + Synchronous wrapper for call_tool. + + Args: + tool_name: Name of the tool to call + arguments: Arguments to pass to the tool + + Returns: + Tool execution result as string + """ + return asyncio.run(self.call_tool(tool_name, arguments)) + + def list_tools_sync(self) -> list[str]: + """ + Synchronous wrapper for list_tools. + + Returns: + List of tool names + """ + return asyncio.run(self.list_tools()) diff --git a/agentenv-mcp/agentenv_mcp/environment.py b/agentenv-mcp/agentenv_mcp/environment.py new file mode 100644 index 00000000..6456d2f7 --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/environment.py @@ -0,0 +1,145 @@ +""" +MCP Environment wrapper for RL training. + +Provides a Gym-like interface for interacting with MCP servers. +""" + +import asyncio +from typing import Any + +from .client import MCPClient +from .logging_config import get_logger + +logger = get_logger("environment") + + +class MCPEnvironment: + """ + Gym-like environment wrapper for MCP servers. + + Uses FastMCP client to interact with MCP servers. + Actions = tool calls, Observations = tool results. + """ + + def __init__( + self, + server_url: str, + task_description: str = "Interact with MCP tools", + max_steps: int = 50, + ): + """ + Initialize the MCP environment. + + Args: + server_url: URL of the MCP server + task_description: Description of the task + max_steps: Maximum steps per episode + """ + self.server_url = server_url + self.client = MCPClient(server_url) + self.task_description = task_description + self.max_steps = max_steps + + self._tools: list[str] = [] + self._step_count = 0 + self._total_reward = 0.0 + self._done = False + self._history: list[dict[str, Any]] = [] + + logger.info(f"Created MCPEnvironment for {server_url}") + + async def _discover_tools(self) -> list[str]: + """Discover available tools from server.""" + self._tools = await self.client.list_tools() + return self._tools + + @property + def action_space(self) -> list[str]: + """Get available actions (tools).""" + return self._tools + + async def reset(self) -> str: + """ + Reset the environment. + + Returns: + Initial observation + """ + self._step_count = 0 + self._total_reward = 0.0 + self._done = False + self._history = [] + + # Discover tools + await self._discover_tools() + + # Call reset tool if available + if "reset" in self._tools: + await self.client.call_tool("reset") + + obs = f"Task: {self.task_description}\n" + obs += f"Available actions: {self._tools}" + + logger.info(f"Environment reset. Tools: {self._tools}") + return obs + + async def step(self, action: str) -> dict[str, Any]: + """ + Execute an action in the environment. + + Args: + action: Tool name to execute + + Returns: + Dict with observation, reward, done, info + """ + if self._done: + return { + "observation": "Episode complete. Call reset().", + "reward": 0.0, + "done": True, + "info": {"step": self._step_count}, + } + + self._step_count += 1 + + # Execute the tool + observation = await self.client.call_tool(action) + + # Calculate reward + reward = 0.1 if not observation.startswith("Error") else -0.1 + self._total_reward += reward + + # Check termination + if self._step_count >= self.max_steps: + self._done = True + + # Record history + self._history.append( + { + "step": self._step_count, + "action": action, + "observation": observation, + "reward": reward, + } + ) + + logger.debug(f"Step {self._step_count}: {action} -> reward={reward}") + + return { + "observation": observation, + "reward": reward, + "done": self._done, + "info": { + "step": self._step_count, + "total_reward": self._total_reward, + }, + } + + def reset_sync(self) -> str: + """Synchronous reset.""" + return asyncio.run(self.reset()) + + def step_sync(self, action: str) -> dict[str, Any]: + """Synchronous step.""" + return asyncio.run(self.step(action)) diff --git a/agentenv-mcp/agentenv_mcp/logging_config.py b/agentenv-mcp/agentenv_mcp/logging_config.py new file mode 100644 index 00000000..8f9f2b1b --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/logging_config.py @@ -0,0 +1,69 @@ +""" +Centralized logging configuration for agentenv-mcp. + +Provides consistent logging across all modules with configurable levels and formats. +""" + +import logging +import sys +from typing import Optional + + +def setup_logging( + level: int = logging.INFO, + format_string: Optional[str] = None, + log_file: Optional[str] = None, +) -> logging.Logger: + """ + Configure logging for the agentenv-mcp package. + + Args: + level: Logging level (default: INFO) + format_string: Custom format string (optional) + log_file: Path to log file (optional, logs to stderr by default) + + Returns: + Root logger for agentenv_mcp + """ + if format_string is None: + format_string = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + + # Create formatter + formatter = logging.Formatter(format_string) + + # Get the root logger for our package + logger = logging.getLogger("agentenv_mcp") + logger.setLevel(level) + + # Clear existing handlers + logger.handlers.clear() + + # Console handler + console_handler = logging.StreamHandler(sys.stderr) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + + # File handler (optional) + if log_file: + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + return logger + + +def get_logger(name: str) -> logging.Logger: + """ + Get a logger for a specific module. + + Args: + name: Module name (will be prefixed with agentenv_mcp) + + Returns: + Logger instance + """ + return logging.getLogger(f"agentenv_mcp.{name}") + + +# Initialize default logging +_root_logger = setup_logging() diff --git a/agentenv-mcp/agentenv_mcp/mcp_servers/__init__.py b/agentenv-mcp/agentenv_mcp/mcp_servers/__init__.py new file mode 100644 index 00000000..4367c4d2 --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/mcp_servers/__init__.py @@ -0,0 +1,15 @@ +""" +MCP Servers submodule. + +Contains base classes and implementations for MCP servers using FastMCP. +Each server exposes tools that can be used as actions in an RL environment. +""" + +from .base import BaseMCPServer, MCPServerState +from .directional import DirectionalMCPServer + +__all__ = [ + "BaseMCPServer", + "MCPServerState", + "DirectionalMCPServer", +] diff --git a/agentenv-mcp/agentenv_mcp/mcp_servers/base.py b/agentenv-mcp/agentenv_mcp/mcp_servers/base.py new file mode 100644 index 00000000..8bfb5b17 --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/mcp_servers/base.py @@ -0,0 +1,95 @@ +""" +Base classes for MCP server implementations. + +Provides abstract base classes for creating extensible MCP servers with FastMCP. +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any + +from fastmcp import FastMCP + +from ..logging_config import get_logger + +logger = get_logger("mcp_servers.base") + + +@dataclass +class MCPServerState: + """ + Base state class for MCP servers. + + Subclass to define environment-specific state. + """ + + step_count: int = 0 + history: list = field(default_factory=list) + + def reset(self) -> None: + """Reset state to initial values.""" + self.step_count = 0 + self.history = [] + + def get_observation(self) -> str: + """Get current observation string.""" + return f"Step: {self.step_count}" + + def to_dict(self) -> dict[str, Any]: + """Convert state to dictionary.""" + return {"step_count": self.step_count, "history": self.history} + + +class BaseMCPServer(ABC): + """ + Abstract base class for MCP servers. + + Provides a template for creating FastMCP servers with tools. + Subclass and implement register_tools() to add environment-specific tools. + """ + + def __init__(self, name: str, state: MCPServerState | None = None): + """ + Initialize the MCP server. + + Args: + name: Server name for FastMCP + state: Initial state (creates default if None) + """ + self.name = name + self.mcp = FastMCP(name) + self.state = state or self._create_default_state() + self._register_tools() + logger.info(f"Initialized MCP server: {name}") + + @abstractmethod + def _create_default_state(self) -> MCPServerState: + """Create the default state for this server type.""" + pass + + @abstractmethod + def _register_tools(self) -> None: + """Register tools with the FastMCP server.""" + pass + + def reset(self) -> str: + """Reset the environment state.""" + self.state.reset() + logger.info(f"Server {self.name} reset") + return self.state.get_observation() + + def get_observation(self) -> str: + """Get current observation.""" + return self.state.get_observation() + + def run(self, transport: str = "sse", host: str = "0.0.0.0", port: int = 8001): + """ + Run the MCP server. + + Args: + transport: Transport type ("sse" or "stdio") + host: Host to bind to + port: Port to bind to + """ + logger.info(f"Starting {self.name} on {host}:{port} ({transport})") + self.mcp.run(transport=transport, host=host, port=port) diff --git a/agentenv-mcp/agentenv_mcp/mcp_servers/directional.py b/agentenv-mcp/agentenv_mcp/mcp_servers/directional.py new file mode 100644 index 00000000..02f85920 --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/mcp_servers/directional.py @@ -0,0 +1,128 @@ +""" +Directional Navigation MCP Server. + +A simple 2D grid navigation environment with up/down/left/right movements. +Demonstrates the MCP server pattern for RL environments. +""" + +from dataclasses import dataclass, field + +from ..logging_config import get_logger +from .base import BaseMCPServer, MCPServerState + +logger = get_logger("mcp_servers.directional") + + +@dataclass +class DirectionalState(MCPServerState): + """State for directional navigation environment.""" + + x: int = 0 + y: int = 0 + history: list = field(default_factory=list) + step_count: int = 0 + + def reset(self) -> None: + """Reset to origin.""" + self.x = 0 + self.y = 0 + self.history = [] + self.step_count = 0 + + def get_observation(self) -> str: + """Get current position observation.""" + return f"Position: ({self.x}, {self.y}), Steps: {self.step_count}" + + +class DirectionalMCPServer(BaseMCPServer): + """ + MCP server for 2D directional navigation. + + Tools: up, down, left, right, get_position, reset + """ + + def __init__(self, name: str = "Directional Navigation"): + super().__init__(name) + + def _create_default_state(self) -> DirectionalState: + return DirectionalState() + + def _register_tools(self) -> None: + """Register directional movement tools.""" + + @self.mcp.tool() + def up() -> str: + """Move up (increase y by 1).""" + self.state.y += 1 + self.state.step_count += 1 + self.state.history.append("up") + obs = self.state.get_observation() + logger.debug(f"up -> {obs}") + return f"Moved up. {obs}" + + @self.mcp.tool() + def down() -> str: + """Move down (decrease y by 1).""" + self.state.y -= 1 + self.state.step_count += 1 + self.state.history.append("down") + obs = self.state.get_observation() + logger.debug(f"down -> {obs}") + return f"Moved down. {obs}" + + @self.mcp.tool() + def left() -> str: + """Move left (decrease x by 1).""" + self.state.x -= 1 + self.state.step_count += 1 + self.state.history.append("left") + obs = self.state.get_observation() + logger.debug(f"left -> {obs}") + return f"Moved left. {obs}" + + @self.mcp.tool() + def right() -> str: + """Move right (increase x by 1).""" + self.state.x += 1 + self.state.step_count += 1 + self.state.history.append("right") + obs = self.state.get_observation() + logger.debug(f"right -> {obs}") + return f"Moved right. {obs}" + + @self.mcp.tool() + def get_position() -> str: + """Get current position without moving.""" + return self.state.get_observation() + + @self.mcp.tool() + def reset() -> str: + """Reset environment to initial state.""" + self.state.reset() + logger.info("Environment reset") + return f"Reset complete. {self.state.get_observation()}" + + +def create_server(host: str = "0.0.0.0", port: int = 8001) -> DirectionalMCPServer: + """ + Create and configure a directional MCP server. + + Args: + host: Host to bind to + port: Port to bind to + + Returns: + Configured DirectionalMCPServer instance + """ + server = DirectionalMCPServer() + return server + + +if __name__ == "__main__": + import os + + port = int(os.environ.get("MCP_PORT", 8001)) + host = os.environ.get("MCP_HOST", "0.0.0.0") + + server = create_server(host, port) + server.run(transport="sse", host=host, port=port) diff --git a/agentenv-mcp/agentenv_mcp/rewards/__init__.py b/agentenv-mcp/agentenv_mcp/rewards/__init__.py new file mode 100644 index 00000000..60c5ac6f --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/rewards/__init__.py @@ -0,0 +1,9 @@ +""" +Reward calculation module. + +Provides extensible reward calculators for MCP environments. +""" + +from .base import DefaultRewardCalculator, RewardCalculator + +__all__ = ["RewardCalculator", "DefaultRewardCalculator"] diff --git a/agentenv-mcp/agentenv_mcp/rewards/base.py b/agentenv-mcp/agentenv_mcp/rewards/base.py new file mode 100644 index 00000000..eded0035 --- /dev/null +++ b/agentenv-mcp/agentenv_mcp/rewards/base.py @@ -0,0 +1,66 @@ +""" +Base reward calculator classes. +""" + +from abc import ABC, abstractmethod +from typing import Any + +from ..logging_config import get_logger + +logger = get_logger("rewards") + + +class RewardCalculator(ABC): + """Abstract base class for reward calculators.""" + + @abstractmethod + def calculate(self, action: str, observation: str, state: dict[str, Any]) -> float: + """ + Calculate reward for a step. + + Args: + action: Action taken + observation: Resulting observation + state: Current environment state + + Returns: + Reward value + """ + pass + + @abstractmethod + def reset(self) -> None: + """Reset calculator state.""" + pass + + +class DefaultRewardCalculator(RewardCalculator): + """ + Default reward calculator. + + Provides small positive reward for valid actions, negative for errors. + """ + + def __init__(self, step_reward: float = 0.1, error_penalty: float = -0.1): + self.step_reward = step_reward + self.error_penalty = error_penalty + self._total = 0.0 + + def calculate(self, action: str, observation: str, state: dict[str, Any]) -> float: + """Calculate reward based on observation.""" + if observation.startswith("Error"): + reward = self.error_penalty + else: + reward = self.step_reward + + self._total += reward + return reward + + def reset(self) -> None: + """Reset accumulated reward.""" + self._total = 0.0 + + @property + def total_reward(self) -> float: + """Get total accumulated reward.""" + return self._total diff --git a/agentenv-mcp/pyproject.toml b/agentenv-mcp/pyproject.toml new file mode 100644 index 00000000..761e846a --- /dev/null +++ b/agentenv-mcp/pyproject.toml @@ -0,0 +1,32 @@ +[project] +name = "agentenv-mcp" +version = "0.1.0" +description = "AgentGym environment with MCP (Model Context Protocol) integration using FastMCP" +authors = [{ name = "AgentGym Contributors" }] +dependencies = [ + "fastapi>=0.104.0", + "uvicorn>=0.24.0", + "pydantic>=2.0.0", + "fastmcp>=2.0.0", +] +requires-python = ">=3.10" +readme = "README.md" +license = { text = "MIT" } + +[project.scripts] +agentenv-mcp = "agentenv_mcp.cli:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.ruff] +line-length = 88 +target-version = "py310" + +[tool.ruff.lint] +select = ["E", "F", "I", "W"] +ignore = ["E501"] + +[dependency-groups] +dev = ["pytest>=7.4.0", "ruff>=0.1.0"] diff --git a/agentenv-mcp/test_e2e.py b/agentenv-mcp/test_e2e.py new file mode 100644 index 00000000..75407f9c --- /dev/null +++ b/agentenv-mcp/test_e2e.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +End-to-end test for agentenv-mcp. + +Demonstrates the complete workflow: +1. Start MCP server (directional navigation) +2. Connect agent via FastMCP client +3. Discover tools and execute actions +4. Record trajectory and report results +""" + +import asyncio +import subprocess +import sys +import time + +# Add parent to path for imports +sys.path.insert(0, ".") + +from agentenv_mcp import MCPAgent, get_logger, setup_logging + +logger = get_logger("test_e2e") + + +async def run_e2e_test(server_url: str, num_steps: int = 10) -> dict: + """ + Run end-to-end test with the agent. + + Args: + server_url: URL of MCP server + num_steps: Number of steps to run + + Returns: + Test results + """ + logger.info(f"Starting E2E test with {num_steps} steps") + + agent = MCPAgent(server_url, max_steps=num_steps) + + # Discover available tools + tools = await agent.discover_tools() + logger.info(f"Discovered tools: {tools}") + + if not tools: + return {"success": False, "error": "No tools discovered"} + + # Reset environment + reset_obs = await agent.reset() + logger.info(f"Reset: {reset_obs}") + + # Run episode + trajectory = await agent.run_episode(policy="random") + + results = { + "success": True, + "tools": tools, + "trajectory": trajectory.to_dict(), + "steps_taken": len(trajectory.actions), + "total_reward": trajectory.total_reward(), + } + + logger.info( + f"E2E test complete: {results['steps_taken']} steps, reward={results['total_reward']:.2f}" + ) + return results + + +def start_server(port: int = 8001) -> subprocess.Popen: + """Start the MCP server in background.""" + logger.info(f"Starting MCP server on port {port}") + proc = subprocess.Popen( + [sys.executable, "-m", "agentenv_mcp.mcp_servers.directional"], + env={**subprocess.os.environ, "MCP_PORT": str(port)}, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + time.sleep(2) # Wait for server to start + return proc + + +def main(): + """Run the E2E test.""" + setup_logging() + logger.info("=" * 50) + logger.info("AgentEnv MCP End-to-End Test") + logger.info("=" * 50) + + port = 8001 + server_url = f"http://localhost:{port}/sse" + + # Start server + server_proc = start_server(port) + + try: + # Run test + results = asyncio.run(run_e2e_test(server_url, num_steps=10)) + + if results["success"]: + print("\n" + "=" * 50) + print("E2E TEST PASSED") + print("=" * 50) + print(f"Tools: {results['tools']}") + print(f"Steps: {results['steps_taken']}") + print(f"Total Reward: {results['total_reward']:.2f}") + print("\nTrajectory:") + for i, (action, obs) in enumerate( + zip( + results["trajectory"]["actions"], + results["trajectory"]["observations"], + ) + ): + print(f" {i + 1}. {action}: {obs[:60]}...") + else: + print(f"\nE2E TEST FAILED: {results.get('error', 'Unknown error')}") + sys.exit(1) + + finally: + # Cleanup + logger.info("Stopping server...") + server_proc.terminate() + server_proc.wait(timeout=5) + + print("\n✓ All tests passed!") + + +if __name__ == "__main__": + main()