NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance ✨

Function
action
v1.2
Reflection Manifold Pipe Updated
Any model can now reflect on it's answer
Function ID
reflection_manifold_pipe_updated
Creator
@andreyrgw
Downloads
891+

Function Content
python
"""
title: Reflection Manifold Pipe Updated
author: flomanxl | AndreyRGW
version: 1.2
required_open_webui_version: 0.3.21
"""

from pydantic import BaseModel, Field
from typing import Optional, Callable, Awaitable
import aiohttp


class Action:
    class Valves(BaseModel):
        model: str = Field(
            default="magnum", description="Model to use for reflection process."
        )
        api_base: str = Field(
            default="http://localhost:11434/v1",
            description="Base URL for the model API.",
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )

    def __init__(self):
        self.valves = self.Valves()

    async def action(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> Optional[dict]:
        await self.emit_status(
            __event_emitter__, "info", "Starting reflection process", False
        )

        messages = body.get("messages", [])
        if not messages:
            error_msg = "No messages found in the request body"
            await self.emit_status(__event_emitter__, "error", error_msg, True)
            return {"error": error_msg}

        initial_response = await self.process_thinking(
            messages[-1]["content"], __event_emitter__
        )
        print(f"Initial response: {initial_response}")

        reflection_response = await self.process_reflection(
            initial_response, __event_emitter__
        )
        print(f"Reflection response: {reflection_response}")

        final_response = await self.process_output(
            reflection_response, __event_emitter__
        )
        print(f"Final response: {final_response}")

        # Check that final_response is not empty
        if not final_response:
            error_msg = "Final response is empty"
            await self.emit_status(__event_emitter__, "error", error_msg, True)
            return {"error": error_msg}

        # Updating the last message from the assistant
        if messages[-1]["role"] == "assistant":
            messages[-1]["content"] += f"\n\nReflection output:\n{final_response}"
        else:
            # Adding a new message from the assistant
            body["messages"].append({"role": "assistant", "content": final_response})

        print(f"Updated body: {body}")
        await self.emit_status(
            __event_emitter__, "info", "Reflection process completed", True
        )

        # Check that the body contains updated messages
        if "messages" not in body or not body["messages"]:
            error_msg = "Failed to add final response to body"
            await self.emit_status(__event_emitter__, "error", error_msg, True)
            return {"error": error_msg}

        # Bringing back an updated body
        return body

    async def process_thinking(
        self, prompt: str, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> str:
        await self.emit_status(
            __event_emitter__, "info", "Generating initial thinking", False
        )
        response = await self.query_model(
            f"{prompt}", __event_emitter__
        )
        print(f"Thinking response: {response}")
        return response

    async def process_reflection(
        self,
        thinking_response: str,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> str:
        await self.emit_status(
            __event_emitter__, "info", "Checking for reflection", False
        )
        reflection_prompt = f"{thinking_response}"
        reflection_response = await self.query_model(
            reflection_prompt, __event_emitter__
        )
        print(f"Reflection response: {reflection_response}")
        return reflection_response

    async def process_output(
        self,
        reflection_response: str,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> str:
        await self.emit_status(
            __event_emitter__, "info", "Generating final output", False
        )
        final_output = await self.query_model(
            f"{reflection_response}", __event_emitter__
        )
        print(f"Final output: {final_output}")
        return final_output

    async def query_model(
        self, prompt: str, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> str:
        url = f"{self.valves.api_base}/chat/completions"
        headers = {"Content-Type": "application/json"}
        data = {
            "model": self.valves.model,
            "messages": [{"role": "user", "content": prompt}],
        }

        try:
            await self.emit_status(
                __event_emitter__, "info", f"Querying model: {self.valves.model}", False
            )
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=data) as response:
                    if response.status != 200:
                        return f"Error: {response.status}"
                    result = await response.json()
                    return result["choices"][0]["message"]["content"]
        except Exception as e:
            return f"Error: {str(e)}"

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        if __event_emitter__ and self.valves.enable_status_indicator:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )

    async def on_start(self):
        print("Reflection Process Action for Gemma2 27B started")

    async def on_stop(self):
        print("Reflection Process Action for Gemma2 27B stopped")