diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 63e89a7c61f..660e7e6b2d8 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -240,7 +240,9 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, stop < if err != nil { glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err) } - nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, client("node-controller"), + nodeController, err := nodecontroller.NewNodeController( + sharedInformers.Pods(), sharedInformers.Nodes(), sharedInformers.DaemonSets(), + cloud, client("node-controller"), s.PodEvictionTimeout.Duration, s.NodeEvictionRate, s.SecondaryNodeEvictionRate, s.LargeClusterSizeThreshold, s.UnhealthyZoneThreshold, s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 3334effd6ef..8b2720c53c7 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -26,17 +26,14 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/metrics" @@ -45,7 +42,6 @@ import ( "k8s.io/kubernetes/pkg/util/system" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" - "k8s.io/kubernetes/pkg/watch" ) func init() { @@ -135,15 +131,9 @@ type NodeController struct { // The maximum duration before a pod evicted from a node can be forcefully terminated. maximumGracePeriod time.Duration recorder record.EventRecorder - // Pod framework and store - podController cache.ControllerInterface - podStore cache.StoreToPodLister - // Node framework and store - nodeController *cache.Controller - nodeStore cache.StoreToNodeLister - // DaemonSet framework and store - daemonSetController *cache.Controller - daemonSetStore cache.StoreToDaemonSetLister + podStore cache.StoreToPodLister + nodeStore cache.StoreToNodeLister + daemonSetStore cache.StoreToDaemonSetLister // allocate/recycle CIDRs for node if allocateNodeCIDRs == true cidrAllocator CIDRAllocator @@ -172,7 +162,9 @@ type NodeController struct { // podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes // currently, this should be handled as a fatal error. func NewNodeController( - podInformer cache.SharedIndexInformer, + podInformer informers.PodInformer, + nodeInformer informers.NodeInformer, + daemonSetInformer informers.DaemonSetInformer, cloud cloudprovider.Interface, kubeClient clientset.Interface, podEvictionTimeout time.Duration, @@ -241,21 +233,25 @@ func NewNodeController( nc.enterFullDisruptionFunc = nc.HealthyQPSFunc nc.computeZoneStateFunc = nc.ComputeZoneState - podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: nc.maybeDeleteTerminatingPod, UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) }, }) - nc.podStore.Indexer = podInformer.GetIndexer() - nc.podController = podInformer.GetController() + nc.podStore = *podInformer.Lister() nodeEventHandlerFuncs := cache.ResourceEventHandlerFuncs{} if nc.allocateNodeCIDRs { nodeEventHandlerFuncs = cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - node := obj.(*api.Node) - err := nc.cidrAllocator.AllocateOrOccupyCIDR(node) + AddFunc: func(originalObj interface{}) { + obj, err := api.Scheme.DeepCopy(originalObj) if err != nil { - glog.Errorf("Error allocating CIDR: %v", err) + utilruntime.HandleError(err) + return + } + node := obj.(*api.Node) + + if err := nc.cidrAllocator.AllocateOrOccupyCIDR(node); err != nil { + utilruntime.HandleError(fmt.Errorf("Error allocating CIDR: %v", err)) } }, UpdateFunc: func(_, obj interface{}) { @@ -280,49 +276,36 @@ func NewNodeController( // state is correct. // Restart of NC fixes the issue. if node.Spec.PodCIDR == "" { - err := nc.cidrAllocator.AllocateOrOccupyCIDR(node) + nodeCopy, err := api.Scheme.Copy(node) if err != nil { - glog.Errorf("Error allocating CIDR: %v", err) + utilruntime.HandleError(err) + return + } + + if err := nc.cidrAllocator.AllocateOrOccupyCIDR(nodeCopy.(*api.Node)); err != nil { + utilruntime.HandleError(fmt.Errorf("Error allocating CIDR: %v", err)) } } }, - DeleteFunc: func(obj interface{}) { - node := obj.(*api.Node) - err := nc.cidrAllocator.ReleaseCIDR(node) + DeleteFunc: func(originalObj interface{}) { + obj, err := api.Scheme.DeepCopy(originalObj) if err != nil { + utilruntime.HandleError(err) + return + } + + node := obj.(*api.Node) + if err := nc.cidrAllocator.ReleaseCIDR(node); err != nil { glog.Errorf("Error releasing CIDR: %v", err) } }, } } - nc.nodeStore.Store, nc.nodeController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return nc.kubeClient.Core().Nodes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return nc.kubeClient.Core().Nodes().Watch(options) - }, - }, - &api.Node{}, - controller.NoResyncPeriodFunc(), - nodeEventHandlerFuncs, - ) + nodeInformer.Informer().AddEventHandler(nodeEventHandlerFuncs) + nc.nodeStore = *nodeInformer.Lister() - nc.daemonSetStore.Store, nc.daemonSetController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return nc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return nc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).Watch(options) - }, - }, - &extensions.DaemonSet{}, - controller.NoResyncPeriodFunc(), - cache.ResourceEventHandlerFuncs{}, - ) + nc.daemonSetStore = *daemonSetInformer.Lister() if allocateNodeCIDRs { var nodeList *api.NodeList @@ -351,41 +334,8 @@ func NewNodeController( return nc, nil } -func NewNodeControllerFromClient( - cloud cloudprovider.Interface, - kubeClient clientset.Interface, - podEvictionTimeout time.Duration, - evictionLimiterQPS float32, - secondaryEvictionLimiterQPS float32, - largeClusterThreshold int32, - unhealthyZoneThreshold float32, - nodeMonitorGracePeriod time.Duration, - nodeStartupGracePeriod time.Duration, - nodeMonitorPeriod time.Duration, - clusterCIDR *net.IPNet, - serviceCIDR *net.IPNet, - nodeCIDRMaskSize int, - allocateNodeCIDRs bool) (*NodeController, error) { - podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc()) - nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, secondaryEvictionLimiterQPS, - largeClusterThreshold, unhealthyZoneThreshold, nodeMonitorGracePeriod, nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, - serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs) - if err != nil { - return nil, err - } - nc.internalPodInformer = podInformer - return nc, nil -} - // Run starts an asynchronous loop that monitors the status of cluster nodes. func (nc *NodeController) Run() { - go nc.nodeController.Run(wait.NeverStop) - go nc.podController.Run(wait.NeverStop) - go nc.daemonSetController.Run(wait.NeverStop) - if nc.internalPodInformer != nil { - go nc.internalPodInformer.Run(wait.NeverStop) - } - // Incorporate the results of node status pushed from kubelet to master. go wait.Until(func() { if err := nc.monitorNodeStatus(); err != nil { diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index a578d36b24e..8449ccd1919 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -17,6 +17,7 @@ limitations under the License. package node import ( + "net" "strings" "testing" "time" @@ -26,8 +27,12 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/cloudprovider" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/wait" @@ -42,6 +47,34 @@ const ( testUnhealtyThreshold = float32(0.55) ) +func NewNodeControllerFromClient( + cloud cloudprovider.Interface, + kubeClient clientset.Interface, + podEvictionTimeout time.Duration, + evictionLimiterQPS float32, + secondaryEvictionLimiterQPS float32, + largeClusterThreshold int32, + unhealthyZoneThreshold float32, + nodeMonitorGracePeriod time.Duration, + nodeStartupGracePeriod time.Duration, + nodeMonitorPeriod time.Duration, + clusterCIDR *net.IPNet, + serviceCIDR *net.IPNet, + nodeCIDRMaskSize int, + allocateNodeCIDRs bool) (*NodeController, error) { + + factory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + + nc, err := NewNodeController(factory.Pods(), factory.Nodes(), factory.DaemonSets(), cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, secondaryEvictionLimiterQPS, + largeClusterThreshold, unhealthyZoneThreshold, nodeMonitorGracePeriod, nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, + serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs) + if err != nil { + return nil, err + } + + return nc, nil +} + func TestMonitorNodeStatusEvictPods(t *testing.T) { fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) evictionTimeout := 10 * time.Minute @@ -1701,7 +1734,7 @@ func TestCheckPod(t *testing.T) { }, } - nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false) + nc, _ := NewNodeControllerFromClient(nil, fake.NewSimpleClientset(), 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false) nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc) nc.nodeStore.Store.Add(&api.Node{ ObjectMeta: api.ObjectMeta{ diff --git a/pkg/controller/node/test_utils.go b/pkg/controller/node/test_utils.go index 72f92dab678..062294a01c1 100644 --- a/pkg/controller/node/test_utils.go +++ b/pkg/controller/node/test_utils.go @@ -202,7 +202,7 @@ func (m *FakeNodeHandler) PatchStatus(nodeName string, data []byte) (*api.Node, } func (m *FakeNodeHandler) Watch(opts api.ListOptions) (watch.Interface, error) { - return nil, nil + return watch.NewFake(), nil } func (m *FakeNodeHandler) Patch(name string, pt api.PatchType, data []byte, subresources ...string) (*api.Node, error) { diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index ec362fcdb79..927a85a7744 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -9148,7 +9148,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{ "v1.Node": { Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ - Description: "Node is a worker node in Kubernetes, formerly known as minion. Each node will have a unique identifier in the cache (i.e. in etcd).", + Description: "Node is a worker node in Kubernetes. Each node will have a unique identifier in the cache (i.e. in etcd).", Properties: map[string]spec.Schema{ "metadata": { SchemaProps: spec.SchemaProps{