"""
An OpenWebUI pipe for Anthropic models. It supports streaming and, for the
Claude 3.7 series, extended reasoning surfaced through native <think> tags.
It also handles image and PDF inputs.
"""

import asyncio
import json
import logging
import os
import time
from typing import Dict, Generator, Iterator, List, Union

import aiohttp
import requests
from open_webui.utils.misc import pop_system_message
from pydantic import BaseModel, Field
class Pipe:
API_VERSION = "2023-06-01"
MODEL_URL = "https://api.anthropic.com/v1/messages"
SUPPORTED_IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif", "image/webp"]
SUPPORTED_PDF_MODELS = [
"claude-3-5-sonnet-20241022",
"claude-3-5-sonnet-20240620",
"claude-3-5-sonnet-latest",
"claude-3-7-sonnet-20250219",
"claude-3-7-sonnet-latest",
]
MAX_IMAGE_SIZE = 5 * 1024 * 1024
MAX_PDF_SIZE = 32 * 1024 * 1024
TOTAL_MAX_IMAGE_SIZE = 100 * 1024 * 1024
PDF_BETA_HEADER = "pdfs-2024-09-25"
# TODO: Fetch model max tokens from the API if available
MODEL_MAX_TOKENS = {
"claude-3-opus-20240229": 4096,
"claude-3-sonnet-20240229": 4096,
"claude-3-haiku-20240307": 4096,
"claude-3-5-sonnet-20240620": 8192,
"claude-3-5-sonnet-20241022": 8192,
"claude-3-5-haiku-20241022": 8192,
"claude-3-opus-latest": 4096,
"claude-3-5-sonnet-latest": 8192,
"claude-3-5-haiku-latest": 8192,
"claude-3-7-sonnet-20250219": 16384,
"claude-3-7-sonnet-latest": 16384,
}
    BETA_HEADER = "prompt-caching-2024-07-31"
    REQUEST_TIMEOUT = (3.05, 60)  # (connect, read) timeouts in seconds

    def get_reasoning_tokens(self, model_name, effort="medium"):
        """Calculate a reasoning-token budget from the model's max tokens and the effort level."""
        max_tokens = self.MODEL_MAX_TOKENS.get(model_name, 4096)
        # Reserve a portion of tokens for the final answer:
        # at least 1024 tokens, or 10% of the model maximum.
        answer_reserve = max(1024, int(max_tokens * 0.1))
        # The remainder is the safe ceiling for the reasoning budget.
        safe_max = max_tokens - answer_reserve
        # Scale by effort level; unrecognized values fall back to "medium".
        if effort == "low":
            return min(16000, int(safe_max * 0.7))  # 70% of available tokens
        if effort == "medium":
            return min(32000, int(safe_max * 0.85))  # 85% of available tokens
        if effort == "high":
            return min(64000, int(safe_max * 0.95))  # 95% of available tokens
        return min(32000, int(safe_max * 0.85))  # Default to medium
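    # Worked example for claude-3-7-sonnet-20250219 (max_tokens 16384,
    # answer_reserve 1638, safe_max 14746):
    #   effort="low"    -> min(16000, int(14746 * 0.70)) = 10322
    #   effort="medium" -> min(32000, int(14746 * 0.85)) = 12534
    #   effort="high"   -> min(64000, int(14746 * 0.95)) = 14008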
class Valves(BaseModel):
ANTHROPIC_API_KEY: str = Field(
default=os.getenv("ANTHROPIC_API_KEY", ""),
description="Your Anthropic API key",
)
def __init__(self):
logging.basicConfig(level=logging.INFO)
self.type = "manifold"
self.id = "anthropic"
self.valves = self.Valves()
self.request_id = None
self.is_thinking = False
self.thinking_start_time = None
def get_anthropic_models(self) -> List[dict]:
return [
{
"id": f"anthropic/{name}",
"name": name,
"context_length": 200000,
"supports_vision": name not in ["claude-3-5-haiku-20241022"],
}
for name in [
"claude-3-opus-20240229",
"claude-3-sonnet-20240229",
"claude-3-haiku-20240307",
"claude-3-5-sonnet-20240620",
"claude-3-5-sonnet-20241022",
"claude-3-5-haiku-20241022",
"claude-3-opus-latest",
"claude-3-5-sonnet-latest",
"claude-3-5-haiku-latest",
"claude-3-7-sonnet-20250219",
"claude-3-7-sonnet-latest",
]
]
def pipes(self) -> List[dict]:
return self.get_anthropic_models()
def process_content(self, content: Union[str, List[dict]]) -> List[dict]:
if isinstance(content, str):
return [{"type": "text", "text": content}]
processed_content = []
for item in content:
if item["type"] == "text":
processed_content.append({"type": "text", "text": item["text"]})
elif item["type"] == "image_url":
processed_content.append(self.process_image(item))
elif item["type"] == "pdf_url":
model_name = item.get("model", "").split("/")[-1]
if model_name not in self.SUPPORTED_PDF_MODELS:
raise ValueError(
f"PDF support is only available for models: {', '.join(self.SUPPORTED_PDF_MODELS)}"
)
processed_content.append(self.process_pdf(item))
return processed_content
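    # Example mapping (shapes illustrative): an OpenAI-style content list
    #   [{"type": "text", "text": "Describe this"},
    #    {"type": "image_url", "image_url": {"url": "data:image/png;base64,iVBO..."}}]
    # becomes the Anthropic block list
    #   [{"type": "text", "text": "Describe this"},
    #    {"type": "image", "source": {"type": "base64",
    #                                 "media_type": "image/png", "data": "iVBO..."}}]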
def process_image(self, image_data):
if image_data["image_url"]["url"].startswith("data:image"):
mime_type, base64_data = image_data["image_url"]["url"].split(",", 1)
media_type = mime_type.split(":")[1].split(";")[0]
if media_type not in self.SUPPORTED_IMAGE_TYPES:
raise ValueError(f"Unsupported media type: {media_type}")
# TODO: Optimize image processing to avoid reading the entire base64 data into memory
return {
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": base64_data,
},
}
else:
return {
"type": "image",
"source": {"type": "url", "url": image_data["image_url"]["url"]},
}
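    # Example data-URL split (illustrative): "data:image/png;base64,iVBO..."
    #   .split(",", 1) -> mime_type "data:image/png;base64", base64_data "iVBO..."
    # and media_type is the piece between ":" and ";" -> "image/png".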
def process_pdf(self, pdf_data):
if pdf_data["pdf_url"]["url"].startswith("data:application/pdf"):
mime_type, base64_data = pdf_data["pdf_url"]["url"].split(",", 1)
document = {
"type": "document",
"source": {
"type": "base64",
"media_type": "application/pdf",
"data": base64_data,
},
}
# Calculate PDF size for status updates
pdf_size = len(base64_data) * 3 / 4
pdf_size_mb = pdf_size / (1024 * 1024)
# Check PDF size against limits
if pdf_size > self.MAX_PDF_SIZE:
raise ValueError(
f"PDF size exceeds {self.MAX_PDF_SIZE/(1024*1024)}MB limit: {pdf_size_mb:.2f}MB"
)
# Store PDF info for status updates
if not hasattr(self, "pdf_info"):
self.pdf_info = []
self.pdf_info.append({"type": "base64", "size_mb": pdf_size_mb})
# TODO: Optimize PDF processing to avoid reading the entire base64 data into memory
if pdf_data.get("cache_control"):
document["cache_control"] = pdf_data["cache_control"]
return document
else:
document = {
"type": "document",
"source": {"type": "url", "url": pdf_data["pdf_url"]["url"]},
}
# Store PDF info for status updates
if not hasattr(self, "pdf_info"):
self.pdf_info = []
self.pdf_info.append({"type": "url", "url": pdf_data["pdf_url"]["url"]})
if pdf_data.get("cache_control"):
document["cache_control"] = pdf_data["cache_control"]
return document
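    # Size estimate: base64 encodes 3 bytes as 4 characters, so the decoded
    # size is roughly len(base64_data) * 3 / 4. For example, a 12,000,000-
    # character payload decodes to ~9,000,000 bytes (~8.58MB), well under the
    # 32MB limit.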
async def pipe(
self, body: Dict, __event_emitter__=None
) -> Union[str, Generator, Iterator]:
if not self.valves.ANTHROPIC_API_KEY:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Error: ANTHROPIC_API_KEY is required",
"done": True,
},
}
)
return {"content": "Error: ANTHROPIC_API_KEY is required", "format": "text"}
try:
# Reset PDF info at the start of each request
self.pdf_info = []
system_message, messages = pop_system_message(body["messages"])
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Processing request...", "done": False},
}
)
model_name = body["model"].split("/")[-1]
max_tokens_limit = self.MODEL_MAX_TOKENS.get(model_name, 4096)
# Initialize thinking if needed
reasoning_tokens = None
if model_name.startswith("claude-3-7") and body.get("reasoning_effort"):
reasoning_effort = body.get("reasoning_effort", "medium")
reasoning_tokens = self.get_reasoning_tokens(
model_name, reasoning_effort
)
# Calculate max_tokens to be greater than reasoning_tokens
answer_reserve = max(
1024, int(reasoning_tokens * 0.1)
) # Reserve at least 1024 tokens
max_tokens = min(max_tokens_limit, reasoning_tokens + answer_reserve)
# If user specified a higher max_tokens, use that instead
if body.get("max_tokens", 0) > max_tokens:
max_tokens = min(body.get("max_tokens"), max_tokens_limit)
# Final check to ensure max_tokens > reasoning_tokens
if max_tokens <= reasoning_tokens:
# Reduce reasoning tokens slightly to ensure proper relationship
reasoning_tokens = int(
max_tokens * 0.9
) # 90% for reasoning, 10% for final output
logging.info(
f"Adjusted reasoning_tokens to {reasoning_tokens} to fit within max_tokens of {max_tokens}"
)
else:
max_tokens = min(
body.get("max_tokens", max_tokens_limit), max_tokens_limit
)
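            # Worked example (claude-3-7-sonnet, effort="medium", no user max_tokens):
            #   reasoning_tokens = 12534 (see get_reasoning_tokens)
            #   answer_reserve   = max(1024, int(12534 * 0.1)) = 1253
            #   max_tokens       = min(16384, 12534 + 1253)    = 13787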
payload = {
"model": model_name,
"messages": self._process_messages(messages),
"max_tokens": max_tokens,
"temperature": (
float(body.get("temperature"))
if body.get("temperature") is not None
else None
),
"top_k": (
int(body.get("top_k")) if body.get("top_k") is not None else None
),
"top_p": (
float(body.get("top_p")) if body.get("top_p") is not None else None
),
"stream": body.get("stream"),
"metadata": body.get("metadata", {}),
}
            # Add the thinking parameter for Claude 3.7 models
            if reasoning_tokens:
                payload["thinking"] = {
                    "type": "enabled",
                    "budget_tokens": reasoning_tokens,
                }
                # Extended thinking is incompatible with sampling overrides,
                # so drop them and let temperature fall back to its default.
                for param in ("temperature", "top_k", "top_p"):
                    payload.pop(param, None)
                # Log the values to help with debugging
                logging.info(
                    f"Using reasoning_tokens={reasoning_tokens}, max_tokens={max_tokens}"
                )
payload = {k: v for k, v in payload.items() if v is not None}
if system_message:
payload["system"] = str(system_message)
if "tools" in body:
payload["tools"] = [
{"type": "function", "function": tool} for tool in body["tools"]
]
payload["tool_choice"] = body.get("tool_choice")
if "response_format" in body:
payload["response_format"] = {
"type": body["response_format"].get("type")
}
headers = {
"x-api-key": self.valves.ANTHROPIC_API_KEY,
"anthropic-version": self.API_VERSION,
"content-type": "application/json",
}
            beta_headers = []
            request_messages = body.get("messages", [])
            has_pdf = any(
                isinstance(msg["content"], list)
                and any(item.get("type") == "pdf_url" for item in msg["content"])
                for msg in request_messages
            )
            has_cache_control = any(
                isinstance(msg["content"], list)
                and any(item.get("cache_control") for item in msg["content"])
                for msg in request_messages
            )
            if has_pdf:
                beta_headers.append(self.PDF_BETA_HEADER)
                # Send a status update about PDF processing
                if __event_emitter__ and getattr(self, "pdf_info", None):
                    pdf_count = len(self.pdf_info)
                    pdf_sizes = [
                        pdf["size_mb"]
                        for pdf in self.pdf_info
                        if isinstance(pdf.get("size_mb"), (int, float))
                    ]
                    size_info = f" (Total: {sum(pdf_sizes):.2f}MB)" if pdf_sizes else ""
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {
                                "description": f"Processing {pdf_count} PDF{'s' if pdf_count > 1 else ''}{size_info}...",
                                "done": False,
                            },
                        }
                    )
            if has_cache_control:
                beta_headers.append(self.BETA_HEADER)
if beta_headers:
headers["anthropic-beta"] = ",".join(beta_headers)
try:
if payload["stream"]:
return self._stream_with_ui(
self.MODEL_URL, headers, payload, body, __event_emitter__
)
response = await self._send_request(self.MODEL_URL, headers, payload)
if response.status_code != 200:
return {
"content": f"Error: HTTP {response.status_code}: {response.text}",
"format": "text",
}
                result, _ = self._handle_response(response)
                # Join the text blocks; with extended thinking enabled the first
                # block can be a "thinking" block that carries no "text" field.
                response_text = "".join(
                    block.get("text", "")
                    for block in result["content"]
                    if block.get("type") == "text"
                )
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Request completed successfully",
"done": True,
},
}
)
return response_text
except requests.exceptions.RequestException as e:
error_msg = f"Request failed: {str(e)}"
if self.request_id:
error_msg += f" (Request ID: {self.request_id})"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": error_msg, "done": True},
}
)
return {"content": error_msg, "format": "text"}
except Exception as e:
error_msg = f"Error: {str(e)}"
if self.request_id:
error_msg += f" (Request ID: {self.request_id})"
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"description": error_msg, "done": True}}
)
return {"content": error_msg, "format": "text"}
async def _stream_with_ui(
self, url: str, headers: dict, payload: dict, body: dict, __event_emitter__=None
) -> Generator:
try:
self.is_thinking = False
self.thinking_start_time = None
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
self.request_id = response.headers.get("x-request-id")
if response.status != 200:
error_msg = (
f"Error: HTTP {response.status}: {await response.text()}"
)
if self.request_id:
error_msg += f" (Request ID: {self.request_id})"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": error_msg,
"done": True,
},
}
)
yield error_msg
return
async for line in response.content:
if line and line.startswith(b"data: "):
try:
data = json.loads(line[6:])
# Handling thinking content start
if (
data["type"] == "content_block_start"
and data.get("content_block", {}).get("type")
== "thinking"
):
self.is_thinking = True
self.thinking_start_time = time.time()
# Just yield the opening tag immediately
yield "<think>"
yield "\n"
# Handling thinking content deltas
elif (
self.is_thinking
and data["type"] == "content_block_delta"
and "thinking"
in data.get("delta", {}).get("type", "")
):
thinking_text = data.get("delta", {}).get(
"thinking", ""
)
# Just yield each piece of thinking as it arrives
yield thinking_text
# Handling thinking content end
elif (
self.is_thinking
and data["type"] == "content_block_stop"
):
self.is_thinking = False
thinking_time = (
time.time() - self.thinking_start_time
)
# Close the thinking block
yield "\n</think>\n\n"
# Also update status
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Completed reasoning in {thinking_time:.1f} seconds",
"done": False,
},
}
)
# Handling regular content
elif (
data["type"] == "content_block_delta"
and "text" in data["delta"]
):
yield data["delta"]["text"]
# Handling completion
elif data["type"] == "message_stop":
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Request completed",
"done": True,
},
}
)
break
except json.JSONDecodeError as e:
logging.error(
f"Failed to parse streaming response: {e}"
)
continue
except Exception as e:
error_msg = f"Stream error: {str(e)}"
if self.request_id:
error_msg += f" (Request ID: {self.request_id})"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": error_msg, "done": True},
}
)
yield error_msg
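    # Sketch of the SSE sequence this parser expects when thinking is enabled
    # (payloads abbreviated):
    #   data: {"type": "content_block_start", "content_block": {"type": "thinking", ...}}
    #   data: {"type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "..."}}
    #   data: {"type": "content_block_stop", ...}
    #   data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "..."}}
    #   data: {"type": "message_stop"}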
def _process_messages(self, messages: List[dict]) -> List[dict]:
        processed_messages = []
        total_image_size = 0  # Cumulative decoded image size across the request
        for message in messages:
            processed_content = []
            for content in self.process_content(message["content"]):
                if (
                    message.get("role") == "assistant"
                    and content.get("type") == "tool_calls"
                ) or (
                    message.get("role") == "user"
                    and content.get("type") == "tool_results"
                ):
                    content["cache_control"] = {"type": "ephemeral"}
                elif content.get("type") == "image":
                    if content["source"]["type"] == "base64":
                        image_size = len(content["source"]["data"]) * 3 / 4
                        if image_size > self.MAX_IMAGE_SIZE:
                            raise ValueError(
                                f"Image size exceeds {self.MAX_IMAGE_SIZE / (1024 * 1024):.0f}MB limit: {image_size / (1024 * 1024):.2f}MB"
                            )
                        # Enforce the per-request total alongside the per-image cap
                        total_image_size += image_size
                        if total_image_size > self.TOTAL_MAX_IMAGE_SIZE:
                            raise ValueError(
                                f"Total image size exceeds {self.TOTAL_MAX_IMAGE_SIZE / (1024 * 1024):.0f}MB limit"
                            )
                        if (
                            content["source"]["media_type"]
                            not in self.SUPPORTED_IMAGE_TYPES
                        ):
                            raise ValueError(
                                f"Unsupported media type: {content['source']['media_type']}"
                            )
                elif content.get("type") == "document":
                    # Log document processing for debugging
                    logging.info(
                        f"Processing document: {content.get('source', {}).get('type')}"
                    )
processed_content.append(content)
processed_messages.append(
{"role": message["role"], "content": processed_content}
)
return processed_messages
    async def _send_request(
        self, url: str, headers: dict, payload: dict
    ) -> requests.Response:
        retry_count = 0
        base_delay = 1  # Start with a 1-second delay
        max_retries = 3
        while retry_count < max_retries:
            try:
                # Run the blocking requests call in a worker thread so it
                # doesn't stall the event loop.
                response = await asyncio.to_thread(
                    requests.post,
                    url,
                    headers=headers,
                    json=payload,
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code == 429:
                    # Honor the server's retry-after header, falling back to
                    # exponential backoff (1s, 2s, 4s).
                    retry_after = int(
                        response.headers.get(
                            "retry-after", base_delay * (2**retry_count)
                        )
                    )
                    logging.warning(
                        f"Rate limit hit. Retrying in {retry_after} seconds. Retry count: {retry_count + 1}"
                    )
                    await asyncio.sleep(retry_after)
                    retry_count += 1
                    continue
                return response
            except requests.exceptions.RequestException as e:
                logging.error(f"Request failed: {str(e)}")
                raise
        logging.error("Max retries exceeded for rate limit.")
        raise requests.exceptions.RetryError("Max retries exceeded for rate limit.")
def _handle_response(self, response):
if response.status_code != 200:
error_msg = f"Error: HTTP {response.status_code}"
try:
error_data = response.json().get("error", {})
error_msg += f": {error_data.get('message', response.text)}"
            except Exception:
                error_msg += f": {response.text}"
self.request_id = response.headers.get("x-request-id")
if self.request_id:
error_msg += f" (Request ID: {self.request_id})"
return {"content": error_msg, "format": "text"}, None
result = response.json()
usage = result.get("usage", {})
cache_metrics = {
"cache_creation_input_tokens": usage.get("cache_creation_input_tokens", 0),
"cache_read_input_tokens": usage.get("cache_read_input_tokens", 0),
"input_tokens": usage.get("input_tokens", 0),
"output_tokens": usage.get("output_tokens", 0),
}
return result, cache_metrics
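    # Example cache_metrics for a successful response (values illustrative):
    #   {"cache_creation_input_tokens": 0, "cache_read_input_tokens": 2048,
    #    "input_tokens": 512, "output_tokens": 300}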