Whitepaper
Docs
Sign In
Function
Function
pipe
v0.0.1
Anthropic Claude 3.7 Reasoning
Function ID
anthropic_reasoning
Creator
@kimsang
Downloads
79+
Anthropic Manifold Pipe (for Reasoning)
Get
README
No README available
Function Code
Show
"""
title: Anthropic Claude 3.7 Reasoning
authors: h.a.l
version: 0.0.1
required_open_webui_version: 0.5.18
license: MIT
"""

import os
import requests
import json
import time
from typing import List, Union, Generator, Iterator
from pydantic import BaseModel, Field
from open_webui.utils.misc import pop_system_message


class Pipe:
    """Open WebUI manifold pipe that forwards chats to the Anthropic Messages API.

    When the ``THINKING`` valve is enabled the request asks for extended
    thinking, and the model's reasoning is surfaced to the client: wrapped in
    ``<think>`` tags when streaming, or in a collapsible ``<details>`` block
    when not streaming.
    """

    class Valves(BaseModel):
        # API key sent in the ``x-api-key`` header.
        ANTHROPIC_API_KEY: str = Field(default="")
        # When True, the payload includes a ``thinking`` section.
        THINKING: bool = Field(default=False)
        # Forwarded as ``thinking.budget_tokens`` when THINKING is on.
        BUDGET_TOKENS: int = Field(default=1024)

    def __init__(self):
        self.type = "manifold"
        self.id = "anthropic"
        self.name = "anthropic/"
        self.valves = self.Valves(
            **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "")}
        )
        self.MAX_IMAGE_SIZE = 5 * 1024 * 1024  # 5MB per image

    def pipes(self) -> List[dict]:
        """Return the models exposed by this manifold."""
        return [
            {"id": "claude-3-7-sonnet-latest", "name": "claude-3.7-sonnet"},
        ]

    def process_image(self, image_data):
        """Convert an OpenAI-style ``image_url`` item into an Anthropic image block.

        Args:
            image_data: dict with an ``image_url`` entry whose ``url`` is
                either a ``data:image...`` URI or a remote URL.

        Returns:
            An Anthropic ``image`` content block (``base64`` or ``url`` source).

        Raises:
            ValueError: if the image is larger than ``self.MAX_IMAGE_SIZE``.
            requests.exceptions.RequestException: if the size probe of a
                remote URL fails or times out.
        """
        url = image_data["image_url"]["url"]

        if url.startswith("data:image"):
            mime_type, base64_data = url.split(",", 1)
            media_type = mime_type.split(":")[1].split(";")[0]

            # Every 4 base64 characters encode 3 bytes.
            image_size = len(base64_data) * 3 / 4
            if image_size > self.MAX_IMAGE_SIZE:
                raise ValueError(
                    f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB"
                )

            return {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": base64_data,
                },
            }

        # Remote image: probe the size with a HEAD request. A timeout is set
        # so that an unresponsive host cannot block the whole chat request.
        response = requests.head(url, allow_redirects=True, timeout=(3.05, 10))
        content_length = int(response.headers.get("content-length", 0))

        if content_length > self.MAX_IMAGE_SIZE:
            raise ValueError(
                f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB"
            )

        return {
            "type": "image",
            "source": {"type": "url", "url": url},
        }

    def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
        """Translate an Open WebUI chat request and send it to Anthropic.

        Args:
            body: request dict; reads ``messages``, ``model``, ``max_tokens``,
                ``stop`` and ``stream``.

        Returns:
            A generator of text chunks when ``body["stream"]`` is truthy,
            otherwise the full response text (or an ``"Error: ..."`` string).

        Raises:
            ValueError: if a single image or the sum of base64 images is
                too large (raised while building the payload).
        """
        system_message, messages = pop_system_message(body["messages"])

        processed_messages = []
        total_image_size = 0

        for message in messages:
            processed_content = []
            if isinstance(message.get("content"), list):
                for item in message["content"]:
                    if item["type"] == "text":
                        processed_content.append({"type": "text", "text": item["text"]})
                    elif item["type"] == "image_url":
                        processed_image = self.process_image(item)
                        processed_content.append(processed_image)

                        # Only base64 images count towards the total; URL
                        # images carry no inline payload.
                        if processed_image["source"]["type"] == "base64":
                            image_size = len(processed_image["source"]["data"]) * 3 / 4
                            total_image_size += image_size
                            if (
                                total_image_size > 100 * 1024 * 1024
                            ):  # 100MB total limit
                                raise ValueError(
                                    "Total size of images exceeds 100 MB limit"
                                )
            else:
                processed_content = [
                    {"type": "text", "text": message.get("content", "")}
                ]

            processed_messages.append(
                {"role": message["role"], "content": processed_content}
            )

        payload = {
            # Drop the manifold prefix: everything up to the first "." in the id.
            "model": body["model"][body["model"].find(".") + 1 :],
            "messages": processed_messages,
            # NOTE: per the Anthropic extended-thinking docs, budget_tokens
            # must be lower than max_tokens; the values are passed through
            # unchanged here.
            "max_tokens": body.get("max_tokens", 4096),
            **(
                {
                    "thinking": {
                        "type": "enabled",
                        "budget_tokens": self.valves.BUDGET_TOKENS,
                    }
                }
                if self.valves.THINKING
                else {}
            ),
            # temperature / top_k / top_p are deliberately not forwarded, see
            # https://docs.anthropic.com/en/docs/build-with-claude/extended-thinking#important-considerations-when-using-extended-thinking
            "stop_sequences": body.get("stop", []),
            **({"system": str(system_message)} if system_message else {}),
            "stream": body.get("stream", False),
        }

        headers = {
            "x-api-key": self.valves.ANTHROPIC_API_KEY,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        }

        url = "https://api.anthropic.com/v1/messages"

        try:
            if body.get("stream", False):
                return self.stream_response(url, headers, payload)
            else:
                return self.non_stream_response(url, headers, payload)
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return f"Error: Request failed: {e}"
        except Exception as e:
            print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def stream_response(self, url, headers, payload):
        """Yield text chunks from a streaming (SSE) Messages API call.

        Thinking blocks are wrapped in ``<think>`` / ``</think>``. Delta and
        block types other than text/thinking (for example the signature
        delta that closes a thinking block) are skipped instead of being
        treated as text. Failures are yielded as ``"Error: ..."`` strings.
        """
        try:
            with requests.post(
                url, headers=headers, json=payload, stream=True, timeout=(3.05, 60)
            ) as response:
                if response.status_code != 200:
                    raise Exception(
                        f"HTTP Error {response.status_code}: {response.text}"
                    )

                # Indices of thinking blocks whose "<think>" tag is still open,
                # so "</think>" is emitted exactly for blocks we opened.
                open_thinking_blocks = set()

                for line in response.iter_lines():
                    if not line:
                        continue
                    line = line.decode("utf-8")
                    if not line.startswith("data: "):
                        continue

                    try:
                        data = json.loads(line[6:])
                        event_type = data["type"]

                        if event_type == "content_block_start":
                            block = data["content_block"]
                            if block["type"] == "thinking":
                                open_thinking_blocks.add(data.get("index"))
                                yield "<think>\n"
                            elif block["type"] == "text" and block.get("text"):
                                yield block["text"]
                        elif event_type == "content_block_delta":
                            delta = data["delta"]
                            if delta["type"] == "thinking_delta":
                                yield delta["thinking"]
                            elif delta["type"] == "text_delta":
                                yield delta["text"]
                            # Other delta types carry no display text.
                        elif event_type == "message_stop":
                            break
                        elif event_type == "content_block_stop":
                            index = data.get("index")
                            if index in open_thinking_blocks:
                                open_thinking_blocks.discard(index)
                                yield "</think>\n"
                        elif event_type == "message":
                            for content in data.get("content", []):
                                if content["type"] == "text":
                                    yield content["text"]
                        elif event_type == "error":
                            # Surface mid-stream API errors instead of
                            # silently ending the response.
                            error = data.get("error", {})
                            print(f"Stream error event: {error}")
                            yield f"Error: {error.get('message', error)}"

                        time.sleep(
                            0.01
                        )  # Delay to avoid overwhelming the client

                    except json.JSONDecodeError:
                        print(f"Failed to parse JSON: {line}")
                    except KeyError as e:
                        print(f"Unexpected data structure: {e}")
                        print(f"Full data: {data}")

                # The stream ended while a thinking block was still open
                # (e.g. truncated response): close the tag for the client.
                if open_thinking_blocks:
                    yield "</think>\n"
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            yield f"Error: Request failed: {e}"
        except Exception as e:
            print(f"General error in stream_response method: {e}")
            yield f"Error: {e}"

    def non_stream_response(self, url, headers, payload):
        """Perform a blocking Messages API call and return the response text.

        Returns:
            The concatenated text blocks, preceded by a collapsible
            ``<details>`` section holding the reasoning when the response
            contains thinking blocks. Returns ``""`` when the response has no
            content, or ``"Error: ..."`` when the request itself fails.

        Raises:
            Exception: on a non-200 HTTP status (handled by ``pipe``).
        """
        try:
            # Start the clock before the request so the "Thought for N
            # seconds" label reflects the actual wait.
            start_time = time.time()
            response = requests.post(
                # TODO change 2nd timeout value
                url,
                headers=headers,
                json=payload,
                timeout=(3.05, 60),
            )
            elapsed_seconds = int(time.time() - start_time)

            if response.status_code != 200:
                raise Exception(f"HTTP Error {response.status_code}: {response.text}")

            res = response.json()
            blocks = res.get("content") or []

            # Select blocks by type rather than by position: the first block
            # is not guaranteed to be a plain thinking block.
            thinking = "".join(
                block.get("thinking", "")
                for block in blocks
                if block.get("type") == "thinking"
            )
            text = "".join(
                block.get("text", "")
                for block in blocks
                if block.get("type") == "text"
            )

            full_response = ""
            if self.valves.THINKING and thinking:
                full_response += (
                    f"<details>\n<summary>Thought for {elapsed_seconds} seconds 💭</summary>"
                    + "\n> "
                    + thinking.replace("\n", "\n> ")
                    + "\n\n---\n\n</details>\n"
                )
            full_response += text
            return full_response
        except requests.exceptions.RequestException as e:
            print(f"Failed non-stream request: {e}")
            return f"Error: {e}"