This is page 4 of 4. Use http://codebase.md/manusa/kubernetes-mcp-server?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .github
│ ├── dependabot.yml
│ └── workflows
│ ├── build.yaml
│ ├── release-image.yml
│ └── release.yaml
├── .gitignore
├── AGENTS.md
├── build
│ ├── keycloak.mk
│ ├── kind.mk
│ └── tools.mk
├── CLAUDE.md
├── cmd
│ └── kubernetes-mcp-server
│ ├── main_test.go
│ └── main.go
├── dev
│ └── config
│ ├── cert-manager
│ │ └── selfsigned-issuer.yaml
│ ├── ingress
│ │ └── nginx-ingress.yaml
│ ├── keycloak
│ │ ├── client-scopes
│ │ │ ├── groups.json
│ │ │ ├── mcp-openshift.json
│ │ │ └── mcp-server.json
│ │ ├── clients
│ │ │ ├── mcp-client.json
│ │ │ ├── mcp-server-update.json
│ │ │ ├── mcp-server.json
│ │ │ └── openshift.json
│ │ ├── deployment.yaml
│ │ ├── ingress.yaml
│ │ ├── mappers
│ │ │ ├── groups-membership.json
│ │ │ ├── mcp-server-audience.json
│ │ │ ├── openshift-audience.json
│ │ │ └── username.json
│ │ ├── rbac.yaml
│ │ ├── realm
│ │ │ ├── realm-create.json
│ │ │ └── realm-events-config.json
│ │ └── users
│ │ └── mcp.json
│ └── kind
│ └── cluster.yaml
├── Dockerfile
├── docs
│ ├── GETTING_STARTED_CLAUDE_CODE.md
│ ├── GETTING_STARTED_KUBERNETES.md
│ ├── images
│ │ ├── keycloak-login-page.png
│ │ ├── keycloak-mcp-inspector-connect.png
│ │ ├── keycloak-mcp-inspector-results.png
│ │ ├── kubernetes-mcp-server-github-copilot.jpg
│ │ └── vibe-coding.jpg
│ ├── KEYCLOAK_OIDC_SETUP.md
│ └── README.md
├── go.mod
├── go.sum
├── hack
│ └── generate-placeholder-ca.sh
├── internal
│ ├── test
│ │ ├── env.go
│ │ ├── kubernetes.go
│ │ ├── mcp.go
│ │ ├── mock_server.go
│ │ └── test.go
│ └── tools
│ └── update-readme
│ └── main.go
├── LICENSE
├── Makefile
├── npm
│ ├── kubernetes-mcp-server
│ │ ├── bin
│ │ │ └── index.js
│ │ └── package.json
│ ├── kubernetes-mcp-server-darwin-amd64
│ │ └── package.json
│ ├── kubernetes-mcp-server-darwin-arm64
│ │ └── package.json
│ ├── kubernetes-mcp-server-linux-amd64
│ │ └── package.json
│ ├── kubernetes-mcp-server-linux-arm64
│ │ └── package.json
│ ├── kubernetes-mcp-server-windows-amd64
│ │ └── package.json
│ └── kubernetes-mcp-server-windows-arm64
│ └── package.json
├── pkg
│ ├── api
│ │ ├── toolsets_test.go
│ │ └── toolsets.go
│ ├── config
│ │ ├── config_default_overrides.go
│ │ ├── config_default.go
│ │ ├── config_test.go
│ │ ├── config.go
│ │ ├── provider_config_test.go
│ │ └── provider_config.go
│ ├── helm
│ │ └── helm.go
│ ├── http
│ │ ├── authorization_test.go
│ │ ├── authorization.go
│ │ ├── http_test.go
│ │ ├── http.go
│ │ ├── middleware.go
│ │ ├── sts_test.go
│ │ ├── sts.go
│ │ └── wellknown.go
│ ├── kubernetes
│ │ ├── accesscontrol_clientset.go
│ │ ├── accesscontrol_restmapper.go
│ │ ├── accesscontrol.go
│ │ ├── common_test.go
│ │ ├── configuration.go
│ │ ├── events.go
│ │ ├── impersonate_roundtripper.go
│ │ ├── kubernetes_derived_test.go
│ │ ├── kubernetes.go
│ │ ├── manager_test.go
│ │ ├── manager.go
│ │ ├── namespaces.go
│ │ ├── nodes.go
│ │ ├── openshift.go
│ │ ├── pods.go
│ │ ├── provider_kubeconfig_test.go
│ │ ├── provider_kubeconfig.go
│ │ ├── provider_registry_test.go
│ │ ├── provider_registry.go
│ │ ├── provider_single_test.go
│ │ ├── provider_single.go
│ │ ├── provider_test.go
│ │ ├── provider.go
│ │ ├── resources.go
│ │ └── token.go
│ ├── kubernetes-mcp-server
│ │ └── cmd
│ │ ├── root_test.go
│ │ ├── root.go
│ │ └── testdata
│ │ ├── empty-config.toml
│ │ └── valid-config.toml
│ ├── mcp
│ │ ├── common_test.go
│ │ ├── configuration_test.go
│ │ ├── events_test.go
│ │ ├── helm_test.go
│ │ ├── m3labs.go
│ │ ├── mcp_middleware_test.go
│ │ ├── mcp_test.go
│ │ ├── mcp_tools_test.go
│ │ ├── mcp.go
│ │ ├── modules.go
│ │ ├── namespaces_test.go
│ │ ├── nodes_test.go
│ │ ├── pods_exec_test.go
│ │ ├── pods_test.go
│ │ ├── pods_top_test.go
│ │ ├── resources_test.go
│ │ ├── testdata
│ │ │ ├── helm-chart-no-op
│ │ │ │ └── Chart.yaml
│ │ │ ├── helm-chart-secret
│ │ │ │ ├── Chart.yaml
│ │ │ │ └── templates
│ │ │ │ └── secret.yaml
│ │ │ ├── toolsets-config-tools.json
│ │ │ ├── toolsets-core-tools.json
│ │ │ ├── toolsets-full-tools-multicluster-enum.json
│ │ │ ├── toolsets-full-tools-multicluster.json
│ │ │ ├── toolsets-full-tools-openshift.json
│ │ │ ├── toolsets-full-tools.json
│ │ │ └── toolsets-helm-tools.json
│ │ ├── tool_filter_test.go
│ │ ├── tool_filter.go
│ │ ├── tool_mutator_test.go
│ │ ├── tool_mutator.go
│ │ └── toolsets_test.go
│ ├── output
│ │ ├── output_test.go
│ │ └── output.go
│ ├── toolsets
│ │ ├── config
│ │ │ ├── configuration.go
│ │ │ └── toolset.go
│ │ ├── core
│ │ │ ├── events.go
│ │ │ ├── namespaces.go
│ │ │ ├── nodes.go
│ │ │ ├── pods.go
│ │ │ ├── resources.go
│ │ │ └── toolset.go
│ │ ├── helm
│ │ │ ├── helm.go
│ │ │ └── toolset.go
│ │ ├── toolsets_test.go
│ │ └── toolsets.go
│ └── version
│ └── version.go
├── python
│ ├── kubernetes_mcp_server
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ └── kubernetes_mcp_server.py
│ ├── pyproject.toml
│ └── README.md
├── README.md
└── smithery.yaml
```
# Files
--------------------------------------------------------------------------------
/pkg/mcp/testdata/toolsets-full-tools-multicluster.json:
--------------------------------------------------------------------------------
```json
[
{
"annotations": {
"title": "Configuration: Contexts List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": true,
"openWorldHint": false
},
"description": "List all available context names and associated server urls from the kubeconfig file",
"inputSchema": {
"type": "object"
},
"name": "configuration_contexts_list"
},
{
"annotations": {
"title": "Configuration: View",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get the current Kubernetes configuration content as a kubeconfig YAML",
"inputSchema": {
"type": "object",
"properties": {
"minified": {
"description": "Return a minified version of the configuration. If set to true, keeps only the current-context and the relevant pieces of the configuration for that context. If set to false, all contexts, clusters, auth-infos, and users are returned in the configuration. (Optional, default true)",
"type": "boolean"
}
}
},
"name": "configuration_view"
},
{
"annotations": {
"title": "Events: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Kubernetes events in the current cluster from all namespaces",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"namespace": {
"description": "Optional Namespace to retrieve the events from. If not provided, will list events from all namespaces",
"type": "string"
}
}
},
"name": "events_list"
},
{
"annotations": {
"title": "Helm: Install",
"readOnlyHint": false,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Install a Helm chart in the current or provided namespace",
"inputSchema": {
"type": "object",
"properties": {
"chart": {
"description": "Chart reference to install (for example: stable/grafana, oci://ghcr.io/nginxinc/charts/nginx-ingress)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"name": {
"description": "Name of the Helm release (Optional, random name if not provided)",
"type": "string"
},
"namespace": {
"description": "Namespace to install the Helm chart in (Optional, current namespace if not provided)",
"type": "string"
},
"values": {
"description": "Values to pass to the Helm chart (Optional)",
"type": "object"
}
},
"required": [
"chart"
]
},
"name": "helm_install"
},
{
"annotations": {
"title": "Helm: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Helm releases in the current or provided namespace (or in all namespaces if specified)",
"inputSchema": {
"type": "object",
"properties": {
"all_namespaces": {
"description": "If true, lists all Helm releases in all namespaces ignoring the namespace argument (Optional)",
"type": "boolean"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"namespace": {
"description": "Namespace to list Helm releases from (Optional, all namespaces if not provided)",
"type": "string"
}
}
},
"name": "helm_list"
},
{
"annotations": {
"title": "Helm: Uninstall",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": true,
"openWorldHint": true
},
"description": "Uninstall a Helm release in the current or provided namespace",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"name": {
"description": "Name of the Helm release to uninstall",
"type": "string"
},
"namespace": {
"description": "Namespace to uninstall the Helm release from (Optional, current namespace if not provided)",
"type": "string"
}
},
"required": [
"name"
]
},
"name": "helm_uninstall"
},
{
"annotations": {
"title": "Namespaces: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Kubernetes namespaces in the current cluster",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
}
}
},
"name": "namespaces_list"
},
{
"annotations": {
"title": "Node: Log",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get logs from a Kubernetes node (kubelet, kube-proxy, or other system logs). This accesses node logs through the Kubernetes API proxy to the kubelet",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"log_path": {
"default": "kubelet.log",
"description": "Path to the log file on the node (e.g. 'kubelet.log', 'kube-proxy.log'). Default is 'kubelet.log'",
"type": "string"
},
"name": {
"description": "Name of the node to get logs from",
"type": "string"
},
"tail": {
"default": 100,
"description": "Number of lines to retrieve from the end of the logs (Optional, 0 means all logs)",
"minimum": 0,
"type": "integer"
}
},
"required": [
"name"
]
},
"name": "nodes_log"
},
{
"annotations": {
"title": "Pods: Delete",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": true,
"openWorldHint": true
},
"description": "Delete a Kubernetes Pod in the current or provided namespace with the provided name",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"name": {
"description": "Name of the Pod to delete",
"type": "string"
},
"namespace": {
"description": "Namespace to delete the Pod from",
"type": "string"
}
},
"required": [
"name"
]
},
"name": "pods_delete"
},
{
"annotations": {
"title": "Pods: Exec",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Execute a command in a Kubernetes Pod in the current or provided namespace with the provided name and command",
"inputSchema": {
"type": "object",
"properties": {
"command": {
"description": "Command to execute in the Pod container. The first item is the command to be run, and the rest are the arguments to that command. Example: [\"ls\", \"-l\", \"/tmp\"]",
"items": {
"type": "string"
},
"type": "array"
},
"container": {
"description": "Name of the Pod container where the command will be executed (Optional)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"name": {
"description": "Name of the Pod where the command will be executed",
"type": "string"
},
"namespace": {
"description": "Namespace of the Pod where the command will be executed",
"type": "string"
}
},
"required": [
"name",
"command"
]
},
"name": "pods_exec"
},
{
"annotations": {
"title": "Pods: Get",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get a Kubernetes Pod in the current or provided namespace with the provided name",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"name": {
"description": "Name of the Pod",
"type": "string"
},
"namespace": {
"description": "Namespace to get the Pod from",
"type": "string"
}
},
"required": [
"name"
]
},
"name": "pods_get"
},
{
"annotations": {
"title": "Pods: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Kubernetes pods in the current cluster from all namespaces",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"labelSelector": {
"description": "Optional Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label",
"pattern": "([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]",
"type": "string"
}
}
},
"name": "pods_list"
},
{
"annotations": {
"title": "Pods: List in Namespace",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Kubernetes pods in the specified namespace in the current cluster",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"labelSelector": {
"description": "Optional Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label",
"pattern": "([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]",
"type": "string"
},
"namespace": {
"description": "Namespace to list pods from",
"type": "string"
}
},
"required": [
"namespace"
]
},
"name": "pods_list_in_namespace"
},
{
"annotations": {
"title": "Pods: Log",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get the logs of a Kubernetes Pod in the current or provided namespace with the provided name",
"inputSchema": {
"type": "object",
"properties": {
"container": {
"description": "Name of the Pod container to get the logs from (Optional)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"name": {
"description": "Name of the Pod to get the logs from",
"type": "string"
},
"namespace": {
"description": "Namespace to get the Pod logs from",
"type": "string"
},
"previous": {
"description": "Return previous terminated container logs (Optional)",
"type": "boolean"
},
"tail": {
"default": 100,
"description": "Number of lines to retrieve from the end of the logs (Optional, default: 100)",
"minimum": 0,
"type": "integer"
}
},
"required": [
"name"
]
},
"name": "pods_log"
},
{
"annotations": {
"title": "Pods: Run",
"readOnlyHint": false,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Run a Kubernetes Pod in the current or provided namespace with the provided container image and optional name",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"image": {
"description": "Container Image to run in the Pod",
"type": "string"
},
"name": {
"description": "Name of the Pod (Optional, random name if not provided)",
"type": "string"
},
"namespace": {
"description": "Namespace to run the Pod in",
"type": "string"
},
"port": {
"description": "TCP/IP port to expose from the Pod container (Optional, no port exposed if not provided)",
"type": "number"
}
},
"required": [
"image"
]
},
"name": "pods_run"
},
{
"annotations": {
"title": "Pods: Top",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": true,
"openWorldHint": true
},
"description": "List the resource consumption (CPU and memory) as recorded by the Kubernetes Metrics Server for the specified Kubernetes Pods in the all namespaces, the provided namespace, or the current namespace",
"inputSchema": {
"type": "object",
"properties": {
"all_namespaces": {
"default": true,
"description": "If true, list the resource consumption for all Pods in all namespaces. If false, list the resource consumption for Pods in the provided namespace or the current namespace",
"type": "boolean"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"label_selector": {
"description": "Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label (Optional, only applicable when name is not provided)",
"pattern": "([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]",
"type": "string"
},
"name": {
"description": "Name of the Pod to get the resource consumption from (Optional, all Pods in the namespace if not provided)",
"type": "string"
},
"namespace": {
"description": "Namespace to get the Pods resource consumption from (Optional, current namespace if not provided and all_namespaces is false)",
"type": "string"
}
}
},
"name": "pods_top"
},
{
"annotations": {
"title": "Resources: Create or Update",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": true,
"openWorldHint": true
},
"description": "Create or update a Kubernetes resource in the current cluster by providing a YAML or JSON representation of the resource\n(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress)",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"resource": {
"description": "A JSON or YAML containing a representation of the Kubernetes resource. Should include top-level fields such as apiVersion,kind,metadata, and spec",
"type": "string"
}
},
"required": [
"resource"
]
},
"name": "resources_create_or_update"
},
{
"annotations": {
"title": "Resources: Delete",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": true,
"openWorldHint": true
},
"description": "Delete a Kubernetes resource in the current cluster by providing its apiVersion, kind, optionally the namespace, and its name\n(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress)",
"inputSchema": {
"type": "object",
"properties": {
"apiVersion": {
"description": "apiVersion of the resource (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"kind": {
"description": "kind of the resource (examples of valid kind are: Pod, Service, Deployment, Ingress)",
"type": "string"
},
"name": {
"description": "Name of the resource",
"type": "string"
},
"namespace": {
"description": "Optional Namespace to delete the namespaced resource from (ignored in case of cluster scoped resources). If not provided, will delete resource from configured namespace",
"type": "string"
}
},
"required": [
"apiVersion",
"kind",
"name"
]
},
"name": "resources_delete"
},
{
"annotations": {
"title": "Resources: Get",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get a Kubernetes resource in the current cluster by providing its apiVersion, kind, optionally the namespace, and its name\n(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress)",
"inputSchema": {
"type": "object",
"properties": {
"apiVersion": {
"description": "apiVersion of the resource (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"kind": {
"description": "kind of the resource (examples of valid kind are: Pod, Service, Deployment, Ingress)",
"type": "string"
},
"name": {
"description": "Name of the resource",
"type": "string"
},
"namespace": {
"description": "Optional Namespace to retrieve the namespaced resource from (ignored in case of cluster scoped resources). If not provided, will get resource from configured namespace",
"type": "string"
}
},
"required": [
"apiVersion",
"kind",
"name"
]
},
"name": "resources_get"
},
{
"annotations": {
"title": "Resources: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List Kubernetes resources and objects in the current cluster by providing their apiVersion and kind and optionally the namespace and label selector\n(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress)",
"inputSchema": {
"type": "object",
"properties": {
"apiVersion": {
"description": "apiVersion of the resources (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"type": "string"
},
"kind": {
"description": "kind of the resources (examples of valid kind are: Pod, Service, Deployment, Ingress)",
"type": "string"
},
"labelSelector": {
"description": "Optional Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label",
"pattern": "([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]",
"type": "string"
},
"namespace": {
"description": "Optional Namespace to retrieve the namespaced resources from (ignored in case of cluster scoped resources). If not provided, will list resources from all namespaces",
"type": "string"
}
},
"required": [
"apiVersion",
"kind"
]
},
"name": "resources_list"
}
]
```
--------------------------------------------------------------------------------
/pkg/mcp/testdata/toolsets-full-tools-multicluster-enum.json:
--------------------------------------------------------------------------------
```json
[
{
"annotations": {
"title": "Configuration: Contexts List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": true,
"openWorldHint": false
},
"description": "List all available context names and associated server urls from the kubeconfig file",
"inputSchema": {
"type": "object"
},
"name": "configuration_contexts_list"
},
{
"annotations": {
"title": "Configuration: View",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get the current Kubernetes configuration content as a kubeconfig YAML",
"inputSchema": {
"type": "object",
"properties": {
"minified": {
"description": "Return a minified version of the configuration. If set to true, keeps only the current-context and the relevant pieces of the configuration for that context. If set to false, all contexts, clusters, auth-infos, and users are returned in the configuration. (Optional, default true)",
"type": "boolean"
}
}
},
"name": "configuration_view"
},
{
"annotations": {
"title": "Events: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Kubernetes events in the current cluster from all namespaces",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"namespace": {
"description": "Optional Namespace to retrieve the events from. If not provided, will list events from all namespaces",
"type": "string"
}
}
},
"name": "events_list"
},
{
"annotations": {
"title": "Helm: Install",
"readOnlyHint": false,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Install a Helm chart in the current or provided namespace",
"inputSchema": {
"type": "object",
"properties": {
"chart": {
"description": "Chart reference to install (for example: stable/grafana, oci://ghcr.io/nginxinc/charts/nginx-ingress)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"name": {
"description": "Name of the Helm release (Optional, random name if not provided)",
"type": "string"
},
"namespace": {
"description": "Namespace to install the Helm chart in (Optional, current namespace if not provided)",
"type": "string"
},
"values": {
"description": "Values to pass to the Helm chart (Optional)",
"type": "object"
}
},
"required": [
"chart"
]
},
"name": "helm_install"
},
{
"annotations": {
"title": "Helm: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Helm releases in the current or provided namespace (or in all namespaces if specified)",
"inputSchema": {
"type": "object",
"properties": {
"all_namespaces": {
"description": "If true, lists all Helm releases in all namespaces ignoring the namespace argument (Optional)",
"type": "boolean"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"namespace": {
"description": "Namespace to list Helm releases from (Optional, all namespaces if not provided)",
"type": "string"
}
}
},
"name": "helm_list"
},
{
"annotations": {
"title": "Helm: Uninstall",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": true,
"openWorldHint": true
},
"description": "Uninstall a Helm release in the current or provided namespace",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"name": {
"description": "Name of the Helm release to uninstall",
"type": "string"
},
"namespace": {
"description": "Namespace to uninstall the Helm release from (Optional, current namespace if not provided)",
"type": "string"
}
},
"required": [
"name"
]
},
"name": "helm_uninstall"
},
{
"annotations": {
"title": "Namespaces: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Kubernetes namespaces in the current cluster",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
}
}
},
"name": "namespaces_list"
},
{
"annotations": {
"title": "Node: Log",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get logs from a Kubernetes node (kubelet, kube-proxy, or other system logs). This accesses node logs through the Kubernetes API proxy to the kubelet",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"log_path": {
"default": "kubelet.log",
"description": "Path to the log file on the node (e.g. 'kubelet.log', 'kube-proxy.log'). Default is 'kubelet.log'",
"type": "string"
},
"name": {
"description": "Name of the node to get logs from",
"type": "string"
},
"tail": {
"default": 100,
"description": "Number of lines to retrieve from the end of the logs (Optional, 0 means all logs)",
"minimum": 0,
"type": "integer"
}
},
"required": [
"name"
]
},
"name": "nodes_log"
},
{
"annotations": {
"title": "Pods: Delete",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": true,
"openWorldHint": true
},
"description": "Delete a Kubernetes Pod in the current or provided namespace with the provided name",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"name": {
"description": "Name of the Pod to delete",
"type": "string"
},
"namespace": {
"description": "Namespace to delete the Pod from",
"type": "string"
}
},
"required": [
"name"
]
},
"name": "pods_delete"
},
{
"annotations": {
"title": "Pods: Exec",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Execute a command in a Kubernetes Pod in the current or provided namespace with the provided name and command",
"inputSchema": {
"type": "object",
"properties": {
"command": {
"description": "Command to execute in the Pod container. The first item is the command to be run, and the rest are the arguments to that command. Example: [\"ls\", \"-l\", \"/tmp\"]",
"items": {
"type": "string"
},
"type": "array"
},
"container": {
"description": "Name of the Pod container where the command will be executed (Optional)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"name": {
"description": "Name of the Pod where the command will be executed",
"type": "string"
},
"namespace": {
"description": "Namespace of the Pod where the command will be executed",
"type": "string"
}
},
"required": [
"name",
"command"
]
},
"name": "pods_exec"
},
{
"annotations": {
"title": "Pods: Get",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get a Kubernetes Pod in the current or provided namespace with the provided name",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"name": {
"description": "Name of the Pod",
"type": "string"
},
"namespace": {
"description": "Namespace to get the Pod from",
"type": "string"
}
},
"required": [
"name"
]
},
"name": "pods_get"
},
{
"annotations": {
"title": "Pods: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Kubernetes pods in the current cluster from all namespaces",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"labelSelector": {
"description": "Optional Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label",
"pattern": "([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]",
"type": "string"
}
}
},
"name": "pods_list"
},
{
"annotations": {
"title": "Pods: List in Namespace",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List all the Kubernetes pods in the specified namespace in the current cluster",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"labelSelector": {
"description": "Optional Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label",
"pattern": "([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]",
"type": "string"
},
"namespace": {
"description": "Namespace to list pods from",
"type": "string"
}
},
"required": [
"namespace"
]
},
"name": "pods_list_in_namespace"
},
{
"annotations": {
"title": "Pods: Log",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get the logs of a Kubernetes Pod in the current or provided namespace with the provided name",
"inputSchema": {
"type": "object",
"properties": {
"container": {
"description": "Name of the Pod container to get the logs from (Optional)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"name": {
"description": "Name of the Pod to get the logs from",
"type": "string"
},
"namespace": {
"description": "Namespace to get the Pod logs from",
"type": "string"
},
"previous": {
"description": "Return previous terminated container logs (Optional)",
"type": "boolean"
},
"tail": {
"default": 100,
"description": "Number of lines to retrieve from the end of the logs (Optional, default: 100)",
"minimum": 0,
"type": "integer"
}
},
"required": [
"name"
]
},
"name": "pods_log"
},
{
"annotations": {
"title": "Pods: Run",
"readOnlyHint": false,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Run a Kubernetes Pod in the current or provided namespace with the provided container image and optional name",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"image": {
"description": "Container Image to run in the Pod",
"type": "string"
},
"name": {
"description": "Name of the Pod (Optional, random name if not provided)",
"type": "string"
},
"namespace": {
"description": "Namespace to run the Pod in",
"type": "string"
},
"port": {
"description": "TCP/IP port to expose from the Pod container (Optional, no port exposed if not provided)",
"type": "number"
}
},
"required": [
"image"
]
},
"name": "pods_run"
},
{
"annotations": {
"title": "Pods: Top",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": true,
"openWorldHint": true
},
"description": "List the resource consumption (CPU and memory) as recorded by the Kubernetes Metrics Server for the specified Kubernetes Pods in the all namespaces, the provided namespace, or the current namespace",
"inputSchema": {
"type": "object",
"properties": {
"all_namespaces": {
"default": true,
"description": "If true, list the resource consumption for all Pods in all namespaces. If false, list the resource consumption for Pods in the provided namespace or the current namespace",
"type": "boolean"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"label_selector": {
"description": "Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label (Optional, only applicable when name is not provided)",
"pattern": "([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]",
"type": "string"
},
"name": {
"description": "Name of the Pod to get the resource consumption from (Optional, all Pods in the namespace if not provided)",
"type": "string"
},
"namespace": {
"description": "Namespace to get the Pods resource consumption from (Optional, current namespace if not provided and all_namespaces is false)",
"type": "string"
}
}
},
"name": "pods_top"
},
{
"annotations": {
"title": "Resources: Create or Update",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": true,
"openWorldHint": true
},
"description": "Create or update a Kubernetes resource in the current cluster by providing a YAML or JSON representation of the resource\n(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress)",
"inputSchema": {
"type": "object",
"properties": {
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"resource": {
"description": "A JSON or YAML containing a representation of the Kubernetes resource. Should include top-level fields such as apiVersion,kind,metadata, and spec",
"type": "string"
}
},
"required": [
"resource"
]
},
"name": "resources_create_or_update"
},
{
"annotations": {
"title": "Resources: Delete",
"readOnlyHint": false,
"destructiveHint": true,
"idempotentHint": true,
"openWorldHint": true
},
"description": "Delete a Kubernetes resource in the current cluster by providing its apiVersion, kind, optionally the namespace, and its name\n(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress)",
"inputSchema": {
"type": "object",
"properties": {
"apiVersion": {
"description": "apiVersion of the resource (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"kind": {
"description": "kind of the resource (examples of valid kind are: Pod, Service, Deployment, Ingress)",
"type": "string"
},
"name": {
"description": "Name of the resource",
"type": "string"
},
"namespace": {
"description": "Optional Namespace to delete the namespaced resource from (ignored in case of cluster scoped resources). If not provided, will delete resource from configured namespace",
"type": "string"
}
},
"required": [
"apiVersion",
"kind",
"name"
]
},
"name": "resources_delete"
},
{
"annotations": {
"title": "Resources: Get",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "Get a Kubernetes resource in the current cluster by providing its apiVersion, kind, optionally the namespace, and its name\n(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress)",
"inputSchema": {
"type": "object",
"properties": {
"apiVersion": {
"description": "apiVersion of the resource (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"kind": {
"description": "kind of the resource (examples of valid kind are: Pod, Service, Deployment, Ingress)",
"type": "string"
},
"name": {
"description": "Name of the resource",
"type": "string"
},
"namespace": {
"description": "Optional Namespace to retrieve the namespaced resource from (ignored in case of cluster scoped resources). If not provided, will get resource from configured namespace",
"type": "string"
}
},
"required": [
"apiVersion",
"kind",
"name"
]
},
"name": "resources_get"
},
{
"annotations": {
"title": "Resources: List",
"readOnlyHint": true,
"destructiveHint": false,
"idempotentHint": false,
"openWorldHint": true
},
"description": "List Kubernetes resources and objects in the current cluster by providing their apiVersion and kind and optionally the namespace and label selector\n(common apiVersion and kind include: v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress)",
"inputSchema": {
"type": "object",
"properties": {
"apiVersion": {
"description": "apiVersion of the resources (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)",
"type": "string"
},
"context": {
"description": "Optional parameter selecting which context to run the tool in. Defaults to fake-context if not set",
"enum": [
"extra-cluster",
"fake-context"
],
"type": "string"
},
"kind": {
"description": "kind of the resources (examples of valid kind are: Pod, Service, Deployment, Ingress)",
"type": "string"
},
"labelSelector": {
"description": "Optional Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label",
"pattern": "([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]",
"type": "string"
},
"namespace": {
"description": "Optional Namespace to retrieve the namespaced resources from (ignored in case of cluster scoped resources). If not provided, will list resources from all namespaces",
"type": "string"
}
},
"required": [
"apiVersion",
"kind"
]
},
"name": "resources_list"
}
]
```
--------------------------------------------------------------------------------
/pkg/mcp/resources_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"regexp"
"strings"
"testing"
"github.com/mark3labs/mcp-go/mcp"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/yaml"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/output"
)
func TestResourcesList(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
t.Run("resources_list with missing apiVersion returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_list", map[string]interface{}{})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to list resources, missing argument apiVersion" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
}
})
t.Run("resources_list with missing kind returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to list resources, missing argument kind" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
}
})
t.Run("resources_list with invalid apiVersion returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "invalid/api/version", "kind": "Pod"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to list resources, invalid argument apiVersion" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
}
})
t.Run("resources_list with nonexistent apiVersion returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "custom.non.existent.example.com/v1", "kind": "Custom"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
}
if toolResult.Content[0].(mcp.TextContent).Text != `failed to list resources: no matches for kind "Custom" in version "custom.non.existent.example.com/v1"` {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
}
})
namespaces, err := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace"})
t.Run("resources_list returns namespaces", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if namespaces.IsError {
t.Fatalf("call tool failed")
return
}
})
var decodedNamespaces []unstructured.Unstructured
err = yaml.Unmarshal([]byte(namespaces.Content[0].(mcp.TextContent).Text), &decodedNamespaces)
t.Run("resources_list has yaml content", func(t *testing.T) {
if err != nil {
t.Fatalf("invalid tool result content %v", err)
}
})
t.Run("resources_list returns more than 2 items", func(t *testing.T) {
if len(decodedNamespaces) < 3 {
t.Fatalf("invalid namespace count, expected >2, got %v", len(decodedNamespaces))
}
})
// Test label selector functionality
t.Run("resources_list with label selector returns filtered pods", func(t *testing.T) {
// List pods with label selector
result, err := c.callTool("resources_list", map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"namespace": "default",
"labelSelector": "app=nginx",
})
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if result.IsError {
t.Fatalf("call tool failed")
return
}
var decodedPods []unstructured.Unstructured
err = yaml.Unmarshal([]byte(result.Content[0].(mcp.TextContent).Text), &decodedPods)
if err != nil {
t.Fatalf("invalid tool result content %v", err)
return
}
// Verify only the pod with matching label is returned
if len(decodedPods) != 1 {
t.Fatalf("expected 1 pod, got %d", len(decodedPods))
return
}
if decodedPods[0].GetName() != "a-pod-in-default" {
t.Fatalf("expected pod-with-label, got %s", decodedPods[0].GetName())
return
}
// Test that multiple label selectors work
result, err = c.callTool("resources_list", map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"namespace": "default",
"labelSelector": "test-label=test-value,another=value",
})
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if result.IsError {
t.Fatalf("call tool failed")
return
}
err = yaml.Unmarshal([]byte(result.Content[0].(mcp.TextContent).Text), &decodedPods)
if err != nil {
t.Fatalf("invalid tool result content %v", err)
return
}
// Verify no pods match multiple label selector
if len(decodedPods) != 0 {
t.Fatalf("expected 0 pods, got %d", len(decodedPods))
return
}
})
})
}
func TestResourcesListDenied(t *testing.T) {
deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [
{ version = "v1", kind = "Secret" },
{ group = "rbac.authorization.k8s.io", version = "v1" }
]
`)))
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
c.withEnvTest()
deniedByKind, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "Secret"})
t.Run("resources_list (denied by kind) has error", func(t *testing.T) {
if !deniedByKind.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("resources_list (denied by kind) describes denial", func(t *testing.T) {
expectedMessage := "failed to list resources: resource not allowed: /v1, Kind=Secret"
if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
}
})
deniedByGroup, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "Role"})
t.Run("resources_list (denied by group) has error", func(t *testing.T) {
if !deniedByGroup.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("resources_list (denied by group) describes denial", func(t *testing.T) {
expectedMessage := "failed to list resources: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
}
})
allowedResource, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace"})
t.Run("resources_list (not denied) returns list", func(t *testing.T) {
if allowedResource.IsError {
t.Fatalf("call tool should not fail")
}
})
})
}
func TestResourcesListAsTable(t *testing.T) {
testCaseWithContext(t, &mcpContext{listOutput: output.Table, before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
c.withEnvTest()
kc := c.newKubernetesClient()
_, _ = kc.CoreV1().ConfigMaps("default").Create(t.Context(), &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: "a-configmap-to-list-as-table", Labels: map[string]string{"resource": "config-map"}},
Data: map[string]string{"key": "value"},
}, metav1.CreateOptions{})
configMapList, err := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "ConfigMap"})
t.Run("resources_list returns ConfigMap list", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
}
if configMapList.IsError {
t.Fatalf("call tool failed")
}
})
outConfigMapList := configMapList.Content[0].(mcp.TextContent).Text
t.Run("resources_list returns column headers for ConfigMap list", func(t *testing.T) {
expectedHeaders := "NAMESPACE\\s+APIVERSION\\s+KIND\\s+NAME\\s+DATA\\s+AGE\\s+LABELS"
if m, e := regexp.MatchString(expectedHeaders, outConfigMapList); !m || e != nil {
t.Fatalf("Expected headers '%s' not found in output:\n%s", expectedHeaders, outConfigMapList)
}
})
t.Run("resources_list returns formatted row for a-configmap-to-list-as-table", func(t *testing.T) {
expectedRow := "(?<namespace>default)\\s+" +
"(?<apiVersion>v1)\\s+" +
"(?<kind>ConfigMap)\\s+" +
"(?<name>a-configmap-to-list-as-table)\\s+" +
"(?<data>1)\\s+" +
"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
"(?<labels>resource=config-map)"
if m, e := regexp.MatchString(expectedRow, outConfigMapList); !m || e != nil {
t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRow, outConfigMapList)
}
})
// Custom Resource List
_, _ = dynamic.NewForConfigOrDie(envTestRestConfig).
Resource(schema.GroupVersionResource{Group: "route.openshift.io", Version: "v1", Resource: "routes"}).
Namespace("default").
Create(c.ctx, &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "route.openshift.io/v1",
"kind": "Route",
"metadata": map[string]interface{}{
"name": "an-openshift-route-to-list-as-table",
},
}}, metav1.CreateOptions{})
routeList, err := c.callTool("resources_list", map[string]interface{}{"apiVersion": "route.openshift.io/v1", "kind": "Route"})
t.Run("resources_list returns Route list", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
}
if routeList.IsError {
t.Fatalf("call tool failed")
}
})
outRouteList := routeList.Content[0].(mcp.TextContent).Text
t.Run("resources_list returns column headers for Route list", func(t *testing.T) {
expectedHeaders := "NAMESPACE\\s+APIVERSION\\s+KIND\\s+NAME\\s+AGE\\s+LABELS"
if m, e := regexp.MatchString(expectedHeaders, outRouteList); !m || e != nil {
t.Fatalf("Expected headers '%s' not found in output:\n%s", expectedHeaders, outRouteList)
}
})
t.Run("resources_list returns formatted row for an-openshift-route-to-list-as-table", func(t *testing.T) {
expectedRow := "(?<namespace>default)\\s+" +
"(?<apiVersion>route.openshift.io/v1)\\s+" +
"(?<kind>Route)\\s+" +
"(?<name>an-openshift-route-to-list-as-table)\\s+" +
"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
"(?<labels><none>)"
if m, e := regexp.MatchString(expectedRow, outRouteList); !m || e != nil {
t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRow, outRouteList)
}
})
})
}
func TestResourcesGet(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
t.Run("resources_get with missing apiVersion returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_get", map[string]interface{}{})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to get resource, missing argument apiVersion" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_get with missing kind returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to get resource, missing argument kind" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_get with invalid apiVersion returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "invalid/api/version", "kind": "Pod", "name": "a-pod"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to get resource, invalid argument apiVersion" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_get with nonexistent apiVersion returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "custom.non.existent.example.com/v1", "kind": "Custom", "name": "a-custom"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != `failed to get resource: no matches for kind "Custom" in version "custom.non.existent.example.com/v1"` {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_get with missing name returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to get resource, missing argument name" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
namespace, err := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace", "name": "default"})
t.Run("resources_get returns namespace", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if namespace.IsError {
t.Fatalf("call tool failed")
return
}
})
var decodedNamespace unstructured.Unstructured
err = yaml.Unmarshal([]byte(namespace.Content[0].(mcp.TextContent).Text), &decodedNamespace)
t.Run("resources_get has yaml content", func(t *testing.T) {
if err != nil {
t.Fatalf("invalid tool result content %v", err)
return
}
})
t.Run("resources_get returns default namespace", func(t *testing.T) {
if decodedNamespace.GetName() != "default" {
t.Fatalf("invalid namespace name, expected default, got %v", decodedNamespace.GetName())
return
}
})
})
}
func TestResourcesGetDenied(t *testing.T) {
deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [
{ version = "v1", kind = "Secret" },
{ group = "rbac.authorization.k8s.io", version = "v1" }
]
`)))
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
c.withEnvTest()
kc := c.newKubernetesClient()
_, _ = kc.CoreV1().Secrets("default").Create(c.ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "denied-secret"},
}, metav1.CreateOptions{})
_, _ = kc.RbacV1().Roles("default").Create(c.ctx, &v1.Role{
ObjectMeta: metav1.ObjectMeta{Name: "denied-role"},
}, metav1.CreateOptions{})
deniedByKind, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1", "kind": "Secret", "namespace": "default", "name": "denied-secret"})
t.Run("resources_get (denied by kind) has error", func(t *testing.T) {
if !deniedByKind.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("resources_get (denied by kind) describes denial", func(t *testing.T) {
expectedMessage := "failed to get resource: resource not allowed: /v1, Kind=Secret"
if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
}
})
deniedByGroup, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "Role", "namespace": "default", "name": "denied-role"})
t.Run("resources_get (denied by group) has error", func(t *testing.T) {
if !deniedByGroup.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("resources_get (denied by group) describes denial", func(t *testing.T) {
expectedMessage := "failed to get resource: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
}
})
allowedResource, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace", "name": "default"})
t.Run("resources_get (not denied) returns resource", func(t *testing.T) {
if allowedResource.IsError {
t.Fatalf("call tool should not fail")
}
})
})
}
func TestResourcesCreateOrUpdate(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
t.Run("resources_create_or_update with nil resource returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_create_or_update", map[string]interface{}{})
if toolResult.IsError != true {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to create or update resources, missing argument resource" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_create_or_update with empty resource returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": ""})
if toolResult.IsError != true {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to create or update resources, missing argument resource" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
client := c.newKubernetesClient()
configMapYaml := "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: a-cm-created-or-updated\n namespace: default\n"
resourcesCreateOrUpdateCm1, err := c.callTool("resources_create_or_update", map[string]interface{}{"resource": configMapYaml})
t.Run("resources_create_or_update with valid namespaced yaml resource returns success", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if resourcesCreateOrUpdateCm1.IsError {
t.Errorf("call tool failed")
return
}
})
var decodedCreateOrUpdateCm1 []unstructured.Unstructured
err = yaml.Unmarshal([]byte(resourcesCreateOrUpdateCm1.Content[0].(mcp.TextContent).Text), &decodedCreateOrUpdateCm1)
t.Run("resources_create_or_update with valid namespaced yaml resource returns yaml content", func(t *testing.T) {
if err != nil {
t.Errorf("invalid tool result content %v", err)
return
}
if !strings.HasPrefix(resourcesCreateOrUpdateCm1.Content[0].(mcp.TextContent).Text, "# The following resources (YAML) have been created or updated successfully") {
t.Errorf("Excpected success message, got %v", resourcesCreateOrUpdateCm1.Content[0].(mcp.TextContent).Text)
return
}
if len(decodedCreateOrUpdateCm1) != 1 {
t.Errorf("invalid resource count, expected 1, got %v", len(decodedCreateOrUpdateCm1))
return
}
if decodedCreateOrUpdateCm1[0].GetName() != "a-cm-created-or-updated" {
t.Errorf("invalid resource name, expected a-cm-created-or-updated, got %v", decodedCreateOrUpdateCm1[0].GetName())
return
}
if decodedCreateOrUpdateCm1[0].GetUID() == "" {
t.Errorf("invalid uid, got %v", decodedCreateOrUpdateCm1[0].GetUID())
return
}
})
t.Run("resources_create_or_update with valid namespaced yaml resource creates ConfigMap", func(t *testing.T) {
cm, _ := client.CoreV1().ConfigMaps("default").Get(c.ctx, "a-cm-created-or-updated", metav1.GetOptions{})
if cm == nil {
t.Fatalf("ConfigMap not found")
return
}
})
configMapJson := "{\"apiVersion\": \"v1\", \"kind\": \"ConfigMap\", \"metadata\": {\"name\": \"a-cm-created-or-updated-2\", \"namespace\": \"default\"}}"
resourcesCreateOrUpdateCm2, err := c.callTool("resources_create_or_update", map[string]interface{}{"resource": configMapJson})
t.Run("resources_create_or_update with valid namespaced json resource returns success", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if resourcesCreateOrUpdateCm2.IsError {
t.Fatalf("call tool failed")
return
}
})
t.Run("resources_create_or_update with valid namespaced json resource creates config map", func(t *testing.T) {
cm, _ := client.CoreV1().ConfigMaps("default").Get(c.ctx, "a-cm-created-or-updated-2", metav1.GetOptions{})
if cm == nil {
t.Fatalf("ConfigMap not found")
return
}
})
customResourceDefinitionJson := `
{
"apiVersion": "apiextensions.k8s.io/v1",
"kind": "CustomResourceDefinition",
"metadata": {"name": "customs.example.com"},
"spec": {
"group": "example.com",
"versions": [{
"name": "v1","served": true,"storage": true,
"schema": {"openAPIV3Schema": {"type": "object"}}
}],
"scope": "Namespaced",
"names": {"plural": "customs","singular": "custom","kind": "Custom"}
}
}`
resourcesCreateOrUpdateCrd, err := c.callTool("resources_create_or_update", map[string]interface{}{"resource": customResourceDefinitionJson})
t.Run("resources_create_or_update with valid cluster-scoped json resource returns success", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if resourcesCreateOrUpdateCrd.IsError {
t.Fatalf("call tool failed")
return
}
})
t.Run("resources_create_or_update with valid cluster-scoped json resource creates custom resource definition", func(t *testing.T) {
apiExtensionsV1Client := c.newApiExtensionsClient()
_, err = apiExtensionsV1Client.CustomResourceDefinitions().Get(c.ctx, "customs.example.com", metav1.GetOptions{})
if err != nil {
t.Fatalf("custom resource definition not found")
return
}
})
c.crdWaitUntilReady("customs.example.com")
customJson := "{\"apiVersion\": \"example.com/v1\", \"kind\": \"Custom\", \"metadata\": {\"name\": \"a-custom-resource\"}}"
resourcesCreateOrUpdateCustom, err := c.callTool("resources_create_or_update", map[string]interface{}{"resource": customJson})
t.Run("resources_create_or_update with valid namespaced json resource returns success", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if resourcesCreateOrUpdateCustom.IsError {
t.Fatalf("call tool failed, got: %v", resourcesCreateOrUpdateCustom.Content)
return
}
})
t.Run("resources_create_or_update with valid namespaced json resource creates custom resource", func(t *testing.T) {
dynamicClient := dynamic.NewForConfigOrDie(envTestRestConfig)
_, err = dynamicClient.
Resource(schema.GroupVersionResource{Group: "example.com", Version: "v1", Resource: "customs"}).
Namespace("default").
Get(c.ctx, "a-custom-resource", metav1.GetOptions{})
if err != nil {
t.Fatalf("custom resource not found")
return
}
})
customJsonUpdated := "{\"apiVersion\": \"example.com/v1\", \"kind\": \"Custom\", \"metadata\": {\"name\": \"a-custom-resource\",\"annotations\": {\"updated\": \"true\"}}}"
resourcesCreateOrUpdateCustomUpdated, err := c.callTool("resources_create_or_update", map[string]interface{}{"resource": customJsonUpdated})
t.Run("resources_create_or_update with valid namespaced json resource updates custom resource", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if resourcesCreateOrUpdateCustomUpdated.IsError {
t.Fatalf("call tool failed")
return
}
})
t.Run("resources_create_or_update with valid namespaced json resource updates custom resource", func(t *testing.T) {
dynamicClient := dynamic.NewForConfigOrDie(envTestRestConfig)
customResource, _ := dynamicClient.
Resource(schema.GroupVersionResource{Group: "example.com", Version: "v1", Resource: "customs"}).
Namespace("default").
Get(c.ctx, "a-custom-resource", metav1.GetOptions{})
if customResource == nil {
t.Fatalf("custom resource not found")
return
}
annotations := customResource.GetAnnotations()
if annotations == nil || annotations["updated"] != "true" {
t.Fatalf("custom resource not updated")
return
}
})
})
}
func TestResourcesCreateOrUpdateDenied(t *testing.T) {
deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [
{ version = "v1", kind = "Secret" },
{ group = "rbac.authorization.k8s.io", version = "v1" }
]
`)))
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
c.withEnvTest()
secretYaml := "apiVersion: v1\nkind: Secret\nmetadata:\n name: a-denied-secret\n namespace: default\n"
deniedByKind, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": secretYaml})
t.Run("resources_create_or_update (denied by kind) has error", func(t *testing.T) {
if !deniedByKind.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("resources_create_or_update (denied by kind) describes denial", func(t *testing.T) {
expectedMessage := "failed to create or update resources: resource not allowed: /v1, Kind=Secret"
if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
}
})
roleYaml := "apiVersion: rbac.authorization.k8s.io/v1\nkind: Role\nmetadata:\n name: a-denied-role\n namespace: default\n"
deniedByGroup, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": roleYaml})
t.Run("resources_create_or_update (denied by group) has error", func(t *testing.T) {
if !deniedByGroup.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("resources_create_or_update (denied by group) describes denial", func(t *testing.T) {
expectedMessage := "failed to create or update resources: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
}
})
configMapYaml := "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: a-cm-created-or-updated\n namespace: default\n"
allowedResource, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": configMapYaml})
t.Run("resources_create_or_update (not denied) creates or updates resource", func(t *testing.T) {
if allowedResource.IsError {
t.Fatalf("call tool should not fail")
}
})
})
}
func TestResourcesDelete(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
t.Run("resources_delete with missing apiVersion returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_delete", map[string]interface{}{})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to delete resource, missing argument apiVersion" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_delete with missing kind returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to delete resource, missing argument kind" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_delete with invalid apiVersion returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "invalid/api/version", "kind": "Pod", "name": "a-pod"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to delete resource, invalid argument apiVersion" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_delete with nonexistent apiVersion returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "custom.non.existent.example.com/v1", "kind": "Custom", "name": "a-custom"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != `failed to delete resource: no matches for kind "Custom" in version "custom.non.existent.example.com/v1"` {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_delete with missing name returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to delete resource, missing argument name" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_delete with nonexistent resource returns error", func(t *testing.T) {
toolResult, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "ConfigMap", "name": "nonexistent-configmap"})
if !toolResult.IsError {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != `failed to delete resource: configmaps "nonexistent-configmap" not found` {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
resourcesDeleteCm, err := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "ConfigMap", "name": "a-configmap-to-delete"})
t.Run("resources_delete with valid namespaced resource returns success", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if resourcesDeleteCm.IsError {
t.Fatalf("call tool failed")
return
}
if resourcesDeleteCm.Content[0].(mcp.TextContent).Text != "Resource deleted successfully" {
t.Fatalf("invalid tool result content got: %v", resourcesDeleteCm.Content[0].(mcp.TextContent).Text)
return
}
})
client := c.newKubernetesClient()
t.Run("resources_delete with valid namespaced resource deletes ConfigMap", func(t *testing.T) {
_, err := client.CoreV1().ConfigMaps("default").Get(c.ctx, "a-configmap-to-delete", metav1.GetOptions{})
if err == nil {
t.Fatalf("ConfigMap not deleted")
return
}
})
resourcesDeleteNamespace, err := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace", "name": "ns-to-delete"})
t.Run("resources_delete with valid namespaced resource returns success", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if resourcesDeleteNamespace.IsError {
t.Fatalf("call tool failed")
return
}
if resourcesDeleteNamespace.Content[0].(mcp.TextContent).Text != "Resource deleted successfully" {
t.Fatalf("invalid tool result content got: %v", resourcesDeleteNamespace.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("resources_delete with valid namespaced resource deletes Namespace", func(t *testing.T) {
ns, err := client.CoreV1().Namespaces().Get(c.ctx, "ns-to-delete", metav1.GetOptions{})
if err == nil && ns != nil && ns.DeletionTimestamp == nil {
t.Fatalf("Namespace not deleted")
return
}
})
})
}
func TestResourcesDeleteDenied(t *testing.T) {
deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [
{ version = "v1", kind = "Secret" },
{ group = "rbac.authorization.k8s.io", version = "v1" }
]
`)))
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
c.withEnvTest()
kc := c.newKubernetesClient()
_, _ = kc.CoreV1().ConfigMaps("default").Create(c.ctx, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: "allowed-configmap-to-delete"},
}, metav1.CreateOptions{})
deniedByKind, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "Secret", "namespace": "default", "name": "denied-secret"})
t.Run("resources_delete (denied by kind) has error", func(t *testing.T) {
if !deniedByKind.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("resources_delete (denied by kind) describes denial", func(t *testing.T) {
expectedMessage := "failed to delete resource: resource not allowed: /v1, Kind=Secret"
if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
}
})
deniedByGroup, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "Role", "namespace": "default", "name": "denied-role"})
t.Run("resources_delete (denied by group) has error", func(t *testing.T) {
if !deniedByGroup.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("resources_delete (denied by group) describes denial", func(t *testing.T) {
expectedMessage := "failed to delete resource: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
}
})
allowedResource, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "ConfigMap", "name": "allowed-configmap-to-delete"})
t.Run("resources_delete (not denied) deletes resource", func(t *testing.T) {
if allowedResource.IsError {
t.Fatalf("call tool should not fail")
}
})
})
}
```
--------------------------------------------------------------------------------
/pkg/mcp/pods_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"regexp"
"strings"
"testing"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/output"
"github.com/mark3labs/mcp-go/mcp"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/yaml"
)
func TestPodsListInAllNamespaces(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
toolResult, err := c.callTool("pods_list", map[string]interface{}{})
t.Run("pods_list returns pods list", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
}
if toolResult.IsError {
t.Fatalf("call tool failed")
}
})
var decoded []unstructured.Unstructured
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
t.Run("pods_list has yaml content", func(t *testing.T) {
if err != nil {
t.Fatalf("invalid tool result content %v", err)
}
})
t.Run("pods_list returns 3 items", func(t *testing.T) {
if len(decoded) != 3 {
t.Fatalf("invalid pods count, expected 3, got %v", len(decoded))
}
})
t.Run("pods_list returns pod in ns-1", func(t *testing.T) {
if decoded[1].GetName() != "a-pod-in-ns-1" {
t.Fatalf("invalid pod name, expected a-pod-in-ns-1, got %v", decoded[1].GetName())
}
if decoded[1].GetNamespace() != "ns-1" {
t.Fatalf("invalid pod namespace, expected ns-1, got %v", decoded[1].GetNamespace())
}
})
t.Run("pods_list returns pod in ns-2", func(t *testing.T) {
if decoded[2].GetName() != "a-pod-in-ns-2" {
t.Fatalf("invalid pod name, expected a-pod-in-ns-2, got %v", decoded[2].GetName())
}
if decoded[2].GetNamespace() != "ns-2" {
t.Fatalf("invalid pod namespace, expected ns-2, got %v", decoded[2].GetNamespace())
}
})
t.Run("pods_list omits managed fields", func(t *testing.T) {
if decoded[1].GetManagedFields() != nil {
t.Fatalf("managed fields should be omitted, got %v", decoded[0].GetManagedFields())
}
})
})
}
func TestPodsListInAllNamespacesUnauthorized(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
defer restoreAuth(c.ctx)
client := c.newKubernetesClient()
// Authorize user only for default/configured namespace
r, _ := client.RbacV1().Roles("default").Create(c.ctx, &rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{Name: "allow-pods-list"},
Rules: []rbacv1.PolicyRule{{
Verbs: []string{"get", "list"},
APIGroups: []string{""},
Resources: []string{"pods"},
}},
}, metav1.CreateOptions{})
_, _ = client.RbacV1().RoleBindings("default").Create(c.ctx, &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{Name: "allow-pods-list"},
Subjects: []rbacv1.Subject{{Kind: "User", Name: envTestUser.Name}},
RoleRef: rbacv1.RoleRef{Kind: "Role", Name: r.Name},
}, metav1.CreateOptions{})
// Deny cluster by removing cluster rule
_ = client.RbacV1().ClusterRoles().Delete(c.ctx, "allow-all", metav1.DeleteOptions{})
toolResult, err := c.callTool("pods_list", map[string]interface{}{})
t.Run("pods_list returns pods list for default namespace only", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if toolResult.IsError {
t.Fatalf("call tool failed %v", toolResult.Content)
return
}
})
var decoded []unstructured.Unstructured
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
t.Run("pods_list has yaml content", func(t *testing.T) {
if err != nil {
t.Fatalf("invalid tool result content %v", err)
return
}
})
t.Run("pods_list returns 1 items", func(t *testing.T) {
if len(decoded) != 1 {
t.Fatalf("invalid pods count, expected 1, got %v", len(decoded))
return
}
})
t.Run("pods_list returns pod in default", func(t *testing.T) {
if decoded[0].GetName() != "a-pod-in-default" {
t.Fatalf("invalid pod name, expected a-pod-in-default, got %v", decoded[0].GetName())
return
}
if decoded[0].GetNamespace() != "default" {
t.Fatalf("invalid pod namespace, expected default, got %v", decoded[0].GetNamespace())
return
}
})
})
}
func TestPodsListInNamespace(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
t.Run("pods_list_in_namespace with nil namespace returns error", func(t *testing.T) {
toolResult, _ := c.callTool("pods_list_in_namespace", map[string]interface{}{})
if toolResult.IsError != true {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to list pods in namespace, missing argument namespace" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
toolResult, err := c.callTool("pods_list_in_namespace", map[string]interface{}{
"namespace": "ns-1",
})
t.Run("pods_list_in_namespace returns pods list", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
}
if toolResult.IsError {
t.Fatalf("call tool failed")
}
})
var decoded []unstructured.Unstructured
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
t.Run("pods_list_in_namespace has yaml content", func(t *testing.T) {
if err != nil {
t.Fatalf("invalid tool result content %v", err)
}
})
t.Run("pods_list_in_namespace returns 1 items", func(t *testing.T) {
if len(decoded) != 1 {
t.Fatalf("invalid pods count, expected 1, got %v", len(decoded))
}
})
t.Run("pods_list_in_namespace returns pod in ns-1", func(t *testing.T) {
if decoded[0].GetName() != "a-pod-in-ns-1" {
t.Errorf("invalid pod name, expected a-pod-in-ns-1, got %v", decoded[0].GetName())
}
if decoded[0].GetNamespace() != "ns-1" {
t.Errorf("invalid pod namespace, expected ns-1, got %v", decoded[0].GetNamespace())
}
})
t.Run("pods_list_in_namespace omits managed fields", func(t *testing.T) {
if decoded[0].GetManagedFields() != nil {
t.Fatalf("managed fields should be omitted, got %v", decoded[0].GetManagedFields())
}
})
})
}
func TestPodsListDenied(t *testing.T) {
deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`)))
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
c.withEnvTest()
podsList, _ := c.callTool("pods_list", map[string]interface{}{})
t.Run("pods_list has error", func(t *testing.T) {
if !podsList.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("pods_list describes denial", func(t *testing.T) {
expectedMessage := "failed to list pods in all namespaces: resource not allowed: /v1, Kind=Pod"
if podsList.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, podsList.Content[0].(mcp.TextContent).Text)
}
})
podsListInNamespace, _ := c.callTool("pods_list_in_namespace", map[string]interface{}{"namespace": "ns-1"})
t.Run("pods_list_in_namespace has error", func(t *testing.T) {
if !podsListInNamespace.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("pods_list_in_namespace describes denial", func(t *testing.T) {
expectedMessage := "failed to list pods in namespace ns-1: resource not allowed: /v1, Kind=Pod"
if podsListInNamespace.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, podsListInNamespace.Content[0].(mcp.TextContent).Text)
}
})
})
}
func TestPodsListAsTable(t *testing.T) {
testCaseWithContext(t, &mcpContext{listOutput: output.Table}, func(c *mcpContext) {
c.withEnvTest()
podsList, err := c.callTool("pods_list", map[string]interface{}{})
t.Run("pods_list returns pods list", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
}
if podsList.IsError {
t.Fatalf("call tool failed")
}
})
outPodsList := podsList.Content[0].(mcp.TextContent).Text
t.Run("pods_list returns table with 1 header and 3 rows", func(t *testing.T) {
lines := strings.Count(outPodsList, "\n")
if lines != 4 {
t.Fatalf("invalid line count, expected 4 (1 header, 3 row), got %v", lines)
}
})
t.Run("pods_list_in_namespace returns column headers", func(t *testing.T) {
expectedHeaders := "NAMESPACE\\s+APIVERSION\\s+KIND\\s+NAME\\s+READY\\s+STATUS\\s+RESTARTS\\s+AGE\\s+IP\\s+NODE\\s+NOMINATED NODE\\s+READINESS GATES\\s+LABELS"
if m, e := regexp.MatchString(expectedHeaders, outPodsList); !m || e != nil {
t.Fatalf("Expected headers '%s' not found in output:\n%s", expectedHeaders, outPodsList)
}
})
t.Run("pods_list_in_namespace returns formatted row for a-pod-in-ns-1", func(t *testing.T) {
expectedRow := "(?<namespace>ns-1)\\s+" +
"(?<apiVersion>v1)\\s+" +
"(?<kind>Pod)\\s+" +
"(?<name>a-pod-in-ns-1)\\s+" +
"(?<ready>0\\/1)\\s+" +
"(?<status>Pending)\\s+" +
"(?<restarts>0)\\s+" +
"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
"(?<ip><none>)\\s+" +
"(?<node><none>)\\s+" +
"(?<nominated_node><none>)\\s+" +
"(?<readiness_gates><none>)\\s+" +
"(?<labels><none>)"
if m, e := regexp.MatchString(expectedRow, outPodsList); !m || e != nil {
t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRow, outPodsList)
}
})
t.Run("pods_list_in_namespace returns formatted row for a-pod-in-default", func(t *testing.T) {
expectedRow := "(?<namespace>default)\\s+" +
"(?<apiVersion>v1)\\s+" +
"(?<kind>Pod)\\s+" +
"(?<name>a-pod-in-default)\\s+" +
"(?<ready>0\\/1)\\s+" +
"(?<status>Pending)\\s+" +
"(?<restarts>0)\\s+" +
"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
"(?<ip><none>)\\s+" +
"(?<node><none>)\\s+" +
"(?<nominated_node><none>)\\s+" +
"(?<readiness_gates><none>)\\s+" +
"(?<labels>app=nginx)"
if m, e := regexp.MatchString(expectedRow, outPodsList); !m || e != nil {
t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRow, outPodsList)
}
})
podsListInNamespace, err := c.callTool("pods_list_in_namespace", map[string]interface{}{
"namespace": "ns-1",
})
t.Run("pods_list_in_namespace returns pods list", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if podsListInNamespace.IsError {
t.Fatalf("call tool failed")
}
})
outPodsListInNamespace := podsListInNamespace.Content[0].(mcp.TextContent).Text
t.Run("pods_list_in_namespace returns table with 1 header and 1 row", func(t *testing.T) {
lines := strings.Count(outPodsListInNamespace, "\n")
if lines != 2 {
t.Fatalf("invalid line count, expected 2 (1 header, 1 row), got %v", lines)
}
})
t.Run("pods_list_in_namespace returns column headers", func(t *testing.T) {
expectedHeaders := "NAMESPACE\\s+APIVERSION\\s+KIND\\s+NAME\\s+READY\\s+STATUS\\s+RESTARTS\\s+AGE\\s+IP\\s+NODE\\s+NOMINATED NODE\\s+READINESS GATES\\s+LABELS"
if m, e := regexp.MatchString(expectedHeaders, outPodsListInNamespace); !m || e != nil {
t.Fatalf("Expected headers '%s' not found in output:\n%s", expectedHeaders, outPodsListInNamespace)
}
})
t.Run("pods_list_in_namespace returns formatted row", func(t *testing.T) {
expectedRow := "(?<namespace>ns-1)\\s+" +
"(?<apiVersion>v1)\\s+" +
"(?<kind>Pod)\\s+" +
"(?<name>a-pod-in-ns-1)\\s+" +
"(?<ready>0\\/1)\\s+" +
"(?<status>Pending)\\s+" +
"(?<restarts>0)\\s+" +
"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
"(?<ip><none>)\\s+" +
"(?<node><none>)\\s+" +
"(?<nominated_node><none>)\\s+" +
"(?<readiness_gates><none>)\\s+" +
"(?<labels><none>)"
if m, e := regexp.MatchString(expectedRow, outPodsListInNamespace); !m || e != nil {
t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRow, outPodsListInNamespace)
}
})
})
}
func TestPodsGet(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
t.Run("pods_get with nil name returns error", func(t *testing.T) {
toolResult, _ := c.callTool("pods_get", map[string]interface{}{})
if toolResult.IsError != true {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to get pod, missing argument name" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("pods_get with not found name returns error", func(t *testing.T) {
toolResult, _ := c.callTool("pods_get", map[string]interface{}{"name": "not-found"})
if toolResult.IsError != true {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to get pod not-found in namespace : pods \"not-found\" not found" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
podsGetNilNamespace, err := c.callTool("pods_get", map[string]interface{}{
"name": "a-pod-in-default",
})
t.Run("pods_get with name and nil namespace returns pod", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if podsGetNilNamespace.IsError {
t.Fatalf("call tool failed")
return
}
})
var decodedNilNamespace unstructured.Unstructured
err = yaml.Unmarshal([]byte(podsGetNilNamespace.Content[0].(mcp.TextContent).Text), &decodedNilNamespace)
t.Run("pods_get with name and nil namespace has yaml content", func(t *testing.T) {
if err != nil {
t.Fatalf("invalid tool result content %v", err)
return
}
})
t.Run("pods_get with name and nil namespace returns pod in default", func(t *testing.T) {
if decodedNilNamespace.GetName() != "a-pod-in-default" {
t.Fatalf("invalid pod name, expected a-pod-in-default, got %v", decodedNilNamespace.GetName())
return
}
if decodedNilNamespace.GetNamespace() != "default" {
t.Fatalf("invalid pod namespace, expected default, got %v", decodedNilNamespace.GetNamespace())
return
}
})
t.Run("pods_get with name and nil namespace omits managed fields", func(t *testing.T) {
if decodedNilNamespace.GetManagedFields() != nil {
t.Fatalf("managed fields should be omitted, got %v", decodedNilNamespace.GetManagedFields())
return
}
})
podsGetInNamespace, err := c.callTool("pods_get", map[string]interface{}{
"namespace": "ns-1",
"name": "a-pod-in-ns-1",
})
t.Run("pods_get with name and namespace returns pod", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if podsGetInNamespace.IsError {
t.Fatalf("call tool failed")
return
}
})
var decodedInNamespace unstructured.Unstructured
err = yaml.Unmarshal([]byte(podsGetInNamespace.Content[0].(mcp.TextContent).Text), &decodedInNamespace)
t.Run("pods_get with name and namespace has yaml content", func(t *testing.T) {
if err != nil {
t.Fatalf("invalid tool result content %v", err)
return
}
})
t.Run("pods_get with name and namespace returns pod in ns-1", func(t *testing.T) {
if decodedInNamespace.GetName() != "a-pod-in-ns-1" {
t.Fatalf("invalid pod name, expected a-pod-in-ns-1, got %v", decodedInNamespace.GetName())
return
}
if decodedInNamespace.GetNamespace() != "ns-1" {
t.Fatalf("invalid pod namespace, ns-1 ns-1, got %v", decodedInNamespace.GetNamespace())
return
}
})
})
}
func TestPodsGetDenied(t *testing.T) {
deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`)))
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
c.withEnvTest()
podsGet, _ := c.callTool("pods_get", map[string]interface{}{"name": "a-pod-in-default"})
t.Run("pods_get has error", func(t *testing.T) {
if !podsGet.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("pods_get describes denial", func(t *testing.T) {
expectedMessage := "failed to get pod a-pod-in-default in namespace : resource not allowed: /v1, Kind=Pod"
if podsGet.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, podsGet.Content[0].(mcp.TextContent).Text)
}
})
})
}
func TestPodsDelete(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
// Errors
t.Run("pods_delete with nil name returns error", func(t *testing.T) {
toolResult, _ := c.callTool("pods_delete", map[string]interface{}{})
if toolResult.IsError != true {
t.Errorf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to delete pod, missing argument name" {
t.Errorf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("pods_delete with not found name returns error", func(t *testing.T) {
toolResult, _ := c.callTool("pods_delete", map[string]interface{}{"name": "not-found"})
if toolResult.IsError != true {
t.Errorf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to delete pod not-found in namespace : pods \"not-found\" not found" {
t.Errorf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
// Default/nil Namespace
kc := c.newKubernetesClient()
_, _ = kc.CoreV1().Pods("default").Create(c.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "a-pod-to-delete"},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
}, metav1.CreateOptions{})
podsDeleteNilNamespace, err := c.callTool("pods_delete", map[string]interface{}{
"name": "a-pod-to-delete",
})
t.Run("pods_delete with name and nil namespace returns success", func(t *testing.T) {
if err != nil {
t.Errorf("call tool failed %v", err)
return
}
if podsDeleteNilNamespace.IsError {
t.Errorf("call tool failed")
return
}
if podsDeleteNilNamespace.Content[0].(mcp.TextContent).Text != "Pod deleted successfully" {
t.Errorf("invalid tool result content, got %v", podsDeleteNilNamespace.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("pods_delete with name and nil namespace deletes Pod", func(t *testing.T) {
p, pErr := kc.CoreV1().Pods("default").Get(c.ctx, "a-pod-to-delete", metav1.GetOptions{})
if pErr == nil && p != nil && p.DeletionTimestamp == nil {
t.Errorf("Pod not deleted")
return
}
})
// Provided Namespace
_, _ = kc.CoreV1().Pods("ns-1").Create(c.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "a-pod-to-delete-in-ns-1"},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
}, metav1.CreateOptions{})
podsDeleteInNamespace, err := c.callTool("pods_delete", map[string]interface{}{
"namespace": "ns-1",
"name": "a-pod-to-delete-in-ns-1",
})
t.Run("pods_delete with name and namespace returns success", func(t *testing.T) {
if err != nil {
t.Errorf("call tool failed %v", err)
return
}
if podsDeleteInNamespace.IsError {
t.Errorf("call tool failed")
return
}
if podsDeleteInNamespace.Content[0].(mcp.TextContent).Text != "Pod deleted successfully" {
t.Errorf("invalid tool result content, got %v", podsDeleteInNamespace.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("pods_delete with name and namespace deletes Pod", func(t *testing.T) {
p, pErr := kc.CoreV1().Pods("ns-1").Get(c.ctx, "a-pod-to-delete-in-ns-1", metav1.GetOptions{})
if pErr == nil && p != nil && p.DeletionTimestamp == nil {
t.Errorf("Pod not deleted")
return
}
})
// Managed Pod
managedLabels := map[string]string{
"app.kubernetes.io/managed-by": "kubernetes-mcp-server",
"app.kubernetes.io/name": "a-manged-pod-to-delete",
}
_, _ = kc.CoreV1().Pods("default").Create(c.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "a-managed-pod-to-delete", Labels: managedLabels},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
}, metav1.CreateOptions{})
_, _ = kc.CoreV1().Services("default").Create(c.ctx, &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "a-managed-service-to-delete", Labels: managedLabels},
Spec: corev1.ServiceSpec{Selector: managedLabels, Ports: []corev1.ServicePort{{Port: 80}}},
}, metav1.CreateOptions{})
podsDeleteManaged, err := c.callTool("pods_delete", map[string]interface{}{
"name": "a-managed-pod-to-delete",
})
t.Run("pods_delete with managed pod returns success", func(t *testing.T) {
if err != nil {
t.Errorf("call tool failed %v", err)
return
}
if podsDeleteManaged.IsError {
t.Errorf("call tool failed")
return
}
if podsDeleteManaged.Content[0].(mcp.TextContent).Text != "Pod deleted successfully" {
t.Errorf("invalid tool result content, got %v", podsDeleteManaged.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("pods_delete with managed pod deletes Pod and Service", func(t *testing.T) {
p, pErr := kc.CoreV1().Pods("default").Get(c.ctx, "a-managed-pod-to-delete", metav1.GetOptions{})
if pErr == nil && p != nil && p.DeletionTimestamp == nil {
t.Errorf("Pod not deleted")
return
}
s, sErr := kc.CoreV1().Services("default").Get(c.ctx, "a-managed-service-to-delete", metav1.GetOptions{})
if sErr == nil && s != nil && s.DeletionTimestamp == nil {
t.Errorf("Service not deleted")
return
}
})
})
}
func TestPodsDeleteDenied(t *testing.T) {
deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`)))
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
c.withEnvTest()
podsDelete, _ := c.callTool("pods_delete", map[string]interface{}{"name": "a-pod-in-default"})
t.Run("pods_delete has error", func(t *testing.T) {
if !podsDelete.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("pods_delete describes denial", func(t *testing.T) {
expectedMessage := "failed to delete pod a-pod-in-default in namespace : resource not allowed: /v1, Kind=Pod"
if podsDelete.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, podsDelete.Content[0].(mcp.TextContent).Text)
}
})
})
}
func TestPodsDeleteInOpenShift(t *testing.T) {
testCaseWithContext(t, &mcpContext{before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
managedLabels := map[string]string{
"app.kubernetes.io/managed-by": "kubernetes-mcp-server",
"app.kubernetes.io/name": "a-manged-pod-to-delete",
}
kc := c.newKubernetesClient()
_, _ = kc.CoreV1().Pods("default").Create(c.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "a-managed-pod-to-delete-in-openshift", Labels: managedLabels},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
}, metav1.CreateOptions{})
dynamicClient := dynamic.NewForConfigOrDie(envTestRestConfig)
_, _ = dynamicClient.Resource(schema.GroupVersionResource{Group: "route.openshift.io", Version: "v1", Resource: "routes"}).
Namespace("default").Create(c.ctx, &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "route.openshift.io/v1",
"kind": "Route",
"metadata": map[string]interface{}{
"name": "a-managed-route-to-delete",
"labels": managedLabels,
},
}}, metav1.CreateOptions{})
podsDeleteManagedOpenShift, err := c.callTool("pods_delete", map[string]interface{}{
"name": "a-managed-pod-to-delete-in-openshift",
})
t.Run("pods_delete with managed pod in OpenShift returns success", func(t *testing.T) {
if err != nil {
t.Errorf("call tool failed %v", err)
return
}
if podsDeleteManagedOpenShift.IsError {
t.Errorf("call tool failed")
return
}
if podsDeleteManagedOpenShift.Content[0].(mcp.TextContent).Text != "Pod deleted successfully" {
t.Errorf("invalid tool result content, got %v", podsDeleteManagedOpenShift.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("pods_delete with managed pod in OpenShift deletes Pod and Route", func(t *testing.T) {
p, pErr := kc.CoreV1().Pods("default").Get(c.ctx, "a-managed-pod-to-delete-in-openshift", metav1.GetOptions{})
if pErr == nil && p != nil && p.DeletionTimestamp == nil {
t.Errorf("Pod not deleted")
return
}
r, rErr := dynamicClient.
Resource(schema.GroupVersionResource{Group: "route.openshift.io", Version: "v1", Resource: "routes"}).
Namespace("default").Get(c.ctx, "a-managed-route-to-delete", metav1.GetOptions{})
if rErr == nil && r != nil && r.GetDeletionTimestamp() == nil {
t.Errorf("Route not deleted")
return
}
})
})
}
func TestPodsLog(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
t.Run("pods_log with nil name returns error", func(t *testing.T) {
toolResult, _ := c.callTool("pods_log", map[string]interface{}{})
if toolResult.IsError != true {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to get pod log, missing argument name" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
t.Run("pods_log with not found name returns error", func(t *testing.T) {
toolResult, _ := c.callTool("pods_log", map[string]interface{}{"name": "not-found"})
if toolResult.IsError != true {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to get pod not-found log in namespace : pods \"not-found\" not found" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
podsLogNilNamespace, err := c.callTool("pods_log", map[string]interface{}{
"name": "a-pod-in-default",
})
t.Run("pods_log with name and nil namespace returns pod log", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if podsLogNilNamespace.IsError {
t.Fatalf("call tool failed")
return
}
})
podsLogInNamespace, err := c.callTool("pods_log", map[string]interface{}{
"namespace": "ns-1",
"name": "a-pod-in-ns-1",
})
t.Run("pods_log with name and namespace returns pod log", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if podsLogInNamespace.IsError {
t.Fatalf("call tool failed")
return
}
})
podsContainerLogInNamespace, err := c.callTool("pods_log", map[string]interface{}{
"namespace": "ns-1",
"name": "a-pod-in-ns-1",
"container": "nginx",
})
t.Run("pods_log with name, container and namespace returns pod log", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if podsContainerLogInNamespace.IsError {
t.Fatalf("call tool failed")
return
}
})
toolResult, err := c.callTool("pods_log", map[string]interface{}{
"namespace": "ns-1",
"name": "a-pod-in-ns-1",
"container": "a-not-existing-container",
})
t.Run("pods_log with non existing container returns error", func(t *testing.T) {
if toolResult.IsError != true {
t.Fatalf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to get pod a-pod-in-ns-1 log in namespace ns-1: container a-not-existing-container is not valid for pod a-pod-in-ns-1" {
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
podsPreviousLogInNamespace, err := c.callTool("pods_log", map[string]interface{}{
"namespace": "ns-1",
"name": "a-pod-in-ns-1",
"previous": true,
})
t.Run("pods_log with previous=true returns previous pod log", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if podsPreviousLogInNamespace.IsError {
t.Fatalf("call tool failed")
return
}
})
podsPreviousLogFalse, err := c.callTool("pods_log", map[string]interface{}{
"namespace": "ns-1",
"name": "a-pod-in-ns-1",
"previous": false,
})
t.Run("pods_log with previous=false returns current pod log", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if podsPreviousLogFalse.IsError {
t.Fatalf("call tool failed")
return
}
})
// Test with tail parameter
podsTailLines, err := c.callTool("pods_log", map[string]interface{}{
"namespace": "ns-1",
"name": "a-pod-in-ns-1",
"tail": 50,
})
t.Run("pods_log with tail=50 returns pod log", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if podsTailLines.IsError {
t.Fatalf("call tool failed")
return
}
})
// Test with invalid tail parameter
podsInvalidTailLines, _ := c.callTool("pods_log", map[string]interface{}{
"namespace": "ns-1",
"name": "a-pod-in-ns-1",
"tail": "invalid",
})
t.Run("pods_log with invalid tail returns error", func(t *testing.T) {
if !podsInvalidTailLines.IsError {
t.Fatalf("call tool should fail")
return
}
expectedErrorMsg := "failed to parse tail parameter: expected integer"
if errMsg := podsInvalidTailLines.Content[0].(mcp.TextContent).Text; !strings.Contains(errMsg, expectedErrorMsg) {
t.Fatalf("unexpected error message, expected to contain '%s', got '%s'", expectedErrorMsg, errMsg)
return
}
})
})
}
func TestPodsLogDenied(t *testing.T) {
deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`)))
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
c.withEnvTest()
podsLog, _ := c.callTool("pods_log", map[string]interface{}{"name": "a-pod-in-default"})
t.Run("pods_log has error", func(t *testing.T) {
if !podsLog.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("pods_log describes denial", func(t *testing.T) {
expectedMessage := "failed to get pod a-pod-in-default log in namespace : resource not allowed: /v1, Kind=Pod"
if podsLog.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, podsLog.Content[0].(mcp.TextContent).Text)
}
})
})
}
func TestPodsRun(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
t.Run("pods_run with nil image returns error", func(t *testing.T) {
toolResult, _ := c.callTool("pods_run", map[string]interface{}{})
if toolResult.IsError != true {
t.Errorf("call tool should fail")
return
}
if toolResult.Content[0].(mcp.TextContent).Text != "failed to run pod, missing argument image" {
t.Errorf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
return
}
})
podsRunNilNamespace, err := c.callTool("pods_run", map[string]interface{}{"image": "nginx"})
t.Run("pods_run with image and nil namespace runs pod", func(t *testing.T) {
if err != nil {
t.Errorf("call tool failed %v", err)
return
}
if podsRunNilNamespace.IsError {
t.Errorf("call tool failed")
return
}
})
var decodedNilNamespace []unstructured.Unstructured
err = yaml.Unmarshal([]byte(podsRunNilNamespace.Content[0].(mcp.TextContent).Text), &decodedNilNamespace)
t.Run("pods_run with image and nil namespace has yaml content", func(t *testing.T) {
if err != nil {
t.Errorf("invalid tool result content %v", err)
return
}
})
t.Run("pods_run with image and nil namespace returns 1 item (Pod)", func(t *testing.T) {
if len(decodedNilNamespace) != 1 {
t.Errorf("invalid pods count, expected 1, got %v", len(decodedNilNamespace))
return
}
if decodedNilNamespace[0].GetKind() != "Pod" {
t.Errorf("invalid pod kind, expected Pod, got %v", decodedNilNamespace[0].GetKind())
return
}
})
t.Run("pods_run with image and nil namespace returns pod in default", func(t *testing.T) {
if decodedNilNamespace[0].GetNamespace() != "default" {
t.Errorf("invalid pod namespace, expected default, got %v", decodedNilNamespace[0].GetNamespace())
return
}
})
t.Run("pods_run with image and nil namespace returns pod with random name", func(t *testing.T) {
if !strings.HasPrefix(decodedNilNamespace[0].GetName(), "kubernetes-mcp-server-run-") {
t.Errorf("invalid pod name, expected random, got %v", decodedNilNamespace[0].GetName())
return
}
})
t.Run("pods_run with image and nil namespace returns pod with labels", func(t *testing.T) {
labels := decodedNilNamespace[0].Object["metadata"].(map[string]interface{})["labels"].(map[string]interface{})
if labels["app.kubernetes.io/name"] == "" {
t.Errorf("invalid labels, expected app.kubernetes.io/name, got %v", labels)
return
}
if labels["app.kubernetes.io/component"] == "" {
t.Errorf("invalid labels, expected app.kubernetes.io/component, got %v", labels)
return
}
if labels["app.kubernetes.io/managed-by"] != "kubernetes-mcp-server" {
t.Errorf("invalid labels, expected app.kubernetes.io/managed-by, got %v", labels)
return
}
if labels["app.kubernetes.io/part-of"] != "kubernetes-mcp-server-run-sandbox" {
t.Errorf("invalid labels, expected app.kubernetes.io/part-of, got %v", labels)
return
}
})
t.Run("pods_run with image and nil namespace returns pod with nginx container", func(t *testing.T) {
containers := decodedNilNamespace[0].Object["spec"].(map[string]interface{})["containers"].([]interface{})
if containers[0].(map[string]interface{})["image"] != "nginx" {
t.Errorf("invalid container name, expected nginx, got %v", containers[0].(map[string]interface{})["image"])
return
}
})
podsRunNamespaceAndPort, err := c.callTool("pods_run", map[string]interface{}{"image": "nginx", "port": 80})
t.Run("pods_run with image, namespace, and port runs pod", func(t *testing.T) {
if err != nil {
t.Errorf("call tool failed %v", err)
return
}
if podsRunNamespaceAndPort.IsError {
t.Errorf("call tool failed")
return
}
})
var decodedNamespaceAndPort []unstructured.Unstructured
err = yaml.Unmarshal([]byte(podsRunNamespaceAndPort.Content[0].(mcp.TextContent).Text), &decodedNamespaceAndPort)
t.Run("pods_run with image, namespace, and port has yaml content", func(t *testing.T) {
if err != nil {
t.Errorf("invalid tool result content %v", err)
return
}
})
t.Run("pods_run with image, namespace, and port returns 2 items (Pod + Service)", func(t *testing.T) {
if len(decodedNamespaceAndPort) != 2 {
t.Errorf("invalid pods count, expected 2, got %v", len(decodedNamespaceAndPort))
return
}
if decodedNamespaceAndPort[0].GetKind() != "Pod" {
t.Errorf("invalid pod kind, expected Pod, got %v", decodedNamespaceAndPort[0].GetKind())
return
}
if decodedNamespaceAndPort[1].GetKind() != "Service" {
t.Errorf("invalid service kind, expected Service, got %v", decodedNamespaceAndPort[1].GetKind())
return
}
})
t.Run("pods_run with image, namespace, and port returns pod with port", func(t *testing.T) {
containers := decodedNamespaceAndPort[0].Object["spec"].(map[string]interface{})["containers"].([]interface{})
ports := containers[0].(map[string]interface{})["ports"].([]interface{})
if ports[0].(map[string]interface{})["containerPort"] != int64(80) {
t.Errorf("invalid container port, expected 80, got %v", ports[0].(map[string]interface{})["containerPort"])
return
}
})
t.Run("pods_run with image, namespace, and port returns service with port and selector", func(t *testing.T) {
ports := decodedNamespaceAndPort[1].Object["spec"].(map[string]interface{})["ports"].([]interface{})
if ports[0].(map[string]interface{})["port"] != int64(80) {
t.Errorf("invalid service port, expected 80, got %v", ports[0].(map[string]interface{})["port"])
return
}
if ports[0].(map[string]interface{})["targetPort"] != int64(80) {
t.Errorf("invalid service target port, expected 80, got %v", ports[0].(map[string]interface{})["targetPort"])
return
}
selector := decodedNamespaceAndPort[1].Object["spec"].(map[string]interface{})["selector"].(map[string]interface{})
if selector["app.kubernetes.io/name"] == "" {
t.Errorf("invalid service selector, expected app.kubernetes.io/name, got %v", selector)
return
}
if selector["app.kubernetes.io/managed-by"] != "kubernetes-mcp-server" {
t.Errorf("invalid service selector, expected app.kubernetes.io/managed-by, got %v", selector)
return
}
if selector["app.kubernetes.io/part-of"] != "kubernetes-mcp-server-run-sandbox" {
t.Errorf("invalid service selector, expected app.kubernetes.io/part-of, got %v", selector)
return
}
})
})
}
func TestPodsRunDenied(t *testing.T) {
deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`)))
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
c.withEnvTest()
podsRun, _ := c.callTool("pods_run", map[string]interface{}{"image": "nginx"})
t.Run("pods_run has error", func(t *testing.T) {
if !podsRun.IsError {
t.Fatalf("call tool should fail")
}
})
t.Run("pods_run describes denial", func(t *testing.T) {
expectedMessage := "failed to run pod in namespace : resource not allowed: /v1, Kind=Pod"
if podsRun.Content[0].(mcp.TextContent).Text != expectedMessage {
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, podsRun.Content[0].(mcp.TextContent).Text)
}
})
})
}
func TestPodsRunInOpenShift(t *testing.T) {
testCaseWithContext(t, &mcpContext{before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
t.Run("pods_run with image, namespace, and port returns route with port", func(t *testing.T) {
podsRunInOpenShift, err := c.callTool("pods_run", map[string]interface{}{"image": "nginx", "port": 80})
if err != nil {
t.Errorf("call tool failed %v", err)
return
}
if podsRunInOpenShift.IsError {
t.Errorf("call tool failed")
return
}
var decodedPodServiceRoute []unstructured.Unstructured
err = yaml.Unmarshal([]byte(podsRunInOpenShift.Content[0].(mcp.TextContent).Text), &decodedPodServiceRoute)
if err != nil {
t.Errorf("invalid tool result content %v", err)
return
}
if len(decodedPodServiceRoute) != 3 {
t.Errorf("invalid pods count, expected 3, got %v", len(decodedPodServiceRoute))
return
}
if decodedPodServiceRoute[2].GetKind() != "Route" {
t.Errorf("invalid route kind, expected Route, got %v", decodedPodServiceRoute[2].GetKind())
return
}
targetPort := decodedPodServiceRoute[2].Object["spec"].(map[string]interface{})["port"].(map[string]interface{})["targetPort"].(int64)
if targetPort != 80 {
t.Errorf("invalid route target port, expected 80, got %v", targetPort)
return
}
})
})
}
func TestPodsListWithLabelSelector(t *testing.T) {
testCase(t, func(c *mcpContext) {
c.withEnvTest()
kc := c.newKubernetesClient()
// Create pods with labels
_, _ = kc.CoreV1().Pods("default").Create(c.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-with-labels",
Labels: map[string]string{"app": "test", "env": "dev"},
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
}, metav1.CreateOptions{})
_, _ = kc.CoreV1().Pods("ns-1").Create(c.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "another-pod-with-labels",
Labels: map[string]string{"app": "test", "env": "prod"},
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
}, metav1.CreateOptions{})
// Test pods_list with label selector
t.Run("pods_list with label selector returns filtered pods", func(t *testing.T) {
toolResult, err := c.callTool("pods_list", map[string]interface{}{
"labelSelector": "app=test",
})
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if toolResult.IsError {
t.Fatalf("call tool failed")
return
}
var decoded []unstructured.Unstructured
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
if err != nil {
t.Fatalf("invalid tool result content %v", err)
return
}
if len(decoded) != 2 {
t.Fatalf("invalid pods count, expected 2, got %v", len(decoded))
return
}
})
// Test pods_list_in_namespace with label selector
t.Run("pods_list_in_namespace with label selector returns filtered pods", func(t *testing.T) {
toolResult, err := c.callTool("pods_list_in_namespace", map[string]interface{}{
"namespace": "ns-1",
"labelSelector": "env=prod",
})
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if toolResult.IsError {
t.Fatalf("call tool failed")
return
}
var decoded []unstructured.Unstructured
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
if err != nil {
t.Fatalf("invalid tool result content %v", err)
return
}
if len(decoded) != 1 {
t.Fatalf("invalid pods count, expected 1, got %v", len(decoded))
return
}
if decoded[0].GetName() != "another-pod-with-labels" {
t.Fatalf("invalid pod name, expected another-pod-with-labels, got %v", decoded[0].GetName())
return
}
})
// Test multiple label selectors
t.Run("pods_list with multiple label selectors returns filtered pods", func(t *testing.T) {
toolResult, err := c.callTool("pods_list", map[string]interface{}{
"labelSelector": "app=test,env=prod",
})
if err != nil {
t.Fatalf("call tool failed %v", err)
return
}
if toolResult.IsError {
t.Fatalf("call tool failed")
return
}
var decoded []unstructured.Unstructured
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
if err != nil {
t.Fatalf("invalid tool result content %v", err)
return
}
if len(decoded) != 1 {
t.Fatalf("invalid pods count, expected 1, got %v", len(decoded))
return
}
if decoded[0].GetName() != "another-pod-with-labels" {
t.Fatalf("invalid pod name, expected another-pod-with-labels, got %v", decoded[0].GetName())
return
}
})
})
}
```
--------------------------------------------------------------------------------
/pkg/http/http_test.go:
--------------------------------------------------------------------------------
```go
package http
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/rsa"
"flag"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"os"
"regexp"
"strconv"
"strings"
"testing"
"time"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/coreos/go-oidc/v3/oidc"
"github.com/coreos/go-oidc/v3/oidc/oidctest"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"
"k8s.io/klog/v2/textlogger"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/mcp"
)
type httpContext struct {
klogState klog.State
mockServer *test.MockServer
LogBuffer bytes.Buffer
HttpAddress string // HTTP server address
timeoutCancel context.CancelFunc // Release resources if test completes before the timeout
StopServer context.CancelFunc
WaitForShutdown func() error
StaticConfig *config.StaticConfig
OidcProvider *oidc.Provider
}
const tokenReviewSuccessful = `
{
"kind": "TokenReview",
"apiVersion": "authentication.k8s.io/v1",
"spec": {"token": "valid-token"},
"status": {
"authenticated": true,
"user": {
"username": "test-user",
"groups": ["system:authenticated"]
}
}
}`
func (c *httpContext) beforeEach(t *testing.T) {
t.Helper()
http.DefaultClient.Timeout = 10 * time.Second
if c.StaticConfig == nil {
c.StaticConfig = config.Default()
}
c.mockServer = test.NewMockServer()
// Fake Kubernetes configuration
c.StaticConfig.KubeConfig = c.mockServer.KubeconfigFile(t)
// Capture logging
c.klogState = klog.CaptureState()
flags := flag.NewFlagSet("test", flag.ContinueOnError)
klog.InitFlags(flags)
_ = flags.Set("v", "5")
klog.SetLogger(textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(5), textlogger.Output(&c.LogBuffer))))
// Start server in random port
ln, err := net.Listen("tcp", "0.0.0.0:0")
if err != nil {
t.Fatalf("Failed to find random port for HTTP server: %v", err)
}
c.HttpAddress = ln.Addr().String()
if randomPortErr := ln.Close(); randomPortErr != nil {
t.Fatalf("Failed to close random port listener: %v", randomPortErr)
}
c.StaticConfig.Port = fmt.Sprintf("%d", ln.Addr().(*net.TCPAddr).Port)
mcpServer, err := mcp.NewServer(mcp.Configuration{StaticConfig: c.StaticConfig})
if err != nil {
t.Fatalf("Failed to create MCP server: %v", err)
}
var timeoutCtx, cancelCtx context.Context
timeoutCtx, c.timeoutCancel = context.WithTimeout(t.Context(), 10*time.Second)
group, gc := errgroup.WithContext(timeoutCtx)
cancelCtx, c.StopServer = context.WithCancel(gc)
group.Go(func() error { return Serve(cancelCtx, mcpServer, c.StaticConfig, c.OidcProvider, nil) })
c.WaitForShutdown = group.Wait
// Wait for HTTP server to start (using net)
for i := 0; i < 10; i++ {
conn, err := net.Dial("tcp", c.HttpAddress)
if err == nil {
_ = conn.Close()
break
}
time.Sleep(50 * time.Millisecond) // Wait before retrying
}
}
func (c *httpContext) afterEach(t *testing.T) {
t.Helper()
c.mockServer.Close()
c.StopServer()
err := c.WaitForShutdown()
if err != nil {
t.Errorf("HTTP server did not shut down gracefully: %v", err)
}
c.timeoutCancel()
c.klogState.Restore()
_ = os.Setenv("KUBECONFIG", "")
}
func testCase(t *testing.T, test func(c *httpContext)) {
testCaseWithContext(t, &httpContext{}, test)
}
func testCaseWithContext(t *testing.T, httpCtx *httpContext, test func(c *httpContext)) {
httpCtx.beforeEach(t)
t.Cleanup(func() { httpCtx.afterEach(t) })
test(httpCtx)
}
type OidcTestServer struct {
*rsa.PrivateKey
*oidc.Provider
*httptest.Server
TokenEndpointHandler http.HandlerFunc
}
func NewOidcTestServer(t *testing.T) (oidcTestServer *OidcTestServer) {
t.Helper()
var err error
oidcTestServer = &OidcTestServer{}
oidcTestServer.PrivateKey, err = rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
t.Fatalf("failed to generate private key for oidc: %v", err)
}
oidcServer := &oidctest.Server{
Algorithms: []string{oidc.RS256, oidc.ES256},
PublicKeys: []oidctest.PublicKey{
{
PublicKey: oidcTestServer.Public(),
KeyID: "test-oidc-key-id",
Algorithm: oidc.RS256,
},
},
}
oidcTestServer.Server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/token" && oidcTestServer.TokenEndpointHandler != nil {
oidcTestServer.TokenEndpointHandler.ServeHTTP(w, r)
return
}
oidcServer.ServeHTTP(w, r)
}))
oidcServer.SetIssuer(oidcTestServer.URL)
oidcTestServer.Provider, err = oidc.NewProvider(t.Context(), oidcTestServer.URL)
if err != nil {
t.Fatalf("failed to create OIDC provider: %v", err)
}
return
}
func TestGracefulShutdown(t *testing.T) {
testCase(t, func(ctx *httpContext) {
ctx.StopServer()
err := ctx.WaitForShutdown()
t.Run("Stops gracefully", func(t *testing.T) {
if err != nil {
t.Errorf("Expected graceful shutdown, but got error: %v", err)
}
})
t.Run("Stops on context cancel", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "Context cancelled, initiating graceful shutdown") {
t.Errorf("Context cancelled, initiating graceful shutdown, got: %s", ctx.LogBuffer.String())
}
})
t.Run("Starts server shutdown", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "Shutting down HTTP server gracefully") {
t.Errorf("Expected graceful shutdown log, got: %s", ctx.LogBuffer.String())
}
})
t.Run("Server shutdown completes", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "HTTP server shutdown complete") {
t.Errorf("Expected HTTP server shutdown completed log, got: %s", ctx.LogBuffer.String())
}
})
})
}
func TestSseTransport(t *testing.T) {
testCase(t, func(ctx *httpContext) {
sseResp, sseErr := http.Get(fmt.Sprintf("http://%s/sse", ctx.HttpAddress))
t.Cleanup(func() { _ = sseResp.Body.Close() })
t.Run("Exposes SSE endpoint at /sse", func(t *testing.T) {
if sseErr != nil {
t.Fatalf("Failed to get SSE endpoint: %v", sseErr)
}
if sseResp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", sseResp.StatusCode)
}
})
t.Run("SSE endpoint returns text/event-stream content type", func(t *testing.T) {
if sseResp.Header.Get("Content-Type") != "text/event-stream" {
t.Errorf("Expected Content-Type text/event-stream, got %s", sseResp.Header.Get("Content-Type"))
}
})
responseReader := bufio.NewReader(sseResp.Body)
event, eventErr := responseReader.ReadString('\n')
endpoint, endpointErr := responseReader.ReadString('\n')
t.Run("SSE endpoint returns stream with messages endpoint", func(t *testing.T) {
if eventErr != nil {
t.Fatalf("Failed to read SSE response body (event): %v", eventErr)
}
if event != "event: endpoint\n" {
t.Errorf("Expected SSE event 'endpoint', got %s", event)
}
if endpointErr != nil {
t.Fatalf("Failed to read SSE response body (endpoint): %v", endpointErr)
}
if !strings.HasPrefix(endpoint, "data: /message?sessionId=") {
t.Errorf("Expected SSE data: '/message', got %s", endpoint)
}
})
messageResp, messageErr := http.Post(
fmt.Sprintf("http://%s/message?sessionId=%s", ctx.HttpAddress, strings.TrimSpace(endpoint[25:])),
"application/json",
bytes.NewBufferString("{}"),
)
t.Cleanup(func() { _ = messageResp.Body.Close() })
t.Run("Exposes message endpoint at /message", func(t *testing.T) {
if messageErr != nil {
t.Fatalf("Failed to get message endpoint: %v", messageErr)
}
if messageResp.StatusCode != http.StatusAccepted {
t.Errorf("Expected HTTP 202 OK, got %d", messageResp.StatusCode)
}
})
})
}
func TestStreamableHttpTransport(t *testing.T) {
testCase(t, func(ctx *httpContext) {
mcpGetResp, mcpGetErr := http.Get(fmt.Sprintf("http://%s/mcp", ctx.HttpAddress))
t.Cleanup(func() { _ = mcpGetResp.Body.Close() })
t.Run("Exposes MCP GET endpoint at /mcp", func(t *testing.T) {
if mcpGetErr != nil {
t.Fatalf("Failed to get MCP endpoint: %v", mcpGetErr)
}
if mcpGetResp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", mcpGetResp.StatusCode)
}
})
t.Run("MCP GET endpoint returns text/event-stream content type", func(t *testing.T) {
if mcpGetResp.Header.Get("Content-Type") != "text/event-stream" {
t.Errorf("Expected Content-Type text/event-stream (GET), got %s", mcpGetResp.Header.Get("Content-Type"))
}
})
mcpPostResp, mcpPostErr := http.Post(fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), "application/json", bytes.NewBufferString("{}"))
t.Cleanup(func() { _ = mcpPostResp.Body.Close() })
t.Run("Exposes MCP POST endpoint at /mcp", func(t *testing.T) {
if mcpPostErr != nil {
t.Fatalf("Failed to post to MCP endpoint: %v", mcpPostErr)
}
if mcpPostResp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", mcpPostResp.StatusCode)
}
})
t.Run("MCP POST endpoint returns application/json content type", func(t *testing.T) {
if mcpPostResp.Header.Get("Content-Type") != "application/json" {
t.Errorf("Expected Content-Type application/json (POST), got %s", mcpPostResp.Header.Get("Content-Type"))
}
})
})
}
func TestHealthCheck(t *testing.T) {
testCase(t, func(ctx *httpContext) {
t.Run("Exposes health check endpoint at /healthz", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("http://%s/healthz", ctx.HttpAddress))
if err != nil {
t.Fatalf("Failed to get health check endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close })
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
}
})
})
// Health exposed even when require Authorization
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
resp, err := http.Get(fmt.Sprintf("http://%s/healthz", ctx.HttpAddress))
if err != nil {
t.Fatalf("Failed to get health check endpoint with OAuth: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run("Health check with OAuth returns HTTP 200 OK", func(t *testing.T) {
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
}
})
})
}
func TestWellKnownReverseProxy(t *testing.T) {
cases := []string{
".well-known/oauth-authorization-server",
".well-known/oauth-protected-resource",
".well-known/openid-configuration",
}
// With No Authorization URL configured
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
for _, path := range cases {
resp, err := http.Get(fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path))
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run("Protected resource '"+path+"' without Authorization URL returns 404 - Not Found", func(t *testing.T) {
if err != nil {
t.Fatalf("Failed to get %s endpoint: %v", path, err)
}
if resp.StatusCode != http.StatusNotFound {
t.Errorf("Expected HTTP 404 Not Found, got %d", resp.StatusCode)
}
})
}
})
// With Authorization URL configured but invalid payload
invalidPayloadServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`NOT A JSON PAYLOAD`))
}))
t.Cleanup(invalidPayloadServer.Close)
invalidPayloadConfig := &config.StaticConfig{
AuthorizationURL: invalidPayloadServer.URL,
RequireOAuth: true,
ValidateToken: true,
ClusterProviderStrategy: config.ClusterProviderKubeConfig,
}
testCaseWithContext(t, &httpContext{StaticConfig: invalidPayloadConfig}, func(ctx *httpContext) {
for _, path := range cases {
resp, err := http.Get(fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path))
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run("Protected resource '"+path+"' with invalid Authorization URL payload returns 500 - Internal Server Error", func(t *testing.T) {
if err != nil {
t.Fatalf("Failed to get %s endpoint: %v", path, err)
}
if resp.StatusCode != http.StatusInternalServerError {
t.Errorf("Expected HTTP 500 Internal Server Error, got %d", resp.StatusCode)
}
})
}
})
// With Authorization URL configured and valid payload
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.EscapedPath(), "/.well-known/") {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"issuer": "https://example.com","scopes_supported":["mcp-server"]}`))
}))
t.Cleanup(testServer.Close)
staticConfig := &config.StaticConfig{
AuthorizationURL: testServer.URL,
RequireOAuth: true,
ValidateToken: true,
ClusterProviderStrategy: config.ClusterProviderKubeConfig,
}
testCaseWithContext(t, &httpContext{StaticConfig: staticConfig}, func(ctx *httpContext) {
for _, path := range cases {
resp, err := http.Get(fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path))
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run("Exposes "+path+" endpoint", func(t *testing.T) {
if err != nil {
t.Fatalf("Failed to get %s endpoint: %v", path, err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
}
})
t.Run(path+" returns application/json content type", func(t *testing.T) {
if resp.Header.Get("Content-Type") != "application/json" {
t.Errorf("Expected Content-Type application/json, got %s", resp.Header.Get("Content-Type"))
}
})
}
})
}
func TestWellKnownHeaderPropagation(t *testing.T) {
cases := []string{
".well-known/oauth-authorization-server",
".well-known/oauth-protected-resource",
".well-known/openid-configuration",
}
var receivedRequestHeaders http.Header
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.EscapedPath(), "/.well-known/") {
http.NotFound(w, r)
return
}
// Capture headers received from the proxy
receivedRequestHeaders = r.Header.Clone()
// Set response headers that should be propagated back
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "https://example.com")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("X-Custom-Backend-Header", "backend-value")
_, _ = w.Write([]byte(`{"issuer": "https://example.com"}`))
}))
t.Cleanup(testServer.Close)
staticConfig := &config.StaticConfig{
AuthorizationURL: testServer.URL,
RequireOAuth: true,
ValidateToken: true,
ClusterProviderStrategy: config.ClusterProviderKubeConfig,
}
testCaseWithContext(t, &httpContext{StaticConfig: staticConfig}, func(ctx *httpContext) {
for _, path := range cases {
receivedRequestHeaders = nil
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
// Add various headers to test propagation
req.Header.Set("Origin", "https://example.com")
req.Header.Set("User-Agent", "Test-Agent/1.0")
req.Header.Set("Accept", "application/json")
req.Header.Set("Accept-Language", "en-US")
req.Header.Set("X-Custom-Header", "custom-value")
req.Header.Set("Referer", "https://example.com/page")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get %s endpoint: %v", path, err)
}
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run("Well-known proxy propagates Origin header to backend for "+path, func(t *testing.T) {
if receivedRequestHeaders == nil {
t.Fatal("Backend did not receive any headers")
}
if receivedRequestHeaders.Get("Origin") != "https://example.com" {
t.Errorf("Expected Origin header 'https://example.com', got '%s'", receivedRequestHeaders.Get("Origin"))
}
})
t.Run("Well-known proxy propagates User-Agent header to backend for "+path, func(t *testing.T) {
if receivedRequestHeaders.Get("User-Agent") != "Test-Agent/1.0" {
t.Errorf("Expected User-Agent header 'Test-Agent/1.0', got '%s'", receivedRequestHeaders.Get("User-Agent"))
}
})
t.Run("Well-known proxy propagates Accept header to backend for "+path, func(t *testing.T) {
if receivedRequestHeaders.Get("Accept") != "application/json" {
t.Errorf("Expected Accept header 'application/json', got '%s'", receivedRequestHeaders.Get("Accept"))
}
})
t.Run("Well-known proxy propagates Accept-Language header to backend for "+path, func(t *testing.T) {
if receivedRequestHeaders.Get("Accept-Language") != "en-US" {
t.Errorf("Expected Accept-Language header 'en-US', got '%s'", receivedRequestHeaders.Get("Accept-Language"))
}
})
t.Run("Well-known proxy propagates custom headers to backend for "+path, func(t *testing.T) {
if receivedRequestHeaders.Get("X-Custom-Header") != "custom-value" {
t.Errorf("Expected X-Custom-Header 'custom-value', got '%s'", receivedRequestHeaders.Get("X-Custom-Header"))
}
})
t.Run("Well-known proxy propagates Referer header to backend for "+path, func(t *testing.T) {
if receivedRequestHeaders.Get("Referer") != "https://example.com/page" {
t.Errorf("Expected Referer header 'https://example.com/page', got '%s'", receivedRequestHeaders.Get("Referer"))
}
})
t.Run("Well-known proxy returns Access-Control-Allow-Origin from backend for "+path, func(t *testing.T) {
if resp.Header.Get("Access-Control-Allow-Origin") != "https://example.com" {
t.Errorf("Expected Access-Control-Allow-Origin header 'https://example.com', got '%s'", resp.Header.Get("Access-Control-Allow-Origin"))
}
})
t.Run("Well-known proxy returns Access-Control-Allow-Methods from backend for "+path, func(t *testing.T) {
if resp.Header.Get("Access-Control-Allow-Methods") != "GET, POST, OPTIONS" {
t.Errorf("Expected Access-Control-Allow-Methods header 'GET, POST, OPTIONS', got '%s'", resp.Header.Get("Access-Control-Allow-Methods"))
}
})
t.Run("Well-known proxy returns Cache-Control from backend for "+path, func(t *testing.T) {
if resp.Header.Get("Cache-Control") != "no-cache" {
t.Errorf("Expected Cache-Control header 'no-cache', got '%s'", resp.Header.Get("Cache-Control"))
}
})
t.Run("Well-known proxy returns custom response headers from backend for "+path, func(t *testing.T) {
if resp.Header.Get("X-Custom-Backend-Header") != "backend-value" {
t.Errorf("Expected X-Custom-Backend-Header 'backend-value', got '%s'", resp.Header.Get("X-Custom-Backend-Header"))
}
})
}
})
}
func TestWellKnownOverrides(t *testing.T) {
cases := []string{
".well-known/oauth-authorization-server",
".well-known/oauth-protected-resource",
".well-known/openid-configuration",
}
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.EscapedPath(), "/.well-known/") {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`
{
"issuer": "https://localhost",
"registration_endpoint": "https://localhost/clients-registrations/openid-connect",
"require_request_uri_registration": true,
"scopes_supported":["scope-1", "scope-2"]
}`))
}))
t.Cleanup(testServer.Close)
baseConfig := config.StaticConfig{
AuthorizationURL: testServer.URL,
RequireOAuth: true,
ValidateToken: true,
ClusterProviderStrategy: config.ClusterProviderKubeConfig,
}
// With Dynamic Client Registration disabled
disableDynamicRegistrationConfig := baseConfig
disableDynamicRegistrationConfig.DisableDynamicClientRegistration = true
testCaseWithContext(t, &httpContext{StaticConfig: &disableDynamicRegistrationConfig}, func(ctx *httpContext) {
for _, path := range cases {
resp, _ := http.Get(fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path))
t.Cleanup(func() { _ = resp.Body.Close() })
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Failed to read response body: %v", err)
}
t.Run("DisableDynamicClientRegistration removes registration_endpoint field", func(t *testing.T) {
if strings.Contains(string(body), "registration_endpoint") {
t.Error("Expected registration_endpoint to be removed, but it was found in the response")
}
})
t.Run("DisableDynamicClientRegistration sets require_request_uri_registration = false", func(t *testing.T) {
if !strings.Contains(string(body), `"require_request_uri_registration":false`) {
t.Error("Expected require_request_uri_registration to be false, but it was not found in the response")
}
})
t.Run("DisableDynamicClientRegistration includes/preserves scopes_supported", func(t *testing.T) {
if !strings.Contains(string(body), `"scopes_supported":["scope-1","scope-2"]`) {
t.Error("Expected scopes_supported to be present, but it was not found in the response")
}
})
}
})
// With overrides for OAuth scopes (client/frontend)
oAuthScopesConfig := baseConfig
oAuthScopesConfig.OAuthScopes = []string{"openid", "mcp-server"}
testCaseWithContext(t, &httpContext{StaticConfig: &oAuthScopesConfig}, func(ctx *httpContext) {
for _, path := range cases {
resp, _ := http.Get(fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path))
t.Cleanup(func() { _ = resp.Body.Close() })
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Failed to read response body: %v", err)
}
t.Run("OAuthScopes overrides scopes_supported", func(t *testing.T) {
if !strings.Contains(string(body), `"scopes_supported":["openid","mcp-server"]`) {
t.Errorf("Expected scopes_supported to be overridden, but original was preserved, response: %s", string(body))
}
})
t.Run("OAuthScopes preserves other fields", func(t *testing.T) {
if !strings.Contains(string(body), `"issuer":"https://localhost"`) {
t.Errorf("Expected issuer to be preserved, but got: %s", string(body))
}
if !strings.Contains(string(body), `"registration_endpoint":"https://localhost`) {
t.Errorf("Expected registration_endpoint to be preserved, but got: %s", string(body))
}
if !strings.Contains(string(body), `"require_request_uri_registration":true`) {
t.Error("Expected require_request_uri_registration to be true, but it was not found in the response")
}
})
}
})
}
func TestMiddlewareLogging(t *testing.T) {
testCase(t, func(ctx *httpContext) {
_, _ = http.Get(fmt.Sprintf("http://%s/.well-known/oauth-protected-resource", ctx.HttpAddress))
t.Run("Logs HTTP requests and responses", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "GET /.well-known/oauth-protected-resource 404") {
t.Errorf("Expected log entry for GET /.well-known/oauth-protected-resource, got: %s", ctx.LogBuffer.String())
}
})
t.Run("Logs HTTP request duration", func(t *testing.T) {
expected := `"GET /.well-known/oauth-protected-resource 404 (.+)"`
m := regexp.MustCompile(expected).FindStringSubmatch(ctx.LogBuffer.String())
if len(m) != 2 {
t.Fatalf("Expected log entry to contain duration, got %s", ctx.LogBuffer.String())
}
duration, err := time.ParseDuration(m[1])
if err != nil {
t.Fatalf("Failed to parse duration from log entry: %v", err)
}
if duration < 0 {
t.Errorf("Expected duration to be non-negative, got %v", duration)
}
})
})
}
func TestAuthorizationUnauthorized(t *testing.T) {
// Missing Authorization header
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
resp, err := http.Get(fmt.Sprintf("http://%s/mcp", ctx.HttpAddress))
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close })
t.Run("Protected resource with MISSING Authorization header returns 401 - Unauthorized", func(t *testing.T) {
if resp.StatusCode != 401 {
t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
}
})
t.Run("Protected resource with MISSING Authorization header returns WWW-Authenticate header", func(t *testing.T) {
authHeader := resp.Header.Get("WWW-Authenticate")
expected := `Bearer realm="Kubernetes MCP Server", error="missing_token"`
if authHeader != expected {
t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
}
})
t.Run("Protected resource with MISSING Authorization header logs error", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - missing or invalid bearer token") {
t.Errorf("Expected log entry for missing or invalid bearer token, got: %s", ctx.LogBuffer.String())
}
})
})
// Authorization header without Bearer prefix
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Authorization", "Basic YWxhZGRpbjpvcGVuc2VzYW1l")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close })
t.Run("Protected resource with INCOMPATIBLE Authorization header returns WWW-Authenticate header", func(t *testing.T) {
authHeader := resp.Header.Get("WWW-Authenticate")
expected := `Bearer realm="Kubernetes MCP Server", error="missing_token"`
if authHeader != expected {
t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
}
})
t.Run("Protected resource with INCOMPATIBLE Authorization header logs error", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - missing or invalid bearer token") {
t.Errorf("Expected log entry for missing or invalid bearer token, got: %s", ctx.LogBuffer.String())
}
})
})
// Invalid Authorization header
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Authorization", "Bearer "+strings.ReplaceAll(tokenBasicNotExpired, ".", ".invalid"))
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close })
t.Run("Protected resource with INVALID Authorization header returns 401 - Unauthorized", func(t *testing.T) {
if resp.StatusCode != 401 {
t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
}
})
t.Run("Protected resource with INVALID Authorization header returns WWW-Authenticate header", func(t *testing.T) {
authHeader := resp.Header.Get("WWW-Authenticate")
expected := `Bearer realm="Kubernetes MCP Server", error="invalid_token"`
if authHeader != expected {
t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
}
})
t.Run("Protected resource with INVALID Authorization header logs error", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - JWT validation error") ||
!strings.Contains(ctx.LogBuffer.String(), "error: failed to parse JWT token: illegal base64 data") {
t.Errorf("Expected log entry for JWT validation error, got: %s", ctx.LogBuffer.String())
}
})
})
// Expired Authorization Bearer token
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Authorization", "Bearer "+tokenBasicExpired)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close })
t.Run("Protected resource with EXPIRED Authorization header returns 401 - Unauthorized", func(t *testing.T) {
if resp.StatusCode != 401 {
t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
}
})
t.Run("Protected resource with EXPIRED Authorization header returns WWW-Authenticate header", func(t *testing.T) {
authHeader := resp.Header.Get("WWW-Authenticate")
expected := `Bearer realm="Kubernetes MCP Server", error="invalid_token"`
if authHeader != expected {
t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
}
})
t.Run("Protected resource with EXPIRED Authorization header logs error", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - JWT validation error") ||
!strings.Contains(ctx.LogBuffer.String(), "validation failed, token is expired (exp)") {
t.Errorf("Expected log entry for JWT validation error, got: %s", ctx.LogBuffer.String())
}
})
})
// Invalid audience claim Bearer token
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, OAuthAudience: "expected-audience", ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Authorization", "Bearer "+tokenBasicExpired)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close })
t.Run("Protected resource with INVALID AUDIENCE Authorization header returns 401 - Unauthorized", func(t *testing.T) {
if resp.StatusCode != 401 {
t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
}
})
t.Run("Protected resource with INVALID AUDIENCE Authorization header returns WWW-Authenticate header", func(t *testing.T) {
authHeader := resp.Header.Get("WWW-Authenticate")
expected := `Bearer realm="Kubernetes MCP Server", audience="expected-audience", error="invalid_token"`
if authHeader != expected {
t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
}
})
t.Run("Protected resource with INVALID AUDIENCE Authorization header logs error", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - JWT validation error") ||
!strings.Contains(ctx.LogBuffer.String(), "invalid audience claim (aud)") {
t.Errorf("Expected log entry for JWT validation error, got: %s", ctx.LogBuffer.String())
}
})
})
// Failed OIDC validation
oidcTestServer := NewOidcTestServer(t)
t.Cleanup(oidcTestServer.Close)
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, OAuthAudience: "mcp-server", ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}, OidcProvider: oidcTestServer.Provider}, func(ctx *httpContext) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Authorization", "Bearer "+tokenBasicNotExpired)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close })
t.Run("Protected resource with INVALID OIDC Authorization header returns 401 - Unauthorized", func(t *testing.T) {
if resp.StatusCode != 401 {
t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
}
})
t.Run("Protected resource with INVALID OIDC Authorization header returns WWW-Authenticate header", func(t *testing.T) {
authHeader := resp.Header.Get("WWW-Authenticate")
expected := `Bearer realm="Kubernetes MCP Server", audience="mcp-server", error="invalid_token"`
if authHeader != expected {
t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
}
})
t.Run("Protected resource with INVALID OIDC Authorization header logs error", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - JWT validation error") ||
!strings.Contains(ctx.LogBuffer.String(), "OIDC token validation error: failed to verify signature") {
t.Errorf("Expected log entry for OIDC validation error, got: %s", ctx.LogBuffer.String())
}
})
})
// Failed Kubernetes TokenReview
rawClaims := `{
"iss": "` + oidcTestServer.URL + `",
"exp": ` + strconv.FormatInt(time.Now().Add(time.Hour).Unix(), 10) + `,
"aud": "mcp-server"
}`
validOidcToken := oidctest.SignIDToken(oidcTestServer.PrivateKey, "test-oidc-key-id", oidc.RS256, rawClaims)
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, OAuthAudience: "mcp-server", ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}, OidcProvider: oidcTestServer.Provider}, func(ctx *httpContext) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Authorization", "Bearer "+validOidcToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close })
t.Run("Protected resource with INVALID KUBERNETES Authorization header returns 401 - Unauthorized", func(t *testing.T) {
if resp.StatusCode != 401 {
t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
}
})
t.Run("Protected resource with INVALID KUBERNETES Authorization header returns WWW-Authenticate header", func(t *testing.T) {
authHeader := resp.Header.Get("WWW-Authenticate")
expected := `Bearer realm="Kubernetes MCP Server", audience="mcp-server", error="invalid_token"`
if authHeader != expected {
t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
}
})
t.Run("Protected resource with INVALID KUBERNETES Authorization header logs error", func(t *testing.T) {
if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - JWT validation error") ||
!strings.Contains(ctx.LogBuffer.String(), "kubernetes API token validation error: failed to create token review") {
t.Errorf("Expected log entry for Kubernetes TokenReview error, got: %s", ctx.LogBuffer.String())
}
})
})
}
func TestAuthorizationRequireOAuthFalse(t *testing.T) {
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: false, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
resp, err := http.Get(fmt.Sprintf("http://%s/mcp", ctx.HttpAddress))
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run("Protected resource with MISSING Authorization header returns 200 - OK)", func(t *testing.T) {
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
}
})
})
}
func TestAuthorizationRawToken(t *testing.T) {
cases := []struct {
audience string
validateToken bool
}{
{"", false}, // No audience, no validation
{"", true}, // No audience, validation enabled
{"mcp-server", false}, // Audience set, no validation
{"mcp-server", true}, // Audience set, validation enabled
}
for _, c := range cases {
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, OAuthAudience: c.audience, ValidateToken: c.validateToken, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
tokenReviewed := false
ctx.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.EscapedPath() == "/apis/authentication.k8s.io/v1/tokenreviews" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(tokenReviewSuccessful))
tokenReviewed = true
return
}
}))
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Authorization", "Bearer "+tokenBasicNotExpired)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run(fmt.Sprintf("Protected resource with audience = '%s' and validate-token = '%t', with VALID Authorization header returns 200 - OK", c.audience, c.validateToken), func(t *testing.T) {
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
}
})
t.Run(fmt.Sprintf("Protected resource with audience = '%s' and validate-token = '%t', with VALID Authorization header performs token validation accordingly", c.audience, c.validateToken), func(t *testing.T) {
if tokenReviewed == true && !c.validateToken {
t.Errorf("Expected token review to be skipped when validate-token is false, but it was performed")
}
if tokenReviewed == false && c.validateToken {
t.Errorf("Expected token review to be performed when validate-token is true, but it was skipped")
}
})
})
}
}
func TestAuthorizationOidcToken(t *testing.T) {
oidcTestServer := NewOidcTestServer(t)
t.Cleanup(oidcTestServer.Close)
rawClaims := `{
"iss": "` + oidcTestServer.URL + `",
"exp": ` + strconv.FormatInt(time.Now().Add(time.Hour).Unix(), 10) + `,
"aud": "mcp-server"
}`
validOidcToken := oidctest.SignIDToken(oidcTestServer.PrivateKey, "test-oidc-key-id", oidc.RS256, rawClaims)
cases := []bool{false, true}
for _, validateToken := range cases {
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, OAuthAudience: "mcp-server", ValidateToken: validateToken, ClusterProviderStrategy: config.ClusterProviderKubeConfig}, OidcProvider: oidcTestServer.Provider}, func(ctx *httpContext) {
tokenReviewed := false
ctx.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.EscapedPath() == "/apis/authentication.k8s.io/v1/tokenreviews" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(tokenReviewSuccessful))
tokenReviewed = true
return
}
}))
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Authorization", "Bearer "+validOidcToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run(fmt.Sprintf("Protected resource with validate-token='%t' with VALID OIDC Authorization header returns 200 - OK", validateToken), func(t *testing.T) {
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
}
})
t.Run(fmt.Sprintf("Protected resource with validate-token='%t' with VALID OIDC Authorization header performs token validation accordingly", validateToken), func(t *testing.T) {
if tokenReviewed == true && !validateToken {
t.Errorf("Expected token review to be skipped when validate-token is false, but it was performed")
}
if tokenReviewed == false && validateToken {
t.Errorf("Expected token review to be performed when validate-token is true, but it was skipped")
}
})
})
}
}
func TestAuthorizationOidcTokenExchange(t *testing.T) {
oidcTestServer := NewOidcTestServer(t)
t.Cleanup(oidcTestServer.Close)
rawClaims := `{
"iss": "` + oidcTestServer.URL + `",
"exp": ` + strconv.FormatInt(time.Now().Add(time.Hour).Unix(), 10) + `,
"aud": "%s"
}`
validOidcClientToken := oidctest.SignIDToken(oidcTestServer.PrivateKey, "test-oidc-key-id", oidc.RS256,
fmt.Sprintf(rawClaims, "mcp-server"))
validOidcBackendToken := oidctest.SignIDToken(oidcTestServer.PrivateKey, "test-oidc-key-id", oidc.RS256,
fmt.Sprintf(rawClaims, "backend-audience"))
oidcTestServer.TokenEndpointHandler = func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = fmt.Fprintf(w, `{"access_token":"%s","token_type":"Bearer","expires_in":253402297199}`, validOidcBackendToken)
}
cases := []bool{false, true}
for _, validateToken := range cases {
staticConfig := &config.StaticConfig{
RequireOAuth: true,
OAuthAudience: "mcp-server",
ValidateToken: validateToken,
StsClientId: "test-sts-client-id",
StsClientSecret: "test-sts-client-secret",
StsAudience: "backend-audience",
StsScopes: []string{"backend-scope"},
ClusterProviderStrategy: config.ClusterProviderKubeConfig,
}
testCaseWithContext(t, &httpContext{StaticConfig: staticConfig, OidcProvider: oidcTestServer.Provider}, func(ctx *httpContext) {
tokenReviewed := false
ctx.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.EscapedPath() == "/apis/authentication.k8s.io/v1/tokenreviews" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(tokenReviewSuccessful))
tokenReviewed = true
return
}
}))
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Authorization", "Bearer "+validOidcClientToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run(fmt.Sprintf("Protected resource with validate-token='%t' with VALID OIDC EXCHANGE Authorization header returns 200 - OK", validateToken), func(t *testing.T) {
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
}
})
t.Run(fmt.Sprintf("Protected resource with validate-token='%t' with VALID OIDC EXCHANGE Authorization header performs token validation accordingly", validateToken), func(t *testing.T) {
if tokenReviewed == true && !validateToken {
t.Errorf("Expected token review to be skipped when validate-token is false, but it was performed")
}
if tokenReviewed == false && validateToken {
t.Errorf("Expected token review to be performed when validate-token is true, but it was skipped")
}
})
})
}
}
```