diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh
index 15ecbbd3d35..6617db87a3e 100755
--- a/hack/test-cmd.sh
+++ b/hack/test-cmd.sh
@@ -546,8 +546,6 @@ __EOF__
   kubectl delete service frontend{,-2,-3} "${kube_flags[@]}"
 
   ### Perform a rolling update with --image
-  # Pre-condition status.Replicas is 3, otherwise the rcmanager could update it and interfere with the rolling update
-  kube::test::get_object_assert 'rc frontend' "{{$rc_status_replicas_field}}" '3'
   # Command
   kubectl rolling-update frontend --image=kubernetes/pause --update-period=10ns --poll-interval=10ms "${kube_flags[@]}"
   # Post-condition: current image IS kubernetes/pause
diff --git a/pkg/kubectl/cmd/rollingupdate.go b/pkg/kubectl/cmd/rollingupdate.go
index c91f4d57b3b..3294d31ce8b 100644
--- a/pkg/kubectl/cmd/rollingupdate.go
+++ b/pkg/kubectl/cmd/rollingupdate.go
@@ -34,6 +34,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/resource"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 
     "github.com/spf13/cobra"
 )
@@ -287,6 +288,34 @@ func hashObject(obj runtime.Object, codec runtime.Codec) (string, error) {
 
 const MaxRetries = 3
 
+type updateFunc func(controller *api.ReplicationController)
+
+// updateWithRetries applies the given update to the rc, retrying the update on failure.
+func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) {
+    // Each update could take ~100ms, so give it 0.5 seconds.
+    var err error
+    oldRc := rc
+    err = wait.Poll(10*time.Millisecond, 500*time.Millisecond, func() (bool, error) {
+        // Apply the update, then attempt to push it to the apiserver.
+        applyUpdate(rc)
+        if rc, err = rcClient.Update(rc); err == nil {
+            // rc contains the latest controller post update
+            return true, nil
+        }
+        // Update the controller with the latest resource version; if the update failed we
+        // can't trust rc, so use oldRc.Name.
+        if rc, err = rcClient.Get(oldRc.Name); err != nil {
+            // The Get failed: the value in rc cannot be trusted.
+            rc = oldRc
+        }
+        // The Get passed: rc contains the latest controller; retry the update on the next poll.
+        return false, nil
+    })
+    // If the error is non-nil the returned controller cannot be trusted; if it is nil, the returned
+    // controller contains the applied update.
+    return rc, err
+}
+
 func addDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client *client.Client, deploymentKey, namespace string, out io.Writer) (*api.ReplicationController, error) {
     oldHash, err := hashObject(oldRc, client.Codec)
     if err != nil {
@@ -296,8 +325,9 @@ func addDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
     if oldRc.Spec.Template.Labels == nil {
         oldRc.Spec.Template.Labels = map[string]string{}
     }
-    oldRc.Spec.Template.Labels[deploymentKey] = oldHash
-    if oldRc, err = client.ReplicationControllers(namespace).Update(oldRc); err != nil {
+    if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
+        rc.Spec.Template.Labels[deploymentKey] = oldHash
+    }); err != nil {
         return nil, err
     }
 
@@ -341,10 +371,11 @@ func addDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
     for k, v := range oldRc.Spec.Selector {
         selectorCopy[k] = v
     }
-    oldRc.Spec.Selector[deploymentKey] = oldHash
 
     // Update the selector of the rc so it manages all the pods we updated above
-    if oldRc, err = client.ReplicationControllers(namespace).Update(oldRc); err != nil {
+    if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
+        rc.Spec.Selector[deploymentKey] = oldHash
+    }); err != nil {
         return nil, err
     }
 
diff --git a/pkg/kubectl/cmd/rollingupdate_test.go b/pkg/kubectl/cmd/rollingupdate_test.go
index f8a622f2ab4..06288a413bc 100644
--- a/pkg/kubectl/cmd/rollingupdate_test.go
+++ b/pkg/kubectl/cmd/rollingupdate_test.go
@@ -20,6 +20,7 @@ import (
     "bytes"
     "io/ioutil"
     "net/http"
+    "reflect"
     "testing"
 
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -183,6 +184,96 @@ func TestAddDeploymentHash(t *testing.T) {
     }
 }
 
+func TestUpdateWithRetries(t *testing.T) {
+    f, tf, codec := NewAPIFactory()
+    rc := &api.ReplicationController{
+        ObjectMeta: api.ObjectMeta{Name: "rc",
+            Labels: map[string]string{
+                "foo": "bar",
+            },
+        },
+        Spec: api.ReplicationControllerSpec{
+            Selector: map[string]string{
+                "foo": "bar",
+            },
+            Template: &api.PodTemplateSpec{
+                ObjectMeta: api.ObjectMeta{
+                    Labels: map[string]string{
+                        "foo": "bar",
+                    },
+                },
+                Spec: api.PodSpec{
+                    RestartPolicy: api.RestartPolicyAlways,
+                    DNSPolicy:     api.DNSClusterFirst,
+                },
+            },
+        },
+    }
+
+    // Test end to end updating of the rc with retries. Essentially make sure the update handler
+    // sees the right updates, failures in update/get are handled properly, and that the updated
+    // rc with new resource version is returned to the caller. Without any of these rollingupdate
+    // will fail cryptically.
+    newRc := *rc
+    newRc.ResourceVersion = "2"
+    newRc.Spec.Selector["baz"] = "foobar"
+    updates := []*http.Response{
+        {StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
+        {StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
+        {StatusCode: 200, Body: objBody(codec, &newRc)},
+    }
+    gets := []*http.Response{
+        {StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
+        {StatusCode: 200, Body: objBody(codec, rc)},
+    }
+    tf.Client = &client.FakeRESTClient{
+        Codec: codec,
+        Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
+            switch p, m := req.URL.Path, req.Method; {
+            case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT":
+                update := updates[0]
+                updates = updates[1:]
+                // We should always get an update with a valid rc even when the get fails. The rc should always
+                // contain the update.
+                if c, ok := readOrDie(t, req, codec).(*api.ReplicationController); !ok || !reflect.DeepEqual(rc, c) {
+                    t.Errorf("Unexpected update body, got %+v expected %+v", c, rc)
+                } else if sel, ok := c.Spec.Selector["baz"]; !ok || sel != "foobar" {
+                    t.Errorf("Expected selector label update, got %+v", c.Spec.Selector)
+                } else {
+                    delete(c.Spec.Selector, "baz")
+                }
+                return update, nil
+            case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "GET":
+                get := gets[0]
+                gets = gets[1:]
+                return get, nil
+            default:
+                t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
+                return nil, nil
+            }
+        }),
+    }
+    tf.ClientConfig = &client.Config{Version: latest.Version}
+    client, err := f.Client()
+    if err != nil {
+        t.Errorf("unexpected error: %v", err)
+        t.Fail()
+        return
+    }
+
+    if rc, err := updateWithRetries(
+        client.ReplicationControllers("default"), rc, func(c *api.ReplicationController) {
+            c.Spec.Selector["baz"] = "foobar"
+        }); err != nil {
+        t.Errorf("unexpected error: %v", err)
+    } else if sel, ok := rc.Spec.Selector["baz"]; !ok || sel != "foobar" || rc.ResourceVersion != "2" {
+        t.Errorf("Expected updated rc, got %+v", rc)
+    }
+    if len(updates) != 0 || len(gets) != 0 {
+        t.Errorf("Remaining updates %+v gets %+v", updates, gets)
+    }
+}
+
 func readOrDie(t *testing.T, req *http.Request, codec runtime.Codec) runtime.Object {
     data, err := ioutil.ReadAll(req.Body)
     if err != nil {