Merge pull request #29926 from derekwaynecarr/ns_lifecycle_informer

Automatic merge from submit-queue

Move NamespaceLifecycle to use shared informers

This was a follow-up to https://github.com/kubernetes/kubernetes/pull/29634

Moves the `NamespaceLifecycle` plug-in to a shared infomer cache.

/cc @kubernetes/rh-cluster-infra @deads2k @hodovska
This commit is contained in:
Kubernetes Submit Queue 2016-08-04 19:22:59 -07:00 committed by GitHub
commit 353df20854
2 changed files with 192 additions and 116 deletions

View File

@ -21,22 +21,28 @@ import (
"io" "io"
"time" "time"
lru "github.com/hashicorp/golang-lru"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
) )
const PluginName = "NamespaceLifecycle" const (
// Name of admission plug-in
PluginName = "NamespaceLifecycle"
// how long a namespace stays in the force live lookup cache before expiration.
forceLiveLookupTTL = 30 * time.Second
)
func init() { func init() {
admission.RegisterPlugin(PluginName, func(client clientset.Interface, config io.Reader) (admission.Interface, error) { admission.RegisterPlugin(PluginName, func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
return NewLifecycle(client, sets.NewString(api.NamespaceDefault, api.NamespaceSystem)), nil return NewLifecycle(client, sets.NewString(api.NamespaceDefault, api.NamespaceSystem))
}) })
} }
@ -45,11 +51,29 @@ func init() {
type lifecycle struct { type lifecycle struct {
*admission.Handler *admission.Handler
client clientset.Interface client clientset.Interface
store cache.Store
immortalNamespaces sets.String immortalNamespaces sets.String
namespaceInformer framework.SharedIndexInformer
// forceLiveLookupCache holds a list of entries for namespaces that we have a strong reason to believe are stale in our local cache.
// if a namespace is in this cache, then we will ignore our local state and always fetch latest from api server.
forceLiveLookupCache *lru.Cache
} }
func (l *lifecycle) Admit(a admission.Attributes) (err error) { type forceLiveLookupEntry struct {
expiry time.Time
}
var _ = admission.WantsInformerFactory(&lifecycle{})
func makeNamespaceKey(namespace string) *api.Namespace {
return &api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: namespace,
Namespace: "",
},
}
}
func (l *lifecycle) Admit(a admission.Attributes) error {
// prevent deletion of immortal namespaces // prevent deletion of immortal namespaces
if a.GetOperation() == admission.Delete && a.GetKind().GroupKind() == api.Kind("Namespace") && l.immortalNamespaces.Has(a.GetName()) { if a.GetOperation() == admission.Delete && a.GetKind().GroupKind() == api.Kind("Namespace") && l.immortalNamespaces.Has(a.GetName()) {
return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("this namespace may not be deleted")) return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("this namespace may not be deleted"))
@ -61,31 +85,44 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) {
if len(a.GetNamespace()) == 0 || a.GetKind().GroupKind() == api.Kind("Namespace") { if len(a.GetNamespace()) == 0 || a.GetKind().GroupKind() == api.Kind("Namespace") {
// if a namespace is deleted, we want to prevent all further creates into it // if a namespace is deleted, we want to prevent all further creates into it
// while it is undergoing termination. to reduce incidences where the cache // while it is undergoing termination. to reduce incidences where the cache
// is slow to update, we forcefully remove the namespace from our local cache. // is slow to update, we add the namespace into a force live lookup list to ensure
// this will cause a live lookup of the namespace to get its latest state even // we are not looking at stale state.
// before the watch notification is received.
if a.GetOperation() == admission.Delete { if a.GetOperation() == admission.Delete {
l.store.Delete(&api.Namespace{ newEntry := forceLiveLookupEntry{
ObjectMeta: api.ObjectMeta{ expiry: time.Now().Add(forceLiveLookupTTL),
Name: a.GetName(), }
}, l.forceLiveLookupCache.Add(a.GetName(), newEntry)
})
} }
return nil return nil
} }
namespaceObj, exists, err := l.store.Get(&api.Namespace{ // we need to wait for our caches to warm
ObjectMeta: api.ObjectMeta{ if !l.WaitForReady() {
Name: a.GetNamespace(), return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
Namespace: "", }
},
}) var (
namespaceObj interface{}
exists bool
err error
)
key := makeNamespaceKey(a.GetNamespace())
namespaceObj, exists, err = l.namespaceInformer.GetStore().Get(key)
if err != nil { if err != nil {
return errors.NewInternalError(err) return errors.NewInternalError(err)
} }
// forceLiveLookup if true will skip looking at local cache state and instead always make a live call to server.
forceLiveLookup := false
lruItemObj, ok := l.forceLiveLookupCache.Get(a.GetNamespace())
if ok && lruItemObj.(forceLiveLookupEntry).expiry.Before(time.Now()) {
// we think the namespace was marked for deletion, but our current local cache says otherwise, we will force a live lookup.
forceLiveLookup = exists && namespaceObj.(*api.Namespace).Status.Phase == api.NamespaceActive
}
// refuse to operate on non-existent namespaces // refuse to operate on non-existent namespaces
if !exists { if !exists || forceLiveLookup {
// in case of latency in our caches, make a call direct to storage to verify that it truly exists or not // in case of latency in our caches, make a call direct to storage to verify that it truly exists or not
namespaceObj, err = l.client.Core().Namespaces().Get(a.GetNamespace()) namespaceObj, err = l.client.Core().Namespaces().Get(a.GetNamespace())
if err != nil { if err != nil {
@ -111,26 +148,27 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) {
} }
// NewLifecycle creates a new namespace lifecycle admission control handler // NewLifecycle creates a new namespace lifecycle admission control handler
func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) admission.Interface { func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) (admission.Interface, error) {
store := cache.NewStore(cache.MetaNamespaceKeyFunc) forceLiveLookupCache, err := lru.New(100)
reflector := cache.NewReflector( if err != nil {
&cache.ListWatch{ panic(err)
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return c.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return c.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
store,
5*time.Minute,
)
reflector.Run()
return &lifecycle{
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
client: c,
store: store,
immortalNamespaces: immortalNamespaces,
} }
return &lifecycle{
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
client: c,
immortalNamespaces: immortalNamespaces,
forceLiveLookupCache: forceLiveLookupCache,
}, nil
}
func (l *lifecycle) SetInformerFactory(f informers.SharedInformerFactory) {
l.namespaceInformer = f.Namespaces().Informer()
l.SetReadyFunc(l.namespaceInformer.HasSynced)
}
func (l *lifecycle) Validate() error {
if l.namespaceInformer == nil {
return fmt.Errorf("missing namespaceInformer")
}
return nil
} }

View File

@ -18,77 +18,131 @@ package lifecycle
import ( import (
"fmt" "fmt"
"sync"
"testing" "testing"
"time"
"k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
) )
// TestAdmission // newHandlerForTest returns a configured handler for testing.
func TestAdmission(t *testing.T) { func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
namespaceObj := &api.Namespace{ f := informers.NewSharedInformerFactory(c, 5*time.Minute)
ObjectMeta: api.ObjectMeta{ handler, err := NewLifecycle(c, sets.NewString(api.NamespaceDefault, api.NamespaceSystem))
Name: "test",
Namespace: "",
},
Status: api.NamespaceStatus{
Phase: api.NamespaceActive,
},
}
var namespaceLock sync.RWMutex
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
store.Add(namespaceObj)
mockClient := fake.NewSimpleClientset()
mockClient.PrependReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
namespaceLock.RLock()
defer namespaceLock.RUnlock()
if getAction, ok := action.(testclient.GetAction); ok && getAction.GetName() == namespaceObj.Name {
return true, namespaceObj, nil
}
return true, nil, fmt.Errorf("No result for action %v", action)
})
mockClient.PrependReactor("list", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
namespaceLock.RLock()
defer namespaceLock.RUnlock()
return true, &api.NamespaceList{Items: []api.Namespace{*namespaceObj}}, nil
})
lfhandler := NewLifecycle(mockClient, sets.NewString("default")).(*lifecycle)
lfhandler.store = store
handler := admission.NewChainHandler(lfhandler)
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespaceObj.Name},
Spec: api.PodSpec{
Volumes: []api.Volume{{Name: "vol"}},
Containers: []api.Container{{Name: "ctr", Image: "image"}},
},
}
badPod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "456", Namespace: "doesnotexist"},
Spec: api.PodSpec{
Volumes: []api.Volume{{Name: "vol"}},
Containers: []api.Container{{Name: "ctr", Image: "image"}},
},
}
err := handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err != nil { if err != nil {
t.Errorf("Unexpected error returned from admission handler: %v", err) return nil, f, err
} }
plugins := []admission.Interface{handler}
pluginInitializer := admission.NewPluginInitializer(f)
pluginInitializer.Initialize(plugins)
err = admission.Validate(plugins)
return handler, f, err
}
// change namespace state to terminating // newMockClientForTest creates a mock client that returns a client configured for the specified list of namespaces with the specified phase.
namespaceLock.Lock() func newMockClientForTest(namespaces map[string]api.NamespacePhase) *fake.Clientset {
namespaceObj.Status.Phase = api.NamespaceTerminating mockClient := &fake.Clientset{}
namespaceLock.Unlock() mockClient.AddReactor("list", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
store.Add(namespaceObj) namespaceList := &api.NamespaceList{
ListMeta: unversioned.ListMeta{
ResourceVersion: fmt.Sprintf("%d", len(namespaces)),
},
}
index := 0
for name, phase := range namespaces {
namespaceList.Items = append(namespaceList.Items, api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: name,
ResourceVersion: fmt.Sprintf("%d", index),
},
Status: api.NamespaceStatus{
Phase: phase,
},
})
index++
}
return true, namespaceList, nil
})
return mockClient
}
// newPod returns a new pod for the specified namespace
func newPod(namespace string) api.Pod {
return api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
Spec: api.PodSpec{
Volumes: []api.Volume{{Name: "vol"}},
Containers: []api.Container{{Name: "ctr", Image: "image"}},
},
}
}
// TestAdmissionNamespaceDoesNotExist verifies pod is not admitted if namespace does not exist.
func TestAdmissionNamespaceDoesNotExist(t *testing.T) {
namespace := "test"
mockClient := newMockClientForTest(map[string]api.NamespacePhase{})
mockClient.AddReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("nope, out of luck")
})
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)
pod := newPod(namespace)
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
actions := ""
for _, action := range mockClient.Actions() {
actions = actions + action.GetVerb() + ":" + action.GetResource().Resource + ":" + action.GetSubresource() + ", "
}
t.Errorf("expected error returned from admission handler: %v", actions)
}
}
// TestAdmissionNamespaceActive verifies a resource is admitted when the namespace is active.
func TestAdmissionNamespaceActive(t *testing.T) {
namespace := "test"
mockClient := newMockClientForTest(map[string]api.NamespacePhase{
namespace: api.NamespaceActive,
})
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)
pod := newPod(namespace)
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("unexpected error returned from admission handler")
}
}
// TestAdmissionNamespaceTerminating verifies a resource is not created when the namespace is active.
func TestAdmissionNamespaceTerminating(t *testing.T) {
namespace := "test"
mockClient := newMockClientForTest(map[string]api.NamespacePhase{
namespace: api.NamespaceTerminating,
})
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)
pod := newPod(namespace)
// verify create operations in the namespace cause an error // verify create operations in the namespace cause an error
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil { if err == nil {
@ -118,20 +172,4 @@ func TestAdmission(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("Did not expect an error %v", err) t.Errorf("Did not expect an error %v", err)
} }
// verify create/update/delete of object in non-existent namespace throws error
err = handler.Admit(admission.NewAttributesRecord(&badPod, nil, api.Kind("Pod").WithVersion("version"), badPod.Namespace, badPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
t.Errorf("Expected, but didn't get, an error (%v) that objects cannot be created in non-existant namespaces", err)
}
err = handler.Admit(admission.NewAttributesRecord(&badPod, nil, api.Kind("Pod").WithVersion("version"), badPod.Namespace, badPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Update, nil))
if err == nil {
t.Errorf("Expected, but didn't get, an error (%v) that objects cannot be updated in non-existant namespaces", err)
}
err = handler.Admit(admission.NewAttributesRecord(&badPod, nil, api.Kind("Pod").WithVersion("version"), badPod.Namespace, badPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Delete, nil))
if err == nil {
t.Errorf("Expected, but didn't get, an error (%v) that objects cannot be deleted in non-existant namespaces", err)
}
} }