import requests
from requests.auth import HTTPBasicAuth
from pydantic import BaseModel, Field
from typing import Union, Dict
class DrupalPipe:
    """Pipe that creates content on a Drupal site through its HTTP API.

    Connection settings live in ``self.valves``; ``pipe`` dispatches on the
    ``action`` key of the incoming body and ``pipes`` advertises the actions
    that are implemented.
    """

    # Upper bound (seconds) for a single HTTP call. requests applies no
    # timeout by default, so without this an unresponsive server would
    # block the caller indefinitely.
    _REQUEST_TIMEOUT_SECONDS = 30

    class Valves(BaseModel):
        """Settings needed to reach and authenticate against Drupal."""

        DRUPAL_BASE_URL: str = Field(
            default="",
            description="Base URL of your Drupal site's API endpoint.",
        )
        DRUPAL_USERNAME: str = Field(
            default="",
            description="Username for authenticating requests to the Drupal API.",
        )
        DRUPAL_PASSWORD: str = Field(
            default="",
            description="Password for authenticating requests to the Drupal API.",
        )

    def __init__(self):
        """Set the integration type and load the valves with their defaults."""
        self.type = "drupal_integration"
        self.valves = self.Valves()

    def pipe(self, body: Dict, __user__: Dict) -> Union[str, Dict]:
        """
        Create an article in Drupal via API.

        :param body: A dictionary containing the article details. Keys read
            here: ``action`` (must be ``"create_article"``), ``title``
            (defaults to ``"Untitled"``) and ``body`` (defaults to ``""``).
        :param __user__: User information (not used in this function but required by Open WebUI).
        :return: The decoded JSON API response on success, otherwise a
            human-readable error message string.
        """
        username = self.valves.DRUPAL_USERNAME
        password = self.valves.DRUPAL_PASSWORD
        if not username or not password:
            return "Drupal credentials not provided."

        action = body.get("action", "")
        if action != "create_article":
            return f"Unsupported action: {action}"

        # Strip trailing slashes so a base URL such as "https://site/" does
        # not produce "https://site//node".
        base_url = self.valves.DRUPAL_BASE_URL.rstrip("/")
        if not base_url:
            return "Drupal base URL not provided."

        endpoint = f"{base_url}/node?_format=json"
        data = {
            "type": [{"target_id": "article"}],
            "title": [{"value": body.get("title", "Untitled")}],
            "body": [{"value": body.get("body", ""), "format": "plain_text"}],
        }
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

        try:
            r = requests.post(
                url=endpoint,
                json=data,
                headers=headers,
                auth=HTTPBasicAuth(username, password),
                timeout=self._REQUEST_TIMEOUT_SECONDS,
            )
            r.raise_for_status()
            return r.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON/empty success body on requests
            # versions whose JSON decode error is not a RequestException.
            print(f"Request failed: {e}")
            return f"Error: {e}"

    def pipes(self):
        """
        Define the available pipes for this integration.

        :return: A list of available actions, each a dict with ``id`` and
            ``name`` keys.
        """
        return [
            {"id": "create_article", "name": "Create Drupal Article"},
            # Add more actions as you implement them
        ]