mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-13 13:55:41 +00:00
Fix cache expiration check
This commit is contained in:
parent
6cb0db2651
commit
1a43b01d3c
@ -22,7 +22,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
lru "github.com/hashicorp/golang-lru"
|
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/client/cache"
|
"k8s.io/kubernetes/pkg/client/cache"
|
||||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
@ -31,6 +30,8 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/admission"
|
"k8s.io/kubernetes/pkg/admission"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
|
utilcache "k8s.io/kubernetes/pkg/util/cache"
|
||||||
|
"k8s.io/kubernetes/pkg/util/clock"
|
||||||
"k8s.io/kubernetes/pkg/util/sets"
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -62,7 +63,7 @@ type lifecycle struct {
|
|||||||
namespaceInformer cache.SharedIndexInformer
|
namespaceInformer cache.SharedIndexInformer
|
||||||
// forceLiveLookupCache holds a list of entries for namespaces that we have a strong reason to believe are stale in our local cache.
|
// forceLiveLookupCache holds a list of entries for namespaces that we have a strong reason to believe are stale in our local cache.
|
||||||
// if a namespace is in this cache, then we will ignore our local state and always fetch latest from api server.
|
// if a namespace is in this cache, then we will ignore our local state and always fetch latest from api server.
|
||||||
forceLiveLookupCache *lru.Cache
|
forceLiveLookupCache *utilcache.LRUExpireCache
|
||||||
}
|
}
|
||||||
|
|
||||||
type forceLiveLookupEntry struct {
|
type forceLiveLookupEntry struct {
|
||||||
@ -95,10 +96,7 @@ func (l *lifecycle) Admit(a admission.Attributes) error {
|
|||||||
// is slow to update, we add the namespace into a force live lookup list to ensure
|
// is slow to update, we add the namespace into a force live lookup list to ensure
|
||||||
// we are not looking at stale state.
|
// we are not looking at stale state.
|
||||||
if a.GetOperation() == admission.Delete {
|
if a.GetOperation() == admission.Delete {
|
||||||
newEntry := forceLiveLookupEntry{
|
l.forceLiveLookupCache.Add(a.GetName(), true, forceLiveLookupTTL)
|
||||||
expiry: time.Now().Add(forceLiveLookupTTL),
|
|
||||||
}
|
|
||||||
l.forceLiveLookupCache.Add(a.GetName(), newEntry)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -135,8 +133,7 @@ func (l *lifecycle) Admit(a admission.Attributes) error {
|
|||||||
|
|
||||||
// forceLiveLookup if true will skip looking at local cache state and instead always make a live call to server.
|
// forceLiveLookup if true will skip looking at local cache state and instead always make a live call to server.
|
||||||
forceLiveLookup := false
|
forceLiveLookup := false
|
||||||
lruItemObj, ok := l.forceLiveLookupCache.Get(a.GetNamespace())
|
if _, ok := l.forceLiveLookupCache.Get(a.GetNamespace()); ok {
|
||||||
if ok && lruItemObj.(forceLiveLookupEntry).expiry.Before(time.Now()) {
|
|
||||||
// we think the namespace was marked for deletion, but our current local cache says otherwise, we will force a live lookup.
|
// we think the namespace was marked for deletion, but our current local cache says otherwise, we will force a live lookup.
|
||||||
forceLiveLookup = exists && namespaceObj.(*api.Namespace).Status.Phase == api.NamespaceActive
|
forceLiveLookup = exists && namespaceObj.(*api.Namespace).Status.Phase == api.NamespaceActive
|
||||||
}
|
}
|
||||||
@ -170,10 +167,11 @@ func (l *lifecycle) Admit(a admission.Attributes) error {
|
|||||||
|
|
||||||
// NewLifecycle creates a new namespace lifecycle admission control handler
|
// NewLifecycle creates a new namespace lifecycle admission control handler
|
||||||
func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) (admission.Interface, error) {
|
func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) (admission.Interface, error) {
|
||||||
forceLiveLookupCache, err := lru.New(100)
|
return newLifecycleWithClock(c, immortalNamespaces, clock.RealClock{})
|
||||||
if err != nil {
|
}
|
||||||
panic(err)
|
|
||||||
}
|
func newLifecycleWithClock(c clientset.Interface, immortalNamespaces sets.String, clock utilcache.Clock) (admission.Interface, error) {
|
||||||
|
forceLiveLookupCache := utilcache.NewLRUExpireCacheWithClock(100, clock)
|
||||||
return &lifecycle{
|
return &lifecycle{
|
||||||
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
|
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
|
||||||
client: c,
|
client: c,
|
||||||
|
@ -29,14 +29,20 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/client/testing/core"
|
"k8s.io/kubernetes/pkg/client/testing/core"
|
||||||
"k8s.io/kubernetes/pkg/controller/informers"
|
"k8s.io/kubernetes/pkg/controller/informers"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/util/clock"
|
||||||
"k8s.io/kubernetes/pkg/util/sets"
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
// newHandlerForTest returns a configured handler for testing.
|
// newHandlerForTest returns a configured handler for testing.
|
||||||
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
|
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
|
||||||
|
return newHandlerForTestWithClock(c, clock.RealClock{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// newHandlerForTestWithClock returns a configured handler for testing.
|
||||||
|
func newHandlerForTestWithClock(c clientset.Interface, cacheClock clock.Clock) (admission.Interface, informers.SharedInformerFactory, error) {
|
||||||
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
|
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
|
||||||
handler, err := NewLifecycle(c, sets.NewString(api.NamespaceDefault, api.NamespaceSystem))
|
handler, err := newLifecycleWithClock(c, sets.NewString(api.NamespaceDefault, api.NamespaceSystem), cacheClock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, f, err
|
return nil, f, err
|
||||||
}
|
}
|
||||||
@ -173,3 +179,80 @@ func TestAdmissionNamespaceTerminating(t *testing.T) {
|
|||||||
t.Errorf("Did not expect an error %v", err)
|
t.Errorf("Did not expect an error %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestAdmissionNamespaceForceLiveLookup verifies live lookups are done after deleting a namespace
|
||||||
|
func TestAdmissionNamespaceForceLiveLookup(t *testing.T) {
|
||||||
|
namespace := "test"
|
||||||
|
getCalls := int64(0)
|
||||||
|
phases := map[string]api.NamespacePhase{namespace: api.NamespaceActive}
|
||||||
|
mockClient := newMockClientForTest(phases)
|
||||||
|
mockClient.AddReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
|
||||||
|
getCalls++
|
||||||
|
return true, &api.Namespace{ObjectMeta: api.ObjectMeta{Name: namespace}, Status: api.NamespaceStatus{Phase: phases[namespace]}}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
fakeClock := clock.NewFakeClock(time.Now())
|
||||||
|
|
||||||
|
handler, informerFactory, err := newHandlerForTestWithClock(mockClient, fakeClock)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error initializing handler: %v", err)
|
||||||
|
}
|
||||||
|
informerFactory.Start(wait.NeverStop)
|
||||||
|
|
||||||
|
pod := newPod(namespace)
|
||||||
|
// verify create operations in the namespace is allowed
|
||||||
|
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error rejecting creates in an active namespace")
|
||||||
|
}
|
||||||
|
if getCalls != 0 {
|
||||||
|
t.Errorf("Expected no live lookups of the namespace, got %d", getCalls)
|
||||||
|
}
|
||||||
|
getCalls = 0
|
||||||
|
|
||||||
|
// verify delete of namespace can proceed
|
||||||
|
err = handler.Admit(admission.NewAttributesRecord(nil, nil, api.Kind("Namespace").WithVersion("version"), "", namespace, api.Resource("namespaces").WithVersion("version"), "", admission.Delete, nil))
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Expected namespace deletion to be allowed")
|
||||||
|
}
|
||||||
|
if getCalls != 0 {
|
||||||
|
t.Errorf("Expected no live lookups of the namespace, got %d", getCalls)
|
||||||
|
}
|
||||||
|
getCalls = 0
|
||||||
|
|
||||||
|
// simulate the phase changing
|
||||||
|
phases[namespace] = api.NamespaceTerminating
|
||||||
|
|
||||||
|
// verify create operations in the namespace cause an error
|
||||||
|
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("Expected error rejecting creates in a namespace right after deleting it")
|
||||||
|
}
|
||||||
|
if getCalls != 1 {
|
||||||
|
t.Errorf("Expected a live lookup of the namespace at t=0, got %d", getCalls)
|
||||||
|
}
|
||||||
|
getCalls = 0
|
||||||
|
|
||||||
|
// Ensure the live lookup is still forced up to forceLiveLookupTTL
|
||||||
|
fakeClock.Step(forceLiveLookupTTL)
|
||||||
|
|
||||||
|
// verify create operations in the namespace cause an error
|
||||||
|
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("Expected error rejecting creates in a namespace right after deleting it")
|
||||||
|
}
|
||||||
|
if getCalls != 1 {
|
||||||
|
t.Errorf("Expected a live lookup of the namespace at t=forceLiveLookupTTL, got %d", getCalls)
|
||||||
|
}
|
||||||
|
getCalls = 0
|
||||||
|
|
||||||
|
// Ensure the live lookup expires
|
||||||
|
fakeClock.Step(time.Millisecond)
|
||||||
|
|
||||||
|
// verify create operations in the namespace don't force a live lookup after the timeout
|
||||||
|
handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
|
||||||
|
if getCalls != 0 {
|
||||||
|
t.Errorf("Expected no live lookup of the namespace at t=forceLiveLookupTTL+1ms, got %d", getCalls)
|
||||||
|
}
|
||||||
|
getCalls = 0
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user