diff --git a/pkg/api/v1/pod/util.go b/pkg/api/v1/pod/util.go index 3aadefdb546..9734ad98f7d 100644 --- a/pkg/api/v1/pod/util.go +++ b/pkg/api/v1/pod/util.go @@ -197,6 +197,60 @@ func visitContainerSecretNames(container *v1.Container, visitor Visitor) bool { return true } +// VisitPodConfigmapNames invokes the visitor function with the name of every configmap +// referenced by the pod spec. If visitor returns false, visiting is short-circuited. +// Transitive references (e.g. pod -> pvc -> pv -> secret) are not visited. +// Returns true if visiting completed, false if visiting was short-circuited. +func VisitPodConfigmapNames(pod *v1.Pod, visitor Visitor) bool { + for i := range pod.Spec.InitContainers { + if !visitContainerConfigmapNames(&pod.Spec.InitContainers[i], visitor) { + return false + } + } + for i := range pod.Spec.Containers { + if !visitContainerConfigmapNames(&pod.Spec.Containers[i], visitor) { + return false + } + } + var source *v1.VolumeSource + for i := range pod.Spec.Volumes { + source = &pod.Spec.Volumes[i].VolumeSource + switch { + case source.Projected != nil: + for j := range source.Projected.Sources { + if source.Projected.Sources[j].ConfigMap != nil { + if !visitor(source.Projected.Sources[j].ConfigMap.Name) { + return false + } + } + } + case source.ConfigMap != nil: + if !visitor(source.ConfigMap.Name) { + return false + } + } + } + return true +} + +func visitContainerConfigmapNames(container *v1.Container, visitor Visitor) bool { + for _, env := range container.EnvFrom { + if env.ConfigMapRef != nil { + if !visitor(env.ConfigMapRef.Name) { + return false + } + } + } + for _, envVar := range container.Env { + if envVar.ValueFrom != nil && envVar.ValueFrom.ConfigMapKeyRef != nil { + if !visitor(envVar.ValueFrom.ConfigMapKeyRef.Name) { + return false + } + } + } + return true +} + // GetContainerStatus extracts the status of container "name" from "statuses". // It also returns if "name" exists. func GetContainerStatus(statuses []v1.ContainerStatus, name string) (v1.ContainerStatus, bool) { diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index fd49f0f470e..8db08392418 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -552,6 +552,12 @@ func (adc *attachDetachController) GetSecretFunc() func(namespace, name string) } } +func (adc *attachDetachController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) { + return func(_, _ string) (*v1.ConfigMap, error) { + return nil, fmt.Errorf("GetConfigMap unsupported in attachDetachController") + } +} + func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) { if _, exists := node.Annotations[volumehelper.ControllerManagedAttachAnnotation]; exists { keepTerminatedPodVolumes := false diff --git a/pkg/controller/volume/persistentvolume/volume_host.go b/pkg/controller/volume/persistentvolume/volume_host.go index 111c2c0d8d5..f545f546391 100644 --- a/pkg/controller/volume/persistentvolume/volume_host.go +++ b/pkg/controller/volume/persistentvolume/volume_host.go @@ -87,6 +87,12 @@ func (adc *PersistentVolumeController) GetSecretFunc() func(namespace, name stri } } +func (adc *PersistentVolumeController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) { + return func(_, _ string) (*v1.ConfigMap, error) { + return nil, fmt.Errorf("GetConfigMap unsupported in PersistentVolumeController") + } +} + func (ctrl *PersistentVolumeController) GetNodeLabels() (map[string]string, error) { return nil, fmt.Errorf("GetNodeLabels() unsupported in PersistentVolumeController") } diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 1ae7c962341..1c732d19e09 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -57,6 +57,7 @@ go_library( "//pkg/kubelet/certificate:go_default_library", "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/config:go_default_library", + "//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/dockershim:go_default_library", "//pkg/kubelet/dockershim/libdocker:go_default_library", @@ -174,6 +175,7 @@ go_test( "//pkg/kubelet/cadvisor/testing:go_default_library", "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/config:go_default_library", + "//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/eviction:go_default_library", @@ -249,6 +251,7 @@ filegroup( "//pkg/kubelet/client:all-srcs", "//pkg/kubelet/cm:all-srcs", "//pkg/kubelet/config:all-srcs", + "//pkg/kubelet/configmap:all-srcs", "//pkg/kubelet/container:all-srcs", "//pkg/kubelet/custommetrics:all-srcs", "//pkg/kubelet/dockershim:all-srcs", diff --git a/pkg/kubelet/configmap/BUILD b/pkg/kubelet/configmap/BUILD new file mode 100644 index 00000000000..df03366e2ae --- /dev/null +++ b/pkg/kubelet/configmap/BUILD @@ -0,0 +1,59 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = [ + "configmap_manager.go", + "fake_manager.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/api/v1/pod:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/kubelet/util:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) + +go_test( + name = "go_default_test", + srcs = ["configmap_manager_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + ], +) diff --git a/pkg/kubelet/configmap/configmap_manager.go b/pkg/kubelet/configmap/configmap_manager.go new file mode 100644 index 00000000000..88ac32624e8 --- /dev/null +++ b/pkg/kubelet/configmap/configmap_manager.go @@ -0,0 +1,302 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "fmt" + "strconv" + "sync" + "time" + + storageetcd "k8s.io/apiserver/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/api/v1" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/kubelet/util" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/sets" +) + +const ( + defaultTTL = time.Minute +) + +type Manager interface { + // Get configmap by configmap namespace and name. + GetConfigMap(namespace, name string) (*v1.ConfigMap, error) + + // WARNING: Register/UnregisterPod functions should be efficient, + // i.e. should not block on network operations. + + // RegisterPod registers all configmaps from a given pod. + RegisterPod(pod *v1.Pod) + + // UnregisterPod unregisters configmaps from a given pod that are not + // used by any other registered pod. + UnregisterPod(pod *v1.Pod) +} + +// simpleConfigMapManager implements ConfigMap Manager interface with +// simple operations to apiserver. +type simpleConfigMapManager struct { + kubeClient clientset.Interface +} + +func NewSimpleConfigMapManager(kubeClient clientset.Interface) Manager { + return &simpleConfigMapManager{kubeClient: kubeClient} +} + +func (s *simpleConfigMapManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) { + return s.kubeClient.Core().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) +} + +func (s *simpleConfigMapManager) RegisterPod(pod *v1.Pod) { +} + +func (s *simpleConfigMapManager) UnregisterPod(pod *v1.Pod) { +} + +type GetObjectTTLFunc func() (time.Duration, bool) + +type objectKey struct { + namespace string + name string +} + +// configMapStoreItems is a single item stored in configMapStore. +type configMapStoreItem struct { + refCount int + configMap *configMapData +} + +type configMapData struct { + sync.Mutex + + configMap *v1.ConfigMap + err error + lastUpdateTime time.Time +} + +// configMapStore is a local cache of configmaps. +type configMapStore struct { + kubeClient clientset.Interface + clock clock.Clock + + lock sync.Mutex + items map[objectKey]*configMapStoreItem + + defaultTTL time.Duration + getTTL GetObjectTTLFunc +} + +func newConfigMapStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *configMapStore { + return &configMapStore{ + kubeClient: kubeClient, + clock: clock, + items: make(map[objectKey]*configMapStoreItem), + defaultTTL: ttl, + getTTL: getTTL, + } +} + +func isConfigMapOlder(newConfigMap, oldConfigMap *v1.ConfigMap) bool { + if newConfigMap == nil || oldConfigMap == nil { + return false + } + newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newConfigMap) + oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldConfigMap) + return newVersion < oldVersion +} + +func (s *configMapStore) Add(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + // Add is called from RegisterPod, thus it needs to be efficient. + // Thus Add() is only increasing refCount and generation of a given configmap. + // Then Get() is responsible for fetching if needed. + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + item = &configMapStoreItem{ + refCount: 0, + configMap: &configMapData{}, + } + s.items[key] = item + } + + item.refCount++ + // This will trigger fetch on the next Get() operation. + item.configMap = nil +} + +func (s *configMapStore) Delete(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + s.lock.Lock() + defer s.lock.Unlock() + if item, ok := s.items[key]; ok { + item.refCount-- + if item.refCount == 0 { + delete(s.items, key) + } + } +} + +func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc { + return func() (time.Duration, bool) { + node, err := getNode() + if err != nil { + return time.Duration(0), false + } + if node != nil && node.Annotations != nil { + if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok { + if intValue, err := strconv.Atoi(value); err == nil { + return time.Duration(intValue) * time.Second, true + } + } + } + return time.Duration(0), false + } +} + +func (s *configMapStore) isConfigMapFresh(data *configMapData) bool { + configMapTTL := s.defaultTTL + if ttl, ok := s.getTTL(); ok { + configMapTTL = ttl + } + return s.clock.Now().Before(data.lastUpdateTime.Add(configMapTTL)) +} + +func (s *configMapStore) Get(namespace, name string) (*v1.ConfigMap, error) { + key := objectKey{namespace: namespace, name: name} + + data := func() *configMapData { + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + return nil + } + if item.configMap == nil { + item.configMap = &configMapData{} + } + return item.configMap + }() + if data == nil { + return nil, fmt.Errorf("configmap %q/%q not registered", namespace, name) + } + + // After updating data in configMapStore, lock the data, fetch configMap if + // needed and return data. + data.Lock() + defer data.Unlock() + if data.err != nil || !s.isConfigMapFresh(data) { + opts := metav1.GetOptions{} + if data.configMap != nil && data.err == nil { + // This is just a periodic refresh of a configmap we successfully fetched previously. + // In this case, server data from apiserver cache to reduce the load on both + // etcd and apiserver (the cache is eventually consistent). + util.FromApiserverCache(&opts) + } + configMap, err := s.kubeClient.Core().ConfigMaps(namespace).Get(name, opts) + if err != nil && !apierrors.IsNotFound(err) && data.configMap == nil && data.err == nil { + // Couldn't fetch the latest configmap, but there is no cached data to return. + // Return the fetch result instead. + return configMap, err + } + if (err == nil && !isConfigMapOlder(configMap, data.configMap)) || apierrors.IsNotFound(err) { + // If the fetch succeeded with a newer version of the configmap, or if the + // configmap could not be found in the apiserver, update the cached data to + // reflect the current status. + data.configMap = configMap + data.err = err + data.lastUpdateTime = s.clock.Now() + } + } + return data.configMap, data.err +} + +// cachingConfigMapManager keeps a cache of all configmaps necessary for registered pods. +// It implements the following logic: +// - whenever a pod is created or updated, the cached versions of all its configmaps +// are invalidated +// - every GetConfigMap() call tries to fetch the value from local cache; if it is +// not there, invalidated or too old, we fetch it from apiserver and refresh the +// value in cache; otherwise it is just fetched from cache +type cachingConfigMapManager struct { + configMapStore *configMapStore + + lock sync.Mutex + registeredPods map[objectKey]*v1.Pod +} + +func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager { + csm := &cachingConfigMapManager{ + configMapStore: newConfigMapStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL), + registeredPods: make(map[objectKey]*v1.Pod), + } + return csm +} + +func (c *cachingConfigMapManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) { + return c.configMapStore.Get(namespace, name) +} + +func getConfigMapNames(pod *v1.Pod) sets.String { + result := sets.NewString() + podutil.VisitPodConfigmapNames(pod, func(name string) bool { + result.Insert(name) + return true + }) + return result +} + +func (c *cachingConfigMapManager) RegisterPod(pod *v1.Pod) { + names := getConfigMapNames(pod) + c.lock.Lock() + defer c.lock.Unlock() + for name := range names { + c.configMapStore.Add(pod.Namespace, name) + } + var prev *v1.Pod + key := objectKey{namespace: pod.Namespace, name: pod.Name} + prev = c.registeredPods[key] + c.registeredPods[key] = pod + if prev != nil { + for name := range getConfigMapNames(prev) { + c.configMapStore.Delete(prev.Namespace, name) + } + } +} + +func (c *cachingConfigMapManager) UnregisterPod(pod *v1.Pod) { + var prev *v1.Pod + key := objectKey{namespace: pod.Namespace, name: pod.Name} + c.lock.Lock() + defer c.lock.Unlock() + prev = c.registeredPods[key] + delete(c.registeredPods, key) + if prev != nil { + for name := range getConfigMapNames(prev) { + c.configMapStore.Delete(prev.Namespace, name) + } + } +} diff --git a/pkg/kubelet/configmap/configmap_manager_test.go b/pkg/kubelet/configmap/configmap_manager_test.go new file mode 100644 index 00000000000..b2a36c516ef --- /dev/null +++ b/pkg/kubelet/configmap/configmap_manager_test.go @@ -0,0 +1,537 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "fmt" + "reflect" + "strings" + "sync" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + core "k8s.io/client-go/testing" + + "github.com/stretchr/testify/assert" +) + +func checkConfigMap(t *testing.T, store *configMapStore, ns, name string, shouldExist bool) { + _, err := store.Get(ns, name) + if shouldExist && err != nil { + t.Errorf("unexpected actions: %#v", err) + } + if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("configmap %q/%q not registered", ns, name))) { + t.Errorf("unexpected actions: %#v", err) + } +} + +func noObjectTTL() (time.Duration, bool) { + return time.Duration(0), false +} + +func TestConfigMapStore(t *testing.T) { + fakeClient := &fake.Clientset{} + store := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) + store.Add("ns1", "name1") + store.Add("ns2", "name2") + store.Add("ns1", "name1") + store.Add("ns1", "name1") + store.Delete("ns1", "name1") + store.Delete("ns2", "name2") + store.Add("ns3", "name3") + + // Adds don't issue Get requests. + actions := fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Should issue Get request + store.Get("ns1", "name1") + // Shouldn't issue Get request, as configMap is not registered + store.Get("ns2", "name2") + // Should issue Get request + store.Get("ns3", "name3") + + actions = fakeClient.Actions() + assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) + + for _, a := range actions { + assert.True(t, a.Matches("get", "configmaps"), "unexpected actions: %#v", a) + } + + checkConfigMap(t, store, "ns1", "name1", true) + checkConfigMap(t, store, "ns2", "name2", false) + checkConfigMap(t, store, "ns3", "name3", true) + checkConfigMap(t, store, "ns4", "name4", false) +} + +func TestConfigMapStoreDeletingConfigMap(t *testing.T) { + fakeClient := &fake.Clientset{} + store := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) + store.Add("ns", "name") + + result := &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name", ResourceVersion: "10"}} + fakeClient.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { + return true, result, nil + }) + configMap, err := store.Get("ns", "name") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(configMap, result) { + t.Errorf("Unexpected configMap: %v", configMap) + } + + fakeClient.PrependReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { + return true, &v1.ConfigMap{}, apierrors.NewNotFound(v1.Resource("configMap"), "name") + }) + configMap, err = store.Get("ns", "name") + if err == nil || !apierrors.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(configMap, &v1.ConfigMap{}) { + t.Errorf("Unexpected configMap: %v", configMap) + } +} + +func TestConfigMapStoreGetAlwaysRefresh(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, 0) + + for i := 0; i < 10; i++ { + store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + } + fakeClient.ClearActions() + + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) + wg.Done() + }(i) + } + wg.Wait() + actions := fakeClient.Actions() + assert.Equal(t, 100, len(actions), "unexpected actions: %#v", actions) + + for _, a := range actions { + assert.True(t, a.Matches("get", "configmaps"), "unexpected actions: %#v", a) + } +} + +func TestConfigMapStoreGetNeverRefresh(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute) + + for i := 0; i < 10; i++ { + store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + } + fakeClient.ClearActions() + + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) + wg.Done() + }(i) + } + wg.Wait() + actions := fakeClient.Actions() + // Only first Get, should forward the Get request. + assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions) +} + +func TestCustomTTL(t *testing.T) { + ttl := time.Duration(0) + ttlExists := false + customTTL := func() (time.Duration, bool) { + return ttl, ttlExists + } + + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Time{}) + store := newConfigMapStore(fakeClient, fakeClock, customTTL, time.Minute) + + store.Add("ns", "name") + store.Get("ns", "name") + fakeClient.ClearActions() + + // Set 0-ttl and see if that works. + ttl = time.Duration(0) + ttlExists = true + store.Get("ns", "name") + actions := fakeClient.Actions() + assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Set 5-minute ttl and see if this works. + ttl = time.Duration(5) * time.Minute + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Still no effect after 4 minutes. + fakeClock.Step(4 * time.Minute) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Now it should have an effect. + fakeClock.Step(time.Minute) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Now remove the custom ttl and see if that works. + ttlExists = false + fakeClock.Step(55 * time.Second) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Pass the minute and it should be triggered now. + fakeClock.Step(5 * time.Second) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) +} + +func TestParseNodeAnnotation(t *testing.T) { + testCases := []struct { + node *v1.Node + err error + exists bool + ttl time.Duration + }{ + { + node: nil, + err: fmt.Errorf("error"), + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + }, + }, + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{}, + }, + }, + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "bad"}, + }, + }, + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "0"}, + }, + }, + exists: true, + ttl: time.Duration(0), + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "60"}, + }, + }, + exists: true, + ttl: time.Minute, + }, + } + for i, testCase := range testCases { + getNode := func() (*v1.Node, error) { return testCase.node, testCase.err } + ttl, exists := GetObjectTTLFromNodeFunc(getNode)() + if exists != testCase.exists { + t.Errorf("%d: incorrect parsing: %t", i, exists) + continue + } + if exists && ttl != testCase.ttl { + t.Errorf("%d: incorrect ttl: %v", i, ttl) + } + } +} + +type envConfigMaps struct { + envVarNames []string + envFromNames []string +} + +type configMapsToAttach struct { + containerEnvConfigMaps []envConfigMaps + volumes []string +} + +func podWithConfigMaps(ns, name string, toAttach configMapsToAttach) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Spec: v1.PodSpec{}, + } + for i, configMaps := range toAttach.containerEnvConfigMaps { + container := v1.Container{ + Name: fmt.Sprintf("container-%d", i), + } + for _, name := range configMaps.envFromNames { + envFrom := v1.EnvFromSource{ + ConfigMapRef: &v1.ConfigMapEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: name, + }, + }, + } + container.EnvFrom = append(container.EnvFrom, envFrom) + } + + for _, name := range configMaps.envVarNames { + envSource := &v1.EnvVarSource{ + ConfigMapKeyRef: &v1.ConfigMapKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: name, + }, + }, + } + container.Env = append(container.Env, v1.EnvVar{ValueFrom: envSource}) + } + pod.Spec.Containers = append(pod.Spec.Containers, container) + } + for _, configMap := range toAttach.volumes { + volume := &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{Name: configMap}, + } + pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{ + Name: configMap, + VolumeSource: v1.VolumeSource{ + ConfigMap: volume, + }, + }) + } + return pod +} + +func TestCacheInvalidation(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute) + manager := &cachingConfigMapManager{ + configMapStore: store, + registeredPods: make(map[objectKey]*v1.Pod), + } + + // Create a pod with some configMaps. + s1 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s1"}, envFromNames: []string{"s10"}}, + {envVarNames: []string{"s2"}}, + }, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1)) + // Fetch both configMaps - this should triggger get operations. + store.Get("ns1", "s1") + store.Get("ns1", "s10") + store.Get("ns1", "s2") + actions := fakeClient.Actions() + assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Update a pod with a new configMap. + s2 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s1"}}, + {envVarNames: []string{"s2"}, envFromNames: []string{"s20"}}, + }, + volumes: []string{"s3"}, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name1", s2)) + // All configMaps should be invalidated - this should trigger get operations. + store.Get("ns1", "s1") + store.Get("ns1", "s2") + store.Get("ns1", "s20") + store.Get("ns1", "s3") + actions = fakeClient.Actions() + assert.Equal(t, 4, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Create a new pod that is refencing the first three configMaps - those should + // be invalidated. + manager.RegisterPod(podWithConfigMaps("ns1", "name2", s1)) + store.Get("ns1", "s1") + store.Get("ns1", "s10") + store.Get("ns1", "s2") + store.Get("ns1", "s20") + store.Get("ns1", "s3") + actions = fakeClient.Actions() + assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() +} + +func TestCacheRefcounts(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute) + manager := &cachingConfigMapManager{ + configMapStore: store, + registeredPods: make(map[objectKey]*v1.Pod), + } + + s1 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s1"}, envFromNames: []string{"s10"}}, + {envVarNames: []string{"s2"}}, + }, + volumes: []string{"s3"}, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1)) + manager.RegisterPod(podWithConfigMaps("ns1", "name2", s1)) + s2 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s4"}}, + {envVarNames: []string{"s5"}, envFromNames: []string{"s50"}}, + }, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name2", s2)) + manager.RegisterPod(podWithConfigMaps("ns1", "name3", s2)) + manager.RegisterPod(podWithConfigMaps("ns1", "name4", s2)) + manager.UnregisterPod(podWithConfigMaps("ns1", "name3", s2)) + s3 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s3"}, envFromNames: []string{"s30"}}, + {envVarNames: []string{"s5"}}, + }, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name5", s3)) + manager.RegisterPod(podWithConfigMaps("ns1", "name6", s3)) + s4 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s6"}}, + {envFromNames: []string{"s60"}}, + }, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name7", s4)) + manager.UnregisterPod(podWithConfigMaps("ns1", "name7", s4)) + + // Also check the Add + Update + Remove scenario. + manager.RegisterPod(podWithConfigMaps("ns1", "other-name", s1)) + manager.RegisterPod(podWithConfigMaps("ns1", "other-name", s2)) + manager.UnregisterPod(podWithConfigMaps("ns1", "other-name", s2)) + + refs := func(ns, name string) int { + store.lock.Lock() + defer store.lock.Unlock() + item, ok := store.items[objectKey{ns, name}] + if !ok { + return 0 + } + return item.refCount + } + assert.Equal(t, refs("ns1", "s1"), 1) + assert.Equal(t, refs("ns1", "s10"), 1) + assert.Equal(t, refs("ns1", "s2"), 1) + assert.Equal(t, refs("ns1", "s3"), 3) + assert.Equal(t, refs("ns1", "s30"), 2) + assert.Equal(t, refs("ns1", "s4"), 2) + assert.Equal(t, refs("ns1", "s5"), 4) + assert.Equal(t, refs("ns1", "s50"), 2) + assert.Equal(t, refs("ns1", "s6"), 0) + assert.Equal(t, refs("ns1", "s60"), 0) + assert.Equal(t, refs("ns1", "s7"), 0) +} + +func TestCachingConfigMapManager(t *testing.T) { + fakeClient := &fake.Clientset{} + configMapStore := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) + manager := &cachingConfigMapManager{ + configMapStore: configMapStore, + registeredPods: make(map[objectKey]*v1.Pod), + } + + // Create a pod with some configMaps. + s1 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s1"}}, + {envFromNames: []string{"s20"}}, + }, + volumes: []string{"s2"}, + } + manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1)) + manager.RegisterPod(podWithConfigMaps("ns2", "name2", s1)) + // Update the pod with a different configMaps. + s2 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s3"}}, + {envVarNames: []string{"s4"}}, + {envFromNames: []string{"s40"}}, + }, + } + // Create another pod, but with same configMaps in different namespace. + manager.RegisterPod(podWithConfigMaps("ns2", "name2", s2)) + // Create and delete a pod with some other configMaps. + s3 := configMapsToAttach{ + containerEnvConfigMaps: []envConfigMaps{ + {envVarNames: []string{"s6"}}, + {envFromNames: []string{"s60"}}, + }, + } + manager.RegisterPod(podWithConfigMaps("ns3", "name", s3)) + manager.UnregisterPod(podWithConfigMaps("ns3", "name", s3)) + + existingMaps := map[string][]string{ + "ns1": {"s1", "s2", "s20"}, + "ns2": {"s3", "s4", "s40"}, + } + shouldExist := func(ns, configMap string) bool { + if cmaps, ok := existingMaps[ns]; ok { + for _, cm := range cmaps { + if cm == configMap { + return true + } + } + } + return false + } + + for _, ns := range []string{"ns1", "ns2", "ns3"} { + for _, configMap := range []string{"s1", "s2", "s3", "s4", "s5", "s6", "s20", "s40", "s50"} { + checkConfigMap(t, configMapStore, ns, configMap, shouldExist(ns, configMap)) + } + } +} diff --git a/pkg/kubelet/configmap/fake_manager.go b/pkg/kubelet/configmap/fake_manager.go new file mode 100644 index 00000000000..d1a4b85a6f9 --- /dev/null +++ b/pkg/kubelet/configmap/fake_manager.go @@ -0,0 +1,40 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "k8s.io/kubernetes/pkg/api/v1" +) + +// fakeManager implements Manager interface for testing purposes. +// simple operations to apiserver. +type fakeManager struct { +} + +func NewFakeManager() Manager { + return &fakeManager{} +} + +func (s *fakeManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) { + return nil, nil +} + +func (s *fakeManager) RegisterPod(pod *v1.Pod) { +} + +func (s *fakeManager) UnregisterPod(pod *v1.Pod) { +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c87641c2c46..12143e46d4b 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -68,6 +68,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/certificate" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/config" + "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockershim" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" @@ -487,9 +488,12 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub secretManager := secret.NewCachingSecretManager( kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode)) - klet.secretManager = secretManager + configMapManager := configmap.NewCachingConfigMapManager( + kubeDeps.KubeClient, configmap.GetObjectTTLFromNodeFunc(klet.GetNode)) + klet.configMapManager = configMapManager + if klet.experimentalHostUserNamespaceDefaulting { glog.Infof("Experimental host user namespace defaulting is enabled.") } @@ -518,8 +522,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub klet.livenessManager = proberesults.NewManager() klet.podCache = kubecontainer.NewCache() - // podManager is also responsible for keeping secretManager contents up-to-date. - klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager) + // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date. + klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager) if kubeCfg.RemoteRuntimeEndpoint != "" { // kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified @@ -717,7 +721,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub kubeDeps.Recorder) klet.volumePluginMgr, err = - NewInitializedVolumePluginMgr(klet, secretManager, kubeDeps.VolumePlugins) + NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, kubeDeps.VolumePlugins) if err != nil { return nil, err } @@ -917,6 +921,9 @@ type Kubelet struct { // Secret manager. secretManager secret.Manager + // ConfigMap manager. + configMapManager configmap.Manager + // Cached MachineInfo returned by cadvisor. machineInfo *cadvisorapi.MachineInfo diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index b26449e954f..e4c1aa57380 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -446,7 +446,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container return result, fmt.Errorf("Couldn't get configMap %v/%v, no kubeClient defined", pod.Namespace, name) } optional := cm.Optional != nil && *cm.Optional - configMap, err = kl.kubeClient.Core().ConfigMaps(pod.Namespace).Get(name, metav1.GetOptions{}) + configMap, err = kl.configMapManager.GetConfigMap(pod.Namespace, name) if err != nil { if errors.IsNotFound(err) && optional { // ignore error when marked optional @@ -554,7 +554,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container if kl.kubeClient == nil { return result, fmt.Errorf("Couldn't get configMap %v/%v, no kubeClient defined", pod.Namespace, name) } - configMap, err = kl.kubeClient.Core().ConfigMaps(pod.Namespace).Get(name, metav1.GetOptions{}) + configMap, err = kl.configMapManager.GetConfigMap(pod.Namespace, name) if err != nil { if errors.IsNotFound(err) && optional { // ignore error when marked optional diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 00e3a9340ca..a2c63c30d8a 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -46,6 +46,7 @@ import ( cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/config" + "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/eviction" @@ -208,7 +209,9 @@ func newTestKubeletWithImageList( fakeMirrorClient := podtest.NewFakeMirrorClient() secretManager := secret.NewSimpleSecretManager(kubelet.kubeClient) kubelet.secretManager = secretManager - kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager) + configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient) + kubelet.configMapManager = configMapManager + kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager) kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}) diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{}) if err != nil { @@ -276,7 +279,7 @@ func newTestKubeletWithImageList( plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} kubelet.volumePluginMgr, err = - NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, []volume.VolumePlugin{plug}) + NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, []volume.VolumePlugin{plug}) require.NoError(t, err, "Failed to initialize VolumePluginMgr") kubelet.mounter = &mount.FakeMounter{} diff --git a/pkg/kubelet/pod/BUILD b/pkg/kubelet/pod/BUILD index 999d6dfeada..48e9e30b827 100644 --- a/pkg/kubelet/pod/BUILD +++ b/pkg/kubelet/pod/BUILD @@ -18,6 +18,7 @@ go_library( deps = [ "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/types:go_default_library", @@ -38,6 +39,7 @@ go_test( tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", + "//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", "//pkg/kubelet/secret:go_default_library", diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index dd069b0fa3e..f2a3f94470b 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/secret" ) @@ -113,17 +114,19 @@ type basicManager struct { // Mirror pod UID to pod UID map. translationByUID map[types.UID]types.UID - // basicManager is keeping secretManager up-to-date. - secretManager secret.Manager + // basicManager is keeping secretManager and configMapManager up-to-date. + secretManager secret.Manager + configMapManager configmap.Manager // A mirror pod client to create/delete mirror pods. MirrorClient } // NewBasicPodManager returns a functional Manager. -func NewBasicPodManager(client MirrorClient, secretManager secret.Manager) Manager { +func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager) Manager { pm := &basicManager{} pm.secretManager = secretManager + pm.configMapManager = configMapManager pm.MirrorClient = client pm.SetPods(nil) return pm @@ -163,6 +166,11 @@ func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { // not register pod, as it doesn't really matter. pm.secretManager.RegisterPod(pod) } + if pm.configMapManager != nil { + // TODO: Consider detecting only status update and in such case do + // not register pod, as it doesn't really matter. + pm.configMapManager.RegisterPod(pod) + } podFullName := kubecontainer.GetPodFullName(pod) if IsMirrorPod(pod) { pm.mirrorPodByUID[pod.UID] = pod @@ -186,6 +194,9 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) { if pm.secretManager != nil { pm.secretManager.UnregisterPod(pod) } + if pm.configMapManager != nil { + pm.configMapManager.UnregisterPod(pod) + } podFullName := kubecontainer.GetPodFullName(pod) if IsMirrorPod(pod) { delete(pm.mirrorPodByUID, pod.UID) diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 06dab024b5d..cb17725d528 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -23,6 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/kubelet/configmap" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" "k8s.io/kubernetes/pkg/kubelet/secret" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -32,7 +33,8 @@ import ( func newTestManager() (*basicManager, *podtest.FakeMirrorClient) { fakeMirrorClient := podtest.NewFakeMirrorClient() secretManager := secret.NewFakeManager() - manager := NewBasicPodManager(fakeMirrorClient, secretManager).(*basicManager) + configMapManager := configmap.NewFakeManager() + manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager).(*basicManager) return manager, fakeMirrorClient } diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go index 409dbdf4236..0dc4e70bf05 100644 --- a/pkg/kubelet/prober/common_test.go +++ b/pkg/kubelet/prober/common_test.go @@ -99,7 +99,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) { func newTestManager() *manager { refManager := kubecontainer.NewRefManager() refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings. - podManager := kubepod.NewBasicPodManager(nil, nil) + podManager := kubepod.NewBasicPodManager(nil, nil, nil) // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. podManager.AddPod(getTestPod()) m := NewManager( diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index da4a523a1dc..ccb21556fab 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -118,7 +118,7 @@ func TestDoProbe(t *testing.T) { } // Clean up. - m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil), &statustest.FakePodDeletionSafetyProvider{}) + m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{}) resultsManager(m, probeType).Remove(testContainerID) } } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 99d932b7ac8..d23a79f3a7a 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" + "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/eviction" @@ -63,8 +64,9 @@ func TestRunOnce(t *testing.T) { Capacity: 10 * mb, }, nil) fakeSecretManager := secret.NewFakeManager() + fakeConfigMapManager := configmap.NewFakeManager() podManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient(), fakeSecretManager) + podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{}) fakeRuntime := &containertest.FakeRuntime{} basePath, err := utiltesting.MkTmpdir("kubelet") @@ -93,7 +95,7 @@ func TestRunOnce(t *testing.T) { plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} kb.volumePluginMgr, err = - NewInitializedVolumePluginMgr(kb, fakeSecretManager, []volume.VolumePlugin{plug}) + NewInitializedVolumePluginMgr(kb, fakeSecretManager, fakeConfigMapManager, []volume.VolumePlugin{plug}) if err != nil { t.Fatalf("failed to initialize VolumePluginMgr: %v", err) } diff --git a/pkg/kubelet/status/BUILD b/pkg/kubelet/status/BUILD index 20ef87777a1..0c2a9a01734 100644 --- a/pkg/kubelet/status/BUILD +++ b/pkg/kubelet/status/BUILD @@ -48,6 +48,7 @@ go_test( "//pkg/api/v1/pod:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index b51d1ced28d..3c1bc7f91d3 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -36,6 +36,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + kubeconfigmap "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" @@ -74,7 +75,7 @@ func (m *manager) testSyncBatch() { } func newTestManager(kubeClient clientset.Interface) *manager { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager()) podManager.AddPod(getTestPod()) return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager) } diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index 87d625f33a3..24f3acf8513 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/kubelet/configmap" "k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/mount" @@ -39,11 +40,13 @@ import ( func NewInitializedVolumePluginMgr( kubelet *Kubelet, secretManager secret.Manager, + configMapManager configmap.Manager, plugins []volume.VolumePlugin) (*volume.VolumePluginMgr, error) { kvh := &kubeletVolumeHost{ - kubelet: kubelet, - volumePluginMgr: volume.VolumePluginMgr{}, - secretManager: secretManager, + kubelet: kubelet, + volumePluginMgr: volume.VolumePluginMgr{}, + secretManager: secretManager, + configMapManager: configMapManager, } if err := kvh.volumePluginMgr.InitPlugins(plugins, kvh); err != nil { @@ -63,9 +66,10 @@ func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string { } type kubeletVolumeHost struct { - kubelet *Kubelet - volumePluginMgr volume.VolumePluginMgr - secretManager secret.Manager + kubelet *Kubelet + volumePluginMgr volume.VolumePluginMgr + secretManager secret.Manager + configMapManager configmap.Manager } func (kvh *kubeletVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string { @@ -141,6 +145,10 @@ func (kvh *kubeletVolumeHost) GetSecretFunc() func(namespace, name string) (*v1. return kvh.secretManager.GetSecret } +func (kvh *kubeletVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) { + return kvh.configMapManager.GetConfigMap +} + func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) { node, err := kvh.kubelet.GetNode() if err != nil { diff --git a/pkg/kubelet/volumemanager/BUILD b/pkg/kubelet/volumemanager/BUILD index 855089f3a87..46ee7c04cce 100644 --- a/pkg/kubelet/volumemanager/BUILD +++ b/pkg/kubelet/volumemanager/BUILD @@ -47,6 +47,7 @@ go_test( "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", "//pkg/kubelet/config:go_default_library", + "//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", diff --git a/pkg/kubelet/volumemanager/populator/BUILD b/pkg/kubelet/volumemanager/populator/BUILD index e4596291bd8..b1b85188e05 100644 --- a/pkg/kubelet/volumemanager/populator/BUILD +++ b/pkg/kubelet/volumemanager/populator/BUILD @@ -53,6 +53,7 @@ go_test( deps = [ "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index 66d74ed0e71..79d6da5ae76 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -23,6 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + "k8s.io/kubernetes/pkg/kubelet/configmap" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" @@ -40,8 +41,9 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) { fakeClient := &fake.Clientset{} fakeSecretManager := secret.NewFakeManager() + fakeConfigMapManager := configmap.NewFakeManager() fakePodManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient(), fakeSecretManager) + podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr) fakeRuntime := &containertest.FakeRuntime{} diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 0f523c90593..4794ceaa7d4 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" "k8s.io/kubernetes/pkg/kubelet/config" + "k8s.io/kubernetes/pkg/kubelet/configmap" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/pod" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" @@ -56,7 +57,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) node, pod, pv, claim := createObjects() kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) @@ -101,7 +102,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) node, pod, _, claim := createObjects() diff --git a/pkg/volume/configmap/configmap.go b/pkg/volume/configmap/configmap.go index 1134a068b6a..4e869293bd7 100644 --- a/pkg/volume/configmap/configmap.go +++ b/pkg/volume/configmap/configmap.go @@ -42,13 +42,15 @@ const ( // configMapPlugin implements the VolumePlugin interface. type configMapPlugin struct { - host volume.VolumeHost + host volume.VolumeHost + getConfigMap func(namespace, name string) (*v1.ConfigMap, error) } var _ volume.VolumePlugin = &configMapPlugin{} func (plugin *configMapPlugin) Init(host volume.VolumeHost) error { plugin.host = host + plugin.getConfigMap = host.GetConfigMapFunc() return nil } @@ -86,14 +88,32 @@ func (plugin *configMapPlugin) SupportsBulkVolumeVerification() bool { func (plugin *configMapPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) { return &configMapVolumeMounter{ - configMapVolume: &configMapVolume{spec.Name(), pod.UID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}}, - source: *spec.Volume.ConfigMap, - pod: *pod, - opts: &opts}, nil + configMapVolume: &configMapVolume{ + spec.Name(), + pod.UID, + plugin, + plugin.host.GetMounter(), + plugin.host.GetWriter(), + volume.MetricsNil{}, + }, + source: *spec.Volume.ConfigMap, + pod: *pod, + opts: &opts, + getConfigMap: plugin.getConfigMap, + }, nil } func (plugin *configMapPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) { - return &configMapVolumeUnmounter{&configMapVolume{volName, podUID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}}}, nil + return &configMapVolumeUnmounter{ + &configMapVolume{ + volName, + podUID, + plugin, + plugin.host.GetMounter(), + plugin.host.GetWriter(), + volume.MetricsNil{}, + }, + }, nil } func (plugin *configMapPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { @@ -126,9 +146,10 @@ func (sv *configMapVolume) GetPath() string { type configMapVolumeMounter struct { *configMapVolume - source v1.ConfigMapVolumeSource - pod v1.Pod - opts *volume.VolumeOptions + source v1.ConfigMapVolumeSource + pod v1.Pod + opts *volume.VolumeOptions + getConfigMap func(namespace, name string) (*v1.ConfigMap, error) } var _ volume.Mounter = &configMapVolumeMounter{} @@ -174,13 +195,8 @@ func (b *configMapVolumeMounter) SetUpAt(dir string, fsGroup *types.UnixGroupID) return err } - kubeClient := b.plugin.host.GetKubeClient() - if kubeClient == nil { - return fmt.Errorf("Cannot setup configMap volume %v because kube client is not configured", b.volName) - } - optional := b.source.Optional != nil && *b.source.Optional - configMap, err := kubeClient.Core().ConfigMaps(b.pod.Namespace).Get(b.source.Name, metav1.GetOptions{}) + configMap, err := b.getConfigMap(b.pod.Namespace, b.source.Name) if err != nil { if !(errors.IsNotFound(err) && optional) { glog.Errorf("Couldn't get configMap %v/%v: %v", b.pod.Namespace, b.source.Name, err) diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index a6abfd3cd2d..41721d1eeaf 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -233,6 +233,9 @@ type VolumeHost interface { // Returns a function that returns a secret. GetSecretFunc() func(namespace, name string) (*v1.Secret, error) + // Returns a function that returns a configmap. + GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) + // Returns the labels on the node GetNodeLabels() (map[string]string, error) } diff --git a/pkg/volume/projected/projected.go b/pkg/volume/projected/projected.go index 310d8674361..390e6eea2d3 100644 --- a/pkg/volume/projected/projected.go +++ b/pkg/volume/projected/projected.go @@ -45,8 +45,9 @@ const ( ) type projectedPlugin struct { - host volume.VolumeHost - getSecret func(namespace, name string) (*v1.Secret, error) + host volume.VolumeHost + getSecret func(namespace, name string) (*v1.Secret, error) + getConfigMap func(namespace, name string) (*v1.ConfigMap, error) } var _ volume.VolumePlugin = &projectedPlugin{} @@ -68,6 +69,7 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string { func (plugin *projectedPlugin) Init(host volume.VolumeHost) error { plugin.host = host plugin.getSecret = host.GetSecretFunc() + plugin.getConfigMap = host.GetConfigMapFunc() return nil } @@ -235,10 +237,10 @@ func (s *projectedVolumeMounter) collectData() (map[string]volumeutil.FileProjec secretapi, err := s.plugin.getSecret(s.pod.Namespace, source.Secret.Name) if err != nil { if !(errors.IsNotFound(err) && optional) { - glog.Errorf("Couldn't get secret %v/%v", s.pod.Namespace, source.Secret.Name) + glog.Errorf("Couldn't get secret %v/%v: %v", s.pod.Namespace, source.Secret.Name, err) errlist = append(errlist, err) + continue } - secretapi = &v1.Secret{ ObjectMeta: metav1.ObjectMeta{ Namespace: s.pod.Namespace, @@ -248,17 +250,16 @@ func (s *projectedVolumeMounter) collectData() (map[string]volumeutil.FileProjec } secretPayload, err := secret.MakePayload(source.Secret.Items, secretapi, s.source.DefaultMode, optional) if err != nil { - glog.Errorf("Couldn't get secret %v/%v: %v", s.pod.Namespace, source.Secret.Name, err) + glog.Errorf("Couldn't get secret payload %v/%v: %v", s.pod.Namespace, source.Secret.Name, err) errlist = append(errlist, err) continue } - for k, v := range secretPayload { payload[k] = v } } else if source.ConfigMap != nil { optional := source.ConfigMap.Optional != nil && *source.ConfigMap.Optional - configMap, err := kubeClient.Core().ConfigMaps(s.pod.Namespace).Get(source.ConfigMap.Name, metav1.GetOptions{}) + configMap, err := s.plugin.getConfigMap(s.pod.Namespace, source.ConfigMap.Name) if err != nil { if !(errors.IsNotFound(err) && optional) { glog.Errorf("Couldn't get configMap %v/%v: %v", s.pod.Namespace, source.ConfigMap.Name, err) @@ -274,6 +275,7 @@ func (s *projectedVolumeMounter) collectData() (map[string]volumeutil.FileProjec } configMapPayload, err := configmap.MakePayload(source.ConfigMap.Items, configMap, s.source.DefaultMode, optional) if err != nil { + glog.Errorf("Couldn't get configMap payload %v/%v: %v", s.pod.Namespace, source.ConfigMap.Name, err) errlist = append(errlist, err) continue } diff --git a/pkg/volume/projected/projected_test.go b/pkg/volume/projected/projected_test.go index a8d1f4a8377..131ff907516 100644 --- a/pkg/volume/projected/projected_test.go +++ b/pkg/volume/projected/projected_test.go @@ -505,7 +505,8 @@ func TestCollectDataWithConfigMap(t *testing.T) { sources: source.Sources, podUID: pod.UID, plugin: &projectedPlugin{ - host: host, + host: host, + getConfigMap: host.GetConfigMapFunc(), }, }, source: *source, diff --git a/pkg/volume/secret/secret.go b/pkg/volume/secret/secret.go index ae638bb4f08..40af0d20280 100644 --- a/pkg/volume/secret/secret.go +++ b/pkg/volume/secret/secret.go @@ -198,7 +198,7 @@ func (b *secretVolumeMounter) SetUpAt(dir string, fsGroup *types.UnixGroupID) er secret, err := b.getSecret(b.pod.Namespace, b.source.SecretName) if err != nil { if !(errors.IsNotFound(err) && optional) { - glog.Errorf("Couldn't get secret %v/%v", b.pod.Namespace, b.source.SecretName) + glog.Errorf("Couldn't get secret %v/%v: %v", b.pod.Namespace, b.source.SecretName, err) return err } secret = &v1.Secret{ diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 2e489b7a4b8..e80a8a5271d 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -134,6 +134,12 @@ func (f *fakeVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secre } } +func (f *fakeVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) { + return func(namespace, name string) (*v1.ConfigMap, error) { + return f.kubeClient.Core().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + } +} + func (f *fakeVolumeHost) GetNodeLabels() (map[string]string, error) { return map[string]string{"test-label": "test-value"}, nil }