
Tutorial: Call center modernization with multi-agent systems

In this tutorial, we build a working simulation of a banking customer service line powered by conversational agents!
Created on October 9|Last edited on October 13
Modern call centers are entering a new phase powered by multimodal LLMs that can listen, speak, and act in real time. Instead of static menu trees or delayed text exchanges, these systems interpret live speech, understand intent, and execute real actions using connected tools.
Banking is an ideal testing ground for this shift because most customer requests follow structured, well-defined workflows such as verifying identity, resolving failed payments, or resetting a password. A real-time model can handle these tasks naturally through conversation, combining speech recognition, reasoning, and tool use into one continuous process.
To explore how this works in practice, today we’re building a working simulation of a banking customer service line powered by conversational agents. The goal is to create a realistic support experience where a user speaks naturally, and the system handles everything from identity verification to transaction review without a human on the other end. Each agent plays a distinct role, routing calls, verifying customers, or resolving issues, while coordinating through a shared real-time voice framework.
To get a sense of where this tutorial leads, here’s a sample recording of the final system in action:


Designing our multi-agent call center

The first step in designing our multi-agent call center is understanding what customers generally need when they call. Most banking calls aren’t complex; they’re variations of a few core problems, including logging in, resetting a password, confirming identity, unblocking a declined transaction, or checking a suspicious charge. These are repetitive, structured, and rule-based, which makes them ideal for automation through conversational agents.
The system we're designing focuses on these high-frequency cases first. Each one follows a predictable flow: verify the caller, look up an account, perform a specific action, and confirm the outcome. By centering the design around these repeatable patterns, the agents can actually complete real interactions instead of handing off partial responses.
In our system, these agentic workflows will be split across three specialized agents:
  1. The authentication agent verifies identity, manages email codes, and handles password resets.
  2. The failed-transaction agent focuses on declined or blocked card payments, helping the user identify the cause and safely whitelist the merchant if appropriate.
  3. The fraud agent investigates suspicious charges, searches transactions, gathers dispute details, and files a structured report while optionally blocking the merchant.
Each agent runs on the same real-time voice framework, sharing the same audio I/O, websocket handling, and transcription stack. They are launched separately by a routing agent that listens for intent and directs the call to the correct module. This keeps each agent simple, domain-focused, and easier to test and extend.
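The routing agent's job can be sketched as a small intent classifier. The keyword lists and agent names below are illustrative placeholders, not the tutorial's actual router:

```python
# Minimal intent-router sketch: maps a caller's opening utterance to an agent.
# Keyword lists and agent names are illustrative placeholders.
INTENT_KEYWORDS = {
    "auth_agent": ["password", "log in", "login", "verification code", "reset"],
    "failed_txn_agent": ["declined", "blocked", "failed payment", "card didn't work"],
    "fraud_agent": ["suspicious", "fraud", "didn't make", "unrecognized charge"],
}

def route_call(utterance: str) -> str:
    """Return the agent id whose keywords best match the utterance."""
    text = utterance.lower()
    scores = {
        agent: sum(kw in text for kw in kws)
        for agent, kws in INTENT_KEYWORDS.items()
    }
    best = max(scores, key=scores.get)
    # Fall back to the authentication agent when nothing matches.
    return best if scores[best] > 0 else "auth_agent"
```

In the real system this decision is made by a realtime model listening to speech, but the routing contract is the same: one utterance in, one agent id out.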
Starting with the problems customers face most often makes the system practical, directly useful, and faster to deploy. Automating these flows frees human operators for complex cases while giving callers an immediate, voice-based experience that feels like a real conversation rather than a series of menu prompts.

Our bank emulation environment and AI stack

The multi-agent call center structure is built around a real-time conversational model that processes live audio in both directions. It listens continuously, understands intent as the user speaks, and replies with generated speech almost instantly. This creates a natural exchange instead of the slow, text-based back and forth that most chat systems rely on.
The real-time API gives the model access to tools written in Python, allowing it to perform actions instead of only generating dialogue. When someone says “send me a verification code” or “why was my card declined,” the model can call the right function, pass in arguments like the account number or merchant name, and then return the result as part of the spoken response.
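To make the mechanics concrete, here is a minimal sketch of what a tool declaration and its dispatch step can look like. The send_verification_code tool, its fields, and the backend function are hypothetical, not the exact schema used later in this tutorial:

```python
import json

# Sketch of a realtime-API-style tool declaration plus the dispatch step.
# The tool name, fields, and backend function are illustrative.
SEND_CODE_TOOL = {
    "type": "function",
    "name": "send_verification_code",
    "description": "Email a one-time verification code to the account holder.",
    "parameters": {
        "type": "object",
        "properties": {
            "account_number": {"type": "string"},
        },
        "required": ["account_number"],
    },
}

def send_verification_code(account_number: str) -> dict:
    # Placeholder backend action.
    return {"ok": True, "sent_to_account": account_number}

def dispatch(call: dict) -> str:
    """Execute a model-issued function call and return a JSON result string."""
    args = json.loads(call.get("arguments", "{}"))
    if call.get("name") == "send_verification_code":
        result = send_verification_code(**args)
    else:
        result = {"ok": False, "error": f"unknown tool {call.get('name')}"}
    return json.dumps(result)
```

The model only ever sees the declaration and the returned JSON string; everything in between is ordinary Python.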
Those tools connect to a banking emulator that acts like a simplified financial backend. It stores mock customer records, transaction data, and merchant information so the system can verify identities, list failed payments, or add a merchant to a whitelist. The emulator makes it possible to test real workflows without exposing any live systems or data.
Together, the real-time model, the tool interface, and the banking emulator form a complete stack for a fully simulated call center. The model manages the conversation, the tools perform actions, and the emulator provides the realistic environment in which those actions take place.

Building a simulated banking backend

To make the voice agents act like real customer service representatives, the system needs a live data source to query and update. Instead of connecting to an external database, the project uses a local JSON file that serves as a miniature banking database. It stores mock customer profiles, transaction histories, and merchant information, giving the agents a reliable and persistent backend to work with.
Each entry in the JSON database includes personal details such as name, date of birth, address, social security number, account number, and email. It also contains a list of transactions for that account with fields for timestamp, amount, merchant, type, and status. Some transactions are marked as failed to allow agents to test realistic scenarios, such as blocked payments or declined charges.
A simple Python class manages all access to this data. It reads and writes the JSON file, exposes methods for verifying customers, listing transactions, or updating allowlists, and ensures that every operation leaves a record behind. This design keeps the system fully local while still behaving like a real financial backend, giving the voice agents something concrete to act upon during conversation.
Here’s what our database looks like:
{
  "1001": {
    "name": "Alice Johnson",
    "dob": "1990-05-04",
    "soc": "123-45-6789",
    "address": "123 coffee St, Springfield, IL 12345",
    "email": "byyoung3@gmail.com",
    "balance": 2500,
    "history": [
      ["Deposit", 1000, "2024-11-15T10:30:00", "Payroll Deposit", "posted"],
      ["Withdraw", 200, "2024-12-01T13:15:00", "ATM Withdrawal", "posted"],
      ["Deposit", 1700, "2025-01-25T09:00:00", "Freelance Payment", "posted"],
      ["Withdraw", 45, "2025-02-10T11:23:00", "STARBUCKS", "posted"],
      ["Withdraw", 75, "2025-03-08T08:17:00", "UBER", "posted"],
      ["Withdraw", 120, "2025-04-12T15:45:00", "AMAZON", "posted"],
      ["Withdraw", 130, "2025-04-12T15:43:00", "AMAZON", "posted"],
      ["Withdraw", 330, "2025-04-12T15:46:00", "AMAZON", "posted"],
      ["Deposit", 300, "2025-05-10T09:00:00", "Refund", "posted"],
      ["Withdraw", 300, "2025-06-15T12:33:00", "Unknown Merchant XYZ", "posted"],
      ["Deposit", 600, "2025-07-20T16:00:00", "Bonus Deposit", "posted"],
      ["Withdraw", 95, "2025-08-05T10:20:00", "Grocery Market", "posted"],
      ["Withdraw", 180, "2025-09-29T19:40:00", "AIRLINE ABC", "failed"],
      ["Withdraw", 52, "2025-10-03T09:10:00", "LOCAL BAKERY", "failed"]
    ],
    "card": {
      "status": "active",
      "limit": 1500,
      "merchants": {
        "enabled": ["LOCAL BAKERY"],
        "disabled": ["RANDOMSHOP", "STARBUCKS", "AMAZON", "UBER"]
      },
      "travel_limits": [
        {
          "country": "FR",
          "start": "2025-12-01",
          "end": "2025-12-20",
          "daily_cap": 300
        },
        {
          "country": "JP",
          "start": "2026-01-05",
          "end": "2026-01-25",
          "daily_cap": 400
        }
      ]
    }
  },
  "2002": {
    "name": "Bob Smith",
    "dob": "1985-11-21",
    "soc": "987-65-4321",
    "address": "42 Oak Ave",
    "email": "bob.smith@example.com",
    "balance": 980,
    "history": [
      ["Deposit", 500, "2024-09-20T09:10:00", "Payroll Deposit", "posted"],
      ["Deposit", 480, "2024-11-02T09:00:00", "Gift Transfer", "posted"],
      ["Withdraw", 200, "2025-01-10T12:00:00", "NETFLIX", "posted"],
      ["Deposit", 300, "2025-02-18T10:00:00", "Freelance Payment", "posted"],
      ["Withdraw", 100, "2025-03-12T18:30:00", "SPOTIFY", "posted"],
      ["Withdraw", 65, "2025-04-14T14:50:00", "CASINO ONLINE", "failed"],
      ["Deposit", 450, "2025-06-10T09:10:00", "Tax Refund", "posted"],
      ["Withdraw", 125, "2025-07-20T10:20:00", "FASTFOOD PLACE", "posted"],
      ["Withdraw", 300, "2025-08-25T13:45:00", "Unknown Charge QRS", "posted"],
      ["Withdraw", 90, "2025-10-01T21:05:00", "BETTING APP", "failed"]
    ],
    "card": {
      "status": "frozen",
      "limit": 800,
      "merchants": {
        "enabled": ["SPOTIFY", "NETFLIX"],
        "disabled": ["CASINO", "BETTING"]
      },
      "travel_limits": []
    }
  },
  "3003": {
    "name": "Carla Mendes",
    "dob": "1992-08-10",
    "soc": "222-33-4444",
    "address": "98 Pine Blvd",
    "email": "carla.mendes@example.com",
    "balance": 6500,
    "history": [
      ["Deposit", 4000, "2024-10-01T08:45:00", "Payroll Deposit", "posted"],
      ["Withdraw", 500, "2024-11-22T16:30:00", "WHOLEFOODS", "posted"],
      ["Deposit", 3000, "2025-01-15T08:00:00", "Bonus Payment", "posted"],
      ["Withdraw", 2000, "2025-02-08T13:20:00", "DELTA Airlines", "posted"],
      ["Deposit", 1000, "2025-03-10T08:15:00", "Tax Refund", "posted"],
      ["Withdraw", 850, "2025-04-05T10:40:00", "AIRBNB", "posted"],
      ["Withdraw", 400, "2025-05-22T12:05:00", "Online Boutique", "posted"],
      ["Withdraw", 250, "2025-07-17T15:00:00", "Unknown Vendor JKL", "posted"],
      ["Withdraw", 120, "2025-10-04T11:00:00", "LOCAL MUSEUM", "failed"]
    ],
    "card": {
      "status": "active",
      "limit": 5000,
      "merchants": {
        "enabled": ["AIRBNB", "DELTA", "WHOLEFOODS"],
        "disabled": []
      },
      "travel_limits": [
        {
          "country": "ES",
          "start": "2025-10-01",
          "end": "2025-10-15",
          "daily_cap": 800
        }
      ]
    }
  },
  "4004": {
    "name": "Daniel Green",
    "dob": "1978-02-14",
    "soc": "111-22-3333",
    "address": "77 Birch Rd, St. Louis, MO 12345",
    "email": "daniel.green@example.com",
    "balance": 12000,
    "history": [
      ["Deposit", 6000, "2024-12-05T09:00:00", "Salary Deposit", "posted"],
      ["Withdraw", 1000, "2025-01-10T10:00:00", "HOMEDEPOT", "posted"],
      ["Deposit", 7000, "2025-03-01T09:30:00", "Investment Return", "posted"],
      ["Withdraw", 2000, "2025-04-12T14:00:00", "APPLE", "posted"],
      ["Deposit", 2000, "2025-05-15T09:30:00", "Bonus", "posted"],
      ["Withdraw", 250, "2025-07-18T16:30:00", "COSTCO", "posted"],
      ["Withdraw", 900, "2025-08-25T18:20:00", "BETTINGSHOP", "failed"],
      ["Withdraw", 400, "2025-09-10T11:45:00", "Unknown Charge ABC", "posted"],
      ["Withdraw", 1800, "2025-10-02T12:10:00", "HOLIDAY INN", "failed"]
    ],
    "card": {
      "status": "active",
      "limit": 10000,
      "merchants": {
        "enabled": ["HOMEDEPOT", "APPLE", "COSTCO"],
        "disabled": ["BETTINGSHOP"]
      },
      "travel_limits": [
        {
          "country": "GB",
          "start": "2025-11-01",
          "end": "2025-11-15",
          "daily_cap": 500
        },
        {
          "country": "IT",
          "start": "2026-02-01",
          "end": "2026-02-20",
          "daily_cap": 600
        }
      ]
    }
  }
}
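If you assemble this file yourself, a quick sanity check along these lines confirms each record carries the fields the agents expect. The check_account helper is a hypothetical aid, not part of the tutorial's code:

```python
# Validate that account records match the schema the agents rely on.
REQUIRED_FIELDS = {"name", "dob", "soc", "address", "email", "balance", "history", "card"}

def check_account(acc: dict) -> list[str]:
    """Return a list of schema problems (empty means the record looks good)."""
    problems = [f"missing field: {f}" for f in REQUIRED_FIELDS - acc.keys()]
    for i, row in enumerate(acc.get("history", [])):
        if len(row) not in (4, 5):  # the status column is optional in older rows
            problems.append(f"history row {i} has {len(row)} fields")
        elif len(row) == 5 and row[4] not in ("posted", "failed"):
            problems.append(f"history row {i} has unknown status {row[4]!r}")
    return problems

# A record shaped like the entries above, inlined rather than read from bank_data.json.
sample = {
    "name": "Alice Johnson", "dob": "1990-05-04", "soc": "123-45-6789",
    "address": "123 coffee St", "email": "a@example.com", "balance": 2500,
    "history": [["Withdraw", 45, "2025-02-10T11:23:00", "STARBUCKS", "posted"]],
    "card": {"status": "active"},
}
```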

Supporting bank infrastructure for accessing the database

To make the database easy for our agents to read and update, similar to how a real system would interact with a backend API, we add a small support layer in Python that handles all interactions cleanly.
This class abstracts away file operations so the agents can work with higher-level methods instead of reading and writing JSON directly. It also ensures that every change is persisted immediately, keeping the simulation consistent across sessions.
For example, the support code includes helper methods to:
  • Validate a customer’s personal information and return whether the match is correct
  • Retrieve all failed transactions for a given date
  • Add a merchant to an account’s whitelist
  • Write the updated record back to the JSON file
This simple infrastructure lets the AI agents modify the database as if they were interacting with a real banking system.
Here’s the code for the banking infrastructure emulator:
import os
import json
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional
from openai import OpenAI

class BankSystem:
    def __init__(self, db_file: str = "bank_data.json"):
        self.db_path = Path(db_file)
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
        if not self.db_path.exists():
            self.db_path.write_text("{}", encoding="utf-8")

    def _load_db(self) -> Dict[str, Any]:
        return json.loads(self.db_path.read_text(encoding="utf-8"))

    def _save_db(self, data: Dict[str, Any]) -> None:
        self.db_path.write_text(json.dumps(data, indent=2), encoding="utf-8")

    def _get_account(self, account_number: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        data = self._load_db()
        if account_number not in data:
            raise KeyError("Account not found")
        return data, data[account_number]

    def _ensure_card_fields(self, acc: Dict[str, Any]) -> None:
        if "card" not in acc:
            acc["card"] = {
                "status": "active",
                "limit": 0,
                "merchants": {"enabled": [], "disabled": []},
                "travel_limits": []
            }

    def _ensure_email_field(self, acc: Dict[str, Any]) -> None:
        if "email" not in acc:
            acc["email"] = None

    def _normalize_history(self, acc: Dict[str, Any]) -> None:
        # History entries are [type, amount, timestamp, description] or
        # [type, amount, timestamp, description, status]
        hist = acc.get("history") or []
        norm = []
        for entry in hist:
            if len(entry) == 4:
                ttype, amt, ts, desc = entry
                status = "posted"
            elif len(entry) >= 5:
                ttype, amt, ts, desc, status = entry[:5]
            else:
                # Skip malformed rows
                continue
            norm.append([ttype, amt, ts, desc, status])
        acc["history"] = norm

    def register_customer(
        self,
        name: str,
        dob: str,
        soc: str,
        address: str,
        account_number: str,
        balance: float = 0.0,
        card_limit: float = 0.0,
        email: Optional[str] = None
    ) -> None:
        data = self._load_db()
        if account_number in data:
            raise ValueError("Account already exists.")
        data[account_number] = {
            "name": name,
            "dob": dob,
            "soc": soc,
            "address": address,
            "email": email,
            "balance": balance,
            "history": [],
            "card": {
                "status": "active",
                "limit": card_limit,
                "merchants": {"enabled": [], "disabled": []},
                "travel_limits": []
            }
        }
        self._save_db(data)

    def validate_customer(self, name: str, dob: str, soc: str, address: str, account_number: str) -> bool:
        data = self._load_db()
        acc = data.get(account_number)
        if not acc:
            return False

        prompt = f"""
Compare this provided user info to the record.
Return only yes if they appear to match, even if there are small formatting differences, otherwise no.

Provided info:
Name: {name}
DOB: {dob}
SOC: {soc}
Address: {address}

Record info:
Name: {acc['name']}
DOB: {acc['dob']}
SOC: {acc['soc']}
Address: {acc['address']}

ONLY RESPOND WITH yes or no
"""
        response = self.client.responses.create(
            model="gpt-4.1",
            input=prompt,
            temperature=0.0
        )
        answer = (getattr(response, "output_text", "") or "").strip().lower()
        return "yes" in answer

    def get_balance(self, account_number: str) -> float:
        data = self._load_db()
        return data[account_number]["balance"]

    def get_spending_history(self, account_number: str) -> List[List[Any]]:
        data = self._load_db()
        acc = data[account_number]
        self._normalize_history(acc)
        return acc["history"]

    def send_report(self, account_number: str) -> str:
        data = self._load_db()
        acc = data[account_number]
        self._normalize_history(acc)
        report = f"Account Report for {acc['name']}\nBalance: ${acc['balance']}\nTransactions:\n"
        for ttype, amt, ts, desc, status in acc["history"]:
            report += f"{ts} {ttype} ${amt} {desc} [{status}]\n"
        return report

    def set_limit(self, account_number: str, new_limit: float) -> float:
        if new_limit < 0:
            raise ValueError("Limit cannot be negative")
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        acc["card"]["limit"] = new_limit
        self._save_db(data)
        return acc["card"]["limit"]

    def freeze_card(self, account_number: str) -> str:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        acc["card"]["status"] = "frozen"
        self._save_db(data)
        return "frozen"

    def unlock_card(self, account_number: str) -> str:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        acc["card"]["status"] = "active"
        self._save_db(data)
        return "active"

    def enable_merchant(self, account_number: str, merchant: str) -> Dict[str, List[str]]:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        m = acc["card"]["merchants"]
        if merchant in m.get("disabled", []):
            m["disabled"].remove(merchant)
        if merchant not in m.get("enabled", []):
            m.setdefault("enabled", []).append(merchant)
        self._save_db(data)
        return {"enabled": m.get("enabled", []), "disabled": m.get("disabled", [])}

    def disable_merchant(self, account_number: str, merchant: str) -> Dict[str, List[str]]:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        m = acc["card"]["merchants"]
        if merchant in m.get("enabled", []):
            m["enabled"].remove(merchant)
        if merchant not in m.get("disabled", []):
            m.setdefault("disabled", []).append(merchant)
        self._save_db(data)
        return {"enabled": m.get("enabled", []), "disabled": m.get("disabled", [])}

    def add_travel_limit(
        self,
        account_number: str,
        country: str,
        start: str,
        end: str,
        daily_cap: Optional[float] = None
    ) -> List[Dict[str, Any]]:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        entry: Dict[str, Any] = {"country": country, "start": start, "end": end}
        if daily_cap is not None:
            if daily_cap < 0:
                raise ValueError("daily_cap cannot be negative")
            entry["daily_cap"] = daily_cap
        acc["card"]["travel_limits"].append(entry)
        self._save_db(data)
        return list(acc["card"]["travel_limits"])

    def get_card_status(self, account_number: str) -> str:
        _, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        return acc["card"]["status"]

    def get_card_config(self, account_number: str) -> Dict[str, Any]:
        _, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        return acc["card"]

    # New transaction support

    def list_transactions_by_date(
        self,
        account_number: str,
        date_iso: str,
        include_failed: bool = True
    ) -> List[Tuple[str, float, str, str, str]]:
        data, acc = self._get_account(account_number)
        self._normalize_history(acc)
        out: List[Tuple[str, float, str, str, str]] = []
        for ttype, amt, ts, desc, status in acc["history"]:
            if ts.startswith(date_iso):
                if include_failed or status != "failed":
                    out.append((ttype, float(amt), ts, desc, status))
        return out

    def list_failed_by_date(
        self,
        account_number: str,
        date_iso: str
    ) -> List[Tuple[str, float, str, str]]:
        data, acc = self._get_account(account_number)
        self._normalize_history(acc)
        out: List[Tuple[str, float, str, str]] = []
        for ttype, amt, ts, desc, status in acc["history"]:
            if ts.startswith(date_iso) and status == "failed":
                out.append((ttype, float(amt), ts, desc))
        return out

    def whitelist_merchant(self, account_number: str, merchant: str) -> Dict[str, Any]:
        data, acc = self._get_account(account_number)
        self._ensure_card_fields(acc)
        m = acc["card"]["merchants"]
        enabled = set(m.get("enabled", []))
        disabled = set(m.get("disabled", []))

        key = merchant.strip()
        if key in disabled:
            disabled.remove(key)
        enabled.add(key)

        m["enabled"] = sorted(enabled)
        m["disabled"] = sorted(disabled)
        self._save_db(data)
        return {"ok": True, "enabled": m["enabled"], "disabled": m["disabled"]}

    def add_failed_transaction(
        self,
        account_number: str,
        amount: float,
        timestamp_iso: str,
        merchant: str
    ) -> None:
        data, acc = self._get_account(account_number)
        self._normalize_history(acc)
        acc["history"].append(["Withdraw", float(amount), timestamp_iso, merchant, "failed"])
        self._save_db(data)

    def add_posted_transaction(
        self,
        account_number: str,
        ttype: str,
        amount: float,
        timestamp_iso: str,
        description: str
    ) -> None:
        if ttype not in {"Deposit", "Withdraw"}:
            raise ValueError("ttype must be Deposit or Withdraw")
        data, acc = self._get_account(account_number)
        self._normalize_history(acc)
        acc["history"].append([ttype, float(amount), timestamp_iso, description, "posted"])
        self._save_db(data)
With the backend in place, we can move on to the voice layer that lets our agents converse with callers in real time.

Building a voice agent class

The voice agent class is the backbone of the system. It manages the real-time websocket session, handles audio input and playback, and connects the model’s decisions to executable Python functions. It captures microphone input, streams it to the model, and plays back generated speech as it arrives. Every exchange is logged in a transcript for later analysis or debugging.
The user defines the agent’s behavior by passing system prompts, tool definitions, and a function handler. The tools describe the available actions such as verifying a customer or listing failed transactions, while the handler executes the corresponding Python functions when those tools are called. The base agent manages this routing automatically, listening for function calls from the model, invoking the handler, sending structured outputs back, and continuing the conversation flow in real time.
This implementation runs on gpt-realtime-2025-08-28, a fully multimodal model capable of understanding speech directly, generating natural spoken responses, and calling tools as part of its reasoning process. The model continuously processes audio, interprets intent, and executes function calls in real time without needing explicit rules or orchestration logic.
We built this class to make it simple to define a conversational agent that can think, speak, and act fluidly. The developer only needs to provide three pieces: a system prompt that defines the agent’s behavior, a list of callable tools that describe possible actions, and a handler that executes those tools when the model requests them.
The model itself decides when and how to use those tools, calling them in the right sequence and combining them naturally with dialogue. That means you don't have to hard-code turn-taking logic or tool-ordering rules; the model's own reasoning is strong enough to manage multi-step processes like verification, data lookup, and reporting autonomously.
The class also exposes clear lifecycle signals that help coordinate sessions. When the model completes the user's request, it calls an objective-completion tool that summarizes the outcome. If the caller says "stop" or "return to the main menu," the model invokes the end-session tool, ensuring a clean shutdown after the final spoken line.
Together, these features make the agent a self-contained bridge between human conversation, real-time AI reasoning, and live system actions, all running through one continuous audio stream.
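To make the three developer-supplied pieces concrete, here is a sketch of a system prompt, a tool schema list, and a handler. The verify_customer tool and the hard-coded account check are illustrative stand-ins, and in the real agent the handler's result is sent back over the websocket rather than simply returned:

```python
import json

# The three pieces a developer supplies to the voice agent class:
# a system prompt, a tool schema list, and a function-call handler.
# Tool names and fields here are illustrative, not the tutorial's exact set.
SYSTEM_PROMPT = (
    "You are a bank authentication agent. Verify the caller's identity "
    "before discussing any account details."
)

TOOLS = [{
    "type": "function",
    "name": "verify_customer",
    "description": "Check the caller's personal details against the record.",
    "parameters": {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "dob": {"type": "string"},
            "account_number": {"type": "string"},
        },
        "required": ["name", "dob", "account_number"],
    },
}]

def handle_function_call(ws, item: dict) -> dict:
    """Execute the requested tool and return its result. In the real agent
    the result is then sent back over the websocket as a function output."""
    args = json.loads(item.get("arguments", "{}"))
    if item.get("name") == "verify_customer":
        # Stand-in for a BankSystem.validate_customer lookup.
        return {"verified": args.get("account_number") == "1001"}
    return {"error": f"unknown tool {item.get('name')}"}
```

These three objects are what get passed into the voice agent class shown below; everything else (audio, websockets, transcripts) is handled for you.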
💡 The following code is quite long and complex due to the challenges of processing real-time audio. I recommend skipping over this script unless you are specifically interested in the logic for processing audio with the gpt-realtime model.
Here’s the code:
import os
import json
import base64
import threading
import queue
import numpy as np
import sounddevice as sd
import websocket
from typing import Callable, List, Dict, Any, Optional
import time


class RealtimeVoiceAgent:
def __init__(
self,
api_key: Optional[str] = None,
model_url: str = "wss://api.openai.com/v1/realtime?model=gpt-realtime-2025-08-28",
system_instructions: str = "You are a helpful voice assistant.",
tools: Optional[List[Dict[str, Any]]] = None,
tool_choice: str = "auto",
voice: str = "alloy",
input_audio_format: str = "pcm16",
output_audio_format: str = "pcm16",
in_sample_rate: int = 16000,
out_sample_rate: int = 24000,
channels: int = 1,
blocksize: int = 1024,
handle_function_call: Optional[Callable[[websocket.WebSocketApp, Dict[str, Any]], None]] = None,
debug: bool = True,
# built-in tools (objective completed / end session)
objective_completion_description: str = (
"Call this when you have achieved the user’s goal and already spoken the final answer. "
"Provide a short summary in the summary field."
),
end_session_description: str = (
"Call this when the user asks to end the conversation or return to the main menu. "
"Provide a brief reason."
),
# speak-first options
speak_first: bool = False,
intro_instructions: str = "Hi there—how can I help you today?",
intro_out_of_band: bool = False, # True => keep intro out of default conversation
# Weave logging options
conversation_id: Optional[str] = None,
agent_id: Optional[str] = None,
):
self.API_KEY = api_key or os.environ.get("OPENAI_API_KEY") or "REPLACE_ME"
self.URL = model_url

self.system_instructions = system_instructions
self.user_tools = tools or []
self.tool_choice = tool_choice
self.voice = voice
self.input_audio_format = input_audio_format
self.output_audio_format = output_audio_format

self.IN_SR = in_sample_rate
self.OUT_SR = out_sample_rate
self.CHANNELS = channels
self.BLOCK = blocksize

self.handle_function_call_cb = handle_function_call
self.debug = debug

# speak-first
self.speak_first = speak_first
self.intro_instructions = intro_instructions
self.intro_out_of_band = intro_out_of_band

# Metadata for logging
self.conversation_id = conversation_id
self.agent_id = agent_id

# Transcript collection
self.transcript: List[Dict[str, Any]] = []

# ws + audio state
self._ws: Optional[websocket.WebSocketApp] = None
self._stop_event = threading.Event()
self._in_q: "queue.Queue[np.ndarray]" = queue.Queue()
self._out_buf = bytearray()
self._out_lock = threading.Lock()
self._audio_in_thread: Optional[threading.Thread] = None
self._audio_out_thread: Optional[threading.Thread] = None

# response lifecycle
self._active_response_id: Optional[str] = None
self._need_response_after_tool = False

# built-in tools
self._objective_tool_name = "objective_completion_tool"
self._end_session_tool_name = "end_session_tool"
self._awaiting_end_after_assistant_msg = False
self._objective_completion_description = objective_completion_description
self._end_session_description = end_session_description

# ---------------- Public API ----------------

def start(self):
if self._ws is not None:
if self.debug: print("[DEBUG] Already started")
return

self._audio_out_thread = threading.Thread(target=self._audio_player_loop, daemon=True)
self._audio_out_thread.start()

if self.debug: print("[DEBUG] Connecting to", self.URL)
self._ws = websocket.WebSocketApp(
self.URL,
header=[
"Authorization: Bearer " + self.API_KEY,
"OpenAI-Beta: realtime=v1",
],
on_open=self._on_open,
on_message=self._on_message,
on_close=self._on_close,
on_error=self._on_error,
)
threading.Thread(target=self._ws.run_forever, daemon=True).start()

def stop(self):
time.sleep(4.5) # allow any final audio to play out
if self.debug: print("[DEBUG] Agent stopping")
self._stop_event.set()
if self._ws:
try:
self._ws.close()
except Exception:
pass
self._ws = None
print("[AGENT EXITED]")

def get_transcript(self) -> List[Dict[str, Any]]:
"""Return the collected transcript of interactions."""
return self.transcript

# ---------------- Internals ----------------

def _compose_tools(self) -> List[Dict[str, Any]]:
objective_tool = {
"type": "function",
"name": self._objective_tool_name,
"description": self._objective_completion_description,
"parameters": {
"type": "object",
"properties": {
"summary": {"type": "string", "description": "One line summary."}
}
}
}
end_tool = {
"type": "function",
"name": self._end_session_tool_name,
"description": self._end_session_description,
"parameters": {
"type": "object",
"properties": {
"reason": {"type": "string", "description": "Why the user ended or asked to return to main menu."},
"farewell": {"type": "string", "description": "Short closing line to speak to the user."}
}
}
}
return [objective_tool, end_tool] + self.user_tools

def _send_session_update(self, ws: websocket.WebSocketApp):
ev = {
"type": "session.update",
"session": {
"voice": self.voice,
"instructions": (
self.system_instructions
+ " If the user says end, stop, goodbye, or return to main menu, call end_session_tool."
),
# strings only
"input_audio_format": self.input_audio_format,
"output_audio_format": self.output_audio_format,
"input_audio_transcription": {
"model": "whisper-1"
},
"tools": self._compose_tools(),
"tool_choice": self.tool_choice,
}
}
ws.send(json.dumps(ev))
if self.debug:
print("[DEBUG] session.update sent with tools:", [t["name"] for t in self._compose_tools()])

def _mic_cb(self, indata, frames, time, status):
if status and self.debug:
print("[MIC STATUS]", status)
self._in_q.put(indata.copy())

def _start_mic_stream(self, ws: websocket.WebSocketApp):
def mic_loop():
with sd.InputStream(
samplerate=self.IN_SR,
channels=self.CHANNELS,
dtype="float32",
blocksize=self.BLOCK,
callback=self._mic_cb
):
print("[DEBUG] Mic ready. Speak naturally.")
while not self._stop_event.is_set():
try:
audio = self._in_q.get(timeout=0.1)
except queue.Empty:
continue
b64 = self._base64_audio(audio[:, 0])
ws.send(json.dumps({"type": "input_audio_buffer.append", "audio": b64}))
self._audio_in_thread = threading.Thread(target=mic_loop, daemon=True)
self._audio_in_thread.start()

def _audio_player_loop(self):
bytes_per_frame = 2 * self.CHANNELS
frames_needed = 2048

def cb(outdata, frames, time, status):
if status and self.debug:
print("[PLAYBACK STATUS]", status)
need_bytes = frames * bytes_per_frame
with self._out_lock:
if len(self._out_buf) >= need_bytes:
chunk = self._out_buf[:need_bytes]
del self._out_buf[:need_bytes]
else:
chunk = bytes(need_bytes)
s16 = np.frombuffer(chunk, dtype=np.int16)
f32 = (s16.astype(np.float32) / 32767.0).reshape(-1, self.CHANNELS)
outdata[:] = f32

with sd.OutputStream(
samplerate=self.OUT_SR,
channels=self.CHANNELS,
dtype="float32",
callback=cb,
blocksize=frames_needed
):
print("[DEBUG] Speaker ready")
while not self._stop_event.is_set():
sd.sleep(100)

# --------------- WS Callbacks ----------------

def _on_open(self, ws):
if self.debug: print("[DEBUG] WebSocket connected")
self._send_session_update(ws)

# Speak first (optional)
if self.speak_first and self.intro_instructions:
payload = {
"type": "response.create",
"response": {
"instructions": self.intro_instructions
}
}
if self.intro_out_of_band:
payload["response"]["conversation"] = "none"
ws.send(json.dumps(payload))
if self.debug: print("[DEBUG] Intro response.create issued")

self._start_mic_stream(ws)

def _on_close(self, ws, *args):
if self.debug: print("[DEBUG] WebSocket closed")

def _on_error(self, ws, err):
print("[DEBUG] WebSocket error:", err)

def _on_message(self, ws, message):
try:
ev = json.loads(message)
except Exception as e:
print("[ERROR] bad JSON:", e)
return

t = ev.get("type")

# Print all events with full data for debugging (skip audio deltas to reduce clutter)
if self.debug and t not in ["response.audio.delta", "response.audio_transcript.delta"]:
print(f"\n{'='*80}")
print(f"[SERVER EVENT] Type: {t}")
print(f"[SERVER EVENT] Full data:")
print(json.dumps(ev, indent=2))
print(f"{'='*80}\n")

if t == "response.created":
self._active_response_id = ev.get("response", {}).get("id")
if self.debug: print("[DEBUG] response.created ->", self._active_response_id)
return

if t == "response.done":
if self.debug: print("[DEBUG] response.done ->", self._active_response_id)
self._active_response_id = None
if self._need_response_after_tool:
self._need_response_after_tool = False
self._safe_response_create(ws)
return

if t == "response.audio.delta":
b64 = ev.get("delta")
if b64:
pcm = base64.b64decode(b64)
with self._out_lock:
self._out_buf.extend(pcm)
return

if t == "input_audio_buffer.speech_started":
if self.debug: print("[DEBUG] VAD: speech started")
return

if t == "input_audio_buffer.speech_stopped":
if self.debug: print("[DEBUG] VAD: speech stopped")
return

if t == "conversation.item.input_audio_transcription.completed":
transcript = ev.get("transcript", "")
if transcript:
print(f"\n[USER]: {transcript}\n")
# Collect user input for transcript
self.transcript.append({
"type": "user_input",
"content": transcript,
"timestamp": time.time(),
"agent_id": self.agent_id,
"metadata": {
"event_id": ev.get("event_id"),
"item_id": ev.get("item_id"),
}
})
return

if t == "response.output_audio_transcript.delta":
delta = ev.get("delta", "")
if delta:
print(delta, end="", flush=True) # Print transcript chunks as they arrive
return

if t == "response.output_audio_transcript.done":
transcript = ev.get("transcript", "")
if transcript:
print() # New line after complete transcript
if self.debug: print(f"[DEBUG] Complete transcript: {transcript}")
return

if t == "response.output_item.done":
item = ev.get("item", {})

# Extract and print transcript from completed message items
if item.get("type") == "message" and item.get("role") == "assistant":
content = item.get("content", [])
                if content:
transcript = content[0].get("transcript", "")
if transcript:
print(f"\n[ASSISTANT]: {transcript}\n")
# Collect assistant response for transcript
self.transcript.append({
"type": "assistant_response",
"content": transcript,
"timestamp": time.time(),
"agent_id": self.agent_id,
"metadata": {
"response_id": ev.get("response_id"),
"item_id": item.get("id"),
"event_id": ev.get("event_id"),
}
})

if self._awaiting_end_after_assistant_msg:
if self.debug: print("[DEBUG] Final assistant message delivered; stopping agent")
self.stop()
return

if item.get("type") == "function_call":
self._dispatch_function_call(ws, item)
return

if t == "response.done.function_call":
return

if t == "response.function_call_arguments.done":
# Collect tool calls for transcript
function_name = ev.get("name")
arguments = ev.get("arguments", "{}")
if function_name:
print(f"\n[TOOL CALL]: {function_name}")
print(f"[ARGUMENTS]: {arguments}\n")
self.transcript.append({
"type": "tool_call",
"content": f"Function: {function_name}",
"timestamp": time.time(),
"agent_id": self.agent_id,
"metadata": {
"function_name": function_name,
"arguments": arguments,
"call_id": ev.get("call_id"),
"response_id": ev.get("response_id"),
"item_id": ev.get("item_id"),
"event_id": ev.get("event_id"),
}
})
return

if t == "error":
print("[SERVER ERROR]", ev)
return

# --------------- Function Calls ---------------

def _dispatch_function_call(self, ws, item):
name = item.get("name")
if self.debug:
print(f"[DEBUG] function_call detected: {name} raw={item}")

# End session tool: acknowledge, ask model to speak closing, then exit after next assistant message
if name == self._end_session_tool_name:
args = json.loads(item.get("arguments", "{}") or "{}")
farewell = args.get("farewell", "Okay, ending the session now.")
ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": item.get("call_id"),
"output": json.dumps({"ok": True, "farewell": farewell})
}
}))
if self.debug: print("[DEBUG] end_session_tool acknowledged")
self._awaiting_end_after_assistant_msg = True
self._safe_response_create(ws)
return

# Objective completion tool: same exit flow after the model speaks its closing line
if name == self._objective_tool_name:
ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": item.get("call_id"),
"output": json.dumps({"ok": True})
}
}))
if self.debug: print("[DEBUG] objective_completion_tool acknowledged")
self._awaiting_end_after_assistant_msg = True
self._safe_response_create(ws)
return

# User-registered tools
if not self.handle_function_call_cb:
ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": item.get("call_id"),
"output": "{}"
}
}))
self._safe_response_create(ws)
return

before = self._active_response_id
self.handle_function_call_cb(ws, item)
after = self._active_response_id
if before is not None and before == after:
self._need_response_after_tool = True

# --------------- Helpers ---------------

def _safe_response_create(self, ws):
if self._active_response_id is None:
ws.send(json.dumps({"type": "response.create"}))
if self.debug: print("[DEBUG] response.create issued")
else:
self._need_response_after_tool = True
if self.debug: print("[DEBUG] response.create deferred (active response)")

@staticmethod
def _float_to_pcm16(f32: np.ndarray) -> bytes:
f = np.clip(f32, -1.0, 1.0)
return (f * 32767).astype(np.int16).tobytes()

def _base64_audio(self, f32_mono: np.ndarray) -> str:
return base64.b64encode(self._float_to_pcm16(f32_mono)).decode("ascii")


# ---------------- Demo: simple weather tool ----------------

def weather_handler(ws, call):
args = json.loads(call.get("arguments", "{}") or "{}")
loc = args.get("location", "unknown")
print(f"[DEBUG] weather tool called with location={loc}")

demo = {
"paris": {"temp": "21°C", "condition": "Partly cloudy"},
"new york": {"temp": "22°C", "condition": "Sunny"},
"london": {"temp": "16°C", "condition": "Rainy"},
"tokyo": {"temp": "25°C", "condition": "Clear"},
"chicago": {"temp": "20°C", "condition": "Breezy"},
}
w = demo.get(loc.lower(), {"temp": "20°C", "condition": "Mild"})

out = json.dumps({"location": loc, "temperature": w["temp"], "condition": w["condition"]})
ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": call["call_id"],
"output": out
}
}))
print("[DEBUG] weather result posted; base class will auto-create response")


if __name__ == "__main__":
tools = [
{
"type": "function",
"name": "get_weather",
"description": "Get current weather for a location.",
"parameters": {
"type": "object",
"properties": { "location": {"type": "string"} },
"required": ["location"]
}
}
]

agent = RealtimeVoiceAgent(
system_instructions=(
"You are a voice assistant. If the user asks for weather, call get_weather and say the result clearly. "
"If the user asks to end or return to the main menu, call end_session_tool. "
"When the task is fully complete and you have spoken the final answer, you may call objective_completion_tool. Speak English."
),
tools=tools,
handle_function_call=weather_handler,
debug=True,
# Speak-first demo:
speak_first=True,
        intro_instructions="Start out by greeting the user and asking if they would like to check the weather. Speak English."
# intro_out_of_band=True, # optional if you want the intro outside the default conversation
)

agent.start()
try:
while not agent._stop_event.is_set():
            agent._stop_event.wait(0.2)
except KeyboardInterrupt:
agent.stop()
Real-time voice models can understand spoken audio directly. They analyze the raw waveform and respond in real time without needing a separate transcription step. However, if you want a text record of what the user actually said, you must attach an explicit transcription model. This is handled in our code through the input_audio_transcription field in the session configuration, where we specify model: whisper-1.
Whisper runs independently of the realtime model. It listens to the same incoming audio buffers and produces text transcripts as events. The realtime model continues using the audio for understanding and response generation, while Whisper’s role is purely to produce readable text that can be displayed or logged.
The agent receives these transcripts through conversation.item.input_audio_transcription.completed events and adds them to the internal transcript log. The assistant’s replies are also captured as response.output_audio_transcript events, providing both sides of the conversation in text form. Together, these entries form a complete trace of each turn, including what the user said, what the model spoke, and any function calls that occurred.
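The relevant slice of the session configuration looks like this (a minimal sketch; the field names follow the Realtime API's `session.update` event, and everything beyond `input_audio_transcription` is assumed from our base class defaults):

```python
import json

# Minimal session.update payload: the realtime model consumes raw PCM16
# audio directly, while Whisper is attached purely to produce text.
session_update = {
    "type": "session.update",
    "session": {
        "input_audio_format": "pcm16",
        "output_audio_format": "pcm16",
        # Attaching a transcription model is what makes the
        # conversation.item.input_audio_transcription.completed events fire.
        "input_audio_transcription": {"model": "whisper-1"},
    },
}

payload = json.dumps(session_update)
```

Without the `input_audio_transcription` block, the model still understands and answers speech normally; you simply never receive user-side transcripts to log.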
Saving these transcripts allows you to send them to W&B Weave for later analysis and compliance. Weave tracks each tool call and response as a node in a trace, letting you inspect every step of the conversation: which function was called, what arguments were passed, and how long each step took. By pairing that trace with the stored transcripts, you get synchronized audio, text, and action data for debugging, evaluation, or demonstration.

Building the router agent and chat loop

At the top level, this multi-agent call center system runs a simple chat loop that manages the entire interaction flow. It starts by launching the router agent, waits for it to finish listening, and reads the printed route result. Based on that route, the loop starts the matching specialized agent, such as authentication, failed transaction, or fraud, and lets it handle the rest of the call. When that agent finishes, the loop either ends or returns to the router if the user wants to go back to the main menu.
This loop keeps the logic straightforward. The router determines which problem area the user belongs to, and the orchestrator simply spawns the correct agent process. Each agent is isolated in its own script and runs independently through a subprocess call. That isolation wasn’t just a design choice; it fixed an issue where the audio libraries might not close properly between agents. Running each one in a clean process guarantees fresh audio input and output for every stage.
The router itself handles only one job: listening to the user’s initial request and mapping it to a route. It exposes a set of callable tools: start_failed_agent, start_auth_agent, start_fraud_agent, and route_to_human, each representing a possible destination. Once the model picks one, the router confirms the tool call, prints a line like ROUTE: failed, and stops. The chat loop then reads that output, launches the corresponding agent, and the conversation continues naturally without the user noticing the handoff.
This pattern turns the voice system into a chain of small, focused components. The chat loop coordinates everything, the router decides direction, and the specialized agents handle their domains from start to finish.
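Before the full listing, note how small the handoff contract actually is: the router prints a single `ROUTE: <route>` line to stdout, and the loop scans for it (a simplified sketch of that parsing; the `human` default is the orchestrator's fallback when no route line is found):

```python
def parse_route(stdout: str, default: str = "human") -> str:
    """Scan subprocess stdout for a 'ROUTE: <route>' line."""
    for line in stdout.splitlines():
        if line.strip().lower().startswith("route:"):
            return line.split(":", 1)[1].strip().lower()
    return default

# The router's one parseable line wins; debug chatter is ignored.
print(parse_route("[router] tool call received: start_failed_agent\nROUTE: failed"))  # failed
```

Keeping the contract to one plain-text line means the router and the orchestrator stay decoupled: either side can be rewritten as long as that line survives.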
Here is the code for the chat loop:
import os
import sys
import subprocess
import shlex
import time
import uuid
import json
import weave
from typing import Dict, Any, List, Optional

ROUTE_TO_SCRIPT = {
"fraud": "fraud_agent.py",
"failed": "transaction_agent.py",
"auth": "password_agent.py",
"human": None,
"exit": None,
}

# IMPORTANT: call the CLI flavor, unbuffered
ROUTER_CMD = f"{shlex.quote(sys.executable)} -u {shlex.quote('voice_router_agent.py')}"


@weave.op
def record_interaction(
interaction_type: str,
content: str,
conversation_id: Optional[str] = None,
agent_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Record and track interactions in the voice agent conversation.

Weave is great for monitoring LLMs in production, allowing you to track
performance characteristics such as response length, prompt coverage, or
rare failure cases over time. This makes it easier to iterate on system
behavior, uncover unintended behaviors, or debug edge cases—all of which
are key parts of effectively aligning and evaluating language models in
real-world applications.

Args:
interaction_type: Type of interaction (user_input, assistant_response, tool_call)
content: The actual content (transcript or function call details)
conversation_id: Unique identifier for the conversation session
agent_id: Identifier for the specific agent (e.g., "password_agent", "transaction_agent")
metadata: Additional context (response_id, item_id, function name, etc.)

Returns:
Dictionary containing the logged interaction data
"""
    interaction = {
        "interaction_type": interaction_type,
        "content": content,
        "conversation_id": conversation_id,
        "agent_id": agent_id,
        "metadata": metadata or {},
        "timestamp": time.time(),
    }
    return interaction


def run_router_once(conversation_id: str) -> str:
env = os.environ.copy()
env["CONVERSATION_ID"] = conversation_id
proc = subprocess.run(
ROUTER_CMD,
shell=True,
capture_output=True,
text=True,
check=False,
env=env,
)

# Print router logs so you can see what's happening
if proc.stderr:
sys.stderr.write(proc.stderr)
sys.stderr.flush()
if proc.stdout:
sys.stdout.write(proc.stdout)
sys.stdout.flush()

route = "human"
for line in (proc.stdout or "").splitlines():
if line.strip().lower().startswith("route:"):
route = line.split(":", 1)[1].strip().lower()
break
return route

def run_agent_script(script: str, conversation_id: str) -> Optional[List[Dict[str, Any]]]:
"""Run an agent script in a subprocess and read its transcript from file."""
print(f"[orchestrator] launching agent script: {script}")

# Define transcript file path
transcript_file = f"/tmp/transcript_{conversation_id}.json"

cmd = f"{shlex.quote(sys.executable)} -u {shlex.quote(script)}"
env = os.environ.copy()
env["CONVERSATION_ID"] = conversation_id
env["TRANSCRIPT_FILE"] = transcript_file

# DON'T capture output - let the agent speak and show debug messages
subprocess.run(
cmd,
shell=True,
check=False,
env=env,
# No capture_output=True here - let stdout/stderr flow to terminal
)

# Read the transcript file if it exists
transcript = None
if os.path.exists(transcript_file):
        try:
            with open(transcript_file, 'r') as f:
                transcript = json.load(f)
# Clean up the transcript file
os.remove(transcript_file)
except Exception as e:
print(f"[orchestrator] error reading transcript: {e}")

return transcript



@weave.op
def main():
# Initialize Weave for tracking and monitoring
print("Voice Router Orchestrator. Ctrl+C to exit.")

# Generate a unique conversation ID for this session
conversation_id = str(uuid.uuid4())
print(f"[orchestrator] Starting conversation session: {conversation_id}")

while True:
route = run_router_once(conversation_id)
print(f"[orchestrator] router chose: {route}")
if route == "exit":
print("[orchestrator] exiting.")
break
if route == "human":
print("Connecting you to a human representative… (placeholder)")
time.sleep(1.0)
continue

script = ROUTE_TO_SCRIPT.get(route)
if not script:
print("[orchestrator] unknown route; returning to router.")
continue

# Brief pause to allow audio devices to fully release from router
print("[orchestrator] preparing handoff to agent...")
time.sleep(0.5)

# Run agent and get transcript
transcript = run_agent_script(script, conversation_id)

# Log all interactions from the transcript
if transcript:
print(f"[orchestrator] logging {len(transcript)} interactions to Weave")
for interaction in transcript:
record_interaction(
interaction_type=interaction.get("type"),
content=interaction.get("content"),
conversation_id=conversation_id,
agent_id=interaction.get("agent_id"),
metadata=interaction.get("metadata")
)

# Brief pause before restarting router
print("[orchestrator] agent complete, restarting router...")
time.sleep(0.5)
# when the agent exits, loop back to router

if __name__ == "__main__":

weave.init("finance-callcenter-voice-agents")
main()
Each agent writes a JSON transcript at the end of its run. The chat loop passes a shared conversation_id so every transcript can be tied together. When an agent exits, the loop reads that file, deletes it, and logs every entry to Weave.
The transcript holds every exchange: user_input from Whisper, assistant_response from the model, and tool_call entries with arguments and IDs. Each line is timestamped and tagged with the agent_id that produced it.
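Each transcript entry is a small dictionary with a fixed shape. Here is a sketch of what the orchestrator reads back from the file (the field names match the entries the agents append; the values are illustrative):

```python
import json
import time

# Illustrative transcript entries, mirroring what each agent writes to
# /tmp/transcript_<conversation_id>.json before exiting.
transcript = [
    {
        "type": "user_input",          # Whisper transcript of the caller
        "content": "My card was declined at a grocery store.",
        "timestamp": time.time(),
        "agent_id": "transaction_agent",
        "metadata": {"item_id": "item_abc"},
    },
    {
        "type": "tool_call",           # function the model invoked
        "content": "Function: search_transactions_tool",
        "timestamp": time.time(),
        "agent_id": "transaction_agent",
        "metadata": {"arguments": '{"date": "2025-10-01"}'},
    },
]

# Round-trip through JSON exactly as the orchestrator does.
restored = json.loads(json.dumps(transcript))
types = [e["type"] for e in restored]
```

Because every entry carries its own `agent_id` and timestamp, the orchestrator can interleave transcripts from multiple agents under one conversation without losing ordering.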
The record_interaction function is decorated with @weave.op, so every call is automatically traced and visible in Weave. The orchestrator iterates through the transcript, calling record_interaction for each entry. That creates a full trace tree under one conversation_id, showing how the router and each agent interacted.
This chat loop instantiates the following router agent, which classifies the user's intent so the call can be routed to a specialized agent:
import os
import json
import threading
from typing import Dict, Any, Optional

from realtime_agent_base import RealtimeVoiceAgent # your fixed base class


class VoiceRouterAgent:
"""
Voice-first router that listens and selects ONE route via a tool call.
It DOES NOT launch sub-agents. Instead, it prints "ROUTE: <route>"
to stdout and exits. This avoids audio handle reuse issues.
"""

def __init__(
self,
api_key: Optional[str] = None,
model_url: str = "wss://api.openai.com/v1/realtime?model=gpt-realtime-2025-08-28",
voice: str = "alloy",
debug: bool = True,
# --- speak-first options ---
speak_first: bool = True,
intro_instructions: str = (
            "Use this intro to greet the user: Welcome to GPT Bank. I can connect you to the right team. "
"Say 'fraud' for a suspicious charge, 'failed' for a declined payment, "
"'auth' for login or password help, or 'human' to talk to a person. "
"You can also say 'exit' to return to the main menu. Speak English."
),
intro_out_of_band: bool = False, # set True if your base agent supports OOB intros
conversation_id: Optional[str] = None,
):
self.route_choice: Optional[str] = None
self.debug = debug

# Tools the model can call to select a route
self.tools = [
{"type": "function", "name": "start_fraud_agent",
"description": "Route to the Fraud Agent.", "parameters": {"type": "object", "properties": {}}},
{"type": "function", "name": "start_failed_agent",
"description": "Route to the Failed Transaction Agent.", "parameters": {"type": "object", "properties": {}}},
{"type": "function", "name": "start_auth_agent",
"description": "Route to the Auth Agent.", "parameters": {"type": "object", "properties": {}}},
{"type": "function", "name": "route_to_human",
"description": "Route to a human representative.", "parameters": {"type": "object", "properties": {}}},
{"type": "function", "name": "shutdown_router",
"description": "Exit router without choosing an agent (user said exit/main menu).",
"parameters": {"type": "object", "properties": {}}},
]

system_instructions = (
            "Only speak English. You are a voice router for a bank call center.\n"
"CALL EXACTLY ONE TOOL based on the user's intent:\n"
"- start_fraud_agent for fraudulent/unauthorized charge, dispute, stolen card, chargeback\n"
"- start_failed_agent for declined card, blocked merchant, failed payment, whitelist/allowlist\n"
"- start_auth_agent for login issue, password reset, email/verification code, identity verification\n"
"- route_to_human for anything else or if unclear\n"
"- shutdown_router if the user says exit or main menu\n\n"
            "After you call a tool, do not keep talking; end your turn immediately."
)

self.agent = RealtimeVoiceAgent(
api_key=api_key or os.environ.get("OPENAI_API_KEY"),
model_url=model_url,
system_instructions=system_instructions,
tools=self.tools,
tool_choice="auto",
voice=voice,
input_audio_format="pcm16",
output_audio_format="pcm16",
handle_function_call=self._handle_tool_call,
debug=debug,
objective_completion_description="(unused by router)",
# --- speak-first wiring ---
speak_first=speak_first,
intro_instructions=intro_instructions,
intro_out_of_band=intro_out_of_band,
# Weave logging
conversation_id=conversation_id,
agent_id="voice_router_agent",
)

def _handle_tool_call(self, ws, item: Dict[str, Any]):
name = item.get("name")
# ACK the tool call so the model is satisfied
ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": item["call_id"],
"output": json.dumps({"ok": True})
}
}))

if self.debug:
print(f"[router] tool call received: {name}", flush=True)

# Map tool -> route string
mapping = {
"start_fraud_agent": "fraud",
"start_failed_agent": "failed",
"start_auth_agent": "auth",
"route_to_human": "human",
"shutdown_router": "exit",
}
self.route_choice = mapping.get(name, "human")

# Stop the router voice session immediately after routing
self.agent.stop()

def run(self) -> str:
self.agent.start()
try:
while not self.agent._stop_event.is_set():
                self.agent._stop_event.wait(0.25)
except KeyboardInterrupt:
self.agent.stop()
return self.route_choice or "human"


if __name__ == "__main__":
conversation_id = os.environ.get("CONVERSATION_ID")
router = VoiceRouterAgent(debug=True, conversation_id=conversation_id)
route = router.run()
# Print one clean, parseable line for the orchestrator
print(f"ROUTE: {route}", flush=True)

Implementing a password reset agent in our call center

The authentication and password reset path uses the same real-time voice base as the other agents, but its job is specific. It verifies identity, delivers a 6-digit code by email, confirms that code, and, if requested, sends a password reset link. The AI agent takes in system instructions, a tool schema, and a single function handler. The tools declare what actions are allowed. The handler executes Python functions in response to tool calls and returns structured results to the model, allowing the conversation to continue smoothly.
The model gathers name, birth date, SSN, address, and the account number. The verify tool checks those fields against the local banking data. If verification fails, the agent offers one retry and then ends gracefully. The agent never reads the SSN back to the caller, and it does not expose balances or other account data.
Once verified, the user can request a reset. The agent calls the send email code tool, which looks up the email on file, generates a 6-digit code, and sends it through the configured SMTP account. When the caller speaks the code, the confirm email code tool validates it. If it matches, the agent can then call the send password reset link tool, which emails a short-lived link to the same address on file and returns a status object the model can explain in plain speech.
Here’s the code for the password reset agent:
import os
import json
import ssl
import smtplib
import random
from typing import Dict, Any, Optional, List
from email.message import EmailMessage

from bank_infra import BankSystem
from realtime_agent_base import RealtimeVoiceAgent # your fixed voice base


class AuthAgent:
"""
Voice-first Authentication assistant built on RealtimeVoiceAgent.

Public surface:
- __init__(db_path="bank_data.json", api_key=None, email_account=..., email_password=..., speak_first=True, intro_instructions=..., intro_out_of_band=False)
- run()
"""

def __init__(
self,
db_path: str = "bank_data.json",
api_key: Optional[str] = None,
email_account: str = "your email address",
email_password: Optional[str] = "app password for your account",
speak_first: bool = True,
intro_instructions: str = (
"Hello, I'm the authentication assistant. "
"I'll verify your identity, send a 6-digit email code, and help with password reset if needed. "
"To begin, please tell me your account number."
),
intro_out_of_band: bool = False,
conversation_id: Optional[str] = None,
):
self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
self.bank = BankSystem(db_path)
self.EMAIL_ACCOUNT = email_account
self.EMAIL_PASSWORD = email_password or os.environ.get("SMTP_PASSWORD") or ""
self.EMAIL_CODES: Dict[str, Dict[str, str]] = {}

# --- Policy / flow (aligned with your original, plus explicit end/objective tools) ---
self.SYSTEM_PROMPT = (
"You are a banking authentication assistant. "
"Gather user information, verify with verify_personal_info, send an email code, confirm it, "
            "and if the user requests a password reset, first verify the user's personal info, then send_email_code, then get the code from the user and use confirm_email_code, then finally use send_password_reset_link to email them a reset link. "
"Never expose account data. Keep prompts brief and clear. "
"If identity verification fails, offer one retry; if it still fails, end gracefully and direct the user to human support "
"(call end_session_tool). "
"When the task is fully complete or the user wants to go back to the main menu, "
"call objective_completion_tool with a one-line summary and then end_session_tool. "
"Speak English."
)

# Build the realtime agent (base auto-adds end_session_tool / objective_completion_tool)
self.agent = RealtimeVoiceAgent(
api_key=self.api_key,
system_instructions=self.SYSTEM_PROMPT,
tools=self._make_tools_schema(),
handle_function_call=self._make_function_handler(),
debug=True,
# speak-first intro
speak_first=speak_first,
intro_instructions=intro_instructions,
intro_out_of_band=intro_out_of_band,
# Weave logging
conversation_id=conversation_id,
agent_id="password_agent",
)

# ---------------- Tool Schemas ----------------

def _make_tools_schema(self) -> List[Dict[str, Any]]:
return [
{
"type": "function",
"name": "verify_personal_info",
"description": "Validate personal info against bank_data.json. Returns {verified: bool}.",
"parameters": {
"type": "object",
"properties": {
"name": {"type": "string"},
"dob": {"type": "string", "description": "YYYY-MM-DD"},
"soc": {"type": "string", "description": "SSN NNN-NN-NNNN"},
"address": {"type": "string"},
"account_number": {"type": "string"}
},
"required": ["name", "dob", "soc", "address", "account_number"]
}
},
{
"type": "function",
"name": "send_email_code",
"description": "Looks up the email on file for the account and emails a 6-digit code.",
"parameters": {
"type": "object",
"properties": {"account_number": {"type": "string"}},
"required": ["account_number"]
}
},
{
"type": "function",
"name": "confirm_email_code",
"description": "Confirm the emailed code.",
"parameters": {
"type": "object",
"properties": {
"account_number": {"type": "string"},
"code": {"type": "string"}
},
"required": ["account_number", "code"]
}
},
{
"type": "function",
"name": "send_password_reset_link",
"description": "Sends a password reset link to the user's registered email address.",
"parameters": {
"type": "object",
"properties": {"account_number": {"type": "string"}},
"required": ["account_number"]
}
}
]

# ---------------- Helpers ----------------

def _load_email_for_account(self, account_number: str) -> Optional[str]:
try:
data = self.bank._load_db()
rec = data.get(account_number)
return rec.get("email") if rec else None
except Exception:
return None

def _send_email(self, to_addr: str, subject: str, body: str, html: Optional[str] = None) -> Dict[str, Any]:
if not to_addr:
return {"ok": False, "message": "no recipient"}
msg = EmailMessage()
msg["From"] = self.EMAIL_ACCOUNT
msg["To"] = to_addr
msg["Subject"] = subject
msg.set_content(body)
if html:
msg.add_alternative(html, subtype="html")
try:
with smtplib.SMTP("smtp.gmail.com", 587) as s:
s.starttls(context=ssl.create_default_context())
s.login(self.EMAIL_ACCOUNT, self.EMAIL_PASSWORD)
s.send_message(msg)
return {"ok": True}
except Exception as e:
return {"ok": False, "message": str(e)}

# ---------------- Tool Implementations ----------------

def verify_personal_info(self, name: str, dob: str, soc: str, address: str, account_number: str) -> Dict[str, bool]:
ok = self.bank.validate_customer(name=name, dob=dob, soc=soc, address=address, account_number=account_number)
return {"verified": bool(ok)}

def send_email_code(self, account_number: str) -> Dict[str, Any]:
email_to_send = self._load_email_for_account(account_number)
if not email_to_send:
return {"status": "error", "message": "no email on file for this account"}

code = f"{random.randint(100000, 999999)}"
self.EMAIL_CODES[account_number] = {"email": email_to_send, "code": code}
subject = "Your verification code"
body = f"Your verification code is {code}. It expires in 10 minutes."
html = f"<p>Your verification code is <b>{code}</b>. It expires in 10 minutes.</p>"
send_result = self._send_email(email_to_send, subject, body, html)
if not send_result.get("ok"):
return {"status": "error", "message": send_result.get("message", "send failed")}
return {"status": "sent", "to": email_to_send}

def confirm_email_code(self, account_number: str, code: str) -> Dict[str, Any]:
rec = self.EMAIL_CODES.get(account_number)
if not rec:
return {"success": False, "message": "no code sent"}
if rec["code"] == str(code).strip():
del self.EMAIL_CODES[account_number]
return {"success": True}
return {"success": False, "message": "invalid code"}

def send_password_reset_link(self, account_number: str) -> Dict[str, Any]:
email_to_send = self._load_email_for_account(account_number)
if not email_to_send:
return {"status": "error", "message": "no email on file for this account"}
reset_link = f"https://bank-secure-reset.com/reset/{random.randint(1000000, 9999999)}"
subject = "Password Reset Request"
body = f"Click this link to reset your password: {reset_link}\nThis link will expire in 15 minutes."
html = f"<p>Click below to reset your password:</p><a href='{reset_link}'>{reset_link}</a>"
send_result = self._send_email(email_to_send, subject, body, html)
if not send_result.get("ok"):
return {"status": "error", "message": send_result.get("message", "send failed")}
return {"status": "sent", "link": reset_link, "to": email_to_send}

# ---------------- WS Tool Dispatcher ----------------

def _make_function_handler(self):
"""
Returns a callback(ws, item) for RealtimeVoiceAgent:
- Parses tool call
- Executes local method
- Posts function_call_output (base manages safe response timing)
"""
def handler(ws, item: Dict[str, Any]):
name = item.get("name")
args = json.loads(item.get("arguments", "{}") or "{}")

try:
if name == "verify_personal_info":
res = self.verify_personal_info(**args)
elif name == "send_email_code":
res = self.send_email_code(**args)
elif name == "confirm_email_code":
res = self.confirm_email_code(**args)
elif name == "send_password_reset_link":
res = self.send_password_reset_link(**args)
else:
res = {"error": f"unknown tool: {name}"}
except Exception as e:
res = {"error": str(e)}

ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": item.get("call_id"),
"output": json.dumps(res)
}
}))
print(f"[DEBUG] tool_result posted -> {name}")

return handler

# ---------------- Public API ----------------

def run(self) -> List[Dict[str, Any]]:
"""
Starts the realtime voice session. Press Ctrl+C to stop.
The assistant ends itself when objective_completion_tool / end_session_tool are invoked.
Returns the full transcript of the conversation.
"""
self.agent.start()
try:
            while not self.agent._stop_event.is_set():
                self.agent._stop_event.wait(0.25)
except KeyboardInterrupt:
self.agent.stop()
return self.agent.get_transcript()


# -------------- direct launch (optional) --------------
if __name__ == "__main__":
conversation_id = os.environ.get("CONVERSATION_ID")
agent = AuthAgent(
        email_account=os.environ.get("SMTP_USER", "your-email@example.com"),
        email_password=os.environ.get("SMTP_PASSWORD"),  # never hardcode credentials; set SMTP_PASSWORD in the environment
speak_first=True,
intro_instructions=(
"Hi! I can help verify your identity and send you a 6-digit email code. "
"To get started, what's your account number?"
),
conversation_id=conversation_id,
)
transcript = agent.run()

# Write transcript to file if TRANSCRIPT_FILE env var is set
transcript_file = os.environ.get("TRANSCRIPT_FILE")
if transcript_file and transcript:
        try:
            with open(transcript_file, 'w') as f:
                json.dump(transcript, f)
except Exception as e:
print(f"[ERROR] Failed to write transcript: {e}")
The base voice runtime handles the audio stream and the websocket session, while this agent focuses on the domain logic. Tool calls are logged to the transcript for later review. Objective completion and end-of-session signals let an orchestrator mark success and close the session cleanly. In practice, this yields a short flow that starts with identity, delivers a code, validates it, and issues a reset link, all without exposing sensitive fields in responses.
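Stripped of email delivery, the code-then-confirm handshake at the heart of this agent reduces to a few lines (an in-memory sketch with illustrative function names; the single-use deletion mirrors confirm_email_code above):

```python
import random

codes = {}  # account_number -> pending 6-digit code

def issue_code(account: str) -> str:
    """Generate and remember a 6-digit code for this account."""
    code = f"{random.randint(100000, 999999)}"
    codes[account] = code
    return code

def confirm_code(account: str, spoken: str) -> bool:
    """Validate the spoken code; a code can be used only once."""
    if codes.get(account) == str(spoken).strip():
        del codes[account]  # single use: a replayed code must fail
        return True
    return False

c = issue_code("12345678")
```

A production version would also expire codes after a fixed window (the email copy promises 10 minutes), which this sketch omits for brevity.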

Building our fraud investigation agent

The fraud transaction investigation agent handles reports of suspicious charges. It operates on the same real-time voice base as the other assistants, handling audio input and output, transcription, and tool calling. Its behavior is defined by system instructions, a tool schema that declares allowed actions, and a single function handler that executes the underlying Python functions when those tools are invoked.
The flow is straightforward. The agent authenticates the caller against the local banking data, then asks for a date and a merchant name. It searches the ledger for matches and reads back a compact list with index numbers for easy selection. After the caller picks an item, the agent gathers a brief description, files a dispute as a structured record in the database, offers to block the merchant for future charges, and, if confirmed, updates card settings. A confirmation email with the dispute details is sent to the address on file.
Here’s the code for the agent:
import os
import json
import ssl
import smtplib
import random
from typing import List, Dict, Any, Optional
from email.message import EmailMessage

from bank_infra import BankSystem
from realtime_agent_base import RealtimeVoiceAgent # your voice wrapper


class FraudAgent:
"""
Voice Fraud Agent
- Public API unchanged: __init__(...), run()
- Internals use RealtimeVoiceAgent for audio I/O, VAD, and function-calling.
"""

def __init__(
self,
db_path: str = "bank_data.json",
api_key: Optional[str] = None,
email_account: str = "your email address",
email_password: Optional[str] = "app password for your account",
speak_first: bool = True,
intro_instructions: str = (
"Hello, you’ve reached the fraud desk. I’ll help you report a suspicious charge. "
"To begin, please say your account number. Speak English."
),
):
self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
self.bank = BankSystem(db_path)
self.email_account = email_account
self.email_password = email_password or os.environ.get("SMTP_PASSWORD") or ""

# ---- System policy (semantics match your original) ----
self.SYSTEM_PROMPT = (
"You are a bank assistant for reporting fraudulent transactions. "
"Authenticate the customer before taking action. "
"Ask for account number, then full name, date of birth YYYY-MM-DD, SSN, and address. "
"Call verify_personal_info_tool. If verification fails, allow one retry, then stop and direct the user to contact support (call end_session_tool). "
"After verification, require the user to provide a search date in YYYY-MM-DD and a merchant string. "
"Call search_transactions_tool with those two fields. Show results with index numbers starting at 1, including timestamp, merchant, amount, and status. "
"If zero results, ask the user for a nearby date or a different merchant string and search again. "
"When the user selects an index, echo the exact timestamp, merchant, and amount and ask the user to confirm. "
"Then ask for a short reason and any extra details. "
"Call create_dispute_tool with timestamp, merchant, amount, reason, and extra details. "
"After a successful dispute creation, ask if they want to block this merchant and if yes, call block_merchant_tool. "
"Never reveal balances or SSN. Do not claim that funds are returned. "
"After creating the dispute, email a copy of the dispute report to the email on file for the account. "
"When the task is fully complete or the user wants to go back to the main menu, "
"call objective_completion_tool with a one-line summary and then end_session_tool. "
"Speak English."
)

# Build the realtime voice agent (it auto-adds objective/end tools)
self.agent = RealtimeVoiceAgent(
api_key=self.api_key,
system_instructions=self.SYSTEM_PROMPT,
tools=self._make_tools_schema(), # user tools
handle_function_call=self._make_function_handler(),
debug=True,
# speak-first configuration
speak_first=speak_first,
intro_instructions=intro_instructions,
)

# ---------- Tool schemas ----------

def _make_tools_schema(self) -> List[Dict[str, Any]]:
return [
{
"type": "function",
"name": "verify_personal_info_tool",
"description": "Validate customer identity. Returns {verified: bool}.",
"parameters": {
"type": "object",
"properties": {
"name": {"type": "string"},
"dob": {"type": "string", "description": "YYYY-MM-DD"},
"soc": {"type": "string", "description": "SSN NNN-NN-NNNN"},
"address": {"type": "string"},
"account_number": {"type": "string"}
},
"required": ["name", "dob", "soc", "address", "account_number"]
}
},
{
"type": "function",
"name": "search_transactions_tool",
"description": "Search transactions by date and merchant substring. Amount is not used. Returns list sorted newest first.",
"parameters": {
"type": "object",
"properties": {
"account_number": {"type": "string"},
"date": {"type": "string", "description": "YYYY-MM-DD"},
"merchant_query": {"type": "string"},
"include_failed": {"type": "boolean", "default": True}
},
"required": ["account_number", "date", "merchant_query"]
}
},
{
"type": "function",
"name": "create_dispute_tool",
"description": "Create a dispute record and email a report to the customer.",
"parameters": {
"type": "object",
"properties": {
"account_number": {"type": "string"},
"timestamp": {"type": "string", "description": "ISO timestamp of the suspected transaction"},
"merchant": {"type": "string"},
"amount": {"type": "number"},
"reason": {"type": "string"},
"extra_details": {"type": "string"}
},
"required": ["account_number", "timestamp", "merchant", "amount", "reason"]
}
},
{
"type": "function",
"name": "block_merchant_tool",
"description": "Block a merchant for future charges on this card.",
"parameters": {
"type": "object",
"properties": {
"account_number": {"type": "string"},
"merchant": {"type": "string"}
},
"required": ["account_number", "merchant"]
}
}
]

# ---------- Tool implementations ----------

@staticmethod
def _same_day(ts: str, day: str) -> bool:
ts_day = ts.split("T", 1)[0][:10]
return ts_day == day[:10]

def _load_email_for_account(self, account_number: str) -> Optional[str]:
try:
data = self.bank._load_db()
rec = data.get(account_number)
return rec.get("email") if rec else None
except Exception:
return None

def _send_email(self, to_addr: str, subject: str, body: str, html: Optional[str] = None) -> Dict[str, Any]:
if not to_addr:
return {"ok": False, "message": "no recipient"}
msg = EmailMessage()
msg["From"] = self.email_account
msg["To"] = to_addr
msg["Subject"] = subject
msg.set_content(body)
if html:
msg.add_alternative(html, subtype="html")
try:
with smtplib.SMTP("smtp.gmail.com", 587) as s:
s.starttls(context=ssl.create_default_context())
s.login(self.email_account, self.email_password)
s.send_message(msg)
return {"ok": True}
except Exception as e:
return {"ok": False, "message": str(e)}

# --- tool bodies ---

def verify_personal_info_tool(self, name: str, dob: str, soc: str, address: str, account_number: str) -> Dict[str, bool]:
ok = self.bank.validate_customer(name=name, dob=dob, soc=soc, address=address, account_number=account_number)
return {"verified": bool(ok)}

def search_transactions_tool(
self,
account_number: str,
date: str,
merchant_query: str,
include_failed: bool = True
) -> List[Dict[str, Any]]:
if not date or not merchant_query:
raise ValueError("date and merchant_query are required")

hist = self.bank.get_spending_history(account_number)
mq = merchant_query.strip().lower()
out: List[Dict[str, Any]] = []

for (ttype, amt, ts, desc, status) in hist:
if not self._same_day(ts, date):
continue
if mq not in str(desc).lower():
continue
if not include_failed and status == "failed":
continue
out.append({
"type": ttype,
"amount": float(amt),
"timestamp": ts,
"merchant": desc,
"status": status
})

out.sort(key=lambda r: r["timestamp"], reverse=True)
return out

def create_dispute_tool(
self,
account_number: str,
timestamp: str,
merchant: str,
amount: float,
reason: str,
extra_details: Optional[str] = None
) -> Dict[str, Any]:
data = self.bank._load_db()
acc = data.get(account_number)
if not acc:
return {"ok": False, "message": "account not found"}

disputes = acc.setdefault("disputes", [])
case_id = f"D{random.randint(100000, 999999)}"
disputes.append({
"case_id": case_id,
"timestamp": timestamp,
"merchant": merchant,
"amount": float(amount),
"reason": reason,
"details": extra_details or "",
"status": "submitted"
})

acc.setdefault("history", []).append(["Withdraw", float(amount), timestamp, f"{merchant} [DISPUTED]", "disputed"])
self.bank._save_db(data)

to_email = self._load_email_for_account(account_number)
report_txt = (
f"Fraud Dispute Report\n"
f"Case ID: {case_id}\n"
f"Account: {account_number}\n"
f"Transaction Timestamp: {timestamp}\n"
f"Merchant: {merchant}\n"
f"Amount: ${float(amount):.2f}\n"
f"Reason: {reason}\n"
f"Details: {extra_details or ''}\n"
f"Status: submitted\n"
)
report_html = (
f"<h3>Fraud Dispute Report</h3>"
f"<p><b>Case ID:</b> {case_id}</p>"
f"<p><b>Account:</b> {account_number}</p>"
f"<p><b>Transaction Timestamp:</b> {timestamp}</p>"
f"<p><b>Merchant:</b> {merchant}</p>"
f"<p><b>Amount:</b> ${float(amount):.2f}</p>"
f"<p><b>Reason:</b> {reason}</p>"
f"<p><b>Details:</b> {extra_details or ''}</p>"
f"<p><b>Status:</b> submitted</p>"
)

email_result = {"ok": False, "message": "no email on file"}
if to_email:
email_result = self._send_email(
to_addr=to_email,
subject=f"Your Fraud Dispute Report — Case {case_id}",
body=report_txt,
html=report_html
)

return {
"ok": True,
"disputes_count": len(disputes),
"case_id": case_id,
"email": {"sent": bool(email_result.get("ok")), "to": to_email, "error": email_result.get("message")}
}

def block_merchant_tool(self, account_number: str, merchant: str) -> Dict[str, Any]:
return self.bank.disable_merchant(account_number, merchant)

# ---------- Realtime tool dispatch ----------

def _make_function_handler(self):
"""
Returns a callback(ws, item) for RealtimeVoiceAgent.
Invokes our Python tool, posts JSON result, and lets the agent manage response timing.
"""
def handler(ws, item: Dict[str, Any]):
name = item.get("name")
args = json.loads(item.get("arguments", "{}") or "{}")

try:
if name == "verify_personal_info_tool":
res = self.verify_personal_info_tool(**args)
elif name == "search_transactions_tool":
res = self.search_transactions_tool(**args)
elif name == "create_dispute_tool":
res = self.create_dispute_tool(**args)
elif name == "block_merchant_tool":
res = self.block_merchant_tool(**args)
else:
res = {"error": f"unknown tool: {name}"}
except Exception as e:
res = {"error": str(e)}

ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": item.get("call_id"),
"output": json.dumps(res)
}
}))
print(f"[DEBUG] tool_result posted -> {name}")
# Do NOT send response.create here. The wrapper coordinates that.
return handler

# ---------- Public run() ----------

def run(self) -> List[Dict[str, Any]]:
"""Run the agent and return the full transcript."""
self.agent.start()
try:
import time
while not self.agent._stop_event.is_set():
time.sleep(0.25)
except KeyboardInterrupt:
self.agent.stop()
return self.agent.get_transcript()


# -------- Demo (run directly) --------
if __name__ == "__main__":
agent = FraudAgent(
email_account=os.environ.get("SMTP_USER", "no-reply@bank.example"),
email_password=os.environ.get("SMTP_PASSWORD", "")
)
transcript = agent.run()

# Write transcript to file if TRANSCRIPT_FILE env var is set
transcript_file = os.environ.get("TRANSCRIPT_FILE")
if transcript_file and transcript:
try:
with open(transcript_file, 'w') as f:
json.dump(transcript, f)
except Exception as e:
print(f"[ERROR] Failed to write transcript: {e}")
Every session produces a transcript for logging and review. It captures user speech, assistant replies, and each tool call with arguments and results. That transcript is saved so it can be logged to Weave inside the chat loop script.
Weave is a tracing and observability layer for AI applications. Calls to your functions become trace nodes with inputs, outputs, timing, and errors. Related calls can be grouped into a thread so one customer session shows up as a clear sequence of turns like verify identity, search transactions, create dispute, block merchant, send email.
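Because the transcript is plain JSON, it is also easy to post-process locally before (or alongside) logging it to Weave. Here's a minimal sketch of pulling the tool-call events out of a saved transcript file; the entry keys used here (`type`, `name`, `arguments`, `result`) are assumptions for illustration and should be adapted to whatever shape your RealtimeVoiceAgent actually records:

```python
import json

def summarize_tool_calls(transcript_path: str) -> list:
    """Extract tool-call events from a saved transcript file.

    The entry schema (a list of dicts with "type"/"name"/"arguments"/
    "result" keys) is an assumption for illustration; adjust it to match
    what your RealtimeVoiceAgent records.
    """
    with open(transcript_path) as f:
        transcript = json.load(f)
    calls = []
    for entry in transcript:
        if entry.get("type") == "tool_call":
            calls.append({
                "tool": entry.get("name"),
                "arguments": entry.get("arguments"),
                "result": entry.get("result"),
            })
    return calls
```

A quick scan like this is handy for spotting sessions where a tool errored or was never called at all, before digging into the full trace in Weave.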

Building the failed transaction agent

Our failed transaction agent handles declined or blocked card payments for our call center. It uses the same real-time voice base as the other paths, so audio input, speech output, and function calling are already wired. You configure behavior with system instructions, pass a tool schema, and provide a single function handler that executes the Python methods when tools are invoked.
The flow starts with identity. The agent asks for the account number, full name, date of birth in YYYY-MM-DD format, SSN in NNN-NN-NNNN format, and address, then calls the verification tool against the local banking data. If verification fails, it offers one retry and then ends politely. It never reads the SSN back, and it does not reveal balances.
After verification, the caller provides a date. The agent calls the failed list tool and reads back a compact set of failed items with index numbers. If there is exactly one item, it asks for confirmation. If there are several, it asks the caller to pick an index. It then confirms the merchant name and calls the whitelist tool to allow that merchant on the card, returning a clear success message. If no failed items exist for that date, it offers to list all transactions for the same day so the caller can double-check the merchant or date before whitelisting.
Here’s the code for the failed transaction agent:
import os
import json
from typing import List, Dict, Any, Optional

from bank_infra import BankSystem
from realtime_agent_base import RealtimeVoiceAgent # your fixed voice base


class FailedTransactionAgent:
"""
Voice-first Failed Transaction assistant built on RealtimeVoiceAgent.

Public surface kept simple:
- __init__(db_path="bank_data.json", api_key=None, speak_first=True, intro_instructions=...)
- run()
"""

def __init__(
self,
db_path: str = "bank_data.json",
api_key: Optional[str] = None,
speak_first: bool = True,
intro_instructions: str = (
"Hello, I'm the failed-transactions assistant. "
"I can help with declined or blocked payments. "
"First I'll verify your identity, then we'll review the failed charge. "
"What happened today?"
),
intro_out_of_band: bool = False,
conversation_id: Optional[str] = None,
):
self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
self.bank = BankSystem(db_path)

# --- System policy (mirrors your original, with explicit end/objective tools) ---
self.SYSTEM_PROMPT = (
"You are a bank support assistant focused on failed card transactions. "
"Authenticate the customer before taking any action. "
"Flow: ask for the account number, then full name, date of birth YYYY-MM-DD, SSN, and address. "
"Call verify_personal_info_tool. If not verified, offer one retry, then stop and suggest contacting support "
"(call end_session_tool). "
"After successful verification, ask for the transaction date in YYYY-MM-DD. "
"Call list_failed_tool for that date, show each failed item with an index starting at 1 "
"(include timestamp, merchant, and amount only). "
"If exactly one failed item, ask for confirmation. If multiple, ask the user to pick the index. "
"Confirm the selected merchant string with the user. "
"Then call whitelist_tool to add the merchant to the allowlist and confirm success. "
"If there are zero failed items on that date, offer to list all transactions for that date via "
"list_transactions_tool so the user can confirm the date or merchant name. "
"Never reveal balances or SSN (SOC). Do not echo the SSN back to the user. "
"Do not claim to unblock or settle funds. Only update the whitelist. "
"Keep prompts short and clear. "
"When the task is fully complete or the user wants to go back to the main menu, "
"call objective_completion_tool with a one-line summary and then end_session_tool. "
"Speak English."
)

# Build the realtime agent (base automatically adds end/objective tools)
self.agent = RealtimeVoiceAgent(
api_key=self.api_key,
system_instructions=self.SYSTEM_PROMPT,
tools=self._make_tools_schema(),
handle_function_call=self._make_function_handler(),
debug=True,
# --- optional speak-first intro (supported by your base) ---
speak_first=speak_first,
intro_instructions=intro_instructions,
intro_out_of_band=intro_out_of_band,
# Weave logging
conversation_id=conversation_id,
agent_id="transaction_agent",
)

# ---------------- Tool schemas ----------------

def _make_tools_schema(self) -> List[Dict[str, Any]]:
return [
{
"type": "function",
"name": "verify_personal_info_tool",
"description": "Validate customer identity against bank_data.json. Returns {verified: bool}.",
"parameters": {
"type": "object",
"properties": {
"name": {"type": "string"},
"dob": {"type": "string", "description": "YYYY-MM-DD"},
"soc": {"type": "string", "description": "SSN format NNN-NN-NNNN"},
"address": {"type": "string"},
"account_number": {"type": "string"}
},
"required": ["name", "dob", "soc", "address", "account_number"]
}
},
{
"type": "function",
"name": "list_transactions_tool",
"description": "List all transactions on a given date. Returns objects with type, amount, timestamp, merchant, status.",
"parameters": {
"type": "object",
"properties": {
"account_number": {"type": "string"},
"date": {"type": "string", "description": "YYYY-MM-DD"}
},
"required": ["account_number", "date"]
}
},
{
"type": "function",
"name": "list_failed_tool",
"description": "List failed transactions on a given date. Returns objects with type, amount, timestamp, merchant.",
"parameters": {
"type": "object",
"properties": {
"account_number": {"type": "string"},
"date": {"type": "string", "description": "YYYY-MM-DD"}
},
"required": ["account_number", "date"]
}
},
{
"type": "function",
"name": "whitelist_tool",
"description": "Add a merchant to allowed list for this card.",
"parameters": {
"type": "object",
"properties": {
"account_number": {"type": "string"},
"merchant": {"type": "string"}
},
"required": ["account_number", "merchant"]
}
}
]

# ---------------- Tool implementations ----------------

def verify_personal_info_tool(
self, name: str, dob: str, soc: str, address: str, account_number: str
) -> Dict[str, bool]:
ok = self.bank.validate_customer(
name=name, dob=dob, soc=soc, address=address, account_number=account_number
)
return {"verified": bool(ok)}

def list_transactions_tool(self, account_number: str, date: str) -> List[Dict[str, Any]]:
txns = self.bank.list_transactions_by_date(account_number, date, include_failed=True)
return [{"type": t, "amount": a, "timestamp": ts, "merchant": d, "status": s} for t, a, ts, d, s in txns]

def list_failed_tool(self, account_number: str, date: str) -> List[Dict[str, Any]]:
txns = self.bank.list_failed_by_date(account_number, date)
return [{"type": t, "amount": a, "timestamp": ts, "merchant": d} for t, a, ts, d in txns]

def whitelist_tool(self, account_number: str, merchant: str) -> Dict[str, Any]:
return self.bank.whitelist_merchant(account_number, merchant)

# ---------------- WS tool dispatcher ----------------

def _make_function_handler(self):
"""
Returns a callback(ws, item) suitable for RealtimeVoiceAgent.
- Parses the tool call
- Executes the local method
- Sends function_call_output back over the socket
(The base agent manages response.create timing to avoid double firing.)
"""
def handler(ws, item: Dict[str, Any]):
name = item.get("name")
args = json.loads(item.get("arguments", "{}") or "{}")

try:
if name == "verify_personal_info_tool":
res = self.verify_personal_info_tool(**args)
elif name == "list_transactions_tool":
res = self.list_transactions_tool(**args)
elif name == "list_failed_tool":
res = self.list_failed_tool(**args)
elif name == "whitelist_tool":
res = self.whitelist_tool(**args)
else:
res = {"error": f"unknown tool: {name}"}
except Exception as e:
res = {"error": str(e)}

ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": item.get("call_id"),
"output": json.dumps(res)
}
}))
print(f"[DEBUG] tool_result posted -> {name}")

return handler

# ---------------- Public API ----------------

def run(self) -> List[Dict[str, Any]]:
"""
Starts the realtime voice session. Press Ctrl+C to stop.
The assistant ends itself when objective_completion_tool / end_session_tool are invoked.
Returns the full transcript of the conversation.
"""
self.agent.start()
try:
import time
while not self.agent._stop_event.is_set():
time.sleep(0.25)
except KeyboardInterrupt:
self.agent.stop()
return self.agent.get_transcript()


# -------------- direct launch (optional) --------------
if __name__ == "__main__":
conversation_id = os.environ.get("CONVERSATION_ID")
agent = FailedTransactionAgent(conversation_id=conversation_id)
transcript = agent.run()

# Write transcript to file if TRANSCRIPT_FILE env var is set
transcript_file = os.environ.get("TRANSCRIPT_FILE")
if transcript_file and transcript:
try:
with open(transcript_file, 'w') as f:
json.dump(transcript, f)
except Exception as e:
print(f"[ERROR] Failed to write transcript: {e}")

Testing and debugging our AI agents

After building the agents, we can start experimenting with the system in real time. To authenticate successfully, you'll need to provide the same personal details that appear in the JSON database: name, date of birth, address, SSN, and account number. These fields allow the verification tools to match your identity against the mock banking records and unlock the rest of the interaction flow.
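For reference, here is a minimal sketch of what a seed record in bank_data.json might look like. The field names are inferred from the tool code above (`name`, `dob`, `soc`, `address`, `email`, plus `history` rows and a `disputes` list); the exact schema depends on your BankSystem implementation, so treat this as illustrative rather than authoritative:

```python
import json

# A minimal bank_data.json seed record. The field names mirror what the
# tools above read (name, dob, soc, address, email) and write (history
# rows, disputes); adjust them to match your actual BankSystem schema.
sample_db = {
    "123456": {
        "name": "Jane Doe",
        "dob": "1990-04-12",
        "soc": "123-45-6789",
        "address": "1 Main St, Springfield",
        "email": "jane.doe@example.com",
        # history rows: [type, amount, timestamp, merchant, status]
        "history": [
            ["Withdraw", 42.50, "2025-10-01T14:03:00", "Coffee Shop", "completed"],
            ["Withdraw", 310.00, "2025-10-01T16:22:00", "Electronics Store", "failed"],
        ],
        "disputes": [],
    }
}

with open("bank_data.json", "w") as f:
    json.dump(sample_db, f, indent=2)
```

When testing by voice, say these details exactly as they appear in the file, since the verification tool does a direct match against the record.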
If you want to enable email features, you also need to configure an app password for your email account so the agents can send messages through SMTP. This lets the password reset agent deliver verification codes and reset links, and allows the fraud agent to send dispute confirmation emails automatically. Make sure your email address is included in one of the user entries in the database so you actually receive those messages.
Running this system on your own machine may require small adjustments to the audio handling code. Audio behavior changes between setups, especially on macOS, where input and output devices are managed differently. I tested the system on an M1 Pro MacBook Pro, and while it runs smoothly overall, the underlying sound libraries can act up at times. Occasionally, the input stream does not close properly, or playback buffers stay open after an agent exits. To avoid this, each agent runs in a separate subprocess so that every session starts with a clean audio context and no leftover device locks.
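The subprocess pattern is straightforward to wire up with the TRANSCRIPT_FILE and CONVERSATION_ID environment variables the agents already read. Here's a rough sketch of how an orchestrator might launch one agent per session and collect the transcript afterward; the `fraud_agent.py` filename is hypothetical, so substitute whatever name you saved the script under:

```python
import json
import os
import subprocess
import sys
import tempfile

def run_agent_in_subprocess(cmd, conversation_id="demo-session"):
    """Launch an agent script as a child process so each session gets a
    fresh audio context, then read back the transcript it wrote to the
    path passed via the TRANSCRIPT_FILE environment variable."""
    fd, transcript_path = tempfile.mkstemp(suffix=".json")
    os.close(fd)
    env = dict(os.environ,
               TRANSCRIPT_FILE=transcript_path,
               CONVERSATION_ID=conversation_id)
    subprocess.run(cmd, env=env, check=True)
    try:
        with open(transcript_path) as f:
            return json.load(f)
    finally:
        os.unlink(transcript_path)

# In the real system this would be something like:
#   transcript = run_agent_in_subprocess([sys.executable, "fraud_agent.py"])
# where fraud_agent.py is the file containing the FraudAgent demo block.
```

Because the child process owns its own audio devices and exits fully between sessions, stale input streams and locked playback buffers from one agent can't leak into the next.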
You should always use headphones. If the agent’s voice plays through your speakers, the microphone will capture it again, causing the model to think it is still hearing the user. This feedback loop makes the agent talk to itself endlessly. Using headphones isolates the output from the microphone, keeping the conversation stable and avoiding echo or repeated speech.
Depending on your hardware, you might also need to adjust the input and output sample rates or the audio block size in the RealtimeVoiceAgent class. These parameters control how the audio streams are buffered and processed, and tuning them can fix timing or distortion issues. Once set correctly, the agents will handle live speech naturally, listening and responding in sync without glitches or confusion.
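A quick way to reason about those parameters: each audio block contributes at least its own duration in latency, so the block size and sample rate together set a floor on how responsive the agent feels. A small helper for checking candidate settings (the sample rates shown are illustrative; 24 kHz 16-bit mono PCM is the Realtime API's default audio format, but your device may resample):

```python
def block_latency_ms(samplerate: int, blocksize: int) -> float:
    """Duration of one audio block in milliseconds, i.e. the minimum
    latency each buffering stage adds to the round trip."""
    return 1000.0 * blocksize / samplerate

# Illustrative settings only; tune against your own hardware.
for samplerate, blocksize in [(24000, 480), (24000, 1024), (48000, 1024)]:
    print(f"{samplerate} Hz / {blocksize} frames -> "
          f"{block_latency_ms(samplerate, blocksize):.1f} ms per block")
```

Smaller blocks reduce latency but increase callback frequency and the risk of underruns on slower machines; if you hear crackling or dropouts, try a larger block size before changing sample rates.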
After testing your agent, you can open Weave to explore detailed traces of every interaction. Each conversation is logged as a sequence of events, showing the user’s speech, the model’s responses, and all function calls in between. You can expand any node to view the exact arguments passed into a tool, the returned output, and the timing for each step.
Weave gives you a visual record of how your agent behaves in real time. You can trace the full path of a session, from the first spoken input to the final end signal, making it easy to spot logic errors, latency spikes, or unexpected tool usage. It’s especially useful for refining prompts and verifying that your agents are following the intended conversation flow accurately.



Sample audio

I decided to add some logic to record the audio and log it to Weights & Biases as well. Here's a sample of me reporting a suspicious transaction to the fraud agent:


Run set
1


Our modernized call center system

This project shows what happens when real-time speech, tool execution, and programmatic control meet in one system. Starting from a simple idea, automating common banking calls, we built a complete environment where conversational agents act as independent service representatives, each handling a clear, functional task. The foundation is the real-time voice framework, which links live audio streams to the model and routes tool calls to actual Python functions backed by a simulated banking database.
By layering the emulator, the voice agent base, the specialized assistants, and the orchestration loop, we created a comprehensive voice-driven call flow that authenticates users, reviews transactions, files disputes, and logs every step for inspection in Weave. The result is a working prototype of a multi-agent system that feels natural, transparent, and grounded in real data.
This isn’t just a proof of concept for a voice interface; it’s a foundation for building complex, auditable AI systems that can operate through natural conversation. With further tuning, real APIs, and tighter audio integration, the same framework could support real call handling at scale. The entire stack, from the JSON backend to the real-time websocket layer, demonstrates how voice, reasoning, and execution can exist in a single continuous loop.



