From 5cafb9042b100c6d6614a46f09e3f7410337627a Mon Sep 17 00:00:00 2001 From: NickrenREN Date: Thu, 23 Feb 2017 19:34:41 +0800 Subject: [PATCH] find and add active pods for dswp loops through the list of active pods and ensures that each one exists in the desired state of the world cache --- hack/.linted_packages | 1 + pkg/controller/volume/attachdetach/BUILD | 2 + .../attachdetach/attach_detach_controller.go | 237 ++-------------- .../volume/attachdetach/populator/BUILD | 22 ++ .../desired_state_of_world_populator.go | 66 ++++- .../desired_state_of_world_populator_test.go | 137 ++++++++++ pkg/controller/volume/attachdetach/util/BUILD | 37 +++ .../volume/attachdetach/util/util.go | 252 ++++++++++++++++++ .../desired_state_of_world_populator.go | 13 +- pkg/volume/util/volumehelper/volumehelper.go | 16 ++ test/integration/volume/attach_detach_test.go | 86 ++++++ 11 files changed, 626 insertions(+), 243 deletions(-) create mode 100644 pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator_test.go create mode 100644 pkg/controller/volume/attachdetach/util/BUILD create mode 100644 pkg/controller/volume/attachdetach/util/util.go diff --git a/hack/.linted_packages b/hack/.linted_packages index 1ab1f0a5610..2c60a035b92 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -177,6 +177,7 @@ pkg/cloudprovider/providers/cloudstack pkg/controller/volume/attachdetach/cache pkg/controller/volume/attachdetach/populator pkg/controller/volume/attachdetach/reconciler +pkg/controller/volume/attachdetach/util pkg/conversion pkg/conversion/queryparams pkg/credentialprovider/aws diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index cbe1e7f99bf..75b4c7f1ec9 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -24,6 +24,7 @@ go_library( "//pkg/controller/volume/attachdetach/populator:go_default_library", "//pkg/controller/volume/attachdetach/reconciler:go_default_library", "//pkg/controller/volume/attachdetach/statusupdater:go_default_library", + "//pkg/controller/volume/attachdetach/util:go_default_library", "//pkg/util/io:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", @@ -67,6 +68,7 @@ filegroup( "//pkg/controller/volume/attachdetach/reconciler:all-srcs", "//pkg/controller/volume/attachdetach/statusupdater:all-srcs", "//pkg/controller/volume/attachdetach/testing:all-srcs", + "//pkg/controller/volume/attachdetach/util:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 8e0260561ea..1574e991783 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" + "k8s.io/kubernetes/pkg/controller/volume/attachdetach/util" "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" @@ -62,6 +63,11 @@ const ( // desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the // DesiredStateOfWorldPopulator loop waits between successive executions desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute + + // desiredStateOfWorldPopulatorListPodsRetryDuration is the amount of + // time the DesiredStateOfWorldPopulator loop waits between list pods + // calls. + desiredStateOfWorldPopulatorListPodsRetryDuration time.Duration = 3 * time.Minute ) // AttachDetachController defines the operations supported by this controller. @@ -137,8 +143,12 @@ func NewAttachDetachController( adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator( desiredStateOfWorldPopulatorLoopSleepPeriod, + desiredStateOfWorldPopulatorListPodsRetryDuration, podInformer.Lister(), - adc.desiredStateOfWorld) + adc.desiredStateOfWorld, + &adc.volumePluginMgr, + pvcInformer.Lister(), + pvInformer.Lister()) podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ AddFunc: adc.podAdd, @@ -245,7 +255,8 @@ func (adc *attachDetachController) podAdd(obj interface{}) { return } - adc.processPodVolumes(pod, true /* addVolumes */) + util.ProcessPodVolumes(pod, true, /* addVolumes */ + adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister) } // GetDesiredStateOfWorld returns desired state of world associated with controller @@ -264,7 +275,8 @@ func (adc *attachDetachController) podDelete(obj interface{}) { return } - adc.processPodVolumes(pod, false /* addVolumes */) + util.ProcessPodVolumes(pod, false, /* addVolumes */ + adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister) } func (adc *attachDetachController) nodeAdd(obj interface{}) { @@ -313,225 +325,6 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) { adc.processVolumesInUse(nodeName, node.Status.VolumesInUse) } -// processPodVolumes processes the volumes in the given pod and adds them to the -// desired state of the world if addVolumes is true, otherwise it removes them. -func (adc *attachDetachController) processPodVolumes( - pod *v1.Pod, addVolumes bool) { - if pod == nil { - return - } - - if len(pod.Spec.Volumes) <= 0 { - return - } - - nodeName := types.NodeName(pod.Spec.NodeName) - - if !adc.desiredStateOfWorld.NodeExists(nodeName) { - // If the node the pod is scheduled to does not exist in the desired - // state of the world data structure, that indicates the node is not - // yet managed by the controller. Therefore, ignore the pod. - // If the node is added to the list of managed nodes in the future, - // future adds and updates to the pod will be processed. - glog.V(10).Infof( - "Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.", - pod.Namespace, - pod.Name, - nodeName) - return - } - - // Process volume spec for each volume defined in pod - for _, podVolume := range pod.Spec.Volumes { - volumeSpec, err := adc.createVolumeSpec(podVolume, pod.Namespace) - if err != nil { - glog.V(10).Infof( - "Error processing volume %q for pod %q/%q: %v", - podVolume.Name, - pod.Namespace, - pod.Name, - err) - continue - } - - attachableVolumePlugin, err := - adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec) - if err != nil || attachableVolumePlugin == nil { - glog.V(10).Infof( - "Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v", - podVolume.Name, - pod.Namespace, - pod.Name, - err) - continue - } - - uniquePodName := volumehelper.GetUniquePodName(pod) - if addVolumes { - // Add volume to desired state of world - _, err := adc.desiredStateOfWorld.AddPod( - uniquePodName, pod, volumeSpec, nodeName) - if err != nil { - glog.V(10).Infof( - "Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v", - podVolume.Name, - pod.Namespace, - pod.Name, - err) - } - - } else { - // Remove volume from desired state of world - uniqueVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec( - attachableVolumePlugin, volumeSpec) - if err != nil { - glog.V(10).Infof( - "Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeNameFromSpec failed with %v", - podVolume.Name, - pod.Namespace, - pod.Name, - err) - continue - } - adc.desiredStateOfWorld.DeletePod( - uniquePodName, uniqueVolumeName, nodeName) - } - } - - return -} - -// createVolumeSpec creates and returns a mutatable volume.Spec object for the -// specified volume. It dereference any PVC to get PV objects, if needed. -func (adc *attachDetachController) createVolumeSpec( - podVolume v1.Volume, podNamespace string) (*volume.Spec, error) { - if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil { - glog.V(10).Infof( - "Found PVC, ClaimName: %q/%q", - podNamespace, - pvcSource.ClaimName) - - // If podVolume is a PVC, fetch the real PV behind the claim - pvName, pvcUID, err := adc.getPVCFromCacheExtractPV( - podNamespace, pvcSource.ClaimName) - if err != nil { - return nil, fmt.Errorf( - "error processing PVC %q/%q: %v", - podNamespace, - pvcSource.ClaimName, - err) - } - - glog.V(10).Infof( - "Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q", - podNamespace, - pvcSource.ClaimName, - pvcUID, - pvName) - - // Fetch actual PV object - volumeSpec, err := adc.getPVSpecFromCache( - pvName, pvcSource.ReadOnly, pvcUID) - if err != nil { - return nil, fmt.Errorf( - "error processing PVC %q/%q: %v", - podNamespace, - pvcSource.ClaimName, - err) - } - - glog.V(10).Infof( - "Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)", - volumeSpec.Name, - pvName, - podNamespace, - pvcSource.ClaimName, - pvcUID) - - return volumeSpec, nil - } - - // Do not return the original volume object, since it's from the shared - // informer it may be mutated by another consumer. - clonedPodVolumeObj, err := api.Scheme.DeepCopy(&podVolume) - if err != nil || clonedPodVolumeObj == nil { - return nil, fmt.Errorf( - "failed to deep copy %q volume object. err=%v", podVolume.Name, err) - } - - clonedPodVolume, ok := clonedPodVolumeObj.(*v1.Volume) - if !ok { - return nil, fmt.Errorf("failed to cast clonedPodVolume %#v to v1.Volume", clonedPodVolumeObj) - } - - return volume.NewSpecFromVolume(clonedPodVolume), nil -} - -// getPVCFromCacheExtractPV fetches the PVC object with the given namespace and -// name from the shared internal PVC store extracts the name of the PV it is -// pointing to and returns it. -// This method returns an error if a PVC object does not exist in the cache -// with the given namespace/name. -// This method returns an error if the PVC object's phase is not "Bound". -func (adc *attachDetachController) getPVCFromCacheExtractPV(namespace string, name string) (string, types.UID, error) { - pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name) - if err != nil { - return "", "", fmt.Errorf("failed to find PVC %s/%s in PVCInformer cache: %v", namespace, name, err) - } - - if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" { - return "", "", fmt.Errorf( - "PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)", - namespace, - name, - pvc.Status.Phase, - pvc.Spec.VolumeName) - } - - return pvc.Spec.VolumeName, pvc.UID, nil -} - -// getPVSpecFromCache fetches the PV object with the given name from the shared -// internal PV store and returns a volume.Spec representing it. -// This method returns an error if a PV object does not exist in the cache with -// the given name. -// This method deep copies the PV object so the caller may use the returned -// volume.Spec object without worrying about it mutating unexpectedly. -func (adc *attachDetachController) getPVSpecFromCache(name string, pvcReadOnly bool, expectedClaimUID types.UID) (*volume.Spec, error) { - pv, err := adc.pvLister.Get(name) - if err != nil { - return nil, fmt.Errorf("failed to find PV %q in PVInformer cache: %v", name, err) - } - - if pv.Spec.ClaimRef == nil { - return nil, fmt.Errorf( - "found PV object %q but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim", - name) - } - - if pv.Spec.ClaimRef.UID != expectedClaimUID { - return nil, fmt.Errorf( - "found PV object %q but its pv.Spec.ClaimRef.UID (%q) does not point to claim.UID (%q)", - name, - pv.Spec.ClaimRef.UID, - expectedClaimUID) - } - - // Do not return the object from the informer, since the store is shared it - // may be mutated by another consumer. - clonedPVObj, err := api.Scheme.DeepCopy(pv) - if err != nil || clonedPVObj == nil { - return nil, fmt.Errorf("failed to deep copy %q PV object. err=%v", name, err) - } - - clonedPV, ok := clonedPVObj.(*v1.PersistentVolume) - if !ok { - return nil, fmt.Errorf("failed to cast %q clonedPV %#v to PersistentVolume", name, pv) - } - - return volume.NewSpecFromPersistentVolume(clonedPV, pvcReadOnly), nil -} - // processVolumesInUse processes the list of volumes marked as "in-use" // according to the specified Node's Status.VolumesInUse and updates the // corresponding volume in the actual state of the world to indicate that it is diff --git a/pkg/controller/volume/attachdetach/populator/BUILD b/pkg/controller/volume/attachdetach/populator/BUILD index 319bbe113b8..3b956d84b6c 100644 --- a/pkg/controller/volume/attachdetach/populator/BUILD +++ b/pkg/controller/volume/attachdetach/populator/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -14,9 +15,12 @@ go_library( deps = [ "//pkg/client/listers/core/v1:go_default_library", "//pkg/controller/volume/attachdetach/cache:go_default_library", + "//pkg/controller/volume/attachdetach/util:go_default_library", + "//pkg/volume:go_default_library", "//pkg/volume/util/volumehelper:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", @@ -35,3 +39,21 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["desired_state_of_world_populator_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/volume/attachdetach/cache:go_default_library", + "//pkg/volume/testing:go_default_library", + "//pkg/volume/util/volumehelper:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + ], +) diff --git a/pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator.go b/pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator.go index 91bbaeae57c..25e95741d6c 100644 --- a/pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator.go +++ b/pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator.go @@ -25,17 +25,20 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" kcache "k8s.io/client-go/tools/cache" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" + "k8s.io/kubernetes/pkg/controller/volume/attachdetach/util" + "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) // DesiredStateOfWorldPopulator periodically verifies that the pods in the -// desired state of th world still exist, if not, it removes them. -// TODO: it also loops through the list of active pods and ensures that +// desired state of the world still exist, if not, it removes them. +// It also loops through the list of active pods and ensures that // each one exists in the desired state of the world cache // if it has volumes. type DesiredStateOfWorldPopulator interface { @@ -50,19 +53,32 @@ type DesiredStateOfWorldPopulator interface { // desiredStateOfWorld - the cache to populate func NewDesiredStateOfWorldPopulator( loopSleepDuration time.Duration, + listPodsRetryDuration time.Duration, podLister corelisters.PodLister, - desiredStateOfWorld cache.DesiredStateOfWorld) DesiredStateOfWorldPopulator { + desiredStateOfWorld cache.DesiredStateOfWorld, + volumePluginMgr *volume.VolumePluginMgr, + pvcLister corelisters.PersistentVolumeClaimLister, + pvLister corelisters.PersistentVolumeLister) DesiredStateOfWorldPopulator { return &desiredStateOfWorldPopulator{ - loopSleepDuration: loopSleepDuration, - podLister: podLister, - desiredStateOfWorld: desiredStateOfWorld, + loopSleepDuration: loopSleepDuration, + listPodsRetryDuration: listPodsRetryDuration, + podLister: podLister, + desiredStateOfWorld: desiredStateOfWorld, + volumePluginMgr: volumePluginMgr, + pvcLister: pvcLister, + pvLister: pvLister, } } type desiredStateOfWorldPopulator struct { - loopSleepDuration time.Duration - podLister corelisters.PodLister - desiredStateOfWorld cache.DesiredStateOfWorld + loopSleepDuration time.Duration + podLister corelisters.PodLister + desiredStateOfWorld cache.DesiredStateOfWorld + volumePluginMgr *volume.VolumePluginMgr + pvcLister corelisters.PersistentVolumeClaimLister + pvLister corelisters.PersistentVolumeLister + listPodsRetryDuration time.Duration + timeOfLastListPods time.Time } func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) { @@ -72,6 +88,18 @@ func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) { func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() { return func() { dswp.findAndRemoveDeletedPods() + + // findAndAddActivePods is called periodically, independently of the main + // populator loop. + if time.Since(dswp.timeOfLastListPods) < dswp.listPodsRetryDuration { + glog.V(5).Infof( + "Skipping findAndAddActivePods(). Not permitted until %v (listPodsRetryDuration %v).", + dswp.timeOfLastListPods.Add(dswp.listPodsRetryDuration), + dswp.listPodsRetryDuration) + + return + } + dswp.findAndAddActivePods() } } @@ -113,3 +141,23 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName) } } + +func (dswp *desiredStateOfWorldPopulator) findAndAddActivePods() { + pods, err := dswp.podLister.List(labels.Everything()) + if err != nil { + glog.Errorf("podLister List failed: %v", err) + return + } + dswp.timeOfLastListPods = time.Now() + + for _, pod := range pods { + if volumehelper.IsPodTerminated(pod, pod.Status) { + // Do not add volumes for terminated pods + continue + } + util.ProcessPodVolumes(pod, true, + dswp.desiredStateOfWorld, dswp.volumePluginMgr, dswp.pvcLister, dswp.pvLister) + + } + +} diff --git a/pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator_test.go b/pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator_test.go new file mode 100644 index 00000000000..d57d336055c --- /dev/null +++ b/pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator_test.go @@ -0,0 +1,137 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package populator + +import ( + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" + volumetesting "k8s.io/kubernetes/pkg/volume/testing" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" +) + +func TestFindAndAddActivePods_FindAndRemoveDeletedPods(t *testing.T) { + fakeVolumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) + fakeClient := &fake.Clientset{} + + fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc()) + fakePodInformer := fakeInformerFactory.Core().V1().Pods() + + fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr) + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dswp-test-pod", + UID: "dswp-test-pod-uid", + Namespace: "dswp-test", + }, + Spec: v1.PodSpec{ + NodeName: "dswp-test-host", + Volumes: []v1.Volume{ + { + Name: "dswp-test-volume-name", + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: "dswp-test-fake-device", + }, + }, + }, + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodPhase("Running"), + }, + } + + fakePodInformer.Informer().GetStore().Add(pod) + + podName := volumehelper.GetUniquePodName(pod) + + generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name + + pvcLister := fakeInformerFactory.Core().V1().PersistentVolumeClaims().Lister() + pvLister := fakeInformerFactory.Core().V1().PersistentVolumes().Lister() + + dswp := &desiredStateOfWorldPopulator{ + loopSleepDuration: 100 * time.Millisecond, + listPodsRetryDuration: 3 * time.Second, + desiredStateOfWorld: fakesDSW, + volumePluginMgr: fakeVolumePluginMgr, + podLister: fakePodInformer.Lister(), + pvcLister: pvcLister, + pvLister: pvLister, + } + + //add the given node to the list of nodes managed by dsw + dswp.desiredStateOfWorld.AddNode(k8stypes.NodeName(pod.Spec.NodeName)) + + dswp.findAndAddActivePods() + + expectedVolumeName := v1.UniqueVolumeName(generatedVolumeName) + + //check if the given volume referenced by the pod is added to dsw + volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName)) + if !volumeExists { + t.Fatalf( + "VolumeExists(%q) failed. Expected: Actual: <%v>", + expectedVolumeName, + volumeExists) + } + + //delete the pod and volume manually + dswp.desiredStateOfWorld.DeletePod(podName, expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName)) + + //check if the given volume referenced by the pod still exists in dsw + volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName)) + if volumeExists { + t.Fatalf( + "VolumeExists(%q) failed. Expected: Actual: <%v>", + expectedVolumeName, + volumeExists) + } + + //add pod and volume again + dswp.findAndAddActivePods() + + //check if the given volume referenced by the pod is added to dsw for the second time + volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName)) + if !volumeExists { + t.Fatalf( + "VolumeExists(%q) failed. Expected: Actual: <%v>", + expectedVolumeName, + volumeExists) + } + + fakePodInformer.Informer().GetStore().Delete(pod) + dswp.findAndRemoveDeletedPods() + //check if the given volume referenced by the pod still exists in dsw + volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName)) + if volumeExists { + t.Fatalf( + "VolumeExists(%q) failed. Expected: Actual: <%v>", + expectedVolumeName, + volumeExists) + } + +} diff --git a/pkg/controller/volume/attachdetach/util/BUILD b/pkg/controller/volume/attachdetach/util/BUILD new file mode 100644 index 00000000000..39c39359bac --- /dev/null +++ b/pkg/controller/volume/attachdetach/util/BUILD @@ -0,0 +1,37 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["util.go"], + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/api/v1:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", + "//pkg/controller/volume/attachdetach/cache:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/util/volumehelper:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/controller/volume/attachdetach/util/util.go b/pkg/controller/volume/attachdetach/util/util.go new file mode 100644 index 00000000000..ba898ac292a --- /dev/null +++ b/pkg/controller/volume/attachdetach/util/util.go @@ -0,0 +1,252 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" + "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" +) + +// CreateVolumeSpec creates and returns a mutatable volume.Spec object for the +// specified volume. It dereference any PVC to get PV objects, if needed. +func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister) (*volume.Spec, error) { + if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil { + glog.V(10).Infof( + "Found PVC, ClaimName: %q/%q", + podNamespace, + pvcSource.ClaimName) + + // If podVolume is a PVC, fetch the real PV behind the claim + pvName, pvcUID, err := getPVCFromCacheExtractPV( + podNamespace, pvcSource.ClaimName, pvcLister) + if err != nil { + return nil, fmt.Errorf( + "error processing PVC %q/%q: %v", + podNamespace, + pvcSource.ClaimName, + err) + } + + glog.V(10).Infof( + "Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q", + podNamespace, + pvcSource.ClaimName, + pvcUID, + pvName) + + // Fetch actual PV object + volumeSpec, err := getPVSpecFromCache( + pvName, pvcSource.ReadOnly, pvcUID, pvLister) + if err != nil { + return nil, fmt.Errorf( + "error processing PVC %q/%q: %v", + podNamespace, + pvcSource.ClaimName, + err) + } + + glog.V(10).Infof( + "Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)", + volumeSpec.Name, + pvName, + podNamespace, + pvcSource.ClaimName, + pvcUID) + + return volumeSpec, nil + } + + // Do not return the original volume object, since it's from the shared + // informer it may be mutated by another consumer. + clonedPodVolumeObj, err := api.Scheme.DeepCopy(&podVolume) + if err != nil || clonedPodVolumeObj == nil { + return nil, fmt.Errorf( + "failed to deep copy %q volume object. err=%v", podVolume.Name, err) + } + + clonedPodVolume, ok := clonedPodVolumeObj.(*v1.Volume) + if !ok { + return nil, fmt.Errorf("failed to cast clonedPodVolume %#v to v1.Volume", clonedPodVolumeObj) + } + + return volume.NewSpecFromVolume(clonedPodVolume), nil +} + +// getPVCFromCacheExtractPV fetches the PVC object with the given namespace and +// name from the shared internal PVC store extracts the name of the PV it is +// pointing to and returns it. +// This method returns an error if a PVC object does not exist in the cache +// with the given namespace/name. +// This method returns an error if the PVC object's phase is not "Bound". +func getPVCFromCacheExtractPV(namespace string, name string, pvcLister corelisters.PersistentVolumeClaimLister) (string, types.UID, error) { + pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(name) + if err != nil { + return "", "", fmt.Errorf("failed to find PVC %s/%s in PVCInformer cache: %v", namespace, name, err) + } + + if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" { + return "", "", fmt.Errorf( + "PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)", + namespace, + name, + pvc.Status.Phase, + pvc.Spec.VolumeName) + } + + return pvc.Spec.VolumeName, pvc.UID, nil +} + +// getPVSpecFromCache fetches the PV object with the given name from the shared +// internal PV store and returns a volume.Spec representing it. +// This method returns an error if a PV object does not exist in the cache with +// the given name. +// This method deep copies the PV object so the caller may use the returned +// volume.Spec object without worrying about it mutating unexpectedly. +func getPVSpecFromCache(name string, pvcReadOnly bool, expectedClaimUID types.UID, pvLister corelisters.PersistentVolumeLister) (*volume.Spec, error) { + pv, err := pvLister.Get(name) + if err != nil { + return nil, fmt.Errorf("failed to find PV %q in PVInformer cache: %v", name, err) + } + + if pv.Spec.ClaimRef == nil { + return nil, fmt.Errorf( + "found PV object %q but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim", + name) + } + + if pv.Spec.ClaimRef.UID != expectedClaimUID { + return nil, fmt.Errorf( + "found PV object %q but its pv.Spec.ClaimRef.UID (%q) does not point to claim.UID (%q)", + name, + pv.Spec.ClaimRef.UID, + expectedClaimUID) + } + + // Do not return the object from the informer, since the store is shared it + // may be mutated by another consumer. + clonedPVObj, err := api.Scheme.DeepCopy(pv) + if err != nil || clonedPVObj == nil { + return nil, fmt.Errorf("failed to deep copy %q PV object. err=%v", name, err) + } + + clonedPV, ok := clonedPVObj.(*v1.PersistentVolume) + if !ok { + return nil, fmt.Errorf("failed to cast %q clonedPV %#v to PersistentVolume", name, pv) + } + + return volume.NewSpecFromPersistentVolume(clonedPV, pvcReadOnly), nil +} + +// ProcessPodVolumes processes the volumes in the given pod and adds them to the +// desired state of the world if addVolumes is true, otherwise it removes them. +func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.DesiredStateOfWorld, volumePluginMgr *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister) { + if pod == nil { + return + } + + if len(pod.Spec.Volumes) <= 0 { + glog.V(10).Infof("Skipping processing of pod %q/%q: it has no volumes.", + pod.Namespace, + pod.Name) + return + } + + nodeName := types.NodeName(pod.Spec.NodeName) + if nodeName == "" { + glog.V(10).Infof( + "Skipping processing of pod %q/%q: it is not scheduled to a node.", + pod.Namespace, + pod.Name) + return + } else if !desiredStateOfWorld.NodeExists(nodeName) { + // If the node the pod is scheduled to does not exist in the desired + // state of the world data structure, that indicates the node is not + // yet managed by the controller. Therefore, ignore the pod. + glog.V(10).Infof( + "Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.", + pod.Namespace, + pod.Name, + nodeName) + return + } + + // Process volume spec for each volume defined in pod + for _, podVolume := range pod.Spec.Volumes { + volumeSpec, err := CreateVolumeSpec(podVolume, pod.Namespace, pvcLister, pvLister) + if err != nil { + glog.V(10).Infof( + "Error processing volume %q for pod %q/%q: %v", + podVolume.Name, + pod.Namespace, + pod.Name, + err) + continue + } + + attachableVolumePlugin, err := + volumePluginMgr.FindAttachablePluginBySpec(volumeSpec) + if err != nil || attachableVolumePlugin == nil { + glog.V(10).Infof( + "Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v", + podVolume.Name, + pod.Namespace, + pod.Name, + err) + continue + } + + uniquePodName := volumehelper.GetUniquePodName(pod) + if addVolumes { + // Add volume to desired state of world + _, err := desiredStateOfWorld.AddPod( + uniquePodName, pod, volumeSpec, nodeName) + if err != nil { + glog.V(10).Infof( + "Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v", + podVolume.Name, + pod.Namespace, + pod.Name, + err) + } + + } else { + // Remove volume from desired state of world + uniqueVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec( + attachableVolumePlugin, volumeSpec) + if err != nil { + glog.V(10).Infof( + "Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeNameFromSpec failed with %v", + podVolume.Name, + pod.Namespace, + pod.Name, + err) + continue + } + desiredStateOfWorld.DeletePod( + uniquePodName, uniqueVolumeName, nodeName) + } + } + return +} diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index 5854fbbe5cf..613cc9d2d97 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -143,18 +143,7 @@ func (dswp *desiredStateOfWorldPopulator) isPodTerminated(pod *v1.Pod) bool { if !found { podStatus = pod.Status } - return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses)) -} - -// notRunning returns true if every status is terminated or waiting, or the status list -// is empty. -func notRunning(statuses []v1.ContainerStatus) bool { - for _, status := range statuses { - if status.State.Terminated == nil && status.State.Waiting == nil { - return false - } - } - return true + return volumehelper.IsPodTerminated(pod, podStatus) } // Iterate through all pods and add to desired state of world if they don't diff --git a/pkg/volume/util/volumehelper/volumehelper.go b/pkg/volume/util/volumehelper/volumehelper.go index 7c51b7ed538..c55c8db60ec 100644 --- a/pkg/volume/util/volumehelper/volumehelper.go +++ b/pkg/volume/util/volumehelper/volumehelper.go @@ -87,3 +87,19 @@ func GetUniqueVolumeNameFromSpec( volumeName), nil } + +// IsPodTerminated checks if pod is terminated +func IsPodTerminated(pod *v1.Pod, podStatus v1.PodStatus) bool { + return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses)) +} + +// notRunning returns true if every status is terminated or waiting, or the status list +// is empty. +func notRunning(statuses []v1.ContainerStatus) bool { + for _, status := range statuses { + if status.State.Terminated == nil && status.State.Waiting == nil { + return false + } + } + return true +} diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go index f24dcdd72e2..e0253dbd61f 100644 --- a/test/integration/volume/attach_detach_test.go +++ b/test/integration/volume/attach_detach_test.go @@ -222,3 +222,89 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy } return testClient, ctrl, informers } + +// Via integration test we can verify that if pod add +// event is somehow missed by AttachDetach controller - it still +// gets added by Desired State of World populator. +func TestPodAddedByDswp(t *testing.T) { + _, server := framework.RunAMaster(nil) + defer server.Close() + namespaceName := "test-pod-deletion" + + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-sandbox", + Annotations: map[string]string{ + volumehelper.ControllerManagedAttachAnnotation: "true", + }, + }, + } + + ns := framework.CreateTestingNamespace(namespaceName, server, t) + defer framework.DeleteTestingNamespace(ns, server, t) + + testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod) + + pod := fakePodWithVol(namespaceName) + podStopCh := make(chan struct{}) + + if _, err := testClient.Core().Nodes().Create(node); err != nil { + t.Fatalf("Failed to created node : %v", err) + } + + go informers.Core().V1().Nodes().Informer().Run(podStopCh) + + if _, err := testClient.Core().Pods(ns.Name).Create(pod); err != nil { + t.Errorf("Failed to create pod : %v", err) + } + + podInformer := informers.Core().V1().Pods().Informer() + go podInformer.Run(podStopCh) + + // start controller loop + stopCh := make(chan struct{}) + go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) + go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) + go ctrl.Run(stopCh) + + waitToObservePods(t, podInformer, 1) + podKey, err := cache.MetaNamespaceKeyFunc(pod) + if err != nil { + t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err) + } + + _, _, err = podInformer.GetStore().GetByKey(podKey) + + if err != nil { + t.Fatalf("Pod not found in Pod Informer cache : %v", err) + } + + waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld()) + + // let's stop pod events from getting triggered + close(podStopCh) + podObj, err := api.Scheme.DeepCopy(pod) + if err != nil { + t.Fatalf("Error copying pod : %v", err) + } + podNew, ok := podObj.(*v1.Pod) + if !ok { + t.Fatalf("Error converting pod : %v", err) + } + newPodName := "newFakepod" + podNew.SetName(newPodName) + err = podInformer.GetStore().Add(podNew) + if err != nil { + t.Fatalf("Error adding pod : %v", err) + } + + waitToObservePods(t, podInformer, 2) + // the findAndAddActivePods loop turns every 3 minute + time.Sleep(200 * time.Second) + podsToAdd := ctrl.GetDesiredStateOfWorld().GetPodToAdd() + if len(podsToAdd) != 2 { + t.Fatalf("DSW should have two pods") + } + + close(stopCh) +}