e2e: use Ginkgo context

All code must use the context from Ginkgo when doing API calls or polling for a
change; otherwise the code would not return immediately when the test gets
aborted.
Patrick Ohly
2022-12-12 10:11:10 +01:00
parent bf1d1dfd0f
commit 2f6c4f5eab
418 changed files with 11489 additions and 11369 deletions
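
For illustration (not part of this commit's diff), here is a minimal sketch of the pattern being adopted: when a Ginkgo v2 spec body accepts a context.Context (or ginkgo.SpecContext), Ginkgo cancels that context on timeout or abort, so API calls made with it return promptly. The clientset variable c is an assumption here; in the e2e framework it would come from the test's Framework object.

package example

import (
	"context"

	"github.com/onsi/ginkgo/v2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// c is assumed to be initialized elsewhere (in the e2e framework it
// comes from the test's Framework object).
var c clientset.Interface

// Because the spec body accepts a context.Context, Ginkgo v2 passes in
// a context that it cancels when the spec times out or the suite is
// aborted, so the List call below returns promptly instead of hanging.
var _ = ginkgo.It("lists pods in kube-system", func(ctx context.Context) {
	_, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(ctx, metav1.ListOptions{})
	if err != nil {
		ginkgo.Fail(err.Error())
	}
})

Passing context.TODO() instead, as the code below did before this commit, would leave the call running until its own timeout even after the test is aborted.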


@@ -88,7 +88,7 @@ type Grabber struct {
 // Collecting metrics data is an optional debug feature. Not all clusters will
 // support it. If disabled for a component, the corresponding Grab function
 // will immediately return an error derived from MetricsGrabbingDisabledError.
-func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *rest.Config, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) {
+func NewMetricsGrabber(ctx context.Context, c clientset.Interface, ec clientset.Interface, config *rest.Config, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) {
 	kubeScheduler := ""
 	kubeControllerManager := ""
@@ -102,7 +102,7 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *re
 		return nil, errors.New("a rest config is required for grabbing kube-controller and kube-controller-manager metrics")
 	}
-	podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), metav1.ListOptions{})
+	podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(ctx, metav1.ListOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -132,18 +132,18 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *re
 		externalClient:             ec,
 		config:                     config,
 		grabFromAPIServer:          apiServer,
-		grabFromControllerManager:  checkPodDebugHandlers(c, controllers, "kube-controller-manager", kubeControllerManager),
+		grabFromControllerManager:  checkPodDebugHandlers(ctx, c, controllers, "kube-controller-manager", kubeControllerManager),
 		grabFromKubelets:           kubelets,
-		grabFromScheduler:          checkPodDebugHandlers(c, scheduler, "kube-scheduler", kubeScheduler),
+		grabFromScheduler:          checkPodDebugHandlers(ctx, c, scheduler, "kube-scheduler", kubeScheduler),
 		grabFromClusterAutoscaler:  clusterAutoscaler,
-		grabFromSnapshotController: checkPodDebugHandlers(c, snapshotController, "snapshot-controller", snapshotControllerManager),
+		grabFromSnapshotController: checkPodDebugHandlers(ctx, c, snapshotController, "snapshot-controller", snapshotControllerManager),
 		kubeScheduler:              kubeScheduler,
 		kubeControllerManager:      kubeControllerManager,
 		snapshotController:         snapshotControllerManager,
 	}, nil
 }
 
-func checkPodDebugHandlers(c clientset.Interface, requested bool, component, podName string) bool {
+func checkPodDebugHandlers(ctx context.Context, c clientset.Interface, requested bool, component, podName string) bool {
 	if !requested {
 		return false
 	}
@@ -155,7 +155,7 @@ func checkPodDebugHandlers(c clientset.Interface, requested bool, component, pod
 	// The debug handlers on the host where the pod runs might be disabled.
 	// We can check that indirectly by trying to retrieve log output.
 	limit := int64(1)
-	if _, err := c.CoreV1().Pods(metav1.NamespaceSystem).GetLogs(podName, &v1.PodLogOptions{LimitBytes: &limit}).DoRaw(context.TODO()); err != nil {
+	if _, err := c.CoreV1().Pods(metav1.NamespaceSystem).GetLogs(podName, &v1.PodLogOptions{LimitBytes: &limit}).DoRaw(ctx); err != nil {
 		klog.Warningf("Can't retrieve log output of %s (%q). Debug handlers might be disabled in kubelet. Grabbing metrics from %s is disabled.",
 			podName, err, component)
 		return false
@@ -171,8 +171,8 @@ func (g *Grabber) HasControlPlanePods() bool {
 }
 
 // GrabFromKubelet returns metrics from kubelet
-func (g *Grabber) GrabFromKubelet(nodeName string) (KubeletMetrics, error) {
-	nodes, err := g.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{FieldSelector: fields.Set{"metadata.name": nodeName}.AsSelector().String()})
+func (g *Grabber) GrabFromKubelet(ctx context.Context, nodeName string) (KubeletMetrics, error) {
+	nodes, err := g.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{"metadata.name": nodeName}.AsSelector().String()})
 	if err != nil {
 		return KubeletMetrics{}, err
 	}
@@ -180,14 +180,14 @@ func (g *Grabber) GrabFromKubelet(nodeName string) (KubeletMetrics, error) {
 		return KubeletMetrics{}, fmt.Errorf("Error listing nodes with name %v, got %v", nodeName, nodes.Items)
 	}
 	kubeletPort := nodes.Items[0].Status.DaemonEndpoints.KubeletEndpoint.Port
-	return g.grabFromKubeletInternal(nodeName, int(kubeletPort))
+	return g.grabFromKubeletInternal(ctx, nodeName, int(kubeletPort))
 }
 
-func (g *Grabber) grabFromKubeletInternal(nodeName string, kubeletPort int) (KubeletMetrics, error) {
+func (g *Grabber) grabFromKubeletInternal(ctx context.Context, nodeName string, kubeletPort int) (KubeletMetrics, error) {
 	if kubeletPort <= 0 || kubeletPort > 65535 {
 		return KubeletMetrics{}, fmt.Errorf("Invalid Kubelet port %v. Skipping Kubelet's metrics gathering", kubeletPort)
 	}
-	output, err := g.getMetricsFromNode(nodeName, int(kubeletPort))
+	output, err := g.getMetricsFromNode(ctx, nodeName, int(kubeletPort))
 	if err != nil {
 		return KubeletMetrics{}, err
 	}
@@ -195,7 +195,7 @@ func (g *Grabber) grabFromKubeletInternal(nodeName string, kubeletPort int) (Kub
 }
 
 // GrabFromScheduler returns metrics from scheduler
-func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) {
+func (g *Grabber) GrabFromScheduler(ctx context.Context) (SchedulerMetrics, error) {
 	if !g.grabFromScheduler {
 		return SchedulerMetrics{}, fmt.Errorf("kube-scheduler: %w", MetricsGrabbingDisabledError)
 	}
@@ -203,7 +203,7 @@ func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) {
 	var err error
 	g.waitForSchedulerReadyOnce.Do(func() {
-		if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(g.client, g.kubeScheduler, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
+		if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, g.client, g.kubeScheduler, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
 			err = fmt.Errorf("error waiting for kube-scheduler pod to be ready: %w", readyErr)
 		}
 	})
@@ -213,8 +213,8 @@ func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) {
 	var lastMetricsFetchErr error
 	var output string
-	if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
-		output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeScheduler, metav1.NamespaceSystem, kubeSchedulerPort)
+	if metricsWaitErr := wait.PollImmediateWithContext(ctx, time.Second, time.Minute, func(ctx context.Context) (bool, error) {
+		output, lastMetricsFetchErr = g.getSecureMetricsFromPod(ctx, g.kubeScheduler, metav1.NamespaceSystem, kubeSchedulerPort)
 		return lastMetricsFetchErr == nil, nil
 	}); metricsWaitErr != nil {
 		err := fmt.Errorf("error waiting for kube-scheduler pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
@@ -225,7 +225,7 @@ func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) {
 }
 
 // GrabFromClusterAutoscaler returns metrics from cluster autoscaler
-func (g *Grabber) GrabFromClusterAutoscaler() (ClusterAutoscalerMetrics, error) {
+func (g *Grabber) GrabFromClusterAutoscaler(ctx context.Context) (ClusterAutoscalerMetrics, error) {
 	if !g.HasControlPlanePods() && g.externalClient == nil {
 		return ClusterAutoscalerMetrics{}, fmt.Errorf("ClusterAutoscaler: %w", MetricsGrabbingDisabledError)
 	}
@@ -238,7 +238,7 @@ func (g *Grabber) GrabFromClusterAutoscaler() (ClusterAutoscalerMetrics, error)
 		client = g.client
 		namespace = metav1.NamespaceSystem
 	}
-	output, err := g.getMetricsFromPod(client, "cluster-autoscaler", namespace, 8085)
+	output, err := g.getMetricsFromPod(ctx, client, "cluster-autoscaler", namespace, 8085)
 	if err != nil {
 		return ClusterAutoscalerMetrics{}, err
 	}
@@ -246,7 +246,7 @@ func (g *Grabber) GrabFromClusterAutoscaler() (ClusterAutoscalerMetrics, error)
 }
 
 // GrabFromControllerManager returns metrics from controller manager
-func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error) {
+func (g *Grabber) GrabFromControllerManager(ctx context.Context) (ControllerManagerMetrics, error) {
 	if !g.grabFromControllerManager {
 		return ControllerManagerMetrics{}, fmt.Errorf("kube-controller-manager: %w", MetricsGrabbingDisabledError)
 	}
@@ -254,7 +254,7 @@ func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error)
 	var err error
 	g.waitForControllerManagerReadyOnce.Do(func() {
-		if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(g.client, g.kubeControllerManager, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
+		if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, g.client, g.kubeControllerManager, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
 			err = fmt.Errorf("error waiting for kube-controller-manager pod to be ready: %w", readyErr)
 		}
 	})
@@ -264,8 +264,8 @@ func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error)
 	var output string
 	var lastMetricsFetchErr error
-	if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
-		output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeControllerManager, metav1.NamespaceSystem, kubeControllerManagerPort)
+	if metricsWaitErr := wait.PollImmediateWithContext(ctx, time.Second, time.Minute, func(ctx context.Context) (bool, error) {
+		output, lastMetricsFetchErr = g.getSecureMetricsFromPod(ctx, g.kubeControllerManager, metav1.NamespaceSystem, kubeControllerManagerPort)
 		return lastMetricsFetchErr == nil, nil
 	}); metricsWaitErr != nil {
 		err := fmt.Errorf("error waiting for kube-controller-manager to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
@@ -276,7 +276,7 @@ func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error)
 }
 
 // GrabFromSnapshotController returns metrics from controller manager
-func (g *Grabber) GrabFromSnapshotController(podName string, port int) (SnapshotControllerMetrics, error) {
+func (g *Grabber) GrabFromSnapshotController(ctx context.Context, podName string, port int) (SnapshotControllerMetrics, error) {
 	if !g.grabFromSnapshotController {
 		return SnapshotControllerMetrics{}, fmt.Errorf("volume-snapshot-controller: %w", MetricsGrabbingDisabledError)
 	}
@@ -293,7 +293,7 @@ func (g *Grabber) GrabFromSnapshotController(podName string, port int) (Snapshot
 	var err error
 	g.waitForSnapshotControllerReadyOnce.Do(func() {
-		if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(g.client, podName, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
+		if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, g.client, podName, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
 			err = fmt.Errorf("error waiting for volume-snapshot-controller pod to be ready: %w", readyErr)
 		}
 	})
@@ -303,8 +303,8 @@ func (g *Grabber) GrabFromSnapshotController(podName string, port int) (Snapshot
 	var output string
 	var lastMetricsFetchErr error
-	if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
-		output, lastMetricsFetchErr = g.getMetricsFromPod(g.client, podName, metav1.NamespaceSystem, port)
+	if metricsWaitErr := wait.PollImmediateWithContext(ctx, time.Second, time.Minute, func(ctx context.Context) (bool, error) {
+		output, lastMetricsFetchErr = g.getMetricsFromPod(ctx, g.client, podName, metav1.NamespaceSystem, port)
 		return lastMetricsFetchErr == nil, nil
 	}); metricsWaitErr != nil {
 		err = fmt.Errorf("error waiting for volume-snapshot-controller pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
@@ -315,8 +315,8 @@ func (g *Grabber) GrabFromSnapshotController(podName string, port int) (Snapshot
 }
 
 // GrabFromAPIServer returns metrics from API server
-func (g *Grabber) GrabFromAPIServer() (APIServerMetrics, error) {
-	output, err := g.getMetricsFromAPIServer()
+func (g *Grabber) GrabFromAPIServer(ctx context.Context) (APIServerMetrics, error) {
+	output, err := g.getMetricsFromAPIServer(ctx)
 	if err != nil {
 		return APIServerMetrics{}, err
 	}
@@ -324,11 +324,11 @@ func (g *Grabber) GrabFromAPIServer() (APIServerMetrics, error) {
 }
 
 // Grab returns metrics from corresponding component
-func (g *Grabber) Grab() (Collection, error) {
+func (g *Grabber) Grab(ctx context.Context) (Collection, error) {
 	result := Collection{}
 	var errs []error
 	if g.grabFromAPIServer {
-		metrics, err := g.GrabFromAPIServer()
+		metrics, err := g.GrabFromAPIServer(ctx)
 		if err != nil {
 			errs = append(errs, err)
 		} else {
@@ -336,7 +336,7 @@ func (g *Grabber) Grab() (Collection, error) {
 		}
 	}
 	if g.grabFromScheduler {
-		metrics, err := g.GrabFromScheduler()
+		metrics, err := g.GrabFromScheduler(ctx)
 		if err != nil {
 			errs = append(errs, err)
 		} else {
@@ -344,7 +344,7 @@ func (g *Grabber) Grab() (Collection, error) {
 		}
 	}
 	if g.grabFromControllerManager {
-		metrics, err := g.GrabFromControllerManager()
+		metrics, err := g.GrabFromControllerManager(ctx)
 		if err != nil {
 			errs = append(errs, err)
 		} else {
@@ -352,7 +352,7 @@ func (g *Grabber) Grab() (Collection, error) {
 		}
 	}
 	if g.grabFromSnapshotController {
-		metrics, err := g.GrabFromSnapshotController(g.snapshotController, snapshotControllerPort)
+		metrics, err := g.GrabFromSnapshotController(ctx, g.snapshotController, snapshotControllerPort)
 		if err != nil {
 			errs = append(errs, err)
 		} else {
@@ -360,7 +360,7 @@ func (g *Grabber) Grab() (Collection, error) {
 		}
 	}
 	if g.grabFromClusterAutoscaler {
-		metrics, err := g.GrabFromClusterAutoscaler()
+		metrics, err := g.GrabFromClusterAutoscaler(ctx)
 		if err != nil {
 			errs = append(errs, err)
 		} else {
@@ -369,13 +369,13 @@ func (g *Grabber) Grab() (Collection, error) {
 	}
 	if g.grabFromKubelets {
 		result.KubeletMetrics = make(map[string]KubeletMetrics)
-		nodes, err := g.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
+		nodes, err := g.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
 		if err != nil {
 			errs = append(errs, err)
 		} else {
 			for _, node := range nodes.Items {
 				kubeletPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port
-				metrics, err := g.grabFromKubeletInternal(node.Name, int(kubeletPort))
+				metrics, err := g.grabFromKubeletInternal(ctx, node.Name, int(kubeletPort))
 				if err != nil {
 					errs = append(errs, err)
 				}
@@ -390,14 +390,14 @@ func (g *Grabber) Grab() (Collection, error) {
 }
 
 // getMetricsFromPod retrieves metrics data from an insecure port.
-func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string, namespace string, port int) (string, error) {
+func (g *Grabber) getMetricsFromPod(ctx context.Context, client clientset.Interface, podName string, namespace string, port int) (string, error) {
 	rawOutput, err := client.CoreV1().RESTClient().Get().
 		Namespace(namespace).
 		Resource("pods").
 		SubResource("proxy").
 		Name(fmt.Sprintf("%s:%d", podName, port)).
 		Suffix("metrics").
-		Do(context.TODO()).Raw()
+		Do(ctx).Raw()
 	if err != nil {
 		return "", err
 	}
@@ -409,7 +409,7 @@ func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string,
 // similar to "kubectl port-forward" + "kubectl get --raw
 // https://localhost:<port>/metrics". It uses the same credentials
 // as kubelet.
-func (g *Grabber) getSecureMetricsFromPod(podName string, namespace string, port int) (string, error) {
+func (g *Grabber) getSecureMetricsFromPod(ctx context.Context, podName string, namespace string, port int) (string, error) {
 	dialer := e2epod.NewDialer(g.client, g.config)
 	metricConfig := rest.CopyConfig(g.config)
 	addr := e2epod.Addr{
@@ -444,7 +444,7 @@ func (g *Grabber) getSecureMetricsFromPod(podName string, namespace string, port
 	rawOutput, err := metricClient.RESTClient().Get().
 		AbsPath("metrics").
-		Do(context.TODO()).Raw()
+		Do(ctx).Raw()
 	if err != nil {
 		return "", err
 	}
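
The recurring change from wait.PollImmediate to wait.PollImmediateWithContext is what lets the polling loops above stop early: on cancellation, the context-aware variant returns the context's error immediately instead of polling until the one-minute timeout. A minimal sketch of that pattern, using a hypothetical helper waitForPod that is not part of this commit:

package example

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
)

// waitForPod (hypothetical helper) polls until the named pod exists.
// Because the poll is context-aware, a cancelled ctx stops the loop
// right away and the function returns ctx.Err(), whereas the old
// wait.PollImmediate would have kept polling until its own timeout.
func waitForPod(ctx context.Context, c clientset.Interface, namespace, name string) error {
	return wait.PollImmediateWithContext(ctx, time.Second, time.Minute, func(ctx context.Context) (bool, error) {
		if _, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}); err != nil {
			return false, nil // not there yet (or transient error): keep polling
		}
		return true, nil
	})
}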