"""
title: OpenAI Responses API Pipeline
id: openai_responses_api_pipeline
author: Justin Kropp
author_url: https://github.com/jrkropp
required_open_webui_version: 0.4.0
description: Brings OpenAI Responses API support to Open WebUI. Streams <think> reasoning (if enabled) and assistant text using the OpenAI Responses SDK, with early support for images, reasoning, and status indicators.
version: 1.5.9
license: MIT
requirements: openai>=1.75.0
-----------------------------------------------------------------------------------------
📌 OVERVIEW
-----------------------------------------------------------------------------------------
This pipeline brings OpenAI Responses API support to Open WebUI. Just drop it in to
leverage models and features exclusive to the Responses API. Clone the pipeline for each
model you want to add.
IMPORTANT NOTE: I have NOT run this in production yet. I will continue to improve and
refine it over the next few weeks, so check back for new versions.
-----------------------------------------------------------------------------------------
✨ FEATURES
-----------------------------------------------------------------------------------------
• 💬 Reasoning (chain-of-thought) control via `REASON_SUMMARY` and `REASON_EFFORT`.
• 🔨 Native OpenAI tool calling support (integrates cleanly with OpenWebUI tools).
• 🖼️ Image input support via `"input_image"` (base64 format).
• 🐞 Optional debug mode (`DEBUG=True`) for inspecting requests and streamed events.
• 🌐 Integrated OpenAI web search tool (if `ENABLE_WEB_SEARCH=True`).
• 💬 Minimal overhead for bridging OpenWebUI messages to OpenAI's Responses API.
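For example (illustrative): an Open WebUI image attachment arrives as
  {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,..."}}
and is forwarded to the Responses API as
  {"type": "input_image", "image_url": "data:image/jpeg;base64,..."}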
-----------------------------------------------------------------------------------------
🛠️ CHANGELOG
-----------------------------------------------------------------------------------------
• 1.5.9
- Fixed bug where web_search tool could cause OpenAI responses to loop indefinitely.
- Introduced `MAX_TOOL_CALLS` valve (default 10) to limit the number of tool calls in a single request as an extra safety precaution.
- Set PARALLEL_TOOL_CALLS valve default to False (prev. True) to mirror OpenAI's default behavior.
• 1.5.8
- Polished docstrings and streamlined debug logging output.
- Refactored code for improved readability.
• 1.5.7
- Introduced native OpenAI tool calling; integrates with Open WebUI tools.
• 1.5.6
- Fixed minor bugs in function calling and improved performance for large messages.
- Introduced partial support for multi-modal input.
-----------------------------------------------------------------------------------------
🧭 ROADMAP
-----------------------------------------------------------------------------------------
⭢ Usage Stats Integration
Display token usage, cost estimates, and related metrics directly in the Open WebUI UI.
⭢ Tool Call Citations
Automatically generate citations when built-in or custom tools are invoked,
improving traceability and output explainability.
⭢ Message Transform Refactor
Rewrite `transform_messages_for_responses()` for improved readability, structure,
and maintainability.
-----------------------------------------------------------------------------------------
💡 TIPS
-----------------------------------------------------------------------------------------
Below is a draft system prompt that works well with GPT-4.1:
---
You are ChatGPT, a large language model trained by OpenAI.
Current date: {{CURRENT_DATE}}
Over the course of conversation, adapt to the user’s tone and preferences. Try to match the user’s vibe, tone, and generally how they are speaking. You want the conversation to feel natural. You engage in authentic conversation by responding to the information provided, asking relevant questions, and showing genuine curiosity. If natural, use information you know about the user to personalize your responses and ask a follow up question.
Do *NOT* ask for *confirmation* between each step of multi-stage user requests. However, for ambiguous requests, you *may* ask for *clarification* (but do so very sparingly only when you need to).
You *must* browse the web for *any* query that could benefit from up-to-date or niche information, unless the user explicitly asks you not to browse the web. If there is even slight doubt about the currency of your knowledge, err on the side of searching, since outdated or incomplete replies frustrate the user and violate their expectations; after browsing, respond in clear, well‑formatted markdown (no top‑level heading) unless the user asks for another format.
When in doubt, browse. The user will appreciate the extra effort.
When fulfilling a request that involves multiple tools, think through the steps out loud first and explain your reasoning to the user before taking any action.
Call tools in a logical order that respects dependencies. For example, if a value is needed from a web search before performing a calculation, search first.
Before calling any tool, clearly tell the user which tool you’re using and why you’re using it.
If you are asked what model you are, say **OpenAI 4.1**.
The Yap score measures verbosity; aim for responses ≤ Yap words. Overly verbose responses when Yap is low (or overly terse when Yap is high) may be penalized. Today's Yap score is **8192**.
Avoid excessive use of tables in your responses. Use them only when they add clear value. Most tasks won’t benefit from a table. Do not write code in tables; it will not render correctly.
"""
from __future__ import annotations
import os
import re
import traceback
import ast # for parsing single-quoted dict strings (like "{'type':'image_url', ...}")
import json # for pretty-printing debug info
from datetime import datetime
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    List,
    Optional,
)
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from fastapi import Request
class Pipe:
"""
A pipeline for streaming responses from the OpenAI Responses API.
The Responses API builds on the Chat Completions API by introducing server-side state,
built-in tools, and a rich event-based streaming format. This pipeline bridges the gap
between Open WebUI's Chat Completions-style input/output and the Responses API's
item-based schema.
Notes:
- Ensure native function calling is enabled when using tools. To enable native
function calling navigate to **Model Settings** → **Advanced Params** → **Function Calling**
and set it to **"Native"**.
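    Example (illustrative sketch only; the actual parameters are assembled in pipe()):

        await client.responses.create(
            model="gpt-4.1",
            instructions="You are a helpful assistant.",
            input=[{"role": "user", "content": [{"type": "input_text", "text": "Hi!"}]}],
            tools=[{"type": "web_search", "search_context_size": "medium"}],
            stream=True,
        )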
"""
class Valves(BaseModel):
"""
These valves map directly to OpenAI's 'responses.create' parameters.
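        For example, setting REASON_EFFORT="high" and REASON_SUMMARY="auto" results in
        reasoning={"effort": "high", "summary": "auto"} being passed to responses.create().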
"""
BASE_URL: Optional[str] = Field(
default="https://api.openai.com/v1",
description="The base URL to use with the OpenAI SDK. Defaults to the official OpenAI API endpoint.",
)
        API_KEY: Optional[str] = Field(
            default=None,
description=(
"Your OpenAI API key. If left blank, the environment variable "
"'OPENAI_API_KEY' will be used instead."
),
)
MODEL_ID: str = Field(
default="gpt-4.1",
description=(
"Model ID used to generate responses. Defaults to 'gpt-4.1'. "
"Note: The model ID must be a valid OpenAI model ID. E.g. 'gpt-4o', 'o3', etc."
),
)
REASON_SUMMARY: Optional[str] = Field(
default=None,
description=(
"Reasoning summary style for o-series models (auto | concise | detailed)"
"Leave blank to skip passing 'reasoning' (default)."
),
)
REASON_EFFORT: Optional[str] = Field(
default=None,
description=(
"Reasoning effort level (low | medium | high). "
"Leave blank to skip passing 'reasoning' if not needed (default)."
),
)
ENABLE_WEB_SEARCH: bool = Field(
default=False,
description=(
"Whether to enable the built-in 'web_search' tool. "
"If True, adds {'type': 'web_search'} to tools (unless already present)."
),
)
SEARCH_CONTEXT_SIZE: Optional[str] = Field(
default="medium",
description=(
"Specifies the OpenAI web search context size: low | medium | high. "
"Default is 'medium'. Affects cost, quality, and latency"
"Only used if ENABLE_WEB_SEARCH=True."
),
)
PARALLEL_TOOL_CALLS: bool = Field(
default=False,
description=(
"Whether tool calls can be parallelized. Defaults to True if not set."
),
)
MAX_TOOL_CALLS: int = Field(
default=10,
description=(
"Maximum number of tool calls the model can make in a single request. "
"This is a hard stop safety limit to prevent infinite loops. "
"Defaults to 10."
),
)
STORE_RESPONSE: bool = Field(
default=False,
description=(
"Whether to store the generated model response (on OpenAI's side) for later retrieval via API. "
"Defaults to False."
),
)
DEBUG: bool = Field(
default=False,
description="When True, prints debug statements to the console. Do not leave this on in production.",
)
def __init__(self):
self.valves = self.Valves()
# Optionally, you can set the id and name of the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
# self.id = "openai_pipeline"
self.name = f"OpenAI: {self.valves.MODEL_ID}"
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup:{__name__}")
pass
async def on_shutdown(self):
# This function is called when the server is stopped.
print(f"on_shutdown:{__name__}")
pass
async def on_valves_updated(self):
print(f"[DEBUG] on_valves_updated: {__name__}")
pass
async def pipe(
self,
body: dict[str, Any],
__user__: Dict[str, Any],
__request__: Request,
        __event_emitter__: Callable[[dict[str, Any]], Awaitable[None]],
__event_call__: Callable[[dict[str, Any]], Awaitable[Any]],
__task__: str,
__task_body__: dict[str, Any],
__files__: list[dict[str, Any]],
__metadata__: dict[str, Any],
__tools__: dict[str, Any],
):
"""
Main pipeline method that streams from 'responses.create(...)'.
1) If REASON_SUMMARY or REASON_EFFORT are set, we pass 'reasoning'.
2) We forcibly set 'stream=True' so the response is always streamed.
        3) On 'response.output_text.annotation.added', parse out 'title' and 'url', remove
           '?utm_source=openai' or '&utm_source=openai', and emit a 'citation'.
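
        For instance, an annotation is surfaced to Open WebUI as a citation event, roughly:
            {"type": "citation", "data": {"document": [title], "source": {"name": url, "url": url}, ...}}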
"""
# ------------------------------------------------------------
# 0) Tools => must have "native" function calling
# ------------------------------------------------------------
if __tools__ and __metadata__.get("function_calling") != "native":
yield (
"🛑 Tools detected, but native function calling is disabled.\n\n"
'To enable tools in this chat, switch **Function Calling** to **"Native"** under:\n'
"⚙️ **Chat Controls** → **Advanced Params** → **Function Calling**\n\n"
"If you're an admin, you can also set this at the **model level**:\n"
"**Model Settings** → **Advanced Params** → **Function Calling = Native**"
            )
            return
in_think = False
# ------------------------------------------------------------
# Show all function parameters if DEBUG is True (for exploration)
# ------------------------------------------------------------
# if self.valves.DEBUG:
# print("---------------------------------------------------------")
# print("[DEBUG] Pipe function called with parameters:")
# all_params = {
# #"body": body,
# #"__user__": __user__,
# #"__request__": __request__,
# #"__event_emitter__": __event_emitter__,
# #"__event_call__": __event_call__,
# #"__task__": __task__,
# #"__task_body__": __task_body__,
# #"__files__": __files__,
# #"__metadata__": __metadata__,
# #"__tools__": __tools__,
# }
# print(json.dumps(all_params, indent=2, default=str))
# print("---------------------------------------------------------")
try:
self.client = AsyncOpenAI(
base_url=self.valves.BASE_URL,
api_key=self.valves.API_KEY,
timeout=90,
)
# Use the transform function
transformed_messages = transform_messages_for_responses(
body.get("messages", [])
)
# -----------------------------------------------------------------
# Prepare final list of tools
# -----------------------------------------------------------------
tools = []
# Merge user-provided tools from __tools__
if __tools__:
for tool_name, tool_info in __tools__.items():
spec = tool_info.get("spec", {})
# Ensure we set type=function if missing
if "type" not in spec:
spec["type"] = "function"
tools.append(spec)
# Optionally append the built-in 'web_search' if enabled
if self.valves.ENABLE_WEB_SEARCH and not any(
t.get("type") == "web_search" for t in tools
):
tools.append(
{
"type": "web_search",
"search_context_size": self.valves.SEARCH_CONTEXT_SIZE
or "medium",
}
)
# Build request params
request_params = {
"model": self.valves.MODEL_ID,
"tools": tools,
"tool_choice": "auto",
"parallel_tool_calls": self.valves.PARALLEL_TOOL_CALLS,
"max_output_tokens": body.get("max_tokens"),
"temperature": body.get("temperature") or 1.0,
"top_p": body.get("top_p") or 1.0,
"user": __user__.get("email"),
"store": self.valves.STORE_RESPONSE,
"text": {"format": {"type": "text"}},
"truncation": "auto",
"stream": True, # Always streaming
}
# Configure OpenAI reasoning features if enabled
if self.valves.REASON_EFFORT or self.valves.REASON_SUMMARY:
request_params["reasoning"] = {}
if self.valves.REASON_EFFORT:
request_params["reasoning"]["effort"] = self.valves.REASON_EFFORT
if self.valves.REASON_SUMMARY:
request_params["reasoning"]["summary"] = self.valves.REASON_SUMMARY
# input_messages = instructions + user input + function_call_output, etc.
input_messages = []
if transformed_messages["instructions"]:
request_params["instructions"] = transformed_messages["instructions"]
input_messages.extend(transformed_messages["input"])
in_think = False
            tool_round: int = 0  # how many times we have re-entered the loop after tool calls
            MAX_ROUNDS = self.valves.MAX_TOOL_CALLS  # hard safety limit (MAX_TOOL_CALLS valve)
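            # Agentic loop: stream one response; if the model requests a tool call, run the
            # tool locally, append a 'function_call_output' item to input_messages, and
            # re-run the request with the updated input. Repeat until the model answers
            # without calling tools or the MAX_TOOL_CALLS safety limit is reached.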
while tool_round <= MAX_ROUNDS:
need_rerun = False
# 1) Prepare request input
request_params["input"] = input_messages
if self.valves.DEBUG:
print("[DEBUG] About to call OpenAI with input_messages:")
print(json.dumps(input_messages, indent=2, default=str))
# 2) Make a single streaming request
response_stream = await self.client.responses.create(**request_params)
try:
async for event in response_stream:
if self.valves.DEBUG:
print(
"[DEBUG] Stream event:\n",
json.dumps(event.__dict__, indent=2, default=str),
)
match event.type:
# ------------------------------------------------------------------
# Completed Output Item => could be function_call or assistant message
# ------------------------------------------------------------------
case "response.output_item.done":
item = getattr(event, "item", None)
                                if not item:
                                    continue  # no item attached; skip this event
if item.type == "function_call":
# The model wants to call a tool
# Add the function_call to input_messages
input_messages.append(
{
"type": "function_call",
"call_id": item.call_id,
"name": item.name,
"arguments": item.arguments,
}
)
# Run the tool
tool_obj = __tools__.get(item.name)
if tool_obj:
try:
parsed_args = json.loads(
item.arguments or "{}"
)
result = await tool_obj["callable"](
**parsed_args
)
input_messages.append(
{
"type": "function_call_output",
"call_id": item.call_id,
"output": str(result),
}
)
need_rerun = True # re-run the pipeline with the tools result
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"✅ Tool call finished: {item.name}; result={result}",
"done": True,
},
}
)
except Exception as ex:
if self.valves.DEBUG:
print(f"[DEBUG] Tool call error: {ex}")
else:
if self.valves.DEBUG:
print(
f"[DEBUG] Tool '{item.name}' not found!"
)
elif item.type == "message" and hasattr(item, "role"):
# This is a final chunk of the assistant's text
# Collect all textual parts
text_content = "".join(
part.text or ""
for part in getattr(item, "content", [])
if getattr(part, "type", "") == "output_text"
)
input_messages.append(
{"role": item.role, "content": text_content}
)
# ------------------------------------------------------------------
# Reasoning summary streaming (chain-of-thought)
# ------------------------------------------------------------------
case "response.reasoning_summary_text.delta":
if not in_think:
yield "<think>"
in_think = True
yield event.delta
case "response.reasoning_summary_text.done":
yield "\n"
# ------------------------------------------------------------------
# Assistant partial text streaming
# ------------------------------------------------------------------
case "response.output_text.delta":
if in_think:
yield "</think>\n"
in_think = False
yield event.delta
case "response.output_text.done":
# We finalize it upon "response.output_item.done"
yield "\n\n"
# ------------------------------------------------------------------
# Web search status
# ------------------------------------------------------------------
case (
"response.web_search_call.in_progress"
| "response.web_search_call.searching"
):
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "🔍 Web search in progress...",
"done": False,
},
}
)
case "response.web_search_call.completed":
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "✅ Web search complete.",
"done": True,
},
}
)
# ------------------------------------------------------------------
# Citations
# ------------------------------------------------------------------
case "response.output_text.annotation.added":
raw_anno = str(getattr(event, "annotation", ""))
title_match = re.search(r"title='([^']*)'", raw_anno)
url_match = re.search(r"url='([^']*)'", raw_anno)
title = (
title_match.group(1)
if title_match
else "Unknown Title"
)
url = url_match.group(1) if url_match else ""
url = url.replace("?utm_source=openai", "").replace(
"&utm_source=openai", ""
)
if __event_emitter__:
await __event_emitter__(
{
"type": "citation",
"data": {
"document": [title],
"metadata": [
{
"date_accessed": datetime.now().isoformat(),
"source": title,
}
],
"source": {"name": url, "url": url},
},
}
)
# ------------------------------------------------------------------
# Terminal event => done with this request
# ------------------------------------------------------------------
case (
"response.completed"
| "response.done"
| "response.failed"
| "response.incomplete"
):
if self.valves.DEBUG:
print("[DEBUG] Terminal event:", event.type)
continue # we just keep consuming until the stream ends
# ------------------------------------------------------------------
# Catch-all
# ------------------------------------------------------------------
case _:
if self.valves.DEBUG:
print(f"[DEBUG] Unhandled event type: {event.type}")
except Exception as ex:
# If there's an error, yield it and break out
yield f"❌ {type(ex).__name__}: {ex}\n{''.join(traceback.format_exc(limit=5))}"
break
# Decide whether to re‑enter the loop
if need_rerun:
tool_round += 1
if tool_round > MAX_ROUNDS:
yield f"⚠️ ToolLimitReached: hit {tool_round}/{MAX_ROUNDS} calls – stopping to avoid an infinite loop.\n"
break # <-- don’t re‑enter the while‑loop
else:
continue # go round again with new tool output
else:
break # no tool call → conversation finished
except Exception as exc:
if self.valves.DEBUG:
print("[DEBUG] Top-level Exception in pipe():")
print(
json.dumps(
{"error_type": str(type(exc).__name__), "error": str(exc)},
indent=2,
default=str,
)
)
yield f"❌ {type(exc).__name__}: {exc}\n{''.join(traceback.format_exc(limit=5))}"
def transform_messages_for_responses(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Transforms a Chat Completions-style conversation (Open WebUI style)
into the input format expected by the OpenAI Responses API.
------------------------------------------------------------------------------------
WHY THIS FUNCTION EXISTS:
------------------------------------------------------------------------------------
- The Chat Completions API:
* Expects an array of "messages", each with:
{ "role": <"system"|"user"|"assistant">, "content": <string or structured data> }
* 'system' is often used for high-level instructions, 'user' for end-user requests,
and 'assistant' for model responses.
- The Responses API:
* Expects "instructions" (optional) plus "input" which is an array of objects:
[
{
"role": <"developer"|"user"|"assistant">,
"content": [
{ "type": "input_text", "text": ... },
{ "type": "input_image", "image_url": ... },
{ "type": "output_text", "text": ... },
{ "type": "refusal", "reason": ... },
...
]
}
]
* There's also an "instructions" parameter for system-level or developer instructions.
The Responses API can handle multi-step tasks, tool calls, structured events, etc.
Chat Completions lumps everything into role/content pairs. This function bridges that gap.
------------------------------------------------------------------------------------
WHAT THIS FUNCTION DOES:
------------------------------------------------------------------------------------
1) If the first message is "system", treat it as "instructions".
The rest go into "input".
2) For "user" messages, content becomes "input_text" or "input_image".
3) For "assistant" messages, content becomes "output_text" or "refusal".
4) If content is a list of typed objects, parse each individually.
If content is a string, try to parse it as a single-quoted dict; fallback to text.
------------------------------------------------------------------------------------
EXAMPLE:
------------------------------------------------------------------------------------
Chat Completions messages (e.g., from Open WebUI):
[
{ "role": "system", "content": "You are a helpful assistant named Steve." },
{ "role": "user", "content": "Hi, what's your name?" },
{ "role": "assistant", "content": "Hello! I'm Steve. How can I help?" },
{
"role": "user",
"content": [
{ "type": "text", "text": "Describe this" },
{ "type": "image_url", "image_url": { "url": "data:image/jpeg;base64,...." } }
]
}
]
=> transform_messages_for_responses(messages):
{
"instructions": "You are a helpful assistant named Steve.",
"input": [
{
"role": "user",
"content": [
{ "type": "input_text", "text": "Hi, what's your name?" }
]
},
{
"role": "assistant",
"content": [
{ "type": "output_text", "text": "Hello! I'm Steve. How can I help?" }
]
},
{
"role": "user",
"content": [
{ "type": "input_text", "text": "Describe this" },
{ "type": "input_image", "image_url": "data:image/jpeg;base64,...." }
]
}
]
}
This dictionary can then be passed to "client.responses.create(...)", e.g.:
client.responses.create(
model="gpt-4.1",
instructions=transformed["instructions"],
input=transformed["input"],
...
)
------------------------------------------------------------------------------------
"""
# 1) Check if the first message is "system" => instructions
instructions = None
usable = messages[:]
if usable and usable[0].get("role") == "system":
instructions = str(usable[0].get("content", "")).strip()
usable = usable[1:]
transformed_input = []
def parse_single(content_item, role):
"""
Convert a single content item to a typed format valid in the Responses API.
- user => "input_text" or "input_image"
- assistant => "output_text" or "refusal"
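        Example:
            parse_single({"type": "image_url", "image_url": {"url": "data:..."}}, "user")
              -> {"type": "input_image", "image_url": "data:..."}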
"""
# Try to parse single-quoted dict if it's a string
parsed = None
if isinstance(content_item, str):
try:
attempt = ast.literal_eval(content_item)
if isinstance(attempt, dict):
parsed = attempt
else:
parsed = {"type": "text", "text": content_item}
except Exception:
# parse failed, treat as plain text
parsed = {"type": "text", "text": content_item}
elif isinstance(content_item, dict):
parsed = content_item
else:
parsed = {"type": "text", "text": str(content_item)}
ptype = parsed.get("type", "text")
# Now map ptype => actual accepted type
if role == "assistant":
# default => output_text, or if ptype=refusal => refusal
if ptype == "refusal":
reason = parsed.get("reason", "No reason given")
return {"type": "refusal", "reason": reason}
else:
text_val = parsed.get("text", content_item)
return {"type": "output_text", "text": text_val}
else:
# user => input_text or input_image
if ptype == "image_url":
image_url = parsed.get("image_url", {}).get("url", "")
return {"type": "input_image", "image_url": image_url}
else:
text_val = parsed.get("text", content_item)
return {"type": "input_text", "text": text_val}
# 2) Convert each message
for msg in usable:
role = msg.get("role", "user")
raw_content = msg.get("content", "")
final_items = []
if isinstance(raw_content, list):
# Possibly multiple typed objects
for c in raw_content:
final_items.append(parse_single(c, role))
else:
# single chunk
final_items.append(parse_single(raw_content, role))
transformed_input.append({"role": role, "content": final_items})
return {"instructions": instructions, "input": transformed_input}