#
tokens: 5610/50000 5/5 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── .gitignore
├── .python-version
├── LICENSE
├── pyproject.toml
├── README.md
├── server.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.13
2 | 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
  1 | # Byte-compiled / optimized / DLL files
  2 | __pycache__/
  3 | *.py[cod]
  4 | *$py.class
  5 | 
  6 | # C extensions
  7 | *.so
  8 | 
  9 | # Distribution / packaging
 10 | .Python
 11 | build/
 12 | develop-eggs/
 13 | dist/
 14 | downloads/
 15 | eggs/
 16 | .eggs/
 17 | lib/
 18 | lib64/
 19 | parts/
 20 | sdist/
 21 | var/
 22 | wheels/
 23 | share/python-wheels/
 24 | *.egg-info/
 25 | .installed.cfg
 26 | *.egg
 27 | MANIFEST
 28 | 
 29 | # PyInstaller
 30 | #  Usually these files are written by a python script from a template
 31 | #  before PyInstaller builds the exe, so as to inject date/other infos into it.
 32 | *.manifest
 33 | *.spec
 34 | 
 35 | # Installer logs
 36 | pip-log.txt
 37 | pip-delete-this-directory.txt
 38 | 
 39 | # Unit test / coverage reports
 40 | htmlcov/
 41 | .tox/
 42 | .nox/
 43 | .coverage
 44 | .coverage.*
 45 | .cache
 46 | nosetests.xml
 47 | coverage.xml
 48 | *.cover
 49 | *.py,cover
 50 | .hypothesis/
 51 | .pytest_cache/
 52 | cover/
 53 | 
 54 | # Translations
 55 | *.mo
 56 | *.pot
 57 | 
 58 | # Django stuff:
 59 | *.log
 60 | local_settings.py
 61 | db.sqlite3
 62 | db.sqlite3-journal
 63 | 
 64 | # Flask stuff:
 65 | instance/
 66 | .webassets-cache
 67 | 
 68 | # Scrapy stuff:
 69 | .scrapy
 70 | 
 71 | # Sphinx documentation
 72 | docs/_build/
 73 | 
 74 | # PyBuilder
 75 | .pybuilder/
 76 | target/
 77 | 
 78 | # Jupyter Notebook
 79 | .ipynb_checkpoints
 80 | 
 81 | # IPython
 82 | profile_default/
 83 | ipython_config.py
 84 | 
 85 | # pyenv
 86 | #   For a library or package, you might want to ignore these files since the code is
 87 | #   intended to run in multiple environments; otherwise, check them in:
 88 | # .python-version
 89 | 
 90 | # pipenv
 91 | #   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
 92 | #   However, in case of collaboration, if having platform-specific dependencies or dependencies
 93 | #   having no cross-platform support, pipenv may install dependencies that don't work, or not
 94 | #   install all needed dependencies.
 95 | #Pipfile.lock
 96 | 
 97 | # UV
 98 | #   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
 99 | #   This is especially recommended for binary packages to ensure reproducibility, and is more
100 | #   commonly ignored for libraries.
101 | #uv.lock
102 | 
103 | # poetry
104 | #   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
105 | #   This is especially recommended for binary packages to ensure reproducibility, and is more
106 | #   commonly ignored for libraries.
107 | #   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
108 | #poetry.lock
109 | 
110 | # pdm
111 | #   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
112 | #pdm.lock
113 | #   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
114 | #   in version control.
115 | #   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
116 | .pdm.toml
117 | .pdm-python
118 | .pdm-build/
119 | 
120 | # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
121 | __pypackages__/
122 | 
123 | # Celery stuff
124 | celerybeat-schedule
125 | celerybeat.pid
126 | 
127 | # SageMath parsed files
128 | *.sage.py
129 | 
130 | # Environments
131 | .env
132 | .venv
133 | env/
134 | venv/
135 | ENV/
136 | env.bak/
137 | venv.bak/
138 | 
139 | # Spyder project settings
140 | .spyderproject
141 | .spyproject
142 | 
143 | # Rope project settings
144 | .ropeproject
145 | 
146 | # mkdocs documentation
147 | /site
148 | 
149 | # mypy
150 | .mypy_cache/
151 | .dmypy.json
152 | dmypy.json
153 | 
154 | # Pyre type checker
155 | .pyre/
156 | 
157 | # pytype static type analyzer
158 | .pytype/
159 | 
160 | # Cython debug symbols
161 | cython_debug/
162 | 
163 | # PyCharm
164 | #  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
165 | #  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
166 | #  and can be added to the global gitignore or merged into this file.  For a more nuclear
167 | #  option (not recommended) you can uncomment the following to ignore the entire idea folder.
168 | #.idea/
169 | 
170 | # Ruff stuff:
171 | .ruff_cache/
172 | 
173 | # PyPI configuration file
174 | .pypirc
175 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
 1 | # kube-mcp
 2 | 
 3 | ## Get a Gemini API Key
 4 | Go to https://aistudio.google.com/ and get yourself an API key. Currently, the gemini-2.0-pro-exp-02-05 LLM is available absolutely free of charge. Other models are also available at a very low price.
 5 | 
 6 | ## Install Codename Goose
 7 | Goose is an open source AI agent that supercharges your software development by automating coding tasks. We will use Codename Goose as it has a built-in MCP client. Install Codename Goose by following the steps here: https://block.github.io/goose/docs/getting-started/installation. Set up the GOOGLE_API_KEY environment variable so that Goose knows to use the Gemini API. Learn how to configure it using `goose configure` and how to start a session using `goose session`.
 8 | 
 9 | ## Develop MCP Server
10 | Read about MCP by reading the documentation : https://modelcontextprotocol.io/introduction and specifically the Python SDK : https://github.com/modelcontextprotocol/python-sdk
11 | Clone this repository and test it using `mcp dev server.py`. Note that this project uses `uv` package manager instead of pip. Learn about `uv` by reading docs : https://github.com/astral-sh/uv
12 | This project uses the kubernetes python client: https://github.com/kubernetes-client/python
13 | 
14 | ## Install Minikube
15 | Install minikube by following the instructions: https://minikube.sigs.k8s.io/docs/start/?arch=%2Flinux%2Fx86-64%2Fstable%2Fbinary+download
16 | Ensure that the cluster's kubeconfig is available to the MCP server. Look at `KubernetesManager` and `config.load_kube_config()` to understand how the config is loaded.
17 | 
18 | ## Connect your MCP server to Codename Goose
19 | Add the MCP Server as an extension by reading the following docs : https://block.github.io/goose/docs/getting-started/using-extensions
20 | Start a new goose session using command `goose session --with-builtin developer --with-extension "uvx kube-mcp"` 
21 | 
22 | ## Make it all work
23 | Try giving Goose a command and have it interact with Minikube through the MCP server.
24 | 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "kube-mcp"
 3 | version = "0.1.0"
 4 | description = "Add your description here"
 5 | readme = "README.md"
 6 | requires-python = ">=3.13"
 7 | dependencies = [
 8 |     "kubernetes>=32.0.1",
 9 |     "mcp[cli]>=1.4.1",
10 |     "ruff>=0.11.0",
11 | ]
12 | 
```

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
import asyncio
import copy
import json
import signal
import sys
from typing import Dict, List, Optional
from datetime import datetime

from kubernetes import client, config
from kubernetes.client import V1Pod, V1Container, V1PodSpec, V1ObjectMeta
from mcp.shared.exceptions import McpError
from mcp.server.fastmcp import FastMCP
from mcp.types import INTERNAL_ERROR, INVALID_PARAMS, ErrorData
from pydantic import BaseModel
 12 | 
 13 | 
 14 | # Resource tracking class
 15 | class ResourceTracker:
 16 |     def __init__(self, kind: str, name: str, namespace: str):
 17 |         self.kind = kind
 18 |         self.name = name
 19 |         self.namespace = namespace
 20 |         self.created_at = datetime.now()
 21 | 
 22 | 
 23 | # Kubernetes Manager
 24 | class KubernetesManager:
 25 |     def __init__(self):
 26 |         self.resources: List[ResourceTracker] = []
 27 |         config.load_kube_config()  # Load default kubeconfig
 28 |         self.core_api = client.CoreV1Api()
 29 |         self.apps_api = client.AppsV1Api()
 30 | 
 31 |         # Register signal handlers
 32 |         signal.signal(signal.SIGINT, lambda s, f: asyncio.create_task(self.cleanup()))
 33 |         signal.signal(signal.SIGTERM, lambda s, f: asyncio.create_task(self.cleanup()))
 34 | 
 35 |     async def cleanup(self):
 36 |         """Clean up all tracked resources in reverse order."""
 37 |         for resource in reversed(self.resources):
 38 |             try:
 39 |                 await self.delete_resource(
 40 |                     resource.kind, resource.name, resource.namespace
 41 |                 )
 42 |             except Exception as e:
 43 |                 print(f"Failed to delete {resource.kind} {resource.name}: {e}")
 44 |         self.resources.clear()
 45 | 
 46 |     def track_resource(self, kind: str, name: str, namespace: str):
 47 |         self.resources.append(ResourceTracker(kind, name, namespace))
 48 | 
 49 |     async def delete_resource(self, kind: str, name: str, namespace: str):
 50 |         kind = kind.lower()
 51 |         if kind == "pod":
 52 |             await asyncio.to_thread(
 53 |                 self.core_api.delete_namespaced_pod, name, namespace
 54 |             )
 55 |         elif kind == "deployment":
 56 |             await asyncio.to_thread(
 57 |                 self.apps_api.delete_namespaced_deployment, name, namespace
 58 |             )
 59 |         elif kind == "service":
 60 |             await asyncio.to_thread(
 61 |                 self.core_api.delete_namespaced_service, name, namespace
 62 |             )
 63 |         self.resources = [
 64 |             r
 65 |             for r in self.resources
 66 |             if not (r.kind == kind and r.name == name and r.namespace == namespace)
 67 |         ]
 68 | 
 69 |     def get_core_api(self):
 70 |         return self.core_api
 71 | 
 72 |     def get_apps_api(self):
 73 |         return self.apps_api
 74 | 
 75 | 
 76 | # Container templates
 77 | container_templates: Dict[str, V1Container] = {
 78 |     "ubuntu": V1Container(
 79 |         name="main",
 80 |         image="ubuntu:latest",
 81 |         command=["/bin/bash", "-c", "sleep infinity"],
 82 |         resources=client.V1ResourceRequirements(
 83 |             limits={"cpu": "200m", "memory": "256Mi"},
 84 |             requests={"cpu": "100m", "memory": "128Mi"},
 85 |         ),
 86 |         liveness_probe=client.V1Probe(
 87 |             _exec=client.V1ExecAction(command=["cat", "/proc/1/status"]),
 88 |             initial_delay_seconds=5,
 89 |             period_seconds=10,
 90 |         ),
 91 |     ),
 92 |     "nginx": V1Container(
 93 |         name="main",
 94 |         image="nginx:latest",
 95 |         ports=[client.V1ContainerPort(container_port=80)],
 96 |         resources=client.V1ResourceRequirements(
 97 |             limits={"cpu": "200m", "memory": "256Mi"},
 98 |             requests={"cpu": "100m", "memory": "128Mi"},
 99 |         ),
100 |         liveness_probe=client.V1Probe(
101 |             http_get=client.V1HTTPGetAction(path="/", port=80),
102 |             initial_delay_seconds=5,
103 |             period_seconds=10,
104 |         ),
105 |         readiness_probe=client.V1Probe(
106 |             http_get=client.V1HTTPGetAction(path="/", port=80),
107 |             initial_delay_seconds=2,
108 |             period_seconds=5,
109 |         ),
110 |     ),
111 |     "busybox": V1Container(
112 |         name="main",
113 |         image="busybox:latest",
114 |         command=["sh", "-c", "sleep infinity"],
115 |         resources=client.V1ResourceRequirements(
116 |             limits={"cpu": "100m", "memory": "64Mi"},
117 |             requests={"cpu": "50m", "memory": "32Mi"},
118 |         ),
119 |         liveness_probe=client.V1Probe(
120 |             _exec=client.V1ExecAction(command=["true"]),
121 |             period_seconds=10,
122 |         ),
123 |     ),
124 |     "alpine": V1Container(
125 |         name="main",
126 |         image="alpine:latest",
127 |         command=["sh", "-c", "sleep infinity"],
128 |         resources=client.V1ResourceRequirements(
129 |             limits={"cpu": "100m", "memory": "64Mi"},
130 |             requests={"cpu": "50m", "memory": "32Mi"},
131 |         ),
132 |         liveness_probe=client.V1Probe(
133 |             _exec=client.V1ExecAction(command=["true"]),
134 |             period_seconds=10,
135 |         ),
136 |     ),
137 | }
138 | 
139 | k8s_manager = KubernetesManager()
140 | 
141 | # FastMCP Server Setup
142 | mcp = FastMCP(name="kube-mcp")
143 | 
144 | 
145 | # Define Tool Input Schemas with Pydantic
class ListPodsInput(BaseModel):
    # Input for the list_pods tool.
    # Namespace whose pods are listed; falls back to "default".
    namespace: str = "default"
148 | 
149 | 
class ListDeploymentsInput(BaseModel):
    # Input for the list_deployments tool.
    # Namespace whose deployments are listed; falls back to "default".
    namespace: str = "default"
152 | 
153 | 
class ListServicesInput(BaseModel):
    # Input for the list_services tool.
    # Namespace whose services are listed; falls back to "default".
    namespace: str = "default"
156 | 
157 | 
class CreatePodInput(BaseModel):
    # Input for the create_pod tool.
    # Name given to the new pod (also used as its "app" label).
    name: str
    # Namespace the pod is created in; required, no default.
    namespace: str
    # Key into container_templates; checked by the tool, not by this model.
    template: str
    # Optional replacement for the template's command.
    command: Optional[List[str]] = None
163 | 
164 | 
class DeletePodInput(BaseModel):
    # Input for the delete_pod tool.
    # Pod to delete.
    name: str
    # Namespace containing the pod; required, no default.
    namespace: str
    # When True, a 404 from the API is reported as success ("not_found").
    ignoreNotFound: bool = False
169 | 
170 | 
class DescribePodInput(BaseModel):
    # Input for the describe_pod tool.
    # Pod to read.
    name: str
    # Namespace containing the pod; required, no default.
    namespace: str
174 | 
175 | 
class GetLogsInput(BaseModel):
    # Input for the get_logs tool.
    # Kind of resource to read logs from; the tool only accepts "pod".
    resourceType: str
    # Name of the resource; the tool rejects requests without one.
    name: Optional[str] = None
    # Namespace containing the resource; falls back to "default".
    namespace: str = "default"
    # Passed to the API as tail_lines; 100 unless overridden.
    tail: Optional[int] = 100
181 | 
182 | 
183 | # Define Tools
@mcp.tool()
async def list_pods(input_data: ListPodsInput):
    """List the pods in a namespace.

    Args:
        input_data: Carries the namespace to query.

    Returns:
        A one-element list holding a text item whose text is a JSON document
        of the form ``{"pods": [...]}``.
    """
    # The Kubernetes client is blocking, so run the call in a worker thread.
    pod_list = await asyncio.to_thread(
        k8s_manager.get_core_api().list_namespaced_pod, input_data.namespace
    )
    payload = {"pods": [pod.to_dict() for pod in pod_list.items]}
    # to_dict() keeps datetime objects (e.g. creation timestamps), which the
    # default JSON encoder rejects with TypeError; default=str is a deliberate
    # choice to render them as text.
    return [{"type": "text", "text": json.dumps(payload, indent=2, default=str)}]
197 | 
198 | 
@mcp.tool()
async def list_deployments(input_data: ListDeploymentsInput):
    """List the deployments in a namespace.

    Args:
        input_data: Carries the namespace to query.

    Returns:
        A one-element list holding a text item whose text is a JSON document
        of the form ``{"deployments": [...]}``.
    """
    # The Kubernetes client is blocking, so run the call in a worker thread.
    deployment_list = await asyncio.to_thread(
        k8s_manager.get_apps_api().list_namespaced_deployment, input_data.namespace
    )
    payload = {"deployments": [d.to_dict() for d in deployment_list.items]}
    # to_dict() keeps datetime objects, which the default JSON encoder rejects
    # with TypeError; default=str deliberately renders them as text.
    return [{"type": "text", "text": json.dumps(payload, indent=2, default=str)}]
212 | 
213 | 
@mcp.tool()
async def list_services(input_data: ListServicesInput):
    """List the services in a namespace.

    Args:
        input_data: Carries the namespace to query.

    Returns:
        A one-element list holding a text item whose text is a JSON document
        of the form ``{"services": [...]}``.
    """
    # The Kubernetes client is blocking, so run the call in a worker thread.
    service_list = await asyncio.to_thread(
        k8s_manager.get_core_api().list_namespaced_service, input_data.namespace
    )
    payload = {"services": [s.to_dict() for s in service_list.items]}
    # to_dict() keeps datetime objects, which the default JSON encoder rejects
    # with TypeError; default=str deliberately renders them as text.
    return [{"type": "text", "text": json.dumps(payload, indent=2, default=str)}]
227 | 
228 | 
@mcp.tool()
async def list_namespaces():
    """List all namespaces in the cluster.

    Returns:
        A one-element list holding a text item whose text is a JSON document
        of the form ``{"namespaces": [...]}``.
    """
    # The Kubernetes client is blocking, so run the call in a worker thread.
    namespace_list = await asyncio.to_thread(k8s_manager.get_core_api().list_namespace)
    payload = {"namespaces": [n.to_dict() for n in namespace_list.items]}
    # to_dict() keeps datetime objects, which the default JSON encoder rejects
    # with TypeError; default=str deliberately renders them as text.
    return [{"type": "text", "text": json.dumps(payload, indent=2, default=str)}]
240 | 
241 | 
@mcp.tool()
async def create_pod(input_data: CreatePodInput):
    """Create a single-container pod from one of the container templates.

    Args:
        input_data: Pod name, namespace, template key and optional command
            override.

    Returns:
        A one-element list holding a text item whose text is a JSON document
        with the created pod's name and the status ``"created"``.

    Raises:
        McpError: If the template key is unknown or the API call fails.
    """
    template = container_templates.get(input_data.template)
    if template is None:
        # McpError wraps an ErrorData object, not a bare string.
        raise McpError(
            ErrorData(
                code=INVALID_PARAMS,
                message=f"Invalid template: {input_data.template}",
            )
        )
    # Work on a copy: assigning the command on the shared template object
    # would leak this request's override into every later pod.
    container = copy.copy(template)
    if input_data.command:
        container.command = input_data.command
        container.args = None
    pod = V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=V1ObjectMeta(
            name=input_data.name,
            namespace=input_data.namespace,
            labels={"mcp-managed": "true", "app": input_data.name},
        ),
        spec=V1PodSpec(containers=[container]),
    )
    try:
        # The Kubernetes client is blocking, so run it in a worker thread.
        response = await asyncio.to_thread(
            k8s_manager.get_core_api().create_namespaced_pod, input_data.namespace, pod
        )
    except client.exceptions.ApiException as e:
        raise McpError(
            ErrorData(code=INTERNAL_ERROR, message=f"Failed to create pod: {e}")
        ) from e
    # Remember the pod so the cleanup tool can delete it later.
    k8s_manager.track_resource("Pod", input_data.name, input_data.namespace)
    result = {"podName": response.metadata.name, "status": "created"}
    return [{"type": "text", "text": json.dumps(result, indent=2)}]
275 | 
276 | 
@mcp.tool()
async def delete_pod(input_data: DeletePodInput):
    """Delete a pod by name and namespace.

    Args:
        input_data: Pod name, namespace and the ``ignoreNotFound`` flag.

    Returns:
        A one-element list holding a text item whose text is a JSON document
        with ``success`` and a ``status`` of ``"deleted"`` or ``"not_found"``.

    Raises:
        McpError: If the API call fails, except for a 404 when
            ``ignoreNotFound`` is set.
    """
    status = "deleted"
    try:
        # The Kubernetes client is blocking, so run it in a worker thread.
        await asyncio.to_thread(
            k8s_manager.get_core_api().delete_namespaced_pod,
            input_data.name,
            input_data.namespace,
        )
    except client.exceptions.ApiException as e:
        if not (input_data.ignoreNotFound and e.status == 404):
            # McpError wraps an ErrorData object, not a bare string.
            raise McpError(
                ErrorData(code=INTERNAL_ERROR, message=f"Failed to delete pod: {e}")
            ) from e
        status = "not_found"
    # The pod is gone either way; stop tracking it so a later cleanup does not
    # try to delete it again.
    k8s_manager.resources = [
        r
        for r in k8s_manager.resources
        if not (
            r.kind.lower() == "pod"
            and r.name == input_data.name
            and r.namespace == input_data.namespace
        )
    ]
    result = {"success": True, "status": status}
    return [{"type": "text", "text": json.dumps(result, indent=2)}]
302 | 
303 | 
@mcp.tool()
async def describe_pod(input_data: DescribePodInput):
    """Return the full API object for one pod.

    Args:
        input_data: Pod name and namespace.

    Returns:
        A one-element list holding a text item whose text is the pod,
        converted with ``to_dict()`` and serialized as JSON.

    Raises:
        McpError: If the pod does not exist or the API call fails.
    """
    try:
        # The Kubernetes client is blocking, so run it in a worker thread.
        pod = await asyncio.to_thread(
            k8s_manager.get_core_api().read_namespaced_pod,
            input_data.name,
            input_data.namespace,
        )
    except client.exceptions.ApiException as e:
        # McpError wraps an ErrorData object, not a bare string.
        if e.status == 404:
            raise McpError(
                ErrorData(code=INVALID_PARAMS, message="Pod not found")
            ) from e
        raise McpError(
            ErrorData(code=INTERNAL_ERROR, message=f"Failed to describe pod: {e}")
        ) from e
    # to_dict() keeps datetime objects, which the default JSON encoder rejects
    # with TypeError; default=str deliberately renders them as text.
    return [{"type": "text", "text": json.dumps(pod.to_dict(), indent=2, default=str)}]
317 | 
318 | 
@mcp.tool()
async def cleanup():
    """Delete every resource tracked by the manager.

    Returns:
        A one-element list holding a text item whose text is the JSON
        document ``{"success": true}``.
    """
    await k8s_manager.cleanup()
    result = {"success": True}
    return [{"type": "text", "text": json.dumps(result, indent=2)}]
323 | 
324 | 
@mcp.tool()
async def list_nodes():
    """List all nodes in the cluster.

    Returns:
        A one-element list holding a text item whose text is a JSON document
        of the form ``{"nodes": [...]}``.
    """
    # The Kubernetes client is blocking, so run the call in a worker thread.
    node_list = await asyncio.to_thread(k8s_manager.get_core_api().list_node)
    payload = {"nodes": [n.to_dict() for n in node_list.items]}
    # to_dict() keeps datetime objects, which the default JSON encoder rejects
    # with TypeError; default=str deliberately renders them as text.
    return [{"type": "text", "text": json.dumps(payload, indent=2, default=str)}]
334 | 
335 | 
@mcp.tool()
async def get_logs(input_data: GetLogsInput):
    """Fetch the log tail of one pod.

    Args:
        input_data: Resource type (must be ``"pod"``), pod name, namespace
            and the number of trailing lines to request.

    Returns:
        A one-element list holding a text item whose text is a JSON document
        of the form ``{"logs": {<pod name>: <log text>}}``.

    Raises:
        McpError: If the resource type is not ``"pod"``, the name is missing,
            or the API call fails.
    """
    if input_data.resourceType != "pod" or not input_data.name:
        # McpError wraps an ErrorData object, not a bare string.
        raise McpError(
            ErrorData(
                code=INVALID_PARAMS,
                message="Only pod logs supported with a name",
            )
        )
    try:
        # The Kubernetes client is blocking, so run it in a worker thread.
        logs = await asyncio.to_thread(
            k8s_manager.get_core_api().read_namespaced_pod_log,
            input_data.name,
            input_data.namespace,
            tail_lines=input_data.tail,
        )
    except client.exceptions.ApiException as e:
        raise McpError(
            ErrorData(code=INTERNAL_ERROR, message=f"Failed to get logs: {e}")
        ) from e
    result = {"logs": {input_data.name: logs}}
    return [{"type": "text", "text": json.dumps(result, indent=2)}]
355 | 
356 | 
@mcp.resource("k8s://namespaces")
async def read_namespaces():
    """Serve the cluster's namespaces as the ``k8s://namespaces`` resource.

    Returns:
        A one-element list holding a dict with the resource URI, the
        ``application/json`` MIME type and the namespaces serialized as JSON.

    Raises:
        McpError: If listing or serializing the namespaces fails.
    """
    try:
        # The Kubernetes client is blocking, so run it in a worker thread.
        result = await asyncio.to_thread(k8s_manager.get_core_api().list_namespace)
        namespaces = [item.to_dict() for item in result.items]
        return [
            {
                # The function takes no parameters, so the URI is the fixed
                # one this resource is registered under.
                "uri": "k8s://namespaces",
                "mimeType": "application/json",
                # to_dict() keeps datetime objects, which the default JSON
                # encoder rejects; default=str renders them as text.
                "text": json.dumps(namespaces, indent=2, default=str),
            }
        ]
    except McpError:
        raise
    except Exception as e:
        # McpError wraps an ErrorData object, not a bare string.
        raise McpError(
            ErrorData(code=INTERNAL_ERROR, message=f"Failed to read resource: {e}")
        ) from e
373 | 
```