Whitepaper
Docs
Sign In
Function
Function
pipe
v1.0
Deep Seek R1
Function ID
deep_seek_r1
Creator
@captresolve
Downloads
113+
Workaround/fix for the Ollama error: 500, message='Internal Server Error', url='http://host.docker.internal:11434/api/chat'
Get
README
No README available
Function Code
Show
"""
title: Deep Seek R1 (Internal Server Error message fix)
author: CaptResolve
author_url: https://github.com/open-webui
funding_url: https://github.com/open-webui
version: 1.0

Must pull the ollama models - update this function for your models.
"""

import os
import json
import time
import logging
import asyncio
from typing import (
    List,
    Union,
    Dict,
    Optional,
    AsyncIterator,
)
from pydantic import BaseModel, Field
from open_webui.utils.misc import pop_system_message
import aiohttp


class Pipe:
    """Open WebUI manifold pipe that forwards chat requests to DeepSeek R1
    models through the Ollama ``/api/chat`` endpoint.

    Each entry of ``DEEPSEEK_MODELS`` is exposed as one selectable model.
    Requests are sent with aiohttp; non-streaming requests are retried on
    HTTP 429 and on connection errors, streaming requests are relayed chunk
    by chunk.
    """

    REQUEST_TIMEOUT = 120  # Increased timeout for longer responses

    # DeepSeek model information
    DEEPSEEK_MODELS = {
        "deepseek-r1:70b": {
            "context_length": 32768,
            "max_tokens": 4096,
        },
        "deepseek-r1:32b-qwen-distill-q4_K_M": {
            "context_length": 32768,
            "max_tokens": 4096,
        },
        "deepseek-r1:14b": {
            "context_length": 32768,
            "max_tokens": 4096,
        },
        "deepseek-r1:32b": {
            "context_length": 32768,
            "max_tokens": 4096,
        },
    }

    class Valves(BaseModel):
        OLLAMA_API_URL: str = Field(
            default=os.getenv("OLLAMA_API_URL", "http://host.docker.internal:11434"),
            description="URL for the Ollama API (DeepSeek models are served through Ollama)",
        )
        MAX_OUTPUT_TOKENS: bool = True  # Valve to use maximum possible output tokens
        CONNECTION_CHECK: bool = (
            True  # Valve to enable/disable connection check on startup
        )

    def __init__(self):
        logging.basicConfig(level=logging.INFO)
        self.type = "manifold"
        self.id = "deepseek"
        self.valves = self.Valves()
        self.request_id = None
        self.connection_verified = False
        # Keep a reference so the background task is not garbage-collected
        # before it finishes.
        self._verify_task = None

        # Verify connection on startup if enabled. A task can only be
        # scheduled while an event loop is running; when the class is
        # instantiated from synchronous code, pipe() performs the check
        # lazily on the first request instead of raising RuntimeError here.
        if self.valves.CONNECTION_CHECK:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
            if loop is not None:
                self._verify_task = loop.create_task(self._verify_connection())

    @staticmethod
    async def _emit_status(emitter, description: str, done: bool) -> None:
        """Send a ``status`` event through *emitter*; no-op when it is falsy."""
        if emitter:
            await emitter(
                {
                    "type": "status",
                    "data": {"description": description, "done": done},
                }
            )

    @staticmethod
    def _extract_text(content) -> str:
        """Flatten a message ``content`` value into plain text.

        Strings are returned unchanged, ``None`` becomes ``""``, and a list of
        parts is reduced to its ``{"type": "text"}`` parts joined by newlines
        (other part types are dropped because only text is forwarded).
        """
        if content is None:
            return ""
        if isinstance(content, str):
            return content
        if isinstance(content, (list, tuple)):
            return "\n".join(
                part.get("text") or ""
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
        return str(content)

    @staticmethod
    def _parse_retry_after(header_value, fallback):
        """Return the delay in seconds from a ``Retry-After`` header.

        Falls back to *fallback* when the header is missing, negative, or not
        a plain integer (e.g. the HTTP-date form).
        """
        if header_value is None:
            return fallback
        try:
            delay = int(str(header_value).strip())
        except ValueError:
            return fallback
        return delay if delay >= 0 else fallback

    async def _verify_connection(self):
        """Verify connection to Ollama API on startup.

        Sets ``self.connection_verified`` to True when ``/api/tags`` answers
        with HTTP 200. Every failure is logged and swallowed so that a missing
        Ollama server never prevents the pipe from loading.
        """
        try:
            api_url = f"{self.valves.OLLAMA_API_URL.rstrip('/')}/api/tags"
            async with aiohttp.ClientSession() as session:
                timeout = aiohttp.ClientTimeout(
                    total=5
                )  # Short timeout for connection check
                async with session.get(api_url, timeout=timeout) as response:
                    if response.status == 200:
                        self.connection_verified = True
                        logging.info(
                            f"Successfully connected to Ollama API at {self.valves.OLLAMA_API_URL}"
                        )
                    else:
                        logging.warning(
                            f"Connected to Ollama API at {self.valves.OLLAMA_API_URL} but received status code {response.status}"
                        )
        except Exception as e:
            logging.warning(
                f"Failed to connect to Ollama API at {self.valves.OLLAMA_API_URL}: {str(e)}"
            )
            logging.info(
                "DeepSeek integration will still be available, but requests may fail if Ollama is not running"
            )

    def get_deepseek_models(self) -> List[dict]:
        """Return one model descriptor per entry of ``DEEPSEEK_MODELS``."""
        return [
            {
                "id": f"deepseek/{name}",
                "name": name,
                "context_length": model_info["context_length"],
                "supports_vision": False,
            }
            for name, model_info in self.DEEPSEEK_MODELS.items()
        ]

    def pipes(self) -> List[dict]:
        """Return the model descriptors this manifold exposes."""
        return self.get_deepseek_models()

    def _build_payload(self, body: Dict, model_name: str, ollama_messages: List[dict]) -> dict:
        """Assemble the ``/api/chat`` payload from the incoming request body."""
        # Get max tokens for the model
        max_tokens_limit = self.DEEPSEEK_MODELS.get(model_name, {}).get(
            "max_tokens", 4096
        )
        requested_tokens = body.get("max_tokens")
        # If MAX_OUTPUT_TOKENS valve is enabled (or the caller gave no usable
        # value), use the maximum possible tokens for the model.
        if self.valves.MAX_OUTPUT_TOKENS or requested_tokens is None:
            max_tokens = max_tokens_limit
        else:
            max_tokens = min(int(requested_tokens), max_tokens_limit)

        temperature = body.get("temperature")
        top_p = body.get("top_p")
        top_k = body.get("top_k")
        options = {
            # An explicit None means "not set", so fall back to the default.
            "temperature": float(temperature) if temperature is not None else 0.7,
            "top_p": float(top_p) if top_p is not None else None,
            "top_k": int(top_k) if top_k is not None else None,
            "num_predict": max_tokens,
        }
        return {
            "model": model_name,
            "messages": ollama_messages,
            "stream": bool(body.get("stream", False)),
            # Remove None values from options
            "options": {k: v for k, v in options.items() if v is not None},
        }

    async def pipe(
        self, body: Dict, __event_emitter__=None
    ) -> Union[str, Dict, AsyncIterator[str]]:
        """
        Process a request to the DeepSeek API via Ollama.

        Args:
            body: The request body containing messages and parameters
            __event_emitter__: Optional event emitter for status updates

        Returns:
            The response text for a non-streaming request, an async iterator
            of text chunks for a streaming request, or a
            ``{"content": ..., "format": "text"}`` dict describing an error.
        """
        # The startup check may not have run (no event loop at construction
        # time) or may have failed earlier; try again before warning.
        if self.valves.CONNECTION_CHECK and not self.connection_verified:
            await self._verify_connection()

        # Check if we need to warn about connection
        if self.valves.CONNECTION_CHECK and not self.connection_verified:
            warning_msg = f"Warning: Connection to Ollama API at {self.valves.OLLAMA_API_URL} has not been verified. Requests may fail."
            logging.warning(warning_msg)
            await self._emit_status(__event_emitter__, warning_msg, False)

        try:
            system_message, messages = pop_system_message(body["messages"])

            await self._emit_status(__event_emitter__, "Processing request...", False)

            model_name = body["model"].split("/")[-1]
            if model_name not in self.DEEPSEEK_MODELS:
                logging.warning(
                    f"Unknown model: {model_name}, using default parameters"
                )

            # Convert messages to Ollama format
            ollama_messages = self._convert_to_ollama_messages(messages, system_message)
            payload = self._build_payload(body, model_name, ollama_messages)

            api_url = f"{self.valves.OLLAMA_API_URL.rstrip('/')}/api/chat"

            try:
                if payload["stream"]:
                    return self._stream_with_ui(api_url, payload, __event_emitter__)

                response_data = await self._send_request(api_url, payload)

                if isinstance(response_data, dict) and "error" in response_data:
                    error_msg = f"Error: {response_data['error']}"
                    await self._emit_status(__event_emitter__, error_msg, True)
                    return {"content": error_msg, "format": "text"}

                # Extract the response content
                response_text = response_data.get("message", {}).get("content", "")

                await self._emit_status(
                    __event_emitter__, "Request completed successfully", True
                )
                return response_text

            except Exception as e:
                error_msg = f"Request failed: {str(e)}"
                await self._emit_status(__event_emitter__, error_msg, True)
                return {"content": error_msg, "format": "text"}

        except Exception as e:
            error_msg = f"Error: {str(e)}"
            await self._emit_status(__event_emitter__, error_msg, True)
            return {"content": error_msg, "format": "text"}

    def _convert_to_ollama_messages(
        self,
        messages: List[dict],
        system_message: Optional[Union[str, dict]] = None,
    ) -> List[dict]:
        """
        Convert Open WebUI messages to Ollama format.

        Args:
            messages: List of message objects
            system_message: Optional system prompt, either as text or as a
                full message dict whose ``content`` holds the text

        Returns:
            Messages in Ollama format (``role`` plus plain-text ``content``)
        """
        ollama_messages = []

        # Add system message if present. NOTE(review): pop_system_message
        # appears to return the whole system message dict rather than a
        # string, so unwrap its content to avoid sending a nested object.
        if isinstance(system_message, dict):
            system_text = self._extract_text(system_message.get("content"))
        else:
            system_text = self._extract_text(system_message)
        if system_text:
            ollama_messages.append({"role": "system", "content": system_text})

        # Process regular messages
        for message in messages or []:
            # Map 'assistant' to 'assistant' and everything else to 'user'
            ollama_role = "assistant" if message.get("role") == "assistant" else "user"
            # Only text is forwarded: DeepSeek via Ollama doesn't support
            # images or other content types yet.
            content = self._extract_text(message.get("content"))
            ollama_messages.append({"role": ollama_role, "content": content})

        return ollama_messages

    async def _stream_with_ui(
        self, url: str, payload: dict, __event_emitter__=None
    ) -> AsyncIterator[str]:
        """
        Stream responses from the DeepSeek API with UI event updates.

        Args:
            url: The API endpoint URL
            payload: Request payload
            __event_emitter__: Optional event emitter for status updates

        Yields:
            Text chunks from the streaming response, or a single error
            message if the request fails.
        """
        try:
            async with aiohttp.ClientSession() as session:
                timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
                async with session.post(url, json=payload, timeout=timeout) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        error_msg = f"Error: HTTP {response.status}: {error_text}"
                        await self._emit_status(__event_emitter__, error_msg, True)
                        yield error_msg
                        return

                    async for raw_line in response.content:
                        line = raw_line.strip()
                        if not line:
                            # Blank keep-alive lines are not JSON; skip quietly.
                            continue
                        try:
                            data = json.loads(line)
                        except json.JSONDecodeError as e:
                            logging.error(
                                f"Failed to parse streaming response: {e}"
                            )
                            continue
                        if not isinstance(data, dict):
                            continue

                        # Ollama can report a failure mid-stream as an
                        # {"error": ...} object instead of a message chunk.
                        if "error" in data:
                            error_msg = f"Error: {data['error']}"
                            await self._emit_status(__event_emitter__, error_msg, True)
                            yield error_msg
                            return

                        # For streaming, Ollama sends the delta in the content field
                        message = data.get("message")
                        if isinstance(message, dict):
                            chunk = message.get("content")
                            if chunk:
                                yield chunk

                        # Check if this is the final message
                        if data.get("done", False):
                            await self._emit_status(
                                __event_emitter__, "Request completed", True
                            )
                            break

        except asyncio.TimeoutError:
            error_msg = "Request timed out"
            await self._emit_status(__event_emitter__, error_msg, True)
            yield error_msg
        except Exception as e:
            error_msg = f"Stream error: {str(e)}"
            await self._emit_status(__event_emitter__, error_msg, True)
            yield error_msg

    async def _send_request(self, url: str, payload: dict) -> dict:
        """
        Send a request to the DeepSeek API via Ollama with retry logic.

        HTTP 429 and connection/client errors are retried with exponential
        backoff (1s, 2s, ...) up to three attempts; other HTTP errors,
        timeouts and unparsable bodies are reported immediately.

        Args:
            url: The API endpoint URL
            payload: Request payload

        Returns:
            Response data as a dictionary; failures are reported as
            ``{"error": "<message>"}`` rather than raised.
        """
        base_delay = 1  # Start with 1 second delay
        max_retries = 3
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            backoff = base_delay * (2**attempt)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        url, json=payload, timeout=timeout
                    ) as response:
                        if response.status == 429:
                            if is_last_attempt:
                                # No point sleeping when nothing follows.
                                break
                            retry_after = self._parse_retry_after(
                                response.headers.get("retry-after"), backoff
                            )
                            logging.warning(
                                f"Rate limit hit. Retrying in {retry_after} seconds. Retry count: {attempt + 1}"
                            )
                            await asyncio.sleep(retry_after)
                            continue

                        response_text = await response.text()

                        if response.status != 200:
                            error_msg = f"HTTP {response.status}: {response_text}"
                            # Provide a more helpful error message for common issues
                            if response.status == 404:
                                error_msg += "\nPossible causes: Ollama API URL is incorrect or the model is not installed."
                            elif response.status in (500, 502, 503, 504):
                                error_msg += "\nPossible causes: Ollama server is overloaded or experiencing issues."
                            return {"error": error_msg}

                        try:
                            data = json.loads(response_text)
                        except json.JSONDecodeError as e:
                            return {"error": f"Invalid JSON response from Ollama: {e}"}
                        if not isinstance(data, dict):
                            return {"error": "Unexpected response format from Ollama"}
                        return data

            except aiohttp.ClientConnectorError as e:
                error_msg = f"Connection error: {str(e)}"
                error_msg += (
                    f"\nCannot connect to Ollama at {self.valves.OLLAMA_API_URL}."
                )
                error_msg += "\nIf you're running in Docker, make sure Ollama is accessible from the container."
                error_msg += "\nTry setting OLLAMA_API_URL to 'http://host.docker.internal:11434' or the correct host address."

                # Only retry if it's not the last attempt
                if not is_last_attempt:
                    logging.warning(
                        f"{error_msg} Retrying in {backoff} seconds..."
                    )
                    await asyncio.sleep(backoff)
                    continue
                return {"error": error_msg}

            except asyncio.TimeoutError:
                # Listed before ClientError so that timeouts are never
                # retried: each attempt may already have waited
                # REQUEST_TIMEOUT seconds.
                logging.error("Request timed out")
                return {
                    "error": f"Request timed out after {self.REQUEST_TIMEOUT} seconds"
                }

            except aiohttp.ClientError as e:
                logging.error(f"Request failed: {str(e)}")
                if not is_last_attempt:
                    await asyncio.sleep(backoff)
                    continue
                return {"error": str(e)}

        logging.error("Max retries exceeded.")
        return {"error": "Max retries exceeded when connecting to Ollama"}