# Directory Structure

```
├── .gitignore
├── .python-version
├── LICENSE
├── pyproject.toml
├── README.md
├── server.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#uv.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Ruff stuff:
.ruff_cache/

# PyPI configuration file
.pypirc

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# kube-mcp

## Get a Gemini API Key
Go to https://aistudio.google.com/ and get yourself an API key. Currently, the gemini-2.0-pro-exp-02-05 model is available free of charge; other models are available at low cost.

## Install Codename Goose
Goose is an open-source AI agent that supercharges your software development by automating coding tasks. We will use Codename Goose because it has a built-in MCP client. Install Codename Goose by following the steps at https://block.github.io/goose/docs/getting-started/installation. Set the GOOGLE_API_KEY environment variable so that Goose knows to use the Gemini API. Learn how to configure Goose with `goose configure` and start a session with `goose session`.

## Develop MCP Server
Read about MCP in the documentation (https://modelcontextprotocol.io/introduction) and specifically the Python SDK (https://github.com/modelcontextprotocol/python-sdk).
Clone this repository and test it using `mcp dev server.py`. Note that this project uses the `uv` package manager instead of pip; learn about `uv` from its docs: https://github.com/astral-sh/uv
This project uses the Kubernetes Python client: https://github.com/kubernetes-client/python

## Install Minikube
Install minikube by following the instructions at https://minikube.sigs.k8s.io/docs/start/?arch=%2Flinux%2Fx86-64%2Fstable%2Fbinary+download
Ensure that the cluster config is available to the MCP server. Look at `KubernetesManager` and `config.load_kube_config()` to understand how the config is loaded.

## Connect your MCP server to Codename Goose
Add the MCP server as an extension by following these docs: https://block.github.io/goose/docs/getting-started/using-extensions
Start a new Goose session with `goose session --with-builtin developer --with-extension "uvx kube-mcp"`

## Make it all work
Try giving Goose a command and have it interact with Minikube through the MCP server.

```
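
Every tool in `server.py` returns the same shape: a single-item list of MCP text content whose `text` field is pretty-printed JSON. A minimal stdlib sketch of that convention (the helper name `to_text_content` is my own, not part of the project):

```python
import json

def to_text_content(payload: dict) -> list[dict]:
    """Wrap a payload the way the tools in server.py do: one text
    content item carrying pretty-printed JSON."""
    return [{"type": "text", "text": json.dumps(payload, indent=2)}]

result = to_text_content({"pods": [{"name": "demo", "namespace": "default"}]})
print(result[0]["type"])  # text
```

Keeping all tools on this one shape means the MCP client can render any result uniformly, whether it came from `list_pods`, `describe_pod`, or `get_logs`.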

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "kube-mcp"
version = "0.1.0"
description = "MCP server for managing Kubernetes resources"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "kubernetes>=32.0.1",
    "mcp[cli]>=1.4.1",
    "ruff>=0.11.0",
]

```

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import asyncio
import json
import signal
from typing import Dict, List, Optional
from datetime import datetime
from kubernetes import client, config
from kubernetes.client import V1Pod, V1Container, V1PodSpec, V1ObjectMeta
from mcp.shared.exceptions import McpError
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel


# Resource tracking class
class ResourceTracker:
    def __init__(self, kind: str, name: str, namespace: str):
        self.kind = kind
        self.name = name
        self.namespace = namespace
        self.created_at = datetime.now()


# Kubernetes Manager
class KubernetesManager:
    def __init__(self):
        self.resources: List[ResourceTracker] = []
        config.load_kube_config()  # Load default kubeconfig
        self.core_api = client.CoreV1Api()
        self.apps_api = client.AppsV1Api()

        # Register signal handlers. asyncio.create_task only works inside a
        # running event loop, so schedule cleanup on the loop when one exists.
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, signum, frame):
        try:
            asyncio.get_running_loop().create_task(self.cleanup())
        except RuntimeError:  # no running loop; run cleanup synchronously
            asyncio.run(self.cleanup())

    async def cleanup(self):
        """Clean up all tracked resources in reverse order."""
        for resource in reversed(self.resources):
            try:
                await self.delete_resource(
                    resource.kind, resource.name, resource.namespace
                )
            except Exception as e:
                print(f"Failed to delete {resource.kind} {resource.name}: {e}")
        self.resources.clear()

    def track_resource(self, kind: str, name: str, namespace: str):
        self.resources.append(ResourceTracker(kind, name, namespace))

    async def delete_resource(self, kind: str, name: str, namespace: str):
        kind = kind.lower()
        if kind == "pod":
            await asyncio.to_thread(
                self.core_api.delete_namespaced_pod, name, namespace
            )
        elif kind == "deployment":
            await asyncio.to_thread(
                self.apps_api.delete_namespaced_deployment, name, namespace
            )
        elif kind == "service":
            await asyncio.to_thread(
                self.core_api.delete_namespaced_service, name, namespace
            )
        self.resources = [
            r
            for r in self.resources
            if not (r.kind == kind and r.name == name and r.namespace == namespace)
        ]

    def get_core_api(self):
        return self.core_api

    def get_apps_api(self):
        return self.apps_api


# Container templates
container_templates: Dict[str, V1Container] = {
    "ubuntu": V1Container(
        name="main",
        image="ubuntu:latest",
        command=["/bin/bash", "-c", "sleep infinity"],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "200m", "memory": "256Mi"},
            requests={"cpu": "100m", "memory": "128Mi"},
        ),
        liveness_probe=client.V1Probe(
            _exec=client.V1ExecAction(command=["cat", "/proc/1/status"]),
            initial_delay_seconds=5,
            period_seconds=10,
        ),
    ),
    "nginx": V1Container(
        name="main",
        image="nginx:latest",
        ports=[client.V1ContainerPort(container_port=80)],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "200m", "memory": "256Mi"},
            requests={"cpu": "100m", "memory": "128Mi"},
        ),
        liveness_probe=client.V1Probe(
            http_get=client.V1HTTPGetAction(path="/", port=80),
            initial_delay_seconds=5,
            period_seconds=10,
        ),
        readiness_probe=client.V1Probe(
            http_get=client.V1HTTPGetAction(path="/", port=80),
            initial_delay_seconds=2,
            period_seconds=5,
        ),
    ),
    "busybox": V1Container(
        name="main",
        image="busybox:latest",
        command=["sh", "-c", "sleep infinity"],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "100m", "memory": "64Mi"},
            requests={"cpu": "50m", "memory": "32Mi"},
        ),
        liveness_probe=client.V1Probe(
            _exec=client.V1ExecAction(command=["true"]),
            period_seconds=10,
        ),
    ),
    "alpine": V1Container(
        name="main",
        image="alpine:latest",
        command=["sh", "-c", "sleep infinity"],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "100m", "memory": "64Mi"},
            requests={"cpu": "50m", "memory": "32Mi"},
        ),
        liveness_probe=client.V1Probe(
            _exec=client.V1ExecAction(command=["true"]),
            period_seconds=10,
        ),
    ),
}

k8s_manager = KubernetesManager()

# FastMCP Server Setup
mcp = FastMCP(name="kube-mcp")


# Define Tool Input Schemas with Pydantic
class ListPodsInput(BaseModel):
    namespace: str = "default"


class ListDeploymentsInput(BaseModel):
    namespace: str = "default"


class ListServicesInput(BaseModel):
    namespace: str = "default"


class CreatePodInput(BaseModel):
    name: str
    namespace: str
    template: str  # Will validate against container_templates keys in the tool
    command: Optional[List[str]] = None


class DeletePodInput(BaseModel):
    name: str
    namespace: str
    ignoreNotFound: bool = False


class DescribePodInput(BaseModel):
    name: str
    namespace: str


class GetLogsInput(BaseModel):
    resourceType: str
    name: Optional[str] = None
    namespace: str = "default"
    tail: Optional[int] = 100


# Define Tools
@mcp.tool()
async def list_pods(input_data: ListPodsInput):
    pods = await asyncio.to_thread(
        k8s_manager.get_core_api().list_namespaced_pod, input_data.namespace
    )
    return [
        {
            "type": "text",
            "text": json.dumps(
                {"pods": [pod.to_dict() for pod in pods.items]}, indent=2
            ),
        }
    ]


@mcp.tool()
async def list_deployments(input_data: ListDeploymentsInput):
    deployments = await asyncio.to_thread(
        k8s_manager.get_apps_api().list_namespaced_deployment, input_data.namespace
    )
    return [
        {
            "type": "text",
            "text": json.dumps(
                {"deployments": [d.to_dict() for d in deployments.items]}, indent=2
            ),
        }
    ]


@mcp.tool()
async def list_services(input_data: ListServicesInput):
    services = await asyncio.to_thread(
        k8s_manager.get_core_api().list_namespaced_service, input_data.namespace
    )
    return [
        {
            "type": "text",
            "text": json.dumps(
                {"services": [s.to_dict() for s in services.items]}, indent=2
            ),
        }
    ]


@mcp.tool()
async def list_namespaces():
    namespaces = await asyncio.to_thread(k8s_manager.get_core_api().list_namespace)
    return [
        {
            "type": "text",
            "text": json.dumps(
                {"namespaces": [n.to_dict() for n in namespaces.items]}, indent=2
            ),
        }
    ]


@mcp.tool()
async def create_pod(input_data: CreatePodInput):
    if input_data.template not in container_templates:
        raise McpError(f"Invalid template: {input_data.template}")
    container = container_templates[input_data.template]
    if input_data.command:
        # Build a fresh container rather than mutating the shared template
        container = V1Container(
            name=container.name,
            image=container.image,
            command=input_data.command,
            resources=container.resources,
        )
    pod = V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=V1ObjectMeta(
            name=input_data.name,
            namespace=input_data.namespace,
            labels={"mcp-managed": "true", "app": input_data.name},
        ),
        spec=V1PodSpec(containers=[container]),
    )
    try:
        response = await asyncio.to_thread(
            k8s_manager.get_core_api().create_namespaced_pod, input_data.namespace, pod
        )
        k8s_manager.track_resource("Pod", input_data.name, input_data.namespace)
        return [
            {
                "type": "text",
                "text": json.dumps(
                    {"podName": response.metadata.name, "status": "created"}, indent=2
                ),
            }
        ]
    except client.exceptions.ApiException as e:
        raise McpError(f"Failed to create pod: {e}")


@mcp.tool()
async def delete_pod(input_data: DeletePodInput):
    try:
        await asyncio.to_thread(
            k8s_manager.get_core_api().delete_namespaced_pod,
            input_data.name,
            input_data.namespace,
        )
        return [
            {
                "type": "text",
                "text": json.dumps({"success": True, "status": "deleted"}, indent=2),
            }
        ]
    except client.exceptions.ApiException as e:
        if input_data.ignoreNotFound and e.status == 404:
            return [
                {
                    "type": "text",
                    "text": json.dumps(
                        {"success": True, "status": "not_found"}, indent=2
                    ),
                }
            ]
        raise McpError(f"Failed to delete pod: {e}")


@mcp.tool()
async def describe_pod(input_data: DescribePodInput):
    try:
        pod = await asyncio.to_thread(
            k8s_manager.get_core_api().read_namespaced_pod,
            input_data.name,
            input_data.namespace,
        )
        return [{"type": "text", "text": json.dumps(pod.to_dict(), indent=2)}]
    except client.exceptions.ApiException as e:
        if e.status == 404:
            raise McpError("Pod not found")
        raise McpError(f"Failed to describe pod: {e}")


@mcp.tool()
async def cleanup():
    await k8s_manager.cleanup()
    return [{"type": "text", "text": json.dumps({"success": True}, indent=2)}]


@mcp.tool()
async def list_nodes():
    nodes = await asyncio.to_thread(k8s_manager.get_core_api().list_node)
    return [
        {
            "type": "text",
            "text": json.dumps({"nodes": [n.to_dict() for n in nodes.items]}, indent=2),
        }
    ]


@mcp.tool()
async def get_logs(input_data: GetLogsInput):
    if input_data.resourceType != "pod" or not input_data.name:
        raise McpError("Only pod logs are supported, and a pod name is required")
    try:
        logs = await asyncio.to_thread(
            k8s_manager.get_core_api().read_namespaced_pod_log,
            input_data.name,
            input_data.namespace,
            tail_lines=input_data.tail,
        )
        return [
            {
                "type": "text",
                "text": json.dumps({"logs": {input_data.name: logs}}, indent=2),
            }
        ]
    except client.exceptions.ApiException as e:
        raise McpError(f"Failed to get logs: {e}")


@mcp.resource("k8s://namespaces")
async def read_namespaces():
    try:
        api_call = k8s_manager.get_core_api().list_namespace
        result = await asyncio.to_thread(api_call)
        return [
            {
                "uri": "k8s://namespaces",
                "mimeType": "application/json",
                "text": json.dumps([item.to_dict() for item in result.items], indent=2),
            }
        ]
    except McpError as e:
        raise e
    except Exception as e:
        raise McpError(f"Failed to read resource: {e}")


if __name__ == "__main__":
    mcp.run()

```
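
`config.load_kube_config()` in `KubernetesManager.__init__` reads the standard kubeconfig file. A rough stdlib sketch of how that path is resolved (an approximation: the real client also supports colon-separated multi-path `KUBECONFIG` values, which this ignores):

```python
import os
from pathlib import Path

def kubeconfig_path() -> Path:
    """Approximate the lookup performed by config.load_kube_config():
    honor the KUBECONFIG environment variable if set, otherwise fall
    back to the default ~/.kube/config."""
    env = os.environ.get("KUBECONFIG")
    return Path(env) if env else Path.home() / ".kube" / "config"
```

With minikube running, `minikube start` writes credentials for the `minikube` context into this file, which is why the server can reach the cluster without any extra configuration.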