Whitepaper
Docs
Sign In
Function
Function
pipe
v1.0
Flux Replicate Image Generator
Function ID
flux_replicate_image_generator
Creator
@ifioravanti
Downloads
49+
Flux Replicate Image Generator for Black Forest Labs Models through Replicate (Dev, Pro, Ultra)
Get
README
No README available
Function Code
Show
""" title: FLUX Image Generator Manifold Function for Black Forest Lab Image Generation Models through Replicate author: ivanfioravanti, credit to Balaxxe, mobilestack and bgeneto author_url: https://x.com/ivanfioravanti version: 1.0 license: MIT requirements: pydantic>=2.0.0, aiohttp>=3.8.0 valves: - REPLICATE_API_TOKEN (required) - FLUX_RAW_MODE (optional, default: false) - FLUX_SAFETY_TOLERANCE (optional, default: 2) - FLUX_SEED (optional) - FLUX_ASPECT_RATIO (optional, default: "1:1") - FLUX_OUTPUT_FORMAT (optional, default: "jpg") - MODEL_URL (optional, default: "https://api.replicate.com/v1/models/black-forest-labs/flux-1.1-pro-ultra/predictions") supported providers: replicate.com Users can select settings from OpenWeb UI valves to change model, aspect ratio, seed and more. NOTE: Due to the asynchronous nature of the Replicate API, each image generation will make 2-3 (rare occasion 4) API requests: 1. Initial request to start generation 2. Follow-up request(s) to check completion status This is normal behavior and required by the API design. You will typically see only 2 requests after the first generation. 
""" from typing import ( Dict, Generator, Iterator, Union, Optional, Literal, Tuple, List, AsyncIterator, ) from pydantic import BaseModel, Field import os import base64 import aiohttp import asyncio import json import uuid import time AspectRatioType = Literal[ "21:9", "16:9", "3:2", "4:3", "5:4", "1:1", "4:5", "3:4", "2:3", "9:16", "9:21" ] OutputFormatType = Literal["jpg", "png"] SafetyToleranceType = Literal[1, 2, 3, 4, 5, 6] ModelVersion = Literal["DEV", "PRO", "ULTRA"] # Model URLs for different versions ModelUrls = { "DEV": "https://api.replicate.com/v1/models/black-forest-labs/flux-dev/predictions", "PRO": "https://api.replicate.com/v1/models/black-forest-labs/flux-1.1-pro/predictions", "ULTRA": "https://api.replicate.com/v1/models/black-forest-labs/flux-1.1-pro-ultra/predictions", } class Pipe: """A pipe that generates images using Black Forest Lab's Image Generation Models.""" class Valves(BaseModel): REPLICATE_API_TOKEN: str = Field( default="", description="Your Replicate API token" ) FLUX_RAW_MODE: bool = Field( default=False, description="Enable raw mode for direct prompt input" ) FLUX_SAFETY_TOLERANCE: SafetyToleranceType = Field( default=2, description="Safety filter strength (1-6)" ) FLUX_SEED: Optional[int] = Field( default=None, description="Random seed for reproducible generations" ) FLUX_ASPECT_RATIO: AspectRatioType = Field( default="1:1", description="Output image aspect ratio" ) FLUX_OUTPUT_FORMAT: OutputFormatType = Field( default="jpg", description="Output image format" ) MODEL_VERSION: ModelVersion = Field( default="ULTRA", description="Version of the model to use" ) class UserValves(BaseModel): MODEL_VERSION: ModelVersion = Field( default="ULTRA", description="Version of the model to use" ) FLUX_RAW_MODE: bool = Field( default=False, description="Enable raw mode for direct prompt input" ) FLUX_SEED: Optional[int] = Field( default=None, description="Random seed for reproducible generations" ) FLUX_ASPECT_RATIO: AspectRatioType = Field( 
default="1:1", description="Output image aspect ratio" ) FLUX_OUTPUT_FORMAT: OutputFormatType = Field( default="jpg", description="Output image format" ) def __init__(self): self.type = "pipe" self.id = "flux" self.name = "Flux Image Generator" self.valves = self.Valves( REPLICATE_API_TOKEN=os.getenv("REPLICATE_API_TOKEN", ""), FLUX_RAW_MODE=bool(os.getenv("FLUX_RAW_MODE", False)), FLUX_SAFETY_TOLERANCE=int(os.getenv("FLUX_SAFETY_TOLERANCE", "2")), FLUX_SEED=int(os.getenv("FLUX_SEED")) if os.getenv("FLUX_SEED") else None, FLUX_ASPECT_RATIO=os.getenv("FLUX_ASPECT_RATIO", "1:1"), FLUX_OUTPUT_FORMAT=os.getenv("FLUX_OUTPUT_FORMAT", "jpg"), MODEL_VERSION=os.getenv("MODEL_VERSION", "ULTRA"), ) self.user_valves = self.UserValves() async def _process_image( self, url_or_data: str, prompt: str, params: Dict, stream: bool = True ) -> Union[str, List[str]]: """Process image data and return it in SSE format.""" # Fetch image data if URL provided if url_or_data.startswith("http"): async with aiohttp.ClientSession() as session: async with session.get(url_or_data, timeout=30) as response: response.raise_for_status() image_data = base64.b64encode(await response.read()).decode("utf-8") content_type = response.headers.get( "Content-Type", f"image/{params.get('FLUX_OUTPUT_FORMAT')}" ) image_url = f"data:{content_type};base64,{image_data}" else: image_url = url_or_data # Build response chunks responses = [] # Image container chunk responses.append( self._create_sse_chunk( f'<div class="generated-image-container">' f'<img src="{image_url}" alt="Generated Image" style="max-width: 100%; height: auto; border-radius: 8px; margin-bottom: 8px;" />' ) ) # Finish chunks responses.append(self._create_sse_chunk({}, finish_reason="stop")) responses.append("data: [DONE]\n\n") return responses if stream else "".join(responses) def _create_sse_chunk( self, content: Union[str, Dict], content_type: str = "text/html", finish_reason: Optional[str] = None, ) -> str: """Create a Server-Sent Events 
chunk.""" chunk_data = { "id": f"chatcmpl-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": "flux-1.1-pro-ultra", "choices": [ { "delta": ( {} if finish_reason else { "role": "assistant", "content": content, "content_type": content_type, } ), "index": 0, "finish_reason": finish_reason, } ], } return f"data: {json.dumps(chunk_data)}\n\n" async def _wait_for_completion( self, prediction_url: str, __event_emitter__=None ) -> Dict: headers = { "Authorization": f"Token {self.valves.REPLICATE_API_TOKEN}", "Accept": "application/json", "Prefer": "wait=30", } async with aiohttp.ClientSession() as session: await asyncio.sleep(2) async with session.get( prediction_url, headers=headers, timeout=35 ) as response: response.raise_for_status() result = await response.json() if result.get("status") in ["succeeded", "failed", "canceled"]: return result await asyncio.sleep(3) async with session.get( prediction_url, headers=headers, timeout=35 ) as response: response.raise_for_status() result = await response.json() if result.get("status") in ["succeeded", "failed", "canceled"]: return result await asyncio.sleep(3) async with session.get( prediction_url, headers=headers, timeout=35 ) as response: response.raise_for_status() final_result = await response.json() if final_result.get("status") in ["succeeded", "failed", "canceled"]: return final_result raise Exception( f"Generation incomplete after {final_result.get('status')} status" ) async def pipe( self, body: Dict, __user__: Optional[dict] = None, __event_emitter__=None ) -> AsyncIterator[str]: if not self.valves.REPLICATE_API_TOKEN: yield "Error: REPLICATE_API_TOKEN is required" return try: prompt = (body.get("messages", [{}])[-1].get("content", "") or "").strip() if not prompt: yield "Error: No prompt provided" return input_params = { "prompt": prompt, "raw": __user__["valves"].FLUX_RAW_MODE, "aspect_ratio": __user__["valves"].FLUX_ASPECT_RATIO, "output_format": 
__user__["valves"].FLUX_OUTPUT_FORMAT, "safety_tolerance": self.valves.FLUX_SAFETY_TOLERANCE, } if __user__["valves"].FLUX_SEED is not None: input_params["seed"] = int(__user__["valves"].FLUX_SEED) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Starting Flux {__user__['valves'].MODEL_VERSION} generation...", "done": False, }, } ) # Set MODEL_URL based on selected version model_url = ModelUrls[__user__["valves"].MODEL_VERSION] async with aiohttp.ClientSession() as session: async with session.post( model_url, headers={ "Authorization": f"Token {self.valves.REPLICATE_API_TOKEN}", "Content-Type": "application/json", "Prefer": "wait=30", }, json={"input": input_params}, timeout=35, ) as response: response.raise_for_status() prediction = await response.json() result = await self._wait_for_completion( prediction["urls"]["get"], __event_emitter__ ) if result.get("status") != "succeeded": raise Exception( f"Generation failed: {result.get('error', 'Unknown error')}" ) metrics = result.get("metrics", {}) logs = result.get("logs", "") seed = ( logs.split("Using seed:")[1].split()[0].strip() if "Using seed:" in logs else None ) image_url = result.get("output") # Handle case where output is a list if isinstance(image_url, list) and len(image_url) > 0: image_url = image_url[0] if not image_url: raise Exception("No valid output URL in prediction result") if __event_emitter__: await __event_emitter__( { "type": "message", "data": { "content": f"", "content_type": "text/markdown", }, } ) await __event_emitter__( { "type": "message", "data": { "content": f""" **Generation Details** - **Model Version:** {__user__["valves"].MODEL_VERSION} - **Prompt:** {prompt} - **Aspect Ratio:** {input_params["aspect_ratio"]} - **Format:** {input_params["output_format"]} - **Safety Level:** {input_params["safety_tolerance"]} - **Seed:** {seed or input_params.get("seed", "Random")} - **Generation Time:** {metrics.get("predict_time", "N/A")}s """, 
"content_type": "text/markdown", }, } ) await __event_emitter__( { "type": "status", "data": { "description": "Image generated successfully!", "done": True, }, } ) yield "" except Exception as e: error_msg = f"Error: {str(e)}" if __event_emitter__: await __event_emitter__( {"type": "status", "data": {"description": error_msg, "done": True}} ) yield error_msg