Whitepaper
Docs
Sign In
Tool
Tool
v1.0.1
n8n Workflow Documentation Assistant
Tool ID
n8n_workflow_documentation_assistant
Creator
@lanhild
Downloads
897+
Assist in generating documentation for an n8n workflow.
Get
README
No README available
Tool Code
Show
"""
title: n8n Workflow Assistant
author: Lanhild
author_url: https://github.com/Lanhild
version: 1.0.1
"""

import asyncio
import json
from typing import Any, Callable
from urllib.parse import quote, urljoin, urlparse

import requests
from pydantic import BaseModel, Field

# Upper bound (seconds) for a single HTTP call to the n8n API, so an
# unreachable instance cannot block the tool call indefinitely.
REQUEST_TIMEOUT_SECONDS = 30


class HelpFunctions:
    """Placeholder for shared helper routines (currently empty)."""

    def __init__(self):
        pass


class EventEmitter:
    """Thin wrapper that sends "status" events through an optional callback."""

    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        # May be None, in which case emit() is a no-op.
        self.event_emitter = event_emitter

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """
        Send a status event to the callback, if one was provided.

        :param description: Human-readable description of the current step.
        :param status: Status label placed in the event payload.
        :param done: Whether this event marks the end of the operation.
        """
        if self.event_emitter:
            await self.event_emitter(
                {
                    "type": "status",
                    "data": {
                        "status": status,
                        "description": description,
                        "done": done,
                    },
                }
            )


class Tools:
    """Tool methods that fetch an n8n workflow and summarise its nodes."""

    class Valves(BaseModel):
        # Base URL that '/workflows/<id>' is appended to.
        N8N_API_BASE_URL: str = Field(
            default=None,
            description="Your n8n API instance URL.",
        )
        # Sent as the 'x-n8n-api-key' request header.
        N8N_API_KEY: str = Field(
            default=None,
            description="Your n8n API key.",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def get_workflow_metadata(
        self,
        workflow_id: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Get the JSON of a workflow from its ID provided by the user.

        :param workflow_id: The ID of the workflow to get the JSON content for.
        :return: The JSON content of the workflow or an error message.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit(f"Fetching workflow metadata for: {workflow_id}")

        base_url = self.valves.N8N_API_BASE_URL
        api_key = self.valves.N8N_API_KEY
        if not base_url or not api_key:
            # Both valves default to None; fail with a readable message
            # instead of a TypeError from concatenating None.
            message = "N8N_API_BASE_URL and N8N_API_KEY must be configured."
            await emitter.emit(status="error", description=message, done=True)
            return json.dumps({"error": message})

        # Strip a trailing slash to avoid '//workflows/', and percent-encode the
        # ID so it cannot add extra path segments or query parameters.
        url = f"{base_url.rstrip('/')}/workflows/{quote(str(workflow_id), safe='')}"
        headers = {
            "x-n8n-api-key": api_key,
        }

        try:
            await emitter.emit("Sending request to n8n instance")
            resp = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
            resp.raise_for_status()
            data = resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers non-JSON bodies on requests versions whose
            # JSON decode error is not a RequestException subclass.
            await emitter.emit(
                status="error",
                description=f"Error during workflow retrieval: {str(e)}",
                done=True,
            )
            return json.dumps({"error": str(e)})

        await emitter.emit(
            status="complete",
            description="Workflow metadata retrieved successfully",
            done=True,
        )
        return json.dumps(data)

    async def get_workflow_nodes(
        self,
        data: dict,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Parse values across the 'nodes' array of the workflow and return data about it.

        :param data: The JSON data of the workflow (a dict, or the JSON string returned by get_workflow_metadata).
        :return: Information about the nodes in the workflow.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit("Computing data")

        try:
            # get_workflow_metadata returns a JSON string, so accept that form
            # as well as an already-decoded dict.
            if isinstance(data, str):
                data = json.loads(data)

            workflow_last_update = data["updatedAt"]
            workflow_name = data["name"]
            nodes = data["nodes"]
            workflow_nodes_count = len(nodes)
            # Keyed by position in the 'nodes' array; each entry keeps only the
            # display name and the node type.
            mapped_data = {
                index: {
                    "name": f'{node["name"]}',
                    "technical_name": f'{node["type"]}',
                }
                for index, node in enumerate(nodes)
            }
        except (KeyError, TypeError, ValueError) as e:
            message = f"Invalid workflow data: {str(e)}"
            await emitter.emit(status="error", description=message, done=True)
            return json.dumps({"error": message})

        await emitter.emit(
            status="complete",
            description="Workflow nodes data processed successfully",
            done=True,
        )

        return f"""
Give a description of the nodes used in the workflow from the JSON data in (get_workflow_metadata), include the name of the workflow ({workflow_name}) and the date the workflow was last updated at ({workflow_last_update}):
Show a standard table layout of each of these nodes: {mapped_data}
Include the total count of nodes in the workflow ({workflow_nodes_count}).
"""