We're Hiring!
Whitepaper
Docs
Sign In
@rburmorrison
·
10 months ago
·
a year ago
function
OpenRouter Reasoning Token Models
Get
Last Updated
10 months ago
Created
a year ago
Function
pipe
v0.3
Name
OpenRouter Reasoning Token Models
Downloads
84+
Saves
0+
Description
Support for OpenRouter reasoning token models, including reasoning block.
Function Code
Show
"""
title: OpenRouter Reasoning Token Models
author: rburmorrison
author_url: https://github.com/rburmorrison
version: 0.3
"""

from pydantic import BaseModel, Field
import json
import httpx


class Pipe:
    """Pipe that forwards chat requests to OpenRouter with reasoning enabled.

    Streamed reasoning tokens are wrapped in a ``<think>`` ... ``</think>``
    block and emitted ahead of the regular answer content.
    """

    # User-configurable settings; the API key defaults to an empty string,
    # which `pipe` treats as "not configured".
    class Valves(BaseModel):
        OPENROUTER_API_KEY: str = Field(default="", description="OpenRouter API Key")

    def __init__(self):
        self.valves = self.Valves()

    def pipes(self):
        """Return the list of models (id and display name) this pipe exposes."""
        return [
            {"id": "deepseek/deepseek-r1:free", "name": "DeepSeek R1 (Free)"},
            {"id": "deepseek/deepseek-r1", "name": "DeepSeek R1 (Standard)"},
            {"id": "deepseek/deepseek-r1:nitro", "name": "DeepSeek R1 (Nitro)"},
            {
                "id": "deepseek/deepseek-r1-distill-qwen-14b",
                "name": "DeepSeek R1 (Distill Qwen 14b)",
            },
            {
                "id": "deepseek/deepseek-r1-distill-qwen-32b",
                "name": "DeepSeek R1 (Distill Qwen 32b)",
            },
            {
                "id": "deepseek/deepseek-r1-distill-llama-70b",
                "name": "DeepSeek R1 (Distill Llama 70b)",
            },
        ]

    async def pipe(self, body: dict, __user__: dict):
        """Send ``body`` to OpenRouter and yield the response.

        Args:
            body: Chat-completions request. ``body["model"]`` is required;
                ``body["stream"]`` selects streaming (default ``False``).
            __user__: Accepted for interface compatibility; not used here.

        Yields:
            When streaming: text chunks, with reasoning tokens wrapped in a
            ``<think>`` block. When not streaming: the decoded JSON response
            as a single item. On any failure, a single ``"Error: ..."`` string
            is yielded instead of raising.
        """
        print(f"pipe:{__name__}")

        if not self.valves.OPENROUTER_API_KEY:
            yield "Error: API key not set."
            return

        headers = {
            "Authorization": f"Bearer {self.valves.OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://openwebui.com/",
            "X-Title": "Open WebUI",
        }

        # Drop everything up to and including the first "." from the model
        # name (presumably a prefix added by the host application -- confirm).
        # A name without a "." is used unchanged.
        model_id = body["model"].split(".", 1)[-1]

        # True while we are inside an open <think> block.
        reasoning = False

        # Forward the request with the bare model id and ask for reasoning.
        payload = {**body, "model": model_id, "include_reasoning": True}

        try:
            async with httpx.AsyncClient() as client:
                async with client.stream(
                    "POST",
                    "https://openrouter.ai/api/v1/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=30,
                ) as response:
                    response.raise_for_status()

                    if not body.get("stream", False):
                        # The request was opened in streaming mode, so the
                        # body must be read explicitly before it is decoded.
                        await response.aread()
                        yield response.json()
                        return

                    async for line in response.aiter_lines():
                        # Only "data: " lines carry payloads; skip the rest.
                        if not line.startswith("data: "):
                            continue

                        raw = line[6:].strip()
                        # End-of-stream sentinel; it is not JSON.
                        if raw == "[DONE]":
                            break

                        data = json.loads(raw)
                        # Tolerate a missing or empty "choices" list.
                        choice = (data.get("choices") or [{}])[0]
                        delta = choice.get("delta") or {}
                        thought = delta.get("reasoning")

                        if thought and not reasoning:
                            reasoning = True
                            yield "<think>"
                            yield "\n"
                        elif reasoning and not thought:
                            reasoning = False
                            yield "\n</think>\n\n"

                        content = thought or delta.get("content", "")
                        if content:
                            yield content

                        # Check after emitting so text carried by the final
                        # chunk is not lost.
                        if choice.get("finish_reason"):
                            break

                    # Never leave a dangling <think> if the stream ended
                    # while reasoning was still in progress.
                    if reasoning:
                        yield "\n</think>\n\n"
        except Exception as e:
            yield f"Error: {e}"
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with intelligence.