Function
filter
v1.0.0
AddMetadata
A filter that adds user information and other types of metadata to requests. Useful for Langfuse or LiteLLM.
Function ID
addmetadata
Downloads
38+

Function Content
python
"""
title: AddMetadata
author: thiswillbeyourgithub
author_url: https://github.com/thiswillbeyourgithub/openwebui_custom_pipes_filters/
funding_url: https://github.com/thiswillbeyourgithub/openwebui_custom_pipes_filters/
version: 1.0.0
date: 2024-08-29
license: GPLv3
description: A filter that adds user information and other types of metadata to requests. Useful for Langfuse or LiteLLM.
"""

import copy
import json
from functools import cache
from typing import Optional, Callable, Any

from pydantic import BaseModel, Field

@cache
def _parse_json_dict(user_value: str) -> dict:
    """Parse and validate *user_value* as a JSON object (memoized per string).

    The returned dict is shared between all callers of this helper, so it
    must never be mutated; use ``load_json_dict`` to get a private copy.
    """
    user_value = user_value.strip()
    if not user_value:
        return {}
    loaded = json.loads(user_value)
    if not isinstance(loaded, dict):
        # Raised explicitly instead of via `assert` so the validation is not
        # stripped under `python -O`; the exception type is kept unchanged.
        raise AssertionError(f"json is not a dict but '{type(loaded)}'")
    return loaded


def load_json_dict(user_value: str) -> dict:
    """Return the dict encoded by the JSON string *user_value*.

    An empty or whitespace-only string yields an empty dict.

    Parsing is memoized, but every call returns an independent deep copy:
    callers are free to mutate the result without corrupting the cache
    (and therefore without leaking state into later calls).

    Raises:
        json.JSONDecodeError: if *user_value* is not valid JSON.
        AssertionError: if the decoded JSON value is not a dict.
    """
    return copy.deepcopy(_parse_json_dict(user_value))

@cache
def _parse_json_list(user_value: str) -> tuple:
    """Parse and validate *user_value* as a JSON list of strings (memoized).

    The result is stored as a tuple so the cached value cannot be mutated.
    """
    user_value = user_value.strip()
    if not user_value:
        return ()
    loaded = json.loads(user_value)
    # Raised explicitly instead of via `assert` so the validation is not
    # stripped under `python -O`; the exception type is kept unchanged.
    if not isinstance(loaded, list):
        raise AssertionError(f"json is not a list but '{type(loaded)}'")
    if not all(isinstance(elem, str) for elem in loaded):
        raise AssertionError(f"List contained non strings elements: '{loaded}'")
    return tuple(loaded)


def load_json_list(user_value: str) -> list:
    """Return the list of strings encoded by the JSON string *user_value*.

    An empty or whitespace-only string yields an empty list.

    Parsing is memoized, but every call returns a fresh list, so callers may
    mutate the result without affecting later calls.

    Raises:
        json.JSONDecodeError: if *user_value* is not valid JSON.
        AssertionError: if the decoded value is not a list, or if any of its
            elements is not a string.
    """
    return list(_parse_json_list(user_value))


class Filter:
    """Filter that decorates a request body with user info, metadata and tags.

    ``inlet`` mutates and returns the request ``body``:

    * ``body["user"]`` and ``body["metadata"]["open-webui_user"]`` are filled
      from ``__user__`` when the ``add_userinfo`` valve is enabled,
    * the ``extra_metadata`` valve (a JSON object) is merged into
      ``body["metadata"]``,
    * the ``extra_tags`` valve (a JSON list of strings) is added to
      ``body["tags"]``,
    * a shallow copy of the final metadata is stored under
      ``body["metadata"]["trace_metadata"]``.
    """

    class Valves(BaseModel):
        # Ordering hint for the filter.
        priority: int = Field(
            default=0,
            description="Priority level for the filter operations (default 0).",
        )
        # Toggle for copying the `__user__` dict into the request.
        add_userinfo: bool = Field(
            default=True,
            description="True to add the '__user__' dict of openwebui to the request as metadata. Note that 'user' of __user__ is also set as a 'user' metadata",
        )
        # JSON object (as a string) merged into body["metadata"].
        extra_metadata: str = Field(
            default='{"source": "open-webui"}',
            description="String that when passed through json.loads is a dict that will be added to the request. If a the value is a list or a value of the metadata is already set then we will append the new value to the list.",
        )
        # JSON list of strings (as a string) added to body["tags"].
        extra_tags: str = Field(
            default='["open-webui"]',
            description="String that when passed through json.loads is a list that will be added as tags to the request.",
        )
        # Enables console prints and non-empty status messages.
        debug: bool = Field(
            default=False, description="True to add emitter prints",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def on_valves_updated(self):
        """Parse both JSON valves so an invalid value raises immediately."""
        load_json_dict(self.valves.extra_metadata)
        load_json_list(self.valves.extra_tags)

    async def inlet(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Any]] = None,
        ) -> dict:
        """Add user info, extra metadata and extra tags to ``body``.

        Args:
            body: The request payload; it is modified in place.
            __user__: Dict describing the current user, or None. When it is
                missing or empty the user-info step is skipped.
            __event_emitter__: Optional async callable that receives status
                events.

        Returns:
            The same ``body`` object, after modification.
        """
        emitter = EventEmitter(__event_emitter__)

        async def log(message: str):
            # In debug mode the message is printed and shown as a status;
            # otherwise only an empty status update is emitted.
            if self.valves.debug:
                print(f"AddMetadata filter: inlet: {message}")
                await emitter.progress_update(message)
            else:
                await emitter.progress_update("")

        if self.valves.debug:
            await log(f"AddMetadata filter: inlet: __user__ {__user__}")
            await log(f"AddMetadata filter: inlet: body {body}")

        if self.valves.add_userinfo:
            await self._add_userinfo(body, __user__, log)
        await self._merge_metadata(body, log)
        await self._add_tags(body, log)

        # also add as langfuse metadata
        # Guarded: "metadata" is absent when user info is disabled and no
        # extra metadata is configured.
        if "metadata" in body:
            body["metadata"]["trace_metadata"] = body["metadata"].copy()

        # Only serialize the body when it will actually be shown; default=str
        # keeps the debug dump from raising on non-JSON values.
        await log(json.dumps(body, default=str) if self.valves.debug else "")
        await emitter.success_update("")  # hides the emitter
        return body

    async def _add_userinfo(
        self,
        body: dict,
        user: Optional[dict],
        log: Callable[[str], Any],
    ) -> None:
        """Copy the user's name and the whole user dict into ``body``."""
        if not user:
            await log("No __user__ available, user info not added")
            return

        if "name" in user:
            name = user["name"]
            if "user" in body:
                await log(f"User key already found in body: '{body['user']}'")
                if body["user"] != name:
                    await log(f"User key different than expected: '{body['user']}' vs '{name}'")
            body["user"] = name
            await log(f"Added user '{name}'")
        else:
            await log("No 'name' key in __user__, 'user' not set")

        body.setdefault("metadata", {})["open-webui_user"] = user

    async def _merge_metadata(self, body: dict, log: Callable[[str], Any]) -> None:
        """Merge the ``extra_metadata`` valve into ``body["metadata"]``.

        Keys missing from the body are added. When a key already exists, list
        values are combined (existing values first); if neither the existing
        nor the new value is a list, the existing value is kept.
        """
        # Deep copy: load_json_dict may hand back a memoized object shared
        # between requests, and the body is mutated further downstream.
        extra = copy.deepcopy(load_json_dict(self.valves.extra_metadata))
        if not extra:
            await log("No metadata specified")
            return

        if "metadata" not in body:
            body["metadata"] = extra
            await log("Set metadata")
            return

        current = body["metadata"]
        for key, value in extra.items():
            if key not in current:
                current[key] = value
            elif isinstance(current[key], list):
                if isinstance(value, list):
                    current[key].extend(value)
                else:
                    current[key].append(value)
            elif isinstance(value, list):
                current[key] = [current[key]] + value
            # Both values are non-lists: the value already in the body wins.
        await log("Updated metadata")

    async def _add_tags(self, body: dict, log: Callable[[str], Any]) -> None:
        """Add the ``extra_tags`` valve to ``body["tags"]``."""
        # Fresh list: the loader may return a memoized object that must not
        # end up stored inside (and later mutated through) the body.
        tags = list(load_json_list(self.valves.extra_tags))
        if not tags:
            await log("No tags specified")
            return

        if "tags" in body:
            body["tags"] += tags
            await log("Updated tags")
        else:
            body["tags"] = tags
            await log("Set tags")

    def outlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
        """Return ``body`` unchanged."""
        return body


class EventEmitter:
    """Helper that sends "status" events through an optional async callback.

    When no callback is given, every method is a no-op.
    """

    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        self.event_emitter = event_emitter

    async def progress_update(self, description):
        """Emit *description* with the default in-progress, not-done state."""
        await self.emit(description=description)

    async def error_update(self, description):
        """Emit *description* as a finished "error" status."""
        await self.emit(description=description, status="error", done=True)

    async def success_update(self, description):
        """Emit *description* as a finished "success" status."""
        await self.emit(description=description, status="success", done=True)

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Build a status event and await the callback with it, if one is set."""
        if not self.event_emitter:
            return
        data = {
            "status": status,
            "description": description,
            "done": done,
        }
        event = {"type": "status", "data": data}
        await self.event_emitter(event)