Whitepaper
Docs
Sign In
Function
Function
action
v1.2
Reflection Manifold Pipe Updated
Function ID
reflection_manifold_pipe_updated
Creator
@andreyrgw
Downloads
1.1K+
Any model can now reflect on it's answer
Get
README
No README available
Function Code
Show
""" title: Reflection Manifold Pipe Updated author: flomanxl | AndreyRGW version: 1.2 required_open_webui_version: 0.3.21 """ from pydantic import BaseModel, Field from typing import Optional, Callable, Awaitable import aiohttp class Action: class Valves(BaseModel): model: str = Field( default="magnum", description="Model to use for reflection process." ) api_base: str = Field( default="http://localhost:11434/v1", description="Base URL for the model API.", ) enable_status_indicator: bool = Field( default=True, description="Enable or disable status indicator emissions" ) def __init__(self): self.valves = self.Valves() async def action( self, body: dict, __user__: Optional[dict] = None, __event_emitter__: Callable[[dict], Awaitable[None]] = None, ) -> Optional[dict]: await self.emit_status( __event_emitter__, "info", "Starting reflection process", False ) messages = body.get("messages", []) if not messages: error_msg = "No messages found in the request body" await self.emit_status(__event_emitter__, "error", error_msg, True) return {"error": error_msg} initial_response = await self.process_thinking( messages[-1]["content"], __event_emitter__ ) print(f"Initial response: {initial_response}") reflection_response = await self.process_reflection( initial_response, __event_emitter__ ) print(f"Reflection response: {reflection_response}") final_response = await self.process_output( reflection_response, __event_emitter__ ) print(f"Final response: {final_response}") # Check that final_response is not empty if not final_response: error_msg = "Final response is empty" await self.emit_status(__event_emitter__, "error", error_msg, True) return {"error": error_msg} # Updating the last message from the assistant if messages[-1]["role"] == "assistant": messages[-1]["content"] += f"\n\nReflection output:\n{final_response}" else: # Adding a new message from the assistant body["messages"].append({"role": "assistant", "content": final_response}) print(f"Updated body: {body}") await self.emit_status( __event_emitter__, "info", "Reflection process completed", True ) # Check that the body contains updated messages if "messages" not in body or not body["messages"]: error_msg = "Failed to add final response to body" await self.emit_status(__event_emitter__, "error", error_msg, True) return {"error": error_msg} # Bringing back an updated body return body async def process_thinking( self, prompt: str, __event_emitter__: Callable[[dict], Awaitable[None]] = None ) -> str: await self.emit_status( __event_emitter__, "info", "Generating initial thinking", False ) response = await self.query_model( f"<thinking>{prompt}</thinking>", __event_emitter__ ) print(f"Thinking response: {response}") return response async def process_reflection( self, thinking_response: str, __event_emitter__: Callable[[dict], Awaitable[None]] = None, ) -> str: await self.emit_status( __event_emitter__, "info", "Checking for reflection", False ) reflection_prompt = f"<reflection>{thinking_response}</reflection>" reflection_response = await self.query_model( reflection_prompt, __event_emitter__ ) print(f"Reflection response: {reflection_response}") return reflection_response async def process_output( self, reflection_response: str, __event_emitter__: Callable[[dict], Awaitable[None]] = None, ) -> str: await self.emit_status( __event_emitter__, "info", "Generating final output", False ) final_output = await self.query_model( f"<output>{reflection_response}</output>", __event_emitter__ ) print(f"Final output: {final_output}") return final_output async def query_model( self, prompt: str, __event_emitter__: Callable[[dict], Awaitable[None]] = None ) -> str: url = f"{self.valves.api_base}/chat/completions" headers = {"Content-Type": "application/json"} data = { "model": self.valves.model, "messages": [{"role": "user", "content": prompt}], } try: await self.emit_status( __event_emitter__, "info", f"Querying model: {self.valves.model}", False ) async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json=data) as response: if response.status != 200: return f"Error: {response.status}" result = await response.json() return result["choices"][0]["message"]["content"] except Exception as e: return f"Error: {str(e)}" async def emit_status( self, __event_emitter__: Callable[[dict], Awaitable[None]], level: str, message: str, done: bool, ): if __event_emitter__ and self.valves.enable_status_indicator: await __event_emitter__( { "type": "status", "data": { "status": "complete" if done else "in_progress", "level": level, "description": message, "done": done, }, } ) async def on_start(self): print("Reflection Process Action for Gemma2 27B started") async def on_stop(self): print("Reflection Process Action for Gemma2 27B stopped")