"""
title: Google AI Pipeline
author: owndev, olivier-lacroix, christiant
author_url: https://github.com/owndev/
project_url: https://github.com/owndev/Open-WebUI-Functions
funding_url: https://github.com/sponsors/owndev
version: 1.4.1
license: Apache License 2.0
description: A manifold pipeline for interacting with Google Gemini and Gemma models, including dynamic model specification, streaming responses, and flexible error handling.
Last Change Log: Added proper Gemma model support (Gemma models do not accept system messages, so system instructions are omitted for them)
features:
- Asynchronous API calls for better performance
- Model caching to reduce API calls
- Dynamic model specification with automatic prefix stripping
- Streaming response handling with safety checks
- Support for multimodal input (text and images)
- Flexible error handling and logging
- Integration with Google Generative AI or Vertex AI API for content generation
- Support for various generation parameters (temperature, max tokens, etc.)
- Customizable safety settings based on environment variables
- Encrypted storage of sensitive API keys
- Grounding with Google search
- Native tool calling support
"""
import os
import inspect
from functools import update_wrapper
import re
import time
import asyncio
import base64
import hashlib
import logging
from urllib.parse import quote_plus
from google import genai
from google.genai import types
from google.genai.errors import ClientError, ServerError, APIError
from typing import List, Union, Optional, Dict, Any, Tuple, AsyncIterator, Callable
from pydantic_core import core_schema
from pydantic import BaseModel, Field, GetCoreSchemaHandler
from cryptography.fernet import Fernet, InvalidToken
from open_webui.env import SRC_LOG_LEVELS
# Simplified encryption implementation with automatic handling
class EncryptedStr(str):
"""A string type that automatically handles encryption/decryption"""
@classmethod
def _get_encryption_key(cls) -> Optional[bytes]:
"""
Generate encryption key from WEBUI_SECRET_KEY if available
Returns None if no key is configured
"""
secret = os.getenv("WEBUI_SECRET_KEY")
if not secret:
return None
hashed_key = hashlib.sha256(secret.encode()).digest()
return base64.urlsafe_b64encode(hashed_key)
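    # Fernet expects a 32-byte urlsafe-base64 key, so the secret is hashed with
    # SHA-256 (always 32 bytes) and then base64-encoded; any non-empty
    # WEBUI_SECRET_KEY therefore produces a key of the required shape.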
@classmethod
def encrypt(cls, value: str) -> str:
"""
Encrypt a string value if a key is available
Returns the original value if no key is available
"""
if not value or value.startswith("encrypted:"):
return value
key = cls._get_encryption_key()
if not key: # No encryption if no key
return value
f = Fernet(key)
encrypted = f.encrypt(value.encode())
return f"encrypted:{encrypted.decode()}"
@classmethod
def decrypt(cls, value: str) -> str:
"""
Decrypt an encrypted string value if a key is available
Returns the original value if no key is available or decryption fails
"""
if not value or not value.startswith("encrypted:"):
return value
key = cls._get_encryption_key()
if not key: # No decryption if no key
return value[len("encrypted:") :] # Return without prefix
try:
encrypted_part = value[len("encrypted:") :]
f = Fernet(key)
decrypted = f.decrypt(encrypted_part.encode())
return decrypted.decode()
except (InvalidToken, Exception):
return value
# Pydantic integration
@classmethod
def __get_pydantic_core_schema__(
cls, _source_type: Any, _handler: GetCoreSchemaHandler
) -> core_schema.CoreSchema:
return core_schema.union_schema(
[
core_schema.is_instance_schema(cls),
core_schema.chain_schema(
[
core_schema.str_schema(),
core_schema.no_info_plain_validator_function(
lambda value: cls(cls.encrypt(value) if value else value)
),
]
),
],
serialization=core_schema.plain_serializer_function_ser_schema(
lambda instance: str(instance)
),
)
def get_decrypted(self) -> str:
"""Get the decrypted value"""
return self.decrypt(self)
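    # Illustrative sketch (not executed; the secret and token below are hypothetical):
    #
    #   os.environ["WEBUI_SECRET_KEY"] = "example-secret"
    #   token = EncryptedStr.encrypt("my-google-api-key")   # -> "encrypted:gAAAA..."
    #   EncryptedStr(token).get_decrypted()                  # -> "my-google-api-key"
    #
    # Without WEBUI_SECRET_KEY set, encrypt()/decrypt() pass values through unchanged.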
class Pipe:
"""
Pipeline for interacting with Google Gemini models.
"""
# Configuration valves for the pipeline
class Valves(BaseModel):
GOOGLE_API_KEY: EncryptedStr = Field(
default=os.getenv("GOOGLE_API_KEY", ""),
description="API key for Google Generative AI (used if USE_VERTEX_AI is false).",
)
USE_VERTEX_AI: bool = Field(
default=os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "false").lower() == "true",
description="Whether to use Google Cloud Vertex AI instead of the Google Generative AI API.",
)
VERTEX_PROJECT: str | None = Field(
default=os.getenv("GOOGLE_CLOUD_PROJECT"),
description="The Google Cloud project ID to use with Vertex AI.",
)
VERTEX_LOCATION: str = Field(
default=os.getenv("GOOGLE_CLOUD_LOCATION", "global"),
description="The Google Cloud region to use with Vertex AI.",
)
USE_PERMISSIVE_SAFETY: bool = Field(
default=os.getenv("USE_PERMISSIVE_SAFETY", "false").lower() == "true",
description="Use permissive safety settings for content generation.",
)
MODEL_CACHE_TTL: int = Field(
default=int(os.getenv("GOOGLE_MODEL_CACHE_TTL", "600")),
description="Time in seconds to cache the model list before refreshing",
)
RETRY_COUNT: int = Field(
default=int(os.getenv("GOOGLE_RETRY_COUNT", "2")),
description="Number of times to retry API calls on temporary failures",
)
def __init__(self):
"""Initializes the Pipe instance and configures the genai library."""
self.valves = self.Valves()
self.name: str = "Google Gemini: "
# Setup logging
self.log = logging.getLogger("google_ai.pipe")
self.log.setLevel(SRC_LOG_LEVELS.get("OPENAI", logging.INFO))
# Model cache
self._model_cache: Optional[List[Dict[str, str]]] = None
self._model_cache_time: float = 0
def _get_client(self) -> genai.Client:
"""
Validates API credentials and returns a genai.Client instance.
"""
self._validate_api_key()
if self.valves.USE_VERTEX_AI:
self.log.debug(
f"Initializing Vertex AI client (Project: {self.valves.VERTEX_PROJECT}, Location: {self.valves.VERTEX_LOCATION})"
)
return genai.Client(
vertexai=True,
project=self.valves.VERTEX_PROJECT,
location=self.valves.VERTEX_LOCATION,
)
else:
self.log.debug("Initializing Google Generative AI client with API Key")
return genai.Client(api_key=self.valves.GOOGLE_API_KEY.get_decrypted())
def _validate_api_key(self) -> None:
"""
Validates that the necessary Google API credentials are set.
Raises:
ValueError: If the required credentials are not set.
"""
if self.valves.USE_VERTEX_AI:
if not self.valves.VERTEX_PROJECT:
self.log.error("USE_VERTEX_AI is true, but VERTEX_PROJECT is not set.")
raise ValueError(
"VERTEX_PROJECT is not set. Please provide the Google Cloud project ID."
)
# For Vertex AI, location has a default, so project is the main thing to check.
# Actual authentication will be handled by ADC or environment.
self.log.debug(
"Using Vertex AI. Ensure ADC or service account is configured."
)
else:
if not self.valves.GOOGLE_API_KEY:
self.log.error("GOOGLE_API_KEY is not set (and not using Vertex AI).")
raise ValueError(
"GOOGLE_API_KEY is not set. Please provide the API key in the environment variables or valves."
)
self.log.debug("Using Google Generative AI API with API Key.")
def strip_prefix(self, model_name: str) -> str:
"""
Extract the model identifier using regex, handling various naming conventions.
e.g., "google_gemini_pipeline.gemini-2.5-flash-preview-04-17" -> "gemini-2.5-flash-preview-04-17"
e.g., "models/gemini-1.5-flash-001" -> "gemini-1.5-flash-001"
e.g., "publishers/google/models/gemini-1.5-pro" -> "gemini-1.5-pro"
"""
# Use regex to remove everything up to and including the last '/' or the first '.'
stripped = re.sub(r"^(?:.*/|[^.]*\.)", "", model_name)
return stripped
def get_google_models(self, force_refresh: bool = False) -> List[Dict[str, str]]:
"""
Retrieve available Google models suitable for content generation.
Uses caching to reduce API calls.
Args:
force_refresh: Whether to force refreshing the model cache
Returns:
List of dictionaries containing model id and name.
"""
# Check cache first
current_time = time.time()
if (
not force_refresh
and self._model_cache is not None
and (current_time - self._model_cache_time) < self.valves.MODEL_CACHE_TTL
):
self.log.debug("Using cached model list")
return self._model_cache
try:
client = self._get_client()
self.log.debug("Fetching models from Google API")
models = client.models.list()
available_models = []
for model in models:
actions = model.supported_actions
if actions is None or "generateContent" in actions:
available_models.append(
{
"id": self.strip_prefix(model.name),
"name": model.display_name or self.strip_prefix(model.name),
}
)
model_map = {model["id"]: model for model in available_models}
            # Keep only Gemini and Gemma models
            filtered_models = {
                k: v
                for k, v in model_map.items()
                if k.startswith(("gemini-", "gemma-"))
            }
# Update cache
self._model_cache = list(filtered_models.values())
self._model_cache_time = current_time
self.log.debug(f"Found {len(self._model_cache)} Gemini models")
return self._model_cache
except Exception as e:
self.log.exception(f"Could not fetch models from Google: {str(e)}")
# Return a specific error entry for the UI
return [{"id": "error", "name": f"Could not fetch models: {str(e)}"}]
def pipes(self) -> List[Dict[str, str]]:
"""
Returns a list of available Google Gemini models for the UI.
Returns:
List of dictionaries containing model id and name.
"""
try:
self.name = "Google Gemini: "
return self.get_google_models()
except ValueError as e:
# Handle the case where API key is missing during pipe listing
self.log.error(f"Error during pipes listing (validation): {e}")
return [{"id": "error", "name": str(e)}]
except Exception as e:
# Handle other potential errors during model fetching
self.log.exception(
f"An unexpected error occurred during pipes listing: {str(e)}"
)
return [{"id": "error", "name": f"An unexpected error occurred: {str(e)}"}]
def _prepare_model_id(self, model_id: str) -> str:
"""
Prepare and validate the model ID for use with the API.
        Args:
            model_id: The model ID or display name from the request body.
        Returns:
            The normalized model ID (e.g. 'gemini-...' or 'gemma-...').
        Raises:
            ValueError: If the model cannot be resolved to a supported Gemini/Gemma model.
"""
original_model_id = model_id
model_id = self.strip_prefix(model_id)
# Allow models starting with 'gemini-' or 'gemma-'
if not (model_id.startswith("gemini-") or model_id.startswith("gemma-")):
models_list = self.get_google_models()
found_model = next(
(m["id"] for m in models_list if m["name"] == original_model_id), None
)
# Ensure the found model also starts with 'gemini-' or 'gemma-'
if found_model and (
found_model.startswith("gemini-") or found_model.startswith("gemma-")
):
model_id = found_model
self.log.debug(
f"Mapped model name '{original_model_id}' to model ID '{model_id}'"
)
else:
self.log.error(
f"Invalid or unsupported model ID: '{original_model_id}'"
)
raise ValueError(
f"Invalid or unsupported Google model ID or name: '{original_model_id}'"
)
return model_id
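    # Illustrative sketch (not executed), assuming `pipe` is a Pipe instance and the
    # fetched model list contains {"id": "gemini-1.5-pro", "name": "Gemini 1.5 Pro"}:
    #
    #   pipe._prepare_model_id("google_gemini_pipeline.gemini-1.5-pro")  # -> "gemini-1.5-pro"
    #   pipe._prepare_model_id("Gemini 1.5 Pro")                         # -> "gemini-1.5-pro"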
def _prepare_content(
self, messages: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
"""
Prepare messages content for the API and extract system message if present.
Args:
messages: List of message objects from the request
Returns:
Tuple of (prepared content list, system message string or None)
"""
# Extract system message
system_message = next(
(msg["content"] for msg in messages if msg.get("role") == "system"),
None,
)
# Prepare contents for the API
contents = []
for message in messages:
role = message.get("role")
if role == "system":
continue # Skip system messages, handled separately
content = message.get("content", "")
parts = []
# Handle different content types
if isinstance(content, list): # Multimodal content
parts.extend(self._process_multimodal_content(content))
elif isinstance(content, str): # Plain text content
parts.append({"text": content})
else:
self.log.warning(f"Unsupported message content type: {type(content)}")
continue # Skip unsupported content
# Map roles: 'assistant' -> 'model', 'user' -> 'user'
api_role = "model" if role == "assistant" else "user"
if parts: # Only add if there are parts
contents.append({"role": api_role, "parts": parts})
return contents, system_message
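    # Illustrative sketch (not executed; `pipe` is a hypothetical Pipe instance): an
    # OpenAI-style message list is split into a Gemini "contents" list plus the
    # extracted system message.
    #
    #   messages = [
    #       {"role": "system", "content": "Be brief."},
    #       {"role": "user", "content": "Hi"},
    #       {"role": "assistant", "content": "Hello!"},
    #   ]
    #   contents, system = pipe._prepare_content(messages)
    #   # contents -> [{"role": "user", "parts": [{"text": "Hi"}]},
    #   #              {"role": "model", "parts": [{"text": "Hello!"}]}]
    #   # system   -> "Be brief."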
def _process_multimodal_content(
self, content_list: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Process multimodal content (text and images).
Args:
content_list: List of content items
Returns:
List of processed parts for the Gemini API
"""
parts = []
for item in content_list:
if item.get("type") == "text":
parts.append({"text": item.get("text", "")})
elif item.get("type") == "image_url":
image_url = item.get("image_url", {}).get("url", "")
if image_url.startswith("data:image"):
# Handle base64 encoded image data
try:
header, encoded = image_url.split(",", 1)
mime_type = header.split(":")[1].split(";")[0]
# Basic validation for image types
if mime_type not in [
"image/jpeg",
"image/png",
"image/webp",
"image/heic",
"image/heif",
]:
self.log.warning(
f"Unsupported image mime type: {mime_type}"
)
parts.append(
{"text": f"[Image type {mime_type} not supported]"}
)
continue
parts.append(
{
"inline_data": {
"mime_type": mime_type,
"data": encoded,
}
}
)
except Exception as img_ex:
self.log.exception(f"Could not parse image data URL: {img_ex}")
parts.append({"text": "[Image data could not be processed]"})
else:
# Gemini API doesn't directly support image URLs
self.log.warning(f"Direct image URLs not supported: {image_url}")
parts.append({"text": f"[Image URL not processed: {image_url}]"})
return parts
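    # Illustrative sketch (not executed; the base64 payload is a truncated placeholder):
    # a base64 data URL becomes an inline_data part, while plain text items become
    # text parts.
    #
    #   items = [
    #       {"type": "text", "text": "What is in this image?"},
    #       {"type": "image_url", "image_url": {"url": "data:image/png;base64,iVBORw0..."}},
    #   ]
    #   parts = pipe._process_multimodal_content(items)
    #   # -> [{"text": "What is in this image?"},
    #   #     {"inline_data": {"mime_type": "image/png", "data": "iVBORw0..."}}]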
@staticmethod
def _create_tool(tool_def):
"""OpenwebUI tool is a functools.partial coroutine, which genai does not support directly.
See https://github.com/googleapis/python-genai/issues/907
This function wraps the tool into a callable that can be used with genai.
In particular, it sets the signature of the function properly,
removing any frozen keyword arguments (extra_params).
"""
bound_callable = tool_def["callable"]
# Create a wrapper for bound_callable, which is always async
async def wrapper(*args, **kwargs):
return await bound_callable(*args, **kwargs)
# Remove 'frozen' keyword arguments (extra_params) from the signature
original_sig = inspect.signature(bound_callable)
frozen_kwargs = {
"__event_emitter__",
"__event_call__",
"__user__",
"__metadata__",
"__request__",
"__model__",
}
new_parameters = []
for name, parameter in original_sig.parameters.items():
# Exclude keyword arguments that are frozen
if name in frozen_kwargs and parameter.kind in (
inspect.Parameter.POSITIONAL_OR_KEYWORD,
inspect.Parameter.KEYWORD_ONLY,
):
continue
# Keep remaining parameters
new_parameters.append(parameter)
new_sig = inspect.Signature(
parameters=new_parameters, return_annotation=original_sig.return_annotation
)
# Ensure name, docstring and signature are properly set
update_wrapper(wrapper, bound_callable)
wrapper.__signature__ = new_sig
return wrapper
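    # Illustrative sketch (not executed; `search_web_tool` is a hypothetical
    # functools.partial-wrapped async tool callable with the signature
    # (query: str, __event_emitter__=..., __user__=...)): the wrapper hides the frozen
    # double-underscore kwargs, which stay bound inside the original partial.
    #
    #   wrapped = Pipe._create_tool({"callable": search_web_tool})
    #   # inspect.signature(wrapped) -> (query: str)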
def _configure_generation(
self,
body: Dict[str, Any],
system_instruction: Optional[str],
model_id: str = "",
        metadata: Optional[Dict[str, Any]] = None,
        tools: Optional[Dict[str, Any]] = None,
) -> types.GenerateContentConfig:
"""
Configure generation parameters and safety settings.
Args:
body: The request body containing generation parameters
system_instruction: Optional system instruction string
model_id: The model ID to check for Gemma models
metadata: Additional metadata for the request
tools: Tools available for the model
Returns:
types.GenerateContentConfig
"""
if metadata is None:
metadata = {}
gen_config_params = {
"temperature": body.get("temperature"),
"top_p": body.get("top_p"),
"top_k": body.get("top_k"),
"max_output_tokens": body.get("max_tokens"),
"stop_sequences": body.get("stop") or None,
}
# Only add system instruction for non-Gemma models
if system_instruction and not model_id.startswith("gemma-"):
gen_config_params["system_instruction"] = system_instruction
# Configure safety settings
if self.valves.USE_PERMISSIVE_SAFETY:
safety_settings = [
types.SafetySetting(
category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_NONE"
),
types.SafetySetting(
category="HARM_CATEGORY_HATE_SPEECH", threshold="BLOCK_NONE"
),
types.SafetySetting(
category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="BLOCK_NONE"
),
types.SafetySetting(
category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_NONE"
),
]
gen_config_params["safety_settings"] = safety_settings
features = metadata.get("features", {})
if features.get("google_search_tool", False):
self.log.debug("Enabling Google search grounding")
gen_config_params.setdefault("tools", []).append(
types.Tool(google_search=types.GoogleSearch())
)
if tools is not None and metadata.get("function_calling") == "native":
for name, tool_def in tools.items():
tool = self._create_tool(tool_def)
self.log.debug(
f"Adding tool '{name}' with signature {tool.__signature__}"
)
gen_config_params.setdefault("tools", []).append(tool)
# Filter out None values for generation config
filtered_params = {k: v for k, v in gen_config_params.items() if v is not None}
return types.GenerateContentConfig(**filtered_params)
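    # Illustrative sketch (not executed; `pipe` is a hypothetical Pipe instance):
    # typical request parameters map onto GenerateContentConfig fields, and None
    # values are dropped before construction.
    #
    #   body = {"temperature": 0.7, "max_tokens": 1024, "stop": ["END"]}
    #   config = pipe._configure_generation(body, "Be brief.", "gemini-1.5-pro")
    #   # -> GenerateContentConfig(temperature=0.7, max_output_tokens=1024,
    #   #                          stop_sequences=["END"], system_instruction="Be brief.")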
@staticmethod
def _format_grounding_chunks_as_sources(
grounding_chunks: list[types.GroundingChunk],
):
formatted_sources = []
for chunk in grounding_chunks:
context = chunk.web or chunk.retrieved_context
if not context:
continue
uri = context.uri
title = context.title or "Source"
formatted_sources.append(
{
"source": {
"name": title,
"type": "web_search_results",
"url": uri,
},
"document": ["Click the link to view the content."],
"metadata": [{"source": title}],
}
)
return formatted_sources
async def _process_grounding_metadata(
self,
grounding_metadata_list: List[types.GroundingMetadata],
text: str,
__event_emitter__: Callable,
):
"""Process and emit grounding metadata events."""
grounding_chunks = []
web_search_queries = []
grounding_supports = []
for metadata in grounding_metadata_list:
if metadata.grounding_chunks:
grounding_chunks.extend(metadata.grounding_chunks)
if metadata.web_search_queries:
web_search_queries.extend(metadata.web_search_queries)
if metadata.grounding_supports:
grounding_supports.extend(metadata.grounding_supports)
# Add sources to the response
if grounding_chunks:
sources = self._format_grounding_chunks_as_sources(grounding_chunks)
await __event_emitter__(
{"type": "chat:completion", "data": {"sources": sources}}
)
# Add status specifying google queries used for grounding
if web_search_queries:
await __event_emitter__(
{
"type": "status",
"data": {
"action": "web_search",
"description": "This response was grounded with Google Search",
"urls": [
f"https://www.google.com/search?q={query}"
for query in web_search_queries
],
},
}
)
# Add citations in the text body
if grounding_supports:
# Citation indexes are in bytes
ENCODING = "utf-8"
text_bytes = text.encode(ENCODING)
last_byte_index = 0
cited_chunks = []
for support in grounding_supports:
cited_chunks.append(
text_bytes[last_byte_index : support.segment.end_index].decode(
ENCODING
)
)
# Generate and append citations (e.g., "[1][2]")
footnotes = "".join(
[f"[{i + 1}]" for i in support.grounding_chunk_indices]
)
cited_chunks.append(f" {footnotes}")
# Update index for the next segment
last_byte_index = support.segment.end_index
# Append any remaining text after the last citation
if last_byte_index < len(text_bytes):
cited_chunks.append(text_bytes[last_byte_index:].decode(ENCODING))
await __event_emitter__(
{
"type": "replace",
"data": {"content": "".join(cited_chunks)},
}
)
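    # Worked example (illustrative, not executed): for the text "Paris is the capital."
    # with one grounding support covering bytes [0, 20) (segment.end_index == 20 is a
    # byte offset, not a character offset) and grounding_chunk_indices == [0], the
    # rebuilt content emitted via the "replace" event is:
    #
    #   "Paris is the capital [1]."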
async def _handle_streaming_response(
self,
response_iterator: Any,
__event_emitter__: Callable,
) -> AsyncIterator[str]:
"""
Handle streaming response from Gemini API.
Args:
response_iterator: Iterator from generate_content
Returns:
Generator yielding text chunks
"""
grounding_metadata_list = []
text_chunks = []
try:
async for chunk in response_iterator:
# Check for safety feedback or empty chunks
                if not chunk.candidates:
                    # Check prompt feedback on the chunk itself; the async iterator
                    # returned by generate_content_stream has no prompt_feedback attribute
                    if chunk.prompt_feedback and chunk.prompt_feedback.block_reason:
                        yield f"[Blocked due to Prompt Safety: {chunk.prompt_feedback.block_reason.name}]"
                    else:
                        yield "[Blocked by safety settings]"
return # Stop generation
if chunk.candidates[0].grounding_metadata:
grounding_metadata_list.append(
chunk.candidates[0].grounding_metadata
)
if chunk.text:
text_chunks.append(chunk.text)
await __event_emitter__(
{
"type": "chat:message:delta",
"data": {
"content": chunk.text,
},
}
)
# After processing all chunks, handle grounding data
if grounding_metadata_list and __event_emitter__:
await self._process_grounding_metadata(
grounding_metadata_list, "".join(text_chunks), __event_emitter__
)
except Exception as e:
self.log.exception(f"Error during streaming: {e}")
yield f"Error during streaming: {e}"
def _handle_standard_response(self, response: Any) -> str:
"""
Handle non-streaming response from Gemini API.
Args:
response: Response from generate_content
Returns:
Generated text or error message
"""
# Check for prompt safety blocks
if response.prompt_feedback and response.prompt_feedback.block_reason:
return f"[Blocked due to Prompt Safety: {response.prompt_feedback.block_reason.name}]"
# Check for missing candidates
if not response.candidates:
return "[Blocked by safety settings or no candidates generated]"
# Check candidate finish reason
candidate = response.candidates[0]
if candidate.finish_reason == types.FinishReason.SAFETY:
# Try to get specific safety rating info
            blocking_rating = next(
                (r for r in (candidate.safety_ratings or []) if r.blocked), None
            )
reason = f" ({blocking_rating.category.name})" if blocking_rating else ""
return f"[Blocked by safety settings{reason}]"
# Process content parts
if candidate.content and candidate.content.parts:
# Combine text from all parts
return "".join(
part.text for part in candidate.content.parts if hasattr(part, "text")
)
else:
return "[No content generated or unexpected response structure]"
async def _retry_with_backoff(self, func, *args, **kwargs) -> Any:
"""
Retry a function with exponential backoff.
Args:
func: Async function to retry
*args, **kwargs: Arguments to pass to the function
Returns:
Result from the function
Raises:
The last exception encountered after all retries
"""
max_retries = self.valves.RETRY_COUNT
retry_count = 0
last_exception = None
while retry_count <= max_retries:
try:
return await func(*args, **kwargs)
except ServerError as e:
# These errors might be temporary, so retry
retry_count += 1
last_exception = e
if retry_count <= max_retries:
                    # Calculate backoff time (exponential, capped at 10 seconds)
                    wait_time = min(2**retry_count + (0.1 * retry_count), 10)
self.log.warning(
f"Temporary error from Google API: {e}. Retrying in {wait_time:.1f}s ({retry_count}/{max_retries})"
)
await asyncio.sleep(wait_time)
else:
raise
except Exception:
# Don't retry other exceptions
raise
# If we get here, we've exhausted retries
assert last_exception is not None
raise last_exception
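    # Worked example (illustrative): with RETRY_COUNT = 2, a persistent ServerError is
    # retried after min(2**1 + 0.1, 10) = 2.1s and min(2**2 + 0.2, 10) = 4.2s, then the
    # last exception is re-raised; longer waits are capped at 10 seconds.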
async def pipe(
self,
body: Dict[str, Any],
__metadata__: dict[str, Any],
__event_emitter__: Callable,
__tools__: dict[str, Any] | None,
) -> Union[str, AsyncIterator[str]]:
"""
Main method for sending requests to the Google Gemini endpoint.
Args:
body: The request body containing messages and other parameters.
Returns:
Response from Google Gemini API, which could be a string or an iterator for streaming.
"""
# Setup logging for this request
request_id = id(body)
self.log.debug(f"Processing request {request_id}")
try:
# Parse and validate model ID
model_id = body.get("model", "")
try:
model_id = self._prepare_model_id(model_id)
self.log.debug(f"Using model: {model_id}")
except ValueError as ve:
return f"Model Error: {ve}"
# Get stream flag
stream = body.get("stream", False)
messages = body.get("messages", [])
# Prepare content and extract system message
contents, system_instruction = self._prepare_content(messages)
if not contents:
return "Error: No valid message content found"
# Configure generation parameters and safety settings
generation_config = self._configure_generation(
body, system_instruction, model_id, __metadata__, __tools__
)
# Make the API call
client = self._get_client()
if stream:
try:
async def get_streaming_response():
return await client.aio.models.generate_content_stream(
model=model_id,
contents=contents,
config=generation_config,
)
response_iterator = await self._retry_with_backoff(
get_streaming_response
)
self.log.debug(f"Request {request_id}: Got streaming response")
return self._handle_streaming_response(
response_iterator, __event_emitter__
)
except Exception as e:
self.log.exception(f"Error in streaming request {request_id}: {e}")
return f"Error during streaming: {e}"
else:
try:
async def get_response():
return await client.aio.models.generate_content(
model=model_id,
contents=contents,
config=generation_config,
)
response = await self._retry_with_backoff(get_response)
self.log.debug(f"Request {request_id}: Got non-streaming response")
return self._handle_standard_response(response)
except Exception as e:
self.log.exception(
f"Error in non-streaming request {request_id}: {e}"
)
return f"Error generating content: {e}"
        except ClientError as ce:
            error_msg = f"Client error raised by the GenAI API: {ce}"
            self.log.error(f"Client error: {ce}")
            return error_msg
        except ServerError as se:
            error_msg = f"Server error raised by the GenAI API: {se}"
            self.log.error(f"Server error: {se}")
            return error_msg
except APIError as apie:
error_msg = f"Google API Error: {apie}"
self.log.error(error_msg)
return error_msg
except ValueError as ve:
error_msg = f"Configuration error: {ve}"
self.log.error(f"Value error: {ve}")
return error_msg
        except Exception as e:
            # log.exception already records the full traceback
            self.log.exception(f"Unexpected error: {e}")
# Return a user-friendly error message
return f"An error occurred while processing your request: {e}"