Whitepaper
Docs
Sign In
Function
Function
action
v0.1.1
ElevenLabs TTS
Function ID
elevenlabs_tts
Creator
@justinrahb
Downloads
1.9K+
An action button to generate speech from text using the ElevenLabs API
Get
README
No README available
Function Code
Show
""" title: ElevenLabs TTS Action author: justinh-rahb author_url: https://github.com/justinh-rahb funding_url: https://github.com/open-webui version: 0.1.1 license: MIT icon_url: data:image/svg+xml;base64,PHN2ZyB3aWR0aD0iMzIiIGhlaWdodD0iMzIiIHhtbG5zPSJodHRwOi8vd3d3LnczLm9yZy8yMDAwL3N2ZyI+PHJlY3QgeD0iOCIgeT0iNiIgd2lkdGg9IjYiIGhlaWdodD0iMjAiIGZpbGw9IiM0QzRDNEMiLz48cmVjdCB4PSIxOCIgeT0iNiIgd2lkdGg9IjYiIGhlaWdodD0iMjAiIGZpbGw9IiM0QzRDNEMiLz48L3N2Zz4= required_open_webui_version: 0.3.10 """ import requests import uuid import os from pydantic import BaseModel, Field from typing import Callable, Union, Any, Dict, Tuple from config import UPLOAD_DIR from apps.webui.models.files import Files DEBUG = True class Action: class Valves(BaseModel): ELEVENLABS_API_KEY: str = Field( default=None, description="Your ElevenLabs API key." ) ELEVENLABS_MODEL_ID: str = Field( default="eleven_multilingual_v2", description="ID of the ElevenLabs TTS model to use.", ) def __init__(self): self.valves = self.Valves() self.voice_id_cache = {} def status_object( self, description: str = "Unknown State", status: str = "in_progress", done: bool = False, ) -> Dict: return { "type": "status", "data": { "status": status, "description": description, "done": done, }, } async def fetch_available_voices(self) -> Tuple[str, Dict[str, str]]: if DEBUG: print("Debug: Fetching available voices") base_url = "https://api.elevenlabs.io/v1" headers = { "xi-api-key": self.valves.ELEVENLABS_API_KEY, "Content-Type": "application/json", } voices_url = f"{base_url}/voices" try: response = requests.get(voices_url, headers=headers) response.raise_for_status() voices_data = response.json() display_message = "Available voices from ElevenLabs:\n\n" voice_options = {} for voice in voices_data.get("voices", []): voice_name = voice["name"] voice_id = voice["voice_id"] display_message += f"- {voice_name}\n" voice_options[voice_name] = voice_id if DEBUG: print(f"Debug: Found {len(voices_data.get('voices', []))} voices") return display_message, voice_options except requests.RequestException as e: if DEBUG: print(f"Debug: Error fetching voices: {str(e)}") return "Sorry, couldn't fetch available voices at the moment.", {} async def action( self, body: dict, __user__: dict = {}, __event_emitter__: Callable[[dict], Any] = None, __event_call__: Callable[[dict], Any] = None, ) -> None: if DEBUG: print(f"Debug: ElevenLabs TTS action invoked") try: if __event_emitter__: await __event_emitter__( self.status_object("Initializing ElevenLabs Text-to-Speech") ) if not self.valves.ELEVENLABS_API_KEY: raise ValueError("ElevenLabs API key is not set") if "id" not in __user__: raise ValueError("User not authenticated") display_message, self.voice_id_cache = await self.fetch_available_voices() if not self.voice_id_cache: raise ValueError("No available voices to select") response = await __event_call__( { "type": "input", "data": { "title": "Select Voice", "message": display_message, "input_type": "select", "options": list(self.voice_id_cache.keys()), }, } ) if DEBUG: print(f"Debug: Voice selection response: {response}") if isinstance(response, str): selected_voice_name = response elif isinstance(response, dict): selected_voice_name = response.get("message") else: raise ValueError(f"Unexpected response type: {type(response)}") selected_voice_id = self.voice_id_cache.get(selected_voice_name) if DEBUG: print( f"Debug: Selected voice: {selected_voice_name} ({selected_voice_id})" ) if not selected_voice_id: raise ValueError(f"Invalid voice selection: {selected_voice_name}") messages = body.get("messages", []) assistant_message = next( ( message.get("content") for message in reversed(messages) if message.get("role") == "assistant" ), None, ) if not assistant_message: raise ValueError("No assistant message to convert") if __event_emitter__: await __event_emitter__(self.status_object("Generating speech")) base_url = "https://api.elevenlabs.io/v1" headers = { "xi-api-key": self.valves.ELEVENLABS_API_KEY, "Content-Type": "application/json", } tts_url = f"{base_url}/text-to-speech/{selected_voice_id}" payload = { "text": assistant_message, "model_id": self.valves.ELEVENLABS_MODEL_ID, "voice_settings": {"stability": 0.5, "similarity_boost": 0.5}, } response = requests.post(tts_url, json=payload, headers=headers) response.raise_for_status() if response.status_code == 200: audio_data = response.content file_name = f"tts_{uuid.uuid4()}.mp3" file_id = self._create_file( file_name, "Generated Audio", audio_data, "audio/mpeg", __user__ ) if file_id: file_url = self._get_file_url(file_id) if __event_emitter__: await __event_emitter__( self.status_object( "Generated successfully", status="complete", done=True ) ) if file_url: await __event_emitter__( { "type": "message", "data": { "content": f"\n\n---\n- [Download ElevenLabs Audio]({file_url})\n" }, } ) else: raise ValueError("Error saving audio file") else: raise ValueError(f"Unexpected API response: {response.text}") except Exception as e: if DEBUG: print(f"Debug: Error in action method: {str(e)}") if __event_emitter__: await __event_emitter__( self.status_object(f"Error: {str(e)}", status="error", done=True) ) def _create_file( self, file_name: str, title: str, content: Union[str, bytes], content_type: str, __user__: dict = {}, ) -> str: if DEBUG: print(f"Debug: Creating file: {file_name}") if "id" not in __user__: if DEBUG: print("Debug: User ID is not available") return None base_path = os.path.join(UPLOAD_DIR) file_id = str(uuid.uuid4()) file_path = os.path.join(base_path, f"{file_id}_{file_name}") mode = "w" if isinstance(content, str) else "wb" try: os.makedirs(base_path, exist_ok=True) with open(file_path, mode) as f: f.write(content) meta = { "source": file_path, "title": title, "content_type": content_type, "size": os.path.getsize(file_path), "path": file_path, } class FileForm(BaseModel): id: str filename: str meta: dict = {} formData = FileForm(id=file_id, filename=file_name, meta=meta) file = Files.insert_new_file(__user__["id"], formData) if DEBUG: print(f"Debug: File saved. Path: {file_path}") return file.id except Exception as e: if DEBUG: print(f"Debug: Error saving file: {e}") return None def _get_file_url(self, file_id: str) -> str: return f"/api/v1/files/{file_id}/content"