Whitepaper
Docs
Sign In
Function
Function
pipe
v1.0
n8n chat with status updates from n8n, and collapsable thinking content
Function ID
n8nConnector
Creator
@rabbithole
Downloads
129+
Allows you to chat with an n8n workflow, including receiving status updates from n8n. Returns collapsable thinking content
Get
README
No README available
Function Code
Show
""" title: N8N pipe for OpenWebUI. Includes: - create an account here: https://webui.demodomain.dev/ to test the demo of this pip chatting with a live n8n workflow - pipe connects to n8n by sending user messages to an n8n webhook. Valves are: - URL for the main n8n webhook - URL for status check n8n webhook - pipe timeout (seconds) - delay between n8n status check (seconds) - bearer token for authorization to n8n - chatInput field value - output field value - pipe sets a looping function (using asyncio) to call another n8n webhook, every 2 seconds to get status updates, and display the latest status in the UI - updates from n8n can be "status", "no update", or "error" - handles errors gracefully and shuts down the function that checks for updates on a loop - uses OpenWebUi's __metadata__.get("chat_id") for chat session management with n8n - creates a collapsable element containing COT (Chain of Thought) <thinking> tags above the final message - see accompanying demo n8n workflow here: https://github.com/yupguv/openwebui/blob/main/n8n_workflow_openwebui_chat - the demo n8n workflow optionally uses supabase. Table definitions are here: https://github.com/yupguv/openwebui/blob/main/openwebui_n8n_supabase_tables author: demodomain.dev author_url: https://github.com/yupguv version: 1.0 license: MIT """ from typing import Optional, Callable, Awaitable from pydantic import BaseModel, Field import aiohttp import asyncio class Pipe: class Valves(BaseModel): n8n_url: str = Field( default="https://n8n.[your domain].com/webhook/[your webhook URL]" ) n8n_status_check: str = Field( default="https://n8n.[your domain].com/webhook/[your webhook URL]" ) n8n_bearer_token: str = Field(default="6gdw363gfsh37ydnvb7") input_field: str = Field(default="chatInput") response_field: str = Field(default="output") status_check_delay_seconds: int = Field(default=2) pipe_timeout_seconds: int = Field(default=10) def __init__(self): self.type = "pipe" self.id = "n8nConnector" self.name = "n8nConnector" self.valves = self.Valves() self.status_loop_task = None self.thinking_messages = [] async def emit_status( self, __event_emitter__: Callable[[dict], Awaitable[None]], level: str, message: str, __metadata__: dict, done: bool, body: dict, __user__: Optional[dict] = None, ): if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "status": "complete" if done else "in_progress", "level": level, "description": message, "done": done, }, } ) async def get_n8n_status( self, body: dict, __metadata__: dict, __user__: Optional[dict] = None, ) -> str: headers = { "Authorization": f"Bearer {self.valves.n8n_bearer_token}", "Content-Type": "application/json", } payload = { "sessionId": __metadata__.get("chat_id"), "user_id": __user__["id"], "input": "status check", } payload[self.valves.input_field] = "status check" try: async with aiohttp.ClientSession() as session: async with session.post( self.valves.n8n_status_check, json=payload, headers=headers, timeout=self.valves.pipe_timeout_seconds, ) as response: if response.status == 200: response_data = await response.json() if isinstance(response_data, list) and len(response_data) > 0: status_response = response_data[0].get( self.valves.response_field, "No response field found" ) else: status_response = response_data.get( self.valves.response_field, "No response field found" ) return status_response else: raise Exception( f"Error: {response.status} - {await response.text()}" ) except Exception as e: return f"Error fetching status: {str(e)}" async def status_loop( self, body, __user__, __event_emitter__, __metadata__: dict, ): while True: message = await self.get_n8n_status(body, __metadata__, __user__) if "Error - " in message: await __event_emitter__( { "type": "message", "data": {"content": message}, } ) await self.emit_status( __event_emitter__, "status", "", __metadata__, True, body, __user__, ) if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None break else: if message == "no update": # do nothing pass elif "<details>" in message: message = message.replace(r"\n", "\n") self.thinking_messages.append( {"role": "assistant", "content": message} ) await __event_emitter__( { "type": "message", "data": {"content": message}, } ) else: await self.emit_status( __event_emitter__, "status", message, __metadata__, False, body, __user__, ) await asyncio.sleep(self.valves.status_check_delay_seconds) async def pipe( self, body: dict, __metadata__: dict, __user__: Optional[dict] = None, __event_emitter__: Callable[[dict], Awaitable[dict]] = None, __event_call__: Callable[[dict], Awaitable[dict]] = None, ) -> Optional[dict]: n8n_response = None messages = body.get("messages", []) # Verify a message is available if messages: question = messages[-1]["content"] if "Prompt: " in question: question = question.split("Prompt: ")[-1] try: await self.emit_status( __event_emitter__, "status", "sending...", __metadata__, False, body, __user__, ) self.status_loop_task = asyncio.create_task( self.status_loop(body, __user__, __event_emitter__, __metadata__) ) # Invoke N8N workflow headers = { "Authorization": f"Bearer {self.valves.n8n_bearer_token}", "Content-Type": "application/json", } payload = { "sessionId": __metadata__.get("chat_id"), "user_id": __user__["id"], } payload[self.valves.input_field] = question try: async with aiohttp.ClientSession() as session: async with session.post( self.valves.n8n_url, json=payload, headers=headers, timeout=self.valves.pipe_timeout_seconds, ) as response: if response.status == 200: # response_data = await response.json() response_data = await asyncio.wait_for( response.json(), timeout=self.valves.pipe_timeout_seconds, ) n8n_response = response_data[self.valves.response_field] else: if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None raise Exception( f"Error: {response.status} - {await response.text()}" ) except aiohttp.ClientError as e: # Handle HTTP errors if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None await self.emit_status( __event_emitter__, "status", f"HTTP request failed: {str(e)}", __metadata__, True, body, __user__, ) return {"error": f"HTTP request failed: {str(e)}"} except Exception as e: # Handle other errors if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None await self.emit_status( __event_emitter__, "status", f"An error occurred: {str(e)}", __metadata__, True, body, __user__, ) return {"error": str(e)} # Check for NULL response if n8n_response is None: await self.emit_status( __event_emitter__, "status", "", __metadata__, True, body, __user__, ) n8n_response = ( "Received a NULL response from n8n. Please check the workflow." ) else: n8n_response = n8n_response.replace(r"\n", "\n") # Append "thinking" messages to the chat history for thinking_message in self.thinking_messages: body["messages"].append( { "type": "message", "data": {"content": thinking_message["content"]}, } ) # Append the final response as a new message body["messages"].append( {"type": "message", "data": {"content": n8n_response}} ) # Emit the final response as a new message await __event_emitter__( {"type": "message", "data": {"content": n8n_response}} ) except aiohttp.ClientError as e: if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None return {"error": f"HTTP request failed: {str(e)}"} except Exception as e: # Handle unexpected errors if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None await self.emit_status( __event_emitter__, "status", f"An unexpected error occurred: {str(e)}", __metadata__, True, body, __user__, ) return {"error": str(e)} # If no message is available, alert the user else: body["messages"].append( { "role": "assistant", "content": "No messages found in the request body", } ) if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None await self.emit_status( __event_emitter__, "status", "", True, body, __user__, ) return body["messages"]