Whitepaper
Docs
Sign In
Function
Function
pipe
v0.1.0
Google Gemini 2.0 Flash Image Generation
Function ID
google_gemini_20_flash_image_generation
Creator
@myooooo
Downloads
192+
Generates images using Gemini 2.0 Flash with the Google Genai SDK, stores them in the static folder, and embeds them into the chat
Get
README
No README available
Function Code
Show
"""
title: Google Gemini 2.0 Flash Image Generation
author: Myooooo
author_url: https://github.com/Myooooo
date: 2025-04-11
version: 0.1.0
license: MIT
description: Generates image using Gemini 2.0 Flash with Google Genai SDK, stores into static folder and embed into chat.
requirements: google-genai
"""

import base64
import json
import os
import re
import secrets
import string
from io import BytesIO
from itertools import zip_longest
from pathlib import Path
from typing import Iterator, Optional, Union

from google import genai
from google.genai import types
from PIL import Image
from pydantic import BaseModel, Field

from open_webui.config import STATIC_DIR

DEBUG = False

# Markdown image embed as emitted by Pipe.pipe(); the same pattern is used to
# find previously generated images inside the chat history.
_IMAGE_URL_PATTERN = re.compile(r"!\[image]\((.*?)\)")

# Default value of Valves.GEMINI_API_KEY; treated the same as "not configured".
_PLACEHOLDER_API_KEY = "YOUR_API_KEY"


class Pipe:
    """Pipe that sends a chat to a Gemini model and embeds the returned images.

    Generated images are written below ``STATIC_DIR/image/gemini/<user_id>/``
    and referenced from the reply as ``![image](<url>)`` markdown. On later
    turns those embeds are stripped from the message text and the matching
    local files are re-attached as inline image data.
    """

    class Valves(BaseModel):
        # API key passed to genai.Client.
        GEMINI_API_KEY: str = Field(
            default="YOUR_API_KEY",
            description="GEMINI API KEY",
        )
        # Model passed to generate_content and advertised by pipes().
        GEMINI_API_MODEL: str = Field(
            default="gemini-2.0-flash-exp-image-generation",
            description="Model Name, default to gemini-2.0-flash-exp-image-generation",
        )
        # Prefix placed in front of "/static/image/gemini/..." in image URLs.
        DOMAIN_NAME: str = Field(
            default="localhost",
            description="Your Domain Name",
        )

    def __init__(self):
        self.valves = self.Valves()
        self.id = "google_genai"
        self.type = "manifold"
        self.name = "Google/"
        # Created on every pipe() call so valve changes take effect.
        self.client = None
        self.safety_settings = [
            types.SafetySetting(
                category="HARM_CATEGORY_HATE_SPEECH", threshold="BLOCK_NONE"
            ),
            types.SafetySetting(
                category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="BLOCK_NONE"
            ),
            types.SafetySetting(
                category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_NONE"
            ),
            types.SafetySetting(
                category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_NONE"
            ),
            types.SafetySetting(
                category="HARM_CATEGORY_CIVIC_INTEGRITY", threshold="BLOCK_NONE"
            ),
        ]
        self.IMAGE_STATIC_DIR = Path(STATIC_DIR).joinpath("./image/gemini/")
        self.IMAGE_STATIC_DIR.mkdir(parents=True, exist_ok=True)

    def txt2img(self, contents, config, user_id):
        """Call the model and persist every returned image for ``user_id``.

        Args:
            contents: Conversation contents passed to ``generate_content``.
            config: Generation config passed to ``generate_content``.
            user_id: Used as the per-user sub-directory and URL segment.

        Returns:
            ``(texts, image_paths)``: the text parts of the first candidate and
            the URLs of the images that were saved successfully.

        Raises:
            ValueError: If the response has no candidate with content parts.
        """
        # create user path
        user_path = self.IMAGE_STATIC_DIR.joinpath(f"./{user_id}")
        user_path.mkdir(parents=True, exist_ok=True)

        response = self.client.models.generate_content(
            model=self.valves.GEMINI_API_MODEL,
            contents=contents,
            config=config,
        )

        # candidates / content / parts may all be absent (e.g. blocked or empty
        # response); fail with a readable message instead of a TypeError.
        candidates = response.candidates or []
        content = candidates[0].content if candidates else None
        if content is None or not content.parts:
            raise ValueError(
                "Gemini returned no content (the response was empty or blocked)"
            )

        texts = []
        image_paths = []
        for part in content.parts:
            if part.text is not None:
                texts.append(part.text)
            elif part.inline_data is not None:
                # generate random string nonce and use hashed prompt as image name
                nonce = "".join(
                    secrets.choice(string.ascii_letters + string.digits)
                    for _ in range(4)
                )
                contents_str = json.dumps(contents, sort_keys=True)
                image_name = f"{hash(contents_str + nonce)}.png"
                image_path = os.path.join(user_path, image_name)

                try:
                    image = Image.open(BytesIO(part.inline_data.data))
                    image.save(image_path)
                    image_paths.append(
                        f"{self.valves.DOMAIN_NAME}/static/image/gemini/{user_id}/{image_name}"
                    )
                except Exception as e:
                    # Best effort: one undecodable image must not drop the
                    # remaining parts of the response.
                    if DEBUG:
                        print(f"Error processing image data: {e}")
                    continue

        return texts, image_paths

    def pipes(self):
        """Return the single model entry exposed by this pipe."""
        return [
            {
                "id": self.valves.GEMINI_API_MODEL,
                "name": self.valves.GEMINI_API_MODEL,
            }
        ]

    @staticmethod
    def _gemini_role(role: str) -> str:
        """Map a chat role to the role sent to Gemini ("user" or "model")."""
        return "user" if role == "user" else "model"

    @staticmethod
    def _data_url_to_part(data_url: str) -> dict:
        """Turn a ``data:image/...;base64,<payload>`` URL into an inline_data part."""
        header, _, payload = data_url.partition(",")
        # header looks like "data:image/png;base64"; keep only the mime type.
        mime_type = header[len("data:"):].split(";", 1)[0]
        if "/" not in mime_type:
            mime_type = "image/jpeg"
        return {"inline_data": {"mime_type": mime_type, "data": payload}}

    def _load_user_image_part(self, url: str, user_id) -> Optional[dict]:
        """Load a previously stored image by URL basename; None if unreadable."""
        # Only the basename is used, so the lookup stays inside the user's
        # own image directory regardless of what the URL contains.
        image_name = os.path.basename(url)
        user_path = self.IMAGE_STATIC_DIR.joinpath(f"./{user_id}")
        image_path = os.path.join(user_path, image_name)

        try:
            with open(image_path, "rb") as image_file:
                image_data = base64.b64encode(image_file.read()).decode("utf-8")
        except OSError as e:
            # Missing/foreign image references should not fail the whole chat.
            if DEBUG:
                print(f"Skipping unreadable image {image_path}: {e}")
            return None

        return {"inline_data": {"mime_type": "image/png", "data": image_data}}

    def _text_to_parts(self, text: Optional[str], user_id) -> list:
        """Split text into a text part plus one image part per embedded image."""
        text = text or ""
        urls = _IMAGE_URL_PATTERN.findall(text)
        # remove url from text and append to parts
        parts = [{"text": _IMAGE_URL_PATTERN.sub("", text)}]
        for url in urls:
            image_part = self._load_user_image_part(url, user_id)
            if image_part is not None:
                parts.append(image_part)
        return parts

    def _build_contents(self, messages: list, user_id) -> list:
        """Convert non-system chat messages into Gemini ``contents`` entries."""
        contents = []
        for message in messages:
            if message["role"] == "system":
                continue

            content = message.get("content")
            if isinstance(content, list):
                parts = []
                for item in content:
                    if item["type"] == "text":
                        parts.extend(self._text_to_parts(item["text"], user_id))
                    elif item["type"] == "image_url":
                        image_url = item["image_url"]["url"]
                        if image_url.startswith("data:image"):
                            parts.append(self._data_url_to_part(image_url))
                        else:
                            # NOTE(review): non-data URLs are forwarded under an
                            # "image_url" key unchanged; confirm the SDK accepts
                            # this part shape.
                            parts.append({"image_url": image_url})
            else:
                parts = self._text_to_parts(content, user_id)

            contents.append(
                {"role": self._gemini_role(message["role"]), "parts": parts}
            )
        return contents

    def pipe(self, body: dict, __user__: dict) -> Union[str, Iterator[str]]:
        """Generate text and images for a chat request.

        Args:
            body: Request body; reads ``model``, ``messages`` and the optional
                ``temperature``, ``top_p``, ``top_k``, ``max_tokens``, ``stop``.
            __user__: Requesting user; ``__user__["id"]`` selects the image
                directory.

        Returns:
            The reply text with one ``![image](url)`` line per generated image,
            or an error string if the key is missing or the request fails.
        """
        # check api key
        api_key = self.valves.GEMINI_API_KEY
        if not api_key or api_key == _PLACEHOLDER_API_KEY:
            return "Error: GEMINI_API_KEY is not set"

        # initialize client
        self.client = genai.Client(api_key=api_key)

        try:
            model_id = body["model"]
            messages = body["messages"]
            user_id = __user__["id"]
            # Streaming is not supported; the response is always returned whole.
            stream = False

            if DEBUG:
                print("\nIncoming body:\n", str(body))
                print("User:", user_id)

            system_message = next(
                (msg["content"] for msg in messages if msg["role"] == "system"), None
            )

            contents = self._build_contents(messages, user_id)

            if system_message:
                # The system prompt is sent as a leading user turn.
                contents.insert(
                    0,
                    {"role": "user", "parts": [{"text": f"System: {system_message}"}]},
                )

            config = types.GenerateContentConfig(
                response_modalities=["Text", "Image"],
                safety_settings=self.safety_settings,
                temperature=body.get("temperature", 0.7),
                top_p=body.get("top_p", 0.9),
                top_k=body.get("top_k", 40),
                max_output_tokens=body.get("max_tokens", 8192),
                stop_sequences=body.get("stop", []),
            )

            if DEBUG:
                print("\nGoogle API request:")
                print(" Model:", model_id)
                print(" Contents:", str(contents))
                print(" Generation Config:", config)
                print(" Stream:", stream)

            texts, image_paths = self.txt2img(contents, config, user_id)

            if DEBUG:
                print("\nGoogle API response:")
                print(" Texts:", texts)
                print(" Image Paths:", image_paths)
                print(" Text Count:", len(texts))
                print(" Image Count:", len(image_paths))

            # Interleave text i with image i; the embed must match
            # _IMAGE_URL_PATTERN so it can be re-read from the history.
            chunks = []
            for text, image_path in zip_longest(texts, image_paths):
                if text is not None:
                    chunks.append(text + "\n")
                if image_path is not None:
                    chunks.append(f"![image]({image_path})\n")
            return "".join(chunks)

        except Exception as e:
            if DEBUG:
                print(f"Error generating content: {e}")
            return f"An error occurred: {str(e)}"