diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 7acd84bee68..80d24482939 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -17,6 +17,7 @@ limitations under the License. package csi import ( + "context" "errors" "fmt" "os" @@ -24,8 +25,6 @@ import ( "strings" "time" - "context" - "k8s.io/klog" api "k8s.io/api/core/v1" @@ -227,10 +226,10 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { - // This function prevents Kubelet from posting Ready status until CSINodeInfo + // This function prevents Kubelet from posting Ready status until CSINode // is both installed and initialized if err := initializeCSINode(host); err != nil { - return errors.New(log("failed to initialize CSINodeInfo: %v", err)) + return errors.New(log("failed to initialize CSINode: %v", err)) } } @@ -240,21 +239,28 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { func initializeCSINode(host volume.VolumeHost) error { kvh, ok := host.(volume.KubeletVolumeHost) if !ok { - klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet") + klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINode initialization, not running on kubelet") return nil } kubeClient := host.GetKubeClient() if kubeClient == nil { - // Kubelet running in standalone mode. Skip CSINodeInfo initialization - klog.Warning("Skipping CSINodeInfo initialization, kubelet running in standalone mode") + // Kubelet running in standalone mode. 
Skip CSINode initialization + klog.Warning("Skipping CSINode initialization, kubelet running in standalone mode") return nil } - kvh.SetKubeletError(errors.New("CSINodeInfo is not yet initialized")) + kvh.SetKubeletError(errors.New("CSINode is not yet initialized")) go func() { defer utilruntime.HandleCrash() + // First wait indefinitely to talk to Kube APIServer + nodeName := host.GetNodeName() + err := waitForAPIServerForever(kubeClient, nodeName) + if err != nil { + klog.Fatalf("Failed to initialize CSINode while waiting for API server to report ok: %v", err) + } + // Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet // after max retry steps. initBackoff := wait.Backoff{ @@ -263,12 +269,12 @@ func initializeCSINode(host volume.VolumeHost) error { Factor: 6.0, Jitter: 0.1, } - err := wait.ExponentialBackoff(initBackoff, func() (bool, error) { - klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo") + err = wait.ExponentialBackoff(initBackoff, func() (bool, error) { + klog.V(4).Infof("Initializing migrated drivers on CSINode") err := nim.InitializeCSINodeWithAnnotation() if err != nil { - kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err)) - klog.Errorf("Failed to initialize CSINodeInfo: %v", err) + kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINode: %v", err)) + klog.Errorf("Failed to initialize CSINode: %v", err) return false, nil } @@ -282,7 +288,7 @@ func initializeCSINode(host volume.VolumeHost) error { // using CSI for all Migrated volume plugins. Then all the CSINode initialization // code can be dropped from Kubelet. 
// Kill the Kubelet process and allow it to restart to retry initialization
-			klog.Fatalf("Failed to initialize CSINodeInfo after retrying")
+			klog.Fatalf("Failed to initialize CSINode after retrying: %v", err)
 		}
 	}()
 	return nil
@@ -914,3 +920,28 @@ func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
 	}
 	return highestSupportedVersion, nil
 }
+
+// waitForAPIServerForever keeps polling the API server, one second apart and
+// without a deadline, until a CSINode GET for this node shows it is reachable.
+func waitForAPIServerForever(client clientset.Interface, nodeName types.NodeName) error {
+	var lastGetErr error
+	// probe performs one CSINode GET to check that 1) kubelet can reach the API
+	// server and 2) it has enough permissions, which may be restricted while
+	// kubelet is bootstrapping TLS.
+	// https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-tls-bootstrapping/
+	probe := func() (bool, error) {
+		_, lastGetErr = client.StorageV1().CSINodes().Get(context.TODO(), string(nodeName), meta.GetOptions{})
+		if lastGetErr != nil && !apierrors.IsNotFound(lastGetErr) {
+			klog.V(2).Infof("Failed to contact API server when waiting for CSINode publishing: %s", lastGetErr)
+			return false, nil
+		}
+		// API server contacted; a NotFound reply is accepted as well as success.
+		return true, nil
+	}
+	pollErr := wait.PollImmediateInfinite(time.Second, probe)
+	if pollErr == nil {
+		return nil
+	}
+	// In theory this is unreachable, but just in case:
+	return fmt.Errorf("%v: %v", pollErr, lastGetErr)
+}
diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go
index 98a16be6e8a..76b4cc0da1f 100644
--- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go
+++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go
@@ -397,16 +397,16 @@ func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
 		return goerrors.New("error getting CSI client")
 	}
 
-	var updateErrs []error
+	var lastErr error
 	err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
-		if err := 
nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); err != nil { - updateErrs = append(updateErrs, err) + if lastErr = nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); lastErr != nil { + klog.V(2).Infof("Failed to publish CSINode: %v", lastErr) return false, nil } return true, nil }) if err != nil { - return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs)) + return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, lastErr) } return nil diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 78ccafce094..18ce9cfd6eb 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -1870,7 +1870,7 @@ func (f *fakeVolumeHost) WaitForCacheSync() error { } func (f *fakeVolumeHost) WaitForKubeletErrNil() error { - return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) { + return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { f.mux.Lock() defer f.mux.Unlock() return f.kubeletErr == nil, nil