From 8bdbd4d683df36ccdefd4d491e8042f4099fc74b Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Fri, 27 Mar 2020 18:29:34 +0100 Subject: [PATCH] Fix CSINodeInfo startup To speed up unit tests and add more observability when things go wrong. --- pkg/volume/csi/csi_plugin.go | 31 +++++++++++-------- .../csi/nodeinfomanager/nodeinfomanager.go | 8 ++--- pkg/volume/testing/testing.go | 2 +- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 77182743275..80d24482939 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -17,6 +17,7 @@ limitations under the License. package csi import ( + "context" "errors" "fmt" "os" @@ -24,8 +25,6 @@ import ( "strings" "time" - "context" - "k8s.io/klog" api "k8s.io/api/core/v1" @@ -256,7 +255,8 @@ func initializeCSINode(host volume.VolumeHost) error { defer utilruntime.HandleCrash() // First wait indefinitely to talk to Kube APIServer - err := waitForAPIServerForever(kubeClient) + nodeName := host.GetNodeName() + err := waitForAPIServerForever(kubeClient, nodeName) if err != nil { klog.Fatalf("Failed to initialize CSINode while waiting for API server to report ok: %v", err) } @@ -921,20 +921,25 @@ func highestSupportedVersion(versions []string) (*utilversion.Version, error) { return highestSupportedVersion, nil } -// waitForAPIServerForever waits forever to get the APIServer Version as a proxy -// for a healthy APIServer. -func waitForAPIServerForever(client clientset.Interface) error { +// waitForAPIServerForever waits forever to get a CSINode instance as a proxy +// for a healthy APIServer +func waitForAPIServerForever(client clientset.Interface, nodeName types.NodeName) error { var lastErr error - err := wait.PollInfinite(time.Second, func() (bool, error) { - _, lastErr = client.Discovery().ServerVersion() - if lastErr != nil { - lastErr = fmt.Errorf("failed to get apiserver version: %v", lastErr) - return false, nil + err := wait.PollImmediateInfinite(time.Second, func() (bool, error) { + // Get a CSINode from API server to make sure 1) kubelet can reach API server + // and 2) it has enough permissions. Kubelet may have restricted permissions + // when it's bootstrapping TLS. + // https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-tls-bootstrapping/ + _, lastErr = client.StorageV1().CSINodes().Get(context.TODO(), string(nodeName), meta.GetOptions{}) + if lastErr == nil || apierrors.IsNotFound(lastErr) { + // API server contacted + return true, nil } - - return true, nil + klog.V(2).Infof("Failed to contact API server when waiting for CSINode publishing: %s", lastErr) + return false, nil }) if err != nil { + // In theory this is unreachable, but just in case: return fmt.Errorf("%v: %v", err, lastErr) } diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go index 98a16be6e8a..76b4cc0da1f 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go @@ -397,16 +397,16 @@ func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error { return goerrors.New("error getting CSI client") } - var updateErrs []error + var lastErr error err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) { - if err := nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); err != nil { - updateErrs = append(updateErrs, err) + if lastErr = nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); lastErr != nil { + klog.V(2).Infof("Failed to publish CSINode: %v", lastErr) return false, nil } return true, nil }) if err != nil { - return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs)) + return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, lastErr) } return nil diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 78ccafce094..18ce9cfd6eb 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -1870,7 +1870,7 @@ func (f *fakeVolumeHost) WaitForCacheSync() error { } func (f *fakeVolumeHost) WaitForKubeletErrNil() error { - return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) { + return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { f.mux.Lock() defer f.mux.Unlock() return f.kubeletErr == nil, nil