Test e2e prometheus addon

parent 113987e0db
commit 9544222e91
@@ -676,6 +676,7 @@ SERVICE_CLUSTER_IP_RANGE: $(yaml-quote ${SERVICE_CLUSTER_IP_RANGE})
 KUBERNETES_MASTER_NAME: $(yaml-quote ${KUBERNETES_MASTER_NAME})
 ALLOCATE_NODE_CIDRS: $(yaml-quote ${ALLOCATE_NODE_CIDRS:-false})
 ENABLE_CLUSTER_MONITORING: $(yaml-quote ${ENABLE_CLUSTER_MONITORING:-none})
+ENABLE_PROMETHEUS_MONITORING: $(yaml-quote ${ENABLE_PROMETHEUS_MONITORING:-false})
 ENABLE_METRICS_SERVER: $(yaml-quote ${ENABLE_METRICS_SERVER:-false})
 ENABLE_METADATA_AGENT: $(yaml-quote ${ENABLE_METADATA_AGENT:-none})
 METADATA_AGENT_VERSION: $(yaml-quote ${METADATA_AGENT_VERSION:-})
@@ -160,6 +160,7 @@ export PATH=$(dirname "${e2e_test}"):"${PATH}"
   --node-tag="${NODE_TAG:-}" \
   --master-tag="${MASTER_TAG:-}" \
   --cluster-monitoring-mode="${KUBE_ENABLE_CLUSTER_MONITORING:-influxdb}" \
+  --prometheus-monitoring="${KUBE_ENABLE_PROMETHEUS_MONITORING:-false}" \
   ${KUBE_CONTAINER_RUNTIME:+"--container-runtime=${KUBE_CONTAINER_RUNTIME}"} \
   ${MASTER_OS_DISTRIBUTION:+"--master-os-distro=${MASTER_OS_DISTRIBUTION}"} \
   ${NODE_OS_DISTRIBUTION:+"--node-os-distro=${NODE_OS_DISTRIBUTION}"} \
@@ -108,13 +108,18 @@ func GetResourceConsumerImage() string {

 func NewDynamicResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
 	return newResourceConsumer(name, nsName, kind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, dynamicConsumptionTimeInSeconds,
-		dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, clientset, internalClientset, scaleClient)
+		dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, clientset, internalClientset, scaleClient, nil, nil)
 }

 // TODO this still defaults to replication controller
 func NewStaticResourceConsumer(name, nsName string, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
 	return newResourceConsumer(name, nsName, KindRC, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, staticConsumptionTimeInSeconds,
-		initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuLimit, memLimit, clientset, internalClientset, scaleClient)
+		initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuLimit, memLimit, clientset, internalClientset, scaleClient, nil, nil)
 }

+func NewMetricExporter(name, nsName string, podAnnotations, serviceAnnotations map[string]string, metricValue int, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
+	return newResourceConsumer(name, nsName, KindDeployment, 1, 0, 0, metricValue, dynamicConsumptionTimeInSeconds,
+		dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, 100, 100, clientset, internalClientset, scaleClient, podAnnotations, serviceAnnotations)
+}
+
 /*
@@ -125,9 +130,14 @@ memLimit argument is in megabytes, memLimit is a maximum amount of memory that c
 cpuLimit argument is in millicores, cpuLimit is a maximum amount of cpu that can be consumed by a single pod
 */
 func newResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, consumptionTimeInSeconds, requestSizeInMillicores,
-	requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
-
-	runServiceAndWorkloadForResourceConsumer(clientset, internalClientset, nsName, name, kind, replicas, cpuLimit, memLimit)
+	requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter, podAnnotations, serviceAnnotations map[string]string) *ResourceConsumer {
+	if podAnnotations == nil {
+		podAnnotations = make(map[string]string)
+	}
+	if serviceAnnotations == nil {
+		serviceAnnotations = make(map[string]string)
+	}
+	runServiceAndWorkloadForResourceConsumer(clientset, internalClientset, nsName, name, kind, replicas, cpuLimit, memLimit, podAnnotations, serviceAnnotations)
 	rc := &ResourceConsumer{
 		name:           name,
 		controllerName: name + "-ctrl",
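A minimal sketch of a call site for the new NewMetricExporter constructor, matching how the Prometheus tests in the new file below use it; the names and values here are hypothetical:

	podAnnotations := map[string]string{
		"prometheus.io/scrape": "true",
		"prometheus.io/port":   "8080",
	}
	consumer := common.NewMetricExporter("qps-exporter", namespace, podAnnotations, nil,
		1000, clientSet, internalClientset, scalesGetter)
	defer consumer.CleanUp()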
@@ -227,7 +237,7 @@ func (rc *ResourceConsumer) makeConsumeCustomMetric() {
 	delta := 0
 	for {
 		select {
-		case delta := <-rc.customMetric:
+		case delta = <-rc.customMetric:
 			framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta)
 		case <-time.After(sleepTime):
 			framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName)
@@ -410,11 +420,12 @@ func (rc *ResourceConsumer) CleanUp() {
 	framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(rc.controllerName, nil))
 }

-func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimitMillis, memLimitMb int64) {
+func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimitMillis, memLimitMb int64, podAnnotations, serviceAnnotations map[string]string) {
 	By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
 	_, err := c.CoreV1().Services(ns).Create(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
-			Name: name,
+			Name:        name,
+			Annotations: serviceAnnotations,
 		},
 		Spec: v1.ServiceSpec{
 			Ports: []v1.ServicePort{{

@@ -441,6 +452,7 @@ func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalCli
 		CpuLimit:       cpuLimitMillis,
 		MemRequest:     memLimitMb * 1024 * 1024, // MemLimit is in bytes
 		MemLimit:       memLimitMb * 1024 * 1024,
+		Annotations:    podAnnotations,
 	}

 	switch kind {
@@ -112,6 +112,8 @@ type TestContextType struct {
 	NodeTestContextType
 	// Monitoring solution that is used in current cluster.
 	ClusterMonitoringMode string
+	// Separate Prometheus monitoring deployed in cluster
+	EnablePrometheusMonitoring bool

 	// Indicates what path the kubernetes-anywhere is installed on
 	KubernetesAnywherePath string
@@ -237,6 +239,7 @@ func RegisterClusterFlags() {
 	flag.StringVar(&TestContext.MasterOSDistro, "master-os-distro", "debian", "The OS distribution of cluster master (debian, trusty, or coreos).")
 	flag.StringVar(&TestContext.NodeOSDistro, "node-os-distro", "debian", "The OS distribution of cluster VM instances (debian, trusty, or coreos).")
 	flag.StringVar(&TestContext.ClusterMonitoringMode, "cluster-monitoring-mode", "influxdb", "The monitoring solution that is used in the cluster.")
+	flag.BoolVar(&TestContext.EnablePrometheusMonitoring, "prometheus-monitoring", false, "Separate Prometheus monitoring deployed in cluster.")

 	// TODO: Flags per provider? Rename gce-project/gce-zone?
 	cloudConfig := &TestContext.CloudConfig
@@ -364,6 +364,12 @@ func SkipUnlessClusterMonitoringModeIs(supportedMonitoring ...string) {
 	}
 }

+func SkipUnlessPrometheusMonitoringIsEnabled() {
+	if !TestContext.EnablePrometheusMonitoring {
+		Skipf("Skipped because prometheus monitoring is not enabled")
+	}
+}
+
 func SkipUnlessMasterOSDistroIs(supportedMasterOsDistros ...string) {
 	if !MasterOSDistroIs(supportedMasterOsDistros...) {
 		Skipf("Only supported for master OS distro %v (not %s)", supportedMasterOsDistros, TestContext.MasterOSDistro)
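The new skip helper lets any suite gate on the flag; a minimal sketch mirroring the BeforeEach in the new Prometheus tests below (the suite name is hypothetical):

	var _ = Describe("suite gated on Prometheus monitoring", func() {
		BeforeEach(func() {
			framework.SkipUnlessPrometheusMonitoringIsEnabled()
		})
	})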

@@ -14,6 +14,7 @@ go_library(
        "custom_metrics_stackdriver.go",
        "influxdb.go",
        "metrics_grabber.go",
+       "prometheus.go",
        "stackdriver.go",
        "stackdriver_metadata_agent.go",
    ],
@@ -28,6 +29,7 @@ go_library(
        "//vendor/github.com/influxdata/influxdb/client/v2:go_default_library",
        "//vendor/github.com/onsi/ginkgo:go_default_library",
        "//vendor/github.com/onsi/gomega:go_default_library",
+       "//vendor/github.com/prometheus/common/model:go_default_library",
        "//vendor/golang.org/x/oauth2/google:go_default_library",
        "//vendor/google.golang.org/api/monitoring/v3:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
test/e2e/instrumentation/monitoring/prometheus.go (new file, 382 lines)
@@ -0,0 +1,382 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package monitoring

import (
	"context"
	"encoding/json"
	"fmt"
	"math"
	"time"

	"github.com/prometheus/common/model"

	. "github.com/onsi/ginkgo"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/common"
	"k8s.io/kubernetes/test/e2e/framework"
	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
)

const (
	prometheusQueryStep                = time.Minute
	prometheusMetricErrorTolerance     = 0.25
	prometheusMetricValidationDuration = time.Minute * 2
	prometheusRate                     = time.Minute * 2
	prometheusRequiredNodesUpDuration  = time.Minute * 5
	prometheusService                  = "prometheus"
	prometheusSleepBetweenAttempts     = time.Second * 30
	prometheusTestTimeout              = time.Minute * 5
	customMetricValue                  = 1000
	targetCPUUsage                     = 0.1
)
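// The constants above trade flakiness against sensitivity: each validation
// samples a prometheusMetricValidationDuration window at prometheusQueryStep
// resolution, every sample must fall within prometheusMetricErrorTolerance
// relative error of the expected value, and failing validations are retried
// every prometheusSleepBetweenAttempts until prometheusTestTimeout expires.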

var _ = instrumentation.SIGDescribe("[Feature:PrometheusMonitoring] Prometheus", func() {
	BeforeEach(func() {
		framework.SkipUnlessPrometheusMonitoringIsEnabled()
	})

	f := framework.NewDefaultFramework("prometheus-monitoring")
	It("should scrape container metrics from all nodes.", func() {
		expectedNodes, err := getAllNodes(f.ClientSet)
		framework.ExpectNoError(err)
		retryUntilSucceeds(func() error {
			return validateMetricAvailableForAllNodes(f.ClientSet, `container_cpu_usage_seconds_total`, expectedNodes)
		}, prometheusTestTimeout)
	})
	It("should successfully scrape all targets", func() {
		retryUntilSucceeds(func() error {
			return validateAllActiveTargetsAreHealthy(f.ClientSet)
		}, prometheusTestTimeout)
	})
	It("should contain correct container CPU metric.", func() {
		query := prometheusCPUQuery(f.Namespace.Name, "prometheus-cpu-consumer", prometheusRate)
		consumer := consumeCPUResources(f, "prometheus-cpu-consumer", targetCPUUsage*1000)
		defer consumer.CleanUp()
		retryUntilSucceeds(func() error {
			return validateQueryReturnsCorrectValues(f.ClientSet, query, targetCPUUsage, 3, prometheusMetricErrorTolerance)
		}, prometheusTestTimeout)
	})
	It("should scrape metrics from annotated pods.", func() {
		query := prometheusPodCustomMetricQuery(f.Namespace.Name, "prometheus-custom-pod-metric")
		consumer := exportCustomMetricFromPod(f, "prometheus-custom-pod-metric", customMetricValue)
		defer consumer.CleanUp()
		retryUntilSucceeds(func() error {
			return validateQueryReturnsCorrectValues(f.ClientSet, query, customMetricValue, 1, prometheusMetricErrorTolerance)
		}, prometheusTestTimeout)
	})
	It("should scrape metrics from annotated services.", func() {
		query := prometheusServiceCustomMetricQuery(f.Namespace.Name, "prometheus-custom-service-metric")
		consumer := exportCustomMetricFromService(f, "prometheus-custom-service-metric", customMetricValue)
		defer consumer.CleanUp()
		retryUntilSucceeds(func() error {
			return validateQueryReturnsCorrectValues(f.ClientSet, query, customMetricValue, 1, prometheusMetricErrorTolerance)
		}, prometheusTestTimeout)
	})
})

func prometheusCPUQuery(namespace, podNamePrefix string, rate time.Duration) string {
	return fmt.Sprintf(`sum(irate(container_cpu_usage_seconds_total{namespace="%v",pod_name=~"%v.*"}[%vm]))`,
		namespace, podNamePrefix, int64(rate.Minutes()))
}

func prometheusServiceCustomMetricQuery(namespace, service string) string {
	return fmt.Sprintf(`sum(QPS{kubernetes_namespace="%v",kubernetes_name="%v"})`, namespace, service)
}

func prometheusPodCustomMetricQuery(namespace, podNamePrefix string) string {
	return fmt.Sprintf(`sum(QPS{kubernetes_namespace="%s",kubernetes_pod_name=~"%s.*"})`, namespace, podNamePrefix)
}
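// Rendered examples of the query builders above (namespace and prefix values
// hypothetical):
//   prometheusCPUQuery("ns1", "consumer", 2*time.Minute) yields
//     sum(irate(container_cpu_usage_seconds_total{namespace="ns1",pod_name=~"consumer.*"}[2m]))
//   prometheusPodCustomMetricQuery("ns1", "exporter") yields
//     sum(QPS{kubernetes_namespace="ns1",kubernetes_pod_name=~"exporter.*"})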

func consumeCPUResources(f *framework.Framework, consumerName string, cpuUsage int) *common.ResourceConsumer {
	return common.NewDynamicResourceConsumer(consumerName, f.Namespace.Name, common.KindDeployment, 1, cpuUsage,
		memoryUsed, 0, int64(cpuUsage), memoryLimit, f.ClientSet, f.InternalClientset, f.ScalesGetter)
}

func exportCustomMetricFromPod(f *framework.Framework, consumerName string, metricValue int) *common.ResourceConsumer {
	podAnnotations := map[string]string{
		"prometheus.io/scrape": "true",
		"prometheus.io/path":   "/Metrics",
		"prometheus.io/port":   "8080",
	}
	return common.NewMetricExporter(consumerName, f.Namespace.Name, podAnnotations, nil, metricValue, f.ClientSet, f.InternalClientset, f.ScalesGetter)
}

func exportCustomMetricFromService(f *framework.Framework, consumerName string, metricValue int) *common.ResourceConsumer {
	serviceAnnotations := map[string]string{
		"prometheus.io/scrape": "true",
		"prometheus.io/path":   "/Metrics",
		"prometheus.io/port":   "8080",
	}
	return common.NewMetricExporter(consumerName, f.Namespace.Name, nil, serviceAnnotations, metricValue, f.ClientSet, f.InternalClientset, f.ScalesGetter)
}
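// The prometheus.io/* annotations set above are the conventional scrape hints:
// the addon's Kubernetes service-discovery config relabels them into scrape
// targets, so annotated pods and services are picked up without per-workload
// Prometheus configuration. The exporter image is assumed to serve its metrics
// on /Metrics (hence the non-default, capitalized path) over port 8080.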

func validateMetricAvailableForAllNodes(c clientset.Interface, metric string, expectedNodesNames []string) error {
	instanceLabels, err := getInstanceLabelsAvailableForMetric(c, prometheusRequiredNodesUpDuration, metric)
	if err != nil {
		return err
	}
	nodesWithMetric := make(map[string]bool)
	for _, instance := range instanceLabels {
		nodesWithMetric[instance] = true
	}
	missedNodesCount := 0
	for _, nodeName := range expectedNodesNames {
		if _, found := nodesWithMetric[nodeName]; !found {
			missedNodesCount++
		}
	}
	if missedNodesCount > 0 {
		return fmt.Errorf("metric not found for %v out of %v nodes", missedNodesCount, len(expectedNodesNames))
	}
	return nil
}

func validateAllActiveTargetsAreHealthy(c clientset.Interface) error {
	discovery, err := fetchPrometheusTargetDiscovery(c)
	if err != nil {
		return err
	}
	if len(discovery.ActiveTargets) == 0 {
		return fmt.Errorf("Prometheus is not scraping any targets, at least one target is required")
	}
	for _, target := range discovery.ActiveTargets {
		if target.Health != HealthGood {
			return fmt.Errorf("target health not good. Target: %v", target)
		}
	}
	return nil
}

func validateQueryReturnsCorrectValues(c clientset.Interface, query string, expectedValue float64, minSamplesCount int, errorTolerance float64) error {
	samples, err := fetchQueryValues(c, query, prometheusMetricValidationDuration)
	if err != nil {
		return err
	}
	if len(samples) < minSamplesCount {
		return fmt.Errorf("not enough samples for query '%v', got %v", query, samples)
	}
	framework.Logf("Executed query '%v' returned %v", query, samples)
	for _, value := range samples {
		relativeError := math.Abs(value-expectedValue) / expectedValue
		if relativeError >= errorTolerance {
			return fmt.Errorf("query result values outside expected value tolerance. Expected error below %v, got %v", errorTolerance, relativeError)
		}
	}
	return nil
}

func fetchQueryValues(c clientset.Interface, query string, duration time.Duration) ([]float64, error) {
	now := time.Now()
	response, err := queryPrometheus(c, query, now.Add(-duration), now, prometheusQueryStep)
	if err != nil {
		return nil, err
	}
	m, ok := response.(model.Matrix)
	if !ok {
		return nil, fmt.Errorf("expected matrix response, got: %T", response)
	}
	values := make([]float64, 0)
	for _, stream := range m {
		for _, sample := range stream.Values {
			values = append(values, float64(sample.Value))
		}
	}
	return values, nil
}

func getInstanceLabelsAvailableForMetric(c clientset.Interface, duration time.Duration, metric string) ([]string, error) {
	var instance model.LabelValue
	now := time.Now()
	query := fmt.Sprintf(`sum(%v)by(instance)`, metric)
	result, err := queryPrometheus(c, query, now.Add(-duration), now, prometheusQueryStep)
	if err != nil {
		return nil, err
	}
	instanceLabels := make([]string, 0)
	m, ok := result.(model.Matrix)
	if !ok {
		return nil, fmt.Errorf("expected matrix response for query '%v', got: %T", query, result)
	}
	for _, stream := range m {
		if instance, ok = stream.Metric["instance"]; !ok {
			continue
		}
		instanceLabels = append(instanceLabels, string(instance))
	}
	return instanceLabels, nil
}

func fetchPrometheusTargetDiscovery(c clientset.Interface) (TargetDiscovery, error) {
	ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
	defer cancel()

	response, err := c.CoreV1().RESTClient().Get().
		Context(ctx).
		Namespace("kube-system").
		Resource("services").
		Name(prometheusService + ":9090").
		SubResource("proxy").
		Suffix("api", "v1", "targets").
		Do().
		Raw()
	var qres promTargetsResponse
	if err != nil {
		framework.Logf("%s", string(response))
		return qres.Data, err
	}
	err = json.Unmarshal(response, &qres)
	return qres.Data, err
}

type promTargetsResponse struct {
	Status string          `json:"status"`
	Data   TargetDiscovery `json:"data"`
}

type TargetDiscovery struct {
	ActiveTargets  []*Target        `json:"activeTargets"`
	DroppedTargets []*DroppedTarget `json:"droppedTargets"`
}

type Target struct {
	DiscoveredLabels map[string]string `json:"discoveredLabels"`
	Labels           map[string]string `json:"labels"`

	ScrapeURL string `json:"scrapeUrl"`

	LastError  string       `json:"lastError"`
	LastScrape time.Time    `json:"lastScrape"`
	Health     TargetHealth `json:"health"`
}

type DroppedTarget struct {
	// Labels before any processing.
	DiscoveredLabels map[string]string `json:"discoveredLabels"`
}

const (
	HealthUnknown TargetHealth = "unknown"
	HealthGood    TargetHealth = "up"
	HealthBad     TargetHealth = "down"
)

type TargetHealth string

func queryPrometheus(c clientset.Interface, query string, start, end time.Time, step time.Duration) (model.Value, error) {
	ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
	defer cancel()

	response, err := c.CoreV1().RESTClient().Get().
		Context(ctx).
		Namespace("kube-system").
		Resource("services").
		Name(prometheusService + ":9090").
		SubResource("proxy").
		Suffix("api", "v1", "query_range").
		Param("query", query).
		Param("start", fmt.Sprintf("%v", start.Unix())).
		Param("end", fmt.Sprintf("%v", end.Unix())).
		Param("step", fmt.Sprintf("%vs", step.Seconds())).
		Do().
		Raw()
	if err != nil {
		framework.Logf("%s", string(response))
		return nil, err
	}
	var qres promQueryResponse
	err = json.Unmarshal(response, &qres)

	return model.Value(qres.Data.v), err
}

type promQueryResponse struct {
	Status string       `json:"status"`
	Data   responseData `json:"data"`
}

type responseData struct {
	Type   model.ValueType `json:"resultType"`
	Result interface{}     `json:"result"`

	// The decoded value.
	v model.Value
}

func (qr *responseData) UnmarshalJSON(b []byte) error {
	v := struct {
		Type   model.ValueType `json:"resultType"`
		Result json.RawMessage `json:"result"`
	}{}

	err := json.Unmarshal(b, &v)
	if err != nil {
		return err
	}

	switch v.Type {
	case model.ValScalar:
		var sv model.Scalar
		err = json.Unmarshal(v.Result, &sv)
		qr.v = &sv

	case model.ValVector:
		var vv model.Vector
		err = json.Unmarshal(v.Result, &vv)
		qr.v = vv

	case model.ValMatrix:
		var mv model.Matrix
		err = json.Unmarshal(v.Result, &mv)
		qr.v = mv

	default:
		err = fmt.Errorf("unexpected value type %q", v.Type)
	}
	return err
}

func retryUntilSucceeds(validator func() error, timeout time.Duration) {
	startTime := time.Now()
	var err error
	for {
		err = validator()
		if err == nil {
			return
		}
		if time.Since(startTime) >= timeout {
			break
		}
		framework.Logf("%v", err)
		time.Sleep(prometheusSleepBetweenAttempts)
	}
	framework.Failf("%v", err)
}

func getAllNodes(c clientset.Interface) ([]string, error) {
	nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	result := []string{}
	for _, node := range nodeList.Items {
		result = append(result, node.Name)
	}
	return result, nil
}

@@ -131,8 +131,9 @@ type RCConfig struct {
 	// Env vars, set the same for every pod.
 	Env map[string]string

-	// Extra labels added to every pod.
-	Labels map[string]string
+	// Extra labels and annotations added to every pod.
+	Labels      map[string]string
+	Annotations map[string]string

 	// Node selector for pods in the RC.
 	NodeSelector map[string]string
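With the new Annotations field, any workload created through these runner configs can carry pod annotations; a minimal sketch (package alias and field values hypothetical):

	config := testutils.RCConfig{
		Client:      clientSet,
		Name:        "annotated-rc",
		Namespace:   namespace,
		Image:       "k8s.gcr.io/pause:3.1",
		Replicas:    1,
		Annotations: map[string]string{"prometheus.io/scrape": "true"},
	}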

@@ -292,7 +293,8 @@ func (config *DeploymentConfig) create() error {
 		},
 		Template: v1.PodTemplateSpec{
 			ObjectMeta: metav1.ObjectMeta{
-				Labels: map[string]string{"name": config.Name},
+				Labels:      map[string]string{"name": config.Name},
+				Annotations: config.Annotations,
 			},
 			Spec: v1.PodSpec{
 				Containers: []v1.Container{
@@ -362,7 +364,8 @@ func (config *ReplicaSetConfig) create() error {
 		},
 		Template: v1.PodTemplateSpec{
 			ObjectMeta: metav1.ObjectMeta{
-				Labels: map[string]string{"name": config.Name},
+				Labels:      map[string]string{"name": config.Name},
+				Annotations: config.Annotations,
 			},
 			Spec: v1.PodSpec{
 				Containers: []v1.Container{
@@ -428,7 +431,8 @@ func (config *JobConfig) create() error {
 		Completions: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
 		Template: v1.PodTemplateSpec{
 			ObjectMeta: metav1.ObjectMeta{
-				Labels: map[string]string{"name": config.Name},
+				Labels:      map[string]string{"name": config.Name},
+				Annotations: config.Annotations,
 			},
 			Spec: v1.PodSpec{
 				Containers: []v1.Container{
@@ -542,7 +546,8 @@ func (config *RCConfig) create() error {
 		},
 		Template: &v1.PodTemplateSpec{
 			ObjectMeta: metav1.ObjectMeta{
-				Labels: map[string]string{"name": config.Name},
+				Labels:      map[string]string{"name": config.Name},
+				Annotations: config.Annotations,
 			},
 			Spec: v1.PodSpec{
 				Affinity: config.Affinity,