diff --git a/pkg/controller/node/cidr_allocator.go b/pkg/controller/node/cidr_allocator.go
index 4bb4a2503d9..f05792a1c43 100644
--- a/pkg/controller/node/cidr_allocator.go
+++ b/pkg/controller/node/cidr_allocator.go
@@ -42,6 +42,7 @@ type rangeAllocator struct {
     maxCIDRs        int
     used            big.Int
     lock            sync.Mutex
+    nextCandidate   int
 }
 
 // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
@@ -56,6 +57,7 @@ func NewCIDRRangeAllocator(clusterCIDR *net.IPNet, subNetMaskSize int) CIDRAlloc
         clusterMaskSize: clusterMaskSize,
         subNetMaskSize:  subNetMaskSize,
         maxCIDRs:        1 << uint32(subNetMaskSize-clusterMaskSize),
+        nextCandidate:   0,
     }
     return ra
 }
@@ -66,14 +68,16 @@ func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) {
 
     nextUnused := -1
     for i := 0; i < r.maxCIDRs; i++ {
-        if r.used.Bit(i) == 0 {
-            nextUnused = i
+        candidate := (i + r.nextCandidate) % r.maxCIDRs
+        if r.used.Bit(candidate) == 0 {
+            nextUnused = candidate
             break
         }
     }
     if nextUnused == -1 {
         return nil, errCIDRRangeNoCIDRsRemaining
     }
+    r.nextCandidate = (nextUnused + 1) % r.maxCIDRs
 
     r.used.SetBit(&r.used, nextUnused, 1)
 
@@ -138,7 +142,6 @@ func (r *rangeAllocator) Occupy(cidr *net.IPNet) (err error) {
     r.lock.Lock()
     defer r.lock.Unlock()
 
-
     for i := begin; i <= end; i++ {
         r.used.SetBit(&r.used, i, 1)
     }
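The allocator change above keeps a cursor, nextCandidate, so each AllocateNext call resumes scanning where the previous call stopped instead of re-walking the already-allocated prefix from bit 0. A minimal standalone sketch of the same round-robin bitmap scan (the bitmapAllocator type and its method names are illustrative, not part of the patch):

package main

import (
    "errors"
    "fmt"
    "math/big"
)

// bitmapAllocator mirrors the patched rangeAllocator scan: a bitmap of
// used slots plus a cursor recording where the next search should start.
type bitmapAllocator struct {
    used big.Int
    max  int
    next int
}

var errRangeFull = errors.New("no free indices remaining")

// allocateNext probes at most max bits, starting at the cursor and
// wrapping, then parks the cursor just past the slot it handed out.
func (a *bitmapAllocator) allocateNext() (int, error) {
    for i := 0; i < a.max; i++ {
        candidate := (i + a.next) % a.max
        if a.used.Bit(candidate) == 0 {
            a.next = (candidate + 1) % a.max
            a.used.SetBit(&a.used, candidate, 1)
            return candidate, nil
        }
    }
    return 0, errRangeFull
}

// release frees a slot; the cursor is left alone, so the slot is not
// handed out again until the scan wraps back around to it.
func (a *bitmapAllocator) release(i int) {
    a.used.SetBit(&a.used, i, 0)
}

func main() {
    a := &bitmapAllocator{max: 4}
    for i := 0; i < 4; i++ {
        n, _ := a.allocateNext()
        fmt.Println(n) // prints 0, 1, 2, 3 without rescanning from bit 0
    }
    a.release(1)
    n, _ := a.allocateNext()
    fmt.Println(n) // the scan wraps around and finds 1 again
}

A freed slot is therefore not reused immediately, which also spaces out reallocation of recently released CIDRs.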
diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index 2d3998bdc31..29423eec8bb 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -61,6 +61,9 @@ const (
     podCIDRUpdateRetry = 5
     // controls how often NodeController will try to evict Pods from non-responsive Nodes.
     nodeEvictionPeriod = 100 * time.Millisecond
+    // controls how many NodeSpec updates NC can process at any moment.
+    cidrUpdateWorkers   = 10
+    cidrUpdateQueueSize = 5000
 )
 
 type nodeStatusData struct {
@@ -69,6 +72,11 @@ type nodeStatusData struct {
     status         api.NodeStatus
 }
 
+type nodeAndCIDR struct {
+    nodeName string
+    cidr     *net.IPNet
+}
+
 type NodeController struct {
     allocateNodeCIDRs bool
     cloud             cloudprovider.Interface
@@ -134,6 +142,8 @@ type NodeController struct {
     // It is enabled when all Nodes observed by the NodeController are NotReady and disabled
     // when NC sees any healthy Node. This is a temporary fix for v1.3.
     networkSegmentationMode bool
+
+    nodeCIDRUpdateChannel chan nodeAndCIDR
 }
 
 // NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -196,6 +206,7 @@ func NewNodeController(
         allocateNodeCIDRs:         allocateNodeCIDRs,
         forcefullyDeletePod:       func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
         nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
+        nodeCIDRUpdateChannel:     make(chan nodeAndCIDR, cidrUpdateQueueSize),
     }
 
     nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer(
@@ -340,6 +351,24 @@ func (nc *NodeController) Run(period time.Duration) {
         }, nodeEvictionPeriod, wait.NeverStop)
 
     go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
+
+    for i := 0; i < cidrUpdateWorkers; i++ {
+        go func(stopChan <-chan struct{}) {
+            for {
+                select {
+                case workItem, ok := <-nc.nodeCIDRUpdateChannel:
+                    if !ok {
+                        glog.Warning("NodeCIDRUpdateChannel read returned false.")
+                        return
+                    }
+                    nc.updateCIDRAllocation(workItem)
+                case <-stopChan:
+                    glog.V(0).Info("StopChannel is closed.")
+                    return
+                }
+            }
+        }(wait.NeverStop)
+    }
 }
 
 func (nc *NodeController) filterOutServiceRange() {
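Run now fans Node.Spec.PodCIDR updates out to a fixed worker pool: cidrUpdateWorkers goroutines receive from the buffered nodeCIDRUpdateChannel, so one slow API call holds up a single worker rather than the whole sync pass, and the cidrUpdateQueueSize buffer bounds the backlog before producers block. A self-contained sketch of the same fan-out pattern, assuming a plain string work item and a sync.WaitGroup in place of the controller plumbing:

package main

import (
    "fmt"
    "sync"
)

func main() {
    const workers, queueSize = 10, 5000

    // Buffered channel as the work queue; sends block only once
    // queueSize items are waiting, which bounds memory use.
    queue := make(chan string, queueSize)
    stop := make(chan struct{}) // stands in for wait.NeverStop

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case item, ok := <-queue:
                    if !ok {
                        return // queue closed and drained
                    }
                    // Stands in for nc.updateCIDRAllocation(workItem).
                    fmt.Println("processing", item)
                case <-stop:
                    return
                }
            }
        }()
    }

    queue <- "node-1"
    queue <- "node-2"
    close(queue)
    wg.Wait()
}

Note that allocateOrOccupyCIDR below sends on the channel without a select/default, so if the queue ever fills up the informer callback blocks until a worker drains an item; with a 5000-entry buffer that should be rare in practice.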
@@ -352,11 +381,35 @@ func (nc *NodeController) filterOutServiceRange() {
     }
 }
 
+func (nc *NodeController) updateCIDRAllocation(data nodeAndCIDR) {
+    var err error
+    var node *api.Node
+    for rep := 0; rep < podCIDRUpdateRetry; rep++ {
+        node, err = nc.kubeClient.Core().Nodes().Get(data.nodeName)
+        if err != nil {
+            glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
+            continue
+        }
+        node.Spec.PodCIDR = data.cidr.String()
+        if _, err = nc.kubeClient.Core().Nodes().Update(node); err != nil {
+            glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
+        } else {
+            break
+        }
+    }
+    if err != nil {
+        nc.recordNodeStatusChange(node, "CIDRAssignmentFailed")
+        glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR.", data.nodeName, err)
+        if releaseErr := nc.cidrAllocator.Release(data.cidr); releaseErr != nil {
+            glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
+        }
+    }
+}
+
 // allocateOrOccupyCIDR looks at each new observed node, assigns it a valid CIDR
 // if it doesn't currently have one, or marks the CIDR as used if the node already has one.
 func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {
     node := obj.(*api.Node)
-
     if node.Spec.PodCIDR != "" {
         _, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
         if err != nil {
@@ -369,31 +422,16 @@ func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {
         }
         return
     }
-
     podCIDR, err := nc.cidrAllocator.AllocateNext()
     if err != nil {
         nc.recordNodeStatusChange(node, "CIDRNotAvailable")
         return
     }
-
-    glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
-    for rep := 0; rep < podCIDRUpdateRetry; rep++ {
-        node.Spec.PodCIDR = podCIDR.String()
-        if _, err := nc.kubeClient.Core().Nodes().Update(node); err != nil {
-            glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
-        } else {
-            break
-        }
-        node, err = nc.kubeClient.Core().Nodes().Get(node.Name)
-        if err != nil {
-            glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", node.Name, err)
-            break
-        }
-    }
-    if err != nil {
-        glog.Errorf("Update PodCIDR of node %v from NodeController exceeds retry count.", node.Name)
-        nc.recordNodeStatusChange(node, "CIDRAssignmentFailed")
-        glog.Errorf("CIDR assignment for node %v failed: %v", node.Name, err)
+    glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
+    nc.nodeCIDRUpdateChannel <- nodeAndCIDR{
+        nodeName: node.Name,
+        cidr:     podCIDR,
+    }
 }
 
@@ -450,6 +488,7 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
     // delete terminating pods that have been scheduled on
     // nonexistent nodes
     if !found {
+        glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
         utilruntime.HandleError(nc.forcefullyDeletePod(pod))
         return
     }
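For reference, updateCIDRAllocation's retry loop follows the usual get-modify-update shape: re-read the object before every write so each attempt runs against the server's latest copy, and fall through to the failure path (record the event, release the CIDR) only once all podCIDRUpdateRetry attempts are spent. A toy sketch of that cycle; the client type and its get/update methods are hypothetical stand-ins, not the real client-go API:

package main

import (
    "errors"
    "fmt"
)

type node struct{ podCIDR string }

// client is a hypothetical stand-in for kubeClient.Core().Nodes():
// get returns the current object, update fails a few times to
// simulate write conflicts.
type client struct {
    stored   node
    failures int
}

func (c *client) get() (node, error) { return c.stored, nil }

func (c *client) update(n node) error {
    if c.failures > 0 {
        c.failures--
        return errors.New("conflict, please retry")
    }
    c.stored = n
    return nil
}

const retries = 5 // mirrors podCIDRUpdateRetry

// assign follows updateCIDRAllocation's shape: refetch, mutate the
// fresh copy, try the write, and keep the last error for the caller.
func assign(c *client, cidr string) error {
    var err error
    for rep := 0; rep < retries; rep++ {
        var n node
        if n, err = c.get(); err != nil {
            continue // could not even read; try again
        }
        n.podCIDR = cidr
        if err = c.update(n); err == nil {
            return nil
        }
    }
    // The caller releases the CIDR back to the allocator on this path.
    return fmt.Errorf("assignment failed after %d attempts: %v", retries, err)
}

func main() {
    c := &client{failures: 2}
    fmt.Println(assign(c, "10.244.1.0/24"), c.stored.podCIDR)
}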