We're Hiring!
Whitepaper
Docs
Sign In
@mariepop13
·
10 months ago
·
10 months ago
function
OpenRouter Reasoning with smart selection of model
Get
Last Updated
10 months ago
Created
10 months ago
Function
pipe
v0.3
Name
OpenRouter Reasoning with smart selection of model
Downloads
81+
Saves
0+
Description
This pipe allows you to use the selected model with reasoning.
Function Code
Show
""" title: OpenRouter Reasoning with smart selection of model author: mariepop13 author_url: https://mariepop13.com/ description: This pipe enables the utilization of the selected model for reasoning tasks. Update: The model error has been resolved! All reasoning models are prefixed with "Reasoning - ". Please ensure that your OPENROUTER_API_KEY is included in the valves. Awaiting OpenRouter's comprehensive list of supported reasoning models. Stay tuned! version: 0.3 """ import os import requests import json import time import logging from typing import List, Union, Generator, Iterator, Optional from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message # Constants OPENROUTER_REASONING_PREFIX = "openrouter_reasoning_with_smart_selection_of_model." logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) class ModelInfo(BaseModel): id: str name: str description: Optional[str] = None context_length: Optional[int] = None pricing: Optional[dict] = None class Tools: def __init__(self): pass class Pipe: class Valves(BaseModel): OPENROUTER_API_KEY: str = Field(default="") def __init__(self): self.type = "manifold" self.id = "openrouter" self.valves = self.Valves( **{ "OPENROUTER_API_KEY": os.getenv("OPENROUTER_API_KEY", ""), } ) self._models_cache = None self._last_fetch_time = 0 self._cache_duration = 300 logger.debug("Pipe instance initialized") def _get_headers(self): headers = { "Authorization": f"Bearer {self.valves.OPENROUTER_API_KEY}", "HTTP-Referer": "", "X-Title": "Open WebUI", "Content-Type": "application/json", } logger.debug("Headers generated") return headers def fetch_openrouter_models(self) -> List[dict]: logger.debug("Starting fetch_openrouter_models method") current_time = time.time() if ( self._models_cache is not None and (current_time - self._last_fetch_time) < self._cache_duration ): logger.debug("Returning cached models") return 
self._models_cache try: logger.debug("Fetching models from OpenRouter API") response = requests.get( "https://openrouter.ai/api/v1/models", headers=self._get_headers(), timeout=10, ) if response.status_code != 200: raise Exception( f"Failed to fetch models: {response.status_code} - {response.text}" ) models_data = response.json() processed_models = [] for model in models_data.get("data", []): if model.get("id", "").startswith( ( "deepseek/deepseek-r1", "perplexity/sonar-reasoningAionLabs", "aion-labs/aion-1.0", "aion-labs/aion-1.0-mini", "cognitivecomputations/dolphin3.0-r1-mistral-24b:free", ) ): model_info = { "id": model.get("id", "").replace( OPENROUTER_REASONING_PREFIX, "" ), "name": f"Reasoning - {model.get('name', '').split('/')[-1]}", "description": model.get("description", ""), "context_length": model.get("context_length"), "pricing": { "prompt": model.get("pricing", {}).get("prompt"), "completion": model.get("pricing", {}).get("completion"), }, } processed_models.append(model_info) logger.debug(f"Found {len(processed_models)} DeepSeek models") self._models_cache = processed_models self._last_fetch_time = current_time return processed_models except Exception as e: logger.error(f"Error fetching models: {e}") if self._models_cache is not None: return self._models_cache return [ { "id": "deepseek/deepseek-r1:free", "name": "Reasoning DeepSeek R1 (Free)", }, { "id": "deepseek/deepseek-r1", "name": "Reasoning DeepSeek R1 (Standard)", }, { "id": "deepseek/deepseek-r1:nitro", "name": "Reasoning DeepSeek R1 (Nitro)", }, ] def pipes(self) -> List[dict]: return self.fetch_openrouter_models() def pipe(self, body: dict) -> Union[str, Generator, Iterator]: logger.debug(f"pipe method called with body: {body}") body["model"] = body["model"].replace(OPENROUTER_REASONING_PREFIX, "") system_message, messages = pop_system_message(body["messages"]) if system_message: messages.insert(0, {"role": "system", "content": str(system_message)}) payload = { "model": body["model"], 
"messages": messages, "max_tokens": body.get("max_tokens", 4096), "temperature": body.get("temperature", 0.8), "top_p": body.get("top_p", 0.9), "stop": body.get("stop", []), "stream": body.get("stream", False), "include_reasoning": True, } logger.debug(f"Payload prepared: {payload}") url = "https://openrouter.ai/api/v1/chat/completions" try: if body.get("stream", False): return self.stream_response(url, self._get_headers(), payload) else: return self.non_stream_response(url, self._get_headers(), payload) except requests.exceptions.RequestException as e: logger.error(f"Request failed: {e}") return f"Error: Request failed: {e}" except Exception as e: logger.error(f"Error in pipe method: {e}") return f"Error: {e}" def stream_response(self, url, headers, payload): reasoning = False try: with requests.post( url, headers=headers, json=payload, stream=True, timeout=(3.05, 60) ) as response: if response.status_code != 200: raise Exception( f"HTTP Error {response.status_code}: {response.text}" ) for line in response.iter_lines(): if line: line = line.decode("utf-8") if line.startswith("data: "): try: data = json.loads(line[6:]) if "choices" in data and len(data["choices"]) > 0: choice = data["choices"][0] delta = choice.get("delta", {}) if not reasoning and delta.get("reasoning"): reasoning = True logger.debug("Reasoning started") yield "<think>\n" elif reasoning and not delta.get("reasoning"): reasoning = False logger.debug("Reasoning ended") yield "\n</think>\n\n" content = delta.get("reasoning", "") or delta.get( "content", "" ) if content: yield content time.sleep(0.01) except json.JSONDecodeError: logger.error(f"Failed to parse JSON: {line}") except KeyError as e: logger.error(f"Unexpected data structure: {e}") except Exception as e: logger.error(f"Error in stream_response: {e}") yield f"Error: {e}" def non_stream_response(self, url, headers, payload): try: response = requests.post( url, headers=headers, json=payload, timeout=(3.05, 60) ) if response.status_code != 
200: raise Exception(f"HTTP Error {response.status_code}: {response.text}") res = response.json() return ( res["choices"][0]["message"]["content"] if "choices" in res and res["choices"] else "" ) except Exception as e: logger.error(f"Error in non_stream_response: {e}") return f"Error: {e}"
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with intelligence.