Function: Anthropic
Type: pipe
Version: v0.1.0
Function ID: anthropic
Creator: @grandx
Downloads: 657+
Anthropic API access with support for the latest API features (prompt caching, PDF processing, vision, thinking, etc.).
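
For illustration only, here is a sketch of the request-body shape this pipe consumes. The image_url/pdf_url items and the cache_control flag mirror the shapes handled in the function code below; the base64 payload is a placeholder.

# Illustrative sketch only: the request-body shape this pipe consumes.
# "pdf_url" items become Anthropic "document" blocks, "image_url" items
# become "image" blocks, and "cache_control" enables the prompt-caching beta.
body = {
    "model": "api/claude-3-5-sonnet-latest",
    "messages": [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Summarize the attached PDF."},
                {
                    "type": "pdf_url",
                    "pdf_url": {"url": "data:application/pdf;base64,<BASE64>"},
                    "cache_control": {"type": "ephemeral"},
                },
            ],
        }
    ],
    "stream": False,
}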
README
No README available
Function Code
""" title: Anthropic API Integration author: grandx, based on Balaxxe & lavantien version: 0.1.0 license: MIT requirements: pydantic>=2.0.0 environment_variables: - ANTHROPIC_API_KEY (required) - THINKING_BUDGET_TOKENS Supports: - All Claude Sonnet models - Streaming responses - Image processing - Prompt caching (server-side) - Function calling - PDF processing - Thinking - Cache Control """ import os import json import logging import asyncio import aiohttp from datetime import datetime from typing import ( List, Union, Dict, Optional, AsyncIterator, AsyncGenerator, ) from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message from types import SimpleNamespace class Pipe: API_VERSION = "2023-06-01" MODEL_URL = "https://api.anthropic.com/v1/messages" SUPPORTED_IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif", "image/webp"] MAX_IMAGE_SIZE = 5 * 1024 * 1024 MAX_PDF_SIZE = 32 * 1024 * 1024 TOTAL_MAX_IMAGE_SIZE = 100 * 1024 * 1024 MODEL_MAX_TOKENS = { "claude-3-sonnet-20240229": 4096, "claude-3-5-sonnet-20240620": 8192, "claude-3-5-sonnet-20241022": 8192, "claude-3-5-sonnet-latest": 8192, "claude-3-7-sonnet-20250219": 128000, "claude-3-7-sonnet-latest": 128000, } THINKING_BUDGET_TOKENS = 16000 PDF_HEADER = "pdfs-2024-09-25" PROMPT_CACHE_HEADER = "prompt-caching-2024-07-31" OUTPUT128K_HEADER = "output-128k-2025-02-19" REQUEST_TIMEOUT = 120 class Valves(BaseModel): ANTHROPIC_API_KEY: str = Field( default=os.getenv("ANTHROPIC_API_KEY", ""), description="Your Anthropic API key", ) THINKING_BUDGET_TOKENS: int = Field(default=16000, ge=0, le=96000) def __init__(self): logging.basicConfig(level=logging.INFO) self.type = "manifold" self.id = "anthropic" self.valves = self.Valves() self.request_id = None def get_anthropic_models(self) -> List[dict]: return [ { "id": f"api/{name}", "name": name, "context_length": 200000, "supports_vision": True, } for name in [ "claude-3-sonnet-20240229", "claude-3-5-sonnet-20240620", "claude-3-5-sonnet-20241022", "claude-3-5-sonnet-latest", "claude-3-7-sonnet-20250219", "claude-3-7-sonnet-latest", "claude-3-7-sonnet-latest-thinking", ] ] def pipes(self) -> List[dict]: return self.get_anthropic_models() def process_content(self, content: Union[str, List[dict]]) -> List[dict]: if isinstance(content, str): return [{"type": "text", "text": content}] processed_content = [] for item in content: if item["type"] == "text": processed_content.append({"type": "text", "text": item["text"]}) elif item["type"] == "image_url": processed_content.append(self.process_image(item)) elif item["type"] == "pdf_url": processed_content.append(self.process_pdf(item)) elif item["type"] == "tool_calls": processed_content.append(item) elif item["type"] == "tool_results": processed_content.append(item) return processed_content def process_image(self, image_data): if image_data["image_url"]["url"].startswith("data:image"): mime_type, base64_data = image_data["image_url"]["url"].split(",", 1) media_type = mime_type.split(":")[1].split(";")[0] if media_type not in self.SUPPORTED_IMAGE_TYPES: raise ValueError(f"Unsupported media type: {media_type}") # Check image size image_size = len(base64_data) * 3 / 4 # Approximate size of decoded base64 if image_size > self.MAX_IMAGE_SIZE: raise ValueError( f"Image size exceeds {self.MAX_IMAGE_SIZE/(1024*1024)}MB limit: {image_size/(1024*1024):.2f}MB" ) return { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": base64_data, }, } else: return { "type": "image", "source": {"type": "url", "url": 
image_data["image_url"]["url"]}, } def process_pdf(self, pdf_data): if pdf_data["pdf_url"]["url"].startswith("data:application/pdf"): mime_type, base64_data = pdf_data["pdf_url"]["url"].split(",", 1) # Check PDF size pdf_size = len(base64_data) * 3 / 4 # Approximate size of decoded base64 if pdf_size > self.MAX_PDF_SIZE: raise ValueError( f"PDF size exceeds {self.MAX_PDF_SIZE/(1024*1024)}MB limit: {pdf_size/(1024*1024):.2f}MB" ) document = { "type": "document", "source": { "type": "base64", "media_type": "application/pdf", "data": base64_data, }, } if pdf_data.get("cache_control"): document["cache_control"] = pdf_data["cache_control"] return document else: document = { "type": "document", "source": {"type": "url", "url": pdf_data["pdf_url"]["url"]}, } if pdf_data.get("cache_control"): document["cache_control"] = pdf_data["cache_control"] return document async def pipe(self, body: Dict) -> Union[str, AsyncGenerator[str, None]]: if not self.valves.ANTHROPIC_API_KEY: return {"content": "Error: ANTHROPIC_API_KEY is required", "format": "text"} try: system_message, messages = pop_system_message(body["messages"]) # Determine if using thinking mode model_name = body["model"].split("/")[-1] is_thinking_mode = "-thinking" in body["model"] if is_thinking_mode: # Strip the "-thinking" suffix for API call model_name = model_name.replace("-thinking", "") max_tokens_limit = self.MODEL_MAX_TOKENS.get(model_name, 4096) payload = { "model": model_name, "messages": self._process_messages(messages), "max_tokens": min( body.get("max_tokens", max_tokens_limit), max_tokens_limit ), "temperature": ( float(body.get("temperature")) if body.get("temperature") is not None else None ), "top_k": ( int(body.get("top_k")) if body.get("top_k") is not None else None ), "top_p": ( float(body.get("top_p")) if body.get("top_p") is not None else None ), "stream": body.get("stream", True), "metadata": body.get("metadata", {}), } payload = {k: v for k, v in payload.items() if v is not None} if system_message: payload["system"] = str(system_message) # Todo OpenAI to Anthropic format # if "tools" in body: # payload["tools"] = [ # {"type": "function", "function": tool} for tool in body["tools"] # ] # payload["tool_choice"] = body.get("tool_choice") if "response_format" in body: payload["response_format"] = { "type": body["response_format"].get("type") } # Add thinking parameters if using thinking mode if is_thinking_mode: payload["thinking"] = { "type": "enabled", "budget_tokens": self.valves.THINKING_BUDGET_TOKENS, } headers = { "x-api-key": self.valves.ANTHROPIC_API_KEY, "anthropic-version": self.API_VERSION, "content-type": "application/json", } beta_headers = [] # Add 128K output beta header beta_headers.append(self.OUTPUT128K_HEADER) if any( isinstance(msg["content"], list) and any( item.get("type") == "pdf_url" or item.get("cache_control") for item in msg["content"] ) for msg in body.get("messages", []) ): if any( isinstance(msg["content"], list) and any(item.get("type") == "pdf_url" for item in msg["content"]) for msg in body.get("messages", []) ): beta_headers.append(self.PDF_HEADER) if any( isinstance(msg["content"], list) and any(item.get("cache_control") for item in msg["content"]) for msg in body.get("messages", []) ): beta_headers.append(self.PROMPT_CACHE_HEADER) if beta_headers: headers["anthropic-beta"] = ",".join(beta_headers) try: if payload["stream"]: return self._stream_with_ui(self.MODEL_URL, headers, payload, body) response = await self._send_request(self.MODEL_URL, headers, payload) if response.status_code != 
200: return { "content": f"Error: HTTP {response.status_code}: {response.text}", "format": "text", } result, _ = self._handle_response(response) response_text = result["content"][0]["text"] return response_text except aiohttp.ClientError as e: error_msg = f"Request failed: {str(e)}" if self.request_id: error_msg += f" (Request ID: {self.request_id})" return {"content": error_msg, "format": "text"} except Exception as e: error_msg = f"Error: {str(e)}" if self.request_id: error_msg += f" (Request ID: {self.request_id})" return {"content": error_msg, "format": "text"} async def _stream_with_ui( self, url: str, headers: dict, payload: dict, body: dict ) -> AsyncGenerator[str, None]: try: async with aiohttp.ClientSession() as session: async with session.post( url, headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT), ) as response: self.request_id = response.headers.get("x-request-id") if response.status != 200: error_msg = ( f"Error: HTTP {response.status}: {await response.text()}" ) if self.request_id: error_msg += f" (Request ID: {self.request_id})" yield error_msg return async for line in response.content: if line and line.startswith(b"data: "): try: data = json.loads(line[6:]) if ( data["type"] == "content_block_delta" and "text" in data["delta"] ): yield data["delta"]["text"] elif data["type"] == "message_stop": break except json.JSONDecodeError as e: logging.error( f"Failed to parse streaming response: {e}" ) continue except asyncio.TimeoutError: error_msg = "Request timed out" if self.request_id: error_msg += f" (Request ID: {self.request_id})" yield error_msg except Exception as e: error_msg = f"Stream error: {str(e)}" if self.request_id: error_msg += f" (Request ID: {self.request_id})" yield error_msg def _process_messages(self, messages: List[dict]) -> List[dict]: processed_messages = [] for message in messages: processed_content = [] for content in self.process_content(message["content"]): if ( message.get("role") == "assistant" and content.get("type") == "tool_calls" ): content["cache_control"] = {"type": "ephemeral"} elif ( message.get("role") == "user" and content.get("type") == "tool_results" ): content["cache_control"] = {"type": "ephemeral"} processed_content.append(content) processed_messages.append( {"role": message["role"], "content": processed_content} ) return processed_messages async def _send_request( self, url: str, headers: dict, payload: dict ) -> SimpleNamespace: retry_count = 0 base_delay = 1 # Start with 1 second delay max_retries = 3 async with aiohttp.ClientSession() as session: while retry_count < max_retries: try: async with session.post( url, headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT), ) as response: self.request_id = response.headers.get("x-request-id") if response.status == 429: retry_after = int( response.headers.get( "retry-after", base_delay * (2**retry_count) ) ) logging.warning( f"Rate limit hit. Retrying in {retry_after} seconds. 
Retry count: {retry_count + 1}" ) await asyncio.sleep(retry_after) retry_count += 1 continue response_text = await response.text() response_wrapper = SimpleNamespace( status_code=response.status, headers=response.headers, text=response_text, json=lambda: json.loads(response_text), ) return response_wrapper except aiohttp.ClientError as e: logging.error(f"Request failed: {str(e)}") raise e logging.error("Max retries exceeded for rate limit.") return SimpleNamespace( status_code=429, headers={}, text="Max retries exceeded", json=lambda: {"error": {"message": "Max retries exceeded"}}, ) def _handle_response(self, response): if response.status_code != 200: error_msg = f"Error: HTTP {response.status_code}" try: error_data = response.json().get("error", {}) error_msg += f": {error_data.get('message', response.text)}" except: error_msg += f": {response.text}" self.request_id = response.headers.get("x-request-id") if self.request_id: error_msg += f" (Request ID: {self.request_id})" return {"content": error_msg, "format": "text"}, None result = response.json() usage = result.get("usage", {}) cache_metrics = { "cache_creation_input_tokens": usage.get("cache_creation_input_tokens", 0), "cache_read_input_tokens": usage.get("cache_read_input_tokens", 0), "input_tokens": usage.get("input_tokens", 0), "output_tokens": usage.get("output_tokens", 0), } return result, cache_metrics
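
A minimal sketch of exercising the pipe outside Open WebUI (inside Open WebUI the framework calls pipe() itself). This assumes ANTHROPIC_API_KEY is set and open_webui is importable; the model ID and message are placeholders.

import asyncio

async def main():
    pipe = Pipe()
    body = {
        "model": "api/claude-3-5-sonnet-latest",
        "messages": [{"role": "user", "content": "Hello, Claude!"}],
        "stream": True,
    }
    result = await pipe.pipe(body)
    if isinstance(result, (str, dict)):
        print(result)  # complete response text, or an error payload
    else:
        async for chunk in result:  # streaming async generator
            print(chunk, end="", flush=True)

if __name__ == "__main__":
    asyncio.run(main())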