Tutorial: Call center modernization with multi-agent systems
In this tutorial, we build a working simulation of a banking customer service line powered by conversational agents.
Created on October 9|Last edited on October 13
Modern call centers are entering a new phase powered by multimodal LLMs that can listen, speak, and act in real time. Instead of static menu trees or delayed text exchanges, these systems interpret live speech, understand intent, and execute real actions using connected tools.
Banking is an ideal testing ground for this shift because most customer requests follow structured, well-defined workflows such as verifying identity, resolving failed payments, or resetting a password. A real-time model can handle these tasks naturally through conversation, combining speech recognition, reasoning, and tool use into one continuous process.
To explore how this works in practice, today we’re building a working simulation of a banking customer service line powered by conversational agents. The goal is to create a realistic support experience where a user speaks naturally, and the system handles everything from identity verification to transaction review without a human on the other end. Each agent plays a distinct role, routing calls, verifying customers, or resolving issues, while coordinating through a shared real-time voice framework.
To get a sense of where this tutorial leads, here’s a sample recording of the final system in action:
Table of contents
- Designing our multi-agent call center
- Our bank emulation environment and AI stack
- Building a simulated banking backend
- Supporting bank infrastructure for accessing the database
- Building a voice agent class
- Building the router agent and chat loop
- Implementing a password reset agent into our call center
- Building our fraud investigation agent
- Building the failed transaction agent
- Testing and debugging our AI agents
- Sample audio
- Our modernized call center system
Designing our multi-agent call center
The first step in designing our multi-agent call center is understanding what customers generally need when they call. Most banking calls aren’t complex; they’re variations of a few core problems, including logging in, resetting a password, confirming identity, unblocking a declined transaction, or checking a suspicious charge. These are repetitive, structured, and rule-based, which makes them ideal for automation through conversational agents.
The system we're designing focuses on these high-frequency cases first. Each one follows a predictable flow: verify the caller, look up an account, perform a specific action, and confirm the outcome. By centering the design around these repeatable patterns, the agents can actually complete real interactions instead of handing off partial responses. Three specialist agents cover these flows:
- The authentication agent verifies identity, manages email codes, and handles password resets.
- The failed-transaction agent focuses on declined or blocked card payments, helping the user identify the cause and safely whitelist the merchant if appropriate.
- The fraud agent investigates suspicious charges, searches transactions, gathers dispute details, and files a structured report while optionally blocking the merchant.
Each agent runs on the same real-time voice framework, sharing the same audio I/O, websocket handling, and transcription stack. They are launched separately by a routing agent that listens for intent and directs the call to the correct module. This keeps each agent simple, domain-focused, and easier to test and extend.
Starting with the problems customers face most often makes the system practical, directly useful, and faster to deploy, and reduces the time-to-value for your agentic system. Automating these flows frees human operators for complex cases while giving callers an immediate, voice-based experience that feels like a real conversation rather than a series of menu prompts.
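To make the routing idea concrete, here is a minimal sketch of intent routing. The real system relies on the model's own intent detection; this keyword-based version is only an illustration, and the agent names are examples rather than the exact identifiers used later:

```python
# Minimal sketch of intent routing. Keyword matching stands in for the
# realtime model's own intent detection; agent names are illustrative.
INTENT_KEYWORDS = {
    "password_reset_agent": ["password", "reset", "log in", "login", "locked out"],
    "failed_transaction_agent": ["declined", "failed", "blocked", "payment"],
    "fraud_agent": ["fraud", "suspicious", "dispute", "unknown charge"],
}

def route_intent(utterance: str) -> str:
    """Return the name of the agent that should handle the utterance."""
    text = utterance.lower()
    for agent, keywords in INTENT_KEYWORDS.items():
        if any(k in text for k in keywords):
            return agent
    return "router_agent"  # stay with the router if nothing matches

print(route_intent("My card was declined at the bakery"))  # failed_transaction_agent
```

In the actual system this decision is made by the model itself, which lets callers phrase requests however they like; the sketch just shows where the routing decision sits in the pipeline.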
Our bank emulation environment and AI stack
The multi-agent call center structure is built around a real-time conversational model that processes live audio in both directions. It listens continuously, understands intent as the user speaks, and replies with generated speech almost instantly. This creates a natural exchange instead of the slow, text-based back and forth that most chat systems rely on.
The real-time API gives the model access to tools written in Python, allowing it to perform actions instead of only generating dialogue. When someone says “send me a verification code” or “why was my card declined,” the model can call the right function, pass in arguments like the account number or merchant name, and then return the result as part of the spoken response.
Those tools connect to a banking emulator that acts like a simplified financial backend. It stores mock customer records, transaction data, and merchant information so the system can verify identities, list failed payments, or add a merchant to a whitelist. The emulator makes it possible to test real workflows without exposing any live systems or data.
Together, the real-time model, the tool interface, and the banking emulator form a complete stack for a fully simulated call center. The model manages the conversation, the tools perform actions, and the emulator provides the realistic environment in which those actions take place.
Building a simulated banking backend
To make the voice agents act like real customer service representatives, the system needs a live data source to query and update. Instead of connecting to an external database, the project uses a local JSON file that serves as a miniature banking database. It stores mock customer profiles, transaction histories, and merchant information, giving the agents a reliable and persistent backend to work with.
Each entry in the JSON database includes personal details such as name, date of birth, address, social security number, account number, and email. It also contains a list of transactions for that account with fields for timestamp, amount, merchant, type, and status. Some transactions are marked as failed to allow agents to test realistic scenarios, such as blocked payments or declined charges.
A simple Python class manages all access to this data. It reads and writes the JSON file, exposes methods for verifying customers, listing transactions, or updating allowlists, and ensures that every operation leaves a record behind. This design keeps the system fully local while still behaving like a real financial backend, giving the voice agents something concrete to act upon during conversation.
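The access pattern throughout is load, mutate, save. A stripped-down sketch of that pattern is shown below; the full class appears later in this tutorial, and this miniature version (class name and schema are illustrative) only demonstrates the persistence mechanics:

```python
import json
from pathlib import Path

class MiniBankDB:
    """Minimal load-mutate-save pattern over a local JSON file."""

    def __init__(self, path: str = "mini_bank.json"):
        self.path = Path(path)
        if not self.path.exists():
            self.path.write_text("{}", encoding="utf-8")

    def _load(self) -> dict:
        return json.loads(self.path.read_text(encoding="utf-8"))

    def _save(self, data: dict) -> None:
        # Every mutation is written back immediately, so state survives
        # across sessions and agent handoffs.
        self.path.write_text(json.dumps(data, indent=2), encoding="utf-8")

    def whitelist_merchant(self, account: str, merchant: str) -> list:
        data = self._load()
        acc = data.setdefault(account, {"whitelist": []})
        if merchant not in acc["whitelist"]:
            acc["whitelist"].append(merchant)
        self._save(data)
        return acc["whitelist"]
```

Re-reading the file on every call trades performance for simplicity, which is the right trade for a local simulation: there is never stale in-memory state to reconcile.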
Here’s what our database looks like:
```json
{
  "1001": {
    "name": "Alice Johnson",
    "dob": "1990-05-04",
    "soc": "123-45-6789",
    "address": "123 coffee St, Springfield, IL 12345",
    "email": "byyoung3@gmail.com",
    "balance": 2500,
    "history": [
      ["Deposit", 1000, "2024-11-15T10:30:00", "Payroll Deposit", "posted"],
      ["Withdraw", 200, "2024-12-01T13:15:00", "ATM Withdrawal", "posted"],
      ["Deposit", 1700, "2025-01-25T09:00:00", "Freelance Payment", "posted"],
      ["Withdraw", 45, "2025-02-10T11:23:00", "STARBUCKS", "posted"],
      ["Withdraw", 75, "2025-03-08T08:17:00", "UBER", "posted"],
      ["Withdraw", 120, "2025-04-12T15:45:00", "AMAZON", "posted"],
      ["Withdraw", 130, "2025-04-12T15:43:00", "AMAZON", "posted"],
      ["Withdraw", 330, "2025-04-12T15:46:00", "AMAZON", "posted"],
      ["Deposit", 300, "2025-05-10T09:00:00", "Refund", "posted"],
      ["Withdraw", 300, "2025-06-15T12:33:00", "Unknown Merchant XYZ", "posted"],
      ["Deposit", 600, "2025-07-20T16:00:00", "Bonus Deposit", "posted"],
      ["Withdraw", 95, "2025-08-05T10:20:00", "Grocery Market", "posted"],
      ["Withdraw", 180, "2025-09-29T19:40:00", "AIRLINE ABC", "failed"],
      ["Withdraw", 52, "2025-10-03T09:10:00", "LOCAL BAKERY", "failed"]
    ],
    "card": {
      "status": "active",
      "limit": 1500,
      "merchants": {
        "enabled": ["LOCAL BAKERY"],
        "disabled": ["RANDOMSHOP", "STARBUCKS", "AMAZON", "UBER"]
      },
      "travel_limits": [
        {"country": "FR", "start": "2025-12-01", "end": "2025-12-20", "daily_cap": 300},
        {"country": "JP", "start": "2026-01-05", "end": "2026-01-25", "daily_cap": 400}
      ]
    }
  },
  "2002": {
    "name": "Bob Smith",
    "dob": "1985-11-21",
    "soc": "987-65-4321",
    "address": "42 Oak Ave",
    "email": "bob.smith@example.com",
    "balance": 980,
    "history": [
      ["Deposit", 500, "2024-09-20T09:10:00", "Payroll Deposit", "posted"],
      ["Deposit", 480, "2024-11-02T09:00:00", "Gift Transfer", "posted"],
      ["Withdraw", 200, "2025-01-10T12:00:00", "NETFLIX", "posted"],
      ["Deposit", 300, "2025-02-18T10:00:00", "Freelance Payment", "posted"],
      ["Withdraw", 100, "2025-03-12T18:30:00", "SPOTIFY", "posted"],
      ["Withdraw", 65, "2025-04-14T14:50:00", "CASINO ONLINE", "failed"],
      ["Deposit", 450, "2025-06-10T09:10:00", "Tax Refund", "posted"],
      ["Withdraw", 125, "2025-07-20T10:20:00", "FASTFOOD PLACE", "posted"],
      ["Withdraw", 300, "2025-08-25T13:45:00", "Unknown Charge QRS", "posted"],
      ["Withdraw", 90, "2025-10-01T21:05:00", "BETTING APP", "failed"]
    ],
    "card": {
      "status": "frozen",
      "limit": 800,
      "merchants": {
        "enabled": ["SPOTIFY", "NETFLIX"],
        "disabled": ["CASINO", "BETTING"]
      },
      "travel_limits": []
    }
  },
  "3003": {
    "name": "Carla Mendes",
    "dob": "1992-08-10",
    "soc": "222-33-4444",
    "address": "98 Pine Blvd",
    "email": "carla.mendes@example.com",
    "balance": 6500,
    "history": [
      ["Deposit", 4000, "2024-10-01T08:45:00", "Payroll Deposit", "posted"],
      ["Withdraw", 500, "2024-11-22T16:30:00", "WHOLEFOODS", "posted"],
      ["Deposit", 3000, "2025-01-15T08:00:00", "Bonus Payment", "posted"],
      ["Withdraw", 2000, "2025-02-08T13:20:00", "DELTA Airlines", "posted"],
      ["Deposit", 1000, "2025-03-10T08:15:00", "Tax Refund", "posted"],
      ["Withdraw", 850, "2025-04-05T10:40:00", "AIRBNB", "posted"],
      ["Withdraw", 400, "2025-05-22T12:05:00", "Online Boutique", "posted"],
      ["Withdraw", 250, "2025-07-17T15:00:00", "Unknown Vendor JKL", "posted"],
      ["Withdraw", 120, "2025-10-04T11:00:00", "LOCAL MUSEUM", "failed"]
    ],
    "card": {
      "status": "active",
      "limit": 5000,
      "merchants": {
        "enabled": ["AIRBNB", "DELTA", "WHOLEFOODS"],
        "disabled": []
      },
      "travel_limits": [
        {"country": "ES", "start": "2025-10-01", "end": "2025-10-15", "daily_cap": 800}
      ]
    }
  },
  "4004": {
    "name": "Daniel Green",
    "dob": "1978-02-14",
    "soc": "111-22-3333",
    "address": "77 Birch Rd, St. Louis, MO 12345",
    "email": "daniel.green@example.com",
    "balance": 12000,
    "history": [
      ["Deposit", 6000, "2024-12-05T09:00:00", "Salary Deposit", "posted"],
      ["Withdraw", 1000, "2025-01-10T10:00:00", "HOMEDEPOT", "posted"],
      ["Deposit", 7000, "2025-03-01T09:30:00", "Investment Return", "posted"],
      ["Withdraw", 2000, "2025-04-12T14:00:00", "APPLE", "posted"],
      ["Deposit", 2000, "2025-05-15T09:30:00", "Bonus", "posted"],
      ["Withdraw", 250, "2025-07-18T16:30:00", "COSTCO", "posted"],
      ["Withdraw", 900, "2025-08-25T18:20:00", "BETTINGSHOP", "failed"],
      ["Withdraw", 400, "2025-09-10T11:45:00", "Unknown Charge ABC", "posted"],
      ["Withdraw", 1800, "2025-10-02T12:10:00", "HOLIDAY INN", "failed"]
    ],
    "card": {
      "status": "active",
      "limit": 10000,
      "merchants": {
        "enabled": ["HOMEDEPOT", "APPLE", "COSTCO"],
        "disabled": ["BETTINGSHOP"]
      },
      "travel_limits": [
        {"country": "GB", "start": "2025-11-01", "end": "2025-11-15", "daily_cap": 500},
        {"country": "IT", "start": "2026-02-01", "end": "2026-02-20", "daily_cap": 600}
      ]
    }
  }
}
```
Supporting bank infrastructure for accessing the database
To make the database easy for our agents to read and update, similar to how a real system would interact with a backend API, we add a small support layer in Python that handles all interactions cleanly.
This class abstracts away file operations so the agents can work with higher-level methods instead of reading and writing JSON directly. It also ensures that every change is persisted immediately, keeping the simulation consistent across sessions.
For example, the support code includes helper methods to:
- Validate a customer’s personal information and return whether the match is correct
- Retrieve all failed transactions for a given date
- Add a merchant to an account’s whitelist
- Write the updated record back to the JSON file
This simple infrastructure lets the AI agents modify the database as if they were interacting with a real banking system.
Here’s the code for the banking infrastructure emulator:
```python
import os
import json
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional

from openai import OpenAI


class BankSystem:
    def __init__(self, db_file: str = "bank_data.json"):
        self.db_path = Path(db_file)
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
        if not self.db_path.exists():
            self.db_path.write_text("{}", encoding="utf-8")

    def _load_db(self) -> Dict[str, Any]:
        return json.loads(self.db_path.read_text(encoding="utf-8"))

    def _save_db(self, data: Dict[str, Any]) -> None:
        self.db_path.write_text(json.dumps(data, indent=2), encoding="utf-8")

    def _get_account(self, account_number: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        data = self._load_db()
        if account_number not in data:
            raise KeyError("Account not found")
        return data, data[account_number]

    def _ensure_card_fields(self, acc: Dict[str, Any]) -> None:
        if "card" not in acc:
            acc["card"] = {
                "status": "active",
                "limit": 0,
                "merchants": {"enabled": [], "disabled": []},
                "travel_limits": []
            }

    def _ensure_email_field(self, acc: Dict[str, Any]) -> None:
        if "email" not in acc:
            acc["email"] = None

    def _normalize_history(self, acc: Dict[str, Any]) -> None:
        # History entries are [type, amount, timestamp, description] or
        # [type, amount, timestamp, description, status]
        hist = acc.get("history") or []
        norm = []
        for entry in hist:
            if len(entry) == 4:
                ttype, amt, ts, desc = entry
                status = "posted"
            elif len(entry) >= 5:
                ttype, amt, ts, desc, status = entry[:5]
            else:
                # Skip malformed rows
                continue
            norm.append([ttype, amt, ts, desc, status])
        acc["history"] = norm

    def register_customer(self, name: str, dob: str, soc: str, address: str,
                          account_number: str, balance: float = 0.0,
                          card_limit: float = 0.0, email: Optional[str] = None) -> None:
        data = self._load_db()
        if account_number in data:
            raise ValueError("Account already exists.")
        data[account_number] = {
            "name": name,
            "dob": dob,
            "soc": soc,
            "address": address,
            "email": email,
            "balance": balance,
            "history": [],
            "card": {
                "status": "active",
                "limit": card_limit,
                "merchants": {"enabled": [], "disabled": []},
                "travel_limits": []
            }
        }
        self._save_db(data)

    def validate_customer(self, name: str, dob: str, soc: str, address: str,
                          account_number: str) -> bool:
        data = self._load_db()
        acc = data.get(account_number)
        if not acc:
            return False
        prompt = f"""Compare this provided user info to the record.
Return only yes if they appear to match, even if there are small formatting differences, otherwise no.
Provided info:
Name: {name}
DOB: {dob}
SOC: {soc}
Address: {address}
Record info:
Name: {acc['name']}
DOB: {acc['dob']}
SOC: {acc['soc']}
Address: {acc['address']}
ONLY RESPOND WITH yes or no"""
        response = self.client.responses.create(
            model="gpt-4.1",
            input=prompt,
            temperature=0.0
        )
        answer = (getattr(response, "output_text", "") or "").strip().lower()
        return "yes" in answer

    def get_balance(self, account_number: str) -> float:
        data = self._load_db()
        return data[account_number]["balance"]

    def get_spending_history(self, account_number: str) -> List[List[Any]]:
        data = self._load_db()
        acc = data[account_number]
        self._normalize_history(acc)
        return acc["history"]

    def send_report(self, account_number: str) -> str:
        data = self._load_db()
        acc = data[account_number]
        self._normalize_history(acc)
        report = f"Account Report for {acc['name']}\nBalance: ${acc['balance']}\nTransactions:\n"
        for ttype, amt, ts, desc, status in acc["history"]:
            report += f"{ts} {ttype} ${amt} {desc} [{status}]\n"
        return report

    def set_limit(self, account_number: str, new_limit: float) -> float:
        if new_limit < 0:
            raise ValueError("Limit cannot be negative")
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        acc["card"]["limit"] = new_limit
        self._save_db(data)
        return acc["card"]["limit"]

    def freeze_card(self, account_number: str) -> str:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        acc["card"]["status"] = "frozen"
        self._save_db(data)
        return "frozen"

    def unlock_card(self, account_number: str) -> str:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        acc["card"]["status"] = "active"
        self._save_db(data)
        return "active"

    def enable_merchant(self, account_number: str, merchant: str) -> Dict[str, List[str]]:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        m = acc["card"]["merchants"]
        if merchant in m.get("disabled", []):
            m["disabled"].remove(merchant)
        if merchant not in m.get("enabled", []):
            m.setdefault("enabled", []).append(merchant)
        self._save_db(data)
        return {"enabled": m.get("enabled", []), "disabled": m.get("disabled", [])}

    def disable_merchant(self, account_number: str, merchant: str) -> Dict[str, List[str]]:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        m = acc["card"]["merchants"]
        if merchant in m.get("enabled", []):
            m["enabled"].remove(merchant)
        if merchant not in m.get("disabled", []):
            m.setdefault("disabled", []).append(merchant)
        self._save_db(data)
        return {"enabled": m.get("enabled", []), "disabled": m.get("disabled", [])}

    def add_travel_limit(self, account_number: str, country: str, start: str,
                         end: str, daily_cap: Optional[float] = None) -> List[Dict[str, Any]]:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        entry: Dict[str, Any] = {"country": country, "start": start, "end": end}
        if daily_cap is not None:
            if daily_cap < 0:
                raise ValueError("daily_cap cannot be negative")
            entry["daily_cap"] = daily_cap
        acc["card"]["travel_limits"].append(entry)
        self._save_db(data)
        return list(acc["card"]["travel_limits"])

    def get_card_status(self, account_number: str) -> str:
        _, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        return acc["card"]["status"]

    def get_card_config(self, account_number: str) -> Dict[str, Any]:
        _, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        return acc["card"]

    # New transaction support
    def list_transactions_by_date(self, account_number: str, date_iso: str,
                                  include_failed: bool = True) -> List[Tuple[str, float, str, str, str]]:
        data, acc = self._get_account(account_number)
        self._normalize_history(acc)
        out: List[Tuple[str, float, str, str, str]] = []
        for ttype, amt, ts, desc, status in acc["history"]:
            if ts.startswith(date_iso):
                if include_failed or status != "failed":
                    out.append((ttype, float(amt), ts, desc, status))
        return out

    def list_failed_by_date(self, account_number: str,
                            date_iso: str) -> List[Tuple[str, float, str, str]]:
        data, acc = self._get_account(account_number)
        self._normalize_history(acc)
        out: List[Tuple[str, float, str, str]] = []
        for ttype, amt, ts, desc, status in acc["history"]:
            if ts.startswith(date_iso) and status == "failed":
                out.append((ttype, float(amt), ts, desc))
        return out

    def whitelist_merchant(self, account_number: str, merchant: str) -> Dict[str, Any]:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        m = acc["card"]["merchants"]
        enabled = set(m.get("enabled", []))
        disabled = set(m.get("disabled", []))
        key = merchant.strip()
        if key in disabled:
            disabled.remove(key)
        enabled.add(key)
        m["enabled"] = sorted(enabled)
        m["disabled"] = sorted(disabled)
        self._save_db(data)
        return {"ok": True, "enabled": m["enabled"], "disabled": m["disabled"]}

    def add_failed_transaction(self, account_number: str, amount: float,
                               timestamp_iso: str, merchant: str) -> None:
        data, acc = self._get_account(account_number)
        self._normalize_history(acc)
        acc["history"].append(["Withdraw", float(amount), timestamp_iso, merchant, "failed"])
        self._save_db(data)

    def add_posted_transaction(self, account_number: str, ttype: str, amount: float,
                               timestamp_iso: str, description: str) -> None:
        if ttype not in {"Deposit", "Withdraw"}:
            raise ValueError("ttype must be Deposit or Withdraw")
        data, acc = self._get_account(account_number)
        self._normalize_history(acc)
        acc["history"].append([ttype, float(amount), timestamp_iso, description, "posted"])
        self._save_db(data)
```
With the backend and its access layer in place, we can now build the voice agent class that lets our agents use this data in live conversation.
Building a voice agent class
The voice agent class is the backbone of the system. It manages the real-time websocket session, handles audio input and playback, and connects the model’s decisions to executable Python functions. It captures microphone input, streams it to the model, and plays back generated speech as it arrives. Every exchange is logged in a transcript for later analysis or debugging.
The user defines the agent’s behavior by passing system prompts, tool definitions, and a function handler. The tools describe the available actions such as verifying a customer or listing failed transactions, while the handler executes the corresponding Python functions when those tools are called. The base agent manages this routing automatically, listening for function calls from the model, invoking the handler, sending structured outputs back, and continuing the conversation flow in real time.
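The wiring between a model function call and the handler can be sketched in isolation. Here `FakeWS` is a stand-in for the real websocket so the flow can be seen without audio or a network connection; the handler follows the same shape as the real ones (parse arguments, do work, post a `function_call_output` item), though the tool name and mock data are illustrative:

```python
import json

class FakeWS:
    """Stand-in for the realtime websocket; records what would be sent."""
    def __init__(self):
        self.sent = []
    def send(self, payload: str):
        self.sent.append(json.loads(payload))

def balance_handler(ws, call):
    """Handler in the shape the agent class expects: parse args, post output."""
    args = json.loads(call.get("arguments", "{}") or "{}")
    balance = {"1001": 2500}.get(args.get("account_number"), 0)  # mock lookup
    ws.send(json.dumps({
        "type": "conversation.item.create",
        "item": {
            "type": "function_call_output",
            "call_id": call["call_id"],
            "output": json.dumps({"balance": balance}),
        },
    }))

# Simulate the model requesting a balance lookup.
ws = FakeWS()
balance_handler(ws, {"name": "get_balance", "call_id": "c1",
                     "arguments": '{"account_number": "1001"}'})
```

The `call_id` ties the output back to the specific function call the model made, which is what lets the base class resume the conversation after the tool finishes.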
This implementation runs on gpt-realtime-2025-08-28, a fully multimodal model capable of understanding speech directly, generating natural spoken responses, and calling tools as part of its reasoning process. The model continuously processes audio, interprets intent, and executes function calls in real time without needing explicit rules or orchestration logic.
We built this class to make it simple to define a conversational agent that can think, speak, and act fluidly. The developer only needs to provide three pieces: a system prompt that defines the agent’s behavior, a list of callable tools that describe possible actions, and a handler that executes those tools when the model requests them.
The model itself decides when and how to use those tools, calling them in the right sequence and combining them naturally with dialogue. That means you don’t have to hard code turn taking logic or tool ordering rules. The model’s own reasoning is strong enough to manage multi step processes like verification, data lookup, and reporting autonomously.
The class also exposes clear lifecycle signals that help coordinate sessions. When the model completes the user’s request, it calls an objective completion tool that summarizes the outcome. If the caller says stop or return to main menu, the model invokes the end session tool, ensuring a clean shutdown after the final spoken line.
Together, these features make the agent a self contained bridge between human conversation, realtime AI reasoning, and live system actions, all running through one continuous audio stream.
The following code is quite long and complex due to the challenges of processing realtime audio. I recommend skipping over this script unless you are specifically interested in the logic for processing audio with the gpt-realtime model.
Here’s the code:
```python
import os
import json
import base64
import threading
import queue
import time
from typing import Callable, List, Dict, Any, Optional

import numpy as np
import sounddevice as sd
import websocket


class RealtimeVoiceAgent:
    def __init__(
        self,
        api_key: Optional[str] = None,
        model_url: str = "wss://api.openai.com/v1/realtime?model=gpt-realtime-2025-08-28",
        system_instructions: str = "You are a helpful voice assistant.",
        tools: Optional[List[Dict[str, Any]]] = None,
        tool_choice: str = "auto",
        voice: str = "alloy",
        input_audio_format: str = "pcm16",
        output_audio_format: str = "pcm16",
        in_sample_rate: int = 16000,
        out_sample_rate: int = 24000,
        channels: int = 1,
        blocksize: int = 1024,
        handle_function_call: Optional[Callable[[websocket.WebSocketApp, Dict[str, Any]], None]] = None,
        debug: bool = True,
        # built-in tools (objective completed / end session)
        objective_completion_description: str = (
            "Call this when you have achieved the user’s goal and already spoken the final answer. "
            "Provide a short summary in the summary field."
        ),
        end_session_description: str = (
            "Call this when the user asks to end the conversation or return to the main menu. "
            "Provide a brief reason."
        ),
        # speak-first options
        speak_first: bool = False,
        intro_instructions: str = "Hi there—how can I help you today?",
        intro_out_of_band: bool = False,  # True => keep intro out of default conversation
        # Weave logging options
        conversation_id: Optional[str] = None,
        agent_id: Optional[str] = None,
    ):
        self.API_KEY = api_key or os.environ.get("OPENAI_API_KEY") or "REPLACE_ME"
        self.URL = model_url
        self.system_instructions = system_instructions
        self.user_tools = tools or []
        self.tool_choice = tool_choice
        self.voice = voice
        self.input_audio_format = input_audio_format
        self.output_audio_format = output_audio_format
        self.IN_SR = in_sample_rate
        self.OUT_SR = out_sample_rate
        self.CHANNELS = channels
        self.BLOCK = blocksize
        self.handle_function_call_cb = handle_function_call
        self.debug = debug

        # speak-first
        self.speak_first = speak_first
        self.intro_instructions = intro_instructions
        self.intro_out_of_band = intro_out_of_band

        # Metadata for logging
        self.conversation_id = conversation_id
        self.agent_id = agent_id

        # Transcript collection
        self.transcript: List[Dict[str, Any]] = []

        # ws + audio state
        self._ws: Optional[websocket.WebSocketApp] = None
        self._stop_event = threading.Event()
        self._in_q: "queue.Queue[np.ndarray]" = queue.Queue()
        self._out_buf = bytearray()
        self._out_lock = threading.Lock()
        self._audio_in_thread: Optional[threading.Thread] = None
        self._audio_out_thread: Optional[threading.Thread] = None

        # response lifecycle
        self._active_response_id: Optional[str] = None
        self._need_response_after_tool = False

        # built-in tools
        self._objective_tool_name = "objective_completion_tool"
        self._end_session_tool_name = "end_session_tool"
        self._awaiting_end_after_assistant_msg = False
        self._objective_completion_description = objective_completion_description
        self._end_session_description = end_session_description

    # ---------------- Public API ----------------
    def start(self):
        if self._ws is not None:
            if self.debug:
                print("[DEBUG] Already started")
            return
        self._audio_out_thread = threading.Thread(target=self._audio_player_loop, daemon=True)
        self._audio_out_thread.start()
        if self.debug:
            print("[DEBUG] Connecting to", self.URL)
        self._ws = websocket.WebSocketApp(
            self.URL,
            header=[
                "Authorization: Bearer " + self.API_KEY,
                "OpenAI-Beta: realtime=v1",
            ],
            on_open=self._on_open,
            on_message=self._on_message,
            on_close=self._on_close,
            on_error=self._on_error,
        )
        threading.Thread(target=self._ws.run_forever, daemon=True).start()

    def stop(self):
        time.sleep(4.5)  # allow any final audio to play out
        if self.debug:
            print("[DEBUG] Agent stopping")
        self._stop_event.set()
        if self._ws:
            try:
                self._ws.close()
            except Exception:
                pass
            self._ws = None
        print("[AGENT EXITED]")

    def get_transcript(self) -> List[Dict[str, Any]]:
        """Return the collected transcript of interactions."""
        return self.transcript

    # ---------------- Internals ----------------
    def _compose_tools(self) -> List[Dict[str, Any]]:
        objective_tool = {
            "type": "function",
            "name": self._objective_tool_name,
            "description": self._objective_completion_description,
            "parameters": {
                "type": "object",
                "properties": {
                    "summary": {"type": "string", "description": "One line summary."}
                }
            }
        }
        end_tool = {
            "type": "function",
            "name": self._end_session_tool_name,
            "description": self._end_session_description,
            "parameters": {
                "type": "object",
                "properties": {
                    "reason": {"type": "string", "description": "Why the user ended or asked to return to main menu."},
                    "farewell": {"type": "string", "description": "Short closing line to speak to the user."}
                }
            }
        }
        return [objective_tool, end_tool] + self.user_tools

    def _send_session_update(self, ws: websocket.WebSocketApp):
        ev = {
            "type": "session.update",
            "session": {
                "voice": self.voice,
                "instructions": (
                    self.system_instructions
                    + " If the user says end, stop, goodbye, or return to main menu, call end_session_tool."
                ),
                # strings only
                "input_audio_format": self.input_audio_format,
                "output_audio_format": self.output_audio_format,
                "input_audio_transcription": {"model": "whisper-1"},
                "tools": self._compose_tools(),
                "tool_choice": self.tool_choice,
            }
        }
        ws.send(json.dumps(ev))
        if self.debug:
            print("[DEBUG] session.update sent with tools:", [t["name"] for t in self._compose_tools()])

    def _mic_cb(self, indata, frames, time, status):
        if status and self.debug:
            print("[MIC STATUS]", status)
        self._in_q.put(indata.copy())

    def _start_mic_stream(self, ws: websocket.WebSocketApp):
        def mic_loop():
            with sd.InputStream(
                samplerate=self.IN_SR,
                channels=self.CHANNELS,
                dtype="float32",
                blocksize=self.BLOCK,
                callback=self._mic_cb
            ):
                print("[DEBUG] Mic ready. Speak naturally.")
                while not self._stop_event.is_set():
                    try:
                        audio = self._in_q.get(timeout=0.1)
                    except queue.Empty:
                        continue
                    b64 = self._base64_audio(audio[:, 0])
                    ws.send(json.dumps({"type": "input_audio_buffer.append", "audio": b64}))

        self._audio_in_thread = threading.Thread(target=mic_loop, daemon=True)
        self._audio_in_thread.start()

    def _audio_player_loop(self):
        bytes_per_frame = 2 * self.CHANNELS
        frames_needed = 2048

        def cb(outdata, frames, time, status):
            if status and self.debug:
                print("[PLAYBACK STATUS]", status)
            need_bytes = frames * bytes_per_frame
            with self._out_lock:
                if len(self._out_buf) >= need_bytes:
                    chunk = self._out_buf[:need_bytes]
                    del self._out_buf[:need_bytes]
                else:
                    chunk = bytes(need_bytes)
            s16 = np.frombuffer(chunk, dtype=np.int16)
            f32 = (s16.astype(np.float32) / 32767.0).reshape(-1, self.CHANNELS)
            outdata[:] = f32

        with sd.OutputStream(
            samplerate=self.OUT_SR,
            channels=self.CHANNELS,
            dtype="float32",
            callback=cb,
            blocksize=frames_needed
        ):
            print("[DEBUG] Speaker ready")
            while not self._stop_event.is_set():
                sd.sleep(100)

    # --------------- WS Callbacks ----------------
    def _on_open(self, ws):
        if self.debug:
            print("[DEBUG] WebSocket connected")
        self._send_session_update(ws)
        # Speak first (optional)
        if self.speak_first and self.intro_instructions:
            payload = {"type": "response.create", "response": {"instructions": self.intro_instructions}}
            if self.intro_out_of_band:
                payload["response"]["conversation"] = "none"
            ws.send(json.dumps(payload))
            if self.debug:
                print("[DEBUG] Intro response.create issued")
        self._start_mic_stream(ws)

    def _on_close(self, ws, *args):
        if self.debug:
            print("[DEBUG] WebSocket closed")

    def _on_error(self, ws, err):
        print("[DEBUG] WebSocket error:", err)

    def _on_message(self, ws, message):
        try:
            ev = json.loads(message)
        except Exception as e:
            print("[ERROR] bad JSON:", e)
            return
        t = ev.get("type")

        # Print all events with full data for debugging (skip audio deltas to reduce clutter)
        if self.debug and t not in ["response.audio.delta", "response.audio_transcript.delta"]:
            print(f"\n{'='*80}")
            print(f"[SERVER EVENT] Type: {t}")
            print(f"[SERVER EVENT] Full data:")
            print(json.dumps(ev, indent=2))
            print(f"{'='*80}\n")

        if t == "response.created":
            self._active_response_id = ev.get("response", {}).get("id")
            if self.debug:
                print("[DEBUG] response.created ->", self._active_response_id)
            return

        if t == "response.done":
            if self.debug:
                print("[DEBUG] response.done ->", self._active_response_id)
            self._active_response_id = None
            if self._need_response_after_tool:
                self._need_response_after_tool = False
                self._safe_response_create(ws)
            return

        if t == "response.audio.delta":
            b64 = ev.get("delta")
            if b64:
                pcm = base64.b64decode(b64)
                with self._out_lock:
                    self._out_buf.extend(pcm)
            return

        if t == "input_audio_buffer.speech_started":
            if self.debug:
                print("[DEBUG] VAD: speech started")
            return

        if t == "input_audio_buffer.speech_stopped":
            if self.debug:
                print("[DEBUG] VAD: speech stopped")
            return

        if t == "conversation.item.input_audio_transcription.completed":
            transcript = ev.get("transcript", "")
            if transcript:
                print(f"\n[USER]: {transcript}\n")
                # Collect user input for transcript
                self.transcript.append({
                    "type": "user_input",
                    "content": transcript,
                    "timestamp": time.time(),
                    "agent_id": self.agent_id,
                    "metadata": {
                        "event_id": ev.get("event_id"),
                        "item_id": ev.get("item_id"),
                    }
                })
            return

        if t == "response.output_audio_transcript.delta":
            delta = ev.get("delta", "")
            if delta:
                print(delta, end="", flush=True)  # Print transcript chunks as they arrive
            return

        if t == "response.output_audio_transcript.done":
            transcript = ev.get("transcript", "")
            if transcript:
                print()  # New line after complete transcript
                if self.debug:
                    print(f"[DEBUG] Complete transcript: {transcript}")
            return

        if t == "response.output_item.done":
            item = ev.get("item", {})
            # Extract and print transcript from completed message items
            if item.get("type") == "message" and item.get("role") == "assistant":
                content = item.get("content", [])
                if content and len(content) > 0:
                    transcript = content[0].get("transcript", "")
                    if transcript:
                        print(f"\n[ASSISTANT]: {transcript}\n")
                        # Collect assistant response for transcript
                        self.transcript.append({
                            "type": "assistant_response",
                            "content": transcript,
                            "timestamp": time.time(),
                            "agent_id": self.agent_id,
                            "metadata": {
                                "response_id": ev.get("response_id"),
                                "item_id": item.get("id"),
                                "event_id": ev.get("event_id"),
                            }
                        })
                if self._awaiting_end_after_assistant_msg:
                    if self.debug:
                        print("[DEBUG] Final assistant message delivered; stopping agent")
                    self.stop()
                return
            if item.get("type") == "function_call":
                self._dispatch_function_call(ws, item)
            return

        if t == "response.done.function_call":
            return

        if t == "response.function_call_arguments.done":
            # Collect tool calls for transcript
            function_name = ev.get("name")
            arguments = ev.get("arguments", "{}")
            if function_name:
                print(f"\n[TOOL CALL]: {function_name}")
                print(f"[ARGUMENTS]: {arguments}\n")
                self.transcript.append({
                    "type": "tool_call",
                    "content": f"Function: {function_name}",
                    "timestamp": time.time(),
                    "agent_id": self.agent_id,
                    "metadata": {
                        "function_name": function_name,
                        "arguments": arguments,
                        "call_id": ev.get("call_id"),
                        "response_id": ev.get("response_id"),
                        "item_id": ev.get("item_id"),
                        "event_id": ev.get("event_id"),
                    }
                })
            return

        if t == "error":
            print("[SERVER ERROR]", ev)
            return

    # --------------- Function Calls ---------------
    def _dispatch_function_call(self, ws, item):
        name = item.get("name")
        if self.debug:
            print(f"[DEBUG] function_call detected: {name} raw={item}")

        # End session tool: acknowledge, ask model to speak closing, then exit after next assistant message
        if name == self._end_session_tool_name:
            args = json.loads(item.get("arguments", "{}") or "{}")
            farewell = args.get("farewell", "Okay, ending the session now.")
            ws.send(json.dumps({
                "type": "conversation.item.create",
                "item": {
                    "type": "function_call_output",
                    "call_id": item.get("call_id"),
                    "output": json.dumps({"ok": True, "farewell": farewell})
                }
            }))
            if self.debug:
                print("[DEBUG] end_session_tool acknowledged")
            self._awaiting_end_after_assistant_msg = True
            self._safe_response_create(ws)
            return

        # Objective completion tool: same exit flow after the model speaks its closing line
        if name == self._objective_tool_name:
            ws.send(json.dumps({
                "type": "conversation.item.create",
                "item": {
                    "type": "function_call_output",
                    "call_id": item.get("call_id"),
                    "output": json.dumps({"ok": True})
                }
            }))
            if self.debug:
                print("[DEBUG] objective_completion_tool acknowledged")
            self._awaiting_end_after_assistant_msg = True
            self._safe_response_create(ws)
            return

        # User-registered tools
        if not self.handle_function_call_cb:
            ws.send(json.dumps({
                "type": "conversation.item.create",
                "item": {
                    "type": "function_call_output",
                    "call_id": item.get("call_id"),
                    "output": "{}"
                }
            }))
            self._safe_response_create(ws)
            return

        before = self._active_response_id
        self.handle_function_call_cb(ws, item)
        after = self._active_response_id
        if before is not None and before == after:
            self._need_response_after_tool = True

    # --------------- Helpers ---------------
    def _safe_response_create(self, ws):
        if self._active_response_id is None:
            ws.send(json.dumps({"type": "response.create"}))
            if self.debug:
                print("[DEBUG] response.create issued")
        else:
            self._need_response_after_tool = True
            if self.debug:
                print("[DEBUG] response.create deferred (active response)")

    @staticmethod
    def _float_to_pcm16(f32: np.ndarray) -> bytes:
        f = np.clip(f32, -1.0, 1.0)
        return (f * 32767).astype(np.int16).tobytes()

    def _base64_audio(self, f32_mono: np.ndarray) -> str:
        return base64.b64encode(self._float_to_pcm16(f32_mono)).decode("ascii")


# ---------------- Demo: simple weather tool ----------------
def weather_handler(ws, call):
    args = json.loads(call.get("arguments", "{}") or "{}")
    loc = args.get("location", "unknown")
    print(f"[DEBUG] weather tool called with location={loc}")
    demo = {
        "paris": {"temp": "21°C", "condition": "Partly cloudy"},
        "new york": {"temp": "22°C", "condition": "Sunny"},
        "london": {"temp": "16°C", "condition": "Rainy"},
        "tokyo": {"temp": "25°C", "condition": "Clear"},
        "chicago": {"temp": "20°C", "condition": "Breezy"},
    }
    w = demo.get(loc.lower(), {"temp": "20°C", "condition": "Mild"})
    out = json.dumps({"location": loc, "temperature": w["temp"], "condition": w["condition"]})
    ws.send(json.dumps({
        "type": "conversation.item.create",
        "item": {
            "type": "function_call_output",
            "call_id": call["call_id"],
            "output": out
        }
    }))
    print("[DEBUG] weather result posted; base class will auto-create response")


if __name__ == "__main__":
    tools = [{
        "type": "function",
        "name": "get_weather",
        "description": "Get current weather for a location.",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"]
        }
    }]
    agent = RealtimeVoiceAgent(
        system_instructions=(
            "You are a voice assistant. If the user asks for weather, call get_weather and say the result clearly. "
            "If the user asks to end or return to the main menu, call end_session_tool. "
            "When the task is fully complete and you have spoken the final answer, you may call objective_completion_tool. Speak English."
        ),
        tools=tools,
        handle_function_call=weather_handler,
        debug=True,
        # Speak-first demo:
        speak_first=True,
        intro_instructions="Start out greeting the user and asking if they would like to check the weather. speak english",
        # intro_out_of_band=True,  # optional if you want the intro outside the default conversation
    )
    agent.start()
    try:
        while not agent._stop_event.is_set():
            threading.Event().wait(0.2)
    except KeyboardInterrupt:
        agent.stop()
```
Real-time voice models can understand spoken audio directly. They analyze the raw waveform and respond in real time without needing a separate transcription step. However, if you want a text record of what the user actually said, you must attach an explicit transcription model. This is handled in our code through the input_audio_transcription field in the session configuration, where we specify model: whisper-1.
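As a concrete sketch, the transcription model is attached when the session is configured. The payload below follows the Realtime API's `session.update` event shape; treat the exact field names as an assumption to verify against the current API reference rather than a definitive spec:

```python
import json

# Sketch of a session.update payload that attaches Whisper transcription.
# The realtime model still consumes raw audio directly; whisper-1 only
# produces the text transcript events we log. Field names assume the
# Realtime API session configuration schema.
session_update = {
    "type": "session.update",
    "session": {
        "input_audio_format": "pcm16",
        "output_audio_format": "pcm16",
        "input_audio_transcription": {"model": "whisper-1"},
        "turn_detection": {"type": "server_vad"},
    },
}

# Serialized and sent over the websocket, e.g. ws.send(payload)
payload = json.dumps(session_update)
print(payload)
```

Without the `input_audio_transcription` field, the session still works end to end; you simply never receive the `conversation.item.input_audio_transcription.completed` events that our transcript log depends on.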
Whisper runs independently of the realtime model. It listens to the same incoming audio buffers and produces text transcripts as events. The realtime model continues using the audio for understanding and response generation, while Whisper’s role is purely to produce readable text that can be displayed or logged.
The agent receives these transcripts through conversation.item.input_audio_transcription.completed events and adds them to the internal transcript log. The assistant’s replies are also captured as response.output_audio_transcript events, providing both sides of the conversation in text form. Together, these entries form a complete trace of each turn, including what the user said, what the model spoke, and any function calls that occurred.
Saving these transcripts allows you to send them to W&B Weave for later analysis and compliance. Weave tracks each tool call and response as a node in a trace, letting you inspect every step of the conversation: which function was called, what arguments were passed, and how long each step took. By pairing that trace with the stored transcripts, you get synchronized audio, text, and action data for debugging, evaluation, or demonstration.
Building the router agent and chat loop
At the top level, this multi-agent call center system runs a simple chat loop that manages the entire interaction flow. It starts by launching the router agent, waits for it to finish listening, and reads the printed route result. Based on that route, the loop starts the matching specialized agent, such as authentication, failed transaction, or fraud, and lets it handle the rest of the call. When that agent finishes, the loop either ends or returns to the router if the user wants to go back to the main menu.
This loop keeps the logic straightforward. The router determines which problem area the user belongs to, and the orchestrator simply spawns the correct agent process. Each agent is isolated in its own script and runs independently through a subprocess call. That isolation wasn’t just a design choice; it fixed an issue where the audio libraries might not close properly between agents. Running each one in a clean process guarantees fresh audio input and output for every stage.
The router itself handles only one job: listening to the user’s initial request and mapping it to a route. It exposes a set of callable tools: start_failed_agent, start_auth_agent, start_fraud_agent, and route_to_human, each representing a possible destination. Once the model picks one, the router confirms the tool call, prints a line like ROUTE: failed, and stops. The chat loop then reads that output, launches the corresponding agent, and the conversation continues naturally without the user noticing the handoff.
This pattern turns the voice system into a chain of small, focused components. The chat loop coordinates everything, the router decides direction, and the specialized agents handle their domains from start to finish.
Here is the code for the chat loop:
import os
import sys
import subprocess
import shlex
import time
import uuid
import json
import weave
from typing import Dict, Any, List, Optional

ROUTE_TO_SCRIPT = {
    "fraud": "fraud_agent.py",
    "failed": "transaction_agent.py",
    "auth": "password_agent.py",
    "human": None,
    "exit": None,
}

# IMPORTANT: call the CLI flavor, unbuffered
ROUTER_CMD = f"{shlex.quote(sys.executable)} -u {shlex.quote('voice_router_agent.py')}"


@weave.op
def record_interaction(
    interaction_type: str,
    content: str,
    conversation_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Record and track interactions in the voice agent conversation.

    Weave is great for monitoring LLMs in production, allowing you to track
    performance characteristics such as response length, prompt coverage, or
    rare failure cases over time. This makes it easier to iterate on system
    behavior, uncover unintended behaviors, or debug edge cases, all of which
    are key parts of effectively aligning and evaluating language models in
    real-world applications.

    Args:
        interaction_type: Type of interaction (user_input, assistant_response, tool_call)
        content: The actual content (transcript or function call details)
        conversation_id: Unique identifier for the conversation session
        agent_id: Identifier for the specific agent (e.g., "password_agent", "transaction_agent")
        metadata: Additional context (response_id, item_id, function name, etc.)

    Returns:
        Dictionary containing the logged interaction data
    """
    pass


def run_router_once(conversation_id: str) -> str:
    env = os.environ.copy()
    env["CONVERSATION_ID"] = conversation_id
    proc = subprocess.run(
        ROUTER_CMD,
        shell=True,
        capture_output=True,
        text=True,
        check=False,
        env=env,
    )
    # Print router logs so you can see what's happening
    if proc.stderr:
        sys.stderr.write(proc.stderr)
        sys.stderr.flush()
    if proc.stdout:
        sys.stdout.write(proc.stdout)
        sys.stdout.flush()
    route = "human"
    for line in (proc.stdout or "").splitlines():
        if line.strip().lower().startswith("route:"):
            route = line.split(":", 1)[1].strip().lower()
            break
    return route


def run_agent_script(script: str, conversation_id: str) -> Optional[List[Dict[str, Any]]]:
    """Run an agent script in a subprocess and read its transcript from file."""
    print(f"[orchestrator] launching agent script: {script}")
    # Define transcript file path
    transcript_file = f"/tmp/transcript_{conversation_id}.json"
    cmd = f"{shlex.quote(sys.executable)} -u {shlex.quote(script)}"
    env = os.environ.copy()
    env["CONVERSATION_ID"] = conversation_id
    env["TRANSCRIPT_FILE"] = transcript_file
    # DON'T capture output - let the agent speak and show debug messages
    subprocess.run(
        cmd,
        shell=True,
        check=False,
        env=env,
        # No capture_output=True here - let stdout/stderr flow to terminal
    )
    # Read the transcript file if it exists
    transcript = None
    if os.path.exists(transcript_file):
        try:
            with open(transcript_file, 'r') as f:
                transcript = json.load(f)
            # Clean up the transcript file
            os.remove(transcript_file)
        except Exception as e:
            print(f"[orchestrator] error reading transcript: {e}")
    return transcript


@weave.op
def main():
    print("Voice Router Orchestrator. Ctrl+C to exit.")
    # Generate a unique conversation ID for this session
    conversation_id = str(uuid.uuid4())
    print(f"[orchestrator] Starting conversation session: {conversation_id}")
    while True:
        route = run_router_once(conversation_id)
        print(f"[orchestrator] router chose: {route}")
        if route == "exit":
            print("[orchestrator] exiting.")
            break
        if route == "human":
            print("Connecting you to a human representative… (placeholder)")
            time.sleep(1.0)
            continue
        script = ROUTE_TO_SCRIPT.get(route)
        if not script:
            print("[orchestrator] unknown route; returning to router.")
            continue
        # Brief pause to allow audio devices to fully release from router
        print("[orchestrator] preparing handoff to agent...")
        time.sleep(0.5)
        # Run agent and get transcript
        transcript = run_agent_script(script, conversation_id)
        # Log all interactions from the transcript
        if transcript:
            print(f"[orchestrator] logging {len(transcript)} interactions to Weave")
            for interaction in transcript:
                record_interaction(
                    interaction_type=interaction.get("type"),
                    content=interaction.get("content"),
                    conversation_id=conversation_id,
                    agent_id=interaction.get("agent_id"),
                    metadata=interaction.get("metadata")
                )
        # Brief pause before restarting router
        print("[orchestrator] agent complete, restarting router...")
        time.sleep(0.5)
        # when the agent exits, loop back to router


if __name__ == "__main__":
    # Initialize Weave for tracking and monitoring
    weave.init("finance-callcenter-voice-agents")
    main()
Each agent writes a JSON transcript at the end of its run. The chat loop passes a shared conversation_id so every transcript can be tied together. When an agent exits, the loop reads that file, deletes it, and logs every entry to Weave.
The transcript holds every exchange: user_input from Whisper, assistant_response from the model, and tool_call entries with arguments and IDs. Each line is timestamped and tagged with the agent_id that produced it.
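The transcript file is simply a JSON list of these entries. Here is a minimal sketch of what one might contain and how orchestrator-style code can filter it; the specific content strings are invented for illustration, but the entry shape matches what the agents write:

```python
import json
import time

# Illustrative transcript entries in the same shape the agents produce
transcript = [
    {"type": "user_input", "content": "I need a password reset", "timestamp": time.time(),
     "agent_id": "password_agent", "metadata": {"item_id": "item_1"}},
    {"type": "tool_call", "content": "Function: send_email_code", "timestamp": time.time(),
     "agent_id": "password_agent", "metadata": {"function_name": "send_email_code"}},
    {"type": "assistant_response", "content": "I've emailed you a code.", "timestamp": time.time(),
     "agent_id": "password_agent", "metadata": {"response_id": "resp_1"}},
]

# Round-trip through JSON, just as the agent file write and orchestrator read do
restored = json.loads(json.dumps(transcript))

# Pull out only the tool calls, e.g. for auditing which actions were taken
tool_calls = [e for e in restored if e["type"] == "tool_call"]
print([e["metadata"]["function_name"] for e in tool_calls])
```

Because every entry carries `agent_id` and a timestamp, the same filtering works across a whole session's worth of agents, not just one.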
The record_interaction function is decorated with a weave.op, so every call is automatically traced and visible in Weave. The orchestrator iterates through the transcript, calling record_interaction for each entry. That creates a full trace tree under one conversation_id, showing how the router and each agent interacted.
This chat loop launches the following router agent, which classifies the user's intent so the call can be routed to a specialized agent:
import os
import json
import threading
from typing import Dict, Any, Optional

from realtime_agent_base import RealtimeVoiceAgent  # your fixed base class


class VoiceRouterAgent:
    """Voice-first router that listens and selects ONE route via a tool call.

    It DOES NOT launch sub-agents. Instead, it prints "ROUTE: <route>"
    to stdout and exits. This avoids audio handle reuse issues.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        model_url: str = "wss://api.openai.com/v1/realtime?model=gpt-realtime-2025-08-28",
        voice: str = "alloy",
        debug: bool = True,
        # --- speak-first options ---
        speak_first: bool = True,
        intro_instructions: str = (
            "Use this intro to greet the user: welcome to the GPT Bank. I can connect you to the right team. "
            "Say 'fraud' for a suspicious charge, 'failed' for a declined payment, "
            "'auth' for login or password help, or 'human' to talk to a person. "
            "You can also say 'exit' to return to the main menu. Speak English."
        ),
        intro_out_of_band: bool = False,  # set True if your base agent supports OOB intros
        conversation_id: Optional[str] = None,
    ):
        self.route_choice: Optional[str] = None
        self.debug = debug

        # Tools the model can call to select a route
        self.tools = [
            {"type": "function", "name": "start_fraud_agent",
             "description": "Route to the Fraud Agent.",
             "parameters": {"type": "object", "properties": {}}},
            {"type": "function", "name": "start_failed_agent",
             "description": "Route to the Failed Transaction Agent.",
             "parameters": {"type": "object", "properties": {}}},
            {"type": "function", "name": "start_auth_agent",
             "description": "Route to the Auth Agent.",
             "parameters": {"type": "object", "properties": {}}},
            {"type": "function", "name": "route_to_human",
             "description": "Route to a human representative.",
             "parameters": {"type": "object", "properties": {}}},
            {"type": "function", "name": "shutdown_router",
             "description": "Exit router without choosing an agent (user said exit/main menu).",
             "parameters": {"type": "object", "properties": {}}},
        ]

        system_instructions = (
            "Only speak English. You are a voice router for a bank call center.\n"
            "CALL EXACTLY ONE TOOL based on the user's intent:\n"
            "- start_fraud_agent for fraudulent/unauthorized charge, dispute, stolen card, chargeback\n"
            "- start_failed_agent for declined card, blocked merchant, failed payment, whitelist/allowlist\n"
            "- start_auth_agent for login issue, password reset, email/verification code, identity verification\n"
            "- route_to_human for anything else or if unclear\n"
            "- shutdown_router if the user says exit or main menu\n\n"
            "After you call a tool, do not keep talking. After calling the tool, EXIT!"
        )

        self.agent = RealtimeVoiceAgent(
            api_key=api_key or os.environ.get("OPENAI_API_KEY"),
            model_url=model_url,
            system_instructions=system_instructions,
            tools=self.tools,
            tool_choice="auto",
            voice=voice,
            input_audio_format="pcm16",
            output_audio_format="pcm16",
            handle_function_call=self._handle_tool_call,
            debug=debug,
            objective_completion_description="(unused by router)",
            # --- speak-first wiring ---
            speak_first=speak_first,
            intro_instructions=intro_instructions,
            intro_out_of_band=intro_out_of_band,
            # Weave logging
            conversation_id=conversation_id,
            agent_id="voice_router_agent",
        )

    def _handle_tool_call(self, ws, item: Dict[str, Any]):
        name = item.get("name")
        # ACK the tool call so the model is satisfied
        ws.send(json.dumps({
            "type": "conversation.item.create",
            "item": {
                "type": "function_call_output",
                "call_id": item["call_id"],
                "output": json.dumps({"ok": True})
            }
        }))
        if self.debug:
            print(f"[router] tool call received: {name}", flush=True)
        # Map tool -> route string
        mapping = {
            "start_fraud_agent": "fraud",
            "start_failed_agent": "failed",
            "start_auth_agent": "auth",
            "route_to_human": "human",
            "shutdown_router": "exit",
        }
        self.route_choice = mapping.get(name, "human")
        # Stop the router voice session immediately after routing
        self.agent.stop()

    def run(self) -> str:
        self.agent.start()
        try:
            while not self.agent._stop_event.is_set():
                threading.Event().wait(0.25)
        except KeyboardInterrupt:
            self.agent.stop()
        return self.route_choice or "human"


if __name__ == "__main__":
    conversation_id = os.environ.get("CONVERSATION_ID")
    router = VoiceRouterAgent(debug=True, conversation_id=conversation_id)
    route = router.run()
    # Print one clean, parseable line for the orchestrator
    print(f"ROUTE: {route}", flush=True)
Implementing a password reset agent into our call center
The authentication and password reset path uses the same real-time voice base as the other agents, but its job is specific. It verifies identity, delivers a 6-digit code by email, confirms that code, and, if requested, sends a password reset link. The AI agent takes in system instructions, a tool schema, and a single function handler. The tools declare what actions are allowed. The handler executes Python functions in response to tool calls and returns structured results to the model, allowing the conversation to continue smoothly.
The model gathers name, birth date, SSN, address, and the account number. The verify tool checks those fields against the local banking data. If verification fails, the agent offers one retry and then ends gracefully. The agent never reads the SSN back to the caller, and it does not expose balances or other account data.
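How strictly those fields match depends on `BankSystem.validate_customer`, which lives in the banking backend. One reasonable approach (the helper below is a hypothetical sketch, not part of the actual codebase) is to normalize formatting before comparing, so that an SSN spoken as "123 45 6789" verifies the same as the stored "123-45-6789":

```python
import re

def normalize_ssn(raw: str) -> str:
    """Hypothetical helper: reduce an SSN to NNN-NN-NNNN regardless of how it was spoken."""
    digits = re.sub(r"\D", "", raw)  # strip everything except digits
    if len(digits) != 9:
        raise ValueError("SSN must contain exactly 9 digits")
    return f"{digits[:3]}-{digits[3:5]}-{digits[5:]}"

print(normalize_ssn("123 45 6789"))   # -> 123-45-6789
print(normalize_ssn("123-45-6789"))   # -> 123-45-6789
```

This kind of normalization matters more with voice input than with typed forms, because the transcription model may render the same spoken number with spaces, dashes, or no separators at all.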
Once verified, the user can request a reset. The agent calls the send email code tool, which looks up the email on file, generates a 6-digit code, and sends it through the configured SMTP account. When the caller speaks the code, the confirm email code tool validates it. If it matches, the agent can then call the send password reset link tool, which emails a short-lived link to the same address on file and returns a status object the model can explain in plain speech.
Here’s the code for the password reset agent:
import os
import json
import ssl
import smtplib
import random
import threading
from typing import Dict, Any, Optional, List
from email.message import EmailMessage

from bank_infra import BankSystem
from realtime_agent_base import RealtimeVoiceAgent  # your fixed voice base


class AuthAgent:
    """Voice-first Authentication assistant built on RealtimeVoiceAgent.

    Public surface:
    - __init__(db_path="bank_data.json", api_key=None, email_account=..., email_password=...,
               speak_first=True, intro_instructions=..., intro_out_of_band=False)
    - run()
    """

    def __init__(
        self,
        db_path: str = "bank_data.json",
        api_key: Optional[str] = None,
        email_account: str = "your email address",
        email_password: Optional[str] = "app password for your account",
        speak_first: bool = True,
        intro_instructions: str = (
            "Hello, I'm the authentication assistant. "
            "I'll verify your identity, send a 6-digit email code, and help with password reset if needed. "
            "To begin, please tell me your account number."
        ),
        intro_out_of_band: bool = False,
        conversation_id: Optional[str] = None,
    ):
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.bank = BankSystem(db_path)
        self.EMAIL_ACCOUNT = email_account
        self.EMAIL_PASSWORD = email_password or os.environ.get("SMTP_PASSWORD") or ""
        self.EMAIL_CODES: Dict[str, Dict[str, str]] = {}

        # --- Policy / flow (aligned with your original, plus explicit end/objective tools) ---
        self.SYSTEM_PROMPT = (
            "You are a banking authentication assistant. "
            "Gather user information, verify with verify_personal_info, send an email code, confirm it, "
            "and if the user requests a password reset, first verify the user's personal info, then send_email_code, "
            "then get the code from the user and use confirm_email_code, then finally use send_password_reset_link "
            "to email them a reset link. "
            "Never expose account data. Keep prompts brief and clear. "
            "If identity verification fails, offer one retry; if it still fails, end gracefully and direct the user "
            "to human support (call end_session_tool). "
            "When the task is fully complete or the user wants to go back to the main menu, "
            "call objective_completion_tool with a one-line summary and then end_session_tool. "
            "Speak English."
        )

        # Build the realtime agent (base auto-adds end_session_tool / objective_completion_tool)
        self.agent = RealtimeVoiceAgent(
            api_key=self.api_key,
            system_instructions=self.SYSTEM_PROMPT,
            tools=self._make_tools_schema(),
            handle_function_call=self._make_function_handler(),
            debug=True,
            # speak-first intro
            speak_first=speak_first,
            intro_instructions=intro_instructions,
            intro_out_of_band=intro_out_of_band,
            # Weave logging
            conversation_id=conversation_id,
            agent_id="password_agent",
        )

    # ---------------- Tool Schemas ----------------
    def _make_tools_schema(self) -> List[Dict[str, Any]]:
        return [
            {
                "type": "function",
                "name": "verify_personal_info",
                "description": "Validate personal info against bank_data.json. Returns {verified: bool}.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "dob": {"type": "string", "description": "YYYY-MM-DD"},
                        "soc": {"type": "string", "description": "SSN NNN-NN-NNNN"},
                        "address": {"type": "string"},
                        "account_number": {"type": "string"}
                    },
                    "required": ["name", "dob", "soc", "address", "account_number"]
                }
            },
            {
                "type": "function",
                "name": "send_email_code",
                "description": "Looks up the email on file for the account and emails a 6-digit code.",
                "parameters": {
                    "type": "object",
                    "properties": {"account_number": {"type": "string"}},
                    "required": ["account_number"]
                }
            },
            {
                "type": "function",
                "name": "confirm_email_code",
                "description": "Confirm the emailed code.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "account_number": {"type": "string"},
                        "code": {"type": "string"}
                    },
                    "required": ["account_number", "code"]
                }
            },
            {
                "type": "function",
                "name": "send_password_reset_link",
                "description": "Sends a password reset link to the user's registered email address.",
                "parameters": {
                    "type": "object",
                    "properties": {"account_number": {"type": "string"}},
                    "required": ["account_number"]
                }
            }
        ]

    # ---------------- Helpers ----------------
    def _load_email_for_account(self, account_number: str) -> Optional[str]:
        try:
            data = self.bank._load_db()
            rec = data.get(account_number)
            return rec.get("email") if rec else None
        except Exception:
            return None

    def _send_email(self, to_addr: str, subject: str, body: str, html: Optional[str] = None) -> Dict[str, Any]:
        if not to_addr:
            return {"ok": False, "message": "no recipient"}
        msg = EmailMessage()
        msg["From"] = self.EMAIL_ACCOUNT
        msg["To"] = to_addr
        msg["Subject"] = subject
        msg.set_content(body)
        if html:
            msg.add_alternative(html, subtype="html")
        try:
            with smtplib.SMTP("smtp.gmail.com", 587) as s:
                s.starttls(context=ssl.create_default_context())
                s.login(self.EMAIL_ACCOUNT, self.EMAIL_PASSWORD)
                s.send_message(msg)
            return {"ok": True}
        except Exception as e:
            return {"ok": False, "message": str(e)}

    # ---------------- Tool Implementations ----------------
    def verify_personal_info(self, name: str, dob: str, soc: str, address: str, account_number: str) -> Dict[str, bool]:
        ok = self.bank.validate_customer(name=name, dob=dob, soc=soc, address=address, account_number=account_number)
        return {"verified": bool(ok)}

    def send_email_code(self, account_number: str) -> Dict[str, Any]:
        email_to_send = self._load_email_for_account(account_number)
        if not email_to_send:
            return {"status": "error", "message": "no email on file for this account"}
        code = f"{random.randint(100000, 999999)}"
        self.EMAIL_CODES[account_number] = {"email": email_to_send, "code": code}
        subject = "Your verification code"
        body = f"Your verification code is {code}. It expires in 10 minutes."
        html = f"<p>Your verification code is <b>{code}</b>. It expires in 10 minutes.</p>"
        send_result = self._send_email(email_to_send, subject, body, html)
        if not send_result.get("ok"):
            return {"status": "error", "message": send_result.get("message", "send failed")}
        return {"status": "sent", "to": email_to_send}

    def confirm_email_code(self, account_number: str, code: str) -> Dict[str, Any]:
        rec = self.EMAIL_CODES.get(account_number)
        if not rec:
            return {"success": False, "message": "no code sent"}
        if rec["code"] == str(code).strip():
            del self.EMAIL_CODES[account_number]
            return {"success": True}
        return {"success": False, "message": "invalid code"}

    def send_password_reset_link(self, account_number: str) -> Dict[str, Any]:
        email_to_send = self._load_email_for_account(account_number)
        if not email_to_send:
            return {"status": "error", "message": "no email on file for this account"}
        reset_link = f"https://bank-secure-reset.com/reset/{random.randint(1000000, 9999999)}"
        subject = "Password Reset Request"
        body = f"Click this link to reset your password: {reset_link}\nThis link will expire in 15 minutes."
        html = f"<p>Click below to reset your password:</p><a href='{reset_link}'>{reset_link}</a>"
        send_result = self._send_email(email_to_send, subject, body, html)
        if not send_result.get("ok"):
            return {"status": "error", "message": send_result.get("message", "send failed")}
        return {"status": "sent", "link": reset_link, "to": email_to_send}

    # ---------------- WS Tool Dispatcher ----------------
    def _make_function_handler(self):
        """Returns a callback(ws, item) for RealtimeVoiceAgent:
        - Parses tool call
        - Executes local method
        - Posts function_call_output (base manages safe response timing)
        """
        def handler(ws, item: Dict[str, Any]):
            name = item.get("name")
            args = json.loads(item.get("arguments", "{}") or "{}")
            try:
                if name == "verify_personal_info":
                    res = self.verify_personal_info(**args)
                elif name == "send_email_code":
                    res = self.send_email_code(**args)
                elif name == "confirm_email_code":
                    res = self.confirm_email_code(**args)
                elif name == "send_password_reset_link":
                    res = self.send_password_reset_link(**args)
                else:
                    res = {"error": f"unknown tool: {name}"}
            except Exception as e:
                res = {"error": str(e)}
            ws.send(json.dumps({
                "type": "conversation.item.create",
                "item": {
                    "type": "function_call_output",
                    "call_id": item.get("call_id"),
                    "output": json.dumps(res)
                }
            }))
            print(f"[DEBUG] tool_result posted -> {name}")
        return handler

    # ---------------- Public API ----------------
    def run(self) -> List[Dict[str, Any]]:
        """Starts the realtime voice session. Press Ctrl+C to stop.

        The assistant ends itself when objective_completion_tool / end_session_tool are invoked.
        Returns the full transcript of the conversation.
        """
        self.agent.start()
        try:
            while not self.agent._stop_event.is_set():
                threading.Event().wait(0.25)
        except KeyboardInterrupt:
            self.agent.stop()
        return self.agent.get_transcript()


# -------------- direct launch (optional) --------------
if __name__ == "__main__":
    conversation_id = os.environ.get("CONVERSATION_ID")
    agent = AuthAgent(
        # Never hard-code real credentials; read the address and Gmail app password from the environment
        email_account=os.environ.get("SMTP_USER", "your_email@example.com"),
        email_password=os.environ.get("SMTP_PASSWORD", ""),
        speak_first=True,
        intro_instructions=(
            "Hi! I can help verify your identity and send you a 6-digit email code. "
            "To get started, what's your account number?"
        ),
        conversation_id=conversation_id,
    )
    transcript = agent.run()
    # Write transcript to file if TRANSCRIPT_FILE env var is set
    transcript_file = os.environ.get("TRANSCRIPT_FILE")
    if transcript_file and transcript:
        try:
            with open(transcript_file, 'w') as f:
                json.dump(transcript, f)
        except Exception as e:
            print(f"[ERROR] Failed to write transcript: {e}")
The base voice runtime handles the audio stream and the websocket session, while this agent focuses on the domain logic. Tool calls are logged to the transcript for later review. Objective completion and end-of-session signals let an orchestrator mark success and close the session cleanly. In practice, this yields a short flow that starts with identity, delivers a code, validates it, and issues a reset link, all without exposing sensitive fields in responses.
Building our fraud investigation agent
The fraud transaction investigation agent handles reports of suspicious charges. It operates on the same real-time voice base as the other assistants, handling audio input and output, transcription, and tool calling. Its behavior is defined by system instructions, a tool schema that declares allowed actions, and a single function handler that executes the underlying Python functions when those tools are invoked.
The flow is straightforward. The agent authenticates the caller against the local banking data, then asks for a date and a merchant name. It searches the ledger for matches and reads back a compact list with index numbers for easy selection. After the caller picks an item, the agent gathers a brief description, files a dispute as a structured record in the database, offers to block the merchant for future charges, and, if confirmed, updates card settings. A confirmation email with the dispute details is sent to the address on file.
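The read-back step, "a compact list with index numbers," can be sketched as a small formatter over the rows that the transaction search returns. The function and sample rows below are illustrative, not part of the agent code, but the row shape matches what the search tool produces:

```python
def format_results(rows):
    """Illustrative: turn search results into the 1-indexed list the agent reads aloud."""
    lines = []
    for i, r in enumerate(rows, start=1):
        lines.append(f"{i}. {r['timestamp']} | {r['merchant']} | ${r['amount']:.2f} ({r['status']})")
    return "\n".join(lines)

# Sample rows in the same shape the search tool returns (newest first)
rows = [
    {"timestamp": "2025-10-01T14:22:00", "merchant": "Acme Coffee", "amount": 12.00, "status": "failed"},
    {"timestamp": "2025-10-01T09:10:00", "merchant": "Acme Coffee", "amount": 4.50, "status": "posted"},
]
print(format_results(rows))
```

Starting the index at 1 rather than 0 matters in a voice interface: callers naturally say "the first one," and the model's read-back should match that phrasing exactly.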
Here’s the code for the agent:
import os
import json
import ssl
import smtplib
import random
from typing import List, Dict, Any, Optional
from email.message import EmailMessage

from bank_infra import BankSystem
from realtime_agent_base import RealtimeVoiceAgent  # your voice wrapper


class FraudAgent:
    """Voice Fraud Agent

    - Public API unchanged: __init__(...), run()
    - Internals use RealtimeVoiceAgent for audio I/O, VAD, and function-calling.
    """

    def __init__(
        self,
        db_path: str = "bank_data.json",
        api_key: Optional[str] = None,
        email_account: str = "your email address",
        email_password: Optional[str] = "app password for your account",
        speak_first: bool = True,
        intro_instructions: str = (
            "Hello, you’ve reached the fraud desk. I’ll help you report a suspicious charge. "
            "To begin, please say your account number. Speak English."
        ),
    ):
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.bank = BankSystem(db_path)
        self.email_account = email_account
        self.email_password = email_password or os.environ.get("SMTP_PASSWORD") or ""

        # ---- System policy (semantics match your original) ----
        self.SYSTEM_PROMPT = (
            "You are a bank assistant for reporting fraudulent transactions. "
            "Authenticate the customer before taking action. "
            "Ask for account number, then full name, date of birth YYYY-MM-DD, SSN, and address. "
            "Call verify_personal_info_tool. If verification fails, allow one retry, then stop and direct the user "
            "to contact support (call end_session_tool). "
            "After verification, require the user to provide a search date in YYYY-MM-DD and a merchant string. "
            "Call search_transactions_tool with those two fields. Show results with index numbers starting at 1, "
            "including timestamp, merchant, amount, and status. "
            "If zero results, ask the user for a nearby date or a different merchant string and search again. "
            "When the user selects an index, echo the exact timestamp, merchant, and amount and ask the user to confirm. "
            "Then ask for a short reason and any extra details. "
            "Call create_dispute_tool with timestamp, merchant, amount, reason, and extra details. "
            "After a successful dispute creation, ask if they want to block this merchant and if yes, call block_merchant_tool. "
            "Never reveal balances or SSN. Do not claim that funds are returned. "
            "After creating the dispute, email a copy of the dispute report to the email on file for the account. "
            "When the task is fully complete or the user wants to go back to the main menu, "
            "call objective_completion_tool with a one-line summary and then end_session_tool. "
            "Speak English."
        )

        # Build the realtime voice agent (it auto-adds objective/end tools)
        self.agent = RealtimeVoiceAgent(
            api_key=self.api_key,
            system_instructions=self.SYSTEM_PROMPT,
            tools=self._make_tools_schema(),  # user tools
            handle_function_call=self._make_function_handler(),
            debug=True,
            # speak-first configuration
            speak_first=speak_first,
            intro_instructions=intro_instructions,
        )

    # ---------- Tool schemas ----------
    def _make_tools_schema(self) -> List[Dict[str, Any]]:
        return [
            {
                "type": "function",
                "name": "verify_personal_info_tool",
                "description": "Validate customer identity. Returns {verified: bool}.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "dob": {"type": "string", "description": "YYYY-MM-DD"},
                        "soc": {"type": "string", "description": "SSN NNN-NN-NNNN"},
                        "address": {"type": "string"},
                        "account_number": {"type": "string"}
                    },
                    "required": ["name", "dob", "soc", "address", "account_number"]
                }
            },
            {
                "type": "function",
                "name": "search_transactions_tool",
                "description": "Search transactions by date and merchant substring. Amount is not used. Returns list sorted newest first.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "account_number": {"type": "string"},
                        "date": {"type": "string", "description": "YYYY-MM-DD"},
                        "merchant_query": {"type": "string"},
                        "include_failed": {"type": "boolean", "default": True}
                    },
                    "required": ["account_number", "date", "merchant_query"]
                }
            },
            {
                "type": "function",
                "name": "create_dispute_tool",
                "description": "Create a dispute record and email a report to the customer.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "account_number": {"type": "string"},
                        "timestamp": {"type": "string", "description": "ISO timestamp of the suspected transaction"},
                        "merchant": {"type": "string"},
                        "amount": {"type": "number"},
                        "reason": {"type": "string"},
                        "extra_details": {"type": "string"}
                    },
                    "required": ["account_number", "timestamp", "merchant", "amount", "reason"]
                }
            },
            {
                "type": "function",
                "name": "block_merchant_tool",
                "description": "Block a merchant for future charges on this card.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "account_number": {"type": "string"},
                        "merchant": {"type": "string"}
                    },
                    "required": ["account_number", "merchant"]
                }
            }
        ]

    # ---------- Tool implementations ----------
    @staticmethod
    def _same_day(ts: str, day: str) -> bool:
        ts_day = ts.split("T", 1)[0][:10]
        return ts_day == day[:10]

    def _load_email_for_account(self, account_number: str) -> Optional[str]:
        try:
            data = self.bank._load_db()
            rec = data.get(account_number)
            return rec.get("email") if rec else None
        except Exception:
            return None

    def _send_email(self, to_addr: str, subject: str, body: str, html: Optional[str] = None) -> Dict[str, Any]:
        if not to_addr:
            return {"ok": False, "message": "no recipient"}
        msg = EmailMessage()
        msg["From"] = self.email_account
        msg["To"] = to_addr
        msg["Subject"] = subject
        msg.set_content(body)
        if html:
            msg.add_alternative(html, subtype="html")
        try:
            with smtplib.SMTP("smtp.gmail.com", 587) as s:
                s.starttls(context=ssl.create_default_context())
                s.login(self.email_account, self.email_password)
                s.send_message(msg)
            return {"ok": True}
        except Exception as e:
            return {"ok": False, "message": str(e)}

    # --- tool bodies ---
    def verify_personal_info_tool(self, name: str, dob: str, soc: str, address: str, account_number: str) -> Dict[str, bool]:
        ok = self.bank.validate_customer(name=name, dob=dob, soc=soc, address=address, account_number=account_number)
        return {"verified": bool(ok)}

    def search_transactions_tool(
        self,
        account_number: str,
        date: str,
        merchant_query: str,
        include_failed: bool = True
    ) -> List[Dict[str, Any]]:
        if not date or not merchant_query:
            raise ValueError("date and merchant_query are required")
        hist = self.bank.get_spending_history(account_number)
        mq = merchant_query.strip().lower()
        out: List[Dict[str, Any]] = []
        for (ttype, amt, ts, desc, status) in hist:
            if not self._same_day(ts, date):
                continue
            if mq not in str(desc).lower():
                continue
            if not include_failed and status == "failed":
                continue
            out.append({
                "type": ttype,
                "amount": float(amt),
                "timestamp": ts,
                "merchant": desc,
                "status": status
            })
        out.sort(key=lambda r: r["timestamp"], reverse=True)
        return out

    def create_dispute_tool(
        self,
        account_number: str,
        timestamp: str,
        merchant: str,
        amount: float,
        reason: str,
        extra_details: Optional[str] = None
    ) -> Dict[str, Any]:
        data = self.bank._load_db()
        acc = data.get(account_number)
        if not acc:
            return {"ok": False, "message": "account not found"}
        disputes = acc.setdefault("disputes", [])
        case_id = f"D{random.randint(100000, 999999)}"
        disputes.append({
            "case_id": case_id,
            "timestamp": timestamp,
            "merchant": merchant,
            "amount": float(amount),
            "reason": reason,
            "details": extra_details or "",
            "status": "submitted"
        })
        acc.setdefault("history", []).append(
            ["Withdraw", float(amount), timestamp, f"{merchant} [DISPUTED]", "disputed"]
        )
        self.bank._save_db(data)
        to_email = self._load_email_for_account(account_number)
        report_txt = (
            f"Fraud Dispute Report\n"
            f"Case ID: {case_id}\n"
            f"Account: {account_number}\n"
            f"Transaction Timestamp: {timestamp}\n"
            f"Merchant: {merchant}\n"
            f"Amount: ${float(amount):.2f}\n"
            f"Reason: {reason}\n"
            f"Details: {extra_details or
''}\n"f"Status: submitted\n")report_html = (f"<h3>Fraud Dispute Report</h3>"f"<p><b>Case ID:</b> {case_id}</p>"f"<p><b>Account:</b> {account_number}</p>"f"<p><b>Transaction Timestamp:</b> {timestamp}</p>"f"<p><b>Merchant:</b> {merchant}</p>"f"<p><b>Amount:</b> ${float(amount):.2f}</p>"f"<p><b>Reason:</b> {reason}</p>"f"<p><b>Details:</b> {extra_details or ''}</p>"f"<p><b>Status:</b> submitted</p>")email_result = {"ok": False, "message": "no email on file"}if to_email:email_result = self._send_email(to_addr=to_email,subject=f"Your Fraud Dispute Report — Case {case_id}",body=report_txt,html=report_html)return {"ok": True,"disputes_count": len(disputes),"case_id": case_id,"email": {"sent": bool(email_result.get("ok")), "to": to_email, "error": email_result.get("message")}}def block_merchant_tool(self, account_number: str, merchant: str) -> Dict[str, Any]:return self.bank.disable_merchant(account_number, merchant)# ---------- Realtime tool dispatch ----------def _make_function_handler(self):"""Returns a callback(ws, item) for RealtimeVoiceAgent.Invokes our Python tool, posts JSON result, and lets the agent manage response timing."""def handler(ws, item: Dict[str, Any]):name = item.get("name")args = json.loads(item.get("arguments", "{}") or "{}")try:if name == "verify_personal_info_tool":res = self.verify_personal_info_tool(**args)elif name == "search_transactions_tool":res = self.search_transactions_tool(**args)elif name == "create_dispute_tool":res = self.create_dispute_tool(**args)elif name == "block_merchant_tool":res = self.block_merchant_tool(**args)else:res = {"error": f"unknown tool: {name}"}except Exception as e:res = {"error": str(e)}ws.send(json.dumps({"type": "conversation.item.create","item": {"type": "function_call_output","call_id": item.get("call_id"),"output": json.dumps(res)}}))print(f"[DEBUG] tool_result posted -> {name}")# Do NOT send response.create here. 
The wrapper coordinates that.return handler# ---------- Public run() ----------def run(self) -> List[Dict[str, Any]]:"""Run the agent and return the full transcript."""self.agent.start()try:while not self.agent._stop_event.is_set():import threadingthreading.Event().wait(0.25)except KeyboardInterrupt:self.agent.stop()return self.agent.get_transcript()# -------- Demo (run directly) --------if __name__ == "__main__":conversation_id = os.environ.get("CONVERSATION_ID")agent = FraudAgent(email_account=os.environ.get("SMTP_USER", "no-reply@bank.example"),email_password=os.environ.get("SMTP_PASSWORD", ""))transcript = agent.run()# Write transcript to file if TRANSCRIPT_FILE env var is settranscript_file = os.environ.get("TRANSCRIPT_FILE")if transcript_file and transcript:try:with open(transcript_file, 'w') as f:json.dump(transcript, f)except Exception as e:print(f"[ERROR] Failed to write transcript: {e}")
Every session produces a transcript for logging and review. It captures user speech, assistant replies, and each tool call with arguments and results. That transcript is saved so it can be logged to Weave inside the chat loop script.
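As a rough sketch (the exact schema comes from your RealtimeVoiceAgent wrapper, so the field names here are illustrative), a transcript is just a list of role-tagged entries you can filter before logging:

```python
# Hypothetical transcript shape; the real fields depend on your RealtimeVoiceAgent wrapper.
transcript = [
    {"role": "user", "type": "speech", "content": "I want to report a charge."},
    {"role": "assistant", "type": "tool_call", "name": "search_transactions_tool",
     "arguments": {"account_number": "12345678", "date": "2025-10-01", "merchant_query": "coffee"}},
    {"role": "assistant", "type": "speech", "content": "I found one matching transaction."},
]

def tool_calls(entries):
    """Pull out just the tool invocations for quick review."""
    return [e for e in entries if e.get("type") == "tool_call"]

print([c["name"] for c in tool_calls(transcript)])  # ['search_transactions_tool']
```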
Weave is a tracing and observability layer for AI applications. Calls to your functions become trace nodes with inputs, outputs, timing, and errors. Related calls can be grouped into a thread so one customer session shows up as a clear sequence of turns like verify identity, search transactions, create dispute, block merchant, send email.
Building the failed transaction agent
Our failed transaction agent handles declined or blocked card payments for our call center. It uses the same real-time voice base as the other paths, so audio input, speech output, and function calling are already wired. You configure behavior with system instructions, pass a tool schema, and provide a single function handler that executes the Python methods when tools are invoked.
The flow starts with identity. The agent asks for the account number, full name, date of birth in YYYY-MM-DD format, SSN in NNN-NN-NNNN format, and address, then calls the verification tool against the local banking data. If verification fails, it offers one retry and then ends politely. It never reads the SSN back, and it does not reveal balances.
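Because the model relays these values verbatim into the verification tool, it can help to check their shape before hitting the database. A minimal sketch (these regexes only validate format, not whether the values match a real record):

```python
import re

# Format checks matching the expected inputs: DOB as YYYY-MM-DD, SSN as NNN-NN-NNNN.
DOB_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
SSN_RE = re.compile(r"^\d{3}-\d{2}-\d{4}$")

def well_formed(dob: str, ssn: str) -> bool:
    """True if both fields match the formats the system prompt asks for."""
    return bool(DOB_RE.match(dob)) and bool(SSN_RE.match(ssn))

print(well_formed("1990-04-17", "123-45-6789"))  # True
print(well_formed("04/17/1990", "123456789"))    # False
```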
After verification, the caller provides a date. The agent calls the failed list tool and reads back a compact set of failed items with index numbers. If there is exactly one item, it asks for confirmation. If there are several, it asks the caller to pick an index. It then confirms the merchant name and calls the whitelist tool to allow that merchant on the card, returning a clear success message. If no failed items exist for that date, it offers to list all transactions for the same day so the caller can double-check the merchant or date before whitelisting.
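The 1-based selection step can be sketched in a few lines (the item fields mirror what the failed-list tool returns; this is an illustration, not the agent's actual code path):

```python
def pick_by_index(items, spoken_index: int):
    """Select an item using the 'index starting at 1' convention from the prompt."""
    if not (1 <= spoken_index <= len(items)):
        raise ValueError(f"pick an index between 1 and {len(items)}")
    return items[spoken_index - 1]

failed = [
    {"timestamp": "2025-10-01T09:14:00", "merchant": "Acme Coffee", "amount": 4.50},
    {"timestamp": "2025-10-01T18:02:00", "merchant": "Acme Coffee", "amount": 12.00},
]
print(pick_by_index(failed, 2)["amount"])  # 12.0
```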
Here’s the code for the failed transaction agent:
```python
import os
import json
import threading
from typing import List, Dict, Any, Optional

from bank_infra import BankSystem
from realtime_agent_base import RealtimeVoiceAgent  # your fixed voice base


class FailedTransactionAgent:
    """Voice-first Failed Transaction assistant built on RealtimeVoiceAgent.
    Public surface kept simple:
    - __init__(db_path="bank_data.json", api_key=None, speak_first=True, intro_instructions=...)
    - run()
    """

    def __init__(
        self,
        db_path: str = "bank_data.json",
        api_key: Optional[str] = None,
        speak_first: bool = True,
        intro_instructions: str = (
            "Hello, I'm the failed-transactions assistant. "
            "I can help with declined or blocked payments. "
            "First I'll verify your identity, then we'll review the failed charge. "
            "What happened today?"
        ),
        intro_out_of_band: bool = False,
        conversation_id: Optional[str] = None,
    ):
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.bank = BankSystem(db_path)

        # --- System policy (mirrors your original, with explicit end/objective tools) ---
        self.SYSTEM_PROMPT = (
            "You are a bank support assistant focused on failed card transactions. "
            "Authenticate the customer before taking any action. "
            "Flow: ask for the account number, then full name, date of birth YYYY-MM-DD, SSN, and address. "
            "Call verify_personal_info_tool. If not verified, offer one retry, then stop and suggest contacting support "
            "(call end_session_tool). "
            "After successful verification, ask for the transaction date in YYYY-MM-DD. "
            "Call list_failed_tool for that date, show each failed item with an index starting at 1 "
            "(include timestamp, merchant, and amount only). "
            "If exactly one failed item, ask for confirmation. If multiple, ask the user to pick the index. "
            "Confirm the selected merchant string with the user. "
            "Then call whitelist_tool to add the merchant to the allowlist and confirm success. "
            "If there are zero failed items on that date, offer to list all transactions for that date via "
            "list_transactions_tool so the user can confirm the date or merchant name. "
            "Never reveal balances or SSN (SOC). Do not echo the SSN back to the user. "
            "Do not claim to unblock or settle funds. Only update the whitelist. "
            "Keep prompts short and clear. "
            "When the task is fully complete or the user wants to go back to the main menu, "
            "call objective_completion_tool with a one-line summary and then end_session_tool. "
            "Speak English."
        )

        # Build the realtime agent (base automatically adds end/objective tools)
        self.agent = RealtimeVoiceAgent(
            api_key=self.api_key,
            system_instructions=self.SYSTEM_PROMPT,
            tools=self._make_tools_schema(),
            handle_function_call=self._make_function_handler(),
            debug=True,
            # --- optional speak-first intro (supported by your base) ---
            speak_first=speak_first,
            intro_instructions=intro_instructions,
            intro_out_of_band=intro_out_of_band,
            # Weave logging
            conversation_id=conversation_id,
            agent_id="transaction_agent",
        )

    # ---------------- Tool schemas ----------------
    def _make_tools_schema(self) -> List[Dict[str, Any]]:
        return [
            {
                "type": "function",
                "name": "verify_personal_info_tool",
                "description": "Validate customer identity against bank_data.json. Returns {verified: bool}.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "dob": {"type": "string", "description": "YYYY-MM-DD"},
                        "soc": {"type": "string", "description": "SSN format NNN-NN-NNNN"},
                        "address": {"type": "string"},
                        "account_number": {"type": "string"},
                    },
                    "required": ["name", "dob", "soc", "address", "account_number"],
                },
            },
            {
                "type": "function",
                "name": "list_transactions_tool",
                "description": "List all transactions on a given date. Returns objects with type, amount, timestamp, merchant, status.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "account_number": {"type": "string"},
                        "date": {"type": "string", "description": "YYYY-MM-DD"},
                    },
                    "required": ["account_number", "date"],
                },
            },
            {
                "type": "function",
                "name": "list_failed_tool",
                "description": "List failed transactions on a given date. Returns objects with type, amount, timestamp, merchant.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "account_number": {"type": "string"},
                        "date": {"type": "string", "description": "YYYY-MM-DD"},
                    },
                    "required": ["account_number", "date"],
                },
            },
            {
                "type": "function",
                "name": "whitelist_tool",
                "description": "Add a merchant to allowed list for this card.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "account_number": {"type": "string"},
                        "merchant": {"type": "string"},
                    },
                    "required": ["account_number", "merchant"],
                },
            },
        ]

    # ---------------- Tool implementations ----------------
    def verify_personal_info_tool(self, name: str, dob: str, soc: str, address: str, account_number: str) -> Dict[str, bool]:
        ok = self.bank.validate_customer(name=name, dob=dob, soc=soc, address=address, account_number=account_number)
        return {"verified": bool(ok)}

    def list_transactions_tool(self, account_number: str, date: str) -> List[Dict[str, Any]]:
        txns = self.bank.list_transactions_by_date(account_number, date, include_failed=True)
        return [{"type": t, "amount": a, "timestamp": ts, "merchant": d, "status": s} for t, a, ts, d, s in txns]

    def list_failed_tool(self, account_number: str, date: str) -> List[Dict[str, Any]]:
        txns = self.bank.list_failed_by_date(account_number, date)
        return [{"type": t, "amount": a, "timestamp": ts, "merchant": d} for t, a, ts, d in txns]

    def whitelist_tool(self, account_number: str, merchant: str) -> Dict[str, Any]:
        return self.bank.whitelist_merchant(account_number, merchant)

    # ---------------- WS tool dispatcher ----------------
    def _make_function_handler(self):
        """Returns a callback(ws, item) suitable for RealtimeVoiceAgent.
        - Parses the tool call
        - Executes the local method
        - Sends function_call_output back over the socket
        (The base agent manages response.create timing to avoid double firing.)
        """
        def handler(ws, item: Dict[str, Any]):
            name = item.get("name")
            args = json.loads(item.get("arguments", "{}") or "{}")
            try:
                if name == "verify_personal_info_tool":
                    res = self.verify_personal_info_tool(**args)
                elif name == "list_transactions_tool":
                    res = self.list_transactions_tool(**args)
                elif name == "list_failed_tool":
                    res = self.list_failed_tool(**args)
                elif name == "whitelist_tool":
                    res = self.whitelist_tool(**args)
                else:
                    res = {"error": f"unknown tool: {name}"}
            except Exception as e:
                res = {"error": str(e)}
            ws.send(json.dumps({
                "type": "conversation.item.create",
                "item": {
                    "type": "function_call_output",
                    "call_id": item.get("call_id"),
                    "output": json.dumps(res),
                },
            }))
            print(f"[DEBUG] tool_result posted -> {name}")
        return handler

    # ---------------- Public API ----------------
    def run(self) -> List[Dict[str, Any]]:
        """Starts the realtime voice session. Press Ctrl+C to stop.
        The assistant ends itself when objective_completion_tool / end_session_tool are invoked.
        Returns the full transcript of the conversation.
        """
        self.agent.start()
        try:
            while not self.agent._stop_event.is_set():
                threading.Event().wait(0.25)
        except KeyboardInterrupt:
            self.agent.stop()
        return self.agent.get_transcript()


# -------------- direct launch (optional) --------------
if __name__ == "__main__":
    conversation_id = os.environ.get("CONVERSATION_ID")
    agent = FailedTransactionAgent(conversation_id=conversation_id)
    transcript = agent.run()
    # Write transcript to file if TRANSCRIPT_FILE env var is set
    transcript_file = os.environ.get("TRANSCRIPT_FILE")
    if transcript_file and transcript:
        try:
            with open(transcript_file, "w") as f:
                json.dump(transcript, f)
        except Exception as e:
            print(f"[ERROR] Failed to write transcript: {e}")
```
Testing and debugging our AI agents
After building the agents, we can start experimenting with the system in real time. To authenticate successfully, you'll need to provide the same personal details that appear in the JSON database: name, date of birth, address, SSN, and account number. These fields allow the verification tools to match your identity against the mock banking records and unlock the rest of the interaction flow.
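For reference, a record in bank_data.json might look roughly like this. The field names are inferred from the tool code above (the account number is the key, and history rows are [type, amount, timestamp, merchant, status]); your generated database may differ:

```json
{
  "12345678": {
    "name": "Jane Doe",
    "dob": "1990-04-17",
    "soc": "123-45-6789",
    "address": "1 Main St, Springfield",
    "email": "jane@example.com",
    "history": [
      ["Withdraw", 4.50, "2025-10-01T09:14:00", "Acme Coffee", "completed"]
    ],
    "disputes": []
  }
}
```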
If you want to enable email features, you also need to configure an app password for your email account so the agents can send messages through SMTP. This lets the password reset agent deliver verification codes and reset links, and lets the fraud agent send dispute confirmation emails automatically. Make sure your email address is included in one of the user entries in the database so you actually receive those messages.
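The send path the agents use is standard smtplib plus EmailMessage. Building the message is safe to run anywhere; the actual SMTP call (commented out below) needs your Gmail app password, and the addresses here are placeholders:

```python
import ssl
import smtplib
from email.message import EmailMessage

# Construct the message exactly as the agents' _send_email helper does.
msg = EmailMessage()
msg["From"] = "you@example.com"          # must match the authenticated account
msg["To"] = "customer@example.com"       # should appear in a bank_data.json entry
msg["Subject"] = "Your verification code"
msg.set_content("Your code is 123456")

# Uncomment to actually send (requires an app password):
# with smtplib.SMTP("smtp.gmail.com", 587) as s:
#     s.starttls(context=ssl.create_default_context())
#     s.login("you@example.com", "your-app-password")
#     s.send_message(msg)

print(msg["Subject"])  # Your verification code
```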
Running this system on your own machine may require small adjustments to the audio handling code. Audio behavior changes between setups, especially on macOS, where input and output devices are managed differently. I tested the system on an M1 Pro MacBook Pro, and while it runs smoothly overall, the underlying sound libraries can act up at times. Occasionally, the input stream does not close properly, or playback buffers stay open after an agent exits. To avoid this, each agent runs in a separate subprocess so that every session starts with a clean audio context and no leftover device locks.
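The subprocess pattern looks roughly like this. Since the real agent needs audio hardware, a stand-in script plays the agent's role here: the orchestrator passes CONVERSATION_ID and TRANSCRIPT_FILE through the environment, waits for the child to exit, then reads the transcript back:

```python
import json
import os
import subprocess
import sys
import tempfile

# Stand-in for an agent module: writes a one-entry "transcript" to TRANSCRIPT_FILE.
stand_in = (
    "import json, os\n"
    "path = os.environ['TRANSCRIPT_FILE']\n"
    "json.dump([{'role': 'assistant', 'content': 'hello'}], open(path, 'w'))\n"
)

with tempfile.TemporaryDirectory() as tmp:
    transcript_file = os.path.join(tmp, "transcript.json")
    env = {**os.environ, "TRANSCRIPT_FILE": transcript_file, "CONVERSATION_ID": "conv-001"}
    # Each agent runs in a fresh process, so audio devices are re-acquired cleanly.
    subprocess.run([sys.executable, "-c", stand_in], env=env, check=True)
    with open(transcript_file) as f:
        transcript = json.load(f)

print(transcript[0]["content"])  # hello
```

In the real system you would replace the stand-in with `[sys.executable, "fraud_agent.py"]` or similar, but the environment-variable handoff is the same.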
You should always use headphones. If the agent’s voice plays through your speakers, the microphone will capture it again, causing the model to think it is still hearing the user. This feedback loop makes the agent talk to itself endlessly. Using headphones isolates the output from the microphone, keeping the conversation stable and avoiding echo or repeated speech.
Depending on your hardware, you might also need to adjust the input and output sample rates or the audio block size in the RealtimeVoiceAgent class. These parameters control how the audio streams are buffered and processed, and tuning them can fix timing or distortion issues. Once set correctly, the agents will handle live speech naturally, listening and responding in sync without glitches or confusion.
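A quick worked example of why these numbers matter (24 kHz is a common realtime-API rate, but your RealtimeVoiceAgent's actual constants may differ):

```python
# Latency contributed by one audio block: block duration = block size / sample rate.
SAMPLE_RATE = 24_000   # samples per second (assumed; check your agent's config)
BLOCK_SIZE = 4_800     # samples delivered per audio callback

block_ms = 1000 * BLOCK_SIZE / SAMPLE_RATE
print(f"each audio block spans {block_ms:.0f} ms")  # each audio block spans 200 ms
```

Smaller blocks reduce latency but increase callback overhead; a mismatched sample rate between capture and playback typically shows up as pitch-shifted or choppy audio.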
After testing your agent, you can open Weave to explore detailed traces of every interaction. Each conversation is logged as a sequence of events, showing the user’s speech, the model’s responses, and all function calls in between. You can expand any node to view the exact arguments passed into a tool, the returned output, and the timing for each step.
Weave gives you a visual record of how your agent behaves in real time. You can trace the full path of a session, from the first spoken input to the final end signal, making it easy to spot logic errors, latency spikes, or unexpected tool usage. It’s especially useful for refining prompts and verifying that your agents are following the intended conversation flow accurately.

Sample audio
I decided to add some logic to record the audio, and log the audio to Weights & Biases as well. Here's a sample of me reporting a suspicious transaction to the fraud agent:
Our modernized call center system
This project shows what happens when real-time speech, tool execution, and programmatic control meet in one system. Starting from a simple idea (automating common banking calls), we built a complete environment where conversational agents act as independent service representatives, each handling a clear, functional task. The foundation is the real-time voice framework, which links live audio streams to the model and routes tool calls to actual Python functions backed by a simulated banking database.
By layering the emulator, the voice agent base, the specialized assistants, and the orchestration loop, we created a comprehensive voice-driven call flow that authenticates users, reviews transactions, files disputes, and logs every step for inspection in Weave. The result is a working prototype of a multi-agent system that feels natural, transparent, and grounded in real data.
This isn’t just a proof of concept for a voice interface; it’s a foundation for building complex, auditable AI systems that can operate through natural conversation. With further tuning, real APIs, and tighter audio integration, the same framework could support real call handling at scale. The entire stack, from the JSON backend to the real-time websocket layer, demonstrates how voice, reasoning, and execution can exist in a single continuous loop.
