"""
title: Make Webhook Action
author: MattRob333
version: 0.1.0
license: MIT
icon_url: data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSIyNCIgaGVpZ2h0PSIyNCIgdmlld0JveD0iMCAwIDI0IDI0IiBmaWxsPSJub25lIiBzdHJva2U9ImN1cnJlbnRDb2xvciIgc3Ryb2tlLXdpZHRoPSIyIiBzdHJva2UtbGluZWNhcD0icm91bmQiIHN0cm9rZS1saW5lam9pbj0icm91bmQiIGNsYXNzPSJsdWNpZGUgbHVjaWRlLXJvdXRlIj48Y2lyY2xlIGN4PSI2IiBjeT0iMTkiIHI9IjMiLz48cGF0aCBkPSJNOSAxOWg4LjVhMy41IDMuNSAwIDAgMCAwLTdoLTExYTMuNSAzLjUgMCAwIDEgMC03SDE1Ii8+PGNpcmNsZSBjeD0iMTgiIGN5PSI1IiByPSIzIi8+PC9zdmc+
required_open_webui_version: 0.3.10
"""
import requests
from datetime import datetime
from pydantic import BaseModel, Field
from typing import Callable, Any, Dict
DEBUG = True
class Action:
class Valves(BaseModel):
WEBHOOK_URL: str = Field(
default="Add your Make.com webhook URL here",
description="Your Make.com webhook URL.",
)
HOVER_DESCRIPTION: str = Field(
default="Send to Make.com scenario",
description="Custom description that appears when hovering over the icon.",
)
def __init__(self):
self.valves = self.Valves()
def status_object(
self,
description: str = "Unknown State",
status: str = "in_progress",
done: bool = False,
) -> Dict:
return {
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
async def action(
self,
body: dict,
__user__: dict = {},
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> None:
if DEBUG:
print(f"Debug: Make.com Webhook action invoked")
try:
if __event_emitter__:
await __event_emitter__(
self.status_object("Initializing Make.com Webhook")
)
if not self.valves.WEBHOOK_URL:
raise ValueError("Webhook URL is not configured")
# Get the last assistant message from the conversation
messages = body.get("messages", [])
assistant_message = next(
(
message.get("content")
for message in reversed(messages)
if message.get("role") == "assistant"
),
None,
)
if not assistant_message:
raise ValueError("No assistant message found to send")
if __event_emitter__:
await __event_emitter__(
self.status_object("Sending content to Make.com")
)
# Prepare the payload
data = {
"content": assistant_message,
"timestamp": datetime.now().isoformat(),
"type": "ai_response",
"description": self.valves.HOVER_DESCRIPTION,
}
# Send to Make.com
response = requests.post(
self.valves.WEBHOOK_URL,
json=data,
headers={
"Content-Type": "application/json",
"User-Agent": "OpenWebUI-Webhook",
},
)
if response.status_code in [200, 201, 204]:
if __event_emitter__:
await __event_emitter__(
self.status_object(
"Content successfully sent to Make.com",
status="complete",
done=True,
)
)
await __event_emitter__(
{
"type": "message",
"data": {
"content": f"\n\n---\nContent sent to Make.com webhook ({self.valves.HOVER_DESCRIPTION})\n"
},
}
)
else:
raise ValueError(
f"Failed to send content. Status Code: {response.status_code}"
)
except Exception as e:
if DEBUG:
print(f"Debug: Error in action method: {str(e)}")
if __event_emitter__:
await __event_emitter__(
self.status_object(f"Error: {str(e)}", status="error", done=True)
)