diff --git a/pkg/master/master.go b/pkg/master/master.go index 61f2b82dede..cb668ab789a 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -430,7 +430,7 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) func (m *Master) init(c *Config) { healthzChecks := []healthz.HealthzChecker{} m.clock = util.RealClock{} - podStorage := podetcd.NewStorage(c.DatabaseStorage, c.KubeletClient) + podStorage := podetcd.NewStorage(c.DatabaseStorage, true, c.KubeletClient) podTemplateStorage := podtemplateetcd.NewREST(c.DatabaseStorage) @@ -446,10 +446,10 @@ func (m *Master) init(c *Config) { namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(c.DatabaseStorage) m.namespaceRegistry = namespace.NewRegistry(namespaceStorage) - endpointsStorage := endpointsetcd.NewREST(c.DatabaseStorage) + endpointsStorage := endpointsetcd.NewREST(c.DatabaseStorage, true) m.endpointRegistry = endpoint.NewRegistry(endpointsStorage) - nodeStorage, nodeStatusStorage := nodeetcd.NewREST(c.DatabaseStorage, c.KubeletClient) + nodeStorage, nodeStatusStorage := nodeetcd.NewREST(c.DatabaseStorage, true, c.KubeletClient) m.nodeRegistry = minion.NewRegistry(nodeStorage) serviceStorage := serviceetcd.NewREST(c.DatabaseStorage) diff --git a/pkg/registry/endpoint/etcd/etcd.go b/pkg/registry/endpoint/etcd/etcd.go index 9d9f906015b..12388968626 100644 --- a/pkg/registry/endpoint/etcd/etcd.go +++ b/pkg/registry/endpoint/etcd/etcd.go @@ -32,8 +32,24 @@ type REST struct { } // NewREST returns a RESTStorage object that will work against endpoints. -func NewREST(s storage.Interface) *REST { +func NewREST(s storage.Interface, useCacher bool) *REST { prefix := "/services/endpoints" + + storageInterface := s + if useCacher { + config := storage.CacherConfig{ + CacheCapacity: 1000, + Storage: s, + Type: &api.Endpoints{}, + ResourcePrefix: prefix, + KeyFunc: func(obj runtime.Object) (string, error) { + return storage.NamespaceKeyFunc(prefix, obj) + }, + NewListFunc: func() runtime.Object { return &api.EndpointsList{} }, + } + storageInterface = storage.NewCacher(config) + } + store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Endpoints{} }, NewListFunc: func() runtime.Object { return &api.EndpointsList{} }, @@ -54,7 +70,7 @@ func NewREST(s storage.Interface) *REST { CreateStrategy: endpoint.Strategy, UpdateStrategy: endpoint.Strategy, - Storage: s, + Storage: storageInterface, } return &REST{store} } diff --git a/pkg/registry/endpoint/etcd/etcd_test.go b/pkg/registry/endpoint/etcd/etcd_test.go index aa0c79c366e..4540c46fc62 100644 --- a/pkg/registry/endpoint/etcd/etcd_test.go +++ b/pkg/registry/endpoint/etcd/etcd_test.go @@ -33,7 +33,7 @@ import ( func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) { etcdStorage, fakeClient := registrytest.NewEtcdStorage(t) - return NewREST(etcdStorage), fakeClient + return NewREST(etcdStorage, false), fakeClient } func validNewEndpoints() *api.Endpoints { diff --git a/pkg/registry/minion/etcd/etcd.go b/pkg/registry/minion/etcd/etcd.go index a46c11987f4..ac150d5aacf 100644 --- a/pkg/registry/minion/etcd/etcd.go +++ b/pkg/registry/minion/etcd/etcd.go @@ -49,8 +49,24 @@ func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object } // NewStorage returns a RESTStorage object that will work against nodes. -func NewREST(s storage.Interface, connection client.ConnectionInfoGetter) (*REST, *StatusREST) { +func NewREST(s storage.Interface, useCacher bool, connection client.ConnectionInfoGetter) (*REST, *StatusREST) { prefix := "/minions" + + storageInterface := s + if useCacher { + config := storage.CacherConfig{ + CacheCapacity: 1000, + Storage: s, + Type: &api.Node{}, + ResourcePrefix: prefix, + KeyFunc: func(obj runtime.Object) (string, error) { + return storage.NoNamespaceKeyFunc(prefix, obj) + }, + NewListFunc: func() runtime.Object { return &api.NodeList{} }, + } + storageInterface = storage.NewCacher(config) + } + store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Node{} }, NewListFunc: func() runtime.Object { return &api.NodeList{} }, @@ -69,7 +85,7 @@ func NewREST(s storage.Interface, connection client.ConnectionInfoGetter) (*REST CreateStrategy: minion.Strategy, UpdateStrategy: minion.Strategy, - Storage: s, + Storage: storageInterface, } statusStore := *store diff --git a/pkg/registry/minion/etcd/etcd_test.go b/pkg/registry/minion/etcd/etcd_test.go index f8682730b94..db53651cf4c 100644 --- a/pkg/registry/minion/etcd/etcd_test.go +++ b/pkg/registry/minion/etcd/etcd_test.go @@ -44,7 +44,7 @@ func (fakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, ht func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) { etcdStorage, fakeClient := registrytest.NewEtcdStorage(t) - storage, _ := NewREST(etcdStorage, fakeConnectionInfoGetter{}) + storage, _ := NewREST(etcdStorage, false, fakeConnectionInfoGetter{}) return storage, fakeClient } diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index a2aa5eb7024..3fce7c9df29 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -57,8 +57,24 @@ type REST struct { } // NewStorage returns a RESTStorage object that will work against pods. -func NewStorage(s storage.Interface, k client.ConnectionInfoGetter) PodStorage { +func NewStorage(s storage.Interface, useCacher bool, k client.ConnectionInfoGetter) PodStorage { prefix := "/pods" + + storageInterface := s + if useCacher { + config := storage.CacherConfig{ + CacheCapacity: 1000, + Storage: s, + Type: &api.Pod{}, + ResourcePrefix: prefix, + KeyFunc: func(obj runtime.Object) (string, error) { + return storage.NamespaceKeyFunc(prefix, obj) + }, + NewListFunc: func() runtime.Object { return &api.PodList{} }, + } + storageInterface = storage.NewCacher(config) + } + store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Pod{} }, NewListFunc: func() runtime.Object { return &api.PodList{} }, @@ -76,7 +92,7 @@ func NewStorage(s storage.Interface, k client.ConnectionInfoGetter) PodStorage { }, EndpointName: "pods", - Storage: s, + Storage: storageInterface, } statusStore := *store diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index 1a8805ed803..34e94b3672a 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -43,7 +43,7 @@ import ( func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient) { etcdStorage, fakeClient := registrytest.NewEtcdStorage(t) - storage := NewStorage(etcdStorage, nil) + storage := NewStorage(etcdStorage, false, nil) return storage.Pod, storage.Binding, storage.Status, fakeClient } @@ -989,7 +989,8 @@ func TestEtcdWatchPodsMatch(t *testing.T) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "foo", + Name: "foo", + Namespace: "default", Labels: map[string]string{ "name": "foo", }, diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index dbf8cbc722d..032fce5d0a8 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -23,6 +23,7 @@ import ( "strings" "sync" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/unversioned/cache" "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/runtime" @@ -62,9 +63,14 @@ type CacherConfig struct { // Cacher is responsible for serving WATCH and LIST requests for a given // resource from its internal cache and updating its cache in the background // based on the underlying storage contents. +// Cacher implements storage.Interface (although most of the calls are just +// delegated to the underlying storage). type Cacher struct { sync.RWMutex + // Underlying storage.Interface. + storage Interface + // Whether Cacher is initialized. initialized sync.WaitGroup initOnce sync.Once @@ -93,6 +99,7 @@ func NewCacher(config CacherConfig) *Cacher { cacher := &Cacher{ initialized: sync.WaitGroup{}, + storage: config.Storage, watchCache: watchCache, reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), watcherIdx: 0, @@ -134,7 +141,32 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { } } -// Implements Watch (signature from storage.Interface). +// Implements storage.Interface. +func (c *Cacher) Backends() []string { + return c.storage.Backends() +} + +// Implements storage.Interface. +func (c *Cacher) Versioner() Versioner { + return c.storage.Versioner() +} + +// Implements storage.Interface. +func (c *Cacher) Create(key string, obj, out runtime.Object, ttl uint64) error { + return c.storage.Create(key, obj, out, ttl) +} + +// Implements storage.Interface. +func (c *Cacher) Set(key string, obj, out runtime.Object, ttl uint64) error { + return c.storage.Set(key, obj, out, ttl) +} + +// Implements storage.Interface. +func (c *Cacher) Delete(key string, out runtime.Object) error { + return c.storage.Delete(key, out) +} + +// Implements storage.Interface. func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { // We explicitly use thread unsafe version and do locking ourself to ensure that // no new events will be processed in the meantime. The watchCache will be unlocked @@ -156,13 +188,34 @@ func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (w return watcher, nil } -// Implements WatchList (signature from storage.Interface). +// Implements storage.Interface. func (c *Cacher) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { return c.Watch(key, resourceVersion, filter) } -// Implements List (signature from storage.Interface). +// Implements storage.Interface. +func (c *Cacher) Get(key string, objPtr runtime.Object, ignoreNotFound bool) error { + return c.storage.Get(key, objPtr, ignoreNotFound) +} + +// Implements storage.Interface. +func (c *Cacher) GetToList(key string, listObj runtime.Object) error { + return c.storage.GetToList(key, listObj) +} + +// Implements storage.Interface. func (c *Cacher) List(key string, listObj runtime.Object) error { + return c.storage.List(key, listObj) +} + +// ListFromMemory implements list operation (the same signature as List method) +// but it serves the contents from memory. +// Current we cannot use ListFromMemory() instead of List(), because it only +// guarantees eventual consistency (e.g. it's possible for Get called right after +// Create to return not-exist, before the change is propagate). +// TODO: We may consider changing to use ListFromMemory in the future, but this +// requires wider discussion as an "api semantic change". +func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error { listPtr, err := runtime.GetItemsPtr(listObj) if err != nil { return err @@ -191,6 +244,16 @@ func (c *Cacher) List(key string, listObj runtime.Object) error { return nil } +// Implements storage.Interface. +func (c *Cacher) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error { + return c.storage.GuaranteedUpdate(key, ptrToType, ignoreNotFound, tryUpdate) +} + +// Implements storage.Interface. +func (c *Cacher) Codec() runtime.Codec { + return c.storage.Codec() +} + func (c *Cacher) processEvent(event cache.WatchCacheEvent) { c.Lock() defer c.Unlock() @@ -327,13 +390,23 @@ func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) { if event.PrevObject != nil { oldObjPasses = c.filter(event.PrevObject) } + if !curObjPasses && !oldObjPasses { + // Watcher is not interested in that object. + return + } + + object, err := api.Scheme.Copy(event.Object) + if err != nil { + glog.Errorf("unexpected copy error: %v", err) + return + } switch { case curObjPasses && !oldObjPasses: - c.result <- watch.Event{Type: watch.Added, Object: event.Object} + c.result <- watch.Event{Type: watch.Added, Object: object} case curObjPasses && oldObjPasses: - c.result <- watch.Event{Type: watch.Modified, Object: event.Object} + c.result <- watch.Event{Type: watch.Modified, Object: object} case !curObjPasses && oldObjPasses: - c.result <- watch.Event{Type: watch.Deleted, Object: event.Object} + c.result <- watch.Event{Type: watch.Deleted, Object: object} } } diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index edbcaf32caf..7a160e50a09 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -76,7 +76,7 @@ func waitForUpToDateCache(cacher *storage.Cacher, resourceVersion uint64) error return wait.Poll(10*time.Millisecond, 100*time.Millisecond, ready) } -func TestList(t *testing.T) { +func TestListFromMemory(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) prefixedKey := etcdtest.AddPrefix("pods") fakeClient.ExpectNotFoundGet(prefixedKey) @@ -151,7 +151,7 @@ func TestList(t *testing.T) { } result := &api.PodList{} - if err := cacher.List("pods/ns", result); err != nil { + if err := cacher.ListFromMemory("pods/ns", result); err != nil { t.Errorf("unexpected error: %v", err) } if result.ListMeta.ResourceVersion != "5" { diff --git a/pkg/storage/util.go b/pkg/storage/util.go index 518b31b2537..89b0b2af7da 100644 --- a/pkg/storage/util.go +++ b/pkg/storage/util.go @@ -58,3 +58,11 @@ func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) { } return prefix + "/" + meta.Namespace() + "/" + meta.Name(), nil } + +func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) { + meta, err := meta.Accessor(obj) + if err != nil { + return "", err + } + return prefix + "/" + meta.Name(), nil +}