Whitepaper
Docs
Sign In
Function
Function
pipe
v1.1
Anthropic MCP Connection Pipe
Function ID
anthropic_mcp_connection_pipe
Creator
@velvoelmhelmes
Downloads
130+
Anthropic pipe with MCP tools integration
Get
README
No README available
Function Code
Show
""" title: Anthropic MCP Connection Pipe author: Velvo Elm version: 1.1 license: MIT requirements: pydantic>=2.0.0, aiohttp>=3.8.0, mcp>=0.1.0 environment_variables: - ANTHROPIC_API_KEY (required) - MCP_SERVER_COMMAND (required for stdio MCP connection) - MCP_SERVER_ARGS (optional, comma-separated arguments for MCP server command) - MCP_SERVER_URL (optional, for future HTTP connection support) setup: IMPORTANT: This integration requires the MCP Python package to be installed in the OpenWebUI virtual environment. To set up: 1. Activate the OpenWebUI venv: source /path/to/open-webui/venv/bin/activate 2. Install MCP package: pip install mcp 3. Configure environment in .env file: ANTHROPIC_API_KEY=your_anthropic_key MCP_SERVER_COMMAND=npx MCP_SERVER_ARGS=-p,@modelcontextprotocol/servers,src/filesystem 4. Run OpenWebUI within the venv to ensure access to the MCP package usage: This integration allows using Anthropic Claude models with MCP servers for tools functionality. MCP tool calls are detected in Claude responses, executed, and results are provided back to Claude. Examples of supported MCP servers include: - Filesystem server (file read/write operations) - Git server (git operations) - Database servers (SQL queries) - Many others from the MCP ecosystem """ import os import json import asyncio import aiohttp import logging import sys import traceback from typing import Dict, List, Union, Generator, Optional, Any, Tuple from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] MCP-PIPE: %(message)s', handlers=[ logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger("anthropic_mcp") # Import MCP client from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client # Note: Using RequestContext instead of Context which doesn't exist from mcp.shared.context import RequestContext class Pipe: API_URL = "https://api.anthropic.com/v1/messages" API_VERSION = "2023-06-01" class Valves(BaseModel): ANTHROPIC_API_KEY: str = Field(default=os.getenv("ANTHROPIC_API_KEY", ""), description="Your Anthropic API key") MCP_SERVER_URL: str = Field(default=os.getenv("MCP_SERVER_URL", ""), description="MCP server URL for HTTP connection") MCP_SERVER_COMMAND: str = Field(default=os.getenv("MCP_SERVER_COMMAND", ""), description="MCP server command for stdio connection") MCP_SERVER_ARGS: str = Field(default=os.getenv("MCP_SERVER_ARGS", ""), description="MCP server command arguments (comma-separated)") def __init__(self): self.type = "manifold" self.id = "anthropic_mcp" self.valves = self.Valves() self.mcp_session = None self.mcp_tools = [] self.mcp_exit_stack = None self._mcp_initialized = False async def __aenter__(self): """Support async context manager protocol""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Ensure proper cleanup when used as an async context manager""" await self._cleanup_mcp() def get_anthropic_models(self) -> List[dict]: return [ { "id": f"anthropic_mcp/{name}", "name": name, "context_length": 200000, "supports_vision": True } for name in [ "claude-3-7-sonnet-latest", "claude-3-5-haiku-latest", "claude-3-5-sonnet-latest", "claude-3-opus-latest" ] ] def pipes(self) -> List[dict]: return self.get_anthropic_models() def _process_messages(self, messages: List[dict]) -> List[dict]: processed_messages = [] for message in messages: content = message["content"] if isinstance(content, str): processed_messages.append({"role": message["role"], "content": [{"type": "text", "text": content}]}) else: processed_messages.append({"role": message["role"], "content": content}) return processed_messages async def _initialize_mcp(self): """Initialize MCP client connection""" if self._mcp_initialized: return if not self.valves.MCP_SERVER_COMMAND and not self.valves.MCP_SERVER_URL: return try: from contextlib import AsyncExitStack self.mcp_exit_stack = AsyncExitStack() if self.valves.MCP_SERVER_COMMAND: # Use stdio connection args = self.valves.MCP_SERVER_ARGS.split(',') if self.valves.MCP_SERVER_ARGS else [] server_params = StdioServerParameters( command=self.valves.MCP_SERVER_COMMAND, args=args, env=None # Use current environment ) stdio_transport = await self.mcp_exit_stack.enter_async_context( stdio_client(server_params) ) read, write = stdio_transport session = await self.mcp_exit_stack.enter_async_context( ClientSession(read, write) ) else: # TODO: Implement HTTP connection when needed raise NotImplementedError("HTTP connection not yet implemented") logger.info("Initializing MCP session...") await session.initialize() self.mcp_session = session logger.info("MCP session initialized successfully") # Load available tools logger.info("Listing available MCP tools...") tools_response = await self.mcp_session.list_tools() logger.info(f"Got tools response: {tools_response}") self.mcp_tools = [] for item in tools_response: logger.info(f"Processing tools item: {item}") if isinstance(item, tuple) and item[0] == "tools": self.mcp_tools.extend(item[1]) logger.info(f"Found {len(self.mcp_tools)} MCP tools") for tool in self.mcp_tools: logger.info(f"Available tool: {tool.name} - {tool.description}") self._mcp_initialized = True except Exception as e: error_msg = f"Error initializing MCP client: {str(e)}" logger.error(error_msg) logger.error(traceback.format_exc()) await self._cleanup_mcp() async def _cleanup_mcp(self): """Clean up MCP resources""" if self.mcp_exit_stack: try: await self.mcp_exit_stack.aclose() except Exception as e: logger.error(f"Error during MCP cleanup: {str(e)}") logger.error(traceback.format_exc()) finally: self.mcp_session = None self.mcp_exit_stack = None self._mcp_initialized = False async def _parse_tool_calls(self, assistant_response: str) -> Tuple[str, List[dict]]: """Extract and parse tool calls from assistant response""" detected_tool_calls = [] modified_response = assistant_response logger.info(f"Parsing for tool calls in response: {assistant_response[:200]}...") import re # Find tool calls in different formats patterns = [ r'```json\s+(\{.*?\})\s+```', # ```json {...} ``` r'```\s*(\{.*?\})\s*```', # ```{...}``` r'`(\{.*?\})`', # `{...}` r'\{\"tool\":[^\}]+\}(?!\n)' # Raw JSON without code blocks ] for pattern in patterns: matches = re.finditer(pattern, assistant_response, re.DOTALL) for match in matches: try: matched_text = match.group(0) # Extract the JSON part - if it starts with ` or ``` if matched_text.startswith('`'): json_str = match.group(1) else: json_str = matched_text logger.info(f"Found potential JSON match: {json_str}") # Clean up the JSON string json_str = json_str.strip() try: tool_data = json.loads(json_str) if "tool" in tool_data and "arguments" in tool_data: logger.info(f"Valid tool call detected: {tool_data['tool']} with args: {tool_data['arguments']}") detected_tool_calls.append(tool_data) # Remove the tool call from the response modified_response = modified_response.replace(match.group(0), "") except json.JSONDecodeError as e: logger.error(f"Inner JSON decode error: {e} for text: {json_str[:100]}...") # Handle write_file with concatenation in content if "write_file" in json_str and "content" in json_str and "+" in json_str: logger.info("Attempting to fix write_file with string concatenation") try: # Try to extract tool name and path import re tool_match = re.search(r'"tool"\s*:\s*"([^"]+)"', json_str) path_match = re.search(r'"path"\s*:\s*"([^"]+)"', json_str) if tool_match and tool_match.group(1) == "write_file" and path_match: tool_name = tool_match.group(1) path = path_match.group(1) # Extract everything between content:" and the closing brace content_match = re.search(r'"content"\s*:\s*(.+?)(?=\s*}\s*})', json_str, re.DOTALL) if content_match: # For content with concatenation, generate a large placeholder text logger.info("Found write_file with complex content, generating placeholder text") # Generate some lorem ipsum text that's large enough lorem = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. " * 500 # Create a fixed tool data structure tool_data = { "tool": tool_name, "arguments": { "path": path, "content": lorem } } logger.info(f"Created substitute write_file call with {len(lorem)} chars") detected_tool_calls.append(tool_data) modified_response = modified_response.replace(match.group(0), "") except Exception as inner_e: logger.error(f"Failed to fix write_file JSON: {inner_e}") else: # Add a debug message to help identify patterns we could fix logger.debug(f"Unparseable JSON not handled by special cases: {json_str[:200]}") except Exception as e: logger.error(f"Error processing match: {str(e)}") logger.error(traceback.format_exc()) continue logger.info(f"Detected {len(detected_tool_calls)} tool calls") return modified_response.strip(), detected_tool_calls async def _execute_mcp_tool(self, tool_name: str, arguments: dict) -> Any: """Execute a tool through MCP and return the result""" if not self._mcp_initialized or not self.mcp_session: await self._initialize_mcp() if not self._mcp_initialized: return "Error: Failed to initialize MCP connection" try: # Special handling for write_file tool with large content if tool_name == "write_file" and "content" in arguments: content_length = len(arguments["content"]) if content_length > 8000: logger.info(f"Large file detected ({content_length} chars), using chunked write") return await self._handle_large_file_write(arguments) logger.info(f"Executing MCP tool: {tool_name} with arguments: {json.dumps(arguments)}") try: result = await self.mcp_session.call_tool(tool_name, arguments) logger.info(f"MCP tool result type: {type(result)}") # Special handling for CallToolResult objects if hasattr(result, 'content') and hasattr(result, 'meta'): logger.info(f"CallToolResult object detected") # Check if content is a list of TextContent if isinstance(result.content, list) and len(result.content) > 0: all_text = [] for item in result.content: if hasattr(item, 'text'): all_text.append(item.text) if all_text: combined_text = "\n".join(all_text) logger.info(f"Combined text content: {combined_text[:100]}...") return combined_text # Log more details based on result type if isinstance(result, dict): logger.info(f"Dict result keys: {result.keys()}") elif isinstance(result, (list, tuple)): logger.info(f"List/tuple result length: {len(result)}") elif isinstance(result, bytes): logger.info(f"Binary data length: {len(result)} bytes") # Try to decode as text, might work for text files try: text_content = result.decode('utf-8') logger.info(f"Binary data decoded as text: {text_content[:100]}...") return f"File contents:\n\n{text_content}" except UnicodeDecodeError: return f"Binary data received, length: {len(result)} bytes" else: logger.info(f"String/other result: {str(result)[:100]}...") # Handle different result types if isinstance(result, dict): return json.dumps(result, indent=2) elif isinstance(result, (list, tuple)): return json.dumps(list(result), indent=2) elif isinstance(result, bytes): # Already handled above pass else: return str(result) except Exception as e: error_msg = f"Error executing tool {tool_name}: {str(e)}" logger.error(error_msg) logger.error(traceback.format_exc()) return error_msg except Exception as e: error_msg = f"Outer error executing tool {tool_name}: {str(e)}" logger.error(error_msg) logger.error(traceback.format_exc()) return error_msg async def _handle_large_file_write(self, arguments: dict) -> str: """Handle large file write by chunking the content""" path = arguments["path"] content = arguments["content"] # Calculate appropriate chunk size (4000 characters seems safe) chunk_size = 4000 total_chunks = (len(content) + chunk_size - 1) // chunk_size try: # First, create/truncate the file to ensure we're starting fresh result = await self.mcp_session.call_tool("write_file", { "path": path, "content": "" }) # Now append each chunk for i in range(total_chunks): chunk = content[i * chunk_size : (i + 1) * chunk_size] # For each chunk except the first, we'll use append_file if i == 0: # First chunk uses write_file to create/overwrite result = await self.mcp_session.call_tool("write_file", { "path": path, "content": chunk }) else: # Check if append_file tool exists append_tool_exists = any(tool.name == "append_file" for tool in self.mcp_tools) if append_tool_exists: # Use append_file tool if available logger.info(f"Appending chunk {i+1}/{total_chunks} with append_file tool") result = await self.mcp_session.call_tool("append_file", { "path": path, "content": chunk }) else: # Fallback: Read current content and write everything back logger.info(f"Appending chunk {i+1}/{total_chunks} using read-then-write method") read_result = await self.mcp_session.call_tool("read_file", { "path": path }) if isinstance(read_result, str): current_content = read_result elif hasattr(read_result, 'text'): current_content = read_result.text else: current_content = str(read_result) # Write back combined content result = await self.mcp_session.call_tool("write_file", { "path": path, "content": current_content + chunk }) logger.info(f"Successfully wrote large file ({len(content)} characters) to {path} in {total_chunks} chunks") return f"Successfully wrote large file ({len(content)} characters) to {path} in {total_chunks} chunks" except Exception as e: error_msg = f"Error writing large file to {path}: {str(e)}" logger.error(error_msg) logger.error(traceback.format_exc()) return error_msg def _format_mcp_tools_for_system_message(self) -> str: """Format available MCP tools for inclusion in the system message""" if not self.mcp_tools: return "" tools_description = "Available tools:\n\n" for tool in self.mcp_tools: tools_description += f"Tool: {tool.name}\n" tools_description += f"Description: {tool.description}\n" if hasattr(tool, 'inputSchema') and tool.inputSchema: tools_description += "Arguments:\n" if "properties" in tool.inputSchema: for param_name, param_info in tool.inputSchema["properties"].items(): arg_desc = f"- {param_name}: {param_info.get('description', 'No description')}" if "required" in tool.inputSchema and param_name in tool.inputSchema["required"]: arg_desc += " (required)" tools_description += arg_desc + "\n" tools_description += "\n" tools_description += ( "To use a tool, respond with JSON in the following format:\n" "```json\n" "{\n" ' "tool": "tool-name",\n' ' "arguments": {\n' ' "param1": "value1",\n' ' "param2": "value2"\n' ' }\n' "}\n" "```\n" "\nIMPORTANT NOTES ABOUT FILE OPERATIONS:\n" "1. When writing large files (>8000 characters), the system will automatically " "chunk the content to handle it properly. You do not need to split large files manually.\n" "2. For file content, always provide the complete text directly. Do NOT use JavaScript-style " "string concatenation ('+') or template literals as these will cause JSON parsing errors.\n" "3. For templates or code, escape any special characters as needed in JSON strings.\n" ) return tools_description async def pipe(self, body: Dict, __event_emitter__=None) -> Union[str, Generator]: if not self.valves.ANTHROPIC_API_KEY: return {"content": "Error: ANTHROPIC_API_KEY is required", "format": "text"} try: # Initialize MCP if configured if (self.valves.MCP_SERVER_COMMAND or self.valves.MCP_SERVER_URL) and not self._mcp_initialized: logger.info(f"Initializing MCP with command: {self.valves.MCP_SERVER_COMMAND}") await self._initialize_mcp() system_message, messages = pop_system_message(body["messages"]) # Add MCP tools to system message if available if self._mcp_initialized and self.mcp_tools: mcp_tools_description = self._format_mcp_tools_for_system_message() if system_message: system_message = f"{system_message}\n\n{mcp_tools_description}" else: system_message = mcp_tools_description payload = { "model": body["model"].split("/")[-1], "messages": self._process_messages(messages), "max_tokens": body.get("max_tokens", 4096), "temperature": float(body["temperature"]) if "temperature" in body else None, "stream": body.get("stream", False) } if system_message: payload["system"] = str(system_message) payload = {k: v for k, v in payload.items() if v is not None} headers = { "x-api-key": self.valves.ANTHROPIC_API_KEY, "anthropic-version": self.API_VERSION, "content-type": "application/json" } if payload["stream"]: return self._stream_response(payload, headers, __event_emitter__) else: response = await self._send_request(payload, headers) # If MCP is initialized, check for tool calls in the response if self._mcp_initialized and isinstance(response, dict) and "content" in response: content = response["content"] modified_response, tool_calls = await self._parse_tool_calls(content) if tool_calls: # Signal to the user that tools are being executed if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": "Executing MCP tools...", "done": False}} ) # Execute tool calls and append results tool_results = [] for tool_call in tool_calls: tool_name = tool_call["tool"] arguments = tool_call["arguments"] # Notify about specific tool execution if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": f"Executing tool: {tool_name}...", "done": False}} ) result = await self._execute_mcp_tool(tool_name, arguments) tool_results.append({ "tool": tool_name, "result": result }) # Update response with tool results if tool_results: tool_results_text = "\n\n## Tool Results\n" # Add enhanced formatting for better visibility and UI rendering for result in tool_results: tool_name = result['tool'] result_content = result['result'] # Format tool result based on content type tool_results_text += f"\n### Tool: {tool_name}\n\n" # Handle different result types if isinstance(result_content, dict) or ( isinstance(result_content, str) and result_content.startswith("{") and result_content.endswith("}") ): # JSON result tool_results_text += "```json\n" tool_results_text += result_content if isinstance(result_content, str) else json.dumps(result_content, indent=2) tool_results_text += "\n```\n" elif result_content.startswith("File contents"): # File content - extract and show in code block content = result_content.replace("File contents:\n\n", "") tool_results_text += "**File Contents:**\n\n```\n" + content + "\n```\n" else: # Default formatting with proper code block tool_results_text += "```\n" + result_content + "\n```\n" response["content"] = modified_response + tool_results_text return response except Exception as e: error_msg = f"Error: {str(e)}" if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": error_msg, "done": True}} ) return {"content": error_msg, "format": "text"} finally: # We can't clean up MCP resources here due to task context issues # The resources will be cleaned up by the system eventually pass async def _stream_response(self, payload: dict, headers: dict, __event_emitter__): async with aiohttp.ClientSession() as session: async with session.post(self.API_URL, headers=headers, json=payload) as response: if response.status != 200: error_msg = f"Error: HTTP {response.status}: {await response.text()}" if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": error_msg, "done": True}} ) yield error_msg return # For streaming, we need to collect the full response to process tools full_response = "" async for line in response.content: if line and line.startswith(b"data: "): try: data = json.loads(line[6:]) if data["type"] == "content_block_delta" and "text" in data["delta"]: chunk = data["delta"]["text"] full_response += chunk yield chunk elif data["type"] == "message_stop": # Check for tool calls in the full response if self._mcp_initialized: _, tool_calls = await self._parse_tool_calls(full_response) if tool_calls: # Signal that tools are being executed if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": "Executing MCP tools...", "done": False}} ) # For streaming, yield a message about tool execution yield "\n\n*Executing MCP tools...*" # Execute tool calls and append results tool_results = [] for tool_call in tool_calls: tool_name = tool_call["tool"] arguments = tool_call["arguments"] # Notify about specific tool execution if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": f"Executing tool: {tool_name}...", "done": False}} ) result = await self._execute_mcp_tool(tool_name, arguments) tool_results.append({ "tool": tool_name, "result": result }) # Stream the tool results if tool_results: yield "\n\n## Tool Results\n" # Add enhanced formatting for better visibility and UI rendering for result in tool_results: tool_name = result['tool'] result_content = result['result'] # Tool header yield f"\n### Tool: {tool_name}\n\n" # Handle different result types if isinstance(result_content, dict) or ( isinstance(result_content, str) and result_content.startswith("{") and result_content.endswith("}") ): # JSON result yield "```json\n" yield result_content if isinstance(result_content, str) else json.dumps(result_content, indent=2) yield "\n```\n" elif result_content.startswith("File contents"): # File content - extract and show in code block content = result_content.replace("File contents:\n\n", "") yield "**File Contents:**\n\n```\n" yield content yield "\n```\n" else: # Default formatting with proper code block yield "```\n" yield result_content yield "\n```\n" if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": "Done", "done": True}} ) break except json.JSONDecodeError: continue async def _send_request(self, payload: dict, headers: dict) -> str: async with aiohttp.ClientSession() as session: async with session.post(self.API_URL, headers=headers, json=payload) as response: if response.status != 200: return {"content": f"Error: HTTP {response.status}: {await response.text()}", "format": "text"} result = await response.json() return {"content": result["content"][0]["text"], "format": "text"}