Merge pull request #64752 from wojtek-t/default_to_watching_managers

Automatic merge from submit-queue (batch tested with PRs 65187, 65206, 65223, 64752, 65238). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Kubelet watches necessary secrets/configmaps instead of periodic polling
This commit is contained in:
Kubernetes Submit Queue 2018-06-21 19:48:14 -07:00 committed by GitHub
commit 96c7f3a34a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 122 additions and 5 deletions

View File

@ -96,6 +96,7 @@ KubeletConfiguration:
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
configMapAndSecretChangeDetectionStrategy: Watch
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
contentType: application/vnd.kubernetes.protobuf

View File

@ -85,6 +85,7 @@ kubeletConfiguration:
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
configMapAndSecretChangeDetectionStrategy: Watch
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
contentType: application/vnd.kubernetes.protobuf

View File

@ -82,6 +82,7 @@ kubeletConfiguration:
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
configMapAndSecretChangeDetectionStrategy: Watch
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
contentType: application/vnd.kubernetes.protobuf

View File

@ -86,6 +86,7 @@ kubeletConfiguration:
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
configMapAndSecretChangeDetectionStrategy: Watch
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
contentType: application/vnd.kubernetes.protobuf

View File

@ -81,6 +81,7 @@ kubeletConfiguration:
clusterDNS:
- 10.192.0.10
clusterDomain: cluster.global
configMapAndSecretChangeDetectionStrategy: Watch
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
contentType: application/vnd.kubernetes.protobuf

View File

@ -153,6 +153,7 @@ var (
"CgroupsPerQOS",
"ClusterDNS[*]",
"ClusterDomain",
"ConfigMapAndSecretChangeDetectionStrategy",
"ContainerLogMaxFiles",
"ContainerLogMaxSize",
"ContentType",

View File

@ -39,6 +39,23 @@ const (
HairpinNone = "none"
)
// ResourceChangeDetectionStrategy denotes a mode in which internal
// managers (secret, configmap) are discovering object changes.
type ResourceChangeDetectionStrategy string
// Enum settings for different strategies of kubelet managers.
const (
// GetChangeDetectionStrategy is a mode in which kubelet fetches
// necessary objects directly from apiserver.
GetChangeDetectionStrategy ResourceChangeDetectionStrategy = "Get"
// TTLCacheChangeDetectionStrategy is a mode in which kubelet uses
// ttl cache for object directly fetched from apiserver.
TTLCacheChangeDetectionStrategy ResourceChangeDetectionStrategy = "Cache"
// WatchChangeDetectionStrategy is a mode in which kubelet uses
// watches to observe changes to objects that are in its interest.
WatchChangeDetectionStrategy ResourceChangeDetectionStrategy = "Watch"
)
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// KubeletConfiguration contains the configuration for the Kubelet
@ -259,6 +276,8 @@ type KubeletConfiguration struct {
ContainerLogMaxSize string
// Maximum number of container log files that can be present for a container.
ContainerLogMaxFiles int32
// ConfigMapAndSecretChangeDetectionStrategy is a mode in which config map and secret managers are running.
ConfigMapAndSecretChangeDetectionStrategy ResourceChangeDetectionStrategy
/* the following fields are meant for Node Allocatable */

View File

@ -198,6 +198,9 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) {
if obj.ContainerLogMaxFiles == nil {
obj.ContainerLogMaxFiles = utilpointer.Int32Ptr(5)
}
if obj.ConfigMapAndSecretChangeDetectionStrategy == "" {
obj.ConfigMapAndSecretChangeDetectionStrategy = WatchChangeDetectionStrategy
}
if obj.EnforceNodeAllocatable == nil {
obj.EnforceNodeAllocatable = DefaultNodeAllocatableEnforcement
}

View File

@ -39,6 +39,23 @@ const (
HairpinNone = "none"
)
// ResourceChangeDetectionStrategy denotes a mode in which internal
// managers (secret, configmap) are discovering object changes.
type ResourceChangeDetectionStrategy string
// Enum settings for different strategies of kubelet managers.
const (
// GetChangeDetectionStrategy is a mode in which kubelet fetches
// necessary objects directly from apiserver.
GetChangeDetectionStrategy ResourceChangeDetectionStrategy = "Get"
// TTLCacheChangeDetectionStrategy is a mode in which kubelet uses
// ttl cache for object directly fetched from apiserver.
TTLCacheChangeDetectionStrategy ResourceChangeDetectionStrategy = "Cache"
// WatchChangeDetectionStrategy is a mode in which kubelet uses
// watches to observe changes to objects that are in its interest.
WatchChangeDetectionStrategy ResourceChangeDetectionStrategy = "Watch"
)
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// KubeletConfiguration contains the configuration for the Kubelet
@ -612,6 +629,11 @@ type KubeletConfiguration struct {
// Default: 5
// +optional
ContainerLogMaxFiles *int32 `json:"containerLogMaxFiles,omitempty"`
// ConfigMapAndSecretChangeDetectionStrategy is a mode in which
// config map and secret managers are running.
// Default: "Watching"
// +optional
ConfigMapAndSecretChangeDetectionStrategy ResourceChangeDetectionStrategy `json:"configMapAndSecretChangeDetectionStrategy,omitempty"`
/* the following fields are meant for Node Allocatable */

View File

@ -253,6 +253,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_kubeletconfig_KubeletConfigurat
if err := v1.Convert_Pointer_int32_To_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil {
return err
}
out.ConfigMapAndSecretChangeDetectionStrategy = kubeletconfig.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy)
out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved))
out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved))
out.SystemReservedCgroup = in.SystemReservedCgroup
@ -377,6 +378,7 @@ func autoConvert_kubeletconfig_KubeletConfiguration_To_v1beta1_KubeletConfigurat
if err := v1.Convert_int32_To_Pointer_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil {
return err
}
out.ConfigMapAndSecretChangeDetectionStrategy = ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy)
out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved))
out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved))
out.SystemReservedCgroup = in.SystemReservedCgroup

View File

@ -15,12 +15,14 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/kubelet/configmap",
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/core/v1:go_default_library",
"//pkg/kubelet/util/manager:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)

View File

@ -23,12 +23,14 @@ import (
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/kubelet/util/manager"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
)
type Manager interface {
@ -123,3 +125,25 @@ func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.G
manager: manager.NewCacheBasedManager(configMapStore, getConfigMapNames),
}
}
// NewWatchingConfigMapManager creates a manager that keeps a cache of all configmaps
// necessary for registered pods.
// It implements the following logic:
// - whenever a pod is created or updated, we start inidvidual watches for all
// referenced objects that aren't referenced from other registered pods
// - every GetObject() returns a value from local cache propagated via watches
func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager {
listConfigMap := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
return kubeClient.CoreV1().ConfigMaps(namespace).List(opts)
}
watchConfigMap := func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
return kubeClient.CoreV1().ConfigMaps(namespace).Watch(opts)
}
newConfigMap := func() runtime.Object {
return &v1.ConfigMap{}
}
gr := corev1.Resource("configmap")
return &configMapManager{
manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, gr, getConfigMapNames),
}
}

View File

@ -551,12 +551,25 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.cloudproviderRequestTimeout = 10 * time.Second
}
secretManager := secret.NewCachingSecretManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
klet.secretManager = secretManager
var secretManager secret.Manager
var configMapManager configmap.Manager
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
case kubeletconfiginternal.WatchChangeDetectionStrategy:
secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
secretManager = secret.NewCachingSecretManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
configMapManager = configmap.NewCachingConfigMapManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
case kubeletconfiginternal.GetChangeDetectionStrategy:
secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
default:
return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
}
configMapManager := configmap.NewCachingConfigMapManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
klet.secretManager = secretManager
klet.configMapManager = configMapManager
if klet.experimentalHostUserNamespaceDefaulting {

View File

@ -30,12 +30,14 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/kubelet/secret",
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/core/v1:go_default_library",
"//pkg/kubelet/util/manager:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)

View File

@ -23,12 +23,14 @@ import (
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/kubelet/util/manager"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
)
type Manager interface {
@ -123,3 +125,25 @@ func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetO
manager: manager.NewCacheBasedManager(secretStore, getSecretNames),
}
}
// NewWatchingSecretManager creates a manager that keeps a cache of all secrets
// necessary for registered pods.
// It implements the following logic:
// - whenever a pod is created or updated, we start inidvidual watches for all
// referenced objects that aren't referenced from other registered pods
// - every GetObject() returns a value from local cache propagated via watches
func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
listSecret := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
return kubeClient.CoreV1().Secrets(namespace).List(opts)
}
watchSecret := func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
return kubeClient.CoreV1().Secrets(namespace).Watch(opts)
}
newSecret := func() runtime.Object {
return &v1.Secret{}
}
gr := corev1.Resource("secret")
return &secretManager{
manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, gr, getSecretNames),
}
}