Whitepaper
Docs
Sign In
Function
Function
pipe
v0.2.4
Anthropic Bedrock Manifold Pipe
Function ID
anthropic_bedrock_manifold_pipe_v1
Creator
@sumdhu91
Downloads
101+
A Pipe to connect to Anthropic Models via AWS Bedrock
Get
README
No README available
Function Code
Show
"""
title: Anthropic Manifold Pipe
authors: sumanthm91
author_url: https://github.com/sumanthm91
funding_url: https://github.com/open-webui
version: 0.2.4
required_open_webui_version: 0.3.17
license: MIT
"""

import os
import requests
import json
import time
from typing import List, Union, Generator, Iterator
from pydantic import BaseModel, Field
from open_webui.utils.misc import pop_system_message
from anthropic import AnthropicBedrock


class Pipe:
    """Manifold pipe that streams Anthropic model responses via AWS Bedrock.

    The pipe converts OpenAI-style chat messages (text and ``image_url``
    parts) into Anthropic content blocks, validates image sizes, and streams
    the reply text from ``AnthropicBedrock``.
    """

    class Valves(BaseModel):
        """User-configurable AWS settings.

        Empty values are passed to the SDK as ``None`` so that the client can
        fall back to its own credential/region resolution.
        """

        AWS_ACCESS_KEY: str = Field(
            default=os.getenv("AWS_ACCESS_KEY_ID", ""),
            description="AWS access key ID used for Bedrock requests.",
        )
        AWS_SECRET_KEY: str = Field(
            default=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
            description="AWS secret access key used for Bedrock requests.",
        )
        AWS_REGION: str = Field(
            default=os.getenv("AWS_REGION", ""),
            description="AWS region hosting the Bedrock models.",
        )

    def __init__(self):
        self.type = "manifold"
        self.id = "anthropicbedrockv1"
        self.name = ""
        # Previously an empty string; a Valves instance makes the AWS
        # settings configurable instead of hard-coded in pipe().
        self.valves = self.Valves()
        self.MAX_IMAGE_SIZE = 5 * 1024 * 1024  # 5MB per image

    def get_anthropic_models(self):
        """Return the Bedrock model IDs and display names this pipe exposes."""
        return [
            {"id": "anthropic.claude-3-haiku-20240307-v1:0", "name": "Claude 3 Haiku"},
            {
                "id": "anthropic.claude-3-sonnet-20240229-v1:0",
                "name": "Claude 3 Sonnet",
            },
            {"id": "anthropic.claude-3-opus-20240229-v1:0", "name": "Claude 3 Opus"},
            {
                "id": "anthropic.claude-3-5-haiku-20241022-v1:0",
                "name": "Claude 3.5 Haiku",
            },
            {
                "id": "anthropic.claude-3-5-sonnet-20241022-v2:0",
                "name": "Claude 3.5 Sonnet",
            },
        ]

    def pipes(self) -> List[dict]:
        """Return the list of models offered by this manifold."""
        return self.get_anthropic_models()

    def process_image(self, image_data):
        """Convert an ``image_url`` message part into an Anthropic image block.

        Args:
            image_data: Dict with an ``image_url`` entry whose ``url`` is
                either a ``data:image/...;base64,...`` URI or a remote URL.

        Returns:
            An Anthropic ``image`` content block using a ``base64`` or
            ``url`` source.

        Raises:
            ValueError: If the image is larger than ``MAX_IMAGE_SIZE``.
            requests.exceptions.RequestException: If the HEAD request for a
                remote image fails or times out.
        """
        url = image_data["image_url"]["url"]

        if url.startswith("data:image"):
            mime_type, base64_data = url.split(",", 1)
            media_type = mime_type.split(":")[1].split(";")[0]

            # Every 4 base64 characters encode 3 bytes of payload.
            image_size = len(base64_data) * 3 / 4
            if image_size > self.MAX_IMAGE_SIZE:
                raise ValueError(
                    f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB"
                )

            return {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": base64_data,
                },
            }

        # Remote image: only the advertised Content-Length is inspected.
        # A timeout keeps an unresponsive host from blocking the request
        # indefinitely.
        response = requests.head(url, allow_redirects=True, timeout=10)
        content_length = int(response.headers.get("content-length", 0))

        if content_length > self.MAX_IMAGE_SIZE:
            raise ValueError(
                f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB"
            )

        return {
            "type": "image",
            "source": {"type": "url", "url": url},
        }

    @staticmethod
    def _system_text(system_message):
        """Extract plain text from the popped system message, or ``None``."""
        if not system_message:
            return None

        content = (
            system_message.get("content")
            if isinstance(system_message, dict)
            else system_message
        )
        if isinstance(content, list):
            content = "\n".join(
                part.get("text", "")
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
        return str(content) if content else None

    def _resolve_model_id(self, model):
        """Map the incoming model name to a bare Bedrock model ID.

        The incoming name is expected to carry a ``<function id>.`` prefix
        (NOTE(review): prefix format assumed from the original stripping
        logic -- confirm against the host application). Matching on the
        known model IDs avoids depending on the exact function ID.
        """
        for entry in self.get_anthropic_models():
            known_id = entry["id"]
            if model == known_id or model.endswith("." + known_id):
                return known_id

        # Legacy behaviour: strip the originally hard-coded prefix.
        legacy_prefix = "anthropic_bedrock_manifold_pipe."
        if model.startswith(legacy_prefix):
            return model[len(legacy_prefix) :]

        # Unknown shape: pass through unchanged instead of failing on an
        # unbound variable.
        return model

    def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
        """Translate a chat request and stream the model's reply.

        Args:
            body: Request dict containing ``messages`` and ``model``;
                ``max_tokens`` is honoured when present.

        Returns:
            A generator of text chunks, or an ``"Error: ..."`` string if the
            stream could not be set up.

        Raises:
            ValueError: If a single image or the combined base64 images
                exceed the size limits.
        """
        system_message, messages = pop_system_message(body["messages"])

        processed_messages = []
        total_image_size = 0

        for message in messages:
            processed_content = []
            if isinstance(message.get("content"), list):
                for item in message["content"]:
                    if item["type"] == "text":
                        processed_content.append({"type": "text", "text": item["text"]})
                    elif item["type"] == "image_url":
                        processed_image = self.process_image(item)
                        processed_content.append(processed_image)

                        # Track total size for base64 images
                        if processed_image["source"]["type"] == "base64":
                            image_size = len(processed_image["source"]["data"]) * 3 / 4
                            total_image_size += image_size
                            if (
                                total_image_size > 100 * 1024 * 1024
                            ):  # 100MB total limit
                                raise ValueError(
                                    "Total size of images exceeds 100 MB limit"
                                )
            else:
                processed_content = [
                    {"type": "text", "text": message.get("content", "")}
                ]

            processed_messages.append(
                {"role": message["role"], "content": processed_content}
            )

        # Empty settings become None so the SDK can resolve credentials and
        # region on its own. getattr() tolerates a non-Valves `self.valves`.
        client = AnthropicBedrock(
            aws_access_key=getattr(self.valves, "AWS_ACCESS_KEY", "") or None,
            aws_secret_key=getattr(self.valves, "AWS_SECRET_KEY", "") or None,
            aws_region=getattr(self.valves, "AWS_REGION", "") or None,
        )

        try:
            return self.stream_response(
                client=client,
                messages=processed_messages,
                model=body["model"],
                system=self._system_text(system_message),
                max_tokens=body.get("max_tokens") or 8192,
            )
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return f"Error: Request failed: {e}"
        except Exception as e:
            print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def stream_response(self, client, messages, model, system=None, max_tokens=8192):
        """Yield text chunks from a streamed Bedrock message request.

        Args:
            client: ``AnthropicBedrock`` client used for the request.
            messages: Anthropic-formatted message list.
            model: Model name, optionally prefixed by the function ID.
            system: Optional system prompt text; omitted from the request
                when empty.
            max_tokens: Upper bound on generated tokens.

        Yields:
            Text fragments as they arrive; on failure a single
            ``"Error: ..."`` string is yielded instead of raising.
        """
        model_id = self._resolve_model_id(model)

        request = {
            "max_tokens": max_tokens,
            "messages": messages,
            "model": model_id,
        }
        if system:
            request["system"] = system

        try:
            with client.messages.stream(**request) as stream:
                for text in stream.text_stream:
                    yield text
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            yield f"Error: Request failed: {e}"
        except Exception as e:
            print(f"General error in stream_response method: {e}")
            yield f"Error: {e}"