diff --git a/python/packages/autogen-ext/src/autogen_ext/tools/opa/README.md b/python/packages/autogen-ext/src/autogen_ext/tools/opa/README.md new file mode 100644 index 000000000000..84c6bc3d6beb --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/tools/opa/README.md @@ -0,0 +1,55 @@ +# autogen OPA Tool Authorization + +Open Policy Agent (OPA) authorization for AutoGen tool calls and agent handoffs. + +## Overview + +`autogen_ext.tools.opa` wraps any `BaseTool` — including agent handoff tools +(`transfer_to_`) — and evaluates every call against an OPA policy +**before** execution. Zero changes to `autogen-core` or `autogen-agentchat`. + +``` +LLM → AssistantAgent → OPAAuthorizedTool.run_json() + ↓ + POST /v1/data/autogen/tools/allow + ↓ + {"result": true} → inner_tool.run_json() + {"result": false} → OPAAuthorizationError +``` + +## Quick Start + +```python +from autogen_agentchat.agents import AssistantAgent +from autogen_core.tools import FunctionTool +from autogen_ext.tools.opa import opa_authorize_tools + +def web_search(query: str) -> str: ... +def delete_file(path: str) -> str: ... + +agent = AssistantAgent( + name="PlannerAgent", + model_client=..., + tools=opa_authorize_tools( + [FunctionTool(web_search, ...), FunctionTool(delete_file, ...)], + opa_url="http://localhost:8181", + context={"user": "alice", "role": "analyst"}, + ), +) +``` + +## Behavior When OPA Is Unreachable + +| `fail_open` | OPA down | Result | +|---|---|---| +| `False` (default) | OPA down | `OPAConnectionError` raised | +| `True` | OPA down | Tool call proceeds (warning logged) | + +## Loading the Sample Policy + +```bash +opa run --server & +curl -X PUT http://localhost:8181/v1/policies/autogen_tools \ + -H "Content-Type: text/plain" \ + --data-binary @policies/autogen_tools.rego +``` diff --git a/python/packages/autogen-ext/src/autogen_ext/tools/opa/__init__.py b/python/packages/autogen-ext/src/autogen_ext/tools/opa/__init__.py new file mode 100644 index 000000000000..25acaf42375a --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/tools/opa/__init__.py @@ -0,0 +1,11 @@ +"""autogen-opa: Open Policy Agent authorization for AutoGen tool calls and agent handoffs.""" + +from ._exceptions import OPAAuthorizationError, OPAConnectionError +from ._opa_tool import OPAAuthorizedTool, opa_authorize_tools + +__all__ = [ + "OPAAuthorizedTool", + "opa_authorize_tools", + "OPAAuthorizationError", + "OPAConnectionError", +] diff --git a/python/packages/autogen-ext/src/autogen_ext/tools/opa/_exceptions.py b/python/packages/autogen-ext/src/autogen_ext/tools/opa/_exceptions.py new file mode 100644 index 000000000000..770afa0eecdf --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/tools/opa/_exceptions.py @@ -0,0 +1,40 @@ +"""Exceptions for autogen OPA authorization.""" + + +class OPAAuthorizationError(PermissionError): + """Raised when OPA denies a tool call. + + Attributes: + tool_name: The name of the tool that was denied. + policy_reason: Optional human-readable reason returned by the OPA policy. + """ + + def __init__( + self, + tool_name: str, + reason: str = "OPA policy denied the request", + policy_reason: str | None = None, + ) -> None: + self.tool_name = tool_name + self.policy_reason = policy_reason + message = f"Tool '{tool_name}' denied by OPA policy: {reason}" + if policy_reason: + message += f" (policy reason: {policy_reason})" + super().__init__(message) + + +class OPAConnectionError(RuntimeError): + """Raised when the OPA server cannot be reached and fail_open=False. + + Attributes: + opa_url: The OPA server URL that could not be reached. + cause: The underlying exception, if any. + """ + + def __init__(self, opa_url: str, cause: Exception | None = None) -> None: + self.opa_url = opa_url + self.cause = cause + message = f"Cannot connect to OPA server at {opa_url}" + if cause: + message += f": {cause}" + super().__init__(message) diff --git a/python/packages/autogen-ext/src/autogen_ext/tools/opa/_opa_tool.py b/python/packages/autogen-ext/src/autogen_ext/tools/opa/_opa_tool.py new file mode 100644 index 000000000000..5e118107af10 --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/tools/opa/_opa_tool.py @@ -0,0 +1,207 @@ +"""OPA-authorized tool wrapper for AutoGen BaseTool.""" + +from __future__ import annotations + +import logging +from typing import Any, Mapping, Sequence + +import httpx +from autogen_core import CancellationToken +from autogen_core.tools import BaseTool + +from ._exceptions import OPAAuthorizationError, OPAConnectionError + +logger = logging.getLogger(__name__) + +_DEFAULT_OPA_TIMEOUT = 5.0 + + +class OPAAuthorizedTool(BaseTool[Any, Any]): + """A :class:`~autogen_core.tools.BaseTool` wrapper that intercepts every tool call + and evaluates it against an Open Policy Agent (OPA) policy before execution. + + Works transparently for both regular tool calls and agent-to-agent handoff tools + (``transfer_to_``), since both ultimately call ``run_json()``. + + Example usage:: + + from autogen_ext.tools.opa import opa_authorize_tools + + authorized_tools = opa_authorize_tools( + [search_tool, delete_tool], + opa_url="http://localhost:8181", + context={"user": "alice", "role": "analyst"}, + ) + agent = AssistantAgent(name="PlannerAgent", tools=authorized_tools, ...) + + OPA request body for every call:: + + { + "input": { + "tool": "", + "args": { ... }, + "context": { "user": "...", "role": "...", ... } + } + } + + OPA must return ``{"result": true}`` to permit the call. + """ + + def __init__( + self, + inner_tool: BaseTool[Any, Any], + *, + opa_url: str = "http://localhost:8181", + policy_path: str = "v1/data/autogen/tools/allow", + context: dict[str, Any] | None = None, + fail_open: bool = False, + timeout: float = _DEFAULT_OPA_TIMEOUT, + http_client: httpx.AsyncClient | None = None, + ) -> None: + """ + Args: + inner_tool: The real :class:`~autogen_core.tools.BaseTool` to execute when + authorization succeeds. + opa_url: Base URL of the OPA server, e.g. ``http://localhost:8181``. + policy_path: OPA REST API path for the policy rule, e.g. + ``v1/data/autogen/tools/allow``. + context: Arbitrary key/value pairs forwarded to OPA as ``input.context``. + Typical keys: ``user``, ``role``, ``agent_name``, ``session_id``. + fail_open: If ``True``, allow the tool call when OPA is unreachable. + If ``False`` (default), deny and raise :class:`OPAConnectionError`. + timeout: HTTP timeout in seconds for OPA requests (default: 5). + http_client: Optional pre-configured :class:`httpx.AsyncClient`. + Primarily useful for testing. + """ + super().__init__( + args_type=inner_tool.args_type, + return_type=inner_tool.return_type, + name=inner_tool.name, + description=inner_tool.description, + ) + self._inner = inner_tool + self._opa_url = opa_url.rstrip("/") + self._policy_path = policy_path.lstrip("/") + self._context: dict[str, Any] = context or {} + self._fail_open = fail_open + self._timeout = timeout + self._http_client = http_client + + async def _query_opa(self, tool_name: str, args: Mapping[str, Any]) -> bool: + """Send a policy query to OPA and return True if the call is allowed.""" + endpoint = f"{self._opa_url}/{self._policy_path}" + payload: dict[str, Any] = { + "input": { + "tool": tool_name, + "args": dict(args), + "context": self._context, + } + } + logger.debug("OPA query: POST %s payload=%s", endpoint, payload) + + try: + if self._http_client is not None: + response = await self._http_client.post(endpoint, json=payload, timeout=self._timeout) + else: + async with httpx.AsyncClient() as client: + response = await client.post(endpoint, json=payload, timeout=self._timeout) + + response.raise_for_status() + data = response.json() + allowed: bool = bool(data.get("result", False)) + logger.debug("OPA response: allowed=%s data=%s", allowed, data) + return allowed + + except (httpx.ConnectError, httpx.TimeoutException, httpx.NetworkError) as exc: + if self._fail_open: + logger.warning( + "OPA unreachable at %s — fail_open=True, allowing tool '%s': %s", + self._opa_url, tool_name, exc, + ) + return True + raise OPAConnectionError(self._opa_url, cause=exc) from exc + + except httpx.HTTPStatusError as exc: + logger.error("OPA returned HTTP error: %s", exc) + if self._fail_open: + return True + raise OPAConnectionError(self._opa_url, cause=exc) from exc + + async def run_json( + self, + args: Mapping[str, Any], + cancellation_token: CancellationToken, + call_id: str | None = None, + ) -> Any: + """Intercept the tool call, evaluate against OPA, then delegate to the inner tool. + + This is the single dispatch point for *all* tool calls, including + agent-to-agent handoff tools (``transfer_to_``). + """ + allowed = await self._query_opa(self.name, args) + + if not allowed: + logger.info("OPA denied tool call: tool=%s args=%s", self.name, args) + raise OPAAuthorizationError( + tool_name=self.name, + reason="OPA policy evaluation returned false", + ) + + logger.debug("OPA allowed tool call: tool=%s", self.name) + return await self._inner.run_json(args, cancellation_token, call_id=call_id) + + async def run(self, args: Any, cancellation_token: CancellationToken) -> Any: + """Satisfy the abstract method requirement — delegation handled by run_json.""" + return await self._inner.run(args, cancellation_token) + + +def opa_authorize_tools( + tools: Sequence[BaseTool[Any, Any]], + *, + opa_url: str = "http://localhost:8181", + policy_path: str = "v1/data/autogen/tools/allow", + context: dict[str, Any] | None = None, + fail_open: bool = False, + timeout: float = _DEFAULT_OPA_TIMEOUT, + http_client: httpx.AsyncClient | None = None, +) -> list[OPAAuthorizedTool]: + """Wrap a list of tools with OPA authorization. + + This is the recommended entry point for most users:: + + from autogen_ext.tools.opa import opa_authorize_tools + + agent = AssistantAgent( + name="Planner", + tools=opa_authorize_tools( + [search_tool, calculator_tool], + opa_url="http://opa.internal:8181", + context={"user": "bob", "role": "analyst"}, + ), + ) + + Args: + tools: Any sequence of :class:`~autogen_core.tools.BaseTool` instances, + including handoff tools. + opa_url: OPA server base URL. + policy_path: OPA REST API policy path. + context: Key/value pairs forwarded as ``input.context`` in every OPA query. + fail_open: Allow tool calls when OPA is unreachable (default: False = deny). + timeout: HTTP timeout in seconds for OPA requests. + http_client: Optional shared :class:`httpx.AsyncClient` (useful for testing). + + Returns: + A list of :class:`OPAAuthorizedTool` instances, one per input tool. + """ + return [ + OPAAuthorizedTool( + inner_tool=tool, + opa_url=opa_url, + policy_path=policy_path, + context=context, + fail_open=fail_open, + timeout=timeout, + http_client=http_client, + ) + for tool in tools + ] diff --git a/python/packages/autogen-ext/src/autogen_ext/tools/opa/policies/autogen_tools.rego b/python/packages/autogen-ext/src/autogen_ext/tools/opa/policies/autogen_tools.rego new file mode 100644 index 000000000000..981d7e9b3a1a --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/tools/opa/policies/autogen_tools.rego @@ -0,0 +1,83 @@ +package autogen.tools + +import future.keywords.if +import future.keywords.in + +# --------------------------------------------------------------------------- +# Default: deny everything unless explicitly allowed. +# --------------------------------------------------------------------------- +default allow := false +default reason := "no matching allow rule" + +# --------------------------------------------------------------------------- +# Read-only tools — allowed for any authenticated user +# --------------------------------------------------------------------------- +read_only_tools := { + "web_search", + "read_file", + "list_directory", + "get_weather", + "fetch_url", + "calculator", +} + +allow if { + input.tool in read_only_tools + input.context.user != "" +} + +reason := "read-only tool allowed for authenticated user" if { + input.tool in read_only_tools + input.context.user != "" +} + +# --------------------------------------------------------------------------- +# Destructive / privileged tools — admin role only +# --------------------------------------------------------------------------- +destructive_tools := { + "delete_file", + "execute_code", + "write_file", + "run_shell", +} + +allow if { + input.tool in destructive_tools + input.context.role == "admin" +} + +reason := "destructive tool allowed for admin" if { + input.tool in destructive_tools + input.context.role == "admin" +} + +# --------------------------------------------------------------------------- +# Argument-level constraint: delete_file restricted to /tmp/ +# --------------------------------------------------------------------------- +allow if { + input.tool == "delete_file" + input.context.role == "admin" + startswith(input.args.path, "/tmp/") +} + +# --------------------------------------------------------------------------- +# Agent handoff tools — whitelist of permitted target agents +# --------------------------------------------------------------------------- +allowed_handoff_targets := { + "CoderAgent", + "ReviewerAgent", + "PlannerAgent", + "SummaryAgent", +} + +allow if { + startswith(input.tool, "transfer_to_") + target := substring(input.tool, count("transfer_to_"), -1) + target in allowed_handoff_targets +} + +reason := "handoff allowed to whitelisted agent" if { + startswith(input.tool, "transfer_to_") + target := substring(input.tool, count("transfer_to_"), -1) + target in allowed_handoff_targets +} diff --git a/python/packages/autogen-ext/tests/tools/opa/__init__.py b/python/packages/autogen-ext/tests/tools/opa/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/python/packages/autogen-ext/tests/tools/opa/test_opa_tool.py b/python/packages/autogen-ext/tests/tools/opa/test_opa_tool.py new file mode 100644 index 000000000000..5e968d586677 --- /dev/null +++ b/python/packages/autogen-ext/tests/tools/opa/test_opa_tool.py @@ -0,0 +1,175 @@ +"""Unit tests for OPAAuthorizedTool — no real OPA server required.""" + +from __future__ import annotations + +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import httpx +import pytest + +from autogen_core import CancellationToken +from autogen_core.tools import BaseTool + +from autogen_ext.tools.opa import OPAAuthorizedTool, opa_authorize_tools +from autogen_ext.tools.opa._exceptions import OPAAuthorizationError, OPAConnectionError + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class _EchoTool(BaseTool[Any, Any]): + """Minimal BaseTool that echoes its args.""" + + def __init__(self, name: str = "echo_tool") -> None: + super().__init__( + args_type=dict, + return_type=dict, + name=name, + description="Echo args back", + ) + + async def run(self, args: Any, cancellation_token: CancellationToken) -> Any: + return {"echoed": dict(args)} + + +def _opa_response(allowed: bool) -> MagicMock: + resp = MagicMock(spec=httpx.Response) + resp.status_code = 200 + resp.json.return_value = {"result": allowed} + resp.raise_for_status = MagicMock() + return resp + + +def _make_tool( + inner: BaseTool | None = None, + *, + allowed: bool = True, + fail_open: bool = False, + connect_error: bool = False, + name: str = "echo_tool", +) -> tuple[OPAAuthorizedTool, AsyncMock]: + inner = inner or _EchoTool(name=name) + mock_client = AsyncMock(spec=httpx.AsyncClient) + if connect_error: + mock_client.post.side_effect = httpx.ConnectError("refused") + else: + mock_client.post.return_value = _opa_response(allowed) + tool = OPAAuthorizedTool( + inner_tool=inner, + opa_url="http://opa-test:8181", + context={"user": "alice", "role": "analyst"}, + fail_open=fail_open, + http_client=mock_client, + ) + return tool, mock_client + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_allowed_executes_inner_tool() -> None: + tool, mock_client = _make_tool(allowed=True) + result = await tool.run_json({"x": 1}, CancellationToken()) + assert result == {"echoed": {"x": 1}} + mock_client.post.assert_called_once() + + +@pytest.mark.asyncio +async def test_denied_raises_authorization_error() -> None: + inner = _EchoTool() + inner.run = AsyncMock() # must NOT be called + tool, _ = _make_tool(inner=inner, allowed=False) + with pytest.raises(OPAAuthorizationError) as exc_info: + await tool.run_json({"x": 1}, CancellationToken()) + assert exc_info.value.tool_name == "echo_tool" + inner.run.assert_not_called() + + +@pytest.mark.asyncio +async def test_fail_open_true_allows_when_opa_down() -> None: + tool, _ = _make_tool(fail_open=True, connect_error=True) + result = await tool.run_json({"x": 99}, CancellationToken()) + assert result == {"echoed": {"x": 99}} + + +@pytest.mark.asyncio +async def test_fail_open_false_raises_when_opa_down() -> None: + tool, _ = _make_tool(fail_open=False, connect_error=True) + with pytest.raises(OPAConnectionError) as exc_info: + await tool.run_json({"x": 1}, CancellationToken()) + assert "opa-test" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_opa_authorize_tools_wraps_list() -> None: + tools = [_EchoTool("a"), _EchoTool("b"), _EchoTool("c")] + authorized = opa_authorize_tools(tools, opa_url="http://opa:8181") + assert len(authorized) == 3 + assert all(isinstance(t, OPAAuthorizedTool) for t in authorized) + assert [t.name for t in authorized] == ["a", "b", "c"] + assert authorized[0]._inner is tools[0] + + +@pytest.mark.asyncio +async def test_handoff_tool_intercepted() -> None: + handoff = _EchoTool(name="transfer_to_CoderAgent") + tool, mock_client = _make_tool(inner=handoff, allowed=True, name="transfer_to_CoderAgent") + await tool.run_json({}, CancellationToken()) + payload = mock_client.post.call_args.kwargs["json"] + assert payload["input"]["tool"] == "transfer_to_CoderAgent" + + +@pytest.mark.asyncio +async def test_handoff_denied_by_opa() -> None: + handoff = _EchoTool(name="transfer_to_EvilAgent") + tool, _ = _make_tool(inner=handoff, allowed=False, name="transfer_to_EvilAgent") + with pytest.raises(OPAAuthorizationError) as exc_info: + await tool.run_json({}, CancellationToken()) + assert exc_info.value.tool_name == "transfer_to_EvilAgent" + + +@pytest.mark.asyncio +async def test_opa_payload_structure() -> None: + tool, mock_client = _make_tool(allowed=True, name="read_file") + await tool.run_json({"path": "/tmp/report.txt"}, CancellationToken()) + payload = mock_client.post.call_args.kwargs["json"] + assert payload == { + "input": { + "tool": "read_file", + "args": {"path": "/tmp/report.txt"}, + "context": {"user": "alice", "role": "analyst"}, + } + } + + +@pytest.mark.asyncio +async def test_call_id_forwarded() -> None: + inner = _EchoTool() + inner.run_json = AsyncMock(return_value={"echoed": {}}) + tool, mock_client = _make_tool(inner=inner, allowed=True) + await tool.run_json({}, CancellationToken(), call_id="call-abc123") + inner.run_json.assert_called_once() + _, _, kwargs = inner.run_json.call_args[0], inner.run_json.call_args[1], inner.run_json.call_args[1] + assert inner.run_json.call_args[1].get("call_id") == "call-abc123" or inner.run_json.call_args[0][2] == "call-abc123" + + +def test_exception_messages() -> None: + err = OPAAuthorizationError("delete_file", reason="policy denied") + assert "delete_file" in str(err) + assert "policy denied" in str(err) + + conn_err = OPAConnectionError("http://opa:8181") + assert "http://opa:8181" in str(conn_err) + + +def test_tool_name_preserved() -> None: + inner = _EchoTool(name="my_special_tool") + tool, _ = _make_tool(inner=inner, name="my_special_tool") + assert tool.name == "my_special_tool" + assert tool.description == inner.description