Whitepaper
Docs
Sign In
Function
Function
pipe
v0.2.1
Perplexity Manifold Pipe
Function ID
perplexity_manifold_pipe
Creator
@codex419
Downloads
49+
Up-to-date Perplexity manifold pipe, including Perplexity's Sonar Deep Research and r1-1776 models.
Get
README
No README available
Function Code
Show
"""
title: Perplexity Manifold Pipe
author: codex419 nikolaskn, justinh-rahb, moblangeois
author_url: https://github.com/open-webui
funding_url: https://github.com/open-webui
version: 0.2.1
license: MIT
"""

import json
import time
from typing import Union, Generator, Iterator

import requests
from pydantic import BaseModel, Field

from open_webui.utils.misc import pop_system_message

# (connect, read) timeouts in seconds used for every request to the API.
_REQUEST_TIMEOUT = (3.05, 60)

# Single source of truth for the supported models: (model id, display label).
# Both the model listing and the request validation are derived from this
# table so the two can never drift apart.
_MODELS = (
    ("sonar-reasoning-pro", "Sonar Reasoning Pro (127k Context)"),
    ("sonar-reasoning", "Sonar Reasoning (127k Context)"),
    ("sonar-pro", "Sonar Pro (200k Context)"),
    ("sonar", "Sonar (127k Context)"),
    ("sonar-deep-research", "Sonar Deep Research (128k Context)"),
    ("r1-1776", "r1-1776 (128k Context)"),
)
_SUPPORTED_MODEL_IDS = frozenset(model_id for model_id, _ in _MODELS)


def _format_citations(citations) -> str:
    """Render citations as a numbered list, one ``[n] cite`` entry per line.

    Returns an empty string when ``citations`` is empty or ``None``.
    """
    if not citations:
        return ""
    return "\n".join(
        f"[{number}] {cite}" for number, cite in enumerate(citations, start=1)
    )


class Pipe:
    """Manifold pipe that forwards chat requests to the Perplexity API."""

    class Valves(BaseModel):
        """User-configurable settings for the pipe."""

        NAME_PREFIX: str = Field(
            default="Perplexity/",
            description="The prefix applied before the model names.",
        )
        PERPLEXITY_API_BASE_URL: str = Field(
            default="https://api.perplexity.ai",
            description="The base URL for Perplexity API endpoints.",
        )
        PERPLEXITY_API_KEY: str = Field(
            default="",
            description="Required API key to access Perplexity services.",
        )

    def __init__(self):
        self.type = "manifold"
        self.valves = self.Valves()

    def pipes(self):
        """Return the supported models as ``{"id", "name"}`` dictionaries.

        Each name is the display label prefixed with ``Valves.NAME_PREFIX``.
        """
        prefix = self.valves.NAME_PREFIX
        return [
            {"id": model_id, "name": f"{prefix}{label}"}
            for model_id, label in _MODELS
        ]

    def _resolve_model_id(self, requested: str) -> str:
        """Strip the display prefix and any dotted qualifier from a model id.

        The original code removed only a literal ``perplexity.`` qualifier.
        No supported model id contains a ".", so everything up to the last
        "." can only be a qualifier and is dropped.
        NOTE(review): presumably the host prefixes the model id with the
        function id (e.g. ``perplexity_manifold_pipe.sonar``) - confirm.
        """
        model_id = requested
        prefix = self.valves.NAME_PREFIX
        if prefix and model_id.startswith(prefix):
            model_id = model_id[len(prefix):]
        return model_id.rpartition(".")[2]

    def pipe(self, body: dict, __user__: dict) -> Union[str, Generator, Iterator]:
        """Build the Perplexity request from ``body`` and dispatch it.

        Args:
            body: Chat request. Reads ``model`` (required), ``messages``,
                ``stream`` and the optional ``temperature``, ``top_p`` and
                ``max_tokens`` parameters.
            __user__: Accepted for interface compatibility; not used here.

        Returns:
            A generator of text chunks when ``body["stream"]`` is truthy,
            otherwise the complete answer as a string. Failures while
            performing the request are returned as an ``"Error: ..."`` string.

        Raises:
            Exception: If no API key is configured or the model is not one of
                the supported models.
        """
        print(f"pipe:{__name__}")

        if not self.valves.PERPLEXITY_API_KEY:
            raise Exception("PERPLEXITY_API_KEY not provided in the valves.")

        headers = {
            "Authorization": f"Bearer {self.valves.PERPLEXITY_API_KEY}",
            "Content-Type": "application/json",
            "accept": "application/json",
        }

        system_message, messages = pop_system_message(body.get("messages", []))
        system_prompt = "You are a helpful assistant."
        if system_message is not None:
            system_prompt = system_message["content"]

        model_id = self._resolve_model_id(body["model"])
        if model_id not in _SUPPORTED_MODEL_IDS:
            supported = ", ".join(supported_id for supported_id, _ in _MODELS)
            raise Exception(
                f"Invalid model '{body['model']}'. "
                f"Use one of the supported models: {supported}."
            )

        # Decide streaming exactly once. Previously the payload defaulted to
        # streaming while the dispatch below defaulted to non-streaming, so a
        # body without "stream" asked the API for an event stream and then
        # tried to parse it as a single JSON document.
        stream = bool(body.get("stream", False))

        payload = {
            "model": model_id,
            "messages": [{"role": "system", "content": system_prompt}, *messages],
            "stream": stream,
            "return_citations": True,
            "return_images": True,
        }

        # Forward additional parameters if provided.
        for param in ("temperature", "top_p", "max_tokens"):
            if param in body:
                payload[param] = body[param]

        # Tolerate a base URL configured with a trailing slash.
        base_url = self.valves.PERPLEXITY_API_BASE_URL.rstrip("/")
        url = f"{base_url}/chat/completions"

        try:
            if stream:
                return self.stream_response(url, headers, payload)
            return self.non_stream_response(url, headers, payload)
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return f"Error: Request failed: {e}"
        except Exception as e:
            print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def stream_response(self, url, headers, payload) -> Generator:
        """POST ``payload`` and yield the answer text as it is streamed.

        Reads ``data:`` lines from the response until ``[DONE]``, yielding the
        ``choices[0].delta.content`` of each JSON chunk. After the stream ends,
        the most recent non-empty ``citations`` list seen in any chunk is
        yielded as a numbered list. Errors are not raised; they are yielded as
        a final ``"Error: ..."`` chunk.
        """
        try:
            citations = []
            with requests.post(
                url,
                headers=headers,
                json=payload,
                stream=True,
                timeout=_REQUEST_TIMEOUT,
            ) as response:
                if response.status_code != 200:
                    raise Exception(
                        f"HTTP Error {response.status_code}: {response.text}"
                    )

                # Iterate raw bytes and decode as UTF-8 ourselves: letting
                # requests decode would use its guessed encoding (ISO-8859-1
                # for text/* responses without a charset) and would split
                # lines on Unicode separators that may occur inside JSON.
                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    line = raw_line.decode("utf-8", errors="replace").strip()
                    if not line.startswith("data:"):
                        continue
                    event = line[len("data:"):].strip()
                    # Check for termination marker.
                    if event == "[DONE]":
                        break

                    data = None
                    try:
                        data = json.loads(event)
                        # Remember the latest citations rather than relying
                        # on the final chunk happening to carry them.
                        if data.get("citations"):
                            citations = data["citations"]
                        content = data["choices"][0]["delta"].get("content")
                        if content:
                            yield content
                            # Brief pause to help rate-limit responses.
                            time.sleep(0.01)
                    except json.JSONDecodeError:
                        print(f"Failed to parse JSON: {line}")
                    except (KeyError, IndexError, TypeError, AttributeError) as e:
                        # A chunk without the expected shape is skipped, not
                        # allowed to abort the whole stream.
                        print(f"Unexpected data structure: {e}")
                        print(f"Full data: {data}")

            citations_string = _format_citations(citations)
            if citations_string:
                yield "\n\n" + citations_string
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            yield f"Error: Request failed: {e}"
        except Exception as e:
            print(f"General error in stream_response method: {e}")
            yield f"Error: {e}"

    def non_stream_response(self, url, headers, payload) -> str:
        """POST ``payload`` and return the complete answer as one string.

        Returns:
            ``choices[0].message.content`` followed, when the response has
            citations, by a blank line and the numbered citation list. A
            ``requests`` failure is returned as an ``"Error: ..."`` string.

        Raises:
            Exception: On a non-200 HTTP status. Errors from a response body
                of unexpected shape also propagate to the caller.
        """
        try:
            response = requests.post(
                url, headers=headers, json=payload, timeout=_REQUEST_TIMEOUT
            )
            if response.status_code != 200:
                raise Exception(f"HTTP Error {response.status_code}: {response.text}")

            res = response.json()
            citations_string = _format_citations(res.get("citations"))
            # A null content must not break the concatenation below.
            content = res["choices"][0]["message"].get("content") or ""
            if citations_string:
                return content + "\n\n" + citations_string
            return content
        except requests.exceptions.RequestException as e:
            print(f"Failed non-stream request: {e}")
            return f"Error: {e}"