diff --git a/test/e2e/apimachinery/garbage_collector.go b/test/e2e/apimachinery/garbage_collector.go index 6d0edee238e..4bfe7ba1864 100644 --- a/test/e2e/apimachinery/garbage_collector.go +++ b/test/e2e/apimachinery/garbage_collector.go @@ -254,7 +254,7 @@ func verifyRemainingObjects(f *framework.Framework, objects map[string]int) (boo func gatherMetrics(f *framework.Framework) { ginkgo.By("Gathering metrics") var summary framework.TestDataSummary - grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, false, false, true, false, false, false) + grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), false, false, true, false, false, false) if err != nil { framework.Logf("Failed to create MetricsGrabber. Skipping metrics gathering.") } else { diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index 28f72057122..9b826b06112 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -46,7 +46,6 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest" - "k8s.io/kubernetes/test/e2e/framework/metrics" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2ereporters "k8s.io/kubernetes/test/e2e/reporters" @@ -308,11 +307,6 @@ func setupSuite() { nodeKiller := framework.NewNodeKiller(framework.TestContext.NodeKiller, c, framework.TestContext.Provider) go nodeKiller.Run(framework.TestContext.NodeKiller.NodeKillerStopCh) } - - err = metrics.SetupMetricsProxy(c) - if err != nil { - framework.Logf("Fail to setup metrics proxy: %v", err) - } } // logClusterImageSources writes out cluster image sources. diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 143ded1781c..5d72e89f792 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -296,7 +296,7 @@ func (f *Framework) BeforeEach() { gatherMetricsAfterTest := TestContext.GatherMetricsAfterTest == "true" || TestContext.GatherMetricsAfterTest == "master" if gatherMetricsAfterTest && TestContext.IncludeClusterAutoscalerMetrics { - grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics, false) + grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics, false) if err != nil { Logf("Failed to create MetricsGrabber (skipping ClusterAutoscaler metrics gathering before test): %v", err) } else { @@ -449,7 +449,7 @@ func (f *Framework) AfterEach() { ginkgo.By("Gathering metrics") // Grab apiserver, scheduler, controller-manager metrics and (optionally) nodes' kubelet metrics. grabMetricsFromKubelets := TestContext.GatherMetricsAfterTest != "master" && !ProviderIs("kubemark") - grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics, false) + grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics, false) if err != nil { Logf("Failed to create MetricsGrabber (skipping metrics gathering): %v", err) } else { diff --git a/test/e2e/framework/metrics/kubelet_metrics.go b/test/e2e/framework/metrics/kubelet_metrics.go index a26eeed5604..78381493cfe 100644 --- a/test/e2e/framework/metrics/kubelet_metrics.go +++ b/test/e2e/framework/metrics/kubelet_metrics.go @@ -139,7 +139,7 @@ func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (KubeletM if c == nil { return GrabKubeletMetricsWithoutProxy(nodeName, "/metrics") } - grabber, err := NewMetricsGrabber(c, nil, true, false, false, false, false, false) + grabber, err := NewMetricsGrabber(c, nil, nil, true, false, false, false, false, false) if err != nil { return KubeletMetrics{}, err } diff --git a/test/e2e/framework/metrics/metrics_grabber.go b/test/e2e/framework/metrics/metrics_grabber.go index 6f795317420..7add8147523 100644 --- a/test/e2e/framework/metrics/metrics_grabber.go +++ b/test/e2e/framework/metrics/metrics_grabber.go @@ -18,7 +18,9 @@ package metrics import ( "context" + "errors" "fmt" + "net" "regexp" "sync" "time" @@ -27,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/klog/v2" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -37,7 +40,6 @@ const ( kubeSchedulerPort = 10259 // kubeControllerManagerPort is the default port for the controller manager status server. kubeControllerManagerPort = 10257 - metricsProxyPod = "metrics-proxy" // snapshotControllerPort is the port for the snapshot controller snapshotControllerPort = 9102 ) @@ -56,6 +58,7 @@ type Collection struct { type Grabber struct { client clientset.Interface externalClient clientset.Interface + config *rest.Config grabFromAPIServer bool grabFromControllerManager bool grabFromKubelets bool @@ -71,7 +74,7 @@ type Grabber struct { } // NewMetricsGrabber returns new metrics which are initialized. -func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) { +func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *rest.Config, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) { kubeScheduler := "" kubeControllerManager := "" @@ -81,6 +84,10 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets b regKubeControllerManager := regexp.MustCompile("kube-controller-manager-.*") regSnapshotController := regexp.MustCompile("volume-snapshot-controller.*") + if (scheduler || controllers) && config == nil { + return nil, errors.New("a rest config is required for grabbing kube-controller and kube-controller-manager metrics") + } + podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), metav1.ListOptions{}) if err != nil { return nil, err @@ -121,6 +128,7 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets b return &Grabber{ client: c, externalClient: ec, + config: config, grabFromAPIServer: apiServer, grabFromControllerManager: controllers, grabFromKubelets: kubelets, @@ -173,7 +181,7 @@ func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) { g.waitForSchedulerReadyOnce.Do(func() { var lastMetricsFetchErr error if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { - output, lastMetricsFetchErr = g.getMetricsFromPod(g.client, metricsProxyPod, metav1.NamespaceSystem, kubeSchedulerPort) + output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeScheduler, metav1.NamespaceSystem, kubeSchedulerPort) return lastMetricsFetchErr == nil, nil }); metricsWaitErr != nil { err = fmt.Errorf("error waiting for scheduler pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr) @@ -224,7 +232,7 @@ func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error) var lastMetricsFetchErr error if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { - output, lastMetricsFetchErr = g.getMetricsFromPod(g.client, metricsProxyPod, metav1.NamespaceSystem, kubeControllerManagerPort) + output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeControllerManager, metav1.NamespaceSystem, kubeControllerManagerPort) return lastMetricsFetchErr == nil, nil }); metricsWaitErr != nil { err = fmt.Errorf("error waiting for controller manager pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr) @@ -354,6 +362,7 @@ func (g *Grabber) Grab() (Collection, error) { return result, nil } +// getMetricsFromPod retrieves metrics data from an insecure port. func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string, namespace string, port int) (string, error) { rawOutput, err := client.CoreV1().RESTClient().Get(). Namespace(namespace). @@ -367,3 +376,50 @@ func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string, } return string(rawOutput), nil } + +// getSecureMetricsFromPod retrieves metrics from a pod that uses TLS +// and checks client credentials. Conceptually this function is +// similar to "kubectl port-forward" + "kubectl get --raw +// https://localhost:/metrics". It uses the same credentials +// as kubelet. +func (g *Grabber) getSecureMetricsFromPod(podName string, namespace string, port int) (string, error) { + dialer := e2epod.NewDialer(g.client, g.config) + metricConfig := rest.CopyConfig(g.config) + addr := e2epod.Addr{ + Namespace: namespace, + PodName: podName, + Port: port, + } + metricConfig.Dial = func(ctx context.Context, network, address string) (net.Conn, error) { + return dialer.DialContainerPort(ctx, addr) + } + // This should make it possible verify the server, but while it + // got past the server name check, certificate validation + // still failed. + metricConfig.Host = addr.String() + metricConfig.ServerName = "localhost" + // Verifying the pod certificate with the same root CA + // as for the API server led to an error about "unknown root + // certificate". Disabling certificate checking on the client + // side gets around that and should be good enough for + // E2E testing. + metricConfig.Insecure = true + metricConfig.CAFile = "" + metricConfig.CAData = nil + + // clientset.NewForConfig is used because + // metricClient.RESTClient() is directly usable, in contrast + // to the client constructed by rest.RESTClientFor(). + metricClient, err := clientset.NewForConfig(metricConfig) + if err != nil { + return "", err + } + + rawOutput, err := metricClient.RESTClient().Get(). + AbsPath("metrics"). + Do(context.TODO()).Raw() + if err != nil { + return "", err + } + return string(rawOutput), nil +} diff --git a/test/e2e/framework/metrics/metrics_proxy.go b/test/e2e/framework/metrics/metrics_proxy.go deleted file mode 100644 index afd04db61ab..00000000000 --- a/test/e2e/framework/metrics/metrics_proxy.go +++ /dev/null @@ -1,214 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package metrics - -import ( - "context" - "fmt" - "strings" - "time" - - v1 "k8s.io/api/core/v1" - rbacv1 "k8s.io/api/rbac/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" - - e2epod "k8s.io/kubernetes/test/e2e/framework/pod" - imageutils "k8s.io/kubernetes/test/utils/image" -) - -type componentInfo struct { - Name string - Port int - IP string -} - -// SetupMetricsProxy creates a nginx Pod to expose metrics from the secure port of kube-scheduler and kube-controller-manager in tests. -func SetupMetricsProxy(c clientset.Interface) error { - var infos []componentInfo - // The component pods might take some time to show up. - err := wait.PollImmediate(time.Second*5, time.Minute*5, func() (bool, error) { - podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return false, fmt.Errorf("list pods in ns %s: %w", metav1.NamespaceSystem, err) - } - var foundComponents []componentInfo - for _, pod := range podList.Items { - if len(pod.Status.PodIP) == 0 { - continue - } - switch { - case strings.HasPrefix(pod.Name, "kube-scheduler-"): - foundComponents = append(foundComponents, componentInfo{ - Name: pod.Name, - Port: kubeSchedulerPort, - IP: pod.Status.PodIP, - }) - case strings.HasPrefix(pod.Name, "kube-controller-manager-"): - foundComponents = append(foundComponents, componentInfo{ - Name: pod.Name, - Port: kubeControllerManagerPort, - IP: pod.Status.PodIP, - }) - } - } - if len(foundComponents) != 2 { - klog.Infof("Only %d components found. Will retry.", len(foundComponents)) - klog.Infof("Found components: %v", foundComponents) - return false, nil - } - infos = foundComponents - return true, nil - }) - if err != nil { - return fmt.Errorf("missing component pods: %w", err) - } - - klog.Infof("Found components: %v", infos) - - const name = metricsProxyPod - _, err = c.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Create(context.TODO(), &v1.ServiceAccount{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - }, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("create serviceAccount: %w", err) - } - _, err = c.RbacV1().ClusterRoles().Create(context.TODO(), &rbacv1.ClusterRole{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - Rules: []rbacv1.PolicyRule{ - { - NonResourceURLs: []string{"/metrics"}, - Verbs: []string{"get"}, - }, - }, - }, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("create clusterRole: %w", err) - } - _, err = c.RbacV1().ClusterRoleBindings().Create(context.TODO(), &rbacv1.ClusterRoleBinding{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - Subjects: []rbacv1.Subject{ - { - Kind: rbacv1.ServiceAccountKind, - Name: name, - Namespace: metav1.NamespaceSystem, - }, - }, - RoleRef: rbacv1.RoleRef{ - APIGroup: "rbac.authorization.k8s.io", - Kind: "ClusterRole", - Name: name, - }, - }, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("create clusterRoleBinding: %w", err) - } - - var token string - err = wait.PollImmediate(time.Second*5, time.Minute*5, func() (done bool, err error) { - sa, err := c.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil { - klog.Warningf("Fail to get serviceAccount %s: %v", name, err) - return false, nil - } - if len(sa.Secrets) < 1 { - klog.Warningf("No secret found in serviceAccount %s", name) - return false, nil - } - secretRef := sa.Secrets[0] - secret, err := c.CoreV1().Secrets(metav1.NamespaceSystem).Get(context.TODO(), secretRef.Name, metav1.GetOptions{}) - if err != nil { - klog.Warningf("Fail to get secret %s", secretRef.Name) - return false, nil - } - token = string(secret.Data["token"]) - if len(token) == 0 { - klog.Warningf("Token in secret %s is empty", secretRef.Name) - return false, nil - } - return true, nil - }) - if err != nil { - return err - } - - var nginxConfig string - for _, info := range infos { - nginxConfig += fmt.Sprintf(` -server { - listen %d; - server_name _; - proxy_set_header Authorization "Bearer %s"; - proxy_ssl_verify off; - location /metrics { - proxy_pass https://%s:%d; - } -} -`, info.Port, token, info.IP, info.Port) - } - _, err = c.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(context.TODO(), &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: metav1.NamespaceSystem, - }, - Data: map[string]string{ - "metrics.conf": nginxConfig, - }, - }, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("create nginx configmap: %w", err) - } - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: metav1.NamespaceSystem, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{{ - Name: "nginx", - Image: imageutils.GetE2EImage(imageutils.Nginx), - VolumeMounts: []v1.VolumeMount{{ - Name: "config", - MountPath: "/etc/nginx/conf.d", - ReadOnly: true, - }}, - }}, - Volumes: []v1.Volume{{ - Name: "config", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: name, - }, - }, - }, - }}, - }, - } - _, err = c.CoreV1().Pods(metav1.NamespaceSystem).Create(context.TODO(), pod, metav1.CreateOptions{}) - if err != nil { - return err - } - err = e2epod.WaitForPodNameRunningInNamespace(c, name, metav1.NamespaceSystem) - if err != nil { - return err - } - klog.Info("Successfully setup metrics-proxy") - return nil -} diff --git a/test/e2e/framework/pod/dial.go b/test/e2e/framework/pod/dial.go new file mode 100644 index 00000000000..d0ae2880acf --- /dev/null +++ b/test/e2e/framework/pod/dial.go @@ -0,0 +1,215 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "net" + "net/http" + "regexp" + "strconv" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" + "k8s.io/klog/v2" +) + +// NewTransport creates a transport which uses the port forward dialer. +// URLs must use .: as host. +func NewTransport(client kubernetes.Interface, restConfig *rest.Config) *http.Transport { + return &http.Transport{ + DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) { + dialer := NewDialer(client, restConfig) + a, err := ParseAddr(addr) + if err != nil { + return nil, err + } + return dialer.DialContainerPort(ctx, *a) + }, + } +} + +// NewDialer creates a dialer that supports connecting to container ports. +func NewDialer(client kubernetes.Interface, restConfig *rest.Config) *Dialer { + return &Dialer{ + client: client, + restConfig: restConfig, + } +} + +// Dialer holds the relevant parameters that are independent of a particular connection. +type Dialer struct { + client kubernetes.Interface + restConfig *rest.Config +} + +// DialContainerPort connects to a certain container port in a pod. +func (d *Dialer) DialContainerPort(ctx context.Context, addr Addr) (conn net.Conn, finalErr error) { + restClient := d.client.CoreV1().RESTClient() + restConfig := d.restConfig + if restConfig.GroupVersion == nil { + restConfig.GroupVersion = &schema.GroupVersion{} + } + if restConfig.NegotiatedSerializer == nil { + restConfig.NegotiatedSerializer = scheme.Codecs + } + + // The setup code around the actual portforward is from + // https://github.com/kubernetes/kubernetes/blob/c652ffbe4a29143623a1aaec39f745575f7e43ad/staging/src/k8s.io/kubectl/pkg/cmd/portforward/portforward.go + req := restClient.Post(). + Resource("pods"). + Namespace(addr.Namespace). + Name(addr.PodName). + SubResource("portforward") + transport, upgrader, err := spdy.RoundTripperFor(restConfig) + if err != nil { + return nil, fmt.Errorf("create round tripper: %v", err) + } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL()) + + streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name) + if err != nil { + return nil, fmt.Errorf("dialer failed: %v", err) + } + requestID := "1" + defer func() { + if finalErr != nil { + streamConn.Close() + } + }() + + // create error stream + headers := http.Header{} + headers.Set(v1.StreamType, v1.StreamTypeError) + headers.Set(v1.PortHeader, fmt.Sprintf("%d", addr.Port)) + headers.Set(v1.PortForwardRequestIDHeader, requestID) + + // We're not writing to this stream, just reading an error message from it. + // This happens asynchronously. + errorStream, err := streamConn.CreateStream(headers) + if err != nil { + return nil, fmt.Errorf("error creating error stream: %v", err) + } + errorStream.Close() + go func() { + message, err := ioutil.ReadAll(errorStream) + switch { + case err != nil: + klog.ErrorS(err, "error reading from error stream") + case len(message) > 0: + klog.ErrorS(errors.New(string(message)), "an error occurred connecting to the remote port") + } + }() + + // create data stream + headers.Set(v1.StreamType, v1.StreamTypeData) + dataStream, err := streamConn.CreateStream(headers) + if err != nil { + return nil, fmt.Errorf("error creating data stream: %v", err) + } + + return &stream{ + Stream: dataStream, + streamConn: streamConn, + }, nil +} + +// Addr contains all relevant parameters for a certain port in a pod. +// The container should be running before connections are attempted, +// otherwise the connection will fail. +type Addr struct { + Namespace, PodName string + Port int +} + +var _ net.Addr = Addr{} + +func (a Addr) Network() string { + return "port-forwarding" +} + +func (a Addr) String() string { + return fmt.Sprintf("%s.%s:%d", a.Namespace, a.PodName, a.Port) +} + +// ParseAddr expects a .: as produced +// by Addr.String. +func ParseAddr(addr string) (*Addr, error) { + parts := addrRegex.FindStringSubmatch(addr) + if parts == nil { + return nil, fmt.Errorf("%q: must match the format .:", addr) + } + port, _ := strconv.Atoi(parts[3]) + return &Addr{ + Namespace: parts[1], + PodName: parts[2], + Port: port, + }, nil +} + +var addrRegex = regexp.MustCompile(`^([^\.]+)\.([^:]+):(\d+)$`) + +type stream struct { + addr Addr + httpstream.Stream + streamConn httpstream.Connection +} + +var _ net.Conn = &stream{} + +func (s *stream) Close() error { + s.Stream.Close() + s.streamConn.Close() + return nil +} + +func (s *stream) LocalAddr() net.Addr { + return LocalAddr{} +} + +func (s *stream) RemoteAddr() net.Addr { + return s.addr +} + +func (s *stream) SetDeadline(t time.Time) error { + return nil +} + +func (s *stream) SetReadDeadline(t time.Time) error { + return nil +} + +func (s *stream) SetWriteDeadline(t time.Time) error { + return nil +} + +type LocalAddr struct{} + +var _ net.Addr = LocalAddr{} + +func (l LocalAddr) Network() string { return "port-forwarding" } +func (l LocalAddr) String() string { return "apiserver" } diff --git a/test/e2e/instrumentation/monitoring/metrics_grabber.go b/test/e2e/instrumentation/monitoring/metrics_grabber.go index 8ccd877f96e..0237f22f916 100644 --- a/test/e2e/instrumentation/monitoring/metrics_grabber.go +++ b/test/e2e/instrumentation/monitoring/metrics_grabber.go @@ -51,7 +51,7 @@ var _ = instrumentation.SIGDescribe("MetricsGrabber", func() { } } gomega.Eventually(func() error { - grabber, err = e2emetrics.NewMetricsGrabber(c, ec, true, true, true, true, true, true) + grabber, err = e2emetrics.NewMetricsGrabber(c, ec, f.ClientConfig(), true, true, true, true, true, true) if err != nil { return fmt.Errorf("failed to create metrics grabber: %v", err) } diff --git a/test/e2e/storage/csi_mock_volume.go b/test/e2e/storage/csi_mock_volume.go index 161a238b4e5..3cc92a896f1 100644 --- a/test/e2e/storage/csi_mock_volume.go +++ b/test/e2e/storage/csi_mock_volume.go @@ -1653,7 +1653,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { } defer cleanup() - metricsGrabber, err := e2emetrics.NewMetricsGrabber(m.config.Framework.ClientSet, nil, false, false, false, false, false, true) + metricsGrabber, err := e2emetrics.NewMetricsGrabber(m.config.Framework.ClientSet, nil, f.ClientConfig(), false, false, false, false, false, true) if err != nil { framework.Failf("Error creating metrics grabber : %v", err) } diff --git a/test/e2e/storage/testsuites/base.go b/test/e2e/storage/testsuites/base.go index 9a8f58d3f32..d5283a103cc 100644 --- a/test/e2e/storage/testsuites/base.go +++ b/test/e2e/storage/testsuites/base.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/component-base/metrics/testutil" csitrans "k8s.io/csi-translation-lib" "k8s.io/kubernetes/test/e2e/framework" @@ -43,6 +44,7 @@ type opCounts map[string]int64 // migrationOpCheck validates migrated metrics. type migrationOpCheck struct { cs clientset.Interface + config *rest.Config pluginName string skipCheck bool @@ -100,14 +102,14 @@ func getVolumeOpsFromMetricsForPlugin(ms testutil.Metrics, pluginName string) op return totOps } -func getVolumeOpCounts(c clientset.Interface, pluginName string) opCounts { +func getVolumeOpCounts(c clientset.Interface, config *rest.Config, pluginName string) opCounts { if !framework.ProviderIs("gce", "gke", "aws") { return opCounts{} } nodeLimit := 25 - metricsGrabber, err := e2emetrics.NewMetricsGrabber(c, nil, true, false, true, false, false, false) + metricsGrabber, err := e2emetrics.NewMetricsGrabber(c, nil, config, true, false, true, false, false, false) if err != nil { framework.ExpectNoError(err, "Error creating metrics grabber: %v", err) @@ -156,7 +158,7 @@ func addOpCounts(o1 opCounts, o2 opCounts) opCounts { return totOps } -func getMigrationVolumeOpCounts(cs clientset.Interface, pluginName string) (opCounts, opCounts) { +func getMigrationVolumeOpCounts(cs clientset.Interface, config *rest.Config, pluginName string) (opCounts, opCounts) { if len(pluginName) > 0 { var migratedOps opCounts l := csitrans.New() @@ -166,18 +168,19 @@ func getMigrationVolumeOpCounts(cs clientset.Interface, pluginName string) (opCo migratedOps = opCounts{} } else { csiName = "kubernetes.io/csi:" + csiName - migratedOps = getVolumeOpCounts(cs, csiName) + migratedOps = getVolumeOpCounts(cs, config, csiName) } - return getVolumeOpCounts(cs, pluginName), migratedOps + return getVolumeOpCounts(cs, config, pluginName), migratedOps } // Not an in-tree driver framework.Logf("Test running for native CSI Driver, not checking metrics") return opCounts{}, opCounts{} } -func newMigrationOpCheck(cs clientset.Interface, pluginName string) *migrationOpCheck { +func newMigrationOpCheck(cs clientset.Interface, config *rest.Config, pluginName string) *migrationOpCheck { moc := migrationOpCheck{ cs: cs, + config: config, pluginName: pluginName, } if len(pluginName) == 0 { @@ -214,7 +217,7 @@ func newMigrationOpCheck(cs clientset.Interface, pluginName string) *migrationOp return &moc } - moc.oldInTreeOps, moc.oldMigratedOps = getMigrationVolumeOpCounts(cs, pluginName) + moc.oldInTreeOps, moc.oldMigratedOps = getMigrationVolumeOpCounts(cs, config, pluginName) return &moc } @@ -223,7 +226,7 @@ func (moc *migrationOpCheck) validateMigrationVolumeOpCounts() { return } - newInTreeOps, _ := getMigrationVolumeOpCounts(moc.cs, moc.pluginName) + newInTreeOps, _ := getMigrationVolumeOpCounts(moc.cs, moc.config, moc.pluginName) for op, count := range newInTreeOps { if count != moc.oldInTreeOps[op] { diff --git a/test/e2e/storage/testsuites/multivolume.go b/test/e2e/storage/testsuites/multivolume.go index eedb7c9c25a..f1c0a190fac 100644 --- a/test/e2e/storage/testsuites/multivolume.go +++ b/test/e2e/storage/testsuites/multivolume.go @@ -112,7 +112,7 @@ func (t *multiVolumeTestSuite) DefineTests(driver storageframework.TestDriver, p // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) } cleanup := func() { diff --git a/test/e2e/storage/testsuites/provisioning.go b/test/e2e/storage/testsuites/provisioning.go index 5283cbdee71..8897b6ed558 100644 --- a/test/e2e/storage/testsuites/provisioning.go +++ b/test/e2e/storage/testsuites/provisioning.go @@ -134,7 +134,7 @@ func (p *provisioningTestSuite) DefineTests(driver storageframework.TestDriver, dDriver, _ = driver.(storageframework.DynamicPVTestDriver) // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) l.cs = l.config.Framework.ClientSet testVolumeSizeRange := p.GetTestSuiteInfo().SupportedSizeRange driverVolumeSizeRange := dDriver.GetDriverInfo().SupportedSizeRange diff --git a/test/e2e/storage/testsuites/subpath.go b/test/e2e/storage/testsuites/subpath.go index bb853aa3383..b81512fdb44 100644 --- a/test/e2e/storage/testsuites/subpath.go +++ b/test/e2e/storage/testsuites/subpath.go @@ -122,7 +122,7 @@ func (s *subPathTestSuite) DefineTests(driver storageframework.TestDriver, patte // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, driver.GetDriverInfo().InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), driver.GetDriverInfo().InTreePluginName) testVolumeSizeRange := s.GetTestSuiteInfo().SupportedSizeRange l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange) l.hostExec = utils.NewHostExec(f) diff --git a/test/e2e/storage/testsuites/topology.go b/test/e2e/storage/testsuites/topology.go index ac0f826f415..1d24ffdb6a0 100644 --- a/test/e2e/storage/testsuites/topology.go +++ b/test/e2e/storage/testsuites/topology.go @@ -148,7 +148,7 @@ func (t *topologyTestSuite) DefineTests(driver storageframework.TestDriver, patt StorageClassName: &(l.resource.Sc.Name), }, l.config.Framework.Namespace.Name) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) return l } diff --git a/test/e2e/storage/testsuites/volume_expand.go b/test/e2e/storage/testsuites/volume_expand.go index 6172c305f3a..704f4bd2aa3 100644 --- a/test/e2e/storage/testsuites/volume_expand.go +++ b/test/e2e/storage/testsuites/volume_expand.go @@ -121,7 +121,7 @@ func (v *volumeExpandTestSuite) DefineTests(driver storageframework.TestDriver, // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, driver.GetDriverInfo().InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), driver.GetDriverInfo().InTreePluginName) testVolumeSizeRange := v.GetTestSuiteInfo().SupportedSizeRange l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange) } diff --git a/test/e2e/storage/testsuites/volume_io.go b/test/e2e/storage/testsuites/volume_io.go index 23aa911e2de..ad4c627d66f 100644 --- a/test/e2e/storage/testsuites/volume_io.go +++ b/test/e2e/storage/testsuites/volume_io.go @@ -117,7 +117,7 @@ func (t *volumeIOTestSuite) DefineTests(driver storageframework.TestDriver, patt // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange) diff --git a/test/e2e/storage/testsuites/volume_stress.go b/test/e2e/storage/testsuites/volume_stress.go index 5741f50fe9a..289c9d154de 100644 --- a/test/e2e/storage/testsuites/volume_stress.go +++ b/test/e2e/storage/testsuites/volume_stress.go @@ -120,7 +120,7 @@ func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver, // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) l.volumes = []*storageframework.VolumeResource{} l.pods = []*v1.Pod{} l.testOptions = *dInfo.StressTestOptions diff --git a/test/e2e/storage/testsuites/volumemode.go b/test/e2e/storage/testsuites/volumemode.go index 48df518457c..65115aa3679 100644 --- a/test/e2e/storage/testsuites/volumemode.go +++ b/test/e2e/storage/testsuites/volumemode.go @@ -115,7 +115,7 @@ func (t *volumeModeTestSuite) DefineTests(driver storageframework.TestDriver, pa // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) } // manualInit initializes l.VolumeResource without creating the PV & PVC objects. diff --git a/test/e2e/storage/testsuites/volumes.go b/test/e2e/storage/testsuites/volumes.go index ab9799497f5..cef61af2c3e 100644 --- a/test/e2e/storage/testsuites/volumes.go +++ b/test/e2e/storage/testsuites/volumes.go @@ -135,7 +135,7 @@ func (t *volumesTestSuite) DefineTests(driver storageframework.TestDriver, patte // Now do the more expensive test initialization. l.config, l.driverCleanup = driver.PrepareTest(f) - l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName) + l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName) testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange) if l.resource.VolSource == nil { diff --git a/test/e2e/storage/volume_metrics.go b/test/e2e/storage/volume_metrics.go index 2576b6a975d..eed1f365fb6 100644 --- a/test/e2e/storage/volume_metrics.go +++ b/test/e2e/storage/volume_metrics.go @@ -81,7 +81,7 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() { VolumeMode: &blockMode, }, ns) - metricsGrabber, err = e2emetrics.NewMetricsGrabber(c, nil, true, false, true, false, false, false) + metricsGrabber, err = e2emetrics.NewMetricsGrabber(c, nil, f.ClientConfig(), true, false, true, false, false, false) if err != nil { framework.Failf("Error creating metrics grabber : %v", err) diff --git a/test/e2e/suites.go b/test/e2e/suites.go index 790a58978d6..d8bf0f8bd88 100644 --- a/test/e2e/suites.go +++ b/test/e2e/suites.go @@ -22,6 +22,7 @@ import ( "path" "time" + clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" ) @@ -55,13 +56,17 @@ func AfterSuiteActions() { func gatherTestSuiteMetrics() error { framework.Logf("Gathering metrics") - c, err := framework.LoadClientset() + config, err := framework.LoadConfig() if err != nil { - return fmt.Errorf("error loading client: %v", err) + return fmt.Errorf("error loading client config: %v", err) + } + c, err := clientset.NewForConfig(config) + if err != nil { + return fmt.Errorf("error creating client: %v", err) } // Grab metrics for apiserver, scheduler, controller-manager, kubelet (for non-kubemark case) and cluster autoscaler (optionally). - grabber, err := e2emetrics.NewMetricsGrabber(c, nil, !framework.ProviderIs("kubemark"), true, true, true, framework.TestContext.IncludeClusterAutoscalerMetrics, false) + grabber, err := e2emetrics.NewMetricsGrabber(c, nil, config, !framework.ProviderIs("kubemark"), true, true, true, framework.TestContext.IncludeClusterAutoscalerMetrics, false) if err != nil { return fmt.Errorf("failed to create MetricsGrabber: %v", err) }