Whitepaper
Docs
Sign In
Function
Function
pipe
Anthropic Claude Sonnet 3.7 via Open Router
Function ID
anthropic_claude_sonnet_3_7_via_open_router
Creator
@galahad77
Downloads
56+
Connects to the latest Claude Sonnet 3.7 model through the OpenRouter API.
Get
README
No README available
Function Code
Show
import asyncio
import codecs
import json
import logging
import os
import time
from typing import Dict, Generator, Iterator, List, Union

import aiohttp
import requests
from pydantic import BaseModel, Field


class Pipe:
    """OpenWebUI manifold pipe streaming Claude 3.7 Sonnet via OpenRouter.

    Sends chat-completion requests to the OpenRouter API with direct
    routing (no model fallbacks) and relays the SSE response stream back
    to the UI chunk by chunk.
    """

    API_VERSION = "v1"
    MODEL_URL = "https://openrouter.ai/api/v1/chat/completions"
    SUPPORTED_IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif", "image/webp"]
    MAX_IMAGE_SIZE = 5 * 1024 * 1024  # 5 MB
    MAX_PDF_SIZE = 32 * 1024 * 1024  # 32 MB
    REQUEST_TIMEOUT = (3.05, 60)
    # Maximum *output* tokens per model (per Anthropic beta limits).
    MODEL_MAX_TOKENS = {
        "anthropic/claude-3.7-sonnet": 128000,
    }
    CONTEXT_MAX_TOKENS = 200000  # Maximum *input* context window.

    class Valves(BaseModel):
        # User-configurable settings surfaced in the OpenWebUI admin panel.
        OPENROUTER_API_KEY: str = Field(
            default=os.getenv("OPENROUTER_API_KEY", ""),
            description="Your OpenRouter API key",
        )

    def __init__(self):
        logging.basicConfig(level=logging.INFO)
        self.type = "manifold"
        self.id = "openrouter"
        self.valves = self.Valves()
        self.request_id = None

    def pipes(self) -> List[dict]:
        """Advertise the models this pipe exposes to OpenWebUI."""
        return [
            {
                "id": "anthropic/claude-3.7-sonnet",
                "name": "Claude 3.7 Sonnet",
                "context_length": self.CONTEXT_MAX_TOKENS,
                "supports_vision": True,
            }
        ]

    async def pipe(
        self, body: Dict, __event_emitter__=None
    ) -> Union[str, Generator, Iterator]:
        """Main entrypoint for the pipeline function.

        Builds the OpenRouter payload from ``body`` and returns an async
        generator that yields streamed text chunks. On configuration or
        setup errors, returns a dict with an error message instead.
        """
        # Check API key before doing any work.
        if not self.valves.OPENROUTER_API_KEY:
            logging.error("OPENROUTER_API_KEY is missing or empty")
            return {
                "content": "Error: OPENROUTER_API_KEY is required",
                "format": "text",
            }

        try:
            model_name = "anthropic/claude-3.7-sonnet"
            max_tokens = self.MODEL_MAX_TOKENS[model_name]

            logging.info(f"Processing request for model: {model_name}")

            messages = body.get("messages", [])

            # Sanity check on messages format (log only; do not reject).
            for msg in messages:
                if "role" not in msg or "content" not in msg:
                    logging.warning(f"Message with invalid format detected: {msg}")

            # Log a truncated preview of the first user message for debugging.
            if messages:
                user_msg = next(
                    (m for m in messages if m.get("role") == "user"), None
                )
                if user_msg:
                    content = user_msg.get("content", "")
                    if isinstance(content, str):
                        preview = (
                            content[:50] + "..." if len(content) > 50 else content
                        )
                        logging.info(f"User query preview: {preview}")
                    elif isinstance(content, list):
                        # Multimodal content: concatenate the text parts only.
                        text_parts = [
                            item.get("text", "")
                            for item in content
                            if item.get("type") == "text"
                        ]
                        joined = " ".join(text_parts)
                        preview = joined[:50] + "..." if len(joined) > 50 else joined
                        logging.info(f"User query preview (multimodal): {preview}")

            # Construct the payload — direct routing with no fallbacks so
            # OpenRouter cannot silently substitute another model.
            payload = {
                "model": model_name,
                "messages": messages,
                "max_tokens": min(body.get("max_tokens", max_tokens), max_tokens),
                "temperature": body.get("temperature", 0.7),
                "stream": True,  # streaming enabled
                "route": "direct",  # prevent model substitution
                "fallbacks": [],  # empty array to prevent fallbacks
            }

            # Pass through optional sampling parameters if the caller set them.
            for key in ("top_p", "frequency_penalty", "presence_penalty"):
                if key in body:
                    payload[key] = body[key]

            # Required headers for OpenRouter.
            headers = {
                "Authorization": f"Bearer {self.valves.OPENROUTER_API_KEY}",
                "Content-Type": "application/json",
                "HTTP-Referer": "https://your.link.com",  # Change this URL to match your server
                "X-Title": "your_title",  # Change this title to match your needs
                "User-Agent": "OpenWebUI/1.0",
            }

            logging.info(f"Connecting to OpenRouter API at {self.MODEL_URL}")
            logging.info(f"Using payload: {json.dumps(payload, indent=2)}")

            # Stream the response back (SSE).
            return self._stream_with_ui(
                self.MODEL_URL, headers, payload, __event_emitter__
            )

        except Exception as e:
            logging.error(f"Error in pipe: {str(e)}", exc_info=True)
            return {
                "content": f"Error in OpenRouter connection: {str(e)}",
                "format": "text",
            }

    async def _stream_with_ui(
        self, url: str, headers: dict, payload: dict, __event_emitter__=None
    ) -> Generator[str, None, None]:
        """Robust SSE handler for Anthropic-based streaming via OpenRouter.

        Yields text deltas as they arrive. Network chunks are fed through an
        incremental UTF-8 decoder so multibyte characters split across chunk
        boundaries decode correctly (a plain per-chunk ``decode`` would raise).
        """
        try:
            logging.info("Initiating streaming connection to OpenRouter")
            # Use ClientTimeout to prevent hanging connections.
            timeout = aiohttp.ClientTimeout(total=120)

            async with aiohttp.ClientSession(timeout=timeout) as session:
                logging.info(f"Sending POST request to {url}")
                async with session.post(
                    url, headers=headers, json=payload
                ) as response:
                    logging.info(f"OpenRouter response status: {response.status}")
                    logging.info(f"Response headers: {dict(response.headers)}")

                    if response.status != 200:
                        error_text = await response.text()
                        logging.error(f"OpenRouter error response: {error_text}")
                        yield f"Error from OpenRouter API: HTTP {response.status}: {error_text}"
                        return

                    # Text buffer for reassembling complete SSE messages.
                    buffer = ""
                    # Incremental decoder survives multibyte characters that
                    # straddle chunk boundaries; "replace" avoids hard failures.
                    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

                    response_content_type = response.headers.get("Content-Type", "")
                    logging.info(f"Response content type: {response_content_type}")
                    if "text/event-stream" not in response_content_type:
                        logging.warning(
                            f"Response is not in SSE format! Content-Type: {response_content_type}"
                        )

                    received_data = False
                    model_used = None

                    async for chunk in response.content:
                        if not chunk:
                            continue
                        try:
                            buffer += decoder.decode(chunk)

                            # Process complete SSE messages (terminated by a
                            # blank line, i.e. a double newline).
                            while "\n\n" in buffer or "\r\n\r\n" in buffer:
                                if "\n\n" in buffer:
                                    message, buffer = buffer.split("\n\n", 1)
                                else:
                                    message, buffer = buffer.split("\r\n\r\n", 1)

                                for line in message.split("\n"):
                                    line = line.strip()
                                    # Per the SSE spec, any line starting with
                                    # ":" is a comment (OpenRouter sends
                                    # ": OPENROUTER PROCESSING" keep-alives).
                                    if not line or line.startswith(":"):
                                        continue
                                    if not line.startswith("data:"):
                                        continue

                                    data_content = line[5:].strip()

                                    # Handle the [DONE] marker.
                                    if data_content == "[DONE]":
                                        logging.info(
                                            "Received [DONE] marker, stream complete"
                                        )
                                        if model_used:
                                            logging.info(
                                                f"Final model used in response: {model_used}"
                                            )
                                        continue

                                    try:
                                        data = json.loads(data_content)
                                        received_data = True

                                        # Log which model is actually responding.
                                        if "model" in data and not model_used:
                                            model_used = data["model"]
                                            logging.info(
                                                f"Response is coming from model: {model_used}"
                                            )

                                        # Extract content from the various
                                        # response structures providers emit.
                                        text_content = ""
                                        if "choices" in data and data["choices"]:
                                            choice = data["choices"][0]
                                            if (
                                                "delta" in choice
                                                and "content" in choice["delta"]
                                            ):
                                                # Standard OpenAI/OpenRouter format.
                                                text_content = choice["delta"]["content"]
                                            elif "content" in choice:
                                                # Direct content format.
                                                text_content = choice["content"]
                                            elif "text" in choice:
                                                # Text field format.
                                                text_content = choice["text"]
                                            elif (
                                                "message" in choice
                                                and "content" in choice["message"]
                                            ):
                                                # Anthropic format.
                                                text_content = choice["message"]["content"]

                                        if text_content:
                                            yield text_content

                                    except json.JSONDecodeError:
                                        logging.error(
                                            f"Failed to parse JSON from data: {data_content}",
                                            exc_info=True,
                                        )
                        except Exception as e:
                            logging.error(
                                f"Error processing data chunk: {str(e)}",
                                exc_info=True,
                            )

                    # Handle the case where we didn't receive any data.
                    if not received_data:
                        logging.error(
                            "No data received from OpenRouter during streaming"
                        )
                        yield "Error: No response data received from OpenRouter. Please check your API key and try again."

                    if model_used:
                        logging.info(f"Completed streaming from model: {model_used}")
                    else:
                        logging.warning(
                            "Completed streaming but model used was not identified"
                        )

        except aiohttp.ClientConnectorError as e:
            logging.error(f"Connection error to OpenRouter: {str(e)}", exc_info=True)
            yield f"Connection error: Could not connect to OpenRouter ({str(e)}). Please check your network settings and API key."
        except aiohttp.ClientError as e:
            logging.error(f"HTTP client error with OpenRouter: {str(e)}", exc_info=True)
            yield f"Error communicating with OpenRouter: {str(e)}"
        except asyncio.TimeoutError:
            logging.error("Request to OpenRouter timed out", exc_info=True)
            yield "Request timed out while waiting for OpenRouter response. Please try again."
        except Exception as e:
            logging.error(f"Unexpected error in streaming: {str(e)}", exc_info=True)
            yield f"Error: {str(e)}"