Skip to content

Streaming(流式处理)

Streaming让你可以在agent运行过程中订阅实时更新。这对于向最终用户展示进度更新和部分响应特别有用。

要使用流式处理,你可以调用Runner.run_streamed(),它会返回一个RunResultStreaming对象。调用result.stream_events()会给你一个异步流,包含StreamEvent对象,下面会详细介绍这些对象。

原始响应事件

RawResponsesStreamEvent是直接从LLM传递的原始事件。它们采用OpenAI Responses API格式,这意味着每个事件都有一个类型(如response.createdresponse.output_text.delta等)和数据。如果你想在生成响应消息后立即将其流式传输给用户,这些事件非常有用。

例如,下面的代码将逐个token输出LLM生成的文本:

import asyncio
from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner

async def main():
    agent = Agent(
        name="Joker",
        instructions="你是一个有帮助的助手。",
    )

    result = Runner.run_streamed(agent, input="请给我讲5个笑话。")
    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(main())

运行项目事件和agent事件

RunItemStreamEvent是更高级别的事件。它们会在某个项目完全生成时通知你。这允许你在"消息已生成"、"工具已运行"等级别推送进度更新,而不是每个token。类似地,AgentUpdatedStreamEvent会在当前agent发生变化时(例如,作为交接的结果)提供更新。

例如,下面的代码会忽略原始事件并向用户流式传输更新:

import asyncio
import random
from agents import Agent, ItemHelpers, Runner, function_tool

@function_tool
def how_many_jokes() -> int:
    return random.randint(1, 10)


async def main():
    agent = Agent(
        name="Joker",
        instructions="首先调用`how_many_jokes`工具,然后讲相应数量的笑话。",
        tools=[how_many_jokes],
    )

    result = Runner.run_streamed(
        agent,
        input="你好",
    )
    print("=== 开始运行 ===")

    async for event in result.stream_events():
        # 我们会忽略原始响应事件增量
        if event.type == "raw_response_event":
            continue
        # 当agent更新时,打印出来
        elif event.type == "agent_updated_stream_event":
            print(f"Agent已更新: {event.new_agent.name}")
            continue
        # 当项目生成时,打印它们
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                print("-- 工具被调用")
            elif event.item.type == "tool_call_output_item":
                print(f"-- 工具输出: {event.item.output}")
            elif event.item.type == "message_output_item":
                print(f"-- 消息输出:\n {ItemHelpers.text_message_output(event.item)}")
            else:
                pass  # 忽略其他事件类型

    print("=== 运行完成 ===")


if __name__ == "__main__":
    asyncio.run(main())