From 557118897de64377e481998e2fea746258701b27 Mon Sep 17 00:00:00 2001 From: "Lubomir I. Ivanov" Date: Sun, 19 Nov 2023 17:56:28 +0200 Subject: [PATCH] kubeadm: drop concurrency when waiting for kubelet /healthz The function wait.go#WaitForKubeletAndFunc() has been used in a number of places in kubeadm. It starts a go routine to wait for the kubelet /healthz and in parallel starts another go routine to wait for an custom function. This logic is problematic. If kubeadm is waiting for the kubelet in parallel with something that requires the kubelet, the right solution would be to first wait for the kubelet in serial and only then proceed with the other action. The parallelism here particularly during "init" required a unwanted "initial timeout" of 40s, before the kubelet waiting even starts. In most cases, this makes the kubelet waiter to not even start, while the main point of waiting becomes the "other action". - Remove the function WaitForKubeletAndFunc() from the Waiter interface. - Rename the function WaitForHealthyKubelet() to just WaitForKubelet() to be consistent with the naming WaitForAPI(). - Update WaitForKubelet() to not use TryRunCommand() and instead use PollUntilContextTimeout(). - Remove the "initial timeout" of 40s in WaitForKubelet(). - Make both WaitForKubelet() and WaitForAPI() use similar error handling and output. - Update all usage of WaitForKubelet() to be a serial call before any other action, such as another wait* call. - Make the default wait timeout for the kubelet /healthz to be 1 minute (kubeadmconstants.DefaultKubeletTimeout). - Apply updates to all implementations of the Waiter interface. --- .../app/cmd/phases/init/waitcontrolplane.go | 27 ++-- cmd/kubeadm/app/cmd/phases/join/kubelet.go | 12 +- cmd/kubeadm/app/constants/constants.go | 2 + .../app/phases/upgrade/staticpods_test.go | 9 +- cmd/kubeadm/app/util/apiclient/wait.go | 118 ++++++++++-------- cmd/kubeadm/app/util/dryrun/dryrun.go | 11 +- 6 files changed, 101 insertions(+), 78 deletions(-) diff --git a/cmd/kubeadm/app/cmd/phases/init/waitcontrolplane.go b/cmd/kubeadm/app/cmd/phases/init/waitcontrolplane.go index bbe023ae3db..f96c9da06ee 100644 --- a/cmd/kubeadm/app/cmd/phases/init/waitcontrolplane.go +++ b/cmd/kubeadm/app/cmd/phases/init/waitcontrolplane.go @@ -26,9 +26,9 @@ import ( "github.com/pkg/errors" clientset "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow" + kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun" ) @@ -79,24 +79,23 @@ func runWaitControlPlanePhase(c workflow.RunData) error { } } - // waiter holds the apiclient.Waiter implementation of choice, responsible for querying the API server in various ways and waiting for conditions to be fulfilled - klog.V(1).Infoln("[wait-control-plane] Waiting for the API server to be healthy") - - // WaitForAPI uses the /healthz endpoint, thus a client without permissions works fine + // Both Wait* calls below use a /healthz endpoint, thus a client without permissions works fine client, err := data.ClientWithoutBootstrap() if err != nil { return errors.Wrap(err, "cannot obtain client without bootstrap") } - timeout := data.Cfg().ClusterConfiguration.APIServer.TimeoutForControlPlane.Duration - waiter, err := newControlPlaneWaiter(data.DryRun(), timeout, client, data.OutputWriter()) + waiter, err := newControlPlaneWaiter(data.DryRun(), 0, client, data.OutputWriter()) if err != nil { return errors.Wrap(err, "error creating waiter") } - fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods from directory %q. This can take up to %v\n", data.ManifestDir(), timeout) + controlPlaneTimeout := data.Cfg().ClusterConfiguration.APIServer.TimeoutForControlPlane.Duration + fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods"+ + " from directory %q\n", + data.ManifestDir()) - if err := waiter.WaitForKubeletAndFunc(waiter.WaitForAPI); err != nil { + handleError := func(err error) error { context := struct { Error string Socket string @@ -109,6 +108,16 @@ func runWaitControlPlanePhase(c workflow.RunData) error { return errors.New("couldn't initialize a Kubernetes cluster") } + waiter.SetTimeout(kubeadmconstants.DefaultKubeletTimeout) + if err := waiter.WaitForKubelet(); err != nil { + return handleError(err) + } + + waiter.SetTimeout(controlPlaneTimeout) + if err := waiter.WaitForAPI(); err != nil { + return handleError(err) + } + return nil } diff --git a/cmd/kubeadm/app/cmd/phases/join/kubelet.go b/cmd/kubeadm/app/cmd/phases/join/kubelet.go index 9558dbf79bd..c20d81b52e2 100644 --- a/cmd/kubeadm/app/cmd/phases/join/kubelet.go +++ b/cmd/kubeadm/app/cmd/phases/join/kubelet.go @@ -205,8 +205,14 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) { // Now the kubelet will perform the TLS Bootstrap, transforming /etc/kubernetes/bootstrap-kubelet.conf to /etc/kubernetes/kubelet.conf // Wait for the kubelet to create the /etc/kubernetes/kubelet.conf kubeconfig file. If this process // times out, display a somewhat user-friendly message. - waiter := apiclient.NewKubeWaiter(nil, kubeadmconstants.TLSBootstrapTimeout, os.Stdout) - if err := waiter.WaitForKubeletAndFunc(waitForTLSBootstrappedClient); err != nil { + waiter := apiclient.NewKubeWaiter(nil, 0, os.Stdout) + waiter.SetTimeout(kubeadmconstants.DefaultKubeletTimeout) + if err := waiter.WaitForKubelet(); err != nil { + fmt.Printf(kubeadmJoinFailMsg, err) + return err + } + + if err := waitForTLSBootstrappedClient(); err != nil { fmt.Printf(kubeadmJoinFailMsg, err) return err } @@ -227,7 +233,7 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) { // waitForTLSBootstrappedClient waits for the /etc/kubernetes/kubelet.conf file to be available func waitForTLSBootstrappedClient() error { - fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap...") + fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap") // Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned. return wait.PollImmediate(kubeadmconstants.TLSBootstrapRetryInterval, kubeadmconstants.TLSBootstrapTimeout, func() (bool, error) { diff --git a/cmd/kubeadm/app/constants/constants.go b/cmd/kubeadm/app/constants/constants.go index 2678a5be4f8..937d95a2638 100644 --- a/cmd/kubeadm/app/constants/constants.go +++ b/cmd/kubeadm/app/constants/constants.go @@ -230,6 +230,8 @@ const ( // DefaultControlPlaneTimeout specifies the default control plane (actually API Server) timeout for use by kubeadm DefaultControlPlaneTimeout = 4 * time.Minute + // DefaultKubeletTimeout specifies the default kubelet timeout + DefaultKubeletTimeout = 4 * time.Minute // MinimumAddressesInServiceSubnet defines minimum amount of nodes the Service subnet should allow. // We need at least ten, because the DNS service is always at the tenth cluster clusterIP diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go index 4781a888c36..94ba22b66ba 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go @@ -131,13 +131,8 @@ func (w *fakeWaiter) WaitForStaticPodHashChange(_, _, _ string) error { return w.errsToReturn[waitForHashChange] } -// WaitForHealthyKubelet returns a dummy nil just to implement the interface -func (w *fakeWaiter) WaitForHealthyKubelet(_ time.Duration, _ string) error { - return nil -} - -// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function -func (w *fakeWaiter) WaitForKubeletAndFunc(f func() error) error { +// WaitForHKubelet returns a dummy nil just to implement the interface +func (w *fakeWaiter) WaitForKubelet() error { return nil } diff --git a/cmd/kubeadm/app/util/apiclient/wait.go b/cmd/kubeadm/app/util/apiclient/wait.go index cd417980ff0..7808afdb214 100644 --- a/cmd/kubeadm/app/util/apiclient/wait.go +++ b/cmd/kubeadm/app/util/apiclient/wait.go @@ -50,10 +50,8 @@ type Waiter interface { WaitForStaticPodHashChange(nodeName, component, previousHash string) error // WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) - // WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok' - WaitForHealthyKubelet(initialTimeout time.Duration, healthzEndpoint string) error - // WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function - WaitForKubeletAndFunc(f func() error) error + // WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok' + WaitForKubelet() error // SetTimeout adjusts the timeout to the specified duration SetTimeout(timeout time.Duration) } @@ -76,17 +74,28 @@ func NewKubeWaiter(client clientset.Interface, timeout time.Duration, writer io. // WaitForAPI waits for the API Server's /healthz endpoint to report "ok" func (w *KubeWaiter) WaitForAPI() error { - start := time.Now() - return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) { - healthStatus := 0 - w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&healthStatus) - if healthStatus != http.StatusOK { - return false, nil - } + fmt.Printf("[api-check] Waiting for a healthy API server. This can take up to %v\n", w.timeout) - fmt.Printf("[apiclient] All control plane components are healthy after %f seconds\n", time.Since(start).Seconds()) - return true, nil - }) + start := time.Now() + err := wait.PollUntilContextTimeout( + context.Background(), + kubeadmconstants.APICallRetryInterval, + w.timeout, + true, func(ctx context.Context) (bool, error) { + healthStatus := 0 + w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus) + if healthStatus != http.StatusOK { + return false, nil + } + return true, nil + }) + if err != nil { + fmt.Printf("[api-check] The API server is not healthy after %v\n", time.Since(start)) + return err + } + + fmt.Printf("[api-check] The API server is healthy after %v\n", time.Since(start)) + return nil } // WaitForPodsWithLabel will lookup pods with the given label and wait until they are all @@ -133,47 +142,54 @@ func (w *KubeWaiter) WaitForPodToDisappear(podName string) error { }) } -// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok' -func (w *KubeWaiter) WaitForHealthyKubelet(initialTimeout time.Duration, healthzEndpoint string) error { - time.Sleep(initialTimeout) - fmt.Printf("[kubelet-check] Initial timeout of %v passed.\n", initialTimeout) - return TryRunCommand(func() error { - client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})} - resp, err := client.Get(healthzEndpoint) - if err != nil { - fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.") - fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' failed with error: %v.\n", healthzEndpoint, err) - return err - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.") - fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' returned HTTP code %d\n", healthzEndpoint, resp.StatusCode) - return errors.New("the kubelet healthz endpoint is unhealthy") - } - return nil - }, 5) // a failureThreshold of five means waiting for a total of 155 seconds -} +// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'. +func (w *KubeWaiter) WaitForKubelet() error { + var ( + lastError error + start = time.Now() + healthzEndpoint = fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort) + ) -// WaitForKubeletAndFunc waits primarily for the function f to execute, even though it might take some time. If that takes a long time, and the kubelet -// /healthz continuously are unhealthy, kubeadm will error out after a period of exponential backoff -func (w *KubeWaiter) WaitForKubeletAndFunc(f func() error) error { - errorChan := make(chan error, 1) + fmt.Printf("[kubelet-check] Waiting for a healthy kubelet. This can take up to %v\n", w.timeout) - go func(errC chan error, waiter Waiter) { - if err := waiter.WaitForHealthyKubelet(40*time.Second, fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort)); err != nil { - errC <- err - } - }(errorChan, w) + formatError := func(cause string) error { + return errors.Errorf("The HTTP call equal to 'curl -sSL %s' returned %s\n", + healthzEndpoint, cause) + } - go func(errC chan error) { - // This main goroutine sends whatever the f function returns (error or not) to the channel - // This in order to continue on success (nil error), or just fail if the function returns an error - errC <- f() - }(errorChan) + err := wait.PollUntilContextTimeout( + context.Background(), + kubeadmconstants.APICallRetryInterval, + w.timeout, + true, func(ctx context.Context) (bool, error) { + client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})} + req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthzEndpoint, nil) + if err != nil { + lastError = formatError(fmt.Sprintf("error: %v", err)) + return false, err + } + resp, err := client.Do(req) + if err != nil { + lastError = formatError(fmt.Sprintf("error: %v", err)) + return false, nil + } + defer func() { + _ = resp.Body.Close() + }() + if resp.StatusCode != http.StatusOK { + lastError = formatError(fmt.Sprintf("status code: %d", resp.StatusCode)) + return false, nil + } - // This call is blocking until one of the goroutines sends to errorChan - return <-errorChan + return true, nil + }) + if err != nil { + fmt.Printf("[kubelet-check] The kubelet is not healthy after %v\n", time.Since(start)) + return lastError + } + + fmt.Printf("[kubelet-check] The kubelet is healthy after %v\n", time.Since(start)) + return nil } // SetTimeout adjusts the timeout to the specified duration diff --git a/cmd/kubeadm/app/util/dryrun/dryrun.go b/cmd/kubeadm/app/util/dryrun/dryrun.go index 0e150eca31b..b1c4e7a4712 100644 --- a/cmd/kubeadm/app/util/dryrun/dryrun.go +++ b/cmd/kubeadm/app/util/dryrun/dryrun.go @@ -106,14 +106,9 @@ func (w *Waiter) WaitForPodToDisappear(podName string) error { return nil } -// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok' -func (w *Waiter) WaitForHealthyKubelet(_ time.Duration, healthzEndpoint string) error { - fmt.Printf("[dryrun] Would make sure the kubelet %q endpoint is healthy\n", healthzEndpoint) - return nil -} - -// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function -func (w *Waiter) WaitForKubeletAndFunc(f func() error) error { +// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok' +func (w *Waiter) WaitForKubelet() error { + fmt.Println("[dryrun] Would make sure the kubelet's /healthz endpoint is healthy") return nil }