From 35fab99af760f1cea72680f435b4322764652f91 Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Mon, 15 Feb 2016 14:43:36 +0100 Subject: [PATCH] kubectl: scale down based on ready during rolling updates --- pkg/kubectl/rolling_updater.go | 70 ++++++++++++++--------------- pkg/kubectl/rolling_updater_test.go | 37 +++++++++++---- 2 files changed, 62 insertions(+), 45 deletions(-) diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go index d734b153dcc..6b332d0685a 100644 --- a/pkg/kubectl/rolling_updater.go +++ b/pkg/kubectl/rolling_updater.go @@ -113,10 +113,8 @@ type RollingUpdater struct { getOrCreateTargetController func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) // cleanup performs post deployment cleanup tasks for newRc and oldRc. cleanup func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error - // waitForReadyPods should block until there are >0 total pods ready amongst - // the old and new controllers, and should return the amount of old and new - // ready. - waitForReadyPods func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) + // getReadyPods returns the amount of old and new ready pods. + getReadyPods func(oldRc, newRc *api.ReplicationController) (int, int, error) } // NewRollingUpdater creates a RollingUpdater from a client. @@ -128,7 +126,7 @@ func NewRollingUpdater(namespace string, client client.Interface) *RollingUpdate // Inject real implementations. updater.scaleAndWait = updater.scaleAndWaitWithScaler updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient - updater.waitForReadyPods = updater.pollForReadyPods + updater.getReadyPods = updater.readyPods updater.cleanup = updater.cleanupWithClients return updater } @@ -299,7 +297,7 @@ func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, origin return scaledRc, nil } -// scaleDown scales down oldRc to 0 at whatever increment possible given the +// scaleDown scales down oldRc to 0 at whatever decrement possible given the // thresholds defined on the config. scaleDown will safely no-op as necessary // when it detects redundancy or other relevant conditions. func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int, config *RollingUpdaterConfig) (*api.ReplicationController, error) { @@ -307,15 +305,19 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi if oldRc.Spec.Replicas == 0 { return oldRc, nil } - // Block until there are any pods ready. - _, newAvailable, err := r.waitForReadyPods(config.Interval, config.Timeout, oldRc, newRc) + // Get ready pods. We shouldn't block, otherwise in case both old and new + // pods are unavailable then the rolling update process blocks. + // Timeout-wise we are already covered by the progress check. + _, newAvailable, err := r.getReadyPods(oldRc, newRc) if err != nil { return nil, err } // The old controller is considered as part of the total because we want to // maintain minimum availability even with a volatile old controller. // Scale down as much as possible while maintaining minimum availability - decrement := oldRc.Spec.Replicas + newAvailable - minAvailable + allPods := oldRc.Spec.Replicas + newRc.Spec.Replicas + newUnavailable := newRc.Spec.Replicas - newAvailable + decrement := allPods - minAvailable - newUnavailable // The decrement normally shouldn't drop below 0 because the available count // always starts below the old replica count, but the old replica count can // decrement due to externalities like pods death in the replica set. This @@ -360,40 +362,34 @@ func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, r return r.c.ReplicationControllers(rc.Namespace).Get(rc.Name) } -// pollForReadyPods polls oldRc and newRc each interval and returns the old -// and new ready counts for their pods. If a pod is observed as being ready, -// it's considered ready even if it later becomes notReady. -func (r *RollingUpdater) pollForReadyPods(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) { +// readyPods returns the old and new ready counts for their pods. +// If a pod is observed as being ready, it's considered ready even +// if it later becomes notReady. +func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController) (int, int, error) { controllers := []*api.ReplicationController{oldRc, newRc} oldReady := 0 newReady := 0 - err := wait.Poll(interval, timeout, func() (done bool, err error) { - anyReady := false - for _, controller := range controllers { - selector := labels.Set(controller.Spec.Selector).AsSelector() - options := api.ListOptions{LabelSelector: selector} - pods, err := r.c.Pods(controller.Namespace).List(options) - if err != nil { - return false, err - } - for _, pod := range pods.Items { - if api.IsPodReady(&pod) { - switch controller.Name { - case oldRc.Name: - oldReady++ - case newRc.Name: - newReady++ - } - anyReady = true + + for i := range controllers { + controller := controllers[i] + selector := labels.Set(controller.Spec.Selector).AsSelector() + options := api.ListOptions{LabelSelector: selector} + pods, err := r.c.Pods(controller.Namespace).List(options) + if err != nil { + return 0, 0, err + } + for _, pod := range pods.Items { + if api.IsPodReady(&pod) { + switch controller.Name { + case oldRc.Name: + oldReady++ + case newRc.Name: + newReady++ } } } - if anyReady { - return true, nil - } - return false, nil - }) - return oldReady, newReady, err + } + return oldReady, newReady, nil } // getOrCreateTargetControllerWithClient looks for an existing controller with diff --git a/pkg/kubectl/rolling_updater_test.go b/pkg/kubectl/rolling_updater_test.go index 4ba556847ad..350a524ba3d 100644 --- a/pkg/kubectl/rolling_updater_test.go +++ b/pkg/kubectl/rolling_updater_test.go @@ -634,6 +634,27 @@ Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 30 to 0 (keep 1 pods ava Scaling foo-v1 down to 1 Scaling foo-v2 up to 2 Scaling foo-v1 down to 0 +`, + }, + { + name: "2->2 1/0 blocked oldRc", + oldRc: oldRc(2, 2), + newRc: newRc(0, 2), + newRcExists: false, + maxUnavail: intstr.FromInt(1), + maxSurge: intstr.FromInt(0), + expected: []interface{}{ + down{oldReady: 1, newReady: 0, to: 1}, + up{1}, + down{oldReady: 1, newReady: 1, to: 0}, + up{2}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 2 to 0 (keep 1 pods available, don't exceed 2 pods) +Scaling foo-v1 down to 1 +Scaling foo-v2 up to 1 +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 2 `, }, } @@ -685,10 +706,10 @@ Scaling foo-v1 down to 0 expected := -1 switch { case rc == test.newRc: - t.Logf("scaling up %s:%d", rc.Name, rc.Spec.Replicas) + t.Logf("scaling up %s to %d", rc.Name, rc.Spec.Replicas) expected = next(&upTo) case rc == test.oldRc: - t.Logf("scaling down %s:%d", rc.Name, rc.Spec.Replicas) + t.Logf("scaling down %s to %d", rc.Name, rc.Spec.Replicas) expected = next(&downTo) } if expected == -1 { @@ -709,13 +730,13 @@ Scaling foo-v1 down to 0 }, } // Set up a mock readiness check which handles the test assertions. - updater.waitForReadyPods = func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) { + updater.getReadyPods = func(oldRc, newRc *api.ReplicationController) (int, int, error) { // Return simulated readiness, and throw an error if this call has no // expectations defined. oldReady := next(&oldReady) newReady := next(&newReady) if oldReady == -1 || newReady == -1 { - t.Fatalf("unexpected waitForReadyPods call for:\noldRc: %+v\nnewRc: %+v", oldRc, newRc) + t.Fatalf("unexpected getReadyPods call for:\noldRc: %+v\nnewRc: %+v", oldRc, newRc) } return oldReady, newReady, nil } @@ -759,7 +780,7 @@ func TestUpdate_progressTimeout(t *testing.T) { return nil }, } - updater.waitForReadyPods = func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) { + updater.getReadyPods = func(oldRc, newRc *api.ReplicationController) (int, int, error) { // Coerce a timeout by pods never becoming ready. return 0, 0, nil } @@ -812,7 +833,7 @@ func TestUpdate_assignOriginalAnnotation(t *testing.T) { cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error { return nil }, - waitForReadyPods: func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) { + getReadyPods: func(oldRc, newRc *api.ReplicationController) (int, int, error) { return 1, 1, nil }, } @@ -1442,7 +1463,7 @@ func TestAddDeploymentHash(t *testing.T) { } } -func TestRollingUpdater_pollForReadyPods(t *testing.T) { +func TestRollingUpdater_readyPods(t *testing.T) { mkpod := func(owner *api.ReplicationController, ready bool) *api.Pod { labels := map[string]string{} for k, v := range owner.Spec.Selector { @@ -1538,7 +1559,7 @@ func TestRollingUpdater_pollForReadyPods(t *testing.T) { ns: "default", c: client, } - oldReady, newReady, err := updater.pollForReadyPods(time.Millisecond, time.Second, test.oldRc, test.newRc) + oldReady, newReady, err := updater.readyPods(test.oldRc, test.newRc) if err != nil { t.Errorf("unexpected error: %v", err) }