statefulsets: MinReadySeconds implementation

https://github.com/kubernetes/kubernetes/pull/100842
introduced the StatefulSetMinReadySeconds feature gate. This PR
implements the logic behind it.
This commit is contained in:
ravisantoshgudimetla 2021-04-19 16:37:00 -04:00
parent bc8acbc43e
commit ceb1dbd2f1
6 changed files with 364 additions and 49 deletions

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
@ -39,8 +40,10 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"
"k8s.io/klog/v2"
)
@ -85,7 +88,6 @@ func NewStatefulSetController(
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
ssc := &StatefulSetController{
kubeClient: kubeClient,
control: NewDefaultStatefulSetControl(
@ -221,6 +223,15 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
}
klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
ssc.enqueueStatefulSet(set)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning StatefulSet thus
// having its status updated with the newly available replica.
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 {
klog.V(2).Infof("StatefulSet %s will be enqueued after %ds for availability check", set.Name, set.Spec.MinReadySeconds)
// Add a second to avoid milliseconds skew in AddAfter.
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
ssc.enqueueSSAfter(set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second)
}
return
}
@ -380,6 +391,16 @@ func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
ssc.queue.Add(key)
}
// enqueueSSAfter adds the key of the given StatefulSet to the work queue once
// the given duration has elapsed. If no key can be derived for the StatefulSet,
// the error is handed to utilruntime.HandleError and nothing is enqueued.
func (ssc *StatefulSetController) enqueueSSAfter(ss *apps.StatefulSet, duration time.Duration) {
	key, err := controller.KeyFunc(ss)
	if err == nil {
		ssc.queue.AddAfter(key, duration)
		return
	}
	utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ss, err))
}
// processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
// invoked concurrently with the same key.
func (ssc *StatefulSetController) processNextWorkItem() bool {
@ -446,10 +467,18 @@ func (ssc *StatefulSetController) sync(key string) error {
// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
// It runs the control's UpdateStatefulSet on a deep copy of set and returns its error, if any.
// When the StatefulSetMinReadySeconds feature gate is enabled, MinReadySeconds is positive and the
// returned status reports a number of available replicas that differs from the desired replica
// count, the set is enqueued again after MinReadySeconds.
func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
var status *apps.StatefulSetStatus
var err error
// TODO: investigate where we mutate the set during the update as it is not obvious.
if err := ssc.control.UpdateStatefulSet(set.DeepCopy(), pods); err != nil {
status, err = ssc.control.UpdateStatefulSet(set.DeepCopy(), pods)
if err != nil {
return err
}
klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
// Schedule one more sync after MinReadySeconds to handle clock skew. This also re-syncs the set
// after the status update for as long as not every desired replica is reported available.
// NOTE(review): set.Spec.Replicas is dereferenced without a nil check; presumably it is always
// defaulted before reaching the controller — confirm.
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
}
return nil
}

View File

@ -23,9 +23,11 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"
)
// StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented
@ -36,7 +38,7 @@ type StatefulSetControlInterface interface {
// If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error
UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error)
// ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned
// error is nil, the returns slice of ControllerRevisions is valid.
ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
@ -71,60 +73,57 @@ type defaultStatefulSetControl struct {
// strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
// in no particular order. Clients using the burst strategy should be careful to ensure they
// understand the consistency implications of having unpredictable numbers of pods available.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
// list all revisions and sort them
revisions, err := ssc.ListRevisions(set)
if err != nil {
return err
return nil, err
}
history.SortControllerRevisions(revisions)
// reconcile the set; performUpdate also yields the status it computed
currentRevision, updateRevision, err := ssc.performUpdate(set, pods, revisions)
currentRevision, updateRevision, status, err := ssc.performUpdate(set, pods, revisions)
if err != nil {
// the update failed: history truncation is still attempted and both errors are aggregated;
// no status is handed back to the caller on this path
return utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
}
// maintain the set's revision history limit; the computed status is returned together with
// whatever error truncateHistory reports
return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}
// performUpdate resolves the current and update revisions of set, runs updateStatefulSet to
// compute the new status, persists that status through updateStatefulSetStatus and logs it at
// verbosity 4. It returns the current revision, the update revision, the computed status and any
// error. The status is nil when the revisions could not be resolved; on later failures it is
// whatever updateStatefulSet returned.
func (ssc *defaultStatefulSetControl) performUpdate(
set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, error) {
set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
// stays nil until updateStatefulSet has produced a status
var currentStatus *apps.StatefulSetStatus
// get the current, and update revisions
currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
if err != nil {
return currentRevision, updateRevision, err
return currentRevision, updateRevision, currentStatus, err
}
// perform the main update function and get the status
status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
currentStatus, err = ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
if err != nil {
return currentRevision, updateRevision, err
return currentRevision, updateRevision, currentStatus, err
}
// update the set's status
err = ssc.updateStatefulSetStatus(set, status)
err = ssc.updateStatefulSetStatus(set, currentStatus)
if err != nil {
return currentRevision, updateRevision, err
return currentRevision, updateRevision, currentStatus, err
}
// log the replica counters of the status that was just written
klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
set.Namespace,
set.Name,
status.Replicas,
status.ReadyReplicas,
status.CurrentReplicas,
status.UpdatedReplicas)
currentStatus.Replicas,
currentStatus.ReadyReplicas,
currentStatus.CurrentReplicas,
currentStatus.UpdatedReplicas)
// log the revision names recorded in that status
klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
set.Namespace,
set.Name,
status.CurrentRevision,
status.UpdateRevision)
currentStatus.CurrentRevision,
currentStatus.UpdateRevision)
return currentRevision, updateRevision, nil
return currentRevision, updateRevision, currentStatus, nil
}
func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
@ -307,6 +306,15 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// count the number of running and ready replicas
if isRunningAndReady(pods[i]) {
status.ReadyReplicas++
// count the number of running and available replicas
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) {
if isRunningAndAvailable(pods[i], set.Spec.MinReadySeconds) {
status.AvailableReplicas++
}
} else {
// If the featuregate is not enabled, all the ready replicas should be considered as available replicas
status.AvailableReplicas = status.ReadyReplicas
}
}
// count the number of current and update replicas
@ -447,6 +455,19 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
replicas[i].Name)
return &status, nil
}
// If we have a Pod that has been created but is not available we can not make progress.
// We must ensure that, for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Available.
// TODO: Since the Available check is a superset of the Ready check, once we have this featuregate enabled by default,
// we can remove the isRunningAndReady block as only Available pods should be brought down.
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Available",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// Enforce the StatefulSet invariants
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
continue
@ -458,7 +479,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
}
}
// At this point, all of the current Replicas are Running and Ready, we can consider termination.
// At this point, all of the current Replicas are Running, Ready and Available, we can consider termination.
// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
// We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas).
// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
@ -486,6 +507,17 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
firstUnhealthyPod.Name)
return &status, nil
}
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
// TODO: Since available is superset of Ready, once we have this featuregate enabled by default, we can remove the
// isRunningAndReady block as only Available pods should be brought down.
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Available prior to scale down",
set.Namespace,
set.Name,
firstUnhealthyPod.Name)
return &status, nil
}
klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
set.Namespace,
set.Name,
@ -549,7 +581,6 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
set *apps.StatefulSet,
status *apps.StatefulSetStatus) error {
// complete any in progress rolling update if necessary
completeRollingUpdate(set, status)

View File

@ -34,6 +34,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
@ -43,9 +44,11 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"
)
type invariantFunc func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
@ -73,6 +76,11 @@ func burst(set *apps.StatefulSet) *apps.StatefulSet {
return set
}
// setMinReadySeconds stores minReadySeconds in set.Spec.MinReadySeconds and
// returns the same set, so the call can be nested inside other set builders.
func setMinReadySeconds(set *apps.StatefulSet, minReadySeconds int32) *apps.StatefulSet {
	spec := &set.Spec
	spec.MinReadySeconds = minReadySeconds
	return set
}
func TestStatefulSetControl(t *testing.T) {
simpleSetFn := func() *apps.StatefulSet { return newStatefulSet(3) }
largeSetFn := func() *apps.StatefulSet { return newStatefulSet(5) }
@ -221,7 +229,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
if err != nil {
t.Error(err)
}
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err)
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -231,7 +239,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
if pods, err = spc.setPodRunning(set, i); err != nil {
t.Error(err)
}
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err)
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -246,7 +254,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
if err != nil {
t.Error(err)
}
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err)
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -270,7 +278,7 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian
if err != nil {
t.Error(err)
}
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, spc); err != nil {
@ -282,7 +290,7 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian
}
pods[0].Status.Phase = v1.PodFailed
spc.podsIndexer.Update(pods[0])
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, spc); err != nil {
@ -363,7 +371,7 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
spc.podsIndexer.Update(pods[0])
// now it should fail
if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) {
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
}
}
@ -409,7 +417,7 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in
if err != nil {
t.Error(err)
}
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, spc); err != nil {
@ -422,13 +430,13 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in
pods[0].Status.Phase = v1.PodFailed
spc.podsIndexer.Update(pods[0])
spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) {
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSet failed to %s", err)
}
if err := invariants(set, spc); err != nil {
t.Error(err)
}
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, spc); err != nil {
@ -1274,7 +1282,7 @@ func TestStatefulSetControlLimitsHistory(t *testing.T) {
if err != nil {
t.Fatalf("%s: %s", test.name, err)
}
err = ssc.UpdateStatefulSet(set, pods)
_, err = ssc.UpdateStatefulSet(set, pods)
if err != nil {
t.Fatalf("%s: %s", test.name, err)
}
@ -1565,6 +1573,72 @@ func TestStatefulSetControlRollback(t *testing.T) {
}
}
// TestStatefulSetAvailability checks the AvailableReplicas value reported by
// UpdateStatefulSet for a single-replica StatefulSet with MinReadySeconds set,
// depending on how long ago the Pod's Ready condition transitioned and on
// whether the StatefulSetMinReadySeconds feature gate is enabled.
//
// Every case runs as its own subtest so that the deferred feature gate restore
// and controller shutdown happen when that case ends, rather than piling up
// until the whole test function returns.
func TestStatefulSetAvailability(t *testing.T) {
	tests := []struct {
		name                              string
		inputSTS                          *apps.StatefulSet
		expectedActiveReplicas            int32
		readyDuration                     time.Duration
		minReadySecondsFeaturegateEnabled bool
	}{
		{
			name: "replicas not running for required time, still will be available," +
				" when minReadySeconds is disabled",
			inputSTS:                          setMinReadySeconds(newStatefulSet(1), int32(3600)),
			readyDuration:                     0 * time.Minute,
			expectedActiveReplicas:            int32(1),
			minReadySecondsFeaturegateEnabled: false,
		},
		{
			name:                              "replicas running for required time, when minReadySeconds is enabled",
			inputSTS:                          setMinReadySeconds(newStatefulSet(1), int32(3600)),
			readyDuration:                     -120 * time.Minute,
			expectedActiveReplicas:            int32(1),
			minReadySecondsFeaturegateEnabled: true,
		},
		{
			name:                              "replicas not running for required time, when minReadySeconds is enabled",
			inputSTS:                          setMinReadySeconds(newStatefulSet(1), int32(3600)),
			readyDuration:                     -30 * time.Minute,
			expectedActiveReplicas:            int32(0),
			minReadySecondsFeaturegateEnabled: true,
		},
	}
	for _, test := range tests {
		test := test // pin the case for the closure on toolchains with a shared loop variable
		t.Run(test.name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetMinReadySeconds, test.minReadySecondsFeaturegateEnabled)()
			set := test.inputSTS
			client := fake.NewSimpleClientset(set)
			spc, _, ssc, stop := setupController(client)
			defer close(stop)
			if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
				t.Fatalf("%s: %s", test.name, err)
			}
			selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
			if err != nil {
				t.Fatalf("%s: %s", test.name, err)
			}
			// Only the error matters here; the listed pods themselves are not used.
			_, err = spc.podsLister.Pods(set.Namespace).List(selector)
			if err != nil {
				t.Fatalf("%s: %s", test.name, err)
			}
			set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
			if err != nil {
				t.Fatalf("%s: %s", test.name, err)
			}
			// Mark pod 0 Ready with a transition time shifted by readyDuration from now.
			pods, err := spc.setPodAvailable(set, 0, time.Now().Add(test.readyDuration))
			if err != nil {
				t.Fatalf("%s: %s", test.name, err)
			}
			status, err := ssc.UpdateStatefulSet(set, pods)
			if err != nil {
				t.Fatalf("%s: %s", test.name, err)
			}
			if status.AvailableReplicas != test.expectedActiveReplicas {
				t.Fatalf("expected %d active replicas got %d", test.expectedActiveReplicas, status.AvailableReplicas)
			}
		})
	}
}
type requestTracker struct {
requests int
err error
@ -1688,6 +1762,39 @@ func (spc *fakeStatefulPodControl) setPodReady(set *apps.StatefulSet, ordinal in
return spc.podsLister.Pods(set.Namespace).List(selector)
}
// setPodAvailable gives the pod at the given ordinal (in ascending ordinal
// order among the pods matching set's selector) a PodReady condition with
// status True and the supplied lastTransitionTime, stores the modified copy in
// the pods indexer and returns a fresh listing of the set's pods.
// An error is returned when the selector cannot be built, the pods cannot be
// listed, or ordinal is outside [0, len(pods)).
func (spc *fakeStatefulPodControl) setPodAvailable(set *apps.StatefulSet, ordinal int, lastTransitionTime time.Time) ([]*v1.Pod, error) {
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		return nil, err
	}
	pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
	if err != nil {
		return nil, err
	}
	if ordinal < 0 || ordinal >= len(pods) {
		return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
	}
	sort.Sort(ascendingOrdinal(pods))
	target := pods[ordinal].DeepCopy()

	transition := metav1.Time{Time: lastTransitionTime}
	ready := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: transition}
	// Seed the Ready condition on the pod before handing it to UpdatePodCondition.
	// NOTE(review): presumably this is what makes UpdatePodCondition keep the
	// supplied transition time instead of stamping its own — confirm against podutil.
	if _, current := podutil.GetPodCondition(&target.Status, ready.Type); current == nil {
		target.Status.Conditions = append(target.Status.Conditions, v1.PodCondition{
			Type:               v1.PodReady,
			Status:             v1.ConditionTrue,
			LastTransitionTime: transition,
		})
	} else {
		current.Status = v1.ConditionTrue
		current.LastTransitionTime = transition
	}
	podutil.UpdatePodCondition(&target.Status, &ready)

	fakeResourceVersion(target)
	spc.podsIndexer.Update(target)
	return spc.podsLister.Pods(set.Namespace).List(selector)
}
func (spc *fakeStatefulPodControl) addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
pod := newStatefulSetPod(set, ordinal)
pod.Status.Phase = v1.PodRunning
@ -1981,9 +2088,9 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet,
continue
}
}
// run the controller once and check invariants
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
_, err = ssc.UpdateStatefulSet(set, pods)
if err != nil {
return err
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -1993,6 +2100,7 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet,
if err := invariants(set, spc); err != nil {
return err
}
//fmt.Printf("Ravig pod conditions %v %v", set.Status.ReadyReplicas, *set.Spec.Replicas)
}
return invariants(set, spc)
}
@ -2009,7 +2117,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
}
sort.Sort(ascendingOrdinal(pods))
if ordinal := len(pods) - 1; ordinal >= 0 {
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
return err
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -2019,7 +2127,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
if pods, err = spc.addTerminatingPod(set, ordinal); err != nil {
return err
}
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -2036,7 +2144,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
spc.podsIndexer.Delete(pods[len(pods)-1])
}
}
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
return err
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -2099,7 +2207,7 @@ func updateStatefulSetControl(set *apps.StatefulSet,
if err != nil {
return err
}
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err
}
@ -2147,7 +2255,7 @@ func updateStatefulSetControl(set *apps.StatefulSet,
}
}
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)

View File

@ -20,15 +20,16 @@ import (
"errors"
"testing"
apps "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
apps "k8s.io/api/apps/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/fake"
appslisters "k8s.io/client-go/listers/apps/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
)
func TestStatefulSetUpdaterUpdatesSetStatus(t *testing.T) {
@ -124,3 +125,21 @@ func TestStatefulSetStatusUpdaterUpdateReplicasConflictFailure(t *testing.T) {
t.Error("UpdateStatefulSetStatus failed to return an error on get failure")
}
}
// TestStatefulSetStatusUpdaterGetAvailableReplicas verifies that, with the
// StatefulSetMinReadySeconds feature gate enabled, a successful
// UpdateStatefulSetStatus call leaves the AvailableReplicas value of the
// supplied status on the set.
func TestStatefulSetStatusUpdaterGetAvailableReplicas(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetMinReadySeconds, true)()
	const wantAvailable = int32(3)
	set := newStatefulSet(3)
	status := apps.StatefulSetStatus{ObservedGeneration: 1, Replicas: 2, AvailableReplicas: wantAvailable}
	fakeClient := &fake.Clientset{}
	updater := NewRealStatefulSetStatusUpdater(fakeClient, nil)
	// Echo the submitted object back so that every update succeeds.
	fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
		update := action.(core.UpdateAction)
		return true, update.GetObject(), nil
	})
	if err := updater.UpdateStatefulSetStatus(set, &status); err != nil {
		t.Errorf("Error returned on successful status update: %s", err)
	}
	// The check fails when the value was NOT carried over, so say so.
	if got := set.Status.AvailableReplicas; got != wantAvailable {
		t.Errorf("UpdateStatefulSetStatus did not set the set's AvailableReplicas: got %d, want %d", got, wantAvailable)
	}
}

View File

@ -204,6 +204,10 @@ func isRunningAndReady(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)
}
// isRunningAndAvailable reports whether podutil.IsPodAvailable considers pod
// available for the given minReadySeconds when evaluated at the current time.
// NOTE(review): unlike isRunningAndReady, this helper does not itself inspect
// pod.Status.Phase; whether the phase is covered depends on IsPodAvailable.
func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32) bool {
	now := metav1.Now()
	return podutil.IsPodAvailable(pod, minReadySeconds, now)
}
// isCreated returns true if pod has been created and is maintained by the API server
func isCreated(pod *v1.Pod) bool {
return pod.Status.Phase != ""
@ -366,6 +370,7 @@ func inconsistentStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) b
status.ReadyReplicas != set.Status.ReadyReplicas ||
status.UpdatedReplicas != set.Status.UpdatedReplicas ||
status.CurrentRevision != set.Status.CurrentRevision ||
status.AvailableReplicas != set.Status.AvailableReplicas ||
status.UpdateRevision != set.Status.UpdateRevision
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"testing"
"time"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@ -27,11 +28,21 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
featuregatetesting "k8s.io/component-base/featuregate/testing"
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
)
const (
interval = 100 * time.Millisecond
timeout = 60 * time.Second
)
// TestVolumeTemplateNoopUpdate ensures embedded StatefulSet objects with embedded PersistentVolumes can be updated
func TestVolumeTemplateNoopUpdate(t *testing.T) {
// Start the server with default storage setup
@ -226,3 +237,115 @@ func TestDeletingAndFailedPods(t *testing.T) {
t.Fatalf("failed to verify deleting pod %s has been replaced with a new non-deleting pod: %v", deletingPod.Name, err)
}
}
// TestStatefulSetAvailable creates a 4-replica StatefulSet with a one hour
// MinReadySeconds, puts its pods into different readiness states and waits for
// the StatefulSet status to report the expected Replicas, ReadyReplicas and
// AvailableReplicas, with the StatefulSetMinReadySeconds feature gate both
// enabled and disabled.
//
// Every case runs as its own subtest so that its API server, namespace,
// controller and feature gate override are torn down when the case ends instead
// of staying alive until the whole test function returns.
func TestStatefulSetAvailable(t *testing.T) {
	tests := []struct {
		name           string
		totalReplicas  int32
		readyReplicas  int32
		activeReplicas int32
		enabled        bool
	}{
		{
			name:           "When feature gate is enabled, only certain replicas would become active",
			totalReplicas:  4,
			readyReplicas:  3,
			activeReplicas: 2,
			enabled:        true,
		},
		{
			name:           "When feature gate is disabled, all the ready replicas would become active",
			totalReplicas:  4,
			readyReplicas:  3,
			activeReplicas: 3,
			enabled:        false,
		},
	}
	for _, test := range tests {
		test := test // pin the case for the closure on toolchains with a shared loop variable
		t.Run(test.name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetMinReadySeconds, test.enabled)()
			s, closeFn, rm, informers, c := scSetup(t)
			defer closeFn()
			ns := framework.CreateTestingNamespace("test-available-pods", s, t)
			defer framework.DeleteTestingNamespace(ns, s, t)
			stopCh := runControllerAndInformers(rm, informers)
			defer close(stopCh)

			// Named podLabels so the labelMap helper is not shadowed.
			podLabels := labelMap()
			sts := newSTS("sts", ns.Name, 4)
			sts.Spec.MinReadySeconds = int32(3600)
			stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
			sts = stss[0]
			waitSTSStable(t, c, sts)

			// Verify STS creates 4 pods
			podClient := c.CoreV1().Pods(ns.Name)
			pods := getPods(t, podClient, podLabels)
			if len(pods.Items) != 4 {
				t.Fatalf("len(pods) = %d, want 4", len(pods.Items))
			}

			// Split the 4 pods into three groups: one pod, one pod, and the remaining two.
			firstPodList := &v1.PodList{Items: pods.Items[:1]}
			secondPodList := &v1.PodList{Items: pods.Items[1:2]}
			thirdPodList := &v1.PodList{Items: pods.Items[2:]}
			// First group: Running, but not Ready
			// by setting the Ready condition to false with LastTransitionTime to be now
			setPodsReadyCondition(t, c, firstPodList, v1.ConditionFalse, time.Now())
			// Second group: Running and Ready, but not Available
			// by setting LastTransitionTime to now
			setPodsReadyCondition(t, c, secondPodList, v1.ConditionTrue, time.Now())
			// Third group (two pods): Running, Ready, and Available
			// by setting LastTransitionTime to more than 3600 seconds ago
			setPodsReadyCondition(t, c, thirdPodList, v1.ConditionTrue, time.Now().Add(-120*time.Minute))

			stsClient := c.AppsV1().StatefulSets(ns.Name)
			if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
				newSts, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{})
				if err != nil {
					return false, err
				}
				// Verify the Replicas, ReadyReplicas and AvailableReplicas counts expected for this case.
				return newSts.Status.Replicas == test.totalReplicas && newSts.Status.ReadyReplicas == test.readyReplicas && newSts.Status.AvailableReplicas == test.activeReplicas, nil
			}); err != nil {
				t.Fatalf("Failed to verify number of Replicas, ReadyReplicas and AvailableReplicas of StatefulSet %s to be as expected: %v", sts.Name, err)
			}
		})
	}
}
// setPodsReadyCondition polls until every pod in pods is either already Ready
// or has had its status updated to phase Running with a PodReady condition of
// the given conditionStatus and lastTransitionTime. Pods whose status update
// fails are retried on the next poll; the test is failed if the poll ends with
// an error.
func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1.PodList, conditionStatus v1.ConditionStatus, lastTransitionTime time.Time) {
	want := int32(len(pods.Items))
	transition := metav1.Time{Time: lastTransitionTime}

	markAll := func() (bool, error) {
		var done int32
		for i := range pods.Items {
			pod := &pods.Items[i]
			if !podutil.IsPodReady(pod) {
				pod.Status.Phase = v1.PodRunning
				if _, cond := podutil.GetPodCondition(&pod.Status, v1.PodReady); cond != nil {
					cond.Status = conditionStatus
					cond.LastTransitionTime = transition
				} else {
					pod.Status.Conditions = append(pod.Status.Conditions, v1.PodCondition{
						Type:               v1.PodReady,
						Status:             conditionStatus,
						LastTransitionTime: transition,
					})
				}
				if _, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
					// When status fails to be updated, we continue to next pod
					continue
				}
			}
			// Counted when the pod was already Ready or its status update succeeded.
			done++
		}
		return done >= want, nil
	}

	if err := wait.PollImmediate(interval, timeout, markAll); err != nil {
		t.Fatalf("failed to mark all StatefulSet pods to ready: %v", err)
	}
}