From adc71ff03485f303ef928ae308b94af1d715db85 Mon Sep 17 00:00:00 2001
From: Satyadeep Musuvathy
Date: Mon, 19 Mar 2018 15:29:20 -0700
Subject: [PATCH] Fix #61363, Bounded retries for cloud allocator.

---
 pkg/controller/nodeipam/ipam/BUILD            |  1 +
 .../nodeipam/ipam/cidr_allocator.go           |  6 ++
 .../nodeipam/ipam/cloud_cidr_allocator.go     | 44 ++++++++++----
 .../ipam/cloud_cidr_allocator_test.go         | 59 +++++++++++++++++++
 4 files changed, 100 insertions(+), 10 deletions(-)
 create mode 100644 pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go

diff --git a/pkg/controller/nodeipam/ipam/BUILD b/pkg/controller/nodeipam/ipam/BUILD
index 22da7772912..653fb14f192 100644
--- a/pkg/controller/nodeipam/ipam/BUILD
+++ b/pkg/controller/nodeipam/ipam/BUILD
@@ -9,6 +9,7 @@ load(
 go_test(
     name = "go_default_test",
     srcs = [
+        "cloud_cidr_allocator_test.go",
        "controller_test.go",
        "range_allocator_test.go",
        "timeout_test.go",
diff --git a/pkg/controller/nodeipam/ipam/cidr_allocator.go b/pkg/controller/nodeipam/ipam/cidr_allocator.go
index 4a5cee34d83..768d221f9fa 100644
--- a/pkg/controller/nodeipam/ipam/cidr_allocator.go
+++ b/pkg/controller/nodeipam/ipam/cidr_allocator.go
@@ -69,6 +69,12 @@ const (
 
 	// cidrUpdateRetries is the no. of times a NodeSpec update will be retried before dropping it.
 	cidrUpdateRetries = 3
+
+	// updateRetryTimeout is the time to wait before requeuing a failed node for retry
+	updateRetryTimeout = 100 * time.Millisecond
+
+	// updateMaxRetries is the max retries for a failed node
+	updateMaxRetries = 10
 )
 
 // CIDRAllocator is an interface implemented by things that know how
diff --git a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go
index 6f4e4cfcc7a..c45c5acec33 100644
--- a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go
+++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go
@@ -20,13 +20,14 @@ import (
 	"fmt"
 	"net"
 	"sync"
+	"time"
 
 	"github.com/golang/glog"
 
+	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/sets"
 	informers "k8s.io/client-go/informers/core/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
@@ -44,6 +45,11 @@ import (
 	utilnode "k8s.io/kubernetes/pkg/util/node"
 )
 
+// nodeProcessingInfo tracks information related to current nodes in processing
+type nodeProcessingInfo struct {
+	retries int
+}
+
 // cloudCIDRAllocator allocates node CIDRs according to IP address aliases
 // assigned by the cloud provider. In this case, the allocation and
 // deallocation is delegated to the external provider, and the controller
@@ -67,7 +73,7 @@ type cloudCIDRAllocator struct {
 
 	// Keep a set of nodes that are currectly being processed to avoid races in CIDR allocation
 	lock              sync.Mutex
-	nodesInProcessing sets.String
+	nodesInProcessing map[string]*nodeProcessingInfo
 }
 
 var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
@@ -97,7 +103,7 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
 		nodesSynced:       nodeInformer.Informer().HasSynced,
 		nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
 		recorder:          recorder,
-		nodesInProcessing: sets.NewString(),
+		nodesInProcessing: map[string]*nodeProcessingInfo{},
 	}
 
 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -147,9 +153,15 @@ func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
 				return
 			}
 			if err := ca.updateCIDRAllocation(workItem); err != nil {
-				// Requeue the failed node for update again.
-				ca.nodeUpdateChannel <- workItem
+				if ca.canRetry(workItem) {
+					time.AfterFunc(updateRetryTimeout, func() {
+						// Requeue the failed node for update again.
+						ca.nodeUpdateChannel <- workItem
+					})
+					continue
+				}
 			}
+			ca.removeNodeFromProcessing(workItem)
 		case <-stopChan:
 			return
 		}
@@ -159,17 +171,28 @@ func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
 	ca.lock.Lock()
 	defer ca.lock.Unlock()
-	if ca.nodesInProcessing.Has(nodeName) {
+	if _, found := ca.nodesInProcessing[nodeName]; found {
 		return false
 	}
-	ca.nodesInProcessing.Insert(nodeName)
+	ca.nodesInProcessing[nodeName] = &nodeProcessingInfo{}
+	return true
+}
+
+func (ca *cloudCIDRAllocator) canRetry(nodeName string) bool {
+	ca.lock.Lock()
+	defer ca.lock.Unlock()
+	count := ca.nodesInProcessing[nodeName].retries + 1
+	if count > updateMaxRetries {
+		return false
+	}
+	ca.nodesInProcessing[nodeName].retries = count
 	return true
 }
 
 func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
 	ca.lock.Lock()
 	defer ca.lock.Unlock()
-	ca.nodesInProcessing.Delete(nodeName)
+	delete(ca.nodesInProcessing, nodeName)
 }
 
 // WARNING: If you're adding any return calls or defer any more work from this
@@ -191,10 +214,11 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
 
 // updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
 func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
-	defer ca.removeNodeFromProcessing(nodeName)
-
 	node, err := ca.nodeLister.Get(nodeName)
 	if err != nil {
+		if errors.IsNotFound(err) {
+			return nil // node no longer available, skip processing
+		}
 		glog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDR: %v", nodeName, err)
 		return err
 	}
diff --git a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go
new file mode 100644
index 00000000000..cf64b0fc31b
--- /dev/null
+++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go
@@ -0,0 +1,59 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ipam
+
+import (
+	"testing"
+	"time"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+func hasNodeInProcessing(ca *cloudCIDRAllocator, name string) bool {
+	ca.lock.Lock()
+	defer ca.lock.Unlock()
+
+	_, found := ca.nodesInProcessing[name]
+	return found
+}
+
+func TestBoundedRetries(t *testing.T) {
+	clientSet := fake.NewSimpleClientset()
+	updateChan := make(chan string, 1) // need to buffer as we are using only one goroutine
+	stopChan := make(chan struct{})
+	sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
+	ca := &cloudCIDRAllocator{
+		client:            clientSet,
+		nodeUpdateChannel: updateChan,
+		nodeLister:        sharedInformer.Core().V1().Nodes().Lister(),
+		nodesSynced:       sharedInformer.Core().V1().Nodes().Informer().HasSynced,
+		nodesInProcessing: map[string]*nodeProcessingInfo{},
+	}
+	go ca.worker(stopChan)
+	nodeName := "testNode"
+	ca.AllocateOrOccupyCIDR(&v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: nodeName,
+		},
+	})
+	for hasNodeInProcessing(ca, nodeName) {
+		// wait for node to finish processing (should terminate and not time out)
+	}
+}
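
The sketch below is not part of the patch. It is a minimal, self-contained illustration of the bounded-retry pattern the change introduces: a failed work item is requeued after a delay via time.AfterFunc, a mutex-guarded per-item retry count caps how often that can happen, and the item is forgotten once it succeeds or exhausts its retries. All names here (retryTracker, process, retryTimeout, maxRetries, "node-a") are hypothetical and only mirror the roles of nodesInProcessing, updateCIDRAllocation, updateRetryTimeout and updateMaxRetries above.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

const (
	retryTimeout = 100 * time.Millisecond // delay before a failed item is requeued
	maxRetries   = 10                     // give up on an item after this many retries
)

// retryTracker remembers how many times each in-flight item has been retried.
type retryTracker struct {
	mu      sync.Mutex
	retries map[string]int
}

func (t *retryTracker) canRetry(item string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.retries[item]+1 > maxRetries {
		return false
	}
	t.retries[item]++
	return true
}

func (t *retryTracker) remove(item string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.retries, item)
}

func main() {
	tracker := &retryTracker{retries: map[string]int{}}
	queue := make(chan string, 10)
	done := make(chan struct{})

	// process always fails, so the item is retried maxRetries times and then dropped.
	process := func(item string) error { return errors.New("transient failure") }

	go func() {
		for item := range queue {
			item := item // capture a copy for the timer closure below
			if err := process(item); err != nil {
				if tracker.canRetry(item) {
					// Requeue after a delay instead of immediately, so a persistently
					// failing item cannot spin the worker, and at most maxRetries times.
					time.AfterFunc(retryTimeout, func() { queue <- item })
					continue
				}
				fmt.Printf("dropping %q after %d retries\n", item, maxRetries)
			}
			// The item either succeeded or has been given up on; forget it.
			tracker.remove(item)
			close(done) // this demo handles a single item, so stop here
			return
		}
	}()

	queue <- "node-a"
	<-done
}

The delayed, counted requeue is the same design choice the patch applies inside the allocator's worker loop, where the unbounded "ca.nodeUpdateChannel <- workItem" retry previously let a node that could never be updated occupy the worker forever.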