"""
title: Anthropic Claude Models via Google Vertex AI
author: sanjay
date: 2025-05-24
version: 1.0.0
description: Access all Anthropic Claude models (including Claude 4 Opus & Sonnet) through Google Vertex AI with enterprise-grade security, extended thinking capabilities, and vision support
requirements: google-auth, google-cloud-aiplatform
environment_variables: GOOGLE_APPLICATION_CREDENTIALS, VERTEX_AI_PROJECT_ID, VERTEX_AI_LOCATION, SERVICE_ACCOUNT_JSON
"""
import os
import json
from pydantic import BaseModel, Field
from typing import Union, Iterator, List
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession
from google.auth import default
# =============================================================================
# CONFIGURATION SECTION - UPDATE THESE VALUES FOR YOUR SETUP
# =============================================================================
# Enable detailed logging for troubleshooting (set to False for production).
# Debug output goes to stdout via print(); it may include request payloads.
DEBUG_MODE = False
# Your Google Cloud Project ID where Vertex AI is enabled.
# You can also set this via environment variable: VERTEX_AI_PROJECT_ID
# (these constants are only the *defaults* for the pipeline valves; values
# set in the Open WebUI valve settings take precedence).
DEFAULT_PROJECT_ID = "" # Example: "my-company-ai-project"
# Default region for Vertex AI (us-east5 recommended for Claude models).
# You can also set this via environment variable: VERTEX_AI_LOCATION
DEFAULT_LOCATION = "us-east5"
# =============================================================================
# MAIN FUNCTION CODE - NO NEED TO MODIFY BELOW THIS LINE
# =============================================================================
class Pipe:
    """
    Vertex AI Claude Models Pipeline.

    Exposes Anthropic Claude models (including Claude 4 Opus and Sonnet)
    through Google Vertex AI as an Open WebUI "manifold" pipe. Handles
    Google Cloud authentication, translation between the OpenAI-style chat
    format and the Anthropic Messages API, and optional extended-thinking
    output for the models that support it.
    """

    class Valves(BaseModel):
        """Configuration valves for the pipeline - these can be adjusted in the UI"""

        PROJECT_ID: str = Field(
            default=DEFAULT_PROJECT_ID,
            description="Google Cloud Project ID (leave empty to use VERTEX_AI_PROJECT_ID environment variable)",
        )
        LOCATION: str = Field(
            default=DEFAULT_LOCATION,
            description="Google Cloud region for Vertex AI (us-east5 recommended for Claude models)",
        )
        SERVICE_ACCOUNT_JSON: str = Field(
            default="",
            description="Service Account JSON credentials (optional - leave empty to use environment authentication)",
        )
        MAX_OUTPUT_TOKENS: int = Field(
            default=4096,
            description="Maximum tokens in model response (1-200000)",
        )
        TEMPERATURE: float = Field(
            default=0.7,
            description="Response creativity/randomness (0.0=deterministic, 1.0=very creative)",
        )
        ENABLE_THINKING: bool = Field(
            default=False,
            description="Enable Claude's extended thinking process (Claude 4 & 3.7 models only)",
        )
        THINKING_TOKEN_BUDGET: int = Field(
            default=16000,
            description="Maximum tokens Claude can use for thinking (1024-32000)",
        )

    def __init__(self):
        """Initialize pipeline identity, valves, and lazy authentication state."""
        self.pipeline_id = "claude_4_via_vertex_ai"
        self.type = "manifold"  # "manifold" = this pipe exposes multiple models
        self.name = "Vertex AI Claude"
        self.valves = self.Valves()
        # Authentication components - populated by _setup_authentication()
        self.google_credentials = None
        self.authenticated_session = None

    async def on_startup(self):
        """Initialize authentication when the pipeline starts (called by Open WebUI)."""
        await self._setup_authentication()

    async def on_valves_updated(self):
        """Re-authenticate whenever the user changes the valve configuration."""
        await self._setup_authentication()

    async def _setup_authentication(self):
        """
        Set up Google Cloud authentication using multiple fallback methods.

        Priority: valves JSON > SERVICE_ACCOUNT_JSON env var >
        GOOGLE_APPLICATION_CREDENTIALS key file > Application Default Credentials.

        Raises:
            RuntimeError: if no authentication method succeeds
                (RuntimeError subclasses Exception, so existing callers that
                catch Exception keep working).
        """
        cloud_scope = ["https://www.googleapis.com/auth/cloud-platform"]
        try:
            # Method 1: service account JSON pasted into the pipeline config
            if self.valves.SERVICE_ACCOUNT_JSON.strip():
                if DEBUG_MODE:
                    print("Using service account JSON from pipeline configuration")
                credentials_data = json.loads(self.valves.SERVICE_ACCOUNT_JSON)
                self.google_credentials = (
                    service_account.Credentials.from_service_account_info(
                        credentials_data, scopes=cloud_scope
                    )
                )
            # Method 2: service account JSON from the environment
            elif os.getenv("SERVICE_ACCOUNT_JSON"):
                if DEBUG_MODE:
                    print("Using service account JSON from SERVICE_ACCOUNT_JSON environment variable")
                credentials_data = json.loads(os.getenv("SERVICE_ACCOUNT_JSON"))
                self.google_credentials = (
                    service_account.Credentials.from_service_account_info(
                        credentials_data, scopes=cloud_scope
                    )
                )
            # Method 3: path to a service account key file
            elif os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):
                if DEBUG_MODE:
                    print("Using service account file from GOOGLE_APPLICATION_CREDENTIALS")
                self.google_credentials = (
                    service_account.Credentials.from_service_account_file(
                        os.getenv("GOOGLE_APPLICATION_CREDENTIALS"),
                        scopes=cloud_scope,
                    )
                )
            # Method 4: Application Default Credentials (GCE/GKE/Cloud Run etc.)
            else:
                if DEBUG_MODE:
                    print("Using Application Default Credentials (ADC)")
                self.google_credentials, _ = default(scopes=cloud_scope)
            # Authenticated requests.Session that auto-refreshes access tokens
            self.authenticated_session = AuthorizedSession(self.google_credentials)
            if DEBUG_MODE:
                # BUG FIX: this message previously contained a corrupted
                # multi-byte character that split the string literal across
                # two physical lines - a syntax error.
                print("Google Cloud authentication successful")
        except Exception as authentication_error:
            error_message = f"Authentication failed: {authentication_error}"
            if DEBUG_MODE:
                print(error_message)
            raise RuntimeError(error_message) from authentication_error

    def pipes(self) -> List[dict]:
        """
        Return the list of available Claude models.

        These appear as selectable models in the Open WebUI interface; the
        "id" values are the Vertex AI publisher model IDs.
        """
        return [
            {
                "id": "claude-opus-4@20250514",
                "name": "Claude Opus 4 (Most Powerful - Best for Complex Tasks)",
            },
            {
                "id": "claude-sonnet-4@20250514",
                "name": "Claude Sonnet 4 (Balanced Performance & Cost)",
            },
            {
                "id": "claude-3-7-sonnet@20250219",
                "name": "Claude 3.7 Sonnet (Extended Thinking Capabilities)",
            },
            {
                "id": "claude-3-5-sonnet-v2@20241022",
                "name": "Claude 3.5 Sonnet V2 (Advanced Reasoning)",
            },
            {
                "id": "claude-3-5-haiku@20241022",
                "name": "Claude 3.5 Haiku (Fastest Response)",
            },
            {
                "id": "claude-3-opus@20240229",
                "name": "Claude 3 Opus (Previous Generation Flagship)",
            },
            {
                "id": "claude-3-haiku@20240307",
                "name": "Claude 3 Haiku (Previous Generation Fast)",
            },
        ]

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        """
        Main pipeline entry point - processes one chat request.

        Args:
            body: OpenAI-style request dict ("model", "messages", sampling
                parameters such as "temperature"/"max_tokens"/"stop").

        Returns:
            The generated text, or a human-readable error string.
        """
        # Resolve project/region: valves take precedence over env vars
        project_id = self.valves.PROJECT_ID or os.getenv("VERTEX_AI_PROJECT_ID")
        if not project_id:
            return (
                "Error: Google Cloud Project ID not configured. Please set "
                "PROJECT_ID in pipeline settings or set VERTEX_AI_PROJECT_ID "
                "environment variable."
            )
        region = self.valves.LOCATION or os.getenv("VERTEX_AI_LOCATION", DEFAULT_LOCATION)
        # Extract the model ID from the request
        requested_model = body.get("model")
        if not requested_model:
            return "Error: No model specified in request."
        if DEBUG_MODE:
            print(f"Original model ID from request: {requested_model}")
        # Open WebUI prefixes model IDs with the pipe ID; strip that off
        clean_model_id = self._extract_clean_model_id(requested_model)
        if DEBUG_MODE:
            print(f"Cleaned model ID: {clean_model_id}")
        if not clean_model_id.startswith("claude-"):
            return f"Error: Invalid Claude model ID: {clean_model_id}"
        # Lazily authenticate if on_startup() has not run yet
        if not self.authenticated_session:
            try:
                import asyncio

                # BUG FIX: the previous hand-rolled new_event_loop /
                # set_event_loop / close sequence left a *closed* loop
                # installed as the thread's current event loop; asyncio.run
                # creates and tears one down cleanly.
                asyncio.run(self._setup_authentication())
            except Exception as auth_error:
                return f"Authentication Error: {auth_error}"
        try:
            # Make the API call to Claude via Vertex AI
            return self._call_claude_via_vertex_ai(body, project_id, region, clean_model_id)
        except Exception as pipeline_error:
            if DEBUG_MODE:
                print(f"Pipeline error: {pipeline_error}")
            return f"Error: {pipeline_error}"

    def _extract_clean_model_id(self, raw_model_id: str) -> str:
        """
        Remove pipeline prefixes from a model ID.

        Open WebUI may prepend the pipe ID (and other variants) to the model
        name; only the first matching prefix is stripped.
        """
        possible_prefixes = [
            f"{self.pipeline_id}.",  # this pipe's own prefix
            "vertex_ai.",
            "vertex_claude.",
            "anthropic.",
        ]
        cleaned_id = raw_model_id
        for prefix in possible_prefixes:
            if cleaned_id.startswith(prefix):
                cleaned_id = cleaned_id[len(prefix):]
                if DEBUG_MODE:
                    print(f"Removed prefix '{prefix}' from model ID")
                break
        # Remove any remaining leading dots
        return cleaned_id.lstrip('.')

    def _call_claude_via_vertex_ai(self, body: dict, project_id: str, region: str, claude_model_id: str):
        """
        Make the HTTP call to a Claude model through Google Vertex AI.

        Builds the Anthropic Messages API payload from the OpenAI-style
        request body, POSTs it to the Vertex AI publisher endpoint, and
        returns the processed response text (or an error string).
        """
        # BUG FIX: this payload sets "stream": False and the response handler
        # parses a single JSON object, so the *unary* :rawPredict endpoint is
        # the correct one (:streamRawPredict returns a streamed response).
        vertex_api_endpoint = (
            f"https://{region}-aiplatform.googleapis.com/v1/"
            f"projects/{project_id}/locations/{region}/"
            f"publishers/anthropic/models/{claude_model_id}:rawPredict"
        )
        # Split out the system prompt and convert messages to Claude format
        conversation_messages = body.get("messages", [])
        system_prompt, formatted_messages = self._process_conversation_messages(conversation_messages)
        claude_request_payload = {
            "anthropic_version": "vertex-2023-10-16",  # required by Vertex AI
            "messages": formatted_messages,
            "max_tokens": min(
                body.get("max_tokens", self.valves.MAX_OUTPUT_TOKENS),
                200000,  # hard upper bound; the API enforces per-model limits
            ),
            "temperature": body.get("temperature", self.valves.TEMPERATURE),
            "stream": False,  # non-streaming for reliability
        }
        # Claude takes the system prompt as a top-level field, not a message
        if system_prompt:
            claude_request_payload["system"] = system_prompt
        # Optional sampling parameters
        if body.get("top_p") is not None:
            claude_request_payload["top_p"] = float(body["top_p"])
        if body.get("top_k") is not None:
            claude_request_payload["top_k"] = int(body["top_k"])
        # Stop sequences: accept a single string or a list
        if body.get("stop"):
            stop_sequences = body["stop"]
            if isinstance(stop_sequences, str):
                stop_sequences = [stop_sequences]
            claude_request_payload["stop_sequences"] = stop_sequences
        # Extended thinking for supported models
        if self._model_supports_thinking(claude_model_id) and self.valves.ENABLE_THINKING:
            # BUG FIX: the Anthropic API expects
            # {"type": "enabled", "budget_tokens": N}; the previous
            # {"max_tokens": N} shape is rejected by the API.
            # NOTE(review): the API also requires max_tokens to exceed
            # budget_tokens - verify the configured valves satisfy that.
            thinking_budget = max(1024, min(self.valves.THINKING_TOKEN_BUDGET, 32000))
            claude_request_payload["thinking"] = {
                "type": "enabled",
                "budget_tokens": thinking_budget,
            }
            if DEBUG_MODE:
                print(f"Enabled thinking for {claude_model_id} with {thinking_budget} token budget")
        if DEBUG_MODE:
            print(f"API Endpoint: {vertex_api_endpoint}")
            print(f"Request Payload: {json.dumps(claude_request_payload, indent=2)}")
        try:
            api_response = self.authenticated_session.post(
                vertex_api_endpoint,
                json=claude_request_payload,
                timeout=300,  # 5 minute timeout for long responses
            )
        except Exception as request_error:
            return f"Request Error: {request_error}"
        return self._process_claude_response(api_response)

    def _process_conversation_messages(self, messages):
        """
        Convert OpenAI-style messages into Claude API format.

        Returns:
            (system_prompt, formatted_messages) - the system prompt (or None)
            is pulled out because Claude takes it as a separate field. If
            several system messages are present, the last one wins.
        """
        system_prompt = None
        formatted_messages = []
        for message in messages:
            message_role = message.get("role", "user")
            message_content = message.get("content", "")
            if message_role == "system":
                system_prompt = message_content
            else:
                formatted_messages.append({
                    "role": message_role,
                    "content": self._format_message_content_for_claude(message_content),
                })
        return system_prompt, formatted_messages

    def _format_message_content_for_claude(self, content):
        """
        Format one message's content into Claude content blocks.

        Handles plain strings, OpenAI-style mixed lists (text + image_url,
        both base64 data URIs and plain URLs), and falls back to str() for
        anything else. Always returns a non-empty list of content blocks.
        """
        if isinstance(content, str):
            # Simple text content
            return [{"type": "text", "text": content}]
        elif isinstance(content, list):
            # Mixed content (text + images)
            formatted_content_blocks = []
            for content_item in content:
                if isinstance(content_item, dict):
                    if content_item.get("type") == "text":
                        formatted_content_blocks.append(content_item)
                    elif content_item.get("type") == "image_url":
                        image_url = content_item.get("image_url", {}).get("url", "")
                        if image_url.startswith("data:"):
                            # Base64 data URI: "data:<media_type>;base64,<data>"
                            try:
                                mime_header, base64_data = image_url.split(",", 1)
                                media_type = mime_header.split(":")[1].split(";")[0]
                                formatted_content_blocks.append({
                                    "type": "image",
                                    "source": {
                                        "type": "base64",
                                        "media_type": media_type,
                                        "data": base64_data,
                                    },
                                })
                            except Exception as image_error:
                                # Malformed data URI: degrade to a text note
                                # rather than failing the whole request
                                if DEBUG_MODE:
                                    print(f"Error processing base64 image: {image_error}")
                                formatted_content_blocks.append({
                                    "type": "text",
                                    "text": f"[Image processing error: {image_error}]",
                                })
                        else:
                            # URL-based image
                            formatted_content_blocks.append({
                                "type": "image",
                                "source": {
                                    "type": "url",
                                    "url": image_url,
                                },
                            })
                else:
                    # Non-dict items: convert to text
                    formatted_content_blocks.append({"type": "text", "text": str(content_item)})
            return formatted_content_blocks if formatted_content_blocks else [{"type": "text", "text": str(content)}]
        else:
            # Any other type: convert to text
            return [{"type": "text", "text": str(content)}]

    def _model_supports_thinking(self, model_id) -> bool:
        """
        Return True if the model supports extended thinking.

        Currently Claude 4 (Opus/Sonnet) and Claude 3.7 Sonnet; matched by
        substring so versioned IDs like "claude-opus-4@20250514" qualify.
        """
        thinking_supported_models = (
            "claude-opus-4",
            "claude-sonnet-4",
            "claude-3-7-sonnet",
        )
        return any(supported in model_id for supported in thinking_supported_models)

    def _process_claude_response(self, api_response):
        """
        Process the HTTP response from Claude via Vertex AI.

        Extracts text (and, if enabled, thinking) blocks from the standard
        Anthropic Messages response, with fallbacks for alternative shapes,
        and turns API errors into readable strings.
        """
        if api_response.status_code == 200:
            try:
                response_data = api_response.json()
                if DEBUG_MODE:
                    print(f"Claude API Response: {json.dumps(response_data, indent=2)}")
                # Standard Messages API shape: {"content": [blocks...]}
                if "content" in response_data and isinstance(response_data["content"], list):
                    text_blocks = []
                    thinking_blocks = []
                    for content_block in response_data["content"]:
                        if content_block.get("type") == "text":
                            text_blocks.append(content_block.get("text", ""))
                        elif content_block.get("type") == "thinking" and self.valves.ENABLE_THINKING:
                            thinking_blocks.append(
                                f"**Thinking Process:**\n{content_block.get('thinking', '')}"
                            )
                    # Thinking (if any) is shown before the final answer
                    response_parts = thinking_blocks + text_blocks
                    return "\n\n".join(response_parts) if response_parts else "No content generated."
                # Fallback: Vertex "predictions" wrapper
                elif "predictions" in response_data and response_data["predictions"]:
                    prediction = response_data["predictions"][0]
                    if isinstance(prediction, dict):
                        return prediction.get("content", str(prediction))
                    return str(prediction)
                # Fallback: direct text response
                elif isinstance(response_data, dict) and "text" in response_data:
                    return response_data["text"]
                else:
                    debug_info = (
                        json.dumps(response_data, indent=2)
                        if DEBUG_MODE
                        else "Enable DEBUG_MODE for response details"
                    )
                    return f"Unexpected response format from Claude API: {debug_info}"
            except json.JSONDecodeError as json_error:
                return f"Error parsing Claude API response: {json_error}"
        else:
            # Non-200: surface the API's error message if it is JSON
            try:
                error_response = api_response.json()
                if "error" in error_response:
                    error_details = error_response["error"]
                    if isinstance(error_details, dict):
                        error_message = error_details.get("message", str(error_details))
                    else:
                        error_message = str(error_details)
                else:
                    error_message = api_response.text
            except Exception:
                # Body is not JSON; fall back to the raw text
                error_message = api_response.text
            return f"Claude API Error ({api_response.status_code}): {error_message}"