NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance ✨

Function
action
v1.0
Reflection Manifold Pipe
Any model can now reflect on it's answer
Function ID
reflection_manifold_pipe
Creator
@floman
Downloads
628+

Function Content
python
"""
title: Reflection Manifold Pipe
author: flomanxl
version: 1.0
required_open_webui_version: 0.3.9
"""

from pydantic import BaseModel, Field
from typing import Optional, Callable, Awaitable
import aiohttp

class Action:
    class Valves(BaseModel):
        model: str = Field(
            default="gemma2-27b", description="Model to use for reflection process."
        )
        api_base: str = Field(
            default="http://host.docker.internal:11434/v1", 
            description="Base URL for the model API."
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )

    def __init__(self):
        self.valves = self.Valves()

    async def action(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> Optional[dict]:
        await self.emit_status(
            __event_emitter__, "info", "Starting reflection process", False
        )

        messages = body.get("messages", [])
        if not messages:
            error_msg = "No messages found in the request body"
            await self.emit_status(__event_emitter__, "error", error_msg, True)
            return {"error": error_msg}

        initial_response = await self.process_thinking(messages[-1]["content"], __event_emitter__)

        reflection_response = await self.process_reflection(initial_response, __event_emitter__)

        final_response = await self.process_output(reflection_response, __event_emitter__)

        body["messages"].append({"role": "assistant", "content": final_response})
        await self.emit_status(
            __event_emitter__, "info", "Reflection process completed", True
        )
        return body

    async def process_thinking(
        self, prompt: str, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> str:
        await self.emit_status(__event_emitter__, "info", "Generating initial thinking", False)
        response = await self.query_model(f"{prompt}", __event_emitter__)
        return response

    async def process_reflection(
        self, thinking_response: str, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> str:
        await self.emit_status(__event_emitter__, "info", "Checking for reflection", False)
        reflection_prompt = f"{thinking_response}"
        reflection_response = await self.query_model(reflection_prompt, __event_emitter__)
        return reflection_response

    async def process_output(
        self, reflection_response: str, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> str:
        await self.emit_status(__event_emitter__, "info", "Generating final output", False)
        final_output = await self.query_model(f"{reflection_response}", __event_emitter__)
        return final_output

    async def query_model(
        self, prompt: str, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> str:
        url = f"{self.valves.api_base}/chat/completions"
        headers = {"Content-Type": "application/json"}
        data = {"model": self.valves.model, "messages": [{"role": "user", "content": prompt}]}

        try:
            await self.emit_status(__event_emitter__, "info", f"Querying model: {self.valves.model}", False)
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=data) as response:
                    if response.status != 200:
                        return f"Error: {response.status}"
                    result = await response.json()
                    return result["choices"][0]["message"]["content"]
        except Exception as e:
            return f"Error: {str(e)}"

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        if __event_emitter__ and self.valves.enable_status_indicator:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )

    async def on_start(self):
        print("Reflection Process Action for Gemma2 27B started")

    async def on_stop(self):
        print("Reflection Process Action for Gemma2 27B stopped")