diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go
index a61674e2ba8..f68f98fe121 100644
--- a/pkg/client/unversioned/conditions.go
+++ b/pkg/client/unversioned/conditions.go
@@ -18,7 +18,6 @@ package unversioned

 import (
 	"fmt"
-	"time"

 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
@@ -29,61 +28,6 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 )

-// DefaultRetry is the recommended retry for a conflict where multiple clients
-// are making changes to the same resource.
-var DefaultRetry = wait.Backoff{
-	Steps:    5,
-	Duration: 10 * time.Millisecond,
-	Factor:   1.0,
-	Jitter:   0.1,
-}
-
-// DefaultBackoff is the recommended backoff for a conflict where a client
-// may be attempting to make an unrelated modification to a resource under
-// active management by one or more controllers.
-var DefaultBackoff = wait.Backoff{
-	Steps:    4,
-	Duration: 10 * time.Millisecond,
-	Factor:   5.0,
-	Jitter:   0.1,
-}
-
-// RetryConflict executes the provided function repeatedly, retrying if the server returns a conflicting
-// write. Callers should preserve previous executions if they wish to retry changes. It performs an
-// exponential backoff.
-//
-//	var pod *api.Pod
-//	err := RetryOnConflict(DefaultBackoff, func() (err error) {
-//		pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
-//		return
-//	})
-//	if err != nil {
-//		// may be conflict if max retries were hit
-//		return err
-//	}
-//	...
-//
-// TODO: Make Backoff an interface?
-func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
-	var lastConflictErr error
-	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
-		err := fn()
-		switch {
-		case err == nil:
-			return true, nil
-		case errors.IsConflict(err):
-			lastConflictErr = err
-			return false, nil
-		default:
-			return false, err
-		}
-	})
-	if err == wait.ErrWaitTimeout {
-		err = lastConflictErr
-	}
-	return err
-}
-
 // ControllerHasDesiredReplicas returns a condition that will be true if and only if
 // the desired replica count for a controller's ReplicaSelector equals the Replicas count.
 func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc {
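
The block removed above is a pure code move: RetryOnConflict and the two backoff presets reappear unchanged (modulo a comment fix) in the new pkg/client/unversioned/util.go below. What stays behind in conditions.go are the watch conditions, which callers consume through wait.Poll. A minimal in-package sketch of that pattern; the function name and the 3s/5m cadence are illustrative, not taken from this diff:

package unversioned

import (
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/util/wait"
)

// waitForReplicas sketches how a condition from this file is consumed:
// wrap it in wait.Poll and block until it reports true or the timeout hits.
func waitForReplicas(c Interface, rc *api.ReplicationController) error {
	return wait.Poll(3*time.Second, 5*time.Minute, ControllerHasDesiredReplicas(c, rc))
}
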
diff --git a/pkg/client/unversioned/util.go b/pkg/client/unversioned/util.go
new file mode 100644
index 00000000000..37ada3c3d67
--- /dev/null
+++ b/pkg/client/unversioned/util.go
@@ -0,0 +1,79 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package unversioned
+
+import (
+	"time"
+
+	"k8s.io/kubernetes/pkg/api/errors"
+	"k8s.io/kubernetes/pkg/util/wait"
+)
+
+// DefaultRetry is the recommended retry for a conflict where multiple clients
+// are making changes to the same resource.
+var DefaultRetry = wait.Backoff{
+	Steps:    5,
+	Duration: 10 * time.Millisecond,
+	Factor:   1.0,
+	Jitter:   0.1,
+}
+
+// DefaultBackoff is the recommended backoff for a conflict where a client
+// may be attempting to make an unrelated modification to a resource under
+// active management by one or more controllers.
+var DefaultBackoff = wait.Backoff{
+	Steps:    4,
+	Duration: 10 * time.Millisecond,
+	Factor:   5.0,
+	Jitter:   0.1,
+}
+
+// RetryOnConflict executes the provided function repeatedly, retrying if the server returns a conflicting
+// write. Callers should preserve previous executions if they wish to retry changes. It performs an
+// exponential backoff.
+//
+//	var pod *api.Pod
+//	err := RetryOnConflict(DefaultBackoff, func() (err error) {
+//		pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
+//		return
+//	})
+//	if err != nil {
+//		// may be conflict if max retries were hit
+//		return err
+//	}
+//	...
+//
+// TODO: Make Backoff an interface?
+func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
+	var lastConflictErr error
+	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		err := fn()
+		switch {
+		case err == nil:
+			return true, nil
+		case errors.IsConflict(err):
+			lastConflictErr = err
+			return false, nil
+		default:
+			return false, err
+		}
+	})
+	if err == wait.ErrWaitTimeout {
+		err = lastConflictErr
+	}
+	return err
+}
diff --git a/pkg/client/unversioned/conditions_test.go b/pkg/client/unversioned/util_test.go
similarity index 96%
rename from pkg/client/unversioned/conditions_test.go
rename to pkg/client/unversioned/util_test.go
index 1042461c095..8a5826c22be 100644
--- a/pkg/client/unversioned/conditions_test.go
+++ b/pkg/client/unversioned/util_test.go
@@ -1,5 +1,5 @@
 /*
-Copyright 2014 The Kubernetes Authors All rights reserved.
+Copyright 2016 The Kubernetes Authors All rights reserved.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
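
The two presets above differ mainly in how fast they back off. Assuming wait.ExponentialBackoff multiplies Duration by Factor between attempts, the nominal Duration progression (jitter ignored) works out as in this standalone sketch, which is not part of the change itself:

package main

import (
	"fmt"
	"time"
)

// Prints the nominal Duration progression of DefaultBackoff (jitter
// ignored): 10ms, 50ms, 250ms, 1.25s. Depending on where the implementation
// places its sleeps, total waiting is bounded by roughly 0.3-1.5s before
// RetryOnConflict gives up. DefaultRetry (Factor 1.0) stays flat at 10ms.
func main() {
	d := 10 * time.Millisecond
	for step := 1; step <= 4; step++ {
		fmt.Printf("step %d: duration %v\n", step, d)
		d = time.Duration(float64(d) * 5.0)
	}
}
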
diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go
index ea5bb887c37..0af1253ad61 100644
--- a/pkg/kubectl/rolling_updater.go
+++ b/pkg/kubectl/rolling_updater.go
@@ -187,15 +187,16 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
 		if err != nil {
 			return err
 		}
-		if existing.Annotations == nil {
-			existing.Annotations = map[string]string{}
+		originReplicas := strconv.Itoa(int(existing.Spec.Replicas))
+		applyUpdate := func(rc *api.ReplicationController) {
+			if rc.Annotations == nil {
+				rc.Annotations = map[string]string{}
+			}
+			rc.Annotations[originalReplicasAnnotation] = originReplicas
 		}
-		existing.Annotations[originalReplicasAnnotation] = strconv.Itoa(int(existing.Spec.Replicas))
-		updated, err := r.c.ReplicationControllers(existing.Namespace).Update(existing)
-		if err != nil {
+		if oldRc, err = updateRcWithRetries(r.c, existing.Namespace, existing, applyUpdate); err != nil {
 			return err
 		}
-		oldRc = updated
 	}
 	// maxSurge is the maximum scaling increment and maxUnavailable are the maximum pods
 	// that can be unavailable during a rollout.
@@ -482,13 +483,14 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationControl
 	if err != nil {
 		return err
 	}
-	delete(newRc.Annotations, sourceIdAnnotation)
-	delete(newRc.Annotations, desiredReplicasAnnotation)
-
-	newRc, err = r.c.ReplicationControllers(r.ns).Update(newRc)
-	if err != nil {
+	applyUpdate := func(rc *api.ReplicationController) {
+		delete(rc.Annotations, sourceIdAnnotation)
+		delete(rc.Annotations, desiredReplicasAnnotation)
+	}
+	if newRc, err = updateRcWithRetries(r.c, r.ns, newRc, applyUpdate); err != nil {
 		return err
 	}
+
 	if err = wait.Poll(config.Interval, config.Timeout, client.ControllerHasDesiredReplicas(r.c, newRc)); err != nil {
 		return err
 	}
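
Both hunks above follow the same shape: the mutation is expressed as a closure over the desired change rather than over one specific object revision, so updateRcWithRetries (added later in this diff) can replay it against a freshly fetched controller after a 409. A hypothetical caller, with a made-up annotation key, to show the pattern in isolation:

package kubectl

import (
	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
)

// tagOwner sketches the call-site pattern used throughout this file. The
// closure mutates whatever controller it is handed, so a retry after a
// conflict can re-apply it to the re-fetched object. The key is illustrative.
func tagOwner(c client.Interface, ns string, rc *api.ReplicationController) (*api.ReplicationController, error) {
	applyUpdate := func(rc *api.ReplicationController) {
		if rc.Annotations == nil {
			rc.Annotations = map[string]string{}
		}
		rc.Annotations["example.io/owner"] = "rolling-updater"
	}
	return updateRcWithRetries(c, ns, rc, applyUpdate)
}
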
@@ -643,27 +645,29 @@ func SetNextControllerAnnotation(rc *api.ReplicationController, name string) {
 }

 func UpdateExistingReplicationController(c client.Interface, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) {
-	SetNextControllerAnnotation(oldRc, newName)
 	if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
+		SetNextControllerAnnotation(oldRc, newName)
 		return AddDeploymentKeyToReplicationController(oldRc, c, deploymentKey, deploymentValue, namespace, out)
 	} else {
 		// If we didn't need to update the controller for the deployment key, we still need to write
 		// the "next" controller.
-		return c.ReplicationControllers(namespace).Update(oldRc)
+		applyUpdate := func(rc *api.ReplicationController) {
+			SetNextControllerAnnotation(rc, newName)
+		}
+		return updateRcWithRetries(c, namespace, oldRc, applyUpdate)
 	}
 }

-const MaxRetries = 3
-
 func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
 	var err error
 	// First, update the template label. This ensures that any newly created pods will have the new label
-	if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
+	applyUpdate := func(rc *api.ReplicationController) {
 		if rc.Spec.Template.Labels == nil {
 			rc.Spec.Template.Labels = map[string]string{}
 		}
 		rc.Spec.Template.Labels[deploymentKey] = deploymentValue
-	}); err != nil {
+	}
+	if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
 		return nil, err
 	}

@@ -677,26 +681,16 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
 	}
 	for ix := range podList.Items {
 		pod := &podList.Items[ix]
-		if pod.Labels == nil {
-			pod.Labels = map[string]string{
-				deploymentKey: deploymentValue,
-			}
-		} else {
-			pod.Labels[deploymentKey] = deploymentValue
-		}
-		err = nil
-		delay := 3
-		for i := 0; i < MaxRetries; i++ {
-			_, err = client.Pods(namespace).Update(pod)
-			if err != nil {
-				fmt.Fprintf(out, "Error updating pod (%v), retrying after %d seconds", err, delay)
-				time.Sleep(time.Second * time.Duration(delay))
-				delay *= delay
+		applyUpdate := func(p *api.Pod) {
+			if p.Labels == nil {
+				p.Labels = map[string]string{
+					deploymentKey: deploymentValue,
+				}
 			} else {
-				break
+				p.Labels[deploymentKey] = deploymentValue
 			}
 		}
-		if err != nil {
+		if pod, err = updatePodWithRetries(client, namespace, pod, applyUpdate); err != nil {
 			return nil, err
 		}
 	}
@@ -709,12 +703,11 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
 	for k, v := range oldRc.Spec.Selector {
 		selectorCopy[k] = v
 	}
-	oldRc.Spec.Selector[deploymentKey] = deploymentValue
-
-	// Update the selector of the rc so it manages all the pods we updated above
-	if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
+	applyUpdate = func(rc *api.ReplicationController) {
 		rc.Spec.Selector[deploymentKey] = deploymentValue
-	}); err != nil {
+	}
+	// Update the selector of the rc so it manages all the pods we updated above
+	if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
 		return nil, err
 	}

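
The pod loop replaced above hand-rolled its retry with delay *= delay, so the three attempts permitted by the deleted MaxRetries slept 3s, 9s and then 81s, and it retried on any error, not just conflicts. A tiny standalone reproduction of that schedule, for comparison with the much tighter DefaultBackoff now used via updatePodWithRetries:

package main

import "fmt"

// Reproduces the delay growth of the deleted retry loop: delay *= delay
// turns a 3s initial delay into 3s, 9s, 81s -- about 93s worst case for
// the three attempts MaxRetries allowed.
func main() {
	delay := 3
	for i := 0; i < 3; i++ {
		fmt.Printf("attempt %d: sleep %ds\n", i+1, delay)
		delay *= delay
	}
}
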
@@ -736,33 +729,72 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
 	return oldRc, nil
 }

-type updateFunc func(controller *api.ReplicationController)
+type updateRcFunc func(controller *api.ReplicationController)

-// updateWithRetries updates applies the given rc as an update.
-func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) {
-	var err error
-	oldRc := rc
-	err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
+// updateRcWithRetries retries updating the given rc on conflict with the following steps:
+// 1. Get latest resource
+// 2. applyUpdate
+// 3. Update the resource
+func updateRcWithRetries(c client.Interface, namespace string, rc *api.ReplicationController, applyUpdate updateRcFunc) (*api.ReplicationController, error) {
+	// Deep copy the rc in case we failed on Get during retry loop
+	obj, err := api.Scheme.Copy(rc)
+	if err != nil {
+		return nil, fmt.Errorf("failed to deep copy rc before updating it: %v", err)
+	}
+	oldRc := obj.(*api.ReplicationController)
+	err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
 		// Apply the update, then attempt to push it to the apiserver.
 		applyUpdate(rc)
-		if rc, err = rcClient.Update(rc); err == nil {
+		if rc, e = c.ReplicationControllers(namespace).Update(rc); e == nil {
 			// rc contains the latest controller post update
-			return true, nil
+			return
 		}
+		updateErr := e
 		// Update the controller with the latest resource version, if the update failed we
 		// can't trust rc so use oldRc.Name.
-		if rc, err = rcClient.Get(oldRc.Name); err != nil {
+		if rc, e = c.ReplicationControllers(namespace).Get(oldRc.Name); e != nil {
 			// The Get failed: Value in rc cannot be trusted.
 			rc = oldRc
 		}
-		// The Get passed: rc contains the latest controller, expect a poll for the update.
-		return false, nil
+		// Only return the error from update
+		return updateErr
 	})
 	// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
 	// controller contains the applied update.
 	return rc, err
 }

+type updatePodFunc func(pod *api.Pod)
+
+// updatePodWithRetries retries updating the given pod on conflict with the following steps:
+// 1. Get latest resource
+// 2. applyUpdate
+// 3. Update the resource
+func updatePodWithRetries(c client.Interface, namespace string, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
+	// Deep copy the pod in case we failed on Get during retry loop
+	obj, err := api.Scheme.Copy(pod)
+	if err != nil {
+		return nil, fmt.Errorf("failed to deep copy pod before updating it: %v", err)
+	}
+	oldPod := obj.(*api.Pod)
+	err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
+		// Apply the update, then attempt to push it to the apiserver.
+		applyUpdate(pod)
+		if pod, e = c.Pods(namespace).Update(pod); e == nil {
+			return
+		}
+		updateErr := e
+		if pod, e = c.Pods(namespace).Get(oldPod.Name); e != nil {
+			pod = oldPod
+		}
+		// Only return the error from update
+		return updateErr
+	})
+	// If the error is non-nil the returned pod cannot be trusted, if it is nil, the returned
+	// pod contains the applied update.
+	return pod, err
+}
+
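
The api.Scheme.Copy at the top of both helpers is load-bearing: a plain pointer assignment would not give a safe fallback, because applyUpdate mutates the very object the backup name points at. A small standalone demonstration of that aliasing pitfall:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
)

// Shows what the deep copy guards against: after a failed Update *and* a
// failed Get, the helper falls back to oldRc, which must not have been
// touched by applyUpdate in the meantime.
func main() {
	rc := &api.ReplicationController{}
	oldRc := rc // pointer copy only: both names refer to one struct
	rc.Annotations = map[string]string{"mutated": "true"}
	fmt.Println(oldRc.Annotations) // map[mutated:true] -- the "backup" changed too
}
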
 func FindSourceController(r client.ReplicationControllersNamespacer, namespace, name string) (*api.ReplicationController, error) {
 	list, err := r.ReplicationControllers(namespace).List(api.ListOptions{})
 	if err != nil {
diff --git a/pkg/kubectl/rolling_updater_test.go b/pkg/kubectl/rolling_updater_test.go
index d55dd7dbb4b..9abe440569b 100644
--- a/pkg/kubectl/rolling_updater_test.go
+++ b/pkg/kubectl/rolling_updater_test.go
@@ -1370,7 +1370,7 @@ func TestUpdateExistingReplicationController(t *testing.T) {
 	}
 }

-func TestUpdateWithRetries(t *testing.T) {
+func TestUpdateRcWithRetries(t *testing.T) {
 	codec := testapi.Default.Codec()
 	rc := &api.ReplicationController{
 		ObjectMeta: api.ObjectMeta{Name: "rc",
@@ -1403,8 +1403,8 @@
 	header := http.Header{}
 	header.Set("Content-Type", runtime.ContentTypeJSON)
 	updates := []*http.Response{
-		{StatusCode: 500, Header: header, Body: objBody(codec, &api.ReplicationController{})},
-		{StatusCode: 500, Header: header, Body: objBody(codec, &api.ReplicationController{})},
+		{StatusCode: 409, Header: header, Body: objBody(codec, &api.ReplicationController{})}, // conflict
+		{StatusCode: 409, Header: header, Body: objBody(codec, &api.ReplicationController{})}, // conflict
 		{StatusCode: 200, Header: header, Body: objBody(codec, &newRc)},
 	}
 	gets := []*http.Response{
@@ -1442,8 +1442,8 @@
 	client := client.NewOrDie(clientConfig)
 	client.Client = fakeClient.Client

-	if rc, err := updateWithRetries(
-		client.ReplicationControllers("default"), rc, func(c *api.ReplicationController) {
+	if rc, err := updateRcWithRetries(
+		client, "default", rc, func(c *api.ReplicationController) {
 			c.Spec.Selector["baz"] = "foobar"
 		}); err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -1451,7 +1451,7 @@
 		t.Errorf("Expected updated rc, got %+v", rc)
 	}
 	if len(updates) != 0 || len(gets) != 0 {
-		t.Errorf("Remaining updates %+v gets %+v", updates, gets)
+		t.Errorf("Remaining updates %#v gets %#v", updates, gets)
 	}
 }
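
The fixture change from 500 to 409 above is forced by the new code path: RetryOnConflict retries only errors matching errors.IsConflict, so a 500 now fails on the first attempt instead of being retried. A sketch of a test pinning that behavior, with an illustrative name and a deliberately tiny backoff:

package unversioned

import (
	"fmt"
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
)

// TestRetryOnConflictStopsOnOtherErrors sketches the behavior the 409
// fixtures above rely on: a non-conflict error aborts after one attempt.
func TestRetryOnConflictStopsOnOtherErrors(t *testing.T) {
	attempts := 0
	backoff := wait.Backoff{Steps: 5, Duration: time.Millisecond, Factor: 1.0}
	err := RetryOnConflict(backoff, func() error {
		attempts++
		return fmt.Errorf("boom") // not a 409, so no retry
	})
	if err == nil || attempts != 1 {
		t.Fatalf("want exactly one failed attempt, got %d (err=%v)", attempts, err)
	}
}
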