diff --git a/pkg/controller/volume/pvcprotection/BUILD b/pkg/controller/volume/pvcprotection/BUILD index 9d6c4b77b63..1be72d6d342 100644 --- a/pkg/controller/volume/pvcprotection/BUILD +++ b/pkg/controller/volume/pvcprotection/BUILD @@ -13,6 +13,7 @@ go_library( "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go index 979679b3403..b6f964cc69c 100644 --- a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go +++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -126,7 +127,7 @@ func (c *Controller) processNextWorkItem() bool { pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey.(string)) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error parsing PVC key %q: %v", pvcKey, err)) + utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %v", pvcKey, err)) return true } @@ -168,6 +169,7 @@ func (c *Controller) processPVC(pvcNamespace, pvcName string) error { if !isUsed { return c.removeFinalizer(pvc) } + klog.V(2).Infof("Keeping PVC %s/%s because it is still being used", pvc.Namespace, pvc.Name) } if protectionutil.NeedToAddFinalizer(pvc, volumeutil.PVCProtectionFinalizer) { @@ -209,44 +211,84 @@ func (c *Controller) 
removeFinalizer(pvc *v1.PersistentVolumeClaim) error { } func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) { + // Look for a Pod using pvc in the Informer's cache. If one is found the + // correct decision to keep pvc is taken without doing an expensive live + // list. + if inUse, err := c.askInformer(pvc); err != nil { + // No need to return because a live list will follow. + klog.Error(err) + } else if inUse { + return true, nil + } + + // Even if no Pod using pvc was found in the Informer's cache it doesn't + // mean such a Pod doesn't exist: it might just not be in the cache yet. To + // be 100% confident that it is safe to delete pvc make sure no Pod is using + // it among those returned by a live list. + return c.askAPIServer(pvc) +} + +func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) { + klog.V(4).Infof("Looking for Pods using PVC %s/%s in the Informer's cache", pvc.Namespace, pvc.Name) + pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything()) if err != nil { - return false, err + return false, fmt.Errorf("cache-based list of pods failed while processing %s/%s: %s", pvc.Namespace, pvc.Name, err.Error()) } + for _, pod := range pods { - if pod.Spec.NodeName == "" { - // This pod is not scheduled. We have a predicated in scheduler that - // prevents scheduling pods with deletion timestamp, so we can be - // pretty sure it won't be scheduled in parallel to this check. - // Therefore this pod does not block the PVC from deletion. 
- klog.V(4).Infof("Skipping unscheduled pod %s when checking PVC %s/%s", pod.Name, pvc.Namespace, pvc.Name) - continue - } - for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim == nil { - continue - } - if volume.PersistentVolumeClaim.ClaimName == pvc.Name { - klog.V(2).Infof("Keeping PVC %s/%s, it is used by pod %s/%s", pvc.Namespace, pvc.Name, pod.Namespace, pod.Name) - return true, nil - } + if podUsesPVC(pod, pvc.Name) { + return true, nil } } - klog.V(3).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name) + klog.V(4).Infof("No Pod using PVC %s/%s was found in the Informer's cache", pvc.Namespace, pvc.Name) return false, nil } -// pvcAddedUpdated reacts to pvc added/updated/deleted events +func (c *Controller) askAPIServer(pvc *v1.PersistentVolumeClaim) (bool, error) { + klog.V(4).Infof("Looking for Pods using PVC %s/%s with a live list", pvc.Namespace, pvc.Name) + + podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(metav1.ListOptions{}) + if err != nil { + return false, fmt.Errorf("live list of pods failed: %s", err.Error()) + } + + for _, pod := range podsList.Items { + if podUsesPVC(&pod, pvc.Name) { + return true, nil + } + } + + klog.V(2).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name) + return false, nil +} + +func podUsesPVC(pod *v1.Pod, pvc string) bool { + // Check whether pvc is used by pod only if pod is scheduled, because + // kubelet sees pods after they have been scheduled and it won't allow + // starting a pod referencing a PVC with a non-nil deletionTimestamp. 
+ if pod.Spec.NodeName != "" { + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc { + klog.V(2).Infof("Pod %s/%s uses PVC %s", pod.Namespace, pod.Name, pvc) + return true + } + } + } + return false +} + +// pvcAddedUpdated reacts to pvc added/updated events func (c *Controller) pvcAddedUpdated(obj interface{}) { pvc, ok := obj.(*v1.PersistentVolumeClaim) if !ok { utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", obj)) return } - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pvc) + key, err := cache.MetaNamespaceKeyFunc(pvc) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for Persistent Volume Claim %#v: %v", pvc, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for Persistent Volume Claim %#v: %v", pvc, err)) return } klog.V(4).Infof("Got event on PVC %s", key) @@ -280,12 +322,12 @@ func (*Controller) parsePod(obj interface{}) *v1.Pod { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return nil } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod %#v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Pod %#v", obj)) return nil } } diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go index f65b55f75ec..c57037eff5d 100644 --- a/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go +++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go @@ -147,16 +147,30 @@ func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionF } func TestPVCProtectionController(t 
*testing.T) { - pvcVer := schema.GroupVersionResource{ + pvcGVR := schema.GroupVersionResource{ Group: v1.GroupName, Version: "v1", Resource: "persistentvolumeclaims", } + podGVR := schema.GroupVersionResource{ + Group: v1.GroupName, + Version: "v1", + Resource: "pods", + } + podGVK := schema.GroupVersionKind{ + Group: v1.GroupName, + Version: "v1", + Kind: "Pod", + } tests := []struct { name string // Object to insert into fake kubeclient before the test starts. initialObjects []runtime.Object + // Whether to skip inserting the content of initialObjects into the + // informers before the test starts. Set it to true to simulate the case + // where informers have not been notified yet of certain API objects. + informersAreLate bool // Optional client reactors. reactors []reaction // PVC event to simulate. This PVC will be automatically added to @@ -180,7 +194,7 @@ func TestPVCProtectionController(t *testing.T) { name: "StorageObjectInUseProtection Enabled, PVC without finalizer -> finalizer is added", updatedPVC: pvc(), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -208,11 +222,11 @@ func TestPVCProtectionController(t *testing.T) { }, expectedActions: []clienttesting.Action{ // This fails - clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())), // This fails too - clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())), // This succeeds - clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())), }, storageObjectInUseProtectionEnabled: 
true, }, @@ -220,7 +234,8 @@ func TestPVCProtectionController(t *testing.T) { name: "StorageObjectInUseProtection Enabled, deleted PVC with finalizer -> finalizer is removed", updatedPVC: deleted(withProtectionFinalizer(pvc())), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -228,7 +243,8 @@ func TestPVCProtectionController(t *testing.T) { name: "StorageObjectInUseProtection Disabled, deleted PVC with finalizer -> finalizer is removed", updatedPVC: deleted(withProtectionFinalizer(pvc())), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: false, }, @@ -243,17 +259,20 @@ func TestPVCProtectionController(t *testing.T) { }, }, expectedActions: []clienttesting.Action{ + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), // Fails - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), // Fails too - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), // Succeeds - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, { - name: "deleted PVC with finalizer + pods with the PVC exists -> finalizer is not 
removed", + name: "deleted PVC with finalizer + pod with the PVC exists -> finalizer is not removed", initialObjects: []runtime.Object{ withPVC(defaultPVCName, pod()), }, @@ -261,18 +280,19 @@ func TestPVCProtectionController(t *testing.T) { expectedActions: []clienttesting.Action{}, }, { - name: "deleted PVC with finalizer + pods with unrelated PVC and EmptyDir exists -> finalizer is removed", + name: "deleted PVC with finalizer + pod with unrelated PVC and EmptyDir exists -> finalizer is removed", initialObjects: []runtime.Object{ withEmptyDir(withPVC("unrelatedPVC", pod())), }, updatedPVC: deleted(withProtectionFinalizer(pvc())), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, { - name: "deleted PVC with finalizer + pods with the PVC finished but is not deleted -> finalizer is not removed", + name: "deleted PVC with finalizer + pod with the PVC finished but is not deleted -> finalizer is not removed", initialObjects: []runtime.Object{ withStatus(v1.PodFailed, withPVC(defaultPVCName, pod())), }, @@ -280,6 +300,18 @@ func TestPVCProtectionController(t *testing.T) { expectedActions: []clienttesting.Action{}, storageObjectInUseProtectionEnabled: true, }, + { + name: "deleted PVC with finalizer + pod with the PVC exists but is not in the Informer's cache yet -> finalizer is not removed", + initialObjects: []runtime.Object{ + withPVC(defaultPVCName, pod()), + }, + informersAreLate: true, + updatedPVC: deleted(withProtectionFinalizer(pvc())), + expectedActions: []clienttesting.Action{ + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + }, + storageObjectInUseProtectionEnabled: true, + }, // // Pod events // @@ -308,7 +340,8 @@ func TestPVCProtectionController(t *testing.T) { }, 
updatedPod: unscheduled(withPVC(defaultPVCName, pod())), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -319,7 +352,8 @@ func TestPVCProtectionController(t *testing.T) { }, deletedPod: withStatus(v1.PodRunning, withPVC(defaultPVCName, pod())), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -331,7 +365,8 @@ func TestPVCProtectionController(t *testing.T) { deletedPod: withPVC(defaultPVCName, pod()), updatedPod: withUID("uid2", pod()), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -368,15 +403,26 @@ func TestPVCProtectionController(t *testing.T) { } for _, test := range tests { - // Create client with initial data - objs := test.initialObjects + // Create initial data for client and informers. + var ( + clientObjs []runtime.Object + informersObjs []runtime.Object + ) if test.updatedPVC != nil { - objs = append(objs, test.updatedPVC) + clientObjs = append(clientObjs, test.updatedPVC) + informersObjs = append(informersObjs, test.updatedPVC) } if test.updatedPod != nil { - objs = append(objs, test.updatedPod) + clientObjs = append(clientObjs, test.updatedPod) + informersObjs = append(informersObjs, test.updatedPod) } - client := fake.NewSimpleClientset(objs...) 
+ clientObjs = append(clientObjs, test.initialObjects...) + if !test.informersAreLate { + informersObjs = append(informersObjs, test.initialObjects...) + } + + // Create client with initial data + client := fake.NewSimpleClientset(clientObjs...) // Create informers informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) @@ -385,7 +431,7 @@ func TestPVCProtectionController(t *testing.T) { // Populate the informers with initial objects so the controller can // Get() and List() it. - for _, obj := range objs { + for _, obj := range informersObjs { switch obj.(type) { case *v1.PersistentVolumeClaim: pvcInformer.Informer().GetStore().Add(obj) @@ -435,7 +481,7 @@ func TestPVCProtectionController(t *testing.T) { } currentActionCount := len(client.Actions()) if currentActionCount < len(test.expectedActions) { - // Do not log evey wait, only when the action count changes. + // Do not log every wait, only when the action count changes. if lastReportedActionCount < currentActionCount { klog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions)) lastReportedActionCount = currentActionCount