mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 04:27:54 +00:00
Refactor StatefulSet controller update logic
This commit is contained in:
parent
c0147ff528
commit
2fb472c83c
@ -269,6 +269,197 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
|
||||
return currentRevision, updateRevision, collisionCount, nil
|
||||
}
|
||||
|
||||
// replicaStatus holds the Pod counts that computeReplicaStatus tallies for a
// single list of Pods.
type replicaStatus struct {
	// replicas counts the Pods for which isCreated reports true.
	replicas int32
	// readyReplicas counts the Pods for which isRunningAndReady reports true.
	readyReplicas int32
	// availableReplicas counts the ready Pods for which isRunningAndAvailable
	// also reports true.
	availableReplicas int32
	// currentReplicas counts the created, non-terminating Pods whose revision
	// equals the current revision name.
	currentReplicas int32
	// updatedReplicas counts the created, non-terminating Pods whose revision
	// equals the update revision name.
	updatedReplicas int32
}
|
||||
|
||||
func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
|
||||
status := replicaStatus{}
|
||||
for _, pod := range pods {
|
||||
if isCreated(pod) {
|
||||
status.replicas++
|
||||
}
|
||||
|
||||
// count the number of running and ready replicas
|
||||
if isRunningAndReady(pod) {
|
||||
status.readyReplicas++
|
||||
// count the number of running and available replicas
|
||||
if isRunningAndAvailable(pod, minReadySeconds) {
|
||||
status.availableReplicas++
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// count the number of current and update replicas
|
||||
if isCreated(pod) && !isTerminating(pod) {
|
||||
if getPodRevision(pod) == currentRevision.Name {
|
||||
status.currentReplicas++
|
||||
}
|
||||
if getPodRevision(pod) == updateRevision.Name {
|
||||
status.updatedReplicas++
|
||||
}
|
||||
}
|
||||
}
|
||||
return status
|
||||
}
|
||||
|
||||
func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
|
||||
status.Replicas = 0
|
||||
status.ReadyReplicas = 0
|
||||
status.AvailableReplicas = 0
|
||||
status.CurrentReplicas = 0
|
||||
status.UpdatedReplicas = 0
|
||||
for _, list := range podLists {
|
||||
replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
|
||||
status.Replicas += replicaStatus.replicas
|
||||
status.ReadyReplicas += replicaStatus.readyReplicas
|
||||
status.AvailableReplicas += replicaStatus.availableReplicas
|
||||
status.CurrentReplicas += replicaStatus.currentReplicas
|
||||
status.UpdatedReplicas += replicaStatus.updatedReplicas
|
||||
}
|
||||
}
|
||||
|
||||
func (ssc *defaultStatefulSetControl) processReplica(
|
||||
ctx context.Context,
|
||||
set *apps.StatefulSet,
|
||||
currentRevision *apps.ControllerRevision,
|
||||
updateRevision *apps.ControllerRevision,
|
||||
currentSet *apps.StatefulSet,
|
||||
updateSet *apps.StatefulSet,
|
||||
monotonic bool,
|
||||
replicas []*v1.Pod,
|
||||
i int) (bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
// delete and recreate failed pods
|
||||
if isFailed(replicas[i]) {
|
||||
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
|
||||
"StatefulSet %s/%s is recreating failed Pod %s",
|
||||
set.Namespace,
|
||||
set.Name,
|
||||
replicas[i].Name)
|
||||
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
|
||||
return true, err
|
||||
}
|
||||
replicaOrd := i + getStartOrdinal(set)
|
||||
replicas[i] = newVersionedStatefulSetPod(
|
||||
currentSet,
|
||||
updateSet,
|
||||
currentRevision.Name,
|
||||
updateRevision.Name,
|
||||
replicaOrd)
|
||||
}
|
||||
// If we find a Pod that has not been created we create the Pod
|
||||
if !isCreated(replicas[i]) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
|
||||
if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
|
||||
return true, err
|
||||
} else if isStale {
|
||||
// If a pod has a stale PVC, no more work can be done this round.
|
||||
return true, err
|
||||
}
|
||||
}
|
||||
if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
|
||||
return true, err
|
||||
}
|
||||
if monotonic {
|
||||
// if the set does not allow bursting, return immediately
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// If the Pod is in pending state then trigger PVC creation to create missing PVCs
|
||||
if isPending(replicas[i]) {
|
||||
logger.V(4).Info(
|
||||
"StatefulSet is triggering PVC creation for pending Pod",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
|
||||
if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
|
||||
return true, err
|
||||
}
|
||||
}
|
||||
|
||||
// If we find a Pod that is currently terminating, we must wait until graceful deletion
|
||||
// completes before we continue to make progress.
|
||||
if isTerminating(replicas[i]) && monotonic {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// If we have a Pod that has been created but is not running and ready we can not make progress.
|
||||
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
|
||||
// ordinal, are Running and Ready.
|
||||
if !isRunningAndReady(replicas[i]) && monotonic {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// If we have a Pod that has been created but is not available we can not make progress.
|
||||
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
|
||||
// ordinal, are Available.
|
||||
if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Enforce the StatefulSet invariants
|
||||
retentionMatch := true
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
|
||||
var err error
|
||||
retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
|
||||
// An error is expected if the pod is not yet fully updated, and so return is treated as matching.
|
||||
if err != nil {
|
||||
retentionMatch = true
|
||||
}
|
||||
}
|
||||
|
||||
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Make a deep copy so we don't mutate the shared cache
|
||||
replica := replicas[i].DeepCopy()
|
||||
if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
|
||||
return true, err
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
if isTerminating(condemned[i]) {
|
||||
// if we are in monotonic mode, block and wait for terminating pods to expire
|
||||
if monotonic {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
|
||||
if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
|
||||
return true, nil
|
||||
}
|
||||
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
|
||||
if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
|
||||
return true, ssc.podControl.DeleteStatefulPod(set, condemned[i])
|
||||
}
|
||||
|
||||
// updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
|
||||
// the set in order to conform the system to the target state for the set. The target state always contains
|
||||
// set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
|
||||
@ -304,6 +495,8 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
|
||||
status.CollisionCount = new(int32)
|
||||
*status.CollisionCount = collisionCount
|
||||
|
||||
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods)
|
||||
|
||||
replicaCount := int(*set.Spec.Replicas)
|
||||
// slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
|
||||
replicas := make([]*v1.Pod, replicaCount)
|
||||
@ -314,28 +507,6 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
|
||||
|
||||
// First we partition pods into two lists valid replicas and condemned Pods
|
||||
for _, pod := range pods {
|
||||
status.Replicas++
|
||||
|
||||
// count the number of running and ready replicas
|
||||
if isRunningAndReady(pod) {
|
||||
status.ReadyReplicas++
|
||||
// count the number of running and available replicas
|
||||
if isRunningAndAvailable(pod, set.Spec.MinReadySeconds) {
|
||||
status.AvailableReplicas++
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// count the number of current and update replicas
|
||||
if isCreated(pod) && !isTerminating(pod) {
|
||||
if getPodRevision(pod) == currentRevision.Name {
|
||||
status.CurrentReplicas++
|
||||
}
|
||||
if getPodRevision(pod) == updateRevision.Name {
|
||||
status.UpdatedReplicas++
|
||||
}
|
||||
}
|
||||
|
||||
if podInOrdinalRange(pod, set) {
|
||||
// if the ordinal of the pod is within the range of the current number of replicas,
|
||||
// insert it at the indirection of its ordinal
|
||||
@ -360,7 +531,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
|
||||
}
|
||||
|
||||
// sort the condemned Pods by their ordinals
|
||||
sort.Sort(ascendingOrdinal(condemned))
|
||||
sort.Sort(descendingOrdinal(condemned))
|
||||
|
||||
// find the first unhealthy Pod
|
||||
for i := range replicas {
|
||||
@ -372,7 +543,8 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
|
||||
}
|
||||
}
|
||||
|
||||
for i := range condemned {
|
||||
// or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use)
|
||||
for i := len(condemned) - 1; i >= 0; i-- {
|
||||
if !isHealthy(condemned[i]) {
|
||||
unhealthy++
|
||||
if firstUnhealthyPod == nil {
|
||||
@ -395,168 +567,41 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
|
||||
|
||||
// Examine each replica with respect to its ordinal
|
||||
for i := range replicas {
|
||||
// delete and recreate failed pods
|
||||
if isFailed(replicas[i]) {
|
||||
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
|
||||
"StatefulSet %s/%s is recreating failed Pod %s",
|
||||
set.Namespace,
|
||||
set.Name,
|
||||
replicas[i].Name)
|
||||
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
|
||||
return &status, err
|
||||
}
|
||||
if getPodRevision(replicas[i]) == currentRevision.Name {
|
||||
status.CurrentReplicas--
|
||||
}
|
||||
if getPodRevision(replicas[i]) == updateRevision.Name {
|
||||
status.UpdatedReplicas--
|
||||
}
|
||||
status.Replicas--
|
||||
replicaOrd := i + getStartOrdinal(set)
|
||||
replicas[i] = newVersionedStatefulSetPod(
|
||||
currentSet,
|
||||
updateSet,
|
||||
currentRevision.Name,
|
||||
updateRevision.Name,
|
||||
replicaOrd)
|
||||
}
|
||||
// If we find a Pod that has not been created we create the Pod
|
||||
if !isCreated(replicas[i]) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
|
||||
if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
|
||||
return &status, err
|
||||
} else if isStale {
|
||||
// If a pod has a stale PVC, no more work can be done this round.
|
||||
return &status, err
|
||||
}
|
||||
}
|
||||
if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
|
||||
return &status, err
|
||||
}
|
||||
status.Replicas++
|
||||
if getPodRevision(replicas[i]) == currentRevision.Name {
|
||||
status.CurrentReplicas++
|
||||
}
|
||||
if getPodRevision(replicas[i]) == updateRevision.Name {
|
||||
status.UpdatedReplicas++
|
||||
}
|
||||
// if the set does not allow bursting, return immediately
|
||||
if monotonic {
|
||||
return &status, nil
|
||||
}
|
||||
// pod created, no more work possible for this round
|
||||
continue
|
||||
}
|
||||
|
||||
// If the Pod is in pending state then trigger PVC creation to create missing PVCs
|
||||
if isPending(replicas[i]) {
|
||||
logger.V(4).Info("StatefulSet is triggering PVC Creation for pending Pod",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
|
||||
if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
|
||||
return &status, err
|
||||
}
|
||||
}
|
||||
// If we find a Pod that is currently terminating, we must wait until graceful deletion
|
||||
// completes before we continue to make progress.
|
||||
if isTerminating(replicas[i]) && monotonic {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
|
||||
return &status, nil
|
||||
}
|
||||
// If we have a Pod that has been created but is not running and ready we can not make progress.
|
||||
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
|
||||
// ordinal, are Running and Ready.
|
||||
if !isRunningAndReady(replicas[i]) && monotonic {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
|
||||
return &status, nil
|
||||
}
|
||||
// If we have a Pod that has been created but is not available we can not make progress.
|
||||
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
|
||||
// ordinal, are Available.
|
||||
if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
|
||||
return &status, nil
|
||||
}
|
||||
// Enforce the StatefulSet invariants
|
||||
retentionMatch := true
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
|
||||
var err error
|
||||
retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
|
||||
// An error is expected if the pod is not yet fully updated, and so return is treated as matching.
|
||||
if err != nil {
|
||||
retentionMatch = true
|
||||
}
|
||||
}
|
||||
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
|
||||
continue
|
||||
}
|
||||
// Make a deep copy so we don't mutate the shared cache
|
||||
replica := replicas[i].DeepCopy()
|
||||
if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
|
||||
if shouldExit, err := ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i); shouldExit || err != nil {
|
||||
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
|
||||
return &status, err
|
||||
}
|
||||
}
|
||||
|
||||
// Fix pod claims for condemned pods, if necessary.
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
|
||||
// Ensure ownerRefs are set correctly for the condemned pods.
|
||||
for i := range condemned {
|
||||
fixPodClaim := func(i int) (bool, error) {
|
||||
if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
|
||||
return &status, err
|
||||
return true, err
|
||||
} else if !matchPolicy {
|
||||
if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
|
||||
return &status, err
|
||||
return true, err
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
for i := range condemned {
|
||||
if shouldExit, err := fixPodClaim(i); shouldExit || err != nil {
|
||||
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
|
||||
return &status, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// At this point, all of the current Replicas are Running, Ready and Available, we can consider termination.
|
||||
// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
|
||||
// We will terminate Pods in a monotonically decreasing order.
|
||||
// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
|
||||
// updates.
|
||||
for target := len(condemned) - 1; target >= 0; target-- {
|
||||
// wait for terminating pods to expire
|
||||
if isTerminating(condemned[target]) {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))
|
||||
// block if we are in monotonic mode
|
||||
if monotonic {
|
||||
return &status, nil
|
||||
}
|
||||
continue
|
||||
}
|
||||
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
|
||||
if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
|
||||
return &status, nil
|
||||
}
|
||||
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
|
||||
if !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod {
|
||||
logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
|
||||
return &status, nil
|
||||
}
|
||||
logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
|
||||
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))
|
||||
|
||||
if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
|
||||
for i := range condemned {
|
||||
if shouldExit, err := ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i); shouldExit || err != nil {
|
||||
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
|
||||
return &status, err
|
||||
}
|
||||
if getPodRevision(condemned[target]) == currentRevision.Name {
|
||||
status.CurrentReplicas--
|
||||
}
|
||||
if getPodRevision(condemned[target]) == updateRevision.Name {
|
||||
status.UpdatedReplicas--
|
||||
}
|
||||
if monotonic {
|
||||
return &status, nil
|
||||
}
|
||||
}
|
||||
|
||||
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
|
||||
|
||||
// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
|
||||
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
|
||||
return &status, nil
|
||||
|
@ -602,6 +602,23 @@ func (ao ascendingOrdinal) Less(i, j int) bool {
|
||||
return getOrdinal(ao[i]) < getOrdinal(ao[j])
|
||||
}
|
||||
|
||||
// descendingOrdinal is a sort.Interface that Sorts a list of Pods based on the ordinals extracted
|
||||
// from the Pod. Pod's that have not been constructed by StatefulSet's have an ordinal of -1, and are therefore pushed
|
||||
// to the end of the list.
|
||||
type descendingOrdinal []*v1.Pod
|
||||
|
||||
func (do descendingOrdinal) Len() int {
|
||||
return len(do)
|
||||
}
|
||||
|
||||
func (do descendingOrdinal) Swap(i, j int) {
|
||||
do[i], do[j] = do[j], do[i]
|
||||
}
|
||||
|
||||
func (do descendingOrdinal) Less(i, j int) bool {
|
||||
return getOrdinal(do[i]) > getOrdinal(do[j])
|
||||
}
|
||||
|
||||
// getStatefulSetMaxUnavailable calculates the real maxUnavailable number according to the replica count
|
||||
// and maxUnavailable from rollingUpdateStrategy. The number defaults to 1 if the maxUnavailable field is
|
||||
// not set, and it is rounded down, to a minimum of 1, if the maxUnavailable value is a percentage.
|
||||
|
Loading…
Reference in New Issue
Block a user