We're Hiring!
Whitepaper
Docs
Sign In
Function
Function
pipe
v5.0
Anthropic API, Supports Claude 4, Includes Both Standard and Extended Thinking Versions
Last Updated
6 months ago
Created
9 months ago
Function ID
anthropic_balaxxe_cot
Creator
@ncks024
Downloads
2.3K+
Get
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with
intelligence
.
Description
The original function made by Balaxxe modified to stream CoT tokens
README
Function Code
Show
""" title: Anthropic API Integration w/ Extended Thinking, 128K Output, & Complete Claude Model Support author: Balaxxe (Updated by nbellochi) version: 5.0 license: MIT requirements: pydantic>=2.0.0, requests>=2.0.0 environment_variables: - ANTHROPIC_API_KEY (required) Supports: - All Claude 3, 3.5, 3.7, and 4 models (including latest versions) - Extended thinking with configurable budget - 128K output tokens for Claude 3.7 and 4 models - Streaming responses with thinking visualization - Image and PDF processing - Prompt caching (server-side) - Function calling - Cache Control - Comprehensive error handling Updates (v5.0): - Added support for new Claude 4 models (Sonnet 4, Opus 4) - Updated model capabilities and token limits - Enhanced extended thinking for Claude 4 models - Improved error handling and retry logic - Updated API version and beta headers - Refined documentation and model metadata """ import os import json import time import hashlib import logging import asyncio import random from datetime import datetime from typing import ( List, Union, Generator, Iterator, Dict, Optional, AsyncIterator, ) from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message import aiohttp class Pipe: API_VERSION = "2023-06-01" # Current API version as of May 2025 MODEL_URL = "https://api.anthropic.com/v1/messages" SUPPORTED_IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif", "image/webp"] SUPPORTED_PDF_MODELS = [ "claude-3-5-sonnet-20241022", "claude-3-5-sonnet-20240620", "claude-3-7-sonnet-latest", "claude-3-7-sonnet-20250219", "claude-sonnet-4-20250514", # New Claude 4 models "claude-opus-4-20250514", "claude-sonnet-4-latest", "claude-opus-4-latest", ] MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image MAX_PDF_SIZE = 32 * 1024 * 1024 # 32MB per PDF TOTAL_MAX_IMAGE_SIZE = 100 * 1024 * 1024 # 100MB total PDF_BETA_HEADER = "pdfs-2024-09-25" OUTPUT_128K_BETA = "output-128k-2025-02-19" # Model max tokens - comprehensive list of all Claude models MODEL_MAX_TOKENS = { # Claude 3 family "claude-3-opus-20240229": 4096, "claude-3-sonnet-20240229": 4096, "claude-3-haiku-20240307": 4096, # Claude 3.5 family "claude-3-5-sonnet-20240620": 8192, "claude-3-5-sonnet-20241022": 8192, "claude-3-5-haiku-20241022": 8192, # Claude 3.7 family "claude-3-7-sonnet-20250219": 16384, # 16K by default, 128K with beta # Claude 4 family - NEW MODELS "claude-sonnet-4-20250514": 32000, # 32K by default, 128K with beta "claude-opus-4-20250514": 32000, # 32K by default, 128K with beta # Latest aliases "claude-3-opus-latest": 4096, "claude-3-sonnet-latest": 4096, "claude-3-haiku-latest": 4096, "claude-3-5-sonnet-latest": 8192, "claude-3-5-haiku-latest": 8192, "claude-3-7-sonnet-latest": 16384, # 16K by default, 128K with beta "claude-sonnet-4-latest": 32000, # 32K by default, 128K with beta "claude-opus-4-latest": 32000, # 32K by default, 128K with beta } # Model context lengths - maximum input tokens MODEL_CONTEXT_LENGTH = { # Claude 3 family "claude-3-opus-20240229": 200000, "claude-3-sonnet-20240229": 200000, "claude-3-haiku-20240307": 200000, # Claude 3.5 family "claude-3-5-sonnet-20240620": 200000, "claude-3-5-sonnet-20241022": 200000, "claude-3-5-haiku-20241022": 200000, # Claude 3.7 family "claude-3-7-sonnet-20250219": 200000, # Claude 4 family - NEW MODELS "claude-sonnet-4-20250514": 200000, "claude-opus-4-20250514": 200000, # Latest aliases "claude-3-opus-latest": 200000, "claude-3-sonnet-latest": 200000, "claude-3-haiku-latest": 200000, "claude-3-5-sonnet-latest": 200000, "claude-3-5-haiku-latest": 200000, "claude-3-7-sonnet-latest": 200000, "claude-sonnet-4-latest": 200000, "claude-opus-4-latest": 200000, } # Models that support extended thinking THINKING_SUPPORTED_MODELS = [ "claude-3-7-sonnet-latest", "claude-3-7-sonnet-20250219", # Claude 4 models with enhanced thinking capabilities "claude-sonnet-4-20250514", "claude-opus-4-20250514", "claude-sonnet-4-latest", "claude-opus-4-latest", ] BETA_HEADER = "prompt-caching-2024-07-31" REQUEST_TIMEOUT = ( 300 # Increased timeout for longer responses with extended thinking ) THINKING_BUDGET_TOKENS = 16000 # Default thinking budget tokens (max 16K) CLAUDE_4_THINKING_BUDGET = 32000 # Enhanced thinking budget for Claude 4 models class Valves(BaseModel): ANTHROPIC_API_KEY: str = "Your API Key Here" ENABLE_THINKING: bool = ( True # Changed to True to enable streaming of thinking tokens ) MAX_OUTPUT_TOKENS: bool = True # Valve to use maximum possible output tokens ENABLE_TOOL_CHOICE: bool = True # Valve to enable tool choice ENABLE_SYSTEM_PROMPT: bool = True # Valve to enable system prompt THINKING_BUDGET_TOKENS: int = Field( default=16000, ge=0, le=16000 ) # Configurable thinking budget tokens 16,000 max def __init__(self): logging.basicConfig(level=logging.INFO) self.type = "manifold" self.id = "anthropic" self.valves = self.Valves() self.request_id = None def get_anthropic_models(self) -> List[dict]: """ Returns a list of all supported Anthropic Claude models with their capabilities. For hybrid models (3.7, 4.x), this creates separate entries for standard and thinking modes so you can explicitly choose which mode to use in OpenWebUI. """ # Standard models (no thinking capability) standard_models = [ # Claude 3 family "claude-3-opus-20240229", "claude-3-sonnet-20240229", "claude-3-haiku-20240307", # Claude 3.5 family "claude-3-5-sonnet-20240620", "claude-3-5-sonnet-20241022", "claude-3-5-haiku-20241022", # Latest aliases for non-thinking models "claude-3-opus-latest", "claude-3-sonnet-latest", "claude-3-haiku-latest", "claude-3-5-sonnet-latest", "claude-3-5-haiku-latest", ] # Hybrid models (support both standard and thinking modes) hybrid_models = [ "claude-3-7-sonnet-20250219", "claude-3-7-sonnet-latest", "claude-sonnet-4-20250514", "claude-opus-4-20250514", "claude-sonnet-4-latest", "claude-opus-4-latest", ] models = [] # Add standard models for name in standard_models: models.append({ "id": f"anthropic/{name}", "name": name, "context_length": self.MODEL_CONTEXT_LENGTH.get(name, 200000), "supports_vision": name != "claude-3-5-haiku-20241022", "supports_thinking": False, "is_hybrid_model": False, "max_output_tokens": self.MODEL_MAX_TOKENS.get(name, 4096), }) # Add hybrid models - both standard and thinking versions for name in hybrid_models: # Standard mode version models.append({ "id": f"anthropic/{name}", "name": f"{name} (Standard)", "context_length": self.MODEL_CONTEXT_LENGTH.get(name, 200000), "supports_vision": True, "supports_thinking": False, "is_hybrid_model": True, "thinking_mode": "standard", "max_output_tokens": self.MODEL_MAX_TOKENS.get(name, 16384), }) # Thinking mode version models.append({ "id": f"anthropic/{name}-thinking", "name": f"{name} (Extended Thinking)", "context_length": self.MODEL_CONTEXT_LENGTH.get(name, 200000), "supports_vision": True, "supports_thinking": True, "is_hybrid_model": True, "thinking_mode": "extended", "max_output_tokens": 131072 if self.valves.MAX_OUTPUT_TOKENS else self.MODEL_MAX_TOKENS.get(name, 16384), # 128K with beta }) return models def pipes(self) -> List[dict]: return self.get_anthropic_models() def process_content(self, content: Union[str, List[dict]]) -> List[dict]: if isinstance(content, str): return [{"type": "text", "text": content}] processed_content = [] for item in content: if item["type"] == "text": processed_content.append({"type": "text", "text": item["text"]}) elif item["type"] == "image_url": processed_content.append(self.process_image(item)) elif item["type"] == "pdf_url": model_name = item.get("model", "").split("/")[-1] if model_name not in self.SUPPORTED_PDF_MODELS: raise ValueError( f"PDF support is only available for models: {', '.join(self.SUPPORTED_PDF_MODELS)}" ) processed_content.append(self.process_pdf(item)) elif item["type"] == "tool_calls": processed_content.append(item) elif item["type"] == "tool_results": processed_content.append(item) return processed_content def process_image(self, image_data): if image_data["image_url"]["url"].startswith("data:image"): mime_type, base64_data = image_data["image_url"]["url"].split(",", 1) media_type = mime_type.split(":")[1].split(";")[0] if media_type not in self.SUPPORTED_IMAGE_TYPES: raise ValueError(f"Unsupported media type: {media_type}") # Check image size image_size = len(base64_data) * 3 / 4 # Approximate size of decoded base64 if image_size > self.MAX_IMAGE_SIZE: raise ValueError( f"Image size exceeds {self.MAX_IMAGE_SIZE/(1024*1024)}MB limit: {image_size/(1024*1024):.2f}MB" ) return { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": base64_data, }, } else: return { "type": "image", "source": {"type": "url", "url": image_data["image_url"]["url"]}, } def process_pdf(self, pdf_data): if pdf_data["pdf_url"]["url"].startswith("data:application/pdf"): mime_type, base64_data = pdf_data["pdf_url"]["url"].split(",", 1) # Check PDF size pdf_size = len(base64_data) * 3 / 4 # Approximate size of decoded base64 if pdf_size > self.MAX_PDF_SIZE: raise ValueError( f"PDF size exceeds {self.MAX_PDF_SIZE/(1024*1024)}MB limit: {pdf_size/(1024*1024):.2f}MB" ) document = { "type": "document", "source": { "type": "base64", "media_type": "application/pdf", "data": base64_data, }, } if pdf_data.get("cache_control"): document["cache_control"] = pdf_data["cache_control"] return document else: document = { "type": "document", "source": {"type": "url", "url": pdf_data["pdf_url"]["url"]}, } if pdf_data.get("cache_control"): document["cache_control"] = pdf_data["cache_control"] return document async def pipe( self, body: Dict, __event_emitter__=None ) -> Union[str, AsyncIterator[str]]: """ Process a request to the Anthropic API. Args: body: The request body containing messages and parameters __event_emitter__: Optional event emitter for status updates Returns: Either a string response or an async iterator for streaming responses """ if not self.valves.ANTHROPIC_API_KEY: error_msg = "Error: ANTHROPIC_API_KEY is required" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return {"content": error_msg, "format": "text"} try: system_message, messages = pop_system_message(body["messages"]) model_name = body["model"].split("/")[-1] # Handle thinking model variants (remove -thinking suffix for API call) is_thinking_variant = model_name.endswith("-thinking") actual_model_name = model_name.replace("-thinking", "") if is_thinking_variant else model_name if actual_model_name not in self.MODEL_MAX_TOKENS: logging.warning( f"Unknown model: {actual_model_name}, using default token limit" ) # Get max tokens for the model max_tokens_limit = self.MODEL_MAX_TOKENS.get(actual_model_name, 4096) # If MAX_OUTPUT_TOKENS valve is enabled, use the maximum possible tokens for the model if self.valves.MAX_OUTPUT_TOKENS: max_tokens = max_tokens_limit else: max_tokens = min( body.get("max_tokens", max_tokens_limit), max_tokens_limit ) payload = { "model": actual_model_name, # Use actual model name for API call "messages": self._process_messages(messages), "max_tokens": max_tokens, "temperature": ( float(body.get("temperature")) if body.get("temperature") is not None else None ), "top_k": ( int(body.get("top_k")) if body.get("top_k") is not None else None ), "top_p": ( float(body.get("top_p")) if body.get("top_p") is not None else None ), "stream": body.get("stream", False), "metadata": body.get("metadata", {}), } # Determine if thinking should be enabled based on model selection # For hybrid models: only enable thinking if explicitly chosen via -thinking suffix # For non-hybrid models: use global valve setting if actual_model_name in self.THINKING_SUPPORTED_MODELS: # This is a hybrid model - respect explicit choice should_enable_thinking = is_thinking_variant else: # This is a non-hybrid model - use global valve (though these don't support thinking anyway) should_enable_thinking = self.valves.ENABLE_THINKING if should_enable_thinking and actual_model_name in self.THINKING_SUPPORTED_MODELS: # Use enhanced thinking budget for Claude 4 models thinking_budget = self.valves.THINKING_BUDGET_TOKENS if any(claude4_model in actual_model_name for claude4_model in ["claude-sonnet-4", "claude-opus-4"]): thinking_budget = min(self.CLAUDE_4_THINKING_BUDGET, self.valves.THINKING_BUDGET_TOKENS * 2) payload["thinking"] = { "type": "enabled", "budget_tokens": thinking_budget, } payload = {k: v for k, v in payload.items() if v is not None} # Add system message if enabled if system_message and self.valves.ENABLE_SYSTEM_PROMPT: payload["system"] = str(system_message) # Add tools if enabled if "tools" in body and self.valves.ENABLE_TOOL_CHOICE: payload["tools"] = [ {"type": "function", "function": tool} for tool in body["tools"] ] payload["tool_choice"] = body.get("tool_choice") if "response_format" in body: payload["response_format"] = { "type": body["response_format"].get("type") } headers = { "x-api-key": self.valves.ANTHROPIC_API_KEY, "anthropic-version": self.API_VERSION, "content-type": "application/json", } beta_headers = [] # Add PDF beta header if needed if any( isinstance(msg["content"], list) and any(item.get("type") == "pdf_url" for item in msg["content"]) for msg in body.get("messages", []) ): beta_headers.append(self.PDF_BETA_HEADER) # Add cache control beta header if needed if any( isinstance(msg["content"], list) and any(item.get("cache_control") for item in msg["content"]) for msg in body.get("messages", []) ): beta_headers.append(self.BETA_HEADER) # Add 128K output beta header for Claude 3.7 and 4 models if ( actual_model_name in self.THINKING_SUPPORTED_MODELS and self.valves.MAX_OUTPUT_TOKENS ): beta_headers.append(self.OUTPUT_128K_BETA) if beta_headers: headers["anthropic-beta"] = ",".join(beta_headers) try: if payload["stream"]: return self._stream_with_ui( self.MODEL_URL, headers, payload, body, __event_emitter__ ) response_data, cache_metrics = await self._send_request( self.MODEL_URL, headers, payload ) if ( isinstance(response_data, dict) and "content" in response_data and response_data.get("format") == "text" ): # This is an error response if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": response_data["content"], "done": True, }, } ) return response_data["content"] # Handle tool calls in the response if any( block.get("type") == "tool_use" for block in response_data.get("content", []) ): tool_blocks = [ block for block in response_data.get("content", []) if block.get("type") == "tool_use" ] tool_calls = [] for block in tool_blocks: tool_use = block["tool_use"] tool_calls.append( { "id": tool_use["id"], "type": "function", "function": { "name": tool_use["name"], "arguments": tool_use["input"], }, } ) if tool_calls: return json.dumps( {"type": "tool_calls", "tool_calls": tool_calls} ) # Handle thinking in the response thinking_content = None if should_enable_thinking and actual_model_name in self.THINKING_SUPPORTED_MODELS: thinking_blocks = [ block for block in response_data.get("content", []) if block.get("type") == "thinking" ] if thinking_blocks: thinking_content = thinking_blocks[0].get("thinking", "") # Get the text response text_blocks = [ block for block in response_data.get("content", []) if block.get("type") == "text" ] response_text = text_blocks[0]["text"] if text_blocks else "" # If thinking is available, wrap it with <thinking> tags and prepend to the response if thinking_content: response_text = ( f"<thinking>{thinking_content}</thinking>{response_text}" ) return response_text except Exception as e: error_msg = f"Request failed: {str(e)}" if self.request_id: error_msg += f" (Request ID: {self.request_id})" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return {"content": error_msg, "format": "text"} except Exception as e: error_msg = f"Error: {str(e)}" if self.request_id: error_msg += f" (Request ID: {self.request_id})" if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": error_msg, "done": True}} ) return {"content": error_msg, "format": "text"} async def _stream_with_ui( self, url: str, headers: dict, payload: dict, body: dict, __event_emitter__=None ) -> AsyncIterator[str]: """ Stream responses from the Anthropic API with UI event updates. Yields text chunks including extended thinking tokens. """ try: async with aiohttp.ClientSession() as session: timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT) async with session.post( url, headers=headers, json=payload, timeout=timeout ) as response: self.request_id = response.headers.get("x-request-id") if response.status != 200: error_text = await response.text() error_msg = f"Error: HTTP {response.status}: {error_text}" if self.request_id: error_msg += f" (Request ID: {self.request_id})" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) yield error_msg return is_thinking = False async for line in response.content: if line and line.startswith(b"data: "): try: line_text = line[6:].decode("utf-8").strip() if line_text == "[DONE]": if is_thinking: yield "</thinking>" break data = json.loads(line_text) # Start of a new content block if data["type"] == "content_block_start": block_type = data.get("content_block", {}).get( "type" ) if block_type == "thinking": is_thinking = True if self.valves.ENABLE_THINKING: yield "<thinking>" elif block_type == "text": is_thinking = False # Handling deltas inside a block elif data["type"] == "content_block_delta": delta_type = data.get("delta", {}).get("type") if ( is_thinking and delta_type == "thinking_delta" and self.valves.ENABLE_THINKING ): # Stream thinking tokens yield data["delta"].get("thinking", "") elif ( not is_thinking ) and delta_type == "text_delta": # Stream regular text tokens yield data["delta"].get("text", "") # End of the current content block elif data["type"] == "content_block_stop": if is_thinking: yield "</thinking>" is_thinking = False elif data["type"] == "message_stop": break except json.JSONDecodeError as e: logging.error( f"Failed to parse streaming response: {e}" ) continue except asyncio.TimeoutError: error_msg = "Request timed out" if self.request_id: error_msg += f" (Request ID: {self.request_id})" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) yield error_msg except Exception as e: error_msg = f"Stream error: {str(e)}" if self.request_id: error_msg += f" (Request ID: {self.request_id})" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) yield error_msg def _process_messages(self, messages: List[dict]) -> List[dict]: """ Process messages for the Anthropic API format. Args: messages: List of message objects Returns: Processed messages in Anthropic API format """ processed_messages = [] for message in messages: processed_content = [] for content in self.process_content(message["content"]): if ( message.get("role") == "assistant" and content.get("type") == "tool_calls" ): content["cache_control"] = {"type": "ephemeral"} elif ( message.get("role") == "user" and content.get("type") == "tool_results" ): content["cache_control"] = {"type": "ephemeral"} processed_content.append(content) processed_messages.append( {"role": message["role"], "content": processed_content} ) return processed_messages async def _send_request( self, url: str, headers: dict, payload: dict ) -> tuple[dict, Optional[dict]]: """ Send a request to the Anthropic API with enhanced retry logic. Args: url: The API endpoint URL headers: Request headers payload: Request payload Returns: Tuple of (response_data, cache_metrics) """ retry_count = 0 base_delay = 1 # Start with 1 second delay max_retries = 5 # Increased from 3 to 5 for better reliability retry_status_codes = [429, 500, 502, 503, 504] # Status codes to retry on while retry_count < max_retries: try: async with aiohttp.ClientSession() as session: timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT) async with session.post( url, headers=headers, json=payload, timeout=timeout ) as response: self.request_id = response.headers.get("x-request-id") org_id = response.headers.get("anthropic-organization-id") response_text = await response.text() # Handle rate limiting and server errors with exponential backoff if response.status in retry_status_codes: # Use retry-after header if available, otherwise use exponential backoff retry_after = int( response.headers.get( "retry-after", base_delay * (2**retry_count) ) ) # Add jitter to avoid thundering herd problem jitter = random.uniform(0, 0.1 * retry_after) retry_time = retry_after + jitter logging.warning( f"Request failed with status {response.status}. " f"Retrying in {retry_time:.2f} seconds. " f"Retry count: {retry_count + 1}/{max_retries}" ) await asyncio.sleep(retry_time) retry_count += 1 continue if response.status != 200: error_msg = f"Error: HTTP {response.status}" try: error_data = json.loads(response_text).get("error", {}) error_msg += ( f": {error_data.get('message', response_text)}" ) # Include error type and code if available if error_data.get("type"): error_msg += f" (Type: {error_data.get('type')})" if error_data.get("code"): error_msg += f" (Code: {error_data.get('code')})" except: error_msg += f": {response_text}" if self.request_id: error_msg += f" (Request ID: {self.request_id})" logging.error(error_msg) return {"content": error_msg, "format": "text"}, None result = json.loads(response_text) usage = result.get("usage", {}) cache_metrics = { "cache_creation_input_tokens": usage.get( "cache_creation_input_tokens", 0 ), "cache_read_input_tokens": usage.get( "cache_read_input_tokens", 0 ), "input_tokens": usage.get("input_tokens", 0), "output_tokens": usage.get("output_tokens", 0), } # Log usage metrics for monitoring logging.info( f"Request successful. Input tokens: {usage.get('input_tokens', 0)}, " f"Output tokens: {usage.get('output_tokens', 0)}" ) return result, cache_metrics except aiohttp.ClientError as e: logging.error(f"Request failed: {str(e)}") if retry_count < max_retries - 1: retry_count += 1 retry_time = base_delay * (2**retry_count) logging.info( f"Retrying in {retry_time} seconds. Retry count: {retry_count}/{max_retries}" ) await asyncio.sleep(retry_time) continue raise except asyncio.TimeoutError: logging.error(f"Request timed out after {self.REQUEST_TIMEOUT} seconds") if retry_count < max_retries - 1: retry_count += 1 retry_time = base_delay * (2**retry_count) logging.info( f"Retrying in {retry_time} seconds after timeout. Retry count: {retry_count}/{max_retries}" ) await asyncio.sleep(retry_time) continue raise logging.error(f"Max retries ({max_retries}) exceeded.") return { "content": f"Max retries ({max_retries}) exceeded", "format": "text", }, None