From d5c9ad80499a9148a40b8a6c33c165cf12578649 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 1 Apr 2019 12:46:06 +0200 Subject: [PATCH] Expose etcd client latency metrics --- .../k8s.io/apiserver/pkg/storage/etcd3/BUILD | 1 + .../apiserver/pkg/storage/etcd3/store.go | 28 ++++++++++++++++++- test/integration/metrics/metrics_test.go | 4 +++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index 2d064bac409..5439a929406 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -62,6 +62,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", "//vendor/github.com/coreos/etcd/clientv3:go_default_library", "//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 2413a6674a2..0667a407c02 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd" + "k8s.io/apiserver/pkg/storage/etcd/metrics" "k8s.io/apiserver/pkg/storage/value" utiltrace "k8s.io/utils/trace" ) @@ -111,7 +112,9 @@ func (s *store) Versioner() storage.Versioner { // Get implements storage.Interface.Get. func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error { key = path.Join(s.pathPrefix, key) + startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key, s.getOps...) + metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) if err != nil { return err } @@ -156,11 +159,13 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, return storage.NewInternalError(err.Error()) } + startTime := time.Now() txnResp, err := s.client.KV.Txn(ctx).If( notFound(key), ).Then( clientv3.OpPut(key, string(newData), opts...), ).Commit() + metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) if err != nil { return err } @@ -191,10 +196,12 @@ func (s *store) Delete(ctx context.Context, key string, out runtime.Object, prec func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error { // We need to do get and delete in single transaction in order to // know the value and revision before deleting it. + startTime := time.Now() txnResp, err := s.client.KV.Txn(ctx).If().Then( clientv3.OpGet(key), clientv3.OpDelete(key), ).Commit() + metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) if err != nil { return err } @@ -212,7 +219,9 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime } func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error { + startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key) + metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) if err != nil { return err } @@ -224,6 +233,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O if err := preconditions.Check(key, origState.obj); err != nil { return err } + startTime := time.Now() txnResp, err := s.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), ).Then( @@ -231,6 +241,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O ).Else( clientv3.OpGet(key), ).Commit() + metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) if err != nil { return err } @@ -247,7 +258,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O func (s *store) GuaranteedUpdate( ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error { - trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", reflect.TypeOf(out).String())) + trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", getTypeName(out))) defer trace.LogIfLong(500 * time.Millisecond) v, err := conversion.EnforcePtr(out) @@ -257,7 +268,9 @@ func (s *store) GuaranteedUpdate( key = path.Join(s.pathPrefix, key) getCurrentState := func() (*objState, error) { + startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key, s.getOps...) + metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) if err != nil { return nil, err } @@ -339,6 +352,7 @@ func (s *store) GuaranteedUpdate( } trace.Step("Transaction prepared") + startTime := time.Now() txnResp, err := s.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), ).Then( @@ -346,6 +360,7 @@ func (s *store) GuaranteedUpdate( ).Else( clientv3.OpGet(key), ).Commit() + metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime) if err != nil { return err } @@ -379,7 +394,9 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin } key = path.Join(s.pathPrefix, key) + startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key, s.getOps...) + metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) if err != nil { return err } @@ -399,7 +416,9 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin func (s *store) Count(key string) (int64, error) { key = path.Join(s.pathPrefix, key) + startTime := time.Now() getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly()) + metrics.RecordEtcdRequestLatency("listWithCount", key, startTime) if err != nil { return 0, err } @@ -554,7 +573,9 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor var lastKey []byte var hasMore bool for { + startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key, options...) + metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) if err != nil { return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) } @@ -786,3 +807,8 @@ func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.Selec func notFound(key string) clientv3.Cmp { return clientv3.Compare(clientv3.ModRevision(key), "=", 0) } + +// getTypeName returns type name of an object for reporting purposes. +func getTypeName(obj interface{}) string { + return reflect.TypeOf(obj).String() +} diff --git a/test/integration/metrics/metrics_test.go b/test/integration/metrics/metrics_test.go index 3dcd40e12e8..0ef2ba3283e 100644 --- a/test/integration/metrics/metrics_test.go +++ b/test/integration/metrics/metrics_test.go @@ -61,6 +61,9 @@ func scrapeMetrics(s *httptest.Server) ([]*prometheuspb.MetricFamily, error) { // Each line in the response body should contain all the data for a single metric. var metrics []*prometheuspb.MetricFamily scanner := bufio.NewScanner(resp.Body) + // Increase buffer size, since default one is too small for reading + // the /metrics contents. + scanner.Buffer(make([]byte, 10), 131072) for scanner.Scan() { var metric prometheuspb.MetricFamily if err := proto.UnmarshalText(scanner.Text(), &metric); err != nil { @@ -122,5 +125,6 @@ func TestApiserverMetrics(t *testing.T) { checkForExpectedMetrics(t, metrics, []string{ "apiserver_request_total", "apiserver_request_duration_seconds", + "etcd_request_duration_seconds", }) }