Merge pull request #12975 from wojtek-t/switch_cacher_for_pods

Switch on Cacher for pods, endpoints and nodes.
This commit is contained in:
Jerzy Szczepkowski 2015-08-21 10:28:38 +02:00
commit 46ca41c8e3
10 changed files with 151 additions and 21 deletions

View File

@ -430,7 +430,7 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter)
func (m *Master) init(c *Config) {
healthzChecks := []healthz.HealthzChecker{}
m.clock = util.RealClock{}
podStorage := podetcd.NewStorage(c.DatabaseStorage, c.KubeletClient)
podStorage := podetcd.NewStorage(c.DatabaseStorage, true, c.KubeletClient)
podTemplateStorage := podtemplateetcd.NewREST(c.DatabaseStorage)
@ -446,10 +446,10 @@ func (m *Master) init(c *Config) {
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(c.DatabaseStorage)
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(c.DatabaseStorage)
endpointsStorage := endpointsetcd.NewREST(c.DatabaseStorage, true)
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(c.DatabaseStorage, c.KubeletClient)
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(c.DatabaseStorage, true, c.KubeletClient)
m.nodeRegistry = minion.NewRegistry(nodeStorage)
serviceStorage := serviceetcd.NewREST(c.DatabaseStorage)

View File

@ -32,8 +32,24 @@ type REST struct {
}
// NewREST returns a RESTStorage object that will work against endpoints.
func NewREST(s storage.Interface) *REST {
func NewREST(s storage.Interface, useCacher bool) *REST {
prefix := "/services/endpoints"
storageInterface := s
if useCacher {
config := storage.CacherConfig{
CacheCapacity: 1000,
Storage: s,
Type: &api.Endpoints{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) {
return storage.NamespaceKeyFunc(prefix, obj)
},
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
}
storageInterface = storage.NewCacher(config)
}
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Endpoints{} },
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
@ -54,7 +70,7 @@ func NewREST(s storage.Interface) *REST {
CreateStrategy: endpoint.Strategy,
UpdateStrategy: endpoint.Strategy,
Storage: s,
Storage: storageInterface,
}
return &REST{store}
}

View File

@ -33,7 +33,7 @@ import (
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t)
return NewREST(etcdStorage), fakeClient
return NewREST(etcdStorage, false), fakeClient
}
func validNewEndpoints() *api.Endpoints {

View File

@ -49,8 +49,24 @@ func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object
}
// NewStorage returns a RESTStorage object that will work against nodes.
func NewREST(s storage.Interface, connection client.ConnectionInfoGetter) (*REST, *StatusREST) {
func NewREST(s storage.Interface, useCacher bool, connection client.ConnectionInfoGetter) (*REST, *StatusREST) {
prefix := "/minions"
storageInterface := s
if useCacher {
config := storage.CacherConfig{
CacheCapacity: 1000,
Storage: s,
Type: &api.Node{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) {
return storage.NoNamespaceKeyFunc(prefix, obj)
},
NewListFunc: func() runtime.Object { return &api.NodeList{} },
}
storageInterface = storage.NewCacher(config)
}
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Node{} },
NewListFunc: func() runtime.Object { return &api.NodeList{} },
@ -69,7 +85,7 @@ func NewREST(s storage.Interface, connection client.ConnectionInfoGetter) (*REST
CreateStrategy: minion.Strategy,
UpdateStrategy: minion.Strategy,
Storage: s,
Storage: storageInterface,
}
statusStore := *store

View File

@ -44,7 +44,7 @@ func (fakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, ht
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t)
storage, _ := NewREST(etcdStorage, fakeConnectionInfoGetter{})
storage, _ := NewREST(etcdStorage, false, fakeConnectionInfoGetter{})
return storage, fakeClient
}

View File

@ -57,8 +57,24 @@ type REST struct {
}
// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(s storage.Interface, k client.ConnectionInfoGetter) PodStorage {
func NewStorage(s storage.Interface, useCacher bool, k client.ConnectionInfoGetter) PodStorage {
prefix := "/pods"
storageInterface := s
if useCacher {
config := storage.CacherConfig{
CacheCapacity: 1000,
Storage: s,
Type: &api.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) {
return storage.NamespaceKeyFunc(prefix, obj)
},
NewListFunc: func() runtime.Object { return &api.PodList{} },
}
storageInterface = storage.NewCacher(config)
}
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
@ -76,7 +92,7 @@ func NewStorage(s storage.Interface, k client.ConnectionInfoGetter) PodStorage {
},
EndpointName: "pods",
Storage: s,
Storage: storageInterface,
}
statusStore := *store

View File

@ -43,7 +43,7 @@ import (
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t)
storage := NewStorage(etcdStorage, nil)
storage := NewStorage(etcdStorage, false, nil)
return storage.Pod, storage.Binding, storage.Status, fakeClient
}
@ -989,7 +989,8 @@ func TestEtcdWatchPodsMatch(t *testing.T) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Name: "foo",
Namespace: "default",
Labels: map[string]string{
"name": "foo",
},

View File

@ -23,6 +23,7 @@ import (
"strings"
"sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned/cache"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
@ -62,9 +63,14 @@ type CacherConfig struct {
// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
// based on the underlying storage contents.
// Cacher implements storage.Interface (although most of the calls are just
// delegated to the underlying storage).
type Cacher struct {
sync.RWMutex
// Underlying storage.Interface.
storage Interface
// Whether Cacher is initialized.
initialized sync.WaitGroup
initOnce sync.Once
@ -93,6 +99,7 @@ func NewCacher(config CacherConfig) *Cacher {
cacher := &Cacher{
initialized: sync.WaitGroup{},
storage: config.Storage,
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
watcherIdx: 0,
@ -134,7 +141,32 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
}
}
// Implements Watch (signature from storage.Interface).
// Implements storage.Interface.
func (c *Cacher) Backends() []string {
return c.storage.Backends()
}
// Implements storage.Interface.
func (c *Cacher) Versioner() Versioner {
return c.storage.Versioner()
}
// Implements storage.Interface.
func (c *Cacher) Create(key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(key, obj, out, ttl)
}
// Implements storage.Interface.
func (c *Cacher) Set(key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Set(key, obj, out, ttl)
}
// Implements storage.Interface.
func (c *Cacher) Delete(key string, out runtime.Object) error {
return c.storage.Delete(key, out)
}
// Implements storage.Interface.
func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
// We explicitly use thread unsafe version and do locking ourself to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked
@ -156,13 +188,34 @@ func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (w
return watcher, nil
}
// Implements WatchList (signature from storage.Interface).
// Implements storage.Interface.
func (c *Cacher) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
return c.Watch(key, resourceVersion, filter)
}
// Implements List (signature from storage.Interface).
// Implements storage.Interface.
func (c *Cacher) Get(key string, objPtr runtime.Object, ignoreNotFound bool) error {
return c.storage.Get(key, objPtr, ignoreNotFound)
}
// Implements storage.Interface.
func (c *Cacher) GetToList(key string, listObj runtime.Object) error {
return c.storage.GetToList(key, listObj)
}
// Implements storage.Interface.
func (c *Cacher) List(key string, listObj runtime.Object) error {
return c.storage.List(key, listObj)
}
// ListFromMemory implements list operation (the same signature as List method)
// but it serves the contents from memory.
// Current we cannot use ListFromMemory() instead of List(), because it only
// guarantees eventual consistency (e.g. it's possible for Get called right after
// Create to return not-exist, before the change is propagate).
// TODO: We may consider changing to use ListFromMemory in the future, but this
// requires wider discussion as an "api semantic change".
func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error {
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
return err
@ -191,6 +244,16 @@ func (c *Cacher) List(key string, listObj runtime.Object) error {
return nil
}
// Implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error {
return c.storage.GuaranteedUpdate(key, ptrToType, ignoreNotFound, tryUpdate)
}
// Implements storage.Interface.
func (c *Cacher) Codec() runtime.Codec {
return c.storage.Codec()
}
func (c *Cacher) processEvent(event cache.WatchCacheEvent) {
c.Lock()
defer c.Unlock()
@ -327,13 +390,23 @@ func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) {
if event.PrevObject != nil {
oldObjPasses = c.filter(event.PrevObject)
}
if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object.
return
}
object, err := api.Scheme.Copy(event.Object)
if err != nil {
glog.Errorf("unexpected copy error: %v", err)
return
}
switch {
case curObjPasses && !oldObjPasses:
c.result <- watch.Event{Type: watch.Added, Object: event.Object}
c.result <- watch.Event{Type: watch.Added, Object: object}
case curObjPasses && oldObjPasses:
c.result <- watch.Event{Type: watch.Modified, Object: event.Object}
c.result <- watch.Event{Type: watch.Modified, Object: object}
case !curObjPasses && oldObjPasses:
c.result <- watch.Event{Type: watch.Deleted, Object: event.Object}
c.result <- watch.Event{Type: watch.Deleted, Object: object}
}
}

View File

@ -76,7 +76,7 @@ func waitForUpToDateCache(cacher *storage.Cacher, resourceVersion uint64) error
return wait.Poll(10*time.Millisecond, 100*time.Millisecond, ready)
}
func TestList(t *testing.T) {
func TestListFromMemory(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
prefixedKey := etcdtest.AddPrefix("pods")
fakeClient.ExpectNotFoundGet(prefixedKey)
@ -151,7 +151,7 @@ func TestList(t *testing.T) {
}
result := &api.PodList{}
if err := cacher.List("pods/ns", result); err != nil {
if err := cacher.ListFromMemory("pods/ns", result); err != nil {
t.Errorf("unexpected error: %v", err)
}
if result.ListMeta.ResourceVersion != "5" {

View File

@ -58,3 +58,11 @@ func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
}
return prefix + "/" + meta.Namespace() + "/" + meta.Name(), nil
}
func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return "", err
}
return prefix + "/" + meta.Name(), nil
}