
The Microsoft Agent Framework: Observability

Build resilient, multi-step AI agents with the Microsoft Agent Framework - combining orchestration, human-in-the-loop review, and checkpointing for real apps.
Building agents that go beyond single prompt-response calls requires more than clever prompting. Real applications demand systems that can coordinate multiple steps, incorporate human oversight, and survive interruptions. The Microsoft Agent Framework was designed specifically for this purpose: it gives developers a consistent way to assemble agents into workflows that can branch, pause, resume, and adapt to real-world conditions. By layering orchestration, human-in-the-loop support, and checkpointing on top of a unified client abstraction for OpenAI and Azure OpenAI, the framework turns experimental prototypes into durable systems.
In this article, we will explore how the Microsoft Agent Framework enables three core capabilities:
  • First, we’ll examine orchestration and see how executors connect into workflow graphs that can run in sequence, in parallel, or in loops.
  • Then we’ll look at human-in-the-loop design, where workflows pause to gather approvals or feedback before resuming seamlessly.
  • Finally, we’ll cover checkpointing, which enables long-running workflows to save and restore their state without having to restart after interruptions.
Together, these features provide a foundation for building agents that are not only functional but also resilient and adaptable in real-world scenarios.

Orchestration

The Microsoft Agent Framework treats orchestration as more than just chaining model calls together. It provides a way to design full workflows as graphs of executors, each responsible for a specific step and communicating through typed messages. Instead of ad-hoc function calls or brittle glue code, you describe the structure once in a WorkflowBuilder, and the framework handles routing, scheduling, and state management as the workflow runs.
A simple case is a linear pipeline. One executor generates a draft, another refines it, and a final one publishes the result. The hand-off is automatic: the output type from one step matches the input type of the next, so messages flow naturally through the graph. This makes pipelines easier to extend, for example by inserting a fact-checker or a style adapter in between steps.
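In sketch form, such a pipeline takes only a few executors and edges. The executor and message names below are illustrative rather than taken from the framework's samples:
from dataclasses import dataclass

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler


@dataclass
class Draft:
    text: str


class DraftExecutor(Executor):
    def __init__(self, id: str = "draft"):
        super().__init__(id=id)

    @handler
    async def write(self, topic: str, ctx: WorkflowContext) -> None:
        # The Draft output type matches the next executor's input type,
        # so the framework routes it automatically.
        await ctx.send_message(Draft(text=f"A first draft about {topic}"))


class RefineExecutor(Executor):
    def __init__(self, id: str = "refine"):
        super().__init__(id=id)

    @handler
    async def refine(self, draft: Draft, ctx: WorkflowContext) -> None:
        await ctx.send_message(Draft(text=draft.text + " (refined)"))


class PublishExecutor(Executor):
    def __init__(self, id: str = "publish"):
        super().__init__(id=id)

    @handler
    async def publish(self, draft: Draft, ctx: WorkflowContext) -> None:
        # The final step yields the workflow's output.
        await ctx.yield_output(draft.text)


draft, refine, publish = DraftExecutor(), RefineExecutor(), PublishExecutor()
pipeline = (
    WorkflowBuilder()
    .set_start_executor(draft)
    .add_edge(draft, refine)      # inserting a fact-checker here is one more edge
    .add_edge(refine, publish)
    .build()
)
# Run it with: async for ev in pipeline.run_stream("remote work"): ...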
Parallel branching (fan-out) is just as straightforward. A single input can be split into multiple paths simultaneously, allowing different agents to work independently. In the Microsoft Agent Framework finance demo, a company query is fanned out into two executors: one gathers technical and valuation data from market sources, while the other scans recent news. Each branch runs asynchronously, and the results come back in parallel.
Rejoining (fan-in) is handled by a join executor that waits until it has the required inputs before emitting a combined message. In the same demo, the JoinExecutor collects both technical and news outputs and merges them into a single TickerView, which is then handed to a reporter agent to produce a unified analysis.
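In sketch form, the wiring for that fan-out and fan-in shape is just a handful of edges. The helper below is hypothetical; its arguments stand in for executor instances like those in the finance demo later in this article:
from agent_framework import WorkflowBuilder


def build_fanout_workflow(fanout, tech, news, join, reporter):
    # Hypothetical helper: each argument is an Executor instance.
    return (
        WorkflowBuilder()
        .set_start_executor(fanout)   # splits the incoming query
        .add_edge(fanout, tech)       # branch 1: market and valuation data
        .add_edge(fanout, news)       # branch 2: recent news scan
        .add_edge(tech, join)         # fan-in: the join waits for both...
        .add_edge(news, join)         # ...before emitting a merged TickerView
        .add_edge(join, reporter)     # the combined view flows to the reporter
        .build()
    )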
Other structural patterns are supported naturally. You can create mixed chains where branches diverge, do heterogeneous work, and then rejoin before continuing. Cyclic loops work too, as in the framework's content review example, where a draft cycles between writer and reviewer until approval is given. Even multi-stage parallel waves, where one layer of branching feeds into a join and then branches again, can be expressed cleanly.
Because every message is a dataclass, the types themselves define the routing rules. This makes the system deterministic and reduces ambiguity when workflows grow larger. Developers don’t need to manually code who should handle what; the graph and message types make it explicit. The result is an orchestration model that supports linear, parallel, cyclic, and layered flows using the same underlying primitives, providing flexibility to build both simple pipelines and complex, long-running agent systems.

Human-in-the-Loop support

There are many cases where an agent on its own isn’t enough, because some steps require human oversight.
Think about an AI system generating financial research notes. Compliance officers may need to sign off on the content before it is sent to clients. In a marketing workflow, a human editor may want to check tone and accuracy before publishing. In customer service, escalation paths sometimes require a manager's approval for a refund or policy exception.
The framework addresses these situations directly. When an executor produces a HumanReviewRequest, the workflow pauses and records that it’s waiting for input. An external interface, such as a command line, a web app, or a chat integration, collects the reviewer’s response. That response is wrapped in a RequestResponse and passed back into the workflow, which then continues as though nothing was interrupted.
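In condensed form, the driver loop for this pause-and-resume cycle looks roughly like this (the full interactive version appears in the tutorial below):
from agent_framework import RequestInfoEvent, WorkflowOutputEvent


async def drive_with_human(workflow, start_message):
    # First pass: run until the workflow pauses with pending review requests.
    pending: dict[str, str] = {}
    async for event in workflow.run_stream(start_message):
        if isinstance(event, RequestInfoEvent):
            # The workflow is idle now, waiting on this request.
            pending[event.request_id] = input("approve, or give feedback: ")

    # Feeding the responses back resumes execution where it stopped.
    async for event in workflow.send_responses_streaming(pending):
        if isinstance(event, WorkflowOutputEvent):
            print(event.data)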
A practical example is found in the Microsoft Agent Framework content creation demo: a writer executor drafts a post, the coordinator executor forwards it to a human reviewer, and depending on the feedback, either the publisher executor releases it or the draft goes back to the writer for revision. Without first-class support for human review, developers would have to bolt together custom pause-and-resume logic; within this framework, it is part of the runtime itself.
This design makes it easy to embed judgment-heavy checkpoints inside an automated process, ensuring that agents don’t operate in isolation, but as collaborators with humans where necessary.

Checkpointing support

Agent workflows are often not short, atomic tasks. They may span minutes, and in rare cases, hours, depending on the amount of computation, external input, or coordination involved. Along the way, they might rely on outside services, human approval, or scheduled pauses. Without a way to preserve state, any interruption, whether a crash, a network error, or a delayed response, would force the system to restart from the very beginning. The Microsoft Agent Framework addresses this with checkpointing, a built-in mechanism for saving and restoring workflow state.
Checkpointing works by capturing the workflow at well-defined safe points. At those points, the framework records pending messages in the queue, shared data across executors, and any executor-specific state marked for persistence. These snapshots are stored in a checkpoint backend, such as a file-based storage layer, as shown in the examples; however, other storage systems could also be used.
When you restart a workflow, you don’t need to replay the entire history of messages and executor calls. Instead, you can enumerate the available checkpoints, choose one, and resume execution from that exact point. The framework reloads the saved state, restores the message queue, and continues as if the workflow had never been interrupted.
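In sketch form, resuming might look like the following, assuming the workflow was built with checkpointing enabled as in the demo later in this article:
from agent_framework import FileCheckpointStorage


async def resume_latest(workflow):
    # The storage path here is an assumption for this sketch.
    storage = FileCheckpointStorage(storage_path="checkpoints")

    # Enumerate saved checkpoints and pick the most recent one.
    checkpoints = await storage.list_checkpoints()
    latest = max(checkpoints, key=lambda cp: cp.timestamp)

    # Execution continues from the restored state, not from the beginning.
    async for event in workflow.run_stream_from_checkpoint(
        latest.checkpoint_id, checkpoint_storage=storage
    ):
        print(type(event).__name__)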
This design is useful in several scenarios. A workflow that requires human approval can pause and create a checkpoint right before sending the review request. Hours later, when the reviewer responds, the workflow resumes without losing context. In data-heavy flows, checkpointing prevents wasted computation when an API call fails or a timeout occurs, allowing you to restart from the last saved state instead of repeating earlier steps. For recurring jobs, such as nightly research tasks, checkpoints allow for incremental progress, so each run picks up where the last one left off.
The Microsoft Agent Framework checkpointing demo illustrates this. A search agent produces a summary and saves its state; execution can then pause there. Later, you can reload from that checkpoint and continue with rewriting, without rerunning the search. This demonstrates how checkpointing enables workflows to be resilient to interruptions, human delays, and multi-session lifecycles.
By combining explicit routing with persistent checkpoints, the framework ensures workflows are not only flexible but also durable. This makes them more practical for real deployments, where the ability to resume partial progress is just as important as the ability to orchestrate complex flows.

Tutorial: Using the Microsoft Agent Framework with observability

This tutorial will guide you through three example scripts that highlight the most important features of the Microsoft Agent Framework: orchestration, human-in-the-loop support, and checkpointing. Each script shows a different way the framework can help structure complex workflows. The goal is not only to understand what the scripts do but also to see how they map onto real-world scenarios where agents need to coordinate multiple steps, handle interruptions, and remain reliable over time.

An orchestration example

Many workflows involve multiple steps. Sometimes tasks must run in a specific order, such as drafting content, revising it, and then publishing. In other situations, you may need to handle multiple tasks at the same time, like collecting different kinds of financial data in parallel.
The Microsoft Agent Framework lets you organize these processes as a graph of executors, with each step connected by clearly typed messages. You define the workflow structure up front, and the framework takes care of moving information between steps and managing the order of execution. This supports both simple sequential pipelines and more complex branching flows.
In the first script, we will show how to set up a workflow that takes a single company query and splits it into two parallel tasks. One executor will gather technical and valuation data, while another will scan for recent news. After both branches complete their tasks, their results are combined and reviewed by a reporting agent. This example demonstrates how to coordinate multiple agents and tasks within a single, organized workflow.
"""
Finance Research Agent: parallel technical and conceptual analysis

Agents
- TechDataExecutor: pulls fundamentals, valuation, technicals (yfinance) via class methods
- ConceptNewsExecutor: scans the web via the chat client, structures headlines, sentiment, why it matters
- JoinExecutor: waits for both branches and merges
- ReporterExecutor: uses an LLM to write an unbiased analysis

No CLI args. Set your OpenAI API key in env for agent_framework.openai.OpenAIChatClient.
Requires: pip install yfinance openai
"""

import asyncio
import hashlib
import json
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from math import isnan
from typing import Any

import yfinance as yf
from openai import OpenAI
from pydantic import PrivateAttr

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    handler,
)
from agent_framework.openai import OpenAIChatClient


import weave; weave.init("ms_agents")
# ===================== data types =====================

@dataclass
class ResearchQuery:
    ticker: str
    company_name: str
    lookback_days: int = 7
    question: str = "What changed recently and what should I watch next"


@dataclass
class TechSnapshot:
    ticker: str
    fundamentals: dict[str, Any]
    valuation: dict[str, Any]
    price: dict[str, Any]
    events: dict[str, Any]


@dataclass
class CleanHeadlines:
    ticker: str
    items: list[dict[str, Any]]  # title, url, published_at, source, sentiment, summary, why_it_matters


@dataclass
class TickerView:
    ticker: str
    tech: TechSnapshot
    news: CleanHeadlines

# ===================== executors =====================

class TechDataExecutor(Executor):
    """All technicals helpers are class or static methods on this agent."""

    def __init__(self, id: str = "tech"):
        super().__init__(id=id)

    # ---------- numeric helpers ----------
    @staticmethod
    def _pct(a, b):
        try:
            if b == 0 or b is None or a is None:
                return None
            return 100.0 * (a - b) / b
        except Exception:
            return None

    @staticmethod
    def _safe(v):
        if v is None:
            return None
        try:
            if isinstance(v, float) and isnan(v):
                return None
        except Exception:
            pass
        return v

    # ---------- yfinance helpers ----------
    @classmethod
    def _next_earnings_date(cls, tkr: yf.Ticker):
        try:
            cal = tkr.calendar
            if cal is None or cal.empty:
                return None
            for k in ["Earnings Date", "EarningsDate"]:
                if k in cal.index:
                    val = cal.loc[k].values[0]
                    if hasattr(val, "to_pydatetime"):
                        return val.to_pydatetime().date().isoformat()
                    if isinstance(val, datetime):
                        return val.date().isoformat()
                    if isinstance(val, str):
                        return val[:10]
            return None
        except Exception:
            return None

    @classmethod
    def _trailing_price_series(cls, ticker: str, months: int = 15):
        end = datetime.now(timezone.utc)
        start = end - timedelta(days=30 * months)
        df = yf.download(
            ticker,
            start=start.date().isoformat(),
            end=end.date().isoformat(),
            progress=False,
            auto_adjust=True,
        )
        return df

    @classmethod
    def _price_block(cls, df):
        if df is None or df.empty:
            return {
                "px": None,
                "pct_1m": None,
                "pct_3m": None,
                "range_52w": None,
                "dist_from_52w_high": None,
                "vol_vs_20d": None,
            }
        close = df["Close"]
        last = cls._safe(float(close.iloc[-1]))

        def nth_from_end(n):
            try:
                return float(close.iloc[-n])
            except Exception:
                return None

        one_month = nth_from_end(21)
        three_month = nth_from_end(63)

        low_52w = cls._safe(float(close.rolling(252, min_periods=1).min().iloc[-1]))
        high_52w = cls._safe(float(close.rolling(252, min_periods=1).max().iloc[-1]))
        dist_from_high = cls._pct(last, high_52w)
        vol = df["Volume"].astype(float)
        vol_20d = cls._safe(float(vol.iloc[-20:].mean())) if len(vol) >= 20 else None
        vol_last = cls._safe(float(vol.iloc[-1]))
        vol_ratio = cls._safe(vol_last / vol_20d) if vol_20d and vol_20d != 0 else None

        return {
            "px": last,
            "pct_1m": cls._pct(last, one_month),
            "pct_3m": cls._pct(last, three_month),
            "range_52w": f"{low_52w} to {high_52w}" if low_52w is not None and high_52w is not None else None,
            "dist_from_52w_high": dist_from_high,
            "vol_vs_20d": vol_ratio,
        }

    @classmethod
    def _valuation_block(cls, info: dict, tkr: yf.Ticker):
        pe = cls._safe(info.get("trailingPE"))
        ps = cls._safe(info.get("priceToSalesTrailing12Months"))
        try:
            shares_out = cls._safe(info.get("sharesOutstanding"))
            px = cls._safe(info.get("currentPrice"))
            market_cap = px * shares_out if px and shares_out else cls._safe(info.get("marketCap"))
        except Exception:
            market_cap = cls._safe(info.get("marketCap"))
        total_debt = cls._safe(info.get("totalDebt"))
        cash = cls._safe(info.get("totalCash"))
        enterprise_value = None
        if market_cap is not None:
            enterprise_value = market_cap + (total_debt or 0) - (cash or 0)
        ebitda = cls._safe(info.get("ebitda"))
        ev_ebitda = None
        if enterprise_value is not None and ebitda and ebitda != 0:
            ev_ebitda = enterprise_value / ebitda
        return {
            "pe": pe,
            "ps": ps,
            "ev_ebitda": ev_ebitda,
            "enterprise_value": enterprise_value,
        }

    @classmethod
    def _fundamentals_block(cls, tkr: yf.Ticker):
        rev_yoy = None
        eps_yoy = None
        gross_margin = None
        fcf_margin = None
        try:
            fin = tkr.financials
            if fin is not None and not fin.empty:
                rev = fin.loc["Total Revenue"].dropna().astype(float)
                if len(rev) >= 2:
                    rev_yoy = cls._pct(rev.iloc[0], rev.iloc[1])
                if "Gross Profit" in fin.index and "Total Revenue" in fin.index:
                    gp = float(fin.loc["Gross Profit"].dropna().iloc[0])
                    tr = float(fin.loc["Total Revenue"].dropna().iloc[0])
                    if tr:
                        gross_margin = 100.0 * gp / tr
        except Exception:
            pass
        try:
            q = tkr.quarterly_financials
            if q is not None and not q.empty and "Diluted EPS" in q.index:
                eps_series = q.loc["Diluted EPS"].dropna().astype(float)
                if len(eps_series) >= 5:
                    eps_yoy = cls._pct(eps_series.iloc[0], eps_series.iloc[4])
        except Exception:
            pass
        try:
            cf = tkr.cashflow
            if cf is not None and not cf.empty and "Total Cash From Operating Activities" in cf.index:
                cfo = float(cf.loc["Total Cash From Operating Activities"].dropna().iloc[0])
                capex = 0.0
                if "Capital Expenditures" in cf.index:
                    capex = float(cf.loc["Capital Expenditures"].dropna().iloc[0])
                fcf = cfo + capex
                fin = tkr.financials
                if fin is not None and not fin.empty and "Total Revenue" in fin.index:
                    revenue = float(fin.loc["Total Revenue"].dropna().iloc[0])
                    if revenue:
                        fcf_margin = 100.0 * fcf / revenue
        except Exception:
            pass
        return {
            "revenue_yoy": rev_yoy,
            "eps_yoy": eps_yoy,
            "gross_margin": gross_margin,
            "fcf_margin": fcf_margin,
        }

    @classmethod
    def get_technicals(cls, ticker: str) -> dict:
        tkr = yf.Ticker(ticker)
        info = tkr.info or {}
        df = cls._trailing_price_series(ticker, months=15)
        price = cls._price_block(df)
        valuation = cls._valuation_block(info, tkr)
        fundamentals = cls._fundamentals_block(tkr)
        earnings = cls._next_earnings_date(tkr)
        return {
            "ticker": ticker,
            "fundamentals": fundamentals,
            "valuation": valuation,
            "price": price,
            "events": {"next_earnings_date": earnings},
        }

    @handler
    async def get_tech(self, rq: ResearchQuery, ctx: WorkflowContext) -> None:
        data = self.get_technicals(rq.ticker)
        snap = TechSnapshot(
            ticker=rq.ticker,
            fundamentals=data["fundamentals"],
            valuation=data["valuation"],
            price=data["price"],
            events=data["events"],
        )
        await ctx.send_message(snap)



class ConceptNewsExecutor(Executor):
    _client: OpenAI = PrivateAttr()

    def __init__(self, id: str = "concept"):
        super().__init__(id=id)
        self._client = OpenAI()

    @handler
    async def scan_and_summarize(self, rq: ResearchQuery, ctx: WorkflowContext) -> None:
        days = int(getattr(rq, "lookback_days", 10) or 10)
        company = getattr(rq, "company_name", "") or getattr(rq, "ticker", "")
        max_results = 10

        window = {(datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(days + 1)}

        stories = await self._fetch_news(company=company, days_back=days, max_results=max_results)
        if not isinstance(stories, list):
            stories = []

        stories = [s for s in stories if str(s.get("date_published", ""))[:10] in window]
        stories = self._dedupe_headlines(stories)

        cleaned = []
        for it in stories:
            title = it.get("title", "")
            snippet = it.get("snippet", "")
            dt = str(it.get("date_published", ""))[:10]

            sent, summary, implications, date_clean = await self._classify_and_summarize(
                title=title,
                snippet=snippet,
                date=dt or datetime.now().strftime("%Y-%m-%d"),
            )

            cleaned.append({
                "title": title,
                "url": it.get("url", ""),
                "published_at": date_clean,
                "source": self._source_from_url(it.get("url", "")),
                "sentiment": sent or "neutral",
                "summary": summary or "",
                "why_it_matters": implications or "",
            })

            await asyncio.sleep(0.3)

        await ctx.send_message(CleanHeadlines(ticker=rq.ticker, items=cleaned))

    async def _fetch_news(self, company: str, days_back: int, max_results: int) -> list[dict[str, Any]]:
        prompt = (
            f"Find up to {max_results} of the most important English news stories about '{company}' "
            f"from the last {days_back} days. Strictly output as a JSON array in a markdown code block like this: "
            "[{\"title\": \"...\", \"url\": \"...\", \"date_published\": \"YYYY-MM-DD\", \"snippet\": \"...\"}] "
            "If there are none, return []."
        )

        def _call():
            return self._client.responses.create(
                model="gpt-4.1",
                tools=[{"type": "web_search_preview"}],
                input=prompt,
            )

        resp = await asyncio.to_thread(_call)
        output = getattr(resp, "output_text", "").strip() if resp else ""
        stories = self._extract_json(output)
        return stories if isinstance(stories, list) else []

    async def _classify_and_summarize(self, title: str, snippet: str, date: str) -> tuple[str, str, str, str]:
        prompt = (
            "Given the following news story, respond ONLY with a JSON object: "
            "{"
            "\"sentiment\": \"positive\" or \"negative\", "
            "\"summary\": \"detailed, multi-sentence summary (about 10 sentences)\", "
            "\"implications\": \"Succinctly explain why this news matters for the company or industry (1-2 sentences)\", "
            "\"date\": \"YYYY-MM-DD\""
            "}. "
            "Do not output anything except valid JSON. "
            f"Title: {title}\nSnippet: {snippet}\nDate: {date}"
        )

        def _call():
            return self._client.chat.completions.create(
                model="gpt-4.1",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
            )

        resp = await asyncio.to_thread(_call)
        text = resp.choices[0].message.content.strip() if resp and resp.choices else ""
        try:
            obj = json.loads(text)
            return (
                str(obj.get("sentiment", "unknown")),
                str(obj.get("summary", "")),
                str(obj.get("implications", "")),
                str(obj.get("date", date)),
            )
        except Exception:
            sent = re.search(r'"sentiment"\s*:\s*"(\w+)"', text)
            summ = re.search(r'"summary"\s*:\s*"([^"]+)"', text)
            impact = re.search(r'"implications"\s*:\s*"([^"]+)"', text)
            date_f = re.search(r'"date"\s*:\s*"([^"]+)"', text)
            return (
                sent.group(1) if sent else "unknown",
                summ.group(1) if summ else "",
                impact.group(1) if impact else "",
                date_f.group(1) if date_f else date,
            )

    @staticmethod
    def _source_from_url(url: str) -> str:
        if not url:
            return ""
        return re.sub(r"^https?://(www\.)?", "", url).split("/")[0]

    @staticmethod
    def _extract_json(payload: str) -> Any:
        if isinstance(payload, (dict, list)):
            return payload

        m = re.search(r"```json\s*(.*?)\s*```", payload, re.DOTALL)
        if not m:
            m = re.search(r"```\s*(.*?)\s*```", payload, re.DOTALL)
        raw = m.group(1) if m else payload.strip()
        try:
            obj = json.loads(raw)
            if isinstance(obj, dict):
                return [obj]
            if isinstance(obj, list):
                return obj
            return []
        except Exception:
            return []

    @staticmethod
    def _norm_title(s: str) -> str:
        return re.sub(r"\s+", " ", (s or "").lower()).strip()

    def _dedupe_headlines(self, items: list[dict[str, Any]]) -> list[dict[str, Any]]:
        seen = set()
        out = []
        for it in items or []:
            title = self._norm_title(it.get("title", ""))
            url = it.get("url", "")
            domain = self._source_from_url(url)
            key = hashlib.md5(f"{title}|{domain}".encode()).hexdigest()
            if key in seen:
                continue
            seen.add(key)
            out.append(it)
        return out


class JoinExecutor(Executor):
    _tech: dict[str, TechSnapshot] = PrivateAttr(default_factory=dict)
    _news: dict[str, CleanHeadlines] = PrivateAttr(default_factory=dict)

    def __init__(self, id: str = "join"):
        super().__init__(id=id)

    @handler
    async def take_tech(self, snap: TechSnapshot, ctx: WorkflowContext) -> None:
        self._tech[snap.ticker] = snap
        await self._maybe_emit(snap.ticker, ctx)

    @handler
    async def take_news(self, news: CleanHeadlines, ctx: WorkflowContext) -> None:
        self._news[news.ticker] = news
        await self._maybe_emit(news.ticker, ctx)

    async def _maybe_emit(self, ticker: str, ctx: WorkflowContext) -> None:
        if ticker in self._tech and ticker in self._news:
            tech = self._tech.pop(ticker)
            news = self._news.pop(ticker)
            await ctx.send_message(TickerView(ticker=ticker, tech=tech, news=news))


class ReporterExecutor(Executor):
    _client: OpenAIChatClient = PrivateAttr()

    def __init__(self, client: OpenAIChatClient, id: str = "report"):
        super().__init__(id=id)
        self._client = client

    @handler
    async def report(self, tv: TickerView, ctx: WorkflowContext) -> None:
        payload = {
            "ticker": tv.ticker,
            "valuation": tv.tech.valuation,
            "price": tv.tech.price,
            "fundamentals": tv.tech.fundamentals,
            "events": tv.tech.events,
            "news": tv.news.items[:8],
        }

        prompt = f"""
You are a finance analyst. Using the structured data below, write an unbiased assessment.

Data:
{json.dumps(payload, indent=2)}

Return a plain text note with:
Observations on valuation, technicals, and fundamentals without making recommendations.
Balanced read of recent news, calling out both positive and negative items with reasoning tied to business impact.
A short synthesis, 2 to 3 sentences, that explains how the data fits together and what to watch next.
Avoid hype; be specific and grounded in the data provided.
"""
        resp = await self._client.get_response(prompt)
        analysis_text = resp.messages[0].text.strip()

        pe = tv.tech.valuation.get("pe", "n/a")
        ps = tv.tech.valuation.get("ps", "n/a")
        p1m = tv.tech.price.get("pct_1m", "n/a")
        p3m = tv.tech.price.get("pct_3m", "n/a")
        dist_hi = tv.tech.price.get("dist_from_52w_high", "n/a")
        header = (
            f"=== {tv.ticker} snapshot ===\n"
            f"PE {pe} PS {ps} 1m {p1m} 3m {p3m} distance from 52w high {dist_hi}\n"
            f"Next earnings: {tv.tech.events.get('next_earnings_date', 'unknown')}\n\n"
        )

        await ctx.yield_output(header + analysis_text)


class FanOutExecutor(Executor):
    _targets: list[str] = PrivateAttr()

    def __init__(self, targets: list[str], id: str = "fanout"):
        super().__init__(id=id)
        self._targets = targets

    @handler
    async def split(self, rq: ResearchQuery, ctx: WorkflowContext) -> None:
        for tid in self._targets:
            await ctx.send_message(rq, target_id=tid)


# ===================== workflow =====================

async def parallel_single_ticker():
    client = OpenAIChatClient()

    fanout = FanOutExecutor(targets=["tech", "concept"], id="fanout")
    tech = TechDataExecutor("tech")
    concept = ConceptNewsExecutor("concept")
    join = JoinExecutor("join")
    report = ReporterExecutor(client, "report")

    wf = (
        WorkflowBuilder()
        .set_start_executor(fanout)
        .add_edge(fanout, tech)
        .add_edge(fanout, concept)
        .add_edge(tech, join)
        .add_edge(concept, join)
        .add_edge(join, report)
        .build()
    )

    rq = ResearchQuery(ticker="AMZN", company_name="Amazon.com Inc", lookback_days=20)
    output = None
    async for ev in wf.run_stream(rq):
        if isinstance(ev, WorkflowOutputEvent):
            output = ev.data
    if output:
        print(output)


# ===================== main =====================

async def main():
    await parallel_single_ticker()


if __name__ == "__main__":
    asyncio.run(main())
This script demonstrates how orchestration becomes simpler and more reliable when you use explicit message types and workflow graphs. Dataclasses define what each executor can send and receive, which makes routing deterministic. The WorkflowBuilder wires together the executors, so you can create linear pipelines, parallel fan-outs, joins, and even loops with the same set of primitives. In the finance example, both the technical and news branches run asynchronously, and the join executor ensures nothing proceeds until both are ready. This pattern is powerful because it scales: you can add new executors or new branches without rewriting the existing flow. Instead of writing glue logic to juggle parallel tasks, you let the graph and message types define the behavior.
The result is a clean orchestration layer that supports anything from simple sequential chains to complex parallel and cyclic workflows. Since we imported and initialized W&B Weave, and the code uses the OpenAIChatClient, all our calls to the model are logged inside Weave. We can simply run our code, navigate to the Weave dashboard, and view the inputs and outputs to our model.
This can be extremely useful in cases where we need to debug agents that are performing sub-optimally.
I've found that, a large percentage of the time, agents can be significantly improved by adjusting the prompts and context they are given, which is why I strongly recommend using Weave when developing your agents.



A human feedback example

Automation can greatly improve efficiency, but certain workflows still require human oversight. Scenarios such as content publishing, compliance checks, and regulated decision-making often require a person to review and approve outputs generated by an automated system. Without a proper way to include human input, developers are forced to create workarounds that add complexity and make the workflow less reliable.
The Microsoft Agent Framework provides a structured approach to incorporating humans into automated workflows. You can design processes where automated steps pause at the appropriate time and wait for feedback before proceeding. This approach allows you to naturally incorporate critical human judgment into the workflow, without disrupting the flow or relying on custom pause-and-resume logic.
In the following script, we will build a workflow where an agent drafts content, a coordinator sends it to a human for review, and the process waits for approval or revision feedback before continuing. This pattern makes it easy for automated agents and human reviewers to collaborate within a single, unified workflow.
"""
Human-in-the-Loop Workflow Example

This example demonstrates:
- RequestInfoExecutor for human input
- Workflow pause/resume with human feedback
- Correlation between requests and responses
- Interactive approval workflows
"""

import asyncio
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

from pydantic import PrivateAttr

import weave; weave.init("ms_agents")

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    RequestInfoExecutor,
    RequestInfoEvent,
    RequestInfoMessage,
    RequestResponse,
    WorkflowStatusEvent,
    WorkflowRunState,
    handler,
)
from agent_framework.openai import OpenAIChatClient

# Toggle full content display during human review (set SHOW_FULL_REVIEW=0 to show a short preview)
SHOW_FULL_REVIEW = os.getenv("SHOW_FULL_REVIEW", "1").lower() not in ("0", "false", "no")


@dataclass
class ContentRequest:
    topic: str
    style: str
    target_audience: str


@dataclass
class ContentDraft:
    content: str
    topic: str
    created_by: str


@dataclass
class HumanReviewRequest(RequestInfoMessage):
    content: str = ""
    topic: str = ""
    action_needed: str = ""
    context: dict[str, Any] | None = None


@dataclass
class ApprovalDecision:
    approved: bool
    feedback: str
    action: str

class ContentWriterExecutor(Executor):
    """Executor that creates content drafts."""

    _client: OpenAIChatClient = PrivateAttr()

    def __init__(self, chat_client: OpenAIChatClient, id: str = "writer"):
        super().__init__(id=id)
        self._client = chat_client

    @property
    def client(self) -> OpenAIChatClient:
        return self._client

    @handler
    async def create_content(self, request: ContentRequest, ctx: WorkflowContext) -> None:
        print(f"✍️ [{self.id}] Creating content for topic: {request.topic}")

        prompt = f"""
Create {request.style} content about: {request.topic}
Target audience: {request.target_audience}
Keep it concise but informative.
"""

        response = await self.client.get_response(prompt)
        content = response.messages[0].text

        draft = ContentDraft(content=content, topic=request.topic, created_by=self.id)
        print(f"📄 [{self.id}] Draft created ({len(content)} characters)")
        await ctx.send_message(draft)

    @handler
    async def revise_content(self, feedback: RequestResponse[HumanReviewRequest, str], ctx: WorkflowContext) -> None:
        human_feedback = feedback.data
        original_request = feedback.original_request

        print(f"🔄 [{self.id}] Revising content based on feedback: {human_feedback}")

        prompt = f"""
Revise this content based on the feedback:

Original content:
{original_request.content}

Human feedback:
{human_feedback}
"""

        response = await self.client.get_response(prompt)
        revised_content = response.messages[0].text

        draft = ContentDraft(
            content=revised_content,
            topic=original_request.topic,
            created_by=f"{self.id}_revised",
        )

        print(f"✅ [{self.id}] Content revised")
        await ctx.send_message(draft)


class ReviewCoordinatorExecutor(Executor):
    """Executor that coordinates human review process."""

    _human_reviewer_id: str = PrivateAttr()
    _writer_id: str = PrivateAttr()
    _publisher_id: str = PrivateAttr()

    def __init__(self, human_reviewer_id: str, writer_id: str, publisher_id: str, id: str = "coordinator"):
        super().__init__(id=id)
        self._human_reviewer_id = human_reviewer_id
        self._writer_id = writer_id
        self._publisher_id = publisher_id

    @property
    def human_reviewer_id(self) -> str:
        return self._human_reviewer_id

    @property
    def writer_id(self) -> str:
        return self._writer_id

    @property
    def publisher_id(self) -> str:
        return self._publisher_id

    @handler
    async def request_review(self, draft: ContentDraft, ctx: WorkflowContext) -> None:
        print(f"👀 [{self.id}] Requesting human review for: {draft.topic}")

        review_request = HumanReviewRequest(
            content=draft.content,
            topic=draft.topic,
            action_needed="Please review this content. Reply with 'approve', or provide revision feedback.",
            context={
                "created_by": draft.created_by,
                "character_count": len(draft.content),
            },
        )

        await ctx.send_message(review_request, target_id=self.human_reviewer_id)

    @handler
    async def handle_review_response(self, response: RequestResponse[HumanReviewRequest, str], ctx: WorkflowContext) -> None:
        human_decision = response.data.strip().lower()
        original_request = response.original_request

        print(f"📝 [{self.id}] Human decision: {human_decision}")

        if human_decision == "approve" or "approved" in human_decision:
            approved_draft = ContentDraft(
                content=original_request.content,
                topic=original_request.topic,
                created_by="approved",
            )
            print(f"✅ [{self.id}] Content approved! Sending to publisher.")
            await ctx.send_message(approved_draft, target_id=self.publisher_id)
        else:
            print(f"🔄 [{self.id}] Revision requested. Sending back to writer.")
            await ctx.send_message(response, target_id=self.writer_id)


class ContentPublisherExecutor(Executor):
    """Executor that publishes approved content."""

    def __init__(self, id: str = "publisher"):
        super().__init__(id=id)

    @handler
    async def publish_content(self, draft: ContentDraft, ctx: WorkflowContext) -> None:
        print(f"🚀 [{self.id}] Publishing content: {draft.topic}")

        published_result = f"""
=== PUBLISHED CONTENT ===
Topic: {draft.topic}
Created by: {draft.created_by}
Length: {len(draft.content)} chars

{draft.content}

Published at: {datetime.now(timezone.utc).isoformat()}
========================
""".strip()

        print(f"📰 [{self.id}] Content published successfully!")
        await ctx.yield_output(published_result)


async def interactive_content_workflow():
    print("\n👥 Interactive Content Creation Workflow")
    print("-" * 50)

    client = OpenAIChatClient()
    writer = ContentWriterExecutor(client, "writer")
    human_reviewer = RequestInfoExecutor("human_reviewer")
    coordinator = ReviewCoordinatorExecutor("human_reviewer", "writer", "publisher", "coordinator")
    publisher = ContentPublisherExecutor("publisher")

    workflow = (
        WorkflowBuilder()
        .set_start_executor(writer)
        .add_edge(writer, coordinator)
        .add_edge(coordinator, human_reviewer)
        .add_edge(human_reviewer, coordinator)
        .add_edge(coordinator, writer)
        .add_edge(coordinator, publisher)
        .build()
    )

    content_request = ContentRequest(
        topic="The Benefits of Remote Work for Small Businesses",
        style="professional blog post",
        target_audience="small business owners",
    )

    print(f"📋 Content Request: {content_request.topic}")

    pending_requests = {}
    workflow_completed = False

    while not workflow_completed:
        if not pending_requests:
            events = [e async for e in workflow.run_stream(content_request)]
        else:
            events = [e async for e in workflow.send_responses_streaming(pending_requests)]

        pending_requests = {}

        for event in events:
            if isinstance(event, RequestInfoEvent):
                request_data = event.data
                print("\n👤 HUMAN REVIEW NEEDED")
                print(f"Topic: {request_data.topic}")
                print(f"Action: {request_data.action_needed}")
                if SHOW_FULL_REVIEW:
                    print("Full content:\n")
                    print(request_data.content)
                else:
                    print(f"Preview: {request_data.content[:120]}...")
                    print(f"Full content length: {len(request_data.content)} characters")

                human_input = input("\nApprove or feedback (quit to exit): ").strip()
                if human_input.lower() == "quit":
                    return
                pending_requests[event.request_id] = human_input

            elif isinstance(event, WorkflowOutputEvent):
                print("\n🎉 Workflow completed!")
                print(f"📄 Final result:\n{event.data}")
                workflow_completed = True
                break

            elif isinstance(event, WorkflowStatusEvent):
                if event.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS:
                    print("⏸️ Waiting for human input...")
                elif event.state == WorkflowRunState.IDLE:
                    # Terminal quiescent state; if no WorkflowOutputEvent was seen,
                    # consider the run done without output.
                    print("✅ Workflow reached idle state")
                    workflow_completed = True


async def main():
    print("👥 Human-in-the-Loop Workflow Examples")
    print("=" * 50)
    await interactive_content_workflow()


if __name__ == "__main__":
    asyncio.run(main())
The most important detail here is how human feedback is represented as structured messages. When the writer produces a draft, the coordinator wraps it in a HumanReviewRequest and sends it to a RequestInfoExecutor, which stands in for the human. At this point the workflow enters an idle with pending requests state, meaning it is paused safely until a response arrives. The human’s decision is returned as a RequestResponse, which routes back into the workflow. If the response is approval, the draft moves on to publishing. If revisions are needed, the feedback is sent back to the writer, creating a feedback loop. This approach makes long pauses completely safe, since the workflow can wait hours or days without losing context. By making human judgment part of the normal message passing system, the framework allows developers to design workflows that combine automation with oversight in a stable and repeatable way.






Next, the agent will follow my instructions and do a full rewrite:

Afterward, I can check the writing once again and approve it:


A checkpointing example

Even with careful orchestration and human review, workflows must also be durable. If a process takes a long time to complete, a crash or interruption should not require restarting from the beginning. This is especially important when dealing with external services, human approvals, or scheduled batch tasks, where interruptions and delays are common.
The Microsoft Agent Framework addresses this need by providing automatic checkpointing. At specific points during execution, the framework saves the current state of the workflow. This state includes pending messages, shared data, and any information that needs to be retained for future use. These checkpoints can be stored, reviewed, and used to resume the workflow at a chosen point, rather than having to repeat earlier steps.
In the next example, we will implement a script that saves the workflow’s progress after each major step. If the process is interrupted, you can review available checkpoints, select one, and resume execution from exactly where you left off. This ensures that long or complex workflows remain robust and do not lose progress due to unexpected interruptions.
"""
Two-Agent Checkpoint Demo

Agents
1) SearchAgent: produces a brief story summary with rough sources
2) ToneAgent: rewrites the story to sound negative

Behavior
- Saves a checkpoint right after SearchAgent finishes
- If a checkpoint exists, offers to resume from it and skip SearchAgent
"""

import asyncio
from dataclasses import dataclass, asdict
from pathlib import Path

from pydantic import PrivateAttr

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    FileCheckpointStorage,
    handler,
)
from agent_framework.openai import OpenAIChatClient

CHECKPOINT_DIR = Path(__file__).parent / "checkpoints"
CHECKPOINT_DIR.mkdir(exist_ok=True)

import weave; weave.init("ms_agents")

# ----------------------
# Messages
# ----------------------

@dataclass
class SearchTask:
    topic: str
    max_points: int = 5


@dataclass
class SearchResult:
    topic: str
    summary: str
    sources: list[str]


@dataclass
class RewrittenStory:
    topic: str
    text: str


# ----------------------
# Executors
# ----------------------

class SearchAgent(Executor):
    _client: OpenAIChatClient = PrivateAttr()

    def __init__(self, client: OpenAIChatClient, id: str = "search_agent"):
        super().__init__(id=id)
        self._client = client

    @handler
    async def run_search(self, task: SearchTask, ctx: WorkflowContext) -> None:
        print(f"🔎 [{self.id}] Searching topic: {task.topic}")

        prompt = f"""
You are a web researcher. Summarize the latest story about:
{task.topic}

Return:
1) a short narrative paragraph (3 to 6 sentences) that reads like a news brief
2) 3 to {task.max_points} plausible source links (just domains and paths, no markdown)
Keep it concise.
"""
        resp = await self._client.get_response(prompt)
        text = resp.messages[0].text

        # naive split: last lines as "sources", first part as summary
        lines = [ln.strip() for ln in text.strip().splitlines() if ln.strip()]
        # heuristics to find links
        links = [ln for ln in lines if "http" in ln or ("." in ln and "/" in ln)]
        if not links:
            links = ["example.com/story", "news.example.org/report", "media.example.net/article"]

        summary_lines = [ln for ln in lines if ln not in links]
        summary = " ".join(summary_lines).strip()
        if not summary:
            summary = f"A brief summary of {task.topic} could not be derived; using placeholder."

        result = SearchResult(topic=task.topic, summary=summary, sources=links[: task.max_points])

        # Save state for automatic checkpointing (framework will handle the checkpoint creation)
        await ctx.set_state({"step": "searched", "result": asdict(result)})
        await ctx.set_shared_state("search_result", asdict(result))
        print(f"💾 [{self.id}] State saved (automatic checkpoint will be created)")

        await ctx.send_message(result)


class ToneAgent(Executor):
    _client: OpenAIChatClient = PrivateAttr()

    def __init__(self, client: OpenAIChatClient, id: str = "tone_agent"):
        super().__init__(id=id)
        self._client = client

    @handler
    async def rewrite_positive(self, result: SearchResult, ctx: WorkflowContext) -> None:
        print(f"🖊️ [{self.id}] Rewriting with positive tone: {result.topic}")

        prompt = f"""
Rewrite the following story with a strong positive framing while staying factual,
avoiding exaggerations, and keeping it tight (120 to 180 words).
End with a one-sentence kicker that underscores the positive frame.

Story:
{result.summary}

Sources:
{chr(10).join(result.sources)}
"""
        resp = await self._client.get_response(prompt)
        rewritten_text = resp.messages[0].text.strip()

        final = RewrittenStory(topic=result.topic, text=rewritten_text)

        # Save final state for automatic checkpointing
        await ctx.set_state({"step": "rewritten", "rewritten_story": asdict(final)})
        await ctx.set_shared_state("final_story", asdict(final))
        # Yield the final output
        await ctx.yield_output(render_output(final))


# ----------------------
# Rendering
# ----------------------

def render_output(story: RewrittenStory) -> str:
    return f"""# Positive Rewrite
Topic: {story.topic}

{story.text}
"""


# ----------------------
# Workflow Creation & Demo
# ----------------------

def create_workflow_with_checkpointing():
    """Create workflow with automatic checkpointing enabled."""
    client = OpenAIChatClient()
    search_agent = SearchAgent(client)
    tone_agent = ToneAgent(client)
    # Enable automatic checkpointing with FileCheckpointStorage
    checkpoint_storage = FileCheckpointStorage(storage_path=str(CHECKPOINT_DIR))
    return (
        WorkflowBuilder()
        .set_start_executor(search_agent)
        .add_edge(search_agent, tone_agent)
        .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable automatic checkpointing
        .build()
    )


async def main():
    print("Two-Agent Checkpoint Demo (Automated)")
    print("=" * 50)

    # Create workflow with automatic checkpointing
    workflow = create_workflow_with_checkpointing()
    checkpoint_storage = FileCheckpointStorage(storage_path=str(CHECKPOINT_DIR))

    # Check for existing checkpoints
    all_checkpoints = await checkpoint_storage.list_checkpoints()
    available_checkpoints = [cp for cp in all_checkpoints if cp.workflow_id]
    if available_checkpoints:
        print(f"Found {len(available_checkpoints)} existing checkpoints:")
        sorted_cps = sorted(available_checkpoints, key=lambda c: c.timestamp)
        for idx, cp in enumerate(sorted_cps):
            shared_state = cp.shared_state
            search_done = "search_result" in shared_state
            tone_done = "final_story" in shared_state
            status = "completed" if tone_done else "after_search" if search_done else "started"
            print(f" [{idx}] iter={cp.iteration_count} status={status} time={cp.timestamp}")
        user_input = input("\nEnter checkpoint index to resume from (or press Enter to start fresh): ").strip()
        if user_input.isdigit():
            idx = int(user_input)
            if 0 <= idx < len(sorted_cps):
                chosen_checkpoint = sorted_cps[idx]
                print(f"\n🔄 Resuming from checkpoint {chosen_checkpoint.checkpoint_id}...")
                # Use the framework's automatic resume and capture outputs
                resume_outputs = []
                async for event in workflow.run_stream_from_checkpoint(
                    chosen_checkpoint.checkpoint_id,
                    checkpoint_storage=checkpoint_storage,
                ):
                    print(f"Resumed Event: {type(event).__name__}")
                    # Capture WorkflowOutputEvent data during resume
                    if isinstance(event, WorkflowOutputEvent):
                        resume_outputs.append(event.data)
                # Display resumed outputs
                if resume_outputs:
                    print(f"\n🎭 Resumed Output:\n{resume_outputs[-1]}")
                else:
                    print("\n⚠️ No outputs from resume")
                return

    # Fresh run
    print("\n▶️ Starting fresh workflow...")
    topic = input("Enter a topic to research (or press Enter for default): ").strip()
    if not topic:
        topic = "AI adoption in enterprise software"
    task = SearchTask(topic=topic, max_points=3)
    # Run with streaming to see events
    outputs = []
    async for event in workflow.run_stream(task):
        print(f"Event: {type(event).__name__}")
        # Only capture WorkflowOutputEvent data (the final outputs)
        if isinstance(event, WorkflowOutputEvent):
            outputs.append(event.data)
    # Display final outputs
    if outputs:
        print(f"\n🎭 Final Output:\n{outputs[-1]}")
    else:
        print("\n⚠️ No workflow outputs captured")
    # Show created checkpoints
    final_checkpoints = await checkpoint_storage.list_checkpoints()
    new_checkpoints = [cp for cp in final_checkpoints if cp not in all_checkpoints]
    print(f"\n💾 Created {len(new_checkpoints)} automatic checkpoints during execution")


if __name__ == "__main__":
    asyncio.run(main())
The mechanics here are straightforward but powerful. When the search agent finishes its work, the state is saved, and a checkpoint is created. If the process is stopped, you can later resume from that checkpoint and continue into the rewriting stage without having to rerun the search. This is useful when workflows are interrupted by human delays, when external services impose rate limits, or when nightly jobs need to resume each morning. The framework not only saves progress but also makes resumption explicit, letting you see what checkpoints are available and decide which one to use. By combining checkpointing with orchestration and human review, you get agents that can branch, pause, and recover from interruptions while still delivering consistent results.
After running the script, we can see the results inside Weave for the calls made to the model:



After running the script a second time, we see the checkpoints that were saved during the previous run. We can select the index of a checkpoint, and our agent pipeline will pick up at the selected step instead of starting from scratch.



After choosing a checkpoint, we can see that the run picks up using the information gathered during the previous run!


Conclusion

In this tutorial, we walked through three core capabilities of the Microsoft Agent Framework.
  • First, we explored orchestration, where workflows are expressed as graphs of executors that can run in sequence, branch into parallel tasks, and rejoin into combined outputs.
  • Next, we looked at human-in-the-loop design, where the system pauses safely to gather human judgment and then resumes without losing context.
  • Finally, we examined checkpointing, which allows workflows to save their progress and continue later rather than starting over after an interruption.
Together, these scripts demonstrate how the Microsoft Agent Framework transforms agent building into a structured and dependable process. Instead of chaining fragile calls, you gain tools for coordination, oversight, and durability. This means agents can scale beyond simple demos into processes that are flexible enough to branch, reliable enough to handle human delays, and resilient enough to survive crashes or long pauses.
The outcome is a foundation where developers can design agents not just as prototypes, but as systems that withstand real-world conditions.
Iterate on AI agents and models faster. Try Weights & Biases today.