e2e: grab controller and scheduler metrics via port forwarding

The previous approach of grabbing metrics via an nginx proxy had some
drawbacks:
- it did not work when the pods only listened on localhost (as
  configured by kubeadm) and the proxy got deployed on a different
  node
- starting the proxy raced with starting the pods, causing
  sporadic test failures because the proxy was only set up
  properly if it saw all pods when e2e.test started
- the proxy was always started, whether it was needed or not
- the proxy was left running after a test, and the next test run
  then triggered potentially confusing messages when it failed to
  create objects for the proxy

The new approach is similar to "kubectl port-forward" + "kubectl get
--raw". It uses the port forwarding feature to establish a TCP
connection via a custom dialer, then lets client-go handle TLS and
credentials.

Verifying the server certificate somehow did not work. As this
shouldn't be a big concern for E2E testing, certificate checking is
disabled on the client side instead of being investigated further.
Patrick Ohly 2021-05-17 09:20:11 +02:00
parent 71d6a48d3e
commit 5e9076da93
21 changed files with 310 additions and 251 deletions
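
Condensed, the new code path looks roughly like this (a sketch only:
the helper name is made up, and the real logic lives in
Grabber.getSecureMetricsFromPod in the diff below; imports are the
ones added there):

// grabPodMetrics tunnels a GET /metrics through the API server's
// "portforward" subresource instead of an in-cluster proxy.
func grabPodMetrics(client clientset.Interface, config *rest.Config, addr e2epod.Addr) (string, error) {
	dialer := e2epod.NewDialer(client, config)

	metricConfig := rest.CopyConfig(config)
	// Every connection made by the derived client goes through the
	// port forwarding tunnel, whatever address client-go asks for.
	metricConfig.Dial = func(ctx context.Context, network, address string) (net.Conn, error) {
		return dialer.DialContainerPort(ctx, addr)
	}
	metricConfig.Host = addr.String()
	// As explained above, server certificate verification is
	// disabled because it could not be made to work for E2E clusters.
	metricConfig.Insecure = true
	metricConfig.CAFile = ""
	metricConfig.CAData = nil

	metricClient, err := clientset.NewForConfig(metricConfig)
	if err != nil {
		return "", err
	}
	raw, err := metricClient.RESTClient().Get().
		AbsPath("metrics").
		Do(context.TODO()).Raw()
	if err != nil {
		return "", err
	}
	return string(raw), nil
}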

@@ -254,7 +254,7 @@ func verifyRemainingObjects(f *framework.Framework, objects map[string]int) (boo
 func gatherMetrics(f *framework.Framework) {
 	ginkgo.By("Gathering metrics")
 	var summary framework.TestDataSummary
-	grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, false, false, true, false, false, false)
+	grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), false, false, true, false, false, false)
 	if err != nil {
 		framework.Logf("Failed to create MetricsGrabber. Skipping metrics gathering.")
 	} else {

@@ -46,7 +46,6 @@ import (
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
 	e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest"
-	"k8s.io/kubernetes/test/e2e/framework/metrics"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2ereporters "k8s.io/kubernetes/test/e2e/reporters"
@@ -308,11 +307,6 @@ func setupSuite() {
 		nodeKiller := framework.NewNodeKiller(framework.TestContext.NodeKiller, c, framework.TestContext.Provider)
 		go nodeKiller.Run(framework.TestContext.NodeKiller.NodeKillerStopCh)
 	}
-
-	err = metrics.SetupMetricsProxy(c)
-	if err != nil {
-		framework.Logf("Fail to setup metrics proxy: %v", err)
-	}
 }
 
 // logClusterImageSources writes out cluster image sources.

@@ -296,7 +296,7 @@ func (f *Framework) BeforeEach() {
 
 		gatherMetricsAfterTest := TestContext.GatherMetricsAfterTest == "true" || TestContext.GatherMetricsAfterTest == "master"
 		if gatherMetricsAfterTest && TestContext.IncludeClusterAutoscalerMetrics {
-			grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics, false)
+			grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics, false)
 			if err != nil {
 				Logf("Failed to create MetricsGrabber (skipping ClusterAutoscaler metrics gathering before test): %v", err)
 			} else {
@@ -449,7 +449,7 @@ func (f *Framework) AfterEach() {
 		ginkgo.By("Gathering metrics")
 		// Grab apiserver, scheduler, controller-manager metrics and (optionally) nodes' kubelet metrics.
 		grabMetricsFromKubelets := TestContext.GatherMetricsAfterTest != "master" && !ProviderIs("kubemark")
-		grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics, false)
+		grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics, false)
 		if err != nil {
 			Logf("Failed to create MetricsGrabber (skipping metrics gathering): %v", err)
 		} else {

@@ -139,7 +139,7 @@ func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (KubeletM
 	if c == nil {
 		return GrabKubeletMetricsWithoutProxy(nodeName, "/metrics")
 	}
-	grabber, err := NewMetricsGrabber(c, nil, true, false, false, false, false, false)
+	grabber, err := NewMetricsGrabber(c, nil, nil, true, false, false, false, false, false)
 	if err != nil {
 		return KubeletMetrics{}, err
 	}

@@ -18,7 +18,9 @@ package metrics
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"net"
 	"regexp"
 	"sync"
 	"time"
@@ -27,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -37,7 +40,6 @@ const (
 	kubeSchedulerPort = 10259
 	// kubeControllerManagerPort is the default port for the controller manager status server.
 	kubeControllerManagerPort = 10257
-	metricsProxyPod = "metrics-proxy"
 	// snapshotControllerPort is the port for the snapshot controller
 	snapshotControllerPort = 9102
 )
@@ -56,6 +58,7 @@ type Collection struct {
 type Grabber struct {
 	client clientset.Interface
 	externalClient clientset.Interface
+	config *rest.Config
 	grabFromAPIServer bool
 	grabFromControllerManager bool
 	grabFromKubelets bool
@@ -71,7 +74,7 @@ type Grabber struct {
 }
 
 // NewMetricsGrabber returns new metrics which are initialized.
-func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) {
+func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *rest.Config, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) {
 	kubeScheduler := ""
 	kubeControllerManager := ""
 
@@ -81,6 +84,10 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets b
 	regKubeControllerManager := regexp.MustCompile("kube-controller-manager-.*")
 	regSnapshotController := regexp.MustCompile("volume-snapshot-controller.*")
 
+	if (scheduler || controllers) && config == nil {
+		return nil, errors.New("a rest config is required for grabbing kube-controller and kube-controller-manager metrics")
+	}
+
 	podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), metav1.ListOptions{})
 	if err != nil {
 		return nil, err
@@ -121,6 +128,7 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets b
 	return &Grabber{
 		client: c,
 		externalClient: ec,
+		config: config,
 		grabFromAPIServer: apiServer,
 		grabFromControllerManager: controllers,
 		grabFromKubelets: kubelets,
@@ -173,7 +181,7 @@ func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) {
 	g.waitForSchedulerReadyOnce.Do(func() {
 		var lastMetricsFetchErr error
 		if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
-			output, lastMetricsFetchErr = g.getMetricsFromPod(g.client, metricsProxyPod, metav1.NamespaceSystem, kubeSchedulerPort)
+			output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeScheduler, metav1.NamespaceSystem, kubeSchedulerPort)
 			return lastMetricsFetchErr == nil, nil
 		}); metricsWaitErr != nil {
 			err = fmt.Errorf("error waiting for scheduler pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
@@ -224,7 +232,7 @@ func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error)
 	g.waitForControllerManagerReadyOnce.Do(func() {
 		var lastMetricsFetchErr error
 		if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
-			output, lastMetricsFetchErr = g.getMetricsFromPod(g.client, metricsProxyPod, metav1.NamespaceSystem, kubeControllerManagerPort)
+			output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeControllerManager, metav1.NamespaceSystem, kubeControllerManagerPort)
 			return lastMetricsFetchErr == nil, nil
 		}); metricsWaitErr != nil {
 			err = fmt.Errorf("error waiting for controller manager pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
@@ -354,6 +362,7 @@ func (g *Grabber) Grab() (Collection, error) {
 	return result, nil
 }
 
+// getMetricsFromPod retrieves metrics data from an insecure port.
 func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string, namespace string, port int) (string, error) {
 	rawOutput, err := client.CoreV1().RESTClient().Get().
 		Namespace(namespace).
@@ -367,3 +376,50 @@ func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string,
 	}
 	return string(rawOutput), nil
 }
+
+// getSecureMetricsFromPod retrieves metrics from a pod that uses TLS
+// and checks client credentials. Conceptually this function is
+// similar to "kubectl port-forward" + "kubectl get --raw
+// https://localhost:<port>/metrics". It uses the same credentials
+// as kubelet.
+func (g *Grabber) getSecureMetricsFromPod(podName string, namespace string, port int) (string, error) {
+	dialer := e2epod.NewDialer(g.client, g.config)
+	metricConfig := rest.CopyConfig(g.config)
+	addr := e2epod.Addr{
+		Namespace: namespace,
+		PodName:   podName,
+		Port:      port,
+	}
+	metricConfig.Dial = func(ctx context.Context, network, address string) (net.Conn, error) {
+		return dialer.DialContainerPort(ctx, addr)
+	}
+	// This should make it possible verify the server, but while it
+	// got past the server name check, certificate validation
+	// still failed.
+	metricConfig.Host = addr.String()
+	metricConfig.ServerName = "localhost"
+	// Verifying the pod certificate with the same root CA
+	// as for the API server led to an error about "unknown root
+	// certificate". Disabling certificate checking on the client
+	// side gets around that and should be good enough for
+	// E2E testing.
+	metricConfig.Insecure = true
+	metricConfig.CAFile = ""
+	metricConfig.CAData = nil
+
+	// clientset.NewForConfig is used because
+	// metricClient.RESTClient() is directly usable, in contrast
+	// to the client constructed by rest.RESTClientFor().
+	metricClient, err := clientset.NewForConfig(metricConfig)
+	if err != nil {
+		return "", err
+	}
+
+	rawOutput, err := metricClient.RESTClient().Get().
+		AbsPath("metrics").
+		Do(context.TODO()).Raw()
+	if err != nil {
+		return "", err
+	}
+	return string(rawOutput), nil
+}
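
For callers the change is mechanical: NewMetricsGrabber gains a
*rest.Config parameter (typically f.ClientConfig() in e2e tests)
between the client arguments and the boolean flags. A hypothetical
test-side call with the new signature:

	// Flags, in order: kubelets, scheduler, controllers, apiServer,
	// clusterAutoscaler, snapshotController.
	grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, nil, f.ClientConfig(), false, true, true, false, false, false)
	if err != nil {
		framework.Failf("creating metrics grabber: %v", err)
	}
	// Scheduler metrics now come from the pod's secure port via port forwarding.
	if _, err := grabber.GrabFromScheduler(); err != nil {
		framework.Failf("grabbing scheduler metrics: %v", err)
	}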

@@ -1,214 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
	"context"
	"fmt"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"

	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	imageutils "k8s.io/kubernetes/test/utils/image"
)

type componentInfo struct {
	Name string
	Port int
	IP   string
}

// SetupMetricsProxy creates a nginx Pod to expose metrics from the secure port of kube-scheduler and kube-controller-manager in tests.
func SetupMetricsProxy(c clientset.Interface) error {
	var infos []componentInfo
	// The component pods might take some time to show up.
	err := wait.PollImmediate(time.Second*5, time.Minute*5, func() (bool, error) {
		podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			return false, fmt.Errorf("list pods in ns %s: %w", metav1.NamespaceSystem, err)
		}
		var foundComponents []componentInfo
		for _, pod := range podList.Items {
			if len(pod.Status.PodIP) == 0 {
				continue
			}
			switch {
			case strings.HasPrefix(pod.Name, "kube-scheduler-"):
				foundComponents = append(foundComponents, componentInfo{
					Name: pod.Name,
					Port: kubeSchedulerPort,
					IP:   pod.Status.PodIP,
				})
			case strings.HasPrefix(pod.Name, "kube-controller-manager-"):
				foundComponents = append(foundComponents, componentInfo{
					Name: pod.Name,
					Port: kubeControllerManagerPort,
					IP:   pod.Status.PodIP,
				})
			}
		}
		if len(foundComponents) != 2 {
			klog.Infof("Only %d components found. Will retry.", len(foundComponents))
			klog.Infof("Found components: %v", foundComponents)
			return false, nil
		}
		infos = foundComponents
		return true, nil
	})
	if err != nil {
		return fmt.Errorf("missing component pods: %w", err)
	}

	klog.Infof("Found components: %v", infos)

	const name = metricsProxyPod
	_, err = c.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Create(context.TODO(), &v1.ServiceAccount{
		ObjectMeta: metav1.ObjectMeta{Name: name},
	}, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("create serviceAccount: %w", err)
	}
	_, err = c.RbacV1().ClusterRoles().Create(context.TODO(), &rbacv1.ClusterRole{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Rules: []rbacv1.PolicyRule{
			{
				NonResourceURLs: []string{"/metrics"},
				Verbs:           []string{"get"},
			},
		},
	}, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("create clusterRole: %w", err)
	}
	_, err = c.RbacV1().ClusterRoleBindings().Create(context.TODO(), &rbacv1.ClusterRoleBinding{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Subjects: []rbacv1.Subject{
			{
				Kind:      rbacv1.ServiceAccountKind,
				Name:      name,
				Namespace: metav1.NamespaceSystem,
			},
		},
		RoleRef: rbacv1.RoleRef{
			APIGroup: "rbac.authorization.k8s.io",
			Kind:     "ClusterRole",
			Name:     name,
		},
	}, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("create clusterRoleBinding: %w", err)
	}

	var token string
	err = wait.PollImmediate(time.Second*5, time.Minute*5, func() (done bool, err error) {
		sa, err := c.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			klog.Warningf("Fail to get serviceAccount %s: %v", name, err)
			return false, nil
		}
		if len(sa.Secrets) < 1 {
			klog.Warningf("No secret found in serviceAccount %s", name)
			return false, nil
		}
		secretRef := sa.Secrets[0]
		secret, err := c.CoreV1().Secrets(metav1.NamespaceSystem).Get(context.TODO(), secretRef.Name, metav1.GetOptions{})
		if err != nil {
			klog.Warningf("Fail to get secret %s", secretRef.Name)
			return false, nil
		}
		token = string(secret.Data["token"])
		if len(token) == 0 {
			klog.Warningf("Token in secret %s is empty", secretRef.Name)
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		return err
	}

	var nginxConfig string
	for _, info := range infos {
		nginxConfig += fmt.Sprintf(`
server {
	listen %d;
	server_name _;
	proxy_set_header Authorization "Bearer %s";
	proxy_ssl_verify off;
	location /metrics {
		proxy_pass https://%s:%d;
	}
}
`, info.Port, token, info.IP, info.Port)
	}
	_, err = c.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(context.TODO(), &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: metav1.NamespaceSystem,
		},
		Data: map[string]string{
			"metrics.conf": nginxConfig,
		},
	}, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("create nginx configmap: %w", err)
	}

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: metav1.NamespaceSystem,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:  "nginx",
				Image: imageutils.GetE2EImage(imageutils.Nginx),
				VolumeMounts: []v1.VolumeMount{{
					Name:      "config",
					MountPath: "/etc/nginx/conf.d",
					ReadOnly:  true,
				}},
			}},
			Volumes: []v1.Volume{{
				Name: "config",
				VolumeSource: v1.VolumeSource{
					ConfigMap: &v1.ConfigMapVolumeSource{
						LocalObjectReference: v1.LocalObjectReference{
							Name: name,
						},
					},
				},
			}},
		},
	}
	_, err = c.CoreV1().Pods(metav1.NamespaceSystem).Create(context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		return err
	}
	err = e2epod.WaitForPodNameRunningInNamespace(c, name, metav1.NamespaceSystem)
	if err != nil {
		return err
	}
	klog.Info("Successfully setup metrics-proxy")
	return nil
}

@@ -0,0 +1,215 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
	"context"
	"errors"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"regexp"
	"strconv"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/httpstream"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/portforward"
	"k8s.io/client-go/transport/spdy"
	"k8s.io/klog/v2"
)

// NewTransport creates a transport which uses the port forward dialer.
// URLs must use <namespace>.<pod>:<port> as host.
func NewTransport(client kubernetes.Interface, restConfig *rest.Config) *http.Transport {
	return &http.Transport{
		DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
			dialer := NewDialer(client, restConfig)
			a, err := ParseAddr(addr)
			if err != nil {
				return nil, err
			}
			return dialer.DialContainerPort(ctx, *a)
		},
	}
}

// NewDialer creates a dialer that supports connecting to container ports.
func NewDialer(client kubernetes.Interface, restConfig *rest.Config) *Dialer {
	return &Dialer{
		client:     client,
		restConfig: restConfig,
	}
}

// Dialer holds the relevant parameters that are independent of a particular connection.
type Dialer struct {
	client     kubernetes.Interface
	restConfig *rest.Config
}

// DialContainerPort connects to a certain container port in a pod.
func (d *Dialer) DialContainerPort(ctx context.Context, addr Addr) (conn net.Conn, finalErr error) {
	restClient := d.client.CoreV1().RESTClient()
	restConfig := d.restConfig
	if restConfig.GroupVersion == nil {
		restConfig.GroupVersion = &schema.GroupVersion{}
	}
	if restConfig.NegotiatedSerializer == nil {
		restConfig.NegotiatedSerializer = scheme.Codecs
	}

	// The setup code around the actual portforward is from
	// https://github.com/kubernetes/kubernetes/blob/c652ffbe4a29143623a1aaec39f745575f7e43ad/staging/src/k8s.io/kubectl/pkg/cmd/portforward/portforward.go
	req := restClient.Post().
		Resource("pods").
		Namespace(addr.Namespace).
		Name(addr.PodName).
		SubResource("portforward")
	transport, upgrader, err := spdy.RoundTripperFor(restConfig)
	if err != nil {
		return nil, fmt.Errorf("create round tripper: %v", err)
	}
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())

	streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
	if err != nil {
		return nil, fmt.Errorf("dialer failed: %v", err)
	}
	requestID := "1"
	defer func() {
		if finalErr != nil {
			streamConn.Close()
		}
	}()

	// create error stream
	headers := http.Header{}
	headers.Set(v1.StreamType, v1.StreamTypeError)
	headers.Set(v1.PortHeader, fmt.Sprintf("%d", addr.Port))
	headers.Set(v1.PortForwardRequestIDHeader, requestID)

	// We're not writing to this stream, just reading an error message from it.
	// This happens asynchronously.
	errorStream, err := streamConn.CreateStream(headers)
	if err != nil {
		return nil, fmt.Errorf("error creating error stream: %v", err)
	}
	errorStream.Close()
	go func() {
		message, err := ioutil.ReadAll(errorStream)
		switch {
		case err != nil:
			klog.ErrorS(err, "error reading from error stream")
		case len(message) > 0:
			klog.ErrorS(errors.New(string(message)), "an error occurred connecting to the remote port")
		}
	}()

	// create data stream
	headers.Set(v1.StreamType, v1.StreamTypeData)
	dataStream, err := streamConn.CreateStream(headers)
	if err != nil {
		return nil, fmt.Errorf("error creating data stream: %v", err)
	}

	return &stream{
		Stream:     dataStream,
		streamConn: streamConn,
	}, nil
}

// Addr contains all relevant parameters for a certain port in a pod.
// The container should be running before connections are attempted,
// otherwise the connection will fail.
type Addr struct {
	Namespace, PodName string
	Port               int
}

var _ net.Addr = Addr{}

func (a Addr) Network() string {
	return "port-forwarding"
}

func (a Addr) String() string {
	return fmt.Sprintf("%s.%s:%d", a.Namespace, a.PodName, a.Port)
}

// ParseAddr expects a <namespace>.<pod>:<port number> as produced
// by Addr.String.
func ParseAddr(addr string) (*Addr, error) {
	parts := addrRegex.FindStringSubmatch(addr)
	if parts == nil {
		return nil, fmt.Errorf("%q: must match the format <namespace>.<pod>:<port number>", addr)
	}
	port, _ := strconv.Atoi(parts[3])
	return &Addr{
		Namespace: parts[1],
		PodName:   parts[2],
		Port:      port,
	}, nil
}

var addrRegex = regexp.MustCompile(`^([^\.]+)\.([^:]+):(\d+)$`)

type stream struct {
	addr Addr
	httpstream.Stream
	streamConn httpstream.Connection
}

var _ net.Conn = &stream{}

func (s *stream) Close() error {
	s.Stream.Close()
	s.streamConn.Close()
	return nil
}

func (s *stream) LocalAddr() net.Addr {
	return LocalAddr{}
}

func (s *stream) RemoteAddr() net.Addr {
	return s.addr
}

func (s *stream) SetDeadline(t time.Time) error {
	return nil
}

func (s *stream) SetReadDeadline(t time.Time) error {
	return nil
}

func (s *stream) SetWriteDeadline(t time.Time) error {
	return nil
}

type LocalAddr struct{}

var _ net.Addr = LocalAddr{}

func (l LocalAddr) Network() string { return "port-forwarding" }
func (l LocalAddr) String() string  { return "apiserver" }
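
Because the dialer returns an ordinary net.Conn, it is not limited to
the metrics grabber: NewTransport can back a plain *http.Client as
well. A hypothetical sketch (pod name and port are invented; the host
format is the one produced by Addr.String and parsed by ParseAddr):

	transport := e2epod.NewTransport(f.ClientSet, f.ClientConfig())
	httpClient := &http.Client{Transport: transport, Timeout: 30 * time.Second}
	// The host part must be <namespace>.<pod>:<port>.
	resp, err := httpClient.Get("http://kube-system.example-pod:8080/healthz")
	if err != nil {
		framework.Failf("GET via port forwarding: %v", err)
	}
	defer resp.Body.Close()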

@@ -51,7 +51,7 @@ var _ = instrumentation.SIGDescribe("MetricsGrabber", func() {
 			}
 		}
 		gomega.Eventually(func() error {
-			grabber, err = e2emetrics.NewMetricsGrabber(c, ec, true, true, true, true, true, true)
+			grabber, err = e2emetrics.NewMetricsGrabber(c, ec, f.ClientConfig(), true, true, true, true, true, true)
 			if err != nil {
 				return fmt.Errorf("failed to create metrics grabber: %v", err)
 			}

@@ -1653,7 +1653,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 			}
 			defer cleanup()
 
-			metricsGrabber, err := e2emetrics.NewMetricsGrabber(m.config.Framework.ClientSet, nil, false, false, false, false, false, true)
+			metricsGrabber, err := e2emetrics.NewMetricsGrabber(m.config.Framework.ClientSet, nil, f.ClientConfig(), false, false, false, false, false, true)
 			if err != nil {
 				framework.Failf("Error creating metrics grabber : %v", err)
 			}

@@ -24,6 +24,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
 	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 	"k8s.io/component-base/metrics/testutil"
 	csitrans "k8s.io/csi-translation-lib"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -43,6 +44,7 @@ type opCounts map[string]int64
 // migrationOpCheck validates migrated metrics.
 type migrationOpCheck struct {
 	cs         clientset.Interface
+	config     *rest.Config
 	pluginName string
 	skipCheck  bool
 
@@ -100,14 +102,14 @@ func getVolumeOpsFromMetricsForPlugin(ms testutil.Metrics, pluginName string) op
 	return totOps
 }
 
-func getVolumeOpCounts(c clientset.Interface, pluginName string) opCounts {
+func getVolumeOpCounts(c clientset.Interface, config *rest.Config, pluginName string) opCounts {
 	if !framework.ProviderIs("gce", "gke", "aws") {
 		return opCounts{}
 	}
 
 	nodeLimit := 25
 
-	metricsGrabber, err := e2emetrics.NewMetricsGrabber(c, nil, true, false, true, false, false, false)
+	metricsGrabber, err := e2emetrics.NewMetricsGrabber(c, nil, config, true, false, true, false, false, false)
 
 	if err != nil {
 		framework.ExpectNoError(err, "Error creating metrics grabber: %v", err)
@@ -156,7 +158,7 @@ func addOpCounts(o1 opCounts, o2 opCounts) opCounts {
 	return totOps
 }
 
-func getMigrationVolumeOpCounts(cs clientset.Interface, pluginName string) (opCounts, opCounts) {
+func getMigrationVolumeOpCounts(cs clientset.Interface, config *rest.Config, pluginName string) (opCounts, opCounts) {
 	if len(pluginName) > 0 {
 		var migratedOps opCounts
 		l := csitrans.New()
@@ -166,18 +168,19 @@ func getMigrationVolumeOpCounts(cs clientset.Interface, pluginName string) (opCo
 			migratedOps = opCounts{}
 		} else {
 			csiName = "kubernetes.io/csi:" + csiName
-			migratedOps = getVolumeOpCounts(cs, csiName)
+			migratedOps = getVolumeOpCounts(cs, config, csiName)
 		}
-		return getVolumeOpCounts(cs, pluginName), migratedOps
+		return getVolumeOpCounts(cs, config, pluginName), migratedOps
 	}
 
 	// Not an in-tree driver
 	framework.Logf("Test running for native CSI Driver, not checking metrics")
 	return opCounts{}, opCounts{}
 }
 
-func newMigrationOpCheck(cs clientset.Interface, pluginName string) *migrationOpCheck {
+func newMigrationOpCheck(cs clientset.Interface, config *rest.Config, pluginName string) *migrationOpCheck {
 	moc := migrationOpCheck{
 		cs:         cs,
+		config:     config,
 		pluginName: pluginName,
 	}
 	if len(pluginName) == 0 {
@@ -214,7 +217,7 @@ func newMigrationOpCheck(cs clientset.Interface, pluginName string) *migrationOp
 		return &moc
 	}
 
-	moc.oldInTreeOps, moc.oldMigratedOps = getMigrationVolumeOpCounts(cs, pluginName)
+	moc.oldInTreeOps, moc.oldMigratedOps = getMigrationVolumeOpCounts(cs, config, pluginName)
 
 	return &moc
 }
@@ -223,7 +226,7 @@ func (moc *migrationOpCheck) validateMigrationVolumeOpCounts() {
 		return
 	}
 
-	newInTreeOps, _ := getMigrationVolumeOpCounts(moc.cs, moc.pluginName)
+	newInTreeOps, _ := getMigrationVolumeOpCounts(moc.cs, moc.config, moc.pluginName)
 
 	for op, count := range newInTreeOps {
 		if count != moc.oldInTreeOps[op] {

@@ -112,7 +112,7 @@ func (t *multiVolumeTestSuite) DefineTests(driver storageframework.TestDriver, p
 
 		// Now do the more expensive test initialization.
 		l.config, l.driverCleanup = driver.PrepareTest(f)
-		l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName)
+		l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
 	}
 
 	cleanup := func() {

@@ -134,7 +134,7 @@ func (p *provisioningTestSuite) DefineTests(driver storageframework.TestDriver,
 		dDriver, _ = driver.(storageframework.DynamicPVTestDriver)
 		// Now do the more expensive test initialization.
 		l.config, l.driverCleanup = driver.PrepareTest(f)
-		l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName)
+		l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
 		l.cs = l.config.Framework.ClientSet
 		testVolumeSizeRange := p.GetTestSuiteInfo().SupportedSizeRange
 		driverVolumeSizeRange := dDriver.GetDriverInfo().SupportedSizeRange

@@ -122,7 +122,7 @@ func (s *subPathTestSuite) DefineTests(driver storageframework.TestDriver, patte
 
 		// Now do the more expensive test initialization.
 		l.config, l.driverCleanup = driver.PrepareTest(f)
-		l.migrationCheck = newMigrationOpCheck(f.ClientSet, driver.GetDriverInfo().InTreePluginName)
+		l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), driver.GetDriverInfo().InTreePluginName)
 		testVolumeSizeRange := s.GetTestSuiteInfo().SupportedSizeRange
 		l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange)
 		l.hostExec = utils.NewHostExec(f)

@@ -148,7 +148,7 @@ func (t *topologyTestSuite) DefineTests(driver storageframework.TestDriver, patt
 			StorageClassName: &(l.resource.Sc.Name),
 		}, l.config.Framework.Namespace.Name)
 
-		l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName)
+		l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
 
 		return l
 	}

@@ -121,7 +121,7 @@ func (v *volumeExpandTestSuite) DefineTests(driver storageframework.TestDriver,
 
 		// Now do the more expensive test initialization.
 		l.config, l.driverCleanup = driver.PrepareTest(f)
-		l.migrationCheck = newMigrationOpCheck(f.ClientSet, driver.GetDriverInfo().InTreePluginName)
+		l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), driver.GetDriverInfo().InTreePluginName)
 		testVolumeSizeRange := v.GetTestSuiteInfo().SupportedSizeRange
 		l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange)
 	}

@@ -117,7 +117,7 @@ func (t *volumeIOTestSuite) DefineTests(driver storageframework.TestDriver, patt
 
 		// Now do the more expensive test initialization.
 		l.config, l.driverCleanup = driver.PrepareTest(f)
-		l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName)
+		l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
 
 		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
 		l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange)

@@ -120,7 +120,7 @@ func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver,
 
 		// Now do the more expensive test initialization.
 		l.config, l.driverCleanup = driver.PrepareTest(f)
-		l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName)
+		l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
 		l.volumes = []*storageframework.VolumeResource{}
 		l.pods = []*v1.Pod{}
 		l.testOptions = *dInfo.StressTestOptions

@@ -115,7 +115,7 @@ func (t *volumeModeTestSuite) DefineTests(driver storageframework.TestDriver, pa
 
 		// Now do the more expensive test initialization.
 		l.config, l.driverCleanup = driver.PrepareTest(f)
-		l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName)
+		l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
 	}
 
 	// manualInit initializes l.VolumeResource without creating the PV & PVC objects.

@@ -135,7 +135,7 @@ func (t *volumesTestSuite) DefineTests(driver storageframework.TestDriver, patte
 
 		// Now do the more expensive test initialization.
 		l.config, l.driverCleanup = driver.PrepareTest(f)
-		l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName)
+		l.migrationCheck = newMigrationOpCheck(f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
 		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
 		l.resource = storageframework.CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange)
 		if l.resource.VolSource == nil {

@@ -81,7 +81,7 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
 			VolumeMode: &blockMode,
 		}, ns)
 
-		metricsGrabber, err = e2emetrics.NewMetricsGrabber(c, nil, true, false, true, false, false, false)
+		metricsGrabber, err = e2emetrics.NewMetricsGrabber(c, nil, f.ClientConfig(), true, false, true, false, false, false)
 
 		if err != nil {
 			framework.Failf("Error creating metrics grabber : %v", err)

@@ -22,6 +22,7 @@ import (
 	"path"
 	"time"
 
+	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
 )
@@ -55,13 +56,17 @@ func AfterSuiteActions() {
 
 func gatherTestSuiteMetrics() error {
 	framework.Logf("Gathering metrics")
-	c, err := framework.LoadClientset()
+	config, err := framework.LoadConfig()
 	if err != nil {
-		return fmt.Errorf("error loading client: %v", err)
+		return fmt.Errorf("error loading client config: %v", err)
+	}
+	c, err := clientset.NewForConfig(config)
+	if err != nil {
+		return fmt.Errorf("error creating client: %v", err)
 	}
 
 	// Grab metrics for apiserver, scheduler, controller-manager, kubelet (for non-kubemark case) and cluster autoscaler (optionally).
-	grabber, err := e2emetrics.NewMetricsGrabber(c, nil, !framework.ProviderIs("kubemark"), true, true, true, framework.TestContext.IncludeClusterAutoscalerMetrics, false)
+	grabber, err := e2emetrics.NewMetricsGrabber(c, nil, config, !framework.ProviderIs("kubemark"), true, true, true, framework.TestContext.IncludeClusterAutoscalerMetrics, false)
 	if err != nil {
 		return fmt.Errorf("failed to create MetricsGrabber: %v", err)
 	}