From 5e9076da93c8bd8d51dd9c6d2d4f87d5a269e586 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 17 May 2021 09:20:11 +0200 Subject: [PATCH 1/5] e2e: grab controller and scheduler metrics via port forwarding The previous approach with grabbing via a nginx proxy had some drawbacks: - it did not work when the pods only listened on localhost (as configured by kubeadm) and the proxy got deployed on a different node - starting the proxy raced with starting the pods, causing sporadic test failures because the proxy was not set up properly unless it saw all pods when starting the e2e.test - the proxy was always started, whether it is needed or not - the proxy was left running after a test and then the next test run triggered potentially confusing messages when it failed to create objects for the proxy The new approach is similar to "kubectl port-forward" + "kubectl get --raw". It uses the port forwarding feature to establish a TCP connection via a custom dialer, then lets client-go handle TLS and credentials. Somehow verifying the server certificate did not work. As this shouldn't be a big concern for E2E testing, certificate checking gets disabled on the client side instead of investigating this further. --- test/e2e/apimachinery/garbage_collector.go | 2 +- test/e2e/e2e.go | 6 - test/e2e/framework/framework.go | 4 +- test/e2e/framework/metrics/kubelet_metrics.go | 2 +- test/e2e/framework/metrics/metrics_grabber.go | 64 +++++- test/e2e/framework/metrics/metrics_proxy.go | 214 ----------------- test/e2e/framework/pod/dial.go | 215 ++++++++++++++++++ .../monitoring/metrics_grabber.go | 2 +- test/e2e/storage/csi_mock_volume.go | 2 +- test/e2e/storage/testsuites/base.go | 19 +- test/e2e/storage/testsuites/multivolume.go | 2 +- test/e2e/storage/testsuites/provisioning.go | 2 +- test/e2e/storage/testsuites/subpath.go | 2 +- test/e2e/storage/testsuites/topology.go | 2 +- test/e2e/storage/testsuites/volume_expand.go | 2 +- test/e2e/storage/testsuites/volume_io.go | 2 +- test/e2e/storage/testsuites/volume_stress.go | 2 +- test/e2e/storage/testsuites/volumemode.go | 2 +- test/e2e/storage/testsuites/volumes.go | 2 +- test/e2e/storage/volume_metrics.go | 2 +- test/e2e/suites.go | 11 +- 21 files changed, 310 insertions(+), 251 deletions(-) delete mode 100644 test/e2e/framework/metrics/metrics_proxy.go create mode 100644 test/e2e/framework/pod/dial.go diff --git a/test/e2e/apimachinery/garbage_collector.go b/test/e2e/apimachinery/garbage_collector.go index 6d0edee238e..4bfe7ba1864 100644 --- a/test/e2e/apimachinery/garbage_collector.go +++ b/test/e2e/apimachinery/garbage_collector.go @@ -254,7 +254,7 @@ func verifyRemainingObjects(f *framework.Framework, objects map[string]int) (boo func gatherMetrics(f *framework.Framework) { ginkgo.By("Gathering metrics") var summary framework.TestDataSummary - grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, false, false, true, false, false, false) + grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), false, false, true, false, false, false) if err != nil { framework.Logf("Failed to create MetricsGrabber. 
Skipping metrics gathering.") } else { diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index 28f72057122..9b826b06112 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -46,7 +46,6 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest" - "k8s.io/kubernetes/test/e2e/framework/metrics" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2ereporters "k8s.io/kubernetes/test/e2e/reporters" @@ -308,11 +307,6 @@ func setupSuite() { nodeKiller := framework.NewNodeKiller(framework.TestContext.NodeKiller, c, framework.TestContext.Provider) go nodeKiller.Run(framework.TestContext.NodeKiller.NodeKillerStopCh) } - - err = metrics.SetupMetricsProxy(c) - if err != nil { - framework.Logf("Fail to setup metrics proxy: %v", err) - } } // logClusterImageSources writes out cluster image sources. diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 143ded1781c..5d72e89f792 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -296,7 +296,7 @@ func (f *Framework) BeforeEach() { gatherMetricsAfterTest := TestContext.GatherMetricsAfterTest == "true" || TestContext.GatherMetricsAfterTest == "master" if gatherMetricsAfterTest && TestContext.IncludeClusterAutoscalerMetrics { - grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics, false) + grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics, false) if err != nil { Logf("Failed to create MetricsGrabber (skipping ClusterAutoscaler metrics gathering before test): %v", err) } else { @@ -449,7 +449,7 @@ func (f *Framework) AfterEach() { ginkgo.By("Gathering metrics") // Grab apiserver, scheduler, controller-manager metrics and (optionally) nodes' kubelet metrics. 
grabMetricsFromKubelets := TestContext.GatherMetricsAfterTest != "master" && !ProviderIs("kubemark") - grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics, false) + grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics, false) if err != nil { Logf("Failed to create MetricsGrabber (skipping metrics gathering): %v", err) } else { diff --git a/test/e2e/framework/metrics/kubelet_metrics.go b/test/e2e/framework/metrics/kubelet_metrics.go index a26eeed5604..78381493cfe 100644 --- a/test/e2e/framework/metrics/kubelet_metrics.go +++ b/test/e2e/framework/metrics/kubelet_metrics.go @@ -139,7 +139,7 @@ func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (KubeletM if c == nil { return GrabKubeletMetricsWithoutProxy(nodeName, "/metrics") } - grabber, err := NewMetricsGrabber(c, nil, true, false, false, false, false, false) + grabber, err := NewMetricsGrabber(c, nil, nil, true, false, false, false, false, false) if err != nil { return KubeletMetrics{}, err } diff --git a/test/e2e/framework/metrics/metrics_grabber.go b/test/e2e/framework/metrics/metrics_grabber.go index 6f795317420..7add8147523 100644 --- a/test/e2e/framework/metrics/metrics_grabber.go +++ b/test/e2e/framework/metrics/metrics_grabber.go @@ -18,7 +18,9 @@ package metrics import ( "context" + "errors" "fmt" + "net" "regexp" "sync" "time" @@ -27,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/klog/v2" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -37,7 +40,6 @@ const ( kubeSchedulerPort = 10259 // kubeControllerManagerPort is the default port for the controller manager status server. kubeControllerManagerPort = 10257 - metricsProxyPod = "metrics-proxy" // snapshotControllerPort is the port for the snapshot controller snapshotControllerPort = 9102 ) @@ -56,6 +58,7 @@ type Collection struct { type Grabber struct { client clientset.Interface externalClient clientset.Interface + config *rest.Config grabFromAPIServer bool grabFromControllerManager bool grabFromKubelets bool @@ -71,7 +74,7 @@ type Grabber struct { } // NewMetricsGrabber returns new metrics which are initialized. 
-func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) {
+func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *rest.Config, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) {
 
 	kubeScheduler := ""
 	kubeControllerManager := ""
@@ -81,6 +84,10 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets b
 	regKubeControllerManager := regexp.MustCompile("kube-controller-manager-.*")
 	regSnapshotController := regexp.MustCompile("volume-snapshot-controller.*")
 
+	if (scheduler || controllers) && config == nil {
+		return nil, errors.New("a rest config is required for grabbing kube-scheduler and kube-controller-manager metrics")
+	}
+
 	podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), metav1.ListOptions{})
 	if err != nil {
 		return nil, err
@@ -121,6 +128,7 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets b
 	return &Grabber{
 		client:                     c,
 		externalClient:             ec,
+		config:                     config,
 		grabFromAPIServer:          apiServer,
 		grabFromControllerManager:  controllers,
 		grabFromKubelets:           kubelets,
@@ -173,7 +181,7 @@ func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) {
 	g.waitForSchedulerReadyOnce.Do(func() {
 		var lastMetricsFetchErr error
 		if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
-			output, lastMetricsFetchErr = g.getMetricsFromPod(g.client, metricsProxyPod, metav1.NamespaceSystem, kubeSchedulerPort)
+			output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeScheduler, metav1.NamespaceSystem, kubeSchedulerPort)
 			return lastMetricsFetchErr == nil, nil
 		}); metricsWaitErr != nil {
 			err = fmt.Errorf("error waiting for scheduler pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
@@ -224,7 +232,7 @@ func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error)
 
 		var lastMetricsFetchErr error
 		if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
-			output, lastMetricsFetchErr = g.getMetricsFromPod(g.client, metricsProxyPod, metav1.NamespaceSystem, kubeControllerManagerPort)
+			output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeControllerManager, metav1.NamespaceSystem, kubeControllerManagerPort)
 			return lastMetricsFetchErr == nil, nil
 		}); metricsWaitErr != nil {
 			err = fmt.Errorf("error waiting for controller manager pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
@@ -354,6 +362,7 @@ func (g *Grabber) Grab() (Collection, error) {
 	return result, nil
 }
 
+// getMetricsFromPod retrieves metrics data from an insecure port.
 func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string, namespace string, port int) (string, error) {
 	rawOutput, err := client.CoreV1().RESTClient().Get().
 		Namespace(namespace).
@@ -367,3 +376,50 @@ func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string,
 	}
 	return string(rawOutput), nil
 }
+
+// getSecureMetricsFromPod retrieves metrics from a pod that uses TLS
+// and checks client credentials. Conceptually this function is
+// similar to "kubectl port-forward" + "kubectl get --raw
+// https://localhost:<port>/metrics". It uses the same credentials
+// as kubelet.
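+//
+// A typical call, as made from GrabFromScheduler above, looks like
+// this (sketch; the actual pod name is discovered at runtime):
+//
+//	output, err := g.getSecureMetricsFromPod(g.kubeScheduler, metav1.NamespaceSystem, kubeSchedulerPort)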
+func (g *Grabber) getSecureMetricsFromPod(podName string, namespace string, port int) (string, error) {
+	dialer := e2epod.NewDialer(g.client, g.config)
+	metricConfig := rest.CopyConfig(g.config)
+	addr := e2epod.Addr{
+		Namespace: namespace,
+		PodName:   podName,
+		Port:      port,
+	}
+	metricConfig.Dial = func(ctx context.Context, network, address string) (net.Conn, error) {
+		return dialer.DialContainerPort(ctx, addr)
+	}
+	// This should make it possible to verify the server, but while it
+	// got past the server name check, certificate validation
+	// still failed.
+	metricConfig.Host = addr.String()
+	metricConfig.ServerName = "localhost"
+	// Verifying the pod certificate with the same root CA
+	// as for the API server led to an error about "unknown root
+	// certificate". Disabling certificate checking on the client
+	// side gets around that and should be good enough for
+	// E2E testing.
+	metricConfig.Insecure = true
+	metricConfig.CAFile = ""
+	metricConfig.CAData = nil
+
+	// clientset.NewForConfig is used because
+	// metricClient.RESTClient() is directly usable, in contrast
+	// to the client constructed by rest.RESTClientFor().
+	metricClient, err := clientset.NewForConfig(metricConfig)
+	if err != nil {
+		return "", err
+	}
+
+	rawOutput, err := metricClient.RESTClient().Get().
+		AbsPath("metrics").
+		Do(context.TODO()).Raw()
+	if err != nil {
+		return "", err
+	}
+	return string(rawOutput), nil
+}
diff --git a/test/e2e/framework/metrics/metrics_proxy.go b/test/e2e/framework/metrics/metrics_proxy.go
deleted file mode 100644
index afd04db61ab..00000000000
--- a/test/e2e/framework/metrics/metrics_proxy.go
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
-Copyright 2021 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package metrics
-
-import (
-	"context"
-	"fmt"
-	"strings"
-	"time"
-
-	v1 "k8s.io/api/core/v1"
-	rbacv1 "k8s.io/api/rbac/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
-	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/klog/v2"
-
-	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
-	imageutils "k8s.io/kubernetes/test/utils/image"
-)
-
-type componentInfo struct {
-	Name string
-	Port int
-	IP   string
-}
-
-// SetupMetricsProxy creates a nginx Pod to expose metrics from the secure port of kube-scheduler and kube-controller-manager in tests.
-func SetupMetricsProxy(c clientset.Interface) error {
-	var infos []componentInfo
-	// The component pods might take some time to show up.
- err := wait.PollImmediate(time.Second*5, time.Minute*5, func() (bool, error) { - podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return false, fmt.Errorf("list pods in ns %s: %w", metav1.NamespaceSystem, err) - } - var foundComponents []componentInfo - for _, pod := range podList.Items { - if len(pod.Status.PodIP) == 0 { - continue - } - switch { - case strings.HasPrefix(pod.Name, "kube-scheduler-"): - foundComponents = append(foundComponents, componentInfo{ - Name: pod.Name, - Port: kubeSchedulerPort, - IP: pod.Status.PodIP, - }) - case strings.HasPrefix(pod.Name, "kube-controller-manager-"): - foundComponents = append(foundComponents, componentInfo{ - Name: pod.Name, - Port: kubeControllerManagerPort, - IP: pod.Status.PodIP, - }) - } - } - if len(foundComponents) != 2 { - klog.Infof("Only %d components found. Will retry.", len(foundComponents)) - klog.Infof("Found components: %v", foundComponents) - return false, nil - } - infos = foundComponents - return true, nil - }) - if err != nil { - return fmt.Errorf("missing component pods: %w", err) - } - - klog.Infof("Found components: %v", infos) - - const name = metricsProxyPod - _, err = c.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Create(context.TODO(), &v1.ServiceAccount{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - }, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("create serviceAccount: %w", err) - } - _, err = c.RbacV1().ClusterRoles().Create(context.TODO(), &rbacv1.ClusterRole{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - Rules: []rbacv1.PolicyRule{ - { - NonResourceURLs: []string{"/metrics"}, - Verbs: []string{"get"}, - }, - }, - }, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("create clusterRole: %w", err) - } - _, err = c.RbacV1().ClusterRoleBindings().Create(context.TODO(), &rbacv1.ClusterRoleBinding{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - Subjects: []rbacv1.Subject{ - { - Kind: rbacv1.ServiceAccountKind, - Name: name, - Namespace: metav1.NamespaceSystem, - }, - }, - RoleRef: rbacv1.RoleRef{ - APIGroup: "rbac.authorization.k8s.io", - Kind: "ClusterRole", - Name: name, - }, - }, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("create clusterRoleBinding: %w", err) - } - - var token string - err = wait.PollImmediate(time.Second*5, time.Minute*5, func() (done bool, err error) { - sa, err := c.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil { - klog.Warningf("Fail to get serviceAccount %s: %v", name, err) - return false, nil - } - if len(sa.Secrets) < 1 { - klog.Warningf("No secret found in serviceAccount %s", name) - return false, nil - } - secretRef := sa.Secrets[0] - secret, err := c.CoreV1().Secrets(metav1.NamespaceSystem).Get(context.TODO(), secretRef.Name, metav1.GetOptions{}) - if err != nil { - klog.Warningf("Fail to get secret %s", secretRef.Name) - return false, nil - } - token = string(secret.Data["token"]) - if len(token) == 0 { - klog.Warningf("Token in secret %s is empty", secretRef.Name) - return false, nil - } - return true, nil - }) - if err != nil { - return err - } - - var nginxConfig string - for _, info := range infos { - nginxConfig += fmt.Sprintf(` -server { - listen %d; - server_name _; - proxy_set_header Authorization "Bearer %s"; - proxy_ssl_verify off; - location /metrics { - proxy_pass https://%s:%d; - } -} -`, info.Port, token, info.IP, info.Port) - } - _, err = 
c.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(context.TODO(), &v1.ConfigMap{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      name,
-			Namespace: metav1.NamespaceSystem,
-		},
-		Data: map[string]string{
-			"metrics.conf": nginxConfig,
-		},
-	}, metav1.CreateOptions{})
-	if err != nil {
-		return fmt.Errorf("create nginx configmap: %w", err)
-	}
-	pod := &v1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      name,
-			Namespace: metav1.NamespaceSystem,
-		},
-		Spec: v1.PodSpec{
-			Containers: []v1.Container{{
-				Name:  "nginx",
-				Image: imageutils.GetE2EImage(imageutils.Nginx),
-				VolumeMounts: []v1.VolumeMount{{
-					Name:      "config",
-					MountPath: "/etc/nginx/conf.d",
-					ReadOnly:  true,
-				}},
-			}},
-			Volumes: []v1.Volume{{
-				Name: "config",
-				VolumeSource: v1.VolumeSource{
-					ConfigMap: &v1.ConfigMapVolumeSource{
-						LocalObjectReference: v1.LocalObjectReference{
-							Name: name,
-						},
-					},
-				},
-			}},
-		},
-	}
-	_, err = c.CoreV1().Pods(metav1.NamespaceSystem).Create(context.TODO(), pod, metav1.CreateOptions{})
-	if err != nil {
-		return err
-	}
-	err = e2epod.WaitForPodNameRunningInNamespace(c, name, metav1.NamespaceSystem)
-	if err != nil {
-		return err
-	}
-	klog.Info("Successfully setup metrics-proxy")
-	return nil
-}
diff --git a/test/e2e/framework/pod/dial.go b/test/e2e/framework/pod/dial.go
new file mode 100644
index 00000000000..d0ae2880acf
--- /dev/null
+++ b/test/e2e/framework/pod/dial.go
@@ -0,0 +1,215 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pod
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"regexp"
+	"strconv"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/httpstream"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/portforward"
+	"k8s.io/client-go/transport/spdy"
+	"k8s.io/klog/v2"
+)
+
+// NewTransport creates a transport which uses the port forward dialer.
+// URLs must use <namespace>.<pod>:<port> as host.
+func NewTransport(client kubernetes.Interface, restConfig *rest.Config) *http.Transport {
+	return &http.Transport{
+		DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
+			dialer := NewDialer(client, restConfig)
+			a, err := ParseAddr(addr)
+			if err != nil {
+				return nil, err
+			}
+			return dialer.DialContainerPort(ctx, *a)
+		},
+	}
+}
+
+// NewDialer creates a dialer that supports connecting to container ports.
+func NewDialer(client kubernetes.Interface, restConfig *rest.Config) *Dialer {
+	return &Dialer{
+		client:     client,
+		restConfig: restConfig,
+	}
+}
+
+// Dialer holds the relevant parameters that are independent of a particular connection.
+type Dialer struct {
+	client     kubernetes.Interface
+	restConfig *rest.Config
+}
+
+// DialContainerPort connects to a certain container port in a pod.
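+//
+// A minimal usage sketch, with placeholder namespace, pod name and
+// port (the target container must already be running):
+//
+//	dialer := NewDialer(client, restConfig)
+//	conn, err := dialer.DialContainerPort(ctx, Addr{Namespace: "kube-system", PodName: "kube-scheduler-some-node", Port: 10259})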
+func (d *Dialer) DialContainerPort(ctx context.Context, addr Addr) (conn net.Conn, finalErr error) {
+	restClient := d.client.CoreV1().RESTClient()
+	restConfig := d.restConfig
+	if restConfig.GroupVersion == nil {
+		restConfig.GroupVersion = &schema.GroupVersion{}
+	}
+	if restConfig.NegotiatedSerializer == nil {
+		restConfig.NegotiatedSerializer = scheme.Codecs
+	}
+
+	// The setup code around the actual portforward is from
+	// https://github.com/kubernetes/kubernetes/blob/c652ffbe4a29143623a1aaec39f745575f7e43ad/staging/src/k8s.io/kubectl/pkg/cmd/portforward/portforward.go
+	req := restClient.Post().
+		Resource("pods").
+		Namespace(addr.Namespace).
+		Name(addr.PodName).
+		SubResource("portforward")
+	transport, upgrader, err := spdy.RoundTripperFor(restConfig)
+	if err != nil {
+		return nil, fmt.Errorf("create round tripper: %v", err)
+	}
+	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
+
+	streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
+	if err != nil {
+		return nil, fmt.Errorf("dialer failed: %v", err)
+	}
+	requestID := "1"
+	defer func() {
+		if finalErr != nil {
+			streamConn.Close()
+		}
+	}()
+
+	// create error stream
+	headers := http.Header{}
+	headers.Set(v1.StreamType, v1.StreamTypeError)
+	headers.Set(v1.PortHeader, fmt.Sprintf("%d", addr.Port))
+	headers.Set(v1.PortForwardRequestIDHeader, requestID)
+
+	// We're not writing to this stream, just reading an error message from it.
+	// This happens asynchronously.
+	errorStream, err := streamConn.CreateStream(headers)
+	if err != nil {
+		return nil, fmt.Errorf("error creating error stream: %v", err)
+	}
+	errorStream.Close()
+	go func() {
+		message, err := ioutil.ReadAll(errorStream)
+		switch {
+		case err != nil:
+			klog.ErrorS(err, "error reading from error stream")
+		case len(message) > 0:
+			klog.ErrorS(errors.New(string(message)), "an error occurred connecting to the remote port")
+		}
+	}()
+
+	// create data stream
+	headers.Set(v1.StreamType, v1.StreamTypeData)
+	dataStream, err := streamConn.CreateStream(headers)
+	if err != nil {
+		return nil, fmt.Errorf("error creating data stream: %v", err)
+	}
+
+	return &stream{
+		Stream:     dataStream,
+		streamConn: streamConn,
+	}, nil
+}
+
+// Addr contains all relevant parameters for a certain port in a pod.
+// The container should be running before connections are attempted,
+// otherwise the connection will fail.
+type Addr struct {
+	Namespace, PodName string
+	Port               int
+}
+
+var _ net.Addr = Addr{}
+
+func (a Addr) Network() string {
+	return "port-forwarding"
+}
+
+func (a Addr) String() string {
+	return fmt.Sprintf("%s.%s:%d", a.Namespace, a.PodName, a.Port)
+}
+
+// ParseAddr expects a <namespace>.<pod>:<port number> as produced
+// by Addr.String.
+func ParseAddr(addr string) (*Addr, error) {
+	parts := addrRegex.FindStringSubmatch(addr)
+	if parts == nil {
+		return nil, fmt.Errorf("%q: must match the format <namespace>.<pod>:<port number>", addr)
+	}
+	port, _ := strconv.Atoi(parts[3])
+	return &Addr{
+		Namespace: parts[1],
+		PodName:   parts[2],
+		Port:      port,
+	}, nil
+}
+
+var addrRegex = regexp.MustCompile(`^([^\.]+)\.([^:]+):(\d+)$`)
+
+type stream struct {
+	addr Addr
+	httpstream.Stream
+	streamConn httpstream.Connection
+}
+
+var _ net.Conn = &stream{}
+
+func (s *stream) Close() error {
+	s.Stream.Close()
+	s.streamConn.Close()
+	return nil
+}
+
+func (s *stream) LocalAddr() net.Addr {
+	return LocalAddr{}
+}
+
+func (s *stream) RemoteAddr() net.Addr {
+	return s.addr
+}
+
+func (s *stream) SetDeadline(t time.Time) error {
+	return nil
+}
+
+func (s *stream) SetReadDeadline(t time.Time) error {
+	return nil
+}
+
+func (s *stream) SetWriteDeadline(t time.Time) error {
+	return nil
+}
+
+type LocalAddr struct{}
+
+var _ net.Addr = LocalAddr{}
+
+func (l LocalAddr) Network() string { return "port-forwarding" }
+func (l LocalAddr) String() string  { return "apiserver" }
diff --git a/test/e2e/instrumentation/monitoring/metrics_grabber.go b/test/e2e/instrumentation/monitoring/metrics_grabber.go
index 8ccd877f96e..0237f22f916 100644
--- a/test/e2e/instrumentation/monitoring/metrics_grabber.go
+++ b/test/e2e/instrumentation/monitoring/metrics_grabber.go
@@ -51,7 +51,7 @@ var _ = instrumentation.SIGDescribe("MetricsGrabber", func() {
 			}
 		}
 		gomega.Eventually(func() error {
-			grabber, err = e2emetrics.NewMetricsGrabber(c, ec, true, true, true, true, true, true)
+			grabber, err = e2emetrics.NewMetricsGrabber(c, ec, f.ClientConfig(), true, true, true, true, true, true)
 			if err != nil {
 				return fmt.Errorf("failed to create metrics grabber: %v", err)
 			}
diff --git a/test/e2e/storage/csi_mock_volume.go b/test/e2e/storage/csi_mock_volume.go
index 161a238b4e5..3cc92a896f1 100644
--- a/test/e2e/storage/csi_mock_volume.go
+++ b/test/e2e/storage/csi_mock_volume.go
@@ -1653,7 +1653,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 			}
 			defer cleanup()
 
-			metricsGrabber, err := e2emetrics.NewMetricsGrabber(m.config.Framework.ClientSet, nil, false, false, false, false, false, true)
+			metricsGrabber, err := e2emetrics.NewMetricsGrabber(m.config.Framework.ClientSet, nil, f.ClientConfig(), false, false, false, false, false, true)
 			if err != nil {
 				framework.Failf("Error creating metrics grabber : %v", err)
 			}
diff --git a/test/e2e/storage/testsuites/base.go b/test/e2e/storage/testsuites/base.go
index 9a8f58d3f32..d5283a103cc 100644
--- a/test/e2e/storage/testsuites/base.go
+++ b/test/e2e/storage/testsuites/base.go
@@ -24,6 +24,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
 	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 	"k8s.io/component-base/metrics/testutil"
 	csitrans "k8s.io/csi-translation-lib"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -43,6 +44,7 @@ type opCounts map[string]int64
 
 // migrationOpCheck validates migrated metrics.
type migrationOpCheck struct { cs clientset.Interface + config *rest.Config pluginName string skipCheck bool @@ -100,14 +102,14 @@ func getVolumeOpsFromMetricsForPlugin(ms testutil.Metrics, pluginName string) op return totOps } -func getVolumeOpCounts(c clientset.Interface, pluginName string) opCounts { +func getVolumeOpCounts(c clientset.Interface, config *rest.Config, pluginName string) opCounts { if !framework.ProviderIs("gce", "gke", "aws") { return opCounts{} } nodeLimit := 25 - metricsGrabber, err := e2emetrics.NewMetricsGrabber(c, nil, true, false, true, false, false, false) + metricsGrabber, err := e2emetrics.NewMetricsGrabber(c, nil, config, true, false, true, false, false, false) if err != nil { framework.ExpectNoError(err, "Error creating metrics grabber: %v", err) @@ -156,7 +158,7 @@ func addOpCounts(o1 opCounts, o2 opCounts) opCounts { return totOps } -func getMigrationVolumeOpCounts(cs clientset.Interface, pluginName string) (opCounts, opCounts) { +func getMigrationVolumeOpCounts(cs clientset.Interface, config *rest.Config, pluginName string) (opCounts, opCounts) { if len(pluginName) > 0 { var migratedOps opCounts l := csitrans.New() @@ -166,18 +168,19 @@ func getMigrationVolumeOpCounts(cs clientset.Interface, pluginName string) (opCo migratedOps = opCounts{} } else { csiName = "kubernetes.io/csi:" + csiName - migratedOps = getVolumeOpCounts(cs, csiName) + migratedOps = getVolumeOpCounts(cs, config, csiName) } - return getVolumeOpCounts(cs, pluginName), migratedOps + return getVolumeOpCounts(cs, config, pluginName), migratedOps } // Not an in-tree driver framework.Logf("Test running for native CSI Driver, not checking metrics") return opCounts{}, opCounts{} } -func newMigrationOpCheck(cs clientset.Interface, pluginName string) *migrationOpCheck { +func newMigrationOpCheck(cs clientset.Interface, config *rest.Config, pluginName string) *migrationOpCheck { moc := migrationOpCheck{ cs: cs, + config: config, pluginName: pluginName, } if len(pluginName) == 0 { @@ -214,7 +217,7 @@ func newMigrationOpCheck(cs clientset.Interface, pluginName string) *migrationOp return &moc } - moc.oldInTreeOps, moc.oldMigratedOps = getMigrationVolumeOpCounts(cs, pluginName) + moc.oldInTreeOps, moc.oldMigratedOps = getMigrationVolumeOpCounts(cs, config, pluginName) return &moc } @@ -223,7 +226,7 @@ func (moc *migrationOpCheck) validateMigrationVolumeOpCounts() { return } - newInTreeOps, _ := getMigrationVolumeOpCounts(moc.cs, moc.pluginName) + newInTreeOps, _ := getMigrationVolumeOpCounts(moc.cs, moc.config, moc.pluginName) for op, count := range newInTreeOps { if count != moc.oldInTreeOps[op] { diff --git a/test/e2e/storage/testsuites/multivolume.go b/test/e2e/storage/testsuites/multivolume.go index eedb7c9c25a..f1c0a190fac 100644 --- a/test/e2e/storage/testsuites/multivolume.go +++ b/test/e2e/storage/testsuites/multivolume.go @@ -112,7 +112,7 @@ func (t *multiVolumeTestSuite) DefineTests(driver storageframework.TestDriver, p // Now do the more expensive test initialization. 
l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) } cleanup := func() { diff --git a/test/e2e/storage/testsuites/provisioning.go b/test/e2e/storage/testsuites/provisioning.go index 5283cbdee71..8897b6ed558 100644 --- a/test/e2e/storage/testsuites/provisioning.go +++ b/test/e2e/storage/testsuites/provisioning.go @@ -134,7 +134,7 @@ func (p *provisioningTestSuite) DefineTests(driver storageframework.TestDriver, dDriver, _ = driver.(storageframework.DynamicPVTestDriver) // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) l.cs = l.config.Framework.ClientSet testVolumeSizeRange := p.GetTestSuiteInfo().SupportedSizeRange driverVolumeSizeRange := dDriver.GetDriverInfo().SupportedSizeRange diff --git a/test/e2e/storage/testsuites/subpath.go b/test/e2e/storage/testsuites/subpath.go index bb853aa3383..b81512fdb44 100644 --- a/test/e2e/storage/testsuites/subpath.go +++ b/test/e2e/storage/testsuites/subpath.go @@ -122,7 +122,7 @@ func (s *subPathTestSuite) DefineTests(driver storageframework.TestDriver, patte // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, driver.GetDriverInfo().InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), driver.GetDriverInfo().InTreePluginName) testVolumeSizeRange := s.GetTestSuiteInfo().SupportedSizeRange l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange) l.hostExec = utils.NewHostExec(f) diff --git a/test/e2e/storage/testsuites/topology.go b/test/e2e/storage/testsuites/topology.go index ac0f826f415..1d24ffdb6a0 100644 --- a/test/e2e/storage/testsuites/topology.go +++ b/test/e2e/storage/testsuites/topology.go @@ -148,7 +148,7 @@ func (t *topologyTestSuite) DefineTests(driver storageframework.TestDriver, patt StorageClassName: &(l.resource.Sc.Name), }, l.config.Framework.Namespace.Name) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) return l } diff --git a/test/e2e/storage/testsuites/volume_expand.go b/test/e2e/storage/testsuites/volume_expand.go index 6172c305f3a..704f4bd2aa3 100644 --- a/test/e2e/storage/testsuites/volume_expand.go +++ b/test/e2e/storage/testsuites/volume_expand.go @@ -121,7 +121,7 @@ func (v *volumeExpandTestSuite) DefineTests(driver storageframework.TestDriver, // Now do the more expensive test initialization. 
l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, driver.GetDriverInfo().InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), driver.GetDriverInfo().InTreePluginName) testVolumeSizeRange := v.GetTestSuiteInfo().SupportedSizeRange l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange) } diff --git a/test/e2e/storage/testsuites/volume_io.go b/test/e2e/storage/testsuites/volume_io.go index 23aa911e2de..ad4c627d66f 100644 --- a/test/e2e/storage/testsuites/volume_io.go +++ b/test/e2e/storage/testsuites/volume_io.go @@ -117,7 +117,7 @@ func (t *volumeIOTestSuite) DefineTests(driver storageframework.TestDriver, patt // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange) diff --git a/test/e2e/storage/testsuites/volume_stress.go b/test/e2e/storage/testsuites/volume_stress.go index 5741f50fe9a..289c9d154de 100644 --- a/test/e2e/storage/testsuites/volume_stress.go +++ b/test/e2e/storage/testsuites/volume_stress.go @@ -120,7 +120,7 @@ func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver, // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) l.volumes = []*storageframework.VolumeResource{} l.pods = []*v1.Pod{} l.testOptions = *dInfo.StressTestOptions diff --git a/test/e2e/storage/testsuites/volumemode.go b/test/e2e/storage/testsuites/volumemode.go index 48df518457c..65115aa3679 100644 --- a/test/e2e/storage/testsuites/volumemode.go +++ b/test/e2e/storage/testsuites/volumemode.go @@ -115,7 +115,7 @@ func (t *volumeModeTestSuite) DefineTests(driver storageframework.TestDriver, pa // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) } // manualInit initializes l.VolumeResource without creating the PV & PVC objects. diff --git a/test/e2e/storage/testsuites/volumes.go b/test/e2e/storage/testsuites/volumes.go index ab9799497f5..cef61af2c3e 100644 --- a/test/e2e/storage/testsuites/volumes.go +++ b/test/e2e/storage/testsuites/volumes.go @@ -135,7 +135,7 @@ func (t *volumesTestSuite) DefineTests(driver storageframework.TestDriver, patte // Now do the more expensive test initialization. 
l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange) if l.resource.VolSource == nil { diff --git a/test/e2e/storage/volume_metrics.go b/test/e2e/storage/volume_metrics.go index 2576b6a975d..eed1f365fb6 100644 --- a/test/e2e/storage/volume_metrics.go +++ b/test/e2e/storage/volume_metrics.go @@ -81,7 +81,7 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() { VolumeMode: &blockMode, }, ns) - metricsGrabber, err = e2emetrics.NewMetricsGrabber(c, nil, true, false, true, false, false, false) + metricsGrabber, err = e2emetrics.NewMetricsGrabber(c, nil, f.ClientConfig(), true, false, true, false, false, false) if err != nil { framework.Failf("Error creating metrics grabber : %v", err) diff --git a/test/e2e/suites.go b/test/e2e/suites.go index 790a58978d6..d8bf0f8bd88 100644 --- a/test/e2e/suites.go +++ b/test/e2e/suites.go @@ -22,6 +22,7 @@ import ( "path" "time" + clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" ) @@ -55,13 +56,17 @@ func AfterSuiteActions() { func gatherTestSuiteMetrics() error { framework.Logf("Gathering metrics") - c, err := framework.LoadClientset() + config, err := framework.LoadConfig() if err != nil { - return fmt.Errorf("error loading client: %v", err) + return fmt.Errorf("error loading client config: %v", err) + } + c, err := clientset.NewForConfig(config) + if err != nil { + return fmt.Errorf("error creating client: %v", err) } // Grab metrics for apiserver, scheduler, controller-manager, kubelet (for non-kubemark case) and cluster autoscaler (optionally). - grabber, err := e2emetrics.NewMetricsGrabber(c, nil, !framework.ProviderIs("kubemark"), true, true, true, framework.TestContext.IncludeClusterAutoscalerMetrics, false) + grabber, err := e2emetrics.NewMetricsGrabber(c, nil, config, !framework.ProviderIs("kubemark"), true, true, true, framework.TestContext.IncludeClusterAutoscalerMetrics, false) if err != nil { return fmt.Errorf("failed to create MetricsGrabber: %v", err) } From 1d3420ca72f6586be92d90be53e9e179633208e0 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 2 Jun 2021 18:15:22 +0200 Subject: [PATCH 2/5] e2e metrics: check whether debug handlers are available This can be checked by trying to retrieve log output. As in the case of no pod found, a warning gets emitted when log retrieval fails and metrics grabbing gets disabled. Logging is checked instead of actual metrics retrieval because the latter is more complex and thus more likely to fail for other reasons. 
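
The probe amounts to a one-byte log request against the component pod,
roughly like this (sketch of the checkPodDebugHandlers helper added
below):

    limit := int64(1)
    _, err := c.CoreV1().Pods(metav1.NamespaceSystem).
        GetLogs(podName, &v1.PodLogOptions{LimitBytes: &limit}).
        DoRaw(context.TODO())
    // A non-nil err suggests the kubelet debug handlers are disabled,
    // so metrics grabbing for that component gets turned off.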
--- test/e2e/framework/metrics/metrics_grabber.go | 43 ++++++++++++------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/test/e2e/framework/metrics/metrics_grabber.go b/test/e2e/framework/metrics/metrics_grabber.go index 7add8147523..8b260c9903f 100644 --- a/test/e2e/framework/metrics/metrics_grabber.go +++ b/test/e2e/framework/metrics/metrics_grabber.go @@ -25,6 +25,7 @@ import ( "sync" "time" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/wait" @@ -109,19 +110,7 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *re break } } - if kubeScheduler == "" { - scheduler = false - klog.Warningf("Can't find kube-scheduler pod. Grabbing metrics from kube-scheduler is disabled.") - } - if kubeControllerManager == "" { - controllers = false - klog.Warningf("Can't find kube-controller-manager pod. Grabbing metrics from kube-controller-manager is disabled.") - } - if snapshotControllerManager == "" { - snapshotController = false - klog.Warningf("Can't find snapshot-controller pod. Grabbing metrics from snapshot-controller is disabled.") - } - if ec == nil { + if clusterAutoscaler && ec == nil { klog.Warningf("Did not receive an external client interface. Grabbing metrics from ClusterAutoscaler is disabled.") } @@ -130,17 +119,39 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *re externalClient: ec, config: config, grabFromAPIServer: apiServer, - grabFromControllerManager: controllers, + grabFromControllerManager: checkPodDebugHandlers(c, controllers, "kube-controller-manager", kubeControllerManager), grabFromKubelets: kubelets, - grabFromScheduler: scheduler, + grabFromScheduler: checkPodDebugHandlers(c, scheduler, "kube-scheduler", kubeScheduler), grabFromClusterAutoscaler: clusterAutoscaler, - grabFromSnapshotController: snapshotController, + grabFromSnapshotController: checkPodDebugHandlers(c, snapshotController, "snapshot-controller", snapshotControllerManager), kubeScheduler: kubeScheduler, kubeControllerManager: kubeControllerManager, snapshotController: snapshotControllerManager, }, nil } +func checkPodDebugHandlers(c clientset.Interface, requested bool, component, podName string) bool { + if !requested { + return false + } + if podName == "" { + klog.Warningf("Can't find %s pod. Grabbing metrics from %s is disabled.", component, component) + return false + } + + // The debug handlers on the host where the pod runs might be disabled. + // We can check that indirectly by trying to retrieve log output. + limit := int64(1) + if _, err := c.CoreV1().Pods(metav1.NamespaceSystem).GetLogs(podName, &v1.PodLogOptions{LimitBytes: &limit}).DoRaw(context.TODO()); err != nil { + klog.Warningf("Can't retrieve log output of %s (%q). Debug handlers might be disabled in kubelet. Grabbing metrics from %s is disabled.", + podName, err, component) + return false + } + + // Metrics gathering enabled. 
+ return true +} + // HasControlPlanePods returns true if metrics grabber was able to find control-plane pods func (g *Grabber) HasControlPlanePods() bool { return g.kubeScheduler != "" && g.kubeControllerManager != "" From a4c7e91b591a6704b9355d4cfefeabfaf4eb1d98 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 3 Jun 2021 10:36:02 +0200 Subject: [PATCH 3/5] e2e metrics: skip tests when metrics grabbing is disabled The MetricsGrabber checked whether a component supported metrics grabbing, but then tests didn't have an API to use the result of that check. Because metrics grabbing is an optional debug feature, tests must skip checks that depend on metrics data or, when the entire test is about metrics data, skip the test. This is now supported with a special error that gets wrapped and returned by the individual Grab functions. --- test/e2e/framework/metrics/metrics_grabber.go | 29 ++++++++++++++----- .../monitoring/metrics_grabber.go | 18 ++++++++++-- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/test/e2e/framework/metrics/metrics_grabber.go b/test/e2e/framework/metrics/metrics_grabber.go index 8b260c9903f..b794a7a0639 100644 --- a/test/e2e/framework/metrics/metrics_grabber.go +++ b/test/e2e/framework/metrics/metrics_grabber.go @@ -45,6 +45,12 @@ const ( snapshotControllerPort = 9102 ) +// MetricsGrabbingDisabledError is an error that is wrapped by the +// different MetricsGrabber.Wrap functions when metrics grabbing is +// not supported. Tests that check metrics data should then skip +// the check. +var MetricsGrabbingDisabledError = errors.New("metrics grabbing disabled") + // Collection is metrics collection of components type Collection struct { APIServerMetrics APIServerMetrics @@ -74,7 +80,14 @@ type Grabber struct { waitForSnapshotControllerReadyOnce sync.Once } -// NewMetricsGrabber returns new metrics which are initialized. +// NewMetricsGrabber prepares for grabbing metrics data from several different +// components. It should be called when those components are running because +// it needs to communicate with them to determine for which components +// metrics data can be retrieved. +// +// Collecting metrics data is an optional debug feature. Not all clusters will +// support it. If disabled for a component, the corresponding Grab function +// will immediately return an error derived from MetricsGrabbingDisabledError. func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *rest.Config, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) { kubeScheduler := "" @@ -183,8 +196,8 @@ func (g *Grabber) grabFromKubeletInternal(nodeName string, kubeletPort int) (Kub // GrabFromScheduler returns metrics from scheduler func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) { - if g.kubeScheduler == "" { - return SchedulerMetrics{}, fmt.Errorf("kube-scheduler pod is not registered. Skipping Scheduler's metrics gathering") + if !g.grabFromScheduler { + return SchedulerMetrics{}, fmt.Errorf("kube-scheduler: %w", MetricsGrabbingDisabledError) } var err error @@ -208,7 +221,7 @@ func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) { // GrabFromClusterAutoscaler returns metrics from cluster autoscaler func (g *Grabber) GrabFromClusterAutoscaler() (ClusterAutoscalerMetrics, error) { if !g.HasControlPlanePods() && g.externalClient == nil { - return ClusterAutoscalerMetrics{}, fmt.Errorf("Did not find control-plane pods. 
Skipping ClusterAutoscaler's metrics gathering") + return ClusterAutoscalerMetrics{}, fmt.Errorf("ClusterAutoscaler: %w", MetricsGrabbingDisabledError) } var client clientset.Interface var namespace string @@ -228,8 +241,8 @@ func (g *Grabber) GrabFromClusterAutoscaler() (ClusterAutoscalerMetrics, error) // GrabFromControllerManager returns metrics from controller manager func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error) { - if g.kubeControllerManager == "" { - return ControllerManagerMetrics{}, fmt.Errorf("kube-controller-manager pod is not registered. Skipping ControllerManager's metrics gathering") + if !g.grabFromControllerManager { + return ControllerManagerMetrics{}, fmt.Errorf("kube-controller-manager: %w", MetricsGrabbingDisabledError) } var err error @@ -258,8 +271,8 @@ func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error) // GrabFromSnapshotController returns metrics from controller manager func (g *Grabber) GrabFromSnapshotController(podName string, port int) (SnapshotControllerMetrics, error) { - if g.snapshotController == "" { - return SnapshotControllerMetrics{}, fmt.Errorf("SnapshotController pod is not registered. Skipping SnapshotController's metrics gathering") + if !g.grabFromSnapshotController { + return SnapshotControllerMetrics{}, fmt.Errorf("snapshot controller: %w", MetricsGrabbingDisabledError) } // Use overrides if provided via test config flags. diff --git a/test/e2e/instrumentation/monitoring/metrics_grabber.go b/test/e2e/instrumentation/monitoring/metrics_grabber.go index 0237f22f916..e1adb628106 100644 --- a/test/e2e/instrumentation/monitoring/metrics_grabber.go +++ b/test/e2e/instrumentation/monitoring/metrics_grabber.go @@ -18,6 +18,7 @@ package monitoring import ( "context" + "errors" "fmt" "strings" "time" @@ -30,6 +31,7 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" ) @@ -65,6 +67,9 @@ var _ = instrumentation.SIGDescribe("MetricsGrabber", func() { ginkgo.It("should grab all metrics from API server.", func() { ginkgo.By("Connecting to /metrics endpoint") response, err := grabber.GrabFromAPIServer() + if errors.Is(err, e2emetrics.MetricsGrabbingDisabledError) { + e2eskipper.Skipf("%v", err) + } framework.ExpectNoError(err) gomega.Expect(response).NotTo(gomega.BeEmpty()) }) @@ -72,6 +77,9 @@ var _ = instrumentation.SIGDescribe("MetricsGrabber", func() { ginkgo.It("should grab all metrics from a Kubelet.", func() { ginkgo.By("Proxying to Node through the API server") node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet) + if errors.Is(err, e2emetrics.MetricsGrabbingDisabledError) { + e2eskipper.Skipf("%v", err) + } framework.ExpectNoError(err) response, err := grabber.GrabFromKubelet(node.Name) framework.ExpectNoError(err) @@ -81,10 +89,13 @@ var _ = instrumentation.SIGDescribe("MetricsGrabber", func() { ginkgo.It("should grab all metrics from a Scheduler.", func() { ginkgo.By("Proxying to Pod through the API server") if !masterRegistered { - framework.Logf("Master is node api.Registry. Skipping testing Scheduler metrics.") + e2eskipper.Skipf("Master is node api.Registry. 
Skipping testing Scheduler metrics.") return } response, err := grabber.GrabFromScheduler() + if errors.Is(err, e2emetrics.MetricsGrabbingDisabledError) { + e2eskipper.Skipf("%v", err) + } framework.ExpectNoError(err) gomega.Expect(response).NotTo(gomega.BeEmpty()) }) @@ -92,10 +103,13 @@ var _ = instrumentation.SIGDescribe("MetricsGrabber", func() { ginkgo.It("should grab all metrics from a ControllerManager.", func() { ginkgo.By("Proxying to Pod through the API server") if !masterRegistered { - framework.Logf("Master is node api.Registry. Skipping testing ControllerManager metrics.") + e2eskipper.Skipf("Master is node api.Registry. Skipping testing ControllerManager metrics.") return } response, err := grabber.GrabFromControllerManager() + if errors.Is(err, e2emetrics.MetricsGrabbingDisabledError) { + e2eskipper.Skipf("%v", err) + } framework.ExpectNoError(err) gomega.Expect(response).NotTo(gomega.BeEmpty()) }) From f298a658aeda7625a63a07086aad0b07f246f5cc Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 3 Jun 2021 14:13:22 +0200 Subject: [PATCH 4/5] e2e metrics: remove redundant checks around metrics tests The MetricsGrabber itself knows now whether it supports each component. The checks inside the tests therefore are redundant at best or worse, they are wrong: for example, on a KinD cluster the check for "has master node registered" failed and metrics grabbing from scheduler and controller manager were skipped unnecessarily. --- .../monitoring/metrics_grabber.go | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/test/e2e/instrumentation/monitoring/metrics_grabber.go b/test/e2e/instrumentation/monitoring/metrics_grabber.go index e1adb628106..efe230ab4ee 100644 --- a/test/e2e/instrumentation/monitoring/metrics_grabber.go +++ b/test/e2e/instrumentation/monitoring/metrics_grabber.go @@ -17,15 +17,12 @@ limitations under the License. package monitoring import ( - "context" "errors" "fmt" - "strings" "time" "github.com/onsi/ginkgo" "github.com/onsi/gomega" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" @@ -39,27 +36,15 @@ var _ = instrumentation.SIGDescribe("MetricsGrabber", func() { f := framework.NewDefaultFramework("metrics-grabber") var c, ec clientset.Interface var grabber *e2emetrics.Grabber - var masterRegistered bool ginkgo.BeforeEach(func() { var err error c = f.ClientSet ec = f.KubemarkExternalClusterClientSet - // Check if master Node is registered - nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) - framework.ExpectNoError(err) - for _, node := range nodes.Items { - if strings.HasSuffix(node.Name, "master") { - masterRegistered = true - } - } gomega.Eventually(func() error { grabber, err = e2emetrics.NewMetricsGrabber(c, ec, f.ClientConfig(), true, true, true, true, true, true) if err != nil { return fmt.Errorf("failed to create metrics grabber: %v", err) } - if masterRegistered && !grabber.HasControlPlanePods() { - return fmt.Errorf("unable to get find control plane pods") - } return nil }, 5*time.Minute, 10*time.Second).Should(gomega.BeNil()) }) @@ -88,10 +73,6 @@ var _ = instrumentation.SIGDescribe("MetricsGrabber", func() { ginkgo.It("should grab all metrics from a Scheduler.", func() { ginkgo.By("Proxying to Pod through the API server") - if !masterRegistered { - e2eskipper.Skipf("Master is node api.Registry. 
Skipping testing Scheduler metrics.") - return - } response, err := grabber.GrabFromScheduler() if errors.Is(err, e2emetrics.MetricsGrabbingDisabledError) { e2eskipper.Skipf("%v", err) @@ -102,10 +83,6 @@ var _ = instrumentation.SIGDescribe("MetricsGrabber", func() { ginkgo.It("should grab all metrics from a ControllerManager.", func() { ginkgo.By("Proxying to Pod through the API server") - if !masterRegistered { - e2eskipper.Skipf("Master is node api.Registry. Skipping testing ControllerManager metrics.") - return - } response, err := grabber.GrabFromControllerManager() if errors.Is(err, e2emetrics.MetricsGrabbingDisabledError) { e2eskipper.Skipf("%v", err) From c91496dda04bdb3d685fffa4747d12a763f3ad37 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 20 May 2021 13:28:22 +0200 Subject: [PATCH 5/5] cluster: enable debug handlers on GCE master nodes This is needed for testing metrics support via the secure port of kube-scheduler and kube-controller-manager. To access that port, port-forwarding is used. --- cluster/gce/config-test.sh | 4 ++++ cluster/gce/util.sh | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index 855c1f7b1f6..97eb02c46a1 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -236,6 +236,10 @@ TEST_CLUSTER_RESYNC_PERIOD=${TEST_CLUSTER_RESYNC_PERIOD:---min-resync-period=3m} # ContentType used by all components to communicate with apiserver. TEST_CLUSTER_API_CONTENT_TYPE=${TEST_CLUSTER_API_CONTENT_TYPE:-} +# Enable debug handlers (port forwarding, exec, container logs, etc.). +KUBELET_ENABLE_DEBUGGING_HANDLERS=${KUBELET_ENABLE_DEBUGGING_HANDLERS:-true} +MASTER_KUBELET_ENABLE_DEBUGGING_HANDLERS=${MASTER_KUBELET_ENABLE_DEBUGGING_HANDLERS:-${KUBELET_ENABLE_DEBUGGING_HANDLERS}} + KUBELET_TEST_ARGS="${KUBELET_TEST_ARGS:-} --serialize-image-pulls=false ${TEST_CLUSTER_API_CONTENT_TYPE}" if [[ "${NODE_OS_DISTRIBUTION}" = 'gci' ]] || [[ "${NODE_OS_DISTRIBUTION}" = 'ubuntu' ]] || [[ "${NODE_OS_DISTRIBUTION}" = 'custom' ]]; then NODE_KUBELET_TEST_ARGS="${NODE_KUBELET_TEST_ARGS:-} --kernel-memcg-notification=true" diff --git a/cluster/gce/util.sh b/cluster/gce/util.sh index 4e2e48cda2f..3da39b90ee5 100755 --- a/cluster/gce/util.sh +++ b/cluster/gce/util.sh @@ -1025,7 +1025,7 @@ EOF # cat the Kubelet config yaml for masters function print-master-kubelet-config { cat <