From 6d77f53af438ea3991c6f605bce29eb5e7f868b1 Mon Sep 17 00:00:00 2001
From: Seth Jennings
Date: Wed, 20 Jul 2016 14:59:33 -0500
Subject: [PATCH 1/2] refactor maybeDeleteTerminatingPod

---
 pkg/controller/node/controller_utils.go    | 12 ++++++------
 pkg/controller/node/nodecontroller.go      |  4 ++--
 pkg/controller/node/nodecontroller_test.go |  4 ++--
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/pkg/controller/node/controller_utils.go b/pkg/controller/node/controller_utils.go
index 3d4eb5aa840..215b811cf05 100644
--- a/pkg/controller/node/controller_utils.go
+++ b/pkg/controller/node/controller_utils.go
@@ -139,7 +139,7 @@ func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, force

 // maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
 // that should not be gracefully terminated.
-func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
+func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
     pod, ok := obj.(*api.Pod)
     if !ok {
         return
@@ -152,11 +152,11 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore c

     // delete terminating pods that have not yet been scheduled
     if len(pod.Spec.NodeName) == 0 {
-        utilruntime.HandleError(forcefulDeletePodFunc(pod))
+        utilruntime.HandleError(nc.forcefullyDeletePod(pod))
         return
     }

-    nodeObj, found, err := nodeStore.GetByKey(pod.Spec.NodeName)
+    nodeObj, found, err := nc.nodeStore.Store.GetByKey(pod.Spec.NodeName)
     if err != nil {
         // this can only happen if the Store.KeyFunc has a problem creating
         // a key for the pod. If it happens once, it will happen again so
@@ -169,7 +169,7 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore c
     // nonexistent nodes
     if !found {
         glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
-        utilruntime.HandleError(forcefulDeletePodFunc(pod))
+        utilruntime.HandleError(nc.forcefullyDeletePod(pod))
         return
     }

@@ -182,11 +182,11 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore c
     v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
     if err != nil {
         glog.V(0).Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
-        utilruntime.HandleError(forcefulDeletePodFunc(pod))
+        utilruntime.HandleError(nc.forcefullyDeletePod(pod))
         return
     }
     if gracefulDeletionVersion.GT(v) {
-        utilruntime.HandleError(forcefulDeletePodFunc(pod))
+        utilruntime.HandleError(nc.forcefullyDeletePod(pod))
         return
     }
 }
diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index b7a96fc4989..ccd4a9f1c24 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -222,10 +222,10 @@ func NewNodeController(
         controller.NoResyncPeriodFunc(),
         framework.ResourceEventHandlerFuncs{
             AddFunc: func(obj interface{}) {
-                nc.maybeDeleteTerminatingPod(obj, nc.nodeStore.Store, nc.forcefullyDeletePod)
+                nc.maybeDeleteTerminatingPod(obj)
             },
             UpdateFunc: func(_, obj interface{}) {
-                nc.maybeDeleteTerminatingPod(obj, nc.nodeStore.Store, nc.forcefullyDeletePod)
+                nc.maybeDeleteTerminatingPod(obj)
             },
         },
         // We don't need to build a index for podStore here actually, but build one for consistency.
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index 67d3d581df0..82ee932e826 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -1288,12 +1288,12 @@ func TestCheckPod(t *testing.T) {

     for i, tc := range tcs {
         var deleteCalls int
-        forcefullyDeletePodsFunc := func(_ *api.Pod) error {
+        nc.forcefullyDeletePod = func(_ *api.Pod) error {
             deleteCalls++
             return nil
         }

-        nc.maybeDeleteTerminatingPod(&tc.pod, nc.nodeStore.Store, forcefullyDeletePodsFunc)
+        nc.maybeDeleteTerminatingPod(&tc.pod)

         if tc.prune && deleteCalls != 1 {
             t.Errorf("[%v] expected number of delete calls to be 1 but got %v", i, deleteCalls)

From db6026c82a8f7ce5a19e9aeb327e517d95c0ad7c Mon Sep 17 00:00:00 2001
From: Seth Jennings
Date: Wed, 20 Jul 2016 15:26:07 -0500
Subject: [PATCH 2/2] node controller use shared pod informer

---
 .../app/controllermanager.go               |  2 +-
 .../controllermanager/controllermanager.go  |  2 +-
 pkg/controller/node/nodecontroller.go       | 66 ++++++++++++-------
 pkg/controller/node/nodecontroller_test.go  | 14 ++--
 4 files changed, 50 insertions(+), 34 deletions(-)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 5bc186de941..6df33132166 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -239,7 +239,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
     if err != nil {
         glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
     }
-    nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
+    nodeController, err := nodecontroller.NewNodeController(podInformer, cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
         s.PodEvictionTimeout.Duration, s.DeletingPodsQps,
         s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration,
         clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go
index 7adad2caf79..7f555a0c118 100644
--- a/contrib/mesos/pkg/controllermanager/controllermanager.go
+++ b/contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -153,7 +153,7 @@ func (s *CMServer) Run(_ []string) error {
     }
     _, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
     _, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
-    nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
+    nodeController, err := nodecontroller.NewNodeControllerFromClient(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
         s.PodEvictionTimeout.Duration, s.DeletingPodsQps,
         s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration,
         clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
     if err != nil {
diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index ccd4a9f1c24..b57aba7b5b3 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -34,6 +34,7 @@ import (
     "k8s.io/kubernetes/pkg/cloudprovider"
     "k8s.io/kubernetes/pkg/controller"
     "k8s.io/kubernetes/pkg/controller/framework"
+    "k8s.io/kubernetes/pkg/controller/framework/informers"
     "k8s.io/kubernetes/pkg/fields"
     "k8s.io/kubernetes/pkg/labels"
     "k8s.io/kubernetes/pkg/runtime"
@@ -124,7 +125,7 @@ type NodeController struct {
     maximumGracePeriod time.Duration
     recorder           record.EventRecorder
     // Pod framework and store
-    podController *framework.Controller
+    podController framework.ControllerInterface
     podStore      cache.StoreToPodLister
     // Node framework and store
     nodeController *framework.Controller
@@ -140,6 +141,13 @@ type NodeController struct {
     computeZoneStateFunc func(nodeConditions []*api.NodeCondition) zoneState

     zoneStates map[string]zoneState
+
+    // internalPodInformer is used to hold a personal informer. If we're using
+    // a normal shared informer, then the informer will be started for us. If
+    // we have a personal informer, we must start it ourselves. If you start
+    // the controller using NewNodeController (passing a SharedInformer), this
+    // will be nil.
+    internalPodInformer framework.SharedIndexInformer
 }

 // NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -147,6 +155,7 @@ type NodeController struct {
 // podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
 // currently, this should be handled as a fatal error.
 func NewNodeController(
+    podInformer framework.SharedIndexInformer,
     cloud cloudprovider.Interface,
     kubeClient clientset.Interface,
     podEvictionTimeout time.Duration,
@@ -209,30 +218,12 @@ func NewNodeController(
         zoneStates: make(map[string]zoneState),
     }

-    nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer(
-        &cache.ListWatch{
-            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-                return nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-            },
-            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-                return nc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
-            },
-        },
-        &api.Pod{},
-        controller.NoResyncPeriodFunc(),
-        framework.ResourceEventHandlerFuncs{
-            AddFunc: func(obj interface{}) {
-                nc.maybeDeleteTerminatingPod(obj)
-            },
-            UpdateFunc: func(_, obj interface{}) {
-                nc.maybeDeleteTerminatingPod(obj)
-            },
-        },
-        // We don't need to build a index for podStore here actually, but build one for consistency.
-        // It will ensure that if people start making use of the podStore in more specific ways,
-        // they'll get the benefits they expect. It will also reserve the name for future refactorings.
-        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-    )
+    podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
+        AddFunc:    nc.maybeDeleteTerminatingPod,
+        UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
+    })
+    nc.podStore.Indexer = podInformer.GetIndexer()
+    nc.podController = podInformer.GetController()

     nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{}
     if nc.allocateNodeCIDRs {
@@ -337,11 +328,36 @@ func NewNodeController(
     return nc, nil
 }

+func NewNodeControllerFromClient(
+    cloud cloudprovider.Interface,
+    kubeClient clientset.Interface,
+    podEvictionTimeout time.Duration,
+    evictionLimiterQPS float32,
+    nodeMonitorGracePeriod time.Duration,
+    nodeStartupGracePeriod time.Duration,
+    nodeMonitorPeriod time.Duration,
+    clusterCIDR *net.IPNet,
+    serviceCIDR *net.IPNet,
+    nodeCIDRMaskSize int,
+    allocateNodeCIDRs bool) (*NodeController, error) {
+    podInformer := informers.CreateSharedPodIndexInformer(kubeClient, controller.NoResyncPeriodFunc())
+    nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, nodeMonitorGracePeriod,
+        nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
+    if err != nil {
+        return nil, err
+    }
+    nc.internalPodInformer = podInformer
+    return nc, nil
+}
+
 // Run starts an asynchronous loop that monitors the status of cluster nodes.
 func (nc *NodeController) Run(period time.Duration) {
     go nc.nodeController.Run(wait.NeverStop)
     go nc.podController.Run(wait.NeverStop)
     go nc.daemonSetController.Run(wait.NeverStop)
+    if nc.internalPodInformer != nil {
+        go nc.internalPodInformer.Run(wait.NeverStop)
+    }

     // Incorporate the results of node status pushed from kubelet to master.
     go wait.Until(func() {

diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index 82ee932e826..ededa280253 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -600,7 +600,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
     }

     for _, item := range table {
-        nodeController, _ := NewNodeController(nil, item.fakeNodeHandler,
+        nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler,
             evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod,
             testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
         nodeController.now = func() unversioned.Time { return fakeNow }
@@ -673,7 +673,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
         Clientset:      fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}),
         deleteWaitChan: make(chan struct{}),
     }
-    nodeController, _ := NewNodeController(nil, fnh, 10*time.Minute,
+    nodeController, _ := NewNodeControllerFromClient(nil, fnh, 10*time.Minute,
         testRateLimiterQPS,
         testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
         testNodeMonitorPeriod, nil, nil, 0, false)
@@ -907,7 +907,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
     }

     for i, item := range table {
-        nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
+        nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
             testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
         nodeController.now = func() unversioned.Time { return fakeNow }
         if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1057,7 +1057,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
     }

     for i, item := range table {
-        nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
+        nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
             testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
         nodeController.now = func() unversioned.Time { return fakeNow }
         if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1139,7 +1139,7 @@ func TestNodeDeletion(t *testing.T) {
         Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
     }

-    nodeController, _ := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
+    nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
         testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
     nodeController.now = func() unversioned.Time { return fakeNow }
@@ -1243,7 +1243,7 @@ func TestCheckPod(t *testing.T) {
         },
     }

-    nc, _ := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
+    nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
     nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
     nc.nodeStore.Store.Add(&api.Node{
         ObjectMeta: api.ObjectMeta{
@@ -1310,7 +1310,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
         newPod("b", "bar"),
         newPod("c", "gone"),
     }
-    nc, _ := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
+    nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
     nc.nodeStore.Store.Add(newNode("foo"))
     nc.nodeStore.Store.Add(newNode("bar"))
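
Editorial note on the resulting API, with a minimal sketch (not taken from the patches): callers that already maintain a shared pod informer, such as kube-controller-manager in the first hunk of patch 2, pass it to NewNodeController, while callers without one, such as the Mesos controller manager and the tests, use NewNodeControllerFromClient, which builds a private informer, records it in internalPodInformer, and lets Run start it. In the sketch below, only the two constructor signatures and the informers/controller calls come from the diff above; the startNodeController helper, the concrete durations, and the clientset and wait import paths are assumptions based on the 1.3/1.4-era tree.

// Sketch only: illustrates the two construction paths introduced by patch 2.
// Constructor signatures are taken from the diff; helper name, durations, and
// the clientset/wait import paths are assumptions.
package example

import (
    "net"
    "time"

    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" // assumed path
    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/framework/informers"
    nodecontroller "k8s.io/kubernetes/pkg/controller/node"
    "k8s.io/kubernetes/pkg/util/wait" // assumed path
)

// startNodeController is a hypothetical helper showing both wiring styles.
func startNodeController(kubeClient clientset.Interface, clusterCIDR, serviceCIDR *net.IPNet, shareInformer bool) error {
    const monitorPeriod = 5 * time.Second // illustrative value

    if shareInformer {
        // kube-controller-manager path: the caller owns the shared pod informer,
        // hands it to the node controller, and starts it itself after the
        // controller has registered its event handlers on it.
        podInformer := informers.CreateSharedPodIndexInformer(kubeClient, controller.NoResyncPeriodFunc())
        nc, err := nodecontroller.NewNodeController(podInformer, nil /* cloud */, kubeClient,
            5*time.Minute /* podEvictionTimeout */, 0.1 /* evictionLimiterQPS */,
            40*time.Second, 60*time.Second, monitorPeriod,
            clusterCIDR, serviceCIDR, 24 /* nodeCIDRMaskSize */, false /* allocateNodeCIDRs */)
        if err != nil {
            return err
        }
        nc.Run(monitorPeriod)
        go podInformer.Run(wait.NeverStop)
        return nil
    }

    // Mesos controller-manager / test path: the convenience constructor builds a
    // private pod informer, stores it in internalPodInformer, and Run starts it.
    nc, err := nodecontroller.NewNodeControllerFromClient(nil /* cloud */, kubeClient,
        5*time.Minute, 0.1, 40*time.Second, 60*time.Second, monitorPeriod,
        clusterCIDR, serviceCIDR, 24, false)
    if err != nil {
        return err
    }
    nc.Run(monitorPeriod)
    return nil
}

The split mirrors the existing daemon set controller pattern referenced in the internalPodInformer comment: binaries that already run a shared pod informer avoid opening a second pod watch, while standalone callers keep a one-call constructor.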