Function Calling 流式累积机制
流式场景下 LLM 返回的 tool_call 是分多个 chunk 增量到达的,必须按 index 累积拼接 id/name/arguments,流结束后统一解析。这是 AI 应用流式架构的核心工程难点。
#type / concept
#status / growing
#tech / ai
[!info] related notes
- 前置概念: Function Calling, SSE
- 上层设计: LLM Provider 抽象层
- 应用场景: 咨询 Agent 工作流
Function Calling 流式累积机制
这篇解决什么问题
非流式模式下,LLM 返回的 tool_call 是一个完整的 JSON,直接解析就行。但流式模式下,同一个 tool_call 被拆成多个 chunk 分批到达——你必须把碎片拼起来才能得到完整的工具调用请求。这是 AI 应用中最容易出 bug 的工程点。
问题本质
流式返回时,一个 tool_call 被拆成这样:
chunk 1: {index:0, id:"call_abc", function:{name:"extract_info"}}
chunk 2: {index:0, function:{arguments:"{\"body_p"}}
chunk 3: {index:0, function:{arguments:"art\":\"肩部\",\"symp"}}
chunk 4: {index:0, function:{arguments:"tom_type\":\"疼痛\"}"}}
你必须:
- 用
index区分不同的 tool_call(一个响应可以有多个并行 tool_call) - 把同一个 index 的
arguments字符串拼接起来 - 在流结束时统一
json.loads解析
累积算法
# 按 index 累积
accumulated: dict[int, dict] = {}
async for chunk in stream:
delta = chunk.choices[0].delta
if delta.tool_calls:
for tc in delta.tool_calls:
idx = tc.index
if idx not in accumulated:
accumulated[idx] = {"id": "", "name": "", "arguments": ""}
if tc.id:
accumulated[idx]["id"] = tc.id
if tc.function:
if tc.function.name:
accumulated[idx]["name"] = tc.function.name
if tc.function.arguments:
accumulated[idx]["arguments"] += tc.function.arguments # 拼接
# 流结束时解析
if chunk.choices[0].finish_reason in ("stop", "tool_calls"):
tool_calls = []
for idx in sorted(accumulated):
acc = accumulated[idx]
tool_calls.append(ToolCall(
id=acc["id"],
name=acc["name"],
arguments=json.loads(acc["arguments"]) if acc["arguments"] else {},
))
关键细节
index 是唯一标识
一个响应可以包含多个并行 tool_call(LLM 同时调用多个工具)。每个 tool_call 有唯一的 index。
id 和 name 只在第一个 chunk 出现
if tc.id: # 只有第一个 chunk 有值
accumulated[idx]["id"] = tc.id
if tc.function.name: # 只有第一个 chunk 有值
accumulated[idx]["name"] = tc.function.name
arguments 是增量拼接
if tc.function.arguments:
accumulated[idx]["arguments"] += tc.function.arguments
每次只有片段,必须字符串拼接。
finish_reason 触发解析
"stop": 正常结束(没有 tool_call)"tool_calls": 模型决定调用工具
只在这两种情况下才解析累积的 arguments。
封装在 Provider 层
好的设计:累积逻辑封装在 LLM Provider 内部,上层只在 finished=True 时拿到完整的 tool_calls:
@dataclass
class StreamChunk:
delta: str = "" # 文本增量(每个 chunk 都有)
tool_calls: list[ToolCall] | None = None # 只在 finished 时填充
finished: bool = False
上层代码:
async for chunk in provider.chat_stream(messages, tools):
if chunk.delta:
yield text_event(chunk.delta)
if chunk.tool_calls: # 只在流结束时到达
for tc in chunk.tool_calls:
if tc.name == "extract_symptom_info":
update_symptoms(tc.arguments)
坏的设计:把累积逻辑暴露给上层,每个调用点都要自己拼 arguments。
错误处理
JSON 解析失败
LLM 有时会输出非法 JSON:
try:
args = json.loads(acc["arguments"])
except json.JSONDecodeError:
args = {"_raw": acc["arguments"], "_error": "invalid_json"}
保留原始字符串供调试和重试。
流中断
用户取消或网络断开时,累积到一半的 tool_call 直接丢弃,不要尝试解析不完整的 arguments。
常见错误
在每个 chunk 都尝试解析 arguments
# ❌ 每个 chunk 都 json.loads
args = json.loads(tc.function.arguments) # 一定是不完整的 JSON
不按 index 分组
# ❌ 所有 chunk 的 arguments 拼到一起
all_args += tc.function.arguments # 多个 tool_call 会混在一起
在 Provider 外部做累积
# ❌ 上层自己拼
async for chunk in provider.stream():
if chunk.tool_calls: # 这里拿到的是碎片,不是完整 tool_calls
for tc in chunk.tool_calls:
args = tc.arguments # 不完整的