mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 07:20:13 +00:00
Merge pull request #16067 from smarterclayton/backoff_conflict
Provide a RetryOnConflict helper for client libraries
This commit is contained in:
commit
f3a638de31
@ -17,11 +17,69 @@ limitations under the License.
|
||||
package unversioned
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
// DefaultRetry is the recommended retry for a conflict where multiple clients
|
||||
// are making changes to the same resource.
|
||||
var DefaultRetry = wait.Backoff{
|
||||
Steps: 5,
|
||||
Duration: 10 * time.Millisecond,
|
||||
Factor: 1.0,
|
||||
Jitter: 0.1,
|
||||
}
|
||||
|
||||
// DefaultBackoff is the recommended backoff for a conflict where a client
|
||||
// may be attempting to make an unrelated modification to a resource under
|
||||
// active management by one or more controllers.
|
||||
var DefaultBackoff = wait.Backoff{
|
||||
Steps: 4,
|
||||
Duration: 10 * time.Millisecond,
|
||||
Factor: 5.0,
|
||||
Jitter: 0.1,
|
||||
}
|
||||
|
||||
// RetryConflict executes the provided function repeatedly, retrying if the server returns a conflicting
|
||||
// write. Callers should preserve previous executions if they wish to retry changes. It performs an
|
||||
// exponential backoff.
|
||||
//
|
||||
// var pod *api.Pod
|
||||
// err := RetryOnConflict(DefaultBackoff, func() (err error) {
|
||||
// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
|
||||
// return
|
||||
// })
|
||||
// if err != nil {
|
||||
// // may be conflict if max retries were hit
|
||||
// return err
|
||||
// }
|
||||
// ...
|
||||
//
|
||||
// TODO: Make Backoff an interface?
|
||||
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
|
||||
var lastConflictErr error
|
||||
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
|
||||
err := fn()
|
||||
switch {
|
||||
case err == nil:
|
||||
return true, nil
|
||||
case errors.IsConflict(err):
|
||||
lastConflictErr = err
|
||||
return false, nil
|
||||
default:
|
||||
return false, err
|
||||
}
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = lastConflictErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ControllerHasDesiredReplicas returns a condition that will be true if and only if
|
||||
// the desired replica count for a controller's ReplicaSelector equals the Replicas count.
|
||||
func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc {
|
||||
|
71
pkg/client/unversioned/conditions_test.go
Normal file
71
pkg/client/unversioned/conditions_test.go
Normal file
@ -0,0 +1,71 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package unversioned
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
func TestRetryOnConflict(t *testing.T) {
|
||||
opts := wait.Backoff{Factor: 1.0, Steps: 3}
|
||||
conflictErr := errors.NewConflict(unversioned.GroupResource{Resource: "test"}, "other", nil)
|
||||
|
||||
// never returns
|
||||
err := RetryOnConflict(opts, func() error {
|
||||
return conflictErr
|
||||
})
|
||||
if err != conflictErr {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// returns immediately
|
||||
i := 0
|
||||
err = RetryOnConflict(opts, func() error {
|
||||
i++
|
||||
return nil
|
||||
})
|
||||
if err != nil || i != 1 {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// returns immediately on error
|
||||
testErr := fmt.Errorf("some other error")
|
||||
err = RetryOnConflict(opts, func() error {
|
||||
return testErr
|
||||
})
|
||||
if err != testErr {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// keeps retrying
|
||||
i = 0
|
||||
err = RetryOnConflict(opts, func() error {
|
||||
if i < 2 {
|
||||
i++
|
||||
return errors.NewConflict(unversioned.GroupResource{Resource: "test"}, "other", nil)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil || i != 2 {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
@ -358,7 +358,7 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
|
||||
// balancer status. For now, just rely on the fact that we'll
|
||||
// also process the update that caused the resource version to change.
|
||||
if errors.IsConflict(err) {
|
||||
glog.Infof("Not persisting update to service that has been changed since we received it: %v", err)
|
||||
glog.V(4).Infof("Not persisting update to service that has been changed since we received it: %v", err)
|
||||
return nil
|
||||
}
|
||||
glog.Warningf("Failed to persist updated LoadBalancerStatus to service %s after creating its load balancer: %v",
|
||||
|
@ -31,12 +31,20 @@ import (
|
||||
"k8s.io/kubernetes/pkg/registry/secret"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/serviceaccount"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
const NumServiceAccountRemoveReferenceRetries = 10
|
||||
// RemoveTokenBackoff is the recommended (empirical) retry interval for removing
|
||||
// a secret reference from a service account when the secret is deleted. It is
|
||||
// exported for use by custom secret controllers.
|
||||
var RemoveTokenBackoff = wait.Backoff{
|
||||
Steps: 10,
|
||||
Duration: 100 * time.Millisecond,
|
||||
Jitter: 1.0,
|
||||
}
|
||||
|
||||
// TokensControllerOptions contains options for the TokensController
|
||||
type TokensControllerOptions struct {
|
||||
@ -244,16 +252,10 @@ func (e *TokensController) secretDeleted(obj interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 1; i <= NumServiceAccountRemoveReferenceRetries; i++ {
|
||||
if _, err := e.removeSecretReferenceIfNeeded(serviceAccount, secret.Name); err != nil {
|
||||
if apierrors.IsConflict(err) && i < NumServiceAccountRemoveReferenceRetries {
|
||||
time.Sleep(wait.Jitter(100*time.Millisecond, 0.0))
|
||||
continue
|
||||
}
|
||||
glog.Error(err)
|
||||
break
|
||||
}
|
||||
break
|
||||
if err := client.RetryOnConflict(RemoveTokenBackoff, func() error {
|
||||
return e.removeSecretReferenceIfNeeded(serviceAccount, secret.Name)
|
||||
}); err != nil {
|
||||
util.HandleError(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -400,10 +402,10 @@ func (e *TokensController) deleteSecret(secret *api.Secret) error {
|
||||
|
||||
// removeSecretReferenceIfNeeded updates the given ServiceAccount to remove a reference to the given secretName if needed.
|
||||
// Returns whether an update was performed, and any error that occurred
|
||||
func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.ServiceAccount, secretName string) (bool, error) {
|
||||
func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.ServiceAccount, secretName string) error {
|
||||
// See if the account even referenced the secret
|
||||
if !getSecretReferences(serviceAccount).Has(secretName) {
|
||||
return false, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// We don't want to update the cache's copy of the service account
|
||||
@ -411,12 +413,12 @@ func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.Ser
|
||||
serviceAccounts := e.client.ServiceAccounts(serviceAccount.Namespace)
|
||||
serviceAccount, err := serviceAccounts.Get(serviceAccount.Name)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return err
|
||||
}
|
||||
|
||||
// Double-check to see if the account still references the secret
|
||||
if !getSecretReferences(serviceAccount).Has(secretName) {
|
||||
return false, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
secrets := []api.ObjectReference{}
|
||||
@ -429,10 +431,10 @@ func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.Ser
|
||||
|
||||
_, err = serviceAccounts.Update(serviceAccount)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// getServiceAccount returns the ServiceAccount referenced by the given secret. If the secret is not
|
||||
|
@ -40,6 +40,37 @@ var ErrWaitTimeout = errors.New("timed out waiting for the condition")
|
||||
// if the loop should be aborted.
|
||||
type ConditionFunc func() (done bool, err error)
|
||||
|
||||
// Backoff is parameters applied to a Backoff function.
|
||||
type Backoff struct {
|
||||
Duration time.Duration
|
||||
Factor float64
|
||||
Jitter float64
|
||||
Steps int
|
||||
}
|
||||
|
||||
// ExponentialBackoff repeats a condition check up to steps times, increasing the wait
|
||||
// by multipling the previous duration by factor. If jitter is greater than zero,
|
||||
// a random amount of each duration is added (between duration and duration*(1+jitter)).
|
||||
// If the condition never returns true, ErrWaitTimeout is returned. All other errors
|
||||
// terminate immediately.
|
||||
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
|
||||
duration := backoff.Duration
|
||||
for i := 0; i < backoff.Steps; i++ {
|
||||
if i != 0 {
|
||||
adjusted := duration
|
||||
if backoff.Jitter > 0.0 {
|
||||
adjusted = Jitter(duration, backoff.Jitter)
|
||||
}
|
||||
time.Sleep(adjusted)
|
||||
duration = time.Duration(float64(duration) * backoff.Factor)
|
||||
}
|
||||
if ok, err := condition(); err != nil || ok {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return ErrWaitTimeout
|
||||
}
|
||||
|
||||
// Poll tries a condition func until it returns true, an error, or the timeout
|
||||
// is reached. condition will always be invoked at least once but some intervals
|
||||
// may be missed if the condition takes too long or the time window is too short.
|
||||
|
@ -18,6 +18,7 @@ package wait
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@ -26,6 +27,52 @@ import (
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
func TestExponentialBackoff(t *testing.T) {
|
||||
opts := Backoff{Factor: 1.0, Steps: 3}
|
||||
|
||||
// waits up to steps
|
||||
i := 0
|
||||
err := ExponentialBackoff(opts, func() (bool, error) {
|
||||
i++
|
||||
return false, nil
|
||||
})
|
||||
if err != ErrWaitTimeout || i != opts.Steps {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// returns immediately
|
||||
i = 0
|
||||
err = ExponentialBackoff(opts, func() (bool, error) {
|
||||
i++
|
||||
return true, nil
|
||||
})
|
||||
if err != nil || i != 1 {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// returns immediately on error
|
||||
testErr := fmt.Errorf("some other error")
|
||||
err = ExponentialBackoff(opts, func() (bool, error) {
|
||||
return false, testErr
|
||||
})
|
||||
if err != testErr {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// invoked multiple times
|
||||
i = 1
|
||||
err = ExponentialBackoff(opts, func() (bool, error) {
|
||||
if i < opts.Steps {
|
||||
i++
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil || i != opts.Steps {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoller(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
Loading…
Reference in New Issue
Block a user