"""
title: n8n Workflow Assistant
author: Lanhild
author_url: https://github.com/Lanhild
version: 1.0.1
"""
import requests
import json
from urllib.parse import urlparse, urljoin
from pydantic import BaseModel, Field
import asyncio
from typing import Callable, Any
class HelpFunctions:
    """Placeholder container for helper routines; it currently holds no state."""

    def __init__(self):
        """Create an instance; there is nothing to initialise."""
class EventEmitter:
    """Forward status updates to an optional awaitable callback."""

    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        """
        Store the callback used by emit().
        :param event_emitter: Awaitable callable receiving one event dict, or None.
        """
        self.event_emitter = event_emitter

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """
        Send a "status" event to the stored callback, if one was provided.
        :param description: Human-readable text describing the current step.
        :param status: Status label placed in the event payload.
        :param done: Whether this event marks the end of the operation.
        """
        callback = self.event_emitter
        # Without a callback there is nobody to notify.
        if not callback:
            return

        payload = {
            "status": status,
            "description": description,
            "done": done,
        }
        await callback({"type": "status", "data": payload})
class Tools:
    """Tool set that fetches and summarises workflows from an n8n instance."""

    class Valves(BaseModel):
        # Connection settings; both default to None until configured.
        N8N_API_BASE_URL: str = Field(
            default=None,
            description="Your n8n API instance URL.",
        )
        N8N_API_KEY: str = Field(
            default=None,
            description="Your n8n API key.",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def get_workflow_metadata(
        self,
        workflow_id: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Get the JSON of a workflow from its ID provided by the user.
        :param workflow_id: The ID of the workflow to get the JSON content for.
        :return: The JSON content of the workflow or an error message.
        """
        # Local import: the file's import block does not bring in `quote`.
        from urllib.parse import quote

        emitter = EventEmitter(__event_emitter__)
        await emitter.emit(f"Fetching workflow metadata for: {workflow_id}")

        api_root = self.valves.N8N_API_BASE_URL
        if not api_root:
            # The valve defaults to None; concatenating it would raise TypeError.
            message = "N8N_API_BASE_URL is not configured."
            await emitter.emit(
                status="error",
                description=f"Error during workflow retrieval: {message}",
                done=True,
            )
            return json.dumps({"error": message})

        # Escape the ID so it cannot alter the request path, and tolerate a
        # trailing slash on the configured base URL.
        base_url = (
            f"{api_root.rstrip('/')}/workflows/{quote(str(workflow_id), safe='')}"
        )
        headers = {
            "x-n8n-api-key": self.valves.N8N_API_KEY,
        }

        try:
            await emitter.emit("Sending request to n8n instance")
            # NOTE(review): requests is blocking and stalls the event loop while
            # the call is in flight; the timeout at least bounds that stall.
            resp = requests.get(base_url, headers=headers, timeout=30)
            resp.raise_for_status()
            data = resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException subclass.
            await emitter.emit(
                status="error",
                description=f"Error during workflow retrieval: {str(e)}",
                done=True,
            )
            return json.dumps({"error": str(e)})

        await emitter.emit(
            status="complete",
            description="Workflow metadata retrieved successfully",
            done=True,
        )
        return json.dumps(data)

    async def get_workflow_nodes(
        self,
        data: dict,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Parse values across the 'nodes' array of the workflow and return data about it.
        :param data: The JSON data of the workflow.
        :return: Information about the nodes in the workflow.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit("Computing data")

        try:
            # get_workflow_metadata returns a JSON string, so accept that form
            # as well as an already-decoded dict.
            if isinstance(data, str):
                data = json.loads(data)

            workflow_last_update = data["updatedAt"]
            workflow_name = data["name"]
            nodes = data["nodes"]
            workflow_nodes_count = len(nodes)
            # Key each summary by the node's position in the workflow.
            mapped_data = {
                index: {
                    "name": f'{node["name"]}',
                    "technical_name": f'{node["type"]}',
                }
                for index, node in enumerate(nodes)
            }
        except (KeyError, TypeError, ValueError) as e:
            message = f"Unexpected workflow data: {e!r}"
            await emitter.emit(
                status="error",
                description=message,
                done=True,
            )
            return json.dumps({"error": message})

        await emitter.emit(
            status="complete",
            description="Workflow nodes data processed successfully",
            done=True,
        )

        return (
            "\n"
            "Give a description of the nodes used in the workflow from the JSON "
            "data in (get_workflow_metadata), include the name of the workflow "
            f'("{workflow_name}") and the date the workflow was last updated at '
            f'("{workflow_last_update}"):\n'
            f"Show a standard table layout of each of these nodes: {mapped_data}\n"
            "Include the total count of nodes in the workflow "
            f"({workflow_nodes_count}).\n"
        )