Whitepaper
Docs
Sign In
Function
Function
pipe
v0.2.0
Perplexity with Titled Citations and Thinking
Function ID
perplexity_with_titled_citations_and_thinking
Creator
@rosselliot
Downloads
50+
Perplexity with Titled Citations
Get
README
No README available
Function Code
Show
""" title: Perplexity with Titled Citations and Thinking version: 0.2.0 license: MIT Includes all 5 Sonar models from Perplexity. Deep Research does not return citations. I'm not sure why, and there's very little documentation from Perplexity to help figure it out. Chats from the other Sonar models will include citations at the end. Citations will be linked and include the page title, so they look nice. Thinking for the reasoning models is also included and displays natively. 0.2.0 Removed the "Return_images: true" because it doesn't work for all usage tiers and is unnecessary for this function. """ import json import logging from typing import Generator, Iterator, Union import asyncio import aiohttp from open_webui.utils.misc import pop_system_message from pydantic import BaseModel, Field from bs4 import BeautifulSoup logger = logging.getLogger(__name__) class Pipe: class Valves(BaseModel): NAME_PREFIX: str = Field( default="perplexity/", description="The prefix applied before the model names.", ) PERPLEXITY_API_BASE_URL: str = Field( default="https://api.perplexity.ai", description="The base URL for Perplexity API endpoints.", ) PERPLEXITY_API_KEY: str = Field( default="", description="Required API key to access Perplexity services.", ) def __init__(self): self.type = "manifold" self.valves = self.Valves() def pipes(self): return [ { "id": "perplexity/sonar", "name": f"{self.valves.NAME_PREFIX}Sonar", }, { "id": "perplexity/sonar-pro", "name": f"{self.valves.NAME_PREFIX}Sonar Pro", }, { "id": "perplexity/sonar-reasoning", "name": f"{self.valves.NAME_PREFIX}Sonar Reasoning", }, { "id": "perplexity/sonar-reasoning-pro", "name": f"{self.valves.NAME_PREFIX}Sonar Reasoning Pro", }, { "id": "perplexity/sonar-deep-research", "name": f"{self.valves.NAME_PREFIX}Sonar Deep Research", }, ] async def pipe(self, body: dict, __user__: dict) -> Union[str, Generator, Iterator]: logger.debug(f"pipe:{__name__}") if not self.valves.PERPLEXITY_API_KEY: raise Exception("PERPLEXITY_API_KEY not provided in the valves.") headers = { "Authorization": f"Bearer {self.valves.PERPLEXITY_API_KEY}", "Content-Type": "application/json", "accept": "application/json", } system_message, messages = pop_system_message(body.get("messages", [])) system_prompt = "You are a helpful assistant." if system_message is not None: system_prompt = system_message["content"] model_id = body["model"] # Remove any custom prefixes that might be added by your implementation if model_id.startswith("perplexity_citations_manifold."): model_id = model_id[len("perplexity_citations_manifold.") :] elif model_id.startswith("perplexity_with_titled_citations."): model_id = model_id[len("perplexity_with_titled_citations.") :] # Remove the NAME_PREFIX from the model ID if it exists if model_id.startswith(self.valves.NAME_PREFIX): model_id = model_id[len(self.valves.NAME_PREFIX) :] payload = { "model": model_id, "messages": [{"role": "system", "content": system_prompt}, *messages], "stream": body.get("stream", True), "return_citations": True, } url = f"{self.valves.PERPLEXITY_API_BASE_URL}/chat/completions" try: if body.get("stream", False): return self.stream_response(url, headers, payload) else: return await self.non_stream_response(url, headers, payload) except aiohttp.ClientError as e: logger.error(f"Request failed: {e}") return f"Error: Request failed: {e}" except Exception as e: logger.error(f"Error in pipe method: {e}") return f"Error: {e}" async def fetch_page_title(self, url: str, session: aiohttp.ClientSession) -> str: """Fetch webpage title for a given URL.""" try: timeout = aiohttp.ClientTimeout( total=5 ) # 5 second timeout to avoid slowing down responses async with session.get( url, timeout=timeout, allow_redirects=True ) as response: if response.status != 200: return self.extract_domain(url) # Fallback to domain html = await response.text() soup = BeautifulSoup(html, "html.parser") title = soup.title if title and title.string: # Clean up and truncate title if needed clean_title = title.string.strip() if len(clean_title) > 100: # Limit title length clean_title = clean_title[:97] + "..." return clean_title else: return self.extract_domain(url) except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeDecodeError) as e: logger.debug(f"Could not fetch title for {url}: {e}") return self.extract_domain(url) # Fallback to domain def extract_domain(self, url: str) -> str: """Extract domain from URL.""" try: from urllib.parse import urlparse return urlparse(url).netloc except: return url async def format_citations_with_titles( self, citations: list, session: aiohttp.ClientSession ) -> str: """Format citations in markdown with page titles.""" if not citations: return "" # Fetch titles for all citations concurrently titles = await asyncio.gather( *[self.fetch_page_title(url, session) for url in citations] ) citations_list = [ f"[\[{i+1}\] {title}]({url})" for i, (url, title) in enumerate(zip(citations, titles)) ] return "\n\n**References:**\n" + "\n".join(citations_list) async def stream_response(self, url, headers, payload): try: async with aiohttp.ClientSession() as session: async with session.post( url, headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=600), ) as response: if response.status != 200: text = await response.text() raise Exception(f"HTTP Error {response.status}: {text}") data = None citations = None async for line in response.content: line = line.decode("utf-8").strip() if line and line.startswith("data: "): try: data = json.loads(line[6:]) if citations is None and "citations" in data: citations = data["citations"] yield data["choices"][0]["delta"]["content"] except json.JSONDecodeError: logger.error(f"Failed to parse JSON: {line}") except KeyError as e: logger.error(f"Unexpected data structure: {e}") logger.error(f"Full data: {data}") if citations: citations_text = await self.format_citations_with_titles( citations, session ) yield citations_text except aiohttp.ClientError as e: logger.error(f"Request failed: {e}") yield f"Error: Request failed: {e}" except Exception as e: logger.error(f"General error in stream_response method: {e}", exc_info=True) yield f"Error: {str(e)}" async def non_stream_response(self, url, headers, payload): try: async with aiohttp.ClientSession() as session: async with session.post( url, headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=60), ) as response: if response.status != 200: text = await response.text() raise Exception(f"HTTP Error {response.status}: {text}") res = await response.json() citations = res.get("citations", []) content = res["choices"][0]["message"]["content"] if citations: citations_text = await self.format_citations_with_titles( citations, session ) return content + citations_text return content except aiohttp.ClientError as e: logger.error(f"Failed non-stream request: {e}") return f"Error: {e}" except Exception as e: logger.error(f"General error in non_stream_response method: {e}") return f"Error: {e}"