We're Hiring!
Whitepaper
Docs
Sign In
Function
Function
pipe
v2.1
perplexity pipe
Last Updated
4 months ago
Created
4 months ago
Function ID
perplexity_pipe
Creator
@neuromechanist
Downloads
97+
Get
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with intelligence.
Description
Perplexity pipe with beautiful references
README
Function Code
Show
"""
title: Perplexity Function (Simplified)
author: Seyed Yahya Shirazi
author_url: neuromechanist.github.io
date: 2024-11-19
version: 2.1
license: MIT
description: A simplified function for generating text using the Perplexity API with citations.
requirements: aiohttp, pydantic
environment_variables: PERPLEXITY_API_KEY
"""

import os
import json
import asyncio
from typing import List, Union, Dict, Optional, AsyncIterator

from pydantic import BaseModel
import aiohttp
from open_webui.utils.misc import pop_system_message

# Endpoint and overall request timeout shared by the streaming and
# non-streaming code paths.
_PERPLEXITY_CHAT_URL = "https://api.perplexity.ai/chat/completions"
_REQUEST_TIMEOUT_SECONDS = 300

# Perplexity-specific request options that are forwarded only when the caller
# supplied them in the request body.
_OPTIONAL_PAYLOAD_KEYS = (
    "search_domain_filter",
    "return_images",
    "return_related_questions",
    "search_recency_filter",
)


class Pipe:
    """Manifold pipe that forwards chat requests to the Perplexity API.

    Responses are returned with a numbered "References" section built from
    the ``citations`` list found in the API response.
    """

    class Valves(BaseModel):
        # The header declares PERPLEXITY_API_KEY as an environment variable,
        # so use it as the default; a value set on the valve still overrides.
        PERPLEXITY_API_KEY: str = os.getenv("PERPLEXITY_API_KEY", "")

    def __init__(self):
        self.type = "manifold"
        self.id = "perplexity_simple"
        self.valves = self.Valves()

    def pipes(self) -> List[dict]:
        """Return the list of models exposed by this manifold."""
        return [
            {"id": "perplexity/sonar-reasoning", "name": "Perplexity Sonar Reasoning"},
            {"id": "perplexity/sonar-pro", "name": "Perplexity Sonar Pro"},
            {"id": "perplexity/sonar", "name": "Perplexity Sonar"},
        ]

    def format_response_with_citations(self, content: str, citations: List[str]) -> str:
        """Format the response by appending citations at the end.

        Args:
            content: The generated text.
            citations: Source URLs; entry ``i`` is rendered as ``[i+1] url``.

        Returns:
            ``content`` unchanged when ``citations`` is empty or ``None``,
            otherwise ``content`` followed by a "References:" section with
            one newline-terminated line per citation.
        """
        if not citations:
            return content
        # Content already contains [1], [2], etc. references
        reference_lines = "".join(
            f"[{i}] {url}\n" for i, url in enumerate(citations, 1)
        )
        return content + "\n\nReferences:\n" + reference_lines

    @staticmethod
    def _extract_text(content):
        """Return the plain text of a message ``content`` value.

        Non-list content is returned unchanged. For list (multi-part) content
        the ``"text"`` values of all parts that carry one are joined with
        newlines; parts without text (e.g. images) are skipped instead of
        raising ``KeyError``.
        """
        if not isinstance(content, list):
            return content
        return "\n".join(
            part["text"]
            for part in content
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        )

    async def _report_error(self, error_msg: str, event_emitter=None) -> str:
        """Emit ``error_msg`` as a finished status event and return it.

        The event is only sent when an emitter was provided.
        """
        if event_emitter:
            await event_emitter(
                {
                    "type": "status",
                    "data": {"description": error_msg, "done": True},
                }
            )
        return error_msg

    async def pipe(
        self, body: Dict, __event_emitter__=None
    ) -> Union[str, AsyncIterator[str]]:
        """Build the Perplexity request from ``body`` and dispatch it.

        Args:
            body: Chat request; ``messages`` is required. ``model``,
                ``stream``, the sampling parameters and the Perplexity
                options are optional.
            __event_emitter__: Optional async callable used to report errors
                as status events.

        Returns:
            An async iterator of text chunks when ``body["stream"]`` is
            truthy, otherwise the complete response text. On failure the
            error message is returned as a string.
        """
        if not self.valves.PERPLEXITY_API_KEY:
            return await self._report_error(
                "Error: PERPLEXITY_API_KEY is required", __event_emitter__
            )

        # Top-level boundary: any failure is reported to the caller as text.
        try:
            system_message, messages = pop_system_message(body["messages"])

            # Extract actual model name from "perplexity/model-name" format
            model_id = body.get("model", "perplexity/sonar").split("/")[-1]

            # Add system message as the first message if present
            processed_messages = []
            if system_message:
                # NOTE(review): pop_system_message appears to return the whole
                # message dict rather than its text; send only the content so
                # the dict repr does not end up as the prompt. Other values
                # are stringified as before.
                if isinstance(system_message, dict):
                    system_text = self._extract_text(
                        system_message.get("content", "")
                    )
                else:
                    system_text = system_message
                processed_messages.append(
                    {"role": "system", "content": str(system_text)}
                )

            # Process remaining messages
            processed_messages.extend(
                {
                    "role": message["role"],
                    "content": self._extract_text(message["content"]),
                }
                for message in messages
            )

            # Prepare payload with Perplexity-specific parameters
            payload = {
                "model": model_id,
                "messages": processed_messages,
                "max_tokens": body.get("max_tokens", None),
                "temperature": body.get("temperature", 0.2),
                "top_p": body.get("top_p", 0.9),
                "top_k": body.get("top_k", 0),
                "stream": body.get("stream", False),
                "presence_penalty": body.get("presence_penalty", 0),
                "frequency_penalty": body.get("frequency_penalty", 1),
            }

            # Add Perplexity-specific features if specified
            payload.update(
                {key: body[key] for key in _OPTIONAL_PAYLOAD_KEYS if key in body}
            )

            headers = {
                "Authorization": f"Bearer {self.valves.PERPLEXITY_API_KEY}",
                "Content-Type": "application/json",
            }

            if body.get("stream", False):
                return self._stream_response(headers, payload, __event_emitter__)
            return await self._get_completion(headers, payload, __event_emitter__)
        except Exception as e:
            return await self._report_error(f"Error: {str(e)}", __event_emitter__)

    async def _stream_response(
        self, headers: dict, payload: dict, __event_emitter__=None
    ) -> AsyncIterator[str]:
        """Stream a completion, yielding text chunks and then the references.

        Lines of the response body that start with ``data: `` are parsed as
        JSON. The most recent ``citations`` list seen is remembered and
        rendered once after generation ends, whatever the finish reason.
        HTTP and transport errors are yielded as a single error message.
        """
        try:
            async with aiohttp.ClientSession() as session:
                timeout = aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT_SECONDS)
                async with session.post(
                    _PERPLEXITY_CHAT_URL,
                    headers=headers,
                    json=payload,
                    timeout=timeout,
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        yield await self._report_error(
                            f"Error: HTTP {response.status}: {error_text}",
                            __event_emitter__,
                        )
                        return

                    citations = None
                    async for line in response.content:
                        if not line.startswith(b"data: "):
                            continue
                        line_text = line[6:].decode("utf-8").strip()
                        if line_text == "[DONE]":
                            break
                        try:
                            data = json.loads(line_text)
                        except json.JSONDecodeError:
                            continue
                        if not isinstance(data, dict):
                            continue

                        if "citations" in data:
                            citations = data["citations"]

                        choices = data.get("choices") or []
                        if not choices:
                            continue
                        choice = choices[0]

                        # Emit any text carried by this chunk, including the
                        # one that also reports the finish reason.
                        delta_text = (choice.get("delta") or {}).get("content")
                        if delta_text:
                            yield delta_text

                        if choice.get("finish_reason") is not None:
                            break

                    # Append references regardless of why generation stopped
                    # ("stop", "length", or the stream simply ending).
                    if citations:
                        yield self.format_response_with_citations("", citations)
        except Exception as e:
            yield await self._report_error(
                f"Stream error: {str(e)}", __event_emitter__
            )

    async def _get_completion(
        self, headers: dict, payload: dict, __event_emitter__=None
    ) -> str:
        """Request a non-streaming completion and return it with references.

        Returns:
            The first choice's message content followed by the formatted
            citations, or an error message when the request fails.
        """
        try:
            async with aiohttp.ClientSession() as session:
                timeout = aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT_SECONDS)
                async with session.post(
                    _PERPLEXITY_CHAT_URL,
                    headers=headers,
                    json=payload,
                    timeout=timeout,
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        return await self._report_error(
                            f"Error: HTTP {response.status}: {error_text}",
                            __event_emitter__,
                        )

                    result = await response.json()
                    content = result["choices"][0]["message"]["content"]
                    citations = result.get("citations", [])
                    return self.format_response_with_citations(content, citations)
        except Exception as e:
            return await self._report_error(
                f"Completion error: {str(e)}", __event_emitter__
            )