Merge pull request #40796 from wojtek-t/use_node_ttl_in_secret_manager

Automatic merge from submit-queue (batch tested with PRs 40796, 40878, 36033, 40838, 41210)

Implement TTL controller and use the ttl annotation attached to node in secret manager

For every secret attached to a pod as a volume, the Kubelet tries to refresh it every sync period. Currently the Kubelet keeps a TTL cache of its pods' secrets, with the TTL set to 1 minute. In the large clusters we are targeting (5k nodes, 30 pods/node), given that each pod has a secret associated with the ServiceAccount from its namespace, and with a large enough number of namespaces (so that on each node (almost) every pod is from a different namespace), this results in ~30 GETs per node every minute to refresh all secrets, which adds up to ~2500 QPS of GET-secret requests to the apiserver.

The apiserver cannot easily keep up with that load.
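
To spell out the arithmetic behind the estimate above (the ~30 GETs per node per minute figure assumes roughly one distinct secret, namely the ServiceAccount secret, per pod):

5000 nodes × ~30 secret GETs per node per minute ≈ 150,000 GETs per minute ≈ 2,500 GET requests per second against the apiserver.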

The desired solution would be to watch for secret changes, but for security reasons we don't want a node to watch all secrets, and it is not yet possible to watch only the secrets attached to pods scheduled to a given node.

So, as a temporary solution, we are introducing an annotation that serves as a suggestion to the Kubelet for the TTL of secrets in its cache, and a very simple controller that sets this annotation based on the cluster size (the larger the cluster, the bigger the TTL).
This workaround means that only very local changes are needed in the Kubelet, the new logic lives in a well-separated and very simple controller, and once watching "my secrets" becomes possible it will be easy to remove the controller and switch to that. In the meantime it allows us to reach our scalability goals.
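
For illustration, this is roughly the strategic merge patch the controller ends up sending to each Node once a cluster grows past 100 nodes (the annotation key and the "15" value are taken from the controller code and tests below):

{"metadata":{"annotations":{"node.alpha.kubernetes.io/ttl":"15"}}}

The Kubelet then reads node.alpha.kubernetes.io/ttl from its own Node object and uses the value, in seconds, as the cache TTL for secrets, falling back to the 1-minute default when the annotation is missing or unparsable.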

@dchen1107 @thockin @liggitt
Kubernetes Submit Queue 2017-02-10 00:04:44 -08:00 committed by GitHub
commit 8188c3cca4
16 changed files with 982 additions and 19 deletions

View File

@ -64,6 +64,7 @@ go_library(
"//pkg/controller/service:go_default_library", "//pkg/controller/service:go_default_library",
"//pkg/controller/serviceaccount:go_default_library", "//pkg/controller/serviceaccount:go_default_library",
"//pkg/controller/statefulset:go_default_library", "//pkg/controller/statefulset:go_default_library",
"//pkg/controller/ttl:go_default_library",
"//pkg/controller/volume/attachdetach:go_default_library", "//pkg/controller/volume/attachdetach:go_default_library",
"//pkg/controller/volume/persistentvolume:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/features:go_default_library", "//pkg/features:go_default_library",

View File

@ -287,6 +287,7 @@ func newControllerInitializers() map[string]InitFunc {
controllers["statefuleset"] = startStatefulSetController controllers["statefuleset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController controllers["cronjob"] = startCronJobController
controllers["certificatesigningrequests"] = startCSRController controllers["certificatesigningrequests"] = startCSRController
controllers["ttl"] = startTTLController
return controllers return controllers
} }

View File

@ -40,6 +40,7 @@ import (
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
)
@ -141,6 +142,14 @@ func startServiceAccountController(ctx ControllerContext) (bool, error) {
return true, nil
}
func startTTLController(ctx ControllerContext) (bool, error) {
go ttlcontroller.NewTTLController(
ctx.NewInformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("ttl-controller"),
).Run(5, ctx.Stop)
return true, nil
}
func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
if !ctx.Options.EnableGarbageCollector {
return false, nil

View File

@ -466,6 +466,11 @@ const (
// is at-your-own-risk. Pods that attempt to set an unsafe sysctl that is not enabled for a kubelet
// will fail to launch.
UnsafeSysctlsPodAnnotationKey string = "security.alpha.kubernetes.io/unsafe-sysctls"
// ObjectTTLAnnotationKey represents a suggestion for kubelet for how long it can cache
// an object (e.g. secret, config map) before fetching it again from apiserver.
// This annotation can be attached to a node.
ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl"
)
// TolerationToleratesTaint checks if the toleration tolerates the taint.

View File

@ -271,6 +271,11 @@ const (
// is at-your-own-risk. Pods that attempt to set an unsafe sysctl that is not enabled for a kubelet
// will fail to launch.
UnsafeSysctlsPodAnnotationKey string = "security.alpha.kubernetes.io/unsafe-sysctls"
// ObjectTTLAnnotationKey represents a suggestion for kubelet for how long it can cache
// an object (e.g. secret, config map) before fetching it again from apiserver.
// This annotation can be attached to a node.
ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl"
)
// GetTolerationsFromPodAnnotations gets the json serialized tolerations data from Pod.Annotations

View File

@ -105,6 +105,7 @@ filegroup(
"//pkg/controller/service:all-srcs", "//pkg/controller/service:all-srcs",
"//pkg/controller/serviceaccount:all-srcs", "//pkg/controller/serviceaccount:all-srcs",
"//pkg/controller/statefulset:all-srcs", "//pkg/controller/statefulset:all-srcs",
"//pkg/controller/ttl:all-srcs",
"//pkg/controller/volume/attachdetach:all-srcs", "//pkg/controller/volume/attachdetach:all-srcs",
"//pkg/controller/volume/persistentvolume:all-srcs", "//pkg/controller/volume/persistentvolume:all-srcs",
], ],

pkg/controller/ttl/BUILD (new file, 62 lines)
View File

@ -0,0 +1,62 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["ttlcontroller.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/core/v1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/json",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/strategicpatch",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/workqueue",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["ttlcontroller_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/client-go/testing",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/workqueue",
],
)

View File

@ -0,0 +1,295 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// The TTLController sets ttl annotations on nodes, based on cluster size.
// The annotations are consumed by Kubelets as suggestions for how long
// they can cache objects (e.g. secrets or config maps) before refetching
// them from the apiserver.
//
// TODO: This is a temporary workaround for the Kubelet not being able to
// send a "watch secrets attached to pods from my node" request. Once
// sending such a request becomes possible, we will modify the Kubelet to
// use it and get rid of this controller completely.
package ttl
import (
"fmt"
"math"
"strconv"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1"
listers "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog"
)
type TTLController struct {
kubeClient clientset.Interface
// nodeStore is a local cache of nodes.
nodeStore listers.NodeLister
// Nodes that need to be synced.
queue workqueue.RateLimitingInterface
// Returns true if all underlying informers are synced.
hasSynced func() bool
lock sync.RWMutex
// Number of nodes in the cluster.
nodeCount int
// Desired TTL for all nodes in the cluster.
desiredTTLSeconds int
// In which interval of cluster size we currently are.
boundaryStep int
}
func NewTTLController(nodeInformer informers.NodeInformer, kubeClient clientset.Interface) *TTLController {
ttlc := &TTLController{
kubeClient: kubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttlcontroller"),
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ttlc.addNode,
UpdateFunc: ttlc.updateNode,
DeleteFunc: ttlc.deleteNode,
})
ttlc.nodeStore = listers.NewNodeLister(nodeInformer.Informer().GetIndexer())
ttlc.hasSynced = nodeInformer.Informer().HasSynced
return ttlc
}
type ttlBoundary struct {
sizeMin int
sizeMax int
ttlSeconds int
}
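// Note: the boundaries below intentionally overlap (e.g. 0-100 and 90-500).
// addNode advances to the next step only when nodeCount exceeds sizeMax of
// the current step, and deleteNode steps back only when nodeCount drops below
// sizeMin, so a cluster hovering around a single boundary does not flap
// between two ttl values.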
var (
ttlBoundaries = []ttlBoundary{
{sizeMin: 0, sizeMax: 100, ttlSeconds: 0},
{sizeMin: 90, sizeMax: 500, ttlSeconds: 15},
{sizeMin: 450, sizeMax: 1000, ttlSeconds: 30},
{sizeMin: 900, sizeMax: 2000, ttlSeconds: 60},
{sizeMin: 1800, sizeMax: 10000, ttlSeconds: 300},
{sizeMin: 9000, sizeMax: math.MaxInt32, ttlSeconds: 600},
}
)
func (ttlc *TTLController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer ttlc.queue.ShutDown()
glog.Infof("Starting TTL controller")
defer glog.Infof("Shutting down TTL controller")
if !cache.WaitForCacheSync(stopCh, ttlc.hasSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(ttlc.worker, time.Second, stopCh)
}
<-stopCh
}
func (ttlc *TTLController) addNode(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
func() {
ttlc.lock.Lock()
defer ttlc.lock.Unlock()
ttlc.nodeCount++
if ttlc.nodeCount > ttlBoundaries[ttlc.boundaryStep].sizeMax {
ttlc.boundaryStep++
ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
}
}()
ttlc.enqueueNode(node)
}
func (ttlc *TTLController) updateNode(_, newObj interface{}) {
node, ok := newObj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
return
}
// Processing all node updates guarantees that we will update
// the ttl annotation when the cluster size changes.
// We are relying on the fact that the Kubelet updates its node status
// every 10s (or generally every X seconds), which means that whenever
// required, its ttl annotation should be updated within that period.
ttlc.enqueueNode(node)
}
func (ttlc *TTLController) deleteNode(obj interface{}) {
_, ok := obj.(*v1.Node)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
_, ok = tombstone.Obj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object types: %v", obj))
return
}
}
func() {
ttlc.lock.Lock()
defer ttlc.lock.Unlock()
ttlc.nodeCount--
if ttlc.nodeCount < ttlBoundaries[ttlc.boundaryStep].sizeMin {
ttlc.boundaryStep--
ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
}
}()
// We are not processing the node, as it no longer exists.
}
func (ttlc *TTLController) enqueueNode(node *v1.Node) {
key, err := controller.KeyFunc(node)
if err != nil {
glog.Errorf("Couldn't get key for object %+v", node)
return
}
ttlc.queue.Add(key)
}
func (ttlc *TTLController) worker() {
for ttlc.processItem() {
}
}
func (ttlc *TTLController) processItem() bool {
key, quit := ttlc.queue.Get()
if quit {
return false
}
defer ttlc.queue.Done(key)
err := ttlc.updateNodeIfNeeded(key.(string))
if err == nil {
ttlc.queue.Forget(key)
return true
}
ttlc.queue.AddRateLimited(key)
utilruntime.HandleError(err)
return true
}
func (ttlc *TTLController) getDesiredTTLSeconds() int {
ttlc.lock.RLock()
defer ttlc.lock.RUnlock()
return ttlc.desiredTTLSeconds
}
func getIntFromAnnotation(node *v1.Node, annotationKey string) (int, bool) {
if node.Annotations == nil {
return 0, false
}
annotationValue, ok := node.Annotations[annotationKey]
if !ok {
return 0, false
}
intValue, err := strconv.Atoi(annotationValue)
if err != nil {
glog.Warningf("Cannot convert the value %q with annotation key %q for the node %q",
annotationValue, annotationKey, node.Name)
return 0, false
}
return intValue, true
}
func setIntAnnotation(node *v1.Node, annotationKey string, value int) {
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[annotationKey] = strconv.Itoa(value)
}
func (ttlc *TTLController) patchNodeWithAnnotation(node *v1.Node, annotationKey string, value int) error {
oldData, err := json.Marshal(node)
if err != nil {
return err
}
setIntAnnotation(node, annotationKey, value)
newData, err := json.Marshal(node)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
if err != nil {
return err
}
_, err = ttlc.kubeClient.Core().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
if err != nil {
glog.V(2).Infof("Failed to change ttl annotation for node %s: %v", node.Name, err)
return err
}
glog.V(2).Infof("Changed ttl annotation for node %s to %d seconds", node.Name, value)
return nil
}
func (ttlc *TTLController) updateNodeIfNeeded(key string) error {
node, err := ttlc.nodeStore.Get(key)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
desiredTTL := ttlc.getDesiredTTLSeconds()
currentTTL, ok := getIntFromAnnotation(node, v1.ObjectTTLAnnotationKey)
if ok && currentTTL == desiredTTL {
return nil
}
objCopy, err := api.Scheme.DeepCopy(node)
if err != nil {
return err
}
return ttlc.patchNodeWithAnnotation(objCopy.(*v1.Node), v1.ObjectTTLAnnotationKey, desiredTTL)
}

View File

@ -0,0 +1,231 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttl
import (
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
listers "k8s.io/kubernetes/pkg/client/listers/core/v1"
"github.com/stretchr/testify/assert"
)
func TestPatchNode(t *testing.T) {
testCases := []struct {
node *v1.Node
ttlSeconds int
patch string
}{
{
node: &v1.Node{},
ttlSeconds: 0,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"0\"}}}",
},
{
node: &v1.Node{},
ttlSeconds: 10,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}},
ttlSeconds: 10,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}},
ttlSeconds: 10,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "0"}}},
ttlSeconds: 10,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "0", "a": "b"}}},
ttlSeconds: 10,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "10", "a": "b"}}},
ttlSeconds: 10,
patch: "{}",
},
}
for i, testCase := range testCases {
fakeClient := &fake.Clientset{}
ttlController := &TTLController{
kubeClient: fakeClient,
}
err := ttlController.patchNodeWithAnnotation(testCase.node, v1.ObjectTTLAnnotationKey, testCase.ttlSeconds)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
actions := fakeClient.Actions()
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
patchAction := actions[0].(core.PatchActionImpl)
assert.Equal(t, testCase.patch, string(patchAction.Patch), "%d: unexpected patch: %s", i, string(patchAction.Patch))
}
}
func TestUpdateNodeIfNeeded(t *testing.T) {
testCases := []struct {
node *v1.Node
desiredTTL int
patch string
}{
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}},
desiredTTL: 0,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"0\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}},
desiredTTL: 15,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"15\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}},
desiredTTL: 30,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"30\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name", Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "0"}}},
desiredTTL: 60,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"60\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name", Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "60"}}},
desiredTTL: 60,
patch: "",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name", Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "60"}}},
desiredTTL: 30,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"30\"}}}",
},
}
for i, testCase := range testCases {
fakeClient := &fake.Clientset{}
nodeStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeStore.Add(testCase.node)
ttlController := &TTLController{
kubeClient: fakeClient,
nodeStore: listers.NewNodeLister(nodeStore),
desiredTTLSeconds: testCase.desiredTTL,
}
if err := ttlController.updateNodeIfNeeded(testCase.node.Name); err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
actions := fakeClient.Actions()
if testCase.patch == "" {
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
} else {
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
patchAction := actions[0].(core.PatchActionImpl)
assert.Equal(t, testCase.patch, string(patchAction.Patch), "%d: unexpected patch: %s", i, string(patchAction.Patch))
}
}
}
func TestDesiredTTL(t *testing.T) {
testCases := []struct {
addNode bool
deleteNode bool
nodeCount int
desiredTTL int
boundaryStep int
expectedTTL int
}{
{
addNode: true,
nodeCount: 0,
desiredTTL: 0,
boundaryStep: 0,
expectedTTL: 0,
},
{
addNode: true,
nodeCount: 99,
desiredTTL: 0,
boundaryStep: 0,
expectedTTL: 0,
},
{
addNode: true,
nodeCount: 100,
desiredTTL: 0,
boundaryStep: 0,
expectedTTL: 15,
},
{
deleteNode: true,
nodeCount: 101,
desiredTTL: 15,
boundaryStep: 1,
expectedTTL: 15,
},
{
deleteNode: true,
nodeCount: 91,
desiredTTL: 15,
boundaryStep: 1,
expectedTTL: 15,
},
{
addNode: true,
nodeCount: 91,
desiredTTL: 15,
boundaryStep: 1,
expectedTTL: 15,
},
{
deleteNode: true,
nodeCount: 90,
desiredTTL: 15,
boundaryStep: 1,
expectedTTL: 0,
},
}
for i, testCase := range testCases {
ttlController := &TTLController{
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
nodeCount: testCase.nodeCount,
desiredTTLSeconds: testCase.desiredTTL,
boundaryStep: testCase.boundaryStep,
}
if testCase.addNode {
ttlController.addNode(&v1.Node{})
}
if testCase.deleteNode {
ttlController.deleteNode(&v1.Node{})
}
assert.Equal(t, testCase.expectedTTL, ttlController.getDesiredTTLSeconds(),
"%d: unexpected ttl: %d", i, ttlController.getDesiredTTLSeconds())
}
}

View File

@ -406,11 +406,6 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
}
containerRefManager := kubecontainer.NewRefManager()
secretManager, err := secret.NewCachingSecretManager(kubeDeps.KubeClient)
if err != nil {
return nil, fmt.Errorf("failed to initialize secret manager: %v", err)
}
oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
klet := &Kubelet{
@ -436,7 +431,6 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
recorder: kubeDeps.Recorder,
cadvisor: kubeDeps.CAdvisorInterface,
diskSpaceManager: diskSpaceManager,
secretManager: secretManager,
cloud: kubeDeps.Cloud,
autoDetectCloudProvider: (componentconfigv1alpha1.AutoDetectCloudProvider == kubeCfg.CloudProvider),
nodeRef: nodeRef,
@ -471,6 +465,13 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
}
secretManager, err := secret.NewCachingSecretManager(
kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode))
if err != nil {
return nil, fmt.Errorf("failed to initialize secret manager: %v", err)
}
klet.secretManager = secretManager
if klet.experimentalHostUserNamespaceDefaulting {
glog.Infof("Experimental host user namespace defaulting is enabled.")
}

View File

@ -18,6 +18,7 @@ package secret
import (
"fmt"
"strconv"
"sync"
"time"
@ -32,6 +33,10 @@ import (
"k8s.io/client-go/util/clock"
)
const (
defaultTTL = time.Minute
)
type Manager interface {
// Get secret by secret namespace and name.
GetSecret(namespace, name string) (*v1.Secret, error)
@ -67,6 +72,8 @@ func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) {
func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) {
}
type GetObjectTTLFunc func() (time.Duration, bool)
type objectKey struct {
namespace string
name string
@ -93,15 +100,18 @@ type secretStore struct {
lock sync.Mutex
items map[objectKey]*secretStoreItem
ttl time.Duration
defaultTTL time.Duration
getTTL GetObjectTTLFunc
}
func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, ttl time.Duration) *secretStore {
func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *secretStore {
return &secretStore{
kubeClient: kubeClient,
clock: clock,
items: make(map[objectKey]*secretStoreItem),
ttl: ttl,
defaultTTL: ttl,
getTTL: getTTL,
}
}
@ -149,6 +159,31 @@ func (s *secretStore) Delete(namespace, name string) {
}
}
func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
return func() (time.Duration, bool) {
node, err := getNode()
if err != nil {
return time.Duration(0), false
}
if node != nil && node.Annotations != nil {
if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok {
if intValue, err := strconv.Atoi(value); err == nil {
return time.Duration(intValue) * time.Second, true
}
}
}
return time.Duration(0), false
}
}
func (s *secretStore) isSecretFresh(data *secretData) bool {
secretTTL := s.defaultTTL
if ttl, ok := s.getTTL(); ok {
secretTTL = ttl
}
return s.clock.Now().Before(data.lastUpdateTime.Add(secretTTL))
}
func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) {
key := objectKey{namespace: namespace, name: name}
@ -172,7 +207,7 @@ func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) {
// needed and return data.
data.Lock()
defer data.Unlock()
if data.err != nil || !s.clock.Now().Before(data.lastUpdateTime.Add(s.ttl)) {
if data.err != nil || !s.isSecretFresh(data) {
opts := metav1.GetOptions{}
if data.secret != nil && data.err == nil {
// This is just a periodic refresh of a secret we successfully fetched previously.
@ -212,9 +247,9 @@ type cachingSecretManager struct {
registeredPods map[objectKey]*v1.Pod
}
func NewCachingSecretManager(kubeClient clientset.Interface) (Manager, error) {
func NewCachingSecretManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) (Manager, error) {
csm := &cachingSecretManager{
secretStore: newSecretStore(kubeClient, clock.RealClock{}, time.Minute),
secretStore: newSecretStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL),
registeredPods: make(map[objectKey]*v1.Pod),
}
return csm, nil

View File

@ -46,9 +46,13 @@ func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist
}
}
func noObjectTTL() (time.Duration, bool) {
return time.Duration(0), false
}
func TestSecretStore(t *testing.T) {
fakeClient := &fake.Clientset{}
store := newSecretStore(fakeClient, clock.RealClock{}, 0)
store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
store.Add("ns1", "name1")
store.Add("ns2", "name2")
store.Add("ns1", "name1")
@ -82,7 +86,7 @@ func TestSecretStore(t *testing.T) {
func TestSecretStoreDeletingSecret(t *testing.T) {
fakeClient := &fake.Clientset{}
store := newSecretStore(fakeClient, clock.RealClock{}, 0)
store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
store.Add("ns", "name")
result := &v1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name", ResourceVersion: "10"}}
@ -112,7 +116,7 @@ func TestSecretStoreDeletingSecret(t *testing.T) {
func TestSecretStoreGetAlwaysRefresh(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Now())
store := newSecretStore(fakeClient, fakeClock, 0)
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, 0)
for i := 0; i < 10; i++ {
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
@ -139,7 +143,7 @@ func TestSecretStoreGetAlwaysRefresh(t *testing.T) {
func TestSecretStoreGetNeverRefresh(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Now())
store := newSecretStore(fakeClient, fakeClock, time.Minute)
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
for i := 0; i < 10; i++ {
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
@ -160,6 +164,131 @@ func TestSecretStoreGetNeverRefresh(t *testing.T) {
assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions)
}
func TestCustomTTL(t *testing.T) {
ttl := time.Duration(0)
ttlExists := false
customTTL := func() (time.Duration, bool) {
return ttl, ttlExists
}
fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Time{})
store := newSecretStore(fakeClient, fakeClock, customTTL, time.Minute)
store.Add("ns", "name")
store.Get("ns", "name")
fakeClient.ClearActions()
// Set 0-ttl and see if that works.
ttl = time.Duration(0)
ttlExists = true
store.Get("ns", "name")
actions := fakeClient.Actions()
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
fakeClient.ClearActions()
// Set 5-minute ttl and see if this works.
ttl = time.Duration(5) * time.Minute
store.Get("ns", "name")
actions = fakeClient.Actions()
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
// Still no effect after 4 minutes.
fakeClock.Step(4 * time.Minute)
store.Get("ns", "name")
actions = fakeClient.Actions()
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
// Now it should have an effect.
fakeClock.Step(time.Minute)
store.Get("ns", "name")
actions = fakeClient.Actions()
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
fakeClient.ClearActions()
// Now remove the custom ttl and see if that works.
ttlExists = false
fakeClock.Step(55 * time.Second)
store.Get("ns", "name")
actions = fakeClient.Actions()
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
// Pass the minute and it should be triggered now.
fakeClock.Step(5 * time.Second)
store.Get("ns", "name")
actions = fakeClient.Actions()
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
}
func TestParseNodeAnnotation(t *testing.T) {
testCases := []struct {
node *v1.Node
err error
exists bool
ttl time.Duration
}{
{
node: nil,
err: fmt.Errorf("error"),
exists: false,
},
{
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node",
},
},
exists: false,
},
{
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node",
Annotations: map[string]string{},
},
},
exists: false,
},
{
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node",
Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "bad"},
},
},
exists: false,
},
{
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node",
Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "0"},
},
},
exists: true,
ttl: time.Duration(0),
},
{
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node",
Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "60"},
},
},
exists: true,
ttl: time.Minute,
},
}
for i, testCase := range testCases {
getNode := func() (*v1.Node, error) { return testCase.node, testCase.err }
ttl, exists := GetObjectTTLFromNodeFunc(getNode)()
if exists != testCase.exists {
t.Errorf("%d: incorrect parsing: %t", i, exists)
continue
}
if exists && ttl != testCase.ttl {
t.Errorf("%d: incorrect ttl: %v", i, ttl)
}
}
}
type envSecrets struct {
envVarNames []string
envFromNames []string
@ -215,7 +344,7 @@ func podWithSecrets(ns, name string, toAttach secretsToAttach) *v1.Pod {
func TestCacheInvalidation(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Now())
store := newSecretStore(fakeClient, fakeClock, time.Minute)
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
manager := &cachingSecretManager{
secretStore: store,
registeredPods: make(map[objectKey]*v1.Pod),
@ -273,7 +402,7 @@ func TestCacheInvalidation(t *testing.T) {
func TestCacheRefcounts(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Now())
store := newSecretStore(fakeClient, fakeClock, time.Minute)
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
manager := &cachingSecretManager{
secretStore: store,
registeredPods: make(map[objectKey]*v1.Pod),
@ -349,7 +478,7 @@ func TestCacheRefcounts(t *testing.T) {
func TestCachingSecretManager(t *testing.T) {
fakeClient := &fake.Clientset{}
secretStore := newSecretStore(fakeClient, clock.RealClock{}, 0)
secretStore := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
manager := &cachingSecretManager{
secretStore: secretStore,
registeredPods: make(map[objectKey]*v1.Pod),

View File

@ -264,6 +264,13 @@ func init() {
eventsRule(),
},
})
addControllerRole(rbac.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "ttl-controller"},
Rules: []rbac.PolicyRule{
rbac.NewRule("update", "patch", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
eventsRule(),
},
})
addControllerRole(rbac.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "certificate-controller"},
Rules: []rbac.PolicyRule{

View File

@ -315,5 +315,20 @@ items:
- kind: ServiceAccount
name: statefulset-controller
namespace: kube-system
- apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
creationTimestamp: null
labels:
kubernetes.io/bootstrapping: rbac-defaults
name: system:controller:ttl-controller
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: system:controller:ttl-controller
subjects:
- kind: ServiceAccount
name: ttl-controller
namespace: kube-system
kind: List
metadata: {}

View File

@ -905,5 +905,30 @@ items:
- create
- patch
- update
- apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
creationTimestamp: null
labels:
kubernetes.io/bootstrapping: rbac-defaults
name: system:controller:ttl-controller
rules:
- apiGroups:
- ""
resources:
- nodes
verbs:
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- update
kind: List
metadata: {}

View File

@ -0,0 +1,141 @@
// +build integration,!no-etcd
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttlcontroller
import (
"fmt"
"net/http/httptest"
"strconv"
"sync"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
listers "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller/ttl"
"k8s.io/kubernetes/test/integration/framework"
)
func createClientAndInformers(t *testing.T, server *httptest.Server) (*clientset.Clientset, informers.SharedInformerFactory) {
config := restclient.Config{
Host: server.URL,
QPS: 500,
Burst: 500,
}
testClient := clientset.NewForConfigOrDie(&config)
informers := informers.NewSharedInformerFactory(nil, testClient, time.Second)
return testClient, informers
}
func createNodes(t *testing.T, client *clientset.Clientset, startIndex, endIndex int) {
var wg sync.WaitGroup
for i := startIndex; i < endIndex; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("node-%d", idx),
},
}
if _, err := client.Core().Nodes().Create(node); err != nil {
t.Fatalf("Failed to create node: %v", err)
}
}(i)
}
wg.Wait()
}
func deleteNodes(t *testing.T, client *clientset.Clientset, startIndex, endIndex int) {
var wg sync.WaitGroup
for i := startIndex; i < endIndex; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
name := fmt.Sprintf("node-%d", idx)
if err := client.Core().Nodes().Delete(name, &metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed to create node: %v", err)
}
}(i)
}
wg.Wait()
}
func waitForNodesWithTTLAnnotation(t *testing.T, nodeLister listers.NodeLister, numNodes, ttlSeconds int) {
if err := wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
nodes, err := nodeLister.List(labels.Everything())
if err != nil || len(nodes) != numNodes {
return false, nil
}
for _, node := range nodes {
if node.Annotations == nil {
return false, nil
}
value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]
if !ok {
return false, nil
}
currentTTL, err := strconv.Atoi(value)
if err != nil || currentTTL != ttlSeconds {
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatalf("Failed waiting for all nodes with annotation: %v", err)
}
}
// Test whether ttlcontroller sets correct ttl annotations.
func TestTTLAnnotations(t *testing.T) {
_, server := framework.RunAMaster(nil)
defer server.Close()
testClient, informers := createClientAndInformers(t, server)
nodeInformer := informers.Core().V1().Nodes()
ttlc := ttl.NewTTLController(nodeInformer, testClient)
stopCh := make(chan struct{})
defer close(stopCh)
go nodeInformer.Informer().Run(stopCh)
go ttlc.Run(1, stopCh)
// Create 100 nodes all should have annotation equal to 0.
createNodes(t, testClient, 0, 100)
waitForNodesWithTTLAnnotation(t, informers.Core().V1().Nodes().Lister(), 100, 0)
// Create 1 more node, all annotation should change to 15.
createNodes(t, testClient, 100, 101)
waitForNodesWithTTLAnnotation(t, informers.Core().V1().Nodes().Lister(), 101, 15)
// Delete 11 nodes, it should still remain at the level of 15.
deleteNodes(t, testClient, 90, 101)
waitForNodesWithTTLAnnotation(t, informers.Core().V1().Nodes().Lister(), 90, 15)
// Delete 1 more node, all should be decreased to 0.
deleteNodes(t, testClient, 89, 90)
waitForNodesWithTTLAnnotation(t, informers.Core().V1().Nodes().Lister(), 89, 0)
}