Merge pull request #62195 from serathius/prometheus

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Add prometheus cluster monitoring addon.

This PR adds a new cluster monitoring addon based on Prometheus.
It adds a Prometheus deployment together with e2e tests.
Additional components will be added iteratively in the future.
The manifests are based on the current Helm chart.
In its current state it is not intended for production use.
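
For reference, enabling the addon on a cluster brought up with kube-up only requires setting the new environment variable introduced below (a minimal sketch for GCE; the exact invocation depends on your environment):

```sh
# Sketch: enable the Prometheus addon before running kube-up.
# KUBE_ENABLE_PROMETHEUS_MONITORING defaults to false (see the config-default.sh change below).
export KUBE_ENABLE_PROMETHEUS_MONITORING=true
./cluster/kube-up.sh
```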

cc @piosz @kawych @miekg
```release-note
Add prometheus cluster monitoring addon to kube-up
```
/sig instrumentation
/kind feature
/priority important-soon
Committed by Kubernetes Submit Queue on 2018-04-18 02:17:48 -07:00 (via GitHub), commit bb8f58b6e6
16 changed files with 792 additions and 14 deletions

@@ -0,0 +1,169 @@
# Prometheus configuration format https://prometheus.io/docs/prometheus/latest/configuration/configuration/
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-config
namespace: kube-system
labels:
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: EnsureExists
data:
prometheus.yml: |
scrape_configs:
- job_name: prometheus
static_configs:
- targets:
- localhost:9090
- job_name: kubernetes-apiservers
kubernetes_sd_configs:
- role: endpoints
relabel_configs:
- action: keep
regex: default;kubernetes;https
source_labels:
- __meta_kubernetes_namespace
- __meta_kubernetes_service_name
- __meta_kubernetes_endpoint_port_name
scheme: https
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
insecure_skip_verify: true
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
- job_name: kubernetes-nodes-kubelet
kubernetes_sd_configs:
- role: node
relabel_configs:
- action: labelmap
regex: __meta_kubernetes_node_label_(.+)
- source_labels: [__address__]
action: replace
target_label: __address__
regex: ([^:;]+):(\d+)
replacement: ${1}:10255
- job_name: kubernetes-nodes-cadvisor
kubernetes_sd_configs:
- role: node
relabel_configs:
- action: labelmap
regex: __meta_kubernetes_node_label_(.+)
- source_labels: [__address__]
action: replace
target_label: __address__
regex: ([^:;]+):(\d+)
replacement: ${1}:4194
- job_name: kubernetes-service-endpoints
kubernetes_sd_configs:
- role: endpoints
relabel_configs:
- action: keep
regex: true
source_labels:
- __meta_kubernetes_service_annotation_prometheus_io_scrape
- action: replace
regex: (https?)
source_labels:
- __meta_kubernetes_service_annotation_prometheus_io_scheme
target_label: __scheme__
- action: replace
regex: (.+)
source_labels:
- __meta_kubernetes_service_annotation_prometheus_io_path
target_label: __metrics_path__
- action: replace
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
source_labels:
- __address__
- __meta_kubernetes_service_annotation_prometheus_io_port
target_label: __address__
- action: labelmap
regex: __meta_kubernetes_service_label_(.+)
- action: replace
source_labels:
- __meta_kubernetes_namespace
target_label: kubernetes_namespace
- action: replace
source_labels:
- __meta_kubernetes_service_name
target_label: kubernetes_name
- job_name: kubernetes-services
kubernetes_sd_configs:
- role: service
metrics_path: /probe
params:
module:
- http_2xx
relabel_configs:
- action: keep
regex: true
source_labels:
- __meta_kubernetes_service_annotation_prometheus_io_probe
- source_labels:
- __address__
target_label: __param_target
- replacement: blackbox
target_label: __address__
- source_labels:
- __param_target
target_label: instance
- action: labelmap
regex: __meta_kubernetes_service_label_(.+)
- source_labels:
- __meta_kubernetes_namespace
target_label: kubernetes_namespace
- source_labels:
- __meta_kubernetes_service_name
target_label: kubernetes_name
- job_name: kubernetes-pods
kubernetes_sd_configs:
- role: pod
relabel_configs:
- action: keep
regex: true
source_labels:
- __meta_kubernetes_pod_annotation_prometheus_io_scrape
- action: replace
regex: (.+)
source_labels:
- __meta_kubernetes_pod_annotation_prometheus_io_path
target_label: __metrics_path__
- action: replace
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
source_labels:
- __address__
- __meta_kubernetes_pod_annotation_prometheus_io_port
target_label: __address__
- action: labelmap
regex: __meta_kubernetes_pod_label_(.+)
- action: replace
source_labels:
- __meta_kubernetes_namespace
target_label: kubernetes_namespace
- action: replace
source_labels:
- __meta_kubernetes_pod_name
target_label: kubernetes_pod_name
alerting:
alertmanagers:
- kubernetes_sd_configs:
- role: pod
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
relabel_configs:
- source_labels: [__meta_kubernetes_namespace]
regex: kube-system
action: keep
- source_labels: [__meta_kubernetes_pod_label_k8s_app]
regex: alertmanager
action: keep
- source_labels: [__meta_kubernetes_pod_container_port_number]
regex:
action: drop
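
The kubernetes-pods and kubernetes-service-endpoints jobs above only keep targets whose pod or service carries the prometheus.io annotations referenced in the relabel rules. A hedged illustration with a hypothetical workload name (for controller-managed pods the annotations belong in the pod template metadata, which is what the e2e tests below do):

```sh
# Hypothetical example: opt an existing pod and service into scraping.
# The relabel rules above read prometheus.io/scrape, prometheus.io/path and prometheus.io/port.
kubectl annotate pod my-app prometheus.io/scrape=true prometheus.io/port=8080 prometheus.io/path=/metrics
kubectl annotate service my-app prometheus.io/scrape=true prometheus.io/port=8080
```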

@@ -0,0 +1,100 @@
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: prometheus
namespace: kube-system
labels:
k8s-app: prometheus
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
version: v2.2.1
spec:
replicas: 1
selector:
matchLabels:
k8s-app: prometheus
version: v2.2.1
template:
metadata:
labels:
k8s-app: prometheus
version: v2.2.1
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ''
spec:
priorityClassName: system-cluster-critical
serviceAccountName: prometheus
initContainers:
- name: "init-chown-data"
image: "busybox:latest"
imagePullPolicy: "IfNotPresent"
command: ["chown", "-R", "65534:65534", "/data"]
volumeMounts:
- name: storage-volume
mountPath: /data
subPath: ""
containers:
- name: prometheus-server-configmap-reload
image: "jimmidyson/configmap-reload:v0.1"
imagePullPolicy: "IfNotPresent"
args:
- --volume-dir=/etc/config
- --webhook-url=http://localhost:9090/-/reload
volumeMounts:
- name: config-volume
mountPath: /etc/config
readOnly: true
resources:
limits:
cpu: 10m
memory: 10Mi
requests:
cpu: 10m
memory: 10Mi
- name: prometheus-server
image: "prom/prometheus:v2.2.1"
imagePullPolicy: "IfNotPresent"
args:
- --config.file=/etc/config/prometheus.yml
- --storage.tsdb.path=/data
- --web.console.libraries=/etc/prometheus/console_libraries
- --web.console.templates=/etc/prometheus/consoles
- --web.enable-lifecycle
ports:
- containerPort: 9090
readinessProbe:
httpGet:
path: /-/ready
port: 9090
initialDelaySeconds: 30
timeoutSeconds: 30
livenessProbe:
httpGet:
path: /-/healthy
port: 9090
initialDelaySeconds: 30
timeoutSeconds: 30
# based on 10 running nodes with 30 pods each
resources:
limits:
cpu: 200m
memory: 1000Mi
requests:
cpu: 200m
memory: 1000Mi
volumeMounts:
- name: config-volume
mountPath: /etc/config
- name: storage-volume
mountPath: /data
subPath: ""
terminationGracePeriodSeconds: 300
volumes:
- name: config-volume
configMap:
name: prometheus-config
- name: storage-volume
persistentVolumeClaim:
claimName: prometheus
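
Because the server runs with --web.enable-lifecycle, the prometheus-server-configmap-reload sidecar can reload the configuration by POSTing to /-/reload whenever the prometheus-config ConfigMap changes. A minimal sketch of triggering the same reload by hand, assuming kubectl access to kube-system:

```sh
# Sketch: manually reload the Prometheus configuration (normally done by the sidecar).
kubectl --namespace kube-system port-forward svc/prometheus 9090 &
curl -X POST http://localhost:9090/-/reload
```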

@@ -0,0 +1,15 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: prometheus
namespace: kube-system
labels:
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
spec:
storageClassName: standard
accessModes:
- ReadWriteOnce
resources:
requests:
storage: "16Gi"

@@ -0,0 +1,54 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: prometheus
namespace: kube-system
labels:
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
name: prometheus
labels:
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
- ""
resources:
- nodes
- services
- endpoints
- pods
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- configmaps
verbs:
- get
- nonResourceURLs:
- "/metrics"
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: prometheus
labels:
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: prometheus
subjects:
- kind: ServiceAccount
name: prometheus
namespace: kube-system
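
The ClusterRoleBinding gives the prometheus service account the read-only access that service discovery needs. A quick sanity check, sketched with kubectl impersonation:

```sh
# Sketch: verify the service account can use the permissions granted by the ClusterRole above.
kubectl auth can-i list nodes --as=system:serviceaccount:kube-system:prometheus
kubectl auth can-i get configmaps --as=system:serviceaccount:kube-system:prometheus
```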

@@ -0,0 +1,17 @@
kind: Service
apiVersion: v1
metadata:
name: prometheus
namespace: kube-system
labels:
kubernetes.io/name: "Prometheus"
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
spec:
ports:
- name: http
port: 9090
protocol: TCP
targetPort: 9090
selector:
k8s-app: prometheus
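
The service is cluster-internal, so the e2e tests below reach it through the apiserver's services/proxy subresource. A hedged kubectl equivalent of the same access path:

```sh
# Sketch: query Prometheus through the apiserver service proxy (the same path the e2e tests use).
kubectl get --raw "/api/v1/namespaces/kube-system/services/prometheus:9090/proxy/api/v1/query?query=up"
```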

@@ -144,6 +144,9 @@ ENABLE_L7_LOADBALANCING="${KUBE_ENABLE_L7_LOADBALANCING:-glbc}"
# standalone - Heapster only. Metrics available via Heapster REST API.
ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-influxdb}"
# Optional: Enable deploying a separate Prometheus stack for monitoring the Kubernetes cluster
ENABLE_PROMETHEUS_MONITORING="${KUBE_ENABLE_PROMETHEUS_MONITORING:-false}"
# Optional: Enable Metrics Server. Metrics Server should be enabled everywhere,
# since it's a critical component, but in the first release we need a way to disable
# this in case of stability issues.

@@ -146,6 +146,9 @@ ENABLE_L7_LOADBALANCING="${KUBE_ENABLE_L7_LOADBALANCING:-glbc}"
# standalone - Heapster only. Metrics available via Heapster REST API.
ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-influxdb}"
# Optional: Enable deploying a separate Prometheus stack for monitoring the Kubernetes cluster
ENABLE_PROMETHEUS_MONITORING="${KUBE_ENABLE_PROMETHEUS_MONITORING:-false}"
# Optional: Enable Metrics Server. Metrics Server should be enabled everywhere,
# since it's a critical component, but in the first release we need a way to disable
# this in case of stability issues.

@@ -2214,6 +2214,11 @@ EOF
prepare-kube-proxy-manifest-variables "$src_dir/kube-proxy/kube-proxy-ds.yaml"
setup-addon-manifests "addons" "kube-proxy"
fi
# Setup prometheus stack for monitoring kubernetes cluster
if [[ "${ENABLE_PROMETHEUS_MONITORING:-}" == "true" ]]; then
setup-addon-manifests "addons" "prometheus"
fi
# Setup cluster monitoring using heapster
if [[ "${ENABLE_CLUSTER_MONITORING:-}" == "influxdb" ]] || \
[[ "${ENABLE_CLUSTER_MONITORING:-}" == "google" ]] || \
[[ "${ENABLE_CLUSTER_MONITORING:-}" == "stackdriver" ]] || \

@@ -812,6 +812,7 @@ SERVICE_CLUSTER_IP_RANGE: $(yaml-quote ${SERVICE_CLUSTER_IP_RANGE})
KUBERNETES_MASTER_NAME: $(yaml-quote ${KUBERNETES_MASTER_NAME})
ALLOCATE_NODE_CIDRS: $(yaml-quote ${ALLOCATE_NODE_CIDRS:-false})
ENABLE_CLUSTER_MONITORING: $(yaml-quote ${ENABLE_CLUSTER_MONITORING:-none})
ENABLE_PROMETHEUS_MONITORING: $(yaml-quote ${ENABLE_PROMETHEUS_MONITORING:-false})
ENABLE_METRICS_SERVER: $(yaml-quote ${ENABLE_METRICS_SERVER:-false})
ENABLE_METADATA_AGENT: $(yaml-quote ${ENABLE_METADATA_AGENT:-none})
METADATA_AGENT_VERSION: $(yaml-quote ${METADATA_AGENT_VERSION:-})

@@ -160,6 +160,7 @@ export PATH=$(dirname "${e2e_test}"):"${PATH}"
--node-tag="${NODE_TAG:-}" \
--master-tag="${MASTER_TAG:-}" \
--cluster-monitoring-mode="${KUBE_ENABLE_CLUSTER_MONITORING:-influxdb}" \
--prometheus-monitoring="${KUBE_ENABLE_PROMETHEUS_MONITORING:-false}" \
${KUBE_CONTAINER_RUNTIME:+"--container-runtime=${KUBE_CONTAINER_RUNTIME}"} \
${MASTER_OS_DISTRIBUTION:+"--master-os-distro=${MASTER_OS_DISTRIBUTION}"} \
${NODE_OS_DISTRIBUTION:+"--node-os-distro=${NODE_OS_DISTRIBUTION}"} \
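
hack/ginkgo-e2e.sh forwards KUBE_ENABLE_PROMETHEUS_MONITORING to the test framework as --prometheus-monitoring, which gates the new tests. A sketch of running only the Prometheus tests against an existing cluster (the exact e2e entry point may differ per setup):

```sh
# Sketch: run only the [Feature:PrometheusMonitoring] e2e tests.
export KUBE_ENABLE_PROMETHEUS_MONITORING=true
go run hack/e2e.go -- --test --test_args="--ginkgo.focus=\[Feature:PrometheusMonitoring\]"
```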

@@ -108,13 +108,18 @@ func GetResourceConsumerImage() string {
func NewDynamicResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
return newResourceConsumer(name, nsName, kind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, dynamicConsumptionTimeInSeconds,
dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, clientset, internalClientset, scaleClient)
dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, clientset, internalClientset, scaleClient, nil, nil)
}
// TODO this still defaults to replication controller
func NewStaticResourceConsumer(name, nsName string, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
return newResourceConsumer(name, nsName, KindRC, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, staticConsumptionTimeInSeconds,
initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuLimit, memLimit, clientset, internalClientset, scaleClient)
initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuLimit, memLimit, clientset, internalClientset, scaleClient, nil, nil)
}
func NewMetricExporter(name, nsName string, podAnnotations, serviceAnnotations map[string]string, metricValue int, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
return newResourceConsumer(name, nsName, KindDeployment, 1, 0, 0, metricValue, dynamicConsumptionTimeInSeconds,
dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, 100, 100, clientset, internalClientset, scaleClient, podAnnotations, serviceAnnotations)
}
/*
@@ -125,9 +130,14 @@ memLimit argument is in megabytes, memLimit is a maximum amount of memory that c
cpuLimit argument is in millicores, cpuLimit is a maximum amount of cpu that can be consumed by a single pod
*/
func newResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, consumptionTimeInSeconds, requestSizeInMillicores,
requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
runServiceAndWorkloadForResourceConsumer(clientset, internalClientset, nsName, name, kind, replicas, cpuLimit, memLimit)
requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter, podAnnotations, serviceAnnotations map[string]string) *ResourceConsumer {
if podAnnotations == nil {
podAnnotations = make(map[string]string)
}
if serviceAnnotations == nil {
serviceAnnotations = make(map[string]string)
}
runServiceAndWorkloadForResourceConsumer(clientset, internalClientset, nsName, name, kind, replicas, cpuLimit, memLimit, podAnnotations, serviceAnnotations)
rc := &ResourceConsumer{
name: name,
controllerName: name + "-ctrl",
@@ -227,7 +237,7 @@ func (rc *ResourceConsumer) makeConsumeCustomMetric() {
delta := 0
for {
select {
case delta := <-rc.customMetric:
case delta = <-rc.customMetric:
framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta)
case <-time.After(sleepTime):
framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName)
@@ -410,11 +420,12 @@ func (rc *ResourceConsumer) CleanUp() {
framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(rc.controllerName, nil))
}
func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimitMillis, memLimitMb int64) {
func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimitMillis, memLimitMb int64, podAnnotations, serviceAnnotations map[string]string) {
By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
_, err := c.CoreV1().Services(ns).Create(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: name,
Annotations: serviceAnnotations,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
@@ -441,6 +452,7 @@ func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalCli
CpuLimit: cpuLimitMillis,
MemRequest: memLimitMb * 1024 * 1024, // MemLimit is in bytes
MemLimit: memLimitMb * 1024 * 1024,
Annotations: podAnnotations,
}
switch kind {

@@ -112,6 +112,8 @@ type TestContextType struct {
NodeTestContextType
// Monitoring solution that is used in current cluster.
ClusterMonitoringMode string
// Separate Prometheus monitoring deployed in cluster
EnablePrometheusMonitoring bool
// Indicates what path the kubernetes-anywhere is installed on
KubernetesAnywherePath string
@@ -237,6 +239,7 @@ func RegisterClusterFlags() {
flag.StringVar(&TestContext.MasterOSDistro, "master-os-distro", "debian", "The OS distribution of cluster master (debian, trusty, or coreos).")
flag.StringVar(&TestContext.NodeOSDistro, "node-os-distro", "debian", "The OS distribution of cluster VM instances (debian, trusty, or coreos).")
flag.StringVar(&TestContext.ClusterMonitoringMode, "cluster-monitoring-mode", "influxdb", "The monitoring solution that is used in the cluster.")
flag.BoolVar(&TestContext.EnablePrometheusMonitoring, "prometheus-monitoring", false, "Separate Prometheus monitoring deployed in cluster.")
// TODO: Flags per provider? Rename gce-project/gce-zone?
cloudConfig := &TestContext.CloudConfig

@@ -364,6 +364,12 @@ func SkipUnlessClusterMonitoringModeIs(supportedMonitoring ...string) {
}
}
func SkipUnlessPrometheusMonitoringIsEnabled(supportedMonitoring ...string) {
if !TestContext.EnablePrometheusMonitoring {
Skipf("Skipped because prometheus monitoring is not enabled")
}
}
func SkipUnlessMasterOSDistroIs(supportedMasterOsDistros ...string) {
if !MasterOSDistroIs(supportedMasterOsDistros...) {
Skipf("Only supported for master OS distro %v (not %s)", supportedMasterOsDistros, TestContext.MasterOSDistro)

@@ -14,6 +14,7 @@ go_library(
"custom_metrics_stackdriver.go",
"influxdb.go",
"metrics_grabber.go",
"prometheus.go",
"stackdriver.go",
"stackdriver_metadata_agent.go",
],
@@ -28,6 +29,7 @@ go_library(
"//vendor/github.com/influxdata/influxdb/client/v2:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library",
"//vendor/golang.org/x/oauth2/google:go_default_library",
"//vendor/google.golang.org/api/monitoring/v3:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",

@@ -0,0 +1,382 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package monitoring
import (
"context"
"encoding/json"
"fmt"
"math"
"time"
"github.com/prometheus/common/model"
. "github.com/onsi/ginkgo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/common"
"k8s.io/kubernetes/test/e2e/framework"
instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
)
const (
prometheusQueryStep = time.Minute
prometheusMetricErrorTolerance = 0.25
prometheusMetricValidationDuration = time.Minute * 2
prometheusRate = time.Minute * 2
prometheusRequiredNodesUpDuration = time.Minute * 5
prometheusService = "prometheus"
prometheusSleepBetweenAttempts = time.Second * 30
prometheusTestTimeout = time.Minute * 5
customMetricValue = 1000
targetCPUUsage = 0.1
)
var _ = instrumentation.SIGDescribe("[Feature:PrometheusMonitoring] Prometheus", func() {
BeforeEach(func() {
framework.SkipUnlessPrometheusMonitoringIsEnabled()
})
f := framework.NewDefaultFramework("prometheus-monitoring")
It("should scrape container metrics from all nodes.", func() {
expectedNodes, err := getAllNodes(f.ClientSet)
framework.ExpectNoError(err)
retryUntilSucceeds(func() error {
return validateMetricAvailableForAllNodes(f.ClientSet, `container_cpu_usage_seconds_total`, expectedNodes)
}, prometheusTestTimeout)
})
It("should successfully scrape all targets", func() {
retryUntilSucceeds(func() error {
return validateAllActiveTargetsAreHealthy(f.ClientSet)
}, prometheusTestTimeout)
})
It("should contain correct container CPU metric.", func() {
query := prometheusCPUQuery(f.Namespace.Name, "prometheus-cpu-consumer", prometheusRate)
consumer := consumeCPUResources(f, "prometheus-cpu-consumer", targetCPUUsage*1000)
defer consumer.CleanUp()
retryUntilSucceeds(func() error {
return validateQueryReturnsCorrectValues(f.ClientSet, query, targetCPUUsage, 3, prometheusMetricErrorTolerance)
}, prometheusTestTimeout)
})
It("should scrape metrics from annotated pods.", func() {
query := prometheusPodCustomMetricQuery(f.Namespace.Name, "prometheus-custom-pod-metric")
consumer := exportCustomMetricFromPod(f, "prometheus-custom-pod-metric", customMetricValue)
defer consumer.CleanUp()
retryUntilSucceeds(func() error {
return validateQueryReturnsCorrectValues(f.ClientSet, query, customMetricValue, 1, prometheusMetricErrorTolerance)
}, prometheusTestTimeout)
})
It("should scrape metrics from annotated services.", func() {
query := prometheusServiceCustomMetricQuery(f.Namespace.Name, "prometheus-custom-service-metric")
consumer := exportCustomMetricFromService(f, "prometheus-custom-service-metric", customMetricValue)
defer consumer.CleanUp()
retryUntilSucceeds(func() error {
return validateQueryReturnsCorrectValues(f.ClientSet, query, customMetricValue, 1, prometheusMetricErrorTolerance)
}, prometheusTestTimeout)
})
})
func prometheusCPUQuery(namespace, podNamePrefix string, rate time.Duration) string {
return fmt.Sprintf(`sum(irate(container_cpu_usage_seconds_total{namespace="%v",pod_name=~"%v.*"}[%vm]))`,
namespace, podNamePrefix, int64(rate.Minutes()))
}
func prometheusServiceCustomMetricQuery(namespace, service string) string {
return fmt.Sprintf(`sum(QPS{kubernetes_namespace="%v",kubernetes_name="%v"})`, namespace, service)
}
func prometheusPodCustomMetricQuery(namespace, podNamePrefix string) string {
return fmt.Sprintf(`sum(QPS{kubernetes_namespace="%s",kubernetes_pod_name=~"%s.*"})`, namespace, podNamePrefix)
}
func consumeCPUResources(f *framework.Framework, consumerName string, cpuUsage int) *common.ResourceConsumer {
return common.NewDynamicResourceConsumer(consumerName, f.Namespace.Name, common.KindDeployment, 1, cpuUsage,
memoryUsed, 0, int64(cpuUsage), memoryLimit, f.ClientSet, f.InternalClientset, f.ScalesGetter)
}
func exportCustomMetricFromPod(f *framework.Framework, consumerName string, metricValue int) *common.ResourceConsumer {
podAnnotations := map[string]string{
"prometheus.io/scrape": "true",
"prometheus.io/path": "/Metrics",
"prometheus.io/port": "8080",
}
return common.NewMetricExporter(consumerName, f.Namespace.Name, podAnnotations, nil, metricValue, f.ClientSet, f.InternalClientset, f.ScalesGetter)
}
func exportCustomMetricFromService(f *framework.Framework, consumerName string, metricValue int) *common.ResourceConsumer {
serviceAnnotations := map[string]string{
"prometheus.io/scrape": "true",
"prometheus.io/path": "/Metrics",
"prometheus.io/port": "8080",
}
return common.NewMetricExporter(consumerName, f.Namespace.Name, nil, serviceAnnotations, metricValue, f.ClientSet, f.InternalClientset, f.ScalesGetter)
}
func validateMetricAvailableForAllNodes(c clientset.Interface, metric string, expectedNodesNames []string) error {
instanceLabels, err := getInstanceLabelsAvailableForMetric(c, prometheusRequiredNodesUpDuration, metric)
if err != nil {
return err
}
nodesWithMetric := make(map[string]bool)
for _, instance := range instanceLabels {
nodesWithMetric[instance] = true
}
missedNodesCount := 0
for _, nodeName := range expectedNodesNames {
if _, found := nodesWithMetric[nodeName]; !found {
missedNodesCount++
}
}
if missedNodesCount > 0 {
return fmt.Errorf("Metric not found for %v out of %v nodes", missedNodesCount, len(expectedNodesNames))
}
return nil
}
func validateAllActiveTargetsAreHealthy(c clientset.Interface) error {
discovery, err := fetchPrometheusTargetDiscovery(c)
if err != nil {
return err
}
if len(discovery.ActiveTargets) == 0 {
return fmt.Errorf("Prometheus is not scraping any targets, at least one target is required")
}
for _, target := range discovery.ActiveTargets {
if target.Health != HealthGood {
return fmt.Errorf("Target health not good. Target: %v", target)
}
}
return nil
}
func validateQueryReturnsCorrectValues(c clientset.Interface, query string, expectedValue float64, minSamplesCount int, errorTolerance float64) error {
samples, err := fetchQueryValues(c, query, prometheusMetricValidationDuration)
if err != nil {
return err
}
if len(samples) < minSamplesCount {
return fmt.Errorf("Not enough samples for query '%v', got %v", query, samples)
}
framework.Logf("Executed query '%v' returned %v", query, samples)
for _, value := range samples {
error := math.Abs(value-expectedValue) / expectedValue
if error >= errorTolerance {
return fmt.Errorf("Query result values outside expected value tolerance. Expected error below %v, got %v", errorTolerance, error)
}
}
return nil
}
func fetchQueryValues(c clientset.Interface, query string, duration time.Duration) ([]float64, error) {
now := time.Now()
response, err := queryPrometheus(c, query, now.Add(-duration), now, prometheusQueryStep)
if err != nil {
return nil, err
}
m, ok := response.(model.Matrix)
if !ok {
return nil, fmt.Errorf("Expected matric response, got: %T", response)
}
values := make([]float64, 0)
for _, stream := range m {
for _, sample := range stream.Values {
values = append(values, float64(sample.Value))
}
}
return values, nil
}
func getInstanceLabelsAvailableForMetric(c clientset.Interface, duration time.Duration, metric string) ([]string, error) {
var instance model.LabelValue
now := time.Now()
query := fmt.Sprintf(`sum(%v)by(instance)`, metric)
result, err := queryPrometheus(c, query, now.Add(-duration), now, prometheusQueryStep)
if err != nil {
return nil, err
}
instanceLabels := make([]string, 0)
m, ok := result.(model.Matrix)
if !ok {
framework.Failf("Expected matrix response for query '%v', got: %T", query, result)
return instanceLabels, nil
}
for _, stream := range m {
if instance, ok = stream.Metric["instance"]; !ok {
continue
}
instanceLabels = append(instanceLabels, string(instance))
}
return instanceLabels, nil
}
func fetchPrometheusTargetDiscovery(c clientset.Interface) (TargetDiscovery, error) {
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
response, err := c.CoreV1().RESTClient().Get().
Context(ctx).
Namespace("kube-system").
Resource("services").
Name(prometheusService+":9090").
SubResource("proxy").
Suffix("api", "v1", "targets").
Do().
Raw()
var qres promTargetsResponse
if err != nil {
fmt.Printf("%s", string(response))
return qres.Data, err
}
err = json.Unmarshal(response, &qres)
return qres.Data, nil
}
type promTargetsResponse struct {
Status string `json:"status"`
Data TargetDiscovery `json:"data"`
}
type TargetDiscovery struct {
ActiveTargets []*Target `json:"activeTargets"`
DroppedTargets []*DroppedTarget `json:"droppedTargets"`
}
type Target struct {
DiscoveredLabels map[string]string `json:"discoveredLabels"`
Labels map[string]string `json:"labels"`
ScrapeURL string `json:"scrapeUrl"`
LastError string `json:"lastError"`
LastScrape time.Time `json:"lastScrape"`
Health TargetHealth `json:"health"`
}
type DroppedTarget struct {
// Labels before any processing.
DiscoveredLabels map[string]string `json:"discoveredLabels"`
}
const (
HealthUnknown TargetHealth = "unknown"
HealthGood TargetHealth = "up"
HealthBad TargetHealth = "down"
)
type TargetHealth string
func queryPrometheus(c clientset.Interface, query string, start, end time.Time, step time.Duration) (model.Value, error) {
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
response, err := c.CoreV1().RESTClient().Get().
Context(ctx).
Namespace("kube-system").
Resource("services").
Name(prometheusService+":9090").
SubResource("proxy").
Suffix("api", "v1", "query_range").
Param("query", query).
Param("start", fmt.Sprintf("%v", start.Unix())).
Param("end", fmt.Sprintf("%v", end.Unix())).
Param("step", fmt.Sprintf("%vs", step.Seconds())).
Do().
Raw()
if err != nil {
fmt.Printf("%s", string(response))
return nil, err
}
var qres promQueryResponse
err = json.Unmarshal(response, &qres)
return model.Value(qres.Data.v), err
}
type promQueryResponse struct {
Status string `json:"status"`
Data responseData `json:"data"`
}
type responseData struct {
Type model.ValueType `json:"resultType"`
Result interface{} `json:"result"`
// The decoded value.
v model.Value
}
func (qr *responseData) UnmarshalJSON(b []byte) error {
v := struct {
Type model.ValueType `json:"resultType"`
Result json.RawMessage `json:"result"`
}{}
err := json.Unmarshal(b, &v)
if err != nil {
return err
}
switch v.Type {
case model.ValScalar:
var sv model.Scalar
err = json.Unmarshal(v.Result, &sv)
qr.v = &sv
case model.ValVector:
var vv model.Vector
err = json.Unmarshal(v.Result, &vv)
qr.v = vv
case model.ValMatrix:
var mv model.Matrix
err = json.Unmarshal(v.Result, &mv)
qr.v = mv
default:
err = fmt.Errorf("unexpected value type %q", v.Type)
}
return err
}
func retryUntilSucceeds(validator func() error, timeout time.Duration) {
startTime := time.Now()
var err error
for {
err = validator()
if err == nil {
return
}
if time.Since(startTime) >= timeout {
break
}
framework.Logf(err.Error())
time.Sleep(prometheusSleepBetweenAttempts)
}
framework.Failf(err.Error())
}
func getAllNodes(c clientset.Interface) ([]string, error) {
nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
return nil, err
}
result := []string{}
for _, node := range nodeList.Items {
result = append(result, node.Name)
}
return result, nil
}

@@ -131,8 +131,9 @@ type RCConfig struct {
// Env vars, set the same for every pod.
Env map[string]string
// Extra labels added to every pod.
Labels map[string]string
// Extra labels and annotations added to every pod.
Labels map[string]string
Annotations map[string]string
// Node selector for pods in the RC.
NodeSelector map[string]string
@@ -292,7 +293,8 @@ func (config *DeploymentConfig) create() error {
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"name": config.Name},
Labels: map[string]string{"name": config.Name},
Annotations: config.Annotations,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@@ -362,7 +364,8 @@ func (config *ReplicaSetConfig) create() error {
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"name": config.Name},
Labels: map[string]string{"name": config.Name},
Annotations: config.Annotations,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@@ -428,7 +431,8 @@ func (config *JobConfig) create() error {
Completions: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"name": config.Name},
Labels: map[string]string{"name": config.Name},
Annotations: config.Annotations,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@@ -542,7 +546,8 @@ func (config *RCConfig) create() error {
},
Template: &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"name": config.Name},
Labels: map[string]string{"name": config.Name},
Annotations: config.Annotations,
},
Spec: v1.PodSpec{
Affinity: config.Affinity,