Merge pull request #26531 from gmarek/cidrs

Automatic merge from submit-queue

AllocateOrOccupyCIDR returns quickly

Fix #26511.

AllocateOrOccupyCIDR is called before adding a Node to the NC's cache. It turns out that sending an update to the API server can take so long that the NC concludes the given Node does not exist, and all Pods on it get removed.

The fix is to move the slow part of the call (updating the NodeSpec) to a separate goroutine. To avoid overloading the API server and generating a huge number of retries, we limit the number of concurrent Update calls.
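For illustration, here is a minimal, self-contained sketch of the pattern: a buffered channel serves as the work queue and a fixed number of workers drain it, so the caller only pays the cost of a channel send. The names `cidrUpdate`, `applyUpdate`, and the literal queue sizes below are illustrative stand-ins, not the controller's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// cidrUpdate is a stand-in for the nodeAndCIDR work item used by the controller.
type cidrUpdate struct {
	nodeName string
	cidr     string
}

// applyUpdate is a placeholder for the slow Node.Spec.PodCIDR update with retries.
func applyUpdate(u cidrUpdate) {
	fmt.Printf("updating %s -> %s\n", u.nodeName, u.cidr)
}

func main() {
	const workers = 10     // mirrors cidrUpdateWorkers
	const queueSize = 5000 // mirrors cidrUpdateQueueSize
	queue := make(chan cidrUpdate, queueSize)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker performs the slow API call; concurrency is capped at `workers`.
			for item := range queue {
				applyUpdate(item)
			}
		}()
	}

	// The producer (allocateOrOccupyCIDR in the real code) only does a cheap channel send.
	for i := 0; i < 20; i++ {
		queue <- cidrUpdate{nodeName: fmt.Sprintf("node-%d", i), cidr: "10.244.0.0/24"}
	}
	close(queue)
	wg.Wait()
}
```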

cc @zmerlynn @davidopp
k8s-merge-robot 2016-05-31 02:03:11 -07:00
commit d4ffb03119
2 changed files with 64 additions and 23 deletions

View File

@@ -42,6 +42,7 @@ type rangeAllocator struct {
maxCIDRs int
used big.Int
lock sync.Mutex
nextCandidate int
}
// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
@@ -56,6 +57,7 @@ func NewCIDRRangeAllocator(clusterCIDR *net.IPNet, subNetMaskSize int) CIDRAlloc
clusterMaskSize: clusterMaskSize,
subNetMaskSize: subNetMaskSize,
maxCIDRs: 1 << uint32(subNetMaskSize-clusterMaskSize),
nextCandidate: 0,
}
return ra
}
@@ -66,14 +68,16 @@ func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) {
nextUnused := -1
for i := 0; i < r.maxCIDRs; i++ {
if r.used.Bit(i) == 0 {
nextUnused = i
candidate := (i + r.nextCandidate) % r.maxCIDRs
if r.used.Bit(candidate) == 0 {
nextUnused = candidate
break
}
}
if nextUnused == -1 {
return nil, errCIDRRangeNoCIDRsRemaining
}
r.nextCandidate = (nextUnused + 1) % r.maxCIDRs
r.used.SetBit(&r.used, nextUnused, 1)
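As context for the hunk above: AllocateNext now scans round-robin, starting just past the last handed-out position instead of from bit 0 on every call. A rough, self-contained sketch of that scan follows; the `allocator` type and `allocateNext` method are illustrative stand-ins, not the controller's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// allocator is an illustrative stand-in for rangeAllocator's bookkeeping fields.
type allocator struct {
	maxCIDRs      int
	used          big.Int // bitmap of allocated CIDR indices
	nextCandidate int     // where the next scan starts
}

// allocateNext scans the bitmap round-robin, beginning just past the last
// allocation, so every call does not re-walk the already-used prefix.
func (a *allocator) allocateNext() (int, error) {
	for i := 0; i < a.maxCIDRs; i++ {
		candidate := (i + a.nextCandidate) % a.maxCIDRs
		if a.used.Bit(candidate) == 0 {
			a.nextCandidate = (candidate + 1) % a.maxCIDRs
			a.used.SetBit(&a.used, candidate, 1)
			return candidate, nil
		}
	}
	return -1, errors.New("no CIDRs remaining")
}

func main() {
	a := &allocator{maxCIDRs: 8}
	for i := 0; i < 3; i++ {
		idx, _ := a.allocateNext()
		fmt.Println("allocated index", idx) // prints 0, 1, 2 in turn
	}
}
```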
@@ -138,7 +142,6 @@ func (r *rangeAllocator) Occupy(cidr *net.IPNet) (err error) {
r.lock.Lock()
defer r.lock.Unlock()
for i := begin; i <= end; i++ {
r.used.SetBit(&r.used, i, 1)
}

View File

@@ -61,6 +61,9 @@ const (
podCIDRUpdateRetry = 5
// controls how often NodeController will try to evict Pods from non-responsive Nodes.
nodeEvictionPeriod = 100 * time.Millisecond
// controls how many NodeSpec updates NC can process at any moment.
cidrUpdateWorkers = 10
cidrUpdateQueueSize = 5000
)
type nodeStatusData struct {
@@ -69,6 +72,11 @@ type nodeStatusData struct {
status api.NodeStatus
}
type nodeAndCIDR struct {
nodeName string
cidr *net.IPNet
}
type NodeController struct {
allocateNodeCIDRs bool
cloud cloudprovider.Interface
@@ -134,6 +142,8 @@ type NodeController struct {
// It is enabled when all Nodes observed by the NodeController are NotReady and disabled
// when NC sees any healthy Node. This is a temporary fix for v1.3.
networkSegmentationMode bool
nodeCIDRUpdateChannel chan nodeAndCIDR
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -196,6 +206,7 @@ func NewNodeController(
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
}
nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer(
@@ -340,6 +351,24 @@ func (nc *NodeController) Run(period time.Duration) {
}, nodeEvictionPeriod, wait.NeverStop)
go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
for i := 0; i < cidrUpdateWorkers; i++ {
go func(stopChan <-chan struct{}) {
for {
select {
case workItem, ok := <-nc.nodeCIDRUpdateChannel:
if !ok {
glog.Warning("NodeCIDRUpdateChannel read returned false.")
return
}
nc.updateCIDRAllocation(workItem)
case <-stopChan:
glog.V(0).Info("StopChannel is closed.")
return
}
}
}(wait.NeverStop)
}
}
func (nc *NodeController) filterOutServiceRange() {
@@ -352,11 +381,34 @@ func (nc *NodeController) filterOutServiceRange() {
}
}
func (nc *NodeController) updateCIDRAllocation(data nodeAndCIDR) {
var err error
var node *api.Node
for rep := 0; rep < podCIDRUpdateRetry; rep++ {
node, err = nc.kubeClient.Core().Nodes().Get(data.nodeName)
if err != nil {
glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
continue
}
node.Spec.PodCIDR = data.cidr.String()
if _, err := nc.kubeClient.Core().Nodes().Update(node); err != nil {
glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
} else {
break
}
}
if err != nil {
nc.recordNodeStatusChange(node, "CIDRAssignmentFailed")
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
err := nc.cidrAllocator.Release(data.cidr)
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, err)
}
}
// allocateOrOccupyCIDR looks at each new observed node, assigns it a valid CIDR
// if it doesn't currently have one or marks the CIDR as used if the node already has one.
func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {
node := obj.(*api.Node)
if node.Spec.PodCIDR != "" {
_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
if err != nil {
@@ -369,31 +421,16 @@ func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {
}
return
}
podCIDR, err := nc.cidrAllocator.AllocateNext()
if err != nil {
nc.recordNodeStatusChange(node, "CIDRNotAvailable")
return
}
glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
for rep := 0; rep < podCIDRUpdateRetry; rep++ {
node.Spec.PodCIDR = podCIDR.String()
if _, err := nc.kubeClient.Core().Nodes().Update(node); err != nil {
glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
} else {
break
}
node, err = nc.kubeClient.Core().Nodes().Get(node.Name)
if err != nil {
glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", node.Name, err)
break
}
}
if err != nil {
glog.Errorf("Update PodCIDR of node %v from NodeController exceeds retry count.", node.Name)
nc.recordNodeStatusChange(node, "CIDRAssignmentFailed")
glog.Errorf("CIDR assignment for node %v failed: %v", node.Name, err)
glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
nc.nodeCIDRUpdateChannel <- nodeAndCIDR{
nodeName: node.Name,
cidr: podCIDR,
}
}
@@ -450,6 +487,7 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
// delete terminating pods that have been scheduled on
// nonexistent nodes
if !found {
glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}