Function
filter
v.04
Reflection Filter
A simple ui for reflection. Working on image preview function.
Function ID
reflection_filter
Creator
@maxkerkula
Downloads
291+

Function Content
python
"""
title: Reflection Filter
author: Max Kerkula
version: .04
"""

import os
import uuid
import logging
import re
import json
from typing import Optional, Callable, Awaitable, Any, List, Dict
from pydantic import BaseModel, Field
import markdown

# Import necessary OpenWebUI modules for handling files
from open_webui.apps.webui.models.files import Files, FileForm
from open_webui.config import UPLOAD_DIR


# Configure module-specific logging
def setup_logger():
    """Return the module logger, attaching a stream handler on first use.

    Configuration runs only when the logger has no handlers yet, so repeated
    calls are idempotent and never stack duplicate handlers. The handler is
    named so it can be identified later, and propagation is disabled to keep
    records out of the root logger.
    """
    log = logging.getLogger(__name__)
    if log.handlers:
        return log
    log.setLevel(logging.DEBUG)
    stream_handler = logging.StreamHandler()
    stream_handler.set_name("reflection_filter_handler")
    stream_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    log.addHandler(stream_handler)
    log.propagate = False
    return log


# Module-level logger shared by the filter helpers below.
logger = setup_logger()


def clean_message(message: str, tags: list) -> str:
    """
    Normalizes tag markup in *message* so downstream parsing sees bare tags.

    Models sometimes wrap the structural tags in markdown emphasis (e.g.
    ``**<thinking>**``) or add internal whitespace (``< thinking >``); this
    strips the surrounding emphasis and whitespace so every tag appears
    exactly as ``<tag>`` / ``</tag>``.

    Bug fix: the closing-tag branch previously substituted an empty string,
    which deleted closing tags entirely and prevented extract_tag_content()
    from ever flushing a round; it now rewrites them to ``</tag>``.

    :param message: Raw assistant message text.
    :param tags: Tag names to normalize (matched case-insensitively).
    :return: Message with all listed tags in canonical form.
    """
    for tag in tags:
        # Normalize opening tags, e.g. "** < thinking > **" -> "<thinking>".
        message = re.sub(
            rf"(\*\*|\_\_)?\s*<\s*{tag}\s*>\s*(\*\*|\_\_)?",
            f"<{tag}>",
            message,
            flags=re.IGNORECASE,
        )
        # Normalize closing tags, e.g. "**</ thinking >**" -> "</thinking>".
        message = re.sub(
            rf"(\*\*|\_\_)?\s*<\s*/\s*{tag}\s*>\s*(\*\*|\_\_)?",
            f"</{tag}>",
            message,
            flags=re.IGNORECASE,
        )
    return message


def extract_tag_content(message: str, tags: list) -> List[Dict[str, str]]:
    """
    Extracts content within specified tags from the message.

    Groups <thinking>, <reflection>, <discriminator>, and <output> content
    into per-round dicts keyed by tag name; a closing tag for any tag already
    seen in the current round flushes that round into the result list.

    Relies on clean_message() having normalized tags to bare <tag>/</tag>
    form; the content of each tag is taken from the end of the tag up to the
    next '<' (or end of message).

    :param message: Raw assistant message text.
    :param tags: Tag names to look for (matched case-insensitively).
    :return: List of round dicts mapping tag name -> captured content.
    """
    message = clean_message(message, tags)
    artifacts = []

    # Pattern matching both opening and closing forms of any listed tag;
    # group(1) keeps the optional '/' so we can tell them apart below.
    tag_pattern = re.compile(rf"<\s*(/{'?'}\s*({ '|'.join(tags) })\s*)>", re.IGNORECASE)

    # All tag occurrences as (start, end, tag-name, is-closing) tuples.
    tag_positions = [
        (m.start(), m.end(), m.group(2).lower(), "/" in m.group(1))
        for m in tag_pattern.finditer(message)
    ]

    current_round = {}
    for pos in tag_positions:
        start, end, tag, is_closing = pos
        # Text between this tag and the next '<' (or end of message).
        content = (
            message[end:].split("<", 1)[0].strip()
        )  # Content until next tag or end

        if not is_closing:
            current_round[tag] = content
        else:
            # Closing a tag opened in this round completes the round.
            if tag in current_round:
                artifacts.append(current_round.copy())
                current_round = {}

    return artifacts


class MiddlewareHTMLGenerator:
    @staticmethod
    def generate_style(dark_mode=True):
        """
        Generates CSS styles based on the dark_mode flag, including styles for sticky "Show Thinking" button
        and scrollbar in fullscreen mode.

        :param dark_mode: When True (the default) a dark palette is used,
            otherwise a light one.
        :return: A CSS stylesheet as a plain string, intended to be embedded
            in a <style> element of the generated artifact page.
        """
        # Palette selection — each value is (dark, light) depending on the flag.
        background_color = "#1e1e1e" if dark_mode else "#f5f5f5"
        text_color = "#e0e0e0" if dark_mode else "#333"
        secondary_background_color = "#2d2d2d" if dark_mode else "#fff"
        highlight_color = "#3d3d3d" if dark_mode else "#f0f0f0"
        button_color = "#444" if dark_mode else "#ddd"
        # The image-modal overlay is dark in both themes.
        overlay_background = "rgba(0, 0, 0, 0.9)"
        arrow_color = "#888" if dark_mode else "#ccc"

        return f"""
        body {{
            font-family: 'Inter', sans-serif;
            background-color: {background_color};
            color: {text_color};
            margin: 0;
            padding: 0;
            line-height: 1.6;
        }}
        .artifact-container {{
            max-width: 800px;
            margin: 20px auto;
            background-color: {secondary_background_color};
            border-radius: 8px;
            overflow: visible !important;
            box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
            padding: 5px !important;
            position: relative;
        }}
        /* Fullscreen mode styles */
        .artifact-container.fullscreen {{
            width: 100%;
            height: 100%;
            max-width: none;
            margin: 0;
            padding: 10px;
            overflow-y: auto; /* Enable scrollbar in fullscreen mode */
        }}
        .collapsible {{
            background-color: {highlight_color};
            color: {text_color};
            cursor: pointer;
            padding: 10px;
            width: 100%;
            border: none;
            text-align: left;
            outline: none;
            font-size: 16px;
            border-radius: 5px;
            transition: background-color 0.3s;
            margin-bottom: 5px;
            position: sticky; /* Make the button sticky */
            top: 0; /* Stick to the top */
            z-index: 2; /* Ensure it stays above other content */
        }}
        .active, .collapsible:hover {{
            background-color: {secondary_background_color};
        }}
        .content {{
            padding: 0 10px;
            display: none;
            overflow: visible !important;
            background-color: {background_color};
            margin-bottom: 10px;
            border-radius: 5px;
        }}
        pre {{
            white-space: pre-wrap;
            word-wrap: break-word;
            background-color: {highlight_color};
            padding: 10px;
            border-radius: 5px;
            overflow-x: auto;
        }}
        h2 {{
            margin-top: 20px;
            margin-bottom: 10px;
            color: {text_color};
            text-align: center;
        }}
        /* Styles for gallery */
        .gallery {{
            display: flex;
            flex-wrap: wrap;
            gap: 10px;
            margin-top: 20px;
            justify-content: center;
        }}
        .gallery img {{
            width: auto;
            height: auto;
            max-height: 300px;
            object-fit: cover;
            cursor: pointer;
            border-radius: 5px;
            transition: transform 0.2s;
        }}
        .gallery img:hover {{
            transform: scale(1.05);
        }}
        /* Image modal styles */
        .image-modal {{
            position: fixed;
            top: 0;
            left: 0;
            width: 100%;
            height: 100%;
            background-color: {overlay_background};
            display: flex;
            justify-content: center;
            align-items: center;
            z-index: 5;
        }}
        .image-modal img {{
            max-width: 90%;
            max-height: 90%;
            border-radius: 5px;
        }}
        /* Fullscreen Toggle Button */
        .fullscreen-button {{
            position: absolute;
            top: 10px;
            right: 10px;
            background-color: {button_color};
            color: {text_color};
            border: none;
            padding: 8px 12px;
            cursor: pointer;
            border-radius: 5px;
            z-index: 3;
        }}
        .fullscreen-button:hover {{
            background-color: {highlight_color};
        }}
        /* Navigation buttons in image modal */
        .nav-button {{
            position: absolute;
            top: 50%;
            transform: translateY(-50%);
            background-color: transparent;
            border: none;
            font-size: 48px;
            color: {arrow_color};
            cursor: pointer;
            z-index: 6;
            user-select: none;
        }}
        .nav-button:hover {{
            color: {text_color};
        }}
        .nav-button.left {{
            left: 20px;
        }}
        .nav-button.right {{
            right: 20px;
        }}
        """

    @staticmethod
    def generate_script():
        """
        Generates JavaScript code for interactivity, including collapsible sections,
        image modal with navigation, and fullscreen functionality.

        :return: A JavaScript program as a plain string, intended to be
            embedded in a <script> element of the generated artifact page.
            It wires up: (1) toggle behavior for .collapsible buttons,
            (2) a click-to-open image modal over the .gallery images with
            wrap-around prev/next navigation, and (3) a #fullscreen-button
            that toggles fullscreen on the .artifact-container with
            webkit/ms vendor fallbacks.
        """
        return """
        document.addEventListener('DOMContentLoaded', function() {
            // Collapsible sections
            var coll = document.getElementsByClassName('collapsible');
            for (var i = 0; i < coll.length; i++) {
                coll[i].addEventListener('click', function() {
                    this.classList.toggle('active');
                    var content = this.nextElementSibling;
                    if (content.style.display === 'block') {
                        content.style.display = 'none';
                    } else {
                        content.style.display = 'block';
                    }
                });
            }

            // Image modal with navigation
            var images = document.querySelectorAll('.gallery img');
            var currentIndex = 0;
            var imageSources = Array.from(images).map(img => img.src);

            images.forEach(function(img, index) {
                img.addEventListener('click', function(event) {
                    event.stopPropagation();
                    currentIndex = index;
                    openImageModal();
                });
            });

            function openImageModal() {
                // Create modal overlay
                var modalOverlay = document.createElement('div');
                modalOverlay.classList.add('image-modal');

                // Create image element
                var image = document.createElement('img');
                image.src = imageSources[currentIndex];

                // Append image to modal
                modalOverlay.appendChild(image);

                // Create navigation buttons
                var leftButton = document.createElement('button');
                leftButton.classList.add('nav-button', 'left');
                leftButton.innerHTML = '&#10094;'; // Left arrow
                modalOverlay.appendChild(leftButton);

                var rightButton = document.createElement('button');
                rightButton.classList.add('nav-button', 'right');
                rightButton.innerHTML = '&#10095;'; // Right arrow
                modalOverlay.appendChild(rightButton);

                // Event listeners for navigation buttons
                leftButton.addEventListener('click', function(event) {
                    event.stopPropagation();
                    currentIndex = (currentIndex - 1 + imageSources.length) % imageSources.length;
                    image.src = imageSources[currentIndex];
                });

                rightButton.addEventListener('click', function(event) {
                    event.stopPropagation();
                    currentIndex = (currentIndex + 1) % imageSources.length;
                    image.src = imageSources[currentIndex];
                });

                // Append modal to the fullscreen element or body
                var fullscreenElement = document.fullscreenElement || document.webkitFullscreenElement || document.mozFullScreenElement || document.msFullscreenElement;
                if (fullscreenElement) {
                    fullscreenElement.appendChild(modalOverlay);
                } else {
                    document.body.appendChild(modalOverlay);
                }

                // Click event to close modal
                modalOverlay.addEventListener('click', function(event) {
                    event.stopPropagation();
                    if (fullscreenElement) {
                        fullscreenElement.removeChild(modalOverlay);
                    } else {
                        document.body.removeChild(modalOverlay);
                    }
                });
            }

            // Fullscreen toggle button functionality
            var fullscreenButton = document.getElementById('fullscreen-button');
            if (fullscreenButton) {
                fullscreenButton.addEventListener('click', function(event) {
                    event.stopPropagation();
                    var artifactContainer = document.querySelector('.artifact-container');
                    if (!document.fullscreenElement) {
                        if (artifactContainer.requestFullscreen) {
                            artifactContainer.requestFullscreen();
                        } else if (artifactContainer.webkitRequestFullscreen) { /* Safari */
                            artifactContainer.webkitRequestFullscreen();
                        } else if (artifactContainer.msRequestFullscreen) { /* IE11 */
                            artifactContainer.msRequestFullscreen();
                        }
                    } else {
                        if (document.exitFullscreen) {
                            document.exitFullscreen();
                        } else if (document.webkitExitFullscreen) { /* Safari */
                            document.webkitExitFullscreen();
                        } else if (document.msExitFullscreen) { /* IE11 */
                            document.msExitFullscreen();
                        }
                    }
                });
            }
        });
        """

    @classmethod
    def create_middleware_html(
        cls,
        thinking_list: List[str],
        output_list: List[str],
        final_answer: str,
        image_descriptions: List[str],
        image_urls: List[str],
        dark_mode: bool,
    ) -> str:
        """
        Creates the final middleware HTML with updated navigation buttons in the fullscreen image preview.
        """
        # Combine all rounds of thinking content
        thinking_content = ""
        for idx, thinking in enumerate(thinking_list):
            thinking_markdown = markdown.markdown(thinking, extensions=["extra"])
            thinking_content += f"
\n

Round {idx + 1}

\n{thinking_markdown}\n
\n" # Combine all outputs output_content = "" for output in output_list: output_markdown = markdown.markdown(output, extensions=["extra"]) output_content += f"{output_markdown}\n" # Include final answer explicitly if needed if final_answer: final_answer_markdown = markdown.markdown( final_answer, extensions=["extra"] ) output_content += f"

Final Answer

{final_answer_markdown}

" # Include images in the output_html as a centered gallery gallery_html = "" if image_urls: gallery_html += "" output_content += gallery_html return f""" Reflection Filter Output
{thinking_content}
{output_content}
""" class Filter: class Valves(BaseModel): priority: int = Field( default=0, description="Priority level for the filter operations." ) dark_mode: bool = Field(default=True, description="Enable dark mode by default") valves: Valves viz_dir: str = "visualizations" html_dir: str = "html" system_prompt: str = ( """ You are an AI assistant designed to provide detailed, step-by-step responses while maintaining conciseness and clarity. Your outputs must adhere to the following strict structure: - **Use Exact Tags**: All tags must appear exactly as specified, each on a separate line with no additional text or formatting. ### **Response Structure** For each query, perform multiple rounds of reasoning (3 to 5 for simple tasks and 7 or more for complex tasks) following these steps: - **Analysis**: Analyze the query and outline your approach. - **Plan**: Present a step-by-step plan to address the query. - **Reasoning**: Use logical reasoning to break down your thoughts into clear steps. - **Incorporate Information**: Integrate relevant information from provided data or context. - **Problem Breakdown**: Break down complex problems into manageable sub-problems. **Confidence Level**: Provide a confidence score between 0-100% for this reasoning phase. - **Review**: Rephrase your reasoning to ensure clarity. - **Error Check**: Identify any potential errors or oversights. - **Adjustments**: Modify your conclusions based on identified errors. - **Depth**: Analyze how your reasoning affects future conclusions. **Confidence Level**: Provide a confidence score between 0-100% for this reflection phase. - **Evaluation**: Assess the quality and validity of your reasoning. - **Decision**: Decide whether to continue with another round or proceed to the final answer. 
- **Output**: Provide your decision in JSON format: { "title": "Discriminator Decision", "content": "Explanation of your evaluation and decision...", "confidence": 0-100, "next_action": "continue" or "final_answer" } **Confidence Level**: Provide a confidence score between 0-100% for this discriminator phase. **Repeat until you decide to proceed to the final answer.** - **Final Answer**: Present your final answer concisely. - **Formatting**: Use markdown elements appropriately to enhance readability. - **Image Description**: If images are relevant, include them using Markdown syntax: ![Alt Text](image_url) **Confidence Level**: Provide a confidence score of 100% for the final answer. ### **Guidelines** - **Language and Style**: - Use clear and concise language. - Maintain the same language as the query. - Keep a logical and thorough tone. - **Confidence and Validation**: - Provide confidence levels for each phase. - Proceed to the final answer only when confidence level is 100%. - **Content Considerations**: - If you don't know the answer, explain why. - **Additional Requirements**: - Ensure the reasoning process does not enter an infinite loop. - Maintain the integrity of the tag structure. - Avoid redundancy in your reasoning. """.strip() ) def __init__(self): self.logger = logging.getLogger(__name__) self.valves = self.Valves() def normalize_prompt(self, prompt: str) -> str: """ Normalizes the system prompt by removing excessive whitespace. """ return " ".join(prompt.split()) async def inlet( self, body: dict, __user__: dict = None, __event_emitter__: Callable[[Any], Awaitable[None]] = None, __model__: Optional[dict] = None, ) -> dict: """ Injects the system prompt into the conversation if it's not already present. 
""" if "messages" not in body: body["messages"] = [] normalized_system_prompt = self.normalize_prompt(self.system_prompt) existing_prompts = [ msg for msg in body["messages"] if msg.get("role") == "system" and self.normalize_prompt(msg.get("content", "")) == normalized_system_prompt ] if not existing_prompts: body["messages"].insert( 0, {"role": "system", "content": self.system_prompt} ) self.logger.debug( "Reflection Filter system prompt injected into the conversation." ) else: self.logger.debug( "Reflection Filter system prompt already present in the conversation." ) return body def ensure_chat_directory(self, chat_id: str, content_type: str) -> str: """ Ensures that the directory for storing artifacts exists. """ sanitized_chat_id = os.path.basename(chat_id) chat_dir = os.path.join( UPLOAD_DIR, self.viz_dir, content_type, sanitized_chat_id ) os.makedirs(chat_dir, exist_ok=True) self.logger.debug(f"Ensured chat directory exists at: {chat_dir}") return chat_dir def write_content_to_file( self, content: str, user_id: str, chat_id: str, content_type: str ) -> str: """ Writes the generated HTML content to a file and registers it with OpenWebUI. 
""" try: chat_dir = self.ensure_chat_directory(chat_id, content_type) filename = f"{content_type}_{uuid.uuid4()}.html" file_path = os.path.join(chat_dir, filename) with open(file_path, "w", encoding="utf-8") as f: f.write(content) relative_path = os.path.join(self.viz_dir, content_type, chat_id, filename) file_form = FileForm( id=str(uuid.uuid4()), filename=relative_path, meta={ "name": filename, "content_type": "text/html", "size": len(content), "path": file_path, }, ) file_id = Files.insert_new_file(user_id, file_form).id self.logger.debug( f"Written content to file: {file_path} with ID: {file_id}" ) return file_id except Exception as e: self.logger.error(f"Failed to write content to file: {e}") raise async def outlet( self, body: dict, __user__: dict = None, __event_emitter__: Callable[[Any], Awaitable[None]] = None, __call_llm__: Callable[[dict], Awaitable[dict]] = None, __model__: Optional[dict] = None, tool_ids: Optional[List[str]] = None, __tool_responses__: Optional[Dict[str, Any]] = None, ) -> dict: """ Processes the assistant's messages, extracts all rounds of reasoning, reflection, and discriminator, generates the HTML artifact, and updates the assistant's response with the HTML file ID. 
""" self.logger.debug("Starting outlet processing.") if "messages" not in body or not body["messages"]: self.logger.error("No messages found in the body.") return body assistant_messages = [ msg for msg in body["messages"] if msg.get("role") == "assistant" ] if not assistant_messages: self.logger.warning("No assistant messages found in the body.") return body all_artifacts = [] final_answer = "" image_descriptions = [] image_urls = [] thinking_list = [] output_list = [] tags = ["thinking", "reflection", "discriminator", "output"] for msg in assistant_messages: content = msg.get("content", "") self.logger.debug(f"Processing assistant message: {content[:100]}...") artifacts = extract_tag_content(content, tags) if artifacts: for artifact in artifacts: all_artifacts.append(artifact) # Combine thinking, reflection, and discriminator content as a single round thinking_round = "" for tag in ["thinking", "reflection", "discriminator"]: if artifact.get(tag): # Use markdown headers for each section thinking_round += ( f"### {tag.capitalize()}\n" + markdown.markdown(artifact.get(tag)) + "\n" ) thinking_list.append(thinking_round) if artifact.get("output"): output_list.append(artifact.get("output")) # Process discriminator decision discriminator_content = artifact.get("discriminator", "") try: # Extract JSON from discriminator content decision_match = re.search( r"\{.*\}", discriminator_content, re.DOTALL ) if decision_match: decision_json = decision_match.group(0) decision = json.loads(decision_json) if decision.get("next_action") == "final_answer": self.logger.debug( "Discriminator decided to proceed to final answer." 
) except (json.JSONDecodeError, AttributeError): self.logger.warning( f"Failed to parse discriminator decision JSON: {discriminator_content}" ) # Extract final answer and image descriptions final_answer_match = re.search( r"Final Answer:\s*(.*?)(?=(\*\*Confidence Level\*\*|$|Images))", content, re.DOTALL | re.IGNORECASE, ) if final_answer_match: final_answer = final_answer_match.group(1).strip() self.logger.debug(f"Extracted final answer: {final_answer}") # Extract image descriptions and URLs image_desc_matches = re.findall( r"Image Description:\s*(.*?)(?=(\n\n|$|!\[))", content, re.DOTALL | re.IGNORECASE, ) image_desc_matches = [desc[0].strip() for desc in image_desc_matches] image_url_matches = re.findall(r"!\[.*?\]\((.*?)\)", content) image_descriptions.extend(image_desc_matches) image_urls.extend(image_url_matches) if not all_artifacts: self.logger.warning("No tagged content found in assistant messages.") return body # Generate middleware HTML with all rounds middleware_content = MiddlewareHTMLGenerator.create_middleware_html( thinking_list, output_list, final_answer, image_descriptions, image_urls, self.valves.dark_mode, ) self.logger.debug("Middleware HTML content generated.") chat_id = body.get("chat_id") if chat_id and __user__ and "id" in __user__: try: middleware_id = self.write_content_to_file( middleware_content, __user__["id"], chat_id, self.html_dir, ) self.logger.debug( f"Middleware content written with ID: {middleware_id}" ) # Update assistant's response body["messages"][-1][ "content" ] = f"{{{{HTML_FILE_ID_{middleware_id}}}}}" self.logger.debug( "Assistant's response updated with HTML file placeholder." ) except Exception as e: error_msg = f"Error processing content: {str(e)}" self.logger.error(error_msg) body["messages"][-1][ "content" ] += f"\n\nError: Failed to process content." else: self.logger.error("chat_id or user ID is missing.") self.logger.debug("Outlet processing completed.") return body