Fix #61363, Bounded retries for cloud allocator.

Satyadeep Musuvathy 2018-03-19 15:29:20 -07:00
parent d0f8f41890
commit adc71ff034
4 changed files with 100 additions and 10 deletions
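At a high level, the change stops requeuing a failed node unconditionally and instead requeues it after a short delay, at most a fixed number of times. Below is a minimal, standalone sketch of that pattern; the names (boundedWorker, process, maxRetries, retryDelay) are illustrative stand-ins, not the identifiers used in the allocator, and the constants mirror the updateMaxRetries and updateRetryTimeout values added further down.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

const (
	maxRetries = 10                     // stand-in for updateMaxRetries
	retryDelay = 100 * time.Millisecond // stand-in for updateRetryTimeout
)

// boundedWorker drains items from work and retries a failing item at most
// maxRetries times, waiting retryDelay before each requeue.
func boundedWorker(work chan string, stop <-chan struct{}, process func(string) error) {
	retries := map[string]int{} // only touched by this goroutine
	for {
		select {
		case item, ok := <-work:
			if !ok {
				return
			}
			if err := process(item); err != nil && retries[item] < maxRetries {
				retries[item]++
				// Requeue after a delay instead of immediately, so a
				// persistently failing item cannot spin the worker.
				time.AfterFunc(retryDelay, func() { work <- item })
				continue
			}
			// Success, or the retry budget is exhausted: stop tracking the item.
			delete(retries, item)
		case <-stop:
			return
		}
	}
}

func main() {
	work := make(chan string, 1)
	stop := make(chan struct{})
	var attempts int32
	go boundedWorker(work, stop, func(string) error {
		atomic.AddInt32(&attempts, 1)
		return errors.New("simulated failure") // always fail, to exercise the bound
	})
	work <- "node-a"
	time.Sleep(2 * time.Second)                           // crude wait; 10 retries at 100ms fit easily
	fmt.Println("attempts:", atomic.LoadInt32(&attempts)) // expect 11: the first try plus 10 retries
	close(stop)
}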

@@ -9,6 +9,7 @@ load(
go_test(
name = "go_default_test",
srcs = [
"cloud_cidr_allocator_test.go",
"controller_test.go",
"range_allocator_test.go",
"timeout_test.go",

@@ -69,6 +69,12 @@ const (
// cidrUpdateRetries is the no. of times a NodeSpec update will be retried before dropping it.
cidrUpdateRetries = 3
// updateRetryTimeout is the time to wait before requeuing a failed node for retry
updateRetryTimeout = 100 * time.Millisecond
// updateMaxRetries is the max retries for a failed node
updateMaxRetries = 10
)
// CIDRAllocator is an interface implemented by things that know how
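Taken together, the two new constants put a ceiling on how long a persistently failing node keeps bouncing through the queue: at most updateMaxRetries delayed requeues, each scheduled updateRetryTimeout after the failure, i.e. roughly 10 × 100 ms = 1 s of added requeue delay (on top of whatever time the update attempts themselves take) before the node is dropped from processing until a later informer event queues it again.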

@@ -20,13 +20,14 @@ import (
"fmt"
"net"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
informers "k8s.io/client-go/informers/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
@@ -44,6 +45,11 @@ import (
utilnode "k8s.io/kubernetes/pkg/util/node"
)
// nodeProcessingInfo tracks information related to current nodes in processing
type nodeProcessingInfo struct {
retries int
}
// cloudCIDRAllocator allocates node CIDRs according to IP address aliases
// assigned by the cloud provider. In this case, the allocation and
// deallocation is delegated to the external provider, and the controller
@@ -67,7 +73,7 @@ type cloudCIDRAllocator struct {
// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
lock sync.Mutex
nodesInProcessing sets.String
nodesInProcessing map[string]*nodeProcessingInfo
}
var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
@@ -97,7 +103,7 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
nodesSynced: nodeInformer.Informer().HasSynced,
nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
recorder: recorder,
nodesInProcessing: sets.NewString(),
nodesInProcessing: map[string]*nodeProcessingInfo{},
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -147,9 +153,15 @@ func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
return
}
if err := ca.updateCIDRAllocation(workItem); err != nil {
// Requeue the failed node for update again.
ca.nodeUpdateChannel <- workItem
if ca.canRetry(workItem) {
time.AfterFunc(updateRetryTimeout, func() {
// Requeue the failed node for update again.
ca.nodeUpdateChannel <- workItem
})
continue
}
}
ca.removeNodeFromProcessing(workItem)
case <-stopChan:
return
}
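Note the design choice on the retry path: the worker continues without calling removeNodeFromProcessing, so the node stays in nodesInProcessing while the AfterFunc timer is pending, which keeps duplicate enqueues suppressed via insertNodeToProcessing. The entry is only cleared once the update succeeds or canRetry (below) reports the budget as spent.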
@@ -159,17 +171,28 @@ func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
ca.lock.Lock()
defer ca.lock.Unlock()
if ca.nodesInProcessing.Has(nodeName) {
if _, found := ca.nodesInProcessing[nodeName]; found {
return false
}
ca.nodesInProcessing.Insert(nodeName)
ca.nodesInProcessing[nodeName] = &nodeProcessingInfo{}
return true
}
func (ca *cloudCIDRAllocator) canRetry(nodeName string) bool {
ca.lock.Lock()
defer ca.lock.Unlock()
count := ca.nodesInProcessing[nodeName].retries + 1
if count > updateMaxRetries {
return false
}
ca.nodesInProcessing[nodeName].retries = count
return true
}
func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
ca.lock.Lock()
defer ca.lock.Unlock()
ca.nodesInProcessing.Delete(nodeName)
delete(ca.nodesInProcessing, nodeName)
}
// WARNING: If you're adding any return calls or defer any more work from this
@@ -191,10 +214,11 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
defer ca.removeNodeFromProcessing(nodeName)
node, err := ca.nodeLister.Get(nodeName)
if err != nil {
if errors.IsNotFound(err) {
return nil // node no longer available, skip processing
}
glog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDR: %v", nodeName, err)
return err
}
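For context, the queueing side of this contract lives in AllocateOrOccupyCIDR, whose body is outside the hunks shown above. The sketch below is a rough, hypothetical rendering of that contract (enqueueNode is an invented name, not the committed method): the caller queues a node only after insertNodeToProcessing accepts it, and the worker then owns the eventual removeNodeFromProcessing call.

// Hypothetical caller-side sketch, not the committed AllocateOrOccupyCIDR body.
func (ca *cloudCIDRAllocator) enqueueNode(nodeName string) {
	if !ca.insertNodeToProcessing(nodeName) {
		// Already being processed or waiting on a delayed retry; skip the duplicate.
		return
	}
	ca.nodeUpdateChannel <- nodeName
}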

@@ -0,0 +1,59 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
)
func hasNodeInProcessing(ca *cloudCIDRAllocator, name string) bool {
ca.lock.Lock()
defer ca.lock.Unlock()
_, found := ca.nodesInProcessing[name]
return found
}
func TestBoundedRetries(t *testing.T) {
clientSet := fake.NewSimpleClientset()
updateChan := make(chan string, 1) // need to buffer as we are using only one goroutine
stopChan := make(chan struct{})
sharedInfomer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
ca := &cloudCIDRAllocator{
client: clientSet,
nodeUpdateChannel: updateChan,
nodeLister: sharedInfomer.Core().V1().Nodes().Lister(),
nodesSynced: sharedInfomer.Core().V1().Nodes().Informer().HasSynced,
nodesInProcessing: map[string]*nodeProcessingInfo{},
}
go ca.worker(stopChan)
nodeName := "testNode"
ca.AllocateOrOccupyCIDR(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
},
})
for hasNodeInProcessing(ca, nodeName) {
// wait for node to finish processing (should terminate and not time out)
}
}
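Assuming the package lives at its usual path under pkg/controller/nodeipam/ipam (the path is not shown in this diff), the new case can be run on its own with something like: go test -race -run TestBoundedRetries ./pkg/controller/nodeipam/ipam/. Running under -race is worthwhile here because the polling loop reads nodesInProcessing from a different goroutine than the worker, exercising the lock that guards the map. The loop terminates once the worker calls removeNodeFromProcessing, either after a successful (or skipped) update or after canRetry reports the retry budget as spent.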