diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go
index fa9f7ae6992..5f45a7b820e 100644
--- a/pkg/controller/statefulset/stateful_set_control.go
+++ b/pkg/controller/statefulset/stateful_set_control.go
@@ -269,6 +269,197 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
     return currentRevision, updateRevision, collisionCount, nil
 }
 
+type replicaStatus struct {
+    replicas          int32
+    readyReplicas     int32
+    availableReplicas int32
+    currentReplicas   int32
+    updatedReplicas   int32
+}
+
+func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
+    status := replicaStatus{}
+    for _, pod := range pods {
+        if isCreated(pod) {
+            status.replicas++
+        }
+
+        // count the number of running and ready replicas
+        if isRunningAndReady(pod) {
+            status.readyReplicas++
+            // count the number of running and available replicas
+            if isRunningAndAvailable(pod, minReadySeconds) {
+                status.availableReplicas++
+            }
+
+        }
+
+        // count the number of current and update replicas
+        if isCreated(pod) && !isTerminating(pod) {
+            if getPodRevision(pod) == currentRevision.Name {
+                status.currentReplicas++
+            }
+            if getPodRevision(pod) == updateRevision.Name {
+                status.updatedReplicas++
+            }
+        }
+    }
+    return status
+}
+
+func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
+    status.Replicas = 0
+    status.ReadyReplicas = 0
+    status.AvailableReplicas = 0
+    status.CurrentReplicas = 0
+    status.UpdatedReplicas = 0
+    for _, list := range podLists {
+        replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
+        status.Replicas += replicaStatus.replicas
+        status.ReadyReplicas += replicaStatus.readyReplicas
+        status.AvailableReplicas += replicaStatus.availableReplicas
+        status.CurrentReplicas += replicaStatus.currentReplicas
+        status.UpdatedReplicas += replicaStatus.updatedReplicas
+    }
+}
+
+func (ssc *defaultStatefulSetControl) processReplica(
+    ctx context.Context,
+    set *apps.StatefulSet,
+    currentRevision *apps.ControllerRevision,
+    updateRevision *apps.ControllerRevision,
+    currentSet *apps.StatefulSet,
+    updateSet *apps.StatefulSet,
+    monotonic bool,
+    replicas []*v1.Pod,
+    i int) (bool, error) {
+    logger := klog.FromContext(ctx)
+    // delete and recreate failed pods
+    if isFailed(replicas[i]) {
+        ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
+            "StatefulSet %s/%s is recreating failed Pod %s",
+            set.Namespace,
+            set.Name,
+            replicas[i].Name)
+        if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
+            return true, err
+        }
+        replicaOrd := i + getStartOrdinal(set)
+        replicas[i] = newVersionedStatefulSetPod(
+            currentSet,
+            updateSet,
+            currentRevision.Name,
+            updateRevision.Name,
+            replicaOrd)
+    }
+    // If we find a Pod that has not been created we create the Pod
+    if !isCreated(replicas[i]) {
+        if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
+            if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
+                return true, err
+            } else if isStale {
+                // If a pod has a stale PVC, no more work can be done this round.
+                return true, err
+            }
+        }
+        if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
+            return true, err
+        }
+        if monotonic {
+            // if the set does not allow bursting, return immediately
+            return true, nil
+        }
+    }
+
+    // If the Pod is in pending state then trigger PVC creation to create missing PVCs
+    if isPending(replicas[i]) {
+        logger.V(4).Info(
+            "StatefulSet is triggering PVC creation for pending Pod",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
+        if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
+            return true, err
+        }
+    }
+
+    // If we find a Pod that is currently terminating, we must wait until graceful deletion
+    // completes before we continue to make progress.
+    if isTerminating(replicas[i]) && monotonic {
+        logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
+        return true, nil
+    }
+
+    // If we have a Pod that has been created but is not running and ready, we cannot make progress.
+    // We must ensure that, when we create each Pod, all of its predecessors, with respect to its
+    // ordinal, are Running and Ready.
+    if !isRunningAndReady(replicas[i]) && monotonic {
+        logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
+        return true, nil
+    }
+
+    // If we have a Pod that has been created but is not available, we cannot make progress.
+    // We must ensure that, when we create each Pod, all of its predecessors, with respect to its
+    // ordinal, are Available.
+    if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
+        logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
+        return true, nil
+    }
+
+    // Enforce the StatefulSet invariants
+    retentionMatch := true
+    if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
+        var err error
+        retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
+        // An error is expected if the pod is not yet fully updated, and so return is treated as matching.
+        if err != nil {
+            retentionMatch = true
+        }
+    }
+
+    if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
+        return false, nil
+    }
+
+    // Make a deep copy so we don't mutate the shared cache
+    replica := replicas[i].DeepCopy()
+    if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
+        return true, err
+    }
+
+    return false, nil
+}
+
+func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
+    logger := klog.FromContext(ctx)
+    if isTerminating(condemned[i]) {
+        // if we are in monotonic mode, block and wait for terminating pods to expire
+        if monotonic {
+            logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
+                "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
+            return true, nil
+        }
+        return false, nil
+    }
+    // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block
+    if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod {
+        logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
+        return true, nil
+    }
+    // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
+    if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod {
+        logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
+        return true, nil
+    }
+
+    logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
+        "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
+    return true, ssc.podControl.DeleteStatefulPod(set, condemned[i])
+}
+
 // updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
 // the set in order to conform the system to the target state for the set. The target state always contains
 // set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
@@ -304,6 +495,8 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
     status.CollisionCount = new(int32)
     *status.CollisionCount = collisionCount
 
+    updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods)
+
     replicaCount := int(*set.Spec.Replicas)
     // slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
     replicas := make([]*v1.Pod, replicaCount)
@@ -314,28 +507,6 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 
     // First we partition pods into two lists valid replicas and condemned Pods
     for _, pod := range pods {
-        status.Replicas++
-
-        // count the number of running and ready replicas
-        if isRunningAndReady(pod) {
-            status.ReadyReplicas++
-            // count the number of running and available replicas
-            if isRunningAndAvailable(pod, set.Spec.MinReadySeconds) {
-                status.AvailableReplicas++
-            }
-
-        }
-
-        // count the number of current and update replicas
-        if isCreated(pod) && !isTerminating(pod) {
-            if getPodRevision(pod) == currentRevision.Name {
-                status.CurrentReplicas++
-            }
-            if getPodRevision(pod) == updateRevision.Name {
-                status.UpdatedReplicas++
-            }
-        }
-
         if podInOrdinalRange(pod, set) {
             // if the ordinal of the pod is within the range of the current number of replicas,
             // insert it at the indirection of its ordinal
@@ -360,7 +531,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
     }
 
     // sort the condemned Pods by their ordinals
-    sort.Sort(ascendingOrdinal(condemned))
+    sort.Sort(descendingOrdinal(condemned))
 
     // find the first unhealthy Pod
     for i := range replicas {
@@ -372,7 +543,8 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
         }
     }
 
-    for i := range condemned {
+    // or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use)
+    for i := len(condemned) - 1; i >= 0; i-- {
         if !isHealthy(condemned[i]) {
             unhealthy++
             if firstUnhealthyPod == nil {
@@ -395,168 +567,41 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 
     // Examine each replica with respect to its ordinal
     for i := range replicas {
-        // delete and recreate failed pods
-        if isFailed(replicas[i]) {
-            ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
-                "StatefulSet %s/%s is recreating failed Pod %s",
-                set.Namespace,
-                set.Name,
-                replicas[i].Name)
-            if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
-                return &status, err
-            }
-            if getPodRevision(replicas[i]) == currentRevision.Name {
-                status.CurrentReplicas--
-            }
-            if getPodRevision(replicas[i]) == updateRevision.Name {
-                status.UpdatedReplicas--
-            }
-            status.Replicas--
-            replicaOrd := i + getStartOrdinal(set)
-            replicas[i] = newVersionedStatefulSetPod(
-                currentSet,
-                updateSet,
-                currentRevision.Name,
-                updateRevision.Name,
-                replicaOrd)
-        }
-        // If we find a Pod that has not been created we create the Pod
-        if !isCreated(replicas[i]) {
-            if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
-                if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
-                    return &status, err
-                } else if isStale {
-                    // If a pod has a stale PVC, no more work can be done this round.
-                    return &status, err
-                }
-            }
-            if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
-                return &status, err
-            }
-            status.Replicas++
-            if getPodRevision(replicas[i]) == currentRevision.Name {
-                status.CurrentReplicas++
-            }
-            if getPodRevision(replicas[i]) == updateRevision.Name {
-                status.UpdatedReplicas++
-            }
-            // if the set does not allow bursting, return immediately
-            if monotonic {
-                return &status, nil
-            }
-            // pod created, no more work possible for this round
-            continue
-        }
-
-        // If the Pod is in pending state then trigger PVC creation to create missing PVCs
-        if isPending(replicas[i]) {
-            logger.V(4).Info("StatefulSet is triggering PVC Creation for pending Pod",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
-            if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
-                return &status, err
-            }
-        }
-        // If we find a Pod that is currently terminating, we must wait until graceful deletion
-        // completes before we continue to make progress.
-        if isTerminating(replicas[i]) && monotonic {
-            logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
-            return &status, nil
-        }
-        // If we have a Pod that has been created but is not running and ready we can not make progress.
-        // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
-        // ordinal, are Running and Ready.
-        if !isRunningAndReady(replicas[i]) && monotonic {
-            logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
-            return &status, nil
-        }
-        // If we have a Pod that has been created but is not available we can not make progress.
-        // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
-        // ordinal, are Available.
-        if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
-            logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
-            return &status, nil
-        }
-        // Enforce the StatefulSet invariants
-        retentionMatch := true
-        if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
-            var err error
-            retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
-            // An error is expected if the pod is not yet fully updated, and so return is treated as matching.
-            if err != nil {
-                retentionMatch = true
-            }
-        }
-        if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
-            continue
-        }
-        // Make a deep copy so we don't mutate the shared cache
-        replica := replicas[i].DeepCopy()
-        if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
+        if shouldExit, err := ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i); shouldExit || err != nil {
+            updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
             return &status, err
         }
     }
 
+    // Fix pod claims for condemned pods, if necessary.
     if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
-        // Ensure ownerRefs are set correctly for the condemned pods.
-        for i := range condemned {
+        fixPodClaim := func(i int) (bool, error) {
             if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
-                return &status, err
+                return true, err
             } else if !matchPolicy {
                 if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
-                    return &status, err
+                    return true, err
                 }
             }
+            return false, nil
+        }
+        for i := range condemned {
+            if shouldExit, err := fixPodClaim(i); shouldExit || err != nil {
+                updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+                return &status, err
+            }
         }
     }
 
-    // At this point, all of the current Replicas are Running, Ready and Available, we can consider termination.
-    // We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
-    // We will terminate Pods in a monotonically decreasing order.
-    // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
-    // updates.
-    for target := len(condemned) - 1; target >= 0; target-- {
-        // wait for terminating pods to expire
-        if isTerminating(condemned[target]) {
-            logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))
-            // block if we are in monotonic mode
-            if monotonic {
-                return &status, nil
-            }
-            continue
-        }
-        // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
-        if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
-            logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
-            return &status, nil
-        }
-        // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
-        if !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod {
-            logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
-            return &status, nil
-        }
-        logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
-            "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))
-
-        if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
+    for i := range condemned {
+        if shouldExit, err := ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i); shouldExit || err != nil {
+            updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
             return &status, err
         }
-        if getPodRevision(condemned[target]) == currentRevision.Name {
-            status.CurrentReplicas--
-        }
-        if getPodRevision(condemned[target]) == updateRevision.Name {
-            status.UpdatedReplicas--
-        }
-        if monotonic {
-            return &status, nil
-        }
     }
 
+    updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+
     // for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
     if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
         return &status, nil
diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go
index a741047f75f..5ab1f9af114 100644
--- a/pkg/controller/statefulset/stateful_set_utils.go
+++ b/pkg/controller/statefulset/stateful_set_utils.go
@@ -602,6 +602,23 @@ func (ao ascendingOrdinal) Less(i, j int) bool {
     return getOrdinal(ao[i]) < getOrdinal(ao[j])
 }
 
+// descendingOrdinal is a sort.Interface that sorts a list of Pods based on the ordinals extracted
+// from the Pod. Pods that have not been constructed by a StatefulSet have an ordinal of -1, and are
+// therefore pushed to the end of the list.
+type descendingOrdinal []*v1.Pod
+
+func (do descendingOrdinal) Len() int {
+    return len(do)
+}
+
+func (do descendingOrdinal) Swap(i, j int) {
+    do[i], do[j] = do[j], do[i]
+}
+
+func (do descendingOrdinal) Less(i, j int) bool {
+    return getOrdinal(do[i]) > getOrdinal(do[j])
+}
+
 // getStatefulSetMaxUnavailable calculates the real maxUnavailable number according to the replica count
 // and maxUnavailable from rollingUpdateStrategy. The number defaults to 1 if the maxUnavailable field is
 // not set, and it will be round down to at least 1 if the maxUnavailable value is a percentage.
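For reference, the new descendingOrdinal type follows the standard sort.Interface pattern and is the mirror image of the existing ascendingOrdinal. The snippet below is a minimal, self-contained sketch of the same idea; the pod struct and byDescendingOrdinal type are simplified stand-ins for illustration only, not the controller's actual identifiers.

// Minimal sketch (illustration only): higher ordinals sort first, so Pods whose
// ordinal could not be parsed (-1) sink to the end of the condemned list.
package main

import (
    "fmt"
    "sort"
)

type pod struct {
    name    string
    ordinal int // -1 means the ordinal could not be parsed
}

type byDescendingOrdinal []pod

func (do byDescendingOrdinal) Len() int           { return len(do) }
func (do byDescendingOrdinal) Swap(i, j int)      { do[i], do[j] = do[j], do[i] }
func (do byDescendingOrdinal) Less(i, j int) bool { return do[i].ordinal > do[j].ordinal }

func main() {
    condemned := []pod{{"web-3", 3}, {"stray", -1}, {"web-5", 5}, {"web-4", 4}}
    sort.Sort(byDescendingOrdinal(condemned))
    fmt.Println(condemned) // [{web-5 5} {web-4 4} {web-3 3} {stray -1}]
}

With the slice held in this order, processCondemned can walk it front to back for scale-down (highest ordinal deleted first), while the first-unhealthy scan in updateStatefulSet walks it back to front, which is what the "condemned are sorted in descending order for ease of use" comment in the diff refers to.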