Harden streaming deltas against null payloads

This commit is contained in:
Areon Lundkvist
2026-06-01 16:09:17 +02:00
committed by GitHub
parent e7d61c724f
commit f853a3fc67
+13 -13
View File
@@ -387,8 +387,8 @@ def _build_anthropic_payload(model, messages, temperature, max_tokens, stream=Fa
if m.get("content"): if m.get("content"):
content.append({"type": "text", "text": m["content"]}) content.append({"type": "text", "text": m["content"]})
for tc in m["tool_calls"]: for tc in m["tool_calls"]:
fn = tc.get("function", {}) fn = tc.get("function") or {}
args_str = fn.get("arguments", "{}") args_str = fn.get("arguments") or "{}"
try: try:
args = json.loads(args_str) if isinstance(args_str, str) else args_str args = json.loads(args_str) if isinstance(args_str, str) else args_str
except (json.JSONDecodeError, TypeError): except (json.JSONDecodeError, TypeError):
@@ -886,26 +886,26 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl
evt = j.get("type", "") evt = j.get("type", "")
if evt == "content_block_start": if evt == "content_block_start":
_anth_block_idx = j.get("index", _anth_block_idx + 1) _anth_block_idx = j.get("index", _anth_block_idx + 1)
cb = j.get("content_block", {}) cb = j.get("content_block") or {}
_anth_block_type = cb.get("type", "text") _anth_block_type = cb.get("type", "text")
if _anth_block_type == "tool_use": if _anth_block_type == "tool_use":
_anth_tool_blocks[_anth_block_idx] = { _anth_tool_blocks[_anth_block_idx] = {
"id": cb.get("id", f"call_{_anth_block_idx}"), "id": cb.get("id") or f"call_{_anth_block_idx}",
"name": cb.get("name", ""), "name": cb.get("name") or "",
"arguments": "", "arguments": "",
} }
elif evt == "content_block_delta": elif evt == "content_block_delta":
delta = j.get("delta", {}) delta = j.get("delta") or {}
delta_type = delta.get("type", "") delta_type = delta.get("type", "")
if delta_type == "text_delta": if delta_type == "text_delta":
text = delta.get("text", "") text = delta.get("text") or ""
if text: if text:
yield f'data: {json.dumps({"delta": text})}\n\n' yield f'data: {json.dumps({"delta": text})}\n\n'
elif delta_type == "input_json_delta": elif delta_type == "input_json_delta":
# Accumulate tool arguments JSON # Accumulate tool arguments JSON
idx = j.get("index", _anth_block_idx) idx = j.get("index", _anth_block_idx)
if idx in _anth_tool_blocks: if idx in _anth_tool_blocks:
partial = delta.get("partial_json", "") partial = delta.get("partial_json") or ""
_anth_tool_blocks[idx]["arguments"] += partial _anth_tool_blocks[idx]["arguments"] += partial
# Stream tool arg deltas for doc tools # Stream tool arg deltas for doc tools
if partial and _anth_tool_blocks[idx].get("name") in ("create_document", "update_document", "edit_document"): if partial and _anth_tool_blocks[idx].get("name") in ("create_document", "update_document", "edit_document"):
@@ -1000,14 +1000,14 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl
u = j["usage"] u = j["usage"]
yield f'data: {json.dumps({"type": "usage", "data": {"input_tokens": u.get("prompt_tokens", 0), "output_tokens": u.get("completion_tokens", 0)}})}\n\n' yield f'data: {json.dumps({"type": "usage", "data": {"input_tokens": u.get("prompt_tokens", 0), "output_tokens": u.get("completion_tokens", 0)}})}\n\n'
elif "choices" in j: elif "choices" in j:
delta = j["choices"][0].get("delta", {}) delta = j["choices"][0].get("delta") or {}
if isinstance(delta, dict): if isinstance(delta, dict):
# Text content # Text content
# Reasoning tokens (VLLM --reasoning-parser, e.g. Qwen3/DeepSeek-R1) # Reasoning tokens (VLLM --reasoning-parser, e.g. Qwen3/DeepSeek-R1)
reasoning = delta.get("reasoning_content", "") reasoning = delta.get("reasoning_content") or ""
if reasoning: if reasoning:
yield f'data: {json.dumps({"delta": reasoning, "thinking": True})}\n\n' yield f'data: {json.dumps({"delta": reasoning, "thinking": True})}\n\n'
content = delta.get("content", "") content = delta.get("content") or ""
if content: if content:
# Some thinking backends start normal content with a # Some thinking backends start normal content with a
# stray closing tag. Repair only that shape; do not # stray closing tag. Repair only that shape; do not
@@ -1018,13 +1018,13 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl
_first_content_sent = True _first_content_sent = True
yield f'data: {json.dumps({"delta": content})}\n\n' yield f'data: {json.dumps({"delta": content})}\n\n'
# Native tool calls — accumulate across chunks # Native tool calls — accumulate across chunks
for tc in delta.get("tool_calls", []): for tc in delta.get("tool_calls") or []:
idx = tc.get("index", 0) idx = tc.get("index", 0)
if idx not in _tc_acc: if idx not in _tc_acc:
_tc_acc[idx] = {"id": "", "name": "", "arguments": ""} _tc_acc[idx] = {"id": "", "name": "", "arguments": ""}
if tc.get("id"): if tc.get("id"):
_tc_acc[idx]["id"] = tc["id"] _tc_acc[idx]["id"] = tc["id"]
func = tc.get("function", {}) func = tc.get("function") or {}
if func.get("name"): if func.get("name"):
_tc_acc[idx]["name"] = func["name"] _tc_acc[idx]["name"] = func["name"]
if "arguments" in func: if "arguments" in func: