NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance ✨

Tool
v0.0.2
Kubernetes
Get pod information from your Kubernetes cluster
Tool ID
kubernetes_cluster_info
Creator
@thearyadev
Downloads
204+

Tool Content
python
"""
title: Kubernetes Cluster Info
author: thearyadev
author_url: https://github.com/thearyadev
funding_url: https://github.com/thearyadev
version: 0.0.2
"""

import requests

# Base URL of the kube API server; the request path is appended directly
# after it, so it should not end with a slash.
KUBERNETES_API_URL = ""

# Token for a service account with permissions to list in this namespace.
KUBERNETES_API_TOKEN = ""

# Namespace whose pods are queried.
NAMESPACE = "default"

# Headers sent with every API request. Built once at import time, so the
# token must be set above before the module is loaded.
headers = {
    "Authorization": f"Bearer {KUBERNETES_API_TOKEN}",
    "Content-Type": "application/json",
}


def get_pods(timeout: float = 10.0) -> dict:
    """
    Fetch the pod list for NAMESPACE from the Kubernetes API server.

    :param timeout: seconds to wait for the API server before giving up
    :return: the decoded JSON body of the pod list response
    :raises requests.RequestException: on connection failure, timeout, or a
        non-success HTTP status (e.g. 401/403 for a bad or under-privileged token)
    :raises ValueError: if the response body is not valid JSON
    """
    response = requests.get(
        f"{KUBERNETES_API_URL}/api/v1/namespaces/{NAMESPACE}/pods",
        headers=headers,
        # NOTE(security): certificate verification is disabled, so the bearer
        # token is sent without authenticating the server. Kept as-is because
        # clusters with self-signed certificates would otherwise stop working;
        # prefer pointing `verify` at the cluster CA bundle.
        verify=False,
        # Without a timeout an unreachable API server blocks the call forever.
        timeout=timeout,
    )
    # Surface HTTP errors here; otherwise an error body without "items"
    # only shows up later as an unhelpful KeyError in the callers.
    response.raise_for_status()
    return response.json()


def get_pod_name(pod: dict) -> str:
    """
    Return the name of a pod.

    :param pod: a single pod object (one entry of the pod list's "items")
    :return: the value stored under metadata -> name
    """
    metadata = pod["metadata"]
    return metadata["name"]


def get_pod_status(pod: dict) -> str:
    """
    Return the lifecycle phase of a pod.

    :param pod: a single pod object (one entry of the pod list's "items")
    :return: the value stored under status -> phase
    """
    status = pod["status"]
    return status["phase"]


def get_pod_node(pod: dict) -> str:
    """
    Return the name of the node a pod is assigned to.

    A pod that has not been scheduled yet has no ``nodeName`` in its spec;
    in that case the placeholder ``"<none>"`` is returned instead of raising,
    so one unscheduled pod does not abort a whole listing.

    :param pod: a single pod object (one entry of the pod list's "items")
    :return: the value stored under spec -> nodeName, or "<none>" if absent
    """
    return pod["spec"].get("nodeName") or "<none>"


def get_pod_events(pod: dict) -> str:
    """
    Return the pod's container statuses as one ";"-separated string.

    Despite the name, this reads ``status.containerStatuses`` from the pod
    object itself, not separate event objects. Each status entry is rendered
    with ``str()``.

    A pod whose containers have not been created yet carries no
    ``containerStatuses`` field; that is treated as an empty list so such
    pods yield an empty string instead of raising.

    :param pod: a single pod object (one entry of the pod list's "items")
    :return: the stringified container statuses joined by ";", or "" if none
    """
    container_statuses = pod["status"].get("containerStatuses") or []
    return ";".join(str(status) for status in container_statuses)


class Tools:
    """Tool methods that summarize the pods returned by ``get_pods`` as text."""

    def __init__(self):
        # NOTE(review): nothing in this file reads this flag; presumably it is
        # consumed by the host application that loads the tool — confirm.
        self.citation = True

    def get_kubernetes_cluster_pods(self) -> str:
        """
        Get a list of Kubernetes pods with their name, status, and node.
        :return: list of pods
        """
        try:
            pod_query = get_pods()
            # One line per pod. The node field uses a plain "{...}" replacement;
            # a trailing "=" inside the braces would also print the expression
            # source text ("node=get_pod_node(pod)='...'").
            return "\n".join(
                f"name={get_pod_name(pod)}, status={get_pod_status(pod)}, node={get_pod_node(pod)}"
                for pod in pod_query["items"]
            )
        except Exception as e:
            # Failures are returned as text so the caller always gets a string.
            return f"error: {e}"

    def get_non_running_pods_with_events(self) -> str:
        """
        Get a list of non-running Kubernetes pods with their name, status, node, and events
        :return: list of non running pods
        """
        try:
            pod_query = get_pods()
            # Keep only pods whose phase is anything other than "Running".
            return "\n".join(
                f"name={get_pod_name(pod)}, status={get_pod_status(pod)}, node={get_pod_node(pod)}, events={get_pod_events(pod)}"
                for pod in pod_query["items"]
                if get_pod_status(pod) != "Running"
            )
        except Exception as e:
            # Failures are returned as text so the caller always gets a string.
            return f"error: {e}"