We're Hiring!
Whitepaper
Docs
Sign In
Function
Function
pipe
v0.3.1
claude_4_5_with_thinking
Last Updated
2 months ago
Created
2 months ago
Function ID
claude_4_5_with_thinking
Creator
@niknub
Downloads
190+
Get
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with
intelligence
.
Description
Anthropic Manifold Pipe with Extended Thinking Mode.
README
Function Code
Show
""" title: Anthropic Manifold Pipe with Extended Thinking authors: justinh-rahb and christian-taillon, Mark Kazakov, Vincent, extended thinking support added by NIK-NUB author_url: https://github.com/NIK-NUB version: 0.3.1 required_open_webui_version: 0.3.17 license: MIT """ import os import requests import json import time from typing import List, Union, Generator, Iterator from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message class Pipe: class Valves(BaseModel): ANTHROPIC_API_KEY: str = Field(default="", description="Antropic API Key") CLAUDE_45_USE_TEMPERATURE: bool = Field( default=True, description="For Claude 4.5: Use temperature (True) or top_p (False). Claude 4.5 only supports one.", ) BETA_FEATURES: str = Field( default="", description="Enable Antrophic Beta Features. eq: context-management-2025-06-27", ) ENABLE_THINKING: bool = Field( default=True, description="Enable Claude's extended thinking capabilities (Claude 4.5 Sonnet with thinking model only)", ) THINKING_BUDGET: int = Field( default=16000, description="Maximum number of tokens Claude can use for thinking (min: 1024, max: 32000)", ) DISPLAY_THINKING: bool = Field( default=True, description="Display Claude's thinking process in the chat" ) def __init__(self): self.type = "manifold" self.id = "anthropic" self.name = "anthropic/" self.valves = self.Valves( **{ "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""), "CLAUDE_45_USE_TEMPERATURE": True, "BETA_FEATURES": "", "ENABLE_THINKING": True, "THINKING_BUDGET": 16000, "DISPLAY_THINKING": True, } ) self.MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image pass def get_anthropic_models(self): return [ {"id": "claude-3-5-haiku-latest", "name": "claude-3.5-haiku"}, {"id": "claude-opus-4-1-20250805", "name": "claude-4.1-opus"}, {"id": "claude-sonnet-4-5-20250929", "name": "claude-4.5-sonnet"}, { "id": "claude-sonnet-4-5-20250929-think", "name": "claude-4.5-sonnet-with-thinking", }, ] def pipes(self) -> List[dict]: return self.get_anthropic_models() def process_image(self, image_data): """Process image data with size validation.""" if image_data["image_url"]["url"].startswith("data:image"): mime_type, base64_data = image_data["image_url"]["url"].split(",", 1) media_type = mime_type.split(":")[1].split(";")[0] # Check base64 image size image_size = len(base64_data) * 3 / 4 # Convert base64 size to bytes if image_size > self.MAX_IMAGE_SIZE: raise ValueError( f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB" ) return { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": base64_data, }, } else: # For URL images, perform size check after fetching url = image_data["image_url"]["url"] response = requests.head(url, allow_redirects=True) content_length = int(response.headers.get("content-length", 0)) if content_length > self.MAX_IMAGE_SIZE: raise ValueError( f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB" ) return { "type": "image", "source": {"type": "url", "url": url}, } def pipe(self, body: dict) -> Union[str, Generator, Iterator]: system_message, messages = pop_system_message(body["messages"]) processed_messages = [] total_image_size = 0 for message in messages: processed_content = [] if isinstance(message.get("content"), list): for item in message["content"]: if item["type"] == "text": processed_content.append({"type": "text", "text": item["text"]}) elif item["type"] == "image_url": processed_image = self.process_image(item) processed_content.append(processed_image) # Track total size for base64 images if processed_image["source"]["type"] == "base64": image_size = len(processed_image["source"]["data"]) * 3 / 4 total_image_size += image_size if ( total_image_size > 100 * 1024 * 1024 ): # 100MB total limit raise ValueError( "Total size of images exceeds 100 MB limit" ) elif item["type"] == "thinking" and "signature" in item: # Include thinking blocks if present in the message processed_content.append( { "type": "thinking", "thinking": item["thinking"], "signature": item["signature"], } ) elif item["type"] == "redacted_thinking" and "data" in item: # Include redacted thinking blocks if present processed_content.append( {"type": "redacted_thinking", "data": item["data"]} ) else: processed_content = [ {"type": "text", "text": message.get("content", "")} ] processed_messages.append( {"role": message["role"], "content": processed_content} ) model_name = body["model"][body["model"].find(".") + 1 :] # Check if this is a thinking model is_thinking_model = model_name.endswith("-think") # Remove the "-think" suffix for API call if present api_model_name = ( model_name.replace("-think", "") if is_thinking_model else model_name ) # Determine if thinking will be enabled will_enable_thinking = ( self.valves.ENABLE_THINKING and is_thinking_model and "claude-sonnet-4-5" in model_name ) payload = { "model": api_model_name, "messages": processed_messages, "max_tokens": body.get("max_tokens", 4096), "stop_sequences": body.get("stop", []), **({"system": str(system_message)} if system_message else {}), "stream": body.get("stream", False), } # Only add top_k if thinking is NOT enabled if not will_enable_thinking: payload["top_k"] = body.get("top_k", 40) # Add extended thinking for Claude 4.5 Sonnet with thinking if will_enable_thinking: # Ensure thinking budget is within reasonable limits (1024-32000 tokens) thinking_budget = max(1024, min(32000, self.valves.THINKING_BUDGET)) payload["thinking"] = {"type": "enabled", "budget_tokens": thinking_budget} # Handle temperature/top_p settings if api_model_name.startswith("claude-sonnet-4-5"): if is_thinking_model: # For thinking model, always use temperature = 1.0 payload["temperature"] = 1.0 elif self.valves.CLAUDE_45_USE_TEMPERATURE: payload["temperature"] = body.get("temperature", 0.8) else: payload["top_p"] = body.get("top_p", 0.9) else: # Other Claude models support both payload["temperature"] = body.get("temperature", 0.8) payload["top_p"] = body.get("top_p", 0.9) headers = { "x-api-key": self.valves.ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json", } if self.valves.BETA_FEATURES: headers["anthropic-beta"] = self.valves.BETA_FEATURES url = "https://api.anthropic.com/v1/messages" try: if body.get("stream", False): return self.stream_response(url, headers, payload) else: return self.non_stream_response(url, headers, payload) except requests.exceptions.RequestException as e: print(f"Request failed: {e}") return f"Error: Request failed: {e}" except Exception as e: print(f"Error in pipe method: {e}") return f"Error: {e}" def stream_response(self, url, headers, payload): """Handle streaming response with the OpenWebUI thinking tags.""" try: with requests.post( url, headers=headers, json=payload, stream=True, timeout=(3.05, 60) ) as response: if response.status_code != 200: error_text = response.text try: error_json = response.json() if "error" in error_json: error_text = error_json["error"].get("message", error_text) except: pass raise Exception(f"HTTP Error {response.status_code}: {error_text}") thinking_content = "" is_thinking_block = False is_text_block = False has_yielded_thinking = False has_yielded_think_tag = False for line in response.iter_lines(): if line: line = line.decode("utf-8") if line.startswith("data: "): try: data = json.loads(line[6:]) # Handle content block starts if data["type"] == "content_block_start": block_type = data["content_block"].get("type", "") # Handle thinking block start if block_type == "thinking": is_thinking_block = True # Emit thinking start tag immediately if ( not has_yielded_think_tag and self.valves.DISPLAY_THINKING ): yield "<think>" has_yielded_think_tag = True # Handle transition to text block elif block_type == "text": # If we were in a thinking block, close it before starting text if is_thinking_block and has_yielded_think_tag: yield "</think>" has_yielded_thinking = True is_thinking_block = False is_text_block = True # For text blocks, yield the initial text if any if ( "text" in data["content_block"] and data["content_block"]["text"] ): yield data["content_block"]["text"] # Handle redacted thinking block elif ( block_type == "redacted_thinking" and self.valves.DISPLAY_THINKING ): if not has_yielded_think_tag: yield "<think>" has_yielded_think_tag = True yield "[Redacted thinking content]" # Handle block deltas elif data["type"] == "content_block_delta": delta = data["delta"] # Stream thinking deltas with the thinking tag if ( delta["type"] == "thinking_delta" and is_thinking_block and self.valves.DISPLAY_THINKING ): thinking_content += delta["thinking"] yield delta["thinking"] # Stream text deltas normally elif ( delta["type"] == "text_delta" and is_text_block ): yield delta["text"] # Handle block stops elif data["type"] == "content_block_stop": if is_thinking_block: is_thinking_block = False # Close thinking tag at the end of thinking block if ( has_yielded_think_tag and not has_yielded_thinking ): yield "</think>" has_yielded_thinking = True elif is_text_block: is_text_block = False # Handle message stop elif data["type"] == "message_stop": # Make sure thinking tag is closed if needed if ( has_yielded_think_tag and not has_yielded_thinking ): yield "</think>" break # Handle single message (non-streaming style response in stream) elif data["type"] == "message": has_thinking = False # First check if there's thinking content for content in data.get("content", []): if ( content["type"] == "thinking" or content["type"] == "redacted_thinking" ) and self.valves.DISPLAY_THINKING: has_thinking = True break # If there's thinking, handle it first if has_thinking: yield "<think>" for content in data.get("content", []): if ( content["type"] == "thinking" and self.valves.DISPLAY_THINKING ): yield content["thinking"] elif ( content["type"] == "redacted_thinking" and self.valves.DISPLAY_THINKING ): yield "[Redacted thinking content]" yield "</think>" # Then yield all text blocks for content in data.get("content", []): if content["type"] == "text": yield content["text"] time.sleep( 0.01 ) # Delay to avoid overwhelming the client except json.JSONDecodeError: print(f"Failed to parse JSON: {line}") except KeyError as e: print(f"Unexpected data structure: {e}") print(f"Full data: {data}") except requests.exceptions.RequestException as e: print(f"Request failed: {e}") yield f"Error: Request failed: {e}" except Exception as e: print(f"General error in stream_response method: {e}") yield f"Error: {e}" def non_stream_response(self, url, headers, payload): """Handle non-streaming response from Anthropic API, including thinking blocks.""" try: response = requests.post( url, headers=headers, json=payload, timeout=(3.05, 60) ) if response.status_code != 200: error_text = response.text try: error_json = response.json() if "error" in error_json: error_text = error_json["error"].get("message", error_text) except: pass raise Exception(f"HTTP Error {response.status_code}: {error_text}") res = response.json() if "content" not in res or not res["content"]: return "" has_thinking = False thinking_content = "" text_content = "" # First organize content by type for content_block in res["content"]: if content_block["type"] == "thinking" and self.valves.DISPLAY_THINKING: has_thinking = True thinking_content += content_block["thinking"] elif ( content_block["type"] == "redacted_thinking" and self.valves.DISPLAY_THINKING ): has_thinking = True thinking_content += "[Redacted thinking content]" elif content_block["type"] == "text": text_content += content_block["text"] # Then construct the response with the <think> tags result = "" if has_thinking: result += f"<think>{thinking_content}</think>" result += text_content return result except requests.exceptions.RequestException as e: print(f"Failed non-stream request: {e}") return f"Error: {e}"