Building a financial agentic RAG pipeline (Part 1)
A comprehensive guide to optimized retrieval with Qdrant, agentic RAG orchestration via LangGraph, and observability with Weave.
Created on November 10|Last edited on November 20
Comment
Financial institutions process thousands of documents daily. From SEC filings and earnings reports to compliance documents and market analyses, each can contain critical information that drives investment decisions. Yet, most RAG implementations fail at scale, often choking on memory constraints, returning irrelevant results, or lacking the sophistication to route complex financial queries effectively. An analyst asking about "Netflix's subscriber growth drivers in APAC" shouldn't receive generic revenue figures or outdated web results.
This article presents a field-tested architecture for production-grade financial document processing, addressing the real challenges teams face when moving beyond proof-of-concept. We'll build a system that reduces memory usage by 32x through binary quantization, combines semantic and keyword search for superior retrieval, and intelligently routes queries to specialized tools, all while maintaining complete observability for debugging and optimization.
By the end, you'll have a working implementation that can efficiently handle millions of financial documents, provide accurate answers with proper citations, and scale to meet the demands of enterprises.
Let's get started.
What we'll be covering:
What is retrieval augmented generation?Performance optimization through binary quantizationHow does binary quantization work?Hybrid search with reciprocal rank fusionEfficient routing system using LangGraph state managementObservability matters: Trace and monitor your app using W&B WeaveStep-by-Step Code ImplementationPart 0: Initial setupPart 1: Offline document processingPart-2: Agentic RAG orchestrationConclusionSources
What is retrieval augmented generation?
Retrieval augmented generation (RAG) has become fundamental to production LLM applications. Here's the core concept: when a financial analyst is asked, "What did Netflix say about supply chain costs in their 2024 10-K filing?", they don't guess, they locate the document, find the relevant section, and provide an evidence-based answer. That's precisely what RAG systems do programmatically.

Two major components in RAG
RAG systems operate through two distinct phases:
- Retrieval: The system searches through a vectorized knowledge base to find the most relevant information chunks for a given query. These chunks could come from SEC filings, PDFs, databases, or internal documents. The retrieval phase uses similarity search algorithms to identify and rank relevant content.
- Generation: The retrieved context is combined with the user's query and passed to an LLM. The model generates a response grounded in the retrieved information, following a predefined prompt template that ensures consistency in tone and structure.
Performance optimization through binary quantization
When building vector databases for financial documents, storage and memory requirements quickly become prohibitive. Standard embeddings use 32-bit floating-point numbers, meaning a single 1536-dimensional vector requires approximately 6 KB. Scale this to millions of documents, and you're managing terabytes of data, along with the corresponding infrastructure costs.
Traditional embeddings use 32-bit floating point numbers, so just one 1536-dimensional vector already takes over 6 kilobytes. Scale that across millions of entries, and you're looking at gigabytes or even terabytes of memory. This makes real-time search slow and infrastructure expensive.
Binary quantization addresses this challenge by dramatically reducing vector storage requirements while maintaining search quality.
How does binary quantization work?
Binary quantization converts each vector dimension to a single bit: positive values become 1, zero or negative values become 0. This creates a binary representation that preserves the essential directional information of the original vector.
Here’s a simple example transformation:

How does binary quantization work?
- Original vector (6 dimensions): [2.1, -0.8, 0.0, 4.3, -1.2, 0.9]
- After Binary Quantization: [1, 0, 0, 1, 0, 1]
- Storage before: 6 × 4 bytes = 24 bytes
- Storage after: 6 bits ≈ 0.75 bytes
This compression means you can fit many more vectors in the same amount of RAM. Additionally, comparing binary vectors utilizes fast bitwise operations, such as XOR, which modern CPUs handle extremely efficiently, resulting in significantly faster similarity searches.
IMPORTANT IMPLEMENTATION NOTE: Binary quantization works best when your embedding model produces vectors with >=1024 dimensions or more and when the values are centered around zero. Smaller vectors lose too much meaning after this compression. In our code demo, we are using OpenAI embedding model with 1536 dimensions (dense).
💡
Hybrid search with reciprocal rank fusion
Reciprocal rank fusion (RRF), is a scoring rerank technique that merges ranked lists from multiple retrieval systems, particularly the hybrid search approach. Instead of relying on raw scores, it aggregates based purely on document ranks, giving more weight to top-ranked items while still considering the lower ones. The constant k (typically 60) ensures that extreme rank variations don’t dominate.
Reciprocal rank fusion formula
Where:
- d: the document
- k: a constant (typically 60)
- n: the number of retrieval methods
- rankᵢ(d) the rank of document d in the i-th retrieval method
In hybrid search combining dense (semantic) and sparse (lexical) retrievers, RRF effectively balances:
- Dense retrievers: Capture semantic meaning but may miss exact keyword matches
- Sparse retrievers: Excel at precise term matching but lack semantic understanding
The fusion preserves strengths from both approaches, delivering more comprehensive search results.
Note: The value of the k parameter can help you increase or decrease the impact of the lower ranked documents. The smaller the parameter value, the bigger the impact of the top-ranked results. Source: Qdrant Documentation and Essential
Efficient routing system using LangGraph state management
For our orchestration pipeline, we will be implementing LangGraph state management. LangGraph brings structure to agentic workflows or any LLM-based applications by modeling them as stateful graphs, where each node represents a function that defines the logic, such as routing, retrieval, or generation, and edges define the flow between them. Its core strength lies in shared: a single mutable dictionary that all nodes read from and update. This means context, decisions, and intermediate results persist cleanly across steps without manual passing.
For RAG systems, we maintain three critical state elements:
- User question (Q): The original query
- Context (C): Retrieved information
- Answer (A): Generated response
Our implementation uses LangGraph to orchestrate three specialized tools:
- Knowledge base: Searches internal financial documents
- Summarizer: Creates document summaries with metadata filtering
- Web search: Fetches real-time information when needed
The router classifies incoming queries and directs execution to the appropriate tool, with each tool appending its retrieved context to the shared state for final answer generation.
Observability matters: Trace and monitor your app using W&B Weave
W&B Weave offers comprehensive observability for LLM applications, which is essential for production deployments. Once configured as a callback, Weave automatically captures:
- Complete input/output traces across your workflow
- Tool invocation details with inputs and outputs
- Token usage and cost calculations per request
- Execution latency metrics
- Prompt versioning and artifact storage
This visibility enables debugging complex multi-step retrievals, optimizing costs, and maintaining quality as your system scales. We'll demonstrate two integration approaches: callback-based and decorator-based tracing.

Step-by-Step Code Implementation
- Parse and chunk financial PDF documents, such as SEC filings, shareholder letters, news articles, and compliance data, using PDFium2, which is well-suited for table-aware parsing in financial documents.
- Custom metadata enrichment for better performance and improved retrieval with relevant context for grounding, and also for full-text search filtering based on document type, summary, and chunk-level keywords/tags.
- Optimize memory and storage for indexing using Binary Quantization.
- Implement hybrid search (Dense + Sparse BM25) with Reciprocal Rank Fusion (RRF) for better reordering of the most diverse and related information to pass to the LLM.
- Routing-based application for agentic RAG workflow orchestration to choose across multiple tools using conditional edges.
- We will utilize OpenAI GPT-5 mini for reasoning to select the appropriate tool for answer generation across multiple options. However, for final deployment and staging, one might switch to GPT-5, which offers streaming capabilities.

Part 0: Initial setup
Now that we know the frameworks that we will be using for building this application, let's start with the installation:
!pip install langgraph langchain-openai==1.0.2 langchain-community!pip install weave!pip install qdrant_client fastembed pypdfium2!pip install tavily-python
Once the installation is done, save the credentials inside the .env file or environment variables.
WANDB_API_KEY=<replace-with-your-api-key>GOOGLE_API_KEY=<replace-with-your-api-key>OPENAI_API_KEY=<replace-with-your-api-key>QDRANT_API_KEY=<replace-with-your-api-key>QDRANT_URL=<replace-with-your-endpoint-url>TAVILY_API_KEY=<replace-with-your-api-key>
Part 1: Offline document processing
In the next part, our goal is to save the data inside the vector database, i.e., indexing. For the data, we will be using Netflix's SEC filing, shareholder letter, and news data, which are all publicly available here: https://ir.netflix.net/financials/sec-filings/default.aspx

Document parsing and chunking
Now that we have a folder containing 8 files, we need to parse the raw information from the documents. There are numerous PDF loaders available. However, a research paper titled "A Comparative Study of PDF Parsing Tools Across Diverse Document Categories" suggests that for financial documents, PyPDFium2 yields better results. We'll use this loader for the extraction.
As an alternative, you are free to use Unstructured.IO, remember this is offline document processing. This happens only once, after the data is indexed, you can directly inference and get the relevant information.
💡
from langchain_community.document_loaders import FileSystemBlobLoaderfrom langchain_community.document_loaders.generic import GenericLoaderfrom langchain_community.document_loaders.parsers import PyPDFium2Parserfrom dotenv import load_dotenvload_dotenv()
Since we have the PDF files organized in multiple folders, use the blob loader to extract the data that ends with the PDF extension.
loader = GenericLoader(blob_loader=FileSystemBlobLoader(path="data",glob="**/*.pdf",),blob_parser=PyPDFium2Parser(),)documents = loader.load()
The loaded documents variable is a Langchain object that contains page content and metadata for each page. However, basic metadata alone won't suffice for production, as it requires more contextual information for improved retrieval, so let's add additional details.
Metadata enrichment
Basic metadata is insufficient for production retrieval. We enhance each chunk with:
- Chunk summaries: Generated using a distilled BART model for cost-effective summarization
- Financial tags: Key concepts extracted via Gemini-2.0-Flash for improved filtering
- Document types: Classifications (10-K, 8-K, etc.) for targeted retrieval
- Temporal markers: Year indicators for time-sensitive queries
import torchfrom transformers import pipelinefrom google.genai import Clientsummarizer_model = pipeline("summarization",model="sshleifer/distilbart-cnn-12-6",device=0 if torch.cuda.is_available() else -1,dtype = torch.float16 if torch.cuda.is_available() else None,batch_size=8, max_length=128, truncation=True)llm_client = Client()
Once the summarizer model is loaded locally, pass each chunk through the summarizer_model and get the concise response back.
def summarize_chunk(text: str) -> str:input_tokens = len(text.split())max_length = max(20, min(input_tokens // 2, 128))min_length = min(20, max_length - 1)try:result = summarizer_model(text,max_length=max_length,min_length=min_length,do_sample=False,truncation=True)return result[0]['summary_text']except Exception as e:return text[:200] if len(text) > 200 else text
And since we have multiple folders with different source file paths, let's consider this scenario: if you ask for a summary of a specific document, the system needs to filter based on the document type keyword for exact matching. We define the doc_type for each source file to enable metadata filtering during retrieval. This ensures queries target the correct document category without retrieving irrelevant content.
def add_doc_type(source_file_name):if "Form_8k" in source_file_name or "FORM_8k" in source_file_name:return "8-K Filing"elif "Form_10K" in source_file_name or "Annural" in source_file_name:return "10-K Filing"elif "Form_10Q" in source_file_name or "Quarterly" in source_file_name:return "10-Q Filing"elif "Shareholder" in source_file_name:return "Shareholder Letter"elif "NETFLIX-BITES" in source_file_name or "Netflix-House" in source_file_name:return "News Article"else:return "Document"
Tag generation from summaries improves retrieval quality by creating structured metadata that captures the core financial concepts in each chunk. Even this serves the same purpose as applying a filter whenever needed to improve the hybrid search approach.
def generate_tags_from_summary(summary: str, chunk_data: str):SYSTEM_PROMPT = f"""You are an expert Financial text analyzer.Analyze the provided SUMMARY and ADDITIONAL CONTEXT financial text and \identify ALL relevant financial metrics and business concepts mentioned.Core Financial Metrics (use these exact tags if relevant):- revenue, subscribers, earnings_per_share, operating_income, operating_margin- net_income, free_cash_flow, content_spending, debt, cash_and_equivalents- advertising_revenue, churn, arpu, guidanceRespond with relevant tags seperated with commaExamples: revenue, subscribers,... // this is just example, be smart and generate relevant tags only"""USER_PROMPT = f"""Summary to consider: {summary}.Additional Context: {chunk_data}"""try:response = llm_client.models.generate_content(model="gemini-2.5-flash-lite",contents=USER_PROMPT,config={"system_instruction": SYSTEM_PROMPT})return response.textexcept Exception as e:print(f"Tag generation error: {e}")return []
Now that we have the logic for generating summaries, tags, and document types, the next step is to iterate through all the document chunks and update their metadata dictionary accordingly.
def update_metadata(data):for chunk in data:generated_summary = summarize_chunk(chunk.page_content)tags = generate_tags_from_summary(generated_summary, chunk.page_content[-800:])doc_type = add_doc_type(chunk.metadata['source'])chunk.metadata['chunk_summary'] = generated_summarychunk.metadata['chunk_tags'] = tagschunk.metadata['doc_type'] = doc_typechunk.metadata['calendar_year'] = 2025return datamodified_data = update_metadata(documents)
Modified data is simply the updated version of the documents; page_content remains the same, but we have added new keys to the metadata dictionary.
Vector database: Binary optimization and hybrid search configuration
We now take the enriched (modified_data) document data and push it into the Qdrant Vector database. Here, we configure the collection to support both dense and sparse embeddings, apply Binary Quantization for optimized memory usage, and enable hybrid search to semantic (dense) and keyword-based (sparse) retrieval for faster and reliable responses.
import uuid, gcfrom langchain_openai import OpenAIEmbeddingsfrom fastembed import SparseTextEmbeddingfrom qdrant_client import QdrantClient,modelsfrom qdrant_client.models import (VectorParams, Distance, SparseVectorParams, Modifier,BinaryQuantization, BinaryQuantizationConfig,PointStruct)
Now define the embedding models for both dense and sparse representations. The dense model from OpenAI captures semantic meaning in 1536 dimensions, while the sparse model using Qdrant’s BM25 focuses on keyword-based (or lexical-based) relevance.
embed_model_name = "text-embedding-3-small"dense_model = OpenAIEmbeddings(model = embed_model_name)sparse_model = SparseTextEmbedding(model_name="Qdrant/BM25")check_dim = dense_model.embed_query("testing the dimensions of embedding model")print(len(check_dim)) #1536 fixed dense dimensions
To start, we define a collection in Qdrant, which acts as a dedicated space to store and organize all our vector embeddings. Every collection can have its own configuration, including vector types, storage preferences, and quantization settings.
To get your credentials:
- Create a free cluster with the default cloud and region settings.
- Get the Qdrant URL ending with 6333 port and the api key.
collection_name = "financialv2"client = QdrantClient(url = os.getenv("QDRANT_URL"),api_key = os.getenv("QDRANT_API_KEY"),)
Quadrant allows binary quantization to compress dense vectors, significantly reducing their memory footprint. During the search, it first performs an approximate match using the binary index stored in RAM, which helps identify potential candidates more quickly. Once the top candidates are identified, Qdrant retrieves their full original vectors from disk for an accurate comparison, giving the best balance between speed and precision.
By default, Qdrant keeps both the full and binary vectors in RAM, which may increase memory usage. To optimize resource usage, we set always_ram=False inside the BinaryQuantizationConfig.
This works well only if your dimensions is more than or equal to 1024.
client.create_collection(collection_name=collection_name,vectors_config={"dense": VectorParams(size=len(check_dim), distance=Distance.COSINE, on_disk=True),},sparse_vectors_config={"sparse": SparseVectorParams(modifier=Modifier.IDF),},quantization_config=BinaryQuantization(binary=BinaryQuantizationConfig(always_ram=False)))
Finally, we define a hybrid search configuration while creating the collection. This includes specifying both dense and sparse vector spaces. The dense vectors capture semantic similarity using cosine distance, while the sparse vectors use an IDF-based modifier for keyword relevance. Together, they form a hybrid search setup.
Index your document
The embedding and vector database are initialized and configured; now we need to combine them with the modified data.
Iterate through each chunk by embedding it using both dense and sparse models to capture the semantic and keyword-based vectors. These embeddings, along with their metadata, are stored as points inside the Qdrant collection for retrieval. This is saved inside vector params in PointStruct.
Ensure that you define your metadata and the main content within the Payload. Adding metadata to the Payload makes it useful to later apply metadata filtering on these keys (metadata is a dictionary, filter is applied on its key). Once defined, Payload and Vector, we can upsert and index the data to the vector database.
def index_documents(data):for i, doc in enumerate(data):try:tags = doc.metadata.get("chunk_tags","")content = doc.page_content.strip()searchable_text = f"{content} \n Keywords: {tags}"dense_embedding = dense_model.embed_query(searchable_text)sparse_embedding = next(sparse_model.embed([searchable_text])) # note here its inside listpoint = PointStruct(id=str(uuid.uuid4()),vector={"dense": dense_embedding,"sparse": sparse_embedding.as_object(),},payload={"content": content,"source": doc.metadata.get("source",""),"page": doc.metadata.get('page', 0),"chunk_tags": tags,"document_type": doc.metadata.get("doc_type",""),"chunk_id": doc.metadata.get('chunk_id', ''),"calendar_year": doc.metadata.get('calendar_year', ''),})client.upsert(collection_name=collection_name,points=[point],)if i % 10 == 0:gc.collect()except Exception as e:print(e)index_documents(modified_data)
Search relevant documents using RRF and hybrid search
Now that the documents are successfully indexed in the vector database, ask a user query and extract the relevant chunks by applying the Prefetch combination of dense vectors, i.e., Semantic search using OpenAI model, and sparse vectors, i.e., Lexical search using BM25. Each of these searches is limited to 10 relevant documents.
These 20 documents have now formed the mixture, and after RRF reranking scores, we pass the top-k, i.e., 5 chunks, to the LLM for the answer generation.
from pydantic import BaseModel, Fieldfrom typing import TypedDict,Literal,List,Optional,Dict,Anydef db_search(query: str, filter_condition: Optional[None]):dense_vectors = dense_model.embed_query(query)sparse_vectors = next(sparse_model.embed([query]))prefetch = [models.Prefetch(query = models.SparseVector(**sparse_vectors.as_object()),using="sparse",limit=10,),models.Prefetch(query = dense_vectors,using="dense",limit=10,)]response = client.query_points(collection_name = collection_name,prefetch = prefetch,query = models.FusionQuery(fusion=models.Fusion.RRF),limit = 5,with_payload = True,query_filter = filter_condition)return responsequery = "Based on Netflix’s most recent 10-K filing, what were the key drivers of subscriber growth in the Asia-Pacific region"docs = db_search(query=query,filter_condition=None)
Output:

Part-2: Agentic RAG orchestration
With the retrieval infrastructure complete, we implement the orchestration layer using LangGraph for dynamic query routing and response generation.
from langchain_openai import ChatOpenAIfrom langgraph.graph import StateGraph, START, ENDfrom tavily import TavilyClientimport weave,refrom weave.integrations.langchain import WeaveTracer
We initialize Weave for observability to monitor the workflow execution and insights. This will create and initialize the project, which can be viewed on the dashboard when the user query is executed.
Tavily is set up for real-time web search, and GPT-5-mini is loaded as the core reasoning model for routing and response generation.
observe = weave.init('financialv1')tavily_client = TavilyClient()llm = ChatOpenAI(model = "gpt-5-mini")
Defining your router
The router acts as the first decision-maker in the agentic workflow, determining how each query should be handled. It uses a structured output schema defined by the Route class to understand the intent and then route to the specific task, i.e., tool.
class Route(BaseModel):step: Literal["knowledge", "search", "summary"] = Field(None, description="The next step in the routing process")router = llm.with_structured_output(Route)
Function calling tools nodes
In LangGraph, a State represents the shared memory that carries data between nodes during execution. The attributes defined in AgentState include tool_used, question, context, and response. This helps to track the current tool being used, the user’s query, the retrieved context, and the final output.
class AgentState(TypedDict):tool_used: strquestion: strcontext: List[str]response: str
Once the state is defined, we need the nodes, i.e., the functions that hold the logic for executing the workflow. We will start with the LLM router.
llm_call_router node: This function takes in the user query and uses the router model to decide the next tool to invoke.
def llm_call_router(state: AgentState):"""Route the input to the appropriate node"""SYSTEM_PROMPT = """Route the input to knowledge, search, or summary based on the user's request.- route to knowledge_base if user query is grounded in the knowledge base and it is a QA based question within the knowledge base.- route to web_search if user query is not grounded in the knowledge base and needs to be fetched from web.- route to summarizer if user query is not grounded in the knowledge base and needs to be summarized or the question is to summarize any document i.e., policy or filing or compliance or news information from knowledge base."""decision = router.invoke([{"role": "system", "content": SYSTEM_PROMPT},{"role": "user", "content": state["question"]}])return {"tool_used": decision.step}
knowledge_base node: The logic here is only db_search. As you can see, it performs a similarity search, fetches the top results, and compiles structured snippets containing content, source, document type, and tags, returning them as a string. These retrieved chunks form the evidence context for answering knowledge-grounded questions, which include citations, i.e., the source and page number.
def knowledge_base(state: AgentState) -> AgentState:"""Tool 1: knowledge_base: to answer the question from the vector databasecontext: Retrieval from Knowledge base, answer the question if its from the knowledge base"""query = state["question"]response = db_search(query, filter_condition=None)context_parts = []for i, point in enumerate(response.points):payload = point.payloadcontext_parts.append(f"Content: {payload.get('content', '')}"f"\nSource: {payload.get('source', 'Unknown')}"f"\nDocType:: {payload.get('document_type', 'Unknown')}"f"\nPage: {payload.get('page', 'N/A')}"f"\nTags: {payload.get('chunk_tags', 'None')}")context = "\n".join(context_parts)return {"context": context}
web_search node: When the user query is unrelated to internal data, the web_search node acts as a fallback. It leverages Tavily’s search API to pull relevant web results, extracts key titles and content, and merges them into a context.
def web_search(state: AgentState) -> AgentState:"""Tool 2: web_search: to answer the question from the Search web browsercontext: Browse through Tavily Search as fallback, if the user question is not grounded in the knowledge base"""query = state['question']response = tavily_client.search(query,max_results=10)context = ""for result in response["results"]:context += result["title"] + " " + result["content"]return {"context": context}
Summarizer with metadata filter
Document summarization can be challenging, especially when users mix summary requests with other QA-based queries. Here, when we refer to a summary, we make it robust enough to handle multi-entry filtering based on document_type.
First, we make the metadata indexable for filtering using create_payload_index and define the key from the payload (i.e., metadata). Then, we interpret the user query to understand the intent and extract the appropriate filter tag, which helps retrieve data from the correct document type along with semantic search. Understand the syntax below, and then build the summarizer node. This can be very experimental, depending on the data we use.
client.create_payload_index(collection_name=collection_name,field_name="source",field_schema=models.PayloadSchemaType.KEYWORD)client.create_payload_index(collection_name=collection_name,field_name="document_type",field_schema=models.PayloadSchemaType.KEYWORD)query = "summarize 10-k filing"doc_type_map = {"10-k": "10-K Filing", "10k": "10-K Filing", "annual": "10-K Filing","10-q": "10-Q Filing", "10q": "10-Q Filing", "quarterly": "10-Q Filing","8-k": "8-K Filing", "8k": "8-K Filing","shareholder": "Shareholder Letter", "letter": "Shareholder Letter"}target_doc = next((v for k, v in doc_type_map.items() if k in query.lower()), None)print(target_doc)
summarizer node: The summarizer function fetches all document chunks belonging to a detected document type and compiles their content.
def summarizer(state: AgentState) -> AgentState:"""Tool 3: summarizer – Retrieve full content of a specific document to enable summarization.Uses your exact document_type values i.e., 8-K Filing, 10-K Filing, 10-Q Filing, etc."""query = state["question"].lower()doc_type_map = {"10-k": "10-K Filing", "10k": "10-K Filing", "annual": "10-K Filing","10-q": "10-Q Filing", "10q": "10-Q Filing", "quarterly": "10-Q Filing","8-k": "8-K Filing", "8k": "8-K Filing","shareholder": "Shareholder Letter", "letter": "Shareholder Letter"}target_doc = next((v for k, v in doc_type_map.items() if k in query.lower()), None)query_filter = Noneif target_doc:query_filter = models.Filter(must=[models.FieldCondition(key="document_type",match=models.MatchValue(value=target_doc))])response = db_search(query, query_filter)context_point = []for point in response.points:context_point.append(point.payload['content'])context = "\n".join(context_point)return {"context": context}
answer_generation node: This node is where the final response gets generated using the LLM. The system prompt determines how the model should respond, providing exact answers for knowledge-based queries, well-structured summaries for document-type requests, and clear guidance for web-based responses.
Every output remains context-aware, grounded in the retrieved or fetched content to ensure the answer is precise and relevant.
def answer_generation(state: AgentState) -> AgentState:question = state["question"]tool_used = state["tool_used"]context = state["context"]SYSTEM_PROMPT = """You are an expert financial analyst specializing in SEC filings and corporate finance.- For knowledge_base queries: Provide precise answers with exact figures, dates, and citations [Doc Type - Source, Page X].- For summarizer queries: Create structured summaries organized by themes, highlighting key metrics and strategic points.- For web_search queries: Guide users to appropriate resources and explain available database information."""user_msg_template = {"knowledge_base": f"Question: {question}\n\nDocuments:\n{context}\n\nProvide precise answer with citations. If question is not from the CONTEXT, use search tool, if not say not enough information.","summarizer": f"Question: {question}\n\nSummaries:\n{context}\n\nCreate comprehensive summary.","web_search": f"Question: {question}\n\n{context}."}HUMAN_PROMPT = user_msg_template.get(tool_used, user_msg_template["knowledge_base"]) # default - user_msg_templateprompt = [{"role": "system", "content": SYSTEM_PROMPT},{"role": "user", "content": HUMAN_PROMPT}]response = llm.invoke(prompt)return {"response":response.content}
Phew, we have come a long way. It's time to end this by connecting the flow of the node using the edge.
Conditional Edge Node and compile workflow
Since we have the condition after the router decision, we will implement conditional_edge in the LangGraph.
# Conditional edge nodedef route_decision(state: AgentState):if state["tool_used"] == "knowledge":return "knowledge_base"elif state["tool_used"] == "search":return "web_search"elif state["tool_used"] == "summary":return "summarizer"
Each tool node connects to the answer generator, completing the reasoning loop. Finally, the graph is compiled into an executable pipeline, making the agentic RAG system operational and ready for dynamic query handling.
First, add all the available nodes to the graph builder state, and then connect the nodes via edge using Graph Functional API logic.
graph_builder = StateGraph(AgentState)graph_builder.add_node("llm_call_router", llm_call_router)graph_builder.add_node("knowledge_base", knowledge_base)graph_builder.add_node("web_search", web_search)graph_builder.add_node("summarizer", summarizer)graph_builder.add_node("answer_generation", answer_generation)graph_builder.add_edge(START, "llm_call_router")graph_builder.add_conditional_edges("llm_call_router",route_decision,{"knowledge_base": "knowledge_base","web_search": "web_search","summarizer": "summarizer",},)graph_builder.add_edge("knowledge_base", "answer_generation")graph_builder.add_edge("web_search", "answer_generation")graph_builder.add_edge("summarizer", "answer_generation")graph_builder.add_edge("answer_generation",END)graph = graph_builder.compile()graph

Trace and monitor your application using Weave
Tracing and monitoring with Weave helps visualize (for tracing and monitoring) the complete flow of your LangGraph application. There are two ways to enable it:
- First, by attaching the WeaveTracer callback during graph invocation to track each node execution in real time.
- Second, by using a custom @weave.op() function that logs every call, captures feedback, and records route decisions directly inside the Weave dashboard.
weave_tracer = WeaveTracer()config = {"callbacks": [weave_tracer]}query = "Based on Netflix’s most recent 10-K filing, what were the key drivers of subscriber growth on global region"result = graph.invoke({"question":query},config=config)print(result['tool_used'])print(result['response'])

When using the WeaveTracer function as a callback, it automatically captures and logs all state attributes during execution. Alternatively, we can manually add tool_used as feedback using the client.feedback.add_note method from Weave.
This same approach will be used in Part 2 of the blog for running evaluations.
@weave.op()def get_response(query):result = graph.invoke({"question":query})current_call = weave.require_current_call()call_id = current_call.idcall = observe.get_call(call_id)call.feedback.add_note(f"Routed to: {result['tool_used']}")return result['response']query2 = "Provide detailed summary on 8K sec filing"get_response(query2)


Conclusion
What we’ve built isn’t just a chatbot. It is a comprehensive workflow that encompasses optimization, retrieval refinement with hybrid search and reciprocal rank fusion, and the addition of metadata for enhanced context. It’s a reliable, traceable assistant that handles real financial documents in production. In the next article, we’ll evaluate it. We’ll use LLM-as-a-Judge and run basic metrics-based evals to see how well the answers hold up under different scenarios and personas of the user testing your application.
Stay tuned for part 2!
Sources
- Reciprocal Rank Fusion outperforms Condorcet and individual Rank Learning Methods: https://cormack.uwaterloo.ca/cormack/cormacksigir09-rrf.pdf
- Inspiration to add Metadata for enrichment, Uber case study on building Enhanced Agentic RAG: https://www.uber.com/en-IN/blog/enhanced-agentic-rag/
Add a comment