From 791d1c81f0994ec0d7b529ab3dd91a41b5b9f961 Mon Sep 17 00:00:00 2001 From: matte21 Date: Tue, 23 Jul 2019 18:21:30 +0200 Subject: [PATCH 1/2] Add live list of pods to PVC protection controller Add live list of pods to PVC protection controller, as opposed to doing only a cache-based list through the Informer. Both lists are performed while processing a PVC with deletionTimestamp set to check whether Pods using the PVC exist and remove the finalizer to enable deletion of the PVC if that's not the case. Prior to this commit only the cache-based list was done but that's unreliable because a pod using the PVC might exist but not be in the cache just yet. On the other hand, the live list is 100% reliable. Note that it would be enough to do only the live list. Instead, this commit adds it after the cache-based list and performs it only if the latter finds no Pod blocking deletion of the PVC being processed. The rationale is that live lists are expensive and it's desirable to minimize them. The drawback is that if at the time of the cache-based list the cache has not been notified yet of the deletion of a Pod using the PVC the PVC is kept. Correctness is not compromised because the finalizer will be removed when the Pod deletion notification is received, but this means PVC deletion is delayed. Reducing live lists was valued more than deleting PVCs slightly faster. Also, add a unit test that fails without the change introduced by this commit and revamp old unit tests. The latter is needed because expected behavior is described in terms of API calls the controller makes, and this commit introduces new API calls (the live lists). --- pkg/controller/volume/pvcprotection/BUILD | 1 + .../pvc_protection_controller.go | 82 ++++++++++++---- .../pvc_protection_controller_test.go | 94 ++++++++++++++----- 3 files changed, 133 insertions(+), 44 deletions(-) diff --git a/pkg/controller/volume/pvcprotection/BUILD b/pkg/controller/volume/pvcprotection/BUILD index 9d6c4b77b63..1be72d6d342 100644 --- a/pkg/controller/volume/pvcprotection/BUILD +++ b/pkg/controller/volume/pvcprotection/BUILD @@ -13,6 +13,7 @@ go_library( "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go index 979679b3403..3b6aa13f85d 100644 --- a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go +++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -168,6 +169,7 @@ func (c *Controller) processPVC(pvcNamespace, pvcName string) error { if !isUsed { return c.removeFinalizer(pvc) } + klog.V(2).Infof("Keeping PVC %s/%s because it is still being used", pvc.Namespace, pvc.Name) } if protectionutil.NeedToAddFinalizer(pvc, volumeutil.PVCProtectionFinalizer) { @@ -209,42 +211,82 @@ func (c *Controller) removeFinalizer(pvc *v1.PersistentVolumeClaim) error { } func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) { + // Look for a Pod using pvc in the Informer's cache. If one is found the + // correct decision to keep pvc is taken without doing an expensive live + // list. + if inUse, err := c.askInformer(pvc); err != nil { + // No need to return because a live list will follow. + klog.Error(err) + } else if inUse { + return true, nil + } + + // Even if no Pod using pvc was found in the Informer's cache it doesn't + // mean such a Pod doesn't exist: it might just not be in the cache yet. To + // be 100% confident that it is safe to delete pvc make sure no Pod is using + // it among those returned by a live list. + return c.askAPIServer(pvc) +} + +func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) { + klog.V(4).Infof("Looking for Pods using PVC %s/%s in the Informer's cache", pvc.Namespace, pvc.Name) + pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything()) if err != nil { - return false, err + return false, fmt.Errorf("Cache-based list of pods failed while processing %s/%s: %s", pvc.Namespace, pvc.Name, err.Error()) } + for _, pod := range pods { - if pod.Spec.NodeName == "" { - // This pod is not scheduled. We have a predicated in scheduler that - // prevents scheduling pods with deletion timestamp, so we can be - // pretty sure it won't be scheduled in parallel to this check. - // Therefore this pod does not block the PVC from deletion. - klog.V(4).Infof("Skipping unscheduled pod %s when checking PVC %s/%s", pod.Name, pvc.Namespace, pvc.Name) - continue - } - for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim == nil { - continue - } - if volume.PersistentVolumeClaim.ClaimName == pvc.Name { - klog.V(2).Infof("Keeping PVC %s/%s, it is used by pod %s/%s", pvc.Namespace, pvc.Name, pod.Namespace, pod.Name) - return true, nil - } + if podUsesPVC(pod, pvc.Name) { + return true, nil } } - klog.V(3).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name) + klog.V(4).Infof("No Pod using PVC %s/%s was found in the Informer's cache", pvc.Namespace, pvc.Name) return false, nil } -// pvcAddedUpdated reacts to pvc added/updated/deleted events +func (c *Controller) askAPIServer(pvc *v1.PersistentVolumeClaim) (bool, error) { + klog.V(4).Infof("Looking for Pods using PVC %s/%s with a live list", pvc.Namespace, pvc.Name) + + podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(metav1.ListOptions{}) + if err != nil { + return false, fmt.Errorf("Live list of pods failed: %s", err.Error()) + } + + for _, pod := range podsList.Items { + if podUsesPVC(&pod, pvc.Name) { + return true, nil + } + } + + klog.V(2).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name) + return false, nil +} + +func podUsesPVC(pod *v1.Pod, pvc string) bool { + // Check whether pvc is used by pod only if pod is scheduled, because + // kubelet sees pods after they have been scheduled and it won't allow + // starting a pod referencing a PVC with a non-nil deletionTimestamp. + if pod.Spec.NodeName != "" { + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc { + klog.V(2).Infof("Pod %s/%s uses PVC %s", pod.Namespace, pod.Name, pvc) + return true + } + } + } + return false +} + +// pvcAddedUpdated reacts to pvc added/updated events func (c *Controller) pvcAddedUpdated(obj interface{}) { pvc, ok := obj.(*v1.PersistentVolumeClaim) if !ok { utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", obj)) return } - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pvc) + key, err := cache.MetaNamespaceKeyFunc(pvc) if err != nil { utilruntime.HandleError(fmt.Errorf("Couldn't get key for Persistent Volume Claim %#v: %v", pvc, err)) return diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go index f65b55f75ec..c57037eff5d 100644 --- a/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go +++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go @@ -147,16 +147,30 @@ func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionF } func TestPVCProtectionController(t *testing.T) { - pvcVer := schema.GroupVersionResource{ + pvcGVR := schema.GroupVersionResource{ Group: v1.GroupName, Version: "v1", Resource: "persistentvolumeclaims", } + podGVR := schema.GroupVersionResource{ + Group: v1.GroupName, + Version: "v1", + Resource: "pods", + } + podGVK := schema.GroupVersionKind{ + Group: v1.GroupName, + Version: "v1", + Kind: "Pod", + } tests := []struct { name string // Object to insert into fake kubeclient before the test starts. initialObjects []runtime.Object + // Whether not to insert the content of initialObjects into the + // informers before the test starts. Set it to true to simulate the case + // where informers have not been notified yet of certain API objects. + informersAreLate bool // Optional client reactors. reactors []reaction // PVC event to simulate. This PVC will be automatically added to @@ -180,7 +194,7 @@ func TestPVCProtectionController(t *testing.T) { name: "StorageObjectInUseProtection Enabled, PVC without finalizer -> finalizer is added", updatedPVC: pvc(), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -208,11 +222,11 @@ func TestPVCProtectionController(t *testing.T) { }, expectedActions: []clienttesting.Action{ // This fails - clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())), // This fails too - clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())), // This succeeds - clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -220,7 +234,8 @@ func TestPVCProtectionController(t *testing.T) { name: "StorageObjectInUseProtection Enabled, deleted PVC with finalizer -> finalizer is removed", updatedPVC: deleted(withProtectionFinalizer(pvc())), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -228,7 +243,8 @@ func TestPVCProtectionController(t *testing.T) { name: "StorageObjectInUseProtection Disabled, deleted PVC with finalizer -> finalizer is removed", updatedPVC: deleted(withProtectionFinalizer(pvc())), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: false, }, @@ -243,17 +259,20 @@ func TestPVCProtectionController(t *testing.T) { }, }, expectedActions: []clienttesting.Action{ + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), // Fails - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), // Fails too - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), // Succeeds - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, { - name: "deleted PVC with finalizer + pods with the PVC exists -> finalizer is not removed", + name: "deleted PVC with finalizer + pod with the PVC exists -> finalizer is not removed", initialObjects: []runtime.Object{ withPVC(defaultPVCName, pod()), }, @@ -261,18 +280,19 @@ func TestPVCProtectionController(t *testing.T) { expectedActions: []clienttesting.Action{}, }, { - name: "deleted PVC with finalizer + pods with unrelated PVC and EmptyDir exists -> finalizer is removed", + name: "deleted PVC with finalizer + pod with unrelated PVC and EmptyDir exists -> finalizer is removed", initialObjects: []runtime.Object{ withEmptyDir(withPVC("unrelatedPVC", pod())), }, updatedPVC: deleted(withProtectionFinalizer(pvc())), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, { - name: "deleted PVC with finalizer + pods with the PVC finished but is not deleted -> finalizer is not removed", + name: "deleted PVC with finalizer + pod with the PVC finished but is not deleted -> finalizer is not removed", initialObjects: []runtime.Object{ withStatus(v1.PodFailed, withPVC(defaultPVCName, pod())), }, @@ -280,6 +300,18 @@ func TestPVCProtectionController(t *testing.T) { expectedActions: []clienttesting.Action{}, storageObjectInUseProtectionEnabled: true, }, + { + name: "deleted PVC with finalizer + pod with the PVC exists but is not in the Informer's cache yet -> finalizer is not removed", + initialObjects: []runtime.Object{ + withPVC(defaultPVCName, pod()), + }, + informersAreLate: true, + updatedPVC: deleted(withProtectionFinalizer(pvc())), + expectedActions: []clienttesting.Action{ + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + }, + storageObjectInUseProtectionEnabled: true, + }, // // Pod events // @@ -308,7 +340,8 @@ func TestPVCProtectionController(t *testing.T) { }, updatedPod: unscheduled(withPVC(defaultPVCName, pod())), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -319,7 +352,8 @@ func TestPVCProtectionController(t *testing.T) { }, deletedPod: withStatus(v1.PodRunning, withPVC(defaultPVCName, pod())), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -331,7 +365,8 @@ func TestPVCProtectionController(t *testing.T) { deletedPod: withPVC(defaultPVCName, pod()), updatedPod: withUID("uid2", pod()), expectedActions: []clienttesting.Action{ - clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, storageObjectInUseProtectionEnabled: true, }, @@ -368,15 +403,26 @@ func TestPVCProtectionController(t *testing.T) { } for _, test := range tests { - // Create client with initial data - objs := test.initialObjects + // Create initial data for client and informers. + var ( + clientObjs []runtime.Object + informersObjs []runtime.Object + ) if test.updatedPVC != nil { - objs = append(objs, test.updatedPVC) + clientObjs = append(clientObjs, test.updatedPVC) + informersObjs = append(informersObjs, test.updatedPVC) } if test.updatedPod != nil { - objs = append(objs, test.updatedPod) + clientObjs = append(clientObjs, test.updatedPod) + informersObjs = append(informersObjs, test.updatedPod) } - client := fake.NewSimpleClientset(objs...) + clientObjs = append(clientObjs, test.initialObjects...) + if !test.informersAreLate { + informersObjs = append(informersObjs, test.initialObjects...) + } + + // Create client with initial data + client := fake.NewSimpleClientset(clientObjs...) // Create informers informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) @@ -385,7 +431,7 @@ func TestPVCProtectionController(t *testing.T) { // Populate the informers with initial objects so the controller can // Get() and List() it. - for _, obj := range objs { + for _, obj := range informersObjs { switch obj.(type) { case *v1.PersistentVolumeClaim: pvcInformer.Informer().GetStore().Add(obj) @@ -435,7 +481,7 @@ func TestPVCProtectionController(t *testing.T) { } currentActionCount := len(client.Actions()) if currentActionCount < len(test.expectedActions) { - // Do not log evey wait, only when the action count changes. + // Do not log every wait, only when the action count changes. if lastReportedActionCount < currentActionCount { klog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions)) lastReportedActionCount = currentActionCount From 1371490bd687503bf9332eaf125692972b2c3132 Mon Sep 17 00:00:00 2001 From: matte21 Date: Wed, 7 Aug 2019 21:56:18 +0200 Subject: [PATCH 2/2] Lowercase first letter of chainable errors in PVC protection controller Lowercase first letter of error messages that are not printed right away to ease chaining with other error messages in PVC protection controller. --- .../pvcprotection/pvc_protection_controller.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go index 3b6aa13f85d..b6f964cc69c 100644 --- a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go +++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go @@ -127,7 +127,7 @@ func (c *Controller) processNextWorkItem() bool { pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey.(string)) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error parsing PVC key %q: %v", pvcKey, err)) + utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %v", pvcKey, err)) return true } @@ -233,7 +233,7 @@ func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) { pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything()) if err != nil { - return false, fmt.Errorf("Cache-based list of pods failed while processing %s/%s: %s", pvc.Namespace, pvc.Name, err.Error()) + return false, fmt.Errorf("cache-based list of pods failed while processing %s/%s: %s", pvc.Namespace, pvc.Name, err.Error()) } for _, pod := range pods { @@ -251,7 +251,7 @@ func (c *Controller) askAPIServer(pvc *v1.PersistentVolumeClaim) (bool, error) { podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(metav1.ListOptions{}) if err != nil { - return false, fmt.Errorf("Live list of pods failed: %s", err.Error()) + return false, fmt.Errorf("live list of pods failed: %s", err.Error()) } for _, pod := range podsList.Items { @@ -288,7 +288,7 @@ func (c *Controller) pvcAddedUpdated(obj interface{}) { } key, err := cache.MetaNamespaceKeyFunc(pvc) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for Persistent Volume Claim %#v: %v", pvc, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for Persistent Volume Claim %#v: %v", pvc, err)) return } klog.V(4).Infof("Got event on PVC %s", key) @@ -322,12 +322,12 @@ func (*Controller) parsePod(obj interface{}) *v1.Pod { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return nil } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod %#v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Pod %#v", obj)) return nil } }