"""
title: Usage Tracker
author: InfamyStudio CortexNetwork
author_url: https://github.com/open-webui
funding_url: https://github.com/open-webui
version: 0.1
"""
from pydantic import BaseModel, Field
from typing import Optional
import tiktoken
import json
import os
MODEL_PRICING = {
"gpt-4o": {
"input": 5,
"output": 15,
"input-tokens": 1000000,
"output-tokens": 1000000,
},
"gpt-4o-mini": {
"input": 0.150,
"output": 0.600,
"input-tokens": 1000000,
"output-tokens": 1000000,
},
"gpt-4-turbo": {
"input": 10,
"output": 30,
"input-tokens": 1000000,
"output-tokens": 1000000,
},
"gpt-4": {
"input": 30,
"output": 60,
"input-tokens": 1000000,
"output-tokens": 1000000,
},
"gpt-3.5-turbo": {
"input": 3,
"output": 6,
"input-tokens": 1000000,
"output-tokens": 1000000,
},
"gpt-3.5-turbo-16k": {
"input": 3,
"output": 4,
"input-tokens": 1000000,
"output-tokens": 1000000,
},
"tts-1": {
"output": 15,
"output-tokens": 1000000,
},
}
class Filter:
class Valves(BaseModel):
priority: int = Field(
default=0, description="Priority level for the filter operations."
)
pass
def __init__(self):
self.valves = self.Valves()
self.input_cost = 0
self.output_cost = 0
self.cost_file = 'user_costing.json'
self._ensure_cost_file_exists()
pass
def _ensure_cost_file_exists(self):
if not os.path.exists(self.cost_file):
with open(self.cost_file, 'w') as file:
json.dump({}, file)
def _read_costs(self):
with open(self.cost_file, 'r') as file:
return json.load(file)
def _write_costs(self, costs):
with open(self.cost_file, 'w') as file:
json.dump(costs, file)
def _update_user_cost(self, user_email, additional_cost):
costs = self._read_costs()
if user_email in costs:
costs[user_email] += additional_cost
else:
costs[user_email] = additional_cost
self._write_costs(costs)
def inlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
if __user__.get("role", "admin") in ["user", "admin"]:
messages = body.get("messages", [])
model = body.get("model")
try:
enc = tiktoken.encoding_for_model(model)
except Exception as e:
if "gpt-3.5-turbo" in model:
enc = tiktoken.encoding_for_model("gpt-3.5-turbo-")
else:
print(f"Error: {e}")
enc = tiktoken.encoding_for_model("gpt-4o")
if model in MODEL_PRICING:
total_tokens = sum(
len(enc.encode(message["content"])) for message in messages
)
model_data = MODEL_PRICING[model]
self.input_cost = (
total_tokens / model_data.get("input-tokens", 1)
) * model_data.get("input", 0)
return body
def outlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
messages = body.get("messages", [])
model = body.get("model")
try:
enc = tiktoken.encoding_for_model(model)
except Exception as e:
if "gpt-3.5-turbo" in model:
enc = tiktoken.encoding_for_model("gpt-3.5-turbo-")
else:
print(f"Error: {e}")
enc = tiktoken.encoding_for_model("gpt-4o")
if model in MODEL_PRICING:
total_tokens = sum(
len(enc.encode(message["content"])) for message in messages
)
model_data = MODEL_PRICING[model]
self.output_cost = (
total_tokens / model_data.get("output-tokens", 1)
) * model_data.get("output", 0)
total_cost = self.input_cost + self.output_cost
if __user__:
print(f"User: {__user__}")
user_email = __user__.get('email')
self._update_user_cost(user_email, total_cost)
return body