Function
pipe
v1.0
Image Gen Pipe
A streamlined image generation pipeline, handling prompts, periodic status updates, and error management, designed for seamless integration with OpenWebUI's image generation tools.
Function ID
image_gen_pipe
Creator
@matthewh
Downloads
8+

Function Content
python
"""
title: Image Generation Pipeline (Matthewh)
author: matthewh
version: 1.0
required_open_webui_version: 0.3.9

Instructions:
1. Configure the Image Generation Engine:
    - Go to Admin Panel > Settings > Images.
    - Set Image Generation Engine to "Default (Open AI)".
    - Use model "dall-e-3" with image size "1024x1024" and set steps to "4".
2. Enable the Image Gen Pipe:
    - Navigate to Workspace > Functions.
    - Ensure that the Image Gen Pipe is enabled.
3. Start a new chat using the Image Gen Pipe:
    - Click "New Chat".
    - Select "Image Gen Pipe" from the model dropdown list.
"""

from typing import Optional, Callable, Awaitable, Dict, Any, List
from pydantic import BaseModel, Field
import asyncio
import logging

from open_webui.apps.images.main import image_generations, GenerateImageForm
from open_webui.apps.webui.models.users import Users
from open_webui.utils.misc import get_last_user_message, pop_system_message


class Pipe:
    """Pipeline that turns the last user message of a chat into generated images.

    The prompt is forwarded to ``image_generations``. While that call is
    pending, an optional background task emits periodic status events through
    the event emitter. The outcome (images or an error) is reported through
    the emitter and also returned to the caller.
    """

    class Valves(BaseModel):
        """Configuration for the Image Generation Pipe."""

        emit_interval: float = Field(
            default=1.0, description="Interval in seconds between status emissions."
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions."
        )
        debug: bool = Field(
            default=False, description="Enable or disable debug logging."
        )

    def __init__(self):
        self.valves = self.Valves()
        # Fallback stop flag for callers that invoke emit_periodic_status()
        # without passing their own event. pipe() deliberately does not use
        # it: a flag shared by the whole instance would let overlapping
        # requests stop (or revive) each other's status emitters.
        self.stop_emitter = asyncio.Event()
        self.log = logging.getLogger(__name__)

    async def emit_periodic_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        message: str,
        interval: float,
        stop_event: Optional[asyncio.Event] = None,
    ):
        """Emit a status update every ``interval`` seconds until stopped.

        Args:
            __event_emitter__: Async callable that receives each status event.
            message: Base text; the elapsed time is appended to it.
            interval: Seconds to wait between two emissions.
            stop_event: Event whose setting ends the loop. Defaults to the
                instance-wide ``self.stop_emitter`` when omitted.
        """
        stop = self.stop_emitter if stop_event is None else stop_event
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        try:
            while not stop.is_set():
                elapsed_time = loop.time() - start_time
                await self.emit_status(
                    __event_emitter__,
                    f"{message} (elapsed: {elapsed_time:.1f}s)",
                    False,
                )
                # Wait on the stop event rather than sleeping blindly, so a
                # stop request ends the loop immediately instead of after up
                # to a full interval.
                try:
                    await asyncio.wait_for(stop.wait(), timeout=interval)
                except asyncio.TimeoutError:
                    pass  # Interval elapsed without a stop request.
        except asyncio.CancelledError:
            if self.valves.debug:
                self.log.debug("Periodic status emitter cancelled.")

    async def _stop_status_task(
        self,
        status_task: Optional["asyncio.Task"],
        stop_event: asyncio.Event,
    ) -> None:
        """Signal the status emitter to stop and wait until it has finished.

        Safe to call more than once and with ``status_task=None``. A failure
        inside the emitter is logged (in debug mode) rather than re-raised, so
        a broken status indicator cannot mask the real result of the request.
        """
        stop_event.set()
        if status_task is None:
            return
        try:
            await status_task
        except Exception as exc:
            if self.valves.debug:
                self.log.debug("Periodic status emitter failed: %s", exc)

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> Optional[Dict[str, Any]]:
        """Generate images for the last user message in ``body``.

        Args:
            body: Request payload; its ``"messages"`` list is searched for the
                last user message, which is used as the prompt.
            __user__: Dict describing the requesting user; its ``"id"`` is
                used to look up the user passed to ``image_generations``.
            __event_emitter__: Optional async callable receiving status and
                message events.

        Returns:
            ``{"images": images}`` on success, otherwise ``{"error": message}``.
        """
        status_task = None
        # One stop flag per request, so overlapping requests cannot interfere
        # with each other's status emitter.
        stop_event = asyncio.Event()
        try:
            # Start emitting status updates periodically
            if __event_emitter__ and self.valves.enable_status_indicator:
                status_task = asyncio.create_task(
                    self.emit_periodic_status(
                        __event_emitter__,
                        "Generating an image...",
                        self.valves.emit_interval,
                        stop_event,
                    )
                )

            # Extract prompt from the messages
            _, messages = pop_system_message(body.get("messages", []))
            prompt = get_last_user_message(messages)
            if prompt is None:
                # Stop the emitter first so no "in progress" status can follow
                # the final one.
                await self._stop_status_task(status_task, stop_event)
                await self.emit_status(
                    __event_emitter__, "No prompt provided for image generation.", True
                )
                return {"error": "No prompt provided for image generation."}

            if self.valves.debug:
                self.log.debug("[image_gen] Prompt: %s", prompt)
                self.log.debug("[image_gen] User: %s", __user__)

            # Fail with a readable message instead of a TypeError/KeyError
            # when no user information was supplied.
            user_id = (__user__ or {}).get("id")
            if user_id is None:
                raise ValueError("No user id provided for image generation.")

            # Generate images
            images = await image_generations(
                GenerateImageForm(prompt=prompt),
                Users.get_user_by_id(user_id),
            )

            # Stop the periodic status emitter before reporting completion
            await self._stop_status_task(status_task, stop_event)

            # Emit generated images to the event emitter
            if __event_emitter__:
                await self.emit_status(
                    __event_emitter__, "Image generation completed.", True
                )
                for image in images:
                    await __event_emitter__(
                        {
                            "type": "message",
                            "data": {"content": f"![Generated Image]({image['url']})"},
                        }
                    )

            return {"images": images}

        except Exception as e:
            # Stop the periodic status emitter before reporting the error
            await self._stop_status_task(status_task, stop_event)

            # Emit error status
            await self.emit_status(
                __event_emitter__, f"An error occurred: {e}", True
            )
            if self.valves.debug:
                self.log.error(
                    "Error during image generation: %s", e, exc_info=True
                )
            return {"error": str(e)}

        finally:
            # Safety net for exits the branches above do not cover, notably
            # cancellation of this coroutine (CancelledError is not an
            # Exception): never leave the background emitter running.
            await self._stop_status_task(status_task, stop_event)

    async def emit_status(
        self,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
        message: str,
        done: bool,
    ):
        """Send one ``status`` event to the emitter; do nothing without an emitter.

        Args:
            __event_emitter__: Async callable receiving the event, or ``None``.
            message: Text placed in the event's ``description`` field.
            done: Value placed in the event's ``done`` field.
        """
        if __event_emitter__:
            event = {
                "type": "status",
                "data": {"description": message, "done": done},
            }
            if self.valves.debug:
                self.log.debug("Emitting status event: %s", event)
            await __event_emitter__(event)