This is page 2 of 4. Use http://codebase.md/manusa/kubernetes-mcp-server?page={x} to view the full context.
# Directory Structure
```
├── .github
│ ├── dependabot.yml
│ └── workflows
│ ├── build.yaml
│ ├── release-image.yml
│ └── release.yaml
├── .gitignore
├── AGENTS.md
├── build
│ ├── keycloak.mk
│ ├── kind.mk
│ └── tools.mk
├── CLAUDE.md
├── cmd
│ └── kubernetes-mcp-server
│ ├── main_test.go
│ └── main.go
├── dev
│ └── config
│ ├── cert-manager
│ │ └── selfsigned-issuer.yaml
│ ├── ingress
│ │ └── nginx-ingress.yaml
│ ├── keycloak
│ │ ├── client-scopes
│ │ │ ├── groups.json
│ │ │ ├── mcp-openshift.json
│ │ │ └── mcp-server.json
│ │ ├── clients
│ │ │ ├── mcp-client.json
│ │ │ ├── mcp-server-update.json
│ │ │ ├── mcp-server.json
│ │ │ └── openshift.json
│ │ ├── deployment.yaml
│ │ ├── ingress.yaml
│ │ ├── mappers
│ │ │ ├── groups-membership.json
│ │ │ ├── mcp-server-audience.json
│ │ │ ├── openshift-audience.json
│ │ │ └── username.json
│ │ ├── rbac.yaml
│ │ ├── realm
│ │ │ ├── realm-create.json
│ │ │ └── realm-events-config.json
│ │ └── users
│ │ └── mcp.json
│ └── kind
│ └── cluster.yaml
├── Dockerfile
├── docs
│ └── images
│ ├── kubernetes-mcp-server-github-copilot.jpg
│ └── vibe-coding.jpg
├── go.mod
├── go.sum
├── hack
│ └── generate-placeholder-ca.sh
├── internal
│ ├── test
│ │ ├── env.go
│ │ ├── kubernetes.go
│ │ ├── mcp.go
│ │ ├── mock_server.go
│ │ └── test.go
│ └── tools
│ └── update-readme
│ └── main.go
├── LICENSE
├── Makefile
├── npm
│ ├── kubernetes-mcp-server
│ │ ├── bin
│ │ │ └── index.js
│ │ └── package.json
│ ├── kubernetes-mcp-server-darwin-amd64
│ │ └── package.json
│ ├── kubernetes-mcp-server-darwin-arm64
│ │ └── package.json
│ ├── kubernetes-mcp-server-linux-amd64
│ │ └── package.json
│ ├── kubernetes-mcp-server-linux-arm64
│ │ └── package.json
│ ├── kubernetes-mcp-server-windows-amd64
│ │ └── package.json
│ └── kubernetes-mcp-server-windows-arm64
│ └── package.json
├── pkg
│ ├── api
│ │ ├── toolsets_test.go
│ │ └── toolsets.go
│ ├── config
│ │ ├── config_default_overrides.go
│ │ ├── config_default.go
│ │ ├── config_test.go
│ │ ├── config.go
│ │ ├── provider_config_test.go
│ │ └── provider_config.go
│ ├── helm
│ │ └── helm.go
│ ├── http
│ │ ├── authorization_test.go
│ │ ├── authorization.go
│ │ ├── http_test.go
│ │ ├── http.go
│ │ ├── middleware.go
│ │ ├── sts_test.go
│ │ ├── sts.go
│ │ └── wellknown.go
│ ├── kubernetes
│ │ ├── accesscontrol_clientset.go
│ │ ├── accesscontrol_restmapper.go
│ │ ├── accesscontrol.go
│ │ ├── common_test.go
│ │ ├── configuration.go
│ │ ├── events.go
│ │ ├── impersonate_roundtripper.go
│ │ ├── kubernetes_derived_test.go
│ │ ├── kubernetes.go
│ │ ├── manager_test.go
│ │ ├── manager.go
│ │ ├── namespaces.go
│ │ ├── nodes.go
│ │ ├── openshift.go
│ │ ├── pods.go
│ │ ├── provider_kubeconfig_test.go
│ │ ├── provider_kubeconfig.go
│ │ ├── provider_registry_test.go
│ │ ├── provider_registry.go
│ │ ├── provider_single_test.go
│ │ ├── provider_single.go
│ │ ├── provider_test.go
│ │ ├── provider.go
│ │ ├── resources.go
│ │ └── token.go
│ ├── kubernetes-mcp-server
│ │ └── cmd
│ │ ├── root_test.go
│ │ ├── root.go
│ │ └── testdata
│ │ ├── empty-config.toml
│ │ └── valid-config.toml
│ ├── mcp
│ │ ├── common_test.go
│ │ ├── configuration_test.go
│ │ ├── events_test.go
│ │ ├── helm_test.go
│ │ ├── m3labs.go
│ │ ├── mcp_middleware_test.go
│ │ ├── mcp_test.go
│ │ ├── mcp_tools_test.go
│ │ ├── mcp.go
│ │ ├── modules.go
│ │ ├── namespaces_test.go
│ │ ├── nodes_test.go
│ │ ├── pods_exec_test.go
│ │ ├── pods_test.go
│ │ ├── pods_top_test.go
│ │ ├── resources_test.go
│ │ ├── testdata
│ │ │ ├── helm-chart-no-op
│ │ │ │ └── Chart.yaml
│ │ │ ├── helm-chart-secret
│ │ │ │ ├── Chart.yaml
│ │ │ │ └── templates
│ │ │ │ └── secret.yaml
│ │ │ ├── toolsets-config-tools.json
│ │ │ ├── toolsets-core-tools.json
│ │ │ ├── toolsets-full-tools-multicluster-enum.json
│ │ │ ├── toolsets-full-tools-multicluster.json
│ │ │ ├── toolsets-full-tools-openshift.json
│ │ │ ├── toolsets-full-tools.json
│ │ │ └── toolsets-helm-tools.json
│ │ ├── tool_filter_test.go
│ │ ├── tool_filter.go
│ │ ├── tool_mutator_test.go
│ │ ├── tool_mutator.go
│ │ └── toolsets_test.go
│ ├── output
│ │ ├── output_test.go
│ │ └── output.go
│ ├── toolsets
│ │ ├── config
│ │ │ ├── configuration.go
│ │ │ └── toolset.go
│ │ ├── core
│ │ │ ├── events.go
│ │ │ ├── namespaces.go
│ │ │ ├── nodes.go
│ │ │ ├── pods.go
│ │ │ ├── resources.go
│ │ │ └── toolset.go
│ │ ├── helm
│ │ │ ├── helm.go
│ │ │ └── toolset.go
│ │ ├── toolsets_test.go
│ │ └── toolsets.go
│ └── version
│ └── version.go
├── python
│ ├── kubernetes_mcp_server
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ └── kubernetes_mcp_server.py
│ ├── pyproject.toml
│ └── README.md
├── README.md
└── smithery.yaml
```
# Files
--------------------------------------------------------------------------------
/internal/test/mock_server.go:
--------------------------------------------------------------------------------
```go
package test
import (
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
)
// MockServer bundles an httptest.Server with a rest.Config that targets it.
// Every request received by the server is passed to each handler registered
// through Handle.
type MockServer struct {
// server is the underlying HTTP test server.
server *httptest.Server
// config is the client configuration whose Host is the test server URL.
config *rest.Config
// restHandlers are invoked, in registration order, for every request.
restHandlers []http.HandlerFunc
}
// NewMockServer starts an httptest.Server and returns a MockServer whose
// rest.Config targets it. Each request received by the server is handed, in
// registration order, to every handler added through Handle.
func NewMockServer() *MockServer {
	mock := &MockServer{restHandlers: make([]http.HandlerFunc, 0)}
	dispatch := func(w http.ResponseWriter, req *http.Request) {
		// Every handler sees every request; the chain is never short-circuited.
		for _, serve := range mock.restHandlers {
			serve(w, req)
		}
	}
	mock.server = httptest.NewServer(http.HandlerFunc(dispatch))
	codecFactory := serializer.NewCodecFactory(runtime.NewScheme())
	mock.config = &rest.Config{
		Host:    mock.server.URL,
		APIPath: "/api",
		ContentConfig: rest.ContentConfig{
			NegotiatedSerializer: codecFactory,
			ContentType:          runtime.ContentTypeJSON,
			GroupVersion:         &v1.SchemeGroupVersion,
		},
	}
	return mock
}
// Close shuts down the underlying test server. It is a no-op when no server
// has been started.
func (m *MockServer) Close() {
	if m.server == nil {
		return
	}
	m.server.Close()
}
// Handle registers handler so that it is invoked for every request received by
// the mock server, after all previously registered handlers.
func (m *MockServer) Handle(handler http.Handler) {
	var serve http.HandlerFunc = handler.ServeHTTP
	m.restHandlers = append(m.restHandlers, serve)
}
// Config returns the rest.Config that targets the mock server.
// The returned pointer is the server's own configuration, not a copy.
func (m *MockServer) Config() *rest.Config {
return m.config
}
// Kubeconfig returns the fake kubeconfig produced by KubeConfigFake with its
// "fake" cluster and "fake" user rewritten to use the mock server's host and
// the CA, key and certificate data held in the server's rest.Config.
func (m *MockServer) Kubeconfig() *api.Config {
	kubeconfig := KubeConfigFake()
	cluster, user := kubeconfig.Clusters["fake"], kubeconfig.AuthInfos["fake"]
	cluster.Server = m.config.Host
	cluster.CertificateAuthorityData = m.config.CAData
	user.ClientKeyData = m.config.KeyData
	user.ClientCertificateData = m.config.CertData
	return kubeconfig
}
// KubeconfigFile writes the kubeconfig returned by Kubeconfig to a file and
// returns the path of that file. It delegates to the package-level
// KubeconfigFile function.
func (m *MockServer) KubeconfigFile(t *testing.T) string {
return KubeconfigFile(t, m.Kubeconfig())
}
// KubeconfigFile serializes kubeconfig to a file named "config" inside a
// directory obtained from t.TempDir() and returns the file path. The test is
// failed immediately if the file cannot be written.
func KubeconfigFile(t *testing.T, kubeconfig *api.Config) string {
	path := filepath.Join(t.TempDir(), "config")
	writeErr := clientcmd.WriteToFile(*kubeconfig, path)
	require.NoError(t, writeErr, "Expected no error writing kubeconfig file")
	return path
}
// WriteObject writes obj to w as JSON, setting the Content-Type header to
// runtime.ContentTypeJSON first. If encoding fails, the error is reported
// through http.Error with a 500 status.
func WriteObject(w http.ResponseWriter, obj runtime.Object) {
	w.Header().Set("Content-Type", runtime.ContentTypeJSON)
	encodeErr := json.NewEncoder(w).Encode(obj)
	if encodeErr == nil {
		return
	}
	http.Error(w, encodeErr.Error(), http.StatusInternalServerError)
}
// streamAndReply pairs a stream received by the upgrade callback with the
// replySent channel that was passed to the same callback invocation.
type streamAndReply struct {
httpstream.Stream
replySent <-chan struct{}
}
// StreamContext holds the upgraded connection and the streams collected by
// CreateHTTPStreams. A stream field is left nil when no stream of that type
// was received.
type StreamContext struct {
// Closer is the upgraded connection.
Closer io.Closer
StdinStream io.ReadCloser
StdoutStream io.WriteCloser
StderrStream io.WriteCloser
// writeStatus writes a status as JSON to the error stream.
writeStatus func(status *apierrors.StatusError) error
}
// StreamOptions selects which standard streams CreateHTTPStreams waits for.
// Each non-nil field adds one expected stream; the values themselves are not
// read or written by CreateHTTPStreams.
type StreamOptions struct {
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
}
// v4WriteStatusFunc returns a function that marshals the Status of a
// StatusError to JSON and writes it to stream. The returned function reports
// the marshalling error or the write error, whichever occurs first.
func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) error {
	return func(status *apierrors.StatusError) error {
		payload, marshalErr := json.Marshal(status.Status())
		if marshalErr != nil {
			return marshalErr
		}
		if _, writeErr := stream.Write(payload); writeErr != nil {
			return writeErr
		}
		return nil
	}
}
// CreateHTTPStreams performs the "v4.channel.k8s.io" handshake, upgrades the
// request to a SPDY connection and blocks until every expected stream has been
// received.
//
// The error stream is always expected; stdin, stdout and stderr are expected
// only when the matching field of opts is non-nil. A nil opts is treated as an
// empty StreamOptions.
//
// An error is returned when the handshake fails, when the connection cannot be
// upgraded, or when a stream of an unsupported type is received (in which case
// the upgraded connection is closed before returning).
func CreateHTTPStreams(w http.ResponseWriter, req *http.Request, opts *StreamOptions) (*StreamContext, error) {
	if opts == nil {
		// Avoid a nil dereference below; only the error stream is expected.
		opts = &StreamOptions{}
	}
	_, err := httpstream.Handshake(req, w, []string{"v4.channel.k8s.io"})
	if err != nil {
		return nil, err
	}
	upgrader := spdy.NewResponseUpgrader()
	streamCh := make(chan streamAndReply)
	connection := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
		streamCh <- streamAndReply{Stream: stream, replySent: replySent}
		return nil
	})
	if connection == nil {
		// A failed upgrade yields no connection, hence no stream would ever be
		// delivered and the wait loop below would block forever.
		return nil, errors.New("unable to upgrade connection")
	}
	ctx := &StreamContext{
		Closer: connection,
	}
	// wait for stream
	// replyChan is buffered so that acknowledging a stream in the same select
	// loop that later counts the acknowledgement never blocks.
	replyChan := make(chan struct{}, 4)
	defer close(replyChan)
	receivedStreams := 0
	expectedStreams := 1 // the error stream
	if opts.Stdout != nil {
		expectedStreams++
	}
	if opts.Stdin != nil {
		expectedStreams++
	}
	if opts.Stderr != nil {
		expectedStreams++
	}
WaitForStreams:
	for {
		select {
		case stream := <-streamCh:
			streamType := stream.Headers().Get(v1.StreamType)
			switch streamType {
			case v1.StreamTypeError:
				replyChan <- struct{}{}
				ctx.writeStatus = v4WriteStatusFunc(stream)
			case v1.StreamTypeStdout:
				replyChan <- struct{}{}
				ctx.StdoutStream = stream
			case v1.StreamTypeStdin:
				replyChan <- struct{}{}
				ctx.StdinStream = stream
			case v1.StreamTypeStderr:
				replyChan <- struct{}{}
				ctx.StderrStream = stream
			default:
				// add other stream ...
				// The caller receives no StreamContext on this path, so release
				// the connection here rather than leaking it.
				_ = connection.Close()
				return nil, errors.New("unimplemented stream type")
			}
		case <-replyChan:
			receivedStreams++
			if receivedStreams == expectedStreams {
				break WaitForStreams
			}
		}
	}
	return ctx, nil
}
// InOpenShiftHandler is an http.Handler that answers the discovery requests
// for "/api", "/apis" and "/apis/project.openshift.io/v1" with canned JSON
// advertising the project.openshift.io/v1 API group and its "projects"
// resource. Any other path only gets the JSON Content-Type header set.
type InOpenShiftHandler struct{}

var _ http.Handler = (*InOpenShiftHandler)(nil)

// ServeHTTP writes the canned discovery payload matching the request path, if
// any. Write errors are intentionally ignored.
func (h *InOpenShiftHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	var payload string
	switch req.URL.Path {
	case "/api":
		// Request Performed by DiscoveryClient to Kube API (Get API Groups legacy -core-)
		payload = `{"kind":"APIVersions","versions":[],"serverAddressByClientCIDRs":[{"clientCIDR":"0.0.0.0/0"}]}`
	case "/apis":
		// Request Performed by DiscoveryClient to Kube API (Get API Groups)
		payload = `{
"kind":"APIGroupList",
"groups":[{
"name":"project.openshift.io",
"versions":[{"groupVersion":"project.openshift.io/v1","version":"v1"}],
"preferredVersion":{"groupVersion":"project.openshift.io/v1","version":"v1"}
}]}`
	case "/apis/project.openshift.io/v1":
		payload = `{
"kind":"APIResourceList",
"apiVersion":"v1",
"groupVersion":"project.openshift.io/v1",
"resources":[
{"name":"projects","singularName":"","namespaced":false,"kind":"Project","verbs":["create","delete","get","list","patch","update","watch"],"shortNames":["pr"]}
]}`
	default:
		// Unknown path: nothing is written.
		return
	}
	_, _ = w.Write([]byte(payload))
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/accesscontrol_clientset.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
"fmt"
authenticationv1api "k8s.io/api/authentication/v1"
authorizationv1api "k8s.io/api/authorization/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
authenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1"
authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/metrics/pkg/apis/metrics"
metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1"
metricsv1beta1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
"github.com/containers/kubernetes-mcp-server/pkg/config"
)
// AccessControlClientset is a limited clientset that delegates to the standard kubernetes.Clientset.
// Only a limited set of functions is implemented, providing a single point of access to the Kubernetes API
// where apiVersion and kind are checked for allowed access before a client is handed out.
type AccessControlClientset struct {
// cfg is the REST configuration used to build the clients and the exec executors.
cfg *rest.Config
// delegate is the clientset every allowed call is forwarded to.
delegate kubernetes.Interface
discoveryClient discovery.DiscoveryInterface
metricsV1beta1 *metricsv1beta1.MetricsV1beta1Client
// staticConfig is consulted by isAllowed for every accessor.
staticConfig *config.StaticConfig // TODO: maybe just store the denied resource slice
}
// DiscoveryClient returns the discovery client held by the clientset.
// No access check is performed here.
func (a *AccessControlClientset) DiscoveryClient() discovery.DiscoveryInterface {
return a.discoveryClient
}
// NodesLogs builds a GET request for the node proxy logs endpoint
// (api/v1/nodes/<name>/proxy/logs/<logPath>).
//
// It returns an error when access to v1 Node is not allowed or when the node
// cannot be retrieved. The request is returned unexecuted.
func (a *AccessControlClientset) NodesLogs(ctx context.Context, name, logPath string) (*rest.Request, error) {
	nodeGVK := schema.GroupVersionKind{Version: "v1", Kind: "Node"}
	if !isAllowed(a.staticConfig, &nodeGVK) {
		return nil, isNotAllowedError(&nodeGVK)
	}
	// Make sure the node exists before building the proxy request.
	_, getErr := a.delegate.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
	if getErr != nil {
		return nil, fmt.Errorf("failed to get node %s: %w", name, getErr)
	}
	request := a.delegate.CoreV1().RESTClient().Get().
		AbsPath("api", "v1", "nodes", name, "proxy", "logs", logPath)
	return request, nil
}
// Pods returns the Pod client for namespace, or an error when access to
// v1 Pod is not allowed.
func (a *AccessControlClientset) Pods(namespace string) (corev1.PodInterface, error) {
	podGVK := schema.GroupVersionKind{Version: "v1", Kind: "Pod"}
	if !isAllowed(a.staticConfig, &podGVK) {
		return nil, isNotAllowedError(&podGVK)
	}
	core := a.delegate.CoreV1()
	return core.Pods(namespace), nil
}
// PodsExec returns an executor for the "exec" subresource of the given pod.
// The executor tries a WebSocket connection first and falls back to SPDY when
// the failure is an upgrade failure or an HTTPS proxy error.
//
// It returns an error when access to v1 Pod is not allowed or when either
// executor cannot be created.
func (a *AccessControlClientset) PodsExec(namespace, name string, podExecOptions *v1.PodExecOptions) (remotecommand.Executor, error) {
	podGVK := schema.GroupVersionKind{Version: "v1", Kind: "Pod"}
	if !isAllowed(a.staticConfig, &podGVK) {
		return nil, isNotAllowedError(&podGVK)
	}
	// Compute URL
	// https://github.com/kubernetes/kubectl/blob/5366de04e168bcbc11f5e340d131a9ca8b7d0df4/pkg/cmd/exec/exec.go#L382-L397
	request := a.delegate.CoreV1().RESTClient().
		Post().
		Resource("pods").
		Namespace(namespace).
		Name(name).
		SubResource("exec").
		VersionedParams(podExecOptions, ParameterCodec)

	spdyExecutor, err := remotecommand.NewSPDYExecutor(a.cfg, "POST", request.URL())
	if err != nil {
		return nil, err
	}
	wsExecutor, err := remotecommand.NewWebSocketExecutor(a.cfg, "GET", request.URL().String())
	if err != nil {
		return nil, err
	}
	shouldFallback := func(err error) bool {
		return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
	}
	return remotecommand.NewFallbackExecutor(wsExecutor, spdyExecutor, shouldFallback)
}
// PodsMetricses fetches pod metrics from the metrics.k8s.io v1beta1 API and
// converts them to the internal metrics.PodMetricsList type.
//
// When name is non-empty only that pod's metrics are fetched and returned as a
// single-item list; otherwise the metrics in namespace are listed using
// listOptions. It returns an error when access to PodMetrics is not allowed or
// when the API call fails; a conversion error is returned alongside the
// (possibly partially filled) list.
func (a *AccessControlClientset) PodsMetricses(ctx context.Context, namespace, name string, listOptions metav1.ListOptions) (*metrics.PodMetricsList, error) {
	podMetricsGVK := schema.GroupVersionKind{
		Group:   metrics.GroupName,
		Version: metricsv1beta1api.SchemeGroupVersion.Version,
		Kind:    "PodMetrics",
	}
	if !isAllowed(a.staticConfig, &podMetricsGVK) {
		return nil, isNotAllowedError(&podMetricsGVK)
	}
	var versioned *metricsv1beta1api.PodMetricsList
	if name == "" {
		list, err := a.metricsV1beta1.PodMetricses(namespace).List(ctx, listOptions)
		if err != nil {
			return nil, fmt.Errorf("failed to list pod metrics in namespace %s: %w", namespace, err)
		}
		versioned = list
	} else {
		single, err := a.metricsV1beta1.PodMetricses(namespace).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("failed to get metrics for pod %s/%s: %w", namespace, name, err)
		}
		versioned = &metricsv1beta1api.PodMetricsList{Items: []metricsv1beta1api.PodMetrics{*single}}
	}
	internal := &metrics.PodMetricsList{}
	conversionErr := metricsv1beta1api.Convert_v1beta1_PodMetricsList_To_metrics_PodMetricsList(versioned, internal, nil)
	return internal, conversionErr
}
// Services returns the Service client for namespace, or an error when access
// to v1 Service is not allowed.
func (a *AccessControlClientset) Services(namespace string) (corev1.ServiceInterface, error) {
	serviceGVK := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
	if !isAllowed(a.staticConfig, &serviceGVK) {
		return nil, isNotAllowedError(&serviceGVK)
	}
	core := a.delegate.CoreV1()
	return core.Services(namespace), nil
}
// SelfSubjectAccessReviews returns the SelfSubjectAccessReview client, or an
// error when access to that kind is not allowed.
func (a *AccessControlClientset) SelfSubjectAccessReviews() (authorizationv1.SelfSubjectAccessReviewInterface, error) {
	reviewGVK := schema.GroupVersionKind{
		Group:   authorizationv1api.GroupName,
		Version: authorizationv1api.SchemeGroupVersion.Version,
		Kind:    "SelfSubjectAccessReview",
	}
	if !isAllowed(a.staticConfig, &reviewGVK) {
		return nil, isNotAllowedError(&reviewGVK)
	}
	authorization := a.delegate.AuthorizationV1()
	return authorization.SelfSubjectAccessReviews(), nil
}
// TokenReview returns the TokenReview client, or an error when access to the
// authentication.k8s.io TokenReview kind is not allowed.
func (a *AccessControlClientset) TokenReview() (authenticationv1.TokenReviewInterface, error) {
	// Group and Version both come from the authentication API package so the
	// GVK stays correct even if the authorization API version ever diverges.
	gvk := &schema.GroupVersionKind{Group: authenticationv1api.GroupName, Version: authenticationv1api.SchemeGroupVersion.Version, Kind: "TokenReview"}
	if !isAllowed(a.staticConfig, gvk) {
		return nil, isNotAllowedError(gvk)
	}
	return a.delegate.AuthenticationV1().TokenReviews(), nil
}
// NewAccessControlClientset builds an AccessControlClientset from cfg,
// creating both the standard clientset and the metrics v1beta1 client.
// staticConfig is stored and consulted by the access checks of each accessor.
// It returns an error when either client cannot be created.
func NewAccessControlClientset(cfg *rest.Config, staticConfig *config.StaticConfig) (*AccessControlClientset, error) {
	standard, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	metricsV1beta1Client, err := metricsv1beta1.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	acc := &AccessControlClientset{cfg: cfg, staticConfig: staticConfig}
	acc.delegate = standard
	acc.discoveryClient = standard.DiscoveryClient
	acc.metricsV1beta1 = metricsV1beta1Client
	return acc, nil
}
```
--------------------------------------------------------------------------------
/pkg/mcp/namespaces_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"regexp"
"slices"
"testing"
"github.com/BurntSushi/toml"
"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/suite"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/yaml"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
)
// NamespacesSuite groups the tests for the namespaces_list tool.
// It embeds BaseMcpSuite, which supplies Cfg, InitMcpClient and CallTool.
type NamespacesSuite struct {
BaseMcpSuite
}
// TestNamespacesList verifies that namespaces_list succeeds, returns YAML
// content and includes the "default", "ns-1" and "ns-2" namespaces.
func (s *NamespacesSuite) TestNamespacesList() {
	s.InitMcpClient()
	s.Run("namespaces_list", func() {
		toolResult, err := s.CallTool("namespaces_list", map[string]interface{}{})
		// Guard before the first dereference so a failed call reports a clean
		// failure instead of a nil-pointer panic.
		s.Require().NotNil(toolResult, "Expected tool result from call")
		s.Run("no error", func() {
			s.Nilf(err, "call tool failed %v", err)
			s.Falsef(toolResult.IsError, "call tool failed")
		})
		var decoded []unstructured.Unstructured
		err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
		s.Run("has yaml content", func() {
			s.Nilf(err, "invalid tool result content %v", err)
		})
		s.Run("returns at least 3 items", func() {
			s.Truef(len(decoded) >= 3, "expected at least 3 items, got %v", len(decoded))
			for _, expectedNamespace := range []string{"default", "ns-1", "ns-2"} {
				s.Truef(slices.ContainsFunc(decoded, func(ns unstructured.Unstructured) bool {
					return ns.GetName() == expectedNamespace
				}), "namespace %s not found in the list", expectedNamespace)
			}
		})
	})
}
// TestNamespacesListDenied verifies that namespaces_list reports a tool error
// with a descriptive message when v1 Namespace is listed in denied_resources.
func (s *NamespacesSuite) TestNamespacesListDenied() {
	s.Require().NoError(toml.Unmarshal([]byte(`
denied_resources = [ { version = "v1", kind = "Namespace" } ]
`), s.Cfg), "Expected to parse denied resources config")
	s.InitMcpClient()
	s.Run("namespaces_list (denied)", func() {
		toolResult, err := s.CallTool("namespaces_list", map[string]interface{}{})
		// Guard before the first dereference so a missing result reports a
		// clean failure instead of a nil-pointer panic.
		s.Require().NotNil(toolResult, "Expected tool result from call")
		s.Run("has error", func() {
			s.Truef(toolResult.IsError, "call tool should fail")
			s.Nilf(err, "call tool should not return error object")
		})
		s.Run("describes denial", func() {
			expectedMessage := "failed to list namespaces: resource not allowed: /v1, Kind=Namespace"
			s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
				"expected descriptive error '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
		})
	})
}
// TestNamespacesListAsTable verifies the table rendering of namespaces_list
// (list_output = "table"): the column headers and one formatted row for each
// of the "ns-1" and "ns-2" namespaces.
func (s *NamespacesSuite) TestNamespacesListAsTable() {
	s.Cfg.ListOutput = "table"
	s.InitMcpClient()
	s.Run("namespaces_list (list_output=table)", func() {
		toolResult, err := s.CallTool("namespaces_list", map[string]interface{}{})
		// Guard before the first dereference so a failed call reports a clean
		// failure instead of a nil-pointer panic.
		s.Require().NotNil(toolResult, "Expected tool result from call")
		s.Run("no error", func() {
			s.Nilf(err, "call tool failed %v", err)
			s.Falsef(toolResult.IsError, "call tool failed")
		})
		out := toolResult.Content[0].(mcp.TextContent).Text
		s.Run("returns column headers", func() {
			expectedHeaders := "APIVERSION\\s+KIND\\s+NAME\\s+STATUS\\s+AGE\\s+LABELS"
			m, e := regexp.MatchString(expectedHeaders, out)
			s.Truef(m, "Expected headers '%s' not found in output:\n%s", expectedHeaders, out)
			s.NoErrorf(e, "Error matching headers regex: %v", e)
		})
		// The row expectations differ only in the namespace name.
		for _, namespace := range []string{"ns-1", "ns-2"} {
			s.Run("returns formatted row for "+namespace, func() {
				expectedRow := "(?<apiVersion>v1)\\s+" +
					"(?<kind>Namespace)\\s+" +
					"(?<name>" + namespace + ")\\s+" +
					"(?<status>Active)\\s+" +
					"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
					"(?<labels>kubernetes.io/metadata.name=" + namespace + ")"
				m, e := regexp.MatchString(expectedRow, out)
				s.Truef(m, "Expected row '%s' not found in output:\n%s", expectedRow, out)
				s.NoErrorf(e, "Error matching "+namespace+" regex: %v", e)
			})
		}
	})
}
// TestNamespaces runs the NamespacesSuite test suite.
func TestNamespaces(t *testing.T) {
suite.Run(t, new(NamespacesSuite))
}
// TestProjectsListInOpenShift creates a project.openshift.io/v1 Project named
// "an-openshift-project" and verifies that projects_list succeeds, returns
// YAML content and includes that project.
func TestProjectsListInOpenShift(t *testing.T) {
	testCaseWithContext(t, &mcpContext{before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
		projectsGVR := schema.GroupVersionResource{Group: "project.openshift.io", Version: "v1", Resource: "projects"}
		project := &unstructured.Unstructured{Object: map[string]interface{}{
			"apiVersion": "project.openshift.io/v1",
			"kind":       "Project",
			"metadata": map[string]interface{}{
				"name": "an-openshift-project",
			},
		}}
		// The outcome of the create call is deliberately discarded.
		_, _ = dynamic.NewForConfigOrDie(envTestRestConfig).
			Resource(projectsGVR).
			Create(c.ctx, project, metav1.CreateOptions{})

		result, callErr := c.callTool("projects_list", map[string]interface{}{})
		t.Run("projects_list returns project list", func(t *testing.T) {
			if callErr != nil {
				t.Fatalf("call tool failed %v", callErr)
			}
			if result.IsError {
				t.Fatalf("call tool failed")
			}
		})

		var projects []unstructured.Unstructured
		yamlErr := yaml.Unmarshal([]byte(result.Content[0].(mcp.TextContent).Text), &projects)
		t.Run("projects_list has yaml content", func(t *testing.T) {
			if yamlErr != nil {
				t.Fatalf("invalid tool result content %v", yamlErr)
			}
		})
		t.Run("projects_list returns at least 1 items", func(t *testing.T) {
			if len(projects) < 1 {
				t.Errorf("invalid project count, expected at least 1, got %v", len(projects))
			}
			found := slices.ContainsFunc(projects, func(p unstructured.Unstructured) bool {
				return p.GetName() == "an-openshift-project"
			})
			if !found {
				t.Errorf("namespace %s not found in the list", "an-openshift-project")
			}
		})
	})
}
// TestProjectsListInOpenShiftDenied verifies that projects_list reports a tool
// error with a descriptive message when the project.openshift.io/v1 group is
// listed in denied_resources.
func TestProjectsListInOpenShiftDenied(t *testing.T) {
	deniedConfig := test.Must(config.ReadToml([]byte(`
denied_resources = [ { group = "project.openshift.io", version = "v1" } ]
`)))
	testContext := &mcpContext{staticConfig: deniedConfig, before: inOpenShift, after: inOpenShiftClear}
	testCaseWithContext(t, testContext, func(c *mcpContext) {
		c.withEnvTest()
		result, _ := c.callTool("projects_list", map[string]interface{}{})
		t.Run("projects_list has error", func(t *testing.T) {
			if result.IsError {
				return
			}
			t.Fatalf("call tool should fail")
		})
		t.Run("projects_list describes denial", func(t *testing.T) {
			expectedMessage := "failed to list projects: resource not allowed: project.openshift.io/v1, Kind=Project"
			if result.Content[0].(mcp.TextContent).Text == expectedMessage {
				return
			}
			t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, result.Content[0].(mcp.TextContent).Text)
		})
	})
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/provider_test.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"os"
"strings"
"testing"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/stretchr/testify/suite"
"k8s.io/client-go/rest"
)
// BaseProviderSuite is a testify suite that snapshots the package-level
// providerFactories map before each test and restores it afterwards.
type BaseProviderSuite struct {
suite.Suite
// originalProviderFactories is the snapshot taken in SetupTest.
originalProviderFactories map[string]ProviderFactory
}
// SetupTest stores a copy of the current providerFactories entries so that
// TearDownTest can restore them.
func (s *BaseProviderSuite) SetupTest() {
	snapshot := make(map[string]ProviderFactory)
	for strategy, factory := range providerFactories {
		snapshot[strategy] = factory
	}
	s.originalProviderFactories = snapshot
}
// TearDownTest replaces providerFactories with a fresh map holding the entries
// captured by SetupTest, discarding any registration made during the test.
func (s *BaseProviderSuite) TearDownTest() {
	restored := make(map[string]ProviderFactory)
	for strategy, factory := range s.originalProviderFactories {
		restored[strategy] = factory
	}
	providerFactories = restored
}
// ProviderTestSuite tests NewProvider. On top of BaseProviderSuite it saves
// and restores the process environment and the InClusterConfig function, and
// provides a mock server with a kubeconfig file that targets it.
type ProviderTestSuite struct {
BaseProviderSuite
// originalEnv is the environment captured in SetupTest.
originalEnv []string
// originalInClusterConfig is the InClusterConfig value captured in SetupTest.
originalInClusterConfig func() (*rest.Config, error)
mockServer *test.MockServer
// kubeconfigPath is the kubeconfig file path with backslashes doubled.
kubeconfigPath string
}
// SetupTest snapshots the provider factories, the environment and
// InClusterConfig, then starts a mock server and writes a kubeconfig file
// that targets it.
func (s *ProviderTestSuite) SetupTest() {
	s.BaseProviderSuite.SetupTest()
	s.originalEnv, s.originalInClusterConfig = os.Environ(), InClusterConfig
	s.mockServer = test.NewMockServer()
	// Backslashes are doubled, presumably so that the path can be embedded in
	// the double-quoted TOML strings used by the tests.
	rawPath := s.mockServer.KubeconfigFile(s.T())
	s.kubeconfigPath = strings.ReplaceAll(rawPath, `\`, `\\`)
}
// TearDownTest restores the provider factories, the environment and
// InClusterConfig, and closes the mock server if one was started.
func (s *ProviderTestSuite) TearDownTest() {
	s.BaseProviderSuite.TearDownTest()
	test.RestoreEnv(s.originalEnv)
	InClusterConfig = s.originalInClusterConfig
	if s.mockServer == nil {
		return
	}
	s.mockServer.Close()
}
// TestNewProviderInCluster exercises NewProvider while InClusterConfig is
// stubbed to succeed, covering each cluster_provider_strategy value.
func (s *ProviderTestSuite) TestNewProviderInCluster() {
	InClusterConfig = func() (*rest.Config, error) {
		return &rest.Config{}, nil
	}

	s.Run("With no cluster_provider_strategy, returns single-cluster provider", func() {
		staticCfg := test.Must(config.ReadToml([]byte{}))
		got, err := NewProvider(staticCfg)
		s.Require().NoError(err, "Expected no error for in-cluster provider")
		s.NotNil(got, "Expected provider instance")
		s.IsType(&singleClusterProvider{}, got, "Expected singleClusterProvider type")
	})

	s.Run("With cluster_provider_strategy=in-cluster, returns single-cluster provider", func() {
		staticCfg := test.Must(config.ReadToml([]byte(`
cluster_provider_strategy = "in-cluster"
`)))
		got, err := NewProvider(staticCfg)
		s.Require().NoError(err, "Expected no error for single-cluster strategy")
		s.NotNil(got, "Expected provider instance")
		s.IsType(&singleClusterProvider{}, got, "Expected singleClusterProvider type")
	})

	s.Run("With cluster_provider_strategy=kubeconfig, returns error", func() {
		staticCfg := test.Must(config.ReadToml([]byte(`
cluster_provider_strategy = "kubeconfig"
`)))
		got, err := NewProvider(staticCfg)
		s.Require().Error(err, "Expected error for kubeconfig strategy")
		s.ErrorContains(err, "kubeconfig ClusterProviderStrategy is invalid for in-cluster deployments")
		s.Nilf(got, "Expected no provider instance, got %v", got)
	})

	s.Run("With cluster_provider_strategy=kubeconfig and kubeconfig set to valid path, returns kubeconfig provider", func() {
		staticCfg := test.Must(config.ReadToml([]byte(`
cluster_provider_strategy = "kubeconfig"
kubeconfig = "` + s.kubeconfigPath + `"
`)))
		got, err := NewProvider(staticCfg)
		s.Require().NoError(err, "Expected no error for kubeconfig strategy")
		s.NotNil(got, "Expected provider instance")
		s.IsType(&kubeConfigClusterProvider{}, got, "Expected kubeConfigClusterProvider type")
	})

	s.Run("With cluster_provider_strategy=non-existent, returns error", func() {
		staticCfg := test.Must(config.ReadToml([]byte(`
cluster_provider_strategy = "i-do-not-exist"
`)))
		got, err := NewProvider(staticCfg)
		s.Require().Error(err, "Expected error for non-existent strategy")
		s.ErrorContains(err, "no provider registered for strategy 'i-do-not-exist'")
		s.Nilf(got, "Expected no provider instance, got %v", got)
	})
}
// TestNewProviderLocal exercises NewProvider while InClusterConfig is stubbed
// to return rest.ErrNotInCluster and KUBECONFIG points at the suite's
// kubeconfig file, covering each cluster_provider_strategy value.
func (s *ProviderTestSuite) TestNewProviderLocal() {
	InClusterConfig = func() (*rest.Config, error) {
		return nil, rest.ErrNotInCluster
	}
	s.Require().NoError(os.Setenv("KUBECONFIG", s.kubeconfigPath))

	s.Run("With no cluster_provider_strategy, returns kubeconfig provider", func() {
		staticCfg := test.Must(config.ReadToml([]byte{}))
		got, err := NewProvider(staticCfg)
		s.Require().NoError(err, "Expected no error for kubeconfig provider")
		s.NotNil(got, "Expected provider instance")
		s.IsType(&kubeConfigClusterProvider{}, got, "Expected kubeConfigClusterProvider type")
	})

	s.Run("With cluster_provider_strategy=kubeconfig, returns kubeconfig provider", func() {
		staticCfg := test.Must(config.ReadToml([]byte(`
cluster_provider_strategy = "kubeconfig"
`)))
		got, err := NewProvider(staticCfg)
		s.Require().NoError(err, "Expected no error for kubeconfig provider")
		s.NotNil(got, "Expected provider instance")
		s.IsType(&kubeConfigClusterProvider{}, got, "Expected kubeConfigClusterProvider type")
	})

	s.Run("With cluster_provider_strategy=disabled, returns single-cluster provider", func() {
		staticCfg := test.Must(config.ReadToml([]byte(`
cluster_provider_strategy = "disabled"
`)))
		got, err := NewProvider(staticCfg)
		s.Require().NoError(err, "Expected no error for disabled strategy")
		s.NotNil(got, "Expected provider instance")
		s.IsType(&singleClusterProvider{}, got, "Expected singleClusterProvider type")
	})

	s.Run("With cluster_provider_strategy=in-cluster, returns error", func() {
		staticCfg := test.Must(config.ReadToml([]byte(`
cluster_provider_strategy = "in-cluster"
`)))
		got, err := NewProvider(staticCfg)
		s.Require().Error(err, "Expected error for in-cluster strategy")
		s.ErrorContains(err, "server must be deployed in cluster for the in-cluster ClusterProviderStrategy")
		s.Nilf(got, "Expected no provider instance, got %v", got)
	})

	s.Run("With cluster_provider_strategy=in-cluster and kubeconfig set to valid path, returns error", func() {
		staticCfg := test.Must(config.ReadToml([]byte(`
kubeconfig = "` + s.kubeconfigPath + `"
cluster_provider_strategy = "in-cluster"
`)))
		got, err := NewProvider(staticCfg)
		s.Require().Error(err, "Expected error for in-cluster strategy")
		s.Regexp("kubeconfig file .+ cannot be used with the in-cluster ClusterProviderStrategy", err.Error())
		s.Nilf(got, "Expected no provider instance, got %v", got)
	})

	s.Run("With cluster_provider_strategy=non-existent, returns error", func() {
		staticCfg := test.Must(config.ReadToml([]byte(`
cluster_provider_strategy = "i-do-not-exist"
`)))
		got, err := NewProvider(staticCfg)
		s.Require().Error(err, "Expected error for non-existent strategy")
		s.ErrorContains(err, "no provider registered for strategy 'i-do-not-exist'")
		s.Nilf(got, "Expected no provider instance, got %v", got)
	})
}
// TestProvider runs the ProviderTestSuite test suite.
func TestProvider(t *testing.T) {
suite.Run(t, new(ProviderTestSuite))
}
```
--------------------------------------------------------------------------------
/pkg/http/sts_test.go:
--------------------------------------------------------------------------------
```go
package http
import (
"encoding/base64"
"fmt"
"net/http"
"strings"
"testing"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/coreos/go-oidc/v3/oidc"
"golang.org/x/oauth2"
)
// TestIsEnabled checks SecurityTokenService.IsEnabled against a set of
// configurations expected to be disabled and a set expected to be enabled.
// Judging by the cases, Provider, ClientId and ExternalAccountAudience must
// all be set for the service to be enabled.
func TestIsEnabled(t *testing.T) {
	wantDisabled := []SecurityTokenService{
		{},
		{Provider: nil},
		{Provider: &oidc.Provider{}},
		{Provider: &oidc.Provider{}, ClientId: "test-client-id", ClientSecret: "test-client-secret"},
		{ClientId: "test-client-id", ClientSecret: "test-client-secret", ExternalAccountAudience: "test-audience"},
		{Provider: &oidc.Provider{}, ClientSecret: "test-client-secret", ExternalAccountAudience: "test-audience"},
	}
	for _, candidate := range wantDisabled {
		name := fmt.Sprintf("SecurityTokenService{%+v}.IsEnabled() = false", candidate)
		t.Run(name, func(t *testing.T) {
			if !candidate.IsEnabled() {
				return
			}
			t.Errorf("SecurityTokenService{%+v}.IsEnabled() = true; want false", candidate)
		})
	}

	wantEnabled := []SecurityTokenService{
		{Provider: &oidc.Provider{}, ClientId: "test-client-id", ExternalAccountAudience: "test-audience"},
		{Provider: &oidc.Provider{}, ClientId: "test-client-id", ExternalAccountAudience: "test-audience", ClientSecret: "test-client-secret"},
		{Provider: &oidc.Provider{}, ClientId: "test-client-id", ExternalAccountAudience: "test-audience", ClientSecret: "test-client-secret", ExternalAccountScopes: []string{"test-scope"}},
	}
	for _, candidate := range wantEnabled {
		name := fmt.Sprintf("SecurityTokenService{%+v}.IsEnabled() = true", candidate)
		t.Run(name, func(t *testing.T) {
			if candidate.IsEnabled() {
				return
			}
			t.Errorf("SecurityTokenService{%+v}.IsEnabled() = false; want true", candidate)
		})
	}
}
// TestExternalAccountTokenExchange drives SecurityTokenService.ExternalAccountTokenExchange
// against a mock server that serves an OIDC discovery document and a /token endpoint.
// The /token handler accepts only the subject token "the-original-access-token" and
// records the request so that later subtests can inspect its method, form and headers.
func TestExternalAccountTokenExchange(t *testing.T) {
	mockServer := test.NewMockServer()
	authServer := mockServer.Config().Host
	// tokenExchangeRequest holds the most recent request received on /token.
	var tokenExchangeRequest *http.Request
	mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		if req.URL.Path == "/.well-known/openid-configuration" {
			w.Header().Set("Content-Type", "application/json")
			_, _ = fmt.Fprintf(w, `{
"issuer": "%s",
"authorization_endpoint": "https://mock-oidc-provider/authorize",
"token_endpoint": "%s/token"
}`, authServer, authServer)
			return
		}
		if req.URL.Path == "/token" {
			tokenExchangeRequest = req
			_ = tokenExchangeRequest.ParseForm()
			if tokenExchangeRequest.PostForm.Get("subject_token") != "the-original-access-token" {
				http.Error(w, "Invalid subject_token", http.StatusUnauthorized)
				return
			}
			w.Header().Set("Content-Type", "application/json")
			_, _ = w.Write([]byte(`{"access_token":"exchanged-access-token","token_type":"Bearer","expires_in":253402297199}`))
			return
		}
	}))
	t.Cleanup(mockServer.Close)
	provider, err := oidc.NewProvider(t.Context(), authServer)
	if err != nil {
		t.Fatalf("oidc.NewProvider() error = %v; want nil", err)
	}
	// With missing Token Source information
	_, err = (&SecurityTokenService{Provider: provider}).ExternalAccountTokenExchange(t.Context(), &oauth2.Token{})
	t.Run("ExternalAccountTokenExchange with missing token source returns error", func(t *testing.T) {
		if err == nil {
			t.Fatalf("ExternalAccountTokenExchange() error = nil; want error")
		}
		if !strings.Contains(err.Error(), "must be set") {
			t.Errorf("ExternalAccountTokenExchange() error = %v; want missing required field", err)
		}
	})
	// With valid Token Source information
	sts := SecurityTokenService{
		Provider:                provider,
		ClientId:                "test-client-id",
		ClientSecret:            "test-client-secret",
		ExternalAccountAudience: "test-audience",
		ExternalAccountScopes:   []string{"test-scope"},
	}
	// With Invalid token
	_, err = sts.ExternalAccountTokenExchange(t.Context(), &oauth2.Token{
		AccessToken: "invalid-access-token",
		TokenType:   "Bearer",
	})
	t.Run("ExternalAccountTokenExchange with invalid token returns error", func(t *testing.T) {
		if err == nil {
			t.Fatalf("ExternalAccountTokenExchange() error = nil; want error")
		}
		if !strings.Contains(err.Error(), "status code 401: Invalid subject_token") {
			// The failure message names the same text the assertion looks for.
			t.Errorf("ExternalAccountTokenExchange() error = %v; want status code 401: Invalid subject_token", err)
		}
	})
	// With Valid token
	exchangeToken, err := sts.ExternalAccountTokenExchange(t.Context(), &oauth2.Token{
		AccessToken: "the-original-access-token",
		TokenType:   "Bearer",
	})
	t.Run("ExternalAccountTokenExchange with valid token returns new token", func(t *testing.T) {
		if err != nil {
			t.Errorf("ExternalAccountTokenExchange() error = %v; want nil", err)
		}
		if exchangeToken == nil {
			t.Fatal("ExternalAccountTokenExchange() = nil; want token")
		}
		if exchangeToken.AccessToken != "exchanged-access-token" {
			t.Errorf("exchangeToken.AccessToken = %s; want exchanged-access-token", exchangeToken.AccessToken)
		}
	})
	t.Run("ExternalAccountTokenExchange with valid token sends POST request", func(t *testing.T) {
		if tokenExchangeRequest == nil {
			t.Fatal("tokenExchangeRequest is nil; want request")
		}
		if tokenExchangeRequest.Method != "POST" {
			t.Errorf("tokenExchangeRequest.Method = %s; want POST", tokenExchangeRequest.Method)
		}
	})
	t.Run("ExternalAccountTokenExchange with valid token has correct form data", func(t *testing.T) {
		// Fail this subtest instead of panicking when no request reached /token.
		if tokenExchangeRequest == nil {
			t.Fatal("tokenExchangeRequest is nil; want request")
		}
		if tokenExchangeRequest.Header.Get("Content-Type") != "application/x-www-form-urlencoded" {
			t.Errorf("tokenExchangeRequest.Content-Type = %s; want application/x-www-form-urlencoded", tokenExchangeRequest.Header.Get("Content-Type"))
		}
		if tokenExchangeRequest.PostForm.Get("audience") != "test-audience" {
			t.Errorf("tokenExchangeRequest.PostForm[audience] = %s; want test-audience", tokenExchangeRequest.PostForm.Get("audience"))
		}
		if tokenExchangeRequest.PostForm.Get("subject_token_type") != "urn:ietf:params:oauth:token-type:access_token" {
			t.Errorf("tokenExchangeRequest.PostForm[subject_token_type] = %s; want urn:ietf:params:oauth:token-type:access_token", tokenExchangeRequest.PostForm.Get("subject_token_type"))
		}
		if tokenExchangeRequest.PostForm.Get("subject_token") != "the-original-access-token" {
			t.Errorf("tokenExchangeRequest.PostForm[subject_token] = %s; want the-original-access-token", tokenExchangeRequest.PostForm.Get("subject_token"))
		}
		if len(tokenExchangeRequest.PostForm["scope"]) == 0 || tokenExchangeRequest.PostForm["scope"][0] != "test-scope" {
			t.Errorf("tokenExchangeRequest.PostForm[scope] = %v; want [test-scope]", tokenExchangeRequest.PostForm["scope"])
		}
	})
	t.Run("ExternalAccountTokenExchange with valid token sends correct client credentials header", func(t *testing.T) {
		// Fail this subtest instead of panicking when no request reached /token.
		if tokenExchangeRequest == nil {
			t.Fatal("tokenExchangeRequest is nil; want request")
		}
		if tokenExchangeRequest.Header.Get("Authorization") != "Basic "+base64.StdEncoding.EncodeToString([]byte("test-client-id:test-client-secret")) {
			t.Errorf("tokenExchangeRequest.Header[Authorization] = %s; want Basic base64(test-client-id:test-client-secret)", tokenExchangeRequest.Header.Get("Authorization"))
		}
	})
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/provider_kubeconfig_test.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"fmt"
"net/http"
"testing"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/stretchr/testify/suite"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
// ProviderKubeconfigTestSuite exercises the Provider that NewProvider builds
// from a kubeconfig file declaring several contexts.
type ProviderKubeconfigTestSuite struct {
BaseProviderSuite
// mockServer is the mock server whose kubeconfig is used to build the provider.
mockServer *test.MockServer
// provider is the Provider under test; SetupTest recreates it for every test.
provider Provider
}
// SetupTest starts a fresh mock server, extends its kubeconfig with ten extra
// contexts and builds the provider under test from the resulting file.
func (s *ProviderKubeconfigTestSuite) SetupTest() {
	// Kubeconfig provider is used when the multi-cluster feature is enabled with the kubeconfig strategy.
	// For this test suite we simulate a kubeconfig with multiple contexts.
	s.mockServer = test.NewMockServer()
	kubeconfig := s.mockServer.Kubeconfig()
	// Add multiple fake contexts to force multi-cluster behavior
	for idx := 0; idx < 10; idx++ {
		contextName := fmt.Sprintf("context-%d", idx)
		kubeconfig.Contexts[contextName] = clientcmdapi.NewContext()
	}
	staticConfig := &config.StaticConfig{KubeConfig: test.KubeconfigFile(s.T(), kubeconfig)}
	created, err := NewProvider(staticConfig)
	s.Require().NoError(err, "Expected no error creating provider with kubeconfig")
	s.provider = created
}
// TearDownTest closes the mock server created by SetupTest, if there is one.
func (s *ProviderKubeconfigTestSuite) TearDownTest() {
	if s.mockServer == nil {
		return
	}
	s.mockServer.Close()
}
// TestType asserts that the provider built in SetupTest is a *kubeConfigClusterProvider.
func (s *ProviderKubeconfigTestSuite) TestType() {
s.IsType(&kubeConfigClusterProvider{}, s.provider)
}
// TestWithNonOpenShiftCluster expects IsOpenShift to report false when the mock
// server has no additional handler registered by this test.
func (s *ProviderKubeconfigTestSuite) TestWithNonOpenShiftCluster() {
	s.Run("IsOpenShift returns false", func() {
		ctx := s.T().Context()
		s.False(s.provider.IsOpenShift(ctx), "Expected InOpenShift to return false")
	})
}
// TestWithOpenShiftCluster registers test.InOpenShiftHandler on the mock server
// and expects IsOpenShift to report true.
func (s *ProviderKubeconfigTestSuite) TestWithOpenShiftCluster() {
	s.mockServer.Handle(&test.InOpenShiftHandler{})
	s.Run("IsOpenShift returns true", func() {
		ctx := s.T().Context()
		s.True(s.provider.IsOpenShift(ctx), "Expected InOpenShift to return true")
	})
}
// TestVerifyToken checks Provider.VerifyToken against a mock TokenReview endpoint
// that always answers with an authenticated "test-user" and the audience
// "the-audience". It covers an explicit context, the empty (default) context and
// a context that is not present in the kubeconfig.
func (s *ProviderKubeconfigTestSuite) TestVerifyToken() {
	s.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		if req.URL.EscapedPath() == "/apis/authentication.k8s.io/v1/tokenreviews" {
			w.Header().Set("Content-Type", "application/json")
			_, _ = w.Write([]byte(`
{
"kind": "TokenReview",
"apiVersion": "authentication.k8s.io/v1",
"spec": {"token": "the-token"},
"status": {
"authenticated": true,
"user": {
"username": "test-user",
"groups": ["system:authenticated"]
},
"audiences": ["the-audience"]
}
}`))
		}
	}))
	// assertAuthenticated verifies the token against the given context and checks
	// that the identity and audiences from the mocked TokenReview come back.
	// description names the scenario in failure messages.
	assertAuthenticated := func(target, token, description string) {
		userInfo, audiences, err := s.provider.VerifyToken(s.T().Context(), target, token, "the-audience")
		s.Require().NoErrorf(err, "Expected no error from VerifyToken with %s", description)
		s.Require().NotNilf(userInfo, "Expected UserInfo from VerifyToken with %s", description)
		// testify expects (expected, actual); the order matters for the failure diff.
		s.Equalf("test-user", userInfo.Username, "Expected username test-user, got: %s", userInfo.Username)
		s.Containsf(userInfo.Groups, "system:authenticated", "Expected group system:authenticated in %v", userInfo.Groups)
		s.Require().NotNilf(audiences, "Expected audiences from VerifyToken with %s", description)
		s.Lenf(audiences, 1, "Expected exactly one audience from VerifyToken with %s", description)
		s.Containsf(audiences, "the-audience", "Expected audience the-audience in %v", audiences)
	}
	s.Run("VerifyToken returns UserInfo for non-empty context", func() {
		assertAuthenticated("fake-context", "some-token", "non-empty target")
	})
	s.Run("VerifyToken returns UserInfo for empty context (default context)", func() {
		assertAuthenticated("", "the-token", "empty target")
	})
	s.Run("VerifyToken returns error for invalid context", func() {
		userInfo, audiences, err := s.provider.VerifyToken(s.T().Context(), "invalid-context", "some-token", "the-audience")
		s.Require().Error(err, "Expected error from VerifyToken with invalid target")
		s.ErrorContainsf(err, `context "invalid-context" does not exist`, "Expected context does not exist error, got: %v", err)
		s.Nil(userInfo, "Expected no UserInfo from VerifyToken with invalid target")
		s.Nil(audiences, "Expected no audiences from VerifyToken with invalid target")
	})
}
// TestGetTargets expects GetTargets to list the ten contexts added in SetupTest
// plus "fake-context", eleven in total.
func (s *ProviderKubeconfigTestSuite) TestGetTargets() {
	s.Run("GetTargets returns all contexts defined in kubeconfig", func() {
		targets, err := s.provider.GetTargets(s.T().Context())
		s.Require().NoError(err, "Expected no error from GetTargets")
		s.Len(targets, 11, "Expected 11 targets from GetTargets")
		s.Contains(targets, "fake-context", "Expected fake-context in targets from GetTargets")
		for idx := 0; idx < 10; idx++ {
			contextName := fmt.Sprintf("context-%d", idx)
			s.Contains(targets, contextName, "Expected context-%d in targets from GetTargets", idx)
		}
	})
}
// TestGetDerivedKubernetes checks GetDerivedKubernetes for a known context, for
// the empty context (default) and for a context missing from the kubeconfig.
func (s *ProviderKubeconfigTestSuite) TestGetDerivedKubernetes() {
	s.Run("GetDerivedKubernetes returns Kubernetes for valid context", func() {
		derived, err := s.provider.GetDerivedKubernetes(s.T().Context(), "fake-context")
		s.Require().NoError(err, "Expected no error from GetDerivedKubernetes with valid context")
		s.NotNil(derived, "Expected Kubernetes from GetDerivedKubernetes with valid context")
	})
	s.Run("GetDerivedKubernetes returns Kubernetes for empty context (default)", func() {
		derived, err := s.provider.GetDerivedKubernetes(s.T().Context(), "")
		s.Require().NoError(err, "Expected no error from GetDerivedKubernetes with empty context")
		s.NotNil(derived, "Expected Kubernetes from GetDerivedKubernetes with empty context")
	})
	s.Run("GetDerivedKubernetes returns error for invalid context", func() {
		derived, err := s.provider.GetDerivedKubernetes(s.T().Context(), "invalid-context")
		s.Require().Error(err, "Expected error from GetDerivedKubernetes with invalid context")
		s.ErrorContainsf(err, `context "invalid-context" does not exist`, "Expected context does not exist error, got: %v", err)
		s.Nil(derived, "Expected no Kubernetes from GetDerivedKubernetes with invalid context")
	})
}
// TestGetDefaultTarget expects the default target to be "fake-context".
func (s *ProviderKubeconfigTestSuite) TestGetDefaultTarget() {
	s.Run("GetDefaultTarget returns current-context defined in kubeconfig", func() {
		defaultTarget := s.provider.GetDefaultTarget()
		s.Equal("fake-context", defaultTarget, "Expected fake-context as default target")
	})
}
// TestGetTargetParameterName expects the provider to name its target parameter "context".
func (s *ProviderKubeconfigTestSuite) TestGetTargetParameterName() {
s.Equal("context", s.provider.GetTargetParameterName(), "Expected context as target parameter name")
}
// TestProviderKubeconfig is the go test entry point that runs
// ProviderKubeconfigTestSuite through testify's suite runner.
func TestProviderKubeconfig(t *testing.T) {
suite.Run(t, new(ProviderKubeconfigTestSuite))
}
```
--------------------------------------------------------------------------------
/pkg/mcp/mcp.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"bytes"
"context"
"fmt"
"net/http"
"slices"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
authenticationapiv1 "k8s.io/api/authentication/v1"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"github.com/containers/kubernetes-mcp-server/pkg/api"
"github.com/containers/kubernetes-mcp-server/pkg/config"
internalk8s "github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
"github.com/containers/kubernetes-mcp-server/pkg/output"
"github.com/containers/kubernetes-mcp-server/pkg/toolsets"
"github.com/containers/kubernetes-mcp-server/pkg/version"
)
// ContextKey is the string type used for context value keys declared in this package.
type ContextKey string
// TokenScopesContextKey is the context key under which toolScopedAuthorizationMiddleware
// looks up the token scopes as a []string.
const TokenScopesContextKey = ContextKey("TokenScopesContextKey")
// Configuration wraps the static configuration and caches values derived from
// it by the ListOutput and Toolsets accessors.
type Configuration struct {
*config.StaticConfig
// listOutput caches the result of output.FromString(StaticConfig.ListOutput).
listOutput output.Output
// toolsets caches the toolsets resolved from StaticConfig.Toolsets.
toolsets []api.Toolset
}
// Toolsets returns the toolsets named in StaticConfig.Toolsets, resolving each
// name with toolsets.ToolsetFromString. The resolved slice is cached on the
// Configuration once it is non-nil.
func (c *Configuration) Toolsets() []api.Toolset {
	if c.toolsets != nil {
		return c.toolsets
	}
	for _, name := range c.StaticConfig.Toolsets {
		c.toolsets = append(c.toolsets, toolsets.ToolsetFromString(name))
	}
	return c.toolsets
}
// ListOutput returns the output.Output resolved from StaticConfig.ListOutput.
// The value is resolved with output.FromString and cached once it is non-nil.
func (c *Configuration) ListOutput() output.Output {
	if c.listOutput != nil {
		return c.listOutput
	}
	c.listOutput = output.FromString(c.StaticConfig.ListOutput)
	return c.listOutput
}
// isToolApplicable reports whether the tool may be exposed under this configuration.
// A tool is rejected when any of the following holds:
//   - ReadOnly is set and the tool's ReadOnlyHint is not true;
//   - DisableDestructive is set and the tool's DestructiveHint is true;
//   - EnabledTools is non-nil and does not contain the tool name;
//   - DisabledTools is non-nil and contains the tool name.
func (c *Configuration) isToolApplicable(tool api.ServerTool) bool {
	name := tool.Tool.Name
	switch {
	case c.ReadOnly && !ptr.Deref(tool.Tool.Annotations.ReadOnlyHint, false):
		return false
	case c.DisableDestructive && ptr.Deref(tool.Tool.Annotations.DestructiveHint, false):
		return false
	case c.EnabledTools != nil && !slices.Contains(c.EnabledTools, name):
		return false
	case c.DisabledTools != nil && slices.Contains(c.DisabledTools, name):
		return false
	}
	return true
}
// Server ties an MCP server instance to the Kubernetes cluster provider that
// backs its tools.
type Server struct {
// configuration is the configuration the server was created with.
configuration *Configuration
// server is the underlying mcp-go server.
server *server.MCPServer
// enabledTools holds the names of the tools registered by reloadKubernetesClusterProvider.
enabledTools []string
// p is the current cluster provider; reloadKubernetesClusterProvider replaces it.
p internalk8s.Provider
}
// NewServer creates the MCP server with resource, prompt, tool and logging
// capabilities plus the tool-call logging middleware, then loads the Kubernetes
// cluster provider and registers the applicable tools. It returns the error
// from the initial provider load, if any.
func NewServer(configuration Configuration) (*Server, error) {
	serverOptions := []server.ServerOption{
		server.WithResourceCapabilities(true, true),
		server.WithPromptCapabilities(true),
		server.WithToolCapabilities(true),
		server.WithLogging(),
		server.WithToolHandlerMiddleware(toolCallLoggingMiddleware),
	}
	if configuration.RequireOAuth && false { // TODO: Disabled scope auth validation for now
		serverOptions = append(serverOptions, server.WithToolHandlerMiddleware(toolScopedAuthorizationMiddleware))
	}

	mcpServer := server.NewMCPServer(version.BinaryName, version.Version, serverOptions...)
	s := &Server{
		configuration: &configuration,
		server:        mcpServer,
	}
	if err := s.reloadKubernetesClusterProvider(); err != nil {
		return nil, err
	}
	// NOTE(review): reloadKubernetesClusterProvider already calls WatchTargets before
	// returning, so this second registration may be redundant — confirm against the
	// Provider implementation.
	s.p.WatchTargets(s.reloadKubernetesClusterProvider)
	return s, nil
}
// reloadKubernetesClusterProvider builds a new cluster provider from the static
// configuration, closes the previous one, and re-registers the tools that apply
// to the new provider's targets. It is also passed to WatchTargets as the reload
// callback.
//
// The list of enabled tool names is rebuilt from scratch on every call so that
// repeated reloads do not accumulate duplicate entries.
func (s *Server) reloadKubernetesClusterProvider() error {
	ctx := context.Background()
	p, err := internalk8s.NewProvider(s.configuration.StaticConfig)
	if err != nil {
		return err
	}
	// close the old provider
	if s.p != nil {
		s.p.Close()
	}
	s.p = p
	targets, err := p.GetTargets(ctx)
	if err != nil {
		return err
	}
	filter := CompositeFilter(
		s.configuration.isToolApplicable,
		ShouldIncludeTargetListTool(p.GetTargetParameterName(), targets),
	)
	mutator := WithTargetParameter(
		p.GetDefaultTarget(),
		p.GetTargetParameterName(),
		targets,
	)
	applicableTools := make([]api.ServerTool, 0)
	// Collected separately and assigned afterwards so names from an earlier load are dropped.
	var enabledTools []string
	for _, toolset := range s.configuration.Toolsets() {
		for _, tool := range toolset.GetTools(p) {
			tool := mutator(tool)
			if !filter(tool) {
				continue
			}
			applicableTools = append(applicableTools, tool)
			enabledTools = append(enabledTools, tool.Tool.Name)
		}
	}
	s.enabledTools = enabledTools
	m3labsServerTools, err := ServerToolToM3LabsServerTool(s, applicableTools)
	if err != nil {
		// %w keeps the underlying error available to errors.Is / errors.As.
		return fmt.Errorf("failed to convert tools: %w", err)
	}
	s.server.SetTools(m3labsServerTools...)
	// start new watch
	s.p.WatchTargets(s.reloadKubernetesClusterProvider)
	return nil
}
// ServeStdio serves the MCP server through mcp-go's server.ServeStdio and
// returns its error.
func (s *Server) ServeStdio() error {
return server.ServeStdio(s.server)
}
// ServeSse builds an SSE server for the MCP server on top of httpServer, using
// contextFunc to populate the request context. A non-empty baseUrl is passed on
// as the SSE base URL.
func (s *Server) ServeSse(baseUrl string, httpServer *http.Server) *server.SSEServer {
	options := []server.SSEOption{
		server.WithSSEContextFunc(contextFunc),
		server.WithHTTPServer(httpServer),
	}
	if baseUrl != "" {
		options = append(options, server.WithBaseURL(baseUrl))
	}
	return server.NewSSEServer(s.server, options...)
}
// ServeHTTP builds a streamable HTTP server for the MCP server on top of
// httpServer, with the stateless option set and contextFunc populating the
// request context.
func (s *Server) ServeHTTP(httpServer *http.Server) *server.StreamableHTTPServer {
	return server.NewStreamableHTTPServer(
		s.server,
		server.WithHTTPContextFunc(contextFunc),
		server.WithStreamableHTTPServer(httpServer),
		server.WithStateLess(true),
	)
}
// KubernetesApiVerifyToken verifies the given token with the audience by
// sending a TokenReview request to the API Server for the specified cluster.
// It returns an error when the cluster provider has not been initialized.
func (s *Server) KubernetesApiVerifyToken(ctx context.Context, cluster, token, audience string) (*authenticationapiv1.UserInfo, []string, error) {
	if provider := s.p; provider != nil {
		return provider.VerifyToken(ctx, cluster, token, audience)
	}
	return nil, nil, fmt.Errorf("kubernetes cluster provider is not initialized")
}
// GetTargetParameterName returns the parameter name used for target
// identification in MCP requests, or an empty string when the cluster provider
// has not been initialized.
func (s *Server) GetTargetParameterName() string {
	if provider := s.p; provider != nil {
		return provider.GetTargetParameterName()
	}
	return "" // fallback for uninitialized provider
}
// GetEnabledTools returns the names of the tools recorded by
// reloadKubernetesClusterProvider. The server's own slice is returned, not a copy.
func (s *Server) GetEnabledTools() []string {
return s.enabledTools
}
// Close closes the current cluster provider, if one has been set.
func (s *Server) Close() {
	if s.p == nil {
		return
	}
	s.p.Close()
}
// NewTextResult wraps a tool outcome in a single-text-content CallToolResult.
// When err is non-nil the result is flagged as an error and carries err.Error()
// as its text (content is ignored); otherwise it carries content.
func NewTextResult(content string, err error) *mcp.CallToolResult {
	text := content
	failed := err != nil
	if failed {
		text = err.Error()
	}
	return &mcp.CallToolResult{
		IsError: failed,
		Content: []mcp.Content{
			mcp.TextContent{Type: "text", Text: text},
		},
	}
}
// contextFunc copies the request's authorization header value into the context
// under the internalk8s.OAuthAuthorizationHeader key. The standard Authorization
// header (OAuth compliant) takes precedence; the custom header is only consulted
// as a fallback for backward compatibility. The context is returned unchanged
// when neither header has a value.
func contextFunc(ctx context.Context, r *http.Request) context.Context {
	authorization := r.Header.Get(string(internalk8s.OAuthAuthorizationHeader))
	if authorization == "" {
		authorization = r.Header.Get(string(internalk8s.CustomAuthorizationHeader))
	}
	if authorization == "" {
		return ctx
	}
	return context.WithValue(ctx, internalk8s.OAuthAuthorizationHeader, authorization)
}
// toolCallLoggingMiddleware logs every tool call (name and arguments) at
// verbosity 5 and the request headers at verbosity 7 before delegating to next.
//
// Headers that carry credentials are left out of the log: the standard
// Authorization header and the custom authorization header that contextFunc
// accepts as a fallback. The custom header is excluded both as declared and in
// its canonical form.
func toolCallLoggingMiddleware(next server.ToolHandlerFunc) server.ToolHandlerFunc {
	customAuthHeader := string(internalk8s.CustomAuthorizationHeader)
	excludedHeaders := map[string]bool{
		"Authorization":                           true,
		"authorization":                           true,
		customAuthHeader:                          true,
		http.CanonicalHeaderKey(customAuthHeader): true,
	}
	return func(ctx context.Context, ctr mcp.CallToolRequest) (*mcp.CallToolResult, error) {
		klog.V(5).Infof("mcp tool call: %s(%v)", ctr.Params.Name, ctr.Params.Arguments)
		if ctr.Header != nil {
			buffer := bytes.NewBuffer(make([]byte, 0))
			// WriteSubset writes every header except the keys present in the exclusion map.
			if err := ctr.Header.WriteSubset(buffer, excludedHeaders); err == nil {
				klog.V(7).Infof("mcp tool call headers: %s", buffer)
			}
		}
		return next(ctx, ctr)
	}
}
// toolScopedAuthorizationMiddleware lets a tool call through only when the
// scopes stored in the context under TokenScopesContextKey contain either
// "mcp:<tool name>" or the bare tool name. Otherwise it returns an error text
// result (with a nil Go error) describing the missing scope.
func toolScopedAuthorizationMiddleware(next server.ToolHandlerFunc) server.ToolHandlerFunc {
	return func(ctx context.Context, ctr mcp.CallToolRequest) (*mcp.CallToolResult, error) {
		toolName := ctr.Params.Name
		scopes, ok := ctx.Value(TokenScopesContextKey).([]string)
		switch {
		case !ok:
			return NewTextResult("", fmt.Errorf("authorization failed: Access denied: Tool '%s' requires scope 'mcp:%s' but no scope is available", toolName, toolName)), nil
		case !slices.Contains(scopes, "mcp:"+toolName) && !slices.Contains(scopes, toolName):
			return NewTextResult("", fmt.Errorf("authorization failed: Access denied: Tool '%s' requires scope 'mcp:%s' but only scopes %s are available", toolName, toolName, scopes)), nil
		}
		return next(ctx, ctr)
	}
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/resources.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"regexp"
"strings"
"github.com/containers/kubernetes-mcp-server/pkg/version"
authv1 "k8s.io/api/authorization/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
)
// Label keys under the app.kubernetes.io/ prefix.
const (
AppKubernetesComponent = "app.kubernetes.io/component"
AppKubernetesManagedBy = "app.kubernetes.io/managed-by"
AppKubernetesName = "app.kubernetes.io/name"
AppKubernetesPartOf = "app.kubernetes.io/part-of"
)
// ResourceListOptions extends metav1.ListOptions with the output options
// understood by ResourcesList.
type ResourceListOptions struct {
metav1.ListOptions
// AsTable makes ResourcesList return the list in table format (see resourcesListAsTable).
AsTable bool
}
// ResourcesList lists the resources of the given kind in namespace.
//
// When no namespace is given for a namespaced kind and the current identity is
// not allowed to list it across all namespaces, the listing falls back to the
// namespace returned by k.manager.configuredNamespace(). With options.AsTable
// set the result is returned in table format; otherwise the dynamic client's
// list result is returned.
func (k *Kubernetes) ResourcesList(ctx context.Context, gvk *schema.GroupVersionKind, namespace string, options ResourceListOptions) (runtime.Unstructured, error) {
	gvr, err := k.resourceFor(gvk)
	if err != nil {
		return nil, err
	}
	// Check if operation is allowed for all namespaces (applicable for namespaced resources)
	isNamespaced, _ := k.isNamespaced(gvk)
	// The cheap namespace check goes first so the access review request is only
	// sent when its answer can actually change the namespace.
	if namespace == "" && isNamespaced && !k.canIUse(ctx, gvr, namespace, "list") {
		namespace = k.manager.configuredNamespace()
	}
	if options.AsTable {
		return k.resourcesListAsTable(ctx, gvk, gvr, namespace, options)
	}
	return k.manager.dynamicClient.Resource(*gvr).Namespace(namespace).List(ctx, options.ListOptions)
}
// ResourcesGet fetches the named resource of the given kind. For namespaced
// kinds the namespace is passed through NamespaceOrDefault first.
func (k *Kubernetes) ResourcesGet(ctx context.Context, gvk *schema.GroupVersionKind, namespace, name string) (*unstructured.Unstructured, error) {
	gvr, err := k.resourceFor(gvk)
	if err != nil {
		return nil, err
	}
	// If it's a namespaced resource and namespace wasn't provided, try to use the default configured one
	namespaced, nsErr := k.isNamespaced(gvk)
	if nsErr == nil && namespaced {
		namespace = k.NamespaceOrDefault(namespace)
	}
	resourceClient := k.manager.dynamicClient.Resource(*gvr).Namespace(namespace)
	return resourceClient.Get(ctx, name, metav1.GetOptions{})
}
// yamlDocumentSeparator matches the `---` line that separates the documents of
// a multi-document YAML string. It is compiled once instead of on every call.
var yamlDocumentSeparator = regexp.MustCompile(`\r?\n---\r?\n`)

// ResourcesCreateOrUpdate parses resource as one or more YAML (or JSON)
// documents separated by `---` lines and applies each of them through
// resourcesCreateOrUpdate.
//
// Blank documents in a multi-document input (for example the one produced by a
// trailing `---`) are skipped rather than failing the whole request. An error is
// returned when a document cannot be decoded or when a multi-document input
// contains no resources at all.
func (k *Kubernetes) ResourcesCreateOrUpdate(ctx context.Context, resource string) ([]*unstructured.Unstructured, error) {
	documents := yamlDocumentSeparator.Split(resource, -1)
	multiDocument := len(documents) > 1
	var parsedResources []*unstructured.Unstructured
	for _, document := range documents {
		// Single-document input is always decoded so its behavior stays as before.
		if multiDocument && strings.TrimSpace(document) == "" {
			continue
		}
		var obj unstructured.Unstructured
		if err := yaml.NewYAMLToJSONDecoder(strings.NewReader(document)).Decode(&obj); err != nil {
			return nil, err
		}
		parsedResources = append(parsedResources, &obj)
	}
	if len(parsedResources) == 0 {
		return nil, fmt.Errorf("no resources found in the provided content")
	}
	return k.resourcesCreateOrUpdate(ctx, parsedResources)
}
// ResourcesDelete deletes the named resource of the given kind. For namespaced
// kinds the namespace is passed through NamespaceOrDefault first.
func (k *Kubernetes) ResourcesDelete(ctx context.Context, gvk *schema.GroupVersionKind, namespace, name string) error {
	gvr, err := k.resourceFor(gvk)
	if err != nil {
		return err
	}
	// If it's a namespaced resource and namespace wasn't provided, try to use the default configured one
	namespaced, nsErr := k.isNamespaced(gvk)
	if nsErr == nil && namespaced {
		namespace = k.NamespaceOrDefault(namespace)
	}
	resourceClient := k.manager.dynamicClient.Resource(*gvr).Namespace(namespace)
	return resourceClient.Delete(ctx, name, metav1.DeleteOptions{})
}
// resourcesListAsTable retrieves a list of resources in a table format.
// It's almost identical to the dynamic.DynamicClient implementation, but it uses a specific Accept header to request the table format.
// dynamic.DynamicClient does not provide a way to set the HTTP header (TODO: create an issue to request this feature)
//
// The returned table gets two extra leading columns, apiVersion and kind, filled
// in for every row from gvr and gvk. On any error the returned object is nil.
func (k *Kubernetes) resourcesListAsTable(ctx context.Context, gvk *schema.GroupVersionKind, gvr *schema.GroupVersionResource, namespace string, options ResourceListOptions) (runtime.Unstructured, error) {
	// Build the request path: /api/<version> for the core group, /apis/<group>/<version>
	// otherwise, followed by the optional namespace and the resource name.
	var url []string
	if len(gvr.Group) == 0 {
		url = append(url, "api")
	} else {
		url = append(url, "apis", gvr.Group)
	}
	url = append(url, gvr.Version)
	if len(namespace) > 0 {
		url = append(url, "namespaces", namespace)
	}
	url = append(url, gvr.Resource)
	var table metav1.Table
	err := k.manager.discoveryClient.RESTClient().
		Get().
		SetHeader("Accept", strings.Join([]string{
			fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName),
			fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1beta1.SchemeGroupVersion.Version, metav1beta1.GroupName),
			"application/json",
		}, ",")).
		AbsPath(url...).
		SpecificallyVersionedParams(&options.ListOptions, ParameterCodec, schema.GroupVersion{Version: "v1"}).
		Do(ctx).Into(&table)
	if err != nil {
		return nil, err
	}
	// Add metav1.Table apiVersion and kind to the unstructured object (server may not return these fields)
	table.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("Table"))
	// Add additional columns for fields that aren't returned by the server
	table.ColumnDefinitions = append([]metav1.TableColumnDefinition{
		{Name: "apiVersion", Type: "string"},
		{Name: "kind", Type: "string"},
	}, table.ColumnDefinitions...)
	for i := range table.Rows {
		row := &table.Rows[i]
		row.Cells = append([]interface{}{
			gvr.GroupVersion().String(),
			gvk.Kind,
		}, row.Cells...)
	}
	unstructuredObject, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&table)
	if err != nil {
		// Do not hand back a partially populated object alongside an error.
		return nil, err
	}
	return &unstructured.Unstructured{Object: unstructuredObject}, nil
}
// resourcesCreateOrUpdate applies each object with the dynamic client's Apply,
// using version.BinaryName as the field manager, and stores every result back
// into the same position of the resources slice. It stops at the first error
// and returns nil together with that error.
func (k *Kubernetes) resourcesCreateOrUpdate(ctx context.Context, resources []*unstructured.Unstructured) ([]*unstructured.Unstructured, error) {
	applyOptions := metav1.ApplyOptions{FieldManager: version.BinaryName}
	for idx := range resources {
		current := resources[idx]
		kind := current.GroupVersionKind()
		gvr, err := k.resourceFor(&kind)
		if err != nil {
			return nil, err
		}
		// If it's a namespaced resource and namespace wasn't provided, try to use the default configured one
		targetNamespace := current.GetNamespace()
		namespaced, nsErr := k.isNamespaced(&kind)
		if nsErr == nil && namespaced {
			targetNamespace = k.NamespaceOrDefault(targetNamespace)
		}
		resources[idx], err = k.manager.dynamicClient.Resource(*gvr).Namespace(targetNamespace).Apply(ctx, current.GetName(), current, applyOptions)
		if err != nil {
			return nil, err
		}
		// Clear the cache to ensure the next operation is performed on the latest exposed APIs (will change after the CRD creation)
		if kind.Kind == "CustomResourceDefinition" {
			k.manager.accessControlRESTMapper.Reset()
		}
	}
	return resources, nil
}
// resourceFor maps a GroupVersionKind to its GroupVersionResource through the
// manager's accessControlRESTMapper.
func (k *Kubernetes) resourceFor(gvk *schema.GroupVersionKind) (*schema.GroupVersionResource, error) {
	groupKind := schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}
	mapping, err := k.manager.accessControlRESTMapper.RESTMapping(groupKind, gvk.Version)
	if err != nil {
		return nil, err
	}
	return &mapping.Resource, nil
}
// isNamespaced looks the kind up in the discovery resources of its group/version
// and reports that resource's Namespaced flag. It returns false with a nil error
// when the kind is not listed, and false with the discovery error when the
// lookup fails.
func (k *Kubernetes) isNamespaced(gvk *schema.GroupVersionKind) (bool, error) {
	groupVersion := gvk.GroupVersion().String()
	resourceList, err := k.manager.discoveryClient.ServerResourcesForGroupVersion(groupVersion)
	if err != nil {
		return false, err
	}
	candidates := resourceList.APIResources
	for idx := range candidates {
		if candidates[idx].Kind == gvk.Kind {
			return candidates[idx].Namespaced, nil
		}
	}
	return false, nil
}
// supportsGroupVersion reports whether discovery for the given group/version
// succeeds, i.e. ServerResourcesForGroupVersion returns no error.
func (k *Kubernetes) supportsGroupVersion(groupVersion string) bool {
	_, err := k.manager.discoveryClient.ServerResourcesForGroupVersion(groupVersion)
	return err == nil
}
// canIUse creates a SelfSubjectAccessReview for the verb on the given resource
// and namespace and returns its Allowed status. Any error while obtaining the
// client or creating the review is reported as false.
func (k *Kubernetes) canIUse(ctx context.Context, gvr *schema.GroupVersionResource, namespace, verb string) bool {
	accessReviews, err := k.manager.accessControlClientSet.SelfSubjectAccessReviews()
	if err != nil {
		return false
	}
	review := &authv1.SelfSubjectAccessReview{
		Spec: authv1.SelfSubjectAccessReviewSpec{
			ResourceAttributes: &authv1.ResourceAttributes{
				Namespace: namespace,
				Verb:      verb,
				Group:     gvr.Group,
				Version:   gvr.Version,
				Resource:  gvr.Resource,
			},
		},
	}
	response, err := accessReviews.Create(ctx, review, metav1.CreateOptions{})
	if err != nil {
		// TODO: maybe return the error too
		return false
	}
	return response.Status.Allowed
}
```
--------------------------------------------------------------------------------
/pkg/mcp/nodes_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"net/http"
"testing"
"github.com/BurntSushi/toml"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/suite"
)
// NodesSuite tests the nodes_log tool against a mock server.
type NodesSuite struct {
BaseMcpSuite
// mockServer is the mock server whose kubeconfig file the suite configuration points to.
mockServer *test.MockServer
}
// SetupTest runs the base suite setup, starts a fresh mock server and points
// the suite configuration's KubeConfig at the mock server's kubeconfig file.
func (s *NodesSuite) SetupTest() {
s.BaseMcpSuite.SetupTest()
s.mockServer = test.NewMockServer()
s.Cfg.KubeConfig = s.mockServer.KubeconfigFile(s.T())
}
// TearDownTest runs the base suite teardown and then closes the mock server,
// if one was created.
func (s *NodesSuite) TearDownTest() {
	s.BaseMcpSuite.TearDownTest()
	if s.mockServer == nil {
		return
	}
	s.mockServer.Close()
}
// TestNodesLog exercises the nodes_log tool against a mock server that knows a
// single node ("existing-node") with an empty log file and a five-line
// kubelet.log. The mock returns only the last two lines of kubelet.log whenever
// the request carries a non-empty tailLines query parameter.
//
// The tail=2 case is repeated for int, int64 and float64 argument values. The
// tail=-1 case does not depend on that value type, so it runs once, outside the
// loop.
func (s *NodesSuite) TestNodesLog() {
	s.mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		// Get Node response
		if req.URL.Path == "/api/v1/nodes/existing-node" {
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write([]byte(`{
"apiVersion": "v1",
"kind": "Node",
"metadata": {
"name": "existing-node"
}
}`))
			return
		}
		// Get Empty Log response
		if req.URL.Path == "/api/v1/nodes/existing-node/proxy/logs/empty.log" {
			w.Header().Set("Content-Type", "text/plain")
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write([]byte(``))
			return
		}
		// Get Kubelet Log response
		if req.URL.Path == "/api/v1/nodes/existing-node/proxy/logs/kubelet.log" {
			w.Header().Set("Content-Type", "text/plain")
			w.WriteHeader(http.StatusOK)
			logContent := "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n"
			if req.URL.Query().Get("tailLines") != "" {
				logContent = "Line 4\nLine 5\n"
			}
			_, _ = w.Write([]byte(logContent))
			return
		}
		w.WriteHeader(http.StatusNotFound)
	}))
	s.InitMcpClient()
	s.Run("nodes_log(name=nil)", func() {
		toolResult, err := s.CallTool("nodes_log", map[string]interface{}{})
		s.Require().NotNil(toolResult, "toolResult should not be nil")
		s.Run("has error", func() {
			s.Truef(toolResult.IsError, "call tool should fail")
			s.Nilf(err, "call tool should not return error object")
		})
		s.Run("describes missing name", func() {
			expectedMessage := "failed to get node log, missing argument name"
			s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
				"expected descriptive error '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
		})
	})
	s.Run("nodes_log(name=inexistent-node)", func() {
		toolResult, err := s.CallTool("nodes_log", map[string]interface{}{
			"name": "inexistent-node",
		})
		s.Require().NotNil(toolResult, "toolResult should not be nil")
		s.Run("has error", func() {
			s.Truef(toolResult.IsError, "call tool should fail")
			s.Nilf(err, "call tool should not return error object")
		})
		s.Run("describes missing node", func() {
			expectedMessage := "failed to get node log for inexistent-node: failed to get node inexistent-node: the server could not find the requested resource (get nodes inexistent-node)"
			s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
				"expected descriptive error '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
		})
	})
	s.Run("nodes_log(name=existing-node, log_path=missing.log)", func() {
		toolResult, err := s.CallTool("nodes_log", map[string]interface{}{
			"name":     "existing-node",
			"log_path": "missing.log",
		})
		s.Require().NotNil(toolResult, "toolResult should not be nil")
		s.Run("has error", func() {
			s.Truef(toolResult.IsError, "call tool should fail")
			s.Nilf(err, "call tool should not return error object")
		})
		s.Run("describes missing log file", func() {
			expectedMessage := "failed to get node log for existing-node: failed to get node logs: the server could not find the requested resource"
			s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
				"expected descriptive error '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
		})
	})
	s.Run("nodes_log(name=existing-node, log_path=empty.log)", func() {
		toolResult, err := s.CallTool("nodes_log", map[string]interface{}{
			"name":     "existing-node",
			"log_path": "empty.log",
		})
		s.Require().NotNil(toolResult, "toolResult should not be nil")
		s.Run("no error", func() {
			s.Falsef(toolResult.IsError, "call tool should succeed")
			s.Nilf(err, "call tool should not return error object")
		})
		s.Run("describes empty log", func() {
			expectedMessage := "The node existing-node has not logged any message yet or the log file is empty"
			s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
				"expected descriptive message '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
		})
	})
	s.Run("nodes_log(name=existing-node, log_path=kubelet.log)", func() {
		toolResult, err := s.CallTool("nodes_log", map[string]interface{}{
			"name":     "existing-node",
			"log_path": "kubelet.log",
		})
		s.Require().NotNil(toolResult, "toolResult should not be nil")
		s.Run("no error", func() {
			s.Falsef(toolResult.IsError, "call tool should succeed")
			s.Nilf(err, "call tool should not return error object")
		})
		s.Run("returns full log", func() {
			expectedMessage := "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n"
			s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
				"expected log content '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
		})
	})
	// The same tail value is sent as each numeric type the arguments map may carry.
	for _, tailCase := range []interface{}{2, int64(2), float64(2)} {
		s.Run("nodes_log(name=existing-node, log_path=kubelet.log, tail=2)", func() {
			toolResult, err := s.CallTool("nodes_log", map[string]interface{}{
				"name":     "existing-node",
				"log_path": "kubelet.log",
				"tail":     tailCase,
			})
			s.Require().NotNil(toolResult, "toolResult should not be nil")
			s.Run("no error", func() {
				s.Falsef(toolResult.IsError, "call tool should succeed")
				s.Nilf(err, "call tool should not return error object")
			})
			s.Run("returns tail log", func() {
				expectedMessage := "Line 4\nLine 5\n"
				s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
					"expected log content '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
			})
		})
	}
	s.Run("nodes_log(name=existing-node, log_path=kubelet.log, tail=-1)", func() {
		toolResult, err := s.CallTool("nodes_log", map[string]interface{}{
			"name":     "existing-node",
			"log_path": "kubelet.log",
			"tail":     -1,
		})
		s.Require().NotNil(toolResult, "toolResult should not be nil")
		s.Run("no error", func() {
			s.Falsef(toolResult.IsError, "call tool should succeed")
			s.Nilf(err, "call tool should not return error object")
		})
		s.Run("returns full log", func() {
			expectedMessage := "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n"
			s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
				"expected log content '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
		})
	})
}
// TestNodesLogDenied verifies that nodes_log reports a tool-level error (and
// no transport error) when the Node kind is listed in denied_resources.
func (s *NodesSuite) TestNodesLogDenied() {
	deniedNodes := []byte(`
denied_resources = [ { version = "v1", kind = "Node" } ]
`)
	s.Require().NoError(toml.Unmarshal(deniedNodes, s.Cfg), "Expected to parse denied resources config")
	s.InitMcpClient()
	s.Run("nodes_log (denied)", func() {
		args := map[string]interface{}{"name": "does-not-matter"}
		result, callErr := s.CallTool("nodes_log", args)
		s.Require().NotNil(result, "toolResult should not be nil")
		s.Run("has error", func() {
			// The denial is surfaced through the result, not the call error.
			s.Truef(result.IsError, "call tool should fail")
			s.Nilf(callErr, "call tool should not return error object")
		})
		s.Run("describes denial", func() {
			want := "failed to get node log for does-not-matter: resource not allowed: /v1, Kind=Node"
			s.Equalf(want, result.Content[0].(mcp.TextContent).Text,
				"expected descriptive error '%s', got %v", want, result.Content[0].(mcp.TextContent).Text)
		})
	})
}
// TestNodes is the go test entry point that runs every NodesSuite method.
func TestNodes(t *testing.T) {
	suite.Run(t, &NodesSuite{})
}
```
--------------------------------------------------------------------------------
/pkg/config/config_test.go:
--------------------------------------------------------------------------------
```go
package config
import (
"errors"
"io/fs"
"os"
"path/filepath"
"strings"
"testing"
"github.com/stretchr/testify/suite"
)
// BaseConfigSuite is a testify suite base that provides the writeConfig
// helper to suites embedding it.
type BaseConfigSuite struct {
	suite.Suite
}
// writeConfig stores content in a file named config.toml inside a fresh
// directory obtained from T.TempDir and returns the file path. The running
// test is aborted with Fatalf if the file cannot be written.
func (s *BaseConfigSuite) writeConfig(content string) string {
	s.T().Helper()
	path := filepath.Join(s.T().TempDir(), "config.toml")
	if err := os.WriteFile(path, []byte(content), 0644); err != nil {
		s.T().Fatalf("Failed to write config file %s: %v", path, err)
	}
	return path
}
// ConfigSuite groups the tests for reading and merging configuration; it
// embeds BaseConfigSuite to reuse writeConfig.
type ConfigSuite struct {
	BaseConfigSuite
}
// TestReadConfigMissingFile checks that Read on a path that does not exist
// yields an error matching fs.ErrNotExist together with a nil config.
func (s *ConfigSuite) TestReadConfigMissingFile() {
	cfg, readErr := Read("non-existent-config.toml")
	s.Run("returns error for missing file", func() {
		s.Require().NotNil(readErr, "Expected error for missing file, got nil")
		isNotExist := errors.Is(readErr, fs.ErrNotExist)
		s.True(isNotExist, "Expected ErrNotExist, got %v", readErr)
	})
	s.Run("returns nil config for missing file", func() {
		s.Nil(cfg, "Expected nil config for missing file")
	})
}
// TestReadConfigInvalid checks that Read rejects a syntactically broken TOML
// file (unterminated string on the last entry), that the error text starts
// with the TOML line number, and that no config is returned.
func (s *ConfigSuite) TestReadConfigInvalid() {
	invalidConfigPath := s.writeConfig(`
[[denied_resources]]
group = "apps"
version = "v1"
kind = "Deployment"
[[denied_resources]]
group = "rbac.authorization.k8s.io"
version = "v1"
kind = "Role
`)
	config, err := Read(invalidConfigPath)
	s.Run("returns error for invalid file", func() {
		s.Require().NotNil(err, "Expected error for invalid file, got nil")
	})
	s.Run("error message contains toml error with line number", func() {
		// Guard here as well: a Require failure in the previous subtest only
		// stops that subtest, so err.Error() below would panic on a nil error.
		s.Require().Error(err, "Expected error for invalid file, got nil")
		expectedError := "toml: line 9"
		s.Truef(strings.HasPrefix(err.Error(), expectedError), "Expected error message to contain line number, got %v", err)
	})
	s.Run("returns nil config for invalid file", func() {
		s.Nil(config, "Expected nil config for invalid file")
	})
}
// TestReadConfigValid writes a TOML file that sets every field asserted below
// and verifies that Read maps each of them onto the returned configuration.
func (s *ConfigSuite) TestReadConfigValid() {
	path := s.writeConfig(`
log_level = 1
port = "9999"
sse_base_url = "https://example.com"
kubeconfig = "./path/to/config"
list_output = "yaml"
read_only = true
disable_destructive = true
toolsets = ["core", "config", "helm", "metrics"]
enabled_tools = ["configuration_view", "events_list", "namespaces_list", "pods_list", "resources_list", "resources_get", "resources_create_or_update", "resources_delete"]
disabled_tools = ["pods_delete", "pods_top", "pods_log", "pods_run", "pods_exec"]
denied_resources = [
{group = "apps", version = "v1", kind = "Deployment"},
{group = "rbac.authorization.k8s.io", version = "v1", kind = "Role"}
]
`)
	cfg, readErr := Read(path)
	// Everything below dereferences cfg, so stop right away if it is missing.
	s.Require().NotNil(cfg)
	s.Run("reads and unmarshalls file", func() {
		s.Nil(readErr, "Expected nil error for valid file")
		s.Require().NotNil(cfg, "Expected non-nil config for valid file")
	})

	// Scalar fields.
	s.Run("log_level parsed correctly", func() {
		s.Equalf(1, cfg.LogLevel, "Expected LogLevel to be 1, got %d", cfg.LogLevel)
	})
	s.Run("port parsed correctly", func() {
		s.Equalf("9999", cfg.Port, "Expected Port to be 9999, got %s", cfg.Port)
	})
	s.Run("sse_base_url parsed correctly", func() {
		s.Equalf("https://example.com", cfg.SSEBaseURL, "Expected SSEBaseURL to be https://example.com, got %s", cfg.SSEBaseURL)
	})
	s.Run("kubeconfig parsed correctly", func() {
		s.Equalf("./path/to/config", cfg.KubeConfig, "Expected KubeConfig to be ./path/to/config, got %s", cfg.KubeConfig)
	})
	s.Run("list_output parsed correctly", func() {
		s.Equalf("yaml", cfg.ListOutput, "Expected ListOutput to be yaml, got %s", cfg.ListOutput)
	})
	s.Run("read_only parsed correctly", func() {
		s.Truef(cfg.ReadOnly, "Expected ReadOnly to be true, got %v", cfg.ReadOnly)
	})
	s.Run("disable_destructive parsed correctly", func() {
		s.Truef(cfg.DisableDestructive, "Expected DisableDestructive to be true, got %v", cfg.DisableDestructive)
	})

	// List fields: check the length first, then membership of each entry.
	s.Run("toolsets", func() {
		wantToolsets := []string{"core", "config", "helm", "metrics"}
		s.Require().Lenf(cfg.Toolsets, 4, "Expected 4 toolsets, got %d", len(cfg.Toolsets))
		for i := range wantToolsets {
			s.Containsf(cfg.Toolsets, wantToolsets[i], "Expected toolsets to contain %s", wantToolsets[i])
		}
	})
	s.Run("enabled_tools", func() {
		wantEnabled := []string{"configuration_view", "events_list", "namespaces_list", "pods_list", "resources_list", "resources_get", "resources_create_or_update", "resources_delete"}
		s.Require().Lenf(cfg.EnabledTools, 8, "Expected 8 enabled tools, got %d", len(cfg.EnabledTools))
		for i := range wantEnabled {
			s.Containsf(cfg.EnabledTools, wantEnabled[i], "Expected enabled tools to contain %s", wantEnabled[i])
		}
	})
	s.Run("disabled_tools", func() {
		wantDisabled := []string{"pods_delete", "pods_top", "pods_log", "pods_run", "pods_exec"}
		s.Require().Lenf(cfg.DisabledTools, 5, "Expected 5 disabled tools, got %d", len(cfg.DisabledTools))
		for i := range wantDisabled {
			s.Containsf(cfg.DisabledTools, wantDisabled[i], "Expected disabled tools to contain %s", wantDisabled[i])
		}
	})
	s.Run("denied_resources", func() {
		s.Require().Lenf(cfg.DeniedResources, 2, "Expected 2 denied resources, got %d", len(cfg.DeniedResources))
		cases := []struct {
			name string
			gvk  GroupVersionKind
			msg  string
		}{
			{
				name: "contains apps/v1/Deployment",
				gvk:  GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"},
				msg:  "Expected denied resources to contain apps/v1/Deployment",
			},
			{
				name: "contains rbac.authorization.k8s.io/v1/Role",
				gvk:  GroupVersionKind{Group: "rbac.authorization.k8s.io", Version: "v1", Kind: "Role"},
				msg:  "Expected denied resources to contain rbac.authorization.k8s.io/v1/Role",
			},
		}
		for i := range cases {
			tc := cases[i]
			s.Run(tc.name, func() {
				s.Contains(cfg.DeniedResources, tc.gvk, tc.msg)
			})
		}
	})
}
// TestReadConfigValidPreservesDefaultsForMissingFields checks that a file
// setting only port still yields the expected default values for the fields
// it leaves out (log level, list output, and toolsets).
func (s *ConfigSuite) TestReadConfigValidPreservesDefaultsForMissingFields() {
	validConfigPath := s.writeConfig(`
port = "1337"
`)
	config, err := Read(validConfigPath)
	// The subtests dereference config, so abort early when it is nil.
	s.Require().NotNil(config)
	s.Run("reads and unmarshalls file", func() {
		s.Nil(err, "Expected nil error for valid file")
		s.Require().NotNil(config, "Expected non-nil config for valid file")
	})
	s.Run("log_level defaulted correctly", func() {
		s.Equalf(0, config.LogLevel, "Expected LogLevel to be 0, got %d", config.LogLevel)
	})
	s.Run("port parsed correctly", func() {
		// The failure message previously named 9999, which is not the value under test.
		s.Equalf("1337", config.Port, "Expected Port to be 1337, got %s", config.Port)
	})
	s.Run("list_output defaulted correctly", func() {
		s.Equalf("table", config.ListOutput, "Expected ListOutput to be table, got %s", config.ListOutput)
	})
	s.Run("toolsets defaulted correctly", func() {
		s.Require().Lenf(config.Toolsets, 3, "Expected 3 toolsets, got %d", len(config.Toolsets))
		for _, toolset := range []string{"core", "config", "helm"} {
			s.Containsf(config.Toolsets, toolset, "Expected toolsets to contain %s", toolset)
		}
	})
}
// TestMergeConfig exercises mergeConfig with a fixed base configuration and
// three overrides: a full one, an empty one, and a partial one.
func (s *ConfigSuite) TestMergeConfig() {
	defaults := StaticConfig{
		ListOutput: "table",
		Toolsets:   []string{"core", "config", "helm"},
		Port:       "8080",
	}
	s.Run("merges override values on top of base", func() {
		merged := mergeConfig(defaults, StaticConfig{
			ListOutput: "json",
			Port:       "9090",
		})
		s.Equal("json", merged.ListOutput, "ListOutput should be overridden")
		s.Equal("9090", merged.Port, "Port should be overridden")
	})
	s.Run("preserves base values when override is empty", func() {
		merged := mergeConfig(defaults, StaticConfig{})
		s.Equal("table", merged.ListOutput, "ListOutput should be preserved from base")
		s.Equal([]string{"core", "config", "helm"}, merged.Toolsets, "Toolsets should be preserved from base")
		s.Equal("8080", merged.Port, "Port should be preserved from base")
	})
	s.Run("handles partial overrides", func() {
		merged := mergeConfig(defaults, StaticConfig{
			Toolsets: []string{"custom"},
			ReadOnly: true,
		})
		s.Equal("table", merged.ListOutput, "ListOutput should be preserved from base")
		s.Equal([]string{"custom"}, merged.Toolsets, "Toolsets should be overridden")
		s.Equal("8080", merged.Port, "Port should be preserved from base since override doesn't specify it")
		s.True(merged.ReadOnly, "ReadOnly should be overridden to true")
	})
}
// TestConfig is the go test entry point that runs every ConfigSuite method.
func TestConfig(t *testing.T) {
	suite.Run(t, &ConfigSuite{})
}
```
--------------------------------------------------------------------------------
/pkg/mcp/toolsets_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"encoding/json"
"strconv"
"testing"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/api"
configuration "github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/toolsets"
"github.com/containers/kubernetes-mcp-server/pkg/toolsets/config"
"github.com/containers/kubernetes-mcp-server/pkg/toolsets/core"
"github.com/containers/kubernetes-mcp-server/pkg/toolsets/helm"
"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/suite"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
// ToolsetsSuite tests the tool metadata exposed by the MCP server for
// different toolset registrations and kubeconfig layouts.
type ToolsetsSuite struct {
	suite.Suite
	// originalToolsets is captured in SetupTest and re-registered in
	// TearDownTest, since several tests clear the toolset registry.
	originalToolsets []api.Toolset
	*test.MockServer
	*test.McpClient
	// Cfg is the static configuration handed to NewServer by InitMcpClient.
	Cfg *configuration.StaticConfig
	// mcpServer is the server created by InitMcpClient; closed in TearDownSubTest.
	mcpServer *Server
}
// SetupTest records the registered toolsets, starts a mock server, and builds
// a default configuration whose kubeconfig is the file produced by the
// suite's KubeconfigFile helper.
func (s *ToolsetsSuite) SetupTest() {
	s.originalToolsets = toolsets.Toolsets()
	s.MockServer = test.NewMockServer()
	cfg := configuration.Default()
	cfg.KubeConfig = s.KubeconfigFile(s.T())
	s.Cfg = cfg
}
// TearDownTest resets the toolset registry to the state captured in SetupTest
// and shuts down the mock server.
func (s *ToolsetsSuite) TearDownTest() {
	toolsets.Clear()
	for i := range s.originalToolsets {
		toolsets.Register(s.originalToolsets[i])
	}
	s.MockServer.Close()
}
// TearDownSubTest closes the MCP client and the MCP server, each only when it
// has been set (InitMcpClient is what assigns them).
func (s *ToolsetsSuite) TearDownSubTest() {
	if client := s.McpClient; client != nil {
		client.Close()
	}
	if server := s.mcpServer; server != nil {
		server.Close()
	}
}
// TestNoToolsets verifies that ListTools returns an empty tool list when the
// registry is cleared and the configuration enables no toolsets.
func (s *ToolsetsSuite) TestNoToolsets() {
	s.Run("No toolsets registered", func() {
		toolsets.Clear()
		s.Cfg.Toolsets = []string{}
		s.InitMcpClient()
		listed, listErr := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
		s.Run("ListTools returns no tools", func() {
			s.NotNil(listed, "Expected tools from ListTools")
			s.NoError(listErr, "Expected no error from ListTools")
			s.Empty(listed.Tools, "Expected no tools from ListTools")
		})
	})
}
// TestDefaultToolsetsTools compares the tool metadata served with the default
// configuration against testdata/toolsets-full-tools.json. It is skipped when
// default configuration overrides are present.
func (s *ToolsetsSuite) TestDefaultToolsetsTools() {
	if configuration.HasDefaultOverrides() {
		s.T().Skip("Skipping test because default configuration overrides are present (this is a downstream fork)")
	}
	s.Run("Default configuration toolsets", func() {
		s.InitMcpClient()
		listed, listErr := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
		s.Run("ListTools returns tools", func() {
			s.NotNil(listed, "Expected tools from ListTools")
			s.NoError(listErr, "Expected no error from ListTools")
		})
		s.Run("ListTools returns correct Tool metadata", func() {
			want := test.ReadFile("testdata", "toolsets-full-tools.json")
			got, marshalErr := json.MarshalIndent(listed.Tools, "", " ")
			s.Require().NoErrorf(marshalErr, "failed to marshal tools metadata: %v", marshalErr)
			s.JSONEq(want, string(got), "tools metadata does not match expected")
		})
	})
}
// TestDefaultToolsetsToolsInOpenShift installs the InOpenShiftHandler on the
// mock server and compares the served tool metadata against
// testdata/toolsets-full-tools-openshift.json. It is skipped when default
// configuration overrides are present.
func (s *ToolsetsSuite) TestDefaultToolsetsToolsInOpenShift() {
	if configuration.HasDefaultOverrides() {
		s.T().Skip("Skipping test because default configuration overrides are present (this is a downstream fork)")
	}
	s.Run("Default configuration toolsets in OpenShift", func() {
		s.Handle(&test.InOpenShiftHandler{})
		s.InitMcpClient()
		listed, listErr := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
		s.Run("ListTools returns tools", func() {
			s.NotNil(listed, "Expected tools from ListTools")
			s.NoError(listErr, "Expected no error from ListTools")
		})
		s.Run("ListTools returns correct Tool metadata", func() {
			want := test.ReadFile("testdata", "toolsets-full-tools-openshift.json")
			got, marshalErr := json.MarshalIndent(listed.Tools, "", " ")
			s.Require().NoErrorf(marshalErr, "failed to marshal tools metadata: %v", marshalErr)
			s.JSONEq(want, string(got), "tools metadata does not match expected")
		})
	})
}
// TestDefaultToolsetsToolsInMultiCluster adds ten extra contexts to the mock
// kubeconfig and compares the served tool metadata against
// testdata/toolsets-full-tools-multicluster.json. It is skipped when default
// configuration overrides are present.
func (s *ToolsetsSuite) TestDefaultToolsetsToolsInMultiCluster() {
	if configuration.HasDefaultOverrides() {
		s.T().Skip("Skipping test because default configuration overrides are present (this is a downstream fork)")
	}
	s.Run("Default configuration toolsets in multi-cluster (with 11 clusters)", func() {
		kc := s.Kubeconfig()
		// Add multiple fake contexts to force multi-cluster behavior
		for n := 0; n < 10; n++ {
			contextName := strconv.Itoa(n)
			kc.Contexts[contextName] = clientcmdapi.NewContext()
		}
		s.Cfg.KubeConfig = test.KubeconfigFile(s.T(), kc)
		s.InitMcpClient()
		listed, listErr := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
		s.Run("ListTools returns tools", func() {
			s.NotNil(listed, "Expected tools from ListTools")
			s.NoError(listErr, "Expected no error from ListTools")
		})
		s.Run("ListTools returns correct Tool metadata", func() {
			want := test.ReadFile("testdata", "toolsets-full-tools-multicluster.json")
			got, marshalErr := json.MarshalIndent(listed.Tools, "", " ")
			s.Require().NoErrorf(marshalErr, "failed to marshal tools metadata: %v", marshalErr)
			s.JSONEq(want, string(got), "tools metadata does not match expected")
		})
	})
}
// TestDefaultToolsetsToolsInMultiClusterEnum adds a single extra context to
// the mock kubeconfig and compares the served tool metadata against
// testdata/toolsets-full-tools-multicluster-enum.json. It is skipped when
// default configuration overrides are present.
func (s *ToolsetsSuite) TestDefaultToolsetsToolsInMultiClusterEnum() {
	if configuration.HasDefaultOverrides() {
		s.T().Skip("Skipping test because default configuration overrides are present (this is a downstream fork)")
	}
	s.Run("Default configuration toolsets in multi-cluster (with 2 clusters)", func() {
		kc := s.Kubeconfig()
		// Add additional cluster to force multi-cluster behavior with enum parameter
		kc.Contexts["extra-cluster"] = clientcmdapi.NewContext()
		s.Cfg.KubeConfig = test.KubeconfigFile(s.T(), kc)
		s.InitMcpClient()
		listed, listErr := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
		s.Run("ListTools returns tools", func() {
			s.NotNil(listed, "Expected tools from ListTools")
			s.NoError(listErr, "Expected no error from ListTools")
		})
		s.Run("ListTools returns correct Tool metadata", func() {
			want := test.ReadFile("testdata", "toolsets-full-tools-multicluster-enum.json")
			got, marshalErr := json.MarshalIndent(listed.Tools, "", " ")
			s.Require().NoErrorf(marshalErr, "failed to marshal tools metadata: %v", marshalErr)
			s.JSONEq(want, string(got), "tools metadata does not match expected")
		})
	})
}
// TestGranularToolsetsTools registers each toolset (core, config, helm) on
// its own and compares the served tool metadata against the matching
// testdata/toolsets-<name>-tools.json file.
func (s *ToolsetsSuite) TestGranularToolsetsTools() {
	candidates := []api.Toolset{
		&core.Toolset{},
		&config.Toolset{},
		&helm.Toolset{},
	}
	for idx := range candidates {
		ts := candidates[idx]
		s.Run("Toolset "+ts.GetName(), func() {
			// Leave only the toolset under test registered and enabled.
			toolsets.Clear()
			toolsets.Register(ts)
			s.Cfg.Toolsets = []string{ts.GetName()}
			s.InitMcpClient()
			listed, listErr := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
			s.Run("ListTools returns tools", func() {
				s.NotNil(listed, "Expected tools from ListTools")
				s.NoError(listErr, "Expected no error from ListTools")
			})
			s.Run("ListTools returns correct Tool metadata", func() {
				want := test.ReadFile("testdata", "toolsets-"+ts.GetName()+"-tools.json")
				got, marshalErr := json.MarshalIndent(listed.Tools, "", " ")
				s.Require().NoErrorf(marshalErr, "failed to marshal tools metadata: %v", marshalErr)
				s.JSONEq(want, string(got), "tools metadata does not match expected")
			})
		})
	}
}
// TestInputSchemaEdgeCases checks that the namespaces_list tool, which takes
// no arguments, is served with a non-nil but empty InputSchema.Properties.
func (s *ToolsetsSuite) TestInputSchemaEdgeCases() {
	//https://github.com/containers/kubernetes-mcp-server/issues/340
	s.Run("InputSchema for no-arg tool is object with empty properties", func() {
		s.InitMcpClient()
		listed, listErr := s.ListTools(s.T().Context(), mcp.ListToolsRequest{})
		s.Run("ListTools returns tools", func() {
			s.NotNil(listed, "Expected tools from ListTools")
			s.NoError(listErr, "Expected no error from ListTools")
		})
		// Locate the first tool named namespaces_list, if any.
		var found *mcp.Tool
		for i := 0; i < len(listed.Tools) && found == nil; i++ {
			if listed.Tools[i].Name == "namespaces_list" {
				found = &listed.Tools[i]
			}
		}
		s.Require().NotNil(found, "Expected namespaces_list from ListTools")
		s.NotNil(found.InputSchema.Properties, "Expected namespaces_list.InputSchema.Properties not to be nil")
		s.Empty(found.InputSchema.Properties, "Expected namespaces_list.InputSchema.Properties to be empty")
	})
}
// InitMcpClient creates an MCP server from s.Cfg, stores it on the suite, and
// connects a test MCP client to the server's HTTP handler. The test is
// aborted if the server cannot be created.
func (s *ToolsetsSuite) InitMcpClient() {
	server, err := NewServer(Configuration{StaticConfig: s.Cfg})
	s.mcpServer = server
	s.Require().NoError(err, "Expected no error creating MCP server")
	handler := s.mcpServer.ServeHTTP(nil)
	s.McpClient = test.NewMcpClient(s.T(), handler)
}
// TestToolsets is the go test entry point that runs every ToolsetsSuite method.
func TestToolsets(t *testing.T) {
	suite.Run(t, &ToolsetsSuite{})
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/manager_test.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"os"
"path/filepath"
"runtime"
"testing"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/stretchr/testify/suite"
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
// ManagerTestSuite tests the manager constructors. It snapshots the state the
// tests mutate so that TearDownTest can put it back.
type ManagerTestSuite struct {
	suite.Suite
	// originalEnv is the os.Environ snapshot taken in SetupTest.
	originalEnv []string
	// originalInClusterConfig is the InClusterConfig value saved in SetupTest.
	originalInClusterConfig func() (*rest.Config, error)
	// mockServer is the mock server started in SetupTest.
	mockServer *test.MockServer
}
// SetupTest snapshots the environment and the InClusterConfig hook, then
// starts a fresh mock server for the test.
func (s *ManagerTestSuite) SetupTest() {
	env := os.Environ()
	s.originalEnv = env
	inClusterHook := InClusterConfig
	s.originalInClusterConfig = inClusterHook
	s.mockServer = test.NewMockServer()
}
// TearDownTest restores the environment and the InClusterConfig hook saved in
// SetupTest and closes the mock server when one was created.
func (s *ManagerTestSuite) TearDownTest() {
	test.RestoreEnv(s.originalEnv)
	InClusterConfig = s.originalInClusterConfig
	if s.mockServer == nil {
		return
	}
	s.mockServer.Close()
}
// TestNewInClusterManager stubs InClusterConfig to simulate running inside
// and outside a cluster and checks NewInClusterManager in both situations.
func (s *ManagerTestSuite) TestNewInClusterManager() {
	s.Run("In cluster", func() {
		// Stub: report a successful (empty) in-cluster configuration.
		InClusterConfig = func() (*rest.Config, error) { return &rest.Config{}, nil }
		s.Run("with default StaticConfig (empty kubeconfig)", func() {
			mgr, mgrErr := NewInClusterManager(&config.StaticConfig{})
			s.Require().NoError(mgrErr)
			s.Require().NotNil(mgr)
			s.Run("behaves as in cluster", func() {
				raw, rawErr := mgr.clientCmdConfig.RawConfig()
				s.Require().NoError(rawErr)
				s.Equal("in-cluster", raw.CurrentContext, "expected current context to be 'in-cluster'")
			})
			s.Run("sets default user-agent", func() {
				platform := "(" + runtime.GOOS + "/" + runtime.GOARCH + ")"
				s.Contains(mgr.cfg.UserAgent, platform)
			})
		})
		s.Run("with explicit kubeconfig", func() {
			staticCfg := &config.StaticConfig{
				KubeConfig: s.mockServer.KubeconfigFile(s.T()),
			}
			mgr, mgrErr := NewInClusterManager(staticCfg)
			s.Run("returns error", func() {
				s.Error(mgrErr)
				s.Nil(mgr)
				s.Regexp("kubeconfig file .+ cannot be used with the in-cluster deployments", mgrErr.Error())
			})
		})
	})
	s.Run("Out of cluster", func() {
		// Stub: report that the process is not running in a cluster.
		InClusterConfig = func() (*rest.Config, error) { return nil, rest.ErrNotInCluster }
		mgr, mgrErr := NewInClusterManager(&config.StaticConfig{})
		s.Run("returns error", func() {
			s.Error(mgrErr)
			s.Nil(mgr)
			s.ErrorIs(mgrErr, ErrorInClusterNotInCluster)
			s.ErrorContains(mgrErr, "in-cluster manager cannot be used outside of a cluster")
		})
	})
}
// TestNewKubeconfigManager exercises NewKubeconfigManager while stubbing the
// package-level InClusterConfig hook and setting the KUBECONFIG environment
// variable. The scenarios mutate that shared state in sequence, so their
// order matters; the suite's TearDownTest restores both afterwards.
func (s *ManagerTestSuite) TestNewKubeconfigManager() {
s.Run("Out of cluster", func() {
// Stub: report that the process is not running in a cluster.
InClusterConfig = func() (*rest.Config, error) {
return nil, rest.ErrNotInCluster
}
// KUBECONFIG points at the mock server's kubeconfig and the static
// configuration carries no kubeconfig path of its own.
s.Run("with valid kubeconfig in env", func() {
kubeconfig := s.mockServer.KubeconfigFile(s.T())
s.Require().NoError(os.Setenv("KUBECONFIG", kubeconfig))
manager, err := NewKubeconfigManager(&config.StaticConfig{}, "")
s.Require().NoError(err)
s.Require().NotNil(manager)
s.Run("behaves as NOT in cluster", func() {
rawConfig, err := manager.clientCmdConfig.RawConfig()
s.Require().NoError(err)
s.NotEqual("in-cluster", rawConfig.CurrentContext, "expected current context to NOT be 'in-cluster'")
s.Equal("fake-context", rawConfig.CurrentContext, "expected current context to be 'fake-context' as in kubeconfig")
})
s.Run("loads correct config", func() {
s.Contains(manager.clientCmdConfig.ConfigAccess().GetLoadingPrecedence(), kubeconfig, "expected kubeconfig path to match")
})
s.Run("sets default user-agent", func() {
s.Contains(manager.cfg.UserAgent, "("+runtime.GOOS+"/"+runtime.GOARCH+")")
})
s.Run("rest config host points to mock server", func() {
s.Equal(s.mockServer.Config().Host, manager.cfg.Host, "expected rest config host to match mock server")
})
})
// Both KUBECONFIG and StaticConfig.KubeConfig are set (to different
// files); the assertions expect only the explicit path in the loading
// precedence.
s.Run("with valid kubeconfig in env and explicit kubeconfig in config", func() {
kubeconfigInEnv := s.mockServer.KubeconfigFile(s.T())
s.Require().NoError(os.Setenv("KUBECONFIG", kubeconfigInEnv))
kubeconfigExplicit := s.mockServer.KubeconfigFile(s.T())
manager, err := NewKubeconfigManager(&config.StaticConfig{
KubeConfig: kubeconfigExplicit,
}, "")
s.Require().NoError(err)
s.Require().NotNil(manager)
s.Run("behaves as NOT in cluster", func() {
rawConfig, err := manager.clientCmdConfig.RawConfig()
s.Require().NoError(err)
s.NotEqual("in-cluster", rawConfig.CurrentContext, "expected current context to NOT be 'in-cluster'")
s.Equal("fake-context", rawConfig.CurrentContext, "expected current context to be 'fake-context' as in kubeconfig")
})
s.Run("loads correct config (explicit)", func() {
s.NotContains(manager.clientCmdConfig.ConfigAccess().GetLoadingPrecedence(), kubeconfigInEnv, "expected kubeconfig path to NOT match env")
s.Contains(manager.clientCmdConfig.ConfigAccess().GetLoadingPrecedence(), kubeconfigExplicit, "expected kubeconfig path to match explicit")
})
s.Run("rest config host points to mock server", func() {
s.Equal(s.mockServer.Config().Host, manager.cfg.Host, "expected rest config host to match mock server")
})
})
// The kubeconfig's current context names a cluster other than the mock
// server, while the context argument selects "fake-context". The raw
// config is expected to keep the file's current context, and the REST
// host is expected to be the mock server's.
s.Run("with valid kubeconfig in env and explicit kubeconfig context (valid)", func() {
kubeconfig := s.mockServer.Kubeconfig()
kubeconfig.Contexts["not-the-mock-server"] = clientcmdapi.NewContext()
kubeconfig.Contexts["not-the-mock-server"].Cluster = "not-the-mock-server"
kubeconfig.Clusters["not-the-mock-server"] = clientcmdapi.NewCluster()
kubeconfig.Clusters["not-the-mock-server"].Server = "https://not-the-mock-server:6443" // REST configuration should point to mock server, not this
kubeconfig.CurrentContext = "not-the-mock-server"
kubeconfigFile := test.KubeconfigFile(s.T(), kubeconfig)
s.Require().NoError(os.Setenv("KUBECONFIG", kubeconfigFile))
manager, err := NewKubeconfigManager(&config.StaticConfig{}, "fake-context") // fake-context is the one mock-server serves
s.Require().NoError(err)
s.Require().NotNil(manager)
s.Run("behaves as NOT in cluster", func() {
rawConfig, err := manager.clientCmdConfig.RawConfig()
s.Require().NoError(err)
s.NotEqual("in-cluster", rawConfig.CurrentContext, "expected current context to NOT be 'in-cluster'")
s.Equal("not-the-mock-server", rawConfig.CurrentContext, "expected current context to be 'not-the-mock-server' as in explicit context")
})
s.Run("loads correct config", func() {
s.Contains(manager.clientCmdConfig.ConfigAccess().GetLoadingPrecedence(), kubeconfigFile, "expected kubeconfig path to match")
})
s.Run("rest config host points to mock server", func() {
s.Equal(s.mockServer.Config().Host, manager.cfg.Host, "expected rest config host to match mock server")
})
})
// A context name that is absent from the kubeconfig must be rejected.
s.Run("with valid kubeconfig in env and explicit kubeconfig context (invalid)", func() {
kubeconfigInEnv := s.mockServer.KubeconfigFile(s.T())
s.Require().NoError(os.Setenv("KUBECONFIG", kubeconfigInEnv))
manager, err := NewKubeconfigManager(&config.StaticConfig{}, "i-do-not-exist")
s.Run("returns error", func() {
s.Error(err)
s.Nil(manager)
s.ErrorContains(err, `failed to create kubernetes rest config from kubeconfig: context "i-do-not-exist" does not exist`)
})
})
// KUBECONFIG names a file that does not exist.
s.Run("with invalid path kubeconfig in env", func() {
s.Require().NoError(os.Setenv("KUBECONFIG", "i-dont-exist"))
manager, err := NewKubeconfigManager(&config.StaticConfig{}, "")
s.Run("returns error", func() {
s.Error(err)
s.Nil(manager)
s.ErrorContains(err, "failed to create kubernetes rest config")
})
})
// KUBECONFIG names an existing but zero-length file.
s.Run("with empty kubeconfig in env", func() {
kubeconfigPath := filepath.Join(s.T().TempDir(), "config")
s.Require().NoError(os.WriteFile(kubeconfigPath, []byte(""), 0644))
s.Require().NoError(os.Setenv("KUBECONFIG", kubeconfigPath))
manager, err := NewKubeconfigManager(&config.StaticConfig{}, "")
s.Run("returns error", func() {
s.Error(err)
s.Nil(manager)
s.ErrorContains(err, "no configuration has been provided")
})
})
})
s.Run("In cluster", func() {
// Stub: report a successful in-cluster configuration; the kubeconfig
// manager is expected to refuse to be created in that situation.
InClusterConfig = func() (*rest.Config, error) {
return &rest.Config{}, nil
}
manager, err := NewKubeconfigManager(&config.StaticConfig{}, "")
s.Run("returns error", func() {
s.Error(err)
s.Nil(manager)
s.ErrorIs(err, ErrorKubeconfigInClusterNotAllowed)
s.ErrorContains(err, "kubeconfig manager cannot be used in in-cluster deployments")
})
})
}
// TestManager is the go test entry point that runs every ManagerTestSuite method.
func TestManager(t *testing.T) {
	suite.Run(t, &ManagerTestSuite{})
}
```
--------------------------------------------------------------------------------
/pkg/mcp/configuration_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"fmt"
"testing"
"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/suite"
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
v1 "k8s.io/client-go/tools/clientcmd/api/v1"
"sigs.k8s.io/yaml"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
)
// ConfigurationSuite tests the configuration_contexts_list and
// configuration_view tools on top of BaseMcpSuite.
type ConfigurationSuite struct {
	BaseMcpSuite
}
// SetupTest runs the base suite setup and then points s.Cfg.KubeConfig at a
// kubeconfig derived from a mock server, extended with ten additional
// contexts (cluster-0 .. cluster-9), each with its own cluster and auth entry.
func (s *ConfigurationSuite) SetupTest() {
	s.BaseMcpSuite.SetupTest()
	// Use mock server for predictable kubeconfig content
	mock := test.NewMockServer()
	s.T().Cleanup(mock.Close)
	kc := mock.Kubeconfig()
	// Add multiple fake contexts to force configuration_contexts_list tool to appear
	// and test minification in configuration_view tool
	for n := 0; n < 10; n++ {
		contextName := fmt.Sprintf("cluster-%d", n)
		clusterName := contextName + "-cluster"
		authName := contextName + "-auth"
		kc.Contexts[contextName] = clientcmdapi.NewContext()
		kc.Clusters[clusterName] = clientcmdapi.NewCluster()
		kc.AuthInfos[authName] = clientcmdapi.NewAuthInfo()
		kc.Contexts[contextName].Cluster = clusterName
		kc.Contexts[contextName].AuthInfo = authName
	}
	s.Cfg.KubeConfig = test.KubeconfigFile(s.T(), kc)
}
// TestContextsList calls configuration_contexts_list and checks the reported
// context count, the default context name, and the default context's line.
func (s *ConfigurationSuite) TestContextsList() {
	s.InitMcpClient()
	s.Run("configuration_contexts_list", func() {
		result, callErr := s.CallTool("configuration_contexts_list", map[string]interface{}{})
		s.Run("returns contexts", func() {
			s.Nilf(callErr, "call tool failed %v", callErr)
		})
		s.Require().NotNil(result, "Expected tool result from call")
		s.Lenf(result.Content, 1, "invalid tool result content length %v", len(result.Content))
		s.Run("contains context count", func() {
			text := result.Content[0].(mcp.TextContent).Text
			s.Regexpf(`^Available Kubernetes contexts \(11 total`, text, "invalid tool count result content %v", text)
		})
		s.Run("contains default context name", func() {
			text := result.Content[0].(mcp.TextContent).Text
			s.Regexpf(`^Available Kubernetes contexts \(\d+ total, default: fake-context\)`, text, "invalid tool context default result content %v", text)
			s.Regexpf(`(?m)^\*fake-context -> http:\/\/127\.0\.0\.1:\d*$`, text, "invalid tool context default result content %v", text)
		})
	})
}
// TestConfigurationView calls configuration_view twice: once with default
// arguments, expecting only the current context with its cluster and auth
// info, and once with minified=false, expecting all eleven entries.
func (s *ConfigurationSuite) TestConfigurationView() {
	s.InitMcpClient()
	s.Run("configuration_view", func() {
		toolResult, err := s.CallTool("configuration_view", map[string]interface{}{})
		s.Run("returns configuration", func() {
			s.Nilf(err, "call tool failed %v", err)
		})
		s.Require().NotNil(toolResult, "Expected tool result from call")
		var decoded *v1.Config
		err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
		s.Run("has yaml content", func() {
			s.Nilf(err, "invalid tool result content %v", err)
		})
		s.Run("returns current-context", func() {
			s.Equalf("fake-context", decoded.CurrentContext, "fake-context not found: %v", decoded.CurrentContext)
		})
		s.Run("returns context info", func() {
			s.Lenf(decoded.Contexts, 1, "invalid context count, expected 1, got %v", len(decoded.Contexts))
			s.Equalf("fake-context", decoded.Contexts[0].Name, "fake-context not found: %v", decoded.Contexts)
			s.Equalf("fake", decoded.Contexts[0].Context.Cluster, "fake-cluster not found: %v", decoded.Contexts)
			s.Equalf("fake", decoded.Contexts[0].Context.AuthInfo, "fake-auth not found: %v", decoded.Contexts)
		})
		s.Run("returns cluster info", func() {
			s.Lenf(decoded.Clusters, 1, "invalid cluster count, expected 1, got %v", len(decoded.Clusters))
			s.Equalf("fake", decoded.Clusters[0].Name, "fake-cluster not found: %v", decoded.Clusters)
			s.Regexpf(`^https?://(127\.0\.0\.1|localhost):\d{1,5}$`, decoded.Clusters[0].Cluster.Server, "fake-server not found: %v", decoded.Clusters)
		})
		s.Run("returns auth info", func() {
			s.Lenf(decoded.AuthInfos, 1, "invalid auth info count, expected 1, got %v", len(decoded.AuthInfos))
			s.Equalf("fake", decoded.AuthInfos[0].Name, "fake-auth not found: %v", decoded.AuthInfos)
		})
	})
	s.Run("configuration_view(minified=false)", func() {
		toolResult, err := s.CallTool("configuration_view", map[string]interface{}{
			"minified": false,
		})
		s.Run("returns configuration", func() {
			s.Nilf(err, "call tool failed %v", err)
		})
		// Same guard as the minified case: toolResult is dereferenced below.
		s.Require().NotNil(toolResult, "Expected tool result from call")
		var decoded *v1.Config
		err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
		s.Run("has yaml content", func() {
			s.Nilf(err, "invalid tool result content %v", err)
		})
		// The assertions below expect the ten generated "cluster-N" entries
		// at indices 0-9 and the mock server's own "fake" entry at index 10.
		s.Run("returns additional context info", func() {
			s.Lenf(decoded.Contexts, 11, "invalid context count, expected 11, got %v", len(decoded.Contexts))
			s.Equalf("cluster-0", decoded.Contexts[0].Name, "cluster-0 not found: %v", decoded.Contexts)
			s.Equalf("cluster-0-cluster", decoded.Contexts[0].Context.Cluster, "cluster-0-cluster not found: %v", decoded.Contexts)
			s.Equalf("cluster-0-auth", decoded.Contexts[0].Context.AuthInfo, "cluster-0-auth not found: %v", decoded.Contexts)
			s.Equalf("fake", decoded.Contexts[10].Context.Cluster, "fake not found: %v", decoded.Contexts)
			s.Equalf("fake", decoded.Contexts[10].Context.AuthInfo, "fake not found: %v", decoded.Contexts)
			s.Equalf("fake-context", decoded.Contexts[10].Name, "fake-context not found: %v", decoded.Contexts)
		})
		s.Run("returns cluster info", func() {
			s.Lenf(decoded.Clusters, 11, "invalid cluster count, expected 11, got %v", len(decoded.Clusters))
			s.Equalf("cluster-0-cluster", decoded.Clusters[0].Name, "cluster-0-cluster not found: %v", decoded.Clusters)
			s.Equalf("fake", decoded.Clusters[10].Name, "fake not found: %v", decoded.Clusters)
		})
		s.Run("configuration_view with minified=false returns auth info", func() {
			s.Lenf(decoded.AuthInfos, 11, "invalid auth info count, expected 11, got %v", len(decoded.AuthInfos))
			s.Equalf("cluster-0-auth", decoded.AuthInfos[0].Name, "cluster-0-auth not found: %v", decoded.AuthInfos)
			s.Equalf("fake", decoded.AuthInfos[10].Name, "fake not found: %v", decoded.AuthInfos)
		})
	})
}
// TestConfigurationViewInCluster clears the kubeconfig path, stubs
// kubernetes.InClusterConfig with a fixed host and token, and checks the
// single in-cluster context, cluster, and user that configuration_view returns.
func (s *ConfigurationSuite) TestConfigurationViewInCluster() {
	s.Cfg.KubeConfig = "" // Force in-cluster
	fakeInCluster := func() (*rest.Config, error) {
		cfg := &rest.Config{
			Host:        "https://kubernetes.default.svc",
			BearerToken: "fake-token",
		}
		return cfg, nil
	}
	kubernetes.InClusterConfig = fakeInCluster
	// Put the client-go implementation back once the test is done.
	s.T().Cleanup(func() { kubernetes.InClusterConfig = rest.InClusterConfig })
	s.InitMcpClient()
	s.Run("configuration_view", func() {
		result, callErr := s.CallTool("configuration_view", map[string]interface{}{})
		s.Run("returns configuration", func() {
			s.Nilf(callErr, "call tool failed %v", callErr)
		})
		s.Require().NotNil(result, "Expected tool result from call")
		var view *v1.Config
		yamlErr := yaml.Unmarshal([]byte(result.Content[0].(mcp.TextContent).Text), &view)
		s.Run("has yaml content", func() {
			s.Nilf(yamlErr, "invalid tool result content %v", yamlErr)
		})
		s.Run("returns current-context", func() {
			s.Equalf("in-cluster", view.CurrentContext, "context not found: %v", view.CurrentContext)
		})
		s.Run("returns context info", func() {
			s.Lenf(view.Contexts, 1, "invalid context count, expected 1, got %v", len(view.Contexts))
			s.Equalf("in-cluster", view.Contexts[0].Name, "context not found: %v", view.Contexts)
			s.Equalf("cluster", view.Contexts[0].Context.Cluster, "cluster not found: %v", view.Contexts)
			s.Equalf("user", view.Contexts[0].Context.AuthInfo, "user not found: %v", view.Contexts)
		})
		s.Run("returns cluster info", func() {
			s.Lenf(view.Clusters, 1, "invalid cluster count, expected 1, got %v", len(view.Clusters))
			s.Equalf("cluster", view.Clusters[0].Name, "cluster not found: %v", view.Clusters)
			s.Equalf("https://kubernetes.default.svc", view.Clusters[0].Cluster.Server, "server not found: %v", view.Clusters)
		})
		s.Run("returns auth info", func() {
			s.Lenf(view.AuthInfos, 1, "invalid auth info count, expected 1, got %v", len(view.AuthInfos))
			s.Equalf("user", view.AuthInfos[0].Name, "user not found: %v", view.AuthInfos)
		})
	})
}
// TestConfiguration is the go test entry point that runs ConfigurationSuite.
func TestConfiguration(t *testing.T) {
	suite.Run(t, &ConfigurationSuite{})
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/pods.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"bytes"
"context"
"errors"
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
labelutil "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/metrics/pkg/apis/metrics"
metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1"
"k8s.io/utils/ptr"
"github.com/containers/kubernetes-mcp-server/pkg/version"
)
// DefaultTailLines is the number of trailing log lines PodsLog requests when
// the caller does not pass a positive tail value.
const DefaultTailLines int64 = 100
// PodsTopOptions selects the pods whose metrics PodsTop retrieves.
type PodsTopOptions struct {
	// ListOptions is embedded and handed to the metrics client as-is.
	metav1.ListOptions
	// AllNamespaces asks for metrics across namespaces; PodsTop only honors
	// it while Namespace is empty.
	AllNamespaces bool
	// Namespace and Name are passed to the metrics client as the namespace
	// and pod name; an empty Namespace falls back to the configured default
	// unless AllNamespaces is set.
	Namespace, Name string
}
// PodsListInAllNamespaces lists core/v1 Pods by calling ResourcesList with an
// empty namespace.
func (k *Kubernetes) PodsListInAllNamespaces(ctx context.Context, options ResourceListOptions) (runtime.Unstructured, error) {
	podGVK := schema.GroupVersionKind{Version: "v1", Kind: "Pod"}
	return k.ResourcesList(ctx, &podGVK, "", options)
}
// PodsListInNamespace lists core/v1 Pods in the given namespace. The namespace
// is forwarded to ResourcesList exactly as supplied.
func (k *Kubernetes) PodsListInNamespace(ctx context.Context, namespace string, options ResourceListOptions) (runtime.Unstructured, error) {
	podGVK := schema.GroupVersionKind{Version: "v1", Kind: "Pod"}
	return k.ResourcesList(ctx, &podGVK, namespace, options)
}
// PodsGet fetches a single core/v1 Pod by name. An empty namespace is replaced
// by the result of NamespaceOrDefault.
func (k *Kubernetes) PodsGet(ctx context.Context, namespace, name string) (*unstructured.Unstructured, error) {
	podGVK := schema.GroupVersionKind{Version: "v1", Kind: "Pod"}
	return k.ResourcesGet(ctx, &podGVK, k.NamespaceOrDefault(namespace), name)
}
// PodsDelete deletes the named Pod and, when the Pod carries this binary's
// managed-by label, first removes the Services (and OpenShift Routes, when
// that API group is available) that share its managed-by and name labels.
//
// An empty namespace is resolved through NamespaceOrDefault. The companion
// resource cleanup is best effort: list and delete failures there are ignored
// so that they never prevent the Pod itself from being deleted.
//
// It returns a confirmation message on success. On any error the message is
// empty, so callers never see a success message paired with a failure.
func (k *Kubernetes) PodsDelete(ctx context.Context, namespace, name string) (string, error) {
	namespace = k.NamespaceOrDefault(namespace)
	pod, err := k.ResourcesGet(ctx, &schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}, namespace, name)
	if err != nil {
		return "", err
	}

	if pod.GetLabels()[AppKubernetesManagedBy] == version.BinaryName {
		managedLabelSelector := labelutil.Set{
			AppKubernetesManagedBy: version.BinaryName,
			AppKubernetesName:      pod.GetLabels()[AppKubernetesName],
		}.AsSelector().String()

		// Delete managed Services.
		services, err := k.manager.accessControlClientSet.Services(namespace)
		if err != nil {
			return "", err
		}
		// Best effort: a failed list or delete must not block the Pod deletion.
		if sl, _ := services.List(ctx, metav1.ListOptions{LabelSelector: managedLabelSelector}); sl != nil {
			for _, svc := range sl.Items {
				_ = services.Delete(ctx, svc.Name, metav1.DeleteOptions{})
			}
		}

		// Delete managed Routes (only when the OpenShift route API is served).
		if k.supportsGroupVersion("route.openshift.io/v1") {
			routeResources := k.manager.dynamicClient.
				Resource(schema.GroupVersionResource{Group: "route.openshift.io", Version: "v1", Resource: "routes"}).
				Namespace(namespace)
			// Best effort, same rationale as for Services above.
			if rl, _ := routeResources.List(ctx, metav1.ListOptions{LabelSelector: managedLabelSelector}); rl != nil {
				for _, route := range rl.Items {
					_ = routeResources.Delete(ctx, route.GetName(), metav1.DeleteOptions{})
				}
			}
		}
	}

	if err := k.ResourcesDelete(ctx, &schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}, namespace, name); err != nil {
		return "", err
	}
	return "Pod deleted successfully", nil
}
// PodsLog returns the raw log output of a container in the named Pod.
//
// An empty namespace is resolved through NamespaceOrDefault. previous is
// forwarded as PodLogOptions.Previous. A tail limit is always sent: the
// caller's tail when it is positive, DefaultTailLines otherwise.
func (k *Kubernetes) PodsLog(ctx context.Context, namespace, name, container string, previous bool, tail int64) (string, error) {
	podClient, err := k.manager.accessControlClientSet.Pods(k.NamespaceOrDefault(namespace))
	if err != nil {
		return "", err
	}

	// Start from the default and override it only for a positive request.
	tailLines := ptr.To(DefaultTailLines)
	if tail > 0 {
		tailLines = &tail
	}

	result := podClient.GetLogs(name, &v1.PodLogOptions{
		Container: container,
		Previous:  previous,
		TailLines: tailLines,
	}).Do(ctx)
	if resultErr := result.Error(); resultErr != nil {
		return "", resultErr
	}

	raw, err := result.Raw()
	if err != nil {
		return "", err
	}
	return string(raw), nil
}
// PodsRun creates a single-container Pod running image and returns the
// resources that were created or updated.
//
// When name is empty a name of the form "<binary>-run-<5 random chars>" is
// generated. When port is positive the container exposes that port and a
// ClusterIP Service with the same name is created too; if the cluster also
// serves route.openshift.io/v1, an edge-terminated Route to that Service is
// added. Every resource carries the same app.kubernetes.io-style labels and is
// submitted through resourcesCreateOrUpdate.
func (k *Kubernetes) PodsRun(ctx context.Context, namespace, name, image string, port int32) ([]*unstructured.Unstructured, error) {
	if name == "" {
		name = version.BinaryName + "-run-" + rand.String(5)
	}
	targetNamespace := k.NamespaceOrDefault(namespace)
	labels := map[string]string{
		AppKubernetesName:      name,
		AppKubernetesComponent: name,
		AppKubernetesManagedBy: version.BinaryName,
		AppKubernetesPartOf:    version.BinaryName + "-run-sandbox",
	}

	container := v1.Container{
		Name:            name,
		Image:           image,
		ImagePullPolicy: v1.PullAlways,
	}
	if port > 0 {
		container.Ports = []v1.ContainerPort{{ContainerPort: port}}
	}
	resources := []any{&v1.Pod{
		TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Pod"},
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: targetNamespace, Labels: labels},
		Spec:       v1.PodSpec{Containers: []v1.Container{container}},
	}}

	if port > 0 {
		resources = append(resources, &v1.Service{
			TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Service"},
			ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: targetNamespace, Labels: labels},
			Spec: v1.ServiceSpec{
				Selector: labels,
				Type:     v1.ServiceTypeClusterIP,
				Ports:    []v1.ServicePort{{Port: port, TargetPort: intstr.FromInt32(port)}},
			},
		})
		// A Route is only meaningful on top of the Service, hence the nesting.
		if k.supportsGroupVersion("route.openshift.io/v1") {
			resources = append(resources, &unstructured.Unstructured{
				Object: map[string]any{
					"apiVersion": "route.openshift.io/v1",
					"kind":       "Route",
					"metadata": map[string]any{
						"name":      name,
						"namespace": targetNamespace,
						"labels":    labels,
					},
					"spec": map[string]any{
						"to": map[string]any{
							"kind":   "Service",
							"name":   name,
							"weight": 100,
						},
						"port": map[string]any{
							"targetPort": intstr.FromInt32(port),
						},
						"tls": map[string]any{
							"termination":                   "edge",
							"insecureEdgeTerminationPolicy": "Redirect",
						},
					},
				},
			})
		}
	}

	// Normalize every object to Unstructured so resourcesCreateOrUpdate can be reused.
	converter := runtime.DefaultUnstructuredConverter
	toCreate := make([]*unstructured.Unstructured, 0, len(resources))
	for _, resource := range resources {
		content, err := converter.ToUnstructured(resource)
		if err != nil {
			return nil, err
		}
		converted := new(unstructured.Unstructured)
		if err := converter.FromUnstructured(content, converted); err != nil {
			return nil, err
		}
		toCreate = append(toCreate, converted)
	}
	return k.resourcesCreateOrUpdate(ctx, toCreate)
}
// PodsTop retrieves Pod metrics according to options.
//
// It fails with "metrics API is not available" when the cluster does not serve
// the metrics v1beta1 group version. The query stays namespace-less only when
// AllNamespaces is set and no Namespace was given; in every other case the
// namespace is resolved through NamespaceOrDefault.
func (k *Kubernetes) PodsTop(ctx context.Context, options PodsTopOptions) (*metrics.PodMetricsList, error) {
	// TODO, maybe move to mcp Tools setup and omit in case metrics aren't available in the target cluster
	metricsGroupVersion := metrics.GroupName + "/" + metricsv1beta1api.SchemeGroupVersion.Version
	if !k.supportsGroupVersion(metricsGroupVersion) {
		return nil, errors.New("metrics API is not available")
	}

	namespace := options.Namespace
	if !options.AllNamespaces || namespace != "" {
		namespace = k.NamespaceOrDefault(namespace)
	}
	return k.manager.accessControlClientSet.PodsMetricses(ctx, namespace, options.Name, options.ListOptions)
}
// PodsExec runs command inside a container of the named Pod and returns what
// it wrote.
//
// An empty namespace is resolved through NamespaceOrDefault and an empty
// container selects the first container in the Pod spec. Pods in the
// Succeeded or Failed phase are rejected. No TTY and no stdin are attached.
// The result is the captured stdout when it is non-empty, otherwise the
// captured stderr (which may be empty as well).
func (k *Kubernetes) PodsExec(ctx context.Context, namespace, name, container string, command []string) (string, error) {
	namespace = k.NamespaceOrDefault(namespace)
	podClient, err := k.manager.accessControlClientSet.Pods(namespace)
	if err != nil {
		return "", err
	}
	target, err := podClient.Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	// https://github.com/kubernetes/kubectl/blob/5366de04e168bcbc11f5e340d131a9ca8b7d0df4/pkg/cmd/exec/exec.go#L350-L352
	switch phase := target.Status.Phase; phase {
	case v1.PodSucceeded, v1.PodFailed:
		return "", fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", phase)
	}
	if container == "" {
		container = target.Spec.Containers[0].Name
	}

	executor, err := k.manager.accessControlClientSet.PodsExec(namespace, name, &v1.PodExecOptions{
		Container: container,
		Command:   command,
		Stdout:    true,
		Stderr:    true,
	})
	if err != nil {
		return "", err
	}

	var stdout, stderr bytes.Buffer
	streamErr := executor.StreamWithContext(ctx, remotecommand.StreamOptions{
		Stdout: &stdout,
		Stderr: &stderr,
		Tty:    false,
	})
	if streamErr != nil {
		return "", streamErr
	}
	if stdout.Len() > 0 {
		return stdout.String(), nil
	}
	// An empty stderr buffer yields "", matching the no-output case.
	return stderr.String(), nil
}
```
--------------------------------------------------------------------------------
/dev/config/ingress/nginx-ingress.yaml:
--------------------------------------------------------------------------------
```yaml
---
# Namespace that holds every namespaced ingress-nginx resource in this manifest.
apiVersion: v1
kind: Namespace
metadata:
name: ingress-nginx
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
---
# Service account referenced by the controller Deployment (serviceAccountName)
# and used as the subject of the ClusterRoleBinding and RoleBinding below.
apiVersion: v1
kind: ServiceAccount
metadata:
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/component: controller
name: ingress-nginx
namespace: ingress-nginx
---
# Controller configuration; its namespace/name matches the controller's
# --configmap=$(POD_NAMESPACE)/ingress-nginx-controller argument.
apiVersion: v1
kind: ConfigMap
metadata:
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/component: controller
name: ingress-nginx-controller
namespace: ingress-nginx
data:
# Opts in to snippet annotations on Ingress objects (see the ingress-nginx
# ConfigMap reference for the exact semantics).
allow-snippet-annotations: "true"
---
# Cluster-wide permissions for the controller: read access (get/list/watch) to
# the resources listed below, plus creating/patching Events and updating
# Ingress status.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
name: ingress-nginx
rules:
# Core resources: list/watch only.
- apiGroups:
- ""
resources:
- configmaps
- endpoints
- nodes
- pods
- secrets
- namespaces
verbs:
- list
- watch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- list
- watch
# Nodes additionally allow "get" (on top of list/watch above).
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- apiGroups:
- ""
resources:
- services
verbs:
- get
- list
- watch
- apiGroups:
- networking.k8s.io
resources:
- ingresses
verbs:
- get
- list
- watch
# Write access is limited to Events and the Ingress status subresource.
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- networking.k8s.io
resources:
- ingresses/status
verbs:
- update
- apiGroups:
- networking.k8s.io
resources:
- ingressclasses
verbs:
- get
- list
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- list
- watch
- get
---
# Grants the ingress-nginx ClusterRole to the ingress-nginx ServiceAccount.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
name: ingress-nginx
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: ingress-nginx
subjects:
- kind: ServiceAccount
name: ingress-nginx
namespace: ingress-nginx
---
# Namespace-scoped permissions inside ingress-nginx. Besides read access this
# role covers the Lease named ingress-nginx-leader, which matches the
# controller's --election-id argument.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/component: controller
name: ingress-nginx
namespace: ingress-nginx
rules:
- apiGroups:
- ""
resources:
- namespaces
verbs:
- get
- apiGroups:
- ""
resources:
- configmaps
- pods
- secrets
- endpoints
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- services
verbs:
- get
- list
- watch
- apiGroups:
- networking.k8s.io
resources:
- ingresses
verbs:
- get
- list
- watch
- apiGroups:
- networking.k8s.io
resources:
- ingresses/status
verbs:
- update
- apiGroups:
- networking.k8s.io
resources:
- ingressclasses
verbs:
- get
- list
- watch
# get/update is restricted to the single named Lease...
- apiGroups:
- coordination.k8s.io
resources:
- leases
resourceNames:
- ingress-nginx-leader
verbs:
- get
- update
# ...while create is granted on leases without a resourceNames restriction.
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- list
- watch
- get
---
# Grants the namespaced ingress-nginx Role to the ingress-nginx ServiceAccount.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/component: controller
name: ingress-nginx
namespace: ingress-nginx
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: ingress-nginx
subjects:
- kind: ServiceAccount
name: ingress-nginx
namespace: ingress-nginx
---
# NodePort Service in front of the controller pods. Ports 80 and 443 target the
# container ports named "http" and "https" declared by the Deployment.
apiVersion: v1
kind: Service
metadata:
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/component: controller
name: ingress-nginx-controller
namespace: ingress-nginx
spec:
type: NodePort
ports:
- name: http
port: 80
protocol: TCP
targetPort: http
appProtocol: http
- name: https
port: 443
protocol: TCP
targetPort: https
appProtocol: https
# Same three labels as the Deployment's pod template.
selector:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/component: controller
---
# Single-replica ingress-nginx controller. The pod binds host ports 80, 443 and
# 8443 and is pinned to nodes labelled ingress-ready=true.
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/component: controller
name: ingress-nginx-controller
namespace: ingress-nginx
spec:
selector:
matchLabels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/component: controller
replicas: 1
revisionHistoryLimit: 10
minReadySeconds: 0
template:
metadata:
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/component: controller
spec:
dnsPolicy: ClusterFirst
containers:
- name: controller
image: registry.k8s.io/ingress-nginx/controller:v1.11.1
imagePullPolicy: IfNotPresent
lifecycle:
preStop:
exec:
command:
- /wait-shutdown
args:
- /nginx-ingress-controller
# Must match the Lease name granted in the ingress-nginx Role.
- --election-id=ingress-nginx-leader
# Must match spec.controller of the IngressClass named nginx.
- --controller-class=k8s.io/ingress-nginx
- --ingress-class=nginx
# Points at the ingress-nginx-controller ConfigMap in the pod's namespace.
- --configmap=$(POD_NAMESPACE)/ingress-nginx-controller
- --watch-ingress-without-class=true
# Non-root (UID 101) with all capabilities dropped except NET_BIND_SERVICE.
securityContext:
runAsNonRoot: true
runAsUser: 101
allowPrivilegeEscalation: false
seccompProfile:
type: RuntimeDefault
capabilities:
drop:
- ALL
add:
- NET_BIND_SERVICE
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
# Expanded in the --configmap argument above.
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: LD_PRELOAD
value: /usr/local/lib/libmimalloc.so
# Liveness and readiness both probe /healthz on port 10254; they differ only
# in failureThreshold (5 vs 3).
livenessProbe:
failureThreshold: 5
httpGet:
path: /healthz
port: 10254
scheme: HTTP
initialDelaySeconds: 10
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
readinessProbe:
failureThreshold: 3
httpGet:
path: /healthz
port: 10254
scheme: HTTP
initialDelaySeconds: 10
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
ports:
- name: http
containerPort: 80
protocol: TCP
hostPort: 80
- name: https
containerPort: 443
protocol: TCP
hostPort: 443
# Second host binding for the same container port 443, exposed on host port 8443.
- name: https-alt
containerPort: 443
protocol: TCP
hostPort: 8443
# Container port only; no hostPort is bound for it.
- name: webhook
containerPort: 8443
protocol: TCP
resources:
requests:
cpu: 100m
memory: 90Mi
# Schedule only on Linux nodes labelled ingress-ready=true.
nodeSelector:
ingress-ready: "true"
kubernetes.io/os: linux
serviceAccountName: ingress-nginx
terminationGracePeriodSeconds: 0
# Tolerate the NoSchedule taints keyed master / control-plane.
tolerations:
- effect: NoSchedule
key: node-role.kubernetes.io/master
operator: Equal
- effect: NoSchedule
key: node-role.kubernetes.io/control-plane
operator: Equal
---
# IngressClass "nginx"; spec.controller matches the controller's
# --controller-class argument and the name matches --ingress-class.
apiVersion: networking.k8s.io/v1
kind: IngressClass
metadata:
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/instance: ingress-nginx
app.kubernetes.io/component: controller
name: nginx
spec:
controller: k8s.io/ingress-nginx
```
--------------------------------------------------------------------------------
/pkg/kubernetes/kubernetes_derived_test.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
"os"
"path/filepath"
"strings"
"testing"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/stretchr/testify/suite"
)
// DerivedTestSuite groups the tests for Manager.Derived.
type DerivedTestSuite struct{ suite.Suite }
// TestKubeConfig exercises Manager.Derived for kubeconfig-based managers.
//
// It writes a basic-auth kubeconfig to a temp dir and checks, with and without
// require_oauth, how Derived reacts to a missing, malformed, or valid
// "Bearer <token>" Authorization value in the context.
func (s *DerivedTestSuite) TestKubeConfig() {
// Create a temporary kubeconfig file for testing
tempDir := s.T().TempDir()
kubeconfigPath := filepath.Join(tempDir, "config")
kubeconfigContent := `
apiVersion: v1
kind: Config
clusters:
- cluster:
server: https://test-cluster.example.com
name: test-cluster
contexts:
- context:
cluster: test-cluster
user: test-user
name: test-context
current-context: test-context
users:
- name: test-user
user:
username: test-username
password: test-password
`
err := os.WriteFile(kubeconfigPath, []byte(kubeconfigContent), 0644)
s.Require().NoError(err, "failed to create kubeconfig file")
// Without require_oauth, a missing or malformed header falls back to the original manager.
s.Run("with no RequireOAuth (default) config", func() {
// Backslashes are doubled so the path survives inside the TOML basic string.
testStaticConfig := test.Must(config.ReadToml([]byte(`
kubeconfig = "` + strings.ReplaceAll(kubeconfigPath, `\`, `\\`) + `"
`)))
s.Run("without authorization header returns original manager", func() {
testManager, err := NewKubeconfigManager(testStaticConfig, "")
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
s.T().Cleanup(testManager.Close)
derived, err := testManager.Derived(s.T().Context())
s.Require().NoErrorf(err, "failed to create derived manager: %v", err)
s.Equal(derived.manager, testManager, "expected original manager, got different manager")
})
s.Run("with invalid authorization header returns original manager", func() {
testManager, err := NewKubeconfigManager(testStaticConfig, "")
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
s.T().Cleanup(testManager.Close)
// The value lacks the "Bearer " prefix, so it is treated as no token.
ctx := context.WithValue(s.T().Context(), HeaderKey("Authorization"), "invalid-token")
derived, err := testManager.Derived(ctx)
s.Require().NoErrorf(err, "failed to create derived manager: %v", err)
s.Equal(derived.manager, testManager, "expected original manager, got different manager")
})
s.Run("with valid bearer token creates derived manager with correct configuration", func() {
testManager, err := NewKubeconfigManager(testStaticConfig, "")
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
s.T().Cleanup(testManager.Close)
ctx := context.WithValue(s.T().Context(), HeaderKey("Authorization"), "Bearer aiTana-julIA")
derived, err := testManager.Derived(ctx)
s.Require().NoErrorf(err, "failed to create derived manager: %v", err)
s.NotEqual(derived.manager, testManager, "expected new derived manager, got original manager")
s.Equal(derived.manager.staticConfig, testStaticConfig, "staticConfig not properly wired to derived manager")
s.Run("RestConfig is correctly copied and sensitive fields are omitted", func() {
derivedCfg := derived.manager.cfg
s.Require().NotNil(derivedCfg, "derived config is nil")
originalCfg := testManager.cfg
// Connection and server-verification settings must be carried over unchanged.
s.Equalf(originalCfg.Host, derivedCfg.Host, "expected Host %s, got %s", originalCfg.Host, derivedCfg.Host)
s.Equalf(originalCfg.APIPath, derivedCfg.APIPath, "expected APIPath %s, got %s", originalCfg.APIPath, derivedCfg.APIPath)
s.Equalf(originalCfg.QPS, derivedCfg.QPS, "expected QPS %f, got %f", originalCfg.QPS, derivedCfg.QPS)
s.Equalf(originalCfg.Burst, derivedCfg.Burst, "expected Burst %d, got %d", originalCfg.Burst, derivedCfg.Burst)
s.Equalf(originalCfg.Timeout, derivedCfg.Timeout, "expected Timeout %v, got %v", originalCfg.Timeout, derivedCfg.Timeout)
s.Equalf(originalCfg.Insecure, derivedCfg.Insecure, "expected TLS Insecure %v, got %v", originalCfg.Insecure, derivedCfg.Insecure)
s.Equalf(originalCfg.ServerName, derivedCfg.ServerName, "expected TLS ServerName %s, got %s", originalCfg.ServerName, derivedCfg.ServerName)
s.Equalf(originalCfg.CAFile, derivedCfg.CAFile, "expected TLS CAFile %s, got %s", originalCfg.CAFile, derivedCfg.CAFile)
s.Equalf(string(originalCfg.CAData), string(derivedCfg.CAData), "expected TLS CAData %s, got %s", string(originalCfg.CAData), string(derivedCfg.CAData))
s.Equalf("aiTana-julIA", derivedCfg.BearerToken, "expected BearerToken %s, got %s", "aiTana-julIA", derivedCfg.BearerToken)
s.Equalf("kubernetes-mcp-server/bearer-token-auth", derivedCfg.UserAgent, "expected UserAgent \"kubernetes-mcp-server/bearer-token-auth\", got %s", derivedCfg.UserAgent)
// Verify that sensitive fields are NOT copied to prevent credential leakage
// The derived config should only use the bearer token from the Authorization header
// and not inherit any authentication credentials from the original kubeconfig
s.Emptyf(derivedCfg.CertFile, "expected TLS CertFile to be empty, got %s", derivedCfg.CertFile)
s.Emptyf(derivedCfg.KeyFile, "expected TLS KeyFile to be empty, got %s", derivedCfg.KeyFile)
s.Emptyf(len(derivedCfg.CertData), "expected TLS CertData to be empty, got %v", derivedCfg.CertData)
s.Emptyf(len(derivedCfg.KeyData), "expected TLS KeyData to be empty, got %v", derivedCfg.KeyData)
s.Emptyf(derivedCfg.Username, "expected Username to be empty, got %s", derivedCfg.Username)
s.Emptyf(derivedCfg.Password, "expected Password to be empty, got %s", derivedCfg.Password)
s.Nilf(derivedCfg.AuthProvider, "expected AuthProvider to be nil, got %v", derivedCfg.AuthProvider)
s.Nilf(derivedCfg.ExecProvider, "expected ExecProvider to be nil, got %v", derivedCfg.ExecProvider)
s.Emptyf(derivedCfg.BearerTokenFile, "expected BearerTokenFile to be empty, got %s", derivedCfg.BearerTokenFile)
s.Emptyf(derivedCfg.Impersonate.UserName, "expected Impersonate.UserName to be empty, got %s", derivedCfg.Impersonate.UserName)
// Verify that the original manager still has the sensitive data
s.Falsef(originalCfg.Username == "" && originalCfg.Password == "", "original kubeconfig shouldn't be modified")
})
s.Run("derived manager has initialized clients", func() {
// Verify that the derived manager has proper clients initialized
s.NotNilf(derived.manager.accessControlClientSet, "expected accessControlClientSet to be initialized")
s.Equalf(testStaticConfig, derived.manager.accessControlClientSet.staticConfig, "staticConfig not properly wired to derived manager")
s.NotNilf(derived.manager.discoveryClient, "expected discoveryClient to be initialized")
s.NotNilf(derived.manager.accessControlRESTMapper, "expected accessControlRESTMapper to be initialized")
s.Equalf(testStaticConfig, derived.manager.accessControlRESTMapper.staticConfig, "staticConfig not properly wired to derived manager")
s.NotNilf(derived.manager.dynamicClient, "expected dynamicClient to be initialized")
})
})
})
// With require_oauth = true, a missing or malformed header is an error instead of a fallback.
s.Run("with RequireOAuth=true", func() {
testStaticConfig := test.Must(config.ReadToml([]byte(`
kubeconfig = "` + strings.ReplaceAll(kubeconfigPath, `\`, `\\`) + `"
require_oauth = true
`)))
s.Run("with no authorization header returns oauth token required error", func() {
testManager, err := NewKubeconfigManager(testStaticConfig, "")
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
s.T().Cleanup(testManager.Close)
derived, err := testManager.Derived(s.T().Context())
s.Require().Error(err, "expected error for missing oauth token, got nil")
s.EqualError(err, "oauth token required", "expected error 'oauth token required', got %s", err.Error())
s.Nil(derived, "expected nil derived manager when oauth token required")
})
s.Run("with invalid authorization header returns oauth token required error", func() {
testManager, err := NewKubeconfigManager(testStaticConfig, "")
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
s.T().Cleanup(testManager.Close)
ctx := context.WithValue(s.T().Context(), HeaderKey("Authorization"), "invalid-token")
derived, err := testManager.Derived(ctx)
s.Require().Error(err, "expected error for invalid oauth token, got nil")
s.EqualError(err, "oauth token required", "expected error 'oauth token required', got %s", err.Error())
s.Nil(derived, "expected nil derived manager when oauth token required")
})
s.Run("with valid bearer token creates derived manager", func() {
testManager, err := NewKubeconfigManager(testStaticConfig, "")
s.Require().NoErrorf(err, "failed to create test manager: %v", err)
s.T().Cleanup(testManager.Close)
ctx := context.WithValue(s.T().Context(), HeaderKey("Authorization"), "Bearer aiTana-julIA")
derived, err := testManager.Derived(ctx)
s.Require().NoErrorf(err, "failed to create derived manager: %v", err)
s.NotEqual(derived.manager, testManager, "expected new derived manager, got original manager")
s.Equal(derived.manager.staticConfig, testStaticConfig, "staticConfig not properly wired to derived manager")
derivedCfg := derived.manager.cfg
s.Require().NotNil(derivedCfg, "derived config is nil")
s.Equalf("aiTana-julIA", derivedCfg.BearerToken, "expected BearerToken %s, got %s", "aiTana-julIA", derivedCfg.BearerToken)
})
})
}
// TestDerived is the go test entry point that runs DerivedTestSuite.
func TestDerived(t *testing.T) {
	suite.Run(t, &DerivedTestSuite{})
}
```
--------------------------------------------------------------------------------
/pkg/kubernetes/manager.go:
--------------------------------------------------------------------------------
```go
package kubernetes
import (
"context"
"errors"
"fmt"
"strings"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/helm"
"github.com/fsnotify/fsnotify"
authenticationv1api "k8s.io/api/authentication/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"
)
// Manager bundles the REST configuration, the kubeconfig view and the clients
// built from them for one Kubernetes API endpoint.
type Manager struct {
// cfg is the REST configuration the clients below are built from.
cfg *rest.Config
// clientCmdConfig is the kubeconfig-style view of the connection; it is used
// to resolve the default namespace and the kubeconfig files to watch.
clientCmdConfig clientcmd.ClientConfig
// discoveryClient is an in-memory cached discovery client layered on the
// discovery client of accessControlClientSet.
discoveryClient discovery.CachedDiscoveryInterface
accessControlClientSet *AccessControlClientset
accessControlRESTMapper *AccessControlRESTMapper
dynamicClient *dynamic.DynamicClient
// staticConfig is the server configuration this manager was created with.
staticConfig *config.StaticConfig
// CloseWatchKubeConfig, when non-nil, stops the current kubeconfig watcher.
// It is set by WatchKubeConfig and invoked by Close.
CloseWatchKubeConfig CloseWatchKubeConfig
}
// Compile-time checks that *Manager implements helm.Kubernetes and Openshift.
var (
	_ helm.Kubernetes = (*Manager)(nil)
	_ Openshift       = (*Manager)(nil)
)
// ErrorKubeconfigInClusterNotAllowed is returned by NewKubeconfigManager when
// the configuration is detected as in-cluster.
var ErrorKubeconfigInClusterNotAllowed = errors.New("kubeconfig manager cannot be used in in-cluster deployments")

// ErrorInClusterNotInCluster is returned by NewInClusterManager when the
// configuration is not detected as in-cluster.
var ErrorInClusterNotInCluster = errors.New("in-cluster manager cannot be used outside of a cluster")
// NewKubeconfigManager builds a Manager from a kubeconfig file.
//
// config.KubeConfig, when set, is used as the explicit kubeconfig path;
// otherwise the default client-go loading rules apply. kubeconfigContext
// overrides the current context (an empty string applies no override).
//
// It returns ErrorKubeconfigInClusterNotAllowed when IsInCluster reports an
// in-cluster setup. A failure to resolve the REST configuration is returned
// wrapped, so the underlying cause stays reachable through errors.Is/As.
func NewKubeconfigManager(config *config.StaticConfig, kubeconfigContext string) (*Manager, error) {
	if IsInCluster(config) {
		return nil, ErrorKubeconfigInClusterNotAllowed
	}

	pathOptions := clientcmd.NewDefaultPathOptions()
	if config.KubeConfig != "" {
		pathOptions.LoadingRules.ExplicitPath = config.KubeConfig
	}
	clientCmdConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		pathOptions.LoadingRules,
		&clientcmd.ConfigOverrides{
			ClusterInfo:    clientcmdapi.Cluster{Server: ""},
			CurrentContext: kubeconfigContext,
		})

	restConfig, err := clientCmdConfig.ClientConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to create kubernetes rest config from kubeconfig: %w", err)
	}
	return newManager(config, restConfig, clientCmdConfig)
}
// NewInClusterManager builds a Manager from the in-cluster REST configuration.
//
// It refuses to run when a kubeconfig path is configured (the returned error
// wraps ErrorKubeconfigInClusterNotAllowed, so errors.Is matches it) and
// returns ErrorInClusterNotInCluster when IsInCluster reports false. A failure
// to load the in-cluster configuration is returned wrapped as well.
//
// Because there is no kubeconfig file in this mode, a minimal in-memory one is
// synthesized with a single cluster ("cluster"), a single user ("user") and a
// single context, so code that expects a clientcmd.ClientConfig keeps working.
func NewInClusterManager(config *config.StaticConfig) (*Manager, error) {
	if config.KubeConfig != "" {
		return nil, fmt.Errorf("kubeconfig file %s cannot be used with the in-cluster deployments: %w", config.KubeConfig, ErrorKubeconfigInClusterNotAllowed)
	}
	if !IsInCluster(config) {
		return nil, ErrorInClusterNotInCluster
	}
	restConfig, err := InClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to create in-cluster kubernetes rest config: %w", err)
	}

	// Create a dummy kubeconfig clientcmdapi.Config for in-cluster config to be
	// used in places where clientcmd.ClientConfig is required.
	clientCmdConfig := clientcmdapi.NewConfig()
	clientCmdConfig.Clusters["cluster"] = &clientcmdapi.Cluster{
		Server:                restConfig.Host,
		InsecureSkipTLSVerify: restConfig.Insecure,
	}
	clientCmdConfig.AuthInfos["user"] = &clientcmdapi.AuthInfo{
		Token: restConfig.BearerToken,
	}
	clientCmdConfig.Contexts[inClusterKubeConfigDefaultContext] = &clientcmdapi.Context{
		Cluster:  "cluster",
		AuthInfo: "user",
	}
	clientCmdConfig.CurrentContext = inClusterKubeConfigDefaultContext

	return newManager(config, restConfig, clientcmd.NewDefaultClientConfig(*clientCmdConfig, nil))
}
// newManager wires a Manager around restConfig and clientCmdConfig.
//
// restConfig is used as-is (not copied); when its UserAgent is empty it is set
// to the client-go default in place. The access-control clientset, a cached
// discovery client, the access-control REST mapper and a dynamic client are
// all created from that same configuration. Any client construction error is
// returned unchanged.
func newManager(config *config.StaticConfig, restConfig *rest.Config, clientCmdConfig clientcmd.ClientConfig) (*Manager, error) {
	if restConfig.UserAgent == "" {
		restConfig.UserAgent = rest.DefaultKubernetesUserAgent()
	}
	// TODO: Won't work because not all client-go clients use the shared context (e.g. discovery client uses context.TODO())
	//restConfig.Wrap(func(original http.RoundTripper) http.RoundTripper {
	//	return &impersonateRoundTripper{original}
	//})
	clientSet, err := NewAccessControlClientset(restConfig, config)
	if err != nil {
		return nil, err
	}
	cachedDiscovery := memory.NewMemCacheClient(clientSet.DiscoveryClient())
	restMapper := NewAccessControlRESTMapper(
		restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscovery),
		config,
	)
	dynamicClient, err := dynamic.NewForConfig(restConfig)
	if err != nil {
		return nil, err
	}
	return &Manager{
		cfg:                     restConfig,
		clientCmdConfig:         clientCmdConfig,
		discoveryClient:         cachedDiscovery,
		accessControlClientSet:  clientSet,
		accessControlRESTMapper: restMapper,
		dynamicClient:           dynamicClient,
		staticConfig:            config,
	}, nil
}
// WatchKubeConfig starts a file watcher on the kubeconfig files reported by
// the manager's loading precedence and invokes onKubeConfigChange for every
// watcher event.
//
// It does nothing when there is no client config, when no files are reported,
// or when the watcher cannot be created. Errors from adding individual files
// and from the callback are ignored. Any previously installed watcher is
// closed, and CloseWatchKubeConfig is pointed at the new watcher's Close. The
// background goroutine ends once the watcher's channels are closed.
func (m *Manager) WatchKubeConfig(onKubeConfigChange func() error) {
	if m.clientCmdConfig == nil {
		return
	}
	watchedFiles := m.clientCmdConfig.ConfigAccess().GetLoadingPrecedence()
	if len(watchedFiles) == 0 {
		return
	}
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return
	}
	for i := range watchedFiles {
		_ = watcher.Add(watchedFiles[i])
	}

	go func() {
		for {
			select {
			case _, open := <-watcher.Events:
				if !open {
					return
				}
				_ = onKubeConfigChange()
			case _, open := <-watcher.Errors:
				// Watcher errors are drained and otherwise ignored.
				if !open {
					return
				}
			}
		}
	}()

	if closePrevious := m.CloseWatchKubeConfig; closePrevious != nil {
		_ = closePrevious()
	}
	m.CloseWatchKubeConfig = watcher.Close
}
// Close stops the kubeconfig watcher, if one was installed. The error returned
// by the watcher's close function is discarded.
func (m *Manager) Close() {
	if m.CloseWatchKubeConfig == nil {
		return
	}
	_ = m.CloseWatchKubeConfig()
}
// configuredNamespace returns the namespace reported by the client config, or
// an empty string when the client config returns an error.
func (m *Manager) configuredNamespace() string {
	namespace, _, err := m.clientCmdConfig.Namespace()
	if err != nil {
		return ""
	}
	return namespace
}
// NamespaceOrDefault returns namespace unchanged when it is non-empty and the
// configured namespace otherwise.
func (m *Manager) NamespaceOrDefault(namespace string) string {
	if namespace != "" {
		return namespace
	}
	return m.configuredNamespace()
}
// ToDiscoveryClient returns the manager's cached discovery client. The error
// is always nil.
func (m *Manager) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
return m.discoveryClient, nil
}
// ToRESTMapper returns the manager's access-control REST mapper. The error is
// always nil.
func (m *Manager) ToRESTMapper() (meta.RESTMapper, error) {
return m.accessControlRESTMapper, nil
}
// ToRESTConfig returns the rest.Config object (genericclioptions.RESTClientGetter).
// The manager's own configuration pointer is returned, not a copy, and the
// error is always nil.
func (m *Manager) ToRESTConfig() (*rest.Config, error) {
return m.cfg, nil
}
// ToRawKubeConfigLoader returns the clientcmd.ClientConfig object
// (genericclioptions.RESTClientGetter) held by the manager.
func (m *Manager) ToRawKubeConfigLoader() clientcmd.ClientConfig {
return m.clientCmdConfig
}
// VerifyToken validates token against the cluster by creating a TokenReview
// that requests the given audience.
//
// On success it returns the authenticated user's info and the audiences
// reported in the review status. It returns an error when the TokenReview
// client cannot be obtained, when the API call fails (the cause is wrapped and
// reachable through errors.Is/As), or when the review reports the token as
// not authenticated; in that case the status error text is included if the
// API server supplied one.
func (m *Manager) VerifyToken(ctx context.Context, token, audience string) (*authenticationv1api.UserInfo, []string, error) {
	tokenReviewClient, err := m.accessControlClientSet.TokenReview()
	if err != nil {
		return nil, nil, err
	}
	tokenReview := &authenticationv1api.TokenReview{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "authentication.k8s.io/v1",
			Kind:       "TokenReview",
		},
		Spec: authenticationv1api.TokenReviewSpec{
			Token:     token,
			Audiences: []string{audience},
		},
	}

	result, err := tokenReviewClient.Create(ctx, tokenReview, metav1.CreateOptions{})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create token review: %w", err)
	}

	if !result.Status.Authenticated {
		if result.Status.Error != "" {
			return nil, nil, fmt.Errorf("token authentication failed: %s", result.Status.Error)
		}
		return nil, nil, errors.New("token authentication failed")
	}
	return &result.Status.User, result.Status.Audiences, nil
}
// Derived returns a Kubernetes instance that talks to the API server with the
// bearer token found in ctx under OAuthAuthorizationHeader.
//
// When ctx carries no "Bearer " authorization value, the Manager itself is
// reused, unless staticConfig.RequireOAuth is set, in which case an error is
// returned.
//
// The derived rest.Config copies only the host, API path, server-verification
// TLS settings, rate limits and timeout from the Manager's configuration; the
// Manager's own credentials are not copied and the kubeconfig AuthInfos are
// cleared.
//
// If any step of building the derived clients fails, the Manager itself is
// returned as a fallback when RequireOAuth is false; with RequireOAuth set the
// failure is logged and a generic error is returned.
func (m *Manager) Derived(ctx context.Context) (*Kubernetes, error) {
	authorization, ok := ctx.Value(OAuthAuthorizationHeader).(string)
	if !ok || !strings.HasPrefix(authorization, "Bearer ") {
		if m.staticConfig.RequireOAuth {
			return nil, errors.New("oauth token required")
		}
		return &Kubernetes{manager: m}, nil
	}
	klog.V(5).Infof("%s header found (Bearer), using provided bearer token", OAuthAuthorizationHeader)
	derivedCfg := &rest.Config{
		Host:    m.cfg.Host,
		APIPath: m.cfg.APIPath,
		// Copy only server verification TLS settings (CA bundle and server name)
		TLSClientConfig: rest.TLSClientConfig{
			Insecure:   m.cfg.Insecure,
			ServerName: m.cfg.ServerName,
			CAFile:     m.cfg.CAFile,
			CAData:     m.cfg.CAData,
		},
		BearerToken: strings.TrimPrefix(authorization, "Bearer "),
		// pass custom UserAgent to identify the client
		UserAgent:   CustomUserAgent,
		QPS:         m.cfg.QPS,
		Burst:       m.cfg.Burst,
		Timeout:     m.cfg.Timeout,
		Impersonate: rest.ImpersonationConfig{},
	}
	clientCmdApiConfig, err := m.clientCmdConfig.RawConfig()
	if err != nil {
		if m.staticConfig.RequireOAuth {
			klog.Errorf("failed to get kubeconfig: %v", err)
			return nil, errors.New("failed to get kubeconfig")
		}
		return &Kubernetes{manager: m}, nil
	}
	// Drop every stored credential so only the request's bearer token is used.
	clientCmdApiConfig.AuthInfos = make(map[string]*clientcmdapi.AuthInfo)
	derived := &Kubernetes{
		manager: &Manager{
			clientCmdConfig: clientcmd.NewDefaultClientConfig(clientCmdApiConfig, nil),
			cfg:             derivedCfg,
			staticConfig:    m.staticConfig,
		},
	}
	derived.manager.accessControlClientSet, err = NewAccessControlClientset(derived.manager.cfg, derived.manager.staticConfig)
	if err != nil {
		if m.staticConfig.RequireOAuth {
			// Report the step that actually failed (this is not a kubeconfig error).
			klog.Errorf("failed to initialize access control clientset: %v", err)
			return nil, errors.New("failed to initialize access control clientset")
		}
		return &Kubernetes{manager: m}, nil
	}
	derived.manager.discoveryClient = memory.NewMemCacheClient(derived.manager.accessControlClientSet.DiscoveryClient())
	derived.manager.accessControlRESTMapper = NewAccessControlRESTMapper(
		restmapper.NewDeferredDiscoveryRESTMapper(derived.manager.discoveryClient),
		derived.manager.staticConfig,
	)
	derived.manager.dynamicClient, err = dynamic.NewForConfig(derived.manager.cfg)
	if err != nil {
		if m.staticConfig.RequireOAuth {
			klog.Errorf("failed to initialize dynamic client: %v", err)
			return nil, errors.New("failed to initialize dynamic client")
		}
		return &Kubernetes{manager: m}, nil
	}
	return derived, nil
}
```
--------------------------------------------------------------------------------
/pkg/http/authorization.go:
--------------------------------------------------------------------------------
```go
package http
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"github.com/coreos/go-oidc/v3/oidc"
"github.com/go-jose/go-jose/v4"
"github.com/go-jose/go-jose/v4/jwt"
"golang.org/x/oauth2"
authenticationapiv1 "k8s.io/api/authentication/v1"
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/mcp"
)
// KubernetesApiTokenVerifier is the dependency AuthorizationMiddleware uses to
// check a bearer token against a Kubernetes API server and to learn which MCP
// request argument names the target.
type KubernetesApiTokenVerifier interface {
// KubernetesApiVerifyToken TODO: clarify proper implementation
//
// It is called with the target (cluster) extracted from the request, the raw
// token and the expected audience; a non-nil error rejects the token.
// NOTE(review): presumably the returned values are the authenticated user and
// the accepted audiences — confirm against the implementation.
KubernetesApiVerifyToken(ctx context.Context, cluster, token, audience string) (*authenticationapiv1.UserInfo, []string, error)
// GetTargetParameterName returns the parameter name used for target identification in MCP requests
GetTargetParameterName() string
}
// extractTargetFromRequest reads the MCP request body and returns the string
// value stored under params.arguments[targetName].
//
// The body is fully consumed and then replaced with an in-memory copy so that
// downstream handlers can read it again. An empty string (and nil error) is
// returned when the request has no body, the body is not valid JSON, or the
// argument is absent or not a string. Only a failure to read the body yields
// an error.
func extractTargetFromRequest(r *http.Request, targetName string) (string, error) {
	if r.Body == nil {
		return "", nil
	}

	payload, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		return "", readErr
	}
	// Hand an unread copy of the payload back to the request.
	r.Body = io.NopCloser(bytes.NewBuffer(payload))

	// Only the arguments map of the MCP envelope is of interest here.
	type mcpParams struct {
		Arguments map[string]interface{} `json:"arguments"`
	}
	var envelope struct {
		Params mcpParams `json:"params"`
	}
	if json.Unmarshal(payload, &envelope) != nil {
		// Unparseable requests fall back to the default target.
		return "", nil
	}

	// A missing or non-string argument leaves target at its zero value.
	target, _ := envelope.Params.Arguments[targetName].(string)
	return target, nil
}
// write401 replies with 401 Unauthorized and message as the body. The
// WWW-Authenticate header is set to wwwAuthenticateHeader with an
// error="<errorType>" attribute appended.
func write401(w http.ResponseWriter, wwwAuthenticateHeader, errorType, message string) {
	challenge := fmt.Sprintf(`%s, error="%s"`, wwwAuthenticateHeader, errorType)
	w.Header().Set("WWW-Authenticate", challenge)
	http.Error(w, message, http.StatusUnauthorized)
}
// AuthorizationMiddleware validates the OAuth flow for protected resources.
//
// The flow is skipped for unprotected resources, such as health checks and well-known endpoints.
//
// There are several auth scenarios supported by this middleware:
//
//  1. requireOAuth is false:
//
//     - The OAuth flow is skipped, and the server is effectively unprotected.
//     - The request is passed to the next handler without any validation.
//
//     see TestAuthorizationRequireOAuthFalse
//
//  2. requireOAuth is set to true, server is protected:
//
//     2.1. Raw Token Validation (oidcProvider is nil):
//     - The token is validated offline for basic sanity checks (expiration).
//     - If OAuthAudience is set, the token is validated against the audience.
//     - If ValidateToken is set, the token is then used against the Kubernetes API Server for TokenReview.
//
//     see TestAuthorizationRawToken
//
//     2.2. OIDC Provider Validation (oidcProvider is not nil):
//     - The token is validated offline for basic sanity checks (audience and expiration).
//     - If OAuthAudience is set, the token is validated against the audience.
//     - The token is then validated against the OIDC Provider.
//     - If ValidateToken is set, the token is then used against the Kubernetes API Server for TokenReview.
//
//     see TestAuthorizationOidcToken
//
//     2.3. OIDC Token Exchange (oidcProvider is not nil, StsClientId and StsAudience are set):
//     - The token is validated offline for basic sanity checks (audience and expiration).
//     - If OAuthAudience is set, the token is validated against the audience.
//     - The token is then validated against the OIDC Provider.
//     - If the token is valid, an external account token exchange is performed using
//     the OIDC Provider to obtain a new token with the specified audience and scopes.
//     - If ValidateToken is set, the exchanged token is then used against the Kubernetes API Server for TokenReview.
//
//     see TestAuthorizationOidcTokenExchange
//
// Any validation failure is answered with 401 Unauthorized and a
// WWW-Authenticate challenge. When ValidateToken is set but verifier is nil,
// the request is rejected the same way instead of dereferencing the nil
// verifier.
func AuthorizationMiddleware(staticConfig *config.StaticConfig, oidcProvider *oidc.Provider, verifier KubernetesApiTokenVerifier, httpClient *http.Client) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if r.URL.Path == healthEndpoint || slices.Contains(WellKnownEndpoints, r.URL.EscapedPath()) {
				next.ServeHTTP(w, r)
				return
			}
			if !staticConfig.RequireOAuth {
				next.ServeHTTP(w, r)
				return
			}

			wwwAuthenticateHeader := "Bearer realm=\"Kubernetes MCP Server\""
			if staticConfig.OAuthAudience != "" {
				wwwAuthenticateHeader += fmt.Sprintf(`, audience="%s"`, staticConfig.OAuthAudience)
			}

			authHeader := r.Header.Get("Authorization")
			if authHeader == "" || !strings.HasPrefix(authHeader, "Bearer ") {
				klog.V(1).Infof("Authentication failed - missing or invalid bearer token: %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)
				write401(w, wwwAuthenticateHeader, "missing_token", "Unauthorized: Bearer token required")
				return
			}

			token := strings.TrimPrefix(authHeader, "Bearer ")

			// From here on every step runs only while err is still nil; the
			// first failure is reported once at the end.
			claims, err := ParseJWTClaims(token)
			if err == nil && claims == nil {
				// Impossible case, but just in case
				err = fmt.Errorf("failed to parse JWT claims from token")
			}
			// Offline validation
			if err == nil {
				err = claims.ValidateOffline(staticConfig.OAuthAudience)
			}
			// Online OIDC provider validation
			if err == nil {
				err = claims.ValidateWithProvider(r.Context(), staticConfig.OAuthAudience, oidcProvider)
			}
			// Scopes propagation, they are likely to be used for authorization.
			if err == nil {
				scopes := claims.GetScopes()
				klog.V(2).Infof("JWT token validated - Scopes: %v", scopes)
				r = r.WithContext(context.WithValue(r.Context(), mcp.TokenScopesContextKey, scopes))
			}
			// Token exchange with OIDC provider
			sts := NewFromConfig(staticConfig, oidcProvider)
			// TODO: Maybe the token had already been exchanged, if it has the right audience and scopes, we can skip this step.
			if err == nil && sts.IsEnabled() {
				var exchangedToken *oauth2.Token
				// If the token is valid, we can exchange it for a new token with the specified audience and scopes.
				ctx := r.Context()
				if httpClient != nil {
					ctx = context.WithValue(ctx, oauth2.HTTPClient, httpClient)
				}
				exchangedToken, err = sts.ExternalAccountTokenExchange(ctx, &oauth2.Token{
					AccessToken: claims.Token,
					TokenType:   "Bearer",
				})
				if err == nil {
					// Replace the original token with the exchanged token
					token = exchangedToken.AccessToken
					claims, err = ParseJWTClaims(token)
					r.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) // TODO: Implement test to verify, THIS IS A CRITICAL PART
				}
			}
			// Kubernetes API Server TokenReview validation
			if err == nil && staticConfig.ValidateToken {
				if verifier == nil {
					// Fail closed: validation was requested but cannot be performed.
					err = fmt.Errorf("token validation is enabled but no Kubernetes API token verifier is configured")
				} else {
					targetParameterName := verifier.GetTargetParameterName()
					cluster, clusterErr := extractTargetFromRequest(r, targetParameterName)
					if clusterErr != nil {
						klog.V(2).Infof("Failed to extract cluster from request, using default: %v", clusterErr)
					}
					err = claims.ValidateWithKubernetesApi(r.Context(), staticConfig.OAuthAudience, cluster, verifier)
				}
			}
			if err != nil {
				klog.V(1).Infof("Authentication failed - JWT validation error: %s %s from %s, error: %v", r.Method, r.URL.Path, r.RemoteAddr, err)
				write401(w, wwwAuthenticateHeader, "invalid_token", "Unauthorized: Invalid token")
				return
			}

			next.ServeHTTP(w, r)
		})
	}
}
// allSignatureAlgorithms lists the JWS signature algorithms that
// ParseJWTClaims passes to jwt.ParseSigned when parsing a token.
var allSignatureAlgorithms = []jose.SignatureAlgorithm{
jose.EdDSA,
jose.HS256,
jose.HS384,
jose.HS512,
jose.RS256,
jose.RS384,
jose.RS512,
jose.ES256,
jose.ES384,
jose.ES512,
jose.PS256,
jose.PS384,
jose.PS512,
}
// JWTClaims holds the claims decoded from a JWT together with the raw token
// they were decoded from.
type JWTClaims struct {
// Claims carries the registered JWT claims; it is the value checked by ValidateOffline.
jwt.Claims
// Token is the raw, encoded token. It is excluded from JSON and is set by ParseJWTClaims.
Token string `json:"-"`
// Scope is the "scope" claim; GetScopes splits it on whitespace.
Scope string `json:"scope,omitempty"`
}
// GetScopes splits the scope claim on whitespace and returns the individual
// scopes. It returns nil when the scope claim is empty.
func (c *JWTClaims) GetScopes() []string {
	if c.Scope != "" {
		return strings.Fields(c.Scope)
	}
	return nil
}
// ValidateOffline checks the JWT claims without contacting any external
// service. When audience is non-empty the token must list it among its
// audiences; an empty audience skips the audience check.
//
// The underlying validation error is wrapped, so callers can match it with
// errors.Is.
func (c *JWTClaims) ValidateOffline(audience string) error {
	expected := jwt.Expected{}
	if audience != "" {
		expected.AnyAudience = jwt.Audience{audience}
	}
	if err := c.Validate(expected); err != nil {
		return fmt.Errorf("JWT token validation error: %w", err)
	}
	return nil
}
// ValidateWithProvider verifies the raw token with the given OIDC provider,
// using audience as the expected client ID. A nil provider disables the check
// and nil is returned.
//
// The verifier's error is wrapped, so callers can inspect it with errors.Is
// and errors.As.
func (c *JWTClaims) ValidateWithProvider(ctx context.Context, audience string, provider *oidc.Provider) error {
	if provider == nil {
		return nil
	}
	verifier := provider.Verifier(&oidc.Config{
		ClientID: audience,
	})
	if _, err := verifier.Verify(ctx, c.Token); err != nil {
		return fmt.Errorf("OIDC token validation error: %w", err)
	}
	return nil
}
// ValidateWithKubernetesApi asks verifier to check the raw token for the given
// cluster and audience. A nil verifier disables the check and nil is returned.
//
// The verifier's error is wrapped, so callers can inspect it with errors.Is
// and errors.As.
func (c *JWTClaims) ValidateWithKubernetesApi(ctx context.Context, audience, cluster string, verifier KubernetesApiTokenVerifier) error {
	if verifier == nil {
		return nil
	}
	if _, _, err := verifier.KubernetesApiVerifyToken(ctx, cluster, c.Token, audience); err != nil {
		return fmt.Errorf("kubernetes API token validation error: %w", err)
	}
	return nil
}
// ParseJWTClaims parses token as a signed JWT and decodes its claims WITHOUT
// verifying the signature; callers must validate the result separately (see
// the Validate* methods on JWTClaims).
//
// On success the returned claims carry the raw token in Token. On failure the
// returned claims are nil and the error wraps the underlying cause.
func ParseJWTClaims(token string) (*JWTClaims, error) {
	parsed, err := jwt.ParseSigned(token, allSignatureAlgorithms)
	if err != nil {
		return nil, fmt.Errorf("failed to parse JWT token: %w", err)
	}
	claims := &JWTClaims{}
	if err := parsed.UnsafeClaimsWithoutVerification(claims); err != nil {
		// Do not hand back half-decoded claims alongside an error.
		return nil, fmt.Errorf("failed to decode JWT claims: %w", err)
	}
	claims.Token = token
	return claims, nil
}
```
--------------------------------------------------------------------------------
/pkg/toolsets/core/resources.go:
--------------------------------------------------------------------------------
```go
package core
import (
"context"
"errors"
"fmt"
"github.com/google/jsonschema-go/jsonschema"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/utils/ptr"
"github.com/containers/kubernetes-mcp-server/pkg/api"
internalk8s "github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
"github.com/containers/kubernetes-mcp-server/pkg/output"
)
// initResources builds the generic resource tools: resources_list,
// resources_get, resources_create_or_update and resources_delete.
//
// Every tool description ends with a hint listing common apiVersion/kind
// pairs; the OpenShift Route is added to that hint when o reports an
// OpenShift cluster.
func initResources(o internalk8s.Openshift) []api.ServerTool {
commonApiVersion := "v1 Pod, v1 Service, v1 Node, apps/v1 Deployment, networking.k8s.io/v1 Ingress"
if o.IsOpenShift(context.Background()) {
commonApiVersion += ", route.openshift.io/v1 Route"
}
commonApiVersion = fmt.Sprintf("(common apiVersion and kind include: %s)", commonApiVersion)
return []api.ServerTool{
// resources_list: read-only listing by apiVersion and kind, optionally narrowed by namespace and label selector.
{Tool: api.Tool{
Name: "resources_list",
Description: "List Kubernetes resources and objects in the current cluster by providing their apiVersion and kind and optionally the namespace and label selector\n" + commonApiVersion,
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"apiVersion": {
Type: "string",
Description: "apiVersion of the resources (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)",
},
"kind": {
Type: "string",
Description: "kind of the resources (examples of valid kind are: Pod, Service, Deployment, Ingress)",
},
"namespace": {
Type: "string",
Description: "Optional Namespace to retrieve the namespaced resources from (ignored in case of cluster scoped resources). If not provided, will list resources from all namespaces",
},
// NOTE(review): the description below mentions "pods" although this tool lists arbitrary resources — confirm wording.
"labelSelector": {
Type: "string",
Description: "Optional Kubernetes label selector (e.g. 'app=myapp,env=prod' or 'app in (myapp,yourapp)'), use this option when you want to filter the pods by label",
Pattern: "([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]",
},
},
Required: []string{"apiVersion", "kind"},
},
Annotations: api.ToolAnnotations{
Title: "Resources: List",
ReadOnlyHint: ptr.To(true),
DestructiveHint: ptr.To(false),
IdempotentHint: ptr.To(false),
OpenWorldHint: ptr.To(true),
},
}, Handler: resourcesList},
// resources_get: read-only fetch of a single resource by apiVersion, kind and name.
{Tool: api.Tool{
Name: "resources_get",
Description: "Get a Kubernetes resource in the current cluster by providing its apiVersion, kind, optionally the namespace, and its name\n" + commonApiVersion,
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"apiVersion": {
Type: "string",
Description: "apiVersion of the resource (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)",
},
"kind": {
Type: "string",
Description: "kind of the resource (examples of valid kind are: Pod, Service, Deployment, Ingress)",
},
"namespace": {
Type: "string",
Description: "Optional Namespace to retrieve the namespaced resource from (ignored in case of cluster scoped resources). If not provided, will get resource from configured namespace",
},
"name": {
Type: "string",
Description: "Name of the resource",
},
},
Required: []string{"apiVersion", "kind", "name"},
},
Annotations: api.ToolAnnotations{
Title: "Resources: Get",
ReadOnlyHint: ptr.To(true),
DestructiveHint: ptr.To(false),
IdempotentHint: ptr.To(false),
OpenWorldHint: ptr.To(true),
},
}, Handler: resourcesGet},
// resources_create_or_update: applies a YAML or JSON manifest; annotated as destructive and idempotent.
{Tool: api.Tool{
Name: "resources_create_or_update",
Description: "Create or update a Kubernetes resource in the current cluster by providing a YAML or JSON representation of the resource\n" + commonApiVersion,
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"resource": {
Type: "string",
Description: "A JSON or YAML containing a representation of the Kubernetes resource. Should include top-level fields such as apiVersion,kind,metadata, and spec",
},
},
Required: []string{"resource"},
},
Annotations: api.ToolAnnotations{
Title: "Resources: Create or Update",
ReadOnlyHint: ptr.To(false),
DestructiveHint: ptr.To(true),
IdempotentHint: ptr.To(true),
OpenWorldHint: ptr.To(true),
},
}, Handler: resourcesCreateOrUpdate},
// resources_delete: removes a single resource by apiVersion, kind and name; annotated as destructive and idempotent.
{Tool: api.Tool{
Name: "resources_delete",
Description: "Delete a Kubernetes resource in the current cluster by providing its apiVersion, kind, optionally the namespace, and its name\n" + commonApiVersion,
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"apiVersion": {
Type: "string",
Description: "apiVersion of the resource (examples of valid apiVersion are: v1, apps/v1, networking.k8s.io/v1)",
},
"kind": {
Type: "string",
Description: "kind of the resource (examples of valid kind are: Pod, Service, Deployment, Ingress)",
},
"namespace": {
Type: "string",
Description: "Optional Namespace to delete the namespaced resource from (ignored in case of cluster scoped resources). If not provided, will delete resource from configured namespace",
},
"name": {
Type: "string",
Description: "Name of the resource",
},
},
Required: []string{"apiVersion", "kind", "name"},
},
Annotations: api.ToolAnnotations{
Title: "Resources: Delete",
ReadOnlyHint: ptr.To(false),
DestructiveHint: ptr.To(true),
IdempotentHint: ptr.To(true),
OpenWorldHint: ptr.To(true),
},
}, Handler: resourcesDelete},
}
}
// resourcesList handles the resources_list tool: it lists resources of the
// requested apiVersion/kind, optionally restricted to a namespace and a label
// selector, and renders them with the configured list output.
//
// Argument problems and API failures are reported inside the tool call result;
// the returned error is always nil.
func resourcesList(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
	args := params.GetArguments()

	listOptions := internalk8s.ResourceListOptions{
		AsTable: params.ListOutput.AsTable(),
	}
	if rawSelector := args["labelSelector"]; rawSelector != nil {
		selector, isString := rawSelector.(string)
		if !isString {
			return api.NewToolCallResult("", fmt.Errorf("labelSelector is not a string")), nil
		}
		listOptions.LabelSelector = selector
	}

	gvk, err := parseGroupVersionKind(args)
	if err != nil {
		return api.NewToolCallResult("", fmt.Errorf("failed to list resources, %s", err)), nil
	}

	// A missing namespace means "all namespaces" and is passed on as "".
	var ns string
	if rawNamespace := args["namespace"]; rawNamespace != nil {
		var isString bool
		if ns, isString = rawNamespace.(string); !isString {
			return api.NewToolCallResult("", fmt.Errorf("namespace is not a string")), nil
		}
	}

	listed, err := params.ResourcesList(params, gvk, ns, listOptions)
	if err != nil {
		return api.NewToolCallResult("", fmt.Errorf("failed to list resources: %v", err)), nil
	}
	rendered, renderErr := params.ListOutput.PrintObj(listed)
	return api.NewToolCallResult(rendered, renderErr), nil
}
// resourcesGet handles the resources_get tool: it fetches one resource by
// apiVersion, kind, optional namespace and name, and returns it as YAML.
//
// Argument problems and API failures are reported inside the tool call result;
// the returned error is always nil.
func resourcesGet(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
	args := params.GetArguments()

	gvk, err := parseGroupVersionKind(args)
	if err != nil {
		return api.NewToolCallResult("", fmt.Errorf("failed to get resource, %s", err)), nil
	}

	rawName := args["name"]
	if rawName == nil {
		return api.NewToolCallResult("", errors.New("failed to get resource, missing argument name")), nil
	}

	// A missing namespace is passed on as "".
	var ns string
	if rawNamespace := args["namespace"]; rawNamespace != nil {
		var isString bool
		if ns, isString = rawNamespace.(string); !isString {
			return api.NewToolCallResult("", fmt.Errorf("namespace is not a string")), nil
		}
	}

	resourceName, isString := rawName.(string)
	if !isString {
		return api.NewToolCallResult("", fmt.Errorf("name is not a string")), nil
	}

	found, err := params.ResourcesGet(params, gvk, ns, resourceName)
	if err != nil {
		return api.NewToolCallResult("", fmt.Errorf("failed to get resource: %v", err)), nil
	}
	rendered, renderErr := output.MarshalYaml(found)
	return api.NewToolCallResult(rendered, renderErr), nil
}
// resourcesCreateOrUpdate handles the resources_create_or_update tool: it
// applies the YAML or JSON manifest given in the "resource" argument and
// returns the resulting resources as YAML.
//
// Argument problems and API failures are reported inside the tool call result;
// the returned error is always nil. If the resources were applied but cannot
// be rendered, the result carries both the success header and the error.
func resourcesCreateOrUpdate(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
	resource := params.GetArguments()["resource"]
	if resource == nil || resource == "" {
		return api.NewToolCallResult("", errors.New("failed to create or update resources, missing argument resource")), nil
	}

	r, ok := resource.(string)
	if !ok {
		return api.NewToolCallResult("", fmt.Errorf("resource is not a string")), nil
	}

	resources, err := params.ResourcesCreateOrUpdate(params, r)
	if err != nil {
		return api.NewToolCallResult("", fmt.Errorf("failed to create or update resources: %v", err)), nil
	}
	marshalledYaml, err := output.MarshalYaml(resources)
	if err != nil {
		// Same prefix as the API failure above (the original had a stray second colon).
		err = fmt.Errorf("failed to create or update resources: %v", err)
	}
	return api.NewToolCallResult("# The following resources (YAML) have been created or updated successfully\n"+marshalledYaml, err), nil
}
// resourcesDelete handles the resources_delete tool: it deletes one resource
// identified by apiVersion, kind, optional namespace and name.
//
// Argument problems and API failures are reported inside the tool call result;
// the returned error is always nil.
func resourcesDelete(params api.ToolHandlerParams) (*api.ToolCallResult, error) {
	args := params.GetArguments()

	gvk, err := parseGroupVersionKind(args)
	if err != nil {
		return api.NewToolCallResult("", fmt.Errorf("failed to delete resource, %s", err)), nil
	}

	rawName := args["name"]
	if rawName == nil {
		return api.NewToolCallResult("", errors.New("failed to delete resource, missing argument name")), nil
	}

	// A missing namespace is passed on as "".
	var ns string
	if rawNamespace := args["namespace"]; rawNamespace != nil {
		var isString bool
		if ns, isString = rawNamespace.(string); !isString {
			return api.NewToolCallResult("", fmt.Errorf("namespace is not a string")), nil
		}
	}

	resourceName, isString := rawName.(string)
	if !isString {
		return api.NewToolCallResult("", fmt.Errorf("name is not a string")), nil
	}

	if err = params.ResourcesDelete(params, gvk, ns, resourceName); err != nil {
		return api.NewToolCallResult("", fmt.Errorf("failed to delete resource: %v", err)), nil
	}
	// err is nil here; it is forwarded as in every other success path.
	return api.NewToolCallResult("Resource deleted successfully", err), nil
}
// parseGroupVersionKind builds a GroupVersionKind from the "apiVersion" and
// "kind" tool arguments.
//
// It returns an error when either argument is missing, when either is not a
// string, or when apiVersion cannot be parsed as a group/version.
func parseGroupVersionKind(arguments map[string]interface{}) (*schema.GroupVersionKind, error) {
	apiVersion := arguments["apiVersion"]
	if apiVersion == nil {
		return nil, errors.New("missing argument apiVersion")
	}
	kind := arguments["kind"]
	if kind == nil {
		return nil, errors.New("missing argument kind")
	}
	a, ok := apiVersion.(string)
	if !ok {
		return nil, fmt.Errorf("apiVersion is not a string")
	}
	gv, err := schema.ParseGroupVersion(a)
	if err != nil {
		return nil, errors.New("invalid argument apiVersion")
	}
	// Checked assertion: a non-string kind (e.g. a JSON number) must produce an
	// error, not a panic.
	k, ok := kind.(string)
	if !ok {
		return nil, fmt.Errorf("kind is not a string")
	}
	return &schema.GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: k}, nil
}
```
--------------------------------------------------------------------------------
/pkg/http/authorization_test.go:
--------------------------------------------------------------------------------
```go
package http
import (
"strings"
"testing"
"github.com/go-jose/go-jose/v4/jwt"
)
const (
// https://jwt.io/#token=eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWwiLCJtY3Atc2VydmVyIl0sImV4cCI6MjUzNDAyMjk3MTk5LCJpYXQiOjAsImlzcyI6Imh0dHBzOi8va3ViZXJuZXRlcy5kZWZhdWx0LnN2Yy5jbHVzdGVyLmxvY2FsIiwianRpIjoiOTkyMjJkNTYtMzQwZS00ZWI2LTg1ODgtMjYxNDExZjM1ZDI2Iiwia3ViZXJuZXRlcy5pbyI6eyJuYW1lc3BhY2UiOiJkZWZhdWx0Iiwic2VydmljZWFjY291bnQiOnsibmFtZSI6ImRlZmF1bHQiLCJ1aWQiOiJlYWNiNmFkMi04MGI3LTQxNzktODQzZC05MmViMWU2YmJiYTYifX0sIm5iZiI6MCwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50OmRlZmF1bHQ6ZGVmYXVsdCJ9.ld9aJaQX5k44KOV1bv8MCY2RceAZ9jAjN2vKswKmINNiOpRMl0f8Y0trrq7gdRlKwGLsCUjz8hbHsGcM43QtNrcwfvH5imRnlAKANPUgswwEadCTjASihlo6ADsn9fjAWB4viplFwq8VdzcwpcyActYJi2TBFoRq204STZJIcAW_B40HOuCB2XxQ81V4_XWLzL03Bt-YmYUhliiiE5YSKS1WEEWIbdel--b7Gvp-VS1I2eeiOqV3SelMBHbF9EwKGAkyObg0JhGqr5XHLd6WOmhvLus4eCkyakQMgr2tZIdvbt2yEUDiId6r27tlgAPLmqlyYMEhyiM212_Sth3T3Q // notsecret
tokenBasicNotExpired = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWwiLCJtY3Atc2VydmVyIl0sImV4cCI6MjUzNDAyMjk3MTk5LCJpYXQiOjAsImlzcyI6Imh0dHBzOi8va3ViZXJuZXRlcy5kZWZhdWx0LnN2Yy5jbHVzdGVyLmxvY2FsIiwianRpIjoiOTkyMjJkNTYtMzQwZS00ZWI2LTg1ODgtMjYxNDExZjM1ZDI2Iiwia3ViZXJuZXRlcy5pbyI6eyJuYW1lc3BhY2UiOiJkZWZhdWx0Iiwic2VydmljZWFjY291bnQiOnsibmFtZSI6ImRlZmF1bHQiLCJ1aWQiOiJlYWNiNmFkMi04MGI3LTQxNzktODQzZC05MmViMWU2YmJiYTYifX0sIm5iZiI6MCwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50OmRlZmF1bHQ6ZGVmYXVsdCJ9.ld9aJaQX5k44KOV1bv8MCY2RceAZ9jAjN2vKswKmINNiOpRMl0f8Y0trrq7gdRlKwGLsCUjz8hbHsGcM43QtNrcwfvH5imRnlAKANPUgswwEadCTjASihlo6ADsn9fjAWB4viplFwq8VdzcwpcyActYJi2TBFoRq204STZJIcAW_B40HOuCB2XxQ81V4_XWLzL03Bt-YmYUhliiiE5YSKS1WEEWIbdel--b7Gvp-VS1I2eeiOqV3SelMBHbF9EwKGAkyObg0JhGqr5XHLd6WOmhvLus4eCkyakQMgr2tZIdvbt2yEUDiId6r27tlgAPLmqlyYMEhyiM212_Sth3T3Q" // notsecret
// https://jwt.io/#token=eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiIsImtpZCI6Ijk4ZDU3YmUwNWI3ZjUzNWIwMzYyYjg2MDJhNTJlNGYxIn0.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWwiLCJtY3Atc2VydmVyIl0sImV4cCI6MSwiaWF0IjowLCJpc3MiOiJodHRwczovL2t1YmVybmV0ZXMuZGVmYXVsdC5zdmMuY2x1c3Rlci5sb2NhbCIsImp0aSI6Ijk5MjIyZDU2LTM0MGUtNGViNi04NTg4LTI2MTQxMWYzNWQyNiIsImt1YmVybmV0ZXMuaW8iOnsibmFtZXNwYWNlIjoiZGVmYXVsdCIsInNlcnZpY2VhY2NvdW50Ijp7Im5hbWUiOiJkZWZhdWx0IiwidWlkIjoiZWFjYjZhZDItODBiNy00MTc5LTg0M2QtOTJlYjFlNmJiYmE2In19LCJuYmYiOjAsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpkZWZhdWx0OmRlZmF1bHQifQ.iVrxt6glbY3Qe_mEtK-lYpx4Z3VC1a7zgGRSmfu29pMmnKhlTk56y0Wx45DQ4PSYCTwC6CJnGGZNbJyr4JS8PQ // notsecret
tokenBasicExpired = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiIsImtpZCI6Ijk4ZDU3YmUwNWI3ZjUzNWIwMzYyYjg2MDJhNTJlNGYxIn0.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWwiLCJtY3Atc2VydmVyIl0sImV4cCI6MSwiaWF0IjowLCJpc3MiOiJodHRwczovL2t1YmVybmV0ZXMuZGVmYXVsdC5zdmMuY2x1c3Rlci5sb2NhbCIsImp0aSI6Ijk5MjIyZDU2LTM0MGUtNGViNi04NTg4LTI2MTQxMWYzNWQyNiIsImt1YmVybmV0ZXMuaW8iOnsibmFtZXNwYWNlIjoiZGVmYXVsdCIsInNlcnZpY2VhY2NvdW50Ijp7Im5hbWUiOiJkZWZhdWx0IiwidWlkIjoiZWFjYjZhZDItODBiNy00MTc5LTg0M2QtOTJlYjFlNmJiYmE2In19LCJuYmYiOjAsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpkZWZhdWx0OmRlZmF1bHQifQ.iVrxt6glbY3Qe_mEtK-lYpx4Z3VC1a7zgGRSmfu29pMmnKhlTk56y0Wx45DQ4PSYCTwC6CJnGGZNbJyr4JS8PQ" // notsecret
// https://jwt.io/#token=eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiIsImtpZCI6Ijk4ZDU3YmUwNWI3ZjUzNWIwMzYyYjg2MDJhNTJlNGYxIn0.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWwiLCJtY3Atc2VydmVyIl0sImV4cCI6MjUzNDAyMjk3MTk5LCJpYXQiOjAsImlzcyI6Imh0dHBzOi8va3ViZXJuZXRlcy5kZWZhdWx0LnN2Yy5jbHVzdGVyLmxvY2FsIiwianRpIjoiOTkyMjJkNTYtMzQwZS00ZWI2LTg1ODgtMjYxNDExZjM1ZDI2Iiwia3ViZXJuZXRlcy5pbyI6eyJuYW1lc3BhY2UiOiJkZWZhdWx0Iiwic2VydmljZWFjY291bnQiOnsibmFtZSI6ImRlZmF1bHQiLCJ1aWQiOiJlYWNiNmFkMi04MGI3LTQxNzktODQzZC05MmViMWU2YmJiYTYifX0sIm5iZiI6MCwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50OmRlZmF1bHQ6ZGVmYXVsdCIsInNjb3BlIjoicmVhZCB3cml0ZSJ9.m5mFXp0TDSvgLevQ76nX65N14w1RxTClMaannLLOuBIUEsmXhMYZjGtf5mWMcxVOkSh65rLFiKugaMXgv877Mg // notsecret
tokenMultipleAudienceNotExpired = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiIsImtpZCI6Ijk4ZDU3YmUwNWI3ZjUzNWIwMzYyYjg2MDJhNTJlNGYxIn0.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWwiLCJtY3Atc2VydmVyIl0sImV4cCI6MjUzNDAyMjk3MTk5LCJpYXQiOjAsImlzcyI6Imh0dHBzOi8va3ViZXJuZXRlcy5kZWZhdWx0LnN2Yy5jbHVzdGVyLmxvY2FsIiwianRpIjoiOTkyMjJkNTYtMzQwZS00ZWI2LTg1ODgtMjYxNDExZjM1ZDI2Iiwia3ViZXJuZXRlcy5pbyI6eyJuYW1lc3BhY2UiOiJkZWZhdWx0Iiwic2VydmljZWFjY291bnQiOnsibmFtZSI6ImRlZmF1bHQiLCJ1aWQiOiJlYWNiNmFkMi04MGI3LTQxNzktODQzZC05MmViMWU2YmJiYTYifX0sIm5iZiI6MCwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50OmRlZmF1bHQ6ZGVmYXVsdCIsInNjb3BlIjoicmVhZCB3cml0ZSJ9.m5mFXp0TDSvgLevQ76nX65N14w1RxTClMaannLLOuBIUEsmXhMYZjGtf5mWMcxVOkSh65rLFiKugaMXgv877Mg" // notsecret
)
// TestParseJWTClaimsPayloadValid checks that ParseJWTClaims decodes issuer,
// audience, expiration and scope from well-formed tokens, including an
// already expired one.
func TestParseJWTClaimsPayloadValid(t *testing.T) {
	basicClaims, err := ParseJWTClaims(tokenBasicNotExpired)
	t.Run("Is parseable", func(t *testing.T) {
		if err != nil {
			t.Fatalf("expected no error, got %v", err)
		}
		if basicClaims == nil {
			t.Fatal("expected claims, got nil")
		}
	})
	if err != nil || basicClaims == nil {
		// Already reported by the subtest above; the remaining subtests would
		// only dereference nil claims.
		return
	}
	t.Run("Parses issuer", func(t *testing.T) {
		if basicClaims.Issuer != "https://kubernetes.default.svc.cluster.local" {
			t.Errorf("expected issuer 'https://kubernetes.default.svc.cluster.local', got %s", basicClaims.Issuer)
		}
	})
	t.Run("Parses audience", func(t *testing.T) {
		expectedAudiences := []string{"https://kubernetes.default.svc.cluster.local", "mcp-server"}
		for _, expected := range expectedAudiences {
			if !basicClaims.Audience.Contains(expected) {
				t.Errorf("expected audience to contain %s", expected)
			}
		}
	})
	t.Run("Parses expiration", func(t *testing.T) {
		if basicClaims.Expiry == nil {
			t.Fatal("expected expiration 253402297199, got nil")
		}
		if *basicClaims.Expiry != jwt.NumericDate(253402297199) {
			// Print the value, not the pointer.
			t.Errorf("expected expiration 253402297199, got %d", int64(*basicClaims.Expiry))
		}
	})
	t.Run("Parses scope", func(t *testing.T) {
		scopeClaims, err := ParseJWTClaims(tokenMultipleAudienceNotExpired)
		if err != nil {
			t.Fatalf("expected no error, got %v", err)
		}
		if scopeClaims == nil {
			t.Fatal("expected claims, got nil")
		}

		scopes := scopeClaims.GetScopes()

		expectedScopes := []string{"read", "write"}
		if len(scopes) != len(expectedScopes) {
			// Fatal: the loop below indexes scopes by position.
			t.Fatalf("expected %d scopes, got %d", len(expectedScopes), len(scopes))
		}
		for i, expectedScope := range expectedScopes {
			if scopes[i] != expectedScope {
				t.Errorf("expected scope[%d] to be '%s', got '%s'", i, expectedScope, scopes[i])
			}
		}
	})
	t.Run("Parses expired token", func(t *testing.T) {
		expiredClaims, err := ParseJWTClaims(tokenBasicExpired)
		if err != nil {
			t.Fatalf("expected no error, got %v", err)
		}
		if expiredClaims == nil || expiredClaims.Expiry == nil {
			t.Fatal("expected expiration 1, got nil")
		}

		if *expiredClaims.Expiry != jwt.NumericDate(1) {
			// Report the expired token's own expiry value.
			t.Errorf("expected expiration 1, got %d", int64(*expiredClaims.Expiry))
		}
	})
}
func TestParseJWTClaimsPayloadInvalid(t *testing.T) {
t.Run("invalid token segments", func(t *testing.T) {
invalidToken := "header.payload.signature.extra"
_, err := ParseJWTClaims(invalidToken)
if err == nil {
t.Fatal("expected error for invalid token segments, got nil")
}
if !strings.Contains(err.Error(), "compact JWS format must have three parts") {
t.Errorf("expected invalid token segments error message, got %v", err)
}
})
t.Run("invalid base64 payload", func(t *testing.T) {
invalidPayload := strings.ReplaceAll(tokenBasicNotExpired, ".", ".invalid")
_, err := ParseJWTClaims(invalidPayload)
if err == nil {
t.Fatal("expected error for invalid base64, got nil")
}
if !strings.Contains(err.Error(), "illegal base64 data") {
t.Errorf("expected decode error message, got %v", err)
}
})
}
// TestJWTTokenValidateOffline exercises JWTClaims.ValidateOffline for expiry
// and audience checks.
func TestJWTTokenValidateOffline(t *testing.T) {
	t.Run("expired token returns error", func(t *testing.T) {
		expired, parseErr := ParseJWTClaims(tokenBasicExpired)
		if parseErr != nil {
			t.Fatalf("expected no error for expired token parsing, got %v", parseErr)
		}

		validateErr := expired.ValidateOffline("mcp-server")
		if validateErr == nil {
			t.Fatalf("expected error for expired token, got nil")
		}
		if msg := validateErr.Error(); !strings.Contains(msg, "token is expired (exp)") {
			t.Errorf("expected expiration error message, got %v", validateErr)
		}
	})

	t.Run("multiple audiences with correct one", func(t *testing.T) {
		multi, parseErr := ParseJWTClaims(tokenMultipleAudienceNotExpired)
		if parseErr != nil {
			t.Fatalf("expected no error for multiple audience token parsing, got %v", parseErr)
		}
		if multi == nil {
			t.Fatalf("expected claims to be returned, got nil")
		}

		if validateErr := multi.ValidateOffline("mcp-server"); validateErr != nil {
			t.Fatalf("expected no error for valid audience, got %v", validateErr)
		}
	})

	t.Run("multiple audiences with mismatch returns error", func(t *testing.T) {
		multi, parseErr := ParseJWTClaims(tokenMultipleAudienceNotExpired)
		if parseErr != nil {
			t.Fatalf("expected no error for multiple audience token parsing, got %v", parseErr)
		}
		if multi == nil {
			t.Fatalf("expected claims to be returned, got nil")
		}

		validateErr := multi.ValidateOffline("missing-audience")
		if validateErr == nil {
			t.Fatalf("expected error for token with wrong audience, got nil")
		}
		if msg := validateErr.Error(); !strings.Contains(msg, "invalid audience claim (aud)") {
			t.Errorf("expected audience mismatch error, got %v", validateErr)
		}
	})
}
// TestJWTClaimsGetScopes checks how JWTClaims.GetScopes splits the scope
// claim: absent, single, multiple and whitespace-padded values.
//
// Length mismatches are fatal so that the element-wise comparisons never index
// past the end of the returned slice.
func TestJWTClaimsGetScopes(t *testing.T) {
	t.Run("no scopes", func(t *testing.T) {
		claims, err := ParseJWTClaims(tokenBasicExpired)
		if err != nil {
			t.Fatalf("expected no error for parsing token, got %v", err)
		}

		if scopes := claims.GetScopes(); len(scopes) != 0 {
			t.Errorf("expected no scopes, got %d", len(scopes))
		}
	})
	t.Run("single scope", func(t *testing.T) {
		claims := &JWTClaims{
			Scope: "read",
		}
		scopes := claims.GetScopes()
		expected := []string{"read"}

		if len(scopes) != 1 {
			t.Fatalf("expected 1 scope, got %d", len(scopes))
		}
		if scopes[0] != expected[0] {
			t.Errorf("expected scope 'read', got '%s'", scopes[0])
		}
	})

	t.Run("multiple scopes", func(t *testing.T) {
		claims := &JWTClaims{
			Scope: "read write admin",
		}
		scopes := claims.GetScopes()
		expected := []string{"read", "write", "admin"}

		if len(scopes) != 3 {
			t.Fatalf("expected 3 scopes, got %d", len(scopes))
		}

		for i, expectedScope := range expected {
			if scopes[i] != expectedScope {
				t.Errorf("expected scope[%d] to be '%s', got '%s'", i, expectedScope, scopes[i])
			}
		}
	})

	t.Run("scopes with extra whitespace", func(t *testing.T) {
		claims := &JWTClaims{
			Scope: " read write admin ",
		}
		scopes := claims.GetScopes()
		expected := []string{"read", "write", "admin"}

		if len(scopes) != 3 {
			t.Fatalf("expected 3 scopes, got %d", len(scopes))
		}

		for i, expectedScope := range expected {
			if scopes[i] != expectedScope {
				t.Errorf("expected scope[%d] to be '%s', got '%s'", i, expectedScope, scopes[i])
			}
		}
	})
}
```
--------------------------------------------------------------------------------
/pkg/mcp/tool_mutator_test.go:
--------------------------------------------------------------------------------
```go
package mcp
import (
"testing"
"github.com/containers/kubernetes-mcp-server/pkg/api"
"github.com/google/jsonschema-go/jsonschema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"k8s.io/utils/ptr"
)
// createTestTool returns a ServerTool called name whose input schema is an
// "object" schema with an empty, non-nil property map.
func createTestTool(name string) api.ServerTool {
	schema := &jsonschema.Schema{
		Type:       "object",
		Properties: map[string]*jsonschema.Schema{},
	}
	tool := api.Tool{
		Name:        name,
		Description: "A test tool",
		InputSchema: schema,
	}
	return api.ServerTool{Tool: tool}
}
// createTestToolWithNilSchema returns a ServerTool called name that has no
// input schema at all (InputSchema is nil).
func createTestToolWithNilSchema(name string) api.ServerTool {
	var tool api.ServerTool
	tool.Tool.Name = name
	tool.Tool.Description = "A test tool"
	// InputSchema is intentionally left at its zero value, nil.
	return tool
}
// createTestToolWithNilProperties returns a ServerTool called name whose
// "object" input schema exists but has a nil Properties map.
func createTestToolWithNilProperties(name string) api.ServerTool {
	tool := api.ServerTool{}
	tool.Tool = api.Tool{
		Name:        name,
		Description: "A test tool",
		// Properties is deliberately omitted so that it stays nil.
		InputSchema: &jsonschema.Schema{Type: "object"},
	}
	return tool
}
// createTestToolWithExistingProperties returns a ServerTool called name whose
// input schema already declares a single string property, "existing-prop".
func createTestToolWithExistingProperties(name string) api.ServerTool {
	props := map[string]*jsonschema.Schema{
		"existing-prop": {Type: "string"},
	}
	return api.ServerTool{
		Tool: api.Tool{
			Name:        name,
			Description: "A test tool",
			InputSchema: &jsonschema.Schema{Type: "object", Properties: props},
		},
	}
}
// TestWithClusterParameter runs the mutator returned by WithTargetParameter
// over tools built by the createTestTool* factories and checks whether the
// target property was added to the resulting input schema.
//
// A case that leaves targetParameterName empty uses "cluster" as the
// parameter name. All lookups use the effective parameter name, so a case
// that overrides it is checked against the property it actually asked for.
func TestWithClusterParameter(t *testing.T) {
	tests := []struct {
		name                string
		defaultCluster      string
		targetParameterName string
		clusters            []string
		toolName            string
		toolFactory         func(string) api.ServerTool
		expectCluster       bool
		expectEnum          bool
		enumCount           int
	}{
		{
			name:           "adds cluster parameter when multiple clusters provided",
			defaultCluster: "default-cluster",
			clusters:       []string{"cluster1", "cluster2", "cluster3"},
			toolName:       "test-tool",
			toolFactory:    createTestTool,
			expectCluster:  true,
			expectEnum:     true,
			enumCount:      3,
		},
		{
			name:           "does not add cluster parameter when single cluster provided",
			defaultCluster: "default-cluster",
			clusters:       []string{"single-cluster"},
			toolName:       "test-tool",
			toolFactory:    createTestTool,
			expectCluster:  false,
			expectEnum:     false,
			enumCount:      0,
		},
		{
			name:           "creates InputSchema when nil",
			defaultCluster: "default-cluster",
			clusters:       []string{"cluster1", "cluster2"},
			toolName:       "test-tool",
			toolFactory:    createTestToolWithNilSchema,
			expectCluster:  true,
			expectEnum:     true,
			enumCount:      2,
		},
		{
			name:           "creates Properties map when nil",
			defaultCluster: "default-cluster",
			clusters:       []string{"cluster1", "cluster2"},
			toolName:       "test-tool",
			toolFactory:    createTestToolWithNilProperties,
			expectCluster:  true,
			expectEnum:     true,
			enumCount:      2,
		},
		{
			name:           "preserves existing properties",
			defaultCluster: "default-cluster",
			clusters:       []string{"cluster1", "cluster2"},
			toolName:       "test-tool",
			toolFactory:    createTestToolWithExistingProperties,
			expectCluster:  true,
			expectEnum:     true,
			enumCount:      2,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			paramName := tt.targetParameterName
			if paramName == "" {
				paramName = "cluster"
			}
			mutator := WithTargetParameter(tt.defaultCluster, paramName, tt.clusters)
			result := mutator(tt.toolFactory(tt.toolName))

			if !tt.expectCluster {
				// The schema must still exist, but without the target property.
				require.NotNil(t, result.Tool.InputSchema)
				require.NotNil(t, result.Tool.InputSchema.Properties)
				_, exists := result.Tool.InputSchema.Properties[paramName]
				assert.False(t, exists, "cluster property should not exist")
				return
			}

			// Common assertions for cases where the target parameter should be added.
			require.NotNil(t, result.Tool.InputSchema)
			assert.Equal(t, "object", result.Tool.InputSchema.Type)
			require.NotNil(t, result.Tool.InputSchema.Properties)
			clusterProperty, exists := result.Tool.InputSchema.Properties[paramName]
			// require (not assert): the field accesses below would dereference a
			// nil pointer and panic if the property were missing.
			require.True(t, exists, "cluster property should exist")
			require.NotNil(t, clusterProperty)
			assert.Equal(t, "string", clusterProperty.Type)
			assert.Contains(t, clusterProperty.Description, tt.defaultCluster)
			if tt.expectEnum {
				assert.NotNil(t, clusterProperty.Enum)
				assert.Len(t, clusterProperty.Enum, tt.enumCount)
				for _, cluster := range tt.clusters {
					assert.Contains(t, clusterProperty.Enum, cluster)
				}
			}
		})
	}
}
// TestCreateClusterProperty checks the schema property built by
// createTargetProperty: its type, the default mentioned in its description,
// and whether an enum of targets is attached depending on how the number of
// targets compares to maxTargetsInEnum.
func TestCreateClusterProperty(t *testing.T) {
	cases := []struct {
		name           string
		defaultCluster string
		targetName     string
		clusters       []string
		expectEnum     bool
		expectedCount  int
	}{
		{
			name:           "creates property with enum when clusters <= maxClustersInEnum",
			defaultCluster: "default",
			targetName:     "cluster",
			clusters:       []string{"cluster1", "cluster2", "cluster3"},
			expectEnum:     true,
			expectedCount:  3,
		},
		{
			name:           "creates property without enum when clusters > maxClustersInEnum",
			defaultCluster: "default",
			targetName:     "cluster",
			clusters:       make([]string, maxTargetsInEnum+5), // five more than the enum limit
			expectEnum:     false,
			expectedCount:  0,
		},
		{
			name:           "creates property with exact maxClustersInEnum clusters",
			defaultCluster: "default",
			targetName:     "cluster",
			clusters:       make([]string, maxTargetsInEnum),
			expectEnum:     true,
			expectedCount:  maxTargetsInEnum,
		},
		{
			name:           "handles single cluster",
			defaultCluster: "default",
			targetName:     "cluster",
			clusters:       []string{"single-cluster"},
			expectEnum:     true,
			expectedCount:  1,
		},
		{
			name:           "handles empty clusters list",
			defaultCluster: "default",
			targetName:     "cluster",
			clusters:       []string{},
			expectEnum:     true,
			expectedCount:  0,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Slices created with make() hold empty strings; give them
			// distinct names (clusterA, clusterB, ...) before use.
			needsNames := len(tc.clusters) > 3 && tc.clusters[0] == ""
			if needsNames {
				for idx := range tc.clusters {
					tc.clusters[idx] = "cluster" + string(rune('A'+idx))
				}
			}

			got := createTargetProperty(tc.defaultCluster, tc.targetName, tc.clusters)

			assert.Equal(t, "string", got.Type)
			assert.Contains(t, got.Description, tc.defaultCluster)
			assert.Contains(t, got.Description, "Defaults to "+tc.defaultCluster+" if not set")

			if !tc.expectEnum {
				assert.Nil(t, got.Enum, "enum should not be created for too many clusters")
				return
			}

			assert.NotNil(t, got.Enum, "enum should be created")
			assert.Equal(t, tc.expectedCount, len(got.Enum))
			// Only check specific values for small, predefined lists.
			if tc.expectedCount == 0 || tc.expectedCount > 3 {
				return
			}
			for _, name := range tc.clusters {
				assert.Contains(t, got.Enum, name)
			}
		})
	}
}
// TestToolMutatorType confirms that a plain function literal can be used as a
// ToolMutator and that calling it yields the tool the function returns.
func TestToolMutatorType(t *testing.T) {
	t.Run("ToolMutator type can be used as function", func(t *testing.T) {
		prefixName := ToolMutator(func(in api.ServerTool) api.ServerTool {
			in.Tool.Name = "modified-" + in.Tool.Name
			return in
		})
		mutated := prefixName(createTestTool("original"))
		assert.Equal(t, "modified-original", mutated.Tool.Name)
	})
}
// TestMaxClustersInEnumConstant pins the value of maxTargetsInEnum so that a
// change to the enum size limit has to be made deliberately.
func TestMaxClustersInEnumConstant(t *testing.T) {
	const want = 5
	t.Run("maxClustersInEnum has expected value", func(t *testing.T) {
		assert.Equal(t, want, maxTargetsInEnum, "maxClustersInEnum should be 5")
	})
}
// TargetParameterToolMutatorSuite groups the testify suite tests for the
// mutator returned by WithTargetParameter.
type TargetParameterToolMutatorSuite struct{ suite.Suite }
// TestClusterAwareTool checks that, with three targets configured, the
// mutator adds a "cluster" property carrying the expected description and an
// enum listing every target.
func (s *TargetParameterToolMutatorSuite) TestClusterAwareTool() {
	tm := WithTargetParameter("default-cluster", "cluster", []string{"cluster-1", "cluster-2", "cluster-3"})
	// Tools are cluster-aware by default.
	// Assert on the tool returned by the mutator rather than on the input, so
	// the test does not depend on the mutator modifying shared state in place.
	tool := tm(createTestTool("cluster-aware-tool"))
	s.Require().NotNil(tool.Tool.InputSchema)
	s.Require().NotNil(tool.Tool.InputSchema.Properties)
	s.Run("adds cluster parameter", func() {
		s.NotNil(tool.Tool.InputSchema.Properties["cluster"], "Expected cluster property to be added")
	})
	s.Run("adds correct description", func() {
		property := tool.Tool.InputSchema.Properties["cluster"]
		// Guard against a nil dereference when the property is missing.
		s.Require().NotNil(property)
		desc := property.Description
		s.Contains(desc, "Optional parameter selecting which cluster to run the tool in", "Expected description to mention cluster selection")
		s.Contains(desc, "Defaults to default-cluster if not set", "Expected description to mention default cluster")
	})
	s.Run("adds enum with clusters", func() {
		s.Require().NotNil(tool.Tool.InputSchema.Properties["cluster"])
		enum := tool.Tool.InputSchema.Properties["cluster"].Enum
		s.NotNil(enum, "Expected enum to be set")
		s.Len(enum, 3, "Expected enum to have 3 entries")
		s.Contains(enum, "cluster-1", "Expected enum to contain cluster-1")
		s.Contains(enum, "cluster-2", "Expected enum to contain cluster-2")
		s.Contains(enum, "cluster-3", "Expected enum to contain cluster-3")
	})
}
// TestClusterAwareToolSingleCluster checks that no "cluster" property is
// added when only one target is configured.
func (s *TargetParameterToolMutatorSuite) TestClusterAwareToolSingleCluster() {
	tm := WithTargetParameter("default", "cluster", []string{"only-cluster"})
	// Tools are cluster-aware by default.
	// The absence check must run on the mutator's result: checking only the
	// input tool would pass trivially if the mutator worked on a copy.
	tool := tm(createTestTool("cluster-aware-tool-single-cluster"))
	s.Require().NotNil(tool.Tool.InputSchema)
	s.Run("does not add cluster parameter for single cluster", func() {
		s.Nil(tool.Tool.InputSchema.Properties["cluster"], "Expected cluster property to not be added for single cluster")
	})
}
// TestClusterAwareToolMultipleClusters checks that, with six targets
// configured, the "cluster" property is still added but carries no enum.
func (s *TargetParameterToolMutatorSuite) TestClusterAwareToolMultipleClusters() {
	tm := WithTargetParameter("default", "cluster", []string{"cluster-1", "cluster-2", "cluster-3", "cluster-4", "cluster-5", "cluster-6"})
	// Tools are cluster-aware by default.
	// Assert on the tool returned by the mutator rather than on the input.
	tool := tm(createTestTool("cluster-aware-tool-multiple-clusters"))
	s.Require().NotNil(tool.Tool.InputSchema)
	s.Run("adds cluster parameter", func() {
		s.NotNil(tool.Tool.InputSchema.Properties["cluster"], "Expected cluster property to be added")
	})
	s.Run("does not add enum when list of clusters is > 5", func() {
		s.Require().NotNil(tool.Tool.InputSchema.Properties["cluster"])
		enum := tool.Tool.InputSchema.Properties["cluster"].Enum
		s.Nil(enum, "Expected enum to not be set for too many clusters")
	})
}
// TestNonClusterAwareTool checks that a tool whose ClusterAware flag is
// explicitly false does not receive the "cluster" property, even though
// several targets are configured.
func (s *TargetParameterToolMutatorSuite) TestNonClusterAwareTool() {
	tm := WithTargetParameter("default", "cluster", []string{"cluster-1", "cluster-2"})
	tool := createTestTool("non-cluster-aware-tool")
	tool.ClusterAware = ptr.To(false)
	// Inspect the mutator's result: checking only the input tool would pass
	// trivially if the mutator worked on a copy.
	result := tm(tool)
	s.Require().NotNil(result.Tool.InputSchema)
	s.Run("does not add cluster parameter", func() {
		s.Nil(result.Tool.InputSchema.Properties["cluster"], "Expected cluster property to not be added")
	})
}
// TestTargetParameterToolMutator is the go test entry point that runs every
// test method of TargetParameterToolMutatorSuite.
func TestTargetParameterToolMutator(t *testing.T) {
	suite.Run(t, &TargetParameterToolMutatorSuite{})
}
```