NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance ✨

Function
pipe
v0.2.5
OpenAI ReAct agent_Added Whitelist Version
OpenAI ReAct agent using existing tools, with streaming and citations. Implemented with LangGraph.
Function ID
openai_react_agent_added_whitelist_version
Creator
@cooksleep
Downloads
933+

Function Content
python
"""
title: OpenAI ReAct agent_Added Whitelist Version
author: Michael Poluektov, Cook Sleep
author_urls:
  - https://github.com/michaelpoluektov
  - https://github.com/cooksleep
description: OpenAI ReAct agent using existing tools, with streaming and citations. Implemented with LangGraph.
required_open_webui_version: 0.3.15
requirements: langchain-openai==0.1.24, langgraph
version: 0.2.5
licence: MIT
"""

import os
from typing import Callable, AsyncGenerator, Awaitable, Optional, Protocol

from pydantic import BaseModel, Field
from openai import OpenAI
from langchain_openai import ChatOpenAI
from langchain_core.tools import StructuredTool
from langgraph.prebuilt import create_react_agent

BAD_NAMES = ["13", "3.5", "preview", "chatgpt"]

EmitterType = Optional[Callable[[dict], Awaitable[None]]]


class SendCitationType(Protocol):
    def __call__(self, url: str, title: str, content: str) -> Awaitable[None]: ...


class SendStatusType(Protocol):
    def __call__(self, status_message: str, done: bool) -> Awaitable[None]: ...


def get_send_citation(__event_emitter__: EmitterType) -> SendCitationType:
    async def send_citation(url: str, title: str, content: str):
        if __event_emitter__ is None:
            return
        await __event_emitter__(
            {
                "type": "citation",
                "data": {
                    "document": [content],
                    "metadata": [{"source": url, "html": False}],
                    "source": {"name": title},
                },
            }
        )

    return send_citation


def get_send_status(__event_emitter__: EmitterType) -> SendStatusType:
    async def send_status(status_message: str, done: bool):
        if __event_emitter__ is None:
            return
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": status_message, "done": done},
            }
        )

    return send_status


class Pipe:
    class Valves(BaseModel):
        OPENAI_BASE_URL: str = Field(
            default="https://api.openai.com/v1",
            description="Base URL for OpenAI API endpoints",
        )
        OPENAI_API_KEY: str = Field(default="", description="OpenAI API key")
        MODEL_PREFIX: str = Field(default="ReAct", description="Prefix before model ID")
        WHITELIST_MODELS: str = Field(
            default="", description="Comma-separated list of whitelisted models"
        )

    def __init__(self):
        self.type = "manifold"
        self.valves = self.Valves(
            **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
        )
        print(f"{self.valves=}")

    def pipes(self) -> list[dict[str, str]]:
        try:
            self.setup()
        except Exception as e:
            return [{"id": "error", "name": f"Error: {e}"}]
        openai = OpenAI(**self.openai_kwargs)  # type: ignore
        models = [m.id for m in openai.models.list().data]
        models = [m for m in models if "gpt" in m]
        models = [m for m in models if not any(bad in m for bad in BAD_NAMES)]

        # Add whitelisted models
        whitelist = [
            m.strip() for m in self.valves.WHITELIST_MODELS.split(",") if m.strip()
        ]
        models = list(set(models + whitelist))

        return [{"id": m, "name": f"{self.valves.MODEL_PREFIX}/{m}"} for m in models]

    def setup(self):
        v = self.valves
        if not v.OPENAI_API_KEY or not v.OPENAI_BASE_URL:
            raise Exception("Error: OPENAI_API_KEY or OPENAI_BASE_URL is not set")
        self.openai_kwargs = {
            "base_url": v.OPENAI_BASE_URL,
            "api_key": v.OPENAI_API_KEY,
        }

    async def pipe(
        self,
        body: dict,
        __user__: dict | None,
        __task__: str | None,
        __tools__: dict[str, dict] | None,
        __event_emitter__: Callable[[dict], Awaitable[None]] | None,
    ) -> AsyncGenerator:
        print(__task__)
        print(f"{__tools__=}")
        if __task__ == "function_calling":
            return

        self.setup()

        model_id = body["model"][body["model"].rfind(".") + 1 :]
        model = ChatOpenAI(model=model_id, **self.openai_kwargs)  # type: ignore

        config = {}

        if __task__ == "title_generation":
            content = model.invoke(body["messages"], config=config).content
            assert isinstance(content, str)
            yield content
            return

        if not __tools__:
            async for chunk in model.astream(body["messages"], config=config):
                content = chunk.content
                assert isinstance(content, str)
                yield content
            return

        send_citation = get_send_citation(__event_emitter__)
        send_status = get_send_status(__event_emitter__)

        tools = []
        for key, value in __tools__.items():
            tools.append(
                StructuredTool(
                    func=None,
                    name=key,
                    coroutine=value["callable"],
                    args_schema=value["pydantic_model"],
                    description=value["spec"]["description"],
                )
            )
        graph = create_react_agent(model, tools=tools)
        inputs = {"messages": body["messages"]}
        num_tool_calls = 0
        async for event in graph.astream_events(inputs, version="v2", config=config):  # type: ignore
            kind = event["event"]
            data = event["data"]
            if kind == "on_chat_model_stream":
                if "chunk" in data and (content := data["chunk"].content):
                    yield content
            elif kind == "on_tool_start":
                yield "\n"
                await send_status(f"Running tool {event['name']}", False)
            elif kind == "on_tool_end":
                num_tool_calls += 1
                await send_status(
                    f"Tool '{event['name']}' returned {data.get('output')}", True
                )
                await send_citation(
                    url=f"Tool call {num_tool_calls}",
                    title=event["name"],
                    content=f"Tool '{event['name']}' with inputs {data.get('input')} returned {data.get('output')}",
                )