diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index f96b8c94c4c..c7954f9dfa0 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -251,7 +251,7 @@ func runReplicationControllerTest(c *client.Client) { glog.Infof("Done creating replication controllers") // Give the controllers some time to actually create the pods - if err := wait.Poll(time.Second, time.Second*30, c.ControllerHasDesiredReplicas(controller)); err != nil { + if err := wait.Poll(time.Second, time.Second*30, client.ControllerHasDesiredReplicas(c, &controller)); err != nil { glog.Fatalf("FAILED: pods never created %v", err) } diff --git a/pkg/client/conditions.go b/pkg/client/conditions.go index a5934d90286..b67164d6957 100644 --- a/pkg/client/conditions.go +++ b/pkg/client/conditions.go @@ -18,18 +18,17 @@ package client import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" ) // ControllerHasDesiredReplicas returns a condition that will be true iff the desired replica count // for a controller's ReplicaSelector equals the Replicas count. -func (c *Client) ControllerHasDesiredReplicas(controller api.ReplicationController) wait.ConditionFunc { +func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc { return func() (bool, error) { - pods, err := c.Pods(controller.Namespace).List(labels.Set(controller.Spec.Selector).AsSelector()) + ctrl, err := c.ReplicationControllers(controller.Namespace).Get(controller.Name) if err != nil { return false, err } - return len(pods.Items) == controller.Spec.Replicas, nil + return ctrl.Status.Replicas == ctrl.Spec.Replicas, nil } } diff --git a/pkg/kubectl/cmd/cmd.go b/pkg/kubectl/cmd/cmd.go index 6547ae56e66..98005b4c5dc 100644 --- a/pkg/kubectl/cmd/cmd.go +++ b/pkg/kubectl/cmd/cmd.go @@ -118,6 +118,7 @@ Find more information at https://github.com/GoogleCloudPlatform/kubernetes.`, cmds.AddCommand(NewCmdNamespace(out)) cmds.AddCommand(f.NewCmdLog(out)) + cmds.AddCommand(f.NewCmdRollingUpdate(out)) if err := cmds.Execute(); err != nil { os.Exit(1) diff --git a/pkg/kubectl/cmd/helpers.go b/pkg/kubectl/cmd/helpers.go index a460c5e06ee..4b42e5124c5 100644 --- a/pkg/kubectl/cmd/helpers.go +++ b/pkg/kubectl/cmd/helpers.go @@ -24,6 +24,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/golang/glog" "github.com/spf13/cobra" @@ -82,6 +83,16 @@ func GetFlagInt(cmd *cobra.Command, flag string) int { return v } +func GetFlagDuration(cmd *cobra.Command, flag string) time.Duration { + f := cmd.Flags().Lookup(flag) + if f == nil { + glog.Fatalf("Flag accessed but not defined for command %s: %s", cmd.Name(), flag) + } + v, err := time.ParseDuration(f.Value.String()) + checkErr(err) + return v +} + // Returns the first non-empty string out of the ones provided. If all // strings are empty, returns an empty string. func FirstNonEmptyString(args ...string) string { diff --git a/pkg/kubectl/cmd/rollingupdate.go b/pkg/kubectl/cmd/rollingupdate.go new file mode 100644 index 00000000000..839c09c874e --- /dev/null +++ b/pkg/kubectl/cmd/rollingupdate.go @@ -0,0 +1,109 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "fmt" + "io" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl" + "github.com/spf13/cobra" +) + +const ( + updatePeriod = "1m0s" + timeout = "5m0s" + pollInterval = "3s" +) + +func (f *Factory) NewCmdRollingUpdate(out io.Writer) *cobra.Command { + cmd := &cobra.Command{ + Use: "rollingupdate -f ", + Short: "Perform a rolling update of the given replicationController", + Long: `Perform a rolling update of the given replicationController.", + +Replaces named controller with new controller, updating one pod at a time to use the +new PodTemplate. The new-controller.json must specify the same namespace as the +existing controller and overwrite at least one (common) label in its replicaSelector. + +Examples: +$ kubectl rollingupdate frontend-v1 -f frontend-v2.json + + +$ cat frontend-v2.json | kubectl rollingupdate frontend-v1 -f - + `, + Run: func(cmd *cobra.Command, args []string) { + filename := GetFlagString(cmd, "filename") + if len(filename) == 0 { + usageError(cmd, "Must specify filename for new controller") + } + period := GetFlagDuration(cmd, "update-period") + interval := GetFlagDuration(cmd, "poll-interval") + timeout := GetFlagDuration(cmd, "timeout") + if len(args) != 1 { + usageError(cmd, "Must specify the controller to update") + } + oldName := args[0] + schema, err := f.Validator(cmd) + checkErr(err) + mapping, namespace, newName, data := ResourceFromFile(cmd, filename, f.Typer, f.Mapper, schema) + if mapping.Kind != "ReplicationController" { + usageError(cmd, "%s does not specify a valid ReplicationController", filename) + } + err = CompareNamespaceFromFile(cmd, namespace) + checkErr(err) + + client, err := f.ClientBuilder.Client() + checkErr(err) + obj, err := mapping.Codec.Decode(data) + checkErr(err) + newRc := obj.(*api.ReplicationController) + + updater := kubectl.NewRollingUpdater(namespace, client) + + // fetch rc + oldRc, err := client.ReplicationControllers(namespace).Get(oldName) + checkErr(err) + + var hasLabel bool + for key, oldValue := range oldRc.Spec.Selector { + if newValue, ok := newRc.Spec.Selector[key]; ok && newValue != oldValue { + hasLabel = true + break + } + } + if !hasLabel { + usageError(cmd, "%s must specify a matching key with non-equal value in Selector for %s", + filename, oldName) + } + // TODO: handle resizes during rolling update + if newRc.Spec.Replicas == 0 { + newRc.Spec.Replicas = oldRc.Spec.Replicas + } + err = updater.Update(out, oldRc, newRc, period, interval, timeout) + checkErr(err) + + fmt.Fprintf(out, "%s\n", newName) + }, + } + cmd.Flags().String("update-period", updatePeriod, `Time to wait between updating pods. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`) + cmd.Flags().String("poll-interval", pollInterval, `Time delay between polling controller status after update. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`) + cmd.Flags().String("timeout", timeout, `Max time to wait for a controller to update before giving up. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`) + cmd.Flags().StringP("filename", "f", "", "Filename or URL to file to use to create the new controller") + return cmd +} diff --git a/pkg/kubectl/kubectl.go b/pkg/kubectl/kubectl.go index c32254cbd2f..4a5a1d9ac78 100644 --- a/pkg/kubectl/kubectl.go +++ b/pkg/kubectl/kubectl.go @@ -34,6 +34,8 @@ import ( var apiVersionToUse = "v1beta1" +const kubectlAnnotationPrefix = "kubectl.kubernetes.io/" + func GetKubeClient(config *client.Config, matchVersion bool) (*client.Client, error) { // TODO: get the namespace context when kubectl ns is completed c, err := client.New(config) diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go new file mode 100644 index 00000000000..b7fbb266c32 --- /dev/null +++ b/pkg/kubectl/rolling_updater.go @@ -0,0 +1,173 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubectl + +import ( + "fmt" + "io" + "strconv" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" +) + +// RollingUpdater provides methods for updating replicated pods in a predictable, +// fault-tolerant way. +type RollingUpdater struct { + // Client interface for creating and updating controllers + c client.Interface + // Namespace for resources + ns string +} + +// NewRollingUpdater creates a RollingUpdater from a client +func NewRollingUpdater(namespace string, c client.Interface) *RollingUpdater { + return &RollingUpdater{ + c, + namespace, + } +} + +const ( + sourceIdAnnotation = kubectlAnnotationPrefix + "update-source-id" + desiredReplicasAnnotation = kubectlAnnotationPrefix + "desired-replicas" +) + +// Update all pods for a ReplicationController (oldRc) by creating a new controller (newRc) +// with 0 replicas, and synchronously resizing oldRc,newRc by 1 until oldRc has 0 replicas +// and newRc has the original # of desired replicas. oldRc is then deleted. +// If an update from newRc to oldRc is already in progress, we attempt to drive it to completion. +// If an error occurs at any step of the update, the error will be returned. +// 'out' writer for progress output +// 'oldRc' existing controller to be replaced +// 'newRc' controller that will take ownership of updated pods (will be created if needed) +// 'updatePeriod' time to wait between individual pod updates +// 'interval' time to wait between polling controller status after update +// 'timeout' time to wait for controller updates before giving up +// +// TODO: make this handle performing a rollback of a partially completed rollout. +func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error { + oldName := oldRc.ObjectMeta.Name + newName := newRc.ObjectMeta.Name + + if newRc.Spec.Replicas <= 0 { + return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %s\n", newName, newRc.Spec) + } + desired := newRc.Spec.Replicas + sourceId := fmt.Sprintf("%s:%s", oldName, oldRc.ObjectMeta.UID) + + // look for existing newRc, incase this update was previously started but interrupted + rc, existing, err := r.getExistingNewRc(sourceId, newName) + if existing { + fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newName) + if err != nil { + return err + } + replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation] + desired, err = strconv.Atoi(replicas) + if err != nil { + return fmt.Errorf("Unable to parse annotation for %s: %s=%s", + newName, desiredReplicasAnnotation, replicas) + } + newRc = rc + } else { + fmt.Fprintf(out, "Creating %s\n", newName) + if newRc.ObjectMeta.Annotations == nil { + newRc.ObjectMeta.Annotations = map[string]string{} + } + newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", desired) + newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId + newRc.Spec.Replicas = 0 + newRc, err = r.c.ReplicationControllers(r.ns).Create(newRc) + if err != nil { + return err + } + } + + // +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas + for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 { + newRc.Spec.Replicas += 1 + oldRc.Spec.Replicas -= 1 + fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n", + oldName, oldRc.Spec.Replicas, + newName, newRc.Spec.Replicas) + + newRc, err = r.updateAndWait(newRc, interval, timeout) + if err != nil { + return err + } + time.Sleep(updatePeriod) + oldRc, err = r.updateAndWait(oldRc, interval, timeout) + if err != nil { + return err + } + } + // delete remaining replicas on oldRc + if oldRc.Spec.Replicas != 0 { + fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n", + oldName, oldRc.Spec.Replicas, 0) + oldRc.Spec.Replicas = 0 + oldRc, err = r.updateAndWait(oldRc, interval, timeout) + if err != nil { + return err + } + } + // add remaining replicas on newRc, cleanup annotations + if newRc.Spec.Replicas != desired { + fmt.Fprintf(out, "Resizing %s replicas: %d -> %d\n", + newName, newRc.Spec.Replicas, desired) + newRc.Spec.Replicas = desired + } + delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation) + delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation) + newRc, err = r.updateAndWait(newRc, interval, timeout) + if err != nil { + return err + } + // delete old rc + fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName) + return r.c.ReplicationControllers(r.ns).Delete(oldName) +} + +func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.ReplicationController, existing bool, err error) { + if rc, err = r.c.ReplicationControllers(r.ns).Get(name); err == nil { + existing = true + annotations := rc.ObjectMeta.Annotations + source := annotations[sourceIdAnnotation] + _, ok := annotations[desiredReplicasAnnotation] + if source != sourceId || !ok { + err = fmt.Errorf("Missing/unexpected annotations for controller %s: %s", name, annotations) + } + return + } + err = nil + return +} + +func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, timeout time.Duration) (*api.ReplicationController, error) { + rc, err := r.c.ReplicationControllers(r.ns).Update(rc) + if err != nil { + return nil, err + } + if err := wait.Poll(interval, timeout, + client.ControllerHasDesiredReplicas(r.c, rc)); err != nil { + return nil, err + } + return r.c.ReplicationControllers(r.ns).Get(rc.ObjectMeta.Name) +} diff --git a/pkg/kubectl/rolling_updater_test.go b/pkg/kubectl/rolling_updater_test.go new file mode 100644 index 00000000000..d1b56cb9551 --- /dev/null +++ b/pkg/kubectl/rolling_updater_test.go @@ -0,0 +1,276 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubectl + +import ( + "bytes" + "fmt" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" +) + +type customFake struct { + *client.Fake + ctrl client.ReplicationControllerInterface +} + +func (c *customFake) ReplicationControllers(namespace string) client.ReplicationControllerInterface { + return c.ctrl +} + +func fakeClientFor(namespace string, responses []fakeResponse) client.Interface { + fake := client.Fake{} + return &customFake{ + &fake, + &fakeRc{ + &client.FakeReplicationControllers{ + Fake: &fake, + Namespace: namespace, + }, + responses, + }, + } +} + +type fakeResponse struct { + controller *api.ReplicationController + err error +} + +type fakeRc struct { + *client.FakeReplicationControllers + responses []fakeResponse +} + +func (c *fakeRc) Get(name string) (*api.ReplicationController, error) { + action := client.FakeAction{Action: "get-controller", Value: name} + if len(c.responses) == 0 { + return nil, fmt.Errorf("Unexpected Action: %s", action) + } + c.Fake.Actions = append(c.Fake.Actions, action) + result := c.responses[0] + c.responses = c.responses[1:] + return result.controller, result.err +} + +func (c *fakeRc) Create(controller *api.ReplicationController) (*api.ReplicationController, error) { + c.Fake.Actions = append(c.Fake.Actions, client.FakeAction{Action: "create-controller", Value: controller.ObjectMeta.Name}) + return controller, nil +} + +func (c *fakeRc) Update(controller *api.ReplicationController) (*api.ReplicationController, error) { + c.Fake.Actions = append(c.Fake.Actions, client.FakeAction{Action: "update-controller", Value: controller.ObjectMeta.Name}) + return controller, nil +} + +func oldRc(replicas int) *api.ReplicationController { + return &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: "foo-v1", + UID: "7764ae47-9092-11e4-8393-42010af018ff", + }, + Spec: api.ReplicationControllerSpec{ + Replicas: replicas, + Selector: map[string]string{"version": "v1"}, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Name: "foo-v1", + Labels: map[string]string{"version": "v1"}, + }, + }, + }, + Status: api.ReplicationControllerStatus{ + Replicas: replicas, + }, + } +} + +func newRc(replicas int, desired int) *api.ReplicationController { + rc := oldRc(replicas) + rc.Spec.Template = &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Name: "foo-v2", + Labels: map[string]string{"version": "v2"}, + }, + } + rc.Spec.Selector = map[string]string{"version": "v2"} + rc.ObjectMeta = api.ObjectMeta{ + Name: "foo-v2", + Annotations: map[string]string{ + desiredReplicasAnnotation: fmt.Sprintf("%d", desired), + sourceIdAnnotation: "foo-v1:7764ae47-9092-11e4-8393-42010af018ff", + }, + } + return rc +} + +func TestUpdate(t *testing.T) { + tests := []struct { + oldRc, newRc *api.ReplicationController + responses []fakeResponse + output string + }{ + { + oldRc(1), newRc(1, 1), + []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // one update round + {newRc(1, 1), nil}, + {newRc(1, 1), nil}, + {oldRc(0), nil}, + {oldRc(0), nil}, + // get newRc after final update (to cleanup annotations) + {newRc(1, 1), nil}, + {newRc(1, 1), nil}, + }, + `Creating foo-v2 +Updating foo-v1 replicas: 0, foo-v2 replicas: 1 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc(2), newRc(2, 2), + []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // 2 gets for each update (poll for condition, refetch) + {newRc(1, 2), nil}, + {newRc(1, 2), nil}, + {oldRc(1), nil}, + {oldRc(1), nil}, + {newRc(2, 2), nil}, + {newRc(2, 2), nil}, + {oldRc(0), nil}, + {oldRc(0), nil}, + // get newRc after final update (cleanup annotations) + {newRc(2, 2), nil}, + {newRc(2, 2), nil}, + }, + `Creating foo-v2 +Updating foo-v1 replicas: 1, foo-v2 replicas: 1 +Updating foo-v1 replicas: 0, foo-v2 replicas: 2 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc(2), newRc(7, 7), + []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // 2 gets for each update (poll for condition, refetch) + {newRc(1, 2), nil}, + {newRc(1, 2), nil}, + {oldRc(1), nil}, + {oldRc(1), nil}, + {newRc(2, 2), nil}, + {newRc(2, 2), nil}, + {oldRc(0), nil}, + {oldRc(0), nil}, + // final update on newRc (resize + cleanup annotations) + {newRc(7, 7), nil}, + {newRc(7, 7), nil}, + }, + `Creating foo-v2 +Updating foo-v1 replicas: 1, foo-v2 replicas: 1 +Updating foo-v1 replicas: 0, foo-v2 replicas: 2 +Resizing foo-v2 replicas: 2 -> 7 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc(7), newRc(2, 2), + []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // 2 gets for each update (poll for condition, refetch) + {newRc(1, 2), nil}, + {newRc(1, 2), nil}, + {oldRc(6), nil}, + {oldRc(6), nil}, + {newRc(2, 2), nil}, + {newRc(2, 2), nil}, + {oldRc(5), nil}, + {oldRc(5), nil}, + // stop oldRc + {oldRc(0), nil}, + {oldRc(0), nil}, + // final update on newRc (cleanup annotations) + {newRc(2, 2), nil}, + {newRc(2, 2), nil}, + }, + `Creating foo-v2 +Updating foo-v1 replicas: 6, foo-v2 replicas: 1 +Updating foo-v1 replicas: 5, foo-v2 replicas: 2 +Stopping foo-v1 replicas: 5 -> 0 +Update succeeded. Deleting foo-v1 +`, + }, + } + + for _, test := range tests { + updater := RollingUpdater{ + fakeClientFor("default", test.responses), + "default", + } + var buffer bytes.Buffer + + if err := updater.Update(&buffer, test.oldRc, test.newRc, 0, 1*time.Millisecond, 1*time.Millisecond); err != nil { + t.Errorf("Update failed: %v", err) + } + if buffer.String() != test.output { + t.Errorf("Bad output. expected:\n%s\ngot:\n%s", test.output, buffer.String()) + } + } +} + +func TestUpdateRecovery(t *testing.T) { + // Test recovery from interruption + rc := oldRc(2) + rcExisting := newRc(1, 3) + + output := `Continuing update with existing controller foo-v2. +Updating foo-v1 replicas: 1, foo-v2 replicas: 2 +Updating foo-v1 replicas: 0, foo-v2 replicas: 3 +Update succeeded. Deleting foo-v1 +` + responses := []fakeResponse{ + // Existing newRc + {rcExisting, nil}, + // 2 gets for each update (poll for condition, refetch) + {newRc(2, 2), nil}, + {newRc(2, 2), nil}, + {oldRc(1), nil}, + {oldRc(1), nil}, + {newRc(3, 3), nil}, + {newRc(3, 3), nil}, + {oldRc(0), nil}, + {oldRc(0), nil}, + // get newRc after final update (cleanup annotations) + {newRc(3, 3), nil}, + {newRc(3, 3), nil}, + } + updater := RollingUpdater{fakeClientFor("default", responses), "default"} + + var buffer bytes.Buffer + if err := updater.Update(&buffer, rc, rcExisting, 0, 1*time.Millisecond, 1*time.Millisecond); err != nil { + t.Errorf("Update failed: %v", err) + } + if buffer.String() != output { + t.Errorf("Output was not as expected. Expected:\n%s\nGot:\n%s", output, buffer.String()) + } +}