Whitepaper
Docs
Sign In
Function
Function
pipe
v0.2.1
Anthropic Valveless
Function ID
anthropic_valveless
Creator
@pikaro
Downloads
98+
Anthropic Manifold Pipe (Vaveless)
Get
README
No README available
Function Code
Show
""" title: Anthropic Manifold Pipe authors: justinh-rahb and christian-taillon author_url: https://github.com/justinh-rahb funding_url: https://github.com/open-webui version: 0.2.1 required_open_webui_version: 0.3.17 license: MIT """ # Forked from the original `anthropic` function, but lacking the valve for the API key. # This allows the admin to set the key via env variable without potentially exposing it to users. import os import requests import json import time from typing import List, Union, Generator, Iterator from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message class Pipe: def __init__(self): self.type = "manifold" self.id = "anthropic_valveless" self.name = "anthropic_valveless/" pass def get_anthropic_models(self): return [ {"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"}, {"id": "claude-3-opus-20240229", "name": "claude-3-opus"}, {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"}, {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"}, {"id": "claude-3-5-haiku-latest", "name": "claude-3.5-haiku"}, {"id": "claude-3-5-sonnet-20240620", "name": "claude-3.5-sonnet"}, {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"}, {"id": "claude-3-5-sonnet-latest", "name": "claude-3.5-sonnet"}, ] def pipes(self) -> List[dict]: return self.get_anthropic_models() def process_image(self, image_data): if image_data["image_url"]["url"].startswith("data:image"): mime_type, base64_data = image_data["image_url"]["url"].split(",", 1) media_type = mime_type.split(":")[1].split(";")[0] return { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": base64_data, }, } else: return { "type": "image", "source": {"type": "url", "url": image_data["image_url"]["url"]}, } def pipe(self, body: dict) -> Union[str, Generator, Iterator]: system_message, messages = pop_system_message(body["messages"]) processed_messages = [] image_count = 0 total_image_size = 0 for message in messages: processed_content = [] if isinstance(message.get("content"), list): for item in message["content"]: if item["type"] == "text": processed_content.append({"type": "text", "text": item["text"]}) elif item["type"] == "image_url": if image_count >= 5: raise ValueError( "Maximum of 5 images per API call exceeded" ) processed_image = self.process_image(item) processed_content.append(processed_image) if processed_image["source"]["type"] == "base64": image_size = len(processed_image["source"]["data"]) * 3 / 4 else: image_size = 0 total_image_size += image_size if total_image_size > 100 * 1024 * 1024: raise ValueError( "Total size of images exceeds 100 MB limit" ) image_count += 1 else: processed_content = [ {"type": "text", "text": message.get("content", "")} ] processed_messages.append( {"role": message["role"], "content": processed_content} ) # Ensure the system_message is coerced to a string payload = { "model": body["model"][body["model"].find(".") + 1 :], "messages": processed_messages, "max_tokens": body.get("max_tokens", 4096), "temperature": body.get("temperature", 0.8), "top_k": body.get("top_k", 40), "top_p": body.get("top_p", 0.9), "stop_sequences": body.get("stop", []), **({"system": str(system_message)} if system_message else {}), "stream": body.get("stream", False), } headers = { "x-api-key": os.environ["ANTHROPIC_API_KEY"], "anthropic-version": "2023-06-01", "content-type": "application/json", } url = "https://api.anthropic.com/v1/messages" try: if body.get("stream", False): return self.stream_response(url, headers, payload) else: return self.non_stream_response(url, headers, payload) except requests.exceptions.RequestException as e: print(f"Request failed: {e}") return f"Error: Request failed: {e}" except Exception as e: print(f"Error in pipe method: {e}") return f"Error: {e}" def stream_response(self, url, headers, payload): try: with requests.post( url, headers=headers, json=payload, stream=True, timeout=(3.05, 60) ) as response: if response.status_code != 200: raise Exception( f"HTTP Error {response.status_code}: {response.text}" ) for line in response.iter_lines(): if line: line = line.decode("utf-8") if line.startswith("data: "): try: data = json.loads(line[6:]) if data["type"] == "content_block_start": yield data["content_block"]["text"] elif data["type"] == "content_block_delta": yield data["delta"]["text"] elif data["type"] == "message_stop": break elif data["type"] == "message": for content in data.get("content", []): if content["type"] == "text": yield content["text"] # Delay to avoid overwhelming the client time.sleep(0.01) except json.JSONDecodeError: print(f"Failed to parse JSON: {line}") except KeyError as e: print(f"Unexpected data structure: {e}") print(f"Full data: {data}") except requests.exceptions.RequestException as e: print(f"Request failed: {e}") yield f"Error: Request failed: {e}" except Exception as e: print(f"General error in stream_response method: {e}") yield f"Error: {e}" def non_stream_response(self, url, headers, payload): try: response = requests.post( url, headers=headers, json=payload, timeout=(3.05, 60) ) if response.status_code != 200: raise Exception(f"HTTP Error {response.status_code}: {response.text}") res = response.json() return ( res["content"][0]["text"] if "content" in res and res["content"] else "" ) except requests.exceptions.RequestException as e: print(f"Failed non-stream request: {e}") return f"Error: {e}"