Merge pull request #80492 from matte21/pvc-protection-controller-fix-issue-75980

Add live list of pods to PVC protection controller to make sure it does not delete a PVC which is being used by a Pod
This commit is contained in:
Kubernetes Prow Robot 2019-08-16 05:06:44 -07:00 committed by GitHub
commit 7420bb2214
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 137 additions and 48 deletions

View File

@ -13,6 +13,7 @@ go_library(
"//pkg/volume/util:go_default_library", "//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -22,6 +22,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors" apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -126,7 +127,7 @@ func (c *Controller) processNextWorkItem() bool {
pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey.(string)) pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey.(string))
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("Error parsing PVC key %q: %v", pvcKey, err)) utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %v", pvcKey, err))
return true return true
} }
@ -168,6 +169,7 @@ func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
if !isUsed { if !isUsed {
return c.removeFinalizer(pvc) return c.removeFinalizer(pvc)
} }
klog.V(2).Infof("Keeping PVC %s/%s because it is still being used", pvc.Namespace, pvc.Name)
} }
if protectionutil.NeedToAddFinalizer(pvc, volumeutil.PVCProtectionFinalizer) { if protectionutil.NeedToAddFinalizer(pvc, volumeutil.PVCProtectionFinalizer) {
@ -209,44 +211,84 @@ func (c *Controller) removeFinalizer(pvc *v1.PersistentVolumeClaim) error {
} }
func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) { func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
// Look for a Pod using pvc in the Informer's cache. If one is found the
// correct decision to keep pvc is taken without doing an expensive live
// list.
if inUse, err := c.askInformer(pvc); err != nil {
// No need to return because a live list will follow.
klog.Error(err)
} else if inUse {
return true, nil
}
// Even if no Pod using pvc was found in the Informer's cache it doesn't
// mean such a Pod doesn't exist: it might just not be in the cache yet. To
// be 100% confident that it is safe to delete pvc make sure no Pod is using
// it among those returned by a live list.
return c.askAPIServer(pvc)
}
func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) {
klog.V(4).Infof("Looking for Pods using PVC %s/%s in the Informer's cache", pvc.Namespace, pvc.Name)
pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything()) pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything())
if err != nil { if err != nil {
return false, err return false, fmt.Errorf("cache-based list of pods failed while processing %s/%s: %s", pvc.Namespace, pvc.Name, err.Error())
} }
for _, pod := range pods { for _, pod := range pods {
if pod.Spec.NodeName == "" { if podUsesPVC(pod, pvc.Name) {
// This pod is not scheduled. We have a predicated in scheduler that return true, nil
// prevents scheduling pods with deletion timestamp, so we can be
// pretty sure it won't be scheduled in parallel to this check.
// Therefore this pod does not block the PVC from deletion.
klog.V(4).Infof("Skipping unscheduled pod %s when checking PVC %s/%s", pod.Name, pvc.Namespace, pvc.Name)
continue
}
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}
if volume.PersistentVolumeClaim.ClaimName == pvc.Name {
klog.V(2).Infof("Keeping PVC %s/%s, it is used by pod %s/%s", pvc.Namespace, pvc.Name, pod.Namespace, pod.Name)
return true, nil
}
} }
} }
klog.V(3).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name) klog.V(4).Infof("No Pod using PVC %s/%s was found in the Informer's cache", pvc.Namespace, pvc.Name)
return false, nil return false, nil
} }
// pvcAddedUpdated reacts to pvc added/updated/deleted events func (c *Controller) askAPIServer(pvc *v1.PersistentVolumeClaim) (bool, error) {
klog.V(4).Infof("Looking for Pods using PVC %s/%s with a live list", pvc.Namespace, pvc.Name)
podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(metav1.ListOptions{})
if err != nil {
return false, fmt.Errorf("live list of pods failed: %s", err.Error())
}
for _, pod := range podsList.Items {
if podUsesPVC(&pod, pvc.Name) {
return true, nil
}
}
klog.V(2).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name)
return false, nil
}
func podUsesPVC(pod *v1.Pod, pvc string) bool {
// Check whether pvc is used by pod only if pod is scheduled, because
// kubelet sees pods after they have been scheduled and it won't allow
// starting a pod referencing a PVC with a non-nil deletionTimestamp.
if pod.Spec.NodeName != "" {
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc {
klog.V(2).Infof("Pod %s/%s uses PVC %s", pod.Namespace, pod.Name, pvc)
return true
}
}
}
return false
}
// pvcAddedUpdated reacts to pvc added/updated events
func (c *Controller) pvcAddedUpdated(obj interface{}) { func (c *Controller) pvcAddedUpdated(obj interface{}) {
pvc, ok := obj.(*v1.PersistentVolumeClaim) pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok { if !ok {
utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", obj)) utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", obj))
return return
} }
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pvc) key, err := cache.MetaNamespaceKeyFunc(pvc)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for Persistent Volume Claim %#v: %v", pvc, err)) utilruntime.HandleError(fmt.Errorf("couldn't get key for Persistent Volume Claim %#v: %v", pvc, err))
return return
} }
klog.V(4).Infof("Got event on PVC %s", key) klog.V(4).Infof("Got event on PVC %s", key)
@ -280,12 +322,12 @@ func (*Controller) parsePod(obj interface{}) *v1.Pod {
if !ok { if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown) tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok { if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return nil return nil
} }
pod, ok = tombstone.Obj.(*v1.Pod) pod, ok = tombstone.Obj.(*v1.Pod)
if !ok { if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod %#v", obj)) utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Pod %#v", obj))
return nil return nil
} }
} }

View File

@ -147,16 +147,30 @@ func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionF
} }
func TestPVCProtectionController(t *testing.T) { func TestPVCProtectionController(t *testing.T) {
pvcVer := schema.GroupVersionResource{ pvcGVR := schema.GroupVersionResource{
Group: v1.GroupName, Group: v1.GroupName,
Version: "v1", Version: "v1",
Resource: "persistentvolumeclaims", Resource: "persistentvolumeclaims",
} }
podGVR := schema.GroupVersionResource{
Group: v1.GroupName,
Version: "v1",
Resource: "pods",
}
podGVK := schema.GroupVersionKind{
Group: v1.GroupName,
Version: "v1",
Kind: "Pod",
}
tests := []struct { tests := []struct {
name string name string
// Object to insert into fake kubeclient before the test starts. // Object to insert into fake kubeclient before the test starts.
initialObjects []runtime.Object initialObjects []runtime.Object
// Whether not to insert the content of initialObjects into the
// informers before the test starts. Set it to true to simulate the case
// where informers have not been notified yet of certain API objects.
informersAreLate bool
// Optional client reactors. // Optional client reactors.
reactors []reaction reactors []reaction
// PVC event to simulate. This PVC will be automatically added to // PVC event to simulate. This PVC will be automatically added to
@ -180,7 +194,7 @@ func TestPVCProtectionController(t *testing.T) {
name: "StorageObjectInUseProtection Enabled, PVC without finalizer -> finalizer is added", name: "StorageObjectInUseProtection Enabled, PVC without finalizer -> finalizer is added",
updatedPVC: pvc(), updatedPVC: pvc(),
expectedActions: []clienttesting.Action{ expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
}, },
storageObjectInUseProtectionEnabled: true, storageObjectInUseProtectionEnabled: true,
}, },
@ -208,11 +222,11 @@ func TestPVCProtectionController(t *testing.T) {
}, },
expectedActions: []clienttesting.Action{ expectedActions: []clienttesting.Action{
// This fails // This fails
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
// This fails too // This fails too
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
// This succeeds // This succeeds
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())), clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
}, },
storageObjectInUseProtectionEnabled: true, storageObjectInUseProtectionEnabled: true,
}, },
@ -220,7 +234,8 @@ func TestPVCProtectionController(t *testing.T) {
name: "StorageObjectInUseProtection Enabled, deleted PVC with finalizer -> finalizer is removed", name: "StorageObjectInUseProtection Enabled, deleted PVC with finalizer -> finalizer is removed",
updatedPVC: deleted(withProtectionFinalizer(pvc())), updatedPVC: deleted(withProtectionFinalizer(pvc())),
expectedActions: []clienttesting.Action{ expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
}, },
storageObjectInUseProtectionEnabled: true, storageObjectInUseProtectionEnabled: true,
}, },
@ -228,7 +243,8 @@ func TestPVCProtectionController(t *testing.T) {
name: "StorageObjectInUseProtection Disabled, deleted PVC with finalizer -> finalizer is removed", name: "StorageObjectInUseProtection Disabled, deleted PVC with finalizer -> finalizer is removed",
updatedPVC: deleted(withProtectionFinalizer(pvc())), updatedPVC: deleted(withProtectionFinalizer(pvc())),
expectedActions: []clienttesting.Action{ expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
}, },
storageObjectInUseProtectionEnabled: false, storageObjectInUseProtectionEnabled: false,
}, },
@ -243,17 +259,20 @@ func TestPVCProtectionController(t *testing.T) {
}, },
}, },
expectedActions: []clienttesting.Action{ expectedActions: []clienttesting.Action{
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
// Fails // Fails
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
// Fails too // Fails too
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
// Succeeds // Succeeds
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
}, },
storageObjectInUseProtectionEnabled: true, storageObjectInUseProtectionEnabled: true,
}, },
{ {
name: "deleted PVC with finalizer + pods with the PVC exists -> finalizer is not removed", name: "deleted PVC with finalizer + pod with the PVC exists -> finalizer is not removed",
initialObjects: []runtime.Object{ initialObjects: []runtime.Object{
withPVC(defaultPVCName, pod()), withPVC(defaultPVCName, pod()),
}, },
@ -261,18 +280,19 @@ func TestPVCProtectionController(t *testing.T) {
expectedActions: []clienttesting.Action{}, expectedActions: []clienttesting.Action{},
}, },
{ {
name: "deleted PVC with finalizer + pods with unrelated PVC and EmptyDir exists -> finalizer is removed", name: "deleted PVC with finalizer + pod with unrelated PVC and EmptyDir exists -> finalizer is removed",
initialObjects: []runtime.Object{ initialObjects: []runtime.Object{
withEmptyDir(withPVC("unrelatedPVC", pod())), withEmptyDir(withPVC("unrelatedPVC", pod())),
}, },
updatedPVC: deleted(withProtectionFinalizer(pvc())), updatedPVC: deleted(withProtectionFinalizer(pvc())),
expectedActions: []clienttesting.Action{ expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
}, },
storageObjectInUseProtectionEnabled: true, storageObjectInUseProtectionEnabled: true,
}, },
{ {
name: "deleted PVC with finalizer + pods with the PVC finished but is not deleted -> finalizer is not removed", name: "deleted PVC with finalizer + pod with the PVC finished but is not deleted -> finalizer is not removed",
initialObjects: []runtime.Object{ initialObjects: []runtime.Object{
withStatus(v1.PodFailed, withPVC(defaultPVCName, pod())), withStatus(v1.PodFailed, withPVC(defaultPVCName, pod())),
}, },
@ -280,6 +300,18 @@ func TestPVCProtectionController(t *testing.T) {
expectedActions: []clienttesting.Action{}, expectedActions: []clienttesting.Action{},
storageObjectInUseProtectionEnabled: true, storageObjectInUseProtectionEnabled: true,
}, },
{
name: "deleted PVC with finalizer + pod with the PVC exists but is not in the Informer's cache yet -> finalizer is not removed",
initialObjects: []runtime.Object{
withPVC(defaultPVCName, pod()),
},
informersAreLate: true,
updatedPVC: deleted(withProtectionFinalizer(pvc())),
expectedActions: []clienttesting.Action{
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
},
storageObjectInUseProtectionEnabled: true,
},
// //
// Pod events // Pod events
// //
@ -308,7 +340,8 @@ func TestPVCProtectionController(t *testing.T) {
}, },
updatedPod: unscheduled(withPVC(defaultPVCName, pod())), updatedPod: unscheduled(withPVC(defaultPVCName, pod())),
expectedActions: []clienttesting.Action{ expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
}, },
storageObjectInUseProtectionEnabled: true, storageObjectInUseProtectionEnabled: true,
}, },
@ -319,7 +352,8 @@ func TestPVCProtectionController(t *testing.T) {
}, },
deletedPod: withStatus(v1.PodRunning, withPVC(defaultPVCName, pod())), deletedPod: withStatus(v1.PodRunning, withPVC(defaultPVCName, pod())),
expectedActions: []clienttesting.Action{ expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
}, },
storageObjectInUseProtectionEnabled: true, storageObjectInUseProtectionEnabled: true,
}, },
@ -331,7 +365,8 @@ func TestPVCProtectionController(t *testing.T) {
deletedPod: withPVC(defaultPVCName, pod()), deletedPod: withPVC(defaultPVCName, pod()),
updatedPod: withUID("uid2", pod()), updatedPod: withUID("uid2", pod()),
expectedActions: []clienttesting.Action{ expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())), clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
}, },
storageObjectInUseProtectionEnabled: true, storageObjectInUseProtectionEnabled: true,
}, },
@ -368,15 +403,26 @@ func TestPVCProtectionController(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
// Create client with initial data // Create initial data for client and informers.
objs := test.initialObjects var (
clientObjs []runtime.Object
informersObjs []runtime.Object
)
if test.updatedPVC != nil { if test.updatedPVC != nil {
objs = append(objs, test.updatedPVC) clientObjs = append(clientObjs, test.updatedPVC)
informersObjs = append(informersObjs, test.updatedPVC)
} }
if test.updatedPod != nil { if test.updatedPod != nil {
objs = append(objs, test.updatedPod) clientObjs = append(clientObjs, test.updatedPod)
informersObjs = append(informersObjs, test.updatedPod)
} }
client := fake.NewSimpleClientset(objs...) clientObjs = append(clientObjs, test.initialObjects...)
if !test.informersAreLate {
informersObjs = append(informersObjs, test.initialObjects...)
}
// Create client with initial data
client := fake.NewSimpleClientset(clientObjs...)
// Create informers // Create informers
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
@ -385,7 +431,7 @@ func TestPVCProtectionController(t *testing.T) {
// Populate the informers with initial objects so the controller can // Populate the informers with initial objects so the controller can
// Get() and List() it. // Get() and List() it.
for _, obj := range objs { for _, obj := range informersObjs {
switch obj.(type) { switch obj.(type) {
case *v1.PersistentVolumeClaim: case *v1.PersistentVolumeClaim:
pvcInformer.Informer().GetStore().Add(obj) pvcInformer.Informer().GetStore().Add(obj)
@ -435,7 +481,7 @@ func TestPVCProtectionController(t *testing.T) {
} }
currentActionCount := len(client.Actions()) currentActionCount := len(client.Actions())
if currentActionCount < len(test.expectedActions) { if currentActionCount < len(test.expectedActions) {
// Do not log evey wait, only when the action count changes. // Do not log every wait, only when the action count changes.
if lastReportedActionCount < currentActionCount { if lastReportedActionCount < currentActionCount {
klog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions)) klog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions))
lastReportedActionCount = currentActionCount lastReportedActionCount = currentActionCount