diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a09c6a6f91e..e1efabbb53e 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -531,6 +531,13 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, keepTerminatedPodVolumes: keepTerminatedPodVolumes, } + if klet.cloud != nil { + klet.cloudproviderRequestParallelism = make(chan int, 1) + klet.cloudproviderRequestSync = make(chan int) + // TODO(jchaloup): Make it configurable via --cloud-provider-request-timeout + klet.cloudproviderRequestTimeout = 10 * time.Second + } + secretManager := secret.NewCachingSecretManager( kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode)) klet.secretManager = secretManager @@ -967,6 +974,14 @@ type Kubelet struct { // Cloud provider interface. cloud cloudprovider.Interface + // To keep exclusive access to the cloudproviderRequestParallelism + cloudproviderRequestMux sync.Mutex + // Keep the count of requests processed in parallel (expected to be 1 at most at a given time) + cloudproviderRequestParallelism chan int + // Sync with finished requests + cloudproviderRequestSync chan int + // Request timeout + cloudproviderRequestTimeout time.Duration // Indicates that the node initialization happens in an external cloud controller externalCloudProvider bool diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 44093139599..3d842b2f72a 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -432,7 +432,36 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error { // to the cloud provider? // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface // TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below? - nodeAddresses, err := instances.NodeAddresses(context.TODO(), kl.nodeName) + var nodeAddresses []v1.NodeAddress + var err error + + // Make sure the instances.NodeAddresses returns even if the cloud provider API hangs for a long time + func() { + kl.cloudproviderRequestMux.Lock() + if len(kl.cloudproviderRequestParallelism) > 0 { + kl.cloudproviderRequestMux.Unlock() + return + } + kl.cloudproviderRequestParallelism <- 0 + kl.cloudproviderRequestMux.Unlock() + + go func() { + nodeAddresses, err = instances.NodeAddresses(context.TODO(), kl.nodeName) + + kl.cloudproviderRequestMux.Lock() + <-kl.cloudproviderRequestParallelism + kl.cloudproviderRequestMux.Unlock() + + kl.cloudproviderRequestSync <- 0 + }() + }() + + select { + case <-kl.cloudproviderRequestSync: + case <-time.After(kl.cloudproviderRequestTimeout): + err = fmt.Errorf("Timeout after %v", kl.cloudproviderRequestTimeout) + } + if err != nil { return fmt.Errorf("failed to get node address from cloud provider: %v", err) } diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index b3002022882..20ab8c8c74e 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -192,6 +192,9 @@ func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) { Err: nil, } kubelet.cloud = fakeCloud + kubelet.cloudproviderRequestParallelism = make(chan int, 1) + kubelet.cloudproviderRequestSync = make(chan int) + kubelet.cloudproviderRequestTimeout = 10 * time.Second kubelet.setNodeAddress(&existingNode)