NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance ✨

Function
pipe
v0.4.1
ReAct Agent (OpenAI and Ollama)
ReAct agent using existing tools, with streaming and citations. Implemented with LangGraph. Supports LangFuse optionally. Works with both OpenAI and Ollama. Now with model whitelist parameter.
Function ID
react_agent
Downloads
859+

Function Content
python
"""
title: OpenAI ReAct with Langfuse
author: Michael Poluektov
author_url: https://github.com/michaelpoluektov
git_url: https://github.com/michaelpoluektov/OWUI-ReAct
description: OpenAI ReAct
required_open_webui_version: 0.4.3
requirements: langchain-openai, langgraph, ollama, langchain_ollama
version: 0.4.1
licence: MIT
"""

import os
from typing import AsyncGenerator, Awaitable, Callable, Optional, Protocol

import ollama
from langchain_core.tools import StructuredTool
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from langfuse.callback import CallbackHandler
from langgraph.prebuilt import create_react_agent
from openai import OpenAI
from pydantic import BaseModel, Field

# An emitter is a callable that takes one event dict and returns an awaitable.
_EmitterCallable = Callable[[dict], Awaitable[None]]
# Functions in this module accept either such a callable or None.
EmitterType = Optional[_EmitterCallable]


def extract_event_info(event_emitter):
    """Recover the chat and message IDs captured inside an event emitter.

    The emitter's closure cells are scanned in order and the first cell that
    holds a ``dict`` is treated as the request info.

    Args:
        event_emitter: The emitter callable, or ``None``.

    Returns:
        A ``(chat_id, message_id)`` tuple. Both entries are ``None`` when the
        emitter is missing, has no closure, or captures no ``dict``; either
        entry is ``None`` when the captured dict lacks that key.
    """
    # Builtins, partials and callable objects have no ``__closure__`` at all,
    # so look it up defensively instead of assuming a plain function.
    closure = getattr(event_emitter, "__closure__", None)
    if not closure:
        return None, None
    for cell in closure:
        try:
            request_info = cell.cell_contents
        except ValueError:
            # The cell is unbound (never assigned or already cleared).
            continue
        if isinstance(request_info, dict):
            return request_info.get("chat_id"), request_info.get("message_id")
    return None, None


class SendCitationType(Protocol):
    """Call signature of the citation sender returned by ``get_send_citation``."""

    def __call__(
        self,
        url: str,
        title: str,
        content: str,
    ) -> Awaitable[None]:
        """Return an awaitable that sends one citation."""
        ...


class SendStatusType(Protocol):
    """Call signature of the status sender returned by ``get_send_status``."""

    def __call__(
        self,
        status_message: str,
        done: bool,
    ) -> Awaitable[None]:
        """Return an awaitable that sends one status update."""
        ...


def get_send_citation(__event_emitter__: EmitterType) -> SendCitationType:
    """Bind an emitter into a coroutine function that sends ``source`` events.

    Args:
        __event_emitter__: Awaitable callback receiving the event dict, or
            ``None``, in which case the returned function does nothing.

    Returns:
        An async function ``send_citation(url, title, content)``.
    """

    async def send_citation(url: str, title: str, content: str):
        if __event_emitter__ is not None:
            payload = {
                "document": [content],
                "metadata": [{"source": url, "html": False}],
                "source": {"name": title},
            }
            await __event_emitter__({"type": "source", "data": payload})

    return send_citation


def get_send_status(__event_emitter__: EmitterType) -> SendStatusType:
    """Bind an emitter into a coroutine function that sends ``status`` events.

    Args:
        __event_emitter__: Awaitable callback receiving the event dict, or
            ``None``, in which case the returned function does nothing.

    Returns:
        An async function ``send_status(status_message, done)``.
    """

    async def send_status(status_message: str, done: bool):
        if __event_emitter__ is not None:
            payload = {"description": status_message, "done": done}
            await __event_emitter__({"type": "status", "data": payload})

    return send_status


class Pipe:
    """Manifold pipe exposing OpenAI and Ollama chat models as a ReAct agent.

    ``pipes()`` lists the selectable models and remembers which backend each
    one belongs to; ``pipe()`` answers a request, either by streaming the
    model directly (no tools) or by running a LangGraph ReAct agent over the
    supplied tools while emitting status and citation events.
    """

    class Valves(BaseModel):
        # Each field defaults to the value below unless an environment
        # variable of the same name is set (see ``Pipe.__init__``).
        OPENAI_BASE_URL: str = Field(
            default="https://api.openai.com/v1",
            description="Base URL for OpenAI API endpoints",
        )
        OPENAI_API_KEY: str = Field(default="", description="OpenAI API key")
        OLLAMA_URL: str = Field(
            default="", description="Base URL for Ollama API endpoints"
        )
        LANGFUSE_SECRET_KEY: str = Field(default="", description="Langfuse secret key")
        LANGFUSE_PUBLIC_KEY: str = Field(default="", description="Langfuse public key")
        LANGFUSE_URL: str = Field(default="", description="Langfuse URL")
        MODEL_PREFIX: str = Field(default="ReAct", description="Prefix before model ID")
        ENABLED_MODELS: str = Field(
            default="gpt-4o,gpt-4o-mini,gpt-4",
            description="Enabled models, comma-separated.",
        )

    def __init__(self):
        """Load valves from the environment and initialise backend state."""
        self.type = "manifold"
        self.valves = self.Valves(
            **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
        )
        # Populated by setup(); defined here so attribute access never fails.
        self.openai_kwargs = None
        self.ollama_kwargs = None
        self.langfuse_kwargs = None
        # Maps model id -> "openai" | "ollama"; refreshed by pipes().
        self.model_sources: dict[str, str] = {}
        # Never write credentials to the log: mask every non-empty *_KEY valve.
        redacted = {
            name: ("***" if value and name.endswith("_KEY") else value)
            for name, value in self.valves.model_dump().items()
        }
        print(f"self.valves={redacted}")

    def pipes(self) -> list[dict[str, str]]:
        """List the models this manifold offers.

        Returns:
            One ``{"id", "name"}`` dict per model, where ``name`` is the model
            id prefixed with ``MODEL_PREFIX``. If ``setup()`` fails, a single
            ``{"id": "error", ...}`` entry carrying the error text. A backend
            whose listing fails is logged and skipped.
        """
        try:
            self.setup()
        except Exception as e:
            return [{"id": "error", "name": f"Error: {e}"}]

        # Build into a local dict and publish it in one assignment so a
        # concurrent pipe() call never observes a half-filled mapping.
        sources: dict[str, str] = {}
        if self.openai_kwargs:
            try:
                client = OpenAI(**self.openai_kwargs)  # type: ignore
                enabled_models = {
                    m.strip() for m in self.valves.ENABLED_MODELS.split(",")
                }
                sources |= {
                    m.id: "openai"
                    for m in client.models.list().data
                    if m.id in enabled_models
                }
            except Exception as e:
                print(f"OpenAI error: {e}")
        if self.ollama_kwargs:
            try:
                client = ollama.Client(host=self.valves.OLLAMA_URL)
                for m in client.list()["models"]:
                    # NOTE(review): newer ollama clients appear to expose the
                    # identifier as "model" instead of "name" - accept either;
                    # confirm against the installed ollama version.
                    name = m.get("name") or m.get("model")
                    if name:
                        sources[name] = "ollama"
            except Exception as e:
                print(f"Ollama error: {e}")
        self.model_sources = sources
        return [
            {"id": m, "name": f"{self.valves.MODEL_PREFIX}/{m}"} for m in sources
        ]

    def setup(self):
        """Derive backend keyword arguments from the current valves.

        Sets ``openai_kwargs``, ``ollama_kwargs`` and ``langfuse_kwargs``;
        each is ``None`` when its valves are incomplete.

        Raises:
            ValueError: If neither OpenAI nor Ollama is configured.
        """
        v = self.valves
        if v.OPENAI_API_KEY and v.OPENAI_BASE_URL:
            self.openai_kwargs = {
                "base_url": v.OPENAI_BASE_URL,
                "api_key": v.OPENAI_API_KEY,
            }
        else:
            self.openai_kwargs = None
        self.ollama_kwargs = {"base_url": v.OLLAMA_URL} if v.OLLAMA_URL else None
        if not (self.openai_kwargs or self.ollama_kwargs):
            raise ValueError("No API keys provided")

        # Langfuse is optional and only enabled when all three valves are set.
        if all((v.LANGFUSE_SECRET_KEY, v.LANGFUSE_PUBLIC_KEY, v.LANGFUSE_URL)):
            self.langfuse_kwargs = {
                "secret_key": v.LANGFUSE_SECRET_KEY,
                "public_key": v.LANGFUSE_PUBLIC_KEY,
                "host": v.LANGFUSE_URL,
            }
        else:
            self.langfuse_kwargs = None

    def _resolve_source(self, model_id: str) -> str:
        """Return "openai" or "ollama" for ``model_id``.

        Falls back to the only configured backend when the model is not in
        ``model_sources`` (for example, ``pipes()`` has not run yet).

        Raises:
            ValueError: If the model is unknown and both backends are configured.
        """
        source = self.model_sources.get(model_id)
        if source is not None:
            return source
        if self.openai_kwargs and not self.ollama_kwargs:
            return "openai"
        if self.ollama_kwargs and not self.openai_kwargs:
            return "ollama"
        raise ValueError(
            f"Unknown model '{model_id}': refresh the model list and try again"
        )

    def _build_model(self, model_id: str):
        """Instantiate the LangChain chat model for ``model_id``.

        Raises:
            ValueError: If the backend the model belongs to is not configured.
        """
        if self._resolve_source(model_id) == "openai":
            if not self.openai_kwargs:
                raise ValueError("OpenAI API is not configured")
            return ChatOpenAI(model=model_id, **self.openai_kwargs)  # type: ignore
        if not self.ollama_kwargs:
            raise ValueError("Ollama API is not configured")
        return ChatOllama(model=model_id, **self.ollama_kwargs)  # type: ignore

    @staticmethod
    def _require_text(content) -> str:
        """Return ``content`` unchanged, raising ``TypeError`` if it is not a str."""
        if not isinstance(content, str):
            raise TypeError(
                f"Expected text content from the model, got {type(content).__name__}"
            )
        return content

    async def pipe(
        self,
        body: dict,
        __user__: dict | None,
        __task__: str | None,
        __tools__: dict[str, dict] | None,
        __event_emitter__: Callable[[dict], Awaitable[None]] | None,
    ) -> AsyncGenerator:
        """Answer one request, yielding response text as it is produced.

        Args:
            body: Request payload; ``body["model"]`` and ``body["messages"]``
                are read.
            __user__: Requesting user; its ``"id"`` is passed to Langfuse when
                Langfuse is configured.
            __task__: ``"function_calling"`` yields nothing,
                ``"title_generation"`` yields one non-streamed completion.
            __tools__: Tools keyed by name, each providing ``"callable"``,
                ``"pydantic_model"`` and ``"spec"``. When empty, the model is
                streamed directly without an agent.
            __event_emitter__: Receives status and citation events, if given.

        Yields:
            Text chunks of the response.

        Raises:
            ValueError: If no backend is configured or the model cannot be
                mapped to a configured backend.
            TypeError: If the model returns non-text content outside the
                agent path.
        """
        if __task__ == "function_calling":
            return

        # extract_event_info returns (chat_id, message_id); the run is keyed
        # by the message, so take the second element.
        _, message_id = extract_event_info(__event_emitter__)

        self.setup()
        # Drop everything up to the first "." to get the backend model id.
        _, _, model_id = body["model"].partition(".")
        model = self._build_model(model_id)

        callbacks = []
        if self.langfuse_kwargs:
            user_kwargs = {"user_id": __user__["id"]} if __user__ else {}
            callbacks.append(
                CallbackHandler(**(self.langfuse_kwargs | user_kwargs))  # type: ignore
            )
        config = {"callbacks": callbacks}  # type: ignore
        run_config = config if message_id is None else config | {"run_id": message_id}

        if __task__ == "title_generation":
            # Use the async API so the event loop is not blocked while waiting.
            response = await model.ainvoke(body["messages"], config=config)  # type: ignore
            yield self._require_text(response.content)
            return

        if not __tools__:
            async for chunk in model.astream(
                body["messages"],
                config=run_config,  # type: ignore
            ):
                yield self._require_text(chunk.content)
            return

        send_citation = get_send_citation(__event_emitter__)
        send_status = get_send_status(__event_emitter__)

        tools = [
            StructuredTool(
                func=None,
                name=key,
                coroutine=value["callable"],
                args_schema=value["pydantic_model"],
                description=value["spec"]["description"],
            )
            for key, value in __tools__.items()
        ]
        graph = create_react_agent(model, tools=tools)
        inputs = {"messages": body["messages"]}
        num_tool_calls = 0
        # Track in-flight tool calls per invocation rather than per name, so
        # overlapping calls to the same tool are each started and finished
        # independently. Falls back to the name if an event has no run id.
        running_tools: dict[str, str] = {}
        async for event in graph.astream_events(
            inputs,
            version="v2",
            config=run_config,  # type: ignore
        ):
            kind = event["event"]
            data = event["data"]
            if kind == "on_chat_model_stream":
                if "chunk" in data and (content := data["chunk"].content):
                    yield content
            elif kind == "on_tool_start":
                yield "\n"
                name = event["name"]
                await send_status(f"Running tool {name}", False)
                running_tools[event.get("run_id", name)] = name
            elif kind == "on_tool_end":
                num_tool_calls += 1
                name = event["name"]
                await send_status(f"Tool '{name}' returned {data.get('output')}", True)
                await send_citation(
                    url=f"Tool call {num_tool_calls}",
                    title=name,
                    content=f"Tool '{name}' with inputs {data.get('input')} returned {data.get('output')}",
                )
                running_tools.pop(event.get("run_id", name), None)
        # Anything still marked as running never reported an end event.
        for name in running_tools.values():
            await send_status(f"Tool '{name}' failed.", True)