# Directory Structure

```
├── .gitignore
├── .python-version
├── assets
│   └── logo.svg
├── LICENSE
├── Makefile
├── pyproject.toml
├── README.md
├── server.py
├── test_server.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#uv.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Custom
.DS_Store
.vscode/
node_modules/
theme/docs/*
.ruff_cache/

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
<img src="./assets/logo.svg" alt="Allseer Logo" width="400" height="400" />

# k8s-mcp
[![smithery badge](https://smithery.ai/badge/@vlttnv/k8s-mcp)](https://smithery.ai/server/@vlttnv/k8s-mcp)

A Python-based, read-only [Model Context Protocol (MCP)](https://modelcontextprotocol.io/introduction) server for Kubernetes clusters that exposes a comprehensive API to retrieve cluster information and diagnose issues.

[Example chat using Claude](https://claude.ai/share/90ae39d3-a0c1-4065-ab79-45950b6b4806)

## Installation

### Prerequisites

- Python 3.13+ (see `requires-python` in `pyproject.toml`)
- Access to a Kubernetes cluster (via kubeconfig or in-cluster configuration)
- Required Python packages (see `dependencies` in `pyproject.toml`)
- uv - https://github.com/astral-sh/uv

```bash
# To install uv
curl -LsSf https://astral.sh/uv/install.sh | sh
```

```bash
# Clone the repository
git clone git@github.com:vlttnv/k8s-mcp.git
cd k8s-mcp

# Install dependencies
uv venv
source .venv/bin/activate
uv sync
```

If you use Claude, open the Claude for Desktop app configuration at `~/Library/Application Support/Claude/claude_desktop_config.json` in a text editor. Create the file if it doesn't exist.

```bash
code ~/Library/Application\ Support/Claude/claude_desktop_config.json
```

```json
{
    "mcpServers": {
        "k8s-mcp": {
            "command": "uv",
            "args": [
                "--directory",
                "/ABSOLUTE/PATH/TO/PARENT/FOLDER/k8s-mcp",
                "run",
                "server.py"
            ]
        }
    }
}
```

> You may need to put the full path to the `uv` executable in the `command` field. You can get this by running `which uv` on macOS/Linux or `where uv` on Windows.
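
As a rough illustration of the entry above, the following Python sketch builds the same `mcpServers` structure and fills in absolute paths automatically. The function name `build_claude_config` and its parameters are hypothetical and not part of this project; it relies only on `shutil.which`, `pathlib`, and `json` from the standard library.

```python
import json
import shutil
from pathlib import Path


def build_claude_config(project_dir: str, uv_command: str = "uv") -> dict:
    """Build an mcpServers entry like the one shown above (hypothetical helper)."""
    # Prefer the full path to the uv executable when it is found on PATH;
    # otherwise fall back to the bare command name.
    uv_path = shutil.which(uv_command) or uv_command
    return {
        "mcpServers": {
            "k8s-mcp": {
                "command": uv_path,
                "args": [
                    "--directory",
                    str(Path(project_dir).resolve()),  # absolute path to the checkout
                    "run",
                    "server.py",
                ],
            }
        }
    }


if __name__ == "__main__":
    # Print the JSON so it can be pasted into claude_desktop_config.json.
    print(json.dumps(build_claude_config("."), indent=4))
```

Run from inside the cloned `k8s-mcp` directory, this prints a configuration whose `--directory` argument is already absolute.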

## Configuration

The application automatically tries two methods to connect to your Kubernetes cluster:

1. **Kubeconfig File**: Uses your local kubeconfig file (typically located at `~/.kube/config`)
2. **In-Cluster Configuration**: If running inside a Kubernetes pod, uses the service account token

No additional configuration is required if your kubeconfig is properly set up or if you're running inside a cluster with appropriate RBAC permissions.
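
The fallback order described above can be sketched as a small helper that tries each method in turn. This is a minimal illustration, not the code in `server.py`; the name `load_first_available` is hypothetical, and the loaders are passed in as plain callables so the sketch runs without a cluster.

```python
from typing import Callable, List, Sequence, Tuple


def load_first_available(loaders: Sequence[Tuple[str, Callable[[], None]]]) -> str:
    """Try each (name, loader) pair in order and return the name that succeeded."""
    errors: List[str] = []
    for name, loader in loaders:
        try:
            loader()
            return name
        except Exception as exc:  # any failure means "try the next method"
            errors.append(f"{name}: {exc}")
    # Every method failed: report all reasons together.
    raise RuntimeError(
        "No Kubernetes configuration could be loaded (" + "; ".join(errors) + ")"
    )
```

With the `kubernetes` package installed, the two loaders would be `config.load_kube_config` and `config.load_incluster_config`, passed in that order.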

## Usage

### Examples
Here are some useful example prompts you can ask Claude about your Kubernetes cluster and its resources:

#### General Cluster Status
- "What's the overall health of my cluster?"
- "Show me all namespaces in my cluster"
- "What nodes are available in my cluster and what's their status?"
- "How is resource utilization across my nodes?"

#### Pods and Deployments
- "List all pods in the production namespace"
- "Are there any pods in CrashLoopBackOff state?"
- "Show me pods with high restart counts"
- "List all deployments across all namespaces"
- "What deployments are failing to progress?"

#### Debugging Issues
- "Why is my pod in the staging namespace failing?"
- "Get the YAML configuration for the service in the production namespace"
- "Show me recent events in the default namespace"
- "Are there any pods stuck in Pending state?"
- "What's causing ImagePullBackOff errors in my cluster?"

#### Resource Management
- "Show me the resource consumption of nodes in my cluster"
- "Are there any orphaned resources I should clean up?"
- "List all services in the production namespace"
- "Compare resource requests between staging and production"

#### Specific Resource Inspection
- "Show me the config for the coredns deployment in kube-system"
- "Get details of the reverse-proxy service in staging"
- "What containers are running in the pod xyz?"
- "Show me the logs for the failing pod"

## API Reference

### Namespaces

- `get_namespaces()`: List all available namespaces in the cluster

### Pods

- `list_pods(namespace=None)`: List all pods, optionally filtered by namespace
- `failed_pods()`: List all pods in Failed or Error state
- `pending_pods()`: List all pods in Pending state with reasons
- `high_restart_pods(restart_threshold=5)`: Find pods with restart counts above threshold

### Nodes

- `list_nodes()`: List all nodes and their status
- `node_capacity()`: Show available capacity on all nodes

### Deployments & Services

- `list_deployments(namespace=None)`: List all deployments
- `list_services(namespace=None)`: List all services
- `list_events(namespace=None)`: List all events

### Resource Management

- `orphaned_resources()`: List resources without owner references
- `get_resource_yaml(namespace, resource_type, resource_name)`: Get YAML configuration for a specific resource
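
To make the `restart_threshold` parameter of `high_restart_pods` concrete, here is a minimal sketch of threshold filtering over plain dictionaries. It does not reproduce the server's implementation: the function name `pods_above_restart_threshold`, the input shape, and the choice of a strict "greater than" comparison per container are all assumptions made for illustration.

```python
from typing import Dict, List


def pods_above_restart_threshold(
    pods: List[Dict], restart_threshold: int = 5
) -> List[Dict]:
    """Return pods where any container restarted more than restart_threshold times."""
    flagged = []
    for pod in pods:
        # Highest restart count among the pod's containers (0 if it has none).
        max_restarts = max(
            (c["restart_count"] for c in pod.get("containers", [])), default=0
        )
        if max_restarts > restart_threshold:  # strict comparison (assumption)
            flagged.append(
                {
                    "name": pod["name"],
                    "namespace": pod["namespace"],
                    "max_restarts": max_restarts,
                }
            )
    return flagged
```

A pod whose busiest container has restarted exactly five times is not flagged with the default threshold; lowering the threshold widens the result.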

## License

[MIT License](LICENSE)

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "k8s-mcp"
version = "0.1.0"
description = "A read-only Model Context Protocol (MCP) server for querying Kubernetes clusters."
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "kubernetes>=32.0.1",
    "mcp[cli]>=1.3.0",
    "pytest>=8.3.5",
    "pyyaml>=6.0.2",
]

[dependency-groups]
dev = [
    "ruff>=0.9.9",
]

[tool.ruff.lint]
select = [
    # isort
    "I",
]

```

--------------------------------------------------------------------------------
/assets/logo.svg:
--------------------------------------------------------------------------------

```
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 400 400">
    <!-- Background -->
    <rect width="400" height="400" rx="12" ry="12" fill="#f8fafc" />

    <!-- Server stack background -->
    <rect x="120" y="100" width="160" height="180" rx="8" ry="8" fill="#ebf5ff" stroke="#2563eb" stroke-width="2" />

    <!-- Server components -->
    <rect x="140" y="125" width="120" height="30" rx="4" ry="4" fill="#ffffff" stroke="#2563eb" stroke-width="2" />
    <rect x="140" y="165" width="120" height="30" rx="4" ry="4" fill="#ffffff" stroke="#2563eb" stroke-width="2" />
    <rect x="140" y="205" width="120" height="30" rx="4" ry="4" fill="#ffffff" stroke="#2563eb" stroke-width="2" />

    <!-- Data points on servers -->
    <circle cx="155" cy="140" r="4" fill="#2563eb" />
    <circle cx="170" cy="140" r="4" fill="#2563eb" />
    <circle cx="155" cy="180" r="4" fill="#2563eb" />
    <circle cx="170" cy="180" r="4" fill="#2563eb" />
    <circle cx="155" cy="220" r="4" fill="#2563eb" />
    <circle cx="170" cy="220" r="4" fill="#2563eb" />

    <!-- Status indicators -->
    <circle cx="245" cy="140" r="6" fill="#10b981" />
    <circle cx="245" cy="180" r="6" fill="#10b981" />
    <circle cx="245" cy="220" r="6" fill="#10b981" />

    <!-- Python logo (simplified) -->
    <!-- <g transform="translate(200, 80) scale(0.6)">
      <path d="M-20,-35 C-20,-43.28 -13.28,-50 -5,-50 L5,-50 C13.28,-50 20,-43.28 20,-35 L20,-15 C20,-6.72 13.28,0 5,0 L-5,0 C-13.28,0 -20,-6.72 -20,-15 Z" fill="#366a96" />
      <path d="M-20,35 C-20,43.28 -13.28,50 -5,50 L5,50 C13.28,50 20,43.28 20,35 L20,15 C20,6.72 13.28,0 5,0 L-5,0 C-13.28,0 -20,6.72 -20,15 Z" fill="#ffd43b" />
      <circle cx="-10" cy="-25" r="5" fill="#ffffff" />
      <circle cx="-10" cy="25" r="5" fill="#ffffff" />
    </g> -->

    <!-- Kubernetes wheel (simplified) -->
    <g transform="translate(200, 260) scale(0.7)">
      <circle cx="0" cy="0" r="40" fill="#326ce5" opacity="0.2" />
      <circle cx="0" cy="0" r="30" fill="#326ce5" opacity="0.3" />
      <g stroke="#326ce5" stroke-width="6" stroke-linecap="round">
        <line x1="0" y1="-40" x2="0" y2="-60" />
        <line x1="28" y1="-28" x2="42" y2="-42" />
        <line x1="40" y1="0" x2="60" y2="0" />
        <line x1="28" y1="28" x2="42" y2="42" />
        <line x1="0" y1="40" x2="0" y2="60" />
        <line x1="-28" y1="28" x2="-42" y2="42" />
        <line x1="-40" y1="0" x2="-60" y2="0" />
        <line x1="-28" y1="-28" x2="-42" y2="-42" />
      </g>
    </g>

    <!-- API connections -->
    <path d="M95,150 C75,150 75,180 95,180" fill="none" stroke="#7c3aed" stroke-width="2.5" stroke-dasharray="5,3" />
    <path d="M95,180 C75,180 75,210 95,210" fill="none" stroke="#7c3aed" stroke-width="2.5" stroke-dasharray="5,3" />
    <path d="M305,150 C325,150 325,180 305,180" fill="none" stroke="#7c3aed" stroke-width="2.5" stroke-dasharray="5,3" />
    <path d="M305,180 C325,180 325,210 305,210" fill="none" stroke="#7c3aed" stroke-width="2.5" stroke-dasharray="5,3" />

    <!-- API endpoint indicators -->
    <circle cx="95" cy="150" r="4" fill="#7c3aed" />
    <circle cx="95" cy="180" r="4" fill="#7c3aed" />
    <circle cx="95" cy="210" r="4" fill="#7c3aed" />
    <circle cx="305" cy="150" r="4" fill="#7c3aed" />
    <circle cx="305" cy="180" r="4" fill="#7c3aed" />
    <circle cx="305" cy="210" r="4" fill="#7c3aed" />

    <!-- MCP Badge -->
    <g transform="translate(200,260)">
      <rect x="-45" y="-20" width="90" height="40" rx="20" ry="20" fill="#ffffff" stroke="#7c3aed" stroke-width="2.5" />
      <text x="0" y="8" font-family="Arial, sans-serif" font-size="18" font-weight="bold" text-anchor="middle" fill="#7c3aed">MCP</text>
    </g>

    <!-- Title and description -->
    <text x="200" y="330" font-family="Arial, sans-serif" font-weight="bold" font-size="28" text-anchor="middle" fill="#1e293b">k8s-mcp</text>
    <text x="200" y="355" font-family="Arial, sans-serif" font-size="14" text-anchor="middle" fill="#475569">Read-only MCP server for Kubernetes clusters</text>
  </svg>

```

--------------------------------------------------------------------------------
/test_server.py:
--------------------------------------------------------------------------------

```python
import unittest
from unittest.mock import patch, MagicMock
import json
import datetime
import asyncio
from kubernetes.client.rest import ApiException

# Import the module to be tested
import server


class AsyncTestCase(unittest.TestCase):
    """Base class for testing async functions."""

    def run_async(self, coro):
        """Helper method to run coroutines in tests."""
        return asyncio.run(coro)


class TestKubernetesServer(AsyncTestCase):
    """Test cases for Kubernetes monitoring server functions."""

    def setUp(self):
        """Set up test fixtures."""
        # Mock configuration and API clients
        self.mock_config = patch("server.config").start()
        self.mock_core_v1 = patch("server.core_v1").start()
        self.mock_apps_v1 = patch("server.apps_v1").start()
        self.mock_batch_v1 = patch("server.batch_v1").start()
        self.mock_custom_objects = patch("server.custom_objects").start()

        # Mock FastMCP server
        self.mock_mcp = patch("server.mcp").start()

    def tearDown(self):
        """Tear down test fixtures."""
        patch.stopall()

    def test_get_namespaces(self):
        """Test get_namespaces function."""
        # Create mock namespace items
        mock_namespace1 = MagicMock()
        mock_namespace1.metadata.name = "default"
        mock_namespace1.status.phase = "Active"
        mock_namespace1.metadata.creation_timestamp = datetime.datetime(
            2023, 1, 1, 12, 0, 0
        )

        mock_namespace2 = MagicMock()
        mock_namespace2.metadata.name = "kube-system"
        mock_namespace2.status.phase = "Active"
        mock_namespace2.metadata.creation_timestamp = datetime.datetime(
            2023, 1, 1, 12, 0, 0
        )

        # Set up mock response
        mock_response = MagicMock()
        mock_response.items = [mock_namespace1, mock_namespace2]
        self.mock_core_v1.list_namespace.return_value = mock_response

        # Call the function (get_namespaces is synchronous, so no event loop is needed)
        result = server.get_namespaces()

        # Verify the response
        namespaces = json.loads(result)
        self.assertEqual(len(namespaces), 2)
        self.assertEqual(namespaces[0]["name"], "default")
        self.assertEqual(namespaces[1]["name"], "kube-system")

        # Verify the API was called
        self.mock_core_v1.list_namespace.assert_called_once()

    def test_get_namespaces_error(self):
        """Test get_namespaces function with API error."""
        # Simulate API exception
        self.mock_core_v1.list_namespace.side_effect = ApiException(
            status=403, reason="Forbidden"
        )

        # Call the function
        result_tuple = server.get_namespaces()
        # If the function returns a tuple
        if isinstance(result_tuple, tuple):
            result, status_code = result_tuple
        else:
            # If function returns just the error JSON
            result = result_tuple
            status_code = 500  # Assuming default error code

        # Verify error response
        error_response = json.loads(result)
        self.assertEqual(status_code, 500)
        self.assertIn("error", error_response)

    def test_list_pods(self):
        """Test list_pods function with namespace parameter."""
        # Create mock pod items
        mock_pod = MagicMock()
        mock_pod.metadata.name = "test-pod"
        mock_pod.metadata.namespace = "default"
        mock_pod.status.phase = "Running"
        mock_pod.status.pod_ip = "10.0.0.1"
        mock_pod.spec.node_name = "node1"
        mock_pod.metadata.creation_timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)

        # Create mock container
        mock_container = MagicMock()
        mock_container.name = "test-container"
        mock_container.image = "nginx:latest"
        mock_pod.spec.containers = [mock_container]

        # Create mock container status
        mock_container_status = MagicMock()
        mock_container_status.name = "test-container"
        mock_container_status.container_id = "container123"
        mock_pod.status.container_statuses = [mock_container_status]

        # Set up mock response
        mock_response = MagicMock()
        mock_response.items = [mock_pod]
        self.mock_core_v1.list_namespaced_pod.return_value = mock_response

        # Call the function with namespace
        result = server.list_pods(namespace="default")

        # Verify the response
        pods = json.loads(result)
        self.assertEqual(len(pods), 1)
        self.assertEqual(pods[0]["name"], "test-pod")
        self.assertEqual(pods[0]["namespace"], "default")
        self.assertEqual(pods[0]["containers"][0]["name"], "test-container")
        self.assertTrue(pods[0]["containers"][0]["ready"])

        # Verify the API was called with correct namespace
        self.mock_core_v1.list_namespaced_pod.assert_called_once_with("default")

    def test_list_pods_all_namespaces(self):
        """Test list_pods function without namespace parameter."""
        # Set up mock response
        mock_response = MagicMock()
        mock_response.items = []
        self.mock_core_v1.list_pod_for_all_namespaces.return_value = mock_response

        # Call the function without namespace
        result = server.list_pods()  # noqa: F841

        # Verify the API was called for all namespaces
        self.mock_core_v1.list_pod_for_all_namespaces.assert_called_once()

    def test_list_nodes(self):
        """Test list_nodes function."""
        # Create mock node
        mock_node = MagicMock()
        mock_node.metadata.name = "node1"

        # Mock node conditions
        mock_condition = MagicMock()
        mock_condition.type = "Ready"
        mock_condition.status = "True"
        mock_node.status.conditions = [mock_condition]

        # Mock node addresses
        mock_address = MagicMock()
        mock_address.type = "InternalIP"
        mock_address.address = "192.168.1.1"
        mock_node.status.addresses = [mock_address]

        # Mock node capacity
        mock_node.status.capacity = {"cpu": "4", "memory": "8Gi", "pods": "110"}
        mock_node.status.allocatable = {"cpu": "3800m", "memory": "7Gi", "pods": "100"}

        # Mock node info
        mock_node.status.node_info = MagicMock()
        mock_node.status.node_info.kubelet_version = "v1.25.0"

        # Set up mock response
        mock_response = MagicMock()
        mock_response.items = [mock_node]
        self.mock_core_v1.list_node.return_value = mock_response

        # Call the function
        result = server.list_nodes()

        # Verify the response
        nodes = json.loads(result)
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0]["name"], "node1")
        self.assertEqual(nodes[0]["conditions"]["Ready"], "True")
        self.assertEqual(nodes[0]["addresses"]["InternalIP"], "192.168.1.1")
        self.assertEqual(nodes[0]["capacity"]["cpu"], "4")
        self.assertEqual(nodes[0]["allocatable"]["memory"], "7Gi")
        self.assertEqual(nodes[0]["kubelet_version"], "v1.25.0")

        # Verify the API was called
        self.mock_core_v1.list_node.assert_called_once()

    def test_failed_pods(self):
        """Test failed_pods function."""
        # Create mock failed pod
        mock_pod = MagicMock()
        mock_pod.metadata.name = "failed-pod"
        mock_pod.metadata.namespace = "default"
        mock_pod.status.phase = "Failed"
        mock_pod.spec.node_name = "node1"
        mock_pod.status.message = "Pod failed"
        mock_pod.status.reason = "Error"

        # Create mock container status
        mock_container_status = MagicMock()
        mock_container_status.name = "test-container"
        mock_container_status.restart_count = 3

        # Create mock container state
        mock_container_status.state = MagicMock()
        mock_container_status.state.waiting = MagicMock()
        mock_container_status.state.waiting.reason = "CrashLoopBackOff"
        mock_container_status.state.waiting.message = "Container crashed"
        mock_container_status.state.terminated = None

        mock_pod.status.container_statuses = [mock_container_status]

        # Set up mock response
        mock_response = MagicMock()
        mock_response.items = [mock_pod]
        self.mock_core_v1.list_pod_for_all_namespaces.return_value = mock_response

        # Call the function
        result = server.failed_pods()

        # Verify the response
        failed = json.loads(result)
        self.assertEqual(len(failed), 1)
        self.assertEqual(failed[0]["name"], "failed-pod")
        self.assertEqual(failed[0]["phase"], "Failed")
        self.assertEqual(failed[0]["container_statuses"][0]["name"], "test-container")
        self.assertEqual(
            failed[0]["container_statuses"][0]["state"]["reason"], "CrashLoopBackOff"
        )

        # Verify the API was called
        self.mock_core_v1.list_pod_for_all_namespaces.assert_called_once()

    def test_get_resource_yaml(self):
        """Test get_resource_yaml function."""
        # Create mock API client
        mock_api_client = MagicMock()
        server.client.ApiClient.return_value = mock_api_client

        # Create mock resource
        mock_resource = MagicMock()
        self.mock_core_v1.read_namespaced_pod.return_value = mock_resource

        # Set up serialization
        mock_dict = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {"name": "test-pod"},
        }
        mock_api_client.sanitize_for_serialization.return_value = mock_dict

        # Mock the yaml dump function to ensure consistent output
        with patch("server.yaml.dump") as mock_yaml_dump:
            mock_yaml_dump.return_value = (
                "apiVersion: v1\nkind: Pod\nmetadata:\n  name: test-pod\n"
            )

            # Call the function
            result = server.get_resource_yaml("default", "pod", "test-pod")

            # Verify YAML output is what we expect based on our mock
            self.assertEqual(
                result, "apiVersion: v1\nkind: Pod\nmetadata:\n  name: test-pod\n"
            )

            # Verify yaml.dump was called with the correct parameters
            mock_yaml_dump.assert_called_once_with(mock_dict, default_flow_style=False)

        # Verify API calls
        self.mock_core_v1.read_namespaced_pod.assert_called_once_with(
            "test-pod", "default"
        )
        mock_api_client.sanitize_for_serialization.assert_called_once_with(
            mock_resource
        )

    def test_get_resource_yaml_unsupported_type(self):
        """Test get_resource_yaml function with unsupported resource type."""
        # Call the function with unsupported type
        result, status_code = server.get_resource_yaml(
            "default", "unknown", "resource-name"
        )

        # Verify error response
        error_response = json.loads(result)
        self.assertEqual(status_code, 400)
        self.assertIn("error", error_response)
        self.assertIn("Unsupported resource type", error_response["error"])

    def test_format_bytes(self):
        """Test format_bytes helper function."""
        # Test various sizes
        self.assertEqual(server.format_bytes(500), "500 B")
        self.assertEqual(server.format_bytes(1024), "1024 B")
        self.assertEqual(server.format_bytes(1536), "1.5 KiB")
        self.assertEqual(server.format_bytes(2 * 1024 * 1024), "2.0 MiB")
        self.assertEqual(server.format_bytes(3 * 1024 * 1024 * 1024), "3.0 GiB")


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
"""
Kubernetes Monitoring API Server

This module provides a FastMCP server that exposes Kubernetes monitoring APIs.
It connects to a Kubernetes cluster and provides endpoints to query various
cluster resources including pods, services, deployments, nodes, and events.

Dependencies:
    - kubernetes: Python client for Kubernetes
    - yaml: For YAML serialization
    - FastMCP: Server framework for API endpoints
"""

import yaml
import json
from datetime import datetime
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from mcp.server.fastmcp import FastMCP


# Initialize FastMCP server
mcp = FastMCP("k8s")

# Kubernetes client configuration
try:
    # Try to load from default kubeconfig
    config.load_kube_config()
except Exception:
    # If running inside a pod
    try:
        config.load_incluster_config()
    except Exception as e:
        # Exit with status 1; the message is written to stderr, not stdout
        raise SystemExit(f"Failed to configure Kubernetes client: {e}")

# Initialize API clients
core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
batch_v1 = client.BatchV1Api()
custom_objects = client.CustomObjectsApi()


@mcp.tool()
def get_namespaces():
    """
    List all namespaces in the Kubernetes cluster.

    Returns:
        str: JSON string containing an array of namespace objects with fields:
            - name (str): Name of the namespace
            - status (str): Phase of the namespace (Active, Terminating)
            - creation_time (str): Timestamp when namespace was created

    Note:
        ApiException is caught rather than raised; on an API error the function
        returns a tuple of (JSON string with an "error" key, 500).
    """
    try:
        namespaces = core_v1.list_namespace()
        result = []
        for ns in namespaces.items:
            result.append(
                {
                    "name": ns.metadata.name,
                    "status": ns.status.phase,
                    "creation_time": ns.metadata.creation_timestamp.strftime(
                        "%Y-%m-%d %H:%M:%S"
                    )
                    if ns.metadata.creation_timestamp
                    else None,
                }
            )
        return json.dumps(result)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def list_pods(namespace=None):
    """
    Lists all pods in the specified Kubernetes namespace or across all namespaces.

    Retrieves detailed information about pods including their status, containers,
    and hosting node.

    Args:
        namespace (str, optional): The namespace to filter pods by.
            If None, pods from all namespaces will be returned. Defaults to None.

    Returns:
        str: JSON string containing an array of pod objects with fields:
            - name (str): Name of the pod
            - namespace (str): Namespace where the pod is running
            - phase (str): Current phase of the pod (Running, Pending, etc.)
            - ip (str): Pod IP address
            - node (str): Name of the node running this pod
            - containers (list): List of containers in the pod with their status
            - creation_time (str): Timestamp when pod was created

    Note:
        ApiException is caught rather than raised; on an API error the function
        returns a tuple of (JSON string with an "error" key, 500).
    """

    try:
        if namespace:
            pods = core_v1.list_namespaced_pod(namespace)
        else:
            pods = core_v1.list_pod_for_all_namespaces()

        result = []
        for pod in pods.items:
            containers = []
            for container in pod.spec.containers:
                containers.append(
                    {
                        "name": container.name,
                        "image": container.image,
                        "ready": any(
                            s.container_id is not None and s.name == container.name
                            for s in pod.status.container_statuses
                        )
                        if pod.status.container_statuses
                        else False,
                    }
                )

            result.append(
                {
                    "name": pod.metadata.name,
                    "namespace": pod.metadata.namespace,
                    "phase": pod.status.phase,
                    "ip": pod.status.pod_ip,
                    "node": pod.spec.node_name,
                    "containers": containers,
                    "creation_time": pod.metadata.creation_timestamp.strftime(
                        "%Y-%m-%d %H:%M:%S"
                    )
                    if pod.metadata.creation_timestamp
                    else None,
                }
            )
        return json.dumps(result)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def list_nodes():
    """List all nodes and their status"""
    try:
        nodes = core_v1.list_node()
        result = []
        for node in nodes.items:
            conditions = {}
            for condition in node.status.conditions:
                conditions[condition.type] = condition.status

            addresses = {}
            for address in node.status.addresses:
                addresses[address.type] = address.address

            # Get capacity and allocatable resources
            capacity = {
                "cpu": node.status.capacity.get("cpu"),
                "memory": node.status.capacity.get("memory"),
                "pods": node.status.capacity.get("pods"),
            }

            allocatable = {
                "cpu": node.status.allocatable.get("cpu"),
                "memory": node.status.allocatable.get("memory"),
                "pods": node.status.allocatable.get("pods"),
            }

            result.append(
                {
                    "name": node.metadata.name,
                    "conditions": conditions,
                    "addresses": addresses,
                    "capacity": capacity,
                    "allocatable": allocatable,
                    "kubelet_version": node.status.node_info.kubelet_version
                    if node.status.node_info
                    else None,
                }
            )
        return json.dumps(result)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def list_deployments(namespace=None):
    """
    List deployments with optional namespace filter

    Args:
        namespace (str, optional): The namespace to filter deployments by.
            If None, deployments from all namespaces are returned. Defaults to None.
    """
    try:
        if namespace:
            deployments = apps_v1.list_namespaced_deployment(namespace)
        else:
            deployments = apps_v1.list_deployment_for_all_namespaces()

        result = []
        for deployment in deployments.items:
            result.append(
                {
                    "name": deployment.metadata.name,
                    "namespace": deployment.metadata.namespace,
                    "replicas": deployment.spec.replicas,
                    "available_replicas": deployment.status.available_replicas,
                    "ready_replicas": deployment.status.ready_replicas,
                    "strategy": deployment.spec.strategy.type,
                    "creation_time": deployment.metadata.creation_timestamp.strftime(
                        "%Y-%m-%d %H:%M:%S"
                    )
                    if deployment.metadata.creation_timestamp
                    else None,
                }
            )
        return json.dumps(result)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def list_services(namespace=None):
    """
    List services with optional namespace filter

    Args:
        namespace (str, optional): Name of the namespace to list services from.
            If None, services from all namespaces are returned. Defaults to None.
    """
    try:
        if namespace:
            services = core_v1.list_namespaced_service(namespace)
        else:
            services = core_v1.list_service_for_all_namespaces()

        result = []
        for service in services.items:
            ports = []
            # spec.ports can be None (e.g. ExternalName services), so fall back to []
            for port in service.spec.ports or []:
                ports.append(
                    {
                        "name": port.name,
                        "port": port.port,
                        "target_port": port.target_port,
                        "protocol": port.protocol,
                        "node_port": port.node_port
                        if hasattr(port, "node_port")
                        else None,
                    }
                )

            result.append(
                {
                    "name": service.metadata.name,
                    "namespace": service.metadata.namespace,
                    "type": service.spec.type,
                    "cluster_ip": service.spec.cluster_ip,
                    "external_ip": service.spec.external_i_ps
                    if hasattr(service.spec, "external_i_ps")
                    else None,
                    "ports": ports,
                    "selector": service.spec.selector,
                    "creation_time": service.metadata.creation_timestamp.strftime(
                        "%Y-%m-%d %H:%M:%S"
                    )
                    if service.metadata.creation_timestamp
                    else None,
                }
            )
        return json.dumps(result)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def list_events(namespace=None):
    """
    List events with optional namespace filter

    Args:
        namespace (str, optional): Name of the namespace to list events from.
            If None, events from all namespaces are returned. Defaults to None.
    """
    try:
        if namespace:
            events = core_v1.list_namespaced_event(namespace)
        else:
            events = core_v1.list_event_for_all_namespaces()

        result = []
        for event in events.items:
            result.append(
                {
                    "type": event.type,
                    "reason": event.reason,
                    "message": event.message,
                    "object": f"{event.involved_object.kind}/{event.involved_object.name}",
                    "namespace": event.metadata.namespace,
                    "count": event.count,
                    "first_time": event.first_timestamp.strftime("%Y-%m-%d %H:%M:%S")
                    if event.first_timestamp
                    else None,
                    "last_time": event.last_timestamp.strftime("%Y-%m-%d %H:%M:%S")
                    if event.last_timestamp
                    else None,
                }
            )
        # Sort by last_time (newest first). last_time can be None, which cannot be
        # compared with a string, so treat it as "" to place those events last.
        result.sort(key=lambda x: x["last_time"] or "", reverse=True)
        return json.dumps(result)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def failed_pods():
    """
    List all pods in Failed or Error state across all namespaces.

    Identifies pods that are in a failed state, including those in CrashLoopBackOff,
    ImagePullBackOff, or other error states. Provides detailed container status
    information to aid in troubleshooting.

    Returns:
        str: JSON string containing an array of failed pod objects with fields:
            - name (str): Name of the pod
            - namespace (str): Namespace where the pod is running
            - phase (str): Current phase of the pod
            - container_statuses (list): Detailed status of each container
              including state, reason, exit codes, and restart counts
            - node (str): Name of the node running this pod
            - message (str): Status message from the pod, if any
            - reason (str): Reason for the current status, if any

    Note:
        Kubernetes API errors are caught and returned as a JSON object with an
        'error' field rather than raised.
    """
    try:
        pods = core_v1.list_pod_for_all_namespaces()
        failed = []

        for pod in pods.items:
            if pod.status.phase in ["Failed", "Error"] or any(
                s.state
                and s.state.waiting
                and s.state.waiting.reason
                in ["CrashLoopBackOff", "ImagePullBackOff", "ErrImagePull"]
                for s in (pod.status.container_statuses or [])
                if s.state and s.state.waiting
            ):
                container_statuses = []
                if pod.status.container_statuses:
                    for s in pod.status.container_statuses:
                        state = {}
                        if s.state.waiting:
                            state = {
                                "status": "waiting",
                                "reason": s.state.waiting.reason,
                                "message": s.state.waiting.message,
                            }
                        elif s.state.terminated:
                            state = {
                                "status": "terminated",
                                "reason": s.state.terminated.reason,
                                "exit_code": s.state.terminated.exit_code,
                                "message": s.state.terminated.message,
                            }
                        container_statuses.append(
                            {
                                "name": s.name,
                                "state": state,
                                "restart_count": s.restart_count,
                            }
                        )

                failed.append(
                    {
                        "name": pod.metadata.name,
                        "namespace": pod.metadata.namespace,
                        "phase": pod.status.phase,
                        "container_statuses": container_statuses,
                        "node": pod.spec.node_name,
                        "message": pod.status.message if pod.status.message else None,
                        "reason": pod.status.reason if pod.status.reason else None,
                    }
                )

        return json.dumps(failed)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def pending_pods():
    """List all pods in Pending state and why they're pending"""
    try:
        pods = core_v1.list_pod_for_all_namespaces()
        pending = []

        for pod in pods.items:
            if pod.status.phase == "Pending":
                # Check for events related to this pod
                events = core_v1.list_namespaced_event(
                    pod.metadata.namespace,
                    field_selector=f"involvedObject.name={pod.metadata.name},involvedObject.kind=Pod",
                )

                pending_reason = "Unknown"
                pending_message = None

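                # Get the latest event that might explain why it's pending.
                # Events without a last_timestamp rank lowest; the leading flag
                # avoids comparing timezone-aware timestamps with a placeholder.
                if events.items:
                    latest_event = max(
                        events.items,
                        key=lambda e: (
                            e.last_timestamp is not None,
                            e.last_timestamp or 0,
                        ),
                    )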
                    pending_reason = latest_event.reason
                    pending_message = latest_event.message

                pending.append(
                    {
                        "name": pod.metadata.name,
                        "namespace": pod.metadata.namespace,
                        "node": pod.spec.node_name,
                        "reason": pending_reason,
                        "message": pending_message,
                        "creation_time": pod.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if pod.metadata.creation_timestamp
                        else None,
                    }
                )

        return json.dumps(pending)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def high_restart_pods(restart_threshold=5):
    """
    Find pods whose containers have restarted more than a given number of times

    Args:
        restart_threshold (int, optional): Containers with a restart count
            strictly greater than this value are reported. Defaults to 5.
    """

    try:
        pods = core_v1.list_pod_for_all_namespaces()
        high_restart = []

        for pod in pods.items:
            high_restart_containers = []

            if pod.status.container_statuses:
                for status in pod.status.container_statuses:
                    if status.restart_count > restart_threshold:
                        high_restart_containers.append(
                            {
                                "name": status.name,
                                "restart_count": status.restart_count,
                                "ready": status.ready,
                                "image": status.image,
                            }
                        )

            if high_restart_containers:
                high_restart.append(
                    {
                        "name": pod.metadata.name,
                        "namespace": pod.metadata.namespace,
                        "node": pod.spec.node_name,
                        "containers": high_restart_containers,
                    }
                )

        return json.dumps(high_restart)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def node_capacity():
    """
    Show available capacity and resource utilization on all nodes.

    Calculates the current resource usage across all nodes, including:
    - Pod count vs. maximum pods per node
    - CPU requests vs. allocatable CPU
    - Memory requests vs. allocatable memory

    The function provides both raw values and percentage utilization to help
    identify nodes approaching resource limits.

    Returns:
        str: JSON string containing an array of node capacity objects with fields:
            - name (str): Name of the node
            - pods (dict): Pod capacity information
              - used (int): Number of pods running on the node
              - capacity (int): Maximum number of pods the node can run
              - percent_used (float): Percentage of pod capacity in use
            - cpu (dict): CPU resource information
              - requested (float): CPU cores requested by pods
              - allocatable (float): CPU cores available on the node
              - percent_used (float): Percentage of CPU capacity in use
            - memory (dict): Memory resource information
              - requested (int): Memory requested by pods in bytes
              - requested_human (str): Human-readable memory requested
              - allocatable (int): Memory available on the node in bytes
              - allocatable_human (str): Human-readable allocatable memory
              - percent_used (float): Percentage of memory capacity in use
            - conditions (dict): Node condition statuses

    Note:
        Kubernetes API errors are caught and returned as a JSON object with an
        'error' field rather than raised.
    """
    try:
        nodes = core_v1.list_node()
        pods = core_v1.list_pod_for_all_namespaces()

        # Group pods by node
        node_pods = {}
        for pod in pods.items:
            if pod.spec.node_name:
                if pod.spec.node_name not in node_pods:
                    node_pods[pod.spec.node_name] = []
                node_pods[pod.spec.node_name].append(pod)

        results = []
        for node in nodes.items:
            # Calculate pod count
            pod_count = len(node_pods.get(node.metadata.name, []))
            max_pods = int(node.status.allocatable.get("pods", 0))

            # Calculate CPU and memory utilization (rough estimate)
            node_pods_list = node_pods.get(node.metadata.name, [])
            cpu_request = 0
            memory_request = 0

            for pod in node_pods_list:
                for container in pod.spec.containers:
                    if container.resources and container.resources.requests:
                        if container.resources.requests.get("cpu"):
                            cpu_str = container.resources.requests.get("cpu")
                            if cpu_str.endswith("m"):
                                cpu_request += int(cpu_str[:-1]) / 1000
                            else:
                                cpu_request += float(cpu_str)

                        if container.resources.requests.get("memory"):
                            mem_str = container.resources.requests.get("memory")
                            # Convert to bytes (only Ki, Mi, Gi and plain byte
                            # values are handled; other suffixes are not)
                            if mem_str.endswith("Ki"):
                                memory_request += int(mem_str[:-2]) * 1024
                            elif mem_str.endswith("Mi"):
                                memory_request += int(mem_str[:-2]) * 1024 * 1024
                            elif mem_str.endswith("Gi"):
                                memory_request += int(mem_str[:-2]) * 1024 * 1024 * 1024
                            else:
                                memory_request += int(mem_str)

            # Convert allocatable CPU to cores
            cpu_allocatable = node.status.allocatable.get("cpu", "0")
            if cpu_allocatable.endswith("m"):
                cpu_allocatable = int(cpu_allocatable[:-1]) / 1000
            else:
                cpu_allocatable = float(cpu_allocatable)

            # Convert allocatable memory to bytes
            mem_allocatable = node.status.allocatable.get("memory", "0")
            mem_bytes = 0
            if mem_allocatable.endswith("Ki"):
                mem_bytes = int(mem_allocatable[:-2]) * 1024
            elif mem_allocatable.endswith("Mi"):
                mem_bytes = int(mem_allocatable[:-2]) * 1024 * 1024
            elif mem_allocatable.endswith("Gi"):
                mem_bytes = int(mem_allocatable[:-2]) * 1024 * 1024 * 1024
            else:
                mem_bytes = int(mem_allocatable)

            results.append(
                {
                    "name": node.metadata.name,
                    "pods": {
                        "used": pod_count,
                        "capacity": max_pods,
                        "percent_used": round((pod_count / max_pods) * 100, 2)
                        if max_pods > 0
                        else 0,
                    },
                    "cpu": {
                        "requested": round(cpu_request, 2),
                        "allocatable": round(cpu_allocatable, 2),
                        "percent_used": round((cpu_request / cpu_allocatable) * 100, 2)
                        if cpu_allocatable > 0
                        else 0,
                    },
                    "memory": {
                        "requested": memory_request,
                        "requested_human": format_bytes(memory_request),
                        "allocatable": mem_bytes,
                        "allocatable_human": format_bytes(mem_bytes),
                        "percent_used": round((memory_request / mem_bytes) * 100, 2)
                        if mem_bytes > 0
                        else 0,
                    },
                    "conditions": {
                        cond.type: cond.status for cond in node.status.conditions
                    },
                }
            )

        return json.dumps(results)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def orphaned_resources():
    """List resources that might be orphaned (no owner references)"""
    try:
        results = {
            "pods": [],
            "services": [],
            "persistent_volume_claims": [],
            "config_maps": [],
            "secrets": [],
        }

        # Check for orphaned pods
        pods = core_v1.list_pod_for_all_namespaces()
        for pod in pods.items:
            if (
                not pod.metadata.owner_references
                and not pod.metadata.name.startswith("kube-")
                and pod.metadata.namespace != "kube-system"
            ):
                results["pods"].append(
                    {
                        "name": pod.metadata.name,
                        "namespace": pod.metadata.namespace,
                        "creation_time": pod.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if pod.metadata.creation_timestamp
                        else None,
                    }
                )

        # Check for orphaned services
        services = core_v1.list_service_for_all_namespaces()
        for service in services.items:
            if (
                not service.metadata.owner_references
                and not service.metadata.name.startswith("kube-")
                and service.metadata.namespace != "kube-system"
                and service.metadata.name != "kubernetes"
            ):
                results["services"].append(
                    {
                        "name": service.metadata.name,
                        "namespace": service.metadata.namespace,
                        "creation_time": service.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if service.metadata.creation_timestamp
                        else None,
                    }
                )

        # Check for orphaned PVCs
        pvcs = core_v1.list_persistent_volume_claim_for_all_namespaces()
        for pvc in pvcs.items:
            if not pvc.metadata.owner_references:
                results["persistent_volume_claims"].append(
                    {
                        "name": pvc.metadata.name,
                        "namespace": pvc.metadata.namespace,
                        "creation_time": pvc.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if pvc.metadata.creation_timestamp
                        else None,
                    }
                )

        # Check for orphaned ConfigMaps
        config_maps = core_v1.list_config_map_for_all_namespaces()
        for cm in config_maps.items:
            if (
                not cm.metadata.owner_references
                and not cm.metadata.name.startswith("kube-")
                and cm.metadata.namespace != "kube-system"
            ):
                results["config_maps"].append(
                    {
                        "name": cm.metadata.name,
                        "namespace": cm.metadata.namespace,
                        "creation_time": cm.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if cm.metadata.creation_timestamp
                        else None,
                    }
                )

        # Check for orphaned Secrets
        secrets = core_v1.list_secret_for_all_namespaces()
        for secret in secrets.items:
            if (
                not secret.metadata.owner_references
                and not secret.metadata.name.startswith("kube-")
                and secret.metadata.namespace != "kube-system"
                and not secret.type.startswith("kubernetes.io/")
            ):
                results["secrets"].append(
                    {
                        "name": secret.metadata.name,
                        "namespace": secret.metadata.namespace,
                        "type": secret.type,
                        "creation_time": secret.metadata.creation_timestamp.strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        if secret.metadata.creation_timestamp
                        else None,
                    }
                )

        return json.dumps(results)
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


@mcp.tool()
def get_resource_yaml(namespace, resource_type, resource_name):
    """
    Retrieves the YAML configuration for a specified Kubernetes resource.

    Fetches the complete configuration of a resource, which can be useful for
    debugging, documentation, or backup purposes.

    Args:
        namespace (str): The Kubernetes namespace containing the resource.
        resource_type (str): The type of resource to retrieve.
            Supported types: 'pod', 'deployment', 'service', 'configmap',
            'secret', 'job'
        resource_name (str): The name of the specific resource to retrieve.

    Returns:
        str: YAML string representation of the resource configuration.

    Note:
        Kubernetes API errors and unsupported resource types are not raised;
        they are returned as a JSON object with an 'error' field.
    """
    try:
        resource_data = None

        if resource_type == "pod":
            resource_data = core_v1.read_namespaced_pod(resource_name, namespace)
        elif resource_type == "deployment":
            resource_data = apps_v1.read_namespaced_deployment(resource_name, namespace)
        elif resource_type == "service":
            resource_data = core_v1.read_namespaced_service(resource_name, namespace)
        elif resource_type == "configmap":
            resource_data = core_v1.read_namespaced_config_map(resource_name, namespace)
        elif resource_type == "secret":
            resource_data = core_v1.read_namespaced_secret(resource_name, namespace)
        elif resource_type == "job":
            resource_data = batch_v1.read_namespaced_job(resource_name, namespace)
        else:
            return json.dumps(
                {"error": f"Unsupported resource type: {resource_type}"}
            ), 400

        # Convert to dict and then to YAML
        resource_dict = client.ApiClient().sanitize_for_serialization(resource_data)
        yaml_str = yaml.dump(resource_dict, default_flow_style=False)

        return yaml_str
    except ApiException as e:
        return json.dumps({"error": str(e)}), 500


# Helper function to format bytes into human-readable format
def format_bytes(size):
    """
    Format bytes to human readable string.

    Converts a byte value to a human-readable string with appropriate
    units (B, KiB, MiB, GiB, TiB).

    Args:
        size (int): Size in bytes

    Returns:
        str: Human-readable string representation of the size
            (e.g., "2.5 MiB")
    """
    power = 2**10
    n = 0
    power_labels = {0: "B", 1: "KiB", 2: "MiB", 3: "GiB", 4: "TiB"}
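    # Use >= so that exactly 1024 bytes is reported as "1.0 KiB", and stop at
    # the largest known unit to avoid a KeyError for very large values.
    while size >= power and n < len(power_labels) - 1: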
        size /= power
        n += 1
    return f"{round(size, 2)} {power_labels[n]}"


if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport="stdio")

```
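
The `node_capacity` tool in `server.py` converts CPU and memory quantities by hand and only recognizes the `m`, `Ki`, `Mi`, and `Gi` suffixes, so a request such as `500M` or `1.5Gi` would fail to parse. A minimal sketch of a more general converter is shown below. The helper name `parse_quantity_sketch` is hypothetical and not part of this repository, and the sketch covers only the common binary and decimal suffixes.

```python
from decimal import Decimal

# Suffix multipliers used by Kubernetes resource quantities.
_BINARY = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40, "Pi": 2**50, "Ei": 2**60}
_DECIMAL = {
    "m": Decimal("0.001"),
    "k": 10**3,
    "M": 10**6,
    "G": 10**9,
    "T": 10**12,
    "P": 10**15,
    "E": 10**18,
}


def parse_quantity_sketch(value):
    """Hypothetical helper: convert a quantity such as "250m" or "1.5Gi" to a Decimal."""
    text = str(value).strip()
    # Check two-character binary suffixes before one-character decimal ones.
    for suffix, factor in _BINARY.items():
        if text.endswith(suffix):
            return Decimal(text[: -len(suffix)]) * factor
    for suffix, factor in _DECIMAL.items():
        if text.endswith(suffix):
            return Decimal(text[: -len(suffix)]) * factor
    # No suffix: a plain number, e.g. "2" cores or "1048576" bytes.
    return Decimal(text)
```

With a helper like this, `float(parse_quantity_sketch("250m"))` yields `0.25` cores and `int(parse_quantity_sketch("128Mi"))` yields `134217728` bytes, so the CPU and memory branches in `node_capacity` could share one code path. `Decimal` is used instead of `float` so that milli-units are converted without rounding error.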