We're Hiring!
Whitepaper
Docs
Sign In
@rabbithole
·
10 months ago
·
10 months ago
function
n8n chat with status updates from n8n, and collapsable thinking content
Get
Last Updated
10 months ago
Created
10 months ago
Function
pipe
v1.0
Name
n8n chat with status updates from n8n, and collapsable thinking content
Downloads
282+
Saves
0+
Description
Allows you to chat with an n8n workflow, including receiving status updates from n8n. Returns collapsable thinking content
Function Code
Show
""" title: N8N pipe for OpenWebUI. Includes: - create an account here: https://webui.demodomain.dev/ to test the demo of this pip chatting with a live n8n workflow - pipe connects to n8n by sending user messages to an n8n webhook. Valves are: - URL for the main n8n webhook - URL for status check n8n webhook - pipe timeout (seconds) - delay between n8n status check (seconds) - bearer token for authorization to n8n - chatInput field value - output field value - pipe sets a looping function (using asyncio) to call another n8n webhook, every 2 seconds to get status updates, and display the latest status in the UI - updates from n8n can be "status", "no update", or "error" - handles errors gracefully and shuts down the function that checks for updates on a loop - uses OpenWebUi's __metadata__.get("chat_id") for chat session management with n8n - creates a collapsable element containing COT (Chain of Thought) <thinking> tags above the final message - see accompanying demo n8n workflow here: https://github.com/yupguv/openwebui/blob/main/n8n_workflow_openwebui_chat - the demo n8n workflow optionally uses supabase. Table definitions are here: https://github.com/yupguv/openwebui/blob/main/openwebui_n8n_supabase_tables author: demodomain.dev author_url: https://github.com/yupguv version: 1.0 license: MIT """ from typing import Optional, Callable, Awaitable from pydantic import BaseModel, Field import aiohttp import asyncio class Pipe: class Valves(BaseModel): n8n_url: str = Field( default="https://n8n.[your domain].com/webhook/[your webhook URL]" ) n8n_status_check: str = Field( default="https://n8n.[your domain].com/webhook/[your webhook URL]" ) n8n_bearer_token: str = Field(default="6gdw363gfsh37ydnvb7") input_field: str = Field(default="chatInput") response_field: str = Field(default="output") status_check_delay_seconds: int = Field(default=2) pipe_timeout_seconds: int = Field(default=10) def __init__(self): self.type = "pipe" self.id = "n8nConnector" self.name = "n8nConnector" self.valves = self.Valves() self.status_loop_task = None self.thinking_messages = [] async def emit_status( self, __event_emitter__: Callable[[dict], Awaitable[None]], level: str, message: str, __metadata__: dict, done: bool, body: dict, __user__: Optional[dict] = None, ): if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "status": "complete" if done else "in_progress", "level": level, "description": message, "done": done, }, } ) async def get_n8n_status( self, body: dict, __metadata__: dict, __user__: Optional[dict] = None, ) -> str: headers = { "Authorization": f"Bearer {self.valves.n8n_bearer_token}", "Content-Type": "application/json", } payload = { "sessionId": __metadata__.get("chat_id"), "user_id": __user__["id"], "input": "status check", } payload[self.valves.input_field] = "status check" try: async with aiohttp.ClientSession() as session: async with session.post( self.valves.n8n_status_check, json=payload, headers=headers, timeout=self.valves.pipe_timeout_seconds, ) as response: if response.status == 200: response_data = await response.json() if isinstance(response_data, list) and len(response_data) > 0: status_response = response_data[0].get( self.valves.response_field, "No response field found" ) else: status_response = response_data.get( self.valves.response_field, "No response field found" ) return status_response else: raise Exception( f"Error: {response.status} - {await response.text()}" ) except Exception as e: return f"Error fetching status: {str(e)}" async def status_loop( self, body, __user__, __event_emitter__, __metadata__: dict, ): while True: message = await self.get_n8n_status(body, __metadata__, __user__) if "Error - " in message: await __event_emitter__( { "type": "message", "data": {"content": message}, } ) await self.emit_status( __event_emitter__, "status", "", __metadata__, True, body, __user__, ) if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None break else: if message == "no update": # do nothing pass elif "<details>" in message: message = message.replace(r"\n", "\n") self.thinking_messages.append( {"role": "assistant", "content": message} ) await __event_emitter__( { "type": "message", "data": {"content": message}, } ) else: await self.emit_status( __event_emitter__, "status", message, __metadata__, False, body, __user__, ) await asyncio.sleep(self.valves.status_check_delay_seconds) async def pipe( self, body: dict, __metadata__: dict, __user__: Optional[dict] = None, __event_emitter__: Callable[[dict], Awaitable[dict]] = None, __event_call__: Callable[[dict], Awaitable[dict]] = None, ) -> Optional[dict]: n8n_response = None messages = body.get("messages", []) # Verify a message is available if messages: question = messages[-1]["content"] if "Prompt: " in question: question = question.split("Prompt: ")[-1] try: await self.emit_status( __event_emitter__, "status", "sending...", __metadata__, False, body, __user__, ) self.status_loop_task = asyncio.create_task( self.status_loop(body, __user__, __event_emitter__, __metadata__) ) # Invoke N8N workflow headers = { "Authorization": f"Bearer {self.valves.n8n_bearer_token}", "Content-Type": "application/json", } payload = { "sessionId": __metadata__.get("chat_id"), "user_id": __user__["id"], } payload[self.valves.input_field] = question try: async with aiohttp.ClientSession() as session: async with session.post( self.valves.n8n_url, json=payload, headers=headers, timeout=self.valves.pipe_timeout_seconds, ) as response: if response.status == 200: # response_data = await response.json() response_data = await asyncio.wait_for( response.json(), timeout=self.valves.pipe_timeout_seconds, ) n8n_response = response_data[self.valves.response_field] else: if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None raise Exception( f"Error: {response.status} - {await response.text()}" ) except aiohttp.ClientError as e: # Handle HTTP errors if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None await self.emit_status( __event_emitter__, "status", f"HTTP request failed: {str(e)}", __metadata__, True, body, __user__, ) return {"error": f"HTTP request failed: {str(e)}"} except Exception as e: # Handle other errors if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None await self.emit_status( __event_emitter__, "status", f"An error occurred: {str(e)}", __metadata__, True, body, __user__, ) return {"error": str(e)} # Check for NULL response if n8n_response is None: await self.emit_status( __event_emitter__, "status", "", __metadata__, True, body, __user__, ) n8n_response = ( "Received a NULL response from n8n. Please check the workflow." ) else: n8n_response = n8n_response.replace(r"\n", "\n") # Append "thinking" messages to the chat history for thinking_message in self.thinking_messages: body["messages"].append( { "type": "message", "data": {"content": thinking_message["content"]}, } ) # Append the final response as a new message body["messages"].append( {"type": "message", "data": {"content": n8n_response}} ) # Emit the final response as a new message await __event_emitter__( {"type": "message", "data": {"content": n8n_response}} ) except aiohttp.ClientError as e: if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None return {"error": f"HTTP request failed: {str(e)}"} except Exception as e: # Handle unexpected errors if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None await self.emit_status( __event_emitter__, "status", f"An unexpected error occurred: {str(e)}", __metadata__, True, body, __user__, ) return {"error": str(e)} # If no message is available, alert the user else: body["messages"].append( { "role": "assistant", "content": "No messages found in the request body", } ) if self.status_loop_task is not None: self.status_loop_task.cancel() self.status_loop_task = None await self.emit_status( __event_emitter__, "status", "", True, body, __user__, ) return body["messages"]
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with
intelligence
.