diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index fc0c53a1f06..f591b70da1d 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -217,7 +217,7 @@ func (s *APIServer) verifyClusterIPFlags() { } func newEtcd(etcdConfigFile string, etcdServerList util.StringList, storageVersion string, pathPrefix string) (helper tools.EtcdHelper, err error) { - var client tools.EtcdGetSet + var client tools.EtcdClient if etcdConfigFile != "" { client, err = etcd.NewClientFromFile(etcdConfigFile) if err != nil { diff --git a/contrib/mesos/pkg/election/etcd_master.go b/contrib/mesos/pkg/election/etcd_master.go index 17f4d71fd80..59183a90580 100644 --- a/contrib/mesos/pkg/election/etcd_master.go +++ b/contrib/mesos/pkg/election/etcd_master.go @@ -37,7 +37,7 @@ type Master string func (Master) IsAnAPIObject() {} // NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd. -func NewEtcdMasterElector(h tools.EtcdGetSet) MasterElector { +func NewEtcdMasterElector(h tools.EtcdClient) MasterElector { return &etcdMasterElector{etcd: h} } @@ -45,7 +45,7 @@ type empty struct{} // internal implementation struct type etcdMasterElector struct { - etcd tools.EtcdGetSet + etcd tools.EtcdClient done chan empty events chan watch.Event } diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index f6633cdacf6..a62278e52ec 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -125,7 +125,7 @@ type KubernetesScheduler struct { executorGroup uint64 scheduleFunc PodScheduleFunc client *client.Client - etcdClient tools.EtcdGetSet + etcdClient tools.EtcdClient failoverTimeout float64 // in seconds reconcileInterval int64 @@ -158,7 +158,7 @@ type Config struct { Executor *mesos.ExecutorInfo ScheduleFunc PodScheduleFunc Client *client.Client - EtcdClient tools.EtcdGetSet + EtcdClient tools.EtcdClient FailoverTimeout float64 ReconcileInterval int64 ReconcileCooldown time.Duration diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 3fef41c1214..c9aab079493 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -525,7 +525,7 @@ func validateLeadershipTransition(desired, current string) { } // hacked from https://github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kube-apiserver/app/server.go -func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (client tools.EtcdGetSet, err error) { +func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (client tools.EtcdClient, err error) { if etcdConfigFile != "" { client, err = etcd.NewClientFromFile(etcdConfigFile) } else { @@ -534,7 +534,7 @@ func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (client tool return } -func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdGetSet, *uid.UID) { +func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdClient, *uid.UID) { s.FrameworkName = strings.TrimSpace(s.FrameworkName) if s.FrameworkName == "" { @@ -737,7 +737,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred return } -func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdGetSet) (*mesos.FrameworkID, error) { +func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.FrameworkID, error) { if s.FailoverTimeout > 0 { if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil { if !tools.IsEtcdNotFound(err) { diff --git a/pkg/master/master.go b/pkg/master/master.go index cb01bca77e4..58ca89ae106 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -229,7 +229,7 @@ type Master struct { // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version // is incorrect. -func NewEtcdHelper(client tools.EtcdGetSet, version string, prefix string) (helper tools.EtcdHelper, err error) { +func NewEtcdHelper(client tools.EtcdClient, version string, prefix string) (helper tools.EtcdHelper, err error) { if version == "" { version = latest.Version } diff --git a/pkg/tools/etcd_helper.go b/pkg/tools/etcd_helper.go index ca5ce551148..17ff1af5336 100644 --- a/pkg/tools/etcd_helper.go +++ b/pkg/tools/etcd_helper.go @@ -31,76 +31,26 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" - "github.com/prometheus/client_golang/prometheus" "github.com/golang/glog" ) -var ( - cacheHitCounter = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "etcd_helper_cache_hit_count", - Help: "Counter of etcd helper cache hits.", - }, - ) - cacheMissCounter = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "etcd_helper_cache_miss_count", - Help: "Counter of etcd helper cache miss.", - }, - ) - cacheEntryCounter = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "etcd_helper_cache_entry_count", - Help: "Counter of etcd helper cache entries. This can be different from etcd_helper_cache_miss_count " + - "because two concurrent threads can miss the cache and generate the same entry twice.", - }, - ) - cacheGetLatency = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "etcd_request_cache_get_latencies_summary", - Help: "Latency in microseconds of getting an object from etcd cache", - }, - ) - cacheAddLatency = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "etcd_request_cache_add_latencies_summary", - Help: "Latency in microseconds of adding an object to etcd cache", - }, - ) - etcdRequestLatenciesSummary = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Name: "etcd_request_latencies_summary", - Help: "Etcd request latency summary in microseconds for each operation and object type.", - }, - []string{"operation", "type"}, - ) -) - const maxEtcdCacheEntries int = 50000 func init() { - prometheus.MustRegister(cacheHitCounter) - prometheus.MustRegister(cacheMissCounter) - prometheus.MustRegister(cacheEntryCounter) - prometheus.MustRegister(cacheAddLatency) - prometheus.MustRegister(cacheGetLatency) - prometheus.MustRegister(etcdRequestLatenciesSummary) + metrics.Register() } func getTypeName(obj interface{}) string { return reflect.TypeOf(obj).String() } -func recordEtcdRequestLatency(verb, resource string, startTime time.Time) { - etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond)) -} - // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. type EtcdHelper struct { - Client EtcdGetSet + Client EtcdClient Codec runtime.Codec Copier runtime.ObjectCopier // optional, no atomic operations can be performed without this interface @@ -121,7 +71,7 @@ type EtcdHelper struct { // NewEtcdHelper creates a helper that works against objects that use the internal // Kubernetes API objects. // TODO: Refactor to take a runtiem.ObjectCopier -func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHelper { +func NewEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) EtcdHelper { return EtcdHelper{ Client: client, Codec: codec, @@ -234,7 +184,7 @@ type etcdCache interface { func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) { startTime := time.Now() defer func() { - cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond)) + metrics.ObserveGetCache(startTime) }() obj, found := h.cache.Get(index) if found { @@ -245,17 +195,17 @@ func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) { glog.Errorf("Error during DeepCopy of cached object: %q", err) return nil, false } - cacheHitCounter.Inc() + metrics.ObserveCacheHit() return objCopy.(runtime.Object), true } - cacheMissCounter.Inc() + metrics.ObserveCacheMiss() return nil, false } func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) { startTime := time.Now() defer func() { - cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond)) + metrics.ObserveAddCache(startTime) }() objCopy, err := h.Copier.Copy(obj) if err != nil { @@ -264,7 +214,7 @@ func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) { } isOverwrite := h.cache.Add(index, objCopy) if !isOverwrite { - cacheEntryCounter.Inc() + metrics.ObserveNewEntry() } } @@ -281,7 +231,7 @@ func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error { startTime := time.Now() trace.Step("About to list etcd node") nodes, index, err := h.listEtcdNode(key) - recordEtcdRequestLatency("list", getTypeName(listPtr), startTime) + metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) trace.Step("Etcd node listed") if err != nil { return err @@ -310,7 +260,7 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error startTime := time.Now() trace.Step("About to read etcd node") response, err := h.Client.Get(key, false, false) - recordEtcdRequestLatency("get", getTypeName(listPtr), startTime) + metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) trace.Step("Etcd node read") if err != nil { if IsEtcdNotFound(err) { @@ -348,7 +298,7 @@ func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFoun func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) { startTime := time.Now() response, err := h.Client.Get(key, false, false) - recordEtcdRequestLatency("get", getTypeName(objPtr), startTime) + metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) if err != nil && !IsEtcdNotFound(err) { return "", nil, nil, err @@ -404,7 +354,7 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) startTime := time.Now() response, err := h.Client.Create(key, string(data), ttl) - recordEtcdRequestLatency("create", getTypeName(obj), startTime) + metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) if err != nil { return err } @@ -422,7 +372,7 @@ func (h *EtcdHelper) Delete(key string, recursive bool) error { key = h.PrefixEtcdKey(key) startTime := time.Now() _, err := h.Client.Delete(key, recursive) - recordEtcdRequestLatency("delete", "UNKNOWN", startTime) + metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime) return err } @@ -435,7 +385,7 @@ func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error { startTime := time.Now() response, err := h.Client.Delete(key, false) - recordEtcdRequestLatency("delete", getTypeName(out), startTime) + metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) if !IsEtcdNotFound(err) { // if the object that existed prior to the delete is returned by etcd, update out. if err != nil || response.PrevNode != nil { @@ -462,7 +412,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err create = false startTime := time.Now() response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version) - recordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime) + metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime) if err != nil { return err } @@ -472,7 +422,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err // Create will fail if a key already exists. startTime := time.Now() response, err = h.Client.Create(key, string(data), ttl) - recordEtcdRequestLatency("create", getTypeName(obj), startTime) + metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) } if err != nil { @@ -589,7 +539,7 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno if index == 0 { startTime := time.Now() response, err := h.Client.Create(key, string(data), ttl) - recordEtcdRequestLatency("create", getTypeName(ptrToType), startTime) + metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime) if IsEtcdNodeExist(err) { continue } @@ -604,7 +554,7 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno startTime := time.Now() // Swap origBody with data, if origBody is the latest etcd data. response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index) - recordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) + metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) if IsEtcdTestFailed(err) { // Try again. continue diff --git a/pkg/tools/etcd_helper_watch.go b/pkg/tools/etcd_helper_watch.go index fb66e2af25b..deca3b4ef0f 100644 --- a/pkg/tools/etcd_helper_watch.go +++ b/pkg/tools/etcd_helper_watch.go @@ -186,7 +186,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called // as a goroutine. -func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { +func (w *etcdWatcher) etcdWatch(client EtcdClient, key string, resourceVersion uint64) { defer util.HandleCrash() defer close(w.etcdError) if resourceVersion == 0 { @@ -204,7 +204,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion u } // etcdGetInitialWatchState turns an etcd Get request into a watch equivalent -func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { +func etcdGetInitialWatchState(client EtcdClient, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { resp, err := client.Get(key, false, recursive) if err != nil { if !IsEtcdNotFound(err) { diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 11d7d435108..45232f5d8b9 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -18,7 +18,6 @@ package tools import ( "errors" - "fmt" "sort" "sync" @@ -125,14 +124,6 @@ func (f *FakeEtcdClient) updateResponse(key string) { f.Data[key] = *resp.N } -func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) { - f.Mutex.Lock() - defer f.Mutex.Unlock() - - f.Ix = f.Ix + 1 - return f.setLocked(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl) -} - func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) { if f.Err != nil { return nil, f.Err diff --git a/pkg/tools/interfaces.go b/pkg/tools/interfaces.go index 873d09602b3..257abbfd522 100644 --- a/pkg/tools/interfaces.go +++ b/pkg/tools/interfaces.go @@ -39,7 +39,6 @@ var ( // EtcdClient is an injectable interface for testing. type EtcdClient interface { GetCluster() []string - AddChild(key, data string, ttl uint64) (*etcd.Response, error) Get(key string, sort, recursive bool) (*etcd.Response, error) Set(key, value string, ttl uint64) (*etcd.Response, error) Create(key, value string, ttl uint64) (*etcd.Response, error) @@ -50,17 +49,6 @@ type EtcdClient interface { Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) } -// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper. -type EtcdGetSet interface { - GetCluster() []string - Get(key string, sort, recursive bool) (*etcd.Response, error) - Set(key, value string, ttl uint64) (*etcd.Response, error) - Create(key, value string, ttl uint64) (*etcd.Response, error) - Delete(key string, recursive bool) (*etcd.Response, error) - CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) - Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) -} - // EtcdVersioner abstracts setting and retrieving fields from the etcd response onto the object // or list. type EtcdVersioner interface { diff --git a/pkg/tools/metrics/metrics.go b/pkg/tools/metrics/metrics.go new file mode 100644 index 00000000000..00bd35fc8c6 --- /dev/null +++ b/pkg/tools/metrics/metrics.go @@ -0,0 +1,104 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + cacheHitCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "etcd_helper_cache_hit_count", + Help: "Counter of etcd helper cache hits.", + }, + ) + cacheMissCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "etcd_helper_cache_miss_count", + Help: "Counter of etcd helper cache miss.", + }, + ) + cacheEntryCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "etcd_helper_cache_entry_count", + Help: "Counter of etcd helper cache entries. This can be different from etcd_helper_cache_miss_count " + + "because two concurrent threads can miss the cache and generate the same entry twice.", + }, + ) + cacheGetLatency = prometheus.NewSummary( + prometheus.SummaryOpts{ + Name: "etcd_request_cache_get_latencies_summary", + Help: "Latency in microseconds of getting an object from etcd cache", + }, + ) + cacheAddLatency = prometheus.NewSummary( + prometheus.SummaryOpts{ + Name: "etcd_request_cache_add_latencies_summary", + Help: "Latency in microseconds of adding an object to etcd cache", + }, + ) + etcdRequestLatenciesSummary = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Name: "etcd_request_latencies_summary", + Help: "Etcd request latency summary in microseconds for each operation and object type.", + }, + []string{"operation", "type"}, + ) +) + +var registerMetrics sync.Once + +// Register all metrics. +func Register() { + // Register the metrics. + registerMetrics.Do(func() { + prometheus.MustRegister(cacheHitCounter) + prometheus.MustRegister(cacheMissCounter) + prometheus.MustRegister(cacheEntryCounter) + prometheus.MustRegister(cacheAddLatency) + prometheus.MustRegister(cacheGetLatency) + prometheus.MustRegister(etcdRequestLatenciesSummary) + }) +} + +func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) { + etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond)) +} + +func ObserveGetCache(startTime time.Time) { + cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond)) +} + +func ObserveAddCache(startTime time.Time) { + cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond)) +} + +func ObserveCacheHit() { + cacheHitCounter.Inc() +} + +func ObserveCacheMiss() { + cacheMissCounter.Inc() +} + +func ObserveNewEntry() { + cacheEntryCounter.Inc() +}