Whitepaper
Docs
Sign In
Function
Function
pipe
v0.0.5 (17.02.2025)
Mistral API
Function ID
Mistral
Creator
@peuportier
Downloads
143+
Mistral API pipe that works with Open WebUI RAG and also supports Pixtral image input
Get
README
No README available
Function Code
Show
"""
title: Mistral Manifold Pipe
author: Peuportier Aurelien (adapted from the Anthropic pipe by Jari Hiltunen /
        original function by justinh-rahb)
author_url: https://github.com/justinh-rahb
version: 0.0.5 (17.02.2025)
required_open_webui_version: 0.5.xx
license: MIT

Note: image *generation* is not implemented yet (help welcome); Pixtral
image input via chat messages is handled by the regular chat endpoint.
"""

import json
import os
import time
from typing import Dict, Generator, List, Union

import requests
from pydantic import BaseModel, Field


class Pipe:
    """Open WebUI manifold pipe for the Mistral chat-completions API."""

    class Valves(BaseModel):
        """Configuration for Mistral API."""

        MISTRAL_API_BASE_URL: str = Field(default="https://api.mistral.ai/v1")
        MISTRAL_API_KEY: str = Field(default="")

    def __init__(self):
        # Debug switches for the three chatty areas of this pipe.
        self.debug_models = False
        self.debug_stream = True
        self.debug_errors = True

        # Manifold identity; Open WebUI prefixes model ids with "<id>.".
        self.type = "manifold"
        self.id = "mistral"
        self.name = "mistral/"

        # European server
        self.server = "https://api.mistral.ai"
        self.models_url = self.server + "/v1/models"
        self.chat_url = self.server + "/v1/chat/completions"

        # Default sampling parameters (currently not attached to requests).
        self.temperature = 0.7
        self.top_p = 0.9
        self.max_tokens = 4096

        api_key = os.getenv("MISTRAL_API_KEY", "").strip()
        self.valves = self.Valves(MISTRAL_API_KEY=api_key)

        # Rate-limit bookkeeping (the open tier is ~100 requests per hour).
        self.last_request_time: float = 0.0
        self.rate_limit_reset: float = 0.0
        self.rate_limit_interval: float = 30.0  # seconds between requests

        # Cached model list, populated by get_mistral_models().
        # BUGFIX: was initialized to "" (a str); a failed fetch then iterated
        # over the string and crashed on model["capabilities"].
        self.models: List[dict] = []

        # Image handling — not yet implemented.
        self.MAX_IMAGE_SIZE = 5 * 1024 * 1024  # 5MB per image
        self.image_url = ""

    def _debug(self, message: str):
        """Prints debug messages if DEBUG is enabled."""
        if self.debug_errors:
            print(message)

    def _get_headers(self) -> Dict[str, str]:
        """Returns the headers for API requests.

        Raises:
            ValueError: if no API key is configured.
        """
        if not self.valves.MISTRAL_API_KEY:
            raise ValueError("MISTRAL_API_KEY is missing or invalid.")
        return {
            "Authorization": f"Bearer {self.valves.MISTRAL_API_KEY}",
            "Content-Type": "application/json",
        }

    def get_mistral_models(self) -> List[Dict[str, Union[str, int, bool]]]:
        """Fetches available Mistral models, filters, and returns unique models.

        Only models with the ``completion_chat`` capability are kept; for each
        base id the "latest" variant wins. Returns an empty list when the
        models endpoint cannot be reached.
        """
        try:
            response = requests.get(self.models_url, headers=self._get_headers())
            response.raise_for_status()
            self.models = response.json()["data"]
        except requests.exceptions.RequestException as e:
            self._debug(f"API call failed: {e}")
            # BUGFIX: previously fell through and iterated stale/empty data.
            return []

        # Map base-id -> model, preferring entries tagged "latest".
        model_map: Dict[str, dict] = {}
        for model in self.models:
            if not model["capabilities"].get("completion_chat", False):
                continue
            # Strip the trailing version segment to group variants together.
            base_id = "-".join(model["id"].split("-")[:-1])
            is_latest = "latest" in model["id"] or "latest" in model["aliases"]
            if base_id not in model_map or is_latest:
                model_map[base_id] = model

        unique_models = [
            {
                "id": model["id"],
                "name": model["name"],
                "capabilities": model["capabilities"],
                "description": model["description"],
                "max_context_length": model["max_context_length"],
                "aliases": model["aliases"],
                "deprecation": model["deprecation"],
                "default_model_temperature": model["default_model_temperature"],
                "type": model["type"],
            }
            for model in model_map.values()
        ]

        if self.debug_models:
            print("Unique Models:")
            for model in unique_models:
                print(f"ID: {model['id']}")
                print(f"Name: {model['name']}")
                print(f"Capabilities: {model['capabilities']}")
                print(f"Description: {model['description']}")
                print(f"Max Context Length: {model['max_context_length']}")
                print(f"Aliases: {model['aliases']}")
                print(f"Deprecation: {model['deprecation']}")
                print(
                    f"Default Model Temperature: {model['default_model_temperature']}"
                )
                print(f"Type: {model['type']}")
                print("-" * 40)

        return unique_models

    def pipes(self) -> List[dict]:
        """Returns a list of available models."""
        return self.get_mistral_models()

    def pipe(self, body: dict) -> Union[str, Generator[str, None, None]]:
        """Handles a single request to the pipe.

        Dispatches to streaming or one-shot completion depending on
        ``body["stream"]``. Returns an error string (never raises) so
        Open WebUI can surface the message to the user.
        """
        try:
            # Open WebUI prefixes the model id with "<pipe id>."
            model = body["model"].removeprefix("mistral.")
            messages = body["messages"]
            stream = body.get("stream", False)

            if self.debug_stream:
                self._debug("Incoming body:")
                self._debug(json.dumps(body, indent=2))

            if stream:
                return self.stream_response(model, messages)
            return self.get_completion(model, messages)
        except KeyError as e:
            error_msg = f"Missing required key in body: {e}"
            self._debug(error_msg)
            return f"Error: {error_msg}"
        except Exception as e:
            self._debug(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def stream_response(
        self, model: str, messages: List[dict], retries: int = 5
    ) -> Generator[str, None, None]:
        """Streams a response from the Mistral API, handling rate limits.

        Yields content deltas from the SSE stream; on HTTP 429 retries with
        exponential backoff, otherwise yields a single "Error: ..." chunk.
        """
        payload = {"model": model, "messages": messages, "stream": True}
        self._debug(f"Streaming response from {self.chat_url}")
        self._debug(f"Payload: {json.dumps(payload, indent=2)}")

        for attempt in range(retries):
            response = None  # BUGFIX: avoid UnboundLocalError on connect errors
            try:
                response = requests.post(
                    self.chat_url,
                    json=payload,
                    headers=self._get_headers(),
                    stream=True,
                )
                response.raise_for_status()

                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    # SSE frames look like "data: {...}"; lstrip() stripped a
                    # character *set*, removeprefix() strips the literal prefix.
                    data = raw_line.decode("utf-8").removeprefix("data: ")
                    if data == "[DONE]":  # end-of-stream sentinel
                        return
                    try:
                        event = json.loads(data)
                    except json.JSONDecodeError:
                        self._debug(f"Failed to decode stream line: {raw_line}")
                        continue
                    self._debug(f"Received stream event: {event}")
                    choice = event.get("choices", [{}])[0]
                    delta_content = choice.get("delta", {}).get("content")
                    if delta_content:
                        yield delta_content
                    if choice.get("finish_reason") == "stop":
                        return
                return  # Exit after successful streaming
            except requests.RequestException as e:
                status = response.status_code if response is not None else None
                if status == 429 and attempt < retries - 1:
                    wait_time = 2**attempt
                    self._debug(
                        f"Rate limited (429). Retrying after {wait_time} seconds..."
                    )
                    time.sleep(wait_time)
                else:
                    self._debug(f"Stream request failed: {e}")
                    yield f"Error: {str(e)}"
                    return  # BUGFIX: don't keep retrying after reporting an error

    def get_completion(
        self, model: str, messages: List[dict], retries: int = 3
    ) -> str:
        """Fetches a single completion response, handling rate limits.

        Returns the assistant message content, or an "Error: ..." string.
        """
        payload = {"model": model, "messages": messages}

        for attempt in range(retries):
            response = None  # BUGFIX: avoid UnboundLocalError on connect errors
            try:
                self._debug(
                    f"Attempt {attempt + 1}: Sending completion request to {self.chat_url}"
                )
                response = requests.post(
                    self.chat_url, json=payload, headers=self._get_headers()
                )
                # BUGFIX: the original called self._handle_response(), which is
                # not defined anywhere in this module (AttributeError).
                response.raise_for_status()
                data = response.json()
                return data["choices"][0]["message"]["content"]
            except requests.RequestException as e:
                status = response.status_code if response is not None else None
                if status == 429 and attempt < retries - 1:
                    wait_time = 2**attempt
                    self._debug(
                        f"Rate limited (429). Retrying after {wait_time} seconds..."
                    )
                    time.sleep(wait_time)
                else:
                    self._debug(f"Completion request failed: {e}")
                    return f"Error: {str(e)}"
        return "Error: retries exhausted"