"""
title: Open-WebUI Reasoning Manifold
version: 0.4.3
- [x] Updated to work on OWUI 0.4.x
- [x] OpenAI Streaming
- [x] Ollama Streaming
- [x] Emit collapsible sections for generated thoughts
- [ ] Fix "cannot use 'in' operator to search for "detail" in "404: Model not f..." (cannot reproduce?)
- [ ] Stream output as soon as possible
- [ ] Delay output pending final LLM summary
"""
import os
import json
import time
import asyncio
from typing import List, Union, Optional, Callable, Awaitable, Dict, Any
from pydantic import BaseModel, Field
from open_webui.utils.misc import pop_system_message
from starlette.responses import StreamingResponse
from open_webui.main import (
generate_chat_completions,
get_task_model_id,
)
from open_webui.utils.misc import get_last_assistant_message, get_content_from_message
from open_webui.config import TASK_MODEL, TASK_MODEL_EXTERNAL
import logging
import inspect  # Used to inspect the get_task_model_id signature at runtime
# Mock user object expected by generate_chat_completions (OWUI 0.4.x constraint)
mock_user = {
"id": "reasoning_manifold",
"username": "reasoning_manifold",
"role": "admin",
}
class Pipe:
    # Compatibility shim for the get_task_model_id signature change between OWUI 0.3.x and 0.4.x
def resolve_task_model_id(self, model_id: str) -> str:
"""
Resolve the task model ID dynamically based on the version of get_task_model_id.
Supports:
- Older single-argument version
- Updated multi-argument version
- Overrides via valve configuration.
"""
try:
# Check for valve override
valve_override = self.valves.thought_summary_model_id
if valve_override != "default-task-model":
self.log_debug(
f"[THOUGHT_SUMMARY] valve override will be used: {valve_override}"
)
return valve_override
# Get the signature of the `get_task_model_id` function
sig = inspect.signature(get_task_model_id)
params = sig.parameters
task_model_id = ""
# Check the number of parameters and their names
            if len(params) == 1:
                # Single-argument version (0.3.x)
                self.log_debug("[THOUGHT_SUMMARY] Detected OWUI <= 0.3.x signature.")
                task_model_id = get_task_model_id(model_id)
            elif len(params) == 4:
                # Multi-argument version (0.4.x)
                from open_webui.main import get_all_base_models

                self.log_debug("[THOUGHT_SUMMARY] Detected OWUI >= 0.4.x signature.")
                self.log_debug(
                    f"[THOUGHT_SUMMARY] Selecting task model for '{model_id}' "
                    f"using TASK_MODEL={TASK_MODEL}, TASK_MODEL_EXTERNAL={TASK_MODEL_EXTERNAL}"
                )
                task_model_id = get_task_model_id(
                    default_model_id=model_id,
                    task_model=TASK_MODEL,
                    task_model_external=TASK_MODEL_EXTERNAL,
                    models=get_all_base_models(),
                )
            else:
                raise TypeError(
                    "Unexpected get_task_model_id signature: expected 1 or 4 parameters."
                )
            return task_model_id
except Exception as e:
            raise RuntimeError(f"Error resolving task model ID: {e}") from e
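    # For reference, the two signatures this shim accommodates (the 0.3.x parameter
    # name is an assumption; the 0.4.x keywords mirror the call above):
    #   0.3.x: get_task_model_id(model_id)
    #   0.4.x: get_task_model_id(default_model_id, task_model, task_model_external, models)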
class Valves(BaseModel):
"""
Configuration for the Open-WebUI Reasoning Manifold.
"""
# Model Configuration
model_ids: str = Field(
default="marco-o1",
description="Comma-separated list of model IDs to be used by the manifold.",
)
manifold_prefix: str = Field(
default="reason/",
description="Prefix used for model names.",
)
streaming_enabled: bool = Field(
default=True, description="Enable or disable streaming for responses."
)
# Thought Handling Configuration
thought_tag: str = Field(
default="Thought", description="The XML tag for internal thoughts."
)
output_tag: str = Field(
default="Output", description="The XML tag for final output."
)
# Status Update Configuration
use_collapsible: bool = Field(
default=False,
description="Collapsible UI to reveal generated thoughts.",
)
enable_llm_summaries: bool = Field(
default=False,
description="Enable LLM-generated summaries for updates.",
)
thought_summary_model_id: str = Field(
default="default-task-model",
description=(
"Optional override for the model ID used to generate thought summaries. "
"If 'default-task-model', will use the relevant task model from Admin Panel > Settings > Interface."
),
)
thought_summary_interval_tokens: int = Field(
default=10,
description="Number of tokens after which to generate a thought summary.",
)
dynamic_status_prompt: str = Field(
default="Summarize the current thought process in exactly 4 words.",
description="Prompt for generating LLM summaries.",
)
dynamic_status_system_prompt: str = Field(
default="You are a helpful assistant summarizing thoughts concisely, only respond with the summary.",
description="System prompt for dynamic status generation.",
)
emit_interval: float = Field(
default=3.0,
description="Interval in seconds between status updates.",
)
# Debugging
debug: bool = Field(default=False, description="Enable debug logging.")
def __init__(self):
"""
Initialize the Pipe with default valves and necessary state variables.
"""
self.type = "manifold"
self.id = "reason"
self.valves = self.Valves()
self.name = self.valves.manifold_prefix
self.stop_emitter = asyncio.Event()
# State Variables
self.thought_buffer = ""
self.output_buffer = ""
self.determined = False
self.inside_thought = False
self.inside_output = False
self.token_count_since_last_summary = 0
self.start_time = None
self.buffer = "" # Initialize the buffer
self.tracked_tokens = [] # Track tokens for dynamic summaries
self.last_summary_time = time.time() # Last time a summary was emitted
self.task_model_id = None # Will be set in pipe method
self.thought_summary_task = None # Task for generating summaries
self.messages = [] # Store the messages list
self.thought_tags = [] # List of thought tags per model
self.output_tags = [] # List of output tags per model
# Initialize current tags
self.current_thought_tag = (
self.valves.thought_tag
) # Default to valve's thought_tag
self.current_output_tag = (
self.valves.output_tag
) # Default to valve's output_tag
# Setup Logging
self.log = logging.getLogger(self.__class__.__name__)
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
if not self.log.handlers:
self.log.addHandler(handler)
self.log.setLevel(logging.DEBUG if self.valves.debug else logging.INFO)
# Parse tags based on model_ids
self.parse_tags()
def log_debug(self, message: str):
"""Log debug messages if debugging is enabled."""
if self.valves.debug:
self.log.debug(message)
def parse_tags(self):
"""
Parse the thought_tag and output_tag fields into lists matching the model_ids.
Supports both singular tags (applied to all models) and comma-separated lists.
"""
model_ids = [m.strip() for m in self.valves.model_ids.split(",") if m.strip()]
model_count = len(model_ids)
self.log_debug(f"Parsing tags for {model_count} models.")
# Parse thought_tags
thought_tags = [tag.strip() for tag in self.valves.thought_tag.split(",")]
if len(thought_tags) == 1:
self.thought_tags = thought_tags * model_count
elif len(thought_tags) == model_count:
self.thought_tags = thought_tags
else:
self.log.debug(
f"[TAG_ERROR] Number of thought_tags ({len(thought_tags)}) does not match number of model_ids ({model_count}). "
f"Defaulting all thought_tags to '{thought_tags[0]}'"
)
self.thought_tags = [thought_tags[0]] * model_count
self.log_debug(f"Parsed thought_tags: {self.thought_tags}")
# Parse output_tags
output_tags = [tag.strip() for tag in self.valves.output_tag.split(",")]
if len(output_tags) == 1:
self.output_tags = output_tags * model_count
elif len(output_tags) == model_count:
self.output_tags = output_tags
else:
self.log.debug(
f"[TAG_ERROR] Number of output_tags ({len(output_tags)}) does not match number of model_ids ({model_count}). "
f"Defaulting all output_tags to '{output_tags[0]}'"
)
self.output_tags = [output_tags[0]] * model_count
self.log_debug(f"Parsed output_tags: {self.output_tags}")
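        # Example of the resulting mapping (model IDs here are hypothetical):
        #   model_ids="marco-o1,qwq", thought_tag="Thought"       -> ["Thought", "Thought"]
        #   model_ids="marco-o1,qwq", thought_tag="Thought,Think" -> ["Thought", "Think"]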
def get_models(self) -> List[dict]:
"""List of models derived from the valve configuration."""
model_ids = [
model_id.strip()
for model_id in self.valves.model_ids.split(",")
if model_id.strip()
]
return [{"id": model_id, "name": model_id} for model_id in model_ids]
def pipes(self) -> List[dict]:
"""Return the list of model pipes."""
return self.get_models()
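    # With the default valves, pipes() returns e.g.:
    #   [{"id": "marco-o1", "name": "marco-o1"}]
    # which Open-WebUI then presumably exposes under the manifold prefix (e.g. "reason/marco-o1").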
def get_summary_model_id(self) -> str:
"""
        Determine the model ID to use for generating summaries.
        Returns `thought_summary_model_id` when it overrides the default sentinel
        value; otherwise falls back to the resolved `task_model_id`.
        """
        if self.valves.thought_summary_model_id != "default-task-model":
            self.log_debug(
                f"[SUMMARY_CHECK] Using override model ID for summary: {self.valves.thought_summary_model_id}"
            )
            return self.valves.thought_summary_model_id
else:
self.log_debug(
f"[SUMMARY_CHECK] Using dynamic task model ID: {self.task_model_id}"
)
return self.task_model_id or ""
async def pipe(
self,
body: dict,
__user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
) -> Union[str, StreamingResponse]:
"""
Main handler for processing requests.
"""
# Reset state variables for each new request
self.thought_buffer = ""
self.output_buffer = ""
self.determined = False
self.inside_thought = False
self.inside_output = False
self.token_count_since_last_summary = 0
self.tracked_tokens = []
self.last_summary_time = time.time()
self.start_time = time.time()
self.stop_emitter.clear()
self.thought_summary_task = None
self.buffer = "" # Reset buffer
self.task_model_id = None # Reset task model ID
self.messages = [] # Reset messages list
# Reset current tags
self.current_thought_tag = (
self.valves.thought_tag
) # Default to valve's thought_tag
self.current_output_tag = (
self.valves.output_tag
) # Default to valve's output_tag
try:
# Emit the initial "Thinking" status immediately
if __event_emitter__:
await self.emit_status(__event_emitter__, level="Info", initial=True)
# Extract model ID and clean prefix by stripping up to the first dot
model_id = body.get("model", "")
self.log_debug(f"Original model_id: {model_id}")
if "." in model_id:
model_id = model_id.split(".", 1)[1]
self.log_debug(f"Stripped model_id: {model_id}")
else:
self.log_debug(
f"model_id does not contain a dot and remains unchanged: {model_id}"
)
# Determine the task model ID dynamically, considering valve overrides
try:
self.task_model_id = self.resolve_task_model_id(model_id)
self.log_debug(f"Resolved task_model_id: {self.task_model_id}")
except RuntimeError as e:
self.log_debug(f"Failed to resolve task model ID: {e}")
self.task_model_id = model_id # Fallback to original model_id
self.log_debug(f"Selected task_model_id: {self.task_model_id}")
# Handle system messages if present (assumes system messages are separated)
system_message, messages = pop_system_message(body.get("messages", []))
self.log_debug(f"System message: {system_message}")
self.log_debug(
f"Number of messages after popping system message: {len(messages)}"
)
self.messages = messages # Store messages for later modification
processed_messages = [
{"role": message["role"], "content": message.get("content", "")}
for message in messages
]
payload = {
"model": model_id,
"messages": processed_messages,
}
self.log_debug(f"Payload prepared for model '{model_id}': {payload}")
if self.valves.streaming_enabled:
self.log_debug("Streaming is enabled. Handling stream response.")
response = await self.stream_response(payload, __event_emitter__)
else:
self.log_debug("Streaming is disabled. Handling non-stream response.")
response = await self.non_stream_response(payload, __event_emitter__)
self.log_debug("Response handling completed.")
return response
except json.JSONDecodeError as e:
if __event_emitter__:
await self.emit_status(
__event_emitter__, "Error", f"JSON Decode Error: {str(e)}", True
)
self.log_debug(f"JSON Decode Error in pipe method: {e}")
return f"JSON Decode Error: {e}"
except Exception as e:
if __event_emitter__:
await self.emit_status(
__event_emitter__, "Error", f"Error: {str(e)}", True
)
self.log_debug(f"Error in pipe method: {e}")
return f"Error: {e}"
async def stream_response(self, payload, __event_emitter__) -> StreamingResponse:
"""
Handle streaming responses from generate_chat_completions.
"""
async def response_generator():
self.log_debug(
"[STREAM_RESPONSE] Entered response_generator for stream_response."
)
try:
# Ensure streaming is enabled in the payload
payload["stream"] = True
self.log_debug(
f"[STREAM_RESPONSE] Payload for generate_chat_completions: {payload}"
)
# Call the generate_chat_completions function
stream_response = await generate_chat_completions(
form_data=payload, user=mock_user, bypass_filter=True
)
self.log_debug(
"[STREAM_RESPONSE] generate_chat_completions called successfully."
)
# Ensure the response is a StreamingResponse
if isinstance(stream_response, StreamingResponse):
self.log_debug(
"[STREAM_RESPONSE] Received StreamingResponse from generate_chat_completions."
)
# Iterate over the body_iterator of the StreamingResponse
async for chunk in stream_response.body_iterator:
try:
# Process the chunk based on its type (bytes or str)
if isinstance(chunk, bytes):
chunk_str = chunk.decode("utf-8").rstrip("\n")
elif isinstance(chunk, str):
chunk_str = chunk.rstrip("\n")
else:
self.log_debug(
f"[STREAM_RESPONSE] Unexpected chunk type: {type(chunk)}"
)
continue
self.log_debug(
f"[STREAM_RESPONSE] Raw chunk processed: {chunk_str}"
)
# Skip empty chunks
if not chunk_str:
self.log_debug(
"[STREAM_RESPONSE] Empty chunk received. Skipping."
)
continue
# SSE format starts with 'data: '
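                            # Illustrative SSE lines handled below (shape assumed from
                            # the OpenAI-style parsing that follows):
                            #   data: {"choices": [{"delta": {"content": "token"}}]}
                            #   data: [DONE]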
if chunk_str.startswith("data:"):
data_str = chunk_str[5:].lstrip()
self.log_debug(
f"[STREAM_RESPONSE] Extracted data_str: {data_str}"
)
# Handle the "[DONE]" marker
if data_str == "[DONE]":
self.log_debug(
"[STREAM_RESPONSE] Received [DONE]. Finalizing stream."
)
# Process any remaining buffer content
if self.buffer.strip():
try:
self.log_debug(
"[STREAM_RESPONSE] Processing remaining buffer content."
)
await self.process_streaming_data(
"", __event_emitter__
)
except AttributeError as e:
self.log_debug(
f"[STREAM_RESPONSE] Error: {e}. Ensure process_streaming_data is defined and accessible."
)
self.buffer = ""
# Emit any remaining thought content
if self.thought_buffer.strip():
self.log_debug(
f"[STREAM_RESPONSE] Emitting remaining thought content: {self.thought_buffer}"
)
if self.valves.use_collapsible:
await self.emit_thought(
__event_emitter__, self.thought_buffer
)
self.thought_buffer = ""
# Emit final status or collapsible enclosure
await self.emit_final_status_or_update(
__event_emitter__
)
break
# Parse the JSON content
try:
chunk_data = json.loads(data_str)
self.log_debug(
f"[STREAM_RESPONSE] Parsed chunk data: {chunk_data}"
)
except json.JSONDecodeError as parse_error:
self.log_debug(
f"[STREAM_RESPONSE] Failed to parse chunk: {chunk_str}. Error: {parse_error}"
)
continue
# Extract content from the chunk
data = (
chunk_data.get("choices", [{}])[0]
.get("delta", {})
.get("content", "")
)
self.log_debug(
f"[STREAM_RESPONSE] Extracted data from chunk: {data}"
)
# Process the data if it exists
if data:
# Track tokens for dynamic summaries
self.tracked_tokens.append(data)
self.log_debug(
f"[STREAM_RESPONSE] Appended data to tracked_tokens: {data}"
)
# Process the data for tag detection and buffering
await self.process_streaming_data(
data, __event_emitter__
)
# Check if it's time to trigger an update
current_time = time.time()
time_since_last_summary = (
current_time - self.last_summary_time
)
if (
time_since_last_summary
>= self.valves.emit_interval
):
self.last_summary_time = current_time
await self.trigger_update(__event_emitter__)
else:
# Handle unexpected data format
self.log_debug(
f"[STREAM_RESPONSE] Unexpected chunk format: {chunk_str}"
)
except Exception as e:
# Handle errors in processing chunks
self.log_debug(
f"[STREAM_RESPONSE] Error processing chunk: {chunk_str}. Error: {e}"
)
continue
else:
self.log_debug(
"[STREAM_RESPONSE] Expected a StreamingResponse but got something else."
)
# Yield nothing as we are managing emissions via event emitter
return
except Exception as e:
# Handle errors in the response generator
self.log_debug(f"[STREAM_RESPONSE] Error in response_generator: {e}")
                yield f"data: {json.dumps({'error': str(e)})}\n\n"
self.log_debug("[STREAM_RESPONSE] Exiting response_generator.")
self.log_debug(
"[STREAM_RESPONSE] Returning StreamingResponse from stream_response."
)
return StreamingResponse(response_generator(), media_type="text/event-stream")
async def process_streaming_data(self, data: str, __event_emitter__) -> None:
"""
Process incoming streaming data, handling and