import requests
from requests.auth import HTTPBasicAuth
from pydantic import BaseModel, Field
from typing import Union, Dict, List
class WordPressPipe:
class Valves(BaseModel):
WORDPRESS_BASE_URL: str = Field(
default="",
description="Base URL of your WordPress site's REST API endpoint."
)
WORDPRESS_USERNAME: str = Field(
default="",
description="Username for authenticating requests to the WordPress REST API."
)
WORDPRESS_PASSWORD: str = Field(
default="",
description="Application password or token for authenticating requests to the WordPress REST API."
)
def __init__(self):
self.type = "wordpress_integration"
self.valves = self.Valves()
def pipe(self, body: Dict, __user__: Dict) -> Union[str, Dict]:
"""
Perform actions on WordPress via the REST API.
:param body: A dictionary containing the action details.
:param __user__: User information (not used in this function but required by Open WebUI).
:return: The API response or an error message.
"""
if not self.valves.WORDPRESS_USERNAME or not self.valves.WORDPRESS_PASSWORD:
return "WordPress credentials not provided."
auth = HTTPBasicAuth(self.valves.WORDPRESS_USERNAME, self.valves.WORDPRESS_PASSWORD)
action = body.get("action", "")
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
if action == "create_post":
endpoint = f"{self.valves.WORDPRESS_BASE_URL}/wp-json/wp/v2/posts"
data = {
"title": body.get("title", "Untitled"),
"content": body.get("content", ""),
"status": body.get("status", "draft") # Set post status (e.g., 'publish' or 'draft')
}
try:
r = requests.post(url=endpoint, json=data, headers=headers, auth=auth)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return f"Error: {e}"
elif action == "update_post":
post_id = body.get("post_id")
if not post_id:
return "post_id is required for updating a post."
endpoint = f"{self.valves.WORDPRESS_BASE_URL}/wp-json/wp/v2/posts/{post_id}"
data = body.get("data", {})
try:
r = requests.post(url=endpoint, json=data, headers=headers, auth=auth)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return f"Error: {e}"
elif action == "delete_post":
post_id = body.get("post_id")
if not post_id:
return "post_id is required for deleting a post."
endpoint = f"{self.valves.WORDPRESS_BASE_URL}/wp-json/wp/v2/posts/{post_id}?force=true"
try:
r = requests.delete(url=endpoint, headers=headers, auth=auth)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return f"Error: {e}"
elif action == "create_category":
endpoint = f"{self.valves.WORDPRESS_BASE_URL}/wp-json/wp/v2/categories"
data = {
"name": body.get("name", ""),
"slug": body.get("slug", ""),
"description": body.get("description", "")
}
try:
r = requests.post(url=endpoint, json=data, headers=headers, auth=auth)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return f"Error: {e}"
elif action == "update_category":
category_id = body.get("category_id")
if not category_id:
return "category_id is required for updating a category."
endpoint = f"{self.valves.WORDPRESS_BASE_URL}/wp-json/wp/v2/categories/{category_id}"
data = body.get("data", {})
try:
r = requests.post(url=endpoint, json=data, headers=headers, auth=auth)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return f"Error: {e}"
elif action == "delete_category":
category_id = body.get("category_id")
if not category_id:
return "category_id is required for deleting a category."
endpoint = f"{self.valves.WORDPRESS_BASE_URL}/wp-json/wp/v2/categories/{category_id}"
try:
r = requests.delete(url=endpoint, headers=headers, auth=auth)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return f"Error: {e}"
else:
return f"Unsupported action: {action}"
def pipes(self) -> List[Dict[str, str]]:
"""
Define the available pipes for this integration.
:return: A list of available actions.
"""
return [
{"id": "create_post", "name": "Create WordPress Post"},
{"id": "update_post", "name": "Update WordPress Post"},
{"id": "delete_post", "name": "Delete WordPress Post"},
{"id": "create_category", "name": "Create WordPress Category"},
{"id": "update_category", "name": "Update WordPress Category"},
{"id": "delete_category", "name": "Delete WordPress Category"}
]
# Example usage
wp_pipe = WordPressPipe()
wp_pipe.valves.WORDPRESS_BASE_URL = "https://yourwordpresssite.com"
wp_pipe.valves.WORDPRESS_USERNAME = "your_username"
wp_pipe.valves.WORDPRESS_PASSWORD = "your_application_password"
# Example action: Create a post
response = wp_pipe.pipe({
"action": "create_post",
"title": "Test Post",
"content": "This is a test post created via the WordPress REST API.",
"status": "publish"
}, {})
print(response)
# Example action: Update a post
response = wp_pipe.pipe({
"action": "update_post",
"post_id": 123, # Replace with the actual post ID
"data": {
"title": "Updated Post Title",
"content": "Updated content for the post."
}
}, {})
print(response)
# Example action: Delete a post
response = wp_pipe.pipe({
"action": "delete_post",
"post_id": 123 # Replace with the actual post ID
}, {})
print(response)
# Example action: Create a category
response = wp_pipe.pipe({
"action": "create_category",
"name": "New Category",
"slug": "new-category",
"description": "Description for the new category."
}, {})
print(response)
# Example action: Update a category
response = wp_pipe.pipe({
"action": "update_category",
"category_id": 456, # Replace with the actual category ID
"data": {
"name": "Updated Category Name",
"description": "Updated description for the category."
}
}, {})
print(response)
# Example action: Delete a category
response = wp_pipe.pipe({
"action": "delete_category",
"category_id": 456 # Replace with the actual category ID
}, {})
print(response)