"""
title: Gemini Manifold Pipe
author: Your Name
version: 0.2.0
license: MIT
"""
import json
from pydantic import BaseModel, Field
from typing import Union, Iterator
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession

# Set DEBUG to True to enable detailed logging
DEBUG = True

# Replace this string with your actual service account JSON content
SERVICE_ACCOUNT_JSON = r"""
{
}
"""
class Pipe:
    class Valves(BaseModel):
        PROJECT_ID: str = Field(
            default="",
            description="Your Google Cloud Project ID.",
        )
        LOCATION: str = Field(
            default="us-central1",
            description="Region for Vertex AI Generative AI models.",
        )
        MODEL_ID: str = Field(
            default="gemini-1.5-pro-002", description="Model ID to use."
        )

    def __init__(self):
        self.id = "gemini_manifold"
        self.type = "manifold"
        self.name = "Gemini Manifold Pipe"
        self.valves = self.Valves()
        # Load credentials from the hardcoded JSON
        credentials_info = json.loads(SERVICE_ACCOUNT_JSON)
        self.credentials = service_account.Credentials.from_service_account_info(
            credentials_info, scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        self.authed_session = AuthorizedSession(self.credentials)

    def get_models(self):
        # Return a list with the model specified in the valves
        return [
            {
                "id": self.valves.MODEL_ID,
                "name": f"Vertex AI Model: {self.valves.MODEL_ID}",
            }
        ]
    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        project_id = self.valves.PROJECT_ID
        location = self.valves.LOCATION
        model_id = self.valves.MODEL_ID
        try:
            # Endpoint URL for Generative AI models
            url = f"https://{location}-aiplatform.googleapis.com/v1beta1/projects/{project_id}/locations/{location}/endpoints/openapi/chat/completions"
            # Prepare the prompt messages
            messages = body["messages"]
            formatted_messages = self.format_messages(messages)
            # Prepare request data
            data = {
                "model": f"google/{model_id}",
                "messages": formatted_messages,
                "temperature": body.get("temperature", 0.7),
                "max_tokens": body.get("max_tokens", 512),
                "top_p": body.get("top_p", 0.8),
                "top_k": body.get("top_k", 40),
                # Add other parameters if needed
            }
            if DEBUG:
                print("Request URL:", url)
                print("Request Data:", json.dumps(data, indent=2))
            # Make the POST request
            response = self.authed_session.post(url, json=data)
            # Check the response
            if response.status_code == 200:
                response_data = response.json()
                if DEBUG:
                    print("Response Data:", json.dumps(response_data, indent=2))
                if "choices" in response_data and len(response_data["choices"]) > 0:
                    # Extract the generated text
                    generated_text = response_data["choices"][0]["message"]["content"]
                    return generated_text
                else:
                    if DEBUG:
                        print("No choices found in the response.")
                        print("Response Data:", json.dumps(response_data, indent=2))
                    return "No response generated by the model."
            else:
                if DEBUG:
                    print(f"Error: {response.status_code}")
                    print("Response Content:", response.text)
                return f"Error: {response.status_code} {response.text}"
        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"
    def format_messages(self, messages):
        # Keep only the roles the endpoint understands: user, assistant, system.
        formatted_messages = []
        for msg in messages:
            role = msg.get("role")
            content = msg.get("content")
            if role in ("user", "assistant", "system"):
                formatted_messages.append({"role": role, "content": content})
        return formatted_messages
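
# A minimal local smoke test: a sketch that assumes SERVICE_ACCOUNT_JSON and
# PROJECT_ID above have been filled in with real values, and that the body
# shape below matches what the calling application passes to pipe().
if __name__ == "__main__":
    pipe = Pipe()
    pipe.valves.PROJECT_ID = "your-project-id"  # placeholder project ID
    result = pipe.pipe(
        {
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Say hello in one sentence."},
            ],
            "temperature": 0.7,
            "max_tokens": 128,
        }
    )
    print(result)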