diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go index 0af1253ad61..85f302c63b1 100644 --- a/pkg/kubectl/rolling_updater.go +++ b/pkg/kubectl/rolling_updater.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" @@ -58,6 +59,8 @@ type RollingUpdaterConfig struct { Interval time.Duration // Timeout is the time to wait for controller updates before giving up. Timeout time.Duration + // MinReadySeconds is the number of seconds to wait after the pods are ready + MinReadySeconds int32 // CleanupPolicy defines the cleanup action to take after the deployment is // complete. CleanupPolicy RollingUpdaterCleanupPolicy @@ -118,7 +121,9 @@ type RollingUpdater struct { // cleanup performs post deployment cleanup tasks for newRc and oldRc. cleanup func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error // getReadyPods returns the amount of old and new ready pods. - getReadyPods func(oldRc, newRc *api.ReplicationController) (int32, int32, error) + getReadyPods func(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) + // nowFn returns the current time used to calculate the minReadySeconds + nowFn func() unversioned.Time } // NewRollingUpdater creates a RollingUpdater from a client. @@ -132,6 +137,7 @@ func NewRollingUpdater(namespace string, client client.Interface) *RollingUpdate updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient updater.getReadyPods = updater.readyPods updater.cleanup = updater.cleanupWithClients + updater.nowFn = func() unversioned.Time { return unversioned.Now() } return updater } @@ -340,7 +346,7 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi // Get ready pods. We shouldn't block, otherwise in case both old and new // pods are unavailable then the rolling update process blocks. // Timeout-wise we are already covered by the progress check. - _, newAvailable, err := r.getReadyPods(oldRc, newRc) + _, newAvailable, err := r.getReadyPods(oldRc, newRc, config.MinReadySeconds) if err != nil { return nil, err } @@ -397,10 +403,13 @@ func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, r // readyPods returns the old and new ready counts for their pods. // If a pod is observed as being ready, it's considered ready even // if it later becomes notReady. -func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController) (int32, int32, error) { +func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) { controllers := []*api.ReplicationController{oldRc, newRc} oldReady := int32(0) newReady := int32(0) + if r.nowFn == nil { + r.nowFn = func() unversioned.Time { return unversioned.Now() } + } for i := range controllers { controller := controllers[i] @@ -411,13 +420,14 @@ func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController) (int return 0, 0, err } for _, pod := range pods.Items { - if api.IsPodReady(&pod) { - switch controller.Name { - case oldRc.Name: - oldReady++ - case newRc.Name: - newReady++ - } + if !deployment.IsPodAvailable(&pod, minReadySeconds, r.nowFn().Time) { + continue + } + switch controller.Name { + case oldRc.Name: + oldReady++ + case newRc.Name: + newReady++ } } } diff --git a/pkg/kubectl/rolling_updater_test.go b/pkg/kubectl/rolling_updater_test.go index 9abe440569b..99a2ef9bc63 100644 --- a/pkg/kubectl/rolling_updater_test.go +++ b/pkg/kubectl/rolling_updater_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" apitesting "k8s.io/kubernetes/pkg/api/testing" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/restclient" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/fake" @@ -810,7 +811,7 @@ Scaling foo-v2 up to 2 }, } // Set up a mock readiness check which handles the test assertions. - updater.getReadyPods = func(oldRc, newRc *api.ReplicationController) (int32, int32, error) { + updater.getReadyPods = func(oldRc, newRc *api.ReplicationController, minReadySecondsDeadline int32) (int32, int32, error) { // Return simulated readiness, and throw an error if this call has no // expectations defined. oldReady := next(&oldReady) @@ -860,7 +861,7 @@ func TestUpdate_progressTimeout(t *testing.T) { return nil }, } - updater.getReadyPods = func(oldRc, newRc *api.ReplicationController) (int32, int32, error) { + updater.getReadyPods = func(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) { // Coerce a timeout by pods never becoming ready. return 0, 0, nil } @@ -913,7 +914,7 @@ func TestUpdate_assignOriginalAnnotation(t *testing.T) { cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error { return nil }, - getReadyPods: func(oldRc, newRc *api.ReplicationController) (int32, int32, error) { + getReadyPods: func(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) { return 1, 1, nil }, } @@ -1555,7 +1556,8 @@ func TestAddDeploymentHash(t *testing.T) { } func TestRollingUpdater_readyPods(t *testing.T) { - mkpod := func(owner *api.ReplicationController, ready bool) *api.Pod { + now := unversioned.Date(2016, time.April, 1, 1, 0, 0, 0, time.UTC) + mkpod := func(owner *api.ReplicationController, ready bool, readyTime unversioned.Time) *api.Pod { labels := map[string]string{} for k, v := range owner.Spec.Selector { labels[k] = v @@ -1572,8 +1574,9 @@ func TestRollingUpdater_readyPods(t *testing.T) { Status: api.PodStatus{ Conditions: []api.PodCondition{ { - Type: api.PodReady, - Status: status, + Type: api.PodReady, + Status: status, + LastTransitionTime: readyTime, }, }, }, @@ -1589,6 +1592,11 @@ func TestRollingUpdater_readyPods(t *testing.T) { // pods owned by the rcs; indicate whether they're ready oldPods []bool newPods []bool + // specify additional time to wait for deployment to wait on top of the + // pod ready time + minReadySeconds int32 + podReadyTimeFn func() unversioned.Time + nowFn func() unversioned.Time }{ { oldRc: oldRc(4, 4), @@ -1632,25 +1640,61 @@ func TestRollingUpdater_readyPods(t *testing.T) { false, }, }, + { + oldRc: oldRc(4, 4), + newRc: newRc(4, 4), + oldReady: 0, + newReady: 0, + oldPods: []bool{ + true, + }, + newPods: []bool{ + true, + }, + minReadySeconds: 5, + nowFn: func() unversioned.Time { return now }, + }, + { + oldRc: oldRc(4, 4), + newRc: newRc(4, 4), + oldReady: 1, + newReady: 1, + oldPods: []bool{ + true, + }, + newPods: []bool{ + true, + }, + minReadySeconds: 5, + nowFn: func() unversioned.Time { return unversioned.Time{Time: now.Add(time.Duration(6 * time.Second))} }, + podReadyTimeFn: func() unversioned.Time { return now }, + }, } for i, test := range tests { t.Logf("evaluating test %d", i) + if test.nowFn == nil { + test.nowFn = func() unversioned.Time { return now } + } + if test.podReadyTimeFn == nil { + test.podReadyTimeFn = test.nowFn + } // Populate the fake client with pods associated with their owners. pods := []runtime.Object{} for _, ready := range test.oldPods { - pods = append(pods, mkpod(test.oldRc, ready)) + pods = append(pods, mkpod(test.oldRc, ready, test.podReadyTimeFn())) } for _, ready := range test.newPods { - pods = append(pods, mkpod(test.newRc, ready)) + pods = append(pods, mkpod(test.newRc, ready, test.podReadyTimeFn())) } client := testclient.NewSimpleFake(pods...) updater := &RollingUpdater{ - ns: "default", - c: client, + ns: "default", + c: client, + nowFn: test.nowFn, } - oldReady, newReady, err := updater.readyPods(test.oldRc, test.newRc) + oldReady, newReady, err := updater.readyPods(test.oldRc, test.newRc, test.minReadySeconds) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 05b2a83df46..a1ac20e553a 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -354,14 +354,15 @@ func GetAvailablePodsForDeployment(c clientset.Interface, deployment *extensions func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 { availablePodCount := int32(0) for _, pod := range pods { - if IsPodAvailable(&pod, minReadySeconds) { + // TODO: Make the time.Now() as argument to allow unit test this. + if IsPodAvailable(&pod, minReadySeconds, time.Now()) { availablePodCount++ } } return availablePodCount } -func IsPodAvailable(pod *api.Pod, minReadySeconds int32) bool { +func IsPodAvailable(pod *api.Pod, minReadySeconds int32, now time.Time) bool { if !controller.IsPodActive(*pod) { return false } @@ -374,7 +375,7 @@ func IsPodAvailable(pod *api.Pod, minReadySeconds int32) bool { // 1. minReadySeconds == 0, or // 2. LastTransitionTime (is set) + minReadySeconds (>0) < current time minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second - if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(time.Now()) { + if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now) { return true } } diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 76ef5fd4b9e..b03b5979ecd 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -3209,7 +3209,7 @@ func WaitForPodsReady(c *clientset.Clientset, ns, name string, minReadySeconds i return false, nil } for _, pod := range pods.Items { - if !deploymentutil.IsPodAvailable(&pod, int32(minReadySeconds)) { + if !deploymentutil.IsPodAvailable(&pod, int32(minReadySeconds), time.Now()) { return false, nil } } @@ -3260,7 +3260,7 @@ func logPodsOfDeployment(c clientset.Interface, deployment *extensions.Deploymen if err == nil { for _, pod := range podList.Items { availability := "not available" - if deploymentutil.IsPodAvailable(&pod, minReadySeconds) { + if deploymentutil.IsPodAvailable(&pod, minReadySeconds, time.Now()) { availability = "available" } Logf("Pod %s is %s: %+v", pod.Name, availability, pod)