This is page 4 of 4. Use http://codebase.md/manusa/kubernetes-mcp-server?page={x} to view the full context.
# Directory Structure
```
├── .github
│ ├── dependabot.yml
│ └── workflows
│ ├── build.yaml
│ ├── release-image.yml
│ └── release.yaml
├── .gitignore
├── AGENTS.md
├── build
│ ├── keycloak.mk
│ ├── kind.mk
│ └── tools.mk
├── CLAUDE.md
├── cmd
│ └── kubernetes-mcp-server
│ ├── main_test.go
│ └── main.go
├── dev
│ └── config
│ ├── cert-manager
│ │ └── selfsigned-issuer.yaml
│ ├── ingress
│ │ └── nginx-ingress.yaml
│ ├── keycloak
│ │ ├── client-scopes
│ │ │ ├── groups.json
│ │ │ ├── mcp-openshift.json
│ │ │ └── mcp-server.json
│ │ ├── clients
│ │ │ ├── mcp-client.json
│ │ │ ├── mcp-server-update.json
│ │ │ ├── mcp-server.json
│ │ │ └── openshift.json
│ │ ├── deployment.yaml
│ │ ├── ingress.yaml
│ │ ├── mappers
│ │ │ ├── groups-membership.json
│ │ │ ├── mcp-server-audience.json
│ │ │ ├── openshift-audience.json
│ │ │ └── username.json
│ │ ├── rbac.yaml
│ │ ├── realm
│ │ │ ├── realm-create.json
│ │ │ └── realm-events-config.json
│ │ └── users
│ │ └── mcp.json
│ └── kind
│ └── cluster.yaml
├── Dockerfile
├── docs
│ └── images
│ ├── kubernetes-mcp-server-github-copilot.jpg
│ └── vibe-coding.jpg
├── go.mod
├── go.sum
├── hack
│ └── generate-placeholder-ca.sh
├── internal
│ ├── test
│ │ ├── env.go
│ │ ├── kubernetes.go
│ │ ├── mcp.go
│ │ ├── mock_server.go
│ │ └── test.go
│ └── tools
│ └── update-readme
│ └── main.go
├── LICENSE
├── Makefile
├── npm
│ ├── kubernetes-mcp-server
│ │ ├── bin
│ │ │ └── index.js
│ │ └── package.json
│ ├── kubernetes-mcp-server-darwin-amd64
│ │ └── package.json
│ ├── kubernetes-mcp-server-darwin-arm64
│ │ └── package.json
│ ├── kubernetes-mcp-server-linux-amd64
│ │ └── package.json
│ ├── kubernetes-mcp-server-linux-arm64
│ │ └── package.json
│ ├── kubernetes-mcp-server-windows-amd64
│ │ └── package.json
│ └── kubernetes-mcp-server-windows-arm64
│ └── package.json
├── pkg
│ ├── api
│ │ ├── toolsets_test.go
│ │ └── toolsets.go
│ ├── config
│ │ ├── config_default_overrides.go
│ │ ├── config_default.go
│ │ ├── config_test.go
│ │ ├── config.go
│ │ ├── provider_config_test.go
│ │ └── provider_config.go
│ ├── helm
│ │ └── helm.go
│ ├── http
│ │ ├── authorization_test.go
│ │ ├── authorization.go
│ │ ├── http_test.go
│ │ ├── http.go
│ │ ├── middleware.go
│ │ ├── sts_test.go
│ │ ├── sts.go
│ │ └── wellknown.go
│ ├── kubernetes
│ │ ├── accesscontrol_clientset.go
│ │ ├── accesscontrol_restmapper.go
│ │ ├── accesscontrol.go
│ │ ├── common_test.go
│ │ ├── configuration.go
│ │ ├── events.go
│ │ ├── impersonate_roundtripper.go
│ │ ├── kubernetes_derived_test.go
│ │ ├── kubernetes.go
│ │ ├── manager_test.go
│ │ ├── manager.go
│ │ ├── namespaces.go
│ │ ├── nodes.go
│ │ ├── openshift.go
│ │ ├── pods.go
│ │ ├── provider_kubeconfig_test.go
│ │ ├── provider_kubeconfig.go
│ │ ├── provider_registry_test.go
│ │ ├── provider_registry.go
│ │ ├── provider_single_test.go
│ │ ├── provider_single.go
│ │ ├── provider_test.go
│ │ ├── provider.go
│ │ ├── resources.go
│ │ └── token.go
│ ├── kubernetes-mcp-server
│ │ └── cmd
│ │ ├── root_test.go
│ │ ├── root.go
│ │ └── testdata
│ │ ├── empty-config.toml
│ │ └── valid-config.toml
│ ├── mcp
│ │ ├── common_test.go
│ │ ├── configuration_test.go
│ │ ├── events_test.go
│ │ ├── helm_test.go
│ │ ├── m3labs.go
│ │ ├── mcp_middleware_test.go
│ │ ├── mcp_test.go
│ │ ├── mcp_tools_test.go
│ │ ├── mcp.go
│ │ ├── modules.go
│ │ ├── namespaces_test.go
│ │ ├── nodes_test.go
│ │ ├── pods_exec_test.go
│ │ ├── pods_test.go
│ │ ├── pods_top_test.go
│ │ ├── resources_test.go
│ │ ├── testdata
│ │ │ ├── helm-chart-no-op
│ │ │ │ └── Chart.yaml
│ │ │ ├── helm-chart-secret
│ │ │ │ ├── Chart.yaml
│ │ │ │ └── templates
│ │ │ │ └── secret.yaml
│ │ │ ├── toolsets-config-tools.json
│ │ │ ├── toolsets-core-tools.json
│ │ │ ├── toolsets-full-tools-multicluster-enum.json
│ │ │ ├── toolsets-full-tools-multicluster.json
│ │ │ ├── toolsets-full-tools-openshift.json
│ │ │ ├── toolsets-full-tools.json
│ │ │ └── toolsets-helm-tools.json
│ │ ├── tool_filter_test.go
│ │ ├── tool_filter.go
│ │ ├── tool_mutator_test.go
│ │ ├── tool_mutator.go
│ │ └── toolsets_test.go
│ ├── output
│ │ ├── output_test.go
│ │ └── output.go
│ ├── toolsets
│ │ ├── config
│ │ │ ├── configuration.go
│ │ │ └── toolset.go
│ │ ├── core
│ │ │ ├── events.go
│ │ │ ├── namespaces.go
│ │ │ ├── nodes.go
│ │ │ ├── pods.go
│ │ │ ├── resources.go
│ │ │ └── toolset.go
│ │ ├── helm
│ │ │ ├── helm.go
│ │ │ └── toolset.go
│ │ ├── toolsets_test.go
│ │ └── toolsets.go
│ └── version
│ └── version.go
├── python
│ ├── kubernetes_mcp_server
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ └── kubernetes_mcp_server.py
│ ├── pyproject.toml
│ └── README.md
├── README.md
└── smithery.yaml
```
# Files
--------------------------------------------------------------------------------
/pkg/mcp/resources_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"regexp"
"strings"
"testing"
"github.com/mark3labs/mcp-go/mcp"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/yaml"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/output"
)
// TestResourcesList exercises the resources_list tool: argument validation
// errors, listing namespaces as YAML, and filtering pods by label selector.
func TestResourcesList(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		t.Run("resources_list with missing apiVersion returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_list", map[string]interface{}{})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to list resources, missing argument apiVersion" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("resources_list with missing kind returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1"})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to list resources, missing argument kind" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("resources_list with invalid apiVersion returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "invalid/api/version", "kind": "Pod"})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to list resources, invalid argument apiVersion" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("resources_list with nonexistent apiVersion returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "custom.non.existent.example.com/v1", "kind": "Custom"})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != `failed to list resources: no matches for kind "Custom" in version "custom.non.existent.example.com/v1"` {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		namespaces, err := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace"})
		t.Run("resources_list returns namespaces", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if namespaces.IsError {
				t.Fatalf("call tool failed")
			}
		})
		var decodedNamespaces []unstructured.Unstructured
		t.Run("resources_list has yaml content", func(t *testing.T) {
			// Decode inside the subtest and only after confirming the call
			// succeeded, so a failed call is reported as a test failure rather
			// than dereferencing a result that may not be populated.
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if len(namespaces.Content) == 0 {
				t.Fatalf("tool result has no content")
			}
			if err := yaml.Unmarshal([]byte(namespaces.Content[0].(mcp.TextContent).Text), &decodedNamespaces); err != nil {
				t.Fatalf("invalid tool result content %v", err)
			}
		})
		t.Run("resources_list returns more than 2 items", func(t *testing.T) {
			if len(decodedNamespaces) < 3 {
				t.Fatalf("invalid namespace count, expected >2, got %v", len(decodedNamespaces))
			}
		})
		// Test label selector functionality
		t.Run("resources_list with label selector returns filtered pods", func(t *testing.T) {
			// List pods with label selector
			result, err := c.callTool("resources_list", map[string]interface{}{
				"apiVersion":    "v1",
				"kind":          "Pod",
				"namespace":     "default",
				"labelSelector": "app=nginx",
			})
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if result.IsError {
				t.Fatalf("call tool failed")
			}
			var decodedPods []unstructured.Unstructured
			err = yaml.Unmarshal([]byte(result.Content[0].(mcp.TextContent).Text), &decodedPods)
			if err != nil {
				t.Fatalf("invalid tool result content %v", err)
			}
			// Verify only the pod with matching label is returned
			if len(decodedPods) != 1 {
				t.Fatalf("expected 1 pod, got %d", len(decodedPods))
			}
			if decodedPods[0].GetName() != "a-pod-in-default" {
				// The message names the pod actually asserted on.
				t.Fatalf("expected a-pod-in-default, got %s", decodedPods[0].GetName())
			}
			// Test that multiple label selectors work
			result, err = c.callTool("resources_list", map[string]interface{}{
				"apiVersion":    "v1",
				"kind":          "Pod",
				"namespace":     "default",
				"labelSelector": "test-label=test-value,another=value",
			})
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if result.IsError {
				t.Fatalf("call tool failed")
			}
			err = yaml.Unmarshal([]byte(result.Content[0].(mcp.TextContent).Text), &decodedPods)
			if err != nil {
				t.Fatalf("invalid tool result content %v", err)
			}
			// Verify no pods match multiple label selector
			if len(decodedPods) != 0 {
				t.Fatalf("expected 0 pods, got %d", len(decodedPods))
			}
		})
	})
}
// TestResourcesListDenied verifies that resources_list rejects resources
// matched by the denied_resources configuration (by kind and by group) with a
// descriptive error, while still listing resources that are not denied.
func TestResourcesListDenied(t *testing.T) {
	deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [
{ version = "v1", kind = "Secret" },
{ group = "rbac.authorization.k8s.io", version = "v1" }
]
`)))
	testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
		c.withEnvTest()
		deniedByKind, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "Secret"})
		t.Run("resources_list (denied by kind) has error", func(t *testing.T) {
			if !deniedByKind.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("resources_list (denied by kind) describes denial", func(t *testing.T) {
			expectedMessage := "failed to list resources: resource not allowed: /v1, Kind=Secret"
			if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
				t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
			}
		})
		deniedByGroup, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "Role"})
		t.Run("resources_list (denied by group) has error", func(t *testing.T) {
			if !deniedByGroup.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("resources_list (denied by group) describes denial", func(t *testing.T) {
			expectedMessage := "failed to list resources: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
			if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
				// Report the group-denied result, which is the value under test.
				t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByGroup.Content[0].(mcp.TextContent).Text)
			}
		})
		allowedResource, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace"})
		t.Run("resources_list (not denied) returns list", func(t *testing.T) {
			if allowedResource.IsError {
				t.Fatalf("call tool should not fail")
			}
		})
	})
}
func TestResourcesListAsTable(t *testing.T) {
testCaseWithContext(t, &mcpContext{listOutput: output.Table, before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
c.withEnvTest()
kc := c.newKubernetesClient()
_, _ = kc.CoreV1().ConfigMaps("default").Create(t.Context(), &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: "a-configmap-to-list-as-table", Labels: map[string]string{"resource": "config-map"}},
Data: map[string]string{"key": "value"},
}, metav1.CreateOptions{})
configMapList, err := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "ConfigMap"})
t.Run("resources_list returns ConfigMap list", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
}
if configMapList.IsError {
t.Fatalf("call tool failed")
}
})
outConfigMapList := configMapList.Content[0].(mcp.TextContent).Text
t.Run("resources_list returns column headers for ConfigMap list", func(t *testing.T) {
expectedHeaders := "NAMESPACE\\s+APIVERSION\\s+KIND\\s+NAME\\s+DATA\\s+AGE\\s+LABELS"
if m, e := regexp.MatchString(expectedHeaders, outConfigMapList); !m || e != nil {
t.Fatalf("Expected headers '%s' not found in output:\n%s", expectedHeaders, outConfigMapList)
}
})
t.Run("resources_list returns formatted row for a-configmap-to-list-as-table", func(t *testing.T) {
expectedRow := "(?<namespace>default)\\s+" +
"(?<apiVersion>v1)\\s+" +
"(?<kind>ConfigMap)\\s+" +
"(?<name>a-configmap-to-list-as-table)\\s+" +
"(?<data>1)\\s+" +
"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
"(?<labels>resource=config-map)"
if m, e := regexp.MatchString(expectedRow, outConfigMapList); !m || e != nil {
t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRow, outConfigMapList)
}
})
// Custom Resource List
_, _ = dynamic.NewForConfigOrDie(envTestRestConfig).
Resource(schema.GroupVersionResource{Group: "route.openshift.io", Version: "v1", Resource: "routes"}).
Namespace("default").
Create(c.ctx, &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "route.openshift.io/v1",
"kind": "Route",
"metadata": map[string]interface{}{
"name": "an-openshift-route-to-list-as-table",
},
}}, metav1.CreateOptions{})
routeList, err := c.callTool("resources_list", map[string]interface{}{"apiVersion": "route.openshift.io/v1", "kind": "Route"})
t.Run("resources_list returns Route list", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
}
if routeList.IsError {
t.Fatalf("call tool failed")
}
})
outRouteList := routeList.Content[0].(mcp.TextContent).Text
t.Run("resources_list returns column headers for Route list", func(t *testing.T) {
expectedHeaders := "NAMESPACE\\s+APIVERSION\\s+KIND\\s+NAME\\s+AGE\\s+LABELS"
if m, e := regexp.MatchString(expectedHeaders, outRouteList); !m || e != nil {
t.Fatalf("Expected headers '%s' not found in output:\n%s", expectedHeaders, outRouteList)
}
})
t.Run("resources_list returns formatted row for an-openshift-route-to-list-as-table", func(t *testing.T) {
expectedRow := "(?<namespace>default)\\s+" +
"(?<apiVersion>route.openshift.io/v1)\\s+" +
"(?<kind>Route)\\s+" +
"(?<name>an-openshift-route-to-list-as-table)\\s+" +
"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
"(?<labels><none>)"
if m, e := regexp.MatchString(expectedRow, outRouteList); !m || e != nil {
t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRow, outRouteList)
}
})
})
}
// TestResourcesGet covers the resources_get tool: each argument validation
// error, and a successful fetch of the "default" Namespace returned as YAML.
func TestResourcesGet(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		t.Run("resources_get with missing apiVersion returns error", func(t *testing.T) {
			result, _ := c.callTool("resources_get", map[string]interface{}{})
			if !result.IsError {
				t.Fatalf("call tool should fail")
			}
			if got := result.Content[0].(mcp.TextContent).Text; got != "failed to get resource, missing argument apiVersion" {
				t.Fatalf("invalid error message, got %v", got)
			}
		})
		t.Run("resources_get with missing kind returns error", func(t *testing.T) {
			result, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1"})
			if !result.IsError {
				t.Fatalf("call tool should fail")
			}
			if got := result.Content[0].(mcp.TextContent).Text; got != "failed to get resource, missing argument kind" {
				t.Fatalf("invalid error message, got %v", got)
			}
		})
		t.Run("resources_get with invalid apiVersion returns error", func(t *testing.T) {
			result, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "invalid/api/version", "kind": "Pod", "name": "a-pod"})
			if !result.IsError {
				t.Fatalf("call tool should fail")
			}
			if got := result.Content[0].(mcp.TextContent).Text; got != "failed to get resource, invalid argument apiVersion" {
				t.Fatalf("invalid error message, got %v", got)
			}
		})
		t.Run("resources_get with nonexistent apiVersion returns error", func(t *testing.T) {
			result, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "custom.non.existent.example.com/v1", "kind": "Custom", "name": "a-custom"})
			if !result.IsError {
				t.Fatalf("call tool should fail")
			}
			if got := result.Content[0].(mcp.TextContent).Text; got != `failed to get resource: no matches for kind "Custom" in version "custom.non.existent.example.com/v1"` {
				t.Fatalf("invalid error message, got %v", got)
			}
		})
		t.Run("resources_get with missing name returns error", func(t *testing.T) {
			result, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace"})
			if !result.IsError {
				t.Fatalf("call tool should fail")
			}
			if got := result.Content[0].(mcp.TextContent).Text; got != "failed to get resource, missing argument name" {
				t.Fatalf("invalid error message, got %v", got)
			}
		})

		// Happy path: fetch the default namespace and decode the YAML payload.
		namespace, err := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace", "name": "default"})
		t.Run("resources_get returns namespace", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if namespace.IsError {
				t.Fatalf("call tool failed")
			}
		})
		var decodedNamespace unstructured.Unstructured
		err = yaml.Unmarshal([]byte(namespace.Content[0].(mcp.TextContent).Text), &decodedNamespace)
		t.Run("resources_get has yaml content", func(t *testing.T) {
			if err != nil {
				t.Fatalf("invalid tool result content %v", err)
			}
		})
		t.Run("resources_get returns default namespace", func(t *testing.T) {
			if name := decodedNamespace.GetName(); name != "default" {
				t.Fatalf("invalid namespace name, expected default, got %v", name)
			}
		})
	})
}
// TestResourcesGetDenied verifies that resources_get rejects resources matched
// by the denied_resources configuration (by kind and by group) with a
// descriptive error, while still returning resources that are not denied.
func TestResourcesGetDenied(t *testing.T) {
	deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [
{ version = "v1", kind = "Secret" },
{ group = "rbac.authorization.k8s.io", version = "v1" }
]
`)))
	testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
		c.withEnvTest()
		kc := c.newKubernetesClient()
		// Fixtures are created directly through the client so that the denied
		// resources actually exist; creation errors are ignored here.
		_, _ = kc.CoreV1().Secrets("default").Create(c.ctx, &corev1.Secret{
			ObjectMeta: metav1.ObjectMeta{Name: "denied-secret"},
		}, metav1.CreateOptions{})
		_, _ = kc.RbacV1().Roles("default").Create(c.ctx, &v1.Role{
			ObjectMeta: metav1.ObjectMeta{Name: "denied-role"},
		}, metav1.CreateOptions{})
		deniedByKind, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1", "kind": "Secret", "namespace": "default", "name": "denied-secret"})
		t.Run("resources_get (denied by kind) has error", func(t *testing.T) {
			if !deniedByKind.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("resources_get (denied by kind) describes denial", func(t *testing.T) {
			expectedMessage := "failed to get resource: resource not allowed: /v1, Kind=Secret"
			if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
				t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
			}
		})
		deniedByGroup, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "Role", "namespace": "default", "name": "denied-role"})
		t.Run("resources_get (denied by group) has error", func(t *testing.T) {
			if !deniedByGroup.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("resources_get (denied by group) describes denial", func(t *testing.T) {
			expectedMessage := "failed to get resource: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
			if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
				// Report the group-denied result, which is the value under test.
				t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByGroup.Content[0].(mcp.TextContent).Text)
			}
		})
		allowedResource, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace", "name": "default"})
		t.Run("resources_get (not denied) returns resource", func(t *testing.T) {
			if allowedResource.IsError {
				t.Fatalf("call tool should not fail")
			}
		})
	})
}
// TestResourcesCreateOrUpdate covers the resources_create_or_update tool:
// missing/empty resource arguments, creating ConfigMaps from YAML and JSON,
// creating a CustomResourceDefinition, and creating then updating a custom
// resource of that definition.
func TestResourcesCreateOrUpdate(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		t.Run("resources_create_or_update with nil resource returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_create_or_update", map[string]interface{}{})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to create or update resources, missing argument resource" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("resources_create_or_update with empty resource returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": ""})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to create or update resources, missing argument resource" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		client := c.newKubernetesClient()
		configMapYaml := "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: a-cm-created-or-updated\n namespace: default\n"
		resourcesCreateOrUpdateCm1, err := c.callTool("resources_create_or_update", map[string]interface{}{"resource": configMapYaml})
		t.Run("resources_create_or_update with valid namespaced yaml resource returns success", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if resourcesCreateOrUpdateCm1.IsError {
				t.Errorf("call tool failed")
				return
			}
		})
		var decodedCreateOrUpdateCm1 []unstructured.Unstructured
		err = yaml.Unmarshal([]byte(resourcesCreateOrUpdateCm1.Content[0].(mcp.TextContent).Text), &decodedCreateOrUpdateCm1)
		t.Run("resources_create_or_update with valid namespaced yaml resource returns yaml content", func(t *testing.T) {
			if err != nil {
				t.Errorf("invalid tool result content %v", err)
				return
			}
			if !strings.HasPrefix(resourcesCreateOrUpdateCm1.Content[0].(mcp.TextContent).Text, "# The following resources (YAML) have been created or updated successfully") {
				t.Errorf("Expected success message, got %v", resourcesCreateOrUpdateCm1.Content[0].(mcp.TextContent).Text)
				return
			}
			if len(decodedCreateOrUpdateCm1) != 1 {
				t.Errorf("invalid resource count, expected 1, got %v", len(decodedCreateOrUpdateCm1))
				return
			}
			if decodedCreateOrUpdateCm1[0].GetName() != "a-cm-created-or-updated" {
				t.Errorf("invalid resource name, expected a-cm-created-or-updated, got %v", decodedCreateOrUpdateCm1[0].GetName())
				return
			}
			if decodedCreateOrUpdateCm1[0].GetUID() == "" {
				t.Errorf("invalid uid, got %v", decodedCreateOrUpdateCm1[0].GetUID())
				return
			}
		})
		t.Run("resources_create_or_update with valid namespaced yaml resource creates ConfigMap", func(t *testing.T) {
			// Check the error rather than only the returned pointer: a nil
			// check alone is not a reliable "not found" signal for this client.
			cm, err := client.CoreV1().ConfigMaps("default").Get(c.ctx, "a-cm-created-or-updated", metav1.GetOptions{})
			if err != nil || cm == nil {
				t.Fatalf("ConfigMap not found: %v", err)
			}
		})
		configMapJson := "{\"apiVersion\": \"v1\", \"kind\": \"ConfigMap\", \"metadata\": {\"name\": \"a-cm-created-or-updated-2\", \"namespace\": \"default\"}}"
		resourcesCreateOrUpdateCm2, err := c.callTool("resources_create_or_update", map[string]interface{}{"resource": configMapJson})
		t.Run("resources_create_or_update with valid namespaced json resource returns success", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if resourcesCreateOrUpdateCm2.IsError {
				t.Fatalf("call tool failed")
			}
		})
		t.Run("resources_create_or_update with valid namespaced json resource creates config map", func(t *testing.T) {
			cm, err := client.CoreV1().ConfigMaps("default").Get(c.ctx, "a-cm-created-or-updated-2", metav1.GetOptions{})
			if err != nil || cm == nil {
				t.Fatalf("ConfigMap not found: %v", err)
			}
		})
		customResourceDefinitionJson := `
{
"apiVersion": "apiextensions.k8s.io/v1",
"kind": "CustomResourceDefinition",
"metadata": {"name": "customs.example.com"},
"spec": {
"group": "example.com",
"versions": [{
"name": "v1","served": true,"storage": true,
"schema": {"openAPIV3Schema": {"type": "object"}}
}],
"scope": "Namespaced",
"names": {"plural": "customs","singular": "custom","kind": "Custom"}
}
}`
		resourcesCreateOrUpdateCrd, err := c.callTool("resources_create_or_update", map[string]interface{}{"resource": customResourceDefinitionJson})
		t.Run("resources_create_or_update with valid cluster-scoped json resource returns success", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if resourcesCreateOrUpdateCrd.IsError {
				t.Fatalf("call tool failed")
			}
		})
		t.Run("resources_create_or_update with valid cluster-scoped json resource creates custom resource definition", func(t *testing.T) {
			apiExtensionsV1Client := c.newApiExtensionsClient()
			_, err = apiExtensionsV1Client.CustomResourceDefinitions().Get(c.ctx, "customs.example.com", metav1.GetOptions{})
			if err != nil {
				t.Fatalf("custom resource definition not found")
			}
		})
		// Wait for the CRD before creating instances of it.
		c.crdWaitUntilReady("customs.example.com")
		customJson := "{\"apiVersion\": \"example.com/v1\", \"kind\": \"Custom\", \"metadata\": {\"name\": \"a-custom-resource\"}}"
		resourcesCreateOrUpdateCustom, err := c.callTool("resources_create_or_update", map[string]interface{}{"resource": customJson})
		t.Run("resources_create_or_update with valid namespaced json resource returns success", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if resourcesCreateOrUpdateCustom.IsError {
				t.Fatalf("call tool failed, got: %v", resourcesCreateOrUpdateCustom.Content)
			}
		})
		t.Run("resources_create_or_update with valid namespaced json resource creates custom resource", func(t *testing.T) {
			dynamicClient := dynamic.NewForConfigOrDie(envTestRestConfig)
			_, err = dynamicClient.
				Resource(schema.GroupVersionResource{Group: "example.com", Version: "v1", Resource: "customs"}).
				Namespace("default").
				Get(c.ctx, "a-custom-resource", metav1.GetOptions{})
			if err != nil {
				t.Fatalf("custom resource not found")
			}
		})
		customJsonUpdated := "{\"apiVersion\": \"example.com/v1\", \"kind\": \"Custom\", \"metadata\": {\"name\": \"a-custom-resource\",\"annotations\": {\"updated\": \"true\"}}}"
		resourcesCreateOrUpdateCustomUpdated, err := c.callTool("resources_create_or_update", map[string]interface{}{"resource": customJsonUpdated})
		t.Run("resources_create_or_update with valid namespaced json resource updates custom resource", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if resourcesCreateOrUpdateCustomUpdated.IsError {
				t.Fatalf("call tool failed")
			}
		})
		t.Run("resources_create_or_update with valid namespaced json resource updates custom resource", func(t *testing.T) {
			dynamicClient := dynamic.NewForConfigOrDie(envTestRestConfig)
			customResource, err := dynamicClient.
				Resource(schema.GroupVersionResource{Group: "example.com", Version: "v1", Resource: "customs"}).
				Namespace("default").
				Get(c.ctx, "a-custom-resource", metav1.GetOptions{})
			if err != nil || customResource == nil {
				t.Fatalf("custom resource not found: %v", err)
			}
			annotations := customResource.GetAnnotations()
			if annotations == nil || annotations["updated"] != "true" {
				t.Fatalf("custom resource not updated")
			}
		})
	})
}
// TestResourcesCreateOrUpdateDenied verifies that resources_create_or_update
// rejects resources matched by the denied_resources configuration (by kind and
// by group) with a descriptive error, while still applying resources that are
// not denied.
func TestResourcesCreateOrUpdateDenied(t *testing.T) {
	deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [
{ version = "v1", kind = "Secret" },
{ group = "rbac.authorization.k8s.io", version = "v1" }
]
`)))
	testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
		c.withEnvTest()
		secretYaml := "apiVersion: v1\nkind: Secret\nmetadata:\n name: a-denied-secret\n namespace: default\n"
		deniedByKind, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": secretYaml})
		t.Run("resources_create_or_update (denied by kind) has error", func(t *testing.T) {
			if !deniedByKind.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("resources_create_or_update (denied by kind) describes denial", func(t *testing.T) {
			expectedMessage := "failed to create or update resources: resource not allowed: /v1, Kind=Secret"
			if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
				t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
			}
		})
		roleYaml := "apiVersion: rbac.authorization.k8s.io/v1\nkind: Role\nmetadata:\n name: a-denied-role\n namespace: default\n"
		deniedByGroup, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": roleYaml})
		t.Run("resources_create_or_update (denied by group) has error", func(t *testing.T) {
			if !deniedByGroup.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("resources_create_or_update (denied by group) describes denial", func(t *testing.T) {
			expectedMessage := "failed to create or update resources: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
			if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
				// Report the group-denied result, which is the value under test.
				t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByGroup.Content[0].(mcp.TextContent).Text)
			}
		})
		configMapYaml := "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: a-cm-created-or-updated\n namespace: default\n"
		allowedResource, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": configMapYaml})
		t.Run("resources_create_or_update (not denied) creates or updates resource", func(t *testing.T) {
			if allowedResource.IsError {
				t.Fatalf("call tool should not fail")
			}
		})
	})
}
// TestResourcesDelete exercises the resources_delete tool. It first checks the
// error reported for each missing or invalid argument, then deletes a
// namespaced resource (ConfigMap) and a cluster-scoped resource (Namespace),
// verifying both the tool response and the resulting cluster state.
func TestResourcesDelete(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		t.Run("resources_delete with missing apiVersion returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_delete", map[string]interface{}{})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to delete resource, missing argument apiVersion" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("resources_delete with missing kind returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1"})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to delete resource, missing argument kind" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("resources_delete with invalid apiVersion returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "invalid/api/version", "kind": "Pod", "name": "a-pod"})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to delete resource, invalid argument apiVersion" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("resources_delete with nonexistent apiVersion returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "custom.non.existent.example.com/v1", "kind": "Custom", "name": "a-custom"})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != `failed to delete resource: no matches for kind "Custom" in version "custom.non.existent.example.com/v1"` {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("resources_delete with missing name returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace"})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to delete resource, missing argument name" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("resources_delete with nonexistent resource returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "ConfigMap", "name": "nonexistent-configmap"})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != `failed to delete resource: configmaps "nonexistent-configmap" not found` {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		// Namespaced resource. No namespace argument is passed to the tool; the
		// follow-up check looks the ConfigMap up in "default".
		// NOTE(review): "a-configmap-to-delete" is presumably pre-created by the
		// test environment setup — confirm.
		resourcesDeleteCm, err := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "ConfigMap", "name": "a-configmap-to-delete"})
		t.Run("resources_delete with valid namespaced resource returns success", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if resourcesDeleteCm.IsError {
				t.Fatalf("call tool failed")
			}
			if resourcesDeleteCm.Content[0].(mcp.TextContent).Text != "Resource deleted successfully" {
				t.Fatalf("invalid tool result content got: %v", resourcesDeleteCm.Content[0].(mcp.TextContent).Text)
			}
		})
		client := c.newKubernetesClient()
		t.Run("resources_delete with valid namespaced resource deletes ConfigMap", func(t *testing.T) {
			// Any successful Get means the ConfigMap is still there.
			_, err := client.CoreV1().ConfigMaps("default").Get(c.ctx, "a-configmap-to-delete", metav1.GetOptions{})
			if err == nil {
				t.Fatalf("ConfigMap not deleted")
			}
		})
		// Cluster-scoped resource (Namespace). The subtest names say so explicitly
		// so they no longer collide with the ConfigMap subtests above.
		resourcesDeleteNamespace, err := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace", "name": "ns-to-delete"})
		t.Run("resources_delete with valid cluster scoped resource returns success", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if resourcesDeleteNamespace.IsError {
				t.Fatalf("call tool failed")
			}
			if resourcesDeleteNamespace.Content[0].(mcp.TextContent).Text != "Resource deleted successfully" {
				t.Fatalf("invalid tool result content got: %v", resourcesDeleteNamespace.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("resources_delete with valid cluster scoped resource deletes Namespace", func(t *testing.T) {
			// The Namespace counts as deleted when it can no longer be fetched or
			// when it is still present but already carries a deletion timestamp.
			ns, err := client.CoreV1().Namespaces().Get(c.ctx, "ns-to-delete", metav1.GetOptions{})
			if err == nil && ns != nil && ns.DeletionTimestamp == nil {
				t.Fatalf("Namespace not deleted")
			}
		})
	})
}
// TestResourcesDeleteDenied verifies that resources_delete rejects resources
// matched by the denied_resources configuration (one entry by version/kind,
// one by group/version) with a descriptive error, and that a resource not on
// the list can still be deleted.
func TestResourcesDeleteDenied(t *testing.T) {
	deniedResourcesServer := test.Must(config.ReadToml([]byte(`
denied_resources = [
{ version = "v1", kind = "Secret" },
{ group = "rbac.authorization.k8s.io", version = "v1" }
]
`)))
	testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
		c.withEnvTest()
		kc := c.newKubernetesClient()
		// Fixture for the "not denied" case. The creation error is intentionally
		// ignored here; presumably a failed creation surfaces in the final
		// subtest when the delete call reports an error.
		_, _ = kc.CoreV1().ConfigMaps("default").Create(c.ctx, &corev1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{Name: "allowed-configmap-to-delete"},
		}, metav1.CreateOptions{})
		deniedByKind, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "Secret", "namespace": "default", "name": "denied-secret"})
		t.Run("resources_delete (denied by kind) has error", func(t *testing.T) {
			if !deniedByKind.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("resources_delete (denied by kind) describes denial", func(t *testing.T) {
			expectedMessage := "failed to delete resource: resource not allowed: /v1, Kind=Secret"
			if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
				t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
			}
		})
		deniedByGroup, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "Role", "namespace": "default", "name": "denied-role"})
		t.Run("resources_delete (denied by group) has error", func(t *testing.T) {
			if !deniedByGroup.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("resources_delete (denied by group) describes denial", func(t *testing.T) {
			expectedMessage := "failed to delete resource: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
			if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
				// Report the group-denial result that was actually compared.
				t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, deniedByGroup.Content[0].(mcp.TextContent).Text)
			}
		})
		allowedResource, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "ConfigMap", "name": "allowed-configmap-to-delete"})
		t.Run("resources_delete (not denied) deletes resource", func(t *testing.T) {
			if allowedResource.IsError {
				t.Fatalf("call tool should not fail")
			}
		})
	})
}
```
--------------------------------------------------------------------------------
/pkg/mcp/pods_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"regexp"
"strings"
"testing"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/output"
"github.com/mark3labs/mcp-go/mcp"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/yaml"
)
// TestPodsListInAllNamespaces calls pods_list without arguments and checks
// that the YAML response decodes to three pods, that the entries at index 1
// and 2 are the ns-1 and ns-2 pods respectively, and that managed fields are
// stripped from the output.
func TestPodsListInAllNamespaces(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		toolResult, err := c.callTool("pods_list", map[string]interface{}{})
		t.Run("pods_list returns pods list", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if toolResult.IsError {
				t.Fatalf("call tool failed")
			}
		})
		var decoded []unstructured.Unstructured
		err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
		t.Run("pods_list has yaml content", func(t *testing.T) {
			if err != nil {
				t.Fatalf("invalid tool result content %v", err)
			}
		})
		t.Run("pods_list returns 3 items", func(t *testing.T) {
			if len(decoded) != 3 {
				t.Fatalf("invalid pods count, expected 3, got %v", len(decoded))
			}
		})
		t.Run("pods_list returns pod in ns-1", func(t *testing.T) {
			if decoded[1].GetName() != "a-pod-in-ns-1" {
				t.Fatalf("invalid pod name, expected a-pod-in-ns-1, got %v", decoded[1].GetName())
			}
			if decoded[1].GetNamespace() != "ns-1" {
				t.Fatalf("invalid pod namespace, expected ns-1, got %v", decoded[1].GetNamespace())
			}
		})
		t.Run("pods_list returns pod in ns-2", func(t *testing.T) {
			if decoded[2].GetName() != "a-pod-in-ns-2" {
				t.Fatalf("invalid pod name, expected a-pod-in-ns-2, got %v", decoded[2].GetName())
			}
			if decoded[2].GetNamespace() != "ns-2" {
				t.Fatalf("invalid pod namespace, expected ns-2, got %v", decoded[2].GetNamespace())
			}
		})
		t.Run("pods_list omits managed fields", func(t *testing.T) {
			if decoded[1].GetManagedFields() != nil {
				// Print the same element that was checked.
				t.Fatalf("managed fields should be omitted, got %v", decoded[1].GetManagedFields())
			}
		})
	})
}
// TestPodsListInAllNamespacesUnauthorized restricts the test user to reading
// pods in the "default" namespace (a Role plus RoleBinding, with the
// "allow-all" ClusterRole removed) and checks that pods_list then returns only
// the pod from "default". restoreAuth is deferred to undo the RBAC changes.
func TestPodsListInAllNamespacesUnauthorized(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		defer restoreAuth(c.ctx)
		kc := c.newKubernetesClient()
		// Authorize user only for default/configured namespace
		podReader := &rbacv1.Role{
			ObjectMeta: metav1.ObjectMeta{Name: "allow-pods-list"},
			Rules: []rbacv1.PolicyRule{{
				Verbs:     []string{"get", "list"},
				APIGroups: []string{""},
				Resources: []string{"pods"},
			}},
		}
		role, _ := kc.RbacV1().Roles("default").Create(c.ctx, podReader, metav1.CreateOptions{})
		binding := &rbacv1.RoleBinding{
			ObjectMeta: metav1.ObjectMeta{Name: "allow-pods-list"},
			Subjects:   []rbacv1.Subject{{Kind: "User", Name: envTestUser.Name}},
			RoleRef:    rbacv1.RoleRef{Kind: "Role", Name: role.Name},
		}
		_, _ = kc.RbacV1().RoleBindings("default").Create(c.ctx, binding, metav1.CreateOptions{})
		// Deny cluster by removing cluster rule
		_ = kc.RbacV1().ClusterRoles().Delete(c.ctx, "allow-all", metav1.DeleteOptions{})

		listed, callErr := c.callTool("pods_list", map[string]interface{}{})
		t.Run("pods_list returns pods list for default namespace only", func(t *testing.T) {
			switch {
			case callErr != nil:
				t.Fatalf("call tool failed %v", callErr)
			case listed.IsError:
				t.Fatalf("call tool failed %v", listed.Content)
			}
		})

		var pods []unstructured.Unstructured
		decodeErr := yaml.Unmarshal([]byte(listed.Content[0].(mcp.TextContent).Text), &pods)
		t.Run("pods_list has yaml content", func(t *testing.T) {
			if decodeErr != nil {
				t.Fatalf("invalid tool result content %v", decodeErr)
			}
		})
		t.Run("pods_list returns 1 items", func(t *testing.T) {
			if count := len(pods); count != 1 {
				t.Fatalf("invalid pods count, expected 1, got %v", count)
			}
		})
		t.Run("pods_list returns pod in default", func(t *testing.T) {
			pod := &pods[0]
			if name := pod.GetName(); name != "a-pod-in-default" {
				t.Fatalf("invalid pod name, expected a-pod-in-default, got %v", name)
			}
			if namespace := pod.GetNamespace(); namespace != "default" {
				t.Fatalf("invalid pod namespace, expected default, got %v", namespace)
			}
		})
	})
}
// TestPodsListInNamespace covers the pods_list_in_namespace tool: the error
// returned when the namespace argument is missing, and the YAML listing for
// "ns-1", which must contain exactly the a-pod-in-ns-1 pod without managed
// fields.
func TestPodsListInNamespace(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		t.Run("pods_list_in_namespace with nil namespace returns error", func(t *testing.T) {
			res, _ := c.callTool("pods_list_in_namespace", map[string]interface{}{})
			if !res.IsError {
				t.Fatalf("call tool should fail")
			}
			const want = "failed to list pods in namespace, missing argument namespace"
			if got := res.Content[0].(mcp.TextContent).Text; got != want {
				t.Fatalf("invalid error message, got %v", got)
			}
		})

		args := map[string]interface{}{"namespace": "ns-1"}
		listed, callErr := c.callTool("pods_list_in_namespace", args)
		t.Run("pods_list_in_namespace returns pods list", func(t *testing.T) {
			switch {
			case callErr != nil:
				t.Fatalf("call tool failed %v", callErr)
			case listed.IsError:
				t.Fatalf("call tool failed")
			}
		})

		var pods []unstructured.Unstructured
		decodeErr := yaml.Unmarshal([]byte(listed.Content[0].(mcp.TextContent).Text), &pods)
		t.Run("pods_list_in_namespace has yaml content", func(t *testing.T) {
			if decodeErr != nil {
				t.Fatalf("invalid tool result content %v", decodeErr)
			}
		})
		t.Run("pods_list_in_namespace returns 1 items", func(t *testing.T) {
			if count := len(pods); count != 1 {
				t.Fatalf("invalid pods count, expected 1, got %v", count)
			}
		})
		t.Run("pods_list_in_namespace returns pod in ns-1", func(t *testing.T) {
			// Errorf (not Fatalf): report name and namespace mismatches together.
			pod := &pods[0]
			if name := pod.GetName(); name != "a-pod-in-ns-1" {
				t.Errorf("invalid pod name, expected a-pod-in-ns-1, got %v", name)
			}
			if namespace := pod.GetNamespace(); namespace != "ns-1" {
				t.Errorf("invalid pod namespace, expected ns-1, got %v", namespace)
			}
		})
		t.Run("pods_list_in_namespace omits managed fields", func(t *testing.T) {
			if fields := pods[0].GetManagedFields(); fields != nil {
				t.Fatalf("managed fields should be omitted, got %v", fields)
			}
		})
	})
}
// TestPodsListDenied configures the server with v1/Pod in denied_resources and
// checks that both pods_list and pods_list_in_namespace fail with a message
// describing the denial.
func TestPodsListDenied(t *testing.T) {
	cfg := test.Must(config.ReadToml([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`)))
	testCaseWithContext(t, &mcpContext{staticConfig: cfg}, func(c *mcpContext) {
		c.withEnvTest()

		allNamespaces, _ := c.callTool("pods_list", map[string]interface{}{})
		t.Run("pods_list has error", func(t *testing.T) {
			if !allNamespaces.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("pods_list describes denial", func(t *testing.T) {
			const want = "failed to list pods in all namespaces: resource not allowed: /v1, Kind=Pod"
			if got := allNamespaces.Content[0].(mcp.TextContent).Text; got != want {
				t.Fatalf("expected descriptive error '%s', got %v", want, got)
			}
		})

		singleNamespace, _ := c.callTool("pods_list_in_namespace", map[string]interface{}{"namespace": "ns-1"})
		t.Run("pods_list_in_namespace has error", func(t *testing.T) {
			if !singleNamespace.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("pods_list_in_namespace describes denial", func(t *testing.T) {
			const want = "failed to list pods in namespace ns-1: resource not allowed: /v1, Kind=Pod"
			if got := singleNamespace.Content[0].(mcp.TextContent).Text; got != want {
				t.Fatalf("expected descriptive error '%s', got %v", want, got)
			}
		})
	})
}
// TestPodsListAsTable runs the server with listOutput set to output.Table and
// checks the tabular rendering of pods_list (header plus three rows) and of
// pods_list_in_namespace for "ns-1" (header plus one row): line count, column
// headers and the formatted rows.
func TestPodsListAsTable(t *testing.T) {
	// Patterns shared by the pods_list and pods_list_in_namespace assertions.
	const expectedHeaders = "NAMESPACE\\s+APIVERSION\\s+KIND\\s+NAME\\s+READY\\s+STATUS\\s+RESTARTS\\s+AGE\\s+IP\\s+NODE\\s+NOMINATED NODE\\s+READINESS GATES\\s+LABELS"
	const expectedRowNs1 = "(?<namespace>ns-1)\\s+" +
		"(?<apiVersion>v1)\\s+" +
		"(?<kind>Pod)\\s+" +
		"(?<name>a-pod-in-ns-1)\\s+" +
		"(?<ready>0\\/1)\\s+" +
		"(?<status>Pending)\\s+" +
		"(?<restarts>0)\\s+" +
		"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
		"(?<ip><none>)\\s+" +
		"(?<node><none>)\\s+" +
		"(?<nominated_node><none>)\\s+" +
		"(?<readiness_gates><none>)\\s+" +
		"(?<labels><none>)"
	const expectedRowDefault = "(?<namespace>default)\\s+" +
		"(?<apiVersion>v1)\\s+" +
		"(?<kind>Pod)\\s+" +
		"(?<name>a-pod-in-default)\\s+" +
		"(?<ready>0\\/1)\\s+" +
		"(?<status>Pending)\\s+" +
		"(?<restarts>0)\\s+" +
		"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
		"(?<ip><none>)\\s+" +
		"(?<node><none>)\\s+" +
		"(?<nominated_node><none>)\\s+" +
		"(?<readiness_gates><none>)\\s+" +
		"(?<labels>app=nginx)"
	testCaseWithContext(t, &mcpContext{listOutput: output.Table}, func(c *mcpContext) {
		c.withEnvTest()
		podsList, err := c.callTool("pods_list", map[string]interface{}{})
		t.Run("pods_list returns pods list", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if podsList.IsError {
				t.Fatalf("call tool failed")
			}
		})
		outPodsList := podsList.Content[0].(mcp.TextContent).Text
		t.Run("pods_list returns table with 1 header and 3 rows", func(t *testing.T) {
			lines := strings.Count(outPodsList, "\n")
			if lines != 4 {
				t.Fatalf("invalid line count, expected 4 (1 header, 3 rows), got %v", lines)
			}
		})
		// The next three subtests assert on pods_list output and are named
		// accordingly, so they do not collide with the pods_list_in_namespace
		// subtests further down.
		t.Run("pods_list returns column headers", func(t *testing.T) {
			if m, e := regexp.MatchString(expectedHeaders, outPodsList); !m || e != nil {
				t.Fatalf("Expected headers '%s' not found in output:\n%s", expectedHeaders, outPodsList)
			}
		})
		t.Run("pods_list returns formatted row for a-pod-in-ns-1", func(t *testing.T) {
			if m, e := regexp.MatchString(expectedRowNs1, outPodsList); !m || e != nil {
				t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRowNs1, outPodsList)
			}
		})
		t.Run("pods_list returns formatted row for a-pod-in-default", func(t *testing.T) {
			if m, e := regexp.MatchString(expectedRowDefault, outPodsList); !m || e != nil {
				t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRowDefault, outPodsList)
			}
		})
		podsListInNamespace, err := c.callTool("pods_list_in_namespace", map[string]interface{}{
			"namespace": "ns-1",
		})
		t.Run("pods_list_in_namespace returns pods list", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if podsListInNamespace.IsError {
				t.Fatalf("call tool failed")
			}
		})
		outPodsListInNamespace := podsListInNamespace.Content[0].(mcp.TextContent).Text
		t.Run("pods_list_in_namespace returns table with 1 header and 1 row", func(t *testing.T) {
			lines := strings.Count(outPodsListInNamespace, "\n")
			if lines != 2 {
				t.Fatalf("invalid line count, expected 2 (1 header, 1 row), got %v", lines)
			}
		})
		t.Run("pods_list_in_namespace returns column headers", func(t *testing.T) {
			if m, e := regexp.MatchString(expectedHeaders, outPodsListInNamespace); !m || e != nil {
				t.Fatalf("Expected headers '%s' not found in output:\n%s", expectedHeaders, outPodsListInNamespace)
			}
		})
		t.Run("pods_list_in_namespace returns formatted row", func(t *testing.T) {
			if m, e := regexp.MatchString(expectedRowNs1, outPodsListInNamespace); !m || e != nil {
				t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRowNs1, outPodsListInNamespace)
			}
		})
	})
}
// TestPodsGet exercises the pods_get tool: the errors for a missing name and
// for a pod that does not exist, then a lookup by name only (expected to
// resolve to the "default" namespace) and a lookup with an explicit namespace.
// The YAML responses are decoded and checked for name, namespace and, for the
// name-only lookup, the absence of managed fields.
func TestPodsGet(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		t.Run("pods_get with nil name returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("pods_get", map[string]interface{}{})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to get pod, missing argument name" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("pods_get with not found name returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("pods_get", map[string]interface{}{"name": "not-found"})
			if !toolResult.IsError {
				t.Fatalf("call tool should fail")
			}
			// The namespace placeholder in the message is empty because no
			// namespace argument was passed.
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to get pod not-found in namespace : pods \"not-found\" not found" {
				t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
			}
		})
		podsGetNilNamespace, err := c.callTool("pods_get", map[string]interface{}{
			"name": "a-pod-in-default",
		})
		t.Run("pods_get with name and nil namespace returns pod", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if podsGetNilNamespace.IsError {
				t.Fatalf("call tool failed")
			}
		})
		var decodedNilNamespace unstructured.Unstructured
		err = yaml.Unmarshal([]byte(podsGetNilNamespace.Content[0].(mcp.TextContent).Text), &decodedNilNamespace)
		t.Run("pods_get with name and nil namespace has yaml content", func(t *testing.T) {
			if err != nil {
				t.Fatalf("invalid tool result content %v", err)
			}
		})
		t.Run("pods_get with name and nil namespace returns pod in default", func(t *testing.T) {
			if decodedNilNamespace.GetName() != "a-pod-in-default" {
				t.Fatalf("invalid pod name, expected a-pod-in-default, got %v", decodedNilNamespace.GetName())
			}
			if decodedNilNamespace.GetNamespace() != "default" {
				t.Fatalf("invalid pod namespace, expected default, got %v", decodedNilNamespace.GetNamespace())
			}
		})
		t.Run("pods_get with name and nil namespace omits managed fields", func(t *testing.T) {
			if decodedNilNamespace.GetManagedFields() != nil {
				t.Fatalf("managed fields should be omitted, got %v", decodedNilNamespace.GetManagedFields())
			}
		})
		podsGetInNamespace, err := c.callTool("pods_get", map[string]interface{}{
			"namespace": "ns-1",
			"name":      "a-pod-in-ns-1",
		})
		t.Run("pods_get with name and namespace returns pod", func(t *testing.T) {
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if podsGetInNamespace.IsError {
				t.Fatalf("call tool failed")
			}
		})
		var decodedInNamespace unstructured.Unstructured
		err = yaml.Unmarshal([]byte(podsGetInNamespace.Content[0].(mcp.TextContent).Text), &decodedInNamespace)
		t.Run("pods_get with name and namespace has yaml content", func(t *testing.T) {
			if err != nil {
				t.Fatalf("invalid tool result content %v", err)
			}
		})
		t.Run("pods_get with name and namespace returns pod in ns-1", func(t *testing.T) {
			if decodedInNamespace.GetName() != "a-pod-in-ns-1" {
				t.Fatalf("invalid pod name, expected a-pod-in-ns-1, got %v", decodedInNamespace.GetName())
			}
			if decodedInNamespace.GetNamespace() != "ns-1" {
				t.Fatalf("invalid pod namespace, expected ns-1, got %v", decodedInNamespace.GetNamespace())
			}
		})
	})
}
// TestPodsGetDenied configures the server with v1/Pod in denied_resources and
// checks that pods_get fails with a message describing the denial.
func TestPodsGetDenied(t *testing.T) {
	cfg := test.Must(config.ReadToml([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`)))
	testCaseWithContext(t, &mcpContext{staticConfig: cfg}, func(c *mcpContext) {
		c.withEnvTest()
		denied, _ := c.callTool("pods_get", map[string]interface{}{"name": "a-pod-in-default"})
		t.Run("pods_get has error", func(t *testing.T) {
			if !denied.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("pods_get describes denial", func(t *testing.T) {
			const want = "failed to get pod a-pod-in-default in namespace : resource not allowed: /v1, Kind=Pod"
			if got := denied.Content[0].(mcp.TextContent).Text; got != want {
				t.Fatalf("expected descriptive error '%s', got %v", want, got)
			}
		})
	})
}
// TestPodsDelete exercises the pods_delete tool. It checks the error cases
// (missing name, unknown pod), then deletes a pod by name only, a pod in an
// explicit namespace, and a pod labelled as managed by kubernetes-mcp-server
// together with a Service carrying the same labels. After each successful call
// the cluster is queried to confirm the object is gone or marked for deletion.
func TestPodsDelete(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		// nginxPod builds the minimal single-container Pod used as a deletion target.
		nginxPod := func(name string, labels map[string]string) *corev1.Pod {
			return &corev1.Pod{
				ObjectMeta: metav1.ObjectMeta{Name: name, Labels: labels},
				Spec:       corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
			}
		}
		// Errors
		t.Run("pods_delete with nil name returns error", func(t *testing.T) {
			res, _ := c.callTool("pods_delete", map[string]interface{}{})
			if !res.IsError {
				t.Errorf("call tool should fail")
				return
			}
			if msg := res.Content[0].(mcp.TextContent).Text; msg != "failed to delete pod, missing argument name" {
				t.Errorf("invalid error message, got %v", msg)
			}
		})
		t.Run("pods_delete with not found name returns error", func(t *testing.T) {
			res, _ := c.callTool("pods_delete", map[string]interface{}{"name": "not-found"})
			if !res.IsError {
				t.Errorf("call tool should fail")
				return
			}
			if msg := res.Content[0].(mcp.TextContent).Text; msg != "failed to delete pod not-found in namespace : pods \"not-found\" not found" {
				t.Errorf("invalid error message, got %v", msg)
			}
		})
		// Default/nil Namespace
		kc := c.newKubernetesClient()
		_, _ = kc.CoreV1().Pods("default").Create(c.ctx, nginxPod("a-pod-to-delete", nil), metav1.CreateOptions{})
		deletedNilNs, nilNsErr := c.callTool("pods_delete", map[string]interface{}{
			"name": "a-pod-to-delete",
		})
		t.Run("pods_delete with name and nil namespace returns success", func(t *testing.T) {
			switch {
			case nilNsErr != nil:
				t.Errorf("call tool failed %v", nilNsErr)
			case deletedNilNs.IsError:
				t.Errorf("call tool failed")
			case deletedNilNs.Content[0].(mcp.TextContent).Text != "Pod deleted successfully":
				t.Errorf("invalid tool result content, got %v", deletedNilNs.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("pods_delete with name and nil namespace deletes Pod", func(t *testing.T) {
			// Deleted means: no longer retrievable, or retrievable with a deletion timestamp.
			pod, getErr := kc.CoreV1().Pods("default").Get(c.ctx, "a-pod-to-delete", metav1.GetOptions{})
			if getErr == nil && pod != nil && pod.DeletionTimestamp == nil {
				t.Errorf("Pod not deleted")
			}
		})
		// Provided Namespace
		_, _ = kc.CoreV1().Pods("ns-1").Create(c.ctx, nginxPod("a-pod-to-delete-in-ns-1", nil), metav1.CreateOptions{})
		deletedInNs, inNsErr := c.callTool("pods_delete", map[string]interface{}{
			"namespace": "ns-1",
			"name":      "a-pod-to-delete-in-ns-1",
		})
		t.Run("pods_delete with name and namespace returns success", func(t *testing.T) {
			switch {
			case inNsErr != nil:
				t.Errorf("call tool failed %v", inNsErr)
			case deletedInNs.IsError:
				t.Errorf("call tool failed")
			case deletedInNs.Content[0].(mcp.TextContent).Text != "Pod deleted successfully":
				t.Errorf("invalid tool result content, got %v", deletedInNs.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("pods_delete with name and namespace deletes Pod", func(t *testing.T) {
			pod, getErr := kc.CoreV1().Pods("ns-1").Get(c.ctx, "a-pod-to-delete-in-ns-1", metav1.GetOptions{})
			if getErr == nil && pod != nil && pod.DeletionTimestamp == nil {
				t.Errorf("Pod not deleted")
			}
		})
		// Managed Pod
		managedLabels := map[string]string{
			"app.kubernetes.io/managed-by": "kubernetes-mcp-server",
			"app.kubernetes.io/name":       "a-manged-pod-to-delete",
		}
		_, _ = kc.CoreV1().Pods("default").Create(c.ctx, nginxPod("a-managed-pod-to-delete", managedLabels), metav1.CreateOptions{})
		managedService := &corev1.Service{
			ObjectMeta: metav1.ObjectMeta{Name: "a-managed-service-to-delete", Labels: managedLabels},
			Spec:       corev1.ServiceSpec{Selector: managedLabels, Ports: []corev1.ServicePort{{Port: 80}}},
		}
		_, _ = kc.CoreV1().Services("default").Create(c.ctx, managedService, metav1.CreateOptions{})
		deletedManaged, managedErr := c.callTool("pods_delete", map[string]interface{}{
			"name": "a-managed-pod-to-delete",
		})
		t.Run("pods_delete with managed pod returns success", func(t *testing.T) {
			switch {
			case managedErr != nil:
				t.Errorf("call tool failed %v", managedErr)
			case deletedManaged.IsError:
				t.Errorf("call tool failed")
			case deletedManaged.Content[0].(mcp.TextContent).Text != "Pod deleted successfully":
				t.Errorf("invalid tool result content, got %v", deletedManaged.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("pods_delete with managed pod deletes Pod and Service", func(t *testing.T) {
			pod, podErr := kc.CoreV1().Pods("default").Get(c.ctx, "a-managed-pod-to-delete", metav1.GetOptions{})
			if podErr == nil && pod != nil && pod.DeletionTimestamp == nil {
				t.Errorf("Pod not deleted")
				return
			}
			svc, svcErr := kc.CoreV1().Services("default").Get(c.ctx, "a-managed-service-to-delete", metav1.GetOptions{})
			if svcErr == nil && svc != nil && svc.DeletionTimestamp == nil {
				t.Errorf("Service not deleted")
			}
		})
	})
}
// TestPodsDeleteDenied configures the server with v1/Pod in denied_resources
// and checks that pods_delete fails with a message describing the denial.
func TestPodsDeleteDenied(t *testing.T) {
	cfg := test.Must(config.ReadToml([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`)))
	testCaseWithContext(t, &mcpContext{staticConfig: cfg}, func(c *mcpContext) {
		c.withEnvTest()
		denied, _ := c.callTool("pods_delete", map[string]interface{}{"name": "a-pod-in-default"})
		t.Run("pods_delete has error", func(t *testing.T) {
			if !denied.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("pods_delete describes denial", func(t *testing.T) {
			const want = "failed to delete pod a-pod-in-default in namespace : resource not allowed: /v1, Kind=Pod"
			if got := denied.Content[0].(mcp.TextContent).Text; got != want {
				t.Fatalf("expected descriptive error '%s', got %v", want, got)
			}
		})
	})
}
// TestPodsDeleteInOpenShift runs with the inOpenShift / inOpenShiftClear hooks
// and checks that deleting a pod labelled as managed by kubernetes-mcp-server
// also removes a route.openshift.io/v1 Route carrying the same labels.
// NOTE(review): the hooks presumably register and remove the OpenShift API
// types in the test cluster — confirm against their definitions.
func TestPodsDeleteInOpenShift(t *testing.T) {
	testCaseWithContext(t, &mcpContext{before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
		const podName = "a-managed-pod-to-delete-in-openshift"
		const routeName = "a-managed-route-to-delete"
		routeGVR := schema.GroupVersionResource{Group: "route.openshift.io", Version: "v1", Resource: "routes"}
		managedLabels := map[string]string{
			"app.kubernetes.io/managed-by": "kubernetes-mcp-server",
			"app.kubernetes.io/name":       "a-manged-pod-to-delete",
		}
		kc := c.newKubernetesClient()
		managedPod := &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: podName, Labels: managedLabels},
			Spec:       corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
		}
		_, _ = kc.CoreV1().Pods("default").Create(c.ctx, managedPod, metav1.CreateOptions{})
		dynamicClient := dynamic.NewForConfigOrDie(envTestRestConfig)
		managedRoute := &unstructured.Unstructured{Object: map[string]interface{}{
			"apiVersion": "route.openshift.io/v1",
			"kind":       "Route",
			"metadata": map[string]interface{}{
				"name":   routeName,
				"labels": managedLabels,
			},
		}}
		_, _ = dynamicClient.Resource(routeGVR).Namespace("default").Create(c.ctx, managedRoute, metav1.CreateOptions{})
		deleted, callErr := c.callTool("pods_delete", map[string]interface{}{
			"name": podName,
		})
		t.Run("pods_delete with managed pod in OpenShift returns success", func(t *testing.T) {
			switch {
			case callErr != nil:
				t.Errorf("call tool failed %v", callErr)
			case deleted.IsError:
				t.Errorf("call tool failed")
			case deleted.Content[0].(mcp.TextContent).Text != "Pod deleted successfully":
				t.Errorf("invalid tool result content, got %v", deleted.Content[0].(mcp.TextContent).Text)
			}
		})
		t.Run("pods_delete with managed pod in OpenShift deletes Pod and Route", func(t *testing.T) {
			// Deleted means: no longer retrievable, or retrievable with a deletion timestamp.
			pod, podErr := kc.CoreV1().Pods("default").Get(c.ctx, podName, metav1.GetOptions{})
			if podErr == nil && pod != nil && pod.DeletionTimestamp == nil {
				t.Errorf("Pod not deleted")
				return
			}
			route, routeErr := dynamicClient.Resource(routeGVR).Namespace("default").Get(c.ctx, routeName, metav1.GetOptions{})
			if routeErr == nil && route != nil && route.GetDeletionTimestamp() == nil {
				t.Errorf("Route not deleted")
			}
		})
	})
}
// TestPodsLog exercises the pods_log tool: missing and unknown pod names,
// default and explicit namespaces, container selection, and the previous and
// tail options.
func TestPodsLog(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()

		// expectLog calls pods_log with args and then asserts, in a subtest
		// called name, that neither the call nor the tool reported an error.
		expectLog := func(name string, args map[string]interface{}) {
			result, callErr := c.callTool("pods_log", args)
			t.Run(name, func(t *testing.T) {
				if callErr != nil {
					t.Fatalf("call tool failed %v", callErr)
				}
				if result.IsError {
					t.Fatalf("call tool failed")
				}
			})
		}

		t.Run("pods_log with nil name returns error", func(t *testing.T) {
			result, _ := c.callTool("pods_log", map[string]interface{}{})
			if !result.IsError {
				t.Fatalf("call tool should fail")
			}
			if msg := result.Content[0].(mcp.TextContent).Text; msg != "failed to get pod log, missing argument name" {
				t.Fatalf("invalid error message, got %v", msg)
			}
		})
		t.Run("pods_log with not found name returns error", func(t *testing.T) {
			result, _ := c.callTool("pods_log", map[string]interface{}{"name": "not-found"})
			if !result.IsError {
				t.Fatalf("call tool should fail")
			}
			if msg := result.Content[0].(mcp.TextContent).Text; msg != "failed to get pod not-found log in namespace : pods \"not-found\" not found" {
				t.Fatalf("invalid error message, got %v", msg)
			}
		})

		expectLog("pods_log with name and nil namespace returns pod log", map[string]interface{}{
			"name": "a-pod-in-default",
		})
		expectLog("pods_log with name and namespace returns pod log", map[string]interface{}{
			"namespace": "ns-1",
			"name":      "a-pod-in-ns-1",
		})
		expectLog("pods_log with name, container and namespace returns pod log", map[string]interface{}{
			"namespace": "ns-1",
			"name":      "a-pod-in-ns-1",
			"container": "nginx",
		})

		unknownContainer, _ := c.callTool("pods_log", map[string]interface{}{
			"namespace": "ns-1",
			"name":      "a-pod-in-ns-1",
			"container": "a-not-existing-container",
		})
		t.Run("pods_log with non existing container returns error", func(t *testing.T) {
			if !unknownContainer.IsError {
				t.Fatalf("call tool should fail")
			}
			if msg := unknownContainer.Content[0].(mcp.TextContent).Text; msg != "failed to get pod a-pod-in-ns-1 log in namespace ns-1: container a-not-existing-container is not valid for pod a-pod-in-ns-1" {
				t.Fatalf("invalid error message, got %v", msg)
			}
		})

		expectLog("pods_log with previous=true returns previous pod log", map[string]interface{}{
			"namespace": "ns-1",
			"name":      "a-pod-in-ns-1",
			"previous":  true,
		})
		expectLog("pods_log with previous=false returns current pod log", map[string]interface{}{
			"namespace": "ns-1",
			"name":      "a-pod-in-ns-1",
			"previous":  false,
		})
		// A numeric tail is accepted.
		expectLog("pods_log with tail=50 returns pod log", map[string]interface{}{
			"namespace": "ns-1",
			"name":      "a-pod-in-ns-1",
			"tail":      50,
		})

		// A non-numeric tail is rejected with a parse error.
		invalidTail, _ := c.callTool("pods_log", map[string]interface{}{
			"namespace": "ns-1",
			"name":      "a-pod-in-ns-1",
			"tail":      "invalid",
		})
		t.Run("pods_log with invalid tail returns error", func(t *testing.T) {
			if !invalidTail.IsError {
				t.Fatalf("call tool should fail")
			}
			const wantFragment = "failed to parse tail parameter: expected integer"
			got := invalidTail.Content[0].(mcp.TextContent).Text
			if !strings.Contains(got, wantFragment) {
				t.Fatalf("unexpected error message, expected to contain '%s', got '%s'", wantFragment, got)
			}
		})
	})
}
// TestPodsLogDenied checks that pods_log fails with a descriptive message
// when the static configuration lists the v1 Pod kind under denied_resources.
func TestPodsLogDenied(t *testing.T) {
	denyPods := test.Must(config.ReadToml([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`)))
	testCaseWithContext(t, &mcpContext{staticConfig: denyPods}, func(c *mcpContext) {
		c.withEnvTest()
		result, _ := c.callTool("pods_log", map[string]interface{}{"name": "a-pod-in-default"})
		t.Run("pods_log has error", func(t *testing.T) {
			if !result.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("pods_log describes denial", func(t *testing.T) {
			const want = "failed to get pod a-pod-in-default log in namespace : resource not allowed: /v1, Kind=Pod"
			if got := result.Content[0].(mcp.TextContent).Text; got != want {
				t.Fatalf("expected descriptive error '%s', got %v", want, got)
			}
		})
	})
}
// TestPodsRun exercises the pods_run tool: the missing-image error, running a
// pod from an image only (a single Pod is returned), and running with a port
// (a Pod plus a Service are returned). It inspects the YAML the tool returns
// for kind, namespace, generated name, labels, container image and ports.
func TestPodsRun(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		// isBlank reports whether v is absent (nil), not a string, or the empty
		// string. Values read from a map[string]interface{} are interface
		// values: a missing key yields nil, and nil == "" is false, so comparing
		// directly against "" would never flag a missing label.
		isBlank := func(v interface{}) bool {
			s, ok := v.(string)
			return !ok || s == ""
		}
		t.Run("pods_run with nil image returns error", func(t *testing.T) {
			toolResult, _ := c.callTool("pods_run", map[string]interface{}{})
			if toolResult.IsError != true {
				t.Errorf("call tool should fail")
				return
			}
			if toolResult.Content[0].(mcp.TextContent).Text != "failed to run pod, missing argument image" {
				t.Errorf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
				return
			}
		})
		podsRunNilNamespace, err := c.callTool("pods_run", map[string]interface{}{"image": "nginx"})
		t.Run("pods_run with image and nil namespace runs pod", func(t *testing.T) {
			if err != nil {
				t.Errorf("call tool failed %v", err)
				return
			}
			if podsRunNilNamespace.IsError {
				t.Errorf("call tool failed")
				return
			}
		})
		var decodedNilNamespace []unstructured.Unstructured
		err = yaml.Unmarshal([]byte(podsRunNilNamespace.Content[0].(mcp.TextContent).Text), &decodedNilNamespace)
		t.Run("pods_run with image and nil namespace has yaml content", func(t *testing.T) {
			if err != nil {
				t.Errorf("invalid tool result content %v", err)
				return
			}
		})
		t.Run("pods_run with image and nil namespace returns 1 item (Pod)", func(t *testing.T) {
			if len(decodedNilNamespace) != 1 {
				t.Errorf("invalid pods count, expected 1, got %v", len(decodedNilNamespace))
				return
			}
			if decodedNilNamespace[0].GetKind() != "Pod" {
				t.Errorf("invalid pod kind, expected Pod, got %v", decodedNilNamespace[0].GetKind())
				return
			}
		})
		t.Run("pods_run with image and nil namespace returns pod in default", func(t *testing.T) {
			if decodedNilNamespace[0].GetNamespace() != "default" {
				t.Errorf("invalid pod namespace, expected default, got %v", decodedNilNamespace[0].GetNamespace())
				return
			}
		})
		t.Run("pods_run with image and nil namespace returns pod with random name", func(t *testing.T) {
			if !strings.HasPrefix(decodedNilNamespace[0].GetName(), "kubernetes-mcp-server-run-") {
				t.Errorf("invalid pod name, expected random, got %v", decodedNilNamespace[0].GetName())
				return
			}
		})
		t.Run("pods_run with image and nil namespace returns pod with labels", func(t *testing.T) {
			labels := decodedNilNamespace[0].Object["metadata"].(map[string]interface{})["labels"].(map[string]interface{})
			if isBlank(labels["app.kubernetes.io/name"]) {
				t.Errorf("invalid labels, expected app.kubernetes.io/name, got %v", labels)
				return
			}
			if isBlank(labels["app.kubernetes.io/component"]) {
				t.Errorf("invalid labels, expected app.kubernetes.io/component, got %v", labels)
				return
			}
			if labels["app.kubernetes.io/managed-by"] != "kubernetes-mcp-server" {
				t.Errorf("invalid labels, expected app.kubernetes.io/managed-by, got %v", labels)
				return
			}
			if labels["app.kubernetes.io/part-of"] != "kubernetes-mcp-server-run-sandbox" {
				t.Errorf("invalid labels, expected app.kubernetes.io/part-of, got %v", labels)
				return
			}
		})
		t.Run("pods_run with image and nil namespace returns pod with nginx container", func(t *testing.T) {
			containers := decodedNilNamespace[0].Object["spec"].(map[string]interface{})["containers"].([]interface{})
			if containers[0].(map[string]interface{})["image"] != "nginx" {
				t.Errorf("invalid container name, expected nginx, got %v", containers[0].(map[string]interface{})["image"])
				return
			}
		})
		podsRunNamespaceAndPort, err := c.callTool("pods_run", map[string]interface{}{"image": "nginx", "port": 80})
		t.Run("pods_run with image, namespace, and port runs pod", func(t *testing.T) {
			if err != nil {
				t.Errorf("call tool failed %v", err)
				return
			}
			if podsRunNamespaceAndPort.IsError {
				t.Errorf("call tool failed")
				return
			}
		})
		var decodedNamespaceAndPort []unstructured.Unstructured
		err = yaml.Unmarshal([]byte(podsRunNamespaceAndPort.Content[0].(mcp.TextContent).Text), &decodedNamespaceAndPort)
		t.Run("pods_run with image, namespace, and port has yaml content", func(t *testing.T) {
			if err != nil {
				t.Errorf("invalid tool result content %v", err)
				return
			}
		})
		t.Run("pods_run with image, namespace, and port returns 2 items (Pod + Service)", func(t *testing.T) {
			if len(decodedNamespaceAndPort) != 2 {
				t.Errorf("invalid pods count, expected 2, got %v", len(decodedNamespaceAndPort))
				return
			}
			if decodedNamespaceAndPort[0].GetKind() != "Pod" {
				t.Errorf("invalid pod kind, expected Pod, got %v", decodedNamespaceAndPort[0].GetKind())
				return
			}
			if decodedNamespaceAndPort[1].GetKind() != "Service" {
				t.Errorf("invalid service kind, expected Service, got %v", decodedNamespaceAndPort[1].GetKind())
				return
			}
		})
		t.Run("pods_run with image, namespace, and port returns pod with port", func(t *testing.T) {
			containers := decodedNamespaceAndPort[0].Object["spec"].(map[string]interface{})["containers"].([]interface{})
			ports := containers[0].(map[string]interface{})["ports"].([]interface{})
			if ports[0].(map[string]interface{})["containerPort"] != int64(80) {
				t.Errorf("invalid container port, expected 80, got %v", ports[0].(map[string]interface{})["containerPort"])
				return
			}
		})
		t.Run("pods_run with image, namespace, and port returns service with port and selector", func(t *testing.T) {
			ports := decodedNamespaceAndPort[1].Object["spec"].(map[string]interface{})["ports"].([]interface{})
			if ports[0].(map[string]interface{})["port"] != int64(80) {
				t.Errorf("invalid service port, expected 80, got %v", ports[0].(map[string]interface{})["port"])
				return
			}
			if ports[0].(map[string]interface{})["targetPort"] != int64(80) {
				t.Errorf("invalid service target port, expected 80, got %v", ports[0].(map[string]interface{})["targetPort"])
				return
			}
			selector := decodedNamespaceAndPort[1].Object["spec"].(map[string]interface{})["selector"].(map[string]interface{})
			if isBlank(selector["app.kubernetes.io/name"]) {
				t.Errorf("invalid service selector, expected app.kubernetes.io/name, got %v", selector)
				return
			}
			if selector["app.kubernetes.io/managed-by"] != "kubernetes-mcp-server" {
				t.Errorf("invalid service selector, expected app.kubernetes.io/managed-by, got %v", selector)
				return
			}
			if selector["app.kubernetes.io/part-of"] != "kubernetes-mcp-server-run-sandbox" {
				t.Errorf("invalid service selector, expected app.kubernetes.io/part-of, got %v", selector)
				return
			}
		})
	})
}
// TestPodsRunDenied checks that pods_run fails with a descriptive message
// when the static configuration lists the v1 Pod kind under denied_resources.
func TestPodsRunDenied(t *testing.T) {
	denyPods := test.Must(config.ReadToml([]byte(`
denied_resources = [ { version = "v1", kind = "Pod" } ]
`)))
	testCaseWithContext(t, &mcpContext{staticConfig: denyPods}, func(c *mcpContext) {
		c.withEnvTest()
		result, _ := c.callTool("pods_run", map[string]interface{}{"image": "nginx"})
		t.Run("pods_run has error", func(t *testing.T) {
			if !result.IsError {
				t.Fatalf("call tool should fail")
			}
		})
		t.Run("pods_run describes denial", func(t *testing.T) {
			const want = "failed to run pod in namespace : resource not allowed: /v1, Kind=Pod"
			if got := result.Content[0].(mcp.TextContent).Text; got != want {
				t.Fatalf("expected descriptive error '%s', got %v", want, got)
			}
		})
	})
}
// TestPodsRunInOpenShift runs pods_run with an image and a port inside the
// inOpenShift fixture and expects three returned objects, the third being a
// Route whose spec.port.targetPort is 80.
func TestPodsRunInOpenShift(t *testing.T) {
	testCaseWithContext(t, &mcpContext{before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
		t.Run("pods_run with image, namespace, and port returns route with port", func(t *testing.T) {
			result, err := c.callTool("pods_run", map[string]interface{}{"image": "nginx", "port": 80})
			if err != nil {
				t.Errorf("call tool failed %v", err)
				return
			}
			if result.IsError {
				t.Errorf("call tool failed")
				return
			}
			var created []unstructured.Unstructured
			if err = yaml.Unmarshal([]byte(result.Content[0].(mcp.TextContent).Text), &created); err != nil {
				t.Errorf("invalid tool result content %v", err)
				return
			}
			if count := len(created); count != 3 {
				t.Errorf("invalid pods count, expected 3, got %v", count)
				return
			}
			route := created[2]
			if kind := route.GetKind(); kind != "Route" {
				t.Errorf("invalid route kind, expected Route, got %v", kind)
				return
			}
			// Walk spec.port.targetPort one level at a time.
			spec := route.Object["spec"].(map[string]interface{})
			port := spec["port"].(map[string]interface{})
			targetPort := port["targetPort"].(int64)
			if targetPort != 80 {
				t.Errorf("invalid route target port, expected 80, got %v", targetPort)
				return
			}
		})
	})
}
// TestPodsListWithLabelSelector creates two labelled pods (one in default, one
// in ns-1) and verifies that pods_list and pods_list_in_namespace honour the
// labelSelector argument, including comma-separated selectors.
func TestPodsListWithLabelSelector(t *testing.T) {
	testCase(t, func(c *mcpContext) {
		c.withEnvTest()
		kc := c.newKubernetesClient()

		// Fixture pods sharing app=test but differing in env.
		nginx := corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}}
		_, _ = kc.CoreV1().Pods("default").Create(c.ctx, &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:   "pod-with-labels",
				Labels: map[string]string{"app": "test", "env": "dev"},
			},
			Spec: nginx,
		}, metav1.CreateOptions{})
		_, _ = kc.CoreV1().Pods("ns-1").Create(c.ctx, &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:   "another-pod-with-labels",
				Labels: map[string]string{"app": "test", "env": "prod"},
			},
			Spec: nginx,
		}, metav1.CreateOptions{})

		// listPods calls the given tool and decodes its YAML text result,
		// aborting the calling subtest on any failure along the way.
		listPods := func(t *testing.T, tool string, args map[string]interface{}) []unstructured.Unstructured {
			t.Helper()
			result, err := c.callTool(tool, args)
			if err != nil {
				t.Fatalf("call tool failed %v", err)
			}
			if result.IsError {
				t.Fatalf("call tool failed")
			}
			var pods []unstructured.Unstructured
			if err = yaml.Unmarshal([]byte(result.Content[0].(mcp.TextContent).Text), &pods); err != nil {
				t.Fatalf("invalid tool result content %v", err)
			}
			return pods
		}

		// Single selector across all namespaces.
		t.Run("pods_list with label selector returns filtered pods", func(t *testing.T) {
			pods := listPods(t, "pods_list", map[string]interface{}{
				"labelSelector": "app=test",
			})
			if len(pods) != 2 {
				t.Fatalf("invalid pods count, expected 2, got %v", len(pods))
			}
		})
		// Single selector scoped to one namespace.
		t.Run("pods_list_in_namespace with label selector returns filtered pods", func(t *testing.T) {
			pods := listPods(t, "pods_list_in_namespace", map[string]interface{}{
				"namespace":     "ns-1",
				"labelSelector": "env=prod",
			})
			if len(pods) != 1 {
				t.Fatalf("invalid pods count, expected 1, got %v", len(pods))
			}
			if name := pods[0].GetName(); name != "another-pod-with-labels" {
				t.Fatalf("invalid pod name, expected another-pod-with-labels, got %v", name)
			}
		})
		// Comma-separated selectors must all match.
		t.Run("pods_list with multiple label selectors returns filtered pods", func(t *testing.T) {
			pods := listPods(t, "pods_list", map[string]interface{}{
				"labelSelector": "app=test,env=prod",
			})
			if len(pods) != 1 {
				t.Fatalf("invalid pods count, expected 1, got %v", len(pods))
			}
			if name := pods[0].GetName(); name != "another-pod-with-labels" {
				t.Fatalf("invalid pod name, expected another-pod-with-labels, got %v", name)
			}
		})
	})
}
```
--------------------------------------------------------------------------------
/pkg/http/http_test.go:
--------------------------------------------------------------------------------
```go
package http
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/rsa"
"flag"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"os"
"regexp"
"strconv"
"strings"
"testing"
"time"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/coreos/go-oidc/v3/oidc"
"github.com/coreos/go-oidc/v3/oidc/oidctest"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"
"k8s.io/klog/v2/textlogger"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/mcp"
)
// httpContext holds the per-test fixture state for the HTTP server tests.
// beforeEach populates it and afterEach tears it down.
type httpContext struct {
// klogState is the klog state captured in beforeEach and restored in afterEach.
klogState klog.State
// mockServer is the mock server whose kubeconfig file is assigned to StaticConfig.KubeConfig.
mockServer *test.MockServer
// LogBuffer receives the klog output produced while the fixture is active.
LogBuffer bytes.Buffer
HttpAddress string // HTTP server address
timeoutCancel context.CancelFunc // Release resources if test completes before the timeout
// StopServer cancels the context handed to Serve.
StopServer context.CancelFunc
// WaitForShutdown waits for the goroutine group running Serve and returns its error.
WaitForShutdown func() error
// StaticConfig is the server configuration; beforeEach uses config.Default() when it is nil.
StaticConfig *config.StaticConfig
// OidcProvider is passed to Serve unchanged; it may be nil.
OidcProvider *oidc.Provider
}
// tokenReviewSuccessful is a JSON TokenReview (authentication.k8s.io/v1)
// document whose status marks the token "valid-token" as authenticated for
// user "test-user" in the "system:authenticated" group.
const tokenReviewSuccessful = `
{
"kind": "TokenReview",
"apiVersion": "authentication.k8s.io/v1",
"spec": {"token": "valid-token"},
"status": {
"authenticated": true,
"user": {
"username": "test-user",
"groups": ["system:authenticated"]
}
}
}`
// beforeEach builds the fixture for one test: it applies a default static
// configuration when none was supplied, starts the mock server and points the
// configuration at its kubeconfig, redirects klog into LogBuffer, picks a free
// TCP port, launches Serve in an errgroup goroutine bounded by a 10 second
// timeout, and finally polls the address for up to ten attempts.
func (c *httpContext) beforeEach(t *testing.T) {
	t.Helper()
	http.DefaultClient.Timeout = 10 * time.Second
	if c.StaticConfig == nil {
		c.StaticConfig = config.Default()
	}

	// Fake Kubernetes configuration backed by the mock server.
	c.mockServer = test.NewMockServer()
	c.StaticConfig.KubeConfig = c.mockServer.KubeconfigFile(t)

	// Capture logging at verbosity 5 into LogBuffer.
	c.klogState = klog.CaptureState()
	klogFlags := flag.NewFlagSet("test", flag.ContinueOnError)
	klog.InitFlags(klogFlags)
	_ = klogFlags.Set("v", "5")
	loggerConfig := textlogger.NewConfig(textlogger.Verbosity(5), textlogger.Output(&c.LogBuffer))
	klog.SetLogger(textlogger.NewLogger(loggerConfig))

	// Find a random free port: listen on port 0, remember the address, then
	// release the listener so the server under test can bind to it.
	listener, err := net.Listen("tcp", "0.0.0.0:0")
	if err != nil {
		t.Fatalf("Failed to find random port for HTTP server: %v", err)
	}
	c.HttpAddress = listener.Addr().String()
	if closeErr := listener.Close(); closeErr != nil {
		t.Fatalf("Failed to close random port listener: %v", closeErr)
	}
	c.StaticConfig.Port = strconv.Itoa(listener.Addr().(*net.TCPAddr).Port)

	mcpServer, err := mcp.NewServer(mcp.Configuration{StaticConfig: c.StaticConfig})
	if err != nil {
		t.Fatalf("Failed to create MCP server: %v", err)
	}

	// Context chain: test context -> 10s timeout -> errgroup -> cancellable.
	timeoutCtx, cancelTimeout := context.WithTimeout(t.Context(), 10*time.Second)
	c.timeoutCancel = cancelTimeout
	group, groupCtx := errgroup.WithContext(timeoutCtx)
	serveCtx, stopServer := context.WithCancel(groupCtx)
	c.StopServer = stopServer
	group.Go(func() error {
		return Serve(serveCtx, mcpServer, c.StaticConfig, c.OidcProvider, nil)
	})
	c.WaitForShutdown = group.Wait

	// Wait for the HTTP server to accept TCP connections.
	for attempt := 0; attempt < 10; attempt++ {
		conn, dialErr := net.Dial("tcp", c.HttpAddress)
		if dialErr != nil {
			time.Sleep(50 * time.Millisecond) // back off before retrying
			continue
		}
		_ = conn.Close()
		break
	}
}
// afterEach tears the fixture down: it closes the mock server, stops the HTTP
// server and reports a non-graceful shutdown, releases the timeout context,
// restores the captured klog state and blanks the KUBECONFIG variable.
func (c *httpContext) afterEach(t *testing.T) {
	t.Helper()
	c.mockServer.Close()
	c.StopServer()
	if shutdownErr := c.WaitForShutdown(); shutdownErr != nil {
		t.Errorf("HTTP server did not shut down gracefully: %v", shutdownErr)
	}
	c.timeoutCancel()
	c.klogState.Restore()
	_ = os.Setenv("KUBECONFIG", "")
}
// testCase runs test against a fresh, zero-valued httpContext fixture.
func testCase(t *testing.T, test func(c *httpContext)) {
	fixture := &httpContext{}
	testCaseWithContext(t, fixture, test)
}
// testCaseWithContext sets up httpCtx with beforeEach, schedules afterEach as
// a test cleanup, and then runs test with the prepared fixture.
func testCaseWithContext(t *testing.T, httpCtx *httpContext, test func(c *httpContext)) {
	httpCtx.beforeEach(t)
	teardown := func() { httpCtx.afterEach(t) }
	t.Cleanup(teardown)
	test(httpCtx)
}
// OidcTestServer bundles an RSA private key, an OIDC provider and the httptest
// server that backs it. All three are embedded, so their fields and methods
// (for example URL and Public) are promoted onto OidcTestServer.
type OidcTestServer struct {
*rsa.PrivateKey
*oidc.Provider
*httptest.Server
// TokenEndpointHandler, when non-nil, handles requests whose path is /token
// (see the handler installed by NewOidcTestServer).
TokenEndpointHandler http.HandlerFunc
}
// NewOidcTestServer starts an httptest server that serves OIDC endpoints from
// an oidctest.Server using a freshly generated 2048-bit RSA key, and returns it
// together with an oidc.Provider created against the server URL. Requests to
// /token are routed to TokenEndpointHandler when one has been set. Any setup
// failure aborts the test via t.Fatalf.
func NewOidcTestServer(t *testing.T) (oidcTestServer *OidcTestServer) {
	t.Helper()
	privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		t.Fatalf("failed to generate private key for oidc: %v", err)
	}
	oidcTestServer = &OidcTestServer{PrivateKey: privateKey}

	backend := &oidctest.Server{
		Algorithms: []string{oidc.RS256, oidc.ES256},
		PublicKeys: []oidctest.PublicKey{{
			PublicKey: oidcTestServer.Public(),
			KeyID:     "test-oidc-key-id",
			Algorithm: oidc.RS256,
		}},
	}
	// dispatch consults TokenEndpointHandler on every request, so a handler
	// assigned after construction still takes effect.
	dispatch := func(w http.ResponseWriter, r *http.Request) {
		if custom := oidcTestServer.TokenEndpointHandler; r.URL.Path == "/token" && custom != nil {
			custom.ServeHTTP(w, r)
			return
		}
		backend.ServeHTTP(w, r)
	}
	oidcTestServer.Server = httptest.NewServer(http.HandlerFunc(dispatch))
	backend.SetIssuer(oidcTestServer.URL)

	provider, err := oidc.NewProvider(t.Context(), oidcTestServer.URL)
	if err != nil {
		t.Fatalf("failed to create OIDC provider: %v", err)
	}
	oidcTestServer.Provider = provider
	return oidcTestServer
}
// TestGracefulShutdown stops the server, waits for it to exit, and verifies
// both that no error was returned and that the captured log contains each of
// the expected shutdown messages.
func TestGracefulShutdown(t *testing.T) {
	testCase(t, func(ctx *httpContext) {
		ctx.StopServer()
		shutdownErr := ctx.WaitForShutdown()
		t.Run("Stops gracefully", func(t *testing.T) {
			if shutdownErr != nil {
				t.Errorf("Expected graceful shutdown, but got error: %v", shutdownErr)
			}
		})

		// Each entry names a subtest, the log fragment it requires, and the
		// leading part of the failure message printed when it is missing.
		logChecks := []struct {
			name     string
			fragment string
			failure  string
		}{
			{
				name:     "Stops on context cancel",
				fragment: "Context cancelled, initiating graceful shutdown",
				failure:  "Context cancelled, initiating graceful shutdown",
			},
			{
				name:     "Starts server shutdown",
				fragment: "Shutting down HTTP server gracefully",
				failure:  "Expected graceful shutdown log",
			},
			{
				name:     "Server shutdown completes",
				fragment: "HTTP server shutdown complete",
				failure:  "Expected HTTP server shutdown completed log",
			},
		}
		for _, check := range logChecks {
			t.Run(check.name, func(t *testing.T) {
				if logs := ctx.LogBuffer.String(); !strings.Contains(logs, check.fragment) {
					t.Errorf("%s, got: %s", check.failure, logs)
				}
			})
		}
	})
}
// TestSseTransport verifies the SSE transport: GET /sse answers 200 with a
// text/event-stream body whose first event announces the message endpoint, and
// a POST to that endpoint with the announced session id is accepted (202).
func TestSseTransport(t *testing.T) {
	testCase(t, func(ctx *httpContext) {
		sseResp, sseErr := http.Get(fmt.Sprintf("http://%s/sse", ctx.HttpAddress))
		if sseErr != nil {
			// On error the response is nil: bail out before anything (including
			// the cleanup below) dereferences it.
			t.Fatalf("Failed to get SSE endpoint: %v", sseErr)
		}
		t.Cleanup(func() { _ = sseResp.Body.Close() })
		t.Run("Exposes SSE endpoint at /sse", func(t *testing.T) {
			if sseResp.StatusCode != http.StatusOK {
				t.Errorf("Expected HTTP 200 OK, got %d", sseResp.StatusCode)
			}
		})
		t.Run("SSE endpoint returns text/event-stream content type", func(t *testing.T) {
			if sseResp.Header.Get("Content-Type") != "text/event-stream" {
				t.Errorf("Expected Content-Type text/event-stream, got %s", sseResp.Header.Get("Content-Type"))
			}
		})
		// The first two lines of the stream are the event name and its data.
		const endpointPrefix = "data: /message?sessionId="
		responseReader := bufio.NewReader(sseResp.Body)
		event, eventErr := responseReader.ReadString('\n')
		endpoint, endpointErr := responseReader.ReadString('\n')
		t.Run("SSE endpoint returns stream with messages endpoint", func(t *testing.T) {
			if eventErr != nil {
				t.Fatalf("Failed to read SSE response body (event): %v", eventErr)
			}
			if event != "event: endpoint\n" {
				t.Errorf("Expected SSE event 'endpoint', got %s", event)
			}
			if endpointErr != nil {
				t.Fatalf("Failed to read SSE response body (endpoint): %v", endpointErr)
			}
			if !strings.HasPrefix(endpoint, endpointPrefix) {
				t.Errorf("Expected SSE data: '/message', got %s", endpoint)
			}
		})
		if !strings.HasPrefix(endpoint, endpointPrefix) {
			// Already reported by the subtest above; without a session id there
			// is no message endpoint to exercise.
			return
		}
		sessionID := strings.TrimSpace(strings.TrimPrefix(endpoint, endpointPrefix))
		messageResp, messageErr := http.Post(
			fmt.Sprintf("http://%s/message?sessionId=%s", ctx.HttpAddress, sessionID),
			"application/json",
			bytes.NewBufferString("{}"),
		)
		if messageErr != nil {
			t.Fatalf("Failed to get message endpoint: %v", messageErr)
		}
		t.Cleanup(func() { _ = messageResp.Body.Close() })
		t.Run("Exposes message endpoint at /message", func(t *testing.T) {
			if messageResp.StatusCode != http.StatusAccepted {
				t.Errorf("Expected HTTP 202 Accepted, got %d", messageResp.StatusCode)
			}
		})
	})
}
// TestStreamableHttpTransport verifies the streamable HTTP transport at /mcp:
// GET answers 200 with text/event-stream and POST answers 200 with
// application/json.
func TestStreamableHttpTransport(t *testing.T) {
	testCase(t, func(ctx *httpContext) {
		mcpGetResp, mcpGetErr := http.Get(fmt.Sprintf("http://%s/mcp", ctx.HttpAddress))
		if mcpGetErr != nil {
			// On error the response is nil: stop before the cleanup or any
			// subtest dereferences it.
			t.Fatalf("Failed to get MCP endpoint: %v", mcpGetErr)
		}
		t.Cleanup(func() { _ = mcpGetResp.Body.Close() })
		t.Run("Exposes MCP GET endpoint at /mcp", func(t *testing.T) {
			if mcpGetResp.StatusCode != http.StatusOK {
				t.Errorf("Expected HTTP 200 OK, got %d", mcpGetResp.StatusCode)
			}
		})
		t.Run("MCP GET endpoint returns text/event-stream content type", func(t *testing.T) {
			if mcpGetResp.Header.Get("Content-Type") != "text/event-stream" {
				t.Errorf("Expected Content-Type text/event-stream (GET), got %s", mcpGetResp.Header.Get("Content-Type"))
			}
		})
		mcpPostResp, mcpPostErr := http.Post(fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), "application/json", bytes.NewBufferString("{}"))
		if mcpPostErr != nil {
			t.Fatalf("Failed to post to MCP endpoint: %v", mcpPostErr)
		}
		t.Cleanup(func() { _ = mcpPostResp.Body.Close() })
		t.Run("Exposes MCP POST endpoint at /mcp", func(t *testing.T) {
			if mcpPostResp.StatusCode != http.StatusOK {
				t.Errorf("Expected HTTP 200 OK, got %d", mcpPostResp.StatusCode)
			}
		})
		t.Run("MCP POST endpoint returns application/json content type", func(t *testing.T) {
			if mcpPostResp.Header.Get("Content-Type") != "application/json" {
				t.Errorf("Expected Content-Type application/json (POST), got %s", mcpPostResp.Header.Get("Content-Type"))
			}
		})
	})
}
// TestHealthCheck verifies that /healthz answers 200 OK both with the default
// configuration and when OAuth with token validation is required.
func TestHealthCheck(t *testing.T) {
	testCase(t, func(ctx *httpContext) {
		t.Run("Exposes health check endpoint at /healthz", func(t *testing.T) {
			resp, err := http.Get(fmt.Sprintf("http://%s/healthz", ctx.HttpAddress))
			if err != nil {
				t.Fatalf("Failed to get health check endpoint: %v", err)
			}
			// Close must actually be invoked; a bare method value would leave
			// the body (and its connection) open.
			t.Cleanup(func() { _ = resp.Body.Close() })
			if resp.StatusCode != http.StatusOK {
				t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
			}
		})
	})
	// Health exposed even when require Authorization
	testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
		resp, err := http.Get(fmt.Sprintf("http://%s/healthz", ctx.HttpAddress))
		if err != nil {
			t.Fatalf("Failed to get health check endpoint with OAuth: %v", err)
		}
		t.Cleanup(func() { _ = resp.Body.Close() })
		t.Run("Health check with OAuth returns HTTP 200 OK", func(t *testing.T) {
			if resp.StatusCode != http.StatusOK {
				t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
			}
		})
	})
}
func TestWellKnownReverseProxy(t *testing.T) {
cases := []string{
".well-known/oauth-authorization-server",
".well-known/oauth-protected-resource",
".well-known/openid-configuration",
}
// With No Authorization URL configured
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
for _, path := range cases {
resp, err := http.Get(fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path))
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run("Protected resource '"+path+"' without Authorization URL returns 404 - Not Found", func(t *testing.T) {
if err != nil {
t.Fatalf("Failed to get %s endpoint: %v", path, err)
}
if resp.StatusCode != http.StatusNotFound {
t.Errorf("Expected HTTP 404 Not Found, got %d", resp.StatusCode)
}
})
}
})
// With Authorization URL configured but invalid payload
invalidPayloadServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`NOT A JSON PAYLOAD`))
}))
t.Cleanup(invalidPayloadServer.Close)
invalidPayloadConfig := &config.StaticConfig{
AuthorizationURL: invalidPayloadServer.URL,
RequireOAuth: true,
ValidateToken: true,
ClusterProviderStrategy: config.ClusterProviderKubeConfig,
}
testCaseWithContext(t, &httpContext{StaticConfig: invalidPayloadConfig}, func(ctx *httpContext) {
for _, path := range cases {
resp, err := http.Get(fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path))
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run("Protected resource '"+path+"' with invalid Authorization URL payload returns 500 - Internal Server Error", func(t *testing.T) {
if err != nil {
t.Fatalf("Failed to get %s endpoint: %v", path, err)
}
if resp.StatusCode != http.StatusInternalServerError {
t.Errorf("Expected HTTP 500 Internal Server Error, got %d", resp.StatusCode)
}
})
}
})
// With Authorization URL configured and valid payload
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.EscapedPath(), "/.well-known/") {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"issuer": "https://example.com","scopes_supported":["mcp-server"]}`))
}))
t.Cleanup(testServer.Close)
staticConfig := &config.StaticConfig{
AuthorizationURL: testServer.URL,
RequireOAuth: true,
ValidateToken: true,
ClusterProviderStrategy: config.ClusterProviderKubeConfig,
}
testCaseWithContext(t, &httpContext{StaticConfig: staticConfig}, func(ctx *httpContext) {
for _, path := range cases {
resp, err := http.Get(fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path))
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run("Exposes "+path+" endpoint", func(t *testing.T) {
if err != nil {
t.Fatalf("Failed to get %s endpoint: %v", path, err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
}
})
t.Run(path+" returns application/json content type", func(t *testing.T) {
if resp.Header.Get("Content-Type") != "application/json" {
t.Errorf("Expected Content-Type application/json, got %s", resp.Header.Get("Content-Type"))
}
})
}
})
}
// TestWellKnownHeaderPropagation checks that the well-known endpoints forward
// the client's request headers to the authorization server configured through
// AuthorizationURL, and relay that server's response headers back to the client.
func TestWellKnownHeaderPropagation(t *testing.T) {
	// headerCase describes a single header assertion: the header key/value
	// pair, the wording used in the subtest name (subject) and the wording used
	// in the failure message (label).
	type headerCase struct {
		key, value, subject, label string
	}
	wellKnownPaths := []string{
		".well-known/oauth-authorization-server",
		".well-known/oauth-protected-resource",
		".well-known/openid-configuration",
	}
	// Headers the client sends; each one is expected to reach the backend.
	forwarded := []headerCase{
		{"Origin", "https://example.com", "Origin header", "Origin header"},
		{"User-Agent", "Test-Agent/1.0", "User-Agent header", "User-Agent header"},
		{"Accept", "application/json", "Accept header", "Accept header"},
		{"Accept-Language", "en-US", "Accept-Language header", "Accept-Language header"},
		{"X-Custom-Header", "custom-value", "custom headers", "X-Custom-Header"},
		{"Referer", "https://example.com/page", "Referer header", "Referer header"},
	}
	// Headers the backend sets; each one is expected to reach the client.
	relayed := []headerCase{
		{"Access-Control-Allow-Origin", "https://example.com", "Access-Control-Allow-Origin", "Access-Control-Allow-Origin header"},
		{"Access-Control-Allow-Methods", "GET, POST, OPTIONS", "Access-Control-Allow-Methods", "Access-Control-Allow-Methods header"},
		{"Cache-Control", "no-cache", "Cache-Control", "Cache-Control header"},
		{"X-Custom-Backend-Header", "backend-value", "custom response headers", "X-Custom-Backend-Header"},
	}

	// backendHeaders holds a copy of the headers of the last well-known request
	// handled by the fake authorization server.
	var backendHeaders http.Header
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !strings.HasPrefix(r.URL.EscapedPath(), "/.well-known/") {
			http.NotFound(w, r)
			return
		}
		backendHeaders = r.Header.Clone()
		w.Header().Set("Content-Type", "application/json")
		for _, hc := range relayed {
			w.Header().Set(hc.key, hc.value)
		}
		_, _ = w.Write([]byte(`{"issuer": "https://example.com"}`))
	}))
	t.Cleanup(backend.Close)

	cfg := &config.StaticConfig{
		AuthorizationURL:        backend.URL,
		RequireOAuth:            true,
		ValidateToken:           true,
		ClusterProviderStrategy: config.ClusterProviderKubeConfig,
	}
	testCaseWithContext(t, &httpContext{StaticConfig: cfg}, func(ctx *httpContext) {
		for _, path := range wellKnownPaths {
			// Forget whatever the previous path captured.
			backendHeaders = nil
			req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path), nil)
			if err != nil {
				t.Fatalf("Failed to create request: %v", err)
			}
			for _, hc := range forwarded {
				req.Header.Set(hc.key, hc.value)
			}
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				t.Fatalf("Failed to get %s endpoint: %v", path, err)
			}
			t.Cleanup(func() { _ = resp.Body.Close() })

			for i, hc := range forwarded {
				t.Run("Well-known proxy propagates "+hc.subject+" to backend for "+path, func(t *testing.T) {
					// Only the first assertion aborts when the backend was never reached.
					if i == 0 && backendHeaders == nil {
						t.Fatal("Backend did not receive any headers")
					}
					if got := backendHeaders.Get(hc.key); got != hc.value {
						t.Errorf("Expected %s '%s', got '%s'", hc.label, hc.value, got)
					}
				})
			}
			for _, hc := range relayed {
				t.Run("Well-known proxy returns "+hc.subject+" from backend for "+path, func(t *testing.T) {
					if got := resp.Header.Get(hc.key); got != hc.value {
						t.Errorf("Expected %s '%s', got '%s'", hc.label, hc.value, got)
					}
				})
			}
		}
	})
}
// TestWellKnownOverrides checks how the well-known endpoints rewrite the
// metadata document served by the authorization server when
// DisableDynamicClientRegistration or OAuthScopes are set in the static
// configuration.
func TestWellKnownOverrides(t *testing.T) {
	cases := []string{
		".well-known/oauth-authorization-server",
		".well-known/oauth-protected-resource",
		".well-known/openid-configuration",
	}
	// Fake authorization server returning a fixed metadata document for any
	// path under /.well-known/.
	testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !strings.HasPrefix(r.URL.EscapedPath(), "/.well-known/") {
			http.NotFound(w, r)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(`
			{
				"issuer": "https://localhost",
				"registration_endpoint": "https://localhost/clients-registrations/openid-connect",
				"require_request_uri_registration": true,
				"scopes_supported":["scope-1", "scope-2"]
			}`))
	}))
	t.Cleanup(testServer.Close)
	baseConfig := config.StaticConfig{
		AuthorizationURL:        testServer.URL,
		RequireOAuth:            true,
		ValidateToken:           true,
		ClusterProviderStrategy: config.ClusterProviderKubeConfig,
	}
	// With Dynamic Client Registration disabled
	disableDynamicRegistrationConfig := baseConfig
	disableDynamicRegistrationConfig.DisableDynamicClientRegistration = true
	testCaseWithContext(t, &httpContext{StaticConfig: &disableDynamicRegistrationConfig}, func(ctx *httpContext) {
		for _, path := range cases {
			resp, err := http.Get(fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path))
			// resp is nil when the request fails, so bail out before touching resp.Body.
			if err != nil {
				t.Fatalf("Failed to get %s endpoint: %v", path, err)
			}
			t.Cleanup(func() { _ = resp.Body.Close() })
			body, err := io.ReadAll(resp.Body)
			if err != nil {
				t.Fatalf("Failed to read response body: %v", err)
			}
			payload := string(body)
			t.Run("DisableDynamicClientRegistration removes registration_endpoint field", func(t *testing.T) {
				if strings.Contains(payload, "registration_endpoint") {
					t.Error("Expected registration_endpoint to be removed, but it was found in the response")
				}
			})
			t.Run("DisableDynamicClientRegistration sets require_request_uri_registration = false", func(t *testing.T) {
				if !strings.Contains(payload, `"require_request_uri_registration":false`) {
					t.Error("Expected require_request_uri_registration to be false, but it was not found in the response")
				}
			})
			t.Run("DisableDynamicClientRegistration includes/preserves scopes_supported", func(t *testing.T) {
				if !strings.Contains(payload, `"scopes_supported":["scope-1","scope-2"]`) {
					t.Error("Expected scopes_supported to be present, but it was not found in the response")
				}
			})
		}
	})
	// With overrides for OAuth scopes (client/frontend)
	oAuthScopesConfig := baseConfig
	oAuthScopesConfig.OAuthScopes = []string{"openid", "mcp-server"}
	testCaseWithContext(t, &httpContext{StaticConfig: &oAuthScopesConfig}, func(ctx *httpContext) {
		for _, path := range cases {
			resp, err := http.Get(fmt.Sprintf("http://%s/%s", ctx.HttpAddress, path))
			// resp is nil when the request fails, so bail out before touching resp.Body.
			if err != nil {
				t.Fatalf("Failed to get %s endpoint: %v", path, err)
			}
			t.Cleanup(func() { _ = resp.Body.Close() })
			body, err := io.ReadAll(resp.Body)
			if err != nil {
				t.Fatalf("Failed to read response body: %v", err)
			}
			payload := string(body)
			t.Run("OAuthScopes overrides scopes_supported", func(t *testing.T) {
				if !strings.Contains(payload, `"scopes_supported":["openid","mcp-server"]`) {
					t.Errorf("Expected scopes_supported to be overridden, but original was preserved, response: %s", payload)
				}
			})
			t.Run("OAuthScopes preserves other fields", func(t *testing.T) {
				if !strings.Contains(payload, `"issuer":"https://localhost"`) {
					t.Errorf("Expected issuer to be preserved, but got: %s", payload)
				}
				if !strings.Contains(payload, `"registration_endpoint":"https://localhost`) {
					t.Errorf("Expected registration_endpoint to be preserved, but got: %s", payload)
				}
				if !strings.Contains(payload, `"require_request_uri_registration":true`) {
					t.Error("Expected require_request_uri_registration to be true, but it was not found in the response")
				}
			})
		}
	})
}
// TestMiddlewareLogging checks that a request is written to the log buffer
// with its method, path, status code and a parseable duration.
func TestMiddlewareLogging(t *testing.T) {
	testCase(t, func(ctx *httpContext) {
		resp, err := http.Get(fmt.Sprintf("http://%s/.well-known/oauth-protected-resource", ctx.HttpAddress))
		if err != nil {
			t.Fatalf("Failed to get .well-known/oauth-protected-resource endpoint: %v", err)
		}
		// The body is not inspected, but it still has to be closed to release the connection.
		t.Cleanup(func() { _ = resp.Body.Close() })
		t.Run("Logs HTTP requests and responses", func(t *testing.T) {
			if !strings.Contains(ctx.LogBuffer.String(), "GET /.well-known/oauth-protected-resource 404") {
				t.Errorf("Expected log entry for GET /.well-known/oauth-protected-resource, got: %s", ctx.LogBuffer.String())
			}
		})
		t.Run("Logs HTTP request duration", func(t *testing.T) {
			// The capture group grabs whatever follows the status code inside the quoted entry.
			expected := `"GET /.well-known/oauth-protected-resource 404 (.+)"`
			m := regexp.MustCompile(expected).FindStringSubmatch(ctx.LogBuffer.String())
			if len(m) != 2 {
				t.Fatalf("Expected log entry to contain duration, got %s", ctx.LogBuffer.String())
			}
			duration, err := time.ParseDuration(m[1])
			if err != nil {
				t.Fatalf("Failed to parse duration from log entry: %v", err)
			}
			if duration < 0 {
				t.Errorf("Expected duration to be non-negative, got %v", duration)
			}
		})
	})
}
// TestAuthorizationUnauthorized exercises the /mcp endpoint with OAuth required
// and a series of unacceptable credentials (missing header, non-Bearer scheme,
// malformed token, expired token, wrong audience, failed OIDC verification and
// failed Kubernetes TokenReview). Each scenario asserts the response status
// and/or WWW-Authenticate header and the corresponding log entry.
func TestAuthorizationUnauthorized(t *testing.T) {
	// Missing Authorization header
	testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
		resp, err := http.Get(fmt.Sprintf("http://%s/mcp", ctx.HttpAddress))
		if err != nil {
			t.Fatalf("Failed to get protected endpoint: %v", err)
		}
		// Close must be invoked (not just referenced) for the body to be released.
		t.Cleanup(func() { _ = resp.Body.Close() })
		t.Run("Protected resource with MISSING Authorization header returns 401 - Unauthorized", func(t *testing.T) {
			if resp.StatusCode != 401 {
				t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
			}
		})
		t.Run("Protected resource with MISSING Authorization header returns WWW-Authenticate header", func(t *testing.T) {
			authHeader := resp.Header.Get("WWW-Authenticate")
			expected := `Bearer realm="Kubernetes MCP Server", error="missing_token"`
			if authHeader != expected {
				t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
			}
		})
		t.Run("Protected resource with MISSING Authorization header logs error", func(t *testing.T) {
			if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - missing or invalid bearer token") {
				t.Errorf("Expected log entry for missing or invalid bearer token, got: %s", ctx.LogBuffer.String())
			}
		})
	})
	// Authorization header without Bearer prefix
	testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
		if err != nil {
			t.Fatalf("Failed to create request: %v", err)
		}
		req.Header.Set("Authorization", "Basic YWxhZGRpbjpvcGVuc2VzYW1l")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			t.Fatalf("Failed to get protected endpoint: %v", err)
		}
		t.Cleanup(func() { _ = resp.Body.Close() })
		t.Run("Protected resource with INCOMPATIBLE Authorization header returns WWW-Authenticate header", func(t *testing.T) {
			authHeader := resp.Header.Get("WWW-Authenticate")
			expected := `Bearer realm="Kubernetes MCP Server", error="missing_token"`
			if authHeader != expected {
				t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
			}
		})
		t.Run("Protected resource with INCOMPATIBLE Authorization header logs error", func(t *testing.T) {
			if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - missing or invalid bearer token") {
				t.Errorf("Expected log entry for missing or invalid bearer token, got: %s", ctx.LogBuffer.String())
			}
		})
	})
	// Invalid Authorization header
	testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
		if err != nil {
			t.Fatalf("Failed to create request: %v", err)
		}
		// Corrupt every segment separator so the token can no longer be decoded.
		req.Header.Set("Authorization", "Bearer "+strings.ReplaceAll(tokenBasicNotExpired, ".", ".invalid"))
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			t.Fatalf("Failed to get protected endpoint: %v", err)
		}
		t.Cleanup(func() { _ = resp.Body.Close() })
		t.Run("Protected resource with INVALID Authorization header returns 401 - Unauthorized", func(t *testing.T) {
			if resp.StatusCode != 401 {
				t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
			}
		})
		t.Run("Protected resource with INVALID Authorization header returns WWW-Authenticate header", func(t *testing.T) {
			authHeader := resp.Header.Get("WWW-Authenticate")
			expected := `Bearer realm="Kubernetes MCP Server", error="invalid_token"`
			if authHeader != expected {
				t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
			}
		})
		t.Run("Protected resource with INVALID Authorization header logs error", func(t *testing.T) {
			if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - JWT validation error") ||
				!strings.Contains(ctx.LogBuffer.String(), "error: failed to parse JWT token: illegal base64 data") {
				t.Errorf("Expected log entry for JWT validation error, got: %s", ctx.LogBuffer.String())
			}
		})
	})
	// Expired Authorization Bearer token
	testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
		if err != nil {
			t.Fatalf("Failed to create request: %v", err)
		}
		req.Header.Set("Authorization", "Bearer "+tokenBasicExpired)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			t.Fatalf("Failed to get protected endpoint: %v", err)
		}
		t.Cleanup(func() { _ = resp.Body.Close() })
		t.Run("Protected resource with EXPIRED Authorization header returns 401 - Unauthorized", func(t *testing.T) {
			if resp.StatusCode != 401 {
				t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
			}
		})
		t.Run("Protected resource with EXPIRED Authorization header returns WWW-Authenticate header", func(t *testing.T) {
			authHeader := resp.Header.Get("WWW-Authenticate")
			expected := `Bearer realm="Kubernetes MCP Server", error="invalid_token"`
			if authHeader != expected {
				t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
			}
		})
		t.Run("Protected resource with EXPIRED Authorization header logs error", func(t *testing.T) {
			if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - JWT validation error") ||
				!strings.Contains(ctx.LogBuffer.String(), "validation failed, token is expired (exp)") {
				t.Errorf("Expected log entry for JWT validation error, got: %s", ctx.LogBuffer.String())
			}
		})
	})
	// Invalid audience claim Bearer token
	testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, OAuthAudience: "expected-audience", ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
		if err != nil {
			t.Fatalf("Failed to create request: %v", err)
		}
		req.Header.Set("Authorization", "Bearer "+tokenBasicExpired)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			t.Fatalf("Failed to get protected endpoint: %v", err)
		}
		t.Cleanup(func() { _ = resp.Body.Close() })
		t.Run("Protected resource with INVALID AUDIENCE Authorization header returns 401 - Unauthorized", func(t *testing.T) {
			if resp.StatusCode != 401 {
				t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
			}
		})
		t.Run("Protected resource with INVALID AUDIENCE Authorization header returns WWW-Authenticate header", func(t *testing.T) {
			authHeader := resp.Header.Get("WWW-Authenticate")
			expected := `Bearer realm="Kubernetes MCP Server", audience="expected-audience", error="invalid_token"`
			if authHeader != expected {
				t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
			}
		})
		t.Run("Protected resource with INVALID AUDIENCE Authorization header logs error", func(t *testing.T) {
			if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - JWT validation error") ||
				!strings.Contains(ctx.LogBuffer.String(), "invalid audience claim (aud)") {
				t.Errorf("Expected log entry for JWT validation error, got: %s", ctx.LogBuffer.String())
			}
		})
	})
	// Failed OIDC validation
	oidcTestServer := NewOidcTestServer(t)
	t.Cleanup(oidcTestServer.Close)
	testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, OAuthAudience: "mcp-server", ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}, OidcProvider: oidcTestServer.Provider}, func(ctx *httpContext) {
		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
		if err != nil {
			t.Fatalf("Failed to create request: %v", err)
		}
		req.Header.Set("Authorization", "Bearer "+tokenBasicNotExpired)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			t.Fatalf("Failed to get protected endpoint: %v", err)
		}
		t.Cleanup(func() { _ = resp.Body.Close() })
		t.Run("Protected resource with INVALID OIDC Authorization header returns 401 - Unauthorized", func(t *testing.T) {
			if resp.StatusCode != 401 {
				t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
			}
		})
		t.Run("Protected resource with INVALID OIDC Authorization header returns WWW-Authenticate header", func(t *testing.T) {
			authHeader := resp.Header.Get("WWW-Authenticate")
			expected := `Bearer realm="Kubernetes MCP Server", audience="mcp-server", error="invalid_token"`
			if authHeader != expected {
				t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
			}
		})
		t.Run("Protected resource with INVALID OIDC Authorization header logs error", func(t *testing.T) {
			if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - JWT validation error") ||
				!strings.Contains(ctx.LogBuffer.String(), "OIDC token validation error: failed to verify signature") {
				t.Errorf("Expected log entry for OIDC validation error, got: %s", ctx.LogBuffer.String())
			}
		})
	})
	// Failed Kubernetes TokenReview
	// The token is signed with the OIDC test server's key, expires in one hour
	// and carries the configured audience.
	rawClaims := `{
		"iss": "` + oidcTestServer.URL + `",
		"exp": ` + strconv.FormatInt(time.Now().Add(time.Hour).Unix(), 10) + `,
		"aud": "mcp-server"
	}`
	validOidcToken := oidctest.SignIDToken(oidcTestServer.PrivateKey, "test-oidc-key-id", oidc.RS256, rawClaims)
	testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, OAuthAudience: "mcp-server", ValidateToken: true, ClusterProviderStrategy: config.ClusterProviderKubeConfig}, OidcProvider: oidcTestServer.Provider}, func(ctx *httpContext) {
		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
		if err != nil {
			t.Fatalf("Failed to create request: %v", err)
		}
		req.Header.Set("Authorization", "Bearer "+validOidcToken)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			t.Fatalf("Failed to get protected endpoint: %v", err)
		}
		t.Cleanup(func() { _ = resp.Body.Close() })
		t.Run("Protected resource with INVALID KUBERNETES Authorization header returns 401 - Unauthorized", func(t *testing.T) {
			if resp.StatusCode != 401 {
				t.Errorf("Expected HTTP 401, got %d", resp.StatusCode)
			}
		})
		t.Run("Protected resource with INVALID KUBERNETES Authorization header returns WWW-Authenticate header", func(t *testing.T) {
			authHeader := resp.Header.Get("WWW-Authenticate")
			expected := `Bearer realm="Kubernetes MCP Server", audience="mcp-server", error="invalid_token"`
			if authHeader != expected {
				t.Errorf("Expected WWW-Authenticate header to be %q, got %q", expected, authHeader)
			}
		})
		t.Run("Protected resource with INVALID KUBERNETES Authorization header logs error", func(t *testing.T) {
			if !strings.Contains(ctx.LogBuffer.String(), "Authentication failed - JWT validation error") ||
				!strings.Contains(ctx.LogBuffer.String(), "kubernetes API token validation error: failed to create token review") {
				t.Errorf("Expected log entry for Kubernetes TokenReview error, got: %s", ctx.LogBuffer.String())
			}
		})
	})
}
func TestAuthorizationRequireOAuthFalse(t *testing.T) {
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: false, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
resp, err := http.Get(fmt.Sprintf("http://%s/mcp", ctx.HttpAddress))
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run("Protected resource with MISSING Authorization header returns 200 - OK)", func(t *testing.T) {
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
}
})
})
}
func TestAuthorizationRawToken(t *testing.T) {
cases := []struct {
audience string
validateToken bool
}{
{"", false}, // No audience, no validation
{"", true}, // No audience, validation enabled
{"mcp-server", false}, // Audience set, no validation
{"mcp-server", true}, // Audience set, validation enabled
}
for _, c := range cases {
testCaseWithContext(t, &httpContext{StaticConfig: &config.StaticConfig{RequireOAuth: true, OAuthAudience: c.audience, ValidateToken: c.validateToken, ClusterProviderStrategy: config.ClusterProviderKubeConfig}}, func(ctx *httpContext) {
tokenReviewed := false
ctx.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.EscapedPath() == "/apis/authentication.k8s.io/v1/tokenreviews" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(tokenReviewSuccessful))
tokenReviewed = true
return
}
}))
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Authorization", "Bearer "+tokenBasicNotExpired)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to get protected endpoint: %v", err)
}
t.Cleanup(func() { _ = resp.Body.Close() })
t.Run(fmt.Sprintf("Protected resource with audience = '%s' and validate-token = '%t', with VALID Authorization header returns 200 - OK", c.audience, c.validateToken), func(t *testing.T) {
if resp.StatusCode != http.StatusOK {
t.Errorf("Expected HTTP 200 OK, got %d", resp.StatusCode)
}
})
t.Run(fmt.Sprintf("Protected resource with audience = '%s' and validate-token = '%t', with VALID Authorization header performs token validation accordingly", c.audience, c.validateToken), func(t *testing.T) {
if tokenReviewed == true && !c.validateToken {
t.Errorf("Expected token review to be skipped when validate-token is false, but it was performed")
}
if tokenReviewed == false && c.validateToken {
t.Errorf("Expected token review to be performed when validate-token is true, but it was skipped")
}
})
})
}
}
// TestAuthorizationOidcToken sends a token signed by the OIDC test server to
// /mcp with validate-token both disabled and enabled. It asserts a 200
// response and that the mocked TokenReview endpoint is hit exactly when
// validation is enabled.
func TestAuthorizationOidcToken(t *testing.T) {
	oidcTestServer := NewOidcTestServer(t)
	t.Cleanup(oidcTestServer.Close)
	// Claims: issued by the test server, valid for one hour, audience "mcp-server".
	expiry := strconv.FormatInt(time.Now().Add(time.Hour).Unix(), 10)
	rawClaims := `{
		"iss": "` + oidcTestServer.URL + `",
		"exp": ` + expiry + `,
		"aud": "mcp-server"
	}`
	validOidcToken := oidctest.SignIDToken(oidcTestServer.PrivateKey, "test-oidc-key-id", oidc.RS256, rawClaims)

	for _, validateToken := range []bool{false, true} {
		cfg := &config.StaticConfig{
			RequireOAuth:            true,
			OAuthAudience:           "mcp-server",
			ValidateToken:           validateToken,
			ClusterProviderStrategy: config.ClusterProviderKubeConfig,
		}
		testCaseWithContext(t, &httpContext{StaticConfig: cfg, OidcProvider: oidcTestServer.Provider}, func(ctx *httpContext) {
			// reviewCalled flips to true once the mock serves a TokenReview request.
			reviewCalled := false
			ctx.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				if r.URL.EscapedPath() != "/apis/authentication.k8s.io/v1/tokenreviews" {
					return
				}
				w.Header().Set("Content-Type", "application/json")
				_, _ = w.Write([]byte(tokenReviewSuccessful))
				reviewCalled = true
			}))

			request, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
			if err != nil {
				t.Fatalf("Failed to create request: %v", err)
			}
			request.Header.Set("Authorization", "Bearer "+validOidcToken)
			resp, err := http.DefaultClient.Do(request)
			if err != nil {
				t.Fatalf("Failed to get protected endpoint: %v", err)
			}
			t.Cleanup(func() { _ = resp.Body.Close() })

			// Both subtests share the same descriptive name prefix.
			prefix := fmt.Sprintf("Protected resource with validate-token='%t' with VALID OIDC Authorization header ", validateToken)
			t.Run(prefix+"returns 200 - OK", func(t *testing.T) {
				if got := resp.StatusCode; got != http.StatusOK {
					t.Errorf("Expected HTTP 200 OK, got %d", got)
				}
			})
			t.Run(prefix+"performs token validation accordingly", func(t *testing.T) {
				switch {
				case reviewCalled && !validateToken:
					t.Errorf("Expected token review to be skipped when validate-token is false, but it was performed")
				case !reviewCalled && validateToken:
					t.Errorf("Expected token review to be performed when validate-token is true, but it was skipped")
				}
			})
		})
	}
}
// TestAuthorizationOidcTokenExchange sends a client token (audience
// "mcp-server") to /mcp while the STS settings are configured and the OIDC test
// server's token endpoint hands out a token for "backend-audience". It asserts
// a 200 response and that the mocked TokenReview endpoint is hit exactly when
// validate-token is enabled.
func TestAuthorizationOidcTokenExchange(t *testing.T) {
	oidcTestServer := NewOidcTestServer(t)
	t.Cleanup(oidcTestServer.Close)
	// Claims template; the audience placeholder is filled in per token.
	expiry := strconv.FormatInt(time.Now().Add(time.Hour).Unix(), 10)
	rawClaims := `{
		"iss": "` + oidcTestServer.URL + `",
		"exp": ` + expiry + `,
		"aud": "%s"
	}`
	signFor := func(audience string) string {
		return oidctest.SignIDToken(oidcTestServer.PrivateKey, "test-oidc-key-id", oidc.RS256,
			fmt.Sprintf(rawClaims, audience))
	}
	validOidcClientToken := signFor("mcp-server")
	validOidcBackendToken := signFor("backend-audience")
	// The token endpoint always answers with the backend-audience token.
	oidcTestServer.TokenEndpointHandler = func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_, _ = fmt.Fprintf(w, `{"access_token":"%s","token_type":"Bearer","expires_in":253402297199}`, validOidcBackendToken)
	}

	for _, validateToken := range []bool{false, true} {
		cfg := &config.StaticConfig{
			RequireOAuth:            true,
			OAuthAudience:           "mcp-server",
			ValidateToken:           validateToken,
			StsClientId:             "test-sts-client-id",
			StsClientSecret:         "test-sts-client-secret",
			StsAudience:             "backend-audience",
			StsScopes:               []string{"backend-scope"},
			ClusterProviderStrategy: config.ClusterProviderKubeConfig,
		}
		testCaseWithContext(t, &httpContext{StaticConfig: cfg, OidcProvider: oidcTestServer.Provider}, func(ctx *httpContext) {
			// reviewCalled flips to true once the mock serves a TokenReview request.
			reviewCalled := false
			ctx.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				if r.URL.EscapedPath() != "/apis/authentication.k8s.io/v1/tokenreviews" {
					return
				}
				w.Header().Set("Content-Type", "application/json")
				_, _ = w.Write([]byte(tokenReviewSuccessful))
				reviewCalled = true
			}))

			request, err := http.NewRequest("GET", fmt.Sprintf("http://%s/mcp", ctx.HttpAddress), nil)
			if err != nil {
				t.Fatalf("Failed to create request: %v", err)
			}
			request.Header.Set("Authorization", "Bearer "+validOidcClientToken)
			resp, err := http.DefaultClient.Do(request)
			if err != nil {
				t.Fatalf("Failed to get protected endpoint: %v", err)
			}
			t.Cleanup(func() { _ = resp.Body.Close() })

			// Both subtests share the same descriptive name prefix.
			prefix := fmt.Sprintf("Protected resource with validate-token='%t' with VALID OIDC EXCHANGE Authorization header ", validateToken)
			t.Run(prefix+"returns 200 - OK", func(t *testing.T) {
				if got := resp.StatusCode; got != http.StatusOK {
					t.Errorf("Expected HTTP 200 OK, got %d", got)
				}
			})
			t.Run(prefix+"performs token validation accordingly", func(t *testing.T) {
				switch {
				case reviewCalled && !validateToken:
					t.Errorf("Expected token review to be skipped when validate-token is false, but it was performed")
				case !reviewCalled && validateToken:
					t.Errorf("Expected token review to be performed when validate-token is true, but it was skipped")
				}
			})
		})
	}
}
```