"""
title: Image Generation Pipeline (Matthewh)
author: matthewh
version: 1.0
required_open_webui_version: 0.3.9
Instructions:
1. Configure the Image Generation Engine:
- Go to Admin Panel > Settings > Images.
- Set Image Generation Engine to "Default (Open AI)".
- Use model "dall-e-3" with image size "1024x1024" and set steps to "4".
2. Enable the Image Gen Pipe:
- Navigate to Workspace > Functions.
- Ensure that the Image Gen Pipe is enabled.
3. Start a new chat using the Image Gen Pipe:
- Click "New Chat".
- Select "Image Gen Pipe" from the model dropdown list.
"""
from typing import Optional, Callable, Awaitable, Dict, Any, List
from pydantic import BaseModel, Field
import asyncio
import logging
from open_webui.apps.images.main import image_generations, GenerateImageForm
from open_webui.apps.webui.models.users import Users
from open_webui.utils.misc import get_last_user_message, pop_system_message
class Pipe:
    """Pipeline for handling image generation requests.

    While an image is being generated, a background task periodically emits
    a "status" event so the client can show progress. Once generation
    finishes (or fails), the background task is stopped, a final status is
    emitted, and each generated image is sent as a markdown "message" event.
    """

    class Valves(BaseModel):
        """Configuration for the Image Generation Pipe."""

        emit_interval: float = Field(
            default=1.0, description="Interval in seconds between status emissions."
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions."
        )
        debug: bool = Field(
            default=False, description="Enable or disable debug logging."
        )

    def __init__(self):
        """Create the default valves, the emitter stop flag and the logger."""
        self.valves = self.Valves()
        # NOTE(review): this event lives on the instance, so overlapping
        # `pipe` calls on the same instance share one stop flag -- confirm
        # whether the host ever runs calls concurrently on one Pipe.
        self.stop_emitter = asyncio.Event()
        self.log = logging.getLogger(__name__)

    async def emit_periodic_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        message: str,
        interval: float,
    ):
        """Emit status updates periodically until the stop event is set.

        Args:
            __event_emitter__: Async callable that receives each status event.
            message: Base text of the status; the elapsed time is appended.
            interval: Maximum number of seconds to wait between emissions.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        try:
            while not self.stop_emitter.is_set():
                elapsed_time = loop.time() - start_time
                await self.emit_status(
                    __event_emitter__,
                    f"{message} (elapsed: {elapsed_time:.1f}s)",
                    False,
                )
                # Wait on the stop event instead of sleeping blindly so the
                # task finishes as soon as it is asked to stop, rather than
                # holding the caller up for the remainder of the interval.
                try:
                    await asyncio.wait_for(
                        self.stop_emitter.wait(), timeout=interval
                    )
                except asyncio.TimeoutError:
                    # Interval elapsed without a stop request: emit again.
                    pass
        except asyncio.CancelledError:
            # Cancellation is treated as a normal way to end this
            # standalone background task, so it is not re-raised.
            if self.valves.debug:
                self.log.debug("Periodic status emitter cancelled.")

    async def _stop_status_task(self, status_task: Optional["asyncio.Task"]) -> None:
        """Ask the periodic emitter to stop and wait until it has finished.

        Safe to call with ``None`` or with an already finished task. An
        exception raised inside the emitter task is logged rather than
        propagated, so it cannot mask the result of the image generation.
        """
        if status_task is None:
            return
        self.stop_emitter.set()
        try:
            await status_task
        except Exception as exc:
            self.log.warning("Periodic status emitter failed: %s", exc)

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> Optional[Dict[str, Any]]:
        """Main handler for image generation requests.

        Args:
            body: Request payload; the prompt is taken from the last user
                message found under its ``"messages"`` key.
            __user__: Dict describing the requesting user; its ``"id"`` is
                used to look the user up before generating.
            __event_emitter__: Optional async callable that receives status
                and message events. When omitted, no events are emitted.

        Returns:
            ``{"images": images}`` on success, or ``{"error": message}`` when
            no prompt is available or generation fails.
        """
        status_task = None
        try:
            # Start emitting status updates periodically
            if __event_emitter__ and self.valves.enable_status_indicator:
                self.stop_emitter.clear()
                status_task = asyncio.create_task(
                    self.emit_periodic_status(
                        __event_emitter__,
                        "Generating an image...",
                        self.valves.emit_interval,
                    )
                )

            # Extract prompt from the messages
            _, messages = pop_system_message(body.get("messages", []))
            prompt = get_last_user_message(messages)
            if prompt is None:
                # Stop the emitter first; otherwise it would keep running
                # after this early return and override the final status.
                await self._stop_status_task(status_task)
                await self.emit_status(
                    __event_emitter__, "No prompt provided for image generation.", True
                )
                return {"error": "No prompt provided for image generation."}

            if self.valves.debug:
                self.log.debug(f"[image_gen] Prompt: {prompt}")
                self.log.debug(f"[image_gen] User: {__user__}")

            if not __user__ or "id" not in __user__:
                # Fail with a readable message instead of a bare
                # TypeError/KeyError from the subscript below.
                raise ValueError("User information is required for image generation.")

            # Generate images
            images = await image_generations(
                GenerateImageForm(prompt=prompt),
                Users.get_user_by_id(__user__["id"]),
            )

            # Stop the periodic status emitter before the final status so a
            # late periodic update cannot overwrite the "done" state.
            await self._stop_status_task(status_task)

            # Emit generated images to the event emitter
            if __event_emitter__:
                await self.emit_status(
                    __event_emitter__, "Image generation completed.", True
                )
                for image in images:
                    await __event_emitter__(
                        {
                            "type": "message",
                            "data": {"content": f"![Generated Image]({image['url']})"},
                        }
                    )
            return {"images": images}
        except Exception as e:
            # Stop the periodic status emitter
            await self._stop_status_task(status_task)
            # Emit error status
            await self.emit_status(
                __event_emitter__, f"An error occurred: {str(e)}", True
            )
            if self.valves.debug:
                self.log.error(f"Error during image generation: {e}")
            return {"error": str(e)}
        finally:
            # Only reached with a live task when something that is not an
            # ``Exception`` (e.g. cancellation of this coroutine) bypassed
            # the handlers above; make sure the background task is not leaked.
            if status_task is not None and not status_task.done():
                self.stop_emitter.set()
                status_task.cancel()

    async def emit_status(
        self,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
        message: str,
        done: bool,
    ):
        """Emit a single status event to the event emitter.

        Does nothing when ``__event_emitter__`` is falsy.

        Args:
            __event_emitter__: Async callable that receives the event, or None.
            message: Text placed in the event's ``description`` field.
            done: Value placed in the event's ``done`` field.
        """
        if __event_emitter__:
            event = {
                "type": "status",
                "data": {"description": message, "done": done},
            }
            if self.valves.debug:
                self.log.debug(f"Emitting status event: {event}")
            await __event_emitter__(event)