"""
title: Portkey Manifold Pipe
author: Narendranath Gogineni
author_url: https://github.com/github/narengogi
version: 0.1.5
license: MIT
documentation: https://portkey.ai/docs/integrations/libraries/openwebui
"""
from pydantic import BaseModel, Field
from typing import Optional, Union, Generator, Iterator
import os
import json
import requests
# Maps a provider name (the part of a model id before the "/") to the Portkey
# virtual key that is sent for that provider. Pipe.pipe looks the provider up
# here, so every provider listed in Pipe.pipes needs an entry.
# ex: {"perplexity": "tooljet-perplex-3f95ca"}
virtual_keys = {}
class Pipe:
    """Manifold pipe that forwards chat-completion requests to Portkey.

    Model ids have the form ``"{provider}/{model}"``. The provider part
    selects a virtual key from ``VIRTUAL_KEYS`` and is sent in the
    ``x-portkey-provider`` header; the model part is sent as the ``model``
    field of the request payload.
    """

    # Seconds to wait for the TCP/TLS connection to the gateway. The read
    # timeout is intentionally left unbounded so long generations and
    # streamed responses are not cut off.
    _CONNECT_TIMEOUT_SECONDS = 10

    class Valves(BaseModel):
        """Configurable settings for the pipe."""

        PORTKEY_API_BASE_URL: str = Field(
            default="https://api.portkey.ai/v1",  # Change this if using a self-hosted or enterprise solution
            description="The base URL for Portkey API endpoints. Change this if using a self-hosted or enterprise solution",
        )
        PORTKEY_API_KEY: str = Field(
            default="",
            description="Required API key to access Portkey services.",
        )

    def __init__(self):
        """Set the pipe type, default valves and the provider -> key map."""
        # NOTE(review): "manifold" presumably tells the host application that
        # this pipe exposes several models via pipes() - confirm.
        self.type = "manifold"
        self.valves = self.Valves()
        # Shares the module-level mapping; it is not copied.
        self.VIRTUAL_KEYS: dict = virtual_keys

    def pipes(self):
        """Return the model entries offered by this pipe.

        Each entry is a dict with ``id`` and ``name`` keys.
        """
        # add model ids in the following format "{provider}/{model}"
        return [
            {
                "id": "perplexity/mistral-7b-instruct",
                "name": "perplexity/mistral-7b-instruct",
            },
            {
                "id": "perplexity/llama-3.1-sonar-small-128k-online",
                "name": "perplexity/llama-3.1-sonar-small-128k-online",
            },
        ]

    @staticmethod
    def _split_model_id(raw_model_id: str) -> tuple:
        """Split ``"[portkey.]{provider}/{model}"`` into ``(provider, model)``.

        Only the first "/" separates provider from model, so model names
        that themselves contain "/" are preserved intact.

        Raises:
            ValueError: if the id has no "/" or an empty provider/model part.
        """
        prefix = "portkey."
        if raw_model_id.startswith(prefix):
            raw_model_id = raw_model_id[len(prefix):]
        provider, separator, model = raw_model_id.partition("/")
        if not (separator and provider and model):
            raise ValueError(
                f"Invalid model id '{raw_model_id}': "
                "expected the format '{provider}/{model}'."
            )
        return provider, model

    def pipe(
        self, body: dict, __user__: dict
    ) -> Union[str, dict, Generator, Iterator]:
        """Send a chat-completion request to Portkey and return the result.

        Args:
            body: Request payload. ``body["model"]`` must be a
                ``"{provider}/{model}"`` id (optionally prefixed with
                ``"portkey."``); a truthy ``body["stream"]`` requests a
                streamed response.
            __user__: Information about the calling user; its ``email``
                entry is forwarded in the ``x-portkey-metadata`` header.

        Returns:
            An iterator over the raw response lines when streaming was
            requested, the decoded JSON response otherwise, or the string
            ``"Error: ..."`` if the HTTP request or decoding failed.

        Raises:
            Exception: if ``PORTKEY_API_KEY`` is not set in the valves.
            ValueError: if the model id is not ``"{provider}/{model}"``.
            KeyError: if no virtual key is configured for the provider.
        """
        if not self.valves.PORTKEY_API_KEY:
            raise Exception("PORTKEY_API_KEY not provided in the valves.")

        model_provider, model_id = self._split_model_id(body["model"])

        try:
            virtual_key = self.VIRTUAL_KEYS[model_provider]
        except KeyError:
            # Same exception type as a plain lookup, but with a message that
            # tells the operator what to configure.
            raise KeyError(
                f"No Portkey virtual key configured for provider "
                f"'{model_provider}'. Add it to the virtual_keys mapping."
            ) from None

        # Tolerate a missing user dict; a missing email is sent as "None".
        user_info = __user__ or {}
        metadata = {"_user": f"{user_info.get('email')}"}
        payload = {**body, "model": model_id}
        headers = {
            # NOTE(review): the key is sent without a "Bearer " prefix -
            # confirm this is what the gateway expects.
            "Authorization": f"{self.valves.PORTKEY_API_KEY}",
            "Content-Type": "application/json",
            "accept": "application/json",
            "x-portkey-provider": f"{model_provider}",
            "x-portkey-virtual-key": f"{virtual_key}",
            "x-portkey-metadata": json.dumps(metadata),
        }

        # Avoid "//chat/completions" when the base URL has a trailing slash.
        base_url = self.valves.PORTKEY_API_BASE_URL.rstrip("/")
        wants_stream = bool(body.get("stream", False))

        response = None
        try:
            response = requests.post(
                url=f"{base_url}/chat/completions",
                json=payload,
                headers=headers,
                # Only keep the connection open for lazy reading when the
                # caller will actually consume a stream.
                stream=wants_stream,
                timeout=(self._CONNECT_TIMEOUT_SECONDS, None),
            )
            response.raise_for_status()
            if wants_stream:
                return response.iter_lines()
            return response.json()
        except Exception as e:
            # Best-effort boundary: report the failure as text instead of
            # raising, and release the connection if one was opened.
            if response is not None:
                response.close()
            return f"Error: {e}"