Merge pull request #62543 from ingvagabund/timeout-on-cloud-provider-request

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Timeout on instances.NodeAddresses cloud provider request

**What this PR does / why we need it**:

In cases the cloud provider does not respond before the node gets evicted.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

**Special notes for your reviewer**:


**Release note**:
```release-note
stop kubelet to cloud provider integration potentially wedging kubelet sync loop
```
This commit is contained in:
Kubernetes Submit Queue 2018-04-23 09:12:42 -07:00 committed by GitHub
commit 5b77996433
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 1 deletions

View File

@ -531,6 +531,13 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
keepTerminatedPodVolumes: keepTerminatedPodVolumes, keepTerminatedPodVolumes: keepTerminatedPodVolumes,
} }
if klet.cloud != nil {
klet.cloudproviderRequestParallelism = make(chan int, 1)
klet.cloudproviderRequestSync = make(chan int)
// TODO(jchaloup): Make it configurable via --cloud-provider-request-timeout
klet.cloudproviderRequestTimeout = 10 * time.Second
}
secretManager := secret.NewCachingSecretManager( secretManager := secret.NewCachingSecretManager(
kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode)) kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode))
klet.secretManager = secretManager klet.secretManager = secretManager
@ -967,6 +974,14 @@ type Kubelet struct {
// Cloud provider interface. // Cloud provider interface.
cloud cloudprovider.Interface cloud cloudprovider.Interface
// To keep exclusive access to the cloudproviderRequestParallelism
cloudproviderRequestMux sync.Mutex
// Keep the count of requests processed in parallel (expected to be 1 at most at a given time)
cloudproviderRequestParallelism chan int
// Sync with finished requests
cloudproviderRequestSync chan int
// Request timeout
cloudproviderRequestTimeout time.Duration
// Indicates that the node initialization happens in an external cloud controller // Indicates that the node initialization happens in an external cloud controller
externalCloudProvider bool externalCloudProvider bool

View File

@ -432,7 +432,36 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
// to the cloud provider? // to the cloud provider?
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
// TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below? // TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below?
nodeAddresses, err := instances.NodeAddresses(context.TODO(), kl.nodeName) var nodeAddresses []v1.NodeAddress
var err error
// Make sure the instances.NodeAddresses returns even if the cloud provider API hangs for a long time
func() {
kl.cloudproviderRequestMux.Lock()
if len(kl.cloudproviderRequestParallelism) > 0 {
kl.cloudproviderRequestMux.Unlock()
return
}
kl.cloudproviderRequestParallelism <- 0
kl.cloudproviderRequestMux.Unlock()
go func() {
nodeAddresses, err = instances.NodeAddresses(context.TODO(), kl.nodeName)
kl.cloudproviderRequestMux.Lock()
<-kl.cloudproviderRequestParallelism
kl.cloudproviderRequestMux.Unlock()
kl.cloudproviderRequestSync <- 0
}()
}()
select {
case <-kl.cloudproviderRequestSync:
case <-time.After(kl.cloudproviderRequestTimeout):
err = fmt.Errorf("Timeout after %v", kl.cloudproviderRequestTimeout)
}
if err != nil { if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err) return fmt.Errorf("failed to get node address from cloud provider: %v", err)
} }

View File

@ -192,6 +192,9 @@ func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) {
Err: nil, Err: nil,
} }
kubelet.cloud = fakeCloud kubelet.cloud = fakeCloud
kubelet.cloudproviderRequestParallelism = make(chan int, 1)
kubelet.cloudproviderRequestSync = make(chan int)
kubelet.cloudproviderRequestTimeout = 10 * time.Second
kubelet.setNodeAddress(&existingNode) kubelet.setNodeAddress(&existingNode)