"""
title: OpenAI Responses API Pipeline
id: openai_responses_api_pipeline
author: Justin Kropp
author_url: https://github.com/jrkropp
description: Brings OpenAI Responses API support to Open WebUI. Streams <think> reasoning (if enabled) and assistant text using the OpenAI Python SDK, with early support for images, reasoning, and status indicators.
version: 1.6.2
license: MIT
requirements: openai>=1.76.0
-----------------------------------------------------------------------------------------
OVERVIEW
-----------------------------------------------------------------------------------------
This pipeline integrates the OpenAI Responses API with Open WebUI, enabling features exclusive
to the Responses API. Simply drop it in to start using advanced models and capabilities.
Supported features:
- 💬 Reasoning control via REASON_SUMMARY and REASON_EFFORT.
- 🖼️ Image input support (base64-encoded images via "input_image" parameter).
- Streaming assistant responses and reasoning in real-time.
- Web search integration (requires ENABLE_WEB_SEARCH=True).
- Debug mode (DEBUG=True) for inspecting requests and streamed events.
- Usage statistics (tracks and displays token usage if enabled in the model).
- 🔧 Native tool calling (supports OpenAI function calling with OpenWebUI tools).
- 🛡️ Safety settings for tool usage and API calls.
- Custom API keys (users can use their own API keys for requests).
- Citations (displays tool-generated citations for transparency and debugging).
Roadmap / Future Improvements:
TODO 🖼️ Add support for latest OpenAI image model (gpt-image-1). Enable via optional valve. Ideally via tool calling and/or image button.
TODO Add support for permanently saving tool outputs to the conversation history, not just passing them temporarily during the active request.
TODO Document input support (e.g., PDFs, other files via __files__ parameter in pipe() function).
TODO Enhanced RAG support (improved retrieval-augmented generation with better context injection).
Notes:
- This pipeline is designed for OpenAI's Responses API and may not work with other APIs.
- This pipeline is still in development and may have bugs. Use at your own risk.
- To use multiple models, duplicate this pipeline and configure each copy for a specific model.
- Tool calling requires the OpenWebUI model's Advanced Params > Function Calling to be set to "Native".
Read more about the Responses API:
- https://openai.com/index/new-tools-for-building-agents/
- https://platform.openai.com/docs/quickstart?api-mode=responses
- https://platform.openai.com/docs/api-reference/responses
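
For orientation, a minimal sketch of the streaming pattern this pipeline builds on
(the model name and prompt below are placeholders, not values used by the pipe):

    import asyncio
    from openai import AsyncOpenAI

    async def demo() -> None:
        client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
        stream = await client.responses.create(
            model="gpt-4.1",
            input=[{"role": "user", "content": [{"type": "input_text", "text": "Hi!"}]}],
            stream=True,
        )
        async for event in stream:
            # Events are typed; assistant text arrives as output_text deltas.
            if event.type == "response.output_text.delta":
                print(event.delta, end="", flush=True)

    asyncio.run(demo())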
-----------------------------------------------------------------------------------------
🛠️ CHANGELOG
-----------------------------------------------------------------------------------------
• 1.6.2
- Fixed a bug where the OpenAI client was re-checked each time a chunk was streamed; fixed by moving 'client = get_openai_client(self.valves)' outside the while loop.
• 1.6.1
- Updated requirements to "openai>=1.76.0" (the library installs automatically when the pipe is initialized).
- Added lazy and safe OpenAI client creation inside pipe() to avoid unnecessary re-instantiation.
- Cleaned up docstring for improved readability.
• 1.6.0
- Added TOKEN_BUFFER_SIZE (default 1) for streaming control. This controls the number of tokens to buffer before yielding. Set to 1 for immediate per-token streaming.
- Cleaned up docstring at top of file for better readability.
- Refactored code for improved readability and maintainability.
- Rewrote transform_chat_messages_to_responses_api_format() for better readability and performance.
- Changed tool_choice behavior. Now defaults to "none" if no tools are present.
• 1.5.10
- Introduced True Parallel Tool Calling. Tool calls are now executed in parallel using asyncio.gather, then all results are appended at once before returning to the LLM. Previously, calls were handled one-by-one due to sequential loop logic.
- Set PARALLEL_TOOL_CALLS default back to True to match OpenAI's default behavior.
- The model now receives a clear system message when nearing the MAX_TOOL_CALLS limit, encouraging it to conclude tool use gracefully.
- Status messages now reflect when a tool is being invoked, with more personality and clarity for the user.
- Tool responses are now emitted as citations, giving visibility into raw results (especially useful for debugging and transparency).
• 1.5.9
- Fixed bug where web_search tool could cause OpenAI responses to loop indefinitely.
- Introduced MAX_TOOL_CALLS valve (default 5) to limit the number of tool calls in a single request as an extra safety precaution.
- Set PARALLEL_TOOL_CALLS valve default to False (prev. True).
• 1.5.8
- Polished docstrings and streamlined debug logging output.
- Refactored code for improved readability.
• 1.5.7
- Introduced native tool support for OpenAI! Integrate with OpenWebUI tools.
• 1.5.6
- Fixed minor bugs in function calling and improved performance for large messages.
- Introduced partial support for multi-modal input.
"""
from __future__ import annotations
import os
import re
import traceback
import asyncio
import json
import httpx
from datetime import datetime
from typing import Any, Dict, List, Optional, Callable, Awaitable
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from fastapi import Request
class Pipe:
"""
A pipeline for streaming responses from the OpenAI Responses API.
The Responses API builds on the Chat Completions API by introducing server-side state,
built-in tools, and a rich event-based streaming format. This pipeline bridges the gap
between Open WebUI's Chat Completions-style input/output and the Responses API's
item-based schema.
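
    For example, when the model calls a tool, the Responses API represents the
    round trip as input "items" rather than chat messages. A sketch of the two
    items this pipe appends for each tool call (values are illustrative):

        {"type": "function_call", "call_id": "call_abc123",
         "name": "calculator", "arguments": "{\"expression\": \"2+2\"}"}
        {"type": "function_call_output", "call_id": "call_abc123", "output": "4"}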
"""
class Valves(BaseModel):
"""
Parameters controlling the API call and behavior.
"""
BASE_URL: Optional[str] = Field(
default="https://api.openai.com/v1",
description="The base URL to use with the OpenAI SDK. Defaults to the official OpenAI API endpoint. Supports LiteLLM and other custom endpoints.",
)
API_KEY: str = Field(
            default=os.getenv("OPENAI_API_KEY", ""),  # fall back to an empty string so validation doesn't fail when the env var is unset
description=(
"Your OpenAI API key. If left blank, the environment variable "
"'OPENAI_API_KEY' will be used instead."
),
)
MODEL_ID: str = Field(
default="gpt-4.1",
description=(
"Model ID used to generate responses. Defaults to 'gpt-4.1'. "
"Note: The model ID must be a valid OpenAI model ID. E.g. 'gpt-4o', 'o3', etc."
),
) # Read more: https://platform.openai.com/docs/api-reference/responses/create#responses-create-model
REASON_SUMMARY: Optional[str] = Field(
default=None,
            description=(
                "Reasoning summary style for o-series models (auto | concise | detailed). "
                "Leave blank to skip passing 'reasoning' (default)."
),
) # Read more: https://platform.openai.com/docs/api-reference/responses/create#responses-create-reasoning
REASON_EFFORT: Optional[str] = Field(
default=None,
description=(
"Reasoning effort level (low | medium | high). "
"Leave blank to skip passing 'reasoning' if not needed (default)."
),
) # Read more: https://platform.openai.com/docs/api-reference/responses/create#responses-create-reasoning
ENABLE_WEB_SEARCH: bool = Field(
default=False,
description=(
"Whether to enable the built-in 'web_search' tool. "
"If True, adds {'type': 'web_search'} to tools (unless already present)."
),
) # Read more: https://platform.openai.com/docs/guides/tools-web-search?api-mode=responses
SEARCH_CONTEXT_SIZE: Optional[str] = Field(
default="medium",
            description=(
                "Specifies the OpenAI web search context size: low | medium | high. "
                "Default is 'medium'. Affects cost, quality, and latency. "
                "Only used if ENABLE_WEB_SEARCH=True."
),
)
PARALLEL_TOOL_CALLS: bool = Field(
default=False,
            description=(
                "Whether tool calls can be parallelized. Defaults to False."
),
) # Read more: https://platform.openai.com/docs/api-reference/responses/create#responses-create-parallel_tool_calls
MAX_TOOL_CALLS: int = Field(
default=5,
description=(
"Maximum number of tool calls the model can make in a single request. "
"This is a hard stop safety limit to prevent infinite loops. "
"Defaults to 5."
),
)
STORE_RESPONSE: bool = Field(
default=False,
            description=(
                "Whether to store the generated model response (on OpenAI's side) for later debugging. "
"Defaults to False."
),
) # Read more: https://platform.openai.com/docs/api-reference/responses/create#responses-create-store
TOKEN_BUFFER_SIZE: int = Field(
default=1,
description="Number of partial tokens to buffer before yielding. "
"Set this to 1 (default) for immediate per-token streaming.",
)
DEBUG: bool = Field(
default=False,
description="When True, prints debug statements to the console. Do not leave this on in production.",
)
def __init__(self):
print("Pipe __init__ called!")
self.valves = self.Valves()
self.name = f"OpenAI: {self.valves.MODEL_ID}"
async def on_startup(self):
print(f"on_startup:{__name__}")
async def on_shutdown(self):
print(f"on_shutdown:{__name__}")
async def on_valves_updated(self):
print(f"on_valves_updated: {__name__}")
async def pipe(
self,
body: dict[str, Any],
__user__: Dict[str, Any],
__request__: Request,
__event_emitter__: Callable[["Event"], Awaitable[None]],
__event_call__: Callable[[dict[str, Any]], Awaitable[Any]],
__task__: str,
__task_body__: dict[str, Any],
__files__: list[dict[str, Any]],
__metadata__: dict[str, Any],
__tools__: dict[str, Any],
):
"""
Main pipeline method that:
1. Transforms incoming chat messages to OpenAI Responses format
2. Prepares tools (for function calling)
3. Builds a streaming request
4. Collects partial tokens in an array
5. Processes function calls (tools) if any, then loops if needed
6. Returns a final array of all text chunks
"""
"""
        # Uncomment to show all function parameters when DEBUG is True (for exploration). Commented out to avoid cluttering the console.
if self.valves.DEBUG:
print("---------------------------------------------------------")
print("[DEBUG] Pipe function called with parameters:")
all_params = {
"body": body,
"__user__": __user__,
"__request__": __request__,
"__event_emitter__": __event_emitter__,
"__event_call__": __event_call__,
"__task__": __task__,
"__task_body__": __task_body__,
"__files__": __files__,
"__metadata__": __metadata__,
"__tools__": __tools__,
}
print(json.dumps(all_params, indent=2, default=str))
print("---------------------------------------------------------")
"""
# If tools are provided, but function calling is not 'native', warn the user
if __tools__ and __metadata__.get("function_calling") != "native":
            yield (
                "Tools detected, but native function calling is disabled.\n\n"
                'To enable tools in this chat, switch **Function Calling** to **"Native"** under:\n'
                "⚙️ **Chat Controls** → **Advanced Params** → **Function Calling**\n\n"
                "If you're an admin, you can also set this at the **model level**:\n"
                "**Model Settings** → **Advanced Params** → **Function Calling = Native**"
)
# STEP 1: Establish OpenAI Client
client = get_openai_client(self.valves)
        # STEP 2: Transform the user's messages into the format the Responses API expects
all_messages = body.get("messages", [])
        transformed_message_array = transform_chat_messages_to_responses_api_format(
            all_messages
        )
        input_messages = transformed_message_array["input"]
        instructions = transformed_message_array["instructions"]
# STEP 3: Prepare any tools (function specs), if any
tools = prepare_tools(__tools__)
if self.valves.ENABLE_WEB_SEARCH:
tools.append(
{
"type": "web_search",
"search_context_size": self.valves.SEARCH_CONTEXT_SIZE,
}
)
# STEP 4: Build the request parameters
        request_params = {
            "model": self.valves.MODEL_ID,
            "tools": tools,
            "tool_choice": "auto" if tools else "none",
            "instructions": instructions,
            "input": input_messages,
            "parallel_tool_calls": self.valves.PARALLEL_TOOL_CALLS,
            "max_output_tokens": body.get("max_tokens"),
            # Explicit None checks so a temperature/top_p of 0 is not replaced by the default
            "temperature": body.get("temperature") if body.get("temperature") is not None else 1.0,
            "top_p": body.get("top_p") if body.get("top_p") is not None else 1.0,
"user": __user__.get("email"),
"store": self.valves.STORE_RESPONSE,
"text": {"format": {"type": "text"}},
"truncation": "auto",
"stream": True,
}
if self.valves.REASON_EFFORT or self.valves.REASON_SUMMARY:
request_params["reasoning"] = {}
if self.valves.REASON_EFFORT:
request_params["reasoning"]["effort"] = self.valves.REASON_EFFORT
if self.valves.REASON_SUMMARY:
request_params["reasoning"]["summary"] = self.valves.REASON_SUMMARY
tool_call_count = 0
# STEP 5: Loop until we either run out of tool calls or the conversation ends
while tool_call_count <= self.valves.MAX_TOOL_CALLS:
debug = self.valves.DEBUG
token_buffer_size = self.valves.TOKEN_BUFFER_SIZE
if debug:
print("[DEBUG] Calling OpenAI API with the following input_messages:")
print(json.dumps(input_messages, indent=2, default=str))
# Try to call the OpenAI API and stream the response
try:
pending_function_calls = []
conversation_ended = False
response_stream = await client.responses.create(**request_params)
# Process each event in the response stream based on its type
is_model_thinking = False
buffer_parts = [] # Collect partial chunks here
                event_count_since_flush = 0  # Count partial text events since last flush
async for event in response_stream:
if debug:
interesting_events = {
"response.function_call_arguments.done",
"response.output_text.done",
"response.failed",
"response.done",
"response.completed",
"response.output_item.added",
}
print(f"\033[1;92m[DEBUG] Event type: {event.type}\033[0m")
if event.type in interesting_events:
print(
f"\033[1;93m{json.dumps(event.__dict__, indent=2, default=str)}\033[0m"
)
event_type = event.type
chunk = ""
# -------------------------------------------------------------------
# REASONING (chain-of-thought) partial events
# -------------------------------------------------------------------
if event_type == "response.reasoning_summary_text.delta":
if not is_model_thinking:
chunk += "<think>" # OpenWebUI expects <think> tags to be used for reasoning
is_model_thinking = True
chunk += event.delta
elif event_type == "response.reasoning_summary_text.done":
chunk += "</think>\n"
is_model_thinking = False
# -------------------------------------------------------------------
# NORMAL TEXT (assistant output) partial events
# -------------------------------------------------------------------
elif event_type == "response.output_text.delta":
# If we were in a <think> block, close it first
if is_model_thinking:
buffer_parts.append("</think>\n")
is_model_thinking = False
event_count_since_flush += 1
chunk += event.delta
elif event_type == "response.output_text.done":
chunk += "\n\n"
# -------------------------------------------------------------------
# FUNCTION CALLS OR STATUS
# -------------------------------------------------------------------
elif event_type == "response.output_item.added":
item = getattr(event, "item", None)
if item and item.type == "function_call" and __event_emitter__:
                            await __event_emitter__(
                                {
                                    "type": "status",
                                    "data": {
                                        "description": f"🔧 Hmm, let me run my {item.name} tool real quick...",
                                        "done": False,
                                    },
                                }
                            )
continue
elif event_type == "response.output_item.done":
item = getattr(event, "item", None)
                        if item and item.type == "function_call":
# queue the function call for processing
pending_function_calls.append(item)
continue
elif event_type == "response.content_part.added":
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "", "done": True},
}
)
continue
elif event_type == "response.web_search_call.in_progress":
if __event_emitter__:
                            await __event_emitter__(
                                {
                                    "type": "status",
                                    "data": {
                                        "description": "Searching the internet...",
                                        "done": False,
                                    },
                                }
                            )
continue
# -------------------------------------------------------------------
# CITATIONS / ANNOTATIONS
# -------------------------------------------------------------------
elif event_type == "response.output_text.annotation.added":
raw_anno = str(getattr(event, "annotation", ""))
title_m = re.search(r"title='([^']*)'", raw_anno)
url_m = re.search(r"url='([^']*)'", raw_anno)
title = title_m.group(1) if title_m else "Unknown Title"
url = url_m.group(1) if url_m else ""
url = url.replace("?utm_source=openai", "").replace(
"&utm_source=openai", ""
)
if __event_emitter__:
await __event_emitter__(
{
"type": "citation",
"data": {
"document": [title],
"metadata": [
{
"date_accessed": datetime.now().isoformat(),
"source": title,
}
],
"source": {"name": url, "url": url},
},
}
)
continue
# -------------------------------------------------------------------
# END OF RESPONSE OR ERROR
# -------------------------------------------------------------------
elif event_type in (
"response.completed",
"response.done",
"response.incomplete",
"response.failed",
):
conversation_ended = True
elif event_type == "error":
conversation_ended = True
else:
# Unhandled event type
continue
# -------------------------------------------------------------------
# 2) Append any new chunk to the buffer
# -------------------------------------------------------------------
if chunk:
buffer_parts.append(chunk)
event_count_since_flush += 1
# -------------------------------------------------------------------
# 3) Check if we should flush
# -------------------------------------------------------------------
if (
event_count_since_flush >= token_buffer_size
or conversation_ended
):
yield "".join(buffer_parts)
buffer_parts.clear()
event_count_since_flush = 0
if conversation_ended:
break
# 4) Final flush after loop ends, in case there's leftover text
if buffer_parts:
yield "".join(buffer_parts)
buffer_parts.clear()
except Exception as ex:
# On error, yield it and break
                yield f"❌ {type(ex).__name__}: {ex}\n{''.join(traceback.format_exc(limit=5))}"
break
# ----------------------------------------------------------------
# 3) If we found any function calls, call them, append the results to bottom of conversation, and re-loop
# ----------------------------------------------------------------
if pending_function_calls:
tool_call_count += 1
# Step A: For each pending function call, add a "function_call" item,
# and prepare the async tasks for the actual tool calls.
tasks = []
for fc_item in pending_function_calls:
# Record the function_call in the conversation
input_messages.append(
{
"type": "function_call",
"call_id": fc_item.call_id,
"name": fc_item.name,
"arguments": fc_item.arguments,
}
)
                    # Look up and queue the tool callable
                    tool = __tools__.get(fc_item.name)
                    if tool is None:
                        # Keep results aligned with pending_function_calls so the zip
                        # below pairs every call_id with an output, even when missing.
                        tasks.append(
                            asyncio.sleep(
                                0, result=f"Error: tool '{fc_item.name}' not found"
                            )
                        )
                        continue
                    try:
                        args = json.loads(fc_item.arguments or "{}")
                    except Exception:
                        args = {}
                    tasks.append(asyncio.create_task(tool["callable"](**args)))
                # Step B: Collect the results of all tool calls. return_exceptions=True
                # keeps one failing tool from discarding the others' results.
                raw_results = await asyncio.gather(*tasks, return_exceptions=True)
                results = [
                    f"Error: {r}" if isinstance(r, BaseException) else r
                    for r in raw_results
                ]
# Step C: Append the "function_call_output" items
for fc_item, tool_result in zip(pending_function_calls, results):
input_messages.append(
{
"type": "function_call_output",
"call_id": fc_item.call_id,
"output": str(tool_result),
}
)
# Optionally emit a "citation" so user sees the tool's raw output
if __event_emitter__:
await __event_emitter__(
{
"type": "citation",
"data": {
"document": [
f"{fc_item.name.title()} Tool Output\n\n{tool_result}"
],
"metadata": [
{
"date_accessed": datetime.now().isoformat(),
"source": fc_item.name,
}
],
"source": {"name": f"{fc_item.name.title()} Tool"},
},
}
)
# Step D: If near the limit, warn
if tool_call_count == self.valves.MAX_TOOL_CALLS - 1:
input_messages.append(
                        {
                            "role": "system",
                            "content": (
                                f"[ToolCallLimitApproaching] You have used {tool_call_count} "
                                f"of {self.valves.MAX_TOOL_CALLS} tool calls in this response. "
                                "**Exactly one call** remains before the quota is exhausted.\n\n"
                                "• If that final call is essential, invoke it now.\n"
                                "• After this, we'll automatically set `tool_choice='none'`.\n\n"
"You can always ask **continue** (or similar) to start fresh "
"with a new tool-call quota."
),
}
)
# Step E: If we've hit or exceeded max calls, disable further tool use
if tool_call_count >= self.valves.MAX_TOOL_CALLS:
request_params["tool_choice"] = "none"
# Re-loop now that your conversation includes function_call + outputs
continue
# ----------------------------------------------------------------
# 4) If no function calls or conversation ended, exit
# ----------------------------------------------------------------
if conversation_ended or not pending_function_calls:
break
###############################################################################
# Module-level Helper Functions (Outside Pipe Class)
###############################################################################
def get_openai_client(valves):
"""
Lazy-create and cache a shared AsyncOpenAI client.
Rebuilds only if API key or base URL change.
Logs debug information if valves.DEBUG is True.
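
    Typical usage (as in pipe() above):
        client = get_openai_client(self.valves)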
"""
cached = getattr(get_openai_client, "_client", None)
cfg = getattr(get_openai_client, "_cfg", None)
new_cfg = (valves.API_KEY, valves.BASE_URL)
if cached is None:
if valves.DEBUG:
print("[DEBUG] No OpenAI client cached yet β creating new one...")
elif cfg != new_cfg:
if valves.DEBUG:
print(
f"[DEBUG] Config changed! Old={cfg} New={new_cfg} β rebuilding OpenAI client..."
)
else:
if valves.DEBUG:
print("[DEBUG] Reusing cached OpenAI client.")
if cached is None or cfg != new_cfg:
transport = httpx.AsyncClient(http2=True, timeout=90)
cached = AsyncOpenAI(
api_key=valves.API_KEY,
base_url=valves.BASE_URL,
http_client=transport,
)
get_openai_client._client = cached
get_openai_client._cfg = new_cfg
if valves.DEBUG:
print("[DEBUG] OpenAI client initialized (HTTP/2 enabled)")
return cached
def prepare_tools(__tools__: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Converts user-provided tools into the standard format the OpenAI API expects.
Example of __tools__ input:
{
"websearch": {
"spec": {
"name": "websearch",
"description": "...",
"parameters": {...}
},
"callable": <async function>
},
"calculator": {
"spec": {
"name": "calculator",
"description": "...",
"parameters": {...}
},
"callable": <async function>
}
}
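
    Example output (each spec is returned with "type": "function" added, which is
    the flat tool format the Responses API expects):
    [
        {"type": "function", "name": "websearch", "description": "...", "parameters": {...}},
        {"type": "function", "name": "calculator", "description": "...", "parameters": {...}}
    ]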
"""
tools = []
if __tools__:
for tool_name, tool_info in __tools__.items():
# Each tool has a "spec" dict describing its schema
spec = tool_info.get("spec", {})
# Force the type to "function" to match OpenAI's function calling
spec["type"] = "function"
tools.append(spec)
return tools
def transform_chat_messages_to_responses_api_format(messages):
"""
    Convert WebUI Chat-Completions history → OpenAI Responses format.
    INPUT EXAMPLE
    [
        {"role": "system", "content": "You are helpful."},
        {"role": "user", "content": "Hi!"},
        {"role": "assistant", "content": "Hello"},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "What's in this picture?"},
                {"type": "image_url",
                 "image_url": {"url": "data:image/png;base64,AAA…"}}
            ]
        }
    ]

    OUTPUT EXAMPLE
    {
        "instructions": "You are helpful.",
        "input": [
            {"role": "user",
             "content": [{"type": "input_text", "text": "Hi!"}]},
            {"role": "assistant",
             "content": [{"type": "output_text", "text": "Hello"}]},
            {"role": "user",
             "content": [
                 {"type": "input_text", "text": "What's in this picture?"},
                 {"type": "input_image",
                  "image_url": "data:image/png;base64,AAA…"}
             ]}
        ]
    }
"""
instructions = None
output = []
    # ── system → instructions ───────────────────────────────────────────────
if messages and messages[0].get("role") == "system":
instructions = str(messages[0].get("content", "")).strip()
messages = messages[1:]
    # ── convert remaining messages ──────────────────────────────────────────
for msg in messages:
role = msg.get("role", "user")
is_assistant = role == "assistant"
items = msg.get("content", [])
if not isinstance(items, list):
items = [items]
converted = []
for item in items:
if item is None: # guard against nulls
continue
# A) structured dict items
if isinstance(item, dict):
itype = item.get("type", "text")
if is_assistant:
if itype == "refusal":
converted.append(
{
"type": "refusal",
"reason": item.get("reason", "No reason"),
}
)
else: # output text
converted.append(
{"type": "output_text", "text": item.get("text", "")}
)
else: # user
if itype == "image_url":
url = item.get("image_url", {}).get("url", "")
converted.append({"type": "input_image", "image_url": url})
else: # input text
converted.append(
{"type": "input_text", "text": item.get("text", "")}
)
# B) primitive str / int items
else:
text_val = item if isinstance(item, str) else str(item)
converted.append(
{
"type": "output_text" if is_assistant else "input_text",
"text": text_val,
}
)
output.append({"role": role, "content": converted})
return {"instructions": instructions, "input": output}
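

# ---------------------------------------------------------------------------
# Optional, illustrative self-check (not used by Open WebUI). Run this file
# directly to see how the transform above converts a small, hypothetical chat
# history into the Responses API format.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample_messages = [
        {"role": "system", "content": "You are helpful."},
        {"role": "user", "content": "Hi!"},
        {"role": "assistant", "content": "Hello"},
    ]
    print(
        json.dumps(
            transform_chat_messages_to_responses_api_format(sample_messages),
            indent=2,
        )
    )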