Whitepaper
Docs
Sign In
Function
Function
pipe
v0.3
AI Router Model Routing
Function ID
ai_router_model_routing
Creator
@airouterio
Downloads
22+
Use airouter.io to automatically use the best model for every request. Save costs and enable easy access to a carefully curated model mix.
Get
README
Function Code
Show
""" title: Model Router author: airouter.io | Matthias Lau author_url: https://airouter.io version: 0.3 """ from pydantic import BaseModel, Field from typing import Callable, Any, Awaitable, Iterator, Dict, List, Union, Optional import requests import json import asyncio import logging # Set up logging logger = logging.getLogger(__name__) class Pipe: """ Main pipe class for routing AI model requests through airouter.io. """ BASE_URL = "https://api.airouter.io/v1" class Valves(BaseModel): """Configuration settings for the Model Router pipe.""" AIROUTER_API_KEY: str = Field( default="", description="API key for authenticating requests to the AI Router API.", ) NAME_PREFIX: str = Field( default="", description="The optional prefix applied before the model names.", ) ENABLE_MODEL_GATEWAY: bool = Field( default=False, description="Enable model gateway to also show all supported models.", ) SHOW_MODEL_USED: bool = Field( default=True, description="Show the model that was used by the model routing.", ) def __init__(self): """Initialize the pipe with default valves.""" self.valves = self.Valves() def pipes(self) -> List[Dict[str, str]]: """ Fetch available AI models from the router API. Returns: List of model dictionaries containing id and name. """ # always add the auto mode auto_model = [ { "id": "auto", "name": f"{self.valves.NAME_PREFIX}auto", } ] model_gateway_models = [] # add all supported models if the gateway mode is activated if self.valves.ENABLE_MODEL_GATEWAY and self.valves.AIROUTER_API_KEY: try: headers = { "Authorization": f"Bearer {self.valves.AIROUTER_API_KEY}", "Content-Type": "application/json", } with requests.get(f"{self.BASE_URL}/models", headers=headers) as r: r.raise_for_status() models = r.json() model_gateway_models = sorted( [ { "id": model["id"], "name": f'{self.valves.NAME_PREFIX}{model.get("name", model["id"])}', } for model in models["data"] ], key=lambda x: x["name"], ) except requests.RequestException as e: logger.error(f"Error fetching models: {e}") except ValueError as e: logger.error(f"Error parsing models response: {e}") return auto_model + model_gateway_models async def pipe( self, body: Dict[str, Any], __event_emitter__: Callable[[Any], Awaitable[None]], __metadata__, ): """ Process an AI request through the router. Args: body: The request body containing the model and prompt __event_emitter__: Callback for emitting status events Returns: Either a generator for streaming responses or a JSON result """ headers = { "Authorization": f"Bearer {self.valves.AIROUTER_API_KEY}", "Content-Type": "application/json", } model_id = ( body["model"].split(".", 1)[1] if "." in body["model"] else body["model"] ) payload = {**body, "model": model_id, "model_routing": True} # limit models to the default model for gateway mode gateway_mode = False if model_id != "auto": gateway_mode = True payload["models"] = [model_id] show_models = self.valves.SHOW_MODEL_USED and not gateway_mode # deactivate the model debugging for auto-generation tasks if "task" in __metadata__ and "_generation" in __metadata__["task"]: show_models = False try: r = requests.post( url=f"{self.BASE_URL}/chat/completions", json=payload, headers=headers, stream=True, ) r.raise_for_status() if body.get("stream", False): return ModelTrackingGenerator(r, __event_emitter__, show_models) else: r_json = r.json() if show_models: asyncio.create_task( __event_emitter__( { "type": "status", "data": { "description": f"Used {r_json['model']}", "done": True, }, } ) ) return r_json except requests.RequestException as e: logger.error(f"Request error: {e}") # Return error in format matching expected output await __event_emitter__( { "type": "status", "data": { "description": f"Error: {str(e)}", "done": True, "error": True, }, } ) return {"error": str(e)} except Exception as e: logger.error(f"Unexpected error: {e}") await __event_emitter__( { "type": "status", "data": { "description": f"Error: {str(e)}", "done": True, "error": True, }, } ) return {"error": str(e)} class ModelTrackingGenerator: """ A wrapper around response.iter_lines() that extracts and logs the model name from the first applicable chunk, while maintaining all original generator behavior. """ def __init__(self, response, event_emitter, show_model_used=True): """ Initialize the generator with a response object and event emitter. Args: response: The HTTP response object with streaming content event_emitter: Callback for emitting status events """ self.response = response self.event_emitter = event_emitter self.model = None self.model_logged = False self.show_model_used = show_model_used # Store the original iterator self.iterator = response.iter_lines() def __iter__(self) -> Iterator: """Return self as an iterator""" return self def __next__(self) -> bytes: """ Get the next line and check for model info if needed. Returns: The next chunk from the response Raises: StopIteration: When the response is complete """ try: # Get the next line from the stored iterator line = next(self.iterator) # Try to extract model if not already logged if not self.model_logged and line.startswith(b"data: "): try: data = json.loads(line[6:].decode("utf-8")) if "model" in data: self.model = data["model"] if self.show_model_used: asyncio.create_task( self.event_emitter( { "type": "status", "data": { "description": f"Using {self.model}", "done": False, }, } ) ) self.model_logged = True except json.JSONDecodeError: pass except Exception as e: logger.error(f"Error extracting model info: {e}") return line except StopIteration: if self.show_model_used: # Send completion event if we found a model if self.model: asyncio.create_task( self.event_emitter( { "type": "status", "data": { "description": f"Used {self.model}", "done": True, }, } ) ) else: # Send generic completion event if no model was found asyncio.create_task( self.event_emitter( { "type": "status", "data": { "description": "Request completed", "done": True, }, } ) ) # Close the response when done to free resources self.response.close() # Make sure we properly propagate StopIteration raise