Fix CSINodeInfo startup

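This speeds up unit tests (the API server check now polls immediately and the test helper polls more often) and logs each failed attempt, so startup problems are easier to diagnose.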
This commit is contained in:
Jan Safranek 2020-03-27 18:29:34 +01:00
parent 8205f815fb
commit 8bdbd4d683
3 changed files with 23 additions and 18 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package csi package csi
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"os" "os"
@ -24,8 +25,6 @@ import (
"strings" "strings"
"time" "time"
"context"
"k8s.io/klog" "k8s.io/klog"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
@ -256,7 +255,8 @@ func initializeCSINode(host volume.VolumeHost) error {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// First wait indefinitely to talk to Kube APIServer // First wait indefinitely to talk to Kube APIServer
err := waitForAPIServerForever(kubeClient) nodeName := host.GetNodeName()
err := waitForAPIServerForever(kubeClient, nodeName)
if err != nil { if err != nil {
klog.Fatalf("Failed to initialize CSINode while waiting for API server to report ok: %v", err) klog.Fatalf("Failed to initialize CSINode while waiting for API server to report ok: %v", err)
} }
@ -921,20 +921,25 @@ func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
return highestSupportedVersion, nil return highestSupportedVersion, nil
} }
// waitForAPIServerForever waits forever to get the APIServer Version as a proxy // waitForAPIServerForever waits forever to get a CSINode instance as a proxy
// for a healthy APIServer. // for a healthy APIServer
func waitForAPIServerForever(client clientset.Interface) error { func waitForAPIServerForever(client clientset.Interface, nodeName types.NodeName) error {
var lastErr error var lastErr error
err := wait.PollInfinite(time.Second, func() (bool, error) { err := wait.PollImmediateInfinite(time.Second, func() (bool, error) {
_, lastErr = client.Discovery().ServerVersion() // Get a CSINode from API server to make sure 1) kubelet can reach API server
if lastErr != nil { // and 2) it has enough permissions. Kubelet may have restricted permissions
lastErr = fmt.Errorf("failed to get apiserver version: %v", lastErr) // when it's bootstrapping TLS.
return false, nil // https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-tls-bootstrapping/
_, lastErr = client.StorageV1().CSINodes().Get(context.TODO(), string(nodeName), meta.GetOptions{})
if lastErr == nil || apierrors.IsNotFound(lastErr) {
// API server contacted
return true, nil
} }
klog.V(2).Infof("Failed to contact API server when waiting for CSINode publishing: %s", lastErr)
return true, nil return false, nil
}) })
if err != nil { if err != nil {
// In theory this is unreachable, but just in case:
return fmt.Errorf("%v: %v", err, lastErr) return fmt.Errorf("%v: %v", err, lastErr)
} }

View File

@ -397,16 +397,16 @@ func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
return goerrors.New("error getting CSI client") return goerrors.New("error getting CSI client")
} }
var updateErrs []error var lastErr error
err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) { err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
if err := nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); err != nil { if lastErr = nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); lastErr != nil {
updateErrs = append(updateErrs, err) klog.V(2).Infof("Failed to publish CSINode: %v", lastErr)
return false, nil return false, nil
} }
return true, nil return true, nil
}) })
if err != nil { if err != nil {
return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs)) return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, lastErr)
} }
return nil return nil

View File

@ -1870,7 +1870,7 @@ func (f *fakeVolumeHost) WaitForCacheSync() error {
} }
func (f *fakeVolumeHost) WaitForKubeletErrNil() error { func (f *fakeVolumeHost) WaitForKubeletErrNil() error {
return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) { return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
f.mux.Lock() f.mux.Lock()
defer f.mux.Unlock() defer f.mux.Unlock()
return f.kubeletErr == nil, nil return f.kubeletErr == nil, nil