mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 17:30:00 +00:00
Timeout on instances.NodeAddresses cloud provider request
This commit is contained in:
parent
2bfe40a1d1
commit
61efc29394
@ -531,6 +531,13 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
|
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if klet.cloud != nil {
|
||||||
|
klet.cloudproviderRequestParallelism = make(chan int, 1)
|
||||||
|
klet.cloudproviderRequestSync = make(chan int)
|
||||||
|
// TODO(jchaloup): Make it configurable via --cloud-provider-request-timeout
|
||||||
|
klet.cloudproviderRequestTimeout = 10 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
secretManager := secret.NewCachingSecretManager(
|
secretManager := secret.NewCachingSecretManager(
|
||||||
kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode))
|
kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode))
|
||||||
klet.secretManager = secretManager
|
klet.secretManager = secretManager
|
||||||
@ -967,6 +974,14 @@ type Kubelet struct {
|
|||||||
|
|
||||||
// Cloud provider interface.
|
// Cloud provider interface.
|
||||||
cloud cloudprovider.Interface
|
cloud cloudprovider.Interface
|
||||||
|
// To keep exclusive access to the cloudproviderRequestParallelism
|
||||||
|
cloudproviderRequestMux sync.Mutex
|
||||||
|
// Keep the count of requests processed in parallel (expected to be 1 at most at a given time)
|
||||||
|
cloudproviderRequestParallelism chan int
|
||||||
|
// Sync with finished requests
|
||||||
|
cloudproviderRequestSync chan int
|
||||||
|
// Request timeout
|
||||||
|
cloudproviderRequestTimeout time.Duration
|
||||||
|
|
||||||
// Indicates that the node initialization happens in an external cloud controller
|
// Indicates that the node initialization happens in an external cloud controller
|
||||||
externalCloudProvider bool
|
externalCloudProvider bool
|
||||||
|
@ -432,7 +432,36 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
|
|||||||
// to the cloud provider?
|
// to the cloud provider?
|
||||||
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
|
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
|
||||||
// TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below?
|
// TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below?
|
||||||
nodeAddresses, err := instances.NodeAddresses(context.TODO(), kl.nodeName)
|
var nodeAddresses []v1.NodeAddress
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Make sure the instances.NodeAddresses returns even if the cloud provider API hangs for a long time
|
||||||
|
func() {
|
||||||
|
kl.cloudproviderRequestMux.Lock()
|
||||||
|
if len(kl.cloudproviderRequestParallelism) > 0 {
|
||||||
|
kl.cloudproviderRequestMux.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
kl.cloudproviderRequestParallelism <- 0
|
||||||
|
kl.cloudproviderRequestMux.Unlock()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
nodeAddresses, err = instances.NodeAddresses(context.TODO(), kl.nodeName)
|
||||||
|
|
||||||
|
kl.cloudproviderRequestMux.Lock()
|
||||||
|
<-kl.cloudproviderRequestParallelism
|
||||||
|
kl.cloudproviderRequestMux.Unlock()
|
||||||
|
|
||||||
|
kl.cloudproviderRequestSync <- 0
|
||||||
|
}()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-kl.cloudproviderRequestSync:
|
||||||
|
case <-time.After(kl.cloudproviderRequestTimeout):
|
||||||
|
err = fmt.Errorf("Timeout after %v", kl.cloudproviderRequestTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
|
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -192,6 +192,9 @@ func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) {
|
|||||||
Err: nil,
|
Err: nil,
|
||||||
}
|
}
|
||||||
kubelet.cloud = fakeCloud
|
kubelet.cloud = fakeCloud
|
||||||
|
kubelet.cloudproviderRequestParallelism = make(chan int, 1)
|
||||||
|
kubelet.cloudproviderRequestSync = make(chan int)
|
||||||
|
kubelet.cloudproviderRequestTimeout = 10 * time.Second
|
||||||
|
|
||||||
kubelet.setNodeAddress(&existingNode)
|
kubelet.setNodeAddress(&existingNode)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user