diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index 4bfc5c847b1..0d045763e33 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -27,68 +27,115 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" errorutils "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" - appslisters "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" ) -// StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods, +// StatefulPodControlObjectManager abstracts the manipulation of Pods and PVCs. The real controller implements this +// with a clientset for writes and listers for reads; for tests we provide stubs. +type StatefulPodControlObjectManager interface { + CreatePod(ctx context.Context, pod *v1.Pod) error + GetPod(namespace, podName string) (*v1.Pod, error) + UpdatePod(pod *v1.Pod) error + DeletePod(pod *v1.Pod) error + CreateClaim(claim *v1.PersistentVolumeClaim) error + GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) + UpdateClaim(claim *v1.PersistentVolumeClaim) error +} + +// StatefulPodControl defines the interface that StatefulSetController uses to create, update, and delete Pods, // and to update the Status of a StatefulSet. It follows the design paradigms used for PodControl, but its // implementation provides for PVC creation, ordered Pod creation, ordered Pod termination, and Pod identity enforcement. -// Like controller.PodControlInterface, it is implemented as an interface to provide for testing fakes. 
-type StatefulPodControlInterface interface { - // CreateStatefulPod create a Pod in a StatefulSet. Any PVCs necessary for the Pod are created prior to creating - // the Pod. If the returned error is nil the Pod and its PVCs have been created. - CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error - // UpdateStatefulPod Updates a Pod in a StatefulSet. If the Pod already has the correct identity and stable - // storage this method is a no-op. If the Pod must be mutated to conform to the Set, it is mutated and updated. - // pod is an in-out parameter, and any updates made to the pod are reflected as mutations to this parameter. If - // the create is successful, the returned error is nil. - UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error - // DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful, - // the returned error is nil. - DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error -} - -func NewRealStatefulPodControl( - client clientset.Interface, - setLister appslisters.StatefulSetLister, - podLister corelisters.PodLister, - pvcLister corelisters.PersistentVolumeClaimLister, - recorder record.EventRecorder, -) StatefulPodControlInterface { - return &realStatefulPodControl{client, setLister, podLister, pvcLister, recorder} -} - -// realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the -// API server. The struct is package private as the internal details are irrelevant to importing packages. -type realStatefulPodControl struct { - client clientset.Interface - setLister appslisters.StatefulSetLister - podLister corelisters.PodLister - pvcLister corelisters.PersistentVolumeClaimLister +// Manipulation of objects is provided through objectMgr, which allows the k8s API to be mocked out for testing. 
+type StatefulPodControl struct { + objectMgr StatefulPodControlObjectManager recorder record.EventRecorder } -func (spc *realStatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { +// NewStatefulPodControl constructs a StatefulPodControl using a realStatefulPodControlObjectManager with the given +// clientset, listers and EventRecorder. +func NewStatefulPodControl( + client clientset.Interface, + podLister corelisters.PodLister, + claimLister corelisters.PersistentVolumeClaimLister, + recorder record.EventRecorder, +) *StatefulPodControl { + return &StatefulPodControl{&realStatefulPodControlObjectManager{client, podLister, claimLister}, recorder} +} + +// NewStatefulPodControlFromManager creates a StatefulPodControl using the given StatefulPodControlObjectManager and recorder. +func NewStatefulPodControlFromManager(om StatefulPodControlObjectManager, recorder record.EventRecorder) *StatefulPodControl { + return &StatefulPodControl{om, recorder} +} + +// realStatefulPodControlObjectManager uses a clientset.Interface and listers. 
+type realStatefulPodControlObjectManager struct { + client clientset.Interface + podLister corelisters.PodLister + claimLister corelisters.PersistentVolumeClaimLister +} + +func (om *realStatefulPodControlObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error { + _, err := om.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + return err +} + +func (om *realStatefulPodControlObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) { + return om.podLister.Pods(namespace).Get(podName) +} + +func (om *realStatefulPodControlObjectManager) UpdatePod(pod *v1.Pod) error { + _, err := om.client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) + return err +} + +func (om *realStatefulPodControlObjectManager) DeletePod(pod *v1.Pod) error { + return om.client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) +} + +func (om *realStatefulPodControlObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error { + _, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), claim, metav1.CreateOptions{}) + return err +} + +func (om *realStatefulPodControlObjectManager) GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) { + return om.claimLister.PersistentVolumeClaims(namespace).Get(claimName) +} + +func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentVolumeClaim) error { + _, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{}) + return err +} + +func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { // Create the Pod's PVCs prior to creating the Pod if err := spc.createPersistentVolumeClaims(set, pod); err != nil { spc.recordPodEvent("create", set, pod, err) return err } // If we created the PVCs attempt to create the Pod - _, err := 
spc.client.CoreV1().Pods(set.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + err := spc.objectMgr.CreatePod(ctx, pod) // sink already exists errors if apierrors.IsAlreadyExists(err) { return err } + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + // Set PVC policy as much as is possible at this point. + if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil { + spc.recordPodEvent("update", set, pod, err) + return err + } + } spc.recordPodEvent("create", set, pod, err) return err } -func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { +func (spc *StatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { attemptedUpdate := false err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { // assume the Pod is consistent @@ -108,6 +155,21 @@ func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod return err } } + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + // if the Pod's PVCs are not consistent with the StatefulSet's PVC deletion policy, update the PVC + // and dirty the pod. 
+ if match, err := spc.ClaimsMatchRetentionPolicy(set, pod); err != nil { + spc.recordPodEvent("update", set, pod, err) + return err + } else if !match { + if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil { + spc.recordPodEvent("update", set, pod, err) + return err + } + consistent = false + } + } + // if the Pod is not dirty, do nothing if consistent { return nil @@ -115,16 +177,17 @@ func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod attemptedUpdate = true // commit the update, retrying on conflicts - _, updateErr := spc.client.CoreV1().Pods(set.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) + + updateErr := spc.objectMgr.UpdatePod(pod) if updateErr == nil { return nil } - if updated, err := spc.podLister.Pods(set.Namespace).Get(pod.Name); err == nil { + if updated, err := spc.objectMgr.GetPod(set.Namespace, pod.Name); err == nil { // make a copy so we don't mutate the shared cache pod = updated.DeepCopy() } else { - utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", set.Namespace, pod.Name, err)) + utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s: %w", set.Namespace, pod.Name, err)) } return updateErr @@ -135,15 +198,92 @@ func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod return err } -func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { - err := spc.client.CoreV1().Pods(set.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) +func (spc *StatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { + err := spc.objectMgr.DeletePod(pod) spc.recordPodEvent("delete", set, pod, err) return err } +// ClaimsMatchRetentionPolicy returns false if the PVCs for pod are not consistent with set's PVC deletion policy. +// An error is returned if something is not consistent. 
This is expected if the pod is being otherwise updated, +but a problem otherwise (see usage of this method in UpdateStatefulPod). +func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(set *apps.StatefulSet, pod *v1.Pod) (bool, error) { + ordinal := getOrdinal(pod) + templates := set.Spec.VolumeClaimTemplates + for i := range templates { + claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal) + claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName) + switch { + case apierrors.IsNotFound(err): + klog.V(4).Infof("Expected claim %s missing, continuing to pick up in next iteration", claimName) + case err != nil: + return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy: %w", claimName, pod.Name, err) + default: + if !claimOwnerMatchesSetAndPod(claim, set, pod) { + return false, nil + } + } + } + return true, nil +} + +// UpdatePodClaimForRetentionPolicy updates the PVCs used by pod to match the PVC deletion policy of set. +func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(set *apps.StatefulSet, pod *v1.Pod) error { + ordinal := getOrdinal(pod) + templates := set.Spec.VolumeClaimTemplates + for i := range templates { + claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal) + claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName) + switch { + case apierrors.IsNotFound(err): + klog.V(4).Infof("Expected claim %s missing, continuing to pick up in next iteration", claimName) + case err != nil: + return fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy: %w", claimName, pod.Name, err) + default: + if !claimOwnerMatchesSetAndPod(claim, set, pod) { + needsUpdate := updateClaimOwnerRefForSetAndPod(claim, set, pod) + if needsUpdate { + err := spc.objectMgr.UpdateClaim(claim) + if err != nil { + return fmt.Errorf("Could not update claim %s for delete policy ownerRefs: %w", claimName, err) + } + } + } + } + } + return nil +} + +// PodClaimIsStale returns 
true for a stale PVC that should block pod creation. If the scaling +// policy is deletion, and a PVC has an ownerRef that does not match the pod, the PVC is stale. This +// includes pods whose UID has not been created. +func (spc *StatefulPodControl) PodClaimIsStale(set *apps.StatefulSet, pod *v1.Pod) (bool, error) { + policy := getPersistentVolumeClaimRetentionPolicy(set) + if policy.WhenScaled == apps.RetainPersistentVolumeClaimRetentionPolicyType { + // PVCs are meant to be reused and so can't be stale. + return false, nil + } + for _, claim := range getPersistentVolumeClaims(set, pod) { + pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name) + switch { + case apierrors.IsNotFound(err): + // If the claim doesn't exist yet, it can't be stale. + continue + case err != nil: + return false, err + case err == nil: + // A claim is stale if it doesn't match the pod's UID, including if the pod has no UID. + if hasStaleOwnerRef(pvc, pod) { + return true, nil + } + } + } + return false, nil +} + // recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will // have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning. -func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) { +func (spc *StatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) { if err == nil { reason := fmt.Sprintf("Successful%s", strings.Title(verb)) message := fmt.Sprintf("%s Pod %s in StatefulSet %s successful", @@ -160,7 +300,7 @@ func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.Statefu // recordClaimEvent records an event for verb applied to the PersistentVolumeClaim of a Pod in a StatefulSet. If err is // nil the generated event will have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a // reason of v1.EventTypeWarning. 
-func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) { +func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) { if err == nil { reason := fmt.Sprintf("Successful%s", strings.Title(verb)) message := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success", @@ -178,13 +318,13 @@ func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.State // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with // set's Spec. -func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { +func (spc *StatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { var errs []error for _, claim := range getPersistentVolumeClaims(set, pod) { - pvc, err := spc.pvcLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name) + pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name) switch { case apierrors.IsNotFound(err): - _, err := spc.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), &claim, metav1.CreateOptions{}) + err := spc.objectMgr.CreateClaim(&claim) if err != nil { errs = append(errs, fmt.Errorf("failed to create PVC %s: %s", claim.Name, err)) } @@ -203,5 +343,3 @@ func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.Statef } return errorutils.NewAggregate(errs) } - -var _ StatefulPodControlInterface = &realStatefulPodControl{} diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 7f5f1174ddb..6c1d1ee7fac 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -91,9 
+91,8 @@ func NewStatefulSetController( ssc := &StatefulSetController{ kubeClient: kubeClient, control: NewDefaultStatefulSetControl( - NewRealStatefulPodControl( + NewStatefulPodControl( kubeClient, - setInformer.Lister(), podInformer.Lister(), pvcInformer.Lister(), recorder), @@ -352,10 +351,14 @@ func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.Sta if len(sets) > 1 { // ControllerRef will ensure we don't do anything crazy, but more than one // item in this list nevertheless constitutes user error. + setNames := []string{} + for _, s := range sets { + setNames = append(setNames, s.Name) + } utilruntime.HandleError( fmt.Errorf( - "user error: more than one StatefulSet is selecting pods with labels: %+v", - pod.Labels)) + "user error: more than one StatefulSet is selecting pods with labels: %+v. Sets: %v", + pod.Labels, setNames)) } return sets } diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 7f653d13c20..a353e86bae0 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -54,7 +54,7 @@ type StatefulSetControlInterface interface { // to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any // scenario other than testing. 
func NewDefaultStatefulSetControl( - podControl StatefulPodControlInterface, + podControl *StatefulPodControl, statusUpdater StatefulSetStatusUpdaterInterface, controllerHistory history.Interface, recorder record.EventRecorder) StatefulSetControlInterface { @@ -62,7 +62,7 @@ func NewDefaultStatefulSetControl( } type defaultStatefulSetControl struct { - podControl StatefulPodControlInterface + podControl *StatefulPodControl statusUpdater StatefulSetStatusUpdaterInterface controllerHistory history.Interface recorder record.EventRecorder @@ -333,7 +333,6 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // if the ordinal of the pod is within the range of the current number of replicas, // insert it at the indirection of its ordinal replicas[ord] = pods[i] - } else if ord >= replicaCount { // if the ordinal is greater than the number of replicas add it to the condemned list condemned = append(condemned, pods[i]) @@ -418,6 +417,14 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } // If we find a Pod that has not been created we create the Pod if !isCreated(replicas[i]) { + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil { + return &status, err + } else if isStale { + // If a pod has a stale PVC, no more work can be done this round. 
+ return &status, err + } + } if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil { return &status, err } @@ -428,7 +435,6 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( if getPodRevision(replicas[i]) == updateRevision.Name { status.UpdatedReplicas++ } - // if the set does not allow bursting, return immediately if monotonic { return &status, nil @@ -471,7 +477,16 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( return &status, nil } // Enforce the StatefulSet invariants - if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) { + retentionMatch := true + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + var err error + retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, replicas[i]) + // An error is expected if the pod is not yet fully updated, and so return is treated as matching. + if err != nil { + retentionMatch = true + } + } + if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch { continue } // Make a deep copy so we don't mutate the shared cache @@ -481,6 +496,19 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } } + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + // Ensure ownerRefs are set correctly for the condemned pods. + for i := range condemned { + if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, condemned[i]); err != nil { + return &status, err + } else if !matchPolicy { + if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(updateSet, condemned[i]); err != nil { + return &status, err + } + } + } + } + // At this point, all of the current Replicas are Running, Ready and Available, we can consider termination. // We will wait for all predecessors to be Running and Ready prior to attempting a deletion. // We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas). 
diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 88ae93d2834..3233a9e5255 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/history" @@ -132,6 +133,179 @@ func storageMatches(set *apps.StatefulSet, pod *v1.Pod) bool { return true } +// getPersistentVolumeClaimPolicy returns the PVC policy for a StatefulSet, returning a retain policy if the set policy is nil. +func getPersistentVolumeClaimRetentionPolicy(set *apps.StatefulSet) apps.StatefulSetPersistentVolumeClaimRetentionPolicy { + policy := apps.StatefulSetPersistentVolumeClaimRetentionPolicy{ + WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType, + WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType, + } + if set.Spec.PersistentVolumeClaimRetentionPolicy != nil { + policy = *set.Spec.PersistentVolumeClaimRetentionPolicy + } + return policy +} + +// claimOwnerMatchesSetAndPod returns false if the ownerRefs of the claim are not set consistently with the +// PVC deletion policy for the StatefulSet. 
+func claimOwnerMatchesSetAndPod(claim *v1.PersistentVolumeClaim, set *apps.StatefulSet, pod *v1.Pod) bool { + policy := getPersistentVolumeClaimRetentionPolicy(set) + const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType + const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType + switch { + default: + klog.Errorf("Unknown policy %v; treating as Retain", set.Spec.PersistentVolumeClaimRetentionPolicy) + fallthrough + case policy.WhenScaled == retain && policy.WhenDeleted == retain: + if hasOwnerRef(claim, set) || + hasOwnerRef(claim, pod) { + return false + } + case policy.WhenScaled == retain && policy.WhenDeleted == delete: + if !hasOwnerRef(claim, set) || + hasOwnerRef(claim, pod) { + return false + } + case policy.WhenScaled == delete && policy.WhenDeleted == retain: + if hasOwnerRef(claim, set) { + return false + } + podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas) + if podScaledDown != hasOwnerRef(claim, pod) { + return false + } + case policy.WhenScaled == delete && policy.WhenDeleted == delete: + podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas) + // If a pod is scaled down, there should be no set ref and a pod ref; + // if the pod is not scaled down it's the other way around. + if podScaledDown == hasOwnerRef(claim, set) { + return false + } + if podScaledDown != hasOwnerRef(claim, pod) { + return false + } + } + return true +} + +// updateClaimOwnerRefForSetAndPod updates the ownerRefs for the claim according to the deletion policy of +// the StatefulSet. Returns true if the claim was changed and should be updated and false otherwise. +func updateClaimOwnerRefForSetAndPod(claim *v1.PersistentVolumeClaim, set *apps.StatefulSet, pod *v1.Pod) bool { + needsUpdate := false + // Sometimes the version and kind are not set {pod,set}.TypeMeta. These are necessary for the ownerRef. + // This is the case both in real clusters and the unittests. 
+ // TODO: there must be a better way to do this other than hardcoding the pod version? + updateMeta := func(tm *metav1.TypeMeta, kind string) { + if tm.APIVersion == "" { + if kind == "StatefulSet" { + tm.APIVersion = "apps/v1" + } else { + tm.APIVersion = "v1" + } + } + if tm.Kind == "" { + tm.Kind = kind + } + } + podMeta := pod.TypeMeta + updateMeta(&podMeta, "Pod") + setMeta := set.TypeMeta + updateMeta(&setMeta, "StatefulSet") + policy := getPersistentVolumeClaimRetentionPolicy(set) + const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType + const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType + switch { + default: + klog.Errorf("Unknown policy %v, treating as Retain", set.Spec.PersistentVolumeClaimRetentionPolicy) + fallthrough + case policy.WhenScaled == retain && policy.WhenDeleted == retain: + needsUpdate = removeOwnerRef(claim, set) || needsUpdate + needsUpdate = removeOwnerRef(claim, pod) || needsUpdate + case policy.WhenScaled == retain && policy.WhenDeleted == delete: + needsUpdate = setOwnerRef(claim, set, &setMeta) || needsUpdate + needsUpdate = removeOwnerRef(claim, pod) || needsUpdate + case policy.WhenScaled == delete && policy.WhenDeleted == retain: + needsUpdate = removeOwnerRef(claim, set) || needsUpdate + podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas) + if podScaledDown { + needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate + } + if !podScaledDown { + needsUpdate = removeOwnerRef(claim, pod) || needsUpdate + } + case policy.WhenScaled == delete && policy.WhenDeleted == delete: + podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas) + if podScaledDown { + needsUpdate = removeOwnerRef(claim, set) || needsUpdate + needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate + } + if !podScaledDown { + needsUpdate = setOwnerRef(claim, set, &setMeta) || needsUpdate + needsUpdate = removeOwnerRef(claim, pod) || needsUpdate + } + } + return needsUpdate +} + +// hasOwnerRef returns true if 
target has an ownerRef to owner. +func hasOwnerRef(target, owner metav1.Object) bool { + ownerUID := owner.GetUID() + for _, ownerRef := range target.GetOwnerReferences() { + if ownerRef.UID == ownerUID { + return true + } + } + return false +} + +// hasStaleOwnerRef returns true if target has a ref to owner that appears to be stale. +func hasStaleOwnerRef(target, owner metav1.Object) bool { + for _, ownerRef := range target.GetOwnerReferences() { + if ownerRef.Name == owner.GetName() && ownerRef.UID != owner.GetUID() { + return true + } + } + return false +} + +// setOwnerRef adds owner to the ownerRefs of target, if necessary. Returns true if target needs to be +// updated and false otherwise. +func setOwnerRef(target, owner metav1.Object, ownerType *metav1.TypeMeta) bool { + if hasOwnerRef(target, owner) { + return false + } + ownerRefs := append( + target.GetOwnerReferences(), + metav1.OwnerReference{ + APIVersion: ownerType.APIVersion, + Kind: ownerType.Kind, + Name: owner.GetName(), + UID: owner.GetUID(), + }) + target.SetOwnerReferences(ownerRefs) + return true +} + +// removeOwnerRef removes owner from the ownerRefs of target, if necessary. Returns true if target needs +// to be updated and false otherwise. +func removeOwnerRef(target, owner metav1.Object) bool { + if !hasOwnerRef(target, owner) { + return false + } + ownerUID := owner.GetUID() + oldRefs := target.GetOwnerReferences() + newRefs := make([]metav1.OwnerReference, len(oldRefs)-1) + skip := 0 + for i := range oldRefs { + if oldRefs[i].UID == ownerUID { + skip = -1 + } else { + newRefs[i+skip] = oldRefs[i] + } + } + target.SetOwnerReferences(newRefs) + return true +} + // getPersistentVolumeClaims gets a map of PersistentVolumeClaims to their template names, as defined in set. The // returned PersistentVolumeClaims are each constructed with a the name specific to the Pod. This name is determined // by getPersistentVolumeClaimName. 
@@ -278,7 +452,10 @@ func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, current // Match check if the given StatefulSet's template matches the template stored in the given history. func Match(ss *apps.StatefulSet, history *apps.ControllerRevision) (bool, error) { - patch, err := getPatch(ss) + // Encoding the set for the patch may update its GVK metadata, which causes data races if this + // set is in an informer cache. + clone := ss.DeepCopy() + patch, err := getPatch(clone) if err != nil { return false, err } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 5ff47db220a..2b74e848953 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -339,19 +339,27 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) eventsRule(), }, }) - addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ - ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "statefulset-controller"}, - Rules: []rbacv1.PolicyRule{ - rbacv1helpers.NewRule("list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(), - rbacv1helpers.NewRule("get", "list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(), - rbacv1helpers.NewRule("update").Groups(appsGroup).Resources("statefulsets/status").RuleOrDie(), - rbacv1helpers.NewRule("update").Groups(appsGroup).Resources("statefulsets/finalizers").RuleOrDie(), - rbacv1helpers.NewRule("get", "create", "delete", "update", "patch").Groups(legacyGroup).Resources("pods").RuleOrDie(), - rbacv1helpers.NewRule("get", "create", "delete", "update", "patch", "list", "watch").Groups(appsGroup).Resources("controllerrevisions").RuleOrDie(), - rbacv1helpers.NewRule("get", "create").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(), - eventsRule(), - }, - }) 
+ addControllerRole(&controllerRoles, &controllerRoleBindings, func() rbacv1.ClusterRole { + role := rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "statefulset-controller"}, + Rules: []rbacv1.PolicyRule{ + rbacv1helpers.NewRule("list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(), + rbacv1helpers.NewRule("get", "list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(), + rbacv1helpers.NewRule("update").Groups(appsGroup).Resources("statefulsets/status").RuleOrDie(), + rbacv1helpers.NewRule("update").Groups(appsGroup).Resources("statefulsets/finalizers").RuleOrDie(), + rbacv1helpers.NewRule("get", "create", "delete", "update", "patch").Groups(legacyGroup).Resources("pods").RuleOrDie(), + rbacv1helpers.NewRule("get", "create", "delete", "update", "patch", "list", "watch").Groups(appsGroup).Resources("controllerrevisions").RuleOrDie(), + rbacv1helpers.NewRule("get", "create").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(), + eventsRule(), + }, + } + + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + role.Rules = append(role.Rules, rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie()) + } + + return role + }()) addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "ttl-controller"}, Rules: []rbacv1.PolicyRule{