"""
title: SiliconFlow R1 Healing
author: samuka007
description: 修复硅基流动中DeepSeek-R1 API不能正确渲染 <think> 的问题。参考:https://openwebui.com/f/zgccrui/deepseek_r1
version: 0.2.1
licence: MIT
"""
import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
class Pipe:
    """Streaming proxy for SiliconFlow's DeepSeek-R1 chat-completions endpoint.

    The pipe forwards a chat request to ``{API_BASE_URL}/chat/completions``,
    reads the server-sent-event stream line by line, and re-emits the text
    deltas. ``<think>`` / ``</think>`` tags found at the start of a delta are
    emitted on their own, each followed by a newline, so that the think block
    renders correctly downstream.
    """

    class Valves(BaseModel):
        """User-configurable settings for the pipe."""

        API_BASE_URL: str = Field(
            default="https://api.siliconflow.cn/v1",
            description="SiliconFlow API Base URL",
        )
        API_KEY: str = Field(default="", description="API key")
        TARGET_MODEL_ID: str = Field(
            default="deepseek-ai/DeepSeek-R1", description="Model ID"
        )

    def __init__(self):
        """Create the default valves and initialise per-instance state."""
        self.valves = self.Valves()
        # Prefix that marks a data line in the event stream.
        self.data_prefix = "data: "
        self.emitter = None

    def pipes(self):
        """Return the list of models exposed by this pipe (a single entry)."""
        return [
            {
                "id": self.valves.TARGET_MODEL_ID,
                "name": "ThinkBlock-Healing/DeepSeek-R1",
            }
        ]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """Stream a chat completion, forwarding each chunk without buffering.

        Args:
            body: Chat-completions request. ``body["model"]`` has everything up
                to and including the first ``.`` removed before forwarding;
                ``body["messages"]`` is read but never modified.
            __event_emitter__: Optional async callback. It is only stored on
                ``self.emitter``; this method does not call it.

        Yields:
            Text fragments of the assistant reply. On failure a single
            JSON-encoded ``{"error": ...}`` string is yielded instead (missing
            API key, non-200 HTTP status, or any exception raised while
            building the request or reading the stream).
        """
        # NOTE(review): `thinking` is not read anywhere in this class; it is
        # kept in case external code inspects it.
        self.thinking = -1
        self.emitter = __event_emitter__

        # Validate configuration.
        if not self.valves.API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return

        # Prepare request headers.
        headers = {
            "Authorization": f"Bearer {self.valves.API_KEY}",
            "Content-Type": "application/json",
        }

        try:
            # Extract the upstream model id (drop the "<prefix>." part, if any).
            model_id = body["model"].split(".", 1)[-1]
            # Build the outgoing messages as a new list so that placeholder
            # insertion never mutates the caller's `body["messages"]`.
            payload = {
                **body,
                "model": model_id,
                "messages": self._alternate_roles(body["messages"]),
            }

            # Issue the API request.
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.API_BASE_URL}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=300,
                ) as response:
                    # Error handling: report non-200 responses and stop.
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return

                    # Process the response stream line by line.
                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue
                        raw = line[len(self.data_prefix):].strip()
                        if not raw:
                            continue
                        # OpenAI-style streams end with a non-JSON "[DONE]"
                        # sentinel; parsing it would raise and surface a
                        # spurious error after a successful reply.
                        if raw == "[DONE]":
                            break
                        data = json.loads(raw)

                        # Chunks may carry an empty `choices` list or no
                        # `delta`; skip them instead of raising.
                        choices = data.get("choices") or []
                        if not choices:
                            continue
                        delta = choices[0].get("delta") or {}
                        content = delta.get("content")
                        if not content:
                            continue

                        # Emit the content immediately.
                        for piece in self._split_think_tags(content):
                            yield piece
        except Exception as e:
            # Top-level boundary: report any failure to the caller as a
            # JSON error chunk rather than letting the generator raise.
            yield self._format_exception(e)

    @staticmethod
    def _alternate_roles(messages) -> list:
        """Return a copy of *messages* with no two adjacent equal roles.

        Wherever two consecutive messages share a role, a placeholder message
        with the opposite role ("assistant" after "user", otherwise "user") is
        inserted between them. The input list is left untouched.
        """
        fixed = []
        for message in messages:
            if fixed and fixed[-1]["role"] == message["role"]:
                filler_role = "assistant" if message["role"] == "user" else "user"
                fixed.append(
                    {"role": filler_role, "content": "[Unfinished thinking]"}
                )
            fixed.append(message)
        return fixed

    @staticmethod
    def _split_think_tags(content: str) -> list:
        """Split a leading ``<think>`` and/or ``</think>`` tag off *content*.

        Each detected tag is returned as its own piece followed by a "\\n"
        piece. The missing trailing newline is what prevented the think block
        from rendering correctly; see
        https://github.com/open-webui/open-webui/commit/c9dc7299c5dbdd6daee5589d5871d5c1589f37bf#diff-73e28dd627aa137e2c628a274c7493f8d758cc857d7a7e8887652442b733aa88R1117-R1149
        Any remaining non-empty text is returned as the final piece.
        """
        pieces = []
        for tag in ("<think>", "</think>"):
            if content.startswith(tag):
                # Strip by the tag's real length ("</think>" is 8 characters).
                content = content[len(tag):]
                pieces.extend((tag, "\n"))
        if content:
            pieces.append(content)
        return pieces

    def _format_error(self, status_code: int, error: bytes) -> str:
        """Format a non-200 HTTP response as a JSON ``{"error": ...}`` string.

        Uses the ``message`` field of a JSON-object body when it is a string;
        otherwise falls back to the raw body text. The message is truncated to
        200 characters.
        """
        fallback = error.decode(errors="ignore")
        try:
            parsed = json.loads(error)
        except ValueError:
            # Covers JSONDecodeError and UnicodeDecodeError (both ValueError).
            parsed = None
        message = parsed.get("message", fallback) if isinstance(parsed, dict) else fallback
        if not isinstance(message, str):
            message = fallback
        err_msg = message[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """Format an exception as a JSON ``{"error": "<Type>: <message>"}`` string."""
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)