kubeadm: replace deprecated wait.Poll() and wait.PollImmediate()

Replace the usage of the deprecated wait.Poll() and
wait.PollImmediate() functions with wait.PollUntilContextTimeout().
Since we don't have piping of context around kubeadm,
use context.Background() everywhere.

Some wait.Poll() functions were converted to "immediate" as there
is no point for them to not be. This is done for consistency.

Replace the only instance of wait.JitterUntil with
wait.PollUntilContextTimeout. JitterUntil is not deprecated
but this is also done for consistency.
This commit is contained in:
Lubomir I. Ivanov 2023-12-26 14:24:48 +02:00
parent d9e48705ff
commit 374e41cf66
6 changed files with 182 additions and 160 deletions

View File

@ -236,11 +236,13 @@ func waitForTLSBootstrappedClient() error {
fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap")
// Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned.
return wait.PollImmediate(kubeadmconstants.TLSBootstrapRetryInterval, kubeadmconstants.TLSBootstrapTimeout, func() (bool, error) {
// Check that we can create a client set out of the kubelet kubeconfig. This ensures not
// only that the kubeconfig file exists, but that other files required by it also exist (like
// client certificate and key)
_, err := kubeconfigutil.ClientSetFromFile(kubeadmconstants.GetKubeletKubeConfigPath())
return (err == nil), nil
})
return wait.PollUntilContextTimeout(context.Background(),
kubeadmconstants.TLSBootstrapRetryInterval, kubeadmconstants.TLSBootstrapTimeout,
true, func(_ context.Context) (bool, error) {
// Check that we can create a client set out of the kubelet kubeconfig. This ensures not
// only that the kubeconfig file exists, but that other files required by it also exist (like
// client certificate and key)
_, err := kubeconfigutil.ClientSetFromFile(kubeadmconstants.GetKubeletKubeConfigPath())
return (err == nil), nil
})
}

View File

@ -91,23 +91,26 @@ func ValidateConfigInfo(config *clientcmdapi.Config, discoveryTimeout time.Durat
var clusterinfoCM *v1.ConfigMap
err = wait.Poll(constants.DiscoveryRetryInterval, discoveryTimeout, func() (bool, error) {
var err error
clusterinfoCM, err = client.CoreV1().ConfigMaps(metav1.NamespacePublic).Get(context.TODO(), bootstrapapi.ConfigMapClusterInfo, metav1.GetOptions{})
if err != nil {
if apierrors.IsForbidden(err) {
// If the request is unauthorized, the cluster admin has not granted access to the cluster info configmap for unauthenticated users
// In that case, trust the cluster admin and do not refresh the cluster-info data
klog.Warningf("[discovery] Could not access the %s ConfigMap for refreshing the cluster-info information, but the TLS cert is valid so proceeding...\n", bootstrapapi.ConfigMapClusterInfo)
return true, nil
var lastError error
err = wait.PollUntilContextTimeout(context.Background(),
constants.DiscoveryRetryInterval, discoveryTimeout,
true, func(_ context.Context) (bool, error) {
clusterinfoCM, lastError = client.CoreV1().ConfigMaps(metav1.NamespacePublic).Get(context.TODO(), bootstrapapi.ConfigMapClusterInfo, metav1.GetOptions{})
if lastError != nil {
if apierrors.IsForbidden(lastError) {
// If the request is unauthorized, the cluster admin has not granted access to the cluster info configmap for unauthenticated users
// In that case, trust the cluster admin and do not refresh the cluster-info data
klog.Warningf("[discovery] Could not access the %s ConfigMap for refreshing the cluster-info information, but the TLS cert is valid so proceeding...\n", bootstrapapi.ConfigMapClusterInfo)
return true, nil
}
klog.V(1).Infof("[discovery] Error reading the %s ConfigMap, will try again: %v\n", bootstrapapi.ConfigMapClusterInfo, err)
return false, nil
}
klog.V(1).Infof("[discovery] Error reading the %s ConfigMap, will try again: %v\n", bootstrapapi.ConfigMapClusterInfo, err)
return false, nil
}
return true, nil
})
if err == wait.ErrWaitTimeout {
return nil, errors.Errorf("Abort reading the %s ConfigMap after timeout of %v", bootstrapapi.ConfigMapClusterInfo, discoveryTimeout)
return true, nil
})
if err != nil {
return nil, errors.Wrapf(lastError, "Abort reading the %s ConfigMap after timeout of %v",
bootstrapapi.ConfigMapClusterInfo, discoveryTimeout)
}
// If we couldn't fetch the cluster-info ConfigMap, just return the cluster-info object the user provided

View File

@ -208,28 +208,31 @@ func getClusterInfo(client clientset.Interface, kubeconfig *clientcmdapi.Config,
}
}
ctx, cancel := context.WithTimeout(context.TODO(), duration)
defer cancel()
wait.JitterUntil(func() {
cm, err = client.CoreV1().ConfigMaps(metav1.NamespacePublic).Get(context.TODO(), bootstrapapi.ConfigMapClusterInfo, metav1.GetOptions{})
if err != nil {
klog.V(1).Infof("[discovery] Failed to request cluster-info, will try again: %v", err)
return
}
// Even if the ConfigMap is available the JWS signature is patched-in a bit later.
// Make sure we retry util then.
if _, ok := cm.Data[bootstrapapi.JWSSignatureKeyPrefix+token.ID]; !ok {
klog.V(1).Infof("[discovery] The cluster-info ConfigMap does not yet contain a JWS signature for token ID %q, will try again", token.ID)
err = errors.Errorf("could not find a JWS signature in the cluster-info ConfigMap for token ID %q", token.ID)
return
}
// Cancel the context on success
cancel()
}, interval, 0.3, true, ctx.Done())
klog.V(1).Infof("[discovery] Waiting for the cluster-info ConfigMap to receive a JWS signature"+
"for token ID %q", token.ID)
var lastError error
err = wait.PollUntilContextTimeout(context.Background(),
interval, duration, true,
func(ctx context.Context) (bool, error) {
cm, err = client.CoreV1().ConfigMaps(metav1.NamespacePublic).
Get(ctx, bootstrapapi.ConfigMapClusterInfo, metav1.GetOptions{})
if err != nil {
lastError = errors.Wrapf(err, "failed to request the cluster-info ConfigMap")
klog.V(1).Infof("[discovery] Retrying due to error: %v", lastError)
return false, nil
}
// Even if the ConfigMap is available the JWS signature is patched-in a bit later.
if _, ok := cm.Data[bootstrapapi.JWSSignatureKeyPrefix+token.ID]; !ok {
lastError = errors.Errorf("could not find a JWS signature in the cluster-info ConfigMap"+
" for token ID %q", token.ID)
klog.V(1).Infof("[discovery] Retrying due to error: %v", lastError)
return false, nil
}
return true, nil
})
if err != nil {
return nil, err
return nil, lastError
}
return cm, nil

View File

@ -61,17 +61,19 @@ func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error
// taking place)
func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutator ConfigMapMutator) error {
var lastError error
err := wait.PollImmediate(constants.APICallRetryInterval, constants.APICallWithWriteTimeout, func() (bool, error) {
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil {
lastError = err
if apierrors.IsAlreadyExists(err) {
lastError = MutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator)
return lastError == nil, nil
err := wait.PollUntilContextTimeout(context.Background(),
constants.APICallRetryInterval, constants.APICallWithWriteTimeout,
true, func(_ context.Context) (bool, error) {
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil {
lastError = err
if apierrors.IsAlreadyExists(err) {
lastError = MutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator)
return lastError == nil, nil
}
return false, nil
}
return false, nil
}
return true, nil
})
return true, nil
})
if err == nil {
return nil
}
@ -84,19 +86,21 @@ func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutat
// taking place).
func MutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator ConfigMapMutator) error {
var lastError error
err := wait.PollImmediate(constants.APICallRetryInterval, constants.APICallWithWriteTimeout, func() (bool, error) {
configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(context.TODO(), meta.Name, metav1.GetOptions{})
if err != nil {
lastError = err
return false, nil
}
if err = mutator(configMap); err != nil {
lastError = errors.Wrap(err, "unable to mutate ConfigMap")
return false, nil
}
_, lastError = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
return lastError == nil, nil
})
err := wait.PollUntilContextTimeout(context.Background(),
constants.APICallRetryInterval, constants.APICallWithWriteTimeout,
true, func(_ context.Context) (bool, error) {
configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(context.TODO(), meta.Name, metav1.GetOptions{})
if err != nil {
lastError = err
return false, nil
}
if err = mutator(configMap); err != nil {
lastError = errors.Wrap(err, "unable to mutate ConfigMap")
return false, nil
}
_, lastError = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
return lastError == nil, nil
})
if err == nil {
return nil
}
@ -190,20 +194,22 @@ func CreateOrUpdateDaemonSet(client clientset.Interface, ds *apps.DaemonSet) err
// CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
var lastError error
err := wait.PollImmediate(constants.APICallRetryInterval, constants.APICallWithWriteTimeout, func() (bool, error) {
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(context.TODO(), role, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
lastError = errors.Wrap(err, "unable to create RBAC role")
return false, nil
}
err := wait.PollUntilContextTimeout(context.Background(),
constants.APICallRetryInterval, constants.APICallWithWriteTimeout,
true, func(_ context.Context) (bool, error) {
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(context.TODO(), role, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
lastError = errors.Wrap(err, "unable to create RBAC role")
return false, nil
}
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Update(context.TODO(), role, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update RBAC role")
return false, nil
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Update(context.TODO(), role, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update RBAC role")
return false, nil
}
}
}
return true, nil
})
return true, nil
})
if err == nil {
return nil
}
@ -213,20 +219,22 @@ func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
// CreateOrUpdateRoleBinding creates a RoleBinding if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateRoleBinding(client clientset.Interface, roleBinding *rbac.RoleBinding) error {
var lastError error
err := wait.PollImmediate(constants.APICallRetryInterval, constants.APICallWithWriteTimeout, func() (bool, error) {
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Create(context.TODO(), roleBinding, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
lastError = errors.Wrap(err, "unable to create RBAC rolebinding")
return false, nil
}
err := wait.PollUntilContextTimeout(context.Background(),
constants.APICallRetryInterval, constants.APICallWithWriteTimeout,
true, func(_ context.Context) (bool, error) {
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Create(context.TODO(), roleBinding, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
lastError = errors.Wrap(err, "unable to create RBAC rolebinding")
return false, nil
}
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Update(context.TODO(), roleBinding, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update RBAC rolebinding")
return false, nil
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Update(context.TODO(), roleBinding, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update RBAC rolebinding")
return false, nil
}
}
}
return true, nil
})
return true, nil
})
if err == nil {
return nil
}
@ -262,11 +270,8 @@ func CreateOrUpdateClusterRoleBinding(client clientset.Interface, clusterRoleBin
}
// PatchNodeOnce executes patchFn on the node object found by the node name.
// This is a condition function meant to be used with wait.Poll. false, nil
// implies it is safe to try again, an error indicates no more tries should be
// made and true indicates success.
func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1.Node), lastError *error) func() (bool, error) {
return func() (bool, error) {
func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1.Node), lastError *error) func(context.Context) (bool, error) {
return func(_ context.Context) (bool, error) {
// First get the node object
n, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
@ -317,10 +322,9 @@ func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1
// Retries are provided by the wait package.
func PatchNode(client clientset.Interface, nodeName string, patchFn func(*v1.Node)) error {
var lastError error
// wait.Poll will rerun the condition function every interval function if
// the function returns false. If the condition function returns an error
// then the retries end and the error is returned.
err := wait.Poll(constants.APICallRetryInterval, constants.PatchNodeTimeout, PatchNodeOnce(client, nodeName, patchFn, &lastError))
err := wait.PollUntilContextTimeout(context.Background(),
constants.APICallRetryInterval, constants.PatchNodeTimeout,
true, PatchNodeOnce(client, nodeName, patchFn, &lastError))
if err == nil {
return nil
}

View File

@ -135,7 +135,7 @@ func TestPatchNode(t *testing.T) {
"updatedBy": "test",
}
}, &lastError)
success, err := conditionFunction()
success, err := conditionFunction(context.Background())
if err != nil {
t.Fatalf("did not expect error: %v", err)
}

View File

@ -103,43 +103,47 @@ func (w *KubeWaiter) WaitForAPI() error {
func (w *KubeWaiter) WaitForPodsWithLabel(kvLabel string) error {
lastKnownPodNumber := -1
return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
listOpts := metav1.ListOptions{LabelSelector: kvLabel}
pods, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), listOpts)
if err != nil {
fmt.Fprintf(w.writer, "[apiclient] Error getting Pods with label selector %q [%v]\n", kvLabel, err)
return false, nil
}
if lastKnownPodNumber != len(pods.Items) {
fmt.Fprintf(w.writer, "[apiclient] Found %d Pods for label selector %s\n", len(pods.Items), kvLabel)
lastKnownPodNumber = len(pods.Items)
}
if len(pods.Items) == 0 {
return false, nil
}
for _, pod := range pods.Items {
if pod.Status.Phase != v1.PodRunning {
return wait.PollUntilContextTimeout(context.Background(),
kubeadmconstants.APICallRetryInterval, w.timeout,
true, func(_ context.Context) (bool, error) {
listOpts := metav1.ListOptions{LabelSelector: kvLabel}
pods, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), listOpts)
if err != nil {
fmt.Fprintf(w.writer, "[apiclient] Error getting Pods with label selector %q [%v]\n", kvLabel, err)
return false, nil
}
}
return true, nil
})
if lastKnownPodNumber != len(pods.Items) {
fmt.Fprintf(w.writer, "[apiclient] Found %d Pods for label selector %s\n", len(pods.Items), kvLabel)
lastKnownPodNumber = len(pods.Items)
}
if len(pods.Items) == 0 {
return false, nil
}
for _, pod := range pods.Items {
if pod.Status.Phase != v1.PodRunning {
return false, nil
}
}
return true, nil
})
}
// WaitForPodToDisappear blocks until it timeouts or gets a "NotFound" response from the API Server when getting the Static Pod in question
func (w *KubeWaiter) WaitForPodToDisappear(podName string) error {
return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
_, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
fmt.Printf("[apiclient] The old Pod %q is now removed (which is desired)\n", podName)
return true, nil
}
return false, nil
})
return wait.PollUntilContextTimeout(context.Background(),
kubeadmconstants.APICallRetryInterval, w.timeout,
true, func(_ context.Context) (bool, error) {
_, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
fmt.Printf("[apiclient] The old Pod %q is now removed (which is desired)\n", podName)
return true, nil
}
return false, nil
})
}
// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'.
@ -204,14 +208,16 @@ func (w *KubeWaiter) WaitForStaticPodControlPlaneHashes(nodeName string) (map[st
var err, lastErr error
mirrorPodHashes := map[string]string{}
for _, component := range kubeadmconstants.ControlPlaneComponents {
err = wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
componentHash, err = getStaticPodSingleHash(w.client, nodeName, component)
if err != nil {
lastErr = err
return false, nil
}
return true, nil
})
err = wait.PollUntilContextTimeout(context.Background(),
kubeadmconstants.APICallRetryInterval, w.timeout,
true, func(_ context.Context) (bool, error) {
componentHash, err = getStaticPodSingleHash(w.client, nodeName, component)
if err != nil {
lastErr = err
return false, nil
}
return true, nil
})
if err != nil {
return nil, lastErr
}
@ -226,14 +232,16 @@ func (w *KubeWaiter) WaitForStaticPodSingleHash(nodeName string, component strin
componentPodHash := ""
var err, lastErr error
err = wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
componentPodHash, err = getStaticPodSingleHash(w.client, nodeName, component)
if err != nil {
lastErr = err
return false, nil
}
return true, nil
})
err = wait.PollUntilContextTimeout(context.Background(),
kubeadmconstants.APICallRetryInterval, w.timeout,
true, func(_ context.Context) (bool, error) {
componentPodHash, err = getStaticPodSingleHash(w.client, nodeName, component)
if err != nil {
lastErr = err
return false, nil
}
return true, nil
})
if err != nil {
err = lastErr
@ -245,21 +253,23 @@ func (w *KubeWaiter) WaitForStaticPodSingleHash(nodeName string, component strin
// This implicitly means this function blocks until the kubelet has restarted the Static Pod in question
func (w *KubeWaiter) WaitForStaticPodHashChange(nodeName, component, previousHash string) error {
var err, lastErr error
err = wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
hash, err := getStaticPodSingleHash(w.client, nodeName, component)
if err != nil {
lastErr = err
return false, nil
}
// Set lastErr to nil to be able to later distinguish between getStaticPodSingleHash() and timeout errors
lastErr = nil
// We should continue polling until the UID changes
if hash == previousHash {
return false, nil
}
err = wait.PollUntilContextTimeout(context.Background(),
kubeadmconstants.APICallRetryInterval, w.timeout,
true, func(_ context.Context) (bool, error) {
hash, err := getStaticPodSingleHash(w.client, nodeName, component)
if err != nil {
lastErr = err
return false, nil
}
// Set lastErr to nil to be able to later distinguish between getStaticPodSingleHash() and timeout errors
lastErr = nil
// We should continue polling until the UID changes
if hash == previousHash {
return false, nil
}
return true, nil
})
return true, nil
})
// If lastError is not nil, this must be a getStaticPodSingleHash() error, else if err is not nil there was a poll timeout
if lastErr != nil {