Whitepaper
Docs
Sign In
Function
Function
pipe
v1.0
DeepSeek R1 (support siliconflow)
Function ID
deepseek_r1_support_siliconflow
Creator
@xuzhougneg
Downloads
981+
在Open WebUI中显示DeepSeek R1模型的思维链,支持官方API和硅基流动的API
Get
README
No README available
Function Code
Show
""" title: DeepSeek R1 author: xuzhougeng description: 从zgccrui的版本修改而来,支持官方API和硅基流动的API。在OpwenWebUI中显示DeepSeek R1模型的思维链 - 仅支持0.5.6及以上版本 version: 1.0 licence: MIT """ import json import httpx from typing import AsyncGenerator, Callable, Awaitable from pydantic import BaseModel, Field class Pipe: class Valves(BaseModel): DEEPSEEK_API_BASE_URL: str = Field( default="https://api.siliconflow.com/v1", description="DeepSeek API的基础请求地址", ) DEEPSEEK_API_KEY: str = Field( default="", description="用于身份验证的DeepSeek API密钥,可从控制台获取" ) DEEPSEEK_R1_MODEL: str = Field( default="deepseek-ai/DeepSeek-R1", description="DeepSeek R1模型的ID,可从控制台获取", ) def __init__(self): self.valves = self.Valves() self.data_prefix = "data: " self.thinking = -1 # -1:未开始 0:思考中 1:已回答 self.emitter = None def pipes(self): return [{"id": self.valves.DEEPSEEK_R1_MODEL, "name": "deepseek-reasoner"}] async def pipe( self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None ) -> AsyncGenerator[str, None]: """主处理管道(已移除缓冲)""" self.thinking = -1 self.emitter = __event_emitter__ # 验证配置 if not self.valves.DEEPSEEK_API_KEY: yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False) return # 准备请求参数 headers = { "Authorization": f"Bearer {self.valves.DEEPSEEK_API_KEY}", "Content-Type": "application/json", } try: # 模型ID提取 model_id = body["model"].split(".", 1)[-1] payload = {**body, "model": model_id} # 处理消息以防止连续的相同角色 messages = payload["messages"] i = 0 while i < len(messages) - 1: if messages[i]["role"] == messages[i + 1]["role"]: # 插入具有替代角色的占位符消息 alternate_role = "assistant" if messages[i]["role"] == "user" else "user" messages.insert(i + 1, { "role": alternate_role, "content": "[Unfinished thinking]" }) i += 1 # yield json.dumps(payload, ensure_ascii=False) # 发起API请求 async with httpx.AsyncClient(http2=True) as client: async with client.stream( "POST", f"{self.valves.DEEPSEEK_API_BASE_URL}/chat/completions", json=payload, headers=headers, timeout=30, ) as response: # 错误处理 if response.status_code != 
200: error = await response.aread() yield self._format_error(response.status_code, error) return # 流式处理响应 async for line in response.aiter_lines(): if not line.startswith(self.data_prefix): continue data = json.loads(line[len(self.data_prefix) :]) choice = data.get("choices", [{}])[0] # 结束条件判断 if choice.get("finish_reason"): return # 状态机处理 state_output = await self._update_thinking_state( choice.get("delta", {}) ) if state_output: yield state_output # 直接发送状态标记 if state_output == "<think>": yield "\n" # 内容处理并立即发送 content = self._process_content(choice["delta"]) if content: yield content except Exception as e: yield self._format_exception(e) async def _update_thinking_state(self, delta: dict) -> str: """更新思考状态机(严格管理推理标记)""" state_output = "" # 安全获取并处理可能的None值 reasoning_content = delta.get("reasoning_content") or "" content = delta.get("content") or "" has_reasoning = bool(reasoning_content.strip()) has_content = bool(content.strip()) # 状态转换:未开始 -> 推理中(仅当有实际推理内容时) if self.thinking == -1 and has_reasoning: self.thinking = 0 state_output = "<think>" # 状态转换:推理中 -> 已回答(当推理内容完全结束时) elif self.thinking == 0 and not has_reasoning and has_content: self.thinking = 1 state_output = "\n</think>\n\n" return state_output def _process_content(self, delta: dict) -> str: """合并处理两种内容""" content = [] # 处理reasoning_content if delta.get("reasoning_content") is not None: text = delta["reasoning_content"] if text: content.append(text) # 处理content if delta.get("content") is not None: text = delta["content"] if text: content.append(text) return ''.join(content) def _format_error(self, status_code: int, error: bytes) -> str: """错误格式化保持不变""" try: err_msg = json.loads(error).get("message", error.decode(errors="ignore"))[:200] except: err_msg = error.decode(errors="ignore")[:200] return json.dumps( {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False ) def _format_exception(self, e: Exception) -> str: """异常格式化保持不变""" err_type = type(e).__name__ return json.dumps({"error": f"{err_type}: 
{str(e)}"}, ensure_ascii=False)