mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #46470 from shyamjvs/configmap-interface
Automatic merge from submit-queue Migrate kubelet to ConfigMapManager interface and use TTL-based caching manager Fixes #41379 Sometime ago we moved to a secret manager interface for kubelet to manage secrets. This PR's first commit moves config map management also to a similar interface. The second commit adds TTL-based CachingConfigMapManager (similar to CachingSecretManager) and makes kubelet use it. /cc @kubernetes/sig-node-pr-reviews @kubernetes/sig-scalability-misc @wojtek-t @dchen1107
This commit is contained in:
commit
c13d8917c2
@ -197,6 +197,60 @@ func visitContainerSecretNames(container *v1.Container, visitor Visitor) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// VisitPodConfigmapNames invokes the visitor function with the name of every configmap
|
||||
// referenced by the pod spec. If visitor returns false, visiting is short-circuited.
|
||||
// Transitive references (e.g. pod -> pvc -> pv -> secret) are not visited.
|
||||
// Returns true if visiting completed, false if visiting was short-circuited.
|
||||
func VisitPodConfigmapNames(pod *v1.Pod, visitor Visitor) bool {
|
||||
for i := range pod.Spec.InitContainers {
|
||||
if !visitContainerConfigmapNames(&pod.Spec.InitContainers[i], visitor) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
for i := range pod.Spec.Containers {
|
||||
if !visitContainerConfigmapNames(&pod.Spec.Containers[i], visitor) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
var source *v1.VolumeSource
|
||||
for i := range pod.Spec.Volumes {
|
||||
source = &pod.Spec.Volumes[i].VolumeSource
|
||||
switch {
|
||||
case source.Projected != nil:
|
||||
for j := range source.Projected.Sources {
|
||||
if source.Projected.Sources[j].ConfigMap != nil {
|
||||
if !visitor(source.Projected.Sources[j].ConfigMap.Name) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
case source.ConfigMap != nil:
|
||||
if !visitor(source.ConfigMap.Name) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func visitContainerConfigmapNames(container *v1.Container, visitor Visitor) bool {
|
||||
for _, env := range container.EnvFrom {
|
||||
if env.ConfigMapRef != nil {
|
||||
if !visitor(env.ConfigMapRef.Name) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, envVar := range container.Env {
|
||||
if envVar.ValueFrom != nil && envVar.ValueFrom.ConfigMapKeyRef != nil {
|
||||
if !visitor(envVar.ValueFrom.ConfigMapKeyRef.Name) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// GetContainerStatus extracts the status of container "name" from "statuses".
|
||||
// It also returns if "name" exists.
|
||||
func GetContainerStatus(statuses []v1.ContainerStatus, name string) (v1.ContainerStatus, bool) {
|
||||
|
@ -552,6 +552,12 @@ func (adc *attachDetachController) GetSecretFunc() func(namespace, name string)
|
||||
}
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
|
||||
return func(_, _ string) (*v1.ConfigMap, error) {
|
||||
return nil, fmt.Errorf("GetConfigMap unsupported in attachDetachController")
|
||||
}
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) {
|
||||
if _, exists := node.Annotations[volumehelper.ControllerManagedAttachAnnotation]; exists {
|
||||
keepTerminatedPodVolumes := false
|
||||
|
@ -87,6 +87,12 @@ func (adc *PersistentVolumeController) GetSecretFunc() func(namespace, name stri
|
||||
}
|
||||
}
|
||||
|
||||
func (adc *PersistentVolumeController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
|
||||
return func(_, _ string) (*v1.ConfigMap, error) {
|
||||
return nil, fmt.Errorf("GetConfigMap unsupported in PersistentVolumeController")
|
||||
}
|
||||
}
|
||||
|
||||
func (ctrl *PersistentVolumeController) GetNodeLabels() (map[string]string, error) {
|
||||
return nil, fmt.Errorf("GetNodeLabels() unsupported in PersistentVolumeController")
|
||||
}
|
||||
|
@ -57,6 +57,7 @@ go_library(
|
||||
"//pkg/kubelet/certificate:go_default_library",
|
||||
"//pkg/kubelet/cm:go_default_library",
|
||||
"//pkg/kubelet/config:go_default_library",
|
||||
"//pkg/kubelet/configmap:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/dockershim:go_default_library",
|
||||
"//pkg/kubelet/dockershim/libdocker:go_default_library",
|
||||
@ -174,6 +175,7 @@ go_test(
|
||||
"//pkg/kubelet/cadvisor/testing:go_default_library",
|
||||
"//pkg/kubelet/cm:go_default_library",
|
||||
"//pkg/kubelet/config:go_default_library",
|
||||
"//pkg/kubelet/configmap:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/container/testing:go_default_library",
|
||||
"//pkg/kubelet/eviction:go_default_library",
|
||||
@ -249,6 +251,7 @@ filegroup(
|
||||
"//pkg/kubelet/client:all-srcs",
|
||||
"//pkg/kubelet/cm:all-srcs",
|
||||
"//pkg/kubelet/config:all-srcs",
|
||||
"//pkg/kubelet/configmap:all-srcs",
|
||||
"//pkg/kubelet/container:all-srcs",
|
||||
"//pkg/kubelet/custommetrics:all-srcs",
|
||||
"//pkg/kubelet/dockershim:all-srcs",
|
||||
|
59
pkg/kubelet/configmap/BUILD
Normal file
59
pkg/kubelet/configmap/BUILD
Normal file
@ -0,0 +1,59 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"configmap_manager.go",
|
||||
"fake_manager.go",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/api/v1/pod:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/kubelet/util:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["configmap_manager_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||
],
|
||||
)
|
302
pkg/kubelet/configmap/configmap_manager.go
Normal file
302
pkg/kubelet/configmap/configmap_manager.go
Normal file
@ -0,0 +1,302 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package configmap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
storageetcd "k8s.io/apiserver/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultTTL = time.Minute
|
||||
)
|
||||
|
||||
type Manager interface {
|
||||
// Get configmap by configmap namespace and name.
|
||||
GetConfigMap(namespace, name string) (*v1.ConfigMap, error)
|
||||
|
||||
// WARNING: Register/UnregisterPod functions should be efficient,
|
||||
// i.e. should not block on network operations.
|
||||
|
||||
// RegisterPod registers all configmaps from a given pod.
|
||||
RegisterPod(pod *v1.Pod)
|
||||
|
||||
// UnregisterPod unregisters configmaps from a given pod that are not
|
||||
// used by any other registered pod.
|
||||
UnregisterPod(pod *v1.Pod)
|
||||
}
|
||||
|
||||
// simpleConfigMapManager implements ConfigMap Manager interface with
|
||||
// simple operations to apiserver.
|
||||
type simpleConfigMapManager struct {
|
||||
kubeClient clientset.Interface
|
||||
}
|
||||
|
||||
func NewSimpleConfigMapManager(kubeClient clientset.Interface) Manager {
|
||||
return &simpleConfigMapManager{kubeClient: kubeClient}
|
||||
}
|
||||
|
||||
func (s *simpleConfigMapManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) {
|
||||
return s.kubeClient.Core().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
|
||||
}
|
||||
|
||||
func (s *simpleConfigMapManager) RegisterPod(pod *v1.Pod) {
|
||||
}
|
||||
|
||||
func (s *simpleConfigMapManager) UnregisterPod(pod *v1.Pod) {
|
||||
}
|
||||
|
||||
type GetObjectTTLFunc func() (time.Duration, bool)
|
||||
|
||||
type objectKey struct {
|
||||
namespace string
|
||||
name string
|
||||
}
|
||||
|
||||
// configMapStoreItems is a single item stored in configMapStore.
|
||||
type configMapStoreItem struct {
|
||||
refCount int
|
||||
configMap *configMapData
|
||||
}
|
||||
|
||||
type configMapData struct {
|
||||
sync.Mutex
|
||||
|
||||
configMap *v1.ConfigMap
|
||||
err error
|
||||
lastUpdateTime time.Time
|
||||
}
|
||||
|
||||
// configMapStore is a local cache of configmaps.
|
||||
type configMapStore struct {
|
||||
kubeClient clientset.Interface
|
||||
clock clock.Clock
|
||||
|
||||
lock sync.Mutex
|
||||
items map[objectKey]*configMapStoreItem
|
||||
|
||||
defaultTTL time.Duration
|
||||
getTTL GetObjectTTLFunc
|
||||
}
|
||||
|
||||
func newConfigMapStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *configMapStore {
|
||||
return &configMapStore{
|
||||
kubeClient: kubeClient,
|
||||
clock: clock,
|
||||
items: make(map[objectKey]*configMapStoreItem),
|
||||
defaultTTL: ttl,
|
||||
getTTL: getTTL,
|
||||
}
|
||||
}
|
||||
|
||||
func isConfigMapOlder(newConfigMap, oldConfigMap *v1.ConfigMap) bool {
|
||||
if newConfigMap == nil || oldConfigMap == nil {
|
||||
return false
|
||||
}
|
||||
newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newConfigMap)
|
||||
oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldConfigMap)
|
||||
return newVersion < oldVersion
|
||||
}
|
||||
|
||||
func (s *configMapStore) Add(namespace, name string) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
// Add is called from RegisterPod, thus it needs to be efficient.
|
||||
// Thus Add() is only increasing refCount and generation of a given configmap.
|
||||
// Then Get() is responsible for fetching if needed.
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
item, exists := s.items[key]
|
||||
if !exists {
|
||||
item = &configMapStoreItem{
|
||||
refCount: 0,
|
||||
configMap: &configMapData{},
|
||||
}
|
||||
s.items[key] = item
|
||||
}
|
||||
|
||||
item.refCount++
|
||||
// This will trigger fetch on the next Get() operation.
|
||||
item.configMap = nil
|
||||
}
|
||||
|
||||
func (s *configMapStore) Delete(namespace, name string) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if item, ok := s.items[key]; ok {
|
||||
item.refCount--
|
||||
if item.refCount == 0 {
|
||||
delete(s.items, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
|
||||
return func() (time.Duration, bool) {
|
||||
node, err := getNode()
|
||||
if err != nil {
|
||||
return time.Duration(0), false
|
||||
}
|
||||
if node != nil && node.Annotations != nil {
|
||||
if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok {
|
||||
if intValue, err := strconv.Atoi(value); err == nil {
|
||||
return time.Duration(intValue) * time.Second, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return time.Duration(0), false
|
||||
}
|
||||
}
|
||||
|
||||
func (s *configMapStore) isConfigMapFresh(data *configMapData) bool {
|
||||
configMapTTL := s.defaultTTL
|
||||
if ttl, ok := s.getTTL(); ok {
|
||||
configMapTTL = ttl
|
||||
}
|
||||
return s.clock.Now().Before(data.lastUpdateTime.Add(configMapTTL))
|
||||
}
|
||||
|
||||
func (s *configMapStore) Get(namespace, name string) (*v1.ConfigMap, error) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
data := func() *configMapData {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
item, exists := s.items[key]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
if item.configMap == nil {
|
||||
item.configMap = &configMapData{}
|
||||
}
|
||||
return item.configMap
|
||||
}()
|
||||
if data == nil {
|
||||
return nil, fmt.Errorf("configmap %q/%q not registered", namespace, name)
|
||||
}
|
||||
|
||||
// After updating data in configMapStore, lock the data, fetch configMap if
|
||||
// needed and return data.
|
||||
data.Lock()
|
||||
defer data.Unlock()
|
||||
if data.err != nil || !s.isConfigMapFresh(data) {
|
||||
opts := metav1.GetOptions{}
|
||||
if data.configMap != nil && data.err == nil {
|
||||
// This is just a periodic refresh of a configmap we successfully fetched previously.
|
||||
// In this case, server data from apiserver cache to reduce the load on both
|
||||
// etcd and apiserver (the cache is eventually consistent).
|
||||
util.FromApiserverCache(&opts)
|
||||
}
|
||||
configMap, err := s.kubeClient.Core().ConfigMaps(namespace).Get(name, opts)
|
||||
if err != nil && !apierrors.IsNotFound(err) && data.configMap == nil && data.err == nil {
|
||||
// Couldn't fetch the latest configmap, but there is no cached data to return.
|
||||
// Return the fetch result instead.
|
||||
return configMap, err
|
||||
}
|
||||
if (err == nil && !isConfigMapOlder(configMap, data.configMap)) || apierrors.IsNotFound(err) {
|
||||
// If the fetch succeeded with a newer version of the configmap, or if the
|
||||
// configmap could not be found in the apiserver, update the cached data to
|
||||
// reflect the current status.
|
||||
data.configMap = configMap
|
||||
data.err = err
|
||||
data.lastUpdateTime = s.clock.Now()
|
||||
}
|
||||
}
|
||||
return data.configMap, data.err
|
||||
}
|
||||
|
||||
// cachingConfigMapManager keeps a cache of all configmaps necessary for registered pods.
|
||||
// It implements the following logic:
|
||||
// - whenever a pod is created or updated, the cached versions of all its configmaps
|
||||
// are invalidated
|
||||
// - every GetConfigMap() call tries to fetch the value from local cache; if it is
|
||||
// not there, invalidated or too old, we fetch it from apiserver and refresh the
|
||||
// value in cache; otherwise it is just fetched from cache
|
||||
type cachingConfigMapManager struct {
|
||||
configMapStore *configMapStore
|
||||
|
||||
lock sync.Mutex
|
||||
registeredPods map[objectKey]*v1.Pod
|
||||
}
|
||||
|
||||
func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager {
|
||||
csm := &cachingConfigMapManager{
|
||||
configMapStore: newConfigMapStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL),
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
return csm
|
||||
}
|
||||
|
||||
func (c *cachingConfigMapManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) {
|
||||
return c.configMapStore.Get(namespace, name)
|
||||
}
|
||||
|
||||
func getConfigMapNames(pod *v1.Pod) sets.String {
|
||||
result := sets.NewString()
|
||||
podutil.VisitPodConfigmapNames(pod, func(name string) bool {
|
||||
result.Insert(name)
|
||||
return true
|
||||
})
|
||||
return result
|
||||
}
|
||||
|
||||
func (c *cachingConfigMapManager) RegisterPod(pod *v1.Pod) {
|
||||
names := getConfigMapNames(pod)
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
for name := range names {
|
||||
c.configMapStore.Add(pod.Namespace, name)
|
||||
}
|
||||
var prev *v1.Pod
|
||||
key := objectKey{namespace: pod.Namespace, name: pod.Name}
|
||||
prev = c.registeredPods[key]
|
||||
c.registeredPods[key] = pod
|
||||
if prev != nil {
|
||||
for name := range getConfigMapNames(prev) {
|
||||
c.configMapStore.Delete(prev.Namespace, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cachingConfigMapManager) UnregisterPod(pod *v1.Pod) {
|
||||
var prev *v1.Pod
|
||||
key := objectKey{namespace: pod.Namespace, name: pod.Name}
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
prev = c.registeredPods[key]
|
||||
delete(c.registeredPods, key)
|
||||
if prev != nil {
|
||||
for name := range getConfigMapNames(prev) {
|
||||
c.configMapStore.Delete(prev.Namespace, name)
|
||||
}
|
||||
}
|
||||
}
|
537
pkg/kubelet/configmap/configmap_manager_test.go
Normal file
537
pkg/kubelet/configmap/configmap_manager_test.go
Normal file
@ -0,0 +1,537 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package configmap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
core "k8s.io/client-go/testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func checkConfigMap(t *testing.T, store *configMapStore, ns, name string, shouldExist bool) {
|
||||
_, err := store.Get(ns, name)
|
||||
if shouldExist && err != nil {
|
||||
t.Errorf("unexpected actions: %#v", err)
|
||||
}
|
||||
if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("configmap %q/%q not registered", ns, name))) {
|
||||
t.Errorf("unexpected actions: %#v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func noObjectTTL() (time.Duration, bool) {
|
||||
return time.Duration(0), false
|
||||
}
|
||||
|
||||
func TestConfigMapStore(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
store := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||
store.Add("ns1", "name1")
|
||||
store.Add("ns2", "name2")
|
||||
store.Add("ns1", "name1")
|
||||
store.Add("ns1", "name1")
|
||||
store.Delete("ns1", "name1")
|
||||
store.Delete("ns2", "name2")
|
||||
store.Add("ns3", "name3")
|
||||
|
||||
// Adds don't issue Get requests.
|
||||
actions := fakeClient.Actions()
|
||||
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
|
||||
// Should issue Get request
|
||||
store.Get("ns1", "name1")
|
||||
// Shouldn't issue Get request, as configMap is not registered
|
||||
store.Get("ns2", "name2")
|
||||
// Should issue Get request
|
||||
store.Get("ns3", "name3")
|
||||
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
|
||||
|
||||
for _, a := range actions {
|
||||
assert.True(t, a.Matches("get", "configmaps"), "unexpected actions: %#v", a)
|
||||
}
|
||||
|
||||
checkConfigMap(t, store, "ns1", "name1", true)
|
||||
checkConfigMap(t, store, "ns2", "name2", false)
|
||||
checkConfigMap(t, store, "ns3", "name3", true)
|
||||
checkConfigMap(t, store, "ns4", "name4", false)
|
||||
}
|
||||
|
||||
func TestConfigMapStoreDeletingConfigMap(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
store := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||
store.Add("ns", "name")
|
||||
|
||||
result := &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name", ResourceVersion: "10"}}
|
||||
fakeClient.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
|
||||
return true, result, nil
|
||||
})
|
||||
configMap, err := store.Get("ns", "name")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(configMap, result) {
|
||||
t.Errorf("Unexpected configMap: %v", configMap)
|
||||
}
|
||||
|
||||
fakeClient.PrependReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
|
||||
return true, &v1.ConfigMap{}, apierrors.NewNotFound(v1.Resource("configMap"), "name")
|
||||
})
|
||||
configMap, err = store.Get("ns", "name")
|
||||
if err == nil || !apierrors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(configMap, &v1.ConfigMap{}) {
|
||||
t.Errorf("Unexpected configMap: %v", configMap)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigMapStoreGetAlwaysRefresh(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, 0)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
|
||||
}
|
||||
fakeClient.ClearActions()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(100)
|
||||
for i := 0; i < 100; i++ {
|
||||
go func(i int) {
|
||||
store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10))
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
actions := fakeClient.Actions()
|
||||
assert.Equal(t, 100, len(actions), "unexpected actions: %#v", actions)
|
||||
|
||||
for _, a := range actions {
|
||||
assert.True(t, a.Matches("get", "configmaps"), "unexpected actions: %#v", a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigMapStoreGetNeverRefresh(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
|
||||
}
|
||||
fakeClient.ClearActions()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(100)
|
||||
for i := 0; i < 100; i++ {
|
||||
go func(i int) {
|
||||
store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10))
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
actions := fakeClient.Actions()
|
||||
// Only first Get, should forward the Get request.
|
||||
assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions)
|
||||
}
|
||||
|
||||
func TestCustomTTL(t *testing.T) {
|
||||
ttl := time.Duration(0)
|
||||
ttlExists := false
|
||||
customTTL := func() (time.Duration, bool) {
|
||||
return ttl, ttlExists
|
||||
}
|
||||
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Time{})
|
||||
store := newConfigMapStore(fakeClient, fakeClock, customTTL, time.Minute)
|
||||
|
||||
store.Add("ns", "name")
|
||||
store.Get("ns", "name")
|
||||
fakeClient.ClearActions()
|
||||
|
||||
// Set 0-ttl and see if that works.
|
||||
ttl = time.Duration(0)
|
||||
ttlExists = true
|
||||
store.Get("ns", "name")
|
||||
actions := fakeClient.Actions()
|
||||
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
|
||||
fakeClient.ClearActions()
|
||||
|
||||
// Set 5-minute ttl and see if this works.
|
||||
ttl = time.Duration(5) * time.Minute
|
||||
store.Get("ns", "name")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
|
||||
// Still no effect after 4 minutes.
|
||||
fakeClock.Step(4 * time.Minute)
|
||||
store.Get("ns", "name")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
|
||||
// Now it should have an effect.
|
||||
fakeClock.Step(time.Minute)
|
||||
store.Get("ns", "name")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
|
||||
fakeClient.ClearActions()
|
||||
|
||||
// Now remove the custom ttl and see if that works.
|
||||
ttlExists = false
|
||||
fakeClock.Step(55 * time.Second)
|
||||
store.Get("ns", "name")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
|
||||
// Pass the minute and it should be triggered now.
|
||||
fakeClock.Step(5 * time.Second)
|
||||
store.Get("ns", "name")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
|
||||
}
|
||||
|
||||
func TestParseNodeAnnotation(t *testing.T) {
|
||||
testCases := []struct {
|
||||
node *v1.Node
|
||||
err error
|
||||
exists bool
|
||||
ttl time.Duration
|
||||
}{
|
||||
{
|
||||
node: nil,
|
||||
err: fmt.Errorf("error"),
|
||||
exists: false,
|
||||
},
|
||||
{
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node",
|
||||
},
|
||||
},
|
||||
exists: false,
|
||||
},
|
||||
{
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node",
|
||||
Annotations: map[string]string{},
|
||||
},
|
||||
},
|
||||
exists: false,
|
||||
},
|
||||
{
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node",
|
||||
Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "bad"},
|
||||
},
|
||||
},
|
||||
exists: false,
|
||||
},
|
||||
{
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node",
|
||||
Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "0"},
|
||||
},
|
||||
},
|
||||
exists: true,
|
||||
ttl: time.Duration(0),
|
||||
},
|
||||
{
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node",
|
||||
Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "60"},
|
||||
},
|
||||
},
|
||||
exists: true,
|
||||
ttl: time.Minute,
|
||||
},
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
getNode := func() (*v1.Node, error) { return testCase.node, testCase.err }
|
||||
ttl, exists := GetObjectTTLFromNodeFunc(getNode)()
|
||||
if exists != testCase.exists {
|
||||
t.Errorf("%d: incorrect parsing: %t", i, exists)
|
||||
continue
|
||||
}
|
||||
if exists && ttl != testCase.ttl {
|
||||
t.Errorf("%d: incorrect ttl: %v", i, ttl)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type envConfigMaps struct {
|
||||
envVarNames []string
|
||||
envFromNames []string
|
||||
}
|
||||
|
||||
type configMapsToAttach struct {
|
||||
containerEnvConfigMaps []envConfigMaps
|
||||
volumes []string
|
||||
}
|
||||
|
||||
func podWithConfigMaps(ns, name string, toAttach configMapsToAttach) *v1.Pod {
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: ns,
|
||||
Name: name,
|
||||
},
|
||||
Spec: v1.PodSpec{},
|
||||
}
|
||||
for i, configMaps := range toAttach.containerEnvConfigMaps {
|
||||
container := v1.Container{
|
||||
Name: fmt.Sprintf("container-%d", i),
|
||||
}
|
||||
for _, name := range configMaps.envFromNames {
|
||||
envFrom := v1.EnvFromSource{
|
||||
ConfigMapRef: &v1.ConfigMapEnvSource{
|
||||
LocalObjectReference: v1.LocalObjectReference{
|
||||
Name: name,
|
||||
},
|
||||
},
|
||||
}
|
||||
container.EnvFrom = append(container.EnvFrom, envFrom)
|
||||
}
|
||||
|
||||
for _, name := range configMaps.envVarNames {
|
||||
envSource := &v1.EnvVarSource{
|
||||
ConfigMapKeyRef: &v1.ConfigMapKeySelector{
|
||||
LocalObjectReference: v1.LocalObjectReference{
|
||||
Name: name,
|
||||
},
|
||||
},
|
||||
}
|
||||
container.Env = append(container.Env, v1.EnvVar{ValueFrom: envSource})
|
||||
}
|
||||
pod.Spec.Containers = append(pod.Spec.Containers, container)
|
||||
}
|
||||
for _, configMap := range toAttach.volumes {
|
||||
volume := &v1.ConfigMapVolumeSource{
|
||||
LocalObjectReference: v1.LocalObjectReference{Name: configMap},
|
||||
}
|
||||
pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{
|
||||
Name: configMap,
|
||||
VolumeSource: v1.VolumeSource{
|
||||
ConfigMap: volume,
|
||||
},
|
||||
})
|
||||
}
|
||||
return pod
|
||||
}
|
||||
|
||||
func TestCacheInvalidation(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
|
||||
manager := &cachingConfigMapManager{
|
||||
configMapStore: store,
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
|
||||
// Create a pod with some configMaps.
|
||||
s1 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s1"}, envFromNames: []string{"s10"}},
|
||||
{envVarNames: []string{"s2"}},
|
||||
},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1))
|
||||
// Fetch both configMaps - this should triggger get operations.
|
||||
store.Get("ns1", "s1")
|
||||
store.Get("ns1", "s10")
|
||||
store.Get("ns1", "s2")
|
||||
actions := fakeClient.Actions()
|
||||
assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions)
|
||||
fakeClient.ClearActions()
|
||||
|
||||
// Update a pod with a new configMap.
|
||||
s2 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s1"}},
|
||||
{envVarNames: []string{"s2"}, envFromNames: []string{"s20"}},
|
||||
},
|
||||
volumes: []string{"s3"},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name1", s2))
|
||||
// All configMaps should be invalidated - this should trigger get operations.
|
||||
store.Get("ns1", "s1")
|
||||
store.Get("ns1", "s2")
|
||||
store.Get("ns1", "s20")
|
||||
store.Get("ns1", "s3")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 4, len(actions), "unexpected actions: %#v", actions)
|
||||
fakeClient.ClearActions()
|
||||
|
||||
// Create a new pod that is refencing the first three configMaps - those should
|
||||
// be invalidated.
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name2", s1))
|
||||
store.Get("ns1", "s1")
|
||||
store.Get("ns1", "s10")
|
||||
store.Get("ns1", "s2")
|
||||
store.Get("ns1", "s20")
|
||||
store.Get("ns1", "s3")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions)
|
||||
fakeClient.ClearActions()
|
||||
}
|
||||
|
||||
func TestCacheRefcounts(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
|
||||
manager := &cachingConfigMapManager{
|
||||
configMapStore: store,
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
|
||||
s1 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s1"}, envFromNames: []string{"s10"}},
|
||||
{envVarNames: []string{"s2"}},
|
||||
},
|
||||
volumes: []string{"s3"},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name2", s1))
|
||||
s2 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s4"}},
|
||||
{envVarNames: []string{"s5"}, envFromNames: []string{"s50"}},
|
||||
},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name2", s2))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name3", s2))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name4", s2))
|
||||
manager.UnregisterPod(podWithConfigMaps("ns1", "name3", s2))
|
||||
s3 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s3"}, envFromNames: []string{"s30"}},
|
||||
{envVarNames: []string{"s5"}},
|
||||
},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name5", s3))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name6", s3))
|
||||
s4 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s6"}},
|
||||
{envFromNames: []string{"s60"}},
|
||||
},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name7", s4))
|
||||
manager.UnregisterPod(podWithConfigMaps("ns1", "name7", s4))
|
||||
|
||||
// Also check the Add + Update + Remove scenario.
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "other-name", s1))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "other-name", s2))
|
||||
manager.UnregisterPod(podWithConfigMaps("ns1", "other-name", s2))
|
||||
|
||||
refs := func(ns, name string) int {
|
||||
store.lock.Lock()
|
||||
defer store.lock.Unlock()
|
||||
item, ok := store.items[objectKey{ns, name}]
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
return item.refCount
|
||||
}
|
||||
assert.Equal(t, refs("ns1", "s1"), 1)
|
||||
assert.Equal(t, refs("ns1", "s10"), 1)
|
||||
assert.Equal(t, refs("ns1", "s2"), 1)
|
||||
assert.Equal(t, refs("ns1", "s3"), 3)
|
||||
assert.Equal(t, refs("ns1", "s30"), 2)
|
||||
assert.Equal(t, refs("ns1", "s4"), 2)
|
||||
assert.Equal(t, refs("ns1", "s5"), 4)
|
||||
assert.Equal(t, refs("ns1", "s50"), 2)
|
||||
assert.Equal(t, refs("ns1", "s6"), 0)
|
||||
assert.Equal(t, refs("ns1", "s60"), 0)
|
||||
assert.Equal(t, refs("ns1", "s7"), 0)
|
||||
}
|
||||
|
||||
func TestCachingConfigMapManager(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
configMapStore := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||
manager := &cachingConfigMapManager{
|
||||
configMapStore: configMapStore,
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
|
||||
// Create a pod with some configMaps.
|
||||
s1 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s1"}},
|
||||
{envFromNames: []string{"s20"}},
|
||||
},
|
||||
volumes: []string{"s2"},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1))
|
||||
manager.RegisterPod(podWithConfigMaps("ns2", "name2", s1))
|
||||
// Update the pod with a different configMaps.
|
||||
s2 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s3"}},
|
||||
{envVarNames: []string{"s4"}},
|
||||
{envFromNames: []string{"s40"}},
|
||||
},
|
||||
}
|
||||
// Create another pod, but with same configMaps in different namespace.
|
||||
manager.RegisterPod(podWithConfigMaps("ns2", "name2", s2))
|
||||
// Create and delete a pod with some other configMaps.
|
||||
s3 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s6"}},
|
||||
{envFromNames: []string{"s60"}},
|
||||
},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns3", "name", s3))
|
||||
manager.UnregisterPod(podWithConfigMaps("ns3", "name", s3))
|
||||
|
||||
existingMaps := map[string][]string{
|
||||
"ns1": {"s1", "s2", "s20"},
|
||||
"ns2": {"s3", "s4", "s40"},
|
||||
}
|
||||
shouldExist := func(ns, configMap string) bool {
|
||||
if cmaps, ok := existingMaps[ns]; ok {
|
||||
for _, cm := range cmaps {
|
||||
if cm == configMap {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
for _, ns := range []string{"ns1", "ns2", "ns3"} {
|
||||
for _, configMap := range []string{"s1", "s2", "s3", "s4", "s5", "s6", "s20", "s40", "s50"} {
|
||||
checkConfigMap(t, configMapStore, ns, configMap, shouldExist(ns, configMap))
|
||||
}
|
||||
}
|
||||
}
|
40
pkg/kubelet/configmap/fake_manager.go
Normal file
40
pkg/kubelet/configmap/fake_manager.go
Normal file
@ -0,0 +1,40 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package configmap
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
)
|
||||
|
||||
// fakeManager implements Manager interface for testing purposes.
|
||||
// simple operations to apiserver.
|
||||
type fakeManager struct {
|
||||
}
|
||||
|
||||
func NewFakeManager() Manager {
|
||||
return &fakeManager{}
|
||||
}
|
||||
|
||||
func (s *fakeManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *fakeManager) RegisterPod(pod *v1.Pod) {
|
||||
}
|
||||
|
||||
func (s *fakeManager) UnregisterPod(pod *v1.Pod) {
|
||||
}
|
@ -68,6 +68,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/certificate"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/dockershim"
|
||||
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
|
||||
@ -487,9 +488,12 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
||||
|
||||
secretManager := secret.NewCachingSecretManager(
|
||||
kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode))
|
||||
|
||||
klet.secretManager = secretManager
|
||||
|
||||
configMapManager := configmap.NewCachingConfigMapManager(
|
||||
kubeDeps.KubeClient, configmap.GetObjectTTLFromNodeFunc(klet.GetNode))
|
||||
klet.configMapManager = configMapManager
|
||||
|
||||
if klet.experimentalHostUserNamespaceDefaulting {
|
||||
glog.Infof("Experimental host user namespace defaulting is enabled.")
|
||||
}
|
||||
@ -518,8 +522,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
||||
klet.livenessManager = proberesults.NewManager()
|
||||
|
||||
klet.podCache = kubecontainer.NewCache()
|
||||
// podManager is also responsible for keeping secretManager contents up-to-date.
|
||||
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager)
|
||||
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
|
||||
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager)
|
||||
|
||||
if kubeCfg.RemoteRuntimeEndpoint != "" {
|
||||
// kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
|
||||
@ -717,7 +721,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
||||
kubeDeps.Recorder)
|
||||
|
||||
klet.volumePluginMgr, err =
|
||||
NewInitializedVolumePluginMgr(klet, secretManager, kubeDeps.VolumePlugins)
|
||||
NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, kubeDeps.VolumePlugins)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -917,6 +921,9 @@ type Kubelet struct {
|
||||
// Secret manager.
|
||||
secretManager secret.Manager
|
||||
|
||||
// ConfigMap manager.
|
||||
configMapManager configmap.Manager
|
||||
|
||||
// Cached MachineInfo returned by cadvisor.
|
||||
machineInfo *cadvisorapi.MachineInfo
|
||||
|
||||
|
@ -446,7 +446,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
|
||||
return result, fmt.Errorf("Couldn't get configMap %v/%v, no kubeClient defined", pod.Namespace, name)
|
||||
}
|
||||
optional := cm.Optional != nil && *cm.Optional
|
||||
configMap, err = kl.kubeClient.Core().ConfigMaps(pod.Namespace).Get(name, metav1.GetOptions{})
|
||||
configMap, err = kl.configMapManager.GetConfigMap(pod.Namespace, name)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) && optional {
|
||||
// ignore error when marked optional
|
||||
@ -554,7 +554,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
|
||||
if kl.kubeClient == nil {
|
||||
return result, fmt.Errorf("Couldn't get configMap %v/%v, no kubeClient defined", pod.Namespace, name)
|
||||
}
|
||||
configMap, err = kl.kubeClient.Core().ConfigMaps(pod.Namespace).Get(name, metav1.GetOptions{})
|
||||
configMap, err = kl.configMapManager.GetConfigMap(pod.Namespace, name)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) && optional {
|
||||
// ignore error when marked optional
|
||||
|
@ -46,6 +46,7 @@ import (
|
||||
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/eviction"
|
||||
@ -208,7 +209,9 @@ func newTestKubeletWithImageList(
|
||||
fakeMirrorClient := podtest.NewFakeMirrorClient()
|
||||
secretManager := secret.NewSimpleSecretManager(kubelet.kubeClient)
|
||||
kubelet.secretManager = secretManager
|
||||
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager)
|
||||
configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient)
|
||||
kubelet.configMapManager = configMapManager
|
||||
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager)
|
||||
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{})
|
||||
diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{})
|
||||
if err != nil {
|
||||
@ -276,7 +279,7 @@ func newTestKubeletWithImageList(
|
||||
|
||||
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
|
||||
kubelet.volumePluginMgr, err =
|
||||
NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, []volume.VolumePlugin{plug})
|
||||
NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, []volume.VolumePlugin{plug})
|
||||
require.NoError(t, err, "Failed to initialize VolumePluginMgr")
|
||||
|
||||
kubelet.mounter = &mount.FakeMounter{}
|
||||
|
@ -18,6 +18,7 @@ go_library(
|
||||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/kubelet/configmap:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/secret:go_default_library",
|
||||
"//pkg/kubelet/types:go_default_library",
|
||||
@ -38,6 +39,7 @@ go_test(
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/kubelet/configmap:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/pod/testing:go_default_library",
|
||||
"//pkg/kubelet/secret:go_default_library",
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/secret"
|
||||
)
|
||||
@ -113,17 +114,19 @@ type basicManager struct {
|
||||
// Mirror pod UID to pod UID map.
|
||||
translationByUID map[types.UID]types.UID
|
||||
|
||||
// basicManager is keeping secretManager up-to-date.
|
||||
secretManager secret.Manager
|
||||
// basicManager is keeping secretManager and configMapManager up-to-date.
|
||||
secretManager secret.Manager
|
||||
configMapManager configmap.Manager
|
||||
|
||||
// A mirror pod client to create/delete mirror pods.
|
||||
MirrorClient
|
||||
}
|
||||
|
||||
// NewBasicPodManager returns a functional Manager.
|
||||
func NewBasicPodManager(client MirrorClient, secretManager secret.Manager) Manager {
|
||||
func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager) Manager {
|
||||
pm := &basicManager{}
|
||||
pm.secretManager = secretManager
|
||||
pm.configMapManager = configMapManager
|
||||
pm.MirrorClient = client
|
||||
pm.SetPods(nil)
|
||||
return pm
|
||||
@ -163,6 +166,11 @@ func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) {
|
||||
// not register pod, as it doesn't really matter.
|
||||
pm.secretManager.RegisterPod(pod)
|
||||
}
|
||||
if pm.configMapManager != nil {
|
||||
// TODO: Consider detecting only status update and in such case do
|
||||
// not register pod, as it doesn't really matter.
|
||||
pm.configMapManager.RegisterPod(pod)
|
||||
}
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
if IsMirrorPod(pod) {
|
||||
pm.mirrorPodByUID[pod.UID] = pod
|
||||
@ -186,6 +194,9 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) {
|
||||
if pm.secretManager != nil {
|
||||
pm.secretManager.UnregisterPod(pod)
|
||||
}
|
||||
if pm.configMapManager != nil {
|
||||
pm.configMapManager.UnregisterPod(pod)
|
||||
}
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
if IsMirrorPod(pod) {
|
||||
delete(pm.mirrorPodByUID, pod.UID)
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/secret"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
@ -32,7 +33,8 @@ import (
|
||||
func newTestManager() (*basicManager, *podtest.FakeMirrorClient) {
|
||||
fakeMirrorClient := podtest.NewFakeMirrorClient()
|
||||
secretManager := secret.NewFakeManager()
|
||||
manager := NewBasicPodManager(fakeMirrorClient, secretManager).(*basicManager)
|
||||
configMapManager := configmap.NewFakeManager()
|
||||
manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager).(*basicManager)
|
||||
return manager, fakeMirrorClient
|
||||
}
|
||||
|
||||
|
@ -99,7 +99,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) {
|
||||
func newTestManager() *manager {
|
||||
refManager := kubecontainer.NewRefManager()
|
||||
refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings.
|
||||
podManager := kubepod.NewBasicPodManager(nil, nil)
|
||||
podManager := kubepod.NewBasicPodManager(nil, nil, nil)
|
||||
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
|
||||
podManager.AddPod(getTestPod())
|
||||
m := NewManager(
|
||||
|
@ -118,7 +118,7 @@ func TestDoProbe(t *testing.T) {
|
||||
}
|
||||
|
||||
// Clean up.
|
||||
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil), &statustest.FakePodDeletionSafetyProvider{})
|
||||
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{})
|
||||
resultsManager(m, probeType).Remove(testContainerID)
|
||||
}
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/eviction"
|
||||
@ -63,8 +64,9 @@ func TestRunOnce(t *testing.T) {
|
||||
Capacity: 10 * mb,
|
||||
}, nil)
|
||||
fakeSecretManager := secret.NewFakeManager()
|
||||
fakeConfigMapManager := configmap.NewFakeManager()
|
||||
podManager := kubepod.NewBasicPodManager(
|
||||
podtest.NewFakeMirrorClient(), fakeSecretManager)
|
||||
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager)
|
||||
diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{})
|
||||
fakeRuntime := &containertest.FakeRuntime{}
|
||||
basePath, err := utiltesting.MkTmpdir("kubelet")
|
||||
@ -93,7 +95,7 @@ func TestRunOnce(t *testing.T) {
|
||||
|
||||
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
|
||||
kb.volumePluginMgr, err =
|
||||
NewInitializedVolumePluginMgr(kb, fakeSecretManager, []volume.VolumePlugin{plug})
|
||||
NewInitializedVolumePluginMgr(kb, fakeSecretManager, fakeConfigMapManager, []volume.VolumePlugin{plug})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to initialize VolumePluginMgr: %v", err)
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ go_test(
|
||||
"//pkg/api/v1/pod:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||
"//pkg/kubelet/configmap:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/pod:go_default_library",
|
||||
"//pkg/kubelet/pod/testing:go_default_library",
|
||||
|
@ -36,6 +36,7 @@ import (
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||
kubeconfigmap "k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
|
||||
@ -74,7 +75,7 @@ func (m *manager) testSyncBatch() {
|
||||
}
|
||||
|
||||
func newTestManager(kubeClient clientset.Interface) *manager {
|
||||
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager())
|
||||
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager())
|
||||
podManager.AddPod(getTestPod())
|
||||
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
"k8s.io/kubernetes/pkg/kubelet/secret"
|
||||
"k8s.io/kubernetes/pkg/util/io"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
@ -39,11 +40,13 @@ import (
|
||||
func NewInitializedVolumePluginMgr(
|
||||
kubelet *Kubelet,
|
||||
secretManager secret.Manager,
|
||||
configMapManager configmap.Manager,
|
||||
plugins []volume.VolumePlugin) (*volume.VolumePluginMgr, error) {
|
||||
kvh := &kubeletVolumeHost{
|
||||
kubelet: kubelet,
|
||||
volumePluginMgr: volume.VolumePluginMgr{},
|
||||
secretManager: secretManager,
|
||||
kubelet: kubelet,
|
||||
volumePluginMgr: volume.VolumePluginMgr{},
|
||||
secretManager: secretManager,
|
||||
configMapManager: configMapManager,
|
||||
}
|
||||
|
||||
if err := kvh.volumePluginMgr.InitPlugins(plugins, kvh); err != nil {
|
||||
@ -63,9 +66,10 @@ func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string {
|
||||
}
|
||||
|
||||
type kubeletVolumeHost struct {
|
||||
kubelet *Kubelet
|
||||
volumePluginMgr volume.VolumePluginMgr
|
||||
secretManager secret.Manager
|
||||
kubelet *Kubelet
|
||||
volumePluginMgr volume.VolumePluginMgr
|
||||
secretManager secret.Manager
|
||||
configMapManager configmap.Manager
|
||||
}
|
||||
|
||||
func (kvh *kubeletVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
|
||||
@ -141,6 +145,10 @@ func (kvh *kubeletVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.
|
||||
return kvh.secretManager.GetSecret
|
||||
}
|
||||
|
||||
func (kvh *kubeletVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
|
||||
return kvh.configMapManager.GetConfigMap
|
||||
}
|
||||
|
||||
func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
|
||||
node, err := kvh.kubelet.GetNode()
|
||||
if err != nil {
|
||||
|
@ -47,6 +47,7 @@ go_test(
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||
"//pkg/kubelet/config:go_default_library",
|
||||
"//pkg/kubelet/configmap:go_default_library",
|
||||
"//pkg/kubelet/container/testing:go_default_library",
|
||||
"//pkg/kubelet/pod:go_default_library",
|
||||
"//pkg/kubelet/pod/testing:go_default_library",
|
||||
|
@ -53,6 +53,7 @@ go_test(
|
||||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||
"//pkg/kubelet/configmap:go_default_library",
|
||||
"//pkg/kubelet/container/testing:go_default_library",
|
||||
"//pkg/kubelet/pod:go_default_library",
|
||||
"//pkg/kubelet/pod/testing:go_default_library",
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
|
||||
@ -40,8 +41,9 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
|
||||
fakeSecretManager := secret.NewFakeManager()
|
||||
fakeConfigMapManager := configmap.NewFakeManager()
|
||||
fakePodManager := kubepod.NewBasicPodManager(
|
||||
podtest.NewFakeMirrorClient(), fakeSecretManager)
|
||||
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager)
|
||||
|
||||
fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
|
||||
fakeRuntime := &containertest.FakeRuntime{}
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/pod"
|
||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||
@ -56,7 +57,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
|
||||
t.Fatalf("can't make a temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager())
|
||||
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
|
||||
|
||||
node, pod, pv, claim := createObjects()
|
||||
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
|
||||
@ -101,7 +102,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
|
||||
t.Fatalf("can't make a temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager())
|
||||
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
|
||||
|
||||
node, pod, _, claim := createObjects()
|
||||
|
||||
|
@ -42,13 +42,15 @@ const (
|
||||
|
||||
// configMapPlugin implements the VolumePlugin interface.
|
||||
type configMapPlugin struct {
|
||||
host volume.VolumeHost
|
||||
host volume.VolumeHost
|
||||
getConfigMap func(namespace, name string) (*v1.ConfigMap, error)
|
||||
}
|
||||
|
||||
var _ volume.VolumePlugin = &configMapPlugin{}
|
||||
|
||||
func (plugin *configMapPlugin) Init(host volume.VolumeHost) error {
|
||||
plugin.host = host
|
||||
plugin.getConfigMap = host.GetConfigMapFunc()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -86,14 +88,32 @@ func (plugin *configMapPlugin) SupportsBulkVolumeVerification() bool {
|
||||
|
||||
func (plugin *configMapPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
|
||||
return &configMapVolumeMounter{
|
||||
configMapVolume: &configMapVolume{spec.Name(), pod.UID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}},
|
||||
source: *spec.Volume.ConfigMap,
|
||||
pod: *pod,
|
||||
opts: &opts}, nil
|
||||
configMapVolume: &configMapVolume{
|
||||
spec.Name(),
|
||||
pod.UID,
|
||||
plugin,
|
||||
plugin.host.GetMounter(),
|
||||
plugin.host.GetWriter(),
|
||||
volume.MetricsNil{},
|
||||
},
|
||||
source: *spec.Volume.ConfigMap,
|
||||
pod: *pod,
|
||||
opts: &opts,
|
||||
getConfigMap: plugin.getConfigMap,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (plugin *configMapPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
|
||||
return &configMapVolumeUnmounter{&configMapVolume{volName, podUID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}}}, nil
|
||||
return &configMapVolumeUnmounter{
|
||||
&configMapVolume{
|
||||
volName,
|
||||
podUID,
|
||||
plugin,
|
||||
plugin.host.GetMounter(),
|
||||
plugin.host.GetWriter(),
|
||||
volume.MetricsNil{},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (plugin *configMapPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
|
||||
@ -126,9 +146,10 @@ func (sv *configMapVolume) GetPath() string {
|
||||
type configMapVolumeMounter struct {
|
||||
*configMapVolume
|
||||
|
||||
source v1.ConfigMapVolumeSource
|
||||
pod v1.Pod
|
||||
opts *volume.VolumeOptions
|
||||
source v1.ConfigMapVolumeSource
|
||||
pod v1.Pod
|
||||
opts *volume.VolumeOptions
|
||||
getConfigMap func(namespace, name string) (*v1.ConfigMap, error)
|
||||
}
|
||||
|
||||
var _ volume.Mounter = &configMapVolumeMounter{}
|
||||
@ -174,13 +195,8 @@ func (b *configMapVolumeMounter) SetUpAt(dir string, fsGroup *types.UnixGroupID)
|
||||
return err
|
||||
}
|
||||
|
||||
kubeClient := b.plugin.host.GetKubeClient()
|
||||
if kubeClient == nil {
|
||||
return fmt.Errorf("Cannot setup configMap volume %v because kube client is not configured", b.volName)
|
||||
}
|
||||
|
||||
optional := b.source.Optional != nil && *b.source.Optional
|
||||
configMap, err := kubeClient.Core().ConfigMaps(b.pod.Namespace).Get(b.source.Name, metav1.GetOptions{})
|
||||
configMap, err := b.getConfigMap(b.pod.Namespace, b.source.Name)
|
||||
if err != nil {
|
||||
if !(errors.IsNotFound(err) && optional) {
|
||||
glog.Errorf("Couldn't get configMap %v/%v: %v", b.pod.Namespace, b.source.Name, err)
|
||||
|
@ -233,6 +233,9 @@ type VolumeHost interface {
|
||||
// Returns a function that returns a secret.
|
||||
GetSecretFunc() func(namespace, name string) (*v1.Secret, error)
|
||||
|
||||
// Returns a function that returns a configmap.
|
||||
GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error)
|
||||
|
||||
// Returns the labels on the node
|
||||
GetNodeLabels() (map[string]string, error)
|
||||
}
|
||||
|
@ -45,8 +45,9 @@ const (
|
||||
)
|
||||
|
||||
type projectedPlugin struct {
|
||||
host volume.VolumeHost
|
||||
getSecret func(namespace, name string) (*v1.Secret, error)
|
||||
host volume.VolumeHost
|
||||
getSecret func(namespace, name string) (*v1.Secret, error)
|
||||
getConfigMap func(namespace, name string) (*v1.ConfigMap, error)
|
||||
}
|
||||
|
||||
var _ volume.VolumePlugin = &projectedPlugin{}
|
||||
@ -68,6 +69,7 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
|
||||
func (plugin *projectedPlugin) Init(host volume.VolumeHost) error {
|
||||
plugin.host = host
|
||||
plugin.getSecret = host.GetSecretFunc()
|
||||
plugin.getConfigMap = host.GetConfigMapFunc()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -235,10 +237,10 @@ func (s *projectedVolumeMounter) collectData() (map[string]volumeutil.FileProjec
|
||||
secretapi, err := s.plugin.getSecret(s.pod.Namespace, source.Secret.Name)
|
||||
if err != nil {
|
||||
if !(errors.IsNotFound(err) && optional) {
|
||||
glog.Errorf("Couldn't get secret %v/%v", s.pod.Namespace, source.Secret.Name)
|
||||
glog.Errorf("Couldn't get secret %v/%v: %v", s.pod.Namespace, source.Secret.Name, err)
|
||||
errlist = append(errlist, err)
|
||||
continue
|
||||
}
|
||||
|
||||
secretapi = &v1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: s.pod.Namespace,
|
||||
@ -248,17 +250,16 @@ func (s *projectedVolumeMounter) collectData() (map[string]volumeutil.FileProjec
|
||||
}
|
||||
secretPayload, err := secret.MakePayload(source.Secret.Items, secretapi, s.source.DefaultMode, optional)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get secret %v/%v: %v", s.pod.Namespace, source.Secret.Name, err)
|
||||
glog.Errorf("Couldn't get secret payload %v/%v: %v", s.pod.Namespace, source.Secret.Name, err)
|
||||
errlist = append(errlist, err)
|
||||
continue
|
||||
}
|
||||
|
||||
for k, v := range secretPayload {
|
||||
payload[k] = v
|
||||
}
|
||||
} else if source.ConfigMap != nil {
|
||||
optional := source.ConfigMap.Optional != nil && *source.ConfigMap.Optional
|
||||
configMap, err := kubeClient.Core().ConfigMaps(s.pod.Namespace).Get(source.ConfigMap.Name, metav1.GetOptions{})
|
||||
configMap, err := s.plugin.getConfigMap(s.pod.Namespace, source.ConfigMap.Name)
|
||||
if err != nil {
|
||||
if !(errors.IsNotFound(err) && optional) {
|
||||
glog.Errorf("Couldn't get configMap %v/%v: %v", s.pod.Namespace, source.ConfigMap.Name, err)
|
||||
@ -274,6 +275,7 @@ func (s *projectedVolumeMounter) collectData() (map[string]volumeutil.FileProjec
|
||||
}
|
||||
configMapPayload, err := configmap.MakePayload(source.ConfigMap.Items, configMap, s.source.DefaultMode, optional)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get configMap payload %v/%v: %v", s.pod.Namespace, source.ConfigMap.Name, err)
|
||||
errlist = append(errlist, err)
|
||||
continue
|
||||
}
|
||||
|
@ -505,7 +505,8 @@ func TestCollectDataWithConfigMap(t *testing.T) {
|
||||
sources: source.Sources,
|
||||
podUID: pod.UID,
|
||||
plugin: &projectedPlugin{
|
||||
host: host,
|
||||
host: host,
|
||||
getConfigMap: host.GetConfigMapFunc(),
|
||||
},
|
||||
},
|
||||
source: *source,
|
||||
|
@ -198,7 +198,7 @@ func (b *secretVolumeMounter) SetUpAt(dir string, fsGroup *types.UnixGroupID) er
|
||||
secret, err := b.getSecret(b.pod.Namespace, b.source.SecretName)
|
||||
if err != nil {
|
||||
if !(errors.IsNotFound(err) && optional) {
|
||||
glog.Errorf("Couldn't get secret %v/%v", b.pod.Namespace, b.source.SecretName)
|
||||
glog.Errorf("Couldn't get secret %v/%v: %v", b.pod.Namespace, b.source.SecretName, err)
|
||||
return err
|
||||
}
|
||||
secret = &v1.Secret{
|
||||
|
@ -134,6 +134,12 @@ func (f *fakeVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secre
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
|
||||
return func(namespace, name string) (*v1.ConfigMap, error) {
|
||||
return f.kubeClient.Core().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeVolumeHost) GetNodeLabels() (map[string]string, error) {
|
||||
return map[string]string{"test-label": "test-value"}, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user