We're Hiring!
Whitepaper
Docs
Sign In
Function
Function
pipe
v0.6.1
Anthropic
Last Updated
a month ago
Created
2 months ago
Function ID
anthropic
Creator
@niknub
Downloads
118+
Get
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with intelligence.
Description
Anthropic Manifold Pipe (Support Claude 4.X and Extended Thinking)
README
Function Code
Show
""" title: Anthropic Manifold Pipe with Extended Thinking and Cache Control authors: justinh-rahb, christian-taillon, jfbloom22, Mark Kazakov, Vincent, Snav. Extended thinking mode added by NIK-NUB author_url: https://github.com/NIK-NUB funding_url: https://github.com/open-webui version: 0.6.1 required_open_webui_version: 0.6.32 license: MIT description: An advanced manifold pipe for interacting with Anthropic's Claude models, featuring extended thinking support, cache control, beta features, and model handling for Claude 4.X. """ import os import requests import json import time from typing import List, Union, Generator, Iterator, Optional, Dict from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message class Pipe: CACHE_TTL = "1h" MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image MAX_TOTAL_IMAGE_SIZE = 100 * 1024 * 1024 # 100MB total limit class Valves(BaseModel): ANTHROPIC_API_KEY: str = Field(default="", description="Anthropic API Key") CLAUDE_USE_TEMPERATURE: bool = Field( default=True, description="For Claude 4.X: Use temperature (True) or top_p (False). Claude 4.X models only supports one.", ) BETA_FEATURES: str = Field( default="", description="Enable Anthropic Beta Features. 
e.g.: context-management-2025-06-27", ) ENABLE_THINKING: bool = Field( default=True, description="Enable Claude's extended thinking capabilities (Claude 4.X series and Claude 3.7 Sonnet only)", ) THINKING_BUDGET: int = Field( default=16000, description="Maximum number of tokens Claude can use for thinking (min: 1024, max: 32000)", ) DISPLAY_THINKING: bool = Field( default=True, description="Display Claude's thinking process in the chat" ) def __init__(self): self.type = "manifold" self.id = "anthropic" self.name = "anthropic/" self.valves = self.Valves( **{ "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""), "CLAUDE_USE_TEMPERATURE": True, "BETA_FEATURES": "", "ENABLE_THINKING": True, "THINKING_BUDGET": 16000, "DISPLAY_THINKING": True, } ) # Model cache self._model_cache: Optional[List[Dict[str, str]]] = None self._model_cache_time: float = 0 self._cache_ttl = int(os.getenv("ANTHROPIC_MODEL_CACHE_TTL", "600")) def _get_api_headers(self, include_beta: bool = False) -> Dict[str, str]: """Build API headers with optional beta features.""" headers = { "x-api-key": self.valves.ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json", } if include_beta and self.valves.BETA_FEATURES: headers["anthropic-beta"] = self.valves.BETA_FEATURES return headers def _handle_api_error(self, response: requests.Response) -> str: """Extract error message from API response.""" error_text = response.text try: error_json = response.json() if "error" in error_json: error_text = error_json["error"].get("message", error_text) except: pass return f"HTTP Error {response.status_code}: {error_text}" def _calculate_base64_size(self, base64_data: str) -> float: """Calculate actual size from base64 encoded data.""" return len(base64_data) * 3 / 4 def get_anthropic_models(self, force_refresh: bool = False) -> List[Dict[str, str]]: """ Retrieve available Anthropic models from the API. Uses caching to reduce API calls. 
""" # Check cache first current_time = time.time() if ( not force_refresh and self._model_cache is not None and (current_time - self._model_cache_time) < self._cache_ttl ): return self._model_cache if not self.valves.ANTHROPIC_API_KEY: return [ { "id": "error", "name": "ANTHROPIC_API_KEY is not set. Please update the API Key in the valves.", } ] try: response = requests.get( "https://api.anthropic.com/v1/models", headers=self._get_api_headers(), timeout=10, ) if response.status_code != 200: raise Exception(self._handle_api_error(response)) data = response.json() models = [ { "id": model["id"], "name": model.get("display_name", model["id"]), } for model in data.get("data", []) ] # Update cache self._model_cache = models self._model_cache_time = current_time return models except Exception as e: print(f"Error fetching Anthropic models: {e}") return [ { "id": "error", "name": f"Could not fetch models from Anthropic: {str(e)}", } ] def _attach_cache_control(self, block: dict) -> dict: """Attach cache control to a content block.""" if not isinstance(block, dict): return block # Skip block types that cannot be cached if block.get("type") in {"thinking", "redacted_thinking"}: return block if not block.get("type"): block["type"] = "text" if "text" not in block: block["text"] = "" block["cache_control"] = {"type": "ephemeral", "ttl": self.CACHE_TTL} return block def _normalize_content_blocks(self, raw_content) -> List[dict]: """Normalize content into proper block format.""" blocks = [] items = raw_content if isinstance(raw_content, list) else [raw_content] for item in items: if isinstance(item, dict): if item.get("type"): blocks.append(dict(item)) elif "content" in item: blocks.extend(self._normalize_content_blocks(item["content"])) elif item: blocks.append({"type": "text", "text": str(item)}) elif item is not None: blocks.append({"type": "text", "text": str(item)}) return blocks def _prepare_system_blocks(self, system_message) -> Optional[List[dict]]: """Prepare system 
message with cache control.""" if not system_message: return None content = ( system_message.get("content") if isinstance(system_message, dict) and "content" in system_message else system_message ) normalized_blocks = self._normalize_content_blocks(content) return [ self._attach_cache_control(block) for block in normalized_blocks ] or None def _apply_cache_control_to_last_message(self, messages: List[dict]): """Apply cache control to the last user message.""" if not messages or messages[-1].get("role") != "user": return for block in reversed(messages[-1].get("content", [])): if isinstance(block, dict) and block.get("type") not in { "thinking", "redacted_thinking", }: self._attach_cache_control(block) break def pipes(self) -> List[dict]: return self.get_anthropic_models() def process_image(self, image_data: dict) -> dict: """Process image data with size validation.""" url = image_data["image_url"]["url"] if url.startswith("data:image"): mime_type, base64_data = url.split(",", 1) media_type = mime_type.split(":")[1].split(";")[0] # Validate base64 image size image_size = self._calculate_base64_size(base64_data) if image_size > self.MAX_IMAGE_SIZE: raise ValueError( f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB" ) return { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": base64_data, }, } else: # Validate URL image size response = requests.head(url, allow_redirects=True) content_length = int(response.headers.get("content-length", 0)) if content_length > self.MAX_IMAGE_SIZE: raise ValueError( f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB" ) return { "type": "image", "source": {"type": "url", "url": url}, } def _is_thinking_compatible_model(self, model_name: str) -> bool: """Check if the model supports extended thinking.""" try: model_release_date = int(model_name[-8:]) return model_release_date >= 20250219 except (ValueError, IndexError): return False def _is_new_generation_model(self, 
model_name: str) -> bool: """Check if the model is a Claude 4.x model that only supports temperature OR top_p.""" try: model_release_date = int(model_name[-8:]) return model_release_date >= 20250514 except (ValueError, IndexError): return False def _process_messages(self, messages: List[dict]) -> tuple: """Process messages and handle images.""" processed_messages = [] total_image_size = 0.0 for message in messages: processed_content = [] content = message.get("content", "") if isinstance(content, list): for item in content: item_type = item.get("type") if item_type == "text": processed_content.append({"type": "text", "text": item["text"]}) elif item_type == "image_url": processed_image = self.process_image(item) processed_content.append(processed_image) # Track total size for base64 images if processed_image["source"]["type"] == "base64": image_size = self._calculate_base64_size( processed_image["source"]["data"] ) total_image_size += image_size if total_image_size > self.MAX_TOTAL_IMAGE_SIZE: raise ValueError( "Total size of images exceeds 100 MB limit" ) elif item_type == "thinking" and "signature" in item: processed_content.append( { "type": "thinking", "thinking": item["thinking"], "signature": item["signature"], } ) elif item_type == "redacted_thinking" and "data" in item: processed_content.append( {"type": "redacted_thinking", "data": item["data"]} ) else: processed_content = [{"type": "text", "text": str(content)}] processed_messages.append( {"role": message["role"], "content": processed_content} ) return processed_messages, total_image_size def _configure_sampling_params( self, payload: dict, body: dict, is_thinking_enabled: bool, is_new_gen: bool ): """Configure temperature, top_p, and top_k based on model generation and thinking mode.""" if is_thinking_enabled: # Extended thinking requires temperature = 1.0 payload["temperature"] = 1.0 elif is_new_gen: # New generation models (Claude 4.x): only temperature OR top_p, never both if 
self.valves.CLAUDE_USE_TEMPERATURE: payload["temperature"] = body.get("temperature", 0.8) else: payload["top_p"] = body.get("top_p", 0.9) else: # Older Claude models support temperature, top_p, and top_k simultaneously payload["temperature"] = body.get("temperature", 0.8) payload["top_p"] = body.get("top_p", 0.9) payload["top_k"] = body.get("top_k", 40) def pipe(self, body: dict) -> Union[str, Generator, Iterator]: system_message, messages = pop_system_message(body["messages"]) processed_messages, _ = self._process_messages(messages) system_blocks = self._prepare_system_blocks(system_message) self._apply_cache_control_to_last_message(processed_messages) model_name = body["model"][body["model"].find(".") + 1 :] is_thinking_compatible = self._is_thinking_compatible_model(model_name) is_new_gen_model = self._is_new_generation_model(model_name) will_enable_thinking = self.valves.ENABLE_THINKING and is_thinking_compatible payload = { "model": model_name, "messages": processed_messages, "max_tokens": body.get("max_tokens", 4096), "stop_sequences": body.get("stop", []), "stream": body.get("stream", False), } if system_blocks: payload["system"] = system_blocks # Configure thinking mode if will_enable_thinking: thinking_budget = max(1024, min(32000, self.valves.THINKING_BUDGET)) requested_max_tokens = body.get("max_tokens", 4096) payload["max_tokens"] = max(requested_max_tokens, thinking_budget + 2048) payload["thinking"] = {"type": "enabled", "budget_tokens": thinking_budget} print( f"Thinking enabled with budget: {thinking_budget}, max_tokens: {payload['max_tokens']}" ) # Configure sampling parameters self._configure_sampling_params( payload, body, will_enable_thinking, is_new_gen_model ) url = "https://api.anthropic.com/v1/messages" headers = self._get_api_headers(include_beta=True) try: if body.get("stream", False): return self.stream_response(url, headers, payload) else: return self.non_stream_response(url, headers, payload) except requests.exceptions.RequestException 
as e: print(f"Request failed: {e}") return f"Error: Request failed: {e}" except Exception as e: print(f"Error in pipe method: {e}") return f"Error: {e}" def stream_response(self, url: str, headers: dict, payload: dict) -> Generator: """Handle streaming response with the OpenWebUI thinking tags.""" try: with requests.post( url, headers=headers, json=payload, stream=True, timeout=(3.05, 60) ) as response: if response.status_code != 200: raise Exception(self._handle_api_error(response)) in_thinking = False in_text = False for line in response.iter_lines(): if not line: continue line = line.decode("utf-8") if not line.startswith("data: "): continue try: data = json.loads(line[6:]) event_type = data.get("type") # Handle content block starts if event_type == "content_block_start": block_type = data["content_block"].get("type", "") if block_type == "thinking": in_thinking = True if self.valves.DISPLAY_THINKING: yield "<think>" # Yield initial thinking if present if data["content_block"].get("thinking"): yield data["content_block"]["thinking"] elif block_type == "text": # Close thinking if transitioning to text if in_thinking and self.valves.DISPLAY_THINKING: yield "</think>" in_thinking = False in_text = True # Yield initial text if present if data["content_block"].get("text"): yield data["content_block"]["text"] elif ( block_type == "redacted_thinking" and self.valves.DISPLAY_THINKING ): in_thinking = True yield "<think>[Redacted thinking content]" # Handle block deltas elif event_type == "content_block_delta": delta = data["delta"] delta_type = delta.get("type") if ( delta_type == "thinking_delta" and in_thinking and self.valves.DISPLAY_THINKING ): yield delta["thinking"] elif delta_type == "text_delta" and in_text: yield delta["text"] # Handle block stops elif event_type == "content_block_stop": if in_thinking and self.valves.DISPLAY_THINKING: yield "</think>" in_thinking = False elif in_text: in_text = False # Handle message stop elif event_type == "message_stop": if 
in_thinking and self.valves.DISPLAY_THINKING: yield "</think>" break # Handle full message in stream elif event_type == "message": has_thinking = any( c.get("type") in {"thinking", "redacted_thinking"} for c in data.get("content", []) ) if has_thinking and self.valves.DISPLAY_THINKING: yield "<think>" for content in data.get("content", []): if content["type"] == "thinking": yield content["thinking"] elif content["type"] == "redacted_thinking": yield "[Redacted thinking content]" yield "</think>" for content in data.get("content", []): if content["type"] == "text": yield content["text"] time.sleep(0.01) except json.JSONDecodeError: print(f"Failed to parse JSON: {line}") except KeyError as e: print(f"Unexpected data structure: {e}, data: {data}") except requests.exceptions.RequestException as e: print(f"Request failed: {e}") yield f"Error: Request failed: {e}" except Exception as e: print(f"General error in stream_response method: {e}") yield f"Error: {e}" def non_stream_response(self, url: str, headers: dict, payload: dict) -> str: """Handle non-streaming response from Anthropic API, including thinking blocks.""" try: response = requests.post( url, headers=headers, json=payload, timeout=(3.05, 60) ) if response.status_code != 200: raise Exception(self._handle_api_error(response)) res = response.json() content_blocks = res.get("content", []) if not content_blocks: return "" thinking_parts = [] text_parts = [] for block in content_blocks: block_type = block.get("type") if self.valves.DISPLAY_THINKING: if block_type == "thinking": thinking_parts.append(block["thinking"]) elif block_type == "redacted_thinking": thinking_parts.append("[Redacted thinking content]") if block_type == "text": text_parts.append(block["text"]) result = "" if thinking_parts: result += f"<think>{''.join(thinking_parts)}</think>" result += "".join(text_parts) return result except requests.exceptions.RequestException as e: print(f"Failed non-stream request: {e}") return f"Error: {e}"