Rolling updater enhancements

Add an UpdateAcceptor interface to the rolling updater which supports
injecting code to validate the first replica during scale-up. If the
replica is not accepted, the deployment fails. This facilitates canary
checking so that many broken replicas aren't rolled out during an update.

Make the rolling update scale amount configurable as a percent of the replica
count; a negative value changes the scale direction to down/up to support
in-place deployments.
This commit is contained in:
Dan Mace 2015-06-18 16:29:28 -04:00
parent 7645513d2a
commit d044208b38
3 changed files with 546 additions and 208 deletions

View File

@ -265,6 +265,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
Interval: interval, Interval: interval,
Timeout: timeout, Timeout: timeout,
CleanupPolicy: updateCleanupPolicy, CleanupPolicy: updateCleanupPolicy,
UpdateAcceptor: kubectl.DefaultUpdateAcceptor,
} }
if cmdutil.GetFlagBool(cmd, "rollback") { if cmdutil.GetFlagBool(cmd, "rollback") {
kubectl.AbortRollingUpdate(config) kubectl.AbortRollingUpdate(config)

View File

@ -20,6 +20,7 @@ import (
goerrors "errors" goerrors "errors"
"fmt" "fmt"
"io" "io"
"math"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -39,8 +40,14 @@ type RollingUpdater struct {
c RollingUpdaterClient c RollingUpdaterClient
// Namespace for resources // Namespace for resources
ns string ns string
// scaleAndWait scales a controller and returns its updated state.
scaleAndWait scaleAndWait
} }
// scaleAndWait scales rc and returns its updated state. This typedef is to
// abstract away the use of a Scaler to ease testing.
type scaleAndWait func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error)
// RollingUpdaterConfig is the configuration for a rolling deployment process. // RollingUpdaterConfig is the configuration for a rolling deployment process.
type RollingUpdaterConfig struct { type RollingUpdaterConfig struct {
// Out is a writer for progress output. // Out is a writer for progress output.
@ -60,6 +67,16 @@ type RollingUpdaterConfig struct {
// CleanupPolicy defines the cleanup action to take after the deployment is // CleanupPolicy defines the cleanup action to take after the deployment is
// complete. // complete.
CleanupPolicy RollingUpdaterCleanupPolicy CleanupPolicy RollingUpdaterCleanupPolicy
// UpdateAcceptor is given a chance to accept the new controller after each
// scale-up operation. If the controller is accepted, updates continue; if
// the controller is rejected, the update will fail immediately.
UpdateAcceptor UpdateAcceptor
// UpdatePercent is optional; if specified, the amount of replicas scaled up
// and down each interval will be computed as a percentage of the desired
// replicas for the new RC. If UpdatePercent is nil, one replica will be
// scaled up and down each interval. If UpdatePercent is negative, the order
// of scaling will be down/up instead of up/down.
UpdatePercent *int
} }
// RollingUpdaterCleanupPolicy is a cleanup action to take after the // RollingUpdaterCleanupPolicy is a cleanup action to take after the
@ -76,6 +93,26 @@ const (
RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename" RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename"
) )
// UpdateAcceptor is given a chance to accept or reject the new controller
// during a deployment each time the controller is scaled up.
//
// After the successful scale-up of the controller, the controller is given to
// the UpdateAcceptor. If the UpdateAcceptor rejects the controller, the
// deployment is stopped with an error.
type UpdateAcceptor interface {
// Accept returns nil if the controller is okay, otherwise returns an error.
Accept(*api.ReplicationController) error
}
// AlwaysAccept is an UpdateAcceptor which always accepts the controller.
type AlwaysAccept struct{}
// Accept implements UpdateAcceptor.
func (a *AlwaysAccept) Accept(*api.ReplicationController) error { return nil }
// DefaultUpdaterAcceptor always accepts controllers.
var DefaultUpdateAcceptor UpdateAcceptor = &AlwaysAccept{}
func LoadExistingNextReplicationController(c *client.Client, namespace, newName string) (*api.ReplicationController, error) { func LoadExistingNextReplicationController(c *client.Client, namespace, newName string) (*api.ReplicationController, error) {
if len(newName) == 0 { if len(newName) == 0 {
return nil, nil return nil, nil
@ -121,10 +158,12 @@ func CreateNewControllerFromCurrentController(c *client.Client, namespace, oldNa
} }
// NewRollingUpdater creates a RollingUpdater from a client // NewRollingUpdater creates a RollingUpdater from a client
func NewRollingUpdater(namespace string, c RollingUpdaterClient) *RollingUpdater { func NewRollingUpdater(namespace string, client RollingUpdaterClient) *RollingUpdater {
return &RollingUpdater{ return &RollingUpdater{
c, c: client,
namespace, ns: namespace,
// Use a real scaleAndWait implementation.
scaleAndWait: scalerScaleAndWait(client, namespace),
} }
} }
@ -175,6 +214,21 @@ func UpdateExistingReplicationController(c client.Interface, oldRc *api.Replicat
} }
} }
// scalerScaleAndWait returns a scaleAndWait function which scales a
// controller using a Scaler and a real client.
func scalerScaleAndWait(client RollingUpdaterClient, namespace string) scaleAndWait {
scaler, err := ScalerFor("ReplicationController", client)
return func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
if err != nil {
return nil, fmt.Errorf("Couldn't make scaler: %s", err)
}
if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil {
return nil, err
}
return client.GetReplicationController(namespace, rc.ObjectMeta.Name)
}
}
const MaxRetries = 3 const MaxRetries = 3
func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) { func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
@ -297,10 +351,14 @@ func FindSourceController(r RollingUpdaterClient, namespace, name string) (*api.
} }
// Update all pods for a ReplicationController (oldRc) by creating a new // Update all pods for a ReplicationController (oldRc) by creating a new
// controller (newRc) with 0 replicas, and synchronously scaling oldRc,newRc // controller (newRc) with 0 replicas, and synchronously scaling oldRc and
// by 1 until oldRc has 0 replicas and newRc has the original # of desired // newRc until oldRc has 0 replicas and newRc has the original # of desired
// replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy. // replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy.
// //
// The scaling amount each interval is either 1 or based on a percent of the
// desired replicas. If a percentage is used and the percentage is negative,
// the scaling order is inverted to down/up instead of the default up/down.
//
// If an update from newRc to oldRc is already in progress, we attempt to // If an update from newRc to oldRc is already in progress, we attempt to
// drive it to completion. If an error occurs at any step of the update, the // drive it to completion. If an error occurs at any step of the update, the
// error will be returned. // error will be returned.
@ -353,51 +411,50 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
} }
} }
// +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas // Compute the scale amount based on a percentage of the new desired count.
for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 { // A negative percentage indicates our scale direction should be down-first.
newRc.Spec.Replicas += 1 scaleAmount := 1
oldRc.Spec.Replicas -= 1 skipFirstUp := false
fmt.Printf("At beginning of loop: %s replicas: %d, %s replicas: %d\n", if config.UpdatePercent != nil {
oldName, oldRc.Spec.Replicas, scaleAmount = int(math.Ceil(float64(desired) * (math.Abs(float64(*config.UpdatePercent)) / 100)))
newName, newRc.Spec.Replicas) if *config.UpdatePercent < 0 {
fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n", skipFirstUp = true
oldName, oldRc.Spec.Replicas, }
newName, newRc.Spec.Replicas) }
// Helpful output about what we're about to do.
direction := "up"
if skipFirstUp {
direction = "down"
}
fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (scale %s first by %d each interval)\n",
newRc.Name, newRc.Spec.Replicas, desired, oldRc.Name, oldRc.Spec.Replicas, direction, scaleAmount)
newRc, err = r.scaleAndWait(newRc, retry, waitForReplicas) // Scale newRc and oldRc until newRc has the desired number of replicas and
// oldRc has 0 replicas.
for newRc.Spec.Replicas != desired || oldRc.Spec.Replicas != 0 {
// Choose up/down vs. down/up scaling direction.
if !skipFirstUp {
scaledRc, err := r.scaleUp(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config)
if err != nil { if err != nil {
return err return err
} }
newRc = scaledRc
time.Sleep(updatePeriod) time.Sleep(updatePeriod)
oldRc, err = r.scaleAndWait(oldRc, retry, waitForReplicas) skipFirstUp = true
}
scaledRc, err := r.scaleDown(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config)
if err != nil { if err != nil {
return err return err
} }
fmt.Printf("At end of loop: %s replicas: %d, %s replicas: %d\n", rc = scaledRc
oldName, oldRc.Spec.Replicas, time.Sleep(updatePeriod)
newName, newRc.Spec.Replicas) scaledRc, err = r.scaleUp(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config)
}
// delete remaining replicas on oldRc
if oldRc.Spec.Replicas != 0 {
fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n",
oldName, oldRc.Spec.Replicas, 0)
oldRc.Spec.Replicas = 0
oldRc, err = r.scaleAndWait(oldRc, retry, waitForReplicas)
// oldRc, err = r.scaleAndWait(oldRc, interval, timeout)
if err != nil {
return err
}
}
// add remaining replicas on newRc
if newRc.Spec.Replicas != desired {
fmt.Fprintf(out, "Scaling %s replicas: %d -> %d\n",
newName, newRc.Spec.Replicas, desired)
newRc.Spec.Replicas = desired
newRc, err = r.scaleAndWait(newRc, retry, waitForReplicas)
if err != nil { if err != nil {
return err return err
} }
newRc = scaledRc
} }
// Clean up annotations // Clean up annotations
if newRc, err = r.c.GetReplicationController(r.ns, newName); err != nil { if newRc, err = r.c.GetReplicationController(r.ns, newName); err != nil {
return err return err
@ -429,6 +486,50 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
} }
} }
// scaleUp scales up newRc to desired by scaleAmount. It accounts for
// fencepost conditions. If newRc is already scaled to desired, scaleUp does
// nothing. If the oldRc is already scaled to 0, newRc is scaled to desired
// immediately regardless of scale count.
func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, desired, scaleAmount int, retry, wait *RetryParams, out io.Writer, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
if newRc.Spec.Replicas == desired {
return newRc, nil
}
newRc.Spec.Replicas += scaleAmount
if newRc.Spec.Replicas > desired || oldRc.Spec.Replicas == 0 {
newRc.Spec.Replicas = desired
}
fmt.Fprintf(out, "Scaling %s up to %d\n", newRc.Name, newRc.Spec.Replicas)
scaledRc, err := r.scaleAndWait(newRc, retry, wait)
if err != nil {
return nil, err
}
err = config.UpdateAcceptor.Accept(scaledRc)
if err != nil {
return nil, fmt.Errorf("update rejected for %s: %v", scaledRc.Name, err)
}
return scaledRc, nil
}
// scaleDown scales down oldRc to 0 by scaleAmount. It accounts for fencepost
// conditions. If oldRc is already scaled to 0, scaleDown does nothing. If
// newRc is already scaled to desired, oldRc is scaled to 0 immediately
// regardless of scaleAmount.
func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, scaleAmount int, retry, wait *RetryParams, out io.Writer, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
if oldRc.Spec.Replicas == 0 {
return oldRc, nil
}
oldRc.Spec.Replicas -= scaleAmount
if oldRc.Spec.Replicas < 0 || newRc.Spec.Replicas == desired {
oldRc.Spec.Replicas = 0
}
fmt.Fprintf(out, "Scaling %s down to %d\n", oldRc.Name, oldRc.Spec.Replicas)
scaledRc, err := r.scaleAndWait(oldRc, retry, wait)
if err != nil {
return nil, err
}
return scaledRc, nil
}
func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.ReplicationController, existing bool, err error) { func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.ReplicationController, existing bool, err error) {
if rc, err = r.c.GetReplicationController(r.ns, name); err == nil { if rc, err = r.c.GetReplicationController(r.ns, name); err == nil {
existing = true existing = true
@ -444,17 +545,6 @@ func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.Replic
return return
} }
func (r *RollingUpdater) scaleAndWait(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
scaler, err := ScalerFor("ReplicationController", r.c)
if err != nil {
return nil, err
}
if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil {
return nil, err
}
return r.c.GetReplicationController(r.ns, rc.ObjectMeta.Name)
}
func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, timeout time.Duration) (*api.ReplicationController, error) { func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, timeout time.Duration) (*api.ReplicationController, error) {
rc, err := r.c.UpdateReplicationController(r.ns, rc) rc, err := r.c.UpdateReplicationController(r.ns, rc)
if err != nil { if err != nil {

View File

@ -132,154 +132,378 @@ func newRc(replicas int, desired int) *api.ReplicationController {
} }
func TestUpdate(t *testing.T) { func TestUpdate(t *testing.T) {
// Helpers
Percent := func(p int) *int {
return &p
}
var NilPercent *int
// Scenarios
tests := []struct { tests := []struct {
oldRc, newRc *api.ReplicationController oldRc, newRc *api.ReplicationController
accepted bool
percent *int
responses []fakeResponse responses []fakeResponse
output string output string
}{ }{
{ {
oldRc(1), newRc(1, 1), oldRc: oldRc(1),
[]fakeResponse{ newRc: newRc(1, 1),
accepted: true,
percent: NilPercent,
responses: []fakeResponse{
// no existing newRc // no existing newRc
{nil, fmt.Errorf("not found")}, {nil, fmt.Errorf("not found")},
// 4 gets for each scale // scaling iteration
{newRc(1, 1), nil},
{newRc(1, 1), nil},
{newRc(1, 1), nil},
{newRc(1, 1), nil},
{newRc(1, 1), nil},
{newRc(1, 1), nil}, {newRc(1, 1), nil},
{oldRc(0), nil}, {oldRc(0), nil},
{oldRc(0), nil},
{oldRc(0), nil},
// {oldRc(0), nil},
// cleanup annotations // cleanup annotations
{newRc(1, 1), nil}, {newRc(1, 1), nil},
{newRc(1, 1), nil}, {newRc(1, 1), nil},
{newRc(1, 1), nil},
}, },
`Creating foo-v2 output: `Creating foo-v2
Updating foo-v1 replicas: 0, foo-v2 replicas: 1 Scaling up foo-v2 from 0 to 1, scaling down foo-v1 from 1 to 0 (scale up first by 1 each interval)
Scaling foo-v2 up to 1
Scaling foo-v1 down to 0
Update succeeded. Deleting foo-v1 Update succeeded. Deleting foo-v1
`, `,
}, { }, {
oldRc(2), newRc(2, 2), oldRc: oldRc(1),
[]fakeResponse{ newRc: newRc(1, 1),
accepted: true,
percent: NilPercent,
responses: []fakeResponse{
// no existing newRc // no existing newRc
{nil, fmt.Errorf("not found")}, {nil, fmt.Errorf("not found")},
// 4 gets for each scale // scaling iteration
{newRc(1, 2), nil}, {newRc(1, 1), nil},
{newRc(1, 2), nil}, {oldRc(0), nil},
{newRc(1, 2), nil}, // cleanup annotations
{newRc(1, 2), nil}, {newRc(1, 1), nil},
{newRc(1, 2), nil}, {newRc(1, 1), nil},
{newRc(1, 2), nil}, {newRc(1, 1), nil},
},
output: `Creating foo-v2
Scaling up foo-v2 from 0 to 1, scaling down foo-v1 from 1 to 0 (scale up first by 1 each interval)
Scaling foo-v2 up to 1
Scaling foo-v1 down to 0
Update succeeded. Deleting foo-v1
`,
}, {
oldRc: oldRc(2),
newRc: newRc(2, 2),
accepted: true,
percent: NilPercent,
responses: []fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// scaling iteration
{newRc(1, 2), nil}, {newRc(1, 2), nil},
{oldRc(1), nil}, {oldRc(1), nil},
{oldRc(1), nil}, // scaling iteration
{oldRc(1), nil},
// {oldRc(1), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{oldRc(0), nil}, {oldRc(0), nil},
{oldRc(0), nil},
{oldRc(0), nil},
// {oldRc(0), nil},
// cleanup annotations // cleanup annotations
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(1, 1), nil},
}, },
`Creating foo-v2 output: `Creating foo-v2
Updating foo-v1 replicas: 1, foo-v2 replicas: 1 Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 2 to 0 (scale up first by 1 each interval)
Updating foo-v1 replicas: 0, foo-v2 replicas: 2 Scaling foo-v2 up to 1
Scaling foo-v1 down to 1
Scaling foo-v2 up to 2
Scaling foo-v1 down to 0
Update succeeded. Deleting foo-v1 Update succeeded. Deleting foo-v1
`, `,
}, { }, {
oldRc(2), newRc(7, 7), oldRc: oldRc(2),
[]fakeResponse{ newRc: newRc(7, 7),
accepted: true,
percent: NilPercent,
responses: []fakeResponse{
// no existing newRc // no existing newRc
{nil, fmt.Errorf("not found")}, {nil, fmt.Errorf("not found")},
// 4 gets for each scale // scaling iteration
{newRc(1, 2), nil}, {newRc(1, 7), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{oldRc(1), nil}, {oldRc(1), nil},
{oldRc(1), nil}, // scaling iteration
{oldRc(1), nil}, {newRc(2, 7), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{oldRc(0), nil},
{oldRc(0), nil},
{oldRc(0), nil}, {oldRc(0), nil},
// final scale on newRc // final scale on newRc
{newRc(7, 7), nil}, {newRc(7, 7), nil},
{newRc(7, 7), nil},
{newRc(7, 7), nil},
{newRc(7, 7), nil},
// cleanup annotations // cleanup annotations
{newRc(7, 7), nil}, {newRc(7, 7), nil},
{newRc(7, 7), nil}, {newRc(7, 7), nil},
{newRc(7, 7), nil},
}, },
`Creating foo-v2 output: `Creating foo-v2
Updating foo-v1 replicas: 1, foo-v2 replicas: 1 Scaling up foo-v2 from 0 to 7, scaling down foo-v1 from 2 to 0 (scale up first by 1 each interval)
Updating foo-v1 replicas: 0, foo-v2 replicas: 2 Scaling foo-v2 up to 1
Scaling foo-v2 replicas: 2 -> 7 Scaling foo-v1 down to 1
Scaling foo-v2 up to 2
Scaling foo-v1 down to 0
Scaling foo-v2 up to 7
Update succeeded. Deleting foo-v1 Update succeeded. Deleting foo-v1
`, `,
}, { }, {
oldRc(7), newRc(2, 2), oldRc: oldRc(7),
[]fakeResponse{ newRc: newRc(2, 2),
accepted: true,
percent: NilPercent,
responses: []fakeResponse{
// no existing newRc // no existing newRc
{nil, fmt.Errorf("not found")}, {nil, fmt.Errorf("not found")},
// 4 gets for each update // scaling iteration
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil}, {newRc(1, 2), nil},
{oldRc(6), nil}, {oldRc(6), nil},
{oldRc(6), nil}, // scaling iteration
{oldRc(6), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{oldRc(5), nil},
{oldRc(5), nil},
{oldRc(5), nil},
// stop oldRc
{oldRc(0), nil},
{oldRc(0), nil},
{oldRc(0), nil},
{oldRc(0), nil}, {oldRc(0), nil},
// cleanup annotations // cleanup annotations
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil},
}, },
`Creating foo-v2 output: `Creating foo-v2
Updating foo-v1 replicas: 6, foo-v2 replicas: 1 Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 7 to 0 (scale up first by 1 each interval)
Updating foo-v1 replicas: 5, foo-v2 replicas: 2 Scaling foo-v2 up to 1
Stopping foo-v1 replicas: 5 -> 0 Scaling foo-v1 down to 6
Scaling foo-v2 up to 2
Scaling foo-v1 down to 0
Update succeeded. Deleting foo-v1
`,
}, {
oldRc: oldRc(7),
newRc: newRc(2, 2),
accepted: false,
percent: NilPercent,
responses: []fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// scaling iteration (only up occurs since the update is rejected)
{newRc(1, 2), nil},
},
output: `Creating foo-v2
Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 7 to 0 (scale up first by 1 each interval)
Scaling foo-v2 up to 1
`,
}, {
oldRc: oldRc(10),
newRc: newRc(10, 10),
accepted: true,
percent: Percent(20),
responses: []fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// scaling iteration
{newRc(2, 10), nil},
{oldRc(8), nil},
// scaling iteration
{newRc(4, 10), nil},
{oldRc(6), nil},
// scaling iteration
{newRc(6, 10), nil},
{oldRc(4), nil},
// scaling iteration
{newRc(8, 10), nil},
{oldRc(2), nil},
// scaling iteration
{newRc(10, 10), nil},
{oldRc(0), nil},
// cleanup annotations
{newRc(10, 10), nil},
{newRc(10, 10), nil},
{newRc(10, 10), nil},
},
output: `Creating foo-v2
Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (scale up first by 2 each interval)
Scaling foo-v2 up to 2
Scaling foo-v1 down to 8
Scaling foo-v2 up to 4
Scaling foo-v1 down to 6
Scaling foo-v2 up to 6
Scaling foo-v1 down to 4
Scaling foo-v2 up to 8
Scaling foo-v1 down to 2
Scaling foo-v2 up to 10
Scaling foo-v1 down to 0
Update succeeded. Deleting foo-v1
`,
}, {
oldRc: oldRc(2),
newRc: newRc(6, 6),
accepted: true,
percent: Percent(50),
responses: []fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// scaling iteration
{newRc(3, 6), nil},
{oldRc(0), nil},
// scaling iteration
{newRc(6, 6), nil},
// cleanup annotations
{newRc(6, 6), nil},
{newRc(6, 6), nil},
{newRc(6, 6), nil},
},
output: `Creating foo-v2
Scaling up foo-v2 from 0 to 6, scaling down foo-v1 from 2 to 0 (scale up first by 3 each interval)
Scaling foo-v2 up to 3
Scaling foo-v1 down to 0
Scaling foo-v2 up to 6
Update succeeded. Deleting foo-v1
`,
}, {
oldRc: oldRc(10),
newRc: newRc(3, 3),
accepted: true,
percent: Percent(50),
responses: []fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// scaling iteration
{newRc(2, 3), nil},
{oldRc(8), nil},
// scaling iteration
{newRc(3, 3), nil},
{oldRc(0), nil},
// cleanup annotations
{newRc(3, 3), nil},
{newRc(3, 3), nil},
{newRc(3, 3), nil},
},
output: `Creating foo-v2
Scaling up foo-v2 from 0 to 3, scaling down foo-v1 from 10 to 0 (scale up first by 2 each interval)
Scaling foo-v2 up to 2
Scaling foo-v1 down to 8
Scaling foo-v2 up to 3
Scaling foo-v1 down to 0
Update succeeded. Deleting foo-v1
`,
}, {
oldRc: oldRc(4),
newRc: newRc(4, 4),
accepted: true,
percent: Percent(-50),
responses: []fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// scaling iteration
{oldRc(2), nil},
{newRc(2, 4), nil},
// scaling iteration
{oldRc(0), nil},
{newRc(4, 4), nil},
// cleanup annotations
{newRc(4, 4), nil},
{newRc(4, 4), nil},
{newRc(4, 4), nil},
},
output: `Creating foo-v2
Scaling up foo-v2 from 0 to 4, scaling down foo-v1 from 4 to 0 (scale down first by 2 each interval)
Scaling foo-v1 down to 2
Scaling foo-v2 up to 2
Scaling foo-v1 down to 0
Scaling foo-v2 up to 4
Update succeeded. Deleting foo-v1
`,
}, {
oldRc: oldRc(2),
newRc: newRc(4, 4),
accepted: true,
percent: Percent(-50),
responses: []fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// scaling iteration
{oldRc(0), nil},
{newRc(4, 4), nil},
// cleanup annotations
{newRc(4, 4), nil},
{newRc(4, 4), nil},
{newRc(4, 4), nil},
},
output: `Creating foo-v2
Scaling up foo-v2 from 0 to 4, scaling down foo-v1 from 2 to 0 (scale down first by 2 each interval)
Scaling foo-v1 down to 0
Scaling foo-v2 up to 4
Update succeeded. Deleting foo-v1
`,
}, {
oldRc: oldRc(4),
newRc: newRc(2, 2),
accepted: true,
percent: Percent(-50),
responses: []fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// scaling iteration
{oldRc(3), nil},
{newRc(1, 2), nil},
// scaling iteration
{oldRc(2), nil},
{newRc(2, 2), nil},
// scaling iteration
{oldRc(0), nil},
// cleanup annotations
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
},
output: `Creating foo-v2
Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 4 to 0 (scale down first by 1 each interval)
Scaling foo-v1 down to 3
Scaling foo-v2 up to 1
Scaling foo-v1 down to 2
Scaling foo-v2 up to 2
Scaling foo-v1 down to 0
Update succeeded. Deleting foo-v1
`,
}, {
oldRc: oldRc(4),
newRc: newRc(4, 4),
accepted: true,
percent: Percent(-100),
responses: []fakeResponse{
// no existing newRc
{nil, fmt.Errorf("not found")},
// scaling iteration
{oldRc(0), nil},
{newRc(4, 4), nil},
// cleanup annotations
{newRc(4, 4), nil},
{newRc(4, 4), nil},
{newRc(4, 4), nil},
},
output: `Creating foo-v2
Scaling up foo-v2 from 0 to 4, scaling down foo-v1 from 4 to 0 (scale down first by 4 each interval)
Scaling foo-v1 down to 0
Scaling foo-v2 up to 4
Update succeeded. Deleting foo-v1 Update succeeded. Deleting foo-v1
`, `,
}, },
} }
for _, test := range tests { for _, test := range tests {
client := NewRollingUpdaterClient(fakeClientFor("default", test.responses))
updater := RollingUpdater{ updater := RollingUpdater{
NewRollingUpdaterClient(fakeClientFor("default", test.responses)), c: client,
"default", ns: "default",
scaleAndWait: func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
return client.GetReplicationController(rc.Namespace, rc.Name)
},
} }
var buffer bytes.Buffer var buffer bytes.Buffer
acceptor := &testAcceptor{
accept: func(rc *api.ReplicationController) error {
if test.accepted {
return nil
}
return fmt.Errorf("rejecting controller %s", rc.Name)
},
}
config := &RollingUpdaterConfig{ config := &RollingUpdaterConfig{
Out: &buffer, Out: &buffer,
OldRc: test.oldRc, OldRc: test.oldRc,
@ -288,10 +512,16 @@ Update succeeded. Deleting foo-v1
Interval: time.Millisecond, Interval: time.Millisecond,
Timeout: time.Millisecond, Timeout: time.Millisecond,
CleanupPolicy: DeleteRollingUpdateCleanupPolicy, CleanupPolicy: DeleteRollingUpdateCleanupPolicy,
UpdateAcceptor: acceptor,
UpdatePercent: test.percent,
} }
if err := updater.Update(config); err != nil { err := updater.Update(config)
if test.accepted && err != nil {
t.Errorf("Update failed: %v", err) t.Errorf("Update failed: %v", err)
} }
if !test.accepted && err == nil {
t.Errorf("Expected update to fail")
}
if buffer.String() != test.output { if buffer.String() != test.output {
t.Errorf("Bad output. expected:\n%s\ngot:\n%s", test.output, buffer.String()) t.Errorf("Bad output. expected:\n%s\ngot:\n%s", test.output, buffer.String())
} }
@ -304,31 +534,36 @@ func PTestUpdateRecovery(t *testing.T) {
rcExisting := newRc(1, 3) rcExisting := newRc(1, 3)
output := `Continuing update with existing controller foo-v2. output := `Continuing update with existing controller foo-v2.
Updating foo-v1 replicas: 1, foo-v2 replicas: 2 Scaling up foo-v2 from 1 to 3, scaling down foo-v1 from 2 to 0 (scale up first by 1 each interval)
Updating foo-v1 replicas: 0, foo-v2 replicas: 3 Scaling foo-v2 to 2
Scaling foo-v1 to 1
Scaling foo-v2 to 3
Scaling foo-v2 to 0
Update succeeded. Deleting foo-v1 Update succeeded. Deleting foo-v1
` `
responses := []fakeResponse{ responses := []fakeResponse{
// Existing newRc // Existing newRc
{rcExisting, nil}, {rcExisting, nil},
// 3 gets for each scale // scaling iteration
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{oldRc(1), nil}, {oldRc(1), nil},
{oldRc(1), nil}, // scaling iteration
{oldRc(1), nil},
{newRc(3, 3), nil}, {newRc(3, 3), nil},
{newRc(3, 3), nil},
{newRc(3, 3), nil},
{oldRc(0), nil},
{oldRc(0), nil},
{oldRc(0), nil}, {oldRc(0), nil},
// cleanup annotations // cleanup annotations
{newRc(3, 3), nil}, {newRc(3, 3), nil},
{newRc(3, 3), nil}, {newRc(3, 3), nil},
{newRc(3, 3), nil},
}
client := NewRollingUpdaterClient(fakeClientFor("default", responses))
updater := RollingUpdater{
c: client,
ns: "default",
scaleAndWait: func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
return client.GetReplicationController(rc.Namespace, rc.Name)
},
} }
updater := RollingUpdater{NewRollingUpdaterClient(fakeClientFor("default", responses)), "default"}
var buffer bytes.Buffer var buffer bytes.Buffer
config := &RollingUpdaterConfig{ config := &RollingUpdaterConfig{
@ -339,6 +574,7 @@ Update succeeded. Deleting foo-v1
Interval: time.Millisecond, Interval: time.Millisecond,
Timeout: time.Millisecond, Timeout: time.Millisecond,
CleanupPolicy: DeleteRollingUpdateCleanupPolicy, CleanupPolicy: DeleteRollingUpdateCleanupPolicy,
UpdateAcceptor: DefaultUpdateAcceptor,
} }
if err := updater.Update(config); err != nil { if err := updater.Update(config); err != nil {
t.Errorf("Update failed: %v", err) t.Errorf("Update failed: %v", err)
@ -354,9 +590,7 @@ func TestRollingUpdater_preserveCleanup(t *testing.T) {
rc := oldRc(2) rc := oldRc(2)
rcExisting := newRc(1, 3) rcExisting := newRc(1, 3)
updater := &RollingUpdater{ client := &rollingUpdaterClientImpl{
ns: "default",
c: &rollingUpdaterClientImpl{
GetReplicationControllerFn: func(namespace, name string) (*api.ReplicationController, error) { GetReplicationControllerFn: func(namespace, name string) (*api.ReplicationController, error) {
switch name { switch name {
case rc.Name: case rc.Name:
@ -383,7 +617,11 @@ func TestRollingUpdater_preserveCleanup(t *testing.T) {
return true, nil return true, nil
} }
}, },
}, }
updater := &RollingUpdater{
ns: "default",
c: client,
scaleAndWait: scalerScaleAndWait(client, "default"),
} }
config := &RollingUpdaterConfig{ config := &RollingUpdaterConfig{
@ -394,6 +632,7 @@ func TestRollingUpdater_preserveCleanup(t *testing.T) {
Interval: time.Millisecond, Interval: time.Millisecond,
Timeout: time.Millisecond, Timeout: time.Millisecond,
CleanupPolicy: PreserveRollingUpdateCleanupPolicy, CleanupPolicy: PreserveRollingUpdateCleanupPolicy,
UpdateAcceptor: DefaultUpdateAcceptor,
} }
err := updater.Update(config) err := updater.Update(config)
if err != nil { if err != nil {
@ -855,3 +1094,11 @@ func (c *rollingUpdaterClientImpl) DeleteReplicationController(namespace, name s
func (c *rollingUpdaterClientImpl) ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc { func (c *rollingUpdaterClientImpl) ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc {
return c.ControllerHasDesiredReplicasFn(rc) return c.ControllerHasDesiredReplicasFn(rc)
} }
type testAcceptor struct {
accept func(*api.ReplicationController) error
}
func (a *testAcceptor) Accept(rc *api.ReplicationController) error {
return a.accept(rc)
}