"""
title: Anthropic API Integration for OpenWebUI
author: Balaxxe
version: 1.3
license: MIT
requirements: pydantic>=2.0.0, requests>=2.0.0
environment_variables:
- ANTHROPIC_API_KEY (required)
Supports:
- All Claude 3 and Claude 3.5 models
- Streaming responses
- Image processing
- Prompt caching (server-side)
- Function calling
"""
import os
import requests
import json
import time
import hashlib
from datetime import datetime
from typing import (
List,
Union,
Generator,
Iterator,
Dict,
Optional,
AsyncIterator,
)
from pydantic import BaseModel, Field
from open_webui.utils.misc import pop_system_message
class Pipe:
    """Manifold pipe exposing Anthropic's Claude models to OpenWebUI.

    Routes chat requests to the Anthropic Messages API, handling model
    listing, image validation, the prompt-caching beta header, function
    calling, and both streaming and non-streaming responses.
    """

    API_VERSION = "2023-06-01"
    MODEL_URL = "https://api.anthropic.com/v1/messages"
    SUPPORTED_IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif", "image/webp"]
    MAX_IMAGE_SIZE = 5 * 1024 * 1024  # 5MB per image, in bytes
    TOTAL_MAX_IMAGE_SIZE = 100 * 1024 * 1024  # 100MB across all images, in bytes
    DEFAULT_MAX_TOKENS_CLAUDE_3_5 = 8192
    DEFAULT_MAX_TOKENS_OTHER = 4096
    BETA_HEADER = "prompt-caching-2024-07-31"  # opt in to server-side prompt caching
    REQUEST_TIMEOUT = (3.05, 60)  # (connect, read) timeouts in seconds

    class Valves(BaseModel):
        ANTHROPIC_API_KEY: str = Field(default="", description="Your Anthropic API key")

    def __init__(self):
        self.type = "manifold"
        self.id = "anthropic"
        self.name = "anthropic/"
        self.valves = self.Valves(
            **{
                "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""),
            }
        )
        # x-request-id of the most recent API call, used to enrich error messages.
        self.request_id = None

    def get_anthropic_models(self):
        """Return the static catalogue of Claude models this pipe exposes."""
        return [
            {
                "id": "anthropic/claude-3-haiku-20240307",
                "name": "claude-3-haiku-20240307",
                "context_length": 200000,
                "supports_vision": False,
            },
            {
                "id": "anthropic/claude-3-opus-20240229",
                "name": "claude-3-opus-20240229",
                "context_length": 200000,
                "supports_vision": True,
            },
            {
                "id": "anthropic/claude-3-sonnet-20240229",
                "name": "claude-3-sonnet-20240229",
                "context_length": 200000,
                "supports_vision": True,
            },
            {
                "id": "anthropic/claude-3-5-haiku-20241022",
                "name": "claude-3.5-haiku-20241022",
                "context_length": 200000,
                "supports_vision": False,
            },
            {
                "id": "anthropic/claude-3-5-haiku-latest",
                "name": "claude-3.5-haiku-latest",
                "context_length": 200000,
                "supports_vision": False,
            },
            {
                "id": "anthropic/claude-3-5-sonnet-20240620",
                "name": "claude-3.5-sonnet-20240620",
                "context_length": 200000,
                "supports_vision": True,
            },
            {
                "id": "anthropic/claude-3-5-sonnet-20241022",
                "name": "claude-3.5-sonnet-20241022",
                "context_length": 200000,
                "supports_vision": True,
            },
            {
                "id": "anthropic/claude-3-5-sonnet-latest",
                "name": "claude-3.5-sonnet-latest",
                "context_length": 200000,
                "supports_vision": True,
            },
        ]

    def pipes(self) -> List[dict]:
        """OpenWebUI hook: list the models provided by this manifold."""
        return self.get_anthropic_models()

    def process_content(self, content: Union[str, List[dict]]) -> List[dict]:
        """Normalize message content into Anthropic content blocks.

        A plain string becomes a single text block; a list of OpenAI-style
        parts is converted item by item, validating the per-image and total
        image size limits as base64 images are encountered.

        Raises:
            ValueError: if one image exceeds 5MB or all images exceed 100MB.
        """
        if isinstance(content, str):
            return [{"type": "text", "text": content}]
        processed_content = []
        total_image_size = 0
        for item in content:
            if item["type"] == "text":
                processed_content.append({"type": "text", "text": item["text"]})
            elif item["type"] == "image_url":
                processed_image = self.process_image(item)
                if processed_image["source"]["type"] == "base64":
                    # base64 inflates data by 4/3, so decoded size is len * 3/4.
                    image_size = len(processed_image["source"]["data"]) * 3 / 4
                    total_image_size += image_size
                    if image_size > self.MAX_IMAGE_SIZE:
                        raise ValueError(
                            f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB"
                        )
                    if total_image_size > self.TOTAL_MAX_IMAGE_SIZE:
                        raise ValueError("Total size of images exceeds 100 MB limit")
                processed_content.append(processed_image)
        return processed_content

    def process_image(self, image_data):
        """Convert an OpenAI-style image part into an Anthropic image block.

        Data URLs are passed through as base64 source blocks; remote URLs
        are probed with a HEAD request to validate content type and size
        before being forwarded as URL source blocks.

        Raises:
            ValueError: on an unsupported media type or an oversized image.
        """
        if image_data["image_url"]["url"].startswith("data:image"):
            mime_type, base64_data = image_data["image_url"]["url"].split(",", 1)
            media_type = mime_type.split(":")[1].split(";")[0]
            # Use the shared whitelist instead of repeating the literal list.
            if media_type not in self.SUPPORTED_IMAGE_TYPES:
                raise ValueError(f"Unsupported media type: {media_type}")
            image_size = len(base64_data) * 3 / 4
            if image_size > self.MAX_IMAGE_SIZE:
                raise ValueError(
                    f"Image size exceeds limit: {image_size / (1024 * 1024):.2f}MB"
                )
            return {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": base64_data,
                },
            }
        else:
            url = image_data["image_url"]["url"]
            # Timeout added: a HEAD request with no timeout can hang the
            # worker indefinitely on an unresponsive host.
            response = requests.head(
                url, allow_redirects=True, timeout=self.REQUEST_TIMEOUT
            )
            content_type = response.headers.get("content-type", "")
            content_length = int(response.headers.get("content-length", 0))
            if content_type not in self.SUPPORTED_IMAGE_TYPES:
                raise ValueError(f"Unsupported media type: {content_type}")
            if content_length > self.MAX_IMAGE_SIZE:
                raise ValueError(
                    f"Image exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB"
                )
            return {"type": "image", "source": {"type": "url", "url": url}}

    @staticmethod
    def _convert_tool(tool: dict) -> dict:
        """Translate an OpenAI-style tool spec into Anthropic's format.

        Anthropic expects {"name", "description", "input_schema"} entries,
        not the OpenAI {"type": "function", "function": {...}} wrapper.
        Specs already in Anthropic shape (no "function" key) pass through.
        """
        spec = tool.get("function", tool)
        return {
            "name": spec.get("name", ""),
            "description": spec.get("description", ""),
            "input_schema": spec.get(
                "parameters", {"type": "object", "properties": {}}
            ),
        }

    async def pipe(
        self, body: Dict, __event_emitter__=None
    ) -> Union[str, Generator, Iterator]:
        """Execute one chat completion against the Anthropic Messages API.

        Returns the full response text, an async generator of text chunks
        when streaming, or a {"content", "format"} error dict. Status
        updates are reported through __event_emitter__ when supplied.
        """
        if not self.valves.ANTHROPIC_API_KEY:
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "description": "Error: ANTHROPIC_API_KEY is required",
                            "done": True,
                        },
                    }
                )
            return {"content": "Error: ANTHROPIC_API_KEY is required", "format": "text"}
        try:
            system_message, messages = pop_system_message(body["messages"])
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Processing request...", "done": False},
                    }
                )
            model_name = body["model"].split("/")[-1]
            # Claude 3.5 models accept a larger max_tokens ceiling.
            default_max_tokens = (
                self.DEFAULT_MAX_TOKENS_CLAUDE_3_5
                if model_name.startswith("claude-3-5")
                else self.DEFAULT_MAX_TOKENS_OTHER
            )
            payload = {
                "model": model_name,
                "messages": self._process_messages(messages),
                # Clamp the caller's value to the model ceiling; an absent or
                # None/0 max_tokens falls back to the default (min(None, n)
                # would raise TypeError otherwise).
                "max_tokens": min(
                    body.get("max_tokens") or default_max_tokens, default_max_tokens
                ),
                "temperature": (
                    float(body.get("temperature"))
                    if body.get("temperature") is not None
                    else None
                ),
                "top_k": (
                    int(body.get("top_k")) if body.get("top_k") is not None else None
                ),
                "top_p": (
                    float(body.get("top_p")) if body.get("top_p") is not None else None
                ),
                # Default to False: with .get("stream") alone, a missing key
                # became None, was filtered out below, and payload["stream"]
                # raised KeyError.
                "stream": body.get("stream", False),
                "metadata": body.get("metadata", {}),
            }
            # Drop unset optional parameters; the API rejects explicit nulls.
            payload = {k: v for k, v in payload.items() if v is not None}
            if system_message:
                payload["system"] = str(system_message)
            if "tools" in body:
                payload["tools"] = [
                    self._convert_tool(tool) for tool in body["tools"]
                ]
                # Only forward tool_choice when provided; re-inserting None
                # here would send "tool_choice": null despite the filter above.
                if body.get("tool_choice") is not None:
                    payload["tool_choice"] = body["tool_choice"]
            # NOTE: "response_format" is not an Anthropic Messages API
            # parameter, so it is intentionally not forwarded.
            headers = {
                "x-api-key": self.valves.ANTHROPIC_API_KEY,
                "anthropic-version": self.API_VERSION,
                "anthropic-beta": self.BETA_HEADER,
                "content-type": "application/json",
            }
            try:
                if payload["stream"]:
                    return self._stream_with_ui(
                        self.MODEL_URL, headers, payload, body, __event_emitter__
                    )
                response = requests.post(
                    self.MODEL_URL,
                    headers=headers,
                    json=payload,
                    timeout=self.REQUEST_TIMEOUT,
                )
                # Record the request id so error messages below reference
                # this call rather than a stale id from a previous one.
                self.request_id = response.headers.get("x-request-id")
                if response.status_code != 200:
                    return {
                        "content": f"Error: HTTP {response.status_code}: {response.text}",
                        "format": "text",
                    }
                result, _cache_metrics = self._handle_response(response)
                response_text = result["content"][0]["text"]
                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {
                                "description": "Request completed successfully",
                                "done": True,
                            },
                        }
                    )
                return response_text
            except requests.exceptions.RequestException as e:
                error_msg = f"Request failed: {str(e)}"
                if self.request_id:
                    error_msg += f" (Request ID: {self.request_id})"
                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {"description": error_msg, "done": True},
                        }
                    )
                return {"content": error_msg, "format": "text"}
        except Exception as e:
            error_msg = f"Error: {str(e)}"
            if self.request_id:
                error_msg += f" (Request ID: {self.request_id})"
            if __event_emitter__:
                await __event_emitter__(
                    {"type": "status", "data": {"description": error_msg, "done": True}}
                )
            return {"content": error_msg, "format": "text"}

    async def _stream_with_ui(
        self, url: str, headers: dict, payload: dict, body: dict, __event_emitter__=None
    ) -> AsyncIterator[str]:
        """Stream SSE text deltas from the Messages API, yielding text chunks.

        Emits UI status events on completion and on errors; API or transport
        failures are yielded as human-readable error strings rather than
        raised, so the UI always receives something to display.
        """
        try:
            with requests.post(
                url,
                headers=headers,
                json=payload,
                stream=True,
                timeout=self.REQUEST_TIMEOUT,
            ) as response:
                self.request_id = response.headers.get("x-request-id")
                if response.status_code != 200:
                    if __event_emitter__:
                        await __event_emitter__(
                            {
                                "type": "status",
                                "data": {"description": "Request failed", "done": True},
                            }
                        )
                    yield f"Error: HTTP {response.status_code}: {response.text} (Request ID: {self.request_id})"
                    return
                for line in response.iter_lines():
                    # SSE data lines are prefixed with b"data: "; skip keep-alives.
                    if line and line.startswith(b"data: "):
                        data = json.loads(line[6:])
                        if (
                            data["type"] == "content_block_delta"
                            and "text" in data["delta"]
                        ):
                            yield data["delta"]["text"]
                        elif data["type"] == "message_stop":
                            if __event_emitter__:
                                await __event_emitter__(
                                    {
                                        "type": "status",
                                        "data": {
                                            "description": "Request completed successfully",
                                            "done": True,
                                        },
                                    }
                                )
                            break
                        elif data["type"] == "error":
                            if __event_emitter__:
                                await __event_emitter__(
                                    {
                                        "type": "status",
                                        "data": {
                                            "description": "Stream error",
                                            "done": True,
                                        },
                                    }
                                )
                            yield f"Stream error: {data.get('error', {}).get('message', 'Unknown error')} (Request ID: {self.request_id})"
        except Exception as e:
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Stream error", "done": True},
                    }
                )
            yield f"Stream error: {str(e)} (Request ID: {self.request_id if self.request_id else 'unknown'})"

    def _process_messages(self, messages: List[dict]) -> List[dict]:
        """Convert chat messages into Anthropic message dicts.

        Each message's content is normalized via process_content. The
        cache_control tagging below targets "tool_calls"/"tool_results"
        content types; NOTE(review): process_content currently only emits
        "text" and "image" blocks, so these branches look unreachable from
        this path — confirm whether other callers feed tool blocks through.
        """
        processed_messages = []
        for message in messages:
            processed_content = []
            for content in self.process_content(message["content"]):
                if (
                    message.get("role") == "assistant"
                    and content.get("type") == "tool_calls"
                ):
                    content["cache_control"] = {"type": "ephemeral"}
                elif (
                    message.get("role") == "user"
                    and content.get("type") == "tool_results"
                ):
                    content["cache_control"] = {"type": "ephemeral"}
                processed_content.append(content)
            processed_messages.append(
                {"role": message["role"], "content": processed_content}
            )
        return processed_messages

    def _handle_response(self, response):
        """Parse a successful API response.

        Returns:
            (result, cache_metrics): the decoded JSON body and a dict of
            prompt-caching token counters pulled from its "usage" section.
        """
        result = response.json()
        usage = result.get("usage", {})
        cache_metrics = {
            "cache_creation_input_tokens": usage.get("cache_creation_input_tokens", 0),
            "cache_read_input_tokens": usage.get("cache_read_input_tokens", 0),
            "input_tokens": usage.get("input_tokens", 0),
        }
        return result, cache_metrics