diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 355b3dc6821..83dedabd136 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -238,10 +238,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	if err != nil {
 		glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
 	}
-	nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
+	nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
 		s.PodEvictionTimeout.Duration, s.DeletingPodsQps,
 		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
+	if err != nil {
+		glog.Fatalf("Failed to initialize nodecontroller: %v", err)
+	}
 	nodeController.Run(s.NodeSyncPeriod.Duration)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go
index 44076ace6d8..7adad2caf79 100644
--- a/contrib/mesos/pkg/controllermanager/controllermanager.go
+++ b/contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -153,9 +153,12 @@ func (s *CMServer) Run(_ []string) error {
 	}
 	_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
 	_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
-	nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
+	nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
 		s.PodEvictionTimeout.Duration, s.DeletingPodsQps,
 		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
+	if err != nil {
+		glog.Fatalf("Failed to initialize nodecontroller: %v", err)
+	}
 	nodeController.Run(s.NodeSyncPeriod.Duration)
 
 	nodeStatusUpdaterController := node.NewStatusUpdater(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-status-controller")), s.NodeMonitorPeriod.Duration, time.Now)
diff --git a/pkg/controller/node/cidr_allocator.go b/pkg/controller/node/cidr_allocator.go
index 4af5748aa9d..66cb47b5f5f 100644
--- a/pkg/controller/node/cidr_allocator.go
+++ b/pkg/controller/node/cidr_allocator.go
@@ -64,7 +64,9 @@ type rangeAllocator struct {
 
 // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
 // Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
-func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int) CIDRAllocator {
+// Caller must always pass in a list of existing nodes so the new allocator
+// can initialize its CIDR map. NodeList is only nil in testing.
+func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *api.NodeList) (CIDRAllocator, error) {
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cidrAllocator"})
 	eventBroadcaster.StartLogging(glog.Infof)
@@ -82,6 +84,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
 	} else {
 		glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
 	}
+
+	if nodeList != nil {
+		for _, node := range nodeList.Items {
+			if node.Spec.PodCIDR == "" {
+				glog.Infof("Node %v has no CIDR, ignoring", node.Name)
+				continue
+			} else {
+				glog.Infof("Node %v has CIDR %s, occupying it in CIDR map", node.Name, node.Spec.PodCIDR)
+			}
+			if err := ra.occupyCIDR(&node); err != nil {
+				// This will happen if:
+				// 1. We find garbage in the podCIDR field. Retrying is useless.
+				// 2. CIDR out of range: This means a node CIDR has changed.
+				// This error will keep crashing controller-manager.
+				return nil, err
+			}
+		}
+	}
 	for i := 0; i < cidrUpdateWorkers; i++ {
 		go func(stopChan <-chan struct{}) {
 			for {
@@ -99,21 +119,28 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
 		}(wait.NeverStop)
 	}
 
-	return ra
+	return ra, nil
+}
+
+func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
+	if node.Spec.PodCIDR == "" {
+		return nil
+	}
+	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
+	if err != nil {
+		return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
+	}
+	if err := r.cidrs.occupy(podCIDR); err != nil {
+		return fmt.Errorf("failed to mark cidr as occupied: %v", err)
+	}
+	return nil
 }
 
 // AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
 // if it doesn't currently have one or mark the CIDR as used if the node already have one.
 func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
 	if node.Spec.PodCIDR != "" {
-		_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
-		if err != nil {
-			return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
-		}
-		if err := r.cidrs.occupy(podCIDR); err != nil {
-			return fmt.Errorf("failed to mark cidr as occupied: %v", err)
-		}
-		return nil
+		return r.occupyCIDR(node)
 	}
 	podCIDR, err := r.cidrs.allocateNext()
 	if err != nil {
diff --git a/pkg/controller/node/cidr_allocator_test.go b/pkg/controller/node/cidr_allocator_test.go
index 4d946fd60f6..169448d9c77 100644
--- a/pkg/controller/node/cidr_allocator_test.go
+++ b/pkg/controller/node/cidr_allocator_test.go
@@ -128,7 +128,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
 		expectedAllocatedCIDR string
 		allocatedCIDRs        []string
 	}) {
-		allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize)
+		allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
 		// this is a bit of white box testing
 		for _, allocated := range tc.allocatedCIDRs {
 			_, cidr, err := net.ParseCIDR(allocated)
@@ -209,7 +209,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
 		subNetMaskSize int
 		allocatedCIDRs []string
 	}) {
-		allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize)
+		allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
 		// this is a bit of white box testing
 		for _, allocated := range tc.allocatedCIDRs {
 			_, cidr, err := net.ParseCIDR(allocated)
@@ -320,7 +320,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
 		allocatedCIDRs []string
 		cidrsToRelease []string
 	}) {
-		allocator := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize)
+		allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
 		// this is a bit of white box testing
 		for _, allocated := range tc.allocatedCIDRs {
 			_, cidr, err := net.ParseCIDR(allocated)
diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index face4bc5434..7fded001aad 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -34,6 +34,7 @@ import (
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
@@ -58,6 +59,8 @@ const (
 	nodeEvictionPeriod = 100 * time.Millisecond
 	// Burst value for all eviction rate limiters
 	evictionRateLimiterBurst = 1
+	// The amount of time the nodecontroller polls on the list nodes endpoint.
+	apiserverStartupGracePeriod = 10 * time.Minute
 )
 
 type zoneState string
@@ -140,6 +143,9 @@ type NodeController struct {
 }
 
 // NewNodeController returns a new node controller to sync instances from cloudprovider.
+// This method returns an error if it is unable to initialize the CIDR bitmap with
+// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
+// currently, this should be handled as a fatal error.
 func NewNodeController(
 	cloud cloudprovider.Interface,
 	kubeClient clientset.Interface,
@@ -151,7 +157,7 @@ func NewNodeController(
 	clusterCIDR *net.IPNet,
 	serviceCIDR *net.IPNet,
 	nodeCIDRMaskSize int,
-	allocateNodeCIDRs bool) *NodeController {
+	allocateNodeCIDRs bool) (*NodeController, error) {
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
 	eventBroadcaster.StartLogging(glog.Infof)
@@ -277,10 +283,30 @@ func NewNodeController(
 	)
 
 	if allocateNodeCIDRs {
-		nc.cidrAllocator = NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
+		var nodeList *api.NodeList
+		var err error
+		// We must poll because apiserver might not be up. This error causes
+		// controller manager to restart.
+		if pollErr := wait.Poll(10*time.Second, apiserverStartupGracePeriod, func() (bool, error) {
+			nodeList, err = kubeClient.Core().Nodes().List(api.ListOptions{
+				FieldSelector: fields.Everything(),
+				LabelSelector: labels.Everything(),
+			})
+			if err != nil {
+				glog.Errorf("Failed to list all nodes: %v", err)
+				return false, nil
+			}
+			return true, nil
+		}); pollErr != nil {
+			return nil, fmt.Errorf("Failed to list all nodes in %v, cannot proceed without updating CIDR map", apiserverStartupGracePeriod)
+		}
+		nc.cidrAllocator, err = NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
+		if err != nil {
+			return nil, err
+		}
 	}
 
-	return nc
+	return nc, nil
 }
 
 // Run starts an asynchronous loop that monitors the status of cluster nodes.
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index 48b73a210c5..67d3d581df0 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -600,7 +600,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(nil, item.fakeNodeHandler,
+		nodeController, _ := NewNodeController(nil, item.fakeNodeHandler,
 			evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod,
 			testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
@@ -673,7 +673,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
 		Clientset:      fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}),
 		deleteWaitChan: make(chan struct{}),
 	}
-	nodeController := NewNodeController(nil, fnh, 10*time.Minute,
+	nodeController, _ := NewNodeController(nil, fnh, 10*time.Minute,
 		testRateLimiterQPS,
 		testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
 		testNodeMonitorPeriod, nil, nil, 0, false)
@@ -907,7 +907,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 	}
 
 	for i, item := range table {
-		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
+		nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
 			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1057,7 +1057,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
 	}
 
 	for i, item := range table {
-		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
+		nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
 			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1139,7 +1139,7 @@ func TestNodeDeletion(t *testing.T) {
 		Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
 	}
 
-	nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
+	nodeController, _ := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
 		testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 	nodeController.now = func() unversioned.Time { return fakeNow }
@@ -1243,7 +1243,7 @@ func TestCheckPod(t *testing.T) {
 		},
 	}
 
-	nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
+	nc, _ := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
 	nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
 	nc.nodeStore.Store.Add(&api.Node{
 		ObjectMeta: api.ObjectMeta{
@@ -1310,7 +1310,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
 		newPod("b", "bar"),
 		newPod("c", "gone"),
 	}
-	nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
+	nc, _ := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
 	nc.nodeStore.Store.Add(newNode("foo"))
 	nc.nodeStore.Store.Add(newNode("bar"))
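
Note: the core idea of the patch, shown below as a standalone Go sketch. Before the allocator hands out any new pod CIDR, it is seeded with every CIDR that existing nodes already carry, so a controller-manager restart cannot allocate the same subnet twice. This is a simplified illustration only, not the actual Kubernetes cidrSet/rangeAllocator code; the toy node type, the fixed /24-within-/16 split, and the bitmap allocator are assumptions made for brevity.

package main

import (
	"fmt"
	"net"
)

// node is a stand-in for api.Node; only the fields the sketch needs.
type node struct {
	Name    string
	PodCIDR string // empty if no CIDR has been assigned yet
}

// rangeAllocator hands out /24 subnets from a /16 cluster CIDR and keeps a
// used-bitmap so the same subnet is never handed out twice.
type rangeAllocator struct {
	clusterCIDR *net.IPNet
	used        [256]bool // one entry per /24 inside the /16
}

// occupyCIDR marks a node's existing pod CIDR as used. It mirrors the role of
// rangeAllocator.occupyCIDR in the patch: parse errors and out-of-range CIDRs
// are returned to the caller, which treats them as fatal.
func (r *rangeAllocator) occupyCIDR(n node) error {
	if n.PodCIDR == "" {
		return nil
	}
	_, cidr, err := net.ParseCIDR(n.PodCIDR)
	if err != nil {
		return fmt.Errorf("failed to parse node %s, CIDR %s", n.Name, n.PodCIDR)
	}
	if !r.clusterCIDR.Contains(cidr.IP) {
		return fmt.Errorf("CIDR %s of node %s is outside cluster CIDR %s", n.PodCIDR, n.Name, r.clusterCIDR)
	}
	r.used[cidr.IP.To4()[2]] = true // the third octet indexes the /24
	return nil
}

// allocateNext returns the first unused /24.
func (r *rangeAllocator) allocateNext() (*net.IPNet, error) {
	base := r.clusterCIDR.IP.To4()
	for i := range r.used {
		if !r.used[i] {
			r.used[i] = true
			return &net.IPNet{IP: net.IPv4(base[0], base[1], byte(i), 0), Mask: net.CIDRMask(24, 32)}, nil
		}
	}
	return nil, fmt.Errorf("cluster CIDR %s is exhausted", r.clusterCIDR)
}

// newRangeAllocator seeds the allocator from nodes that already exist, which
// is the behaviour the patch adds to NewCIDRRangeAllocator.
func newRangeAllocator(clusterCIDR *net.IPNet, existing []node) (*rangeAllocator, error) {
	ra := &rangeAllocator{clusterCIDR: clusterCIDR}
	for _, n := range existing {
		if err := ra.occupyCIDR(n); err != nil {
			return nil, err // treated as fatal, as in the patch
		}
	}
	return ra, nil
}

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16")
	// Simulate a controller-manager restart: two nodes already hold CIDRs.
	existing := []node{
		{Name: "node-a", PodCIDR: "10.244.0.0/24"},
		{Name: "node-b", PodCIDR: "10.244.1.0/24"},
		{Name: "node-c"}, // no CIDR yet
	}
	ra, err := newRangeAllocator(clusterCIDR, existing)
	if err != nil {
		panic(err)
	}
	next, _ := ra.allocateNext()
	fmt.Println("next free pod CIDR:", next) // 10.244.2.0/24, not a duplicate
}

As in the patch, a malformed or out-of-range podCIDR is surfaced as an error and treated as fatal, since retrying cannot fix it and ignoring it could lead to double allocation. The real change additionally has NewNodeController poll the apiserver for the initial node list (wait.Poll with the 10 minute apiserverStartupGracePeriod) before constructing the allocator, because the apiserver may not be up yet when controller-manager starts.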