Whitepaper
Docs
Sign In
Function
Function
pipe
v1.0.7
Deep Thinking
Function ID
deepthinking
Creator
@timwhite
Downloads
170+
Thinking + Generate
Get
README
No README available
Function Code
Show
""" title: Deep Thinking author: TimWhite description: 在OpwenWebUI中支持类似DeepClaude的思维链和回复模型分离 - 仅支持0.5.6及以上版本 version: 1.0.7 licence: MIT """ import json import httpx import re from typing import AsyncGenerator, Callable, Awaitable from pydantic import BaseModel, Field import asyncio class Pipe: class Valves(BaseModel): """ 插件配置参数类 (Valves). 包含 Think Model 和 Base Model 两组 API 参数配置。 """ think_model_DEEPSEEK_API_BASE_URL: str = Field( default="https://api.deepseek.com/v1", description="Think Model - DeepSeek API的基础请求地址", ) think_model_DEEPSEEK_API_KEY: str = Field( default="", description="Think Model - 用于身份验证的DeepSeek API密钥,可从控制台获取", ) think_model_DEEPSEEK_API_MODEL: str = Field( default="deepseek-reasoner", description="Think Model - API请求的模型名称,默认为 deepseek-reasoner (用于生成思维链)", ) base_model_OPENAI_API_BASE_URL: str = Field( default="https://api.openai.com/v1", description="Base Model - OpenAI API的基础请求地址, 也可以是其他OpenAI格式api", ) base_model_OPENAI_API_KEY: str = Field( default="", description="Base Model - 用于身份验证的API密钥,可从控制台获取" ) base_model_OPENAI_API_MODEL: str = Field( default="gpt-4o-mini", description="Base Model - API请求的模型名称,默认为 gpt-4o-mini (用于生成最终答案)", ) def __init__(self): """ Pipe 类的初始化方法. 初始化配置参数, 数据前缀, 思考状态, 以及事件发射器. """ self.valves = self.Valves() # 初始化配置参数 self.data_prefix = "data: " # SSE 数据流前缀 self.thinking = -1 # -1:未开始思考 0:思考中 1:已回答 (思考状态机) self.emitter = None # 事件发射器 (用于 OpwenWebUI 插件框架) def pipes(self): """ 定义插件管道信息. 返回插件支持的管道列表,这里使用 Base Model 的模型 ID 作为管道 ID 和名称. """ return [ { "id": self.valves.base_model_OPENAI_API_MODEL, "name": self.valves.base_model_OPENAI_API_MODEL, } ] async def pipe( self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None ) -> AsyncGenerator[str, None]: """ 主处理管道函数 (核心逻辑). 处理用户请求,依次调用 Think Model 获取思维链,再调用 Base Model 获取最终答案。 """ thinking_state = {"thinking": -1} # 使用字典来存储thinking状态 self.emitter = __event_emitter__ # 设置事件发射器 user_messages = body["messages"] # 保存用户的原始消息,后续传递给 Base Model # 验证 API 密钥配置 (Think Model 和 Base Model 密钥都需要配置) if not self.valves.think_model_DEEPSEEK_API_KEY: yield json.dumps( {"error": "未配置 Think Model API 密钥"}, ensure_ascii=False ) return if not self.valves.base_model_OPENAI_API_KEY: yield json.dumps( {"error": "未配置 Base Model API 密钥"}, ensure_ascii=False ) return # --------------------- 步骤 1: 请求 Think Model 获取思维链 --------------------- think_model_headers = { "Authorization": f"Bearer {self.valves.think_model_DEEPSEEK_API_KEY}", "Content-Type": "application/json", } think_model_payload = { **body, "model": self.valves.think_model_DEEPSEEK_API_MODEL, # 使用 Think Model } think_content = "" # 用于保存从 Think Model 获取的思维链内容 try: # Think Model Request 的 try 代码块开始 async with httpx.AsyncClient(http2=True) as client: # 使用 http2 优化连接 async with client.stream( # 使用 stream 方法获取 SSE 数据流 "POST", f"{self.valves.think_model_DEEPSEEK_API_BASE_URL}/chat/completions", # Think Model API Endpoint json=think_model_payload, headers=think_model_headers, timeout=300, # 设置超时时间 ) as response: if response.status_code != 200: # 检查 HTTP 状态码,非 200 表示错误 error = await response.aread() # 读取错误响应内容 yield self._format_error( response.status_code, error ) # 格式化错误信息并 yield return async for ( line ) in response.aiter_lines(): # 异步迭代 SSE 数据流的每一行 if not line.startswith(self.data_prefix): # 过滤非 data 行 continue json_str = line[len(self.data_prefix) :] # 截取 JSON 字符串 # 去除首尾空格后检查是否为结束标记 if json_str.strip() == "[DONE]": return try: data = json.loads(json_str) # 解析 JSON 数据 except json.JSONDecodeError as e: # JSON 解析错误处理 error_detail = f"解析失败 - 内容:{json_str},原因:{e}" yield self._format_error("JSONDecodeError", error_detail) return choice = data.get("choices", [{}])[ 0 ] # 获取 choices 列表中的第一个 choice if choice.get( "finish_reason" ): # 检查是否完成 (Think Model 思考完成) break # 结束 Think Model 的数据接收循环 # 状态机处理 state_output = await self._update_thinking_state( choice.get("delta", {}), thinking_state ) if state_output: # 如果状态发生变化,则 yield 状态标记 yield state_output if ( state_output == "<think>" ): # 如果是开始思考状态,yield 换行符 yield "\n" reasoning_content = choice.get("delta", {}).get( "reasoning_content", "" ) # 提取 reasoning_content if reasoning_content: # 如果提取到内容 if reasoning_content.startswith( "<think>" ): # 处理 thinking 开始标记 match = re.match(r"^<think>", reasoning_content) if match: reasoning_content = re.sub( r"^<think>", "", reasoning_content ) # 移除标记 yield "<think>" # yield 标记 await asyncio.sleep(0.1) # 适当延时 yield "\n" # yield 换行 elif reasoning_content.startswith( "</think>" ): # 处理 thinking 结束标记 match = re.match(r"^</think>", reasoning_content) if match: reasoning_content = re.sub( r"^</think>", "", reasoning_content ) # 移除标记 yield "</think>" # yield 标记 await asyncio.sleep(0.1) # 适当延时 yield "\n" # yield 换行 think_content += reasoning_content # 累加思维链内容 yield reasoning_content # <- 重要修改:这里仍然需要 yield 思维链内容,以便在 UI 上显示 except Exception as e: # Think Model Request 的 try 代码块异常处理 yield self._format_exception(e) # 格式化异常信息并 yield return finally: pass # Think Model 请求完成后的清理代码 (当前为空) # --------------------- 步骤 2: 请求 Base Model 获取最终答案 --------------------- base_model_headers = { "Authorization": f"Bearer {self.valves.base_model_OPENAI_API_KEY}", "Content-Type": "application/json", } # 将思维链内容添加到发送给 Base Model 的消息列表中 base_model_messages = user_messages + [ {"role": "assistant", "content": f"<think>\n{think_content}\n</think>"} ] # 构造新的消息列表,包含用户消息和思维链 base_model_payload = { **body, "model": self.valves.base_model_OPENAI_API_MODEL, # 使用 Base Model "messages": base_model_messages, # 使用包含思维链的消息列表 } try: # Base Model Request 的 try 代码块开始 async with httpx.AsyncClient(http2=True) as client: # 使用 http2 优化连接 async with client.stream( # 使用 stream 方法获取 SSE 数据流 "POST", f"{self.valves.base_model_OPENAI_API_BASE_URL}/chat/completions", # Base Model API Endpoint json=base_model_payload, headers=base_model_headers, timeout=300, # 设置超时时间 ) as response: if response.status_code != 200: # 检查 HTTP 状态码 error = await response.aread() # 读取错误响应 yield self._format_error( response.status_code, error ) # 格式化错误并 yield return async for line in response.aiter_lines(): # 异步迭代 SSE 数据流 if not line.startswith(self.data_prefix): # 过滤非 data 行 continue json_str = line[len(self.data_prefix) :] # 截取 JSON 字符串 try: data = json.loads(json_str) # 解析 JSON 数据 except json.JSONDecodeError as e: # JSON 解析错误处理 error_detail = f"解析失败 - 内容:{json_str},原因:{e}" yield self._format_error("JSONDecodeError", error_detail) return choice = data.get("choices", [{}])[ 0 ] # 获取 choices 列表中的第一个 choice if choice.get( "finish_reason" ): # 检查是否完成 (Base Model 回答完成) return # 结束 Base Model 的数据接收,整个 pipe 方法也结束 # Base Model 响应中不再有 thinking 状态,因此这里只需要处理 content content = self._process_content(choice["delta"]) # 提取 content if content: # 如果提取到内容,则 yield 内容 yield content except Exception as e: # Base Model Request 的 try 代码块异常处理 yield self._format_exception(e) # 格式化异常信息并 yield return finally: pass # Base Model 请求完成后的清理代码 (当前为空) async def _update_thinking_state(self, delta: dict, thinking_state: dict) -> str: """更新思考状态机(简化版)""" state_output = "" # 状态转换:未开始 -> 思考中 if thinking_state["thinking"] == -1 and delta.get("reasoning_content"): thinking_state["thinking"] = 0 state_output = "<think>" # 状态转换:思考中 -> 已回答 elif ( thinking_state["thinking"] == 0 and not delta.get("reasoning_content") and delta.get("content") ): thinking_state["thinking"] = 1 state_output = "\n</think>\n\n" return state_output def _process_content(self, delta: dict) -> str: """ 处理内容. 返回 content (最终答案内容). """ return delta.get("content", "") def _format_error(self, status_code: int, error: bytes) -> str: # 如果 error 已经是字符串,则无需 decode if isinstance(error, str): error_str = error else: error_str = error.decode(errors="ignore") try: err_msg = json.loads(error_str).get("message", error_str)[:200] except Exception as e: err_msg = error_str[:200] return json.dumps( {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False ) def _format_exception(self, e: Exception) -> str: """ 格式化异常信息. 将异常类型和异常信息格式化为 JSON 字符串返回. """ err_type = type(e).__name__ # 获取异常类型名 return json.dumps( {"error": f"{err_type}: {str(e)}"}, ensure_ascii=False ) # 返回 JSON 格式异常信息