mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
WatchBasedManager stops watching immutable objects
This commit is contained in:
parent
1bb68a2cde
commit
b11b7d354d
@ -144,8 +144,14 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager {
|
||||
newConfigMap := func() runtime.Object {
|
||||
return &v1.ConfigMap{}
|
||||
}
|
||||
isImmutable := func(object runtime.Object) bool {
|
||||
if configMap, ok := object.(*v1.ConfigMap); ok {
|
||||
return configMap.Immutable != nil && *configMap.Immutable
|
||||
}
|
||||
return false
|
||||
}
|
||||
gr := corev1.Resource("configmap")
|
||||
return &configMapManager{
|
||||
manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, gr, getConfigMapNames),
|
||||
manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, getConfigMapNames),
|
||||
}
|
||||
}
|
||||
|
@ -145,8 +145,14 @@ func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
|
||||
newSecret := func() runtime.Object {
|
||||
return &v1.Secret{}
|
||||
}
|
||||
isImmutable := func(object runtime.Object) bool {
|
||||
if secret, ok := object.(*v1.Secret); ok {
|
||||
return secret.Immutable != nil && *secret.Immutable
|
||||
}
|
||||
return false
|
||||
}
|
||||
gr := corev1.Resource("secret")
|
||||
return &secretManager{
|
||||
manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, gr, getSecretNames),
|
||||
manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, getSecretNames),
|
||||
}
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ go_library(
|
||||
importpath = "k8s.io/kubernetes/pkg/kubelet/util/manager",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/kubelet/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
@ -22,7 +23,9 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@ -36,7 +39,9 @@ go_test(
|
||||
deps = [
|
||||
"//pkg/api/v1/pod:go_default_library",
|
||||
"//pkg/apis/core/v1:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
@ -44,9 +49,11 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -14,13 +14,6 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// TODO: We did some scalability tests and using watchBasedManager
|
||||
// seems to help with apiserver performance at scale visibly.
|
||||
// No issues we also observed at the scale of ~200k watchers with a
|
||||
// single apiserver.
|
||||
// However, we need to perform more extensive testing before we
|
||||
// enable this in production setups.
|
||||
|
||||
package manager
|
||||
|
||||
import (
|
||||
@ -31,6 +24,8 @@ import (
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"k8s.io/klog"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
@ -39,18 +34,37 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
|
||||
type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
|
||||
type newObjectFunc func() runtime.Object
|
||||
type isImmutableFunc func(runtime.Object) bool
|
||||
|
||||
// objectCacheItem is a single item stored in objectCache.
|
||||
type objectCacheItem struct {
|
||||
refCount int
|
||||
store cache.Store
|
||||
hasSynced func() (bool, error)
|
||||
stopCh chan struct{}
|
||||
|
||||
// lock is protecting from closing stopCh multiple times.
|
||||
lock sync.Mutex
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func (i *objectCacheItem) stop() bool {
|
||||
i.lock.Lock()
|
||||
defer i.lock.Unlock()
|
||||
select {
|
||||
case <-i.stopCh:
|
||||
// This means that channel is already closed.
|
||||
return false
|
||||
default:
|
||||
close(i.stopCh)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// objectCache is a local cache of objects propagated via
|
||||
@ -59,6 +73,7 @@ type objectCache struct {
|
||||
listObject listObjectFunc
|
||||
watchObject watchObjectFunc
|
||||
newObject newObjectFunc
|
||||
isImmutable isImmutableFunc
|
||||
groupResource schema.GroupResource
|
||||
|
||||
lock sync.RWMutex
|
||||
@ -66,11 +81,17 @@ type objectCache struct {
|
||||
}
|
||||
|
||||
// NewObjectCache returns a new watch-based instance of Store interface.
|
||||
func NewObjectCache(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource) Store {
|
||||
func NewObjectCache(
|
||||
listObject listObjectFunc,
|
||||
watchObject watchObjectFunc,
|
||||
newObject newObjectFunc,
|
||||
isImmutable isImmutableFunc,
|
||||
groupResource schema.GroupResource) Store {
|
||||
return &objectCache{
|
||||
listObject: listObject,
|
||||
watchObject: watchObject,
|
||||
newObject: newObject,
|
||||
isImmutable: isImmutable,
|
||||
groupResource: groupResource,
|
||||
items: make(map[objectKey]*objectCacheItem),
|
||||
}
|
||||
@ -140,7 +161,7 @@ func (c *objectCache) DeleteReference(namespace, name string) {
|
||||
item.refCount--
|
||||
if item.refCount == 0 {
|
||||
// Stop the underlying reflector.
|
||||
close(item.stopCh)
|
||||
item.stop()
|
||||
delete(c.items, key)
|
||||
}
|
||||
}
|
||||
@ -177,6 +198,21 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
|
||||
return nil, apierrors.NewNotFound(c.groupResource, name)
|
||||
}
|
||||
if object, ok := obj.(runtime.Object); ok {
|
||||
// If the returned object is immutable, stop the reflector.
|
||||
//
|
||||
// NOTE: we may potentially not even start the reflector if the object is
|
||||
// already immutable. However, given that:
|
||||
// - we want to also handle the case when object is marked as immutable later
|
||||
// - Secrets and ConfigMaps are periodically fetched by volumemanager anyway
|
||||
// - doing that wouldn't provide visible scalability/performance gain - we
|
||||
// already have it from here
|
||||
// - doing that would require significant refactoring to reflector
|
||||
// we limit ourselves to just quickly stop the reflector here.
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ImmutableEphemeralVolumes) && c.isImmutable(object) {
|
||||
if item.stop() {
|
||||
klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable", namespace, name)
|
||||
}
|
||||
}
|
||||
return object, nil
|
||||
}
|
||||
return nil, fmt.Errorf("unexpected object type: %v", obj)
|
||||
@ -188,7 +224,13 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
|
||||
// - whenever a pod is created or updated, we start individual watches for all
|
||||
// referenced objects that aren't referenced from other registered pods
|
||||
// - every GetObject() returns a value from local cache propagated via watches
|
||||
func NewWatchBasedManager(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource, getReferencedObjects func(*v1.Pod) sets.String) Manager {
|
||||
objectStore := NewObjectCache(listObject, watchObject, newObject, groupResource)
|
||||
func NewWatchBasedManager(
|
||||
listObject listObjectFunc,
|
||||
watchObject watchObjectFunc,
|
||||
newObject newObjectFunc,
|
||||
isImmutable isImmutableFunc,
|
||||
groupResource schema.GroupResource,
|
||||
getReferencedObjects func(*v1.Pod) sets.String) Manager {
|
||||
objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource)
|
||||
return NewCacheBasedManager(objectStore, getReferencedObjects)
|
||||
}
|
||||
|
@ -23,17 +23,21 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
core "k8s.io/client-go/testing"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
|
||||
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@ -50,11 +54,19 @@ func watchSecret(fakeClient clientset.Interface) watchObjectFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func isSecretImmutable(object runtime.Object) bool {
|
||||
if secret, ok := object.(*v1.Secret); ok {
|
||||
return secret.Immutable != nil && *secret.Immutable
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func newSecretCache(fakeClient clientset.Interface) *objectCache {
|
||||
return &objectCache{
|
||||
listObject: listSecret(fakeClient),
|
||||
watchObject: watchSecret(fakeClient),
|
||||
newObject: func() runtime.Object { return &v1.Secret{} },
|
||||
isImmutable: isSecretImmutable,
|
||||
groupResource: corev1.Resource("secret"),
|
||||
items: make(map[objectKey]*objectCacheItem),
|
||||
}
|
||||
@ -182,3 +194,138 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) {
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
|
||||
}
|
||||
|
||||
func TestImmutableSecretStopsTheReflector(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ImmutableEphemeralVolumes, true)()
|
||||
|
||||
secret := func(rv string, immutable bool) *v1.Secret {
|
||||
result := &v1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
Namespace: "ns",
|
||||
ResourceVersion: rv,
|
||||
},
|
||||
}
|
||||
if immutable {
|
||||
trueVal := true
|
||||
result.Immutable = &trueVal
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
initial *v1.Secret
|
||||
eventual *v1.Secret
|
||||
}{
|
||||
{
|
||||
desc: "secret doesn't exist, created as mutable",
|
||||
initial: nil,
|
||||
eventual: secret("200", false),
|
||||
},
|
||||
{
|
||||
desc: "secret doesn't exist, created as immutable",
|
||||
initial: nil,
|
||||
eventual: secret("200", true),
|
||||
},
|
||||
{
|
||||
desc: "mutable secret modified to mutable",
|
||||
initial: secret("100", false),
|
||||
eventual: secret("200", false),
|
||||
},
|
||||
{
|
||||
desc: "mutable secret modified to immutable",
|
||||
initial: secret("100", false),
|
||||
eventual: secret("200", true),
|
||||
},
|
||||
{
|
||||
desc: "immutable secret",
|
||||
initial: secret("100", true),
|
||||
eventual: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
listReactor := func(a core.Action) (bool, runtime.Object, error) {
|
||||
result := &v1.SecretList{
|
||||
ListMeta: metav1.ListMeta{
|
||||
ResourceVersion: "100",
|
||||
},
|
||||
}
|
||||
if tc.initial != nil {
|
||||
result.Items = []v1.Secret{*tc.initial}
|
||||
}
|
||||
return true, result, nil
|
||||
}
|
||||
fakeClient.AddReactor("list", "secrets", listReactor)
|
||||
fakeWatch := watch.NewFake()
|
||||
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
|
||||
|
||||
store := newSecretCache(fakeClient)
|
||||
|
||||
key := objectKey{namespace: "ns", name: "name"}
|
||||
itemExists := func() (bool, error) {
|
||||
store.lock.Lock()
|
||||
defer store.lock.Unlock()
|
||||
_, ok := store.items[key]
|
||||
return ok, nil
|
||||
}
|
||||
reflectorRunning := func() bool {
|
||||
store.lock.Lock()
|
||||
defer store.lock.Unlock()
|
||||
item := store.items[key]
|
||||
|
||||
item.lock.Lock()
|
||||
defer item.lock.Unlock()
|
||||
select {
|
||||
case <-item.stopCh:
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// AddReference should start reflector.
|
||||
store.AddReference("ns", "name")
|
||||
if err := wait.Poll(10*time.Millisecond, time.Second, itemExists); err != nil {
|
||||
t.Errorf("item wasn't added to cache")
|
||||
}
|
||||
|
||||
obj, err := store.Get("ns", "name")
|
||||
if tc.initial != nil {
|
||||
assert.True(t, apiequality.Semantic.DeepEqual(tc.initial, obj))
|
||||
} else {
|
||||
assert.True(t, apierrors.IsNotFound(err))
|
||||
}
|
||||
|
||||
// Reflector should already be stopped for immutable secrets.
|
||||
assert.Equal(t, tc.initial == nil || !isSecretImmutable(tc.initial), reflectorRunning())
|
||||
|
||||
if tc.eventual == nil {
|
||||
return
|
||||
}
|
||||
fakeWatch.Add(tc.eventual)
|
||||
|
||||
// Eventually Get should return that secret.
|
||||
getFn := func() (bool, error) {
|
||||
object, err := store.Get("ns", "name")
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
secret := object.(*v1.Secret)
|
||||
return apiequality.Semantic.DeepEqual(tc.eventual, secret), nil
|
||||
}
|
||||
if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Reflector should already be stopped for immutable secrets.
|
||||
assert.Equal(t, tc.eventual == nil || !isSecretImmutable(tc.eventual), reflectorRunning())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -56,7 +56,10 @@ func TestWatchBasedManager(t *testing.T) {
|
||||
return client.CoreV1().Secrets(namespace).Watch(options)
|
||||
}
|
||||
newObj := func() runtime.Object { return &v1.Secret{} }
|
||||
store := manager.NewObjectCache(listObj, watchObj, newObj, schema.GroupResource{Group: "v1", Resource: "secrets"})
|
||||
// We want all watches to be up and running to stress test it.
|
||||
// So don't treat any secret as immutable here.
|
||||
isImmutable := func(_ runtime.Object) bool { return false }
|
||||
store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"})
|
||||
|
||||
// create 1000 secrets in parallel
|
||||
t.Log(time.Now(), "creating 1000 secrets")
|
||||
|
Loading…
Reference in New Issue
Block a user