From beaff81a5cab4504948caaa457b5e8df21413fb7 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Wed, 13 Aug 2025 21:43:19 -0700 Subject: [PATCH 1/3] add streaming support --- api/main.py | 207 ++++++++++++++++++++++++++++++++++++++++++++ src/App.tsx | 142 ++++++++++++++++-------------- src/services/api.ts | 104 ++++++++++++++++++++++ 3 files changed, 390 insertions(+), 63 deletions(-) diff --git a/api/main.py b/api/main.py index a3482b3..53c42f0 100644 --- a/api/main.py +++ b/api/main.py @@ -4,6 +4,7 @@ """ from fastapi import FastAPI, HTTPException +from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import List, Dict, Any, Optional @@ -17,6 +18,10 @@ import os import yaml import re +import json +import asyncio +from pathlib import Path +import httpx # Initialize FastAPI app app = FastAPI( @@ -264,6 +269,208 @@ async def chat_builder_complete(message: ChatMessage): raise HTTPException(status_code=500, detail=f"Complete Builder failed: {e}") +@app.post("/api/chat_builder_complete_stream") +async def chat_builder_complete_stream(message: ChatMessage): + """ + Streaming variant of chat_builder_complete that emits newline-delimited JSON (NDJSON) + events as progress updates and intermediate YAMLs are ready. + + Event types emitted: + - chat_id: { chat_id } + - status: { message } + - agents_yaml: { name: "agents.yaml", content } + - workflow_yaml: { name: "workflow.yaml", content } + - final: { response, yaml_files: [...], chat_id } + - error: { message } + - done: {} + """ + + async def event_generator(): + def to_line(obj: Dict[str, Any]) -> bytes: + return (json.dumps(obj, ensure_ascii=False) + "\n").encode("utf-8") + + chat_id = str(uuid.uuid4()) + try: + # Emit chat_id early so UI can attach updates + yield to_line({"type": "chat_id", "chat_id": chat_id}) + yield to_line({"type": "status", "message": "(Starting generation)"}) + await asyncio.sleep(0) + yield to_line({"type": "status", "message": "(Reading user request)"}) + await asyncio.sleep(0) + yield to_line({"type": "status", "message": "(Planning agents)"}) + await asyncio.sleep(0) + yield to_line({"type": "status", "message": "Generating agents.yaml..."}) + await asyncio.sleep(0) + + async with httpx.AsyncClient(timeout=120) as client: + agents_resp = await client.post( + "http://localhost:8003/chat", + json={"prompt": message.content, "agent": "TaskInterpreter"}, + ) + if agents_resp.status_code != 200: + raise Exception(f"Agents generation failed: {agents_resp.text}") + + agents_output = agents_resp.json().get("response", "") + agents_yaml = "" + # Emit raw agent output as AI output lines for UI visibility + for line in agents_output.splitlines(): + if line.strip(): + yield to_line({"type": "ai_output", "source": "agents", "line": line}) + await asyncio.sleep(0) + if "```yaml" in agents_output: + agents_yaml = ( + agents_output.split("```yaml", 1)[-1].split("```", 1)[0].strip() + ) + elif "```" in agents_output: + agents_yaml = agents_output.split("```", 1)[-1].split("```", 1)[0].strip() + else: + yaml_match = re.search(r"apiVersion:.*?(?=\n\n|\Z)", agents_output, re.DOTALL) + if yaml_match: + agents_yaml = yaml_match.group(0).strip() + + # Emit agents YAML as soon as it's ready + yield to_line({ + "type": "agents_yaml", + "file": {"name": "agents.yaml", "content": agents_yaml}, + "chat_id": chat_id, + }) + await asyncio.sleep(0) + yield to_line({"type": "status", "message": "(Parsing agents output)"}) + await asyncio.sleep(0) + + # Build workflow prompt based on parsed agents + agents_info: List[Dict[str, str]] = [] + try: + agent_blocks = agents_yaml.split('---') + for block in agent_blocks: + if block.strip(): + agent_data = yaml.safe_load(block) + if agent_data and 'metadata' in agent_data and 'name' in agent_data['metadata']: + name = agent_data['metadata']['name'] + description = agent_data.get('spec', {}).get('description', '') + agents_info.append({'name': name, 'description': description}) + except yaml.YAMLError: + name_matches = re.findall(r'name:\s*(\w+)', agents_yaml) + desc_matches = re.findall(r'description:\s*\|\s*\n\s*(.+?)(?=\n\s*\w+:|$)', agents_yaml, re.DOTALL) + for i, name in enumerate(name_matches): + description = desc_matches[i] if i < len(desc_matches) else "" + agents_info.append({'name': name, 'description': description.strip()}) + + workflow_prompt = f"Create a workflow that uses the following agents:\n\n" + for i, agent in enumerate(agents_info, 1): + workflow_prompt += f"agent{i}: {agent['name']} – {agent['description']}\n" + workflow_prompt += f"\nprompt: {message.content}" + + yield to_line({"type": "status", "message": "(Building workflow prompt)"}) + await asyncio.sleep(0) + yield to_line({"type": "status", "message": "Generating workflow.yaml..."}) + await asyncio.sleep(0) + + async with httpx.AsyncClient(timeout=180) as client: + workflow_resp = await client.post( + "http://localhost:8004/chat", + json={"prompt": workflow_prompt, "agent": "WorkflowYAMLBuilder"}, + ) + if workflow_resp.status_code != 200: + raise Exception(f"Workflow generation failed: {workflow_resp.text}") + + workflow_output = workflow_resp.json().get("response", "") + workflow_yaml = "" + # Emit raw workflow output as AI output lines for UI visibility + for line in workflow_output.splitlines(): + if line.strip(): + yield to_line({"type": "ai_output", "source": "workflow", "line": line}) + await asyncio.sleep(0) + if "```yaml" in workflow_output: + workflow_yaml = ( + workflow_output.split("```yaml", 1)[-1].split("```", 1)[0].strip() + ) + elif "```" in workflow_output: + workflow_yaml = workflow_output.split("```", 1)[-1].split("```", 1)[0].strip() + else: + yaml_match = re.search(r"apiVersion:.*?(?=\n\n|\Z)", workflow_output, re.DOTALL) + if yaml_match: + workflow_yaml = yaml_match.group(0).strip() + + # Emit workflow YAML + yield to_line({ + "type": "workflow_yaml", + "file": {"name": "workflow.yaml", "content": workflow_yaml}, + "chat_id": chat_id, + }) + await asyncio.sleep(0) + yield to_line({"type": "status", "message": "(Parsing workflow output)"}) + await asyncio.sleep(0) + yield to_line({"type": "status", "message": "(Finalizing response)"}) + await asyncio.sleep(0) + + clean_response = f"""✅ Successfully generated both agents.yaml and workflow.yaml from your prompt! + +Your request: "{message.content}" + +I've created: +• **agents.yaml** - Contains the agent definitions +• **workflow.yaml** - Contains the workflow that uses those agents + +Both files are now available in the YAML panel on the right. You can switch between tabs to view each file.""" + + final_payload = { + "type": "final", + "response": clean_response, + "yaml_files": [ + {"name": "agents.yaml", "content": agents_yaml}, + {"name": "workflow.yaml", "content": workflow_yaml}, + ], + "chat_id": chat_id, + } + yield to_line(final_payload) + yield to_line({"type": "done"}) + except Exception as e: + yield to_line({"type": "error", "message": f"{e}"}) + yield to_line({"type": "done"}) + + return StreamingResponse(event_generator(), media_type="application/x-ndjson") + + +@app.get("/api/stream_logs") +async def stream_logs(source: str = "agents", from_start: bool = False): + """ + Stream Maestro log files as Server-Sent Events (SSE). + Query params: + - source: 'agents' | 'workflow' (defaults to 'agents') + - from_start: if True, stream from beginning of file, otherwise tail new lines + """ + logs_dir = Path(__file__).resolve().parent.parent / "logs" + file_map = { + "agents": logs_dir / "maestro_agents.log", + "workflow": logs_dir / "maestro_workflow.log", + } + log_path = file_map.get(source, file_map["agents"]) # default to agents + + async def sse_generator(): + try: + # Ensure file exists + if not log_path.exists(): + yield f"data: {json.dumps({'type': 'error', 'message': f'Log file not found: {log_path.name}'})}\n\n" + return + + with open(log_path, "r", encoding="utf-8", errors="ignore") as f: + if not from_start: + f.seek(0, os.SEEK_END) + + while True: + line = f.readline() + if line: + payload = {"type": "log", "source": source, "line": line.rstrip("\n")} + yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" + else: + await asyncio.sleep(0.25) + except Exception as e: + yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" + + return StreamingResponse(sse_generator(), media_type="text/event-stream") + + @app.get("/api/get_yamls/{chat_id}", response_model=List[YamlFile]) async def get_yamls(chat_id: str): try: diff --git a/src/App.tsx b/src/App.tsx index 0d12996..68f814a 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -3,7 +3,7 @@ import { Sidebar } from './components/Sidebar' import { ChatCanvas } from './components/ChatCanvas' import { YamlPanel } from './components/YamlPanel' import { ChatInput } from './components/ChatInput' -import { apiService, type ChatSession, type ChatHistory } from './services/api' +import { apiService, type ChatHistory, type StreamEvent } from './services/api' import axios from 'axios' export interface Message { @@ -186,77 +186,93 @@ function App() { setMessages(prev => [...prev, userMessage]) setIsLoading(true) - try { - // Use the complete endpoint to generate both agents and workflow - const apiResponse = await apiService.sendCompleteMessage(content, currentChatId || undefined); + // Create a dedicated live log/status message + const assistantLogId = (Date.now() + 1).toString() + setMessages(prev => [...prev, { + id: assistantLogId, + role: 'assistant', + content: 'Starting…', + timestamp: new Date() + }]) - // Parse AI response (final_prompt if available) - let parsedText = apiResponse.response - try { - const parsedJSON = JSON.parse(parsedText) - if (parsedJSON.final_prompt) { - parsedText = parsedJSON.final_prompt + const mergeYaml = (incoming: { name: string; content: string }) => { + setYamlFiles(prev => { + const exists = prev.find(f => f.name === incoming.name) + if (exists) { + return prev.map(f => f.name === incoming.name ? { ...f, content: incoming.content } : f) } - } catch (e) { - // Not JSON — ignore - } - - // Don't strip YAML code blocks from the response text since we want to show the full response - // The YAML files are handled separately in the yaml_files array + return [...prev, { name: incoming.name, content: incoming.content, language: 'yaml' }] + }) + } - const assistantMessage: Message = { - id: (Date.now() + 1).toString(), - content: parsedText, - role: 'assistant', - timestamp: new Date() + // Start log streaming scoped to this run + const closeAgentsLogs = apiService.streamLogs('agents', false, data => { + if (data.type === 'log' && data.line) { + const line = data.line as string + setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m)) } + }) + const closeWorkflowLogs = apiService.streamLogs('workflow', false, data => { + if (data.type === 'log' && data.line) { + const line = data.line as string + setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m)) + } + }) - setMessages(prev => [...prev, assistantMessage]) - - // Update YAML files from API response, ensuring both files are updated - const updatedYamlFiles = yamlFiles.map(file => { - const apiFile = apiResponse.yaml_files.find(apiFile => apiFile.name === file.name) - if (apiFile) { - return { - ...file, - content: apiFile.content + try { + await apiService.streamCompleteMessage(content, { + onEvent: async (event: StreamEvent) => { + if (event.type === 'status') { + setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n… ${event.message}` : `… ${event.message}` } : m)) } + if (event.type === 'chat_id') { + if (event.chat_id !== currentChatId) { + setCurrentChatId(event.chat_id) + await loadChatHistory() + } + } + if (event.type === 'agents_yaml' || event.type === 'workflow_yaml') { + mergeYaml(event.file) + } + if (event.type === 'ai_output') { + const line = event.line + setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: m.content ? `${m.content}\n${line}` : line } : m)) + } + if (event.type === 'final') { + // Add a new assistant message with final content + let parsedText = event.response + try { + const parsedJSON = JSON.parse(parsedText as string) + if ((parsedJSON as any).final_prompt) parsedText = (parsedJSON as any).final_prompt + } catch {} + setMessages(prev => [...prev, { id: String(Date.now() + 2), role: 'assistant', content: parsedText, timestamp: new Date() }]) + // Ensure YAML files reflect final payload + event.yaml_files.forEach(file => mergeYaml(file)) + if (event.chat_id !== currentChatId) { + setCurrentChatId(event.chat_id) + await loadChatHistory() + } + } + if (event.type === 'error') { + setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: `${m.content}\nError: ${event.message}` } : m)) + } + }, + onError: (err) => { + console.error('Streaming error:', err) + }, + onComplete: () => { + setIsLoading(false) + // Close scoped log streams + closeAgentsLogs() + closeWorkflowLogs() } - return file - }) - - apiResponse.yaml_files.forEach(apiFile => { - const existingFile = updatedYamlFiles.find(file => file.name === apiFile.name) - if (!existingFile) { - updatedYamlFiles.push({ - name: apiFile.name, - content: apiFile.content, - language: 'yaml' as const - }) - } - }) - - setYamlFiles(updatedYamlFiles) - - // Update current chat ID if this is a new session - if (apiResponse.chat_id !== currentChatId) { - setCurrentChatId(apiResponse.chat_id) - await loadChatHistory() - } - + }, currentChatId || undefined) } catch (error) { - console.error('Error sending message:', error) - - // Add error message - const errorMessage: Message = { - id: (Date.now() + 1).toString(), - role: 'assistant', - content: 'Sorry, I encountered an error while processing your request. Please try again.', - timestamp: new Date() - } - setMessages(prev => [...prev, errorMessage]) - } finally { + console.error('Error streaming message:', error) + setMessages(prev => prev.map(m => m.id === assistantLogId ? { ...m, content: `${m.content}\nSorry, I encountered an error while processing your request. Please try again.` } : m)) setIsLoading(false) + closeAgentsLogs() + closeWorkflowLogs() } } diff --git a/src/services/api.ts b/src/services/api.ts index 7207abb..ba63505 100644 --- a/src/services/api.ts +++ b/src/services/api.ts @@ -14,6 +14,22 @@ export interface ChatResponse { chat_id: string } +export type StreamEvent = + | { type: 'chat_id'; chat_id: string } + | { type: 'status'; message: string } + | { type: 'agents_yaml'; file: { name: string; content: string }; chat_id?: string } + | { type: 'workflow_yaml'; file: { name: string; content: string }; chat_id?: string } + | { type: 'final'; response: string; yaml_files: Array<{ name: string; content: string }>; chat_id: string } + | { type: 'error'; message: string } + | { type: 'done' } + | { type: 'ai_output'; source: 'agents' | 'workflow'; line: string } + +export interface StreamHandlers { + onEvent?: (event: StreamEvent) => void + onError?: (error: Error) => void + onComplete?: () => void +} + export interface YamlFile { name: string content: string @@ -176,6 +192,76 @@ class ApiService { } } + async streamCompleteMessage(message: string, handlers: StreamHandlers = {}, chatId?: string): Promise { + try { + const response = await fetch(`${API_BASE_URL}/api/chat_builder_complete_stream`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + content: message, + role: 'user', + chat_id: chatId || this.currentChatId + } as ChatMessage & { chat_id?: string }) + }) + + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`) + } + + if (!response.body) { + throw new Error('No response body for streaming request') + } + + const reader = response.body.getReader() + const decoder = new TextDecoder() + let buffer = '' + + while (true) { + const { value, done } = await reader.read() + if (done) break + buffer += decoder.decode(value, { stream: true }) + + let lineBreakIndex: number + while ((lineBreakIndex = buffer.indexOf('\n')) >= 0) { + const line = buffer.slice(0, lineBreakIndex).trim() + buffer = buffer.slice(lineBreakIndex + 1) + if (!line) continue + try { + const event = JSON.parse(line) as StreamEvent + if (event.type === 'final') { + this.currentChatId = event.chat_id + } else if (event.type === 'chat_id') { + this.currentChatId = event.chat_id + } + handlers.onEvent?.(event) + } catch (e) { + console.error('Failed to parse stream line:', line, e) + } + } + } + const last = buffer.trim() + if (last) { + try { + const event = JSON.parse(last) as StreamEvent + handlers.onEvent?.(event) + } catch (e) { + } + } + + handlers.onComplete?.() + } catch (error) { + console.error('Error streaming complete message:', error) + handlers.onError?.(error as Error) + try { + const fallback = await this.sendCompleteMessage(message, chatId) + handlers.onEvent?.({ type: 'final', response: fallback.response, yaml_files: fallback.yaml_files, chat_id: fallback.chat_id }) + handlers.onEvent?.({ type: 'done' }) + handlers.onComplete?.() + } catch (_) { + } + } + } + async getYamlFiles(chatId: string): Promise { try { const response = await fetch(`${API_BASE_URL}/api/get_yamls/${chatId}`) @@ -344,6 +430,24 @@ class ApiService { this.currentChatId = chatId } + streamLogs(source: 'agents' | 'workflow' = 'agents', fromStart = false, onEvent: (data: { type: string; source?: string; line?: string; message?: string }) => void): () => void { + const url = `${API_BASE_URL}/api/stream_logs?source=${encodeURIComponent(source)}&from_start=${fromStart ? 'true' : 'false'}` + const es = new EventSource(url) + es.onmessage = (e) => { + try { + const data = JSON.parse(e.data) + onEvent(data) + } catch (err) { + console.error('Failed to parse SSE message', err) + } + } + es.onerror = (e) => { + console.error('SSE error', e) + es.close() + } + return () => es.close() + } + private formatYamlContent(content: string): string { // Remove any markdown code blocks if present let formatted = content.replace(/```yaml\n?|\n?```/g, '') From 403f8a04b84b3bea3de0c9d5ff7326581a6809e4 Mon Sep 17 00:00:00 2001 From: george-lhj Date: Fri, 15 Aug 2025 11:19:43 -0700 Subject: [PATCH 2/3] add toggle button --- src/components/ChatInput.tsx | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/src/components/ChatInput.tsx b/src/components/ChatInput.tsx index 6598ef7..67c991f 100644 --- a/src/components/ChatInput.tsx +++ b/src/components/ChatInput.tsx @@ -4,12 +4,14 @@ import { Send, Paperclip, Mic, ChevronDown, Lightbulb } from 'lucide-react' import { cn } from '../lib/utils' interface ChatInputProps { - onSendMessage: (message: string) => void + onSendMessage: (message: string, useStreaming: boolean) => void onEditYaml?: (instruction: string) => void disabled?: boolean + streamingEnabled?: boolean + onToggleStreaming?: (enabled: boolean) => void } -export function ChatInput({ onSendMessage, onEditYaml, disabled = false }: ChatInputProps) { +export function ChatInput({ onSendMessage, onEditYaml, disabled = false, streamingEnabled = true, onToggleStreaming }: ChatInputProps) { const [message, setMessage] = useState('') const [isTyping, setIsTyping] = useState(false) const [showSuggestions, setShowSuggestions] = useState(false) @@ -38,7 +40,7 @@ export function ChatInput({ onSendMessage, onEditYaml, disabled = false }: ChatI const handleSend = () => { if (message.trim() && !disabled) { - onSendMessage(message.trim()) + onSendMessage(message.trim(), streamingEnabled) setMessage('') setIsTyping(false) } @@ -53,7 +55,7 @@ export function ChatInput({ onSendMessage, onEditYaml, disabled = false }: ChatI const handleSuggestionClick = (suggestion: string) => { if (!disabled) { - onSendMessage(suggestion) + onSendMessage(suggestion, streamingEnabled) setShowSuggestions(false) } } @@ -120,6 +122,30 @@ export function ChatInput({ onSendMessage, onEditYaml, disabled = false }: ChatI )} + {/* Streaming Toggle */} + {onToggleStreaming && ( + + )} + {/* Text Input */}