Skip to content
This repository was archived by the owner on Apr 17, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 207 additions & 0 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
Expand All @@ -17,6 +18,10 @@
import os
import yaml
import re
import json
import asyncio
from pathlib import Path
import httpx

# Initialize FastAPI app
app = FastAPI(
Expand Down Expand Up @@ -264,6 +269,208 @@ async def chat_builder_complete(message: ChatMessage):
raise HTTPException(status_code=500, detail=f"Complete Builder failed: {e}")


@app.post("/api/chat_builder_complete_stream")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is a mouthful of an endpoint... is there a way to make this shorter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, switched it into function logic

async def chat_builder_complete_stream(message: ChatMessage):
"""
Streaming variant of chat_builder_complete that emits newline-delimited JSON (NDJSON)
events as progress updates and intermediate YAMLs are ready.

Event types emitted:
- chat_id: { chat_id }
- status: { message }
- agents_yaml: { name: "agents.yaml", content }
- workflow_yaml: { name: "workflow.yaml", content }
- final: { response, yaml_files: [...], chat_id }
- error: { message }
- done: {}
"""

async def event_generator():
def to_line(obj: Dict[str, Any]) -> bytes:
return (json.dumps(obj, ensure_ascii=False) + "\n").encode("utf-8")

chat_id = str(uuid.uuid4())
try:
# Emit chat_id early so UI can attach updates
yield to_line({"type": "chat_id", "chat_id": chat_id})
yield to_line({"type": "status", "message": "(Starting generation)"})
await asyncio.sleep(0)
yield to_line({"type": "status", "message": "(Reading user request)"})
await asyncio.sleep(0)
yield to_line({"type": "status", "message": "(Planning agents)"})
await asyncio.sleep(0)
yield to_line({"type": "status", "message": "Generating agents.yaml..."})
await asyncio.sleep(0)

async with httpx.AsyncClient(timeout=120) as client:
agents_resp = await client.post(
"http://localhost:8003/chat",
json={"prompt": message.content, "agent": "TaskInterpreter"},
)
if agents_resp.status_code != 200:
raise Exception(f"Agents generation failed: {agents_resp.text}")

agents_output = agents_resp.json().get("response", "")
agents_yaml = ""
# Emit raw agent output as AI output lines for UI visibility
for line in agents_output.splitlines():
if line.strip():
yield to_line({"type": "ai_output", "source": "agents", "line": line})
await asyncio.sleep(0)
if "```yaml" in agents_output:
agents_yaml = (
agents_output.split("```yaml", 1)[-1].split("```", 1)[0].strip()
)
elif "```" in agents_output:
agents_yaml = agents_output.split("```", 1)[-1].split("```", 1)[0].strip()
else:
yaml_match = re.search(r"apiVersion:.*?(?=\n\n|\Z)", agents_output, re.DOTALL)
if yaml_match:
agents_yaml = yaml_match.group(0).strip()

# Emit agents YAML as soon as it's ready
yield to_line({
"type": "agents_yaml",
"file": {"name": "agents.yaml", "content": agents_yaml},
"chat_id": chat_id,
})
await asyncio.sleep(0)
yield to_line({"type": "status", "message": "(Parsing agents output)"})
await asyncio.sleep(0)

# Build workflow prompt based on parsed agents
agents_info: List[Dict[str, str]] = []
try:
agent_blocks = agents_yaml.split('---')
for block in agent_blocks:
if block.strip():
agent_data = yaml.safe_load(block)
if agent_data and 'metadata' in agent_data and 'name' in agent_data['metadata']:
name = agent_data['metadata']['name']
description = agent_data.get('spec', {}).get('description', '')
agents_info.append({'name': name, 'description': description})
except yaml.YAMLError:
name_matches = re.findall(r'name:\s*(\w+)', agents_yaml)
desc_matches = re.findall(r'description:\s*\|\s*\n\s*(.+?)(?=\n\s*\w+:|$)', agents_yaml, re.DOTALL)
for i, name in enumerate(name_matches):
description = desc_matches[i] if i < len(desc_matches) else ""
agents_info.append({'name': name, 'description': description.strip()})

workflow_prompt = f"Create a workflow that uses the following agents:\n\n"
for i, agent in enumerate(agents_info, 1):
workflow_prompt += f"agent{i}: {agent['name']} – {agent['description']}\n"
workflow_prompt += f"\nprompt: {message.content}"

yield to_line({"type": "status", "message": "(Building workflow prompt)"})
await asyncio.sleep(0)
yield to_line({"type": "status", "message": "Generating workflow.yaml..."})
await asyncio.sleep(0)

async with httpx.AsyncClient(timeout=180) as client:
workflow_resp = await client.post(
"http://localhost:8004/chat",
json={"prompt": workflow_prompt, "agent": "WorkflowYAMLBuilder"},
)
if workflow_resp.status_code != 200:
raise Exception(f"Workflow generation failed: {workflow_resp.text}")

workflow_output = workflow_resp.json().get("response", "")
workflow_yaml = ""
# Emit raw workflow output as AI output lines for UI visibility
for line in workflow_output.splitlines():
if line.strip():
yield to_line({"type": "ai_output", "source": "workflow", "line": line})
await asyncio.sleep(0)
if "```yaml" in workflow_output:
workflow_yaml = (
workflow_output.split("```yaml", 1)[-1].split("```", 1)[0].strip()
)
elif "```" in workflow_output:
workflow_yaml = workflow_output.split("```", 1)[-1].split("```", 1)[0].strip()
else:
yaml_match = re.search(r"apiVersion:.*?(?=\n\n|\Z)", workflow_output, re.DOTALL)
if yaml_match:
workflow_yaml = yaml_match.group(0).strip()

# Emit workflow YAML
yield to_line({
"type": "workflow_yaml",
"file": {"name": "workflow.yaml", "content": workflow_yaml},
"chat_id": chat_id,
})
await asyncio.sleep(0)
yield to_line({"type": "status", "message": "(Parsing workflow output)"})
await asyncio.sleep(0)
yield to_line({"type": "status", "message": "(Finalizing response)"})
await asyncio.sleep(0)

clean_response = f"""✅ Successfully generated both agents.yaml and workflow.yaml from your prompt!

Your request: "{message.content}"

I've created:
• **agents.yaml** - Contains the agent definitions
• **workflow.yaml** - Contains the workflow that uses those agents

Both files are now available in the YAML panel on the right. You can switch between tabs to view each file."""

final_payload = {
"type": "final",
"response": clean_response,
"yaml_files": [
{"name": "agents.yaml", "content": agents_yaml},
{"name": "workflow.yaml", "content": workflow_yaml},
],
"chat_id": chat_id,
}
yield to_line(final_payload)
yield to_line({"type": "done"})
except Exception as e:
yield to_line({"type": "error", "message": f"{e}"})
yield to_line({"type": "done"})

return StreamingResponse(event_generator(), media_type="application/x-ndjson")


@app.get("/api/stream_logs")
async def stream_logs(source: str = "agents", from_start: bool = False):
"""
Stream Maestro log files as Server-Sent Events (SSE).
Query params:
- source: 'agents' | 'workflow' (defaults to 'agents')
- from_start: if True, stream from beginning of file, otherwise tail new lines
"""
logs_dir = Path(__file__).resolve().parent.parent / "logs"
file_map = {
"agents": logs_dir / "maestro_agents.log",
"workflow": logs_dir / "maestro_workflow.log",
}
log_path = file_map.get(source, file_map["agents"]) # default to agents

async def sse_generator():
try:
# Ensure file exists
if not log_path.exists():
yield f"data: {json.dumps({'type': 'error', 'message': f'Log file not found: {log_path.name}'})}\n\n"
return

with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
if not from_start:
f.seek(0, os.SEEK_END)

while True:
line = f.readline()
if line:
payload = {"type": "log", "source": source, "line": line.rstrip("\n")}
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
else:
await asyncio.sleep(0.25)
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"

return StreamingResponse(sse_generator(), media_type="text/event-stream")


@app.get("/api/get_yamls/{chat_id}", response_model=List[YamlFile])
async def get_yamls(chat_id: str):
try:
Expand Down
142 changes: 79 additions & 63 deletions src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Sidebar } from './components/Sidebar'
import { ChatCanvas } from './components/ChatCanvas'
import { YamlPanel } from './components/YamlPanel'
import { ChatInput } from './components/ChatInput'
import { apiService, type ChatSession, type ChatHistory } from './services/api'
import { apiService, type ChatHistory, type StreamEvent } from './services/api'
import axios from 'axios'

export interface Message {
Expand Down Expand Up @@ -186,77 +186,93 @@ function App() {
setMessages(prev => [...prev, userMessage])
setIsLoading(true)

try {
// Use the complete endpoint to generate both agents and workflow
const apiResponse = await apiService.sendCompleteMessage(content, currentChatId || undefined);
// Create a dedicated live log/status message
const assistantLogId = (Date.now() + 1).toString()
setMessages(prev => [...prev, {
id: assistantLogId,
role: 'assistant',
content: 'Starting…',
timestamp: new Date()
}])

// Parse AI response (final_prompt if available)
let parsedText = apiResponse.response
try {
const parsedJSON = JSON.parse(parsedText)
if (parsedJSON.final_prompt) {
parsedText = parsedJSON.final_prompt
const mergeYaml = (incoming: { name: string; content: string }) => {
setYamlFiles(prev => {
const exists = prev.find(f => f.name === incoming.name)
if (exists) {
return prev.map(f => f.name === incoming.name ? { ...f, content: incoming.content } : f)
}
} catch (e) {
// Not JSON — ignore
}

// Don't strip YAML code blocks from the response text since we want to show the full response
// The YAML files are handled separately in the yaml_files array
return [...prev, { name: incoming.name, content: incoming.content, language: 'yaml' }]
})
}

const assistantMessage: Message = {
id: (Date.now() + 1).toString(),
content: parsedText,
role: 'assistant',
timestamp: new Date()
// Start log streaming scoped to this run
const closeAgentsLogs = apiService.streamLogs('agents', false, data => {
if (data.type === 'log' && data.line) {
const line = data.line as string
setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m))
}
})
const closeWorkflowLogs = apiService.streamLogs('workflow', false, data => {
if (data.type === 'log' && data.line) {
const line = data.line as string
setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m))
}
})

setMessages(prev => [...prev, assistantMessage])

// Update YAML files from API response, ensuring both files are updated
const updatedYamlFiles = yamlFiles.map(file => {
const apiFile = apiResponse.yaml_files.find(apiFile => apiFile.name === file.name)
if (apiFile) {
return {
...file,
content: apiFile.content
try {
await apiService.streamCompleteMessage(content, {
onEvent: async (event: StreamEvent) => {
if (event.type === 'status') {
setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n… ${event.message}` : `… ${event.message}` } : m))
}
if (event.type === 'chat_id') {
if (event.chat_id !== currentChatId) {
setCurrentChatId(event.chat_id)
await loadChatHistory()
}
}
if (event.type === 'agents_yaml' || event.type === 'workflow_yaml') {
mergeYaml(event.file)
}
if (event.type === 'ai_output') {
const line = event.line
setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m))
}
if (event.type === 'final') {
// Add a new assistant message with final content
let parsedText = event.response
try {
const parsedJSON = JSON.parse(parsedText as string)
if ((parsedJSON as any).final_prompt) parsedText = (parsedJSON as any).final_prompt
} catch {}
setMessages(prev => [...prev, { id: String(Date.now() + 2), role: 'assistant', content: parsedText, timestamp: new Date() }])
// Ensure YAML files reflect final payload
event.yaml_files.forEach(file => mergeYaml(file))
if (event.chat_id !== currentChatId) {
setCurrentChatId(event.chat_id)
await loadChatHistory()
}
}
if (event.type === 'error') {
setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: `${m.content}\nError: ${event.message}` } : m))
}
},
onError: (err) => {
console.error('Streaming error:', err)
},
onComplete: () => {
setIsLoading(false)
// Close scoped log streams
closeAgentsLogs()
closeWorkflowLogs()
}
return file
})

apiResponse.yaml_files.forEach(apiFile => {
const existingFile = updatedYamlFiles.find(file => file.name === apiFile.name)
if (!existingFile) {
updatedYamlFiles.push({
name: apiFile.name,
content: apiFile.content,
language: 'yaml' as const
})
}
})

setYamlFiles(updatedYamlFiles)

// Update current chat ID if this is a new session
if (apiResponse.chat_id !== currentChatId) {
setCurrentChatId(apiResponse.chat_id)
await loadChatHistory()
}

}, currentChatId || undefined)
} catch (error) {
console.error('Error sending message:', error)

// Add error message
const errorMessage: Message = {
id: (Date.now() + 1).toString(),
role: 'assistant',
content: 'Sorry, I encountered an error while processing your request. Please try again.',
timestamp: new Date()
}
setMessages(prev => [...prev, errorMessage])
} finally {
console.error('Error streaming message:', error)
setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: `${m.content}\nSorry, I encountered an error while processing your request. Please try again.` } : m))
setIsLoading(false)
closeAgentsLogs()
closeWorkflowLogs()
}
}

Expand Down
Loading