diff --git a/pkg/kubelet/configmap/configmap_manager.go b/pkg/kubelet/configmap/configmap_manager.go index 132f60c18f2..7aa5d2765a1 100644 --- a/pkg/kubelet/configmap/configmap_manager.go +++ b/pkg/kubelet/configmap/configmap_manager.go @@ -144,8 +144,14 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager { newConfigMap := func() runtime.Object { return &v1.ConfigMap{} } + isImmutable := func(object runtime.Object) bool { + if configMap, ok := object.(*v1.ConfigMap); ok { + return configMap.Immutable != nil && *configMap.Immutable + } + return false + } gr := corev1.Resource("configmap") return &configMapManager{ - manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, gr, getConfigMapNames), + manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, getConfigMapNames), } } diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go index 56843ed9107..97f3e100c97 100644 --- a/pkg/kubelet/secret/secret_manager.go +++ b/pkg/kubelet/secret/secret_manager.go @@ -145,8 +145,14 @@ func NewWatchingSecretManager(kubeClient clientset.Interface) Manager { newSecret := func() runtime.Object { return &v1.Secret{} } + isImmutable := func(object runtime.Object) bool { + if secret, ok := object.(*v1.Secret); ok { + return secret.Immutable != nil && *secret.Immutable + } + return false + } gr := corev1.Resource("secret") return &secretManager{ - manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, gr, getSecretNames), + manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, getSecretNames), } } diff --git a/pkg/kubelet/util/manager/BUILD b/pkg/kubelet/util/manager/BUILD index fbe9e111f86..c88f7023e68 100644 --- a/pkg/kubelet/util/manager/BUILD +++ b/pkg/kubelet/util/manager/BUILD @@ -10,6 +10,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/kubelet/util/manager", visibility = ["//visibility:public"], deps = [ + "//pkg/features:go_default_library", "//pkg/kubelet/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", @@ -22,7 +23,9 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) @@ -36,7 +39,9 @@ go_test( deps = [ "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core/v1:go_default_library", + "//pkg/features:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", @@ -44,9 +49,11 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", ], ) diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go index 952326348f0..56ff2af0eb5 100644 --- a/pkg/kubelet/util/manager/watch_based_manager.go +++ b/pkg/kubelet/util/manager/watch_based_manager.go @@ -14,13 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// TODO: We did some scalability tests and using watchBasedManager -// seems to help with apiserver performance at scale visibly. -// No issues we also observed at the scale of ~200k watchers with a -// single apiserver. -// However, we need to perform more extensive testing before we -// enable this in production setups. - package manager import ( @@ -31,6 +24,8 @@ import ( "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/klog" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -39,18 +34,37 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" ) type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error) type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error) type newObjectFunc func() runtime.Object +type isImmutableFunc func(runtime.Object) bool // objectCacheItem is a single item stored in objectCache. type objectCacheItem struct { refCount int store cache.Store hasSynced func() (bool, error) - stopCh chan struct{} + + // lock is protecting from closing stopCh multiple times. + lock sync.Mutex + stopCh chan struct{} +} + +func (i *objectCacheItem) stop() bool { + i.lock.Lock() + defer i.lock.Unlock() + select { + case <-i.stopCh: + // This means that channel is already closed. + return false + default: + close(i.stopCh) + return true + } } // objectCache is a local cache of objects propagated via @@ -59,6 +73,7 @@ type objectCache struct { listObject listObjectFunc watchObject watchObjectFunc newObject newObjectFunc + isImmutable isImmutableFunc groupResource schema.GroupResource lock sync.RWMutex @@ -66,11 +81,17 @@ type objectCache struct { } // NewObjectCache returns a new watch-based instance of Store interface. -func NewObjectCache(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource) Store { +func NewObjectCache( + listObject listObjectFunc, + watchObject watchObjectFunc, + newObject newObjectFunc, + isImmutable isImmutableFunc, + groupResource schema.GroupResource) Store { return &objectCache{ listObject: listObject, watchObject: watchObject, newObject: newObject, + isImmutable: isImmutable, groupResource: groupResource, items: make(map[objectKey]*objectCacheItem), } @@ -140,7 +161,7 @@ func (c *objectCache) DeleteReference(namespace, name string) { item.refCount-- if item.refCount == 0 { // Stop the underlying reflector. - close(item.stopCh) + item.stop() delete(c.items, key) } } @@ -177,6 +198,21 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) { return nil, apierrors.NewNotFound(c.groupResource, name) } if object, ok := obj.(runtime.Object); ok { + // If the returned object is immutable, stop the reflector. + // + // NOTE: we may potentially not even start the reflector if the object is + // already immutable. However, given that: + // - we want to also handle the case when object is marked as immutable later + // - Secrets and ConfigMaps are periodically fetched by volumemanager anyway + // - doing that wouldn't provide visible scalability/performance gain - we + // already have it from here + // - doing that would require significant refactoring to reflector + // we limit ourselves to just quickly stop the reflector here. + if utilfeature.DefaultFeatureGate.Enabled(features.ImmutableEphemeralVolumes) && c.isImmutable(object) { + if item.stop() { + klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable", namespace, name) + } + } return object, nil } return nil, fmt.Errorf("unexpected object type: %v", obj) @@ -188,7 +224,13 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) { // - whenever a pod is created or updated, we start individual watches for all // referenced objects that aren't referenced from other registered pods // - every GetObject() returns a value from local cache propagated via watches -func NewWatchBasedManager(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource, getReferencedObjects func(*v1.Pod) sets.String) Manager { - objectStore := NewObjectCache(listObject, watchObject, newObject, groupResource) +func NewWatchBasedManager( + listObject listObjectFunc, + watchObject watchObjectFunc, + newObject newObjectFunc, + isImmutable isImmutableFunc, + groupResource schema.GroupResource, + getReferencedObjects func(*v1.Pod) sets.String) Manager { + objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource) return NewCacheBasedManager(objectStore, getReferencedObjects) } diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go index fc27a7374d6..140f7d1f27b 100644 --- a/pkg/kubelet/util/manager/watch_based_manager_test.go +++ b/pkg/kubelet/util/manager/watch_based_manager_test.go @@ -23,17 +23,21 @@ import ( "time" "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + featuregatetesting "k8s.io/component-base/featuregate/testing" corev1 "k8s.io/kubernetes/pkg/apis/core/v1" + "k8s.io/kubernetes/pkg/features" "github.com/stretchr/testify/assert" ) @@ -50,11 +54,19 @@ func watchSecret(fakeClient clientset.Interface) watchObjectFunc { } } +func isSecretImmutable(object runtime.Object) bool { + if secret, ok := object.(*v1.Secret); ok { + return secret.Immutable != nil && *secret.Immutable + } + return false +} + func newSecretCache(fakeClient clientset.Interface) *objectCache { return &objectCache{ listObject: listSecret(fakeClient), watchObject: watchSecret(fakeClient), newObject: func() runtime.Object { return &v1.Secret{} }, + isImmutable: isSecretImmutable, groupResource: corev1.Resource("secret"), items: make(map[objectKey]*objectCacheItem), } @@ -182,3 +194,138 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) { actions = fakeClient.Actions() assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) } + +func TestImmutableSecretStopsTheReflector(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ImmutableEphemeralVolumes, true)() + + secret := func(rv string, immutable bool) *v1.Secret { + result := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "ns", + ResourceVersion: rv, + }, + } + if immutable { + trueVal := true + result.Immutable = &trueVal + } + return result + } + + tests := []struct { + desc string + initial *v1.Secret + eventual *v1.Secret + }{ + { + desc: "secret doesn't exist, created as mutable", + initial: nil, + eventual: secret("200", false), + }, + { + desc: "secret doesn't exist, created as immutable", + initial: nil, + eventual: secret("200", true), + }, + { + desc: "mutable secret modified to mutable", + initial: secret("100", false), + eventual: secret("200", false), + }, + { + desc: "mutable secret modified to immutable", + initial: secret("100", false), + eventual: secret("200", true), + }, + { + desc: "immutable secret", + initial: secret("100", true), + eventual: nil, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + fakeClient := &fake.Clientset{} + listReactor := func(a core.Action) (bool, runtime.Object, error) { + result := &v1.SecretList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + } + if tc.initial != nil { + result.Items = []v1.Secret{*tc.initial} + } + return true, result, nil + } + fakeClient.AddReactor("list", "secrets", listReactor) + fakeWatch := watch.NewFake() + fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil)) + + store := newSecretCache(fakeClient) + + key := objectKey{namespace: "ns", name: "name"} + itemExists := func() (bool, error) { + store.lock.Lock() + defer store.lock.Unlock() + _, ok := store.items[key] + return ok, nil + } + reflectorRunning := func() bool { + store.lock.Lock() + defer store.lock.Unlock() + item := store.items[key] + + item.lock.Lock() + defer item.lock.Unlock() + select { + case <-item.stopCh: + return false + default: + return true + } + } + + // AddReference should start reflector. + store.AddReference("ns", "name") + if err := wait.Poll(10*time.Millisecond, time.Second, itemExists); err != nil { + t.Errorf("item wasn't added to cache") + } + + obj, err := store.Get("ns", "name") + if tc.initial != nil { + assert.True(t, apiequality.Semantic.DeepEqual(tc.initial, obj)) + } else { + assert.True(t, apierrors.IsNotFound(err)) + } + + // Reflector should already be stopped for immutable secrets. + assert.Equal(t, tc.initial == nil || !isSecretImmutable(tc.initial), reflectorRunning()) + + if tc.eventual == nil { + return + } + fakeWatch.Add(tc.eventual) + + // Eventually Get should return that secret. + getFn := func() (bool, error) { + object, err := store.Get("ns", "name") + if err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + secret := object.(*v1.Secret) + return apiequality.Semantic.DeepEqual(tc.eventual, secret), nil + } + if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil { + t.Errorf("unexpected error: %v", err) + } + + // Reflector should already be stopped for immutable secrets. + assert.Equal(t, tc.eventual == nil || !isSecretImmutable(tc.eventual), reflectorRunning()) + }) + } +} diff --git a/test/integration/kubelet/watch_manager_test.go b/test/integration/kubelet/watch_manager_test.go index 248c7059fe7..0bcaea1102f 100644 --- a/test/integration/kubelet/watch_manager_test.go +++ b/test/integration/kubelet/watch_manager_test.go @@ -56,7 +56,10 @@ func TestWatchBasedManager(t *testing.T) { return client.CoreV1().Secrets(namespace).Watch(options) } newObj := func() runtime.Object { return &v1.Secret{} } - store := manager.NewObjectCache(listObj, watchObj, newObj, schema.GroupResource{Group: "v1", Resource: "secrets"}) + // We want all watches to be up and running to stress test it. + // So don't treat any secret as immutable here. + isImmutable := func(_ runtime.Object) bool { return false } + store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}) // create 1000 secrets in parallel t.Log(time.Now(), "creating 1000 secrets")