Whitepaper
Docs
Sign In
Function
Function
pipe
v1.0.1
Gemini Imagen 3 Manifold Pipe
Function ID
google_genai_imagen_3
Creator
@intraweb
Downloads
62+
Function that generates images with Google AI's Imagen 3 model
Get
README
Function Code
Show
""" title: Gemini Imagen 3 (Generative Language API) description: Open WebUI function for generating images using Google's Imagen 3 model via the Generative Language API. author: Intraweb & Gemini 2.5 Flash funding_url: https://github.com/open-webui version: 1.0.1 """ import base64 import json import logging import re import time import uuid from typing import List, Dict, Any, Optional, Callable, Awaitable, Literal import httpx from fastapi import Request from pydantic import BaseModel, Field from starlette.responses import StreamingResponse class Pipe: class Valves(BaseModel): GEMINI_API_KEY: str = Field( default="", description="Your Google AI Studio API key" ) API_BASE_URL: str = Field( default="https://generativelanguage.googleapis.com/v1beta", description="Google Generative Language API base URL", ) MODEL: str = Field( default="imagen-3.0-generate-002", description="Imagen model to use" ) IMAGE_COUNT: int = Field( default=1, description="Number of images to generate (1-4)", ge=1, le=4 ) # Removed: ASPECT_RATIO Valve as the API endpoint does not support it REQUEST_TIMEOUT: int = Field( default=120, description="Request timeout in seconds" ) def __init__(self): self.valves = self.Valves() self.emitter: Optional[Callable[[dict], Awaitable[None]]] = None async def emit_status( self, message: str, done: bool = False, show_in_chat: bool = False ): """Emit status updates to the client.""" if self.emitter: await self.emitter( {"type": "status", "data": {"description": message, "done": done}} ) if show_in_chat: if done: return f"**✅ {message}**\n\n" else: return f"**⏳ {message}**\n\n" return "" async def pipes(self) -> List[dict]: return [ { "id": "gemini-imagen-3-gen-lang", "name": "Gemini Imagen 3 (Gen. Lang. 
API)", } ] async def pipe( self, body: dict, __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None, **kwargs, ) -> StreamingResponse: self.emitter = __event_emitter__ # Configuration from self.valves api_base_url = self.valves.API_BASE_URL.rstrip("/") api_key = self.valves.GEMINI_API_KEY model_id = self.valves.MODEL request_timeout = self.valves.REQUEST_TIMEOUT # Removed: ALLOWED_ASPECT_RATIOS list # --- Generative Language API Endpoint Path --- endpoint_path = f"/models/{model_id}:predict" api_url = f"{api_base_url}{endpoint_path}" # --- End Endpoint Path --- # --- Headers --- headers = {"Content-Type": "application/json", "x-goog-api-key": api_key} # --- End Headers --- async def stream_response(): try: messages = body.get("messages", []) is_stream = body.get("stream", False) prompt = self._extract_prompt(messages) # Get image count from function call body, fall back to Valve requested_image_count = body.get( "number_of_images", self.valves.IMAGE_COUNT ) image_count_to_use = min(max(requested_image_count, 1), 4) # Removed: Aspect ratio handling as API does not support it # Check for API key from Valves if not api_key: yield self._format_data( is_stream=is_stream, content="Error: Gemini API key not provided in Pipe Valves. Please configure your API key.", ) await self.emit_status("❌ API Key missing", True, True) return await self.emit_status("🖼️ Preparing image generation...") # --- Construct the JSON payload (Matching images.py) --- json_data = { "instances": {"prompt": prompt}, "parameters": { "sampleCount": image_count_to_use, "outputOptions": {"mimeType": "image/png"}, # Removed: "size" parameter as API does not support it }, } # --- End JSON Payload --- response = None async with httpx.AsyncClient(timeout=request_timeout) as client: print(f"Attempt 1/1: Trying Gen. Lang. API URL: {api_url}") print(f"Attempt 1 Request JSON: {json.dumps(json_data)}") print(f"Attempt 1 Headers: {headers}") try: await self.emit_status( f"🔄 Generating image... 
This may take a moment." ) response = await client.post( api_url, json=json_data, headers=headers ) if response.status_code != 200: error_message = f"Error from Gen. Lang. API: {response.status_code} - {response.text}" error_status = await self.emit_status( f"❌ Image generation failed", True, True ) print( f"Gen. Lang. API call failed with status {response.status_code}. Response: {response.text[:500]}..." ) yield self._format_data( is_stream=is_stream, content=f"{error_status}{error_message}", ) return else: print(f"Gen. Lang. API call successful with status 200.") except httpx.RequestError as e: logging.error( f"Gen. Lang. API Request Error for URL: {api_url} - {e}" ) await self.emit_status( f"❌ Gen. Lang. API Request Error", True, True ) yield self._format_data( is_stream=is_stream, content=f"❌ Gen. Lang. API Request Error: {e}", ) return try: response_data = response.json() except json.JSONDecodeError: error_message = f"Error: Received non-JSON response from Gen. Lang. API. Status: {response.status_code}, Content: {response.text[:500]}..." error_status = await self.emit_status( f"❌ Invalid API response format", True, True ) yield self._format_data( is_stream=is_stream, content=f"{error_status}{error_message}" ) print( f"Failed to parse JSON response. Status: {response.status_code}, Content: {response.text[:500]}..." 
) return await self.emit_status("✅ Image processing complete!", True) image_markdown = [] # --- Response Parsing (Based on images.py) --- predictions = response_data.get("predictions", []) for i, prediction in enumerate(predictions): if "bytesBase64Encoded" in prediction: b64_data = prediction.get("bytesBase64Encoded") mime_type = "image/png" if b64_data: image_markdown.append( f"" ) # Kept original parsing logic as fallback/alternative formats candidates = response_data.get("candidates", []) for i, candidate in enumerate(candidates): content = candidate.get("content", {}) parts = content.get("parts", []) for part in parts: if ( part.get("inlineData", {}) .get("mimeType", "") .startswith("image/") ): b64_data = part.get("inlineData", {}).get("data", "") if b64_data: mime_type = part.get("inlineData", {}).get( "mimeType", "image/png" ) image_markdown.append( f"" ) images_data_key = response_data.get("data", []) for i, img in enumerate(images_data_key): if "b64_json" in img: image_markdown.append( f"" ) elif "url" in img: image_markdown.append(f"") images_direct_key = response_data.get("images", []) for i, img in enumerate(images_direct_key): if "base64" in img: image_markdown.append( f"" ) elif "url" in img: image_markdown.append(f"") content = "\n\n".join(image_markdown) if not image_markdown: content = f"No images were found in the response. 
Raw response: ```json\n{json.dumps(response_data, indent=2)}\n```" await self.emit_status( f"⚠️ No images found in response.", True, True ) else: await self.emit_status( f"✨ Image generation successful!", True, True ) if is_stream: yield self._format_data( is_stream=True, model=model_id, content=content, ) yield self._format_data( is_stream=True, model=model_id, content=None ) else: yield self._format_data( is_stream=False, model=model_id, content=content ) except Exception as err: error_status = await self.emit_status( f"❌ An unexpected error occurred", True, True ) yield self._format_data( is_stream=body.get("stream", False), content=f"{error_status}Error processing image request: {str(err)}", ) return StreamingResponse(stream_response()) def _extract_prompt(self, messages: List[Dict[str, Any]]) -> str: """Extract prompt from messages.""" prompt = "" for message in reversed(messages): if message.get("role") != "user": continue content = message.get("content", "") if isinstance(content, list): for item in content: if item.get("type") == "text": prompt += item.get("text", "") + " " elif isinstance(content, str): prompt += content break return prompt.strip() def _format_data( self, is_stream: bool, model: str = "", content: Optional[str] = "", usage: Optional[dict] = None, ) -> str: """Format the response data in the expected OpenAI-compatible format.""" response_id = f"chat.{uuid.uuid4().hex}" object_type = "chat.completion.chunk" if is_stream else "chat.completion" created_time = int(time.time()) data = { "id": response_id, "object": object_type, "created": created_time, "model": model, } choices = [] if content is not None: choice_content = {"role": "assistant", "content": content} if is_stream: choice_delta = choice_content choice_message = None else: choice_delta = None choice_message = choice_content choices.append( { "finish_reason": ( "stop" if not is_stream and content is not None else None ), "index": 0, "delta": choice_delta, "message": choice_content if 
not is_stream else choice_delta, } ) elif is_stream and content is None: choices.append( { "finish_reason": "stop", "index": 0, "delta": {}, "message": None, } ) data["choices"] = choices if usage: data["usage"] = usage if is_stream: return f"data: {json.dumps(data)}\n\n" else: return json.dumps(data) # --- Open WebUI Entry Point --- pipe = Pipe()