Whitepaper
Docs
Sign In
Function
Function
pipe
v0.1
Novita Image Gen
Function ID
novita_image_gen
Creator
@antonymajar
Downloads
62+
Generate images with the novita api
Get
README
No README available
Function Code
Show
""" title: Novita.ai Image Generation Provider for OpenWebUI author: antonymajar version: 0.1 license: MIT requirements: pydantic, requests environment_variables: NOVITA_API_KEY """ import base64 import os import time from typing import Any, Dict, Generator, Iterator, List, Union import requests from open_webui.utils.misc import get_last_user_message from pydantic import BaseModel, Field class Pipe: """ Class representing the Novita.ai image generation provider. """ class Valves(BaseModel): """ Pydantic model for storing API configuration. """ NOVITA_API_KEY: str = Field( default="", description="Your API Key for Novita.ai" ) NOVITA_API_BASE_URL: str = Field( default="https://api.novita.ai", description="Base URL for the Novita API" ) def __init__(self): """ Initialize the Pipe class with default values and environment variables. """ self.type = "manifold" self.id = "NOVITA_AI" self.name = "Novita.ai: " self.valves = self.Valves( NOVITA_API_KEY=os.getenv("NOVITA_API_KEY", ""), NOVITA_API_BASE_URL=os.getenv("NOVITA_API_BASE_URL", "https://api.novita.ai") ) def wait_for_task_completion(self, task_id: str, headers: Dict[str, str], max_retries: int = 30, delay: float = 2.0) -> Dict: """ Poll the task result endpoint until the task is completed or fails. Args: task_id (str): The task ID to check headers (Dict[str, str]): Request headers max_retries (int): Maximum number of polling attempts delay (float): Delay between polling attempts in seconds Returns: Dict: The task result response """ endpoint = f"{self.valves.NOVITA_API_BASE_URL}/v3/async/task-result" params = {"task_id": task_id} for _ in range(max_retries): response = requests.get(endpoint, headers=headers, params=params) response.raise_for_status() result = response.json() if result["task"]["status"] == "TASK_STATUS_SUCCEED": return result elif result["task"]["status"] == "TASK_STATUS_FAILED": raise Exception(f"Task failed: {result['task'].get('reason', 'Unknown error')}") time.sleep(delay) raise Exception("Task timed out") def get_image_from_url(self, url: str, headers: Dict[str, str]) -> str: """ Download and convert an image URL to base64 format. Args: url (str): The image URL headers (Dict[str, str]): Request headers Returns: str: Base64-encoded image data with content type """ # Remove Authorization header for the image request as it's a public S3 URL image_headers = headers.copy() image_headers.pop("Authorization", None) response = requests.get(url, headers=image_headers) response.raise_for_status() content_type = response.headers.get("Content-Type", "image/jpeg") image_data = base64.b64encode(response.content).decode("utf-8") return f"data:{content_type};base64,{image_data}" def non_stream_response(self, headers: Dict[str, str], payload: Dict[str, Any]) -> str: """ Process a non-streaming image generation request. Args: headers (Dict[str, str]): Request headers payload (Dict[str, str]): Request payload Returns: str: Markdown formatted image or error message """ try: # Submit the generation task response = requests.post( f"{self.valves.NOVITA_API_BASE_URL}/v3/async/txt2img", headers=headers, json=payload ) response.raise_for_status() task_id = response.json()["task_id"] # Wait for task completion result = self.wait_for_task_completion(task_id, headers) if not result.get("images"): return "Error: No images generated" # Get the first generated image image_data = result["images"][0] image_url = image_data["image_url"] image_type = image_data["image_type"] # Convert image URL to base64 try: img_data = self.get_image_from_url(image_url, headers) # Return in markdown format for proper display return f"" except Exception as e: print(f"Error fetching image: {e}") # If image fetch fails, return the URL as fallback return f"Error fetching image directly. URL: {image_url}" except requests.exceptions.RequestException as e: return f"Error: Request failed: {e}" except Exception as e: return f"Error: {e}" def stream_response(self, headers: Dict[str, str], payload: Dict[str, Any]) -> Generator[str, None, None]: """ Process a streaming image generation request. Args: headers (Dict[str, str]): Request headers payload (Dict[str, Any]): Request payload Yields: str: The generated image data """ yield self.non_stream_response(headers, payload) def pipes(self) -> List[Dict[str, str]]: """ Get the list of available pipes. Returns: List[Dict[str, str]]: The list of pipes """ return [{"id": "novita_ai", "name": "Novita.ai"}] def pipe(self, body: Dict[str, Any]) -> Union[str, Generator[str, None, None], Iterator[str]]: """ Process the pipe request. Args: body (Dict[str, Any]): The request body Returns: Union[str, Generator[str, None, None], Iterator[str]]: The response """ headers = { "Authorization": f"Bearer {self.valves.NOVITA_API_KEY}", "Content-Type": "application/json" } prompt = get_last_user_message(body["messages"]) # Default payload configuration payload = { "extra": { "response_image_type": "jpeg" }, "request": { "prompt": prompt, "model_name": "sd_xl_base_1.0.safetensors", # Default to SDXL "negative_prompt": "", "width": 1024, "height": 1024, "image_num": 1, "steps": 20, "seed": -1, "sampler_name": "Euler a", "guidance_scale": 7.5 } } try: if body.get("stream", False): return self.stream_response(headers, payload) else: return self.non_stream_response(headers, payload) except Exception as e: return f"Error: {e}"