"""
title: Context Awareness
author: Blah
version: 2.0
tested_on_open_webui_version: v0.6.15
description: Automatically injects accurate time and location context into all user messages with comprehensive timezone support, location awareness, and robust error handling for enhanced conversational awareness.
license: MIT
requirements: beautifulsoup4>=4.13,httpx>=0.24.0
"""
# CONTEXT AWARENESS MODULE
# ===================================
# This filter provides both time and location awareness by adding contextual information
# to user messages, enhancing the AI's ability to provide relevant responses.
#
# CONTEXT FORMAT:
# The filter adds context information in a structured XML format:
# <details type="filters_context">
# <context id="context_awareness">
# <time timezone="UTC" format="...">Wed 15 Jan 2025, 13:45:30</time>
# <location method="user_manual">London, United Kingdom</location>
# </context>
# </details>
#
# ERROR HANDLING:
# All context operations are wrapped in try-catch blocks to ensure
# failure doesn't break the chat functionality.
import time
import sys
import datetime
import logging
import functools
import inspect
import zoneinfo
import uuid
import json
import re
from typing import Any, Optional, Dict, List, Callable, Union, Tuple
from collections import defaultdict
from pydantic import BaseModel, Field
import bs4
from bs4 import BeautifulSoup
# Optional dependency for chat history fetching
try:
import httpx
except ImportError:
httpx = None
# Setup logging
LOGGER: logging.Logger = logging.getLogger("FUNC:CONTEXT_AWARENESS")
def set_logs(logger: logging.Logger, level: int, force: bool = False):
"""Configure logger with proper formatting and handlers."""
logger.setLevel(level)
# Check if handler already exists to avoid duplicates
for handler in logger.handlers:
if not force and isinstance(handler, logging.StreamHandler):
handler.setLevel(level)
logger.info("Logger already has a StreamHandler. Not creating a new one.")
return logger
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level)
formatter = logging.Formatter(
"%(levelname)s[%(name)s]%(lineno)s:%(asctime)s: %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
# Initialise logging
set_logs(LOGGER, logging.INFO)
def log_exceptions(func: Callable[..., Any]):
"""Decorator to log exceptions in functions."""
if inspect.iscoroutinefunction(func):
@functools.wraps(func)
async def _wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as exc:
LOGGER.error("Error in %s: %s", func, exc, exc_info=True)
raise exc
else:
@functools.wraps(func)
def _wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as exc:
LOGGER.error("Error in %s: %s", func, exc, exc_info=True)
raise exc
return _wrapper
class ROLE:
SYSTEM = "system"
USER = "user"
ASSISTANT = "assistant"
class Filter:
class Valves(BaseModel):
# Time-related valves
timezone: str = Field(
default_factory=lambda: time.tzname[0],
description="Default timezone, unless overwritten by User Valve. "
f"Available timezones: {', '.join(sorted(list(zoneinfo.available_timezones())[:10]))}... (and more)"
)
date_format: str = Field(
default="ISO",
description='Date format: "ISO" (2025-01-15), "DMY_SLASH" (15/01/2025), "MDY_SLASH" (01/15/2025), "DMY_DOT" (15.01.2025)'
)
enable_history: bool = Field(
default=True,
description="Enable historical message timestamping by fetching chat history"
)
# Location-related valves
default_city: str = Field(
default="",
description="Default city when user hasn't provided one (e.g., 'London', 'New York', 'Tokyo')"
)
default_country: str = Field(
default="",
description="Default country when user hasn't provided one (e.g., 'United Kingdom', 'United States', 'Japan')"
)
# Common valves
api_base_url: str = Field(
default="http://127.0.0.1:8080",
description="Base URL for Open WebUI backend API (for historical chat fetching)"
)
priority: int = Field(
default=-10,
description="Filter priority (higher = later execution)"
)
debug_mode: bool = Field(
default=False,
description="Enable detailed logging and debugging information"
)
class UserValves(BaseModel):
# Time-related user valves
timezone: str = Field(
default_factory=lambda: time.tzname[0],
description="π Your timezone (overrides system timezone)"
)
# Location-related user valves
city: str = Field(
default="",
description="ποΈ Your current city (e.g., 'London', 'New York', 'Tokyo') - overrides system default"
)
country: str = Field(
default="",
description="π Your current country (e.g., 'United Kingdom', 'United States', 'Japan') - overrides system default"
)
# Common user valves
enabled: bool = Field(
default=True,
description="π΄ Enable or disable context awareness for this user"
)
CONTEXT_ID = "context_awareness"
def __init__(self):
self.valves = self.Valves()
self.uservalves = self.UserValves()
self._queries: Dict[str, Dict[str, Union[str, float]]] = {}
self.weekday_map = {
0: "Monday", 1: "Tuesday", 2: "Wednesday", 3: "Thursday",
4: "Friday", 5: "Saturday", 6: "Sunday"
}
if not httpx and self.valves.enable_history:
LOGGER.warning("httpx not available. Historical timestamping disabled. Install with: pip install httpx")
def _extract_jwt_from_request(self, request) -> Optional[str]:
"""Extract JWT token from request headers."""
try:
auth_header = getattr(request, 'headers', {}).get("authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1]
if token and "." in token:
return token
LOGGER.warning("Could not find valid Bearer token in request headers")
return None
except Exception as e:
LOGGER.error(f"Error extracting JWT token: {e}")
return None
async def _get_chat_history(self, chat_id: str, jwt_token: str, event_emitter: Optional[Callable]) -> Optional[Dict]:
"""Fetch chat history from API."""
if not httpx or not self.valves.enable_history:
return None
if not jwt_token:
LOGGER.error("No JWT token available for chat history fetch")
return None
api_url = f"{self.valves.api_base_url.strip('/')}/api/v1/chats/{chat_id}"
headers = {"Authorization": f"Bearer {jwt_token}"}
try:
async with httpx.AsyncClient() as client:
response = await client.get(api_url, headers=headers)
response.raise_for_status()
return response.json()
except Exception as e:
LOGGER.error(f"Error fetching chat history: {e}")
if event_emitter:
await event_emitter({
"type": "notification",
"data": {"type": "error", "content": "Context Awareness: Failed to fetch chat history"}
})
return None
def _get_text_from_content(self, content: Union[str, list, None]) -> str:
"""Extract text content from various content formats."""
if isinstance(content, str):
return content
if isinstance(content, list):
return "\n".join([
part.get("text", "") for part in content
if isinstance(part, dict) and part.get("type") == "text"
])
return ""
def get_time_prefix(self, dt_object: datetime.datetime, is_full_format: bool = True) -> str:
"""Generate time prefix string with configurable format."""
time_str = dt_object.strftime("%H:%M:%S")
if is_full_format:
format_mapping = {
"ISO": "%Y-%m-%d",
"DMY_SLASH": "%d/%m/%Y",
"MDY_SLASH": "%m/%d/%Y",
"DMY_DOT": "%d.%m.%Y"
}
date_format = format_mapping.get(self.valves.date_format, "%Y-%m-%d")
date_str = dt_object.strftime(date_format)
weekday_str = self.weekday_map[dt_object.weekday()]
return f"[{date_str}, {weekday_str}, {time_str}]"
else:
return f"[{time_str}]"
def _format_location(self, city: str, country: str) -> str:
"""Format location from city and country with fallback to system defaults."""
city = city.strip() if city else ""
country = country.strip() if country else ""
# Use system defaults if user hasn't provided values
if not city:
city = self.valves.default_city.strip()
if not country:
country = self.valves.default_country.strip()
if city and country:
return f"{city}, {country}"
elif city:
return city
elif country:
return country
else:
return ""
async def get_location_context(
self,
__user__: Optional[dict] = None,
__event_emitter__: Optional[Callable] = None,
) -> Optional[str]:
"""Generate location context XML string from user-provided location."""
city = ""
country = ""
# Get user-provided location
if __user__:
try:
if hasattr(__user__, "valves"):
city = getattr(__user__.valves, "city", "").strip()
country = getattr(__user__.valves, "country", "").strip()
except (KeyError, AttributeError):
LOGGER.debug("No user location valves found")
location_str = self._format_location(city, country)
# If no location available, don't add context
if not location_str:
return None
detection_method = "user_manual"
# Emit status if location was provided
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"π Location: {location_str}", "done": True}
})
return f'<location method="{detection_method}">{location_str}</location>'
async def get_time_context(
self,
timestamp: Optional[float] = None,
variables: Optional[Dict[str, str]] = None,
__event_emitter__: Optional[Callable] = None,
__user__: Optional[dict] = None,
) -> str:
"""Generate time context XML string."""
# Default timezone
tz = self.valves.timezone
comment = "System timezone. User may not be in this timezone."
# Check for timezone from variables
if variables and variables.get("{{CURRENT_TIMEZONE}}"):
tz = variables["{{CURRENT_TIMEZONE}}"]
comment = "Timezone from user metadata."
# Check user valve timezone
if __user__:
try:
if hasattr(__user__, "valves") and hasattr(__user__.valves, "timezone"):
user_tz = __user__.valves.timezone.strip()
if user_tz:
tz = user_tz
comment = "Timezone provided by user."
except (KeyError, AttributeError):
LOGGER.debug("No user timezone valve found")
# Validate timezone
if tz not in zoneinfo.available_timezones():
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Unknown timezone: {tz}. Using UTC.", "done": True}
})
tz = "UTC"
comment = "Fallback to UTC due to invalid timezone."
# Generate datetime
timezone = zoneinfo.ZoneInfo(tz)
if timestamp is None:
date = datetime.datetime.now(tz=timezone)
else:
date = datetime.datetime.fromtimestamp(timestamp, tz=timezone)
format_str = "%a %d %b %Y, %H:%M:%S"
date_str = date.strftime(format_str)
return f'<time timezone="{tz}" format="{format_str}">{date_str}</time>'
async def get_combined_context(
self,
timestamp: Optional[float] = None,
variables: Optional[Dict[str, str]] = None,
__event_emitter__: Optional[Callable] = None,
__user__: Optional[dict] = None,
) -> str:
"""Generate combined time and location context."""
# Get time context
time_context = await self.get_time_context(
timestamp=timestamp,
variables=variables,
__event_emitter__=__event_emitter__,
__user__=__user__
)
# Get location context
location_context = await self.get_location_context(
__user__=__user__,
__event_emitter__=__event_emitter__
)
# Combine contexts
combined = time_context
if location_context:
combined = f"{time_context}\n{location_context}"
return combined
@log_exceptions
async def inlet(
self,
body: dict,
__event_emitter__: Optional[Callable[[Any], Any]] = None,
__user__: Optional[dict] = None,
__metadata__: Optional[dict] = None,
__request__: Optional[object] = None,
) -> dict:
"""Process incoming messages and inject context."""
start_time = time.time()
if self.valves.debug_mode:
set_logs(LOGGER, logging.DEBUG, force=True)
LOGGER.debug("=== Context Awareness Filter Starting ===")
LOGGER.debug(f"Inlet body: {body}")
LOGGER.debug(f"User: {__user__}")
LOGGER.debug(f"Metadata: {__metadata__}")
# Check if user has disabled the filter
if __user__ and hasattr(__user__, "valves") and hasattr(__user__.valves, "enabled") and not __user__.valves.enabled:
LOGGER.debug("Context awareness disabled by user valve")
return body
messages = body.get("messages", [])
if not messages:
LOGGER.debug("No messages found in body")
return body
# Get timezone info
variables = body.get("metadata", {}).get("variables", {})
if __metadata__ and isinstance(__metadata__.get("variables"), dict):
variables.update(__metadata__["variables"])
if self.valves.debug_mode:
LOGGER.debug(f"Timezone variables: {variables}")
# Find last user message
user_message, user_message_idx = get_last_message(messages, ROLE.USER)
if user_message_idx is None or user_message is None:
LOGGER.info("No user message found")
return body
if self.valves.debug_mode:
LOGGER.debug(f"Found user message at index {user_message_idx}: {user_message}")
# Handle historical timestamping for existing chats
chat_id = __metadata__.get("chat_id") if __metadata__ else None
if self.valves.debug_mode:
LOGGER.debug(f"Chat ID: {chat_id}")
if chat_id and __request__ and httpx and self.valves.enable_history:
# This is an existing chat - try to add historical timestamps
LOGGER.debug("Attempting to fetch chat history for historical timestamping")
jwt_token = self._extract_jwt_from_request(__request__)
chat_history = await self._get_chat_history(chat_id, jwt_token, __event_emitter__)
if chat_history:
if self.valves.debug_mode:
LOGGER.debug(f"Fetched chat history, injecting timestamps.")
await self._inject_historical_timestamps(messages, chat_history, variables, __user__)
else:
LOGGER.debug("No chat history found or fetched, proceeding with current time for last message.")
# Fallback to current time if history fetch fails
context = await self.get_combined_context(
variables=variables,
__event_emitter__=__event_emitter__,
__user__=__user__
)
if self.valves.debug_mode:
LOGGER.debug(f"Generated context: {context}")
# Store query context for outlet
if query_id := body.get("metadata", {}).get("message_id"):
self._queries[query_id] = {"context": context, "timestamp": time.time()}
if self.valves.debug_mode:
LOGGER.debug(f"Stored context for query_id {query_id}")
# Inject context into user message
try:
user_message["content"] = add_or_update_filter_context(
user_message["content"], context, id=self.CONTEXT_ID
)
except Exception as e:
LOGGER.error(f"Error injecting context in inlet: {e}")
# Don't fail the whole request if context injection fails
else:
# New chat or history disabled - just add current timestamp
if self.valves.debug_mode:
LOGGER.debug("New chat or history disabled, adding current timestamp.")
context = await self.get_combined_context(
variables=variables,
__event_emitter__=__event_emitter__,
__user__=__user__
)
# Store query context for outlet
if query_id := body.get("metadata", {}).get("message_id"):
self._queries[query_id] = {"context": context, "timestamp": time.time()}
if self.valves.debug_mode:
LOGGER.debug(f"Stored context for query_id {query_id}: {self._queries[query_id]}")
# Inject context into user message
try:
user_message["content"] = add_or_update_filter_context(
user_message["content"], context, id=self.CONTEXT_ID
)
except Exception as e:
LOGGER.error(f"Error injecting context in inlet: {e}")
# Don't fail the whole request if context injection fails
if self.valves.debug_mode:
LOGGER.debug(f"Final messages: {messages}")
end_time = time.time()
LOGGER.debug(f"=== Context Awareness Filter Completed in {end_time - start_time:.4f}s ===")
return body
async def _inject_historical_timestamps(self, messages: List[dict], chat_history: dict, variables: dict, __user__: dict):
"""Inject historical timestamps into messages."""
# Build content-to-timestamp mapping
content_to_timestamps = defaultdict(list)
history_messages = chat_history.get("chat", {}).get("history", {}).get("messages", {})
if isinstance(history_messages, dict):
sorted_history = sorted(history_messages.values(), key=lambda msg: msg.get("timestamp", 0))
for msg_data in sorted_history:
if isinstance(msg_data, dict) and msg_data.get("role") == "user":
text_content = self._get_text_from_content(msg_data.get("content"))
if "timestamp" in msg_data and text_content:
content_to_timestamps[text_content].append(msg_data["timestamp"])
if self.valves.debug_mode:
LOGGER.debug(f"Built content-to-timestamp map: {content_to_timestamps}")
# Find last user message for current timestamp
last_user_idx = -1
for i in range(len(messages) - 1, -1, -1):
if messages[i].get("role") == "user":
last_user_idx = i
break
# Process messages and add timestamps
for i, message in enumerate(messages):
if message.get("role") == "user":
content = message.get("content")
text_content = self._get_text_from_content(content)
# Skip if already has timestamp from a previous run
if text_content.strip().startswith('['):
if self.valves.debug_mode:
LOGGER.debug(f"Skipping message {i}, already has timestamp: {text_content[:30]}")
continue
# Determine timestamp
if i == last_user_idx:
# Current message - use current time and location
context = await self.get_combined_context(
variables=variables,
__user__=__user__
)
message["content"] = add_or_update_filter_context(content, context, id=self.CONTEXT_ID)
if self.valves.debug_mode:
LOGGER.debug(f"Added current context to message {i}")
else:
# Historical message - use stored timestamp
timestamp_queue = content_to_timestamps.get(text_content)
if timestamp_queue:
timestamp = timestamp_queue.pop(0)
context = await self.get_combined_context(
timestamp=timestamp,
variables=variables,
__user__=__user__
)
message["content"] = add_or_update_filter_context(content, context, id=self.CONTEXT_ID)
if self.valves.debug_mode:
LOGGER.debug(f"Added historical timestamp {timestamp} to message {i}")
elif self.valves.debug_mode:
LOGGER.debug(f"Could not find timestamp for historical message {i}: {text_content[:50]}")
@log_exceptions
async def outlet(
self,
body: dict,
__event_emitter__: Optional[Callable[[Any], Any]] = None,
__user__: Optional[dict] = None,
) -> dict:
"""Process outgoing responses and update message history."""
if self.valves.debug_mode:
set_logs(LOGGER, logging.DEBUG, force=True)
LOGGER.debug("Outlet processing started")
LOGGER.debug(f"Outlet body: {body}")
LOGGER.debug(f"User: {__user__}")
# Extract required information
answer_id = body.get("id")
session_id = body.get("session_id")
chat_id = body.get("chat_id")
messages = body.get("messages")
user_id = __user__.get("id") if __user__ else None
if None in (answer_id, session_id, chat_id, messages, user_id):
LOGGER.debug("Missing required context for outlet processing")
return body
if self.valves.debug_mode:
LOGGER.debug(f"Outlet context: answer_id={answer_id}, session_id={session_id}, chat_id={chat_id}")
# Find stored query context
query = self._queries.get(answer_id)
if not query:
LOGGER.debug(f"No stored query found for answer_id: {answer_id}")
return body
if self.valves.debug_mode:
LOGGER.debug(f"Found stored query: {query}")
# Update user message with context
user_msg, user_msg_idx = get_last_message(messages, ROLE.USER)
if user_msg and user_msg_idx is not None:
try:
# Use the context management function to safely add/update our context
user_msg["content"] = add_or_update_filter_context(
user_msg["content"], query["context"], self.CONTEXT_ID
)
if self.valves.debug_mode:
LOGGER.debug(f"Updated user message with context: {user_msg}")
except Exception as e:
LOGGER.error(f"Error updating message context in outlet: {e}")
# Don't fail the whole response if context update fails
# Clean up old queries (prevent memory leak)
current_time = time.time()
expired_queries = [
k for k, v in self._queries.items()
if current_time - v.get("timestamp", 0) > 1800 # 30 minutes
]
for k in expired_queries:
del self._queries[k]
if self.valves.debug_mode and expired_queries:
LOGGER.debug(f"Cleaned up {len(expired_queries)} expired queries.")
if self.valves.debug_mode:
LOGGER.debug("Outlet processing completed")
return body
def get_last_message(messages: List[Dict[str, str]], role: str) -> Tuple[Optional[Dict[str, str]], Optional[int]]:
"""Get the last message from specified role and its index."""
for i, message in enumerate(reversed(messages)):
if message.get("role") == role:
return (message, len(messages) - i - 1)
return (None, None)
def add_or_update_filter_context(
message: str,
context: str,
id: str,
*,
selector: str = "details[type=filters_context]",
container: str = (
'<details type="filters_context">'
'\n<summary>Filters context</summary>\n'
'<!--This context was added by the system to this message, not by the user. '
'You can use information provided here, but never mention or refer to this context explicitly. -->'
'\n{content}\n<!-- User message will follow "details" closing tag. --></details>\n'
),
) -> str:
"""Add or update XML context in message."""
try:
soup = BeautifulSoup(message, "xml")
details_match = soup.select(selector)
context_end = "context_end"
context_xml = f'<context id="{id}">{context}</context>'
if not details_match:
# No existing context - create new container
out_soup = BeautifulSoup(container.format(content=context_xml), "xml").contents[0]
out_soup.append(
BeautifulSoup(f'<{context_end} uuid="{str(uuid.uuid4())}"/>', "xml").contents[0]
)
return "\n".join((str(out_soup), message))
elif len(details_match) > 1:
raise ValueError("Ill-formed message: more than one filter context container found")
else:
# Update existing context
details = details_match[0]
user_msg = _remove_context(message, details, container=container, context_end=context_end)
context_soup = BeautifulSoup(context_xml, "xml").contents[0]
# Check for existing context with same ID
same_ids = details.select(f"context[id={id}]")
if len(same_ids) > 1:
raise ValueError(f"More than one context found with id '{id}'")
elif len(same_ids) == 1:
# Replace existing context
same_ids[0].replace_with(context_soup)
else:
# Add new context
details.insert(-1, context_soup)
return "\n".join((str(soup.contents[0]), user_msg))
except Exception as e:
LOGGER.error(f"Error in add_or_update_filter_context: {e}")
# Fallback: return original message if context injection fails
return message
def _remove_context(message: str, details: bs4.Tag, container: str, context_end: str) -> str:
"""Remove context details from message and return clean user message."""
try:
# Find context_end marker
end_uuid = None
for child in details:
if getattr(child, "name", None) == context_end:
end_uuid = child.get("uuid")
break
if end_uuid is None:
raise ValueError("Ill-formed context: no context_end uuid found")
# Find UUID position and extract user message
uuid_idx = message.index(end_uuid)
# Find container closing tag
match = re.search(r"(</.*>)\s*$", container)
if match is None:
raise ValueError("Ill-formed container: no closing tag found")
closing_tag = match.groups()[0]
closing_tag_idx = message.index(closing_tag, uuid_idx)
return message[closing_tag_idx + len(closing_tag):]
except Exception as e:
LOGGER.error(f"Error in _remove_context: {e}")
# Fallback: return original message if context removal fails
return message