From 4342fccc0d6a2e7bc2c398a8bc0b528a29d9bc77 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 21 Oct 2015 18:16:55 -0400 Subject: [PATCH] Provide a RetryOnConflict helper for client libraries Used like: var pod *api.Pod err := client.RetryOnConflict(client.DefaultBackoff, func() (err error) { pod, err = c.Pods("mynamespace").UpdateStatus(update) return }) // err may be conflict --- pkg/client/unversioned/conditions.go | 58 ++++++++++++++++++ pkg/client/unversioned/conditions_test.go | 71 +++++++++++++++++++++++ pkg/util/wait/wait.go | 31 ++++++++++ pkg/util/wait/wait_test.go | 47 +++++++++++++++ 4 files changed, 207 insertions(+) create mode 100644 pkg/client/unversioned/conditions_test.go diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go index d1d370f49a4..4556a643745 100644 --- a/pkg/client/unversioned/conditions.go +++ b/pkg/client/unversioned/conditions.go @@ -17,11 +17,69 @@ limitations under the License. package unversioned import ( + "time" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/util/wait" ) +// DefaultRetry is the recommended retry for a conflict where multiple clients +// are making changes to the same resource. +var DefaultRetry = wait.Backoff{ + Steps: 5, + Duration: 10 * time.Millisecond, + Factor: 1.0, + Jitter: 0.1, +} + +// DefaultBackoff is the recommended backoff for a conflict where a client +// may be attempting to make an unrelated modification to a resource under +// active management by one or more controllers. +var DefaultBackoff = wait.Backoff{ + Steps: 4, + Duration: 10 * time.Millisecond, + Factor: 5.0, + Jitter: 0.1, +} + +// RetryOnConflict executes the provided function repeatedly, retrying if the server returns a conflicting +// write. Callers should preserve previous executions if they wish to retry changes. It performs an +// exponential backoff. 
+// +// var pod *api.Pod +// err := RetryOnConflict(DefaultBackoff, func() (err error) { +// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus) +// return +// }) +// if err != nil { +// // may be conflict if max retries were hit +// return err +// } +// ... +// +// TODO: Make Backoff an interface? +func RetryOnConflict(backoff wait.Backoff, fn func() error) error { + var lastConflictErr error + err := wait.ExponentialBackoff(backoff, func() (bool, error) { + err := fn() + switch { + case err == nil: + return true, nil + case errors.IsConflict(err): + lastConflictErr = err + return false, nil + default: + return false, err + } + }) + if err == wait.ErrWaitTimeout { + err = lastConflictErr + } + return err +} + // ControllerHasDesiredReplicas returns a condition that will be true if and only if // the desired replica count for a controller's ReplicaSelector equals the Replicas count. func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc { diff --git a/pkg/client/unversioned/conditions_test.go b/pkg/client/unversioned/conditions_test.go new file mode 100644 index 00000000000..1042461c095 --- /dev/null +++ b/pkg/client/unversioned/conditions_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package unversioned + +import ( + "fmt" + "testing" + + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/util/wait" +) + +func TestRetryOnConflict(t *testing.T) { + opts := wait.Backoff{Factor: 1.0, Steps: 3} + conflictErr := errors.NewConflict(unversioned.GroupResource{Resource: "test"}, "other", nil) + + // never succeeds + err := RetryOnConflict(opts, func() error { + return conflictErr + }) + if err != conflictErr { + t.Errorf("unexpected error: %v", err) + } + + // returns immediately + i := 0 + err = RetryOnConflict(opts, func() error { + i++ + return nil + }) + if err != nil || i != 1 { + t.Errorf("unexpected error: %v", err) + } + + // returns immediately on error + testErr := fmt.Errorf("some other error") + err = RetryOnConflict(opts, func() error { + return testErr + }) + if err != testErr { + t.Errorf("unexpected error: %v", err) + } + + // keeps retrying + i = 0 + err = RetryOnConflict(opts, func() error { + if i < 2 { + i++ + return errors.NewConflict(unversioned.GroupResource{Resource: "test"}, "other", nil) + } + return nil + }) + if err != nil || i != 2 { + t.Errorf("unexpected error: %v", err) + } +} diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index 853c288dfd6..275505f73ff 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -40,6 +40,37 @@ var ErrWaitTimeout = errors.New("timed out waiting for the condition") // if the loop should be aborted. type ConditionFunc func() (done bool, err error) +// Backoff holds the parameters applied to a backoff function. +type Backoff struct { + Duration time.Duration + Factor float64 + Jitter float64 + Steps int +} + +// ExponentialBackoff repeats a condition check up to steps times, increasing the wait +// by multiplying the previous duration by factor. If jitter is greater than zero, +// a random amount of each duration is added (between duration and duration*(1+jitter)). 
+// If the condition never returns true, ErrWaitTimeout is returned. All other errors +// terminate immediately. +func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { + duration := backoff.Duration + for i := 0; i < backoff.Steps; i++ { + if i != 0 { + adjusted := duration + if backoff.Jitter > 0.0 { + adjusted = Jitter(duration, backoff.Jitter) + } + time.Sleep(adjusted) + duration = time.Duration(float64(duration) * backoff.Factor) + } + if ok, err := condition(); err != nil || ok { + return err + } + } + return ErrWaitTimeout +} + // Poll tries a condition func until it returns true, an error, or the timeout // is reached. condition will always be invoked at least once but some intervals // may be missed if the condition takes too long or the time window is too short. diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go index 561a68438e1..09dcb0c4900 100644 --- a/pkg/util/wait/wait_test.go +++ b/pkg/util/wait/wait_test.go @@ -18,6 +18,7 @@ package wait import ( "errors" + "fmt" "sync" "sync/atomic" "testing" @@ -26,6 +27,52 @@ import ( "k8s.io/kubernetes/pkg/util" ) +func TestExponentialBackoff(t *testing.T) { + opts := Backoff{Factor: 1.0, Steps: 3} + + // waits up to steps + i := 0 + err := ExponentialBackoff(opts, func() (bool, error) { + i++ + return false, nil + }) + if err != ErrWaitTimeout || i != opts.Steps { + t.Errorf("unexpected error: %v", err) + } + + // returns immediately + i = 0 + err = ExponentialBackoff(opts, func() (bool, error) { + i++ + return true, nil + }) + if err != nil || i != 1 { + t.Errorf("unexpected error: %v", err) + } + + // returns immediately on error + testErr := fmt.Errorf("some other error") + err = ExponentialBackoff(opts, func() (bool, error) { + return false, testErr + }) + if err != testErr { + t.Errorf("unexpected error: %v", err) + } + + // invoked multiple times + i = 1 + err = ExponentialBackoff(opts, func() (bool, error) { + if i < opts.Steps { + i++ + return false, nil + } + 
return true, nil + }) + if err != nil || i != opts.Steps { + t.Errorf("unexpected error: %v", err) + } +} + func TestPoller(t *testing.T) { done := make(chan struct{}) defer close(done)