diff --git a/plugin/pkg/admission/namespace/lifecycle/admission.go b/plugin/pkg/admission/namespace/lifecycle/admission.go index e75f56397c4..b0f697c5175 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission.go @@ -21,22 +21,28 @@ import ( "io" "time" + lru "github.com/hashicorp/golang-lru" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/sets" - "k8s.io/kubernetes/pkg/watch" ) -const PluginName = "NamespaceLifecycle" +const ( + // Name of admission plug-in + PluginName = "NamespaceLifecycle" + // how long a namespace stays in the force live lookup cache before expiration. + forceLiveLookupTTL = 30 * time.Second +) func init() { admission.RegisterPlugin(PluginName, func(client clientset.Interface, config io.Reader) (admission.Interface, error) { - return NewLifecycle(client, sets.NewString(api.NamespaceDefault, api.NamespaceSystem)), nil + return NewLifecycle(client, sets.NewString(api.NamespaceDefault, api.NamespaceSystem)) }) } @@ -45,11 +51,29 @@ func init() { type lifecycle struct { *admission.Handler client clientset.Interface - store cache.Store immortalNamespaces sets.String + namespaceInformer framework.SharedIndexInformer + // forceLiveLookupCache holds a list of entries for namespaces that we have a strong reason to believe are stale in our local cache. + // if a namespace is in this cache, then we will ignore our local state and always fetch latest from api server. + forceLiveLookupCache *lru.Cache } -func (l *lifecycle) Admit(a admission.Attributes) (err error) { +type forceLiveLookupEntry struct { + expiry time.Time +} + +var _ = admission.WantsInformerFactory(&lifecycle{}) + +func makeNamespaceKey(namespace string) *api.Namespace { + return &api.Namespace{ + ObjectMeta: api.ObjectMeta{ + Name: namespace, + Namespace: "", + }, + } +} + +func (l *lifecycle) Admit(a admission.Attributes) error { // prevent deletion of immortal namespaces if a.GetOperation() == admission.Delete && a.GetKind().GroupKind() == api.Kind("Namespace") && l.immortalNamespaces.Has(a.GetName()) { return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("this namespace may not be deleted")) @@ -61,31 +85,44 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) { if len(a.GetNamespace()) == 0 || a.GetKind().GroupKind() == api.Kind("Namespace") { // if a namespace is deleted, we want to prevent all further creates into it // while it is undergoing termination. to reduce incidences where the cache - // is slow to update, we forcefully remove the namespace from our local cache. - // this will cause a live lookup of the namespace to get its latest state even - // before the watch notification is received. + // is slow to update, we add the namespace into a force live lookup list to ensure + // we are not looking at stale state. if a.GetOperation() == admission.Delete { - l.store.Delete(&api.Namespace{ - ObjectMeta: api.ObjectMeta{ - Name: a.GetName(), - }, - }) + newEntry := forceLiveLookupEntry{ + expiry: time.Now().Add(forceLiveLookupTTL), + } + l.forceLiveLookupCache.Add(a.GetName(), newEntry) } return nil } - namespaceObj, exists, err := l.store.Get(&api.Namespace{ - ObjectMeta: api.ObjectMeta{ - Name: a.GetNamespace(), - Namespace: "", - }, - }) + // we need to wait for our caches to warm + if !l.WaitForReady() { + return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) + } + + var ( + namespaceObj interface{} + exists bool + err error + ) + + key := makeNamespaceKey(a.GetNamespace()) + namespaceObj, exists, err = l.namespaceInformer.GetStore().Get(key) if err != nil { return errors.NewInternalError(err) } + // forceLiveLookup if true will skip looking at local cache state and instead always make a live call to server. + forceLiveLookup := false + lruItemObj, ok := l.forceLiveLookupCache.Get(a.GetNamespace()) + if ok && lruItemObj.(forceLiveLookupEntry).expiry.Before(time.Now()) { + // we think the namespace was marked for deletion, but our current local cache says otherwise, we will force a live lookup. + forceLiveLookup = exists && namespaceObj.(*api.Namespace).Status.Phase == api.NamespaceActive + } + // refuse to operate on non-existent namespaces - if !exists { + if !exists || forceLiveLookup { // in case of latency in our caches, make a call direct to storage to verify that it truly exists or not namespaceObj, err = l.client.Core().Namespaces().Get(a.GetNamespace()) if err != nil { @@ -111,26 +148,27 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) { } // NewLifecycle creates a new namespace lifecycle admission control handler -func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) admission.Interface { - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - reflector := cache.NewReflector( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return c.Core().Namespaces().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return c.Core().Namespaces().Watch(options) - }, - }, - &api.Namespace{}, - store, - 5*time.Minute, - ) - reflector.Run() - return &lifecycle{ - Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete), - client: c, - store: store, - immortalNamespaces: immortalNamespaces, +func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) (admission.Interface, error) { + forceLiveLookupCache, err := lru.New(100) + if err != nil { + panic(err) } + return &lifecycle{ + Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete), + client: c, + immortalNamespaces: immortalNamespaces, + forceLiveLookupCache: forceLiveLookupCache, + }, nil +} + +func (l *lifecycle) SetInformerFactory(f informers.SharedInformerFactory) { + l.namespaceInformer = f.Namespaces().Informer() + l.SetReadyFunc(l.namespaceInformer.HasSynced) +} + +func (l *lifecycle) Validate() error { + if l.namespaceInformer == nil { + return fmt.Errorf("missing namespaceInformer") + } + return nil } diff --git a/plugin/pkg/admission/namespace/lifecycle/admission_test.go b/plugin/pkg/admission/namespace/lifecycle/admission_test.go index 99597f48c18..e0dd88f87e8 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission_test.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission_test.go @@ -18,77 +18,131 @@ package lifecycle import ( "fmt" - "sync" "testing" + "time" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/api/unversioned" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/testing/core" - "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" ) -// TestAdmission -func TestAdmission(t *testing.T) { - namespaceObj := &api.Namespace{ - ObjectMeta: api.ObjectMeta{ - Name: "test", - Namespace: "", - }, - Status: api.NamespaceStatus{ - Phase: api.NamespaceActive, - }, - } - var namespaceLock sync.RWMutex - - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - store.Add(namespaceObj) - mockClient := fake.NewSimpleClientset() - mockClient.PrependReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) { - namespaceLock.RLock() - defer namespaceLock.RUnlock() - if getAction, ok := action.(testclient.GetAction); ok && getAction.GetName() == namespaceObj.Name { - return true, namespaceObj, nil - } - return true, nil, fmt.Errorf("No result for action %v", action) - }) - mockClient.PrependReactor("list", "namespaces", func(action core.Action) (bool, runtime.Object, error) { - namespaceLock.RLock() - defer namespaceLock.RUnlock() - return true, &api.NamespaceList{Items: []api.Namespace{*namespaceObj}}, nil - }) - - lfhandler := NewLifecycle(mockClient, sets.NewString("default")).(*lifecycle) - lfhandler.store = store - handler := admission.NewChainHandler(lfhandler) - pod := api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespaceObj.Name}, - Spec: api.PodSpec{ - Volumes: []api.Volume{{Name: "vol"}}, - Containers: []api.Container{{Name: "ctr", Image: "image"}}, - }, - } - badPod := api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "456", Namespace: "doesnotexist"}, - Spec: api.PodSpec{ - Volumes: []api.Volume{{Name: "vol"}}, - Containers: []api.Container{{Name: "ctr", Image: "image"}}, - }, - } - err := handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) +// newHandlerForTest returns a configured handler for testing. +func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { + f := informers.NewSharedInformerFactory(c, 5*time.Minute) + handler, err := NewLifecycle(c, sets.NewString(api.NamespaceDefault, api.NamespaceSystem)) if err != nil { - t.Errorf("Unexpected error returned from admission handler: %v", err) + return nil, f, err } + plugins := []admission.Interface{handler} + pluginInitializer := admission.NewPluginInitializer(f) + pluginInitializer.Initialize(plugins) + err = admission.Validate(plugins) + return handler, f, err +} - // change namespace state to terminating - namespaceLock.Lock() - namespaceObj.Status.Phase = api.NamespaceTerminating - namespaceLock.Unlock() - store.Add(namespaceObj) +// newMockClientForTest creates a mock client that returns a client configured for the specified list of namespaces with the specified phase. +func newMockClientForTest(namespaces map[string]api.NamespacePhase) *fake.Clientset { + mockClient := &fake.Clientset{} + mockClient.AddReactor("list", "namespaces", func(action core.Action) (bool, runtime.Object, error) { + namespaceList := &api.NamespaceList{ + ListMeta: unversioned.ListMeta{ + ResourceVersion: fmt.Sprintf("%d", len(namespaces)), + }, + } + index := 0 + for name, phase := range namespaces { + namespaceList.Items = append(namespaceList.Items, api.Namespace{ + ObjectMeta: api.ObjectMeta{ + Name: name, + ResourceVersion: fmt.Sprintf("%d", index), + }, + Status: api.NamespaceStatus{ + Phase: phase, + }, + }) + index++ + } + return true, namespaceList, nil + }) + return mockClient +} +// newPod returns a new pod for the specified namespace +func newPod(namespace string) api.Pod { + return api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, + Spec: api.PodSpec{ + Volumes: []api.Volume{{Name: "vol"}}, + Containers: []api.Container{{Name: "ctr", Image: "image"}}, + }, + } +} + +// TestAdmissionNamespaceDoesNotExist verifies pod is not admitted if namespace does not exist. +func TestAdmissionNamespaceDoesNotExist(t *testing.T) { + namespace := "test" + mockClient := newMockClientForTest(map[string]api.NamespacePhase{}) + mockClient.AddReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("nope, out of luck") + }) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Errorf("unexpected error initializing handler: %v", err) + } + informerFactory.Start(wait.NeverStop) + + pod := newPod(namespace) + err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err == nil { + actions := "" + for _, action := range mockClient.Actions() { + actions = actions + action.GetVerb() + ":" + action.GetResource().Resource + ":" + action.GetSubresource() + ", " + } + t.Errorf("expected error returned from admission handler: %v", actions) + } +} + +// TestAdmissionNamespaceActive verifies a resource is admitted when the namespace is active. +func TestAdmissionNamespaceActive(t *testing.T) { + namespace := "test" + mockClient := newMockClientForTest(map[string]api.NamespacePhase{ + namespace: api.NamespaceActive, + }) + + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Errorf("unexpected error initializing handler: %v", err) + } + informerFactory.Start(wait.NeverStop) + + pod := newPod(namespace) + err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err != nil { + t.Errorf("unexpected error returned from admission handler") + } +} + +// TestAdmissionNamespaceTerminating verifies a resource is not created when the namespace is active. +func TestAdmissionNamespaceTerminating(t *testing.T) { + namespace := "test" + mockClient := newMockClientForTest(map[string]api.NamespacePhase{ + namespace: api.NamespaceTerminating, + }) + + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Errorf("unexpected error initializing handler: %v", err) + } + informerFactory.Start(wait.NeverStop) + + pod := newPod(namespace) // verify create operations in the namespace cause an error err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) if err == nil { @@ -118,20 +172,4 @@ func TestAdmission(t *testing.T) { if err != nil { t.Errorf("Did not expect an error %v", err) } - - // verify create/update/delete of object in non-existent namespace throws error - err = handler.Admit(admission.NewAttributesRecord(&badPod, nil, api.Kind("Pod").WithVersion("version"), badPod.Namespace, badPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) - if err == nil { - t.Errorf("Expected, but didn't get, an error (%v) that objects cannot be created in non-existant namespaces", err) - } - - err = handler.Admit(admission.NewAttributesRecord(&badPod, nil, api.Kind("Pod").WithVersion("version"), badPod.Namespace, badPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Update, nil)) - if err == nil { - t.Errorf("Expected, but didn't get, an error (%v) that objects cannot be updated in non-existant namespaces", err) - } - - err = handler.Admit(admission.NewAttributesRecord(&badPod, nil, api.Kind("Pod").WithVersion("version"), badPod.Namespace, badPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Delete, nil)) - if err == nil { - t.Errorf("Expected, but didn't get, an error (%v) that objects cannot be deleted in non-existant namespaces", err) - } }