NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance. Expected completion by year-end! ✨

Function
pipe
v1.0.5
Semantic Model Router
Used to allow small LLMs to to make intention decisions and then dynamically switch between larger LLMs for different tasks. Also allows override based on document types. For example in the even of an image, it will use llava automatically. I will be adding user valves in a future revision to allow easier swapping of models via the UI.
Function ID
semantic_model_router
Creator
@atgehrhardt
Downloads
1K+

Function Content
python
"""
title: Semantic Model Router Manifold
author: Andrew Tait Gehrhardt
author_url: https://github.com/atgehrhardt
funding_url: https://github.com/open-webui
version: 1.0.5
"""

import json
from pydantic import BaseModel, Field
from typing import (
    Union,
    Generator,
    Iterator,
    List,
    Dict,
    Optional,
    Callable,
    Awaitable,
    Any,
)
from utils.misc import get_last_user_message
from apps.webui.models.users import Users
from main import generate_chat_completions
import requests


class Pipe:
    class Valves(BaseModel):
        ROUTER_ID: str = Field(
            default="semantic-router",
            description="Identifier for the semantic router model.",
        )
        ROUTER_NAME: str = Field(
            default="Semantic Router", description="Name for the semantic router model."
        )
        OPENAI_BASE_URL: str = Field(
            default="http://host.docker.internal:11434/v1",
            description="OpenAI Compatible URL for local models",
        )
        INTENTION_MODEL: str = Field(
            default="gemma:2b-instruct-v1.1-q4_K_M",
            description="Model for determining intention.",
        )
        ENABLE_EMITTERS: bool = Field(
            default=True,
            description="Toggle to enable or disable event emitters.",
        )
        INTENTIONS: Dict[str, Dict[str, str]] = Field(
            default={
                "chatting": {
                    "description": "The user is simply chatting with you or no other intentions match the user request.",
                    "model": "llama3:instruct",
                },
                "media": {
                    "description": "The user is specifically asking about an image or a video.",
                    "model": "llava:latest",
                },
                "code": {
                    "description": "The user is asking about or requesting help with code.",
                    "model": "anthropic.claude-3-5-sonnet-20240620",
                },
            },
            description="Mapping of intentions to their descriptions and associated models.",
        )

    def __init__(self):
        self.type = "manifold"
        self.valves = self.Valves()

    def pipes(self):
        return [{"name": self.valves.ROUTER_NAME, "id": self.valves.ROUTER_ID}]

    async def determine_intention(
        self,
        query: str,
        __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None,
    ) -> str:
        # Add initial event emitter
        if self.valves.ENABLE_EMITTERS and __event_emitter__ is not None:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": "Determining Intention and Routing Request to appropriate model",
                        "done": False,
                    },
                }
            )

        intentions_prompt = {
            k: v["description"] for k, v in self.valves.INTENTIONS.items()
        }

        url = f"{self.valves.OPENAI_BASE_URL}/chat/completions"
        payload = {
            "model": self.valves.INTENTION_MODEL,
            "messages": [
                {
                    "role": "user",
                    "content": f"""
                    
                    You are an intention measurement machine. You will consider the USER_QUERY carefully and decide what their intention is.
                    You may select one of the following intentions from the object below:
    
                    {intentions_prompt}
    
                    user
                    USER_QUERY: 
                    {query}
                    
                    
                    YOU WILL RESPOND WITH ONLY A SINGLE WORD - The intention key value
                    model 
                    """,
                }
            ],
        }
        try:
            response = requests.post(url, json=payload)
            response_data = response.json()
            intention_text = (
                response_data["choices"][0]["message"]["content"].strip().lower()
            )

            print(f"Raw intention text: {intention_text}")

            # Emit message about the raw intention text immediately
            if self.valves.ENABLE_EMITTERS and __event_emitter__ is not None:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "description": f"Raw intention determined: {intention_text}",
                            "done": False,
                        },
                    }
                )

            # Determine the final intention
            final_intention = "chatting"  # Default intention
            for intention in self.valves.INTENTIONS.keys():
                if (
                    intention == intention_text
                    or intention in intention_text
                    or intention_text in intention
                ):
                    final_intention = intention
                    break

            # Determine the corresponding model
            model_id = self.valves.INTENTIONS[final_intention]["model"]

            # Emit message about which model the query is routed to
            if self.valves.ENABLE_EMITTERS and __event_emitter__ is not None:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "description": f"Query routed to model: {model_id}",
                            "done": False,
                        },
                    }
                )

            return final_intention

        except Exception as e:
            print(f"Error in determine_intention: {e}")
            if self.valves.ENABLE_EMITTERS and __event_emitter__ is not None:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "description": f"Error determining intention: {str(e)}. Defaulting to chatting.",
                            "done": True,
                        },
                    }
                )
            return "chatting"  # Default to chatting in case of error

    async def route_query(
        self,
        query: str,
        messages: List[Dict],
        user: Dict,
        __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None,
    ) -> Dict[str, Any]:
        try:
            intention = await self.determine_intention(query, __event_emitter__)
            model_id = self.valves.INTENTIONS.get(
                intention, self.valves.INTENTIONS["chatting"]
            )["model"]

            payload = {
                "model": model_id,
                "messages": messages,
                "stream": False,
            }

            print(f"Routing query to model: {model_id}")
            response = await generate_chat_completions(form_data=payload, user=user)

            print(
                f"Raw response from generate_chat_completions: {json.dumps(response, indent=2)}"
            )

            # Ensure the response is in the correct format
            if isinstance(response, dict) and "choices" in response:
                formatted_response = {
                    "choices": [
                        {
                            "message": {
                                "content": response["choices"][0]["message"]["content"],
                                "role": "assistant",
                            },
                            "finish_reason": "stop",
                            "index": 0,
                        }
                    ]
                }
            else:
                formatted_response = {
                    "choices": [
                        {
                            "message": {
                                "content": str(response),
                                "role": "assistant",
                            },
                            "finish_reason": "stop",
                            "index": 0,
                        }
                    ]
                }

            print(f"Formatted response: {json.dumps(formatted_response, indent=2)}")

            # Emit message that the model has finished processing the request
            if self.valves.ENABLE_EMITTERS and __event_emitter__ is not None:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "description": f"Model {model_id} has finished processing the request",
                            "done": True,
                        },
                    }
                )

            return formatted_response
        except Exception as e:
            print(f"Error in route_query: {str(e)}")
            if self.valves.ENABLE_EMITTERS and __event_emitter__ is not None:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "description": f"Error occurred: {str(e)}",
                            "done": True,
                        },
                    }
                )
            return {
                "choices": [
                    {
                        "message": {
                            "content": f"An error occurred: {str(e)}",
                            "role": "assistant",
                        },
                        "finish_reason": "stop",
                        "index": 0,
                    }
                ]
            }

    async def pipe(
        self,
        body: dict,
        __user__: dict,
        __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None,
    ) -> str:
        print(f"pipe:{__name__}")

        messages = body["messages"]
        user_message = get_last_user_message(messages)

        user = Users.get_user_by_id(__user__["id"])

        response = await self.route_query(
            user_message, messages, user, __event_emitter__
        )

        # Extract only the content from the response
        if (
            isinstance(response, dict)
            and "choices" in response
            and len(response["choices"]) > 0
        ):
            content = response["choices"][0]["message"]["content"]
        else:
            content = "Error: Unexpected response format"

        print(f"Final content from pipe: {content}")

        return content  # Return only the content as a string