diff --git a/pkg/controller/volume/attach_detach_controller.go b/pkg/controller/volume/attach_detach_controller.go index 7eb32880164..63f615793e9 100644 --- a/pkg/controller/volume/attach_detach_controller.go +++ b/pkg/controller/volume/attach_detach_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/volume/cache" + "k8s.io/kubernetes/pkg/controller/volume/populator" "k8s.io/kubernetes/pkg/controller/volume/reconciler" "k8s.io/kubernetes/pkg/controller/volume/statusupdater" "k8s.io/kubernetes/pkg/types" @@ -50,6 +51,10 @@ const ( // from its node. Once this time has expired, the controller will assume the // node or kubelet are unresponsive and will detach the volume anyway. reconcilerMaxWaitForUnmountDuration time.Duration = 3 * time.Minute + + // desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the + // DesiredStateOfWorldPopulator loop waits between successive executions + desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 5 * time.Minute ) // AttachDetachController defines the operations supported by this controller. @@ -119,6 +124,11 @@ func NewAttachDetachController( adc.attacherDetacher, adc.nodeStatusUpdater) + adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator( + desiredStateOfWorldPopulatorLoopSleepPeriod, + podInformer, + adc.desiredStateOfWorld) + return adc, nil } @@ -170,6 +180,10 @@ type attachDetachController struct { // nodeStatusUpdater is used to update node status with the list of attached // volumes nodeStatusUpdater statusupdater.NodeStatusUpdater + + // desiredStateOfWorldPopulator runs an asynchronous periodic loop to + // populate the current pods using podInformer. + desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator } func (adc *attachDetachController) Run(stopCh <-chan struct{}) { @@ -177,6 +191,7 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) { glog.Infof("Starting Attach Detach Controller") go adc.reconciler.Run(stopCh) + go adc.desiredStateOfWorldPopulator.Run(stopCh) <-stopCh glog.Infof("Shutting down Attach Detach Controller") @@ -300,7 +315,7 @@ func (adc *attachDetachController) processPodVolumes( if addVolumes { // Add volume to desired state of world _, err := adc.desiredStateOfWorld.AddPod( - uniquePodName, volumeSpec, pod.Spec.NodeName) + uniquePodName, pod, volumeSpec, pod.Spec.NodeName) if err != nil { glog.V(10).Infof( "Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v", diff --git a/pkg/controller/volume/cache/desired_state_of_world.go b/pkg/controller/volume/cache/desired_state_of_world.go index 0f17a1bf7f4..2543ecfed38 100644 --- a/pkg/controller/volume/cache/desired_state_of_world.go +++ b/pkg/controller/volume/cache/desired_state_of_world.go @@ -57,7 +57,7 @@ type DesiredStateOfWorld interface { // should be attached to the specified node, the volume is implicitly added. // If no node with the name nodeName exists in list of nodes managed by the // attach/detach attached controller, an error is returned. - AddPod(podName types.UniquePodName, volumeSpec *volume.Spec, nodeName string) (api.UniqueVolumeName, error) + AddPod(podName types.UniquePodName, pod *api.Pod, volumeSpec *volume.Spec, nodeName string) (api.UniqueVolumeName, error) // DeleteNode removes the given node from the list of nodes managed by the // attach/detach controller. @@ -90,6 +90,10 @@ type DesiredStateOfWorld interface { // and the nodes they should be attached to based on the current desired // state of the world. GetVolumesToAttach() []VolumeToAttach + + // GetPodToAdd generates and returns a map of pods based on the current desired + // state of world + GetPodToAdd() map[types.UniquePodName]PodToAdd } // VolumeToAttach represents a volume that should be attached to a node. @@ -97,6 +101,19 @@ type VolumeToAttach struct { operationexecutor.VolumeToAttach } +// PodToAdd represents a pod that references the underlying volume and is +// scheduled to the underlying node. +type PodToAdd struct { + // pod contains the api object of pod + Pod *api.Pod + + // volumeName contains the unique identifier for this volume. + VolumeName api.UniqueVolumeName + + // nodeName contains the name of this node. + NodeName string +} + // NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld. func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorld { return &desiredStateOfWorld{ @@ -119,7 +136,7 @@ type desiredStateOfWorld struct { // nodeManaged represents a node that is being managed by the attach/detach // controller. type nodeManaged struct { - // nodName contains the name of this node. + // nodeName contains the name of this node. nodeName string // volumesToAttach is a map containing the set of volumes that should be @@ -145,11 +162,14 @@ type volumeToAttach struct { scheduledPods map[types.UniquePodName]pod } -// The pod object represents a pod that references the underlying volume and is +// The pod represents a pod that references the underlying volume and is // scheduled to the underlying node. type pod struct { - // podName contains the name of this pod. + // podName contains the unique identifier for this pod podName types.UniquePodName + + // pod object contains the api object of pod + podObj *api.Pod } func (dsw *desiredStateOfWorld) AddNode(nodeName string) { @@ -166,6 +186,7 @@ func (dsw *desiredStateOfWorld) AddNode(nodeName string) { func (dsw *desiredStateOfWorld) AddPod( podName types.UniquePodName, + podToAdd *api.Pod, volumeSpec *volume.Spec, nodeName string) (api.UniqueVolumeName, error) { dsw.Lock() @@ -204,11 +225,11 @@ func (dsw *desiredStateOfWorld) AddPod( } dsw.nodesManaged[nodeName].volumesToAttach[volumeName] = volumeObj } - if _, podExists := volumeObj.scheduledPods[podName]; !podExists { dsw.nodesManaged[nodeName].volumesToAttach[volumeName].scheduledPods[podName] = pod{ podName: podName, + podObj: podToAdd, } } @@ -309,3 +330,22 @@ func (dsw *desiredStateOfWorld) GetVolumesToAttach() []VolumeToAttach { return volumesToAttach } + +func (dsw *desiredStateOfWorld) GetPodToAdd() map[types.UniquePodName]PodToAdd { + dsw.RLock() + defer dsw.RUnlock() + + pods := make(map[types.UniquePodName]PodToAdd) + for nodeName, nodeObj := range dsw.nodesManaged { + for volumeName, volumeObj := range nodeObj.volumesToAttach { + for podUID, pod := range volumeObj.scheduledPods { + pods[podUID] = PodToAdd{ + Pod: pod.podObj, + VolumeName: volumeName, + NodeName: nodeName, + } + } + } + } + return pods +} diff --git a/pkg/controller/volume/cache/desired_state_of_world_test.go b/pkg/controller/volume/cache/desired_state_of_world_test.go index f18e0745773..039d78c629b 100644 --- a/pkg/controller/volume/cache/desired_state_of_world_test.go +++ b/pkg/controller/volume/cache/desired_state_of_world_test.go @@ -87,9 +87,9 @@ func Test_AddNode_Positive_ExistingNode(t *testing.T) { // Verifies node/volume exists, and 1 volumes to attach. func Test_AddPod_Positive_NewPodNodeExistsVolumeDoesntExist(t *testing.T) { // Arrange + podName := "pod-uid" volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) - podName := types.UniquePodName("pod-uid") volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := "node-name" @@ -103,7 +103,7 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeDoesntExist(t *testing.T) { } // Act - generatedVolumeName, podErr := dsw.AddPod(podName, volumeSpec, nodeName) + generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) // Assert if podErr != nil { @@ -136,8 +136,8 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeExists(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) - pod1Name := types.UniquePodName("pod1-uid") - pod2Name := types.UniquePodName("pod2-uid") + pod1Name := "pod1-uid" + pod2Name := "pod2-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := "node-name" @@ -151,7 +151,7 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeExists(t *testing.T) { } // Act - generatedVolumeName, podErr := dsw.AddPod(pod1Name, volumeSpec, nodeName) + generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volumeSpec, nodeName) // Assert if podErr != nil { @@ -171,7 +171,7 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeExists(t *testing.T) { } // Act - generatedVolumeName, podErr = dsw.AddPod(pod2Name, volumeSpec, nodeName) + generatedVolumeName, podErr = dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volumeSpec, nodeName) // Assert if podErr != nil { @@ -194,6 +194,10 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeExists(t *testing.T) { t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach)) } + podScheduled := dsw.GetPodToAdd() + if len(podScheduled) != 2 { + t.Fatalf("len(podScheduled) Expected: <2> Actual: <%v>", len(podScheduled)) + } verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, string(volumeName)) } @@ -206,7 +210,7 @@ func Test_AddPod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := "node-name" @@ -220,7 +224,7 @@ func Test_AddPod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) { } // Act - generatedVolumeName, podErr := dsw.AddPod(podName, volumeSpec, nodeName) + generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) // Assert if podErr != nil { @@ -240,7 +244,7 @@ func Test_AddPod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) { } // Act - generatedVolumeName, podErr = dsw.AddPod(podName, volumeSpec, nodeName) + generatedVolumeName, podErr = dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) // Assert if podErr != nil { @@ -272,7 +276,7 @@ func Test_AddPod_Negative_NewPodNodeDoesntExistVolumeDoesntExist(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := "node-name" @@ -285,7 +289,7 @@ func Test_AddPod_Negative_NewPodNodeDoesntExistVolumeDoesntExist(t *testing.T) { } // Act - _, podErr := dsw.AddPod(podName, volumeSpec, nodeName) + _, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) // Assert if podErr == nil { @@ -371,10 +375,10 @@ func Test_DeleteNode_Negative_NodeExistsHasChildVolumes(t *testing.T) { dsw := NewDesiredStateOfWorld(volumePluginMgr) nodeName := "node-name" dsw.AddNode(nodeName) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) - generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, nodeName) + generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -410,12 +414,12 @@ func Test_DeletePod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := "node-name" dsw.AddNode(nodeName) - generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, nodeName) + generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -432,7 +436,7 @@ func Test_DeletePod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) { } // Act - dsw.DeletePod(podName, generatedVolumeName, nodeName) + dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName) // Assert volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName) @@ -457,20 +461,20 @@ func Test_DeletePod_Positive_2PodsExistNodeExistsVolumesExist(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) - pod1Name := types.UniquePodName("pod1-uid") - pod2Name := types.UniquePodName("pod2-uid") + pod1Name := "pod1-uid" + pod2Name := "pod2-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := "node-name" dsw.AddNode(nodeName) - generatedVolumeName1, pod1AddErr := dsw.AddPod(pod1Name, volumeSpec, nodeName) + generatedVolumeName1, pod1AddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volumeSpec, nodeName) if pod1AddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", pod1Name, pod1AddErr) } - generatedVolumeName2, pod2AddErr := dsw.AddPod(pod2Name, volumeSpec, nodeName) + generatedVolumeName2, pod2AddErr := dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volumeSpec, nodeName) if pod2AddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -492,7 +496,7 @@ func Test_DeletePod_Positive_2PodsExistNodeExistsVolumesExist(t *testing.T) { } // Act - dsw.DeletePod(pod1Name, generatedVolumeName1, nodeName) + dsw.DeletePod(types.UniquePodName(pod1Name), generatedVolumeName1, nodeName) // Assert volumeExists = dsw.VolumeExists(generatedVolumeName1, nodeName) @@ -518,13 +522,13 @@ func Test_DeletePod_Positive_PodDoesNotExist(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) - pod1Name := types.UniquePodName("pod1-uid") - pod2Name := types.UniquePodName("pod2-uid") + pod1Name := "pod1-uid" + pod2Name := "pod2-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := "node-name" dsw.AddNode(nodeName) - generatedVolumeName, pod1AddErr := dsw.AddPod(pod1Name, volumeSpec, nodeName) + generatedVolumeName, pod1AddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volumeSpec, nodeName) if pod1AddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -541,7 +545,7 @@ func Test_DeletePod_Positive_PodDoesNotExist(t *testing.T) { } // Act - dsw.DeletePod(pod2Name, generatedVolumeName, nodeName) + dsw.DeletePod(types.UniquePodName(pod2Name), generatedVolumeName, nodeName) // Assert volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName) @@ -567,12 +571,12 @@ func Test_DeletePod_Positive_NodeDoesNotExist(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) node1Name := "node1-name" dsw.AddNode(node1Name) - generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, node1Name) + generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, node1Name) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -590,7 +594,7 @@ func Test_DeletePod_Positive_NodeDoesNotExist(t *testing.T) { node2Name := "node2-name" // Act - dsw.DeletePod(podName, generatedVolumeName, node2Name) + dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, node2Name) // Assert volumeExists = dsw.VolumeExists(generatedVolumeName, node1Name) @@ -622,12 +626,12 @@ func Test_DeletePod_Positive_VolumeDoesNotExist(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volume1Name := api.UniqueVolumeName("volume1-name") volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name) nodeName := "node-name" dsw.AddNode(nodeName) - generatedVolume1Name, podAddErr := dsw.AddPod(podName, volume1Spec, nodeName) + generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volume1Spec, nodeName) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -645,7 +649,7 @@ func Test_DeletePod_Positive_VolumeDoesNotExist(t *testing.T) { volume2Name := api.UniqueVolumeName("volume2-name") // Act - dsw.DeletePod(podName, volume2Name, nodeName) + dsw.DeletePod(types.UniquePodName(podName), volume2Name, nodeName) // Assert volumeExists = dsw.VolumeExists(generatedVolume1Name, nodeName) @@ -725,10 +729,10 @@ func Test_VolumeExists_Positive_VolumeExistsNodeExists(t *testing.T) { dsw := NewDesiredStateOfWorld(volumePluginMgr) nodeName := "node-name" dsw.AddNode(nodeName) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) - generatedVolumeName, _ := dsw.AddPod(podName, volumeSpec, nodeName) + generatedVolumeName, _ := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) // Act volumeExists := dsw.VolumeExists(generatedVolumeName, nodeName) @@ -755,10 +759,10 @@ func Test_VolumeExists_Positive_VolumeDoesntExistNodeExists(t *testing.T) { dsw := NewDesiredStateOfWorld(volumePluginMgr) nodeName := "node-name" dsw.AddNode(nodeName) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volume1Name := api.UniqueVolumeName("volume1-name") volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name) - generatedVolume1Name, podAddErr := dsw.AddPod(podName, volume1Spec, nodeName) + generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volume1Spec, nodeName) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -851,11 +855,11 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEach(t *testing.T) { volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) node1Name := "node1-name" - pod1Name := types.UniquePodName("pod1-uid") + pod1Name := "pod1-uid" volume1Name := api.UniqueVolumeName("volume1-name") volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name) dsw.AddNode(node1Name) - generatedVolume1Name, podAddErr := dsw.AddPod(pod1Name, volume1Spec, node1Name) + generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -863,11 +867,11 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEach(t *testing.T) { podAddErr) } node2Name := "node2-name" - pod2Name := types.UniquePodName("pod2-uid") + pod2Name := "pod2-uid" volume2Name := api.UniqueVolumeName("volume2-name") volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name) dsw.AddNode(node2Name) - generatedVolume2Name, podAddErr := dsw.AddPod(pod2Name, volume2Spec, node2Name) + generatedVolume2Name, podAddErr := dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volume2Spec, node2Name) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -896,11 +900,11 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEachExtraPod(t *testing.T volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) node1Name := "node1-name" - pod1Name := types.UniquePodName("pod1-uid") + pod1Name := "pod1-uid" volume1Name := api.UniqueVolumeName("volume1-name") volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name) dsw.AddNode(node1Name) - generatedVolume1Name, podAddErr := dsw.AddPod(pod1Name, volume1Spec, node1Name) + generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -908,20 +912,20 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEachExtraPod(t *testing.T podAddErr) } node2Name := "node2-name" - pod2Name := types.UniquePodName("pod2-uid") + pod2Name := "pod2-uid" volume2Name := api.UniqueVolumeName("volume2-name") volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name) dsw.AddNode(node2Name) - generatedVolume2Name, podAddErr := dsw.AddPod(pod2Name, volume2Spec, node2Name) + generatedVolume2Name, podAddErr := dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volume2Spec, node2Name) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", pod2Name, podAddErr) } - pod3Name := types.UniquePodName("pod3-uid") - dsw.AddPod(pod3Name, volume2Spec, node2Name) - _, podAddErr = dsw.AddPod(pod3Name, volume2Spec, node2Name) + pod3Name := "pod3-uid" + dsw.AddPod(types.UniquePodName(pod3Name), controllervolumetesting.NewPod(pod3Name, pod3Name), volume2Spec, node2Name) + _, podAddErr = dsw.AddPod(types.UniquePodName(pod3Name), controllervolumetesting.NewPod(pod3Name, pod3Name), volume2Spec, node2Name) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -950,11 +954,11 @@ func Test_GetVolumesToAttach_Positive_TwoNodesThreeVolumes(t *testing.T) { volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) dsw := NewDesiredStateOfWorld(volumePluginMgr) node1Name := "node1-name" - pod1Name := types.UniquePodName("pod1-uid") + pod1Name := "pod1-uid" volume1Name := api.UniqueVolumeName("volume1-name") volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name) dsw.AddNode(node1Name) - generatedVolume1Name, podAddErr := dsw.AddPod(pod1Name, volume1Spec, node1Name) + generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -962,19 +966,19 @@ func Test_GetVolumesToAttach_Positive_TwoNodesThreeVolumes(t *testing.T) { podAddErr) } node2Name := "node2-name" - pod2aName := types.UniquePodName("pod2a-name") + pod2aName := "pod2a-name" volume2Name := api.UniqueVolumeName("volume2-name") volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name) dsw.AddNode(node2Name) - generatedVolume2Name1, podAddErr := dsw.AddPod(pod2aName, volume2Spec, node2Name) + generatedVolume2Name1, podAddErr := dsw.AddPod(types.UniquePodName(pod2aName), controllervolumetesting.NewPod(pod2aName, pod2aName), volume2Spec, node2Name) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", pod2aName, podAddErr) } - pod2bName := types.UniquePodName("pod2b-name") - generatedVolume2Name2, podAddErr := dsw.AddPod(pod2bName, volume2Spec, node2Name) + pod2bName := "pod2b-name" + generatedVolume2Name2, podAddErr := dsw.AddPod(types.UniquePodName(pod2bName), controllervolumetesting.NewPod(pod2bName, pod2bName), volume2Spec, node2Name) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", @@ -987,10 +991,10 @@ func Test_GetVolumesToAttach_Positive_TwoNodesThreeVolumes(t *testing.T) { generatedVolume2Name1, generatedVolume2Name2) } - pod3Name := types.UniquePodName("pod3-uid") + pod3Name := "pod3-uid" volume3Name := api.UniqueVolumeName("volume3-name") volume3Spec := controllervolumetesting.GetTestVolumeSpec(string(volume3Name), volume3Name) - generatedVolume3Name, podAddErr := dsw.AddPod(pod3Name, volume3Spec, node1Name) + generatedVolume3Name, podAddErr := dsw.AddPod(types.UniquePodName(pod3Name), controllervolumetesting.NewPod(pod3Name, pod3Name), volume3Spec, node1Name) if podAddErr != nil { t.Fatalf( "AddPod failed for pod %q. Expected: Actual: <%v>", diff --git a/pkg/controller/volume/populator/desired_state_of_world_populator.go b/pkg/controller/volume/populator/desired_state_of_world_populator.go new file mode 100644 index 00000000000..da9554c2114 --- /dev/null +++ b/pkg/controller/volume/populator/desired_state_of_world_populator.go @@ -0,0 +1,112 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package populator implements interfaces that monitor and keep the states of the +// desired_state_of_word in sync with the "ground truth" from informer. +package populator + +import ( + "time" + + "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/api" + kcache "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/volume/cache" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" +) + +// DesiredStateOfWorldPopulator periodically verifies that the pods in the +// desired state of th world still exist, if not, it removes them. +// TODO: it also loops through the list of active pods and ensures that +// each one exists in the desired state of the world cache +// if it has volumes. +type DesiredStateOfWorldPopulator interface { + Run(stopCh <-chan struct{}) +} + +// NewDesiredStateOfWorldPopulator returns a new instance of DesiredStateOfWorldPopulator. +// loopSleepDuration - the amount of time the populator loop sleeps between +// successive executions +// podManager - the kubelet podManager that is the source of truth for the pods +// that exist on this host +// desiredStateOfWorld - the cache to populate +func NewDesiredStateOfWorldPopulator( + loopSleepDuration time.Duration, + podInformer framework.SharedInformer, + desiredStateOfWorld cache.DesiredStateOfWorld) DesiredStateOfWorldPopulator { + return &desiredStateOfWorldPopulator{ + loopSleepDuration: loopSleepDuration, + podInformer: podInformer, + desiredStateOfWorld: desiredStateOfWorld, + } +} + +type desiredStateOfWorldPopulator struct { + loopSleepDuration time.Duration + podInformer framework.SharedInformer + desiredStateOfWorld cache.DesiredStateOfWorld +} + +func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) { + wait.Until(dswp.populatorLoopFunc(), dswp.loopSleepDuration, stopCh) +} + +func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() { + return func() { + dswp.findAndRemoveDeletedPods() + } +} + +// Iterate through all pods in desired state of world, and remove if they no +// longer exist in the informer +func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { + for dswPodUID, dswPodToAdd := range dswp.desiredStateOfWorld.GetPodToAdd() { + dswPodKey, err := kcache.MetaNamespaceKeyFunc(dswPodToAdd.Pod) + if err != nil { + glog.Errorf("MetaNamespaceKeyFunc failed for pod %q (UID %q) with: %v", dswPodKey, dswPodUID, err) + continue + } + // retrieve the pod object from pod informer with the namespace key + informerPodObj, exists, err := dswp.podInformer.GetStore().GetByKey(dswPodKey) + if err != nil || informerPodObj == nil { + glog.Errorf("podInformer GetByKey failed for pod %q (UID %q) with %v", dswPodKey, dswPodUID, err) + continue + } + if exists { + informerPod, ok := informerPodObj.(*api.Pod) + if !ok { + glog.Errorf("Failed to cast obj %#v to pod object for pod %q (UID %q)", informerPod, dswPodKey, dswPodUID) + continue + } + informerPodUID := volumehelper.GetUniquePodName(informerPod) + // Check whether the unique idenfier of the pod from dsw matches the one retrived from pod informer + if informerPodUID == dswPodUID { + glog.V(10).Infof( + "Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID) + continue + + } + } + // the pod from dsw does not exist in pod informer, or it does not match the unique idenfier retrieved + // from the informer, delete it from dsw + glog.V(1).Infof( + "Removing pod %q (UID %q) from dsw because it does not exist in pod informer.", dswPodKey, dswPodUID) + dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName) + } +} diff --git a/pkg/controller/volume/reconciler/reconciler_test.go b/pkg/controller/volume/reconciler/reconciler_test.go index b28de9b3664..e79d63f7828 100644 --- a/pkg/controller/volume/reconciler/reconciler_test.go +++ b/pkg/controller/volume/reconciler/reconciler_test.go @@ -81,7 +81,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) { fakeKubeClient, nodeInformer, asw) reconciler := NewReconciler( reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := "node-name" @@ -94,7 +94,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) { nodeName) } - _, podErr := dsw.AddPod(podName, volumeSpec, nodeName) + _, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) if podErr != nil { t.Fatalf("AddPod failed. Expected: Actual: <%v>", podErr) } @@ -127,7 +127,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te fakeKubeClient, nodeInformer, asw) reconciler := NewReconciler( reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := "node-name" @@ -140,7 +140,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te nodeName) } - generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, nodeName) + generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) if podAddErr != nil { t.Fatalf("AddPod failed. Expected: Actual: <%v>", podAddErr) } @@ -156,7 +156,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin) // Act - dsw.DeletePod(podName, generatedVolumeName, nodeName) + dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName) volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName) if volumeExists { t.Fatalf( @@ -194,7 +194,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test fakeKubeClient, nodeInformer, asw) reconciler := NewReconciler( reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) - podName := types.UniquePodName("pod-uid") + podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := "node-name" @@ -207,7 +207,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test nodeName) } - generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, nodeName) + generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) if podAddErr != nil { t.Fatalf("AddPod failed. Expected: Actual: <%v>", podAddErr) } @@ -223,7 +223,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin) // Act - dsw.DeletePod(podName, generatedVolumeName, nodeName) + dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName) volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName) if volumeExists { t.Fatalf( diff --git a/pkg/controller/volume/testing/testvolumespec.go b/pkg/controller/volume/testing/testvolumespec.go index 5651d7718ce..62515c818e7 100644 --- a/pkg/controller/volume/testing/testvolumespec.go +++ b/pkg/controller/volume/testing/testvolumespec.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/watch" ) @@ -101,3 +102,14 @@ func CreateTestClient() *fake.Clientset { return fakeClient } + +// NewPod returns a test pod object +func NewPod(uid, name string) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: types.UID(uid), + Name: name, + Namespace: name, + }, + } +}