Dynamic Vision Pipe
Function · pipe · v0.1
Function ID: dynamic_vision_pipe
Creator: @soeplepel
Downloads: 135+
Routes any attached image to a vision model for a description, then answers the user's own prompt with a text model.
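Under the hood this is a two-stage flow: the attached image is first described by the model configured in the VisionModelId valve, and that description is then combined with the user's text question and sent to the model configured in the OutputModel valve. Both valves default to empty strings and are normally filled in from the function's settings in Open WebUI; the sketch below only illustrates the fields, and the model IDs shown are hypothetical placeholders, not defaults or recommendations.

# Hypothetical valve values for illustration only; in practice these are set
# in the Open WebUI function settings, not constructed in code.
valves = Pipe.Valves(
    VisionModelId="llava:13b",  # placeholder vision-capable model that describes the image
    VisionModelSystemMessage="Describe the attached image in detail.",
    OutputModel="llama3.1:8b",  # placeholder text model that writes the final answer
    OutputModelSystemMessage="Answer using the provided image description.",
)

# The output model ultimately receives a single user message shaped like:
#   "Image Description: <vision model output>\n\nUser Question: <user's text question>"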
README
No README available
Function Code
""" title: Dynamic Vision Pipe author: Soeplepel author_url: None version: 0.1 credits to @hub and @Haervwe """ import logging import json import asyncio from typing import Dict, List, Callable, Awaitable, Union, Optional from pydantic import BaseModel, Field from dataclasses import dataclass from open_webui.constants import TASKS from open_webui.main import generate_chat_completions from open_webui.utils.misc import get_last_user_message_item name = "Dynamic_Vision_Pipe" # Changed name def setup_logger(): logger = logging.getLogger(name) if not logger.handlers: logger.setLevel(logging.DEBUG) handler = logging.StreamHandler() handler.set_name(name) formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) handler.setFormatter(formatter) logger.addHandler(handler) logger.propagate = False return logger logger = setup_logger() @dataclass class User: id: str email: str name: str role: str class Pipe: __current_event_emitter__: Callable[[dict], Awaitable[None]] __user__: User __model__: str class Valves(BaseModel): OutputModel: str = Field(default="", description="Model for output generation") OutputModelSystemMessage: str = Field( default="", description="System message for output model" ) VisionModelId: str = Field( default="", description="Vision Model ID to describe image. Note: Compatibility is provider-specific.", ) VisionModelSystemMessage: str = Field( # ADDED VALVE default="", description="System message for the vision model to guide image description.", ) Temperature: float = Field(default=1, description="Models temperature") Top_k: int = Field(default=50, description="Models top_k") Top_p: float = Field(default=0.8, description="Models top_p") def __init__(self): self.type = "manifold" self.conversation_history = [] self.valves = self.Valves() def pipes(self) -> list[dict[str, str]]: return [{"id": f"{name}-pipe", "name": f"{name} Pipe"}] async def get_streaming_completion( self, messages, model: str, top_k: int = 50, top_p: float = 0.9, ): try: form_data = { "model": model, "messages": messages, "stream": True, "temperature": self.valves.Temperature, "top_k": self.valves.Top_k, "top_p": self.valves.Top_p, } response = await generate_chat_completions( self.__request__, form_data, user=self.__user__, ) if not hasattr(response, "body_iterator"): raise ValueError("Response does not support streaming") async for chunk in response.body_iterator: for part in self.get_chunk_content(chunk): yield part except Exception as e: raise RuntimeError(f"Streaming completion failed: {e}") def get_chunk_content(self, chunk): chunk_str = chunk.decode("utf-8") if isinstance(chunk, bytes) else chunk if chunk_str.startswith("data: "): chunk_str = chunk_str[6:] chunk_str = chunk_str.strip() if chunk_str == "[DONE]" or not chunk_str: return try: chunk_data = json.loads(chunk_str) if "choices" in chunk_data and len(chunk_data["choices"]) > 0: delta = chunk_data["choices"][0].get("delta", {}) if "content" in delta: yield delta["content"] except json.JSONDecodeError: logger.error(f'ChunkDecodeError: unable to parse "{chunk_str[:100]}"') async def emit_message(self, message: str): await self.__current_event_emitter__( {"type": "message", "data": {"content": message}} ) async def emit_status(self, level: str, message: str, done: bool): await self.__current_event_emitter__( { "type": "status", "data": { "status": "complete" if done else "in_progress", "level": level, "description": message, "done": done, }, } ) async def emit_model_title(self, model_name: str): """Helper function to 
emit the model title with a separator.""" await self.emit_message(f"\n\n---\n\n**{model_name}:**\n\n") async def pipe( self, body: dict, __user__: dict, __event_emitter__=None, __task__=None, __model__=None, __request__=None, ) -> str: self.__current_event_emitter__ = __event_emitter__ self.__user__ = User(**__user__) self.__model__ = __model__ # Store the default model self.__request__ = __request__ if ( __task__ == TASKS.TITLE_GENERATION or __task__ == TASKS.TAGS_GENERATION ): # for title and tags, use output model. output_model = self.valves.OutputModel or self.__model__ response = await generate_chat_completions( self.__request__, { "model": output_model, "messages": body.get("messages"), "stream": False, }, user=self.__user__, ) return f"{name}: {response['choices'][0]['message']['content']}" messages = body.get("messages") if messages is None: return body # Handle no messages user_message = get_last_user_message_item(messages) if user_message is None: return body # Handle no user message has_images = user_message.get("images") is not None if not has_images: user_message_content = user_message.get("content") if user_message_content is not None and isinstance( user_message_content, list ): has_images = any( item.get("type") == "image_url" for item in user_message_content ) original_user_message_content = user_message.get( "content", [] ) # Get content as list original_user_text_question = "" # Initialize as empty string if isinstance( original_user_message_content, list ): # Check if content is a list for item in original_user_message_content: if item.get("type") == "text": original_user_text_question += item.get( "text", "" ) # Concatenate text parts final_user_message_content = ( original_user_text_question # Initialize with original TEXT question ) if has_images: vision_model_id = self.valves.VisionModelId if vision_model_id: await self.emit_status( "info", f"Describing image using vision model: {vision_model_id}...", False, ) vision_messages = [] if self.valves.VisionModelSystemMessage: # ADDED SYSTEM MESSAGE HERE vision_messages.append( { "role": "system", "content": self.valves.VisionModelSystemMessage, } ) vision_messages.append( { "role": "user", "content": user_message.get( "content" ), # Pass the original complex content to vision model "images": user_message.get("images"), } ) try: vision_response_content = "" async for chunk in self.get_streaming_completion( vision_messages, model=vision_model_id ): vision_response_content += chunk final_user_message_content = f"Image Description: {vision_response_content.strip()}\n\nUser Question: {original_user_text_question}" # SIMPLIFIED CONTENT - NOW WITH EXTRACTED TEXT QUESTION await self.emit_status("info", f"Image description received.", True) except Exception as e: await self.emit_status( "error", f"Error getting vision description: {e}", True ) final_user_message_content = f"Image Description: Error describing image.\n\nUser Question: {original_user_text_question}" # Fallback message - simplified, with EXTRACTED text question else: await self.emit_status( "warning", "Vision Model ID not configured, cannot describe image.", True, ) final_user_message_content = f"Image Description: No vision model configured.\n\nUser Question: {original_user_text_question}" # Fallback message - simplified, with EXTRACTED text question if ( not original_user_text_question ): # if no original question, just use simple fallback final_user_message_content = "No vision model configured." 
output_model = self.valves.OutputModel output_system_message = self.valves.OutputModelSystemMessage if not output_model: await self.emit_status("error", "Output Model not configured.", True) return "Output Model not configured." messages_for_output_model = [ {"role": "system", "content": output_system_message}, { "role": "user", "content": final_user_message_content, # MODIFIED CONTENT HERE - simplified user message with EXTRACTED TEXT QUESTION }, # Use the description and ORIGINAL TEXT QUESTION ] logger.debug( f"Messages for output model: {messages_for_output_model}" ) # ADDED LOGGING HERE await self.emit_status( "info", f"Getting response from output model: {output_model}...", False ) try: full_response = "" async for chunk in self.get_streaming_completion( messages_for_output_model, model=output_model ): full_response += chunk await self.emit_message(chunk) cleaned_response = full_response.strip() self.conversation_history.append( {"role": "assistant", "content": cleaned_response} ) await self.emit_status("success", "Response generated.", True) except Exception as e: await self.emit_status( "error", f"Error getting response from output model {output_model}: {e}", True, ) return f"Error getting response from output model {output_model}: {e}" return ""
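As a quick illustration of the streaming helper above: get_chunk_content expects OpenAI-style SSE lines and yields only the delta text. The chunks below are fabricated for the example (real chunks arrive from generate_chat_completions via response.body_iterator), and running it assumes the module is importable inside an Open WebUI environment.

# Fabricated SSE chunks, used only to show what the parser yields.
pipe = Pipe()
sample_chunk = b'data: {"choices": [{"delta": {"content": "Hello"}}]}'
print(list(pipe.get_chunk_content(sample_chunk)))     # -> ['Hello']
print(list(pipe.get_chunk_content(b"data: [DONE]")))  # -> [] (terminator is skipped)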