Whitepaper
Docs
Sign In
Function
Function
pipe
v0.1.0
Claude 3_7 with Extended Thinking
Function ID
claude_3_7_with_extended_thinking_
Creator
@rosselliot
Downloads
59+
Anthropic Manifold Pipe with Extended Thinking
Get
README
No README available
Function Code
Show
""" title: Anthropic Manifold Pipe with Extended Thinking authors: Based on original by justinh-rahb and christian-taillon, extended thinking support added version: 0.1.0 required_open_webui_version: 0.1.0 license: MIT Note that this function includes Claude 3.7 Sonnet (with thinking), 3.5 Sonnet, 3.5 Haiku, and 3.0 Opus. See Anthropic documentation for other models, you can add them under "get_anthropic_models". Be aware that with this pipeline, you can only use Claude 3.7 with thinking enabled or disabled. By that I mean, from chat to chat, you cannot change between the two. To change it, you would have to go into the valves and toggle the thinking capabilities. """ import os import requests import json import time from typing import List, Union, Generator, Iterator from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message class Pipe: class Valves(BaseModel): ANTHROPIC_API_KEY: str = Field(default="", description="Your Anthropic API key") ENABLE_THINKING: bool = Field( default=True, description="Enable Claude's extended thinking capabilities (Claude 3.7 Sonnet only)", ) THINKING_BUDGET: int = Field( default=16000, description="Maximum number of tokens Claude can use for thinking (min: 1024, max: 32000)", ) DISPLAY_THINKING: bool = Field( default=True, description="Display Claude's thinking process in the chat" ) def __init__(self): self.type = "manifold" self.id = "anthropic" self.name = "anthropic/" self.valves = self.Valves( **{ "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""), "ENABLE_THINKING": True, "THINKING_BUDGET": 16000, "DISPLAY_THINKING": True, } ) self.MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image pass def get_anthropic_models(self): # Comprehensive list of Claude models with extended thinking support marked return [ { "id": "claude-3-7-sonnet-20250219", "name": "Claude 3.7 Sonnet (with thinking)", }, {"id": "claude-3-5-sonnet-20241022", "name": "Claude 3.5 Sonnet"}, {"id": "claude-3-opus-20240229", "name": "Claude 3 Opus"}, {"id": "claude-3-5-haiku-20241022", "name": "Claude 3.5 Haiku"}, ] def pipes(self) -> List[dict]: return self.get_anthropic_models() def process_image(self, image_data): """Process image data with size validation.""" if image_data["image_url"]["url"].startswith("data:image"): mime_type, base64_data = image_data["image_url"]["url"].split(",", 1) media_type = mime_type.split(":")[1].split(";")[0] # Check base64 image size image_size = len(base64_data) * 3 / 4 # Convert base64 size to bytes if image_size > self.MAX_IMAGE_SIZE: raise ValueError( f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB" ) return { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": base64_data, }, } else: # For URL images, perform size check after fetching url = image_data["image_url"]["url"] response = requests.head(url, allow_redirects=True) content_length = int(response.headers.get("content-length", 0)) if content_length > self.MAX_IMAGE_SIZE: raise ValueError( f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB" ) return { "type": "image", "source": {"type": "url", "url": url}, } def pipe(self, body: dict) -> Union[str, Generator, Iterator]: """Process the request and call Anthropic API. Handles extended thinking for Claude 3.7 Sonnet.""" system_message, messages = pop_system_message(body["messages"]) processed_messages = [] total_image_size = 0 # Process messages and handle thinking blocks from previous turns for message in messages: processed_content = [] if isinstance(message.get("content"), list): for item in message["content"]: if item["type"] == "text": processed_content.append({"type": "text", "text": item["text"]}) elif item["type"] == "image_url": processed_image = self.process_image(item) processed_content.append(processed_image) # Track total size for base64 images if processed_image["source"]["type"] == "base64": image_size = len(processed_image["source"]["data"]) * 3 / 4 total_image_size += image_size if ( total_image_size > 100 * 1024 * 1024 ): # 100MB total limit raise ValueError( "Total size of images exceeds 100 MB limit" ) elif item["type"] == "thinking" and "signature" in item: # Include thinking blocks if present in the message processed_content.append( { "type": "thinking", "thinking": item["thinking"], "signature": item["signature"], } ) elif item["type"] == "redacted_thinking" and "data" in item: # Include redacted thinking blocks if present processed_content.append( {"type": "redacted_thinking", "data": item["data"]} ) else: processed_content = [ {"type": "text", "text": message.get("content", "")} ] processed_messages.append( {"role": message["role"], "content": processed_content} ) # Build the API payload payload = { "model": body["model"][body["model"].find(".") + 1 :], "messages": processed_messages, "max_tokens": body.get("max_tokens", 64000), "temperature": body.get("temperature", 1), "stop_sequences": body.get("stop", []), **({"system": str(system_message)} if system_message else {}), "stream": body.get("stream", False), } # Add extended thinking for Claude 3.7 Sonnet model_name = payload["model"].lower() if self.valves.ENABLE_THINKING and "claude-3-7" in model_name: # Ensure thinking budget is within reasonable limits (1024-32000 tokens) thinking_budget = max(1024, min(32000, self.valves.THINKING_BUDGET)) payload["thinking"] = {"type": "enabled", "budget_tokens": thinking_budget} headers = { "x-api-key": self.valves.ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json", } url = "https://api.anthropic.com/v1/messages" try: if body.get("stream", False): return self.stream_response(url, headers, payload) else: return self.non_stream_response(url, headers, payload) except requests.exceptions.RequestException as e: error_msg = f"API request failed: {str(e)}" print(error_msg) return f"Error: {error_msg}" except ValueError as e: print(f"Value error: {e}") return f"Error: {e}" except Exception as e: print(f"Unexpected error: {e}") return f"Error: {e}" def stream_response(self, url, headers, payload): """Handle streaming response with the OpenWebUI thinking tags.""" try: with requests.post( url, headers=headers, json=payload, stream=True, timeout=(3.05, 60) ) as response: if response.status_code != 200: error_text = response.text try: error_json = response.json() if "error" in error_json: error_text = error_json["error"].get("message", error_text) except: pass raise Exception(f"HTTP Error {response.status_code}: {error_text}") thinking_content = "" is_thinking_block = False is_text_block = False has_yielded_thinking = False has_yielded_think_tag = False for line in response.iter_lines(): if line: line = line.decode("utf-8") if line.startswith("data: "): try: data = json.loads(line[6:]) # Handle content block starts if data["type"] == "content_block_start": block_type = data["content_block"].get("type", "") # Handle thinking block start if block_type == "thinking": is_thinking_block = True # Emit thinking start tag immediately if ( not has_yielded_think_tag and self.valves.DISPLAY_THINKING ): yield "<think>" has_yielded_think_tag = True # Handle transition to text block elif block_type == "text": # If we were in a thinking block, close it before starting text if is_thinking_block and has_yielded_think_tag: yield "</think>" has_yielded_thinking = True is_thinking_block = False is_text_block = True # For text blocks, yield the initial text if any if ( "text" in data["content_block"] and data["content_block"]["text"] ): yield data["content_block"]["text"] # Handle redacted thinking block elif ( block_type == "redacted_thinking" and self.valves.DISPLAY_THINKING ): if not has_yielded_think_tag: yield "<think>" has_yielded_think_tag = True yield "[Redacted thinking content]" # Handle block deltas elif data["type"] == "content_block_delta": delta = data["delta"] # Stream thinking deltas with the thinking tag if ( delta["type"] == "thinking_delta" and is_thinking_block and self.valves.DISPLAY_THINKING ): thinking_content += delta["thinking"] yield delta["thinking"] # Stream text deltas normally elif ( delta["type"] == "text_delta" and is_text_block ): yield delta["text"] # Handle block stops elif data["type"] == "content_block_stop": if is_thinking_block: is_thinking_block = False # Close thinking tag at the end of thinking block if ( has_yielded_think_tag and not has_yielded_thinking ): yield "</think>" has_yielded_thinking = True elif is_text_block: is_text_block = False # Handle message stop elif data["type"] == "message_stop": # Make sure thinking tag is closed if needed if ( has_yielded_think_tag and not has_yielded_thinking ): yield "</think>" break # Handle single message (non-streaming style response in stream) elif data["type"] == "message": has_thinking = False # First check if there's thinking content for content in data.get("content", []): if ( content["type"] == "thinking" or content["type"] == "redacted_thinking" ) and self.valves.DISPLAY_THINKING: has_thinking = True break # If there's thinking, handle it first if has_thinking: yield "<think>" for content in data.get("content", []): if ( content["type"] == "thinking" and self.valves.DISPLAY_THINKING ): yield content["thinking"] elif ( content["type"] == "redacted_thinking" and self.valves.DISPLAY_THINKING ): yield "[Redacted thinking content]" yield "</think>" # Then yield all text blocks for content in data.get("content", []): if content["type"] == "text": yield content["text"] time.sleep( 0.01 ) # Small delay to prevent overwhelming the client except json.JSONDecodeError: print(f"Failed to parse JSON: {line}") except KeyError as e: print(f"Unexpected data structure: {e}") print(f"Full data: {data}") except requests.exceptions.RequestException as e: print(f"Request failed: {e}") yield f"Error: Request failed: {e}" except Exception as e: print(f"General error in stream_response method: {e}") yield f"Error: {e}" def non_stream_response(self, url, headers, payload): """Handle non-streaming response from Anthropic API, including thinking blocks.""" try: response = requests.post( url, headers=headers, json=payload, timeout=(3.05, 60) ) if response.status_code != 200: error_text = response.text try: error_json = response.json() if "error" in error_json: error_text = error_json["error"].get("message", error_text) except: pass raise Exception(f"HTTP Error {response.status_code}: {error_text}") res = response.json() if "content" not in res or not res["content"]: return "" has_thinking = False thinking_content = "" text_content = "" # First organize content by type for content_block in res["content"]: if content_block["type"] == "thinking" and self.valves.DISPLAY_THINKING: has_thinking = True thinking_content += content_block["thinking"] elif ( content_block["type"] == "redacted_thinking" and self.valves.DISPLAY_THINKING ): has_thinking = True thinking_content += "[Redacted thinking content]" elif content_block["type"] == "text": text_content += content_block["text"] # Then construct the response with the <think> tags result = "" if has_thinking: result += f"<think>{thinking_content}</think>" result += text_content return result except requests.exceptions.RequestException as e: print(f"Failed non-stream request: {e}") return f"Error: {e}"