diff --git a/api/main.py b/api/main.py index a79eddf..de601f9 100644 --- a/api/main.py +++ b/api/main.py @@ -11,6 +11,7 @@ from datetime import datetime from api.ai_agent import MaestroBuilderAgent from api.database import Database +from api.supervisor import SupervisorAgent, Intent import uuid import subprocess import tempfile @@ -21,6 +22,7 @@ import asyncio from pathlib import Path import httpx +from concurrent.futures import ThreadPoolExecutor # Initialize FastAPI app app = FastAPI( @@ -104,6 +106,27 @@ class ValidateYamlResponse(BaseModel): message: str errors: List[str] = [] +# Status tracking for frontend updates +status_updates = {} +last_sent_index = {} +request_results = {} + +def create_status_logger(chat_id: str): + """Create a logger function that tracks status updates for the frontend.""" + def log_status(message: str, level: str = "info"): + if chat_id not in status_updates: + status_updates[chat_id] = [] + update = { + "message": message, + "level": level, + "timestamp": datetime.now().isoformat() + } + status_updates[chat_id].append(update) + return log_status + +supervisor_agent = SupervisorAgent() +executor = ThreadPoolExecutor(max_workers=4) + # --------------------------------------- # Service Functions # --------------------------------------- @@ -302,7 +325,7 @@ def to_line(obj: Dict[str, Any]) -> bytes: await asyncio.sleep(0) yield to_line({"type": "status", "message": "(Planning agents)"}) await asyncio.sleep(0) - yield to_line({"type": "status", "message": "Generating agents.yaml..."}) + yield to_line({"type": "status", "message": "Generating agents.yaml"}) await asyncio.sleep(0) agents_output, agents_yaml = await generate_agents_yaml(message.content) @@ -346,7 +369,7 @@ def to_line(obj: Dict[str, Any]) -> bytes: yield to_line({"type": "status", "message": "(Building workflow prompt)"}) await asyncio.sleep(0) - yield to_line({"type": "status", "message": "Generating workflow.yaml..."}) + yield to_line({"type": "status", "message": "Generating workflow.yaml"}) await asyncio.sleep(0) async with httpx.AsyncClient(timeout=180) as client: @@ -553,23 +576,21 @@ async def delete_chat_session(chat_id: str): @app.post("/api/edit_yaml", response_model=EditYamlResponse) async def edit_yaml(request: EditYamlRequest): + """Edit YAML content based on user instruction.""" + if not request.yaml or not request.yaml.strip(): + raise HTTPException(status_code=400, detail="YAML content cannot be empty") + if not request.instruction or not request.instruction.strip(): + raise HTTPException(status_code=400, detail="Instruction cannot be empty") + if not request.file_type or request.file_type not in ["agents", "workflow"]: + raise HTTPException(status_code=400, detail="File type must be 'agents' or 'workflow'") + try: - # Build the prompt for the editing agent - prompt = f"Current YAML file (type: {request.file_type}):\n{request.yaml}\n\nUser instruction: {request.instruction}\n\nPlease apply the requested edit and return only the updated YAML file." - resp = requests.post( - "http://localhost:8002/chat", - json={"prompt": prompt}, + file_name = f"{request.file_type}.yaml" + edited_yaml = supervisor_agent.edit_yaml( + yaml_content=request.yaml, + file_to_edit=file_name, + instruction=request.instruction ) - if resp.status_code != 200: - raise Exception(resp.text) - edited_yaml = resp.json().get("response", "") - # Remove markdown formatting if present - if "```yaml" in edited_yaml: - edited_yaml = ( - edited_yaml.split("```yaml", 1)[-1].split("```", 1)[0].strip() - ) - elif "```" in edited_yaml: - edited_yaml = edited_yaml.split("```", 1)[-1].split("```", 1)[0].strip() return {"edited_yaml": edited_yaml} except Exception as e: raise HTTPException(status_code=500, detail=f"Editing Agent failed: {e}") @@ -578,7 +599,6 @@ async def edit_yaml(request: EditYamlRequest): @app.post("/api/validate_yaml", response_model=ValidateYamlResponse) async def validate_yaml(request: ValidateYamlRequest): try: - # fix double-escaped characters import codecs unescaped_content = codecs.decode(request.yaml_content, 'unicode_escape') with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_file: @@ -648,6 +668,152 @@ def strip_ansi_codes(text): ) +class SupervisorRequest(BaseModel): + content: str + chat_id: Optional[str] = None + +class SupervisorResponse(BaseModel): + intent: str + confidence: float + reasoning: str + response: str + yaml_files: List[Dict[str, str]] + chat_id: str + +class AsyncSupervisorResponse(BaseModel): + request_id: str + status: str + message: str + chat_id: str + +def store_request_result(request_id: str, result): + """Store the result of a background request.""" + if isinstance(result, dict) and "error" not in result: + supervisor_result = SupervisorResponse( + intent=result["intent"], + confidence=result["confidence"], + reasoning=result["reasoning"], + response=result["response"], + yaml_files=result["yaml_files"], + chat_id=result["chat_id"], + ) + request_results[request_id] = supervisor_result + else: + request_results[request_id] = result + +@app.post("/api/supervisor", response_model=SupervisorResponse) +async def supervisor_route(request: SupervisorRequest): + """ + Synchronous supervisor endpoint that processes requests directly. + For asynchronous processing with real-time updates, use /api/supervisor-async. + """ + if not request.content or not request.content.strip(): + raise HTTPException(status_code=400, detail="Request content cannot be empty") + + chat_id = request.chat_id or str(uuid.uuid4()) + + try: + result_container = {} + + def sync_result_callback(request_id: str, result): + result_container[request_id] = result + + temp_request_id = "sync_request" + supervisor_agent.process_request_in_background( + temp_request_id, + request.content, + chat_id, + create_status_logger, + sync_result_callback, + db + ) + + result = result_container.get(temp_request_id) + + if result and isinstance(result, dict) and "error" in result: + raise HTTPException(status_code=500, detail=result["message"]) + + if isinstance(result, dict): + return SupervisorResponse( + intent=result["intent"], + confidence=result["confidence"], + reasoning=result["reasoning"], + response=result["response"], + yaml_files=result["yaml_files"], + chat_id=result["chat_id"], + ) + + return result + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Supervisor processing failed: {str(e)}") + +@app.post("/api/supervisor-async", response_model=AsyncSupervisorResponse) +async def supervisor_route_async(request: SupervisorRequest): + """ + Async version of supervisor endpoint that starts background processing + and returns immediately with a request ID for polling. + """ + request_id = str(uuid.uuid4()) + chat_id = request.chat_id or str(uuid.uuid4()) + # Start background processing using the supervisor agent + executor.submit( + supervisor_agent.process_request_in_background, + request_id, + request.content, + chat_id, + create_status_logger, + store_request_result, + db + ) + + return AsyncSupervisorResponse( + request_id=request_id, + status="processing", + message="Request started, use the request_id to poll for status and results", + chat_id=chat_id + ) + + +@app.get("/api/supervisor-result/{request_id}") +async def get_supervisor_result(request_id: str): + """Get the result of an async supervisor request.""" + if request_id in request_results: + result = request_results[request_id] + del request_results[request_id] + return result + else: + return {"status": "processing", "message": "Request still in progress"} + + +@app.get("/api/status/{chat_id}") +async def get_status_updates(chat_id: str): + """Get status updates for a specific chat ID.""" + if chat_id in status_updates: + all_updates = status_updates[chat_id] + last_index = last_sent_index.get(chat_id, 0) + new_updates = all_updates[last_index:] + + if new_updates: + last_sent_index[chat_id] = len(all_updates) + return {"updates": new_updates} + else: + return {"updates": []} + return {"updates": []} + + +@app.delete("/api/status/{chat_id}") +async def clear_status_updates(chat_id: str): + """Clear status updates for a specific chat ID.""" + if chat_id in status_updates: + del status_updates[chat_id] + if chat_id in last_sent_index: + del last_sent_index[chat_id] + return {"message": "Status updates cleared"} + + @app.get("/api/health") async def health_check(): try: diff --git a/api/supervisor.py b/api/supervisor.py new file mode 100644 index 0000000..f9c973b --- /dev/null +++ b/api/supervisor.py @@ -0,0 +1,535 @@ +""" +Supervisor Agent Module + +This module contains the supervisor agent logic that classifies user intent +and routes requests to appropriate handlers (workflow generation or YAML editing). +""" + +import json +import re +import yaml +import requests +from enum import Enum +from dataclasses import dataclass +from typing import List, Dict, Tuple + + +class Intent(str, Enum): + GENERATE_WORKFLOW = "GENERATE_WORKFLOW" + EDIT_YAML = "EDIT_YAML" + + +@dataclass +class Classification: + intent: Intent + confidence: float + reasoning: str + + +class SupervisorAgent: + """ + Supervisor agent that classifies user intent and coordinates workflow generation + or YAML editing based on the classification. + """ + + SUPERVISOR_URL = "http://localhost:8005/chat" + AGENTS_GENERATION_URL = "http://localhost:8003/chat" + WORKFLOW_GENERATION_URL = "http://localhost:8004/chat" + EDITING_URL = "http://localhost:8001/api/edit_yaml" + + def __init__(self, timeout_seconds: int = 30, logger_callback=None): + self.timeout_seconds = timeout_seconds + self.logger_callback = logger_callback + + def _log(self, message: str, level: str = "info"): + """Log a message using the provided callback or print to console.""" + if self.logger_callback: + self.logger_callback(message, level) + else: + print(f"[{level.upper()}] {message}") + + def classify_user_intent( + self, + user_input: str, + agents_yaml_content: str = "", + workflow_yaml_content: str = "", + ) -> Classification: + """ + Classify user intent using the supervisor agent. + + Args: + user_input: The user's request + agents_yaml_content: Current agents YAML content for context + workflow_yaml_content: Current workflow YAML content for context + + Returns: + Classification object with intent, confidence, and reasoning + + Raises: + Exception: If supervisor agent fails or returns invalid response + """ + self._log("πŸ” Starting to classify user prompt...") + self._log(f"πŸ“ User input: {user_input[:100]}{'...' if len(user_input) > 100 else ''}") + + supervisor_prompt = self._build_classification_prompt( + user_input, agents_yaml_content, workflow_yaml_content + ) + + try: + self._log("πŸ€– Sending request to supervisor agent for intent classification...") + resp = requests.post( + self.SUPERVISOR_URL, + json={"prompt": supervisor_prompt, "agent": "IntentClassifier"}, + timeout=self.timeout_seconds, + ) + + if resp.status_code != 200: + self._log(f"❌ Supervisor agent failed with status {resp.status_code}", "error") + raise Exception( + f"Supervisor agent failed with status {resp.status_code}: {resp.text}" + ) + + supervisor_response = resp.json().get("response", "") + classification = self._parse_classification_response(supervisor_response) + + self._log(f"βœ… Intent classified as: {classification.intent.value} (confidence: {classification.confidence:.2f})") + self._log(f"πŸ’­ Reasoning: {classification.reasoning}") + + return classification + + except requests.RequestException as e: + self._log(f"❌ Failed to communicate with supervisor agent: {str(e)}", "error") + raise Exception(f"Failed to communicate with supervisor agent: {str(e)}") + + def _build_classification_prompt( + self, user_input: str, agents_yaml_content: str, workflow_yaml_content: str + ) -> str: + """Build the prompt for intent classification.""" + return f"""You are an intent classifier. Determine if the user wants to GENERATE_WORKFLOW or EDIT_YAML. + +User input: {user_input} + +Current YAML files (if any): +Agents YAML: +{agents_yaml_content} + +Workflow YAML: +{workflow_yaml_content} + +Return ONLY valid JSON (no prose, no markdown) with the following schema: +{{ + "intent": "GENERATE_WORKFLOW" | "EDIT_YAML", + "confidence": number, // 0.0 to 1.0 + "reasoning": string +}} + +Example valid responses: +{{"intent":"GENERATE_WORKFLOW","confidence":0.92,"reasoning":"User is asking to create a new flow"}} +{{"intent":"EDIT_YAML","confidence":0.87,"reasoning":"User wants to modify existing YAML"}}""" + + def _parse_classification_response(self, supervisor_response: str) -> Classification: + """Parse the JSON response from the supervisor agent.""" + try: + parsed = json.loads(supervisor_response) + raw_intent = str(parsed.get("intent", "")).upper() + + # Validate intent value + intent = ( + Intent(raw_intent) + if raw_intent in (Intent.GENERATE_WORKFLOW.value, Intent.EDIT_YAML.value) + else Intent.GENERATE_WORKFLOW + ) + + confidence = float(parsed.get("confidence", 1.0)) + reasoning = str(parsed.get("reasoning", "")) + + return Classification(intent=intent, confidence=confidence, reasoning=reasoning) + + except (json.JSONDecodeError, ValueError, KeyError) as e: + # Fallback to default intent if parsing fails + return Classification( + intent=Intent.GENERATE_WORKFLOW, + confidence=0.5, + reasoning=f"Defaulted due to supervisor parsing error: {str(e)}", + ) + + def generate_agents_yaml(self, user_input: str) -> str: + """ + Generate agents YAML from user input. + + Args: + user_input: The user's request + + Returns: + Generated agents YAML content + + Raises: + Exception: If agents generation fails + """ + self._log("πŸ—οΈ Starting agents YAML generation...") + self._log("πŸ“‘ Connecting to agents generation service (port 8003)...") + + try: + resp = requests.post( + self.AGENTS_GENERATION_URL, + json={"prompt": user_input, "agent": "TaskInterpreter"}, + timeout=120, # Longer timeout for generation + ) + + if resp.status_code != 200: + self._log(f"❌ Agents generation failed with status {resp.status_code}", "error") + raise Exception(f"Agents generation failed: {resp.text}") + + self._log("βœ… Agents generation completed successfully") + self._log("πŸ”„ Extracting YAML content from response...") + + agents_output = resp.json().get("response", "") + agents_yaml = self._extract_yaml_from_output(agents_output) + + self._log(f"πŸ“„ Generated agents YAML ({len(agents_yaml)} characters)") + + return agents_yaml + + except requests.RequestException as e: + self._log(f"❌ Failed to communicate with agents generation service: {str(e)}", "error") + raise Exception(f"Failed to communicate with agents generation service: {str(e)}") + + def generate_workflow_yaml(self, workflow_prompt: str) -> str: + """ + Generate workflow YAML from workflow prompt. + + Args: + workflow_prompt: The constructed workflow prompt + + Returns: + Generated workflow YAML content + + Raises: + Exception: If workflow generation fails + """ + self._log("βš™οΈ Starting workflow YAML generation...") + self._log("πŸ“‘ Connecting to workflow generation service (port 8004)...") + + try: + resp = requests.post( + self.WORKFLOW_GENERATION_URL, + json={"prompt": workflow_prompt, "agent": "WorkflowYAMLBuilder"}, + timeout=120, # Longer timeout for generation + ) + + if resp.status_code != 200: + self._log(f"❌ Workflow generation failed with status {resp.status_code}", "error") + raise Exception(f"Workflow generation failed: {resp.text}") + + self._log("βœ… Workflow generation completed successfully") + self._log("πŸ”„ Extracting YAML content from response...") + + workflow_output = resp.json().get("response", "") + workflow_yaml = self._extract_yaml_from_output(workflow_output) + + self._log(f"πŸ“„ Generated workflow YAML ({len(workflow_yaml)} characters)") + + return workflow_yaml + + except requests.RequestException as e: + self._log(f"❌ Failed to communicate with workflow generation service: {str(e)}", "error") + raise Exception(f"Failed to communicate with workflow generation service: {str(e)}") + + def edit_yaml(self, yaml_content: str, file_to_edit: str, instruction: str) -> str: + """ + Edit existing YAML content based on user instruction. + + Args: + yaml_content: Current YAML content + file_to_edit: Name of the file being edited + instruction: User's editing instruction + + Returns: + Edited YAML content + + Raises: + Exception: If YAML editing fails + """ + self._log(f"✏️ Starting YAML editing for {file_to_edit}...") + self._log("πŸ“‘ Connecting to editing service (port 8002)...") + + try: + resp = requests.post( + "http://localhost:8002/chat", + json={ + "prompt": f"Current YAML file (type: {file_to_edit.split('.')[0]}):\n{yaml_content}\n\nUser instruction: {instruction}\n\nPlease apply the requested edit and return only the updated YAML file." + }, + timeout=self.timeout_seconds, + ) + + if resp.status_code != 200: + self._log(f"❌ YAML editing failed with status {resp.status_code}", "error") + raise Exception(f"YAML editing failed: {resp.text}") + + self._log("βœ… YAML editing completed successfully") + self._log("πŸ”„ Extracting edited YAML content...") + + edited_output = resp.json().get("response", "") + edited_yaml = self._extract_yaml_from_output(edited_output) + + self._log(f"πŸ“„ Edited YAML ready ({len(edited_yaml)} characters)") + + return edited_yaml + + except requests.RequestException as e: + self._log(f"❌ Failed to communicate with editing service: {str(e)}", "error") + raise Exception(f"Failed to communicate with editing service: {str(e)}") + + def _extract_yaml_from_output(self, text: str) -> str: + """Extract YAML content from model output.""" + if "```yaml" in text: + return text.split("```yaml", 1)[-1].split("```", 1)[0].strip() + if "```" in text: + return text.split("```", 1)[-1].split("```", 1)[0].strip() + + # Try to find YAML content starting with apiVersion + yaml_match = re.search(r"apiVersion:.*?(?=\n\n|\Z)", text, re.DOTALL) + return yaml_match.group(0).strip() if yaml_match else text.strip() + + def parse_agents_yaml_to_info(self, agents_yaml: str) -> List[Dict[str, str]]: + """ + Parse agents YAML to extract agent information for workflow generation. + + Args: + agents_yaml: The agents YAML content + + Returns: + List of dictionaries with agent name and description + """ + agents_info: List[Dict[str, str]] = [] + + try: + # First try to parse as properly structured YAML with separators + agent_blocks = agents_yaml.split("---") + for block in agent_blocks: + if block.strip(): + agent_data = yaml.safe_load(block) + if ( + agent_data + and "metadata" in agent_data + and "name" in agent_data["metadata"] + ): + name = agent_data["metadata"]["name"] + description = agent_data.get("spec", {}).get("description", "") + agents_info.append({"name": name, "description": description}) + if not agents_info: + name_count = len(re.findall(r"^name:\s*\w+", agents_yaml, re.MULTILINE)) + if name_count > 1: + raise yaml.YAMLError("Multiple name entries detected - using regex fallback") + else: + agent_data = yaml.safe_load(agents_yaml) + if agent_data and isinstance(agent_data, dict): + if "name" in agent_data: + name = agent_data["name"] + description = agent_data.get("description", "") + agents_info.append({"name": name, "description": description}) + + except yaml.YAMLError: + name_matches = re.findall(r"name:\s*(\w+)", agents_yaml) + desc_matches = re.findall( + r"description:\s*\|\s*\n\s*(.+?)(?=\nname:|$)", agents_yaml, re.DOTALL + ) + for i, name in enumerate(name_matches): + description = desc_matches[i].strip() if i < len(desc_matches) else "" + agents_info.append({"name": name, "description": description}) + + return agents_info + + def build_workflow_prompt(self, agents_info: List[Dict[str, str]], user_input: str) -> str: + """ + Build workflow generation prompt from agents info and user input. + + Args: + agents_info: List of agent information dictionaries + user_input: Original user request + + Returns: + Constructed workflow prompt + """ + workflow_prompt = "Create a workflow that uses the following agents:\n\n" + for i, agent in enumerate(agents_info, 1): + workflow_prompt += f"agent{i}: {agent['name']} – {agent['description']}\n" + workflow_prompt += f"\nprompt: {user_input}" + return workflow_prompt + + def process_complete_workflow_generation(self, user_input: str, chat_id: str = None, db_instance=None) -> Tuple[str, str]: + """ + Process complete workflow generation (both agents and workflow). + + Args: + user_input: User's request + + Returns: + Tuple of (agents_yaml, workflow_yaml) + + Raises: + Exception: If any step of the generation process fails + """ + self._log("πŸš€ Starting complete workflow generation process...") + + agents_yaml = self.generate_agents_yaml(user_input) + self._log("βœ… agents.yaml generated!") + + # Immediately save agents.yaml to database so frontend can see it + if chat_id and db_instance: + try: + db_instance.update_yaml_files(chat_id, {"agents.yaml": agents_yaml}) + self._log("πŸ’Ύ Saved agents.yaml to database for immediate viewing") + except Exception as e: + self._log(f"⚠️ Could not save agents.yaml immediately: {e}", "warning") + + self._log("πŸ” Parsing generated agents for workflow creation...") + agents_info = self.parse_agents_yaml_to_info(agents_yaml) + self._log(f"πŸ“‹ Found {len(agents_info)} agents to include in workflow") + + self._log("πŸ“ Building workflow generation prompt...") + workflow_prompt = self.build_workflow_prompt(agents_info, user_input) + workflow_yaml = self.generate_workflow_yaml(workflow_prompt) + + self._log("πŸŽ‰ Complete workflow generation finished successfully!") + + return agents_yaml, workflow_yaml + + def build_success_response(self, intent: Intent, user_request: str, file_edited: str = None) -> str: + """ + Build success response message based on the operation performed. + + Args: + intent: The intent that was processed + user_request: Original user request + file_edited: Name of file that was edited (for edit operations) + + Returns: + Success message string + """ + if intent == Intent.EDIT_YAML and file_edited: + return f"Successfully edited {file_edited} based on your request: {user_request}" + + return ( + "βœ… Successfully generated both agents.yaml and workflow.yaml from your prompt!\n\n" + f"Your request: \"{user_request}\"\n\n" + "I've created:\n" + "β€’ **agents.yaml** - Contains the agent definitions\n" + "β€’ **workflow.yaml** - Contains the workflow that uses those agents\n\n" + "Both files are now available in the YAML panel on the right. You can switch between tabs to view each file." + ) + + def process_request_in_background(self, request_id: str, content: str, chat_id: str, + status_logger_callback, result_callback, db_instance): + """ + Background function to process supervisor requests with real-time status updates. + + Args: + request_id: Unique identifier for this request + content: User's request content + chat_id: Chat session ID + status_logger_callback: Function to call for status updates + result_callback: Function to call with final result + db_instance: Database instance for YAML file retrieval + """ + try: + status_logger = status_logger_callback(request_id) + logged_supervisor = SupervisorAgent(logger_callback=status_logger) + + status_logger("🎯 Processing your request...") + + # Get existing YAML content for context + agents_yaml_content = "" + workflow_yaml_content = "" + + if chat_id: + try: + status_logger("πŸ“‚ Loading existing YAML files for context...") + yaml_files = db_instance.get_yaml_files(chat_id) + for file in yaml_files: + if file['name'] == 'agents.yaml': + agents_yaml_content = file['content'] + elif file['name'] == 'workflow.yaml': + workflow_yaml_content = file['content'] + if agents_yaml_content or workflow_yaml_content: + status_logger("βœ… Found existing YAML files to use as context") + else: + status_logger("πŸ“ No existing YAML files found, starting fresh") + except Exception as e: + status_logger(f"⚠️ Could not fetch YAML files for context: {e}", "warning") + + # Classify user intent + classification = logged_supervisor.classify_user_intent( + content, agents_yaml_content, workflow_yaml_content + ) + + # Handle EDIT_YAML intent + if classification.intent == Intent.EDIT_YAML: + if not agents_yaml_content and not workflow_yaml_content: + status_logger("⚠️ No existing YAML files found, switching to workflow generation") + classification.intent = Intent.GENERATE_WORKFLOW + else: + file_to_edit = "agents.yaml" if agents_yaml_content else "workflow.yaml" + yaml_content = agents_yaml_content if agents_yaml_content else workflow_yaml_content + + status_logger(f"✏️ Editing {file_to_edit}...") + + edited_yaml = logged_supervisor.edit_yaml( + yaml_content=yaml_content, + file_to_edit=file_to_edit, + instruction=content, + ) + + status_logger(f"βœ… Successfully edited {file_to_edit}") + + response_text = logged_supervisor.build_success_response( + Intent.EDIT_YAML, content, file_to_edit + ) + + result = { + "intent": classification.intent.value, + "confidence": float(classification.confidence), + "reasoning": classification.reasoning or "Successfully routed to editing", + "response": response_text, + "yaml_files": [{"name": file_to_edit, "content": edited_yaml}], + "chat_id": chat_id, + } + + result_callback(request_id, result) + status_logger("πŸŽ‰ Request completed successfully!") + return + + # Handle GENERATE_WORKFLOW intent + status_logger("🎯 Routing to workflow generation...") + + agents_yaml, workflow_yaml = logged_supervisor.process_complete_workflow_generation(content, chat_id, db_instance) + + response_text = logged_supervisor.build_success_response( + Intent.GENERATE_WORKFLOW, content + ) + + result = { + "intent": Intent.GENERATE_WORKFLOW.value, + "confidence": float(classification.confidence) if classification else 1.0, + "reasoning": classification.reasoning or "Successfully routed to workflow generation", + "response": response_text, + "yaml_files": [ + {"name": "agents.yaml", "content": agents_yaml}, + {"name": "workflow.yaml", "content": workflow_yaml}, + ], + "chat_id": chat_id, + } + + result_callback(request_id, result) + status_logger("πŸŽ‰ Request completed successfully!") + + except Exception as e: + status_logger = status_logger_callback(request_id) + status_logger(f"❌ Error occurred: {str(e)}", "error") + + # Store error result + error_result = { + "error": True, + "message": str(e) + } + result_callback(request_id, error_result) diff --git a/meta-agents-v2/supervisor_agent/agents.yaml b/meta-agents-v2/supervisor_agent/agents.yaml new file mode 100644 index 0000000..e6dec22 --- /dev/null +++ b/meta-agents-v2/supervisor_agent/agents.yaml @@ -0,0 +1,58 @@ +apiVersion: maestro/v1alpha1 +kind: Agent +metadata: + name: IntentClassifier + labels: + app: supervisor-agent +spec: + model: gpt-oss:latest + framework: openai + mode: local + description: | + Classifies user intent to determine whether a query is for workflow generation or YAML editing. + instructions: | + You are an Intent Classification Agent. Your job is to analyze user input and determine whether the user wants to: + + 1. GENERATE_WORKFLOW - Create a new workflow or modify existing workflow structure + 2. EDIT_YAML - Make specific edits to existing YAML files (agents.yaml or workflow.yaml) + + You will receive: + - The user's input message + - Optionally, the current YAML files (agents.yaml and/or workflow.yaml) if they exist + + Classification Rules: + + GENERATE_WORKFLOW if the user wants to: + - Create a new workflow from scratch + - Generate a complete workflow based on a description + - Build a new pipeline or process + - Create a workflow that performs a specific task + - Examples: "Create a workflow that...", "I want to build a pipeline that...", "Generate a workflow for..." + + EDIT_YAML if the user wants to: + - Make specific edits to existing YAML files + - Change configuration values (models, API keys, parameters) + - Fix syntax errors in existing YAML + - Update agent descriptions or metadata + - Modify existing workflow structure or steps + - Add new agents to existing agents.yaml + - Change the sequence of steps in existing workflow + - Examples: "Change the model to gpt-4", "Fix the indentation", "Update the API key", "Change the temperature to 0.8", "Add a new agent", "Modify the workflow steps" + + Output Format: + Return ONLY valid JSON (no prose, no markdown) with the following schema: + { + "intent": "GENERATE_WORKFLOW" | "EDIT_YAML", + "confidence": number, // 0.0 to 1.0 + "reasoning": string + } + + Examples: + {"intent":"GENERATE_WORKFLOW","confidence":0.92,"reasoning":"User is asking to create a new workflow"} + {"intent":"EDIT_YAML","confidence":0.87,"reasoning":"User wants to modify existing YAML content"} + + The system will parse this JSON and route the request accordingly. + + Guidelines: + - Be conservative - if unsure, default to GENERATE_WORKFLOW + - Focus on the user's primary intent, not secondary effects diff --git a/meta-agents-v2/supervisor_agent/workflow.yaml b/meta-agents-v2/supervisor_agent/workflow.yaml new file mode 100644 index 0000000..c8c6d59 --- /dev/null +++ b/meta-agents-v2/supervisor_agent/workflow.yaml @@ -0,0 +1,17 @@ +apiVersion: maestro/v1alpha1 +kind: Workflow +metadata: + name: supervisor-agent + labels: + app: supervisor-agent +spec: + template: + metadata: + labels: + app: supervisor-agent + agents: + - IntentClassifier + prompt: I want to fetch the current stock prices for Apple and Microsoft, and then analyze which one has performed better over the past week. + steps: + - name: classify_intent + agent: IntentClassifier \ No newline at end of file diff --git a/src/App.tsx b/src/App.tsx index ab238b9..5087693 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -3,8 +3,7 @@ import { Sidebar } from './components/Sidebar' import { ChatCanvas } from './components/ChatCanvas' import { YamlPanel } from './components/YamlPanel' import { ChatInput } from './components/ChatInput' -import { apiService, type ChatHistory, type StreamEvent } from './services/api' -import axios from 'axios' +import { apiService, type ChatHistory } from './services/api' export interface Message { id: string @@ -19,15 +18,15 @@ export interface YamlFile { language: 'yaml' } +const INITIAL_GREETING: Message = { + id: '1', + role: 'assistant', + content: 'Hello! I\'m your Maestro AI Builder assistant. I can help you create workflows and edit YAML files. Just describe what you want to build or what changes you want to make, and I\'ll handle it automatically. What would you like to do today?', + timestamp: new Date() +} + function App() { - const [messages, setMessages] = useState([ - { - id: '1', - role: 'assistant', - content: 'Hello! I\'m your Maestro AI Builder assistant. I can help you create both agents.yaml and workflow.yaml files from a single prompt. Just describe what you want to build, and I\'ll generate both files automatically. What would you like to build today?', - timestamp: new Date() - } - ]) + const [messages, setMessages] = useState([INITIAL_GREETING]) const [yamlFiles, setYamlFiles] = useState([ { @@ -108,14 +107,10 @@ function App() { apiService.setCurrentChatId(chatId) // Reset to initial state with empty YAML files - setMessages([ - { - id: '1', - role: 'assistant', - content: 'Hello! I\'m your Maestro AI Builder assistant. I can help you create both agents.yaml and workflow.yaml files from a single prompt. Just describe what you want to build, and I\'ll generate both files automatically. What would you like to build today?', - timestamp: new Date() - } - ]) + setMessages([{ + ...INITIAL_GREETING, + timestamp: new Date() + }]) // Set YAML files to empty state setYamlFiles([ @@ -156,18 +151,13 @@ function App() { const deleteAllChats = async () => { try { - console.log('Starting delete all chats...') const success = await apiService.deleteAllChatSessions() - console.log('Delete all chats result:', success) if (success) { - console.log('Creating new chat after deletion...') // Create a new chat since all chats were deleted await createNewChat() - console.log('Refreshing chat history...') // Refresh chat history (should be empty now) await loadChatHistory() - console.log('Delete all chats completed successfully') } else { console.error('Failed to delete all chats - API returned false') } @@ -176,7 +166,7 @@ function App() { } } - const handleSendMessage = async (content: string, useStreaming: boolean = true) => { + const handleSendMessage = async (content: string) => { const userMessage: Message = { id: Date.now().toString(), role: 'user', @@ -192,156 +182,143 @@ function App() { setMessages(prev => [...prev, { id: assistantLogId, role: 'assistant', - content: 'Starting…', + content: 'Starting', timestamp: new Date() }]) - const mergeYaml = (incoming: { name: string; content: string }) => { - setYamlFiles(prev => { - const exists = prev.find(f => f.name === incoming.name) - if (exists) { - return prev.map(f => f.name === incoming.name ? { ...f, content: incoming.content } : f) - } - return [...prev, { name: incoming.name, content: incoming.content, language: 'yaml' }] + + let stopStatusPolling: (() => void) | undefined + const startPollingForRequest = (requestChatId: string, activeChatId: string) => { + stopStatusPolling = apiService.startStatusPolling(requestChatId, (updates) => { + updates.forEach(update => { + setMessages(prev => prev.map(m => + m.id === assistantLogId + ? { ...m, content: m.content ? `${m.content}\n${update.message}` : update.message } + : m + )) + }) }) + + // Also poll for YAML files during status updates + const yamlPollingInterval = setInterval(async () => { + if (activeChatId) { + try { + console.log('πŸ” Polling for YAML files with chat ID:', activeChatId) + const yamlFiles = await apiService.getYamlFiles(activeChatId) + console.log('πŸ“„ YAML files received:', yamlFiles?.length || 0, 'files') + + if (yamlFiles && yamlFiles.length > 0) { + const agentsFile = yamlFiles.find(f => f.name === 'agents.yaml') + console.log('πŸ”Ž Agents file found:', !!agentsFile, 'has content:', !!agentsFile?.content?.trim()) + + if (agentsFile && agentsFile.content.trim()) { + console.log('πŸŽ‰ Found agents.yaml! Updating UI immediately') + console.log('πŸ“ Agents content preview:', agentsFile.content.substring(0, 100)) + setYamlFiles(yamlFiles) + setActiveYamlTab('agents.yaml') + clearInterval(yamlPollingInterval) + } + } else { + console.log('πŸ“­ No YAML files found yet') + } + } catch (error) { + console.log('⚠️ YAML polling error:', error) + } + } else { + console.log('⚠️ No activeChatId available for YAML polling') + } + }, 1500) + const originalStop = stopStatusPolling + stopStatusPolling = () => { + clearInterval(yamlPollingInterval) + if (originalStop) originalStop() + } } + try { + const { requestId, chatId } = await apiService.sendMessageAsync(content, currentChatId || undefined); + + if (!currentChatId) { + console.log('πŸ†” Setting currentChatId to:', chatId); + setCurrentChatId(chatId); + } else { + console.log('πŸ†” Using existing currentChatId:', currentChatId); + } + + startPollingForRequest(requestId, currentChatId || chatId); + let apiResponse: any = null; + let attempts = 0; + const maxAttempts = 240; + + while (attempts < maxAttempts && !apiResponse) { + await new Promise(resolve => setTimeout(resolve, 1000)); + apiResponse = await apiService.getAsyncResult(requestId); + attempts++; + } + + if (!apiResponse) { + throw new Error('Request timed out after 2 minutes'); + } - // Start log streaming scoped to this run - const closeAgentsLogs = apiService.streamLogs('agents', false, data => { - if (data.type === 'log' && data.line) { - const line = data.line as string - setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m)) + // Parse AI response (final_prompt if available) + let parsedText = apiResponse.response + try { + const parsedJSON = JSON.parse(parsedText) + if (parsedJSON.final_prompt) { + parsedText = parsedJSON.final_prompt + } + } catch (e) { + // Not JSON β€” ignore } - }) - const closeWorkflowLogs = apiService.streamLogs('workflow', false, data => { - if (data.type === 'log' && data.line) { - const line = data.line as string - setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m)) + + // Don't strip YAML code blocks from the response text since we want to show the full response + // The YAML files are handled separately in the yaml_files array + + const assistantMessage: Message = { + id: (Date.now() + 1).toString(), + content: parsedText, + role: 'assistant', + timestamp: new Date() } - }) - try { - if (useStreaming) { - await apiService.streamGenerateMessage(content, { - onEvent: async (event: StreamEvent) => { - if (event.type === 'status') { - setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n… ${event.message}` : `… ${event.message}` } : m)) - } - if (event.type === 'chat_id') { - if (event.chat_id !== currentChatId) { - setCurrentChatId(event.chat_id) - await loadChatHistory() - } - } - if (event.type === 'agents_yaml' || event.type === 'workflow_yaml') { - mergeYaml(event.file) - } - if (event.type === 'ai_output') { - const line = event.line - setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m)) - } - if (event.type === 'final') { - // Add a new assistant message with final content - let parsedText = event.response - try { - const parsedJSON = JSON.parse(parsedText as string) - if ((parsedJSON as any).final_prompt) parsedText = (parsedJSON as any).final_prompt - } catch {} - setMessages(prev => [...prev, { id: String(Date.now() + 2), role: 'assistant', content: parsedText, timestamp: new Date() }]) - // Ensure YAML files reflect final payload - event.yaml_files.forEach(file => mergeYaml(file)) - if (event.chat_id !== currentChatId) { - setCurrentChatId(event.chat_id) - await loadChatHistory() - } - } - if (event.type === 'error') { - setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: `${m.content}\nError: ${event.message}` } : m)) + setMessages(prev => [...prev, assistantMessage]) + + // Update YAML files from API response, ensuring both files are updated + const updatedYamlFiles = yamlFiles.map(file => { + const apiFile = apiResponse.yaml_files.find((apiFile: any) => apiFile.name === file.name) + if (apiFile) { + return { + ...file, + content: apiFile.content } - }, - onError: (err: Error) => { - console.error('Streaming error:', err) - }, - onComplete: () => { - setIsLoading(false) - // Close scoped log streams - closeAgentsLogs() - closeWorkflowLogs() - } - }, currentChatId || undefined) - } else { - const response = await apiService.sendGenerateMessage(content, currentChatId || undefined) - response.yaml_files.forEach(file => mergeYaml(file)) - setMessages(prev => [...prev, { - id: String(Date.now() + 2), - role: 'assistant', - content: response.response, - timestamp: new Date() - }]) - if (response.chat_id !== currentChatId) { - setCurrentChatId(response.chat_id) - await loadChatHistory() } - setIsLoading(false) - closeAgentsLogs() - closeWorkflowLogs() + return file + }) + setYamlFiles(updatedYamlFiles) + + // Update chat ID if it changed + if (apiResponse.chat_id !== currentChatId) { + setCurrentChatId(apiResponse.chat_id) + await loadChatHistory() + } + + setIsLoading(false) + if (stopStatusPolling) { + setTimeout(() => { + if (stopStatusPolling) { + stopStatusPolling() + } + }, 1000) } } catch (error) { console.error('Error processing message:', error) setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: `${m.content}\nSorry, I encountered an error while processing your request. Please try again.` } : m)) setIsLoading(false) - closeAgentsLogs() - closeWorkflowLogs() + if (stopStatusPolling) { + stopStatusPolling() + } } } - // Helper to get the currently active YAML file (default to agents.yaml) - const getCurrentYamlFile = () => { - // For now, just use agents.yaml (can be improved to track active tab) - return yamlFiles.find(f => f.name === 'agents.yaml') || yamlFiles[0] - } - - // Handler for editing YAML - const handleEditYaml = async (instruction: string) => { - const currentYamlFile = getCurrentYamlFile(); - if (!currentYamlFile) return; - setIsLoading(true); - // Add the edit instruction as a user message - const userEditMessage = { - id: Date.now().toString(), - role: 'user' as 'user', - content: `[Edit YAML] ${instruction}`, - timestamp: new Date() - }; - setMessages(prev => [...prev, userEditMessage]); - - try { - const response = await axios.post('/api/edit_yaml', { - yaml: currentYamlFile.content, - instruction, - file_type: currentYamlFile.name.includes('workflow') ? 'workflow' : 'agents', - }); - const editedYaml = response.data.edited_yaml; - // Add the editing agent's response as an assistant message - const assistantEditMessage = { - id: (Date.now() + 1).toString(), - role: 'assistant' as 'assistant', - content: editedYaml, - timestamp: new Date() - }; - setMessages(prev => [...prev, assistantEditMessage]); - - setYamlFiles(yamlFiles.map(f => - f.name === currentYamlFile.name - ? { ...f, content: editedYaml } - : f - )); - } catch (error) { - console.error('Error editing YAML:', error); - } finally { - setIsLoading(false); - } - }; const handleDirectYamlEdit = (fileName: string, newContent: string) => { setYamlFiles(prev => prev.map(f => @@ -371,12 +348,11 @@ function App() { {/* Chat Input */}
+ onSendMessage={handleSendMessage} + disabled={isLoading} + streamingEnabled={streamingEnabled} + onToggleStreaming={setStreamingEnabled} + />
diff --git a/src/components/ChatCanvas.tsx b/src/components/ChatCanvas.tsx index 8ea0993..5027d1b 100644 --- a/src/components/ChatCanvas.tsx +++ b/src/components/ChatCanvas.tsx @@ -1,4 +1,4 @@ -import { useEffect, useRef } from 'react' +import { useEffect, useRef, useState } from 'react' import { Bot, User, Code, FileText, Loader2 } from 'lucide-react' import type { Message } from '../App' import { cn } from '../lib/utils' @@ -10,6 +10,7 @@ interface ChatCanvasProps { export function ChatCanvas({ messages, isLoading = false }: ChatCanvasProps) { const messagesEndRef = useRef(null) + const [dots, setDots] = useState('') const scrollToBottom = () => { messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }) @@ -19,6 +20,16 @@ export function ChatCanvas({ messages, isLoading = false }: ChatCanvasProps) { scrollToBottom() }, [messages]) + useEffect(() => { + if (!isLoading) return + + const interval = setInterval(() => { + setDots(prev => prev.length >= 3 ? '' : prev + '.') + }, 500) + + return () => clearInterval(interval) + }, [isLoading]) + const formatTime = (date: Date) => { return date.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' }) } @@ -124,7 +135,7 @@ export function ChatCanvas({ messages, isLoading = false }: ChatCanvasProps) {
- Thinking... + Thinking{dots}
diff --git a/src/components/ChatInput.tsx b/src/components/ChatInput.tsx index 5287c07..41a0c35 100644 --- a/src/components/ChatInput.tsx +++ b/src/components/ChatInput.tsx @@ -4,14 +4,13 @@ import { Send, Paperclip, Mic, ChevronDown, Lightbulb } from 'lucide-react' import { cn } from '../lib/utils' interface ChatInputProps { - onSendMessage: (message: string, useStreaming: boolean) => void - onEditYaml?: (instruction: string) => void + onSendMessage: (message: string) => void disabled?: boolean streamingEnabled?: boolean onToggleStreaming?: (enabled: boolean) => void } -export function ChatInput({ onSendMessage, onEditYaml, disabled = false, streamingEnabled = true, onToggleStreaming }: ChatInputProps) { +export function ChatInput({ onSendMessage, disabled = false, streamingEnabled = true, onToggleStreaming }: ChatInputProps) { const [message, setMessage] = useState('') const [isTyping, setIsTyping] = useState(false) const [showSuggestions, setShowSuggestions] = useState(false) @@ -39,7 +38,7 @@ export function ChatInput({ onSendMessage, onEditYaml, disabled = false, streami const handleSend = () => { if (message.trim() && !disabled) { - onSendMessage(message.trim(), streamingEnabled) + onSendMessage(message.trim()) setMessage('') setIsTyping(false) } @@ -54,7 +53,7 @@ export function ChatInput({ onSendMessage, onEditYaml, disabled = false, streami const handleSuggestionClick = (suggestion: string) => { if (!disabled) { - onSendMessage(suggestion, streamingEnabled) + onSendMessage(suggestion) setShowSuggestions(false) } } @@ -74,21 +73,7 @@ export function ChatInput({ onSendMessage, onEditYaml, disabled = false, streami - {/* Edit YAML Button */} - {onEditYaml && ( - - )} + {/* Suggestions Dropdown */}
@@ -156,7 +141,7 @@ export function ChatInput({ onSendMessage, onEditYaml, disabled = false, streami } }} onKeyDown={handleKeyDown} - placeholder={disabled ? "Processing..." : "Ask me to help you build your Maestro workflow..."} + placeholder={disabled ? "Processing" : "Ask me to help you build your Maestro workflow"} className="w-full h-full min-h-[44px] max-h-32 resize-none bg-transparent border-none outline-none text-base placeholder:text-gray-400 leading-relaxed disabled:opacity-50 font-['Inter',-apple-system,BlinkMacSystemFont,'Segoe_UI',Roboto,sans-serif]" rows={1} disabled={disabled} diff --git a/src/components/Sidebar.tsx b/src/components/Sidebar.tsx index 8cf7e5d..ec41bc8 100644 --- a/src/components/Sidebar.tsx +++ b/src/components/Sidebar.tsx @@ -28,7 +28,7 @@ export function Sidebar({ chatHistory, currentChatId, onLoadChat, onCreateChat, const truncateText = (text: string, maxLength: number = 50) => { if (text.length <= maxLength) return text - return text.substring(0, maxLength) + '...' + return text.substring(0, maxLength) } const handleDeleteAll = async () => { diff --git a/src/components/YamlPanel.tsx b/src/components/YamlPanel.tsx index 203a297..5e5192f 100644 --- a/src/components/YamlPanel.tsx +++ b/src/components/YamlPanel.tsx @@ -359,7 +359,7 @@ export function YamlPanel({ yamlFiles, isLoading = false, activeTab, setActiveTa {isLoading && (
- Updating... + Updating
)}
) : (
diff --git a/src/services/api.ts b/src/services/api.ts index 1ceaeb2..1a78803 100644 --- a/src/services/api.ts +++ b/src/services/api.ts @@ -70,34 +70,37 @@ class ApiService { async sendMessage(message: string, chatId?: string): Promise { try { - const response = await fetch(`${API_BASE_URL}/api/chat_builder_agent`, { + const response = await fetch(`${API_BASE_URL}/api/supervisor`, { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ content: message, - role: 'user', chat_id: chatId || this.currentChatId - } as ChatMessage & { chat_id?: string }) + }) }) if (!response.ok) { throw new Error(`HTTP error! status: ${response.status}`) } - const data: ChatResponse = await response.json() - - // Store the chat ID for future messages + const data = await response.json() + this.currentChatId = data.chat_id - // Ensure YAML files have proper formatting - data.yaml_files = data.yaml_files.map(file => ({ + data.yaml_files = data.yaml_files.map((file: any) => ({ ...file, content: this.formatYamlContent(file.content) })) - return data + const chatResponse: ChatResponse = { + response: data.response, + yaml_files: data.yaml_files, + chat_id: data.chat_id + } + + return chatResponse } catch (error) { console.error('Error sending message:', error) // Return a fallback response if API is not available @@ -105,90 +108,51 @@ class ApiService { } } - async sendAgentMessage(message: string, chatId?: string): Promise { - try { - const response = await fetch(`${API_BASE_URL}/api/chat_builder_agent`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - content: message, - role: 'user', - chat_id: chatId || this.currentChatId - } as ChatMessage & { chat_id?: string }) + async sendMessageAsync(message: string, chatId?: string): Promise<{requestId: string, chatId: string}> { + const response = await fetch(`${API_BASE_URL}/api/supervisor-async`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + content: message, + chat_id: chatId || this.currentChatId }) - if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status}`) - } - const data: ChatResponse = await response.json() - this.currentChatId = data.chat_id - data.yaml_files = data.yaml_files.map(file => ({ - ...file, - content: this.formatYamlContent(file.content) - })) - return data - } catch (error) { - console.error('Error sending agent message:', error) - return this.getFallbackResponse(message) + }) + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`) } + + const data = await response.json() + return { requestId: data.request_id, chatId: data.chat_id } } - async sendWorkflowMessage(message: string, chatId?: string): Promise { - try { - const response = await fetch(`${API_BASE_URL}/api/chat_builder_workflow`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - content: message, - role: 'user', - chat_id: chatId || this.currentChatId - } as ChatMessage & { chat_id?: string }) - }) - if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status}`) - } - const data: ChatResponse = await response.json() - this.currentChatId = data.chat_id - data.yaml_files = data.yaml_files.map(file => ({ - ...file, - content: this.formatYamlContent(file.content) - })) - return data - } catch (error) { - console.error('Error sending workflow message:', error) - return this.getFallbackResponse(message) + async getAsyncResult(requestId: string): Promise { + const response = await fetch(`${API_BASE_URL}/api/supervisor-result/${requestId}`) + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`) } - } - async sendGenerateMessage(message: string, chatId?: string): Promise { - try { - const response = await fetch(`${API_BASE_URL}/api/generate`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - content: message, - role: 'user', - chat_id: chatId || this.currentChatId - } as ChatMessage & { chat_id?: string }) - }) - if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status}`) - } - const data: ChatResponse = await response.json() - this.currentChatId = data.chat_id - data.yaml_files = data.yaml_files.map(file => ({ - ...file, - content: this.formatYamlContent(file.content) - })) - return data - } catch (error) { - console.error('Error sending complete message:', error) - return this.getFallbackResponse(message) + const data = await response.json() + + if (data.status === 'processing') { + return null + } + + if (data.error) { + throw new Error(data.message) + } + this.currentChatId = data.chat_id + + data.yaml_files = data.yaml_files.map((file: any) => ({ + ...file, + content: this.formatYamlContent(file.content) + })) + + return { + response: data.response, + yaml_files: data.yaml_files, + chat_id: data.chat_id } } @@ -253,7 +217,7 @@ class ApiService { console.error('Error streaming complete message:', error) handlers.onError?.(error as Error) try { - const fallback = await this.sendCompleteMessage(message, chatId) + const fallback = await this.sendMessage(message, chatId) handlers.onEvent?.({ type: 'final', response: fallback.response, yaml_files: fallback.yaml_files, chat_id: fallback.chat_id }) handlers.onEvent?.({ type: 'done' }) handlers.onComplete?.() @@ -359,7 +323,7 @@ class ApiService { async deleteAllChatSessions(): Promise { try { - console.log('API: Calling delete all chat sessions endpoint...') + console.log('API: Calling delete all chat sessions endpoint') const response = await fetch(`${API_BASE_URL}/api/delete_all_chats`, { method: 'DELETE' }) @@ -539,6 +503,43 @@ workflow: chat_id: 'fallback-session' } } + + // Poll for status updates + async getStatusUpdates(chatId: string): Promise> { + try { + const response = await fetch(`${API_BASE_URL}/api/status/${chatId}`); + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + const data = await response.json(); + return data.updates || []; + } catch (error) { + console.error('Error getting status updates:', error); + return []; + } + } + + startStatusPolling(chatId: string, onUpdate: (updates: Array<{message: string, level: string, timestamp: string}>) => void): () => void { + let isPolling = true; + const poll = async () => { + if (!isPolling) return; + try { + const updates = await this.getStatusUpdates(chatId); + if (updates.length > 0) { + onUpdate(updates); + } + } catch (error) { + console.error('Error polling status updates:', error); + } + if (isPolling) { + setTimeout(poll, 100); + } + }; + poll(); + return () => { + isPolling = false; + }; + } } export const apiService = new ApiService() \ No newline at end of file diff --git a/start.sh b/start.sh index cc378e5..8c8321c 100755 --- a/start.sh +++ b/start.sh @@ -74,6 +74,14 @@ EDITING_AGENT_PID=$! echo $EDITING_AGENT_PID > logs/editing_agent.pid print_success "Editing Agent backend started with PID: $EDITING_AGENT_PID (port 8002)" +# Start Supervisor Agent backend (port 8005) +SUPERVISOR_AGENT_CMD="maestro serve ./meta-agents-v2/supervisor_agent/agents.yaml ./meta-agents-v2/supervisor_agent/workflow.yaml --port 8005" +print_status "Starting Supervisor Agent backend: $SUPERVISOR_AGENT_CMD" +nohup $SUPERVISOR_AGENT_CMD > logs/supervisor_agent.log 2>&1 & +SUPERVISOR_AGENT_PID=$! +echo $SUPERVISOR_AGENT_PID > logs/supervisor_agent.pid +print_success "Supervisor Agent backend started with PID: $SUPERVISOR_AGENT_PID (port 8005)" + ### ───────────── Start API ───────────── print_status "Starting Maestro API service..." @@ -176,6 +184,7 @@ echo "Services:" echo " - Agent Generation Backend: http://localhost:8003" echo " - Workflow Generation Backend: http://localhost:8004" echo " - Editing Agent Backend: http://localhost:8002" +echo " - Supervisor Agent Backend: http://localhost:8005" echo " - API: http://localhost:8001" echo " - API Docs: http://localhost:8001/docs" echo " - Builder Frontend: http://localhost:5174" @@ -184,8 +193,9 @@ echo "Logs:" echo " - Agent Generation: logs/maestro_agents.log" echo " - Workflow Generation: logs/maestro_workflow.log" echo " - Editing Agent: logs/editing_agent.log" +echo " - Supervisor Agent: logs/supervisor_agent.log" echo " - API: logs/api.log" echo " - Builder: logs/builder.log" echo "" echo "To stop all services, run: ./stop.sh" -echo "To view logs: tail -f logs/api.log | tail -f logs/builder.log | tail -f logs/maestro_agents.log | tail -f logs/maestro_workflow.log | tail -f logs/editing_agent.log" +echo "To view logs: tail -f logs/api.log | tail -f logs/builder.log | tail -f logs/maestro_agents.log | tail -f logs/maestro_workflow.log | tail -f logs/editing_agent.log | tail -f logs/supervisor_agent.log" diff --git a/stop.sh b/stop.sh index 587bf96..526ba0b 100755 --- a/stop.sh +++ b/stop.sh @@ -20,10 +20,11 @@ PID_FILES=( "logs/maestro_agents.pid" "logs/maestro_workflow.pid" "logs/editing_agent.pid" + "logs/supervisor_agent.pid" "logs/api.pid" "logs/builder.pid" ) -PORTS=(8000 8001 8002 8003 8004 5174) +PORTS=(8000 8001 8002 8003 8004 8005 5174) CLEAR_LOGS=false for arg in "$@"; do @@ -82,6 +83,7 @@ if [ "$CLEAR_LOGS" = true ]; then > logs/maestro_agents.log > logs/maestro_workflow.log > logs/editing_agent.log + > logs/supervisor_agent.log > logs/maestro.log print_success "All log files have been cleared." fi diff --git a/tests/test_supervisor.py b/tests/test_supervisor.py new file mode 100644 index 0000000..b2eeaf6 --- /dev/null +++ b/tests/test_supervisor.py @@ -0,0 +1,507 @@ +""" +Tests for the SupervisorAgent class and related functionality. +This module provides comprehensive test coverage for intent classification, +workflow generation, and YAML editing operations. +""" + +import sys +import os +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import pytest +import json +import yaml +from unittest.mock import Mock, patch, MagicMock +import requests + +from api.supervisor import SupervisorAgent, Intent, Classification + + +class TestSupervisorAgent: + """Test suite for SupervisorAgent class.""" + + def setup_method(self): + """Set up test fixtures.""" + self.supervisor = SupervisorAgent(timeout_seconds=10) + + def test_supervisor_agent_initialization(self): + """Test SupervisorAgent initialization.""" + agent = SupervisorAgent() + assert agent.timeout_seconds == 30 + + agent_custom = SupervisorAgent(timeout_seconds=60) + assert agent_custom.timeout_seconds == 60 + + @patch('requests.post') + def test_classify_user_intent_generate_workflow(self, mock_post): + """Test intent classification for workflow generation.""" + # Mock successful supervisor response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": json.dumps({ + "intent": "GENERATE_WORKFLOW", + "confidence": 0.95, + "reasoning": "User wants to create a new workflow" + }) + } + mock_post.return_value = mock_response + + result = self.supervisor.classify_user_intent( + "Create a workflow to process customer data", + "", + "" + ) + + assert isinstance(result, Classification) + assert result.intent == Intent.GENERATE_WORKFLOW + assert result.confidence == 0.95 + assert "new workflow" in result.reasoning + + # Verify the request was made correctly + mock_post.assert_called_once() + call_args = mock_post.call_args + assert call_args[0][0] == self.supervisor.SUPERVISOR_URL + assert "agent" in call_args[1]["json"] + assert call_args[1]["json"]["agent"] == "IntentClassifier" + + @patch('requests.post') + def test_classify_user_intent_edit_yaml(self, mock_post): + """Test intent classification for YAML editing.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": json.dumps({ + "intent": "EDIT_YAML", + "confidence": 0.88, + "reasoning": "User wants to modify existing YAML content" + }) + } + mock_post.return_value = mock_response + + result = self.supervisor.classify_user_intent( + "Change the timeout to 30 seconds", + "existing agents yaml", + "existing workflow yaml" + ) + + assert result.intent == Intent.EDIT_YAML + assert result.confidence == 0.88 + assert "modify existing" in result.reasoning + + @patch('requests.post') + def test_classify_user_intent_supervisor_failure(self, mock_post): + """Test handling of supervisor service failure.""" + mock_response = Mock() + mock_response.status_code = 500 + mock_response.text = "Internal server error" + mock_post.return_value = mock_response + + with pytest.raises(Exception) as exc_info: + self.supervisor.classify_user_intent("test input", "", "") + + assert "Supervisor agent failed with status 500" in str(exc_info.value) + + @patch('requests.post') + def test_classify_user_intent_invalid_json_response(self, mock_post): + """Test handling of invalid JSON response from supervisor.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": "This is not valid JSON" + } + mock_post.return_value = mock_response + + result = self.supervisor.classify_user_intent("test input", "", "") + + # Should fall back to default + assert result.intent == Intent.GENERATE_WORKFLOW + assert result.confidence == 0.5 + assert "parsing error" in result.reasoning + + @patch('requests.post') + def test_classify_user_intent_network_error(self, mock_post): + """Test handling of network errors.""" + mock_post.side_effect = requests.RequestException("Network error") + + with pytest.raises(Exception) as exc_info: + self.supervisor.classify_user_intent("test input", "", "") + + assert "Failed to communicate with supervisor agent" in str(exc_info.value) + + @patch('requests.post') + def test_generate_agents_yaml_success(self, mock_post): + """Test successful agents YAML generation.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": """```yaml +apiVersion: v1 +kind: Agent +metadata: + name: test-agent +spec: + description: A test agent +```""" + } + mock_post.return_value = mock_response + + result = self.supervisor.generate_agents_yaml("Create a test agent") + + assert "apiVersion: v1" in result + assert "name: test-agent" in result + assert "description: A test agent" in result + + @patch('requests.post') + def test_generate_agents_yaml_failure(self, mock_post): + """Test handling of agents generation failure.""" + mock_response = Mock() + mock_response.status_code = 500 + mock_response.text = "Generation failed" + mock_post.return_value = mock_response + + with pytest.raises(Exception) as exc_info: + self.supervisor.generate_agents_yaml("test input") + + assert "Agents generation failed" in str(exc_info.value) + + @patch('requests.post') + def test_generate_workflow_yaml_success(self, mock_post): + """Test successful workflow YAML generation.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": """```yaml +apiVersion: v1 +kind: Workflow +metadata: + name: test-workflow +spec: + steps: + - name: step1 + agent: test-agent +```""" + } + mock_post.return_value = mock_response + + result = self.supervisor.generate_workflow_yaml("Create a test workflow") + + assert "apiVersion: v1" in result + assert "kind: Workflow" in result + assert "name: test-workflow" in result + + @patch('requests.post') + def test_edit_yaml_success(self, mock_post): + """Test successful YAML editing.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": """```yaml +apiVersion: v1 +kind: Agent +metadata: + name: updated-agent +spec: + description: Updated description + timeout: 30 +```""" + } + mock_post.return_value = mock_response + + original_yaml = """apiVersion: v1 +kind: Agent +metadata: + name: original-agent +spec: + description: Original description""" + + result = self.supervisor.edit_yaml( + original_yaml, + "agents.yaml", + "Change timeout to 30 seconds" + ) + + assert "timeout: 30" in result + assert "Updated description" in result + + def test_extract_yaml_from_output_with_yaml_markers(self): + """Test YAML extraction from output with yaml markers.""" + output = """Here's the generated YAML: + +```yaml +apiVersion: v1 +kind: Agent +metadata: + name: test +``` + +That's the result.""" + + result = self.supervisor._extract_yaml_from_output(output) + expected = """apiVersion: v1 +kind: Agent +metadata: + name: test""" + + assert result == expected + + def test_extract_yaml_from_output_with_generic_markers(self): + """Test YAML extraction from output with generic markers.""" + output = """``` +apiVersion: v1 +kind: Agent +metadata: + name: test +```""" + + result = self.supervisor._extract_yaml_from_output(output) + expected = """apiVersion: v1 +kind: Agent +metadata: + name: test""" + + assert result == expected + + def test_extract_yaml_from_output_without_markers(self): + """Test YAML extraction when there are no code markers.""" + output = """apiVersion: v1 +kind: Agent +metadata: + name: test + +Some additional text here.""" + + result = self.supervisor._extract_yaml_from_output(output) + expected = """apiVersion: v1 +kind: Agent +metadata: + name: test""" + + assert result == expected + + def test_parse_agents_yaml_to_info_valid_yaml(self): + """Test parsing agents YAML to info with valid YAML.""" + agents_yaml = """apiVersion: v1 +kind: Agent +metadata: + name: agent1 +spec: + description: First agent +--- +apiVersion: v1 +kind: Agent +metadata: + name: agent2 +spec: + description: Second agent""" + + result = self.supervisor.parse_agents_yaml_to_info(agents_yaml) + + assert len(result) == 2 + assert result[0]["name"] == "agent1" + assert result[0]["description"] == "First agent" + assert result[1]["name"] == "agent2" + assert result[1]["description"] == "Second agent" + + def test_parse_agents_yaml_to_info_invalid_yaml(self): + """Test parsing agents YAML with invalid YAML (fallback to regex).""" + invalid_yaml = """name: agent1 +description: | + First agent description +name: agent2 +description: | + Second agent description""" + + result = self.supervisor.parse_agents_yaml_to_info(invalid_yaml) + + assert len(result) == 2 + assert result[0]["name"] == "agent1" + assert result[1]["name"] == "agent2" + + def test_build_workflow_prompt(self): + """Test building workflow prompt from agents info.""" + agents_info = [ + {"name": "agent1", "description": "First agent"}, + {"name": "agent2", "description": "Second agent"} + ] + user_input = "Process customer data" + + result = self.supervisor.build_workflow_prompt(agents_info, user_input) + + assert "agent1: agent1 – First agent" in result + assert "agent2: agent2 – Second agent" in result + assert "prompt: Process customer data" in result + + @patch.object(SupervisorAgent, 'generate_agents_yaml') + @patch.object(SupervisorAgent, 'generate_workflow_yaml') + @patch.object(SupervisorAgent, 'parse_agents_yaml_to_info') + @patch.object(SupervisorAgent, 'build_workflow_prompt') + def test_process_complete_workflow_generation( + self, mock_build_prompt, mock_parse_agents, + mock_gen_workflow, mock_gen_agents + ): + """Test complete workflow generation process.""" + # Set up mocks + mock_gen_agents.return_value = "agents yaml content" + mock_parse_agents.return_value = [{"name": "agent1", "description": "desc1"}] + mock_build_prompt.return_value = "workflow prompt" + mock_gen_workflow.return_value = "workflow yaml content" + + agents_yaml, workflow_yaml = self.supervisor.process_complete_workflow_generation( + "Create a data processing workflow" + ) + + assert agents_yaml == "agents yaml content" + assert workflow_yaml == "workflow yaml content" + + # Verify the call chain + mock_gen_agents.assert_called_once_with("Create a data processing workflow") + mock_parse_agents.assert_called_once_with("agents yaml content") + mock_build_prompt.assert_called_once_with( + [{"name": "agent1", "description": "desc1"}], + "Create a data processing workflow" + ) + mock_gen_workflow.assert_called_once_with("workflow prompt") + + def test_build_success_response_generation(self): + """Test building success response for generation intent.""" + result = self.supervisor.build_success_response( + Intent.GENERATE_WORKFLOW, + "Create a workflow" + ) + + assert "Successfully generated both agents.yaml and workflow.yaml" in result + assert "Create a workflow" in result + assert "agents.yaml" in result + assert "workflow.yaml" in result + + def test_build_success_response_editing(self): + """Test building success response for edit intent.""" + result = self.supervisor.build_success_response( + Intent.EDIT_YAML, + "Change timeout", + "agents.yaml" + ) + + assert "Successfully edited agents.yaml" in result + assert "Change timeout" in result + + +class TestIntentClassificationEdgeCases: + """Test edge cases for intent classification.""" + + def setup_method(self): + self.supervisor = SupervisorAgent() + + @patch('requests.post') + def test_malformed_json_response(self, mock_post): + """Test handling of malformed JSON in supervisor response.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": '{"intent": "INVALID_INTENT", "confidence": "not_a_number"}' + } + mock_post.return_value = mock_response + + result = self.supervisor.classify_user_intent("test", "", "") + + # Should fall back to default intent + assert result.intent == Intent.GENERATE_WORKFLOW + assert result.confidence == 0.5 + + @patch('requests.post') + def test_missing_fields_in_response(self, mock_post): + """Test handling of missing fields in supervisor JSON response.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": '{"intent": "GENERATE_WORKFLOW"}' # missing confidence and reasoning + } + mock_post.return_value = mock_response + + result = self.supervisor.classify_user_intent("test", "", "") + + assert result.intent == Intent.GENERATE_WORKFLOW + assert result.confidence == 1.0 # default value + assert result.reasoning == "" # default value + + @patch('requests.post') + def test_empty_yaml_extraction(self, mock_post): + """Test YAML extraction when no YAML content is found.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "response": "No YAML content here, just plain text." + } + mock_post.return_value = mock_response + + result = self.supervisor.generate_agents_yaml("test") + + # Should return the stripped text as fallback + assert result == "No YAML content here, just plain text." + + +class TestSupervisorIntegration: + """Integration tests that test multiple components together.""" + + def setup_method(self): + self.supervisor = SupervisorAgent() + + def test_yaml_parsing_with_real_yaml_structure(self): + """Test YAML parsing with realistic agent definitions.""" + agents_yaml = """apiVersion: v1 +kind: Agent +metadata: + name: DataProcessor +spec: + description: | + Processes incoming data streams and validates format + framework: langchain +--- +apiVersion: v1 +kind: Agent +metadata: + name: EmailSender +spec: + description: | + Sends notification emails to stakeholders + framework: custom""" + + result = self.supervisor.parse_agents_yaml_to_info(agents_yaml) + + assert len(result) == 2 + assert result[0]["name"] == "DataProcessor" + assert "Processes incoming data" in result[0]["description"] + assert result[1]["name"] == "EmailSender" + assert "notification emails" in result[1]["description"] + + def test_workflow_prompt_building_realistic_scenario(self): + """Test workflow prompt building with realistic agent data.""" + agents_info = [ + { + "name": "DataValidator", + "description": "Validates incoming data against schema" + }, + { + "name": "DataTransformer", + "description": "Transforms data to required format" + }, + { + "name": "DatabaseWriter", + "description": "Writes processed data to database" + } + ] + + user_input = "Create a pipeline to process customer orders" + + prompt = self.supervisor.build_workflow_prompt(agents_info, user_input) + + assert "agent1: DataValidator – Validates incoming data against schema" in prompt + assert "agent2: DataTransformer – Transforms data to required format" in prompt + assert "agent3: DatabaseWriter – Writes processed data to database" in prompt + assert "prompt: Create a pipeline to process customer orders" in prompt + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) +