"""
title: Reflection Manifold Pipe
author: flomanxl
version: 1.0
required_open_webui_version: 0.3.9
"""
from pydantic import BaseModel, Field
from typing import Optional, Callable, Awaitable
import aiohttp
class Action:
    """Open WebUI action that chains three model calls over the last message.

    The chain is: query the model with the last message's content
    ("thinking"), query it again with that answer ("reflection"), then query
    it a third time with the reflection answer ("output"). The final answer
    is appended to ``body["messages"]`` as an assistant message.
    """

    class Valves(BaseModel):
        """Configurable settings for the action."""

        # Model name sent in the ``model`` field of every request.
        model: str = Field(
            default="gemma2-27b", description="Model to use for reflection process."
        )
        # Base URL; "/chat/completions" is appended to it in query_model.
        api_base: str = Field(
            default="http://host.docker.internal:11434/v1",
            description="Base URL for the model API.",
        )
        # When False, emit_status sends nothing.
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )

    def __init__(self):
        """Create the action with default valve values."""
        self.valves = self.Valves()

    async def action(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> Optional[dict]:
        """Run the thinking -> reflection -> output chain on the last message.

        Args:
            body: Request body; its ``"messages"`` list is read and, on
                success, extended in place with the assistant answer.
            __user__: Unused by this method.
            __event_emitter__: Optional async callback that receives status
                events.

        Returns:
            ``body`` with the assistant message appended, or
            ``{"error": <message>}`` when there is no usable last message.
        """
        await self.emit_status(
            __event_emitter__, "info", "Starting reflection process", False
        )

        messages = body.get("messages", [])
        if not messages:
            error_msg = "No messages found in the request body"
            await self.emit_status(__event_emitter__, "error", error_msg, True)
            return {"error": error_msg}

        # A last message without "content" used to raise KeyError; report it
        # through the same error path as an empty message list instead.
        prompt = messages[-1].get("content")
        if prompt is None:
            error_msg = "Last message has no content"
            await self.emit_status(__event_emitter__, "error", error_msg, True)
            return {"error": error_msg}

        initial_response = await self.process_thinking(prompt, __event_emitter__)
        reflection_response = await self.process_reflection(
            initial_response, __event_emitter__
        )
        final_response = await self.process_output(
            reflection_response, __event_emitter__
        )

        # NOTE(review): a failed stage yields an "Error: ..." string that is
        # passed on to the next stage and may end up here as the assistant
        # message; confirm whether the chain should abort instead.
        messages.append({"role": "assistant", "content": final_response})

        await self.emit_status(
            __event_emitter__, "info", "Reflection process completed", True
        )
        return body

    async def process_thinking(
        self,
        prompt: str,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> str:
        """Query the model with the user's prompt and return its answer."""
        await self.emit_status(
            __event_emitter__, "info", "Generating initial thinking", False
        )
        # str() keeps the original f-string behaviour of stringifying
        # non-string content before it is sent.
        return await self.query_model(str(prompt), __event_emitter__)

    async def process_reflection(
        self,
        thinking_response: str,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> str:
        """Query the model with the first-stage answer and return the result."""
        await self.emit_status(
            __event_emitter__, "info", "Checking for reflection", False
        )
        return await self.query_model(str(thinking_response), __event_emitter__)

    async def process_output(
        self,
        reflection_response: str,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> str:
        """Query the model with the reflection answer and return the final text."""
        await self.emit_status(
            __event_emitter__, "info", "Generating final output", False
        )
        # Previously an empty prompt was sent here, so the final answer
        # ignored everything the earlier stages produced.
        return await self.query_model(str(reflection_response), __event_emitter__)

    async def query_model(
        self,
        prompt: str,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> str:
        """POST a single-user-message chat completion and return its content.

        Args:
            prompt: Text sent as the only ``user`` message.
            __event_emitter__: Optional async callback for status events.

        Returns:
            ``choices[0].message.content`` from the JSON response, or a
            string starting with ``"Error: "`` when the status is not 200 or
            any exception is raised while sending or decoding.
        """
        # Tolerate a trailing slash in the configured base URL.
        url = f"{self.valves.api_base.rstrip('/')}/chat/completions"
        headers = {"Content-Type": "application/json"}
        data = {
            "model": self.valves.model,
            "messages": [{"role": "user", "content": prompt}],
        }

        try:
            await self.emit_status(
                __event_emitter__,
                "info",
                f"Querying model: {self.valves.model}",
                False,
            )
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=data) as response:
                    if response.status == 200:
                        result = await response.json()
                        return result["choices"][0]["message"]["content"]
                    error_text = f"Error: {response.status}"
        except Exception as e:
            # Boundary catch: network, decoding and response-shape failures
            # are all reported to the caller as an error string.
            error_text = f"Error: {str(e)}"

        # Surface the failure instead of swallowing it silently; done=False
        # because the surrounding chain keeps running.
        await self.emit_status(__event_emitter__, "error", error_text, False)
        return error_text

    async def emit_status(
        self,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
        level: str,
        message: str,
        done: bool,
    ) -> None:
        """Send a ``status`` event if an emitter is given and status is enabled.

        Args:
            __event_emitter__: Async callback to receive the event; nothing
                is sent when it is falsy.
            level: Value stored under ``data["level"]``.
            message: Value stored under ``data["description"]``.
            done: Stored under ``data["done"]``; also selects
                ``"complete"`` versus ``"in_progress"`` for ``data["status"]``.
        """
        if not (__event_emitter__ and self.valves.enable_status_indicator):
            return
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "status": "complete" if done else "in_progress",
                    "level": level,
                    "description": message,
                    "done": done,
                },
            }
        )

    async def on_start(self):
        """Print a start-up message."""
        print("Reflection Process Action for Gemma2 27B started")

    async def on_stop(self):
        """Print a shutdown message."""
        print("Reflection Process Action for Gemma2 27B stopped")