Whitepaper
Docs
Sign In
Function
Function
pipe
v0.2.5
OpenAI ReAct agent_Added Whitelist Version
Function ID
openai_react_agent_added_whitelist_version
Creator
@cooksleep
Downloads
1.1K+
OpenAI ReAct agent using existing tools, with streaming and citations. Implemented with LangGraph.
Get
README
No README available
Function Code
Show
""" title: OpenAI ReAct agent_Added Whitelist Version author: Michael Poluektov, Cook Sleep author_urls: - https://github.com/michaelpoluektov - https://github.com/cooksleep description: OpenAI ReAct agent using existing tools, with streaming and citations. Implemented with LangGraph. required_open_webui_version: 0.3.15 requirements: langchain-openai==0.1.24, langgraph version: 0.2.5 licence: MIT """ import os from typing import Callable, AsyncGenerator, Awaitable, Optional, Protocol from pydantic import BaseModel, Field from openai import OpenAI from langchain_openai import ChatOpenAI from langchain_core.tools import StructuredTool from langgraph.prebuilt import create_react_agent BAD_NAMES = ["13", "3.5", "preview", "chatgpt"] EmitterType = Optional[Callable[[dict], Awaitable[None]]] class SendCitationType(Protocol): def __call__(self, url: str, title: str, content: str) -> Awaitable[None]: ... class SendStatusType(Protocol): def __call__(self, status_message: str, done: bool) -> Awaitable[None]: ... def get_send_citation(__event_emitter__: EmitterType) -> SendCitationType: async def send_citation(url: str, title: str, content: str): if __event_emitter__ is None: return await __event_emitter__( { "type": "citation", "data": { "document": [content], "metadata": [{"source": url, "html": False}], "source": {"name": title}, }, } ) return send_citation def get_send_status(__event_emitter__: EmitterType) -> SendStatusType: async def send_status(status_message: str, done: bool): if __event_emitter__ is None: return await __event_emitter__( { "type": "status", "data": {"description": status_message, "done": done}, } ) return send_status class Pipe: class Valves(BaseModel): OPENAI_BASE_URL: str = Field( default="https://api.openai.com/v1", description="Base URL for OpenAI API endpoints", ) OPENAI_API_KEY: str = Field(default="", description="OpenAI API key") MODEL_PREFIX: str = Field(default="ReAct", description="Prefix before model ID") WHITELIST_MODELS: str = Field( default="", description="Comma-separated list of whitelisted models" ) def __init__(self): self.type = "manifold" self.valves = self.Valves( **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()} ) print(f"{self.valves=}") def pipes(self) -> list[dict[str, str]]: try: self.setup() except Exception as e: return [{"id": "error", "name": f"Error: {e}"}] openai = OpenAI(**self.openai_kwargs) # type: ignore models = [m.id for m in openai.models.list().data] models = [m for m in models if "gpt" in m] models = [m for m in models if not any(bad in m for bad in BAD_NAMES)] # Add whitelisted models whitelist = [ m.strip() for m in self.valves.WHITELIST_MODELS.split(",") if m.strip() ] models = list(set(models + whitelist)) return [{"id": m, "name": f"{self.valves.MODEL_PREFIX}/{m}"} for m in models] def setup(self): v = self.valves if not v.OPENAI_API_KEY or not v.OPENAI_BASE_URL: raise Exception("Error: OPENAI_API_KEY or OPENAI_BASE_URL is not set") self.openai_kwargs = { "base_url": v.OPENAI_BASE_URL, "api_key": v.OPENAI_API_KEY, } async def pipe( self, body: dict, __user__: dict | None, __task__: str | None, __tools__: dict[str, dict] | None, __event_emitter__: Callable[[dict], Awaitable[None]] | None, ) -> AsyncGenerator: print(__task__) print(f"{__tools__=}") if __task__ == "function_calling": return self.setup() model_id = body["model"][body["model"].rfind(".") + 1 :] model = ChatOpenAI(model=model_id, **self.openai_kwargs) # type: ignore config = {} if __task__ == "title_generation": content = model.invoke(body["messages"], config=config).content assert isinstance(content, str) yield content return if not __tools__: async for chunk in model.astream(body["messages"], config=config): content = chunk.content assert isinstance(content, str) yield content return send_citation = get_send_citation(__event_emitter__) send_status = get_send_status(__event_emitter__) tools = [] for key, value in __tools__.items(): tools.append( StructuredTool( func=None, name=key, coroutine=value["callable"], args_schema=value["pydantic_model"], description=value["spec"]["description"], ) ) graph = create_react_agent(model, tools=tools) inputs = {"messages": body["messages"]} num_tool_calls = 0 async for event in graph.astream_events(inputs, version="v2", config=config): # type: ignore kind = event["event"] data = event["data"] if kind == "on_chat_model_stream": if "chunk" in data and (content := data["chunk"].content): yield content elif kind == "on_tool_start": yield "\n" await send_status(f"Running tool {event['name']}", False) elif kind == "on_tool_end": num_tool_calls += 1 await send_status( f"Tool '{event['name']}' returned {data.get('output')}", True ) await send_citation( url=f"Tool call {num_tool_calls}", title=event["name"], content=f"Tool '{event['name']}' with inputs {data.get('input')} returned {data.get('output')}", )