"""
title: Multi-Model Manifold Pipe
author: Feynman Liang (
[email protected])
version: 0.0.1
license: MIT
"""
import json
from pydantic import BaseModel, Field
from typing import Union, Iterator, List
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession
# Set DEBUG to True to enable detailed request/response logging to stdout
# (read throughout the Pipe class below).
DEBUG = True
class Pipe:
    """OpenWebUI manifold pipe that routes chat requests to Vertex AI models.

    Exposes Gemini (and, commented out, Claude) models as a single OpenWebUI
    "manifold" function. Authenticates with a service-account JSON key
    supplied through the valves and supports both blocking and streaming
    (server-sent events) responses.
    """

    class Valves(BaseModel):
        # Admin-configurable settings surfaced in the OpenWebUI UI.
        PROJECT_ID: str = Field(
            default="",  # Set this to your Project ID
            description="Your Google Cloud Project ID.",
        )
        LOCATION: str = Field(
            default="", description="Default region for Vertex AI models."
        )
        SERVICE_ACCOUNT_JSON: str = Field(
            default="", description="Service Account JSON"
        )

    def __init__(self):
        self.id = "vertex_ai_streaming"  # Set this to your function's ID in OpenWebUI
        self.type = "manifold"
        self.name = "Vertex AI Models"
        self.valves = self.Valves()
        self.authed_session = None  # Created lazily by _ensure_auth()

    def _ensure_auth(self):
        """Build an AuthorizedSession from the SERVICE_ACCOUNT_JSON valve.

        Re-reads the valve on every call so credential changes made in the
        UI take effect without a restart.

        Raises:
            ValueError: if the SERVICE_ACCOUNT_JSON valve is empty.
            json.JSONDecodeError: if the valve content is not valid JSON.
        """
        if not self.valves.SERVICE_ACCOUNT_JSON:
            raise ValueError("SERVICE_ACCOUNT_JSON valve is not configured.")
        credentials_info = json.loads(self.valves.SERVICE_ACCOUNT_JSON)
        self.credentials = service_account.Credentials.from_service_account_info(
            credentials_info, scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        self.authed_session = AuthorizedSession(self.credentials)

    def pipes(self) -> List[dict]:
        """Return the list of models this manifold offers to OpenWebUI."""
        return [
            {"id": "gemini-1.5-pro-002", "name": "Gemini 1.5 Pro (Feynman Dev)"},
            # {"id": "claude-3-5-sonnet-v2@20241022", "name": "Claude 3.5 Sonnet V2"},
            # {"id": "claude-3-haiku@20240307", "name": "Claude 3 Haiku"},
        ]

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        """Dispatch a chat request to the backend matching the model id.

        Returns the full generated text, an iterator of text chunks when
        ``body["stream"]`` is true, or an ``"Error: ..."`` string
        (OpenWebUI convention for surfacing failures).
        """
        try:
            # Was previously outside any try/except: a malformed or empty
            # SERVICE_ACCOUNT_JSON valve raised an uncaught exception.
            self._ensure_auth()
        except Exception as e:
            return f"Error: {e}"
        project_id = self.valves.PROJECT_ID
        default_location = self.valves.LOCATION
        model_id = body.get("model")
        if not model_id:
            return "Error: No model specified."
        if DEBUG:
            print(f"Original model_id: {model_id}")
        # OpenWebUI prefixes the model id with "<function_id>."; strip it.
        function_id_with_dot = f"{self.id}."
        model_id = model_id.removeprefix(function_id_with_dot)
        if DEBUG:
            print(f"Using model_id: {model_id}")
        try:
            # Determine which model is selected and call the appropriate method
            if model_id.startswith("gemini"):
                return self.call_gemini_model(
                    body, project_id, default_location, model_id
                )
            elif model_id.startswith("claude"):
                # Use 'us-east5' for Claude models
                return self.call_claude_model(body, project_id, "us-east5", model_id)
            elif model_id.startswith("llama"):
                return self.call_llama_model(
                    body, project_id, default_location, model_id
                )
            else:
                return f"Error: Unknown model {model_id}"
        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def call_gemini_model(self, body, project_id, location, model_id):
        """Call a Gemini model via the OpenAI-compatible chat endpoint."""
        return self._call_openai_compatible(
            body, project_id, location, f"google/{model_id}"
        )

    def call_llama_model(self, body, project_id, location, model_id):
        """Call a Llama model via the OpenAI-compatible chat endpoint.

        NOTE(review): pipe() dispatched to this method but it was previously
        undefined (AttributeError on any llama model). The "meta/" publisher
        prefix follows Vertex AI's Model-as-a-Service convention — confirm
        against the Model Garden docs before enabling llama models.
        """
        return self._call_openai_compatible(
            body, project_id, location, f"meta/{model_id}"
        )

    def _call_openai_compatible(self, body, project_id, location, qualified_model_id):
        """Shared implementation for models served behind Vertex AI's
        OpenAI-compatible ``.../endpoints/openapi/chat/completions`` endpoint.

        ``qualified_model_id`` includes the publisher prefix, e.g.
        ``google/gemini-1.5-pro-002``.
        """
        url = (
            f"https://{location}-aiplatform.googleapis.com/v1beta1/projects/"
            f"{project_id}/locations/{location}/endpoints/openapi/chat/completions"
        )
        stream = body.get("stream", False)
        data = {
            "model": qualified_model_id,
            "messages": self.format_messages(body["messages"]),
            "temperature": body.get("temperature", 0.7),
            "max_tokens": body.get("max_tokens", 512),
            "top_p": body.get("top_p", 0.8),
            "top_k": body.get("top_k", 40),
            "stream": stream,
        }
        if DEBUG:
            print("Request URL:", url)
            print("Request Data:", json.dumps(data, indent=2))
        if stream:
            return self._stream_response(url, data)
        response = self.authed_session.post(url, json=data)
        if response.status_code != 200:
            if DEBUG:
                print(f"Error: {response.status_code}")
                print("Response Headers:", response.headers)
                print("Response Content:", response.text)
            return f"Error: {response.status_code} {response.text}"
        response_data = response.json()
        if DEBUG:
            print("Response Data:", json.dumps(response_data, indent=2))
        choices = response_data.get("choices") or []
        if choices:
            # Extract the generated text
            return choices[0]["message"]["content"]
        if DEBUG:
            print("No choices found in the response.")
            print("Response Data:", json.dumps(response_data, indent=2))
        return "No response generated by the model."

    @staticmethod
    def _decode_sse_line(line: bytes) -> str:
        """Return the JSON payload of a server-sent-events line.

        Uses ``removeprefix``, not ``lstrip``: ``lstrip("data: ")`` strips
        any of the *characters* d/a/t/s/colon/space from the left, which is
        the wrong operation for stripping an SSE frame prefix.
        """
        return line.decode("utf-8").removeprefix("data: ").strip()

    def _stream_response(self, url: str, data: dict) -> Iterator[str]:
        """Stream the response from the API, yielding text deltas."""
        response = self.authed_session.post(url, json=data, stream=True)
        if response.status_code != 200:
            if DEBUG:
                print(f"Error: {response.status_code}")
                print("Response Headers:", response.headers)
                print("Response Content:", response.text)
            yield f"Error: {response.status_code} {response.text}"
            return
        for line in response.iter_lines():
            if not line:
                continue
            payload = self._decode_sse_line(line)
            if payload == "[DONE]":  # OpenAI-style end-of-stream sentinel
                break
            try:
                json_response = json.loads(payload)
                if DEBUG:
                    print("Streaming chunk:", json_response)
                choices = json_response.get("choices") or []
                if choices:
                    delta = choices[0].get("delta", {})
                    if "content" in delta:
                        yield delta["content"]
            except json.JSONDecodeError as e:
                if DEBUG:
                    print(f"Error decoding JSON: {e}")
                continue
            except Exception as e:
                if DEBUG:
                    print(f"Error processing stream: {e}")
                continue

    def call_claude_model(self, body, project_id, location, model_id):
        """Call a Claude model via the Anthropic publisher ``:predict`` endpoint."""
        url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/anthropic/models/{model_id}:predict"
        stream = body.get("stream", False)
        data = {
            "instances": [
                {
                    "messages": self.format_claude_messages(body["messages"]),
                    "anthropic_version": "vertex-2023-10-16",
                }
            ],
            "parameters": {
                "temperature": body.get("temperature", 0.7),
                "max_output_tokens": body.get("max_tokens", 512),
                "top_p": body.get("top_p", 0.8),
                "top_k": body.get("top_k", 40),
                "stream": stream,
            },
        }
        if DEBUG:
            print("Request URL:", url)
            print("Request Data:", json.dumps(data, indent=2))
        if stream:
            return self._stream_claude_response(url, data)
        response = self.authed_session.post(url, json=data)
        if response.status_code != 200:
            if DEBUG:
                print(f"Error: {response.status_code}")
                print("Response Headers:", response.headers)
                print("Response Content:", response.text)
            return f"Error: {response.status_code} {response.text}"
        response_data = response.json()
        if DEBUG:
            print("Response Data:", json.dumps(response_data, indent=2))
        predictions = response_data.get("predictions") or []
        if predictions:
            return predictions[0].get("content", "")
        if DEBUG:
            print("No predictions found in the response.")
            print("Response Data:", json.dumps(response_data, indent=2))
        return "No response generated by the model."

    def _stream_claude_response(self, url: str, data: dict) -> Iterator[str]:
        """Stream the response from the Claude API, yielding text chunks."""
        response = self.authed_session.post(url, json=data, stream=True)
        if response.status_code != 200:
            if DEBUG:
                print(f"Error: {response.status_code}")
                print("Response Headers:", response.headers)
                print("Response Content:", response.text)
            yield f"Error: {response.status_code} {response.text}"
            return
        for line in response.iter_lines():
            if not line:
                continue
            payload = self._decode_sse_line(line)
            if payload == "[DONE]":
                break
            try:
                json_response = json.loads(payload)
                if DEBUG:
                    print("Streaming chunk:", json_response)
                predictions = json_response.get("predictions") or []
                if predictions:
                    chunk = predictions[0].get("content", "")
                    if chunk:
                        yield chunk
            except json.JSONDecodeError as e:
                if DEBUG:
                    print(f"Error decoding JSON: {e}")
                continue
            except Exception as e:
                if DEBUG:
                    print(f"Error processing stream: {e}")
                continue

    def format_messages(self, messages):
        """Normalize messages to role/content dicts for the OpenAI-compatible
        endpoint. Messages with unrecognized roles are dropped, matching the
        original role-whitelist behavior."""
        allowed_roles = ("user", "assistant", "system")
        return [
            {"role": msg.get("role"), "content": msg.get("content")}
            for msg in messages
            if msg.get("role") in allowed_roles
        ]

    def format_claude_messages(self, messages):
        """Pass messages through as role/content dicts for the Claude endpoint
        (all roles are forwarded unchanged)."""
        return [
            {"role": msg.get("role"), "content": msg.get("content")}
            for msg in messages
        ]