
Evaluating cost and hyperparameters for Pinecone RAG systems with W&B Weave

Retrieval-Augmented Generation, a.k.a. RAG, combines two complementary steps: retrieving relevant information from a vector database and then generating a grounded answer with a language model. This design not only reduces hallucinations but also lowers cost, since the generator works with a focused context instead of producing long, speculative outputs. By tying every answer to retrieved passages, RAG systems give users more accurate responses and provide transparency into where the information came from.
In this article we will show how to build such a pipeline using Pinecone and W&B Weave. Pinecone serves as the vector database, with integrated inference that allows us to embed and index a large corpus of passages directly inside the service. W&B Weave acts as the experiment tracker and evaluation framework, ensuring that each retrieval and generation step is logged and comparable. For the generation stage we rely on W&B Inference, which gives quick access to multiple open source models through a single API, making it easy to swap models in and out during experiments.


The Question

For this article, we want to focus on a guiding question: how do retrieval depth and model choice interact to influence the effectiveness of a retrieval-augmented generation pipeline? Specifically, we are asking whether increasing the number of retrieved passages genuinely improves accuracy or whether it simply adds unnecessary cost and latency. At the same time, we want to know whether different models vary in how well they use the same retrieved context, highlighting strengths and weaknesses across architectures.
To explore this, we adjust two factors in our experiments. The first is retrieval depth, tested with top-k values of 5, 10, and 20 passages. The second is the generation model, where we swap between different open source options available through W&B Inference. By varying these factors together, we can measure the trade-offs between accuracy, latency, and cost. Weave records all results, giving us a unified view of how retrieval and model choice shape pipeline performance.
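To make the sweep concrete, here is a small sketch of the experiment grid; the model identifiers and top-k values are the same ones evaluated later in this article, and the loop simply enumerates the combinations:

from itertools import product

# The two factors varied in this article's experiments.
MODELS = [
    "meta-llama/Llama-3.1-8B-Instruct",
    "deepseek-ai/DeepSeek-V3.1",
    "zai-org/GLM-4.5",
]
TOP_K_VALUES = [5, 10, 20]

# Nine (model, top_k) combinations, each logged as its own evaluation in Weave.
for model, top_k in product(MODELS, TOP_K_VALUES):
    print(f"evaluate: model={model}, top_k={top_k}")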

What is RAG and why use it?

RAG, short for Retrieval-Augmented Generation, is a method that couples retrieval with generation. Large language models are powerful but limited; on their own they rely only on the information stored in their parameters, which can quickly become outdated or incomplete. They also tend to hallucinate, filling in gaps with answers that sound plausible but are not supported by evidence.
RAG addresses these issues by first retrieving relevant passages from a curated knowledge base, then generating an answer using those passages as context. This has three major benefits. It grounds the output in verifiable text, which reduces hallucinations. It keeps the system flexible, since updating the knowledge base instantly updates what the model can reference. And it lowers cost, because the generator works with a narrowed context instead of producing long, speculative completions.
For practical applications, these advantages make RAG the preferred design. Whether you are building search assistants, customer support tools, or internal knowledge bots, RAG ensures answers are more accurate, more current, and more efficient. Combined with vector databases like Pinecone and evaluation frameworks like W&B Weave, RAG becomes a repeatable, measurable way to deliver reliable answers at scale.

The power of Pinecone

A RAG pipeline is only as strong as its retriever, and that makes the vector database a central piece of the system. Pinecone is built specifically for this role. It handles the heavy lifting of storing, indexing, and searching millions of vector embeddings with low latency and high reliability. Instead of managing your own infrastructure for approximate nearest neighbor search, you can rely on Pinecone’s managed service to scale up or down as needed.
One advantage of Pinecone is its integrated inference. You can specify an embedding model at index creation, and Pinecone will automatically embed your text as you upload it. This saves the overhead of running a separate embedding pipeline and ensures consistency between indexing and query time. Metadata support makes it easy to attach query IDs, URLs, or selection labels to each passage, which is essential for evaluation.
For experiments, Pinecone also supports reranking. After retrieving candidate passages by vector similarity, you can apply a cross-encoder reranker to refine the order. This can greatly improve the relevance of the top few results, which are the ones most likely to be fed to the generator.
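For orientation, here is a minimal sketch of a reranked query, assuming an index that was created with integrated inference; the index name, field names, and reranker model mirror the ones used later in this article:

import os
from pinecone import Pinecone

pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index = pc.Index(host=pc.describe_index(name="msmarco-demo-v3").host)

# Retrieve 20 candidates by vector similarity, then let the cross-encoder
# reranker reorder them and keep only the best five for the generator.
results = index.search(
    namespace="__default__",
    query={"top_k": 20, "inputs": {"text": "how long do you need for sydney and surrounding areas"}},
    fields=["passage_text", "url", "query_id"],
    rerank={"model": "pinecone-rerank-v0", "rank_fields": ["passage_text"], "top_n": 5},
)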

Building and Evaluating a RAG Pipeline with Pinecone and W&B

Now we will walk through the main stages of setting up and testing a RAG system. The process starts with preparing a dataset that contains queries and relevant passages, moves on to indexing those passages in Pinecone, and then ties retrieval to generation through W&B Inference. Finally, we evaluate the pipeline using W&B Weave, which captures retrieval quality, answer correctness, latency, and cost across different settings.
The goal is not only to build a working pipeline but also to understand how different design choices affect performance. By running controlled experiments with multiple top-k values and multiple generation models, we can see where the trade-offs lie. For example, does increasing k from 5 to 20 actually improve correctness, or does it only inflate cost? Does one model make better use of retrieved passages than another? And how do latency and efficiency shift as we change these parameters?

Creating accounts and setting up your environment

To access the dataset and run the pipeline, you’ll need active accounts on Hugging Face, Pinecone, and Weights & Biases. Hugging Face hosts the MS MARCO dataset, Pinecone stores the passage index for retrieval, and W&B provides both the inference endpoint for generation and the Weave tracking for evaluation runs. After creating accounts, generate API keys from each service’s dashboard.
Set these keys in your environment so the code can authenticate automatically: HUGGINGFACE_API_KEY for Hugging Face dataset access, PINECONE_API_KEY for indexing and retrieval, and WANDB_API_KEY for logging results and calling W&B Inference. Export those as environment variables, along with OPENAI_API_KEY for the judge model used during evaluation, and you’ll have full access to datasets, models, and experiment tracking.
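A quick way to fail fast if something is missing (a minimal check, assuming the variable names above):

import os

# Stop early if any required key is absent from the environment.
required = ["HUGGINGFACE_API_KEY", "PINECONE_API_KEY", "WANDB_API_KEY", "OPENAI_API_KEY"]
missing = [name for name in required if not os.environ.get(name)]
if missing:
    raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
print("All API keys found.")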

Step 1: Obtaining Data

For this project, we will use a subset of the MS MARCO dataset. MS MARCO is a large-scale benchmark originally designed for passage ranking and machine reading comprehension. Each example contains a user query along with multiple candidate passages drawn from web documents. Among these candidates, at least one passage is marked as selected, meaning it contains relevant information that can be used to answer the query. The rest are unselected and serve as negatives.
This structure makes MS MARCO especially valuable for RAG experiments because it mirrors the real-world search problem: when a user asks a question, only a few passages among many are useful. By having explicit labels for which passages are relevant, the dataset provides clear ground truth for both retrieval and evaluation.
Because the full dataset is massive, we will work with a smaller subset to keep the project manageable. Sampling around ten thousand examples gives us enough diversity to evaluate how different models and retrieval depths perform, while still being lightweight enough to run multiple experiments. The key point is that every query we include has at least one positive passage, ensuring we can always measure whether retrieval and generation succeeded.
To make the data easier to work with, we restructure it into JSONL format. Each row stores one query-passage pair, including fields for the query ID, the query text, the passage text, a flag indicating whether it is selected, and the source URL when available. Flattening the data like this removes the inconsistencies in the raw MS MARCO format and prepares it for direct indexing into Pinecone.
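For illustration, a single flattened row looks roughly like this (the values here are invented):

{"pid": "12345#0", "query_id": 12345, "query": "how long do you need for sydney", "passage_text": "Most visitors spend three to five days exploring Sydney and the surrounding areas...", "is_selected": 1, "url": "https://example.com/sydney-travel"}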
Here’s the code that will prepare our dataset:

from datasets import load_dataset
from pathlib import Path
from datetime import datetime
import json

CONFIG = "v1.1"
SPLIT = "train"
LIMIT = 10000 # set to None for all

OUT_DIR = Path("local_msmarco_jsonl")
PASSAGES_PATH = OUT_DIR / "passages.jsonl"
SELECTED_PATH = OUT_DIR / "selected_only.jsonl"
QUERIES_PATH = OUT_DIR / "queries.jsonl"
MANIFEST_PATH = OUT_DIR / "manifest.json"

OUT_DIR.mkdir(parents=True, exist_ok=True)

def normalize_example_rows(ex):
qid = ex.get("query_id")
qtext = ex.get("query", "") or ""
p = ex.get("passages", {})

rows = []

if isinstance(p, dict) and "passage_text" in p:
texts = p.get("passage_text", []) or []
selected = p.get("is_selected", []) or []
urls = p.get("url", []) or []
n = max(len(texts), len(selected or []), len(urls or []))

def safe(lst, i, default=""):
try:
return lst[i]
except Exception:
return default

for j in range(n):
text = (safe(texts, j, "") or "").strip()
if not text:
continue
rows.append({
"pid": f"{qid}#{j}",
"query_id": qid,
"query": qtext,
"passage_text": text,
"is_selected": int(safe(selected, j, 0) or 0),
"url": safe(urls, j, "") or ""
})

elif isinstance(p, list):
for j, item in enumerate(p):
text = (item.get("passage_text", "") or "").strip()
if not text:
continue
rows.append({
"pid": f"{qid}#{j}",
"query_id": qid,
"query": qtext,
"passage_text": text,
"is_selected": int(item.get("is_selected", 0)),
"url": item.get("url", "") or ""
})

return rows

def write_jsonl(path, rows_iter):
with open(path, "w", encoding="utf-8") as w:
for r in rows_iter:
w.write(json.dumps(r, ensure_ascii=False) + "\n")

def main():
ds = load_dataset("microsoft/ms_marco", CONFIG, split=SPLIT, streaming=True)

passages_buf = []
seen = 0

for ex in ds:
rows = normalize_example_rows(ex)
if not rows:
continue

positives = [r for r in rows if r["is_selected"] == 1]
negatives = [r for r in rows if r["is_selected"] != 1]

# add positives first
for r in positives:
if LIMIT is not None and seen >= LIMIT:
break
passages_buf.append(r)
seen += 1

        # add negatives only if there is still room under the limit; the positive
        # loop above only stops early once the limit is hit, so room left here
        # means every positive for this query was included
        if LIMIT is None or seen < LIMIT:
for r in negatives:
if LIMIT is not None and seen >= LIMIT:
break
passages_buf.append(r)
seen += 1

if LIMIT is not None and seen >= LIMIT:
break

selected_buf = [r for r in passages_buf if r["is_selected"] == 1]

by_query = {}
for r in passages_buf:
qid = r["query_id"]
if qid not in by_query:
by_query[qid] = {"query": r["query"], "total": 0, "positives": 0}
by_query[qid]["total"] += 1
if r["is_selected"] == 1:
by_query[qid]["positives"] += 1

queries_rows = [
{"query_id": qid, "query": v["query"], "total_passages": v["total"], "positive_passages": v["positives"]}
for qid, v in by_query.items()
]

write_jsonl(PASSAGES_PATH, passages_buf)
write_jsonl(SELECTED_PATH, selected_buf)
write_jsonl(QUERIES_PATH, queries_rows)

manifest = {
"dataset": "microsoft/ms_marco",
"config": CONFIG,
"split": SPLIT,
"written_passages": len(passages_buf),
"written_selected": len(selected_buf),
"unique_queries": len(by_query),
"limit": LIMIT,
"timestamp": datetime.utcnow().isoformat() + "Z"
}
with open(MANIFEST_PATH, "w", encoding="utf-8") as w:
json.dump(manifest, w, ensure_ascii=False, indent=2)

print(f"Using microsoft/ms_marco {CONFIG} {SPLIT}")
print(f"Wrote {len(passages_buf)} passages")
print(f"Wrote {len(selected_buf)} selected passages")
print(f"Wrote {len(by_query)} unique queries")
print("Files:")
print(f" {PASSAGES_PATH}")
print(f" {SELECTED_PATH}")
print(f" {QUERIES_PATH}")
print(f" {MANIFEST_PATH}")

if __name__ == "__main__":
main()
The script caps the subset at 10,000 query-passage pairs. Within that limit it always writes the selected passages for each query first, then fills the remaining space with unselected passages, so every included query keeps relevant ground truth for evaluation while the overall size stays light enough to run repeated experiments. Alongside passages.jsonl, it also writes a selected-only file, a per-query summary, and a manifest recording the configuration and counts for the run.

Step 2: Uploading our data to Pinecone

With the dataset formatted into JSONL, the next step is to load it into Pinecone. Pinecone is responsible for storing the passages as dense vectors so they can be searched efficiently during retrieval. To make this simple, we use Pinecone’s integrated inference: when we create the index, we specify an embedding model and tell Pinecone which field to embed (in this case, passage_text). That way, embedding happens automatically when we upsert records.
Each passage record includes a unique ID, the passage text, the query ID, whether it was selected as relevant, and the source URL. We stream these records into Pinecone in manageable batches. The script also tracks token usage per batch to enforce a limit of 250,000 tokens per minute. This prevents overload and ensures indexing runs smoothly. If we hit rate limits, retries with exponential backoff make the process resilient.
Here’s the code:
import os, json, itertools
from pinecone import Pinecone
from pinecone.exceptions import PineconeApiException

API = os.environ.get("PINECONE_API_KEY") or os.environ.get("ONE_API_KEY")
if not API:
raise RuntimeError("set PINECONE_API_KEY or ONE_API_KEY")

INDEX_NAME = "msmarco-demo-v3"
NAMESPACE = "__default__"
DATA_PATH = "local_msmarco_jsonl/passages.jsonl"
BATCH = 96

pc = Pinecone(api_key=API)

try:
info = pc.create_index_for_model(
name=INDEX_NAME,
cloud="aws",
region="us-east-1",
embed={"model": "multilingual-e5-large", "field_map": {"text": "passage_text"}}
)
except PineconeApiException as e:
if getattr(e, "status", None) == 409:
info = pc.describe_index(name=INDEX_NAME)
else:
raise

index = pc.Index(host=info.host)

def iter_records(path):
with open(path, "r", encoding="utf-8") as f:
for i, line in enumerate(f):
o = json.loads(line)
t = (o.get("passage_text", "") or "").strip()
if not t:
continue
rid = str(o.get("pid") or f"{o.get('query_id','')}#{i}")
yield {
"id": rid,
"passage_text": t, # mapped to 'text'
"query_id": str(o.get("query_id", "")),
"url": o.get("url", ""),
"is_selected": int(o.get("is_selected", 0)),
}

def batched(it, n):
it = iter(it)
while True:
chunk = list(itertools.islice(it, n))
if not chunk:
return
yield chunk


import time


# --- Throttle to not exceed 250,000 tokens/minute ---
EMBEDDING_TOKEN_LIMIT_PER_MIN = 250_000
def count_tokens(text):
# Updated estimate: 1 word ≈ 3 tokens
return len(text.split()) * 3

count = 0
max_retries = 6 # exponential backoff: up to ~1min
rate_limit_delays = 0
tokens_this_minute = 0
minute_start = time.time()

for chunk in batched(iter_records(DATA_PATH), BATCH):
# Calculate tokens in this batch
batch_tokens = sum(count_tokens(r["passage_text"]) for r in chunk)

# Throttle if needed
now = time.time()
elapsed = now - minute_start
if elapsed > 60:
tokens_this_minute = 0
minute_start = now
if tokens_this_minute + batch_tokens > EMBEDDING_TOKEN_LIMIT_PER_MIN:
wait = 60 - elapsed if elapsed < 60 else 0
print(f"Token limit nearly exceeded ({tokens_this_minute + batch_tokens} > {EMBEDDING_TOKEN_LIMIT_PER_MIN}). Sleeping {wait:.1f}s...")
time.sleep(max(wait, 0))
tokens_this_minute = 0
minute_start = time.time()

for attempt in range(max_retries):
try:
index.upsert_records(namespace=NAMESPACE, records=chunk)
count += len(chunk)
tokens_this_minute += batch_tokens
break # success, move to next chunk
except PineconeApiException as e:
if getattr(e, "status", None) == 429:
wait = 2 ** attempt
rate_limit_delays += 1
print(f"Rate limit hit (429). Waiting {wait}s before retrying... (delay #{rate_limit_delays})")
time.sleep(wait)
else:
raise
else:
print("Failed to upsert after retries due to repeated 429 errors.")
break

print(f"re-upserted {count} into {INDEX_NAME}/{NAMESPACE}")
if rate_limit_delays:
print(f"Encountered rate limiting {rate_limit_delays} times during upserts.")

When this script runs, it creates the index tied to the multilingual-e5-large embedding model, or reuses the existing one if an index with that name is already present. It then streams through passages.jsonl, building batches of 96 records at a time and upserting them into Pinecone under a namespace. Each record is embedded automatically as it’s written.
The script throttles requests so the token budget is never exceeded and logs any time it has to wait or retry after a 429 error. At the end it prints a summary of how many passages were successfully upserted and whether any rate limiting occurred. At this point, the index is live and can be queried immediately, meaning we can move on to testing retrieval with top-k values and preparing to connect the retriever to our generator.
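Before connecting the retriever to a generator, it helps to sanity check the live index with a single query. The snippet below is a minimal sketch: the question is arbitrary, and the response parsing follows the same result-and-hits structure the RAG module in the next step relies on:

import os
from pinecone import Pinecone

pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index = pc.Index(host=pc.describe_index(name="msmarco-demo-v3").host)

res = index.search(
    namespace="__default__",
    query={"top_k": 5, "inputs": {"text": "how long do you need for sydney and surrounding areas"}},
    fields=["passage_text", "url", "query_id"],
)

# The response nests hits under result -> hits; convert to a plain dict if needed.
payload = res if isinstance(res, dict) else res.to_dict()
for hit in payload["result"]["hits"]:
    print(round(hit["_score"], 3), "-", hit["fields"]["passage_text"][:80])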

Step 3: Building a RAG module

Once the dataset is indexed in Pinecone, the next step is to connect retrieval with generation. This is where we build the RAG module, the component that takes a user query, fetches supporting passages, and produces a grounded answer. The goal is not just to generate text, but to generate text that directly uses the retrieved context.
The module handles three main tasks. It queries Pinecone and returns the top k passages for a question. It formats those passages into a structured context and passes them to a language model running on W&B Inference. And it logs every step into Weave, including the answer, the retrieved passages, token usage, and estimated cost. This way, every generation is reproducible and comparable across different models and retrieval depths.
In short, this module is the glue between the retriever and the generator. It makes sure the model only answers with evidence, gives visibility into where each answer came from, and records all the data needed to evaluate accuracy, latency, and cost.
Here’s the code:
import os, json, textwrap
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from pinecone import Pinecone
from pinecone.exceptions import PineconeApiException
import openai
import weave

def _to_dict(obj):
if isinstance(obj, dict):
return obj
return obj.to_dict() if hasattr(obj, "to_dict") else json.loads(
json.dumps(obj, default=lambda o: getattr(o, "__dict__", str(o)))
)

def _parse_hits(res_obj) -> List[Dict[str, Any]]:
d = _to_dict(res_obj)
if isinstance(d.get("result"), dict):
hits = d["result"].get("hits", []) or []
out = []
for h in hits:
fields = h.get("fields", {}) or {}
out.append({
"id": h.get("_id"),
"score": h.get("_score", 0.0),
"text": fields.get("passage_text", "") or "",
"url": fields.get("url", "") or "",
"qid": fields.get("query_id", "") or "",
})
return out
results = d.get("results") or d.get("received_data", {}).get("results") or []
if results:
hits = results[0].get("hits", []) or []
out = []
for h in hits:
hh = _to_dict(h)
rec = _to_dict(hh.get("record", {}))
md = rec.get("metadata", {}) or rec
out.append({
"id": rec.get("id") or hh.get("_id"),
"score": hh.get("score", 0.0),
"text": md.get("passage_text", "") or "",
"url": md.get("url", "") or "",
"qid": md.get("query_id", "") or "",
})
return out
return []

@dataclass
class RAGConfig:
model: str = "openai/gpt-oss-120b"
temperature: float = 0.0
top_k: int = 5
namespace: str = "__default__"
system_preamble: str = (
"Answer the user using only the provided context. "
"If the answer is not in the context, say you cannot find it."
)
wandb_project: str = "wandb_inference"
wandb_header_project: str = "wandb_fc/quickstart_playground"

class RAGModule:
def __init__(
self,
pinecone_index_name: str,
config: Optional[RAGConfig] = None,
use_rerank: bool = False,
):
self.cfg = config or RAGConfig()

api = os.environ.get("PINECONE_API_KEY") or os.environ.get("ONE_API_KEY")
if not api:
raise RuntimeError("set PINECONE_API_KEY or ONE_API_KEY")
pc = Pinecone(api_key=api)
try:
info = pc.describe_index(name=pinecone_index_name)
except PineconeApiException as e:
raise RuntimeError(f"pinecone index '{pinecone_index_name}' not found: {e}") from e
self.index = pc.Index(host=info.host)
self.use_rerank = use_rerank

self.client = openai.OpenAI(
base_url="https://api.inference.wandb.ai/v1",
api_key=os.getenv("WANDB_API_KEY"),
project="rag_demo",
default_headers={
"OpenAI-Project": "wandb_fc/quickstart_playground" # replace with your actual team/project
}
)

def retrieve(self, query: str, k: Optional[int] = None) -> List[Dict[str, Any]]:
k = int(k or self.cfg.top_k)
kwargs = {
"namespace": self.cfg.namespace,
"query": {"top_k": k, "inputs": {"text": query}},
"fields": ["passage_text", "url", "query_id"],
}
if self.use_rerank:
kwargs["rerank"] = {
"model": "pinecone-rerank-v0",
"rank_fields": ["passage_text"],
"top_n": min(5, k),
}
res = self.index.search(**kwargs)
return _parse_hits(res)

def _build_context(self, hits: List[Dict[str, Any]]) -> str:
parts = []
for i, h in enumerate(hits, 1):
txt = (h.get("text") or "").strip()
if not txt:
continue
parts.append(f"[{i}] {txt}")
blob = "\n\n".join(parts)
return blob

@weave.op
def generate(self, query: str, k: Optional[int] = None) -> Dict[str, Any]:
hits = self.retrieve(query, k)
context = self._build_context(hits)
prompt = f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
print(prompt, flush=True)

resp = self.client.chat.completions.create(
model=self.cfg.model,
temperature=self.cfg.temperature,
messages=[
{"role": "system", "content": self.cfg.system_preamble},
{"role": "user", "content": prompt},
],
)

# inline cost calc from resp.usage using known per token rates
raw = resp.model_dump() if hasattr(resp, "model_dump") else _to_dict(resp)
usage = raw.get("usage", {}) or {}
prompt_tokens = int(usage.get("prompt_tokens", 0) or 0)
completion_tokens = int(usage.get("completion_tokens", 0) or 0)
total_tokens = prompt_tokens + completion_tokens

# per token USD rates
pricing: Dict[str, Dict[str, float]] = {
"deepseek-ai/DeepSeek-V3.1": {"prompt": 0.55 / 1_000_000, "completion": 1.65 / 1_000_000},
"meta-llama/Llama-3.1-8B-Instruct": {"prompt": 0.22 / 1_000_000, "completion": 0.22 / 1_000_000},
"zai-org/GLM-4.5": {"prompt": 0.55 / 1_000_000, "completion": 2.00 / 1_000_000},
}

model_id = self.cfg.model
pr = pricing.get(model_id)
if pr:
prompt_cost = prompt_tokens * pr["prompt"]
completion_cost = completion_tokens * pr["completion"]
total_cost = prompt_cost + completion_cost
else:
prompt_cost = completion_cost = total_cost = 0.0

text = (resp.choices[0].message.content or "").strip()
return {
"query": query,
"top_k": k or self.cfg.top_k,
"answer": text,
"contexts": hits,
"usage": {
"model": model_id,
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
"prompt_cost_usd": prompt_cost,
"completion_cost_usd": completion_cost,
"total_cost_usd": total_cost,
},
}

if __name__ == "__main__":
weave.init("wandb_inference")
rag = RAGModule(pinecone_index_name="msmarco-demo-v3", config=RAGConfig())
q = "how long do you need for sydney and surrounding areas"
out = rag.generate(q)
print("\nQ:", q)
print("\nA:", textwrap.shorten(out["answer"], width=400))
print("\nContexts:", len(out["contexts"]))
print("\nUsage/Cost:", json.dumps(out["usage"], indent=2))

Step 4: Evaluating our RAG Module

Now we will evaluate our pipeline by systematically running it across different models and retrieval depths. The evaluation code begins by loading a pool of queries that each have at least one positive passage. For every combination of model and top-k value, it creates a RAG module, retrieves passages, and generates an answer. Each answer is then scored in two ways. First, retrieval quality is measured by checking if any of the positive passages appear in the retrieved set, either by ID or by text match. Second, correctness is judged by gpt-5, which compares the generated answer to the gold passages and outputs a binary score.
import os, sys, json, time, re, random
from pathlib import Path
from typing import Dict, List, Any, Tuple, Optional, Set
from dataclasses import dataclass
from openai import OpenAI

import weave
from weave import EvaluationLogger

# seed at the top
RANDOM_SEED = 0
random.seed(RANDOM_SEED)
os.environ["PYTHONHASHSEED"] = str(RANDOM_SEED)

# init Weave
client = weave.init("rag-evals")

# import rag module
try:
from rag_module import RAGModule, RAGConfig
except ModuleNotFoundError:
here = Path(__file__).resolve().parent
for p in [here, here.parent, here / "pinecone"]:
sp = str(p)
if sp not in sys.path:
sys.path.insert(0, sp)
from rag_module import RAGModule, RAGConfig

DATA_DIR = Path("local_msmarco_jsonl")
PASSAGES_PATH = DATA_DIR / "passages.jsonl"

PINECONE_INDEX_NAME = "msmarco-demo-v3"
MAX_UNIQUE_QUERIES = 30

MODEL_CANDIDATES = [
"meta-llama/Llama-3.1-8B-Instruct",
"deepseek-ai/DeepSeek-V3.1",
"zai-org/GLM-4.5",
]
TOPK_CANDIDATES = [5, 10, 20]



JUDGE_MODEL = "gpt-5"
SYSTEM = (
"You are a strict evaluator. "
"Decide only whether the candidate answer answers the question, "
"given the provided ground truth context. "
"Output JSON with a single field 'answers_query' as 1 or 0."
)
PROMPT = """Question:
{question}

Ground truth context:
{context}

Candidate answer:
{answer}

Task:
Does the candidate answer the question, given the context?

Respond with a single JSON object exactly like one of these:
Example 1: {{"answers_query": 1}}
Example 2: {{"answers_query": 0}}

Return only the JSON object.
"""

def _make_client():
if not os.getenv("OPENAI_API_KEY"):
raise RuntimeError("set OPENAI_API_KEY")
return OpenAI()

def _extract_json(s: str) -> dict:
i, j = s.find("{"), s.rfind("}")
if i == -1 or j == -1 or j < i:
return {"answers_query": 0}
try:
return json.loads(s[i:j+1])
except Exception:
return {"answers_query": 0}

@weave.op
def llm_judge(question: str, context: str, answer: str) -> dict:
oai = _make_client()
user = PROMPT.format(question=question.strip(), context=context.strip(), answer=answer.strip())
resp = oai.chat.completions.create(
model=JUDGE_MODEL,
messages=[{"role": "system", "content": SYSTEM},
{"role": "user", "content": user}],
)
raw = resp.choices[0].message.content or "{}"
return _extract_json(raw)

_ws = re.compile(r"\s+")
def _norm_query(q: str) -> str:
return _ws.sub(" ", (q or "").strip().lower())

def _norm_text(t: str) -> str:
return _ws.sub(" ", (t or "").strip().lower())

def _read_jsonl(path: Path):
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
yield json.loads(line)

def _build_pool_from_passages(path: Path) -> List[Tuple[str, Dict[str, Any]]]:
by_qtxt: Dict[str, Dict[str, Any]] = {}
for row in _read_jsonl(path):
if int(row.get("is_selected", 0) or 0) != 1:
continue
qtxt_raw = row.get("query", "") or ""
key = _norm_query(qtxt_raw)
if not key:
continue
if key not in by_qtxt:
by_qtxt[key] = {
"query_text": qtxt_raw,
"qids": set(),
"positives": [], # positive passage texts
"positive_ids": set(), # positive passage ids if present
}
qid = str(row.get("query_id", "")) or ""
if qid:
by_qtxt[key]["qids"].add(qid)
txt = (row.get("passage_text", "") or "").strip()
pid = row.get("pid") or row.get("passage_id") or row.get("id")
if txt:
by_qtxt[key]["positives"].append(txt)
if pid is not None:
by_qtxt[key]["positive_ids"].add(str(pid))
items: List[Tuple[str, Dict[str, Any]]] = []
for key, rec in by_qtxt.items():
if rec["positives"]:
rec["qids"] = sorted(rec["qids"])
rec["positive_ids"] = sorted(rec["positive_ids"])
items.append((key, rec))
items.sort(key=lambda kv: kv[0])
return items

def _maybe_set(obj: Any, attr: str, value: Any):
if value is None:
return
if hasattr(obj, attr):
setattr(obj, attr, value)

def _init_rag(index_name: str, model_name: Optional[str], top_k: Optional[int]) -> RAGModule:
cfg = RAGConfig()
_maybe_set(cfg, "model", model_name)
_maybe_set(cfg, "top_k", top_k)
return RAGModule(pinecone_index_name=index_name, config=cfg)

def _generate_with_rag(rag: RAGModule, question: str, top_k: Optional[int]) -> Tuple[Dict[str, Any], str, List[Dict[str, Any]]]:
try:
out = rag.generate(question, k=top_k)
except TypeError:
out = rag.generate(question)
answer_text = out.get("answer", "")
retrieved = out.get("contexts", []) or out.get("retrieved", []) or []
return out, answer_text, retrieved

def _extract_retrieved_ids(items: List[Dict[str, Any]]) -> Set[str]:
ids: Set[str] = set()
for r in items:
cand = (
r.get("id")
or r.get("_id")
or r.get("pid")
or (r.get("metadata", {}) or {}).get("pid")
or (r.get("metadata", {}) or {}).get("id")
)
if cand is not None:
ids.add(str(cand))
return ids

def _extract_retrieved_texts(items: List[Dict[str, Any]]) -> List[str]:
out: List[str] = []
for r in items:
t = (
r.get("text")
or r.get("passage_text")
or r.get("content")
or (r.get("metadata", {}) or {}).get("passage_text")
)
if t:
out.append(_norm_text(str(t)))
return out

def _hit_by_text(positives: List[str], retrieved_texts: List[str]) -> int:
if not positives or not retrieved_texts:
return 0
corpus = " \n ".join(retrieved_texts)
for pos in positives:
p = _norm_text(pos)
if p and p in corpus:
return 1
return 0

@dataclass
class _Rates:
prompt: float
completion: float

#### model costs in
_MODEL_COSTS = {
"deepseek-ai/DeepSeek-V3.1": _Rates(prompt=0.55/1_000_000, completion=1.65/1_000_000),
"meta-llama/Llama-3.1-8B-Instruct": _Rates(prompt=0.22/1_000_000, completion=0.22/1_000_000),
"zai-org/GLM-4.5": _Rates(prompt=0.55/1_000_000, completion=2.00/1_000_000),
}

def _extract_usage(u: Any) -> Tuple[int, int]:
if not isinstance(u, dict):
return 0, 0
pt = u.get("prompt_tokens") or u.get("input_tokens") or 0
ct = u.get("completion_tokens") or u.get("output_tokens") or 0
try:
return int(pt), int(ct)
except Exception:
return 0, 0

def run_eval_for_combo(items, model_name: str, top_k: int, project_name: str):
weave.init(project_name)

eval_logger = EvaluationLogger(
model=f"{model_name}_top{top_k}",
dataset=f"msmarco_unique_{len(items)}"
)

rag = _init_rag(PINECONE_INDEX_NAME, model_name=model_name, top_k=top_k)
rates = _MODEL_COSTS.get(model_name)

seen = 0
answered = 0
agg_hit = 0
t0 = time.time()

# shuffle evaluation order deterministically
rng = random.Random(RANDOM_SEED)
items_shuffled = items[:]
rng.shuffle(items_shuffled)

for key, rec in items_shuffled:
qtext = rec["query_text"]
gt_ctx_list = rec["positives"]
gt_ctx = "\n\n".join(gt_ctx_list)
positive_ids = set(map(str, rec.get("positive_ids", [])))

tq = time.time()
try:
out, answer_text, retrieved = _generate_with_rag(rag, qtext, top_k)
usage_blob = out.get("usage", {}) or out.get("llm_usage", {}) or {}
except Exception as e:
answer_text = f"GENERATION_ERROR: {e}"
retrieved = []
usage_blob = {}
gen_latency = time.time() - tq

# judge
try:
j = llm_judge(qtext, gt_ctx, answer_text)
flag = int(j.get("answers_query", 0) or 0)
except Exception as e:
j = {"answers_query": 0, "error": str(e)}
flag = 0

# retrieval hit
retrieved_ids = _extract_retrieved_ids(retrieved)
retrieved_texts = _extract_retrieved_texts(retrieved)
hit_by_id = 1 if positive_ids and (positive_ids & retrieved_ids) else 0
retrieval_hit = hit_by_id or _hit_by_text(gt_ctx_list, retrieved_texts)

agg_hit += retrieval_hit
seen += 1
if flag == 1:
answered += 1

ptoks, ctoks = _extract_usage(usage_blob)
total_cost_usd = None
total_cost_us_cents = None

if rates:
total_cost_usd = ptoks * rates.prompt + ctoks * rates.completion
total_cost_us_cents = (total_cost_usd * 100) if total_cost_usd else 0

pred_inputs = {
"query": qtext,
"gold_context": gt_ctx_list,
}
pred_output = {
"answer": answer_text,
"retrieved": retrieved,
"judge": j,
"gen_latency_sec": round(gen_latency, 4),
"total_cost_usd": total_cost_usd,
"total_cost_us_cents": total_cost_us_cents,

"retrieval": {
"retrieval_hit": bool(retrieval_hit),
},
}
pred = eval_logger.log_prediction(inputs=pred_inputs, output=pred_output)

pred.log_score(scorer="correctness", score=bool(flag))
pred.log_score(scorer="total_cost_usd", score=(total_cost_usd or 0.0))
pred.log_score(scorer="avg_cost_us_cents", score=(total_cost_us_cents or 0))
pred.log_score(scorer="retrieval_hit", score=bool(retrieval_hit))
pred.finish()

if seen % 20 == 0:
ans_rate = answered / max(1, seen)
hit_rate = agg_hit / max(1, seen)
print(
f"model={model_name}|topk={top_k} | {seen} evaluated, "
f"judge rate {ans_rate:.3f} | hit@k {hit_rate:.3f}",
flush=True
)

dt = time.time() - t0
ans_rate = answered / max(1, seen)
hit_rate = agg_hit / max(1, seen)

eval_logger.log_summary()

print(
f"done model={model_name}|topk={top_k}. "
f"{seen} unique queries. judge rate {ans_rate:.3f} | hit@k {hit_rate:.3f}"
)

def sanitize(name: str) -> str:
return re.sub(r"[^A-Za-z0-9_.-]+", "_", name)

def main():
if not PASSAGES_PATH.exists():
raise FileNotFoundError(f"missing {PASSAGES_PATH}")

items = _build_pool_from_passages(PASSAGES_PATH)
print(f"loaded {len(items)} unique queries with positives from passages.jsonl")
if not items:
print("no positives found. check that passages.jsonl has is_selected == 1 rows")
return
if MAX_UNIQUE_QUERIES is not None:
items = items[:MAX_UNIQUE_QUERIES]

project = "rag-evals"
for model_name in MODEL_CANDIDATES:
for k in TOPK_CANDIDATES:
run_eval_for_combo(items, model_name=model_name, top_k=k, project_name=project)

if __name__ == "__main__":
main()
In addition to correctness and retrieval hits, the code logs latency for each query, counts tokens for both prompt and completion, and calculates cost using model-specific rates. These details are captured for every prediction and sent to Weave through the EvaluationLogger. At the end of each configuration, a full summary is written to Weave, giving a clear view of correctness, efficiency, and cost.
This evaluation makes the trade-offs clear. You can see if increasing top-k from 5 to 20 actually raises correctness or if it only increases cost and latency. You can also compare how different models handle the same retrieved context, revealing which ones generate more accurate answers for a given budget. Together with Pinecone for indexing and W&B Inference for generation, this evaluation closes the loop and turns the pipeline into a system that is measurable and directly comparable across design choices.
After running the script, you can navigate to Weave to visualize the performance of your models. Here are the results for my evaluation:


Results

The experiments varied both retrieval depth (top-k = 5, 10, 20) and model choice (zai-org/GLM-4.5, deepseek-ai/DeepSeek-V3.1, meta-llama/Llama-3.1-8B-Instruct). Retrieval hit rates scaled directly with k: 0.833 at top-k=5, 0.967 at top-k=10, and 1.000 at top-k=20. This shows that increasing k improves the likelihood of including a relevant passage, though it also raises cost and latency.
Correctness showed more variation across models and settings. At top-k=5, DeepSeek performed best with 0.767, followed by GLM-4.5 at 0.700 and Llama-3.1 at 0.567. At top-k=10, GLM-4.5 led with 0.767, DeepSeek scored 0.733, and Llama reached 0.633. At top-k=20, GLM-4.5 again led at 0.733, DeepSeek came in at 0.700, and Llama improved to 0.667.
Two configurations stand out: DeepSeek at top-k=5 (0.767) and GLM-4.5 at top-k=10 (0.767). This suggests different models thrive under different retrieval depths. DeepSeek is efficient with smaller, cleaner contexts, while GLM-4.5 benefits more from moderate breadth. At top-k=20, correctness dipped slightly, likely due to the model being distracted by irrelevant passages even though retrieval recall was perfect.
Cost scaling with retrieval depth is also clear in the results. As k increases, costs rise because more passages are retrieved and processed, leading to larger prompts and higher token usage. This creates a trade-off: larger k values improve recall but also increase cost and latency, while smaller k values are more efficient but risk missing relevant context.
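As a rough back-of-the-envelope illustration using the per-token rates hard-coded in the pipeline (the assumed passage length is a placeholder, not a measured value):

# Hypothetical: assume each retrieved MS MARCO passage adds about 100 prompt tokens.
TOKENS_PER_PASSAGE = 100
GLM_PROMPT_RATE = 0.55 / 1_000_000  # USD per prompt token for zai-org/GLM-4.5

for k in (5, 10, 20):
    context_tokens = k * TOKENS_PER_PASSAGE
    context_cost = context_tokens * GLM_PROMPT_RATE
    print(f"top_k={k:>2}: ~{context_tokens} context tokens -> ~${context_cost:.6f} per query")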
Overall, the results highlight the importance of balancing retrieval depth with model choice. DeepSeek shines at lean retrieval (k=5), GLM-4.5 peaks with moderate retrieval (k=10), and Llama improves steadily as k increases but lags behind the others. Larger k values guarantee coverage but also inflate costs, so the optimal configuration depends on whether the goal is maximum accuracy, minimum cost, or a balance of both.

The Weave Comparison View

Along with displaying aggregate scores, Weave lets you dig into individual predictions. You can click into a query, see the passages that were retrieved, the generated answer, the judgment from gpt-5, and the associated latency and cost. This traceability makes it easier to spot where a model missed relevant passages, produced an unsupported answer, or used more tokens than expected.

The comparison view also helps reveal strengths and weaknesses across models. For example, you might find that one model consistently answers more accurately when top-k is set to 10, while another requires k=20 to achieve similar performance but at higher cost. By inspecting detailed outputs, you can identify not just which configuration scores higher overall, but why one performs better on certain types of queries.
This kind of visibility is what makes Weave more than a logging tool. It becomes a way to reason about the entire pipeline, diagnose errors, and refine system design. With the dashboard in place, you can move beyond raw metrics and start understanding how retrieval and generation interact, guiding the choice of model and retrieval depth for production scenarios.

Conclusion

A well-built RAG pipeline does not just prove that retrieval plus generation works; it shows how design choices shape real performance. By experimenting with retrieval depth, model selection, and cost trade-offs, you begin to see the system less as a black box and more as a controllable machine. Pinecone, W&B Inference, and Weave together create a feedback loop where every query is both an answer and a data point. The conclusion is not that one model or one top-k setting is always best, but that the right combination depends on context, constraints, and goals. What matters is that you now have the tools to measure, compare, and refine, turning abstract ideas about RAG into a practical, evolving system.