import requests
class N8nWebhook:
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send_data(self, data: dict) -> dict:
"""
Sends data to the n8n webhook and returns the response.
:param data: A dictionary containing the data to be sent to the webhook.
:return: A dictionary containing the webhook response.
"""
try:
response = requests.post(self.webhook_url, json=data)
response.raise_for_status()
return response.json() # Assuming n8n sends a JSON response
except requests.exceptions.HTTPError as err:
print(f"HTTP error occurred: {err}")
return {"error": str(err)}
except Exception as err:
print(f"Other error occurred: {err}")
return {"error": str(err)}
# Example usage
if __name__ == "__main__":
webhook_url = "https://your-n8n-instance.com/webhook/your-webhook-id" # Replace with your actual n8n webhook URL
n8n = N8nWebhook(webhook_url=webhook_url)
# Sample data to send to the n8n workflow
data = {
"name": "John Doe",
"email": "johndoe@example.com",
"action": "create_post",
"title": "Sample Post",
"content": "This is a sample post content sent via n8n webhook."
}
# Sending data to the n8n webhook
response = n8n.send_data(data)
print("Webhook Response:", response)