From 5a8d77a2ae29f151d493c7f2a6eddce6d48ea7c7 Mon Sep 17 00:00:00 2001 From: Henry Wu Date: Wed, 27 Nov 2024 00:47:44 +0000 Subject: [PATCH 1/2] Add statusz endpoint for kube-controller-manager --- .../app/controllermanager.go | 16 +- test/integration/serving/serving_test.go | 140 ++++++++++++++++++ 2 files changed, 152 insertions(+), 4 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index fc3f899d3a7..2b9f2535057 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -64,6 +64,8 @@ import ( "k8s.io/component-base/term" utilversion "k8s.io/component-base/version" "k8s.io/component-base/version/verflag" + zpagesfeatures "k8s.io/component-base/zpages/features" + "k8s.io/component-base/zpages/statusz" genericcontrollermanager "k8s.io/controller-manager/app" "k8s.io/controller-manager/controller" "k8s.io/controller-manager/pkg/clientbuilder" @@ -91,6 +93,8 @@ const ( ControllerStartJitter = 1.0 // ConfigzName is the name used for register kube-controller manager /configz, same with GroupName. ConfigzName = "kubecontrollermanager.config.k8s.io" + // kubeControllerManager defines variable used internally when referring to cloud-controller-manager component + kubeControllerManager = "kube-controller-manager" ) // NewControllerManagerCommand creates a *cobra.Command object with default parameters @@ -105,7 +109,7 @@ func NewControllerManagerCommand() *cobra.Command { } cmd := &cobra.Command{ - Use: "kube-controller-manager", + Use: kubeControllerManager, Long: `The Kubernetes controller manager is a daemon that embeds the core control loops shipped with Kubernetes. In applications of robotics and automation, a control loop is a non-terminating loop that regulates the state of @@ -213,6 +217,10 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler) slis.SLIMetricsWithReset{}.Install(unsecuredMux) + if utilfeature.DefaultFeatureGate.Enabled(zpagesfeatures.ComponentStatusz) { + statusz.Install(unsecuredMux, kubeControllerManager, statusz.NewRegistry()) + } + handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) // TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { @@ -267,7 +275,7 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { logger.Info("starting leader migration") leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration, - "kube-controller-manager") + kubeControllerManager) // startSATokenControllerInit is the original InitFunc. startSATokenControllerInit := saTokenControllerDescriptor.GetInitFunc() @@ -295,7 +303,7 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { c.Client, "kube-system", id, - "kube-controller-manager", + kubeControllerManager, binaryVersion.FinalizeVersion(), emulationVersion.FinalizeVersion(), coordinationv1.OldestEmulationVersion, @@ -634,7 +642,7 @@ func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, roo RESTMapper: restMapper, InformersStarted: make(chan struct{}), ResyncPeriod: ResyncPeriod(s), - ControllerManagerMetrics: controllersmetrics.NewControllerManagerMetrics("kube-controller-manager"), + ControllerManagerMetrics: controllersmetrics.NewControllerManagerMetrics(kubeControllerManager), } if controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector && diff --git a/test/integration/serving/serving_test.go b/test/integration/serving/serving_test.go index b6aff38b7bf..6bb0df3566c 100644 --- a/test/integration/serving/serving_test.go +++ b/test/integration/serving/serving_test.go @@ -30,9 +30,12 @@ import ( "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/options" + utilfeature "k8s.io/apiserver/pkg/util/feature" cloudprovider "k8s.io/cloud-provider" cloudctrlmgrtesting "k8s.io/cloud-provider/app/testing" "k8s.io/cloud-provider/fake" + featuregatetesting "k8s.io/component-base/featuregate/testing" + zpagesfeatures "k8s.io/component-base/zpages/features" "k8s.io/klog/v2/ktesting" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" kubectrlmgrtesting "k8s.io/kubernetes/cmd/kube-controller-manager/app/testing" @@ -289,3 +292,140 @@ func fakeCloudProviderFactory(io.Reader) (cloudprovider.Interface, error) { DisableRoutes: true, // disable routes for server tests, otherwise --cluster-cidr is required }, nil } + +func TestKubeControllerManagerServingStatusz(t *testing.T) { + + // authenticate to apiserver via bearer token + token := "flwqkenfjasasdfmwerasd" // Fake token for testing. + tokenFile, err := os.CreateTemp("", "kubeconfig") + if err != nil { + t.Fatal(err) + } + if _, err = tokenFile.WriteString(fmt.Sprintf(` +%s,system:kube-controller-manager,system:kube-controller-manager,"" +`, token)); err != nil { + t.Fatal(err) + } + if err = tokenFile.Close(); err != nil { + t.Fatal(err) + } + + // start apiserver + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{ + "--token-auth-file", tokenFile.Name(), + "--authorization-mode", "RBAC", + }, framework.SharedEtcd()) + defer server.TearDownFn() + + // create kubeconfig for the apiserver + apiserverConfig, err := os.CreateTemp("", "kubeconfig") + if err != nil { + t.Fatal(err) + } + if _, err = apiserverConfig.WriteString(fmt.Sprintf(` +apiVersion: v1 +kind: Config +clusters: +- cluster: + server: %s + certificate-authority: %s + name: integration +contexts: +- context: + cluster: integration + user: controller-manager + name: default-context +current-context: default-context +users: +- name: controller-manager + user: + token: %s +`, server.ClientConfig.Host, server.ServerOpts.SecureServing.ServerCert.CertKey.CertFile, token)); err != nil { + t.Fatal(err) + } + if err = apiserverConfig.Close(); err != nil { + t.Fatal(err) + } + + tests := []struct { + name string + flags []string + path string + anonymous bool // to use the token or not + wantErr bool + wantSecureCode *int + }{ + {"serving /statusz", []string{ + "--authentication-skip-lookup", // to survive inaccessible extensions-apiserver-authentication configmap + "--authentication-kubeconfig", apiserverConfig.Name(), + "--authorization-kubeconfig", apiserverConfig.Name(), + "--authorization-always-allow-paths", "/statusz", + "--kubeconfig", apiserverConfig.Name(), + "--leader-elect=false", + }, "/statusz", false, false, intPtr(http.StatusOK)}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, zpagesfeatures.ComponentStatusz, true) + _, ctx := ktesting.NewTestContext(t) + secureOptions, secureInfo, tearDownFn, err := kubeControllerManagerTester{}.StartTestServer(ctx, append(append([]string{}, tt.flags...), []string{}...)) + if tearDownFn != nil { + defer tearDownFn() + } + if (err != nil) != tt.wantErr { + t.Fatalf("StartTestServer() error = %v, wantErr %v", err, tt.wantErr) + } + if err != nil { + return + } + + if want, got := tt.wantSecureCode != nil, secureInfo != nil; want != got { + t.Errorf("SecureServing enabled: expected=%v got=%v", want, got) + } else if want { + url := fmt.Sprintf("https://%s%s", secureInfo.Listener.Addr().String(), tt.path) + url = strings.ReplaceAll(url, "[::]", "127.0.0.1") // switch to IPv4 because the self-signed cert does not support [::] + + // read self-signed server cert disk + pool := x509.NewCertPool() + serverCertPath := path.Join(secureOptions.ServerCert.CertDirectory, secureOptions.ServerCert.PairName+".crt") + serverCert, err := os.ReadFile(serverCertPath) + if err != nil { + t.Fatalf("Failed to read component server cert %q: %v", serverCertPath, err) + } + pool.AppendCertsFromPEM(serverCert) + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: pool, + }, + } + + client := &http.Client{Transport: tr} + req, err := http.NewRequest(http.MethodGet, url, nil) + req.Header.Set("Accept", "text/plain") + if err != nil { + t.Fatal(err) + } + if !tt.anonymous { + req.Header.Add("Authorization", fmt.Sprintf("Token %s", token)) + } + r, err := client.Do(req) + if err != nil { + t.Fatalf("failed to GET %s from component: %v", tt.path, err) + } + + if _, err = io.ReadAll(r.Body); err != nil { + t.Fatalf("failed to read response body: %v", err) + } + defer func() { + if err := r.Body.Close(); err != nil { + t.Fatalf("Error closing response body: %v", err) + } + }() + + if got, expected := r.StatusCode, *tt.wantSecureCode; got != expected { + t.Fatalf("expected http %d at %s of component, got: %d", expected, tt.path, got) + } + } + }) + } +} From 8bd4e1bab2248e533ceabf131a4b1f3283af3d77 Mon Sep 17 00:00:00 2001 From: "Henry(Qishan) Wu" Date: Wed, 5 Feb 2025 09:25:49 -0800 Subject: [PATCH 2/2] Update test/integration/serving/serving_test.go Co-authored-by: Antonio Ojea --- test/integration/serving/serving_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/test/integration/serving/serving_test.go b/test/integration/serving/serving_test.go index 6bb0df3566c..d71df638ca3 100644 --- a/test/integration/serving/serving_test.go +++ b/test/integration/serving/serving_test.go @@ -22,6 +22,7 @@ import ( "crypto/x509" "fmt" "io" + "net" "net/http" "os" "path" @@ -382,8 +383,13 @@ users: if want, got := tt.wantSecureCode != nil, secureInfo != nil; want != got { t.Errorf("SecureServing enabled: expected=%v got=%v", want, got) } else if want { - url := fmt.Sprintf("https://%s%s", secureInfo.Listener.Addr().String(), tt.path) - url = strings.ReplaceAll(url, "[::]", "127.0.0.1") // switch to IPv4 because the self-signed cert does not support [::] + // only interested on the port, because we are using always localhost + _, port, err := net.SplitHostPort(secureInfo.Listener.Addr().String()) + if err != nil { + t.Fatalf("could not get host and port from %s : %v", secureInfo.Listener.Addr().String(), err) + } + // use IPv4 because the self-signed cert does not support [::] + url := fmt.Sprintf("https://127.0.0.1:%s%s", port, tt.path) // read self-signed server cert disk pool := x509.NewCertPool()