"""
title: Enhanced Context Counter for OpenWebUI
author: AG
author_url: https://github.com/open-webui
funding_url: https://github.com/open-webui
version: 4.0
license: MIT
requirements: tiktoken
description: Advanced context window tracker and metrics dashboard for OpenWebUI with comprehensive LLM support, cost tracking, and performance analytics.
# ENHANCED CONTEXT COUNTER FOR OPENWEBUI v4.0
## Description
The Enhanced Context Counter is a sophisticated Function Filter for OpenWebUI that provides real-time monitoring and analytics for LLM interactions. It tracks token usage, estimates costs, monitors performance metrics, and provides actionable insights through a configurable status display. The system supports a wide range of LLMs through multi-source model detection and offers extensive customization options via Valves and UserValves.
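Under the hood this is packaged as a standard OpenWebUI Function Filter: a class with a `Valves` model for configuration plus `inlet`/`outlet` hooks that run before and after each model call. Below is a minimal, illustrative sketch of that shape (signatures are simplified assumptions; the real class in this file adds UserValves, caching, and the full metrics logic):
```
from pydantic import BaseModel, Field

class Filter:
    class Valves(BaseModel):
        show_status: bool = Field(default=True, description="Show status updates in the UI.")

    def __init__(self):
        self.valves = self.Valves()

    def inlet(self, body: dict) -> dict:
        # Runs before the request reaches the model: count prompt tokens here.
        return body

    def outlet(self, body: dict) -> dict:
        # Runs after generation: count completion tokens, update costs, emit status.
        return body
```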
## Key Features
- **Comprehensive Model Support**: Multi-source model detection using OpenRouter API, exports, hardcoded defaults, and user-defined custom models.
- **Advanced Token Counting**: Primary tiktoken-based counting with intelligent fallbacks, content-specific adjustments, and calibration factors (see the sketch after this list).
- **Cost Estimation & Budgeting**: Precise cost calculation with input/output breakdown and multi-level budget tracking (daily, monthly, session).
- **Performance Analytics**: Real-time token rate calculation, adaptive window sizing, and comprehensive session statistics.
- **Intelligent Context Management**: Context window monitoring with progress visualization, warnings, and smart trimming suggestions.
- **Persistent Cost Tracking**: File-based tracking with thread-safe operations for user, daily, and monthly costs.
- **Highly Configurable UI**: Customizable status line with modular components and visual indicators.
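To make the token-counting approach concrete, here is a minimal sketch of tiktoken counting with a character-ratio fallback and a per-model correction factor. The `cl100k_base` encoding, the ~4-characters-per-token fallback, and the factor value are illustrative assumptions; the real implementation adds caching, content-type detection, and image handling:
```
def estimate_tokens(text: str, model_id: str, correction_factors: dict) -> int:
    # Prefer exact tokenization via tiktoken; fall back to a rough character ratio.
    try:
        import tiktoken
        encoder = tiktoken.get_encoding("cl100k_base")
        count = len(encoder.encode(text))
    except ImportError:
        count = int(len(text) / 4.0)  # ~4 characters per token for plain text
    # Apply a per-model calibration factor (1.0 when the model is unknown).
    return int(count * correction_factors.get(model_id, 1.0))

print(estimate_tokens("Hello, world!", "openai/o1", {"openai/o1": 1.6884}))
```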
## Other Features
- **Image Token Estimation**: Heuristic-based calculation using defaults, resolution analysis, and model-specific overrides (see the sketch after this list).
- **Calibration Integration**: Status display based on external calibration results for accuracy verification.
- **Error Resilience**: Graceful fallbacks for missing dependencies, API failures, and unrecognized models.
- **Content-Type Detection**: Specialized handling for different content types (code, JSON, tables, etc.).
- **Cache Optimization**: Token counting cache with adaptive pruning for performance enhancement.
- **Cost Optimization Hints**: Actionable suggestions for reducing costs based on usage patterns.
- **Extensive Logging**: Configurable logging with rotation for diagnostics and troubleshooting.
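As a rough sketch of the image-token heuristic mentioned above (the priority order is an assumption; the parameter names mirror the `default_image_tokens`, `tokens_per_megapixel`, and `model_image_token_overrides` valves defined later in this file):
```
from typing import Dict, Optional

def estimate_image_tokens(
    model_id: str,
    width: Optional[int] = None,
    height: Optional[int] = None,
    default_tokens: int = 500,
    tokens_per_megapixel: int = 100,
    overrides: Optional[Dict[str, int]] = None,
) -> int:
    # Model-specific override wins, then a resolution-based estimate, then the default.
    overrides = overrides or {}
    if model_id in overrides:
        return overrides[model_id]
    if width and height:
        megapixels = (width * height) / 1_000_000
        return int(megapixels * tokens_per_megapixel)
    return default_tokens
```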
## Valve Configuration Guide
The function offers extensive customization through Valves (global settings) and UserValves (per-user overrides):
### Core Valves
- **[Model Detection]**: Configure model recognition with `fuzzy_match_threshold`, `vendor_family_map`, and `heuristic_rules`.
- **[Token Counting]**: Adjust accuracy with `model_correction_factors` and `content_correction_factors`.
- **[Cost/Budget]**: Set `budget_amount`, `monthly_budget_amount`, and `budget_tracking_mode` for financial controls (the cost math is sketched after this list).
- **[UI/UX]**: Customize display with toggles like `show_progress_bar`, `show_cost`, and `progress_bar_style`.
- **[Performance]**: Fine-tune with `adaptive_rate_averaging` and related window settings.
- **[Cache]**: Optimize with `enable_token_cache` and `token_cache_size`.
- **[Warnings]**: Configure alerts with percentage thresholds for context and budget usage.
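For the [Cost/Budget] valves above, cost is simply tokens multiplied by per-token USD prices, optionally scaled by the `compensation` factor. A minimal sketch (the result dictionary shape is an assumption):
```
def estimate_cost(input_tokens: int, output_tokens: int,
                  pricing: dict, compensation: float = 1.0) -> dict:
    # Per-token prices, e.g. {"input": 5e-06, "output": 1.5e-05} for a $5/$15 per-million model.
    input_cost = input_tokens * pricing.get("input", 0.0) * compensation
    output_cost = output_tokens * pricing.get("output", 0.0) * compensation
    return {"input": input_cost, "output": output_cost, "total": input_cost + output_cost}

# The 5 input / 43 output tokens from the status line example further below:
print(estimate_cost(5, 43, {"input": 5e-06, "output": 1.5e-05}))
```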
### UserValves
Users can override global settings with personal preferences (a sample override is sketched after this list):
- Custom budget amounts and warning thresholds
- Model aliases for simplified model references
- Personal correction factors for token counting accuracy
- Visual style preferences for the status display
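For illustration, a per-user override might look like this when constructed programmatically (the field names match the UserValves model defined later in this file; the values are arbitrary examples):
```
user_valves = Filter.UserValves(
    budget_amount=25.0,                               # personal budget in USD (overrides global)
    warn_at_percentage=60.0,                          # warn earlier than the global 75% default
    model_aliases={"o3mh": "OR.openai/o3-mini-high"}, # shorthand -> full model ID
    model_correction_factors={"openai/o1": 1.7},      # personal token-count calibration
)
```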
## UI Status Line Breakdown
The status line provides a comprehensive overview of the current session's metrics in a compact format:
```
🪙 48/1.0M tokens (0.00%) [▱▱▱▱▱] | 🔽5/🔼43 | 💰 $0.000000 | 🏦 Daily: $0.009221/$100.00 (0.0%) | ⏱️ 5.1s (8.4 t/s) | 🗓️ $99.99 left (0.01%) this month | Text: 48 | 🔧 Not Calibrated
```
### Status Components
- **🪙 48/1.0M tokens (0.00%)**: Total tokens used / context window size with percentage
- **[▱▱▱▱▱]**: Visual progress bar showing context window usage (see the sketch after this list)
- **🔽5/🔼43**: Input/Output token breakdown (5 input, 43 output)
- **💰 $0.000000**: Total estimated cost for the current session
- **🏦 Daily: $0.009221/$100.00 (0.0%)**: Daily budget usage (spent/total and percentage)
- **⏱️ 5.1s (8.4 t/s)**: Elapsed time and tokens per second rate
- **🗓️ $99.99 left (0.01%) this month**: Monthly budget status (remaining amount and percentage used)
- **Text: 48**: Text token count (excludes image tokens if present)
- **🔧 Not Calibrated**: Calibration status of token counting accuracy
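The bar component above can be reproduced from the empty/filled characters this file defines (▱/▰); the rounding choice here is an assumption:
```
def render_progress_bar(used_tokens: int, context_size: int, bar_length: int = 5) -> str:
    # Render a fixed-width bar of filled (▰) and empty (▱) segments.
    ratio = min(used_tokens / context_size, 1.0) if context_size else 0.0
    filled = round(ratio * bar_length)
    return "[" + "▰" * filled + "▱" * (bar_length - filled) + "]"

print(render_progress_bar(48, 1_000_000))  # -> [▱▱▱▱▱] for the example status line
```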
### Display Modes
The status line adapts to different levels of detail based on configuration:
1. **Minimal**: Shows only essential information (tokens, context percentage)
```
🪙 48/1.0M tokens (0.00%)
```
2. **Standard**: Includes core metrics (default mode)
```
🪙 48/1.0M tokens (0.00%) [▱▱▱▱▱] | 🔽5/🔼43 | 💰 $0.000000 | ⏱️ 5.1s (8.4 t/s)
```
3. **Detailed**: Displays all available metrics including budgets, token breakdowns, and calibration status
```
🪙 48/1.0M tokens (0.00%) [▱▱▱▱▱] | 🔽5/🔼43 | 💰 $0.000000 | 🏦 Daily: $0.009221/$100.00 (0.0%) | ⏱️ 5.1s (8.4 t/s) | 🗓️ $99.99 left (0.01%) this month | Text: 48 | 🔧 Not Calibrated
```
The display automatically adjusts based on available space and configured preferences in the Valves settings.
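A condensed sketch of how such a line is assembled from the modular UI toggles (the toggle names match the Valves defined later in this file, but the separator handling and metric keys here are illustrative assumptions):
```
def build_status_line(parts_enabled: dict, metrics: dict) -> str:
    # Join only the enabled components with " | ", mirroring the examples above.
    components = []
    if parts_enabled.get("show_total_tokens", True):
        components.append(f"🪙 {metrics['total']}/{metrics['context']} tokens ({metrics['pct']:.2f}%)")
    if parts_enabled.get("show_cost", True):
        components.append(f"💰 ${metrics['cost']:.6f}")
    if parts_enabled.get("show_performance_metrics", True):
        components.append(f"⏱️ {metrics['elapsed']:.1f}s ({metrics['rate']:.1f} t/s)")
    return " | ".join(components)

print(build_status_line({}, {"total": 48, "context": "1.0M", "pct": 0.0,
                             "cost": 0.0, "elapsed": 5.1, "rate": 8.4}))
```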
## Roadmap (2025-2026)
1. Enhanced model family detection with ML-based classification
2. Advanced content-specific token counting with specialized encoders
3. Interactive UI components for real-time adjustments and analytics
4. Predictive budget forecasting based on usage patterns
5. Cross-session analytics with visualization and reporting
6. API for external integration with monitoring and alerting systems
"""
import time
# import tiktoken # Moved down
import logging
import asyncio
import os
import re
import json
import urllib.request
import urllib.error # Import specific error type
from typing import List, Optional, Dict, Callable, Any, Awaitable, Tuple
from pydantic import BaseModel, Field
# from decimal import ROUND_HALF_UP, Decimal # Removed unused import
from collections import deque # Feature 4: Import deque
from datetime import date, datetime, timedelta
from pyee import EventEmitter # Added for cache TTL check
# Fix 1: Import locking libraries (Corrected Structure)
fcntl = None
msvcrt = None
LOCK_EX = 0
LOCK_SH = 0
LOCK_NB = 0
LOCK_UN = 0
try:
if os.name == "posix":
import fcntl
LOCK_EX = fcntl.LOCK_EX
LOCK_SH = fcntl.LOCK_SH
LOCK_NB = fcntl.LOCK_NB
LOCK_UN = fcntl.LOCK_UN
print("DEBUG: fcntl imported successfully.") # Debug print
elif os.name == "nt":
import msvcrt
print("DEBUG: msvcrt imported successfully.") # Debug print
except ImportError as e:
print(f"DEBUG: Locking library import failed ({os.name}): {e}") # Debug print
# Locking will not be available, functions using it should handle this.
# Define fallback functions *before* trying the import
def fallback_get_last_assistant_message(messages):
"""Fallback: Get the last assistant message."""
for message in reversed(messages):
if message.get("role") == "assistant" and "content" in message:
return message.get("content", "")
return ""
def fallback_get_messages_content(messages):
"""Fallback: Get all message content joined."""
return "\n".join(
[
msg.get("content", "")
for msg in messages
if isinstance(msg.get("content"), str)
]
)
# Import helpers from OpenWebUI - handle gracefully if not available
try:
# Attempt to import from the expected location
from open_webui.utils.misc import get_last_assistant_message, get_messages_content # type: ignore
print("DEBUG: open_webui.utils.misc imported successfully.") # Debug print
except ImportError:
print("DEBUG: open_webui.utils.misc import failed. Using fallbacks.") # Debug print
# Assign fallbacks if import fails
get_last_assistant_message = fallback_get_last_assistant_message
get_messages_content = fallback_get_messages_content
# Global error handler for uncaught exceptions
import sys
def global_exception_handler(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
sys.excepthook = global_exception_handler
# Add asyncio error handler
def handle_async_exception(loop, context):
msg = context.get("exception", context["message"])
logger.error(f"Asyncio error: {msg}")
try:
loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_async_exception)
except Exception:
pass
# Set up logging
logger = logging.getLogger("EnhancedContextCounter")
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# logger.setLevel(logging.INFO) # Fix 2: Moved level setting down
# Check for tiktoken availability
TIKTOKEN_AVAILABLE = False
try:
import tiktoken
TIKTOKEN_AVAILABLE = True
except ImportError:
logger.warning(
"tiktoken package not available. Will use fallback token counting method."
)
# Print a more detailed message to help users troubleshoot
print(
"NOTE: 'tiktoken' package is not installed. Using fallback token counting method instead."
)
print("TROUBLESHOOTING: If you're seeing installation errors, try these steps:")
print(" 1. Check your Python version (tiktoken requires Python 3.8+)")
print(" 2. Try installing with: pip install -U setuptools wheel")
print(" 3. For Linux systems, you may need 'build-essential' package")
print(" 4. Consider using alternatives like 'transformers' tokenizers if needed")
print("The context counter will continue to work with reduced accuracy.")
# We'll handle this in the token counting methods with our fallback mechanism
# Constants
PROGRESS_CHARS = ["▱", "▰"] # Empty/filled progress bar characters
WARNING_EMOJI = "⚠️"
TOKEN_EMOJI = "🪙"
CLOCK_EMOJI = "⏱️"
MONEY_EMOJI = "💰"
CHART_EMOJI = "📊"
CACHE_EMOJI = "🔄"
INPUT_TOKEN_EMOJI = "📥" # Added for input token visualization
OUTPUT_TOKEN_EMOJI = "📤" # Added for output token visualization (cost breakdown)
BUDGET_EMOJI = "🏦" # Feature 2: Emoji for budget
SCISSORS_EMOJI = "✂️" # For trimming hint
# Feature 7: Fallback token counting estimates
CHAR_PER_TOKEN_ESTIMATE = {
"text": 4.0,
"code": 3.5,
"json": 3.0,
"table": 3.8, # Estimate for tables
"list": 4.2, # Estimate for lists
"default": 4.0, # Default estimate
}
# ANSI Colors for terminal output
COLORS = {
"reset": "\033[0m",
"bold": "\033[1m",
"blue": "\033[34m",
"green": "\033[32m",
"yellow": "\033[33m",
"red": "\033[31m",
"cyan": "\033[36m",
"magenta": "\033[35m",
# Light versions
"light_blue": "\033[94m",
"light_green": "\033[92m",
"light_yellow": "\033[93m",
"light_red": "\033[91m",
"light_cyan": "\033[96m",
"light_magenta": "\033[95m",
# Background colors
"bg_blue": "\033[44m",
"bg_green": "\033[42m",
"bg_yellow": "\033[43m",
"bg_red": "\033[41m",
}
# Configuration settings
DATA_DIR = "data"
CACHE_DIR = os.path.join(DATA_DIR, ".cache")
TOKEN_CACHE_DIR = os.path.join(CACHE_DIR, "token_cache")
PATTERN_CACHE_DIR = os.path.join(CACHE_DIR, "patterns")
USER_COST_FILE = os.path.join(DATA_DIR, f"costs-{time.strftime('%Y')}.json")
DAILY_COST_FILE = os.path.join(
DATA_DIR, "daily_costs.json"
) # Feature 2: Daily cost file
CACHE_TTL = 432000 # 5 days
CACHE_MAXSIZE = 1000
DECIMALS = "0.00000001"
MODEL_CACHE_FILE = os.path.join(
CACHE_DIR, "openrouter_models.json"
) # For Model Data Cache feature
MODEL_CACHE_TTL_SECONDS = 86400 # 24 hours for Model Data Cache
# --- Pydantic Models for Custom Model Definitions ---
class CustomModelPricing(BaseModel):
input: float = Field(default=0.0, description="Cost per input token (default: 0.0)")
output: float = Field(
default=0.0, description="Cost per output token (default: 0.0)"
)
class CustomModelDefinition(BaseModel):
id: str = Field(
...,
description="Unique ID for the custom model (e.g., 'ollama/llama3', 'my-local-model')",
)
context_length: int = Field(..., description="Context window size in tokens")
pricing: CustomModelPricing = Field(
default_factory=CustomModelPricing, description="Optional pricing per token"
)
# Optional: Add a 'family' field if needed for specific tokenizer logic later
# family: Optional[str] = Field(default=None, description="Optional model family hint (e.g., 'llama', 'mistral') for tokenizer selection")
class Filter:
"""A Function that provides enhanced metrics display with context tracking, cost estimation and performance stats."""
class Valves(BaseModel):
"""Configuration valves for the Enhanced Context Counter function."""
# VALIDATION ERROR FIX: The structured `custom_models` list field was removed due to persistent
# Pydantic validation errors when loading from saved config. The issue occurred because
# the nested structure with custom model objects couldn't be properly deserialized from
# the saved JSON configuration, causing validation errors on startup.
#
# SOLUTION: The `custom_models_plaintext` field below provides a more robust alternative
# that uses simple string parsing instead of complex nested objects. This approach is
# more resilient to serialization/deserialization issues and allows for easier user input.
# The corresponding commented-out code for the structured approach has been removed for clarity.
custom_models_plaintext: str = Field(
default="openai/gpt-4o 128000 0.000005 0.000015",
title="Custom Models (Plaintext - One per line)",
description="Add models easily. Enter one model per line in this format: <ID> <Context> <Input Cost> <Output Cost>\nExample:\nopenai/gpt-4o 128000 0.000005 0.000015\nmylocal/mistral-7b 32768 0 0\n\nDetails: ID=Model Identifier, Context=Max Tokens, Costs=USD **per token** (use 0 for free models)."
)
# --- Image Token Estimation Heuristics ---
default_image_tokens: int = Field(
default=500,
title="[Image Tokens] Default Tokens Per Image",
description="Default estimated tokens per image if no other heuristic applies.",
)
tokens_per_megapixel: int = Field(
default=100,
title="[Image Tokens] Tokens Per Megapixel",
description="Estimated tokens per megapixel of image resolution.",
)
model_image_token_overrides: Dict[str, int] = Field(
default_factory=dict,
title="[Image Tokens] Model-Specific Image Token Overrides",
description="Overrides for specific models, e.g., {'gpt-4-vision': 1500}",
)
# --- Model Detection Improvement Plan Phase 1 ---
force_openrouter_refresh: bool = Field(
default=True,
title="[Model Detection] Force OpenRouter Refresh",
description="Force refresh OpenRouter model list on next startup or manual trigger.",
)
openrouter_refresh_interval_hours: int = Field(
default=1,
title="[Model Detection] OpenRouter Refresh Interval (hours)",
description="How often to refresh OpenRouter model list automatically.",
)
# --- Model Detection Improvement Plan Phase 2 ---
fuzzy_match_threshold: int = Field(
default=90,
title="[Model Detection] Fuzzy Match Similarity Threshold (%)",
description="Minimum similarity score (0-100) to accept fuzzy match for model detection.",
)
# --- Model Detection Improvement Plan Phase 3 ---
vendor_family_map: Dict[str, Tuple[str, int]] = Field(
default_factory=lambda: {
"openai": ("gpt4", 128000),
"anthropic": ("claude", 200000),
"google": ("gemini", 1000000),
"mistralai": ("mistral", 32768),
"meta-llama": ("llama", 128000), # Updated default Llama family context to 128k
"qwen": ("qwen", 32768),
"cohere": ("cohere", 128000),
"x-ai": ("grok", 131072),
"cognitivecomputations": ("dolphin", 32768),
"deepseek": ("deepseek", 131072),
"all-hands": ("openhands", 32768),
"openrouter": ("quasar", 32768),
},
title="[Model Detection] Vendor to Family Map",
description="Map vendor prefixes to (family, context_size) fallback if exact/fuzzy match fails.",
)
# --- Model Detection Improvement Plan Phase 4 ---
heuristic_rules: Dict[str, Tuple[str, int]] = Field(
default_factory=lambda: {
"flash": ("gemini", 1000000),
"gemini": ("gemini", 1000000),
"sonnet": ("claude", 200000),
"opus": ("claude", 200000),
"haiku": ("claude", 200000), # Corrected Haiku context
"mixtral": ("mixtral", 32768),
"pixtral": ("mixtral", 32768), # Corrected Pixtral context (Mistral docs say 16k, but OR lists 32k/131k - using 32k as safer default)
"deephermes": ("llama", 8192),
"dolphin": ("dolphin", 32768),
"quasar": ("quasar", 32768),
"grok": ("grok", 131072),
"llama": ("llama", 8192), # Base Llama rule
"nemotron": ("llama", 128000), # Updated Nemotron context
"deepseek": ("deepseek", 131072),
"command": ("cohere", 128000),
"openhands": ("openhands", 32768),
"claude": ("claude", 200000),
"gpt-4.5": ("gpt4", 128000),
"gpt-4o": ("gpt4o", 128000),
"gpt-4-turbo": ("gpt4", 128000), # Added specific turbo rule
"gpt-4": ("gpt4", 128000), # Keep general gpt-4 rule
"gpt-3.5": ("gpt35", 16385),
"o1": ("openai", 200000), # Corrected o1 context
"o3": ("openai", 200000), # Corrected o3 context
},
title="[Model Detection] Heuristic Substring Rules",
description="Map substrings to (family, context_size) fallback if other methods fail.",
)
# --- End Phase 4 additions ---
# --- Unknown Model Logging ---
log_unknown_models: bool = Field(
default=True,
title="[Model Detection] Log Unknown Models",
description="Enable logging of unrecognized model names for continuous improvement.",
)
show_detection_source: bool = Field(
default=True,
title="[UI/UX] Show Model Detection Source",
description="Show how the model was detected (exact, alias, fuzzy, vendor, heuristic, fallback) in the UI status message.",
)
unknown_models_log_max_size_kb: int = Field(
default=1024,
title="[Model Detection] Unknown Models Log Max Size (KB)",
description="Rotate unknown_models.log if it exceeds this size (in KB).",
)
        # Quick-toggle defaults (originally: enable all features for testing).
        # NOTE: every field listed here is redeclared below with a full Field(...)
        # definition; the later declaration takes precedence in the class body, so
        # these bare assignments are effectively superseded by the Field defaults.
enable_token_cache: bool = True
enable_model_data_cache: bool = True
enable_pattern_recognition: bool = True
enable_content_detection: bool = True
content_specific_counting: bool = True
intelligent_trimming_hint: bool = True
persist_user_costs: bool = True
show_status: bool = True
show_progress: bool = True
show_tokens_per_message: bool = True
show_after_completion: bool = True
use_enhanced_visuals: bool = True
show_cost_summary: bool = True
show_metrics_panel: bool = True
colored_output: bool = True
show_cache_metrics: bool = True
show_content_breakdown: bool = True
adaptive_rate_averaging: bool = True
# --- End Unknown Model Logging ---
# [General]
priority: int = Field(
default=100,
title="[General] Priority",
description="Priority level for execution (lower runs earlier in pipeline). Set high (e.g., 100) to ensure UI status appears after all other filters.",
)
status_emission_retries: int = Field(
default=3,
title="[General] Status Emission Retries",
description="Number of retries for failed status emissions.",
)
retry_delay_ms: int = Field(
default=100,
title="[General] Status Emission Retry Delay (ms)",
description="Delay between status emission retries in milliseconds.",
)
# [UI/UX]
show_status: bool = Field(
default=True,
title="[UI/UX] Show Status Updates",
description="Show status updates in the UI.",
)
show_progress: bool = Field(
default=True,
title="[UI/UX] Show Progress Bar",
description="Show progress bar for context usage.",
)
bar_length: int = Field(
default=5,
title="[UI/UX] Progress Bar Length",
description="Length of the visual progress bar.",
)
progress_bar_style: str = Field(
default="standard", # Options: standard, minimal, none
title="[UI/UX] Progress Bar Style",
description="Style of the progress bar ('standard'=[▰▱▱▱▱], 'minimal'=▰, 'none'=hide).",
)
show_tokens_per_message: bool = Field(
default=True,
title="[UI/UX] Show Tokens Per Message",
description="Show token count for each message in status.",
)
show_after_completion: bool = Field(
default=True,
title="[UI/UX] Show Status After Completion",
description="Continue showing context usage after text generation is complete.",
)
# --- Modular UI Status Toggles ---
show_total_tokens: bool = Field(default=True, title="[UI/UX] Show Total Tokens")
show_text_image_split: bool = Field(default=True, title="[UI/UX] Show Text/Image Token Split")
show_context_percentage: bool = Field(default=True, title="[UI/UX] Show Context Percentage")
show_progress_bar: bool = Field(default=True, title="[UI/UX] Show Progress Bar")
show_cost: bool = Field(default=True, title="[UI/UX] Show Cost")
show_cost_breakdown: bool = Field(default=True, title="[UI/UX] Show Cost Breakdown")
show_budget_info: bool = Field(default=True, title="[UI/UX] Show Budget Info (Daily/Session)")
show_monthly_budget_info: bool = Field(default=True, title="[UI/UX] Show Monthly Budget Info")
show_daily_spend_info: bool = Field(default=True, title="[UI/UX] Show Daily Spend Info")
show_performance_metrics: bool = Field(default=True, title="[UI/UX] Show Performance Metrics")
show_image_token_warning: bool = Field(default=True, title="[UI/UX] Show Image Token Warning")
show_trimming_hint: bool = Field(default=True, title="[UI/UX] Show Trimming Hint")
show_cache_hit_rate: bool = Field(default=False, title="[UI/UX] Show Cache Hit Rate (Debug)")
show_error_rate: bool = Field(default=False, title="[UI/UX] Show Error Rate") # Removed (Debug) suffix
show_cost_comparisons: bool = Field(default=False, title="[UI/UX] Show Cost Comparisons")
show_calibration_status: bool = Field(default=True, title="[UI/UX] Show Calibration Status")
show_calibration_timestamp: bool = Field(default=True, title="[UI/UX] Show Calibration Timestamp") # Added new valve
use_enhanced_visuals: bool = Field(
default=True,
title="[UI/UX] Use Enhanced Visuals",
description="Use enhanced visual elements like symbols and better formatting.",
)
show_cost_summary: bool = Field(
default=True,
title="[UI/UX] Show Cost Summary",
description="Show cost summary in the metrics display.",
)
show_metrics_panel: bool = Field(
default=True,
title="[UI/UX] Show Performance Metrics",
description="Show expanded metrics panel with performance stats.",
)
colored_output: bool = Field(
default=True,
title="[UI/UX] Colored Terminal Output",
description="Use ANSI color codes in the terminal output (requires env var).",
)
show_cache_metrics: bool = Field(
default=True,
title="[UI/UX] Show Cache Metrics",
description="Show cache hit/miss metrics.", # Removed detailed/debug mention
)
show_content_breakdown: bool = Field(
default=True, # Keep this default, relates to future use
title="[UI/UX] Show Content Breakdown",
description="Show breakdown of token usage by content type in status (Future Use).",
)
# [Warnings]
warn_at_percentage: float = Field(
default=75.0,
title="[Warnings] Context Warning Threshold (%)",
description="Percentage of context window at which to show warnings.",
)
# [Calibration]
log_ui_token_counts: bool = Field(
default=True,
title="[Calibration] Log UI Token Counts",
description="If enabled, logs plugin token counts and prompts user to input UI counts for calibration.",
)
# [Token Counting Correction Factors]
model_correction_factors: Dict[str, float] = Field(
default_factory=lambda: {
"all-hands/openhands-lm-32b-v0.1": 1.0,
"anthropic/claude-3.5-haiku": 1.2864,
"anthropic/claude-3.5-sonnet": 1.2519,
"anthropic/claude-3.7-sonnet": 1.1926,
"anthropic/claude-3.7-sonnet:thinking": 1.2788,
"cognitivecomputations/dolphin-mixtral-8x22b": 1.0,
"deepseek/deepseek-chat-v3-0324": 1.1554,
"deepseek/deepseek-chat-v3-0324:free": 1.1538,
"deepseek/deepseek-r1": 1.0,
"deepseek/deepseek-r1:free": 1.0,
"google/gemini-2.0-flash-001": 1.1474,
"google/gemini-2.0-flash-thinking-exp-1219:free": 1.3637,
"google/gemini-2.0-flash-thinking-exp:free": 1.3637,
"google/gemini-2.5-pro-exp-03-25:free": 1.3478,
"google/gemini-2.5-pro-preview-03-25": 1.3637,
"google/gemma-3-27b-it": 1.0,
"google/gemma-3-27b-it:free": 1.0,
"mistralai/pixtral-large-2411": 1.0,
"nousresearch/deephermes-3-llama-3-8b-preview:free": 1.0,
"nvidia/llama-3.1-nemotron-ultra-253b-v1:free": 1.0,
"nvidia/llama-3.3-nemotron-super-49b-v1:free": 1.0685,
"openai/chatgpt-4o-latest": 1.2140,
"openai/gpt-4.5-preview": 1.1994,
"openai/o1": 1.6884,
"openai/o1-pro": 1.3998,
"openai/o3-mini-high": 1.7964,
"openrouter/quasar-alpha": 1.2492,
"qwen/qwq-32b": 1.7994,
"x-ai/grok-3-beta": 1.1121,
"x-ai/grok-3-mini-beta": 1.7971,
},
title="[Token Counting] Model Correction Factors",
description="Correction factors per model (e.g., {'gemini':1.1}) to adjust token counts.",
)
content_correction_factors: Dict[str, float] = Field(
default_factory=lambda: {
"plain_short": 1.0,
"plain_long": 1.0,
"code": 1.05,
"json": 1.05,
"markdown": 1.05,
"emoji": 1.10,
},
title="[Token Counting] Content-Type Correction Factors",
description="Correction factors per content type (e.g., {'code':1.05, 'emoji':1.1}) to adjust token counts.",
)
critical_at_percentage: float = Field(
default=90.0,
title="[Warnings] Context Critical Threshold (%)",
description="Percentage of context window at which to show critical warnings.",
)
budget_warning_percentage: float = Field( # Feature 2: Budget warning threshold
default=80.0,
title="[Warnings] Budget Warning Threshold (%)",
description="Percentage of budget usage at which to show a warning.",
)
prompt_cost_warning_threshold: float = Field(
default=0.005, # Half a cent
title="[Warnings] Inlet Prompt Cost Threshold ($)",
description="Warn via log if estimated input prompt cost exceeds this $ amount (0=disable).",
)
enable_cost_optimization_hints: bool = Field(
default=True,
title="[Warnings] Enable Cost Optimization Hints",
description="Show actionable hints for cost optimization in the status line.",
)
expensive_model_cost_threshold: float = Field(
default=0.00001, # 1 cent per 1K tokens
title="[Warnings] Expensive Model Threshold ($ per token)",
description="Threshold for considering a model 'expensive' for cost hint suggestions.",
)
# [Cost/Budget]
        budget_amount: float = Field(  # Feature 2: Default budget (can be overridden by UserValves)
            default=100.0,
            title="[Cost/Budget] Budget Amount ($)",
            description="Default budget amount in dollars (per day or session).",
        )
budget_tracking_mode: str = Field( # Feature 2: Add budget tracking mode
default="daily", # Options: 'daily', 'session'
title="[Cost/Budget] Budget Tracking Mode",
description="Track budget usage 'daily' or per 'session'.",
)
monthly_budget_amount: float = Field( # Added monthly budget valve
default=100.0,
title="[Cost/Budget] Monthly Budget Amount ($)",
description="Monthly budget amount in dollars (0 to disable).",
)
compensation: float = Field(
default=1.0,
title="[Cost/Budget] Cost Compensation Factor",
description="Compensation factor for cost calculation (e.g., 1.5 for 10% markup).",
)
# [Performance]
metrics_refresh_rate: float = Field(
default=0.5,
title="[Performance] Metrics Refresh Rate (s)",
description="Refresh rate for metrics display in seconds (Not currently used).",
)
stream_update_interval: float = Field( # Feature 9: Add Valve
default=0.2,
title="[Performance] Stream Update Interval (s)",
description="Interval in seconds for updating stream status (e.g., tokens/sec).",
)
adaptive_rate_averaging: bool = Field(
default=True,
title="[Performance] Enable Adaptive Rate Averaging",
description="Enable adaptive window for rolling token rate calculation.",
)
rate_avg_window_min: int = Field(
default=3,
title="[Performance] Adaptive Rate Min Window",
description="Minimum window size (samples) for adaptive rate.",
)
rate_avg_window_max: int = Field(
default=30,
title="[Performance] Adaptive Rate Max Window",
description="Maximum window size (samples) for adaptive rate.",
)
rate_fast_threshold: float = Field(
default=150.0,
title="[Performance] Adaptive Rate Fast Threshold (t/s)",
description="Tokens/sec threshold to consider 'fast' for adaptive rate window.",
)
rate_slow_threshold: float = Field(
default=10.0,
title="[Performance] Adaptive Rate Slow Threshold (t/s)",
description="Tokens/sec threshold to consider 'slow' for adaptive rate window.",
)
# [Cache]
enable_token_cache: bool = Field(
default=True,
title="[Cache] Enable Token Cache",
description="Enable token counting cache for improved performance.",
)
token_cache_size: int = Field(
default=2000,
title="[Cache] Token Cache Size",
description="Maximum number of entries in the token cache.",
)
enable_model_data_cache: bool = Field(
default=True,
title="[Cache] Enable Model Data Cache",
description="Enable caching of fetched OpenRouter model list to reduce API calls on startup.",
)
# [Models]
# [Token Counting]
enable_pattern_recognition: bool = Field(
default=True,
title="[Token Counting] Enable Pattern Recognition",
description="Enable detection of repeating patterns for optimized token counting (Future Use).",
)
enable_content_detection: bool = Field(
default=True,
title="[Token Counting] Enable Content Type Detection",
description="Enable detection of content types (code, JSON, etc.) for specialized handling.",
)
content_specific_counting: bool = Field(
default=True,
title="[Token Counting] Content-Specific Counting",
description="Use specialized counting methods for different content types (Future Use).",
)
# [Trimming]
intelligent_trimming_hint: bool = Field(
default=True,
title="[Trimming] Enable Intelligent Trimming Hint",
description="Show intelligent hints about trimming early messages when context is critical.",
)
trimming_hint_message_count: int = Field(
default=3,
title="[Trimming] Trimming Hint Message Count",
description="Number of early messages to analyze for the trimming hint.",
)
# [Persistence]
persist_user_costs: bool = Field(
default=True,
title="[Persistence] Persist User Costs",
description="Save user cost data to disk (data/costs-{year}.json).",
)
# [Debug/Log]
log_level: str = Field(
default="INFO",
title="[Debug/Log] Log Level",
description="Logging level (DEBUG, INFO, WARNING, ERROR).",
)
debug: bool = Field(
default=False,
title="[Debug/Log] Enable Debug Logging",
description="Enable debug logging.",
)
class UserValves(BaseModel):
"""Per-user configuration options."""
enabled: bool = Field(
default=True, description="Enable or disable the function for this user"
)
show_status: bool = Field(
default=True, description="Show status updates for this user"
)
warn_at_percentage: float = Field(
default=75.0, description="Custom warning threshold for this user"
)
bar_style: str = Field(
default="standard",
description="Visual style for progress bar (standard, minimal, detailed)",
)
budget_amount: Optional[float] = Field( # Feature 2: Allow user override
default=None,
description="User's budget amount in dollars (overrides global)",
)
model_aliases: Dict[str, str] = Field( # Feature 1: Add model aliases
default_factory=dict,
description="User-defined aliases for model IDs (e.g., {'o3mh': 'OR.openai/o3-mini-high'})",
)
# New: User override correction factors
model_correction_factors: Dict[str, float] = Field(
default_factory=dict,
description="User override correction factors per model (e.g., {'gemini':1.1})",
)
content_correction_factors: Dict[str, float] = Field(
default_factory=dict,
description="User override correction factors per content type (e.g., {'code':1.05})",
)
enable_correction_factors: bool = Field(
default=True,
description="Enable applying correction factors (user or global)",
)
monthly_budget_amount: Optional[float] = Field( # Add monthly budget override for users
default=None,
description="User's monthly budget amount in dollars (overrides global, 0 to disable)",
)
def __init__(self):
"""Initialize the enhanced context counter with model context sizes and pricing data."""
self.valves = self.Valves()
# Request tracking for status persistence
self.current_request_id = None
# Set debug mode based on valve configuration
self.debug_mode = self.valves.debug
# Ensure absolute path for logs directory
log_dir = os.path.join(os.getcwd(), "logs")
log_file = os.path.join(log_dir, "context_counter.log")
# Safely create log directory
os.makedirs(log_dir, exist_ok=True)
# Add file handler for persistent logging
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Set log level based on config *before* logging init message (Fix 2)
logger.setLevel(
logging.DEBUG
if self.debug_mode
else getattr(logging, self.valves.log_level, logging.INFO)
)
# Log the absolute path to the log file for easy location
print(f"CONTEXT COUNTER: Log file created at: {os.path.abspath(log_file)}")
logger.info(
f"Logging initialized. Log file location: {os.path.abspath(log_file)}"
) # Fix 2: Moved after setLevel
# Initialize context recognition patterns cache for improved performance
self.language_patterns_cache = {}
self.content_recognition_patterns = self._initialize_content_patterns()
# Check for environment variable to enable colored output
self.valves.colored_output = os.environ.get(
"OPENWEBUI_USE_COLORS", "false"
).lower() in ["true", "1", "yes", "y"]
logger.info(
f"ANSI Color support is {'ENABLED' if self.valves.colored_output else 'DISABLED'}"
)
        try:
            os.makedirs(DATA_DIR, exist_ok=True)
        except Exception as e:
            logger.warning(f"Could not create data directory: {e}")
if not os.path.exists(CACHE_DIR):
try:
os.makedirs(CACHE_DIR)
except Exception as e:
logger.warning(f"Could not create cache directory: {e}")
# ENHANCEMENT: Always try to load context sizes from OpenRouter API by default
self.dynamic_contexts = (
os.environ.get("OPENWEBUI_DYNAMIC_CONTEXTS", "true").lower() == "true"
)
self.openrouter_api_key = os.environ.get("OPENROUTER_API_KEY", "")
# --- Model Data Initialization Order ---
# 1. Load from Cache (if enabled and valid)
# 2. Fetch from OpenRouter API (if enabled and cache invalid/disabled) -> Update Cache
# 3. Load Hardcoded Defaults
# 4. Load from JSON/MD Export (if exists)
# 5. Load Custom Models from Valves (Highest Priority)
# Initialize dictionaries
self.model_contexts = {}
self.model_pricing = {}
self.using_dynamic_pricing = False # Reset flag
# 1. & 2. Load from Cache or Fetch from OpenRouter API
fetched_models = {}
if self.dynamic_contexts: # Check if dynamic loading is generally enabled
cache_valid = False
if self.valves.enable_model_data_cache:
fetched_models, cache_valid = self._load_models_from_cache()
if not cache_valid:
fetched_models = self._fetch_openrouter_model_list()
if fetched_models and self.valves.enable_model_data_cache:
self._save_models_to_cache(
fetched_models
) # Update cache if fetch successful
# Merge fetched data using setdefault (lowest priority)
for model_id, data in fetched_models.items():
if data.get("context_length") is not None:
# Ensure context length is an integer
try:
context_len = int(data["context_length"])
if context_len > 0:
self.model_contexts.setdefault(model_id, context_len)
except (ValueError, TypeError):
logger.warning(
f"Invalid context length '{data['context_length']}' for model {model_id} from API/Cache."
)
if data.get("pricing") and (
data["pricing"]["input"] > 0 or data["pricing"]["output"] > 0
):
self.model_pricing.setdefault(model_id, data["pricing"])
# Set flag only if API provided actual pricing
if not self.using_dynamic_pricing and (
data["pricing"]["input"] > 0 or data["pricing"]["output"] > 0
):
self.using_dynamic_pricing = True
# 3. Load Hardcoded Defaults (Medium priority - fills gaps)
# HARDCODED CONSTANTS FOR MAJOR MODELS
# Force correct context sizes for known models
self.GPT4O_CONTEXT_SIZE = 128000
self.GEMINI_FLASH_CONTEXT_SIZE = 1000000 # Corrected to exactly 1M tokens
self.GEMINI_PRO_CONTEXT_SIZE = 256000
self.CLAUDE3_OPUS_CONTEXT_SIZE = 200000
# Initialize fallback for unrecognized models (no longer using default_context_size from valves)
self.fallback_context_size = 4096 # Fallback if everything else fails
# Hardcoded model context windows (will be supplemented/overridden)
hardcoded_contexts = {
"all-hands/openhands-lm-32b-v0.1": 32768, # Updated from API
"anthropic/claude-3.5-haiku": 200000,
"anthropic/claude-3.5-sonnet": 200000,
"anthropic/claude-3.7-sonnet": 200000,
"anthropic/claude-3.7-sonnet:thinking": 200000,
"cognitivecomputations/dolphin-mixtral-8x22b": 65536, # Updated from API
"deepseek/deepseek-chat-v3-0324": 163840, # Updated from API
"deepseek/deepseek-chat-v3-0324:free": 163840,
"deepseek/deepseek-r1": 163840,
"deepseek/deepseek-r1:free": 163840,
"google/gemini-2.0-flash-001": 1048576, # Updated from API
"google/gemini-2.0-flash-thinking-exp-1219:free": 1048576, # Updated from API
"google/gemini-2.0-flash-thinking-exp:free": 1048576,
"google/gemini-2.5-pro-exp-03-25:free": 1048576, # Updated from API
"google/gemini-2.5-pro-preview-03-25": 1048576, # Updated from API
"google/gemma-3-27b-it": 131072,
"google/gemma-3-27b-it:free": 131072, # Updated from API
"mistralai/pixtral-large-2411": 131072,
"nousresearch/deephermes-3-llama-3-8b-preview:free": 131072,
"nvidia/llama-3.1-nemotron-ultra-253b-v1:free": 131072,
"nvidia/llama-3.3-nemotron-super-49b-v1:free": 131072,
"openai/chatgpt-4o-latest": 128000,
"openai/gpt-4.1": 1048576, # Updated from API
"openai/gpt-4.1-mini": 1048576, # Updated from API
"openai/gpt-4.1-nano": 1048576, # Updated from API
"openai/gpt-4.5-preview": 128000,
"openai/o1": 200000,
"openai/o1-pro": 200000,
"openai/o3-mini-high": 200000,
"qwen/qwq-32b": 131072,
"x-ai/grok-3-beta": 131072,
"x-ai/grok-3-mini-beta": 131072,
}
# Merge hardcoded contexts using setdefault
for model_id, context in hardcoded_contexts.items():
self.model_contexts.setdefault(model_id, context)
# Base model pricing data as fallback (will be supplemented/overridden)
# Prices are per token (API prices are per million, converted here)
hardcoded_pricing = {
"all-hands/openhands-lm-32b-v0.1": {"input": 9.0e-07, "output": 9.0e-07}, # Updated from API
"anthropic/claude-3.5-haiku": {"input": 2.5e-07, "output": 1.25e-06}, # Updated from API
"anthropic/claude-3.5-sonnet": {"input": 3.0e-06, "output": 1.5e-05}, # Updated from API
"anthropic/claude-3.7-sonnet": {"input": 3.0e-06, "output": 1.5e-05}, # Updated from API
"anthropic/claude-3.7-sonnet:thinking": {"input": 3.0e-06, "output": 1.5e-05}, # Updated from API
"cognitivecomputations/dolphin-mixtral-8x22b": {"input": 6.5e-07, "output": 6.5e-07}, # Updated from API
"deepseek/deepseek-chat-v3-0324": {"input": 1.4e-07, "output": 2.8e-07}, # Updated from API
"deepseek/deepseek-chat-v3-0324:free": {"input": 0.0, "output": 0.0},
"deepseek/deepseek-r1": {"input": 1.4e-07, "output": 2.8e-07}, # Updated from API
"deepseek/deepseek-r1:free": {"input": 0.0, "output": 0.0},
"google/gemini-2.0-flash-001": {"input": 1.25e-07, "output": 3.75e-07}, # Updated from API
"google/gemini-2.0-flash-thinking-exp-1219:free": {"input": 0.0, "output": 0.0},
"google/gemini-2.0-flash-thinking-exp:free": {"input": 0.0, "output": 0.0},
"google/gemini-2.5-pro-exp-03-25:free": {"input": 0.0, "output": 0.0},
"google/gemini-2.5-pro-preview-03-25": {"input": 5.0e-07, "output": 1.5e-06}, # Updated from API
"google/gemma-3-27b-it": {"input": 1e-06, "output": 1e-06},
"google/gemma-3-27b-it:free": {"input": 0.0, "output": 0.0},
"mistralai/pixtral-large-2411": {"input": 1e-06, "output": 1e-06},
"nousresearch/deephermes-3-llama-3-8b-preview:free": {"input": 0.0, "output": 0.0},
"nvidia/llama-3.1-nemotron-ultra-253b-v1:free": {"input": 0.0, "output": 0.0},
"nvidia/llama-3.3-nemotron-super-49b-v1:free": {"input": 0.0, "output": 0.0},
"openai/chatgpt-4o-latest": {"input": 5.0e-06, "output": 1.5e-05}, # Updated from API
"openai/gpt-4.1": {"input": 1e-06, "output": 3.0e-06}, # Updated from API
"openai/gpt-4.1-mini": {"input": 5.0e-07, "output": 1.5e-06}, # Updated from API
"openai/gpt-4.1-nano": {"input": 1.0e-07, "output": 3.0e-07}, # Updated from API
"openai/gpt-4.5-preview": {"input": 6.0e-06, "output": 1.2e-05}, # Updated from API
"openai/o1": {"input": 5.0e-06, "output": 1.5e-05}, # Updated from API
"openai/o1-pro": {"input": 1.0e-05, "output": 3.0e-05}, # Updated from API
"openai/o3-mini-high": {"input": 2.0e-07, "output": 6.0e-07}, # Updated from API
"qwen/qwq-32b": {"input": 5.0e-07, "output": 5.0e-07}, # Updated from API
"x-ai/grok-3-beta": {"input": 1e-06, "output": 1e-06},
"x-ai/grok-3-mini-beta": {"input": 1.0e-07, "output": 1.0e-07}, # Updated from API
}
# Merge hardcoded pricing using setdefault
for model_id, pricing in hardcoded_pricing.items():
self.model_pricing.setdefault(model_id, pricing)
# 4. Load from JSON/MD Export (Higher Priority - Overwrites API/Hardcoded)
try:
# Construct path relative to the script's directory if possible, or use absolute
# Assuming script runs from within openwebui-context-counter directory
            base_dir = (
                os.path.dirname(os.path.abspath(__file__))
                if "__file__" in globals()
                else os.getcwd()
            )
memory_bank_dir = os.path.join(base_dir, "memory-bank")
# Look for export files (handle potential naming variations)
export_file_pattern = re.compile(r"models-export-.*\.json")
export_md_pattern = re.compile(r"models-export-.*\.md")
json_export_path = None
md_export_path = None
if os.path.exists(memory_bank_dir):
for filename in os.listdir(memory_bank_dir):
if export_file_pattern.match(filename):
json_export_path = os.path.join(memory_bank_dir, filename)
break # Use the first one found
elif export_md_pattern.match(filename):
md_export_path = os.path.join(memory_bank_dir, filename)
# Don't break, prefer JSON if both exist
if json_export_path:
logger.info(f"Found model export file at {json_export_path}")
self.load_models_from_json_export(json_export_path)
logger.info(
f"Successfully loaded/overwrote model data from JSON export file"
)
elif md_export_path: # Only use MD if JSON wasn't found
logger.info(f"Found markdown model export file at {md_export_path}")
self.load_models_from_json_export(md_export_path, is_markdown=True)
logger.info(
f"Successfully loaded/overwrote model data from markdown export file"
)
else:
logger.info("No model export file found in memory-bank directory.")
except Exception as e:
logger.error(f"Error loading models from export: {str(e)}")
        # 5. Load Custom Models from Valves (Highest Priority - Overwrites All Previous)
        logger.info("Base model data loading complete; applying custom model overrides next.")
        logger.debug(f"Unique model contexts loaded so far: {len(self.model_contexts)}")
        logger.debug(f"Unique model pricings loaded so far: {len(self.model_pricing)}")
        # NOTE: the inline parsing below duplicates _parse_and_apply_plaintext_models(),
        # which is also invoked at the end of __init__ (and again in outlet), so the
        # plaintext valve is effectively parsed more than once per startup.
# Parse plaintext definitions if provided (one per line)
if self.valves.custom_models_plaintext:
# Log the actual value received from the valve at INFO level for visibility
logger.info(f"Parsing custom_models_plaintext valve content: {self.valves.custom_models_plaintext!r}")
logger.debug(f"Attempting to parse custom_models_plaintext: {self.valves.custom_models_plaintext!r}") # Log the raw valve value
for line_num, line in enumerate(self.valves.custom_models_plaintext.strip().splitlines()):
line = line.strip()
if not line or line.startswith('#'): # Skip empty lines and comments
continue
logger.debug(f"Processing plaintext line {line_num + 1}: '{line}'")
# Use split() and filter empty strings for more robust parsing
parts = [part for part in line.split() if part]
logger.debug(f" Split parts (filtered): {parts}")
if len(parts) >= 4:
id_, ctx_str, inp_str, outp_str = parts[0], parts[1], parts[2], parts[3]
logger.debug(f" Extracted: id='{id_}', ctx='{ctx_str}', inp='{inp_str}', outp='{outp_str}'")
try:
ctx_i = int(ctx_str)
inp_f = float(inp_str)
outp_f = float(outp_str)
# Add directly to the main dictionaries, overwriting if needed
self.model_contexts[id_] = ctx_i
self.model_pricing[id_] = {"input": inp_f, "output": outp_f}
logger.debug(f" Successfully applied custom model from plaintext: {id_}")
except (ValueError, TypeError) as e:
# Log conversion errors as ERROR for better visibility
logger.error(f" ERROR parsing custom model definition line {line_num + 1}: '{line}' - Error: {e}")
else:
logger.warning(f" Skipping invalid custom model line {line_num + 1} (expected 4+ parts): '{line}'")
# REMOVED: Logic for merging structured custom_models list, as the valve was removed.
# for custom in self.valves.custom_models:
# if custom.id not in self.model_contexts: # Avoid overwriting plaintext entries
# self.model_contexts[custom.id] = custom.context_length
# self.model_pricing[custom.id] = {"input": custom.pricing.input, "output": custom.pricing.output}
# # logger.debug(f"Applied custom model from structured list: {custom.id}") # Logic removed
# logger.info(f"Applied custom models: {len(self.valves.custom_models)} from structured list, potentially more from plaintext.") # Logic removed
logger.info(f"Applied custom models from plaintext valve.") # Log message remains relevant
# Tokenizer cache
self.encoders = {}
# Token pattern recognition components
self.token_cache = {}
self.token_cache_stats = {"hits": 0, "misses": 0, "prunes": 0}
self.pattern_frequency = {}
self.content_type_stats = {}
self.token_cache_last_prune = time.time() # Track when we last pruned the cache
self.token_cache_entry_times = (
{}
) # Track when each cache entry was added/accessed
# Per-message tracking
self.message_tokens = {}
# Cost tracking
self.user_cost_file = USER_COST_FILE
self._ensure_cost_file_exists()
self.daily_cost_file = DAILY_COST_FILE # Feature 2: Daily cost file path
self._ensure_daily_cost_file_exists() # Feature 2: Ensure daily cost file exists
# Stream token tracking for dynamic rate calculation
self.stream_token_counter = 0
self.stream_start_time = None
self.current_token_rate = 0.0
self.last_stream_update = 0.0
# Adaptive rate deque - use max window size initially
self.stream_history = deque(maxlen=self.valves.rate_avg_window_max)
# self.stream_update_interval = 0.5 # Feature 9: Removed hardcoded value, use valve instead
# self.last_event_emitter = None # Removed - incompatible
# Timing info
self.start_time = None
# Session stats
self.session_stats = {
"total_tokens": 0,
"input_tokens": 0,
"output_tokens": 0,
"requests": 0,
"avg_tokens_per_req": 0,
"avg_tokens_per_sec": 0,
"success_rate": 100.0,
"peak_concurrency": 1,
"total_cost": 0.0,
"budget_remaining": self.valves.budget_amount,
"error_count": 0, # Feature 6: Add error counter
"session_cost": 0.0, # Feature 2: Add session cost tracking
"daily_cost": 0.0, # Feature 2: Add daily cost tracking (loaded later)
# Enhanced performance tracking
"tokens_added_last_cycle": 0,
"tokens_removed_last_cycle": 0,
"last_update_time": time.time(),
"message_generation_time": 0.0,
"prev_total_tokens": 0,
}
# self.using_dynamic_pricing flag is set during data loading
# Load initial calibration status
self.calibration_status_display = self._load_calibration_status()
# Initialize request counter
self.request_counter = 0
# Initial parsing attempt during initialization
self._parse_and_apply_plaintext_models()
# DEBUG: Log final pricing dictionary after initial loading/parsing in __init__
logger.debug(f"Model pricing after __init__: {self.model_pricing}")
def _parse_and_apply_plaintext_models(self):
"""Parses the plaintext custom models valve and updates pricing/context dictionaries.
This method is a critical part of the model detection and pricing system, replacing
the previous structured approach that used Pydantic models. It provides several key benefits:
1. Robustness: Simple string parsing is more resilient to serialization/deserialization issues
that were causing persistent Pydantic validation errors with the previous approach.
2. User-Friendly: Allows users to define custom models in a simple text format:
<MODEL_ID> <CONTEXT_SIZE> <INPUT_COST_PER_TOKEN> <OUTPUT_COST_PER_TOKEN>
Example: openai/o4-mini-high 200000 0.0000011 0.0000044
3. Flexibility: Supports comments (lines starting with #) and handles whitespace variations.
4. Priority: Custom models defined here take highest priority, overriding any values from
other sources (API, hardcoded defaults, exports).
5. Real-time Updates: This method is called both during initialization and in the outlet
method to ensure the latest custom model definitions are always applied before processing.
The method parses each non-empty, non-comment line from the valve content, extracts the
model ID, context size, and pricing information, converts them to the appropriate types,
and adds them directly to the main model_contexts and model_pricing dictionaries.
"""
if self.valves.custom_models_plaintext:
# Log the actual value received from the valve at INFO level for visibility
logger.info(f"Parsing custom_models_plaintext valve content: {self.valves.custom_models_plaintext!r}")
logger.debug(f"Attempting to parse custom_models_plaintext: {self.valves.custom_models_plaintext!r}") # Log the raw valve value
for line_num, line in enumerate(self.valves.custom_models_plaintext.strip().splitlines()):
line = line.strip()
if not line or line.startswith('#'): # Skip empty lines and comments
continue
logger.debug(f"Processing plaintext line {line_num + 1}: '{line}'")
parts = re.split(r'\s+', line)
logger.debug(f" Split parts: {parts}")
if len(parts) >= 4:
id_, ctx_str, inp_str, outp_str = parts[0], parts[1], parts[2], parts[3]
logger.debug(f" Extracted: id='{id_}', ctx='{ctx_str}', inp='{inp_str}', outp='{outp_str}'")
try:
ctx_i = int(ctx_str)
inp_f = float(inp_str)
outp_f = float(outp_str)
# Add directly to the main dictionaries, overwriting if needed
self.model_contexts[id_] = ctx_i
self.model_pricing[id_] = {"input": inp_f, "output": outp_f}
logger.debug(f" Successfully applied custom model from plaintext: {id_}")
except (ValueError, TypeError) as e:
# Log conversion errors as ERROR for better visibility
logger.error(f" ERROR parsing custom model definition line {line_num + 1}: '{line}' - Error: {e}")
else:
logger.warning(f" Skipping invalid custom model line {line_num + 1} (expected 4+ parts): '{line}'")
else:
logger.debug("custom_models_plaintext valve is empty, skipping parsing.")
# --- New Methods for Dynamic Loading & Caching ---
def _load_calibration_status(self) -> str:
"""Loads the calibration status string from the status file."""
status_file = os.path.join(DATA_DIR, "calibration_status.json")
default_status = "Not Calibrated" # Changed default from "Unknown"
try:
if os.path.exists(status_file):
with open(status_file, "r", encoding="utf-8") as f:
status_data = json.load(f)
# Basic validation
if isinstance(status_data, dict) and "status_string" in status_data:
loaded_status = status_data["status_string"]
# Add timestamp info if available
timestamp = status_data.get("analysis_timestamp")
if timestamp:
try:
# Format timestamp nicely
dt_obj = datetime.fromisoformat(timestamp)
# Example: " (as of Apr 16 14:30)"
formatted_ts = dt_obj.strftime(" (as of %b %d %H:%M)")
loaded_status += formatted_ts
except ValueError:
pass # Ignore invalid timestamp format
logger.info(f"Loaded calibration status: {loaded_status}")
return loaded_status
else:
logger.warning(f"Invalid format in {status_file}. Using default status.")
return default_status
else:
logger.info(f"Calibration status file not found ({status_file}). Using default status.")
return default_status
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Error loading calibration status from {status_file}: {e}")
return default_status
except Exception as e: # Catch any other unexpected errors
logger.error(f"Unexpected error loading calibration status: {e}")
return default_status
def _load_models_from_cache(self) -> Tuple[Dict, bool]:
"""Loads model data from cache file if valid."""
if not os.path.exists(MODEL_CACHE_FILE):
logger.info("Model cache file not found.")
return {}, False
try:
with open(MODEL_CACHE_FILE, "r", encoding="utf-8") as f:
cache_data = json.load(f)
cache_timestamp_str = cache_data.get("timestamp")
if not cache_timestamp_str:
logger.warning("Model cache file missing timestamp.")
return {}, False
cache_timestamp = datetime.fromisoformat(cache_timestamp_str)
if datetime.now() - cache_timestamp > timedelta(
seconds=MODEL_CACHE_TTL_SECONDS
):
logger.info("Model cache file is expired.")
return {}, False
logger.info(
f"Loading model data from valid cache file (Timestamp: {cache_timestamp_str})."
)
return cache_data.get("models", {}), True
except (json.JSONDecodeError, OSError, ValueError) as e:
logger.error(f"Error loading model cache file: {e}")
return {}, False
def _save_models_to_cache(self, models_data: Dict):
"""Saves fetched model data to the cache file."""
try:
os.makedirs(os.path.dirname(MODEL_CACHE_FILE), exist_ok=True)
cache_content = {
"timestamp": datetime.now().isoformat(),
"models": models_data,
}
with open(MODEL_CACHE_FILE, "w", encoding="utf-8") as f:
json.dump(cache_content, f)
logger.info(f"Saved fetched model data to cache file: {MODEL_CACHE_FILE}")
except (OSError, TypeError) as e:
logger.error(f"Error saving model data to cache: {e}")
def _fetch_openrouter_model_list(self) -> Dict:
"""Fetches the model list from OpenRouter API."""
logger.info("Attempting to fetch model list from OpenRouter API...")
api_url = "https://openrouter.ai/api/v1/models"
fetched_models = {}
try:
req = urllib.request.Request(
api_url,
headers={
"HTTP-Referer": "https://github.com/open-webui/open-webui",
"X-Title": "Context Counter Function",
},
)
# Increased timeout for potentially slow API
with urllib.request.urlopen(req, timeout=15) as response:
if response.status == 200:
data = json.loads(response.read().decode("utf-8"))
models_data = data.get("data", [])
for model in models_data:
model_id = model.get("id")
if not model_id:
continue
context = model.get("context_length")
pricing = model.get("pricing", {})
# Handle potential None or non-string values gracefully
input_price_str = str(pricing.get("input", "0") or "0")
output_price_str = str(pricing.get("output", "0") or "0")
try:
# Prices are per million tokens, convert to per token
input_per_token = float(input_price_str) / 1_000_000
output_per_token = float(output_price_str) / 1_000_000
except ValueError:
logger.warning(
f"Could not parse pricing for {model_id}: input='{input_price_str}', output='{output_price_str}'"
)
input_per_token = 0.0
output_per_token = 0.0
fetched_models[model_id] = {
"context_length": context,
"pricing": {
"input": input_per_token,
"output": output_per_token,
},
}
logger.info(
f"Successfully fetched data for {len(fetched_models)} models from OpenRouter."
)
return fetched_models
else:
logger.error(
f"Failed to fetch model list from OpenRouter. Status: {response.status}"
)
return {}
except urllib.error.URLError as e:
logger.error(f"Network error fetching OpenRouter model list: {e}")
return {}
except TimeoutError:
logger.error("Timeout error fetching OpenRouter model list.")
return {}
except json.JSONDecodeError as e:
logger.error(
f"Error decoding JSON response from OpenRouter model list: {e}"
)
return {}
except Exception as e:
logger.error(f"Unexpected error fetching OpenRouter model list: {e}")
return {}
# --- End New Methods ---
def _initialize_content_patterns(self):
"""Initialize content recognition patterns for different content types.
This function builds pattern dictionaries for detecting various content types
like code, JSON, tables, mathematical formulas, etc. These patterns are used
throughout the token counting and content type detection system.
Returns:
Dictionary mapping content types to pattern dictionaries
"""
# Directly define basic patterns as the enhanced module is not used/available.
logger.debug("Initializing basic content patterns.")
patterns = {
# Programming language patterns
"code": {
# Function/method definitions
"function_defs": [
r"(function|def|func|fn|sub|method|procedure)\s+[\w_]+ *\(", # Basic function def
r"(public|private|protected|static|async)\s+[\w_]+\s+[\w_]+ *\(", # OOP methods
r"^\s*class\s+[\w_]+", # Class definitions
],
# Variable declarations
"variable_defs": [
r"(var|let|const|int|float|double|string|bool)\s+[\w_]+\s*=",
],
},
# JSON patterns
"json": {
"structure": [
r'^\s*\{\s*"[\w_]+"\s*:', # Object start
r'^\s*\[\s*\{\s*"[\w_]+"\s*:', # Array of objects start
],
},
# Markdown table patterns
"table": {
"headers": r"\|[\s-:]*\|[\s-:]*\|", # Table header/divider row
"structure": r"(\|[^\|]+)+\|", # Basic row structure
},
# Markdown list patterns
"list": {
"unordered": r"(\n\s*[-*+•]\s+[^\n]+)+", # Unordered lists with different markers
"ordered": r"(\n\s*\d+[.)\]]\s+[^\n]+)+", # Numbered lists
},
}
return patterns
def _ensure_cost_file_exists(self):
"""Ensure that the cost tracking file exists."""
if not os.path.exists(self.user_cost_file):
try:
# Create parent directories if they don't exist
os.makedirs(os.path.dirname(self.user_cost_file), exist_ok=True)
# Create empty cost file with empty JSON object
with open(self.user_cost_file, "w", encoding="UTF-8") as f:
json.dump({}, f)
except Exception as e:
logger.error(f"Failed to create user cost file: {str(e)}")
# Feature 2: Add method to ensure daily cost file exists
def _ensure_daily_cost_file_exists(self):
"""Ensure the daily cost tracking file exists and initialize if needed."""
if not os.path.exists(self.daily_cost_file):
try:
os.makedirs(os.path.dirname(self.daily_cost_file), exist_ok=True)
with open(self.daily_cost_file, "w", encoding="UTF-8") as f:
# Initialize with today's date and zero cost
today_str = date.today().isoformat()
json.dump({today_str: 0.0}, f)
except Exception as e:
logger.error(f"Failed to create daily cost file: {str(e)}")
def _load_and_update_daily_cost(self, cost_to_add: float) -> float:
"""Load today's cost, add the new cost, save with locking, and return the updated total."""
today_str = date.today().isoformat()
daily_costs = {}
updated_today_cost = 0.0
f = None # Initialize f to None
lock_acquired = False # Initialize lock status
try:
f = open(self.daily_cost_file, "a+", encoding="UTF-8")
f.seek(0) # Move to beginning for reading
# --- Acquire lock (OS-dependent) ---
if os.name == "posix" and fcntl:
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) # Try non-blocking exclusive lock
lock_acquired = True
except IOError:
logger.warning("Daily cost file (POSIX) is locked, waiting...")
fcntl.flock(f, fcntl.LOCK_EX) # Fall back to a blocking lock
lock_acquired = True
elif os.name == "nt" and msvcrt:
# Windows locking: Lock the entire file exclusively
try:
msvcrt.locking(
f.fileno(), msvcrt.LK_NBLCK, 1
) # Try non-blocking exclusive lock
lock_acquired = True
except IOError:
logger.warning(
"Daily cost file (Windows) is locked, retrying with blocking lock..."
)
time.sleep(0.1) # Simple delay before retry
try:
msvcrt.locking(
f.fileno(), msvcrt.LK_LOCK, 1
) # Blocking lock on retry
lock_acquired = True
except IOError as e_lock:
logger.error(
f"Could not acquire lock on daily cost file (Windows) after retry: {e_lock}"
)
lock_acquired = False
logger.error("Proceeding without lock due to acquisition failure.")
else:
logger.warning(
"File locking not supported on this OS. Daily cost updates may be unreliable under concurrency."
)
lock_acquired = True
# --- Perform Read/Update/Write ONLY if lock is held or not needed/supported ---
if lock_acquired:
try:
content = f.read()
if content:
daily_costs = json.loads(content)
else:
daily_costs = {}
except json.JSONDecodeError:
logger.error("Failed to parse daily cost file, resetting.")
daily_costs = {}
today_cost = daily_costs.get(today_str, 0.0)
updated_today_cost = today_cost + cost_to_add
daily_costs = {today_str: updated_today_cost}
f.seek(0)
f.truncate()
json.dump(daily_costs, f)
f.flush()
os.fsync(f.fileno())
logger.debug(f"Updated daily cost for {today_str}: Added ${cost_to_add:.6f}, New Total: ${updated_today_cost:.6f}")
return updated_today_cost
else:
logger.error("Failed to acquire lock on daily cost file. Update skipped.")
try:
f.seek(0)
content = f.read()
if content:
daily_costs = json.loads(content)
else:
daily_costs = {}
except Exception:
daily_costs = {}
return daily_costs.get(today_str, 0.0)
except IOError as e:
logger.error(f"File lock/IO error for daily costs: {e}")
return daily_costs.get(today_str, 0.0)
except Exception as e:
logger.error(f"Failed to update daily cost: {str(e)}")
return daily_costs.get(today_str, 0.0)
finally:
if f:
if lock_acquired:
if os.name == "posix" and fcntl:
fcntl.flock(f, fcntl.LOCK_UN)
elif os.name == "nt" and msvcrt:
try:
msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)
except IOError:
pass
f.close()
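# Note: the daily cost file holds a single JSON object keyed by ISO date, and only the current
# day's entry is retained on each write, e.g. {"2025-04-16": 0.004215} (values are illustrative).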
# --- Monthly Cost Tracking ---
def _load_and_update_monthly_cost(self, cost_to_add: float) -> Dict[str, float]:
"""Load this month's cost, add the new cost, save with locking, and return month info."""
current_month_str = date.today().strftime("%Y-%m")
# Define monthly cost file path dynamically based on current month
monthly_cost_file = os.path.join(DATA_DIR, f"monthly_costs_{current_month_str}.json")
monthly_costs = {}
updated_month_cost = 0.0
f = None
lock_acquired = False
try:
# Ensure data directory exists
os.makedirs(os.path.dirname(monthly_cost_file), exist_ok=True)
# Check if the file exists for the *current* month. If not, it's a new month.
is_new_month = not os.path.exists(monthly_cost_file)
# Open file (create if doesn't exist)
f = open(monthly_cost_file, "a+", encoding="UTF-8")
f.seek(0) # Move to beginning for reading
# --- Acquire lock (OS-dependent) ---
if os.name == "posix" and fcntl:
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) # Try non-blocking exclusive lock
lock_acquired = True
except IOError:
logger.warning("Monthly cost file (POSIX) is locked, waiting...")
fcntl.flock(f, fcntl.LOCK_EX) # Fall back to a blocking lock
lock_acquired = True
elif os.name == "nt" and msvcrt:
# Windows locking: Lock the entire file exclusively
try:
msvcrt.locking(f.fileno(), msvcrt.LK_NBLCK, 1) # Try non-blocking exclusive lock
lock_acquired = True
except IOError:
logger.warning("Monthly cost file (Windows) is locked, retrying...")
time.sleep(0.1) # Simple delay before retry
try:
msvcrt.locking(f.fileno(), msvcrt.LK_LOCK, 1) # Blocking lock on retry
lock_acquired = True
except IOError as e_lock:
logger.error(f"Could not acquire lock on monthly cost file (Windows): {e_lock}")
lock_acquired = False # Indicate failure
else:
logger.warning("File locking not supported on this OS for monthly costs.")
lock_acquired = True # Proceed assuming no lock needed
# --- Perform Read/Update/Write ONLY if lock is held or assumed ---
if lock_acquired:
try:
content = f.read()
if content and not is_new_month: # Only load if file existed and wasn't empty
monthly_costs = json.loads(content)
# Get the cost for the current month if it exists in the file
updated_month_cost = monthly_costs.get(current_month_str, 0.0)
else: # New month or empty file
monthly_costs = {}
updated_month_cost = 0.0 # Start fresh for the new month
logger.info(f"Starting new monthly cost tracking for {current_month_str}.")
except json.JSONDecodeError:
logger.error("Failed to parse monthly cost file, resetting for the month.")
monthly_costs = {}
updated_month_cost = 0.0
# Add the new cost
updated_month_cost += cost_to_add
monthly_costs[current_month_str] = updated_month_cost # Store/update with current month key
# Write back the updated data for the current month
f.seek(0)
f.truncate()
json.dump(monthly_costs, f) # Save the dictionary containing the current month's cost
f.flush()
os.fsync(f.fileno()) # Ensure data is written to disk
logger.debug(f"Updated monthly cost for {current_month_str}: Added ${cost_to_add:.6f}, New Total: ${updated_month_cost:.6f}")
# Determine the applicable monthly budget (user override or global valve)
# Note: __user__ is not available here, so we rely on the global valve for now.
# The check against user override needs to happen in the outlet method.
monthly_budget = self.valves.monthly_budget_amount # Use global valve here
remaining_monthly = monthly_budget - updated_month_cost if monthly_budget > 0 else 0 # Calculate remaining only if budget > 0
used_monthly_percent = (updated_month_cost / monthly_budget * 100) if monthly_budget > 0 else 0
return {
"current_month_cost": updated_month_cost, # Total cost accumulated this month
"remaining_monthly": remaining_monthly,
"used_monthly_percent": used_monthly_percent,
"monthly_budget": monthly_budget # The budget amount used for calculation
}
else:
logger.error("Failed to acquire lock on monthly cost file. Update skipped.")
# Attempt to read current value without lock if possible, otherwise return 0
current_cost = 0.0 # Default if read fails
try:
f.seek(0)
content = f.read()
if content:
monthly_costs = json.loads(content)
current_cost = monthly_costs.get(current_month_str, 0.0)
except Exception:
pass # Ignore read errors if lock failed
# Return defaults or last known state if possible, using global budget as fallback
fallback_budget = self.valves.monthly_budget_amount
return {"current_month_cost": current_cost, "remaining_monthly": fallback_budget - current_cost if fallback_budget > 0 else 0, "used_monthly_percent": (current_cost / fallback_budget * 100) if fallback_budget > 0 else 0, "monthly_budget": fallback_budget}
except Exception as e:
logger.error(f"Failed to update monthly cost: {str(e)}")
# Return defaults in case of error, using global budget
fallback_budget = self.valves.monthly_budget_amount
return {"current_month_cost": 0.0, "remaining_monthly": fallback_budget, "used_monthly_percent": 0.0, "monthly_budget": fallback_budget}
finally:
if f:
if lock_acquired:
if os.name == "posix" and fcntl:
fcntl.flock(f, fcntl.LOCK_UN)
elif os.name == "nt" and msvcrt:
try:
msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)
except IOError: pass # Ignore errors on unlock
f.close()
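# Note: monthly costs are written to a per-month file named "monthly_costs_YYYY-MM.json" inside
# DATA_DIR, e.g. {"2025-04": 1.2345} (values are illustrative); a fresh file starts each month.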
def _update_user_cost(
self,
user_email: str,
model: str,
input_tokens: int,
output_tokens: int,
total_cost: float,
):
"""Update the user's cost data in the cost file."""
if not self.valves.persist_user_costs:
return
if not user_email:
logger.warning("User email not provided for cost tracking")
return
try:
# Read existing cost data
costs = {}
if os.path.exists(self.user_cost_file):
with open(self.user_cost_file, "r", encoding="UTF-8") as f:
try:
costs = json.load(f)
except json.JSONDecodeError:
logger.error("Failed to parse user cost file, creating new one")
costs = {}
# Ensure user entry exists
if user_email not in costs:
costs[user_email] = []
# Add new cost entry
costs[user_email].append(
{
"model": model,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_cost": str(total_cost),
}
)
# Write updated cost data
with open(self.user_cost_file, "w", encoding="UTF-8") as f:
json.dump(costs, f, indent=4)
except Exception as e:
logger.error(f"Failed to update user cost: {str(e)}")
def normalize_model_name(self, model_id: str) -> str:
"""Normalize model names to standard format for better detection.
This function standardizes model identifiers by:
1. Converting to lowercase
2. Removing vendor prefixes (openai/, anthropic/, google/, etc.)
3. Standardizing separators (replacing spaces, underscores with hyphens)
4. Removing version qualifiers (like :free, :beta)
5. Removing suffixes like -tuned
Examples:
"openai/gpt-4-turbo" → "gpt-4-turbo"
"Google/gemini-1.5-flash" → "gemini-1.5-flash"
"""
if not model_id:
return ""
# Start with lowercase and strip whitespace
normalized = model_id.lower().strip()
# Remove common vendor prefixes
prefixes = [
# Provider paths
"openai/",
"anthropic/",
"google/",
"mistralai/",
"meta-llama/",
"cohere/",
"meta/",
"qwen/",
"x-ai/",
"azure/",
"amazon/",
"together.ai/",
"deepinfra/",
"fireworks.ai/",
"replicate/",
# Provider names without slashes
"openai",
"github",
"google_genai",
"anthropic",
"google",
"microsoft",
"replicate",
"perplexity",
"huggingface",
"meta",
"amazon",
"nvidia",
]
# Try to handle nested prefixes
for prefix in prefixes:
if normalized.startswith(prefix):
normalized = normalized[len(prefix) :]
break
# Standardize separators (convert spaces and underscores to hyphens)
normalized = re.sub(r"[-_\s]+", "-", normalized)
# Remove any extra hyphens at start or end
normalized = normalized.strip("-")
# Log the normalization result for debugging
if model_id != normalized:
logger.debug(f"Normalized model name: '{model_id}' → '{normalized}'")
return normalized
def load_models_from_json_export(
self, file_path: str, is_markdown: bool = False
) -> None:
"""Load model data from JSON export file."""
try:
# Read the file
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# Extract JSON content
json_content = content
if is_markdown:
# Look for JSON block in markdown
json_blocks = re.findall(r"```json\s*([\s\S]*?)\s*```", content)
if json_blocks:
json_content = json_blocks[0]
else:
# Try looking for any code block
code_blocks = re.findall(r"```\s*([\s\S]*?)\s*```", content)
if code_blocks:
json_content = code_blocks[0]
else:
# Try to find JSON-like content
json_like = re.search(r"(\{[\s\S]*\}|\[[\s\S]*\])", content)
if json_like:
json_content = json_like.group(0)
# Parse JSON
models_data = json.loads(json_content)
# Process model data
loaded_count = 0
# Handle different JSON structures
if isinstance(models_data, list):
# Array of model objects
for model in models_data:
if "id" in model:
model_id = model["id"]
# Extract context size if available
context_size = None
if "context_length" in model:
context_size = model["context_length"]
elif "context_window" in model:
context_size = model["context_window"]
# Apply context size if valid (OVERWRITE existing)
if (
context_size
and isinstance(context_size, (int, float))
and context_size > 0
):
self.model_contexts[model_id] = int(context_size)
loaded_count += 1
# Extract pricing if available
input_price = None
output_price = None
if "pricing" in model:
pricing = model["pricing"]
if isinstance(pricing, dict):
if "input" in pricing:
input_price = pricing["input"]
if "output" in pricing:
output_price = pricing["output"]
# Apply pricing if valid (OVERWRITE existing)
if input_price is not None and output_price is not None:
try:
# Assume prices in export might be per-million or per-token
input_p = float(input_price)
output_p = float(output_price)
# Heuristic: if price > 0.01, assume per-million
if input_p > 0.01:
input_p /= 1_000_000
if output_p > 0.01:
output_p /= 1_000_000
self.model_pricing[model_id] = {
"input": input_p,
"output": output_p,
}
except ValueError:
logger.warning(
f"Could not parse pricing from export for {model_id}"
)
# Log results
logger.info(
f"Loaded/Overwrote {loaded_count} models from JSON export at {file_path}"
)
except Exception as e:
logger.error(f"Error loading models from JSON export: {str(e)}")
def is_claude(self, model_id: str) -> bool:
"""Check if a model is any variant of Anthropic Claude."""
if not model_id:
return False
# Use normalized name for consistent matching
model_id_lower = self.normalize_model_name(model_id).lower()
# Comprehensive Claude indicators
claude_indicators = [
"claude",
"anthropic-claude",
"anthropic/claude",
"anthropic",
"claude-3",
"claude-3.5",
"claude-3.7",
"claude-4",
"opus",
"sonnet",
"haiku",
"instant",
]
# If ANY of these substrings appears in the model ID, assume it's Claude
for indicator in claude_indicators:
if indicator in model_id_lower:
return True
return False
def is_gpt4o(self, model_id: str) -> bool:
"""Check if a model is any variant of GPT-4o."""
if not model_id:
return False
# Use normalized name for consistent matching
model_id_lower = self.normalize_model_name(model_id).lower()
# Check for ANY variation that might indicate GPT-4o
gpt4o_indicators = [
"gpt-4o",
"gpt4o",
"gpt4-o",
"gpt-4-o",
"gpt4o",
"gpt 4o",
"gpt-4o-mini",
"gpt4omini",
]
# If ANY of these substrings appears in the model ID, assume it's GPT-4o
for indicator in gpt4o_indicators:
if indicator in model_id_lower:
return True
return False
def is_gemini(self, model_id: str) -> bool:
"""Check if a model is any variant of Google Gemini."""
if not model_id:
return False
# Use normalized name for consistent matching
model_id_lower = self.normalize_model_name(model_id).lower()
# Check for ANY variation that might indicate Gemini
gemini_indicators = [
"gemini",
"google-gemini",
"gemini-pro",
"gemini-flash",
"gemini-1.5",
"gemini-2.0",
"gemini-2.5",
]
# If ANY of these substrings appears in the model ID, assume it's Gemini
for indicator in gemini_indicators:
if indicator in model_id_lower:
return True
return False
def extract_openrouter_model(self, model_name: str) -> str:
"""Extract the actual model ID from an OpenRouter prefixed model."""
if model_name.startswith("OR."):
# Extract the actual model part after "OR." prefix
return model_name[3:]
return model_name
def add_or_prefix(self, model_name: str) -> str:
"""Add OR. prefix to a model name if it doesn't already have it."""
if not model_name.startswith("OR."):
return f"OR.{model_name}"
return model_name
# Removed load_openrouter_contexts and load_openrouter_pricing as logic is now in __init__ / _fetch_openrouter_model_list
def _detect_content_type(self, text: str) -> str:
"""Detect the type of content in the text."""
if not text or not self.valves.enable_content_detection:
return "text"
# Code blocks (triple backticks)
if re.search(r"```[a-zA-Z]*\n[\s\S]*?\n```", text):
return "code"
# JSON objects or arrays
if (text.strip().startswith("{") and text.strip().endswith("}")) or (
text.strip().startswith("[") and text.strip().endswith("]")
):
try:
json.loads(text.strip())
return "json"
except ValueError: # Not valid JSON
pass
# Markdown tables
if re.search(r"\|[-]+\|[-]+\|", text):
return "table"
# Markdown headers
if re.search(r"^#{1,6}\s", text, re.MULTILINE):
return "markdown_header"
# Markdown links or images
if re.search(r"!\[.*?\]\(.*?\)", text) or re.search(r"\[.*?\]\(.*?\)", text):
return "markdown_link"
# HTML/XML blocks
if re.search(r"<[a-zA-Z][\s\S]*?>", text):
return "html"
# Emojis (basic unicode emoji range)
if re.search(r"[\U0001F600-\U0001F64F]", text) or re.search(
r"[\U0001F300-\U0001F5FF]", text
):
return "emoji"
# Quoted text (blockquotes)
if re.search(r"^\s*>", text, re.MULTILINE):
return "blockquote"
# Inline code snippets
if re.search(r"`[^`]+`", text):
return "inline_code"
# Default fallback
return "text"
def _update_content_type_stats(self, content_type: str) -> None:
"""Update statistics on content type detection."""
if content_type not in self.content_type_stats:
self.content_type_stats[content_type] = 0
self.content_type_stats[content_type] += 1
def _get_appropriate_encoding(self, model_name: str) -> str:
"""Determine the appropriate tiktoken encoding for a model.
Different models may use different tokenization schemes. This method
determines the best encoding to use based on the model family.
Args:
model_name: The name of the model to get encoding for
Returns:
The name of the appropriate tiktoken encoding
"""
# Normalize the model name for better matching
normalized = self.normalize_model_name(model_name)
# Most models use cl100k_base (GPT-4, Claude, etc.)
if self.is_claude(model_name) or "claude" in normalized:
return "cl100k_base" # Claude uses similar tokenizer to GPT-4
if self.is_gpt4o(model_name) or "gpt-4" in normalized:
return "cl100k_base"
if "gpt-3.5" in normalized:
return "cl100k_base"
# Check for older GPT models that use different encoders
if "gpt-3" in normalized:
return "p50k_base"
# Gemini and other models - best approximation is cl100k_base
# There's no perfect match but this is closest
return "cl100k_base"
def _preprocess_for_claude(self, text: str) -> str:
"""Apply Claude-specific preprocessing for more accurate token counting.
Claude treats certain patterns differently from other models, especially
with regard to whitespace, newlines, and XML-like tags.
Args:
text: The text to preprocess
Returns:
Preprocessed text optimized for Claude tokenization
"""
# Note: This is a simplification. Full Claude token counting requires
# their proprietary tokenizer, but these adjustments help approximate it better.
# Claude is more sensitive to repeated newlines
# Compress multiple newlines to reduce excessive token counts
processed = re.sub(r"\n{3,}", "\n\n", text)
# Claude considers <tag> structures as special
# We don't modify them but account for them in our counting
return processed
def _preprocess_for_gemini(self, text: str) -> str:
"""Apply Gemini-specific preprocessing for more accurate token counting.
Gemini has different handling for certain markdown patterns and
for content like code blocks. This preprocessing helps approximate
its token counting better.
Args:
text: The text to preprocess
Returns:
Preprocessed text optimized for Gemini tokenization
"""
# Note: This is an approximation since Gemini uses a different underlying tokenizer
# Gemini handles code blocks slightly differently
# Mark them to account for this in counting
processed = text
# Gemini may parse markdown differently, but for now we'll
# rely on the general-purpose tokenizer
return processed
def _fallback_count_tokens(self, text: str, model_name: str) -> int:
"""Fallback method for token counting when tiktoken is unavailable.
This provides more sophisticated estimation based on model family
and content type when the standard tokenizer isn't available.
Args:
text: The text to count tokens for
model_name: The model name to determine estimation approach
Returns:
Estimated token count
"""
# Check for content type to apply different estimation rules
content_type = self._detect_content_type(text)
# Basic character count as starting point
char_count = len(text)
# Word count (most tokenizers roughly track words, but with variations)
# word_count = len(text.split()) # Less reliable than char count for tokens
# Feature 7: Use refined estimates based on content type
ratio = CHAR_PER_TOKEN_ESTIMATE.get(
content_type, CHAR_PER_TOKEN_ESTIMATE["default"]
)
estimated_tokens = char_count / ratio
# Model-specific adjustments (can be added if needed)
# if self.is_claude(model_name):
# estimated_tokens *= 1.05 # Example adjustment
return int(estimated_tokens)
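# Illustrative estimate (assuming CHAR_PER_TOKEN_ESTIMATE["default"] is roughly 4 characters per
# token): a 400-character plain-text message would be estimated at about 100 tokens.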
def _count_structured_tokens(self, text: str, content_type: str, encoder) -> int:
"""Apply specialized token counting for different content types.
Different types of content (code, JSON, tables) tokenize differently.
This method applies content-specific optimizations.
Args:
text: The text to count tokens for
content_type: The detected content type
encoder: The tiktoken encoder to use
Returns:
Optimized token count for the specific content type
"""
if content_type == "code":
# Code tends to have special tokens, indentation, etc.
# For now, we'll use the regular encoder but we could add
# code-specific optimizations here
return len(encoder.encode(text, disallowed_special=()))
elif content_type == "json":
# JSON has many special characters that become tokens
# For accurate counting, we apply the regular encoder
return len(encoder.encode(text, disallowed_special=()))
elif content_type == "table":
# Tables have special handling in some models
# For now, use regular encoding
return len(encoder.encode(text, disallowed_special=()))
else:
# Regular text
return len(encoder.encode(text, disallowed_special=()))
def count_tokens(self, text: str, model_name: str = "default") -> int:
"""Count tokens in text using the appropriate tokenizer for the model.
This enhanced method uses model-specific tokenization approaches and
content-specific optimizations for more accurate token counts across
different types of content and model families.
Args:
text: The text to count tokens for
model_name: The model name to determine tokenization approach
Returns:
Token count for the provided text
"""
if not text:
return 0
# Check cache first for performance optimization
if self.valves.enable_token_cache:
cache_key = f"{model_name}:{hash(text)}"
if cache_key in self.token_cache:
self.token_cache_stats["hits"] += 1
return self.token_cache[cache_key]
else:
self.token_cache_stats["misses"] += 1
# Determine content type for specialized handling
content_type = "text"
if self.valves.enable_content_detection:
content_type = self._detect_content_type(text)
self._update_content_type_stats(content_type)
# Use tiktoken if available
if TIKTOKEN_AVAILABLE:
# Get appropriate encoding based on model family
encoding_name = self._get_appropriate_encoding(model_name)
# Get or create encoder
if encoding_name not in self.encoders:
try:
self.encoders[encoding_name] = tiktoken.get_encoding(encoding_name)
except Exception as e:
logger.error(f"Failed to get encoding {encoding_name}: {e}")
return self._fallback_count_tokens(text, model_name)
encoder = self.encoders[encoding_name]
# Apply model-specific preprocessing for better accuracy
processed_text = text
if self.is_claude(model_name):
processed_text = self._preprocess_for_claude(text)
elif self.is_gemini(model_name):
processed_text = self._preprocess_for_gemini(text)
# Apply content-specific counting if enabled
if self.valves.content_specific_counting and content_type != "text":
token_count = self._count_structured_tokens(
processed_text, content_type, encoder
)
else:
token_count = len(encoder.encode(processed_text, disallowed_special=()))
else:
# Enhanced fallback token counting method when tiktoken not available
token_count = self._fallback_count_tokens(text, model_name)
# Add to cache with timestamp
if self.valves.enable_token_cache:
# Add or update the cache entry
self.token_cache[cache_key] = token_count
# Record the access time
self.token_cache_entry_times[cache_key] = time.time()
# Periodically check if cache needs pruning (every 100 misses)
if self.token_cache_stats["misses"] % 100 == 0:
self._prune_token_cache()
# Apply model-specific correction factor
model_factor = self.valves.model_correction_factors.get(model_name, 1.0)
# Apply content-type correction factor
content_factor = self.valves.content_correction_factors.get(content_type, 1.0)
corrected_tokens = int(token_count * model_factor * content_factor)
logger.debug(f"Count tokens for '{model_name}' ({content_type}): Base={token_count}, ModelFactor={model_factor:.4f}, ContentFactor={content_factor:.4f} -> Corrected={corrected_tokens}")
# Return the corrected token count
return corrected_tokens
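# Illustrative flow (hypothetical values): count_tokens("SELECT * FROM users;", "gpt-4o") resolves
# the cl100k_base encoding, optionally applies content-type detection, then multiplies the raw
# count by any model/content correction factors configured in the Valves before returning.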
def _prune_token_cache(self) -> None:
"""Prune the token cache to prevent unbounded memory growth.
This method uses both time-based and size-based approaches to maintain
the token cache at a reasonable size:
1. Time-based: Removes entries older than CACHE_TTL (default: 5 days)
2. Size-based: If the cache exceeds the configured maximum size, removes
the oldest entries until it's within the limit
The pruning approach prioritizes keeping recently used entries while
removing stale ones that are unlikely to be used again.
"""
now = time.time()
prune_count = 0
# Track the last time we did a full prune to avoid doing it too often
if now - self.token_cache_last_prune < 3600: # Only do full prune once per hour
return
try:
# 1. Time-based pruning: Remove entries older than CACHE_TTL
expired_keys = []
for key, timestamp in list(self.token_cache_entry_times.items()):
if now - timestamp > CACHE_TTL:
expired_keys.append(key)
# Remove expired entries
for key in expired_keys:
if key in self.token_cache:
del self.token_cache[key]
if key in self.token_cache_entry_times:
del self.token_cache_entry_times[key]
prune_count += 1
# 2. Size-based pruning: If still too large, remove oldest entries
if len(self.token_cache) > self.valves.token_cache_size:
# Sort by timestamp (oldest first)
sorted_entries = sorted(
self.token_cache_entry_times.items(), key=lambda x: x[1]
)
# Calculate how many entries to remove
excess = len(self.token_cache) - self.valves.token_cache_size
keys_to_remove = [k for k, _ in sorted_entries[:excess]]
# Remove excess entries (oldest first)
for key in keys_to_remove:
if key in self.token_cache:
del self.token_cache[key]
if key in self.token_cache_entry_times:
del self.token_cache_entry_times[key]
prune_count += 1
# Update prune stats
self.token_cache_stats["prunes"] += prune_count
# Update the last prune time
self.token_cache_last_prune = now
# Log pruning results
if prune_count > 0:
logger.debug(
f"Pruned {prune_count} entries from token cache. Cache size: {len(self.token_cache)}"
)
except Exception as e:
# Non-critical operation - log but don't crash
logger.error(f"Error pruning token cache: {e}")
def _is_experimental_model(self, model_name: str) -> bool:
"""Determine if a model is experimental or newly released.
This helps identify models that might benefit from cost comparisons
with more established models.
Args:
model_name: The model name to check
Returns:
True if the model is considered experimental, False otherwise
"""
# Indicators in name suggesting experimental status
experimental_indicators = [
"preview",
"alpha",
"beta",
"test",
"exp",
"experimental",
"4.5",
"3.7",
"2.5",
"o3", # Version indicators for new models
"-latest",
"-dev",
"-preview",
]
# Normalize and lowercase for better matching
normalized = self.normalize_model_name(model_name).lower()
# Check for any experimental indicators
for indicator in experimental_indicators:
if indicator in normalized:
return True
# Also consider models not in our standard dictionary as experimental
return model_name not in self.model_contexts
def infer_model_family(self, model_name: str) -> Tuple[str, int]:
"""Infer model family and context size based on patterns in model name.
This is a smart detection system that looks for specific keywords in model names
and returns the most likely model family and context size.
Returns:
Tuple of (family_name, context_size)
"""
model_name = model_name.lower()
# Map of model family keywords to their likely context sizes
family_patterns = [
# Newest models - most specific first
{"pattern": "o3", "family": "o3", "context": 128000},
{"pattern": "opus", "family": "claude", "context": 200000},
{"pattern": "claude-3", "family": "claude", "context": 200000},
{"pattern": "claude-3.5", "family": "claude", "context": 200000},
{"pattern": "claude-3.7", "family": "claude", "context": 200000},
{"pattern": "claude-instant", "family": "claude", "context": 100000},
{"pattern": "claude-haiku", "family": "claude", "context": 150000},
{"pattern": "claude-sonnet", "family": "claude", "context": 180000},
{"pattern": "gemini-flash", "family": "gemini", "context": 1000000},
{"pattern": "gemini-2.5", "family": "gemini", "context": 256000},
{"pattern": "gemini-pro", "family": "gemini", "context": 256000},
{"pattern": "gpt-4o", "family": "gpt4o", "context": 128000},
{"pattern": "gpt-4o-mini", "family": "gpt4o", "context": 128000},
{"pattern": "gpt-4-turbo", "family": "gpt4", "context": 128000},
{"pattern": "gpt-4-32k", "family": "gpt4", "context": 32768},
{"pattern": "gpt-4", "family": "gpt4", "context": 8192},
{"pattern": "gpt-3.5", "family": "gpt35", "context": 16385},
# Other major model families
{"pattern": "mixtral", "family": "mixtral", "context": 32768},
{"pattern": "mistral", "family": "mistral", "context": 32768},
{"pattern": "llama-3.1-70b", "family": "llama", "context": 131072},
{"pattern": "llama-3.1", "family": "llama", "context": 131072},
{"pattern": "llama-3-70b", "family": "llama", "context": 8192},
{"pattern": "llama-3", "family": "llama", "context": 8192},
{"pattern": "llama-2", "family": "llama", "context": 4096},
{"pattern": "qwen-2.5", "family": "qwen", "context": 128000},
{"pattern": "qwen-2", "family": "qwen", "context": 32768},
{"pattern": "qwen-1.5", "family": "qwen", "context": 32768},
{"pattern": "qwen", "family": "qwen", "context": 32768},
{"pattern": "phi-3", "family": "phi", "context": 32768},
{"pattern": "phi-2", "family": "phi", "context": 4096},
{"pattern": "command-r-plus", "family": "cohere", "context": 128000},
{"pattern": "command-r", "family": "cohere", "context": 128000},
{"pattern": "command", "family": "cohere", "context": 128000},
{"pattern": "grok-2", "family": "grok", "context": 131072},
{"pattern": "grok-1.5", "family": "grok", "context": 131072},
{"pattern": "grok-1", "family": "grok", "context": 8192},
{"pattern": "grok", "family": "grok", "context": 8192},
]
# Check for matches in the patterns list
for entry in family_patterns:
if entry["pattern"] in model_name:
logger.debug(
f"Inferred family '{entry['family']}' for model '{model_name}' with context {entry['context']}"
)
return (entry["family"], entry["context"])
# If no match found, return default using the defined fallback
logger.debug(
f"Could not infer family for model '{model_name}', using fallback context {self.fallback_context_size}"
)
return ("unknown", self.fallback_context_size)
class ModelNotRecognizedError(Exception):
"""Exception raised when a model cannot be recognized."""
pass
def get_context_size(
self, model_name: str, __user__: Optional[dict] = None
) -> int: # Feature 1: Add __user__
"""Get the context size for a model.
Raises:
ModelNotRecognizedError: If the model cannot be recognized and no context size
can be determined.
"""
if not model_name:
raise self.ModelNotRecognizedError("Model name is empty or missing")
# Feature 1: Check user aliases first (handle both dict and Pydantic model for valves)
resolved_model_name = model_name
if __user__ and "valves" in __user__:
user_valves_data = __user__["valves"]
aliases = None
if isinstance(
user_valves_data, self.UserValves
): # Check if it's the Pydantic model instance
aliases = getattr(user_valves_data, "model_aliases", None)
elif isinstance(user_valves_data, dict): # Check if it's a dictionary
aliases = user_valves_data.get("model_aliases")
if aliases and isinstance(aliases, dict):
resolved_model_name = aliases.get(model_name, model_name)
if resolved_model_name != model_name:
logger.debug(
f"Resolved alias '{model_name}' to '{resolved_model_name}'"
)
try:
# Try exact match first (using resolved name)
if resolved_model_name in self.model_contexts:
context_size = self.model_contexts[resolved_model_name]
logger.debug(
f"Model '{resolved_model_name}' detected via: Exact Match. Context: {context_size}"
)
return context_size
# Try with OR. prefix (for OpenRouter models without prefixes)
or_prefixed = self.add_or_prefix(resolved_model_name)
if or_prefixed in self.model_contexts:
context_size = self.model_contexts[or_prefixed]
logger.debug(
f"Model '{resolved_model_name}' detected via: OR Prefix Match ('{or_prefixed}'). Context: {context_size}"
)
return context_size
# Try normalized name match
normalized = self.normalize_model_name(resolved_model_name)
if normalized in self.model_contexts:
context_size = self.model_contexts[normalized]
logger.debug(
f"Model '{resolved_model_name}' detected via: Normalized Match ('{normalized}'). Context: {context_size}"
)
return context_size
# --- Phase 2: Fuzzy matching ---
best_match = None
best_score = 0
threshold = self.valves.fuzzy_match_threshold
for known_model in self.model_contexts.keys():
score = self._simple_similarity(normalized, known_model)
if score > best_score:
best_score = score
best_match = known_model
if best_score >= threshold:
context_size = self.model_contexts[best_match]
logger.debug(
f"Model '{resolved_model_name}' detected via: Fuzzy Match ('{best_match}', {best_score}%). Context: {context_size}"
)
return context_size
# Try to match by known model family detection methods
if self.is_gpt4o(resolved_model_name):
context_size = self.GPT4O_CONTEXT_SIZE
logger.debug(
f"Model '{resolved_model_name}' detected via: Specific Check (is_gpt4o). Context: {context_size}"
)
return context_size
if self.is_claude(resolved_model_name):
context_size = 200000 # Latest Claude models have 200K context by default
logger.debug(
f"Model '{resolved_model_name}' detected via: Specific Check (is_claude). Context: {context_size}"
)
return context_size
if self.is_gemini(resolved_model_name):
context_size = self.GEMINI_FLASH_CONTEXT_SIZE # Default to the larger context
logger.debug(
f"Model '{resolved_model_name}' detected via: Specific Check (is_gemini). Context: {context_size}"
)
return context_size
# Note: family inference (infer_model_family) is deliberately not used as a last resort here;
# models that fail every lookup above raise ModelNotRecognizedError instead.
# If we get here, the model could not be recognized after all checks
model_name_display = (
resolved_model_name
if len(resolved_model_name) <= 30
else f"{resolved_model_name[:27]}..."
)
logger.warning(f"Model not recognized: '{resolved_model_name}'")
# Log unknown model if enabled
if self.valves.log_unknown_models:
try:
log_dir = os.path.join(os.getcwd(), "logs")
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, "unknown_models.log")
# Rotate log if too big
max_bytes = self.valves.unknown_models_log_max_size_kb * 1024
if (
os.path.exists(log_path)
and os.path.getsize(log_path) > max_bytes
):
rotated_path = log_path + ".1"
if os.path.exists(rotated_path):
os.remove(rotated_path)
os.rename(log_path, rotated_path)
# Deduplicate recent entries (simple approach)
recent_lines = set()
if os.path.exists(log_path):
with open(log_path, "r", encoding="utf-8") as f:
for line in f.readlines()[-100:]:
recent_lines.add(line.strip())
entry = f"{datetime.now().isoformat()} | {resolved_model_name}"
if entry not in recent_lines:
with open(log_path, "a", encoding="utf-8") as f:
f.write(entry + "\n")
except Exception as e:
logger.error(
f"Failed to log unknown model '{resolved_model_name}': {e}"
)
# Raise error with detection source info
raise self.ModelNotRecognizedError(
f"'{model_name_display}' unknown context size. Detection source: none"
)
except self.ModelNotRecognizedError:
# Re-raise model not recognized errors
raise
except Exception as e:
logger.error(f"Error getting context size: {e}")
raise self.ModelNotRecognizedError(
f"Error determining context size for model '{resolved_model_name}': {str(e)}"
)
def _simple_similarity(self, s1: str, s2: str) -> int:
"""Compute a simple similarity ratio (0-100) based on Levenshtein distance."""
if s1 == s2:
return 100
len_s1 = len(s1)
len_s2 = len(s2)
max_len = max(len_s1, len_s2)
if max_len == 0:
return 100
dist = self._levenshtein_distance(s1, s2)
similarity = int(100 * (1 - dist / max_len))
return similarity
def _levenshtein_distance(self, s1: str, s2: str) -> int:
"""Compute Levenshtein distance between two strings."""
if len(s1) < len(s2):
return self._levenshtein_distance(s2, s1)
if len(s2) == 0:
return len(s1)
previous_row = list(range(len(s2) + 1))
for i, c1 in enumerate(s1):
current_row = [i + 1]
for j, c2 in enumerate(s2):
insertions = previous_row[j + 1] + 1
deletions = current_row[j] + 1
substitutions = previous_row[j] + (c1 != c2)
current_row.append(min(insertions, deletions, substitutions))
previous_row = current_row
return previous_row[-1]
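# Illustrative: _levenshtein_distance("gpt-4o", "gpt-4") is 1 (one deletion), so
# _simple_similarity("gpt-4o", "gpt-4") returns int(100 * (1 - 1/6)) == 83.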
def _format_number(self, num: int) -> str:
"""Format large numbers to use K, M suffixes for better readability."""
if num >= 1000000:
return f"{num/1000000:.1f}M"
elif num >= 1000:
return f"{num/1000:.1f}K"
else:
return f"{num}"
def _format_progress_bar(self, percentage: float) -> str:
"""Format a progress bar based on the configured style."""
style = self.valves.progress_bar_style.lower()
if style == "none":
return ""
elif style == "minimal":
# Show a single block if usage > 0, otherwise empty
return PROGRESS_CHARS[1] if percentage > 0 else PROGRESS_CHARS[0]
elif style == "standard":
# Scale to bar length
filled_length = int(self.valves.bar_length * percentage / 100)
# Create the bar
bar = ""
for i in range(self.valves.bar_length):
if i < filled_length:
bar += PROGRESS_CHARS[1] # Filled character
else:
bar += PROGRESS_CHARS[0] # Empty character
return f"[{bar}]" # Return standard bar enclosed in brackets
else: # Default to standard if style is unrecognized
logger.warning(f"Unrecognized progress_bar_style '{self.valves.progress_bar_style}', defaulting to 'standard'.")
filled_length = int(self.valves.bar_length * percentage / 100)
bar = ""
for i in range(self.valves.bar_length):
if i < filled_length:
bar += PROGRESS_CHARS[1]
else:
bar += PROGRESS_CHARS[0]
return f"[{bar}]"
def _format_cost(self, cost: float, context_size: int) -> str:
"""Format cost with appropriate decimal precision based on context size and amount.
For very small costs (common with large context windows like 1M+), this ensures
that at least the first two significant digits are visible, making the cost
more readable and meaningful to users.
Args:
cost: The calculated cost amount
context_size: The context window size (used to adjust formatting logic)
Returns:
Formatted cost string with appropriate precision
"""
# For tiny costs (very large contexts like 1M+), show enough decimals to display first 2 significant digits
if cost < 0.01:
# Small costs: show six decimal places
return f"${cost:.6f}"
# For small contexts/higher costs, use fewer decimals
elif cost < 0.1:
return f"${cost:.5f}"
elif cost < 1.0:
return f"${cost:.4f}"
elif cost < 10.0:
return f"${cost:.3f}"
else:
return f"${cost:.2f}"
def _generate_status_message(
self,
status_prefix,
total_tokens,
context_size,
context_percentage,
progress_bar,
input_tokens,
output_tokens,
cost,
model_name, # Added model_name for cost breakdown
elapsed_time=None,
tokens_per_second=None,
rolling_rate=None, # Added rolling_rate
cost_comparisons=None,
status_level="normal",
budget_info=None, # Daily/Session budget info
trimming_hint=None,
# --- Add new parameters ---
monthly_budget_info: Optional[Dict] = None, # Pass pre-calculated monthly info
daily_spend: float = 0.0,
text_tokens: int = 0,
image_tokens: int = 0,
calibration_status: str = "Approximate",
) -> str:
"""Generate a status message with controlled length to avoid truncation.
Prioritizes different elements of the status message based on importance:
1. Base information (tokens, context percentage)
2. Token breakdown
3. Cost information (with visual breakdown in detailed mode)
4. Budget Information
5. Performance metrics
6. Trimming Hint (if applicable)
7. Cost comparisons (only for experimental models)
8. Cache Hit Rate (detailed/debug)
9. Error Rate (detailed/debug)
Args:
Various status elements to include in the message
Returns:
Formatted status message with appropriate length
"""
max_length = 120 # Define max length early
# ----- GUARANTEED NON-TRUNCATING DISPLAY MODES -----
# Format common elements used across all modes
# Base information (highest priority)
status_message = f"{status_prefix}{TOKEN_EMOJI} {self._format_number(total_tokens)}/{self._format_number(context_size)} tokens ({context_percentage:.2f}%)"
# Add progress bar if enabled and style is not 'none'
if self.valves.show_progress and self.valves.progress_bar_style.lower() != "none":
# The _format_progress_bar function now returns the complete formatted bar (or single char)
status_message += f" {progress_bar}"
# Add token breakdown (high priority)
token_breakdown = f" | 🔽{self._format_number(input_tokens)}/🔼{self._format_number(output_tokens)}" # Simpler breakdown with new emojis
# Add cost information (medium priority)
cost_info = ""
if self.valves.show_cost_summary:
cost_info = f" | {MONEY_EMOJI} {cost}"
# Feature 2 & Item 1/7: Combine Daily Budget/Spend for brevity
budget_msg = ""
if budget_info and budget_info["budget"] > 0 and self.valves.show_budget_info:
# Format: Spent/Total (% Used)
daily_spend_formatted = self._format_cost(daily_spend, context_size)
daily_budget_formatted = self._format_cost(budget_info['budget'], context_size)
daily_percent_formatted = f"{budget_info['used_percent']:.1f}%" # Use 1 decimal for brevity
daily_warning = f"{WARNING_EMOJI} " if budget_info.get("warning", False) else ""
budget_msg = f" | {daily_warning}{BUDGET_EMOJI} Daily: {daily_spend_formatted}/{daily_budget_formatted} ({daily_percent_formatted})"
elif self.valves.show_daily_spend_info and daily_spend > 0: # Show only spend if budget is off/0
daily_spend_formatted = self._format_cost(daily_spend, context_size)
budget_msg = f" | 📅 {daily_spend_formatted} spent today"
# Add monthly budget info if enabled and available (using pre-calculated info)
monthly_budget_msg = ""
if self.valves.show_monthly_budget_info and monthly_budget_info: # Keep monthly separate for now
effective_monthly_budget = monthly_budget_info.get("budget", 0.0)
current_month_cost = monthly_budget_info.get("current_month_cost", 0.0)
if effective_monthly_budget > 0: # Budget is enabled
# Use the result of _format_cost directly as it includes '$'
remaining_monthly_formatted = self._format_cost(monthly_budget_info['remaining'], context_size)
used_percent_formatted = f"{monthly_budget_info['used_percent']:.2f}"
monthly_warning = f"{WARNING_EMOJI} " if monthly_budget_info.get("warning", False) else ""
# Remove the extra '$' from the f-string
monthly_budget_msg = f" | {monthly_warning}🗓️ {remaining_monthly_formatted} left ({used_percent_formatted}%) this month"
else: # Budget is disabled (0), show total spent
# Use the result of _format_cost directly as it includes '$'
current_month_cost_formatted = self._format_cost(current_month_cost, context_size)
# Remove the extra '$' from the f-string
monthly_budget_msg = f" | 🗓️ {current_month_cost_formatted} spent this month"
# Add text/image split if enabled
text_image_msg = ""
if self.valves.show_text_image_split and (text_tokens > 0 or image_tokens > 0): # Show if either is non-zero
text_image_msg = f" | Text: {self._format_number(text_tokens)}"
if image_tokens > 0:
# Add ~ to indicate image tokens are estimated
text_image_msg += f", Img: ~{self._format_number(image_tokens)}"
# Add trimming hint if enabled and applicable
trimming_hint_msg = ""
# Use the passed trimming_hint directly if it exists and the valve is enabled
if self.valves.show_trimming_hint and trimming_hint:
trimming_hint_msg = f" | {trimming_hint}"
# Add cost optimization hints if enabled
cost_optimization_msg = ""
if self.valves.enable_cost_optimization_hints:
# 1. High prompt cost hint
if input_tokens > 0 and input_tokens > output_tokens * 0.5: # If input is significant compared to output
input_cost = input_tokens * self._get_model_pricing(model_name, None).get("input", 0)
if input_cost >= self.valves.prompt_cost_warning_threshold:
cost_optimization_msg = f" | 💡 Consider summarizing input to reduce cost"
# 2. Expensive model suggestion
if not cost_optimization_msg: # Only show one hint at a time
model_input_price = self._get_model_pricing(model_name, None).get("input", 0)
model_output_price = self._get_model_pricing(model_name, None).get("output", 0)
# Check if either input or output price exceeds threshold
if model_input_price > self.valves.expensive_model_cost_threshold or model_output_price > self.valves.expensive_model_cost_threshold:
# Suggest cheaper alternatives based on model family
if "gpt-4" in model_name.lower() or "o1" in model_name.lower():
cost_optimization_msg = f" | 💡 For simpler tasks, try GPT-3.5 or Claude Haiku"
elif "claude-3" in model_name.lower() and "haiku" not in model_name.lower():
cost_optimization_msg = f" | 💡 For simpler tasks, try Claude Haiku"
elif "gemini-pro" in model_name.lower():
cost_optimization_msg = f" | 💡 For simpler tasks, try Gemini Flash"
# 3. Budget limit warnings with specific hints
if not cost_optimization_msg: # Only show one hint at a time
# Daily/session budget warning
if budget_info and budget_info.get("warning", False):
cost_optimization_msg = f" | 💡 Daily budget nearing limit. Consider switching model"
# Monthly budget warning
elif monthly_budget_info and monthly_budget_info.get("warning", False):
cost_optimization_msg = f" | 💡 Monthly budget nearing limit. Consider switching model"
# Add calibration status if enabled (using the passed status string)
calibration_status_msg = ""
if self.valves.show_calibration_status and calibration_status:
# Determine emoji based on the content of the status string
if "Calibrated" in calibration_status:
cal_emoji = "🔧"
elif "Unknown" in calibration_status or "Approximate" in calibration_status:
cal_emoji = "⚠️" # Use warning for unknown/approximate
else:
cal_emoji = "" # No emoji for other statuses? Or default to warning? Let's default to warning for safety.
cal_emoji = "⚠️"
# Use the passed status string, optionally removing timestamp
status_text = calibration_status
if not self.valves.show_calibration_timestamp: # Item 3: Check new valve
# Remove timestamp part like " (as of Apr 16 14:30)"
status_text = re.sub(r"\s+\(as of .*\)", "", calibration_status)
calibration_status_msg = f" | {cal_emoji} {status_text}"
# Add performance metrics if enabled (lower priority)
performance_msg = ""
if self.valves.show_metrics_panel and elapsed_time and elapsed_time > 0:
# Use rolling rate if available and valid, otherwise use overall rate
rate_to_display = (
rolling_rate
if rolling_rate is not None and rolling_rate > 0
else tokens_per_second
)
rate_indicator = (
"~" if rolling_rate is not None and rolling_rate > 0 else ""
) # Indicate rolling avg
performance_msg = f" | {CLOCK_EMOJI} {elapsed_time:.1f}s ({rate_indicator}{rate_to_display:.1f} t/s)"
# Add cost comparisons for experimental models (lowest priority)
comparisons_msg = ""
if cost_comparisons:
comparisons_msg = " | Compare: " + ", ".join(
[
f"{m.split('/')[-1].split('-')[0]}: {self._format_cost(c, context_size)}"
for m, c in cost_comparisons.items()
]
)
# Item 6: Add Cache Hit Rate (controlled by valve, not just debug)
cache_msg = ""
if self.valves.show_cache_hit_rate: # Use the new valve
hits = self.token_cache_stats["hits"]
misses = self.token_cache_stats["misses"]
total_lookups = hits + misses
if total_lookups > 0:
hit_rate = (
hits / (total_lookups + 1e-9)
) * 100 # Avoid division by zero
cache_msg = (
f" | {CACHE_EMOJI} Cache: {hit_rate:.1f}% ({hits}/{total_lookups})"
)
else:
cache_msg = f" | {CACHE_EMOJI} Cache: N/A"
# Item 6: Add Error Rate (controlled by valve, show even if 0 if valve is on)
error_msg = ""
if self.valves.show_error_rate: # Use the new valve
errors = self.session_stats["error_count"]
requests = self.session_stats["requests"]
# Show even if errors are 0, if the valve is enabled
error_msg = f" | Errors: {errors}/{requests}"
# --- Assemble the message, checking length ---
# Always include token breakdown (high priority)
if len(status_message) + len(token_breakdown) <= max_length:
status_message += token_breakdown
# Add cost information if fits
if len(status_message) + len(cost_info) <= max_length:
status_message += cost_info
# Feature 2: Add Budget info if fits
if len(status_message) + len(budget_msg) <= max_length:
status_message += budget_msg
# Add performance metrics if fits
if len(status_message) + len(performance_msg) <= max_length:
status_message += performance_msg
# Add Trimming Hint if applicable and fits
trim_msg = ""
if status_level == "critical" and trimming_hint:
trim_msg = f" {trimming_hint}"
if len(status_message) + len(trim_msg) <= max_length:
status_message += trim_msg
else: # Fallback if hint is too long
clear_suggestion = " (Clear Chat?)"
if len(status_message) + len(clear_suggestion) <= max_length:
status_message += clear_suggestion
elif (
status_level == "critical"
): # Default suggestion if hint disabled/not generated
clear_suggestion = " (Clear Chat?)"
if len(status_message) + len(clear_suggestion) <= max_length:
status_message += clear_suggestion
# Add Cache Rate if fits (detailed/debug)
if len(status_message) + len(cache_msg) <= max_length:
status_message += cache_msg
# Add Error Rate if fits (detailed/debug)
if len(status_message) + len(error_msg) <= max_length:
status_message += error_msg
# Item 6: Add cost comparisons if fits (controlled by valve)
if self.valves.show_cost_comparisons:
if len(status_message) + len(comparisons_msg) <= max_length:
status_message += comparisons_msg
elif comparisons_msg and len(status_message) <= max_length - 30:
# If full comparisons won't fit, add a shortened version
# Just include first model comparison
short_comparison = " | Compare: " + ", ".join(
[
f"{m.split('/')[-1].split('-')[0]}: {self._format_cost(c, context_size)}"
for m, c in list(cost_comparisons.items())[:1]
]
)
if len(status_message) + len(short_comparison) <= max_length:
status_message += short_comparison
# Append remaining optional UI elements if enabled (ignore length limit for these)
# Note: Daily spend is now part of the combined budget_msg or shown if budget is off
status_message += monthly_budget_msg # Monthly budget kept separate
status_message += text_image_msg
# Build the image token warning if enabled and images are present
image_token_warning_msg = ""
if self.valves.show_image_token_warning and image_tokens > 0:
image_token_warning_msg = f" | ~{self._format_number(image_tokens)} img tokens (est.)"
status_message += image_token_warning_msg
status_message += trimming_hint_msg
status_message += calibration_status_msg
return status_message
def _find_similar_model(self, model_name: str) -> str:
"""Find a similar known model to use for estimates when exact model is unknown.
This method attempts to match the unknown model to a known model family
by examining patterns in the model name, allowing for reasonable
pricing and context estimates even when a model isn't explicitly recognized.
Args:
model_name: The unrecognized model name to find a similar model for
Returns:
The name of a similar known model to use for estimates
"""
# Normalize the model name for better pattern matching
normalized = self.normalize_model_name(model_name)
# Try to match by model family
if self.is_gpt4o(model_name) or "gpt-4" in normalized:
return "gpt-4"
elif self.is_claude(model_name):
return "claude-3-sonnet"
elif self.is_gemini(model_name):
return "gemini-pro"
elif "mistral" in normalized:
return "mistral-medium"
elif "llama" in normalized:
return "meta-llama/llama-3-70b-instruct"
elif "qwen" in normalized:
return "qwen-2.5-72b"
# Default fallback
return "gpt-3.5-turbo"
def _get_model_pricing(
self, model_name: str, __user__: Optional[dict] = None
) -> Dict[str, float]: # Feature 1: Add __user__
"""Get pricing information for a model."""
# DEBUG: Log the input model name
logger.debug(f"Entering _get_model_pricing with model_name: '{model_name}'")
# Feature 1: Check user aliases first (handle both dict and Pydantic model for valves)
resolved_model_name = model_name
if __user__ and "valves" in __user__:
user_valves_data = __user__["valves"]
aliases = None
if isinstance(
user_valves_data, self.UserValves
): # Check if it's the Pydantic model instance
aliases = getattr(user_valves_data, "model_aliases", None)
elif isinstance(user_valves_data, dict): # Check if it's a dictionary
aliases = user_valves_data.get("model_aliases")
if aliases and isinstance(aliases, dict):
resolved_model_name = aliases.get(model_name, model_name)
if resolved_model_name != model_name:
logger.debug(
f"Pricing: Resolved alias '{model_name}' to '{resolved_model_name}'"
)
# DEBUG: Log the resolved model name used for lookup
logger.debug(f"Resolved model name for pricing lookup: '{resolved_model_name}'")
if not resolved_model_name:
logger.warning("Empty model name passed to _get_model_pricing, using default fallback.")
return {"input": 0.0000015, "output": 0.000002} # Default to GPT-3.5 pricing
# --- Pricing Lookup Logic ---
pricing_info = None
source = "None"
# 1. Try exact match
if resolved_model_name in self.model_pricing:
pricing_info = self.model_pricing[resolved_model_name]
source = "Exact Match"
# 2. Try normalized name
else:
normalized = self.normalize_model_name(resolved_model_name)
if normalized in self.model_pricing:
pricing_info = self.model_pricing[normalized]
source = "Normalized Match"
# --- Validation and Fallback ---
# Check if pricing was found and if it's non-zero (unless it's a known free model)
is_free_model = "free" in resolved_model_name.lower()
if pricing_info and (pricing_info.get("input", 0) > 0 or pricing_info.get("output", 0) > 0 or is_free_model):
logger.debug(f"Pricing for '{resolved_model_name}' found via {source}: {pricing_info}")
return pricing_info
else:
# If pricing is missing or zero for a non-free model, log a warning and use generic fallback
# DEBUG: Include current pricing keys and original model name in the warning
current_pricing_keys = list(self.model_pricing.keys())
logger.warning(f"Pricing not found or zero via {source} for resolved_model_name='{resolved_model_name}' (original input='{model_name}'). Using generic fallback (GPT-3.5). Current pricing keys: {current_pricing_keys}")
# The hardcoded_pricing dictionary is local to __init__ and merged into self.model_pricing.
# We don't need to access it separately here. The check above handles if the merged value is valid.
# If it wasn't found or was invalid, we use the generic fallback.
return {"input": 0.0000015, "output": 0.000002} # Generic fallback pricing
def _calculate_cost(
self,
input_tokens: int,
output_tokens: int,
model_name: str,
__user__: Optional[dict] = None,
) -> float: # Feature 1: Add __user__
"""Calculate the cost of a conversation based on token usage."""
# --- Free model detection ---
if "free" in model_name.lower():
logger.info(
f"Cost calculation skipped: '{model_name}' detected as free model."
)
return 0.0
pricing = self._get_model_pricing(
model_name, __user__
) # Feature 1: Pass __user__
# Log the pricing being used for debugging
logger.debug(f"Calculating cost for '{model_name}' using pricing: Input=${pricing.get('input', 0):.8f}, Output=${pricing.get('output', 0):.8f}")
input_cost = input_tokens * pricing.get("input", 0) # Use .get with default 0
output_cost = output_tokens * pricing.get("output", 0) # Use .get with default 0
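# Worked example with the generic fallback rates: 1,000 input + 500 output tokens
# -> 1000 * 0.0000015 + 500 * 0.000002 = $0.0025 (before the compensation factor below).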
total_cost = input_cost + output_cost
# Apply compensation factor (for profit, rounding, etc.)
if self.valves.compensation != 1.0:
total_cost *= self.valves.compensation
return total_cost
async def _emit_status(
self, event_emitter, description: str, done: bool = False
) -> None:
"""Emit a status event to update the UI."""
if not event_emitter or not self.valves.show_status:
return
logger.debug(f"Emitting status: {description}, done={done}")
# Use simplified status to ensure compatibility
message = {"type": "status", "data": {"description": description, "done": done}}
try:
await event_emitter(message)
logger.debug("Status emitted successfully")
except Exception as e:
logger.error(f"Error emitting status: {str(e)}")
# INLET METHOD (Pre-processing)
def inlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
"""Process requests before they reach the LLM.
This is the entry point for the Filter function. It initializes timing,
session stats, and prepares for token counting. It also checks input prompt cost.
Args:
body: The request body containing messages
__user__: Optional user information
Returns:
The potentially modified request body
"""
# Start timing for performance metrics
self.start_time = time.time()
# Increment request counter
self.request_counter += 1
self.session_stats["requests"] += 1
# Feature 2: Reset session cost if tracking mode is 'session'
if self.valves.budget_tracking_mode == "session":
self.session_stats["session_cost"] = 0.0
logger.debug("Reset session cost for new request.")
# Log the request
logger.debug(f"Processing request #{self.request_counter}")
# --- Inlet Cost Prediction ---
if self.valves.prompt_cost_warning_threshold > 0:
try:
model_name = body.get("model", "default")
messages = body.get("messages", [])
# Consider only the last user message for prompt cost check
last_user_message_content = ""
if messages and messages[-1].get("role") == "user":
content = messages[-1].get("content", "")
if isinstance(content, str):
last_user_message_content = content
if last_user_message_content:
prompt_tokens = self.count_tokens(
last_user_message_content, model_name
)
pricing = self._get_model_pricing(model_name, __user__)
estimated_input_cost = prompt_tokens * pricing.get("input", 0)
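# Illustrative numbers: a 2,000-token prompt at a hypothetical rate of
# $0.000005 per input token gives an estimated input cost of $0.01.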
if (
estimated_input_cost
>= self.valves.prompt_cost_warning_threshold
):
logger.warning(
f"Estimated input cost (${estimated_input_cost:.6f}) for the current prompt "
f"exceeds threshold (${self.valves.prompt_cost_warning_threshold:.6f}). "
f"Model: {model_name}, Prompt Tokens: {prompt_tokens}"
)
# Note: Emitting status from inlet might be unreliable/unsupported in OpenWebUI
# await self._emit_status(__event_emitter__, f"⚠️ High Prompt Cost: ~${estimated_input_cost:.4f}", False) # Requires __event_emitter__ in signature
except Exception as e:
logger.error(f"Error during inlet cost prediction: {e}")
# --- End Inlet Cost Prediction ---
# Return the unmodified body (this filter doesn't modify inputs)
return body
# STREAM METHOD (Real-time processing)
def stream(self, event: dict) -> dict:
"""Process streamed chunks from the LLM in real-time.
This enhanced method tracks token generation speed in real-time using an adaptive window.
Args:
event: The stream event from the LLM
Returns:
The potentially modified event
"""
# Get request ID from the event
request_id = event.get("id", "")
# Only process events for the current request to maintain status persistence
# This prevents overwriting status of previous messages
if not request_id or request_id == self.current_request_id:
# Get current time for performance measurements
now = time.time()
# Initialize stream timing if not started
if self.stream_start_time is None:
self.stream_start_time = now
self.stream_token_counter = 0
self.stream_history.clear() # Clear history for new stream
# Reset deque maxlen based on config
self.stream_history = deque(maxlen=self.valves.rate_avg_window_max)
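# Each history entry is a (timestamp, cumulative_token_count) tuple; the rolling
# rate below is derived from the first and last entries in the current window.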
# Count tokens in this chunk
chunk_token_count = 0
for choice in event.get("choices", []):
if "delta" in choice and "content" in choice.get("delta", {}):
content = choice["delta"]["content"]
# Use simple approximation for streaming - refine later with actual tokenizer if needed
token_count = len(content) // 4
self.stream_token_counter += token_count
chunk_token_count += token_count # Track tokens in this specific chunk for rolling rate
# Update rolling rate history
if chunk_token_count > 0:
self.stream_history.append((now, self.stream_token_counter))
# Calculate and update rate periodically using configurable interval
if now - self.last_stream_update >= self.valves.stream_update_interval:
elapsed = now - self.stream_start_time
rolling_rate = None
# --- Adaptive Token Rate Averaging ---
window_size = len(self.stream_history)
if self.valves.adaptive_rate_averaging and window_size > 1:
# Determine adaptive window based on current overall rate
current_overall_rate = (
self.stream_token_counter / elapsed if elapsed > 0 else 0
)
if current_overall_rate > self.valves.rate_fast_threshold:
adaptive_window = self.valves.rate_avg_window_min
elif current_overall_rate < self.valves.rate_slow_threshold:
adaptive_window = self.valves.rate_avg_window_max
else:
# Linear interpolation between min and max window size
ratio = (
current_overall_rate - self.valves.rate_slow_threshold
) / (
self.valves.rate_fast_threshold
- self.valves.rate_slow_threshold
+ 1e-9
) # Avoid div by zero
adaptive_window = int(
self.valves.rate_avg_window_max
- ratio
* (
self.valves.rate_avg_window_max
- self.valves.rate_avg_window_min
)
)
adaptive_window = max(
self.valves.rate_avg_window_min,
min(adaptive_window, self.valves.rate_avg_window_max),
)
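# Illustrative interpolation (hypothetical valve values): with slow=10 t/s, fast=100 t/s,
# window_min=3, window_max=20 and a current overall rate of 55 t/s, ratio = 0.5 and
# adaptive_window = int(20 - 0.5 * 17) = 11, which stays within the [3, 20] clamp.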
# Adjust deque maxlen if needed (only if different from current)
if self.stream_history.maxlen != adaptive_window:
logger.debug(f"Adapting rate window to: {adaptive_window}")
# Create new deque with new maxlen and populate with recent history
new_history = deque(
list(self.stream_history)[-adaptive_window:],
maxlen=adaptive_window,
)
self.stream_history = new_history
window_size = len(
self.stream_history
) # Update window size after potential resize
# Use the (potentially adapted) window slice, ensuring at least 2 samples
calc_window = min(
window_size, self.stream_history.maxlen
) # Use current maxlen
if calc_window > 1:
# Correct slicing: Use negative index to get last 'calc_window' elements
relevant_history = list(self.stream_history)[-calc_window:]
first_time, first_tokens = relevant_history[0]
last_time, last_tokens = relevant_history[-1]
time_diff = last_time - first_time
token_diff = last_tokens - first_tokens
if time_diff > 0.1: # Avoid division by zero or unstable rates
rolling_rate = token_diff / time_diff
logger.debug(
f"Adaptive Rate: Window={calc_window}, Rate={rolling_rate:.1f} t/s"
)
# Fallback to overall rate if rolling rate couldn't be calculated
if (
rolling_rate is None
and elapsed > 0
and self.stream_token_counter > 0
):
self.current_token_rate = self.stream_token_counter / elapsed
elif rolling_rate is not None:
self.current_token_rate = (
rolling_rate # Store the calculated rolling rate
)
# --- End Adaptive Token Rate Averaging ---
self.last_stream_update = now
else:
logger.debug(f"Ignoring stream event for non-current request: {request_id}")
return event
# OUTLET METHOD (Post-processing)
async def outlet(
self,
body: dict,
__event_emitter__: Callable[[Any], Awaitable[None]],
__user__: Optional[dict] = None,
__model__: Optional[dict] = None,
) -> dict:
"""Process responses after they've been generated by the LLM.
This is where the main token counting, cost calculation, and metrics display happen.
Args:
body: The response body containing messages
__event_emitter__: Function to emit status updates to UI
__user__: Optional user information
__model__: Optional model information
Returns:
The potentially modified response body
"""
logger.debug("Entering outlet method...")
# Re-parse custom models from valves *before* processing to ensure latest config is used
logger.debug("Re-parsing custom models in outlet...")
self._parse_and_apply_plaintext_models()
logger.debug(f"Model pricing after re-parsing in outlet: {self.model_pricing}")
# DEBUG: Log entire response body keys and usage info
try:
logger.debug(f"Response body keys: {list(body.keys())}")
logger.debug(f"Response body 'usage': {body.get('usage')}")
except Exception:
pass
# Store the current request ID to maintain status persistence
self.current_request_id = body.get("id", f"request-{self.request_counter}")
logger.debug(f"Processing outlet for request ID: {self.current_request_id}")
# End timing for performance metrics
end_time = time.time()
elapsed_time = (
end_time - self.start_time if self.start_time else 0
) # Handle case where inlet might not have run
# Extract model information
model_name = __model__.get("id", "default") if __model__ else "default"
# Get all messages
messages = body.get("messages", [])
# Extract the last assistant message (return value currently unused; output tokens
# are instead summed from every assistant message in the loop below)
get_last_assistant_message(messages)
# Count tokens in all messages
input_tokens = 0
output_tokens = 0
total_text_tokens = 0
total_image_tokens = 0
# Store token counts per message for trimming hint
message_token_counts = []
for idx, message in enumerate(messages):
role = message.get("role", "")
content = message.get("content", "")
msg_tokens = 0
image_tokens = 0
text_tokens = 0
if isinstance(content, str):
# --- Image token estimation ---
images = []
images += re.findall(r"!\[.*?\]\((.*?)\)", content)
if "data:image/" in content:
images += re.findall(r'(data:image/[^"\')\s]+)', content)
images += re.findall(
r"(https?://[^\s)]+(?:\.png|\.jpg|\.jpeg|\.gif|\.webp|\.bmp|\.tiff))",
content,
re.IGNORECASE,
)
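# The three patterns above pick up, for example, markdown images like "![alt](https://example.com/a.png)",
# inline data URIs such as "data:image/png;base64,...", and bare image URLs ending in a known extension.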
for img in images:
override = self.valves.model_image_token_overrides.get(
model_name
)
if override:
est_tokens = override
else:
est_tokens = self.valves.default_image_tokens
image_tokens += est_tokens
# Count text tokens only
text_tokens = self.count_tokens(content, model_name)
# Add image tokens to message tokens
msg_tokens = text_tokens + image_tokens
# Sum separate totals
total_text_tokens += text_tokens
total_image_tokens += image_tokens
if role == "user":
input_tokens += msg_tokens
elif role == "assistant":
output_tokens += msg_tokens
# Store token count for trimming hint analysis
message_token_counts.append(
{"role": role, "tokens": msg_tokens, "index": idx}
)
# Calculate total tokens
total_tokens = input_tokens + output_tokens
# Update session stats
self.session_stats["total_tokens"] = total_tokens
self.session_stats["input_tokens"] = input_tokens
self.session_stats["output_tokens"] = output_tokens
try:
# Try to get context size and calculate percentage (Pass __user__ for alias check - Feature 1)
context_size = self.get_context_size(model_name, __user__)
context_percentage = (
(total_tokens / context_size) * 100 if context_size > 0 else 0
)
# Calculate performance metrics using the potentially adaptive rate from stream
tokens_per_second = (
self.current_token_rate if self.current_token_rate > 0 else 0.0
)
if (
elapsed_time > 0 and tokens_per_second == 0.0
): # Fallback if stream rate wasn't set
tokens_per_second = output_tokens / elapsed_time
self.session_stats["avg_tokens_per_sec"] = tokens_per_second
self.session_stats["message_generation_time"] = elapsed_time
# Calculate cost (Pass __user__ for alias check - Feature 1)
total_cost = self._calculate_cost(
input_tokens, output_tokens, model_name, __user__
)
self.session_stats["total_cost"] = total_cost
# Feature 2: Update and get current period's cost
current_period_cost = 0.0
if self.valves.budget_tracking_mode == "session":
self.session_stats["session_cost"] += total_cost
current_period_cost = self.session_stats["session_cost"]
elif self.valves.budget_tracking_mode == "daily":
current_period_cost = self._load_and_update_daily_cost(total_cost)
self.session_stats["daily_cost"] = (
current_period_cost # Store for potential display
)
# Feature 2: Calculate DAILY/SESSION budget info
budget_info = None
user_budget = self.valves.budget_amount # Default global daily/session budget
if __user__ and "valves" in __user__:
user_valves_data = __user__["valves"]
override_budget = None
if isinstance(
user_valves_data, self.UserValves
): # Pydantic model instance
override_budget = getattr(
user_valves_data, "budget_amount", None
)
elif isinstance(user_valves_data, dict): # Dictionary
override_budget = user_valves_data.get("budget_amount")
if override_budget is not None:
user_budget = override_budget # Use user override if set and valid
if user_budget > 0: # Only calculate if budget is enabled
budget_remaining = user_budget - current_period_cost
budget_used_percent = (current_period_cost / user_budget * 100) if user_budget > 0 else 0
budget_warning = (budget_used_percent >= self.valves.budget_warning_percentage)
budget_info = {
"budget": user_budget, # The budget amount used (daily/session)
"remaining": budget_remaining,
"used_percent": budget_used_percent,
"warning": budget_warning,
}
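# Worked example: with a $1.00 budget and $0.85 spent this period, remaining = $0.15,
# used_percent = 85, and the warning flag is set once budget_warning_percentage <= 85.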
# --- Monthly Budget Calculation (using potential user override) ---
user_monthly_budget = self.valves.monthly_budget_amount # Default global monthly budget
if __user__ and "valves" in __user__:
user_valves_data = __user__["valves"]
override_monthly_budget = None
if isinstance(user_valves_data, self.UserValves): # Pydantic model
override_monthly_budget = getattr(user_valves_data, "monthly_budget_amount", None)
elif isinstance(user_valves_data, dict): # Dictionary
override_monthly_budget = user_valves_data.get("monthly_budget_amount")
if override_monthly_budget is not None:
user_monthly_budget = override_monthly_budget # Use user override
# Update the monthly cost file *now* with the final total_cost for this request
# This ensures the cost is persisted correctly before generating the final status message
monthly_cost_data = self._load_and_update_monthly_cost(total_cost)
# Recalculate monthly budget info using the potentially overridden budget
monthly_budget_info = None
if user_monthly_budget > 0 and monthly_cost_data:
current_month_total_cost = monthly_cost_data.get("current_month_cost", 0.0)
monthly_remaining = user_monthly_budget - current_month_total_cost
monthly_used_percent = (current_month_total_cost / user_monthly_budget * 100) if user_monthly_budget > 0 else 0
monthly_warning = (monthly_used_percent >= self.valves.budget_warning_percentage)
monthly_budget_info = {
"budget": user_monthly_budget, # The effective monthly budget
"remaining": monthly_remaining,
"used_percent": monthly_used_percent,
"warning": monthly_warning,
"current_month_cost": current_month_total_cost # Include total cost for display if budget=0
}
elif monthly_cost_data: # Handle case where budget is 0 but we want to show total spent
monthly_budget_info = {
"budget": 0.0,
"remaining": 0.0,
"used_percent": 0.0,
"warning": False,
"current_month_cost": monthly_cost_data.get("current_month_cost", 0.0)
}
# Format metrics for display
progress_bar = self._format_progress_bar(context_percentage)
# Determine status level based on context usage
status_level = "normal"
if context_percentage >= self.valves.critical_at_percentage:
status_level = "critical"
elif context_percentage >= self.valves.warn_at_percentage:
status_level = "warning"
# --- Intelligent Trimming Hint ---
trimming_hint = None
if (
status_level == "critical"
and self.valves.intelligent_trimming_hint
and len(messages) > 1
):
tokens_to_trim = 0
# Analyze first few messages (excluding potential system prompt at index 0)
num_msgs_to_analyze = min(
self.valves.trimming_hint_message_count, len(messages) - 1
)
start_index = 1 if messages[0].get("role") == "system" else 0
# Ensure we don't go out of bounds and analyze at least one message if possible
end_index = min(
start_index + num_msgs_to_analyze, len(message_token_counts)
)
if end_index > start_index:
for i in range(start_index, end_index):
tokens_to_trim += message_token_counts[i]["tokens"]
if tokens_to_trim > 0:
trimming_hint = f"{SCISSORS_EMOJI} Trim ~{self._format_number(tokens_to_trim)} tokens?"
# --- End Intelligent Trimming Hint ---
# Reset stream tracking for next generation
self.stream_token_counter = 0
self.stream_start_time = None
self.stream_history.clear() # Clear history
self.current_token_rate = 0.0 # Reset rate
# Determine status prefix based on context usage
if context_percentage >= self.valves.critical_at_percentage:
status_prefix = f"{WARNING_EMOJI} CRITICAL: "
elif context_percentage >= self.valves.warn_at_percentage:
status_prefix = f"{WARNING_EMOJI} WARNING: "
else:
status_prefix = ""
# Get cost comparisons for experimental models
cost_comparisons = None
if (
self._is_experimental_model(model_name)
and self.valves.show_cost_summary
):
# Select models to compare with - use correct model ID format
comparison_models = ["openai/gpt-4o"] # Removed claude-3-sonnet as requested
cost_comparisons = {}
# Calculate costs for comparison models
for comp_model in comparison_models:
if comp_model != model_name: # Don't compare to self
# Pass __user__ for alias check (Feature 1)
comp_pricing = self._get_model_pricing(comp_model, __user__)
comp_cost = (input_tokens * comp_pricing["input"]) + (
output_tokens * comp_pricing["output"]
)
cost_comparisons[comp_model] = comp_cost
# Format cost with appropriate precision
formatted_cost = self._format_cost(total_cost, context_size)
# Use the calibration status loaded during initialization
calibration_status = self.calibration_status_display
# Generate optimized status message with controlled length, passing the calculated monthly info
status_message = self._generate_status_message(
status_prefix=status_prefix,
total_tokens=total_tokens,
context_size=context_size,
context_percentage=context_percentage,
progress_bar=progress_bar,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost=formatted_cost,
model_name=model_name, # Pass model_name for cost breakdown
elapsed_time=elapsed_time,
tokens_per_second=tokens_per_second, # Pass the calculated rate
rolling_rate=(
self.current_token_rate
if self.valves.adaptive_rate_averaging
else None
), # Pass rolling rate if adaptive
cost_comparisons=cost_comparisons,
status_level=status_level,
budget_info=budget_info, # Pass daily/session budget info
monthly_budget_info=monthly_budget_info, # Pass the final monthly budget info
trimming_hint=trimming_hint, # Pass the hint
# --- Pass additional metrics (Ensure they are defined) ---
daily_spend=self.session_stats.get("daily_cost", 0.0), # Use .get with default
text_tokens=total_text_tokens, # Already defined in this scope
image_tokens=total_image_tokens, # Already defined in this scope
calibration_status=calibration_status, # Pass loaded status
)
# Update user cost tracking if user info is available
if __user__ and "email" in __user__:
self._update_user_cost(
__user__["email"],
model_name,
input_tokens,
output_tokens,
total_cost,
)
logger.info(
f"Request #{self.request_counter} completed: {total_tokens} tokens ({context_percentage:.1f}%), ${total_cost:.6f}"
)
# Emit final status update to UI
if self.valves.show_status:
await self._emit_status(__event_emitter__, status_message, True)
# --- Calibration discrepancy logging: compare the plugin's count with any UI-reported usage ---
try:
ui_usage = body.get("usage", {})
plugin_total = total_tokens
ui_total = ui_usage.get("total_tokens", 0)
if (
ui_total and abs(plugin_total - ui_total) / max(ui_total, 1) > 0.05
): # >5% diff
log_dir = os.path.join(os.getcwd(), "logs")
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, "token_discrepancies.log")
with open(log_path, "a", encoding="utf-8") as f:
f.write(
f"{datetime.now().isoformat()} | Model: {model_name} | Plugin: {plugin_total} | UI: {ui_total} | Diff: {plugin_total - ui_total}\n"
)
except Exception as e:
logger.error(f"Error logging token discrepancy: {e}")
# Return the unmodified body (this filter doesn't modify outputs)
logger.debug("Exiting outlet method (main try block successful).") # DEBUG
return body
except self.ModelNotRecognizedError as e:
# Model not recognized - Log the error and emit a clear status message
logger.warning(f"Model not recognized: {model_name}. Cannot determine context size or pricing.")
self.session_stats["error_count"] += 1
# Generate a simple error message for the UI
status_message = f"{WARNING_EMOJI} Model not recognized: '{model_name}'"
# Emit the error status update
if self.valves.show_status:
logger.debug("Attempting final status emission for unrecognized model...")
await self._emit_status(__event_emitter__, status_message, True)
# --- Calibration discrepancy logging (still useful even if model unknown) ---
try:
ui_usage = body.get("usage", {})
plugin_total = total_tokens
ui_total = ui_usage.get("total_tokens", 0)
if (
ui_total and abs(plugin_total - ui_total) / max(ui_total, 1) > 0.05
): # >5% diff
log_dir = os.path.join(os.getcwd(), "logs")
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, "token_discrepancies.log")
with open(log_path, "a", encoding="utf-8") as f:
f.write(
f"{datetime.now().isoformat()} | Model: {model_name} | Plugin: {plugin_total} | UI: {ui_total} | Diff: {plugin_total - ui_total}\n"
)
except Exception as e:
logger.error(f"Error logging token discrepancy: {e}")
# Return the unmodified body (this filter doesn't modify outputs)
logger.debug("Exiting outlet method (ModelNotRecognizedError block).") # DEBUG
return body
except (KeyError, TypeError, ValueError) as specific_err:
logger.error(f"Data or calculation error in outlet processing: {specific_err.__class__.__name__} - {str(specific_err)}")
logger.exception("Exception details:") # Keep traceback for context
self.session_stats["error_count"] += 1
# Emit a generic error status if possible
if self.valves.show_status:
try:
error_message = f"{WARNING_EMOJI} ERROR processing data"
await self._emit_status(__event_emitter__, error_message, True)
except Exception as emit_err:
logger.error(f"Error emitting status during specific error handling: {emit_err}")
except Exception as e: # General exception handler for the main try block
logger.error(f"General error in outlet processing: {e}") # Clarify log message
logger.exception("Exception details:") # Log full traceback
self.session_stats["error_count"] += 1 # Increment error count
# Emit an error status if possible
if self.valves.show_status:
try:
error_message = f"{WARNING_EMOJI} ERROR: {str(e)}"
await self._emit_status(__event_emitter__, error_message, True)
except Exception as emit_err: # Catch specific exception during emission
logger.error(f"Error emitting status during general exception handling: {emit_err}")
# Return the unmodified body in case of error
logger.debug("Exiting outlet method (except block).") # DEBUG
return body