diff --git a/pkg/kubectl/cmd/rollingupdate.go b/pkg/kubectl/cmd/rollingupdate.go index c3bb899d1fb..9d934913139 100644 --- a/pkg/kubectl/cmd/rollingupdate.go +++ b/pkg/kubectl/cmd/rollingupdate.go @@ -258,13 +258,14 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg updateCleanupPolicy = kubectl.RenameRollingUpdateCleanupPolicy } config := &kubectl.RollingUpdaterConfig{ - Out: out, - OldRc: oldRc, - NewRc: newRc, - UpdatePeriod: period, - Interval: interval, - Timeout: timeout, - CleanupPolicy: updateCleanupPolicy, + Out: out, + OldRc: oldRc, + NewRc: newRc, + UpdatePeriod: period, + Interval: interval, + Timeout: timeout, + CleanupPolicy: updateCleanupPolicy, + UpdateAcceptor: kubectl.DefaultUpdateAcceptor, } if cmdutil.GetFlagBool(cmd, "rollback") { kubectl.AbortRollingUpdate(config) diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go index a52c00e4aed..c8ced0c2c62 100644 --- a/pkg/kubectl/rolling_updater.go +++ b/pkg/kubectl/rolling_updater.go @@ -20,6 +20,7 @@ import ( goerrors "errors" "fmt" "io" + "math" "strconv" "strings" "time" @@ -39,8 +40,14 @@ type RollingUpdater struct { c RollingUpdaterClient // Namespace for resources ns string + // scaleAndWait scales a controller and returns its updated state. + scaleAndWait scaleAndWait } +// scaleAndWait scales rc and returns its updated state. This typedef is to +// abstract away the use of a Scaler to ease testing. +type scaleAndWait func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) + // RollingUpdaterConfig is the configuration for a rolling deployment process. type RollingUpdaterConfig struct { // Out is a writer for progress output. @@ -60,6 +67,16 @@ type RollingUpdaterConfig struct { // CleanupPolicy defines the cleanup action to take after the deployment is // complete. CleanupPolicy RollingUpdaterCleanupPolicy + // UpdateAcceptor is given a chance to accept the new controller after each + // scale-up operation. If the controller is accepted, updates continue; if + // the controller is rejected, the update will fail immediately. + UpdateAcceptor UpdateAcceptor + // UpdatePercent is optional; if specified, the amount of replicas scaled up + // and down each interval will be computed as a percentage of the desired + // replicas for the new RC. If UpdatePercent is nil, one replica will be + // scaled up and down each interval. If UpdatePercent is negative, the order + // of scaling will be down/up instead of up/down. + UpdatePercent *int } // RollingUpdaterCleanupPolicy is a cleanup action to take after the @@ -76,6 +93,26 @@ const ( RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename" ) +// UpdateAcceptor is given a chance to accept or reject the new controller +// during a deployment each time the controller is scaled up. +// +// After the successful scale-up of the controller, the controller is given to +// the UpdateAcceptor. If the UpdateAcceptor rejects the controller, the +// deployment is stopped with an error. +type UpdateAcceptor interface { + // Accept returns nil if the controller is okay, otherwise returns an error. + Accept(*api.ReplicationController) error +} + +// AlwaysAccept is an UpdateAcceptor which always accepts the controller. +type AlwaysAccept struct{} + +// Accept implements UpdateAcceptor. +func (a *AlwaysAccept) Accept(*api.ReplicationController) error { return nil } + +// DefaultUpdaterAcceptor always accepts controllers. +var DefaultUpdateAcceptor UpdateAcceptor = &AlwaysAccept{} + func LoadExistingNextReplicationController(c *client.Client, namespace, newName string) (*api.ReplicationController, error) { if len(newName) == 0 { return nil, nil @@ -121,10 +158,12 @@ func CreateNewControllerFromCurrentController(c *client.Client, namespace, oldNa } // NewRollingUpdater creates a RollingUpdater from a client -func NewRollingUpdater(namespace string, c RollingUpdaterClient) *RollingUpdater { +func NewRollingUpdater(namespace string, client RollingUpdaterClient) *RollingUpdater { return &RollingUpdater{ - c, - namespace, + c: client, + ns: namespace, + // Use a real scaleAndWait implementation. + scaleAndWait: scalerScaleAndWait(client, namespace), } } @@ -175,6 +214,21 @@ func UpdateExistingReplicationController(c client.Interface, oldRc *api.Replicat } } +// scalerScaleAndWait returns a scaleAndWait function which scales a +// controller using a Scaler and a real client. +func scalerScaleAndWait(client RollingUpdaterClient, namespace string) scaleAndWait { + scaler, err := ScalerFor("ReplicationController", client) + return func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) { + if err != nil { + return nil, fmt.Errorf("Couldn't make scaler: %s", err) + } + if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil { + return nil, err + } + return client.GetReplicationController(namespace, rc.ObjectMeta.Name) + } +} + const MaxRetries = 3 func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) { @@ -297,10 +351,14 @@ func FindSourceController(r RollingUpdaterClient, namespace, name string) (*api. } // Update all pods for a ReplicationController (oldRc) by creating a new -// controller (newRc) with 0 replicas, and synchronously scaling oldRc,newRc -// by 1 until oldRc has 0 replicas and newRc has the original # of desired +// controller (newRc) with 0 replicas, and synchronously scaling oldRc and +// newRc until oldRc has 0 replicas and newRc has the original # of desired // replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy. // +// The scaling amount each interval is either 1 or based on a percent of the +// desired replicas. If a percentage is used and the percentage is negative, +// the scaling order is inverted to down/up instead of the default up/down. +// // If an update from newRc to oldRc is already in progress, we attempt to // drive it to completion. If an error occurs at any step of the update, the // error will be returned. @@ -353,51 +411,50 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { } } - // +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas - for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 { - newRc.Spec.Replicas += 1 - oldRc.Spec.Replicas -= 1 - fmt.Printf("At beginning of loop: %s replicas: %d, %s replicas: %d\n", - oldName, oldRc.Spec.Replicas, - newName, newRc.Spec.Replicas) - fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n", - oldName, oldRc.Spec.Replicas, - newName, newRc.Spec.Replicas) + // Compute the scale amount based on a percentage of the new desired count. + // A negative percentage indicates our scale direction should be down-first. + scaleAmount := 1 + skipFirstUp := false + if config.UpdatePercent != nil { + scaleAmount = int(math.Ceil(float64(desired) * (math.Abs(float64(*config.UpdatePercent)) / 100))) + if *config.UpdatePercent < 0 { + skipFirstUp = true + } + } + // Helpful output about what we're about to do. + direction := "up" + if skipFirstUp { + direction = "down" + } + fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (scale %s first by %d each interval)\n", + newRc.Name, newRc.Spec.Replicas, desired, oldRc.Name, oldRc.Spec.Replicas, direction, scaleAmount) - newRc, err = r.scaleAndWait(newRc, retry, waitForReplicas) + // Scale newRc and oldRc until newRc has the desired number of replicas and + // oldRc has 0 replicas. + for newRc.Spec.Replicas != desired || oldRc.Spec.Replicas != 0 { + // Choose up/down vs. down/up scaling direction. + if !skipFirstUp { + scaledRc, err := r.scaleUp(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config) + if err != nil { + return err + } + newRc = scaledRc + time.Sleep(updatePeriod) + skipFirstUp = true + } + scaledRc, err := r.scaleDown(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config) if err != nil { return err } + rc = scaledRc time.Sleep(updatePeriod) - oldRc, err = r.scaleAndWait(oldRc, retry, waitForReplicas) - if err != nil { - return err - } - fmt.Printf("At end of loop: %s replicas: %d, %s replicas: %d\n", - oldName, oldRc.Spec.Replicas, - newName, newRc.Spec.Replicas) - } - // delete remaining replicas on oldRc - if oldRc.Spec.Replicas != 0 { - fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n", - oldName, oldRc.Spec.Replicas, 0) - oldRc.Spec.Replicas = 0 - oldRc, err = r.scaleAndWait(oldRc, retry, waitForReplicas) - // oldRc, err = r.scaleAndWait(oldRc, interval, timeout) - if err != nil { - return err - } - } - // add remaining replicas on newRc - if newRc.Spec.Replicas != desired { - fmt.Fprintf(out, "Scaling %s replicas: %d -> %d\n", - newName, newRc.Spec.Replicas, desired) - newRc.Spec.Replicas = desired - newRc, err = r.scaleAndWait(newRc, retry, waitForReplicas) + scaledRc, err = r.scaleUp(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config) if err != nil { return err } + newRc = scaledRc } + // Clean up annotations if newRc, err = r.c.GetReplicationController(r.ns, newName); err != nil { return err @@ -429,6 +486,50 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { } } +// scaleUp scales up newRc to desired by scaleAmount. It accounts for +// fencepost conditions. If newRc is already scaled to desired, scaleUp does +// nothing. If the oldRc is already scaled to 0, newRc is scaled to desired +// immediately regardless of scale count. +func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, desired, scaleAmount int, retry, wait *RetryParams, out io.Writer, config *RollingUpdaterConfig) (*api.ReplicationController, error) { + if newRc.Spec.Replicas == desired { + return newRc, nil + } + newRc.Spec.Replicas += scaleAmount + if newRc.Spec.Replicas > desired || oldRc.Spec.Replicas == 0 { + newRc.Spec.Replicas = desired + } + fmt.Fprintf(out, "Scaling %s up to %d\n", newRc.Name, newRc.Spec.Replicas) + scaledRc, err := r.scaleAndWait(newRc, retry, wait) + if err != nil { + return nil, err + } + err = config.UpdateAcceptor.Accept(scaledRc) + if err != nil { + return nil, fmt.Errorf("update rejected for %s: %v", scaledRc.Name, err) + } + return scaledRc, nil +} + +// scaleDown scales down oldRc to 0 by scaleAmount. It accounts for fencepost +// conditions. If oldRc is already scaled to 0, scaleDown does nothing. If +// newRc is already scaled to desired, oldRc is scaled to 0 immediately +// regardless of scaleAmount. +func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, scaleAmount int, retry, wait *RetryParams, out io.Writer, config *RollingUpdaterConfig) (*api.ReplicationController, error) { + if oldRc.Spec.Replicas == 0 { + return oldRc, nil + } + oldRc.Spec.Replicas -= scaleAmount + if oldRc.Spec.Replicas < 0 || newRc.Spec.Replicas == desired { + oldRc.Spec.Replicas = 0 + } + fmt.Fprintf(out, "Scaling %s down to %d\n", oldRc.Name, oldRc.Spec.Replicas) + scaledRc, err := r.scaleAndWait(oldRc, retry, wait) + if err != nil { + return nil, err + } + return scaledRc, nil +} + func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.ReplicationController, existing bool, err error) { if rc, err = r.c.GetReplicationController(r.ns, name); err == nil { existing = true @@ -444,17 +545,6 @@ func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.Replic return } -func (r *RollingUpdater) scaleAndWait(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) { - scaler, err := ScalerFor("ReplicationController", r.c) - if err != nil { - return nil, err - } - if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil { - return nil, err - } - return r.c.GetReplicationController(r.ns, rc.ObjectMeta.Name) -} - func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, timeout time.Duration) (*api.ReplicationController, error) { rc, err := r.c.UpdateReplicationController(r.ns, rc) if err != nil { diff --git a/pkg/kubectl/rolling_updater_test.go b/pkg/kubectl/rolling_updater_test.go index 79ba4bdd7d5..8bd497ff000 100644 --- a/pkg/kubectl/rolling_updater_test.go +++ b/pkg/kubectl/rolling_updater_test.go @@ -132,166 +132,396 @@ func newRc(replicas int, desired int) *api.ReplicationController { } func TestUpdate(t *testing.T) { + // Helpers + Percent := func(p int) *int { + return &p + } + var NilPercent *int + // Scenarios tests := []struct { oldRc, newRc *api.ReplicationController + accepted bool + percent *int responses []fakeResponse output string }{ { - oldRc(1), newRc(1, 1), - []fakeResponse{ + oldRc: oldRc(1), + newRc: newRc(1, 1), + accepted: true, + percent: NilPercent, + responses: []fakeResponse{ // no existing newRc {nil, fmt.Errorf("not found")}, - // 4 gets for each scale - {newRc(1, 1), nil}, - {newRc(1, 1), nil}, - {newRc(1, 1), nil}, - {newRc(1, 1), nil}, - {newRc(1, 1), nil}, + // scaling iteration {newRc(1, 1), nil}, {oldRc(0), nil}, - {oldRc(0), nil}, - {oldRc(0), nil}, - // {oldRc(0), nil}, // cleanup annotations {newRc(1, 1), nil}, {newRc(1, 1), nil}, + {newRc(1, 1), nil}, }, - `Creating foo-v2 -Updating foo-v1 replicas: 0, foo-v2 replicas: 1 + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 1, scaling down foo-v1 from 1 to 0 (scale up first by 1 each interval) +Scaling foo-v2 up to 1 +Scaling foo-v1 down to 0 Update succeeded. Deleting foo-v1 `, }, { - oldRc(2), newRc(2, 2), - []fakeResponse{ + oldRc: oldRc(1), + newRc: newRc(1, 1), + accepted: true, + percent: NilPercent, + responses: []fakeResponse{ // no existing newRc {nil, fmt.Errorf("not found")}, - // 4 gets for each scale - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, + // scaling iteration + {newRc(1, 1), nil}, + {oldRc(0), nil}, + // cleanup annotations + {newRc(1, 1), nil}, + {newRc(1, 1), nil}, + {newRc(1, 1), nil}, + }, + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 1, scaling down foo-v1 from 1 to 0 (scale up first by 1 each interval) +Scaling foo-v2 up to 1 +Scaling foo-v1 down to 0 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc: oldRc(2), + newRc: newRc(2, 2), + accepted: true, + percent: NilPercent, + responses: []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // scaling iteration {newRc(1, 2), nil}, {oldRc(1), nil}, - {oldRc(1), nil}, - {oldRc(1), nil}, - // {oldRc(1), nil}, - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, + // scaling iteration {newRc(2, 2), nil}, {oldRc(0), nil}, - {oldRc(0), nil}, - {oldRc(0), nil}, - // {oldRc(0), nil}, // cleanup annotations {newRc(2, 2), nil}, {newRc(2, 2), nil}, + {newRc(1, 1), nil}, }, - `Creating foo-v2 -Updating foo-v1 replicas: 1, foo-v2 replicas: 1 -Updating foo-v1 replicas: 0, foo-v2 replicas: 2 + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 2 to 0 (scale up first by 1 each interval) +Scaling foo-v2 up to 1 +Scaling foo-v1 down to 1 +Scaling foo-v2 up to 2 +Scaling foo-v1 down to 0 Update succeeded. Deleting foo-v1 `, }, { - oldRc(2), newRc(7, 7), - []fakeResponse{ + oldRc: oldRc(2), + newRc: newRc(7, 7), + accepted: true, + percent: NilPercent, + responses: []fakeResponse{ // no existing newRc {nil, fmt.Errorf("not found")}, - // 4 gets for each scale - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, + // scaling iteration + {newRc(1, 7), nil}, {oldRc(1), nil}, - {oldRc(1), nil}, - {oldRc(1), nil}, - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, - {oldRc(0), nil}, - {oldRc(0), nil}, + // scaling iteration + {newRc(2, 7), nil}, {oldRc(0), nil}, // final scale on newRc {newRc(7, 7), nil}, - {newRc(7, 7), nil}, - {newRc(7, 7), nil}, - {newRc(7, 7), nil}, // cleanup annotations {newRc(7, 7), nil}, {newRc(7, 7), nil}, + {newRc(7, 7), nil}, }, - `Creating foo-v2 -Updating foo-v1 replicas: 1, foo-v2 replicas: 1 -Updating foo-v1 replicas: 0, foo-v2 replicas: 2 -Scaling foo-v2 replicas: 2 -> 7 + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 7, scaling down foo-v1 from 2 to 0 (scale up first by 1 each interval) +Scaling foo-v2 up to 1 +Scaling foo-v1 down to 1 +Scaling foo-v2 up to 2 +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 7 Update succeeded. Deleting foo-v1 `, }, { - oldRc(7), newRc(2, 2), - []fakeResponse{ + oldRc: oldRc(7), + newRc: newRc(2, 2), + accepted: true, + percent: NilPercent, + responses: []fakeResponse{ // no existing newRc {nil, fmt.Errorf("not found")}, - // 4 gets for each update - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, - {newRc(1, 2), nil}, + // scaling iteration {newRc(1, 2), nil}, {oldRc(6), nil}, - {oldRc(6), nil}, - {oldRc(6), nil}, + // scaling iteration {newRc(2, 2), nil}, - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, - {oldRc(5), nil}, - {oldRc(5), nil}, - {oldRc(5), nil}, - // stop oldRc - {oldRc(0), nil}, - {oldRc(0), nil}, - {oldRc(0), nil}, {oldRc(0), nil}, // cleanup annotations {newRc(2, 2), nil}, {newRc(2, 2), nil}, + {newRc(2, 2), nil}, }, - `Creating foo-v2 -Updating foo-v1 replicas: 6, foo-v2 replicas: 1 -Updating foo-v1 replicas: 5, foo-v2 replicas: 2 -Stopping foo-v1 replicas: 5 -> 0 + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 7 to 0 (scale up first by 1 each interval) +Scaling foo-v2 up to 1 +Scaling foo-v1 down to 6 +Scaling foo-v2 up to 2 +Scaling foo-v1 down to 0 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc: oldRc(7), + newRc: newRc(2, 2), + accepted: false, + percent: NilPercent, + responses: []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // scaling iteration (only up occurs since the update is rejected) + {newRc(1, 2), nil}, + }, + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 7 to 0 (scale up first by 1 each interval) +Scaling foo-v2 up to 1 +`, + }, { + oldRc: oldRc(10), + newRc: newRc(10, 10), + accepted: true, + percent: Percent(20), + responses: []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // scaling iteration + {newRc(2, 10), nil}, + {oldRc(8), nil}, + // scaling iteration + {newRc(4, 10), nil}, + {oldRc(6), nil}, + // scaling iteration + {newRc(6, 10), nil}, + {oldRc(4), nil}, + // scaling iteration + {newRc(8, 10), nil}, + {oldRc(2), nil}, + // scaling iteration + {newRc(10, 10), nil}, + {oldRc(0), nil}, + // cleanup annotations + {newRc(10, 10), nil}, + {newRc(10, 10), nil}, + {newRc(10, 10), nil}, + }, + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (scale up first by 2 each interval) +Scaling foo-v2 up to 2 +Scaling foo-v1 down to 8 +Scaling foo-v2 up to 4 +Scaling foo-v1 down to 6 +Scaling foo-v2 up to 6 +Scaling foo-v1 down to 4 +Scaling foo-v2 up to 8 +Scaling foo-v1 down to 2 +Scaling foo-v2 up to 10 +Scaling foo-v1 down to 0 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc: oldRc(2), + newRc: newRc(6, 6), + accepted: true, + percent: Percent(50), + responses: []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // scaling iteration + {newRc(3, 6), nil}, + {oldRc(0), nil}, + // scaling iteration + {newRc(6, 6), nil}, + // cleanup annotations + {newRc(6, 6), nil}, + {newRc(6, 6), nil}, + {newRc(6, 6), nil}, + }, + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 6, scaling down foo-v1 from 2 to 0 (scale up first by 3 each interval) +Scaling foo-v2 up to 3 +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 6 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc: oldRc(10), + newRc: newRc(3, 3), + accepted: true, + percent: Percent(50), + responses: []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // scaling iteration + {newRc(2, 3), nil}, + {oldRc(8), nil}, + // scaling iteration + {newRc(3, 3), nil}, + {oldRc(0), nil}, + // cleanup annotations + {newRc(3, 3), nil}, + {newRc(3, 3), nil}, + {newRc(3, 3), nil}, + }, + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 3, scaling down foo-v1 from 10 to 0 (scale up first by 2 each interval) +Scaling foo-v2 up to 2 +Scaling foo-v1 down to 8 +Scaling foo-v2 up to 3 +Scaling foo-v1 down to 0 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc: oldRc(4), + newRc: newRc(4, 4), + accepted: true, + percent: Percent(-50), + responses: []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // scaling iteration + {oldRc(2), nil}, + {newRc(2, 4), nil}, + // scaling iteration + {oldRc(0), nil}, + {newRc(4, 4), nil}, + // cleanup annotations + {newRc(4, 4), nil}, + {newRc(4, 4), nil}, + {newRc(4, 4), nil}, + }, + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 4, scaling down foo-v1 from 4 to 0 (scale down first by 2 each interval) +Scaling foo-v1 down to 2 +Scaling foo-v2 up to 2 +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 4 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc: oldRc(2), + newRc: newRc(4, 4), + accepted: true, + percent: Percent(-50), + responses: []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // scaling iteration + {oldRc(0), nil}, + {newRc(4, 4), nil}, + // cleanup annotations + {newRc(4, 4), nil}, + {newRc(4, 4), nil}, + {newRc(4, 4), nil}, + }, + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 4, scaling down foo-v1 from 2 to 0 (scale down first by 2 each interval) +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 4 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc: oldRc(4), + newRc: newRc(2, 2), + accepted: true, + percent: Percent(-50), + responses: []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // scaling iteration + {oldRc(3), nil}, + {newRc(1, 2), nil}, + // scaling iteration + {oldRc(2), nil}, + {newRc(2, 2), nil}, + // scaling iteration + {oldRc(0), nil}, + // cleanup annotations + {newRc(2, 2), nil}, + {newRc(2, 2), nil}, + {newRc(2, 2), nil}, + }, + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 4 to 0 (scale down first by 1 each interval) +Scaling foo-v1 down to 3 +Scaling foo-v2 up to 1 +Scaling foo-v1 down to 2 +Scaling foo-v2 up to 2 +Scaling foo-v1 down to 0 +Update succeeded. Deleting foo-v1 +`, + }, { + oldRc: oldRc(4), + newRc: newRc(4, 4), + accepted: true, + percent: Percent(-100), + responses: []fakeResponse{ + // no existing newRc + {nil, fmt.Errorf("not found")}, + // scaling iteration + {oldRc(0), nil}, + {newRc(4, 4), nil}, + // cleanup annotations + {newRc(4, 4), nil}, + {newRc(4, 4), nil}, + {newRc(4, 4), nil}, + }, + output: `Creating foo-v2 +Scaling up foo-v2 from 0 to 4, scaling down foo-v1 from 4 to 0 (scale down first by 4 each interval) +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 4 Update succeeded. Deleting foo-v1 `, }, } for _, test := range tests { + client := NewRollingUpdaterClient(fakeClientFor("default", test.responses)) updater := RollingUpdater{ - NewRollingUpdaterClient(fakeClientFor("default", test.responses)), - "default", + c: client, + ns: "default", + scaleAndWait: func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) { + return client.GetReplicationController(rc.Namespace, rc.Name) + }, } var buffer bytes.Buffer - config := &RollingUpdaterConfig{ - Out: &buffer, - OldRc: test.oldRc, - NewRc: test.newRc, - UpdatePeriod: 0, - Interval: time.Millisecond, - Timeout: time.Millisecond, - CleanupPolicy: DeleteRollingUpdateCleanupPolicy, + acceptor := &testAcceptor{ + accept: func(rc *api.ReplicationController) error { + if test.accepted { + return nil + } + return fmt.Errorf("rejecting controller %s", rc.Name) + }, } - if err := updater.Update(config); err != nil { + config := &RollingUpdaterConfig{ + Out: &buffer, + OldRc: test.oldRc, + NewRc: test.newRc, + UpdatePeriod: 0, + Interval: time.Millisecond, + Timeout: time.Millisecond, + CleanupPolicy: DeleteRollingUpdateCleanupPolicy, + UpdateAcceptor: acceptor, + UpdatePercent: test.percent, + } + err := updater.Update(config) + if test.accepted && err != nil { t.Errorf("Update failed: %v", err) } + if !test.accepted && err == nil { + t.Errorf("Expected update to fail") + } if buffer.String() != test.output { t.Errorf("Bad output. expected:\n%s\ngot:\n%s", test.output, buffer.String()) } @@ -304,41 +534,47 @@ func PTestUpdateRecovery(t *testing.T) { rcExisting := newRc(1, 3) output := `Continuing update with existing controller foo-v2. -Updating foo-v1 replicas: 1, foo-v2 replicas: 2 -Updating foo-v1 replicas: 0, foo-v2 replicas: 3 +Scaling up foo-v2 from 1 to 3, scaling down foo-v1 from 2 to 0 (scale up first by 1 each interval) +Scaling foo-v2 to 2 +Scaling foo-v1 to 1 +Scaling foo-v2 to 3 +Scaling foo-v2 to 0 Update succeeded. Deleting foo-v1 ` responses := []fakeResponse{ // Existing newRc {rcExisting, nil}, - // 3 gets for each scale - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, + // scaling iteration {newRc(2, 2), nil}, {oldRc(1), nil}, - {oldRc(1), nil}, - {oldRc(1), nil}, + // scaling iteration {newRc(3, 3), nil}, - {newRc(3, 3), nil}, - {newRc(3, 3), nil}, - {oldRc(0), nil}, - {oldRc(0), nil}, {oldRc(0), nil}, // cleanup annotations {newRc(3, 3), nil}, {newRc(3, 3), nil}, + {newRc(3, 3), nil}, + } + + client := NewRollingUpdaterClient(fakeClientFor("default", responses)) + updater := RollingUpdater{ + c: client, + ns: "default", + scaleAndWait: func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) { + return client.GetReplicationController(rc.Namespace, rc.Name) + }, } - updater := RollingUpdater{NewRollingUpdaterClient(fakeClientFor("default", responses)), "default"} var buffer bytes.Buffer config := &RollingUpdaterConfig{ - Out: &buffer, - OldRc: rc, - NewRc: rcExisting, - UpdatePeriod: 0, - Interval: time.Millisecond, - Timeout: time.Millisecond, - CleanupPolicy: DeleteRollingUpdateCleanupPolicy, + Out: &buffer, + OldRc: rc, + NewRc: rcExisting, + UpdatePeriod: 0, + Interval: time.Millisecond, + Timeout: time.Millisecond, + CleanupPolicy: DeleteRollingUpdateCleanupPolicy, + UpdateAcceptor: DefaultUpdateAcceptor, } if err := updater.Update(config); err != nil { t.Errorf("Update failed: %v", err) @@ -354,46 +590,49 @@ func TestRollingUpdater_preserveCleanup(t *testing.T) { rc := oldRc(2) rcExisting := newRc(1, 3) - updater := &RollingUpdater{ - ns: "default", - c: &rollingUpdaterClientImpl{ - GetReplicationControllerFn: func(namespace, name string) (*api.ReplicationController, error) { - switch name { - case rc.Name: - return rc, nil - case rcExisting.Name: - return rcExisting, nil - default: - return nil, fmt.Errorf("unexpected get call for %s/%s", namespace, name) - } - }, - UpdateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) { + client := &rollingUpdaterClientImpl{ + GetReplicationControllerFn: func(namespace, name string) (*api.ReplicationController, error) { + switch name { + case rc.Name: return rc, nil - }, - CreateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) { - t.Fatalf("unexpected call to create %s/rc:%#v", namespace, rc) - return nil, nil - }, - DeleteReplicationControllerFn: func(namespace, name string) error { - t.Fatalf("unexpected call to delete %s/%s", namespace, name) - return nil - }, - ControllerHasDesiredReplicasFn: func(rc *api.ReplicationController) wait.ConditionFunc { - return func() (done bool, err error) { - return true, nil - } - }, + case rcExisting.Name: + return rcExisting, nil + default: + return nil, fmt.Errorf("unexpected get call for %s/%s", namespace, name) + } }, + UpdateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) { + return rc, nil + }, + CreateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) { + t.Fatalf("unexpected call to create %s/rc:%#v", namespace, rc) + return nil, nil + }, + DeleteReplicationControllerFn: func(namespace, name string) error { + t.Fatalf("unexpected call to delete %s/%s", namespace, name) + return nil + }, + ControllerHasDesiredReplicasFn: func(rc *api.ReplicationController) wait.ConditionFunc { + return func() (done bool, err error) { + return true, nil + } + }, + } + updater := &RollingUpdater{ + ns: "default", + c: client, + scaleAndWait: scalerScaleAndWait(client, "default"), } config := &RollingUpdaterConfig{ - Out: ioutil.Discard, - OldRc: rc, - NewRc: rcExisting, - UpdatePeriod: 0, - Interval: time.Millisecond, - Timeout: time.Millisecond, - CleanupPolicy: PreserveRollingUpdateCleanupPolicy, + Out: ioutil.Discard, + OldRc: rc, + NewRc: rcExisting, + UpdatePeriod: 0, + Interval: time.Millisecond, + Timeout: time.Millisecond, + CleanupPolicy: PreserveRollingUpdateCleanupPolicy, + UpdateAcceptor: DefaultUpdateAcceptor, } err := updater.Update(config) if err != nil { @@ -855,3 +1094,11 @@ func (c *rollingUpdaterClientImpl) DeleteReplicationController(namespace, name s func (c *rollingUpdaterClientImpl) ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc { return c.ControllerHasDesiredReplicasFn(rc) } + +type testAcceptor struct { + accept func(*api.ReplicationController) error +} + +func (a *testAcceptor) Accept(rc *api.ReplicationController) error { + return a.accept(rc) +}