diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 299c54b7cf1..5138b40f062 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -64,6 +64,7 @@ go_library( "//pkg/controller/service:go_default_library", "//pkg/controller/serviceaccount:go_default_library", "//pkg/controller/statefulset:go_default_library", + "//pkg/controller/ttl:go_default_library", "//pkg/controller/volume/attachdetach:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library", "//pkg/features:go_default_library", diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 8c22ddc1f5f..a6779771922 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -287,6 +287,7 @@ func newControllerInitializers() map[string]InitFunc { controllers["statefuleset"] = startStatefulSetController controllers["cronjob"] = startCronJobController controllers["certificatesigningrequests"] = startCSRController + controllers["ttl"] = startTTLController return controllers } diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 6c61e03aad9..8eae4791cb0 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -40,6 +40,7 @@ import ( replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" + ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl" quotainstall "k8s.io/kubernetes/pkg/quota/install" ) @@ -141,6 +142,14 @@ func startServiceAccountController(ctx ControllerContext) (bool, error) { return true, nil } +func startTTLController(ctx ControllerContext) (bool, error) { + go ttlcontroller.NewTTLController( + ctx.NewInformerFactory.Core().V1().Nodes(), + ctx.ClientBuilder.ClientOrDie("ttl-controller"), + ).Run(5, ctx.Stop) + return true, nil +} + func startGarbageCollectorController(ctx ControllerContext) (bool, error) { if !ctx.Options.EnableGarbageCollector { return false, nil diff --git a/pkg/api/helpers.go b/pkg/api/helpers.go index 7ac8d9689b3..256cfbcd4fe 100644 --- a/pkg/api/helpers.go +++ b/pkg/api/helpers.go @@ -466,6 +466,11 @@ const ( // is at-your-own-risk. Pods that attempt to set an unsafe sysctl that is not enabled for a kubelet // will fail to launch. UnsafeSysctlsPodAnnotationKey string = "security.alpha.kubernetes.io/unsafe-sysctls" + + // ObjectTTLAnnotations represents a suggestion for kubelet for how long it can cache + // an object (e.g. secret, config map) before fetching it again from apiserver. + // This annotation can be attached to node. + ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl" ) // TolerationToleratesTaint checks if the toleration tolerates the taint. diff --git a/pkg/api/v1/helpers.go b/pkg/api/v1/helpers.go index e5029021794..62b85d91f12 100644 --- a/pkg/api/v1/helpers.go +++ b/pkg/api/v1/helpers.go @@ -271,6 +271,11 @@ const ( // is at-your-own-risk. Pods that attempt to set an unsafe sysctl that is not enabled for a kubelet // will fail to launch. UnsafeSysctlsPodAnnotationKey string = "security.alpha.kubernetes.io/unsafe-sysctls" + + // ObjectTTLAnnotations represents a suggestion for kubelet for how long it can cache + // an object (e.g. 
secret, config map) before fetching it again from apiserver. + // This annotation can be attached to node. + ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl" ) // GetTolerationsFromPodAnnotations gets the json serialized tolerations data from Pod.Annotations diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index 1d70433670a..9f4848a9c7f 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -105,6 +105,7 @@ filegroup( "//pkg/controller/service:all-srcs", "//pkg/controller/serviceaccount:all-srcs", "//pkg/controller/statefulset:all-srcs", + "//pkg/controller/ttl:all-srcs", "//pkg/controller/volume/attachdetach:all-srcs", "//pkg/controller/volume/persistentvolume:all-srcs", ], diff --git a/pkg/controller/ttl/BUILD b/pkg/controller/ttl/BUILD new file mode 100644 index 00000000000..2cb311ae557 --- /dev/null +++ b/pkg/controller/ttl/BUILD @@ -0,0 +1,62 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = ["ttlcontroller.go"], + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/informers/informers_generated/core/v1:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", + "//pkg/controller:go_default_library", + "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/api/errors", + "//vendor:k8s.io/apimachinery/pkg/types", + "//vendor:k8s.io/apimachinery/pkg/util/json", + "//vendor:k8s.io/apimachinery/pkg/util/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/strategicpatch", + "//vendor:k8s.io/apimachinery/pkg/util/wait", + "//vendor:k8s.io/client-go/tools/cache", + "//vendor:k8s.io/client-go/util/workqueue", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) + +go_test( + name = "go_default_test", + srcs = ["ttlcontroller_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", + "//vendor:github.com/stretchr/testify/assert", + "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/client-go/testing", + "//vendor:k8s.io/client-go/tools/cache", + "//vendor:k8s.io/client-go/util/workqueue", + ], +) diff --git a/pkg/controller/ttl/ttlcontroller.go b/pkg/controller/ttl/ttlcontroller.go new file mode 100644 index 00000000000..2d0b921ef77 --- /dev/null +++ b/pkg/controller/ttl/ttlcontroller.go @@ -0,0 +1,295 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The TTLController sets ttl annotations on nodes, based on cluster size. 
+// The annotations are consumed by Kubelets as suggestions for how long +// it can cache objects (e.g. secrets or config maps) before refetching +// from apiserver again. +// +// TODO: This is a temporary workaround for the Kubelet not being able to +// send "watch secrets attached to pods from my node" request. Once +// sending such request will be possible, we will modify Kubelet to +// use it and get rid of this controller completely. + +package ttl + +import ( + "fmt" + "math" + "strconv" + "sync" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/json" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1" + listers "k8s.io/kubernetes/pkg/client/listers/core/v1" + "k8s.io/kubernetes/pkg/controller" + + "github.com/golang/glog" +) + +type TTLController struct { + kubeClient clientset.Interface + + // nodeStore is a local cache of nodes. + nodeStore listers.NodeLister + + // Nodes that need to be synced. + queue workqueue.RateLimitingInterface + + // Returns true if all underlying informers are synced. + hasSynced func() bool + + lock sync.RWMutex + + // Number of nodes in the cluster. + nodeCount int + + // Desired TTL for all nodes in the cluster. + desiredTTLSeconds int + + // In which interval of cluster size we currently are. + boundaryStep int +} + +func NewTTLController(nodeInformer informers.NodeInformer, kubeClient clientset.Interface) *TTLController { + ttlc := &TTLController{ + kubeClient: kubeClient, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttlcontroller"), + } + + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: ttlc.addNode, + UpdateFunc: ttlc.updateNode, + DeleteFunc: ttlc.deleteNode, + }) + + ttlc.nodeStore = listers.NewNodeLister(nodeInformer.Informer().GetIndexer()) + ttlc.hasSynced = nodeInformer.Informer().HasSynced + + return ttlc +} + +type ttlBoundary struct { + sizeMin int + sizeMax int + ttlSeconds int +} + +var ( + ttlBoundaries = []ttlBoundary{ + {sizeMin: 0, sizeMax: 100, ttlSeconds: 0}, + {sizeMin: 90, sizeMax: 500, ttlSeconds: 15}, + {sizeMin: 450, sizeMax: 1000, ttlSeconds: 30}, + {sizeMin: 900, sizeMax: 2000, ttlSeconds: 60}, + {sizeMin: 1800, sizeMax: 10000, ttlSeconds: 300}, + {sizeMin: 9000, sizeMax: math.MaxInt32, ttlSeconds: 600}, + } +) + +func (ttlc *TTLController) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer ttlc.queue.ShutDown() + + glog.Infof("Starting TTL controller") + defer glog.Infof("Shutting down TTL controller") + if !cache.WaitForCacheSync(stopCh, ttlc.hasSynced) { + return + } + + for i := 0; i < workers; i++ { + go wait.Until(ttlc.worker, time.Second, stopCh) + } + + <-stopCh +} + +func (ttlc *TTLController) addNode(obj interface{}) { + node, ok := obj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + + func() { + ttlc.lock.Lock() + defer ttlc.lock.Unlock() + ttlc.nodeCount++ + if ttlc.nodeCount > ttlBoundaries[ttlc.boundaryStep].sizeMax { + ttlc.boundaryStep++ + ttlc.desiredTTLSeconds = 
ttlBoundaries[ttlc.boundaryStep].ttlSeconds + } + }() + ttlc.enqueueNode(node) +} + +func (ttlc *TTLController) updateNode(_, newObj interface{}) { + node, ok := newObj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) + return + } + // Processing all updates of nodes guarantees that we will update + // the ttl annotation, when cluster size changes. + // We are relying on the fact that Kubelet is updating node status + // every 10s (or generally every X seconds), which means that whenever + // required, its ttl annotation should be updated within that period. + ttlc.enqueueNode(node) +} + +func (ttlc *TTLController) deleteNode(obj interface{}) { + _, ok := obj.(*v1.Node) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + _, ok = tombstone.Obj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object types: %v", obj)) + return + } + } + + func() { + ttlc.lock.Lock() + defer ttlc.lock.Unlock() + ttlc.nodeCount-- + if ttlc.nodeCount < ttlBoundaries[ttlc.boundaryStep].sizeMin { + ttlc.boundaryStep-- + ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds + } + }() + // We are not processing the node, as it no longer exists. +} + +func (ttlc *TTLController) enqueueNode(node *v1.Node) { + key, err := controller.KeyFunc(node) + if err != nil { + glog.Errorf("Couldn't get key for object %+v", node) + return + } + ttlc.queue.Add(key) +} + +func (ttlc *TTLController) worker() { + for ttlc.processItem() { + } +} + +func (ttlc *TTLController) processItem() bool { + key, quit := ttlc.queue.Get() + if quit { + return false + } + defer ttlc.queue.Done(key) + + err := ttlc.updateNodeIfNeeded(key.(string)) + if err == nil { + ttlc.queue.Forget(key) + return true + } + + ttlc.queue.AddRateLimited(key) + utilruntime.HandleError(err) + return true +} + +func (ttlc *TTLController) getDesiredTTLSeconds() int { + ttlc.lock.RLock() + defer ttlc.lock.RUnlock() + return ttlc.desiredTTLSeconds +} + +func getIntFromAnnotation(node *v1.Node, annotationKey string) (int, bool) { + if node.Annotations == nil { + return 0, false + } + annotationValue, ok := node.Annotations[annotationKey] + if !ok { + return 0, false + } + intValue, err := strconv.Atoi(annotationValue) + if err != nil { + glog.Warningf("Cannot convert the value %q with annotation key %q for the node %q", + annotationValue, annotationKey, node.Name) + return 0, false + } + return intValue, true +} + +func setIntAnnotation(node *v1.Node, annotationKey string, value int) { + if node.Annotations == nil { + node.Annotations = make(map[string]string) + } + node.Annotations[annotationKey] = strconv.Itoa(value) +} + +func (ttlc *TTLController) patchNodeWithAnnotation(node *v1.Node, annotationKey string, value int) error { + oldData, err := json.Marshal(node) + if err != nil { + return err + } + setIntAnnotation(node, annotationKey, value) + newData, err := json.Marshal(node) + if err != nil { + return err + } + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{}) + if err != nil { + return err + } + _, err = ttlc.kubeClient.Core().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes) + if err != nil { + glog.V(2).Infof("Failed to change ttl annotation for node %s: %v", node.Name, err) + return err + } + glog.V(2).Infof("Changed ttl annotation for node %s to %d seconds", node.Name, value) + return nil +} + +func (ttlc 
*TTLController) updateNodeIfNeeded(key string) error { + node, err := ttlc.nodeStore.Get(key) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + desiredTTL := ttlc.getDesiredTTLSeconds() + currentTTL, ok := getIntFromAnnotation(node, v1.ObjectTTLAnnotationKey) + if ok && currentTTL == desiredTTL { + return nil + } + + objCopy, err := api.Scheme.DeepCopy(node) + if err != nil { + return err + } + return ttlc.patchNodeWithAnnotation(objCopy.(*v1.Node), v1.ObjectTTLAnnotationKey, desiredTTL) +} diff --git a/pkg/controller/ttl/ttlcontroller_test.go b/pkg/controller/ttl/ttlcontroller_test.go new file mode 100644 index 00000000000..1493545e473 --- /dev/null +++ b/pkg/controller/ttl/ttlcontroller_test.go @@ -0,0 +1,231 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ttl + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + listers "k8s.io/kubernetes/pkg/client/listers/core/v1" + + "github.com/stretchr/testify/assert" +) + +func TestPatchNode(t *testing.T) { + testCases := []struct { + node *v1.Node + ttlSeconds int + patch string + }{ + { + node: &v1.Node{}, + ttlSeconds: 0, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"0\"}}}", + }, + { + node: &v1.Node{}, + ttlSeconds: 10, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}", + }, + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}}, + ttlSeconds: 10, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}", + }, + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}}, + ttlSeconds: 10, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}", + }, + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "0"}}}, + ttlSeconds: 10, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}", + }, + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "0", "a": "b"}}}, + ttlSeconds: 10, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}", + }, + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "10", "a": "b"}}}, + ttlSeconds: 10, + patch: "{}", + }, + } + + for i, testCase := range testCases { + fakeClient := &fake.Clientset{} + ttlController := &TTLController{ + kubeClient: fakeClient, + } + err := ttlController.patchNodeWithAnnotation(testCase.node, v1.ObjectTTLAnnotationKey, testCase.ttlSeconds) + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + actions := fakeClient.Actions() + assert.Equal(t, 1, len(actions), 
"unexpected actions: %#v", actions) + patchAction := actions[0].(core.PatchActionImpl) + assert.Equal(t, testCase.patch, string(patchAction.Patch), "%d: unexpected patch: %s", i, string(patchAction.Patch)) + } +} + +func TestUpdateNodeIfNeeded(t *testing.T) { + testCases := []struct { + node *v1.Node + desiredTTL int + patch string + }{ + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}}, + desiredTTL: 0, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"0\"}}}", + }, + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}}, + desiredTTL: 15, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"15\"}}}", + }, + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}}, + desiredTTL: 30, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"30\"}}}", + }, + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name", Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "0"}}}, + desiredTTL: 60, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"60\"}}}", + }, + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name", Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "60"}}}, + desiredTTL: 60, + patch: "", + }, + { + node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name", Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "60"}}}, + desiredTTL: 30, + patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"30\"}}}", + }, + } + + for i, testCase := range testCases { + fakeClient := &fake.Clientset{} + nodeStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + nodeStore.Add(testCase.node) + ttlController := &TTLController{ + kubeClient: fakeClient, + nodeStore: listers.NewNodeLister(nodeStore), + desiredTTLSeconds: testCase.desiredTTL, + } + if err := ttlController.updateNodeIfNeeded(testCase.node.Name); err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + actions := fakeClient.Actions() + if testCase.patch == "" { + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + } else { + assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) + patchAction := actions[0].(core.PatchActionImpl) + assert.Equal(t, testCase.patch, string(patchAction.Patch), "%d: unexpected patch: %s", i, string(patchAction.Patch)) + } + } +} + +func TestDesiredTTL(t *testing.T) { + testCases := []struct { + addNode bool + deleteNode bool + nodeCount int + desiredTTL int + boundaryStep int + expectedTTL int + }{ + { + addNode: true, + nodeCount: 0, + desiredTTL: 0, + boundaryStep: 0, + expectedTTL: 0, + }, + { + addNode: true, + nodeCount: 99, + desiredTTL: 0, + boundaryStep: 0, + expectedTTL: 0, + }, + { + addNode: true, + nodeCount: 100, + desiredTTL: 0, + boundaryStep: 0, + expectedTTL: 15, + }, + { + deleteNode: true, + nodeCount: 101, + desiredTTL: 15, + boundaryStep: 1, + expectedTTL: 15, + }, + { + deleteNode: true, + nodeCount: 91, + desiredTTL: 15, + boundaryStep: 1, + expectedTTL: 15, + }, + { + addNode: true, + nodeCount: 91, + desiredTTL: 15, + boundaryStep: 1, + expectedTTL: 15, + }, + { + deleteNode: true, + nodeCount: 90, + desiredTTL: 15, + boundaryStep: 1, + expectedTTL: 0, + }, + } + + for i, testCase := range testCases { + ttlController := &TTLController{ + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + nodeCount: testCase.nodeCount, + desiredTTLSeconds: testCase.desiredTTL, + boundaryStep: 
testCase.boundaryStep, + } + if testCase.addNode { + ttlController.addNode(&v1.Node{}) + } + if testCase.deleteNode { + ttlController.deleteNode(&v1.Node{}) + } + assert.Equal(t, testCase.expectedTTL, ttlController.getDesiredTTLSeconds(), + "%d: unexpected ttl: %d", i, ttlController.getDesiredTTLSeconds()) + } +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c2f38a54894..c6fe21cf028 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -406,11 +406,6 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub } containerRefManager := kubecontainer.NewRefManager() - secretManager, err := secret.NewCachingSecretManager(kubeDeps.KubeClient) - if err != nil { - return nil, fmt.Errorf("failed to initialize secret manager: %v", err) - } - oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder) klet := &Kubelet{ @@ -436,7 +431,6 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub recorder: kubeDeps.Recorder, cadvisor: kubeDeps.CAdvisorInterface, diskSpaceManager: diskSpaceManager, - secretManager: secretManager, cloud: kubeDeps.Cloud, autoDetectCloudProvider: (componentconfigv1alpha1.AutoDetectCloudProvider == kubeCfg.CloudProvider), nodeRef: nodeRef, @@ -471,6 +465,13 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate), } + secretManager, err := secret.NewCachingSecretManager( + kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode)) + if err != nil { + return nil, fmt.Errorf("failed to initialize secret manager: %v", err) + } + klet.secretManager = secretManager + if klet.experimentalHostUserNamespaceDefaulting { glog.Infof("Experimental host user namespace defaulting is enabled.") } diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go index 207fa853c0f..17248307544 100644 --- a/pkg/kubelet/secret/secret_manager.go +++ b/pkg/kubelet/secret/secret_manager.go @@ -18,6 +18,7 @@ package secret import ( "fmt" + "strconv" "sync" "time" @@ -32,6 +33,10 @@ import ( "k8s.io/client-go/util/clock" ) +const ( + defaultTTL = time.Minute +) + type Manager interface { // Get secret by secret namespace and name. 
GetSecret(namespace, name string) (*v1.Secret, error) @@ -67,6 +72,8 @@ func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) { func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) { } +type GetObjectTTLFunc func() (time.Duration, bool) + type objectKey struct { namespace string name string @@ -93,15 +100,18 @@ type secretStore struct { lock sync.Mutex items map[objectKey]*secretStoreItem - ttl time.Duration + + defaultTTL time.Duration + getTTL GetObjectTTLFunc } -func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, ttl time.Duration) *secretStore { +func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *secretStore { return &secretStore{ kubeClient: kubeClient, clock: clock, items: make(map[objectKey]*secretStoreItem), - ttl: ttl, + defaultTTL: ttl, + getTTL: getTTL, } } @@ -149,6 +159,31 @@ func (s *secretStore) Delete(namespace, name string) { } } +func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc { + return func() (time.Duration, bool) { + node, err := getNode() + if err != nil { + return time.Duration(0), false + } + if node != nil && node.Annotations != nil { + if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok { + if intValue, err := strconv.Atoi(value); err == nil { + return time.Duration(intValue) * time.Second, true + } + } + } + return time.Duration(0), false + } +} + +func (s *secretStore) isSecretFresh(data *secretData) bool { + secretTTL := s.defaultTTL + if ttl, ok := s.getTTL(); ok { + secretTTL = ttl + } + return s.clock.Now().Before(data.lastUpdateTime.Add(secretTTL)) +} + func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) { key := objectKey{namespace: namespace, name: name} @@ -172,7 +207,7 @@ func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) { // needed and return data. data.Lock() defer data.Unlock() - if data.err != nil || !s.clock.Now().Before(data.lastUpdateTime.Add(s.ttl)) { + if data.err != nil || !s.isSecretFresh(data) { opts := metav1.GetOptions{} if data.secret != nil && data.err == nil { // This is just a periodic refresh of a secret we successfully fetched previously. 
@@ -212,9 +247,9 @@ type cachingSecretManager struct { registeredPods map[objectKey]*v1.Pod } -func NewCachingSecretManager(kubeClient clientset.Interface) (Manager, error) { +func NewCachingSecretManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) (Manager, error) { csm := &cachingSecretManager{ - secretStore: newSecretStore(kubeClient, clock.RealClock{}, time.Minute), + secretStore: newSecretStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL), registeredPods: make(map[objectKey]*v1.Pod), } return csm, nil diff --git a/pkg/kubelet/secret/secret_manager_test.go b/pkg/kubelet/secret/secret_manager_test.go index 1fee59897d2..b809d23b94b 100644 --- a/pkg/kubelet/secret/secret_manager_test.go +++ b/pkg/kubelet/secret/secret_manager_test.go @@ -46,9 +46,13 @@ func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist } } +func noObjectTTL() (time.Duration, bool) { + return time.Duration(0), false +} + func TestSecretStore(t *testing.T) { fakeClient := &fake.Clientset{} - store := newSecretStore(fakeClient, clock.RealClock{}, 0) + store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) store.Add("ns1", "name1") store.Add("ns2", "name2") store.Add("ns1", "name1") @@ -82,7 +86,7 @@ func TestSecretStore(t *testing.T) { func TestSecretStoreDeletingSecret(t *testing.T) { fakeClient := &fake.Clientset{} - store := newSecretStore(fakeClient, clock.RealClock{}, 0) + store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) store.Add("ns", "name") result := &v1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name", ResourceVersion: "10"}} @@ -112,7 +116,7 @@ func TestSecretStoreDeletingSecret(t *testing.T) { func TestSecretStoreGetAlwaysRefresh(t *testing.T) { fakeClient := &fake.Clientset{} fakeClock := clock.NewFakeClock(time.Now()) - store := newSecretStore(fakeClient, fakeClock, 0) + store := newSecretStore(fakeClient, fakeClock, noObjectTTL, 0) for i := 0; i < 10; i++ { store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) @@ -139,7 +143,7 @@ func TestSecretStoreGetAlwaysRefresh(t *testing.T) { func TestSecretStoreGetNeverRefresh(t *testing.T) { fakeClient := &fake.Clientset{} fakeClock := clock.NewFakeClock(time.Now()) - store := newSecretStore(fakeClient, fakeClock, time.Minute) + store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute) for i := 0; i < 10; i++ { store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) @@ -160,6 +164,131 @@ func TestSecretStoreGetNeverRefresh(t *testing.T) { assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions) } +func TestCustomTTL(t *testing.T) { + ttl := time.Duration(0) + ttlExists := false + customTTL := func() (time.Duration, bool) { + return ttl, ttlExists + } + + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Time{}) + store := newSecretStore(fakeClient, fakeClock, customTTL, time.Minute) + + store.Add("ns", "name") + store.Get("ns", "name") + fakeClient.ClearActions() + + // Set 0-ttl and see if that works. + ttl = time.Duration(0) + ttlExists = true + store.Get("ns", "name") + actions := fakeClient.Actions() + assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Set 5-minute ttl and see if this works. + ttl = time.Duration(5) * time.Minute + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Still no effect after 4 minutes. 
+ fakeClock.Step(4 * time.Minute) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Now it should have an effect. + fakeClock.Step(time.Minute) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Now remove the custom ttl and see if that works. + ttlExists = false + fakeClock.Step(55 * time.Second) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Pass the minute and it should be triggered now. + fakeClock.Step(5 * time.Second) + store.Get("ns", "name") + actions = fakeClient.Actions() + assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) +} + +func TestParseNodeAnnotation(t *testing.T) { + testCases := []struct { + node *v1.Node + err error + exists bool + ttl time.Duration + }{ + { + node: nil, + err: fmt.Errorf("error"), + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + }, + }, + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{}, + }, + }, + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "bad"}, + }, + }, + exists: false, + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "0"}, + }, + }, + exists: true, + ttl: time.Duration(0), + }, + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "60"}, + }, + }, + exists: true, + ttl: time.Minute, + }, + } + for i, testCase := range testCases { + getNode := func() (*v1.Node, error) { return testCase.node, testCase.err } + ttl, exists := GetObjectTTLFromNodeFunc(getNode)() + if exists != testCase.exists { + t.Errorf("%d: incorrect parsing: %t", i, exists) + continue + } + if exists && ttl != testCase.ttl { + t.Errorf("%d: incorrect ttl: %v", i, ttl) + } + } +} + type envSecrets struct { envVarNames []string envFromNames []string @@ -215,7 +344,7 @@ func podWithSecrets(ns, name string, toAttach secretsToAttach) *v1.Pod { func TestCacheInvalidation(t *testing.T) { fakeClient := &fake.Clientset{} fakeClock := clock.NewFakeClock(time.Now()) - store := newSecretStore(fakeClient, fakeClock, time.Minute) + store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute) manager := &cachingSecretManager{ secretStore: store, registeredPods: make(map[objectKey]*v1.Pod), @@ -273,7 +402,7 @@ func TestCacheInvalidation(t *testing.T) { func TestCacheRefcounts(t *testing.T) { fakeClient := &fake.Clientset{} fakeClock := clock.NewFakeClock(time.Now()) - store := newSecretStore(fakeClient, fakeClock, time.Minute) + store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute) manager := &cachingSecretManager{ secretStore: store, registeredPods: make(map[objectKey]*v1.Pod), @@ -349,7 +478,7 @@ func TestCacheRefcounts(t *testing.T) { func TestCachingSecretManager(t *testing.T) { fakeClient := &fake.Clientset{} - secretStore := newSecretStore(fakeClient, clock.RealClock{}, 0) + secretStore := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) manager := &cachingSecretManager{ secretStore: secretStore, registeredPods: make(map[objectKey]*v1.Pod), diff --git 
a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 4473d1e72c0..f488cb4f0a5 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -264,6 +264,13 @@ func init() { eventsRule(), }, }) + addControllerRole(rbac.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "ttl-controller"}, + Rules: []rbac.PolicyRule{ + rbac.NewRule("update", "patch", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(), + eventsRule(), + }, + }) addControllerRole(rbac.ClusterRole{ ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "certificate-controller"}, Rules: []rbac.PolicyRule{ diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-role-bindings.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-role-bindings.yaml index 759c66215cf..6f7e5dd7465 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-role-bindings.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-role-bindings.yaml @@ -315,5 +315,20 @@ items: - kind: ServiceAccount name: statefulset-controller namespace: kube-system +- apiVersion: rbac.authorization.k8s.io/v1beta1 + kind: ClusterRoleBinding + metadata: + creationTimestamp: null + labels: + kubernetes.io/bootstrapping: rbac-defaults + name: system:controller:ttl-controller + roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:controller:ttl-controller + subjects: + - kind: ServiceAccount + name: ttl-controller + namespace: kube-system kind: List metadata: {} diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index 5f5aeba044b..901ea67108a 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -905,5 +905,30 @@ items: - create - patch - update +- apiVersion: rbac.authorization.k8s.io/v1beta1 + kind: ClusterRole + metadata: + creationTimestamp: null + labels: + kubernetes.io/bootstrapping: rbac-defaults + name: system:controller:ttl-controller + rules: + - apiGroups: + - "" + resources: + - nodes + verbs: + - list + - patch + - update + - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create + - patch + - update kind: List metadata: {} diff --git a/test/integration/ttlcontroller/ttlcontroller_test.go b/test/integration/ttlcontroller/ttlcontroller_test.go new file mode 100644 index 00000000000..1cb93c7d7f5 --- /dev/null +++ b/test/integration/ttlcontroller/ttlcontroller_test.go @@ -0,0 +1,141 @@ +// +build integration,!no-etcd + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package ttlcontroller + +import ( + "fmt" + "net/http/httptest" + "strconv" + "sync" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + restclient "k8s.io/client-go/rest" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated" + listers "k8s.io/kubernetes/pkg/client/listers/core/v1" + "k8s.io/kubernetes/pkg/controller/ttl" + "k8s.io/kubernetes/test/integration/framework" +) + +func createClientAndInformers(t *testing.T, server *httptest.Server) (*clientset.Clientset, informers.SharedInformerFactory) { + config := restclient.Config{ + Host: server.URL, + QPS: 500, + Burst: 500, + } + testClient := clientset.NewForConfigOrDie(&config) + + informers := informers.NewSharedInformerFactory(nil, testClient, time.Second) + return testClient, informers +} + +func createNodes(t *testing.T, client *clientset.Clientset, startIndex, endIndex int) { + var wg sync.WaitGroup + for i := startIndex; i < endIndex; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("node-%d", idx), + }, + } + if _, err := client.Core().Nodes().Create(node); err != nil { + t.Fatalf("Failed to create node: %v", err) + } + }(i) + } + wg.Wait() +} + +func deleteNodes(t *testing.T, client *clientset.Clientset, startIndex, endIndex int) { + var wg sync.WaitGroup + for i := startIndex; i < endIndex; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + name := fmt.Sprintf("node-%d", idx) + if err := client.Core().Nodes().Delete(name, &metav1.DeleteOptions{}); err != nil { + t.Fatalf("Failed to delete node: %v", err) + } + }(i) + } + wg.Wait() +} + +func waitForNodesWithTTLAnnotation(t *testing.T, nodeLister listers.NodeLister, numNodes, ttlSeconds int) { + if err := wait.Poll(time.Second, 30*time.Second, func() (bool, error) { + nodes, err := nodeLister.List(labels.Everything()) + if err != nil || len(nodes) != numNodes { + return false, nil + } + for _, node := range nodes { + if node.Annotations == nil { + return false, nil + } + value, ok := node.Annotations[v1.ObjectTTLAnnotationKey] + if !ok { + return false, nil + } + currentTTL, err := strconv.Atoi(value) + if err != nil || currentTTL != ttlSeconds { + return false, nil + } + } + return true, nil + }); err != nil { + t.Fatalf("Failed waiting for all nodes with annotation: %v", err) + } +} + +// Test whether the ttlcontroller sets the correct ttl annotations. +func TestTTLAnnotations(t *testing.T) { + _, server := framework.RunAMaster(nil) + defer server.Close() + + testClient, informers := createClientAndInformers(t, server) + nodeInformer := informers.Core().V1().Nodes() + ttlc := ttl.NewTTLController(nodeInformer, testClient) + + stopCh := make(chan struct{}) + defer close(stopCh) + go nodeInformer.Informer().Run(stopCh) + go ttlc.Run(1, stopCh) + + // Create 100 nodes; all of them should get a ttl annotation of 0. + createNodes(t, testClient, 0, 100) + waitForNodesWithTTLAnnotation(t, informers.Core().V1().Nodes().Lister(), 100, 0) + + // Create 1 more node; the annotation on all nodes should change to 15. + createNodes(t, testClient, 100, 101) + waitForNodesWithTTLAnnotation(t, informers.Core().V1().Nodes().Lister(), 101, 15) + + // Delete 11 nodes; the annotation should still remain at 15.
+ deleteNodes(t, testClient, 90, 101) + waitForNodesWithTTLAnnotation(t, informers.Core().V1().Nodes().Lister(), 90, 15) + + // Delete 1 more node; the annotation should drop back to 0. + deleteNodes(t, testClient, 89, 90) + waitForNodesWithTTLAnnotation(t, informers.Core().V1().Nodes().Lister(), 89, 0) +}
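A minimal, standalone sketch of the two halves of this change, for illustration only; it is not part of the patch, and the names sizeTracker, effectiveTTL and isFresh are invented here. The first half mirrors the ttlBoundaries table and the addNode/deleteNode bookkeeping in ttlcontroller.go: because consecutive boundaries overlap (each step's sizeMin is below the previous step's sizeMax), the desired TTL only steps up after the node count exceeds sizeMax and only steps down after it falls below sizeMin, so the controller does not repeatedly re-patch every node when the cluster hovers around a threshold. The second half mirrors how the kubelet's secret store resolves the effective cache TTL: the node.alpha.kubernetes.io/ttl annotation when present and parsable, otherwise the one-minute defaultTTL.

package main

import (
	"fmt"
	"math"
	"strconv"
	"time"
)

// Controller side: cluster size -> suggested TTL (mirrors ttlcontroller.go).
type ttlBoundary struct {
	sizeMin    int
	sizeMax    int
	ttlSeconds int
}

// Same table as ttlBoundaries in the controller; the overlap between steps provides hysteresis.
var ttlBoundaries = []ttlBoundary{
	{sizeMin: 0, sizeMax: 100, ttlSeconds: 0},
	{sizeMin: 90, sizeMax: 500, ttlSeconds: 15},
	{sizeMin: 450, sizeMax: 1000, ttlSeconds: 30},
	{sizeMin: 900, sizeMax: 2000, ttlSeconds: 60},
	{sizeMin: 1800, sizeMax: 10000, ttlSeconds: 300},
	{sizeMin: 9000, sizeMax: math.MaxInt32, ttlSeconds: 600},
}

// sizeTracker is an invented stand-in for the nodeCount/boundaryStep fields of TTLController
// (the real controller guards them with a mutex and patches the result onto nodes).
type sizeTracker struct {
	nodeCount    int
	boundaryStep int
}

// addNode mirrors TTLController.addNode: step up only after crossing the current sizeMax.
func (t *sizeTracker) addNode() {
	t.nodeCount++
	if t.nodeCount > ttlBoundaries[t.boundaryStep].sizeMax {
		t.boundaryStep++
	}
}

// deleteNode mirrors TTLController.deleteNode: step down only after dropping below the current sizeMin.
func (t *sizeTracker) deleteNode() {
	t.nodeCount--
	if t.nodeCount < ttlBoundaries[t.boundaryStep].sizeMin {
		t.boundaryStep--
	}
}

func (t *sizeTracker) desiredTTLSeconds() int {
	return ttlBoundaries[t.boundaryStep].ttlSeconds
}

// Kubelet side: annotation -> cache freshness (mirrors GetObjectTTLFromNodeFunc and isSecretFresh).
const defaultTTL = time.Minute

// effectiveTTL returns the TTL suggested by the node annotation, or defaultTTL
// when the annotation is missing or unparsable.
func effectiveTTL(annotations map[string]string) time.Duration {
	if value, ok := annotations["node.alpha.kubernetes.io/ttl"]; ok {
		if seconds, err := strconv.Atoi(value); err == nil {
			return time.Duration(seconds) * time.Second
		}
	}
	return defaultTTL
}

// isFresh reports whether a secret fetched at lastUpdate is still usable at now.
func isFresh(lastUpdate, now time.Time, annotations map[string]string) bool {
	return now.Before(lastUpdate.Add(effectiveTTL(annotations)))
}

func main() {
	tr := &sizeTracker{}
	for i := 0; i < 101; i++ {
		tr.addNode()
	}
	fmt.Println("101 nodes ->", tr.desiredTTLSeconds(), "s") // 15: crossed sizeMax=100
	for i := 0; i < 11; i++ {
		tr.deleteNode()
	}
	fmt.Println(" 90 nodes ->", tr.desiredTTLSeconds(), "s") // still 15: not yet below sizeMin=90
	tr.deleteNode()
	fmt.Println(" 89 nodes ->", tr.desiredTTLSeconds(), "s") // back to 0

	// The controller patches the computed value onto each node; the kubelet reads it back.
	fetched := time.Now()
	withTTL := map[string]string{"node.alpha.kubernetes.io/ttl": "15"}
	fmt.Println(isFresh(fetched, fetched.Add(10*time.Second), withTTL)) // true: within the 15s suggestion
	fmt.Println(isFresh(fetched, fetched.Add(30*time.Second), withTTL)) // false: must refetch
	fmt.Println(isFresh(fetched, fetched.Add(30*time.Second), nil))     // true: one-minute default applies
}

Run as an ordinary Go program, this prints the same transitions the integration test above asserts (15s at 101 nodes, still 15s at 90 nodes, back to 0s at 89), and illustrates why a 0-second TTL makes every secret Get go back to the apiserver while a missing annotation falls back to the one-minute default.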