mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00
Prepare for extracting EtcdHelper interface
This commit is contained in:
parent
43f6651c94
commit
ee92aa3897
@ -217,7 +217,7 @@ func (s *APIServer) verifyClusterIPFlags() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newEtcd(etcdConfigFile string, etcdServerList util.StringList, storageVersion string, pathPrefix string) (helper tools.EtcdHelper, err error) {
|
func newEtcd(etcdConfigFile string, etcdServerList util.StringList, storageVersion string, pathPrefix string) (helper tools.EtcdHelper, err error) {
|
||||||
var client tools.EtcdGetSet
|
var client tools.EtcdClient
|
||||||
if etcdConfigFile != "" {
|
if etcdConfigFile != "" {
|
||||||
client, err = etcd.NewClientFromFile(etcdConfigFile)
|
client, err = etcd.NewClientFromFile(etcdConfigFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -37,7 +37,7 @@ type Master string
|
|||||||
func (Master) IsAnAPIObject() {}
|
func (Master) IsAnAPIObject() {}
|
||||||
|
|
||||||
// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd.
|
// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd.
|
||||||
func NewEtcdMasterElector(h tools.EtcdGetSet) MasterElector {
|
func NewEtcdMasterElector(h tools.EtcdClient) MasterElector {
|
||||||
return &etcdMasterElector{etcd: h}
|
return &etcdMasterElector{etcd: h}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,7 +45,7 @@ type empty struct{}
|
|||||||
|
|
||||||
// internal implementation struct
|
// internal implementation struct
|
||||||
type etcdMasterElector struct {
|
type etcdMasterElector struct {
|
||||||
etcd tools.EtcdGetSet
|
etcd tools.EtcdClient
|
||||||
done chan empty
|
done chan empty
|
||||||
events chan watch.Event
|
events chan watch.Event
|
||||||
}
|
}
|
||||||
|
@ -125,7 +125,7 @@ type KubernetesScheduler struct {
|
|||||||
executorGroup uint64
|
executorGroup uint64
|
||||||
scheduleFunc PodScheduleFunc
|
scheduleFunc PodScheduleFunc
|
||||||
client *client.Client
|
client *client.Client
|
||||||
etcdClient tools.EtcdGetSet
|
etcdClient tools.EtcdClient
|
||||||
failoverTimeout float64 // in seconds
|
failoverTimeout float64 // in seconds
|
||||||
reconcileInterval int64
|
reconcileInterval int64
|
||||||
|
|
||||||
@ -158,7 +158,7 @@ type Config struct {
|
|||||||
Executor *mesos.ExecutorInfo
|
Executor *mesos.ExecutorInfo
|
||||||
ScheduleFunc PodScheduleFunc
|
ScheduleFunc PodScheduleFunc
|
||||||
Client *client.Client
|
Client *client.Client
|
||||||
EtcdClient tools.EtcdGetSet
|
EtcdClient tools.EtcdClient
|
||||||
FailoverTimeout float64
|
FailoverTimeout float64
|
||||||
ReconcileInterval int64
|
ReconcileInterval int64
|
||||||
ReconcileCooldown time.Duration
|
ReconcileCooldown time.Duration
|
||||||
|
@ -525,7 +525,7 @@ func validateLeadershipTransition(desired, current string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// hacked from https://github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kube-apiserver/app/server.go
|
// hacked from https://github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kube-apiserver/app/server.go
|
||||||
func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (client tools.EtcdGetSet, err error) {
|
func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (client tools.EtcdClient, err error) {
|
||||||
if etcdConfigFile != "" {
|
if etcdConfigFile != "" {
|
||||||
client, err = etcd.NewClientFromFile(etcdConfigFile)
|
client, err = etcd.NewClientFromFile(etcdConfigFile)
|
||||||
} else {
|
} else {
|
||||||
@ -534,7 +534,7 @@ func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (client tool
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdGetSet, *uid.UID) {
|
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdClient, *uid.UID) {
|
||||||
|
|
||||||
s.FrameworkName = strings.TrimSpace(s.FrameworkName)
|
s.FrameworkName = strings.TrimSpace(s.FrameworkName)
|
||||||
if s.FrameworkName == "" {
|
if s.FrameworkName == "" {
|
||||||
@ -737,7 +737,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdGetSet) (*mesos.FrameworkID, error) {
|
func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.FrameworkID, error) {
|
||||||
if s.FailoverTimeout > 0 {
|
if s.FailoverTimeout > 0 {
|
||||||
if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil {
|
if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil {
|
||||||
if !tools.IsEtcdNotFound(err) {
|
if !tools.IsEtcdNotFound(err) {
|
||||||
|
@ -229,7 +229,7 @@ type Master struct {
|
|||||||
|
|
||||||
// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
|
// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
|
||||||
// is incorrect.
|
// is incorrect.
|
||||||
func NewEtcdHelper(client tools.EtcdGetSet, version string, prefix string) (helper tools.EtcdHelper, err error) {
|
func NewEtcdHelper(client tools.EtcdClient, version string, prefix string) (helper tools.EtcdHelper, err error) {
|
||||||
if version == "" {
|
if version == "" {
|
||||||
version = latest.Version
|
version = latest.Version
|
||||||
}
|
}
|
||||||
|
@ -31,76 +31,26 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/metrics"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
cacheHitCounter = prometheus.NewCounter(
|
|
||||||
prometheus.CounterOpts{
|
|
||||||
Name: "etcd_helper_cache_hit_count",
|
|
||||||
Help: "Counter of etcd helper cache hits.",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
cacheMissCounter = prometheus.NewCounter(
|
|
||||||
prometheus.CounterOpts{
|
|
||||||
Name: "etcd_helper_cache_miss_count",
|
|
||||||
Help: "Counter of etcd helper cache miss.",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
cacheEntryCounter = prometheus.NewCounter(
|
|
||||||
prometheus.CounterOpts{
|
|
||||||
Name: "etcd_helper_cache_entry_count",
|
|
||||||
Help: "Counter of etcd helper cache entries. This can be different from etcd_helper_cache_miss_count " +
|
|
||||||
"because two concurrent threads can miss the cache and generate the same entry twice.",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
cacheGetLatency = prometheus.NewSummary(
|
|
||||||
prometheus.SummaryOpts{
|
|
||||||
Name: "etcd_request_cache_get_latencies_summary",
|
|
||||||
Help: "Latency in microseconds of getting an object from etcd cache",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
cacheAddLatency = prometheus.NewSummary(
|
|
||||||
prometheus.SummaryOpts{
|
|
||||||
Name: "etcd_request_cache_add_latencies_summary",
|
|
||||||
Help: "Latency in microseconds of adding an object to etcd cache",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
|
|
||||||
prometheus.SummaryOpts{
|
|
||||||
Name: "etcd_request_latencies_summary",
|
|
||||||
Help: "Etcd request latency summary in microseconds for each operation and object type.",
|
|
||||||
},
|
|
||||||
[]string{"operation", "type"},
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
const maxEtcdCacheEntries int = 50000
|
const maxEtcdCacheEntries int = 50000
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(cacheHitCounter)
|
metrics.Register()
|
||||||
prometheus.MustRegister(cacheMissCounter)
|
|
||||||
prometheus.MustRegister(cacheEntryCounter)
|
|
||||||
prometheus.MustRegister(cacheAddLatency)
|
|
||||||
prometheus.MustRegister(cacheGetLatency)
|
|
||||||
prometheus.MustRegister(etcdRequestLatenciesSummary)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTypeName(obj interface{}) string {
|
func getTypeName(obj interface{}) string {
|
||||||
return reflect.TypeOf(obj).String()
|
return reflect.TypeOf(obj).String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func recordEtcdRequestLatency(verb, resource string, startTime time.Time) {
|
|
||||||
etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
|
|
||||||
}
|
|
||||||
|
|
||||||
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
||||||
type EtcdHelper struct {
|
type EtcdHelper struct {
|
||||||
Client EtcdGetSet
|
Client EtcdClient
|
||||||
Codec runtime.Codec
|
Codec runtime.Codec
|
||||||
Copier runtime.ObjectCopier
|
Copier runtime.ObjectCopier
|
||||||
// optional, no atomic operations can be performed without this interface
|
// optional, no atomic operations can be performed without this interface
|
||||||
@ -121,7 +71,7 @@ type EtcdHelper struct {
|
|||||||
// NewEtcdHelper creates a helper that works against objects that use the internal
|
// NewEtcdHelper creates a helper that works against objects that use the internal
|
||||||
// Kubernetes API objects.
|
// Kubernetes API objects.
|
||||||
// TODO: Refactor to take a runtiem.ObjectCopier
|
// TODO: Refactor to take a runtiem.ObjectCopier
|
||||||
func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHelper {
|
func NewEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) EtcdHelper {
|
||||||
return EtcdHelper{
|
return EtcdHelper{
|
||||||
Client: client,
|
Client: client,
|
||||||
Codec: codec,
|
Codec: codec,
|
||||||
@ -234,7 +184,7 @@ type etcdCache interface {
|
|||||||
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
|
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
|
metrics.ObserveGetCache(startTime)
|
||||||
}()
|
}()
|
||||||
obj, found := h.cache.Get(index)
|
obj, found := h.cache.Get(index)
|
||||||
if found {
|
if found {
|
||||||
@ -245,17 +195,17 @@ func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
|
|||||||
glog.Errorf("Error during DeepCopy of cached object: %q", err)
|
glog.Errorf("Error during DeepCopy of cached object: %q", err)
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
cacheHitCounter.Inc()
|
metrics.ObserveCacheHit()
|
||||||
return objCopy.(runtime.Object), true
|
return objCopy.(runtime.Object), true
|
||||||
}
|
}
|
||||||
cacheMissCounter.Inc()
|
metrics.ObserveCacheMiss()
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
|
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
|
metrics.ObserveAddCache(startTime)
|
||||||
}()
|
}()
|
||||||
objCopy, err := h.Copier.Copy(obj)
|
objCopy, err := h.Copier.Copy(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -264,7 +214,7 @@ func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
|
|||||||
}
|
}
|
||||||
isOverwrite := h.cache.Add(index, objCopy)
|
isOverwrite := h.cache.Add(index, objCopy)
|
||||||
if !isOverwrite {
|
if !isOverwrite {
|
||||||
cacheEntryCounter.Inc()
|
metrics.ObserveNewEntry()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -281,7 +231,7 @@ func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
|
|||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
trace.Step("About to list etcd node")
|
trace.Step("About to list etcd node")
|
||||||
nodes, index, err := h.listEtcdNode(key)
|
nodes, index, err := h.listEtcdNode(key)
|
||||||
recordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
|
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
|
||||||
trace.Step("Etcd node listed")
|
trace.Step("Etcd node listed")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -310,7 +260,7 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error
|
|||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
trace.Step("About to read etcd node")
|
trace.Step("About to read etcd node")
|
||||||
response, err := h.Client.Get(key, false, false)
|
response, err := h.Client.Get(key, false, false)
|
||||||
recordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
||||||
trace.Step("Etcd node read")
|
trace.Step("Etcd node read")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if IsEtcdNotFound(err) {
|
if IsEtcdNotFound(err) {
|
||||||
@ -348,7 +298,7 @@ func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFoun
|
|||||||
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
|
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
response, err := h.Client.Get(key, false, false)
|
response, err := h.Client.Get(key, false, false)
|
||||||
recordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
||||||
|
|
||||||
if err != nil && !IsEtcdNotFound(err) {
|
if err != nil && !IsEtcdNotFound(err) {
|
||||||
return "", nil, nil, err
|
return "", nil, nil, err
|
||||||
@ -404,7 +354,7 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64)
|
|||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
response, err := h.Client.Create(key, string(data), ttl)
|
response, err := h.Client.Create(key, string(data), ttl)
|
||||||
recordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -422,7 +372,7 @@ func (h *EtcdHelper) Delete(key string, recursive bool) error {
|
|||||||
key = h.PrefixEtcdKey(key)
|
key = h.PrefixEtcdKey(key)
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
_, err := h.Client.Delete(key, recursive)
|
_, err := h.Client.Delete(key, recursive)
|
||||||
recordEtcdRequestLatency("delete", "UNKNOWN", startTime)
|
metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -435,7 +385,7 @@ func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
|
|||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
response, err := h.Client.Delete(key, false)
|
response, err := h.Client.Delete(key, false)
|
||||||
recordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
||||||
if !IsEtcdNotFound(err) {
|
if !IsEtcdNotFound(err) {
|
||||||
// if the object that existed prior to the delete is returned by etcd, update out.
|
// if the object that existed prior to the delete is returned by etcd, update out.
|
||||||
if err != nil || response.PrevNode != nil {
|
if err != nil || response.PrevNode != nil {
|
||||||
@ -462,7 +412,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
|
|||||||
create = false
|
create = false
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
|
response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
|
||||||
recordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
|
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -472,7 +422,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
|
|||||||
// Create will fail if a key already exists.
|
// Create will fail if a key already exists.
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
response, err = h.Client.Create(key, string(data), ttl)
|
response, err = h.Client.Create(key, string(data), ttl)
|
||||||
recordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -589,7 +539,7 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
|
|||||||
if index == 0 {
|
if index == 0 {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
response, err := h.Client.Create(key, string(data), ttl)
|
response, err := h.Client.Create(key, string(data), ttl)
|
||||||
recordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
|
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
|
||||||
if IsEtcdNodeExist(err) {
|
if IsEtcdNodeExist(err) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -604,7 +554,7 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
|
|||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
// Swap origBody with data, if origBody is the latest etcd data.
|
// Swap origBody with data, if origBody is the latest etcd data.
|
||||||
response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index)
|
response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index)
|
||||||
recordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
|
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
|
||||||
if IsEtcdTestFailed(err) {
|
if IsEtcdTestFailed(err) {
|
||||||
// Try again.
|
// Try again.
|
||||||
continue
|
continue
|
||||||
|
@ -186,7 +186,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding
|
|||||||
|
|
||||||
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
|
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
|
||||||
// as a goroutine.
|
// as a goroutine.
|
||||||
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
|
func (w *etcdWatcher) etcdWatch(client EtcdClient, key string, resourceVersion uint64) {
|
||||||
defer util.HandleCrash()
|
defer util.HandleCrash()
|
||||||
defer close(w.etcdError)
|
defer close(w.etcdError)
|
||||||
if resourceVersion == 0 {
|
if resourceVersion == 0 {
|
||||||
@ -204,7 +204,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion u
|
|||||||
}
|
}
|
||||||
|
|
||||||
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
|
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
|
||||||
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
func etcdGetInitialWatchState(client EtcdClient, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||||
resp, err := client.Get(key, false, recursive)
|
resp, err := client.Get(key, false, recursive)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !IsEtcdNotFound(err) {
|
if !IsEtcdNotFound(err) {
|
||||||
|
@ -18,7 +18,6 @@ package tools
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
@ -125,14 +124,6 @@ func (f *FakeEtcdClient) updateResponse(key string) {
|
|||||||
f.Data[key] = *resp.N
|
f.Data[key] = *resp.N
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
|
|
||||||
f.Mutex.Lock()
|
|
||||||
defer f.Mutex.Unlock()
|
|
||||||
|
|
||||||
f.Ix = f.Ix + 1
|
|
||||||
return f.setLocked(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
|
func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
|
||||||
if f.Err != nil {
|
if f.Err != nil {
|
||||||
return nil, f.Err
|
return nil, f.Err
|
||||||
|
@ -39,7 +39,6 @@ var (
|
|||||||
// EtcdClient is an injectable interface for testing.
|
// EtcdClient is an injectable interface for testing.
|
||||||
type EtcdClient interface {
|
type EtcdClient interface {
|
||||||
GetCluster() []string
|
GetCluster() []string
|
||||||
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
|
|
||||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||||
@ -50,17 +49,6 @@ type EtcdClient interface {
|
|||||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper.
|
|
||||||
type EtcdGetSet interface {
|
|
||||||
GetCluster() []string
|
|
||||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
|
||||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
|
||||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
|
||||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
|
||||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
|
||||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// EtcdVersioner abstracts setting and retrieving fields from the etcd response onto the object
|
// EtcdVersioner abstracts setting and retrieving fields from the etcd response onto the object
|
||||||
// or list.
|
// or list.
|
||||||
type EtcdVersioner interface {
|
type EtcdVersioner interface {
|
||||||
|
104
pkg/tools/metrics/metrics.go
Normal file
104
pkg/tools/metrics/metrics.go
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
cacheHitCounter = prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "etcd_helper_cache_hit_count",
|
||||||
|
Help: "Counter of etcd helper cache hits.",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
cacheMissCounter = prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "etcd_helper_cache_miss_count",
|
||||||
|
Help: "Counter of etcd helper cache miss.",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
cacheEntryCounter = prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "etcd_helper_cache_entry_count",
|
||||||
|
Help: "Counter of etcd helper cache entries. This can be different from etcd_helper_cache_miss_count " +
|
||||||
|
"because two concurrent threads can miss the cache and generate the same entry twice.",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
cacheGetLatency = prometheus.NewSummary(
|
||||||
|
prometheus.SummaryOpts{
|
||||||
|
Name: "etcd_request_cache_get_latencies_summary",
|
||||||
|
Help: "Latency in microseconds of getting an object from etcd cache",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
cacheAddLatency = prometheus.NewSummary(
|
||||||
|
prometheus.SummaryOpts{
|
||||||
|
Name: "etcd_request_cache_add_latencies_summary",
|
||||||
|
Help: "Latency in microseconds of adding an object to etcd cache",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
|
||||||
|
prometheus.SummaryOpts{
|
||||||
|
Name: "etcd_request_latencies_summary",
|
||||||
|
Help: "Etcd request latency summary in microseconds for each operation and object type.",
|
||||||
|
},
|
||||||
|
[]string{"operation", "type"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
var registerMetrics sync.Once
|
||||||
|
|
||||||
|
// Register all metrics.
|
||||||
|
func Register() {
|
||||||
|
// Register the metrics.
|
||||||
|
registerMetrics.Do(func() {
|
||||||
|
prometheus.MustRegister(cacheHitCounter)
|
||||||
|
prometheus.MustRegister(cacheMissCounter)
|
||||||
|
prometheus.MustRegister(cacheEntryCounter)
|
||||||
|
prometheus.MustRegister(cacheAddLatency)
|
||||||
|
prometheus.MustRegister(cacheGetLatency)
|
||||||
|
prometheus.MustRegister(etcdRequestLatenciesSummary)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
|
||||||
|
etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
func ObserveGetCache(startTime time.Time) {
|
||||||
|
cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
func ObserveAddCache(startTime time.Time) {
|
||||||
|
cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
func ObserveCacheHit() {
|
||||||
|
cacheHitCounter.Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func ObserveCacheMiss() {
|
||||||
|
cacheMissCounter.Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func ObserveNewEntry() {
|
||||||
|
cacheEntryCounter.Inc()
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user