Function Calling 流式累积机制

流式场景下 LLM 返回的 tool_call 是分多个 chunk 增量到达的,必须按 index 累积拼接 id/name/arguments,流结束后统一解析。这是 AI 应用流式架构的核心工程难点。

#type / concept #status / growing #tech / ai

[!info] related notes

Function Calling 流式累积机制

这篇解决什么问题

非流式模式下,LLM 返回的 tool_call 是一个完整的 JSON,直接解析就行。但流式模式下,同一个 tool_call 被拆成多个 chunk 分批到达——你必须把碎片拼起来才能得到完整的工具调用请求。这是 AI 应用中最容易出 bug 的工程点

问题本质

流式返回时,一个 tool_call 被拆成这样:

chunk 1: {index:0, id:"call_abc", function:{name:"extract_info"}}
chunk 2: {index:0, function:{arguments:"{\"body_p"}}
chunk 3: {index:0, function:{arguments:"art\":\"肩部\",\"symp"}}
chunk 4: {index:0, function:{arguments:"tom_type\":\"疼痛\"}"}}

你必须:

  1. index 区分不同的 tool_call(一个响应可以有多个并行 tool_call)
  2. 把同一个 index 的 arguments 字符串拼接起来
  3. 在流结束时统一 json.loads 解析

累积算法

# 按 index 累积
accumulated: dict[int, dict] = {}

async for chunk in stream:
    delta = chunk.choices[0].delta

    if delta.tool_calls:
        for tc in delta.tool_calls:
            idx = tc.index
            if idx not in accumulated:
                accumulated[idx] = {"id": "", "name": "", "arguments": ""}

            if tc.id:
                accumulated[idx]["id"] = tc.id
            if tc.function:
                if tc.function.name:
                    accumulated[idx]["name"] = tc.function.name
                if tc.function.arguments:
                    accumulated[idx]["arguments"] += tc.function.arguments  # 拼接

    # 流结束时解析
    if chunk.choices[0].finish_reason in ("stop", "tool_calls"):
        tool_calls = []
        for idx in sorted(accumulated):
            acc = accumulated[idx]
            tool_calls.append(ToolCall(
                id=acc["id"],
                name=acc["name"],
                arguments=json.loads(acc["arguments"]) if acc["arguments"] else {},
            ))

关键细节

index 是唯一标识

一个响应可以包含多个并行 tool_call(LLM 同时调用多个工具)。每个 tool_call 有唯一的 index

id 和 name 只在第一个 chunk 出现

if tc.id:  # 只有第一个 chunk 有值
    accumulated[idx]["id"] = tc.id
if tc.function.name:  # 只有第一个 chunk 有值
    accumulated[idx]["name"] = tc.function.name

arguments 是增量拼接

if tc.function.arguments:
    accumulated[idx]["arguments"] += tc.function.arguments

每次只有片段,必须字符串拼接。

finish_reason 触发解析

  • "stop": 正常结束(没有 tool_call)
  • "tool_calls": 模型决定调用工具

只在这两种情况下才解析累积的 arguments。

封装在 Provider 层

好的设计:累积逻辑封装在 LLM Provider 内部,上层只在 finished=True 时拿到完整的 tool_calls:

@dataclass
class StreamChunk:
    delta: str = ""                          # 文本增量(每个 chunk 都有)
    tool_calls: list[ToolCall] | None = None  # 只在 finished 时填充
    finished: bool = False

上层代码:

async for chunk in provider.chat_stream(messages, tools):
    if chunk.delta:
        yield text_event(chunk.delta)

    if chunk.tool_calls:  # 只在流结束时到达
        for tc in chunk.tool_calls:
            if tc.name == "extract_symptom_info":
                update_symptoms(tc.arguments)

坏的设计:把累积逻辑暴露给上层,每个调用点都要自己拼 arguments。

错误处理

JSON 解析失败

LLM 有时会输出非法 JSON:

try:
    args = json.loads(acc["arguments"])
except json.JSONDecodeError:
    args = {"_raw": acc["arguments"], "_error": "invalid_json"}

保留原始字符串供调试和重试。

流中断

用户取消或网络断开时,累积到一半的 tool_call 直接丢弃,不要尝试解析不完整的 arguments。

常见错误

在每个 chunk 都尝试解析 arguments

# ❌ 每个 chunk 都 json.loads
args = json.loads(tc.function.arguments)  # 一定是不完整的 JSON

不按 index 分组

# ❌ 所有 chunk 的 arguments 拼到一起
all_args += tc.function.arguments  # 多个 tool_call 会混在一起

在 Provider 外部做累积

# ❌ 上层自己拼
async for chunk in provider.stream():
    if chunk.tool_calls:  # 这里拿到的是碎片,不是完整 tool_calls
        for tc in chunk.tool_calls:
            args = tc.arguments  # 不完整的
创建于 2026/6/25 更新于 2026/6/25