From 1cf6b339f677600d12333e5a16fb38ae6cea72ad Mon Sep 17 00:00:00 2001 From: Shyam Jeedigunta Date: Tue, 30 May 2017 15:35:43 +0200 Subject: [PATCH] Use TTL-based caching configmap manager in kubelet --- pkg/api/v1/pod/util.go | 54 ++ pkg/kubelet/configmap/BUILD | 24 + pkg/kubelet/configmap/configmap_manager.go | 241 ++++++++ .../configmap/configmap_manager_test.go | 537 ++++++++++++++++++ pkg/kubelet/kubelet.go | 3 +- 5 files changed, 858 insertions(+), 1 deletion(-) create mode 100644 pkg/kubelet/configmap/configmap_manager_test.go diff --git a/pkg/api/v1/pod/util.go b/pkg/api/v1/pod/util.go index 3aadefdb546..9734ad98f7d 100644 --- a/pkg/api/v1/pod/util.go +++ b/pkg/api/v1/pod/util.go @@ -197,6 +197,60 @@ func visitContainerSecretNames(container *v1.Container, visitor Visitor) bool { return true } +// VisitPodConfigmapNames invokes the visitor function with the name of every configmap +// referenced by the pod spec. If visitor returns false, visiting is short-circuited. +// Transitive references (e.g. pod -> pvc -> pv -> secret) are not visited. +// Returns true if visiting completed, false if visiting was short-circuited. +func VisitPodConfigmapNames(pod *v1.Pod, visitor Visitor) bool { + for i := range pod.Spec.InitContainers { + if !visitContainerConfigmapNames(&pod.Spec.InitContainers[i], visitor) { + return false + } + } + for i := range pod.Spec.Containers { + if !visitContainerConfigmapNames(&pod.Spec.Containers[i], visitor) { + return false + } + } + var source *v1.VolumeSource + for i := range pod.Spec.Volumes { + source = &pod.Spec.Volumes[i].VolumeSource + switch { + case source.Projected != nil: + for j := range source.Projected.Sources { + if source.Projected.Sources[j].ConfigMap != nil { + if !visitor(source.Projected.Sources[j].ConfigMap.Name) { + return false + } + } + } + case source.ConfigMap != nil: + if !visitor(source.ConfigMap.Name) { + return false + } + } + } + return true +} + +func visitContainerConfigmapNames(container *v1.Container, visitor Visitor) bool { + for _, env := range container.EnvFrom { + if env.ConfigMapRef != nil { + if !visitor(env.ConfigMapRef.Name) { + return false + } + } + } + for _, envVar := range container.Env { + if envVar.ValueFrom != nil && envVar.ValueFrom.ConfigMapKeyRef != nil { + if !visitor(envVar.ValueFrom.ConfigMapKeyRef.Name) { + return false + } + } + } + return true +} + // GetContainerStatus extracts the status of container "name" from "statuses". // It also returns if "name" exists. func GetContainerStatus(statuses []v1.ContainerStatus, name string) (v1.ContainerStatus, bool) { diff --git a/pkg/kubelet/configmap/BUILD b/pkg/kubelet/configmap/BUILD index 983ddcb3c33..df03366e2ae 100644 --- a/pkg/kubelet/configmap/BUILD +++ b/pkg/kubelet/configmap/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -16,8 +17,14 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", + "//pkg/api/v1/pod:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/kubelet/util:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library", ], ) @@ -33,3 +40,20 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["configmap_manager_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + ], +) diff --git a/pkg/kubelet/configmap/configmap_manager.go b/pkg/kubelet/configmap/configmap_manager.go index 2d69c51080a..88ac32624e8 100644 --- a/pkg/kubelet/configmap/configmap_manager.go +++ b/pkg/kubelet/configmap/configmap_manager.go @@ -17,12 +17,25 @@ limitations under the License. package configmap import ( + "fmt" + "strconv" + "sync" "time" + storageetcd "k8s.io/apiserver/pkg/storage/etcd" "k8s.io/kubernetes/pkg/api/v1" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/kubelet/util" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/sets" +) + +const ( + defaultTTL = time.Minute ) type Manager interface { @@ -59,3 +72,231 @@ func (s *simpleConfigMapManager) RegisterPod(pod *v1.Pod) { func (s *simpleConfigMapManager) UnregisterPod(pod *v1.Pod) { } + +type GetObjectTTLFunc func() (time.Duration, bool) + +type objectKey struct { + namespace string + name string +} + +// configMapStoreItems is a single item stored in configMapStore. +type configMapStoreItem struct { + refCount int + configMap *configMapData +} + +type configMapData struct { + sync.Mutex + + configMap *v1.ConfigMap + err error + lastUpdateTime time.Time +} + +// configMapStore is a local cache of configmaps. +type configMapStore struct { + kubeClient clientset.Interface + clock clock.Clock + + lock sync.Mutex + items map[objectKey]*configMapStoreItem + + defaultTTL time.Duration + getTTL GetObjectTTLFunc +} + +func newConfigMapStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *configMapStore { + return &configMapStore{ + kubeClient: kubeClient, + clock: clock, + items: make(map[objectKey]*configMapStoreItem), + defaultTTL: ttl, + getTTL: getTTL, + } +} + +func isConfigMapOlder(newConfigMap, oldConfigMap *v1.ConfigMap) bool { + if newConfigMap == nil || oldConfigMap == nil { + return false + } + newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newConfigMap) + oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldConfigMap) + return newVersion < oldVersion +} + +func (s *configMapStore) Add(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + // Add is called from RegisterPod, thus it needs to be efficient. + // Thus Add() is only increasing refCount and generation of a given configmap. + // Then Get() is responsible for fetching if needed. + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + item = &configMapStoreItem{ + refCount: 0, + configMap: &configMapData{}, + } + s.items[key] = item + } + + item.refCount++ + // This will trigger fetch on the next Get() operation. + item.configMap = nil +} + +func (s *configMapStore) Delete(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + s.lock.Lock() + defer s.lock.Unlock() + if item, ok := s.items[key]; ok { + item.refCount-- + if item.refCount == 0 { + delete(s.items, key) + } + } +} + +func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc { + return func() (time.Duration, bool) { + node, err := getNode() + if err != nil { + return time.Duration(0), false + } + if node != nil && node.Annotations != nil { + if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok { + if intValue, err := strconv.Atoi(value); err == nil { + return time.Duration(intValue) * time.Second, true + } + } + } + return time.Duration(0), false + } +} + +func (s *configMapStore) isConfigMapFresh(data *configMapData) bool { + configMapTTL := s.defaultTTL + if ttl, ok := s.getTTL(); ok { + configMapTTL = ttl + } + return s.clock.Now().Before(data.lastUpdateTime.Add(configMapTTL)) +} + +func (s *configMapStore) Get(namespace, name string) (*v1.ConfigMap, error) { + key := objectKey{namespace: namespace, name: name} + + data := func() *configMapData { + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + return nil + } + if item.configMap == nil { + item.configMap = &configMapData{} + } + return item.configMap + }() + if data == nil { + return nil, fmt.Errorf("configmap %q/%q not registered", namespace, name) + } + + // After updating data in configMapStore, lock the data, fetch configMap if + // needed and return data. + data.Lock() + defer data.Unlock() + if data.err != nil || !s.isConfigMapFresh(data) { + opts := metav1.GetOptions{} + if data.configMap != nil && data.err == nil { + // This is just a periodic refresh of a configmap we successfully fetched previously. + // In this case, server data from apiserver cache to reduce the load on both + // etcd and apiserver (the cache is eventually consistent). + util.FromApiserverCache(&opts) + } + configMap, err := s.kubeClient.Core().ConfigMaps(namespace).Get(name, opts) + if err != nil && !apierrors.IsNotFound(err) && data.configMap == nil && data.err == nil { + // Couldn't fetch the latest configmap, but there is no cached data to return. + // Return the fetch result instead. + return configMap, err + } + if (err == nil && !isConfigMapOlder(configMap, data.configMap)) || apierrors.IsNotFound(err) { + // If the fetch succeeded with a newer version of the configmap, or if the + // configmap could not be found in the apiserver, update the cached data to + // reflect the current status. + data.configMap = configMap + data.err = err + data.lastUpdateTime = s.clock.Now() + } + } + return data.configMap, data.err +} + +// cachingConfigMapManager keeps a cache of all configmaps necessary for registered pods. +// It implements the following logic: +// - whenever a pod is created or updated, the cached versions of all its configmaps +// are invalidated +// - every GetConfigMap() call tries to fetch the value from local cache; if it is +// not there, invalidated or too old, we fetch it from apiserver and refresh the +// value in cache; otherwise it is just fetched from cache +type cachingConfigMapManager struct { + configMapStore *configMapStore + + lock sync.Mutex + registeredPods map[objectKey]*v1.Pod +} + +func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager { + csm := &cachingConfigMapManager{ + configMapStore: newConfigMapStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL), + registeredPods: make(map[objectKey]*v1.Pod), + } + return csm +} + +func (c *cachingConfigMapManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) { + return c.configMapStore.Get(namespace, name) +} + +func getConfigMapNames(pod *v1.Pod) sets.String { + result := sets.NewString() + podutil.VisitPodConfigmapNames(pod, func(name string) bool { + result.Insert(name) + return true + }) + return result +} + +func (c *cachingConfigMapManager) RegisterPod(pod *v1.Pod) { + names := getConfigMapNames(pod) + c.lock.Lock() + defer c.lock.Unlock() + for name := range names { + c.configMapStore.Add(pod.Namespace, name) + } + var prev *v1.Pod + key := objectKey{namespace: pod.Namespace, name: pod.Name} + prev = c.registeredPods[key] + c.registeredPods[key] = pod + if prev != nil { + for name := range getConfigMapNames(prev) { + c.configMapStore.Delete(prev.Namespace, name) + } + } +} + +func (c *cachingConfigMapManager) UnregisterPod(pod *v1.Pod) { + var prev *v1.Pod + key := objectKey{namespace: pod.Namespace, name: pod.Name} + c.lock.Lock() + defer c.lock.Unlock() + prev = c.registeredPods[key] + delete(c.registeredPods, key) + if prev != nil { + for name := range getConfigMapNames(prev) { + c.configMapStore.Delete(prev.Namespace, name) + } + } +} diff --git a/pkg/kubelet/configmap/configmap_manager_test.go b/pkg/kubelet/configmap/configmap_manager_test.go new file mode 100644 index 00000000000..b2a36c516ef --- /dev/null +++ b/pkg/kubelet/configmap/configmap_manager_test.go @@ -0,0 +1,537 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "fmt" + "reflect" + "strings" + "sync" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + core "k8s.io/client-go/testing" + + "github.com/stretchr/testify/assert" +) + +func checkConfigMap(t *testing.T, store *configMapStore, ns, name string, shouldExist bool) { + _, err := store.Get(ns, name) + if shouldExist && err != nil { + t.Errorf("unexpected actions: %#v", err) + } + if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("configmap %q/%q not registered", ns, name))) { + t.Errorf("unexpected actions: %#v", err) + } +} + +func noObjectTTL() (time.Duration, bool) { + return time.Duration(0), false +} + +func TestConfigMapStore(t *testing.T) { + fakeClient := &fake.Clientset{} + store := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) + store.Add("ns1", "name1") + store.Add("ns2", "name2") + store.Add("ns1", "name1") + store.Add("ns1", "name1") + store.Delete("ns1", "name1") + store.Delete("ns2", "name2") + store.Add("ns3", "name3") + + // Adds don't issue Get requests. + actions := fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Should issue Get request + store.Get("ns1", "name1") + // Shouldn't issue Get request, as configMap is not registered + store.Get("ns2", "name2") + // Should issue Get request + store.Get("ns3", "name3") + + actions = fakeClient.Actions() + assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) + + for _, a := range actions { + assert.True(t, a.Matches("get", "configmaps"), "unexpected actions: %#v", a) + } + + checkConfigMap(t, store, "ns1", "name1", true) + checkConfigMap(t, store, "ns2", "name2", false) + checkConfigMap(t, store, "ns3", "name3", true) + checkConfigMap(t, store, "ns4", "name4", false) +} + +func TestConfigMapStoreDeletingConfigMap(t *testing.T) { + fakeClient := &fake.Clientset{} + store := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) + store.Add("ns", "name") + + result := &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name", ResourceVersion: "10"}} + fakeClient.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { + return true, result, nil + }) + configMap, err := store.Get("ns", "name") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(configMap, result) { + t.Errorf("Unexpected configMap: %v", configMap) + } + + fakeClient.PrependReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { + return true, &v1.ConfigMap{}, apierrors.NewNotFound(v1.Resource("configMap"), "name") + }) + configMap, err = store.Get("ns", "name") + if err == nil || !apierrors.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(configMap, &v1.ConfigMap{}) { + t.Errorf("Unexpected configMap: %v", configMap) + } +} + +func TestConfigMapStoreGetAlwaysRefresh(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, 0) + + for i := 0; i < 10; i++ { + store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + } + fakeClient.ClearActions() + + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) + wg.Done() + }(i) + } + wg.Wait() + actions := fakeClient.Actions() + assert.Equal(t, 100, len(actions), "unexpected actions: %#v", actions) + + for _, a := range actions { + assert.True(t, a.Matches("get", "configmaps"), "unexpected actions: %#v", a) + } +} + +func TestConfigMapStoreGetNeverRefresh(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute) + + for i := 0; i < 10; i++ { + store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + } + fakeClient.ClearActions() + + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) + wg.Done() + }(i) + } + wg.Wait() + actions := fakeClient.Actions() + // Only first Get, should forward the Get request. + assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions) +} + +func TestCustomTTL(t *testing.T) { + ttl := time.Duration(0) + ttlExists := false + customTTL := func() (time.Duration, bool) { + return ttl, ttlExists + } + + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Time{}) + store := newConfigMapStore(fakeClient, fakeClock, customTTL, time.Minute) + + store.Add("ns", "name") + store.Get("ns", "name") + fakeClient.ClearActions() + + // Set 0-ttl and see if that works. + ttl = time.Duration(0) + ttlExists = true + store.Get("ns", "name") + actions := fakeClient.Actions() + assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Set 5-minute ttl and see if this works. + ttl = time.Duration(5) * time.Minute + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Still no effect after 4 minutes. + fakeClock.Step(4 * time.Minute) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Now it should have an effect. + fakeClock.Step(time.Minute) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Now remove the custom ttl and see if that works. + ttlExists = false + fakeClock.Step(55 * time.Second) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Pass the minute and it should be triggered now. + fakeClock.Step(5 * time.Second) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) +} + +func TestParseNodeAnnotation(t *testing.T) { + testCases := []struct { + node *v1.Node + err error + exists bool + ttl time.Duration + }{ + { + node: nil, + err: fmt.Errorf("error"), + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + }, + }, + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{}, + }, + }, + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "bad"}, + }, + }, + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "0"}, + }, + }, + exists: true, + ttl: time.Duration(0), + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "60"}, + }, + }, + exists: true, + ttl: time.Minute, + }, + } + for i, testCase := range testCases { + getNode := func() (*v1.Node, error) { return testCase.node, testCase.err } + ttl, exists := GetObjectTTLFromNodeFunc(getNode)() + if exists != testCase.exists { + t.Errorf("%d: incorrect parsing: %t", i, exists) + continue + } + if exists && ttl != testCase.ttl { + t.Errorf("%d: incorrect ttl: %v", i, ttl) + } + } +} + +type envConfigMaps struct { + envVarNames []string + envFromNames []string +} + +type configMapsToAttach struct { + containerEnvConfigMaps []envConfigMaps + volumes []string +} + +func podWithConfigMaps(ns, name string, toAttach configMapsToAttach) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Spec: v1.PodSpec{}, + } + for i, configMaps := range toAttach.containerEnvConfigMaps { + container := v1.Container{ + Name: fmt.Sprintf("container-%d", i), + } + for _, name := range configMaps.envFromNames { + envFrom := v1.EnvFromSource{ + ConfigMapRef: &v1.ConfigMapEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: name, + }, + }, + } + container.EnvFrom = append(container.EnvFrom, envFrom) + } + + for _, name := range configMaps.envVarNames { + envSource := &v1.EnvVarSource{ + ConfigMapKeyRef: &v1.ConfigMapKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: name, + }, + }, + } + container.Env = append(container.Env, v1.EnvVar{ValueFrom: envSource}) + } + pod.Spec.Containers = append(pod.Spec.Containers, container) + } + for _, configMap := range toAttach.volumes { + volume := &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{Name: configMap}, + } + pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{ + Name: configMap, + VolumeSource: v1.VolumeSource{ + ConfigMap: volume, + }, + }) + } + return pod +} + +func TestCacheInvalidation(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute) + manager := &cachingConfigMapManager{ + configMapStore: store, + registeredPods: make(map[objectKey]*v1.Pod), + } + + // Create a pod with some configMaps. + s1 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s1"}, envFromNames: []string{"s10"}}, + {envVarNames: []string{"s2"}}, + }, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1)) + // Fetch both configMaps - this should triggger get operations. + store.Get("ns1", "s1") + store.Get("ns1", "s10") + store.Get("ns1", "s2") + actions := fakeClient.Actions() + assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Update a pod with a new configMap. + s2 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s1"}}, + {envVarNames: []string{"s2"}, envFromNames: []string{"s20"}}, + }, + volumes: []string{"s3"}, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name1", s2)) + // All configMaps should be invalidated - this should trigger get operations. + store.Get("ns1", "s1") + store.Get("ns1", "s2") + store.Get("ns1", "s20") + store.Get("ns1", "s3") + actions = fakeClient.Actions() + assert.Equal(t, 4, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Create a new pod that is refencing the first three configMaps - those should + // be invalidated. + manager.RegisterPod(podWithConfigMaps("ns1", "name2", s1)) + store.Get("ns1", "s1") + store.Get("ns1", "s10") + store.Get("ns1", "s2") + store.Get("ns1", "s20") + store.Get("ns1", "s3") + actions = fakeClient.Actions() + assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() +} + +func TestCacheRefcounts(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute) + manager := &cachingConfigMapManager{ + configMapStore: store, + registeredPods: make(map[objectKey]*v1.Pod), + } + + s1 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s1"}, envFromNames: []string{"s10"}}, + {envVarNames: []string{"s2"}}, + }, + volumes: []string{"s3"}, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1)) + manager.RegisterPod(podWithConfigMaps("ns1", "name2", s1)) + s2 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s4"}}, + {envVarNames: []string{"s5"}, envFromNames: []string{"s50"}}, + }, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name2", s2)) + manager.RegisterPod(podWithConfigMaps("ns1", "name3", s2)) + manager.RegisterPod(podWithConfigMaps("ns1", "name4", s2)) + manager.UnregisterPod(podWithConfigMaps("ns1", "name3", s2)) + s3 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s3"}, envFromNames: []string{"s30"}}, + {envVarNames: []string{"s5"}}, + }, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name5", s3)) + manager.RegisterPod(podWithConfigMaps("ns1", "name6", s3)) + s4 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s6"}}, + {envFromNames: []string{"s60"}}, + }, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name7", s4)) + manager.UnregisterPod(podWithConfigMaps("ns1", "name7", s4)) + + // Also check the Add + Update + Remove scenario. + manager.RegisterPod(podWithConfigMaps("ns1", "other-name", s1)) + manager.RegisterPod(podWithConfigMaps("ns1", "other-name", s2)) + manager.UnregisterPod(podWithConfigMaps("ns1", "other-name", s2)) + + refs := func(ns, name string) int { + store.lock.Lock() + defer store.lock.Unlock() + item, ok := store.items[objectKey{ns, name}] + if !ok { + return 0 + } + return item.refCount + } + assert.Equal(t, refs("ns1", "s1"), 1) + assert.Equal(t, refs("ns1", "s10"), 1) + assert.Equal(t, refs("ns1", "s2"), 1) + assert.Equal(t, refs("ns1", "s3"), 3) + assert.Equal(t, refs("ns1", "s30"), 2) + assert.Equal(t, refs("ns1", "s4"), 2) + assert.Equal(t, refs("ns1", "s5"), 4) + assert.Equal(t, refs("ns1", "s50"), 2) + assert.Equal(t, refs("ns1", "s6"), 0) + assert.Equal(t, refs("ns1", "s60"), 0) + assert.Equal(t, refs("ns1", "s7"), 0) +} + +func TestCachingConfigMapManager(t *testing.T) { + fakeClient := &fake.Clientset{} + configMapStore := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) + manager := &cachingConfigMapManager{ + configMapStore: configMapStore, + registeredPods: make(map[objectKey]*v1.Pod), + } + + // Create a pod with some configMaps. + s1 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s1"}}, + {envFromNames: []string{"s20"}}, + }, + volumes: []string{"s2"}, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1)) + manager.RegisterPod(podWithConfigMaps("ns2", "name2", s1)) + // Update the pod with a different configMaps. + s2 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s3"}}, + {envVarNames: []string{"s4"}}, + {envFromNames: []string{"s40"}}, + }, + } + // Create another pod, but with same configMaps in different namespace. + manager.RegisterPod(podWithConfigMaps("ns2", "name2", s2)) + // Create and delete a pod with some other configMaps. + s3 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s6"}}, + {envFromNames: []string{"s60"}}, + }, + } + manager.RegisterPod(podWithConfigMaps("ns3", "name", s3)) + manager.UnregisterPod(podWithConfigMaps("ns3", "name", s3)) + + existingMaps := map[string][]string{ + "ns1": {"s1", "s2", "s20"}, + "ns2": {"s3", "s4", "s40"}, + } + shouldExist := func(ns, configMap string) bool { + if cmaps, ok := existingMaps[ns]; ok { + for _, cm := range cmaps { + if cm == configMap { + return true + } + } + } + return false + } + + for _, ns := range []string{"ns1", "ns2", "ns3"} { + for _, configMap := range []string{"s1", "s2", "s3", "s4", "s5", "s6", "s20", "s40", "s50"} { + checkConfigMap(t, configMapStore, ns, configMap, shouldExist(ns, configMap)) + } + } +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 637af615847..12143e46d4b 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -490,7 +490,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode)) klet.secretManager = secretManager - configMapManager := configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient) + configMapManager := configmap.NewCachingConfigMapManager( + kubeDeps.KubeClient, configmap.GetObjectTTLFromNodeFunc(klet.GetNode)) klet.configMapManager = configMapManager if klet.experimentalHostUserNamespaceDefaulting {