kubeadm: apply retries to all API calls in idempotency.go

The idempotency.go (perhaps not so accurately named) contains
API calls that kubeadm does against an API server using client-go.

Some users seem to have unstable setups where for unknown reasons
the API server can be unavailable or refuse to respond as expected.

Use PollUntilContextTimeout in all exported functions to ensure
such API calls are all retry-able.

NOTE: The context passed to PollUntilContextTimeout is not propagated
in the polled function. Instead the poll function creates it's own
context 'ctx := context.Background()', this is to avoid
breaking expectations on the side of the callers, that expect
a certain type of error and not "context timeout" errors.

Additional changes:
- Make all context.TODO() -> context.Background()
- Update all unit tests and make sure during testing the retry
interval and timeout are short. Test coverage of idempotency.go
is at ~97%.
- Remove the TestMutateConfigMapWithConflict test. It does not
contribute much, because conflict handling is done at the API,
server side, not on the side of kubeadm. This simulating this is not
needed.
This commit is contained in:
Lubomir I. Ivanov 2024-02-13 17:55:30 +02:00
parent 5bf23121cc
commit c29450eb00
4 changed files with 1117 additions and 406 deletions

View File

@ -22,6 +22,7 @@ import (
"os" "os"
"strings" "strings"
"testing" "testing"
"time"
"github.com/lithammer/dedent" "github.com/lithammer/dedent"
@ -114,6 +115,14 @@ func TestEnsureProxyAddon(t *testing.T) {
}, },
} }
// Override the default timeouts to be shorter
defaultTimeouts := kubeadmapi.GetActiveTimeouts()
defaultAPICallTimeout := defaultTimeouts.KubernetesAPICall
defaultTimeouts.KubernetesAPICall = &metav1.Duration{Duration: time.Microsecond * 500}
defer func() {
defaultTimeouts.KubernetesAPICall = defaultAPICallTimeout
}()
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// Create a fake client and set up default test configuration // Create a fake client and set up default test configuration

View File

@ -21,6 +21,7 @@ import (
"os" "os"
"testing" "testing"
"text/template" "text/template"
"time"
rbac "k8s.io/api/rbac/v1" rbac "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -31,6 +32,8 @@ import (
clientsetfake "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
bootstrapapi "k8s.io/cluster-bootstrap/token/api" bootstrapapi "k8s.io/cluster-bootstrap/token/api"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
) )
var testConfigTempl = template.Must(template.New("test").Parse(`apiVersion: v1 var testConfigTempl = template.Must(template.New("test").Parse(`apiVersion: v1
@ -104,6 +107,14 @@ func TestCreateBootstrapConfigMapIfNotExists(t *testing.T) {
t.Fatalf("could not close tempfile: %v", err) t.Fatalf("could not close tempfile: %v", err)
} }
// Override the default timeouts to be shorter
defaultTimeouts := kubeadmapi.GetActiveTimeouts()
defaultAPICallTimeout := defaultTimeouts.KubernetesAPICall
defaultTimeouts.KubernetesAPICall = &metav1.Duration{Duration: time.Microsecond * 500}
defer func() {
defaultTimeouts.KubernetesAPICall = defaultAPICallTimeout
}()
for _, tc := range tests { for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
client := clientsetfake.NewSimpleClientset() client := clientsetfake.NewSimpleClientset()

View File

@ -40,20 +40,34 @@ import (
// ConfigMapMutator is a function that mutates the given ConfigMap and optionally returns an error // ConfigMapMutator is a function that mutates the given ConfigMap and optionally returns an error
type ConfigMapMutator func(*v1.ConfigMap) error type ConfigMapMutator func(*v1.ConfigMap) error
// apiCallRetryInterval holds a local copy of apiCallRetryInterval for testing purposes
var apiCallRetryInterval = constants.KubernetesAPICallRetryInterval
// TODO: We should invent a dynamic mechanism for this using the dynamic client instead of hard-coding these functions per-type // TODO: We should invent a dynamic mechanism for this using the dynamic client instead of hard-coding these functions per-type
// CreateOrUpdateConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. // CreateOrUpdateConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error { func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error {
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil { var lastError error
if !apierrors.IsAlreadyExists(err) { err := wait.PollUntilContextTimeout(context.Background(),
return errors.Wrap(err, "unable to create ConfigMap") apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
} true, func(_ context.Context) (bool, error) {
ctx := context.Background()
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Update(context.TODO(), cm, metav1.UpdateOptions{}); err != nil { if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "unable to update ConfigMap") if !apierrors.IsAlreadyExists(err) {
} lastError = errors.Wrap(err, "unable to create ConfigMap")
return false, nil
}
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update ConfigMap")
return false, nil
}
}
return true, nil
})
if err == nil {
return nil
} }
return nil return lastError
} }
// CreateOrMutateConfigMap tries to create the ConfigMap provided as cm. If the resource exists already, the latest version will be fetched from // CreateOrMutateConfigMap tries to create the ConfigMap provided as cm. If the resource exists already, the latest version will be fetched from
@ -63,12 +77,12 @@ func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error
func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutator ConfigMapMutator) error { func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutator ConfigMapMutator) error {
var lastError error var lastError error
err := wait.PollUntilContextTimeout(context.Background(), err := wait.PollUntilContextTimeout(context.Background(),
constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration, apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
true, func(_ context.Context) (bool, error) { true, func(_ context.Context) (bool, error) {
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil { if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.Background(), cm, metav1.CreateOptions{}); err != nil {
lastError = err lastError = err
if apierrors.IsAlreadyExists(err) { if apierrors.IsAlreadyExists(err) {
lastError = MutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator) lastError = mutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator)
return lastError == nil, nil return lastError == nil, nil
} }
return false, nil return false, nil
@ -81,26 +95,41 @@ func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutat
return lastError return lastError
} }
// MutateConfigMap takes a ConfigMap Object Meta (namespace and name), retrieves the resource from the server and tries to mutate it // mutateConfigMap takes a ConfigMap Object Meta (namespace and name), retrieves the resource from the server and tries to mutate it
// by calling to the mutator callback, then an Update of the mutated ConfigMap will be performed. This function is resilient // by calling to the mutator callback, then an Update of the mutated ConfigMap will be performed. This function is resilient
// to conflicts, and a retry will be issued if the ConfigMap was modified on the server between the refresh and the update (while the mutation was // to conflicts, and a retry will be issued if the ConfigMap was modified on the server between the refresh and the update (while the mutation was
// taking place). // taking place).
func MutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator ConfigMapMutator) error { func mutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator ConfigMapMutator) error {
ctx := context.Background()
configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
if err != nil {
return errors.Wrap(err, "unable to get ConfigMap")
}
if err = mutator(configMap); err != nil {
return errors.Wrap(err, "unable to mutate ConfigMap")
}
_, err = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(ctx, configMap, metav1.UpdateOptions{})
return err
}
// CreateOrRetainConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead.
func CreateOrRetainConfigMap(client clientset.Interface, cm *v1.ConfigMap, configMapName string) error {
var lastError error var lastError error
err := wait.PollUntilContextTimeout(context.Background(), err := wait.PollUntilContextTimeout(context.Background(),
constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration, apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
true, func(_ context.Context) (bool, error) { true, func(_ context.Context) (bool, error) {
configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(context.TODO(), meta.Name, metav1.GetOptions{}) ctx := context.Background()
if err != nil { if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Get(ctx, configMapName, metav1.GetOptions{}); err != nil {
lastError = err if !apierrors.IsNotFound(err) {
return false, nil lastError = errors.Wrap(err, "unable to get ConfigMap")
return false, nil
}
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to create ConfigMap")
return false, nil
}
} }
if err = mutator(configMap); err != nil { return true, nil
lastError = errors.Wrap(err, "unable to mutate ConfigMap")
return false, nil
}
_, lastError = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
return lastError == nil, nil
}) })
if err == nil { if err == nil {
return nil return nil
@ -108,104 +137,147 @@ func MutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator
return lastError return lastError
} }
// CreateOrRetainConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead.
func CreateOrRetainConfigMap(client clientset.Interface, cm *v1.ConfigMap, configMapName string) error {
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Get(context.TODO(), configMapName, metav1.GetOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
return nil
}
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
return errors.Wrap(err, "unable to create ConfigMap")
}
}
}
return nil
}
// CreateOrUpdateSecret creates a Secret if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. // CreateOrUpdateSecret creates a Secret if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateSecret(client clientset.Interface, secret *v1.Secret) error { func CreateOrUpdateSecret(client clientset.Interface, secret *v1.Secret) error {
if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Create(context.TODO(), secret, metav1.CreateOptions{}); err != nil { var lastError error
if !apierrors.IsAlreadyExists(err) { err := wait.PollUntilContextTimeout(context.Background(),
return errors.Wrap(err, "unable to create secret") apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
} true, func(_ context.Context) (bool, error) {
ctx := context.Background()
if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil { if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Create(ctx, secret, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "unable to update secret") if !apierrors.IsAlreadyExists(err) {
} lastError = errors.Wrap(err, "unable to create Secret")
return false, nil
}
if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Update(ctx, secret, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update Secret")
return false, nil
}
}
return true, nil
})
if err == nil {
return nil
} }
return nil return lastError
} }
// CreateOrUpdateServiceAccount creates a ServiceAccount if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. // CreateOrUpdateServiceAccount creates a ServiceAccount if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateServiceAccount(client clientset.Interface, sa *v1.ServiceAccount) error { func CreateOrUpdateServiceAccount(client clientset.Interface, sa *v1.ServiceAccount) error {
if _, err := client.CoreV1().ServiceAccounts(sa.ObjectMeta.Namespace).Create(context.TODO(), sa, metav1.CreateOptions{}); err != nil { var lastError error
// Note: We don't run .Update here afterwards as that's probably not required err := wait.PollUntilContextTimeout(context.Background(),
// Only thing that could be updated is annotations/labels in .metadata, but we don't use that currently apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
if !apierrors.IsAlreadyExists(err) { true, func(_ context.Context) (bool, error) {
return errors.Wrap(err, "unable to create serviceaccount") ctx := context.Background()
} if _, err := client.CoreV1().ServiceAccounts(sa.ObjectMeta.Namespace).Create(ctx, sa, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
lastError = errors.Wrap(err, "unable to create ServicAccount")
return false, nil
}
if _, err := client.CoreV1().ServiceAccounts(sa.ObjectMeta.Namespace).Update(ctx, sa, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update ServicAccount")
return false, nil
}
}
return true, nil
})
if err == nil {
return nil
} }
return nil return lastError
} }
// CreateOrUpdateDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. // CreateOrUpdateDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateDeployment(client clientset.Interface, deploy *apps.Deployment) error { func CreateOrUpdateDeployment(client clientset.Interface, deploy *apps.Deployment) error {
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(context.TODO(), deploy, metav1.CreateOptions{}); err != nil { var lastError error
if !apierrors.IsAlreadyExists(err) { err := wait.PollUntilContextTimeout(context.Background(),
return errors.Wrap(err, "unable to create deployment") apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
} true, func(_ context.Context) (bool, error) {
ctx := context.Background()
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Update(context.TODO(), deploy, metav1.UpdateOptions{}); err != nil { if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(ctx, deploy, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "unable to update deployment") if !apierrors.IsAlreadyExists(err) {
} lastError = errors.Wrap(err, "unable to create Deployment")
return false, nil
}
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Update(ctx, deploy, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update Deployment")
return false, nil
}
}
return true, nil
})
if err == nil {
return nil
} }
return nil return lastError
} }
// CreateOrRetainDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead. // CreateOrRetainDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead.
func CreateOrRetainDeployment(client clientset.Interface, deploy *apps.Deployment, deployName string) error { func CreateOrRetainDeployment(client clientset.Interface, deploy *apps.Deployment, deployName string) error {
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Get(context.TODO(), deployName, metav1.GetOptions{}); err != nil { var lastError error
if !apierrors.IsNotFound(err) { err := wait.PollUntilContextTimeout(context.Background(),
return nil apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
} true, func(_ context.Context) (bool, error) {
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(context.TODO(), deploy, metav1.CreateOptions{}); err != nil { ctx := context.Background()
if !apierrors.IsAlreadyExists(err) { if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Get(ctx, deployName, metav1.GetOptions{}); err != nil {
return errors.Wrap(err, "unable to create deployment") if !apierrors.IsNotFound(err) {
lastError = errors.Wrap(err, "unable to get Deployment")
return false, nil
}
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(ctx, deploy, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
lastError = errors.Wrap(err, "unable to create Deployment")
return false, nil
}
}
} }
} return true, nil
})
if err == nil {
return nil
} }
return nil return lastError
} }
// CreateOrUpdateDaemonSet creates a DaemonSet if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. // CreateOrUpdateDaemonSet creates a DaemonSet if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateDaemonSet(client clientset.Interface, ds *apps.DaemonSet) error { func CreateOrUpdateDaemonSet(client clientset.Interface, ds *apps.DaemonSet) error {
if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Create(context.TODO(), ds, metav1.CreateOptions{}); err != nil { var lastError error
if !apierrors.IsAlreadyExists(err) { err := wait.PollUntilContextTimeout(context.Background(),
return errors.Wrap(err, "unable to create daemonset") apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
} true, func(_ context.Context) (bool, error) {
ctx := context.Background()
if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Update(context.TODO(), ds, metav1.UpdateOptions{}); err != nil { if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Create(ctx, ds, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "unable to update daemonset") if !apierrors.IsAlreadyExists(err) {
} lastError = errors.Wrap(err, "unable to create DaemonSet")
return false, nil
}
if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Update(ctx, ds, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update DaemonSet")
return false, nil
}
}
return true, nil
})
if err == nil {
return nil
} }
return nil return lastError
} }
// CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. // CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error { func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
var lastError error var lastError error
err := wait.PollUntilContextTimeout(context.Background(), err := wait.PollUntilContextTimeout(context.Background(),
constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration, apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
true, func(_ context.Context) (bool, error) { true, func(_ context.Context) (bool, error) {
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(context.TODO(), role, metav1.CreateOptions{}); err != nil { ctx := context.Background()
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(ctx, role, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
lastError = errors.Wrap(err, "unable to create RBAC role") lastError = errors.Wrap(err, "unable to create Role")
return false, nil return false, nil
} }
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Update(ctx, role, metav1.UpdateOptions{}); err != nil {
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Update(context.TODO(), role, metav1.UpdateOptions{}); err != nil { lastError = errors.Wrap(err, "unable to update Role")
lastError = errors.Wrap(err, "unable to update RBAC role")
return false, nil return false, nil
} }
} }
@ -221,16 +293,16 @@ func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
func CreateOrUpdateRoleBinding(client clientset.Interface, roleBinding *rbac.RoleBinding) error { func CreateOrUpdateRoleBinding(client clientset.Interface, roleBinding *rbac.RoleBinding) error {
var lastError error var lastError error
err := wait.PollUntilContextTimeout(context.Background(), err := wait.PollUntilContextTimeout(context.Background(),
constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration, apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
true, func(_ context.Context) (bool, error) { true, func(_ context.Context) (bool, error) {
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Create(context.TODO(), roleBinding, metav1.CreateOptions{}); err != nil { ctx := context.Background()
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Create(ctx, roleBinding, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
lastError = errors.Wrap(err, "unable to create RBAC rolebinding") lastError = errors.Wrap(err, "unable to create RoleBinding")
return false, nil return false, nil
} }
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Update(ctx, roleBinding, metav1.UpdateOptions{}); err != nil {
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Update(context.TODO(), roleBinding, metav1.UpdateOptions{}); err != nil { lastError = errors.Wrap(err, "unable to update RoleBinding")
lastError = errors.Wrap(err, "unable to update RBAC rolebinding")
return false, nil return false, nil
} }
} }
@ -244,37 +316,60 @@ func CreateOrUpdateRoleBinding(client clientset.Interface, roleBinding *rbac.Rol
// CreateOrUpdateClusterRole creates a ClusterRole if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. // CreateOrUpdateClusterRole creates a ClusterRole if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateClusterRole(client clientset.Interface, clusterRole *rbac.ClusterRole) error { func CreateOrUpdateClusterRole(client clientset.Interface, clusterRole *rbac.ClusterRole) error {
if _, err := client.RbacV1().ClusterRoles().Create(context.TODO(), clusterRole, metav1.CreateOptions{}); err != nil { var lastError error
if !apierrors.IsAlreadyExists(err) { err := wait.PollUntilContextTimeout(context.Background(),
return errors.Wrap(err, "unable to create RBAC clusterrole") apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
} true, func(_ context.Context) (bool, error) {
ctx := context.Background()
if _, err := client.RbacV1().ClusterRoles().Update(context.TODO(), clusterRole, metav1.UpdateOptions{}); err != nil { if _, err := client.RbacV1().ClusterRoles().Create(ctx, clusterRole, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "unable to update RBAC clusterrole") if !apierrors.IsAlreadyExists(err) {
} lastError = errors.Wrap(err, "unable to create ClusterRole")
return false, nil
}
if _, err := client.RbacV1().ClusterRoles().Update(ctx, clusterRole, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update ClusterRole")
return false, nil
}
}
return true, nil
})
if err == nil {
return nil
} }
return nil return lastError
} }
// CreateOrUpdateClusterRoleBinding creates a ClusterRoleBinding if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. // CreateOrUpdateClusterRoleBinding creates a ClusterRoleBinding if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateClusterRoleBinding(client clientset.Interface, clusterRoleBinding *rbac.ClusterRoleBinding) error { func CreateOrUpdateClusterRoleBinding(client clientset.Interface, clusterRoleBinding *rbac.ClusterRoleBinding) error {
if _, err := client.RbacV1().ClusterRoleBindings().Create(context.TODO(), clusterRoleBinding, metav1.CreateOptions{}); err != nil { var lastError error
if !apierrors.IsAlreadyExists(err) { err := wait.PollUntilContextTimeout(context.Background(),
return errors.Wrap(err, "unable to create RBAC clusterrolebinding") apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
} true, func(_ context.Context) (bool, error) {
ctx := context.Background()
if _, err := client.RbacV1().ClusterRoleBindings().Update(context.TODO(), clusterRoleBinding, metav1.UpdateOptions{}); err != nil { if _, err := client.RbacV1().ClusterRoleBindings().Create(ctx, clusterRoleBinding, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "unable to update RBAC clusterrolebinding") if !apierrors.IsAlreadyExists(err) {
} lastError = errors.Wrap(err, "unable to create ClusterRoleBinding")
return false, nil
}
if _, err := client.RbacV1().ClusterRoleBindings().Update(ctx, clusterRoleBinding, metav1.UpdateOptions{}); err != nil {
lastError = errors.Wrap(err, "unable to update ClusterRoleBinding")
return false, nil
}
}
return true, nil
})
if err == nil {
return nil
} }
return nil return lastError
} }
// PatchNodeOnce executes patchFn on the node object found by the node name. // PatchNodeOnce executes patchFn on the node object found by the node name.
func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1.Node), lastError *error) func(context.Context) (bool, error) { func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1.Node), lastError *error) func(context.Context) (bool, error) {
return func(_ context.Context) (bool, error) { return func(_ context.Context) (bool, error) {
// First get the node object // First get the node object
n, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) ctx := context.Background()
n, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil { if err != nil {
*lastError = err *lastError = err
return false, nil // retry on any error return false, nil // retry on any error
@ -307,8 +402,8 @@ func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1
return false, *lastError return false, *lastError
} }
if _, err := client.CoreV1().Nodes().Patch(context.TODO(), n.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil { if _, err := client.CoreV1().Nodes().Patch(ctx, n.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
*lastError = errors.Wrapf(err, "error patching node %q through apiserver", n.Name) *lastError = errors.Wrapf(err, "error patching Node %q", n.Name)
if apierrors.IsTimeout(err) || apierrors.IsConflict(err) || apierrors.IsServerTimeout(err) || apierrors.IsServiceUnavailable(err) { if apierrors.IsTimeout(err) || apierrors.IsConflict(err) || apierrors.IsServerTimeout(err) || apierrors.IsServiceUnavailable(err) {
return false, nil return false, nil
} }
@ -324,7 +419,7 @@ func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1
func PatchNode(client clientset.Interface, nodeName string, patchFn func(*v1.Node)) error { func PatchNode(client clientset.Interface, nodeName string, patchFn func(*v1.Node)) error {
var lastError error var lastError error
err := wait.PollUntilContextTimeout(context.Background(), err := wait.PollUntilContextTimeout(context.Background(),
constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration, apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
true, PatchNodeOnce(client, nodeName, patchFn, &lastError)) true, PatchNodeOnce(client, nodeName, patchFn, &lastError))
if err == nil { if err == nil {
return nil return nil

File diff suppressed because it is too large Load Diff