Function (pipe), v0.3.0
Gemini 2.0 Flash Native Image Gen
Function ID: gemini_2_0_flash_native_image_gen
Creator: @jscheah
Downloads: 526+
Gemini 2.0 Flash Experimental with native image generation capabilities.
README
No README available
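
Since no README is provided, the function code below is the only reference for how the pipe is driven. As a rough sketch, pipe() consumes an OpenAI-style request body along the following lines; the field names are taken from the code, while the prefix before the dot in the model name is illustrative, since Open WebUI prepends its own function identifier and pipe() simply splits it off.

# Hypothetical request body, shaped after the fields pipe() reads from `body`.
body = {
    # pipe() drops the prefix with model_id.split(".", maxsplit=1)[1]
    "model": "gemini_2_0_flash_native_image_gen.gemini-2.0-flash-exp-image-generation",
    "stream": True,
    "temperature": 0.7,
    "top_p": 0.9,
    "top_k": 40,
    "max_tokens": 8192,
    "messages": [
        {"role": "system", "content": "You are a helpful illustrator."},
        {"role": "user", "content": "Generate an image of a lighthouse at dusk."},
    ],
}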
Function Code
""" title: Gemini 2.0 Flash Native Image Gen author: jscheah author_url: https://github.com/cheahjs version: 0.3.0 license: MIT """ # 0.3.0 Changelog: # - Images are now uploaded to the server and the URL used in the response, this reduces the size of the message # 0.2.0 Changelog: # - Remap generated images back into images so that conversations work import io import mimetypes import os import json import traceback from fastapi import Request, UploadFile from open_webui.models.users import UserModel, Users from open_webui.routers.files import get_file_content_by_id, upload_file import requests import uuid import time from pydantic import BaseModel, Field from typing import List, Union, Iterator import base64 class Pipe: class Valves(BaseModel): GOOGLE_API_KEY: str = Field(default="") def __init__(self): self.id = "gemini_flash_native_image_gen" self.type = "manifold" self.name = "Google: " self.valves = self.Valves( **{ "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""), "USE_PERMISSIVE_SAFETY": False, } ) def pipes(self) -> List[dict]: return [ { "id": "gemini-2.0-flash-exp-image-generation", "name": "Gemini 2.0 Flash Experimental (Image Generation)", } ] def upload_image( self, __request__: Request, user: UserModel, image_data: str, mime_type: str ) -> str: image_format = mimetypes.guess_extension(mime_type) file = UploadFile( file=io.BytesIO(base64.b64decode(image_data)), filename=f"generated-image{image_format}", # will be converted to a unique ID on upload_file headers={ "content-type": mime_type, }, ) file_item = upload_file( __request__, file, user, file_metadata={"mime_type": mime_type} ) image_url = __request__.app.url_path_for( "get_file_content_by_id", id=file_item.id ) return f"" async def get_image_content(self, user: UserModel, markdown_string: str) -> dict: if "![Gemini Generated Image" not in markdown_string: return {"text": markdown_string} # Check for legacy image format if "data:" in markdown_string: try: data_url = markdown_string[ markdown_string.index("(data:") + 6 : markdown_string.rindex(")") ] mime_type = data_url[: data_url.index(";")] image_data = data_url[data_url.index(",") + 1 :] return { "inline_data": { "mime_type": mime_type, "data": image_data, } } except Exception: # If parsing fails, treat it as regular text return {"text": markdown_string} # Extract the ID from the markdown string from  id = markdown_string.split("id=")[1].split("]")[0] file_response = await get_file_content_by_id(id, user) # Read file_response.path as bytes with open(file_response.path, "rb") as file: file_content = file.read() file_content = base64.b64encode(file_content).decode("utf-8") return { "inline_data": { "mime_type": file_response.headers["content-type"], "data": file_content, } } async def pipe( self, body: dict, __user__: dict, __request__: Request, ) -> Union[str, Iterator[str]]: if not self.valves.GOOGLE_API_KEY: return "Error: GOOGLE_API_KEY is not set" try: user = Users.get_user_by_id(__user__["id"]) headers = { "Content-Type": "application/json", } model_id: str = body["model"] model_id = model_id.split(".", maxsplit=1)[1] messages: list[dict] = body["messages"] stream = body.get("stream", False) system_message: str | None = next( (msg["content"] for msg in messages if msg["role"] == "system"), None ) contents = [] for message in messages: if message["role"] == "system": continue if isinstance(message.get("content"), str): message["content"] = [{"type": "text", "text": message["content"]}] if isinstance(message.get("content"), list): for content in message["content"]: 
if content["type"] == "text": # Check if content contains markdown image data URLs text = content["text"] parts = [] # Split content by newlines to find Gemini Generated Images segments = text.split("\n") current_text = [] for segment in segments: # Check if segment is a Gemini Generated Image if segment.startswith("![Gemini Generated Image"): # If there's accumulated text, add it first if current_text: parts.append({"text": "\n".join(current_text)}) current_text = [] parts.append( await self.get_image_content(user, segment) ) else: current_text.append(segment) # Add any remaining text if current_text: parts.append({"text": "\n".join(current_text)}) contents.append( { "role": ( "user" if message["role"] == "user" else "model" ), "parts": parts, } ) elif content["type"] == "image_url": parts = [] image_url = content["image_url"]["url"] if image_url.startswith("data:image"): mime_type = image_url.split(":")[1].split(";")[0] image_data = image_url.split(",")[1] parts.append( { "inline_data": { "mime_type": mime_type, "data": image_data, } } ) else: parts.append({"image_url": image_url}) contents.append( { "role": ( "user" if message["role"] == "user" else "model" ), "parts": parts, } ) if system_message: contents.insert( 0, {"role": "user", "parts": [{"text": f"{system_message}"}]}, ) generation_config = { "temperature": body.get("temperature", 0.7), "top_p": body.get("top_p", 0.9), "top_k": body.get("top_k", 40), "max_output_tokens": body.get("max_tokens", 8192), "stop_sequences": body.get("stop", []), "responseModalities": ["text", "image"], } if stream: def stream_generator(): with requests.post( f"https://generativelanguage.googleapis.com/v1beta/models/{model_id}:streamGenerateContent?alt=sse&key={self.valves.GOOGLE_API_KEY}", headers=headers, json={ "contents": contents, "generation_config": generation_config, }, ) as response: for line in response.iter_lines(): if not line: continue line: str = line.decode("utf-8") if not line.startswith("data: "): continue data = json.loads(line.lstrip("data: ")) # Transform Gemini output into OpenAI compatible output try: candidates = data.get("candidates", []) if len(candidates) > 0: candidate = candidates[0] # Initialize response chunk chunk = { "id": "chat" + str(uuid.uuid4()), "object": "chat.completion.chunk", "created": int(time.time()), "model": model_id, "choices": [ { "index": 0, "delta": {}, "finish_reason": None, } ], } # Handle content if "content" in candidate: parts = candidate["content"].get("parts", []) for part in parts: if "text" in part: chunk["choices"][0]["delta"][ "content" ] = part["text"] elif "inlineData" in part: # Handle inline image data mime_type = part["inlineData"][ "mimeType" ] image_data = part["inlineData"]["data"] markdown_image = self.upload_image( __request__, user, image_data, mime_type, ) # Format as markdown image with double newlines markdown_image = ( f"\n\n{markdown_image}\n\n" ) chunk["choices"][0]["delta"][ "content" ] = markdown_image # Handle finish reason if "finishReason" in candidate: finish_reason = candidate[ "finishReason" ].lower() if finish_reason == "stop": chunk["choices"][0][ "finish_reason" ] = "stop" elif finish_reason == "max_tokens": chunk["choices"][0][ "finish_reason" ] = "length" elif finish_reason in [ "safety", "recitation", "blocklist", "prohibited_content", "spii", ]: chunk["choices"][0][ "finish_reason" ] = "content_filter" else: chunk["choices"][0][ "finish_reason" ] = finish_reason # Handle usage if present if "usageMetadata" in data: chunk["usage"] = data["usageMetadata"] yield 
                            except Exception as e:
                                yield f"Error: {str(e)}"

                return stream_generator()
            else:
                # Handle non-streaming response
                response = requests.post(
                    f"https://generativelanguage.googleapis.com/v1beta/models/{model_id}:generateContent?key={self.valves.GOOGLE_API_KEY}",
                    headers=headers,
                    json={
                        "contents": contents,
                        "generation_config": generation_config,
                    },
                )
                try:
                    data = response.json()
                    # Transform Gemini output into OpenAI compatible output
                    completion_response = {
                        "id": "chat" + str(uuid.uuid4()),
                        "object": "chat.completion",
                        "created": int(time.time()),
                        "model": model_id,
                        "choices": [],
                        "usage": data.get("usageMetadata", {}),
                    }
                    candidates = data.get("candidates", [])
                    for idx, candidate in enumerate(candidates):
                        choice = {
                            "index": idx,
                            "message": {"role": "assistant", "content": ""},
                            "finish_reason": None,
                        }
                        # Handle content
                        if "content" in candidate:
                            content_parts = []
                            parts = candidate["content"].get("parts", [])
                            for part in parts:
                                if "text" in part:
                                    content_parts.append(part["text"])
                                elif "inlineData" in part:
                                    # Handle inline image data
                                    mime_type = part["inlineData"]["mimeType"]
                                    image_data = part["inlineData"]["data"]
                                    markdown_image = self.upload_image(
                                        __request__, user, image_data, mime_type
                                    )
                                    # Format as markdown image with double newlines
                                    markdown_image = f"\n\n{markdown_image}\n\n"
                                    content_parts.append(markdown_image)
                            choice["message"]["content"] = "".join(content_parts)
                        # Handle finish reason
                        if "finishReason" in candidate:
                            finish_reason = candidate["finishReason"].lower()
                            if finish_reason == "stop":
                                choice["finish_reason"] = "stop"
                            elif finish_reason == "max_tokens":
                                choice["finish_reason"] = "length"
                            elif finish_reason in [
                                "safety",
                                "recitation",
                                "blocklist",
                                "prohibited_content",
                                "spii",
                            ]:
                                choice["finish_reason"] = "content_filter"
                            else:
                                choice["finish_reason"] = finish_reason
                        completion_response["choices"].append(choice)
                    return json.dumps(completion_response)
                except Exception as e:
                    return f"Error processing response: {str(e)}"
        except Exception as e:
            # print traceback
            traceback.print_exc()
            return f"Error: {e}"
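
For reference, here is a standalone sketch of the REST call the pipe wraps. It assumes only the requests package and a GOOGLE_API_KEY environment variable; the endpoint, payload shape (including the lowercase responseModalities values), and the inlineData handling mirror what pipe() sends and parses, but it is not part of the function above.

# Standalone sketch of the Gemini image-generation call that the pipe wraps.
import base64
import mimetypes
import os

import requests

API_KEY = os.environ["GOOGLE_API_KEY"]
MODEL = "gemini-2.0-flash-exp-image-generation"
URL = (
    "https://generativelanguage.googleapis.com/v1beta/models/"
    f"{MODEL}:generateContent?key={API_KEY}"
)

payload = {
    "contents": [{"role": "user", "parts": [{"text": "Draw a red bicycle."}]}],
    # Same config the pipe sends to request both text and image output
    "generation_config": {"responseModalities": ["text", "image"]},
}

response = requests.post(URL, json=payload, timeout=120)
response.raise_for_status()

for candidate in response.json().get("candidates", []):
    for part in candidate.get("content", {}).get("parts", []):
        if "text" in part:
            print(part["text"])
        elif "inlineData" in part:
            # Generated images arrive base64-encoded; write them out with an
            # extension guessed from the reported MIME type.
            ext = mimetypes.guess_extension(part["inlineData"]["mimeType"]) or ".bin"
            with open(f"generated-image{ext}", "wb") as f:
                f.write(base64.b64decode(part["inlineData"]["data"]))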