Merge pull request #21273 from kargakis/rolling-updater-fix

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-02-23 04:50:59 -08:00
commit d3661414a1
2 changed files with 62 additions and 45 deletions

View File

@ -113,10 +113,8 @@ type RollingUpdater struct {
getOrCreateTargetController func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) getOrCreateTargetController func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error)
// cleanup performs post deployment cleanup tasks for newRc and oldRc. // cleanup performs post deployment cleanup tasks for newRc and oldRc.
cleanup func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error cleanup func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error
// waitForReadyPods should block until there are >0 total pods ready amongst // getReadyPods returns the amount of old and new ready pods.
// the old and new controllers, and should return the amount of old and new getReadyPods func(oldRc, newRc *api.ReplicationController) (int, int, error)
// ready.
waitForReadyPods func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error)
} }
// NewRollingUpdater creates a RollingUpdater from a client. // NewRollingUpdater creates a RollingUpdater from a client.
@ -128,7 +126,7 @@ func NewRollingUpdater(namespace string, client client.Interface) *RollingUpdate
// Inject real implementations. // Inject real implementations.
updater.scaleAndWait = updater.scaleAndWaitWithScaler updater.scaleAndWait = updater.scaleAndWaitWithScaler
updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient
updater.waitForReadyPods = updater.pollForReadyPods updater.getReadyPods = updater.readyPods
updater.cleanup = updater.cleanupWithClients updater.cleanup = updater.cleanupWithClients
return updater return updater
} }
@ -299,7 +297,7 @@ func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, origin
return scaledRc, nil return scaledRc, nil
} }
// scaleDown scales down oldRc to 0 at whatever increment possible given the // scaleDown scales down oldRc to 0 at whatever decrement possible given the
// thresholds defined on the config. scaleDown will safely no-op as necessary // thresholds defined on the config. scaleDown will safely no-op as necessary
// when it detects redundancy or other relevant conditions. // when it detects redundancy or other relevant conditions.
func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int, config *RollingUpdaterConfig) (*api.ReplicationController, error) { func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
@ -307,15 +305,19 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi
if oldRc.Spec.Replicas == 0 { if oldRc.Spec.Replicas == 0 {
return oldRc, nil return oldRc, nil
} }
// Block until there are any pods ready. // Get ready pods. We shouldn't block, otherwise in case both old and new
_, newAvailable, err := r.waitForReadyPods(config.Interval, config.Timeout, oldRc, newRc) // pods are unavailable then the rolling update process blocks.
// Timeout-wise we are already covered by the progress check.
_, newAvailable, err := r.getReadyPods(oldRc, newRc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// The old controller is considered as part of the total because we want to // The old controller is considered as part of the total because we want to
// maintain minimum availability even with a volatile old controller. // maintain minimum availability even with a volatile old controller.
// Scale down as much as possible while maintaining minimum availability // Scale down as much as possible while maintaining minimum availability
decrement := oldRc.Spec.Replicas + newAvailable - minAvailable allPods := oldRc.Spec.Replicas + newRc.Spec.Replicas
newUnavailable := newRc.Spec.Replicas - newAvailable
decrement := allPods - minAvailable - newUnavailable
// The decrement normally shouldn't drop below 0 because the available count // The decrement normally shouldn't drop below 0 because the available count
// always starts below the old replica count, but the old replica count can // always starts below the old replica count, but the old replica count can
// decrement due to externalities like pods death in the replica set. This // decrement due to externalities like pods death in the replica set. This
@ -360,40 +362,34 @@ func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, r
return r.c.ReplicationControllers(rc.Namespace).Get(rc.Name) return r.c.ReplicationControllers(rc.Namespace).Get(rc.Name)
} }
// pollForReadyPods polls oldRc and newRc each interval and returns the old // readyPods returns the old and new ready counts for their pods.
// and new ready counts for their pods. If a pod is observed as being ready, // If a pod is observed as being ready, it's considered ready even
// it's considered ready even if it later becomes notReady. // if it later becomes notReady.
func (r *RollingUpdater) pollForReadyPods(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) { func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController) (int, int, error) {
controllers := []*api.ReplicationController{oldRc, newRc} controllers := []*api.ReplicationController{oldRc, newRc}
oldReady := 0 oldReady := 0
newReady := 0 newReady := 0
err := wait.Poll(interval, timeout, func() (done bool, err error) {
anyReady := false for i := range controllers {
for _, controller := range controllers { controller := controllers[i]
selector := labels.Set(controller.Spec.Selector).AsSelector() selector := labels.Set(controller.Spec.Selector).AsSelector()
options := api.ListOptions{LabelSelector: selector} options := api.ListOptions{LabelSelector: selector}
pods, err := r.c.Pods(controller.Namespace).List(options) pods, err := r.c.Pods(controller.Namespace).List(options)
if err != nil { if err != nil {
return false, err return 0, 0, err
} }
for _, pod := range pods.Items { for _, pod := range pods.Items {
if api.IsPodReady(&pod) { if api.IsPodReady(&pod) {
switch controller.Name { switch controller.Name {
case oldRc.Name: case oldRc.Name:
oldReady++ oldReady++
case newRc.Name: case newRc.Name:
newReady++ newReady++
}
anyReady = true
} }
} }
} }
if anyReady { }
return true, nil return oldReady, newReady, nil
}
return false, nil
})
return oldReady, newReady, err
} }
// getOrCreateTargetControllerWithClient looks for an existing controller with // getOrCreateTargetControllerWithClient looks for an existing controller with

View File

@ -634,6 +634,27 @@ Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 30 to 0 (keep 1 pods ava
Scaling foo-v1 down to 1 Scaling foo-v1 down to 1
Scaling foo-v2 up to 2 Scaling foo-v2 up to 2
Scaling foo-v1 down to 0 Scaling foo-v1 down to 0
`,
},
{
name: "2->2 1/0 blocked oldRc",
oldRc: oldRc(2, 2),
newRc: newRc(0, 2),
newRcExists: false,
maxUnavail: intstr.FromInt(1),
maxSurge: intstr.FromInt(0),
expected: []interface{}{
down{oldReady: 1, newReady: 0, to: 1},
up{1},
down{oldReady: 1, newReady: 1, to: 0},
up{2},
},
output: `Created foo-v2
Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 2 to 0 (keep 1 pods available, don't exceed 2 pods)
Scaling foo-v1 down to 1
Scaling foo-v2 up to 1
Scaling foo-v1 down to 0
Scaling foo-v2 up to 2
`, `,
}, },
} }
@ -685,10 +706,10 @@ Scaling foo-v1 down to 0
expected := -1 expected := -1
switch { switch {
case rc == test.newRc: case rc == test.newRc:
t.Logf("scaling up %s:%d", rc.Name, rc.Spec.Replicas) t.Logf("scaling up %s to %d", rc.Name, rc.Spec.Replicas)
expected = next(&upTo) expected = next(&upTo)
case rc == test.oldRc: case rc == test.oldRc:
t.Logf("scaling down %s:%d", rc.Name, rc.Spec.Replicas) t.Logf("scaling down %s to %d", rc.Name, rc.Spec.Replicas)
expected = next(&downTo) expected = next(&downTo)
} }
if expected == -1 { if expected == -1 {
@ -709,13 +730,13 @@ Scaling foo-v1 down to 0
}, },
} }
// Set up a mock readiness check which handles the test assertions. // Set up a mock readiness check which handles the test assertions.
updater.waitForReadyPods = func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) { updater.getReadyPods = func(oldRc, newRc *api.ReplicationController) (int, int, error) {
// Return simulated readiness, and throw an error if this call has no // Return simulated readiness, and throw an error if this call has no
// expectations defined. // expectations defined.
oldReady := next(&oldReady) oldReady := next(&oldReady)
newReady := next(&newReady) newReady := next(&newReady)
if oldReady == -1 || newReady == -1 { if oldReady == -1 || newReady == -1 {
t.Fatalf("unexpected waitForReadyPods call for:\noldRc: %+v\nnewRc: %+v", oldRc, newRc) t.Fatalf("unexpected getReadyPods call for:\noldRc: %+v\nnewRc: %+v", oldRc, newRc)
} }
return oldReady, newReady, nil return oldReady, newReady, nil
} }
@ -759,7 +780,7 @@ func TestUpdate_progressTimeout(t *testing.T) {
return nil return nil
}, },
} }
updater.waitForReadyPods = func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) { updater.getReadyPods = func(oldRc, newRc *api.ReplicationController) (int, int, error) {
// Coerce a timeout by pods never becoming ready. // Coerce a timeout by pods never becoming ready.
return 0, 0, nil return 0, 0, nil
} }
@ -812,7 +833,7 @@ func TestUpdate_assignOriginalAnnotation(t *testing.T) {
cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error { cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
return nil return nil
}, },
waitForReadyPods: func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) { getReadyPods: func(oldRc, newRc *api.ReplicationController) (int, int, error) {
return 1, 1, nil return 1, 1, nil
}, },
} }
@ -1442,7 +1463,7 @@ func TestAddDeploymentHash(t *testing.T) {
} }
} }
func TestRollingUpdater_pollForReadyPods(t *testing.T) { func TestRollingUpdater_readyPods(t *testing.T) {
mkpod := func(owner *api.ReplicationController, ready bool) *api.Pod { mkpod := func(owner *api.ReplicationController, ready bool) *api.Pod {
labels := map[string]string{} labels := map[string]string{}
for k, v := range owner.Spec.Selector { for k, v := range owner.Spec.Selector {
@ -1538,7 +1559,7 @@ func TestRollingUpdater_pollForReadyPods(t *testing.T) {
ns: "default", ns: "default",
c: client, c: client,
} }
oldReady, newReady, err := updater.pollForReadyPods(time.Millisecond, time.Second, test.oldRc, test.newRc) oldReady, newReady, err := updater.readyPods(test.oldRc, test.newRc)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }