Skip to content
This repository was archived by the owner on Apr 17, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 184 additions & 18 deletions api/main.py
Comment thread
george-lhj marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from datetime import datetime
from api.ai_agent import MaestroBuilderAgent
from api.database import Database
from api.supervisor import SupervisorAgent, Intent
import uuid
import subprocess
import tempfile
Expand All @@ -21,6 +22,7 @@
import asyncio
from pathlib import Path
import httpx
from concurrent.futures import ThreadPoolExecutor

# Initialize FastAPI app
app = FastAPI(
Expand Down Expand Up @@ -104,6 +106,27 @@ class ValidateYamlResponse(BaseModel):
message: str
errors: List[str] = []

# Status tracking for frontend updates
status_updates = {}
last_sent_index = {}
request_results = {}

def create_status_logger(chat_id: str):
"""Create a logger function that tracks status updates for the frontend."""
def log_status(message: str, level: str = "info"):
if chat_id not in status_updates:
status_updates[chat_id] = []
update = {
"message": message,
"level": level,
"timestamp": datetime.now().isoformat()
}
status_updates[chat_id].append(update)
return log_status

supervisor_agent = SupervisorAgent()
executor = ThreadPoolExecutor(max_workers=4)

# ---------------------------------------
# Service Functions
# ---------------------------------------
Expand Down Expand Up @@ -302,7 +325,7 @@ def to_line(obj: Dict[str, Any]) -> bytes:
await asyncio.sleep(0)
yield to_line({"type": "status", "message": "(Planning agents)"})
await asyncio.sleep(0)
yield to_line({"type": "status", "message": "Generating agents.yaml..."})
yield to_line({"type": "status", "message": "Generating agents.yaml"})
await asyncio.sleep(0)

agents_output, agents_yaml = await generate_agents_yaml(message.content)
Expand Down Expand Up @@ -346,7 +369,7 @@ def to_line(obj: Dict[str, Any]) -> bytes:

yield to_line({"type": "status", "message": "(Building workflow prompt)"})
await asyncio.sleep(0)
yield to_line({"type": "status", "message": "Generating workflow.yaml..."})
yield to_line({"type": "status", "message": "Generating workflow.yaml"})
await asyncio.sleep(0)

async with httpx.AsyncClient(timeout=180) as client:
Expand Down Expand Up @@ -553,23 +576,21 @@ async def delete_chat_session(chat_id: str):

@app.post("/api/edit_yaml", response_model=EditYamlResponse)
async def edit_yaml(request: EditYamlRequest):
"""Edit YAML content based on user instruction."""
if not request.yaml or not request.yaml.strip():
raise HTTPException(status_code=400, detail="YAML content cannot be empty")
if not request.instruction or not request.instruction.strip():
raise HTTPException(status_code=400, detail="Instruction cannot be empty")
if not request.file_type or request.file_type not in ["agents", "workflow"]:
raise HTTPException(status_code=400, detail="File type must be 'agents' or 'workflow'")

try:
# Build the prompt for the editing agent
prompt = f"Current YAML file (type: {request.file_type}):\n{request.yaml}\n\nUser instruction: {request.instruction}\n\nPlease apply the requested edit and return only the updated YAML file."
resp = requests.post(
"http://localhost:8002/chat",
json={"prompt": prompt},
file_name = f"{request.file_type}.yaml"
edited_yaml = supervisor_agent.edit_yaml(
yaml_content=request.yaml,
file_to_edit=file_name,
instruction=request.instruction
)
if resp.status_code != 200:
raise Exception(resp.text)
edited_yaml = resp.json().get("response", "")
# Remove markdown formatting if present
if "```yaml" in edited_yaml:
edited_yaml = (
edited_yaml.split("```yaml", 1)[-1].split("```", 1)[0].strip()
)
elif "```" in edited_yaml:
edited_yaml = edited_yaml.split("```", 1)[-1].split("```", 1)[0].strip()
return {"edited_yaml": edited_yaml}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Editing Agent failed: {e}")
Expand All @@ -578,7 +599,6 @@ async def edit_yaml(request: EditYamlRequest):
@app.post("/api/validate_yaml", response_model=ValidateYamlResponse)
async def validate_yaml(request: ValidateYamlRequest):
try:
# fix double-escaped characters
import codecs
unescaped_content = codecs.decode(request.yaml_content, 'unicode_escape')
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_file:
Expand Down Expand Up @@ -648,6 +668,152 @@ def strip_ansi_codes(text):
)


class SupervisorRequest(BaseModel):
content: str
chat_id: Optional[str] = None

class SupervisorResponse(BaseModel):
intent: str
confidence: float
reasoning: str
response: str
yaml_files: List[Dict[str, str]]
chat_id: str

class AsyncSupervisorResponse(BaseModel):
request_id: str
status: str
message: str
chat_id: str

def store_request_result(request_id: str, result):
"""Store the result of a background request."""
if isinstance(result, dict) and "error" not in result:
supervisor_result = SupervisorResponse(
intent=result["intent"],
confidence=result["confidence"],
reasoning=result["reasoning"],
response=result["response"],
yaml_files=result["yaml_files"],
chat_id=result["chat_id"],
)
request_results[request_id] = supervisor_result
else:
request_results[request_id] = result

@app.post("/api/supervisor", response_model=SupervisorResponse)
async def supervisor_route(request: SupervisorRequest):
"""
Synchronous supervisor endpoint that processes requests directly.
For asynchronous processing with real-time updates, use /api/supervisor-async.
"""
if not request.content or not request.content.strip():
raise HTTPException(status_code=400, detail="Request content cannot be empty")

chat_id = request.chat_id or str(uuid.uuid4())

try:
result_container = {}

def sync_result_callback(request_id: str, result):
result_container[request_id] = result

temp_request_id = "sync_request"
supervisor_agent.process_request_in_background(
temp_request_id,
request.content,
chat_id,
create_status_logger,
sync_result_callback,
db
)

result = result_container.get(temp_request_id)

if result and isinstance(result, dict) and "error" in result:
raise HTTPException(status_code=500, detail=result["message"])

if isinstance(result, dict):
return SupervisorResponse(
intent=result["intent"],
confidence=result["confidence"],
reasoning=result["reasoning"],
response=result["response"],
yaml_files=result["yaml_files"],
chat_id=result["chat_id"],
)

return result

except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Supervisor processing failed: {str(e)}")

@app.post("/api/supervisor-async", response_model=AsyncSupervisorResponse)
async def supervisor_route_async(request: SupervisorRequest):
"""
Async version of supervisor endpoint that starts background processing
and returns immediately with a request ID for polling.
"""
request_id = str(uuid.uuid4())
chat_id = request.chat_id or str(uuid.uuid4())
# Start background processing using the supervisor agent
executor.submit(
supervisor_agent.process_request_in_background,
request_id,
request.content,
chat_id,
create_status_logger,
store_request_result,
db
)

return AsyncSupervisorResponse(
request_id=request_id,
status="processing",
message="Request started, use the request_id to poll for status and results",
chat_id=chat_id
)


@app.get("/api/supervisor-result/{request_id}")
async def get_supervisor_result(request_id: str):
"""Get the result of an async supervisor request."""
if request_id in request_results:
result = request_results[request_id]
del request_results[request_id]
return result
else:
return {"status": "processing", "message": "Request still in progress"}


@app.get("/api/status/{chat_id}")
async def get_status_updates(chat_id: str):
"""Get status updates for a specific chat ID."""
if chat_id in status_updates:
all_updates = status_updates[chat_id]
last_index = last_sent_index.get(chat_id, 0)
new_updates = all_updates[last_index:]

if new_updates:
last_sent_index[chat_id] = len(all_updates)
return {"updates": new_updates}
else:
return {"updates": []}
return {"updates": []}


@app.delete("/api/status/{chat_id}")
async def clear_status_updates(chat_id: str):
"""Clear status updates for a specific chat ID."""
if chat_id in status_updates:
del status_updates[chat_id]
if chat_id in last_sent_index:
del last_sent_index[chat_id]
return {"message": "Status updates cleared"}


@app.get("/api/health")
async def health_check():
try:
Expand Down
Loading