Python: [BREAKING] Python: Fix orchestration outputs so as_agent() returns the final answer only. Align other orchestration outputs.#5301
Conversation
…. Align other orchestration outputs
There was a problem hiding this comment.
Pull request overview
This PR changes Python orchestration output semantics so workflow.as_agent().run(prompt) returns only the final “answer” (as an AgentResponse) while intermediate participant activity is surfaced via data events (instead of additional output events). It adds a single new knob (AgentExecutor(emit_intermediate_data=...)) and updates orchestrations, tests, and a sample to align with the clarified contract.
Changes:
- Add
AgentExecutor.emit_intermediate_datato emit paralleldataevents forAgentResponse/AgentResponseUpdate. - Update Sequential/Concurrent/GroupChat/Magentic/Handoff orchestrations to reserve
outputfor terminal answers and usedatafor intermediate agent activity. - Update orchestration tests and a sample to validate/illustrate the new terminal vs intermediate event behavior.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| python/samples/03-workflows/agents/sequential_workflow_as_agent.py | Updates sample to reflect as_agent() returning only the final agent response and notes intermediate observation via data. |
| python/packages/orchestrations/tests/test_sequential.py | Rewrites sequential tests to assert terminal output is last agent only and intermediate activity is data when enabled. |
| python/packages/orchestrations/tests/test_magentic.py | Updates Magentic tests to expect AgentResponse terminal outputs and data events for intermediate updates. |
| python/packages/orchestrations/tests/test_handoff.py | Updates Handoff tests to reflect no synthetic terminal output and per-agent outputs as the stream result. |
| python/packages/orchestrations/tests/test_group_chat.py | Updates GroupChat tests to expect a single terminal AgentResponse from the orchestrator and intermediate agent updates as data. |
| python/packages/orchestrations/tests/test_concurrent.py | Updates Concurrent tests to expect default aggregator returns AgentResponse with assistant messages only (no user prompt). |
| python/packages/orchestrations/agent_framework_orchestrations/_sequential.py | Changes sequential wiring so the last agent executor is the workflow output executor; uses data events for intermediate participants. |
| python/packages/orchestrations/agent_framework_orchestrations/_orchestration_request_info.py | Plumbs emit_intermediate_data through AgentApprovalExecutor into the inner AgentExecutor. |
| python/packages/orchestrations/agent_framework_orchestrations/_magentic.py | Changes Magentic terminal output to AgentResponse; adds participant emit_intermediate_data plumbing; restricts output executor to orchestrator. |
| python/packages/orchestrations/agent_framework_orchestrations/_handoff.py | Removes terminal yield; relies on per-agent output events as the observable result. |
| python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py | Changes GroupChat completion to yield an AgentResponse; uses participant emit_intermediate_data; restricts outputs to orchestrator. |
| python/packages/orchestrations/agent_framework_orchestrations/_concurrent.py | Changes default aggregator to yield AgentResponse (assistant replies only) and uses participant emit_intermediate_data for intermediate observation. |
| python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py | Updates base orchestrator termination/max-round outputs to yield AgentResponse completion messages. |
| python/packages/core/tests/workflow/test_agent_executor.py | Adds core tests asserting emit_intermediate_data produces data events in both streaming and non-streaming modes. |
| python/packages/core/agent_framework/_workflows/_agent_executor.py | Implements emit_intermediate_data by emitting WorkflowEvent.emit(...)/type='data' alongside existing outputs. |
| python/packages/core/agent_framework/_workflows/_agent.py | Updates WorkflowAgent to consume data events carrying AgentResponse / AgentResponseUpdate as part of agent responses. |
moonbox3
left a comment
There was a problem hiding this comment.
Automated Code Review
Reviewers: 3 | Confidence: 82%
✓ Correctness
This PR refactors orchestration output from
list[Message]toAgentResponseand introducesemit_intermediate_dataonAgentExecutorto surface intermediate participants viadataevents. The core mechanics are sound:output_executorsis now always set (not conditioned onintermediate_outputs), intermediate agents emitdataevents, and the_agent.pyadapter layer correctly processes bothoutputanddataevents. The existing unresolved review comments remain valid (AgentApprovalExecutor as last participant in sequential, sample file stale imports/docstrings, data-event propagation through WorkflowExecutor, and docstring ordering in concurrent aggregator). I found no additional correctness bugs beyond those already raised.
✓ Security Reliability
This PR refactors orchestration outputs from
list[Message]toAgentResponseand introducesemit_intermediate_dataonAgentExecutorto surface intermediate participants asdataevents while reservingoutputevents for the terminal answer. The design is clean and theoutput_executors/data-event split is well-implemented. The key security/reliability issues already identified in previous review comments (AgentApprovalExecutor as terminal participant producing no output, docstring ordering inaccuracy,dataevent forwarding in WorkflowExecutor) remain unresolved and still apply. One new concern: the handoff workflow removes all terminalyield_outputcalls, meaning a termination condition that fires before any agent speaks (e.g., from user input alone) will cause the workflow to go idle with zero output events — consumers relying on at least one output would see silent completion.
✓ Test Coverage
The PR introduces a significant refactoring of orchestration output contracts (from
list[Message]toAgentResponse) and addsemit_intermediate_datawiring to surface per-agent responses asdataevents. Test coverage for sequential, group-chat, and magentic workflows is solid, with good new tests for non-streaming, streaming, intermediate outputs, and as_agent scenarios. However, there are notable gaps: ConcurrentBuilder'sintermediate_outputs=Truepath is completely untested despite new wiring, the handoff async-termination test was weakened to a bare IDLE-state check, and sequential intermediate_outputs is only tested in non-streaming mode.
Automated review by moonbox3's agents
1. Sample cleanup: Remove commented-out FoundryChatClient block and update prerequisites to reference OPENAI_CHAT_MODEL_ID instead of FOUNDRY_* vars. 2. Sequential approval output: Change _EndWithConversation.end_with_agent_executor_response from a no-op sink to yield response.agent_response. When the last participant is AgentApprovalExecutor (via with_request_info), _EndWithConversation is the output executor so the yield produces the terminal answer. When the last participant is a regular AgentExecutor, _EndWithConversation is not in output_executors so the yield is silently filtered out. 3. Forward data events through WorkflowExecutor: _process_workflow_result now also forwards 'data' events from sub-workflows so that emit_intermediate_data=True on AgentExecutor works correctly when wrapped in AgentApprovalExecutor. 4. Concurrent docstring: Update _AggregateAgentConversations docstring to say 'deterministic participant order' instead of 'completion order'. 5. Add test_concurrent_intermediate_outputs_emits_data_events verifying that ConcurrentBuilder(intermediate_outputs=True) emits per-participant data events alongside the single aggregated output event. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…outputs (microsoft#5301) Address PR review comments 2, 3, and 5: - Add test_sequential_request_info_last_participant_emits_output: Verifies that when the last participant is wrapped via with_request_info() (AgentApprovalExecutor), the workflow still emits a terminal output after approval, exercising the _EndWithConversation.end_with_agent_executor_response fallback path. - Add test_sequential_request_info_with_intermediate_outputs_emits_data_events: Verifies that emit_intermediate_data=True works correctly through AgentApprovalExecutor wrapping—WorkflowExecutor._process_result already forwards data events from sub-workflows, so intermediate agent responses surface as data events in the parent workflow. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…#5301) Update cast() calls in _group_chat.py and _magentic.py to use WorkflowContext[Never, AgentResponse] instead of the old WorkflowContext[Never, list[Message]], matching the updated method signatures in _base_group_chat_orchestrator.py. Fix _sequential.py _EndWithConversation.end_with_agent_executor_response to declare WorkflowContext[Any, AgentResponse] so yield_output accepts AgentResponse[None]. Fix _workflow_executor.py data event forwarding to handle nullable executor_id. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Extract event.data into a typed local variable before the isinstance check to avoid pyright narrowing it to AgentResponse[Unknown]. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…icrosoft#5301) Add pyright: ignore[reportMissingImports] to orjson imports that are already guarded by try/except ImportError, matching the existing pattern used elsewhere in the samples. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
eavanvalkenburg
left a comment
There was a problem hiding this comment.
small nit on the env var names, otherwise good to go
Reverts the mistaken switch from FoundryChatClient to OpenAIChatClient in the sequential workflow as agent sample. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
… reasoning conversion Layered on top of the prior review-feedback work in this branch. Renames: - AgentExecutor.emit_intermediate_data -> emit_data_events (mechanical rename; orchestration semantics live at the orchestration layer, not the general-purpose executor). Forwarded through MagenticAgentExecutor, AgentApprovalExecutor, and all orchestration call sites. - HandoffAgentExecutor._check_terminate_and_yield -> _should_terminate (pure predicate; no longer yields anything). HandoffBuilder docstring rewritten to describe the new per-agent AgentResponse output contract. WorkflowAgent reasoning-content conversion: - Add _rewrite_text_to_reasoning(contents) and _msg_as_reasoning(msg) helpers; the as_agent() path now reframes text content from data events as text_reasoning Content blocks before merging into the AgentResponse. - Consumers iterate msg.contents and branch on content.type — same path they already use for Claude thinking and OpenAI reasoning. No new field on Message/AgentResponse/WorkflowEvent. - Streaming branch constructs fresh AgentResponseUpdate instances instead of mutating shared payloads (regression test added). - Helper _msg_maybe_reasoning consolidates the conditional rewrite at three call sites in the non-streaming conversion. Tests: - TestWorkflowAgentReasoningHelpers + TestWorkflowAgentDataEventReasoningConversion add 9 new tests covering helpers, non-streaming, streaming, mixed content, already-reasoning passthrough, and mutation-safety regression. - Updated test_sequential_as_agent_with_intermediate_outputs_includes_chain to assert text_reasoning content for intermediate agents.
The streaming conversion path narrowed event.data via isinstance against generic AgentResponse, producing AgentResponse[Unknown] and tripping reportUnknownVariableType/reportUnknownMemberType. Binding data: Any before the check keeps runtime behavior identical while restoring a fully known type for downstream access.
Motivation and Context
Important
Breaking changes are scoped to the still-experimental
agent-framework-orchestrationspackage. Coreagent-frameworkworkflow code is non-breaking — the newAgentExecutor.emit_data_eventskwarg defaults toFalse(no behavior change), andWorkflowAgentadditively consumesdataevents it previously ignored.workflow.as_agent().run(prompt)forSequentialBuilderreturned the user's input plus every agent's reply instead of the last agent's answer. The same class of bug affectedConcurrent,GroupChat,Magentic, andHandoff: they all yieldedlist[Message]conversation dumps asoutputevents, and intermediate agents emittedoutputevents indistinguishable from the final answer whenintermediate_outputs=True.An ADR (#4799) proposed adding an
is_run_completedflag onWorkflowEventto discriminate final from intermediate outputs. This PR takes a simpler path that uses primitives the framework already has — no new event types or discriminator flags.Design
Two layers of discrimination, each at the right boundary:
Event layer (workflow consumers):
outputevents carry the workflow's final answer;dataevents carry intermediate participant activity. Orchestrations restrictoutput_executorsto the terminator, so only the answer surfaces on theoutputchannel. Intermediate participants surface (whenintermediate_outputs=True) on thedatachannel viaAgentExecutor(emit_data_events=True), which mirrorsyield_outputtoWorkflowEvent.emit. Workflow consumers branch onevent.type.Content layer (
as_agent()consumers):WorkflowAgentrewrites text content fromdataevents astext_reasoningContent blocks before merging into the finalAgentResponse. Consumers iteratemsg.contentsand branch oncontent.type— the same rendering path they already use for Claude thinking and OpenAI reasoning. No new field onMessage/AgentResponse/WorkflowEvent.Per-orchestration contract
outputeventAgentResponse(streaming chunks — the lastAgentExecutoritself is the workflow's output executor)list[Message](unchanged)AgentResponsewith one assistant message per participant (no user prompt prepended)AgentResponsewith the orchestrator's completion messageAgentResponsewith the manager's synthesized final answeroutputevents as agents speakBehavior changes (orchestrations package)
workflow.as_agent().run(prompt)now returns only the meaningful answer for each orchestration, matching the agent contract.ConcurrentBuilderdefault aggregator output changed fromlist[Message](user + per-agent) toAgentResponse(per-agent only). Callers consumingoutput_events[0].datawill see anAgentResponseinstead of a list.SequentialBuilder,GroupChat, andMagenticterminal output changed fromlist[Message](full conversation) toAgentResponse(answer only).Handoffno longer emits a synthetic terminal event. The last agent'soutputevent is the end of the stream.intermediate_outputs=Trueno longer floods theoutputchannel — intermediate participants surface on thedatachannel and arrive atas_agent()consumers astext_reasoningcontent.Review feedback addressed
emit_intermediate_data→emit_data_events(mechanically accurate; orchestration semantics live at the orchestration layer).dataevents are not "re-introducing the removedAgentResponsesEvent" — they're the unified event model's documented intermediate-data channel (WorkflowEvent.emit). The new behavior is the content rewrite at theWorkflowAgentboundary, not a new event class.AgentResponseUpdateinstances instead of mutating shared payloads (regression test added).emit_data_eventsis suppressed when the default aggregator is in use; only enabled with custom aggregators where intermediate visibility is genuinely separate from the aggregator's transformed answer.SequentialBuildernow correctly recognizesAgentApprovalExecutoras a valid terminator and routes its inner output to the parent viaallow_direct_output=True.AgentApprovalExecutornow forwards innerdataevents through to the parent so wrapped intermediate participants don't silently drop their reasoning.Contribution Checklist