Adding implementation of KEP-3335, StatefulSetSlice

This commit is contained in:
Peter Schuurman
2022-09-26 18:34:03 -07:00
parent 2b7b5245ea
commit 7b3d77a41a
37 changed files with 1804 additions and 507 deletions

View File

@@ -29,11 +29,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/strategicpatch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"
)
var patchCodec = scheme.Codecs.LegacyCodec(apps.SchemeGroupVersion)
@@ -85,6 +87,29 @@ func getOrdinal(pod *v1.Pod) int {
return ordinal
}
// getStartOrdinal gets the first possible ordinal (inclusive).
// Returns spec.ordinals.start if spec.ordinals is set, otherwise returns 0.
func getStartOrdinal(set *apps.StatefulSet) int {
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetSlice) {
if set.Spec.Ordinals != nil {
return int(set.Spec.Ordinals.Start)
}
}
return 0
}
// getEndOrdinal gets the last possible ordinal (inclusive).
func getEndOrdinal(set *apps.StatefulSet) int {
return getStartOrdinal(set) + int(*set.Spec.Replicas) - 1
}
// podInOrdinalRange returns true if the pod ordinal is within the allowed
// range of ordinals that this StatefulSet is set to control.
func podInOrdinalRange(pod *v1.Pod, set *apps.StatefulSet) bool {
ordinal := getOrdinal(pod)
return ordinal >= getStartOrdinal(set) && ordinal <= getEndOrdinal(set)
}
// getPodName gets the name of set's child Pod with an ordinal index of ordinal
func getPodName(set *apps.StatefulSet, ordinal int) string {
return fmt.Sprintf("%s-%d", set.Name, ordinal)
@@ -170,12 +195,12 @@ func claimOwnerMatchesSetAndPod(claim *v1.PersistentVolumeClaim, set *apps.State
if hasOwnerRef(claim, set) {
return false
}
podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas)
podScaledDown := !podInOrdinalRange(pod, set)
if podScaledDown != hasOwnerRef(claim, pod) {
return false
}
case policy.WhenScaled == delete && policy.WhenDeleted == delete:
podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas)
podScaledDown := !podInOrdinalRange(pod, set)
// If a pod is scaled down, there should be no set ref and a pod ref;
// if the pod is not scaled down it's the other way around.
if podScaledDown == hasOwnerRef(claim, set) {
@@ -226,7 +251,7 @@ func updateClaimOwnerRefForSetAndPod(claim *v1.PersistentVolumeClaim, set *apps.
needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
case policy.WhenScaled == delete && policy.WhenDeleted == retain:
needsUpdate = removeOwnerRef(claim, set) || needsUpdate
podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas)
podScaledDown := !podInOrdinalRange(pod, set)
if podScaledDown {
needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate
}
@@ -234,7 +259,7 @@ func updateClaimOwnerRefForSetAndPod(claim *v1.PersistentVolumeClaim, set *apps.
needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
}
case policy.WhenScaled == delete && policy.WhenDeleted == delete:
podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas)
podScaledDown := !podInOrdinalRange(pod, set)
if podScaledDown {
needsUpdate = removeOwnerRef(claim, set) || needsUpdate
needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate
@@ -440,8 +465,8 @@ func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
// returned error is nil, the returned Pod is valid.
func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, currentRevision, updateRevision string, ordinal int) *v1.Pod {
if currentSet.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
(currentSet.Spec.UpdateStrategy.RollingUpdate == nil && ordinal < int(currentSet.Status.CurrentReplicas)) ||
(currentSet.Spec.UpdateStrategy.RollingUpdate != nil && ordinal < int(*currentSet.Spec.UpdateStrategy.RollingUpdate.Partition)) {
(currentSet.Spec.UpdateStrategy.RollingUpdate == nil && ordinal < (getStartOrdinal(currentSet)+int(currentSet.Status.CurrentReplicas))) ||
(currentSet.Spec.UpdateStrategy.RollingUpdate != nil && ordinal < (getStartOrdinal(currentSet)+int(*currentSet.Spec.UpdateStrategy.RollingUpdate.Partition))) {
pod := newStatefulSetPod(currentSet, ordinal)
setPodRevision(pod, currentRevision)
return pod