diff --git a/README.md b/README.md index 9613305..7e0cb60 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,24 @@ print(g.to_json()) agentflow run pipeline.py --output summary ``` +## Codex Goal Mode + +Codex nodes can run through Codex's native Goal mode, the same thread goal +state managed by `/goal`: + +```python +codex( + task_id="repair", + prompt="Repair the failing API tests. Run pytest after each change.", + tools="read_write", + goal=True, +) +``` + +`goal=True` uses the rendered prompt as the objective. You can also pass +`goal="Repair the API test failures"` to set a shorter objective while the node +prompt becomes the first turn in that goal thread. + Or just ask Codex (the agentflow skill is auto-installed): ```bash diff --git a/agentflow/agents/codex.py b/agentflow/agents/codex.py index 37ad6a9..cd25045 100644 --- a/agentflow/agents/codex.py +++ b/agentflow/agents/codex.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json from pathlib import Path from agentflow.agents.base import AgentAdapter @@ -8,6 +9,98 @@ from agentflow.specs import NodeSpec, ProviderConfig, RepoInstructionsMode, ToolAccess +_CODEX_GOAL_BOOTSTRAP_SCRIPT = r''' +import json +import os +import select +import subprocess +import sys +import time + + +def fail(message): + print(message, file=sys.stderr, flush=True) + raise SystemExit(1) + + +payload = json.load(sys.stdin) +executable = payload["executable"] +server = subprocess.Popen( + [executable, "app-server", "--listen", "stdio://", "--enable", "goals"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + env=os.environ.copy(), +) +next_id = 0 + + +def send(method, params): + global next_id + next_id += 1 + request = {"jsonrpc": "2.0", "id": next_id, "method": method, "params": params} + assert server.stdin is not None + server.stdin.write(json.dumps(request) + "\n") + server.stdin.flush() + return next_id + + +def read_response(request_id, timeout=30): + assert server.stdout is not None + assert server.stderr is not None + deadline = time.time() + timeout + while time.time() < deadline: + if server.poll() is not None: + fail(f"codex app-server exited before response {request_id}") + ready, _, _ = select.select([server.stdout, server.stderr], [], [], 0.2) + for stream in ready: + line = stream.readline() + if not line: + continue + if stream is server.stderr: + print(line.rstrip("\n"), file=sys.stderr, flush=True) + continue + try: + message = json.loads(line) + except json.JSONDecodeError: + continue + if message.get("id") != request_id: + continue + if "error" in message: + fail(f"codex app-server error for {request_id}: {message['error']}") + return message.get("result") + fail(f"timed out waiting for codex app-server response {request_id}") + + +def stop_server(): + if server.poll() is not None: + return + server.terminate() + try: + server.wait(timeout=3) + except subprocess.TimeoutExpired: + server.kill() + server.wait(timeout=3) + + +try: + read_response(send("initialize", payload["initialize"])) + thread_result = read_response(send("thread/start", payload["thread_start"])) + thread_id = thread_result["thread"]["id"] + read_response(send("thread/goal/set", { + "threadId": thread_id, + "objective": payload["objective"], + "status": "active", + })) +finally: + stop_server() + +resume_args = list(payload["resume_args"]) + [thread_id, payload["prompt"]] +os.execvp(resume_args[0], resume_args) +''' + + class CodexAdapter(AgentAdapter): _SUPPORTED_SANDBOX_MODES = {"read-only", "workspace-write", "danger-full-access"} @@ -111,6 +204,95 @@ def _maybe_prepend_wrapper(self, node: NodeSpec, prompt: str) -> str: return prompt return wrapper_text + self._WRAPPER_SEPARATOR + prompt + def _goal_payload( + self, + node: NodeSpec, + prompt: str, + *, + executable: str, + provider: ProviderConfig | None, + repo_instructions_ignored: bool, + sandbox: str, + target_workdir: str, + workspace_root: str | None, + ) -> dict[str, object] | None: + goal = getattr(node, "goal", False) + if not goal: + return None + + if isinstance(goal, str): + objective = goal.strip() + else: + objective = prompt.strip() + + resume_args = [ + executable, + "exec", + "resume", + "--json", + "--skip-git-repo-check", + "-c", + 'approval_policy="never"', + "-c", + "suppress_unstable_features_warning=true", + "--enable", + "goals", + "-c", + f"sandbox_mode={self._format_toml_value(sandbox)}", + ] + if node.model: + resume_args.extend(["--model", node.model]) + if provider: + resume_args.extend(["-c", f"model_provider={self._format_toml_value(provider.name)}"]) + if repo_instructions_ignored: + resume_args.extend(["--disable", "plugins"]) + if workspace_root: + resume_args.extend( + [ + "-c", + "sandbox_workspace_write.writable_roots=" + + self._format_toml_value([workspace_root]), + ] + ) + resume_args.extend(node.extra_args) + features = {"goals": True} + if repo_instructions_ignored: + features["plugins"] = False + config: dict[str, object] = { + "features": features, + "suppress_unstable_features_warning": True, + "sandbox_mode": sandbox, + } + if provider: + config["model_provider"] = provider.name + if repo_instructions_ignored and workspace_root: + config["sandbox_workspace_write"] = {"writable_roots": [workspace_root]} + + thread_start: dict[str, object] = { + "cwd": target_workdir, + "approvalPolicy": "never", + "sandbox": sandbox, + "config": config, + "threadSource": "user", + "sessionStartSource": "startup", + } + if node.model: + thread_start["model"] = node.model + if provider: + thread_start["modelProvider"] = provider.name + + return { + "executable": executable, + "initialize": { + "clientInfo": {"name": "agentflow", "version": "0"}, + "capabilities": None, + }, + "thread_start": thread_start, + "objective": objective, + "prompt": prompt.strip() or objective, + "resume_args": resume_args, + } + def prepare(self, node: NodeSpec, prompt: str, paths: ExecutionPaths) -> PreparedExecution: provider = self.provider_config(node.provider, node.agent) executable = node.executable or "codex" @@ -129,6 +311,8 @@ def prepare(self, node: NodeSpec, prompt: str, paths: ExecutionPaths) -> Prepare "--sandbox", sandbox, ] + if node.goal: + command.extend(["--enable", "goals"]) if node.model and not provider: command.extend(["--model", node.model]) if provider: @@ -138,7 +322,25 @@ def prepare(self, node: NodeSpec, prompt: str, paths: ExecutionPaths) -> Prepare command.extend(["--add-dir", paths.target_workdir]) command.extend(node.extra_args) prompt = self._maybe_prepend_wrapper(node, prompt) - command.append(prompt) + cwd = paths.target_workdir + if repo_instructions_ignored: + cwd = str(Path(paths.target_runtime_dir)) + goal_payload = self._goal_payload( + node, + prompt, + executable=executable, + provider=provider, + repo_instructions_ignored=repo_instructions_ignored, + sandbox=sandbox, + target_workdir=cwd, + workspace_root=paths.target_workdir if repo_instructions_ignored else None, + ) + stdin = None + if goal_payload is not None: + command = ["python3", "-c", _CODEX_GOAL_BOOTSTRAP_SCRIPT] + stdin = json.dumps(goal_payload) + else: + command.append(prompt) runtime_files: dict[str, str] = {} runtime_symlinks: dict[str, str] = {} @@ -163,9 +365,6 @@ def prepare(self, node: NodeSpec, prompt: str, paths: ExecutionPaths) -> Prepare runtime_symlinks[self.relative_runtime_file("codex_home", "auth.json")] = str(host_auth) env["CODEX_HOME"] = codex_home env["HOME"] = codex_home - cwd = paths.target_workdir - if repo_instructions_ignored: - cwd = str(Path(paths.target_runtime_dir)) return PreparedExecution( command=command, env=env, @@ -173,4 +372,5 @@ def prepare(self, node: NodeSpec, prompt: str, paths: ExecutionPaths) -> Prepare trace_kind="codex", runtime_files=runtime_files, runtime_symlinks=runtime_symlinks, + stdin=stdin, ) diff --git a/agentflow/orchestrator.py b/agentflow/orchestrator.py index 0a1efb5..53a4f7d 100644 --- a/agentflow/orchestrator.py +++ b/agentflow/orchestrator.py @@ -939,6 +939,7 @@ async def _execute_node( on_failure_restart=execution_node.on_failure_restart, fanout_dependencies=getattr(execution_node, 'fanout_dependencies', {}), executable=execution_node.executable, + goal=execution_node.goal, description=execution_node.description, repo_instructions_mode=execution_node.repo_instructions_mode, ) diff --git a/agentflow/specs.py b/agentflow/specs.py index a2537ca..4932694 100644 --- a/agentflow/specs.py +++ b/agentflow/specs.py @@ -18,7 +18,7 @@ class StrEnum(str, Enum): from pathlib import Path from typing import Annotated, Any, Literal -from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator +from pydantic import BaseModel, ConfigDict, Field, StrictBool, field_validator, model_validator from agentflow.local_shell import ( invalid_bash_long_option_error, @@ -792,6 +792,7 @@ class NodeSpec(BaseModel): env: dict[str, str] = Field(default_factory=dict) executable: str | None = None extra_args: list[str] = Field(default_factory=list) + goal: StrictBool | str = False description: str | None = None success_criteria: list[SuccessCriterion] = Field(default_factory=list) retries: int = Field(default=0, ge=0) @@ -824,6 +825,12 @@ def ensure_unique_dependencies(self) -> "NodeSpec": raise ValueError("scheduled nodes cannot also use `fanout`") if self.target.kind != "local": raise ValueError("scheduled nodes currently require a local target") + has_goal = bool(self.goal) + if isinstance(self.goal, str) and not self.goal.strip(): + raise ValueError("`goal` must not be empty") + if has_goal: + if builtin_agent_kind(self.agent) != AgentKind.CODEX: + raise ValueError("`goal` is only supported for codex nodes") resolve_provider(self.provider, self.agent) return self diff --git a/docs/pipelines.md b/docs/pipelines.md index 95c011e..bfcdf42 100644 --- a/docs/pipelines.md +++ b/docs/pipelines.md @@ -49,6 +49,7 @@ Each node supports: - `repo_instructions_mode`: `inherit` (default) or `ignore` for agent CLIs that should not absorb repo-local instruction files such as `AGENTS.md`, `CLAUDE.md`, or project skills - `mcps`: a list of MCP server definitions - `skills`: a list of local skill paths or names +- `goal`: `true` or a string for Codex nodes that should run through Codex's native Goal mode - `target`: `local`, `container`, `ssh`, `ec2`, or `ecs` - local target fields: `cwd`, `bootstrap`, `shell`, `shell_login`, `shell_interactive`, and `shell_init` - `capture`: `final` or `trace` diff --git a/tests/test_agents.py b/tests/test_agents.py index d2368b2..e3c179a 100644 --- a/tests/test_agents.py +++ b/tests/test_agents.py @@ -1,14 +1,16 @@ from __future__ import annotations import json +import os from pathlib import Path +import subprocess from agentflow.agents.claude import ClaudeAdapter from agentflow.agents.codex import CodexAdapter from agentflow.agents.kimi import KimiAdapter from agentflow.agents.pi import PiAdapter from agentflow.prepared import ExecutionPaths -from agentflow.specs import NodeSpec +from agentflow.specs import NodeSpec, RepoInstructionsMode import pytest @@ -85,6 +87,180 @@ def test_codex_adapter_suppresses_unstable_feature_warning(tmp_path): assert 'suppress_unstable_features_warning=true' in prepared.command +def test_codex_adapter_runs_prompt_through_native_goal_bootstrap(tmp_path): + node = NodeSpec.model_validate( + { + "id": "implement", + "agent": "codex", + "prompt": "Finish the migration and keep tests green.", + "goal": True, + } + ) + + prepared = CodexAdapter().prepare(node, "Finish the migration and keep tests green.", _paths(tmp_path)) + + assert prepared.command[:2] == ["python3", "-c"] + assert prepared.stdin is not None + payload = json.loads(prepared.stdin) + assert payload["objective"] == "Finish the migration and keep tests green." + assert payload["prompt"] == "Finish the migration and keep tests green." + assert payload["resume_args"][:5] == ["codex", "exec", "resume", "--json", "--skip-git-repo-check"] + assert "--enable" in payload["resume_args"] + assert payload["resume_args"][payload["resume_args"].index("--enable") + 1] == "goals" + assert "-c" in payload["resume_args"] + assert 'sandbox_mode="read-only"' in payload["resume_args"] + + +def test_codex_adapter_combines_explicit_goal_with_prompt_context(tmp_path): + node = NodeSpec.model_validate( + { + "id": "repair", + "agent": "codex", + "prompt": "Use pytest after each change.", + "goal": "Repair the failing API tests.", + } + ) + + prepared = CodexAdapter().prepare(node, "Use pytest after each change.", _paths(tmp_path)) + + assert prepared.stdin is not None + payload = json.loads(prepared.stdin) + assert payload["objective"] == "Repair the failing API tests." + assert payload["prompt"] == "Use pytest after each change." + + +def test_codex_goal_resume_preserves_custom_provider_and_model(tmp_path): + node = NodeSpec.model_validate( + { + "id": "repair", + "agent": "codex", + "prompt": "Use the custom provider.", + "goal": True, + "model": "gpt-custom", + "provider": { + "name": "openai-pinned", + "base_url": "http://example.test/v1", + "api_key_env": "OPENAI_API_KEY", + "wire_api": "responses", + }, + } + ) + + prepared = CodexAdapter().prepare(node, "Use the custom provider.", _paths(tmp_path)) + + assert prepared.stdin is not None + payload = json.loads(prepared.stdin) + assert payload["thread_start"]["model"] == "gpt-custom" + assert payload["thread_start"]["modelProvider"] == "openai-pinned" + assert payload["thread_start"]["config"]["model_provider"] == "openai-pinned" + assert "--model" in payload["resume_args"] + assert payload["resume_args"][payload["resume_args"].index("--model") + 1] == "gpt-custom" + assert 'model_provider="openai-pinned"' in payload["resume_args"] + + +def test_codex_goal_resume_preserves_ignored_instruction_workspace_root(tmp_path): + paths = _paths(tmp_path) + node = NodeSpec.model_validate( + { + "id": "repair", + "agent": "codex", + "prompt": "Work on the repo without loading repo instructions.", + "goal": True, + "repo_instructions_mode": RepoInstructionsMode.IGNORE, + "tools": "read_write", + } + ) + + prepared = CodexAdapter().prepare(node, "Work on the repo without loading repo instructions.", paths) + + assert prepared.cwd == str(tmp_path / ".runtime") + assert prepared.stdin is not None + payload = json.loads(prepared.stdin) + expected_roots = [str(tmp_path)] + assert payload["thread_start"]["cwd"] == str(tmp_path / ".runtime") + assert payload["thread_start"]["config"]["features"]["plugins"] is False + assert payload["thread_start"]["config"]["sandbox_workspace_write"]["writable_roots"] == expected_roots + assert "--disable" in payload["resume_args"] + assert payload["resume_args"][payload["resume_args"].index("--disable") + 1] == "plugins" + assert "sandbox_workspace_write.writable_roots=" + json.dumps(expected_roots) in payload["resume_args"] + + +def test_codex_goal_bootstrap_sets_native_thread_goal_then_resumes(tmp_path): + log_path = tmp_path / "codex-log.jsonl" + fake_codex = tmp_path / "codex" + fake_codex.write_text( + r'''#!/usr/bin/env python3 +import json +import os +import sys + +log_path = os.environ["FAKE_CODEX_LOG"] + +def log(payload): + with open(log_path, "a", encoding="utf-8") as handle: + handle.write(json.dumps(payload) + "\n") + +args = sys.argv[1:] +if args[:2] == ["app-server", "--listen"]: + for line in sys.stdin: + request = json.loads(line) + log({"kind": "app-server", "method": request["method"], "params": request.get("params")}) + if request["method"] == "initialize": + response = {"id": request["id"], "result": {"userAgent": "fake", "codexHome": "/tmp/fake", "platformFamily": "unix", "platformOs": "linux"}} + elif request["method"] == "thread/start": + response = {"id": request["id"], "result": {"thread": {"id": "thread-123"}}} + elif request["method"] == "thread/goal/set": + response = {"id": request["id"], "result": {"goal": {"threadId": "thread-123", "objective": request["params"]["objective"], "status": "active"}}} + else: + response = {"id": request["id"], "error": {"message": "unexpected"}} + print(json.dumps(response), flush=True) + raise SystemExit(0) + +if args[:2] == ["exec", "resume"]: + log({"kind": "resume", "args": args}) + prompt = args[-1] + print(json.dumps({"type": "response.output_item.done", "item": {"type": "message", "role": "assistant", "content": [{"type": "output_text", "text": prompt}]}})) + raise SystemExit(0) + +raise SystemExit(2) +''', + encoding="utf-8", + ) + fake_codex.chmod(0o755) + node = NodeSpec.model_validate( + { + "id": "repair", + "agent": "codex", + "prompt": "Use pytest after each change.", + "goal": "Repair the failing API tests.", + "executable": str(fake_codex), + } + ) + prepared = CodexAdapter().prepare(node, "Use pytest after each change.", _paths(tmp_path)) + + result = subprocess.run( + prepared.command, + input=prepared.stdin, + text=True, + capture_output=True, + env={**os.environ, **prepared.env, "FAKE_CODEX_LOG": str(log_path)}, + cwd=prepared.cwd, + timeout=10, + ) + + assert result.returncode == 0, result.stderr + log_entries = [json.loads(line) for line in log_path.read_text(encoding="utf-8").splitlines()] + assert [entry["method"] for entry in log_entries if entry["kind"] == "app-server"] == [ + "initialize", + "thread/start", + "thread/goal/set", + ] + goal_set = [entry for entry in log_entries if entry.get("method") == "thread/goal/set"][0] + assert goal_set["params"]["objective"] == "Repair the failing API tests." + resume = [entry for entry in log_entries if entry["kind"] == "resume"][0] + assert resume["args"][-2:] == ["thread-123", "Use pytest after each change."] + + def test_codex_adapter_allows_env_override_for_sandbox_mode(tmp_path): node = NodeSpec.model_validate( { diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index abdaaa7..38deba0 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -8,6 +8,7 @@ import pytest +from agentflow import Graph, codex from agentflow.agents.base import AgentAdapter from agentflow.agents.registry import AdapterRegistry from agentflow.orchestrator import Orchestrator @@ -240,6 +241,56 @@ async def test_orchestrator_runs_parallel_and_templates_outputs(tmp_path: Path): assert elapsed < 1.0 +@pytest.mark.asyncio +async def test_graph_codex_goal_mode_answers_from_goal_thread(tmp_path: Path): + marker = tmp_path / "goal-set" + fake_codex = tmp_path / "codex" + fake_codex.write_text( + r'''#!/usr/bin/env bash + +case "${1:-}:${2:-}" in +app-server:--listen) + read -r _ + echo '{"id":1,"result":{}}' + read -r _ + echo '{"id":2,"result":{"thread":{"id":"thread-123"}}}' + read -r _ + touch "$FAKE_CODEX_GOAL_MARKER" + echo '{"id":3,"result":{}}' + ;; +exec:resume) + if [[ -f "$FAKE_CODEX_GOAL_MARKER" && "${@: -1}" == "am i in /goal ? yes or no" ]]; then + echo '{"type":"response.output_item.done","item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"yes"}]}}' + else + echo '{"type":"response.output_item.done","item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"no"}]}}' + fi + ;; +*) + exit 2 + ;; +esac +''', + encoding="utf-8", + ) + fake_codex.chmod(0o755) + orchestrator = Orchestrator(store=RunStore(tmp_path / "runs"), adapters=AdapterRegistry(), runners=RunnerRegistry()) + + with Graph("goal-mode", working_dir=str(tmp_path)) as graph: + codex( + task_id="ask", + prompt="am i in /goal ? yes or no", + goal=True, + executable=str(fake_codex), + env={"FAKE_CODEX_GOAL_MARKER": str(marker)}, + ) + + run = await orchestrator.submit(graph.to_spec()) + completed = await orchestrator.wait(run.id, timeout=5) + + assert completed.status.value == "completed" + assert "yes" in (completed.nodes["ask"].output or "").lower() + + @pytest.mark.asyncio async def test_orchestrator_renders_fanout_group_context_in_merge_prompt(tmp_path: Path): orchestrator = make_orchestrator(tmp_path) diff --git a/tests/test_store_and_validation.py b/tests/test_store_and_validation.py index a097cb8..3351c9d 100644 --- a/tests/test_store_and_validation.py +++ b/tests/test_store_and_validation.py @@ -141,6 +141,32 @@ def test_pipeline_validation_rejects_codex_kimi_provider_alias(): ) +def test_pipeline_validation_rejects_goal_on_non_codex_nodes(): + with pytest.raises(ValueError, match="`goal` is only supported for codex nodes"): + PipelineSpec.model_validate( + { + "name": "invalid-goal-agent", + "working_dir": ".", + "nodes": [ + {"id": "review", "agent": "claude", "prompt": "review", "goal": True}, + ], + } + ) + + +def test_pipeline_validation_rejects_empty_goal(): + with pytest.raises(ValueError, match="`goal` must not be empty"): + PipelineSpec.model_validate( + { + "name": "invalid-empty-goal", + "working_dir": ".", + "nodes": [ + {"id": "plan", "agent": "codex", "prompt": "plan", "goal": " "}, + ], + } + ) + + @pytest.mark.parametrize( ("target_patch", "expected_field"), [