AllocateOrOccupyCIDR returns quickly

gmarek 2016-05-30 16:16:24 +02:00
parent d1277e34fd
commit 7cac170214
2 changed files with 64 additions and 23 deletions
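
The change in brief: allocateOrOccupyCIDR no longer updates Node.Spec.PodCIDR inline with its own retry loop. It allocates a CIDR, pushes a nodeAndCIDR work item onto a buffered channel (cidrUpdateQueueSize = 5000), and returns; a fixed pool of cidrUpdateWorkers goroutines started in Run() performs the actual API updates in updateCIDRAllocation, releasing the CIDR back to the allocator if the update ultimately fails. The range allocator also gains a nextCandidate cursor, so AllocateNext() resumes scanning where the previous allocation left off instead of rescanning from bit 0.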

View File

@@ -42,6 +42,7 @@ type rangeAllocator struct {
 	maxCIDRs      int
 	used          big.Int
 	lock          sync.Mutex
+	nextCandidate int
 }

 // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
@@ -56,6 +57,7 @@ func NewCIDRRangeAllocator(clusterCIDR *net.IPNet, subNetMaskSize int) CIDRAlloc
 		clusterMaskSize: clusterMaskSize,
 		subNetMaskSize:  subNetMaskSize,
 		maxCIDRs:        1 << uint32(subNetMaskSize-clusterMaskSize),
+		nextCandidate:   0,
 	}
 	return ra
 }
@@ -66,14 +68,16 @@ func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) {
 	nextUnused := -1
 	for i := 0; i < r.maxCIDRs; i++ {
-		if r.used.Bit(i) == 0 {
-			nextUnused = i
+		candidate := (i + r.nextCandidate) % r.maxCIDRs
+		if r.used.Bit(candidate) == 0 {
+			nextUnused = candidate
 			break
 		}
 	}
 	if nextUnused == -1 {
 		return nil, errCIDRRangeNoCIDRsRemaining
 	}
+	r.nextCandidate = (nextUnused + 1) % r.maxCIDRs
 	r.used.SetBit(&r.used, nextUnused, 1)
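
The effect of the nextCandidate cursor is easiest to see in isolation. Below is a minimal, self-contained sketch of the same scanning pattern; the names (bitmapAllocator, allocateNext) are illustrative only, and the sketch omits the locking and CIDR arithmetic of the real allocator.

package main

import (
	"errors"
	"fmt"
	"math/big"
)

// bitmapAllocator is a stripped-down analogue of rangeAllocator: a bitmap of
// max slots plus a cursor remembering where the previous search ended.
type bitmapAllocator struct {
	max           int
	used          big.Int
	nextCandidate int
}

var errNoneLeft = errors.New("no free slots remaining")

// allocateNext starts scanning at nextCandidate instead of bit 0, so repeated
// allocations do not rescan the (mostly used) beginning of the range each time.
func (a *bitmapAllocator) allocateNext() (int, error) {
	for i := 0; i < a.max; i++ {
		candidate := (i + a.nextCandidate) % a.max
		if a.used.Bit(candidate) == 0 {
			a.nextCandidate = (candidate + 1) % a.max
			a.used.SetBit(&a.used, candidate, 1)
			return candidate, nil
		}
	}
	return 0, errNoneLeft
}

func main() {
	a := &bitmapAllocator{max: 4}
	for i := 0; i < 4; i++ {
		n, _ := a.allocateNext()
		fmt.Println("allocated slot", n) // 0, 1, 2, 3 in turn
	}
	if _, err := a.allocateNext(); err != nil {
		fmt.Println(err) // bitmap exhausted
	}
}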
@@ -138,7 +142,6 @@ func (r *rangeAllocator) Occupy(cidr *net.IPNet) (err error) {
 	r.lock.Lock()
 	defer r.lock.Unlock()
-
 	for i := begin; i <= end; i++ {
 		r.used.SetBit(&r.used, i, 1)
 	}

View File

@@ -61,6 +61,9 @@ const (
 	podCIDRUpdateRetry = 5
 	// controls how often NodeController will try to evict Pods from non-responsive Nodes.
 	nodeEvictionPeriod = 100 * time.Millisecond
+	// controls how many NodeSpec updates NC can process in any moment.
+	cidrUpdateWorkers   = 10
+	cidrUpdateQueueSize = 5000
 )

 type nodeStatusData struct {
@@ -69,6 +72,11 @@ type nodeStatusData struct {
 	status api.NodeStatus
 }

+type nodeAndCIDR struct {
+	nodeName string
+	cidr     *net.IPNet
+}
+
 type NodeController struct {
 	allocateNodeCIDRs bool
 	cloud             cloudprovider.Interface
@@ -134,6 +142,8 @@ type NodeController struct {
 	// It is enabled when all Nodes observed by the NodeController are NotReady and disabled
 	// when NC sees any healthy Node. This is a temporary fix for v1.3.
 	networkSegmentationMode bool
+
+	nodeCIDRUpdateChannel chan nodeAndCIDR
 }

 // NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -196,6 +206,7 @@ func NewNodeController(
 		allocateNodeCIDRs:         allocateNodeCIDRs,
 		forcefullyDeletePod:       func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
 		nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
+		nodeCIDRUpdateChannel:     make(chan nodeAndCIDR, cidrUpdateQueueSize),
 	}

 	nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer(
@@ -340,6 +351,24 @@ func (nc *NodeController) Run(period time.Duration) {
 	}, nodeEvictionPeriod, wait.NeverStop)

 	go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
+
+	for i := 0; i < cidrUpdateWorkers; i++ {
+		go func(stopChan <-chan struct{}) {
+			for {
+				select {
+				case workItem, ok := <-nc.nodeCIDRUpdateChannel:
+					if !ok {
+						glog.Warning("NodeCIDRUpdateChannel read returned false.")
+						return
+					}
+					nc.updateCIDRAllocation(workItem)
+				case <-stopChan:
+					glog.V(0).Info("StopChannel is closed.")
+					return
+				}
+			}
+		}(wait.NeverStop)
+	}
 }

 func (nc *NodeController) filterOutServiceRange() {
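
The loop added to Run() is a standard buffered-channel worker pool. Here is a self-contained sketch of the pattern with made-up names and sizes; the real code uses nc.nodeCIDRUpdateChannel, cidrUpdateWorkers = 10, and wait.NeverStop as the stop channel, and never closes the queue.

package main

import (
	"fmt"
	"sync"
	"time"
)

// workItem plays the role of nodeAndCIDR: everything a worker needs for one update.
type workItem struct {
	nodeName string
	cidr     string
}

func main() {
	const workers = 3    // stands in for cidrUpdateWorkers
	const queueSize = 16 // stands in for cidrUpdateQueueSize

	queue := make(chan workItem, queueSize)
	stop := make(chan struct{}) // the controller passes wait.NeverStop here
	var wg sync.WaitGroup

	// A fixed pool of workers drains the queue, as in Run() above: each goroutine
	// blocks until a work item arrives or the stop channel is closed.
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case item := <-queue:
					fmt.Printf("worker %d: setting %s PodCIDR to %s\n", id, item.nodeName, item.cidr)
				case <-stop:
					return
				}
			}
		}(i)
	}

	// The producer (the analogue of allocateOrOccupyCIDR) just enqueues and returns;
	// it only blocks if the buffered queue is completely full.
	for i := 0; i < 5; i++ {
		queue <- workItem{nodeName: fmt.Sprintf("node-%d", i), cidr: fmt.Sprintf("10.244.%d.0/24", i)}
	}

	time.Sleep(100 * time.Millisecond) // crude for the sketch: give workers time to drain
	close(stop)
	wg.Wait()
}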
@@ -352,11 +381,34 @@ func (nc *NodeController) filterOutServiceRange() {
 	}
 }

+func (nc *NodeController) updateCIDRAllocation(data nodeAndCIDR) {
+	var err error
+	var node *api.Node
+	for rep := 0; rep < podCIDRUpdateRetry; rep++ {
+		node, err = nc.kubeClient.Core().Nodes().Get(data.nodeName)
+		if err != nil {
+			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
+			continue
+		}
+		node.Spec.PodCIDR = data.cidr.String()
+		if _, err := nc.kubeClient.Core().Nodes().Update(node); err != nil {
+			glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
+		} else {
+			break
+		}
+	}
+	if err != nil {
+		nc.recordNodeStatusChange(node, "CIDRAssignmentFailed")
+		glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
+		err := nc.cidrAllocator.Release(data.cidr)
+		glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, err)
+	}
+}
+
 // allocateOrOccupyCIDR looks at each new observed node, assigns it a valid CIDR
 // if it doesn't currently have one or mark the CIDR as used if the node already have one.
 func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {
 	node := obj.(*api.Node)

 	if node.Spec.PodCIDR != "" {
 		_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
 		if err != nil {
@@ -369,31 +421,16 @@ func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {
 		}
 		return
 	}

 	podCIDR, err := nc.cidrAllocator.AllocateNext()
 	if err != nil {
 		nc.recordNodeStatusChange(node, "CIDRNotAvailable")
 		return
 	}
-	glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
-	for rep := 0; rep < podCIDRUpdateRetry; rep++ {
-		node.Spec.PodCIDR = podCIDR.String()
-		if _, err := nc.kubeClient.Core().Nodes().Update(node); err != nil {
-			glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
-		} else {
-			break
-		}
-		node, err = nc.kubeClient.Core().Nodes().Get(node.Name)
-		if err != nil {
-			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", node.Name, err)
-			break
-		}
-	}
-	if err != nil {
-		glog.Errorf("Update PodCIDR of node %v from NodeController exceeds retry count.", node.Name)
-		nc.recordNodeStatusChange(node, "CIDRAssignmentFailed")
-		glog.Errorf("CIDR assignment for node %v failed: %v", node.Name, err)
-	}
+	glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
+	nc.nodeCIDRUpdateChannel <- nodeAndCIDR{
+		nodeName: node.Name,
+		cidr:     podCIDR,
+	}
 }
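
Two consequences of this split are worth noting. allocateOrOccupyCIDR now only blocks if the 5000-entry buffer is full, so handling of newly observed nodes stays fast even when the apiserver is slow. Failures are handled asynchronously in updateCIDRAllocation, which refetches the Node at the start of each retry and, after podCIDRUpdateRetry failed attempts, records a CIDRAssignmentFailed event and releases the allocated CIDR so it can be reused by another node.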
@@ -450,6 +487,7 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
 	// delete terminating pods that have been scheduled on
 	// nonexistent nodes
 	if !found {
+		glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
 		utilruntime.HandleError(nc.forcefullyDeletePod(pod))
 		return
 	}