Building a lakehouse-based AI evaluation platform
A demo project showing how to build a lakehouse-based AI eval platform using a self-hosted, open-source stack.
Full code: https://github.com/saumitras/ai-eval-lakehouse-demo

Features
- Runs LLM evaluations at scale — feed it a dataset of multi-stage Q&A cases and it runs them through an LLM agent in parallel using Ray, scoring each response automatically
- Pluggable agents & judges — swap between OpenAI GPT-4o, Google Gemini, or bring your own agent; same for the scoring judge
- Everything lands in a data lakehouse — all results, traces, and metrics are stored as queryable Iceberg tables on S3, not just logs or flat files
- Streams agent traces in real-time — every step an agent takes (user turn, LLM call, tool call, retrieval) is emitted to Kafka and streamed into the lakehouse via Spark
- Automatically finds failure patterns — clusters failing cases using embeddings + HDBSCAN, then uses an LLM to summarise what went wrong in each cluster
- Deterministic replay — re-run any past case with a new model or agent using the exact same stored inputs, for apples-to-apples comparisons
- Full experiment lineage — every run records dataset version, model, agent, prompt, retriever, and commit-id so results are always reproducible and traceable
- Orchestrated end-to-end — Airflow DAGs handle the full pipeline from dataset ingestion → evaluation → clustering, with a REST API as the control surface
- Agentic failure analysis with Google ADK + MCP — after clustering, an agent reads each failure cluster and writes a one-sentence summary of the failure pattern back to Iceberg
- SQL analytics — query results via Trino and visualise them via Superset dashboards (leaderboard, failure clusters, metric trends, etc)
Architecture Overview
The Full Stack
The system is split into six planes that each own a distinct concern:
| Plane | Responsibility | Key Technologies |
|---|---|---|
| Control | Orchestration, API, job tracking | Airflow, FastAPI, PostgreSQL |
| Evaluation | Distributed agent execution, scoring, deterministic replay | Ray, pluggable agents, DeepEval |
| Streaming | Real-time trace ingestion | Kafka, Spark Structured Streaming |
| Lakehouse | Durable storage + catalog | S3, Apache Iceberg, Apache Polaris |
| Analytics | Querying, dashboards, failure clustering | Trino, Apache Superset |
| Agentic Analysis | Failure cluster summarisation via LLM agent + tool calls | Google ADK, MCP |
Everything runs locally via Docker Compose. S3 is the only external dependency.
Control Plane — Airflow + FastAPI + PostgreSQL
Airflow triggers the pipeline via three DAGs (dataset_ingestion, evaluation_run, failure_clustering). Each DAG calls the FastAPI service and polls GET /jobs/{id} until the job completes — the API never blocks.
FastAPI handles all long-running work as BackgroundTasks and immediately returns a job_id. Job state (running / completed / failed) is persisted in a PostgreSQL jobs table that is auto-created on API startup.
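The fire-and-return job pattern can be sketched with the stdlib alone. The real service uses FastAPI BackgroundTasks and a PostgreSQL jobs table, so this in-memory version is only an illustration of the control flow, not the project's code:

```python
# Sketch of the fire-and-return job pattern (stdlib only; the real service
# uses FastAPI BackgroundTasks and persists job state to PostgreSQL).
import threading
import uuid

JOBS = {}      # job_id -> {"status": ..., "result": ...}
_THREADS = {}  # job_id -> worker thread, kept so callers can wait if needed

def submit_job(work) -> str:
    """Register the job, start the work in the background, return at once."""
    job_id = uuid.uuid4().hex
    JOBS[job_id] = {"status": "running", "result": None}

    def _run():
        try:
            JOBS[job_id] = {"status": "completed", "result": work()}
        except Exception as exc:  # record failures instead of raising
            JOBS[job_id] = {"status": "failed", "result": str(exc)}

    _THREADS[job_id] = threading.Thread(target=_run)
    _THREADS[job_id].start()
    return job_id
</imports>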
Data Plane — Ray, Agents, and DeepEval Metrics
ExperimentManager reads cases from the dataset_cases Iceberg table and fans them out as Ray remote tasks — one task per eval case, all running in parallel.
Each Ray worker:
1. Dynamically imports the agent class from a dotted path (e.g. evallakehouse.agents.openai_agent.OpenAIAgent)
2. Runs agent.run(case, recorder) — the TraceRecorder is injected so every step is captured automatically
3. Computes DeepEval metrics (correctness, faithfulness, context recall, context precision) using an LLM judge
4. Returns scores back to ExperimentManager, which writes experiment_runs, case_results, and metric_results to Iceberg
futures = [evaluate_case.remote(c.to_dict(), run_id, agent_cls_path, []) for c in cases]
results = ray.get(futures)
Four agents are built-in: OpenAIAgent, GeminiAgent (Google ADK), EchoAgent (no API calls), and MockFailureAgent (hardcoded wrong answers for testing clustering).
Streaming Pipeline — Kafka + Spark Structured Streaming
While Ray workers run, every agent step is emitted to Kafka in parallel via TraceEmitter. Two Spark Structured Streaming jobs (running as Docker services) consume these topics continuously:
TraceEmitter
├── topic: agent-traces → spark-trace-ingestion → Iceberg: agent_steps
└── topic: agent-metrics → spark-metrics-ingestion → Iceberg: metric_events
Kafka runs in KRaft mode (no Zookeeper). Spark checkpoints offsets to S3 for exactly-once delivery. The streaming jobs start automatically with ./start.sh and run indefinitely.
Without this pipeline, agent_steps and metric_events stay empty — the replay engine and live metric stream both depend on it.
Lakehouse — Apache Iceberg + Apache Polaris + S3
All evaluation artifacts are stored as Apache Iceberg tables on S3, managed by Apache Polaris (REST catalog backed by PostgreSQL). The full table namespace is polaris.analytics.*.
| Group | Tables |
|---|---|
| Core | dataset_cases, experiment_runs, case_results, metric_results, tool_calls |
| Streaming | agent_steps, metric_events |
| Analysis | failure_clusters, cluster_members |
Iceberg gives snapshot isolation, ACID writes, partition pruning, and time travel:
Polaris state persists across container restarts — catalog setup only runs once per fresh Postgres volume.
Failure Analysis — Embeddings + HDBSCAN + ADK/LLM
After an evaluation run, cases with correctness < 0.7 are pulled from case_results. Each failing case is represented as a text combining the user question, agent answer, and correctness score, then embedded with sentence-transformers (all-MiniLM-L6-v2).
HDBSCAN clusters the embeddings. For each cluster, an ADK LlmAgent is given the cluster members via an MCP server (tools: get_case_count, get_case_text, summarise_pattern) and writes a one-sentence failure pattern summary back to the failure_clusters table.
Replay Engine
The replay engine reads stored agent_steps from Iceberg for a given run_id + case_id, reconstructs the original EvalCase from the user turns, and reruns it through any agent — without needing the original retrieval results or tool outputs.
curl -X POST http://localhost:8000/api/v1/replay \
-d '{"original_run_id": "run_abc123", "case_id": "case_001", "agent_cls_path": "...", "new_model": "gpt-4o"}'
The replay produces a new replay_<12-hex> run ID with its own full trace in Iceberg, enabling direct model-vs-model comparison on identical inputs.
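Under the assumptions above (user turns stored as agent_steps rows carrying step_type, step_id, and content), the reconstruction step can be sketched as a pure function over those rows. Dicts stand in for the Iceberg rows here; the real engine reads them via Spark:

```python
# Sketch: rebuild the original conversation from stored trace steps.
# Only user turns are replayed; ordering follows step_id.
def rebuild_conversation(steps: list) -> list:
    user_turns = sorted(
        (s for s in steps if s["step_type"] == "user"),
        key=lambda s: s["step_id"],
    )
    return [{"role": "user", "content": s["content"]} for s in user_turns]
```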
Analytics — Trino + Superset
Trino queries Iceberg tables directly via the Polaris REST catalog. Superset connects to Trino and ships with pre-built dashboards for model leaderboard, per-metric breakdowns, and failure cluster visualisation.
SELECT r.model_name, AVG(c.correctness) AS avg_correctness, COUNT(*) AS total_cases
FROM polaris.analytics.experiment_runs r
JOIN polaris.analytics.case_results c ON r.run_id = c.run_id
GROUP BY r.model_name
ORDER BY avg_correctness DESC;
Superset: http://localhost:8088 — Trino: http://localhost:8081
Components Deep Dive
Evaluation Datasets
Datasets are versioned JSONL files stored under datasets/<dataset_id>/<version>.jsonl. Each line is one eval case.
{
"case_id": "case_002",
"metadata": {"category": "networking", "difficulty": "medium"},
"conversation": [
{"role": "user", "content": "Explain the difference between TCP and UDP."},
{"role": "assistant", "content": "TCP is connection-oriented while UDP is connectionless.",
"context": ["TCP provides reliable, ordered delivery. UDP is faster but does not guarantee delivery."]},
{"role": "user", "content": "Which is faster?"}
],
"expected_behavior": {"contains": ["UDP"]}
}
Key fields:
- conversation — full multi-turn dialogue; each turn can carry a context list (retrieved chunks used to produce that turn)
- expected_behavior.contains — list of strings the agent response must contain; used as ground truth by the LLM judge
- metadata — arbitrary key-value pairs passed through to dataset_cases and available for filtering in SQL
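The expected_behavior.contains field also lends itself to a simple mechanical check. A minimal sketch (the pipeline itself hands these strings to the LLM judge as ground truth, so this exact helper is an illustration, not the project's code):

```python
# Sketch: case-insensitive substring check against expected_behavior.contains.
def contains_check(response: str, expected: list) -> bool:
    """True when every expected string appears in the response."""
    return all(s.lower() in response.lower() for s in expected)
```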
DatasetLoader reads the JSONL and hydrates each line into an EvalCase dataclass. DatasetIngestion writes them to the dataset_cases Iceberg table via Spark, partitioned by dataset_id and dataset_version.
Agents
All agents subclass BaseAgent and implement a single method:
class BaseAgent(ABC):
@abstractmethod
def run(self, case: EvalCase, recorder: TraceRecorder) -> str: ...
The TraceRecorder is injected by the Ray worker — agents call recorder.record() for each step they want to capture.
Four agents are built-in:
| Agent | Class | Use case |
|---|---|---|
| OpenAIAgent | evallakehouse.agents.openai_agent.OpenAIAgent | Default; uses AGENT_MODEL from .env |
| GeminiAgent | evallakehouse.agents.gemini_agent.GeminiAgent | Google ADK + Gemini via async streaming |
| EchoAgent | evallakehouse.agents.echo_agent.EchoAgent | Returns [echo] <last user message>; no API calls |
| MockFailureAgent | evallakehouse.agents.mock_failure_agent.MockFailureAgent | Hardcoded wrong answers; generates real failures for clustering tests |
The active agent is set via DEFAULT_AGENT_CLS in ops/.env and can be overridden per-request by passing agent_cls_path in the experiment payload. The class is loaded dynamically at runtime via importlib — no code changes needed to swap agents.
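The dynamic loading step can be sketched with stdlib importlib. Demonstrated below against a stdlib class, since the pattern is the same regardless of what the dotted path points at:

```python
# Sketch of dotted-path class loading, the importlib pattern used for
# agent_cls_path. Shown with a stdlib class for self-containment.
import importlib

def load_class(dotted_path: str):
    """Split 'pkg.module.ClassName' and import the class at runtime."""
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
```

Swapping agents is then just a matter of passing a different string, e.g. load_class("evallakehouse.agents.echo_agent.EchoAgent").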
# OpenAIAgent core — records each turn then emits the response with token counts
resp = self._client.chat.completions.create(model=self._model, messages=messages)
recorder.record("assistant", role="assistant", content=content,
tokens_prompt=resp.usage.prompt_tokens,
tokens_completion=resp.usage.completion_tokens)
Metrics & Judging
Metrics are computed inside each Ray worker after the agent responds. The default suite runs four DeepEval metrics against every case:
| Metric | Class | What it measures |
|---|---|---|
| correctness | GEval | Does the response correctly answer the question? |
| faithfulness | FaithfulnessMetric | Is the response grounded in the retrieved context? |
| context_recall | ContextualRecallMetric | Does the retrieval context cover the expected answer? |
| context_precision | ContextualPrecisionMetric | Is the retrieved context relevant to the question? |
All four use an LLM-as-judge backend. The judge provider and model are configured independently from the inference agent:
Each metric returns a MetricResult(metric_name, metric_value, reason). A case is marked pass_fail=true when correctness >= 0.7. Results are written to both case_results (aggregated) and metric_results (per-metric breakdown).
To add a custom metric, subclass BaseMetric and add it to the suite:
class BaseMetric(ABC):
@property
@abstractmethod
def name(self) -> str: ...
@abstractmethod
def compute(self, response: str, case_context: dict) -> MetricResult: ...
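A custom metric against this interface might look like the sketch below. MetricResult and BaseMetric are re-declared minimally for self-containment, and ResponseLengthMetric is a hypothetical example, not part of the project:

```python
# Illustrative custom metric against the BaseMetric interface shown above.
# The real MetricResult/BaseMetric live in the evallakehouse package; they
# are re-declared here only so the sketch runs standalone.
from abc import ABC, abstractmethod
from dataclasses import dataclass

@dataclass
class MetricResult:
    metric_name: str
    metric_value: float
    reason: str = ""

class BaseMetric(ABC):
    @property
    @abstractmethod
    def name(self) -> str: ...
    @abstractmethod
    def compute(self, response: str, case_context: dict) -> MetricResult: ...

class ResponseLengthMetric(BaseMetric):
    """Hypothetical metric: scores 1.0 while the response stays in budget."""
    @property
    def name(self) -> str:
        return "response_length"

    def compute(self, response: str, case_context: dict) -> MetricResult:
        words = len(response.split())
        budget = case_context.get("max_words", 100)
        value = 1.0 if words <= budget else budget / words
        return MetricResult(self.name, round(value, 3), f"{words} words")
```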
Trace Capture
Every agent step is captured as a TraceEvent:
@dataclass
class TraceEvent:
run_id: str
case_id: str
step_id: int
step_type: str # user | assistant | tool_call | tool_result | retrieval | system | metric
role: str
content: str
tool_name: str
latency_ms: int
tokens_prompt: int
tokens_completion: int
context: str # JSON-serialised list[str] of retrieved chunks
event_time: str
TraceRecorder wraps the emitter and manages step sequencing. Agents call recorder.record() for each step; the recorder auto-increments step_id, appends to its local events list, and immediately publishes to Kafka via TraceEmitter.
TraceEmitter is a Kafka producer (acks=all, 3 retries) used as a context manager — it flushes and closes on exit:
with TraceEmitter(settings.kafka) as emitter:
recorder = TraceRecorder(run_id, case.case_id, emitter)
response = agent.run(case, recorder)
Two helpers on TraceRecorder are used downstream: - tokens_used() — sums tokens_prompt + tokens_completion across all events; written to case_results - retrieval_context() — deduplicated list of all context strings across all steps; passed to DeepEval faithfulness/recall metrics
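A minimal sketch of this recorder behaviour (auto-incrementing step_id plus the two helpers), with dicts standing in for TraceEvent and the Kafka emission omitted:

```python
# Sketch of TraceRecorder's sequencing and its two downstream helpers.
# Dicts stand in for TraceEvent; publishing to Kafka is omitted.
class TraceRecorderSketch:
    def __init__(self):
        self.events = []
        self._step = 0

    def record(self, step_type, **fields):
        """Auto-increment step_id and append the event."""
        self._step += 1
        event = {"step_type": step_type, "step_id": self._step,
                 "tokens_prompt": 0, "tokens_completion": 0, "context": []}
        event.update(fields)
        self.events.append(event)

    def tokens_used(self) -> int:
        """Sum prompt + completion tokens across all events."""
        return sum(e["tokens_prompt"] + e["tokens_completion"] for e in self.events)

    def retrieval_context(self) -> list:
        """Deduplicated retrieved chunks, in first-seen order."""
        seen, chunks = set(), []
        for e in self.events:
            for c in e["context"]:
                if c not in seen:
                    seen.add(c)
                    chunks.append(c)
        return chunks
```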
Experiment Manager
ExperimentManager is the orchestration core for a single evaluation run:
- Reads all cases for the given dataset_id + dataset_version from the dataset_cases Iceberg table
- Initialises Ray (local mode if no cluster is running)
- Dispatches one evaluate_case.remote() task per case
- Collects results with ray.get(futures)
- Writes three Iceberg tables in sequence: experiment_runs, case_results, metric_results
The ExperimentConfig dataclass carries all versioning fields that land in experiment_runs:
@dataclass
class ExperimentConfig:
dataset_id: str
dataset_version: str
model_name: str
model_version: str
agent_version: str
prompt_version: str
retriever_version: str
metric_suite_version: str = "v1"
git_commit: str = ""
Each run gets a unique run_id (run_<12-hex>). All downstream tables (case_results, metric_results, agent_steps) join back to experiment_runs on run_id.
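Assuming the ID is a random 12-character hex suffix (the generation code is not shown here, so this is a sketch of the format, not the project's implementation):

```python
# Sketch: generate an ID in the run_<12-hex> format described above.
import secrets

def new_run_id(prefix: str = "run") -> str:
    return f"{prefix}_{secrets.token_hex(6)}"  # 6 random bytes -> 12 hex chars
```

The same shape covers replay IDs, e.g. new_run_id("replay") yields replay_<12-hex>.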
Iceberg Tables
All tables live under polaris.analytics.*. The schema is defined in schema_init.py as idempotent CREATE TABLE IF NOT EXISTS DDL and created once via python -m evallakehouse.lakehouse.schema_init.
| Table | Populated by | Partitioned by |
|---|---|---|
| dataset_cases | DatasetIngestion (Spark) | dataset_id, dataset_version |
| experiment_runs | ExperimentManager | dataset_id, model_name |
| case_results | ExperimentManager | run_id |
| metric_results | ExperimentManager | metric_name |
| agent_steps | Spark Structured Streaming | run_id |
| metric_events | Spark Structured Streaming | metric_name |
| tool_calls | Agent (optional) | run_id |
| failure_clusters | run_failure_clustering() | — |
| cluster_members | run_failure_clustering() | cluster_id |
The IcebergCatalog class manages the Spark session, wires up the Polaris REST catalog, and configures S3A credentials for both the Iceberg S3FileIO (Polaris-user role) and the S3A filesystem (checkpoint writes).
Google ADK + MCP Server
After HDBSCAN produces failure clusters, an ADK LlmAgent is used to summarise each one. The agent communicates with the pipeline via a MCP server (mcp_server.py) that exposes the cluster data as callable tools. ADK spawns the MCP server as a subprocess automatically — no separate process or Docker container is needed.
The MCP server exposes three tools:
| Tool | What it does |
|---|---|
| get_case_count() | Returns the number of cases in the cluster |
| get_case_text(index) | Returns the full text of a case by zero-based index |
| summarise_pattern(summary) | Called by the agent to emit its final one-sentence summary |
The agent reads the cases, identifies the common failure pattern, and calls summarise_pattern to write the result. The summary is picked up by the caller and written to the failure_clusters Iceberg table.
# adk_summary.py — agent setup
agent = LlmAgent(
model=model,
name="cluster_summariser",
instruction="Use get_case_count to see how many cases exist, then use get_case_text "
"to read several cases. You MUST always finish by calling summarise_pattern "
"with a single concise sentence describing the common failure pattern.",
tools=[MCPToolset(connection_params=StdioServerParameters(
command="python", args=["mcp_server.py"]
))]
)
Provider is controlled by INFERENCE_PROVIDER in ops/.env: - openai — uses LiteLlm(model="openai/<AGENT_MODEL>") - gemini — uses the Gemini model directly via ADK_MODEL
This pattern — an ADK agent using MCP tools to query a data pipeline and write structured output back to a lakehouse — is directly reusable for any agentic analysis task beyond failure clustering.
Airflow DAGs
Three DAGs, all in evallakehouse/airflow_dags/:
dataset_ingestion — manually triggered; calls POST /datasets/ingest and polls until complete.
docker exec airflow airflow dags trigger dataset_ingestion \
--conf '{"dataset_id": "example_dataset", "version": "v1"}'
evaluation_run — manually triggered; runs the full eval then optionally clusters failures.
docker exec airflow airflow dags trigger evaluation_run \
--conf '{"dataset_id": "example_dataset", "dataset_version": "v1", "model_name": "gpt-4o", \
"model_version": "v1", "agent_version": "v1", "prompt_version": "v1", \
"retriever_version": "v1", "run_clustering": true}'
failure_clustering — scheduled nightly at 02:00; also manually triggerable. Configurable min_cluster_size.
All DAGs use the same async pattern: POST to the API → receive job_id → poll GET /jobs/{id} every 30 seconds. Task timeouts are set to 15–30 minutes.
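The poll loop the DAG tasks follow can be sketched as below. get_status is injected so the sketch stays self-contained; the real tasks issue HTTP GETs to /jobs/{id}:

```python
# Sketch of the poll-until-done loop used by all three DAGs.
# `get_status` is an injected callable (the real tasks call the REST API).
import time

def wait_for_job(get_status, job_id, poll_seconds=30, timeout_seconds=1800):
    """Poll job status until terminal, or raise after the timeout."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        status = get_status(job_id)
        if status in ("completed", "failed"):
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"job {job_id} still running after {timeout_seconds}s")
```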
REST API
FastAPI app at http://localhost:8000. All long-running endpoints return immediately with a job_id; callers poll GET /jobs/{id} for completion.
| Endpoint | Description |
|---|---|
| POST /api/v1/experiments | launch evaluation run |
| POST /api/v1/datasets/ingest | ingest JSONL into Iceberg |
| POST /api/v1/replay | replay a stored trace |
| POST /api/v1/failure-clusters | trigger failure clustering |
| GET /api/v1/jobs/{id} | poll async job status |
| GET /api/v1/runs/{id}/results | case-level scores for a run |
| GET /api/v1/runs/{id}/metrics | per-metric breakdown for a run |
| GET /api/v1/leaderboard | avg correctness per model across all runs |
| GET /api/v1/failure-clusters | list clusters ordered by size |
| DELETE /api/v1/admin/iceberg/clear | truncate Iceberg tables (dev/test) |
Job state is stored in PostgreSQL. The connection pool is initialised on startup via the FastAPI lifespan handler and torn down on shutdown:
@asynccontextmanager
async def lifespan(app: FastAPI):
init_pool(settings.postgres.dsn)
yield
close_pool()
Full interactive docs at http://localhost:8000/docs.
Setup Guide
Prerequisites
- Docker + Docker Compose
- Python 3.11.15
- pipenv — pip install pipenv
- An AWS account with an S3 bucket and permissions to create IAM roles and users
- An OpenAI or Google API key (or use EchoAgent to skip inference entirely)
Step 1 — AWS IAM Setup
Run once to create the two IAM identities Polaris needs:
This creates:
- polaris-iceberg-role — IAM role with S3 read/write on the warehouse prefix
- polaris-user — IAM user with sts:AssumeRole permission on the above role
The script prints three values. Copy them into ops/.env:
POLARIS_ROLE_ARN=arn:aws:iam::<account_id>:role/polaris-iceberg-role
POLARIS_AWS_ACCESS_KEY_ID=<access_key>
POLARIS_AWS_SECRET_ACCESS_KEY=<secret_key>
Step 2 — Configure ops/.env
ops/.env is the single source of truth for all configuration. Minimum required fields:
# AWS
AWS_REGION=us-east-1
AWS_PROFILE=<your-aws-profile> # or set AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY directly
# S3
S3_BUCKET=<your-s3-bucket>
S3_WAREHOUSE_PATH=evallakehouse/warehouse
# Polaris (from setup_aws_iam.sh output)
POLARIS_ROLE_ARN=arn:aws:iam::<account_id>:role/polaris-iceberg-role
POLARIS_AWS_ACCESS_KEY_ID=<access_key>
POLARIS_AWS_SECRET_ACCESS_KEY=<secret_key>
# Inference
INFERENCE_PROVIDER=openai # or gemini
OPENAI_API_KEY=<your-openai-key>
# Judge
JUDGE_PROVIDER=openai # or gemini
DEEPEVAL_JUDGE_MODEL=gpt-4o
# Agent
DEFAULT_AGENT_CLS=evallakehouse.agents.openai_agent.OpenAIAgent
AGENT_MODEL=gpt-4o
# Gemini (only needed when INFERENCE_PROVIDER=gemini or JUDGE_PROVIDER=gemini)
GOOGLE_API_KEY=<your-google-key>
ADK_MODEL=gemini-2.5-flash-lite
# Failure cluster summaries
CLUSTER_SUMMARY_ENABLED=true
The Polaris root credentials (POLARIS_ROOT_CLIENT_ID / POLARIS_ROOT_CLIENT_SECRET) are fixed bootstrap values — set them once and never change them after the first start.
Step 3 — Start Infrastructure
start.sh runs docker compose up -d, waits 15 seconds, then creates the two Kafka topics:
Services started:
| Service | URL |
|---|---|
| Polaris | http://localhost:8181 |
| Kafka | localhost:9092 |
| PostgreSQL | localhost:5432 |
| Airflow | http://localhost:8080 |
| Trino | http://localhost:8081 |
| Superset | http://localhost:8088 |
| spark-trace-ingestion | Docker service (always on) |
| spark-metrics-ingestion | Docker service (always on) |
Verify all containers are up:
On first run, Docker builds the Spark image and downloads JARs (Iceberg, Kafka, hadoop-aws) — allow ~2 minutes.
Step 4 — Install Dependencies
Step 5 — Bootstrap Polaris Metastore
One-time only — skip this if Postgres already has a polaris_schema.
This runs the official apache/polaris-admin-tool:1.3.0-incubating Docker image to:
1. Create the polaris_schema tables in the polaris Postgres database
2. Seed the default-realm with the root credentials from ops/.env
Verify:
If you see realm already exists — the realm was already bootstrapped. Safe to ignore; proceed to the next step.
Step 6 — Setup Polaris Catalog and Iceberg Schema
One-time only — Polaris state persists in Postgres across restarts.
setup_polaris.py does the following against the Polaris REST API:
1. Creates the S3 warehouse prefix in the bucket
2. Creates the eval_catalog catalog pointing at s3://<bucket>/evallakehouse/warehouse
3. Creates principal eval_user + roles with full catalog privileges
4. Rotates credentials and writes POLARIS_CLIENT_ID / POLARIS_CLIENT_SECRET back into ops/.env
schema_init creates all 9 Iceberg tables under polaris.analytics.* backed by S3.
Then restart Trino to pick up the new OAuth credentials:
Step 7 — Register Airflow DAGs
Verify that DAGs are visible:
If automatic registration is disabled, you can register the DAGs manually:
Unpause all three DAGs:
docker exec airflow airflow dags unpause dataset_ingestion
docker exec airflow airflow dags unpause evaluation_run
docker exec airflow airflow dags unpause failure_clustering
Step 8 — Start the API
On startup the API initialises the Postgres connection pool and auto-creates the jobs table. Docs at http://localhost:8000/docs.
Verify:
Running Your First Evaluation
The bundled example_dataset has three cases covering general knowledge, networking, and logical reasoning. We'll use it to run an end-to-end evaluation with OpenAIAgent.
Ingest a Dataset
Before running an evaluation, the dataset must be loaded into the dataset_cases Iceberg table. Trigger the dataset_ingestion DAG:
docker exec airflow airflow dags trigger dataset_ingestion \
--conf '{"dataset_id": "example_dataset", "version": "v1"}'
This calls POST /api/v1/datasets/ingest as a background job. The DAG polls until it completes. Monitor in the Airflow UI at http://localhost:8080 or poll directly:
# Get the job_id from the API response, then poll
curl http://localhost:8000/api/v1/jobs/<job_id>
# {"job_id": "...", "status": "completed", "result": {"dataset_id": "example_dataset", "version": "v1"}}
Verify the cases landed in Iceberg:
docker exec trino trino --server http://localhost:8081 --execute \
"SELECT case_id, dataset_version FROM iceberg.analytics.dataset_cases WHERE dataset_id = 'example_dataset'"
Trigger an Evaluation Run
With the dataset ingested, trigger the evaluation_run DAG. The conf payload maps directly to ExperimentConfig fields that are recorded in experiment_runs for lineage:
docker exec airflow airflow dags trigger evaluation_run \
--conf '{
"dataset_id": "example_dataset",
"dataset_version": "v1",
"model_name": "gpt-4o",
"model_version": "v1",
"agent_version": "v1",
"prompt_version": "v1",
"retriever_version": "v1"
}'
Or trigger directly via the API:
curl -X POST http://localhost:8000/api/v1/experiments \
-H "Content-Type: application/json" \
-d '{
"dataset_id": "example_dataset",
"dataset_version": "v1",
"model_name": "gpt-4o",
"model_version": "v1",
"agent_version": "v1",
"prompt_version": "v1",
"retriever_version": "v1"
}'
# {"status": "started", "job_id": "a3f9c1d2e4b5..."}
What happens under the hood:
1. ExperimentManager reads the 3 cases from dataset_cases
2. Ray dispatches 3 parallel evaluate_case.remote() tasks
3. Each worker runs OpenAIAgent.run(), records steps via TraceRecorder, emits to Kafka
4. DeepEval scores correctness + faithfulness + context recall + context precision per case
5. Results written to experiment_runs, case_results, metric_results
6. Kafka → Spark Streaming → agent_steps and metric_events populated in the background
Poll for completion:
curl http://localhost:8000/api/v1/jobs/<job_id>
# {"status": "completed", "result": {"run_id": "run_a1b2c3d4e5f6"}}
Note the run_id — you'll use it to query results.
Run with OpenAI Agent (Default)
OpenAIAgent is the default — no extra config needed beyond OPENAI_API_KEY and AGENT_MODEL in ops/.env. It sends the full conversation history to the OpenAI Chat Completions API and records prompt + completion token counts on the assistant step:
resp = self._client.chat.completions.create(model=self._model, messages=messages)
recorder.record("assistant", role="assistant", content=content,
tokens_prompt=resp.usage.prompt_tokens,
tokens_completion=resp.usage.completion_tokens)
To switch to Gemini inference instead, set in ops/.env:
INFERENCE_PROVIDER=gemini
DEFAULT_AGENT_CLS=evallakehouse.agents.gemini_agent.GeminiAgent
ADK_MODEL=gemini-2.5-flash-lite
GOOGLE_API_KEY=<your-google-key>
No code changes needed — the agent class is loaded dynamically at runtime.
Run with MockFailureAgent (Test Clustering)
MockFailureAgent returns hardcoded wrong answers per case_id, generating genuine failures without any API calls. Useful for testing the failure clustering pipeline end-to-end:
docker exec airflow airflow dags trigger evaluation_run \
--conf '{
"dataset_id": "example_dataset",
"dataset_version": "v1",
"model_name": "mock-failure",
"model_version": "v1",
"agent_version": "v1",
"prompt_version": "v1",
"retriever_version": "v1",
"agent_cls_path": "evallakehouse.agents.mock_failure_agent.MockFailureAgent",
"run_clustering": true
}'
The agent_cls_path field overrides DEFAULT_AGENT_CLS for this run only — the .env is unchanged. Passing run_clustering: true triggers the cluster_failures task in the same DAG immediately after evaluation completes.
Checking Results
Via the REST API
Once the run_id is in hand, three endpoints give you results immediately without touching SQL:
Case-level scores:
[
{"run_id": "run_a1b2c3d4e5f6", "case_id": "case_001", "correctness": 0.95, "faithfulness": 0.88, "latency_ms": 1243, "tokens_used": 312, "pass_fail": true},
{"run_id": "run_a1b2c3d4e5f6", "case_id": "case_002", "correctness": 0.82, "faithfulness": 0.91, "latency_ms": 1876, "tokens_used": 489, "pass_fail": true},
{"run_id": "run_a1b2c3d4e5f6", "case_id": "case_003", "correctness": 0.61, "faithfulness": 0.74, "latency_ms": 987, "tokens_used": 278, "pass_fail": false}
]
Per-metric breakdown:
[
{"run_id": "run_a1b2c3d4e5f6", "case_id": "case_001", "metric_name": "correctness", "metric_value": 0.95},
{"run_id": "run_a1b2c3d4e5f6", "case_id": "case_001", "metric_name": "faithfulness", "metric_value": 0.88},
{"run_id": "run_a1b2c3d4e5f6", "case_id": "case_001", "metric_name": "context_recall", "metric_value": 0.90},
{"run_id": "run_a1b2c3d4e5f6", "case_id": "case_001", "metric_name": "context_precision", "metric_value": 0.85}
]
Model leaderboard across all runs:
[
{"model_name": "gpt-4o", "avg_correctness": 0.91, "total_cases": 3},
{"model_name": "gemini-2.5-flash", "avg_correctness": 0.87, "total_cases": 3}
]
Via Trino SQL
Trino at http://localhost:8081 gives full SQL access over all Iceberg tables. Use fully-qualified names (iceberg.analytics.<table>).
Run metadata + lineage:
SELECT run_id, model_name, model_version, agent_version, prompt_version, run_timestamp
FROM iceberg.analytics.experiment_runs
ORDER BY run_timestamp DESC;
Pass/fail summary for a run:
SELECT
SUM(CASE WHEN pass_fail THEN 1 ELSE 0 END) AS passed,
COUNT(*) AS total,
AVG(correctness) AS avg_correctness,
AVG(faithfulness) AS avg_faithfulness,
AVG(latency_ms) AS avg_latency_ms,
SUM(tokens_used) AS total_tokens
FROM iceberg.analytics.case_results
WHERE run_id = 'run_a1b2c3d4e5f6';
Per-step agent trace for a case:
SELECT step_id, step_type, role, content, tokens_prompt, tokens_completion, latency_ms
FROM iceberg.analytics.agent_steps
WHERE run_id = 'run_a1b2c3d4e5f6' AND case_id = 'case_001'
ORDER BY step_id;
Compare two runs side by side:
SELECT
a.case_id,
a.correctness AS correctness_run_a,
b.correctness AS correctness_run_b,
a.correctness - b.correctness AS delta
FROM iceberg.analytics.case_results a
JOIN iceberg.analytics.case_results b ON a.case_id = b.case_id
WHERE a.run_id = 'run_a1b2c3d4e5f6'
AND b.run_id = 'run_b9c8d7e6f5a4'
ORDER BY delta;
Time travel — query results as of a past snapshot:
SELECT * FROM iceberg.analytics.case_results
FOR TIMESTAMP AS OF TIMESTAMP '2025-06-01 00:00:00 UTC';
Live metric stream (populated by Spark Structured Streaming):
SELECT run_id, case_id, metric_name, metric_value
FROM iceberg.analytics.metric_events
WHERE run_id = 'run_a1b2c3d4e5f6'
ORDER BY case_id, metric_name;
Via Superset Dashboards
Import the pre-built dashboards once:
Then open http://localhost:8088 (admin / admin). All charts connect to Trino on iceberg.analytics.
The dashboard shows the following charts:
- Pass/Fail Donut — overall pass vs fail ratio across all cases
- Correctness Score Distribution — correctness scores bucketed into 0.2 ranges
- Model Leaderboard — avg correctness, faithfulness, latency, tokens, and pass count per model
- Correctness Trend by Model — avg correctness per model over time; use this to detect regressions across runs
- Evaluations Run Over Time — run count per model over time
- Tokens vs Latency Bubble — cost vs speed per case, sized by correctness; shows if expensive/slow cases are at least getting the right answer
- Multi-Metric Radar — full capability profile per model across all 4 metrics at a glance
- Cost Efficiency Index — correctness per token and per millisecond per model; which model gives the best return
- Failure Heatmap — correctness by case × model; a full red row means the dataset case is the problem, not the model
- Cluster Severity Matrix — failure clusters by case count and avg correctness; top-left is the danger zone
- Failure Cluster Summary — AI-generated one-sentence failure pattern summaries with case counts

Failure Clustering
How It Works
After evaluation, run_failure_clustering() runs a four-step pipeline entirely from Iceberg data:
1. Identify failing cases
Pulls all cases with correctness < 0.7 from case_results:
(spark.table("polaris.analytics.case_results")
    .filter("correctness < 0.7")
    .select("run_id", "case_id", "correctness"))
2. Build text representations
For each failing case, fetches the original user question from dataset_cases and the agent's response from agent_steps, then combines them into a single text:
Q: What is the capital of France?
Agent answer: The capital of France is Lyon.
Correctness score: 0.1
This is why agent_steps must be populated by Spark Structured Streaming before clustering runs.
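Step 2 can be sketched as a simple formatter producing the shape shown above. The function name and arguments are illustrative; the real pipeline pulls the question from dataset_cases and the answer from agent_steps:

```python
# Sketch: build the per-case text representation used for embedding.
def failure_text(question: str, answer: str, correctness: float) -> str:
    return (
        f"Q: {question}\n"
        f"Agent answer: {answer}\n"
        f"Correctness score: {correctness}"
    )
```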
3. Embed + cluster
Embeds all texts with sentence-transformers (all-MiniLM-L6-v2) and clusters with HDBSCAN:
model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(texts)
clusterer = hdbscan.HDBSCAN(min_cluster_size=2, metric="euclidean")
labels = clusterer.fit_predict(embeddings)
Points labelled -1 by HDBSCAN are noise and excluded. If all points are noise and there are at least 2 cases, they are all assigned to a single cluster as a fallback.
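The noise-handling rule can be sketched as a pure function over HDBSCAN's label array (plain lists here, so the sketch carries no hdbscan dependency):

```python
# Sketch of the label post-processing described above: drop noise points
# (label -1), but fall back to one catch-all cluster when everything is
# noise and there are at least 2 cases.
def assign_clusters(labels: list) -> dict:
    """Map cluster label -> list of case indices, applying the noise rule."""
    clusters = {}
    for idx, label in enumerate(labels):
        if label != -1:
            clusters.setdefault(label, []).append(idx)
    if not clusters and len(labels) >= 2:
        clusters[0] = list(range(len(labels)))  # all-noise fallback
    return clusters
```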
4. Generate summaries + write to Iceberg
For each cluster, an ADK LlmAgent reads the members via MCP tools and writes a one-sentence summary. Results are appended to failure_clusters and cluster_members.
Triggering Clustering
Three ways to trigger:
As part of an evaluation run — pass run_clustering: true in the DAG conf:
docker exec airflow airflow dags trigger evaluation_run \
--conf '{"dataset_id": "example_dataset", "dataset_version": "v1",
"model_name": "gpt-4o", "model_version": "v1",
"agent_version": "v1", "prompt_version": "v1",
"retriever_version": "v1", "run_clustering": true}'
Standalone via the API:
curl -X POST "http://localhost:8000/api/v1/failure-clusters?min_cluster_size=2"
# {"status": "started", "job_id": "c4d5e6f7..."}
# Poll for completion
curl http://localhost:8000/api/v1/jobs/c4d5e6f7...
Via the nightly Airflow DAG — failure_clustering runs automatically at 02:00 daily. Trigger manually with a custom min_cluster_size:
Re-triggering clustering appends new rows. Clear old data first if you want a clean run:
Reading Cluster Results
Via the API:
[
{"cluster_id": "cluster_a1b2c3d4", "summary": "Model confuses European capital cities with regional cities.", "num_cases": 3, "created_at": "2025-06-01T02:00:00"},
{"cluster_id": "cluster_e5f6a7b8", "summary": "Model fails on transitive logical reasoning questions.", "num_cases": 2, "created_at": "2025-06-01T02:00:00"}
]
Via Trino — clusters with member details and scores:
SELECT
fc.cluster_id,
fc.summary AS failure_pattern,
cm.case_id,
cm.run_id,
cr.correctness
FROM iceberg.analytics.failure_clusters fc
JOIN iceberg.analytics.cluster_members cm ON fc.cluster_id = cm.cluster_id
JOIN iceberg.analytics.case_results cr ON cm.case_id = cr.case_id AND cm.run_id = cr.run_id
ORDER BY fc.cluster_id, cm.case_id;
ADK-Powered Cluster Summaries
When CLUSTER_SUMMARY_ENABLED=true, each cluster is summarised by an ADK LlmAgent that uses a stdio MCP server (mcp_server.py) as its tool interface. The MCP server is spawned as a subprocess automatically — no extra Docker container needed.
The MCP server exposes three tools:
| Tool | What it does |
|---|---|
| get_case_count() | Returns total number of cases in the cluster |
| get_case_text(index) | Returns the full text of a case by zero-based index |
| summarise_pattern(summary) | Agent calls this to emit its final one-sentence summary |
The agent is instructed to call get_case_count, read several cases via get_case_text, then always finish by calling summarise_pattern. The summary is written to a temp file by the tool and read back by the caller:
# Agent instruction
"Use get_case_count to see how many cases exist, then use get_case_text to read "
"several cases. You MUST always finish by calling summarise_pattern with a single "
"concise sentence describing the common failure pattern."
Provider is controlled by INFERENCE_PROVIDER in ops/.env:
- openai — uses LiteLlm(model="openai/<AGENT_MODEL>")
- gemini — uses the Gemini model directly via ADK_MODEL
Deterministic Replay
How Replay Works
The replay engine re-runs a specific case from a past run through a new agent, using only the inputs that were stored in agent_steps — no live retrieval, no tool calls, no external dependencies.
ReplayEngine.replay() does three things:
1. Load the stored trace
Reads all agent_steps rows for the given run_id + case_id from Iceberg, ordered by step_id:
(spark.table("polaris.analytics.agent_steps")
    .filter(f"run_id = '{run_id}' AND case_id = '{case_id}'")
    .orderBy("step_id"))
2. Reconstruct the EvalCase
Filters the trace to only step_type == "user" events and rebuilds the conversation from them, including any stored context (retrieved chunks). The reconstructed case carries metadata={"original_run_id": ..., "frozen": "true"} so it's traceable back to the source run:
conversation = [
    ConversationTurn(role=e.role, content=e.content, context=e.parsed_context())
    for e in trace
    if e.step_type == "user"
]
3. Run the new agent + emit traces
Loads the new agent class dynamically, runs it against the reconstructed case, and emits a full new trace to Kafka under a new replay_<12-hex> run ID:
new_run_id = f"replay_{uuid.uuid4().hex[:12]}"
with TraceEmitter(settings.kafka) as emitter:
    recorder = TraceRecorder(new_run_id, case_id, emitter)
    agent.run(case, recorder)
The replay run flows through the same Kafka → Spark → Iceberg pipeline as a normal run, producing a full agent_steps record under the new run_id. This makes it directly comparable to the original in Trino or Superset.
Triggering a Replay
curl -X POST http://localhost:8000/api/v1/replay \
-H "Content-Type: application/json" \
-d '{
"original_run_id": "run_a1b2c3d4e5f6",
"case_id": "case_003",
"agent_cls_path": "evallakehouse.agents.openai_agent.OpenAIAgent",
"new_model": "gpt-4o"
}'
# {"status": "started", "original_run_id": "run_a1b2c3d4e5f6"}
The replay runs as a BackgroundTask and returns immediately. Once complete, compare the original and replay runs side by side in Trino:
SELECT
a.case_id,
a.step_id,
a.content AS original_response,
b.content AS replay_response
FROM iceberg.analytics.agent_steps a
JOIN iceberg.analytics.agent_steps b
ON a.case_id = b.case_id AND a.step_id = b.step_id
WHERE a.run_id = 'run_a1b2c3d4e5f6'
AND b.run_id = 'replay_x9y8z7w6v5u4'
AND a.role = 'assistant';
Key things to note:
- The replay only reconstructs user turns — assistant turns from the original run are not fed back in, so the new agent generates a fresh response
- agent_steps must be populated by Spark Structured Streaming before replay can load the trace; if the table is empty for a run, load_trace() returns an empty list
- Any agent class can be used for replay — swap agent_cls_path to compare OpenAI vs Gemini on the exact same inputs
Experiment Lineage and Reproducibility
Every evaluation run writes a single row to experiment_runs that captures the full version context at the time of execution:
SELECT
run_id, dataset_id, dataset_version,
model_name, model_version,
agent_version, prompt_version, retriever_version, metric_suite_version,
git_commit, config, run_timestamp
FROM iceberg.analytics.experiment_runs
ORDER BY run_timestamp DESC;
All downstream tables — case_results, metric_results, agent_steps, cluster_members — join back to experiment_runs on run_id. This means any result can be traced back to the exact dataset version, model, agent, prompt, and code commit that produced it.
Lineage chain:
experiment_runs (run_id, dataset_version, model_name, agent_version, git_commit, ...)
│
├──► case_results (correctness, faithfulness, latency_ms, tokens_used, pass_fail)
│ │
│ └──► metric_results (per-metric breakdown: correctness, faithfulness,
│ context_recall, context_precision)
│
└──► agent_steps (full per-step trace: role, content, tokens, latency, context)
│
└──► cluster_members → failure_clusters (failure pattern summaries)
Reproducing a past run means re-triggering with the same conf fields:
curl -X POST http://localhost:8000/api/v1/experiments \
-H "Content-Type: application/json" \
-d '{
"dataset_id": "example_dataset",
"dataset_version": "v1",
"model_name": "gpt-4o",
"model_version": "v1",
"agent_version": "v1",
"prompt_version": "v1",
"retriever_version": "v1",
"git_commit": "a3f9c1d"
}'
Comparing two runs — e.g. same dataset, different model:
SELECT
r.run_id, r.model_name, r.agent_version, r.prompt_version,
AVG(c.correctness) AS avg_correctness,
AVG(c.faithfulness) AS avg_faithfulness,
AVG(c.latency_ms) AS avg_latency_ms,
AVG(c.tokens_used) AS avg_tokens
FROM iceberg.analytics.experiment_runs r
JOIN iceberg.analytics.case_results c ON r.run_id = c.run_id
WHERE r.dataset_id = 'example_dataset'
AND r.dataset_version = 'v1'
GROUP BY r.run_id, r.model_name, r.agent_version, r.prompt_version
ORDER BY avg_correctness DESC;
Iceberg snapshots add a second layer of reproducibility — even if rows are appended or deleted, you can query the table as it existed at any point in time:
SELECT * FROM iceberg.analytics.case_results
FOR TIMESTAMP AS OF TIMESTAMP '2025-06-01 00:00:00 UTC';
Together, experiment_runs versioning and Iceberg time travel mean any result in the system is fully reproducible: you know what data was used, what code ran, and what the table looked like at that moment.
Implementing a Custom Agent
Subclass BaseAgent, implement run(), and point DEFAULT_AGENT_CLS at it. That's all that's needed to plug a custom agent into the full pipeline — Ray distribution, trace capture, Kafka streaming, and DeepEval metrics all work automatically.
Minimal example — a RAG agent:
from openai import OpenAI
from evallakehouse.agents.base import BaseAgent
from evallakehouse.datasets.models import EvalCase
from evallakehouse.trace_capture.recorder import TraceRecorder
class MyRAGAgent(BaseAgent):
    @property
    def version(self) -> str:
        return "rag_v1"

    def run(self, case: EvalCase, recorder: TraceRecorder) -> str:
        client = OpenAI()
        # Record user turns
        for turn in case.conversation:
            if turn.role == "user":
                recorder.record("user", role="user", content=turn.content)
        user_input = case.conversation[-1].content
        # Retrieval step (my_retriever is a placeholder for your own retriever)
        chunks = my_retriever.search(user_input, top_k=3)
        recorder.record("retrieval", role="", content=str(chunks),
                        context=chunks)  # context list fed to DeepEval faithfulness/recall
        # LLM call
        messages = [
            {"role": "system", "content": "Answer using only the provided context."},
            {"role": "user", "content": f"Context: {chunks}\n\nQuestion: {user_input}"},
        ]
        resp = client.chat.completions.create(model="gpt-4o", messages=messages)
        response = resp.choices[0].message.content
        # Record assistant step with token counts — required for tokens_used in case_results
        recorder.record("assistant", role="assistant", content=response,
                        context=chunks,
                        tokens_prompt=resp.usage.prompt_tokens,
                        tokens_completion=resp.usage.completion_tokens)
        return response
A few things to get right:
- Always record tokens_prompt and tokens_completion on the assistant step — otherwise tokens_used in case_results will be 0
- Pass context=chunks on retrieval and assistant steps — TraceRecorder.retrieval_context() deduplicates these and passes them to DeepEval's faithfulness and context recall metrics
- Implement the version property — return a meaningful string; it's recorded in experiment_runs as agent_version
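To see what the retrieval_context() deduplication means in practice, here is a toy stand-in for TraceRecorder. The real class also emits each step to Kafka; only the dedup behaviour described above is modelled here, and the field names inside the step dict are assumptions:

```python
class ToyTraceRecorder:
    """Minimal stand-in: stores steps in memory and deduplicates
    context chunks in first-seen order."""
    def __init__(self):
        self.steps = []

    def record(self, step_type, role="", content="", context=None, **kw):
        self.steps.append({"step_type": step_type, "role": role,
                           "content": content, "context": context or []})

    def retrieval_context(self):
        seen, out = set(), []
        for step in self.steps:
            for chunk in step["context"]:
                if chunk not in seen:
                    seen.add(chunk)
                    out.append(chunk)
        return out

rec = ToyTraceRecorder()
rec.record("retrieval", content="...", context=["chunk A", "chunk B"])
rec.record("assistant", role="assistant", content="answer",
           context=["chunk A", "chunk B"])
print(rec.retrieval_context())  # ['chunk A', 'chunk B'] - no duplicates
```

Passing the same chunks on both the retrieval and assistant steps is therefore harmless: DeepEval still sees each chunk exactly once.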
Register the agent in ops/.env:
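For example (DEFAULT_AGENT_CLS is the setting named earlier; the module path here is the illustrative MyRAGAgent from above):

```
DEFAULT_AGENT_CLS=mypackage.agents.MyRAGAgent
```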
Or override per-request without touching .env:
curl -X POST http://localhost:8000/api/v1/experiments \
-H "Content-Type: application/json" \
-d '{
"dataset_id": "example_dataset",
"dataset_version": "v1",
"model_name": "my-rag",
"model_version": "v1",
"agent_version": "v1",
"prompt_version": "v1",
"retriever_version": "v1",
"agent_cls_path": "mypackage.agents.MyRAGAgent"
}'
The class is loaded via importlib.import_module at runtime inside each Ray worker — the package just needs to be importable in the Python environment where the workers run.
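The importlib mechanism is standard; a minimal sketch of what that dynamic load looks like (load_agent_cls is a hypothetical helper name, demonstrated here with a stdlib class rather than an agent):

```python
import importlib

def load_agent_cls(path: str):
    """Split 'pkg.module.ClassName' and import the class at runtime."""
    module_path, _, cls_name = path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, cls_name)

# Any importable dotted path works, e.g. a stdlib class:
cls = load_agent_cls("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```

This is why the only requirement is that the package be importable inside the Ray workers: there is no registry to update, just a dotted path to resolve.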