mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #56871 from jpbetz/etcd-version-monitor-3.1
Automatic merge from submit-queue (batch tested with PRs 56650, 55813, 56911, 56921, 56871). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Improve etcd-version-monitor metrics proxying, add etcd 3.1 gprc metr… Partially addresses https://github.com/kubernetes/kubernetes/issues/56869: - Fix `etcd-version-monitor` to support etcd 3.1: Add support for the etcd 3.1+ [go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus) metrics format, which backward incompatibly replaces the 3.0 legacy grpc metric format. Expose the go-grpc-prometheus metrics both in the 3.1 format and in the 3.0 format so systems consuming `etcd-version-monitor` metrics have a clean, simple upgrade path. - Expose *all* etcd metrics by default, making this a one stop shop for all etcd metrics. - Expose grpc request latency histogram metrics (`grpc_server_handling_seconds` from [go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus) metrics format). Rewrite etcd 3.0 legacy metric for latency histograms to the etcd 3.1+ `go-grpc-prometheus` format so there is a single format exported for all etcd versions. etcd 3.0 to 3.1 upgrade path: Continue to use the `etcd_grpc_requests_total`. Once the upgrade is complete and all etcd nodes are running 3.1, migrate to the `grpc_server_handled_total` metric at your leisure. This PR reorganizes the code substantially. Previously, the code to proxy etcd metrics was hard coded and limited to a single counter metric. This has been entirely replaced with code that generically filters, rewrites proxied etcd metrics and then aggregates them with custom metrics such as the etcd version metric. cc @wojtek-t @mml @shyamjvs @cheftako ```release-note Fix `etcd-version-monitor` to backward compatibly support etcd 3.1 [go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus) metrics format. ```
This commit is contained in:
commit
8ae6202b7e
@ -19,6 +19,8 @@ go_library(
|
||||
deps = [
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_golang/prometheus/promhttp:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_model/go:go_default_library",
|
||||
"//vendor/github.com/prometheus/common/expfmt:go_default_library",
|
||||
"//vendor/github.com/spf13/pflag:go_default_library",
|
||||
],
|
||||
|
@ -20,7 +20,7 @@
|
||||
ARCH:=amd64
|
||||
GOLANG_VERSION?=1.8.3
|
||||
REGISTRY?=gcr.io/google-containers
|
||||
TAG?=0.1.0
|
||||
TAG?=0.1.1
|
||||
IMAGE:=$(REGISTRY)/etcd-version-monitor:$(TAG)
|
||||
CURRENT_DIR:=$(pwd)
|
||||
TEMP_DIR:=$(shell mktemp -d)
|
||||
|
@ -1,11 +1,19 @@
|
||||
# etcd-version-monitor
|
||||
|
||||
This is a tool for exporting metrics related to etcd version, like etcd
|
||||
server's binary version, cluster version, and counts of different kinds of
|
||||
gRPC calls (which is a characteristic of v3), etc. These metrics are in
|
||||
This is a tool for exporting etcd metrics and supplementing them with etcd
|
||||
server binary version and cluster version. These metrics are in
|
||||
prometheus format and can be scraped by a prometheus server.
|
||||
The metrics are exposed at the http://localhost:9101/metrics endpoint.
|
||||
|
||||
For etcd 3.1+, the
|
||||
[go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus)
|
||||
metrics format, which backward incompatibly replaces the 3.0 legacy grpc metric
|
||||
format, is exposed in both the 3.1 format and in the 3.0. This preserves
|
||||
backward compatiblity.
|
||||
|
||||
For etcd 3.1+, the `--metrics=extensive` must be set on etcd for grpc request
|
||||
latency metrics (`etcd_grpc_unary_requests_duration_seconds`) to be exposed.
|
||||
|
||||
**RUNNING THE TOOL**
|
||||
|
||||
To run this tool as a docker container:
|
||||
|
@ -25,6 +25,8 @@ import (
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
@ -52,6 +54,11 @@ const (
|
||||
|
||||
// Initialize prometheus metrics to be exported.
|
||||
var (
|
||||
// Register all custom metrics with a dedicated registry to keep them separate.
|
||||
customMetricRegistry = prometheus.NewRegistry()
|
||||
|
||||
// Custom etcd version metric since etcd 3.2- does not export one.
|
||||
// This will be replaced by https://github.com/coreos/etcd/pull/8960 in etcd 3.3.
|
||||
etcdVersion = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
@ -59,15 +66,122 @@ var (
|
||||
Help: "Etcd server's binary version",
|
||||
},
|
||||
[]string{"binary_version"})
|
||||
etcdGRPCRequestsTotal = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "grpc_requests_total",
|
||||
Help: "Counter of received grpc requests, labeled by the grpc method and service names",
|
||||
|
||||
gatherer = &monitorGatherer{
|
||||
// Rewrite rules for etcd metrics that are exported by default.
|
||||
exported: map[string]*exportedMetric{
|
||||
// etcd 3.0 metric format for total grpc requests with renamed method and service labels.
|
||||
"etcd_grpc_requests_total": {
|
||||
rewriters: []rewriteFunc{
|
||||
func(mf *dto.MetricFamily) (*dto.MetricFamily, error) {
|
||||
mf = deepCopyMetricFamily(mf)
|
||||
renameLabels(mf, map[string]string{
|
||||
"grpc_method": "method",
|
||||
"grpc_service": "service",
|
||||
})
|
||||
return mf, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
// etcd 3.1+ metric format for total grpc requests.
|
||||
"grpc_server_handled_total": {
|
||||
rewriters: []rewriteFunc{
|
||||
// Export the metric exactly as-is. For 3.1+ metrics, we will
|
||||
// pass all metrics directly through.
|
||||
identity,
|
||||
// Write to the etcd 3.0 metric format for backward compatibility.
|
||||
func(mf *dto.MetricFamily) (*dto.MetricFamily, error) {
|
||||
mf = deepCopyMetricFamily(mf)
|
||||
renameMetric(mf, "etcd_grpc_requests_total")
|
||||
renameLabels(mf, map[string]string{
|
||||
"grpc_method": "method",
|
||||
"grpc_service": "service",
|
||||
})
|
||||
return mf, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
// etcd 3.0 metric format for grpc request latencies,
|
||||
// rewritten to the etcd 3.1+ format.
|
||||
"etcd_grpc_unary_requests_duration_seconds": {
|
||||
rewriters: []rewriteFunc{
|
||||
func(mf *dto.MetricFamily) (*dto.MetricFamily, error) {
|
||||
mf = deepCopyMetricFamily(mf)
|
||||
renameMetric(mf, "grpc_server_handling_seconds")
|
||||
tpeName := "grpc_type"
|
||||
tpeVal := "unary"
|
||||
for _, m := range mf.Metric {
|
||||
m.Label = append(m.Label, &dto.LabelPair{Name: &tpeName, Value: &tpeVal})
|
||||
}
|
||||
return mf, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
// etcd 3.1+ metric format for total grpc requests.
|
||||
"grpc_server_handling_seconds": {},
|
||||
},
|
||||
[]string{"method", "service"})
|
||||
}
|
||||
)
|
||||
|
||||
// monitorGatherer is a custom metric gatherer for prometheus that exports custom metrics
|
||||
// defined by this monitor as well as rewritten etcd metrics.
|
||||
type monitorGatherer struct {
|
||||
exported map[string]*exportedMetric
|
||||
}
|
||||
|
||||
// exportedMetric identifies a metric that is exported and defines how it is rewritten before
|
||||
// it is exported.
|
||||
type exportedMetric struct {
|
||||
rewriters []rewriteFunc
|
||||
}
|
||||
|
||||
// rewriteFunc rewrites metrics before they are exported.
|
||||
type rewriteFunc func(mf *dto.MetricFamily) (*dto.MetricFamily, error)
|
||||
|
||||
func (m *monitorGatherer) Gather() ([]*dto.MetricFamily, error) {
|
||||
etcdMetrics, err := scrapeMetrics()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exported, err := m.rewriteExportedMetrics(etcdMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
custom, err := customMetricRegistry.Gather()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := make([]*dto.MetricFamily, 0, len(exported)+len(custom))
|
||||
result = append(result, exported...)
|
||||
result = append(result, custom...)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *monitorGatherer) rewriteExportedMetrics(metrics map[string]*dto.MetricFamily) ([]*dto.MetricFamily, error) {
|
||||
results := make([]*dto.MetricFamily, 0, len(metrics))
|
||||
for n, mf := range metrics {
|
||||
if e, ok := m.exported[n]; ok {
|
||||
// Apply rewrite rules for metrics that have them.
|
||||
if e.rewriters == nil {
|
||||
results = append(results, mf)
|
||||
} else {
|
||||
for _, rewriter := range e.rewriters {
|
||||
new, err := rewriter(mf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
results = append(results, new)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Proxy all metrics without any rewrite rules directly.
|
||||
results = append(results, mf)
|
||||
}
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// Struct for unmarshalling the json response from etcd's /version endpoint.
|
||||
type EtcdVersion struct {
|
||||
BinaryVersion string `json:"etcdserver"`
|
||||
@ -132,83 +246,78 @@ func getVersionPeriodically(stopCh <-chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// Struct for storing labels for gRPC request types.
|
||||
type GRPCRequestLabels struct {
|
||||
Method string
|
||||
Service string
|
||||
}
|
||||
|
||||
// Function for fetching etcd grpc request counts and feeding it to the prometheus metric.
|
||||
func getGRPCRequestCount(lastRecordedCount *map[GRPCRequestLabels]float64) error {
|
||||
// Create the get request for the etcd metrics endpoint.
|
||||
// scrapeMetrics scrapes the prometheus metrics from the etcd metrics URI.
|
||||
func scrapeMetrics() (map[string]*dto.MetricFamily, error) {
|
||||
req, err := http.NewRequest("GET", etcdMetricsScrapeURI, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create GET request for etcd metrics: %v", err)
|
||||
return nil, fmt.Errorf("Failed to create GET request for etcd metrics: %v", err)
|
||||
}
|
||||
|
||||
// Send the get request and receive a response.
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to receive GET response for etcd metrics: %v", err)
|
||||
return nil, fmt.Errorf("Failed to receive GET response for etcd metrics: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Parse the metrics in text format to a MetricFamily struct.
|
||||
var textParser expfmt.TextParser
|
||||
metricFamilies, err := textParser.TextToMetricFamilies(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse etcd metrics: %v", err)
|
||||
}
|
||||
|
||||
// Look through the grpc requests metric family and update our promotheus metric.
|
||||
for _, metric := range metricFamilies["etcd_grpc_requests_total"].GetMetric() {
|
||||
var grpcRequestLabels GRPCRequestLabels
|
||||
for _, label := range metric.GetLabel() {
|
||||
if label.GetName() == "grpc_method" {
|
||||
grpcRequestLabels.Method = label.GetValue()
|
||||
}
|
||||
if label.GetName() == "grpc_service" {
|
||||
grpcRequestLabels.Service = label.GetValue()
|
||||
}
|
||||
}
|
||||
if grpcRequestLabels.Method == "" || grpcRequestLabels.Service == "" {
|
||||
return fmt.Errorf("Could not get value for grpc_method and/or grpc_service label")
|
||||
}
|
||||
|
||||
// Get last recorded value and new value of the metric and update it suitably.
|
||||
previousMetricValue := 0.0
|
||||
if value, ok := (*lastRecordedCount)[grpcRequestLabels]; ok {
|
||||
previousMetricValue = value
|
||||
}
|
||||
newMetricValue := metric.GetCounter().GetValue()
|
||||
(*lastRecordedCount)[grpcRequestLabels] = newMetricValue
|
||||
if newMetricValue >= previousMetricValue {
|
||||
etcdGRPCRequestsTotal.With(prometheus.Labels{
|
||||
"method": grpcRequestLabels.Method,
|
||||
"service": grpcRequestLabels.Service,
|
||||
}).Add(newMetricValue - previousMetricValue)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return textParser.TextToMetricFamilies(resp.Body)
|
||||
}
|
||||
|
||||
// Function for periodically fetching etcd GRPC request counts.
|
||||
func getGRPCRequestCountPeriodically(stopCh <-chan struct{}) {
|
||||
// This map stores last recorded count for a given grpc request type.
|
||||
lastRecordedCount := make(map[GRPCRequestLabels]float64)
|
||||
for {
|
||||
if err := getGRPCRequestCount(&lastRecordedCount); err != nil {
|
||||
glog.Errorf("Failed to fetch etcd grpc request counts: %v", err)
|
||||
}
|
||||
select {
|
||||
case <-stopCh:
|
||||
break
|
||||
case <-time.After(scrapeTimeout):
|
||||
func renameMetric(mf *dto.MetricFamily, name string) {
|
||||
mf.Name = &name
|
||||
}
|
||||
|
||||
func renameLabels(mf *dto.MetricFamily, nameMapping map[string]string) {
|
||||
for _, m := range mf.Metric {
|
||||
for _, lbl := range m.Label {
|
||||
if alias, ok := nameMapping[*lbl.Name]; ok {
|
||||
lbl.Name = &alias
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func identity(mf *dto.MetricFamily) (*dto.MetricFamily, error) {
|
||||
return mf, nil
|
||||
}
|
||||
|
||||
func deepCopyMetricFamily(mf *dto.MetricFamily) *dto.MetricFamily {
|
||||
r := &dto.MetricFamily{}
|
||||
r.Name = mf.Name
|
||||
r.Help = mf.Help
|
||||
r.Type = mf.Type
|
||||
r.Metric = make([]*dto.Metric, len(mf.Metric))
|
||||
for i, m := range mf.Metric {
|
||||
r.Metric[i] = deepCopyMetric(m)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func deepCopyMetric(m *dto.Metric) *dto.Metric {
|
||||
r := &dto.Metric{}
|
||||
r.Label = make([]*dto.LabelPair, len(m.Label))
|
||||
for i, lp := range m.Label {
|
||||
r.Label[i] = deepCopyLabelPair(lp)
|
||||
}
|
||||
r.Gauge = m.Gauge
|
||||
r.Counter = m.Counter
|
||||
r.Summary = m.Summary
|
||||
r.Untyped = m.Untyped
|
||||
r.Histogram = m.Histogram
|
||||
r.TimestampMs = m.TimestampMs
|
||||
return r
|
||||
}
|
||||
|
||||
func deepCopyLabelPair(lp *dto.LabelPair) *dto.LabelPair {
|
||||
r := &dto.LabelPair{}
|
||||
r.Name = lp.Name
|
||||
r.Value = lp.Value
|
||||
return r
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Register the commandline flags passed to the tool.
|
||||
registerFlags(pflag.CommandLine)
|
||||
@ -216,18 +325,16 @@ func main() {
|
||||
pflag.Parse()
|
||||
|
||||
// Register the metrics we defined above with prometheus.
|
||||
prometheus.MustRegister(etcdVersion)
|
||||
prometheus.MustRegister(etcdGRPCRequestsTotal)
|
||||
prometheus.Unregister(prometheus.NewGoCollector())
|
||||
customMetricRegistry.MustRegister(etcdVersion)
|
||||
customMetricRegistry.Unregister(prometheus.NewGoCollector())
|
||||
|
||||
// Spawn threads for periodically scraping etcd version metrics.
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
go getVersionPeriodically(stopCh)
|
||||
go getGRPCRequestCountPeriodically(stopCh)
|
||||
|
||||
// Serve our metrics on listenAddress/metricsPath.
|
||||
glog.Infof("Listening on: %v", listenAddress)
|
||||
http.Handle(metricsPath, prometheus.UninstrumentedHandler())
|
||||
http.Handle(metricsPath, promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}))
|
||||
glog.Errorf("Stopped listening/serving metrics: %v", http.ListenAndServe(listenAddress, nil))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user