Function
filter
v0.1
Weave Context and Completion Logging
Log your inputs and completions to Weave (by Weights & Biases) for LLMOps / observability
Function ID
weave_context_and_completion_logging
Creator
@mrgba
Downloads
24+

Function Content
python
"""
title: Weave (Wandb) Context and Completion Logging
description: Weave (by Weights and Biases) context and completion logging for OpenWebUI
author: m-rgba (@martinmark)
author_url: https://github.com/m-rgba/openwebui-weave
version: 0.1
"""

import datetime
import tiktoken
import requests
from pydantic import BaseModel, Field
from typing import List, Optional


class Filter:
    class Valves(BaseModel):
        priority: int = Field(
            default=0, description="Priority level for the filter operations."
        )
        wandb_project_name: Optional[str] = Field(
            default=None,
            description="Weights & Biases project name for initialization in the format of `username/project_name`.",
        )
        wandb_api_key: Optional[str] = Field(
            default=None, description="Weights & Biases API key for login."
        )
        pass

    def __init__(self):
        self.valves = self.Valves()
        pass

    def inlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
        # print(f"inlet:{__name__}")
        # print(f"inlet:body:{body}")
        # print(f"inlet:user:{__user__}")
        url = "https://trace.wandb.ai/call/start"
        headers = {"content-type": "application/json"}
        payload = {
            "start": {
                "project_id": self.valves.wandb_project_name,
                "op_name": __name__,
                "started_at": datetime.datetime.now().isoformat(),
                "inputs": {
                    "messages": body.get("messages", []),
                    "model": body.get("model", ""),
                    "metadata": body.get("metadata", {}),
                    "user": __user__,
                },
                "attributes": {},
            }
        }
        response = requests.post(
            url, headers=headers, json=payload, auth=("api", self.valves.wandb_api_key)
        )
        if response.status_code == 200:
            # print("Start request successful")
            data = response.json()
            self.trace_id = data.get("id")
        else:
            print("Start request failed with status:", response.status_code)
            print(response.text)
        return body

    def outlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
        # print(f"outlet:{__name__}")
        # print(f"outlet:body:{body}")
        # print(f"outlet:user:{__user__}")

        # Extract the last assistant message and its metadata
        messages = body.get("messages", [])
        last_assistant_message_obj = next(
            (msg for msg in reversed(messages) if msg.get("role") == "assistant"),
            None,
        )
        last_assistant_message = (
            last_assistant_message_obj.get("content")
            if last_assistant_message_obj
            else None
        )

        # Check if usage data is available in the last assistant message
        usage_data = (
            last_assistant_message_obj.get("info", {}).get("usage")
            if last_assistant_message_obj
            else None
        )
        model = body.get("model", "gpt-4o")
        if usage_data:
            usage_source = "API"
            # Use the provided usage data
            input_tokens = usage_data.get("prompt_tokens", 0)
            output_tokens = usage_data.get("completion_tokens", 0)
        else:
            usage_source = "Calculated"

            # Initialize tiktoken encoder and token buffers based on model
            # Approximate buffer from tiktoken as described here:
            # https://cookbook.openai.com/examples/how_to_count_tokens_with_tiktoken#6-counting-tokens-for-chat-completions-api-calls
            try:
                encoding = tiktoken.encoding_for_model(model)
            except KeyError:
                encoding = tiktoken.get_encoding("cl100k_base")
            # Calculate input tokens from all messages
            input_tokens = 0
            # print(messages)
            for message in messages:
                # Exclude last assistant response for input tokens
                if message is not last_assistant_message_obj:
                    message_content = message.get("content", "")
                    input_tokens += len(encoding.encode(str(message_content)))
                    input_tokens = input_tokens + 4

            # Calculate output tokens
            output_tokens = 0
            if last_assistant_message:
                output_tokens = len(encoding.encode(last_assistant_message))
                output_tokens = output_tokens + 3

        # Build the payload
        payload = {
            "end": {
                "project_id": self.valves.wandb_project_name,
                "id": self.trace_id,
                "ended_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                "output": {
                    "choices": [
                        {"message": {"content": last_assistant_message}},
                    ],
                    "usage_source": usage_source,
                },
                "summary": {
                    "usage": {
                        model: {
                            "prompt_tokens": input_tokens,
                            "completion_tokens": output_tokens,
                            "total_tokens": input_tokens + output_tokens,
                            "requests": 1,
                        }
                    }
                },
            }
        }
        # Send end request
        url = "https://trace.wandb.ai/call/end"
        headers = {"content-type": "application/json"}
        response = requests.post(
            url, headers=headers, json=payload, auth=("api", self.valves.wandb_api_key)
        )
        if response.status_code == 200:
            # print("End request successful")
            data = response.json()
            # print(f"outlet:data:{data}")
        else:
            print("End request failed with status:", response.status_code)
            print(response.text)
        return body