We're Hiring!
Whitepaper
Docs
Sign In
@olof
·
6 days ago
function
Visualizations
Get
Last Updated
6 days ago
Created
6 days ago
Function
filter
v1.0.0
Name
Visualizations
Downloads
28+
Saves
1+
Description
Chart, Generative Image, Fancy Text, Large Emoji, QR Code and Color Swatch via Markdown image URLs
Function Code
Show
""" title: Visualizations description: Chart, Generative Image, Fancy Text, Large Emoji, QR Code and Color Swatch via Markdown image URLs author: Olof Larsson version: 1.0.0 requirements: httpx, Pillow, async-lru, cachetools, vl-convert-python, aiohttp, pyhocon, regex, segno, replicate, openai, fal-client, psutil>=5.9.0 """ import asyncio import base64 import csv import hashlib import html import io import json import logging import mimetypes import os import platform import re import secrets import socket import threading import time from abc import ABC, abstractmethod from collections import defaultdict from collections.abc import Callable from functools import lru_cache from pathlib import Path from urllib.parse import unquote, urlparse import aiohttp import fal_client import httpx import openai import psutil import regex import replicate import segno import vl_convert from fastapi import Request from fastapi.responses import Response from open_webui.config import STATIC_DIR, WEBUI_URL from open_webui.env import AUDIT_EXCLUDED_PATHS, DATA_DIR from open_webui.main import app from PIL import Image, ImageDraw, ImageFont from pydantic import BaseModel, Field from pyhocon import ConfigFactory from starlette.routing import Route # ================================================================ # constants.py # ================================================================ INSTRUCTION_BASE_URL = (WEBUI_URL.value or "http://localhost:8080").rstrip("/") VISUALIZATIONS_DATA_DIR = Path(DATA_DIR) / "visualizations" VISUALIZATIONS_DATA_DIR.mkdir(parents=True, exist_ok=True) VISUALIZATION_CACHE_DIR = VISUALIZATIONS_DATA_DIR / "cache" VISUALIZATION_CACHE_DIR.mkdir(parents=True, exist_ok=True) # ================================================================ # utils_file_io.py # ================================================================ def get_string_from_file(filename: str) -> str: file_path = VISUALIZATIONS_DATA_DIR / filename if file_path.exists(): try: with 
open(file_path, encoding="utf-8") as f: content = f.read().strip() return content if content else "" except Exception: pass return "" def set_string_to_file(filename: str, content: str) -> None: file_path = VISUALIZATIONS_DATA_DIR / filename file_path.parent.mkdir(parents=True, exist_ok=True) with open(file_path, "w", encoding="utf-8") as f: f.write(content.strip()) # ================================================================ # utils_aspect.py # ================================================================ """ Utility functions for aspect ratio calculations and mappings. This module provides pure functions for: 1. Parsing aspect ratio strings 2. Calculating aspect ratios from dimensions 3. Finding closest matches in aspect maps 4. Formatting dimensions for APIs """ def parse_aspect_ratio(aspect_str: str) -> tuple[float, float] | None: """ Parse aspect ratio string like '16:9' or '1.78:1' into (width, height) tuple. Args: aspect_str: String representation of aspect ratio Returns: Tuple of (width, height) as floats, or None if parsing fails """ try: if ":" in aspect_str: parts = aspect_str.lower().split(":") if len(parts) == 2: width = float(parts[0]) height = float(parts[1]) if width > 0 and height > 0: return (width, height) except (ValueError, IndexError): pass return None def calculate_aspect_ratio(width: int, height: int) -> float: """ Calculate the aspect ratio from width and height. Args: width: Width in pixels height: Height in pixels Returns: Aspect ratio as float (width/height) """ if height == 0: raise ValueError("Height cannot be zero") return width / height def find_closest_aspect_ratio( target_ratio: float, aspect_map: dict[str, list[int]] ) -> str | None: """ Find the aspect ratio name with the closest match to the target ratio. 
def get_aspect_ratio_from_map(
    aspect_ratio: str, aspect_map: dict[str, list[int]]
) -> float:
    """
    Get the numeric aspect ratio (width / height) for an aspect ratio name.

    Resolution order:
    1. The dimensions stored under `aspect_ratio` in `aspect_map`.
    2. The ratio parsed from the `aspect_ratio` string itself (e.g. '1.78:1').
    3. The first entry of `aspect_map`.

    Args:
        aspect_ratio: Aspect ratio name
        aspect_map: Map of aspect ratio names to [width, height] arrays

    Returns:
        Actual aspect ratio as float

    Raises:
        ValueError: If the name is neither in the map nor parseable and the
            map is empty, or if a map entry has a height of zero.
    """
    if aspect_ratio in aspect_map:
        width, height = aspect_map[aspect_ratio]
        return calculate_aspect_ratio(width, height)

    # Fall back to parsing the aspect ratio string
    parsed_ratio = parse_aspect_ratio(aspect_ratio)
    if parsed_ratio:
        parsed_width, parsed_height = parsed_ratio
        # Keep the parsed floats as they are: truncating them to int turned
        # '1.78:1' into 1.0 and made '1:0.5' fail with a zero height.
        return calculate_aspect_ratio(parsed_width, parsed_height)

    # Default to first ratio in map
    if aspect_map:
        default_width, default_height = next(iter(aspect_map.values()))
        return calculate_aspect_ratio(default_width, default_height)

    raise ValueError("Empty aspect map provided")
Args: aspect_ratio: Requested aspect ratio name aspect_map: Map of aspect ratio names to [width, height] arrays Returns: Size string formatted as 'wxh' """ # Direct match if aspect_ratio in aspect_map: width, height = aspect_map[aspect_ratio] return dimensions_to_size_string(width, height) # Parse and find closest match target_ratio = parse_aspect_ratio(aspect_ratio) if target_ratio: target_width, target_height = target_ratio target_value = calculate_aspect_ratio(target_width, target_height) closest_name = find_closest_aspect_ratio(target_value, aspect_map) if closest_name: closest_width, closest_height = aspect_map[closest_name] return dimensions_to_size_string(closest_width, closest_height) # Default fallback if aspect_map: default_width, default_height = next(iter(aspect_map.values())) return dimensions_to_size_string(default_width, default_height) raise ValueError("Cannot resolve aspect ratio: empty aspect map") def find_closest_aspect_ratio_name( target_ratio: tuple[float, float], aspect_map: dict[str, list[int]] ) -> str | None: """ Find the closest aspect ratio name from a parsed target ratio. Args: target_ratio: Tuple of (width, height) as floats aspect_map: Map of aspect ratio names to [width, height] arrays Returns: Name of closest aspect ratio, or None if no match found """ if not target_ratio or not aspect_map: return None target_width, target_height = target_ratio target_value = calculate_aspect_ratio(target_width, target_height) return find_closest_aspect_ratio(target_value, aspect_map) def get_default_aspect_ratio(aspect_map: dict[str, list[int]]) -> str: """ Get the default aspect ratio name from an aspect map. 
log = logging.getLogger(__name__)


class ImageGenerator(ABC):
    """
    Base class for image generation providers.

    Subclasses supply the provider name, the API token lookup, the supported
    aspect ratios and the actual generation call. This class adds the
    availability check and the timing/logging around a generation.
    """

    @property
    @abstractmethod
    def generator_name(self) -> str:
        """
        Provider name used in log output.

        Declared abstract because `generate_image` reads it after the
        provider call; a subclass without it now fails at instantiation
        instead of after an image has already been generated.
        """

    @abstractmethod
    def get_api_token(self) -> str:
        """Return the provider's API token, or an empty string when unset."""

    @abstractmethod
    def _generate_image_impl(self, prompt: str, aspect_ratio: str) -> bytes:
        """Generate one image for `prompt` and return its encoded bytes."""

    @abstractmethod
    def get_aspect_map(self) -> dict[str, list[int]]:
        """Return a map from aspect ratio names to [width, height] arrays."""

    def is_available(self) -> bool:
        """Return True when an API token is configured for this provider."""
        return bool(self.get_api_token())

    def generate_image(self, prompt: str, aspect_ratio: str) -> bytes:
        """
        Generate an image and log how long the provider took.

        Args:
            prompt: Text description of the image
            aspect_ratio: Requested aspect ratio name, e.g. '16:9'

        Returns:
            The bytes returned by `_generate_image_impl`.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        start_time = time.perf_counter()
        image_data = self._generate_image_impl(prompt, aspect_ratio)
        generation_time = time.perf_counter() - start_time
        log.info(
            "Generated image with %s (high quality, %s) in %.2fs",
            self.generator_name,
            aspect_ratio,
            generation_time,
        )
        return image_data
class ReplicateImageGenerator(ImageGenerator):
    """Image generator backed by the Replicate API (bytedance/seedream-4)."""

    def get_api_token(self) -> str:
        """Return the stored Replicate API token, or '' when none is saved."""
        return get_replicate_api_token()

    @property
    def generator_name(self) -> str:
        """Provider name used for selection and logging."""
        return "Replicate"

    def get_aspect_map(self) -> dict[str, list[int]]:
        """Return a map from aspect ratio names to [width, height] arrays based on 4K dimensions."""
        return {
            "1:1": [4096, 4096],
            "4:3": [4096, 3072],
            "3:4": [3072, 4096],
            "16:9": [4096, 2304],
            "9:16": [2304, 4096],
            "3:2": [4096, 2731],
            "2:3": [2731, 4096],
            "21:9": [4096, 1755],
        }

    def _generate_image_impl(self, prompt: str, aspect_ratio: str) -> bytes:
        """
        Generate one image through Replicate and return its bytes.

        Args:
            prompt: Text description of the image
            aspect_ratio: Requested ratio such as '16:9'; mapped to the
                closest supported name, or to the first supported name when
                it cannot be parsed

        Raises:
            ValueError: If no API token is configured or the API returns no
                output.
        """
        # Check the token first so a misconfiguration fails before any other work.
        token = self.get_api_token()
        if not token:
            raise ValueError("Replicate API token is not configured")

        aspect_map = self.get_aspect_map()
        actual_aspect_ratio = find_closest_aspect_ratio_name(
            parse_aspect_ratio(aspect_ratio), aspect_map
        )
        if actual_aspect_ratio is None:
            # An unparseable request used to send aspect_ratio=None to the
            # API; use the map's first entry instead, the same default
            # resolve_aspect_ratio_to_size falls back to.
            actual_aspect_ratio = get_default_aspect_ratio(aspect_map)

        client = replicate.Client(api_token=token)
        model_input = {
            "size": "4K",
            "enhance_prompt": True,
            "aspect_ratio": actual_aspect_ratio,
            "prompt": prompt,
        }
        output = client.run("bytedance/seedream-4", input=model_input)

        if not output:
            raise ValueError("No image generated from Replicate API")
        return output[0].read()
class OpenAIImageGenerator(ImageGenerator):
    """Image generator backed by the OpenAI images API (dall-e-3)."""

    def __init__(self):
        # The client is created on first use and remembered together with
        # the token it was built from.
        self.client = None
        self._client_token = None

    def get_api_token(self) -> str:
        """Return the stored OpenAI API token, or '' when none is saved."""
        return get_openai_api_token()

    @property
    def generator_name(self) -> str:
        """Provider name used for selection and logging."""
        return "OpenAI"

    def get_aspect_map(self) -> dict[str, list[int]]:
        """Return a map from aspect ratio names to [width, height] arrays."""
        return {
            "1:1": [1024, 1024],
            "16:9": [1792, 1024],
            "9:16": [1024, 1792],
        }

    def _generate_image_impl(self, prompt: str, aspect_ratio: str) -> bytes:
        """
        Generate one image through OpenAI and return the decoded bytes.

        Args:
            prompt: Text description of the image
            aspect_ratio: Requested ratio; resolved to the closest supported
                'wxh' size string

        Raises:
            ValueError: If no API token is configured or the API returns no
                image data.
        """
        token = self.get_api_token()
        if not token:
            raise ValueError("OpenAI API token is not configured")

        # Rebuild the client whenever the stored token differs from the one
        # the cached client was created with; previously a client built once
        # kept using the old key after the token file was updated.
        if self.client is None or getattr(self, "_client_token", None) != token:
            self.client = openai.OpenAI(api_key=token)
            self._client_token = token

        size = resolve_aspect_ratio_to_size(aspect_ratio, self.get_aspect_map())
        response = self.client.images.generate(
            model="dall-e-3",
            prompt=prompt,
            size=size,
            quality="hd",
            response_format="b64_json",
            n=1,
        )

        if not response.data:
            raise ValueError("No image generated from OpenAI API")
        return base64.b64decode(response.data[0].b64_json)
[1792, 2368], "9:16": [1536, 2752], "9:21": [1344, 3136], } def _generate_image_impl(self, prompt: str, aspect_ratio: str) -> bytes: token = self.get_api_token() if not token: raise ValueError("fal.ai API token is not configured") # Create a SyncClient with the API token client = fal_client.SyncClient(key=token) target_ratio = parse_aspect_ratio(aspect_ratio) aspect_map = self.get_aspect_map() actual_aspect_ratio = find_closest_aspect_ratio_name(target_ratio, aspect_map) # Prepare model input with your specified parameters model_input = { "prompt": prompt, "num_images": 1, "enable_safety_checker": False, "output_format": "jpeg", "safety_tolerance": "6", "aspect_ratio": actual_aspect_ratio, "raw": False, "enhance_prompt": False, } # Generate image using client.subscribe result = client.subscribe( "fal-ai/flux-pro/v1.1-ultra", arguments=model_input, with_logs=False, ) if result and "images" in result and len(result["images"]) > 0: # Get the first image URL and download it image_url = result["images"][0]["url"] # Download the image data response = httpx.get(image_url) response.raise_for_status() return response.content else: raise ValueError("No image generated from fal.ai API") # ================================================================ # visualization_class.py # ================================================================ class Visualization: def __init__( self, endpoint_path: str, endpoint_handler, instruction: str, enable_check: Callable | None = None, cacheable: bool = False, ): self.endpoint_path = endpoint_path self.endpoint_handler = endpoint_handler self.instruction = instruction self.enable_check = enable_check self.cacheable = cacheable def is_enabled(self) -> bool: if self.enable_check is None: return True try: return self.enable_check() except Exception: return True def get_full_url(self) -> str: """ Generate the full URL for this visualization with authentication. 
Returns: The complete URL with pass parameter (e.g., "http://localhost:8080/visualization/chart.png?pass=abc123") """ # Constants will be available in the combined file return f"{INSTRUCTION_BASE_URL}{self.endpoint_path}?pass={VISUALIZATION_PASS}" def get_instruction(self) -> str: """ Get the instruction with all placeholders replaced. Returns: The instruction with VISUALIZATION_URL placeholder replaced with the actual URL """ instruction = self.instruction full_url = self.get_full_url() return instruction.replace("<<VISUALIZATION_URL>>", full_url) def get_tag(self) -> str: """ Return the tag for this visualization type, derived from the endpoint path. Returns: The tag string for this visualization extracted from endpoint_path """ # Extract tag from endpoint_path, e.g., "/visualization/chart.png" -> "chart" filename = os.path.basename(self.endpoint_path) # Remove the extension to get the tag tag = os.path.splitext(filename)[0] return tag # ================================================================ # utils_api.py # ================================================================ log = logging.getLogger(__name__) # --- Caching Utilities --- generation_locks = defaultdict(threading.Lock) # Request utilities def get_query_param( request: Request, param_names: list[str], max_length: int = 3000 ) -> str: filtered_params = [ (k.lower(), v) for k, v in request.query_params.items() if k.lower() in [name.lower() for name in param_names] ] if not filtered_params: raise ValueError(f"A non-empty query parameter is required from: {param_names}") for preferred_name in param_names: for param_key, param_value in filtered_params: if param_key == preferred_name.lower(): if len(param_value) > max_length: raise ValueError( f"Input exceeds maximum length of {max_length} characters." 
# One asyncio lock per cache filename, so concurrent requests for the same
# uncached content generate it only once.
_async_generation_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)


def create_authenticated_endpoint(visualization):
    """
    Wrap a visualization's handler with pass verification, optional on-disk
    caching and response construction.

    Args:
        visualization: Object providing `endpoint_handler`, `cacheable` and
            `get_tag()`

    Returns:
        An async request handler. It returns the generated (or cached)
        content with a media type derived from the file extension, or a
        plain-text 500 response carrying the error message when anything
        fails, including a wrong or missing pass.
    """

    async def _load_or_generate(request, filename: str) -> bytes:
        """Return cached bytes for `filename`, generating and storing them if absent."""
        cache_file = VISUALIZATION_CACHE_DIR / filename
        if cache_file.exists():
            log.info(f"Serving from cache: {cache_file}")
            return cache_file.read_bytes()

        # This must be an asyncio lock, not a threading lock. A threading
        # lock held across the `await` below blocks the event loop thread as
        # soon as a second request for the same file tries to take it, so
        # the first request can never finish: a deadlock.
        async with _async_generation_locks[filename]:
            # Another request may have produced the file while we waited.
            if cache_file.exists():
                log.info(f"Serving from cache (after lock): {cache_file}")
                return cache_file.read_bytes()

            log.info(f"Generating new content for {visualization.get_tag()}")
            content_bytes = await visualization.endpoint_handler(request)
            cache_file.write_bytes(content_bytes)
            return content_bytes

    async def wrapped_handler(request):
        try:
            verify_pass_parameter(request)
            filename = generate_filename_from_request(request)

            if visualization.cacheable:
                content_bytes = await _load_or_generate(request, filename)
            else:
                content_bytes = await visualization.endpoint_handler(request)

            media_type = get_media_type_from_extension(Path(filename).suffix)
            headers = {
                "Content-Disposition": f'inline; filename="{filename}"',
                "Cache-Control": "public, max-age=259200",
            }
            return Response(
                content=content_bytes, media_type=media_type, headers=headers
            )
        except Exception as e:
            log.error(f"Failed to generate visualization: {e}", exc_info=True)
            return Response(
                content=f"Error: {e}",
                status_code=500,
                media_type="text/plain",
            )

    return wrapped_handler
def add_or_replace_route(new_route: Route):
    """
    Install `new_route` at the front of the app's route list.

    Any existing route with the same name is dropped first, and the route's
    path is added to AUDIT_EXCLUDED_PATHS if it is not already listed.
    """
    surviving_routes = []
    for existing in app.router.routes:
        if getattr(existing, "name", "") != new_route.name:
            surviving_routes.append(existing)

    # Index 0 so the new route is matched before any other registered route.
    surviving_routes.insert(0, new_route)
    app.router.routes = surviving_routes

    already_excluded = new_route.path in AUDIT_EXCLUDED_PATHS
    if not already_excluded:
        AUDIT_EXCLUDED_PATHS.append(new_route.path)
def get_image_generator() -> ImageGenerator | None:
    """
    Return the generator matching the stored provider selection.

    Returns:
        The first generator in `all_image_generators` whose name equals the
        selected provider and which reports itself available, or None when
        nothing is selected or no generator qualifies.
    """
    selected_provider = get_selected_image_generator_name()
    if not selected_provider:
        return None

    matching = (
        candidate
        for candidate in all_image_generators
        if selected_provider == candidate.generator_name and candidate.is_available()
    )
    return next(matching, None)
def meme_is_valid_url_structure(url: str) -> bool:
    """
    Check whether `url` is an absolute http(s) URL with a host part.

    Only http and https are accepted: the downloader fetches candidates over
    HTTP, so other schemes (ftp:, file:, data:, ...) can never succeed and
    are rejected up front.

    Args:
        url: Candidate URL string

    Returns:
        True when the URL has an http/https scheme and a non-empty network
        location, False otherwise (including unparseable input).
    """
    try:
        parsed = urlparse(url)
    except (ValueError, TypeError, AttributeError):
        # urlparse raises ValueError for malformed input such as a broken
        # IPv6 host.
        return False
    # urlparse lowercases the scheme, so 'HTTP://' is accepted as well.
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)
def meme_resize_image(image: Image.Image, target_width: int) -> Image.Image:
    """
    Resize an image to `target_width` while maintaining its aspect ratio.

    Args:
        image: Source image
        target_width: Desired width in pixels

    Returns:
        A new image of size (target_width, scaled height), resampled with
        LANCZOS. The height is at least 1 pixel.
    """
    aspect_ratio = image.height / image.width
    # For extremely wide images the scaled height truncates to 0, which the
    # resize call rejects; clamp it to a single pixel instead.
    target_height = max(1, int(target_width * aspect_ratio))
    return image.resize((target_width, target_height), Image.Resampling.LANCZOS)
def meme_wrap_text_find_best_split(
    text: str, font: ImageFont.FreeTypeFont, max_width: int
) -> tuple[str, str] | None:
    """
    Choose the best place to break `text` into exactly two lines.

    Every word boundary is tried, from the last to the first. A boundary
    qualifies only if both resulting lines are at most `max_width` wide
    according to `font.getbbox`. Qualifying boundaries are ranked by the word
    that ends the first line: sentence punctuation ('.', '!', '?') beats a
    comma, which beats anything else. Within the same rank the smallest
    absolute width difference wins, and on a full tie the boundary tried
    first (the later one in the text) is kept.

    Returns:
        (first_line, second_line), or None when no boundary qualifies.
    """
    tokens = text.split()
    chosen = None
    top_rank = -1
    smallest_gap = float("inf")

    for cut in reversed(range(1, len(tokens))):
        head = " ".join(tokens[:cut])
        tail = " ".join(tokens[cut:])
        head_width = font.getbbox(head)[2]
        tail_width = font.getbbox(tail)[2]

        # Skip boundaries where either line would overflow.
        if head_width > max_width or tail_width > max_width:
            continue

        gap = abs(head_width - tail_width)

        closing_word = tokens[cut - 1]
        if closing_word.endswith((".", "!", "?")):
            rank = 2
        elif closing_word.endswith(","):
            rank = 1
        else:
            rank = 0

        outranks = rank > top_rank
        better_balanced = rank == top_rank and gap < smallest_gap
        if outranks or better_balanced:
            top_rank, smallest_gap, chosen = rank, gap, (head, tail)

    return chosen
async def meme_endpoint_handler(request):
    """
    Build a captioned meme image from the `image_url` and `text` query
    parameters (or their accepted aliases).

    Returns:
        The bytes produced by `meme_create_image`.

    Raises:
        ValueError: If a required parameter is missing or the image cannot
            be downloaded.
    """
    image_url = get_query_param(request, ["image_url", "url", "src", "image"])
    text = get_query_param(request, ["text", "caption", "title"])

    image_bytes = await meme_download_image(image_url)

    # Decoding, resizing, text layout and JPEG encoding are CPU-bound. Run
    # them in a worker thread, as the QR-code and generative-image handlers
    # do, so the event loop keeps serving other requests meanwhile.
    return await asyncio.to_thread(meme_create_image, image_bytes, text)
async def large_emoji_endpoint_handler(request):
    """
    Render the emoji text taken from the request as an SVG document.

    The text is read from the first present query parameter among `text`,
    `emoji`, `emojis` and `characters`.

    Returns:
        The SVG markup encoded as UTF-8 bytes.
    """
    accepted_names = ["text", "emoji", "emojis", "characters"]
    emoji_text = get_query_param(request, accepted_names)
    return large_emoji_generate_svg(emoji_text).encode("utf-8")
**Aspect Ratio Guidelines:** - The aspect_ratio should be selected by picking the most suitable one for the image. - If the user suggests or implies an aspect ratio, then use that, or the closest valid one. - "3:4" should be used for upper body shots and similar. - "2:3" should be used for full body portraits and similar. - "21:9" for panoramas or ultra wide images. - "16:9" is otherwise the default recommended aspect ratio. - Supported aspect ratios include: 1:1, 4:3, 3:4, 16:9, 9:16, 3:2, 2:3, 21:9, and more **Prompt Enhancement Guidelines:** The prompt must abide by the following policies: 1. The prompt should be in English. If the image description was not in English, then translate it. 2. Always mention the image type (photo, oil painting, watercolor painting, illustration, cartoon, drawing, vector, render, etc.) in the beginning of the prompt. Unless the description suggests otherwise. 3. The prompt must intricately describe every part of the image in concrete, objective detail. THINK about what the end goal of the description is, and extrapolate that to what would make satisfying images. 4. The prompt should be a paragraph of text that is extremely descriptive and detailed. It should be more than 3 sentences long. 5. If the user requested modifications to a previous image, the prompt should not simply be longer, but rather it should be refactored to integrate the suggestions. 
async def generative_image_endpoint_handler(request):
    """Generate an image for the request's prompt and return its bytes.

    The aspect ratio is read from any of the accepted aliases and
    stripped of surrounding whitespace; if ``get_query_param`` raises
    ``ValueError`` for it, ``"16:9"`` is used instead. The prompt lookup
    is not guarded, so its ``ValueError`` propagates to the caller.
    """
    try:
        aspect_ratio = get_query_param(
            request, ["aspect_ratio", "aspect", "aspectratio", "ar"]
        ).strip()
    except ValueError:
        aspect_ratio = "16:9"

    prompt = get_query_param(request, ["prompt", "data", "content"])

    # The generator call is synchronous; run it in a worker thread so the
    # event loop is not blocked while the image is produced.
    return await asyncio.to_thread(
        generative_image_generate_jpg, prompt, aspect_ratio
    )
async def fancy_text_generate_svg_cached(text: str) -> str:
    """Render *text* as a gradient-filled, outlined SVG sized to the text.

    The canvas size is derived from the bounding box of *text* in
    ``FANCY_TEXT_FONT`` plus padding for the stroke and some horizontal
    headroom. The font is embedded as a base64 ``@font-face`` rule.
    Despite the name, this function itself performs no caching.

    Args:
        text: The raw (unescaped) text to draw.

    Returns:
        The SVG document as a string.
    """
    safe_text = html.escape(text)
    font = FANCY_TEXT_FONT

    # Measure the text as it will be drawn. The escaped form is only for
    # embedding in the markup; measuring it would count "&amp;" instead of
    # "&" and make the canvas wider than the rendered glyphs.
    bbox = font.getbbox(text)
    text_width = bbox[2] - bbox[0]
    text_height = bbox[3] - bbox[1]

    stroke_width_main = max(1, round(FANCY_TEXT_FONT_SIZE / 7))
    stroke_width_inner = max(1, round(stroke_width_main / 2))
    padding = stroke_width_main * 2
    # Extra horizontal room (twice the width of a capital "A") for emojis.
    headroom = font.getbbox("A")[2] * 2
    svg_width = text_width + padding + headroom
    svg_height = text_height + padding

    # Shift by the bbox's left offset so the glyphs start half a padding in.
    x_coordinate = -bbox[0] + (padding / 2)
    y_coordinate = padding / 2
    log.info("Text coordinates x=%s, y=%s", x_coordinate, y_coordinate)

    font_face_rule = (
        " @font-face { font-family: 'Roboto-SemiBold';"
        f" src: url('data:font/truetype;base64,{ROBOTO_SEMIBOLD_BASE64}');"
        " } "
    )

    # The text is defined once in <defs> and drawn three times: a wide
    # gradient stroke, a thinner white stroke, then the gradient fill.
    svg = (
        f' <svg width="{svg_width}" height="{svg_height}"'
        ' xmlns="http://www.w3.org/2000/svg">'
        f" <defs> <style>{font_face_rule}</style>"
        ' <linearGradient id="grad" x1="0%" y1="0%" x2="100%" y2="100%">'
        ' <stop offset="0%" style="stop-color:#1e3a8a" />'
        ' <stop offset="100%" style="stop-color:#e94560" />'
        " </linearGradient>"
        f' <text id="t" x="{x_coordinate}" y="{y_coordinate}"'
        ' dominant-baseline="hanging"'
        ' font-family="Roboto-SemiBold, Arial, sans-serif"'
        f' font-size="{FANCY_TEXT_FONT_SIZE}" font-weight="bold">'
        f"{safe_text}</text> </defs>"
        ' <use href="#t" fill="none" stroke="url(#grad)"'
        f' stroke-width="{stroke_width_main}"/>'
        ' <use href="#t" fill="none" stroke="white"'
        f' stroke-width="{stroke_width_inner}"/>'
        ' <use href="#t" fill="url(#grad)"/> </svg> '
    )
    return svg.strip()
def color_swatch_parse_colors(colors_param: str) -> list[str]:
    """Split the ``colors`` query value into individual colour strings.

    The value is URL-decoded with ``unquote`` and split on commas, but
    only on commas that are outside parentheses, so functional notations
    such as ``rgb(255,87,51)`` stay intact. Surrounding whitespace is
    stripped and empty entries are dropped.

    Args:
        colors_param: Comma-separated colours; may be empty.

    Returns:
        The colours in their original order, or ``[]`` for empty input.
    """
    if not colors_param:
        return []

    decoded = unquote(colors_param)

    pieces: list[str] = []
    current: list[str] = []
    depth = 0  # parenthesis nesting level at the current character
    for char in decoded:
        if char == "," and depth == 0:
            pieces.append("".join(current))
            current = []
            continue
        if char == "(":
            depth += 1
        elif char == ")" and depth > 0:
            depth -= 1
        current.append(char)
    pieces.append("".join(current))

    return [piece.strip() for piece in pieces if piece.strip()]
color_swatch_parse_colors(colors_param) if len(colors) == 0: raise ValueError("Error: At least 1 color is required") if len(colors) > 50: raise ValueError("Error: Too many colors provided (maximum 50)") svg_content = color_swatch_generate_svg(colors) return svg_content.encode("utf-8") color_swatch_visualization = Visualization( endpoint_path="/visualization/color-swatch.svg", endpoint_handler=color_swatch_endpoint_handler, instruction=COLOR_SWATCH_INSTRUCTION, ) # ================================================================ # visualization_chart.py # ================================================================ CHART_DEFAULT_WIDTH = 800 CHART_DEFAULT_HEIGHT = 400 CHART_SCALE_FACTOR = 5 CHART_PADDING = 10 log = logging.getLogger(__name__) CHART_INSTRUCTION = """ To create a chart, simply respond with a Markdown image tag pointing to `<<VISUALIZATION_URL>>` with the Vega-Lite spec JSON in the `spec` parameter. **Important:** No tool calling is required! Just type the Markdown image directly. **URL Formatting:** - URL-Encode the JSON spec: The JSON spec string must be URL-encoded to be passed as a parameter. - URL-encode parentheses `(` as `%28` and `)` as `%29` within the spec. Unencoded parentheses will break the Markdown image syntax since they may be interpreted as URL termination. **Best Practices:** - **Use a Data URL:** Whenever possible, use `data.url` pointing to an exposed/public url with the CSV/JSON file. - **Keep it Short:** `width`, `height`, and `$schema` are all optional and will be set automatically. - **Add a title:** Always add a `title` property to the chart spec. 
def chart_is_valid_url_structure(url: str) -> bool:
    """Tell whether *url* parses into both a scheme and a network location.

    Any exception raised while parsing is treated as "not valid".
    """
    try:
        parts = urlparse(url)
    except Exception:
        return False
    return bool(parts.scheme and parts.netloc)
def chart_repair_json_spec(spec: str) -> str:
    """Apply heuristic fixes to a JSON spec mangled by URL encoding.

    The steps run in a fixed order, each on the output of the previous
    one: two regex passes replace digits glued in front of a property
    name with an opening quote, three literal replacements turn leftover
    ``22`` sequences next to ``{``, ``:`` and ``}`` into quotes, and a
    final regex wraps any digit run directly before a colon in quotes.
    """
    # Digits + identifier right after "{"  ->  opening quote + identifier.
    repaired = re.sub(r"\{(\d+)([a-zA-Z_][a-zA-Z0-9_]*)", r'{"\2', spec)
    # Same repair after either "{" or ",".
    repaired = re.sub(
        r"(\{|\,)(\d+)([a-zA-Z_][a-zA-Z0-9_]*)", r'\1"\3', repaired
    )

    # Leftover "22" next to structural characters becomes a quote.
    for broken, fixed in (("{22", '{"'), ("22:", '":'), ("22}", '"}')):
        repaired = repaired.replace(broken, fixed)

    # Quote digit runs that sit directly in front of a colon.
    return re.sub(r"(\d+):", r'"\1":', repaired)
async def chart_prepare_spec_for_rendering(vl_spec_dict: dict):
    """Fill in defaults and resolve remote data on a Vega-Lite spec, in place.

    If the spec's ``data`` entry is a mapping with a ``url`` key, it is
    passed to ``chart_resolve_data_url``. ``$schema``, ``width`` and
    ``height`` are only set when absent; ``autosize`` and ``padding`` are
    always overwritten.

    Args:
        vl_spec_dict: The parsed spec; mutated by this call.
    """
    data_section = vl_spec_dict.get("data")
    # Only a mapping can carry a "url" key. Guarding on the type avoids a
    # TypeError for ``"data": null`` and a substring test for a string value.
    if isinstance(data_section, dict) and "url" in data_section:
        await chart_resolve_data_url(data_section)

    vl_spec_dict.setdefault(
        "$schema", "https://vega.github.io/schema/vega-lite/v5.json"
    )
    vl_spec_dict.setdefault("width", CHART_DEFAULT_WIDTH)
    vl_spec_dict.setdefault("height", CHART_DEFAULT_HEIGHT)

    # Force autosize to fit for predictable dimensions.
    vl_spec_dict["autosize"] = "fit"
    # Force padding to consistent spacing.
    vl_spec_dict["padding"] = CHART_PADDING
def _splice_loader_block(
    content: str, start_marker: str, end_marker: str, block: str
) -> tuple[str, bool]:
    """Return *content* with *block* spliced in, and whether it replaced one."""
    start_index = content.find(start_marker)
    end_index = -1
    if start_index != -1:
        # Look for the end marker only after the start marker, so a stray
        # earlier end marker cannot produce an inverted slice.
        end_index = content.find(end_marker, start_index + len(start_marker))

    if start_index != -1 and end_index != -1:
        before_block = content[:start_index]
        after_block = content[end_index + len(end_marker) :]
        return before_block + block + after_block, True

    if content.strip():
        return content.rstrip() + "\n\n" + block + "\n", False
    return block + "\n", False


def ensure_loader_js(prefix: str, js_code: str) -> None:
    """Inject multiline JavaScript code into the frontend ``loader.js`` file.

    The code is wrapped in ``/*<prefix>_start*/`` and ``/*<prefix>_end*/``
    comments. If a block with the same markers already exists it is
    replaced; otherwise the block is appended. The file is written only
    when its content would change. Any failure is logged and swallowed.

    Args:
        prefix: Unique prefix to identify the JS block.
        js_code: Multiline JavaScript code to inject.
    """
    target_file = Path(STATIC_DIR) / "loader.js"

    start_comment = f"/*{prefix}_start*/"
    end_comment = f"/*{prefix}_end*/"
    full_block = f"{start_comment}\n{js_code}\n{end_comment}"

    try:
        if target_file.exists():
            content = target_file.read_text(encoding="utf-8")
        else:
            content = ""
            target_file.parent.mkdir(parents=True, exist_ok=True)

        new_content, replaced = _splice_loader_block(
            content, start_comment, end_comment, full_block
        )

        # This runs repeatedly with the same code; skip the rewrite when
        # nothing would change.
        if new_content == content:
            log.debug(
                "Block with prefix '%s' already up to date in %s",
                prefix,
                target_file,
            )
            return

        if replaced:
            log.info(
                f"Replaced existing block with prefix '{prefix}' in {target_file}"
            )
        else:
            log.info(f"Added new block with prefix '{prefix}' to {target_file}")

        target_file.write_text(new_content, encoding="utf-8")
        log.info(f"Successfully ensured block exists in {target_file}")

    except Exception as e:
        # Best effort: a failed injection must not break module loading.
        log.error(f"Failed to inject block into {target_file}: {e}")
<<CHART_DEFAULT_HEIGHT>>; const CHART_SCALE_FACTOR = <<CHART_SCALE_FACTOR>>; const CHART_PADDING = <<CHART_PADDING>>; // === Element Detection === function isWithinChatSection(img) { let element = img; while (element && element.parentElement) { if (element.tagName === 'SECTION' && element.getAttribute('aria-labelledby') === 'chat-conversation') { return true; } element = element.parentElement; } return false; } // === DOM Operations === function canProcessImage(img) { return !img.dataset.vizProcessed && img.tagName === 'IMG' && isWithinChatSection(img) && (img.src?.includes(GENERATIVE_IMAGE_PATH) || img.src?.includes(CHART_PATH)); } function markImageProcessed(img) { img.dataset.vizProcessed = 'true'; } function removeWfitClasses(img) { let container = img.parentElement; let steps = 0; const maxSteps = 3; while (container && container.parentElement && steps < maxSteps) { if (container.classList.contains('w-fit')) { container.classList.remove('w-fit'); break; } container = container.parentElement; steps++; } } // === Frontend Chart Spec Extraction === function getVlSpecDict(imageUrl) { try { const url = new URL(imageUrl); const specParam = url.searchParams.get('spec'); return specParam ? 
JSON.parse(decodeURIComponent(specParam)) : {}; } catch (error) { console.warn('Failed to parse VL spec, using empty object:', error); return {}; } } // === Frontend Chart Dimension Extraction === function getChartDimension(vlSpecDict, dimension, defaultValue) { try { const value = vlSpecDict[dimension] || defaultValue; return parseFloat(value) || defaultValue; } catch (error) { return defaultValue; } } // === Frontend Chart Aspect Ratio Calculation === function calculateChartAspectRatio(imageUrl) { // Parse VL spec from URL const vlSpecDict = getVlSpecDict(imageUrl); // Extract width and height using the helper method const width = getChartDimension(vlSpecDict, 'width', CHART_DEFAULT_WIDTH); const height = getChartDimension(vlSpecDict, 'height', CHART_DEFAULT_HEIGHT); // Return the aspect ratio of predicted final dimensions // Final dimensions: (width + padding*2) * scale_factor return ((width + (CHART_PADDING * 2)) * CHART_SCALE_FACTOR) / ((height + (CHART_PADDING * 2)) * CHART_SCALE_FACTOR); } // === Available Aspect Map === // Dynamically injected by backend when filter is invoked // Map from aspect ratio names to [width, height] arrays const AVAILABLE_ASPECT_MAP = <<AVAILABLE_ASPECT_MAP>>; // === Frontend Aspect Ratio Prediction === function parseAspectRatio(aspectStr) { try { // Single regex to split on either ":" or "x" const parts = aspectStr.toLowerCase().split(/[:x]/); if (parts.length === 2) { const width = parseFloat(parts[0]); const height = parseFloat(parts[1]); if (width > 0 && height > 0) { return [width, height]; } } } catch (error) { // Parsing failed } return null; } function calculateGenerativeImageAspectRatio(imageUrl) { try { const url = new URL(imageUrl); const aspectRatioParam = url.searchParams.get('aspect_ratio') || '16:9'; const targetRatio = parseAspectRatio(aspectRatioParam); if (!targetRatio || !Object.keys(AVAILABLE_ASPECT_MAP).length) return 16 / 9; const [targetWidth, targetHeight] = targetRatio; const targetValue = targetWidth / 
targetHeight; let bestDistance = Infinity; let bestDimensions = null; // Find closest supported aspect ratio using actual dimensions for (const [ratioName, dimensions] of Object.entries(AVAILABLE_ASPECT_MAP)) { const [width, height] = dimensions; const ratioValue = width / height; const distance = Math.abs(targetValue - ratioValue); if (distance < bestDistance) { bestDistance = distance; bestDimensions = dimensions; } } return bestDimensions ? bestDimensions[0] / bestDimensions[1] : 16 / 9; } catch (error) { console.warn('Failed to predict generative image aspect ratio:', error); return 16 / 9; } } function calculateAspectRatio(imageUrl) { try { if (imageUrl.includes(CHART_PATH)) { const ratio = calculateChartAspectRatio(imageUrl); console.log('Chart aspect ratio calculated:', ratio); return ratio; } else if (imageUrl.includes(GENERATIVE_IMAGE_PATH)) { const ratio = calculateGenerativeImageAspectRatio(imageUrl); console.log('Generative image aspect ratio predicted:', ratio); return ratio; } else { console.log('Unknown image type, using default aspect ratio'); return DEFAULT_ASPECT_RATIO; } } catch (error) { console.warn('Failed to calculate aspect ratio, using default:', error); return DEFAULT_ASPECT_RATIO; } } // === Styling === function createBackgroundStyle() { // Create a style element for the animation const styleId = 'viz-loading-animation-style'; let styleEl = document.getElementById(styleId); if (!styleEl) { styleEl = document.createElement('style'); styleEl.id = styleId; styleEl.textContent = ` @keyframes viz-loading-pulse { 0%, 100% { background-color: #d1d5db; } 50% { background-color: #f3f4f6; } } .viz-loading-placeholder { animation: viz-loading-pulse 2s ease-in-out infinite; } .viz-error-state { background-color: #fca5a5; animation: none; } `; document.head.appendChild(styleEl); } } function createPlaceholder(aspectRatio) { createBackgroundStyle(); const wrapper = document.createElement('div'); wrapper.classList.add('rounded-lg', 
'viz-loading-placeholder'); Object.assign(wrapper.style, { display: 'block', width: '100%', position: 'relative', overflow: 'hidden', paddingBottom: `${(1 / aspectRatio) * 100}%`, height: '0' }); return wrapper; } function createImageClone(imgElement) { const imgClone = imgElement.cloneNode(true); imgClone.classList.remove('rounded-lg'); imgClone.style.margin = '0'; Object.assign(imgClone.style, { position: 'absolute', top: '0', left: '0', width: '100%', height: '100%', objectFit: 'cover', opacity: '0', transition: 'opacity 0.1s ease-in-out' }); return imgClone; } function setupImageLoadHandler(imgClone) { const handleLoad = () => { imgClone.style.opacity = '1'; // Stop the loading animation stopLoadingAnimation(imgClone.parentElement); }; const handleError = () => { // Apply error state with red background applyErrorState(imgClone.parentElement); }; imgClone.onload = handleLoad; imgClone.onerror = handleError; // Handle cached images if (imgClone.complete && imgClone.naturalHeight !== 0) { handleLoad(); } } function stopLoadingAnimation(wrapper) { if (wrapper && wrapper.classList.contains('viz-loading-placeholder')) { wrapper.style.animation = 'none'; wrapper.classList.remove('viz-loading-placeholder'); } } function applyErrorState(wrapper) { if (wrapper) { // Stop the loading animation wrapper.style.animation = 'none'; wrapper.classList.remove('viz-loading-placeholder'); // Apply error state with red background wrapper.classList.add('viz-error-state'); } } // === Main Processing Functions === async function processImage(imgElement) { if (!canProcessImage(imgElement)) { return; } markImageProcessed(imgElement); try { let aspectRatio; if (imgElement.src.includes(CHART_PATH)) { // Calculate chart aspect ratio immediately (no network call) aspectRatio = calculateChartAspectRatio(imgElement.src); console.log('Chart aspect ratio calculated frontend:', aspectRatio); } else if (imgElement.src.includes(GENERATIVE_IMAGE_PATH)) { // Use local prediction for generative 
def get_formatted_loading_state_js():
    """Return ``loading_state_js`` with its ``<<...>>`` placeholders filled in.

    Chart constants are substituted as plain strings. The aspect map comes
    from the current image generator's ``get_aspect_map()``, or a single
    ``16:9`` entry when no generator is available, and is embedded as JSON.
    """
    generator = get_image_generator()
    # Fallback when no generator is available (OpenAI 16:9 dimensions).
    aspect_map = (
        generator.get_aspect_map() if generator else {"16:9": [1792, 1024]}
    )

    substitutions = {
        "<<CHART_DEFAULT_WIDTH>>": str(CHART_DEFAULT_WIDTH),
        "<<CHART_DEFAULT_HEIGHT>>": str(CHART_DEFAULT_HEIGHT),
        "<<CHART_SCALE_FACTOR>>": str(CHART_SCALE_FACTOR),
        "<<CHART_PADDING>>": str(CHART_PADDING),
        "<<AVAILABLE_ASPECT_MAP>>": json.dumps(aspect_map),
    }

    script = loading_state_js
    for placeholder, value in substitutions.items():
        script = script.replace(placeholder, value)
    return script
class Filter:
    """Filter that injects visualization instructions into each request."""

    class Valves(BaseModel):
        # Name of the image generation provider; one of ``provider_names``.
        SELECTED_IMAGE_GENERATOR: str = Field(
            default=default_provider,
            description="Select the image generation provider to use.",
            json_schema_extra={"enum": provider_names},
        )
        FAL_API_TOKEN: str = Field(default="", description="Your fal.ai API token")
        REPLICATE_API_TOKEN: str = Field(
            default="", description="Your Replicate API token"
        )
        OPENAI_API_TOKEN: str = Field(default="", description="Your OpenAI API token")

    def __init__(self):
        """Initialise the valves from environment variables."""
        self.valves = self.Valves(
            # An unset or empty variable falls back to the declared default
            # rather than overriding it with "" (which is not a provider).
            SELECTED_IMAGE_GENERATOR=os.getenv("SELECTED_IMAGE_GENERATOR")
            or default_provider,
            FAL_API_TOKEN=os.getenv("FAL_API_TOKEN", ""),
            REPLICATE_API_TOKEN=os.getenv("REPLICATE_API_TOKEN", ""),
            OPENAI_API_TOKEN=os.getenv("OPENAI_API_TOKEN", ""),
        )

    def inlet(self, body: dict, __user__: dict | None = None) -> dict:
        """Apply valve settings and add the instruction message to *body*.

        The valve values are pushed to the module-level setters, the
        ``loader.js`` block is refreshed, and one user-role message with
        the combined instructions of all enabled visualizations is
        inserted: after a leading system message if there is one,
        otherwise at the front.

        Args:
            body: The request payload; its ``messages`` list is mutated.
            __user__: Not used by this method.

        Returns:
            The same *body* object.
        """
        set_selected_image_generator_name(self.valves.SELECTED_IMAGE_GENERATOR)
        set_fal_api_token(self.valves.FAL_API_TOKEN)
        set_replicate_api_token(self.valves.REPLICATE_API_TOKEN)
        set_openai_api_token(self.valves.OPENAI_API_TOKEN)

        # Update the JavaScript with the currently available aspect ratios.
        ensure_loader_js(
            "visualizations_loading_state", get_formatted_loading_state_js()
        )

        # Each section is wrapped in its own tag.
        instruction_parts = [
            f"<visualizations_intro>\n{VISUALIZATIONS_INTRO.strip()}\n</visualizations_intro>"
        ]
        for viz in visualizations:
            if viz.is_enabled():
                tag = viz.get_tag()
                instruction = viz.get_instruction().strip()
                instruction_parts.append(
                    f"<visualization_{tag}>\n{instruction}\n</visualization_{tag}>"
                )
        instruction_parts.append(
            f"<visualizations_outro>\n{VISUALIZATIONS_OUTRO.strip()}\n</visualizations_outro>"
        )

        combined_instruction = (
            "\n\n".join(instruction_parts)
            .replace("<<INSTRUCTION_BASE_URL>>", INSTRUCTION_BASE_URL)
            .replace("<<VISUALIZATION_PASS>>", VISUALIZATION_PASS)
        )

        # setdefault keeps the list attached to the body even when the key
        # was missing, so the inserted message is not lost.
        messages = body.setdefault("messages", [])
        instruction_message = {"role": "user", "content": combined_instruction}

        if messages and messages[0].get("role") == "system":
            messages.insert(1, instruction_message)
        else:
            messages.insert(0, instruction_message)

        return body
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with
intelligence
.
1