diff --git a/config/system.yaml b/config/system.yaml index 56e7b3a8..a2fca551 100644 --- a/config/system.yaml +++ b/config/system.yaml @@ -295,6 +295,13 @@ storage: # database: "./eval_results.db" # table_name: "evaluation_results" + # Langfuse backend (optional) - export scores to Langfuse observability platform + # Requires: pip install 'lightspeed-evaluation[langfuse]' + # Credentials via env vars: LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST + # Or provide them inline below: + # - type: "langfuse" + # host: "https://cloud.langfuse.com" + # Visualization settings visualization: figsize: [12, 8] # Graph size (width, height) diff --git a/pyproject.toml b/pyproject.toml index 4e78bd0b..fbbff56c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,15 @@ nlp-metrics = [ "rapidfuzz>=3.0.0,<=3.14.3", # Required for semantic_similarity_distance ] +# Langfuse observability - export evaluation scores to Langfuse +# Install with: +# pip install 'lightspeed-evaluation[langfuse]' +# or +# uv sync --extra langfuse +langfuse = [ + "langfuse>=2.0.0,<3.0.0", +] + [dependency-groups] dev = [ "bandit>=1.7.0,<=1.9.2", diff --git a/src/lightspeed_evaluation/core/storage/__init__.py b/src/lightspeed_evaluation/core/storage/__init__.py index f4ac128f..55fdf647 100644 --- a/src/lightspeed_evaluation/core/storage/__init__.py +++ b/src/lightspeed_evaluation/core/storage/__init__.py @@ -28,6 +28,7 @@ from lightspeed_evaluation.core.storage.config import ( DatabaseBackendConfig, FileBackendConfig, + LangfuseBackendConfig, StorageBackendConfig, ) from lightspeed_evaluation.core.storage.composite_storage import ( @@ -56,6 +57,7 @@ "StorageError", "FileBackendConfig", "DatabaseBackendConfig", + "LangfuseBackendConfig", "StorageBackendConfig", "CompositeStorageBackend", "NoOpStorageBackend", diff --git a/src/lightspeed_evaluation/core/storage/config.py b/src/lightspeed_evaluation/core/storage/config.py index e8b84e40..27af1386 100644 --- 
a/src/lightspeed_evaluation/core/storage/config.py
+++ b/src/lightspeed_evaluation/core/storage/config.py
@@ -1,6 +1,6 @@
 """Configuration models for storage backends.
 
-Defines Pydantic models for file and database storage configuration.
+Defines Pydantic models for file, database, and Langfuse storage configuration.
 """
 
 from typing import Annotated, Literal, Optional, Union
@@ -126,8 +126,43 @@ def validate_connection_fields(self) -> "DatabaseBackendConfig":
         return self
 
 
+class LangfuseBackendConfig(BaseModel):
+    """Configuration for Langfuse observability storage backend.
+
+    Exports evaluation scores to Langfuse as a trace with per-metric scores.
+    Requires the ``langfuse`` optional extra: ``pip install 'lightspeed-evaluation[langfuse]'``
+
+    Credentials are resolved from config fields first, then ``LANGFUSE_PUBLIC_KEY``,
+    ``LANGFUSE_SECRET_KEY``, and ``LANGFUSE_HOST`` environment variables as fallback.
+    Prefer the environment variables for the keys so that secrets do not end up
+    in a committed configuration file.
+
+    Example:
+        - type: "langfuse"
+          host: "https://cloud.langfuse.com"
+    """
+
+    model_config = ConfigDict(extra="forbid")
+
+    type: Literal["langfuse"] = "langfuse"
+    host: Optional[str] = Field(
+        default=None,
+        description="Langfuse API host URL (falls back to LANGFUSE_HOST env var)",
+    )
+    public_key: Optional[str] = Field(
+        default=None,
+        description="Langfuse public key (falls back to LANGFUSE_PUBLIC_KEY env var)",
+    )
+    secret_key: Optional[str] = Field(
+        default=None,
+        # Keep the secret out of repr()/str() so it cannot leak into logs.
+        repr=False,
+        description="Langfuse secret key (falls back to LANGFUSE_SECRET_KEY env var)",
+    )
+
+
 # Discriminated union for polymorphic storage configuration
 StorageBackendConfig = Annotated[
-    Union[FileBackendConfig, DatabaseBackendConfig],
+    Union[FileBackendConfig, DatabaseBackendConfig, LangfuseBackendConfig],
     Field(discriminator="type"),
 ]
diff --git a/src/lightspeed_evaluation/core/storage/factory.py b/src/lightspeed_evaluation/core/storage/factory.py
index c43bb020..889af1b9 100644
--- a/src/lightspeed_evaluation/core/storage/factory.py
+++
b/src/lightspeed_evaluation/core/storage/factory.py
@@ -14,9 +14,11 @@
 from lightspeed_evaluation.core.storage.config import (
     DatabaseBackendConfig,
     FileBackendConfig,
+    LangfuseBackendConfig,
     StorageBackendConfig,
 )
 from lightspeed_evaluation.core.storage.file_storage import FileStorageBackend
+from lightspeed_evaluation.core.storage.langfuse_storage import LangfuseStorageBackend
 from lightspeed_evaluation.core.storage.protocol import BaseStorageBackend
 from lightspeed_evaluation.core.storage.sql_storage import SQLStorageBackend
 from lightspeed_evaluation.core.system.exceptions import ConfigurationError
@@ -127,6 +129,9 @@ def create_pipeline_storage_backend(
                     "File storage entries in ``storage`` require ``system_config`` "
                     "when building the pipeline storage backend."
                 )
+        elif isinstance(config, LangfuseBackendConfig):
+            logger.info("Pipeline storage: langfuse backend")
+            backends.append(LangfuseStorageBackend(config))
         else:
             raise ConfigurationError(
                 f"Unknown storage backend type: {type(config).__name__!r}"
diff --git a/src/lightspeed_evaluation/core/storage/langfuse_storage.py b/src/lightspeed_evaluation/core/storage/langfuse_storage.py
new file mode 100644
index 00000000..06260827
--- /dev/null
+++ b/src/lightspeed_evaluation/core/storage/langfuse_storage.py
@@ -0,0 +1,270 @@
+"""Langfuse storage backend for evaluation results.
+
+Implements :class:`~lightspeed_evaluation.core.storage.protocol.BaseStorageBackend`
+so Langfuse plugs into the standard pipeline storage lifecycle without any
+changes to the runner, API, or pipeline modules.
+
+Install with: ``pip install 'lightspeed-evaluation[langfuse]'``
+
+Credentials are resolved from :class:`LangfuseBackendConfig` fields first,
+then from ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY``, and
+``LANGFUSE_HOST`` environment variables as fallback (standard Langfuse SDK
+behavior).
+
+Lifecycle:
+    1. ``initialize(run_info)`` — creates the Langfuse client.
+    2. ``save_run(results)`` — accumulates all results (called per conversation).
+    3. ``finalize()`` — creates the trace, writes scores to it, and flushes.
+    4. ``close()`` — shuts down the client.
+"""
+
+from __future__ import annotations
+
+import importlib
+import importlib.util
+import logging
+from typing import Any, Optional, Union
+
+from lightspeed_evaluation.core.models.data import EvaluationData, EvaluationResult
+from lightspeed_evaluation.core.storage.config import LangfuseBackendConfig
+from lightspeed_evaluation.core.storage.protocol import RunInfo
+
+logger = logging.getLogger(__name__)
+
+# ``importlib.util`` is imported explicitly above: a bare ``import importlib``
+# does not guarantee that the ``util`` submodule is loaded.
+_HAS_LANGFUSE = importlib.util.find_spec("langfuse") is not None
+
+
+class LangfuseStorageBackend:
+    """Storage backend that exports evaluation results to Langfuse.
+
+    Creates one Langfuse trace per evaluation run with one score per
+    evaluation result. Results with ``score=None`` (ERROR/SKIPPED) are
+    skipped from numeric scoring but their status is logged.
+
+    All Langfuse SDK errors are caught and logged — they never fail
+    the evaluation pipeline.
+    """
+
+    def __init__(self, config: LangfuseBackendConfig) -> None:
+        """Initialize the Langfuse storage backend.
+
+        Args:
+            config: Langfuse backend configuration with optional host,
+                public_key, and secret_key fields.
+        """
+        self._config = config
+        self._client: Any = None
+        self._trace: Any = None
+        self._run_info: Optional[RunInfo] = None
+        self._results: list[EvaluationResult] = []
+
+    @property
+    def backend_name(self) -> str:
+        """Return the name of this storage backend."""
+        return "langfuse"
+
+    def initialize(self, run_info: RunInfo) -> None:
+        """Create the Langfuse client for this run.
+
+        The trace itself is created later, in :meth:`finalize`. If the
+        ``langfuse`` package is missing or the client cannot be created,
+        the error is logged and the backend stays inactive.
+
+        Args:
+            run_info: Run metadata; its ``name`` labels the trace.
+        """
+        self._run_info = run_info
+        self._results = []
+
+        if not _HAS_LANGFUSE:
+            logger.error(
+                "langfuse is not installed. "
+                "Add: pip install 'lightspeed-evaluation[langfuse]'"
+            )
+            return
+
+        # Import and construction share one guard: a broken install or an
+        # SDK-specific error is logged rather than aborting the evaluation run.
+        try:
+            langfuse_mod = importlib.import_module("langfuse")
+            langfuse_cls = getattr(langfuse_mod, "Langfuse")
+            self._client = langfuse_cls(**self._build_client_kwargs())
+        except Exception:  # pylint: disable=broad-exception-caught
+            logger.exception("langfuse: failed to initialize client")
+            self._client = None
+
+    def save_result(self, result: EvaluationResult) -> None:
+        """Accumulate a single result for batch export at finalize."""
+        self._results.append(result)
+
+    def save_run(self, results: list[EvaluationResult]) -> None:
+        """Accumulate conversation results for batch export at finalize."""
+        self._results.extend(results)
+
+    def set_evaluation_context(
+        self, evaluation_data: Optional[list[EvaluationData]] = None
+    ) -> None:
+        """No-op — Langfuse export does not need the full evaluation dataset."""
+        _ = evaluation_data
+
+    def finalize(self) -> None:
+        """Create the trace, write all scores, and flush to Langfuse.
+
+        Does nothing when no client is available or no results were
+        collected. Export failures are logged and swallowed.
+        """
+        if self._client is None:
+            return
+
+        if not self._results:
+            logger.info("langfuse: no results to report; skipping")
+            return
+
+        try:
+            self._write_trace_and_scores()
+        except Exception:  # pylint: disable=broad-exception-caught
+            logger.exception("langfuse: failed to write trace and scores")
+
+    def close(self) -> None:
+        """Shut down the Langfuse client, ignoring shutdown errors."""
+        if self._client is None:
+            return
+        try:
+            self._client.shutdown()
+        except Exception:  # pylint: disable=broad-exception-caught
+            logger.debug("langfuse: shutdown raised; ignoring", exc_info=True)
+        finally:
+            self._client = None
+
+    def _build_client_kwargs(self) -> dict[str, Any]:
+        """Build keyword arguments for the Langfuse constructor.
+
+        Only non-empty config values are passed; anything left out is
+        resolved by the SDK itself.
+        """
+        kwargs: dict[str, Any] = {}
+        if self._config.public_key:
+            kwargs["public_key"] = self._config.public_key
+        if self._config.secret_key:
+            kwargs["secret_key"] = self._config.secret_key
+        # Strip before testing so a whitespace-only host is never passed on.
+        host = (self._config.host or "").strip()
+        if host:
+            kwargs["host"] = host
+        return kwargs
+
+    def _write_trace_and_scores(self) -> None:
+        """Create one trace and emit one score per result row."""
+        run_name = self._run_info.name if self._run_info else "evaluation"
+
+        trace_meta: dict[str, Any] = {
+            "run_name": run_name,
+            "result_count": len(self._results),
+            "rows_preview": self._build_rows_preview(),
+        }
+
+        self._trace = self._client.trace(
+            name=_truncate(f"lightspeed_eval__{run_name}", 256),
+            metadata=trace_meta,
+        )
+
+        for r in self._results:
+            if r.score is None:
+                logger.debug(
+                    "langfuse: skipping score for %s (status=%s, no numeric score)",
+                    r.metric_identifier,
+                    r.result,
+                )
+                continue
+
+            self._trace.score(
+                name=_truncate(r.metric_identifier, 200),
+                value=float(r.score),
+                comment=_format_comment(r),
+                metadata=_build_score_metadata(r),
+            )
+
+        self._client.flush()
+
+    def _build_rows_preview(self) -> list[dict[str, Any]]:
+        """Build a compact preview of the first 50 rows for trace metadata."""
+        preview: list[dict[str, Any]] = []
+        for i, r in enumerate(self._results[:50]):
+            preview.append(
+                {
+                    "idx": i,
+                    "conversation_group_id": r.conversation_group_id,
+                    "turn_id": r.turn_id or "",
+                    "metric": r.metric_identifier,
+                    "result": r.result,
+                    "score": r.score,
+                }
+            )
+        return preview
+
+
+def _format_comment(r: EvaluationResult) -> str:
+    """Build a human-readable comment for a Langfuse score entry."""
+    parts: list[str] = [
+        f"result={r.result}",
+        f"conversation_group_id={r.conversation_group_id}",
+        f"turn_id={r.turn_id or ''}",
+    ]
+    if r.reason:
+        # Reuse the shared helper instead of a second truncation routine.
+        parts.append(f"reason={_truncate(r.reason, 1200)}")
+    return " | ".join(parts)
+
+
+def _build_score_metadata(r: EvaluationResult) -> dict[str, Any]:
+    """Build per-score metadata mirroring evaluation CSV fields."""
+    max_text = 8000
+    return {
+        "query": _truncate(r.query, max_text) if r.query else "",
+        "response": _truncate(r.response, max_text) if r.response else "",
+        "conversation_group_id": r.conversation_group_id,
+        "turn_id": r.turn_id or "",
+        "tool_calls": _safe_truncate(r.tool_calls, max_text),
+        "contexts": _safe_truncate(r.contexts, max_text),
+        "expected_response": _format_expected_response(r.expected_response, max_text),
+        "expected_intent": _safe_truncate(r.expected_intent, max_text),
+        "expected_tool_calls": _safe_truncate(r.expected_tool_calls, max_text),
+        "expected_keywords": _safe_truncate(r.expected_keywords, max_text),
+    }
+
+
+def _safe_truncate(value: Optional[str], max_len: int) -> str:
+    """Truncate a nullable string, returning empty string for None."""
+    if value is None or not str(value).strip():
+        return ""
+    return _truncate(str(value), max_len)
+
+
+def _format_expected_response(
+    value: Optional[Union[str, list[str]]], max_len: int
+) -> str:
+    """Format expected_response which can be a string or list of strings."""
+    if value is None:
+        return ""
+    if isinstance(value, list):
+        text = "\n---\n".join(str(x) for x in value)
+    else:
+        text = str(value)
+    return _truncate(text, max_len)
+
+
+def _truncate(s: str, max_len: int) -> str:
+    """Truncate a string to at most ``max_len`` characters.
+
+    Strings longer than ``max_len`` end in ``...``; when ``max_len`` is too
+    small to hold the ellipsis the string is simply cut.
+    """
+    if max_len <= 0:
+        return ""
+    if len(s) <= max_len:
+        return s
+    if max_len < 3:
+        return s[:max_len]
+    return s[: max_len - 3] + "..."
diff --git a/src/lightspeed_evaluation/core/system/loader.py b/src/lightspeed_evaluation/core/system/loader.py index 7dbb4c4f..ae544c28 100644 --- a/src/lightspeed_evaluation/core/system/loader.py +++ b/src/lightspeed_evaluation/core/system/loader.py @@ -23,6 +23,7 @@ from lightspeed_evaluation.core.storage.config import ( DatabaseBackendConfig, FileBackendConfig, + LangfuseBackendConfig, StorageBackendConfig, ) from lightspeed_evaluation.core.system.exceptions import ConfigurationError @@ -34,7 +35,13 @@ logger = logging.getLogger(__name__) # Supported storage backend types -SUPPORTED_STORAGE_TYPES: tuple[str, ...] = ("file", "sqlite", "postgres", "mysql") +SUPPORTED_STORAGE_TYPES: tuple[str, ...] = ( + "file", + "sqlite", + "postgres", + "mysql", + "langfuse", +) DATABASE_STORAGE_TYPES: tuple[str, ...] = ("sqlite", "postgres", "mysql") @@ -261,6 +268,8 @@ def _parse_storage_config( backends.append(FileBackendConfig(**item)) elif backend_type in DATABASE_STORAGE_TYPES: backends.append(DatabaseBackendConfig(**item)) + elif backend_type == "langfuse": + backends.append(LangfuseBackendConfig(**item)) else: raise ConfigurationError( f"Unknown storage backend type {backend_type!r}. "