Merge pull request #27509 from janetkuo/retry-update-e2e-rolling-update

Automatic merge from submit-queue

Retry Pod/RC updates in kubectl rolling-update

Fixes #27328

@kubernetes/kubectl 
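
At its core, the fix replaces one-shot `Update` calls with a get-reapply-update loop driven by `RetryOnConflict`, so a 409 Conflict refreshes the object and reapplies the change instead of aborting the rollout. A condensed sketch of that pattern; `setAnnotationWithRetry` and its key/value arguments are illustrative, not part of the diff:

```go
package kubectl // illustrative placement; the PR's helpers live in pkg/kubectl

import (
	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
)

// setAnnotationWithRetry is a hypothetical helper showing the shape of the
// fix: on a 409 Conflict it refreshes the RC and re-applies the mutation,
// instead of failing or resubmitting a stale object.
func setAnnotationWithRetry(c client.Interface, ns string, rc *api.ReplicationController, key, value string) (*api.ReplicationController, error) {
	name := rc.Name
	err := client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
		// Re-apply the mutation to whatever copy of the RC we currently hold.
		if rc.Annotations == nil {
			rc.Annotations = map[string]string{}
		}
		rc.Annotations[key] = value
		prev := rc
		if rc, e = c.ReplicationControllers(ns).Update(rc); e == nil {
			return nil // success: rc is the controller as the server stored it
		}
		updateErr := e
		// Refresh to the latest resourceVersion before the next attempt; fall
		// back to the copy we already had if the Get itself fails.
		if rc, e = c.ReplicationControllers(ns).Get(name); e != nil {
			rc = prev
		}
		return updateErr // a Conflict error here makes RetryOnConflict loop
	})
	return rc, err
}
```

The diff below extracts this loop into `updateRcWithRetries` and `updatePodWithRetries`, and moves `RetryOnConflict` (with its `DefaultRetry` and `DefaultBackoff` policies) into the unversioned client package so both helpers can share it.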

Commit 24c3be145c by k8s-merge-robot on 2016-06-21 18:52:43 -07:00, committed by GitHub.
5 changed files with 168 additions and 113 deletions.

@@ -18,7 +18,6 @@ package unversioned
 import (
 	"fmt"
-	"time"
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
@@ -29,61 +28,6 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 )
 
-// DefaultRetry is the recommended retry for a conflict where multiple clients
-// are making changes to the same resource.
-var DefaultRetry = wait.Backoff{
-	Steps:    5,
-	Duration: 10 * time.Millisecond,
-	Factor:   1.0,
-	Jitter:   0.1,
-}
-
-// DefaultBackoff is the recommended backoff for a conflict where a client
-// may be attempting to make an unrelated modification to a resource under
-// active management by one or more controllers.
-var DefaultBackoff = wait.Backoff{
-	Steps:    4,
-	Duration: 10 * time.Millisecond,
-	Factor:   5.0,
-	Jitter:   0.1,
-}
-
-// RetryConflict executes the provided function repeatedly, retrying if the server returns a conflicting
-// write. Callers should preserve previous executions if they wish to retry changes. It performs an
-// exponential backoff.
-//
-//	var pod *api.Pod
-//	err := RetryOnConflict(DefaultBackoff, func() (err error) {
-//		pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
-//		return
-//	})
-//	if err != nil {
-//		// may be conflict if max retries were hit
-//		return err
-//	}
-//	...
-//
-// TODO: Make Backoff an interface?
-func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
-	var lastConflictErr error
-	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
-		err := fn()
-		switch {
-		case err == nil:
-			return true, nil
-		case errors.IsConflict(err):
-			lastConflictErr = err
-			return false, nil
-		default:
-			return false, err
-		}
-	})
-	if err == wait.ErrWaitTimeout {
-		err = lastConflictErr
-	}
-	return err
-}
-
 // ControllerHasDesiredReplicas returns a condition that will be true if and only if
 // the desired replica count for a controller's ReplicaSelector equals the Replicas count.
 func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc {

@@ -0,0 +1,79 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package unversioned
+
+import (
+	"time"
+
+	"k8s.io/kubernetes/pkg/api/errors"
+	"k8s.io/kubernetes/pkg/util/wait"
+)
+
+// DefaultRetry is the recommended retry for a conflict where multiple clients
+// are making changes to the same resource.
+var DefaultRetry = wait.Backoff{
+	Steps:    5,
+	Duration: 10 * time.Millisecond,
+	Factor:   1.0,
+	Jitter:   0.1,
+}
+
+// DefaultBackoff is the recommended backoff for a conflict where a client
+// may be attempting to make an unrelated modification to a resource under
+// active management by one or more controllers.
+var DefaultBackoff = wait.Backoff{
+	Steps:    4,
+	Duration: 10 * time.Millisecond,
+	Factor:   5.0,
+	Jitter:   0.1,
+}
+
+// RetryOnConflict executes the provided function repeatedly, retrying if the server returns a conflicting
+// write. Callers should preserve previous executions if they wish to retry changes. It performs an
+// exponential backoff.
+//
+//	var pod *api.Pod
+//	err := RetryOnConflict(DefaultBackoff, func() (err error) {
+//		pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
+//		return
+//	})
+//	if err != nil {
+//		// may be conflict if max retries were hit
+//		return err
+//	}
+//	...
+//
+// TODO: Make Backoff an interface?
+func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
+	var lastConflictErr error
+	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		err := fn()
+		switch {
+		case err == nil:
+			return true, nil
+		case errors.IsConflict(err):
+			lastConflictErr = err
+			return false, nil
+		default:
+			return false, err
+		}
+	})
+	if err == wait.ErrWaitTimeout {
+		err = lastConflictErr
+	}
+	return err
+}
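
For a sense of timing, and assuming wait.ExponentialBackoff multiplies the delay by Factor between attempts (jitter aside), DefaultBackoff above allows four attempts with nominal pauses of 10ms, 50ms, and 250ms between them. A toy sketch:

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative only: prints the nominal inter-attempt delays implied by
// DefaultBackoff (Steps: 4, Duration: 10ms, Factor: 5.0). Real delays are
// jittered by up to 10%, and the loop exits early on success or on any
// non-conflict error.
func main() {
	d := 10 * time.Millisecond
	for retry := 1; retry < 4; retry++ { // Steps = 4 allows at most 3 retries
		fmt.Printf("pause before attempt %d: %v\n", retry+1, d)
		d = time.Duration(float64(d) * 5.0)
	}
}
```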

@@ -1,5 +1,5 @@
 /*
-Copyright 2014 The Kubernetes Authors All rights reserved.
+Copyright 2016 The Kubernetes Authors All rights reserved.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.

@@ -187,15 +187,16 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
 		if err != nil {
 			return err
 		}
-		if existing.Annotations == nil {
-			existing.Annotations = map[string]string{}
-		}
-		existing.Annotations[originalReplicasAnnotation] = strconv.Itoa(int(existing.Spec.Replicas))
-		updated, err := r.c.ReplicationControllers(existing.Namespace).Update(existing)
-		if err != nil {
+		originReplicas := strconv.Itoa(int(existing.Spec.Replicas))
+		applyUpdate := func(rc *api.ReplicationController) {
+			if rc.Annotations == nil {
+				rc.Annotations = map[string]string{}
+			}
+			rc.Annotations[originalReplicasAnnotation] = originReplicas
+		}
+		if oldRc, err = updateRcWithRetries(r.c, existing.Namespace, existing, applyUpdate); err != nil {
 			return err
 		}
-		oldRc = updated
 	}
 	// maxSurge is the maximum scaling increment and maxUnavailable are the maximum pods
 	// that can be unavailable during a rollout.
@@ -482,13 +483,14 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationControl
 	if err != nil {
 		return err
 	}
-	delete(newRc.Annotations, sourceIdAnnotation)
-	delete(newRc.Annotations, desiredReplicasAnnotation)
-
-	newRc, err = r.c.ReplicationControllers(r.ns).Update(newRc)
-	if err != nil {
+	applyUpdate := func(rc *api.ReplicationController) {
+		delete(rc.Annotations, sourceIdAnnotation)
+		delete(rc.Annotations, desiredReplicasAnnotation)
+	}
+	if newRc, err = updateRcWithRetries(r.c, r.ns, newRc, applyUpdate); err != nil {
 		return err
 	}
 	if err = wait.Poll(config.Interval, config.Timeout, client.ControllerHasDesiredReplicas(r.c, newRc)); err != nil {
 		return err
 	}
@@ -643,27 +645,29 @@ func SetNextControllerAnnotation(rc *api.ReplicationController, name string) {
 }
 
 func UpdateExistingReplicationController(c client.Interface, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) {
-	SetNextControllerAnnotation(oldRc, newName)
 	if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
+		SetNextControllerAnnotation(oldRc, newName)
 		return AddDeploymentKeyToReplicationController(oldRc, c, deploymentKey, deploymentValue, namespace, out)
 	} else {
 		// If we didn't need to update the controller for the deployment key, we still need to write
 		// the "next" controller.
-		return c.ReplicationControllers(namespace).Update(oldRc)
+		applyUpdate := func(rc *api.ReplicationController) {
+			SetNextControllerAnnotation(rc, newName)
+		}
+		return updateRcWithRetries(c, namespace, oldRc, applyUpdate)
 	}
 }
 
-const MaxRetries = 3
-
 func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
 	var err error
 	// First, update the template label. This ensures that any newly created pods will have the new label
-	if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
+	applyUpdate := func(rc *api.ReplicationController) {
 		if rc.Spec.Template.Labels == nil {
 			rc.Spec.Template.Labels = map[string]string{}
 		}
 		rc.Spec.Template.Labels[deploymentKey] = deploymentValue
-	}); err != nil {
+	}
+	if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
 		return nil, err
 	}
@@ -677,26 +681,16 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
 	}
 	for ix := range podList.Items {
 		pod := &podList.Items[ix]
-		if pod.Labels == nil {
-			pod.Labels = map[string]string{
-				deploymentKey: deploymentValue,
-			}
-		} else {
-			pod.Labels[deploymentKey] = deploymentValue
-		}
-		err = nil
-		delay := 3
-		for i := 0; i < MaxRetries; i++ {
-			_, err = client.Pods(namespace).Update(pod)
-			if err != nil {
-				fmt.Fprintf(out, "Error updating pod (%v), retrying after %d seconds", err, delay)
-				time.Sleep(time.Second * time.Duration(delay))
-				delay *= delay
-			} else {
-				break
-			}
-		}
-		if err != nil {
+		applyUpdate := func(p *api.Pod) {
+			if p.Labels == nil {
+				p.Labels = map[string]string{
+					deploymentKey: deploymentValue,
+				}
+			} else {
+				p.Labels[deploymentKey] = deploymentValue
+			}
+		}
+		if pod, err = updatePodWithRetries(client, namespace, pod, applyUpdate); err != nil {
 			return nil, err
 		}
 	}
@@ -709,12 +703,11 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
 	for k, v := range oldRc.Spec.Selector {
 		selectorCopy[k] = v
 	}
-	oldRc.Spec.Selector[deploymentKey] = deploymentValue
-
-	// Update the selector of the rc so it manages all the pods we updated above
-	if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
+	applyUpdate = func(rc *api.ReplicationController) {
 		rc.Spec.Selector[deploymentKey] = deploymentValue
-	}); err != nil {
+	}
+	// Update the selector of the rc so it manages all the pods we updated above
+	if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
 		return nil, err
 	}
@@ -736,33 +729,72 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
 	return oldRc, nil
 }
 
-type updateFunc func(controller *api.ReplicationController)
+type updateRcFunc func(controller *api.ReplicationController)
 
-// updateWithRetries updates applies the given rc as an update.
-func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) {
-	var err error
-	oldRc := rc
-	err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
+// updateRcWithRetries retries updating the given rc on conflict with the following steps:
+// 1. Get latest resource
+// 2. applyUpdate
+// 3. Update the resource
+func updateRcWithRetries(c client.Interface, namespace string, rc *api.ReplicationController, applyUpdate updateRcFunc) (*api.ReplicationController, error) {
+	// Deep copy the rc in case we failed on Get during retry loop
+	obj, err := api.Scheme.Copy(rc)
+	if err != nil {
+		return nil, fmt.Errorf("failed to deep copy rc before updating it: %v", err)
+	}
+	oldRc := obj.(*api.ReplicationController)
+	err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
 		// Apply the update, then attempt to push it to the apiserver.
 		applyUpdate(rc)
-		if rc, err = rcClient.Update(rc); err == nil {
+		if rc, e = c.ReplicationControllers(namespace).Update(rc); e == nil {
 			// rc contains the latest controller post update
-			return true, nil
+			return
 		}
+		updateErr := e
 		// Update the controller with the latest resource version, if the update failed we
 		// can't trust rc so use oldRc.Name.
-		if rc, err = rcClient.Get(oldRc.Name); err != nil {
+		if rc, e = c.ReplicationControllers(namespace).Get(oldRc.Name); e != nil {
 			// The Get failed: Value in rc cannot be trusted.
 			rc = oldRc
 		}
-		// The Get passed: rc contains the latest controller, expect a poll for the update.
-		return false, nil
+		// Only return the error from update
+		return updateErr
 	})
 	// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
 	// controller contains the applied update.
 	return rc, err
 }
+
+type updatePodFunc func(controller *api.Pod)
+
+// updatePodWithRetries retries updating the given pod on conflict with the following steps:
+// 1. Get latest resource
+// 2. applyUpdate
+// 3. Update the resource
+func updatePodWithRetries(c client.Interface, namespace string, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
+	// Deep copy the pod in case we failed on Get during retry loop
+	obj, err := api.Scheme.Copy(pod)
+	if err != nil {
+		return nil, fmt.Errorf("failed to deep copy pod before updating it: %v", err)
+	}
+	oldPod := obj.(*api.Pod)
+	err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
+		// Apply the update, then attempt to push it to the apiserver.
+		applyUpdate(pod)
+		if pod, e = c.Pods(namespace).Update(pod); e == nil {
+			return
+		}
+		updateErr := e
+		if pod, e = c.Pods(namespace).Get(oldPod.Name); e != nil {
+			pod = oldPod
+		}
+		// Only return the error from update
+		return updateErr
+	})
+	// If the error is non-nil the returned pod cannot be trusted, if it is nil, the returned
+	// pod contains the applied update.
+	return pod, err
+}
 
 func FindSourceController(r client.ReplicationControllersNamespacer, namespace, name string) (*api.ReplicationController, error) {
 	list, err := r.ReplicationControllers(namespace).List(api.ListOptions{})
 	if err != nil {
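
A design note on the recurring shape above: each call site packages its mutation as an applyUpdate closure, so the helper can re-run it against the freshly fetched object after every conflict rather than resubmitting a stale copy. A hypothetical caller, assuming it sits in pkg/kubectl next to updateRcWithRetries:

```go
package kubectl

import (
	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
)

// relabelRC is a hypothetical example, not part of this PR: because the
// mutation is a closure, a retry after a 409 re-applies the label to the
// re-fetched RC, so the change survives concurrent writers.
func relabelRC(c client.Interface, ns string, rc *api.ReplicationController) (*api.ReplicationController, error) {
	applyUpdate := func(rc *api.ReplicationController) {
		if rc.Labels == nil {
			rc.Labels = map[string]string{}
		}
		rc.Labels["managed-by"] = "kubectl" // illustrative label
	}
	return updateRcWithRetries(c, ns, rc, applyUpdate)
}
```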

@@ -1370,7 +1370,7 @@ func TestUpdateExistingReplicationController(t *testing.T) {
 	}
 }
 
-func TestUpdateWithRetries(t *testing.T) {
+func TestUpdateRcWithRetries(t *testing.T) {
 	codec := testapi.Default.Codec()
 	rc := &api.ReplicationController{
 		ObjectMeta: api.ObjectMeta{Name: "rc",
@@ -1403,8 +1403,8 @@ func TestUpdateWithRetries(t *testing.T) {
 	header := http.Header{}
 	header.Set("Content-Type", runtime.ContentTypeJSON)
 	updates := []*http.Response{
-		{StatusCode: 500, Header: header, Body: objBody(codec, &api.ReplicationController{})},
-		{StatusCode: 500, Header: header, Body: objBody(codec, &api.ReplicationController{})},
+		{StatusCode: 409, Header: header, Body: objBody(codec, &api.ReplicationController{})}, // conflict
+		{StatusCode: 409, Header: header, Body: objBody(codec, &api.ReplicationController{})}, // conflict
 		{StatusCode: 200, Header: header, Body: objBody(codec, &newRc)},
 	}
 	gets := []*http.Response{
@@ -1442,8 +1442,8 @@ func TestUpdateWithRetries(t *testing.T) {
 	client := client.NewOrDie(clientConfig)
 	client.Client = fakeClient.Client
 
-	if rc, err := updateWithRetries(
-		client.ReplicationControllers("default"), rc, func(c *api.ReplicationController) {
+	if rc, err := updateRcWithRetries(
+		client, "default", rc, func(c *api.ReplicationController) {
 			c.Spec.Selector["baz"] = "foobar"
 		}); err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -1451,7 +1451,7 @@ func TestUpdateWithRetries(t *testing.T) {
 		t.Errorf("Expected updated rc, got %+v", rc)
 	}
 	if len(updates) != 0 || len(gets) != 0 {
-		t.Errorf("Remaining updates %+v gets %+v", updates, gets)
+		t.Errorf("Remaining updates %#v gets %#v", updates, gets)
 	}
 }