Whitepaper
Docs
Sign In
Function
Function
pipe
v1.0.0
N8N Pipe w/Permissions
Function ID
n8n_pipe_perm
Creator
@jbaldinips
Downloads
74+
Extends Cole Medin's Pipe class with a user-group valve, so that only admins and selected user groups can access the integration.
Get
README
No README available
Function Code
Show
"""
title: n8n Pipe Function
author: Cole Medin, Joe Baldini
version: 1.0.0

A modified version of Cole's Pipe class with access controls for specific
user groups and admin roles. Also includes self.icon to distinguish "Agent"
models from any existing models.
"""

import asyncio
import time
from typing import Awaitable, Callable, Optional

import requests
from pydantic import BaseModel, Field


class Pipe:
    """Forward the latest chat message to an n8n webhook and return its reply.

    Only users whose role is "admin", or who belong to at least one of the
    groups listed in ``Valves.groups_allowed``, may invoke the pipe.
    """

    class Valves(BaseModel):
        # Configuration values read by the pipe at call time.
        n8n_url: str = Field(
            default="https://n8n.[your domain].com/webhook/[your webhook URL]",
            description="The URL of your n8n webhook",
        )
        n8n_bearer_token: str = Field(
            default="...", description="Bearer token for n8n webhook"
        )
        input_field: str = Field(default="chatInput", description="Key for input data")
        response_field: str = Field(
            default="output", description="Key for response data"
        )
        emit_interval: float = Field(
            default=2.0, description="Interval in seconds between status emissions"
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )
        # One group name per list entry. A comma-separated entry such as
        # "marketing,call_center" is also accepted and split at check time.
        groups_allowed: list[str] = Field(
            default=["marketing", "call_center"],
            description="List of user groups allowed to use this pipe",
        )
        # 0 keeps the historical behaviour of waiting indefinitely.
        request_timeout: float = Field(
            default=0.0,
            ge=0,
            description="Seconds to wait for the n8n webhook (0 disables the timeout)",
        )

    def __init__(self):
        self.type = "pipe"
        self.id = "n8n_pipe"
        self.name = "N8N Pipe"
        self.icon = "🧩"
        self.valves = self.Valves()
        # Timestamp of the last status event sent; used to throttle emissions.
        self.last_emit_time = 0

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        """Send a status event, throttled to one per ``emit_interval`` seconds.

        Nothing is sent when no emitter is supplied or when the status
        indicator valve is disabled. Events with ``done=True`` bypass the
        throttle so the final state is always reported.
        """
        current_time = time.time()
        interval_elapsed = (
            current_time - self.last_emit_time >= self.valves.emit_interval
        )
        if (
            __event_emitter__
            and self.valves.enable_status_indicator
            and (interval_elapsed or done)
        ):
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )
            self.last_emit_time = current_time

    def _allowed_groups(self) -> set:
        """Return the configured group names as a normalized set.

        Each configured entry is kept verbatim and, in addition, split on
        commas with surrounding whitespace stripped, so both
        ``["marketing", "call_center"]`` and ``["marketing,call_center"]``
        authorize the same two groups.
        """
        allowed = set()
        for entry in self.valves.groups_allowed:
            allowed.add(entry)
            allowed.update(
                part.strip() for part in entry.split(",") if part.strip()
            )
        return allowed

    def _is_authorized(self, user: Optional[dict]) -> bool:
        """Return True when ``user`` is an admin or in an allowed group.

        A missing or empty user is rejected (fail closed).
        """
        if not user:
            return False
        if user.get("role", "") == "admin":
            return True
        allowed = self._allowed_groups()
        # NOTE(review): assumes group entries are plain group-name strings;
        # confirm against what the host application passes in __user__.
        user_groups = user.get("groups") or []
        return any(
            isinstance(group, str) and group in allowed for group in user_groups
        )

    def _post_to_n8n(self, payload: dict, headers: dict) -> requests.Response:
        """POST ``payload`` to the configured webhook (blocking call)."""
        # requests treats timeout=None as "wait forever".
        timeout = self.valves.request_timeout or None
        return requests.post(
            self.valves.n8n_url, json=payload, headers=headers, timeout=timeout
        )

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
        __event_call__: Callable[[dict], Awaitable[dict]] = None,
    ) -> Optional[dict]:
        """Send the latest message in ``body`` to n8n and return the response.

        Args:
            body: Request body; its "messages" list is read, and the n8n
                reply is appended to it as an assistant message.
            __user__: Calling user; "role", "groups" and "id" are read.
            __event_emitter__: Optional callback used for status events.
            __event_call__: Accepted for interface compatibility; unused.

        Returns:
            On success, the value found under ``Valves.response_field`` in
            the webhook's JSON reply (or "No output returned."). On any
            failure, a dict of the form ``{"error": <message>}``.
        """
        await self.emit_status(
            __event_emitter__, "info", "Calling n8n workflow...", False
        )

        # 🔐 Check access: must be in allowed groups OR have admin role
        if not self._is_authorized(__user__):
            await self.emit_status(
                __event_emitter__,
                "error",
                f"Access denied: not in {self.valves.groups_allowed} and not admin",
                True,
            )
            return {
                "error": "Access denied. This pipe is only available to certain groups or admins."
            }

        messages = body.get("messages", [])
        if not messages:
            await self.emit_status(
                __event_emitter__,
                "error",
                "No messages found in the request body",
                True,
            )
            return {"error": "No messages found in the request body"}

        # Extract the latest user message
        question = messages[-1]["content"]
        if "Prompt: " in question:
            question = question.split("Prompt: ")[-1]

        try:
            # Prepare and send payload to n8n webhook
            headers = {
                "Authorization": f"Bearer {self.valves.n8n_bearer_token}",
                "Content-Type": "application/json",
            }
            # Session key: user id plus the first 100 characters of the
            # conversation's first message.
            session_id = f"{__user__['id']} - {messages[0]['content'].split('Prompt: ')[-1][:100]}"
            payload = {
                "sessionId": session_id,
                self.valves.input_field: question,
            }

            # requests is synchronous; run it in a worker thread so the
            # event loop is not blocked while the workflow executes.
            response = await asyncio.to_thread(self._post_to_n8n, payload, headers)
            if response.status_code != 200:
                raise Exception(f"{response.status_code} - {response.text}")

            n8n_response = response.json().get(
                self.valves.response_field, "No output returned."
            )

            # Append response as assistant message
            body["messages"].append({"role": "assistant", "content": n8n_response})

        except Exception as e:
            await self.emit_status(
                __event_emitter__,
                "error",
                f"Error during execution: {str(e)}",
                True,
            )
            return {"error": str(e)}

        await self.emit_status(__event_emitter__, "info", "Complete", True)
        return n8n_response