"""
title: Universal Token Counter and Cost Metrics
author: Gurasis Osahan
author_url: https://github.com/asisosahan
funding_url: https://buymeacoffee.com/gosahan
gitlab_url: https://gitlab.genomicops.cloud/innovation-hub/open-webui-token-counter
version: 0.1.1
description:
  A token counting and cost estimation utility, designed as a snippet for
  Open WebUI. Supports OpenAI GPT-3.5/4/4o/o1 and Anthropic Claude models,
  plus fallback categories for unknown models. Pricing is user-configurable
  and can be updated manually to accommodate future changes.
"""
import logging
import time
from typing import Any, Dict, List, Optional
import tiktoken
from fastapi.requests import Request
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
if not logger.handlers:
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.set_name("universal_token_counter")
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = False
# ==============================================================================
# PRICING DICTIONARIES
# ------------------------------------------------------------------------------
# 1) MANUAL UPDATES:
#    - Users can add or update entries in MODEL_PRICING as new models appear
#      or token prices change. Rates are stored per MILLION tokens, so
#      multiply a per-1K price by 1,000. For example, if OpenAI updated the
#      GPT-4 input cost from $0.03 to $0.025 per 1K tokens, the entry would
#      change from $30.00 to $25.00 per million.
#
# 2) AUTOMATED FETCH (NOT IMPLEMENTED):
# - If an official API or stable source emerges, integrate logic here
# (e.g., from a known endpoint). For now, manual updates are the recommended
# approach due to frequent changes and varied vendor structures.
# ==============================================================================
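# A trivial convenience for manual updates (a sketch; not used elsewhere in
# this file): convert a vendor's quoted per-1K price into the per-million
# figure stored in MODEL_PRICING.
def per_1k_to_per_million(price_per_1k: float) -> float:
    """E.g. per_1k_to_per_million(0.03) -> 30.0 (USD per million tokens)."""
    return price_per_1k * 1000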
MODEL_PRICING = {
# ------------------------------
# GPT-4 and GPT-3.5 placeholders
# ------------------------------
"gpt-4": {"input": 30.00, "output": 60.00}, # 8k context
"gpt-4-32k": {"input": 60.00, "output": 120.00}, # 32k context
"gpt-3.5-turbo": {"input": 1.50, "output": 2.00},
"gpt-3.5-turbo-16k": {"input": 3.00, "output": 4.00},
"gpt-3.5-turbo-0613": {"input": 1.50, "output": 2.00},
"gpt-3.5-turbo-16k-0613": {"input": 3.00, "output": 4.00},
# Legacy references
"davinci-002": {"input": 2.00, "output": 2.00},
"babbage-002": {"input": 0.40, "output": 0.40},
# -------------------------------------------------------------------------
# Anthropic Claude placeholders - edit to match your actual costs
# -------------------------------------------------------------------------
"claude-3.5-sonnet-2024-10-22": {"input": 20.00, "output": 40.00},
"claude-3.5-sonnet-2024-06-20": {"input": 20.00, "output": 40.00},
"claude-3.5-haiku": {"input": 15.00, "output": 30.00},
"claude-3-opus": {"input": 25.00, "output": 50.00},
"claude-3-sonnet": {"input": 20.00, "output": 40.00},
"claude-3-haiku": {"input": 15.00, "output": 30.00},
"claude-2.1": {"input": 12.00, "output": 24.00},
"claude-2.0": {"input": 12.00, "output": 24.00},
# -------------------------------------------------------------------------
# GPT-4o and other placeholders from earlier versions
# -------------------------------------------------------------------------
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-2024-11-20": {"input": 2.50, "output": 10.00},
"gpt-4o-2024-08-06": {"input": 2.50, "output": 10.00},
"gpt-4o-2024-05-13": {"input": 5.00, "output": 15.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-4o-mini-2024-07-18": {"input": 0.15, "output": 0.60},
"gpt-4o-audio-preview": {"input": 2.50, "output": 10.00},
"gpt-4o-audio-preview-2024-12-17": {"input": 2.50, "output": 10.00},
"gpt-4o-audio-preview-2024-10-01": {"input": 2.50, "output": 10.00},
"gpt-4o-realtime-preview": {"input": 5.00, "output": 20.00},
"gpt-4o-realtime-preview-2024-12-17": {"input": 5.00, "output": 20.00},
"gpt-4o-realtime-preview-2024-10-01": {"input": 5.00, "output": 20.00},
"o1": {"input": 15.00, "output": 60.00},
"o1-2024-12-17": {"input": 15.00, "output": 60.00},
"o1-preview": {"input": 15.00, "output": 60.00},
"o1-preview-2024-09-12": {"input": 15.00, "output": 60.00},
"o1-mini": {"input": 3.00, "output": 12.00},
"o1-mini-2024-09-12": {"input": 3.00, "output": 12.00},
# --------------------------------------------------------------
# Fallback categories: "small", "medium", "large" for unknown models
# --------------------------------------------------------------
"small": {"input": 0.50, "output": 1.50},
"medium": {"input": 5.00, "output": 15.00},
"large": {"input": 15.00, "output": 60.00},
}
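# Example of a manual update (hypothetical model name and rates, shown only
# to illustrate the expected shape; values are USD per million tokens):
#     MODEL_PRICING["my-new-model"] = {"input": 1.00, "output": 3.00}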
class Action:
"""
    This snippet is designed to run within Open WebUI, analyzing token usage
    in the last user/assistant message and emitting cost metrics.
    Usage:
      - Provide a `body` dict with a "messages" list, e.g.
        body = {"messages": [{"role": "user", "content": "Hello!"}]}
      - Optionally provide a `__model__` dict, e.g. {"id": "gpt-4"},
        to determine the cost rate.
      - The method emits "status" and "citation" events via
        `__event_emitter__`. (A standalone usage sketch appears at the
        bottom of this file.)
Future Updates:
- If new model names or updated per-million rates appear,
edit `MODEL_PRICING` accordingly.
- If an official pricing API becomes available,
integrate it by replacing or supplementing the `MODEL_PRICING` dict.
"""
class Valves(BaseModel):
show_status: bool = Field(
default=True, description="Show status of the token counting action"
)
show_cost: bool = Field(default=True, description="Show cost estimation in USD")
show_timing: bool = Field(default=True, description="Show processing time")
detailed_output: bool = Field(
default=False, description="Show detailed token breakdown"
)
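    # Valves are exposed as editable settings in the Open WebUI interface;
    # they can also be toggled in code, e.g. (hypothetical):
    #     Action().valves.detailed_output = True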
def __init__(self) -> None:
self.valves = self.Valves()
self.start_time: Optional[float] = None
def get_model_category(self, model: str) -> str:
"""
Determine model category based on name for pricing fallback.
(Used if model isn't explicitly in MODEL_PRICING.)
"""
model_lower = model.lower()
if any(
term in model_lower for term in ["gpt-4o", "claude-3-opus", "o1", "-large"]
):
return "large"
elif any(
term in model_lower for term in ["gpt-3.5", "claude-3-sonnet", "-medium"]
):
return "medium"
else:
return "small"
def get_token_count(self, text: str, model: str) -> int:
"""
Count tokens using tiktoken:
1) Tries model-specific encoding (encoding_for_model).
2) If that fails, tries cl100k_base.
3) If that fails too, approximates tokens by ~1.3 per word.
"""
try:
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
except Exception as e_primary:
logger.debug(f"Primary encoding failed for model '{model}': {e_primary}")
try:
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
except Exception as e_fallback:
logger.error(f"Fallback encoding also failed: {e_fallback}")
# Approximate as last resort
return int(len(text.split()) * 1.3)
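        # For example, with the cl100k_base fallback "Hello world" encodes to
        # 2 tokens, and the last-resort word heuristic estimates
        # int(2 * 1.3) = 2 tokens for the same text.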
    def calculate_cost(self, tokens: int, model: str, is_input: bool = True) -> float:
        """
        Calculate cost in USD for a token count.
        If the model is not in MODEL_PRICING, fall back to get_model_category().
        """
        price_key = "input" if is_input else "output"
        if model in MODEL_PRICING:
            price_per_million = MODEL_PRICING[model][price_key]
        else:
            category = self.get_model_category(model)
            price_per_million = MODEL_PRICING[category][price_key]
        # Rates are stored per million tokens, so scale the count accordingly.
        return (tokens / 1_000_000) * price_per_million
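        # Worked example: 1,500 output tokens on "gpt-4" ($60.00 per million
        # output tokens above) cost (1500 / 1_000_000) * 60.00 = $0.09.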
async def action(
self,
body: Dict[str, List[Dict[str, str]]],
__request__: Request,
__user__: Optional[Any] = None,
__event_emitter__: Optional[Any] = None,
__event_call__: Optional[Any] = None,
__model__: Optional[Dict[str, str]] = None,
) -> None:
"""
Main entry point for token analysis.
Emits "status" updates and optionally "citation" details.
"""
        logger.info(f"Starting token analysis for action: {__name__}")
self.start_time = time.time()
# If no event emitter, there's nowhere to send output.
if not __event_emitter__:
return None
try:
messages = body.get("messages", [])
model = (__model__.get("id") if __model__ else "unknown") or "unknown"
logger.info(f"Processing model: {model}")
# Emit initial status
if self.valves.show_status:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Analyzing Usage", "done": False},
}
)
            if not messages:
                logger.warning("No messages found to analyze.")
                # Close out the pending status so the spinner doesn't hang.
                if self.valves.show_status:
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {
                                "description": "No messages to analyze",
                                "done": True,
                            },
                        }
                    )
                return None
# We typically measure the last message
last_message = messages[-1]
message_content = last_message.get("content", "")
# Count tokens
token_count = self.get_token_count(message_content, model)
# Calculate cost (assuming output tokens, adjust if needed)
cost = self.calculate_cost(token_count, model, is_input=False)
# Compute elapsed time
elapsed_time = time.time() - self.start_time
# Build status display
stats = [f"{token_count:,} tokens"]
if self.valves.show_cost:
stats.append(f"${cost:.4f} USD")
if self.valves.show_timing:
stats.append(f"{elapsed_time:.2f}s")
status_message = " | ".join(stats)
# Emit final status
await __event_emitter__(
{
"type": "status",
"data": {"description": status_message, "done": True},
}
)
# If user wants detailed breakdown
if self.valves.detailed_output:
await __event_emitter__(
{
"type": "citation",
"data": {
"source": {"name": "Token & Cost Analysis"},
"document": [
f"Model: {model}",
f"Token Count: {token_count:,}",
f"Category Fallback: {self.get_model_category(model)}",
f"Estimated Cost: ${cost:.4f} USD",
f"Processing Time: {elapsed_time:.2f}s",
],
"metadata": [{"source": "Universal Token Counter"}],
},
}
)
        except Exception as e:
            logger.exception(f"Error in token analysis: {e}")
# Emit error status
if self.valves.show_status:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Error in Analysis",
"done": True,
},
}
)
# Provide error details
await __event_emitter__(
{
"type": "citation",
"data": {
"source": {"name": "Error:token analysis"},
"document": [str(e)],
"metadata": [{"source": "Universal Token Counter"}],
},
}
)
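if __name__ == "__main__":
    # Standalone smoke test: a minimal sketch of how Open WebUI invokes this
    # action, assuming the file is run directly outside the UI. The Request
    # argument is stubbed with None (type hints are not enforced at runtime),
    # and the event emitter simply prints each emitted event.
    import asyncio

    async def _demo() -> None:
        async def _print_emitter(event: Dict[str, Any]) -> None:
            print(event)

        action = Action()
        action.valves.detailed_output = True  # also emit the citation breakdown
        await action.action(
            body={"messages": [{"role": "user", "content": "Hello, world!"}]},
            __request__=None,  # stub; Open WebUI supplies a real Request here
            __event_emitter__=_print_emitter,
            __model__={"id": "gpt-4o-mini"},
        )

    asyncio.run(_demo())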