diff --git a/pkg/api/helpers.go b/pkg/api/helpers.go index c09c8d570bb..0c589518eb5 100644 --- a/pkg/api/helpers.go +++ b/pkg/api/helpers.go @@ -17,12 +17,15 @@ limitations under the License. package api import ( + "crypto/md5" + "fmt" "reflect" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/davecgh/go-spew/spew" @@ -130,3 +133,11 @@ func AddToNodeAddresses(addresses *[]NodeAddress, addAddresses ...NodeAddress) { } } } + +func HashObject(obj runtime.Object, codec runtime.Codec) (string, error) { + data, err := codec.Encode(obj) + if err != nil { + return "", err + } + return fmt.Sprintf("%x", md5.Sum(data)), nil +} diff --git a/pkg/kubectl/cmd/rollingupdate.go b/pkg/kubectl/cmd/rollingupdate.go index 3e161f29fed..f433e14ea2d 100644 --- a/pkg/kubectl/cmd/rollingupdate.go +++ b/pkg/kubectl/cmd/rollingupdate.go @@ -18,23 +18,15 @@ package cmd import ( "bytes" - "crypto/md5" - "errors" "fmt" "io" "os" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl" cmdutil "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/cmd/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/resource" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/spf13/cobra" ) @@ -131,16 +123,26 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg return err } + updaterClient := kubectl.NewRollingUpdaterClient(client) + + var newRc *api.ReplicationController // fetch rc oldRc, err := client.ReplicationControllers(cmdNamespace).Get(oldName) if err != nil { - return err + if !errors.IsNotFound(err) || len(image) == 0 || len(args) > 1 { + return err + } + // We're in the middle of a rename, look for an RC with a source annotation of oldName + newRc, err := kubectl.FindSourceController(updaterClient, cmdNamespace, oldName) + if err != nil { + return err + } + return kubectl.Rename(kubectl.NewRollingUpdaterClient(client), newRc, oldName) } - keepOldName := false + var keepOldName bool mapper, typer := f.Object() - var newRc *api.ReplicationController if len(filename) != 0 { obj, err := resource.NewBuilder(mapper, typer, f.ClientMapperForCommand()). @@ -161,73 +163,36 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg // than the old rc. This selector is the hash of the rc, which will differ because the new rc has a // different image. if len(image) != 0 { - var newName string - var err error - - if len(args) >= 2 { - newName = args[1] + keepOldName = len(args) == 1 + newName := findNewName(args, oldRc) + if newRc, err = kubectl.LoadExistingNextReplicationController(client, cmdNamespace, newName); err != nil { + return err + } + if newRc != nil { + fmt.Fprintf(out, "Found existing update in progress (%s), resuming.\n", newRc.Name) } else { - newName, _ = kubectl.GetNextControllerAnnotation(oldRc) - } - - if len(newName) > 0 { - newRc, err = client.ReplicationControllers(cmdNamespace).Get(newName) - if err != nil { - if !apierrors.IsNotFound(err) { - return err - } else { - newRc = nil - } - } else { - fmt.Fprint(out, "Found existing update in progress (%s), resuming.\n", newName) - } - } - if newRc == nil { - // load the old RC into the "new" RC - if newRc, err = client.ReplicationControllers(cmdNamespace).Get(oldName); err != nil { - return err - } - - if len(newRc.Spec.Template.Spec.Containers) > 1 { - // TODO: support multi-container image update. - return errors.New("Image update is not supported for multi-container pods") - } - if len(newRc.Spec.Template.Spec.Containers) == 0 { - return cmdutil.UsageError(cmd, "Pod has no containers! (%v)", newRc) - } - newRc.Spec.Template.Spec.Containers[0].Image = image - - newHash, err := hashObject(newRc, client.Codec) + newRc, err = kubectl.CreateNewControllerFromCurrentController(client, cmdNamespace, oldName, newName, image, deploymentKey) if err != nil { return err } - - if len(newName) == 0 { - keepOldName = true - newName = fmt.Sprintf("%s-%s", newRc.Name, newHash) - } - newRc.Name = newName - - newRc.Spec.Selector[deploymentKey] = newHash - newRc.Spec.Template.Labels[deploymentKey] = newHash - // Clear resource version after hashing so that identical updates get different hashes. - newRc.ResourceVersion = "" - - kubectl.SetNextControllerAnnotation(oldRc, newName) - if _, found := oldRc.Spec.Selector[deploymentKey]; !found { - if oldRc, err = addDeploymentKeyToReplicationController(oldRc, client, deploymentKey, cmdNamespace, out); err != nil { - return err - } - } + } + // Update the existing replication controller with pointers to the 'next' controller + // and adding the label if necessary to distinguish it from the 'next' controller. + oldHash, err := api.HashObject(oldRc, client.Codec) + if err != nil { + return err + } + oldRc, err = kubectl.UpdateExistingReplicationController(client, oldRc, cmdNamespace, newRc.Name, deploymentKey, oldHash, out) + if err != nil { + return err } } - newName := newRc.Name - if oldName == newName { + if oldName == newRc.Name { return cmdutil.UsageError(cmd, "%s cannot have the same name as the existing ReplicationController %s", filename, oldName) } - updater := kubectl.NewRollingUpdater(newRc.Namespace, kubectl.NewRollingUpdaterClient(client)) + updater := kubectl.NewRollingUpdater(newRc.Namespace, updaterClient) // To successfully pull off a rolling update the new and old rc have to differ // by at least one selector. Every new pod should have the selector and every @@ -284,124 +249,18 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg if keepOldName { fmt.Fprintf(out, "%s\n", oldName) } else { - fmt.Fprintf(out, "%s\n", newName) + fmt.Fprintf(out, "%s\n", newRc.Name) } return nil } -func hashObject(obj runtime.Object, codec runtime.Codec) (string, error) { - data, err := codec.Encode(obj) - if err != nil { - return "", err +func findNewName(args []string, oldRc *api.ReplicationController) string { + if len(args) >= 2 { + return args[1] } - return fmt.Sprintf("%x", md5.Sum(data)), nil -} - -const MaxRetries = 3 - -type updateFunc func(controller *api.ReplicationController) - -// updateWithRetries updates applies the given rc as an update. -func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) { - // Each update could take ~100ms, so give it 0.5 second - var err error - oldRc := rc - err = wait.Poll(10*time.Millisecond, 500*time.Millisecond, func() (bool, error) { - // Apply the update, then attempt to push it to the apiserver. - applyUpdate(rc) - if rc, err = rcClient.Update(rc); err == nil { - // rc contains the latest controller post update - return true, nil - } - // Update the controller with the latest resource version, if the update failed we - // can't trust rc so use oldRc.Name. - if rc, err = rcClient.Get(oldRc.Name); err != nil { - // The Get failed: Value in rc cannot be trusted. - rc = oldRc - } - // The Get passed: rc contains the latest controller, expect a poll for the update. - return false, nil - }) - // If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned - // controller contains the applied update. - return rc, err -} - -func addDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client *client.Client, deploymentKey, namespace string, out io.Writer) (*api.ReplicationController, error) { - oldHash, err := hashObject(oldRc, client.Codec) - if err != nil { - return nil, err - } - // First, update the template label. This ensures that any newly created pods will have the new label - if oldRc.Spec.Template.Labels == nil { - oldRc.Spec.Template.Labels = map[string]string{} - } - if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) { - rc.Spec.Template.Labels[deploymentKey] = oldHash - }); err != nil { - return nil, err - } - - // Update all pods managed by the rc to have the new hash label, so they are correctly adopted - // TODO: extract the code from the label command and re-use it here. - podList, err := client.Pods(namespace).List(labels.SelectorFromSet(oldRc.Spec.Selector), fields.Everything()) - if err != nil { - return nil, err - } - for ix := range podList.Items { - pod := &podList.Items[ix] - if pod.Labels == nil { - pod.Labels = map[string]string{ - deploymentKey: oldHash, - } - } else { - pod.Labels[deploymentKey] = oldHash - } - err = nil - delay := 3 - for i := 0; i < MaxRetries; i++ { - _, err = client.Pods(namespace).Update(pod) - if err != nil { - fmt.Fprint(out, "Error updating pod (%v), retrying after %d seconds", err, delay) - time.Sleep(time.Second * time.Duration(delay)) - delay *= delay - } else { - break - } - } - if err != nil { - return nil, err - } - } - - if oldRc.Spec.Selector == nil { - oldRc.Spec.Selector = map[string]string{} - } - // Copy the old selector, so that we can scrub out any orphaned pods - selectorCopy := map[string]string{} - for k, v := range oldRc.Spec.Selector { - selectorCopy[k] = v - } - - // Update the selector of the rc so it manages all the pods we updated above - if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) { - rc.Spec.Selector[deploymentKey] = oldHash - }); err != nil { - return nil, err - } - - // Clean up any orphaned pods that don't have the new label, this can happen if the rc manager - // doesn't see the update to its pod template and creates a new pod with the old labels after - // we've finished re-adopting existing pods to the rc. - podList, err = client.Pods(namespace).List(labels.SelectorFromSet(selectorCopy), fields.Everything()) - for ix := range podList.Items { - pod := &podList.Items[ix] - if value, found := pod.Labels[deploymentKey]; !found || value != oldHash { - if err := client.Pods(namespace).Delete(pod.Name, nil); err != nil { - return nil, err - } - } - } - - return oldRc, nil + if oldRc != nil { + newName, _ := kubectl.GetNextControllerAnnotation(oldRc) + return newName + } + return "" } diff --git a/pkg/kubectl/cmd/rollingupdate_test.go b/pkg/kubectl/cmd/rollingupdate_test.go index 06288a413bc..5ee3286bde0 100644 --- a/pkg/kubectl/cmd/rollingupdate_test.go +++ b/pkg/kubectl/cmd/rollingupdate_test.go @@ -18,16 +18,7 @@ package cmd import ( "bytes" - "io/ioutil" - "net/http" - "reflect" "testing" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) func TestValidateArgs(t *testing.T) { @@ -98,192 +89,3 @@ func TestValidateArgs(t *testing.T) { } } } - -func TestAddDeploymentHash(t *testing.T) { - buf := &bytes.Buffer{} - f, tf, codec := NewAPIFactory() - rc := &api.ReplicationController{ - ObjectMeta: api.ObjectMeta{Name: "rc"}, - Spec: api.ReplicationControllerSpec{ - Selector: map[string]string{ - "foo": "bar", - }, - Template: &api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{ - "foo": "bar", - }, - }, - }, - }, - } - - podList := &api.PodList{ - Items: []api.Pod{ - {ObjectMeta: api.ObjectMeta{Name: "foo"}}, - {ObjectMeta: api.ObjectMeta{Name: "bar"}}, - {ObjectMeta: api.ObjectMeta{Name: "baz"}}, - }, - } - - seen := util.StringSet{} - updatedRc := false - tf.Client = &client.FakeRESTClient{ - Codec: codec, - Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) { - switch p, m := req.URL.Path, req.Method; { - case p == "/api/v1beta3/namespaces/default/pods" && m == "GET": - if req.URL.RawQuery != "labelSelector=foo%3Dbar" { - t.Errorf("Unexpected query string: %s", req.URL.RawQuery) - } - return &http.Response{StatusCode: 200, Body: objBody(codec, podList)}, nil - case p == "/api/v1beta3/namespaces/default/pods/foo" && m == "PUT": - seen.Insert("foo") - obj := readOrDie(t, req, codec) - podList.Items[0] = *(obj.(*api.Pod)) - return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[0])}, nil - case p == "/api/v1beta3/namespaces/default/pods/bar" && m == "PUT": - seen.Insert("bar") - obj := readOrDie(t, req, codec) - podList.Items[1] = *(obj.(*api.Pod)) - return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[1])}, nil - case p == "/api/v1beta3/namespaces/default/pods/baz" && m == "PUT": - seen.Insert("baz") - obj := readOrDie(t, req, codec) - podList.Items[2] = *(obj.(*api.Pod)) - return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[2])}, nil - case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT": - updatedRc = true - return &http.Response{StatusCode: 200, Body: objBody(codec, rc)}, nil - default: - t.Fatalf("unexpected request: %#v\n%#v", req.URL, req) - return nil, nil - } - }), - } - tf.ClientConfig = &client.Config{Version: latest.Version} - tf.Namespace = "test" - - client, err := f.Client() - if err != nil { - t.Errorf("unexpected error: %v", err) - t.Fail() - return - } - - if _, err := addDeploymentKeyToReplicationController(rc, client, "hash", api.NamespaceDefault, buf); err != nil { - t.Errorf("unexpected error: %v", err) - } - for _, pod := range podList.Items { - if !seen.Has(pod.Name) { - t.Errorf("Missing update for pod: %s", pod.Name) - } - } - if !updatedRc { - t.Errorf("Failed to update replication controller with new labels") - } -} - -func TestUpdateWithRetries(t *testing.T) { - f, tf, codec := NewAPIFactory() - rc := &api.ReplicationController{ - ObjectMeta: api.ObjectMeta{Name: "rc", - Labels: map[string]string{ - "foo": "bar", - }, - }, - Spec: api.ReplicationControllerSpec{ - Selector: map[string]string{ - "foo": "bar", - }, - Template: &api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{ - "foo": "bar", - }, - }, - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicyAlways, - DNSPolicy: api.DNSClusterFirst, - }, - }, - }, - } - - // Test end to end updating of the rc with retries. Essentially make sure the update handler - // sees the right updates, failures in update/get are handled properly, and that the updated - // rc with new resource version is returned to the caller. Without any of these rollingupdate - // will fail cryptically. - newRc := *rc - newRc.ResourceVersion = "2" - newRc.Spec.Selector["baz"] = "foobar" - updates := []*http.Response{ - {StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})}, - {StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})}, - {StatusCode: 200, Body: objBody(codec, &newRc)}, - } - gets := []*http.Response{ - {StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})}, - {StatusCode: 200, Body: objBody(codec, rc)}, - } - tf.Client = &client.FakeRESTClient{ - Codec: codec, - Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) { - switch p, m := req.URL.Path, req.Method; { - case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT": - update := updates[0] - updates = updates[1:] - // We should always get an update with a valid rc even when the get fails. The rc should always - // contain the update. - if c, ok := readOrDie(t, req, codec).(*api.ReplicationController); !ok || !reflect.DeepEqual(rc, c) { - t.Errorf("Unexpected update body, got %+v expected %+v", c, rc) - } else if sel, ok := c.Spec.Selector["baz"]; !ok || sel != "foobar" { - t.Errorf("Expected selector label update, got %+v", c.Spec.Selector) - } else { - delete(c.Spec.Selector, "baz") - } - return update, nil - case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "GET": - get := gets[0] - gets = gets[1:] - return get, nil - default: - t.Fatalf("unexpected request: %#v\n%#v", req.URL, req) - return nil, nil - } - }), - } - tf.ClientConfig = &client.Config{Version: latest.Version} - client, err := f.Client() - if err != nil { - t.Errorf("unexpected error: %v", err) - t.Fail() - return - } - - if rc, err := updateWithRetries( - client.ReplicationControllers("default"), rc, func(c *api.ReplicationController) { - c.Spec.Selector["baz"] = "foobar" - }); err != nil { - t.Errorf("unexpected error: %v", err) - } else if sel, ok := rc.Spec.Selector["baz"]; !ok || sel != "foobar" || rc.ResourceVersion != "2" { - t.Errorf("Expected updated rc, got %+v", rc) - } - if len(updates) != 0 || len(gets) != 0 { - t.Errorf("Remaining updates %+v gets %+v", updates, gets) - } -} - -func readOrDie(t *testing.T, req *http.Request, codec runtime.Codec) runtime.Object { - data, err := ioutil.ReadAll(req.Body) - if err != nil { - t.Errorf("Error reading: %v", err) - t.FailNow() - } - obj, err := codec.Decode(data) - if err != nil { - t.Errorf("error decoding: %v", err) - t.FailNow() - } - return obj -} diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go index 11afeb23aa8..b4a892decc6 100644 --- a/pkg/kubectl/rolling_updater.go +++ b/pkg/kubectl/rolling_updater.go @@ -17,13 +17,18 @@ limitations under the License. package kubectl import ( + goerrors "errors" "fmt" "io" "strconv" + "strings" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" ) @@ -71,6 +76,50 @@ const ( RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename" ) +func LoadExistingNextReplicationController(c *client.Client, namespace, newName string) (*api.ReplicationController, error) { + if len(newName) == 0 { + return nil, nil + } + newRc, err := c.ReplicationControllers(namespace).Get(newName) + if err != nil && errors.IsNotFound(err) { + return nil, nil + } + return newRc, err +} + +func CreateNewControllerFromCurrentController(c *client.Client, namespace, oldName, newName, image, deploymentKey string) (*api.ReplicationController, error) { + // load the old RC into the "new" RC + newRc, err := c.ReplicationControllers(namespace).Get(oldName) + if err != nil { + return nil, err + } + + if len(newRc.Spec.Template.Spec.Containers) > 1 { + // TODO: support multi-container image update. + return nil, goerrors.New("Image update is not supported for multi-container pods") + } + if len(newRc.Spec.Template.Spec.Containers) == 0 { + return nil, goerrors.New(fmt.Sprintf("Pod has no containers! (%v)", newRc)) + } + newRc.Spec.Template.Spec.Containers[0].Image = image + + newHash, err := api.HashObject(newRc, c.Codec) + if err != nil { + return nil, err + } + + if len(newName) == 0 { + newName = fmt.Sprintf("%s-%s", newRc.Name, newHash) + } + newRc.Name = newName + + newRc.Spec.Selector[deploymentKey] = newHash + newRc.Spec.Template.Labels[deploymentKey] = newHash + // Clear resource version after hashing so that identical updates get different hashes. + newRc.ResourceVersion = "" + return newRc, nil +} + // NewRollingUpdater creates a RollingUpdater from a client func NewRollingUpdater(namespace string, c RollingUpdaterClient) *RollingUpdater { return &RollingUpdater{ @@ -115,6 +164,138 @@ func SetNextControllerAnnotation(rc *api.ReplicationController, name string) { rc.Annotations[nextControllerAnnotation] = name } +func UpdateExistingReplicationController(c client.Interface, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) { + SetNextControllerAnnotation(oldRc, newName) + if _, found := oldRc.Spec.Selector[deploymentKey]; !found { + return AddDeploymentKeyToReplicationController(oldRc, c, deploymentKey, deploymentValue, namespace, out) + } else { + // If we didn't need to update the controller for the deployment key, we still need to write + // the "next" controller. + return c.ReplicationControllers(namespace).Update(oldRc) + } +} + +const MaxRetries = 3 + +func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) { + var err error + // First, update the template label. This ensures that any newly created pods will have the new label + if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) { + if rc.Spec.Template.Labels == nil { + rc.Spec.Template.Labels = map[string]string{} + } + rc.Spec.Template.Labels[deploymentKey] = deploymentValue + }); err != nil { + return nil, err + } + + // Update all pods managed by the rc to have the new hash label, so they are correctly adopted + // TODO: extract the code from the label command and re-use it here. + podList, err := client.Pods(namespace).List(labels.SelectorFromSet(oldRc.Spec.Selector), fields.Everything()) + if err != nil { + return nil, err + } + for ix := range podList.Items { + pod := &podList.Items[ix] + if pod.Labels == nil { + pod.Labels = map[string]string{ + deploymentKey: deploymentValue, + } + } else { + pod.Labels[deploymentKey] = deploymentValue + } + err = nil + delay := 3 + for i := 0; i < MaxRetries; i++ { + _, err = client.Pods(namespace).Update(pod) + if err != nil { + fmt.Fprint(out, "Error updating pod (%v), retrying after %d seconds", err, delay) + time.Sleep(time.Second * time.Duration(delay)) + delay *= delay + } else { + break + } + } + if err != nil { + return nil, err + } + } + + if oldRc.Spec.Selector == nil { + oldRc.Spec.Selector = map[string]string{} + } + // Copy the old selector, so that we can scrub out any orphaned pods + selectorCopy := map[string]string{} + for k, v := range oldRc.Spec.Selector { + selectorCopy[k] = v + } + oldRc.Spec.Selector[deploymentKey] = deploymentValue + + // Update the selector of the rc so it manages all the pods we updated above + if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) { + rc.Spec.Selector[deploymentKey] = deploymentValue + }); err != nil { + return nil, err + } + + // Clean up any orphaned pods that don't have the new label, this can happen if the rc manager + // doesn't see the update to its pod template and creates a new pod with the old labels after + // we've finished re-adopting existing pods to the rc. + podList, err = client.Pods(namespace).List(labels.SelectorFromSet(selectorCopy), fields.Everything()) + for ix := range podList.Items { + pod := &podList.Items[ix] + if value, found := pod.Labels[deploymentKey]; !found || value != deploymentValue { + if err := client.Pods(namespace).Delete(pod.Name, nil); err != nil { + return nil, err + } + } + } + + return oldRc, nil +} + +type updateFunc func(controller *api.ReplicationController) + +// updateWithRetries updates applies the given rc as an update. +func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) { + // Each update could take ~100ms, so give it 0.5 second + var err error + oldRc := rc + err = wait.Poll(10*time.Millisecond, 500*time.Millisecond, func() (bool, error) { + // Apply the update, then attempt to push it to the apiserver. + applyUpdate(rc) + if rc, err = rcClient.Update(rc); err == nil { + // rc contains the latest controller post update + return true, nil + } + // Update the controller with the latest resource version, if the update failed we + // can't trust rc so use oldRc.Name. + if rc, err = rcClient.Get(oldRc.Name); err != nil { + // The Get failed: Value in rc cannot be trusted. + rc = oldRc + } + // The Get passed: rc contains the latest controller, expect a poll for the update. + return false, nil + }) + // If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned + // controller contains the applied update. + return rc, err +} + +func FindSourceController(r RollingUpdaterClient, namespace, name string) (*api.ReplicationController, error) { + list, err := r.ListReplicationControllers(namespace, labels.Everything()) + if err != nil { + return nil, err + } + for ix := range list.Items { + rc := &list.Items[ix] + if rc.Annotations != nil && strings.HasPrefix(rc.Annotations[sourceIdAnnotation], name) { + return rc, nil + } + } + return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name) +} + // Update all pods for a ReplicationController (oldRc) by creating a new // controller (newRc) with 0 replicas, and synchronously resizing oldRc,newRc // by 1 until oldRc has 0 replicas and newRc has the original # of desired @@ -286,19 +467,28 @@ func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, } func (r *RollingUpdater) rename(rc *api.ReplicationController, newName string) error { + return Rename(r.c, rc, newName) +} + +func Rename(c RollingUpdaterClient, rc *api.ReplicationController, newName string) error { oldName := rc.Name rc.Name = newName rc.ResourceVersion = "" - _, err := r.c.CreateReplicationController(rc.Namespace, rc) + _, err := c.CreateReplicationController(rc.Namespace, rc) if err != nil { return err } - return r.c.DeleteReplicationController(rc.Namespace, oldName) + err = c.DeleteReplicationController(rc.Namespace, oldName) + if err != nil && !errors.IsNotFound(err) { + return err + } + return nil } // RollingUpdaterClient abstracts access to ReplicationControllers. type RollingUpdaterClient interface { + ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) GetReplicationController(namespace, name string) (*api.ReplicationController, error) UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) CreateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) @@ -315,6 +505,10 @@ type realRollingUpdaterClient struct { client client.Interface } +func (c *realRollingUpdaterClient) ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) { + return c.client.ReplicationControllers(namespace).List(selector) +} + func (c *realRollingUpdaterClient) GetReplicationController(namespace, name string) (*api.ReplicationController, error) { return c.client.ReplicationControllers(namespace).Get(name) } diff --git a/pkg/kubectl/rolling_updater_test.go b/pkg/kubectl/rolling_updater_test.go index fc1152474c0..4bae91d748f 100644 --- a/pkg/kubectl/rolling_updater_test.go +++ b/pkg/kubectl/rolling_updater_test.go @@ -19,13 +19,21 @@ package kubectl import ( "bytes" "fmt" + "io" "io/ioutil" + "net/http" + "reflect" "testing" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" ) @@ -380,8 +388,430 @@ func TestRollingUpdater_preserveCleanup(t *testing.T) { } } +func TestRename(t *testing.T) { + tests := []struct { + namespace string + newName string + oldName string + err error + expectError bool + }{ + { + namespace: "default", + newName: "bar", + oldName: "foo", + }, + { + namespace: "default", + newName: "bar", + oldName: "foo", + err: fmt.Errorf("Test Error"), + expectError: true, + }, + } + for _, test := range tests { + fakeClient := &rollingUpdaterClientImpl{ + CreateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) { + if namespace != test.namespace { + t.Errorf("unexepected namespace: %s, expected %s", namespace, test.namespace) + } + if rc.Name != test.newName { + t.Errorf("unexepected name: %s, expected %s", rc.Name, test.newName) + } + return rc, test.err + }, + DeleteReplicationControllerFn: func(namespace, name string) error { + if namespace != test.namespace { + t.Errorf("unexepected namespace: %s, expected %s", namespace, test.namespace) + } + if name != test.oldName { + t.Errorf("unexepected name: %s, expected %s", name, test.oldName) + } + return nil + }, + } + err := Rename(fakeClient, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: test.namespace, Name: test.oldName}}, test.newName) + if err != nil && !test.expectError { + t.Errorf("unexpected error: %v", err) + } + if err == nil && test.expectError { + t.Errorf("unexpected non-error") + } + } +} + +func TestFindSourceController(t *testing.T) { + ctrl1 := api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Annotations: map[string]string{ + sourceIdAnnotation: "bar:1234", + }, + }, + } + ctrl2 := api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Annotations: map[string]string{ + sourceIdAnnotation: "foo:12345", + }, + }, + } + ctrl3 := api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Annotations: map[string]string{ + sourceIdAnnotation: "baz:45667", + }, + }, + } + tests := []struct { + list *api.ReplicationControllerList + expectedController *api.ReplicationController + err error + name string + expectError bool + }{ + { + list: &api.ReplicationControllerList{}, + expectError: true, + }, + { + list: &api.ReplicationControllerList{ + Items: []api.ReplicationController{ctrl1}, + }, + name: "foo", + expectError: true, + }, + { + list: &api.ReplicationControllerList{ + Items: []api.ReplicationController{ctrl1}, + }, + name: "bar", + expectedController: &ctrl1, + }, + { + list: &api.ReplicationControllerList{ + Items: []api.ReplicationController{ctrl1, ctrl2}, + }, + name: "bar", + expectedController: &ctrl1, + }, + { + list: &api.ReplicationControllerList{ + Items: []api.ReplicationController{ctrl1, ctrl2}, + }, + name: "foo", + expectedController: &ctrl2, + }, + { + list: &api.ReplicationControllerList{ + Items: []api.ReplicationController{ctrl1, ctrl2, ctrl3}, + }, + name: "baz", + expectedController: &ctrl3, + }, + } + for _, test := range tests { + fakeClient := rollingUpdaterClientImpl{ + ListReplicationControllersFn: func(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) { + return test.list, test.err + }, + } + ctrl, err := FindSourceController(&fakeClient, "default", test.name) + if test.expectError && err == nil { + t.Errorf("unexpected non-error") + } + if !test.expectError && err != nil { + t.Errorf("unexpected error") + } + if !reflect.DeepEqual(ctrl, test.expectedController) { + t.Errorf("expected:\n%v\ngot:\n%v\n", test.expectedController, ctrl) + } + } +} + +func TestUpdateExistingReplicationController(t *testing.T) { + tests := []struct { + rc *api.ReplicationController + name string + deploymentKey string + deploymentValue string + + expectedRc *api.ReplicationController + expectErr bool + }{ + { + rc: &api.ReplicationController{ + Spec: api.ReplicationControllerSpec{ + Template: &api.PodTemplateSpec{}, + }, + }, + name: "foo", + deploymentKey: "dk", + deploymentValue: "some-hash", + + expectedRc: &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Annotations: map[string]string{ + "kubectl.kubernetes.io/next-controller-id": "foo", + }, + }, + Spec: api.ReplicationControllerSpec{ + Selector: map[string]string{ + "dk": "some-hash", + }, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "dk": "some-hash", + }, + }, + }, + }, + }, + }, + { + rc: &api.ReplicationController{ + Spec: api.ReplicationControllerSpec{ + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "dk": "some-other-hash", + }, + }, + }, + Selector: map[string]string{ + "dk": "some-other-hash", + }, + }, + }, + name: "foo", + deploymentKey: "dk", + deploymentValue: "some-hash", + + expectedRc: &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Annotations: map[string]string{ + "kubectl.kubernetes.io/next-controller-id": "foo", + }, + }, + Spec: api.ReplicationControllerSpec{ + Selector: map[string]string{ + "dk": "some-other-hash", + }, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "dk": "some-other-hash", + }, + }, + }, + }, + }, + }, + } + for _, test := range tests { + buffer := &bytes.Buffer{} + fakeClient := fakeClientFor("default", []fakeResponse{}) + rc, err := UpdateExistingReplicationController(fakeClient, test.rc, "default", test.name, test.deploymentKey, test.deploymentValue, buffer) + if !reflect.DeepEqual(rc, test.expectedRc) { + t.Errorf("expected:\n%#v\ngot:\n%#v\n", test.expectedRc, rc) + } + if test.expectErr && err == nil { + t.Errorf("unexpected non-error") + } + if !test.expectErr && err != nil { + t.Errorf("unexpected error: %v", err) + } + } +} + +func TestUpdateWithRetries(t *testing.T) { + codec := testapi.Codec() + rc := &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{Name: "rc", + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: api.ReplicationControllerSpec{ + Selector: map[string]string{ + "foo": "bar", + }, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + }, + }, + }, + } + + // Test end to end updating of the rc with retries. Essentially make sure the update handler + // sees the right updates, failures in update/get are handled properly, and that the updated + // rc with new resource version is returned to the caller. Without any of these rollingupdate + // will fail cryptically. + newRc := *rc + newRc.ResourceVersion = "2" + newRc.Spec.Selector["baz"] = "foobar" + updates := []*http.Response{ + {StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})}, + {StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})}, + {StatusCode: 200, Body: objBody(codec, &newRc)}, + } + gets := []*http.Response{ + {StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})}, + {StatusCode: 200, Body: objBody(codec, rc)}, + } + fakeClient := &client.FakeRESTClient{ + Codec: codec, + Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) { + switch p, m := req.URL.Path, req.Method; { + case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT": + update := updates[0] + updates = updates[1:] + // We should always get an update with a valid rc even when the get fails. The rc should always + // contain the update. + if c, ok := readOrDie(t, req, codec).(*api.ReplicationController); !ok || !reflect.DeepEqual(rc, c) { + t.Errorf("Unexpected update body, got %+v expected %+v", c, rc) + } else if sel, ok := c.Spec.Selector["baz"]; !ok || sel != "foobar" { + t.Errorf("Expected selector label update, got %+v", c.Spec.Selector) + } else { + delete(c.Spec.Selector, "baz") + } + return update, nil + case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "GET": + get := gets[0] + gets = gets[1:] + return get, nil + default: + t.Fatalf("unexpected request: %#v\n%#v", req.URL, req) + return nil, nil + } + }), + } + clientConfig := &client.Config{Version: latest.Version} + client := client.NewOrDie(clientConfig) + client.Client = fakeClient.Client + + if rc, err := updateWithRetries( + client.ReplicationControllers("default"), rc, func(c *api.ReplicationController) { + c.Spec.Selector["baz"] = "foobar" + }); err != nil { + t.Errorf("unexpected error: %v", err) + } else if sel, ok := rc.Spec.Selector["baz"]; !ok || sel != "foobar" || rc.ResourceVersion != "2" { + t.Errorf("Expected updated rc, got %+v", rc) + } + if len(updates) != 0 || len(gets) != 0 { + t.Errorf("Remaining updates %+v gets %+v", updates, gets) + } +} + +func readOrDie(t *testing.T, req *http.Request, codec runtime.Codec) runtime.Object { + data, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Errorf("Error reading: %v", err) + t.FailNow() + } + obj, err := codec.Decode(data) + if err != nil { + t.Errorf("error decoding: %v", err) + t.FailNow() + } + return obj +} + +func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser { + return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj)))) +} + +func TestAddDeploymentHash(t *testing.T) { + buf := &bytes.Buffer{} + codec := testapi.Codec() + rc := &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{Name: "rc"}, + Spec: api.ReplicationControllerSpec{ + Selector: map[string]string{ + "foo": "bar", + }, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + }, + }, + } + + podList := &api.PodList{ + Items: []api.Pod{ + {ObjectMeta: api.ObjectMeta{Name: "foo"}}, + {ObjectMeta: api.ObjectMeta{Name: "bar"}}, + {ObjectMeta: api.ObjectMeta{Name: "baz"}}, + }, + } + + seen := util.StringSet{} + updatedRc := false + fakeClient := &client.FakeRESTClient{ + Codec: codec, + Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) { + switch p, m := req.URL.Path, req.Method; { + case p == "/api/v1beta3/namespaces/default/pods" && m == "GET": + if req.URL.RawQuery != "labelSelector=foo%3Dbar" { + t.Errorf("Unexpected query string: %s", req.URL.RawQuery) + } + return &http.Response{StatusCode: 200, Body: objBody(codec, podList)}, nil + case p == "/api/v1beta3/namespaces/default/pods/foo" && m == "PUT": + seen.Insert("foo") + obj := readOrDie(t, req, codec) + podList.Items[0] = *(obj.(*api.Pod)) + return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[0])}, nil + case p == "/api/v1beta3/namespaces/default/pods/bar" && m == "PUT": + seen.Insert("bar") + obj := readOrDie(t, req, codec) + podList.Items[1] = *(obj.(*api.Pod)) + return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[1])}, nil + case p == "/api/v1beta3/namespaces/default/pods/baz" && m == "PUT": + seen.Insert("baz") + obj := readOrDie(t, req, codec) + podList.Items[2] = *(obj.(*api.Pod)) + return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[2])}, nil + case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT": + updatedRc = true + return &http.Response{StatusCode: 200, Body: objBody(codec, rc)}, nil + default: + t.Fatalf("unexpected request: %#v\n%#v", req.URL, req) + return nil, nil + } + }), + } + clientConfig := &client.Config{Version: latest.Version} + client := client.NewOrDie(clientConfig) + client.Client = fakeClient.Client + + if _, err := AddDeploymentKeyToReplicationController(rc, client, "dk", "hash", api.NamespaceDefault, buf); err != nil { + t.Errorf("unexpected error: %v", err) + } + for _, pod := range podList.Items { + if !seen.Has(pod.Name) { + t.Errorf("Missing update for pod: %s", pod.Name) + } + } + if !updatedRc { + t.Errorf("Failed to update replication controller with new labels") + } +} + // rollingUpdaterClientImpl is a dynamic RollingUpdaterClient. type rollingUpdaterClientImpl struct { + ListReplicationControllersFn func(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) GetReplicationControllerFn func(namespace, name string) (*api.ReplicationController, error) UpdateReplicationControllerFn func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) CreateReplicationControllerFn func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) @@ -389,6 +819,10 @@ type rollingUpdaterClientImpl struct { ControllerHasDesiredReplicasFn func(rc *api.ReplicationController) wait.ConditionFunc } +func (c *rollingUpdaterClientImpl) ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) { + return c.ListReplicationControllersFn(namespace, selector) +} + func (c *rollingUpdaterClientImpl) GetReplicationController(namespace, name string) (*api.ReplicationController, error) { return c.GetReplicationControllerFn(namespace, name) }