"""
title: DeepSeek R1
author: zgccrui
reviser 1: KinglyWayne
reviser 2: drudream
reviser 3: WolfPan 2025-02-16
description: 在OpenWebUI中显示DeepSeek R1模型的思维链 - 仅支持0.5.6及以上版本, 修复deepseek r1生成内容缺少开头<think>标签的问题, 修复联网搜索和生成标题出错的问题(通过在非流式传输时不添加<think>标签来实现)。
version: 1.3.5
licence: MIT
"""
import asyncio
import json
import traceback
from typing import AsyncGenerator, Callable, Awaitable
import httpx
from pydantic import BaseModel, Field
# States of the streaming think-tag state machine used by Pipe._handle_stream:
# no reasoning seen yet, inside the reasoning section, reasoning section closed.
THINKING_STATE_BEFORE, THINKING_STATE_INNER, THINKING_STATE_AFTER = 1, 2, 3
# Tags wrapped around the reasoning text so the chain of thought can be displayed.
THINKING_TAG_START, THINKING_TAG_END = "<think>", "</think>"
class Pipe:
    """Open WebUI pipe that proxies chat requests to the DeepSeek API.

    Streaming responses have the model's ``reasoning_content`` wrapped in
    ``<think>`` ... ``</think>`` so the chain of thought can be displayed.
    Non-streaming responses return only the final ``content``, without think
    tags (the module header notes this is what fixes title generation and
    web search).
    """

    class Valves(BaseModel):
        """User-configurable settings for this pipe."""

        DEEPSEEK_API_BASE_URL: str = Field(
            default="https://api.deepseek.com/v1",
            description="DeepSeek API的基础请求地址",
        )
        DEEPSEEK_API_KEY: str = Field(
            default="", description="用于身份验证的DeepSeek API密钥,可从控制台获取"
        )
        DEEPSEEK_API_MODEL: str = Field(
            default="deepseek-reasoner",
            description="API请求的模型名称,默认为 deepseek-reasoner ",
        )
        LOCAL_MODEL_NAME: str = Field(
            default="deepseek-reasoner",
            description="本地的模型名称,默认为 deepseek-reasoner ",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Prefix of payload lines in the Server-Sent Events stream.
        self.data_prefix = "data:"

    def pipes(self):
        """List the single model exposed by this pipe.

        The model id is ``LOCAL_MODEL_NAME``; the displayed name is
        ``DEEPSEEK_API_MODEL``.
        """
        return [
            {
                "id": self.valves.LOCAL_MODEL_NAME,
                "name": self.valves.DEEPSEEK_API_MODEL,
            }
        ]

    @staticmethod
    def _interleave_roles(messages: list) -> list:
        """Return a copy of *messages* in which no two neighbours share a role.

        A placeholder message with the opposite role is inserted between each
        pair of consecutive same-role messages. Presumably this is needed
        because deepseek-reasoner rejects successive same-role messages
        (confirm against the API docs). The input list is not modified.

        Args:
            messages: Chat messages, each a dict with at least a ``"role"`` key.

        Returns:
            A new list containing the original message dicts plus placeholders.
        """
        interleaved: list = []
        for message in messages:
            if interleaved and interleaved[-1]["role"] == message["role"]:
                alternate_role = "assistant" if message["role"] == "user" else "user"
                interleaved.append(
                    {"role": alternate_role, "content": "[Unfinished thinking]"}
                )
            interleaved.append(message)
        return interleaved

    async def process_payload(self, body: dict) -> AsyncGenerator[str, None]:
        """Send *body* to the DeepSeek chat-completions endpoint and yield output.

        Args:
            body: The OpenAI-style request body received from Open WebUI.

        Yields:
            Text chunks from the streaming or non-streaming handler, selected
            by ``body["stream"]``.
        """
        payload = {
            **body,
            # The model id Open WebUI sends is only this pipe's local id (a
            # prefixed form of LOCAL_MODEL_NAME); the upstream request must use
            # the configured API model name.
            "model": self.valves.DEEPSEEK_API_MODEL,
            # Build a fresh list so the caller's body is not mutated.
            "messages": self._interleave_roles(body.get("messages") or []),
        }
        base_url = self.valves.DEEPSEEK_API_BASE_URL.rstrip("/")
        url = f"{base_url}/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.valves.DEEPSEEK_API_KEY}",
            "Content-Type": "application/json",
        }
        params = dict(url=url, json=payload, headers=headers, timeout=300)
        if payload.get("stream", False):
            handler = self._handle_stream
        else:
            handler = self._handle_non_stream
        async with httpx.AsyncClient(http2=True) as client:
            async for item in handler(client, params):
                yield item

    async def start_think(self):
        """Yield the opening think tag followed by a newline.

        Each part is preceded by a 0.1 s sleep; presumably this lets the UI
        render the tag as a separate chunk (TODO confirm).
        """
        for part in (THINKING_TAG_START, "\n"):
            await asyncio.sleep(0.1)
            yield part

    async def end_think(self):
        """Yield a newline, the closing think tag, and two trailing newlines.

        Each part is preceded by a 0.1 s sleep, mirroring ``start_think``.
        """
        for part in ("\n", THINKING_TAG_END, "\n", "\n"):
            await asyncio.sleep(0.1)
            yield part

    async def _handle_stream(
        self, client: httpx.AsyncClient, params: dict
    ) -> AsyncGenerator[str, None]:
        """Stream an SSE response, wrapping reasoning deltas in think tags.

        A small state machine tracks whether we are before, inside, or after
        the reasoning section. If the stream ends while still inside the
        reasoning section (e.g. a length cut-off), the think tag is closed so
        the output stays well-formed.

        Args:
            client: An open httpx client.
            params: Keyword arguments for the POST request.

        Yields:
            Reasoning text, content text, tag fragments, or a formatted error.
        """
        async with client.stream("POST", **params) as response:
            if response.status_code != 200:
                error = await response.aread()
                yield self._format_error(response.status_code, error)
                return
            thinking_state = THINKING_STATE_BEFORE
            async for line in response.aiter_lines():
                # Skip keep-alive comments and any non-data SSE fields.
                if not line.startswith(self.data_prefix):
                    continue
                data_str = line[len(self.data_prefix) :].strip()
                if data_str == "[DONE]":
                    break
                try:
                    data = json.loads(data_str)
                except json.JSONDecodeError as e:
                    error_detail = f"JSONDecodeError, {e}: {line}"
                    yield self._format_error(500, error_detail)
                    return
                choice = (data.get("choices") or [{}])[0]
                delta = choice.get("delta") or {}
                # Either field may be absent or null in a given delta.
                reasoning: str = delta.get("reasoning_content") or ""
                content: str = delta.get("content") or ""
                if thinking_state == THINKING_STATE_BEFORE and reasoning:
                    reasoning = reasoning.removeprefix(THINKING_TAG_START)
                    thinking_state = THINKING_STATE_INNER
                    async for part in self.start_think():
                        yield part
                elif (
                    thinking_state == THINKING_STATE_INNER and not reasoning and content
                ):
                    thinking_state = THINKING_STATE_AFTER
                    content = content.removeprefix(THINKING_TAG_END)
                    async for part in self.end_think():
                        yield part
                text = reasoning or content
                if text:
                    yield text
                # The finishing chunk's delta has been emitted above; stop now.
                if choice.get("finish_reason"):
                    break
            if thinking_state == THINKING_STATE_INNER:
                async for part in self.end_think():
                    yield part

    async def _handle_non_stream(
        self, client: httpx.AsyncClient, params: dict
    ) -> AsyncGenerator[str, None]:
        """Send a single request and yield only the final answer text.

        Reasoning is intentionally not included and no think tags are added.

        Args:
            client: An open httpx client.
            params: Keyword arguments for the POST request.

        Yields:
            The message content (``""`` if absent), or a formatted error.
        """
        response = await client.post(**params)
        if response.status_code != 200:
            yield self._format_error(response.status_code, response.content)
            return
        data = response.json()
        choice = (data.get("choices") or [{}])[0]
        yield (choice.get("message") or {}).get("content") or ""

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """Entry point called by Open WebUI (no output buffering).

        Transport failures are converted into user-visible error messages
        instead of propagating.

        Args:
            body: The request body from Open WebUI.
            __event_emitter__: Event callback supplied by Open WebUI (unused).

        Yields:
            Response text chunks or an error message.
        """
        try:
            if not self.valves.DEEPSEEK_API_KEY:
                yield "[Error] API key not configured."
                return
            async for result in self.process_payload(body):
                yield result
        except httpx.TimeoutException:
            # Covers connect, read, write and pool timeouts, not just reads.
            yield "[Error] Request to DeepSeek service timed out, check network connection or try later"
        except httpx.ConnectError:
            error_msg = [
                "[Error] Network connection failed between services. Possible causes:",
                "1. Network firewall / connectivity issues",
                "2. Incorrect API endpoint configuration",
                "3. DeepSeek service temporarily unavailable",
            ]
            yield "\n".join(error_msg)
        except Exception as e:
            tb_str = "".join(traceback.TracebackException.from_exception(e).format())
            yield f"{type(e).__name__}: {str(e)}\n{tb_str}"

    def _format_error(self, status_code: int, error: bytes | str) -> str:
        """Render an HTTP error as ``HTTP <status>: <message>``.

        If *error* is a JSON object, the message is taken from
        ``{"error": {"message": ...}}``, ``{"error": "..."}`` or
        ``{"message": ...}``, in that order. Otherwise the raw text is used.
        The message is truncated to 200 characters.

        Args:
            status_code: HTTP status code to report.
            error: Response body, raw bytes or already-decoded text.

        Returns:
            The formatted error string.
        """
        if isinstance(error, bytes):
            error = error.decode(errors="ignore")
        try:
            parsed = json.loads(error)
        except ValueError:
            parsed = None
        message = error
        if isinstance(parsed, dict):
            nested = parsed.get("error")
            candidates = (
                nested.get("message") if isinstance(nested, dict) else nested,
                parsed.get("message"),
            )
            message = next(
                (c for c in candidates if isinstance(c, str) and c), error
            )
        return f"HTTP {status_code}: {message[:200]}"