"""
title: Reflection Manifold Pipe Updated
author: flomanxl | AndreyRGW
version: 1.2
required_open_webui_version: 0.3.21
"""
from pydantic import BaseModel, Field
from typing import Optional, Callable, Awaitable
import aiohttp
class Action:
    """Action that runs a three-stage reflection pass over the last chat message.

    The content of the last message in the request body is sent to the
    configured model ("thinking"), the reply is sent back to the model
    ("reflection"), and that reply is sent once more to produce the final
    output. The final output is then merged into the message list.
    """

    class Valves(BaseModel):
        """User-configurable settings for the action."""

        model: str = Field(
            default="magnum", description="Model to use for reflection process."
        )
        api_base: str = Field(
            default="http://localhost:11434/v1",
            description="Base URL for the model API.",
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )

    # Prefix query_model() puts on every failure string. action() checks for
    # it so a failed request is reported instead of being fed to the next stage.
    _ERROR_PREFIX = "Error: "

    def __init__(self):
        self.valves = self.Valves()

    async def action(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> Optional[dict]:
        """Run the reflection pipeline and merge its result into ``body``.

        Args:
            body: Request body; its ``"messages"`` list supplies the prompt
                (the last message's content) and receives the result.
            __user__: Unused by this method.
            __event_emitter__: Optional async callback used for status events.

        Returns:
            The updated ``body`` on success, or ``{"error": <message>}`` when
            there is nothing to process or any stage fails or returns nothing.
        """
        await self.emit_status(
            __event_emitter__, "info", "Starting reflection process", False
        )

        messages = body.get("messages", [])
        if not messages:
            return await self._fail(
                __event_emitter__, "No messages found in the request body"
            )

        last_message = messages[-1]
        prompt = last_message.get("content")
        # Only plain, non-blank text can be used as a prompt and later be
        # extended with ``+=``.
        if not isinstance(prompt, str) or not prompt.strip():
            return await self._fail(
                __event_emitter__, "Last message has no text content"
            )

        # Each stage consumes the previous stage's output.
        stages = (
            ("Initial", self.process_thinking),
            ("Reflection", self.process_reflection),
            ("Final", self.process_output),
        )
        stage_output = prompt
        for label, run_stage in stages:
            stage_output = await run_stage(stage_output, __event_emitter__)
            print(f"{label} response: {stage_output}")
            if not stage_output:
                return await self._fail(
                    __event_emitter__, f"{label} response is empty"
                )
            if self._is_error_reply(stage_output):
                # Stop here so a failed request is never used as a prompt or
                # written into the conversation.
                return await self._fail(__event_emitter__, stage_output)
        final_response = stage_output

        if last_message.get("role") == "assistant":
            # Updating the last message from the assistant
            last_message["content"] += f"\n\nReflection output:\n{final_response}"
        else:
            # Adding a new message from the assistant
            messages.append({"role": "assistant", "content": final_response})
        print(f"Updated body: {body}")

        await self.emit_status(
            __event_emitter__, "info", "Reflection process completed", True
        )
        return body

    async def process_thinking(
        self,
        prompt: str,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> str:
        """Send ``prompt`` to the model and return its reply (first stage)."""
        await self.emit_status(
            __event_emitter__, "info", "Generating initial thinking", False
        )
        response = await self.query_model(prompt, __event_emitter__)
        print(f"Thinking response: {response}")
        return response

    async def process_reflection(
        self,
        thinking_response: str,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> str:
        """Send the first-stage reply back to the model and return the new reply."""
        await self.emit_status(
            __event_emitter__, "info", "Checking for reflection", False
        )
        reflection_response = await self.query_model(
            thinking_response, __event_emitter__
        )
        print(f"Reflection response: {reflection_response}")
        return reflection_response

    async def process_output(
        self,
        reflection_response: str,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> str:
        """Send the reflection reply to the model and return the final output."""
        await self.emit_status(
            __event_emitter__, "info", "Generating final output", False
        )
        # The reflection text is the prompt for this stage; sending an empty
        # prompt would discard everything the earlier stages produced.
        final_output = await self.query_model(reflection_response, __event_emitter__)
        print(f"Final output: {final_output}")
        return final_output

    async def query_model(
        self,
        prompt: str,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> str:
        """POST ``prompt`` as a single user message to ``<api_base>/chat/completions``.

        Returns:
            The ``choices[0].message.content`` field of the JSON reply, or a
            string starting with ``"Error: "`` when the HTTP status is not 200
            or any exception is raised while performing the request.
        """
        # Tolerate a trailing slash on the configured base URL.
        url = f"{self.valves.api_base.rstrip('/')}/chat/completions"
        headers = {"Content-Type": "application/json"}
        data = {
            "model": self.valves.model,
            "messages": [{"role": "user", "content": prompt}],
        }
        try:
            await self.emit_status(
                __event_emitter__, "info", f"Querying model: {self.valves.model}", False
            )
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=data) as response:
                    if response.status != 200:
                        return f"{self._ERROR_PREFIX}{response.status}"
                    result = await response.json()
                    return result["choices"][0]["message"]["content"]
        except Exception as e:
            # Boundary handler: every failure (network, JSON decoding, missing
            # keys) is reported to the caller as an error string.
            return f"{self._ERROR_PREFIX}{str(e)}"

    async def emit_status(
        self,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
        level: str,
        message: str,
        done: bool,
    ):
        """Emit a ``status`` event if an emitter is given and status output is enabled."""
        if __event_emitter__ and self.valves.enable_status_indicator:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )

    async def on_start(self):
        print("Reflection Process Action for Gemma2 27B started")

    async def on_stop(self):
        print("Reflection Process Action for Gemma2 27B stopped")

    def _is_error_reply(self, reply) -> bool:
        """Return True when ``reply`` is a failure string produced by query_model()."""
        return isinstance(reply, str) and reply.startswith(self._ERROR_PREFIX)

    async def _fail(
        self,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
        error_msg: str,
    ) -> dict:
        """Emit a terminal error status and build the error payload for action()."""
        await self.emit_status(__event_emitter__, "error", error_msg, True)
        return {"error": error_msg}