Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ storage:
# database: "./eval_results.db"
# table_name: "evaluation_results"

# Langfuse backend (optional) - export scores to Langfuse observability platform
# Requires: pip install 'lightspeed-evaluation[langfuse]'
# Credentials via env vars: LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST
# Or provide them inline below:
# - type: "langfuse"
# host: "https://cloud.langfuse.com"

# Visualization settings
visualization:
figsize: [12, 8] # Graph size (width, height)
Expand Down
9 changes: 9 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ nlp-metrics = [
"rapidfuzz>=3.0.0,<=3.14.3", # Required for semantic_similarity_distance
]

# Langfuse observability - export evaluation scores to Langfuse
# Install with:
# pip install 'lightspeed-evaluation[langfuse]'
# or
# uv sync --extra langfuse
langfuse = [
"langfuse>=2.0.0,<3.0.0",
]

[dependency-groups]
dev = [
"bandit>=1.7.0,<=1.9.2",
Expand Down
2 changes: 2 additions & 0 deletions src/lightspeed_evaluation/core/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from lightspeed_evaluation.core.storage.config import (
DatabaseBackendConfig,
FileBackendConfig,
LangfuseBackendConfig,
StorageBackendConfig,
)
from lightspeed_evaluation.core.storage.composite_storage import (
Expand Down Expand Up @@ -56,6 +57,7 @@
"StorageError",
"FileBackendConfig",
"DatabaseBackendConfig",
"LangfuseBackendConfig",
"StorageBackendConfig",
"CompositeStorageBackend",
"NoOpStorageBackend",
Expand Down
35 changes: 33 additions & 2 deletions src/lightspeed_evaluation/core/storage/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Configuration models for storage backends.

Defines Pydantic models for file and database storage configuration.
Defines Pydantic models for file, database, and Langfuse storage configuration.
"""

from typing import Annotated, Literal, Optional, Union
Expand Down Expand Up @@ -126,8 +126,39 @@ def validate_connection_fields(self) -> "DatabaseBackendConfig":
return self


class LangfuseBackendConfig(BaseModel):
    """Settings for the optional Langfuse observability storage backend.

    An entry of this kind is selected by ``type: "langfuse"`` in the storage
    configuration. The backend exports evaluation scores to Langfuse as a
    trace with per-metric scores and needs the ``langfuse`` optional extra:
    ``pip install 'lightspeed-evaluation[langfuse]'``

    Every connection field is optional. A value given here takes precedence;
    a field left unset falls back to the matching ``LANGFUSE_HOST``,
    ``LANGFUSE_PUBLIC_KEY`` or ``LANGFUSE_SECRET_KEY`` environment variable.

    Example:
        - type: "langfuse"
          host: "https://cloud.langfuse.com"
    """

    # Unknown keys in the YAML entry are rejected rather than ignored.
    model_config = ConfigDict(extra="forbid")

    # Discriminator value used by the ``StorageBackendConfig`` union.
    type: Literal["langfuse"] = "langfuse"
    host: Optional[str] = Field(
        None,
        description="Langfuse API host URL (falls back to LANGFUSE_HOST env var)",
    )
    public_key: Optional[str] = Field(
        None,
        description="Langfuse public key (falls back to LANGFUSE_PUBLIC_KEY env var)",
    )
    secret_key: Optional[str] = Field(
        None,
        description="Langfuse secret key (falls back to LANGFUSE_SECRET_KEY env var)",
    )


# Discriminated union for polymorphic storage configuration
StorageBackendConfig = Annotated[
Union[FileBackendConfig, DatabaseBackendConfig],
Union[FileBackendConfig, DatabaseBackendConfig, LangfuseBackendConfig],
Field(discriminator="type"),
]
5 changes: 5 additions & 0 deletions src/lightspeed_evaluation/core/storage/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
from lightspeed_evaluation.core.storage.config import (
DatabaseBackendConfig,
FileBackendConfig,
LangfuseBackendConfig,
StorageBackendConfig,
)
from lightspeed_evaluation.core.storage.file_storage import FileStorageBackend
from lightspeed_evaluation.core.storage.langfuse_storage import LangfuseStorageBackend
from lightspeed_evaluation.core.storage.protocol import BaseStorageBackend
from lightspeed_evaluation.core.storage.sql_storage import SQLStorageBackend
from lightspeed_evaluation.core.system.exceptions import ConfigurationError
Expand Down Expand Up @@ -127,6 +129,9 @@ def create_pipeline_storage_backend(
"File storage entries in ``storage`` require ``system_config`` "
"when building the pipeline storage backend."
)
elif isinstance(config, LangfuseBackendConfig):
logger.info("Pipeline storage: langfuse backend")
backends.append(LangfuseStorageBackend(config))
else:
raise ConfigurationError(
f"Unknown storage backend type: {type(config).__name__!r}"
Expand Down
244 changes: 244 additions & 0 deletions src/lightspeed_evaluation/core/storage/langfuse_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
"""Langfuse storage backend for evaluation results.

Implements :class:`~lightspeed_evaluation.core.storage.protocol.BaseStorageBackend`
so Langfuse plugs into the standard pipeline storage lifecycle without any
changes to the runner, API, or pipeline modules.

Install with: ``pip install 'lightspeed-evaluation[langfuse]'``

Credentials are resolved from :class:`LangfuseBackendConfig` fields first,
then from ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY``, and
``LANGFUSE_HOST`` environment variables as fallback (standard Langfuse SDK
behavior).

Lifecycle:
1. ``initialize(run_info)`` — creates the Langfuse client.
2. ``save_run(results)`` — accumulates all results (called per conversation).
3. ``finalize()`` — creates the trace, writes scores to it, and flushes.
4. ``close()`` — shuts down the client.
"""

from __future__ import annotations

import importlib
import importlib.util
import logging
from typing import Any, Optional, Union

from lightspeed_evaluation.core.models.data import EvaluationData, EvaluationResult
from lightspeed_evaluation.core.storage.config import LangfuseBackendConfig
from lightspeed_evaluation.core.storage.protocol import RunInfo

logger = logging.getLogger(__name__)

# Availability probe for the optional ``langfuse`` extra. ``find_spec`` only
# locates the package; the package itself is imported lazily when the backend
# is initialized. NOTE: ``importlib.util`` is a submodule and is not bound by
# a bare ``import importlib``; it must be imported explicitly
# (``import importlib.util``) or this line can raise AttributeError.
_HAS_LANGFUSE = importlib.util.find_spec("langfuse") is not None


class LangfuseStorageBackend:
    """Storage backend that exports evaluation results to Langfuse.

    Results are buffered in memory by ``save_result`` / ``save_run`` and
    exported in one batch by ``finalize``: one Langfuse trace per evaluation
    run, with one score per evaluation result. Results with ``score=None``
    (ERROR/SKIPPED) get no numeric score; they are logged at debug level and
    still show up in the trace's ``rows_preview`` metadata (first 50 rows).

    Export is best-effort: import, client-construction and write failures of
    the exception types listed in each method are logged instead of being
    raised, so a Langfuse problem does not abort the evaluation run.
    """

    def __init__(self, config: LangfuseBackendConfig) -> None:
        """Store the configuration; no client is created yet.

        Args:
            config: Langfuse backend configuration with optional host,
                public_key, and secret_key fields.
        """
        self._config = config
        self._client: Any = None
        self._trace: Any = None
        self._run_info: Optional[RunInfo] = None
        self._results: list[EvaluationResult] = []

    @property
    def backend_name(self) -> str:
        """Return the name of this storage backend."""
        return "langfuse"

    def initialize(self, run_info: RunInfo) -> None:
        """Record the run info and create the Langfuse client.

        The trace is not created here; it is created in ``finalize`` once
        all results have been collected. If ``langfuse`` is not installed,
        or the client cannot be created, an error is logged and the backend
        stays inactive (``finalize`` then does nothing).

        Args:
            run_info: Metadata of the run; its ``name`` is used for the trace.
        """
        self._run_info = run_info
        self._results = []
        self._trace = None

        if not _HAS_LANGFUSE:
            logger.error(
                "langfuse is not installed. "
                "Add: pip install 'lightspeed-evaluation[langfuse]'"
            )
            return

        try:
            # The import lives inside the ``try``: the package can be
            # discoverable by ``find_spec`` and still fail to import (e.g. a
            # broken transitive dependency) or lack the expected class.
            langfuse_mod = importlib.import_module("langfuse")
            langfuse_cls = getattr(langfuse_mod, "Langfuse")
            self._client = langfuse_cls(**self._build_client_kwargs())
        except (
            ImportError,
            AttributeError,
            RuntimeError,
            ValueError,
            OSError,
            ConnectionError,
        ):
            logger.exception("langfuse: failed to initialize client")
            self._client = None

    def save_result(self, result: EvaluationResult) -> None:
        """Accumulate a single result for batch export at finalize."""
        self._results.append(result)

    def save_run(self, results: list[EvaluationResult]) -> None:
        """Accumulate conversation results for batch export at finalize."""
        self._results.extend(results)

    def set_evaluation_context(
        self, evaluation_data: Optional[list[EvaluationData]] = None
    ) -> None:
        """No-op — Langfuse export does not need the full evaluation dataset."""
        _ = evaluation_data

    def finalize(self) -> None:
        """Create the trace, write all scores, and flush to Langfuse.

        Does nothing when no client is available or no results were saved.
        Write failures of the listed exception types are logged, not raised.
        """
        if self._client is None:
            return

        if not self._results:
            logger.info("langfuse: no results to report; skipping")
            return

        try:
            self._write_trace_and_scores()
        except (RuntimeError, ValueError, OSError, ConnectionError):
            logger.exception("langfuse: failed to write trace and scores")

    def close(self) -> None:
        """Shut down the Langfuse client and drop the reference to it."""
        if self._client is None:
            return
        try:
            self._client.shutdown()
        except (RuntimeError, OSError, ConnectionError):
            logger.debug("langfuse: shutdown raised; ignoring", exc_info=True)
        finally:
            # Always release the client, even if shutdown raised something
            # unexpected, so a later close() does not retry on a dead client.
            self._client = None

    def _build_client_kwargs(self) -> dict[str, Any]:
        """Build keyword arguments for the Langfuse constructor.

        Only fields with a non-blank value are included. Blank or
        whitespace-only values are treated as unset so that they do not
        override the SDK's environment-variable fallback with an empty
        string. Values are passed with surrounding whitespace removed.

        Returns:
            Mapping with any of ``public_key``, ``secret_key``, ``host``.
        """
        kwargs: dict[str, Any] = {}
        for field_name in ("public_key", "secret_key", "host"):
            raw = getattr(self._config, field_name)
            cleaned = raw.strip() if raw else ""
            if cleaned:
                kwargs[field_name] = cleaned
        return kwargs

    def _write_trace_and_scores(self) -> None:
        """Create one trace and emit one score per result row.

        Rows whose ``score`` is ``None`` are skipped (logged at debug level).
        The client is flushed after all scores have been queued.
        """
        run_name = self._run_info.name if self._run_info else "evaluation"

        trace_meta: dict[str, Any] = {
            "run_name": run_name,
            "result_count": len(self._results),
            "rows_preview": self._build_rows_preview(),
        }

        self._trace = self._client.trace(
            name=_truncate(f"lightspeed_eval__{run_name}", 256),
            metadata=trace_meta,
        )

        for r in self._results:
            if r.score is None:
                logger.debug(
                    "langfuse: skipping score for %s (status=%s, no numeric score)",
                    r.metric_identifier,
                    r.result,
                )
                continue

            self._trace.score(
                name=_truncate(r.metric_identifier, 200),
                value=float(r.score),
                comment=_format_comment(r),
                metadata=_build_score_metadata(r),
            )

        self._client.flush()

    def _build_rows_preview(self) -> list[dict[str, Any]]:
        """Build a compact preview of the first 50 rows for trace metadata."""
        return [
            {
                "idx": i,
                "conversation_group_id": r.conversation_group_id,
                "turn_id": r.turn_id or "",
                "metric": r.metric_identifier,
                "result": r.result,
                "score": r.score,
            }
            for i, r in enumerate(self._results[:50])
        ]


def _format_comment(r: EvaluationResult) -> str:
    """Render the free-text comment attached to a Langfuse score.

    The comment is a ``" | "``-separated list of ``key=value`` segments:
    result, conversation group id, turn id (empty when unset) and, when the
    result has a non-empty reason, the reason clipped to 1200 characters
    (ending in ``...`` when clipped).
    """
    reason_limit = 1200
    segments: list[str] = [
        f"result={r.result}",
        f"conversation_group_id={r.conversation_group_id}",
        f"turn_id={r.turn_id or ''}",
    ]

    # No reason given: the three identifying segments are the whole comment.
    if not r.reason:
        return " | ".join(segments)

    clipped = r.reason
    if len(clipped) > reason_limit:
        clipped = clipped[: reason_limit - 3] + "..."
    segments.append(f"reason={clipped}")
    return " | ".join(segments)


def _build_score_metadata(r: EvaluationResult) -> dict[str, Any]:
    """Assemble the metadata dict attached to a single Langfuse score.

    Each text value is limited to 8000 characters; missing values become
    empty strings. Keys are inserted in a fixed order: query, response,
    identifiers, then actual and expected evaluation fields.
    """
    limit = 8000

    metadata: dict[str, Any] = {}
    metadata["query"] = _truncate(r.query, limit) if r.query else ""
    metadata["response"] = _truncate(r.response, limit) if r.response else ""
    metadata["conversation_group_id"] = r.conversation_group_id
    metadata["turn_id"] = r.turn_id or ""

    for attr in ("tool_calls", "contexts"):
        metadata[attr] = _safe_truncate(getattr(r, attr), limit)

    # expected_response may be a list of alternatives, so it has its own formatter.
    metadata["expected_response"] = _format_expected_response(
        r.expected_response, limit
    )

    for attr in ("expected_intent", "expected_tool_calls", "expected_keywords"):
        metadata[attr] = _safe_truncate(getattr(r, attr), limit)

    return metadata


def _safe_truncate(value: Optional[str], max_len: int) -> str:
    """Stringify and truncate *value*; ``None`` or blank input yields ``""``."""
    text = "" if value is None else str(value)
    # Whitespace-only text is treated the same as a missing value.
    return _truncate(text, max_len) if text.strip() else ""


def _format_expected_response(
    value: Optional[Union[str, list[str]]], max_len: int
) -> str:
    """Flatten ``expected_response`` into one string of at most *max_len* chars.

    ``None`` becomes ``""``. A list has its items stringified and joined with
    a ``---`` separator line; any other value is stringified as-is.
    """
    if value is None:
        return ""
    flattened = (
        "\n---\n".join(map(str, value)) if isinstance(value, list) else str(value)
    )
    return _truncate(flattened, max_len)


def _truncate(s: str, max_len: int) -> str:
    """Return *s* shortened to at most ``max_len`` characters.

    Strings that already fit are returned unchanged. Longer strings are cut
    and end in ``...`` so the truncation is visible. When ``max_len`` is too
    small to hold the ellipsis (less than 3), the string is hard-cut to
    ``max_len`` characters instead; a zero or negative ``max_len`` yields
    an empty string.

    Args:
        s: Text to shorten.
        max_len: Maximum length of the returned string.

    Returns:
        A string whose length never exceeds ``max(max_len, 0)``.
    """
    if len(s) <= max_len:
        return s

    marker = "..."
    if max_len < len(marker):
        # ``s[: max_len - 3]`` would be a negative slice here and return a
        # result longer than ``max_len``; fall back to a plain cut.
        return s[: max(max_len, 0)]
    return s[: max_len - len(marker)] + marker
11 changes: 10 additions & 1 deletion src/lightspeed_evaluation/core/system/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from lightspeed_evaluation.core.storage.config import (
DatabaseBackendConfig,
FileBackendConfig,
LangfuseBackendConfig,
StorageBackendConfig,
)
from lightspeed_evaluation.core.system.exceptions import ConfigurationError
Expand All @@ -34,7 +35,13 @@
logger = logging.getLogger(__name__)

# Supported storage backend types
SUPPORTED_STORAGE_TYPES: tuple[str, ...] = ("file", "sqlite", "postgres", "mysql")
SUPPORTED_STORAGE_TYPES: tuple[str, ...] = (
"file",
"sqlite",
"postgres",
"mysql",
"langfuse",
)
DATABASE_STORAGE_TYPES: tuple[str, ...] = ("sqlite", "postgres", "mysql")


Expand Down Expand Up @@ -261,6 +268,8 @@ def _parse_storage_config(
backends.append(FileBackendConfig(**item))
elif backend_type in DATABASE_STORAGE_TYPES:
backends.append(DatabaseBackendConfig(**item))
elif backend_type == "langfuse":
backends.append(LangfuseBackendConfig(**item))
else:
raise ConfigurationError(
f"Unknown storage backend type {backend_type!r}. "
Expand Down
Loading