This is page 1 of 4. Use http://codebase.md/manusa/kubernetes-mcp-server?page={x} to view the full context.
# Directory Structure
```
├── .github
│ ├── dependabot.yml
│ └── workflows
│ ├── build.yaml
│ ├── release-image.yml
│ └── release.yaml
├── .gitignore
├── AGENTS.md
├── build
│ ├── keycloak.mk
│ ├── kind.mk
│ └── tools.mk
├── CLAUDE.md
├── cmd
│ └── kubernetes-mcp-server
│ ├── main_test.go
│ └── main.go
├── dev
│ └── config
│ ├── cert-manager
│ │ └── selfsigned-issuer.yaml
│ ├── ingress
│ │ └── nginx-ingress.yaml
│ ├── keycloak
│ │ ├── client-scopes
│ │ │ ├── groups.json
│ │ │ ├── mcp-openshift.json
│ │ │ └── mcp-server.json
│ │ ├── clients
│ │ │ ├── mcp-client.json
│ │ │ ├── mcp-server-update.json
│ │ │ ├── mcp-server.json
│ │ │ └── openshift.json
│ │ ├── deployment.yaml
│ │ ├── ingress.yaml
│ │ ├── mappers
│ │ │ ├── groups-membership.json
│ │ │ ├── mcp-server-audience.json
│ │ │ ├── openshift-audience.json
│ │ │ └── username.json
│ │ ├── rbac.yaml
│ │ ├── realm
│ │ │ ├── realm-create.json
│ │ │ └── realm-events-config.json
│ │ └── users
│ │ └── mcp.json
│ └── kind
│ └── cluster.yaml
├── Dockerfile
├── docs
│ └── images
│ ├── kubernetes-mcp-server-github-copilot.jpg
│ └── vibe-coding.jpg
├── go.mod
├── go.sum
├── hack
│ └── generate-placeholder-ca.sh
├── internal
│ ├── test
│ │ ├── env.go
│ │ ├── kubernetes.go
│ │ ├── mcp.go
│ │ ├── mock_server.go
│ │ └── test.go
│ └── tools
│ └── update-readme
│ └── main.go
├── LICENSE
├── Makefile
├── npm
│ ├── kubernetes-mcp-server
│ │ ├── bin
│ │ │ └── index.js
│ │ └── package.json
│ ├── kubernetes-mcp-server-darwin-amd64
│ │ └── package.json
│ ├── kubernetes-mcp-server-darwin-arm64
│ │ └── package.json
│ ├── kubernetes-mcp-server-linux-amd64
│ │ └── package.json
│ ├── kubernetes-mcp-server-linux-arm64
│ │ └── package.json
│ ├── kubernetes-mcp-server-windows-amd64
│ │ └── package.json
│ └── kubernetes-mcp-server-windows-arm64
│ └── package.json
├── pkg
│ ├── api
│ │ ├── toolsets_test.go
│ │ └── toolsets.go
│ ├── config
│ │ ├── config_default_overrides.go
│ │ ├── config_default.go
│ │ ├── config_test.go
│ │ ├── config.go
│ │ ├── provider_config_test.go
│ │ └── provider_config.go
│ ├── helm
│ │ └── helm.go
│ ├── http
│ │ ├── authorization_test.go
│ │ ├── authorization.go
│ │ ├── http_test.go
│ │ ├── http.go
│ │ ├── middleware.go
│ │ ├── sts_test.go
│ │ ├── sts.go
│ │ └── wellknown.go
│ ├── kubernetes
│ │ ├── accesscontrol_clientset.go
│ │ ├── accesscontrol_restmapper.go
│ │ ├── accesscontrol.go
│ │ ├── common_test.go
│ │ ├── configuration.go
│ │ ├── events.go
│ │ ├── impersonate_roundtripper.go
│ │ ├── kubernetes_derived_test.go
│ │ ├── kubernetes.go
│ │ ├── manager_test.go
│ │ ├── manager.go
│ │ ├── namespaces.go
│ │ ├── nodes.go
│ │ ├── openshift.go
│ │ ├── pods.go
│ │ ├── provider_kubeconfig_test.go
│ │ ├── provider_kubeconfig.go
│ │ ├── provider_registry_test.go
│ │ ├── provider_registry.go
│ │ ├── provider_single_test.go
│ │ ├── provider_single.go
│ │ ├── provider_test.go
│ │ ├── provider.go
│ │ ├── resources.go
│ │ └── token.go
│ ├── kubernetes-mcp-server
│ │ └── cmd
│ │ ├── root_test.go
│ │ ├── root.go
│ │ └── testdata
│ │ ├── empty-config.toml
│ │ └── valid-config.toml
│ ├── mcp
│ │ ├── common_test.go
│ │ ├── configuration_test.go
│ │ ├── events_test.go
│ │ ├── helm_test.go
│ │ ├── m3labs.go
│ │ ├── mcp_middleware_test.go
│ │ ├── mcp_test.go
│ │ ├── mcp_tools_test.go
│ │ ├── mcp.go
│ │ ├── modules.go
│ │ ├── namespaces_test.go
│ │ ├── nodes_test.go
│ │ ├── pods_exec_test.go
│ │ ├── pods_test.go
│ │ ├── pods_top_test.go
│ │ ├── resources_test.go
│ │ ├── testdata
│ │ │ ├── helm-chart-no-op
│ │ │ │ └── Chart.yaml
│ │ │ ├── helm-chart-secret
│ │ │ │ ├── Chart.yaml
│ │ │ │ └── templates
│ │ │ │ └── secret.yaml
│ │ │ ├── toolsets-config-tools.json
│ │ │ ├── toolsets-core-tools.json
│ │ │ ├── toolsets-full-tools-multicluster-enum.json
│ │ │ ├── toolsets-full-tools-multicluster.json
│ │ │ ├── toolsets-full-tools-openshift.json
│ │ │ ├── toolsets-full-tools.json
│ │ │ └── toolsets-helm-tools.json
│ │ ├── tool_filter_test.go
│ │ ├── tool_filter.go
│ │ ├── tool_mutator_test.go
│ │ ├── tool_mutator.go
│ │ └── toolsets_test.go
│ ├── output
│ │ ├── output_test.go
│ │ └── output.go
│ ├── toolsets
│ │ ├── config
│ │ │ ├── configuration.go
│ │ │ └── toolset.go
│ │ ├── core
│ │ │ ├── events.go
│ │ │ ├── namespaces.go
│ │ │ ├── nodes.go
│ │ │ ├── pods.go
│ │ │ ├── resources.go
│ │ │ └── toolset.go
│ │ ├── helm
│ │ │ ├── helm.go
│ │ │ └── toolset.go
│ │ ├── toolsets_test.go
│ │ └── toolsets.go
│ └── version
│ └── version.go
├── python
│ ├── kubernetes_mcp_server
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ └── kubernetes_mcp_server.py
│ ├── pyproject.toml
│ └── README.md
├── README.md
└── smithery.yaml
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
_output/
.idea/
.vscode/
.docusaurus/
node_modules/
.npmrc
kubernetes-mcp-server
!cmd/kubernetes-mcp-server
!pkg/kubernetes-mcp-server
npm/kubernetes-mcp-server/README.md
npm/kubernetes-mcp-server/LICENSE
!npm/kubernetes-mcp-server
kubernetes-mcp-server-darwin-amd64
!npm/kubernetes-mcp-server-darwin-amd64/
kubernetes-mcp-server-darwin-arm64
!npm/kubernetes-mcp-server-darwin-arm64
kubernetes-mcp-server-linux-amd64
!npm/kubernetes-mcp-server-linux-amd64
kubernetes-mcp-server-linux-arm64
!npm/kubernetes-mcp-server-linux-arm64
kubernetes-mcp-server-windows-amd64.exe
kubernetes-mcp-server-windows-arm64.exe
python/.venv/
python/build/
python/dist/
python/kubernetes_mcp_server.egg-info/
!python/kubernetes-mcp-server
```
--------------------------------------------------------------------------------
/python/README.md:
--------------------------------------------------------------------------------
```markdown
../README.md
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Kubernetes MCP Server
[](https://github.com/containers/kubernetes-mcp-server/blob/main/LICENSE)
[](https://www.npmjs.com/package/kubernetes-mcp-server)
[](https://pypi.org/project/kubernetes-mcp-server/)
[](https://github.com/containers/kubernetes-mcp-server/releases/latest)
[](https://github.com/containers/kubernetes-mcp-server/actions/workflows/build.yaml)
[✨ Features](#features) | [🚀 Getting Started](#getting-started) | [🎥 Demos](#demos) | [⚙️ Configuration](#configuration) | [🛠️ Tools](#tools-and-functionalities) | [🧑💻 Development](#development)
https://github.com/user-attachments/assets/be2b67b3-fc1c-4d11-ae46-93deba8ed98e
## ✨ Features <a id="features"></a>
A powerful and flexible Kubernetes [Model Context Protocol (MCP)](https://blog.marcnuri.com/model-context-protocol-mcp-introduction) server implementation with support for **Kubernetes** and **OpenShift**.
- **✅ Configuration**:
- Automatically detect changes in the Kubernetes configuration and update the MCP server.
- **View** and manage the current [Kubernetes `.kube/config`](https://blog.marcnuri.com/where-is-my-default-kubeconfig-file) or in-cluster configuration.
- **✅ Generic Kubernetes Resources**: Perform operations on **any** Kubernetes or OpenShift resource.
- Any CRUD operation (Create or Update, Get, List, Delete).
- **✅ Pods**: Perform Pod-specific operations.
- **List** pods in all namespaces or in a specific namespace.
- **Get** a pod by name from the specified namespace.
- **Delete** a pod by name from the specified namespace.
- **Show logs** for a pod by name from the specified namespace.
- **Top** gets resource usage metrics for all pods or a specific pod in the specified namespace.
- **Exec** into a pod and run a command.
- **Run** a container image in a pod and optionally expose it.
- **✅ Namespaces**: List Kubernetes Namespaces.
- **✅ Events**: View Kubernetes events in all namespaces or in a specific namespace.
- **✅ Projects**: List OpenShift Projects.
- **☸️ Helm**:
- **Install** a Helm chart in the current or provided namespace.
- **List** Helm releases in all namespaces or in a specific namespace.
- **Uninstall** a Helm release in the current or provided namespace.
Unlike other Kubernetes MCP server implementations, this **IS NOT** just a wrapper around `kubectl` or `helm` command-line tools.
It is a **Go-based native implementation** that interacts directly with the Kubernetes API server.
There is **NO NEED** for external dependencies or tools to be installed on the system.
If you're using the native binaries you don't need to have Node or Python installed on your system.
- **✅ Lightweight**: The server is distributed as a single native binary for Linux, macOS, and Windows.
- **✅ High-Performance / Low-Latency**: Directly interacts with the Kubernetes API server without the overhead of calling and waiting for external commands.
- **✅ Multi-Cluster**: Can interact with multiple Kubernetes clusters simultaneously (as defined in your kubeconfig files).
- **✅ Cross-Platform**: Available as a native binary for Linux, macOS, and Windows, as well as an npm package, a Python package, and container/Docker image.
- **✅ Configurable**: Supports [command-line arguments](#configuration) to configure the server behavior.
- **✅ Well tested**: The server has an extensive test suite to ensure its reliability and correctness across different Kubernetes environments.
## 🚀 Getting Started <a id="getting-started"></a>
### Requirements
- Access to a Kubernetes cluster.
### Claude Desktop
#### Using npx
If you have npm installed, this is the fastest way to get started with `kubernetes-mcp-server` on Claude Desktop.
Open your `claude_desktop_config.json` and add the mcp server to the list of `mcpServers`:
``` json
{
"mcpServers": {
"kubernetes": {
"command": "npx",
"args": [
"-y",
"kubernetes-mcp-server@latest"
]
}
}
}
```
### VS Code / VS Code Insiders
Install the Kubernetes MCP server extension in VS Code Insiders by pressing the following link:
[<img src="https://img.shields.io/badge/VS_Code-VS_Code?style=flat-square&label=Install%20Server&color=0098FF" alt="Install in VS Code">](https://insiders.vscode.dev/redirect?url=vscode%3Amcp%2Finstall%3F%257B%2522name%2522%253A%2522kubernetes%2522%252C%2522command%2522%253A%2522npx%2522%252C%2522args%2522%253A%255B%2522-y%2522%252C%2522kubernetes-mcp-server%2540latest%2522%255D%257D)
[<img alt="Install in VS Code Insiders" src="https://img.shields.io/badge/VS_Code_Insiders-VS_Code_Insiders?style=flat-square&label=Install%20Server&color=24bfa5">](https://insiders.vscode.dev/redirect?url=vscode-insiders%3Amcp%2Finstall%3F%257B%2522name%2522%253A%2522kubernetes%2522%252C%2522command%2522%253A%2522npx%2522%252C%2522args%2522%253A%255B%2522-y%2522%252C%2522kubernetes-mcp-server%2540latest%2522%255D%257D)
Alternatively, you can install the extension manually by running the following command:
```shell
# For VS Code
code --add-mcp '{"name":"kubernetes","command":"npx","args":["kubernetes-mcp-server@latest"]}'
# For VS Code Insiders
code-insiders --add-mcp '{"name":"kubernetes","command":"npx","args":["kubernetes-mcp-server@latest"]}'
```
### Cursor
Install the Kubernetes MCP server extension in Cursor by pressing the following link:
[](https://cursor.com/en/install-mcp?name=kubernetes-mcp-server&config=eyJjb21tYW5kIjoibnB4IC15IGt1YmVybmV0ZXMtbWNwLXNlcnZlckBsYXRlc3QifQ%3D%3D)
Alternatively, you can install the extension manually by editing the `mcp.json` file:
```json
{
"mcpServers": {
"kubernetes-mcp-server": {
"command": "npx",
"args": ["-y", "kubernetes-mcp-server@latest"]
}
}
}
```
### Goose CLI
[Goose CLI](https://blog.marcnuri.com/goose-on-machine-ai-agent-cli-introduction) is the easiest (and cheapest) way to get rolling with artificial intelligence (AI) agents.
#### Using npm
If you have npm installed, this is the fastest way to get started with `kubernetes-mcp-server`.
Open your goose `config.yaml` and add the mcp server to the list of `mcpServers`:
```yaml
extensions:
kubernetes:
command: npx
args:
- -y
- kubernetes-mcp-server@latest
```
## 🎥 Demos <a id="demos"></a>
### Diagnosing and automatically fixing an OpenShift Deployment
Demo showcasing how Kubernetes MCP server is leveraged by Claude Desktop to automatically diagnose and fix a deployment in OpenShift without any user assistance.
https://github.com/user-attachments/assets/a576176d-a142-4c19-b9aa-a83dc4b8d941
### _Vibe Coding_ a simple game and deploying it to OpenShift
In this demo, I walk you through the process of _Vibe Coding_ a simple game using VS Code and how to leverage [Podman MCP server](https://github.com/manusa/podman-mcp-server) and Kubernetes MCP server to deploy it to OpenShift.
<a href="https://www.youtube.com/watch?v=l05jQDSrzVI" target="_blank">
<img src="docs/images/vibe-coding.jpg" alt="Vibe Coding: Build & Deploy a Game on Kubernetes" width="240" />
</a>
### Supercharge GitHub Copilot with Kubernetes MCP Server in VS Code - One-Click Setup!
In this demo, I'll show you how to set up Kubernetes MCP server in VS code just by clicking a link.
<a href="https://youtu.be/AI4ljYMkgtA" target="_blank">
<img src="docs/images/kubernetes-mcp-server-github-copilot.jpg" alt="Supercharge GitHub Copilot with Kubernetes MCP Server in VS Code - One-Click Setup!" width="240" />
</a>
## ⚙️ Configuration <a id="configuration"></a>
The Kubernetes MCP server can be configured using command line (CLI) arguments.
You can run the CLI executable either by using `npx`, `uvx`, or by downloading the [latest release binary](https://github.com/containers/kubernetes-mcp-server/releases/latest).
```shell
# Run the Kubernetes MCP server using npx (in case you have npm and node installed)
npx kubernetes-mcp-server@latest --help
```
```shell
# Run the Kubernetes MCP server using uvx (in case you have uv and python installed)
uvx kubernetes-mcp-server@latest --help
```
```shell
# Run the Kubernetes MCP server using the latest release binary
./kubernetes-mcp-server --help
```
### Configuration Options
| Option | Description |
|---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `--port` | Starts the MCP server in Streamable HTTP mode (path /mcp) and Server-Sent Event (SSE) (path /sse) mode and listens on the specified port . |
| `--log-level` | Sets the logging level (values [from 0-9](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md)). Similar to [kubectl logging levels](https://kubernetes.io/docs/reference/kubectl/quick-reference/#kubectl-output-verbosity-and-debugging). |
| `--kubeconfig` | Path to the Kubernetes configuration file. If not provided, it will try to resolve the configuration (in-cluster, default location, etc.). |
| `--list-output` | Output format for resource list operations (one of: yaml, table) (default "table") |
| `--read-only` | If set, the MCP server will run in read-only mode, meaning it will not allow any write operations (create, update, delete) on the Kubernetes cluster. This is useful for debugging or inspecting the cluster without making changes. |
| `--disable-destructive` | If set, the MCP server will disable all destructive operations (delete, update, etc.) on the Kubernetes cluster. This is useful for debugging or inspecting the cluster without accidentally making changes. This option has no effect when `--read-only` is used. |
| `--toolsets` | Comma-separated list of toolsets to enable. Check the [🛠️ Tools and Functionalities](#tools-and-functionalities) section for more information. |
| `--disable-multi-cluster` | If set, the MCP server will disable multi-cluster support and will only use the current context from the kubeconfig file. This is useful if you want to restrict the MCP server to a single cluster. |
## 🛠️ Tools and Functionalities <a id="tools-and-functionalities"></a>
The Kubernetes MCP server supports enabling or disabling specific groups of tools and functionalities (tools, resources, prompts, and so on) via the `--toolsets` command-line flag or `toolsets` configuration option.
This allows you to control which Kubernetes functionalities are available to your AI tools.
Enabling only the toolsets you need can help reduce the context size and improve the LLM's tool selection accuracy.
### Available Toolsets
The following sets of tools are available (all on by default):
<!-- AVAILABLE-TOOLSETS-START -->
| Toolset | Description |
|---------|-------------------------------------------------------------------------------------|
| config | View and manage the current local Kubernetes configuration (kubeconfig) |
| core | Most common tools for Kubernetes management (Pods, Generic Resources, Events, etc.) |
| helm | Tools for managing Helm charts and releases |
<!-- AVAILABLE-TOOLSETS-END -->
### Tools
In case multi-cluster support is enabled (default) and you have access to multiple clusters, all applicable tools will include an additional `context` argument to specify the Kubernetes context (cluster) to use for that operation.
<!-- AVAILABLE-TOOLSETS-TOOLS-START -->
<details>
<summary>config</summary>
- **configuration_contexts_list** - List all available context names and associated server urls from the kubeconfig file
- **configuration_view** - Get the current Kubernetes configuration content as a kubeconfig YAML
- `minified` (`boolean`) - Return a minified version of the configuration. If set to true, keeps only the current-context and the relevant pieces of the configuration for that context. If set to false, all contexts, clusters, auth-infos, and users are returned in the configuration. (Optional, default true)
</details>
<details>
<summary>core</summary>
- **events_list** - List all the Kubernetes events in the current cluster from all namespaces
- `namespace` (`string`) - Optional Namespace to retrieve the events from. If not provided, will list events from all namespaces
- **namespaces_list** - List all the Kubernetes namespaces in the current cluster
- **projects_list** - List all the OpenShift projects in the current cluster
- **pods_list** - List all the Kubernetes pods in the current cluster from all namespaces
- `labelSelector` (`string`) - Optional Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label
- **pods_list_in_namespace** - List all the Kubernetes pods in the specified namespace in the current cluster
- `labelSelector` (`string`) - Optional Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label
- `namespace` (`string`) **(required)** - Namespace to list pods from
- **pods_get** - Get a Kubernetes Pod in the current or provided namespace with the provided name
- `name` (`string`) **(required)** - Name of the Pod
- `namespace` (`string`) - Namespace to get the Pod from
- **pods_delete** - Delete a Kubernetes Pod in the current or provided namespace with the provided name
- `name` (`string`) **(required)** - Name of the Pod to delete
- `namespace` (`string`) - Namespace to delete the Pod from
- **pods_top** - List the resource consumption (CPU and memory) as recorded by the Kubernetes Metrics Server for the specified Kubernetes Pods in the all namespaces, the provided namespace, or the current namespace
- `all_namespaces` (`boolean`) - If true, list the resource consumption for all Pods in all namespaces. If false, list the resource consumption for Pods in the provided namespace or the current namespace
- `label_selector` (`string`) - Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label (Optional, only applicable when name is not provided)
- `name` (`string`) - Name of the Pod to get the resource consumption from (Optional, all Pods in the namespace if not provided)
- `namespace` (`string`) - Namespace to get the Pods resource consumption from (Optional, current namespace if not provided and all_namespaces is false)
- **pods_exec** - Execute a command in a Kubernetes Pod in the current or provided namespace with the provided name and command
- `command` (`array`) **(required)** - Command to execute in the Pod container. The first item is the command to be run, and the rest are the arguments to that command. Example: ["ls", "-l", "/tmp"]
- `container` (`string`) - Name of the Pod container where the command will be executed (Optional)
- `name` (`string`) **(required)** - Name of the Pod where the command will be executed
- `namespace` (`string`) - Namespace of the Pod where the command will be executed
- **pods_log** - Get the logs of a Kubernetes Pod in the current or provided namespace with the provided name
- `container` (`string`) - Name of the Pod container to get the logs from (Optional)
- `name` (`string`) **(required)** - Name of the Pod to get the logs from
- `namespace` (`string`) - Namespace to get the Pod logs from
- `previous` (`boolean`) - Return previous terminated container logs (Optional)
- `tail` (`integer`) - Number of lines to retrieve from the end of the logs (Optional, default: 100)
- **pods_run** - Run a Kubernetes Pod in the current or provided namespace with the provided container image and optional name
- `image` (`string`) **(required)** - Container Image to run in the Pod
- `name` (`string`) - Name of the Pod (Optional, random name if not provided)
- `namespace` (`string`) - Namespace to run the Pod in
- `port` (`number`) - TCP/IP port to expose from the Pod container (Optional, no port exposed if not provided)
- **resources_list** - List Kubernetes resources and objects in the current cluster by providing their apiVersion and kind and optionally the namespace and label selector
(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress, route.openshift.io/v1 Route)
- `apiVersion` (`string`) **(required)** - apiVersion of the resources (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)
- `kind` (`string`) **(required)** - kind of the resources (examples of valid kind are: Pod, Service, Deployment, Ingress)
- `labelSelector` (`string`) - Optional Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label
- `namespace` (`string`) - Optional Namespace to retrieve the namespaced resources from (ignored in case of cluster scoped resources). If not provided, will list resources from all namespaces
- **resources_get** - Get a Kubernetes resource in the current cluster by providing its apiVersion, kind, optionally the namespace, and its name
(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress, route.openshift.io/v1 Route)
- `apiVersion` (`string`) **(required)** - apiVersion of the resource (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)
- `kind` (`string`) **(required)** - kind of the resource (examples of valid kind are: Pod, Service, Deployment, Ingress)
- `name` (`string`) **(required)** - Name of the resource
- `namespace` (`string`) - Optional Namespace to retrieve the namespaced resource from (ignored in case of cluster scoped resources). If not provided, will get resource from configured namespace
- **resources_create_or_update** - Create or update a Kubernetes resource in the current cluster by providing a YAML or JSON representation of the resource
(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress, route.openshift.io/v1 Route)
- `resource` (`string`) **(required)** - A JSON or YAML containing a representation of the Kubernetes resource. Should include top-level fields such as apiVersion,kind,metadata, and spec
- **resources_delete** - Delete a Kubernetes resource in the current cluster by providing its apiVersion, kind, optionally the namespace, and its name
(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress, route.openshift.io/v1 Route)
- `apiVersion` (`string`) **(required)** - apiVersion of the resource (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)
- `kind` (`string`) **(required)** - kind of the resource (examples of valid kind are: Pod, Service, Deployment, Ingress)
- `name` (`string`) **(required)** - Name of the resource
- `namespace` (`string`) - Optional Namespace to delete the namespaced resource from (ignored in case of cluster scoped resources). If not provided, will delete resource from configured namespace
</details>
<details>
<summary>helm</summary>
- **helm_install** - Install a Helm chart in the current or provided namespace
- `chart` (`string`) **(required)** - Chart reference to install (for example: stable/grafana, oci://ghcr.io/nginxinc/charts/nginx-ingress)
- `name` (`string`) - Name of the Helm release (Optional, random name if not provided)
- `namespace` (`string`) - Namespace to install the Helm chart in (Optional, current namespace if not provided)
- `values` (`object`) - Values to pass to the Helm chart (Optional)
- **helm_list** - List all the Helm releases in the current or provided namespace (or in all namespaces if specified)
- `all_namespaces` (`boolean`) - If true, lists all Helm releases in all namespaces ignoring the namespace argument (Optional)
- `namespace` (`string`) - Namespace to list Helm releases from (Optional, all namespaces if not provided)
- **helm_uninstall** - Uninstall a Helm release in the current or provided namespace
- `name` (`string`) **(required)** - Name of the Helm release to uninstall
- `namespace` (`string`) - Namespace to uninstall the Helm release from (Optional, current namespace if not provided)
</details>
<!-- AVAILABLE-TOOLSETS-TOOLS-END -->
## 🧑💻 Development <a id="development"></a>
### Running with mcp-inspector
Compile the project and run the Kubernetes MCP server with [mcp-inspector](https://modelcontextprotocol.io/docs/tools/inspector) to inspect the MCP server.
```shell
# Compile the project
make build
# Run the Kubernetes MCP server with mcp-inspector
npx @modelcontextprotocol/inspector@latest $(pwd)/kubernetes-mcp-server
```
```
--------------------------------------------------------------------------------
/CLAUDE.md:
--------------------------------------------------------------------------------
```markdown
AGENTS.md
```
--------------------------------------------------------------------------------
/AGENTS.md:
--------------------------------------------------------------------------------
```markdown
# Project Agents.md for Kubernetes MCP Server
This Agents.md file provides comprehensive guidance for AI assistants and coding agents (like Claude, Gemini, Cursor, and others) to work with this codebase.
This repository contains the kubernetes-mcp-server project,
a powerful Go-based Model Context Protocol (MCP) server that provides native Kubernetes and OpenShift cluster management capabilities without external dependencies.
This MCP server enables AI assistants (like Claude, Gemini, Cursor, and others) to interact with Kubernetes clusters using the Model Context Protocol (MCP).
## Project Structure and Repository layout
- Go package layout follows the standard Go conventions:
- `cmd/kubernetes-mcp-server/` – main application entry point using Cobra CLI framework.
- `pkg/` – libraries grouped by domain.
- `api/` - API-related functionality, tool definitions, and toolset interfaces.
- `config/` – configuration management.
- `helm/` - Helm chart operations integration.
- `http/` - HTTP server and authorization middleware.
- `kubernetes/` - Kubernetes client management, authentication, and access control.
- `mcp/` - Model Context Protocol (MCP) server implementation with tool registration and STDIO/HTTP support.
- `output/` - output formatting and rendering.
- `toolsets/` - Toolset registration and management for MCP tools.
- `version/` - Version information management.
- `.github/` – GitHub-related configuration (Actions workflows, issue templates...).
- `docs/` – documentation files.
- `npm/` – Node packages that wraps the compiled binaries for distribution through npmjs.com.
- `python/` – Python package providing a script that downloads the correct platform binary from the GitHub releases page and runs it for distribution through pypi.org.
- `Dockerfile` - container image description file to distribute the server as a container image.
- `Makefile` – tasks for building, formatting, linting and testing.
## Feature development
Implement new functionality in the Go sources under `cmd/` and `pkg/`.
The JavaScript (`npm/`) and Python (`python/`) directories only wrap the compiled binary for distribution (npm and PyPI).
Most changes will not require touching them unless the version or packaging needs to be updated.
### Adding new MCP tools
The project uses a toolset-based architecture for organizing MCP tools:
- **Tool definitions** are created in `pkg/api/` using the `ServerTool` struct.
- **Toolsets** group related tools together (e.g., config tools, core Kubernetes tools, Helm tools).
- **Registration** happens in `pkg/toolsets/` where toolsets are registered at initialization.
- Each toolset lives in its own subdirectory under `pkg/toolsets/` (e.g., `pkg/toolsets/config/`, `pkg/toolsets/core/`, `pkg/toolsets/helm/`).
When adding a new tool:
1. Define the tool handler function that implements the tool's logic.
2. Create a `ServerTool` struct with the tool definition and handler.
3. Add the tool to an appropriate toolset (or create a new toolset if needed).
4. Register the toolset in `pkg/toolsets/` if it's a new toolset.
## Building
Use the provided Makefile targets:
```bash
# Format source and build the binary
make build
# Build for all supported platforms
make build-all-platforms
```
`make build` will run `go fmt` and `go mod tidy` before compiling.
The resulting executable is `kubernetes-mcp-server`.
## Running
The README demonstrates running the server via
[`mcp-inspector`](https://modelcontextprotocol.io/docs/tools/inspector):
```bash
make build
npx @modelcontextprotocol/inspector@latest $(pwd)/kubernetes-mcp-server
```
To run the server locally, you can use `npx`, `uvx` or execute the binary directly:
```bash
# Using npx (Node.js package runner)
npx -y kubernetes-mcp-server@latest
# Using uvx (Python package runner)
uvx kubernetes-mcp-server@latest
# Binary execution
./kubernetes-mcp-server
```
This MCP server is designed to run both locally and remotely.
### Local Execution
When running locally, the server connects to a Kubernetes or OpenShift cluster using the kubeconfig file.
It reads the kubeconfig from the `--kubeconfig` flag, the `KUBECONFIG` environment variable, or defaults to `~/.kube/config`.
This means that `npx -y kubernetes-mcp-server@latest` on a workstation will talk to whatever cluster your current kubeconfig points to (e.g. a local Kind cluster).
### Remote Execution
When running remotely, the server can be deployed as a container image in a Kubernetes or OpenShift cluster.
The server can be run as a Deployment, StatefulSet, or any other Kubernetes resource that suits your needs.
The server will automatically use the in-cluster configuration to connect to the Kubernetes API server.
## Tests
Run all Go tests with:
```bash
make test
```
The test suite relies on the `setup-envtest` tooling from `sigs.k8s.io/controller-runtime`.
The first run downloads a Kubernetes `envtest` environment from the internet, so network access is required.
Without it some tests will fail during setup.
## Linting
Static analysis is performed with `golangci-lint`:
```bash
make lint
```
The `lint` target downloads the specified `golangci-lint` version if it is not already present under `_output/tools/bin/`.
## Additional Makefile targets
Beyond the basic build, test, and lint targets, the Makefile provides additional utilities:
**Local Development:**
```bash
# Setup a complete local development environment with Kind cluster
make local-env-setup
# Tear down the local Kind cluster
make local-env-teardown
# Show Keycloak status and connection info (for OIDC testing)
make keycloak-status
# Tail Keycloak logs
make keycloak-logs
# Install required development tools (like Kind) to ./_output/bin/
make tools
```
**Distribution and Publishing:**
```bash
# Copy compiled binaries to each npm package
make npm-copy-binaries
# Publish the npm packages
make npm-publish
# Publish the Python packages
make python-publish
# Update README.md with the latest toolsets
make update-readme-tools
```
Run `make help` to see all available targets with descriptions.
## Dependencies
When introducing new modules run `make tidy` so that `go.mod` and `go.sum` remain tidy.
## Coding style
- Go modules target Go **1.24** (see `go.mod`).
- Tests are written with the standard library `testing` package.
- Build, test and lint steps are defined in the Makefile—keep them working.
## Distribution Methods
The server is distributed as a binary executable, a Docker image, an npm package, and a Python package.
- **Native binaries** for Linux, macOS, and Windows are available in the GitHub releases.
- A **container image** (Docker) is built and pushed to the `quay.io/manusa/kubernetes_mcp_server` repository.
- An **npm** package is available at [npmjs.com](https://www.npmjs.com/package/kubernetes-mcp-server).
It wraps the platform-specific binary and provides a convenient way to run the server using `npx`.
- A **Python** package is available at [pypi.org](https://pypi.org/project/kubernetes-mcp-server/).
It provides a script that downloads the correct platform binary from the GitHub releases page and runs it.
It provides a convenient way to run the server using `uvx` or `python -m kubernetes_mcp_server`.
```
--------------------------------------------------------------------------------
/pkg/kubernetes-mcp-server/cmd/testdata/empty-config.toml:
--------------------------------------------------------------------------------
```toml
```
--------------------------------------------------------------------------------
/pkg/mcp/testdata/helm-chart-no-op/Chart.yaml:
--------------------------------------------------------------------------------
```yaml
apiVersion: v1
name: no-op
version: 1.33.7
```
--------------------------------------------------------------------------------
/dev/config/keycloak/realm/realm-create.json:
--------------------------------------------------------------------------------
```json
{
"realm": "openshift",
"enabled": true
}
```
--------------------------------------------------------------------------------
/pkg/mcp/testdata/helm-chart-secret/Chart.yaml:
--------------------------------------------------------------------------------
```yaml
apiVersion: v2
name: secret-chart
version: 0.1.0
type: application
```
--------------------------------------------------------------------------------
/python/kubernetes_mcp_server/__main__.py:
--------------------------------------------------------------------------------
```python
from .kubernetes_mcp_server import main
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/.github/dependabot.yml:
--------------------------------------------------------------------------------
```yaml
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"
open-pull-requests-limit: 10
```
--------------------------------------------------------------------------------
/pkg/version/version.go:
--------------------------------------------------------------------------------
```go
package version
var CommitHash = "unknown"
var BuildTime = "1970-01-01T00:00:00Z"
var Version = "0.0.0"
var BinaryName = "kubernetes-mcp-server"
```
--------------------------------------------------------------------------------
/python/kubernetes_mcp_server/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Kubernetes MCP Server (Model Context Protocol) with special support for OpenShift.
"""
from .kubernetes_mcp_server import main
__all__ = ['main']
```
--------------------------------------------------------------------------------
/dev/config/keycloak/client-scopes/groups.json:
--------------------------------------------------------------------------------
```json
{
"name": "groups",
"protocol": "openid-connect",
"attributes": {
"display.on.consent.screen": "false",
"include.in.token.scope": "true"
}
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/client-scopes/mcp-server.json:
--------------------------------------------------------------------------------
```json
{
"name": "mcp-server",
"protocol": "openid-connect",
"attributes": {
"display.on.consent.screen": "false",
"include.in.token.scope": "true"
}
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/client-scopes/mcp-openshift.json:
--------------------------------------------------------------------------------
```json
{
"name": "mcp:openshift",
"protocol": "openid-connect",
"attributes": {
"display.on.consent.screen": "false",
"include.in.token.scope": "true"
}
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/realm/realm-events-config.json:
--------------------------------------------------------------------------------
```json
{
"realm": "openshift",
"enabled": true,
"eventsEnabled": true,
"eventsListeners": ["jboss-logging"],
"adminEventsEnabled": true,
"adminEventsDetailsEnabled": true
}
```
--------------------------------------------------------------------------------
/cmd/kubernetes-mcp-server/main_test.go:
--------------------------------------------------------------------------------
```go
package main
import (
"os"
)
func Example_version() {
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"kubernetes-mcp-server", "--version"}
main()
// Output: 0.0.0
}
```
--------------------------------------------------------------------------------
/pkg/config/config_default_overrides.go:
--------------------------------------------------------------------------------
```go
package config
func defaultOverrides() StaticConfig {
return StaticConfig{
// IMPORTANT: this file is used to override default config values in downstream builds.
// This is intentionally left blank.
}
}
```
--------------------------------------------------------------------------------
/internal/test/env.go:
--------------------------------------------------------------------------------
```go
package test
import (
"os"
"strings"
)
func RestoreEnv(originalEnv []string) {
os.Clearenv()
for _, env := range originalEnv {
if key, value, found := strings.Cut(env, "="); found {
_ = os.Setenv(key, value)
}
}
}
```
--------------------------------------------------------------------------------
/pkg/mcp/modules.go:
--------------------------------------------------------------------------------
```go
package mcp
import _ "github.com/containers/kubernetes-mcp-server/pkg/toolsets/config"
import _ "github.com/containers/kubernetes-mcp-server/pkg/toolsets/core"
import _ "github.com/containers/kubernetes-mcp-server/pkg/toolsets/helm"
```
--------------------------------------------------------------------------------
/dev/config/keycloak/mappers/openshift-audience.json:
--------------------------------------------------------------------------------
```json
{
"name": "openshift-audience",
"protocol": "openid-connect",
"protocolMapper": "oidc-audience-mapper",
"config": {
"included.client.audience": "openshift",
"id.token.claim": "true",
"access.token.claim": "true"
}
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/mappers/mcp-server-audience.json:
--------------------------------------------------------------------------------
```json
{
"name": "mcp-server-audience",
"protocol": "openid-connect",
"protocolMapper": "oidc-audience-mapper",
"config": {
"included.client.audience": "mcp-server",
"id.token.claim": "true",
"access.token.claim": "true"
}
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/token.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
authenticationv1api "k8s.io/api/authentication/v1"
)
type TokenVerifier interface {
VerifyToken(ctx context.Context, cluster, token, audience string) (*authenticationv1api.UserInfo, []string, error)
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/users/mcp.json:
--------------------------------------------------------------------------------
```json
{
"username": "mcp",
"email": "[email protected]",
"firstName": "MCP",
"lastName": "User",
"enabled": true,
"emailVerified": true,
"credentials": [
{
"type": "password",
"value": "mcp",
"temporary": false
}
]
}
```
--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------
```yaml
# Smithery.ai configuration https://smithery.ai/docs/config#smitheryyaml
startCommand:
type: stdio
configSchema:
{}
commandFunction:
|-
(config) => ({
"command": "npx",
"args": [
"-y", "kubernetes-mcp-server@latest"
]
})
```
--------------------------------------------------------------------------------
/pkg/mcp/testdata/helm-chart-secret/templates/secret.yaml:
--------------------------------------------------------------------------------
```yaml
apiVersion: v1
kind: Secret
metadata:
name: {{ .Release.Name }}-secret
labels:
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/instance: {{ .Release.Name }}
type: Opaque
data:
username: {{ b64enc "aitana" }}
password: {{ b64enc "alex" }}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/mappers/groups-membership.json:
--------------------------------------------------------------------------------
```json
{
"name": "groups",
"protocol": "openid-connect",
"protocolMapper": "oidc-group-membership-mapper",
"config": {
"claim.name": "groups",
"full.path": "false",
"id.token.claim": "true",
"access.token.claim": "true",
"userinfo.token.claim": "true"
}
}
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM golang:latest AS builder
WORKDIR /app
COPY ./ ./
RUN make build
FROM registry.access.redhat.com/ubi9/ubi-minimal:latest
WORKDIR /app
COPY --from=builder /app/kubernetes-mcp-server /app/kubernetes-mcp-server
USER 65532:65532
ENTRYPOINT ["/app/kubernetes-mcp-server", "--port", "8080"]
EXPOSE 8080
```
--------------------------------------------------------------------------------
/npm/kubernetes-mcp-server-linux-amd64/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "kubernetes-mcp-server-linux-amd64",
"version": "0.0.0",
"description": "Model Context Protocol (MCP) server for Kubernetes and OpenShift",
"repository": {
"type": "git",
"url": "git+https://github.com/containers/kubernetes-mcp-server.git"
},
"os": [
"linux"
],
"cpu": [
"x64"
]
}
```
--------------------------------------------------------------------------------
/npm/kubernetes-mcp-server-darwin-amd64/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "kubernetes-mcp-server-darwin-amd64",
"version": "0.0.0",
"description": "Model Context Protocol (MCP) server for Kubernetes and OpenShift",
"repository": {
"type": "git",
"url": "git+https://github.com/containers/kubernetes-mcp-server.git"
},
"os": [
"darwin"
],
"cpu": [
"x64"
]
}
```
--------------------------------------------------------------------------------
/npm/kubernetes-mcp-server-linux-arm64/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "kubernetes-mcp-server-linux-arm64",
"version": "0.0.0",
"description": "Model Context Protocol (MCP) server for Kubernetes and OpenShift",
"repository": {
"type": "git",
"url": "git+https://github.com/containers/kubernetes-mcp-server.git"
},
"os": [
"linux"
],
"cpu": [
"arm64"
]
}
```
--------------------------------------------------------------------------------
/npm/kubernetes-mcp-server-windows-amd64/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "kubernetes-mcp-server-windows-amd64",
"version": "0.0.0",
"description": "Model Context Protocol (MCP) server for Kubernetes and OpenShift",
"repository": {
"type": "git",
"url": "git+https://github.com/containers/kubernetes-mcp-server.git"
},
"os": [
"win32"
],
"cpu": [
"x64"
]
}
```
--------------------------------------------------------------------------------
/npm/kubernetes-mcp-server-darwin-arm64/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "kubernetes-mcp-server-darwin-arm64",
"version": "0.0.0",
"description": "Model Context Protocol (MCP) server for Kubernetes and OpenShift",
"repository": {
"type": "git",
"url": "git+https://github.com/containers/kubernetes-mcp-server.git"
},
"os": [
"darwin"
],
"cpu": [
"arm64"
]
}
```
--------------------------------------------------------------------------------
/npm/kubernetes-mcp-server-windows-arm64/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "kubernetes-mcp-server-windows-arm64",
"version": "0.0.0",
"description": "Model Context Protocol (MCP) server for Kubernetes and OpenShift",
"repository": {
"type": "git",
"url": "git+https://github.com/containers/kubernetes-mcp-server.git"
},
"os": [
"win32"
],
"cpu": [
"arm64"
]
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/mappers/username.json:
--------------------------------------------------------------------------------
```json
{
"name": "username",
"protocol": "openid-connect",
"protocolMapper": "oidc-usermodel-property-mapper",
"config": {
"userinfo.token.claim": "true",
"user.attribute": "username",
"id.token.claim": "true",
"access.token.claim": "true",
"claim.name": "preferred_username",
"jsonType.label": "String"
}
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/clients/openshift.json:
--------------------------------------------------------------------------------
```json
{
"clientId": "openshift",
"enabled": true,
"publicClient": false,
"standardFlowEnabled": true,
"directAccessGrantsEnabled": true,
"serviceAccountsEnabled": true,
"authorizationServicesEnabled": false,
"redirectUris": ["*"],
"webOrigins": ["*"],
"defaultClientScopes": ["profile", "email", "groups"],
"optionalClientScopes": []
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/clients/mcp-client.json:
--------------------------------------------------------------------------------
```json
{
"clientId": "mcp-client",
"enabled": true,
"publicClient": true,
"standardFlowEnabled": true,
"directAccessGrantsEnabled": true,
"serviceAccountsEnabled": false,
"authorizationServicesEnabled": false,
"redirectUris": ["*"],
"webOrigins": ["*"],
"defaultClientScopes": ["profile", "email"],
"optionalClientScopes": ["mcp-server"]
}
```
--------------------------------------------------------------------------------
/internal/test/test.go:
--------------------------------------------------------------------------------
```go
package test
import (
"os"
"path/filepath"
"runtime"
)
func Must[T any](v T, err error) T {
if err != nil {
panic(err)
}
return v
}
func ReadFile(path ...string) string {
_, file, _, _ := runtime.Caller(1)
filePath := filepath.Join(append([]string{filepath.Dir(file)}, path...)...)
fileBytes := Must(os.ReadFile(filePath))
return string(fileBytes)
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/common_test.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"os"
"testing"
)
func TestMain(m *testing.M) {
// Set up
_ = os.Setenv("KUBECONFIG", "/dev/null") // Avoid interference from existing kubeconfig
_ = os.Setenv("KUBERNETES_SERVICE_HOST", "") // Avoid interference from in-cluster config
_ = os.Setenv("KUBERNETES_SERVICE_PORT", "") // Avoid interference from in-cluster config
// Run tests
code := m.Run()
// Tear down
os.Exit(code)
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/openshift.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type Openshift interface {
IsOpenShift(context.Context) bool
}
func (m *Manager) IsOpenShift(_ context.Context) bool {
// This method should be fast and not block (it's called at startup)
_, err := m.discoveryClient.ServerResourcesForGroupVersion(schema.GroupVersion{
Group: "project.openshift.io",
Version: "v1",
}.String())
return err == nil
}
```
--------------------------------------------------------------------------------
/cmd/kubernetes-mcp-server/main.go:
--------------------------------------------------------------------------------
```go
package main
import (
"os"
"github.com/spf13/pflag"
"k8s.io/cli-runtime/pkg/genericiooptions"
"github.com/containers/kubernetes-mcp-server/pkg/kubernetes-mcp-server/cmd"
)
func main() {
flags := pflag.NewFlagSet("kubernetes-mcp-server", pflag.ExitOnError)
pflag.CommandLine = flags
root := cmd.NewMCPServer(genericiooptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr})
if err := root.Execute(); err != nil {
os.Exit(1)
}
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/impersonate_roundtripper.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import "net/http"
// nolint:unused
type impersonateRoundTripper struct {
delegate http.RoundTripper
}
// nolint:unused
func (irt *impersonateRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// TODO: Solution won't work with discoveryclient which uses context.TODO() instead of the passed-in context
if v, ok := req.Context().Value(OAuthAuthorizationHeader).(string); ok {
req.Header.Set("Authorization", v)
}
return irt.delegate.RoundTrip(req)
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes-mcp-server/cmd/testdata/valid-config.toml:
--------------------------------------------------------------------------------
```toml
log_level = 1
port = "9999"
kubeconfig = "test"
list_output = "yaml"
read_only = true
disable_destructive = true
denied_resources = [
{group = "apps", version = "v1", kind = "Deployment"},
{group = "rbac.authorization.k8s.io", version = "v1", kind = "Role"}
]
enabled_tools = ["configuration_view", "events_list", "namespaces_list", "pods_list", "resources_list", "resources_get", "resources_create_or_update", "resources_delete"]
disabled_tools = ["pods_delete", "pods_top", "pods_log", "pods_run", "pods_exec"]
```
--------------------------------------------------------------------------------
/internal/test/kubernetes.go:
--------------------------------------------------------------------------------
```go
package test
import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
func KubeConfigFake() *clientcmdapi.Config {
fakeConfig := clientcmdapi.NewConfig()
fakeConfig.Clusters["fake"] = clientcmdapi.NewCluster()
fakeConfig.Clusters["fake"].Server = "https://127.0.0.1:6443"
fakeConfig.AuthInfos["fake"] = clientcmdapi.NewAuthInfo()
fakeConfig.Contexts["fake-context"] = clientcmdapi.NewContext()
fakeConfig.Contexts["fake-context"].Cluster = "fake"
fakeConfig.Contexts["fake-context"].AuthInfo = "fake"
fakeConfig.CurrentContext = "fake-context"
return fakeConfig
}
```
--------------------------------------------------------------------------------
/dev/config/cert-manager/selfsigned-issuer.yaml:
--------------------------------------------------------------------------------
```yaml
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
name: selfsigned-issuer
spec:
selfSigned: {}
---
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: selfsigned-ca
namespace: cert-manager
spec:
isCA: true
commonName: selfsigned-ca
secretName: selfsigned-ca-secret
privateKey:
algorithm: ECDSA
size: 256
issuerRef:
name: selfsigned-issuer
kind: ClusterIssuer
group: cert-manager.io
---
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
name: selfsigned-ca-issuer
spec:
ca:
secretName: selfsigned-ca-secret
```
--------------------------------------------------------------------------------
/pkg/toolsets/helm/toolset.go:
--------------------------------------------------------------------------------
```go
package helm
import (
"slices"
"github.com/containers/kubernetes-mcp-server/pkg/api"
internalk8s "github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
"github.com/containers/kubernetes-mcp-server/pkg/toolsets"
)
type Toolset struct{}
var _ api.Toolset = (*Toolset)(nil)
func (t *Toolset) GetName() string {
return "helm"
}
func (t *Toolset) GetDescription() string {
return "Tools for managing Helm charts and releases"
}
func (t *Toolset) GetTools(_ internalk8s.Openshift) []api.ServerTool {
return slices.Concat(
initHelm(),
)
}
func init() {
toolsets.Register(&Toolset{})
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/clients/mcp-server.json:
--------------------------------------------------------------------------------
```json
{
"clientId": "mcp-server",
"enabled": true,
"publicClient": false,
"standardFlowEnabled": true,
"directAccessGrantsEnabled": true,
"serviceAccountsEnabled": true,
"authorizationServicesEnabled": false,
"redirectUris": ["*"],
"webOrigins": ["*"],
"defaultClientScopes": ["profile", "email", "groups", "mcp-server"],
"optionalClientScopes": ["mcp:openshift"],
"attributes": {
"oauth2.device.authorization.grant.enabled": "false",
"oidc.ciba.grant.enabled": "false",
"backchannel.logout.session.required": "true",
"backchannel.logout.revoke.offline.tokens": "false"
}
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/namespaces.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func (k *Kubernetes) NamespacesList(ctx context.Context, options ResourceListOptions) (runtime.Unstructured, error) {
return k.ResourcesList(ctx, &schema.GroupVersionKind{
Group: "", Version: "v1", Kind: "Namespace",
}, "", options)
}
func (k *Kubernetes) ProjectsList(ctx context.Context, options ResourceListOptions) (runtime.Unstructured, error) {
return k.ResourcesList(ctx, &schema.GroupVersionKind{
Group: "project.openshift.io", Version: "v1", Kind: "Project",
}, "", options)
}
```
--------------------------------------------------------------------------------
/hack/generate-placeholder-ca.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
set -e
# Generate a placeholder self-signed CA certificate for KIND cluster startup
# This will be replaced with the real cert-manager CA after the cluster is created
CERT_DIR="_output/cert-manager-ca"
CA_CERT="$CERT_DIR/ca.crt"
CA_KEY="$CERT_DIR/ca.key"
mkdir -p "$CERT_DIR"
# Generate a self-signed CA certificate (valid placeholder)
openssl req -x509 -newkey rsa:2048 -nodes \
-keyout "$CA_KEY" \
-out "$CA_CERT" \
-days 365 \
-subj "/CN=placeholder-ca" \
2>/dev/null
echo "✅ Placeholder CA certificate created at $CA_CERT"
echo "⚠️ This will be replaced with cert-manager CA after cluster creation"
```
--------------------------------------------------------------------------------
/pkg/toolsets/config/toolset.go:
--------------------------------------------------------------------------------
```go
package config
import (
"slices"
"github.com/containers/kubernetes-mcp-server/pkg/api"
internalk8s "github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
"github.com/containers/kubernetes-mcp-server/pkg/toolsets"
)
type Toolset struct{}
var _ api.Toolset = (*Toolset)(nil)
func (t *Toolset) GetName() string {
return "config"
}
func (t *Toolset) GetDescription() string {
return "View and manage the current local Kubernetes configuration (kubeconfig)"
}
func (t *Toolset) GetTools(_ internalk8s.Openshift) []api.ServerTool {
return slices.Concat(
initConfiguration(),
)
}
func init() {
toolsets.Register(&Toolset{})
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/clients/mcp-server-update.json:
--------------------------------------------------------------------------------
```json
{
"clientId": "mcp-server",
"enabled": true,
"publicClient": false,
"standardFlowEnabled": true,
"directAccessGrantsEnabled": true,
"serviceAccountsEnabled": true,
"authorizationServicesEnabled": false,
"redirectUris": ["*"],
"webOrigins": ["*"],
"defaultClientScopes": ["profile", "email", "groups", "mcp-server"],
"optionalClientScopes": ["mcp:openshift"],
"attributes": {
"oauth2.device.authorization.grant.enabled": "false",
"oidc.ciba.grant.enabled": "false",
"backchannel.logout.session.required": "true",
"backchannel.logout.revoke.offline.tokens": "false",
"standard.token.exchange.enabled": "true"
}
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/rbac.yaml:
--------------------------------------------------------------------------------
```yaml
# RBAC ClusterRoleBinding for mcp user with OIDC authentication
#
# IMPORTANT: This requires Kubernetes API server to be configured with OIDC:
# --oidc-issuer-url=https://keycloak.127-0-0-1.sslip.io:8443/realms/openshift
# --oidc-username-claim=preferred_username
#
# Without OIDC configuration, this binding will not work.
#
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: oidc-mcp-cluster-admin
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- apiGroup: rbac.authorization.k8s.io
kind: User
name: https://keycloak.127-0-0-1.sslip.io:8443/realms/openshift#mcp
```
--------------------------------------------------------------------------------
/pkg/toolsets/core/toolset.go:
--------------------------------------------------------------------------------
```go
package core
import (
"slices"
"github.com/containers/kubernetes-mcp-server/pkg/api"
internalk8s "github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
"github.com/containers/kubernetes-mcp-server/pkg/toolsets"
)
type Toolset struct{}
var _ api.Toolset = (*Toolset)(nil)
func (t *Toolset) GetName() string {
return "core"
}
func (t *Toolset) GetDescription() string {
return "Most common tools for Kubernetes management (Pods, Generic Resources, Events, etc.)"
}
func (t *Toolset) GetTools(o internalk8s.Openshift) []api.ServerTool {
return slices.Concat(
initEvents(),
initNamespaces(o),
initNodes(),
initPods(),
initResources(o),
)
}
func init() {
toolsets.Register(&Toolset{})
}
```
--------------------------------------------------------------------------------
/python/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "kubernetes-mcp-server"
version = "0.0.0"
description = "Kubernetes MCP Server (Model Context Protocol) with special support for OpenShift"
readme = {file="README.md", content-type="text/markdown"}
requires-python = ">=3.6"
license = "Apache-2.0"
authors = [
{ name = "Marc Nuri", email = "[email protected]" }
]
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
]
[project.urls]
Homepage = "https://github.com/containers/kubernetes-mcp-server"
Repository = "https://github.com/containers/kubernetes-mcp-server"
[project.scripts]
kubernetes-mcp-server = "kubernetes_mcp_server:main"
```
--------------------------------------------------------------------------------
/pkg/mcp/testdata/toolsets-config-tools.json:
--------------------------------------------------------------------------------
```json
[
{
"annotations": {
"title": "Configuration: View",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get the current Kubernetes configuration content as a kubeconfig YAML",
"inputSchema": {
"type": "object",
"properties": {
"minified": {
"description": "Return a minified version of the configuration. If set to true, keeps only the current-context and the relevant pieces of the configuration for that context. If set to false, all contexts, clusters, auth-infos, and users are returned in the configuration. (Optional, default true)",
"type": "boolean"
}
}
},
"name": "configuration_view"
}
]
```
--------------------------------------------------------------------------------
/dev/config/kind/cluster.yaml:
--------------------------------------------------------------------------------
```yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
extraMounts:
- hostPath: ./_output/cert-manager-ca/ca.crt
containerPath: /etc/kubernetes/pki/keycloak-ca.crt
readOnly: true
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
kind: ClusterConfiguration
apiServer:
extraArgs:
oidc-issuer-url: https://keycloak.127-0-0-1.sslip.io:8443/realms/openshift
oidc-client-id: openshift
oidc-username-claim: preferred_username
oidc-groups-claim: groups
oidc-ca-file: /etc/kubernetes/pki/keycloak-ca.crt
extraPortMappings:
- containerPort: 80
hostPort: 8000
protocol: TCP
- containerPort: 443
hostPort: 8443
protocol: TCP
```
--------------------------------------------------------------------------------
/pkg/kubernetes/nodes.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
"fmt"
)
func (k *Kubernetes) NodesLog(ctx context.Context, name string, logPath string, tail int64) (string, error) {
// Use the node proxy API to access logs from the kubelet
// Common log paths:
// - /var/log/kubelet.log - kubelet logs
// - /var/log/kube-proxy.log - kube-proxy logs
// - /var/log/containers/ - container logs
req, err := k.AccessControlClientset().NodesLogs(ctx, name, logPath)
if err != nil {
return "", err
}
// Query parameters for tail
if tail > 0 {
req.Param("tailLines", fmt.Sprintf("%d", tail))
}
result := req.Do(ctx)
if result.Error() != nil {
return "", fmt.Errorf("failed to get node logs: %w", result.Error())
}
rawData, err := result.Raw()
if err != nil {
return "", fmt.Errorf("failed to read node log response: %w", err)
}
return string(rawData), nil
}
```
--------------------------------------------------------------------------------
/pkg/config/provider_config.go:
--------------------------------------------------------------------------------
```go
package config
import (
"fmt"
"github.com/BurntSushi/toml"
)
// ProviderConfig is the interface that all provider-specific configurations must implement.
// Each provider registers a factory function to parse its config from TOML primitives
type ProviderConfig interface {
Validate() error
}
type ProviderConfigParser func(primitive toml.Primitive, md toml.MetaData) (ProviderConfig, error)
var (
providerConfigParsers = make(map[string]ProviderConfigParser)
)
func RegisterProviderConfig(strategy string, parser ProviderConfigParser) {
if _, exists := providerConfigParsers[strategy]; exists {
panic(fmt.Sprintf("provider config parser already registered for strategy '%s'", strategy))
}
providerConfigParsers[strategy] = parser
}
func getProviderConfigParser(strategy string) (ProviderConfigParser, bool) {
provider, ok := providerConfigParsers[strategy]
return provider, ok
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/ingress.yaml:
--------------------------------------------------------------------------------
```yaml
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: keycloak
namespace: keycloak
labels:
app: keycloak
annotations:
cert-manager.io/cluster-issuer: "selfsigned-ca-issuer"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/backend-protocol: "HTTP"
# Required for Keycloak 26.2.0+ to include port in issuer URLs
nginx.ingress.kubernetes.io/configuration-snippet: |
proxy_set_header X-Forwarded-Proto https;
proxy_set_header X-Forwarded-Port 8443;
proxy_set_header X-Forwarded-Host $host:8443;
spec:
ingressClassName: nginx
tls:
- hosts:
- keycloak.127-0-0-1.sslip.io
secretName: keycloak-tls-cert
rules:
- host: keycloak.127-0-0-1.sslip.io
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: keycloak
port:
number: 80
```
--------------------------------------------------------------------------------
/pkg/kubernetes/accesscontrol.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/containers/kubernetes-mcp-server/pkg/config"
)
// isAllowed checks the resource is in denied list or not.
// If it is in denied list, this function returns false.
func isAllowed(
staticConfig *config.StaticConfig, // TODO: maybe just use the denied resource slice
gvk *schema.GroupVersionKind,
) bool {
if staticConfig == nil {
return true
}
for _, val := range staticConfig.DeniedResources {
// If kind is empty, that means Group/Version pair is denied entirely
if val.Kind == "" {
if gvk.Group == val.Group && gvk.Version == val.Version {
return false
}
}
if gvk.Group == val.Group &&
gvk.Version == val.Version &&
gvk.Kind == val.Kind {
return false
}
}
return true
}
func isNotAllowedError(gvk *schema.GroupVersionKind) error {
return fmt.Errorf("resource not allowed: %s", gvk.String())
}
```
--------------------------------------------------------------------------------
/.github/workflows/build.yaml:
--------------------------------------------------------------------------------
```yaml
name: Build
on:
push:
branches:
- 'main'
paths-ignore:
- '.gitignore'
- 'LICENSE'
- '*.md'
pull_request:
paths-ignore:
- '.gitignore'
- 'LICENSE'
- '*.md'
concurrency:
# Only run once for latest commit per ref and cancel other (previous) runs.
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
env:
GO_VERSION: 1.23
defaults:
run:
shell: bash
jobs:
build:
name: Build on ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest #x64
- ubuntu-24.04-arm #arm64
- windows-latest #x64
- macos-13 #x64
- macos-latest #arm64
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- name: Build
run: make build
- name: Test
run: make test
```
--------------------------------------------------------------------------------
/pkg/toolsets/toolsets.go:
--------------------------------------------------------------------------------
```go
package toolsets
import (
"fmt"
"slices"
"strings"
"github.com/containers/kubernetes-mcp-server/pkg/api"
)
var toolsets []api.Toolset
// Clear removes all registered toolsets, TESTING PURPOSES ONLY.
func Clear() {
toolsets = []api.Toolset{}
}
func Register(toolset api.Toolset) {
toolsets = append(toolsets, toolset)
}
func Toolsets() []api.Toolset {
return toolsets
}
func ToolsetNames() []string {
names := make([]string, 0)
for _, toolset := range Toolsets() {
names = append(names, toolset.GetName())
}
slices.Sort(names)
return names
}
func ToolsetFromString(name string) api.Toolset {
for _, toolset := range Toolsets() {
if toolset.GetName() == strings.TrimSpace(name) {
return toolset
}
}
return nil
}
func Validate(toolsets []string) error {
for _, toolset := range toolsets {
if ToolsetFromString(toolset) == nil {
return fmt.Errorf("invalid toolset name: %s, valid names are: %s", toolset, strings.Join(ToolsetNames(), ", "))
}
}
return nil
}
```
--------------------------------------------------------------------------------
/pkg/output/output_test.go:
--------------------------------------------------------------------------------
```go
package output
import (
"encoding/json"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"regexp"
"testing"
)
func TestPlainTextUnstructuredList(t *testing.T) {
var podList unstructured.UnstructuredList
_ = json.Unmarshal([]byte(`
{ "apiVersion": "v1", "kind": "PodList", "items": [{
"apiVersion": "v1", "kind": "Pod",
"metadata": {
"name": "pod-1", "namespace": "default", "creationTimestamp": "2023-10-01T00:00:00Z", "labels": { "app": "nginx" }
},
"spec": { "containers": [{ "name": "container-1", "image": "marcnuri/chuck-norris" }] } }
]}`), &podList)
out, err := Table.PrintObj(&podList)
t.Run("processes the list", func(t *testing.T) {
if err != nil {
t.Fatalf("Error printing pod list: %v", err)
}
})
t.Run("prints headers", func(t *testing.T) {
expectedHeaders := "NAME\\s+AGE\\s+LABELS"
if m, e := regexp.MatchString(expectedHeaders, out); !m || e != nil {
t.Errorf("Expected headers '%s' not found in output: %s", expectedHeaders, out)
}
})
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/kubernetes.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"k8s.io/apimachinery/pkg/runtime"
"github.com/containers/kubernetes-mcp-server/pkg/helm"
"k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
)
type HeaderKey string
const (
CustomAuthorizationHeader = HeaderKey("kubernetes-authorization")
OAuthAuthorizationHeader = HeaderKey("Authorization")
CustomUserAgent = "kubernetes-mcp-server/bearer-token-auth"
)
type CloseWatchKubeConfig func() error
type Kubernetes struct {
manager *Manager
}
// AccessControlClientset returns the access-controlled clientset
// This ensures that any denied resources configured in the system are properly enforced
func (k *Kubernetes) AccessControlClientset() *AccessControlClientset {
return k.manager.accessControlClientSet
}
var Scheme = scheme.Scheme
var ParameterCodec = runtime.NewParameterCodec(Scheme)
func (k *Kubernetes) NewHelm() *helm.Helm {
// This is a derived Kubernetes, so it already has the Helm initialized
return helm.NewHelm(k.manager)
}
```
--------------------------------------------------------------------------------
/npm/kubernetes-mcp-server/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "kubernetes-mcp-server",
"version": "0.0.0",
"description": "Model Context Protocol (MCP) server for Kubernetes and OpenShift",
"main": "./bin/index.js",
"bin": {
"kubernetes-mcp-server": "bin/index.js"
},
"optionalDependencies": {
"kubernetes-mcp-server-darwin-amd64": "0.0.0",
"kubernetes-mcp-server-darwin-arm64": "0.0.0",
"kubernetes-mcp-server-linux-amd64": "0.0.0",
"kubernetes-mcp-server-linux-arm64": "0.0.0",
"kubernetes-mcp-server-windows-amd64": "0.0.0",
"kubernetes-mcp-server-windows-arm64": "0.0.0"
},
"repository": {
"type": "git",
"url": "git+https://github.com/containers/kubernetes-mcp-server.git"
},
"keywords": [
"mcp",
"kubernetes",
"openshift",
"model context protocol",
"model",
"context",
"protocol"
],
"author": {
"name": "Marc Nuri",
"url": "https://www.marcnuri.com"
},
"license": "Apache-2.0",
"bugs": {
"url": "https://github.com/containers/kubernetes-mcp-server/issues"
},
"homepage": "https://github.com/containers/kubernetes-mcp-server#readme"
}
```
--------------------------------------------------------------------------------
/pkg/mcp/tool_filter.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"github.com/containers/kubernetes-mcp-server/pkg/api"
"github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
)
// ToolFilter is a function that takes a ServerTool and returns a boolean indicating whether to include the tool
type ToolFilter func(tool api.ServerTool) bool
func CompositeFilter(filters ...ToolFilter) ToolFilter {
return func(tool api.ServerTool) bool {
for _, f := range filters {
if !f(tool) {
return false
}
}
return true
}
}
func ShouldIncludeTargetListTool(targetName string, targets []string) ToolFilter {
return func(tool api.ServerTool) bool {
if !tool.IsTargetListProvider() {
return true
}
if len(targets) <= 1 {
// there is no need to provide a tool to list the single available target
return false
}
// TODO: this check should be removed or make more generic when we have other
if tool.Tool.Name == "configuration_contexts_list" && targetName != kubernetes.KubeConfigTargetParameterName {
// let's not include configuration_contexts_list if we aren't targeting contexts in our Provider
return false
}
return true
}
}
```
--------------------------------------------------------------------------------
/pkg/config/config_default.go:
--------------------------------------------------------------------------------
```go
package config
import (
"bytes"
"github.com/BurntSushi/toml"
)
func Default() *StaticConfig {
defaultConfig := StaticConfig{
ListOutput: "table",
Toolsets: []string{"core", "config", "helm"},
}
overrides := defaultOverrides()
mergedConfig := mergeConfig(defaultConfig, overrides)
return &mergedConfig
}
// HasDefaultOverrides indicates whether the internal defaultOverrides function
// provides any overrides or an empty StaticConfig.
func HasDefaultOverrides() bool {
overrides := defaultOverrides()
var buf bytes.Buffer
if err := toml.NewEncoder(&buf).Encode(overrides); err != nil {
// If marshaling fails, assume no overrides
return false
}
return len(bytes.TrimSpace(buf.Bytes())) > 0
}
// mergeConfig applies non-zero values from override to base using TOML serialization
// and returns the merged StaticConfig.
// In case of any error during marshalling or unmarshalling, it returns the base config unchanged.
func mergeConfig(base, override StaticConfig) StaticConfig {
var overrideBuffer bytes.Buffer
if err := toml.NewEncoder(&overrideBuffer).Encode(override); err != nil {
// If marshaling fails, return base unchanged
return base
}
_, _ = toml.NewDecoder(&overrideBuffer).Decode(&base)
return base
}
```
--------------------------------------------------------------------------------
/pkg/http/middleware.go:
--------------------------------------------------------------------------------
```go
package http
import (
"bufio"
"net"
"net/http"
"time"
"k8s.io/klog/v2"
)
func RequestMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/healthz" {
next.ServeHTTP(w, r)
return
}
start := time.Now()
lrw := &loggingResponseWriter{
ResponseWriter: w,
statusCode: http.StatusOK,
}
next.ServeHTTP(lrw, r)
duration := time.Since(start)
klog.V(5).Infof("%s %s %d %v", r.Method, r.URL.Path, lrw.statusCode, duration)
})
}
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
headerWritten bool
}
func (lrw *loggingResponseWriter) WriteHeader(code int) {
if !lrw.headerWritten {
lrw.statusCode = code
lrw.headerWritten = true
lrw.ResponseWriter.WriteHeader(code)
}
}
func (lrw *loggingResponseWriter) Write(b []byte) (int, error) {
if !lrw.headerWritten {
lrw.statusCode = http.StatusOK
lrw.headerWritten = true
}
return lrw.ResponseWriter.Write(b)
}
func (lrw *loggingResponseWriter) Flush() {
if flusher, ok := lrw.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
}
}
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if hijacker, ok := lrw.ResponseWriter.(http.Hijacker); ok {
return hijacker.Hijack()
}
return nil, nil, http.ErrNotSupported
}
```
--------------------------------------------------------------------------------
/pkg/api/toolsets_test.go:
--------------------------------------------------------------------------------
```go
package api
import (
"testing"
"github.com/stretchr/testify/suite"
"k8s.io/utils/ptr"
)
type ToolsetsSuite struct {
suite.Suite
}
func (s *ToolsetsSuite) TestServerTool() {
s.Run("IsClusterAware", func() {
s.Run("defaults to true", func() {
tool := &ServerTool{}
s.True(tool.IsClusterAware(), "Expected IsClusterAware to be true by default")
})
s.Run("can be set to false", func() {
tool := &ServerTool{ClusterAware: ptr.To(false)}
s.False(tool.IsClusterAware(), "Expected IsClusterAware to be false when set to false")
})
s.Run("can be set to true", func() {
tool := &ServerTool{ClusterAware: ptr.To(true)}
s.True(tool.IsClusterAware(), "Expected IsClusterAware to be true when set to true")
})
})
s.Run("IsTargetListProvider", func() {
s.Run("defaults to false", func() {
tool := &ServerTool{}
s.False(tool.IsTargetListProvider(), "Expected IsTargetListProvider to be false by default")
})
s.Run("can be set to false", func() {
tool := &ServerTool{TargetListProvider: ptr.To(false)}
s.False(tool.IsTargetListProvider(), "Expected IsTargetListProvider to be false when set to false")
})
s.Run("can be set to true", func() {
tool := &ServerTool{TargetListProvider: ptr.To(true)}
s.True(tool.IsTargetListProvider(), "Expected IsTargetListProvider to be true when set to true")
})
})
}
func TestToolsets(t *testing.T) {
suite.Run(t, new(ToolsetsSuite))
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/provider.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
"github.com/containers/kubernetes-mcp-server/pkg/config"
)
type Provider interface {
// Openshift extends the Openshift interface to provide OpenShift specific functionality to toolset providers
// TODO: with the configurable toolset implementation and especially the multi-cluster approach
// extending this interface might not be a good idea anymore.
// For the kubecontext case, a user might be targeting both an OpenShift flavored cluster and a vanilla Kubernetes cluster.
// See: https://github.com/containers/kubernetes-mcp-server/pull/372#discussion_r2421592315
Openshift
TokenVerifier
GetTargets(ctx context.Context) ([]string, error)
GetDerivedKubernetes(ctx context.Context, target string) (*Kubernetes, error)
GetDefaultTarget() string
GetTargetParameterName() string
WatchTargets(func() error)
Close()
}
func NewProvider(cfg *config.StaticConfig) (Provider, error) {
strategy := resolveStrategy(cfg)
factory, err := getProviderFactory(strategy)
if err != nil {
return nil, err
}
return factory(cfg)
}
func resolveStrategy(cfg *config.StaticConfig) string {
if cfg.ClusterProviderStrategy != "" {
return cfg.ClusterProviderStrategy
}
if cfg.KubeConfig != "" {
return config.ClusterProviderKubeConfig
}
if _, inClusterConfigErr := InClusterConfig(); inClusterConfigErr == nil {
return config.ClusterProviderInCluster
}
return config.ClusterProviderKubeConfig
}
```
--------------------------------------------------------------------------------
/dev/config/keycloak/deployment.yaml:
--------------------------------------------------------------------------------
```yaml
---
apiVersion: v1
kind: Namespace
metadata:
name: keycloak
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: keycloak
namespace: keycloak
labels:
app: keycloak
spec:
replicas: 1
selector:
matchLabels:
app: keycloak
template:
metadata:
labels:
app: keycloak
spec:
containers:
- name: keycloak
image: quay.io/keycloak/keycloak:26.4
args: ["start-dev"]
env:
- name: KC_BOOTSTRAP_ADMIN_USERNAME
value: "admin"
- name: KC_BOOTSTRAP_ADMIN_PASSWORD
value: "admin"
- name: KC_HOSTNAME
value: "https://keycloak.127-0-0-1.sslip.io:8443"
- name: KC_HTTP_ENABLED
value: "true"
- name: KC_HEALTH_ENABLED
value: "true"
- name: KC_PROXY_HEADERS
value: "xforwarded"
ports:
- name: http
containerPort: 8080
readinessProbe:
httpGet:
path: /health/ready
port: 9000
initialDelaySeconds: 30
periodSeconds: 10
livenessProbe:
httpGet:
path: /health/live
port: 9000
initialDelaySeconds: 60
periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
name: keycloak
namespace: keycloak
labels:
app: keycloak
spec:
ports:
- name: http
port: 80
targetPort: 8080
selector:
app: keycloak
type: ClusterIP
```
--------------------------------------------------------------------------------
/.github/workflows/release.yaml:
--------------------------------------------------------------------------------
```yaml
name: Release
on:
push:
tags:
- '*'
concurrency:
# Only run once for latest commit per ref and cancel other (previous) runs.
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
env:
GO_VERSION: 1.23
UV_PUBLISH_TOKEN: ${{ secrets.UV_PUBLISH_TOKEN }}
permissions:
contents: write
id-token: write # Required for npmjs OIDC
discussions: write
jobs:
release:
name: Release
runs-on: macos-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- name: Build
run: make build-all-platforms
- name: Upload artifacts
uses: softprops/action-gh-release@v2
with:
generate_release_notes: true
make_latest: true
files: |
LICENSE
kubernetes-mcp-server-*
# Ensure npm 11.5.1 or later is installed (required for https://docs.npmjs.com/trusted-publishers)
- name: Setup node
uses: actions/setup-node@v6
with:
node-version: 24
registry-url: 'https://registry.npmjs.org'
- name: Publish npm
run:
make npm-publish
python:
name: Release Python
# Python logic requires the tag/release version to be available from GitHub
needs: release
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v5
- name: Publish Python
run:
make python-publish
```
--------------------------------------------------------------------------------
/pkg/kubernetes/events.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"strings"
)
func (k *Kubernetes) EventsList(ctx context.Context, namespace string) ([]map[string]any, error) {
var eventMap []map[string]any
raw, err := k.ResourcesList(ctx, &schema.GroupVersionKind{
Group: "", Version: "v1", Kind: "Event",
}, namespace, ResourceListOptions{})
if err != nil {
return eventMap, err
}
unstructuredList := raw.(*unstructured.UnstructuredList)
if len(unstructuredList.Items) == 0 {
return eventMap, nil
}
for _, item := range unstructuredList.Items {
event := &v1.Event{}
if err = runtime.DefaultUnstructuredConverter.FromUnstructured(item.Object, event); err != nil {
return eventMap, err
}
timestamp := event.EventTime.Time
if timestamp.IsZero() && event.Series != nil {
timestamp = event.Series.LastObservedTime.Time
} else if timestamp.IsZero() && event.Count > 1 {
timestamp = event.LastTimestamp.Time
} else if timestamp.IsZero() {
timestamp = event.FirstTimestamp.Time
}
eventMap = append(eventMap, map[string]any{
"Namespace": event.Namespace,
"Timestamp": timestamp.String(),
"Type": event.Type,
"Reason": event.Reason,
"InvolvedObject": map[string]string{
"apiVersion": event.InvolvedObject.APIVersion,
"Kind": event.InvolvedObject.Kind,
"Name": event.InvolvedObject.Name,
},
"Message": strings.TrimSpace(event.Message),
})
}
return eventMap, nil
}
```
--------------------------------------------------------------------------------
/internal/test/mcp.go:
--------------------------------------------------------------------------------
```go
package test
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)
type McpClient struct {
ctx context.Context
testServer *httptest.Server
*client.Client
}
func NewMcpClient(t *testing.T, mcpHttpServer http.Handler) *McpClient {
require.NotNil(t, mcpHttpServer, "McpHttpServer must be provided")
var err error
ret := &McpClient{ctx: t.Context()}
ret.testServer = httptest.NewServer(mcpHttpServer)
ret.Client, err = client.NewStreamableHttpClient(ret.testServer.URL + "/mcp")
require.NoError(t, err, "Expected no error creating MCP client")
err = ret.Start(t.Context())
require.NoError(t, err, "Expected no error starting MCP client")
initRequest := mcp.InitializeRequest{}
initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
initRequest.Params.ClientInfo = mcp.Implementation{Name: "test", Version: "1.33.7"}
_, err = ret.Initialize(t.Context(), initRequest)
require.NoError(t, err, "Expected no error initializing MCP client")
return ret
}
func (m *McpClient) Close() {
if m.Client != nil {
_ = m.Client.Close()
}
if m.testServer != nil {
m.testServer.Close()
}
}
// CallTool helper function to call a tool by name with arguments
func (m *McpClient) CallTool(name string, args map[string]interface{}) (*mcp.CallToolResult, error) {
callToolRequest := mcp.CallToolRequest{}
callToolRequest.Params.Name = name
callToolRequest.Params.Arguments = args
return m.Client.CallTool(m.ctx, callToolRequest)
}
```
--------------------------------------------------------------------------------
/npm/kubernetes-mcp-server/bin/index.js:
--------------------------------------------------------------------------------
```javascript
#!/usr/bin/env node
const childProcess = require('child_process');
const BINARY_MAP = {
darwin_x64: {name: 'kubernetes-mcp-server-darwin-amd64', suffix: ''},
darwin_arm64: {name: 'kubernetes-mcp-server-darwin-arm64', suffix: ''},
linux_x64: {name: 'kubernetes-mcp-server-linux-amd64', suffix: ''},
linux_arm64: {name: 'kubernetes-mcp-server-linux-arm64', suffix: ''},
win32_x64: {name: 'kubernetes-mcp-server-windows-amd64', suffix: '.exe'},
win32_arm64: {name: 'kubernetes-mcp-server-windows-arm64', suffix: '.exe'},
};
// Resolving will fail if the optionalDependency was not installed or the platform/arch is not supported
const resolveBinaryPath = () => {
try {
const binary = BINARY_MAP[`${process.platform}_${process.arch}`];
return require.resolve(`${binary.name}/bin/${binary.name}${binary.suffix}`);
} catch (e) {
throw new Error(`Could not resolve binary path for platform/arch: ${process.platform}/${process.arch}`);
}
};
const child = childProcess.spawn(resolveBinaryPath(), process.argv.slice(2), {
stdio: 'inherit',
});
const handleSignal = () => (signal) => {
console.log(`Received ${signal}, terminating child process...`);
if (child && !child.killed) {
child.kill(signal);
}
};
['SIGTERM', 'SIGINT', 'SIGHUP'].forEach((signal) => {
process.on(signal, handleSignal(signal));
});
child.on('close', (code, signal) => {
if (signal) {
console.log(`Child process terminated by signal: ${signal}`);
process.exit(128 + (signal === 'SIGTERM' ? 15 : signal === 'SIGINT' ? 2 : 1));
} else {
process.exit(code || 0);
}
});
```
--------------------------------------------------------------------------------
/pkg/mcp/tool_mutator.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"fmt"
"sort"
"github.com/containers/kubernetes-mcp-server/pkg/api"
"github.com/google/jsonschema-go/jsonschema"
)
type ToolMutator func(tool api.ServerTool) api.ServerTool
const maxTargetsInEnum = 5 // TODO: test and validate that this is a reasonable cutoff
// WithTargetParameter adds a target selection parameter to the tool's input schema if the tool is cluster-aware
func WithTargetParameter(defaultCluster, targetParameterName string, targets []string) ToolMutator {
return func(tool api.ServerTool) api.ServerTool {
if !tool.IsClusterAware() {
return tool
}
if tool.Tool.InputSchema == nil {
tool.Tool.InputSchema = &jsonschema.Schema{Type: "object"}
}
if tool.Tool.InputSchema.Properties == nil {
tool.Tool.InputSchema.Properties = make(map[string]*jsonschema.Schema)
}
if len(targets) > 1 {
tool.Tool.InputSchema.Properties[targetParameterName] = createTargetProperty(
defaultCluster,
targetParameterName,
targets,
)
}
return tool
}
}
func createTargetProperty(defaultCluster, targetName string, targets []string) *jsonschema.Schema {
baseSchema := &jsonschema.Schema{
Type: "string",
Description: fmt.Sprintf(
"Optional parameter selecting which %s to run the tool in. Defaults to %s if not set",
targetName,
defaultCluster,
),
}
if len(targets) <= maxTargetsInEnum {
// Sort clusters to ensure consistent enum ordering
sort.Strings(targets)
enumValues := make([]any, 0, len(targets))
for _, c := range targets {
enumValues = append(enumValues, c)
}
baseSchema.Enum = enumValues
}
return baseSchema
}
```
--------------------------------------------------------------------------------
/pkg/toolsets/core/events.go:
--------------------------------------------------------------------------------
```go
package core
import (
"fmt"
"github.com/google/jsonschema-go/jsonschema"
"k8s.io/utils/ptr"
"github.com/containers/kubernetes-mcp-server/pkg/api"
"github.com/containers/kubernetes-mcp-server/pkg/output"
)
func initEvents() []api.ServerTool {
return []api.ServerTool{
{Tool: api.Tool{
Name: "events_list",
Description: "List all the Kubernetes events in the current cluster from all namespaces",
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"namespace": {
Type: "string",
Description: "Optional Namespace to retrieve the events from. If not provided, will list events from all namespaces",
},
},
},
Annotations: api.ToolAnnotations{
Title: "Events: List",
ReadOnlyHint: ptr.To(true),
DestructiveHint: ptr.To(false),
IdempotentHint: ptr.To(false),
OpenWorldHint: ptr.To(true),
},
}, Handler: eventsList},
}
}
func eventsList(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
namespace := params.GetArguments()["namespace"]
if namespace == nil {
namespace = ""
}
eventMap, err := params.EventsList(params, namespace.(string))
if err != nil {
return api.NewToolCallResult("", fmt.Errorf("failed to list events in all namespaces: %v", err)), nil
}
if len(eventMap) == 0 {
return api.NewToolCallResult("# No events found", nil), nil
}
yamlEvents, err := output.MarshalYaml(eventMap)
if err != nil {
err = fmt.Errorf("failed to list events in all namespaces: %v", err)
}
return api.NewToolCallResult(fmt.Sprintf("# The following events (YAML format) were found:\n%s", yamlEvents), err), nil
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/provider_registry.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"fmt"
"sort"
"github.com/containers/kubernetes-mcp-server/pkg/config"
)
// ProviderFactory creates a new Provider instance for a given strategy.
// Implementations should validate that the Manager is compatible with their strategy
// (e.g., kubeconfig provider should reject in-cluster managers).
type ProviderFactory func(cfg *config.StaticConfig) (Provider, error)
var providerFactories = make(map[string]ProviderFactory)
// RegisterProvider registers a provider factory for a given strategy name.
// This should be called from init() functions in provider implementation files.
// Panics if a provider is already registered for the given strategy.
func RegisterProvider(strategy string, factory ProviderFactory) {
if _, exists := providerFactories[strategy]; exists {
panic(fmt.Sprintf("provider already registered for strategy '%s'", strategy))
}
providerFactories[strategy] = factory
}
// getProviderFactory retrieves a registered provider factory by strategy name.
// Returns an error if no provider is registered for the given strategy.
func getProviderFactory(strategy string) (ProviderFactory, error) {
factory, ok := providerFactories[strategy]
if !ok {
available := GetRegisteredStrategies()
return nil, fmt.Errorf("no provider registered for strategy '%s', available strategies: %v", strategy, available)
}
return factory, nil
}
// GetRegisteredStrategies returns a sorted list of all registered strategy names.
// This is useful for error messages and debugging.
func GetRegisteredStrategies() []string {
strategies := make([]string, 0, len(providerFactories))
for strategy := range providerFactories {
strategies = append(strategies, strategy)
}
sort.Strings(strategies)
return strategies
}
```
--------------------------------------------------------------------------------
/pkg/http/sts.go:
--------------------------------------------------------------------------------
```go
package http
import (
"context"
"github.com/coreos/go-oidc/v3/oidc"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google/externalaccount"
"github.com/containers/kubernetes-mcp-server/pkg/config"
)
type staticSubjectTokenSupplier struct {
token string
}
func (s *staticSubjectTokenSupplier) SubjectToken(_ context.Context, _ externalaccount.SupplierOptions) (string, error) {
return s.token, nil
}
var _ externalaccount.SubjectTokenSupplier = &staticSubjectTokenSupplier{}
type SecurityTokenService struct {
*oidc.Provider
ClientId string
ClientSecret string
ExternalAccountAudience string
ExternalAccountScopes []string
}
func NewFromConfig(config *config.StaticConfig, provider *oidc.Provider) *SecurityTokenService {
return &SecurityTokenService{
Provider: provider,
ClientId: config.StsClientId,
ClientSecret: config.StsClientSecret,
ExternalAccountAudience: config.StsAudience,
ExternalAccountScopes: config.StsScopes,
}
}
func (sts *SecurityTokenService) IsEnabled() bool {
return sts.Provider != nil && sts.ClientId != "" && sts.ExternalAccountAudience != ""
}
func (sts *SecurityTokenService) ExternalAccountTokenExchange(ctx context.Context, originalToken *oauth2.Token) (*oauth2.Token, error) {
ts, err := externalaccount.NewTokenSource(ctx, externalaccount.Config{
TokenURL: sts.Endpoint().TokenURL,
ClientID: sts.ClientId,
ClientSecret: sts.ClientSecret,
Audience: sts.ExternalAccountAudience,
SubjectTokenType: "urn:ietf:params:oauth:token-type:access_token",
SubjectTokenSupplier: &staticSubjectTokenSupplier{token: originalToken.AccessToken},
Scopes: sts.ExternalAccountScopes,
})
if err != nil {
return nil, err
}
return ts.Token()
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/provider_registry_test.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"testing"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/stretchr/testify/suite"
)
type ProviderRegistryTestSuite struct {
BaseProviderSuite
}
func (s *ProviderRegistryTestSuite) TestRegisterProvider() {
s.Run("With no pre-existing provider, registers the provider", func() {
RegisterProvider("test-strategy", func(cfg *config.StaticConfig) (Provider, error) {
return nil, nil
})
_, exists := providerFactories["test-strategy"]
s.True(exists, "Provider should be registered")
})
s.Run("With pre-existing provider, panics", func() {
RegisterProvider("test-pre-existent", func(cfg *config.StaticConfig) (Provider, error) {
return nil, nil
})
s.Panics(func() {
RegisterProvider("test-pre-existent", func(cfg *config.StaticConfig) (Provider, error) {
return nil, nil
})
}, "Registering a provider with an existing strategy should panic")
})
}
func (s *ProviderRegistryTestSuite) TestGetRegisteredStrategies() {
s.Run("With no registered providers, returns empty list", func() {
providerFactories = make(map[string]ProviderFactory)
strategies := GetRegisteredStrategies()
s.Empty(strategies, "No strategies should be registered")
})
s.Run("With multiple registered providers, returns sorted list", func() {
providerFactories = make(map[string]ProviderFactory)
RegisterProvider("foo-strategy", func(cfg *config.StaticConfig) (Provider, error) {
return nil, nil
})
RegisterProvider("bar-strategy", func(cfg *config.StaticConfig) (Provider, error) {
return nil, nil
})
strategies := GetRegisteredStrategies()
expected := []string{"bar-strategy", "foo-strategy"}
s.Equal(expected, strategies, "Strategies should be sorted alphabetically")
})
}
func TestProviderRegistry(t *testing.T) {
suite.Run(t, new(ProviderRegistryTestSuite))
}
```
--------------------------------------------------------------------------------
/.github/workflows/release-image.yml:
--------------------------------------------------------------------------------
```yaml
name: Release as container image
on:
push:
branches:
- main
tags:
- '*'
env:
IMAGE_NAME: quay.io/manusa/kubernetes_mcp_server
TAG: ${{ github.ref_name == 'main' && 'latest' || github.ref_type == 'tag' && github.ref_name && startsWith(github.ref_name, 'v') && github.ref_name || 'unknown' }}
jobs:
publish-platform-images:
name: 'Publish: linux-${{ matrix.platform.tag }}'
strategy:
fail-fast: true
matrix:
platform:
- runner: ubuntu-latest
tag: amd64
- runner: ubuntu-24.04-arm
tag: arm64
runs-on: ${{ matrix.platform.runner }}
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Install Podman # Not available in arm64 image
run: |
sudo apt-get update
sudo apt-get install -y podman
- name: Quay Login
run: |
echo ${{ secrets.QUAY_PASSWORD }} | podman login quay.io -u ${{ secrets.QUAY_USERNAME }} --password-stdin
- name: Build Image
run: |
podman build \
--platform "linux/${{ matrix.platform.tag }}" \
-f Dockerfile \
-t "${{ env.IMAGE_NAME }}:${{ env.TAG }}-linux-${{ matrix.platform.tag }}" \
.
- name: Push Image
run: |
podman push \
"${{ env.IMAGE_NAME }}:${{ env.TAG }}-linux-${{ matrix.platform.tag }}"
publish-manifest:
name: Publish Manifest
runs-on: ubuntu-latest
needs: publish-platform-images
steps:
- name: Quay Login
run: |
echo ${{ secrets.QUAY_PASSWORD }} | podman login quay.io -u ${{ secrets.QUAY_USERNAME }} --password-stdin
- name: Create Manifest
run: |
podman manifest create \
"${{ env.IMAGE_NAME }}:${{ env.TAG }}" \
"${{ env.IMAGE_NAME }}:${{ env.TAG }}-linux-amd64" \
"${{ env.IMAGE_NAME }}:${{ env.TAG }}-linux-arm64"
- name: Push Manifest
run: |
podman manifest push \
"${{ env.IMAGE_NAME }}:${{ env.TAG }}"
```
--------------------------------------------------------------------------------
/pkg/toolsets/core/namespaces.go:
--------------------------------------------------------------------------------
```go
package core
import (
"context"
"fmt"
"github.com/google/jsonschema-go/jsonschema"
"k8s.io/utils/ptr"
"github.com/containers/kubernetes-mcp-server/pkg/api"
internalk8s "github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
)
func initNamespaces(o internalk8s.Openshift) []api.ServerTool {
ret := make([]api.ServerTool, 0)
ret = append(ret, api.ServerTool{
Tool: api.Tool{
Name: "namespaces_list",
Description: "List all the Kubernetes namespaces in the current cluster",
InputSchema: &jsonschema.Schema{
Type: "object",
},
Annotations: api.ToolAnnotations{
Title: "Namespaces: List",
ReadOnlyHint: ptr.To(true),
DestructiveHint: ptr.To(false),
IdempotentHint: ptr.To(false),
OpenWorldHint: ptr.To(true),
},
}, Handler: namespacesList,
})
if o.IsOpenShift(context.Background()) {
ret = append(ret, api.ServerTool{
Tool: api.Tool{
Name: "projects_list",
Description: "List all the OpenShift projects in the current cluster",
InputSchema: &jsonschema.Schema{
Type: "object",
},
Annotations: api.ToolAnnotations{
Title: "Projects: List",
ReadOnlyHint: ptr.To(true),
DestructiveHint: ptr.To(false),
IdempotentHint: ptr.To(false),
OpenWorldHint: ptr.To(true),
},
}, Handler: projectsList,
})
}
return ret
}
func namespacesList(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
ret, err := params.NamespacesList(params, internalk8s.ResourceListOptions{AsTable: params.ListOutput.AsTable()})
if err != nil {
return api.NewToolCallResult("", fmt.Errorf("failed to list namespaces: %v", err)), nil
}
return api.NewToolCallResult(params.ListOutput.PrintObj(ret)), nil
}
func projectsList(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
ret, err := params.ProjectsList(params, internalk8s.ResourceListOptions{AsTable: params.ListOutput.AsTable()})
if err != nil {
return api.NewToolCallResult("", fmt.Errorf("failed to list projects: %v", err)), nil
}
return api.NewToolCallResult(params.ListOutput.PrintObj(ret)), nil
}
```
--------------------------------------------------------------------------------
/pkg/mcp/m3labs.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"context"
"encoding/json"
"fmt"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
"github.com/containers/kubernetes-mcp-server/pkg/api"
)
func ServerToolToM3LabsServerTool(s *Server, tools []api.ServerTool) ([]server.ServerTool, error) {
m3labTools := make([]server.ServerTool, 0)
for _, tool := range tools {
m3labTool := mcp.Tool{
Name: tool.Tool.Name,
Description: tool.Tool.Description,
Annotations: mcp.ToolAnnotation{
Title: tool.Tool.Annotations.Title,
ReadOnlyHint: tool.Tool.Annotations.ReadOnlyHint,
DestructiveHint: tool.Tool.Annotations.DestructiveHint,
IdempotentHint: tool.Tool.Annotations.IdempotentHint,
OpenWorldHint: tool.Tool.Annotations.OpenWorldHint,
},
}
if tool.Tool.InputSchema != nil {
schema, err := json.Marshal(tool.Tool.InputSchema)
if err != nil {
return nil, fmt.Errorf("failed to marshal tool input schema for tool %s: %v", tool.Tool.Name, err)
}
// TODO: temporary fix to append an empty properties object (some client have trouble parsing a schema without properties)
// As opposed, Gemini had trouble for a while when properties was present but empty.
// https://github.com/containers/kubernetes-mcp-server/issues/340
if string(schema) == `{"type":"object"}` {
schema = []byte(`{"type":"object","properties":{}}`)
}
m3labTool.RawInputSchema = schema
}
m3labHandler := func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// get the correct derived Kubernetes client for the target specified in the request
cluster := request.GetString(s.p.GetTargetParameterName(), s.p.GetDefaultTarget())
k, err := s.p.GetDerivedKubernetes(ctx, cluster)
if err != nil {
return nil, err
}
result, err := tool.Handler(api.ToolHandlerParams{
Context: ctx,
Kubernetes: k,
ToolCallRequest: request,
ListOutput: s.configuration.ListOutput(),
})
if err != nil {
return nil, err
}
return NewTextResult(result.Content, result.Error), nil
}
m3labTools = append(m3labTools, server.ServerTool{Tool: m3labTool, Handler: m3labHandler})
}
return m3labTools, nil
}
```
--------------------------------------------------------------------------------
/pkg/http/http.go:
--------------------------------------------------------------------------------
```go
package http
import (
"context"
"errors"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/coreos/go-oidc/v3/oidc"
"k8s.io/klog/v2"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/mcp"
)
const (
healthEndpoint = "/healthz"
mcpEndpoint = "/mcp"
sseEndpoint = "/sse"
sseMessageEndpoint = "/message"
)
func Serve(ctx context.Context, mcpServer *mcp.Server, staticConfig *config.StaticConfig, oidcProvider *oidc.Provider, httpClient *http.Client) error {
mux := http.NewServeMux()
wrappedMux := RequestMiddleware(
AuthorizationMiddleware(staticConfig, oidcProvider, mcpServer, httpClient)(mux),
)
httpServer := &http.Server{
Addr: ":" + staticConfig.Port,
Handler: wrappedMux,
}
sseServer := mcpServer.ServeSse(staticConfig.SSEBaseURL, httpServer)
streamableHttpServer := mcpServer.ServeHTTP(httpServer)
mux.Handle(sseEndpoint, sseServer)
mux.Handle(sseMessageEndpoint, sseServer)
mux.Handle(mcpEndpoint, streamableHttpServer)
mux.HandleFunc(healthEndpoint, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
mux.Handle("/.well-known/", WellKnownHandler(staticConfig, httpClient))
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM)
serverErr := make(chan error, 1)
go func() {
klog.V(0).Infof("Streaming and SSE HTTP servers starting on port %s and paths /mcp, /sse, /message", staticConfig.Port)
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
serverErr <- err
}
}()
select {
case sig := <-sigChan:
klog.V(0).Infof("Received signal %v, initiating graceful shutdown", sig)
cancel()
case <-ctx.Done():
klog.V(0).Infof("Context cancelled, initiating graceful shutdown")
case err := <-serverErr:
klog.Errorf("HTTP server error: %v", err)
return err
}
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
klog.V(0).Infof("Shutting down HTTP server gracefully...")
if err := httpServer.Shutdown(shutdownCtx); err != nil {
klog.Errorf("HTTP server shutdown error: %v", err)
return err
}
klog.V(0).Infof("HTTP server shutdown complete")
return nil
}
```
--------------------------------------------------------------------------------
/pkg/mcp/mcp_middleware_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"regexp"
"strings"
"testing"
"github.com/mark3labs/mcp-go/client/transport"
)
func TestToolCallLogging(t *testing.T) {
testCaseWithContext(t, &mcpContext{logLevel: 5}, func(c *mcpContext) {
_, _ = c.callTool("configuration_view", map[string]interface{}{
"minified": false,
})
t.Run("Logs tool name", func(t *testing.T) {
expectedLog := "mcp tool call: configuration_view("
if !strings.Contains(c.logBuffer.String(), expectedLog) {
t.Errorf("Expected log to contain '%s', got: %s", expectedLog, c.logBuffer.String())
}
})
t.Run("Logs tool call arguments", func(t *testing.T) {
expected := `"mcp tool call: configuration_view\((.+)\)"`
m := regexp.MustCompile(expected).FindStringSubmatch(c.logBuffer.String())
if len(m) != 2 {
t.Fatalf("Expected log entry to contain arguments, got %s", c.logBuffer.String())
}
if m[1] != "map[minified:false]" {
t.Errorf("Expected log arguments to be 'map[minified:false]', got %s", m[1])
}
})
})
before := func(c *mcpContext) {
c.clientOptions = append(c.clientOptions, transport.WithHeaders(map[string]string{
"Accept-Encoding": "gzip",
"Authorization": "Bearer should-not-be-logged",
"authorization": "Bearer should-not-be-logged",
"a-loggable-header": "should-be-logged",
}))
}
testCaseWithContext(t, &mcpContext{logLevel: 7, before: before}, func(c *mcpContext) {
_, _ = c.callTool("configuration_view", map[string]interface{}{
"minified": false,
})
t.Run("Logs tool call headers", func(t *testing.T) {
expectedLog := "mcp tool call headers: A-Loggable-Header: should-be-logged"
if !strings.Contains(c.logBuffer.String(), expectedLog) {
t.Errorf("Expected log to contain '%s', got: %s", expectedLog, c.logBuffer.String())
}
})
sensitiveHeaders := []string{
"Authorization:",
// TODO: Add more sensitive headers as needed
}
t.Run("Does not log sensitive headers", func(t *testing.T) {
for _, header := range sensitiveHeaders {
if strings.Contains(c.logBuffer.String(), header) {
t.Errorf("Log should not contain sensitive header '%s', got: %s", header, c.logBuffer.String())
}
}
})
t.Run("Does not log sensitive header values", func(t *testing.T) {
if strings.Contains(c.logBuffer.String(), "should-not-be-logged") {
t.Errorf("Log should not contain sensitive header value 'should-not-be-logged', got: %s", c.logBuffer.String())
}
})
})
}
```
--------------------------------------------------------------------------------
/pkg/toolsets/core/nodes.go:
--------------------------------------------------------------------------------
```go
package core
import (
"errors"
"fmt"
"github.com/google/jsonschema-go/jsonschema"
"k8s.io/utils/ptr"
"github.com/containers/kubernetes-mcp-server/pkg/api"
)
func initNodes() []api.ServerTool {
return []api.ServerTool{
{Tool: api.Tool{
Name: "nodes_log",
Description: "Get logs from a Kubernetes node (kubelet, kube-proxy, or other system logs). This accesses node logs through the Kubernetes API proxy to the kubelet",
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"name": {
Type: "string",
Description: "Name of the node to get logs from",
},
"log_path": {
Type: "string",
Description: "Path to the log file on the node (e.g. 'kubelet.log', 'kube-proxy.log'). Default is 'kubelet.log'",
Default: api.ToRawMessage("kubelet.log"),
},
"tail": {
Type: "integer",
Description: "Number of lines to retrieve from the end of the logs (Optional, 0 means all logs)",
Default: api.ToRawMessage(100),
Minimum: ptr.To(float64(0)),
},
},
Required: []string{"name"},
},
Annotations: api.ToolAnnotations{
Title: "Node: Log",
ReadOnlyHint: ptr.To(true),
DestructiveHint: ptr.To(false),
IdempotentHint: ptr.To(false),
OpenWorldHint: ptr.To(true),
},
}, Handler: nodesLog},
}
}
func nodesLog(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
name, ok := params.GetArguments()["name"].(string)
if !ok || name == "" {
return api.NewToolCallResult("", errors.New("failed to get node log, missing argument name")), nil
}
logPath, ok := params.GetArguments()["log_path"].(string)
if !ok || logPath == "" {
logPath = "kubelet.log"
}
tail := params.GetArguments()["tail"]
var tailInt int64
if tail != nil {
// Convert to int64 - safely handle both float64 (JSON number) and int types
switch v := tail.(type) {
case float64:
tailInt = int64(v)
case int:
case int64:
tailInt = v
default:
return api.NewToolCallResult("", fmt.Errorf("failed to parse tail parameter: expected integer, got %T", tail)), nil
}
}
ret, err := params.NodesLog(params, name, logPath, tailInt)
if err != nil {
return api.NewToolCallResult("", fmt.Errorf("failed to get node log for %s: %v", name, err)), nil
} else if ret == "" {
ret = fmt.Sprintf("The node %s has not logged any message yet or the log file is empty", name)
}
return api.NewToolCallResult(ret, nil), nil
}
```
--------------------------------------------------------------------------------
/pkg/mcp/testdata/toolsets-helm-tools.json:
--------------------------------------------------------------------------------
```json
[
{
"annotations": {
"title": "Helm: Install",
"readOnlyHint": false,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Install a Helm chart in the current or provided namespace",
"inputSchema": {
"type": "object",
"properties": {
"chart": {
"description": "Chart reference to install (for example: stable/grafana, oci://ghcr.io/nginxinc/charts/nginx-ingress)",
"type": "string"
},
"name": {
"description": "Name of the Helm release (Optional, random name if not provided)",
"type": "string"
},
"namespace": {
"description": "Namespace to install the Helm chart in (Optional, current namespace if not provided)",
"type": "string"
},
"values": {
"description": "Values to pass to the Helm chart (Optional)",
"type": "object"
}
},
"required": [
"chart"
]
},
"name": "helm_install"
},
{
"annotations": {
"title": "Helm: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Helm releases in the current or provided namespace (or in all namespaces if specified)",
"inputSchema": {
"type": "object",
"properties": {
"all_namespaces": {
"description": "If true, lists all Helm releases in all namespaces ignoring the namespace argument (Optional)",
"type": "boolean"
},
"namespace": {
"description": "Namespace to list Helm releases from (Optional, all namespaces if not provided)",
"type": "string"
}
}
},
"name": "helm_list"
},
{
"annotations": {
"title": "Helm: Uninstall",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": true,
"openWorldHint": true
},
"description": "Uninstall a Helm release in the current or provided namespace",
"inputSchema": {
"type": "object",
"properties": {
"name": {
"description": "Name of the Helm release to uninstall",
"type": "string"
},
"namespace": {
"description": "Namespace to uninstall the Helm release from (Optional, current namespace if not provided)",
"type": "string"
}
},
"required": [
"name"
]
},
"name": "helm_uninstall"
}
]
```
--------------------------------------------------------------------------------
/pkg/kubernetes/accesscontrol_restmapper.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/restmapper"
"github.com/containers/kubernetes-mcp-server/pkg/config"
)
type AccessControlRESTMapper struct {
delegate *restmapper.DeferredDiscoveryRESTMapper
staticConfig *config.StaticConfig // TODO: maybe just store the denied resource slice
}
var _ meta.RESTMapper = &AccessControlRESTMapper{}
func (a AccessControlRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) {
gvk, err := a.delegate.KindFor(resource)
if err != nil {
return schema.GroupVersionKind{}, err
}
if !isAllowed(a.staticConfig, &gvk) {
return schema.GroupVersionKind{}, isNotAllowedError(&gvk)
}
return gvk, nil
}
func (a AccessControlRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
gvks, err := a.delegate.KindsFor(resource)
if err != nil {
return nil, err
}
for i := range gvks {
if !isAllowed(a.staticConfig, &gvks[i]) {
return nil, isNotAllowedError(&gvks[i])
}
}
return gvks, nil
}
func (a AccessControlRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
return a.delegate.ResourceFor(input)
}
func (a AccessControlRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
return a.delegate.ResourcesFor(input)
}
func (a AccessControlRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
for _, version := range versions {
gvk := &schema.GroupVersionKind{Group: gk.Group, Version: version, Kind: gk.Kind}
if !isAllowed(a.staticConfig, gvk) {
return nil, isNotAllowedError(gvk)
}
}
return a.delegate.RESTMapping(gk, versions...)
}
func (a AccessControlRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
for _, version := range versions {
gvk := &schema.GroupVersionKind{Group: gk.Group, Version: version, Kind: gk.Kind}
if !isAllowed(a.staticConfig, gvk) {
return nil, isNotAllowedError(gvk)
}
}
return a.delegate.RESTMappings(gk, versions...)
}
func (a AccessControlRESTMapper) ResourceSingularizer(resource string) (singular string, err error) {
return a.delegate.ResourceSingularizer(resource)
}
func (a AccessControlRESTMapper) Reset() {
a.delegate.Reset()
}
func NewAccessControlRESTMapper(delegate *restmapper.DeferredDiscoveryRESTMapper, staticConfig *config.StaticConfig) *AccessControlRESTMapper {
return &AccessControlRESTMapper{delegate: delegate, staticConfig: staticConfig}
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/provider_single.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
"errors"
"fmt"
"github.com/containers/kubernetes-mcp-server/pkg/config"
authenticationv1api "k8s.io/api/authentication/v1"
)
// singleClusterProvider implements Provider for managing a single
// Kubernetes cluster. Used for in-cluster deployments or when multi-cluster
// support is disabled.
type singleClusterProvider struct {
strategy string
manager *Manager
}
var _ Provider = &singleClusterProvider{}
func init() {
RegisterProvider(config.ClusterProviderInCluster, newSingleClusterProvider(config.ClusterProviderInCluster))
RegisterProvider(config.ClusterProviderDisabled, newSingleClusterProvider(config.ClusterProviderDisabled))
}
// newSingleClusterProvider creates a provider that manages a single cluster.
// When used within a cluster or with an 'in-cluster' strategy, it uses an InClusterManager.
// Otherwise, it uses a KubeconfigManager.
func newSingleClusterProvider(strategy string) ProviderFactory {
return func(cfg *config.StaticConfig) (Provider, error) {
if cfg != nil && cfg.KubeConfig != "" && strategy == config.ClusterProviderInCluster {
return nil, fmt.Errorf("kubeconfig file %s cannot be used with the in-cluster ClusterProviderStrategy", cfg.KubeConfig)
}
var m *Manager
var err error
if strategy == config.ClusterProviderInCluster || IsInCluster(cfg) {
m, err = NewInClusterManager(cfg)
} else {
m, err = NewKubeconfigManager(cfg, "")
}
if err != nil {
if errors.Is(err, ErrorInClusterNotInCluster) {
return nil, fmt.Errorf("server must be deployed in cluster for the %s ClusterProviderStrategy: %v", strategy, err)
}
return nil, err
}
return &singleClusterProvider{
manager: m,
strategy: strategy,
}, nil
}
}
func (p *singleClusterProvider) IsOpenShift(ctx context.Context) bool {
return p.manager.IsOpenShift(ctx)
}
func (p *singleClusterProvider) VerifyToken(ctx context.Context, target, token, audience string) (*authenticationv1api.UserInfo, []string, error) {
if target != "" {
return nil, nil, fmt.Errorf("unable to get manager for other context/cluster with %s strategy", p.strategy)
}
return p.manager.VerifyToken(ctx, token, audience)
}
func (p *singleClusterProvider) GetTargets(_ context.Context) ([]string, error) {
return []string{""}, nil
}
func (p *singleClusterProvider) GetDerivedKubernetes(ctx context.Context, target string) (*Kubernetes, error) {
if target != "" {
return nil, fmt.Errorf("unable to get manager for other context/cluster with %s strategy", p.strategy)
}
return p.manager.Derived(ctx)
}
func (p *singleClusterProvider) GetDefaultTarget() string {
return ""
}
func (p *singleClusterProvider) GetTargetParameterName() string {
return ""
}
func (p *singleClusterProvider) WatchTargets(watch func() error) {
p.manager.WatchKubeConfig(watch)
}
func (p *singleClusterProvider) Close() {
p.manager.Close()
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/configuration.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"github.com/containers/kubernetes-mcp-server/pkg/config"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/clientcmd/api/latest"
)
const inClusterKubeConfigDefaultContext = "in-cluster"
// InClusterConfig is a variable that holds the function to get the in-cluster config
// Exposed for testing
var InClusterConfig = func() (*rest.Config, error) {
// TODO use kubernetes.default.svc instead of resolved server
// Currently running into: `http: server gave HTTP response to HTTPS client`
inClusterConfig, err := rest.InClusterConfig()
if inClusterConfig != nil {
inClusterConfig.Host = "https://kubernetes.default.svc"
}
return inClusterConfig, err
}
func IsInCluster(cfg *config.StaticConfig) bool {
// Even if running in-cluster, if a kubeconfig is provided, we consider it as out-of-cluster
if cfg != nil && cfg.KubeConfig != "" {
return false
}
restConfig, err := InClusterConfig()
return err == nil && restConfig != nil
}
func (k *Kubernetes) NamespaceOrDefault(namespace string) string {
return k.manager.NamespaceOrDefault(namespace)
}
// ConfigurationContextsDefault returns the current context name
// TODO: Should be moved to the Provider level ?
func (k *Kubernetes) ConfigurationContextsDefault() (string, error) {
cfg, err := k.manager.clientCmdConfig.RawConfig()
if err != nil {
return "", err
}
return cfg.CurrentContext, nil
}
// ConfigurationContextsList returns the list of available context names
// TODO: Should be moved to the Provider level ?
func (k *Kubernetes) ConfigurationContextsList() (map[string]string, error) {
cfg, err := k.manager.clientCmdConfig.RawConfig()
if err != nil {
return nil, err
}
contexts := make(map[string]string, len(cfg.Contexts))
for name, context := range cfg.Contexts {
cluster, ok := cfg.Clusters[context.Cluster]
if !ok || cluster.Server == "" {
contexts[name] = "unknown"
} else {
contexts[name] = cluster.Server
}
}
return contexts, nil
}
// ConfigurationView returns the current kubeconfig content as a kubeconfig YAML
// If minify is true, keeps only the current-context and the relevant pieces of the configuration for that context.
// If minify is false, all contexts, clusters, auth-infos, and users are returned in the configuration.
// TODO: Should be moved to the Provider level ?
func (k *Kubernetes) ConfigurationView(minify bool) (runtime.Object, error) {
var cfg clientcmdapi.Config
var err error
if cfg, err = k.manager.clientCmdConfig.RawConfig(); err != nil {
return nil, err
}
if minify {
if err = clientcmdapi.MinifyConfig(&cfg); err != nil {
return nil, err
}
}
//nolint:staticcheck
if err = clientcmdapi.FlattenConfig(&cfg); err != nil {
// ignore error
//return "", err
}
return latest.Scheme.ConvertToVersion(&cfg, latest.ExternalVersion)
}
```
--------------------------------------------------------------------------------
/pkg/mcp/tool_filter_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"testing"
"github.com/containers/kubernetes-mcp-server/pkg/api"
"github.com/stretchr/testify/suite"
"k8s.io/utils/ptr"
)
type ToolFilterSuite struct {
suite.Suite
}
func (s *ToolFilterSuite) TestToolFilterType() {
s.Run("ToolFilter type can be used as function", func() {
var mutator ToolFilter = func(tool api.ServerTool) bool {
return tool.Tool.Name == "included"
}
s.Run("returns true for included tool", func() {
tool := api.ServerTool{Tool: api.Tool{Name: "included"}}
s.True(mutator(tool))
})
s.Run("returns false for excluded tool", func() {
tool := api.ServerTool{Tool: api.Tool{Name: "excluded"}}
s.False(mutator(tool))
})
})
}
func (s *ToolFilterSuite) TestCompositeFilter() {
s.Run("returns true if all filters return true", func() {
filter := CompositeFilter(
func(tool api.ServerTool) bool { return true },
func(tool api.ServerTool) bool { return true },
)
tool := api.ServerTool{Tool: api.Tool{Name: "test"}}
s.True(filter(tool))
})
s.Run("returns false if any filter returns false", func() {
filter := CompositeFilter(
func(tool api.ServerTool) bool { return true },
func(tool api.ServerTool) bool { return false },
)
tool := api.ServerTool{Tool: api.Tool{Name: "test"}}
s.False(filter(tool))
})
}
func (s *ToolFilterSuite) TestShouldIncludeTargetListTool() {
s.Run("non-target-list-provider tools: returns true ", func() {
filter := ShouldIncludeTargetListTool("any", []string{"a", "b", "c", "d", "e", "f"})
tool := api.ServerTool{Tool: api.Tool{Name: "test"}, TargetListProvider: ptr.To(false)}
s.True(filter(tool))
})
s.Run("target-list-provider tools", func() {
s.Run("with targets == 1: returns false", func() {
filter := ShouldIncludeTargetListTool("any", []string{"1"})
tool := api.ServerTool{Tool: api.Tool{Name: "test"}, TargetListProvider: ptr.To(true)}
s.False(filter(tool))
})
s.Run("with targets == 1", func() {
s.Run("and tool is configuration_contexts_list and targetName is not context: returns false", func() {
filter := ShouldIncludeTargetListTool("not_context", []string{"1"})
tool := api.ServerTool{Tool: api.Tool{Name: "configuration_contexts_list"}, TargetListProvider: ptr.To(true)}
s.False(filter(tool))
})
s.Run("and tool is configuration_contexts_list and targetName is context: returns false", func() {
filter := ShouldIncludeTargetListTool("context", []string{"1"})
tool := api.ServerTool{Tool: api.Tool{Name: "configuration_contexts_list"}, TargetListProvider: ptr.To(true)}
s.False(filter(tool))
})
s.Run("and tool is not configuration_contexts_list: returns false", func() {
filter := ShouldIncludeTargetListTool("any", []string{"1"})
tool := api.ServerTool{Tool: api.Tool{Name: "other_tool"}, TargetListProvider: ptr.To(true)}
s.False(filter(tool))
})
})
})
}
func TestToolFilter(t *testing.T) {
suite.Run(t, new(ToolFilterSuite))
}
```
--------------------------------------------------------------------------------
/pkg/http/wellknown.go:
--------------------------------------------------------------------------------
```go
package http
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/containers/kubernetes-mcp-server/pkg/config"
)
const (
oauthAuthorizationServerEndpoint = "/.well-known/oauth-authorization-server"
oauthProtectedResourceEndpoint = "/.well-known/oauth-protected-resource"
openIDConfigurationEndpoint = "/.well-known/openid-configuration"
)
var WellKnownEndpoints = []string{
oauthAuthorizationServerEndpoint,
oauthProtectedResourceEndpoint,
openIDConfigurationEndpoint,
}
type WellKnown struct {
authorizationUrl string
scopesSupported []string
disableDynamicClientRegistration bool
httpClient *http.Client
}
var _ http.Handler = &WellKnown{}
func WellKnownHandler(staticConfig *config.StaticConfig, httpClient *http.Client) http.Handler {
authorizationUrl := staticConfig.AuthorizationURL
if authorizationUrl != "" && strings.HasSuffix(authorizationUrl, "/") {
authorizationUrl = strings.TrimSuffix(authorizationUrl, "/")
}
if httpClient == nil {
httpClient = http.DefaultClient
}
return &WellKnown{
authorizationUrl: authorizationUrl,
disableDynamicClientRegistration: staticConfig.DisableDynamicClientRegistration,
scopesSupported: staticConfig.OAuthScopes,
httpClient: httpClient,
}
}
func (w WellKnown) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
if w.authorizationUrl == "" {
http.Error(writer, "Authorization URL is not configured", http.StatusNotFound)
return
}
req, err := http.NewRequest(request.Method, w.authorizationUrl+request.URL.EscapedPath(), nil)
if err != nil {
http.Error(writer, "Failed to create request: "+err.Error(), http.StatusInternalServerError)
return
}
for key, values := range request.Header {
for _, value := range values {
req.Header.Add(key, value)
}
}
resp, err := w.httpClient.Do(req.WithContext(request.Context()))
if err != nil {
http.Error(writer, "Failed to perform request: "+err.Error(), http.StatusInternalServerError)
return
}
defer func() { _ = resp.Body.Close() }()
var resourceMetadata map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&resourceMetadata)
if err != nil {
http.Error(writer, "Failed to read response body: "+err.Error(), http.StatusInternalServerError)
return
}
if w.disableDynamicClientRegistration {
delete(resourceMetadata, "registration_endpoint")
resourceMetadata["require_request_uri_registration"] = false
}
if len(w.scopesSupported) > 0 {
resourceMetadata["scopes_supported"] = w.scopesSupported
}
body, err := json.Marshal(resourceMetadata)
if err != nil {
http.Error(writer, "Failed to marshal response body: "+err.Error(), http.StatusInternalServerError)
return
}
for key, values := range resp.Header {
for _, value := range values {
writer.Header().Add(key, value)
}
}
writer.Header().Set("Content-Length", fmt.Sprintf("%d", len(body)))
writer.WriteHeader(resp.StatusCode)
_, _ = writer.Write(body)
}
```
--------------------------------------------------------------------------------
/pkg/toolsets/toolsets_test.go:
--------------------------------------------------------------------------------
```go
package toolsets
import (
"testing"
"github.com/containers/kubernetes-mcp-server/pkg/api"
"github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
"github.com/stretchr/testify/suite"
)
type ToolsetsSuite struct {
suite.Suite
originalToolsets []api.Toolset
}
func (s *ToolsetsSuite) SetupTest() {
s.originalToolsets = Toolsets()
Clear()
}
func (s *ToolsetsSuite) TearDownTest() {
for _, toolset := range s.originalToolsets {
Register(toolset)
}
}
type TestToolset struct {
name string
description string
}
func (t *TestToolset) GetName() string { return t.name }
func (t *TestToolset) GetDescription() string { return t.description }
func (t *TestToolset) GetTools(_ kubernetes.Openshift) []api.ServerTool { return nil }
var _ api.Toolset = (*TestToolset)(nil)
func (s *ToolsetsSuite) TestToolsetNames() {
s.Run("Returns empty list if no toolsets registered", func() {
s.Empty(ToolsetNames(), "Expected empty list of toolset names")
})
Register(&TestToolset{name: "z"})
Register(&TestToolset{name: "b"})
Register(&TestToolset{name: "1"})
s.Run("Returns sorted list of registered toolset names", func() {
names := ToolsetNames()
s.Equal([]string{"1", "b", "z"}, names, "Expected sorted list of toolset names")
})
}
func (s *ToolsetsSuite) TestToolsetFromString() {
s.Run("Returns nil if toolset not found", func() {
s.Nil(ToolsetFromString("non-existent"), "Expected nil for non-existent toolset")
})
s.Run("Returns the correct toolset if found", func() {
Register(&TestToolset{name: "existent"})
res := ToolsetFromString("existent")
s.NotNil(res, "Expected to find the registered toolset")
s.Equal("existent", res.GetName(), "Expected to find the registered toolset by name")
})
s.Run("Returns the correct toolset if found after trimming spaces", func() {
Register(&TestToolset{name: "no-spaces"})
res := ToolsetFromString(" no-spaces ")
s.NotNil(res, "Expected to find the registered toolset")
s.Equal("no-spaces", res.GetName(), "Expected to find the registered toolset by name")
})
}
func (s *ToolsetsSuite) TestValidate() {
s.Run("Returns nil for empty toolset list", func() {
s.Nil(Validate([]string{}), "Expected nil for empty toolset list")
})
s.Run("Returns error for invalid toolset name", func() {
err := Validate([]string{"invalid"})
s.NotNil(err, "Expected error for invalid toolset name")
s.Contains(err.Error(), "invalid toolset name: invalid", "Expected error message to contain invalid toolset name")
})
s.Run("Returns nil for valid toolset names", func() {
Register(&TestToolset{name: "valid-1"})
Register(&TestToolset{name: "valid-2"})
err := Validate([]string{"valid-1", "valid-2"})
s.Nil(err, "Expected nil for valid toolset names")
})
s.Run("Returns error if any toolset name is invalid", func() {
Register(&TestToolset{name: "valid"})
err := Validate([]string{"valid", "invalid"})
s.NotNil(err, "Expected error if any toolset name is invalid")
s.Contains(err.Error(), "invalid toolset name: invalid", "Expected error message to contain invalid toolset name")
})
}
func TestToolsets(t *testing.T) {
suite.Run(t, new(ToolsetsSuite))
}
```
--------------------------------------------------------------------------------
/python/kubernetes_mcp_server/kubernetes_mcp_server.py:
--------------------------------------------------------------------------------
```python
import os
import platform
import subprocess
import sys
from pathlib import Path
import shutil
import tempfile
import urllib.request
if sys.version_info >= (3, 8):
from importlib.metadata import version
else:
from importlib_metadata import version
__version__ = version("kubernetes-mcp-server")
def get_platform_binary():
"""Determine the correct binary for the current platform."""
system = platform.system().lower()
arch = platform.machine().lower()
# Normalize architecture names
if arch in ["x86_64", "amd64"]:
arch = "amd64"
elif arch in ["arm64", "aarch64"]:
arch = "arm64"
else:
raise RuntimeError(f"Unsupported architecture: {arch}")
if system == "darwin":
return f"kubernetes-mcp-server-darwin-{arch}"
elif system == "linux":
return f"kubernetes-mcp-server-linux-{arch}"
elif system == "windows":
return f"kubernetes-mcp-server-windows-{arch}.exe"
else:
raise RuntimeError(f"Unsupported operating system: {system}")
def download_binary(binary_version="latest", destination=None):
"""Download the correct binary for the current platform."""
binary_name = get_platform_binary()
if destination is None:
destination = Path.home() / ".kubernetes-mcp-server" / "bin" / binary_version
destination = Path(destination)
destination.mkdir(parents=True, exist_ok=True)
binary_path = destination / binary_name
if binary_path.exists():
return binary_path
base_url = "https://github.com/containers/kubernetes-mcp-server/releases"
if binary_version == "latest":
release_url = f"{base_url}/latest/download/{binary_name}"
else:
release_url = f"{base_url}/download/v{binary_version}/{binary_name}"
# Download the binary
print(f"Downloading {binary_name} from {release_url}")
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
try:
with urllib.request.urlopen(release_url) as response:
shutil.copyfileobj(response, temp_file)
temp_file.close()
# Move to destination and make executable
shutil.move(temp_file.name, binary_path)
binary_path.chmod(binary_path.stat().st_mode | 0o755) # Make executable
return binary_path
except Exception as e:
os.unlink(temp_file.name)
raise RuntimeError(f"Failed to download binary: {e}")
def execute(args=None):
"""Download and execute the kubernetes-mcp-server binary."""
if args is None:
args = []
try:
binary_path = download_binary(binary_version=__version__)
cmd = [str(binary_path)] + args
# Execute the binary with the provided arguments
process = subprocess.run(cmd)
return process.returncode
except Exception as e:
print(f"Error executing kubernetes-mcp-server: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(execute(sys.argv[1:]))
def main():
"""Main function to execute the kubernetes-mcp-server binary."""
args = sys.argv[1:] if len(sys.argv) > 1 else []
return execute(args)
```
--------------------------------------------------------------------------------
/pkg/output/output.go:
--------------------------------------------------------------------------------
```go
package output
import (
"bytes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/printers"
yml "sigs.k8s.io/yaml"
)
var Yaml = &yaml{}
var Table = &table{}
type Output interface {
// GetName returns the name of the output format, will be used by the CLI to identify the output format.
GetName() string
// AsTable true if the kubernetes request should be made with the `application/json;as=Table;v=0.1` header.
AsTable() bool
// PrintObj prints the given object as a string.
PrintObj(obj runtime.Unstructured) (string, error)
}
var Outputs = []Output{
Yaml,
Table,
}
var Names []string
func FromString(name string) Output {
for _, output := range Outputs {
if output.GetName() == name {
return output
}
}
return nil
}
type yaml struct{}
func (p *yaml) GetName() string {
return "yaml"
}
func (p *yaml) AsTable() bool {
return false
}
func (p *yaml) PrintObj(obj runtime.Unstructured) (string, error) {
return MarshalYaml(obj)
}
type table struct{}
func (p *table) GetName() string {
return "table"
}
func (p *table) AsTable() bool {
return true
}
func (p *table) PrintObj(obj runtime.Unstructured) (string, error) {
var objectToPrint runtime.Object = obj
withNamespace := false
if obj.GetObjectKind().GroupVersionKind() == metav1.SchemeGroupVersion.WithKind("Table") {
t := &metav1.Table{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), t); err == nil {
objectToPrint = t
// Process the Raw object to retrieve the complete metadata (see kubectl/pkg/printers/table_printer.go)
for i := range t.Rows {
row := &t.Rows[i]
if row.Object.Raw == nil || row.Object.Object != nil {
continue
}
row.Object.Object, err = runtime.Decode(unstructured.UnstructuredJSONScheme, row.Object.Raw)
// Print namespace if at least one row has it (object is namespaced)
if err == nil && !withNamespace {
switch rowObject := row.Object.Object.(type) {
case *unstructured.Unstructured:
withNamespace = rowObject.GetNamespace() != ""
}
}
}
}
}
buf := new(bytes.Buffer)
// TablePrinter is mutable and not thread-safe, must create a new instance each time.
printer := printers.NewTablePrinter(printers.PrintOptions{
WithNamespace: withNamespace,
WithKind: true,
Wide: true,
ShowLabels: true,
})
err := printer.PrintObj(objectToPrint, buf)
return buf.String(), err
}
func MarshalYaml(v any) (string, error) {
switch t := v.(type) {
//case unstructured.UnstructuredList:
// for i := range t.Items {
// t.Items[i].SetManagedFields(nil)
// }
// v = t.Items
case *unstructured.UnstructuredList:
for i := range t.Items {
t.Items[i].SetManagedFields(nil)
}
v = t.Items
//case unstructured.Unstructured:
// t.SetManagedFields(nil)
case *unstructured.Unstructured:
t.SetManagedFields(nil)
}
ret, err := yml.Marshal(v)
if err != nil {
return "", err
}
return string(ret), nil
}
func init() {
Names = make([]string, 0)
for _, output := range Outputs {
Names = append(Names, output.GetName())
}
}
```
--------------------------------------------------------------------------------
/internal/tools/update-readme/main.go:
--------------------------------------------------------------------------------
```go
package main
import (
"context"
"fmt"
"maps"
"os"
"path/filepath"
"slices"
"strings"
internalk8s "github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
"github.com/containers/kubernetes-mcp-server/pkg/toolsets"
_ "github.com/containers/kubernetes-mcp-server/pkg/toolsets/config"
_ "github.com/containers/kubernetes-mcp-server/pkg/toolsets/core"
_ "github.com/containers/kubernetes-mcp-server/pkg/toolsets/helm"
)
type OpenShift struct{}
func (o *OpenShift) IsOpenShift(ctx context.Context) bool {
return true
}
var _ internalk8s.Openshift = (*OpenShift)(nil)
func main() {
// Snyk reports false positive unless we flow the args through filepath.Clean and filepath.Localize in this specific order
var err error
localReadmePath := filepath.Clean(os.Args[1])
localReadmePath, err = filepath.Localize(localReadmePath)
if err != nil {
panic(err)
}
readme, err := os.ReadFile(localReadmePath)
if err != nil {
panic(err)
}
// Available Toolsets
toolsetsList := toolsets.Toolsets()
maxNameLen, maxDescLen := len("Toolset"), len("Description")
for _, toolset := range toolsetsList {
nameLen := len(toolset.GetName())
descLen := len(toolset.GetDescription())
if nameLen > maxNameLen {
maxNameLen = nameLen
}
if descLen > maxDescLen {
maxDescLen = descLen
}
}
availableToolsets := strings.Builder{}
availableToolsets.WriteString(fmt.Sprintf("| %-*s | %-*s |\n", maxNameLen, "Toolset", maxDescLen, "Description"))
availableToolsets.WriteString(fmt.Sprintf("|-%s-|-%s-|\n", strings.Repeat("-", maxNameLen), strings.Repeat("-", maxDescLen)))
for _, toolset := range toolsetsList {
availableToolsets.WriteString(fmt.Sprintf("| %-*s | %-*s |\n", maxNameLen, toolset.GetName(), maxDescLen, toolset.GetDescription()))
}
updated := replaceBetweenMarkers(
string(readme),
"<!-- AVAILABLE-TOOLSETS-START -->",
"<!-- AVAILABLE-TOOLSETS-END -->",
availableToolsets.String(),
)
// Available Toolset Tools
toolsetTools := strings.Builder{}
for _, toolset := range toolsetsList {
toolsetTools.WriteString("<details>\n\n<summary>" + toolset.GetName() + "</summary>\n\n")
tools := toolset.GetTools(&OpenShift{})
for _, tool := range tools {
toolsetTools.WriteString(fmt.Sprintf("- **%s** - %s\n", tool.Tool.Name, tool.Tool.Description))
for _, propName := range slices.Sorted(maps.Keys(tool.Tool.InputSchema.Properties)) {
property := tool.Tool.InputSchema.Properties[propName]
toolsetTools.WriteString(fmt.Sprintf(" - `%s` (`%s`)", propName, property.Type))
if slices.Contains(tool.Tool.InputSchema.Required, propName) {
toolsetTools.WriteString(" **(required)**")
}
toolsetTools.WriteString(fmt.Sprintf(" - %s\n", property.Description))
}
toolsetTools.WriteString("\n")
}
toolsetTools.WriteString("</details>\n\n")
}
updated = replaceBetweenMarkers(
updated,
"<!-- AVAILABLE-TOOLSETS-TOOLS-START -->",
"<!-- AVAILABLE-TOOLSETS-TOOLS-END -->",
toolsetTools.String(),
)
if err := os.WriteFile(localReadmePath, []byte(updated), 0o644); err != nil {
panic(err)
}
}
func replaceBetweenMarkers(content, startMarker, endMarker, replacement string) string {
startIdx := strings.Index(content, startMarker)
if startIdx == -1 {
return content
}
endIdx := strings.Index(content, endMarker)
if endIdx == -1 || endIdx <= startIdx {
return content
}
return content[:startIdx+len(startMarker)] + "\n\n" + replacement + "\n" + content[endIdx:]
}
```
--------------------------------------------------------------------------------
/pkg/api/toolsets.go:
--------------------------------------------------------------------------------
```go
package api
import (
"context"
"encoding/json"
internalk8s "github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
"github.com/containers/kubernetes-mcp-server/pkg/output"
"github.com/google/jsonschema-go/jsonschema"
)
type ServerTool struct {
Tool Tool
Handler ToolHandlerFunc
ClusterAware *bool
TargetListProvider *bool
}
// IsClusterAware indicates whether the tool can accept a "cluster" or "context" parameter
// to operate on a specific Kubernetes cluster context.
// Defaults to true if not explicitly set
func (s *ServerTool) IsClusterAware() bool {
if s.ClusterAware != nil {
return *s.ClusterAware
}
return true
}
// IsTargetListProvider indicates whether the tool is used to provide a list of targets (clusters/contexts)
// Defaults to false if not explicitly set
func (s *ServerTool) IsTargetListProvider() bool {
if s.TargetListProvider != nil {
return *s.TargetListProvider
}
return false
}
type Toolset interface {
// GetName returns the name of the toolset.
// Used to identify the toolset in configuration, logs, and command-line arguments.
// Examples: "core", "metrics", "helm"
GetName() string
GetDescription() string
GetTools(o internalk8s.Openshift) []ServerTool
}
type ToolCallRequest interface {
GetArguments() map[string]any
}
type ToolCallResult struct {
// Raw content returned by the tool.
Content string
// Error (non-protocol) to send back to the LLM.
Error error
}
func NewToolCallResult(content string, err error) *ToolCallResult {
return &ToolCallResult{
Content: content,
Error: err,
}
}
type ToolHandlerParams struct {
context.Context
*internalk8s.Kubernetes
ToolCallRequest
ListOutput output.Output
}
type ToolHandlerFunc func(params ToolHandlerParams) (*ToolCallResult, error)
type Tool struct {
// The name of the tool.
// Intended for programmatic or logical use, but used as a display name in past
// specs or fallback (if title isn't present).
Name string `json:"name"`
// A human-readable description of the tool.
//
// This can be used by clients to improve the LLM's understanding of available
// tools. It can be thought of like a "hint" to the model.
Description string `json:"description,omitempty"`
// Additional tool information.
Annotations ToolAnnotations `json:"annotations"`
// A JSON Schema object defining the expected parameters for the tool.
InputSchema *jsonschema.Schema
}
type ToolAnnotations struct {
// Human-readable title for the tool
Title string `json:"title,omitempty"`
// If true, the tool does not modify its environment.
ReadOnlyHint *bool `json:"readOnlyHint,omitempty"`
// If true, the tool may perform destructive updates to its environment. If
// false, the tool performs only additive updates.
//
// (This property is meaningful only when ReadOnlyHint == false.)
DestructiveHint *bool `json:"destructiveHint,omitempty"`
// If true, calling the tool repeatedly with the same arguments will have no
// additional effect on its environment.
//
// (This property is meaningful only when ReadOnlyHint == false.)
IdempotentHint *bool `json:"idempotentHint,omitempty"`
// If true, this tool may interact with an "open world" of external entities. If
// false, the tool's domain of interaction is closed. For example, the world of
// a web search tool is open, whereas that of a memory tool is not.
OpenWorldHint *bool `json:"openWorldHint,omitempty"`
}
func ToRawMessage(v any) json.RawMessage {
if v == nil {
return nil
}
b, err := json.Marshal(v)
if err != nil {
return nil
}
return b
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/provider_kubeconfig.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
"errors"
"fmt"
"github.com/containers/kubernetes-mcp-server/pkg/config"
authenticationv1api "k8s.io/api/authentication/v1"
)
// KubeConfigTargetParameterName is the parameter name used to specify
// the kubeconfig context when using the kubeconfig cluster provider strategy.
const KubeConfigTargetParameterName = "context"
// kubeConfigClusterProvider implements Provider for managing multiple
// Kubernetes clusters using different contexts from a kubeconfig file.
// It lazily initializes managers for each context as they are requested.
type kubeConfigClusterProvider struct {
defaultContext string
managers map[string]*Manager
}
var _ Provider = &kubeConfigClusterProvider{}
func init() {
RegisterProvider(config.ClusterProviderKubeConfig, newKubeConfigClusterProvider)
}
// newKubeConfigClusterProvider creates a provider that manages multiple clusters
// via kubeconfig contexts.
// Internally, it leverages a KubeconfigManager for each context, initializing them
// lazily when requested.
func newKubeConfigClusterProvider(cfg *config.StaticConfig) (Provider, error) {
m, err := NewKubeconfigManager(cfg, "")
if err != nil {
if errors.Is(err, ErrorKubeconfigInClusterNotAllowed) {
return nil, fmt.Errorf("kubeconfig ClusterProviderStrategy is invalid for in-cluster deployments: %v", err)
}
return nil, err
}
rawConfig, err := m.clientCmdConfig.RawConfig()
if err != nil {
return nil, err
}
allClusterManagers := map[string]*Manager{
rawConfig.CurrentContext: m, // we already initialized a manager for the default context, let's use it
}
for name := range rawConfig.Contexts {
if name == rawConfig.CurrentContext {
continue // already initialized this, don't want to set it to nil
}
allClusterManagers[name] = nil
}
return &kubeConfigClusterProvider{
defaultContext: rawConfig.CurrentContext,
managers: allClusterManagers,
}, nil
}
func (p *kubeConfigClusterProvider) managerForContext(context string) (*Manager, error) {
m, ok := p.managers[context]
if ok && m != nil {
return m, nil
}
baseManager := p.managers[p.defaultContext]
m, err := NewKubeconfigManager(baseManager.staticConfig, context)
if err != nil {
return nil, err
}
p.managers[context] = m
return m, nil
}
func (p *kubeConfigClusterProvider) IsOpenShift(ctx context.Context) bool {
return p.managers[p.defaultContext].IsOpenShift(ctx)
}
func (p *kubeConfigClusterProvider) VerifyToken(ctx context.Context, context, token, audience string) (*authenticationv1api.UserInfo, []string, error) {
m, err := p.managerForContext(context)
if err != nil {
return nil, nil, err
}
return m.VerifyToken(ctx, token, audience)
}
func (p *kubeConfigClusterProvider) GetTargets(_ context.Context) ([]string, error) {
contextNames := make([]string, 0, len(p.managers))
for contextName := range p.managers {
contextNames = append(contextNames, contextName)
}
return contextNames, nil
}
func (p *kubeConfigClusterProvider) GetTargetParameterName() string {
return KubeConfigTargetParameterName
}
func (p *kubeConfigClusterProvider) GetDerivedKubernetes(ctx context.Context, context string) (*Kubernetes, error) {
m, err := p.managerForContext(context)
if err != nil {
return nil, err
}
return m.Derived(ctx)
}
func (p *kubeConfigClusterProvider) GetDefaultTarget() string {
return p.defaultContext
}
func (p *kubeConfigClusterProvider) WatchTargets(onKubeConfigChanged func() error) {
m := p.managers[p.defaultContext]
m.WatchKubeConfig(onKubeConfigChanged)
}
func (p *kubeConfigClusterProvider) Close() {
m := p.managers[p.defaultContext]
m.Close()
}
```
--------------------------------------------------------------------------------
/pkg/toolsets/config/configuration.go:
--------------------------------------------------------------------------------
```go
package config
import (
"fmt"
"github.com/google/jsonschema-go/jsonschema"
"k8s.io/utils/ptr"
"github.com/containers/kubernetes-mcp-server/pkg/api"
"github.com/containers/kubernetes-mcp-server/pkg/output"
)
func initConfiguration() []api.ServerTool {
tools := []api.ServerTool{
{
Tool: api.Tool{
Name: "configuration_contexts_list",
Description: "List all available context names and associated server urls from the kubeconfig file",
InputSchema: &jsonschema.Schema{
Type: "object",
},
Annotations: api.ToolAnnotations{
Title: "Configuration: Contexts List",
ReadOnlyHint: ptr.To(true),
DestructiveHint: ptr.To(false),
IdempotentHint: ptr.To(true),
OpenWorldHint: ptr.To(false),
},
},
ClusterAware: ptr.To(false),
TargetListProvider: ptr.To(true),
Handler: contextsList,
},
{
Tool: api.Tool{
Name: "configuration_view",
Description: "Get the current Kubernetes configuration content as a kubeconfig YAML",
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"minified": {
Type: "boolean",
Description: "Return a minified version of the configuration. " +
"If set to true, keeps only the current-context and the relevant pieces of the configuration for that context. " +
"If set to false, all contexts, clusters, auth-infos, and users are returned in the configuration. " +
"(Optional, default true)",
},
},
},
Annotations: api.ToolAnnotations{
Title: "Configuration: View",
ReadOnlyHint: ptr.To(true),
DestructiveHint: ptr.To(false),
IdempotentHint: ptr.To(false),
OpenWorldHint: ptr.To(true),
},
},
ClusterAware: ptr.To(false),
Handler: configurationView,
},
}
return tools
}
func contextsList(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
contexts, err := params.ConfigurationContextsList()
if err != nil {
return api.NewToolCallResult("", fmt.Errorf("failed to list contexts: %v", err)), nil
}
if len(contexts) == 0 {
return api.NewToolCallResult("No contexts found in kubeconfig", nil), nil
}
defaultContext, err := params.ConfigurationContextsDefault()
if err != nil {
return api.NewToolCallResult("", fmt.Errorf("failed to get default context: %v", err)), nil
}
result := fmt.Sprintf("Available Kubernetes contexts (%d total, default: %s):\n\n", len(contexts), defaultContext)
result += "Format: [*] CONTEXT_NAME -> SERVER_URL\n"
result += " (* indicates the default context used in tools if context is not set)\n\n"
result += "Contexts:\n---------\n"
for context, server := range contexts {
marker := " "
if context == defaultContext {
marker = "*"
}
result += fmt.Sprintf("%s%s -> %s\n", marker, context, server)
}
result += "---------\n\n"
result += "To use a specific context with any tool, set the 'context' parameter in the tool call arguments"
// TODO: Review output format, current is not parseable and might not be ideal for LLM consumption
return api.NewToolCallResult(result, nil), nil
}
func configurationView(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
minify := true
minified := params.GetArguments()["minified"]
if _, ok := minified.(bool); ok {
minify = minified.(bool)
}
ret, err := params.ConfigurationView(minify)
if err != nil {
return api.NewToolCallResult("", fmt.Errorf("failed to get configuration: %v", err)), nil
}
configurationYaml, err := output.MarshalYaml(ret)
if err != nil {
err = fmt.Errorf("failed to get configuration: %v", err)
}
return api.NewToolCallResult(configurationYaml, err), nil
}
```
--------------------------------------------------------------------------------
/pkg/helm/helm.go:
--------------------------------------------------------------------------------
```go
package helm
import (
"context"
"fmt"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/cli"
"helm.sh/helm/v3/pkg/registry"
"helm.sh/helm/v3/pkg/release"
"k8s.io/cli-runtime/pkg/genericclioptions"
"log"
"sigs.k8s.io/yaml"
"time"
)
type Kubernetes interface {
genericclioptions.RESTClientGetter
NamespaceOrDefault(namespace string) string
}
type Helm struct {
kubernetes Kubernetes
}
// NewHelm creates a new Helm instance
func NewHelm(kubernetes Kubernetes) *Helm {
return &Helm{kubernetes: kubernetes}
}
func (h *Helm) Install(ctx context.Context, chart string, values map[string]interface{}, name string, namespace string) (string, error) {
cfg, err := h.newAction(h.kubernetes.NamespaceOrDefault(namespace), false)
if err != nil {
return "", err
}
install := action.NewInstall(cfg)
if name == "" {
install.GenerateName = true
install.ReleaseName, _, _ = install.NameAndChart([]string{chart})
} else {
install.ReleaseName = name
}
install.Namespace = h.kubernetes.NamespaceOrDefault(namespace)
install.Wait = true
install.Timeout = 5 * time.Minute
install.DryRun = false
chartRequested, err := install.LocateChart(chart, cli.New())
if err != nil {
return "", err
}
chartLoaded, err := loader.Load(chartRequested)
if err != nil {
return "", err
}
installedRelease, err := install.RunWithContext(ctx, chartLoaded, values)
if err != nil {
return "", err
}
ret, err := yaml.Marshal(simplify(installedRelease))
if err != nil {
return "", err
}
return string(ret), nil
}
// List lists all the releases for the specified namespace (or current namespace if). Or allNamespaces is true, it lists all releases across all namespaces.
func (h *Helm) List(namespace string, allNamespaces bool) (string, error) {
cfg, err := h.newAction(namespace, allNamespaces)
if err != nil {
return "", err
}
list := action.NewList(cfg)
list.AllNamespaces = allNamespaces
releases, err := list.Run()
if err != nil {
return "", err
} else if len(releases) == 0 {
return "No Helm releases found", nil
}
ret, err := yaml.Marshal(simplify(releases...))
if err != nil {
return "", err
}
return string(ret), nil
}
func (h *Helm) Uninstall(name string, namespace string) (string, error) {
cfg, err := h.newAction(h.kubernetes.NamespaceOrDefault(namespace), false)
if err != nil {
return "", err
}
uninstall := action.NewUninstall(cfg)
uninstall.IgnoreNotFound = true
uninstall.Wait = true
uninstall.Timeout = 5 * time.Minute
uninstalledRelease, err := uninstall.Run(name)
if uninstalledRelease == nil && err == nil {
return fmt.Sprintf("Release %s not found", name), nil
} else if err != nil {
return "", err
}
return fmt.Sprintf("Uninstalled release %s %s", uninstalledRelease.Release.Name, uninstalledRelease.Info), nil
}
func (h *Helm) newAction(namespace string, allNamespaces bool) (*action.Configuration, error) {
cfg := new(action.Configuration)
applicableNamespace := ""
if !allNamespaces {
applicableNamespace = h.kubernetes.NamespaceOrDefault(namespace)
}
registryClient, err := registry.NewClient()
if err != nil {
return nil, err
}
cfg.RegistryClient = registryClient
return cfg, cfg.Init(h.kubernetes, applicableNamespace, "", log.Printf)
}
func simplify(release ...*release.Release) []map[string]interface{} {
ret := make([]map[string]interface{}, len(release))
for i, r := range release {
ret[i] = map[string]interface{}{
"name": r.Name,
"namespace": r.Namespace,
"revision": r.Version,
}
if r.Chart != nil {
ret[i]["chart"] = r.Chart.Metadata.Name
ret[i]["chartVersion"] = r.Chart.Metadata.Version
ret[i]["appVersion"] = r.Chart.Metadata.AppVersion
}
if r.Info != nil {
ret[i]["status"] = r.Info.Status.String()
if !r.Info.LastDeployed.IsZero() {
ret[i]["lastDeployed"] = r.Info.LastDeployed.Format(time.RFC1123Z)
}
}
}
return ret
}
```
--------------------------------------------------------------------------------
/pkg/mcp/mcp_tools_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"testing"
"github.com/BurntSushi/toml"
"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/suite"
"k8s.io/utils/ptr"
)
// McpToolProcessingSuite tests MCP tool processing (isToolApplicable)
type McpToolProcessingSuite struct {
BaseMcpSuite
}
func (s *McpToolProcessingSuite) TestUnrestricted() {
s.InitMcpClient()
tools, err := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
s.Require().NotNil(tools)
s.Run("ListTools returns tools", func() {
s.NoError(err, "call ListTools failed")
s.NotNilf(tools, "list tools failed")
})
s.Run("Destructive tools ARE NOT read only", func() {
for _, tool := range tools.Tools {
readOnly := ptr.Deref(tool.Annotations.ReadOnlyHint, false)
destructive := ptr.Deref(tool.Annotations.DestructiveHint, false)
s.Falsef(readOnly && destructive, "Tool %s is read-only and destructive, which is not allowed", tool.Name)
}
})
}
func (s *McpToolProcessingSuite) TestReadOnly() {
s.Require().NoError(toml.Unmarshal([]byte(`
read_only = true
`), s.Cfg), "Expected to parse read only server config")
s.InitMcpClient()
tools, err := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
s.Require().NotNil(tools)
s.Run("ListTools returns tools", func() {
s.NoError(err, "call ListTools failed")
s.NotNilf(tools, "list tools failed")
})
s.Run("ListTools returns only read-only tools", func() {
for _, tool := range tools.Tools {
s.Falsef(tool.Annotations.ReadOnlyHint == nil || !*tool.Annotations.ReadOnlyHint,
"Tool %s is not read-only but should be", tool.Name)
s.Falsef(tool.Annotations.DestructiveHint != nil && *tool.Annotations.DestructiveHint,
"Tool %s is destructive but should not be in read-only mode", tool.Name)
}
})
}
func (s *McpToolProcessingSuite) TestDisableDestructive() {
s.Require().NoError(toml.Unmarshal([]byte(`
disable_destructive = true
`), s.Cfg), "Expected to parse disable destructive server config")
s.InitMcpClient()
tools, err := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
s.Require().NotNil(tools)
s.Run("ListTools returns tools", func() {
s.NoError(err, "call ListTools failed")
s.NotNilf(tools, "list tools failed")
})
s.Run("ListTools does not return destructive tools", func() {
for _, tool := range tools.Tools {
s.Falsef(tool.Annotations.DestructiveHint != nil && *tool.Annotations.DestructiveHint,
"Tool %s is destructive but should not be in disable_destructive mode", tool.Name)
}
})
}
func (s *McpToolProcessingSuite) TestEnabledTools() {
s.Require().NoError(toml.Unmarshal([]byte(`
enabled_tools = [ "namespaces_list", "events_list" ]
`), s.Cfg), "Expected to parse enabled tools server config")
s.InitMcpClient()
tools, err := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
s.Require().NotNil(tools)
s.Run("ListTools returns tools", func() {
s.NoError(err, "call ListTools failed")
s.NotNilf(tools, "list tools failed")
})
s.Run("ListTools returns only explicitly enabled tools", func() {
s.Len(tools.Tools, 2, "ListTools should return exactly 2 tools")
for _, tool := range tools.Tools {
s.Falsef(tool.Name != "namespaces_list" && tool.Name != "events_list",
"Tool %s is not enabled but should be", tool.Name)
}
})
}
func (s *McpToolProcessingSuite) TestDisabledTools() {
s.Require().NoError(toml.Unmarshal([]byte(`
disabled_tools = [ "namespaces_list", "events_list" ]
`), s.Cfg), "Expected to parse disabled tools server config")
s.InitMcpClient()
tools, err := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
s.Require().NotNil(tools)
s.Run("ListTools returns tools", func() {
s.NoError(err, "call ListTools failed")
s.NotNilf(tools, "list tools failed")
})
s.Run("ListTools does not return disabled tools", func() {
for _, tool := range tools.Tools {
s.Falsef(tool.Name == "namespaces_list" || tool.Name == "events_list",
"Tool %s is not disabled but should be", tool.Name)
}
})
}
func TestMcpToolProcessing(t *testing.T) {
suite.Run(t, new(McpToolProcessingSuite))
}
```
--------------------------------------------------------------------------------
/pkg/mcp/pods_exec_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"bytes"
"io"
"net/http"
"strings"
"testing"
"github.com/BurntSushi/toml"
"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/suite"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/containers/kubernetes-mcp-server/internal/test"
)
type PodsExecSuite struct {
BaseMcpSuite
mockServer *test.MockServer
}
func (s *PodsExecSuite) SetupTest() {
s.BaseMcpSuite.SetupTest()
s.mockServer = test.NewMockServer()
s.Cfg.KubeConfig = s.mockServer.KubeconfigFile(s.T())
}
func (s *PodsExecSuite) TearDownTest() {
s.BaseMcpSuite.TearDownTest()
if s.mockServer != nil {
s.mockServer.Close()
}
}
func (s *PodsExecSuite) TestPodsExec() {
s.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/api/v1/namespaces/default/pods/pod-to-exec/exec" {
return
}
var stdin, stdout bytes.Buffer
ctx, err := test.CreateHTTPStreams(w, req, &test.StreamOptions{
Stdin: &stdin,
Stdout: &stdout,
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
defer func(conn io.Closer) { _ = conn.Close() }(ctx.Closer)
_, _ = io.WriteString(ctx.StdoutStream, "command:"+strings.Join(req.URL.Query()["command"], " ")+"\n")
_, _ = io.WriteString(ctx.StdoutStream, "container:"+strings.Join(req.URL.Query()["container"], " ")+"\n")
}))
s.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/api/v1/namespaces/default/pods/pod-to-exec" {
return
}
test.WriteObject(w, &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod-to-exec",
},
Spec: v1.PodSpec{Containers: []v1.Container{{Name: "container-to-exec"}}},
})
}))
s.InitMcpClient()
s.Run("pods_exec(name=pod-to-exec, namespace=nil, command=[ls -l]), uses configured namespace", func() {
result, err := s.CallTool("pods_exec", map[string]interface{}{
"name": "pod-to-exec",
"command": []interface{}{"ls", "-l"},
})
s.Require().NotNil(result)
s.Run("returns command output", func() {
s.NoError(err, "call tool failed %v", err)
s.Falsef(result.IsError, "call tool failed: %v", result.Content)
s.Contains(result.Content[0].(mcp.TextContent).Text, "command:ls -l\n", "unexpected result %v", result.Content[0].(mcp.TextContent).Text)
})
})
s.Run("pods_exec(name=pod-to-exec, namespace=default, command=[ls -l])", func() {
result, err := s.CallTool("pods_exec", map[string]interface{}{
"namespace": "default",
"name": "pod-to-exec",
"command": []interface{}{"ls", "-l"},
})
s.Require().NotNil(result)
s.Run("returns command output", func() {
s.NoError(err, "call tool failed %v", err)
s.Falsef(result.IsError, "call tool failed: %v", result.Content)
s.Contains(result.Content[0].(mcp.TextContent).Text, "command:ls -l\n", "unexpected result %v", result.Content[0].(mcp.TextContent).Text)
})
})
s.Run("pods_exec(name=pod-to-exec, namespace=default, command=[ls -l], container=a-specific-container)", func() {
result, err := s.CallTool("pods_exec", map[string]interface{}{
"namespace": "default",
"name": "pod-to-exec",
"command": []interface{}{"ls", "-l"},
"container": "a-specific-container",
})
s.Require().NotNil(result)
s.Run("returns command output", func() {
s.NoError(err, "call tool failed %v", err)
s.Falsef(result.IsError, "call tool failed: %v", result.Content)
s.Contains(result.Content[0].(mcp.TextContent).Text, "command:ls -l\n", "unexpected result %v", result.Content[0].(mcp.TextContent).Text)
})
})
}
func (s *PodsExecSuite) TestPodsExecDenied() {
s.Require().NoError(toml.Unmarshal([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`), s.Cfg), "Expected to parse denied resources config")
s.InitMcpClient()
s.Run("pods_exec (denied)", func() {
toolResult, err := s.CallTool("pods_exec", map[string]interface{}{
"namespace": "default",
"name": "pod-to-exec",
"command": []interface{}{"ls", "-l"},
"container": "a-specific-container",
})
s.Require().NotNil(toolResult, "toolResult should not be nil")
s.Run("has error", func() {
s.Truef(toolResult.IsError, "call tool should fail")
s.Nilf(err, "call tool should not return error object")
})
s.Run("describes denial", func() {
expectedMessage := "failed to exec in pod pod-to-exec in namespace default: resource not allowed: /v1, Kind=Pod"
s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
"expected descriptive error '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
})
})
}
func TestPodsExec(t *testing.T) {
suite.Run(t, new(PodsExecSuite))
}
```
--------------------------------------------------------------------------------
/pkg/mcp/events_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"strings"
"testing"
"github.com/BurntSushi/toml"
"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/suite"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/yaml"
)
type EventsSuite struct {
BaseMcpSuite
}
func (s *EventsSuite) TestEventsList() {
s.InitMcpClient()
s.Run("events_list (no events)", func() {
toolResult, err := s.CallTool("events_list", map[string]interface{}{})
s.Run("no error", func() {
s.Nilf(err, "call tool failed %v", err)
s.Falsef(toolResult.IsError, "call tool failed")
})
s.Run("returns no events message", func() {
s.Equal("# No events found", toolResult.Content[0].(mcp.TextContent).Text)
})
})
s.Run("events_list (with events)", func() {
client := kubernetes.NewForConfigOrDie(envTestRestConfig)
for _, ns := range []string{"default", "ns-1"} {
_, _ = client.CoreV1().Events(ns).Create(s.T().Context(), &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: "an-event-in-" + ns,
},
InvolvedObject: v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: "a-pod",
Namespace: ns,
},
Type: "Normal",
Message: "The event message",
}, metav1.CreateOptions{})
}
s.Run("events_list()", func() {
toolResult, err := s.CallTool("events_list", map[string]interface{}{})
s.Run("no error", func() {
s.Nilf(err, "call tool failed %v", err)
s.Falsef(toolResult.IsError, "call tool failed")
})
s.Run("has yaml comment indicating output format", func() {
s.Truef(strings.HasPrefix(toolResult.Content[0].(mcp.TextContent).Text, "# The following events (YAML format) were found:\n"), "unexpected result %v", toolResult.Content[0].(mcp.TextContent).Text)
})
var decoded []v1.Event
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
s.Run("has yaml content", func() {
s.Nilf(err, "unmarshal failed %v", err)
})
s.Run("returns all events", func() {
s.YAMLEqf(""+
"- InvolvedObject:\n"+
" Kind: Pod\n"+
" Name: a-pod\n"+
" apiVersion: v1\n"+
" Message: The event message\n"+
" Namespace: default\n"+
" Reason: \"\"\n"+
" Timestamp: 0001-01-01 00:00:00 +0000 UTC\n"+
" Type: Normal\n"+
"- InvolvedObject:\n"+
" Kind: Pod\n"+
" Name: a-pod\n"+
" apiVersion: v1\n"+
" Message: The event message\n"+
" Namespace: ns-1\n"+
" Reason: \"\"\n"+
" Timestamp: 0001-01-01 00:00:00 +0000 UTC\n"+
" Type: Normal\n",
toolResult.Content[0].(mcp.TextContent).Text,
"unexpected result %v", toolResult.Content[0].(mcp.TextContent).Text)
})
})
s.Run("events_list(namespace=ns-1)", func() {
toolResult, err := s.CallTool("events_list", map[string]interface{}{
"namespace": "ns-1",
})
s.Run("no error", func() {
s.Nilf(err, "call tool failed %v", err)
s.Falsef(toolResult.IsError, "call tool failed")
})
s.Run("has yaml comment indicating output format", func() {
s.Truef(strings.HasPrefix(toolResult.Content[0].(mcp.TextContent).Text, "# The following events (YAML format) were found:\n"), "unexpected result %v", toolResult.Content[0].(mcp.TextContent).Text)
})
var decoded []v1.Event
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
s.Run("has yaml content", func() {
s.Nilf(err, "unmarshal failed %v", err)
})
s.Run("returns events from namespace", func() {
s.YAMLEqf(""+
"- InvolvedObject:\n"+
" Kind: Pod\n"+
" Name: a-pod\n"+
" apiVersion: v1\n"+
" Message: The event message\n"+
" Namespace: ns-1\n"+
" Reason: \"\"\n"+
" Timestamp: 0001-01-01 00:00:00 +0000 UTC\n"+
" Type: Normal\n",
toolResult.Content[0].(mcp.TextContent).Text,
"unexpected result %v", toolResult.Content[0].(mcp.TextContent).Text)
})
})
})
}
func (s *EventsSuite) TestEventsListDenied() {
s.Require().NoError(toml.Unmarshal([]byte(`
denied_resources = [ { version = "v1", kind = "Event" } ]
`), s.Cfg), "Expected to parse denied resources config")
s.InitMcpClient()
s.Run("events_list (denied)", func() {
toolResult, err := s.CallTool("events_list", map[string]interface{}{})
s.Require().NotNil(toolResult, "toolResult should not be nil")
s.Run("has error", func() {
s.Truef(toolResult.IsError, "call tool should fail")
s.Nilf(err, "call tool should not return error object")
})
s.Run("describes denial", func() {
expectedMessage := "failed to list events in all namespaces: resource not allowed: /v1, Kind=Event"
s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
"expected descriptive error '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
})
})
}
func TestEvents(t *testing.T) {
suite.Run(t, new(EventsSuite))
}
```
--------------------------------------------------------------------------------
/pkg/config/config.go:
--------------------------------------------------------------------------------
```go
package config
import (
"bytes"
"fmt"
"os"
"github.com/BurntSushi/toml"
)
const (
ClusterProviderKubeConfig = "kubeconfig"
ClusterProviderInCluster = "in-cluster"
ClusterProviderDisabled = "disabled"
)
// StaticConfig is the configuration for the server.
// It allows to configure server specific settings and tools to be enabled or disabled.
type StaticConfig struct {
DeniedResources []GroupVersionKind `toml:"denied_resources"`
LogLevel int `toml:"log_level,omitzero"`
Port string `toml:"port,omitempty"`
SSEBaseURL string `toml:"sse_base_url,omitempty"`
KubeConfig string `toml:"kubeconfig,omitempty"`
ListOutput string `toml:"list_output,omitempty"`
// When true, expose only tools annotated with readOnlyHint=true
ReadOnly bool `toml:"read_only,omitempty"`
// When true, disable tools annotated with destructiveHint=true
DisableDestructive bool `toml:"disable_destructive,omitempty"`
Toolsets []string `toml:"toolsets,omitempty"`
EnabledTools []string `toml:"enabled_tools,omitempty"`
DisabledTools []string `toml:"disabled_tools,omitempty"`
// Authorization-related fields
// RequireOAuth indicates whether the server requires OAuth for authentication.
RequireOAuth bool `toml:"require_oauth,omitempty"`
// OAuthAudience is the valid audience for the OAuth tokens, used for offline JWT claim validation.
OAuthAudience string `toml:"oauth_audience,omitempty"`
// ValidateToken indicates whether the server should validate the token against the Kubernetes API Server using TokenReview.
ValidateToken bool `toml:"validate_token,omitempty"`
// AuthorizationURL is the URL of the OIDC authorization server.
// It is used for token validation and for STS token exchange.
AuthorizationURL string `toml:"authorization_url,omitempty"`
// DisableDynamicClientRegistration indicates whether dynamic client registration is disabled.
// If true, the .well-known endpoints will not expose the registration endpoint.
DisableDynamicClientRegistration bool `toml:"disable_dynamic_client_registration,omitempty"`
// OAuthScopes are the supported **client** scopes requested during the **client/frontend** OAuth flow.
OAuthScopes []string `toml:"oauth_scopes,omitempty"`
// StsClientId is the OAuth client ID used for backend token exchange
StsClientId string `toml:"sts_client_id,omitempty"`
// StsClientSecret is the OAuth client secret used for backend token exchange
StsClientSecret string `toml:"sts_client_secret,omitempty"`
// StsAudience is the audience for the STS token exchange.
StsAudience string `toml:"sts_audience,omitempty"`
// StsScopes is the scopes for the STS token exchange.
StsScopes []string `toml:"sts_scopes,omitempty"`
CertificateAuthority string `toml:"certificate_authority,omitempty"`
ServerURL string `toml:"server_url,omitempty"`
// ClusterProviderStrategy is how the server finds clusters.
// If set to "kubeconfig", the clusters will be loaded from those in the kubeconfig.
// If set to "in-cluster", the server will use the in cluster config
ClusterProviderStrategy string `toml:"cluster_provider_strategy,omitempty"`
// ClusterProvider-specific configurations
// This map holds raw TOML primitives that will be parsed by registered provider parsers
ClusterProviderConfigs map[string]toml.Primitive `toml:"cluster_provider_configs,omitempty"`
// Internal: parsed provider configs (not exposed to TOML package)
parsedClusterProviderConfigs map[string]ProviderConfig
}
type GroupVersionKind struct {
Group string `toml:"group"`
Version string `toml:"version"`
Kind string `toml:"kind,omitempty"`
}
// Read reads the toml file and returns the StaticConfig.
func Read(configPath string) (*StaticConfig, error) {
configData, err := os.ReadFile(configPath)
if err != nil {
return nil, err
}
return ReadToml(configData)
}
// ReadToml reads the toml data and returns the StaticConfig.
func ReadToml(configData []byte) (*StaticConfig, error) {
config := Default()
md, err := toml.NewDecoder(bytes.NewReader(configData)).Decode(config)
if err != nil {
return nil, err
}
if err := config.parseClusterProviderConfigs(md); err != nil {
return nil, err
}
return config, nil
}
func (c *StaticConfig) GetProviderConfig(strategy string) (ProviderConfig, bool) {
config, ok := c.parsedClusterProviderConfigs[strategy]
return config, ok
}
func (c *StaticConfig) parseClusterProviderConfigs(md toml.MetaData) error {
if c.parsedClusterProviderConfigs == nil {
c.parsedClusterProviderConfigs = make(map[string]ProviderConfig, len(c.ClusterProviderConfigs))
}
for strategy, primitive := range c.ClusterProviderConfigs {
parser, ok := getProviderConfigParser(strategy)
if !ok {
continue
}
providerConfig, err := parser(primitive, md)
if err != nil {
return fmt.Errorf("failed to parse config for ClusterProvider '%s': %w", strategy, err)
}
if err := providerConfig.Validate(); err != nil {
return fmt.Errorf("invalid config file for ClusterProvider '%s': %w", strategy, err)
}
c.parsedClusterProviderConfigs[strategy] = providerConfig
}
return nil
}
```
--------------------------------------------------------------------------------
/pkg/toolsets/helm/helm.go:
--------------------------------------------------------------------------------
```go
package helm
import (
"fmt"
"github.com/google/jsonschema-go/jsonschema"
"k8s.io/utils/ptr"
"github.com/containers/kubernetes-mcp-server/pkg/api"
)
func initHelm() []api.ServerTool {
return []api.ServerTool{
{Tool: api.Tool{
Name: "helm_install",
Description: "Install a Helm chart in the current or provided namespace",
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"chart": {
Type: "string",
Description: "Chart reference to install (for example: stable/grafana, oci://ghcr.io/nginxinc/charts/nginx-ingress)",
},
"values": {
Type: "object",
Description: "Values to pass to the Helm chart (Optional)",
Properties: make(map[string]*jsonschema.Schema),
},
"name": {
Type: "string",
Description: "Name of the Helm release (Optional, random name if not provided)",
},
"namespace": {
Type: "string",
Description: "Namespace to install the Helm chart in (Optional, current namespace if not provided)",
},
},
Required: []string{"chart"},
},
Annotations: api.ToolAnnotations{
Title: "Helm: Install",
ReadOnlyHint: ptr.To(false),
DestructiveHint: ptr.To(false),
IdempotentHint: ptr.To(false), // TODO: consider replacing implementation with equivalent to: helm upgrade --install
OpenWorldHint: ptr.To(true),
},
}, Handler: helmInstall},
{Tool: api.Tool{
Name: "helm_list",
Description: "List all the Helm releases in the current or provided namespace (or in all namespaces if specified)",
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"namespace": {
Type: "string",
Description: "Namespace to list Helm releases from (Optional, all namespaces if not provided)",
},
"all_namespaces": {
Type: "boolean",
Description: "If true, lists all Helm releases in all namespaces ignoring the namespace argument (Optional)",
},
},
},
Annotations: api.ToolAnnotations{
Title: "Helm: List",
ReadOnlyHint: ptr.To(true),
DestructiveHint: ptr.To(false),
IdempotentHint: ptr.To(false),
OpenWorldHint: ptr.To(true),
},
}, Handler: helmList},
{Tool: api.Tool{
Name: "helm_uninstall",
Description: "Uninstall a Helm release in the current or provided namespace",
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"name": {
Type: "string",
Description: "Name of the Helm release to uninstall",
},
"namespace": {
Type: "string",
Description: "Namespace to uninstall the Helm release from (Optional, current namespace if not provided)",
},
},
Required: []string{"name"},
},
Annotations: api.ToolAnnotations{
Title: "Helm: Uninstall",
ReadOnlyHint: ptr.To(false),
DestructiveHint: ptr.To(true),
IdempotentHint: ptr.To(true),
OpenWorldHint: ptr.To(true),
},
}, Handler: helmUninstall},
}
}
func helmInstall(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
var chart string
ok := false
if chart, ok = params.GetArguments()["chart"].(string); !ok {
return api.NewToolCallResult("", fmt.Errorf("failed to install helm chart, missing argument chart")), nil
}
values := map[string]interface{}{}
if v, ok := params.GetArguments()["values"].(map[string]interface{}); ok {
values = v
}
name := ""
if v, ok := params.GetArguments()["name"].(string); ok {
name = v
}
namespace := ""
if v, ok := params.GetArguments()["namespace"].(string); ok {
namespace = v
}
ret, err := params.NewHelm().Install(params, chart, values, name, namespace)
if err != nil {
return api.NewToolCallResult("", fmt.Errorf("failed to install helm chart '%s': %w", chart, err)), nil
}
return api.NewToolCallResult(ret, err), nil
}
func helmList(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
allNamespaces := false
if v, ok := params.GetArguments()["all_namespaces"].(bool); ok {
allNamespaces = v
}
namespace := ""
if v, ok := params.GetArguments()["namespace"].(string); ok {
namespace = v
}
ret, err := params.NewHelm().List(namespace, allNamespaces)
if err != nil {
return api.NewToolCallResult("", fmt.Errorf("failed to list helm releases in namespace '%s': %w", namespace, err)), nil
}
return api.NewToolCallResult(ret, err), nil
}
func helmUninstall(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
var name string
ok := false
if name, ok = params.GetArguments()["name"].(string); !ok {
return api.NewToolCallResult("", fmt.Errorf("failed to uninstall helm chart, missing argument name")), nil
}
namespace := ""
if v, ok := params.GetArguments()["namespace"].(string); ok {
namespace = v
}
ret, err := params.NewHelm().Uninstall(name, namespace)
if err != nil {
return api.NewToolCallResult("", fmt.Errorf("failed to uninstall helm chart '%s': %w", name, err)), nil
}
return api.NewToolCallResult(ret, err), nil
}
```
--------------------------------------------------------------------------------
/pkg/mcp/mcp_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"context"
"net/http"
"os"
"path/filepath"
"runtime"
"testing"
"time"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/mcp"
)
func TestWatchKubeConfig(t *testing.T) {
if runtime.GOOS != "linux" && runtime.GOOS != "darwin" {
t.Skip("Skipping test on non-Unix-like platforms")
}
testCase(t, func(c *mcpContext) {
// Given
withTimeout, cancel := context.WithTimeout(c.ctx, 5*time.Second)
defer cancel()
var notification *mcp.JSONRPCNotification
c.mcpClient.OnNotification(func(n mcp.JSONRPCNotification) {
notification = &n
})
// When
f, _ := os.OpenFile(filepath.Join(c.tempDir, "config"), os.O_APPEND|os.O_WRONLY, 0644)
_, _ = f.WriteString("\n")
for notification == nil {
select {
case <-withTimeout.Done():
default:
time.Sleep(100 * time.Millisecond)
}
}
// Then
t.Run("WatchKubeConfig notifies tools change", func(t *testing.T) {
if notification == nil {
t.Fatalf("WatchKubeConfig did not notify")
}
if notification.Method != "notifications/tools/list_changed" {
t.Fatalf("WatchKubeConfig did not notify tools change, got %s", notification.Method)
}
})
})
}
func TestSseHeaders(t *testing.T) {
mockServer := test.NewMockServer()
defer mockServer.Close()
before := func(c *mcpContext) {
c.withKubeConfig(mockServer.Config())
c.clientOptions = append(c.clientOptions, client.WithHeaders(map[string]string{"kubernetes-authorization": "Bearer a-token-from-mcp-client"}))
}
pathHeaders := make(map[string]http.Header, 0)
mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
pathHeaders[req.URL.Path] = req.Header.Clone()
// Request Performed by DiscoveryClient to Kube API (Get API Groups legacy -core-)
if req.URL.Path == "/api" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"kind":"APIVersions","versions":["v1"],"serverAddressByClientCIDRs":[{"clientCIDR":"0.0.0.0/0"}]}`))
return
}
// Request Performed by DiscoveryClient to Kube API (Get API Groups)
if req.URL.Path == "/apis" {
w.Header().Set("Content-Type", "application/json")
//w.Write([]byte(`{"kind":"APIGroupList","apiVersion":"v1","groups":[{"name":"apps","versions":[{"groupVersion":"apps/v1","version":"v1"}],"preferredVersion":{"groupVersion":"apps/v1","version":"v1"}}]}`))
_, _ = w.Write([]byte(`{"kind":"APIGroupList","apiVersion":"v1","groups":[]}`))
return
}
// Request Performed by DiscoveryClient to Kube API (Get API Resources)
if req.URL.Path == "/api/v1" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"kind":"APIResourceList","apiVersion":"v1","resources":[{"name":"pods","singularName":"","namespaced":true,"kind":"Pod","verbs":["get","list","watch","create","update","patch","delete"]}]}`))
return
}
// Request Performed by DynamicClient
if req.URL.Path == "/api/v1/namespaces/default/pods" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"kind":"PodList","apiVersion":"v1","items":[]}`))
return
}
// Request Performed by kubernetes.Interface
if req.URL.Path == "/api/v1/namespaces/default/pods/a-pod-to-delete" {
w.WriteHeader(200)
return
}
w.WriteHeader(404)
}))
testCaseWithContext(t, &mcpContext{before: before}, func(c *mcpContext) {
_, _ = c.callTool("pods_list", map[string]interface{}{})
t.Run("DiscoveryClient propagates headers to Kube API", func(t *testing.T) {
if len(pathHeaders) == 0 {
t.Fatalf("No requests were made to Kube API")
}
if pathHeaders["/api"] == nil || pathHeaders["/api"].Get("Authorization") != "Bearer a-token-from-mcp-client" {
t.Fatalf("Overridden header Authorization not found in request to /api")
}
if pathHeaders["/apis"] == nil || pathHeaders["/apis"].Get("Authorization") != "Bearer a-token-from-mcp-client" {
t.Fatalf("Overridden header Authorization not found in request to /apis")
}
if pathHeaders["/api/v1"] == nil || pathHeaders["/api/v1"].Get("Authorization") != "Bearer a-token-from-mcp-client" {
t.Fatalf("Overridden header Authorization not found in request to /api/v1")
}
})
t.Run("DynamicClient propagates headers to Kube API", func(t *testing.T) {
if len(pathHeaders) == 0 {
t.Fatalf("No requests were made to Kube API")
}
if pathHeaders["/api/v1/namespaces/default/pods"] == nil || pathHeaders["/api/v1/namespaces/default/pods"].Get("Authorization") != "Bearer a-token-from-mcp-client" {
t.Fatalf("Overridden header Authorization not found in request to /api/v1/namespaces/default/pods")
}
})
_, _ = c.callTool("pods_delete", map[string]interface{}{"name": "a-pod-to-delete"})
t.Run("kubernetes.Interface propagates headers to Kube API", func(t *testing.T) {
if len(pathHeaders) == 0 {
t.Fatalf("No requests were made to Kube API")
}
if pathHeaders["/api/v1/namespaces/default/pods/a-pod-to-delete"] == nil || pathHeaders["/api/v1/namespaces/default/pods/a-pod-to-delete"].Get("Authorization") != "Bearer a-token-from-mcp-client" {
t.Fatalf("Overridden header Authorization not found in request to /api/v1/namespaces/default/pods/a-pod-to-delete")
}
})
})
}
```
--------------------------------------------------------------------------------
/pkg/config/provider_config_test.go:
--------------------------------------------------------------------------------
```go
package config
import (
"errors"
"testing"
"github.com/BurntSushi/toml"
"github.com/stretchr/testify/suite"
)
type ProviderConfigSuite struct {
BaseConfigSuite
originalProviderConfigParsers map[string]ProviderConfigParser
}
func (s *ProviderConfigSuite) SetupTest() {
s.originalProviderConfigParsers = make(map[string]ProviderConfigParser)
for k, v := range providerConfigParsers {
s.originalProviderConfigParsers[k] = v
}
}
func (s *ProviderConfigSuite) TearDownTest() {
providerConfigParsers = make(map[string]ProviderConfigParser)
for k, v := range s.originalProviderConfigParsers {
providerConfigParsers[k] = v
}
}
type ProviderConfigForTest struct {
BoolProp bool `toml:"bool_prop"`
StrProp string `toml:"str_prop"`
IntProp int `toml:"int_prop"`
}
var _ ProviderConfig = (*ProviderConfigForTest)(nil)
func (p *ProviderConfigForTest) Validate() error {
if p.StrProp == "force-error" {
return errors.New("validation error forced by test")
}
return nil
}
func providerConfigForTestParser(primitive toml.Primitive, md toml.MetaData) (ProviderConfig, error) {
var providerConfigForTest ProviderConfigForTest
if err := md.PrimitiveDecode(primitive, &providerConfigForTest); err != nil {
return nil, err
}
return &providerConfigForTest, nil
}
func (s *ProviderConfigSuite) TestRegisterProviderConfig() {
s.Run("panics when registering duplicate provider config parser", func() {
s.Panics(func() {
RegisterProviderConfig("test", providerConfigForTestParser)
RegisterProviderConfig("test", providerConfigForTestParser)
}, "Expected panic when registering duplicate provider config parser")
})
}
func (s *ProviderConfigSuite) TestReadConfigValid() {
RegisterProviderConfig("test", providerConfigForTestParser)
validConfigPath := s.writeConfig(`
cluster_provider_strategy = "test"
[cluster_provider_configs.test]
bool_prop = true
str_prop = "a string"
int_prop = 42
`)
config, err := Read(validConfigPath)
s.Run("returns no error for valid file with registered provider config", func() {
s.Require().NoError(err, "Expected no error for valid file, got %v", err)
})
s.Run("returns config for valid file with registered provider config", func() {
s.Require().NotNil(config, "Expected non-nil config for valid file")
})
s.Run("parses provider config correctly", func() {
providerConfig, ok := config.GetProviderConfig("test")
s.Require().True(ok, "Expected to find provider config for strategy 'test'")
s.Require().NotNil(providerConfig, "Expected non-nil provider config for strategy 'test'")
testProviderConfig, ok := providerConfig.(*ProviderConfigForTest)
s.Require().True(ok, "Expected provider config to be of type *ProviderConfigForTest")
s.Equal(true, testProviderConfig.BoolProp, "Expected BoolProp to be true")
s.Equal("a string", testProviderConfig.StrProp, "Expected StrProp to be 'a string'")
s.Equal(42, testProviderConfig.IntProp, "Expected IntProp to be 42")
})
}
func (s *ProviderConfigSuite) TestReadConfigInvalidProviderConfig() {
RegisterProviderConfig("test", providerConfigForTestParser)
invalidConfigPath := s.writeConfig(`
cluster_provider_strategy = "test"
[cluster_provider_configs.test]
bool_prop = true
str_prop = "force-error"
int_prop = 42
`)
config, err := Read(invalidConfigPath)
s.Run("returns error for invalid provider config", func() {
s.Require().NotNil(err, "Expected error for invalid provider config, got nil")
s.ErrorContains(err, "validation error forced by test", "Expected validation error from provider config")
})
s.Run("returns nil config for invalid provider config", func() {
s.Nil(config, "Expected nil config for invalid provider config")
})
}
func (s *ProviderConfigSuite) TestReadConfigUnregisteredProviderConfig() {
invalidConfigPath := s.writeConfig(`
cluster_provider_strategy = "unregistered"
[cluster_provider_configs.unregistered]
bool_prop = true
str_prop = "a string"
int_prop = 42
`)
config, err := Read(invalidConfigPath)
s.Run("returns no error for unregistered provider config", func() {
s.Require().NoError(err, "Expected no error for unregistered provider config, got %v", err)
})
s.Run("returns config for unregistered provider config", func() {
s.Require().NotNil(config, "Expected non-nil config for unregistered provider config")
})
s.Run("does not parse unregistered provider config", func() {
_, ok := config.GetProviderConfig("unregistered")
s.Require().False(ok, "Expected no provider config for unregistered strategy")
})
}
func (s *ProviderConfigSuite) TestReadConfigParserError() {
RegisterProviderConfig("test", func(primitive toml.Primitive, md toml.MetaData) (ProviderConfig, error) {
return nil, errors.New("parser error forced by test")
})
invalidConfigPath := s.writeConfig(`
cluster_provider_strategy = "test"
[cluster_provider_configs.test]
bool_prop = true
str_prop = "a string"
int_prop = 42
`)
config, err := Read(invalidConfigPath)
s.Run("returns error for provider config parser error", func() {
s.Require().NotNil(err, "Expected error for provider config parser error, got nil")
s.ErrorContains(err, "parser error forced by test", "Expected parser error from provider config")
})
s.Run("returns nil config for provider config parser error", func() {
s.Nil(config, "Expected nil config for provider config parser error")
})
}
func TestProviderConfig(t *testing.T) {
suite.Run(t, new(ProviderConfigSuite))
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/provider_single_test.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"net/http"
"testing"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/stretchr/testify/suite"
"k8s.io/client-go/rest"
)
type ProviderSingleTestSuite struct {
BaseProviderSuite
mockServer *test.MockServer
originalIsInClusterConfig func() (*rest.Config, error)
provider Provider
}
func (s *ProviderSingleTestSuite) SetupTest() {
// Single cluster provider is used when in-cluster or when the multi-cluster feature is disabled.
// For this test suite we simulate an in-cluster deployment.
s.originalIsInClusterConfig = InClusterConfig
s.mockServer = test.NewMockServer()
InClusterConfig = func() (*rest.Config, error) {
return s.mockServer.Config(), nil
}
provider, err := NewProvider(&config.StaticConfig{})
s.Require().NoError(err, "Expected no error creating provider with kubeconfig")
s.provider = provider
}
func (s *ProviderSingleTestSuite) TearDownTest() {
InClusterConfig = s.originalIsInClusterConfig
if s.mockServer != nil {
s.mockServer.Close()
}
}
func (s *ProviderSingleTestSuite) TestType() {
s.IsType(&singleClusterProvider{}, s.provider)
}
func (s *ProviderSingleTestSuite) TestWithNonOpenShiftCluster() {
s.Run("IsOpenShift returns false", func() {
inOpenShift := s.provider.IsOpenShift(s.T().Context())
s.False(inOpenShift, "Expected InOpenShift to return false")
})
}
func (s *ProviderSingleTestSuite) TestWithOpenShiftCluster() {
s.mockServer.Handle(&test.InOpenShiftHandler{})
s.Run("IsOpenShift returns true", func() {
inOpenShift := s.provider.IsOpenShift(s.T().Context())
s.True(inOpenShift, "Expected InOpenShift to return true")
})
}
func (s *ProviderSingleTestSuite) TestVerifyToken() {
s.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.EscapedPath() == "/apis/authentication.k8s.io/v1/tokenreviews" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`
{
"kind": "TokenReview",
"apiVersion": "authentication.k8s.io/v1",
"spec": {"token": "the-token"},
"status": {
"authenticated": true,
"user": {
"username": "test-user",
"groups": ["system:authenticated"]
},
"audiences": ["the-audience"]
}
}`))
}
}))
s.Run("VerifyToken returns UserInfo for empty target (default target)", func() {
userInfo, audiences, err := s.provider.VerifyToken(s.T().Context(), "", "the-token", "the-audience")
s.Require().NoError(err, "Expected no error from VerifyToken with empty target")
s.Require().NotNil(userInfo, "Expected UserInfo from VerifyToken with empty target")
s.Equalf(userInfo.Username, "test-user", "Expected username test-user, got: %s", userInfo.Username)
s.Containsf(userInfo.Groups, "system:authenticated", "Expected group system:authenticated in %v", userInfo.Groups)
s.Require().NotNil(audiences, "Expected audiences from VerifyToken with empty target")
s.Len(audiences, 1, "Expected audiences from VerifyToken with empty target")
s.Containsf(audiences, "the-audience", "Expected audience the-audience in %v", audiences)
})
s.Run("VerifyToken returns error for non-empty context", func() {
userInfo, audiences, err := s.provider.VerifyToken(s.T().Context(), "non-empty", "the-token", "the-audience")
s.Require().Error(err, "Expected error from VerifyToken with non-empty target")
s.ErrorContains(err, "unable to get manager for other context/cluster with in-cluster strategy", "Expected error about trying to get other cluster")
s.Nil(userInfo, "Expected no UserInfo from VerifyToken with non-empty target")
s.Nil(audiences, "Expected no audiences from VerifyToken with non-empty target")
})
}
func (s *ProviderSingleTestSuite) TestGetTargets() {
s.Run("GetTargets returns single empty target", func() {
targets, err := s.provider.GetTargets(s.T().Context())
s.Require().NoError(err, "Expected no error from GetTargets")
s.Len(targets, 1, "Expected 1 targets from GetTargets")
s.Contains(targets, "", "Expected empty target from GetTargets")
})
}
func (s *ProviderSingleTestSuite) TestGetDerivedKubernetes() {
s.Run("GetDerivedKubernetes returns Kubernetes for empty target", func() {
k8s, err := s.provider.GetDerivedKubernetes(s.T().Context(), "")
s.Require().NoError(err, "Expected no error from GetDerivedKubernetes with empty target")
s.NotNil(k8s, "Expected Kubernetes from GetDerivedKubernetes with empty target")
})
s.Run("GetDerivedKubernetes returns error for non-empty target", func() {
k8s, err := s.provider.GetDerivedKubernetes(s.T().Context(), "non-empty-target")
s.Require().Error(err, "Expected error from GetDerivedKubernetes with non-empty target")
s.ErrorContains(err, "unable to get manager for other context/cluster with in-cluster strategy", "Expected error about trying to get other cluster")
s.Nil(k8s, "Expected no Kubernetes from GetDerivedKubernetes with non-empty target")
})
}
func (s *ProviderSingleTestSuite) TestGetDefaultTarget() {
s.Run("GetDefaultTarget returns empty string", func() {
s.Empty(s.provider.GetDefaultTarget(), "Expected fake-context as default target")
})
}
func (s *ProviderSingleTestSuite) TestGetTargetParameterName() {
s.Empty(s.provider.GetTargetParameterName(), "Expected empty string as target parameter name")
}
func TestProviderSingle(t *testing.T) {
suite.Run(t, new(ProviderSingleTestSuite))
}
```