diff --git a/readme.md b/readme.md index f228293..acde20a 100644 --- a/readme.md +++ b/readme.md @@ -12,6 +12,8 @@ TMLL provides users with pre-built, automated solutions integrating general Trac ## Table of Contents - [Installation](#installation) - [Quick Start](#quick-start) +- [MCP Server](#mcp-server) +- [CLI Usage](#cli-usage) - [Features and Modules](#features-and-modules) - [Prerequisites](#prerequisites) - [Documentation](#documentation) @@ -99,6 +101,92 @@ anomalies = ad.find_anomalies(method='iforest') ad.plot_anomalies(anomalies) ``` +## MCP Server + +TMLL provides an MCP (Model Context Protocol) server that exposes trace analysis capabilities to AI assistants and other MCP clients. + +### Setup + +1. Install TMLL (see [Installation](#installation)) + +2. Start your Trace Server: +```bash +./tracecompass-server -vmargs -Dtraceserver.port=8080 +``` + +3. Configure in your MCP client (e.g., `~/.config/kiro-cli/mcp.json`). Point `command` at the Python interpreter of the environment where TMLL is installed, and set `PYTHONPATH` so the `tmll` package is importable: +```json +{ + "mcpServers": { + "tmll": { + "type": "stdio", + "command": "/path/to/tmll/venv/bin/python", + "args": ["-m", "tmll.mcp.server"], + "env": { + "PYTHONPATH": "/path/to/tmll" + } + } + } +} +``` + +### Available Tools + +- `ensure_server`: Check if the Trace Compass server is running; downloads, installs, and starts it automatically if not found +- `create_experiment`: Create trace experiments from files +- `list_outputs`: List available outputs for an experiment +- `fetch_data`: Fetch data from experiment outputs +- `detect_anomalies`: Run anomaly detection analysis +- `detect_memory_leak`: Detect memory leaks +- `detect_changepoints`: Detect performance trend changes +- `analyze_correlation`: Perform root cause correlation analysis +- `detect_idle_resources`: Identify underutilized resources +- `plan_capacity`: Run capacity planning predictions + +## CLI Usage + +TMLL includes a 
command-line interface for running analyses without writing code. + +### Basic Commands + +```bash +# Create an experiment +tmll_cli.py create --traces /path/to/trace1 /path/to/trace2 --name "My Experiment" + +# List outputs for an experiment +tmll_cli.py list-outputs --experiment <experiment-id> + +# Fetch data from outputs +tmll_cli.py fetch-data --experiment <experiment-id> --keywords "cpu usage" + +# Run anomaly detection +tmll_cli.py detect-anomalies --experiment <experiment-id> --keywords "cpu usage" --method iforest + +# Detect memory leaks +tmll_cli.py detect-memory-leak --experiment <experiment-id> + +# Detect change points +tmll_cli.py detect-changepoints --experiment <experiment-id> --method pelt + +# Analyze correlations +tmll_cli.py analyze-correlation --experiment <experiment-id> --method pearson + +# Detect idle resources +tmll_cli.py detect-idle --experiment <experiment-id> --threshold 5 + +# Run capacity planning +tmll_cli.py plan-capacity --experiment <experiment-id> --horizon 30 + +# Perform clustering +tmll_cli.py cluster --experiment <experiment-id> --method kmeans --n-clusters 3 +``` + +### Options + +- `--host`: Trace Server host (default: localhost) +- `--port`: Trace Server port (default: 8080) +- `--verbose`: Enable verbose output + ## Prerequisites - Python 3.8 or higher diff --git a/tmll/mcp/cli.py b/tmll/mcp/cli.py index 9fe0fca..a98ff51 100644 --- a/tmll/mcp/cli.py +++ b/tmll/mcp/cli.py @@ -68,22 +68,22 @@ def fetch_data_cmd(args): data = client.fetch_data(experiment, outputs_with_tree) if args.output: - for key, value in data.items(): - if isinstance(value, pd.DataFrame): - value.to_csv(f"{args.output}_{key}.csv", index=False) - elif isinstance(value, dict): - for sub_key, df in value.items(): - if isinstance(df, pd.DataFrame): - df.to_csv(f"{args.output}_{key}_{sub_key}.csv", index=False) + for output_id, content in data.items(): + if isinstance(content, dict): + for series_name, df in content.items(): + safe_name = series_name.replace("->", "_").replace(" ", "_") + df.to_csv(f"{args.output}_{output_id}_{safe_name}.csv", index=False) + elif isinstance(content, pd.DataFrame): 
+ content.to_csv(f"{args.output}_{output_id}.csv", index=False) print(f"Data exported to {args.output}_*.csv") else: - result = {} + serializable_data = {} for k, v in data.items(): - if isinstance(v, pd.DataFrame): - result[k] = v.to_dict() - elif isinstance(v, dict): - result[k] = {sk: sv.to_dict() for sk, sv in v.items() if isinstance(sv, pd.DataFrame)} - print(json.dumps(result, indent=2, default=str)) + if isinstance(v, dict): + serializable_data[k] = {sk: sv.to_dict() for sk, sv in v.items() if isinstance(sv, pd.DataFrame)} + elif isinstance(v, pd.DataFrame): + serializable_data[k] = v.to_dict() + print(json.dumps(serializable_data, indent=2, default=str)) def detect_anomalies(args): @@ -101,14 +101,30 @@ def detect_anomalies(args): print("No outputs found matching criteria") return - ad = AnomalyDetection(client, experiment, outputs) + ad = AnomalyDetection(client, experiment, outputs, resample_freq=args.resample_freq, min_size=args.min_size) result = ad.find_anomalies(method=args.method) if args.plot: ad.plot_anomalies(result) else: - total = sum(len(df) for df in result.anomalies.values()) + total = sum(len(df[df.filter(regex="_is_anomaly$").any(axis=1)]) for df in result.anomalies.values()) print(f"Found {total} anomalies across {len(result.anomalies)} outputs") + + if total > 0: + print("\nTop 3 Anomalies:") + all_anomalies = [] + for name, df in result.anomalies.items(): + is_anomaly = df.filter(regex="_is_anomaly$").any(axis=1) + anomaly_df = df[is_anomaly].copy() + anomaly_df["source"] = name + all_anomalies.append(anomaly_df) + + if all_anomalies: + combined_anomalies = pd.concat(all_anomalies) + if "anomaly_score" in combined_anomalies.columns: + top_3 = combined_anomalies.sort_values("anomaly_score", ascending=False).head(3) + for i, (idx, row) in enumerate(top_3.iterrows(), 1): + print(f"{i}. 
Time: {idx}, Source: {row['source']}, Score: {row['anomaly_score']:.4f}") def detect_memory_leak(args): @@ -281,6 +297,8 @@ def main(): anomaly_parser.add_argument("-k", "--keywords", nargs="+", default=["cpu usage"], help="Output keywords") anomaly_parser.add_argument("-m", "--method", default="iforest", help="Detection method") anomaly_parser.add_argument("-p", "--plot", action="store_true", help="Plot anomalies") + anomaly_parser.add_argument("-H", "--resample-freq", default="1s", help="Resampling frequency") + anomaly_parser.add_argument("-s", "--min-size", type=int, default=10, help="Minimum data points") anomaly_parser.set_defaults(func=detect_anomalies) # memory-leak command diff --git a/tmll/mcp/server.py b/tmll/mcp/server.py index 82d57aa..3cd6471 100644 --- a/tmll/mcp/server.py +++ b/tmll/mcp/server.py @@ -1,38 +1,154 @@ #!/usr/bin/env python3 """MCP server for TMLL CLI - exposes all CLI commands as MCP tools.""" -import asyncio +import base64 +import contextlib +import functools +import io import json import subprocess import sys +import traceback as _tb +import urllib.request from pathlib import Path -from typing import Any, Optional +from typing import Optional -from mcp.server import Server -from mcp.server.stdio import stdio_server -from mcp.types import Tool, TextContent +import matplotlib +matplotlib.use("Agg") +import matplotlib.pyplot as plt +import pandas as pd -server = Server("tmll-cli-mcp-server") +from mcp.server.fastmcp import FastMCP +from mcp.types import ImageContent, TextContent + +mcp = FastMCP("tmll-cli-mcp-server") CLI_PATH = sys.argv[1] if len(sys.argv) > 1 else str(Path(__file__).resolve().parent / "cli.py") +DEFAULT_HOST = "localhost" +DEFAULT_PORT = 8080 + + +# --------------------------------------------------------------------------- +# Debug helpers +# --------------------------------------------------------------------------- + +def _log(msg: str) -> None: + """Write debug message to stderr (safe for MCP stdio transport).""" 
+ print(f"[tmll-mcp-debug] {msg}", file=sys.stderr, flush=True) + + +@contextlib.contextmanager +def _protect_stdout(): + """Temporarily redirect stdout→stderr so stray print() cannot corrupt the MCP stdio transport.""" + old = sys.stdout + sys.stdout = sys.stderr + try: + yield + finally: + sys.stdout = old + + +def _safe_tool(fn): + """Decorator applied to every tool: protects stdout, logs entry/exit/errors.""" + @functools.wraps(fn) + def wrapper(*args, **kwargs): + name = fn.__name__ + _log(f">>> TOOL CALL {name} args={args!r} kwargs={kwargs!r}") + with _protect_stdout(): + try: + result = fn(*args, **kwargs) + preview = repr(result)[:300] + _log(f"<<< TOOL OK {name} result_preview={preview}") + return result + except Exception as exc: + tb = _tb.format_exc() + _log(f"!!! TOOL ERROR {name} {type(exc).__name__}: {exc}\n{tb}") + raise + return wrapper + + +# --------------------------------------------------------------------------- +# Server health +# --------------------------------------------------------------------------- + +def _server_is_running(host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> bool: + """Check if the trace server is reachable.""" + url = f"http://{host}:{port}/tsp/api/health" + try: + urllib.request.urlopen(url, timeout=3) + return True + except Exception as exc: + _log(f"Server health check failed ({url}): {exc}") + return False + + +@mcp.tool() +def ensure_server(host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> str: + """Ensure the Trace Compass server is running. Downloads and installs it if not found, then starts it.""" + if _server_is_running(host, port): + return f"Trace server already running at {host}:{port}" + + from tmll.services.tsp_installer import TSPInstaller + installer = TSPInstaller() + installer.install() + + import time + for _ in range(15): + time.sleep(2) + if _server_is_running(host, port): + return f"Trace server started at {host}:{port}" + + return "Trace server was launched but is not yet responding. 
It may need more time to start." + + +# --------------------------------------------------------------------------- +# CLI runner +# --------------------------------------------------------------------------- def run_cli(*args: str) -> str: """Run a tmll_cli.py command and return output.""" - result = subprocess.run( - [sys.executable, CLI_PATH, "--log-stderr", *args], - capture_output=True, text=True, timeout=120 - ) + cmd = [sys.executable, CLI_PATH, "--log-stderr", *args] + _log(f"run_cli: executing {' '.join(cmd)}") + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) + except subprocess.TimeoutExpired as exc: + msg = ( + f"CLI timed out after 120s\n" + f" command: {' '.join(cmd)}\n" + f" partial stdout: {exc.stdout!r}\n" + f" partial stderr: {exc.stderr!r}" + ) + _log(f"run_cli TIMEOUT: {msg}") + raise RuntimeError(msg) + except Exception as exc: + msg = ( + f"Failed to launch CLI: {type(exc).__name__}: {exc}\n" + f" command: {' '.join(cmd)}" + ) + _log(f"run_cli LAUNCH ERROR: {msg}") + raise RuntimeError(msg) + + _log(f"run_cli: exit_code={result.returncode} stdout_len={len(result.stdout)} stderr_len={len(result.stderr)}") + if result.stderr.strip(): + _log(f"run_cli stderr:\n{result.stderr.strip()}") + if result.returncode != 0: - raise RuntimeError(result.stderr or f"CLI exited with code {result.returncode}") + msg = ( + f"CLI exited with code {result.returncode}\n" + f" command: {' '.join(cmd)}\n" + f" stdout: {result.stdout.strip()}\n" + f" stderr: {result.stderr.strip()}" + ) + _log(f"run_cli FAILED: {msg}") + raise RuntimeError(msg) return result.stdout.strip() -def build_args(arguments: dict, flag_map: dict[str, str]) -> list[str]: - """Convert tool arguments to CLI flags.""" +def build_args(flag_map: dict[str, tuple[str, any]]) -> list[str]: + """Convert (flag, value) pairs to CLI flags.""" args = [] - for key, flag in flag_map.items(): - val = arguments.get(key) + for flag, val in flag_map.values(): if val is None: 
continue if isinstance(val, bool): @@ -45,204 +161,205 @@ def build_args(arguments: dict, flag_map: dict[str, str]) -> list[str]: return args -@server.list_tools() -async def list_tools() -> list[Tool]: - return [ - Tool( - name="create_experiment", - description="Create a trace experiment from LTTng trace files or directories.", - inputSchema={ - "type": "object", - "properties": { - "traces": {"type": "array", "items": {"type": "string"}, "description": "Trace file/directory paths"}, - "experiment_name": {"type": "string"}, - "host": {"type": "string", "default": "localhost"}, - "port": {"type": "integer", "default": 8080}, - }, - "required": ["traces", "experiment_name"], - }, - ), - Tool( - name="list_experiments", - description="List all open experiments", - inputSchema={"type": "object", "properties": {}}, - ), - Tool( - name="list_outputs", - description="List available outputs for an experiment", - inputSchema={ - "type": "object", - "properties": { - "experiment_id": {"type": "string"}, - "keywords": {"type": "array", "items": {"type": "string"}}, - }, - "required": ["experiment_id"], - }, - ), - Tool( - name="fetch_data", - description="Fetch data from experiment outputs", - inputSchema={ - "type": "object", - "properties": { - "experiment_id": {"type": "string"}, - "keywords": {"type": "array", "items": {"type": "string"}, "default": ["cpu usage"]}, - "output_file": {"type": "string", "description": "CSV output file prefix"}, - }, - "required": ["experiment_id"], - }, - ), - Tool( - name="delete_experiment", - description="Delete an experiment", - inputSchema={ - "type": "object", - "properties": {"experiment_id": {"type": "string"}}, - "required": ["experiment_id"], - }, - ), - Tool( - name="detect_anomalies", - description="Detect anomalies in trace data using ML methods (iforest, zscore, iqr, moving_average, seasonality, frequency_domain, combined)", - inputSchema={ - "type": "object", - "properties": { - "experiment_id": {"type": "string"}, - 
"keywords": {"type": "array", "items": {"type": "string"}, "default": ["cpu usage"]}, - "method": {"type": "string", "default": "iforest", "enum": ["iforest", "zscore", "iqr", "moving_average", "seasonality", "frequency_domain", "combined"]}, - }, - "required": ["experiment_id"], - }, - ), - Tool( - name="detect_memory_leak", - description="Detect memory leaks in trace data", - inputSchema={ - "type": "object", - "properties": { - "experiment_id": {"type": "string"}, - "keywords": {"type": "array", "items": {"type": "string"}, "default": ["memory"]}, - }, - "required": ["experiment_id"], - }, - ), - Tool( - name="detect_changepoints", - description="Detect change points in performance trends (single, zscore, voting, pca)", - inputSchema={ - "type": "object", - "properties": { - "experiment_id": {"type": "string"}, - "keywords": {"type": "array", "items": {"type": "string"}, "default": ["cpu usage"]}, - "methods": {"type": "array", "items": {"type": "string"}, "default": ["single", "zscore", "voting", "pca"], - "description": "Analysis methods (single, zscore, voting, pca)"}, - }, - "required": ["experiment_id"], - }, - ), - Tool( - name="analyze_correlation", - description="Analyze correlation between outputs for root cause analysis (pearson, kendall, spearman)", - inputSchema={ - "type": "object", - "properties": { - "experiment_id": {"type": "string"}, - "keywords": {"type": "array", "items": {"type": "string"}, "default": ["cpu", "memory"]}, - "method": {"type": "string", "default": "pearson", "enum": ["pearson", "spearman", "kendall"]}, - }, - "required": ["experiment_id"], - }, - ), - Tool( - name="detect_idle_resources", - description="Detect idle/underutilized resources", - inputSchema={ - "type": "object", - "properties": { - "experiment_id": {"type": "string"}, - "keywords": {"type": "array", "items": {"type": "string"}, "default": ["cpu usage"]}, - "cpu_idle_threshold": {"type": "number", "default": 5.0, "description": "CPU idle threshold percentage"}, - 
"memory_idle_threshold": {"type": "number", "default": 5.0, "description": "Memory idle threshold percentage"}, - "disk_idle_threshold": {"type": "number", "default": 5.0, "description": "Disk idle threshold percentage"}, - }, - "required": ["experiment_id"], - }, - ), - Tool( - name="plan_capacity", - description="Perform capacity planning with predictive models", - inputSchema={ - "type": "object", - "properties": { - "experiment_id": {"type": "string"}, - "keywords": {"type": "array", "items": {"type": "string"}, "default": ["cpu usage"]}, - "horizon": {"type": "integer", "default": 100, "description": "Number of forecast steps"}, - }, - "required": ["experiment_id"], - }, - ), - ] - - -@server.call_tool() -async def call_tool(name: str, arguments: Optional[dict] = None) -> list[TextContent]: - arguments = arguments if isinstance(arguments, dict) else {} - global_args = build_args(arguments, {"host": "--host", "port": "--port"}) - - if name == "create_experiment": - out = run_cli(*global_args, "create", *arguments["traces"], "-n", arguments["experiment_name"]) - - elif name == "list_experiments": - out = run_cli(*global_args, "list") - - elif name == "list_outputs": - args = build_args(arguments, {"keywords": "-k"}) - out = run_cli(*global_args, "list-outputs", arguments["experiment_id"], *args) - - elif name == "fetch_data": - args = build_args(arguments, {"keywords": "-k", "output_file": "-o"}) - out = run_cli(*global_args, "fetch-data", arguments["experiment_id"], *args) - - elif name == "delete_experiment": - out = run_cli(*global_args, "delete", arguments["experiment_id"]) - - elif name == "detect_anomalies": - args = build_args(arguments, {"keywords": "-k", "method": "-m"}) - out = run_cli(*global_args, "anomaly", arguments["experiment_id"], *args) - - elif name == "detect_memory_leak": - args = build_args(arguments, {"keywords": "-k"}) - out = run_cli(*global_args, "memory-leak", arguments["experiment_id"], *args) - - elif name == "detect_changepoints": - 
args = build_args(arguments, {"keywords": "-k", "methods": "-m"}) - out = run_cli(*global_args, "changepoint", arguments["experiment_id"], *args) - - elif name == "analyze_correlation": - args = build_args(arguments, {"keywords": "-k", "method": "-m"}) - out = run_cli(*global_args, "correlation", arguments["experiment_id"], *args) - - elif name == "detect_idle_resources": - args = build_args(arguments, {"keywords": "-k", - "cpu_idle_threshold": "--cpu-idle-threshold", - "memory_idle_threshold": "--memory-idle-threshold", - "disk_idle_threshold": "--disk-idle-threshold"}) - out = run_cli(*global_args, "idle-resources", arguments["experiment_id"], *args) - - elif name == "plan_capacity": - args = build_args(arguments, {"keywords": "-k", "horizon": "-H"}) - out = run_cli(*global_args, "capacity", arguments["experiment_id"], *args) - - else: - raise ValueError(f"Unknown tool: {name}") - - return [TextContent(type="text", text=out)] - - -async def main(): - async with stdio_server() as (read_stream, write_stream): - await server.run(read_stream, write_stream, server.create_initialization_options()) +def _global_args(host: Optional[str], port: Optional[int]) -> list[str]: + args = [] + if host: + args.extend(["--host", host]) + if port: + args.extend(["--port", str(port)]) + return args + + +# --------------------------------------------------------------------------- +# Tools +# --------------------------------------------------------------------------- + +@mcp.tool() +@_safe_tool +def create_experiment(traces: list[str], experiment_name: str, host: Optional[str] = None, port: Optional[int] = None) -> str: + """Create a trace experiment from LTTng trace files or directories.""" + return run_cli(*_global_args(host, port), "create", *traces, "-n", experiment_name) + + +@mcp.tool() +@_safe_tool +def list_experiments() -> str: + """List all open experiments.""" + return run_cli("list") + + +@mcp.tool() +@_safe_tool +def list_outputs(experiment_id: str, keywords: 
Optional[list[str]] = None) -> str: + """List available outputs for an experiment.""" + args = build_args({"keywords": ("-k", keywords)}) + return run_cli("list-outputs", experiment_id, *args) + + +@mcp.tool() +@_safe_tool +def fetch_data(experiment_id: str, keywords: Optional[list[str]] = None, output_file: Optional[str] = None) -> str: + """Fetch data from experiment outputs.""" + args = build_args({"keywords": ("-k", keywords or ["cpu usage"]), "output_file": ("-o", output_file)}) + return run_cli("fetch-data", experiment_id, *args) + + +@mcp.tool() +@_safe_tool +def delete_experiment(experiment_id: str) -> str: + """Delete an experiment.""" + return run_cli("delete", experiment_id) + + +@mcp.tool() +def detect_anomalies(experiment_id: str, keywords: Optional[list[str]] = None, method: Optional[str] = None, resample_freq: Optional[str] = None) -> str: + """Detect anomalies in trace data using ML methods (iforest, zscore, iqr, moving_average, seasonality, frequency_domain, combined).""" + args = build_args({"keywords": ("-k", keywords or ["cpu usage"]), "method": ("-m", method or "iforest"), "resample_freq": ("-H", resample_freq)}) + return run_cli("anomaly", experiment_id, *args) + + +@mcp.tool() +@_safe_tool +def detect_memory_leak(experiment_id: str, keywords: Optional[list[str]] = None) -> str: + """Detect memory leaks in trace data.""" + args = build_args({"keywords": ("-k", keywords or ["memory"])}) + return run_cli("memory-leak", experiment_id, *args) + + +@mcp.tool() +def detect_changepoints(experiment_id: str, keywords: Optional[list[str]] = None, methods: Optional[list[str]] = None) -> str: + """Detect change points in performance trends (single, zscore, voting, pca).""" + args = build_args({"keywords": ("-k", keywords or ["cpu usage"]), "methods": ("-m", methods or ["single", "zscore", "voting", "pca"])}) + return run_cli("changepoint", experiment_id, *args) + + +@mcp.tool() +@_safe_tool +def analyze_correlation(experiment_id: str, keywords: 
Optional[list[str]] = None, method: Optional[str] = None) -> str: + """Analyze correlation between outputs for root cause analysis (pearson, kendall, spearman).""" + args = build_args({"keywords": ("-k", keywords or ["cpu", "memory"]), "method": ("-m", method or "pearson")}) + return run_cli("correlation", experiment_id, *args) + + +@mcp.tool() +def detect_idle_resources(experiment_id: str, keywords: Optional[list[str]] = None, + cpu_idle_threshold: Optional[float] = None, + memory_idle_threshold: Optional[float] = None, + disk_idle_threshold: Optional[float] = None) -> str: + """Detect idle/underutilized resources.""" + args = build_args({ + "keywords": ("-k", keywords or ["cpu usage"]), + "cpu": ("--cpu-idle-threshold", cpu_idle_threshold), + "memory": ("--memory-idle-threshold", memory_idle_threshold), + "disk": ("--disk-idle-threshold", disk_idle_threshold), + }) + return run_cli("idle-resources", experiment_id, *args) + + +@mcp.tool() +@_safe_tool +def plan_capacity(experiment_id: str, keywords: Optional[list[str]] = None, horizon: Optional[int] = None) -> str: + """Perform capacity planning with predictive models.""" + args = build_args({"keywords": ("-k", keywords or ["cpu usage"]), "horizon": ("-H", horizon or 100)}) + return run_cli("capacity", experiment_id, *args) + + +@mcp.tool() +@_safe_tool +def plot_xy_with_anomalies( + experiment_id: str, + keywords: Optional[list[str]] = None, + method: Optional[str] = None, + host: Optional[str] = None, + port: Optional[int] = None, + resample_freq: Optional[str] = None, +) -> list[TextContent | ImageContent]: + """Fetch XY data from an experiment, run anomaly detection, and return an annotated plot image with a text summary.""" + from tmll.tmll_client import TMLLClient + from tmll.common.models.experiment import Experiment + from tmll.ml.modules.anomaly_detection.anomaly_detection_module import AnomalyDetection + + h = host or DEFAULT_HOST + p = port or DEFAULT_PORT + keywords = keywords or ["cpu usage"] + method 
= method or "iforest" + + client = TMLLClient(h, p) + + resp = client.tsp_client.fetch_experiment(experiment_id) + if resp.status_code != 200: + return [TextContent(type="text", text=f"Experiment {experiment_id} not found (status={resp.status_code}).")] + experiment = Experiment.from_tsp_experiment(resp.model) + experiment.assign_outputs(client._fetch_outputs(experiment)) + + outputs = experiment.find_outputs(keyword=keywords, type=["xy"]) + if not outputs: + return [TextContent(type="text", text="No XY outputs found matching keywords.")] + + ad_kwargs = {} + if resample_freq: + ad_kwargs["resample_freq"] = resample_freq + ad = AnomalyDetection(client, experiment, outputs, **ad_kwargs) + result = ad.find_anomalies(method=method) + if not result or not result.anomalies: + return [TextContent(type="text", text="Anomaly detection returned no results.")] + + colors = plt.colormaps.get_cmap("tab10") + contents: list[TextContent | ImageContent] = [] + total_anomalies = 0 + + for idx, (name, dataframe) in enumerate(ad.dataframes.items()): + anomaly_df = result.anomalies.get(name, pd.DataFrame()) + periods = result.anomaly_periods.get(name, []) + + fig, ax = plt.subplots(figsize=(14, 4), dpi=120) + ax.plot(dataframe.index, dataframe.iloc[:, 0], color=colors(idx), linewidth=1.2, label=name) + + for i, (start, end) in enumerate(periods): + ax.axvspan(start, end, color="red", alpha=0.2, label="Anomaly Period" if i == 0 else None) + + if not anomaly_df.empty: + is_anomaly_cols = anomaly_df.filter(regex="_is_anomaly$") + if not is_anomaly_cols.empty: + is_anomaly = is_anomaly_cols.any(axis=1) + else: + is_anomaly = anomaly_df.any(axis=1) + n_anomaly_points = int(is_anomaly.sum()) + total_anomalies += n_anomaly_points + + # Scatter points not already inside a shaded period + for point in anomaly_df[is_anomaly].index: + if any(s <= point <= e for s, e in periods): + continue + if point in dataframe.index: + ax.scatter(point, dataframe.loc[point].values[0], color="red", s=40, 
zorder=5) + + ax.set_title(f"Anomaly Detection: {name} ({method})") + ax.set_xlabel("Time") + ax.set_ylabel(name) + ax.legend(loc="upper right", fontsize=8) + fig.tight_layout() + + buf = io.BytesIO() + fig.savefig(buf, format="png") + plt.close(fig) + buf.seek(0) + contents.append(ImageContent(type="image", data=base64.b64encode(buf.read()).decode(), mimeType="image/png")) + + period_summary = [] + for name, periods in result.anomaly_periods.items(): + for start, end in periods: + period_summary.append(f" {name}: {start} → {end}") + + summary = f"Found {total_anomalies} anomalies across {len(ad.dataframes)} outputs using '{method}'." + if period_summary: + summary += "\n\nAnomaly periods:\n" + "\n".join(period_summary) + + contents.insert(0, TextContent(type="text", text=summary)) + return contents + if __name__ == "__main__": - asyncio.run(main()) + _log(f"MCP server starting — CLI_PATH={CLI_PATH} python={sys.executable}") + mcp.run() diff --git a/tmll/ml/modules/anomaly_detection/anomaly_detection_module.py b/tmll/ml/modules/anomaly_detection/anomaly_detection_module.py index b03b6d0..c0dbdb0 100644 --- a/tmll/ml/modules/anomaly_detection/anomaly_detection_module.py +++ b/tmll/ml/modules/anomaly_detection/anomaly_detection_module.py @@ -80,9 +80,9 @@ def __init__(self, client: TMLLClient, experiment: Experiment, self._process(outputs, **kwargs) def _process(self, outputs: Optional[List[Output]] = None, **kwargs) -> None: + kwargs.setdefault("min_size", MINIMUM_REQUIRED_DATAPOINTS) super()._process(outputs=outputs, normalize=False, - min_size=kwargs.get("min_size", MINIMUM_REQUIRED_DATAPOINTS), **kwargs) def _post_process(self, **kwargs) -> None: diff --git a/tmll/ml/modules/base_module.py b/tmll/ml/modules/base_module.py index ebba2f3..84a40f7 100644 --- a/tmll/ml/modules/base_module.py +++ b/tmll/ml/modules/base_module.py @@ -141,36 +141,31 @@ def _process(self, outputs: Optional[List[Output]] = None, **kwargs) -> None: self.logger.error("No data fetched") 
return + self.logger.info(f"Fetched data for {len(data)} outputs") self.outputs = outputs # Process each output - for output_key, output_data in data.items(): - shortened = output_key.split("$")[0] - converted = next(iter(output for output in outputs if output.id == shortened), None) if outputs else None - shortened = converted.name if converted else shortened - - if shortened not in self.dataframes: - df = output_data - - if converted and converted.type == "TIME_GRAPH": - df = df.rename({"start_time": "timestamp"}, axis=1) - df["end_time"] = pd.to_datetime(df["end_time"]) - - # Apply common preprocessing steps - if kwargs.get("normalize", True): - df = self.data_preprocessor.normalize(df) - if kwargs.get("convert_datetime", True): - df = self.data_preprocessor.convert_to_datetime(df) - if kwargs.get("resample", True): - df = self.data_preprocessor.resample(df, frequency=kwargs.get("resample_freq", "1s")) - if kwargs.get("remove_minimum", False): - df = self.data_preprocessor.remove_minimum(df) - - self.dataframes[shortened] = df - + for output_id, output_content in data.items(): + converted = next(iter(output for output in outputs if output.id == output_id), None) if outputs else None + self.logger.info(f"Processing output {output_id}, type(content)={type(output_content)}") + + # If output_content is a dict, it's TREE_TIME_XY with series + if isinstance(output_content, dict): + self.logger.info(f"Output {output_id} has {len(output_content)} series") + for series_name, df in output_content.items(): + name = f"{converted.name if converted else output_id} - {series_name}" + self.logger.info(f"Adding series {name} with {len(df)} rows") + self._add_dataframe(name, df, converted, **kwargs) + else: + name = converted.name if converted else output_id + self.logger.info(f"Adding output {name} with {len(output_content)} rows") + self._add_dataframe(name, output_content, converted, **kwargs) + + self.logger.info(f"Dataframes before filtering: {list(self.dataframes.keys())}") 
# Filter out dataframes with less than min_size instances min_size = kwargs.get("min_size", 1) self.dataframes = {k: v for k, v in self.dataframes.items() if len(v) >= min_size} + self.logger.info(f"Dataframes after filtering (min_size={min_size}): {list(self.dataframes.keys())}") # Align timestamps if needed if kwargs.get("align_timestamps", True) and self.dataframes: @@ -179,6 +174,28 @@ def _process(self, outputs: Optional[List[Output]] = None, **kwargs) -> None: # Call module-specific post-processing self._post_process(**kwargs) + def _add_dataframe(self, name: str, df: pd.DataFrame, output: Optional[Output], **kwargs) -> None: + """Helper to preprocess and add a dataframe to self.dataframes""" + if name not in self.dataframes: + if output and output.type == "TIME_GRAPH": + df = df.rename({"start_time": "timestamp"}, axis=1) + df["end_time"] = pd.to_datetime(df["end_time"]) + + if output and output.type == "TREE_TIME_XY": + df = df.rename(columns={"x": "timestamp", "y": "value"}) + + # Apply common preprocessing steps + if kwargs.get("normalize", True): + df = self.data_preprocessor.normalize(df) + if kwargs.get("convert_datetime", True): + df = self.data_preprocessor.convert_to_datetime(df) + if kwargs.get("resample", True): + df = self.data_preprocessor.resample(df, frequency=kwargs.get("resample_freq", "1s")) + if kwargs.get("remove_minimum", False): + df = self.data_preprocessor.remove_minimum(df) + + self.dataframes[name] = df + @abstractmethod def _post_process(self, **kwargs) -> None: """