NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance ✨

Function
filter
v0.1
GPT Usage Tracker
This function allows you to track the costs for GPT Models, if you only use OpenAI models you can enable globally, if not you will want to enable this filter for all the models you want tracked. This will be expanded in the future to allow for all models, currently this writes to a JSON file called user_costing.json -> the plan is to eventually integrate into the admin dashboard.
Function ID
gpt_usage_tracker
Downloads
612+

Function Content
python
"""
title: Usage Tracker
author: InfamyStudio CortexNetwork
author_url: https://github.com/open-webui
funding_url: https://github.com/open-webui
version: 0.1
"""

from pydantic import BaseModel, Field
from typing import Optional
import tiktoken
import json
import os

MODEL_PRICING = {
    "gpt-4o": {
        "input": 5,
        "output": 15,
        "input-tokens": 1000000,
        "output-tokens": 1000000,
    },
    "gpt-4o-mini": {
        "input": 0.150,
        "output": 0.600,
        "input-tokens": 1000000,
        "output-tokens": 1000000,
    },
    "gpt-4-turbo": {
        "input": 10,
        "output": 30,
        "input-tokens": 1000000,
        "output-tokens": 1000000,
    },
    "gpt-4": {
        "input": 30,
        "output": 60,
        "input-tokens": 1000000,
        "output-tokens": 1000000,
    },
    "gpt-3.5-turbo": {
        "input": 3,
        "output": 6,
        "input-tokens": 1000000,
        "output-tokens": 1000000,
    },
    "gpt-3.5-turbo-16k": {
        "input": 3,
        "output": 4,
        "input-tokens": 1000000,
        "output-tokens": 1000000,
    },
    "tts-1": {
        "output": 15,
        "output-tokens": 1000000,
    },
}

class Filter:
    class Valves(BaseModel):
        priority: int = Field(
            default=0, description="Priority level for the filter operations."
        )
        pass

    def __init__(self):
        self.valves = self.Valves()
        self.input_cost = 0
        self.output_cost = 0
        self.cost_file = 'user_costing.json'
        self._ensure_cost_file_exists()
        pass

    def _ensure_cost_file_exists(self):
        if not os.path.exists(self.cost_file):
            with open(self.cost_file, 'w') as file:
                json.dump({}, file)

    def _read_costs(self):
        with open(self.cost_file, 'r') as file:
            return json.load(file)

    def _write_costs(self, costs):
        with open(self.cost_file, 'w') as file:
            json.dump(costs, file)

    def _update_user_cost(self, user_email, additional_cost):
        costs = self._read_costs()
        if user_email in costs:
            costs[user_email] += additional_cost
        else:
            costs[user_email] = additional_cost
        self._write_costs(costs)

    def inlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
        if __user__.get("role", "admin") in ["user", "admin"]:
            messages = body.get("messages", [])
            model = body.get("model")
            try:
                enc = tiktoken.encoding_for_model(model)
            except Exception as e:
                if "gpt-3.5-turbo" in model:
                    enc = tiktoken.encoding_for_model("gpt-3.5-turbo-")
                else:
                    print(f"Error: {e}")
                    enc = tiktoken.encoding_for_model("gpt-4o")

            if model in MODEL_PRICING:
                total_tokens = sum(
                    len(enc.encode(message["content"])) for message in messages
                )
                model_data = MODEL_PRICING[model]
                self.input_cost = (
                    total_tokens / model_data.get("input-tokens", 1)
                ) * model_data.get("input", 0)

        return body

    def outlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
        messages = body.get("messages", [])
        model = body.get("model")
        try:
            enc = tiktoken.encoding_for_model(model)
        except Exception as e:
            if "gpt-3.5-turbo" in model:
                enc = tiktoken.encoding_for_model("gpt-3.5-turbo-")
            else:
                print(f"Error: {e}")
                enc = tiktoken.encoding_for_model("gpt-4o")

        if model in MODEL_PRICING:
            total_tokens = sum(
                len(enc.encode(message["content"])) for message in messages
            )
            model_data = MODEL_PRICING[model]
            self.output_cost = (
                total_tokens / model_data.get("output-tokens", 1)
            ) * model_data.get("output", 0)
            total_cost = self.input_cost + self.output_cost

            if __user__:
                print(f"User: {__user__}")
                user_email = __user__.get('email')
                self._update_user_cost(user_email, total_cost)

        return body