Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@ print(g.to_json())
agentflow run pipeline.py --output summary
```

## Codex Goal Mode

Codex nodes can run through Codex's native Goal mode, which uses the same
thread goal state that `/goal` manages:

```python
codex(
task_id="repair",
prompt="Repair the failing API tests. Run pytest after each change.",
tools="read_write",
goal=True,
)
```

`goal=True` uses the rendered prompt as the objective. You can also pass
`goal="Repair the API test failures"` to set a shorter objective; the node
prompt then becomes the first turn in that goal thread.

Or just ask Codex (the agentflow skill is auto-installed):

```bash
Expand Down
208 changes: 204 additions & 4 deletions agentflow/agents/codex.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
from pathlib import Path

from agentflow.agents.base import AgentAdapter
Expand All @@ -8,6 +9,98 @@
from agentflow.specs import NodeSpec, ProviderConfig, RepoInstructionsMode, ToolAccess


_CODEX_GOAL_BOOTSTRAP_SCRIPT = r'''
import json
import os
import select
import subprocess
import sys
import time


def fail(message):
print(message, file=sys.stderr, flush=True)
raise SystemExit(1)


payload = json.load(sys.stdin)
executable = payload["executable"]
server = subprocess.Popen(
[executable, "app-server", "--listen", "stdio://", "--enable", "goals"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=os.environ.copy(),
)
next_id = 0


def send(method, params):
global next_id
next_id += 1
request = {"jsonrpc": "2.0", "id": next_id, "method": method, "params": params}
assert server.stdin is not None
server.stdin.write(json.dumps(request) + "\n")
server.stdin.flush()
return next_id


def read_response(request_id, timeout=30):
assert server.stdout is not None
assert server.stderr is not None
deadline = time.time() + timeout
while time.time() < deadline:
if server.poll() is not None:
fail(f"codex app-server exited before response {request_id}")
ready, _, _ = select.select([server.stdout, server.stderr], [], [], 0.2)
for stream in ready:
line = stream.readline()
if not line:
continue
if stream is server.stderr:
print(line.rstrip("\n"), file=sys.stderr, flush=True)
continue
try:
message = json.loads(line)
except json.JSONDecodeError:
continue
if message.get("id") != request_id:
continue
if "error" in message:
fail(f"codex app-server error for {request_id}: {message['error']}")
return message.get("result")
fail(f"timed out waiting for codex app-server response {request_id}")


def stop_server():
if server.poll() is not None:
return
server.terminate()
try:
server.wait(timeout=3)
except subprocess.TimeoutExpired:
server.kill()
server.wait(timeout=3)


try:
read_response(send("initialize", payload["initialize"]))
thread_result = read_response(send("thread/start", payload["thread_start"]))
thread_id = thread_result["thread"]["id"]
read_response(send("thread/goal/set", {
"threadId": thread_id,
"objective": payload["objective"],
"status": "active",
}))
finally:
stop_server()

resume_args = list(payload["resume_args"]) + [thread_id, payload["prompt"]]
os.execvp(resume_args[0], resume_args)
'''


class CodexAdapter(AgentAdapter):
_SUPPORTED_SANDBOX_MODES = {"read-only", "workspace-write", "danger-full-access"}

Expand Down Expand Up @@ -111,6 +204,95 @@ def _maybe_prepend_wrapper(self, node: NodeSpec, prompt: str) -> str:
return prompt
return wrapper_text + self._WRAPPER_SEPARATOR + prompt

def _goal_payload(
self,
node: NodeSpec,
prompt: str,
*,
executable: str,
provider: ProviderConfig | None,
repo_instructions_ignored: bool,
sandbox: str,
target_workdir: str,
workspace_root: str | None,
) -> dict[str, object] | None:
goal = getattr(node, "goal", False)
if not goal:
return None

if isinstance(goal, str):
objective = goal.strip()
else:
objective = prompt.strip()

resume_args = [
executable,
"exec",
"resume",
"--json",
"--skip-git-repo-check",
"-c",
'approval_policy="never"',
"-c",
"suppress_unstable_features_warning=true",
"--enable",
"goals",
"-c",
f"sandbox_mode={self._format_toml_value(sandbox)}",
]
if node.model:
resume_args.extend(["--model", node.model])
if provider:
resume_args.extend(["-c", f"model_provider={self._format_toml_value(provider.name)}"])
if repo_instructions_ignored:
resume_args.extend(["--disable", "plugins"])
if workspace_root:
resume_args.extend(
[
"-c",
"sandbox_workspace_write.writable_roots="
+ self._format_toml_value([workspace_root]),
]
)
resume_args.extend(node.extra_args)
features = {"goals": True}
if repo_instructions_ignored:
features["plugins"] = False
config: dict[str, object] = {
"features": features,
"suppress_unstable_features_warning": True,
"sandbox_mode": sandbox,
}
if provider:
config["model_provider"] = provider.name
if repo_instructions_ignored and workspace_root:
config["sandbox_workspace_write"] = {"writable_roots": [workspace_root]}

thread_start: dict[str, object] = {
"cwd": target_workdir,
"approvalPolicy": "never",
"sandbox": sandbox,
"config": config,
"threadSource": "user",
"sessionStartSource": "startup",
}
if node.model:
thread_start["model"] = node.model
if provider:
thread_start["modelProvider"] = provider.name

return {
"executable": executable,
"initialize": {
"clientInfo": {"name": "agentflow", "version": "0"},
"capabilities": None,
},
"thread_start": thread_start,
"objective": objective,
"prompt": prompt.strip() or objective,
"resume_args": resume_args,
}

def prepare(self, node: NodeSpec, prompt: str, paths: ExecutionPaths) -> PreparedExecution:
provider = self.provider_config(node.provider, node.agent)
executable = node.executable or "codex"
Expand All @@ -129,6 +311,8 @@ def prepare(self, node: NodeSpec, prompt: str, paths: ExecutionPaths) -> Prepare
"--sandbox",
sandbox,
]
if node.goal:
command.extend(["--enable", "goals"])
if node.model and not provider:
command.extend(["--model", node.model])
if provider:
Expand All @@ -138,7 +322,25 @@ def prepare(self, node: NodeSpec, prompt: str, paths: ExecutionPaths) -> Prepare
command.extend(["--add-dir", paths.target_workdir])
command.extend(node.extra_args)
prompt = self._maybe_prepend_wrapper(node, prompt)
command.append(prompt)
cwd = paths.target_workdir
if repo_instructions_ignored:
cwd = str(Path(paths.target_runtime_dir))
goal_payload = self._goal_payload(
node,
prompt,
executable=executable,
provider=provider,
repo_instructions_ignored=repo_instructions_ignored,
sandbox=sandbox,
target_workdir=cwd,
workspace_root=paths.target_workdir if repo_instructions_ignored else None,
)
stdin = None
if goal_payload is not None:
command = ["python3", "-c", _CODEX_GOAL_BOOTSTRAP_SCRIPT]
stdin = json.dumps(goal_payload)
else:
command.append(prompt)

runtime_files: dict[str, str] = {}
runtime_symlinks: dict[str, str] = {}
Expand All @@ -163,14 +365,12 @@ def prepare(self, node: NodeSpec, prompt: str, paths: ExecutionPaths) -> Prepare
runtime_symlinks[self.relative_runtime_file("codex_home", "auth.json")] = str(host_auth)
env["CODEX_HOME"] = codex_home
env["HOME"] = codex_home
cwd = paths.target_workdir
if repo_instructions_ignored:
cwd = str(Path(paths.target_runtime_dir))
return PreparedExecution(
command=command,
env=env,
cwd=cwd,
trace_kind="codex",
runtime_files=runtime_files,
runtime_symlinks=runtime_symlinks,
stdin=stdin,
)
1 change: 1 addition & 0 deletions agentflow/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,7 @@ async def _execute_node(
on_failure_restart=execution_node.on_failure_restart,
fanout_dependencies=getattr(execution_node, 'fanout_dependencies', {}),
executable=execution_node.executable,
goal=execution_node.goal,
description=execution_node.description,
repo_instructions_mode=execution_node.repo_instructions_mode,
)
Expand Down
9 changes: 8 additions & 1 deletion agentflow/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class StrEnum(str, Enum):
from pathlib import Path
from typing import Annotated, Any, Literal

from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from pydantic import BaseModel, ConfigDict, Field, StrictBool, field_validator, model_validator

from agentflow.local_shell import (
invalid_bash_long_option_error,
Expand Down Expand Up @@ -792,6 +792,7 @@ class NodeSpec(BaseModel):
env: dict[str, str] = Field(default_factory=dict)
executable: str | None = None
extra_args: list[str] = Field(default_factory=list)
goal: StrictBool | str = False
description: str | None = None
success_criteria: list[SuccessCriterion] = Field(default_factory=list)
retries: int = Field(default=0, ge=0)
Expand Down Expand Up @@ -824,6 +825,12 @@ def ensure_unique_dependencies(self) -> "NodeSpec":
raise ValueError("scheduled nodes cannot also use `fanout`")
if self.target.kind != "local":
raise ValueError("scheduled nodes currently require a local target")
has_goal = bool(self.goal)
if isinstance(self.goal, str) and not self.goal.strip():
raise ValueError("`goal` must not be empty")
if has_goal:
if builtin_agent_kind(self.agent) != AgentKind.CODEX:
raise ValueError("`goal` is only supported for codex nodes")
resolve_provider(self.provider, self.agent)
return self

Expand Down
1 change: 1 addition & 0 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Each node supports:
- `repo_instructions_mode`: `inherit` (default) or `ignore` for agent CLIs that should not absorb repo-local instruction files such as `AGENTS.md`, `CLAUDE.md`, or project skills
- `mcps`: a list of MCP server definitions
- `skills`: a list of local skill paths or names
- `goal`: `true` or a string for Codex nodes that should run through Codex's native Goal mode
- `target`: `local`, `container`, `ssh`, `ec2`, or `ecs`
- local target fields: `cwd`, `bootstrap`, `shell`, `shell_login`, `shell_interactive`, and `shell_init`
- `capture`: `final` or `trace`
Expand Down
Loading