Function
filter
v1.1
DataDog LLM Observability Filter
Forwards chat traces to DataDog LLM Observability
Function ID
datadog_llm_observability_filter
Creator
@0xthresh
Downloads
32+

Function Content
python
"""
title: DataDog Filter Pipeline
author: James W. (0xThresh)
author_url: https://github.com/0xThresh
funding_url: https://github.com/open-webui
version: 1.1
requirements: ddtrace
"""

# You can read more about the full configuration here: https://blog.opensourceai.dev/monitor-open-webui-with-datadog-llm-observability-620ef3a598c6
from typing import Optional
from open_webui.utils.misc import get_last_user_message, get_last_assistant_message
from pydantic import Field, BaseModel
from ddtrace.llmobs import LLMObs


class Filter:
    class Valves(BaseModel):
        priority: int = Field(
            default=0, 
            description="Priority level for the filter operations."
        )
        DD_API_KEY: str = Field(
            default="",
            description="DataDog API key"
        )
        DD_SITE: str = Field(
            default="datadoghq.com",
            description="Your DataDog site. Set if your account is hosted in a specific region.",
        )
        ML_APP: str = Field(
            default="Open WebUI",
            description="Name of the app shown in DataDog LLM traces",
        )
        pass

    def __init__(self):
        self.type = "filter"
        self.name = "DataDog Filter"

        # Initialize 'valves' with specific configurations. Using 'Valves' instance helps encapsulate settings,
        # which ensures settings are managed cohesively and not confused with operational flags like 'file_handler'.
        self.valves = self.Valves()

        # DataDog LLMOBS docs: https://docs.datadoghq.com/tracing/llm_observability/sdk/
        self.LLMObs = LLMObs()
        self.llm_span = None
        self.chat_generations = {}
        pass

    def inlet(self, body: dict, __user__: Optional[dict] = None) -> dict:

        self.LLMObs.enable(
            ml_app=self.valves.ML_APP,
            api_key=self.valves.DD_API_KEY,
            site=self.valves.DD_SITE,
            agentless_enabled=True,
            integrations_enabled=True,
            env="test",
            service="test",
        )

        self.llm_span = self.LLMObs.llm(
            model_name=body["model"],
            name=f"filter:{__name__}",
            model_provider="open-webui",
            session_id=body["metadata"]["chat_id"],
            ml_app=self.valves.ML_APP,
        )

        self.LLMObs.annotate(
            span=self.llm_span,
            input_data=get_last_user_message(body["messages"]),
        )

        return body

    def outlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
        self.LLMObs.annotate(
            span=self.llm_span,
            output_data=get_last_assistant_message(body["messages"]),
        )

        self.llm_span.finish()
        self.LLMObs.flush()

        return body