
Building a lakehouse-based AI evaluation platform

A demo project showing how to build a lakehouse-based AI eval platform using a self-hosted, open-source stack.

Full code: https://github.com/saumitras/ai-eval-lakehouse-demo

Architecture

Features

  • Runs LLM evaluations at scale — feed it a dataset of multi-stage Q&A cases and it runs them through an LLM agent in parallel using Ray, scoring each response automatically
  • Pluggable agents & judges — swap between OpenAI GPT-4o, Google Gemini, or bring your own agent; same for the scoring judge
  • Everything lands in a data lakehouse — all results, traces, and metrics are stored as queryable Iceberg tables on S3, not just logs or flat files
  • Streams agent traces in real-time — every step an agent takes (user turn, LLM call, tool call, retrieval) is emitted to Kafka and streamed into the lakehouse via Spark
  • Automatically finds failure patterns — clusters failing cases using embeddings + HDBSCAN, then uses an LLM to summarise what went wrong in each cluster
  • Deterministic replay — re-run any past case with a new model or agent using the exact same stored inputs, for apples-to-apples comparisons
  • Full experiment lineage — every run records dataset version, model, agent, prompt, retriever, and commit-id so results are always reproducible and traceable
  • Orchestrated end-to-end — Airflow DAGs handle the full pipeline from dataset ingestion → evaluation → clustering, with a REST API as the control surface
  • Agentic failure analysis with Google ADK + MCP — after clustering, an agent reads each failure cluster and writes a one-sentence summary of the failure pattern back to Iceberg
  • SQL analytics — query results via Trino and visualise them via Superset dashboards (leaderboard, failure clusters, metric trends, etc.)

Architecture Overview

The Full Stack

The system is split into six planes that each own a distinct concern:

  • Control: orchestration, API, job tracking (Airflow, FastAPI, PostgreSQL)
  • Evaluation: distributed agent execution, scoring, deterministic replay (Ray, pluggable agents, DeepEval)
  • Streaming: real-time trace ingestion (Kafka, Spark Structured Streaming)
  • Lakehouse: durable storage + catalog (S3, Apache Iceberg, Apache Polaris)
  • Analytics: querying, dashboards, failure clustering (Trino, Apache Superset)
  • Agentic Analysis: failure cluster summarisation via LLM agent + tool calls (Google ADK, MCP)

Everything runs locally via Docker Compose. S3 is the only external dependency.

Control Plane — Airflow + FastAPI + PostgreSQL

Airflow triggers the pipeline via three DAGs (dataset_ingestion, evaluation_run, failure_clustering). Each DAG calls the FastAPI service and polls GET /jobs/{id} until the job completes — the API never blocks.

FastAPI handles all long-running work as BackgroundTasks and immediately returns a job_id. Job state (running / completed / failed) is persisted in a PostgreSQL jobs table that is auto-created on API startup.

Airflow DAG  →  POST /api/v1/experiments  →  job_id
             ←  GET  /api/v1/jobs/{id}   ←  poll until completed
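The non-blocking job pattern can be sketched with the standard library alone. This is illustrative only: the real service uses FastAPI BackgroundTasks with a PostgreSQL-backed jobs table, and `submit`/`JOBS` here are hypothetical names.

```python
import threading
import uuid

JOBS: dict[str, dict] = {}  # stand-in for the PostgreSQL jobs table


def submit(work, payload) -> str:
    """Return a job_id immediately and run `work` in a background thread."""
    job_id = uuid.uuid4().hex
    JOBS[job_id] = {"status": "running"}

    def _run() -> None:
        try:
            result = work(payload)
            JOBS[job_id] = {"status": "completed", "result": result}
        except Exception as exc:  # any failure is recorded, never raised to the caller
            JOBS[job_id] = {"status": "failed", "error": str(exc)}

    threading.Thread(target=_run, daemon=True).start()
    return job_id
```

The key property is the same as in the real API: the caller gets a job_id back immediately and learns the outcome only by polling the job record.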

Data Plane — Ray, Agents, and DeepEval Metrics

ExperimentManager reads cases from the dataset_cases Iceberg table and fans them out as Ray remote tasks — one task per eval case, all running in parallel.

Each Ray worker:

  1. Dynamically imports the agent class from a dotted path (e.g. evallakehouse.agents.openai_agent.OpenAIAgent)
  2. Runs agent.run(case, recorder) — the TraceRecorder is injected so every step is captured automatically
  3. Computes DeepEval metrics (correctness, faithfulness, context recall, context precision) using an LLM judge
  4. Returns scores back to ExperimentManager, which writes experiment_runs, case_results, and metric_results to Iceberg

futures = [evaluate_case.remote(c.to_dict(), run_id, agent_cls_path, []) for c in cases]
results = ray.get(futures)

Four agents are built-in: OpenAIAgent, GeminiAgent (Google ADK), EchoAgent (no API calls), and MockFailureAgent (hardcoded wrong answers for testing clustering).

Streaming Pipeline — Kafka + Spark Structured Streaming

While Ray workers run, every agent step is emitted to Kafka in parallel via TraceEmitter. Two Spark Structured Streaming jobs (running as Docker services) consume these topics continuously:

TraceEmitter
  ├── topic: agent-traces   →  spark-trace-ingestion   →  Iceberg: agent_steps
  └── topic: agent-metrics  →  spark-metrics-ingestion →  Iceberg: metric_events

Kafka runs in KRaft mode (no Zookeeper). Spark checkpoints offsets to S3 for exactly-once delivery. The streaming jobs start automatically with ./start.sh and run indefinitely.

Without this pipeline, agent_steps and metric_events stay empty — the replay engine and live metric stream both depend on it.

Lakehouse — Apache Iceberg + Apache Polaris + S3

All evaluation artifacts are stored as Apache Iceberg tables on S3, managed by Apache Polaris (REST catalog backed by PostgreSQL). The full table namespace is polaris.analytics.*.

  • Core: dataset_cases, experiment_runs, case_results, metric_results, tool_calls
  • Streaming: agent_steps, metric_events
  • Analysis: failure_clusters, cluster_members

Iceberg gives snapshot isolation, ACID writes, partition pruning, and time travel:

SELECT * FROM polaris.analytics.case_results
TIMESTAMP AS OF '2025-01-01 00:00:00';

Polaris state persists across container restarts — catalog setup only runs once per fresh Postgres volume.

Failure Analysis — Embeddings + HDBSCAN + ADK/LLM

After an evaluation run, cases with correctness < 0.7 are pulled from case_results. Each failing case is represented as a text combining the user question, agent answer, and correctness score, then embedded with sentence-transformers (all-MiniLM-L6-v2).

HDBSCAN clusters the embeddings. For each cluster, an ADK LlmAgent is given the cluster members via an MCP server (tools: get_case_count, get_case_text, summarise_pattern) and writes a one-sentence failure pattern summary back to the failure_clusters table.

failing cases → embeddings → HDBSCAN → clusters → ADK agent (via MCP) → summary → Iceberg

Replay Engine

The replay engine reads stored agent_steps from Iceberg for a given run_id + case_id, reconstructs the original EvalCase from the user turns, and reruns it through any agent — without needing the original retrieval results or tool outputs.

curl -X POST http://localhost:8000/api/v1/replay \
  -d '{"original_run_id": "run_abc123", "case_id": "case_001", "agent_cls_path": "...", "new_model": "gpt-4o"}'

The replay produces a new replay_<12-hex> run ID with its own full trace in Iceberg, enabling direct model-vs-model comparison on identical inputs.

Analytics — Trino + Superset

Trino queries Iceberg tables directly via the Polaris REST catalog. Superset connects to Trino and ships with pre-built dashboards for model leaderboard, per-metric breakdowns, and failure cluster visualisation.

SELECT r.model_name, AVG(c.correctness) AS avg_correctness, COUNT(*) AS total_cases
FROM polaris.analytics.experiment_runs r
JOIN polaris.analytics.case_results c ON r.run_id = c.run_id
GROUP BY r.model_name
ORDER BY avg_correctness DESC;

Superset: http://localhost:8088 — Trino: http://localhost:8081


Components Deep Dive

Evaluation Datasets

Datasets are versioned JSONL files stored under datasets/<dataset_id>/<version>.jsonl. Each line is one eval case.

{
  "case_id": "case_002",
  "metadata": {"category": "networking", "difficulty": "medium"},
  "conversation": [
    {"role": "user", "content": "Explain the difference between TCP and UDP."},
    {"role": "assistant", "content": "TCP is connection-oriented while UDP is connectionless.",
     "context": ["TCP provides reliable, ordered delivery. UDP is faster but does not guarantee delivery."]},
    {"role": "user", "content": "Which is faster?"}
  ],
  "expected_behavior": {"contains": ["UDP"]}
}

Key fields:

  • conversation — full multi-turn dialogue; each turn can carry a context list (retrieved chunks used to produce that turn)
  • expected_behavior.contains — list of strings the agent response must contain; used as ground truth by the LLM judge
  • metadata — arbitrary key-value pairs passed through to dataset_cases and available for filtering in SQL

DatasetLoader reads the JSONL and hydrates each line into an EvalCase dataclass. DatasetIngestion writes them to the dataset_cases Iceberg table via Spark, partitioned by dataset_id and dataset_version.
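A minimal sketch of the hydration step, assuming a simplified EvalCase shape (the project's real dataclass and loader likely carry more fields):

```python
import json
from dataclasses import dataclass, field


@dataclass
class EvalCase:  # simplified stand-in for the project's EvalCase dataclass
    case_id: str
    conversation: list
    expected_behavior: dict
    metadata: dict = field(default_factory=dict)


def load_cases(jsonl_text: str) -> list[EvalCase]:
    """Hydrate one EvalCase per non-empty JSONL line."""
    return [
        EvalCase(**json.loads(line))
        for line in jsonl_text.splitlines()
        if line.strip()
    ]
```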

Agents

All agents subclass BaseAgent and implement a single method:

class BaseAgent(ABC):
    @abstractmethod
    def run(self, case: EvalCase, recorder: TraceRecorder) -> str: ...

The TraceRecorder is injected by the Ray worker — agents call recorder.record() for each step they want to capture.

Four agents are built-in:

  • OpenAIAgent (evallakehouse.agents.openai_agent.OpenAIAgent): default; uses AGENT_MODEL from .env
  • GeminiAgent (evallakehouse.agents.gemini_agent.GeminiAgent): Google ADK + Gemini via async streaming
  • EchoAgent (evallakehouse.agents.echo_agent.EchoAgent): returns [echo] <last user message>; no API calls
  • MockFailureAgent (evallakehouse.agents.mock_failure_agent.MockFailureAgent): hardcoded wrong answers; generates real failures for clustering tests

The active agent is set via DEFAULT_AGENT_CLS in ops/.env and can be overridden per-request by passing agent_cls_path in the experiment payload. The class is loaded dynamically at runtime via importlib — no code changes needed to swap agents.
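The dynamic loading step can be sketched in a few lines. load_agent_class is a hypothetical helper; the project's actual import logic may differ:

```python
import importlib


def load_agent_class(dotted_path: str):
    """Resolve a dotted path like 'pkg.module.ClassName' to the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
```

This is why swapping agents needs no code change: any importable class can be named in the experiment payload.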

# OpenAIAgent core — records each turn then emits the response with token counts
resp = self._client.chat.completions.create(model=self._model, messages=messages)
recorder.record("assistant", role="assistant", content=content,
                tokens_prompt=resp.usage.prompt_tokens,
                tokens_completion=resp.usage.completion_tokens)

Metrics & Judging

Metrics are computed inside each Ray worker after the agent responds. The default suite runs four DeepEval metrics against every case:

  • correctness (GEval): does the response correctly answer the question?
  • faithfulness (FaithfulnessMetric): is the response grounded in the retrieved context?
  • context_recall (ContextualRecallMetric): does the retrieval context cover the expected answer?
  • context_precision (ContextualPrecisionMetric): is the retrieved context relevant to the question?

All four use an LLM-as-judge backend. The judge provider and model are configured independently from the inference agent:

JUDGE_PROVIDER=openai          # or gemini
DEEPEVAL_JUDGE_MODEL=gpt-4o   # or gemini-2.5-flash-lite

Each metric returns a MetricResult(metric_name, metric_value, reason). A case is marked pass_fail=true when correctness >= 0.7. Results are written to both case_results (aggregated) and metric_results (per-metric breakdown).

To add a custom metric, subclass BaseMetric and add it to the suite:

class BaseMetric(ABC):
    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def compute(self, response: str, case_context: dict) -> MetricResult: ...
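As a hedged illustration, here is what a deterministic custom metric might look like against that interface. ContainsMetric is hypothetical, and BaseMetric/MetricResult are redefined here as simplified stand-ins for the project's classes:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class MetricResult:  # simplified stand-in
    metric_name: str
    metric_value: float
    reason: str


class BaseMetric(ABC):  # simplified stand-in for the project's interface
    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def compute(self, response: str, case_context: dict) -> MetricResult: ...


class ContainsMetric(BaseMetric):
    """Scores the fraction of expected_behavior.contains strings found in the response."""

    @property
    def name(self) -> str:
        return "contains_expected"

    def compute(self, response: str, case_context: dict) -> MetricResult:
        expected = case_context.get("expected_behavior", {}).get("contains", [])
        hits = [s for s in expected if s.lower() in response.lower()]
        score = len(hits) / len(expected) if expected else 1.0
        return MetricResult(self.name, score, f"matched {len(hits)}/{len(expected)} expected strings")
```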

Trace Capture

Every agent step is captured as a TraceEvent:

@dataclass
class TraceEvent:
    run_id: str
    case_id: str
    step_id: int
    step_type: str   # user | assistant | tool_call | tool_result | retrieval | system | metric
    role: str
    content: str
    tool_name: str
    latency_ms: int
    tokens_prompt: int
    tokens_completion: int
    context: str     # JSON-serialised list[str] of retrieved chunks
    event_time: str

TraceRecorder wraps the emitter and manages step sequencing. Agents call recorder.record() for each step; the recorder auto-increments step_id, appends to its local events list, and immediately publishes to Kafka via TraceEmitter.

TraceEmitter is a Kafka producer (acks=all, 3 retries) used as a context manager — it flushes and closes on exit:

with TraceEmitter(settings.kafka) as emitter:
    recorder = TraceRecorder(run_id, case.case_id, emitter)
    response = agent.run(case, recorder)

Two helpers on TraceRecorder are used downstream:

  • tokens_used() — sums tokens_prompt + tokens_completion across all events; written to case_results
  • retrieval_context() — deduplicated list of all context strings across all steps; passed to DeepEval faithfulness/recall metrics
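A sketch of the recorder's sequencing and helper logic, with simplified stand-in types (TraceRecorderSketch and _Event are illustrative names, not the project's classes):

```python
from dataclasses import dataclass, field


@dataclass
class _Event:  # minimal stand-in for TraceEvent
    step_id: int
    tokens_prompt: int = 0
    tokens_completion: int = 0
    context: list = field(default_factory=list)


class TraceRecorderSketch:
    def __init__(self) -> None:
        self._events: list[_Event] = []

    def record(self, tokens_prompt: int = 0, tokens_completion: int = 0, context=()) -> None:
        """Auto-increment step_id and append the event (the real recorder also publishes to Kafka)."""
        self._events.append(_Event(len(self._events), tokens_prompt, tokens_completion, list(context)))

    def tokens_used(self) -> int:
        return sum(e.tokens_prompt + e.tokens_completion for e in self._events)

    def retrieval_context(self) -> list:
        """Deduplicated context chunks across all steps, in first-seen order."""
        seen, out = set(), []
        for e in self._events:
            for chunk in e.context:
                if chunk not in seen:
                    seen.add(chunk)
                    out.append(chunk)
        return out
```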

Experiment Manager

ExperimentManager is the orchestration core for a single evaluation run:

  1. Reads all cases for the given dataset_id + dataset_version from dataset_cases Iceberg table
  2. Initialises Ray (local mode if no cluster is running)
  3. Dispatches one evaluate_case.remote() task per case
  4. Collects results with ray.get(futures)
  5. Writes three Iceberg tables in sequence: experiment_runs, case_results, metric_results

The ExperimentConfig dataclass carries all versioning fields that land in experiment_runs:

@dataclass
class ExperimentConfig:
    dataset_id: str
    dataset_version: str
    model_name: str
    model_version: str
    agent_version: str
    prompt_version: str
    retriever_version: str
    metric_suite_version: str = "v1"
    git_commit: str = ""

Each run gets a unique run_id (run_<12-hex>). All downstream tables (case_results, metric_results, agent_steps) join back to experiment_runs on run_id.
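Generating an ID in that run_<12-hex> shape is a one-liner (new_run_id is a hypothetical helper; the project may derive its IDs differently):

```python
import uuid


def new_run_id(prefix: str = "run") -> str:
    """Build an ID like run_a1b2c3d4e5f6 from a random UUID's first 12 hex chars."""
    return f"{prefix}_{uuid.uuid4().hex[:12]}"
```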

Iceberg Tables

All tables live under polaris.analytics.*. The schema is defined in schema_init.py as idempotent CREATE TABLE IF NOT EXISTS DDL and created once via python -m evallakehouse.lakehouse.schema_init.

  • dataset_cases — populated by DatasetIngestion (Spark); partitioned by dataset_id, dataset_version
  • experiment_runs — populated by ExperimentManager; partitioned by dataset_id, model_name
  • case_results — populated by ExperimentManager; partitioned by run_id
  • metric_results — populated by ExperimentManager; partitioned by metric_name
  • agent_steps — populated by Spark Structured Streaming; partitioned by run_id
  • metric_events — populated by Spark Structured Streaming; partitioned by metric_name
  • tool_calls — populated by the agent (optional); partitioned by run_id
  • failure_clusters — populated by run_failure_clustering()
  • cluster_members — populated by run_failure_clustering(); partitioned by cluster_id

The IcebergCatalog class manages the Spark session, wires up the Polaris REST catalog, and configures S3A credentials for both the Iceberg S3FileIO (Polaris-user role) and the S3A filesystem (checkpoint writes).

Google ADK + MCP Server

After HDBSCAN produces failure clusters, an ADK LlmAgent is used to summarise each one. The agent communicates with the pipeline via a MCP server (mcp_server.py) that exposes the cluster data as callable tools. ADK spawns the MCP server as a subprocess automatically — no separate process or Docker container is needed.

The MCP server exposes three tools:

  • get_case_count(): returns the number of cases in the cluster
  • get_case_text(index): returns the full text of a case by zero-based index
  • summarise_pattern(summary): called by the agent to emit its final one-sentence summary

The agent reads the cases, identifies the common failure pattern, and calls summarise_pattern to write the result. The summary is picked up by the caller and written to the failure_clusters Iceberg table.

# adk_summary.py — agent setup
agent = LlmAgent(
    model=model,
    name="cluster_summariser",
    instruction="Use get_case_count to see how many cases exist, then use get_case_text "
                "to read several cases. You MUST always finish by calling summarise_pattern "
                "with a single concise sentence describing the common failure pattern.",
    tools=[MCPToolset(connection_params=StdioServerParameters(
        command="python", args=["mcp_server.py"]
    ))]
)

Provider is controlled by INFERENCE_PROVIDER in ops/.env:

  • openai — uses LiteLlm(model="openai/<AGENT_MODEL>")
  • gemini — uses the Gemini model directly via ADK_MODEL

This pattern — an ADK agent using MCP tools to query a data pipeline and write structured output back to a lakehouse — is directly reusable for any agentic analysis task beyond failure clustering.

Airflow DAGs

Three DAGs, all in evallakehouse/airflow_dags/:

dataset_ingestion — manually triggered; calls POST /datasets/ingest and polls until complete.

docker exec airflow airflow dags trigger dataset_ingestion \
  --conf '{"dataset_id": "example_dataset", "version": "v1"}'

evaluation_run — manually triggered; runs the full eval then optionally clusters failures.

docker exec airflow airflow dags trigger evaluation_run \
  --conf '{"dataset_id": "example_dataset", "dataset_version": "v1", "model_name": "gpt-4o", \
           "model_version": "v1", "agent_version": "v1", "prompt_version": "v1", \
           "retriever_version": "v1", "run_clustering": true}'

failure_clustering — scheduled nightly at 02:00; also manually triggerable. Configurable min_cluster_size.

All DAGs use the same async pattern: POST to the API → receive job_id → poll GET /jobs/{id} every 30 seconds. Task timeouts are set to 15–30 minutes.
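The shared poll-until-done logic might look like this (poll_job is a hypothetical helper; the DAGs' actual implementation may differ):

```python
import time


def poll_job(get_status, job_id: str, interval_s: float = 30.0, timeout_s: float = 1800.0) -> dict:
    """Poll a status callable (e.g. a GET /jobs/{id} wrapper) until the job
    completes or fails; raise TimeoutError if neither happens in time."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = get_status(job_id)
        if status.get("status") in ("completed", "failed"):
            return status
        time.sleep(interval_s)
    raise TimeoutError(f"job {job_id} did not finish within {timeout_s}s")
```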

REST API

FastAPI app at http://localhost:8000. All long-running endpoints return immediately with a job_id; callers poll GET /jobs/{id} for completion.

POST /api/v1/experiments          launch evaluation run
POST /api/v1/datasets/ingest      ingest JSONL into Iceberg
POST /api/v1/replay               replay a stored trace
POST /api/v1/failure-clusters     trigger failure clustering
GET  /api/v1/jobs/{id}            poll async job status
GET  /api/v1/runs/{id}/results    case-level scores for a run
GET  /api/v1/runs/{id}/metrics    per-metric breakdown for a run
GET  /api/v1/leaderboard          avg correctness per model across all runs
GET  /api/v1/failure-clusters     list clusters ordered by size
DELETE /api/v1/admin/iceberg/clear  truncate Iceberg tables (dev/test)

Job state is stored in PostgreSQL. The connection pool is initialised on startup via the FastAPI lifespan handler and torn down on shutdown:

@asynccontextmanager
async def lifespan(app: FastAPI):
    init_pool(settings.postgres.dsn)
    yield
    close_pool()

Full interactive docs at http://localhost:8000/docs.


Setup Guide

Prerequisites

  • Docker + Docker Compose
  • Python 3.11.15
  • pipenv (pip install pipenv)
  • An AWS account with an S3 bucket and permissions to create IAM roles and users
  • An OpenAI or Google API key (or use EchoAgent to skip inference entirely)

Step 1 — AWS IAM Setup

Run once to create the two IAM identities Polaris needs:

bash setup_aws_iam.sh

This creates:

  • polaris-iceberg-role — IAM role with S3 read/write on the warehouse prefix
  • polaris-user — IAM user with sts:AssumeRole permission on the above role

The script prints three values. Copy them into ops/.env:

POLARIS_ROLE_ARN=arn:aws:iam::<account_id>:role/polaris-iceberg-role
POLARIS_AWS_ACCESS_KEY_ID=<access_key>
POLARIS_AWS_SECRET_ACCESS_KEY=<secret_key>

Step 2 — Configure ops/.env

ops/.env is the single source of truth for all configuration. Minimum required fields:

# AWS
AWS_REGION=us-east-1
AWS_PROFILE=<your-aws-profile>      # or set AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY directly

# S3
S3_BUCKET=<your-s3-bucket>
S3_WAREHOUSE_PATH=evallakehouse/warehouse

# Polaris (from setup_aws_iam.sh output)
POLARIS_ROLE_ARN=arn:aws:iam::<account_id>:role/polaris-iceberg-role
POLARIS_AWS_ACCESS_KEY_ID=<access_key>
POLARIS_AWS_SECRET_ACCESS_KEY=<secret_key>

# Inference
INFERENCE_PROVIDER=openai           # or gemini
OPENAI_API_KEY=<your-openai-key>

# Judge
JUDGE_PROVIDER=openai               # or gemini
DEEPEVAL_JUDGE_MODEL=gpt-4o

# Agent
DEFAULT_AGENT_CLS=evallakehouse.agents.openai_agent.OpenAIAgent
AGENT_MODEL=gpt-4o

# Gemini (only needed when INFERENCE_PROVIDER=gemini or JUDGE_PROVIDER=gemini)
GOOGLE_API_KEY=<your-google-key>
ADK_MODEL=gemini-2.5-flash-lite

# Failure cluster summaries
CLUSTER_SUMMARY_ENABLED=true

The Polaris root credentials (POLARIS_ROOT_CLIENT_ID / POLARIS_ROOT_CLIENT_SECRET) are fixed bootstrap values — set them once and never change them after the first start.

Step 3 — Start Infrastructure

cd ops && ./start.sh

start.sh runs docker compose up -d, waits 15 seconds, then creates the two Kafka topics:

agent-traces   (4 partitions)
agent-metrics  (4 partitions)

Services started:

  • Polaris: http://localhost:8181
  • Kafka: localhost:9092
  • PostgreSQL: localhost:5432
  • Airflow: http://localhost:8080
  • Trino: http://localhost:8081
  • Superset: http://localhost:8088
  • spark-trace-ingestion: Docker service (always on)
  • spark-metrics-ingestion: Docker service (always on)

Verify all containers are up:

docker ps --format "table {{.Names}}\t{{.Status}}"

On first run, Docker builds the Spark image and downloads JARs (Iceberg, Kafka, hadoop-aws) — allow ~2 minutes.

Step 4 — Install Dependencies

cd .. && pipenv install && pipenv shell

Step 5 — Bootstrap Polaris Metastore

One-time only — skip this if Postgres already has a polaris_schema.

cd ops && ./bootstrap_polaris.sh && cd ..

This runs the official apache/polaris-admin-tool:1.3.0-incubating Docker image to:

  1. Create the polaris_schema tables in the polaris Postgres database
  2. Seed the default-realm with the root credentials from ops/.env

Verify:

docker exec postgres psql -U airflow -d polaris -c "\dt polaris_schema.*"
# Expected: 6 tables

If you see realm already exists — the realm was already bootstrapped. Safe to ignore; proceed to the next step.

Step 6 — Setup Polaris Catalog and Iceberg Schema

One-time only — Polaris state persists in Postgres across restarts.

python setup_polaris.py && python -m evallakehouse.lakehouse.schema_init

setup_polaris.py does the following against the Polaris REST API:

  1. Creates the S3 warehouse prefix in the bucket
  2. Creates the eval_catalog catalog pointing at s3://<bucket>/evallakehouse/warehouse
  3. Creates principal eval_user + roles with full catalog privileges
  4. Rotates credentials and writes POLARIS_CLIENT_ID / POLARIS_CLIENT_SECRET back into ops/.env

schema_init creates all 9 Iceberg tables under polaris.analytics.* backed by S3.

Then restart Trino to pick up the new OAuth credentials:

cd ops && docker compose up -d --force-recreate trino && cd ..

Step 7 — Register Airflow DAGs

Verify that the DAGs are visible:

docker exec airflow airflow dags list

If automatic DAG registration is disabled, re-serialize them manually:

docker exec airflow airflow dags reserialize

Unpause all three DAGs:

docker exec airflow airflow dags unpause dataset_ingestion
docker exec airflow airflow dags unpause evaluation_run
docker exec airflow airflow dags unpause failure_clustering

Step 8 — Start the API

uvicorn evallakehouse.api.main:app --reload

On startup the API initialises the Postgres connection pool and auto-creates the jobs table. Docs at http://localhost:8000/docs.

Verify:

curl http://localhost:8000/health
# {"status": "ok"}


Running Your First Evaluation

The bundled example_dataset has three cases covering general knowledge, networking, and logical reasoning. We'll use it to run an end-to-end evaluation with OpenAIAgent.

Ingest a Dataset

Before running an evaluation, the dataset must be loaded into the dataset_cases Iceberg table. Trigger the dataset_ingestion DAG:

docker exec airflow airflow dags trigger dataset_ingestion \
  --conf '{"dataset_id": "example_dataset", "version": "v1"}'

This calls POST /api/v1/datasets/ingest as a background job. The DAG polls until it completes. Monitor in the Airflow UI at http://localhost:8080 or poll directly:

# Get the job_id from the API response, then poll
curl http://localhost:8000/api/v1/jobs/<job_id>
# {"job_id": "...", "status": "completed", "result": {"dataset_id": "example_dataset", "version": "v1"}}

Verify the cases landed in Iceberg:

docker exec trino trino --server http://localhost:8081 --execute \
  "SELECT case_id, dataset_version FROM iceberg.analytics.dataset_cases WHERE dataset_id = 'example_dataset'"
 case_id  | dataset_version
----------+-----------------
 case_001 | v1
 case_002 | v1
 case_003 | v1

Trigger an Evaluation Run

With the dataset ingested, trigger the evaluation_run DAG. The conf payload maps directly to ExperimentConfig fields that are recorded in experiment_runs for lineage:

docker exec airflow airflow dags trigger evaluation_run \
  --conf '{
    "dataset_id": "example_dataset",
    "dataset_version": "v1",
    "model_name": "gpt-4o",
    "model_version": "v1",
    "agent_version": "v1",
    "prompt_version": "v1",
    "retriever_version": "v1"
  }'

Or trigger directly via the API:

curl -X POST http://localhost:8000/api/v1/experiments \
  -H "Content-Type: application/json" \
  -d '{
    "dataset_id": "example_dataset",
    "dataset_version": "v1",
    "model_name": "gpt-4o",
    "model_version": "v1",
    "agent_version": "v1",
    "prompt_version": "v1",
    "retriever_version": "v1"
  }'
# {"status": "started", "job_id": "a3f9c1d2e4b5..."}

What happens under the hood:

  1. ExperimentManager reads the 3 cases from dataset_cases
  2. Ray dispatches 3 parallel evaluate_case.remote() tasks
  3. Each worker runs OpenAIAgent.run(), records steps via TraceRecorder, and emits to Kafka
  4. DeepEval scores correctness + faithfulness + context recall + context precision per case
  5. Results are written to experiment_runs, case_results, metric_results
  6. Kafka → Spark Streaming → agent_steps and metric_events populated in the background

Poll for completion:

curl http://localhost:8000/api/v1/jobs/<job_id>
# {"status": "completed", "result": {"run_id": "run_a1b2c3d4e5f6"}}

Note the run_id — you'll use it to query results.

Run with OpenAI Agent (Default)

OpenAIAgent is the default — no extra config needed beyond OPENAI_API_KEY and AGENT_MODEL in ops/.env. It sends the full conversation history to the OpenAI Chat Completions API and records prompt + completion token counts on the assistant step:

resp = self._client.chat.completions.create(model=self._model, messages=messages)
recorder.record("assistant", role="assistant", content=content,
                tokens_prompt=resp.usage.prompt_tokens,
                tokens_completion=resp.usage.completion_tokens)

To switch to Gemini inference instead, set in ops/.env:

INFERENCE_PROVIDER=gemini
DEFAULT_AGENT_CLS=evallakehouse.agents.gemini_agent.GeminiAgent
ADK_MODEL=gemini-2.5-flash-lite
GOOGLE_API_KEY=<your-google-key>

No code changes needed — the agent class is loaded dynamically at runtime.

Run with MockFailureAgent (Test Clustering)

MockFailureAgent returns hardcoded wrong answers per case_id, generating genuine failures without any API calls. Useful for testing the failure clustering pipeline end-to-end:

docker exec airflow airflow dags trigger evaluation_run \
  --conf '{
    "dataset_id": "example_dataset",
    "dataset_version": "v1",
    "model_name": "mock-failure",
    "model_version": "v1",
    "agent_version": "v1",
    "prompt_version": "v1",
    "retriever_version": "v1",
    "agent_cls_path": "evallakehouse.agents.mock_failure_agent.MockFailureAgent",
    "run_clustering": true
  }'

The agent_cls_path field overrides DEFAULT_AGENT_CLS for this run only — the .env is unchanged. Passing run_clustering: true triggers the cluster_failures task in the same DAG immediately after evaluation completes.


Checking Results

Via the REST API

Once the run_id is in hand, three endpoints give you results immediately without touching SQL:

Case-level scores:

curl http://localhost:8000/api/v1/runs/run_a1b2c3d4e5f6/results
[
  {"run_id": "run_a1b2c3d4e5f6", "case_id": "case_001", "correctness": 0.95, "faithfulness": 0.88, "latency_ms": 1243, "tokens_used": 312, "pass_fail": true},
  {"run_id": "run_a1b2c3d4e5f6", "case_id": "case_002", "correctness": 0.82, "faithfulness": 0.91, "latency_ms": 1876, "tokens_used": 489, "pass_fail": true},
  {"run_id": "run_a1b2c3d4e5f6", "case_id": "case_003", "correctness": 0.61, "faithfulness": 0.74, "latency_ms": 987,  "tokens_used": 278, "pass_fail": false}
]

Per-metric breakdown:

curl http://localhost:8000/api/v1/runs/run_a1b2c3d4e5f6/metrics
[
  {"run_id": "run_a1b2c3d4e5f6", "case_id": "case_001", "metric_name": "correctness",       "metric_value": 0.95},
  {"run_id": "run_a1b2c3d4e5f6", "case_id": "case_001", "metric_name": "faithfulness",      "metric_value": 0.88},
  {"run_id": "run_a1b2c3d4e5f6", "case_id": "case_001", "metric_name": "context_recall",    "metric_value": 0.90},
  {"run_id": "run_a1b2c3d4e5f6", "case_id": "case_001", "metric_name": "context_precision", "metric_value": 0.85}
]

Model leaderboard across all runs:

curl http://localhost:8000/api/v1/leaderboard
[
  {"model_name": "gpt-4o",            "avg_correctness": 0.91, "total_cases": 3},
  {"model_name": "gemini-2.5-flash",  "avg_correctness": 0.87, "total_cases": 3}
]

Via Trino SQL

Trino at http://localhost:8081 gives full SQL access over all Iceberg tables. Use fully-qualified names (iceberg.analytics.<table>).

Run metadata + lineage:

SELECT run_id, model_name, model_version, agent_version, prompt_version, run_timestamp
FROM iceberg.analytics.experiment_runs
ORDER BY run_timestamp DESC;

Pass/fail summary for a run:

SELECT
  SUM(CASE WHEN pass_fail THEN 1 ELSE 0 END) AS passed,
  COUNT(*) AS total,
  AVG(correctness)  AS avg_correctness,
  AVG(faithfulness) AS avg_faithfulness,
  AVG(latency_ms)   AS avg_latency_ms,
  SUM(tokens_used)  AS total_tokens
FROM iceberg.analytics.case_results
WHERE run_id = 'run_a1b2c3d4e5f6';

Per-step agent trace for a case:

SELECT step_id, step_type, role, content, tokens_prompt, tokens_completion, latency_ms
FROM iceberg.analytics.agent_steps
WHERE run_id = 'run_a1b2c3d4e5f6' AND case_id = 'case_001'
ORDER BY step_id;

Compare two runs side by side:

SELECT
  a.case_id,
  a.correctness AS correctness_run_a,
  b.correctness AS correctness_run_b,
  a.correctness - b.correctness AS delta
FROM iceberg.analytics.case_results a
JOIN iceberg.analytics.case_results b ON a.case_id = b.case_id
WHERE a.run_id = 'run_a1b2c3d4e5f6'
  AND b.run_id = 'run_b9c8d7e6f5a4'
ORDER BY delta;

Time travel — query results as of a past snapshot:

SELECT * FROM iceberg.analytics.case_results
FOR TIMESTAMP AS OF TIMESTAMP '2025-06-01 00:00:00 UTC';

Live metric stream (populated by Spark Structured Streaming):

SELECT run_id, case_id, metric_name, metric_value
FROM iceberg.analytics.metric_events
WHERE run_id = 'run_a1b2c3d4e5f6'
ORDER BY case_id, metric_name;

Via Superset Dashboards

Import the pre-built dashboards once:

python setup_superset.py

Then open http://localhost:8088 (admin / admin). All charts connect to Trino on iceberg.analytics.

The dashboard shows the following charts:

  1. Pass/Fail Donut — overall pass vs fail ratio across all cases
  2. Correctness Score Distribution — correctness scores bucketed into 0.2 ranges
  3. Model Leaderboard — avg correctness, faithfulness, latency, tokens, and pass count per model
  4. Correctness Trend by Model — avg correctness per model over time; use this to detect regressions across runs
  5. Evaluations Run Over Time — run count per model over time
  6. Tokens vs Latency Bubble — cost vs speed per case, sized by correctness; shows if expensive/slow cases are at least getting the right answer
  7. Multi-Metric Radar — full capability profile per model across all 4 metrics at a glance
  8. Cost Efficiency Index — correctness per token and per millisecond per model; which model gives the best return
  9. Failure Heatmap — correctness by case × model; a full red row means the dataset case is the problem, not the model
  10. Cluster Severity Matrix — failure clusters by case count and avg correctness; top-left is the danger zone
  11. Failure Cluster Summary — AI-generated one-sentence failure pattern summaries with case counts

Superset Dashboard


Failure Clustering

How It Works

After evaluation, run_failure_clustering() runs a four-step pipeline entirely from Iceberg data:

1. Identify failing cases

Pulls all cases with correctness < 0.7 from case_results:

spark.table("polaris.analytics.case_results")
    .filter("correctness < 0.7")
    .select("run_id", "case_id", "correctness")

2. Build text representations

For each failing case, fetches the original user question from dataset_cases and the agent's response from agent_steps, then combines them into a single text:

Q: What is the capital of France?
Agent answer: The capital of France is Lyon.
Correctness score: 0.1

This is why agent_steps must be populated by Spark Structured Streaming before clustering runs.

3. Embed + cluster

Embeds all texts with sentence-transformers (all-MiniLM-L6-v2) and clusters with HDBSCAN:

model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(texts)

clusterer = hdbscan.HDBSCAN(min_cluster_size=2, metric="euclidean")
labels = clusterer.fit_predict(embeddings)

Points labelled -1 by HDBSCAN are noise and excluded. If all points are noise and there are at least 2 cases, they are all assigned to a single cluster as a fallback.
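The noise handling and fallback described above can be sketched as (a simplified version, not the actual implementation):

```python
def resolve_labels(labels: list[int], min_cluster_size: int = 2) -> list[int]:
    # HDBSCAN marks noise points with the label -1.
    if len(labels) >= min_cluster_size and all(l == -1 for l in labels):
        # Fallback: every point was noise, so group all cases
        # into a single catch-all cluster instead of dropping them.
        return [0] * len(labels)
    # Otherwise keep the labels; -1 points are excluded downstream.
    return list(labels)
```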

4. Generate summaries + write to Iceberg

For each cluster, an ADK LlmAgent reads the members via MCP tools and writes a one-sentence summary. Results are appended to failure_clusters and cluster_members.

Triggering Clustering

Three ways to trigger:

As part of an evaluation run — pass run_clustering: true in the DAG conf:

docker exec airflow airflow dags trigger evaluation_run \
  --conf '{"dataset_id": "example_dataset", "dataset_version": "v1",
           "model_name": "gpt-4o", "model_version": "v1",
           "agent_version": "v1", "prompt_version": "v1",
           "retriever_version": "v1", "run_clustering": true}'

Standalone via the API:

curl -X POST "http://localhost:8000/api/v1/failure-clusters?min_cluster_size=2"
# {"status": "started", "job_id": "c4d5e6f7..."}

# Poll for completion
curl http://localhost:8000/api/v1/jobs/c4d5e6f7...

Via the nightly Airflow DAG — failure_clustering runs automatically at 02:00 daily. Trigger manually with a custom min_cluster_size:

docker exec airflow airflow dags trigger failure_clustering \
  --conf '{"min_cluster_size": 3}'

Re-triggering clustering appends new rows. Clear old data first if you want a clean run:

curl -X DELETE "http://localhost:8000/api/v1/admin/iceberg/clear?tables=failure_clusters,cluster_members"

Reading Cluster Results

Via the API:

curl http://localhost:8000/api/v1/failure-clusters
[
  {"cluster_id": "cluster_a1b2c3d4", "summary": "Model confuses European capital cities with regional cities.", "num_cases": 3, "created_at": "2025-06-01T02:00:00"},
  {"cluster_id": "cluster_e5f6a7b8", "summary": "Model fails on transitive logical reasoning questions.", "num_cases": 2, "created_at": "2025-06-01T02:00:00"}
]

Via Trino — clusters with member details and scores:

SELECT
  fc.cluster_id,
  fc.summary AS failure_pattern,
  cm.case_id,
  cm.run_id,
  cr.correctness
FROM iceberg.analytics.failure_clusters fc
JOIN iceberg.analytics.cluster_members cm ON fc.cluster_id = cm.cluster_id
JOIN iceberg.analytics.case_results cr ON cm.case_id = cr.case_id AND cm.run_id = cr.run_id
ORDER BY fc.cluster_id, cm.case_id;

ADK-Powered Cluster Summaries

When CLUSTER_SUMMARY_ENABLED=true, each cluster is summarised by an ADK LlmAgent that uses a stdio MCP server (mcp_server.py) as its tool interface. The MCP server is spawned as a subprocess automatically — no extra Docker container needed.

The MCP server exposes three tools:

Tool What it does
get_case_count() Returns total number of cases in the cluster
get_case_text(index) Returns the full text of a case by zero-based index
summarise_pattern(summary) Agent calls this to emit its final one-sentence summary

The agent is instructed to call get_case_count, read several cases via get_case_text, then always finish by calling summarise_pattern. The summary is written to a temp file by the tool and read back by the caller:

# Agent instruction
"Use get_case_count to see how many cases exist, then use get_case_text to read "
"several cases. You MUST always finish by calling summarise_pattern with a single "
"concise sentence describing the common failure pattern."
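The temp-file handoff can be sketched like this (file path and helper names are hypothetical, not the real mcp_server.py):

```python
import os
import tempfile

# Hypothetical location the tool and the caller agree on.
SUMMARY_PATH = os.path.join(tempfile.gettempdir(), "cluster_summary.txt")

def summarise_pattern(summary: str) -> str:
    # Tool side: persist the agent's final one-sentence summary.
    with open(SUMMARY_PATH, "w") as f:
        f.write(summary.strip())
    return "summary recorded"

def read_summary() -> str:
    # Caller side: read the summary back after the agent run completes.
    with open(SUMMARY_PATH) as f:
        return f.read()
```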

Provider is controlled by INFERENCE_PROVIDER in ops/.env:

  • openai — uses LiteLlm(model="openai/<AGENT_MODEL>")
  • gemini — uses the Gemini model directly via ADK_MODEL


Deterministic Replay

How Replay Works

The replay engine re-runs a specific case from a past run through a new agent, using only the inputs that were stored in agent_steps — no live retrieval, no tool calls, no external dependencies.

ReplayEngine.replay() does three things:

1. Load the stored trace

Reads all agent_steps rows for the given run_id + case_id from Iceberg, ordered by step_id:

spark.table("polaris.analytics.agent_steps")
    .filter(f"run_id = '{run_id}' AND case_id = '{case_id}'")
    .orderBy("step_id")

2. Reconstruct the EvalCase

Filters the trace to only step_type == "user" events and rebuilds the conversation from them, including any stored context (retrieved chunks). The reconstructed case carries metadata={"original_run_id": ..., "frozen": "true"} so it's traceable back to the source run:

conversation = [
    ConversationTurn(role=e.role, content=e.content, context=e.parsed_context())
    for e in trace if e.step_type == "user"
]

3. Run the new agent + emit traces

Loads the new agent class dynamically, runs it against the reconstructed case, and emits a full new trace to Kafka under a new replay_<12-hex> run ID:

new_run_id = f"replay_{uuid.uuid4().hex[:12]}"
with TraceEmitter(settings.kafka) as emitter:
    recorder = TraceRecorder(new_run_id, case_id, emitter)
    agent.run(case, recorder)

The replay run flows through the same Kafka → Spark → Iceberg pipeline as a normal run, producing a full agent_steps record under the new run_id. This makes it directly comparable to the original in Trino or Superset.

Triggering a Replay

curl -X POST http://localhost:8000/api/v1/replay \
  -H "Content-Type: application/json" \
  -d '{
    "original_run_id": "run_a1b2c3d4e5f6",
    "case_id": "case_003",
    "agent_cls_path": "evallakehouse.agents.openai_agent.OpenAIAgent",
    "new_model": "gpt-4o"
  }'
# {"status": "started", "original_run_id": "run_a1b2c3d4e5f6"}

The replay runs as a BackgroundTask and returns immediately. Once complete, compare the original and replay runs side by side in Trino:

SELECT
  a.case_id,
  a.step_id,
  a.content  AS original_response,
  b.content  AS replay_response
FROM iceberg.analytics.agent_steps a
JOIN iceberg.analytics.agent_steps b
  ON a.case_id = b.case_id AND a.step_id = b.step_id
WHERE a.run_id = 'run_a1b2c3d4e5f6'
  AND b.run_id = 'replay_x9y8z7w6v5u4'
  AND a.role = 'assistant';

Key things to note:

  • The replay only reconstructs user turns — assistant turns from the original run are not fed back in, so the new agent generates a fresh response
  • agent_steps must be populated by Spark Structured Streaming before replay can load the trace; if the table is empty for a run, load_trace() returns an empty list
  • Any agent class can be used for replay — swap agent_cls_path to compare OpenAI vs Gemini on the exact same inputs


Experiment Lineage and Reproducibility

Every evaluation run writes a single row to experiment_runs that captures the full version context at the time of execution:

SELECT
  run_id, dataset_id, dataset_version,
  model_name, model_version,
  agent_version, prompt_version, retriever_version, metric_suite_version,
  git_commit, config, run_timestamp
FROM iceberg.analytics.experiment_runs
ORDER BY run_timestamp DESC;

All downstream tables — case_results, metric_results, agent_steps, cluster_members — join back to experiment_runs on run_id. This means any result can be traced back to the exact dataset version, model, agent, prompt, and code commit that produced it.

Lineage chain:

experiment_runs (run_id, dataset_version, model_name, agent_version, git_commit, ...)
       ├──► case_results    (correctness, faithfulness, latency_ms, tokens_used, pass_fail)
       │         │
       │         └──► metric_results  (per-metric breakdown: correctness, faithfulness,
       │                               context_recall, context_precision)
       └──► agent_steps     (full per-step trace: role, content, tokens, latency, context)
                 └──► cluster_members → failure_clusters  (failure pattern summaries)

Reproducing a past run means re-triggering with the same conf fields:

curl -X POST http://localhost:8000/api/v1/experiments \
  -H "Content-Type: application/json" \
  -d '{
    "dataset_id": "example_dataset",
    "dataset_version": "v1",
    "model_name": "gpt-4o",
    "model_version": "v1",
    "agent_version": "v1",
    "prompt_version": "v1",
    "retriever_version": "v1",
    "git_commit": "a3f9c1d"
  }'

Comparing two runs — e.g. same dataset, different model:

SELECT
  r.run_id, r.model_name, r.agent_version, r.prompt_version,
  AVG(c.correctness)  AS avg_correctness,
  AVG(c.faithfulness) AS avg_faithfulness,
  AVG(c.latency_ms)   AS avg_latency_ms,
  AVG(c.tokens_used)  AS avg_tokens
FROM iceberg.analytics.experiment_runs r
JOIN iceberg.analytics.case_results c ON r.run_id = c.run_id
WHERE r.dataset_id = 'example_dataset'
  AND r.dataset_version = 'v1'
GROUP BY r.run_id, r.model_name, r.agent_version, r.prompt_version
ORDER BY avg_correctness DESC;

Iceberg snapshots add a second layer of reproducibility — even if rows are appended or deleted, you can query the table as it existed at any point in time:

SELECT * FROM iceberg.analytics.case_results
FOR TIMESTAMP AS OF TIMESTAMP '2025-06-01 00:00:00 UTC';

Together, experiment_runs versioning and Iceberg time travel mean any result in the system is fully reproducible: you know what data was used, what code ran, and what the table looked like at that moment.


Implementing a Custom Agent

Subclass BaseAgent, implement run(), and point DEFAULT_AGENT_CLS at it. That's all that's needed to plug a custom agent into the full pipeline — Ray distribution, trace capture, Kafka streaming, and DeepEval metrics all work automatically.

Minimal example — a RAG agent:

from openai import OpenAI
from evallakehouse.agents.base import BaseAgent
from evallakehouse.datasets.models import EvalCase
from evallakehouse.trace_capture.recorder import TraceRecorder

class MyRAGAgent(BaseAgent):

    @property
    def version(self) -> str:
        return "rag_v1"

    def run(self, case: EvalCase, recorder: TraceRecorder) -> str:
        client = OpenAI()

        # Record user turns
        for turn in case.conversation:
            if turn.role == "user":
                recorder.record("user", role="user", content=turn.content)

        user_input = case.conversation[-1].content

        # Retrieval step (my_retriever is a placeholder for your own retriever)
        chunks = my_retriever.search(user_input, top_k=3)
        recorder.record("retrieval", role="", content=str(chunks),
                        context=chunks)  # context list fed to DeepEval faithfulness/recall

        # LLM call
        messages = [
            {"role": "system", "content": "Answer using only the provided context."},
            {"role": "user",   "content": f"Context: {chunks}\n\nQuestion: {user_input}"},
        ]
        resp = client.chat.completions.create(model="gpt-4o", messages=messages)
        response = resp.choices[0].message.content

        # Record assistant step with token counts — required for tokens_used in case_results
        recorder.record("assistant", role="assistant", content=response,
                        context=chunks,
                        tokens_prompt=resp.usage.prompt_tokens,
                        tokens_completion=resp.usage.completion_tokens)
        return response

A few things to get right:

  • Always record tokens_prompt and tokens_completion on the assistant step — otherwise tokens_used in case_results will be 0
  • Pass context=chunks on retrieval and assistant steps — TraceRecorder.retrieval_context() deduplicates these and passes them to DeepEval's faithfulness and context recall metrics
  • version property — return a meaningful string; it's recorded in experiment_runs as agent_version
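The deduplication mentioned in the second point might look something like this (a sketch; the real TraceRecorder.retrieval_context() may differ):

```python
def retrieval_context(steps: list[dict]) -> list[str]:
    # Gather context chunks across all recorded steps, preserving
    # first-seen order and dropping duplicates before they reach DeepEval.
    seen, chunks = set(), []
    for step in steps:
        for chunk in step.get("context") or []:
            if chunk not in seen:
                seen.add(chunk)
                chunks.append(chunk)
    return chunks
```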

Register the agent in ops/.env:

DEFAULT_AGENT_CLS=mypackage.agents.MyRAGAgent

Or override per-request without touching .env:

curl -X POST http://localhost:8000/api/v1/experiments \
  -H "Content-Type: application/json" \
  -d '{
    "dataset_id": "example_dataset",
    "dataset_version": "v1",
    "model_name": "my-rag",
    "model_version": "v1",
    "agent_version": "v1",
    "prompt_version": "v1",
    "retriever_version": "v1",
    "agent_cls_path": "mypackage.agents.MyRAGAgent"
  }'

The class is loaded via importlib.import_module at runtime inside each Ray worker — the package just needs to be importable in the Python environment where the workers run.
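The dynamic load is plain importlib; a minimal sketch of the idea (the helper name is an assumption):

```python
import importlib

def load_cls(dotted_path: str):
    # "mypackage.agents.MyRAGAgent" -> import mypackage.agents,
    # then grab the MyRAGAgent attribute off the module.
    module_path, _, cls_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, cls_name)

# Works for any importable class, e.g. one from the standard library:
OrderedDict = load_cls("collections.OrderedDict")
```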