diff --git a/design/EP-991-workflow-subagents.md b/design/EP-991-workflow-subagents.md new file mode 100644 index 000000000..217cbf672 --- /dev/null +++ b/design/EP-991-workflow-subagents.md @@ -0,0 +1,190 @@ +# EP-991: Declarative Workflow Subagents + +- Issue: [#991](https://github.com/kagent-dev/kagent/issues/991) + +## Background + +Google ADK provides three workflow agent primitives — `SequentialAgent`, `ParallelAgent`, and `LoopAgent` — that deterministically orchestrate in-process sub-agents. kagent currently supports agent-to-agent delegation via A2A tool references, but this is LLM-driven (the model decides when and whether to call sub-agents). There is no way to declare deterministic multi-agent workflows in YAML. + +This EP adds a `workflow` field to the `DeclarativeAgentSpec` CRD that lets users declare Sequential, Parallel, and Loop orchestration patterns. Sub-agents are defined inline within the parent agent's CRD and run in-process within the same pod, sharing session state. + +## Motivation + +Users building multi-agent systems need deterministic orchestration patterns: +- **Sequential**: Run agents in a fixed order (e.g., writer → editor → publisher) +- **Parallel**: Run agents concurrently and merge results (e.g., research multiple topics simultaneously) +- **Loop**: Iterate until a condition is met (e.g., write → critique → refine cycles) + +Today, users must either rely on the LLM to coordinate agents (non-deterministic) or write custom BYO agents in code. Declarative workflow support brings these patterns to YAML-only users and ensures reliable execution order. + +### Goals + +1. Support `Sequential`, `Parallel`, and `Loop` workflow types via CRD configuration +2. Sub-agents run in-process within a single pod, sharing session state +3. Each sub-agent can have its own system message, model config, and MCP tools +4. Loop workflows support `maxIterations` and exit-on-escalation +5. Both Python and Go runtimes support workflow agents + +### Non-Goals + +1. Remote sub-agents (separate pods communicating via A2A within a workflow) +2. Nested workflows (a sub-agent that is itself a workflow) +3. Conditional branching or DAG-based orchestration beyond what ADK provides +4. UI visualization of workflow topology + +## Implementation + +### 1. CRD Types (`go/api/v1alpha2/agent_types.go`) + +New types added to the agent CRD: + +```go +// +kubebuilder:validation:Enum=Sequential;Parallel;Loop +type WorkflowType string + +type WorkflowSpec struct { + Type WorkflowType `json:"type"` + SubAgents []InlineAgentSpec `json:"subAgents"` + MaxIterations *int `json:"maxIterations,omitempty"` // Loop only +} + +type InlineAgentSpec struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + SystemMessage string `json:"systemMessage"` + ModelConfig string `json:"modelConfig,omitempty"` // inherits parent if unset + Tools []*Tool `json:"tools,omitempty"` // MCP tools only +} +``` + +The `Workflow` field is added to `DeclarativeAgentSpec` with CEL validation rules: +- `workflow` is mutually exclusive with `systemMessage`, `systemMessageFrom`, and `tools` +- `workflow` is mutually exclusive with `memory`, `context`, and `executeCodeBlocks` +- `maxIterations` is only valid for `Loop` type; when unset, the loop runs indefinitely until a sub-agent escalates + +### 2. ADK Config Types (`go/api/adk/types.go`) + +JSON-serializable types passed to both Python and Go runtimes: + +```go +type WorkflowAgentConfig struct { + Type string `json:"type"` // "sequential", "parallel", "loop" + SubAgents []SubAgentConfig `json:"sub_agents"` + MaxIterations *int `json:"max_iterations,omitempty"` +} + +type SubAgentConfig struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Instruction string `json:"instruction"` + Model Model `json:"model"` + HttpTools []HttpMcpServerConfig `json:"http_tools,omitempty"` + SseTools []SseMcpServerConfig `json:"sse_tools,omitempty"` +} +``` + +The `AgentConfig` struct gets a new `Workflow *WorkflowAgentConfig` field. + +### 3. Translator Changes + +The translator's `translateInlineAgent` method branches when `Workflow` is set, calling a new `translateWorkflowAgent` method that: + +1. Resolves the default model config (used by sub-agents without their own) +2. For each sub-agent: resolves its model (own or inherited), translates MCP tools +3. Returns an `AgentConfig` with the `Workflow` field populated + +The `translateMCPServerTarget` method is refactored to support writing tool configs to `SubAgentConfig` in addition to `AgentConfig`. + +Validation rules enforced by the translator: +- Sub-agent names must be unique within a workflow +- Agent-as-tool references are not allowed within workflow sub-agents +- `maxIterations` only meaningful for Loop type + +### 4. Python Runtime (`python/packages/kagent-adk/src/kagent/adk/types.py`) + +The `AgentConfig.to_agent()` method is refactored: +- Existing logic moves to `_build_llm_agent()` +- New `_build_workflow_agent()` constructs in-process sub-agents and wraps them in the appropriate ADK workflow agent + +```python +from google.adk.agents import SequentialAgent, ParallelAgent, LoopAgent + +def _build_workflow_agent(self, name, sts_integration): + sub_agents = [self._build_sub_agent(cfg, sts_integration) for cfg in self.workflow.sub_agents] + match self.workflow.type: + case "sequential": return SequentialAgent(name=name, sub_agents=sub_agents, ...) + case "parallel": return ParallelAgent(name=name, sub_agents=sub_agents, ...) + case "loop": return LoopAgent(name=name, sub_agents=sub_agents, max_iterations=...) +``` + +### 5. Go Runtime (`go/adk/pkg/agent/agent.go`) + +New `CreateWorkflowAgent()` function creates sub-agents via `llmagent.New()` and wraps them in the appropriate workflow agent from `google.golang.org/adk/agent/workflowagents/`. + +The runner adapter (`go/adk/pkg/runner/adapter.go`) routes to `CreateWorkflowAgent()` when `agentConfig.Workflow != nil`. + +### Example: Sequential Workflow + +```yaml +apiVersion: kagent.dev/v1alpha2 +kind: Agent +metadata: + name: writer-critic +spec: + type: Declarative + description: Generates content then reviews it + declarative: + runtime: python + modelConfig: default-model-config + workflow: + type: Sequential + subAgents: + - name: writer + description: Writes creative content + systemMessage: | + You are a creative writer. Write a compelling paragraph about the given topic. + - name: critic + description: Reviews and improves content + systemMessage: | + You are a writing critic. Review the previous content and provide improvements. +``` + +### Example: Loop Workflow + +```yaml +apiVersion: kagent.dev/v1alpha2 +kind: Agent +metadata: + name: iterative-refiner +spec: + type: Declarative + description: Iteratively refines content through write-critique cycles + declarative: + modelConfig: default-model-config + workflow: + type: Loop + maxIterations: 5 + subAgents: + - name: writer + systemMessage: Write or refine content based on feedback. + - name: critic + systemMessage: Critique the content. If satisfactory, escalate to stop the loop. +``` + +### Test Plan + +1. **Translator golden tests**: Input YAML + expected output JSON for sequential, parallel, and loop workflows +2. **Python unit tests**: Verify `to_agent()` returns correct workflow agent type with correct sub-agent count and configuration +3. **Go unit tests**: Verify `CreateWorkflowAgent()` for all three workflow types +4. **CRD validation**: Verify CEL rules reject invalid combinations (workflow + systemMessage, maxIterations on non-Loop) + +## Alternatives + +**Remote sub-agents via A2A**: Each sub-agent as a separate Agent CR and pod. Rejected because ADK workflow agents require in-process sub-agents sharing session state. Remote A2A adds network latency and breaks session state sharing. The existing agent-as-tool pattern already covers the remote case. + +**Workflow as a separate CRD**: A dedicated `Workflow` resource type that references Agent CRs. Rejected for the same reason — ADK workflow agents need in-process sub-agents, not separate pods. + +## Open Questions + +1. Should sub-agents within a workflow be allowed to reference remote agents (other Agent CRs) as tools? Currently prohibited for simplicity; could be added later since the pod already has network access. +2. Should nested workflows (a sub-agent that is itself a workflow) be supported? Deferred to a future EP. diff --git a/go/adk/pkg/agent/agent.go b/go/adk/pkg/agent/agent.go index 1a7953f99..6be85bdf5 100644 --- a/go/adk/pkg/agent/agent.go +++ b/go/adk/pkg/agent/agent.go @@ -14,6 +14,9 @@ import ( "github.com/kagent-dev/kagent/go/api/adk" "google.golang.org/adk/agent" "google.golang.org/adk/agent/llmagent" + "google.golang.org/adk/agent/workflowagents/loopagent" + "google.golang.org/adk/agent/workflowagents/parallelagent" + "google.golang.org/adk/agent/workflowagents/sequentialagent" adkmodel "google.golang.org/adk/model" adkgemini "google.golang.org/adk/model/gemini" "google.golang.org/adk/tool" @@ -37,6 +40,101 @@ func CreateGoogleADKAgent(ctx context.Context, agentConfig *adk.AgentConfig, age return a, err } +// CreateWorkflowAgent creates a workflow agent (Sequential, Parallel, or Loop) +// from a config that contains a Workflow specification. +func CreateWorkflowAgent(ctx context.Context, agentConfig *adk.AgentConfig, agentName string) (agent.Agent, error) { + if agentConfig == nil || agentConfig.Workflow == nil { + return nil, fmt.Errorf("agent config with workflow is required") + } + + log := logr.FromContextOrDiscard(ctx) + workflow := agentConfig.Workflow + + subAgents := make([]agent.Agent, 0, len(workflow.SubAgents)) + for _, subAgentConfig := range workflow.SubAgents { + subAgent, err := createInProcessSubAgent(ctx, &subAgentConfig, log) + if err != nil { + return nil, fmt.Errorf("failed to create sub-agent %q: %w", subAgentConfig.Name, err) + } + subAgents = append(subAgents, subAgent) + } + + baseConfig := agent.Config{ + Name: agentName, + Description: agentConfig.Description, + SubAgents: subAgents, + } + + switch workflow.Type { + case "sequential": + return sequentialagent.New(sequentialagent.Config{ + AgentConfig: baseConfig, + }) + case "parallel": + return parallelagent.New(parallelagent.Config{ + AgentConfig: baseConfig, + }) + case "loop": + loopConfig := loopagent.Config{ + AgentConfig: baseConfig, + } + if workflow.MaxIterations != nil { + loopConfig.MaxIterations = uint(*workflow.MaxIterations) + } + return loopagent.New(loopConfig) + default: + return nil, fmt.Errorf("unknown workflow type: %s", workflow.Type) + } +} + +// createInProcessSubAgent builds an in-process LLM agent from a SubAgentConfig. +func createInProcessSubAgent(ctx context.Context, config *adk.SubAgentConfig, log logr.Logger) (agent.Agent, error) { + toolsets := mcp.CreateToolsets(ctx, config.HttpTools, config.SseTools) + + llmModel, err := CreateLLM(ctx, config.Model, log) + if err != nil { + return nil, fmt.Errorf("failed to create LLM: %w", err) + } + + // Collect tool names that require approval from MCP tools. + approvalSet := make(map[string]bool) + for _, httpTool := range config.HttpTools { + for _, name := range httpTool.RequireApproval { + approvalSet[name] = true + } + } + for _, sseTool := range config.SseTools { + for _, name := range sseTool.RequireApproval { + approvalSet[name] = true + } + } + + beforeToolCallbacks := []llmagent.BeforeToolCallback{makeBeforeToolCallback(log)} + beforeModelCallbacks := []llmagent.BeforeModelCallback{} + if len(approvalSet) > 0 { + log.Info("Wiring approval callback for workflow sub-agent", "name", config.Name, "toolCount", len(approvalSet)) + beforeToolCallbacks = append([]llmagent.BeforeToolCallback{MakeApprovalCallback(approvalSet)}, beforeToolCallbacks...) + beforeModelCallbacks = append(beforeModelCallbacks, MakeStripConfirmationPartsCallback()) + } + + return llmagent.New(llmagent.Config{ + Name: config.Name, + Description: config.Description, + Instruction: config.Instruction, + Model: llmModel, + Toolsets: toolsets, + IncludeContents: llmagent.IncludeContentsDefault, + BeforeToolCallbacks: beforeToolCallbacks, + BeforeModelCallbacks: beforeModelCallbacks, + AfterToolCallbacks: []llmagent.AfterToolCallback{ + makeAfterToolCallback(log), + }, + OnToolErrorCallbacks: []llmagent.OnToolErrorCallback{ + makeOnToolErrorCallback(log), + }, + }) +} + // CreateGoogleADKAgentWithSubagentSessionIDs creates a Google ADK agent and a // map of remote-subagent tool name → A2A context session ID (for stamping // outbound A2A events). Callers that only need the agent can use diff --git a/go/adk/pkg/runner/adapter.go b/go/adk/pkg/runner/adapter.go index d6230ccad..369137eb0 100644 --- a/go/adk/pkg/runner/adapter.go +++ b/go/adk/pkg/runner/adapter.go @@ -9,6 +9,7 @@ import ( kagentmemory "github.com/kagent-dev/kagent/go/adk/pkg/memory" "github.com/kagent-dev/kagent/go/adk/pkg/session" "github.com/kagent-dev/kagent/go/api/adk" + adkagent "google.golang.org/adk/agent" adkmemory "google.golang.org/adk/memory" "google.golang.org/adk/runner" adksession "google.golang.org/adk/session" @@ -31,18 +32,32 @@ func CreateRunnerConfig( appName string, memoryService *kagentmemory.KagentMemoryService, ) (runner.Config, map[string]string, error) { - var extraTools []adktool.Tool - if memoryService != nil { - saveTool, err := kagentmemory.NewSaveMemoryTool(memoryService) + var adkAgent adkagent.Agent + var subagentSessionIDs map[string]string + + if agentConfig.Workflow != nil { + // Workflow agents use a different creation path — no extra tools or remote agents. + var err error + adkAgent, err = agent.CreateWorkflowAgent(ctx, agentConfig, agentNameFromAppName(appName)) if err != nil { - return runner.Config{}, nil, fmt.Errorf("failed to create save_memory tool: %w", err) + return runner.Config{}, nil, fmt.Errorf("failed to create workflow agent: %w", err) + } + subagentSessionIDs = make(map[string]string) + } else { + var extraTools []adktool.Tool + if memoryService != nil { + saveTool, err := kagentmemory.NewSaveMemoryTool(memoryService) + if err != nil { + return runner.Config{}, nil, fmt.Errorf("failed to create save_memory tool: %w", err) + } + extraTools = append(extraTools, saveTool) } - extraTools = append(extraTools, saveTool) - } - adkAgent, subagentSessionIDs, err := agent.CreateGoogleADKAgentWithSubagentSessionIDs(ctx, agentConfig, agentNameFromAppName(appName), extraTools...) - if err != nil { - return runner.Config{}, nil, fmt.Errorf("failed to create agent: %w", err) + var err error + adkAgent, subagentSessionIDs, err = agent.CreateGoogleADKAgentWithSubagentSessionIDs(ctx, agentConfig, agentNameFromAppName(appName), extraTools...) + if err != nil { + return runner.Config{}, nil, fmt.Errorf("failed to create agent: %w", err) + } } var adkSessionService adksession.Service diff --git a/go/api/adk/types.go b/go/api/adk/types.go index 602a45798..b27ff0254 100644 --- a/go/api/adk/types.go +++ b/go/api/adk/types.go @@ -484,6 +484,27 @@ func (c *AgentCompressionConfig) UnmarshalJSON(data []byte) error { return nil } +// WorkflowAgentConfig represents a workflow agent that deterministically orchestrates +// in-process sub-agents using Sequential, Parallel, or Loop patterns. +type WorkflowAgentConfig struct { + // Type is the workflow pattern: "sequential", "parallel", or "loop". + Type string `json:"type"` + // SubAgents are the in-process LLM sub-agent configurations. + SubAgents []SubAgentConfig `json:"sub_agents"` + // MaxIterations applies only to loop workflows. + MaxIterations *int `json:"max_iterations,omitempty"` +} + +// SubAgentConfig represents an in-process LLM sub-agent within a workflow. +type SubAgentConfig struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Instruction string `json:"instruction"` + Model Model `json:"model"` + HttpTools []HttpMcpServerConfig `json:"http_tools,omitempty"` + SseTools []SseMcpServerConfig `json:"sse_tools,omitempty"` +} + // See `python/packages/kagent-adk/src/kagent/adk/types.py` for the python version of this type AgentConfig struct { Model Model `json:"model"` @@ -497,6 +518,7 @@ type AgentConfig struct { Memory *MemoryConfig `json:"memory,omitempty"` Network *NetworkConfig `json:"network,omitempty"` ContextConfig *AgentContextConfig `json:"context_config,omitempty"` + Workflow *WorkflowAgentConfig `json:"workflow,omitempty"` } // GetStream returns the stream value or default if not set @@ -528,6 +550,7 @@ func (a *AgentConfig) UnmarshalJSON(data []byte) error { Memory json.RawMessage `json:"memory"` Network *NetworkConfig `json:"network,omitempty"` ContextConfig *AgentContextConfig `json:"context_config,omitempty"` + Workflow *WorkflowAgentConfig `json:"workflow,omitempty"` } if err := json.Unmarshal(data, &tmp); err != nil { return err @@ -557,6 +580,7 @@ func (a *AgentConfig) UnmarshalJSON(data []byte) error { a.Memory = memory a.Network = tmp.Network a.ContextConfig = tmp.ContextConfig + a.Workflow = tmp.Workflow return nil } diff --git a/go/api/config/crd/bases/kagent.dev_agents.yaml b/go/api/config/crd/bases/kagent.dev_agents.yaml index c92d4091e..3d4407a63 100644 --- a/go/api/config/crd/bases/kagent.dev_agents.yaml +++ b/go/api/config/crd/bases/kagent.dev_agents.yaml @@ -10160,10 +10160,238 @@ spec: rule: '!(!has(self.agent) && self.type == ''Agent'')' maxItems: 20 type: array + workflow: + description: |- + Workflow configures this agent as a workflow orchestrator (Sequential, Parallel, or Loop). + When set, this agent orchestrates in-process sub-agents rather than being an LLM agent itself. + Mutually exclusive with systemMessage, systemMessageFrom, and tools. + properties: + maxIterations: + description: |- + MaxIterations is the maximum number of loop iterations (Loop type only). + The loop exits when a sub-agent escalates or when this limit is reached. + If not set for Loop type, the loop runs indefinitely until a sub-agent escalates. + minimum: 1 + type: integer + subAgents: + description: |- + SubAgents are the in-process LLM agents that this workflow orchestrates. + Each sub-agent runs within the same pod and shares session state. + items: + description: |- + InlineAgentSpec defines an in-process LLM sub-agent within a workflow. + Unlike a top-level Agent CR, inline agents are not separate pods — they run + in-process within the parent workflow agent's pod. + properties: + description: + description: Description is a human-readable description + of what this sub-agent does. + type: string + modelConfig: + description: |- + ModelConfig is the name of the ModelConfig to use for this sub-agent. + If not specified, inherits the parent workflow agent's model config. + Must be in the same namespace as the Agent. + type: string + name: + description: |- + Name is the unique identifier for this sub-agent within the workflow. + Must be a valid Python identifier (letters, digits, underscores; cannot start with a digit). + maxLength: 63 + minLength: 1 + pattern: ^[A-Za-z_][A-Za-z0-9_]*$ + type: string + systemMessage: + description: SystemMessage is the system prompt for + this sub-agent. + minLength: 1 + type: string + tools: + description: |- + Tools are the MCP server tools available to this sub-agent. + Agent-as-tool references are not supported within workflow sub-agents. + items: + properties: + agent: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + namespace: + type: string + required: + - name + type: object + headersFrom: + description: |- + HeadersFrom specifies a list of configuration values to be added as + headers to requests sent to the Tool from this agent. The value of + each header is resolved from either a Secret or ConfigMap in the same + namespace as the Agent. Headers specified here will override any + headers of the same name/key specified on the tool. + items: + description: ValueRef represents a configuration + value + properties: + name: + type: string + value: + type: string + valueFrom: + description: ValueSource defines a source + for configuration values from a Secret + or ConfigMap + properties: + key: + description: The key of the ConfigMap + or Secret. + type: string + name: + description: The name of the ConfigMap + or Secret. + type: string + type: + enum: + - ConfigMap + - Secret + type: string + required: + - key + - name + - type + type: object + required: + - name + type: object + x-kubernetes-validations: + - message: Exactly one of value or valueFrom + must be specified + rule: (has(self.value) && !has(self.valueFrom)) + || (!has(self.value) && has(self.valueFrom)) + type: array + mcpServer: + properties: + allowedHeaders: + description: |- + AllowedHeaders specifies which headers from the A2A request should be + propagated to MCP tool calls. Header names are case-insensitive. + + Authorization header behavior: + - Authorization headers CAN be propagated if explicitly listed in allowedHeaders + - When STS token propagation is enabled, STS-generated Authorization headers + will take precedence and replace any Authorization header from the A2A request + - This is a security measure to prevent request headers from overwriting + authentication tokens generated by the STS integration + + Example: ["x-user-email", "x-tenant-id"] + items: + type: string + type: array + apiGroup: + type: string + kind: + type: string + name: + type: string + namespace: + type: string + requireApproval: + description: |- + RequireApproval lists tool names that require human approval before + execution. Each name must also appear in ToolNames. When a tool in + this list is invoked by the agent, execution pauses and the user is + prompted to approve or reject the call. + items: + type: string + maxItems: 50 + type: array + toolNames: + description: |- + The names of the tools to be provided by the ToolServer + For a list of all the tools provided by the server, + the client can query the status of the ToolServer object after it has been created + items: + type: string + maxItems: 50 + type: array + required: + - name + type: object + x-kubernetes-validations: + - message: each RequireApproval entry must also + appear in ToolNames + rule: '!has(self.requireApproval) || self.requireApproval.all(x, + has(self.toolNames) && x in self.toolNames)' + type: + allOf: + - enum: + - McpServer + - Agent + - enum: + - McpServer + - Agent + description: ToolProviderType represents the tool + provider type + type: string + type: object + x-kubernetes-validations: + - message: type.mcpServer must be nil if the type + is not McpServer + rule: '!(has(self.mcpServer) && self.type != ''McpServer'')' + - message: type.mcpServer must be specified for McpServer + filter.type + rule: '!(!has(self.mcpServer) && self.type == ''McpServer'')' + - message: type.agent must be nil if the type is not + Agent + rule: '!(has(self.agent) && self.type != ''Agent'')' + - message: type.agent must be specified for Agent + filter.type + rule: '!(!has(self.agent) && self.type == ''Agent'')' + maxItems: 20 + type: array + required: + - name + - systemMessage + type: object + maxItems: 10 + minItems: 1 + type: array + type: + allOf: + - enum: + - Sequential + - Parallel + - Loop + - enum: + - Sequential + - Parallel + - Loop + description: Type is the workflow orchestration pattern. + type: string + required: + - subAgents + - type + type: object + x-kubernetes-validations: + - message: maxIterations is only valid for Loop workflow type + rule: self.type == 'Loop' || !has(self.maxIterations) type: object x-kubernetes-validations: - message: systemMessage and systemMessageFrom are mutually exclusive rule: '!has(self.systemMessage) || !has(self.systemMessageFrom)' + - message: workflow is mutually exclusive with systemMessage, systemMessageFrom, + and tools + rule: '!has(self.workflow) || (!has(self.systemMessage) && !has(self.systemMessageFrom) + && !has(self.tools))' + - message: workflow is mutually exclusive with memory, context, and + executeCodeBlocks + rule: '!has(self.workflow) || (!has(self.memory) && !has(self.context) + && !has(self.executeCodeBlocks))' + - message: either workflow or a system message source must be specified + rule: has(self.workflow) || has(self.systemMessage) || has(self.systemMessageFrom) description: type: string sandbox: diff --git a/go/api/config/crd/bases/kagent.dev_sandboxagents.yaml b/go/api/config/crd/bases/kagent.dev_sandboxagents.yaml index cfaace48b..d9bb645a7 100644 --- a/go/api/config/crd/bases/kagent.dev_sandboxagents.yaml +++ b/go/api/config/crd/bases/kagent.dev_sandboxagents.yaml @@ -7810,10 +7810,238 @@ spec: rule: '!(!has(self.agent) && self.type == ''Agent'')' maxItems: 20 type: array + workflow: + description: |- + Workflow configures this agent as a workflow orchestrator (Sequential, Parallel, or Loop). + When set, this agent orchestrates in-process sub-agents rather than being an LLM agent itself. + Mutually exclusive with systemMessage, systemMessageFrom, and tools. + properties: + maxIterations: + description: |- + MaxIterations is the maximum number of loop iterations (Loop type only). + The loop exits when a sub-agent escalates or when this limit is reached. + If not set for Loop type, the loop runs indefinitely until a sub-agent escalates. + minimum: 1 + type: integer + subAgents: + description: |- + SubAgents are the in-process LLM agents that this workflow orchestrates. + Each sub-agent runs within the same pod and shares session state. + items: + description: |- + InlineAgentSpec defines an in-process LLM sub-agent within a workflow. + Unlike a top-level Agent CR, inline agents are not separate pods — they run + in-process within the parent workflow agent's pod. + properties: + description: + description: Description is a human-readable description + of what this sub-agent does. + type: string + modelConfig: + description: |- + ModelConfig is the name of the ModelConfig to use for this sub-agent. + If not specified, inherits the parent workflow agent's model config. + Must be in the same namespace as the Agent. + type: string + name: + description: |- + Name is the unique identifier for this sub-agent within the workflow. + Must be a valid Python identifier (letters, digits, underscores; cannot start with a digit). + maxLength: 63 + minLength: 1 + pattern: ^[A-Za-z_][A-Za-z0-9_]*$ + type: string + systemMessage: + description: SystemMessage is the system prompt for + this sub-agent. + minLength: 1 + type: string + tools: + description: |- + Tools are the MCP server tools available to this sub-agent. + Agent-as-tool references are not supported within workflow sub-agents. + items: + properties: + agent: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + namespace: + type: string + required: + - name + type: object + headersFrom: + description: |- + HeadersFrom specifies a list of configuration values to be added as + headers to requests sent to the Tool from this agent. The value of + each header is resolved from either a Secret or ConfigMap in the same + namespace as the Agent. Headers specified here will override any + headers of the same name/key specified on the tool. + items: + description: ValueRef represents a configuration + value + properties: + name: + type: string + value: + type: string + valueFrom: + description: ValueSource defines a source + for configuration values from a Secret + or ConfigMap + properties: + key: + description: The key of the ConfigMap + or Secret. + type: string + name: + description: The name of the ConfigMap + or Secret. + type: string + type: + enum: + - ConfigMap + - Secret + type: string + required: + - key + - name + - type + type: object + required: + - name + type: object + x-kubernetes-validations: + - message: Exactly one of value or valueFrom + must be specified + rule: (has(self.value) && !has(self.valueFrom)) + || (!has(self.value) && has(self.valueFrom)) + type: array + mcpServer: + properties: + allowedHeaders: + description: |- + AllowedHeaders specifies which headers from the A2A request should be + propagated to MCP tool calls. Header names are case-insensitive. + + Authorization header behavior: + - Authorization headers CAN be propagated if explicitly listed in allowedHeaders + - When STS token propagation is enabled, STS-generated Authorization headers + will take precedence and replace any Authorization header from the A2A request + - This is a security measure to prevent request headers from overwriting + authentication tokens generated by the STS integration + + Example: ["x-user-email", "x-tenant-id"] + items: + type: string + type: array + apiGroup: + type: string + kind: + type: string + name: + type: string + namespace: + type: string + requireApproval: + description: |- + RequireApproval lists tool names that require human approval before + execution. Each name must also appear in ToolNames. When a tool in + this list is invoked by the agent, execution pauses and the user is + prompted to approve or reject the call. + items: + type: string + maxItems: 50 + type: array + toolNames: + description: |- + The names of the tools to be provided by the ToolServer + For a list of all the tools provided by the server, + the client can query the status of the ToolServer object after it has been created + items: + type: string + maxItems: 50 + type: array + required: + - name + type: object + x-kubernetes-validations: + - message: each RequireApproval entry must also + appear in ToolNames + rule: '!has(self.requireApproval) || self.requireApproval.all(x, + has(self.toolNames) && x in self.toolNames)' + type: + allOf: + - enum: + - McpServer + - Agent + - enum: + - McpServer + - Agent + description: ToolProviderType represents the tool + provider type + type: string + type: object + x-kubernetes-validations: + - message: type.mcpServer must be nil if the type + is not McpServer + rule: '!(has(self.mcpServer) && self.type != ''McpServer'')' + - message: type.mcpServer must be specified for McpServer + filter.type + rule: '!(!has(self.mcpServer) && self.type == ''McpServer'')' + - message: type.agent must be nil if the type is not + Agent + rule: '!(has(self.agent) && self.type != ''Agent'')' + - message: type.agent must be specified for Agent + filter.type + rule: '!(!has(self.agent) && self.type == ''Agent'')' + maxItems: 20 + type: array + required: + - name + - systemMessage + type: object + maxItems: 10 + minItems: 1 + type: array + type: + allOf: + - enum: + - Sequential + - Parallel + - Loop + - enum: + - Sequential + - Parallel + - Loop + description: Type is the workflow orchestration pattern. + type: string + required: + - subAgents + - type + type: object + x-kubernetes-validations: + - message: maxIterations is only valid for Loop workflow type + rule: self.type == 'Loop' || !has(self.maxIterations) type: object x-kubernetes-validations: - message: systemMessage and systemMessageFrom are mutually exclusive rule: '!has(self.systemMessage) || !has(self.systemMessageFrom)' + - message: workflow is mutually exclusive with systemMessage, systemMessageFrom, + and tools + rule: '!has(self.workflow) || (!has(self.systemMessage) && !has(self.systemMessageFrom) + && !has(self.tools))' + - message: workflow is mutually exclusive with memory, context, and + executeCodeBlocks + rule: '!has(self.workflow) || (!has(self.memory) && !has(self.context) + && !has(self.executeCodeBlocks))' + - message: either workflow or a system message source must be specified + rule: has(self.workflow) || has(self.systemMessage) || has(self.systemMessageFrom) description: type: string sandbox: diff --git a/go/api/v1alpha2/agent_types.go b/go/api/v1alpha2/agent_types.go index 83df4c175..3c0b0260a 100644 --- a/go/api/v1alpha2/agent_types.go +++ b/go/api/v1alpha2/agent_types.go @@ -148,7 +148,75 @@ type GitRepo struct { Name string `json:"name,omitempty"` } +// WorkflowType represents the workflow orchestration pattern. +// +kubebuilder:validation:Enum=Sequential;Parallel;Loop +type WorkflowType string + +const ( + WorkflowType_Sequential WorkflowType = "Sequential" + WorkflowType_Parallel WorkflowType = "Parallel" + WorkflowType_Loop WorkflowType = "Loop" +) + +// WorkflowSpec defines a workflow agent that deterministically orchestrates +// in-process sub-agents using Sequential, Parallel, or Loop patterns. +// Sub-agents run within the same pod and share session state. +// +kubebuilder:validation:XValidation:rule="self.type == 'Loop' || !has(self.maxIterations)",message="maxIterations is only valid for Loop workflow type" +type WorkflowSpec struct { + // Type is the workflow orchestration pattern. + // +kubebuilder:validation:Enum=Sequential;Parallel;Loop + Type WorkflowType `json:"type"` + + // SubAgents are the in-process LLM agents that this workflow orchestrates. + // Each sub-agent runs within the same pod and shares session state. + // +kubebuilder:validation:MinItems=1 + // +kubebuilder:validation:MaxItems=10 + SubAgents []InlineAgentSpec `json:"subAgents"` + + // MaxIterations is the maximum number of loop iterations (Loop type only). + // The loop exits when a sub-agent escalates or when this limit is reached. + // If not set for Loop type, the loop runs indefinitely until a sub-agent escalates. + // +optional + // +kubebuilder:validation:Minimum=1 + MaxIterations *int `json:"maxIterations,omitempty"` +} + +// InlineAgentSpec defines an in-process LLM sub-agent within a workflow. +// Unlike a top-level Agent CR, inline agents are not separate pods — they run +// in-process within the parent workflow agent's pod. +type InlineAgentSpec struct { + // Name is the unique identifier for this sub-agent within the workflow. + // Must be a valid Python identifier (letters, digits, underscores; cannot start with a digit). + // +kubebuilder:validation:MinLength=1 + // +kubebuilder:validation:MaxLength=63 + // +kubebuilder:validation:Pattern=`^[A-Za-z_][A-Za-z0-9_]*$` + Name string `json:"name"` + + // Description is a human-readable description of what this sub-agent does. + // +optional + Description string `json:"description,omitempty"` + + // SystemMessage is the system prompt for this sub-agent. + // +kubebuilder:validation:MinLength=1 + SystemMessage string `json:"systemMessage"` + + // ModelConfig is the name of the ModelConfig to use for this sub-agent. + // If not specified, inherits the parent workflow agent's model config. + // Must be in the same namespace as the Agent. + // +optional + ModelConfig string `json:"modelConfig,omitempty"` + + // Tools are the MCP server tools available to this sub-agent. + // Agent-as-tool references are not supported within workflow sub-agents. + // +optional + // +kubebuilder:validation:MaxItems=20 + Tools []*Tool `json:"tools,omitempty"` +} + // +kubebuilder:validation:XValidation:rule="!has(self.systemMessage) || !has(self.systemMessageFrom)",message="systemMessage and systemMessageFrom are mutually exclusive" +// +kubebuilder:validation:XValidation:rule="!has(self.workflow) || (!has(self.systemMessage) && !has(self.systemMessageFrom) && !has(self.tools))",message="workflow is mutually exclusive with systemMessage, systemMessageFrom, and tools" +// +kubebuilder:validation:XValidation:rule="!has(self.workflow) || (!has(self.memory) && !has(self.context) && !has(self.executeCodeBlocks))",message="workflow is mutually exclusive with memory, context, and executeCodeBlocks" +// +kubebuilder:validation:XValidation:rule="has(self.workflow) || has(self.systemMessage) || has(self.systemMessageFrom)",message="either workflow or a system message source must be specified" type DeclarativeAgentSpec struct { // Runtime specifies which ADK implementation to use for this agent. // - "python": Uses the Python ADK (default, slower startup, full feature set) @@ -211,6 +279,12 @@ type DeclarativeAgentSpec struct { // This includes event compaction (compression) and context caching. // +optional Context *ContextConfig `json:"context,omitempty"` + + // Workflow configures this agent as a workflow orchestrator (Sequential, Parallel, or Loop). + // When set, this agent orchestrates in-process sub-agents rather than being an LLM agent itself. + // Mutually exclusive with systemMessage, systemMessageFrom, and tools. + // +optional + Workflow *WorkflowSpec `json:"workflow,omitempty"` } // SandboxConfig configures sandboxed execution behavior. diff --git a/go/api/v1alpha2/zz_generated.deepcopy.go b/go/api/v1alpha2/zz_generated.deepcopy.go index 2b0ca618f..e4cfbe95e 100644 --- a/go/api/v1alpha2/zz_generated.deepcopy.go +++ b/go/api/v1alpha2/zz_generated.deepcopy.go @@ -501,6 +501,11 @@ func (in *DeclarativeAgentSpec) DeepCopyInto(out *DeclarativeAgentSpec) { *out = new(ContextConfig) (*in).DeepCopyInto(*out) } + if in.Workflow != nil { + in, out := &in.Workflow, &out.Workflow + *out = new(WorkflowSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeclarativeAgentSpec. @@ -590,6 +595,32 @@ func (in *GitRepo) DeepCopy() *GitRepo { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InlineAgentSpec) DeepCopyInto(out *InlineAgentSpec) { + *out = *in + if in.Tools != nil { + in, out := &in.Tools, &out.Tools + *out = make([]*Tool, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Tool) + (*in).DeepCopyInto(*out) + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InlineAgentSpec. +func (in *InlineAgentSpec) DeepCopy() *InlineAgentSpec { + if in == nil { + return nil + } + out := new(InlineAgentSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MCPTool) DeepCopyInto(out *MCPTool) { *out = *in @@ -1601,3 +1632,30 @@ func (in *ValueSource) DeepCopy() *ValueSource { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WorkflowSpec) DeepCopyInto(out *WorkflowSpec) { + *out = *in + if in.SubAgents != nil { + in, out := &in.SubAgents, &out.SubAgents + *out = make([]InlineAgentSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.MaxIterations != nil { + in, out := &in.MaxIterations, &out.MaxIterations + *out = new(int) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowSpec. +func (in *WorkflowSpec) DeepCopy() *WorkflowSpec { + if in == nil { + return nil + } + out := new(WorkflowSpec) + in.DeepCopyInto(out) + return out +} diff --git a/go/core/internal/controller/translator/agent/adk_api_translator.go b/go/core/internal/controller/translator/agent/adk_api_translator.go index b11d030aa..dd3678e78 100644 --- a/go/core/internal/controller/translator/agent/adk_api_translator.go +++ b/go/core/internal/controller/translator/agent/adk_api_translator.go @@ -936,6 +936,28 @@ func (a *adkApiTranslator) translateRemoteMCPServerTarget(ctx context.Context, a return nil } +// translateMCPServerTool translates an MCP server tool reference into HttpMcpServerConfig +// or SseMcpServerConfig. Unlike translateMCPServerTarget, this returns the tool configs +// instead of mutating an AgentConfig, making it reusable for both regular agents and +// workflow sub-agents. +func (a *adkApiTranslator) translateMCPServerTool(ctx context.Context, agentNamespace string, toolServer *v1alpha2.McpServerTool, agentHeaders map[string]string, proxyURL string) (*adk.HttpMcpServerConfig, *adk.SseMcpServerConfig, error) { + // Create a temporary AgentConfig to reuse existing translation logic + tmpConfig := &adk.AgentConfig{} + if err := a.translateMCPServerTarget(ctx, tmpConfig, agentNamespace, toolServer, agentHeaders, proxyURL); err != nil { + return nil, nil, err + } + + var httpTool *adk.HttpMcpServerConfig + var sseTool *adk.SseMcpServerConfig + if len(tmpConfig.HttpTools) > 0 { + httpTool = &tmpConfig.HttpTools[len(tmpConfig.HttpTools)-1] + } + if len(tmpConfig.SseTools) > 0 { + sseTool = &tmpConfig.SseTools[len(tmpConfig.SseTools)-1] + } + return httpTool, sseTool, nil +} + // Helper functions // isInternalK8sURL checks if a URL points to an internal Kubernetes service. diff --git a/go/core/internal/controller/translator/agent/compiler.go b/go/core/internal/controller/translator/agent/compiler.go index 0232a859e..79516651f 100644 --- a/go/core/internal/controller/translator/agent/compiler.go +++ b/go/core/internal/controller/translator/agent/compiler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "slices" + "strings" "github.com/kagent-dev/kagent/go/api/adk" "github.com/kagent-dev/kagent/go/api/v1alpha2" @@ -126,6 +127,11 @@ func (a *adkApiTranslator) validateAgent(ctx context.Context, agent v1alpha2.Age return nil } + // Validate workflow agents + if spec.Declarative.Workflow != nil { + return a.validateWorkflowAgent(spec.Declarative.Workflow) + } + for _, tool := range spec.Declarative.Tools { switch tool.Type { case v1alpha2.ToolProviderType_Agent: @@ -157,6 +163,12 @@ func (a *adkApiTranslator) validateAgent(ctx context.Context, agent v1alpha2.Age func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent v1alpha2.AgentObject) (*adk.AgentConfig, *modelDeploymentData, []byte, error) { spec := agent.GetAgentSpec() + + // If this is a workflow agent, use the workflow translation path. + if spec.Declarative.Workflow != nil { + return a.translateWorkflowAgent(ctx, agent) + } + model, mdd, secretHashBytes, err := a.translateModel(ctx, agent.GetNamespace(), spec.Declarative.ModelConfig) if err != nil { return nil, nil, nil, err @@ -314,6 +326,96 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent v1alp return cfg, mdd, secretHashBytes, nil } +func (a *adkApiTranslator) validateWorkflowAgent(workflow *v1alpha2.WorkflowSpec) error { + // Validate unique sub-agent names + names := make(map[string]bool, len(workflow.SubAgents)) + for _, subAgent := range workflow.SubAgents { + if names[subAgent.Name] { + return fmt.Errorf("duplicate sub-agent name %q in workflow", subAgent.Name) + } + names[subAgent.Name] = true + + // Agent-as-tool references are not supported within workflow sub-agents + for _, tool := range subAgent.Tools { + if tool.Type == v1alpha2.ToolProviderType_Agent { + return fmt.Errorf("sub-agent %q: agent-as-tool references are not supported within workflow sub-agents", subAgent.Name) + } + } + } + return nil +} + +func (a *adkApiTranslator) translateWorkflowAgent(ctx context.Context, agent v1alpha2.AgentObject) (*adk.AgentConfig, *modelDeploymentData, []byte, error) { + spec := agent.GetAgentSpec() + workflow := spec.Declarative.Workflow + + // Resolve the default model config (used by sub-agents that don't specify their own). + defaultModelConfigName := spec.Declarative.ModelConfig + defaultModel, mdd, secretHashBytes, err := a.translateModel(ctx, agent.GetNamespace(), defaultModelConfigName) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to translate default model config: %w", err) + } + + workflowConfig := &adk.WorkflowAgentConfig{ + Type: strings.ToLower(string(workflow.Type)), + MaxIterations: workflow.MaxIterations, + } + + for _, subAgent := range workflow.SubAgents { + subConfig := adk.SubAgentConfig{ + Name: subAgent.Name, + Description: subAgent.Description, + Instruction: subAgent.SystemMessage, + } + + // Resolve model: use sub-agent's model if specified, else inherit default. + if subAgent.ModelConfig != "" && subAgent.ModelConfig != defaultModelConfigName { + subModel, subMdd, subHash, err := a.translateModel(ctx, agent.GetNamespace(), subAgent.ModelConfig) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to translate model for sub-agent %q: %w", subAgent.Name, err) + } + subConfig.Model = subModel + mergeDeploymentData(mdd, subMdd) + secretHashBytes = append(secretHashBytes, subHash...) + } else { + subConfig.Model = defaultModel + } + + // Translate MCP tools for this sub-agent. + for _, tool := range subAgent.Tools { + headers, err := tool.ResolveHeaders(ctx, a.kube, agent.GetNamespace()) + if err != nil { + return nil, nil, nil, fmt.Errorf("sub-agent %q: failed to resolve tool headers: %w", subAgent.Name, err) + } + + if tool.McpServer != nil { + httpTool, sseTool, err := a.translateMCPServerTool(ctx, agent.GetNamespace(), tool.McpServer, headers, a.globalProxyURL) + if err != nil { + return nil, nil, nil, fmt.Errorf("sub-agent %q: failed to translate MCP tool: %w", subAgent.Name, err) + } + if httpTool != nil { + subConfig.HttpTools = append(subConfig.HttpTools, *httpTool) + } + if sseTool != nil { + subConfig.SseTools = append(subConfig.SseTools, *sseTool) + } + } + } + + workflowConfig.SubAgents = append(workflowConfig.SubAgents, subConfig) + } + + stream := spec.Declarative.Stream + cfg := &adk.AgentConfig{ + Description: spec.Description, + Model: defaultModel, + Stream: &stream, + Workflow: workflowConfig, + } + + return cfg, mdd, secretHashBytes, nil +} + // resolveRawSystemMessage gets the raw system message string from the agent spec // without applying any template processing. func (a *adkApiTranslator) resolveRawSystemMessage(ctx context.Context, agent v1alpha2.AgentObject) (string, error) { diff --git a/go/core/internal/controller/translator/agent/testdata/inputs/agent_with_workflow_loop.yaml b/go/core/internal/controller/translator/agent/testdata/inputs/agent_with_workflow_loop.yaml new file mode 100644 index 000000000..476690b6e --- /dev/null +++ b/go/core/internal/controller/translator/agent/testdata/inputs/agent_with_workflow_loop.yaml @@ -0,0 +1,41 @@ +operation: translateAgent +targetObject: iterative-refiner +namespace: test +objects: + - apiVersion: v1 + kind: Secret + metadata: + name: openai-secret + namespace: test + data: + api-key: c2stdGVzdC1hcGkta2V5 # base64 encoded "sk-test-api-key" + - apiVersion: kagent.dev/v1alpha2 + kind: ModelConfig + metadata: + name: default-model + namespace: test + spec: + provider: OpenAI + model: gpt-4o + apiKeySecret: openai-secret + apiKeySecretKey: api-key + - apiVersion: kagent.dev/v1alpha2 + kind: Agent + metadata: + name: iterative-refiner + namespace: test + spec: + type: Declarative + description: Iteratively refines content through write-critique cycles + declarative: + modelConfig: default-model + workflow: + type: Loop + maxIterations: 5 + subAgents: + - name: writer + description: Writes or refines content + systemMessage: Write or refine content based on feedback. + - name: critic + description: Critiques content + systemMessage: Critique the content. If satisfactory, escalate to stop. diff --git a/go/core/internal/controller/translator/agent/testdata/inputs/agent_with_workflow_sequential.yaml b/go/core/internal/controller/translator/agent/testdata/inputs/agent_with_workflow_sequential.yaml new file mode 100644 index 000000000..b419a78e0 --- /dev/null +++ b/go/core/internal/controller/translator/agent/testdata/inputs/agent_with_workflow_sequential.yaml @@ -0,0 +1,40 @@ +operation: translateAgent +targetObject: writer-critic +namespace: test +objects: + - apiVersion: v1 + kind: Secret + metadata: + name: openai-secret + namespace: test + data: + api-key: c2stdGVzdC1hcGkta2V5 # base64 encoded "sk-test-api-key" + - apiVersion: kagent.dev/v1alpha2 + kind: ModelConfig + metadata: + name: default-model + namespace: test + spec: + provider: OpenAI + model: gpt-4o + apiKeySecret: openai-secret + apiKeySecretKey: api-key + - apiVersion: kagent.dev/v1alpha2 + kind: Agent + metadata: + name: writer-critic + namespace: test + spec: + type: Declarative + description: A sequential workflow that writes then critiques content + declarative: + modelConfig: default-model + workflow: + type: Sequential + subAgents: + - name: writer + description: Writes creative content + systemMessage: You are a creative writer. Write a compelling paragraph. + - name: critic + description: Reviews and improves content + systemMessage: You are a writing critic. Review and improve the content. diff --git a/go/core/internal/controller/translator/agent/testdata/outputs/agent_with_workflow_loop.json b/go/core/internal/controller/translator/agent/testdata/outputs/agent_with_workflow_loop.json new file mode 100644 index 000000000..5d4f9ec83 --- /dev/null +++ b/go/core/internal/controller/translator/agent/testdata/outputs/agent_with_workflow_loop.json @@ -0,0 +1,312 @@ +{ + "agentCard": { + "capabilities": { + "pushNotifications": false, + "stateTransitionHistory": true, + "streaming": true + }, + "defaultInputModes": [ + "text" + ], + "defaultOutputModes": [ + "text" + ], + "description": "Iteratively refines content through write-critique cycles", + "name": "iterative_refiner", + "preferredTransport": "JSONRPC", + "skills": null, + "url": "http://iterative-refiner.test:8080", + "version": "" + }, + "config": { + "description": "Iteratively refines content through write-critique cycles", + "instruction": "", + "model": { + "base_url": "", + "model": "gpt-4o", + "type": "openai" + }, + "stream": false, + "workflow": { + "max_iterations": 5, + "sub_agents": [ + { + "description": "Writes or refines content", + "instruction": "Write or refine content based on feedback.", + "model": { + "base_url": "", + "model": "gpt-4o", + "type": "openai" + }, + "name": "writer" + }, + { + "description": "Critiques content", + "instruction": "Critique the content. If satisfactory, escalate to stop.", + "model": { + "base_url": "", + "model": "gpt-4o", + "type": "openai" + }, + "name": "critic" + } + ], + "type": "loop" + } + }, + "manifest": [ + { + "apiVersion": "v1", + "kind": "Secret", + "metadata": { + "labels": { + "app": "kagent", + "app.kubernetes.io/managed-by": "kagent", + "app.kubernetes.io/name": "iterative-refiner", + "app.kubernetes.io/part-of": "kagent", + "kagent": "iterative-refiner" + }, + "name": "iterative-refiner", + "namespace": "test", + "ownerReferences": [ + { + "apiVersion": "kagent.dev/v1alpha2", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Agent", + "name": "iterative-refiner", + "uid": "" + } + ] + }, + "stringData": { + "agent-card.json": "{\"name\":\"iterative_refiner\",\"description\":\"Iteratively refines content through write-critique cycles\",\"url\":\"http://iterative-refiner.test:8080\",\"version\":\"\",\"capabilities\":{\"streaming\":true,\"pushNotifications\":false,\"stateTransitionHistory\":true},\"defaultInputModes\":[\"text\"],\"defaultOutputModes\":[\"text\"],\"skills\":[],\"preferredTransport\":\"JSONRPC\"}", + "config.json": "{\"model\":{\"type\":\"openai\",\"model\":\"gpt-4o\",\"base_url\":\"\"},\"description\":\"Iteratively refines content through write-critique cycles\",\"instruction\":\"\",\"stream\":false,\"workflow\":{\"type\":\"loop\",\"sub_agents\":[{\"name\":\"writer\",\"description\":\"Writes or refines content\",\"instruction\":\"Write or refine content based on feedback.\",\"model\":{\"type\":\"openai\",\"model\":\"gpt-4o\",\"base_url\":\"\"}},{\"name\":\"critic\",\"description\":\"Critiques content\",\"instruction\":\"Critique the content. If satisfactory, escalate to stop.\",\"model\":{\"type\":\"openai\",\"model\":\"gpt-4o\",\"base_url\":\"\"}}],\"max_iterations\":5}}" + } + }, + { + "apiVersion": "v1", + "kind": "ServiceAccount", + "metadata": { + "labels": { + "app": "kagent", + "app.kubernetes.io/managed-by": "kagent", + "app.kubernetes.io/name": "iterative-refiner", + "app.kubernetes.io/part-of": "kagent", + "kagent": "iterative-refiner" + }, + "name": "iterative-refiner", + "namespace": "test", + "ownerReferences": [ + { + "apiVersion": "kagent.dev/v1alpha2", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Agent", + "name": "iterative-refiner", + "uid": "" + } + ] + } + }, + { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "labels": { + "app": "kagent", + "app.kubernetes.io/managed-by": "kagent", + "app.kubernetes.io/name": "iterative-refiner", + "app.kubernetes.io/part-of": "kagent", + "kagent": "iterative-refiner" + }, + "name": "iterative-refiner", + "namespace": "test", + "ownerReferences": [ + { + "apiVersion": "kagent.dev/v1alpha2", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Agent", + "name": "iterative-refiner", + "uid": "" + } + ] + }, + "spec": { + "selector": { + "matchLabels": { + "app": "kagent", + "kagent": "iterative-refiner" + } + }, + "strategy": { + "rollingUpdate": { + "maxSurge": 1, + "maxUnavailable": 0 + }, + "type": "RollingUpdate" + }, + "template": { + "metadata": { + "annotations": { + "kagent.dev/config-hash": "1491071081710492384" + }, + "labels": { + "app": "kagent", + "app.kubernetes.io/managed-by": "kagent", + "app.kubernetes.io/name": "iterative-refiner", + "app.kubernetes.io/part-of": "kagent", + "kagent": "iterative-refiner" + } + }, + "spec": { + "containers": [ + { + "args": [ + "--host", + "0.0.0.0", + "--port", + "8080", + "--filepath", + "/config" + ], + "env": [ + { + "name": "OPENAI_API_KEY", + "valueFrom": { + "secretKeyRef": { + "key": "api-key", + "name": "openai-secret" + } + } + }, + { + "name": "KAGENT_NAMESPACE", + "valueFrom": { + "fieldRef": { + "fieldPath": "metadata.namespace" + } + } + }, + { + "name": "KAGENT_NAME", + "value": "iterative-refiner" + }, + { + "name": "KAGENT_URL", + "value": "http://kagent-controller.kagent:8083" + } + ], + "image": "cr.kagent.dev/kagent-dev/kagent/app:dev", + "imagePullPolicy": "IfNotPresent", + "name": "kagent", + "ports": [ + { + "containerPort": 8080, + "name": "http" + } + ], + "readinessProbe": { + "httpGet": { + "path": "/.well-known/agent-card.json", + "port": "http" + }, + "initialDelaySeconds": 15, + "periodSeconds": 15, + "timeoutSeconds": 15 + }, + "resources": { + "limits": { + "cpu": "2", + "memory": "1Gi" + }, + "requests": { + "cpu": "100m", + "memory": "384Mi" + } + }, + "volumeMounts": [ + { + "mountPath": "/config", + "name": "config" + }, + { + "mountPath": "/var/run/secrets/tokens", + "name": "kagent-token" + } + ] + } + ], + "serviceAccountName": "iterative-refiner", + "volumes": [ + { + "name": "config", + "secret": { + "secretName": "iterative-refiner" + } + }, + { + "name": "kagent-token", + "projected": { + "sources": [ + { + "serviceAccountToken": { + "audience": "kagent", + "expirationSeconds": 3600, + "path": "kagent-token" + } + } + ] + } + } + ] + } + } + }, + "status": {} + }, + { + "apiVersion": "v1", + "kind": "Service", + "metadata": { + "labels": { + "app": "kagent", + "app.kubernetes.io/managed-by": "kagent", + "app.kubernetes.io/name": "iterative-refiner", + "app.kubernetes.io/part-of": "kagent", + "kagent": "iterative-refiner" + }, + "name": "iterative-refiner", + "namespace": "test", + "ownerReferences": [ + { + "apiVersion": "kagent.dev/v1alpha2", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Agent", + "name": "iterative-refiner", + "uid": "" + } + ] + }, + "spec": { + "ports": [ + { + "name": "http", + "port": 8080, + "targetPort": 8080 + } + ], + "selector": { + "app": "kagent", + "kagent": "iterative-refiner" + }, + "type": "ClusterIP" + }, + "status": { + "loadBalancer": {} + } + } + ] +} \ No newline at end of file diff --git a/go/core/internal/controller/translator/agent/testdata/outputs/agent_with_workflow_sequential.json b/go/core/internal/controller/translator/agent/testdata/outputs/agent_with_workflow_sequential.json new file mode 100644 index 000000000..6ca21a03c --- /dev/null +++ b/go/core/internal/controller/translator/agent/testdata/outputs/agent_with_workflow_sequential.json @@ -0,0 +1,311 @@ +{ + "agentCard": { + "capabilities": { + "pushNotifications": false, + "stateTransitionHistory": true, + "streaming": true + }, + "defaultInputModes": [ + "text" + ], + "defaultOutputModes": [ + "text" + ], + "description": "A sequential workflow that writes then critiques content", + "name": "writer_critic", + "preferredTransport": "JSONRPC", + "skills": null, + "url": "http://writer-critic.test:8080", + "version": "" + }, + "config": { + "description": "A sequential workflow that writes then critiques content", + "instruction": "", + "model": { + "base_url": "", + "model": "gpt-4o", + "type": "openai" + }, + "stream": false, + "workflow": { + "sub_agents": [ + { + "description": "Writes creative content", + "instruction": "You are a creative writer. Write a compelling paragraph.", + "model": { + "base_url": "", + "model": "gpt-4o", + "type": "openai" + }, + "name": "writer" + }, + { + "description": "Reviews and improves content", + "instruction": "You are a writing critic. Review and improve the content.", + "model": { + "base_url": "", + "model": "gpt-4o", + "type": "openai" + }, + "name": "critic" + } + ], + "type": "sequential" + } + }, + "manifest": [ + { + "apiVersion": "v1", + "kind": "Secret", + "metadata": { + "labels": { + "app": "kagent", + "app.kubernetes.io/managed-by": "kagent", + "app.kubernetes.io/name": "writer-critic", + "app.kubernetes.io/part-of": "kagent", + "kagent": "writer-critic" + }, + "name": "writer-critic", + "namespace": "test", + "ownerReferences": [ + { + "apiVersion": "kagent.dev/v1alpha2", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Agent", + "name": "writer-critic", + "uid": "" + } + ] + }, + "stringData": { + "agent-card.json": "{\"name\":\"writer_critic\",\"description\":\"A sequential workflow that writes then critiques content\",\"url\":\"http://writer-critic.test:8080\",\"version\":\"\",\"capabilities\":{\"streaming\":true,\"pushNotifications\":false,\"stateTransitionHistory\":true},\"defaultInputModes\":[\"text\"],\"defaultOutputModes\":[\"text\"],\"skills\":[],\"preferredTransport\":\"JSONRPC\"}", + "config.json": "{\"model\":{\"type\":\"openai\",\"model\":\"gpt-4o\",\"base_url\":\"\"},\"description\":\"A sequential workflow that writes then critiques content\",\"instruction\":\"\",\"stream\":false,\"workflow\":{\"type\":\"sequential\",\"sub_agents\":[{\"name\":\"writer\",\"description\":\"Writes creative content\",\"instruction\":\"You are a creative writer. Write a compelling paragraph.\",\"model\":{\"type\":\"openai\",\"model\":\"gpt-4o\",\"base_url\":\"\"}},{\"name\":\"critic\",\"description\":\"Reviews and improves content\",\"instruction\":\"You are a writing critic. Review and improve the content.\",\"model\":{\"type\":\"openai\",\"model\":\"gpt-4o\",\"base_url\":\"\"}}]}}" + } + }, + { + "apiVersion": "v1", + "kind": "ServiceAccount", + "metadata": { + "labels": { + "app": "kagent", + "app.kubernetes.io/managed-by": "kagent", + "app.kubernetes.io/name": "writer-critic", + "app.kubernetes.io/part-of": "kagent", + "kagent": "writer-critic" + }, + "name": "writer-critic", + "namespace": "test", + "ownerReferences": [ + { + "apiVersion": "kagent.dev/v1alpha2", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Agent", + "name": "writer-critic", + "uid": "" + } + ] + } + }, + { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "labels": { + "app": "kagent", + "app.kubernetes.io/managed-by": "kagent", + "app.kubernetes.io/name": "writer-critic", + "app.kubernetes.io/part-of": "kagent", + "kagent": "writer-critic" + }, + "name": "writer-critic", + "namespace": "test", + "ownerReferences": [ + { + "apiVersion": "kagent.dev/v1alpha2", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Agent", + "name": "writer-critic", + "uid": "" + } + ] + }, + "spec": { + "selector": { + "matchLabels": { + "app": "kagent", + "kagent": "writer-critic" + } + }, + "strategy": { + "rollingUpdate": { + "maxSurge": 1, + "maxUnavailable": 0 + }, + "type": "RollingUpdate" + }, + "template": { + "metadata": { + "annotations": { + "kagent.dev/config-hash": "9876469571143765314" + }, + "labels": { + "app": "kagent", + "app.kubernetes.io/managed-by": "kagent", + "app.kubernetes.io/name": "writer-critic", + "app.kubernetes.io/part-of": "kagent", + "kagent": "writer-critic" + } + }, + "spec": { + "containers": [ + { + "args": [ + "--host", + "0.0.0.0", + "--port", + "8080", + "--filepath", + "/config" + ], + "env": [ + { + "name": "OPENAI_API_KEY", + "valueFrom": { + "secretKeyRef": { + "key": "api-key", + "name": "openai-secret" + } + } + }, + { + "name": "KAGENT_NAMESPACE", + "valueFrom": { + "fieldRef": { + "fieldPath": "metadata.namespace" + } + } + }, + { + "name": "KAGENT_NAME", + "value": "writer-critic" + }, + { + "name": "KAGENT_URL", + "value": "http://kagent-controller.kagent:8083" + } + ], + "image": "cr.kagent.dev/kagent-dev/kagent/app:dev", + "imagePullPolicy": "IfNotPresent", + "name": "kagent", + "ports": [ + { + "containerPort": 8080, + "name": "http" + } + ], + "readinessProbe": { + "httpGet": { + "path": "/.well-known/agent-card.json", + "port": "http" + }, + "initialDelaySeconds": 15, + "periodSeconds": 15, + "timeoutSeconds": 15 + }, + "resources": { + "limits": { + "cpu": "2", + "memory": "1Gi" + }, + "requests": { + "cpu": "100m", + "memory": "384Mi" + } + }, + "volumeMounts": [ + { + "mountPath": "/config", + "name": "config" + }, + { + "mountPath": "/var/run/secrets/tokens", + "name": "kagent-token" + } + ] + } + ], + "serviceAccountName": "writer-critic", + "volumes": [ + { + "name": "config", + "secret": { + "secretName": "writer-critic" + } + }, + { + "name": "kagent-token", + "projected": { + "sources": [ + { + "serviceAccountToken": { + "audience": "kagent", + "expirationSeconds": 3600, + "path": "kagent-token" + } + } + ] + } + } + ] + } + } + }, + "status": {} + }, + { + "apiVersion": "v1", + "kind": "Service", + "metadata": { + "labels": { + "app": "kagent", + "app.kubernetes.io/managed-by": "kagent", + "app.kubernetes.io/name": "writer-critic", + "app.kubernetes.io/part-of": "kagent", + "kagent": "writer-critic" + }, + "name": "writer-critic", + "namespace": "test", + "ownerReferences": [ + { + "apiVersion": "kagent.dev/v1alpha2", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Agent", + "name": "writer-critic", + "uid": "" + } + ] + }, + "spec": { + "ports": [ + { + "name": "http", + "port": 8080, + "targetPort": 8080 + } + ], + "selector": { + "app": "kagent", + "kagent": "writer-critic" + }, + "type": "ClusterIP" + }, + "status": { + "loadBalancer": {} + } + } + ] +} \ No newline at end of file diff --git a/helm/kagent-crds/templates/kagent.dev_agents.yaml b/helm/kagent-crds/templates/kagent.dev_agents.yaml index c92d4091e..3d4407a63 100644 --- a/helm/kagent-crds/templates/kagent.dev_agents.yaml +++ b/helm/kagent-crds/templates/kagent.dev_agents.yaml @@ -10160,10 +10160,238 @@ spec: rule: '!(!has(self.agent) && self.type == ''Agent'')' maxItems: 20 type: array + workflow: + description: |- + Workflow configures this agent as a workflow orchestrator (Sequential, Parallel, or Loop). + When set, this agent orchestrates in-process sub-agents rather than being an LLM agent itself. + Mutually exclusive with systemMessage, systemMessageFrom, and tools. + properties: + maxIterations: + description: |- + MaxIterations is the maximum number of loop iterations (Loop type only). + The loop exits when a sub-agent escalates or when this limit is reached. + If not set for Loop type, the loop runs indefinitely until a sub-agent escalates. + minimum: 1 + type: integer + subAgents: + description: |- + SubAgents are the in-process LLM agents that this workflow orchestrates. + Each sub-agent runs within the same pod and shares session state. + items: + description: |- + InlineAgentSpec defines an in-process LLM sub-agent within a workflow. + Unlike a top-level Agent CR, inline agents are not separate pods — they run + in-process within the parent workflow agent's pod. + properties: + description: + description: Description is a human-readable description + of what this sub-agent does. + type: string + modelConfig: + description: |- + ModelConfig is the name of the ModelConfig to use for this sub-agent. + If not specified, inherits the parent workflow agent's model config. + Must be in the same namespace as the Agent. + type: string + name: + description: |- + Name is the unique identifier for this sub-agent within the workflow. + Must be a valid Python identifier (letters, digits, underscores; cannot start with a digit). + maxLength: 63 + minLength: 1 + pattern: ^[A-Za-z_][A-Za-z0-9_]*$ + type: string + systemMessage: + description: SystemMessage is the system prompt for + this sub-agent. + minLength: 1 + type: string + tools: + description: |- + Tools are the MCP server tools available to this sub-agent. + Agent-as-tool references are not supported within workflow sub-agents. + items: + properties: + agent: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + namespace: + type: string + required: + - name + type: object + headersFrom: + description: |- + HeadersFrom specifies a list of configuration values to be added as + headers to requests sent to the Tool from this agent. The value of + each header is resolved from either a Secret or ConfigMap in the same + namespace as the Agent. Headers specified here will override any + headers of the same name/key specified on the tool. + items: + description: ValueRef represents a configuration + value + properties: + name: + type: string + value: + type: string + valueFrom: + description: ValueSource defines a source + for configuration values from a Secret + or ConfigMap + properties: + key: + description: The key of the ConfigMap + or Secret. + type: string + name: + description: The name of the ConfigMap + or Secret. + type: string + type: + enum: + - ConfigMap + - Secret + type: string + required: + - key + - name + - type + type: object + required: + - name + type: object + x-kubernetes-validations: + - message: Exactly one of value or valueFrom + must be specified + rule: (has(self.value) && !has(self.valueFrom)) + || (!has(self.value) && has(self.valueFrom)) + type: array + mcpServer: + properties: + allowedHeaders: + description: |- + AllowedHeaders specifies which headers from the A2A request should be + propagated to MCP tool calls. Header names are case-insensitive. + + Authorization header behavior: + - Authorization headers CAN be propagated if explicitly listed in allowedHeaders + - When STS token propagation is enabled, STS-generated Authorization headers + will take precedence and replace any Authorization header from the A2A request + - This is a security measure to prevent request headers from overwriting + authentication tokens generated by the STS integration + + Example: ["x-user-email", "x-tenant-id"] + items: + type: string + type: array + apiGroup: + type: string + kind: + type: string + name: + type: string + namespace: + type: string + requireApproval: + description: |- + RequireApproval lists tool names that require human approval before + execution. Each name must also appear in ToolNames. When a tool in + this list is invoked by the agent, execution pauses and the user is + prompted to approve or reject the call. + items: + type: string + maxItems: 50 + type: array + toolNames: + description: |- + The names of the tools to be provided by the ToolServer + For a list of all the tools provided by the server, + the client can query the status of the ToolServer object after it has been created + items: + type: string + maxItems: 50 + type: array + required: + - name + type: object + x-kubernetes-validations: + - message: each RequireApproval entry must also + appear in ToolNames + rule: '!has(self.requireApproval) || self.requireApproval.all(x, + has(self.toolNames) && x in self.toolNames)' + type: + allOf: + - enum: + - McpServer + - Agent + - enum: + - McpServer + - Agent + description: ToolProviderType represents the tool + provider type + type: string + type: object + x-kubernetes-validations: + - message: type.mcpServer must be nil if the type + is not McpServer + rule: '!(has(self.mcpServer) && self.type != ''McpServer'')' + - message: type.mcpServer must be specified for McpServer + filter.type + rule: '!(!has(self.mcpServer) && self.type == ''McpServer'')' + - message: type.agent must be nil if the type is not + Agent + rule: '!(has(self.agent) && self.type != ''Agent'')' + - message: type.agent must be specified for Agent + filter.type + rule: '!(!has(self.agent) && self.type == ''Agent'')' + maxItems: 20 + type: array + required: + - name + - systemMessage + type: object + maxItems: 10 + minItems: 1 + type: array + type: + allOf: + - enum: + - Sequential + - Parallel + - Loop + - enum: + - Sequential + - Parallel + - Loop + description: Type is the workflow orchestration pattern. + type: string + required: + - subAgents + - type + type: object + x-kubernetes-validations: + - message: maxIterations is only valid for Loop workflow type + rule: self.type == 'Loop' || !has(self.maxIterations) type: object x-kubernetes-validations: - message: systemMessage and systemMessageFrom are mutually exclusive rule: '!has(self.systemMessage) || !has(self.systemMessageFrom)' + - message: workflow is mutually exclusive with systemMessage, systemMessageFrom, + and tools + rule: '!has(self.workflow) || (!has(self.systemMessage) && !has(self.systemMessageFrom) + && !has(self.tools))' + - message: workflow is mutually exclusive with memory, context, and + executeCodeBlocks + rule: '!has(self.workflow) || (!has(self.memory) && !has(self.context) + && !has(self.executeCodeBlocks))' + - message: either workflow or a system message source must be specified + rule: has(self.workflow) || has(self.systemMessage) || has(self.systemMessageFrom) description: type: string sandbox: diff --git a/helm/kagent-crds/templates/kagent.dev_sandboxagents.yaml b/helm/kagent-crds/templates/kagent.dev_sandboxagents.yaml index cfaace48b..d9bb645a7 100644 --- a/helm/kagent-crds/templates/kagent.dev_sandboxagents.yaml +++ b/helm/kagent-crds/templates/kagent.dev_sandboxagents.yaml @@ -7810,10 +7810,238 @@ spec: rule: '!(!has(self.agent) && self.type == ''Agent'')' maxItems: 20 type: array + workflow: + description: |- + Workflow configures this agent as a workflow orchestrator (Sequential, Parallel, or Loop). + When set, this agent orchestrates in-process sub-agents rather than being an LLM agent itself. + Mutually exclusive with systemMessage, systemMessageFrom, and tools. + properties: + maxIterations: + description: |- + MaxIterations is the maximum number of loop iterations (Loop type only). + The loop exits when a sub-agent escalates or when this limit is reached. + If not set for Loop type, the loop runs indefinitely until a sub-agent escalates. + minimum: 1 + type: integer + subAgents: + description: |- + SubAgents are the in-process LLM agents that this workflow orchestrates. + Each sub-agent runs within the same pod and shares session state. + items: + description: |- + InlineAgentSpec defines an in-process LLM sub-agent within a workflow. + Unlike a top-level Agent CR, inline agents are not separate pods — they run + in-process within the parent workflow agent's pod. + properties: + description: + description: Description is a human-readable description + of what this sub-agent does. + type: string + modelConfig: + description: |- + ModelConfig is the name of the ModelConfig to use for this sub-agent. + If not specified, inherits the parent workflow agent's model config. + Must be in the same namespace as the Agent. + type: string + name: + description: |- + Name is the unique identifier for this sub-agent within the workflow. + Must be a valid Python identifier (letters, digits, underscores; cannot start with a digit). + maxLength: 63 + minLength: 1 + pattern: ^[A-Za-z_][A-Za-z0-9_]*$ + type: string + systemMessage: + description: SystemMessage is the system prompt for + this sub-agent. + minLength: 1 + type: string + tools: + description: |- + Tools are the MCP server tools available to this sub-agent. + Agent-as-tool references are not supported within workflow sub-agents. + items: + properties: + agent: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + namespace: + type: string + required: + - name + type: object + headersFrom: + description: |- + HeadersFrom specifies a list of configuration values to be added as + headers to requests sent to the Tool from this agent. The value of + each header is resolved from either a Secret or ConfigMap in the same + namespace as the Agent. Headers specified here will override any + headers of the same name/key specified on the tool. + items: + description: ValueRef represents a configuration + value + properties: + name: + type: string + value: + type: string + valueFrom: + description: ValueSource defines a source + for configuration values from a Secret + or ConfigMap + properties: + key: + description: The key of the ConfigMap + or Secret. + type: string + name: + description: The name of the ConfigMap + or Secret. + type: string + type: + enum: + - ConfigMap + - Secret + type: string + required: + - key + - name + - type + type: object + required: + - name + type: object + x-kubernetes-validations: + - message: Exactly one of value or valueFrom + must be specified + rule: (has(self.value) && !has(self.valueFrom)) + || (!has(self.value) && has(self.valueFrom)) + type: array + mcpServer: + properties: + allowedHeaders: + description: |- + AllowedHeaders specifies which headers from the A2A request should be + propagated to MCP tool calls. Header names are case-insensitive. + + Authorization header behavior: + - Authorization headers CAN be propagated if explicitly listed in allowedHeaders + - When STS token propagation is enabled, STS-generated Authorization headers + will take precedence and replace any Authorization header from the A2A request + - This is a security measure to prevent request headers from overwriting + authentication tokens generated by the STS integration + + Example: ["x-user-email", "x-tenant-id"] + items: + type: string + type: array + apiGroup: + type: string + kind: + type: string + name: + type: string + namespace: + type: string + requireApproval: + description: |- + RequireApproval lists tool names that require human approval before + execution. Each name must also appear in ToolNames. When a tool in + this list is invoked by the agent, execution pauses and the user is + prompted to approve or reject the call. + items: + type: string + maxItems: 50 + type: array + toolNames: + description: |- + The names of the tools to be provided by the ToolServer + For a list of all the tools provided by the server, + the client can query the status of the ToolServer object after it has been created + items: + type: string + maxItems: 50 + type: array + required: + - name + type: object + x-kubernetes-validations: + - message: each RequireApproval entry must also + appear in ToolNames + rule: '!has(self.requireApproval) || self.requireApproval.all(x, + has(self.toolNames) && x in self.toolNames)' + type: + allOf: + - enum: + - McpServer + - Agent + - enum: + - McpServer + - Agent + description: ToolProviderType represents the tool + provider type + type: string + type: object + x-kubernetes-validations: + - message: type.mcpServer must be nil if the type + is not McpServer + rule: '!(has(self.mcpServer) && self.type != ''McpServer'')' + - message: type.mcpServer must be specified for McpServer + filter.type + rule: '!(!has(self.mcpServer) && self.type == ''McpServer'')' + - message: type.agent must be nil if the type is not + Agent + rule: '!(has(self.agent) && self.type != ''Agent'')' + - message: type.agent must be specified for Agent + filter.type + rule: '!(!has(self.agent) && self.type == ''Agent'')' + maxItems: 20 + type: array + required: + - name + - systemMessage + type: object + maxItems: 10 + minItems: 1 + type: array + type: + allOf: + - enum: + - Sequential + - Parallel + - Loop + - enum: + - Sequential + - Parallel + - Loop + description: Type is the workflow orchestration pattern. + type: string + required: + - subAgents + - type + type: object + x-kubernetes-validations: + - message: maxIterations is only valid for Loop workflow type + rule: self.type == 'Loop' || !has(self.maxIterations) type: object x-kubernetes-validations: - message: systemMessage and systemMessageFrom are mutually exclusive rule: '!has(self.systemMessage) || !has(self.systemMessageFrom)' + - message: workflow is mutually exclusive with systemMessage, systemMessageFrom, + and tools + rule: '!has(self.workflow) || (!has(self.systemMessage) && !has(self.systemMessageFrom) + && !has(self.tools))' + - message: workflow is mutually exclusive with memory, context, and + executeCodeBlocks + rule: '!has(self.workflow) || (!has(self.memory) && !has(self.context) + && !has(self.executeCodeBlocks))' + - message: either workflow or a system message source must be specified + rule: has(self.workflow) || has(self.systemMessage) || has(self.systemMessageFrom) description: type: string sandbox: diff --git a/python/packages/kagent-adk/src/kagent/adk/types.py b/python/packages/kagent-adk/src/kagent/adk/types.py index 635fca9c9..d10396ef7 100644 --- a/python/packages/kagent-adk/src/kagent/adk/types.py +++ b/python/packages/kagent-adk/src/kagent/adk/types.py @@ -3,7 +3,7 @@ import httpx from agentsts.adk import ADKTokenPropagationPlugin -from google.adk.agents import Agent +from google.adk.agents import Agent, BaseAgent from google.adk.agents.callback_context import CallbackContext from google.adk.agents.llm_agent import ToolUnion from google.adk.agents.readonly_context import ReadonlyContext @@ -284,6 +284,25 @@ class NetworkConfig(BaseModel): allowed_domains: list[str] = Field(default_factory=list) +class SubAgentConfig(BaseModel): + """Configuration for an in-process LLM sub-agent within a workflow.""" + + name: str + description: str = "" + instruction: str + model: ModelUnion = Field(discriminator="type") + http_tools: list[HttpMcpServerConfig] | None = None + sse_tools: list[SseMcpServerConfig] | None = None + + +class WorkflowAgentConfig(BaseModel): + """Configuration for a workflow agent that orchestrates in-process sub-agents.""" + + type: Literal["sequential", "parallel", "loop"] + sub_agents: list[SubAgentConfig] + max_iterations: int | None = None + + class AgentConfig(BaseModel): model: ModelUnion = Field(discriminator="type") description: str @@ -296,8 +315,14 @@ class AgentConfig(BaseModel): memory: MemoryConfig | None = None # Memory configuration network: NetworkConfig | None = None context_config: ContextConfig | None = None + workflow: WorkflowAgentConfig | None = None - def to_agent(self, name: str, sts_integration: Optional[ADKTokenPropagationPlugin] = None) -> Agent: + def to_agent(self, name: str, sts_integration: Optional[ADKTokenPropagationPlugin] = None) -> BaseAgent: + if self.workflow is not None: + return self._build_workflow_agent(name, sts_integration) + return self._build_llm_agent(name, sts_integration) + + def _build_llm_agent(self, name: str, sts_integration: Optional[ADKTokenPropagationPlugin] = None) -> Agent: if name is None or not str(name).strip(): raise ValueError("Agent name must be a non-empty string.") tools: list[ToolUnion] = [] @@ -436,6 +461,99 @@ async def rewrite_url_to_proxy(request: httpx.Request) -> None: return agent + def _build_workflow_agent( + self, name: str, sts_integration: Optional[ADKTokenPropagationPlugin] = None + ) -> BaseAgent: + """Build a workflow agent (Sequential, Parallel, or Loop) from config.""" + from google.adk.agents import LoopAgent, ParallelAgent, SequentialAgent + + sub_agents = [self._build_sub_agent(sub_cfg, sts_integration) for sub_cfg in self.workflow.sub_agents] + + workflow_type = self.workflow.type + if workflow_type == "sequential": + return SequentialAgent( + name=name, + description=self.description, + sub_agents=sub_agents, + ) + elif workflow_type == "parallel": + return ParallelAgent( + name=name, + description=self.description, + sub_agents=sub_agents, + ) + elif workflow_type == "loop": + kwargs: dict[str, Any] = { + "name": name, + "description": self.description, + "sub_agents": sub_agents, + } + if self.workflow.max_iterations is not None: + kwargs["max_iterations"] = self.workflow.max_iterations + return LoopAgent(**kwargs) + else: + raise ValueError(f"Unknown workflow type: {workflow_type}") + + def _build_sub_agent( + self, sub_cfg: SubAgentConfig, sts_integration: Optional[ADKTokenPropagationPlugin] = None + ) -> Agent: + """Build an in-process LLM Agent from a SubAgentConfig.""" + tools: list[ToolUnion] = [] + sts_header_provider = None + if sts_integration: + sts_header_provider = sts_integration.header_provider + + tools_requiring_approval: set[str] = set() + + if sub_cfg.http_tools: + for http_tool in sub_cfg.http_tools: + tool_header_provider = create_header_provider( + allowed_headers=http_tool.allowed_headers, + sts_header_provider=sts_header_provider, + ) + tools.append( + KAgentMcpToolset( + connection_params=http_tool.params, + tool_filter=http_tool.tools, + header_provider=tool_header_provider, + ) + ) + if http_tool.require_approval: + tools_requiring_approval.update(http_tool.require_approval) + + if sub_cfg.sse_tools: + for sse_tool in sub_cfg.sse_tools: + tool_header_provider = create_header_provider( + allowed_headers=sse_tool.allowed_headers, + sts_header_provider=sts_header_provider, + ) + tools.append( + KAgentMcpToolset( + connection_params=sse_tool.params, + tool_filter=sse_tool.tools, + header_provider=tool_header_provider, + ) + ) + if sse_tool.require_approval: + tools_requiring_approval.update(sse_tool.require_approval) + + model = _create_llm_from_model_config(sub_cfg.model) + + tools.append(AskUserTool()) + + before_tool_callback = make_approval_callback(tools_requiring_approval) if tools_requiring_approval else None + before_model_callback = strip_confirmation_parts_callback if tools_requiring_approval else None + + return Agent( + name=sub_cfg.name, + model=model, + description=sub_cfg.description, + static_instruction=sub_cfg.instruction, + tools=tools, + before_tool_callback=before_tool_callback, + before_model_callback=before_model_callback, + ) + def _configure_memory(self, agent: Agent) -> None: """Configures the agent to properly use memory. diff --git a/python/packages/kagent-adk/tests/unittests/test_workflow_agents.py b/python/packages/kagent-adk/tests/unittests/test_workflow_agents.py new file mode 100644 index 000000000..c5215f34e --- /dev/null +++ b/python/packages/kagent-adk/tests/unittests/test_workflow_agents.py @@ -0,0 +1,137 @@ +"""Tests for workflow agent construction from AgentConfig.""" + +from unittest.mock import MagicMock, patch + +import pytest +from google.adk.agents import LoopAgent, ParallelAgent, SequentialAgent +from google.adk.models import BaseLlm + +from kagent.adk.types import AgentConfig, SubAgentConfig, WorkflowAgentConfig + + +def _make_model_dict() -> dict: + """Create a minimal OpenAI model config dict.""" + return { + "type": "openai", + "model": "gpt-4o", + "base_url": "", + } + + +def _make_mock_llm(): + """Create a mock LLM that passes pydantic validation.""" + mock = MagicMock(spec=BaseLlm) + mock.model = "gpt-4o" + return mock + + +def _make_agent_config(workflow_type: str, max_iterations: int | None = None) -> AgentConfig: + """Create an AgentConfig with a workflow configuration.""" + sub_agents = [ + SubAgentConfig( + name="writer", + description="Writes content", + instruction="You are a writer.", + model=_make_model_dict(), + ), + SubAgentConfig( + name="critic", + description="Reviews content", + instruction="You are a critic.", + model=_make_model_dict(), + ), + ] + + workflow = WorkflowAgentConfig( + type=workflow_type, + sub_agents=sub_agents, + max_iterations=max_iterations, + ) + + return AgentConfig( + model=_make_model_dict(), + description="Test workflow agent", + instruction="", + workflow=workflow, + ) + + +@patch("kagent.adk.types._create_llm_from_model_config") +def test_sequential_workflow(mock_llm): + """to_agent() returns a SequentialAgent for type='sequential'.""" + mock_llm.return_value = _make_mock_llm() + config = _make_agent_config("sequential") + agent = config.to_agent("test_sequential") + assert isinstance(agent, SequentialAgent) + assert agent.name == "test_sequential" + assert len(agent.sub_agents) == 2 + assert agent.sub_agents[0].name == "writer" + assert agent.sub_agents[1].name == "critic" + + +@patch("kagent.adk.types._create_llm_from_model_config") +def test_parallel_workflow(mock_llm): + """to_agent() returns a ParallelAgent for type='parallel'.""" + mock_llm.return_value = _make_mock_llm() + config = _make_agent_config("parallel") + agent = config.to_agent("test_parallel") + assert isinstance(agent, ParallelAgent) + assert agent.name == "test_parallel" + assert len(agent.sub_agents) == 2 + + +@patch("kagent.adk.types._create_llm_from_model_config") +def test_loop_workflow(mock_llm): + """to_agent() returns a LoopAgent for type='loop' with max_iterations.""" + mock_llm.return_value = _make_mock_llm() + config = _make_agent_config("loop", max_iterations=5) + agent = config.to_agent("test_loop") + assert isinstance(agent, LoopAgent) + assert agent.name == "test_loop" + assert len(agent.sub_agents) == 2 + assert agent.max_iterations == 5 + + +@patch("kagent.adk.types._create_llm_from_model_config") +def test_loop_workflow_no_max_iterations(mock_llm): + """to_agent() returns a LoopAgent with no max_iterations when not set.""" + mock_llm.return_value = _make_mock_llm() + config = _make_agent_config("loop") + agent = config.to_agent("test_loop_no_max") + assert isinstance(agent, LoopAgent) + assert agent.max_iterations is None + + +@patch("kagent.adk.types._create_llm_from_model_config") +def test_unknown_workflow_type(mock_llm): + """to_agent() raises ValueError for unknown workflow type.""" + mock_llm.return_value = _make_mock_llm() + # Use model_construct to bypass pydantic validation for Literal type + workflow = WorkflowAgentConfig.model_construct( + type="unknown", + sub_agents=[ + SubAgentConfig( + name="agent1", + instruction="test", + model=_make_model_dict(), + ), + ], + ) + config = AgentConfig( + model=_make_model_dict(), + description="Test", + instruction="", + workflow=workflow, + ) + with pytest.raises(ValueError, match="Unknown workflow type"): + config.to_agent("test_unknown") + + +def test_no_workflow_returns_llm_agent(): + """AgentConfig without workflow has workflow=None.""" + config = AgentConfig( + model=_make_model_dict(), + description="Test regular agent", + instruction="You are helpful.", + ) + assert config.workflow is None