From 2c51f65bc3904606bcd5f62cc7d10c771502eed0 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 5 Aug 2025 11:53:49 -0700 Subject: [PATCH 01/20] adding supervisor agent --- meta-agents-v2/supervisor_agent/agents.yaml | 55 +++++++++++++++++++ meta-agents-v2/supervisor_agent/workflow.yaml | 17 ++++++ 2 files changed, 72 insertions(+) create mode 100644 meta-agents-v2/supervisor_agent/agents.yaml create mode 100644 meta-agents-v2/supervisor_agent/workflow.yaml diff --git a/meta-agents-v2/supervisor_agent/agents.yaml b/meta-agents-v2/supervisor_agent/agents.yaml new file mode 100644 index 0000000..3913332 --- /dev/null +++ b/meta-agents-v2/supervisor_agent/agents.yaml @@ -0,0 +1,55 @@ +apiVersion: maestro/v1alpha1 +kind: Agent +metadata: + name: IntentClassifier + labels: + app: supervisor-agent +spec: + model: deepseek-r1:latest + framework: openai + mode: local + description: | + Classifies user intent to determine whether a query is for workflow generation or YAML editing. + instructions: | + You are an Intent Classification Agent. Your job is to analyze user input and determine whether the user wants to: + + 1. GENERATE_WORKFLOW - Create a new workflow or modify existing workflow structure + 2. EDIT_YAML - Make specific edits to existing YAML files (agents.yaml or workflow.yaml) + + You will receive: + - The user's input message + - Optionally, the current YAML files (agents.yaml and/or workflow.yaml) if they exist + + Classification Rules: + + GENERATE_WORKFLOW if the user wants to: + - Create a new workflow from scratch + - Generate a complete workflow based on a description + - Build a new pipeline or process + - Create a workflow that performs a specific task + - Examples: "Create a workflow that...", "I want to build a pipeline that...", "Generate a workflow for..." + + EDIT_YAML if the user wants to: + - Make specific edits to existing YAML files + - Change configuration values (models, API keys, parameters) + - Fix syntax errors in existing YAML + - Update agent descriptions or metadata + - Modify existing workflow structure or steps + - Add new agents to existing agents.yaml + - Change the sequence of steps in existing workflow + - Examples: "Change the model to gpt-4", "Fix the indentation", "Update the API key", "Change the temperature to 0.8", "Add a new agent", "Modify the workflow steps" + + Output Format: + Simply include the words "GENERATE_WORKFLOW" or "EDIT_YAML" in your response. + + Examples: + - "Based on the user's request, this should be GENERATE_WORKFLOW" + - "This is an EDIT_YAML request" + - "GENERATE_WORKFLOW" + - "EDIT_YAML" + + The system will automatically extract the intent from your response. + + Guidelines: + - Be conservative - if unsure, default to GENERATE_WORKFLOW + - Focus on the user's primary intent, not secondary effects diff --git a/meta-agents-v2/supervisor_agent/workflow.yaml b/meta-agents-v2/supervisor_agent/workflow.yaml new file mode 100644 index 0000000..c8c6d59 --- /dev/null +++ b/meta-agents-v2/supervisor_agent/workflow.yaml @@ -0,0 +1,17 @@ +apiVersion: maestro/v1alpha1 +kind: Workflow +metadata: + name: supervisor-agent + labels: + app: supervisor-agent +spec: + template: + metadata: + labels: + app: supervisor-agent + agents: + - IntentClassifier + prompt: I want to fetch the current stock prices for Apple and Microsoft, and then analyze which one has performed better over the past week. + steps: + - name: classify_intent + agent: IntentClassifier \ No newline at end of file From d551c9940e3275c63b58370b420d3d1a3ec617ab Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 5 Aug 2025 11:54:08 -0700 Subject: [PATCH 02/20] add supervisor route --- api/main.py | 265 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 265 insertions(+) diff --git a/api/main.py b/api/main.py index a79eddf..ca14649 100644 --- a/api/main.py +++ b/api/main.py @@ -648,6 +648,271 @@ def strip_ansi_codes(text): ) +class SupervisorRequest(BaseModel): + content: str + chat_id: Optional[str] = None + +class SupervisorResponse(BaseModel): + intent: str + confidence: float + reasoning: str + response: str + yaml_files: List[Dict[str, str]] + chat_id: str + +@app.post("/api/supervisor", response_model=SupervisorResponse) +async def supervisor_route(request: SupervisorRequest): + """ + Main entry point that uses the supervisor agent to route requests to either + workflow generation or YAML editing based on user intent. + """ + try: + agents_yaml_content = "" + workflow_yaml_content = "" + + if request.chat_id: + try: + yaml_files = db.get_yaml_files(request.chat_id) + for file in yaml_files: + if file['name'] == 'agents.yaml': + agents_yaml_content = file['content'] + elif file['name'] == 'workflow.yaml': + workflow_yaml_content = file['content'] + except Exception as e: + print(f"Warning: Could not fetch YAML files for context: {e}") + + supervisor_prompt = f"""Analyze the user's input and classify their intent as either workflow generation or YAML editing. + +User input: {request.content} + +Current YAML files (if any): +Agents YAML: +{agents_yaml_content} + +Workflow YAML: +{workflow_yaml_content} + +Please classify the user's intent and return a JSON response with intent, confidence, and reasoning.""" + + print(f"Calling supervisor agent with prompt: {supervisor_prompt[:100]}...") + resp = requests.post( + "http://localhost:8005/chat", + json={ + "prompt": supervisor_prompt, + "agent": "IntentClassifier" + }, + timeout=30 + ) + + print(f"Supervisor agent response status: {resp.status_code}") + if resp.status_code != 200: + print(f"Supervisor agent error: {resp.text}") + raise Exception(f"Supervisor agent failed with status {resp.status_code}") + + supervisor_response = resp.json().get("response", "") + print(f"Supervisor agent response: {supervisor_response[:200]}...") + + def extract_intent_from_response(response_text): + """Extract intent from supervisor response, handling any format""" + print(f"Processing supervisor response: {response_text[:200]}...") + if "GENERATE_WORKFLOW" in response_text: + return "GENERATE_WORKFLOW" + elif "EDIT_YAML" in response_text: + return "EDIT_YAML" + else: + return "GENERATE_WORKFLOW" + + intent = extract_intent_from_response(supervisor_response) + print(f"Extracted intent: {intent}") + + # Route to appropriate endpoint based on intent + if intent == "EDIT_YAML": + if not agents_yaml_content and not workflow_yaml_content: + intent = "GENERATE_WORKFLOW" + + if intent == "EDIT_YAML": + file_to_edit = "agents.yaml" if agents_yaml_content else "workflow.yaml" + yaml_content = agents_yaml_content if agents_yaml_content else workflow_yaml_content + edit_resp = requests.post( + "http://localhost:8001/api/edit_yaml", + json={ + "yaml": yaml_content, + "instruction": request.content, + "file_type": file_to_edit.split('.')[0] # 'agents' or 'workflow' + }, + timeout=30 + ) + + if edit_resp.status_code == 200: + edit_data = edit_resp.json() + return SupervisorResponse( + intent=intent, + confidence=1.0, + reasoning="Successfully routed to editing", + response=f"Successfully edited {file_to_edit} based on your request: {request.content}", + yaml_files=[{ + "name": file_to_edit, + "content": edit_data["edited_yaml"] + }], + chat_id=request.chat_id or str(uuid.uuid4()) + ) + else: + intent = "GENERATE_WORKFLOW" + + if intent == "GENERATE_WORKFLOW": + print("Routing to workflow generation...") + try: + print("Calling agent generation on port 8003...") + agents_resp = requests.post( + "http://localhost:8003/chat", + json={"prompt": request.content, "agent": "TaskInterpreter"}, + timeout=120 + ) + print(f"Agent generation response status: {agents_resp.status_code}") + if agents_resp.status_code != 200: + print(f"Agent generation error: {agents_resp.text}") + raise Exception(f"Agents generation failed: {agents_resp.text}") + + agents_output = agents_resp.json().get("response", "") + print(f"Agent generation output length: {len(agents_output)}") + agents_yaml = "" + if "```yaml" in agents_output: + agents_yaml = ( + agents_output.split("```yaml", 1)[-1].split("```", 1)[0].strip() + ) + elif "```" in agents_output: + agents_yaml = agents_output.split("```", 1)[-1].split("```", 1)[0].strip() + else: + yaml_match = re.search(r'apiVersion:.*?(?=\n\n|\Z)', agents_output, re.DOTALL) + if yaml_match: + agents_yaml = yaml_match.group(0).strip() + agents_info = [] + try: + agent_blocks = agents_yaml.split('---') + for block in agent_blocks: + if block.strip(): + agent_data = yaml.safe_load(block) + if agent_data and 'metadata' in agent_data and 'name' in agent_data['metadata']: + name = agent_data['metadata']['name'] + description = agent_data.get('spec', {}).get('description', '') + agents_info.append({ + 'name': name, + 'description': description + }) + except yaml.YAMLError: + name_matches = re.findall(r'name:\s*(\w+)', agents_yaml) + desc_matches = re.findall(r'description:\s*\|\s*\n\s*(.+?)(?=\n\s*\w+:|$)', agents_yaml, re.DOTALL) + + for i, name in enumerate(name_matches): + description = desc_matches[i] if i < len(desc_matches) else "" + agents_info.append({ + 'name': name, + 'description': description.strip() + }) + + workflow_prompt = f"Create a workflow that uses the following agents:\n\n" + for i, agent in enumerate(agents_info, 1): + workflow_prompt += f"agent{i}: {agent['name']} – {agent['description']}\n" + workflow_prompt += f"\nprompt: {request.content}" + + print("Calling workflow generation on port 8004...") + workflow_resp = requests.post( + "http://localhost:8004/chat", + json={"prompt": workflow_prompt, "agent": "WorkflowYAMLBuilder"}, + timeout=120 + ) + print(f"Workflow generation response status: {workflow_resp.status_code}") + if workflow_resp.status_code != 200: + print(f"Workflow generation error: {workflow_resp.text}") + raise Exception(f"Workflow generation failed: {workflow_resp.text}") + + workflow_output = workflow_resp.json().get("response", "") + print(f"Workflow generation output length: {len(workflow_output)}") + workflow_yaml = "" + if "```yaml" in workflow_output: + workflow_yaml = ( + workflow_output.split("```yaml", 1)[-1].split("```", 1)[0].strip() + ) + elif "```" in workflow_output: + workflow_yaml = workflow_output.split("```", 1)[-1].split("```", 1)[0].strip() + else: + yaml_match = re.search(r'apiVersion:.*?(?=\n\n|\Z)', workflow_output, re.DOTALL) + if yaml_match: + workflow_yaml = yaml_match.group(0).strip() + + print(f"Extracted workflow YAML length: {len(workflow_yaml)}") + + clean_response = f"""✅ Successfully generated both agents.yaml and workflow.yaml from your prompt! + +Your request: "{request.content}" + +I've created: +• **agents.yaml** - Contains the agent definitions +• **workflow.yaml** - Contains the workflow that uses those agents + +Both files are now available in the YAML panel on the right. You can switch between tabs to view each file.""" + + return SupervisorResponse( + intent=intent, + confidence=1.0, + reasoning="Successfully routed to workflow generation", + response=clean_response, + yaml_files=[ + {"name": "agents.yaml", "content": agents_yaml}, + {"name": "workflow.yaml", "content": workflow_yaml} + ], + chat_id=request.chat_id or str(uuid.uuid4()) + ) + except Exception as e: + raise Exception(f"Workflow generation failed: {str(e)}") + + except Exception as e: + print(f"Error in supervisor routing: {str(e)}") + try: + agents_resp = requests.post( + "http://localhost:8003/chat", + json={"prompt": request.content, "agent": "TaskInterpreter"}, + timeout=120 + ) + if agents_resp.status_code != 200: + raise Exception(f"Agents generation failed: {agents_resp.text}") + + agents_output = agents_resp.json().get("response", "") + agents_yaml = "" + if "```yaml" in agents_output: + agents_yaml = ( + agents_output.split("```yaml", 1)[-1].split("```", 1)[0].strip() + ) + elif "```" in agents_output: + agents_yaml = agents_output.split("```", 1)[-1].split("```", 1)[0].strip() + else: + yaml_match = re.search(r'apiVersion:.*?(?=\n\n|\Z)', agents_output, re.DOTALL) + if yaml_match: + agents_yaml = yaml_match.group(0).strip() + + clean_response = f"""✅ Successfully generated agents.yaml from your prompt! + +Your request: "{request.content}" + +I've created: +• **agents.yaml** - Contains the agent definitions + +The file is now available in the YAML panel on the right.""" + + return SupervisorResponse( + intent="GENERATE_WORKFLOW", + confidence=1.0, + reasoning=f"Error in supervisor routing: {str(e)}, falling back to agent generation only", + response=clean_response, + yaml_files=[ + {"name": "agents.yaml", "content": agents_yaml} + ], + chat_id=request.chat_id or str(uuid.uuid4()) + ) + except Exception as fallback_error: + raise HTTPException(status_code=500, detail=f"Supervisor routing failed: {str(e)}, fallback also failed: {str(fallback_error)}") + + @app.get("/api/health") async def health_check(): try: From bf90078e13408bb8174f0e028c8e14b98ed3cbef Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 5 Aug 2025 11:54:27 -0700 Subject: [PATCH 03/20] remove edit button --- src/App.tsx | 169 +++++++++++++++------------------------------------- 1 file changed, 49 insertions(+), 120 deletions(-) diff --git a/src/App.tsx b/src/App.tsx index ab238b9..6c21469 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -3,8 +3,7 @@ import { Sidebar } from './components/Sidebar' import { ChatCanvas } from './components/ChatCanvas' import { YamlPanel } from './components/YamlPanel' import { ChatInput } from './components/ChatInput' -import { apiService, type ChatHistory, type StreamEvent } from './services/api' -import axios from 'axios' +import { apiService, type ChatHistory, type ChatSession } from './services/api' export interface Message { id: string @@ -24,7 +23,7 @@ function App() { { id: '1', role: 'assistant', - content: 'Hello! I\'m your Maestro AI Builder assistant. I can help you create both agents.yaml and workflow.yaml files from a single prompt. Just describe what you want to build, and I\'ll generate both files automatically. What would you like to build today?', + content: 'Hello! I\'m your Maestro AI Builder assistant. I can help you create workflows and edit YAML files. Just describe what you want to build or what changes you want to make, and I\'ll handle it automatically. What would you like to do today?', timestamp: new Date() } ]) @@ -112,7 +111,7 @@ function App() { { id: '1', role: 'assistant', - content: 'Hello! I\'m your Maestro AI Builder assistant. I can help you create both agents.yaml and workflow.yaml files from a single prompt. Just describe what you want to build, and I\'ll generate both files automatically. What would you like to build today?', + content: 'Hello! I\'m your Maestro AI Builder assistant. I can help you create workflows and edit YAML files. Just describe what you want to build or what changes you want to make, and I\'ll handle it automatically. What would you like to do today?', timestamp: new Date() } ]) @@ -221,71 +220,54 @@ function App() { }) try { - if (useStreaming) { - await apiService.streamGenerateMessage(content, { - onEvent: async (event: StreamEvent) => { - if (event.type === 'status') { - setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n… ${event.message}` : `… ${event.message}` } : m)) - } - if (event.type === 'chat_id') { - if (event.chat_id !== currentChatId) { - setCurrentChatId(event.chat_id) - await loadChatHistory() - } - } - if (event.type === 'agents_yaml' || event.type === 'workflow_yaml') { - mergeYaml(event.file) - } - if (event.type === 'ai_output') { - const line = event.line - setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m)) - } - if (event.type === 'final') { - // Add a new assistant message with final content - let parsedText = event.response - try { - const parsedJSON = JSON.parse(parsedText as string) - if ((parsedJSON as any).final_prompt) parsedText = (parsedJSON as any).final_prompt - } catch {} - setMessages(prev => [...prev, { id: String(Date.now() + 2), role: 'assistant', content: parsedText, timestamp: new Date() }]) - // Ensure YAML files reflect final payload - event.yaml_files.forEach(file => mergeYaml(file)) - if (event.chat_id !== currentChatId) { - setCurrentChatId(event.chat_id) - await loadChatHistory() - } - } - if (event.type === 'error') { - setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: `${m.content}\nError: ${event.message}` } : m)) - } - }, - onError: (err: Error) => { - console.error('Streaming error:', err) - }, - onComplete: () => { - setIsLoading(false) - // Close scoped log streams - closeAgentsLogs() - closeWorkflowLogs() + // Use the supervisor endpoint to route to appropriate handler + const apiResponse = await apiService.sendMessage(content, currentChatId || undefined); + + // Parse AI response (final_prompt if available) + let parsedText = apiResponse.response + try { + const parsedJSON = JSON.parse(parsedText) + if (parsedJSON.final_prompt) { + parsedText = parsedJSON.final_prompt } - }, currentChatId || undefined) - } else { - const response = await apiService.sendGenerateMessage(content, currentChatId || undefined) - response.yaml_files.forEach(file => mergeYaml(file)) - setMessages(prev => [...prev, { - id: String(Date.now() + 2), - role: 'assistant', - content: response.response, - timestamp: new Date() - }]) - if (response.chat_id !== currentChatId) { - setCurrentChatId(response.chat_id) - await loadChatHistory() + } catch (e) { + // Not JSON — ignore + } + + // Don't strip YAML code blocks from the response text since we want to show the full response + // The YAML files are handled separately in the yaml_files array + + const assistantMessage: Message = { + id: (Date.now() + 1).toString(), + content: parsedText, + role: 'assistant', + timestamp: new Date() + } + + setMessages(prev => [...prev, assistantMessage]) + + // Update YAML files from API response, ensuring both files are updated + const updatedYamlFiles = yamlFiles.map(file => { + const apiFile = apiResponse.yaml_files.find(apiFile => apiFile.name === file.name) + if (apiFile) { + return { + ...file, + content: apiFile.content + } } - setIsLoading(false) - closeAgentsLogs() - closeWorkflowLogs() + return file + }) + setYamlFiles(updatedYamlFiles) + + // Update chat ID if it changed + if (apiResponse.chat_id !== currentChatId) { + setCurrentChatId(apiResponse.chat_id) + await loadChatHistory() } + + setIsLoading(false) + closeAgentsLogs() + closeWorkflowLogs() } catch (error) { console.error('Error processing message:', error) setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: `${m.content}\nSorry, I encountered an error while processing your request. Please try again.` } : m)) @@ -295,53 +277,6 @@ function App() { } } - // Helper to get the currently active YAML file (default to agents.yaml) - const getCurrentYamlFile = () => { - // For now, just use agents.yaml (can be improved to track active tab) - return yamlFiles.find(f => f.name === 'agents.yaml') || yamlFiles[0] - } - - // Handler for editing YAML - const handleEditYaml = async (instruction: string) => { - const currentYamlFile = getCurrentYamlFile(); - if (!currentYamlFile) return; - setIsLoading(true); - // Add the edit instruction as a user message - const userEditMessage = { - id: Date.now().toString(), - role: 'user' as 'user', - content: `[Edit YAML] ${instruction}`, - timestamp: new Date() - }; - setMessages(prev => [...prev, userEditMessage]); - - try { - const response = await axios.post('/api/edit_yaml', { - yaml: currentYamlFile.content, - instruction, - file_type: currentYamlFile.name.includes('workflow') ? 'workflow' : 'agents', - }); - const editedYaml = response.data.edited_yaml; - // Add the editing agent's response as an assistant message - const assistantEditMessage = { - id: (Date.now() + 1).toString(), - role: 'assistant' as 'assistant', - content: editedYaml, - timestamp: new Date() - }; - setMessages(prev => [...prev, assistantEditMessage]); - - setYamlFiles(yamlFiles.map(f => - f.name === currentYamlFile.name - ? { ...f, content: editedYaml } - : f - )); - } catch (error) { - console.error('Error editing YAML:', error); - } finally { - setIsLoading(false); - } - }; const handleDirectYamlEdit = (fileName: string, newContent: string) => { setYamlFiles(prev => prev.map(f => @@ -370,13 +305,7 @@ function App() { {/* Chat Input */}
- +
From 69f2e55b100d0b0e7abe2f1033bc456147d1c165 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 5 Aug 2025 11:54:49 -0700 Subject: [PATCH 04/20] remove edit button --- api/main.py | 1 + src/components/ChatInput.tsx | 20 +++----------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/api/main.py b/api/main.py index ca14649..3f331d9 100644 --- a/api/main.py +++ b/api/main.py @@ -21,6 +21,7 @@ import asyncio from pathlib import Path import httpx +import requests # Initialize FastAPI app app = FastAPI( diff --git a/src/components/ChatInput.tsx b/src/components/ChatInput.tsx index 5287c07..1ff4bd1 100644 --- a/src/components/ChatInput.tsx +++ b/src/components/ChatInput.tsx @@ -4,14 +4,14 @@ import { Send, Paperclip, Mic, ChevronDown, Lightbulb } from 'lucide-react' import { cn } from '../lib/utils' interface ChatInputProps { - onSendMessage: (message: string, useStreaming: boolean) => void + onSendMessage: (message: string) => void onEditYaml?: (instruction: string) => void disabled?: boolean streamingEnabled?: boolean onToggleStreaming?: (enabled: boolean) => void } -export function ChatInput({ onSendMessage, onEditYaml, disabled = false, streamingEnabled = true, onToggleStreaming }: ChatInputProps) { +export function ChatInput({ onSendMessage, onEditYaml, disabled = false }: ChatInputProps) { const [message, setMessage] = useState('') const [isTyping, setIsTyping] = useState(false) const [showSuggestions, setShowSuggestions] = useState(false) @@ -74,21 +74,7 @@ export function ChatInput({ onSendMessage, onEditYaml, disabled = false, streami - {/* Edit YAML Button */} - {onEditYaml && ( - - )} + {/* Suggestions Dropdown */}
From 16fc588794d708f43c3209400f09f9063f642371 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 5 Aug 2025 11:55:01 -0700 Subject: [PATCH 05/20] first send to supervisor endpoint --- src/services/api.ts | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/services/api.ts b/src/services/api.ts index 1ceaeb2..a75f705 100644 --- a/src/services/api.ts +++ b/src/services/api.ts @@ -70,34 +70,37 @@ class ApiService { async sendMessage(message: string, chatId?: string): Promise { try { - const response = await fetch(`${API_BASE_URL}/api/chat_builder_agent`, { + const response = await fetch(`${API_BASE_URL}/api/supervisor`, { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ content: message, - role: 'user', chat_id: chatId || this.currentChatId - } as ChatMessage & { chat_id?: string }) + }) }) if (!response.ok) { throw new Error(`HTTP error! status: ${response.status}`) } - const data: ChatResponse = await response.json() - - // Store the chat ID for future messages + const data = await response.json() + this.currentChatId = data.chat_id - // Ensure YAML files have proper formatting - data.yaml_files = data.yaml_files.map(file => ({ + data.yaml_files = data.yaml_files.map((file: any) => ({ ...file, content: this.formatYamlContent(file.content) })) - return data + const chatResponse: ChatResponse = { + response: data.response, + yaml_files: data.yaml_files, + chat_id: data.chat_id + } + + return chatResponse } catch (error) { console.error('Error sending message:', error) // Return a fallback response if API is not available From c45ac6844d76b9367a2e5bef3aff5f66bda07e57 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 5 Aug 2025 11:55:12 -0700 Subject: [PATCH 06/20] bind 8005 to supervisor --- start.sh | 12 +++++++++++- stop.sh | 4 +++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/start.sh b/start.sh index cc378e5..8c8321c 100755 --- a/start.sh +++ b/start.sh @@ -74,6 +74,14 @@ EDITING_AGENT_PID=$! echo $EDITING_AGENT_PID > logs/editing_agent.pid print_success "Editing Agent backend started with PID: $EDITING_AGENT_PID (port 8002)" +# Start Supervisor Agent backend (port 8005) +SUPERVISOR_AGENT_CMD="maestro serve ./meta-agents-v2/supervisor_agent/agents.yaml ./meta-agents-v2/supervisor_agent/workflow.yaml --port 8005" +print_status "Starting Supervisor Agent backend: $SUPERVISOR_AGENT_CMD" +nohup $SUPERVISOR_AGENT_CMD > logs/supervisor_agent.log 2>&1 & +SUPERVISOR_AGENT_PID=$! +echo $SUPERVISOR_AGENT_PID > logs/supervisor_agent.pid +print_success "Supervisor Agent backend started with PID: $SUPERVISOR_AGENT_PID (port 8005)" + ### ───────────── Start API ───────────── print_status "Starting Maestro API service..." @@ -176,6 +184,7 @@ echo "Services:" echo " - Agent Generation Backend: http://localhost:8003" echo " - Workflow Generation Backend: http://localhost:8004" echo " - Editing Agent Backend: http://localhost:8002" +echo " - Supervisor Agent Backend: http://localhost:8005" echo " - API: http://localhost:8001" echo " - API Docs: http://localhost:8001/docs" echo " - Builder Frontend: http://localhost:5174" @@ -184,8 +193,9 @@ echo "Logs:" echo " - Agent Generation: logs/maestro_agents.log" echo " - Workflow Generation: logs/maestro_workflow.log" echo " - Editing Agent: logs/editing_agent.log" +echo " - Supervisor Agent: logs/supervisor_agent.log" echo " - API: logs/api.log" echo " - Builder: logs/builder.log" echo "" echo "To stop all services, run: ./stop.sh" -echo "To view logs: tail -f logs/api.log | tail -f logs/builder.log | tail -f logs/maestro_agents.log | tail -f logs/maestro_workflow.log | tail -f logs/editing_agent.log" +echo "To view logs: tail -f logs/api.log | tail -f logs/builder.log | tail -f logs/maestro_agents.log | tail -f logs/maestro_workflow.log | tail -f logs/editing_agent.log | tail -f logs/supervisor_agent.log" diff --git a/stop.sh b/stop.sh index 587bf96..526ba0b 100755 --- a/stop.sh +++ b/stop.sh @@ -20,10 +20,11 @@ PID_FILES=( "logs/maestro_agents.pid" "logs/maestro_workflow.pid" "logs/editing_agent.pid" + "logs/supervisor_agent.pid" "logs/api.pid" "logs/builder.pid" ) -PORTS=(8000 8001 8002 8003 8004 5174) +PORTS=(8000 8001 8002 8003 8004 8005 5174) CLEAR_LOGS=false for arg in "$@"; do @@ -82,6 +83,7 @@ if [ "$CLEAR_LOGS" = true ]; then > logs/maestro_agents.log > logs/maestro_workflow.log > logs/editing_agent.log + > logs/supervisor_agent.log > logs/maestro.log print_success "All log files have been cleared." fi From 66c53631bb2fecf083dfaf9c3455383109ba47d1 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Fri, 8 Aug 2025 10:27:03 -0700 Subject: [PATCH 07/20] refactor logic into classes --- api/main.py | 359 ++++++++++++++++++++++++++++------------------------ 1 file changed, 192 insertions(+), 167 deletions(-) diff --git a/api/main.py b/api/main.py index 3f331d9..8a0ea95 100644 --- a/api/main.py +++ b/api/main.py @@ -22,6 +22,7 @@ from pathlib import Path import httpx import requests +from enum import Enum # Initialize FastAPI app app = FastAPI( @@ -105,6 +106,154 @@ class ValidateYamlResponse(BaseModel): message: str errors: List[str] = [] +class Intent(str, Enum): + GENERATE_WORKFLOW = "GENERATE_WORKFLOW" + EDIT_YAML = "EDIT_YAML" + + +class ClassifyUserRequest: + """Encapsulates supervisor call and intent extraction.""" + + SUPERVISOR_URL = "http://localhost:8005/chat" + + def classify( + self, + user_input: str, + agents_yaml_content: str, + workflow_yaml_content: str, + timeout_seconds: int = 30, + ) -> Intent: + supervisor_prompt = self._build_prompt( + user_input, agents_yaml_content, workflow_yaml_content + ) + print(f"Calling supervisor agent with prompt: {supervisor_prompt[:100]}...") + resp = requests.post( + self.SUPERVISOR_URL, + json={"prompt": supervisor_prompt, "agent": "IntentClassifier"}, + timeout=timeout_seconds, + ) + + print(f"Supervisor agent response status: {resp.status_code}") + if resp.status_code != 200: + print(f"Supervisor agent error: {resp.text}") + raise Exception( + f"Supervisor agent failed with status {resp.status_code}" + ) + + supervisor_response = resp.json().get("response", "") + print(f"Supervisor agent response: {supervisor_response[:200]}...") + return self._extract_intent(supervisor_response) + + def _build_prompt( + self, user_input: str, agents_yaml_content: str, workflow_yaml_content: str + ) -> str: + return f"""Analyze the user's input and classify their intent as either workflow generation or YAML editing. + +User input: {user_input} + +Current YAML files (if any): +Agents YAML: +{agents_yaml_content} + +Workflow YAML: +{workflow_yaml_content} + +Please classify the user's intent and return a JSON response with intent, confidence, and reasoning.""" + + def _extract_intent(self, response_text: str) -> Intent: + print(f"Processing supervisor response: {response_text[:200]}...") + if "GENERATE_WORKFLOW" in response_text: + return Intent.GENERATE_WORKFLOW + if "EDIT_YAML" in response_text: + return Intent.EDIT_YAML + return Intent.GENERATE_WORKFLOW + + +def _extract_yaml_from_model_output(text: str) -> str: + if "```yaml" in text: + return text.split("```yaml", 1)[-1].split("```", 1)[0].strip() + if "```" in text: + return text.split("```", 1)[-1].split("```", 1)[0].strip() + yaml_match = re.search(r"apiVersion:.*?(?=\n\n|\Z)", text, re.DOTALL) + return yaml_match.group(0).strip() if yaml_match else "" + + +def _parse_agents_yaml_to_info(agents_yaml: str) -> List[Dict[str, str]]: + agents_info: List[Dict[str, str]] = [] + try: + agent_blocks = agents_yaml.split("---") + for block in agent_blocks: + if block.strip(): + agent_data = yaml.safe_load(block) + if ( + agent_data + and "metadata" in agent_data + and "name" in agent_data["metadata"] + ): + name = agent_data["metadata"]["name"] + description = agent_data.get("spec", {}).get("description", "") + agents_info.append({"name": name, "description": description}) + except yaml.YAMLError: + name_matches = re.findall(r"name:\s*(\w+)", agents_yaml) + desc_matches = re.findall( + r"description:\s*\|\s*\n\s*(.+?)(?=\n\s*\w+:|$)", agents_yaml, re.DOTALL + ) + for i, name in enumerate(name_matches): + description = desc_matches[i] if i < len(desc_matches) else "" + agents_info.append({"name": name, "description": description.strip()}) + return agents_info + + +def _build_workflow_prompt(agents_info: List[Dict[str, str]], user_input: str) -> str: + workflow_prompt = "Create a workflow that uses the following agents:\n\n" + for i, agent in enumerate(agents_info, 1): + workflow_prompt += f"agent{i}: {agent['name']} – {agent['description']}\n" + workflow_prompt += f"\nprompt: {user_input}" + return workflow_prompt + + +def _edit_yaml( + yaml_content: str, file_to_edit: str, instruction: str, timeout_seconds: int = 30 +) -> str: + edit_resp = requests.post( + "http://localhost:8001/api/edit_yaml", + json={ + "yaml": yaml_content, + "instruction": instruction, + "file_type": file_to_edit.split(".")[0], + }, + timeout=timeout_seconds, + ) + if edit_resp.status_code != 200: + raise Exception("Editing agent failed") + return edit_resp.json().get("edited_yaml", "") + + +def _generate_agents_yaml(user_input: str, timeout_seconds: int = 120) -> str: + agents_resp = requests.post( + "http://localhost:8003/chat", + json={"prompt": user_input, "agent": "TaskInterpreter"}, + timeout=timeout_seconds, + ) + if agents_resp.status_code != 200: + raise Exception(f"Agents generation failed: {agents_resp.text}") + agents_output = agents_resp.json().get("response", "") + return _extract_yaml_from_model_output(agents_output) + + +def _generate_workflow_yaml( + workflow_prompt: str, timeout_seconds: int = 120 +) -> str: + workflow_resp = requests.post( + "http://localhost:8004/chat", + json={"prompt": workflow_prompt, "agent": "WorkflowYAMLBuilder"}, + timeout=timeout_seconds, + ) + if workflow_resp.status_code != 200: + raise Exception(f"Workflow generation failed: {workflow_resp.text}") + workflow_output = workflow_resp.json().get("response", "") + return _extract_yaml_from_model_output(workflow_output) + # --------------------------------------- # Service Functions # --------------------------------------- @@ -682,168 +831,44 @@ async def supervisor_route(request: SupervisorRequest): except Exception as e: print(f"Warning: Could not fetch YAML files for context: {e}") - supervisor_prompt = f"""Analyze the user's input and classify their intent as either workflow generation or YAML editing. - -User input: {request.content} - -Current YAML files (if any): -Agents YAML: -{agents_yaml_content} - -Workflow YAML: -{workflow_yaml_content} - -Please classify the user's intent and return a JSON response with intent, confidence, and reasoning.""" - - print(f"Calling supervisor agent with prompt: {supervisor_prompt[:100]}...") - resp = requests.post( - "http://localhost:8005/chat", - json={ - "prompt": supervisor_prompt, - "agent": "IntentClassifier" - }, - timeout=30 + classifier = ClassifyUserRequest() + intent = classifier.classify( + request.content, agents_yaml_content, workflow_yaml_content ) - - print(f"Supervisor agent response status: {resp.status_code}") - if resp.status_code != 200: - print(f"Supervisor agent error: {resp.text}") - raise Exception(f"Supervisor agent failed with status {resp.status_code}") - - supervisor_response = resp.json().get("response", "") - print(f"Supervisor agent response: {supervisor_response[:200]}...") - - def extract_intent_from_response(response_text): - """Extract intent from supervisor response, handling any format""" - print(f"Processing supervisor response: {response_text[:200]}...") - if "GENERATE_WORKFLOW" in response_text: - return "GENERATE_WORKFLOW" - elif "EDIT_YAML" in response_text: - return "EDIT_YAML" - else: - return "GENERATE_WORKFLOW" - - intent = extract_intent_from_response(supervisor_response) print(f"Extracted intent: {intent}") - - # Route to appropriate endpoint based on intent - if intent == "EDIT_YAML": + + if intent == Intent.EDIT_YAML: if not agents_yaml_content and not workflow_yaml_content: - intent = "GENERATE_WORKFLOW" - - if intent == "EDIT_YAML": + intent = Intent.GENERATE_WORKFLOW + else: file_to_edit = "agents.yaml" if agents_yaml_content else "workflow.yaml" - yaml_content = agents_yaml_content if agents_yaml_content else workflow_yaml_content - edit_resp = requests.post( - "http://localhost:8001/api/edit_yaml", - json={ - "yaml": yaml_content, - "instruction": request.content, - "file_type": file_to_edit.split('.')[0] # 'agents' or 'workflow' - }, - timeout=30 + yaml_content = ( + agents_yaml_content if agents_yaml_content else workflow_yaml_content ) - - if edit_resp.status_code == 200: - edit_data = edit_resp.json() - return SupervisorResponse( - intent=intent, - confidence=1.0, - reasoning="Successfully routed to editing", - response=f"Successfully edited {file_to_edit} based on your request: {request.content}", - yaml_files=[{ - "name": file_to_edit, - "content": edit_data["edited_yaml"] - }], - chat_id=request.chat_id or str(uuid.uuid4()) - ) - else: - intent = "GENERATE_WORKFLOW" - - if intent == "GENERATE_WORKFLOW": - print("Routing to workflow generation...") - try: - print("Calling agent generation on port 8003...") - agents_resp = requests.post( - "http://localhost:8003/chat", - json={"prompt": request.content, "agent": "TaskInterpreter"}, - timeout=120 + edited_yaml = _edit_yaml( + yaml_content=yaml_content, + file_to_edit=file_to_edit, + instruction=request.content, ) - print(f"Agent generation response status: {agents_resp.status_code}") - if agents_resp.status_code != 200: - print(f"Agent generation error: {agents_resp.text}") - raise Exception(f"Agents generation failed: {agents_resp.text}") - - agents_output = agents_resp.json().get("response", "") - print(f"Agent generation output length: {len(agents_output)}") - agents_yaml = "" - if "```yaml" in agents_output: - agents_yaml = ( - agents_output.split("```yaml", 1)[-1].split("```", 1)[0].strip() - ) - elif "```" in agents_output: - agents_yaml = agents_output.split("```", 1)[-1].split("```", 1)[0].strip() - else: - yaml_match = re.search(r'apiVersion:.*?(?=\n\n|\Z)', agents_output, re.DOTALL) - if yaml_match: - agents_yaml = yaml_match.group(0).strip() - agents_info = [] - try: - agent_blocks = agents_yaml.split('---') - for block in agent_blocks: - if block.strip(): - agent_data = yaml.safe_load(block) - if agent_data and 'metadata' in agent_data and 'name' in agent_data['metadata']: - name = agent_data['metadata']['name'] - description = agent_data.get('spec', {}).get('description', '') - agents_info.append({ - 'name': name, - 'description': description - }) - except yaml.YAMLError: - name_matches = re.findall(r'name:\s*(\w+)', agents_yaml) - desc_matches = re.findall(r'description:\s*\|\s*\n\s*(.+?)(?=\n\s*\w+:|$)', agents_yaml, re.DOTALL) - - for i, name in enumerate(name_matches): - description = desc_matches[i] if i < len(desc_matches) else "" - agents_info.append({ - 'name': name, - 'description': description.strip() - }) - - workflow_prompt = f"Create a workflow that uses the following agents:\n\n" - for i, agent in enumerate(agents_info, 1): - workflow_prompt += f"agent{i}: {agent['name']} – {agent['description']}\n" - workflow_prompt += f"\nprompt: {request.content}" - - print("Calling workflow generation on port 8004...") - workflow_resp = requests.post( - "http://localhost:8004/chat", - json={"prompt": workflow_prompt, "agent": "WorkflowYAMLBuilder"}, - timeout=120 + return SupervisorResponse( + intent=intent.value, + confidence=1.0, + reasoning="Successfully routed to editing", + response=f"Successfully edited {file_to_edit} based on your request: {request.content}", + yaml_files=[{"name": file_to_edit, "content": edited_yaml}], + chat_id=request.chat_id or str(uuid.uuid4()), ) - print(f"Workflow generation response status: {workflow_resp.status_code}") - if workflow_resp.status_code != 200: - print(f"Workflow generation error: {workflow_resp.text}") - raise Exception(f"Workflow generation failed: {workflow_resp.text}") - - workflow_output = workflow_resp.json().get("response", "") - print(f"Workflow generation output length: {len(workflow_output)}") - workflow_yaml = "" - if "```yaml" in workflow_output: - workflow_yaml = ( - workflow_output.split("```yaml", 1)[-1].split("```", 1)[0].strip() - ) - elif "```" in workflow_output: - workflow_yaml = workflow_output.split("```", 1)[-1].split("```", 1)[0].strip() - else: - yaml_match = re.search(r'apiVersion:.*?(?=\n\n|\Z)', workflow_output, re.DOTALL) - if yaml_match: - workflow_yaml = yaml_match.group(0).strip() - - print(f"Extracted workflow YAML length: {len(workflow_yaml)}") - clean_response = f"""✅ Successfully generated both agents.yaml and workflow.yaml from your prompt! + print("Routing to workflow generation...") + try: + print("Calling agent generation on port 8003...") + agents_yaml = _generate_agents_yaml(request.content) + agents_info = _parse_agents_yaml_to_info(agents_yaml) + workflow_prompt = _build_workflow_prompt(agents_info, request.content) + print("Calling workflow generation on port 8004...") + workflow_yaml = _generate_workflow_yaml(workflow_prompt) + + clean_response = f"""✅ Successfully generated both agents.yaml and workflow.yaml from your prompt! Your request: "{request.content}" @@ -853,19 +878,19 @@ def extract_intent_from_response(response_text): Both files are now available in the YAML panel on the right. You can switch between tabs to view each file.""" - return SupervisorResponse( - intent=intent, - confidence=1.0, - reasoning="Successfully routed to workflow generation", - response=clean_response, - yaml_files=[ - {"name": "agents.yaml", "content": agents_yaml}, - {"name": "workflow.yaml", "content": workflow_yaml} - ], - chat_id=request.chat_id or str(uuid.uuid4()) - ) - except Exception as e: - raise Exception(f"Workflow generation failed: {str(e)}") + return SupervisorResponse( + intent=Intent.GENERATE_WORKFLOW.value, + confidence=1.0, + reasoning="Successfully routed to workflow generation", + response=clean_response, + yaml_files=[ + {"name": "agents.yaml", "content": agents_yaml}, + {"name": "workflow.yaml", "content": workflow_yaml}, + ], + chat_id=request.chat_id or str(uuid.uuid4()), + ) + except Exception as e: + raise Exception(f"Workflow generation failed: {str(e)}") except Exception as e: print(f"Error in supervisor routing: {str(e)}") From 3e229463b2c5e86d92d6500744b45d3a75a45eba Mon Sep 17 00:00:00 2001 From: george-lhj Date: Fri, 8 Aug 2025 10:53:59 -0700 Subject: [PATCH 08/20] require json output for supervisor agent --- api/main.py | 106 ++++++++++++-------- meta-agents-v2/supervisor_agent/agents.yaml | 17 ++-- 2 files changed, 75 insertions(+), 48 deletions(-) diff --git a/api/main.py b/api/main.py index 8a0ea95..9e9759e 100644 --- a/api/main.py +++ b/api/main.py @@ -23,6 +23,8 @@ import httpx import requests from enum import Enum +import json +from dataclasses import dataclass # Initialize FastAPI app app = FastAPI( @@ -111,6 +113,13 @@ class Intent(str, Enum): EDIT_YAML = "EDIT_YAML" +@dataclass +class Classification: + intent: Intent + confidence: float + reasoning: str + + class ClassifyUserRequest: """Encapsulates supervisor call and intent extraction.""" @@ -122,32 +131,44 @@ def classify( agents_yaml_content: str, workflow_yaml_content: str, timeout_seconds: int = 30, - ) -> Intent: + ) -> Classification: supervisor_prompt = self._build_prompt( user_input, agents_yaml_content, workflow_yaml_content ) - print(f"Calling supervisor agent with prompt: {supervisor_prompt[:100]}...") resp = requests.post( self.SUPERVISOR_URL, json={"prompt": supervisor_prompt, "agent": "IntentClassifier"}, timeout=timeout_seconds, ) - - print(f"Supervisor agent response status: {resp.status_code}") if resp.status_code != 200: - print(f"Supervisor agent error: {resp.text}") raise Exception( f"Supervisor agent failed with status {resp.status_code}" ) supervisor_response = resp.json().get("response", "") - print(f"Supervisor agent response: {supervisor_response[:200]}...") - return self._extract_intent(supervisor_response) + + try: + parsed = json.loads(supervisor_response) + raw_intent = str(parsed.get("intent", "")).upper() + intent = ( + Intent(raw_intent) + if raw_intent in (Intent.GENERATE_WORKFLOW.value, Intent.EDIT_YAML.value) + else Intent.GENERATE_WORKFLOW + ) + confidence = float(parsed.get("confidence", 1.0)) + reasoning = str(parsed.get("reasoning", "")) + return Classification(intent=intent, confidence=confidence, reasoning=reasoning) + except Exception: + return Classification( + intent=Intent.GENERATE_WORKFLOW, + confidence=0.5, + reasoning="Defaulted due to non-JSON supervisor output", + ) def _build_prompt( self, user_input: str, agents_yaml_content: str, workflow_yaml_content: str ) -> str: - return f"""Analyze the user's input and classify their intent as either workflow generation or YAML editing. + return f"""You are an intent classifier. Determine if the user wants to GENERATE_WORKFLOW or EDIT_YAML. User input: {user_input} @@ -158,17 +179,18 @@ def _build_prompt( Workflow YAML: {workflow_yaml_content} -Please classify the user's intent and return a JSON response with intent, confidence, and reasoning.""" - - def _extract_intent(self, response_text: str) -> Intent: - print(f"Processing supervisor response: {response_text[:200]}...") - if "GENERATE_WORKFLOW" in response_text: - return Intent.GENERATE_WORKFLOW - if "EDIT_YAML" in response_text: - return Intent.EDIT_YAML - return Intent.GENERATE_WORKFLOW +Return ONLY valid JSON (no prose, no markdown) with the following schema: +{{ + "intent": "GENERATE_WORKFLOW" | "EDIT_YAML", + "confidence": number, // 0.0 to 1.0 + "reasoning": string +}} +Example valid responses: +{{"intent":"GENERATE_WORKFLOW","confidence":0.92,"reasoning":"User is asking to create a new flow"}} +{{"intent":"EDIT_YAML","confidence":0.87,"reasoning":"User wants to modify existing YAML"}}""" + def _extract_yaml_from_model_output(text: str) -> str: if "```yaml" in text: return text.split("```yaml", 1)[-1].split("```", 1)[0].strip() @@ -254,6 +276,21 @@ def _generate_workflow_yaml( workflow_output = workflow_resp.json().get("response", "") return _extract_yaml_from_model_output(workflow_output) + +def _build_generation_success_text(user_request: str) -> str: + return ( + "✅ Successfully generated both agents.yaml and workflow.yaml from your prompt!\n\n" + f"Your request: \"{user_request}\"\n\n" + "I've created:\n" + "• **agents.yaml** - Contains the agent definitions\n" + "• **workflow.yaml** - Contains the workflow that uses those agents\n\n" + "Both files are now available in the YAML panel on the right. You can switch between tabs to view each file." + ) + + +def _build_edit_success_text(file_to_edit: str, instruction: str) -> str: + return f"Successfully edited {file_to_edit} based on your request: {instruction}" + # --------------------------------------- # Service Functions # --------------------------------------- @@ -704,7 +741,6 @@ async def delete_chat_session(chat_id: str): @app.post("/api/edit_yaml", response_model=EditYamlResponse) async def edit_yaml(request: EditYamlRequest): try: - # Build the prompt for the editing agent prompt = f"Current YAML file (type: {request.file_type}):\n{request.yaml}\n\nUser instruction: {request.instruction}\n\nPlease apply the requested edit and return only the updated YAML file." resp = requests.post( "http://localhost:8002/chat", @@ -713,7 +749,6 @@ async def edit_yaml(request: EditYamlRequest): if resp.status_code != 200: raise Exception(resp.text) edited_yaml = resp.json().get("response", "") - # Remove markdown formatting if present if "```yaml" in edited_yaml: edited_yaml = ( edited_yaml.split("```yaml", 1)[-1].split("```", 1)[0].strip() @@ -728,7 +763,6 @@ async def edit_yaml(request: EditYamlRequest): @app.post("/api/validate_yaml", response_model=ValidateYamlResponse) async def validate_yaml(request: ValidateYamlRequest): try: - # fix double-escaped characters import codecs unescaped_content = codecs.decode(request.yaml_content, 'unicode_escape') with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_file: @@ -832,14 +866,14 @@ async def supervisor_route(request: SupervisorRequest): print(f"Warning: Could not fetch YAML files for context: {e}") classifier = ClassifyUserRequest() - intent = classifier.classify( + classification = classifier.classify( request.content, agents_yaml_content, workflow_yaml_content ) - print(f"Extracted intent: {intent}") + print(f"Extracted intent: {classification.intent} confidence={classification.confidence}") - if intent == Intent.EDIT_YAML: + if classification.intent == Intent.EDIT_YAML: if not agents_yaml_content and not workflow_yaml_content: - intent = Intent.GENERATE_WORKFLOW + classification.intent = Intent.GENERATE_WORKFLOW else: file_to_edit = "agents.yaml" if agents_yaml_content else "workflow.yaml" yaml_content = ( @@ -851,10 +885,10 @@ async def supervisor_route(request: SupervisorRequest): instruction=request.content, ) return SupervisorResponse( - intent=intent.value, - confidence=1.0, - reasoning="Successfully routed to editing", - response=f"Successfully edited {file_to_edit} based on your request: {request.content}", + intent=classification.intent.value, + confidence=float(classification.confidence), + reasoning=classification.reasoning or "Successfully routed to editing", + response=_build_edit_success_text(file_to_edit, request.content), yaml_files=[{"name": file_to_edit, "content": edited_yaml}], chat_id=request.chat_id or str(uuid.uuid4()), ) @@ -868,21 +902,11 @@ async def supervisor_route(request: SupervisorRequest): print("Calling workflow generation on port 8004...") workflow_yaml = _generate_workflow_yaml(workflow_prompt) - clean_response = f"""✅ Successfully generated both agents.yaml and workflow.yaml from your prompt! - -Your request: "{request.content}" - -I've created: -• **agents.yaml** - Contains the agent definitions -• **workflow.yaml** - Contains the workflow that uses those agents - -Both files are now available in the YAML panel on the right. You can switch between tabs to view each file.""" - return SupervisorResponse( intent=Intent.GENERATE_WORKFLOW.value, - confidence=1.0, - reasoning="Successfully routed to workflow generation", - response=clean_response, + confidence=float(classification.confidence) if classification else 1.0, + reasoning=classification.reasoning or "Successfully routed to workflow generation", + response=_build_generation_success_text(request.content), yaml_files=[ {"name": "agents.yaml", "content": agents_yaml}, {"name": "workflow.yaml", "content": workflow_yaml}, diff --git a/meta-agents-v2/supervisor_agent/agents.yaml b/meta-agents-v2/supervisor_agent/agents.yaml index 3913332..e6dec22 100644 --- a/meta-agents-v2/supervisor_agent/agents.yaml +++ b/meta-agents-v2/supervisor_agent/agents.yaml @@ -5,7 +5,7 @@ metadata: labels: app: supervisor-agent spec: - model: deepseek-r1:latest + model: gpt-oss:latest framework: openai mode: local description: | @@ -40,15 +40,18 @@ spec: - Examples: "Change the model to gpt-4", "Fix the indentation", "Update the API key", "Change the temperature to 0.8", "Add a new agent", "Modify the workflow steps" Output Format: - Simply include the words "GENERATE_WORKFLOW" or "EDIT_YAML" in your response. + Return ONLY valid JSON (no prose, no markdown) with the following schema: + { + "intent": "GENERATE_WORKFLOW" | "EDIT_YAML", + "confidence": number, // 0.0 to 1.0 + "reasoning": string + } Examples: - - "Based on the user's request, this should be GENERATE_WORKFLOW" - - "This is an EDIT_YAML request" - - "GENERATE_WORKFLOW" - - "EDIT_YAML" + {"intent":"GENERATE_WORKFLOW","confidence":0.92,"reasoning":"User is asking to create a new workflow"} + {"intent":"EDIT_YAML","confidence":0.87,"reasoning":"User wants to modify existing YAML content"} - The system will automatically extract the intent from your response. + The system will parse this JSON and route the request accordingly. Guidelines: - Be conservative - if unsure, default to GENERATE_WORKFLOW From 9249c18f7a18de1cf1226f3ace20d49eb4aae560 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 19 Aug 2025 14:27:31 -0700 Subject: [PATCH 09/20] break apart supervisor logic --- api/main.py | 475 ++++++++++++++++++++++------------------------ api/supervisor.py | 398 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 627 insertions(+), 246 deletions(-) create mode 100644 api/supervisor.py diff --git a/api/main.py b/api/main.py index 9e9759e..b337bad 100644 --- a/api/main.py +++ b/api/main.py @@ -11,6 +11,7 @@ from datetime import datetime from api.ai_agent import MaestroBuilderAgent from api.database import Database +from api.supervisor import SupervisorAgent, Intent import uuid import subprocess import tempfile @@ -21,10 +22,7 @@ import asyncio from pathlib import Path import httpx -import requests -from enum import Enum -import json -from dataclasses import dataclass +from concurrent.futures import ThreadPoolExecutor # Initialize FastAPI app app = FastAPI( @@ -108,188 +106,26 @@ class ValidateYamlResponse(BaseModel): message: str errors: List[str] = [] -class Intent(str, Enum): - GENERATE_WORKFLOW = "GENERATE_WORKFLOW" - EDIT_YAML = "EDIT_YAML" - - -@dataclass -class Classification: - intent: Intent - confidence: float - reasoning: str - - -class ClassifyUserRequest: - """Encapsulates supervisor call and intent extraction.""" - - SUPERVISOR_URL = "http://localhost:8005/chat" - - def classify( - self, - user_input: str, - agents_yaml_content: str, - workflow_yaml_content: str, - timeout_seconds: int = 30, - ) -> Classification: - supervisor_prompt = self._build_prompt( - user_input, agents_yaml_content, workflow_yaml_content - ) - resp = requests.post( - self.SUPERVISOR_URL, - json={"prompt": supervisor_prompt, "agent": "IntentClassifier"}, - timeout=timeout_seconds, - ) - if resp.status_code != 200: - raise Exception( - f"Supervisor agent failed with status {resp.status_code}" - ) - - supervisor_response = resp.json().get("response", "") - - try: - parsed = json.loads(supervisor_response) - raw_intent = str(parsed.get("intent", "")).upper() - intent = ( - Intent(raw_intent) - if raw_intent in (Intent.GENERATE_WORKFLOW.value, Intent.EDIT_YAML.value) - else Intent.GENERATE_WORKFLOW - ) - confidence = float(parsed.get("confidence", 1.0)) - reasoning = str(parsed.get("reasoning", "")) - return Classification(intent=intent, confidence=confidence, reasoning=reasoning) - except Exception: - return Classification( - intent=Intent.GENERATE_WORKFLOW, - confidence=0.5, - reasoning="Defaulted due to non-JSON supervisor output", - ) - - def _build_prompt( - self, user_input: str, agents_yaml_content: str, workflow_yaml_content: str - ) -> str: - return f"""You are an intent classifier. Determine if the user wants to GENERATE_WORKFLOW or EDIT_YAML. - -User input: {user_input} - -Current YAML files (if any): -Agents YAML: -{agents_yaml_content} - -Workflow YAML: -{workflow_yaml_content} - -Return ONLY valid JSON (no prose, no markdown) with the following schema: -{{ - "intent": "GENERATE_WORKFLOW" | "EDIT_YAML", - "confidence": number, // 0.0 to 1.0 - "reasoning": string -}} - -Example valid responses: -{{"intent":"GENERATE_WORKFLOW","confidence":0.92,"reasoning":"User is asking to create a new flow"}} -{{"intent":"EDIT_YAML","confidence":0.87,"reasoning":"User wants to modify existing YAML"}}""" - - -def _extract_yaml_from_model_output(text: str) -> str: - if "```yaml" in text: - return text.split("```yaml", 1)[-1].split("```", 1)[0].strip() - if "```" in text: - return text.split("```", 1)[-1].split("```", 1)[0].strip() - yaml_match = re.search(r"apiVersion:.*?(?=\n\n|\Z)", text, re.DOTALL) - return yaml_match.group(0).strip() if yaml_match else "" - - -def _parse_agents_yaml_to_info(agents_yaml: str) -> List[Dict[str, str]]: - agents_info: List[Dict[str, str]] = [] - try: - agent_blocks = agents_yaml.split("---") - for block in agent_blocks: - if block.strip(): - agent_data = yaml.safe_load(block) - if ( - agent_data - and "metadata" in agent_data - and "name" in agent_data["metadata"] - ): - name = agent_data["metadata"]["name"] - description = agent_data.get("spec", {}).get("description", "") - agents_info.append({"name": name, "description": description}) - except yaml.YAMLError: - name_matches = re.findall(r"name:\s*(\w+)", agents_yaml) - desc_matches = re.findall( - r"description:\s*\|\s*\n\s*(.+?)(?=\n\s*\w+:|$)", agents_yaml, re.DOTALL - ) - for i, name in enumerate(name_matches): - description = desc_matches[i] if i < len(desc_matches) else "" - agents_info.append({"name": name, "description": description.strip()}) - return agents_info - - -def _build_workflow_prompt(agents_info: List[Dict[str, str]], user_input: str) -> str: - workflow_prompt = "Create a workflow that uses the following agents:\n\n" - for i, agent in enumerate(agents_info, 1): - workflow_prompt += f"agent{i}: {agent['name']} – {agent['description']}\n" - workflow_prompt += f"\nprompt: {user_input}" - return workflow_prompt - - -def _edit_yaml( - yaml_content: str, file_to_edit: str, instruction: str, timeout_seconds: int = 30 -) -> str: - edit_resp = requests.post( - "http://localhost:8001/api/edit_yaml", - json={ - "yaml": yaml_content, - "instruction": instruction, - "file_type": file_to_edit.split(".")[0], - }, - timeout=timeout_seconds, - ) - if edit_resp.status_code != 200: - raise Exception("Editing agent failed") - return edit_resp.json().get("edited_yaml", "") - - -def _generate_agents_yaml(user_input: str, timeout_seconds: int = 120) -> str: - agents_resp = requests.post( - "http://localhost:8003/chat", - json={"prompt": user_input, "agent": "TaskInterpreter"}, - timeout=timeout_seconds, - ) - if agents_resp.status_code != 200: - raise Exception(f"Agents generation failed: {agents_resp.text}") - agents_output = agents_resp.json().get("response", "") - return _extract_yaml_from_model_output(agents_output) - - -def _generate_workflow_yaml( - workflow_prompt: str, timeout_seconds: int = 120 -) -> str: - workflow_resp = requests.post( - "http://localhost:8004/chat", - json={"prompt": workflow_prompt, "agent": "WorkflowYAMLBuilder"}, - timeout=timeout_seconds, - ) - if workflow_resp.status_code != 200: - raise Exception(f"Workflow generation failed: {workflow_resp.text}") - workflow_output = workflow_resp.json().get("response", "") - return _extract_yaml_from_model_output(workflow_output) - - -def _build_generation_success_text(user_request: str) -> str: - return ( - "✅ Successfully generated both agents.yaml and workflow.yaml from your prompt!\n\n" - f"Your request: \"{user_request}\"\n\n" - "I've created:\n" - "• **agents.yaml** - Contains the agent definitions\n" - "• **workflow.yaml** - Contains the workflow that uses those agents\n\n" - "Both files are now available in the YAML panel on the right. You can switch between tabs to view each file." - ) - +# Status tracking for frontend updates +status_updates = {} +last_sent_index = {} +request_results = {} + +def create_status_logger(chat_id: str): + """Create a logger function that tracks status updates for the frontend.""" + def log_status(message: str, level: str = "info"): + if chat_id not in status_updates: + status_updates[chat_id] = [] + update = { + "message": message, + "level": level, + "timestamp": datetime.now().isoformat() + } + status_updates[chat_id].append(update) + return log_status -def _build_edit_success_text(file_to_edit: str, instruction: str) -> str: - return f"Successfully edited {file_to_edit} based on your request: {instruction}" +supervisor_agent = SupervisorAgent() +executor = ThreadPoolExecutor(max_workers=4) # --------------------------------------- # Service Functions @@ -740,21 +576,21 @@ async def delete_chat_session(chat_id: str): @app.post("/api/edit_yaml", response_model=EditYamlResponse) async def edit_yaml(request: EditYamlRequest): + """Edit YAML content based on user instruction.""" + if not request.yaml or not request.yaml.strip(): + raise HTTPException(status_code=400, detail="YAML content cannot be empty") + if not request.instruction or not request.instruction.strip(): + raise HTTPException(status_code=400, detail="Instruction cannot be empty") + if not request.file_type or request.file_type not in ["agents", "workflow"]: + raise HTTPException(status_code=400, detail="File type must be 'agents' or 'workflow'") + try: - prompt = f"Current YAML file (type: {request.file_type}):\n{request.yaml}\n\nUser instruction: {request.instruction}\n\nPlease apply the requested edit and return only the updated YAML file." - resp = requests.post( - "http://localhost:8002/chat", - json={"prompt": prompt}, + file_name = f"{request.file_type}.yaml" + edited_yaml = supervisor_agent.edit_yaml( + yaml_content=request.yaml, + file_to_edit=file_name, + instruction=request.instruction ) - if resp.status_code != 200: - raise Exception(resp.text) - edited_yaml = resp.json().get("response", "") - if "```yaml" in edited_yaml: - edited_yaml = ( - edited_yaml.split("```yaml", 1)[-1].split("```", 1)[0].strip() - ) - elif "```" in edited_yaml: - edited_yaml = edited_yaml.split("```", 1)[-1].split("```", 1)[0].strip() return {"edited_yaml": edited_yaml} except Exception as e: raise HTTPException(status_code=500, detail=f"Editing Agent failed: {e}") @@ -844,69 +680,182 @@ class SupervisorResponse(BaseModel): yaml_files: List[Dict[str, str]] chat_id: str +class AsyncSupervisorResponse(BaseModel): + request_id: str + status: str + message: str + +def process_supervisor_request_background(request_id: str, content: str, chat_id: str): + """Background function to process supervisor requests with real-time status updates.""" + try: + status_logger = create_status_logger(request_id) + logged_supervisor = SupervisorAgent(logger_callback=status_logger) + status_logger("🎯 Processing your request...") + agents_yaml_content = "" + workflow_yaml_content = "" + + if chat_id: + try: + status_logger("📂 Loading existing YAML files for context...") + yaml_files = db.get_yaml_files(chat_id) + for file in yaml_files: + if file['name'] == 'agents.yaml': + agents_yaml_content = file['content'] + elif file['name'] == 'workflow.yaml': + workflow_yaml_content = file['content'] + if agents_yaml_content or workflow_yaml_content: + status_logger("✅ Found existing YAML files to use as context") + else: + status_logger("📝 No existing YAML files found, starting fresh") + except Exception as e: + status_logger(f"⚠️ Could not fetch YAML files for context: {e}", "warning") + + classification = logged_supervisor.classify_user_intent( + content, agents_yaml_content, workflow_yaml_content + ) + + if classification.intent == Intent.EDIT_YAML: + if not agents_yaml_content and not workflow_yaml_content: + status_logger("⚠️ No existing YAML files found, switching to workflow generation") + classification.intent = Intent.GENERATE_WORKFLOW + else: + file_to_edit = "agents.yaml" if agents_yaml_content else "workflow.yaml" + yaml_content = agents_yaml_content if agents_yaml_content else workflow_yaml_content + + status_logger(f"✏️ Editing {file_to_edit}...") + + edited_yaml = logged_supervisor.edit_yaml( + yaml_content=yaml_content, + file_to_edit=file_to_edit, + instruction=content, + ) + + status_logger(f"✅ Successfully edited {file_to_edit}") + + response_text = logged_supervisor.build_success_response( + Intent.EDIT_YAML, content, file_to_edit + ) + + result = SupervisorResponse( + intent=classification.intent.value, + confidence=float(classification.confidence), + reasoning=classification.reasoning or "Successfully routed to editing", + response=response_text, + yaml_files=[{"name": file_to_edit, "content": edited_yaml}], + chat_id=chat_id, + ) + + request_results[request_id] = result + status_logger("🎉 Request completed successfully!") + return + + # Handle GENERATE_WORKFLOW intent + status_logger("🎯 Routing to workflow generation...") + agents_yaml, workflow_yaml = logged_supervisor.process_complete_workflow_generation(content) + response_text = logged_supervisor.build_success_response( + Intent.GENERATE_WORKFLOW, content + ) + result = SupervisorResponse( + intent=Intent.GENERATE_WORKFLOW.value, + confidence=float(classification.confidence) if classification else 1.0, + reasoning=classification.reasoning or "Successfully routed to workflow generation", + response=response_text, + yaml_files=[ + {"name": "agents.yaml", "content": agents_yaml}, + {"name": "workflow.yaml", "content": workflow_yaml}, + ], + chat_id=chat_id, + ) + request_results[request_id] = result + status_logger("🎉 Request completed successfully!") + + except Exception as e: + status_logger = create_status_logger(request_id) + status_logger(f"❌ Error occurred: {str(e)}", "error") + request_results[request_id] = { + "error": True, + "message": str(e) + } + @app.post("/api/supervisor", response_model=SupervisorResponse) async def supervisor_route(request: SupervisorRequest): """ Main entry point that uses the supervisor agent to route requests to either workflow generation or YAML editing based on user intent. """ + if not request.content or not request.content.strip(): + raise HTTPException(status_code=400, detail="Request content cannot be empty") + chat_id = request.chat_id or str(uuid.uuid4()) + status_logger = create_status_logger(chat_id) + logged_supervisor = SupervisorAgent(logger_callback=status_logger) try: + status_logger("🎯 Processing your request...") agents_yaml_content = "" workflow_yaml_content = "" if request.chat_id: try: + status_logger("📂 Loading existing YAML files for context...") yaml_files = db.get_yaml_files(request.chat_id) for file in yaml_files: if file['name'] == 'agents.yaml': agents_yaml_content = file['content'] elif file['name'] == 'workflow.yaml': workflow_yaml_content = file['content'] + if agents_yaml_content or workflow_yaml_content: + status_logger("✅ Found existing YAML files to use as context") + else: + status_logger("📝 No existing YAML files found, starting fresh") except Exception as e: - print(f"Warning: Could not fetch YAML files for context: {e}") + status_logger(f"⚠️ Could not fetch YAML files for context: {e}", "warning") - classifier = ClassifyUserRequest() - classification = classifier.classify( + classification = logged_supervisor.classify_user_intent( request.content, agents_yaml_content, workflow_yaml_content ) - print(f"Extracted intent: {classification.intent} confidence={classification.confidence}") + + # Handle EDIT_YAML intent if classification.intent == Intent.EDIT_YAML: if not agents_yaml_content and not workflow_yaml_content: + status_logger("⚠️ No existing YAML files found, switching to workflow generation") classification.intent = Intent.GENERATE_WORKFLOW else: file_to_edit = "agents.yaml" if agents_yaml_content else "workflow.yaml" - yaml_content = ( - agents_yaml_content if agents_yaml_content else workflow_yaml_content - ) - edited_yaml = _edit_yaml( + yaml_content = agents_yaml_content if agents_yaml_content else workflow_yaml_content + status_logger(f"✏️ Editing {file_to_edit}...") + edited_yaml = logged_supervisor.edit_yaml( yaml_content=yaml_content, file_to_edit=file_to_edit, instruction=request.content, ) + status_logger(f"✅ Successfully edited {file_to_edit}") + response_text = logged_supervisor.build_success_response( + Intent.EDIT_YAML, request.content, file_to_edit + ) return SupervisorResponse( intent=classification.intent.value, confidence=float(classification.confidence), reasoning=classification.reasoning or "Successfully routed to editing", - response=_build_edit_success_text(file_to_edit, request.content), + response=response_text, yaml_files=[{"name": file_to_edit, "content": edited_yaml}], - chat_id=request.chat_id or str(uuid.uuid4()), + chat_id=chat_id, ) - print("Routing to workflow generation...") + # Handle GENERATE_WORKFLOW intent + status_logger("🎯 Routing to workflow generation...") try: - print("Calling agent generation on port 8003...") - agents_yaml = _generate_agents_yaml(request.content) - agents_info = _parse_agents_yaml_to_info(agents_yaml) - workflow_prompt = _build_workflow_prompt(agents_info, request.content) - print("Calling workflow generation on port 8004...") - workflow_yaml = _generate_workflow_yaml(workflow_prompt) + agents_yaml, workflow_yaml = logged_supervisor.process_complete_workflow_generation( + request.content + ) + response_text = logged_supervisor.build_success_response( + Intent.GENERATE_WORKFLOW, request.content + ) return SupervisorResponse( intent=Intent.GENERATE_WORKFLOW.value, confidence=float(classification.confidence) if classification else 1.0, reasoning=classification.reasoning or "Successfully routed to workflow generation", - response=_build_generation_success_text(request.content), + response=response_text, yaml_files=[ {"name": "agents.yaml", "content": agents_yaml}, {"name": "workflow.yaml", "content": workflow_yaml}, @@ -917,30 +866,10 @@ async def supervisor_route(request: SupervisorRequest): raise Exception(f"Workflow generation failed: {str(e)}") except Exception as e: - print(f"Error in supervisor routing: {str(e)}") try: - agents_resp = requests.post( - "http://localhost:8003/chat", - json={"prompt": request.content, "agent": "TaskInterpreter"}, - timeout=120 - ) - if agents_resp.status_code != 200: - raise Exception(f"Agents generation failed: {agents_resp.text}") - - agents_output = agents_resp.json().get("response", "") - agents_yaml = "" - if "```yaml" in agents_output: - agents_yaml = ( - agents_output.split("```yaml", 1)[-1].split("```", 1)[0].strip() - ) - elif "```" in agents_output: - agents_yaml = agents_output.split("```", 1)[-1].split("```", 1)[0].strip() - else: - yaml_match = re.search(r'apiVersion:.*?(?=\n\n|\Z)', agents_output, re.DOTALL) - if yaml_match: - agents_yaml = yaml_match.group(0).strip() - - clean_response = f"""✅ Successfully generated agents.yaml from your prompt! + status_logger("⚠️ Error occurred, attempting fallback to agents-only generation...") + agents_yaml = logged_supervisor.generate_agents_yaml(request.content) + fallback_response = f"""✅ Successfully generated agents.yaml from your prompt! Your request: "{request.content}" @@ -953,14 +882,68 @@ async def supervisor_route(request: SupervisorRequest): intent="GENERATE_WORKFLOW", confidence=1.0, reasoning=f"Error in supervisor routing: {str(e)}, falling back to agent generation only", - response=clean_response, - yaml_files=[ - {"name": "agents.yaml", "content": agents_yaml} - ], - chat_id=request.chat_id or str(uuid.uuid4()) - ) + response=fallback_response, + yaml_files=[{"name": "agents.yaml", "content": agents_yaml}], + chat_id=chat_id + ) except Exception as fallback_error: - raise HTTPException(status_code=500, detail=f"Supervisor routing failed: {str(e)}, fallback also failed: {str(fallback_error)}") + raise HTTPException( + status_code=500, + detail=f"Supervisor routing failed: {str(e)}, fallback also failed: {str(fallback_error)}" + ) + +@app.post("/api/supervisor-async", response_model=AsyncSupervisorResponse) +async def supervisor_route_async(request: SupervisorRequest): + """ + Async version of supervisor endpoint that starts background processing + and returns immediately with a request ID for polling. + """ + request_id = str(uuid.uuid4()) + chat_id = request.chat_id or str(uuid.uuid4()) + executor.submit(process_supervisor_request_background, request_id, request.content, chat_id) + + return AsyncSupervisorResponse( + request_id=request_id, + status="processing", + message="Request started, use the request_id to poll for status and results" + ) + + +@app.get("/api/supervisor-result/{request_id}") +async def get_supervisor_result(request_id: str): + """Get the result of an async supervisor request.""" + if request_id in request_results: + result = request_results[request_id] + del request_results[request_id] + return result + else: + return {"status": "processing", "message": "Request still in progress"} + + +@app.get("/api/status/{chat_id}") +async def get_status_updates(chat_id: str): + """Get status updates for a specific chat ID.""" + if chat_id in status_updates: + all_updates = status_updates[chat_id] + last_index = last_sent_index.get(chat_id, 0) + new_updates = all_updates[last_index:] + + if new_updates: + last_sent_index[chat_id] = len(all_updates) + return {"updates": new_updates} + else: + return {"updates": []} + return {"updates": []} + + +@app.delete("/api/status/{chat_id}") +async def clear_status_updates(chat_id: str): + """Clear status updates for a specific chat ID.""" + if chat_id in status_updates: + del status_updates[chat_id] + if chat_id in last_sent_index: + del last_sent_index[chat_id] + return {"message": "Status updates cleared"} @app.get("/api/health") diff --git a/api/supervisor.py b/api/supervisor.py new file mode 100644 index 0000000..0568eb7 --- /dev/null +++ b/api/supervisor.py @@ -0,0 +1,398 @@ +""" +Supervisor Agent Module + +This module contains the supervisor agent logic that classifies user intent +and routes requests to appropriate handlers (workflow generation or YAML editing). +""" + +import json +import re +import yaml +import requests +from enum import Enum +from dataclasses import dataclass +from typing import List, Dict, Tuple + + +class Intent(str, Enum): + GENERATE_WORKFLOW = "GENERATE_WORKFLOW" + EDIT_YAML = "EDIT_YAML" + + +@dataclass +class Classification: + intent: Intent + confidence: float + reasoning: str + + +class SupervisorAgent: + """ + Supervisor agent that classifies user intent and coordinates workflow generation + or YAML editing based on the classification. + """ + + SUPERVISOR_URL = "http://localhost:8005/chat" + AGENTS_GENERATION_URL = "http://localhost:8003/chat" + WORKFLOW_GENERATION_URL = "http://localhost:8004/chat" + EDITING_URL = "http://localhost:8001/api/edit_yaml" + + def __init__(self, timeout_seconds: int = 30, logger_callback=None): + self.timeout_seconds = timeout_seconds + self.logger_callback = logger_callback + + def _log(self, message: str, level: str = "info"): + """Log a message using the provided callback or print to console.""" + if self.logger_callback: + self.logger_callback(message, level) + else: + print(f"[{level.upper()}] {message}") + + def classify_user_intent( + self, + user_input: str, + agents_yaml_content: str = "", + workflow_yaml_content: str = "", + ) -> Classification: + """ + Classify user intent using the supervisor agent. + + Args: + user_input: The user's request + agents_yaml_content: Current agents YAML content for context + workflow_yaml_content: Current workflow YAML content for context + + Returns: + Classification object with intent, confidence, and reasoning + + Raises: + Exception: If supervisor agent fails or returns invalid response + """ + self._log("🔍 Starting to classify user prompt...") + self._log(f"📝 User input: {user_input[:100]}{'...' if len(user_input) > 100 else ''}") + + supervisor_prompt = self._build_classification_prompt( + user_input, agents_yaml_content, workflow_yaml_content + ) + + try: + self._log("🤖 Sending request to supervisor agent for intent classification...") + resp = requests.post( + self.SUPERVISOR_URL, + json={"prompt": supervisor_prompt, "agent": "IntentClassifier"}, + timeout=self.timeout_seconds, + ) + + if resp.status_code != 200: + self._log(f"❌ Supervisor agent failed with status {resp.status_code}", "error") + raise Exception( + f"Supervisor agent failed with status {resp.status_code}: {resp.text}" + ) + + supervisor_response = resp.json().get("response", "") + classification = self._parse_classification_response(supervisor_response) + + self._log(f"✅ Intent classified as: {classification.intent.value} (confidence: {classification.confidence:.2f})") + self._log(f"💭 Reasoning: {classification.reasoning}") + + return classification + + except requests.RequestException as e: + self._log(f"❌ Failed to communicate with supervisor agent: {str(e)}", "error") + raise Exception(f"Failed to communicate with supervisor agent: {str(e)}") + + def _build_classification_prompt( + self, user_input: str, agents_yaml_content: str, workflow_yaml_content: str + ) -> str: + """Build the prompt for intent classification.""" + return f"""You are an intent classifier. Determine if the user wants to GENERATE_WORKFLOW or EDIT_YAML. + +User input: {user_input} + +Current YAML files (if any): +Agents YAML: +{agents_yaml_content} + +Workflow YAML: +{workflow_yaml_content} + +Return ONLY valid JSON (no prose, no markdown) with the following schema: +{{ + "intent": "GENERATE_WORKFLOW" | "EDIT_YAML", + "confidence": number, // 0.0 to 1.0 + "reasoning": string +}} + +Example valid responses: +{{"intent":"GENERATE_WORKFLOW","confidence":0.92,"reasoning":"User is asking to create a new flow"}} +{{"intent":"EDIT_YAML","confidence":0.87,"reasoning":"User wants to modify existing YAML"}}""" + + def _parse_classification_response(self, supervisor_response: str) -> Classification: + """Parse the JSON response from the supervisor agent.""" + try: + parsed = json.loads(supervisor_response) + raw_intent = str(parsed.get("intent", "")).upper() + + # Validate intent value + intent = ( + Intent(raw_intent) + if raw_intent in (Intent.GENERATE_WORKFLOW.value, Intent.EDIT_YAML.value) + else Intent.GENERATE_WORKFLOW + ) + + confidence = float(parsed.get("confidence", 1.0)) + reasoning = str(parsed.get("reasoning", "")) + + return Classification(intent=intent, confidence=confidence, reasoning=reasoning) + + except (json.JSONDecodeError, ValueError, KeyError) as e: + # Fallback to default intent if parsing fails + return Classification( + intent=Intent.GENERATE_WORKFLOW, + confidence=0.5, + reasoning=f"Defaulted due to supervisor parsing error: {str(e)}", + ) + + def generate_agents_yaml(self, user_input: str) -> str: + """ + Generate agents YAML from user input. + + Args: + user_input: The user's request + + Returns: + Generated agents YAML content + + Raises: + Exception: If agents generation fails + """ + self._log("🏗️ Starting agents YAML generation...") + self._log("📡 Connecting to agents generation service (port 8003)...") + + try: + resp = requests.post( + self.AGENTS_GENERATION_URL, + json={"prompt": user_input, "agent": "TaskInterpreter"}, + timeout=120, # Longer timeout for generation + ) + + if resp.status_code != 200: + self._log(f"❌ Agents generation failed with status {resp.status_code}", "error") + raise Exception(f"Agents generation failed: {resp.text}") + + self._log("✅ Agents generation completed successfully") + self._log("🔄 Extracting YAML content from response...") + + agents_output = resp.json().get("response", "") + agents_yaml = self._extract_yaml_from_output(agents_output) + + self._log(f"📄 Generated agents YAML ({len(agents_yaml)} characters)") + + return agents_yaml + + except requests.RequestException as e: + self._log(f"❌ Failed to communicate with agents generation service: {str(e)}", "error") + raise Exception(f"Failed to communicate with agents generation service: {str(e)}") + + def generate_workflow_yaml(self, workflow_prompt: str) -> str: + """ + Generate workflow YAML from workflow prompt. + + Args: + workflow_prompt: The constructed workflow prompt + + Returns: + Generated workflow YAML content + + Raises: + Exception: If workflow generation fails + """ + self._log("⚙️ Starting workflow YAML generation...") + self._log("📡 Connecting to workflow generation service (port 8004)...") + + try: + resp = requests.post( + self.WORKFLOW_GENERATION_URL, + json={"prompt": workflow_prompt, "agent": "WorkflowYAMLBuilder"}, + timeout=120, # Longer timeout for generation + ) + + if resp.status_code != 200: + self._log(f"❌ Workflow generation failed with status {resp.status_code}", "error") + raise Exception(f"Workflow generation failed: {resp.text}") + + self._log("✅ Workflow generation completed successfully") + self._log("🔄 Extracting YAML content from response...") + + workflow_output = resp.json().get("response", "") + workflow_yaml = self._extract_yaml_from_output(workflow_output) + + self._log(f"📄 Generated workflow YAML ({len(workflow_yaml)} characters)") + + return workflow_yaml + + except requests.RequestException as e: + self._log(f"❌ Failed to communicate with workflow generation service: {str(e)}", "error") + raise Exception(f"Failed to communicate with workflow generation service: {str(e)}") + + def edit_yaml(self, yaml_content: str, file_to_edit: str, instruction: str) -> str: + """ + Edit existing YAML content based on user instruction. + + Args: + yaml_content: Current YAML content + file_to_edit: Name of the file being edited + instruction: User's editing instruction + + Returns: + Edited YAML content + + Raises: + Exception: If YAML editing fails + """ + self._log(f"✏️ Starting YAML editing for {file_to_edit}...") + self._log("📡 Connecting to editing service (port 8002)...") + + try: + resp = requests.post( + "http://localhost:8002/chat", + json={ + "prompt": f"Current YAML file (type: {file_to_edit.split('.')[0]}):\n{yaml_content}\n\nUser instruction: {instruction}\n\nPlease apply the requested edit and return only the updated YAML file." + }, + timeout=self.timeout_seconds, + ) + + if resp.status_code != 200: + self._log(f"❌ YAML editing failed with status {resp.status_code}", "error") + raise Exception(f"YAML editing failed: {resp.text}") + + self._log("✅ YAML editing completed successfully") + self._log("🔄 Extracting edited YAML content...") + + edited_output = resp.json().get("response", "") + edited_yaml = self._extract_yaml_from_output(edited_output) + + self._log(f"📄 Edited YAML ready ({len(edited_yaml)} characters)") + + return edited_yaml + + except requests.RequestException as e: + self._log(f"❌ Failed to communicate with editing service: {str(e)}", "error") + raise Exception(f"Failed to communicate with editing service: {str(e)}") + + def _extract_yaml_from_output(self, text: str) -> str: + """Extract YAML content from model output.""" + if "```yaml" in text: + return text.split("```yaml", 1)[-1].split("```", 1)[0].strip() + if "```" in text: + return text.split("```", 1)[-1].split("```", 1)[0].strip() + + # Try to find YAML content starting with apiVersion + yaml_match = re.search(r"apiVersion:.*?(?=\n\n|\Z)", text, re.DOTALL) + return yaml_match.group(0).strip() if yaml_match else text.strip() + + def parse_agents_yaml_to_info(self, agents_yaml: str) -> List[Dict[str, str]]: + """ + Parse agents YAML to extract agent information for workflow generation. + + Args: + agents_yaml: The agents YAML content + + Returns: + List of dictionaries with agent name and description + """ + agents_info: List[Dict[str, str]] = [] + + try: + agent_blocks = agents_yaml.split("---") + for block in agent_blocks: + if block.strip(): + agent_data = yaml.safe_load(block) + if ( + agent_data + and "metadata" in agent_data + and "name" in agent_data["metadata"] + ): + name = agent_data["metadata"]["name"] + description = agent_data.get("spec", {}).get("description", "") + agents_info.append({"name": name, "description": description}) + + except yaml.YAMLError: + name_matches = re.findall(r"name:\s*(\w+)", agents_yaml) + desc_matches = re.findall( + r"description:\s*\|\s*\n\s*(.+?)(?=\n\s*\w+:|$)", agents_yaml, re.DOTALL + ) + for i, name in enumerate(name_matches): + description = desc_matches[i] if i < len(desc_matches) else "" + agents_info.append({"name": name, "description": description.strip()}) + + return agents_info + + def build_workflow_prompt(self, agents_info: List[Dict[str, str]], user_input: str) -> str: + """ + Build workflow generation prompt from agents info and user input. + + Args: + agents_info: List of agent information dictionaries + user_input: Original user request + + Returns: + Constructed workflow prompt + """ + workflow_prompt = "Create a workflow that uses the following agents:\n\n" + for i, agent in enumerate(agents_info, 1): + workflow_prompt += f"agent{i}: {agent['name']} – {agent['description']}\n" + workflow_prompt += f"\nprompt: {user_input}" + return workflow_prompt + + def process_complete_workflow_generation(self, user_input: str) -> Tuple[str, str]: + """ + Process complete workflow generation (both agents and workflow). + + Args: + user_input: User's request + + Returns: + Tuple of (agents_yaml, workflow_yaml) + + Raises: + Exception: If any step of the generation process fails + """ + self._log("🚀 Starting complete workflow generation process...") + + agents_yaml = self.generate_agents_yaml(user_input) + self._log("🔍 Parsing generated agents for workflow creation...") + agents_info = self.parse_agents_yaml_to_info(agents_yaml) + self._log(f"📋 Found {len(agents_info)} agents to include in workflow") + + self._log("📝 Building workflow generation prompt...") + workflow_prompt = self.build_workflow_prompt(agents_info, user_input) + workflow_yaml = self.generate_workflow_yaml(workflow_prompt) + + self._log("🎉 Complete workflow generation finished successfully!") + + return agents_yaml, workflow_yaml + + def build_success_response(self, intent: Intent, user_request: str, file_edited: str = None) -> str: + """ + Build success response message based on the operation performed. + + Args: + intent: The intent that was processed + user_request: Original user request + file_edited: Name of file that was edited (for edit operations) + + Returns: + Success message string + """ + if intent == Intent.EDIT_YAML and file_edited: + return f"Successfully edited {file_edited} based on your request: {user_request}" + + return ( + "✅ Successfully generated both agents.yaml and workflow.yaml from your prompt!\n\n" + f"Your request: \"{user_request}\"\n\n" + "I've created:\n" + "• **agents.yaml** - Contains the agent definitions\n" + "• **workflow.yaml** - Contains the workflow that uses those agents\n\n" + "Both files are now available in the YAML panel on the right. You can switch between tabs to view each file." + ) From 0c014b3ec16d2cfdcd4c5d713ca37e926f1512f4 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 19 Aug 2025 14:28:13 -0700 Subject: [PATCH 10/20] superviosr logic simplification --- src/services/api.ts | 156 ++++++------ tests/test_supervisor.py | 507 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 584 insertions(+), 79 deletions(-) create mode 100644 tests/test_supervisor.py diff --git a/src/services/api.ts b/src/services/api.ts index a75f705..74d5c85 100644 --- a/src/services/api.ts +++ b/src/services/api.ts @@ -108,90 +108,51 @@ class ApiService { } } - async sendAgentMessage(message: string, chatId?: string): Promise { - try { - const response = await fetch(`${API_BASE_URL}/api/chat_builder_agent`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - content: message, - role: 'user', - chat_id: chatId || this.currentChatId - } as ChatMessage & { chat_id?: string }) + async sendMessageAsync(message: string, chatId?: string): Promise<{requestId: string}> { + const response = await fetch(`${API_BASE_URL}/api/supervisor-async`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + content: message, + chat_id: chatId || this.currentChatId }) - if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status}`) - } - const data: ChatResponse = await response.json() - this.currentChatId = data.chat_id - data.yaml_files = data.yaml_files.map(file => ({ - ...file, - content: this.formatYamlContent(file.content) - })) - return data - } catch (error) { - console.error('Error sending agent message:', error) - return this.getFallbackResponse(message) + }) + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`) } + + const data = await response.json() + return { requestId: data.request_id } } - async sendWorkflowMessage(message: string, chatId?: string): Promise { - try { - const response = await fetch(`${API_BASE_URL}/api/chat_builder_workflow`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - content: message, - role: 'user', - chat_id: chatId || this.currentChatId - } as ChatMessage & { chat_id?: string }) - }) - if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status}`) - } - const data: ChatResponse = await response.json() - this.currentChatId = data.chat_id - data.yaml_files = data.yaml_files.map(file => ({ - ...file, - content: this.formatYamlContent(file.content) - })) - return data - } catch (error) { - console.error('Error sending workflow message:', error) - return this.getFallbackResponse(message) + async getAsyncResult(requestId: string): Promise { + const response = await fetch(`${API_BASE_URL}/api/supervisor-result/${requestId}`) + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`) } - } - async sendGenerateMessage(message: string, chatId?: string): Promise { - try { - const response = await fetch(`${API_BASE_URL}/api/generate`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - content: message, - role: 'user', - chat_id: chatId || this.currentChatId - } as ChatMessage & { chat_id?: string }) - }) - if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status}`) - } - const data: ChatResponse = await response.json() - this.currentChatId = data.chat_id - data.yaml_files = data.yaml_files.map(file => ({ - ...file, - content: this.formatYamlContent(file.content) - })) - return data - } catch (error) { - console.error('Error sending complete message:', error) - return this.getFallbackResponse(message) + const data = await response.json() + + if (data.status === 'processing') { + return null + } + + if (data.error) { + throw new Error(data.message) + } + this.currentChatId = data.chat_id + + data.yaml_files = data.yaml_files.map((file: any) => ({ + ...file, + content: this.formatYamlContent(file.content) + })) + + return { + response: data.response, + yaml_files: data.yaml_files, + chat_id: data.chat_id } } @@ -256,7 +217,7 @@ class ApiService { console.error('Error streaming complete message:', error) handlers.onError?.(error as Error) try { - const fallback = await this.sendCompleteMessage(message, chatId) + const fallback = await this.sendMessage(message, chatId) handlers.onEvent?.({ type: 'final', response: fallback.response, yaml_files: fallback.yaml_files, chat_id: fallback.chat_id }) handlers.onEvent?.({ type: 'done' }) handlers.onComplete?.() @@ -542,6 +503,43 @@ workflow: chat_id: 'fallback-session' } } + + // Poll for status updates + async getStatusUpdates(chatId: string): Promise> { + try { + const response = await fetch(`${API_BASE_URL}/api/status/${chatId}`); + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + const data = await response.json(); + return data.updates || []; + } catch (error) { + console.error('Error getting status updates:', error); + return []; + } + } + + startStatusPolling(chatId: string, onUpdate: (updates: Array<{message: string, level: string, timestamp: string}>) => void): () => void { + let isPolling = true; + const poll = async () => { + if (!isPolling) return; + try { + const updates = await this.getStatusUpdates(chatId); + if (updates.length > 0) { + onUpdate(updates); + } + } catch (error) { + console.error('Error polling status updates:', error); + } + if (isPolling) { + setTimeout(poll, 100); + } + }; + poll(); + return () => { + isPolling = false; + }; + } } export const apiService = new ApiService() \ No newline at end of file diff --git a/tests/test_supervisor.py b/tests/test_supervisor.py new file mode 100644 index 0000000..b2eeaf6 --- /dev/null +++ b/tests/test_supervisor.py @@ -0,0 +1,507 @@ +""" +Tests for the SupervisorAgent class and related functionality. +This module provides comprehensive test coverage for intent classification, +workflow generation, and YAML editing operations. +""" + +import sys +import os +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import pytest +import json +import yaml +from unittest.mock import Mock, patch, MagicMock +import requests + +from api.supervisor import SupervisorAgent, Intent, Classification + + +class TestSupervisorAgent: + """Test suite for SupervisorAgent class.""" + + def setup_method(self): + """Set up test fixtures.""" + self.supervisor = SupervisorAgent(timeout_seconds=10) + + def test_supervisor_agent_initialization(self): + """Test SupervisorAgent initialization.""" + agent = SupervisorAgent() + assert agent.timeout_seconds == 30 + + agent_custom = SupervisorAgent(timeout_seconds=60) + assert agent_custom.timeout_seconds == 60 + + @patch('requests.post') + def test_classify_user_intent_generate_workflow(self, mock_post): + """Test intent classification for workflow generation.""" + # Mock successful supervisor response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": json.dumps({ + "intent": "GENERATE_WORKFLOW", + "confidence": 0.95, + "reasoning": "User wants to create a new workflow" + }) + } + mock_post.return_value = mock_response + + result = self.supervisor.classify_user_intent( + "Create a workflow to process customer data", + "", + "" + ) + + assert isinstance(result, Classification) + assert result.intent == Intent.GENERATE_WORKFLOW + assert result.confidence == 0.95 + assert "new workflow" in result.reasoning + + # Verify the request was made correctly + mock_post.assert_called_once() + call_args = mock_post.call_args + assert call_args[0][0] == self.supervisor.SUPERVISOR_URL + assert "agent" in call_args[1]["json"] + assert call_args[1]["json"]["agent"] == "IntentClassifier" + + @patch('requests.post') + def test_classify_user_intent_edit_yaml(self, mock_post): + """Test intent classification for YAML editing.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": json.dumps({ + "intent": "EDIT_YAML", + "confidence": 0.88, + "reasoning": "User wants to modify existing YAML content" + }) + } + mock_post.return_value = mock_response + + result = self.supervisor.classify_user_intent( + "Change the timeout to 30 seconds", + "existing agents yaml", + "existing workflow yaml" + ) + + assert result.intent == Intent.EDIT_YAML + assert result.confidence == 0.88 + assert "modify existing" in result.reasoning + + @patch('requests.post') + def test_classify_user_intent_supervisor_failure(self, mock_post): + """Test handling of supervisor service failure.""" + mock_response = Mock() + mock_response.status_code = 500 + mock_response.text = "Internal server error" + mock_post.return_value = mock_response + + with pytest.raises(Exception) as exc_info: + self.supervisor.classify_user_intent("test input", "", "") + + assert "Supervisor agent failed with status 500" in str(exc_info.value) + + @patch('requests.post') + def test_classify_user_intent_invalid_json_response(self, mock_post): + """Test handling of invalid JSON response from supervisor.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": "This is not valid JSON" + } + mock_post.return_value = mock_response + + result = self.supervisor.classify_user_intent("test input", "", "") + + # Should fall back to default + assert result.intent == Intent.GENERATE_WORKFLOW + assert result.confidence == 0.5 + assert "parsing error" in result.reasoning + + @patch('requests.post') + def test_classify_user_intent_network_error(self, mock_post): + """Test handling of network errors.""" + mock_post.side_effect = requests.RequestException("Network error") + + with pytest.raises(Exception) as exc_info: + self.supervisor.classify_user_intent("test input", "", "") + + assert "Failed to communicate with supervisor agent" in str(exc_info.value) + + @patch('requests.post') + def test_generate_agents_yaml_success(self, mock_post): + """Test successful agents YAML generation.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": """```yaml +apiVersion: v1 +kind: Agent +metadata: + name: test-agent +spec: + description: A test agent +```""" + } + mock_post.return_value = mock_response + + result = self.supervisor.generate_agents_yaml("Create a test agent") + + assert "apiVersion: v1" in result + assert "name: test-agent" in result + assert "description: A test agent" in result + + @patch('requests.post') + def test_generate_agents_yaml_failure(self, mock_post): + """Test handling of agents generation failure.""" + mock_response = Mock() + mock_response.status_code = 500 + mock_response.text = "Generation failed" + mock_post.return_value = mock_response + + with pytest.raises(Exception) as exc_info: + self.supervisor.generate_agents_yaml("test input") + + assert "Agents generation failed" in str(exc_info.value) + + @patch('requests.post') + def test_generate_workflow_yaml_success(self, mock_post): + """Test successful workflow YAML generation.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": """```yaml +apiVersion: v1 +kind: Workflow +metadata: + name: test-workflow +spec: + steps: + - name: step1 + agent: test-agent +```""" + } + mock_post.return_value = mock_response + + result = self.supervisor.generate_workflow_yaml("Create a test workflow") + + assert "apiVersion: v1" in result + assert "kind: Workflow" in result + assert "name: test-workflow" in result + + @patch('requests.post') + def test_edit_yaml_success(self, mock_post): + """Test successful YAML editing.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": """```yaml +apiVersion: v1 +kind: Agent +metadata: + name: updated-agent +spec: + description: Updated description + timeout: 30 +```""" + } + mock_post.return_value = mock_response + + original_yaml = """apiVersion: v1 +kind: Agent +metadata: + name: original-agent +spec: + description: Original description""" + + result = self.supervisor.edit_yaml( + original_yaml, + "agents.yaml", + "Change timeout to 30 seconds" + ) + + assert "timeout: 30" in result + assert "Updated description" in result + + def test_extract_yaml_from_output_with_yaml_markers(self): + """Test YAML extraction from output with yaml markers.""" + output = """Here's the generated YAML: + +```yaml +apiVersion: v1 +kind: Agent +metadata: + name: test +``` + +That's the result.""" + + result = self.supervisor._extract_yaml_from_output(output) + expected = """apiVersion: v1 +kind: Agent +metadata: + name: test""" + + assert result == expected + + def test_extract_yaml_from_output_with_generic_markers(self): + """Test YAML extraction from output with generic markers.""" + output = """``` +apiVersion: v1 +kind: Agent +metadata: + name: test +```""" + + result = self.supervisor._extract_yaml_from_output(output) + expected = """apiVersion: v1 +kind: Agent +metadata: + name: test""" + + assert result == expected + + def test_extract_yaml_from_output_without_markers(self): + """Test YAML extraction when there are no code markers.""" + output = """apiVersion: v1 +kind: Agent +metadata: + name: test + +Some additional text here.""" + + result = self.supervisor._extract_yaml_from_output(output) + expected = """apiVersion: v1 +kind: Agent +metadata: + name: test""" + + assert result == expected + + def test_parse_agents_yaml_to_info_valid_yaml(self): + """Test parsing agents YAML to info with valid YAML.""" + agents_yaml = """apiVersion: v1 +kind: Agent +metadata: + name: agent1 +spec: + description: First agent +--- +apiVersion: v1 +kind: Agent +metadata: + name: agent2 +spec: + description: Second agent""" + + result = self.supervisor.parse_agents_yaml_to_info(agents_yaml) + + assert len(result) == 2 + assert result[0]["name"] == "agent1" + assert result[0]["description"] == "First agent" + assert result[1]["name"] == "agent2" + assert result[1]["description"] == "Second agent" + + def test_parse_agents_yaml_to_info_invalid_yaml(self): + """Test parsing agents YAML with invalid YAML (fallback to regex).""" + invalid_yaml = """name: agent1 +description: | + First agent description +name: agent2 +description: | + Second agent description""" + + result = self.supervisor.parse_agents_yaml_to_info(invalid_yaml) + + assert len(result) == 2 + assert result[0]["name"] == "agent1" + assert result[1]["name"] == "agent2" + + def test_build_workflow_prompt(self): + """Test building workflow prompt from agents info.""" + agents_info = [ + {"name": "agent1", "description": "First agent"}, + {"name": "agent2", "description": "Second agent"} + ] + user_input = "Process customer data" + + result = self.supervisor.build_workflow_prompt(agents_info, user_input) + + assert "agent1: agent1 – First agent" in result + assert "agent2: agent2 – Second agent" in result + assert "prompt: Process customer data" in result + + @patch.object(SupervisorAgent, 'generate_agents_yaml') + @patch.object(SupervisorAgent, 'generate_workflow_yaml') + @patch.object(SupervisorAgent, 'parse_agents_yaml_to_info') + @patch.object(SupervisorAgent, 'build_workflow_prompt') + def test_process_complete_workflow_generation( + self, mock_build_prompt, mock_parse_agents, + mock_gen_workflow, mock_gen_agents + ): + """Test complete workflow generation process.""" + # Set up mocks + mock_gen_agents.return_value = "agents yaml content" + mock_parse_agents.return_value = [{"name": "agent1", "description": "desc1"}] + mock_build_prompt.return_value = "workflow prompt" + mock_gen_workflow.return_value = "workflow yaml content" + + agents_yaml, workflow_yaml = self.supervisor.process_complete_workflow_generation( + "Create a data processing workflow" + ) + + assert agents_yaml == "agents yaml content" + assert workflow_yaml == "workflow yaml content" + + # Verify the call chain + mock_gen_agents.assert_called_once_with("Create a data processing workflow") + mock_parse_agents.assert_called_once_with("agents yaml content") + mock_build_prompt.assert_called_once_with( + [{"name": "agent1", "description": "desc1"}], + "Create a data processing workflow" + ) + mock_gen_workflow.assert_called_once_with("workflow prompt") + + def test_build_success_response_generation(self): + """Test building success response for generation intent.""" + result = self.supervisor.build_success_response( + Intent.GENERATE_WORKFLOW, + "Create a workflow" + ) + + assert "Successfully generated both agents.yaml and workflow.yaml" in result + assert "Create a workflow" in result + assert "agents.yaml" in result + assert "workflow.yaml" in result + + def test_build_success_response_editing(self): + """Test building success response for edit intent.""" + result = self.supervisor.build_success_response( + Intent.EDIT_YAML, + "Change timeout", + "agents.yaml" + ) + + assert "Successfully edited agents.yaml" in result + assert "Change timeout" in result + + +class TestIntentClassificationEdgeCases: + """Test edge cases for intent classification.""" + + def setup_method(self): + self.supervisor = SupervisorAgent() + + @patch('requests.post') + def test_malformed_json_response(self, mock_post): + """Test handling of malformed JSON in supervisor response.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": '{"intent": "INVALID_INTENT", "confidence": "not_a_number"}' + } + mock_post.return_value = mock_response + + result = self.supervisor.classify_user_intent("test", "", "") + + # Should fall back to default intent + assert result.intent == Intent.GENERATE_WORKFLOW + assert result.confidence == 0.5 + + @patch('requests.post') + def test_missing_fields_in_response(self, mock_post): + """Test handling of missing fields in supervisor JSON response.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": '{"intent": "GENERATE_WORKFLOW"}' # missing confidence and reasoning + } + mock_post.return_value = mock_response + + result = self.supervisor.classify_user_intent("test", "", "") + + assert result.intent == Intent.GENERATE_WORKFLOW + assert result.confidence == 1.0 # default value + assert result.reasoning == "" # default value + + @patch('requests.post') + def test_empty_yaml_extraction(self, mock_post): + """Test YAML extraction when no YAML content is found.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": "No YAML content here, just plain text." + } + mock_post.return_value = mock_response + + result = self.supervisor.generate_agents_yaml("test") + + # Should return the stripped text as fallback + assert result == "No YAML content here, just plain text." + + +class TestSupervisorIntegration: + """Integration tests that test multiple components together.""" + + def setup_method(self): + self.supervisor = SupervisorAgent() + + def test_yaml_parsing_with_real_yaml_structure(self): + """Test YAML parsing with realistic agent definitions.""" + agents_yaml = """apiVersion: v1 +kind: Agent +metadata: + name: DataProcessor +spec: + description: | + Processes incoming data streams and validates format + framework: langchain +--- +apiVersion: v1 +kind: Agent +metadata: + name: EmailSender +spec: + description: | + Sends notification emails to stakeholders + framework: custom""" + + result = self.supervisor.parse_agents_yaml_to_info(agents_yaml) + + assert len(result) == 2 + assert result[0]["name"] == "DataProcessor" + assert "Processes incoming data" in result[0]["description"] + assert result[1]["name"] == "EmailSender" + assert "notification emails" in result[1]["description"] + + def test_workflow_prompt_building_realistic_scenario(self): + """Test workflow prompt building with realistic agent data.""" + agents_info = [ + { + "name": "DataValidator", + "description": "Validates incoming data against schema" + }, + { + "name": "DataTransformer", + "description": "Transforms data to required format" + }, + { + "name": "DatabaseWriter", + "description": "Writes processed data to database" + } + ] + + user_input = "Create a pipeline to process customer orders" + + prompt = self.supervisor.build_workflow_prompt(agents_info, user_input) + + assert "agent1: DataValidator – Validates incoming data against schema" in prompt + assert "agent2: DataTransformer – Transforms data to required format" in prompt + assert "agent3: DatabaseWriter – Writes processed data to database" in prompt + assert "prompt: Create a pipeline to process customer orders" in prompt + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) + From 9f0927a617a62e8f2b05a7dbb97ee7f1dbbb7e67 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 19 Aug 2025 14:28:32 -0700 Subject: [PATCH 11/20] add polling to help support stream --- src/App.tsx | 80 ++++++++++++++++++++---------------- src/components/ChatInput.tsx | 7 ++-- 2 files changed, 47 insertions(+), 40 deletions(-) diff --git a/src/App.tsx b/src/App.tsx index 6c21469..10bf521 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -3,7 +3,7 @@ import { Sidebar } from './components/Sidebar' import { ChatCanvas } from './components/ChatCanvas' import { YamlPanel } from './components/YamlPanel' import { ChatInput } from './components/ChatInput' -import { apiService, type ChatHistory, type ChatSession } from './services/api' +import { apiService, type ChatHistory } from './services/api' export interface Message { id: string @@ -155,18 +155,13 @@ function App() { const deleteAllChats = async () => { try { - console.log('Starting delete all chats...') const success = await apiService.deleteAllChatSessions() - console.log('Delete all chats result:', success) if (success) { - console.log('Creating new chat after deletion...') // Create a new chat since all chats were deleted await createNewChat() - console.log('Refreshing chat history...') // Refresh chat history (should be empty now) await loadChatHistory() - console.log('Delete all chats completed successfully') } else { console.error('Failed to delete all chats - API returned false') } @@ -175,7 +170,7 @@ function App() { } } - const handleSendMessage = async (content: string, useStreaming: boolean = true) => { + const handleSendMessage = async (content: string) => { const userMessage: Message = { id: Date.now().toString(), role: 'user', @@ -195,33 +190,35 @@ function App() { timestamp: new Date() }]) - const mergeYaml = (incoming: { name: string; content: string }) => { - setYamlFiles(prev => { - const exists = prev.find(f => f.name === incoming.name) - if (exists) { - return prev.map(f => f.name === incoming.name ? { ...f, content: incoming.content } : f) - } - return [...prev, { name: incoming.name, content: incoming.content, language: 'yaml' }] + + let stopStatusPolling: (() => void) | undefined + const startPollingForRequest = (requestChatId: string) => { + stopStatusPolling = apiService.startStatusPolling(requestChatId, (updates) => { + updates.forEach(update => { + setMessages(prev => prev.map(m => + m.id === assistantLogId + ? { ...m, content: m.content ? `${m.content}\n${update.message}` : update.message } + : m + )) + }) }) } - - // Start log streaming scoped to this run - const closeAgentsLogs = apiService.streamLogs('agents', false, data => { - if (data.type === 'log' && data.line) { - const line = data.line as string - setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m)) + try { + const { requestId } = await apiService.sendMessageAsync(content, currentChatId || undefined); + startPollingForRequest(requestId); + let apiResponse: any = null; + let attempts = 0; + const maxAttempts = 120; + + while (attempts < maxAttempts && !apiResponse) { + await new Promise(resolve => setTimeout(resolve, 1000)); + apiResponse = await apiService.getAsyncResult(requestId); + attempts++; } - }) - const closeWorkflowLogs = apiService.streamLogs('workflow', false, data => { - if (data.type === 'log' && data.line) { - const line = data.line as string - setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m)) + + if (!apiResponse) { + throw new Error('Request timed out after 2 minutes'); } - }) - - try { - // Use the supervisor endpoint to route to appropriate handler - const apiResponse = await apiService.sendMessage(content, currentChatId || undefined); // Parse AI response (final_prompt if available) let parsedText = apiResponse.response @@ -248,7 +245,7 @@ function App() { // Update YAML files from API response, ensuring both files are updated const updatedYamlFiles = yamlFiles.map(file => { - const apiFile = apiResponse.yaml_files.find(apiFile => apiFile.name === file.name) + const apiFile = apiResponse.yaml_files.find((apiFile: any) => apiFile.name === file.name) if (apiFile) { return { ...file, @@ -266,14 +263,20 @@ function App() { } setIsLoading(false) - closeAgentsLogs() - closeWorkflowLogs() + if (stopStatusPolling) { + setTimeout(() => { + if (stopStatusPolling) { + stopStatusPolling() + } + }, 1000) + } } catch (error) { console.error('Error processing message:', error) setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: `${m.content}\nSorry, I encountered an error while processing your request. Please try again.` } : m)) setIsLoading(false) - closeAgentsLogs() - closeWorkflowLogs() + if (stopStatusPolling) { + stopStatusPolling() + } } } @@ -305,7 +308,12 @@ function App() {
{/* Chat Input */}
- +
diff --git a/src/components/ChatInput.tsx b/src/components/ChatInput.tsx index 1ff4bd1..3622424 100644 --- a/src/components/ChatInput.tsx +++ b/src/components/ChatInput.tsx @@ -5,13 +5,12 @@ import { cn } from '../lib/utils' interface ChatInputProps { onSendMessage: (message: string) => void - onEditYaml?: (instruction: string) => void disabled?: boolean streamingEnabled?: boolean onToggleStreaming?: (enabled: boolean) => void } -export function ChatInput({ onSendMessage, onEditYaml, disabled = false }: ChatInputProps) { +export function ChatInput({ onSendMessage, disabled = false, streamingEnabled = true, onToggleStreaming }: ChatInputProps) { const [message, setMessage] = useState('') const [isTyping, setIsTyping] = useState(false) const [showSuggestions, setShowSuggestions] = useState(false) @@ -39,7 +38,7 @@ export function ChatInput({ onSendMessage, onEditYaml, disabled = false }: ChatI const handleSend = () => { if (message.trim() && !disabled) { - onSendMessage(message.trim(), streamingEnabled) + onSendMessage(message.trim()) setMessage('') setIsTyping(false) } @@ -54,7 +53,7 @@ export function ChatInput({ onSendMessage, onEditYaml, disabled = false }: ChatI const handleSuggestionClick = (suggestion: string) => { if (!disabled) { - onSendMessage(suggestion, streamingEnabled) + onSendMessage(suggestion) setShowSuggestions(false) } } From 9806fa19d228970f3d6d738f88e3cbdcc6047b4c Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 19 Aug 2025 14:37:07 -0700 Subject: [PATCH 12/20] move logging process out of main --- api/main.py | 112 +++++++++----------------------------------- api/supervisor.py | 115 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 90 deletions(-) diff --git a/api/main.py b/api/main.py index b337bad..7a97c67 100644 --- a/api/main.py +++ b/api/main.py @@ -685,97 +685,20 @@ class AsyncSupervisorResponse(BaseModel): status: str message: str -def process_supervisor_request_background(request_id: str, content: str, chat_id: str): - """Background function to process supervisor requests with real-time status updates.""" - try: - status_logger = create_status_logger(request_id) - logged_supervisor = SupervisorAgent(logger_callback=status_logger) - status_logger("🎯 Processing your request...") - agents_yaml_content = "" - workflow_yaml_content = "" - - if chat_id: - try: - status_logger("📂 Loading existing YAML files for context...") - yaml_files = db.get_yaml_files(chat_id) - for file in yaml_files: - if file['name'] == 'agents.yaml': - agents_yaml_content = file['content'] - elif file['name'] == 'workflow.yaml': - workflow_yaml_content = file['content'] - if agents_yaml_content or workflow_yaml_content: - status_logger("✅ Found existing YAML files to use as context") - else: - status_logger("📝 No existing YAML files found, starting fresh") - except Exception as e: - status_logger(f"⚠️ Could not fetch YAML files for context: {e}", "warning") - - classification = logged_supervisor.classify_user_intent( - content, agents_yaml_content, workflow_yaml_content - ) - - if classification.intent == Intent.EDIT_YAML: - if not agents_yaml_content and not workflow_yaml_content: - status_logger("⚠️ No existing YAML files found, switching to workflow generation") - classification.intent = Intent.GENERATE_WORKFLOW - else: - file_to_edit = "agents.yaml" if agents_yaml_content else "workflow.yaml" - yaml_content = agents_yaml_content if agents_yaml_content else workflow_yaml_content - - status_logger(f"✏️ Editing {file_to_edit}...") - - edited_yaml = logged_supervisor.edit_yaml( - yaml_content=yaml_content, - file_to_edit=file_to_edit, - instruction=content, - ) - - status_logger(f"✅ Successfully edited {file_to_edit}") - - response_text = logged_supervisor.build_success_response( - Intent.EDIT_YAML, content, file_to_edit - ) - - result = SupervisorResponse( - intent=classification.intent.value, - confidence=float(classification.confidence), - reasoning=classification.reasoning or "Successfully routed to editing", - response=response_text, - yaml_files=[{"name": file_to_edit, "content": edited_yaml}], - chat_id=chat_id, - ) - - request_results[request_id] = result - status_logger("🎉 Request completed successfully!") - return - - # Handle GENERATE_WORKFLOW intent - status_logger("🎯 Routing to workflow generation...") - agents_yaml, workflow_yaml = logged_supervisor.process_complete_workflow_generation(content) - response_text = logged_supervisor.build_success_response( - Intent.GENERATE_WORKFLOW, content - ) - result = SupervisorResponse( - intent=Intent.GENERATE_WORKFLOW.value, - confidence=float(classification.confidence) if classification else 1.0, - reasoning=classification.reasoning or "Successfully routed to workflow generation", - response=response_text, - yaml_files=[ - {"name": "agents.yaml", "content": agents_yaml}, - {"name": "workflow.yaml", "content": workflow_yaml}, - ], - chat_id=chat_id, +def store_request_result(request_id: str, result): + """Store the result of a background request.""" + if isinstance(result, dict) and "error" not in result: + supervisor_result = SupervisorResponse( + intent=result["intent"], + confidence=result["confidence"], + reasoning=result["reasoning"], + response=result["response"], + yaml_files=result["yaml_files"], + chat_id=result["chat_id"], ) + request_results[request_id] = supervisor_result + else: request_results[request_id] = result - status_logger("🎉 Request completed successfully!") - - except Exception as e: - status_logger = create_status_logger(request_id) - status_logger(f"❌ Error occurred: {str(e)}", "error") - request_results[request_id] = { - "error": True, - "message": str(e) - } @app.post("/api/supervisor", response_model=SupervisorResponse) async def supervisor_route(request: SupervisorRequest): @@ -900,7 +823,16 @@ async def supervisor_route_async(request: SupervisorRequest): """ request_id = str(uuid.uuid4()) chat_id = request.chat_id or str(uuid.uuid4()) - executor.submit(process_supervisor_request_background, request_id, request.content, chat_id) + # Start background processing using the supervisor agent + executor.submit( + supervisor_agent.process_request_in_background, + request_id, + request.content, + chat_id, + create_status_logger, + store_request_result, + db + ) return AsyncSupervisorResponse( request_id=request_id, diff --git a/api/supervisor.py b/api/supervisor.py index 0568eb7..e1198c5 100644 --- a/api/supervisor.py +++ b/api/supervisor.py @@ -396,3 +396,118 @@ def build_success_response(self, intent: Intent, user_request: str, file_edited: "• **workflow.yaml** - Contains the workflow that uses those agents\n\n" "Both files are now available in the YAML panel on the right. You can switch between tabs to view each file." ) + + def process_request_in_background(self, request_id: str, content: str, chat_id: str, + status_logger_callback, result_callback, db_instance): + """ + Background function to process supervisor requests with real-time status updates. + + Args: + request_id: Unique identifier for this request + content: User's request content + chat_id: Chat session ID + status_logger_callback: Function to call for status updates + result_callback: Function to call with final result + db_instance: Database instance for YAML file retrieval + """ + try: + status_logger = status_logger_callback(request_id) + logged_supervisor = SupervisorAgent(logger_callback=status_logger) + + status_logger("🎯 Processing your request...") + + # Get existing YAML content for context + agents_yaml_content = "" + workflow_yaml_content = "" + + if chat_id: + try: + status_logger("📂 Loading existing YAML files for context...") + yaml_files = db_instance.get_yaml_files(chat_id) + for file in yaml_files: + if file['name'] == 'agents.yaml': + agents_yaml_content = file['content'] + elif file['name'] == 'workflow.yaml': + workflow_yaml_content = file['content'] + if agents_yaml_content or workflow_yaml_content: + status_logger("✅ Found existing YAML files to use as context") + else: + status_logger("📝 No existing YAML files found, starting fresh") + except Exception as e: + status_logger(f"⚠️ Could not fetch YAML files for context: {e}", "warning") + + # Classify user intent + classification = logged_supervisor.classify_user_intent( + content, agents_yaml_content, workflow_yaml_content + ) + + # Handle EDIT_YAML intent + if classification.intent == Intent.EDIT_YAML: + if not agents_yaml_content and not workflow_yaml_content: + status_logger("⚠️ No existing YAML files found, switching to workflow generation") + classification.intent = Intent.GENERATE_WORKFLOW + else: + file_to_edit = "agents.yaml" if agents_yaml_content else "workflow.yaml" + yaml_content = agents_yaml_content if agents_yaml_content else workflow_yaml_content + + status_logger(f"✏️ Editing {file_to_edit}...") + + edited_yaml = logged_supervisor.edit_yaml( + yaml_content=yaml_content, + file_to_edit=file_to_edit, + instruction=content, + ) + + status_logger(f"✅ Successfully edited {file_to_edit}") + + response_text = logged_supervisor.build_success_response( + Intent.EDIT_YAML, content, file_to_edit + ) + + result = { + "intent": classification.intent.value, + "confidence": float(classification.confidence), + "reasoning": classification.reasoning or "Successfully routed to editing", + "response": response_text, + "yaml_files": [{"name": file_to_edit, "content": edited_yaml}], + "chat_id": chat_id, + } + + result_callback(request_id, result) + status_logger("🎉 Request completed successfully!") + return + + # Handle GENERATE_WORKFLOW intent + status_logger("🎯 Routing to workflow generation...") + + agents_yaml, workflow_yaml = logged_supervisor.process_complete_workflow_generation(content) + + response_text = logged_supervisor.build_success_response( + Intent.GENERATE_WORKFLOW, content + ) + + result = { + "intent": Intent.GENERATE_WORKFLOW.value, + "confidence": float(classification.confidence) if classification else 1.0, + "reasoning": classification.reasoning or "Successfully routed to workflow generation", + "response": response_text, + "yaml_files": [ + {"name": "agents.yaml", "content": agents_yaml}, + {"name": "workflow.yaml", "content": workflow_yaml}, + ], + "chat_id": chat_id, + } + + result_callback(request_id, result) + status_logger("🎉 Request completed successfully!") + + except Exception as e: + status_logger = status_logger_callback(request_id) + status_logger(f"❌ Error occurred: {str(e)}", "error") + + # Store error result + error_result = { + "error": True, + "message": str(e) + } + result_callback(request_id, error_result) From cebedac5f0f8f4e32ff3b2a7780c9937f27360a5 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 19 Aug 2025 14:46:47 -0700 Subject: [PATCH 13/20] remove old logic --- api/main.py | 134 +++++++++++++--------------------------------------- 1 file changed, 34 insertions(+), 100 deletions(-) diff --git a/api/main.py b/api/main.py index 7a97c67..132a39f 100644 --- a/api/main.py +++ b/api/main.py @@ -703,117 +703,51 @@ def store_request_result(request_id: str, result): @app.post("/api/supervisor", response_model=SupervisorResponse) async def supervisor_route(request: SupervisorRequest): """ - Main entry point that uses the supervisor agent to route requests to either - workflow generation or YAML editing based on user intent. + Synchronous supervisor endpoint that processes requests directly. + For asynchronous processing with real-time updates, use /api/supervisor-async. """ if not request.content or not request.content.strip(): raise HTTPException(status_code=400, detail="Request content cannot be empty") + chat_id = request.chat_id or str(uuid.uuid4()) - status_logger = create_status_logger(chat_id) - logged_supervisor = SupervisorAgent(logger_callback=status_logger) + try: - status_logger("🎯 Processing your request...") - agents_yaml_content = "" - workflow_yaml_content = "" + result_container = {} - if request.chat_id: - try: - status_logger("📂 Loading existing YAML files for context...") - yaml_files = db.get_yaml_files(request.chat_id) - for file in yaml_files: - if file['name'] == 'agents.yaml': - agents_yaml_content = file['content'] - elif file['name'] == 'workflow.yaml': - workflow_yaml_content = file['content'] - if agents_yaml_content or workflow_yaml_content: - status_logger("✅ Found existing YAML files to use as context") - else: - status_logger("📝 No existing YAML files found, starting fresh") - except Exception as e: - status_logger(f"⚠️ Could not fetch YAML files for context: {e}", "warning") - - classification = logged_supervisor.classify_user_intent( - request.content, agents_yaml_content, workflow_yaml_content + def sync_result_callback(request_id: str, result): + result_container[request_id] = result + + temp_request_id = "sync_request" + supervisor_agent.process_request_in_background( + temp_request_id, + request.content, + chat_id, + create_status_logger, + sync_result_callback, + db ) - - - # Handle EDIT_YAML intent - if classification.intent == Intent.EDIT_YAML: - if not agents_yaml_content and not workflow_yaml_content: - status_logger("⚠️ No existing YAML files found, switching to workflow generation") - classification.intent = Intent.GENERATE_WORKFLOW - else: - file_to_edit = "agents.yaml" if agents_yaml_content else "workflow.yaml" - yaml_content = agents_yaml_content if agents_yaml_content else workflow_yaml_content - status_logger(f"✏️ Editing {file_to_edit}...") - edited_yaml = logged_supervisor.edit_yaml( - yaml_content=yaml_content, - file_to_edit=file_to_edit, - instruction=request.content, - ) - status_logger(f"✅ Successfully edited {file_to_edit}") - response_text = logged_supervisor.build_success_response( - Intent.EDIT_YAML, request.content, file_to_edit - ) - return SupervisorResponse( - intent=classification.intent.value, - confidence=float(classification.confidence), - reasoning=classification.reasoning or "Successfully routed to editing", - response=response_text, - yaml_files=[{"name": file_to_edit, "content": edited_yaml}], - chat_id=chat_id, - ) - - # Handle GENERATE_WORKFLOW intent - status_logger("🎯 Routing to workflow generation...") - try: - agents_yaml, workflow_yaml = logged_supervisor.process_complete_workflow_generation( - request.content - ) - response_text = logged_supervisor.build_success_response( - Intent.GENERATE_WORKFLOW, request.content - ) - + + result = result_container.get(temp_request_id) + + if result and isinstance(result, dict) and "error" in result: + raise HTTPException(status_code=500, detail=result["message"]) + + if isinstance(result, dict): return SupervisorResponse( - intent=Intent.GENERATE_WORKFLOW.value, - confidence=float(classification.confidence) if classification else 1.0, - reasoning=classification.reasoning or "Successfully routed to workflow generation", - response=response_text, - yaml_files=[ - {"name": "agents.yaml", "content": agents_yaml}, - {"name": "workflow.yaml", "content": workflow_yaml}, - ], - chat_id=request.chat_id or str(uuid.uuid4()), + intent=result["intent"], + confidence=result["confidence"], + reasoning=result["reasoning"], + response=result["response"], + yaml_files=result["yaml_files"], + chat_id=result["chat_id"], ) - except Exception as e: - raise Exception(f"Workflow generation failed: {str(e)}") + return result + + except HTTPException: + raise except Exception as e: - try: - status_logger("⚠️ Error occurred, attempting fallback to agents-only generation...") - agents_yaml = logged_supervisor.generate_agents_yaml(request.content) - fallback_response = f"""✅ Successfully generated agents.yaml from your prompt! - -Your request: "{request.content}" - -I've created: -• **agents.yaml** - Contains the agent definitions - -The file is now available in the YAML panel on the right.""" - - return SupervisorResponse( - intent="GENERATE_WORKFLOW", - confidence=1.0, - reasoning=f"Error in supervisor routing: {str(e)}, falling back to agent generation only", - response=fallback_response, - yaml_files=[{"name": "agents.yaml", "content": agents_yaml}], - chat_id=chat_id - ) - except Exception as fallback_error: - raise HTTPException( - status_code=500, - detail=f"Supervisor routing failed: {str(e)}, fallback also failed: {str(fallback_error)}" - ) + raise HTTPException(status_code=500, detail=f"Supervisor processing failed: {str(e)}") @app.post("/api/supervisor-async", response_model=AsyncSupervisorResponse) async def supervisor_route_async(request: SupervisorRequest): From c43f1f0f32a8a432b148cea20ce6d972cb5cd0a1 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 19 Aug 2025 14:54:34 -0700 Subject: [PATCH 14/20] update fallback logic to extract yaml --- api/supervisor.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/api/supervisor.py b/api/supervisor.py index e1198c5..9d6cc47 100644 --- a/api/supervisor.py +++ b/api/supervisor.py @@ -304,6 +304,7 @@ def parse_agents_yaml_to_info(self, agents_yaml: str) -> List[Dict[str, str]]: agents_info: List[Dict[str, str]] = [] try: + # First try to parse as properly structured YAML with separators agent_blocks = agents_yaml.split("---") for block in agent_blocks: if block.strip(): @@ -316,15 +317,26 @@ def parse_agents_yaml_to_info(self, agents_yaml: str) -> List[Dict[str, str]]: name = agent_data["metadata"]["name"] description = agent_data.get("spec", {}).get("description", "") agents_info.append({"name": name, "description": description}) + if not agents_info: + name_count = len(re.findall(r"^name:\s*\w+", agents_yaml, re.MULTILINE)) + if name_count > 1: + raise yaml.YAMLError("Multiple name entries detected - using regex fallback") + else: + agent_data = yaml.safe_load(agents_yaml) + if agent_data and isinstance(agent_data, dict): + if "name" in agent_data: + name = agent_data["name"] + description = agent_data.get("description", "") + agents_info.append({"name": name, "description": description}) except yaml.YAMLError: name_matches = re.findall(r"name:\s*(\w+)", agents_yaml) desc_matches = re.findall( - r"description:\s*\|\s*\n\s*(.+?)(?=\n\s*\w+:|$)", agents_yaml, re.DOTALL + r"description:\s*\|\s*\n\s*(.+?)(?=\nname:|$)", agents_yaml, re.DOTALL ) for i, name in enumerate(name_matches): - description = desc_matches[i] if i < len(desc_matches) else "" - agents_info.append({"name": name, "description": description.strip()}) + description = desc_matches[i].strip() if i < len(desc_matches) else "" + agents_info.append({"name": name, "description": description}) return agents_info From 18afb035e240e4b2e8f876032a61c17ed100ee0b Mon Sep 17 00:00:00 2001 From: george-lhj Date: Tue, 19 Aug 2025 16:00:16 -0700 Subject: [PATCH 15/20] update greeting msg to a const --- src/App.tsx | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/App.tsx b/src/App.tsx index 10bf521..eb36b26 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -18,15 +18,15 @@ export interface YamlFile { language: 'yaml' } +const INITIAL_GREETING: Message = { + id: '1', + role: 'assistant', + content: 'Hello! I\'m your Maestro AI Builder assistant. I can help you create workflows and edit YAML files. Just describe what you want to build or what changes you want to make, and I\'ll handle it automatically. What would you like to do today?', + timestamp: new Date() +} + function App() { - const [messages, setMessages] = useState([ - { - id: '1', - role: 'assistant', - content: 'Hello! I\'m your Maestro AI Builder assistant. I can help you create workflows and edit YAML files. Just describe what you want to build or what changes you want to make, and I\'ll handle it automatically. What would you like to do today?', - timestamp: new Date() - } - ]) + const [messages, setMessages] = useState([INITIAL_GREETING]) const [yamlFiles, setYamlFiles] = useState([ { @@ -107,14 +107,10 @@ function App() { apiService.setCurrentChatId(chatId) // Reset to initial state with empty YAML files - setMessages([ - { - id: '1', - role: 'assistant', - content: 'Hello! I\'m your Maestro AI Builder assistant. I can help you create workflows and edit YAML files. Just describe what you want to build or what changes you want to make, and I\'ll handle it automatically. What would you like to do today?', - timestamp: new Date() - } - ]) + setMessages([{ + ...INITIAL_GREETING, + timestamp: new Date() + }]) // Set YAML files to empty state setYamlFiles([ From 12e2d60f94ce656d22ade5534e6715fd36338620 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Thu, 21 Aug 2025 22:33:21 -0700 Subject: [PATCH 16/20] show immediate agent output --- api/main.py | 4 +++- api/supervisor.py | 14 +++++++++++-- src/App.tsx | 51 +++++++++++++++++++++++++++++++++++++++++---- src/services/api.ts | 4 ++-- 4 files changed, 64 insertions(+), 9 deletions(-) diff --git a/api/main.py b/api/main.py index 132a39f..5e1e841 100644 --- a/api/main.py +++ b/api/main.py @@ -684,6 +684,7 @@ class AsyncSupervisorResponse(BaseModel): request_id: str status: str message: str + chat_id: str def store_request_result(request_id: str, result): """Store the result of a background request.""" @@ -771,7 +772,8 @@ async def supervisor_route_async(request: SupervisorRequest): return AsyncSupervisorResponse( request_id=request_id, status="processing", - message="Request started, use the request_id to poll for status and results" + message="Request started, use the request_id to poll for status and results", + chat_id=chat_id ) diff --git a/api/supervisor.py b/api/supervisor.py index 9d6cc47..f9c973b 100644 --- a/api/supervisor.py +++ b/api/supervisor.py @@ -357,7 +357,7 @@ def build_workflow_prompt(self, agents_info: List[Dict[str, str]], user_input: s workflow_prompt += f"\nprompt: {user_input}" return workflow_prompt - def process_complete_workflow_generation(self, user_input: str) -> Tuple[str, str]: + def process_complete_workflow_generation(self, user_input: str, chat_id: str = None, db_instance=None) -> Tuple[str, str]: """ Process complete workflow generation (both agents and workflow). @@ -373,6 +373,16 @@ def process_complete_workflow_generation(self, user_input: str) -> Tuple[str, st self._log("🚀 Starting complete workflow generation process...") agents_yaml = self.generate_agents_yaml(user_input) + self._log("✅ agents.yaml generated!") + + # Immediately save agents.yaml to database so frontend can see it + if chat_id and db_instance: + try: + db_instance.update_yaml_files(chat_id, {"agents.yaml": agents_yaml}) + self._log("💾 Saved agents.yaml to database for immediate viewing") + except Exception as e: + self._log(f"⚠️ Could not save agents.yaml immediately: {e}", "warning") + self._log("🔍 Parsing generated agents for workflow creation...") agents_info = self.parse_agents_yaml_to_info(agents_yaml) self._log(f"📋 Found {len(agents_info)} agents to include in workflow") @@ -492,7 +502,7 @@ def process_request_in_background(self, request_id: str, content: str, chat_id: # Handle GENERATE_WORKFLOW intent status_logger("🎯 Routing to workflow generation...") - agents_yaml, workflow_yaml = logged_supervisor.process_complete_workflow_generation(content) + agents_yaml, workflow_yaml = logged_supervisor.process_complete_workflow_generation(content, chat_id, db_instance) response_text = logged_supervisor.build_success_response( Intent.GENERATE_WORKFLOW, content diff --git a/src/App.tsx b/src/App.tsx index eb36b26..0b6bf86 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -188,7 +188,7 @@ function App() { let stopStatusPolling: (() => void) | undefined - const startPollingForRequest = (requestChatId: string) => { + const startPollingForRequest = (requestChatId: string, activeChatId: string) => { stopStatusPolling = apiService.startStatusPolling(requestChatId, (updates) => { updates.forEach(update => { setMessages(prev => prev.map(m => @@ -198,13 +198,56 @@ function App() { )) }) }) + + // Also poll for YAML files during status updates + const yamlPollingInterval = setInterval(async () => { + if (activeChatId) { + try { + console.log('🔍 Polling for YAML files with chat ID:', activeChatId) + const yamlFiles = await apiService.getYamlFiles(activeChatId) + console.log('📄 YAML files received:', yamlFiles?.length || 0, 'files') + + if (yamlFiles && yamlFiles.length > 0) { + const agentsFile = yamlFiles.find(f => f.name === 'agents.yaml') + console.log('🔎 Agents file found:', !!agentsFile, 'has content:', !!agentsFile?.content?.trim()) + + if (agentsFile && agentsFile.content.trim()) { + console.log('🎉 Found agents.yaml! Updating UI immediately') + console.log('📝 Agents content preview:', agentsFile.content.substring(0, 100) + '...') + setYamlFiles(yamlFiles) + setActiveYamlTab('agents.yaml') + clearInterval(yamlPollingInterval) + } + } else { + console.log('📭 No YAML files found yet') + } + } catch (error) { + console.log('⚠️ YAML polling error:', error) + } + } else { + console.log('⚠️ No activeChatId available for YAML polling') + } + }, 1500) + const originalStop = stopStatusPolling + stopStatusPolling = () => { + clearInterval(yamlPollingInterval) + if (originalStop) originalStop() + } } try { - const { requestId } = await apiService.sendMessageAsync(content, currentChatId || undefined); - startPollingForRequest(requestId); + const { requestId, chatId } = await apiService.sendMessageAsync(content, currentChatId || undefined); + + if (!currentChatId) { + console.log('🆔 Setting currentChatId to:', chatId); + setCurrentChatId(chatId); + } else { + console.log('🆔 Using existing currentChatId:', currentChatId); + } + + startPollingForRequest(requestId, currentChatId || chatId); let apiResponse: any = null; let attempts = 0; - const maxAttempts = 120; + const maxAttempts = 240; while (attempts < maxAttempts && !apiResponse) { await new Promise(resolve => setTimeout(resolve, 1000)); diff --git a/src/services/api.ts b/src/services/api.ts index 74d5c85..0c13590 100644 --- a/src/services/api.ts +++ b/src/services/api.ts @@ -108,7 +108,7 @@ class ApiService { } } - async sendMessageAsync(message: string, chatId?: string): Promise<{requestId: string}> { + async sendMessageAsync(message: string, chatId?: string): Promise<{requestId: string, chatId: string}> { const response = await fetch(`${API_BASE_URL}/api/supervisor-async`, { method: 'POST', headers: { @@ -124,7 +124,7 @@ class ApiService { } const data = await response.json() - return { requestId: data.request_id } + return { requestId: data.request_id, chatId: data.chat_id } } async getAsyncResult(requestId: string): Promise { From 0a4960192bca51270d7bb7d8a056f4a4de914855 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Thu, 21 Aug 2025 23:34:28 -0700 Subject: [PATCH 17/20] add dynamic dots --- src/components/ChatCanvas.tsx | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/components/ChatCanvas.tsx b/src/components/ChatCanvas.tsx index 8ea0993..5027d1b 100644 --- a/src/components/ChatCanvas.tsx +++ b/src/components/ChatCanvas.tsx @@ -1,4 +1,4 @@ -import { useEffect, useRef } from 'react' +import { useEffect, useRef, useState } from 'react' import { Bot, User, Code, FileText, Loader2 } from 'lucide-react' import type { Message } from '../App' import { cn } from '../lib/utils' @@ -10,6 +10,7 @@ interface ChatCanvasProps { export function ChatCanvas({ messages, isLoading = false }: ChatCanvasProps) { const messagesEndRef = useRef(null) + const [dots, setDots] = useState('') const scrollToBottom = () => { messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }) @@ -19,6 +20,16 @@ export function ChatCanvas({ messages, isLoading = false }: ChatCanvasProps) { scrollToBottom() }, [messages]) + useEffect(() => { + if (!isLoading) return + + const interval = setInterval(() => { + setDots(prev => prev.length >= 3 ? '' : prev + '.') + }, 500) + + return () => clearInterval(interval) + }, [isLoading]) + const formatTime = (date: Date) => { return date.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' }) } @@ -124,7 +135,7 @@ export function ChatCanvas({ messages, isLoading = false }: ChatCanvasProps) {
- Thinking... + Thinking{dots}
From 9456586e2e24052adc171897a2b96296669617bc Mon Sep 17 00:00:00 2001 From: george-lhj Date: Thu, 21 Aug 2025 23:37:38 -0700 Subject: [PATCH 18/20] remove static dots --- api/main.py | 4 ++-- src/App.tsx | 4 ++-- src/components/ChatInput.tsx | 2 +- src/components/Sidebar.tsx | 2 +- src/components/YamlPanel.tsx | 4 ++-- src/services/api.ts | 2 +- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/api/main.py b/api/main.py index 5e1e841..de601f9 100644 --- a/api/main.py +++ b/api/main.py @@ -325,7 +325,7 @@ def to_line(obj: Dict[str, Any]) -> bytes: await asyncio.sleep(0) yield to_line({"type": "status", "message": "(Planning agents)"}) await asyncio.sleep(0) - yield to_line({"type": "status", "message": "Generating agents.yaml..."}) + yield to_line({"type": "status", "message": "Generating agents.yaml"}) await asyncio.sleep(0) agents_output, agents_yaml = await generate_agents_yaml(message.content) @@ -369,7 +369,7 @@ def to_line(obj: Dict[str, Any]) -> bytes: yield to_line({"type": "status", "message": "(Building workflow prompt)"}) await asyncio.sleep(0) - yield to_line({"type": "status", "message": "Generating workflow.yaml..."}) + yield to_line({"type": "status", "message": "Generating workflow.yaml"}) await asyncio.sleep(0) async with httpx.AsyncClient(timeout=180) as client: diff --git a/src/App.tsx b/src/App.tsx index 0b6bf86..5087693 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -182,7 +182,7 @@ function App() { setMessages(prev => [...prev, { id: assistantLogId, role: 'assistant', - content: 'Starting…', + content: 'Starting', timestamp: new Date() }]) @@ -213,7 +213,7 @@ function App() { if (agentsFile && agentsFile.content.trim()) { console.log('🎉 Found agents.yaml! Updating UI immediately') - console.log('📝 Agents content preview:', agentsFile.content.substring(0, 100) + '...') + console.log('📝 Agents content preview:', agentsFile.content.substring(0, 100)) setYamlFiles(yamlFiles) setActiveYamlTab('agents.yaml') clearInterval(yamlPollingInterval) diff --git a/src/components/ChatInput.tsx b/src/components/ChatInput.tsx index 3622424..41a0c35 100644 --- a/src/components/ChatInput.tsx +++ b/src/components/ChatInput.tsx @@ -141,7 +141,7 @@ export function ChatInput({ onSendMessage, disabled = false, streamingEnabled = } }} onKeyDown={handleKeyDown} - placeholder={disabled ? "Processing..." : "Ask me to help you build your Maestro workflow..."} + placeholder={disabled ? "Processing" : "Ask me to help you build your Maestro workflow"} className="w-full h-full min-h-[44px] max-h-32 resize-none bg-transparent border-none outline-none text-base placeholder:text-gray-400 leading-relaxed disabled:opacity-50 font-['Inter',-apple-system,BlinkMacSystemFont,'Segoe_UI',Roboto,sans-serif]" rows={1} disabled={disabled} diff --git a/src/components/Sidebar.tsx b/src/components/Sidebar.tsx index 8cf7e5d..ec41bc8 100644 --- a/src/components/Sidebar.tsx +++ b/src/components/Sidebar.tsx @@ -28,7 +28,7 @@ export function Sidebar({ chatHistory, currentChatId, onLoadChat, onCreateChat, const truncateText = (text: string, maxLength: number = 50) => { if (text.length <= maxLength) return text - return text.substring(0, maxLength) + '...' + return text.substring(0, maxLength) } const handleDeleteAll = async () => { diff --git a/src/components/YamlPanel.tsx b/src/components/YamlPanel.tsx index 203a297..5e5192f 100644 --- a/src/components/YamlPanel.tsx +++ b/src/components/YamlPanel.tsx @@ -359,7 +359,7 @@ export function YamlPanel({ yamlFiles, isLoading = false, activeTab, setActiveTa {isLoading && (
- Updating... + Updating
)}