Tool
v0.1.0
Make Webhook Action Button
Button that sends a payload to a Make.com webhook.
Tool ID
make_webhook_action_button
Creator
@mattrob333
Downloads
23+

Tool Content
python
"""
title: Make Webhook Action
author: MattRob333
version: 0.1.0
license: MIT
icon_url: 
required_open_webui_version: 0.3.10
"""

import asyncio
from datetime import datetime
from typing import Callable, Any, Dict

import requests
from pydantic import BaseModel, Field

# Module-wide switch: when True, the action prints diagnostic messages
# (invocation and error details) via print().
DEBUG = True


class Action:
    """Action that posts the latest assistant reply to a Make.com webhook.

    The action reads the conversation from ``body["messages"]``, picks the
    most recent message whose role is ``"assistant"``, and sends its content
    as JSON to the URL configured in ``Valves.WEBHOOK_URL``. Progress and
    errors are reported through the optional event emitter.
    """

    class Valves(BaseModel):
        """User-configurable settings for the action."""

        WEBHOOK_URL: str = Field(
            default="Add your Make.com webhook URL here",
            description="Your Make.com webhook URL.",
        )
        HOVER_DESCRIPTION: str = Field(
            default="Send to Make.com scenario",
            description="Custom description that appears when hovering over the icon.",
        )
        REQUEST_TIMEOUT: float = Field(
            default=10.0,
            description="Seconds to wait for Make.com to respond before giving up.",
        )

    def __init__(self):
        """Create the action with default valve values."""
        self.valves = self.Valves()

    def status_object(
        self,
        description: str = "Unknown State",
        status: str = "in_progress",
        done: bool = False,
    ) -> Dict:
        """Build a ``status`` event dictionary for the event emitter.

        Args:
            description: Human-readable text describing the current step.
            status: Machine-readable state label (e.g. ``"in_progress"``,
                ``"complete"``, ``"error"``).
            done: Whether this status marks the end of the action.

        Returns:
            A dict of the form ``{"type": "status", "data": {...}}``.
        """
        return {
            "type": "status",
            "data": {
                "status": status,
                "description": description,
                "done": done,
            },
        }

    async def action(
        self,
        body: dict,
        __user__: dict = {},  # kept for interface compatibility; never mutated
        __event_emitter__: Callable[[dict], Any] = None,
        __event_call__: Callable[[dict], Any] = None,
    ) -> None:
        """Send the last assistant message in ``body`` to the configured webhook.

        Args:
            body: Request body; its ``"messages"`` list is searched from the
                end for the latest entry with ``role == "assistant"``.
            __user__: Unused by this action.
            __event_emitter__: Optional awaitable callback that receives
                status and message event dicts. When falsy, no events are
                emitted.
            __event_call__: Unused by this action.

        Returns:
            None. Every failure (missing URL, no assistant message, network
            error, non-2xx response) is caught and reported as an ``error``
            status event instead of being raised to the caller.
        """
        if DEBUG:
            print("Debug: Make.com Webhook action invoked")

        async def emit(event: dict) -> None:
            # Single place for the "emitter is optional" guard.
            if __event_emitter__:
                await __event_emitter__(event)

        try:
            await emit(self.status_object("Initializing Make.com Webhook"))

            # The default valve value is placeholder text, which is truthy;
            # require an actual http(s) URL so an unconfigured action fails
            # with a clear message rather than a requests schema error.
            webhook_url = (self.valves.WEBHOOK_URL or "").strip()
            if not webhook_url.lower().startswith(("http://", "https://")):
                raise ValueError("Webhook URL is not configured")

            # Get the last assistant message from the conversation
            messages = body.get("messages", [])
            assistant_message = next(
                (
                    message.get("content")
                    for message in reversed(messages)
                    if message.get("role") == "assistant"
                ),
                None,
            )

            if not assistant_message:
                raise ValueError("No assistant message found to send")

            await emit(self.status_object("Sending content to Make.com"))

            # Prepare the payload
            data = {
                "content": assistant_message,
                # Local time with an explicit UTC offset, so the receiver can
                # interpret the timestamp unambiguously.
                "timestamp": datetime.now().astimezone().isoformat(),
                "type": "ai_response",
                "description": self.valves.HOVER_DESCRIPTION,
            }

            # requests is blocking: run it in a worker thread so the event
            # loop stays responsive, and bound the wait with a timeout so an
            # unresponsive endpoint cannot hang the action indefinitely.
            response = await asyncio.to_thread(
                requests.post,
                webhook_url,
                json=data,
                headers={
                    "Content-Type": "application/json",
                    "User-Agent": "OpenWebUI-Webhook",
                },
                timeout=self.valves.REQUEST_TIMEOUT,
            )

            # Any 2xx response counts as success (e.g. 202 Accepted).
            if not 200 <= response.status_code < 300:
                raise ValueError(
                    f"Failed to send content. Status Code: {response.status_code}"
                )

            await emit(
                self.status_object(
                    "Content successfully sent to Make.com",
                    status="complete",
                    done=True,
                )
            )
            await emit(
                {
                    "type": "message",
                    "data": {
                        "content": f"\n\n---\nContent sent to Make.com webhook ({self.valves.HOVER_DESCRIPTION})\n"
                    },
                }
            )

        except Exception as e:
            # Top-level boundary for the action: report the failure through
            # the emitter rather than propagating it to the caller.
            if DEBUG:
                print(f"Debug: Error in action method: {str(e)}")
            await emit(
                self.status_object(f"Error: {str(e)}", status="error", done=True)
            )