# Directory Structure ``` ├── .gitignore ├── .python-version ├── LICENSE ├── pyproject.toml ├── README.md ├── server.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.13 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # Distribution / packaging .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ share/python-wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .nox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover *.py,cover .hypothesis/ .pytest_cache/ cover/ # Translations *.mo *.pot # Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder .pybuilder/ target/ # Jupyter Notebook .ipynb_checkpoints # IPython profile_default/ ipython_config.py # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: # .python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. 
# However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. #Pipfile.lock # UV # Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. #uv.lock # poetry # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control #poetry.lock # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. #pdm.lock # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # in version control. # https://pdm.fming.dev/latest/usage/project/#working-with-version-control .pdm.toml .pdm-python .pdm-build/ # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ # Celery stuff celerybeat-schedule celerybeat.pid # SageMath parsed files *.sage.py # Environments .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # Spyder project settings .spyderproject .spyproject # Rope project settings .ropeproject # mkdocs documentation /site # mypy .mypy_cache/ .dmypy.json dmypy.json # Pyre type checker .pyre/ # pytype static type analyzer .pytype/ # Cython debug symbols cython_debug/ # PyCharm # JetBrains specific template is maintained in a separate JetBrains.gitignore that can # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. 
For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ # Ruff stuff: .ruff_cache/ # PyPI configuration file .pypirc ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # kube-mcp ## Get a Gemini API Key Go to https://aistudio.google.com/ and get yourself an API Key. Currently, the gemini-2.0-pro-exp-02-05 LLM is available absolutely free of charge. Other models are also available at a very low price. ## Install Codename Goose Goose is an open source AI agent that supercharges your software development by automating coding tasks. We will use Codename Goose as it has a built-in MCP client. Install Codename Goose by following the steps here https://block.github.io/goose/docs/getting-started/installation. Set up the GOOGLE_API_KEY environment variable so that Goose knows to use the Gemini API. Understand how to configure using `goose configure` and start a session using `goose session`. ## Develop MCP Server Read about MCP by reading the documentation : https://modelcontextprotocol.io/introduction and specifically the Python SDK : https://github.com/modelcontextprotocol/python-sdk Clone this repository and test it using `mcp dev server.py`. Note that this project uses the `uv` package manager instead of pip. Learn about `uv` by reading the docs : https://github.com/astral-sh/uv This project uses the kubernetes python client: https://github.com/kubernetes-client/python ## Install Minikube Install minikube by following the instructions : https://minikube.sigs.k8s.io/docs/start/?arch=%2Flinux%2Fx86-64%2Fstable%2Fbinary+download Ensure that the config to the cluster is provided to the MCP server. Look at the `KubernetesManager` and `config.load_kube_config()` to understand how the config is loaded. 
## Connect your MCP server to Codename Goose

Add the MCP Server as an extension by reading the following docs : https://block.github.io/goose/docs/getting-started/using-extensions

Start a new goose session using command `goose session --with-builtin developer --with-extension "uvx kube-mcp"`

## Make it all work

Try giving a command in Goose and make it interact with Minikube using the MCP Server
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "kube-mcp"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "kubernetes>=32.0.1",
    "mcp[cli]>=1.4.1",
    "ruff>=0.11.0",
]
```

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""MCP server that exposes basic Kubernetes operations as tools and resources.

The module loads the default kubeconfig at import time (via
``KubernetesManager``) and registers its tools on a ``FastMCP`` instance
named ``kube-mcp``.
"""
import asyncio
import copy
import json
import logging
import signal
from typing import Dict, List, Optional
from datetime import date, datetime

from kubernetes import client, config
from kubernetes.client import V1Pod, V1Container, V1PodSpec, V1ObjectMeta
from mcp.shared.exceptions import McpError
from mcp.server.fastmcp import FastMCP
from mcp.types import INTERNAL_ERROR, INVALID_PARAMS, ErrorData
from pydantic import BaseModel

# Diagnostics go through logging (stderr by default) rather than print():
# with the stdio transport, stdout carries the JSON-RPC stream.
logger = logging.getLogger(__name__)

# URI under which the namespace listing is published as an MCP resource.
NAMESPACES_URI = "k8s://namespaces"


def _mcp_error(message: str, code: int = INTERNAL_ERROR) -> McpError:
    """Build an ``McpError`` carrying *message*.

    ``McpError`` is constructed from an ``ErrorData`` object, not a bare
    string, so every error raised by this module goes through this helper.
    """
    return McpError(ErrorData(code=code, message=message))


def _json_default(value):
    """``json.dumps`` fallback for values found in Kubernetes ``to_dict()`` output.

    Timestamps such as ``metadata.creation_timestamp`` are ``datetime``
    objects, which the stock encoder rejects; they are emitted in ISO 8601.
    Any other unsupported type still raises ``TypeError`` so that unexpected
    values are not silently stringified.
    """
    if isinstance(value, date):  # datetime is a subclass of date
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def _to_json(payload) -> str:
    """Serialize *payload* as indented JSON, tolerating date/datetime values."""
    return json.dumps(payload, indent=2, default=_json_default)


def _text(payload) -> List[dict]:
    """Wrap *payload* in the single-item text content list every tool returns."""
    return [{"type": "text", "text": _to_json(payload)}]


# Resource tracking class
class ResourceTracker:
    """Record of one resource created through this server."""

    def __init__(self, kind: str, name: str, namespace: str):
        self.kind = kind
        self.name = name
        self.namespace = namespace
        self.created_at = datetime.now()


# Kubernetes Manager
class KubernetesManager:
    """Owns the Kubernetes API clients and the list of tracked resources."""

    def __init__(self):
        self.resources: List[ResourceTracker] = []
        config.load_kube_config()  # Load default kubeconfig
        self.core_api = client.CoreV1Api()
        self.apps_api = client.AppsV1Api()

        # Register signal handlers
        # NOTE(review): asyncio.create_task() needs a running event loop in the
        # thread handling the signal, and replacing the default SIGINT handler
        # means Ctrl-C no longer stops the process by itself - confirm this is
        # the intended shutdown behaviour.
        signal.signal(signal.SIGINT, lambda s, f: asyncio.create_task(self.cleanup()))
        signal.signal(signal.SIGTERM, lambda s, f: asyncio.create_task(self.cleanup()))

    async def cleanup(self):
        """Clean up all tracked resources in reverse order.

        Failures are logged and do not stop the remaining deletions; the
        tracking list is emptied afterwards regardless.
        """
        # Iterate over a snapshot: delete_resource() rebinds self.resources.
        for resource in self.resources[::-1]:
            try:
                await self.delete_resource(
                    resource.kind, resource.name, resource.namespace
                )
            except Exception as e:
                logger.warning(
                    "Failed to delete %s %s: %s", resource.kind, resource.name, e
                )
        self.resources.clear()

    def track_resource(self, kind: str, name: str, namespace: str):
        """Remember a resource so that ``cleanup`` can delete it later."""
        self.resources.append(ResourceTracker(kind, name, namespace))

    def untrack_resource(self, kind: str, name: str, namespace: str):
        """Forget a tracked resource; *kind* is matched case-insensitively."""
        wanted_kind = kind.lower()
        self.resources = [
            r
            for r in self.resources
            if not (
                r.kind.lower() == wanted_kind
                and r.name == name
                and r.namespace == namespace
            )
        ]

    async def delete_resource(self, kind: str, name: str, namespace: str):
        """Delete a pod, deployment or service and stop tracking it.

        Kinds other than those three trigger no API call but are still
        removed from the tracking list. API errors propagate to the caller,
        in which case the resource stays tracked.
        """
        kind = kind.lower()
        deleters = {
            "pod": self.core_api.delete_namespaced_pod,
            "deployment": self.apps_api.delete_namespaced_deployment,
            "service": self.core_api.delete_namespaced_service,
        }
        deleter = deleters.get(kind)
        if deleter is not None:
            await asyncio.to_thread(deleter, name, namespace)

        # Resources are tracked with their original casing (e.g. "Pod"), so the
        # comparison has to be case-insensitive for the entry to be removed.
        self.untrack_resource(kind, name, namespace)

    def get_core_api(self):
        """Return the shared ``CoreV1Api`` client."""
        return self.core_api

    def get_apps_api(self):
        """Return the shared ``AppsV1Api`` client."""
        return self.apps_api


# Container templates
container_templates: Dict[str, V1Container] = {
    "ubuntu": V1Container(
        name="main",
        image="ubuntu:latest",
        command=["/bin/bash", "-c", "sleep infinity"],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "200m", "memory": "256Mi"},
            requests={"cpu": "100m", "memory": "128Mi"},
        ),
        liveness_probe=client.V1Probe(
            _exec=client.V1ExecAction(command=["cat", "/proc/1/status"]),
            initial_delay_seconds=5,
            period_seconds=10,
        ),
    ),
    "nginx": V1Container(
        name="main",
        image="nginx:latest",
        ports=[client.V1ContainerPort(container_port=80)],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "200m", "memory": "256Mi"},
            requests={"cpu": "100m", "memory": "128Mi"},
        ),
        liveness_probe=client.V1Probe(
            http_get=client.V1HTTPGetAction(path="/", port=80),
            initial_delay_seconds=5,
            period_seconds=10,
        ),
        readiness_probe=client.V1Probe(
            http_get=client.V1HTTPGetAction(path="/", port=80),
            initial_delay_seconds=2,
            period_seconds=5,
        ),
    ),
    "busybox": V1Container(
        name="main",
        image="busybox:latest",
        command=["sh", "-c", "sleep infinity"],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "100m", "memory": "64Mi"},
            requests={"cpu": "50m", "memory": "32Mi"},
        ),
        liveness_probe=client.V1Probe(
            _exec=client.V1ExecAction(command=["true"]),
            period_seconds=10,
        ),
    ),
    "alpine": V1Container(
        name="main",
        image="alpine:latest",
        command=["sh", "-c", "sleep infinity"],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "100m", "memory": "64Mi"},
            requests={"cpu": "50m", "memory": "32Mi"},
        ),
        liveness_probe=client.V1Probe(
            _exec=client.V1ExecAction(command=["true"]),
            period_seconds=10,
        ),
    ),
}

k8s_manager = KubernetesManager()

# FastMCP Server Setup
mcp = FastMCP(name="kube-mcp")


# Define Tool Input Schemas with Pydantic
class ListPodsInput(BaseModel):
    namespace: str = "default"


class ListDeploymentsInput(BaseModel):
    namespace: str = "default"


class ListServicesInput(BaseModel):
    namespace: str = "default"


class CreatePodInput(BaseModel):
    name: str
    namespace: str
    template: str  # Will validate against container_templates keys in the tool
    command: Optional[List[str]] = None


class DeletePodInput(BaseModel):
    name: str
    namespace: str
    ignoreNotFound: bool = False


class DescribePodInput(BaseModel):
    name: str
    namespace: str


class GetLogsInput(BaseModel):
    resourceType: str
    name: Optional[str] = None
    namespace: str = "default"
    tail: Optional[int] = 100


# Define Tools
@mcp.tool()
async def list_pods(input_data: ListPodsInput):
    """List the pods of a namespace as JSON."""
    pods = await asyncio.to_thread(
        k8s_manager.get_core_api().list_namespaced_pod, input_data.namespace
    )
    return _text({"pods": [pod.to_dict() for pod in pods.items]})


@mcp.tool()
async def list_deployments(input_data: ListDeploymentsInput):
    """List the deployments of a namespace as JSON."""
    deployments = await asyncio.to_thread(
        k8s_manager.get_apps_api().list_namespaced_deployment, input_data.namespace
    )
    return _text({"deployments": [d.to_dict() for d in deployments.items]})


@mcp.tool()
async def list_services(input_data: ListServicesInput):
    """List the services of a namespace as JSON."""
    services = await asyncio.to_thread(
        k8s_manager.get_core_api().list_namespaced_service, input_data.namespace
    )
    return _text({"services": [s.to_dict() for s in services.items]})


@mcp.tool()
async def list_namespaces():
    """List all namespaces as JSON."""
    namespaces = await asyncio.to_thread(k8s_manager.get_core_api().list_namespace)
    return _text({"namespaces": [n.to_dict() for n in namespaces.items]})


@mcp.tool()
async def create_pod(input_data: CreatePodInput):
    """Create a single-container pod from one of the ``container_templates``.

    The created pod is labelled ``mcp-managed=true`` and tracked so that the
    ``cleanup`` tool can delete it later.

    Raises:
        McpError: if the template name is unknown or the API call fails.
    """
    if input_data.template not in container_templates:
        raise _mcp_error(f"Invalid template: {input_data.template}", INVALID_PARAMS)

    # Work on a copy: overriding the command on the shared template object
    # would leak into every pod created from that template afterwards.
    container = copy.deepcopy(container_templates[input_data.template])
    if input_data.command:
        container.command = input_data.command
        container.args = None

    pod = V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=V1ObjectMeta(
            name=input_data.name,
            namespace=input_data.namespace,
            labels={"mcp-managed": "true", "app": input_data.name},
        ),
        spec=V1PodSpec(containers=[container]),
    )

    try:
        response = await asyncio.to_thread(
            k8s_manager.get_core_api().create_namespaced_pod, input_data.namespace, pod
        )
    except client.exceptions.ApiException as e:
        raise _mcp_error(f"Failed to create pod: {e}") from e

    k8s_manager.track_resource("Pod", input_data.name, input_data.namespace)
    return _text({"podName": response.metadata.name, "status": "created"})


@mcp.tool()
async def delete_pod(input_data: DeletePodInput):
    """Delete a pod, optionally treating "not found" as success.

    In both success cases the pod is also dropped from the tracking list so a
    later ``cleanup`` does not try to delete it again.

    Raises:
        McpError: if the API call fails (except for an ignored 404).
    """
    try:
        await asyncio.to_thread(
            k8s_manager.get_core_api().delete_namespaced_pod,
            input_data.name,
            input_data.namespace,
        )
    except client.exceptions.ApiException as e:
        if input_data.ignoreNotFound and e.status == 404:
            k8s_manager.untrack_resource("pod", input_data.name, input_data.namespace)
            return _text({"success": True, "status": "not_found"})
        raise _mcp_error(f"Failed to delete pod: {e}") from e

    k8s_manager.untrack_resource("pod", input_data.name, input_data.namespace)
    return _text({"success": True, "status": "deleted"})


@mcp.tool()
async def describe_pod(input_data: DescribePodInput):
    """Return the full description of a pod as JSON.

    Raises:
        McpError: "Pod not found" on a 404, otherwise the API failure.
    """
    try:
        pod = await asyncio.to_thread(
            k8s_manager.get_core_api().read_namespaced_pod,
            input_data.name,
            input_data.namespace,
        )
    except client.exceptions.ApiException as e:
        if e.status == 404:
            raise _mcp_error("Pod not found") from e
        raise _mcp_error(f"Failed to describe pod: {e}") from e
    return _text(pod.to_dict())


@mcp.tool()
async def cleanup():
    """Delete every resource tracked by this server."""
    await k8s_manager.cleanup()
    return _text({"success": True})


@mcp.tool()
async def list_nodes():
    """List the cluster nodes as JSON."""
    nodes = await asyncio.to_thread(k8s_manager.get_core_api().list_node)
    return _text({"nodes": [n.to_dict() for n in nodes.items]})


@mcp.tool()
async def get_logs(input_data: GetLogsInput):
    """Return the last ``tail`` log lines of a named pod.

    Raises:
        McpError: if ``resourceType`` is not ``"pod"``, no name is given, or
            the API call fails.
    """
    if input_data.resourceType != "pod" or not input_data.name:
        raise _mcp_error("Only pod logs supported with a name", INVALID_PARAMS)

    try:
        logs = await asyncio.to_thread(
            k8s_manager.get_core_api().read_namespaced_pod_log,
            input_data.name,
            input_data.namespace,
            tail_lines=input_data.tail,
        )
    except client.exceptions.ApiException as e:
        raise _mcp_error(f"Failed to get logs: {e}") from e
    return _text({"logs": {input_data.name: logs}})


@mcp.resource(NAMESPACES_URI)
async def read_namespaces():
    """Expose the namespace listing as the ``k8s://namespaces`` resource.

    Raises:
        McpError: if the listing cannot be read or serialized.
    """
    try:
        api_call = k8s_manager.get_core_api().list_namespace
        result = await asyncio.to_thread(api_call)
        return [
            {
                # The original referenced an undefined ``uri`` name here.
                "uri": NAMESPACES_URI,
                "mimeType": "application/json",
                "text": _to_json([item.to_dict() for item in result.items]),
            }
        ]
    except McpError as e:
        raise e
    except Exception as e:
        raise _mcp_error(f"Failed to read resource: {e}") from e


if __name__ == "__main__":
    # Lets the shebang script be executed directly; `mcp dev server.py` imports
    # the module instead and is unaffected by this guard.
    mcp.run()
```