kubeadm: drop concurrency when waiting for kubelet /healthz

The function wait.go#WaitForKubeletAndFunc() has been used in
a number of places in kubeadm. It starts a goroutine to wait for
the kubelet /healthz and in parallel starts another goroutine
to wait for a custom function.

This logic is problematic. If kubeadm is waiting for the kubelet
in parallel with something that requires the kubelet, the right
solution would be to first wait for the kubelet in serial and only
then proceed with the other action. The parallelism here, particularly
during "init", required an unwanted "initial timeout" of 40s before
the kubelet waiting even started. In most cases, this meant the kubelet
waiter never actually started, and the main point of waiting became
the "other action".

- Remove the function WaitForKubeletAndFunc() from the Waiter interface.
- Rename the function WaitForHealthyKubelet() to just WaitForKubelet()
to be consistent with the naming WaitForAPI().
- Update WaitForKubelet() to not use TryRunCommand() and instead
use PollUntilContextTimeout().
- Remove the "initial timeout" of 40s in WaitForKubelet().
- Make both WaitForKubelet() and WaitForAPI() use similar error
handling and output.
- Update all usage of WaitForKubelet() to be a serial call before
any other action, such as another wait* call.
- Make the default wait timeout for the kubelet
/healthz to be 4 minutes (kubeadmconstants.DefaultKubeletTimeout).
- Apply updates to all implementations of the Waiter interface.
This commit is contained in:
Lubomir I. Ivanov 2023-11-19 17:56:28 +02:00
parent 1f07da7575
commit 557118897d
6 changed files with 101 additions and 78 deletions

View File

@ -26,9 +26,9 @@ import (
"github.com/pkg/errors"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
)
@ -79,24 +79,23 @@ func runWaitControlPlanePhase(c workflow.RunData) error {
}
}
// waiter holds the apiclient.Waiter implementation of choice, responsible for querying the API server in various ways and waiting for conditions to be fulfilled
klog.V(1).Infoln("[wait-control-plane] Waiting for the API server to be healthy")
// WaitForAPI uses the /healthz endpoint, thus a client without permissions works fine
// Both Wait* calls below use a /healthz endpoint, thus a client without permissions works fine
client, err := data.ClientWithoutBootstrap()
if err != nil {
return errors.Wrap(err, "cannot obtain client without bootstrap")
}
timeout := data.Cfg().ClusterConfiguration.APIServer.TimeoutForControlPlane.Duration
waiter, err := newControlPlaneWaiter(data.DryRun(), timeout, client, data.OutputWriter())
waiter, err := newControlPlaneWaiter(data.DryRun(), 0, client, data.OutputWriter())
if err != nil {
return errors.Wrap(err, "error creating waiter")
}
fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods from directory %q. This can take up to %v\n", data.ManifestDir(), timeout)
controlPlaneTimeout := data.Cfg().ClusterConfiguration.APIServer.TimeoutForControlPlane.Duration
fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods"+
" from directory %q\n",
data.ManifestDir())
if err := waiter.WaitForKubeletAndFunc(waiter.WaitForAPI); err != nil {
handleError := func(err error) error {
context := struct {
Error string
Socket string
@ -109,6 +108,16 @@ func runWaitControlPlanePhase(c workflow.RunData) error {
return errors.New("couldn't initialize a Kubernetes cluster")
}
waiter.SetTimeout(kubeadmconstants.DefaultKubeletTimeout)
if err := waiter.WaitForKubelet(); err != nil {
return handleError(err)
}
waiter.SetTimeout(controlPlaneTimeout)
if err := waiter.WaitForAPI(); err != nil {
return handleError(err)
}
return nil
}

View File

@ -205,8 +205,14 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) {
// Now the kubelet will perform the TLS Bootstrap, transforming /etc/kubernetes/bootstrap-kubelet.conf to /etc/kubernetes/kubelet.conf
// Wait for the kubelet to create the /etc/kubernetes/kubelet.conf kubeconfig file. If this process
// times out, display a somewhat user-friendly message.
waiter := apiclient.NewKubeWaiter(nil, kubeadmconstants.TLSBootstrapTimeout, os.Stdout)
if err := waiter.WaitForKubeletAndFunc(waitForTLSBootstrappedClient); err != nil {
waiter := apiclient.NewKubeWaiter(nil, 0, os.Stdout)
waiter.SetTimeout(kubeadmconstants.DefaultKubeletTimeout)
if err := waiter.WaitForKubelet(); err != nil {
fmt.Printf(kubeadmJoinFailMsg, err)
return err
}
if err := waitForTLSBootstrappedClient(); err != nil {
fmt.Printf(kubeadmJoinFailMsg, err)
return err
}
@ -227,7 +233,7 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) {
// waitForTLSBootstrappedClient waits for the /etc/kubernetes/kubelet.conf file to be available
func waitForTLSBootstrappedClient() error {
fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap...")
fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap")
// Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned.
return wait.PollImmediate(kubeadmconstants.TLSBootstrapRetryInterval, kubeadmconstants.TLSBootstrapTimeout, func() (bool, error) {

View File

@ -230,6 +230,8 @@ const (
// DefaultControlPlaneTimeout specifies the default control plane (actually API Server) timeout for use by kubeadm
DefaultControlPlaneTimeout = 4 * time.Minute
// DefaultKubeletTimeout specifies the default kubelet timeout
DefaultKubeletTimeout = 4 * time.Minute
// MinimumAddressesInServiceSubnet defines minimum amount of nodes the Service subnet should allow.
// We need at least ten, because the DNS service is always at the tenth cluster clusterIP

View File

@ -131,13 +131,8 @@ func (w *fakeWaiter) WaitForStaticPodHashChange(_, _, _ string) error {
return w.errsToReturn[waitForHashChange]
}
// WaitForHealthyKubelet returns a dummy nil just to implement the interface
func (w *fakeWaiter) WaitForHealthyKubelet(_ time.Duration, _ string) error {
return nil
}
// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function
func (w *fakeWaiter) WaitForKubeletAndFunc(f func() error) error {
// WaitForKubelet returns a dummy nil just to implement the interface
func (w *fakeWaiter) WaitForKubelet() error {
return nil
}

View File

@ -50,10 +50,8 @@ type Waiter interface {
WaitForStaticPodHashChange(nodeName, component, previousHash string) error
// WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods
WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
WaitForHealthyKubelet(initialTimeout time.Duration, healthzEndpoint string) error
// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function
WaitForKubeletAndFunc(f func() error) error
// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'
WaitForKubelet() error
// SetTimeout adjusts the timeout to the specified duration
SetTimeout(timeout time.Duration)
}
@ -76,17 +74,28 @@ func NewKubeWaiter(client clientset.Interface, timeout time.Duration, writer io.
// WaitForAPI waits for the API Server's /healthz endpoint to report "ok"
func (w *KubeWaiter) WaitForAPI() error {
start := time.Now()
return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
healthStatus := 0
w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return false, nil
}
fmt.Printf("[api-check] Waiting for a healthy API server. This can take up to %v\n", w.timeout)
fmt.Printf("[apiclient] All control plane components are healthy after %f seconds\n", time.Since(start).Seconds())
return true, nil
})
start := time.Now()
err := wait.PollUntilContextTimeout(
context.Background(),
kubeadmconstants.APICallRetryInterval,
w.timeout,
true, func(ctx context.Context) (bool, error) {
healthStatus := 0
w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return false, nil
}
return true, nil
})
if err != nil {
fmt.Printf("[api-check] The API server is not healthy after %v\n", time.Since(start))
return err
}
fmt.Printf("[api-check] The API server is healthy after %v\n", time.Since(start))
return nil
}
// WaitForPodsWithLabel will lookup pods with the given label and wait until they are all
@ -133,47 +142,54 @@ func (w *KubeWaiter) WaitForPodToDisappear(podName string) error {
})
}
// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
func (w *KubeWaiter) WaitForHealthyKubelet(initialTimeout time.Duration, healthzEndpoint string) error {
time.Sleep(initialTimeout)
fmt.Printf("[kubelet-check] Initial timeout of %v passed.\n", initialTimeout)
return TryRunCommand(func() error {
client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})}
resp, err := client.Get(healthzEndpoint)
if err != nil {
fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.")
fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' failed with error: %v.\n", healthzEndpoint, err)
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.")
fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' returned HTTP code %d\n", healthzEndpoint, resp.StatusCode)
return errors.New("the kubelet healthz endpoint is unhealthy")
}
return nil
}, 5) // a failureThreshold of five means waiting for a total of 155 seconds
}
// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'.
func (w *KubeWaiter) WaitForKubelet() error {
var (
lastError error
start = time.Now()
healthzEndpoint = fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort)
)
// WaitForKubeletAndFunc waits primarily for the function f to execute, even though it might take some time. If that takes a long time, and the kubelet
// /healthz continuously are unhealthy, kubeadm will error out after a period of exponential backoff
func (w *KubeWaiter) WaitForKubeletAndFunc(f func() error) error {
errorChan := make(chan error, 1)
fmt.Printf("[kubelet-check] Waiting for a healthy kubelet. This can take up to %v\n", w.timeout)
go func(errC chan error, waiter Waiter) {
if err := waiter.WaitForHealthyKubelet(40*time.Second, fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort)); err != nil {
errC <- err
}
}(errorChan, w)
formatError := func(cause string) error {
return errors.Errorf("The HTTP call equal to 'curl -sSL %s' returned %s\n",
healthzEndpoint, cause)
}
go func(errC chan error) {
// This main goroutine sends whatever the f function returns (error or not) to the channel
// This in order to continue on success (nil error), or just fail if the function returns an error
errC <- f()
}(errorChan)
err := wait.PollUntilContextTimeout(
context.Background(),
kubeadmconstants.APICallRetryInterval,
w.timeout,
true, func(ctx context.Context) (bool, error) {
client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthzEndpoint, nil)
if err != nil {
lastError = formatError(fmt.Sprintf("error: %v", err))
return false, err
}
resp, err := client.Do(req)
if err != nil {
lastError = formatError(fmt.Sprintf("error: %v", err))
return false, nil
}
defer func() {
_ = resp.Body.Close()
}()
if resp.StatusCode != http.StatusOK {
lastError = formatError(fmt.Sprintf("status code: %d", resp.StatusCode))
return false, nil
}
// This call is blocking until one of the goroutines sends to errorChan
return <-errorChan
return true, nil
})
if err != nil {
fmt.Printf("[kubelet-check] The kubelet is not healthy after %v\n", time.Since(start))
return lastError
}
fmt.Printf("[kubelet-check] The kubelet is healthy after %v\n", time.Since(start))
return nil
}
// SetTimeout adjusts the timeout to the specified duration

View File

@ -106,14 +106,9 @@ func (w *Waiter) WaitForPodToDisappear(podName string) error {
return nil
}
// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
func (w *Waiter) WaitForHealthyKubelet(_ time.Duration, healthzEndpoint string) error {
fmt.Printf("[dryrun] Would make sure the kubelet %q endpoint is healthy\n", healthzEndpoint)
return nil
}
// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function
func (w *Waiter) WaitForKubeletAndFunc(f func() error) error {
// WaitForKubelet only prints that the kubelet /healthz endpoint would be checked; in dry-run mode it does not wait
func (w *Waiter) WaitForKubelet() error {
fmt.Println("[dryrun] Would make sure the kubelet's /healthz endpoint is healthy")
return nil
}