Whitepaper
Docs
Sign In
Function
Function
pipe
v3.0 - ncks024
Anthropic API Integration w/ Prompt Caching, Claude 3.7 CoT Streaming, & More!
Function ID
anthropic_balaxxe_cot
Creator
@ncks024
Downloads
218+
The original function by Balaxxe, modified to stream chain-of-thought (CoT) tokens.
Get
README
No README available
Function Code
Show
"""
title: Anthropic API Integration w/ Prompt Caching, Claude 3.7 CoT Streaming, & More!
author: Balaxxe
version: 3.0 - ncks024
license: MIT
requirements: pydantic>=2.0.0, requests>=2.0.0
environment_variables:
    - ANTHROPIC_API_KEY (required)

Supports:
- All Claude 3 models
- Streaming responses
- Image processing
- Prompt caching (server-side)
- Function calling
- PDF processing
- Cache Control

Updates:
Added Claude 3.7
Added valves to support thinking
Added Chain of Thought token streaming
"""

import os
import json
import time
import hashlib
import logging
import asyncio
from datetime import datetime
from typing import (
    List,
    Union,
    Generator,
    Iterator,
    Dict,
    Optional,
    AsyncIterator,
)
from pydantic import BaseModel, Field
from open_webui.utils.misc import pop_system_message
import aiohttp


class Pipe:
    """Manifold pipe that forwards chat requests to the Anthropic Messages API.

    Requests are sent either as a single POST (with retry on rate limits) or
    as a streamed response whose thinking tokens are wrapped in
    ``<thinking>...</thinking>`` tags.
    """

    API_VERSION = "2023-06-01"
    MODEL_URL = "https://api.anthropic.com/v1/messages"
    SUPPORTED_IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif", "image/webp"]
    SUPPORTED_PDF_MODELS = [
        "claude-3-5-sonnet-20241022",
        "claude-3-5-sonnet-20240620",
        "claude-3-7-sonnet-latest",
    ]
    MAX_IMAGE_SIZE = 5 * 1024 * 1024
    MAX_PDF_SIZE = 32 * 1024 * 1024
    TOTAL_MAX_IMAGE_SIZE = 100 * 1024 * 1024
    PDF_BETA_HEADER = "pdfs-2024-09-25"
    OUTPUT_128K_BETA = "output-128k-2025-02-19"
    # Model max tokens - updated with Claude 3.7 values
    MODEL_MAX_TOKENS = {
        "claude-3-opus-20240229": 4096,
        "claude-3-sonnet-20240229": 4096,
        "claude-3-haiku-20240307": 4096,
        "claude-3-5-sonnet-20240620": 8192,
        "claude-3-5-sonnet-20241022": 8192,
        "claude-3-5-haiku-20241022": 8192,
        "claude-3-opus-latest": 4096,
        "claude-3-5-sonnet-latest": 8192,
        "claude-3-5-haiku-latest": 8192,
        "claude-3-7-sonnet-latest": 16384,  # Claude 3.7 supports up to 16K output tokens by default, 128K with beta
    }
    # Model context lengths - maximum input tokens
    MODEL_CONTEXT_LENGTH = {
        "claude-3-opus-20240229": 200000,
        "claude-3-sonnet-20240229": 200000,
        "claude-3-haiku-20240307": 200000,
        "claude-3-5-sonnet-20240620": 200000,
        "claude-3-5-sonnet-20241022": 200000,
        "claude-3-5-haiku-20241022": 200000,
        "claude-3-opus-latest": 200000,
        "claude-3-5-sonnet-latest": 200000,
        "claude-3-5-haiku-latest": 200000,
        "claude-3-7-sonnet-latest": 200000,  # Claude 3.7 supports up to 200K context
    }
    BETA_HEADER = "prompt-caching-2024-07-31"
    REQUEST_TIMEOUT = 120  # Increased timeout for longer responses
    THINKING_BUDGET_TOKENS = 16000  # Default thinking budget tokens
    # Models for which the "thinking" request parameter is sent.
    THINKING_MODELS = ("claude-3-7-sonnet-latest",)
    # Anthropic documents 1024 as the smallest accepted thinking budget.
    MIN_THINKING_BUDGET_TOKENS = 1024

    class Valves(BaseModel):
        ANTHROPIC_API_KEY: str = "Your API Key Here"
        ENABLE_THINKING: bool = (
            True  # Changed to True to enable streaming of thinking tokens
        )
        MAX_OUTPUT_TOKENS: bool = True  # Valve to use maximum possible output tokens
        ENABLE_TOOL_CHOICE: bool = True  # Valve to enable tool choice
        ENABLE_SYSTEM_PROMPT: bool = True  # Valve to enable system prompt
        THINKING_BUDGET_TOKENS: int = Field(
            default=16000, ge=0, le=16000
        )  # Configurable thinking budget tokens 16,000 max

    def __init__(self):
        logging.basicConfig(level=logging.INFO)
        self.type = "manifold"
        self.id = "anthropic"
        self.valves = self.Valves()
        self.request_id = None

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    async def _emit_status(emitter, description: str, done: bool) -> None:
        """Send a ``status`` event through *emitter* if one was supplied."""
        if emitter:
            await emitter(
                {
                    "type": "status",
                    "data": {"description": description, "done": done},
                }
            )

    def _with_request_id(self, message: str) -> str:
        """Append the last seen ``x-request-id`` to *message*, when known."""
        if self.request_id:
            return f"{message} (Request ID: {self.request_id})"
        return message

    def _thinking_budget(self, model_name: str, max_tokens: int) -> Optional[int]:
        """Return the thinking budget to send, or ``None`` to omit thinking.

        The configured budget is capped below ``max_tokens`` because the API
        requires ``budget_tokens < max_tokens``; thinking is skipped when the
        resulting budget falls under the documented minimum.
        """
        if not self.valves.ENABLE_THINKING or model_name not in self.THINKING_MODELS:
            return None
        budget = min(self.valves.THINKING_BUDGET_TOKENS, max_tokens - 1)
        if budget < self.MIN_THINKING_BUDGET_TOKENS:
            logging.warning(
                f"Thinking disabled for this request: budget {budget} is below "
                f"the minimum of {self.MIN_THINKING_BUDGET_TOKENS} tokens"
            )
            return None
        return budget

    # ------------------------------------------------------------------
    # Model listing
    # ------------------------------------------------------------------

    def get_anthropic_models(self) -> List[dict]:
        """Return one descriptor dict per model listed in MODEL_MAX_TOKENS."""
        return [
            {
                "id": f"anthropic/{name}",
                "name": name,
                "context_length": self.MODEL_CONTEXT_LENGTH.get(name, 200000),
                "supports_vision": name != "claude-3-5-haiku-20241022",
                "supports_thinking": name in self.THINKING_MODELS,
            }
            for name in self.MODEL_MAX_TOKENS
        ]

    def pipes(self) -> List[dict]:
        """Return the model descriptors exposed by this manifold."""
        return self.get_anthropic_models()

    # ------------------------------------------------------------------
    # Content conversion
    # ------------------------------------------------------------------

    def process_content(
        self, content: Union[str, List[dict]], model_name: Optional[str] = None
    ) -> List[dict]:
        """Convert one message's content into a list of Anthropic content blocks.

        Args:
            content: Either a plain string or a list of typed content items.
            model_name: Model of the current request; used for the PDF support
                check when a ``pdf_url`` item carries no ``"model"`` of its own.

        Returns:
            The converted content blocks. Items of an unrecognised type are
            omitted.

        Raises:
            ValueError: For a PDF item on an unsupported model, or when
                image/PDF validation fails.
        """
        if isinstance(content, str):
            return [{"type": "text", "text": content}]

        processed_content = []
        for item in content:
            item_type = item["type"]
            if item_type == "text":
                processed_content.append({"type": "text", "text": item["text"]})
            elif item_type == "image_url":
                processed_content.append(self.process_image(item))
            elif item_type == "pdf_url":
                # Prefer the item's own model, then fall back to the request's
                # model; without the fallback every PDF lacking an explicit
                # "model" key was rejected.
                pdf_model = (item.get("model") or model_name or "").split("/")[-1]
                if pdf_model not in self.SUPPORTED_PDF_MODELS:
                    raise ValueError(
                        f"PDF support is only available for models: {', '.join(self.SUPPORTED_PDF_MODELS)}"
                    )
                processed_content.append(self.process_pdf(item))
            elif item_type in ("tool_calls", "tool_results"):
                # Passed through as the same object (not a copy).
                processed_content.append(item)
        return processed_content

    def process_image(self, image_data):
        """Convert an ``image_url`` item into an Anthropic image block.

        Data URLs are validated for media type and approximate decoded size
        and sent as base64; any other URL is passed through as a URL source.

        Raises:
            ValueError: On an unsupported media type or an oversized image.
        """
        url = image_data["image_url"]["url"]
        if not url.startswith("data:image"):
            return {"type": "image", "source": {"type": "url", "url": url}}

        mime_type, base64_data = url.split(",", 1)
        media_type = mime_type.split(":")[1].split(";")[0]
        if media_type not in self.SUPPORTED_IMAGE_TYPES:
            raise ValueError(f"Unsupported media type: {media_type}")

        # Approximate size of decoded base64
        image_size = len(base64_data) * 3 / 4
        if image_size > self.MAX_IMAGE_SIZE:
            raise ValueError(
                f"Image size exceeds {self.MAX_IMAGE_SIZE/(1024*1024)}MB limit: {image_size/(1024*1024):.2f}MB"
            )
        return {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": media_type,
                "data": base64_data,
            },
        }

    def process_pdf(self, pdf_data):
        """Convert a ``pdf_url`` item into an Anthropic document block.

        Data URLs are size-checked and sent as base64; any other URL is passed
        through as a URL source. A truthy ``cache_control`` on the item is
        copied onto the document.

        Raises:
            ValueError: When the approximate decoded PDF size exceeds
                MAX_PDF_SIZE.
        """
        url = pdf_data["pdf_url"]["url"]
        if url.startswith("data:application/pdf"):
            _, base64_data = url.split(",", 1)
            # Approximate size of decoded base64
            pdf_size = len(base64_data) * 3 / 4
            if pdf_size > self.MAX_PDF_SIZE:
                raise ValueError(
                    f"PDF size exceeds {self.MAX_PDF_SIZE/(1024*1024)}MB limit: {pdf_size/(1024*1024):.2f}MB"
                )
            source = {
                "type": "base64",
                "media_type": "application/pdf",
                "data": base64_data,
            }
        else:
            source = {"type": "url", "url": url}

        document = {"type": "document", "source": source}
        if pdf_data.get("cache_control"):
            document["cache_control"] = pdf_data["cache_control"]
        return document

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    async def pipe(
        self, body: Dict, __event_emitter__=None
    ) -> Union[str, AsyncIterator[str]]:
        """
        Process a request to the Anthropic API.

        Args:
            body: The request body containing messages and parameters
            __event_emitter__: Optional event emitter for status updates

        Returns:
            For streaming requests, an async iterator of text chunks. For
            non-streaming requests, the response text (thinking, if any,
            prepended inside <thinking> tags), a JSON string describing tool
            calls, or an API error message string. Failures raised while
            handling the request are returned as a
            ``{"content": ..., "format": "text"}`` dict.
        """
        if not self.valves.ANTHROPIC_API_KEY:
            error_msg = "Error: ANTHROPIC_API_KEY is required"
            await self._emit_status(__event_emitter__, error_msg, True)
            return {"content": error_msg, "format": "text"}

        try:
            system_message, messages = pop_system_message(body["messages"])
            await self._emit_status(__event_emitter__, "Processing request...", False)

            model_name = body["model"].split("/")[-1]
            if model_name not in self.MODEL_MAX_TOKENS:
                logging.warning(
                    f"Unknown model: {model_name}, using default token limit"
                )

            # Get max tokens for the model
            max_tokens_limit = self.MODEL_MAX_TOKENS.get(model_name, 4096)
            # If MAX_OUTPUT_TOKENS valve is enabled, use the maximum possible tokens for the model
            requested_max = body.get("max_tokens")
            if self.valves.MAX_OUTPUT_TOKENS or not requested_max:
                # A missing/null/zero "max_tokens" falls back to the model limit
                # instead of failing in min().
                max_tokens = max_tokens_limit
            else:
                max_tokens = min(requested_max, max_tokens_limit)

            payload = {
                "model": model_name,
                "messages": self._process_messages(messages, model_name),
                "max_tokens": max_tokens,
                "temperature": (
                    float(body.get("temperature"))
                    if body.get("temperature") is not None
                    else None
                ),
                "top_k": (
                    int(body.get("top_k")) if body.get("top_k") is not None else None
                ),
                "top_p": (
                    float(body.get("top_p")) if body.get("top_p") is not None else None
                ),
                "stream": body.get("stream", False),
                "metadata": body.get("metadata", {}),
            }

            # Add thinking parameter with proper format if enabled and model supports it
            thinking_budget = self._thinking_budget(model_name, max_tokens)
            if thinking_budget is not None:
                payload["thinking"] = {
                    "type": "enabled",
                    "budget_tokens": thinking_budget,
                }
                # Anthropic's extended-thinking docs state that temperature and
                # top_k modifications are incompatible with thinking and that
                # top_p must stay within [0.95, 1]; offending values are
                # dropped so the request is not rejected.
                dropped = [
                    key
                    for key in ("temperature", "top_k")
                    if payload[key] is not None
                ]
                if payload["top_p"] is not None and not 0.95 <= payload["top_p"] <= 1:
                    dropped.append("top_p")
                for key in dropped:
                    payload[key] = None
                if dropped:
                    logging.info(
                        f"Thinking enabled; ignoring sampling parameters: {', '.join(dropped)}"
                    )

            payload = {k: v for k, v in payload.items() if v is not None}

            # Add system message if enabled
            if system_message and self.valves.ENABLE_SYSTEM_PROMPT:
                payload["system"] = str(system_message)

            # Add tools if enabled
            if "tools" in body and self.valves.ENABLE_TOOL_CHOICE:
                # NOTE(review): Anthropic documents tools as
                # {name, description, input_schema}; this wrapper format is
                # kept unchanged for backward compatibility - confirm.
                payload["tools"] = [
                    {"type": "function", "function": tool} for tool in body["tools"]
                ]
                # Only forward tool_choice when the caller supplied one, so a
                # null value is never sent.
                if body.get("tool_choice") is not None:
                    payload["tool_choice"] = body["tool_choice"]

            if "response_format" in body:
                payload["response_format"] = {
                    "type": body["response_format"].get("type")
                }

            headers = {
                "x-api-key": self.valves.ANTHROPIC_API_KEY,
                "anthropic-version": self.API_VERSION,
                "content-type": "application/json",
            }

            beta_headers = []
            request_messages = body.get("messages", [])
            # Add PDF beta header if needed
            if any(
                isinstance(msg["content"], list)
                and any(item.get("type") == "pdf_url" for item in msg["content"])
                for msg in request_messages
            ):
                beta_headers.append(self.PDF_BETA_HEADER)
            # Add cache control beta header if needed
            if any(
                isinstance(msg["content"], list)
                and any(item.get("cache_control") for item in msg["content"])
                for msg in request_messages
            ):
                beta_headers.append(self.BETA_HEADER)
            # Add 128K output beta header for Claude 3.7
            if (
                model_name == "claude-3-7-sonnet-latest"
                and self.valves.MAX_OUTPUT_TOKENS
            ):
                beta_headers.append(self.OUTPUT_128K_BETA)
            if beta_headers:
                headers["anthropic-beta"] = ",".join(beta_headers)

            try:
                if payload["stream"]:
                    return self._stream_with_ui(
                        self.MODEL_URL, headers, payload, body, __event_emitter__
                    )

                response_data, cache_metrics = await self._send_request(
                    self.MODEL_URL, headers, payload
                )

                if (
                    isinstance(response_data, dict)
                    and "content" in response_data
                    and response_data.get("format") == "text"
                ):
                    # This is an error response produced by _send_request
                    await self._emit_status(
                        __event_emitter__, response_data["content"], True
                    )
                    return response_data["content"]

                content_blocks = response_data.get("content", [])

                # Handle tool calls in the response
                tool_calls = []
                for block in content_blocks:
                    if block.get("type") != "tool_use":
                        continue
                    # Anthropic tool_use blocks carry id/name/input at the top
                    # level; a nested "tool_use" mapping is still honoured.
                    tool_use = block.get("tool_use", block)
                    tool_calls.append(
                        {
                            "id": tool_use["id"],
                            "type": "function",
                            "function": {
                                "name": tool_use["name"],
                                "arguments": tool_use["input"],
                            },
                        }
                    )
                if tool_calls:
                    return json.dumps({"type": "tool_calls", "tool_calls": tool_calls})

                # Handle thinking in the response
                thinking_content = None
                if "thinking" in payload:
                    thinking_parts = [
                        block.get("thinking", "")
                        for block in content_blocks
                        if block.get("type") == "thinking"
                    ]
                    thinking_content = "\n".join(part for part in thinking_parts if part)

                # Get the text response; every text block is kept so that
                # multi-block answers are not truncated to their first block.
                response_text = "".join(
                    block["text"]
                    for block in content_blocks
                    if block.get("type") == "text"
                )

                # If thinking is available, wrap it with <thinking> tags and prepend to the response
                if thinking_content:
                    response_text = (
                        f"<thinking>{thinking_content}</thinking>{response_text}"
                    )

                await self._emit_status(
                    __event_emitter__, "Request completed successfully", True
                )
                return response_text

            except Exception as e:
                error_msg = self._with_request_id(f"Request failed: {str(e)}")
                await self._emit_status(__event_emitter__, error_msg, True)
                return {"content": error_msg, "format": "text"}

        except Exception as e:
            error_msg = self._with_request_id(f"Error: {str(e)}")
            await self._emit_status(__event_emitter__, error_msg, True)
            return {"content": error_msg, "format": "text"}

    # ------------------------------------------------------------------
    # Streaming
    # ------------------------------------------------------------------

    async def _stream_with_ui(
        self, url: str, headers: dict, payload: dict, body: dict, __event_emitter__=None
    ) -> AsyncIterator[str]:
        """
        Stream responses from the Anthropic API with UI event updates.

        Yields text chunks including extended thinking tokens, which are
        wrapped in <thinking>...</thinking>. Errors are yielded as a final
        text chunk rather than raised.
        """
        # True only while an emitted "<thinking>" still awaits its closing tag.
        thinking_tag_open = False
        try:
            async with aiohttp.ClientSession() as session:
                timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
                async with session.post(
                    url, headers=headers, json=payload, timeout=timeout
                ) as response:
                    self.request_id = response.headers.get("x-request-id")

                    if response.status != 200:
                        error_text = await response.text()
                        error_msg = self._with_request_id(
                            f"Error: HTTP {response.status}: {error_text}"
                        )
                        await self._emit_status(__event_emitter__, error_msg, True)
                        yield error_msg
                        return

                    is_thinking = False
                    async for line in response.content:
                        if not line or not line.startswith(b"data: "):
                            continue

                        line_text = line[6:].decode("utf-8").strip()
                        if line_text == "[DONE]":
                            break
                        try:
                            data = json.loads(line_text)
                        except json.JSONDecodeError as e:
                            logging.error(f"Failed to parse streaming response: {e}")
                            continue

                        event_type = data.get("type")

                        # Start of a new content block
                        if event_type == "content_block_start":
                            block_type = data.get("content_block", {}).get("type")
                            if block_type == "thinking":
                                is_thinking = True
                                if self.valves.ENABLE_THINKING:
                                    yield "<thinking>"
                                    thinking_tag_open = True
                            elif block_type == "text":
                                is_thinking = False

                        # Handling deltas inside a block
                        elif event_type == "content_block_delta":
                            delta = data.get("delta", {})
                            delta_type = delta.get("type")
                            if (
                                is_thinking
                                and delta_type == "thinking_delta"
                                and self.valves.ENABLE_THINKING
                            ):
                                yield delta.get("thinking", "")
                            elif not is_thinking and delta_type == "text_delta":
                                yield delta.get("text", "")

                        # End of the current content block
                        elif event_type == "content_block_stop":
                            if thinking_tag_open:
                                yield "</thinking>"
                                thinking_tag_open = False
                            is_thinking = False

                        elif event_type == "message_stop":
                            await self._emit_status(
                                __event_emitter__, "Request completed", True
                            )
                            break

                        # The API can report failures as in-stream error events.
                        elif event_type == "error":
                            error_info = data.get("error") or {}
                            error_msg = self._with_request_id(
                                f"Stream error: {error_info.get('message', 'unknown error')}"
                            )
                            if thinking_tag_open:
                                yield "</thinking>"
                                thinking_tag_open = False
                            await self._emit_status(__event_emitter__, error_msg, True)
                            yield error_msg
                            break

                    # Close a tag left open by a stream that ended early.
                    if thinking_tag_open:
                        yield "</thinking>"
                        thinking_tag_open = False

        except asyncio.TimeoutError:
            if thinking_tag_open:
                yield "</thinking>"
            error_msg = self._with_request_id("Request timed out")
            await self._emit_status(__event_emitter__, error_msg, True)
            yield error_msg

        except Exception as e:
            if thinking_tag_open:
                yield "</thinking>"
            error_msg = self._with_request_id(f"Stream error: {str(e)}")
            await self._emit_status(__event_emitter__, error_msg, True)
            yield error_msg

    # ------------------------------------------------------------------
    # Message conversion and non-streaming transport
    # ------------------------------------------------------------------

    def _process_messages(
        self, messages: List[dict], model_name: Optional[str] = None
    ) -> List[dict]:
        """
        Process messages for the Anthropic API format.

        Args:
            messages: List of message objects
            model_name: Model of the current request, forwarded to
                process_content for its PDF support check

        Returns:
            Processed messages in Anthropic API format
        """
        processed_messages = []
        for message in messages:
            role = message.get("role")
            processed_content = []
            for content in self.process_content(message["content"], model_name):
                content_type = content.get("type")
                if (role == "assistant" and content_type == "tool_calls") or (
                    role == "user" and content_type == "tool_results"
                ):
                    # process_content passes these items through as the same
                    # objects, so this marks the caller's item in place; the
                    # cache-control beta header check in pipe() reads it back.
                    content["cache_control"] = {"type": "ephemeral"}
                processed_content.append(content)
            processed_messages.append(
                {"role": message["role"], "content": processed_content}
            )
        return processed_messages

    async def _send_request(
        self, url: str, headers: dict, payload: dict
    ) -> tuple[dict, Optional[dict]]:
        """
        Send a request to the Anthropic API with retry logic.

        HTTP 429 responses and aiohttp client errors are retried up to three
        attempts in total; other non-200 responses and timeouts are returned
        as an error dict.

        Args:
            url: The API endpoint URL
            headers: Request headers
            payload: Request payload

        Returns:
            Tuple of (response_data, cache_metrics). On failure response_data
            is ``{"content": <message>, "format": "text"}`` and cache_metrics
            is None.

        Raises:
            aiohttp.ClientError: When the final attempt fails with a client
                error.
        """
        retry_count = 0
        base_delay = 1  # Start with 1 second delay
        max_retries = 3

        while retry_count < max_retries:
            try:
                async with aiohttp.ClientSession() as session:
                    timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
                    async with session.post(
                        url, headers=headers, json=payload, timeout=timeout
                    ) as response:
                        self.request_id = response.headers.get("x-request-id")

                        if response.status == 429:
                            backoff = base_delay * (2**retry_count)
                            try:
                                retry_after = int(
                                    response.headers.get("retry-after", backoff)
                                )
                            except (TypeError, ValueError):
                                # Non-integer header value: use the backoff.
                                retry_after = backoff
                            logging.warning(
                                f"Rate limit hit. Retrying in {retry_after} seconds. Retry count: {retry_count + 1}"
                            )
                            await asyncio.sleep(retry_after)
                            retry_count += 1
                            continue

                        response_text = await response.text()

                        if response.status != 200:
                            error_msg = f"Error: HTTP {response.status}"
                            try:
                                error_data = json.loads(response_text).get("error", {})
                                error_msg += (
                                    f": {error_data.get('message', response_text)}"
                                )
                            except (ValueError, AttributeError):
                                # Body is not JSON, or not shaped like
                                # {"error": {...}}: report it verbatim.
                                error_msg += f": {response_text}"
                            error_msg = self._with_request_id(error_msg)
                            return {"content": error_msg, "format": "text"}, None

                        result = json.loads(response_text)
                        usage = result.get("usage", {})
                        cache_metrics = {
                            "cache_creation_input_tokens": usage.get(
                                "cache_creation_input_tokens", 0
                            ),
                            "cache_read_input_tokens": usage.get(
                                "cache_read_input_tokens", 0
                            ),
                            "input_tokens": usage.get("input_tokens", 0),
                            "output_tokens": usage.get("output_tokens", 0),
                        }
                        return result, cache_metrics

            except asyncio.TimeoutError:
                # Reported like the streaming path instead of surfacing as an
                # exception with an empty message.
                error_msg = self._with_request_id("Request timed out")
                logging.error(error_msg)
                return {"content": error_msg, "format": "text"}, None

            except aiohttp.ClientError as e:
                logging.error(f"Request failed: {str(e)}")
                if retry_count < max_retries - 1:
                    retry_count += 1
                    await asyncio.sleep(base_delay * (2**retry_count))
                    continue
                raise

        logging.error("Max retries exceeded for rate limit.")
        return {"content": "Max retries exceeded", "format": "text"}, None