"""
title: Perplexity 联网查询
author: konbakuyomu
author_url: https://linux.do/t/topic/395819
description: Use the most powerful Perplexity API for online queries.(使用目前最强大的 Perplexity 的api进行联网查询)
version: 1.0.4
licence: MIT
"""
import json
import httpx
import re
import traceback
import asyncio
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
from urllib.parse import urlparse
# Pre-compiled once at module load; DOTALL lets the title span newlines.
TITLE_REGEX = re.compile(r"<title>(.*?)</title>", re.IGNORECASE | re.DOTALL)


def fetch_page_title(url: str, timeout: float = 10.0) -> str:
    """Fetch *url* and return the contents of its <title> tag.

    Returns "" on any failure: network error, non-200 final status, or a
    page without a <title>. NOTE: this is a blocking call — callers on an
    event loop should dispatch it to a worker thread.
    """
    try:
        # follow_redirects is required: many citation URLs 301/302 to
        # https/www variants, and a redirect response has no usable title.
        resp = httpx.get(url, timeout=timeout, follow_redirects=True)
        if resp.status_code != 200:
            return ""
        match = TITLE_REGEX.search(resp.text)
        if match:
            return match.group(1).strip()
    except Exception as e:
        print(f"请求或解析页面时出错: {e}")
    return ""
def extract_domain(url: str) -> str:
    """Return the network-location part (host[:port]) of *url*, or "" on failure."""
    try:
        parsed = urlparse(url)
        return parsed.netloc or ""
    # urlparse raises ValueError on malformed input (e.g. bad IPv6 brackets);
    # a bare except here would also swallow KeyboardInterrupt/SystemExit.
    except (ValueError, AttributeError):
        return ""
class Pipe:
    """Open WebUI pipe that forwards chat requests to the Perplexity API.

    Streams the SSE response back to the UI, rewrites numeric citation
    markers (``[n]``) into Markdown links, and appends a collapsible
    reference section listing every cited URL with its page title.
    """

    class Valves(BaseModel):
        # Admin-configurable settings surfaced in the Open WebUI valve panel.
        PERPLEXITY_API_BASE_URL: str = Field(
            default="https://api.perplexity.ai", description="Perplexity 的基础请求地址"
        )
        PERPLEXITY_API_TOKEN: str = Field(
            default="", description="用于访问 Perplexity 的 Bearer Token"
        )
        PERPLEXITY_MODEL: str = Field(
            default="sonar-reasoning",
            description=(
                "可选的模型名称,必须是以下之一:"
                "r1-1776, "
                "sonar-reasoning-pro, sonar-reasoning, sonar-pro, sonar, "
                "llama-3.1-sonar-small-128k-online, "
                "llama-3.1-sonar-large-128k-online, "
                "llama-3.1-sonar-huge-128k-online"
            ),
        )

    # Model names accepted by this pipe; anything else is rejected before
    # a request is sent.
    VALID_MODELS = [
        "r1-1776",
        "sonar-reasoning-pro",
        "sonar-reasoning",
        "sonar-pro",
        "sonar",
        "llama-3.1-sonar-small-128k-online",
        "llama-3.1-sonar-large-128k-online",
        "llama-3.1-sonar-huge-128k-online",
    ]

    def __init__(self):
        self.valves = self.Valves()
        self.data_prefix = "data: "  # SSE payload line prefix
        self.emitter = None  # status-event callback, bound per request
        self.current_citations = []  # citation URLs from the latest stream
        # Chain-of-thought state: -1 not started, 0 thinking, 1 finished.
        self.thinking_state = {"thinking": -1}

    def pipes(self):
        # Single pseudo-model entry shown in the Open WebUI model picker.
        return [
            {
                "id": self.valves.PERPLEXITY_API_BASE_URL,
                "name": self.valves.PERPLEXITY_MODEL,
            }
        ]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """Stream a chat completion from Perplexity for the given request body.

        Yields answer text chunks (citation markers linkified), then a
        Markdown reference list; on failure yields a single JSON-encoded
        ``{"error": ...}`` string and stops.
        """
        self.emitter = __event_emitter__
        self.current_citations = []
        # Reset per request: a previous run that raised mid-stream would
        # otherwise leave stale chain-of-thought state behind.
        self.thinking_state = {"thinking": -1}

        # (A) A Bearer token is mandatory.
        if not self.valves.PERPLEXITY_API_TOKEN:
            yield json.dumps(
                {"error": "未配置 Perplexity API Token"}, ensure_ascii=False
            )
            return

        # (B) Reject unknown model names before issuing a request.
        if self.valves.PERPLEXITY_MODEL not in self.VALID_MODELS:
            err_msg = (
                f"模型 '{self.valves.PERPLEXITY_MODEL}' 不在可选范围:"
                f"{', '.join(self.VALID_MODELS)}"
            )
            yield json.dumps({"error": err_msg}, ensure_ascii=False)
            return

        # (C) Strip image URLs from the trailing user message.
        self._inject_image_if_any(body)
        # (D) Insert placeholders so message roles strictly alternate.
        self._process_messages(body)

        payload = {**body}
        payload["model"] = self.valves.PERPLEXITY_MODEL
        payload.setdefault("stream", True)

        headers = {
            "Authorization": f"Bearer {self.valves.PERPLEXITY_API_TOKEN}",
            "Content-Type": "application/json",
        }
        url = f"{self.valves.PERPLEXITY_API_BASE_URL}/chat/completions"

        try:
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST", url, json=payload, headers=headers, timeout=120
                ) as response:
                    if response.status_code != 200:
                        error_content = await response.aread()
                        yield self._format_error(response.status_code, error_content)
                        return

                    # Connected — the answer is being generated.
                    await self._emit_status("AI回答正在生成中……", done=False)

                    first_chunk = True
                    # Consume the SSE stream line by line.
                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue
                        data_str = line[len(self.data_prefix):].strip()
                        if not data_str:
                            continue
                        try:
                            data = json.loads(data_str)
                        except json.JSONDecodeError:
                            # Skip non-JSON frames such as "[DONE]".
                            continue

                        if "citations" in data:
                            self.current_citations = data["citations"]

                        # "or [{}]" also guards an *empty* choices list,
                        # which the dict default alone would not.
                        choice = (data.get("choices") or [{}])[0]
                        delta = choice.get("delta", {})

                        # Emit a separator once reasoning content starts.
                        state_output = await self._update_thinking_state(
                            delta, self.thinking_state
                        )
                        if state_output:
                            yield state_output

                        delta_text = delta.get("content") or ""
                        if delta_text:
                            # First answer chunk: update the status line.
                            if first_chunk:
                                await self._emit_status(
                                    "Perplexity 联网查询中,请稍候…", done=False
                                )
                                first_chunk = False
                            # Linkify [n] markers against current citations.
                            yield self._transform_references(
                                delta_text, self.current_citations
                            )

                    await self._emit_status("正在获取参考网站标题,请稍候…", done=False)

                    # Title fetching does blocking HTTP per citation — run it
                    # off the event loop so other requests are not stalled.
                    if self.current_citations:
                        yield await asyncio.to_thread(
                            self._format_references, self.current_citations
                        )

                    await self._emit_status("✅ Perplexity生成结束", done=True)

                    # Reset chain-of-thought state for the next request.
                    self.thinking_state = {"thinking": -1}
        except Exception as e:
            traceback.print_exc()
            yield self._format_exception(e)

    async def _emit_status(self, description: str, done: bool) -> None:
        """Send a status event to the UI, if an emitter was provided."""
        if self.emitter:
            await self.emitter(
                {
                    "type": "status",
                    "data": {"description": description, "done": done},
                }
            )

    def _inject_image_if_any(self, payload: dict) -> None:
        """Strip image URLs from the trailing user message, in place."""
        messages = payload.get("messages", [])
        if not messages:
            return
        last_msg = messages[-1]
        if last_msg.get("role") != "user":
            return
        content_str = last_msg.get("content", "")
        if not isinstance(content_str, str):
            # Non-string (e.g. multimodal list) content is left untouched.
            return
        last_msg["content"] = re.sub(
            r"(https?://[^\s]+?\.(?:png|jpg|jpeg|gif|bmp|tiff|webp))",
            "",
            content_str,
            flags=re.IGNORECASE,
        ).strip()

    def _process_messages(self, payload: dict) -> None:
        """Insert stub messages so roles strictly alternate, in place.

        Between any two consecutive messages with the same role, a
        "[Unfinished thinking]" stub of the opposite role is inserted.
        """
        messages = payload.get("messages", [])
        i = 0
        while i < len(messages) - 1:
            if messages[i]["role"] == messages[i + 1]["role"]:
                alternate_role = (
                    "assistant" if messages[i]["role"] == "user" else "user"
                )
                messages.insert(
                    i + 1,
                    {"role": alternate_role, "content": "[Unfinished thinking]"},
                )
                i += 1  # skip the stub just inserted
            i += 1

    async def _update_thinking_state(self, delta: dict, thinking_state: dict) -> str:
        """Advance chain-of-thought state; return text to emit (may be "")."""
        state_output = ""
        # The first reasoning_content chunk marks the start of "thinking".
        if thinking_state["thinking"] == -1 and delta.get("reasoning_content"):
            thinking_state["thinking"] = 0
            state_output = "\n\n"
        return state_output

    def _transform_references(self, text: str, citations: list) -> str:
        """Rewrite bare ``[n]`` markers into Markdown links to citation n."""

        def _replace_one(m: re.Match) -> str:
            idx_str = m.group(1)
            idx = int(idx_str)
            if 1 <= idx <= len(citations):
                return f"[[{idx_str}]]({citations[idx - 1]})"
            # Out-of-range index: keep the marker, but no link target.
            return f"[[{idx_str}]]"

        return re.sub(r"\[(\d+)\]", _replace_one, text)

    def _format_references(self, citations: list) -> str:
        """Render citations as a collapsible Markdown list with page titles."""
        if not citations:
            return ""
        lines = ["\n\n<details>\n<summary>参考网站</summary>"]
        for i, url in enumerate(citations, 1):
            page_title = fetch_page_title(url)
            if page_title:
                lines.append(f"{i}: [{page_title}]({url})")
            else:
                # No title available — fall back to the domain name.
                domain = extract_domain(url) or "unknown"
                lines.append(f"{i}: [新闻来源: {domain}]({url})")
        lines.append("</details>")
        return "\n".join(lines)

    def _format_error(self, status_code: int, error: bytes) -> str:
        """Build a JSON error string from a non-200 HTTP response body."""
        try:
            err_msg = json.loads(error).get(
                "message", error.decode(errors="ignore")
            )[:200]
        except Exception:
            # Body was not a JSON object — fall back to raw (truncated) text.
            err_msg = error.decode(errors="ignore")[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """Serialize an exception as a JSON error string."""
        return json.dumps(
            {"error": f"{type(e).__name__}: {str(e)}"}, ensure_ascii=False
        )