"""
title: Reflection Filter
author: Max Kerkula
version: .04
"""
import html
import json
import logging
import os
import re
import uuid
from typing import Any, Awaitable, Callable, Dict, List, Optional

import markdown
from pydantic import BaseModel, Field

# Import necessary OpenWebUI modules for handling files
from open_webui.apps.webui.models.files import FileForm, Files
from open_webui.config import UPLOAD_DIR
# Configure module-specific logging
def setup_logger():
    """
    Return this module's logger, configuring it the first time it is requested.

    On first use the logger is set to DEBUG, given a single stream handler named
    "reflection_filter_handler" with a timestamped format, and detached from
    its ancestors so records are not emitted twice. Later calls return the
    already-configured logger untouched.
    """
    module_logger = logging.getLogger(__name__)

    # A handler is already attached: configuration happened earlier.
    if module_logger.handlers:
        return module_logger

    stream_handler = logging.StreamHandler()
    stream_handler.set_name("reflection_filter_handler")
    stream_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )

    module_logger.setLevel(logging.DEBUG)
    module_logger.addHandler(stream_handler)
    module_logger.propagate = False
    return module_logger


logger = setup_logger()
def clean_message(message: str, tags: list) -> str:
    """
    Normalize the given tags inside ``message`` to bare ``<tag>`` / ``</tag>`` form.

    For every name in ``tags`` the function finds opening and closing tags
    case-insensitively, tolerating inner whitespace (``< tag >``, ``</ tag >``)
    and an optional ``**`` or ``__`` markdown wrapper on either side, and
    replaces the whole match with the canonical tag. Whitespace directly
    adjacent to the tag (before it, and between it and a trailing wrapper) is
    consumed by the match.

    Args:
        message: Raw text that may contain decorated tags.
        tags: Tag names to normalize (without angle brackets).

    Returns:
        The message with every recognized tag rewritten to its canonical form.
    """
    for tag in tags:
        # Escape the name so regex metacharacters in a tag are matched literally.
        name = re.escape(tag)
        opening = f"<{tag}>"
        closing = f"</{tag}>"

        # Callables are used as replacements so the tag text is inserted
        # verbatim (no backslash/group-reference processing by re.sub).
        message = re.sub(
            rf"(\*\*|__)?\s*<\s*{name}\s*>\s*(\*\*|__)?",
            lambda _match, text=opening: text,
            message,
            flags=re.IGNORECASE,
        )
        # The closing tag must keep its "</" prefix; dropping it would leave
        # text that no longer parses as a closing tag downstream.
        message = re.sub(
            rf"(\*\*|__)?\s*<\s*/\s*{name}\s*>\s*(\*\*|__)?",
            lambda _match, text=closing: text,
            message,
            flags=re.IGNORECASE,
        )
    return message
def extract_tag_content(message: str, tags: list) -> List[Dict[str, str]]:
    """
    Extract the text that follows each recognized opening tag in ``message``.

    The message is first normalized with ``clean_message``. Tags are then
    scanned left to right:

    * an opening tag stores the stripped text between it and the next
      recognized tag (or the end of the message) under the lower-cased tag
      name in the current round;
    * a closing tag whose name is present in the current round emits that
      round as one artifact and starts a new, empty round. Closing tags with
      no matching entry are ignored.

    A ``<`` that is not part of a recognized tag (inline HTML, ``a < b``) is
    kept as content rather than ending the section.

    Args:
        message: Assistant text containing tagged sections.
        tags: Tag names to look for (without angle brackets).

    Returns:
        A list of artifacts, each a dict mapping lower-cased tag names to
        their content. Empty when ``tags`` is empty or no tag is closed.
    """
    if not tags:
        # An empty alternation would match "<>" and yield meaningless rounds.
        return []

    message = clean_message(message, tags)

    # Group 1 is "/" for closing tags, group 2 is the tag name.
    alternatives = "|".join(re.escape(tag) for tag in tags)
    tag_pattern = re.compile(rf"<\s*(/?)\s*({alternatives})\s*>", re.IGNORECASE)
    matches = list(tag_pattern.finditer(message))

    artifacts: List[Dict[str, str]] = []
    current_round: Dict[str, str] = {}
    for index, match in enumerate(matches):
        tag = match.group(2).lower()

        if match.group(1):
            # Closing tag: finish the round only if this tag was opened in it.
            if tag in current_round:
                artifacts.append(current_round)
                current_round = {}
            continue

        # Opening tag: content runs up to the next recognized tag, if any.
        if index + 1 < len(matches):
            content_end = matches[index + 1].start()
        else:
            content_end = len(message)
        current_round[tag] = message[match.end():content_end].strip()

    return artifacts
class MiddlewareHTMLGenerator:
    """
    Builds the self-contained HTML page that presents the reasoning rounds,
    the outputs, the final answer and an image gallery.

    The stylesheet and script are returned as plain strings by
    ``generate_style`` and ``generate_script``; ``create_middleware_html``
    embeds both into the page and emits the elements they target.
    """

    @staticmethod
    def generate_style(dark_mode=True):
        """
        Return the page's CSS as a string.

        Args:
            dark_mode: When true the dark palette is used, otherwise the light
                one. The modal overlay colour is the same in both modes.

        Returns:
            CSS rules covering the container (including its ``fullscreen``
            variant), the sticky collapsible button and its content panel,
            ``pre``/``h2`` elements, the image gallery, the image modal with
            its navigation buttons, and the fullscreen toggle button.
        """
        background_color = "#1e1e1e" if dark_mode else "#f5f5f5"
        text_color = "#e0e0e0" if dark_mode else "#333"
        secondary_background_color = "#2d2d2d" if dark_mode else "#fff"
        highlight_color = "#3d3d3d" if dark_mode else "#f0f0f0"
        button_color = "#444" if dark_mode else "#ddd"
        overlay_background = "rgba(0, 0, 0, 0.9)"
        arrow_color = "#888" if dark_mode else "#ccc"
        # Literal CSS braces are doubled because this is an f-string.
        return f"""
body {{
font-family: 'Inter', sans-serif;
background-color: {background_color};
color: {text_color};
margin: 0;
padding: 0;
line-height: 1.6;
}}
.artifact-container {{
max-width: 800px;
margin: 20px auto;
background-color: {secondary_background_color};
border-radius: 8px;
overflow: visible !important;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
padding: 5px !important;
position: relative;
}}
/* Fullscreen mode styles */
.artifact-container.fullscreen {{
width: 100%;
height: 100%;
max-width: none;
margin: 0;
padding: 10px;
overflow-y: auto; /* Enable scrollbar in fullscreen mode */
}}
.collapsible {{
background-color: {highlight_color};
color: {text_color};
cursor: pointer;
padding: 10px;
width: 100%;
border: none;
text-align: left;
outline: none;
font-size: 16px;
border-radius: 5px;
transition: background-color 0.3s;
margin-bottom: 5px;
position: sticky; /* Make the button sticky */
top: 0; /* Stick to the top */
z-index: 2; /* Ensure it stays above other content */
}}
.active, .collapsible:hover {{
background-color: {secondary_background_color};
}}
.content {{
padding: 0 10px;
display: none;
overflow: visible !important;
background-color: {background_color};
margin-bottom: 10px;
border-radius: 5px;
}}
pre {{
white-space: pre-wrap;
word-wrap: break-word;
background-color: {highlight_color};
padding: 10px;
border-radius: 5px;
overflow-x: auto;
}}
h2 {{
margin-top: 20px;
margin-bottom: 10px;
color: {text_color};
text-align: center;
}}
/* Styles for gallery */
.gallery {{
display: flex;
flex-wrap: wrap;
gap: 10px;
margin-top: 20px;
justify-content: center;
}}
.gallery img {{
width: auto;
height: auto;
max-height: 300px;
object-fit: cover;
cursor: pointer;
border-radius: 5px;
transition: transform 0.2s;
}}
.gallery img:hover {{
transform: scale(1.05);
}}
/* Image modal styles */
.image-modal {{
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background-color: {overlay_background};
display: flex;
justify-content: center;
align-items: center;
z-index: 5;
}}
.image-modal img {{
max-width: 90%;
max-height: 90%;
border-radius: 5px;
}}
/* Fullscreen Toggle Button */
.fullscreen-button {{
position: absolute;
top: 10px;
right: 10px;
background-color: {button_color};
color: {text_color};
border: none;
padding: 8px 12px;
cursor: pointer;
border-radius: 5px;
z-index: 3;
}}
.fullscreen-button:hover {{
background-color: {highlight_color};
}}
/* Navigation buttons in image modal */
.nav-button {{
position: absolute;
top: 50%;
transform: translateY(-50%);
background-color: transparent;
border: none;
font-size: 48px;
color: {arrow_color};
cursor: pointer;
z-index: 6;
user-select: none;
}}
.nav-button:hover {{
color: {text_color};
}}
.nav-button.left {{
left: 20px;
}}
.nav-button.right {{
right: 20px;
}}
"""

    @staticmethod
    def generate_script():
        """
        Return the page's JavaScript as a string.

        Once the DOM is loaded the script:

        * toggles the element following each ``.collapsible`` button between
          hidden and shown;
        * opens a modal with previous/next navigation when a ``.gallery img``
          is clicked, and closes it when the overlay is clicked;
        * toggles fullscreen on ``.artifact-container`` when the element with
          id ``fullscreen-button`` is clicked.
        """
        return """
document.addEventListener('DOMContentLoaded', function() {
// Collapsible sections
var coll = document.getElementsByClassName('collapsible');
for (var i = 0; i < coll.length; i++) {
coll[i].addEventListener('click', function() {
this.classList.toggle('active');
var content = this.nextElementSibling;
if (content.style.display === 'block') {
content.style.display = 'none';
} else {
content.style.display = 'block';
}
});
}
// Image modal with navigation
var images = document.querySelectorAll('.gallery img');
var currentIndex = 0;
var imageSources = Array.from(images).map(img => img.src);
images.forEach(function(img, index) {
img.addEventListener('click', function(event) {
event.stopPropagation();
currentIndex = index;
openImageModal();
});
});
function openImageModal() {
// Create modal overlay
var modalOverlay = document.createElement('div');
modalOverlay.classList.add('image-modal');
// Create image element
var image = document.createElement('img');
image.src = imageSources[currentIndex];
// Append image to modal
modalOverlay.appendChild(image);
// Create navigation buttons
var leftButton = document.createElement('button');
leftButton.classList.add('nav-button', 'left');
leftButton.innerHTML = '❮'; // Left arrow
modalOverlay.appendChild(leftButton);
var rightButton = document.createElement('button');
rightButton.classList.add('nav-button', 'right');
rightButton.innerHTML = '❯'; // Right arrow
modalOverlay.appendChild(rightButton);
// Event listeners for navigation buttons
leftButton.addEventListener('click', function(event) {
event.stopPropagation();
currentIndex = (currentIndex - 1 + imageSources.length) % imageSources.length;
image.src = imageSources[currentIndex];
});
rightButton.addEventListener('click', function(event) {
event.stopPropagation();
currentIndex = (currentIndex + 1) % imageSources.length;
image.src = imageSources[currentIndex];
});
// Append modal to the fullscreen element or body
var fullscreenElement = document.fullscreenElement || document.webkitFullscreenElement || document.mozFullScreenElement || document.msFullscreenElement;
if (fullscreenElement) {
fullscreenElement.appendChild(modalOverlay);
} else {
document.body.appendChild(modalOverlay);
}
// Click event to close modal
modalOverlay.addEventListener('click', function(event) {
event.stopPropagation();
if (fullscreenElement) {
fullscreenElement.removeChild(modalOverlay);
} else {
document.body.removeChild(modalOverlay);
}
});
}
// Fullscreen toggle button functionality
var fullscreenButton = document.getElementById('fullscreen-button');
if (fullscreenButton) {
fullscreenButton.addEventListener('click', function(event) {
event.stopPropagation();
var artifactContainer = document.querySelector('.artifact-container');
if (!document.fullscreenElement) {
if (artifactContainer.requestFullscreen) {
artifactContainer.requestFullscreen();
} else if (artifactContainer.webkitRequestFullscreen) { /* Safari */
artifactContainer.webkitRequestFullscreen();
} else if (artifactContainer.msRequestFullscreen) { /* IE11 */
artifactContainer.msRequestFullscreen();
}
} else {
if (document.exitFullscreen) {
document.exitFullscreen();
} else if (document.webkitExitFullscreen) { /* Safari */
document.webkitExitFullscreen();
} else if (document.msExitFullscreen) { /* IE11 */
document.msExitFullscreen();
}
}
});
}
});
"""

    @classmethod
    def create_middleware_html(
        cls,
        thinking_list: List[str],
        output_list: List[str],
        final_answer: str,
        image_descriptions: List[str],
        image_urls: List[str],
        dark_mode: bool,
    ) -> str:
        """
        Assemble the complete HTML page for one assistant response.

        NOTE(review): the element structure below was rebuilt from the
        selectors used by ``generate_style`` / ``generate_script``
        (``.artifact-container``, ``#fullscreen-button``, ``.collapsible``
        followed by ``.content``, ``.gallery img``); the button labels and
        wrapper elements are a best-effort reconstruction — confirm against
        the intended design.

        Args:
            thinking_list: One markdown string per reasoning round; each is
                rendered under a "Round N" heading inside the collapsible panel.
            output_list: Markdown strings rendered in order below the panel.
            final_answer: Markdown rendered under a "Final Answer" heading;
                skipped when empty.
            image_descriptions: Alt texts, matched to ``image_urls`` by index;
                missing entries become an empty alt text.
            image_urls: Image sources for the gallery; no gallery is emitted
                when empty.
            dark_mode: Palette selector forwarded to ``generate_style``.

        Returns:
            The HTML document with the stylesheet and script embedded.
        """
        # Reasoning rounds live inside the collapsible panel.
        thinking_parts = []
        for idx, thinking in enumerate(thinking_list):
            thinking_markdown = markdown.markdown(thinking, extensions=["extra"])
            thinking_parts.append(
                f"<div>\n<h3>Round {idx + 1}</h3>\n{thinking_markdown}\n</div>\n"
            )
        thinking_content = "".join(thinking_parts)

        output_parts = [
            markdown.markdown(output, extensions=["extra"]) + "\n"
            for output in output_list
        ]

        if final_answer:
            final_answer_markdown = markdown.markdown(
                final_answer, extensions=["extra"]
            )
            output_parts.append(
                f"<h2>Final Answer</h2>\n{final_answer_markdown}\n"
            )

        if image_urls:
            gallery_parts = ['<div class="gallery">\n']
            for idx, image_url in enumerate(image_urls):
                image_description = (
                    image_descriptions[idx] if idx < len(image_descriptions) else ""
                )
                # Both values come from model output, so they are escaped
                # before being placed inside attribute values.
                safe_url = html.escape(image_url, quote=True)
                safe_alt = html.escape(image_description, quote=True)
                gallery_parts.append(f'<img src="{safe_url}" alt="{safe_alt}">\n')
            gallery_parts.append("</div>\n")
            output_parts.append("".join(gallery_parts))

        output_content = "".join(output_parts)
        style = cls.generate_style(dark_mode)
        script = cls.generate_script()

        # The ".content" div must directly follow the ".collapsible" button:
        # the script toggles the button's next sibling.
        return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Reflection Filter Output</title>
<style>{style}</style>
</head>
<body>
<div class="artifact-container">
<button id="fullscreen-button" class="fullscreen-button">Fullscreen</button>
<button class="collapsible">Show Thinking</button>
<div class="content">
{thinking_content}
</div>
<div class="output">
{output_content}
</div>
</div>
<script>{script}</script>
</body>
</html>
"""
class Filter:
    """
    Filter that injects a structured-reasoning system prompt on the way in and
    turns the tagged assistant reply into an HTML artifact on the way out.

    ``inlet`` adds ``system_prompt`` to the conversation when it is not there
    yet. ``outlet`` parses the assistant messages for thinking / reflection /
    discriminator / output sections, renders them with
    ``MiddlewareHTMLGenerator``, stores the page as a file and replaces the
    assistant reply with a placeholder that carries the file id.
    """

    class Valves(BaseModel):
        priority: int = Field(
            default=0, description="Priority level for the filter operations."
        )
        dark_mode: bool = Field(default=True, description="Enable dark mode by default")

    valves: Valves
    # Sub-directory of UPLOAD_DIR that holds all generated artifacts.
    viz_dir: str = "visualizations"
    # Content-type directory used for the HTML artifacts written by outlet().
    html_dir: str = "html"
    # NOTE(review): the prompt demands "exact tags" but names none in this
    # text, while outlet() parses thinking / reflection / discriminator /
    # output tags — confirm the tag markup was not lost from this literal.
    system_prompt: str = (
        """
You are an AI assistant designed to provide detailed, step-by-step responses while maintaining conciseness and clarity. Your outputs must adhere to the following strict structure:
- **Use Exact Tags**: All tags must appear exactly as specified, each on a separate line with no additional text or formatting.
### **Response Structure**
For each query, perform multiple rounds of reasoning (3 to 5 for simple tasks and 7 or more for complex tasks) following these steps:
- **Analysis**: Analyze the query and outline your approach.
- **Plan**: Present a step-by-step plan to address the query.
- **Reasoning**: Use logical reasoning to break down your thoughts into clear steps.
- **Incorporate Information**: Integrate relevant information from provided data or context.
- **Problem Breakdown**: Break down complex problems into manageable sub-problems.
**Confidence Level**: Provide a confidence score between 0-100% for this reasoning phase.
- **Review**: Rephrase your reasoning to ensure clarity.
- **Error Check**: Identify any potential errors or oversights.
- **Adjustments**: Modify your conclusions based on identified errors.
- **Depth**: Analyze how your reasoning affects future conclusions.
**Confidence Level**: Provide a confidence score between 0-100% for this reflection phase.
- **Evaluation**: Assess the quality and validity of your reasoning.
- **Decision**: Decide whether to continue with another round or proceed to the final answer.
- **Output**: Provide your decision in JSON format:
{
"title": "Discriminator Decision",
"content": "Explanation of your evaluation and decision...",
"confidence": 0-100,
"next_action": "continue" or "final_answer"
}
**Confidence Level**: Provide a confidence score between 0-100% for this discriminator phase.
**Repeat until you decide to proceed to the final answer.**
### **Guidelines**
- **Language and Style**:
- Use clear and concise language.
- Maintain the same language as the query.
- Keep a logical and thorough tone.
- **Confidence and Validation**:
- Provide confidence levels for each phase.
- Proceed to the final answer only when confidence level is 100%.
- **Content Considerations**:
- If you don't know the answer, explain why.
- **Additional Requirements**:
- Ensure the reasoning process does not enter an infinite loop.
- Maintain the integrity of the tag structure.
- Avoid redundancy in your reasoning.
""".strip()
    )

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.valves = self.Valves()

    def normalize_prompt(self, prompt: str) -> str:
        """
        Collapse every run of whitespace in ``prompt`` to a single space and
        trim both ends, so prompts can be compared regardless of layout.
        """
        return " ".join(prompt.split())

    async def inlet(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        """
        Insert the system prompt at the start of the conversation unless a
        system message with the same (whitespace-normalized) text exists.

        Args:
            body: Request payload; ``body["messages"]`` is created when absent.
            __user__, __event_emitter__, __model__: Accepted but not used.

        Returns:
            The same ``body`` object, possibly with the prompt prepended.
        """
        messages = body.setdefault("messages", [])
        normalized_system_prompt = self.normalize_prompt(self.system_prompt)

        # Non-string content cannot equal the prompt, so it is skipped
        # instead of being passed to normalize_prompt().
        already_present = any(
            msg.get("role") == "system"
            and isinstance(msg.get("content"), str)
            and self.normalize_prompt(msg["content"]) == normalized_system_prompt
            for msg in messages
        )

        if already_present:
            self.logger.debug(
                "Reflection Filter system prompt already present in the conversation."
            )
        else:
            messages.insert(0, {"role": "system", "content": self.system_prompt})
            self.logger.debug(
                "Reflection Filter system prompt injected into the conversation."
            )
        return body

    def ensure_chat_directory(self, chat_id: str, content_type: str) -> str:
        """
        Create (if needed) and return the directory
        ``UPLOAD_DIR/<viz_dir>/<content_type>/<basename of chat_id>``.

        Only the final path component of ``chat_id`` is used, so directory
        parts in the id cannot redirect the output elsewhere.
        """
        sanitized_chat_id = os.path.basename(chat_id)
        chat_dir = os.path.join(
            UPLOAD_DIR, self.viz_dir, content_type, sanitized_chat_id
        )
        os.makedirs(chat_dir, exist_ok=True)
        self.logger.debug(f"Ensured chat directory exists at: {chat_dir}")
        return chat_dir

    def write_content_to_file(
        self, content: str, user_id: str, chat_id: str, content_type: str
    ) -> str:
        """
        Write ``content`` to a new, uniquely named ``.html`` file in the chat's
        directory and register it through ``Files.insert_new_file``.

        Args:
            content: Text to store (written as UTF-8).
            user_id: Owner passed to ``Files.insert_new_file``.
            chat_id: Chat identifier used to pick the directory.
            content_type: Directory level and filename prefix.

        Returns:
            The ``id`` of the record returned by ``Files.insert_new_file``.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            chat_dir = self.ensure_chat_directory(chat_id, content_type)
            filename = f"{content_type}_{uuid.uuid4()}.html"
            file_path = os.path.join(chat_dir, filename)
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(content)

            # Derived from the real location so the registered path always
            # agrees with the sanitized directory the file was written to.
            relative_path = os.path.relpath(file_path, UPLOAD_DIR)
            file_form = FileForm(
                id=str(uuid.uuid4()),
                filename=relative_path,
                meta={
                    "name": filename,
                    "content_type": "text/html",
                    # Size in bytes as encoded on disk, not in characters.
                    "size": len(content.encode("utf-8")),
                    "path": file_path,
                },
            )
            file_id = Files.insert_new_file(user_id, file_form).id
            self.logger.debug(
                f"Written content to file: {file_path} with ID: {file_id}"
            )
            return file_id
        except Exception as e:
            self.logger.error(f"Failed to write content to file: {e}")
            raise

    @staticmethod
    def _render_thinking_round(artifact: Dict[str, str]) -> str:
        """Join an artifact's thinking/reflection/discriminator text under one heading each."""
        sections = []
        for tag in ("thinking", "reflection", "discriminator"):
            text = artifact.get(tag)
            if text:
                # Use markdown headers for each section
                sections.append(
                    f"### {tag.capitalize()}\n" + markdown.markdown(text) + "\n"
                )
        return "".join(sections)

    def _log_discriminator_decision(self, discriminator_content: str) -> None:
        """Log when the JSON object embedded in the discriminator text asks for the final answer."""
        decision_match = re.search(r"\{.*\}", discriminator_content, re.DOTALL)
        if not decision_match:
            return
        try:
            decision = json.loads(decision_match.group(0))
        except json.JSONDecodeError:
            self.logger.warning(
                f"Failed to parse discriminator decision JSON: {discriminator_content}"
            )
            return
        if isinstance(decision, dict) and decision.get("next_action") == "final_answer":
            self.logger.debug("Discriminator decided to proceed to final answer.")

    async def outlet(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None,
        __call_llm__: Optional[Callable[[dict], Awaitable[dict]]] = None,
        __model__: Optional[dict] = None,
        tool_ids: Optional[List[str]] = None,
        __tool_responses__: Optional[Dict[str, Any]] = None,
    ) -> dict:
        """
        Convert the tagged assistant output into an HTML artifact.

        Every assistant message is scanned for tagged sections, a
        "Final Answer:" passage, "Image Description:" passages and markdown
        image links. When at least one tagged section exists, the page is
        generated and written to a file, and the content of the last
        assistant message is replaced by ``{{HTML_FILE_ID_<id>}}``. If writing
        fails, an error notice is appended to that message instead.

        Args:
            body: Response payload; reads ``messages`` and ``chat_id``.
            __user__: User info; ``__user__["id"]`` owns the stored file.
            __event_emitter__, __call_llm__, __model__, tool_ids,
            __tool_responses__: Accepted but not used.

        Returns:
            The same ``body`` object. It is returned unchanged when there are
            no messages, no assistant messages, no tagged content, or when
            ``chat_id`` / the user id is missing.
        """
        self.logger.debug("Starting outlet processing.")
        if "messages" not in body or not body["messages"]:
            self.logger.error("No messages found in the body.")
            return body

        assistant_messages = [
            msg for msg in body["messages"] if msg.get("role") == "assistant"
        ]
        if not assistant_messages:
            self.logger.warning("No assistant messages found in the body.")
            return body

        final_answer = ""
        image_descriptions: List[str] = []
        image_urls: List[str] = []
        thinking_list: List[str] = []
        output_list: List[str] = []
        tags = ["thinking", "reflection", "discriminator", "output"]

        for msg in assistant_messages:
            content = msg.get("content", "")
            if not isinstance(content, str):
                # Only plain-text content can be parsed by the regexes below.
                continue
            self.logger.debug(f"Processing assistant message: {content[:100]}...")

            for artifact in extract_tag_content(content, tags):
                # One entry per artifact, even when it has no reasoning text.
                thinking_list.append(self._render_thinking_round(artifact))
                if artifact.get("output"):
                    output_list.append(artifact["output"])
                self._log_discriminator_decision(artifact.get("discriminator", ""))

            # Extract final answer and image descriptions
            final_answer_match = re.search(
                r"Final Answer:\s*(.*?)(?=(\*\*Confidence Level\*\*|$|Images))",
                content,
                re.DOTALL | re.IGNORECASE,
            )
            if final_answer_match:
                # A later message's final answer replaces an earlier one.
                final_answer = final_answer_match.group(1).strip()
                self.logger.debug(f"Extracted final answer: {final_answer}")

            # findall yields (description, lookahead) tuples; keep the text.
            image_desc_matches = re.findall(
                r"Image Description:\s*(.*?)(?=(\n\n|$|!\[))",
                content,
                re.DOTALL | re.IGNORECASE,
            )
            image_descriptions.extend(desc[0].strip() for desc in image_desc_matches)
            image_urls.extend(re.findall(r"!\[.*?\]\((.*?)\)", content))

        if not thinking_list:
            self.logger.warning("No tagged content found in assistant messages.")
            return body

        # Generate middleware HTML with all rounds
        middleware_content = MiddlewareHTMLGenerator.create_middleware_html(
            thinking_list,
            output_list,
            final_answer,
            image_descriptions,
            image_urls,
            self.valves.dark_mode,
        )
        self.logger.debug("Middleware HTML content generated.")

        chat_id = body.get("chat_id")
        if not (chat_id and __user__ and "id" in __user__):
            self.logger.error("chat_id or user ID is missing.")
            self.logger.debug("Outlet processing completed.")
            return body

        # Update the assistant's reply, not whatever message happens to be last.
        reply = assistant_messages[-1]
        try:
            middleware_id = self.write_content_to_file(
                middleware_content,
                __user__["id"],
                chat_id,
                self.html_dir,
            )
            self.logger.debug(f"Middleware content written with ID: {middleware_id}")
            reply["content"] = f"{{{{HTML_FILE_ID_{middleware_id}}}}}"
            self.logger.debug(
                "Assistant's response updated with HTML file placeholder."
            )
        except Exception as e:
            self.logger.error(f"Error processing content: {str(e)}")
            previous = reply.get("content")
            # Append only to text (or absent) content; leave other shapes alone.
            if previous is None or isinstance(previous, str):
                reply["content"] = (
                    (previous or "") + "\n\nError: Failed to process content."
                )

        self.logger.debug("Outlet processing completed.")
        return body