diff --git a/cmd/kubeadm/app/cmd/phases/join/kubelet.go b/cmd/kubeadm/app/cmd/phases/join/kubelet.go index c20d81b52e2..a147f9944e0 100644 --- a/cmd/kubeadm/app/cmd/phases/join/kubelet.go +++ b/cmd/kubeadm/app/cmd/phases/join/kubelet.go @@ -236,11 +236,13 @@ func waitForTLSBootstrappedClient() error { fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap") // Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned. - return wait.PollImmediate(kubeadmconstants.TLSBootstrapRetryInterval, kubeadmconstants.TLSBootstrapTimeout, func() (bool, error) { - // Check that we can create a client set out of the kubelet kubeconfig. This ensures not - // only that the kubeconfig file exists, but that other files required by it also exist (like - // client certificate and key) - _, err := kubeconfigutil.ClientSetFromFile(kubeadmconstants.GetKubeletKubeConfigPath()) - return (err == nil), nil - }) + return wait.PollUntilContextTimeout(context.Background(), + kubeadmconstants.TLSBootstrapRetryInterval, kubeadmconstants.TLSBootstrapTimeout, + true, func(_ context.Context) (bool, error) { + // Check that we can create a client set out of the kubelet kubeconfig. 
This ensures not + // only that the kubeconfig file exists, but that other files required by it also exist (like + // client certificate and key) + _, err := kubeconfigutil.ClientSetFromFile(kubeadmconstants.GetKubeletKubeConfigPath()) + return (err == nil), nil + }) } diff --git a/cmd/kubeadm/app/discovery/file/file.go b/cmd/kubeadm/app/discovery/file/file.go index 6a9627bb8b4..4c937679df7 100644 --- a/cmd/kubeadm/app/discovery/file/file.go +++ b/cmd/kubeadm/app/discovery/file/file.go @@ -91,23 +91,26 @@ func ValidateConfigInfo(config *clientcmdapi.Config, discoveryTimeout time.Durat var clusterinfoCM *v1.ConfigMap - err = wait.Poll(constants.DiscoveryRetryInterval, discoveryTimeout, func() (bool, error) { - var err error - clusterinfoCM, err = client.CoreV1().ConfigMaps(metav1.NamespacePublic).Get(context.TODO(), bootstrapapi.ConfigMapClusterInfo, metav1.GetOptions{}) - if err != nil { - if apierrors.IsForbidden(err) { - // If the request is unauthorized, the cluster admin has not granted access to the cluster info configmap for unauthenticated users - // In that case, trust the cluster admin and do not refresh the cluster-info data - klog.Warningf("[discovery] Could not access the %s ConfigMap for refreshing the cluster-info information, but the TLS cert is valid so proceeding...\n", bootstrapapi.ConfigMapClusterInfo) - return true, nil + var lastError error + err = wait.PollUntilContextTimeout(context.Background(), + constants.DiscoveryRetryInterval, discoveryTimeout, + true, func(_ context.Context) (bool, error) { + clusterinfoCM, lastError = client.CoreV1().ConfigMaps(metav1.NamespacePublic).Get(context.TODO(), bootstrapapi.ConfigMapClusterInfo, metav1.GetOptions{}) + if lastError != nil { + if apierrors.IsForbidden(lastError) { + // If the request is unauthorized, the cluster admin has not granted access to the cluster info configmap for unauthenticated users + // In that case, trust the cluster admin and do not refresh the cluster-info data + 
klog.Warningf("[discovery] Could not access the %s ConfigMap for refreshing the cluster-info information, but the TLS cert is valid so proceeding...\n", bootstrapapi.ConfigMapClusterInfo) + return true, nil + } + klog.V(1).Infof("[discovery] Error reading the %s ConfigMap, will try again: %v\n", bootstrapapi.ConfigMapClusterInfo, lastError) + return false, nil } - klog.V(1).Infof("[discovery] Error reading the %s ConfigMap, will try again: %v\n", bootstrapapi.ConfigMapClusterInfo, err) - return false, nil - } - return true, nil - }) - if err == wait.ErrWaitTimeout { - return nil, errors.Errorf("Abort reading the %s ConfigMap after timeout of %v", bootstrapapi.ConfigMapClusterInfo, discoveryTimeout) + return true, nil + }) + if err != nil { + return nil, errors.Wrapf(lastError, "Abort reading the %s ConfigMap after timeout of %v", + bootstrapapi.ConfigMapClusterInfo, discoveryTimeout) } // If we couldn't fetch the cluster-info ConfigMap, just return the cluster-info object the user provided diff --git a/cmd/kubeadm/app/discovery/token/token.go b/cmd/kubeadm/app/discovery/token/token.go index 6a3f3114e2e..296231a22c0 100644 --- a/cmd/kubeadm/app/discovery/token/token.go +++ b/cmd/kubeadm/app/discovery/token/token.go @@ -208,28 +208,31 @@ func getClusterInfo(client clientset.Interface, kubeconfig *clientcmdapi.Config, } } - ctx, cancel := context.WithTimeout(context.TODO(), duration) - defer cancel() - - wait.JitterUntil(func() { - cm, err = client.CoreV1().ConfigMaps(metav1.NamespacePublic).Get(context.TODO(), bootstrapapi.ConfigMapClusterInfo, metav1.GetOptions{}) - if err != nil { - klog.V(1).Infof("[discovery] Failed to request cluster-info, will try again: %v", err) - return - } - // Even if the ConfigMap is available the JWS signature is patched-in a bit later. - // Make sure we retry util then.
- if _, ok := cm.Data[bootstrapapi.JWSSignatureKeyPrefix+token.ID]; !ok { - klog.V(1).Infof("[discovery] The cluster-info ConfigMap does not yet contain a JWS signature for token ID %q, will try again", token.ID) - err = errors.Errorf("could not find a JWS signature in the cluster-info ConfigMap for token ID %q", token.ID) - return - } - // Cancel the context on success - cancel() - }, interval, 0.3, true, ctx.Done()) + klog.V(1).Infof("[discovery] Waiting for the cluster-info ConfigMap to receive a JWS signature"+ + " for token ID %q", token.ID) + var lastError error + err = wait.PollUntilContextTimeout(context.Background(), + interval, duration, true, + func(ctx context.Context) (bool, error) { + cm, err = client.CoreV1().ConfigMaps(metav1.NamespacePublic). + Get(ctx, bootstrapapi.ConfigMapClusterInfo, metav1.GetOptions{}) + if err != nil { + lastError = errors.Wrapf(err, "failed to request the cluster-info ConfigMap") + klog.V(1).Infof("[discovery] Retrying due to error: %v", lastError) + return false, nil + } + // Even if the ConfigMap is available the JWS signature is patched-in a bit later.
+ if _, ok := cm.Data[bootstrapapi.JWSSignatureKeyPrefix+token.ID]; !ok { + lastError = errors.Errorf("could not find a JWS signature in the cluster-info ConfigMap"+ + " for token ID %q", token.ID) + klog.V(1).Infof("[discovery] Retrying due to error: %v", lastError) + return false, nil + } + return true, nil + }) if err != nil { - return nil, err + return nil, lastError } return cm, nil diff --git a/cmd/kubeadm/app/util/apiclient/idempotency.go b/cmd/kubeadm/app/util/apiclient/idempotency.go index fa5fc00cbff..49a2a7c2a6a 100644 --- a/cmd/kubeadm/app/util/apiclient/idempotency.go +++ b/cmd/kubeadm/app/util/apiclient/idempotency.go @@ -61,17 +61,19 @@ func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error // taking place) func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutator ConfigMapMutator) error { var lastError error - err := wait.PollImmediate(constants.APICallRetryInterval, constants.APICallWithWriteTimeout, func() (bool, error) { - if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil { - lastError = err - if apierrors.IsAlreadyExists(err) { - lastError = MutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator) - return lastError == nil, nil + err := wait.PollUntilContextTimeout(context.Background(), + constants.APICallRetryInterval, constants.APICallWithWriteTimeout, + true, func(_ context.Context) (bool, error) { + if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil { + lastError = err + if apierrors.IsAlreadyExists(err) { + lastError = MutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator) + return lastError == nil, nil + } + return false, nil } - return false, nil - } - return true, nil - }) + return true, nil + }) if err == nil { return 
nil } @@ -84,19 +86,21 @@ func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutat // taking place). func MutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator ConfigMapMutator) error { var lastError error - err := wait.PollImmediate(constants.APICallRetryInterval, constants.APICallWithWriteTimeout, func() (bool, error) { - configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(context.TODO(), meta.Name, metav1.GetOptions{}) - if err != nil { - lastError = err - return false, nil - } - if err = mutator(configMap); err != nil { - lastError = errors.Wrap(err, "unable to mutate ConfigMap") - return false, nil - } - _, lastError = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}) - return lastError == nil, nil - }) + err := wait.PollUntilContextTimeout(context.Background(), + constants.APICallRetryInterval, constants.APICallWithWriteTimeout, + true, func(_ context.Context) (bool, error) { + configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(context.TODO(), meta.Name, metav1.GetOptions{}) + if err != nil { + lastError = err + return false, nil + } + if err = mutator(configMap); err != nil { + lastError = errors.Wrap(err, "unable to mutate ConfigMap") + return false, nil + } + _, lastError = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}) + return lastError == nil, nil + }) if err == nil { return nil } @@ -190,20 +194,22 @@ func CreateOrUpdateDaemonSet(client clientset.Interface, ds *apps.DaemonSet) err // CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. 
func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error { var lastError error - err := wait.PollImmediate(constants.APICallRetryInterval, constants.APICallWithWriteTimeout, func() (bool, error) { - if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(context.TODO(), role, metav1.CreateOptions{}); err != nil { - if !apierrors.IsAlreadyExists(err) { - lastError = errors.Wrap(err, "unable to create RBAC role") - return false, nil - } + err := wait.PollUntilContextTimeout(context.Background(), + constants.APICallRetryInterval, constants.APICallWithWriteTimeout, + true, func(_ context.Context) (bool, error) { + if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(context.TODO(), role, metav1.CreateOptions{}); err != nil { + if !apierrors.IsAlreadyExists(err) { + lastError = errors.Wrap(err, "unable to create RBAC role") + return false, nil + } - if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Update(context.TODO(), role, metav1.UpdateOptions{}); err != nil { - lastError = errors.Wrap(err, "unable to update RBAC role") - return false, nil + if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Update(context.TODO(), role, metav1.UpdateOptions{}); err != nil { + lastError = errors.Wrap(err, "unable to update RBAC role") + return false, nil + } } - } - return true, nil - }) + return true, nil + }) if err == nil { return nil } @@ -213,20 +219,22 @@ func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error { // CreateOrUpdateRoleBinding creates a RoleBinding if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. 
func CreateOrUpdateRoleBinding(client clientset.Interface, roleBinding *rbac.RoleBinding) error { var lastError error - err := wait.PollImmediate(constants.APICallRetryInterval, constants.APICallWithWriteTimeout, func() (bool, error) { - if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Create(context.TODO(), roleBinding, metav1.CreateOptions{}); err != nil { - if !apierrors.IsAlreadyExists(err) { - lastError = errors.Wrap(err, "unable to create RBAC rolebinding") - return false, nil - } + err := wait.PollUntilContextTimeout(context.Background(), + constants.APICallRetryInterval, constants.APICallWithWriteTimeout, + true, func(_ context.Context) (bool, error) { + if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Create(context.TODO(), roleBinding, metav1.CreateOptions{}); err != nil { + if !apierrors.IsAlreadyExists(err) { + lastError = errors.Wrap(err, "unable to create RBAC rolebinding") + return false, nil + } - if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Update(context.TODO(), roleBinding, metav1.UpdateOptions{}); err != nil { - lastError = errors.Wrap(err, "unable to update RBAC rolebinding") - return false, nil + if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Update(context.TODO(), roleBinding, metav1.UpdateOptions{}); err != nil { + lastError = errors.Wrap(err, "unable to update RBAC rolebinding") + return false, nil + } } - } - return true, nil - }) + return true, nil + }) if err == nil { return nil } @@ -262,11 +270,8 @@ func CreateOrUpdateClusterRoleBinding(client clientset.Interface, clusterRoleBin } // PatchNodeOnce executes patchFn on the node object found by the node name. -// This is a condition function meant to be used with wait.Poll. false, nil -// implies it is safe to try again, an error indicates no more tries should be -// made and true indicates success. 
-func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1.Node), lastError *error) func() (bool, error) { - return func() (bool, error) { +func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1.Node), lastError *error) func(context.Context) (bool, error) { + return func(_ context.Context) (bool, error) { // First get the node object n, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) if err != nil { @@ -317,10 +322,9 @@ func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1 // Retries are provided by the wait package. func PatchNode(client clientset.Interface, nodeName string, patchFn func(*v1.Node)) error { var lastError error - // wait.Poll will rerun the condition function every interval function if - // the function returns false. If the condition function returns an error - // then the retries end and the error is returned. - err := wait.Poll(constants.APICallRetryInterval, constants.PatchNodeTimeout, PatchNodeOnce(client, nodeName, patchFn, &lastError)) + err := wait.PollUntilContextTimeout(context.Background(), + constants.APICallRetryInterval, constants.PatchNodeTimeout, + true, PatchNodeOnce(client, nodeName, patchFn, &lastError)) if err == nil { return nil } diff --git a/cmd/kubeadm/app/util/apiclient/idempotency_test.go b/cmd/kubeadm/app/util/apiclient/idempotency_test.go index 6c10cae0c04..9a9e3f18f64 100644 --- a/cmd/kubeadm/app/util/apiclient/idempotency_test.go +++ b/cmd/kubeadm/app/util/apiclient/idempotency_test.go @@ -135,7 +135,7 @@ func TestPatchNode(t *testing.T) { "updatedBy": "test", } }, &lastError) - success, err := conditionFunction() + success, err := conditionFunction(context.Background()) if err != nil { t.Fatalf("did not expect error: %v", err) } diff --git a/cmd/kubeadm/app/util/apiclient/wait.go b/cmd/kubeadm/app/util/apiclient/wait.go index f7939dddc3a..0f9bc819347 100644 --- a/cmd/kubeadm/app/util/apiclient/wait.go +++ 
b/cmd/kubeadm/app/util/apiclient/wait.go @@ -103,43 +103,47 @@ func (w *KubeWaiter) WaitForAPI() error { func (w *KubeWaiter) WaitForPodsWithLabel(kvLabel string) error { lastKnownPodNumber := -1 - return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) { - listOpts := metav1.ListOptions{LabelSelector: kvLabel} - pods, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), listOpts) - if err != nil { - fmt.Fprintf(w.writer, "[apiclient] Error getting Pods with label selector %q [%v]\n", kvLabel, err) - return false, nil - } - - if lastKnownPodNumber != len(pods.Items) { - fmt.Fprintf(w.writer, "[apiclient] Found %d Pods for label selector %s\n", len(pods.Items), kvLabel) - lastKnownPodNumber = len(pods.Items) - } - - if len(pods.Items) == 0 { - return false, nil - } - - for _, pod := range pods.Items { - if pod.Status.Phase != v1.PodRunning { + return wait.PollUntilContextTimeout(context.Background(), + kubeadmconstants.APICallRetryInterval, w.timeout, + true, func(_ context.Context) (bool, error) { + listOpts := metav1.ListOptions{LabelSelector: kvLabel} + pods, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), listOpts) + if err != nil { + fmt.Fprintf(w.writer, "[apiclient] Error getting Pods with label selector %q [%v]\n", kvLabel, err) return false, nil } - } - return true, nil - }) + if lastKnownPodNumber != len(pods.Items) { + fmt.Fprintf(w.writer, "[apiclient] Found %d Pods for label selector %s\n", len(pods.Items), kvLabel) + lastKnownPodNumber = len(pods.Items) + } + + if len(pods.Items) == 0 { + return false, nil + } + + for _, pod := range pods.Items { + if pod.Status.Phase != v1.PodRunning { + return false, nil + } + } + + return true, nil + }) } // WaitForPodToDisappear blocks until it timeouts or gets a "NotFound" response from the API Server when getting the Static Pod in question func (w *KubeWaiter) WaitForPodToDisappear(podName string) error { - return 
wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) { - _, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil && apierrors.IsNotFound(err) { - fmt.Printf("[apiclient] The old Pod %q is now removed (which is desired)\n", podName) - return true, nil - } - return false, nil - }) + return wait.PollUntilContextTimeout(context.Background(), + kubeadmconstants.APICallRetryInterval, w.timeout, + true, func(_ context.Context) (bool, error) { + _, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil && apierrors.IsNotFound(err) { + fmt.Printf("[apiclient] The old Pod %q is now removed (which is desired)\n", podName) + return true, nil + } + return false, nil + }) } // WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'. @@ -204,14 +208,16 @@ func (w *KubeWaiter) WaitForStaticPodControlPlaneHashes(nodeName string) (map[st var err, lastErr error mirrorPodHashes := map[string]string{} for _, component := range kubeadmconstants.ControlPlaneComponents { - err = wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) { - componentHash, err = getStaticPodSingleHash(w.client, nodeName, component) - if err != nil { - lastErr = err - return false, nil - } - return true, nil - }) + err = wait.PollUntilContextTimeout(context.Background(), + kubeadmconstants.APICallRetryInterval, w.timeout, + true, func(_ context.Context) (bool, error) { + componentHash, err = getStaticPodSingleHash(w.client, nodeName, component) + if err != nil { + lastErr = err + return false, nil + } + return true, nil + }) if err != nil { return nil, lastErr } @@ -226,14 +232,16 @@ func (w *KubeWaiter) WaitForStaticPodSingleHash(nodeName string, component strin componentPodHash := "" var err, lastErr error - err = wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() 
(bool, error) { - componentPodHash, err = getStaticPodSingleHash(w.client, nodeName, component) - if err != nil { - lastErr = err - return false, nil - } - return true, nil - }) + err = wait.PollUntilContextTimeout(context.Background(), + kubeadmconstants.APICallRetryInterval, w.timeout, + true, func(_ context.Context) (bool, error) { + componentPodHash, err = getStaticPodSingleHash(w.client, nodeName, component) + if err != nil { + lastErr = err + return false, nil + } + return true, nil + }) if err != nil { err = lastErr @@ -245,21 +253,23 @@ func (w *KubeWaiter) WaitForStaticPodSingleHash(nodeName string, component strin // This implicitly means this function blocks until the kubelet has restarted the Static Pod in question func (w *KubeWaiter) WaitForStaticPodHashChange(nodeName, component, previousHash string) error { var err, lastErr error - err = wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) { - hash, err := getStaticPodSingleHash(w.client, nodeName, component) - if err != nil { - lastErr = err - return false, nil - } - // Set lastErr to nil to be able to later distinguish between getStaticPodSingleHash() and timeout errors - lastErr = nil - // We should continue polling until the UID changes - if hash == previousHash { - return false, nil - } + err = wait.PollUntilContextTimeout(context.Background(), + kubeadmconstants.APICallRetryInterval, w.timeout, + true, func(_ context.Context) (bool, error) { + hash, err := getStaticPodSingleHash(w.client, nodeName, component) + if err != nil { + lastErr = err + return false, nil + } + // Set lastErr to nil to be able to later distinguish between getStaticPodSingleHash() and timeout errors + lastErr = nil + // We should continue polling until the UID changes + if hash == previousHash { + return false, nil + } - return true, nil - }) + return true, nil + }) // If lastError is not nil, this must be a getStaticPodSingleHash() error, else if err is not nil there was a poll 
timeout if lastErr != nil {