diff --git a/cmd/kubeadm/app/cmd/phases/init/waitcontrolplane.go b/cmd/kubeadm/app/cmd/phases/init/waitcontrolplane.go index bbe023ae3db..f96c9da06ee 100644 --- a/cmd/kubeadm/app/cmd/phases/init/waitcontrolplane.go +++ b/cmd/kubeadm/app/cmd/phases/init/waitcontrolplane.go @@ -26,9 +26,9 @@ import ( "github.com/pkg/errors" clientset "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow" + kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun" ) @@ -79,24 +79,23 @@ func runWaitControlPlanePhase(c workflow.RunData) error { } } - // waiter holds the apiclient.Waiter implementation of choice, responsible for querying the API server in various ways and waiting for conditions to be fulfilled - klog.V(1).Infoln("[wait-control-plane] Waiting for the API server to be healthy") - - // WaitForAPI uses the /healthz endpoint, thus a client without permissions works fine + // Both Wait* calls below use a /healthz endpoint, thus a client without permissions works fine client, err := data.ClientWithoutBootstrap() if err != nil { return errors.Wrap(err, "cannot obtain client without bootstrap") } - timeout := data.Cfg().ClusterConfiguration.APIServer.TimeoutForControlPlane.Duration - waiter, err := newControlPlaneWaiter(data.DryRun(), timeout, client, data.OutputWriter()) + waiter, err := newControlPlaneWaiter(data.DryRun(), 0, client, data.OutputWriter()) if err != nil { return errors.Wrap(err, "error creating waiter") } - fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods from directory %q. This can take up to %v\n", data.ManifestDir(), timeout) + controlPlaneTimeout := data.Cfg().ClusterConfiguration.APIServer.TimeoutForControlPlane.Duration + fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods"+ + " from directory %q\n", + data.ManifestDir()) - if err := waiter.WaitForKubeletAndFunc(waiter.WaitForAPI); err != nil { + handleError := func(err error) error { context := struct { Error string Socket string @@ -109,6 +108,16 @@ func runWaitControlPlanePhase(c workflow.RunData) error { return errors.New("couldn't initialize a Kubernetes cluster") } + waiter.SetTimeout(kubeadmconstants.DefaultKubeletTimeout) + if err := waiter.WaitForKubelet(); err != nil { + return handleError(err) + } + + waiter.SetTimeout(controlPlaneTimeout) + if err := waiter.WaitForAPI(); err != nil { + return handleError(err) + } + return nil } diff --git a/cmd/kubeadm/app/cmd/phases/join/kubelet.go b/cmd/kubeadm/app/cmd/phases/join/kubelet.go index 9558dbf79bd..c20d81b52e2 100644 --- a/cmd/kubeadm/app/cmd/phases/join/kubelet.go +++ b/cmd/kubeadm/app/cmd/phases/join/kubelet.go @@ -205,8 +205,14 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) { // Now the kubelet will perform the TLS Bootstrap, transforming /etc/kubernetes/bootstrap-kubelet.conf to /etc/kubernetes/kubelet.conf // Wait for the kubelet to create the /etc/kubernetes/kubelet.conf kubeconfig file. If this process // times out, display a somewhat user-friendly message. - waiter := apiclient.NewKubeWaiter(nil, kubeadmconstants.TLSBootstrapTimeout, os.Stdout) - if err := waiter.WaitForKubeletAndFunc(waitForTLSBootstrappedClient); err != nil { + waiter := apiclient.NewKubeWaiter(nil, 0, os.Stdout) + waiter.SetTimeout(kubeadmconstants.DefaultKubeletTimeout) + if err := waiter.WaitForKubelet(); err != nil { + fmt.Printf(kubeadmJoinFailMsg, err) + return err + } + + if err := waitForTLSBootstrappedClient(); err != nil { fmt.Printf(kubeadmJoinFailMsg, err) return err } @@ -227,7 +233,7 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) { // waitForTLSBootstrappedClient waits for the /etc/kubernetes/kubelet.conf file to be available func waitForTLSBootstrappedClient() error { - fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap...") + fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap") // Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned. return wait.PollImmediate(kubeadmconstants.TLSBootstrapRetryInterval, kubeadmconstants.TLSBootstrapTimeout, func() (bool, error) { diff --git a/cmd/kubeadm/app/constants/constants.go b/cmd/kubeadm/app/constants/constants.go index 2678a5be4f8..937d95a2638 100644 --- a/cmd/kubeadm/app/constants/constants.go +++ b/cmd/kubeadm/app/constants/constants.go @@ -230,6 +230,8 @@ const ( // DefaultControlPlaneTimeout specifies the default control plane (actually API Server) timeout for use by kubeadm DefaultControlPlaneTimeout = 4 * time.Minute + // DefaultKubeletTimeout specifies the default kubelet timeout + DefaultKubeletTimeout = 4 * time.Minute // MinimumAddressesInServiceSubnet defines minimum amount of nodes the Service subnet should allow. // We need at least ten, because the DNS service is always at the tenth cluster clusterIP diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go index 4781a888c36..94ba22b66ba 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go @@ -131,13 +131,8 @@ func (w *fakeWaiter) WaitForStaticPodHashChange(_, _, _ string) error { return w.errsToReturn[waitForHashChange] } -// WaitForHealthyKubelet returns a dummy nil just to implement the interface -func (w *fakeWaiter) WaitForHealthyKubelet(_ time.Duration, _ string) error { - return nil -} - -// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function -func (w *fakeWaiter) WaitForKubeletAndFunc(f func() error) error { +// WaitForHKubelet returns a dummy nil just to implement the interface +func (w *fakeWaiter) WaitForKubelet() error { return nil } diff --git a/cmd/kubeadm/app/util/apiclient/wait.go b/cmd/kubeadm/app/util/apiclient/wait.go index cd417980ff0..7808afdb214 100644 --- a/cmd/kubeadm/app/util/apiclient/wait.go +++ b/cmd/kubeadm/app/util/apiclient/wait.go @@ -50,10 +50,8 @@ type Waiter interface { WaitForStaticPodHashChange(nodeName, component, previousHash string) error // WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) - // WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok' - WaitForHealthyKubelet(initialTimeout time.Duration, healthzEndpoint string) error - // WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function - WaitForKubeletAndFunc(f func() error) error + // WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok' + WaitForKubelet() error // SetTimeout adjusts the timeout to the specified duration SetTimeout(timeout time.Duration) } @@ -76,17 +74,28 @@ func NewKubeWaiter(client clientset.Interface, timeout time.Duration, writer io. // WaitForAPI waits for the API Server's /healthz endpoint to report "ok" func (w *KubeWaiter) WaitForAPI() error { - start := time.Now() - return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) { - healthStatus := 0 - w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&healthStatus) - if healthStatus != http.StatusOK { - return false, nil - } + fmt.Printf("[api-check] Waiting for a healthy API server. This can take up to %v\n", w.timeout) - fmt.Printf("[apiclient] All control plane components are healthy after %f seconds\n", time.Since(start).Seconds()) - return true, nil - }) + start := time.Now() + err := wait.PollUntilContextTimeout( + context.Background(), + kubeadmconstants.APICallRetryInterval, + w.timeout, + true, func(ctx context.Context) (bool, error) { + healthStatus := 0 + w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus) + if healthStatus != http.StatusOK { + return false, nil + } + return true, nil + }) + if err != nil { + fmt.Printf("[api-check] The API server is not healthy after %v\n", time.Since(start)) + return err + } + + fmt.Printf("[api-check] The API server is healthy after %v\n", time.Since(start)) + return nil } // WaitForPodsWithLabel will lookup pods with the given label and wait until they are all @@ -133,47 +142,54 @@ func (w *KubeWaiter) WaitForPodToDisappear(podName string) error { }) } -// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok' -func (w *KubeWaiter) WaitForHealthyKubelet(initialTimeout time.Duration, healthzEndpoint string) error { - time.Sleep(initialTimeout) - fmt.Printf("[kubelet-check] Initial timeout of %v passed.\n", initialTimeout) - return TryRunCommand(func() error { - client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})} - resp, err := client.Get(healthzEndpoint) - if err != nil { - fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.") - fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' failed with error: %v.\n", healthzEndpoint, err) - return err - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.") - fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' returned HTTP code %d\n", healthzEndpoint, resp.StatusCode) - return errors.New("the kubelet healthz endpoint is unhealthy") - } - return nil - }, 5) // a failureThreshold of five means waiting for a total of 155 seconds -} +// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'. +func (w *KubeWaiter) WaitForKubelet() error { + var ( + lastError error + start = time.Now() + healthzEndpoint = fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort) + ) -// WaitForKubeletAndFunc waits primarily for the function f to execute, even though it might take some time. If that takes a long time, and the kubelet -// /healthz continuously are unhealthy, kubeadm will error out after a period of exponential backoff -func (w *KubeWaiter) WaitForKubeletAndFunc(f func() error) error { - errorChan := make(chan error, 1) + fmt.Printf("[kubelet-check] Waiting for a healthy kubelet. This can take up to %v\n", w.timeout) - go func(errC chan error, waiter Waiter) { - if err := waiter.WaitForHealthyKubelet(40*time.Second, fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort)); err != nil { - errC <- err - } - }(errorChan, w) + formatError := func(cause string) error { + return errors.Errorf("The HTTP call equal to 'curl -sSL %s' returned %s\n", + healthzEndpoint, cause) + } - go func(errC chan error) { - // This main goroutine sends whatever the f function returns (error or not) to the channel - // This in order to continue on success (nil error), or just fail if the function returns an error - errC <- f() - }(errorChan) + err := wait.PollUntilContextTimeout( + context.Background(), + kubeadmconstants.APICallRetryInterval, + w.timeout, + true, func(ctx context.Context) (bool, error) { + client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})} + req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthzEndpoint, nil) + if err != nil { + lastError = formatError(fmt.Sprintf("error: %v", err)) + return false, err + } + resp, err := client.Do(req) + if err != nil { + lastError = formatError(fmt.Sprintf("error: %v", err)) + return false, nil + } + defer func() { + _ = resp.Body.Close() + }() + if resp.StatusCode != http.StatusOK { + lastError = formatError(fmt.Sprintf("status code: %d", resp.StatusCode)) + return false, nil + } - // This call is blocking until one of the goroutines sends to errorChan - return <-errorChan + return true, nil + }) + if err != nil { + fmt.Printf("[kubelet-check] The kubelet is not healthy after %v\n", time.Since(start)) + return lastError + } + + fmt.Printf("[kubelet-check] The kubelet is healthy after %v\n", time.Since(start)) + return nil } // SetTimeout adjusts the timeout to the specified duration diff --git a/cmd/kubeadm/app/util/dryrun/dryrun.go b/cmd/kubeadm/app/util/dryrun/dryrun.go index 0e150eca31b..b1c4e7a4712 100644 --- a/cmd/kubeadm/app/util/dryrun/dryrun.go +++ b/cmd/kubeadm/app/util/dryrun/dryrun.go @@ -106,14 +106,9 @@ func (w *Waiter) WaitForPodToDisappear(podName string) error { return nil } -// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok' -func (w *Waiter) WaitForHealthyKubelet(_ time.Duration, healthzEndpoint string) error { - fmt.Printf("[dryrun] Would make sure the kubelet %q endpoint is healthy\n", healthzEndpoint) - return nil -} - -// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function -func (w *Waiter) WaitForKubeletAndFunc(f func() error) error { +// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok' +func (w *Waiter) WaitForKubelet() error { + fmt.Println("[dryrun] Would make sure the kubelet's /healthz endpoint is healthy") return nil }