Tool
Test Drupal Via API
Test tool for creating an article in Drupal via its REST API
Tool ID
test_drupal_via_api
Creator
@umaruzdanov
Downloads
10+

Tool Content
python
import requests
from requests.auth import HTTPBasicAuth
from pydantic import BaseModel, Field
from typing import Union, Dict

class DrupalPipe:
    """Pipe that creates Drupal article nodes through the site's REST API.

    Connection settings live in ``self.valves``. Requests are authenticated
    with HTTP Basic auth built from the configured username and password.
    """

    # Seconds to wait for Drupal before giving up. ``requests`` applies no
    # timeout by default, so an unresponsive server would otherwise block
    # the caller indefinitely.
    REQUEST_TIMEOUT_SECONDS = 30

    class Valves(BaseModel):
        # User-configurable connection settings; all default to empty strings
        # and must be filled in before ``pipe`` can create content.
        DRUPAL_BASE_URL: str = Field(
            default="",
            description="Base URL of your Drupal site's API endpoint."
        )
        DRUPAL_USERNAME: str = Field(
            default="",
            description="Username for authenticating requests to the Drupal API."
        )
        DRUPAL_PASSWORD: str = Field(
            default="",
            description="Password for authenticating requests to the Drupal API."
        )

    def __init__(self):
        """Set the integration type tag and create default (empty) valves."""
        self.type = "drupal_integration"
        self.valves = self.Valves()

    def pipe(self, body: Dict, __user__: Dict) -> Union[str, Dict]:
        """
        Create an article in Drupal via API.

        Only the ``"create_article"`` action is supported. The article is
        posted to ``<DRUPAL_BASE_URL>/node?_format=json`` with the title taken
        from ``body["title"]`` (default ``"Untitled"``) and the text from
        ``body["body"]`` (default empty), using the ``plain_text`` format.

        :param body: A dictionary containing the article details
            (``action``, ``title``, ``body``).
        :param __user__: User information (not used in this function but required by Open WebUI).
        :return: The decoded JSON API response on success, or an error
            message string when configuration is missing, the action is
            unsupported, the request fails, or the response is not JSON.
        """
        if not self.valves.DRUPAL_USERNAME or not self.valves.DRUPAL_PASSWORD:
            return "Drupal credentials not provided."

        action = body.get("action", "")
        if action != "create_article":
            return f"Unsupported action: {action}"

        # Strip trailing slashes so a base URL such as "https://site/" does
        # not produce "https://site//node?_format=json".
        base_url = self.valves.DRUPAL_BASE_URL.rstrip("/")
        if not base_url:
            # Fail with a clear message instead of an opaque "Invalid URL"
            # error from requests.
            return "Drupal base URL not provided."

        endpoint = f"{base_url}/node?_format=json"
        auth = HTTPBasicAuth(self.valves.DRUPAL_USERNAME, self.valves.DRUPAL_PASSWORD)
        data = {
            "type": [{"target_id": "article"}],
            "title": [{"value": body.get("title", "Untitled")}],
            "body": [{"value": body.get("body", ""), "format": "plain_text"}],
        }
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

        try:
            r = requests.post(
                url=endpoint,
                json=data,
                headers=headers,
                auth=auth,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            # Turn 4xx/5xx responses into exceptions handled below.
            r.raise_for_status()
            return r.json()
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return f"Error: {e}"
        except ValueError as e:
            # Older requests versions raise a plain ValueError from r.json()
            # when a successful response body is not valid JSON.
            print(f"Request failed: {e}")
            return f"Error: {e}"

    def pipes(self):
        """
        Define the available pipes for this integration.
        :return: A list of available actions, each a dict with ``id`` and ``name``.
        """
        return [
            {"id": "create_article", "name": "Create Drupal Article"},
            # Add more actions as you implement them
        ]