The Microsoft Agent Framework: Observability
Build resilient, multi-step AI agents with the Microsoft Agent Framework - combining orchestration, human-in-the-loop review, and checkpointing for real apps.
Created on September 29|Last edited on October 2
Building agents that go beyond single-prompt response calls requires more than clever prompting. Real applications demand systems that can coordinate multiple steps, incorporate human oversight, and survive interruptions. The Microsoft Agent Framework was designed specifically for this purpose. It gives developers a consistent way to assemble agents into workflows that can branch, pause, resume, and adapt to real-world conditions. By layering orchestration, human-in-the-loop support, and checkpointing on top of a unified client abstraction for OpenAI and Azure OpenAI, the framework turns experimental prototypes into durable systems.
In this article, we will explore how the Microsoft Agent Framework enables three core capabilities:
- First, we’ll examine orchestration and see how executors connect to workflow graphs that can run in sequence, parallel, or in loops.
- Then we’ll examine human-in-the-loop design, where workflows pause to gather approvals or feedback before resuming seamlessly.
- Finally, we’ll cover checkpointing, which enables long-running workflows to save and restore their state without having to restart after interruptions.
Together, these features give you a foundation for building agents that are not only functional but also resilient and adaptable in real-world scenarios.
Table of contents
- Orchestration
- Human-in-the-Loop support
- Checkpointing support
- Tutorial: Using the Microsoft Agent Framework with observability
  - An orchestration example
  - A human feedback example
  - A checkpointing example
- Conclusion
Orchestration
The Microsoft Agent Framework treats orchestration as more than just chaining model calls together. It provides a way to design full workflows as graphs of executors, each responsible for a specific step and communicating through typed messages. Instead of ad-hoc function calls or brittle glue code, you describe the structure once in a WorkflowBuilder, and the framework handles routing, scheduling, and state management as the workflow runs.
A simple case is a linear pipeline. One executor generates a draft, another refines it, and a final one publishes the result. The hand-off is automatic: the output type from one step matches the input type of the next, so messages flow naturally through the graph. This makes pipelines easier to extend, for example by inserting a fact-checker or a style adapter in between steps.
Parallel branching (fan-out) is just as straightforward. A single input can be split into multiple paths simultaneously, allowing different agents to work independently. In the Microsoft Agent Framework finance demo, a company query is fanned out into two executors: one gathers technical and valuation data from market sources, while the other scans recent news. Each branch runs asynchronously, and the results come back in parallel.
Rejoining (fan-in) is handled by a join executor that waits until it has the required inputs before emitting a combined message. In the same demo, the JoinExecutor collects both technical and news outputs and merges them into a single TickerView, which is then handed to a reporter agent to produce a unified analysis.
Other structural patterns are supported naturally. You can create mixed chains where branches diverge, do heterogeneous work, and then rejoin before continuing. Cyclic loops work too, as in the framework's content review example, where a draft cycles between writer and reviewer until approval is given. Even multi-stage parallel waves, where one layer of branching feeds into a join and then branches again, can be expressed cleanly.
Because every message is a dataclass, the types themselves define the routing rules. This makes the system deterministic and reduces ambiguity when workflows grow larger. Developers don’t need to manually code who should handle what; the graph and message types make it explicit. The result is an orchestration model that supports linear, parallel, cyclic, and layered flows using the same underlying primitives, providing flexibility to build both simple pipelines and complex, long-running agent systems.
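The idea that message types define the routing can be illustrated in a few lines of plain Python. This is a simplified sketch, not how the framework is implemented: `TypeRouter`, `Draft`, `Refined`, and `Published` are hypothetical names, and a real runtime would also handle scheduling, concurrency, and state.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Draft:
    text: str


@dataclass
class Refined:
    text: str


@dataclass
class Published:
    text: str


class TypeRouter:
    """Dispatches each message to the handler registered for its exact type."""

    def __init__(self) -> None:
        self._handlers: dict[type, Callable[[Any], Any]] = {}

    def register(self, msg_type: type, fn: Callable[[Any], Any]) -> None:
        self._handlers[msg_type] = fn

    def run(self, msg: Any) -> Any:
        # Keep forwarding until a message arrives that no handler accepts.
        while type(msg) in self._handlers:
            msg = self._handlers[type(msg)](msg)
        return msg


def build_pipeline() -> TypeRouter:
    router = TypeRouter()
    router.register(Draft, lambda d: Refined(d.text.strip()))
    router.register(Refined, lambda r: Published(r.text.upper()))
    return router
```

With this wiring, `build_pipeline().run(Draft("  hello "))` passes through the refine step and then the publish step without any explicit "who handles what" code. Inserting a fact-checker would mean registering one more handler for a new intermediate type.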
Human-in-the-Loop support
There are many cases where an agent on its own isn’t enough, because some steps require human oversight.
Think about an AI system generating financial research notes. Compliance officers may need to sign off on the content before it is sent to clients. In a marketing workflow, a human editor may want to check tone and accuracy before publishing. In customer service, escalation paths sometimes require a manager's approval for a refund or policy exception.
The framework addresses these situations directly. When an executor produces a HumanReviewRequest, the workflow pauses and records that it’s waiting for input. An external interface, such as a command line, a web app, or a chat integration, collects the reviewer’s response. That response is wrapped in a RequestResponse and passed back into the workflow, which then continues as though nothing was interrupted.
A practical example is found in the Microsoft Agent Framework content creation demo: a writer executor drafts a post, the coordinator executor forwards it to a human reviewer, and depending on the feedback, either the publisher executor releases it or the draft goes back to the writer for revision. Without first-class support for human review, developers would have to bolt together custom pause and resume logic, but within this new framework it’s part of the runtime itself.
This design makes it easy to embed judgment-heavy checkpoints inside an automated process, ensuring that agents don’t operate in isolation, but as collaborators with humans where necessary.
Checkpointing support
Agent workflows are often not short, atomic tasks. They may span minutes, and in rare cases, hours, depending on the amount of computation, external input, or coordination involved. Along the way, they might rely on outside services, human approval, or scheduled pauses. Without a way to preserve state, any interruption, whether a crash, a network error, or a delayed response, would force the system to restart from the very beginning. The Microsoft Agent Framework addresses this with checkpointing, a built-in mechanism for saving and restoring workflow state.
Checkpointing works by capturing the workflow at well-defined safe points. At those points, the framework records pending messages in the queue, shared data across executors, and any executor-specific state marked for persistence. These snapshots are stored in a checkpoint backend, such as a file-based storage layer, as shown in the examples; however, other storage systems could also be used.
When you restart a workflow, you don’t need to replay the entire history of messages and executor calls. Instead, you can enumerate the available checkpoints, choose one, and resume execution from that exact point. The framework reloads the saved state, restores the message queue, and continues as if the workflow had never been interrupted.
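As a rough mental model of save, enumerate, and resume, here is a small file-based sketch using only the standard library. The `Snapshot` and `FileCheckpointStore` names, the JSON layout, and the `step-NNNN` ID scheme are all assumptions made for illustration; the framework's own checkpoint storage has its own classes and format.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass
class Snapshot:
    step: int
    pending_messages: list[dict]  # messages still waiting in the queue
    shared_state: dict            # data shared across executors


class FileCheckpointStore:
    """Writes one JSON file per checkpoint into a directory."""

    def __init__(self, directory: str) -> None:
        self.dir = Path(directory)
        self.dir.mkdir(parents=True, exist_ok=True)

    def save(self, snap: Snapshot) -> str:
        checkpoint_id = f"step-{snap.step:04d}"
        path = self.dir / f"{checkpoint_id}.json"
        path.write_text(json.dumps(asdict(snap)))
        return checkpoint_id

    def list_ids(self) -> list[str]:
        return sorted(p.stem for p in self.dir.glob("step-*.json"))

    def load(self, checkpoint_id: str) -> Snapshot:
        data = json.loads((self.dir / f"{checkpoint_id}.json").read_text())
        return Snapshot(**data)


def demo() -> Snapshot:
    with tempfile.TemporaryDirectory() as tmp:
        store = FileCheckpointStore(tmp)
        # Save after the (expensive) search step, then again after rewriting.
        store.save(Snapshot(1, [{"type": "summary", "text": "s"}], {"query": "q"}))
        store.save(Snapshot(2, [], {"query": "q", "done": True}))
        # Later: enumerate checkpoints and resume from the first one,
        # so the search does not need to run again.
        first = store.list_ids()[0]
        return store.load(first)
```

Resuming from `step-0001` restores the queued summary message and the shared state exactly as they were saved, which is what lets a later step continue without replaying earlier work.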
This design is useful in several scenarios. A workflow that requires human approval can pause and create a checkpoint right before sending the review request. Hours later, when the reviewer responds, the workflow resumes without losing context. In data-heavy flows, checkpointing prevents wasted computation when an API call fails or a timeout occurs, allowing you to restart from the last saved state instead of repeating earlier steps. For recurring jobs, such as nightly research tasks, checkpoints allow for incremental progress, so each run picks up where the last one left off.
The Microsoft Agent Framework checkpointing demo illustrates this. A search agent produces a summary and saves its state; execution can then pause there. Later, you can reload from that checkpoint and continue with rewriting, without rerunning the search. This demonstrates how checkpointing enables workflows to be resilient to interruptions, human delays, and multi-session lifecycles.
By combining explicit routing with persistent checkpoints, the framework ensures workflows are not only flexible but also durable. This makes them more practical for real deployments, where the ability to resume partial progress is just as important as the ability to orchestrate complex flows.
Tutorial: Using the Microsoft Agent Framework with observability
This tutorial will guide you through three example scripts that highlight the most important features of the Microsoft Agent Framework: orchestration, human-in-the-loop support, and checkpointing. Each script shows a different way the framework can help structure complex workflows. The goal is not only to understand what the scripts do but also to see how they map onto real-world scenarios where agents need to coordinate multiple steps, handle interruptions, and remain reliable over time.
An orchestration example
Many workflows involve multiple steps. Sometimes tasks must run in a specific order, such as drafting content, revising it, and then publishing. In other situations, you may need to handle multiple tasks at the same time, like collecting different kinds of financial data in parallel.
The Microsoft Agent Framework lets you organize these processes as a graph of executors, with each step connected by clearly typed messages. You define the workflow structure up front, and the framework takes care of moving information between steps and managing the order of execution. This supports both simple sequential pipelines and more complex branching flows.
In the first script, we will show how to set up a workflow that takes a single company query and splits it into two parallel tasks. One executor will gather technical and valuation data, while another will scan for recent news. After both branches complete their tasks, a join executor combines their results and hands them to a reporter agent, which writes the final analysis. This example demonstrates how to coordinate multiple agents and tasks within a single, organized workflow.

```python
"""
Finance Research Agent: parallel technical and conceptual analysis

Agents
  TechDataExecutor pulls fundamentals, valuation, technicals (yfinance) via class methods
  ConceptNewsExecutor scans the web via the chat client, structures headlines, sentiment, why it matters
  JoinExecutor waits for both branches and merges
  ReporterExecutor uses an LLM to write an unbiased analysis

No CLI args. Set your OpenAI API key in env for agent_framework.openai.OpenAIChatClient.
Requires: pip install yfinance
"""

import asyncio
import hashlib
import json
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from math import isnan
from typing import Any

import yfinance as yf
from openai import OpenAI
from pydantic import PrivateAttr

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    handler,
)
from agent_framework.openai import OpenAIChatClient

import weave

weave.init("ms_agents")


# ===================== data types =====================

@dataclass
class ResearchQuery:
    ticker: str
    company_name: str
    lookback_days: int = 7
    question: str = "What changed recently and what should I watch next"


@dataclass
class TechSnapshot:
    ticker: str
    fundamentals: dict[str, Any]
    valuation: dict[str, Any]
    price: dict[str, Any]
    events: dict[str, Any]


@dataclass
class CleanHeadlines:
    ticker: str
    items: list[dict[str, Any]]  # title, url, published_at, source, sentiment, summary, why_it_matters


@dataclass
class TickerView:
    ticker: str
    tech: TechSnapshot
    news: CleanHeadlines


# ===================== executors =====================

class TechDataExecutor(Executor):
    """All technicals helpers are class or static methods on this agent."""

    def __init__(self, id: str = "tech"):
        super().__init__(id=id)

    # ---------- numeric helpers ----------

    @staticmethod
    def _pct(a, b):
        try:
            if b == 0 or b is None or a is None:
                return None
            return 100.0 * (a - b) / b
        except Exception:
            return None

    @staticmethod
    def _safe(v):
        if v is None:
            return None
        try:
            if isinstance(v, float) and isnan(v):
                return None
        except Exception:
            pass
        return v

    # ---------- yfinance helpers ----------

    @classmethod
    def _next_earnings_date(cls, tkr: yf.Ticker):
        try:
            cal = tkr.calendar
            if cal is None or cal.empty:
                return None
            for k in ["Earnings Date", "EarningsDate"]:
                if k in cal.index:
                    val = cal.loc[k].values[0]
                    if hasattr(val, "to_pydatetime"):
                        return val.to_pydatetime().date().isoformat()
                    if isinstance(val, datetime):
                        return val.date().isoformat()
                    if isinstance(val, str):
                        return val[:10]
            return None
        except Exception:
            return None

    @classmethod
    def _trailing_price_series(cls, ticker: str, months: int = 15):
        end = datetime.now(timezone.utc)
        start = end - timedelta(days=30 * months)
        df = yf.download(
            ticker,
            start=start.date().isoformat(),
            end=end.date().isoformat(),
            progress=False,
            auto_adjust=True,
        )
        return df

    @classmethod
    def _price_block(cls, df):
        if df is None or df.empty:
            return {
                "px": None,
                "pct_1m": None,
                "pct_3m": None,
                "range_52w": None,
                "dist_from_52w_high": None,
                "vol_vs_20d": None,
            }

        close = df["Close"]
        last = cls._safe(float(close.iloc[-1]))

        def nth_from_end(n):
            try:
                return float(close.iloc[-n])
            except Exception:
                return None

        one_month = nth_from_end(21)
        three_month = nth_from_end(63)
        low_52w = cls._safe(float(close.rolling(252, min_periods=1).min().iloc[-1]))
        high_52w = cls._safe(float(close.rolling(252, min_periods=1).max().iloc[-1]))
        dist_from_high = cls._pct(last, high_52w)

        vol = df["Volume"].astype(float)
        vol_20d = cls._safe(float(vol.iloc[-20:].mean())) if len(vol) >= 20 else None
        vol_last = cls._safe(float(vol.iloc[-1]))
        vol_ratio = cls._safe(vol_last / vol_20d) if vol_20d and vol_20d != 0 else None

        return {
            "px": last,
            "pct_1m": cls._pct(last, one_month),
            "pct_3m": cls._pct(last, three_month),
            "range_52w": f"{low_52w} to {high_52w}" if low_52w is not None and high_52w is not None else None,
            "dist_from_52w_high": dist_from_high,
            "vol_vs_20d": vol_ratio,
        }

    @classmethod
    def _valuation_block(cls, info: dict, tkr: yf.Ticker):
        pe = cls._safe(info.get("trailingPE"))
        ps = cls._safe(info.get("priceToSalesTrailing12Months"))
        try:
            shares_out = cls._safe(info.get("sharesOutstanding"))
            px = cls._safe(info.get("currentPrice"))
            market_cap = px * shares_out if px and shares_out else cls._safe(info.get("marketCap"))
        except Exception:
            market_cap = cls._safe(info.get("marketCap"))

        total_debt = cls._safe(info.get("totalDebt"))
        cash = cls._safe(info.get("totalCash"))
        enterprise_value = None
        if market_cap is not None:
            enterprise_value = market_cap + (total_debt or 0) - (cash or 0)

        ebitda = cls._safe(info.get("ebitda"))
        ev_ebitda = None
        if enterprise_value is not None and ebitda and ebitda != 0:
            ev_ebitda = enterprise_value / ebitda

        return {
            "pe": pe,
            "ps": ps,
            "ev_ebitda": ev_ebitda,
            "enterprise_value": enterprise_value,
        }

    @classmethod
    def _fundamentals_block(cls, tkr: yf.Ticker):
        rev_yoy = None
        eps_yoy = None
        gross_margin = None
        fcf_margin = None

        try:
            fin = tkr.financials
            if fin is not None and not fin.empty:
                rev = fin.loc["Total Revenue"].dropna().astype(float)
                if len(rev) >= 2:
                    rev_yoy = cls._pct(rev.iloc[0], rev.iloc[1])
                if "Gross Profit" in fin.index and "Total Revenue" in fin.index:
                    gp = float(fin.loc["Gross Profit"].dropna().iloc[0])
                    tr = float(fin.loc["Total Revenue"].dropna().iloc[0])
                    if tr:
                        gross_margin = 100.0 * gp / tr
        except Exception:
            pass

        try:
            q = tkr.quarterly_financials
            if q is not None and not q.empty and "Diluted EPS" in q.index:
                eps_series = q.loc["Diluted EPS"].dropna().astype(float)
                if len(eps_series) >= 5:
                    eps_yoy = cls._pct(eps_series.iloc[0], eps_series.iloc[4])
        except Exception:
            pass

        try:
            cf = tkr.cashflow
            if cf is not None and not cf.empty and "Total Cash From Operating Activities" in cf.index:
                cfo = float(cf.loc["Total Cash From Operating Activities"].dropna().iloc[0])
                capex = 0.0
                if "Capital Expenditures" in cf.index:
                    capex = float(cf.loc["Capital Expenditures"].dropna().iloc[0])
                fcf = cfo + capex
                fin = tkr.financials
                if fin is not None and not fin.empty and "Total Revenue" in fin.index:
                    revenue = float(fin.loc["Total Revenue"].dropna().iloc[0])
                    if revenue:
                        fcf_margin = 100.0 * fcf / revenue
        except Exception:
            pass

        return {
            "revenue_yoy": rev_yoy,
            "eps_yoy": eps_yoy,
            "gross_margin": gross_margin,
            "fcf_margin": fcf_margin,
        }

    @classmethod
    def get_technicals(cls, ticker: str) -> dict:
        tkr = yf.Ticker(ticker)
        info = tkr.info or {}
        df = cls._trailing_price_series(ticker, months=15)
        price = cls._price_block(df)
        valuation = cls._valuation_block(info, tkr)
        fundamentals = cls._fundamentals_block(tkr)
        earnings = cls._next_earnings_date(tkr)
        return {
            "ticker": ticker,
            "fundamentals": fundamentals,
            "valuation": valuation,
            "price": price,
            "events": {"next_earnings_date": earnings},
        }

    @handler
    async def get_tech(self, rq: ResearchQuery, ctx: WorkflowContext) -> None:
        data = self.get_technicals(rq.ticker)
        snap = TechSnapshot(
            ticker=rq.ticker,
            fundamentals=data["fundamentals"],
            valuation=data["valuation"],
            price=data["price"],
            events=data["events"],
        )
        await ctx.send_message(snap)


class ConceptNewsExecutor(Executor):
    _client: OpenAI

    def __init__(self, id: str = "concept"):
        super().__init__(id=id)
        self._client = OpenAI()

    @handler
    async def scan_and_summarize(self, rq: ResearchQuery, ctx: WorkflowContext) -> None:
        days = int(getattr(rq, "lookback_days", 10) or 10)
        company = getattr(rq, "company_name", "") or getattr(rq, "ticker", "")
        max_results = 10

        # Set of acceptable publication dates (today back through `days` days ago)
        window = {(datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(days + 1)}

        stories = await self._fetch_news(company=company, days_back=days, max_results=max_results)
        if not isinstance(stories, list):
            stories = []
        stories = [s for s in stories if str(s.get("date_published", ""))[:10] in window]
        stories = self._dedupe_headlines(stories)

        cleaned = []
        for it in stories:
            title = it.get("title", "")
            snippet = it.get("snippet", "")
            dt = str(it.get("date_published", ""))[:10]
            sent, summary, implications, date_clean = await self._classify_and_summarize(
                title=title,
                snippet=snippet,
                date=dt or datetime.now().strftime("%Y-%m-%d"),
            )
            cleaned.append({
                "title": title,
                "url": it.get("url", ""),
                "published_at": date_clean,
                "source": self._source_from_url(it.get("url", "")),
                "sentiment": sent or "neutral",
                "summary": summary or "",
                "why_it_matters": implications or "",
            })
            await asyncio.sleep(0.3)

        await ctx.send_message(CleanHeadlines(ticker=rq.ticker, items=cleaned))

    async def _fetch_news(self, company: str, days_back: int, max_results: int) -> list[dict[str, Any]]:
        prompt = (
            f"Find up to {max_results} of the most important English news stories about '{company}' "
            f"from the last {days_back} days. Strictly output as a JSON array in a markdown code block like this: "
            "[{\"title\": \"...\", \"url\": \"...\", \"date_published\": \"YYYY-MM-DD\", \"snippet\": \"...\"}] "
            "If there are none, return []."
        )

        def _call():
            return self._client.responses.create(
                model="gpt-4.1",
                tools=[{"type": "web_search_preview"}],
                input=prompt,
            )

        # The OpenAI client call is blocking, so run it in a worker thread
        resp = await asyncio.to_thread(_call)
        output = getattr(resp, "output_text", "").strip() if resp else ""
        stories = self._extract_json(output)
        return stories if isinstance(stories, list) else []

    async def _classify_and_summarize(self, title: str, snippet: str, date: str) -> tuple[str, str, str, str]:
        prompt = (
            "Given the following news story, respond ONLY with a JSON object: "
            "{"
            "\"sentiment\": \"positive\" or \"negative\", "
            "\"summary\": \"detailed, multi-sentence summary (about 10 sentences)\", "
            "\"implications\": \"Succinctly explain why this news matters for the company or industry (1-2 sentences)\", "
            "\"date\": \"YYYY-MM-DD\""
            "}. "
            "Do not output anything except valid JSON. "
            f"Title: {title}\nSnippet: {snippet}\nDate: {date}"
        )

        def _call():
            return self._client.chat.completions.create(
                model="gpt-4.1",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
            )

        resp = await asyncio.to_thread(_call)
        text = resp.choices[0].message.content.strip() if resp and resp.choices else ""
        try:
            obj = json.loads(text)
            return (
                str(obj.get("sentiment", "unknown")),
                str(obj.get("summary", "")),
                str(obj.get("implications", "")),
                str(obj.get("date", date)),
            )
        except Exception:
            # Fall back to regex extraction when the model returns malformed JSON
            sent = re.search(r'"sentiment"\s*:\s*"(\w+)"', text)
            summ = re.search(r'"summary"\s*:\s*"([^"]+)"', text)
            impact = re.search(r'"implications"\s*:\s*"([^"]+)"', text)
            date_f = re.search(r'"date"\s*:\s*"([^"]+)"', text)
            return (
                sent.group(1) if sent else "unknown",
                summ.group(1) if summ else "",
                impact.group(1) if impact else "",
                date_f.group(1) if date_f else date,
            )

    @staticmethod
    def _source_from_url(url: str) -> str:
        if not url:
            return ""
        return re.sub(r"^https?://(www\.)?", "", url).split("/")[0]

    @staticmethod
    def _extract_json(payload: str) -> Any:
        if isinstance(payload, (dict, list)):
            return payload
        m = re.search(r"```json\s*(.*?)\s*```", payload, re.DOTALL)
        if not m:
            m = re.search(r"```\s*(.*?)\s*```", payload, re.DOTALL)
        raw = m.group(1) if m else payload.strip()
        try:
            obj = json.loads(raw)
            if isinstance(obj, dict):
                return [obj]
            if isinstance(obj, list):
                return obj
            return []
        except Exception:
            return []

    @staticmethod
    def _norm_title(s: str) -> str:
        return re.sub(r"\s+", " ", (s or "").lower()).strip()

    def _dedupe_headlines(self, items: list[dict[str, Any]]) -> list[dict[str, Any]]:
        seen = set()
        out = []
        for it in items or []:
            title = self._norm_title(it.get("title", ""))
            url = it.get("url", "")
            domain = self._source_from_url(url)
            key = hashlib.md5(f"{title}|{domain}".encode()).hexdigest()
            if key in seen:
                continue
            seen.add(key)
            out.append(it)
        return out


class JoinExecutor(Executor):
    _tech: dict[str, TechSnapshot] = PrivateAttr(default_factory=dict)
    _news: dict[str, CleanHeadlines] = PrivateAttr(default_factory=dict)

    def __init__(self, id: str = "join"):
        super().__init__(id=id)

    @handler
    async def take_tech(self, snap: TechSnapshot, ctx: WorkflowContext) -> None:
        self._tech[snap.ticker] = snap
        await self._maybe_emit(snap.ticker, ctx)

    @handler
    async def take_news(self, news: CleanHeadlines, ctx: WorkflowContext) -> None:
        self._news[news.ticker] = news
        await self._maybe_emit(news.ticker, ctx)

    async def _maybe_emit(self, ticker: str, ctx: WorkflowContext) -> None:
        # Emit only once both branches have delivered for this ticker
        if ticker in self._tech and ticker in self._news:
            tech = self._tech.pop(ticker)
            news = self._news.pop(ticker)
            await ctx.send_message(TickerView(ticker=ticker, tech=tech, news=news))


class ReporterExecutor(Executor):
    _client: OpenAIChatClient = PrivateAttr()

    def __init__(self, client: OpenAIChatClient, id: str = "report"):
        super().__init__(id=id)
        self._client = client

    @handler
    async def report(self, tv: TickerView, ctx: WorkflowContext) -> None:
        payload = {
            "ticker": tv.ticker,
            "valuation": tv.tech.valuation,
            "price": tv.tech.price,
            "fundamentals": tv.tech.fundamentals,
            "events": tv.tech.events,
            "news": tv.news.items[:8],
        }
        prompt = f"""
You are a finance analyst. Using the structured data below, write an unbiased assessment.

Data:
{json.dumps(payload, indent=2)}

Return a plain text note with:
Observations on valuation, technicals, and fundamentals without making recommendations.
Balanced read of recent news, calling out both positive and negative items with reasoning tied to business impact.
A short synthesis, 2 to 3 sentences, that explains how the data fits together and what to watch next.

Avoid hype; be specific and grounded in the data provided.
"""
        resp = await self._client.get_response(prompt)
        analysis_text = resp.messages[0].text.strip()

        pe = tv.tech.valuation.get("pe", "n a")
        ps = tv.tech.valuation.get("ps", "n a")
        p1m = tv.tech.price.get("pct_1m", "n a")
        p3m = tv.tech.price.get("pct_3m", "n a")
        dist_hi = tv.tech.price.get("dist_from_52w_high", "n a")

        header = (
            f"=== {tv.ticker} snapshot ===\n"
            f"PE {pe} PS {ps} 1m {p1m} 3m {p3m} distance from 52w high {dist_hi}\n"
            f"Next earnings: {tv.tech.events.get('next_earnings_date', 'unknown')}\n\n"
        )
        await ctx.yield_output(header + analysis_text)


class FanOutExecutor(Executor):
    _targets: list[str] = PrivateAttr()

    def __init__(self, targets: list[str], id: str = "fanout"):
        super().__init__(id=id)
        self._targets = targets

    @handler
    async def split(self, rq: ResearchQuery, ctx: WorkflowContext) -> None:
        for tid in self._targets:
            await ctx.send_message(rq, target_id=tid)


# ===================== workflow =====================

async def parallel_single_ticker():
    client = OpenAIChatClient()

    fanout = FanOutExecutor(targets=["tech", "concept"], id="fanout")
    tech = TechDataExecutor("tech")
    concept = ConceptNewsExecutor("concept")
    join = JoinExecutor("join")
    report = ReporterExecutor(client, "report")

    wf = (
        WorkflowBuilder()
        .set_start_executor(fanout)
        .add_edge(fanout, tech)
        .add_edge(fanout, concept)
        .add_edge(tech, join)
        .add_edge(concept, join)
        .add_edge(join, report)
        .build()
    )

    rq = ResearchQuery(ticker="AMZN", company_name="Amazon.com Inc", lookback_days=20)

    output = None
    async for ev in wf.run_stream(rq):
        if isinstance(ev, WorkflowOutputEvent):
            output = ev.data
    if output:
        print(output)


# ===================== main =====================

async def main():
    await parallel_single_ticker()


if __name__ == "__main__":
    asyncio.run(main())
```
This script demonstrates how orchestration becomes simpler and more reliable when you use explicit message types and workflow graphs. Dataclasses define what each executor can send and receive, which makes routing deterministic. The WorkflowBuilder wires together the executors, so you can create linear pipelines, parallel fan-outs, joins, and even loops with the same set of primitives. In the finance example, both the technical and news branches run asynchronously, and the join executor ensures the reporter is not invoked until both results are ready. This pattern scales well: you can add new executors or new branches without rewriting the existing flow. Instead of writing glue logic to juggle parallel tasks, you let the graph and message types define the behavior.
The result is a clean orchestration layer that supports anything from simple sequential chains to complex parallel and cyclic workflows. Since we imported and initialized W&B Weave, and the code uses the OpenAIChatClient, all our calls to the model are logged inside Weave. We can simply run our code, navigate to the Weave dashboard, and view the inputs and outputs to our model.
This can be extremely useful in cases where we need to debug agents that are performing sub-optimally.
I've found that a large percentage of the time, agents can be significantly improved by adjusting the prompts and context they are given, which is why I strongly recommend using Weave when developing your agents.
A human feedback example
Automation can greatly improve efficiency, but certain workflows still require human oversight. Scenarios such as content publishing, compliance checks, and regulated decision-making often require a person to review and approve outputs generated by an automated system. Without a proper way to include human input, developers are forced to create workarounds that add complexity and make the workflow less reliable.
The Microsoft Agent Framework provides a structured approach to incorporating humans into automated workflows. You can design processes where automated steps pause at the appropriate time and wait for feedback before proceeding. This approach allows you to naturally incorporate critical human judgment into the workflow, without disrupting the flow or relying on custom pause-and-resume logic.
In the following script, we will build a workflow where an agent drafts content, a coordinator sends it to a human for review, and the process waits for approval or revision feedback before continuing. This pattern makes it easy for automated agents and human reviewers to collaborate within a single, unified workflow.

```python
"""
Human-in-the-Loop Workflow Example

This example demonstrates:
- RequestInfoExecutor for human input
- Workflow pause/resume with human feedback
- Correlation between requests and responses
- Interactive approval workflows
"""

import asyncio
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

from pydantic import PrivateAttr

import weave

weave.init("ms_agents")

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    RequestInfoExecutor,
    RequestInfoEvent,
    RequestInfoMessage,
    RequestResponse,
    WorkflowStatusEvent,
    WorkflowRunState,
    handler,
)
from agent_framework.openai import OpenAIChatClient

# Toggle full content display during human review (set SHOW_FULL_REVIEW=0 to show a short preview)
SHOW_FULL_REVIEW = os.getenv("SHOW_FULL_REVIEW", "1").lower() not in ("0", "false", "no")


@dataclass
class ContentRequest:
    topic: str
    style: str
    target_audience: str


@dataclass
class ContentDraft:
    content: str
    topic: str
    created_by: str


@dataclass
class HumanReviewRequest(RequestInfoMessage):
    content: str = ""
    topic: str = ""
    action_needed: str = ""
    context: dict[str, Any] | None = None


@dataclass
class ApprovalDecision:
    approved: bool
    feedback: str
    action: str


class ContentWriterExecutor(Executor):
    """Executor that creates content drafts."""

    _client: OpenAIChatClient = PrivateAttr()

    def __init__(self, chat_client: OpenAIChatClient, id: str = "writer"):
        super().__init__(id=id)
        self._client = chat_client

    @property
    def client(self) -> OpenAIChatClient:
        return self._client

    @handler
    async def create_content(self, request: ContentRequest, ctx: WorkflowContext) -> None:
        print(f"✍️ [{self.id}] Creating content for topic: {request.topic}")
        prompt = f"""
Create {request.style} content about: {request.topic}
Target audience: {request.target_audience}
Keep it concise but informative.
"""
        response = await self.client.get_response(prompt)
        content = response.messages[0].text
        draft = ContentDraft(content=content, topic=request.topic, created_by=self.id)
        print(f"📄 [{self.id}] Draft created ({len(content)} characters)")
        await ctx.send_message(draft)

    @handler
    async def revise_content(self, feedback: RequestResponse[HumanReviewRequest, str], ctx: WorkflowContext) -> None:
        human_feedback = feedback.data
        original_request = feedback.original_request
        print(f"🔄 [{self.id}] Revising content based on feedback: {human_feedback}")
        prompt = f"""
Revise this content based on the feedback:

Original content:
{original_request.content}

Human feedback:
{human_feedback}
"""
        response = await self.client.get_response(prompt)
        revised_content = response.messages[0].text
        draft = ContentDraft(
            content=revised_content,
            topic=original_request.topic,
            created_by=f"{self.id}_revised",
        )
        print(f"✅ [{self.id}] Content revised")
        await ctx.send_message(draft)


class ReviewCoordinatorExecutor(Executor):
    """Executor that coordinates human review process."""

    _human_reviewer_id: str = PrivateAttr()
    _writer_id: str = PrivateAttr()
    _publisher_id: str = PrivateAttr()

    def __init__(self, human_reviewer_id: str, writer_id: str, publisher_id: str, id: str = "coordinator"):
        super().__init__(id=id)
        self._human_reviewer_id = human_reviewer_id
        self._writer_id = writer_id
        self._publisher_id = publisher_id

    @property
    def human_reviewer_id(self) -> str:
        return self._human_reviewer_id

    @property
    def writer_id(self) -> str:
        return self._writer_id

    @property
    def publisher_id(self) -> str:
        return self._publisher_id

    @handler
    async def request_review(self, draft: ContentDraft, ctx: WorkflowContext) -> None:
        print(f"👀 [{self.id}] Requesting human review for: {draft.topic}")
        review_request = HumanReviewRequest(
            content=draft.content,
            topic=draft.topic,
            action_needed="Please review this content. Reply with 'approve', or provide revision feedback.",
            context={
                "created_by": draft.created_by,
                "character_count": len(draft.content),
            },
        )
        await ctx.send_message(review_request, target_id=self.human_reviewer_id)

    @handler
    async def handle_review_response(self, response: RequestResponse[HumanReviewRequest, str], ctx: WorkflowContext) -> None:
        human_decision = response.data.strip().lower()
        original_request = response.original_request
        print(f"📝 [{self.id}] Human decision: {human_decision}")
        if human_decision == "approve" or "approved" in human_decision:
            approved_draft = ContentDraft(
                content=original_request.content,
                topic=original_request.topic,
                created_by="approved",
            )
            print(f"✅ [{self.id}] Content approved! Sending to publisher.")
            await ctx.send_message(approved_draft, target_id=self.publisher_id)
        else:
            print(f"🔄 [{self.id}] Revision requested. Sending back to writer.")
            await ctx.send_message(response, target_id=self.writer_id)


class ContentPublisherExecutor(Executor):
    """Executor that publishes approved content."""

    def __init__(self, id: str = "publisher"):
        super().__init__(id=id)

    @handler
    async def publish_content(self, draft: ContentDraft, ctx: WorkflowContext) -> None:
        print(f"🚀 [{self.id}] Publishing content: {draft.topic}")
        # Use a wall-clock UTC timestamp; the event loop's time() is a
        # monotonic counter and is not meaningful as a publication time.
        published_at = datetime.now(timezone.utc).isoformat()
        published_result = f"""
=== PUBLISHED CONTENT ===
Topic: {draft.topic}
Created by: {draft.created_by}
Length: {len(draft.content)} chars

{draft.content}

Published at: {published_at}
========================
""".strip()
        print(f"📰 [{self.id}] Content published successfully!")
        await ctx.yield_output(published_result)


async def interactive_content_workflow():
    print("\n👥 Interactive Content Creation Workflow")
    print("-" * 50)

    client = OpenAIChatClient()
    writer = ContentWriterExecutor(client, "writer")
    human_reviewer = RequestInfoExecutor("human_reviewer")
    coordinator = ReviewCoordinatorExecutor("human_reviewer", "writer", "publisher", "coordinator")
    publisher = ContentPublisherExecutor("publisher")

    workflow = (
        WorkflowBuilder()
        .set_start_executor(writer)
        .add_edge(writer, coordinator)
        .add_edge(coordinator, human_reviewer)
        .add_edge(human_reviewer, coordinator)
        .add_edge(coordinator, writer)
        .add_edge(coordinator, publisher)
        .build()
    )

    content_request = ContentRequest(
        topic="The Benefits of Remote Work for Small Businesses",
        style="professional blog post",
        target_audience="small business owners",
    )
    print(f"📋 Content Request: {content_request.topic}")

    pending_requests = {}
    workflow_completed = False

    while not workflow_completed:
        if not pending_requests:
            # First pass: start the workflow with the content request
            events = [e async for e in workflow.run_stream(content_request)]
        else:
            # Later passes: feed the collected human responses back in
            events = [e async for e in workflow.send_responses_streaming(pending_requests)]
            pending_requests = {}

        for event in events:
            if isinstance(event, RequestInfoEvent):
                request_data = event.data
                print("\n👤 HUMAN REVIEW NEEDED")
                print(f"Topic: {request_data.topic}")
                print(f"Action: {request_data.action_needed}")
                if SHOW_FULL_REVIEW:
                    print("Full content:\n")
                    print(request_data.content)
                else:
                    print(f"Preview: {request_data.content[:120]}...")
                    print(f"Full content length: {len(request_data.content)} characters")

                human_input = input("\nApprove or feedback (quit to exit): ").strip()
                if human_input.lower() == "quit":
                    return
                pending_requests[event.request_id] = human_input

            elif isinstance(event, WorkflowOutputEvent):
                print("\n🎉 Workflow completed!")
                print(f"📄 Final result:\n{event.data}")
                workflow_completed = True
                break

            elif isinstance(event, WorkflowStatusEvent):
                if event.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS:
                    print("⏸️ Waiting for human input...")
                elif event.state == WorkflowRunState.IDLE:
                    # Terminal quiescent state; if no WorkflowOutputEvent was seen,
                    # consider the run done without output.
                    print("✅ Workflow reached idle state")
                    workflow_completed = True


async def main():
    print("👥 Human-in-the-Loop Workflow Examples")
    print("=" * 50)
    await interactive_content_workflow()


if __name__ == "__main__":
    asyncio.run(main())
```
The most important detail here is that human feedback is represented as structured messages. When the writer produces a draft, the coordinator wraps it in a HumanReviewRequest and sends it to a RequestInfoExecutor, which stands in for the human. At this point the workflow enters the IDLE_WITH_PENDING_REQUESTS state, meaning it is paused until a response arrives. The human's decision comes back as a RequestResponse, which is routed to the coordinator. If the response is an approval, the draft moves on to publishing. Anything else is treated as revision feedback and sent back to the writer, creating a feedback loop. Because the pause is an explicit workflow state rather than a blocked call, the workflow can wait for a reviewer without losing context. In this script that state lives in memory, so surviving a process restart additionally requires checkpointing, which the next section covers. By making human judgment part of the normal message-passing system, the framework lets developers design workflows that combine automation with oversight in a stable and repeatable way.
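The routing decision itself is small enough to isolate and test. The sketch below is framework-independent: `route_review` and `APPROVAL_WORDS` are hypothetical names, not part of the Microsoft Agent Framework, and the returned strings simply mirror the executor ids used in the script. It also takes a slightly stricter approach than the coordinator's `"approved" in human_decision` check, which would treat a reply such as "not approved" as an approval because it matches on a substring.

```python
import re

# Hypothetical helper, not part of the Microsoft Agent Framework.
APPROVAL_WORDS = {"approve", "approved"}


def route_review(decision: str) -> str:
    """Return the id of the executor that should receive the next message."""
    words = re.findall(r"[a-z']+", decision.strip().lower())
    # Count the reply as an approval only when it starts with an approval word,
    # so feedback such as "not approved, tighten the intro" goes to the writer.
    if words and words[0] in APPROVAL_WORDS:
        return "publisher"
    return "writer"


print(route_review("Approve"))
print(route_review("approved, thanks"))
print(route_review("not approved, tighten the intro"))
```

Whether first-word matching is the right rule depends on how reviewers phrase their replies; a production workflow might prefer an explicit approve/revise choice over free text.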


Next, the agent will follow my instructions and do a full rewrite:

Afterward, I can check the writing once more and approve it:

A checkpointing example
Even with careful orchestration and human review, workflows must also be durable. If a process takes a long time to complete, a crash or interruption should not require restarting from the beginning. This is especially important when dealing with external services, human approvals, or scheduled batch tasks, where interruptions and delays are common.
The Microsoft Agent Framework addresses this need by providing automatic checkpointing. At specific points during execution, the framework saves the current state of the workflow. This state includes pending messages, shared data, and any information that needs to be retained for future use. These checkpoints can be stored, reviewed, and used to resume the workflow at a chosen point, rather than having to repeat earlier steps.
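To make the idea concrete, here is a minimal, framework-independent sketch of what saving and restoring such a snapshot involves. The helper names (`save_checkpoint`, `load_checkpoint`) and the JSON layout are assumptions chosen for illustration; they are not the framework's API or its on-disk format.

```python
import json
import tempfile
import time
import uuid
from pathlib import Path


def save_checkpoint(directory: Path, shared_state: dict, pending_messages: list) -> str:
    """Write one snapshot to disk and return its id (hypothetical helper)."""
    checkpoint_id = uuid.uuid4().hex
    snapshot = {
        "checkpoint_id": checkpoint_id,
        "timestamp": time.time(),
        "shared_state": shared_state,          # data visible to every step
        "pending_messages": pending_messages,  # messages not yet delivered
    }
    (directory / f"{checkpoint_id}.json").write_text(json.dumps(snapshot))
    return checkpoint_id


def load_checkpoint(directory: Path, checkpoint_id: str) -> dict:
    """Read a snapshot back so a run can continue from it (hypothetical helper)."""
    return json.loads((directory / f"{checkpoint_id}.json").read_text())


with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp)
    cp_id = save_checkpoint(
        store,
        shared_state={"search_result": {"topic": "demo"}},
        pending_messages=[{"to": "tone_agent", "payload": "draft summary"}],
    )
    restored = load_checkpoint(store, cp_id)
    print(restored["shared_state"])
    print(restored["pending_messages"])
```

The point of the sketch is that a checkpoint only helps if it captures both the accumulated data and the work still queued, since restoring one without the other leaves the workflow with nothing to do or nothing to do it with.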
In the next example, we will implement a script that saves the workflow's progress after each major step. If the process is interrupted, you can list the available checkpoints, select one, and resume execution from that point instead of repeating earlier steps. This keeps long or complex workflows from losing progress to unexpected interruptions.
"""
Two-Agent Checkpoint Demo

Agents
1) SearchAgent: produces a brief story summary with rough sources
2) ToneAgent: rewrites the story to sound negative

Behavior
- Saves a checkpoint right after SearchAgent finishes
- If a checkpoint exists, offers to resume from it and skip SearchAgent
"""

import asyncio
from dataclasses import dataclass, asdict
from pathlib import Path

import weave
from pydantic import PrivateAttr

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    FileCheckpointStorage,
    handler,
)
from agent_framework.openai import OpenAIChatClient

CHECKPOINT_DIR = Path(__file__).parent / "checkpoints"
CHECKPOINT_DIR.mkdir(exist_ok=True)

# Log model calls to Weave
weave.init("ms_agents")


# ----------------------
# Messages
# ----------------------
@dataclass
class SearchTask:
    topic: str
    max_points: int = 5


@dataclass
class SearchResult:
    topic: str
    summary: str
    sources: list[str]


@dataclass
class NegativeStory:
    topic: str
    text: str


# ----------------------
# Executors
# ----------------------
class SearchAgent(Executor):
    _client: OpenAIChatClient = PrivateAttr()

    def __init__(self, client: OpenAIChatClient, id: str = "search_agent"):
        super().__init__(id=id)
        self._client = client

    @handler
    async def run_search(self, task: SearchTask, ctx: WorkflowContext) -> None:
        print(f"🔎 [{self.id}] Searching topic: {task.topic}")
        prompt = f"""
You are a web researcher. Summarize the latest story about:
{task.topic}

Return:
1) a short narrative paragraph (3 to 6 sentences) that reads like a news brief
2) 3 to {task.max_points} plausible source links (just domains and paths, no markdown)
Keep it concise.
"""
        resp = await self._client.get_response(prompt)
        text = resp.messages[0].text

        # Naive split: link-like lines become "sources", the rest is the summary
        lines = [ln.strip() for ln in text.strip().splitlines() if ln.strip()]
        # Heuristic: a line is a link if it contains "http", or both "." and "/"
        links = [ln for ln in lines if "http" in ln or ("." in ln and "/" in ln)]
        if not links:
            links = ["example.com/story", "news.example.org/report", "media.example.net/article"]
        summary_lines = [ln for ln in lines if ln not in links]
        summary = " ".join(summary_lines).strip()
        if not summary:
            summary = f"A brief summary of {task.topic} could not be derived; using placeholder."

        result = SearchResult(topic=task.topic, summary=summary, sources=links[: task.max_points])

        # Save state for automatic checkpointing (framework will handle the checkpoint creation)
        await ctx.set_state({"step": "searched", "result": asdict(result)})
        await ctx.set_shared_state("search_result", asdict(result))
        print(f"💾 [{self.id}] State saved (automatic checkpoint will be created)")
        await ctx.send_message(result)


class ToneAgent(Executor):
    _client: OpenAIChatClient = PrivateAttr()

    def __init__(self, client: OpenAIChatClient, id: str = "tone_agent"):
        super().__init__(id=id)
        self._client = client

    @handler
    async def make_negative(self, result: SearchResult, ctx: WorkflowContext) -> None:
        print(f"🖊️ [{self.id}] Rewriting with negative tone: {result.topic}")
        prompt = f"""
Rewrite the following story with a strong negative framing while staying factual,
avoiding exaggerations, and keeping it tight (120 to 180 words).
End with a one-sentence kicker that underscores the negative frame.

Story:
{result.summary}

Sources:
{chr(10).join(result.sources)}
"""
        resp = await self._client.get_response(prompt)
        negative_text = resp.messages[0].text.strip()
        final = NegativeStory(topic=result.topic, text=negative_text)

        # Save final state for automatic checkpointing
        await ctx.set_state({"step": "rewritten", "negative_story": asdict(final)})
        await ctx.set_shared_state("final_story", asdict(final))

        # Yield the final output
        await ctx.yield_output(render_output(final))


# ----------------------
# Rendering
# ----------------------
def render_output(story: NegativeStory) -> str:
    return f"""# Negative Rewrite

Topic: {story.topic}

{story.text}
"""


# ----------------------
# Workflow Creation & Demo
# ----------------------
def create_workflow_with_checkpointing():
    """Create workflow with automatic checkpointing enabled."""
    client = OpenAIChatClient()
    search_agent = SearchAgent(client)
    tone_agent = ToneAgent(client)

    # Enable automatic checkpointing with FileCheckpointStorage
    checkpoint_storage = FileCheckpointStorage(storage_path=str(CHECKPOINT_DIR))

    return (
        WorkflowBuilder()
        .set_start_executor(search_agent)
        .add_edge(search_agent, tone_agent)
        .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable automatic checkpointing
        .build()
    )


async def main():
    print("Two-Agent Checkpoint Demo (Automated)")
    print("=" * 50)

    # Create workflow with automatic checkpointing
    workflow = create_workflow_with_checkpointing()
    checkpoint_storage = FileCheckpointStorage(storage_path=str(CHECKPOINT_DIR))

    # Check for existing checkpoints
    all_checkpoints = await checkpoint_storage.list_checkpoints()
    available_checkpoints = [cp for cp in all_checkpoints if cp.workflow_id]

    if available_checkpoints:
        print(f"Found {len(available_checkpoints)} existing checkpoints:")
        sorted_cps = sorted(available_checkpoints, key=lambda c: c.timestamp)
        for idx, cp in enumerate(sorted_cps):
            shared_state = cp.shared_state
            search_done = "search_result" in shared_state
            tone_done = "final_story" in shared_state
            status = "completed" if tone_done else "after_search" if search_done else "started"
            print(f" [{idx}] iter={cp.iteration_count} status={status} time={cp.timestamp}")

        user_input = input("\nEnter checkpoint index to resume from (or press Enter to start fresh): ").strip()
        if user_input.isdigit():
            idx = int(user_input)
            if 0 <= idx < len(sorted_cps):
                chosen_checkpoint = sorted_cps[idx]
                print(f"\n🔄 Resuming from checkpoint {chosen_checkpoint.checkpoint_id}...")

                # Use the framework's automatic resume and capture outputs
                resume_outputs = []
                async for event in workflow.run_stream_from_checkpoint(
                    chosen_checkpoint.checkpoint_id,
                    checkpoint_storage=checkpoint_storage,
                ):
                    print(f"Resumed Event: {type(event).__name__}")
                    # Capture WorkflowOutputEvent data during resume
                    if isinstance(event, WorkflowOutputEvent):
                        resume_outputs.append(event.data)

                # Display resumed outputs
                if resume_outputs:
                    print(f"\n🎭 Resumed Output:\n{resume_outputs[-1]}")
                else:
                    print("\n⚠️ No outputs from resume")
                return

    # Fresh run
    print("\n▶️ Starting fresh workflow...")
    topic = input("Enter a topic to research (or press Enter for default): ").strip()
    if not topic:
        topic = "AI adoption in enterprise software"
    task = SearchTask(topic=topic, max_points=3)

    # Run with streaming to see events
    outputs = []
    async for event in workflow.run_stream(task):
        print(f"Event: {type(event).__name__}")
        # Only capture WorkflowOutputEvent data (the final outputs)
        if isinstance(event, WorkflowOutputEvent):
            outputs.append(event.data)

    # Display final outputs
    if outputs:
        print(f"\n🎭 Final Output:\n{outputs[-1]}")
    else:
        print("\n⚠️ No workflow outputs captured")

    # Show created checkpoints (compare by id rather than object equality)
    final_checkpoints = await checkpoint_storage.list_checkpoints()
    known_ids = {cp.checkpoint_id for cp in all_checkpoints}
    new_checkpoints = [cp for cp in final_checkpoints if cp.checkpoint_id not in known_ids]
    print(f"\n💾 Created {len(new_checkpoints)} automatic checkpoints during execution")


if __name__ == "__main__":
    asyncio.run(main())
The mechanics here are straightforward but powerful. When the search agent finishes its work, the state is saved, and a checkpoint is created. If the process is stopped, you can later resume from that checkpoint and continue into the rewriting stage without having to rerun the search. This is useful when workflows are interrupted by human delays, when external services impose rate limits, or when nightly jobs need to resume each morning. The framework not only saves progress but also makes resumption explicit, letting you see what checkpoints are available and decide which one to use. By combining checkpointing with orchestration and human review, you get agents that can branch, pause, and recover from interruptions while still delivering consistent results.
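The checkpoint listing in the script classifies each checkpoint by which shared-state keys it contains, then asks the user to pick one. For unattended jobs, the same classification could drive an automatic choice. The sketch below is an illustration under stated assumptions: `CheckpointInfo` is a hypothetical stand-in holding only the fields the script reads, `pick_resume_point` is not a framework function, timestamps are assumed to be ISO 8601 strings (which sort chronologically), and the sample data is made up.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class CheckpointInfo:
    """Hypothetical stand-in with only the fields the demo script reads."""
    checkpoint_id: str
    timestamp: str  # assumed ISO 8601, so string order equals time order
    shared_state: dict = field(default_factory=dict)


def status_of(cp: CheckpointInfo) -> str:
    """Classify a checkpoint by the shared-state keys it contains."""
    if "final_story" in cp.shared_state:
        return "completed"
    if "search_result" in cp.shared_state:
        return "after_search"
    return "started"


def pick_resume_point(checkpoints: list[CheckpointInfo]) -> Optional[CheckpointInfo]:
    """Return the newest checkpoint taken after the search but before the rewrite."""
    candidates = [cp for cp in checkpoints if status_of(cp) == "after_search"]
    return max(candidates, key=lambda cp: cp.timestamp, default=None)


# Made-up sample data for illustration only
checkpoints = [
    CheckpointInfo("cp-1", "2025-01-01T09:00:00"),
    CheckpointInfo("cp-2", "2025-01-01T09:00:05", {"search_result": {"topic": "demo"}}),
    CheckpointInfo(
        "cp-3",
        "2025-01-01T09:00:09",
        {"search_result": {"topic": "demo"}, "final_story": {"text": "done"}},
    ),
]
chosen = pick_resume_point(checkpoints)
print(chosen.checkpoint_id if chosen else "start fresh")
```

Selecting the newest "after_search" checkpoint skips the search step without replaying a run that has already finished; returning `None` signals that a fresh run is the only option.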
After running the checkpoint demo, we can see the calls it made to the model inside Weave:


After running the script a second time, we see the checkpoints that were saved by the previous run. We can enter the index of a checkpoint, and our agent pipeline will pick up at the selected step instead of starting from scratch.

After choosing a checkpoint, we can see that the run picks up using the information gathered in the previous run.

Conclusion
In this tutorial, we walked through three core capabilities of the Microsoft Agent Framework.
- First, we explored orchestration, where workflows are expressed as graphs of executors that can run in sequence, branch into parallel tasks, and rejoin into combined outputs.
- Next, we looked at human-in-the-loop design, where the system pauses safely to gather human judgment and then resumes without losing context.
- Finally, we examined checkpointing, which allows workflows to save their progress and continue later rather than starting over after an interruption.
Together, these scripts demonstrate how the Microsoft Agent Framework transforms agent building into a structured and dependable process. Instead of chaining fragile calls, you gain tools for coordination, oversight, and durability. This means agents can scale beyond simple demos into processes that are flexible enough to branch, reliable enough to handle human delays, and resilient enough to survive crashes or long pauses.
The outcome is a foundation where developers can design agents not just as prototypes, but as systems that withstand real-world conditions.