NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance ✨

Function
pipe
v0.2
Flux 1.1 Pro
A function to use flux.1.1.pro model to generate vivid images, with together.ai API key
Function ID
fluxpro
Creator
@nickfixit
Downloads
441+

Function Content
python
"""
title: FLUX.1.1 Pro Manifold Function for Black Forest Lab Image Generation Models
author: mobilestack, credit to bgeneto
author_url: https://github.com/mobilestack/open-webui-flux-1.1-pro
funding_url: https://github.com/open-webui
version: 0.2
license: MIT
requirements: pydantic, requests
environment_variables: REPLICATE_API_TOKEN
supported providers: replicate.com
"""

import base64
import os
from typing import Any, Dict, Generator, Iterator, List, Union
import requests
from open_webui.utils.misc import get_last_user_message
from pydantic import BaseModel, Field

class Pipe:
    """
    Class representing the FLUX.1.1-pro Manifold Function.
    """
    class Valves(BaseModel):
        """
        Pydantic model for storing API keys and base URLs.
        """
        REPLICATE_API_TOKEN: str = Field(
            default="", description="Your API Token for Replicate"
        )
        REPLICATE_API_BASE_URL: str = Field(
            default="https://api.replicate.com/v1/predictions",
            description="Base URL for the Replicate API",
        )

    def __init__(self):
        """
        Initialize the Pipe class with default values and environment variables.
        """
        self.type = "manifold"
        self.id = "FLUX_1_1_PRO"
        self.name = "FLUX.1.1-pro: "
        self.valves = self.Valves(
            REPLICATE_API_TOKEN=os.getenv("REPLICATE_API_TOKEN", ""),
            REPLICATE_API_BASE_URL=os.getenv(
                "REPLICATE_API_BASE_URL",
                "https://api.replicate.com/v1/predictions",
            ),
        )

    # [Previous helper methods remain the same]

    def non_stream_response(
        self, headers: Dict[str, str], payload: Dict[str, Any]
    ) -> str:
        """
        Get a non-streaming response from the API.
        """
        try:
            # Create prediction
            response = requests.post(
                url=self.valves.REPLICATE_API_BASE_URL,
                headers=headers,
                json=payload,
                timeout=(3.05, 60),
            )
            response.raise_for_status()
            prediction = response.json()

            # Poll for completion
            while prediction['status'] not in ['succeeded', 'failed', 'canceled']:
                poll_url = f"{self.valves.REPLICATE_API_BASE_URL}/{prediction['id']}"
                response = requests.get(poll_url, headers=headers)
                prediction = response.json()
                if prediction['status'] == 'failed':
                    return f"Error: Generation failed: {prediction.get('error', 'Unknown error')}"

            # Handle the completed prediction
            if prediction['status'] == 'succeeded':
                # Replicate returns a URL directly
                image_url = prediction['output']
                # Download the image and convert to base64
                img_response = requests.get(image_url)
                img_response.raise_for_status()
                content_type = img_response.headers.get("Content-Type", "image/jpeg")
                image_base64 = base64.b64encode(img_response.content).decode("utf-8")
                return f"![Image](data:{content_type};base64,{image_base64})\n`GeneratedImage.{content_type.split('/')[-1]}`"
            
            return "Error: Image generation failed"

        except requests.exceptions.RequestException as e:
            return f"Error: Request failed: {e}"
        except Exception as e:
            return f"Error: {e}"

    def pipe(
        self, body: Dict[str, Any]
    ) -> Union[str, Generator[str, None, None], Iterator[str]]:
        """
        Process the pipe request.
        """
        headers = {
            "Authorization": f"Token {self.valves.REPLICATE_API_TOKEN}",
            "Content-Type": "application/json",
        }
        
        prompt = get_last_user_message(body["messages"])
        
        # Replicate-specific payload
        payload = {
            "version": "black-forest-labs/flux-1.1-pro-ultra",
            "input": {
                "prompt": prompt,
                "raw": False,
                "aspect_ratio": "1:1",
                "output_format": "jpg",
                "safety_tolerance": 2
            }
        }

        try:
            if body.get("stream", False):
                return self.stream_response(headers, payload)
            else:
                return self.non_stream_response(headers, payload)
        except requests.exceptions.RequestException as e:
            return f"Error: Request failed: {e}"
        except Exception as e:
            return f"Error: {e}"

    def pipes(self) -> List[Dict[str, str]]:
        """
        Get the list of available pipes.
        """
        return [{"id": "flux_1_1_pro", "name": "Flux 1.1 PRO"}]