"""
title: Kubernetes Cluster Info
author: thearyadev
author_url: https://github.com/thearyadev
funding_url: https://github.com/thearyadev
version: 0.0.2
"""
import requests
KUBERNETES_API_URL = "" # kube api server url
KUBERNETES_API_TOKEN = (
"" # token for a service account with permissions to list in this namespace
)
NAMESPACE = "default"
headers = {
"Authorization": f"Bearer {KUBERNETES_API_TOKEN}",
"Content-Type": "application/json",
}
def get_pods() -> dict:
return requests.get(
f"{KUBERNETES_API_URL}/api/v1/namespaces/{NAMESPACE}/pods",
headers=headers,
verify=False,
).json()
def get_pod_name(pod: dict) -> str:
return pod["metadata"]["name"]
def get_pod_status(pod: dict) -> str:
return pod["status"]["phase"]
def get_pod_node(pod: dict) -> str:
return pod["spec"]["nodeName"]
def get_pod_events(pod: dict) -> str:
return ";".join([str(status) for status in pod["status"]["containerStatuses"]])
class Tools:
def __init__(self):
self.citation = True
def get_kubernetes_cluster_pods(self) -> str:
"""
Get a list of Kubernetes pods with their name, status, and node.
:return: list of pods
"""
try:
pod_query = get_pods()
return "\n".join(
[
f"name={get_pod_name(pod)}, status={get_pod_status(pod)}, node={get_pod_node(pod)=}"
for pod in pod_query["items"]
]
)
except Exception as e:
return f"error: {e}"
def get_non_running_pods_with_events(self) -> str:
"""
Get a list of non-running Kubernetes pods with their name, status, node, and events
:return: list of non running pods
"""
try:
pod_query = get_pods()
return "\n".join(
[
f"name={get_pod_name(pod)}, status={get_pod_status(pod)}, node={get_pod_node(pod)}, events={get_pod_events(pod)}"
for pod in pod_query["items"]
if get_pod_status(pod) != "Running"
]
)
except Exception as e:
return f"error: {e}"