mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 18:00:08 +00:00
Merge pull request #123271 from neolit123/1.30-retry-all-api-calls
kubeadm: apply retries to all API calls in idempotency.go
This commit is contained in:
commit
7225dc6c3a
@ -22,6 +22,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/lithammer/dedent"
|
"github.com/lithammer/dedent"
|
||||||
|
|
||||||
@ -114,6 +115,14 @@ func TestEnsureProxyAddon(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Override the default timeouts to be shorter
|
||||||
|
defaultTimeouts := kubeadmapi.GetActiveTimeouts()
|
||||||
|
defaultAPICallTimeout := defaultTimeouts.KubernetesAPICall
|
||||||
|
defaultTimeouts.KubernetesAPICall = &metav1.Duration{Duration: time.Microsecond * 500}
|
||||||
|
defer func() {
|
||||||
|
defaultTimeouts.KubernetesAPICall = defaultAPICallTimeout
|
||||||
|
}()
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Create a fake client and set up default test configuration
|
// Create a fake client and set up default test configuration
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"text/template"
|
"text/template"
|
||||||
|
"time"
|
||||||
|
|
||||||
rbac "k8s.io/api/rbac/v1"
|
rbac "k8s.io/api/rbac/v1"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
@ -31,6 +32,8 @@ import (
|
|||||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
bootstrapapi "k8s.io/cluster-bootstrap/token/api"
|
bootstrapapi "k8s.io/cluster-bootstrap/token/api"
|
||||||
|
|
||||||
|
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
|
||||||
)
|
)
|
||||||
|
|
||||||
var testConfigTempl = template.Must(template.New("test").Parse(`apiVersion: v1
|
var testConfigTempl = template.Must(template.New("test").Parse(`apiVersion: v1
|
||||||
@ -104,6 +107,14 @@ func TestCreateBootstrapConfigMapIfNotExists(t *testing.T) {
|
|||||||
t.Fatalf("could not close tempfile: %v", err)
|
t.Fatalf("could not close tempfile: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Override the default timeouts to be shorter
|
||||||
|
defaultTimeouts := kubeadmapi.GetActiveTimeouts()
|
||||||
|
defaultAPICallTimeout := defaultTimeouts.KubernetesAPICall
|
||||||
|
defaultTimeouts.KubernetesAPICall = &metav1.Duration{Duration: time.Microsecond * 500}
|
||||||
|
defer func() {
|
||||||
|
defaultTimeouts.KubernetesAPICall = defaultAPICallTimeout
|
||||||
|
}()
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
client := clientsetfake.NewSimpleClientset()
|
client := clientsetfake.NewSimpleClientset()
|
||||||
|
@ -40,20 +40,34 @@ import (
|
|||||||
// ConfigMapMutator is a function that mutates the given ConfigMap and optionally returns an error
|
// ConfigMapMutator is a function that mutates the given ConfigMap and optionally returns an error
|
||||||
type ConfigMapMutator func(*v1.ConfigMap) error
|
type ConfigMapMutator func(*v1.ConfigMap) error
|
||||||
|
|
||||||
|
// apiCallRetryInterval holds a local copy of apiCallRetryInterval for testing purposes
|
||||||
|
var apiCallRetryInterval = constants.KubernetesAPICallRetryInterval
|
||||||
|
|
||||||
// TODO: We should invent a dynamic mechanism for this using the dynamic client instead of hard-coding these functions per-type
|
// TODO: We should invent a dynamic mechanism for this using the dynamic client instead of hard-coding these functions per-type
|
||||||
|
|
||||||
// CreateOrUpdateConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
// CreateOrUpdateConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
||||||
func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error {
|
func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error {
|
||||||
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil {
|
var lastError error
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
return errors.Wrap(err, "unable to create ConfigMap")
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
}
|
true, func(_ context.Context) (bool, error) {
|
||||||
|
ctx := context.Background()
|
||||||
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Update(context.TODO(), cm, metav1.UpdateOptions{}); err != nil {
|
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil {
|
||||||
return errors.Wrap(err, "unable to update ConfigMap")
|
if !apierrors.IsAlreadyExists(err) {
|
||||||
}
|
lastError = errors.Wrap(err, "unable to create ConfigMap")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
|
||||||
|
lastError = errors.Wrap(err, "unable to update ConfigMap")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return nil
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateOrMutateConfigMap tries to create the ConfigMap provided as cm. If the resource exists already, the latest version will be fetched from
|
// CreateOrMutateConfigMap tries to create the ConfigMap provided as cm. If the resource exists already, the latest version will be fetched from
|
||||||
@ -63,12 +77,12 @@ func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error
|
|||||||
func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutator ConfigMapMutator) error {
|
func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutator ConfigMapMutator) error {
|
||||||
var lastError error
|
var lastError error
|
||||||
err := wait.PollUntilContextTimeout(context.Background(),
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
true, func(_ context.Context) (bool, error) {
|
true, func(_ context.Context) (bool, error) {
|
||||||
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil {
|
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.Background(), cm, metav1.CreateOptions{}); err != nil {
|
||||||
lastError = err
|
lastError = err
|
||||||
if apierrors.IsAlreadyExists(err) {
|
if apierrors.IsAlreadyExists(err) {
|
||||||
lastError = MutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator)
|
lastError = mutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator)
|
||||||
return lastError == nil, nil
|
return lastError == nil, nil
|
||||||
}
|
}
|
||||||
return false, nil
|
return false, nil
|
||||||
@ -81,26 +95,41 @@ func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutat
|
|||||||
return lastError
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
// MutateConfigMap takes a ConfigMap Object Meta (namespace and name), retrieves the resource from the server and tries to mutate it
|
// mutateConfigMap takes a ConfigMap Object Meta (namespace and name), retrieves the resource from the server and tries to mutate it
|
||||||
// by calling to the mutator callback, then an Update of the mutated ConfigMap will be performed. This function is resilient
|
// by calling to the mutator callback, then an Update of the mutated ConfigMap will be performed. This function is resilient
|
||||||
// to conflicts, and a retry will be issued if the ConfigMap was modified on the server between the refresh and the update (while the mutation was
|
// to conflicts, and a retry will be issued if the ConfigMap was modified on the server between the refresh and the update (while the mutation was
|
||||||
// taking place).
|
// taking place).
|
||||||
func MutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator ConfigMapMutator) error {
|
func mutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator ConfigMapMutator) error {
|
||||||
|
ctx := context.Background()
|
||||||
|
configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "unable to get ConfigMap")
|
||||||
|
}
|
||||||
|
if err = mutator(configMap); err != nil {
|
||||||
|
return errors.Wrap(err, "unable to mutate ConfigMap")
|
||||||
|
}
|
||||||
|
_, err = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(ctx, configMap, metav1.UpdateOptions{})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateOrRetainConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead.
|
||||||
|
func CreateOrRetainConfigMap(client clientset.Interface, cm *v1.ConfigMap, configMapName string) error {
|
||||||
var lastError error
|
var lastError error
|
||||||
err := wait.PollUntilContextTimeout(context.Background(),
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
true, func(_ context.Context) (bool, error) {
|
true, func(_ context.Context) (bool, error) {
|
||||||
configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(context.TODO(), meta.Name, metav1.GetOptions{})
|
ctx := context.Background()
|
||||||
if err != nil {
|
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Get(ctx, configMapName, metav1.GetOptions{}); err != nil {
|
||||||
lastError = err
|
if !apierrors.IsNotFound(err) {
|
||||||
return false, nil
|
lastError = errors.Wrap(err, "unable to get ConfigMap")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil {
|
||||||
|
lastError = errors.Wrap(err, "unable to create ConfigMap")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err = mutator(configMap); err != nil {
|
return true, nil
|
||||||
lastError = errors.Wrap(err, "unable to mutate ConfigMap")
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
_, lastError = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
|
|
||||||
return lastError == nil, nil
|
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
@ -108,104 +137,147 @@ func MutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator
|
|||||||
return lastError
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateOrRetainConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead.
|
|
||||||
func CreateOrRetainConfigMap(client clientset.Interface, cm *v1.ConfigMap, configMapName string) error {
|
|
||||||
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Get(context.TODO(), configMapName, metav1.GetOptions{}); err != nil {
|
|
||||||
if !apierrors.IsNotFound(err) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil {
|
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
|
||||||
return errors.Wrap(err, "unable to create ConfigMap")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateOrUpdateSecret creates a Secret if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
// CreateOrUpdateSecret creates a Secret if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
||||||
func CreateOrUpdateSecret(client clientset.Interface, secret *v1.Secret) error {
|
func CreateOrUpdateSecret(client clientset.Interface, secret *v1.Secret) error {
|
||||||
if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Create(context.TODO(), secret, metav1.CreateOptions{}); err != nil {
|
var lastError error
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
return errors.Wrap(err, "unable to create secret")
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
}
|
true, func(_ context.Context) (bool, error) {
|
||||||
|
ctx := context.Background()
|
||||||
if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil {
|
if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Create(ctx, secret, metav1.CreateOptions{}); err != nil {
|
||||||
return errors.Wrap(err, "unable to update secret")
|
if !apierrors.IsAlreadyExists(err) {
|
||||||
}
|
lastError = errors.Wrap(err, "unable to create Secret")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Update(ctx, secret, metav1.UpdateOptions{}); err != nil {
|
||||||
|
lastError = errors.Wrap(err, "unable to update Secret")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return nil
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateOrUpdateServiceAccount creates a ServiceAccount if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
// CreateOrUpdateServiceAccount creates a ServiceAccount if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
||||||
func CreateOrUpdateServiceAccount(client clientset.Interface, sa *v1.ServiceAccount) error {
|
func CreateOrUpdateServiceAccount(client clientset.Interface, sa *v1.ServiceAccount) error {
|
||||||
if _, err := client.CoreV1().ServiceAccounts(sa.ObjectMeta.Namespace).Create(context.TODO(), sa, metav1.CreateOptions{}); err != nil {
|
var lastError error
|
||||||
// Note: We don't run .Update here afterwards as that's probably not required
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
// Only thing that could be updated is annotations/labels in .metadata, but we don't use that currently
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
true, func(_ context.Context) (bool, error) {
|
||||||
return errors.Wrap(err, "unable to create serviceaccount")
|
ctx := context.Background()
|
||||||
}
|
if _, err := client.CoreV1().ServiceAccounts(sa.ObjectMeta.Namespace).Create(ctx, sa, metav1.CreateOptions{}); err != nil {
|
||||||
|
if !apierrors.IsAlreadyExists(err) {
|
||||||
|
lastError = errors.Wrap(err, "unable to create ServicAccount")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if _, err := client.CoreV1().ServiceAccounts(sa.ObjectMeta.Namespace).Update(ctx, sa, metav1.UpdateOptions{}); err != nil {
|
||||||
|
lastError = errors.Wrap(err, "unable to update ServicAccount")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return nil
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateOrUpdateDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
// CreateOrUpdateDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
||||||
func CreateOrUpdateDeployment(client clientset.Interface, deploy *apps.Deployment) error {
|
func CreateOrUpdateDeployment(client clientset.Interface, deploy *apps.Deployment) error {
|
||||||
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(context.TODO(), deploy, metav1.CreateOptions{}); err != nil {
|
var lastError error
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
return errors.Wrap(err, "unable to create deployment")
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
}
|
true, func(_ context.Context) (bool, error) {
|
||||||
|
ctx := context.Background()
|
||||||
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Update(context.TODO(), deploy, metav1.UpdateOptions{}); err != nil {
|
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(ctx, deploy, metav1.CreateOptions{}); err != nil {
|
||||||
return errors.Wrap(err, "unable to update deployment")
|
if !apierrors.IsAlreadyExists(err) {
|
||||||
}
|
lastError = errors.Wrap(err, "unable to create Deployment")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Update(ctx, deploy, metav1.UpdateOptions{}); err != nil {
|
||||||
|
lastError = errors.Wrap(err, "unable to update Deployment")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return nil
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateOrRetainDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead.
|
// CreateOrRetainDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead.
|
||||||
func CreateOrRetainDeployment(client clientset.Interface, deploy *apps.Deployment, deployName string) error {
|
func CreateOrRetainDeployment(client clientset.Interface, deploy *apps.Deployment, deployName string) error {
|
||||||
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Get(context.TODO(), deployName, metav1.GetOptions{}); err != nil {
|
var lastError error
|
||||||
if !apierrors.IsNotFound(err) {
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
return nil
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
}
|
true, func(_ context.Context) (bool, error) {
|
||||||
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(context.TODO(), deploy, metav1.CreateOptions{}); err != nil {
|
ctx := context.Background()
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Get(ctx, deployName, metav1.GetOptions{}); err != nil {
|
||||||
return errors.Wrap(err, "unable to create deployment")
|
if !apierrors.IsNotFound(err) {
|
||||||
|
lastError = errors.Wrap(err, "unable to get Deployment")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(ctx, deploy, metav1.CreateOptions{}); err != nil {
|
||||||
|
if !apierrors.IsAlreadyExists(err) {
|
||||||
|
lastError = errors.Wrap(err, "unable to create Deployment")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
return true, nil
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return nil
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateOrUpdateDaemonSet creates a DaemonSet if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
// CreateOrUpdateDaemonSet creates a DaemonSet if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
||||||
func CreateOrUpdateDaemonSet(client clientset.Interface, ds *apps.DaemonSet) error {
|
func CreateOrUpdateDaemonSet(client clientset.Interface, ds *apps.DaemonSet) error {
|
||||||
if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Create(context.TODO(), ds, metav1.CreateOptions{}); err != nil {
|
var lastError error
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
return errors.Wrap(err, "unable to create daemonset")
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
}
|
true, func(_ context.Context) (bool, error) {
|
||||||
|
ctx := context.Background()
|
||||||
if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Update(context.TODO(), ds, metav1.UpdateOptions{}); err != nil {
|
if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Create(ctx, ds, metav1.CreateOptions{}); err != nil {
|
||||||
return errors.Wrap(err, "unable to update daemonset")
|
if !apierrors.IsAlreadyExists(err) {
|
||||||
}
|
lastError = errors.Wrap(err, "unable to create DaemonSet")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Update(ctx, ds, metav1.UpdateOptions{}); err != nil {
|
||||||
|
lastError = errors.Wrap(err, "unable to update DaemonSet")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return nil
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
// CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
||||||
func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
|
func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
|
||||||
var lastError error
|
var lastError error
|
||||||
err := wait.PollUntilContextTimeout(context.Background(),
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
true, func(_ context.Context) (bool, error) {
|
true, func(_ context.Context) (bool, error) {
|
||||||
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(context.TODO(), role, metav1.CreateOptions{}); err != nil {
|
ctx := context.Background()
|
||||||
|
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(ctx, role, metav1.CreateOptions{}); err != nil {
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
if !apierrors.IsAlreadyExists(err) {
|
||||||
lastError = errors.Wrap(err, "unable to create RBAC role")
|
lastError = errors.Wrap(err, "unable to create Role")
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Update(ctx, role, metav1.UpdateOptions{}); err != nil {
|
||||||
if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Update(context.TODO(), role, metav1.UpdateOptions{}); err != nil {
|
lastError = errors.Wrap(err, "unable to update Role")
|
||||||
lastError = errors.Wrap(err, "unable to update RBAC role")
|
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -221,16 +293,16 @@ func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
|
|||||||
func CreateOrUpdateRoleBinding(client clientset.Interface, roleBinding *rbac.RoleBinding) error {
|
func CreateOrUpdateRoleBinding(client clientset.Interface, roleBinding *rbac.RoleBinding) error {
|
||||||
var lastError error
|
var lastError error
|
||||||
err := wait.PollUntilContextTimeout(context.Background(),
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
true, func(_ context.Context) (bool, error) {
|
true, func(_ context.Context) (bool, error) {
|
||||||
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Create(context.TODO(), roleBinding, metav1.CreateOptions{}); err != nil {
|
ctx := context.Background()
|
||||||
|
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Create(ctx, roleBinding, metav1.CreateOptions{}); err != nil {
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
if !apierrors.IsAlreadyExists(err) {
|
||||||
lastError = errors.Wrap(err, "unable to create RBAC rolebinding")
|
lastError = errors.Wrap(err, "unable to create RoleBinding")
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Update(ctx, roleBinding, metav1.UpdateOptions{}); err != nil {
|
||||||
if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Update(context.TODO(), roleBinding, metav1.UpdateOptions{}); err != nil {
|
lastError = errors.Wrap(err, "unable to update RoleBinding")
|
||||||
lastError = errors.Wrap(err, "unable to update RBAC rolebinding")
|
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -244,37 +316,60 @@ func CreateOrUpdateRoleBinding(client clientset.Interface, roleBinding *rbac.Rol
|
|||||||
|
|
||||||
// CreateOrUpdateClusterRole creates a ClusterRole if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
// CreateOrUpdateClusterRole creates a ClusterRole if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
||||||
func CreateOrUpdateClusterRole(client clientset.Interface, clusterRole *rbac.ClusterRole) error {
|
func CreateOrUpdateClusterRole(client clientset.Interface, clusterRole *rbac.ClusterRole) error {
|
||||||
if _, err := client.RbacV1().ClusterRoles().Create(context.TODO(), clusterRole, metav1.CreateOptions{}); err != nil {
|
var lastError error
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
return errors.Wrap(err, "unable to create RBAC clusterrole")
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
}
|
true, func(_ context.Context) (bool, error) {
|
||||||
|
ctx := context.Background()
|
||||||
if _, err := client.RbacV1().ClusterRoles().Update(context.TODO(), clusterRole, metav1.UpdateOptions{}); err != nil {
|
if _, err := client.RbacV1().ClusterRoles().Create(ctx, clusterRole, metav1.CreateOptions{}); err != nil {
|
||||||
return errors.Wrap(err, "unable to update RBAC clusterrole")
|
if !apierrors.IsAlreadyExists(err) {
|
||||||
}
|
lastError = errors.Wrap(err, "unable to create ClusterRole")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if _, err := client.RbacV1().ClusterRoles().Update(ctx, clusterRole, metav1.UpdateOptions{}); err != nil {
|
||||||
|
lastError = errors.Wrap(err, "unable to update ClusterRole")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return nil
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateOrUpdateClusterRoleBinding creates a ClusterRoleBinding if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
// CreateOrUpdateClusterRoleBinding creates a ClusterRoleBinding if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
||||||
func CreateOrUpdateClusterRoleBinding(client clientset.Interface, clusterRoleBinding *rbac.ClusterRoleBinding) error {
|
func CreateOrUpdateClusterRoleBinding(client clientset.Interface, clusterRoleBinding *rbac.ClusterRoleBinding) error {
|
||||||
if _, err := client.RbacV1().ClusterRoleBindings().Create(context.TODO(), clusterRoleBinding, metav1.CreateOptions{}); err != nil {
|
var lastError error
|
||||||
if !apierrors.IsAlreadyExists(err) {
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
return errors.Wrap(err, "unable to create RBAC clusterrolebinding")
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
}
|
true, func(_ context.Context) (bool, error) {
|
||||||
|
ctx := context.Background()
|
||||||
if _, err := client.RbacV1().ClusterRoleBindings().Update(context.TODO(), clusterRoleBinding, metav1.UpdateOptions{}); err != nil {
|
if _, err := client.RbacV1().ClusterRoleBindings().Create(ctx, clusterRoleBinding, metav1.CreateOptions{}); err != nil {
|
||||||
return errors.Wrap(err, "unable to update RBAC clusterrolebinding")
|
if !apierrors.IsAlreadyExists(err) {
|
||||||
}
|
lastError = errors.Wrap(err, "unable to create ClusterRoleBinding")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if _, err := client.RbacV1().ClusterRoleBindings().Update(ctx, clusterRoleBinding, metav1.UpdateOptions{}); err != nil {
|
||||||
|
lastError = errors.Wrap(err, "unable to update ClusterRoleBinding")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return nil
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
// PatchNodeOnce executes patchFn on the node object found by the node name.
|
// PatchNodeOnce executes patchFn on the node object found by the node name.
|
||||||
func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1.Node), lastError *error) func(context.Context) (bool, error) {
|
func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1.Node), lastError *error) func(context.Context) (bool, error) {
|
||||||
return func(_ context.Context) (bool, error) {
|
return func(_ context.Context) (bool, error) {
|
||||||
// First get the node object
|
// First get the node object
|
||||||
n, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
|
ctx := context.Background()
|
||||||
|
n, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
*lastError = err
|
*lastError = err
|
||||||
return false, nil // retry on any error
|
return false, nil // retry on any error
|
||||||
@ -307,8 +402,8 @@ func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1
|
|||||||
return false, *lastError
|
return false, *lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := client.CoreV1().Nodes().Patch(context.TODO(), n.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
|
if _, err := client.CoreV1().Nodes().Patch(ctx, n.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
|
||||||
*lastError = errors.Wrapf(err, "error patching node %q through apiserver", n.Name)
|
*lastError = errors.Wrapf(err, "error patching Node %q", n.Name)
|
||||||
if apierrors.IsTimeout(err) || apierrors.IsConflict(err) || apierrors.IsServerTimeout(err) || apierrors.IsServiceUnavailable(err) {
|
if apierrors.IsTimeout(err) || apierrors.IsConflict(err) || apierrors.IsServerTimeout(err) || apierrors.IsServiceUnavailable(err) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -324,7 +419,7 @@ func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1
|
|||||||
func PatchNode(client clientset.Interface, nodeName string, patchFn func(*v1.Node)) error {
|
func PatchNode(client clientset.Interface, nodeName string, patchFn func(*v1.Node)) error {
|
||||||
var lastError error
|
var lastError error
|
||||||
err := wait.PollUntilContextTimeout(context.Background(),
|
err := wait.PollUntilContextTimeout(context.Background(),
|
||||||
constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
|
||||||
true, PatchNodeOnce(client, nodeName, patchFn, &lastError))
|
true, PatchNodeOnce(client, nodeName, patchFn, &lastError))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user