From 63e5b6bdb24c44872621874cfcbbe3423f8a525e Mon Sep 17 00:00:00 2001 From: Aleksandra Malinowska Date: Tue, 20 Jun 2023 10:30:30 +0200 Subject: [PATCH] Parallel StatefulSet pod create & delete --- .../statefulset/stateful_set_control.go | 96 +++++++++++++++---- .../statefulset/stateful_set_control_test.go | 25 ++++- 2 files changed, 100 insertions(+), 21 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 5f45a7b820e..f55b7d1d67c 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -19,6 +19,7 @@ package statefulset import ( "context" "sort" + "sync" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -30,8 +31,12 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/history" "k8s.io/kubernetes/pkg/features" + "k8s.io/utils/integer" ) +// Realistic value for maximum in-flight requests when processing in parallel mode. +const MaxBatchSize = 500 + // StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented // as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation. type StatefulSetControlInterface interface { @@ -87,7 +92,11 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions) if err != nil { - return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)}) + errs := []error{err} + if agg, ok := err.(utilerrors.Aggregate); ok { + errs = agg.Errors() + } + return nil, utilerrors.NewAggregate(append(errs, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision))) } // maintain the set's revision history limit @@ -269,6 +278,38 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions( return currentRevision, updateRevision, collisionCount, nil } +func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) { + successes := 0 + j := 0 + for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(integer.IntMin(2*batchSize, remaining), MaxBatchSize) { + errCh := make(chan error, batchSize) + var wg sync.WaitGroup + wg.Add(batchSize) + for i := 0; i < batchSize; i++ { + go func(k int) { + defer wg.Done() + // Ignore the first parameter - relevant for monotonic only. + if _, err := fn(k); err != nil { + errCh <- err + } + }(j) + j++ + } + wg.Wait() + successes += batchSize - len(errCh) + close(errCh) + if len(errCh) > 0 { + errs := make([]error, 0) + for err := range errCh { + errs = append(errs, err) + } + return successes, utilerrors.NewAggregate(errs) + } + remaining -= batchSize + } + return successes, nil +} + type replicaStatus struct { replicas int32 readyReplicas int32 @@ -460,6 +501,21 @@ func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set return true, ssc.podControl.DeleteStatefulPod(set, condemned[i]) } +func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) { + if monotonic { + for i := range pods { + if shouldExit, err := fn(i); shouldExit || err != nil { + return true, err + } + } + } else { + if _, err := slowStartBatch(1, len(pods), fn); err != nil { + return true, err + } + } + return false, nil +} + // updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in // the set in order to conform the system to the target state for the set. The target state always contains // set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is @@ -565,12 +621,13 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( monotonic := !allowsBurst(set) - // Examine each replica with respect to its ordinal - for i := range replicas { - if shouldExit, err := ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i); shouldExit || err != nil { - updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) - return &status, err - } + // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode. + processReplicaFn := func(i int) (bool, error) { + return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i) + } + if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil { + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) + return &status, err } // Fix pod claims for condemned pods, if necessary. @@ -585,21 +642,26 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } return false, nil } - for i := range condemned { - if shouldExit, err := fixPodClaim(i); shouldExit || err != nil { - updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) - return &status, err - } - } - } - - for i := range condemned { - if shouldExit, err := ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i); shouldExit || err != nil { + if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil { updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) return &status, err } } + // At this point, in monotonic mode all of the current Replicas are Running, Ready and Available, + // and we can consider termination. + // We will wait for all predecessors to be Running and Ready prior to attempting a deletion. + // We will terminate Pods in a monotonically decreasing order. + // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over + // updates. + processCondemnedFn := func(i int) (bool, error) { + return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i) + } + if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil { + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) + return &status, err + } + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) // for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted. diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index f12fba34e3c..bebaa4469d5 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -26,6 +26,7 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "time" @@ -2357,24 +2358,39 @@ func TestStatefulSetStatusUpdate(t *testing.T) { } type requestTracker struct { + sync.Mutex requests int err error after int } func (rt *requestTracker) errorReady() bool { + rt.Lock() + defer rt.Unlock() return rt.err != nil && rt.requests >= rt.after } func (rt *requestTracker) inc() { + rt.Lock() + defer rt.Unlock() rt.requests++ } func (rt *requestTracker) reset() { + rt.Lock() + defer rt.Unlock() rt.err = nil rt.after = 0 } +func newRequestTracker(requests int, err error, after int) requestTracker { + return requestTracker{ + requests: requests, + err: err, + after: after, + } +} + type fakeObjectManager struct { podsLister corelisters.PodLister claimsLister corelisters.PersistentVolumeClaimLister @@ -2402,9 +2418,10 @@ func newFakeObjectManager(informerFactory informers.SharedInformerFactory) *fake claimInformer.Informer().GetIndexer(), setInformer.Informer().GetIndexer(), revisionInformer.Informer().GetIndexer(), - requestTracker{0, nil, 0}, - requestTracker{0, nil, 0}, - requestTracker{0, nil, 0}} + newRequestTracker(0, nil, 0), + newRequestTracker(0, nil, 0), + newRequestTracker(0, nil, 0), + } } func (om *fakeObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error { @@ -2619,7 +2636,7 @@ func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInform return &fakeStatefulSetStatusUpdater{ setInformer.Lister(), setInformer.Informer().GetIndexer(), - requestTracker{0, nil, 0}, + newRequestTracker(0, nil, 0), } }