From 9544222e919f1aec3381620aca3025d10668a071 Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Fri, 6 Apr 2018 18:24:56 +0200
Subject: [PATCH] Test e2e prometheus addon

---
 cluster/gce/util.sh                           |   1 +
 hack/ginkgo-e2e.sh                            |   1 +
 test/e2e/common/autoscaling_utils.go          |  28 +-
 test/e2e/framework/test_context.go            |   3 +
 test/e2e/framework/util.go                    |   6 +
 test/e2e/instrumentation/monitoring/BUILD     |   2 +
 .../instrumentation/monitoring/prometheus.go  | 393 ++++++++++++++++++
 test/utils/runners.go                         |  17 +-
 8 files changed, 437 insertions(+), 14 deletions(-)
 create mode 100644 test/e2e/instrumentation/monitoring/prometheus.go

diff --git a/cluster/gce/util.sh b/cluster/gce/util.sh
index 8fbabb81c71..3f51abfec7e 100755
--- a/cluster/gce/util.sh
+++ b/cluster/gce/util.sh
@@ -676,6 +676,7 @@ SERVICE_CLUSTER_IP_RANGE: $(yaml-quote ${SERVICE_CLUSTER_IP_RANGE})
 KUBERNETES_MASTER_NAME: $(yaml-quote ${KUBERNETES_MASTER_NAME})
 ALLOCATE_NODE_CIDRS: $(yaml-quote ${ALLOCATE_NODE_CIDRS:-false})
 ENABLE_CLUSTER_MONITORING: $(yaml-quote ${ENABLE_CLUSTER_MONITORING:-none})
+ENABLE_PROMETHEUS_MONITORING: $(yaml-quote ${ENABLE_PROMETHEUS_MONITORING:-false})
 ENABLE_METRICS_SERVER: $(yaml-quote ${ENABLE_METRICS_SERVER:-false})
 ENABLE_METADATA_AGENT: $(yaml-quote ${ENABLE_METADATA_AGENT:-none})
 METADATA_AGENT_VERSION: $(yaml-quote ${METADATA_AGENT_VERSION:-})
diff --git a/hack/ginkgo-e2e.sh b/hack/ginkgo-e2e.sh
index bc754018612..8472115da2b 100755
--- a/hack/ginkgo-e2e.sh
+++ b/hack/ginkgo-e2e.sh
@@ -160,6 +160,7 @@ export PATH=$(dirname "${e2e_test}"):"${PATH}"
   --node-tag="${NODE_TAG:-}" \
   --master-tag="${MASTER_TAG:-}" \
   --cluster-monitoring-mode="${KUBE_ENABLE_CLUSTER_MONITORING:-influxdb}" \
+  --prometheus-monitoring="${KUBE_ENABLE_PROMETHEUS_MONITORING:-false}" \
   ${KUBE_CONTAINER_RUNTIME:+"--container-runtime=${KUBE_CONTAINER_RUNTIME}"} \
   ${MASTER_OS_DISTRIBUTION:+"--master-os-distro=${MASTER_OS_DISTRIBUTION}"} \
   ${NODE_OS_DISTRIBUTION:+"--node-os-distro=${NODE_OS_DISTRIBUTION}"} \
diff --git a/test/e2e/common/autoscaling_utils.go b/test/e2e/common/autoscaling_utils.go
index f93a9a59940..2b467946dbc 100644
--- a/test/e2e/common/autoscaling_utils.go
+++ b/test/e2e/common/autoscaling_utils.go
@@ -108,13 +108,18 @@ func GetResourceConsumerImage() string {
 
 func NewDynamicResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
 	return newResourceConsumer(name, nsName, kind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, dynamicConsumptionTimeInSeconds,
-		dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, clientset, internalClientset, scaleClient)
+		dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, clientset, internalClientset, scaleClient, nil, nil)
 }
 
 // TODO this still defaults to replication controller
 func NewStaticResourceConsumer(name, nsName string, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
 	return newResourceConsumer(name, nsName, KindRC, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, staticConsumptionTimeInSeconds,
-		initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuLimit, memLimit, clientset, internalClientset, scaleClient)
+		initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuLimit, memLimit, clientset, internalClientset, scaleClient, nil, nil)
+}
+
+func NewMetricExporter(name, nsName string, podAnnotations, serviceAnnotations map[string]string, metricValue int, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
+	return newResourceConsumer(name, nsName, KindDeployment, 1, 0, 0, metricValue, dynamicConsumptionTimeInSeconds,
+		dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, 100, 100, clientset, internalClientset, scaleClient, podAnnotations, serviceAnnotations)
 }
 
 /*
@@ -125,9 +130,14 @@ memLimit argument is in megabytes, memLimit is a maximum amount of memory that c
 cpuLimit argument is in millicores, cpuLimit is a maximum amount of cpu that can be consumed by a single pod
 */
 func newResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, consumptionTimeInSeconds, requestSizeInMillicores,
-	requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
-
-	runServiceAndWorkloadForResourceConsumer(clientset, internalClientset, nsName, name, kind, replicas, cpuLimit, memLimit)
+	requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter, podAnnotations, serviceAnnotations map[string]string) *ResourceConsumer {
+	if podAnnotations == nil {
+		podAnnotations = make(map[string]string)
+	}
+	if serviceAnnotations == nil {
+		serviceAnnotations = make(map[string]string)
+	}
+	runServiceAndWorkloadForResourceConsumer(clientset, internalClientset, nsName, name, kind, replicas, cpuLimit, memLimit, podAnnotations, serviceAnnotations)
 	rc := &ResourceConsumer{
 		name:           name,
 		controllerName: name + "-ctrl",
@@ -227,7 +237,7 @@ func (rc *ResourceConsumer) makeConsumeCustomMetric() {
 	delta := 0
 	for {
 		select {
-		case delta := <-rc.customMetric:
+		case delta = <-rc.customMetric:
 			framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta)
 		case <-time.After(sleepTime):
 			framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName)
@@ -410,11 +420,12 @@ func (rc *ResourceConsumer) CleanUp() {
 	framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(rc.controllerName, nil))
 }
 
-func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimitMillis, memLimitMb int64) {
+func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimitMillis, memLimitMb int64, podAnnotations, serviceAnnotations map[string]string) {
 	By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
 	_, err := c.CoreV1().Services(ns).Create(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
-			Name: name,
+			Name:        name,
+			Annotations: serviceAnnotations,
 		},
 		Spec: v1.ServiceSpec{
 			Ports: []v1.ServicePort{{
@@ -441,6 +452,7 @@ func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalCli
 		CpuLimit:    cpuLimitMillis,
 		MemRequest:  memLimitMb * 1024 * 1024, // MemLimit is in bytes
 		MemLimit:    memLimitMb * 1024 * 1024,
+		Annotations: podAnnotations,
 	}
 
 	switch kind {
diff --git a/test/e2e/framework/test_context.go b/test/e2e/framework/test_context.go
index e5e1fc93ca4..e3dc9edce22 100644
--- a/test/e2e/framework/test_context.go
+++ b/test/e2e/framework/test_context.go
@@ -112,6 +112,8 @@ type TestContextType struct {
 	NodeTestContextType
 	// Monitoring solution that is used in current cluster.
 	ClusterMonitoringMode string
+	// Whether a separate Prometheus monitoring solution is deployed in the cluster.
+	EnablePrometheusMonitoring bool
 	// Indicates what path the kubernetes-anywhere is installed on
 	KubernetesAnywherePath string
 
@@ -237,6 +239,7 @@ func RegisterClusterFlags() {
 	flag.StringVar(&TestContext.MasterOSDistro, "master-os-distro", "debian", "The OS distribution of cluster master (debian, trusty, or coreos).")
 	flag.StringVar(&TestContext.NodeOSDistro, "node-os-distro", "debian", "The OS distribution of cluster VM instances (debian, trusty, or coreos).")
 	flag.StringVar(&TestContext.ClusterMonitoringMode, "cluster-monitoring-mode", "influxdb", "The monitoring solution that is used in the cluster.")
+	flag.BoolVar(&TestContext.EnablePrometheusMonitoring, "prometheus-monitoring", false, "Whether a separate Prometheus monitoring solution is deployed in the cluster.")
 	// TODO: Flags per provider? Rename gce-project/gce-zone?
 	cloudConfig := &TestContext.CloudConfig
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index 738dd946a6c..0dcb53f448f 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -364,6 +364,12 @@ func SkipUnlessClusterMonitoringModeIs(supportedMonitoring ...string) {
 	}
 }
 
+func SkipUnlessPrometheusMonitoringIsEnabled() {
+	if !TestContext.EnablePrometheusMonitoring {
+		Skipf("Skipped because Prometheus monitoring is not enabled")
+	}
+}
+
 func SkipUnlessMasterOSDistroIs(supportedMasterOsDistros ...string) {
 	if !MasterOSDistroIs(supportedMasterOsDistros...) {
 		Skipf("Only supported for master OS distro %v (not %s)", supportedMasterOsDistros, TestContext.MasterOSDistro)
diff --git a/test/e2e/instrumentation/monitoring/BUILD b/test/e2e/instrumentation/monitoring/BUILD
index a817039f9c2..2eaa75ec707 100644
--- a/test/e2e/instrumentation/monitoring/BUILD
+++ b/test/e2e/instrumentation/monitoring/BUILD
@@ -14,6 +14,7 @@ go_library(
         "custom_metrics_stackdriver.go",
         "influxdb.go",
         "metrics_grabber.go",
+        "prometheus.go",
        "stackdriver.go",
         "stackdriver_metadata_agent.go",
     ],
@@ -28,6 +29,7 @@ go_library(
         "//vendor/github.com/influxdata/influxdb/client/v2:go_default_library",
         "//vendor/github.com/onsi/ginkgo:go_default_library",
         "//vendor/github.com/onsi/gomega:go_default_library",
+        "//vendor/github.com/prometheus/common/model:go_default_library",
         "//vendor/golang.org/x/oauth2/google:go_default_library",
         "//vendor/google.golang.org/api/monitoring/v3:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
diff --git a/test/e2e/instrumentation/monitoring/prometheus.go b/test/e2e/instrumentation/monitoring/prometheus.go
new file mode 100644
index 00000000000..e05b3e63e0f
--- /dev/null
+++ b/test/e2e/instrumentation/monitoring/prometheus.go
@@ -0,0 +1,393 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package monitoring
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"math"
+	"time"
+
+	"github.com/prometheus/common/model"
+
+	. "github.com/onsi/ginkgo"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/kubernetes/test/e2e/common"
+	"k8s.io/kubernetes/test/e2e/framework"
+	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
+)
+
+const (
+	prometheusQueryStep                = time.Minute
+	prometheusMetricErrorTolerance     = 0.25
+	prometheusMetricValidationDuration = time.Minute * 2
+	prometheusRate                     = time.Minute * 2
+	prometheusRequiredNodesUpDuration  = time.Minute * 5
+	prometheusService                  = "prometheus"
+	prometheusSleepBetweenAttempts     = time.Second * 30
+	prometheusTestTimeout              = time.Minute * 5
+	customMetricValue                  = 1000
+	targetCPUUsage                     = 0.1
+)
+
+var _ = instrumentation.SIGDescribe("[Feature:PrometheusMonitoring] Prometheus", func() {
+	BeforeEach(func() {
+		framework.SkipUnlessPrometheusMonitoringIsEnabled()
+	})
+
+	f := framework.NewDefaultFramework("prometheus-monitoring")
+	It("should scrape container metrics from all nodes.", func() {
+		expectedNodes, err := getAllNodes(f.ClientSet)
+		framework.ExpectNoError(err)
+		retryUntilSucceeds(func() error {
+			return validateMetricAvailableForAllNodes(f.ClientSet, `container_cpu_usage_seconds_total`, expectedNodes)
+		}, prometheusTestTimeout)
+	})
+	It("should successfully scrape all targets", func() {
+		retryUntilSucceeds(func() error {
+			return validateAllActiveTargetsAreHealthy(f.ClientSet)
+		}, prometheusTestTimeout)
+	})
+	It("should contain correct container CPU metric.", func() {
+		query := prometheusCPUQuery(f.Namespace.Name, "prometheus-cpu-consumer", prometheusRate)
+		consumer := consumeCPUResources(f, "prometheus-cpu-consumer", targetCPUUsage*1000)
+		defer consumer.CleanUp()
+		retryUntilSucceeds(func() error {
+			return validateQueryReturnsCorrectValues(f.ClientSet, query, targetCPUUsage, 3, prometheusMetricErrorTolerance)
+		}, prometheusTestTimeout)
+	})
+	It("should scrape metrics from annotated pods.", func() {
+		query := prometheusPodCustomMetricQuery(f.Namespace.Name, "prometheus-custom-pod-metric")
+		consumer := exportCustomMetricFromPod(f, "prometheus-custom-pod-metric", customMetricValue)
+		defer consumer.CleanUp()
+		retryUntilSucceeds(func() error {
+			return validateQueryReturnsCorrectValues(f.ClientSet, query, customMetricValue, 1, prometheusMetricErrorTolerance)
+		}, prometheusTestTimeout)
+	})
+	It("should scrape metrics from annotated services.", func() {
+		query := prometheusServiceCustomMetricQuery(f.Namespace.Name, "prometheus-custom-service-metric")
+		consumer := exportCustomMetricFromService(f, "prometheus-custom-service-metric", customMetricValue)
+		defer consumer.CleanUp()
+		retryUntilSucceeds(func() error {
+			return validateQueryReturnsCorrectValues(f.ClientSet, query, customMetricValue, 1, prometheusMetricErrorTolerance)
+		}, prometheusTestTimeout)
+	})
+})
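+
+// prometheusCPUQuery builds the PromQL query used to read back the consumer's
+// measured CPU usage; e.g. with prometheusRate = 2m it renders as
+//   sum(irate(container_cpu_usage_seconds_total{namespace="ns",pod_name=~"prefix.*"}[2m]))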
+func prometheusCPUQuery(namespace, podNamePrefix string, rate time.Duration) string {
+	return fmt.Sprintf(`sum(irate(container_cpu_usage_seconds_total{namespace="%v",pod_name=~"%v.*"}[%vm]))`,
+		namespace, podNamePrefix, int64(rate.Minutes()))
+}
+
+func prometheusServiceCustomMetricQuery(namespace, service string) string {
+	return fmt.Sprintf(`sum(QPS{kubernetes_namespace="%v",kubernetes_name="%v"})`, namespace, service)
+}
+
+func prometheusPodCustomMetricQuery(namespace, podNamePrefix string) string {
+	return fmt.Sprintf(`sum(QPS{kubernetes_namespace="%s",kubernetes_pod_name=~"%s.*"})`, namespace, podNamePrefix)
+}
+
+func consumeCPUResources(f *framework.Framework, consumerName string, cpuUsage int) *common.ResourceConsumer {
+	return common.NewDynamicResourceConsumer(consumerName, f.Namespace.Name, common.KindDeployment, 1, cpuUsage,
+		memoryUsed, 0, int64(cpuUsage), memoryLimit, f.ClientSet, f.InternalClientset, f.ScalesGetter)
+}
+
+func exportCustomMetricFromPod(f *framework.Framework, consumerName string, metricValue int) *common.ResourceConsumer {
+	podAnnotations := map[string]string{
+		"prometheus.io/scrape": "true",
+		"prometheus.io/path":   "/Metrics",
+		"prometheus.io/port":   "8080",
+	}
+	return common.NewMetricExporter(consumerName, f.Namespace.Name, podAnnotations, nil, metricValue, f.ClientSet, f.InternalClientset, f.ScalesGetter)
+}
+
+func exportCustomMetricFromService(f *framework.Framework, consumerName string, metricValue int) *common.ResourceConsumer {
+	serviceAnnotations := map[string]string{
+		"prometheus.io/scrape": "true",
+		"prometheus.io/path":   "/Metrics",
+		"prometheus.io/port":   "8080",
+	}
+	return common.NewMetricExporter(consumerName, f.Namespace.Name, nil, serviceAnnotations, metricValue, f.ClientSet, f.InternalClientset, f.ScalesGetter)
+}
+
+func validateMetricAvailableForAllNodes(c clientset.Interface, metric string, expectedNodesNames []string) error {
+	instanceLabels, err := getInstanceLabelsAvailableForMetric(c, prometheusRequiredNodesUpDuration, metric)
+	if err != nil {
+		return err
+	}
+	nodesWithMetric := make(map[string]bool)
+	for _, instance := range instanceLabels {
+		nodesWithMetric[instance] = true
+	}
+	missedNodesCount := 0
+	for _, nodeName := range expectedNodesNames {
+		if _, found := nodesWithMetric[nodeName]; !found {
+			missedNodesCount++
+		}
+	}
+	if missedNodesCount > 0 {
+		return fmt.Errorf("Metric not found for %v out of %v nodes", missedNodesCount, len(expectedNodesNames))
+	}
+	return nil
+}
+
+func validateAllActiveTargetsAreHealthy(c clientset.Interface) error {
+	discovery, err := fetchPrometheusTargetDiscovery(c)
+	if err != nil {
+		return err
+	}
+	if len(discovery.ActiveTargets) == 0 {
+		return fmt.Errorf("Prometheus is not scraping any targets, at least one target is required")
+	}
+	for _, target := range discovery.ActiveTargets {
+		if target.Health != HealthGood {
+			return fmt.Errorf("Target health not good. Target: %v", target)
+		}
+	}
+	return nil
+}
+
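+// validateQueryReturnsCorrectValues accepts each returned sample when its
+// relative error |sample - expected| / expected stays below errorTolerance
+// (prometheusMetricErrorTolerance, i.e. 25%, in these tests).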
+func validateQueryReturnsCorrectValues(c clientset.Interface, query string, expectedValue float64, minSamplesCount int, errorTolerance float64) error {
+	samples, err := fetchQueryValues(c, query, prometheusMetricValidationDuration)
+	if err != nil {
+		return err
+	}
+	if len(samples) < minSamplesCount {
+		return fmt.Errorf("Not enough samples for query '%v', got %v", query, samples)
+	}
+	framework.Logf("Executed query '%v' returned %v", query, samples)
+	for _, value := range samples {
+		relativeError := math.Abs(value-expectedValue) / expectedValue
+		if relativeError >= errorTolerance {
+			return fmt.Errorf("Query result values outside expected value tolerance. Expected error below %v, got %v", errorTolerance, relativeError)
+		}
+	}
+	return nil
+}
+
+func fetchQueryValues(c clientset.Interface, query string, duration time.Duration) ([]float64, error) {
+	now := time.Now()
+	response, err := queryPrometheus(c, query, now.Add(-duration), now, prometheusQueryStep)
+	if err != nil {
+		return nil, err
+	}
+	m, ok := response.(model.Matrix)
+	if !ok {
+		return nil, fmt.Errorf("Expected matrix response, got: %T", response)
+	}
+	values := make([]float64, 0)
+	for _, stream := range m {
+		for _, sample := range stream.Values {
+			values = append(values, float64(sample.Value))
+		}
+	}
+	return values, nil
+}
+
+func getInstanceLabelsAvailableForMetric(c clientset.Interface, duration time.Duration, metric string) ([]string, error) {
+	var instance model.LabelValue
+	now := time.Now()
+	query := fmt.Sprintf(`sum(%v)by(instance)`, metric)
+	result, err := queryPrometheus(c, query, now.Add(-duration), now, prometheusQueryStep)
+	if err != nil {
+		return nil, err
+	}
+	instanceLabels := make([]string, 0)
+	m, ok := result.(model.Matrix)
+	if !ok {
+		framework.Failf("Expected matrix response for query '%v', got: %T", query, result)
+		return instanceLabels, nil
+	}
+	for _, stream := range m {
+		if instance, ok = stream.Metric["instance"]; !ok {
+			continue
+		}
+		instanceLabels = append(instanceLabels, string(instance))
+	}
+	return instanceLabels, nil
+}
+
+func fetchPrometheusTargetDiscovery(c clientset.Interface) (TargetDiscovery, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
+	defer cancel()
+
+	response, err := c.CoreV1().RESTClient().Get().
+		Context(ctx).
+		Namespace("kube-system").
+		Resource("services").
+		Name(prometheusService+":9090").
+		SubResource("proxy").
+		Suffix("api", "v1", "targets").
+		Do().
+		Raw()
+	var qres promTargetsResponse
+	if err != nil {
+		framework.Logf("%s", response)
+		return qres.Data, err
+	}
+	err = json.Unmarshal(response, &qres)
+
+	return qres.Data, err
+}
+
+type promTargetsResponse struct {
+	Status string          `json:"status"`
+	Data   TargetDiscovery `json:"data"`
+}
+
+type TargetDiscovery struct {
+	ActiveTargets  []*Target        `json:"activeTargets"`
+	DroppedTargets []*DroppedTarget `json:"droppedTargets"`
+}
+type Target struct {
+	DiscoveredLabels map[string]string `json:"discoveredLabels"`
+	Labels           map[string]string `json:"labels"`
+
+	ScrapeURL string `json:"scrapeUrl"`
+
+	LastError  string       `json:"lastError"`
+	LastScrape time.Time    `json:"lastScrape"`
+	Health     TargetHealth `json:"health"`
+}
+
+type DroppedTarget struct {
+	// Labels before any processing.
+	DiscoveredLabels map[string]string `json:"discoveredLabels"`
+}
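+
+// The types above mirror a subset of the payload served by Prometheus'
+// /api/v1/targets endpoint; only the fields the tests actually read are
+// decoded. TargetHealth below carries the per-target health states.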
+const (
+	HealthUnknown TargetHealth = "unknown"
+	HealthGood    TargetHealth = "up"
+	HealthBad     TargetHealth = "down"
+)
+
+type TargetHealth string
+
+func queryPrometheus(c clientset.Interface, query string, start, end time.Time, step time.Duration) (model.Value, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
+	defer cancel()
+
+	response, err := c.CoreV1().RESTClient().Get().
+		Context(ctx).
+		Namespace("kube-system").
+		Resource("services").
+		Name(prometheusService+":9090").
+		SubResource("proxy").
+		Suffix("api", "v1", "query_range").
+		Param("query", query).
+		Param("start", fmt.Sprintf("%v", start.Unix())).
+		Param("end", fmt.Sprintf("%v", end.Unix())).
+		Param("step", fmt.Sprintf("%vs", step.Seconds())).
+		Do().
+		Raw()
+	if err != nil {
+		framework.Logf("%s", response)
+		return nil, err
+	}
+	var qres promQueryResponse
+	err = json.Unmarshal(response, &qres)
+
+	return model.Value(qres.Data.v), err
+}
+
+type promQueryResponse struct {
+	Status string       `json:"status"`
+	Data   responseData `json:"data"`
+}
+
+type responseData struct {
+	Type   model.ValueType `json:"resultType"`
+	Result interface{}     `json:"result"`
+
+	// The decoded value.
+	v model.Value
+}
+
+func (qr *responseData) UnmarshalJSON(b []byte) error {
+	v := struct {
+		Type   model.ValueType `json:"resultType"`
+		Result json.RawMessage `json:"result"`
+	}{}
+
+	err := json.Unmarshal(b, &v)
+	if err != nil {
+		return err
+	}
+
+	switch v.Type {
+	case model.ValScalar:
+		var sv model.Scalar
+		err = json.Unmarshal(v.Result, &sv)
+		qr.v = &sv
+
+	case model.ValVector:
+		var vv model.Vector
+		err = json.Unmarshal(v.Result, &vv)
+		qr.v = vv
+
+	case model.ValMatrix:
+		var mv model.Matrix
+		err = json.Unmarshal(v.Result, &mv)
+		qr.v = mv
+
+	default:
+		err = fmt.Errorf("unexpected value type %q", v.Type)
+	}
+	return err
+}
+
+func retryUntilSucceeds(validator func() error, timeout time.Duration) {
+	startTime := time.Now()
+	var err error
+	for {
+		err = validator()
+		if err == nil {
+			return
+		}
+		if time.Since(startTime) >= timeout {
+			break
+		}
+		framework.Logf("%v", err)
+		time.Sleep(prometheusSleepBetweenAttempts)
+	}
+	framework.Failf("%v", err)
+}
+
+func getAllNodes(c clientset.Interface) ([]string, error) {
+	nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+	result := []string{}
+	for _, node := range nodeList.Items {
+		result = append(result, node.Name)
+	}
+	return result, nil
+}
diff --git a/test/utils/runners.go b/test/utils/runners.go
index da5dd49f5ff..f81494a14cf 100644
--- a/test/utils/runners.go
+++ b/test/utils/runners.go
@@ -131,8 +131,9 @@ type RCConfig struct {
 	// Env vars, set the same for every pod.
 	Env map[string]string
 
-	// Extra labels added to every pod.
-	Labels map[string]string
+	// Extra labels and annotations added to every pod.
+	Labels      map[string]string
+	Annotations map[string]string
 
 	// Node selector for pods in the RC.
 	NodeSelector map[string]string
 
@@ -292,7 +293,8 @@ func (config *DeploymentConfig) create() error {
 			},
 			Template: v1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
-					Labels: map[string]string{"name": config.Name},
+					Labels:      map[string]string{"name": config.Name},
+					Annotations: config.Annotations,
 				},
 				Spec: v1.PodSpec{
 					Containers: []v1.Container{
@@ -362,7 +364,8 @@ func (config *ReplicaSetConfig) create() error {
 			},
 			Template: v1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
-					Labels: map[string]string{"name": config.Name},
+					Labels:      map[string]string{"name": config.Name},
+					Annotations: config.Annotations,
 				},
 				Spec: v1.PodSpec{
 					Containers: []v1.Container{
@@ -428,7 +431,8 @@ func (config *JobConfig) create() error {
 			Completions: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
 			Template: v1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
-					Labels: map[string]string{"name": config.Name},
+					Labels:      map[string]string{"name": config.Name},
+					Annotations: config.Annotations,
 				},
 				Spec: v1.PodSpec{
 					Containers: []v1.Container{
@@ -542,7 +546,8 @@ func (config *RCConfig) create() error {
 			},
 			Template: &v1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
-					Labels: map[string]string{"name": config.Name},
+					Labels:      map[string]string{"name": config.Name},
+					Annotations: config.Annotations,
 				},
 				Spec: v1.PodSpec{
 					Affinity: config.Affinity,
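
---
Usage note (not part of the applied diff): assuming a GCE-style cluster with the
Prometheus addon manifests available, the new tests should be reachable roughly
like this; the variable and flag names come from the changes above, the exact
invocation is illustrative:

    export ENABLE_PROMETHEUS_MONITORING=true       # consumed at cluster bring-up (cluster/gce/util.sh)
    export KUBE_ENABLE_PROMETHEUS_MONITORING=true  # forwarded by hack/ginkgo-e2e.sh as --prometheus-monitoring
    hack/ginkgo-e2e.sh --ginkgo.focus='\[Feature:PrometheusMonitoring\]'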