mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Merge pull request #89589 from jsafrane/fix-node-startup
Wait for APIServer 'ok' forever during CSINode initialization during Kubelet init
This commit is contained in:
commit
e178cacb97
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package csi
|
package csi
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
@ -24,8 +25,6 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
|
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
@ -227,10 +226,10 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
|
|||||||
|
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) &&
|
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) &&
|
||||||
utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
|
utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
|
||||||
// This function prevents Kubelet from posting Ready status until CSINodeInfo
|
// This function prevents Kubelet from posting Ready status until CSINode
|
||||||
// is both installed and initialized
|
// is both installed and initialized
|
||||||
if err := initializeCSINode(host); err != nil {
|
if err := initializeCSINode(host); err != nil {
|
||||||
return errors.New(log("failed to initialize CSINodeInfo: %v", err))
|
return errors.New(log("failed to initialize CSINode: %v", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -240,21 +239,28 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
|
|||||||
func initializeCSINode(host volume.VolumeHost) error {
|
func initializeCSINode(host volume.VolumeHost) error {
|
||||||
kvh, ok := host.(volume.KubeletVolumeHost)
|
kvh, ok := host.(volume.KubeletVolumeHost)
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet")
|
klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINode initialization, not running on kubelet")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
kubeClient := host.GetKubeClient()
|
kubeClient := host.GetKubeClient()
|
||||||
if kubeClient == nil {
|
if kubeClient == nil {
|
||||||
// Kubelet running in standalone mode. Skip CSINodeInfo initialization
|
// Kubelet running in standalone mode. Skip CSINode initialization
|
||||||
klog.Warning("Skipping CSINodeInfo initialization, kubelet running in standalone mode")
|
klog.Warning("Skipping CSINode initialization, kubelet running in standalone mode")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
kvh.SetKubeletError(errors.New("CSINodeInfo is not yet initialized"))
|
kvh.SetKubeletError(errors.New("CSINode is not yet initialized"))
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
|
|
||||||
|
// First wait indefinitely to talk to Kube APIServer
|
||||||
|
nodeName := host.GetNodeName()
|
||||||
|
err := waitForAPIServerForever(kubeClient, nodeName)
|
||||||
|
if err != nil {
|
||||||
|
klog.Fatalf("Failed to initialize CSINode while waiting for API server to report ok: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet
|
// Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet
|
||||||
// after max retry steps.
|
// after max retry steps.
|
||||||
initBackoff := wait.Backoff{
|
initBackoff := wait.Backoff{
|
||||||
@ -263,12 +269,12 @@ func initializeCSINode(host volume.VolumeHost) error {
|
|||||||
Factor: 6.0,
|
Factor: 6.0,
|
||||||
Jitter: 0.1,
|
Jitter: 0.1,
|
||||||
}
|
}
|
||||||
err := wait.ExponentialBackoff(initBackoff, func() (bool, error) {
|
err = wait.ExponentialBackoff(initBackoff, func() (bool, error) {
|
||||||
klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo")
|
klog.V(4).Infof("Initializing migrated drivers on CSINode")
|
||||||
err := nim.InitializeCSINodeWithAnnotation()
|
err := nim.InitializeCSINodeWithAnnotation()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err))
|
kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINode: %v", err))
|
||||||
klog.Errorf("Failed to initialize CSINodeInfo: %v", err)
|
klog.Errorf("Failed to initialize CSINode: %v", err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -282,7 +288,7 @@ func initializeCSINode(host volume.VolumeHost) error {
|
|||||||
// using CSI for all Migrated volume plugins. Then all the CSINode initialization
|
// using CSI for all Migrated volume plugins. Then all the CSINode initialization
|
||||||
// code can be dropped from Kubelet.
|
// code can be dropped from Kubelet.
|
||||||
// Kill the Kubelet process and allow it to restart to retry initialization
|
// Kill the Kubelet process and allow it to restart to retry initialization
|
||||||
klog.Fatalf("Failed to initialize CSINodeInfo after retrying")
|
klog.Fatalf("Failed to initialize CSINode after retrying: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return nil
|
return nil
|
||||||
@ -914,3 +920,28 @@ func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
|
|||||||
}
|
}
|
||||||
return highestSupportedVersion, nil
|
return highestSupportedVersion, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForAPIServerForever waits forever to get a CSINode instance as a proxy
|
||||||
|
// for a healthy APIServer
|
||||||
|
func waitForAPIServerForever(client clientset.Interface, nodeName types.NodeName) error {
|
||||||
|
var lastErr error
|
||||||
|
err := wait.PollImmediateInfinite(time.Second, func() (bool, error) {
|
||||||
|
// Get a CSINode from API server to make sure 1) kubelet can reach API server
|
||||||
|
// and 2) it has enough permissions. Kubelet may have restricted permissions
|
||||||
|
// when it's bootstrapping TLS.
|
||||||
|
// https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-tls-bootstrapping/
|
||||||
|
_, lastErr = client.StorageV1().CSINodes().Get(context.TODO(), string(nodeName), meta.GetOptions{})
|
||||||
|
if lastErr == nil || apierrors.IsNotFound(lastErr) {
|
||||||
|
// API server contacted
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
klog.V(2).Infof("Failed to contact API server when waiting for CSINode publishing: %s", lastErr)
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// In theory this is unreachable, but just in case:
|
||||||
|
return fmt.Errorf("%v: %v", err, lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -397,16 +397,16 @@ func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
|
|||||||
return goerrors.New("error getting CSI client")
|
return goerrors.New("error getting CSI client")
|
||||||
}
|
}
|
||||||
|
|
||||||
var updateErrs []error
|
var lastErr error
|
||||||
err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
|
err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
|
||||||
if err := nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); err != nil {
|
if lastErr = nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); lastErr != nil {
|
||||||
updateErrs = append(updateErrs, err)
|
klog.V(2).Infof("Failed to publish CSINode: %v", lastErr)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
|
return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, lastErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -1870,7 +1870,7 @@ func (f *fakeVolumeHost) WaitForCacheSync() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeVolumeHost) WaitForKubeletErrNil() error {
|
func (f *fakeVolumeHost) WaitForKubeletErrNil() error {
|
||||||
return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
|
return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
|
||||||
f.mux.Lock()
|
f.mux.Lock()
|
||||||
defer f.mux.Unlock()
|
defer f.mux.Unlock()
|
||||||
return f.kubeletErr == nil, nil
|
return f.kubeletErr == nil, nil
|
||||||
|
Loading…
Reference in New Issue
Block a user