mirror of https://github.com/k3s-io/kubernetes.git

Parallel StatefulSet pod create & delete

parent 2fb472c83c
commit 63e5b6bdb2
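
In short: the controller file (presumably pkg/controller/statefulset/stateful_set_control.go) gains a slowStartBatch helper and a runForAll dispatcher so that StatefulSets which allow burst (Parallel pod management) have their replicas and condemned pods processed concurrently, in exponentially growing batches, instead of one ordinal at a time. UpdateStatefulSet's error aggregation is flattened to match, and the test file's requestTracker becomes mutex-guarded so the fakes tolerate concurrent calls.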
@@ -19,6 +19,7 @@ package statefulset
 import (
     "context"
     "sort"
+    "sync"

     apps "k8s.io/api/apps/v1"
     v1 "k8s.io/api/core/v1"
@@ -30,8 +31,12 @@ import (
     "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/controller/history"
     "k8s.io/kubernetes/pkg/features"
+    "k8s.io/utils/integer"
 )

+// Realistic value for maximum in-flight requests when processing in parallel mode.
+const MaxBatchSize = 500
+
 // StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented
 // as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation.
 type StatefulSetControlInterface interface {
@@ -87,7 +92,11 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set

     currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions)
     if err != nil {
-        return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
+        errs := []error{err}
+        if agg, ok := err.(utilerrors.Aggregate); ok {
+            errs = agg.Errors()
+        }
+        return nil, utilerrors.NewAggregate(append(errs, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)))
     }

     // maintain the set's revision history limit
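
Context for the unwrap above: with parallel processing, performUpdate can itself return a utilerrors.Aggregate of several per-pod failures. Unwrapping it before appending the truncateHistory error gives the caller one flat list instead of an aggregate nested inside another. A standalone sketch of the difference (only the utilerrors package is from the real code):

    package main

    import (
        "errors"
        "fmt"

        utilerrors "k8s.io/apimachinery/pkg/util/errors"
    )

    func main() {
        var podErrs error = utilerrors.NewAggregate([]error{
            errors.New("create pod-0 failed"),
            errors.New("create pod-1 failed"),
        })
        histErr := errors.New("truncate history failed")

        // Old behavior: the aggregate is nested, so only 2 top-level errors.
        nested := utilerrors.NewAggregate([]error{podErrs, histErr})
        fmt.Println(len(nested.Errors())) // 2

        // New behavior: unwrap first, so all 3 leaf errors sit side by side.
        errs := []error{podErrs}
        if agg, ok := podErrs.(utilerrors.Aggregate); ok {
            errs = agg.Errors()
        }
        flat := utilerrors.NewAggregate(append(errs, histErr))
        fmt.Println(len(flat.Errors())) // 3
    }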
@@ -269,6 +278,38 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
     return currentRevision, updateRevision, collisionCount, nil
 }

+func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) {
+    successes := 0
+    j := 0
+    for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(integer.IntMin(2*batchSize, remaining), MaxBatchSize) {
+        errCh := make(chan error, batchSize)
+        var wg sync.WaitGroup
+        wg.Add(batchSize)
+        for i := 0; i < batchSize; i++ {
+            go func(k int) {
+                defer wg.Done()
+                // Ignore the first parameter - relevant for monotonic only.
+                if _, err := fn(k); err != nil {
+                    errCh <- err
+                }
+            }(j)
+            j++
+        }
+        wg.Wait()
+        successes += batchSize - len(errCh)
+        close(errCh)
+        if len(errCh) > 0 {
+            errs := make([]error, 0)
+            for err := range errCh {
+                errs = append(errs, err)
+            }
+            return successes, utilerrors.NewAggregate(errs)
+        }
+        remaining -= batchSize
+    }
+    return successes, nil
+}
+
 type replicaStatus struct {
     replicas      int32
     readyReplicas int32
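
slowStartBatch follows the same slow-start pattern as the ReplicaSet controller: batches of goroutines start at initialBatchSize and double after every fully successful batch, clamped by both the remaining work and MaxBatchSize, so a systemic failure (a quota error, say) costs a handful of requests rather than hundreds. The (bool, error) signature of fn lets the same callback serve the monotonic path later; the boolean is simply ignored here, as the goroutine's comment notes. The batch progression is easy to see in isolation (standalone sketch, not part of the commit; intMin stands in for integer.IntMin):

    package main

    import "fmt"

    const maxBatchSize = 500 // mirrors MaxBatchSize above

    func intMin(a, b int) int { // stands in for integer.IntMin
        if a < b {
            return a
        }
        return b
    }

    func main() {
        remaining := 30
        for batchSize := intMin(remaining, 1); batchSize > 0; batchSize = intMin(intMin(2*batchSize, remaining), maxBatchSize) {
            fmt.Println("batch of", batchSize)
            remaining -= batchSize
        }
        // Prints batches of 1, 2, 4, 8, 15: exponential growth until the
        // remaining work (or the 500 cap) truncates the final batch.
    }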
@@ -460,6 +501,21 @@ func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set
     return true, ssc.podControl.DeleteStatefulPod(set, condemned[i])
 }

+func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) {
+    if monotonic {
+        for i := range pods {
+            if shouldExit, err := fn(i); shouldExit || err != nil {
+                return true, err
+            }
+        }
+    } else {
+        if _, err := slowStartBatch(1, len(pods), fn); err != nil {
+            return true, err
+        }
+    }
+    return false, nil
+}
+
 // updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
 // the set in order to conform the system to the target state for the set. The target state always contains
 // set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
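
runForAll is the single dispatch point for both policies: in monotonic mode it walks the pods one ordinal at a time and stops at the first error or at fn signaling shouldExit, while in burst mode it hands the whole slice to slowStartBatch, where shouldExit is meaningless because parallel pods have no ordering to block on. A toy invocation (hypothetical caller; assumes the runForAll and slowStartBatch definitions above are in scope):

    package statefulset

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
    )

    // exampleRunForAll is illustrative only, not part of the commit.
    func exampleRunForAll() {
        pods := make([]*v1.Pod, 5) // placeholder entries; fn only uses the index
        logPod := func(i int) (bool, error) {
            fmt.Printf("processing pod %d\n", i)
            return false, nil // never blocks, never fails
        }

        // Monotonic (OrderedReady): indices 0..4, strictly in order, one at a time.
        _, _ = runForAll(pods, logPod, true)

        // Burst (Parallel): same indices, dispatched in goroutine batches of
        // 1, 2 and 2; ordering within a batch is up to the Go scheduler.
        _, _ = runForAll(pods, logPod, false)
    }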
@@ -565,13 +621,14 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(

     monotonic := !allowsBurst(set)

-    // Examine each replica with respect to its ordinal
-    for i := range replicas {
-        if shouldExit, err := ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i); shouldExit || err != nil {
+    // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
+    processReplicaFn := func(i int) (bool, error) {
+        return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i)
+    }
+    if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
         updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
         return &status, err
     }
-    }

     // Fix pod claims for condemned pods, if necessary.
     if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
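
For orientation: monotonic is the inverse of allowsBurst, which is not touched by this diff. In upstream Kubernetes that helper lives in stateful_set_utils.go and simply checks the pod management policy, so the parallel paths introduced here fire only for sets declared with spec.podManagementPolicy: Parallel. Paraphrased (with apps = k8s.io/api/apps/v1):

    // Not part of this diff; paraphrased from stateful_set_utils.go.
    func allowsBurst(set *apps.StatefulSet) bool {
        return set.Spec.PodManagementPolicy == apps.ParallelPodManagement
    }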
@@ -585,20 +642,25 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
             }
             return false, nil
         }
-        for i := range condemned {
-            if shouldExit, err := fixPodClaim(i); shouldExit || err != nil {
+        if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil {
             updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
             return &status, err
         }
-        }
     }

-    for i := range condemned {
-        if shouldExit, err := ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i); shouldExit || err != nil {
+    // At this point, in monotonic mode all of the current Replicas are Running, Ready and Available,
+    // and we can consider termination.
+    // We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
+    // We will terminate Pods in a monotonically decreasing order.
+    // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
+    // updates.
+    processCondemnedFn := func(i int) (bool, error) {
+        return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i)
+    }
+    if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil {
         updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
         return &status, err
     }
-    }

     updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)

@@ -26,6 +26,7 @@ import (
     "sort"
     "strconv"
     "strings"
+    "sync"
     "testing"
     "time"

@@ -2357,24 +2358,39 @@ func TestStatefulSetStatusUpdate(t *testing.T) {
 }

 type requestTracker struct {
+    sync.Mutex
     requests int
     err      error
     after    int
 }

 func (rt *requestTracker) errorReady() bool {
+    rt.Lock()
+    defer rt.Unlock()
     return rt.err != nil && rt.requests >= rt.after
 }

 func (rt *requestTracker) inc() {
+    rt.Lock()
+    defer rt.Unlock()
     rt.requests++
 }

 func (rt *requestTracker) reset() {
+    rt.Lock()
+    defer rt.Unlock()
     rt.err = nil
     rt.after = 0
 }

+func newRequestTracker(requests int, err error, after int) requestTracker {
+    return requestTracker{
+        requests: requests,
+        err:      err,
+        after:    after,
+    }
+}
+
 type fakeObjectManager struct {
     podsLister   corelisters.PodLister
     claimsLister corelisters.PersistentVolumeClaimLister
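
The embedded sync.Mutex is what keeps these fakes honest once burst mode is exercised: slowStartBatch now drives the fake object manager from many goroutines at once, and an unguarded rt.requests++ is a data race that go test -race would flag. The newRequestTracker constructor follows directly, since the old positional literal requestTracker{0, nil, 0} no longer compiles once the struct embeds a Mutex. A standalone reproduction of the hazard (sketch, not from the commit):

    package main

    import (
        "fmt"
        "sync"
    )

    type tracker struct {
        sync.Mutex
        requests int
    }

    func (t *tracker) inc() {
        t.Lock() // remove these two lines and `go run -race` reports a data race
        defer t.Unlock()
        t.requests++
    }

    func main() {
        t := &tracker{}
        var wg sync.WaitGroup
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                t.inc()
            }()
        }
        wg.Wait()
        fmt.Println(t.requests) // 100 with the lock; unpredictable without it
    }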
@@ -2402,9 +2418,10 @@ func newFakeObjectManager(informerFactory informers.SharedInformerFactory) *fake
         claimInformer.Informer().GetIndexer(),
         setInformer.Informer().GetIndexer(),
         revisionInformer.Informer().GetIndexer(),
-        requestTracker{0, nil, 0},
-        requestTracker{0, nil, 0},
-        requestTracker{0, nil, 0}}
+        newRequestTracker(0, nil, 0),
+        newRequestTracker(0, nil, 0),
+        newRequestTracker(0, nil, 0),
+    }
 }

 func (om *fakeObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
@@ -2619,7 +2636,7 @@ func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInform
     return &fakeStatefulSetStatusUpdater{
         setInformer.Lister(),
         setInformer.Informer().GetIndexer(),
-        requestTracker{0, nil, 0},
+        newRequestTracker(0, nil, 0),
     }
 }