add maxUnavailable implementation and UT

This commit is contained in:
Mayank Kumar 2020-06-09 01:54:20 -07:00
parent 357203d992
commit 2733b66e80
2 changed files with 406 additions and 0 deletions

View File

@ -22,8 +22,10 @@ import (
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
@ -546,6 +548,16 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
return &status, nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.MaxUnavailableStatefulSet) {
return updateStatefulSetAfterInvariantEstablished(ctx,
ssc,
set,
replicas,
updateRevision,
status,
)
}
// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
updateMin := 0
if set.Spec.UpdateStrategy.RollingUpdate != nil {
@ -574,6 +586,80 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
return &status, nil
}
// updateStatefulSetAfterInvariantEstablished performs the destructive step of a rolling update while
// honoring RollingUpdate.MaxUnavailable. The caller in this file invokes it only when the
// MaxUnavailableStatefulSet feature gate is enabled.
//
// It counts every Pod in replicas that is not healthy and, if that count is still below maxUnavailable,
// deletes Pods whose revision differs from updateRevision, walking from the highest ordinal down to the
// partition, until the unavailable budget is used up. status.CurrentReplicas is decremented once per
// deleted Pod.
//
// A pointer to the (possibly modified) status is returned on every path, including error paths. A non-nil
// error is returned when MaxUnavailable cannot be resolved or when deleting a Pod fails with anything other
// than NotFound.
func updateStatefulSetAfterInvariantEstablished(
	ctx context.Context,
	ssc *defaultStatefulSetControl,
	set *apps.StatefulSet,
	replicas []*v1.Pod,
	updateRevision *apps.ControllerRevision,
	status apps.StatefulSetStatus,
) (*apps.StatefulSetStatus, error) {
	replicaCount := int(*set.Spec.Replicas)

	// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
	updateMin := 0
	maxUnavailable := 1
	if rollingUpdate := set.Spec.UpdateStrategy.RollingUpdate; rollingUpdate != nil {
		updateMin = int(*rollingUpdate.Partition)

		// Resolve MaxUnavailable (absolute number or percentage of the replica count, rounded down).
		// An unset MaxUnavailable falls back to 1.
		var err error
		maxUnavailable, err = intstrutil.GetValueFromIntOrPercent(
			intstrutil.ValueOrDefault(rollingUpdate.MaxUnavailable, intstrutil.FromInt(1)),
			replicaCount,
			false,
		)
		if err != nil {
			return &status, err
		}
		// A small percentage rounds down to zero. With a budget of zero the check below would always
		// bail out and the rollout would never make progress, so enforce a floor of one Pod.
		if maxUnavailable < 1 {
			maxUnavailable = 1
		}
	}

	// Collect all targets in the range between the 0 and Spec.Replicas. Count any targets in that range
	// that are unhealthy (i.e. terminated or not running and ready) as unavailable. Select the
	// (MaxUnavailable - Unavailable) Pods, in order with respect to their ordinal for termination. Delete
	// those pods and count the successful deletions. Update the status with the correct number of deletions.
	unavailablePods := 0
	for _, pod := range replicas {
		if !isHealthy(pod) {
			unavailablePods++
		}
	}

	// The unavailable budget is already exhausted: wait for Pods to become healthy before deleting more.
	if unavailablePods >= maxUnavailable {
		klog.V(2).Infof("StatefulSet %s/%s found unavailablePods %v, more than or equal to allowed maxUnavailable %v",
			set.Namespace,
			set.Name,
			unavailablePods,
			maxUnavailable)
		return &status, nil
	}

	// Now we need to delete (maxUnavailable - unavailablePods) Pods,
	// one by one, starting from the highest ordinal.
	podsToDelete := maxUnavailable - unavailablePods
	deletedPods := 0
	for target := len(replicas) - 1; target >= updateMin && deletedPods < podsToDelete; target-- {
		pod := replicas[target]
		// Skip Pods that already match the update revision or are already terminating.
		if getPodRevision(pod) == updateRevision.Name || isTerminating(pod) {
			continue
		}
		klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
			set.Namespace,
			set.Name,
			pod.Name)
		// A NotFound error means the Pod is already gone, which is the desired outcome.
		if err := ssc.podControl.DeleteStatefulPod(set, pod); err != nil && !errors.IsNotFound(err) {
			// Return the status accumulated so far, consistent with the other error path above.
			return &status, err
		}
		deletedPods++
		status.CurrentReplicas--
	}
	return &status, nil
}
// updateStatefulSetStatus updates set's Status to be equal to status. If status indicates a complete update, it is
// mutated to indicate completion. If status is semantically equivalent to set's Status no update is performed. If the
// returned error is nil, the update is successful.

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
appsinformers "k8s.io/client-go/informers/apps/v1"
@ -728,6 +729,325 @@ func TestStatefulSetControl_getSetRevisions(t *testing.T) {
})
}
// setupPodManagementPolicy stores podManagementPolicy in the given StatefulSet's spec and hands the
// same (mutated) StatefulSet back so the call can be chained.
func setupPodManagementPolicy(podManagementPolicy apps.PodManagementPolicyType, set *apps.StatefulSet) *apps.StatefulSet {
	spec := &set.Spec
	spec.PodManagementPolicy = podManagementPolicy
	return set
}
// TestStatefulSetControlRollingUpdateWithMaxUnavailable drives a rolling update of a 6-replica
// StatefulSet with partition=3 and maxUnavailable=2 under both pod management policies, with the
// MaxUnavailableStatefulSet feature gate enabled. It checks that pods 4 and 5 are deleted together,
// that pod 3 is deleted once pod 4 is ready again, and that no further pod is touched afterwards.
func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MaxUnavailableStatefulSet, true)()

	// withoutOrdinal3 returns the given ordinal-sorted pods minus the pod at index 3. The result lives
	// in fresh storage: appending directly onto pods[:3] would overwrite pods[3:] in the shared backing
	// array and corrupt any later use of the input slice (e.g. in a failure message).
	withoutOrdinal3 := func(pods []*v1.Pod) []*v1.Pod {
		remaining := make([]*v1.Pod, 0, len(pods))
		remaining = append(remaining, pods[:3]...)
		return append(remaining, pods[4:]...)
	}

	simpleParallelVerificationFn := func(
		set *apps.StatefulSet,
		spc *fakeObjectManager,
		ssc StatefulSetControlInterface,
		pods []*v1.Pod,
		totalPods int,
		selector labels.Selector,
	) []*v1.Pod {
		// in burst mode, 2 pods got deleted, so 2 new pods will be created at the same time
		if len(pods) != totalPods {
			t.Fatalf("Expected %d pods after pods 4 and 5 were recreated, got %d", totalPods, len(pods))
		}

		// if pod 4 ready, start to update pod 3, even though 5 is not ready
		spc.setPodRunning(set, 4)
		spc.setPodRunning(set, 5)
		originalPods, _ := spc.setPodReady(set, 4)
		sort.Sort(ascendingOrdinal(originalPods))
		if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
			t.Fatal(err)
		}
		pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			t.Fatal(err)
		}
		sort.Sort(ascendingOrdinal(pods))

		// pods 0, 1, 2, 4, 5 should be present (note 3 is missing)
		expectedPods := withoutOrdinal3(originalPods)
		if !reflect.DeepEqual(pods, expectedPods) {
			t.Fatalf("Expected pods %v, got pods %v", expectedPods, pods)
		}

		// create new pod 3
		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
			t.Fatal(err)
		}
		pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			t.Fatal(err)
		}
		if len(pods) != totalPods {
			t.Fatalf("Expected %d pods after pod 3 was recreated, got %d", totalPods, len(pods))
		}

		return pods
	}

	simpleOrderedVerificationFn := func(
		set *apps.StatefulSet,
		spc *fakeObjectManager,
		ssc StatefulSetControlInterface,
		pods []*v1.Pod,
		totalPods int,
		selector labels.Selector,
	) []*v1.Pod {
		// only one pod gets created at a time due to OrderedReady, so one of the
		// two deleted pods is still missing at this point
		if len(pods) != totalPods-1 {
			t.Fatalf("Expected %d pods after the first recreated pod, got %d", totalPods-1, len(pods))
		}

		// once pod 4 is ready, the next sync creates the remaining missing pod
		spc.setPodRunning(set, 4)
		pods, _ = spc.setPodReady(set, 4)
		if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
			t.Fatal(err)
		}
		pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			t.Fatal(err)
		}
		if len(pods) != totalPods {
			t.Fatalf("Expected %d pods after pods 4 and 5 were recreated, got %d", totalPods, len(pods))
		}

		// with pods 4 and 5 ready, start to update pod 3
		spc.setPodRunning(set, 5)
		originalPods, _ := spc.setPodReady(set, 5)
		sort.Sort(ascendingOrdinal(originalPods))
		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
			t.Fatal(err)
		}
		pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			t.Fatal(err)
		}
		sort.Sort(ascendingOrdinal(pods))

		// verify the remaining pods are 0, 1, 2, 4, 5 (3 got deleted)
		expectedPods := withoutOrdinal3(originalPods)
		if !reflect.DeepEqual(pods, expectedPods) {
			t.Fatalf("Expected pods %v, got pods %v", expectedPods, pods)
		}

		// create new pod 3
		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
			t.Fatal(err)
		}
		pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			t.Fatal(err)
		}
		if len(pods) != totalPods {
			t.Fatalf("Expected %d pods after pod 3 was recreated, got %d", totalPods, len(pods))
		}

		return pods
	}

	testCases := []struct {
		policyType apps.PodManagementPolicyType
		verifyFn   func(
			set *apps.StatefulSet,
			spc *fakeObjectManager,
			ssc StatefulSetControlInterface,
			pods []*v1.Pod,
			totalPods int,
			selector labels.Selector,
		) []*v1.Pod
	}{
		{apps.OrderedReadyPodManagement, simpleOrderedVerificationFn},
		{apps.ParallelPodManagement, simpleParallelVerificationFn},
	}

	for _, tc := range testCases {
		// Setup the statefulSet controller
		totalPods := 6
		var partition int32 = 3
		var maxUnavailable = intstr.FromInt(2)
		set := setupPodManagementPolicy(tc.policyType, newStatefulSet(totalPods))
		set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
			Type: apps.RollingUpdateStatefulSetStrategyType,
			RollingUpdate: &apps.RollingUpdateStatefulSetStrategy{
				Partition:      &partition,
				MaxUnavailable: &maxUnavailable,
			},
		}

		client := fake.NewSimpleClientset()
		spc, _, ssc := setupController(client)
		if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
			t.Fatal(err)
		}
		set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
		if err != nil {
			t.Fatal(err)
		}

		// Change the image to trigger an update
		set.Spec.Template.Spec.Containers[0].Image = "foo"

		selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
		if err != nil {
			t.Fatal(err)
		}
		originalPods, err := spc.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			t.Fatal(err)
		}
		sort.Sort(ascendingOrdinal(originalPods))

		// since maxUnavailable is 2, this sync deletes both pod 4 and pod 5
		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
			t.Fatal(err)
		}
		pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			t.Fatal(err)
		}
		sort.Sort(ascendingOrdinal(pods))

		// the remaining pods are 0, 1, 2, 3
		if !reflect.DeepEqual(pods, originalPods[:4]) {
			t.Fatalf("Expected pods %v, got pods %v", originalPods[:4], pods)
		}

		// create new pods
		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
			t.Fatal(err)
		}
		pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			t.Fatal(err)
		}

		tc.verifyFn(set, spc, ssc, pods, totalPods, selector)

		// pods 3/4/5 ready, should not update other pods
		spc.setPodRunning(set, 3)
		spc.setPodRunning(set, 5)
		spc.setPodReady(set, 5)
		originalPods, _ = spc.setPodReady(set, 3)
		sort.Sort(ascendingOrdinal(originalPods))
		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
			t.Fatal(err)
		}
		pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			t.Fatal(err)
		}
		sort.Sort(ascendingOrdinal(pods))
		if !reflect.DeepEqual(pods, originalPods) {
			t.Fatalf("Expected pods %v, got pods %v", originalPods, pods)
		}
	}
}
// setupForInvariant builds the fixture shared by the maxUnavailable invariant tests: a 6-replica
// StatefulSet using the RollingUpdate strategy with partition=3 and maxUnavailable=2, scaled up through a
// freshly created controller. It returns the StatefulSet as read back from the lister, the fake object
// manager, the controller, the configured maxUnavailable value and the total number of pods.
// Any setup failure aborts the calling test via t.Fatal.
func setupForInvariant(t *testing.T) (*apps.StatefulSet, *fakeObjectManager, StatefulSetControlInterface, intstr.IntOrString, int) {
	// Attribute setup failures to the calling test rather than to this helper.
	t.Helper()

	totalPods := 6
	set := newStatefulSet(totalPods)
	// update all pods >=3(3,4,5)
	var partition int32 = 3
	var maxUnavailable = intstr.FromInt(2)
	set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
		Type: apps.RollingUpdateStatefulSetStrategyType,
		RollingUpdate: &apps.RollingUpdateStatefulSetStrategy{
			Partition:      &partition,
			MaxUnavailable: &maxUnavailable,
		},
	}

	client := fake.NewSimpleClientset()
	spc, _, ssc := setupController(client)
	if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
		t.Fatal(err)
	}
	set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
	if err != nil {
		t.Fatal(err)
	}

	return set, spc, ssc, maxUnavailable, totalPods
}
// TestStatefulSetControlRollingUpdateWithMaxUnavailableInOrderedModeVerifyInvariant marks different
// combinations of pods as terminating and then calls updateStatefulSetAfterInvariantEstablished directly.
// It verifies that the number of pods deleted by the update never exceeds what is left of the
// maxUnavailable budget once the already-terminating pods are accounted for.
func TestStatefulSetControlRollingUpdateWithMaxUnavailableInOrderedModeVerifyInvariant(t *testing.T) {
	// Enable the feature gate once for the whole test. Deferring inside the loop would stack one
	// restore call per test case until the function returns.
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MaxUnavailableStatefulSet, true)()

	// Make all pods in statefulset unavailable one by one
	// and verify that RollingUpdate doesnt proceed with maxUnavailable set
	// this could have been a simple loop, keeping it like this to be able
	// to add more params here.
	testCases := []struct {
		ordinalOfPodToTerminate []int
	}{
		{[]int{}},
		{[]int{5}},
		{[]int{3}},
		{[]int{4}},
		{[]int{5, 4}},
		{[]int{5, 3}},
		{[]int{4, 3}},
		{[]int{5, 4, 3}},
		{[]int{2}}, // note this is an ordinal below the partition(3)
		{[]int{1}}, // note this is an ordinal below the partition(3)
	}
	for _, tc := range testCases {
		t.Run(fmt.Sprintf("terminating pod at ordinal %d", tc.ordinalOfPodToTerminate), func(t *testing.T) {
			// Build the fixture inside the subtest so that a setup failure is reported against
			// this case only and does not abort the remaining cases.
			set, spc, ssc, maxUnavailable, totalPods := setupForInvariant(t)

			status := apps.StatefulSetStatus{Replicas: int32(totalPods)}
			updateRevision := &apps.ControllerRevision{}

			// Make the listed pods unavailable before trying to update
			for _, ordinal := range tc.ordinalOfPodToTerminate {
				if _, err := spc.addTerminatingPod(set, ordinal); err != nil {
					t.Fatal(err)
				}
			}

			selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
			if err != nil {
				t.Fatal(err)
			}
			originalPods, err := spc.podsLister.Pods(set.Namespace).List(selector)
			if err != nil {
				t.Fatal(err)
			}
			sort.Sort(ascendingOrdinal(originalPods))

			// start to update
			set.Spec.Template.Spec.Containers[0].Image = "foo"

			// try to update the statefulset
			// this function is only called in main code when feature gate is enabled
			if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status); err != nil {
				t.Fatal(err)
			}
			pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
			if err != nil {
				t.Fatal(err)
			}
			sort.Sort(ascendingOrdinal(pods))

			// The update may only delete as many pods as the budget left over by the
			// already-terminating pods allows; never a negative number.
			expectedPodsToBeDeleted := maxUnavailable.IntValue() - len(tc.ordinalOfPodToTerminate)
			if expectedPodsToBeDeleted < 0 {
				expectedPodsToBeDeleted = 0
			}

			expectedPodsAfterUpdate := totalPods - expectedPodsToBeDeleted

			if len(pods) != expectedPodsAfterUpdate {
				t.Errorf("Expected pods %v, got pods %v", expectedPodsAfterUpdate, len(pods))
			}
		})
	}
}
func TestStatefulSetControlRollingUpdate(t *testing.T) {
type testcase struct {
name string