Merge pull request #61375 from satyasm/cloud-cidr-bound-retries

Automatic merge from submit-queue (batch tested with PRs 60455, 61365, 61375, 61597, 61491). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Fix #61363: Bounded retries for the cloud allocator.

**What this PR does / why we need it**:

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #61363

**Special notes for your reviewer**:
Changed the tracking of nodesInProcessing from a set to a map[string]*nodeProcessingInfo so that we can count the
number of times a node is re-processed and stop re-queueing it once updateMaxRetries is exceeded.

**Release note**:

```release-note
Bound the cloud CIDR allocator to 10 retries, with a 100 ms delay between retries.
```
Merged by Kubernetes Submit Queue on 2018-03-26 15:34:45 -07:00, committed by GitHub, in commit 0ab01d19c9.
4 changed files with 100 additions and 10 deletions


@@ -9,6 +9,7 @@ load(
go_test(
name = "go_default_test",
srcs = [
"cloud_cidr_allocator_test.go",
"controller_test.go",
"range_allocator_test.go",
"timeout_test.go",


@@ -69,6 +69,12 @@ const (
// cidrUpdateRetries is the no. of times a NodeSpec update will be retried before dropping it.
cidrUpdateRetries = 3
// updateRetryTimeout is the time to wait before requeuing a failed node for retry
updateRetryTimeout = 100 * time.Millisecond
// updateMaxRetries is the max retries for a failed node
updateMaxRetries = 10
)
// CIDRAllocator is an interface implemented by things that know how


@@ -20,13 +20,14 @@ import (
"fmt"
"net"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
informers "k8s.io/client-go/informers/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
@@ -46,6 +47,11 @@ import (
utiltaints "k8s.io/kubernetes/pkg/util/taints"
)
// nodeProcessingInfo tracks information related to nodes currently being processed
type nodeProcessingInfo struct {
retries int
}
// cloudCIDRAllocator allocates node CIDRs according to IP address aliases
// assigned by the cloud provider. In this case, the allocation and
// deallocation is delegated to the external provider, and the controller
@@ -69,7 +75,7 @@ type cloudCIDRAllocator struct {
// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
lock sync.Mutex
nodesInProcessing sets.String
nodesInProcessing map[string]*nodeProcessingInfo
}
var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
@@ -99,7 +105,7 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
nodesSynced: nodeInformer.Informer().HasSynced,
nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
recorder: recorder,
nodesInProcessing: sets.NewString(),
nodesInProcessing: map[string]*nodeProcessingInfo{},
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -150,9 +156,15 @@ func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
return
}
if err := ca.updateCIDRAllocation(workItem); err != nil {
// Requeue the failed node for update again.
ca.nodeUpdateChannel <- workItem
if ca.canRetry(workItem) {
time.AfterFunc(updateRetryTimeout, func() {
// Requeue the failed node for update again.
ca.nodeUpdateChannel <- workItem
})
continue
}
}
ca.removeNodeFromProcessing(workItem)
case <-stopChan:
return
}
@@ -162,17 +174,28 @@ func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
ca.lock.Lock()
defer ca.lock.Unlock()
if ca.nodesInProcessing.Has(nodeName) {
if _, found := ca.nodesInProcessing[nodeName]; found {
return false
}
ca.nodesInProcessing.Insert(nodeName)
ca.nodesInProcessing[nodeName] = &nodeProcessingInfo{}
return true
}
func (ca *cloudCIDRAllocator) canRetry(nodeName string) bool {
ca.lock.Lock()
defer ca.lock.Unlock()
count := ca.nodesInProcessing[nodeName].retries + 1
if count > updateMaxRetries {
return false
}
ca.nodesInProcessing[nodeName].retries = count
return true
}
func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
ca.lock.Lock()
defer ca.lock.Unlock()
ca.nodesInProcessing.Delete(nodeName)
delete(ca.nodesInProcessing, nodeName)
}
// WARNING: If you're adding any return calls or defer any more work from this
@@ -194,10 +217,11 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
defer ca.removeNodeFromProcessing(nodeName)
node, err := ca.nodeLister.Get(nodeName)
if err != nil {
if errors.IsNotFound(err) {
return nil // node no longer available, skip processing
}
glog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDR: %v", nodeName, err)
return err
}


@@ -0,0 +1,59 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
)
func hasNodeInProcessing(ca *cloudCIDRAllocator, name string) bool {
ca.lock.Lock()
defer ca.lock.Unlock()
_, found := ca.nodesInProcessing[name]
return found
}
func TestBoundedRetries(t *testing.T) {
clientSet := fake.NewSimpleClientset()
updateChan := make(chan string, 1) // need to buffer as we are using only one goroutine
stopChan := make(chan struct{})
sharedInfomer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
ca := &cloudCIDRAllocator{
client: clientSet,
nodeUpdateChannel: updateChan,
nodeLister: sharedInfomer.Core().V1().Nodes().Lister(),
nodesSynced: sharedInfomer.Core().V1().Nodes().Informer().HasSynced,
nodesInProcessing: map[string]*nodeProcessingInfo{},
}
go ca.worker(stopChan)
nodeName := "testNode"
ca.AllocateOrOccupyCIDR(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
},
})
for hasNodeInProcessing(ca, nodeName) {
// wait for node to finish processing (should terminate and not time out)
}
}