Skip to main content

DSPy와 W&B Weave로 RAG 시스템 구축 및 평가

DSPy로 RAG 시스템을 구축하고 W&B Weave로 평가하는 가이드. 이 글은 AI 번역 기사입니다. 오역 가능성은 댓글로 알려주세요.
Created on September 12|Last edited on September 12
검색 증강 생성 (RAG) 외부 데이터 소스를 통합해 더 풍부하고 적합한 출력을 생성함으로써 언어 모델의 동작 방식을 혁신하고 있습니다. 이 튜토리얼에서는 프롬프트 엔지니어링의 복잡성을 단순화하도록 설계된 강력한 프레임워크인 DSPy를 사용해 RAG 애플리케이션을 구축하고 최적화하는 방법을 살펴봅니다.
이 글에서는 RAG와 DSPy를 각각 이해할 수 있도록 역할을 설명하고, 두 기술을 결합해 고성능 AI 시스템을 구축하는 방법을 안내합니다. 챗봇으로 최신 내부 문서를 유지 관리하거나 실시간 뉴스 데이터를 연동해 주식 시장 심리 분석에 활용하고자 한다면, DSPy를 익히는 것이 RAG 구현을 효율화하는 데 큰 도움이 됩니다. 또한 Weave 평가를 사용해 RAG 파이프라인의 성능을 측정하고 개선함으로써, 모델이 정확하고 신뢰할 수 있는 출력을 제공하도록 보장하겠습니다.
Jump to the tutorial




목차



검색 증강 생성(RAG) 이해하기

검색 증강 생성은 언어 모델이 실시간의 외부 정보를 검색해 응답에 반영할 수 있게 함으로써 모델의 역량을 강화합니다. 이 접근법은 정적 데이터셋에만 의존하는 사전 학습 모델의 한계를 보완하여, 출력이 일관성 있을 뿐만 아니라 최신성도 갖추도록 보장합니다. RAG는 검색과 생성을 결합하는 두 가지 핵심 과정으로 동작합니다.
먼저 외부 데이터베이스나 리포지토리에서 관련 정보를 검색하고, 이어서 검색된 콘텐츠를 바탕으로 출력을 생성합니다. 이 이중 과정은 금융 뉴스나 고객 지원 시스템처럼 시의성과 정확성이 필수적인 빠르게 변화하는 영역에서도 모델이 정확하고 관련성 있게 유지되도록 돕습니다.

RAG가 언어 모델을 향상시키는 방법

RAG는 언어 모델의 신뢰성과 활용도를 크게 향상시킵니다. 전통적인 모델은 종종 오래된 지식으로 인해 어려움을 겪고, 잘못된 정보를 생성하는 이른바 “환각”을 일으키기도 합니다. RAG는 추론 시 최신 데이터를 검색해 응답을 근거 자료에 기반하도록 함으로써 이러한 문제를 완화합니다.
예를 들어, RAG 기반 내부 챗봇은 직원들에게 최신 정책 정보를 제공할 수 있고, 뉴스 추적 시스템은 주식 시장 분석을 위해 실시간 감성 데이터를 추출할 수 있습니다. 이러한 이유로 RAG는 변화가 빠른 환경에서 특히 가치가 크며, AI 시스템에 대한 사용자 신뢰와 경험을 향상시킵니다.

DSPy란 무엇인가요?

DSPy는 프롬프트 엔지니어링을 구조화된 프로그래밍 방식으로 추상화하여 RAG 파이프라인 구현을 단순화하는 프레임워크입니다. 개발자는 모듈식 구성 요소를 사용해 작업 동작을 정의할 수 있으며, 이를 통해 RAG 시스템 관리의 복잡성을 줄이는 동시에 모델의 일관성과 효율을 높일 수 있습니다.

DSPy의 핵심 구성 요소

DSPy는 검색에서 생성에 이르는 데이터 흐름을 관리하기 위한 프레임워크를 제공합니다. 이 프레임워크에는 시그니처(Signatures), 모듈(Modules), 옵티마이저(Optimizers)와 같은 몇 가지 핵심 구성 요소가 포함됩니다.
시그니처 개발자가 작업의 입력과 출력을 명확하게 정의할 수 있게 하여, 모델이 일관되게 동작하도록 보장합니다. 다음은 코드에서 시그니처를 정의하는 방법의 예시입니다:
class GenerateAnswer(dspy.Signature):
"""Signature for generating answers based on retrieved context."""
context = dspy.InputField(desc="Relevant facts") # Input field for facts
question = dspy.InputField() # Input field for user questions
answer = dspy.OutputField(desc="Short fact-based answer") # Output field for the generated answer
이 예시에서 GenerateAnswer 시그니처는 모듈이 질문과 컨텍스트를 입력으로 받고, 짧은 답변을 출력으로 생성하도록 보장합니다. 이러한 구조는 RAG 파이프라인 전반에서 입력과 출력의 일관성을 보증합니다.
모듈 체인 오브 소트(Chain-of-Thought)나 ReAct와 같은 다양한 프롬프트 전략을 캡슐화하여 사용 사례에 따라 교체할 수 있습니다. 옵티마이저 미세 조정 시스템 전반을 대상으로 최소한의 데이터로도 지시문과 매개변수를 정교화하여 최적의 성능을 달성합니다. 다음은 RAG 모듈의 예시입니다:
class RAG(dspy.Module):
def __init__(self, num_passages=3):
super().__init__()
self.retrieve = dspy.Retrieve(k=num_passages) # Retrieve relevant content
self.generate_answer = dspy.ChainOfThought(GenerateAnswer) # Use the Chain-of-Thought method

def forward(self, question: str):
context = self.retrieve(question).passages # Fetch top-k relevant passages
prediction = self.generate_answer(context=context, question=question) # Generate answer
return dspy.Prediction(context=context, answer=prediction.answer)
이 모듈은 데이터베이스에서 관련 구절을 검색하고 Chain-of-Thought 전략을 사용해 응답을 생성합니다. 검색과 응답 생성을 조율하는 데 필요한 로직을 캡슐화합니다.
DSPy 옵티마이저 는 DSPy 프로그램 내의 다양한 매개변수를 특정 지표에 따라 미세 조정하여 작업 성능을 향상시키도록 설계되었습니다. 이러한 최적화는 정확도나 재현율과 같은 지표로 측정되는 오류 최소화 또는 성능 극대화에 초점을 맞춥니다. 과정은 지시형 프롬프트뿐 아니라 가능한 경우 기반 언어 모델의 매개변수까지 반복적으로 조정하는 것을 포함합니다. 이 다단계 최적화는 가중치 수정을 위한 경사 하강법과 같은 방법은 물론, 작업 데모와 프롬프트 구조의 이산적 튜닝을 수반할 수 있습니다. 이러한 자동화된 개선은 효과적인 프롬프트를 보다 효율적으로 생성하게 하며, 수작업으로는 재현하기 어려운 성과를 종종 달성합니다.
DSPy 프로그램은 여러 모듈로 구성되며, 각 모듈은 작업 흐름에서 특정 로직을 담당합니다. 이 프레임워크의 옵티마이저는 전통적인 딥러닝 시스템에서 옵티마이저가 모델의 매개변수를 최적화하듯, 시스템의 매개변수를 최적화하는 것을 목표로 합니다. 로컬 모델처럼 가중치 수정이 가능한 경우, 옵티마이저는 언어 모델의 내부 가중치를 조정하여 작업 성과를 개선할 수 있습니다. API를 통해 접근하는 모델이거나 가중치 튜닝이 불가능한 경우에는 지시형 프롬프트를 정교화하고 작업 특화 데모를 최적화하는 데 초점을 맞춥니다. 모델의 응답을 유도하는 예시로서의 이러한 데모는 반복적 최적화를 통해 생성되고 개선되며, 작업 요구사항에 지속적으로 부합하도록 유지되고 다양한 입력 간의 일관성을 높이도록 설계됩니다.
DSPy 옵티마이저는 프로그램의 구조와 모듈 간 상호작용을 정교화하는 데 큰 역할을 합니다. 일관성을 높이기 위해 프롬프트와 지시문을 자주 조정하여 모델이 입력을 의도대로 해석하도록 보장합니다. 데몬스트레이션이 포함된 경우, 옵티마이저는 원하는 동작을 강화하는 예시를 선택하거나 생성하고, 이를 사전에 정의된 지표로 검증합니다. 가중치 튜닝이 가능한 시나리오에서는, 옵티마이저가 최적화된 프롬프트 구조에 맞추어 모델의 내부 파라미터를 직접 업데이트합니다. 프롬프트 최적화, 작업 데몬스트레이션 정교화, 선택적 가중치 수정이 결합된 이 접근은 사전 학습 모델, 독점 솔루션, API 기반 시스템 등 다양한 요구에 적응하는 유연한 프레임워크를 제공합니다.
각 최적화 단계는 진행 상황을 측정하기 위해 지표에 크게 의존합니다. 이러한 지표는 단순한 정확도 점수부터, 작업 수행 정도를 평가하는 복잡한 로직까지 다양합니다. 반복적인 과정을 통해 DSPy 옵티마이저는 전체 파이프라인의 효율을 높여 오류를 줄이고 출력의 관련성과 품질을 개선합니다. 모듈 간 상호작용을 최적화하든, 프롬프트를 조정하든, 가중치를 직접 미세 조정하든, DSPy는 고성능 언어 모델 애플리케이션을 구축하기 위한 견고한 프레임워크를 제공합니다.

DSPy와 W&B Weave 사용하기

DSPy는 다음과 매끄럽게 통합됩니다 Weave, Weights & Biases의 경량형 툴킷으로, 최소한의 설정만으로도 언어 모델 애플리케이션을 추적하고 평가할 수 있게 해줍니다. 이 통합을 통해 DSPy RAG 시스템 내의 모든 상호작용—모델 호출, 입력, 출력, 트레이스—이 자동으로 로깅되어 워크플로 전체를 한눈에 파악할 수 있습니다. 이는 본질적으로 실험적인 AI 시스템 개발 과정에 구조와 가시성을 부여하여, 불필요한 오버헤드를 추가하지 않고도 효율적으로 디버깅하고 견고한 평가를 구축할 수 있도록 돕습니다.
Weave와 함께 DSPy를 설정하려면 Weave 라이브러리를 임포트하고 프로젝트 내에서 초기화하기만 하면 됩니다. 다음을 호출하여 weave.init("project_name") DSPy 프로그램 시작 시점에 Weave를 초기화하면, Weave가 RAG 파이프라인 내 모든 작업을 자동으로 수집하고 로깅합니다. 여기에는 모듈 간 모든 상호작용, 리트리벌과 생성 과정, 그리고 입력과 출력에 대한 상세 트레이스가 포함됩니다. 이 통합을 통해 실험, 평가, 프로덕션의 모든 단계가 한곳에서 체계적으로 정리되고 접근 가능해져, 원활한 반복 개선과 진행 상황 추적을 지원합니다.
Weave를 DSPy에 통합하면 모델의 동작을 실시간으로 모니터링하고, 문제를 더 효과적으로 디버깅하며, 서로 다른 모델이나 구성의 성능을 비교해 엄격한 평가를 구축할 수 있습니다. Weave를 임포트하고 초기화한 뒤 DSPy RAG 파이프라인을 실행하면, Weave에서 시스템 성능을 열람하고 시각화할 수 있습니다.
제가 만든 RAG 시스템 중 하나에서 나온 로그 예시는 다음과 같습니다:

보시듯이 DSPy 호출의 전체 시퀀스가 Weave에 로깅되어, 시스템이 어떻게 동작하는지 손쉽게 분석할 수 있습니다!

DSPy로 RAG 파이프라인 구축하기

DSPy로 효과적인 RAG 파이프라인을 구축하려면 리트리벌 모델과 언어 모델을 모두 일관되게 작동하도록 구성해야 합니다. 언어 모델과 달리, DSPy에는 기본 제공되는 벡터 데이터베이스 리트리버 인프라가 포함되어 있지 않습니다. 따라서 API를 통해 외부 벡터 데이터베이스에 연결해야 합니다.
Weaviate는 강력한 벡터 검색 엔진으로, 이 인프라를 구축하는 데 훌륭한 선택지입니다. DSPy 내 리트리벌 모듈로 Weaviate를 구성하여 쿼리에 대한 관련 문단을 가져올 수 있습니다.

RAG 파이프라인을 위한 Weaviate 설정하기

Weaviate를 시작하려면 계정을 만들고 클러스터를 설정해야 합니다. Weaviate는 실험에 적합한 14일 무료 체험을 제공합니다. 클러스터 설정을 마치면 DSPy 파이프라인을 Weaviate 인스턴스에 연결하기 위한 클러스터 URL이 필요합니다. Weaviate 계정을 만든 뒤 “Create a Cluster”로 이동하면 아래에 표시된 화면으로 안내되며, 여기에서 “Sandbox” 클러스터를 생성할 수 있습니다.


Weaviate에 문서 업로드하기

이 튜토리얼에서는 업로드할 데이터로 W&B Weave 문서의 일부를 사용합니다. 아래 코드는 문서를 다운로드한 뒤 더 작은 청크로 분할하고, Cohere 임베딩을 사용하여 사용자의 Weaviate 클러스터에 업로드합니다.
또한 Weaviate 내에서 임베딩 모듈로 Cohere를 구성합니다. Cohere는 강력한 다국어 임베딩을 제공하며, 텍스트를 고밀도 벡터 표현으로 변환하는 데 적합합니다. 이러한 임베딩을 통해 벡터 데이터베이스는 의미적 유사성에 기반해 텍스트를 저장하고 검색할 수 있어, RAG 파이프라인이 가장 관련성 높은 정보를 가져오도록 보장합니다.
import os
import re
import glob
import weaviate
import weaviate.classes.config as wvcc

# Function to download the Weave documentation .md files
def download_weave_docs():
repo_dir = "weave_docs"
if not os.path.exists(repo_dir):
print("Downloading Weave documentation...")
os.system(f"git clone --depth 1 --filter=blob:none --sparse https://github.com/wandb/weave.git {repo_dir}")
os.chdir(repo_dir)
os.system("git sparse-checkout set docs/docs/guides/tracking")
os.chdir("..")
print("Weave documentation downloaded successfully.")
else:
print(f"{repo_dir} already exists, skipping download.")

# Function to read and process the .md files
def process_md_files():
md_files = glob.glob('weave_docs/docs/docs/guides/tracking/*.md')
all_chunks = []

for md_file in md_files:
with open(md_file, 'r', encoding='utf-8') as f:
content = f.read()
# Clean the content if necessary
content = clean_markdown(content)
# Split content into sentences or paragraphs
chunks = split_into_chunks(content, chunk_size=200)
all_chunks.extend(chunks)
return all_chunks

# Function to clean markdown content
def clean_markdown(text):
"""Remove unnecessary elements but retain code, symbols, and structure."""
text = re.sub(r'!\[.*?\]\(.*?\)', '', text) # Remove images
text = re.sub(r'\[([^\]]+)\]\((.*?)\)', r'\1', text) # Keep link text but remove URLs
text = text.strip()
return text

# Function to split text into chunks of specified character length
def split_into_chunks(text, chunk_size=200):
words = text.split()
chunks = []
current_chunk = []
current_length = 0

for word in words:
current_chunk.append(word)
current_length += len(word) + 1 # +1 for space
if current_length >= chunk_size:
chunks.append(' '.join(current_chunk))
current_chunk = []
current_length = 0

if current_chunk:
chunks.append(' '.join(current_chunk))

return chunks

# Main execution
if __name__ == "__main__":
# Step 1: Download Weave documentation
download_weave_docs()

# Step 2: Process the .md files
print("Processing .md files...")
content_chunks = process_md_files()
print(f"Total content chunks: {len(content_chunks)}")


# Step 3: Connect to Weaviate instance
client = weaviate.connect_to_wcs(
cluster_url="your weaviate cluster url ", # Replace with your WCS URL
auth_credentials=weaviate.auth.AuthApiKey("your weaviate auth key"), # Replace with your WCS key
headers={
'X-Cohere-Api-Key': "your cohere api key" # Replace with your Cohere API key
}
)

try:
# CAUTION: Running this will delete the collection along with the objects
# Delete the collection if it exists
collection_name = "WeaveDocsChunk"
if client.collections.exists(collection_name):
client.collections.delete(collection_name)
print(f"Existing {collection_name} collection deleted.")

# Create the collection with the specified vectorizer configuration and properties
collection = client.collections.create(
name=collection_name,
vectorizer_config=wvcc.Configure.Vectorizer.text2vec_cohere(
model="embed-multilingual-v3.0"
),
properties=[
wvcc.Property(name="content", data_type=wvcc.DataType.TEXT),
wvcc.Property(name="source", data_type=wvcc.DataType.TEXT),
]
)
print(f"{collection_name} collection created successfully.")

# Import Objects
collection_ref = client.collections.get(collection_name)

for idx, content_chunk in enumerate(content_chunks):
collection_ref.data.insert(
properties={
"content": content_chunk,
"source": "WeaveDocs"
}
)
print(f"Uploaded chunk {idx + 1}/{len(content_chunks)}")

print("Content chunks successfully imported into Weaviate!")

finally:
# Close the client connection
client.close()
print("Client connection closed.")
이 예시에서는 Cohere의 text2vec 문서 청크를 의미 있는 벡터 표현으로 변환하기 위한 임베딩입니다. 이러한 임베딩을 통해 Weaviate는 청크를 유사도 기반 검색이 효율적으로 가능하도록 저장할 수 있습니다. 이 설정은 쿼리 시 RAG 파이프라인이 맥락적으로 가장 관련성 높은 정보를 검색하도록 보장합니다.
이 리트리벌 모델로 DSPy를 구성하면 파이프라인이 Weaviate와 Cohere를 함께 활용해 관련 문서를 동적으로 가져올 수 있습니다. 문서 업로드를 마쳤다면, 다음 단계는 언어 모델과 리트리버가 자연스럽게 연동되도록 DSPy 설정을 구성하는 것입니다. 이 강력한 조합을 통해 파이프라인은 최신 데이터에 기반한 정확하고 맥락 인지적인 응답을 제공합니다.

DSPy로 RAG 파이프라인 구축하기

이제 Weaviate로 벡터 데이터베이스를 만들었으니, DSPy로 RAG 파이프라인을 구축할 준비가 되었습니다.
먼저 Weaviate의 API와 통합되는 사용자 지정 리트리벌 모델을 구성합니다. DSPy는 ColBERTv2, Pinecone, Azure Cognitive Search 같은 기본 제공 리트리벌 모듈을 제공하지만, 사용자 지정 리트리벌 클라이언트를 직접 구현할 수도 있습니다. 이러한 클라이언트는 쿼리를 처리하고 각 쿼리와 관련성이 높은 상위 k개의 패시지를 반환하는 역할을 하며, 모든 RAG 파이프라인에서 핵심 구성 요소입니다. 아래는 Weaviate로 사용자 지정 리트리벌 모델을 구현하는 방법입니다.
먼저 Weaviate 클라이언트를 초기화하고, 사용자의 Weaviate 클라우드 인스턴스에 연결되도록 구성합니다. 이 클라이언트는 지정된 컬렉션에서 관련 데이터를 검색하도록 RAG 시스템을 활성화합니다:
import weaviate
import weave; weave.init("dspy-inference")

# Connect to Weaviate cloud instance
client = weaviate.connect_to_wcs(
cluster_url="your-cluster-url", # Replace with your WCS URL
auth_credentials=weaviate.auth.AuthApiKey("your-api-key"), # Replace with your WCS API key
headers={'X-Cohere-Api-Key': "your-api-key"} # Replace with your Cohere API key
)

# Create a WeaviateRM client for retrieval
retriever_model = WeaviateRM(
weaviate_collection_name="WeaveDocsChunk",
weaviate_client=client,
k=5
)
여기에서, WeaviateRM 모듈은 리트리버 역할을 하여 벡터 데이터베이스에 접근하고 주어진 쿼리에 대해 상위 k개의 패시지를 가져옵니다. 이 모듈은 언어 모델과 함께 RAG 파이프라인의 핵심을 이룹니다. 이제 언어 모델을 구성해 보겠습니다. 예시로 GPT-4 mini를 사용하겠습니다:
# Configure Language Model
llm = dspy.OpenAI(
model='gpt-4o-mini',
api_key="your-api-key"
)
언어 모델과 리트리벌 모델을 모두 준비했으면, DSPy 설정을 통해 두 구성 요소를 연결합니다. 이 구성은 리트리버가 관련 컨텍스트를 가져오고 언어 모델이 최종 답변을 생성하도록 두 요소가 동기화되어 작동하게 합니다:
# Configure dspy settings
dspy.settings.configure(lm=llm, rm=retriever_model)
이제 리트리벌과 언어 모델 구성을 효과적으로 활용하기 위해 시그니처와 RAG 모듈을 정의해 보겠습니다:
class GenerateAnswer(dspy.Signature):
"""Answer questions with short factoid answers."""
context = dspy.InputField(desc="Relevant facts")
question = dspy.InputField()
answer = dspy.OutputField(desc="Short answer")

class RAG(dspy.Module):
def __init__(self, num_passages=3):
super().__init__()
self.retrieve = dspy.Retrieve(k=num_passages)
self.generate_answer = dspy.ChainOfThought(GenerateAnswer)

def forward(self, question: str):
# Retrieve relevant passages
context = self.retrieve(question).passages
# Generate an answer using the retrieved context
prediction = self.generate_answer(context=context, question=question)
return dspy.Prediction(context=context, answer=prediction.answer)
이 설정은 검색된 컨텍스트를 바탕으로 답변을 생성할 때 포함되는 필드를 명시하는 GenerateAnswer 시그니처를 정의합니다. RAG 모듈은 사용자 지정 Weaviate 리트리버로 데이터를 검색하는 단계에서 구성된 GPT-4 모델로 답변을 생성하는 단계까지의 흐름을 오케스트레이션합니다. 이러한 구조를 통해 RAG 파이프라인은 관련 정보를 가져오는 것부터 정확한 답변을 제공하는 것까지 전체 사이클을 수행할 수 있습니다.
Weaviate를 리트리버로, GPT-4 mini 같은 언어 모델과 결합하면 관련 정보를 검색하고 정확하며 문맥을 이해한 응답을 생성할 수 있는 RAG 파이프라인을 구축할 수 있습니다. DSPy의 유연한 구성 방식은 리트리버나 언어 모델을 손쉽게 교체할 수 있게 해 주어, 핵심 로직을 다시 손대지 않고도 다양한 설정을 실험할 수 있게 합니다.

RAG 프로그램 컴파일 및 실행

이제 DSPy를 사용해 Retrieval-Augmented Generation(RAG) 파이프라인을 컴파일하는 과정을 단계별로 살펴보겠습니다. 옵티마이저가 프로그램 성능을 효과적으로 개선하려면 학습용 데이터셋과 테스트용 데이터셋이 모두 필요합니다. 학습 세트는 옵티마이저가 파라미터를 미세 조정하는 데 도움을 주고, 테스트 세트는 시스템이 보지 못한 데이터에 얼마나 잘 일반화하는지 평가합니다. 이 튜토리얼에서는 데이터셋을 학습 세트와 평가 세트로 분할하고, 서식화된 예시들을 통해 옵티마이저의 탐색을 안내합니다. 아래에는 RAG 파이프라인을 효율적으로 구축하고 컴파일하는 데 도움이 되는 전체 설정 개요가 제공됩니다.
데이터셋은 다음 기준으로 분할됩니다 train_test_split 학습용 20개 예시와 테스트용 10개 예시로 분할합니다. 각 예시는 질문과 기준 답변을 포함합니다. 이러한 구조화된 형식은 옵티마이저가 예측을 정답과 효과적으로 비교·평가할 수 있도록 보장합니다. 또한 우리는 다음을 정의합니다 GenerateAnswer 시그니처로, 입력 질문과 컨텍스트 같은 예상 입력과 기대되는 짧은 답변 등 입력-출력 구조를 명시합니다. 이는 RAG 모듈이 관련 콘텐츠를 검색하고 일관된 응답을 생성하도록 안내합니다.
RAG 파이프라인은 다음을 정의하여 구축됩니다 RAG 모듈로, 검색과 생성 로직을 통합합니다. 검색 구성 요소는 입력 질문을 바탕으로 가장 관련성 높은 구절들을 가져오고, 생성 구성 요소는 검색된 컨텍스트를 활용해 답변을 생성합니다. 파이프라인 최적화를 보장하기 위해 사실 정확도를 평가하는 사용자 지정 지표를 도입합니다. 이 지표는 생성된 응답을 기준 답변과 비교해 사실적 정확성을 판정함으로써, 모델이 신뢰할 수 있는 정보를 출력하도록 합니다.
옵티마이저를 컴파일하는 코드는 다음과 같습니다:
# Step 1: Import necessary libraries and modules
import dspy
from dspy.teleprompt import BootstrapFewShot
from dspy.evaluate.evaluate import Evaluate
from sklearn.model_selection import train_test_split
import pandas as pd
import random
import weave
import weaviate
from dspy.retrieve.weaviate_rm import WeaviateRM
from dspy.primitives.example import Example

# Import Ragas metric for factual correctness
from ragas.llms import LangchainLLMWrapper
from ragas.dataset_schema import SingleTurnSample
from ragas.metrics._factual_correctness import FactualCorrectness
from langchain_openai import ChatOpenAI
import asyncio
import nest_asyncio

# Apply nest_asyncio to avoid event loop conflicts in Weave
nest_asyncio.apply()


weave.init(project_name="dspy")
SEED = 42
random.seed(SEED)

dataset_path = './generated_testset.csv'

# Step 2: Configure the Language Model (LM) and Retrieval Model (RM)
llm = dspy.OpenAI(model='gpt-4o-mini', api_key="sk-")

client = weaviate.connect_to_wcs(
cluster_url="", # Replace with your WCS URL
auth_credentials=weaviate.auth.AuthApiKey(""), # Replace with your WCS key
headers={
'X-Cohere-Api-Key': "" # Replace with your Cohere API key
}
)

retriever_model = WeaviateRM(
weaviate_collection_name="WeaviateBlogChunk", # Use 'class_name' instead of 'collection_name'
weaviate_client=client,
k=5 # Number of top results to retrieve
)

dspy.settings.configure(lm=llm, rm=retriever_model)

# Initialize the evaluator LLM using Langchain and OpenAI GPT-4 model
evaluator_llm = LangchainLLMWrapper(
ChatOpenAI(model="gpt-4o-mini", openai_api_key="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")
)

# Step 3: Load and Format Your Custom Dataset
df = pd.read_csv("/Users/brettyoung/Desktop/dev_24/tutorials/ds_py/v2/generated_testset.csv")

def format_dataset(df):
"""Format the dataset into DSPy-compatible examples with initialized inputs."""
examples = []
for _, row in df.iterrows():
example = Example({
'question': row['user_input'],
'answer': row['reference']
}).with_inputs('question') # Input key as 'question'
examples.append(example)
return examples

# Split the dataset into training and evaluation sets
train_df, eval_df = train_test_split(df, train_size=20, test_size=10, random_state=SEED)

trainset = format_dataset(train_df)
devset = format_dataset(eval_df)

print(f"Trainset Size: {len(trainset)}, Devset Size: {len(devset)}")
print(f"First Trainset Example: {trainset[0]}")

# Step 4: Define the Signatures for the RAG Pipeline
class GenerateAnswer(dspy.Signature):
"""Answer questions with 1-3 sentence answers."""
context = dspy.InputField(desc="may contain relevant facts")
question = dspy.InputField()
answer = dspy.OutputField(desc="Answer in 1-3 sentences")

# Step 5: Build the RAG Pipeline
class RAG(dspy.Module):
def __init__(self, num_passages=5):
super().__init__()
self.retrieve = dspy.Retrieve(k=num_passages)
self.generate_answer = dspy.ChainOfThought(GenerateAnswer)

def forward(self, question):
context = self.retrieve(question).passages
prediction = self.generate_answer(context=context, question=question)
return dspy.Prediction(context=context, answer=prediction.answer)

# Step 6: Define the Factual Correctness Metric
def factual_correctness_metric(example, pred, trace=None):
"""Use Ragas factual correctness metric."""
# Extract response and reference correctly
response = pred.answer # The model-generated answer
reference = example.answer # The ground truth reference

# Create the sample in the correct format
sample = SingleTurnSample(
response=response,
reference=reference
)

# Initialize and calculate the factual correctness score
factual_correctness_metric = FactualCorrectness(llm=evaluator_llm)

loop = asyncio.get_event_loop()
score = loop.run_until_complete(factual_correctness_metric.single_turn_ascore(sample=sample))

score = factual_correctness_metric.single_turn_score(sample=sample)
print("#"*20); print(score); print("#"*20)
# Return numerical score (0-1 range) for optimization
return score

# Step 7: Compile the RAG Program with the Custom Metric and Teleprompter
teleprompter = BootstrapFewShot(
metric=factual_correctness_metric, # Use the factual correctness metric
metric_threshold=0.7 # Accept scores >= 0.5
)

compiled_rag = teleprompter.compile(RAG(), trainset=trainset)

# Optional: Save the compiled RAG program
save_path = './compiled_rag_program_v1.json'
compiled_rag.save(save_path)

print(f"RAG Program compiled and saved to {save_path}.")
컴파일 과정은 BootstrapFewShot 옵티마이저로, 사실 정확도 지표를 활용해 최적화 과정을 안내합니다. 옵티마이저는 반복 평가를 수행하며, 점수가 0.75 임계값을 넘는 예측만 채택해 파이프라인을 정교화하고 성능을 극대화합니다. 컴파일이 끝나면 RAG 프로그램을 향후 사용을 위해 저장하여, 전체 모델을 재학습하지 않고도 시스템을 추가로 개선하거나 배포할 수 있도록 합니다.
이 설정이 완료되면 RAG 파이프라인은 정확하고 컨텍스트를 반영한 응답을 생성할 준비가 됩니다. 체계적으로 구성된 시그니처, 최적화된 검색 및 생성 모듈, 신뢰할 수 있는 평가 지표의 조합이 파이프라인의 효과적인 성능을 보장합니다. 컴파일된 버전은 다음으로 저장됩니다 compiled_rag_program_v1.json, 향후 반복에서 손쉬운 재배포와 추가 최적화를 가능하게 합니다.

Weave 평가로 성능 평가하기

이 섹션에서는 다음을 사용하여 우리의 RAG 파이프라인 성능을 평가합니다 Weave 평가이를 통해 사실 정확성과 같은 핵심 지표를 바탕으로 파이프라인의 효과성을 측정할 수 있습니다. 테스트 데이터셋을 사용해 기대 출력과 RAG 시스템이 생성한 출력을 비교함으로써, 파이프라인이 의도한 대로 동작하고 고품질의 응답을 제공하는지 확인합니다.
다음은 평가 프로세스를 설정하고 실행하기 위한 전체 코드입니다. 여기에는 데이터셋 로딩, 평가용 포맷팅, 그리고 Weave와 RAGAS 지표를 사용해 필요한 평가 함수들을 정의하는 작업이 포함됩니다.
import os
import asyncio
import pandas as pd
import random
import weave
from weave import Evaluation, Model

# Import your RAG pipeline components
from dspy.primitives.example import Example
import dspy
from dspy.datasets import HotPotQA
from dspy.teleprompt import BootstrapFewShot
from dspy.evaluate.evaluate import Evaluate
from sklearn.model_selection import train_test_split
import weaviate
from dspy.retrieve.weaviate_rm import WeaviateRM

# Import RAGAS metrics and related classes
from ragas.llms import LangchainLLMWrapper
from langchain.chat_models import ChatOpenAI
from ragas.dataset_schema import SingleTurnSample
from ragas.metrics import FactualCorrectness
from weave.trace.box import unbox

import nest_asyncio
import math


import weave; weave.init("ragas_eval")
# Seed for reproducibility
SEED = 42
random.seed(SEED)

# Apply nest_asyncio to avoid event loop conflicts in Weave
nest_asyncio.apply()

# Configure Language Model (LM) and Retriever Model (RM)
turbo = dspy.OpenAI(model='gpt-4o-mini', api_key="your api key")

client = weaviate.connect_to_wcs(
cluster_url="your cluser url",
auth_credentials=weaviate.auth.AuthApiKey("your weaviate api key"),
headers={'X-Cohere-Api-Key': "your cohere api key"}
)

retriever_model = WeaviateRM(
weaviate_collection_name="WeaveDocsChunk",
weaviate_client=client,
k=5
)

# Configure DSPy settings
dspy.settings.configure(lm=turbo, rm=retriever_model)


class GenerateAnswer(dspy.Signature):
"""Answer questions with short factoid answers."""
context = dspy.InputField(desc="may contain relevant facts")
question = dspy.InputField()
answer = dspy.OutputField(desc="Answer in 1-3 sentences")


class RAG(dspy.Module):
def __init__(self, num_passages=5):
super().__init__()
self.retrieve = dspy.Retrieve(k=num_passages)
self.generate_answer = dspy.ChainOfThought(GenerateAnswer)

def forward(self, question: str):
context = self.retrieve(question).passages
if isinstance(question, weave.trace.box.BoxedStr):
question = unbox(question)

prediction = self.generate_answer(context=context, question=question)
return dspy.Prediction(context=context, answer=prediction.answer)


class RAGModel(Model):
model_name: str
rag_pipeline: RAG

@weave.op
def predict(self, question: str):
print(rag_pipeline)
pred = self.rag_pipeline(question)
return {
"output": pred.answer,
"input": question,
"retrieved_contexts": pred.context
}


def safe_score(score):
"""Helper function to return 0 if the score is NaN or None."""
if score is None or (isinstance(score, float) and math.isnan(score)):
return 0
return score

@weave.op
def ragas_factual_correctness_score(expected: str, question: str, model_output: dict) -> dict:
if not model_output:
print("Model output is None.")
return {'factual_correctness_score': 0}

sample = SingleTurnSample(
response=model_output.get('output', "No output."),
reference=expected
)

evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini"))
factual_correctness_metric = FactualCorrectness(llm=evaluator_llm)

loop = asyncio.get_event_loop()
score = loop.run_until_complete(factual_correctness_metric.single_turn_ascore(sample=sample))

return {'factual_correctness_score': safe_score(score)}


def format_dataset(df):
"""Format the dataset into a list of dictionaries for Weave evaluation."""
evaluation_dataset = []
for _, row in df.iterrows():
entry = {
"question": row['user_input'],
"expected": row['reference']
}
evaluation_dataset.append(entry)
return evaluation_dataset

if __name__ == "__main__":
dataset_path = './generated_testset.csv'
df = pd.read_csv(dataset_path)
eval_data = format_dataset(df)

save_path = './compiled_rag_program_v1.json'
rag_pipeline = RAG()
rag_pipeline.load(path=save_path)
rag_model = RAGModel(model_name='RAG Model', rag_pipeline=rag_pipeline)

evaluation = Evaluation(dataset=eval_data, scorers=[ragas_factual_correctness_score])

print("Starting Weave evaluation...")
asyncio.run(evaluation.evaluate(rag_model))
파이프라인 설정은 검색과 생성의 흐름을 통해 …을(를) 정의합니다 RAG 클래스, RAGModel 예측의 진입점 역할을 하는 클래스. The format_dataset 함수는 테스트 데이터셋을 평가에 필요한 구조화된 형식으로 변환합니다. 평가 로직에는 RAGAS 라이브러리의 사실 정확성 지표를 사용하여, 모델 출력이 기대 정답과 일치하는지 비교해 사실 일관성을 확인합니다. 그런 다음 포맷된 데이터셋과 스코어러로 Evaluation 객체를 초기화하고, 비동기 방식으로 평가를 실행하여 모델 성능을 측정합니다.
이 스크립트를 실행하면 Weave 비교 대시보드에서 결과를 시각화할 수 있습니다. 이 대시보드는 두 모델의 토큰 정확도와 전체 정확도와 같은 핵심 지표를 나란히 비교하여, 성능을 쉽게 분석할 수 있도록 제공합니다.
Weights & Biases 프로젝트에서 평가 실행을 선택한 뒤 ‘Compare’ 버튼을 클릭하면, 평가에서 모델이 어떻게 성능을 보였는지에 대한 상세한 시각화를 생성할 수 있습니다. 이 비교는 모델이 잘 수행하는 영역과 추가 최적화가 필요한 영역을 식별하는 데 도움을 줍니다.
평가를 실행한 뒤 Weave 내에서 보이는 화면의 스크린샷은 다음과 같습니다.

이번 평가의 결과는 사실 정확성 측면에서 RAG 파이프라인이 얼마나 잘 작동하는지를 보여줍니다. 필요하다면, 사용 사례에 따라 충실도나 관련성과 같은 추가 지표를 도입해 평가 과정을 보완할 수 있습니다.

결론

이 튜토리얼은 외부 데이터 소스를 결합해 응답의 관련성, 정확성, 최신성을 보장함으로써 RAG가 언어 모델을 어떻게 개선하는지 보여주었습니다. 또한 DSPy가 프롬프트 엔지니어링을 구조화된 방식으로 제공하여 RAG 파이프라인의 생성과 최적화를 단순화하고, 복잡한 AI 워크플로를 더 쉽게 구축·유지·개선할 수 있게 하는 방법을 살펴보았습니다. 아울러 Weave 평가를 통합해 파이프라인의 성능을 모니터링하고 측정함으로써, 시스템이 신뢰할 수 있는 출력을 생성하고 오래된 정보나 환각과 같은 오류를 최소화하도록 보장했습니다.
DSPy의 유연성과 Weave의 평가 기능을 결합하면 고성능 RAG 시스템을 효율적으로 구축하고 정교하게 튜닝할 수 있습니다. 챗봇을 통해 문서를 자동화해야 하든, 금융 데이터에서 실시간 감정을 추출해야 하든, 이 도구들을 숙달하면 AI 솔루션이 기능적이면서 신뢰할 수 있고, 비즈니스의 변화하는 요구에 맞춰 유연하게 적응할 수 있습니다!



이 글은 AI로 번역된 기사입니다. 오역이 의심되면 댓글로 알려 주세요. 원문 보고서는 아래 링크에서 확인할 수 있습니다: 원문 보고서 보기