# Directory Structure ``` ├── .gitignore ├── .python-version ├── assets │ └── logo.svg ├── LICENSE ├── Makefile ├── pyproject.toml ├── README.md ├── server.py ├── test_server.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.13 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # Distribution / packaging .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ share/python-wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .nox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover *.py,cover .hypothesis/ .pytest_cache/ cover/ # Translations *.mo *.pot # Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder .pybuilder/ target/ # Jupyter Notebook .ipynb_checkpoints # IPython profile_default/ ipython_config.py # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: # .python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. # However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. 
#Pipfile.lock # UV # Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. #uv.lock # poetry # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control #poetry.lock # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. #pdm.lock # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # in version control. # https://pdm.fming.dev/latest/usage/project/#working-with-version-control .pdm.toml .pdm-python .pdm-build/ # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ # Celery stuff celerybeat-schedule celerybeat.pid # SageMath parsed files *.sage.py # Environments .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # mkdocs documentation /site # mypy .mypy_cache/ .dmypy.json dmypy.json # Pyre type checker .pyre/ # pytype static type analyzer .pytype/ # Cython debug symbols cython_debug/ # PyCharm # JetBrains specific template is maintained in a separate JetBrains.gitignore that can # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. 
#.idea/ # Custom .DS_Store .vscode/ node_modules/ theme/docs/* .ruff_cache/ ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown <img src="./assets/logo.svg" alt="Allseer Logo" width="400" height="400" /> # k8s-mcp [](https://smithery.ai/server/@vlttnv/k8s-mcp) A Python-based, read-only [Model Context Protocol (MCP)](https://modelcontextprotocol.io/introduction) server for Kubernetes clusters that exposes a comprehensive API to retrieve cluster information and diagnose issues. [Example chat using Claude](https://claude.ai/share/90ae39d3-a0c1-4065-ab79-45950b6b4806) ## Installation ### Prerequisites - Python 3.13+ (see `requires-python` in `pyproject.toml`) - Access to a Kubernetes cluster (via kubeconfig or in-cluster configuration) - Required Python packages (see `dependencies` in `pyproject.toml`) - uv - https://github.com/astral-sh/uv ```bash # To install uv curl -LsSf https://astral.sh/uv/install.sh | sh ``` ```bash # Clone the repository git clone [email protected]:vlttnv/k8s-mcp.git cd k8s-mcp # Install dependencies uv venv source .venv/bin/activate uv sync ``` If you are using Claude, open your Claude for Desktop App configuration at `~/Library/Application Support/Claude/claude_desktop_config.json` in a text editor. Make sure to create the file if it doesn’t exist. ```bash code ~/Library/Application\ Support/Claude/claude_desktop_config.json ``` ```json { "mcpServers": { "k8s-mcp": { "command": "uv", "args": [ "--directory", "/ABSOLUTE/PATH/TO/PARENT/FOLDER/k8s-mcp", "run", "server.py" ] } } } ``` > You may need to put the full path to the `uv` executable in the `command` field. You can get this by running `which uv` on macOS/Linux or `where uv` on Windows. ## Configuration The application automatically tries two methods to connect to your Kubernetes cluster: 1. **Kubeconfig File**: Uses your local kubeconfig file (typically located at `~/.kube/config`) 2.
**In-Cluster Configuration**: If running inside a Kubernetes pod, uses the service account token No additional configuration is required if your kubeconfig is properly set up or if you're running inside a cluster with appropriate RBAC permissions. ## Usage ### Examples Here are some useful example prompts you can ask Claude about your Kubernetes cluster and its resources: #### General Cluster Status - "What's the overall health of my cluster?" - "Show me all namespaces in my cluster" - "What nodes are available in my cluster and what's their status?" - "How is resource utilization across my nodes?" #### Pods and Deployments - "List all pods in the production namespace" - "Are there any pods in CrashLoopBackOff state?" - "Show me pods with high restart counts" - "List all deployments across all namespaces" - "What deployments are failing to progress?" #### Debugging Issues - "Why is my pod in the staging namespace failing?" - "Get the YAML configuration for the service in the production namespace" - "Show me recent events in the default namespace" - "Are there any pods stuck in Pending state?" - "What's causing ImagePullBackOff errors in my cluster?" #### Resource Management - "Show me the resource consumption of nodes in my cluster" - "Are there any orphaned resources I should clean up?" - "List all services in the production namespace" - "Compare resource requests between staging and production" #### Specific Resource Inspection - "Show me the config for the coredns deployment in kube-system" - "Get details of the reverse-proxy service in staging" - "What containers are running in the pod xyz?" 
- "Show me the logs for the failing pod" ## API Reference ### Namespaces - `get_namespaces()`: List all available namespaces in the cluster ### Pods - `list_pods(namespace=None)`: List all pods, optionally filtered by namespace - `failed_pods()`: List all pods in Failed or Error state - `pending_pods()`: List all pods in Pending state with reasons - `high_restart_pods(restart_threshold=5)`: Find pods with restart counts above threshold ### Nodes - `list_nodes()`: List all nodes and their status - `node_capacity()`: Show available capacity on all nodes ### Deployments & Services - `list_deployments(namespace=None)`: List all deployments - `list_services(namespace=None)`: List all services - `list_events(namespace=None)`: List all events ### Resource Management - `orphaned_resources()`: List resources without owner references - `get_resource_yaml(namespace, resource_type, resource_name)`: Get YAML configuration for a specific resource ## License [MIT License](LICENSE) ## Contributing Contributions are welcome! Please feel free to submit a Pull Request. ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "k8s-mcp" version = "0.1.0" description = "A read-only Model Context Protocol (MCP) for querying Kubernetes clusters." 
readme = "README.md" requires-python = ">=3.13" dependencies = [ "kubernetes>=32.0.1", "mcp[cli]>=1.3.0", "pytest>=8.3.5", "pyyaml>=6.0.2", ] [dependency-groups] dev = [ "ruff>=0.9.9", ] [tool.ruff.lint] select = [ # isort "I", ] ``` -------------------------------------------------------------------------------- /assets/logo.svg: -------------------------------------------------------------------------------- ``` <svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 400 400"> <!-- Background --> <rect width="400" height="400" rx="12" ry="12" fill="#f8fafc" /> <!-- Server stack background --> <rect x="120" y="100" width="160" height="180" rx="8" ry="8" fill="#ebf5ff" stroke="#2563eb" stroke-width="2" /> <!-- Server components --> <rect x="140" y="125" width="120" height="30" rx="4" ry="4" fill="#ffffff" stroke="#2563eb" stroke-width="2" /> <rect x="140" y="165" width="120" height="30" rx="4" ry="4" fill="#ffffff" stroke="#2563eb" stroke-width="2" /> <rect x="140" y="205" width="120" height="30" rx="4" ry="4" fill="#ffffff" stroke="#2563eb" stroke-width="2" /> <!-- Data points on servers --> <circle cx="155" cy="140" r="4" fill="#2563eb" /> <circle cx="170" cy="140" r="4" fill="#2563eb" /> <circle cx="155" cy="180" r="4" fill="#2563eb" /> <circle cx="170" cy="180" r="4" fill="#2563eb" /> <circle cx="155" cy="220" r="4" fill="#2563eb" /> <circle cx="170" cy="220" r="4" fill="#2563eb" /> <!-- Status indicators --> <circle cx="245" cy="140" r="6" fill="#10b981" /> <circle cx="245" cy="180" r="6" fill="#10b981" /> <circle cx="245" cy="220" r="6" fill="#10b981" /> <!-- Python logo (simplified) --> <!-- <g transform="translate(200, 80) scale(0.6)"> <path d="M-20,-35 C-20,-43.28 -13.28,-50 -5,-50 L5,-50 C13.28,-50 20,-43.28 20,-35 L20,-15 C20,-6.72 13.28,0 5,0 L-5,0 C-13.28,0 -20,-6.72 -20,-15 Z" fill="#366a96" /> <path d="M-20,35 C-20,43.28 -13.28,50 -5,50 L5,50 C13.28,50 20,43.28 20,35 L20,15 C20,6.72 13.28,0 5,0 L-5,0 C-13.28,0 -20,6.72 -20,15 Z" fill="#ffd43b" /> 
<circle cx="-10" cy="-25" r="5" fill="#ffffff" /> <circle cx="-10" cy="25" r="5" fill="#ffffff" /> </g> --> <!-- Kubernetes wheel (simplified) --> <g transform="translate(200, 260) scale(0.7)"> <circle cx="0" cy="0" r="40" fill="#326ce5" opacity="0.2" /> <circle cx="0" cy="0" r="30" fill="#326ce5" opacity="0.3" /> <g stroke="#326ce5" stroke-width="6" stroke-linecap="round"> <line x1="0" y1="-40" x2="0" y2="-60" /> <line x1="28" y1="-28" x2="42" y2="-42" /> <line x1="40" y1="0" x2="60" y2="0" /> <line x1="28" y1="28" x2="42" y2="42" /> <line x1="0" y1="40" x2="0" y2="60" /> <line x1="-28" y1="28" x2="-42" y2="42" /> <line x1="-40" y1="0" x2="-60" y2="0" /> <line x1="-28" y1="-28" x2="-42" y2="-42" /> </g> </g> <!-- API connections --> <path d="M95,150 C75,150 75,180 95,180" fill="none" stroke="#7c3aed" stroke-width="2.5" stroke-dasharray="5,3" /> <path d="M95,180 C75,180 75,210 95,210" fill="none" stroke="#7c3aed" stroke-width="2.5" stroke-dasharray="5,3" /> <path d="M305,150 C325,150 325,180 305,180" fill="none" stroke="#7c3aed" stroke-width="2.5" stroke-dasharray="5,3" /> <path d="M305,180 C325,180 325,210 305,210" fill="none" stroke="#7c3aed" stroke-width="2.5" stroke-dasharray="5,3" /> <!-- API endpoint indicators --> <circle cx="95" cy="150" r="4" fill="#7c3aed" /> <circle cx="95" cy="180" r="4" fill="#7c3aed" /> <circle cx="95" cy="210" r="4" fill="#7c3aed" /> <circle cx="305" cy="150" r="4" fill="#7c3aed" /> <circle cx="305" cy="180" r="4" fill="#7c3aed" /> <circle cx="305" cy="210" r="4" fill="#7c3aed" /> <!-- MCP Badge --> <g transform="translate(200,260)"> <rect x="-45" y="-20" width="90" height="40" rx="20" ry="20" fill="#ffffff" stroke="#7c3aed" stroke-width="2.5" /> <text x="0" y="8" font-family="Arial, sans-serif" font-size="18" font-weight="bold" text-anchor="middle" fill="#7c3aed">MCP</text> </g> <!-- Title and description --> <text x="200" y="330" font-family="Arial, sans-serif" font-weight="bold" font-size="28" text-anchor="middle" 
fill="#1e293b">k8s-mcp</text> <text x="200" y="355" font-family="Arial, sans-serif" font-size="14" text-anchor="middle" fill="#475569">Read-only MCP server for Kubernetes clusters</text> </svg> ``` -------------------------------------------------------------------------------- /test_server.py: -------------------------------------------------------------------------------- ```python import unittest from unittest.mock import patch, MagicMock import json import datetime import asyncio from kubernetes.client.rest import ApiException # Import the module to be tested import server class AsyncTestCase(unittest.TestCase): """Base class for testing async functions.""" def run_async(self, coro): """Helper method to run coroutines in tests.""" return asyncio.run(coro) class TestKubernetesServer(AsyncTestCase): """Test cases for Kubernetes monitoring server functions.""" def setUp(self): """Set up test fixtures.""" # Mock configuration and API clients self.mock_config = patch("server.config").start() self.mock_core_v1 = patch("server.core_v1").start() self.mock_apps_v1 = patch("server.apps_v1").start() self.mock_batch_v1 = patch("server.batch_v1").start() self.mock_custom_objects = patch("server.custom_objects").start() # Mock FastMCP server self.mock_mcp = patch("server.mcp").start() def tearDown(self): """Tear down test fixtures.""" patch.stopall() def test_get_namespaces(self): """Test get_namespaces function.""" # Create mock namespace items mock_namespace1 = MagicMock() mock_namespace1.metadata.name = "default" mock_namespace1.status.phase = "Active" mock_namespace1.metadata.creation_timestamp = datetime.datetime( 2023, 1, 1, 12, 0, 0 ) mock_namespace2 = MagicMock() mock_namespace2.metadata.name = "kube-system" mock_namespace2.status.phase = "Active" mock_namespace2.metadata.creation_timestamp = datetime.datetime( 2023, 1, 1, 12, 0, 0 ) # Set up mock response mock_response = MagicMock() mock_response.items = [mock_namespace1, mock_namespace2] 
self.mock_core_v1.list_namespace.return_value = mock_response # Call the async function result = asyncio.run(server.get_namespaces()) # Verify the response namespaces = json.loads(result) self.assertEqual(len(namespaces), 2) self.assertEqual(namespaces[0]["name"], "default") self.assertEqual(namespaces[1]["name"], "kube-system") # Verify the API was called self.mock_core_v1.list_namespace.assert_called_once() def test_get_namespaces_error(self): """Test get_namespaces function with API error.""" # Simulate API exception self.mock_core_v1.list_namespace.side_effect = ApiException( status=403, reason="Forbidden" ) # Call the function result_tuple = asyncio.run(server.get_namespaces()) # If the function returns a tuple if isinstance(result_tuple, tuple): result, status_code = result_tuple else: # If function returns just the error JSON result = result_tuple status_code = 500 # Assuming default error code # Verify error response error_response = json.loads(result) self.assertEqual(status_code, 500) self.assertIn("error", error_response) def test_list_pods(self): """Test list_pods function with namespace parameter.""" # Create mock pod items mock_pod = MagicMock() mock_pod.metadata.name = "test-pod" mock_pod.metadata.namespace = "default" mock_pod.status.phase = "Running" mock_pod.status.pod_ip = "10.0.0.1" mock_pod.spec.node_name = "node1" mock_pod.metadata.creation_timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0) # Create mock container mock_container = MagicMock() mock_container.name = "test-container" mock_container.image = "nginx:latest" mock_pod.spec.containers = [mock_container] # Create mock container status mock_container_status = MagicMock() mock_container_status.name = "test-container" mock_container_status.container_id = "container123" mock_pod.status.container_statuses = [mock_container_status] # Set up mock response mock_response = MagicMock() mock_response.items = [mock_pod] self.mock_core_v1.list_namespaced_pod.return_value = mock_response # Call the 
function with namespace result = server.list_pods(namespace="default") # Verify the response pods = json.loads(result) self.assertEqual(len(pods), 1) self.assertEqual(pods[0]["name"], "test-pod") self.assertEqual(pods[0]["namespace"], "default") self.assertEqual(pods[0]["containers"][0]["name"], "test-container") self.assertTrue(pods[0]["containers"][0]["ready"]) # Verify the API was called with correct namespace self.mock_core_v1.list_namespaced_pod.assert_called_once_with("default") def test_list_pods_all_namespaces(self): """Test list_pods function without namespace parameter.""" # Set up mock response mock_response = MagicMock() mock_response.items = [] self.mock_core_v1.list_pod_for_all_namespaces.return_value = mock_response # Call the function without namespace result = server.list_pods() # noqa: F841 # Verify the API was called for all namespaces self.mock_core_v1.list_pod_for_all_namespaces.assert_called_once() def test_list_nodes(self): """Test list_nodes function.""" # Create mock node mock_node = MagicMock() mock_node.metadata.name = "node1" # Mock node conditions mock_condition = MagicMock() mock_condition.type = "Ready" mock_condition.status = "True" mock_node.status.conditions = [mock_condition] # Mock node addresses mock_address = MagicMock() mock_address.type = "InternalIP" mock_address.address = "192.168.1.1" mock_node.status.addresses = [mock_address] # Mock node capacity mock_node.status.capacity = {"cpu": "4", "memory": "8Gi", "pods": "110"} mock_node.status.allocatable = {"cpu": "3800m", "memory": "7Gi", "pods": "100"} # Mock node info mock_node.status.node_info = MagicMock() mock_node.status.node_info.kubelet_version = "v1.25.0" # Set up mock response mock_response = MagicMock() mock_response.items = [mock_node] self.mock_core_v1.list_node.return_value = mock_response # Call the function result = server.list_nodes() # Verify the response nodes = json.loads(result) self.assertEqual(len(nodes), 1) self.assertEqual(nodes[0]["name"], "node1") 
self.assertEqual(nodes[0]["conditions"]["Ready"], "True") self.assertEqual(nodes[0]["addresses"]["InternalIP"], "192.168.1.1") self.assertEqual(nodes[0]["capacity"]["cpu"], "4") self.assertEqual(nodes[0]["allocatable"]["memory"], "7Gi") self.assertEqual(nodes[0]["kubelet_version"], "v1.25.0") # Verify the API was called self.mock_core_v1.list_node.assert_called_once() def test_failed_pods(self): """Test failed_pods function.""" # Create mock failed pod mock_pod = MagicMock() mock_pod.metadata.name = "failed-pod" mock_pod.metadata.namespace = "default" mock_pod.status.phase = "Failed" mock_pod.spec.node_name = "node1" mock_pod.status.message = "Pod failed" mock_pod.status.reason = "Error" # Create mock container status mock_container_status = MagicMock() mock_container_status.name = "test-container" mock_container_status.restart_count = 3 # Create mock container state mock_container_status.state = MagicMock() mock_container_status.state.waiting = MagicMock() mock_container_status.state.waiting.reason = "CrashLoopBackOff" mock_container_status.state.waiting.message = "Container crashed" mock_container_status.state.terminated = None mock_pod.status.container_statuses = [mock_container_status] # Set up mock response mock_response = MagicMock() mock_response.items = [mock_pod] self.mock_core_v1.list_pod_for_all_namespaces.return_value = mock_response # Call the function result = server.failed_pods() # Verify the response failed = json.loads(result) self.assertEqual(len(failed), 1) self.assertEqual(failed[0]["name"], "failed-pod") self.assertEqual(failed[0]["phase"], "Failed") self.assertEqual(failed[0]["container_statuses"][0]["name"], "test-container") self.assertEqual( failed[0]["container_statuses"][0]["state"]["reason"], "CrashLoopBackOff" ) # Verify the API was called self.mock_core_v1.list_pod_for_all_namespaces.assert_called_once() def test_get_resource_yaml(self): """Test get_resource_yaml function.""" # Create mock API client mock_api_client = MagicMock() 
server.client.ApiClient.return_value = mock_api_client # Create mock resource mock_resource = MagicMock() self.mock_core_v1.read_namespaced_pod.return_value = mock_resource # Set up serialization mock_dict = { "apiVersion": "v1", "kind": "Pod", "metadata": {"name": "test-pod"}, } mock_api_client.sanitize_for_serialization.return_value = mock_dict # Mock the yaml dump function to ensure consistent output with patch("server.yaml.dump") as mock_yaml_dump: mock_yaml_dump.return_value = ( "apiVersion: v1\nkind: Pod\nmetadata:\n name: test-pod\n" ) # Call the function result = server.get_resource_yaml("default", "pod", "test-pod") # Verify YAML output is what we expect based on our mock self.assertEqual( result, "apiVersion: v1\nkind: Pod\nmetadata:\n name: test-pod\n" ) # Verify yaml.dump was called with the correct parameters mock_yaml_dump.assert_called_once_with(mock_dict, default_flow_style=False) # Verify API calls self.mock_core_v1.read_namespaced_pod.assert_called_once_with( "test-pod", "default" ) mock_api_client.sanitize_for_serialization.assert_called_once_with( mock_resource ) def test_get_resource_yaml_unsupported_type(self): """Test get_resource_yaml function with unsupported resource type.""" # Call the function with unsupported type result, status_code = server.get_resource_yaml( "default", "unknown", "resource-name" ) # Verify error response error_response = json.loads(result) self.assertEqual(status_code, 400) self.assertIn("error", error_response) self.assertIn("Unsupported resource type", error_response["error"]) def test_format_bytes(self): """Test format_bytes helper function.""" # Test various sizes self.assertEqual(server.format_bytes(500), "500 B") self.assertEqual(server.format_bytes(1024), "1024 B") self.assertEqual(server.format_bytes(1536), "1.5 KiB") self.assertEqual(server.format_bytes(2 * 1024 * 1024), "2.0 MiB") self.assertEqual(server.format_bytes(3 * 1024 * 1024 * 1024), "3.0 GiB") if __name__ == "__main__": unittest.main() ``` 
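The final test above asserts on a `format_bytes` helper, but `server.py` in this excerpt is truncated before that helper's definition. A minimal hypothetical sketch consistent with those assertions (note the strict `> 1024` comparison, so exactly 1024 bytes is still reported as `"1024 B"`) might look like:

```python
def format_bytes(size):
    # Hypothetical sketch, not the repository's actual implementation:
    # walk up the binary units while the value strictly exceeds 1024.
    units = ["B", "KiB", "MiB", "GiB", "TiB"]
    value = float(size)
    i = 0
    while value > 1024 and i < len(units) - 1:
        value /= 1024
        i += 1
    if units[i] == "B":
        # Byte values are shown as integers, e.g. "500 B" or "1024 B".
        return f"{int(value)} B"
    # Larger units are shown with one decimal place, e.g. "1.5 KiB".
    return f"{value:.1f} {units[i]}"
```

Under these assumptions, `format_bytes(1536)` yields `"1.5 KiB"` and `format_bytes(2 * 1024 * 1024)` yields `"2.0 MiB"`, matching `test_format_bytes`.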
-------------------------------------------------------------------------------- /server.py: -------------------------------------------------------------------------------- ```python """ Kubernetes Monitoring API Server This module provides a FastMCP server that exposes Kubernetes monitoring APIs. It connects to a Kubernetes cluster and provides endpoints to query various cluster resources including pods, services, deployments, nodes, and events. Dependencies: - kubernetes: Python client for Kubernetes - yaml: For YAML serialization - FastMCP: Server framework for API endpoints """ import yaml import json from datetime import datetime from kubernetes import client, config from kubernetes.client.rest import ApiException from mcp.server.fastmcp import FastMCP # Initialize FastMCP server mcp = FastMCP("k8s") # Kubernetes client configuration try: # Try to load from default kubeconfig config.load_kube_config() except Exception: # If running inside a pod try: config.load_incluster_config() except Exception as e: print(f"Failed to configure Kubernetes client: {e}") exit(1) # Initialize API clients core_v1 = client.CoreV1Api() apps_v1 = client.AppsV1Api() batch_v1 = client.BatchV1Api() custom_objects = client.CustomObjectsApi() @mcp.tool() def get_namespaces(): """ List all namespaces in the Kubernetes cluster. 
Returns: str: JSON string containing an array of namespace objects with fields: - name (str): Name of the namespace - status (str): Phase of the namespace (Active, Terminating) - creation_time (str): Timestamp when namespace was created Raises: ApiException: If there is an error communicating with the Kubernetes API """ try: namespaces = core_v1.list_namespace() result = [] for ns in namespaces.items: result.append( { "name": ns.metadata.name, "status": ns.status.phase, "creation_time": ns.metadata.creation_timestamp.strftime( "%Y-%m-%d %H:%M:%S" ) if ns.metadata.creation_timestamp else None, } ) return json.dumps(result) except ApiException as e: return json.dumps({"error": str(e)}), 500 @mcp.tool() def list_pods(namespace=None): """ Lists all pods in the specified Kubernetes namespace or across all namespaces. Retrieves detailed information about pods including their status, containers, and hosting node. Args: namespace (str, optional): The namespace to filter pods by. If None, pods from all namespaces will be returned. Defaults to None. Returns: str: JSON string containing an array of pod objects with fields: - name (str): Name of the pod - namespace (str): Namespace where the pod is running - phase (str): Current phase of the pod (Running, Pending, etc.) 
- ip (str): Pod IP address - node (str): Name of the node running this pod - containers (list): List of containers in the pod with their status - creation_time (str): Timestamp when pod was created Raises: ApiException: If there is an error communicating with the Kubernetes API """ try: if namespace: pods = core_v1.list_namespaced_pod(namespace) else: pods = core_v1.list_pod_for_all_namespaces() result = [] for pod in pods.items: containers = [] for container in pod.spec.containers: containers.append( { "name": container.name, "image": container.image, "ready": any( s.container_id is not None and s.name == container.name for s in pod.status.container_statuses ) if pod.status.container_statuses else False, } ) result.append( { "name": pod.metadata.name, "namespace": pod.metadata.namespace, "phase": pod.status.phase, "ip": pod.status.pod_ip, "node": pod.spec.node_name, "containers": containers, "creation_time": pod.metadata.creation_timestamp.strftime( "%Y-%m-%d %H:%M:%S" ) if pod.metadata.creation_timestamp else None, } ) return json.dumps(result) except ApiException as e: return json.dumps({"error": str(e)}), 500 @mcp.tool() def list_nodes(): """List all nodes and their status""" try: nodes = core_v1.list_node() result = [] for node in nodes.items: conditions = {} for condition in node.status.conditions: conditions[condition.type] = condition.status addresses = {} for address in node.status.addresses: addresses[address.type] = address.address # Get capacity and allocatable resources capacity = { "cpu": node.status.capacity.get("cpu"), "memory": node.status.capacity.get("memory"), "pods": node.status.capacity.get("pods"), } allocatable = { "cpu": node.status.allocatable.get("cpu"), "memory": node.status.allocatable.get("memory"), "pods": node.status.allocatable.get("pods"), } result.append( { "name": node.metadata.name, "conditions": conditions, "addresses": addresses, "capacity": capacity, "allocatable": allocatable, "kubelet_version": 
node.status.node_info.kubelet_version if node.status.node_info else None, } ) return json.dumps(result) except ApiException as e: return json.dumps({"error": str(e)}), 500 @mcp.tool() def list_deployments(namespace=None): """ List deployments with optional namespace filter Args: namespace (str, optional): The namespace to filter deployments by. If None, deployments from all namespaces will be returned. Defaults to None. """ try: if namespace: deployments = apps_v1.list_namespaced_deployment(namespace) else: deployments = apps_v1.list_deployment_for_all_namespaces() result = [] for deployment in deployments.items: result.append( { "name": deployment.metadata.name, "namespace": deployment.metadata.namespace, "replicas": deployment.spec.replicas, "available_replicas": deployment.status.available_replicas, "ready_replicas": deployment.status.ready_replicas, "strategy": deployment.spec.strategy.type, "creation_time": deployment.metadata.creation_timestamp.strftime( "%Y-%m-%d %H:%M:%S" ) if deployment.metadata.creation_timestamp else None, } ) return json.dumps(result) except ApiException as e: return json.dumps({"error": str(e)}), 500 @mcp.tool() def list_services(namespace=None): """ List services with optional namespace filter Args: namespace (str, optional): The namespace to filter services by. If None, services from all namespaces will be returned. Defaults to None.
""" try: if namespace: services = core_v1.list_namespaced_service(namespace) else: services = core_v1.list_service_for_all_namespaces() result = [] for service in services.items: ports = [] for port in service.spec.ports: ports.append( { "name": port.name, "port": port.port, "target_port": port.target_port, "protocol": port.protocol, "node_port": port.node_port if hasattr(port, "node_port") else None, } ) result.append( { "name": service.metadata.name, "namespace": service.metadata.namespace, "type": service.spec.type, "cluster_ip": service.spec.cluster_ip, "external_ip": service.spec.external_i_ps if hasattr(service.spec, "external_i_ps") else None, "ports": ports, "selector": service.spec.selector, "creation_time": service.metadata.creation_timestamp.strftime( "%Y-%m-%d %H:%M:%S" ) if service.metadata.creation_timestamp else None, } ) return json.dumps(result) except ApiException as e: return json.dumps({"error": str(e)}), 500 @mcp.tool() def list_events(namespace=None): """ List events with optional namespace filter Args: namespace (str, optional): The namespace to filter events by. If None, events from all namespaces will be returned. Defaults to None.
""" try: if namespace: events = core_v1.list_namespaced_event(namespace) else: events = core_v1.list_event_for_all_namespaces() result = [] for event in events.items: result.append( { "type": event.type, "reason": event.reason, "message": event.message, "object": f"{event.involved_object.kind}/{event.involved_object.name}", "namespace": event.metadata.namespace, "count": event.count, "first_time": event.first_timestamp.strftime("%Y-%m-%d %H:%M:%S") if event.first_timestamp else None, "last_time": event.last_timestamp.strftime("%Y-%m-%d %H:%M:%S") if event.last_timestamp else None, } ) # Sort by last_time (newest first) # TODO: fix issue with sorting # result.sort(key=lambda x: x.get("last_time", ""), reverse=True) return json.dumps(result) except ApiException as e: return json.dumps({"error": str(e)}), 500 @mcp.tool() def failed_pods(): """ List all pods in Failed or Error state across all namespaces. Identifies pods that are in a failed state, including those in CrashLoopBackOff, ImagePullBackOff, or other error states. Provides detailed container status information to aid in troubleshooting. 
Returns: str: JSON string containing an array of failed pod objects with fields: - name (str): Name of the pod - namespace (str): Namespace where the pod is running - phase (str): Current phase of the pod - container_statuses (list): Detailed status of each container including state, reason, exit codes, and restart counts - node (str): Name of the node running this pod - message (str): Status message from the pod, if any - reason (str): Reason for the current status, if any Raises: ApiException: If there is an error communicating with the Kubernetes API """ try: pods = core_v1.list_pod_for_all_namespaces() failed = [] for pod in pods.items: if pod.status.phase in ["Failed", "Error"] or any( s.state and s.state.waiting and s.state.waiting.reason in ["CrashLoopBackOff", "ImagePullBackOff", "ErrImagePull"] for s in (pod.status.container_statuses or []) if s.state and s.state.waiting ): container_statuses = [] if pod.status.container_statuses: for s in pod.status.container_statuses: state = {} if s.state.waiting: state = { "status": "waiting", "reason": s.state.waiting.reason, "message": s.state.waiting.message, } elif s.state.terminated: state = { "status": "terminated", "reason": s.state.terminated.reason, "exit_code": s.state.terminated.exit_code, "message": s.state.terminated.message, } container_statuses.append( { "name": s.name, "state": state, "restart_count": s.restart_count, } ) failed.append( { "name": pod.metadata.name, "namespace": pod.metadata.namespace, "phase": pod.status.phase, "container_statuses": container_statuses, "node": pod.spec.node_name, "message": pod.status.message if pod.status.message else None, "reason": pod.status.reason if pod.status.reason else None, } ) return json.dumps(failed) except ApiException as e: return json.dumps({"error": str(e)}), 500 @mcp.tool() def pending_pods(): """List all pods in Pending state and why they're pending""" try: pods = core_v1.list_pod_for_all_namespaces() pending = [] for pod in pods.items: if pod.status.phase ==
"Pending": # Check for events related to this pod events = core_v1.list_namespaced_event( pod.metadata.namespace, field_selector=f"involvedObject.name={pod.metadata.name},involvedObject.kind=Pod", ) pending_reason = "Unknown" pending_message = None # Get the latest event that might explain why it's pending if events.items: latest_event = max( events.items, key=lambda e: e.last_timestamp if e.last_timestamp else datetime.min, ) pending_reason = latest_event.reason pending_message = latest_event.message pending.append( { "name": pod.metadata.name, "namespace": pod.metadata.namespace, "node": pod.spec.node_name, "reason": pending_reason, "message": pending_message, "creation_time": pod.metadata.creation_timestamp.strftime( "%Y-%m-%d %H:%M:%S" ) if pod.metadata.creation_timestamp else None, } ) return json.dumps(pending) except ApiException as e: return json.dumps({"error": str(e)}), 500 @mcp.tool() def high_restart_pods(restart_threshold=5): """ Find pods with high restart counts (>5) Args: restart_threshold (int, optional): The minimum number of restarts required to include a pod in the results. Defaults to 5. """ try: pods = core_v1.list_pod_for_all_namespaces() high_restart = [] for pod in pods.items: high_restart_containers = [] if pod.status.container_statuses: for status in pod.status.container_statuses: if status.restart_count > restart_threshold: high_restart_containers.append( { "name": status.name, "restart_count": status.restart_count, "ready": status.ready, "image": status.image, } ) if high_restart_containers: high_restart.append( { "name": pod.metadata.name, "namespace": pod.metadata.namespace, "node": pod.spec.node_name, "containers": high_restart_containers, } ) return json.dumps(high_restart) except ApiException as e: return json.dumps({"error": str(e)}), 500 @mcp.tool() def node_capacity(): """ Show available capacity and resource utilization on all nodes. Calculates the current resource usage across all nodes, including: - Pod count vs. 
maximum pods per node - CPU requests vs. allocatable CPU - Memory requests vs. allocatable memory The function provides both raw values and percentage utilization to help identify nodes approaching resource limits. Returns: str: JSON string containing an array of node capacity objects with fields: - name (str): Name of the node - pods (dict): Pod capacity information - used (int): Number of pods running on the node - capacity (int): Maximum number of pods the node can run - percent_used (float): Percentage of pod capacity in use - cpu (dict): CPU resource information - requested (float): CPU cores requested by pods - allocatable (float): CPU cores available on the node - percent_used (float): Percentage of CPU capacity in use - memory (dict): Memory resource information - requested (int): Memory requested by pods in bytes - requested_human (str): Human-readable memory requested - allocatable (int): Memory available on the node in bytes - allocatable_human (str): Human-readable allocatable memory - percent_used (float): Percentage of memory capacity in use - conditions (dict): Node condition statuses Raises: ApiException: If there is an error communicating with the Kubernetes API """ try: nodes = core_v1.list_node() pods = core_v1.list_pod_for_all_namespaces() # Group pods by node node_pods = {} for pod in pods.items: if pod.spec.node_name: if pod.spec.node_name not in node_pods: node_pods[pod.spec.node_name] = [] node_pods[pod.spec.node_name].append(pod) results = [] for node in nodes.items: # Calculate pod count pod_count = len(node_pods.get(node.metadata.name, [])) max_pods = int(node.status.allocatable.get("pods", 0)) # Calculate CPU and memory utilization (rough estimate) node_pods_list = node_pods.get(node.metadata.name, []) cpu_request = 0 memory_request = 0 for pod in node_pods_list: for container in pod.spec.containers: if container.resources and container.resources.requests: if container.resources.requests.get("cpu"): cpu_str = 
container.resources.requests.get("cpu") if cpu_str.endswith("m"): cpu_request += int(cpu_str[:-1]) / 1000 else: cpu_request += float(cpu_str) if container.resources.requests.get("memory"): mem_str = container.resources.requests.get("memory") # Convert to bytes (rough approximation) if mem_str.endswith("Ki"): memory_request += int(mem_str[:-2]) * 1024 elif mem_str.endswith("Mi"): memory_request += int(mem_str[:-2]) * 1024 * 1024 elif mem_str.endswith("Gi"): memory_request += int(mem_str[:-2]) * 1024 * 1024 * 1024 else: memory_request += int(mem_str) # Convert allocatable CPU to cores cpu_allocatable = node.status.allocatable.get("cpu", "0") if cpu_allocatable.endswith("m"): cpu_allocatable = int(cpu_allocatable[:-1]) / 1000 else: cpu_allocatable = float(cpu_allocatable) # Convert allocatable memory to bytes mem_allocatable = node.status.allocatable.get("memory", "0") mem_bytes = 0 if mem_allocatable.endswith("Ki"): mem_bytes = int(mem_allocatable[:-2]) * 1024 elif mem_allocatable.endswith("Mi"): mem_bytes = int(mem_allocatable[:-2]) * 1024 * 1024 elif mem_allocatable.endswith("Gi"): mem_bytes = int(mem_allocatable[:-2]) * 1024 * 1024 * 1024 else: mem_bytes = int(mem_allocatable) results.append( { "name": node.metadata.name, "pods": { "used": pod_count, "capacity": max_pods, "percent_used": round((pod_count / max_pods) * 100, 2) if max_pods > 0 else 0, }, "cpu": { "requested": round(cpu_request, 2), "allocatable": round(cpu_allocatable, 2), "percent_used": round((cpu_request / cpu_allocatable) * 100, 2) if cpu_allocatable > 0 else 0, }, "memory": { "requested": memory_request, "requested_human": format_bytes(memory_request), "allocatable": mem_bytes, "allocatable_human": format_bytes(mem_bytes), "percent_used": round((memory_request / mem_bytes) * 100, 2) if mem_bytes > 0 else 0, }, "conditions": { cond.type: cond.status for cond in node.status.conditions }, } ) return json.dumps(results) except ApiException as e: return json.dumps({"error": str(e)}), 500 @mcp.tool() 
def orphaned_resources():
    """List resources that might be orphaned (no owner references)."""
    try:
        results = {
            "pods": [],
            "services": [],
            "persistent_volume_claims": [],
            "config_maps": [],
            "secrets": [],
        }

        # Check for orphaned pods
        pods = core_v1.list_pod_for_all_namespaces()
        for pod in pods.items:
            if (
                not pod.metadata.owner_references
                and not pod.metadata.name.startswith("kube-")
                and pod.metadata.namespace != "kube-system"
            ):
                results["pods"].append(
                    {
                        "name": pod.metadata.name,
                        "namespace": pod.metadata.namespace,
                        "creation_time": pod.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if pod.metadata.creation_timestamp
                        else None,
                    }
                )

        # Check for orphaned services
        services = core_v1.list_service_for_all_namespaces()
        for service in services.items:
            if (
                not service.metadata.owner_references
                and not service.metadata.name.startswith("kube-")
                and service.metadata.namespace != "kube-system"
                and service.metadata.name != "kubernetes"
            ):
                results["services"].append(
                    {
                        "name": service.metadata.name,
                        "namespace": service.metadata.namespace,
                        "creation_time": service.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if service.metadata.creation_timestamp
                        else None,
                    }
                )

        # Check for orphaned PVCs
        pvcs = core_v1.list_persistent_volume_claim_for_all_namespaces()
        for pvc in pvcs.items:
            if not pvc.metadata.owner_references:
                results["persistent_volume_claims"].append(
                    {
                        "name": pvc.metadata.name,
                        "namespace": pvc.metadata.namespace,
                        "creation_time": pvc.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if pvc.metadata.creation_timestamp
                        else None,
                    }
                )

        # Check for orphaned ConfigMaps
        config_maps = core_v1.list_config_map_for_all_namespaces()
        for cm in config_maps.items:
            if (
                not cm.metadata.owner_references
                and not cm.metadata.name.startswith("kube-")
                and cm.metadata.namespace != "kube-system"
            ):
                results["config_maps"].append(
                    {
                        "name": cm.metadata.name,
                        "namespace": cm.metadata.namespace,
                        "creation_time": cm.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if cm.metadata.creation_timestamp
                        else None,
                    }
                )

        # Check for orphaned Secrets
        secrets = core_v1.list_secret_for_all_namespaces()
        for secret in secrets.items:
            if (
                not secret.metadata.owner_references
                and not secret.metadata.name.startswith("kube-")
                and secret.metadata.namespace != "kube-system"
                and not secret.type.startswith("kubernetes.io/")
            ):
                results["secrets"].append(
                    {
                        "name": secret.metadata.name,
                        "namespace": secret.metadata.namespace,
                        "type": secret.type,
                        "creation_time": secret.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if secret.metadata.creation_timestamp
                        else None,
                    }
                )

        return json.dumps(results)
    except ApiException as e:
        return json.dumps({"error": str(e)})


@mcp.tool()
def get_resource_yaml(namespace, resource_type, resource_name):
    """
    Retrieves the YAML configuration for a specified Kubernetes resource.

    Fetches the complete configuration of a resource, which can be useful
    for debugging, documentation, or backup purposes.

    Args:
        namespace (str): The Kubernetes namespace containing the resource.
        resource_type (str): The type of resource to retrieve. Supported types:
            'pod', 'deployment', 'service', 'configmap', 'secret', 'job'.
            Unsupported types produce a JSON error message.
        resource_name (str): The name of the specific resource to retrieve.

    Returns:
        str: YAML string representation of the resource configuration.
    Raises:
        ApiException: If there is an error communicating with the Kubernetes API
    """
    try:
        resource_data = None
        if resource_type == "pod":
            resource_data = core_v1.read_namespaced_pod(resource_name, namespace)
        elif resource_type == "deployment":
            resource_data = apps_v1.read_namespaced_deployment(resource_name, namespace)
        elif resource_type == "service":
            resource_data = core_v1.read_namespaced_service(resource_name, namespace)
        elif resource_type == "configmap":
            resource_data = core_v1.read_namespaced_config_map(resource_name, namespace)
        elif resource_type == "secret":
            resource_data = core_v1.read_namespaced_secret(resource_name, namespace)
        elif resource_type == "job":
            resource_data = batch_v1.read_namespaced_job(resource_name, namespace)
        else:
            return json.dumps({"error": f"Unsupported resource type: {resource_type}"})

        # Convert to dict and then to YAML
        resource_dict = client.ApiClient().sanitize_for_serialization(resource_data)
        yaml_str = yaml.dump(resource_dict, default_flow_style=False)
        return yaml_str
    except ApiException as e:
        return json.dumps({"error": str(e)})


# Helper function to format bytes into human-readable format
def format_bytes(size):
    """
    Format bytes to human readable string.

    Converts a byte value to a human-readable string with appropriate units
    (B, KiB, MiB, GiB, TiB).

    Args:
        size (int): Size in bytes

    Returns:
        str: Human-readable string representation of the size (e.g., "2.5 MiB")
    """
    power = 2**10
    n = 0
    power_labels = {0: "B", 1: "KiB", 2: "MiB", 3: "GiB", 4: "TiB"}
    while size > power:
        size /= power
        n += 1
    return f"{round(size, 2)} {power_labels[n]}"


if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport="stdio")
```
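
The CPU and memory unit handling in `node_capacity` (the `m` milli-core suffix; the binary `Ki`/`Mi`/`Gi` memory suffixes) can be exercised without a cluster. A minimal standalone sketch — `parse_cpu` and `parse_memory` are illustrative names, not functions defined in server.py:

```python
def parse_cpu(quantity: str) -> float:
    """Convert a Kubernetes CPU quantity ("500m" or "2") to cores."""
    if quantity.endswith("m"):
        return int(quantity[:-1]) / 1000
    return float(quantity)


def parse_memory(quantity: str) -> int:
    """Convert a Kubernetes memory quantity to bytes (binary suffixes only,
    matching the rough approximation used in node_capacity)."""
    units = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3}
    for suffix, factor in units.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    return int(quantity)  # plain byte count


print(parse_cpu("500m"))      # 0.5
print(parse_memory("256Mi"))  # 268435456
```

Note that the real Kubernetes quantity grammar also allows decimal suffixes (`K`, `M`, `G`) and exponent notation, which this sketch — like `node_capacity` itself — does not handle.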