mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Add MinReadySeconds to rolling updater
This commit is contained in:
parent
2baf9b0f27
commit
7ea28e42c0
@ -26,6 +26,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
@ -58,6 +59,8 @@ type RollingUpdaterConfig struct {
|
|||||||
Interval time.Duration
|
Interval time.Duration
|
||||||
// Timeout is the time to wait for controller updates before giving up.
|
// Timeout is the time to wait for controller updates before giving up.
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
|
// MinReadySeconds is the number of seconds to wait after the pods are ready
|
||||||
|
MinReadySeconds int32
|
||||||
// CleanupPolicy defines the cleanup action to take after the deployment is
|
// CleanupPolicy defines the cleanup action to take after the deployment is
|
||||||
// complete.
|
// complete.
|
||||||
CleanupPolicy RollingUpdaterCleanupPolicy
|
CleanupPolicy RollingUpdaterCleanupPolicy
|
||||||
@ -118,7 +121,9 @@ type RollingUpdater struct {
|
|||||||
// cleanup performs post deployment cleanup tasks for newRc and oldRc.
|
// cleanup performs post deployment cleanup tasks for newRc and oldRc.
|
||||||
cleanup func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error
|
cleanup func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error
|
||||||
// getReadyPods returns the amount of old and new ready pods.
|
// getReadyPods returns the amount of old and new ready pods.
|
||||||
getReadyPods func(oldRc, newRc *api.ReplicationController) (int32, int32, error)
|
getReadyPods func(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error)
|
||||||
|
// nowFn returns the current time used to calculate the minReadySeconds
|
||||||
|
nowFn func() unversioned.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRollingUpdater creates a RollingUpdater from a client.
|
// NewRollingUpdater creates a RollingUpdater from a client.
|
||||||
@ -132,6 +137,7 @@ func NewRollingUpdater(namespace string, client client.Interface) *RollingUpdate
|
|||||||
updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient
|
updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient
|
||||||
updater.getReadyPods = updater.readyPods
|
updater.getReadyPods = updater.readyPods
|
||||||
updater.cleanup = updater.cleanupWithClients
|
updater.cleanup = updater.cleanupWithClients
|
||||||
|
updater.nowFn = func() unversioned.Time { return unversioned.Now() }
|
||||||
return updater
|
return updater
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -340,7 +346,7 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi
|
|||||||
// Get ready pods. We shouldn't block, otherwise in case both old and new
|
// Get ready pods. We shouldn't block, otherwise in case both old and new
|
||||||
// pods are unavailable then the rolling update process blocks.
|
// pods are unavailable then the rolling update process blocks.
|
||||||
// Timeout-wise we are already covered by the progress check.
|
// Timeout-wise we are already covered by the progress check.
|
||||||
_, newAvailable, err := r.getReadyPods(oldRc, newRc)
|
_, newAvailable, err := r.getReadyPods(oldRc, newRc, config.MinReadySeconds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -397,10 +403,13 @@ func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, r
|
|||||||
// readyPods returns the old and new ready counts for their pods.
|
// readyPods returns the old and new ready counts for their pods.
|
||||||
// If a pod is observed as being ready, it's considered ready even
|
// If a pod is observed as being ready, it's considered ready even
|
||||||
// if it later becomes notReady.
|
// if it later becomes notReady.
|
||||||
func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController) (int32, int32, error) {
|
func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) {
|
||||||
controllers := []*api.ReplicationController{oldRc, newRc}
|
controllers := []*api.ReplicationController{oldRc, newRc}
|
||||||
oldReady := int32(0)
|
oldReady := int32(0)
|
||||||
newReady := int32(0)
|
newReady := int32(0)
|
||||||
|
if r.nowFn == nil {
|
||||||
|
r.nowFn = func() unversioned.Time { return unversioned.Now() }
|
||||||
|
}
|
||||||
|
|
||||||
for i := range controllers {
|
for i := range controllers {
|
||||||
controller := controllers[i]
|
controller := controllers[i]
|
||||||
@ -411,7 +420,9 @@ func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController) (int
|
|||||||
return 0, 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
for _, pod := range pods.Items {
|
for _, pod := range pods.Items {
|
||||||
if api.IsPodReady(&pod) {
|
if !deployment.IsPodAvailable(&pod, minReadySeconds, r.nowFn().Time) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
switch controller.Name {
|
switch controller.Name {
|
||||||
case oldRc.Name:
|
case oldRc.Name:
|
||||||
oldReady++
|
oldReady++
|
||||||
@ -420,7 +431,6 @@ func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController) (int
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return oldReady, newReady, nil
|
return oldReady, newReady, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
apitesting "k8s.io/kubernetes/pkg/api/testing"
|
apitesting "k8s.io/kubernetes/pkg/api/testing"
|
||||||
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/client/restclient"
|
"k8s.io/kubernetes/pkg/client/restclient"
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/client/unversioned/fake"
|
"k8s.io/kubernetes/pkg/client/unversioned/fake"
|
||||||
@ -810,7 +811,7 @@ Scaling foo-v2 up to 2
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Set up a mock readiness check which handles the test assertions.
|
// Set up a mock readiness check which handles the test assertions.
|
||||||
updater.getReadyPods = func(oldRc, newRc *api.ReplicationController) (int32, int32, error) {
|
updater.getReadyPods = func(oldRc, newRc *api.ReplicationController, minReadySecondsDeadline int32) (int32, int32, error) {
|
||||||
// Return simulated readiness, and throw an error if this call has no
|
// Return simulated readiness, and throw an error if this call has no
|
||||||
// expectations defined.
|
// expectations defined.
|
||||||
oldReady := next(&oldReady)
|
oldReady := next(&oldReady)
|
||||||
@ -860,7 +861,7 @@ func TestUpdate_progressTimeout(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
updater.getReadyPods = func(oldRc, newRc *api.ReplicationController) (int32, int32, error) {
|
updater.getReadyPods = func(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) {
|
||||||
// Coerce a timeout by pods never becoming ready.
|
// Coerce a timeout by pods never becoming ready.
|
||||||
return 0, 0, nil
|
return 0, 0, nil
|
||||||
}
|
}
|
||||||
@ -913,7 +914,7 @@ func TestUpdate_assignOriginalAnnotation(t *testing.T) {
|
|||||||
cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
|
cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
getReadyPods: func(oldRc, newRc *api.ReplicationController) (int32, int32, error) {
|
getReadyPods: func(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) {
|
||||||
return 1, 1, nil
|
return 1, 1, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -1555,7 +1556,8 @@ func TestAddDeploymentHash(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestRollingUpdater_readyPods(t *testing.T) {
|
func TestRollingUpdater_readyPods(t *testing.T) {
|
||||||
mkpod := func(owner *api.ReplicationController, ready bool) *api.Pod {
|
now := unversioned.Date(2016, time.April, 1, 1, 0, 0, 0, time.UTC)
|
||||||
|
mkpod := func(owner *api.ReplicationController, ready bool, readyTime unversioned.Time) *api.Pod {
|
||||||
labels := map[string]string{}
|
labels := map[string]string{}
|
||||||
for k, v := range owner.Spec.Selector {
|
for k, v := range owner.Spec.Selector {
|
||||||
labels[k] = v
|
labels[k] = v
|
||||||
@ -1574,6 +1576,7 @@ func TestRollingUpdater_readyPods(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Type: api.PodReady,
|
Type: api.PodReady,
|
||||||
Status: status,
|
Status: status,
|
||||||
|
LastTransitionTime: readyTime,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1589,6 +1592,11 @@ func TestRollingUpdater_readyPods(t *testing.T) {
|
|||||||
// pods owned by the rcs; indicate whether they're ready
|
// pods owned by the rcs; indicate whether they're ready
|
||||||
oldPods []bool
|
oldPods []bool
|
||||||
newPods []bool
|
newPods []bool
|
||||||
|
// specify additional time to wait for deployment to wait on top of the
|
||||||
|
// pod ready time
|
||||||
|
minReadySeconds int32
|
||||||
|
podReadyTimeFn func() unversioned.Time
|
||||||
|
nowFn func() unversioned.Time
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
oldRc: oldRc(4, 4),
|
oldRc: oldRc(4, 4),
|
||||||
@ -1632,25 +1640,61 @@ func TestRollingUpdater_readyPods(t *testing.T) {
|
|||||||
false,
|
false,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
oldRc: oldRc(4, 4),
|
||||||
|
newRc: newRc(4, 4),
|
||||||
|
oldReady: 0,
|
||||||
|
newReady: 0,
|
||||||
|
oldPods: []bool{
|
||||||
|
true,
|
||||||
|
},
|
||||||
|
newPods: []bool{
|
||||||
|
true,
|
||||||
|
},
|
||||||
|
minReadySeconds: 5,
|
||||||
|
nowFn: func() unversioned.Time { return now },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
oldRc: oldRc(4, 4),
|
||||||
|
newRc: newRc(4, 4),
|
||||||
|
oldReady: 1,
|
||||||
|
newReady: 1,
|
||||||
|
oldPods: []bool{
|
||||||
|
true,
|
||||||
|
},
|
||||||
|
newPods: []bool{
|
||||||
|
true,
|
||||||
|
},
|
||||||
|
minReadySeconds: 5,
|
||||||
|
nowFn: func() unversioned.Time { return unversioned.Time{Time: now.Add(time.Duration(6 * time.Second))} },
|
||||||
|
podReadyTimeFn: func() unversioned.Time { return now },
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, test := range tests {
|
for i, test := range tests {
|
||||||
t.Logf("evaluating test %d", i)
|
t.Logf("evaluating test %d", i)
|
||||||
|
if test.nowFn == nil {
|
||||||
|
test.nowFn = func() unversioned.Time { return now }
|
||||||
|
}
|
||||||
|
if test.podReadyTimeFn == nil {
|
||||||
|
test.podReadyTimeFn = test.nowFn
|
||||||
|
}
|
||||||
// Populate the fake client with pods associated with their owners.
|
// Populate the fake client with pods associated with their owners.
|
||||||
pods := []runtime.Object{}
|
pods := []runtime.Object{}
|
||||||
for _, ready := range test.oldPods {
|
for _, ready := range test.oldPods {
|
||||||
pods = append(pods, mkpod(test.oldRc, ready))
|
pods = append(pods, mkpod(test.oldRc, ready, test.podReadyTimeFn()))
|
||||||
}
|
}
|
||||||
for _, ready := range test.newPods {
|
for _, ready := range test.newPods {
|
||||||
pods = append(pods, mkpod(test.newRc, ready))
|
pods = append(pods, mkpod(test.newRc, ready, test.podReadyTimeFn()))
|
||||||
}
|
}
|
||||||
client := testclient.NewSimpleFake(pods...)
|
client := testclient.NewSimpleFake(pods...)
|
||||||
|
|
||||||
updater := &RollingUpdater{
|
updater := &RollingUpdater{
|
||||||
ns: "default",
|
ns: "default",
|
||||||
c: client,
|
c: client,
|
||||||
|
nowFn: test.nowFn,
|
||||||
}
|
}
|
||||||
oldReady, newReady, err := updater.readyPods(test.oldRc, test.newRc)
|
oldReady, newReady, err := updater.readyPods(test.oldRc, test.newRc, test.minReadySeconds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -354,14 +354,15 @@ func GetAvailablePodsForDeployment(c clientset.Interface, deployment *extensions
|
|||||||
func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 {
|
func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 {
|
||||||
availablePodCount := int32(0)
|
availablePodCount := int32(0)
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
if IsPodAvailable(&pod, minReadySeconds) {
|
// TODO: Make the time.Now() as argument to allow unit test this.
|
||||||
|
if IsPodAvailable(&pod, minReadySeconds, time.Now()) {
|
||||||
availablePodCount++
|
availablePodCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return availablePodCount
|
return availablePodCount
|
||||||
}
|
}
|
||||||
|
|
||||||
func IsPodAvailable(pod *api.Pod, minReadySeconds int32) bool {
|
func IsPodAvailable(pod *api.Pod, minReadySeconds int32, now time.Time) bool {
|
||||||
if !controller.IsPodActive(*pod) {
|
if !controller.IsPodActive(*pod) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -374,7 +375,7 @@ func IsPodAvailable(pod *api.Pod, minReadySeconds int32) bool {
|
|||||||
// 1. minReadySeconds == 0, or
|
// 1. minReadySeconds == 0, or
|
||||||
// 2. LastTransitionTime (is set) + minReadySeconds (>0) < current time
|
// 2. LastTransitionTime (is set) + minReadySeconds (>0) < current time
|
||||||
minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second
|
minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second
|
||||||
if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(time.Now()) {
|
if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3209,7 +3209,7 @@ func WaitForPodsReady(c *clientset.Clientset, ns, name string, minReadySeconds i
|
|||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
for _, pod := range pods.Items {
|
for _, pod := range pods.Items {
|
||||||
if !deploymentutil.IsPodAvailable(&pod, int32(minReadySeconds)) {
|
if !deploymentutil.IsPodAvailable(&pod, int32(minReadySeconds), time.Now()) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3260,7 +3260,7 @@ func logPodsOfDeployment(c clientset.Interface, deployment *extensions.Deploymen
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
for _, pod := range podList.Items {
|
for _, pod := range podList.Items {
|
||||||
availability := "not available"
|
availability := "not available"
|
||||||
if deploymentutil.IsPodAvailable(&pod, minReadySeconds) {
|
if deploymentutil.IsPodAvailable(&pod, minReadySeconds, time.Now()) {
|
||||||
availability = "available"
|
availability = "available"
|
||||||
}
|
}
|
||||||
Logf("Pod %s is %s: %+v", pod.Name, availability, pod)
|
Logf("Pod %s is %s: %+v", pod.Name, availability, pod)
|
||||||
|
Loading…
Reference in New Issue
Block a user