NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance. Expected completion by year-end! ✨

Function
pipe
v1.3
Anthropic V2
Anthropic API access with all the latest API changes (prompt caching, vision, etc.).
Function ID
anthropic_v2
Creator
@balaxxe
Downloads
20+

Function Content
python
"""
title: Anthropic API Integration for OpenWebUI
author: Balaxxe
version: 1.3
license: MIT
requirements: pydantic>=2.0.0, requests>=2.0.0
environment_variables: 
    - ANTHROPIC_API_KEY (required)

Supports:
- All Claude 3 and Claude 3.5 models
- Streaming responses
- Image processing
- Prompt caching (server-side)
- Function calling
"""

import os
import requests
import json
import time
import hashlib
from datetime import datetime
from typing import (
    List,
    Union,
    Generator,
    Iterator,
    Dict,
    Optional,
    AsyncIterator,
)
from pydantic import BaseModel, Field
from open_webui.utils.misc import pop_system_message


class Pipe:
    # Value sent in the "anthropic-version" request header.
    API_VERSION = "2023-06-01"
    # URL that pipe() posts to, either directly or via _stream_with_ui().
    MODEL_URL = "https://api.anthropic.com/v1/messages"
    # Image MIME types; the same four values are checked in process_image().
    SUPPORTED_IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif", "image/webp"]
    # Per-image size ceiling enforced by process_image() and process_content().
    MAX_IMAGE_SIZE = 5 * 1024 * 1024  # 5MB in bytes
    # Ceiling for the combined size of the base64 images in one content list.
    TOTAL_MAX_IMAGE_SIZE = 100 * 1024 * 1024  # 100MB in bytes
    # max_tokens ceiling pipe() applies to model names starting with "claude-3-5".
    DEFAULT_MAX_TOKENS_CLAUDE_3_5 = 8192
    # max_tokens ceiling pipe() applies to every other model name.
    DEFAULT_MAX_TOKENS_OTHER = 4096
    # Value sent in the "anthropic-beta" request header.
    BETA_HEADER = "prompt-caching-2024-07-31"
    # Passed as `timeout=` to requests.post; presumably (connect, read) seconds.
    REQUEST_TIMEOUT = (3.05, 60)

    class Valves(BaseModel):
        """Configuration values for this pipe."""

        ANTHROPIC_API_KEY: str = Field(
            default="",
            description="Your Anthropic API key",
        )

    def __init__(self):
        self.type = "manifold"
        self.id = "anthropic"
        self.name = "anthropic/"
        self.valves = self.Valves(
            **{
                "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""),
            }
        )
        self.request_id = None

    def get_anthropic_models(self):
        return [
            {
                "id": "anthropic/claude-3-haiku-20240307",
                "name": "claude-3-haiku-20240307",
                "context_length": 200000,
                "supports_vision": False,
            },
            {
                "id": "anthropic/claude-3-opus-20240229",
                "name": "claude-3-opus-20240229",
                "context_length": 200000,
                "supports_vision": True,
            },
            {
                "id": "anthropic/claude-3-sonnet-20240229",
                "name": "claude-3-sonnet-20240229",
                "context_length": 200000,
                "supports_vision": True,
            },
            {
                "id": "anthropic/claude-3-5-haiku-20241022",
                "name": "claude-3.5-haiku-20241022",
                "context_length": 200000,
                "supports_vision": False,
            },
            {
                "id": "anthropic/claude-3-5-haiku-latest",
                "name": "claude-3.5-haiku-latest",
                "context_length": 200000,
                "supports_vision": False,
            },
            {
                "id": "anthropic/claude-3-5-sonnet-20240620",
                "name": "claude-3.5-sonnet-20240620",
                "context_length": 200000,
                "supports_vision": True,
            },
            {
                "id": "anthropic/claude-3-5-sonnet-20241022",
                "name": "claude-3.5-sonnet-20241022",
                "context_length": 200000,
                "supports_vision": True,
            },
            {
                "id": "anthropic/claude-3-5-sonnet-latest",
                "name": "claude-3.5-sonnet-latest",
                "context_length": 200000,
                "supports_vision": True,
            },
        ]

    def pipes(self) -> List[dict]:
        return self.get_anthropic_models()

    def process_content(self, content: Union[str, List[dict]]) -> List[dict]:
        if isinstance(content, str):
            return [{"type": "text", "text": content}]

        processed_content = []
        total_image_size = 0

        for item in content:
            if item["type"] == "text":
                processed_content.append({"type": "text", "text": item["text"]})
            elif item["type"] == "image_url":
                processed_image = self.process_image(item)
                if processed_image["source"]["type"] == "base64":
                    image_size = len(processed_image["source"]["data"]) * 3 / 4
                    total_image_size += image_size
                    if image_size > self.MAX_IMAGE_SIZE:
                        raise ValueError(
                            f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB"
                        )
                    if total_image_size > self.TOTAL_MAX_IMAGE_SIZE:
                        raise ValueError("Total size of images exceeds 100 MB limit")
                processed_content.append(processed_image)

        return processed_content

    def process_image(self, image_data):
        if image_data["image_url"]["url"].startswith("data:image"):
            mime_type, base64_data = image_data["image_url"]["url"].split(",", 1)
            media_type = mime_type.split(":")[1].split(";")[0]

            if media_type not in ["image/jpeg", "image/png", "image/gif", "image/webp"]:
                raise ValueError(f"Unsupported media type: {media_type}")

            image_size = len(base64_data) * 3 / 4
            if image_size > self.MAX_IMAGE_SIZE:
                raise ValueError(
                    f"Image size exceeds limit: {image_size / (1024 * 1024):.2f}MB"
                )

            return {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": base64_data,
                },
            }
        else:
            url = image_data["image_url"]["url"]
            response = requests.head(url, allow_redirects=True)
            content_type = response.headers.get("content-type", "")
            content_length = int(response.headers.get("content-length", 0))

            if content_type not in [
                "image/jpeg",
                "image/png",
                "image/gif",
                "image/webp",
            ]:
                raise ValueError(f"Unsupported media type: {content_type}")
            if content_length > self.MAX_IMAGE_SIZE:
                raise ValueError(
                    f"Image exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB"
                )

            return {"type": "image", "source": {"type": "url", "url": url}}

    async def pipe(
        self, body: Dict, __event_emitter__=None
    ) -> Union[str, Generator, Iterator]:
        if not self.valves.ANTHROPIC_API_KEY:
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "description": "Error: ANTHROPIC_API_KEY is required",
                            "done": True,
                        },
                    }
                )
            return {"content": "Error: ANTHROPIC_API_KEY is required", "format": "text"}

        try:
            system_message, messages = pop_system_message(body["messages"])

            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Processing request...", "done": False},
                    }
                )

            model_name = body["model"].split("/")[-1]
            default_max_tokens = self.DEFAULT_MAX_TOKENS_CLAUDE_3_5 if model_name.startswith("claude-3-5") else self.DEFAULT_MAX_TOKENS_OTHER

            payload = {
                "model": model_name,
                "messages": self._process_messages(messages),
                "max_tokens": min(
                    body.get("max_tokens", default_max_tokens), default_max_tokens
                ),
                "temperature": float(body.get("temperature")) if body.get("temperature") is not None else None,
                "top_k": int(body.get("top_k")) if body.get("top_k") is not None else None,
                "top_p": float(body.get("top_p")) if body.get("top_p") is not None else None,
                "stream": body.get("stream"),
                "metadata": body.get("metadata", {}),
            }

            payload = {k: v for k, v in payload.items() if v is not None}

            if system_message:
                payload["system"] = str(system_message)

            if "tools" in body:
                payload["tools"] = [
                    {"type": "function", "function": tool} for tool in body["tools"]
                ]
                payload["tool_choice"] = body.get("tool_choice")

            if "response_format" in body:
                payload["response_format"] = {
                    "type": body["response_format"].get("type")
                }

            headers = {
                "x-api-key": self.valves.ANTHROPIC_API_KEY,
                "anthropic-version": self.API_VERSION,
                "anthropic-beta": self.BETA_HEADER,
                "content-type": "application/json",
            }

            try:
                if payload["stream"]:
                    return self._stream_with_ui(
                        self.MODEL_URL, headers, payload, body, __event_emitter__
                    )

                response = requests.post(
                    self.MODEL_URL, headers=headers, json=payload, timeout=self.REQUEST_TIMEOUT
                )
                if response.status_code != 200:
                    return {
                        "content": f"Error: HTTP {response.status_code}: {response.text}",
                        "format": "text",
                    }

                result, cache_metrics = self._handle_response(response)
                response_text = result["content"][0]["text"]

                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {
                                "description": "Request completed successfully",
                                "done": True,
                            },
                        }
                    )

                return response_text

            except requests.exceptions.RequestException as e:
                error_msg = f"Request failed: {str(e)}"
                if self.request_id:
                    error_msg += f" (Request ID: {self.request_id})"

                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {"description": error_msg, "done": True},
                        }
                    )
                return {"content": error_msg, "format": "text"}

        except Exception as e:
            error_msg = f"Error: {str(e)}"
            if self.request_id:
                error_msg += f" (Request ID: {self.request_id})"

            if __event_emitter__:
                await __event_emitter__(
                    {"type": "status", "data": {"description": error_msg, "done": True}}
                )
            return {"content": error_msg, "format": "text"}

    async def _stream_with_ui(
        self, url: str, headers: dict, payload: dict, body: dict, __event_emitter__=None
    ) -> AsyncIterator[str]:
        """POST a streaming request and yield the reply text as it arrives.

        This is an async generator. It reads the response line by line,
        decodes each ``data: `` line as JSON and yields the ``text`` of every
        ``content_block_delta`` event.

        Args:
            url: Endpoint to POST to.
            headers: HTTP headers for the request.
            payload: JSON body of the request.
            body: Original request dict (not used by this method).
            __event_emitter__: Optional async callable that receives
                ``{"type": "status", ...}`` events.

        Yields:
            Text fragments of the reply. Failures are yielded as a single
            ``Error: ...`` / ``Stream error: ...`` string instead of raised.

        Note:
            ``requests`` is synchronous, so reading the stream blocks inside
            this coroutine between yields.
        """

        async def emit_done(description: str) -> None:
            # Status events are optional; do nothing when no emitter is given.
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": description, "done": True},
                    }
                )

        try:
            with requests.post(
                url,
                headers=headers,
                json=payload,
                stream=True,
                timeout=self.REQUEST_TIMEOUT,
            ) as response:
                self.request_id = response.headers.get(
                    "x-request-id"
                ) or response.headers.get("request-id")

                if response.status_code != 200:
                    await emit_done("Request failed")
                    yield f"Error: HTTP {response.status_code}: {response.text} (Request ID: {self.request_id})"
                    return

                for line in response.iter_lines():
                    # Only "data: ..." lines carry a JSON event.
                    if not line or not line.startswith(b"data: "):
                        continue

                    data = json.loads(line[6:])
                    event_type = data.get("type")

                    if event_type == "content_block_delta":
                        delta = data.get("delta", {})
                        if "text" in delta:
                            yield delta["text"]
                    elif event_type == "message_stop":
                        await emit_done("Request completed successfully")
                        break
                    elif event_type == "error":
                        await emit_done("Stream error")
                        yield f"Stream error: {data.get('error', {}).get('message', 'Unknown error')} (Request ID: {self.request_id})"
                        # Stop here so the "done" status above is not
                        # followed by further output.
                        break
                else:
                    # The stream ended without message_stop or an error
                    # event; still emit a final status so it is marked done.
                    await emit_done("Stream ended")

        except Exception as e:
            await emit_done("Stream error")
            yield f"Stream error: {str(e)} (Request ID: {self.request_id if self.request_id else 'unknown'})"

    def _process_messages(self, messages: List[dict]) -> List[dict]:
        processed_messages = []
        for message in messages:
            processed_content = []
            for content in self.process_content(message["content"]):
                if message.get("role") == "assistant" and content.get("type") == "tool_calls":
                    content["cache_control"] = {"type": "ephemeral"}
                elif message.get("role") == "user" and content.get("type") == "tool_results":
                    content["cache_control"] = {"type": "ephemeral"}
                processed_content.append(content)
            processed_messages.append({
                "role": message["role"],
                "content": processed_content
            })
        return processed_messages

    def _handle_response(self, response):
        result = response.json()
        usage = result.get("usage", {})
        cache_metrics = {
            "cache_creation_input_tokens": usage.get("cache_creation_input_tokens", 0),
            "cache_read_input_tokens": usage.get("cache_read_input_tokens", 0),
            "input_tokens": usage.get("input_tokens", 0)
        }
        return result, cache_metrics