From 4342fccc0d6a2e7bc2c398a8bc0b528a29d9bc77 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 21 Oct 2015 18:16:55 -0400 Subject: [PATCH 1/3] Provide a RetryOnConflict helper for client libraries Used like: var pod *api.Pod err := client.RetryOnConflict(client.DefaultBackoff, func() (err error) { pod, err = c.Pods("mynamespace").UpdateStatus(update) return }) // err may be conflict --- pkg/client/unversioned/conditions.go | 58 ++++++++++++++++++ pkg/client/unversioned/conditions_test.go | 71 +++++++++++++++++++++++ pkg/util/wait/wait.go | 31 ++++++++++ pkg/util/wait/wait_test.go | 47 +++++++++++++++ 4 files changed, 207 insertions(+) create mode 100644 pkg/client/unversioned/conditions_test.go diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go index d1d370f49a4..4556a643745 100644 --- a/pkg/client/unversioned/conditions.go +++ b/pkg/client/unversioned/conditions.go @@ -17,11 +17,69 @@ limitations under the License. package unversioned import ( + "time" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/util/wait" ) +// DefaultRetry is the recommended retry for a conflict where multiple clients +// are making changes to the same resource. +var DefaultRetry = wait.Backoff{ + Steps: 5, + Duration: 10 * time.Millisecond, + Factor: 1.0, + Jitter: 0.1, +} + +// DefaultBackoff is the recommended backoff for a conflict where a client +// may be attempting to make an unrelated modification to a resource under +// active management by one or more controllers. +var DefaultBackoff = wait.Backoff{ + Steps: 4, + Duration: 10 * time.Millisecond, + Factor: 5.0, + Jitter: 0.1, +} + +// RetryOnConflict executes the provided function repeatedly, retrying if the server returns a conflicting +// write. Callers should preserve previous executions if they wish to retry changes. It performs an +// exponential backoff. 
+// +// var pod *api.Pod +// err := RetryOnConflict(DefaultBackoff, func() (err error) { +// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus) +// return +// }) +// if err != nil { +// // may be conflict if max retries were hit +// return err +// } +// ... +// +// TODO: Make Backoff an interface? +func RetryOnConflict(backoff wait.Backoff, fn func() error) error { + var lastConflictErr error + err := wait.ExponentialBackoff(backoff, func() (bool, error) { + err := fn() + switch { + case err == nil: + return true, nil + case errors.IsConflict(err): + lastConflictErr = err + return false, nil + default: + return false, err + } + }) + if err == wait.ErrWaitTimeout { + err = lastConflictErr + } + return err +} + // ControllerHasDesiredReplicas returns a condition that will be true if and only if // the desired replica count for a controller's ReplicaSelector equals the Replicas count. func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc { diff --git a/pkg/client/unversioned/conditions_test.go b/pkg/client/unversioned/conditions_test.go new file mode 100644 index 00000000000..1042461c095 --- /dev/null +++ b/pkg/client/unversioned/conditions_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package unversioned + +import ( + "fmt" + "testing" + + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/util/wait" +) + +func TestRetryOnConflict(t *testing.T) { + opts := wait.Backoff{Factor: 1.0, Steps: 3} + conflictErr := errors.NewConflict(unversioned.GroupResource{Resource: "test"}, "other", nil) + + // never returns + err := RetryOnConflict(opts, func() error { + return conflictErr + }) + if err != conflictErr { + t.Errorf("unexpected error: %v", err) + } + + // returns immediately + i := 0 + err = RetryOnConflict(opts, func() error { + i++ + return nil + }) + if err != nil || i != 1 { + t.Errorf("unexpected error: %v", err) + } + + // returns immediately on error + testErr := fmt.Errorf("some other error") + err = RetryOnConflict(opts, func() error { + return testErr + }) + if err != testErr { + t.Errorf("unexpected error: %v", err) + } + + // keeps retrying + i = 0 + err = RetryOnConflict(opts, func() error { + if i < 2 { + i++ + return errors.NewConflict(unversioned.GroupResource{Resource: "test"}, "other", nil) + } + return nil + }) + if err != nil || i != 2 { + t.Errorf("unexpected error: %v", err) + } +} diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index 853c288dfd6..275505f73ff 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -40,6 +40,37 @@ var ErrWaitTimeout = errors.New("timed out waiting for the condition") // if the loop should be aborted. type ConditionFunc func() (done bool, err error) +// Backoff holds the parameters applied to a Backoff function. +type Backoff struct { + Duration time.Duration + Factor float64 + Jitter float64 + Steps int +} + +// ExponentialBackoff repeats a condition check up to steps times, increasing the wait +// by multiplying the previous duration by factor. If jitter is greater than zero, +// a random amount of each duration is added (between duration and duration*(1+jitter)). 
+// If the condition never returns true, ErrWaitTimeout is returned. All other errors +// terminate immediately. +func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { + duration := backoff.Duration + for i := 0; i < backoff.Steps; i++ { + if i != 0 { + adjusted := duration + if backoff.Jitter > 0.0 { + adjusted = Jitter(duration, backoff.Jitter) + } + time.Sleep(adjusted) + duration = time.Duration(float64(duration) * backoff.Factor) + } + if ok, err := condition(); err != nil || ok { + return err + } + } + return ErrWaitTimeout +} + // Poll tries a condition func until it returns true, an error, or the timeout // is reached. condition will always be invoked at least once but some intervals // may be missed if the condition takes too long or the time window is too short. diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go index 561a68438e1..09dcb0c4900 100644 --- a/pkg/util/wait/wait_test.go +++ b/pkg/util/wait/wait_test.go @@ -18,6 +18,7 @@ package wait import ( "errors" + "fmt" "sync" "sync/atomic" "testing" @@ -26,6 +27,52 @@ import ( "k8s.io/kubernetes/pkg/util" ) +func TestExponentialBackoff(t *testing.T) { + opts := Backoff{Factor: 1.0, Steps: 3} + + // waits up to steps + i := 0 + err := ExponentialBackoff(opts, func() (bool, error) { + i++ + return false, nil + }) + if err != ErrWaitTimeout || i != opts.Steps { + t.Errorf("unexpected error: %v", err) + } + + // returns immediately + i = 0 + err = ExponentialBackoff(opts, func() (bool, error) { + i++ + return true, nil + }) + if err != nil || i != 1 { + t.Errorf("unexpected error: %v", err) + } + + // returns immediately on error + testErr := fmt.Errorf("some other error") + err = ExponentialBackoff(opts, func() (bool, error) { + return false, testErr + }) + if err != testErr { + t.Errorf("unexpected error: %v", err) + } + + // invoked multiple times + i = 1 + err = ExponentialBackoff(opts, func() (bool, error) { + if i < opts.Steps { + i++ + return false, nil + } + 
return true, nil + }) + if err != nil || i != opts.Steps { + t.Errorf("unexpected error: %v", err) + } +} + func TestPoller(t *testing.T) { done := make(chan struct{}) defer close(done) From 33660e001a7018c899bea00c7b2569623c43ca89 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 21 Oct 2015 22:09:05 -0400 Subject: [PATCH 2/3] Update service account tokens controller to use client.RetryOnConflict --- .../serviceaccount/tokens_controller.go | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/pkg/controller/serviceaccount/tokens_controller.go b/pkg/controller/serviceaccount/tokens_controller.go index a115fe93622..dce9f4f5291 100644 --- a/pkg/controller/serviceaccount/tokens_controller.go +++ b/pkg/controller/serviceaccount/tokens_controller.go @@ -31,12 +31,20 @@ import ( "k8s.io/kubernetes/pkg/registry/secret" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/serviceaccount" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) -const NumServiceAccountRemoveReferenceRetries = 10 +// RemoveTokenBackoff is the recommended (empirical) retry interval for removing +// a secret reference from a service account when the secret is deleted. It is +// exported for use by custom secret controllers. 
+var RemoveTokenBackoff = wait.Backoff{ + Steps: 10, + Duration: 100 * time.Millisecond, + Jitter: 1.0, +} // TokensControllerOptions contains options for the TokensController type TokensControllerOptions struct { @@ -244,16 +252,10 @@ func (e *TokensController) secretDeleted(obj interface{}) { return } - for i := 1; i <= NumServiceAccountRemoveReferenceRetries; i++ { - if _, err := e.removeSecretReferenceIfNeeded(serviceAccount, secret.Name); err != nil { - if apierrors.IsConflict(err) && i < NumServiceAccountRemoveReferenceRetries { - time.Sleep(wait.Jitter(100*time.Millisecond, 0.0)) - continue - } - glog.Error(err) - break - } - break + if err := client.RetryOnConflict(RemoveTokenBackoff, func() error { + return e.removeSecretReferenceIfNeeded(serviceAccount, secret.Name) + }); err != nil { + util.HandleError(err) } } @@ -400,10 +402,10 @@ func (e *TokensController) deleteSecret(secret *api.Secret) error { // removeSecretReferenceIfNeeded updates the given ServiceAccount to remove a reference to the given secretName if needed. 
-// Returns whether an update was performed, and any error that occurred -func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.ServiceAccount, secretName string) (bool, error) { +// Returns any error that occurred +func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.ServiceAccount, secretName string) error { // See if the account even referenced the secret if !getSecretReferences(serviceAccount).Has(secretName) { - return false, nil + return nil } // We don't want to update the cache's copy of the service account @@ -411,12 +413,12 @@ func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.Ser serviceAccounts := e.client.ServiceAccounts(serviceAccount.Namespace) serviceAccount, err := serviceAccounts.Get(serviceAccount.Name) if err != nil { - return false, err + return err } // Double-check to see if the account still references the secret if !getSecretReferences(serviceAccount).Has(secretName) { - return false, nil + return nil } secrets := []api.ObjectReference{} @@ -429,10 +431,10 @@ func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.Ser _, err = serviceAccounts.Update(serviceAccount) if err != nil { - return false, err + return err } - return true, nil + return nil } // getServiceAccount returns the ServiceAccount referenced by the given secret. If the secret is not From 29028a1332a71e7386c83a7d4b0229b3ed1b15c7 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 21 Oct 2015 22:09:22 -0400 Subject: [PATCH 3/3] Fix unrelated log level error --- pkg/controller/service/servicecontroller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 221645b7d31..f04f1060c82 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -358,7 +358,7 @@ func (s *ServiceController) persistUpdate(service *api.Service) error { // balancer status. 
For now, just rely on the fact that we'll // also process the update that caused the resource version to change. if errors.IsConflict(err) { - glog.Infof("Not persisting update to service that has been changed since we received it: %v", err) + glog.V(4).Infof("Not persisting update to service that has been changed since we received it: %v", err) return nil } glog.Warningf("Failed to persist updated LoadBalancerStatus to service %s after creating its load balancer: %v",