implement desiredWorld populator to sync up with informer

This change implements the desiredStateOfWorld populator to sync up with
the pod informer. It periodically check each pod in the
desiredStateOfworld and verify whether it is still in pod informer
cache. If it not, remove it from the desiredStateOfWorld
This commit is contained in:
Jing Xu 2016-06-15 15:59:59 -07:00
parent 8b817c4a0d
commit 0fefb23f94
6 changed files with 251 additions and 68 deletions

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/controller/volume/populator"
"k8s.io/kubernetes/pkg/controller/volume/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/statusupdater"
"k8s.io/kubernetes/pkg/types"
@ -50,6 +51,10 @@ const (
// from its node. Once this time has expired, the controller will assume the
// node or kubelet are unresponsive and will detach the volume anyway.
reconcilerMaxWaitForUnmountDuration time.Duration = 3 * time.Minute
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 5 * time.Minute
)
// AttachDetachController defines the operations supported by this controller.
@ -119,6 +124,11 @@ func NewAttachDetachController(
adc.attacherDetacher,
adc.nodeStatusUpdater)
adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
desiredStateOfWorldPopulatorLoopSleepPeriod,
podInformer,
adc.desiredStateOfWorld)
return adc, nil
}
@ -170,6 +180,10 @@ type attachDetachController struct {
// nodeStatusUpdater is used to update node status with the list of attached
// volumes
nodeStatusUpdater statusupdater.NodeStatusUpdater
// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
// populate the current pods using podInformer.
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
}
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
@ -177,6 +191,7 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
glog.Infof("Starting Attach Detach Controller")
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)
<-stopCh
glog.Infof("Shutting down Attach Detach Controller")
@ -300,7 +315,7 @@ func (adc *attachDetachController) processPodVolumes(
if addVolumes {
// Add volume to desired state of world
_, err := adc.desiredStateOfWorld.AddPod(
uniquePodName, volumeSpec, pod.Spec.NodeName)
uniquePodName, pod, volumeSpec, pod.Spec.NodeName)
if err != nil {
glog.V(10).Infof(
"Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v",

View File

@ -57,7 +57,7 @@ type DesiredStateOfWorld interface {
// should be attached to the specified node, the volume is implicitly added.
// If no node with the name nodeName exists in list of nodes managed by the
// attach/detach attached controller, an error is returned.
AddPod(podName types.UniquePodName, volumeSpec *volume.Spec, nodeName string) (api.UniqueVolumeName, error)
AddPod(podName types.UniquePodName, pod *api.Pod, volumeSpec *volume.Spec, nodeName string) (api.UniqueVolumeName, error)
// DeleteNode removes the given node from the list of nodes managed by the
// attach/detach controller.
@ -90,6 +90,10 @@ type DesiredStateOfWorld interface {
// and the nodes they should be attached to based on the current desired
// state of the world.
GetVolumesToAttach() []VolumeToAttach
// GetPodToAdd generates and returns a map of pods based on the current desired
// state of world
GetPodToAdd() map[types.UniquePodName]PodToAdd
}
// VolumeToAttach represents a volume that should be attached to a node.
@ -97,6 +101,19 @@ type VolumeToAttach struct {
operationexecutor.VolumeToAttach
}
// PodToAdd represents a pod that references the underlying volume and is
// scheduled to the underlying node.
type PodToAdd struct {
// pod contains the api object of pod
Pod *api.Pod
// volumeName contains the unique identifier for this volume.
VolumeName api.UniqueVolumeName
// nodeName contains the name of this node.
NodeName string
}
// NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorld {
return &desiredStateOfWorld{
@ -119,7 +136,7 @@ type desiredStateOfWorld struct {
// nodeManaged represents a node that is being managed by the attach/detach
// controller.
type nodeManaged struct {
// nodName contains the name of this node.
// nodeName contains the name of this node.
nodeName string
// volumesToAttach is a map containing the set of volumes that should be
@ -145,11 +162,14 @@ type volumeToAttach struct {
scheduledPods map[types.UniquePodName]pod
}
// The pod object represents a pod that references the underlying volume and is
// The pod represents a pod that references the underlying volume and is
// scheduled to the underlying node.
type pod struct {
// podName contains the name of this pod.
// podName contains the unique identifier for this pod
podName types.UniquePodName
// pod object contains the api object of pod
podObj *api.Pod
}
func (dsw *desiredStateOfWorld) AddNode(nodeName string) {
@ -166,6 +186,7 @@ func (dsw *desiredStateOfWorld) AddNode(nodeName string) {
func (dsw *desiredStateOfWorld) AddPod(
podName types.UniquePodName,
podToAdd *api.Pod,
volumeSpec *volume.Spec,
nodeName string) (api.UniqueVolumeName, error) {
dsw.Lock()
@ -204,11 +225,11 @@ func (dsw *desiredStateOfWorld) AddPod(
}
dsw.nodesManaged[nodeName].volumesToAttach[volumeName] = volumeObj
}
if _, podExists := volumeObj.scheduledPods[podName]; !podExists {
dsw.nodesManaged[nodeName].volumesToAttach[volumeName].scheduledPods[podName] =
pod{
podName: podName,
podObj: podToAdd,
}
}
@ -309,3 +330,22 @@ func (dsw *desiredStateOfWorld) GetVolumesToAttach() []VolumeToAttach {
return volumesToAttach
}
func (dsw *desiredStateOfWorld) GetPodToAdd() map[types.UniquePodName]PodToAdd {
dsw.RLock()
defer dsw.RUnlock()
pods := make(map[types.UniquePodName]PodToAdd)
for nodeName, nodeObj := range dsw.nodesManaged {
for volumeName, volumeObj := range nodeObj.volumesToAttach {
for podUID, pod := range volumeObj.scheduledPods {
pods[podUID] = PodToAdd{
Pod: pod.podObj,
VolumeName: volumeName,
NodeName: nodeName,
}
}
}
}
return pods
}

View File

@ -87,9 +87,9 @@ func Test_AddNode_Positive_ExistingNode(t *testing.T) {
// Verifies node/volume exists, and 1 volumes to attach.
func Test_AddPod_Positive_NewPodNodeExistsVolumeDoesntExist(t *testing.T) {
// Arrange
podName := "pod-uid"
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := types.UniquePodName("pod-uid")
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
@ -103,7 +103,7 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeDoesntExist(t *testing.T) {
}
// Act
generatedVolumeName, podErr := dsw.AddPod(podName, volumeSpec, nodeName)
generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
// Assert
if podErr != nil {
@ -136,8 +136,8 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
pod1Name := types.UniquePodName("pod1-uid")
pod2Name := types.UniquePodName("pod2-uid")
pod1Name := "pod1-uid"
pod2Name := "pod2-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
@ -151,7 +151,7 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeExists(t *testing.T) {
}
// Act
generatedVolumeName, podErr := dsw.AddPod(pod1Name, volumeSpec, nodeName)
generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volumeSpec, nodeName)
// Assert
if podErr != nil {
@ -171,7 +171,7 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeExists(t *testing.T) {
}
// Act
generatedVolumeName, podErr = dsw.AddPod(pod2Name, volumeSpec, nodeName)
generatedVolumeName, podErr = dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volumeSpec, nodeName)
// Assert
if podErr != nil {
@ -194,6 +194,10 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeExists(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
podScheduled := dsw.GetPodToAdd()
if len(podScheduled) != 2 {
t.Fatalf("len(podScheduled) Expected: <2> Actual: <%v>", len(podScheduled))
}
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, string(volumeName))
}
@ -206,7 +210,7 @@ func Test_AddPod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
@ -220,7 +224,7 @@ func Test_AddPod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) {
}
// Act
generatedVolumeName, podErr := dsw.AddPod(podName, volumeSpec, nodeName)
generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
// Assert
if podErr != nil {
@ -240,7 +244,7 @@ func Test_AddPod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) {
}
// Act
generatedVolumeName, podErr = dsw.AddPod(podName, volumeSpec, nodeName)
generatedVolumeName, podErr = dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
// Assert
if podErr != nil {
@ -272,7 +276,7 @@ func Test_AddPod_Negative_NewPodNodeDoesntExistVolumeDoesntExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
@ -285,7 +289,7 @@ func Test_AddPod_Negative_NewPodNodeDoesntExistVolumeDoesntExist(t *testing.T) {
}
// Act
_, podErr := dsw.AddPod(podName, volumeSpec, nodeName)
_, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
// Assert
if podErr == nil {
@ -371,10 +375,10 @@ func Test_DeleteNode_Negative_NodeExistsHasChildVolumes(t *testing.T) {
dsw := NewDesiredStateOfWorld(volumePluginMgr)
nodeName := "node-name"
dsw.AddNode(nodeName)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, nodeName)
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -410,12 +414,12 @@ func Test_DeletePod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, nodeName)
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -432,7 +436,7 @@ func Test_DeletePod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) {
}
// Act
dsw.DeletePod(podName, generatedVolumeName, nodeName)
dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName)
// Assert
volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
@ -457,20 +461,20 @@ func Test_DeletePod_Positive_2PodsExistNodeExistsVolumesExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
pod1Name := types.UniquePodName("pod1-uid")
pod2Name := types.UniquePodName("pod2-uid")
pod1Name := "pod1-uid"
pod2Name := "pod2-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
generatedVolumeName1, pod1AddErr := dsw.AddPod(pod1Name, volumeSpec, nodeName)
generatedVolumeName1, pod1AddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volumeSpec, nodeName)
if pod1AddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
pod1Name,
pod1AddErr)
}
generatedVolumeName2, pod2AddErr := dsw.AddPod(pod2Name, volumeSpec, nodeName)
generatedVolumeName2, pod2AddErr := dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volumeSpec, nodeName)
if pod2AddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -492,7 +496,7 @@ func Test_DeletePod_Positive_2PodsExistNodeExistsVolumesExist(t *testing.T) {
}
// Act
dsw.DeletePod(pod1Name, generatedVolumeName1, nodeName)
dsw.DeletePod(types.UniquePodName(pod1Name), generatedVolumeName1, nodeName)
// Assert
volumeExists = dsw.VolumeExists(generatedVolumeName1, nodeName)
@ -518,13 +522,13 @@ func Test_DeletePod_Positive_PodDoesNotExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
pod1Name := types.UniquePodName("pod1-uid")
pod2Name := types.UniquePodName("pod2-uid")
pod1Name := "pod1-uid"
pod2Name := "pod2-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
generatedVolumeName, pod1AddErr := dsw.AddPod(pod1Name, volumeSpec, nodeName)
generatedVolumeName, pod1AddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volumeSpec, nodeName)
if pod1AddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -541,7 +545,7 @@ func Test_DeletePod_Positive_PodDoesNotExist(t *testing.T) {
}
// Act
dsw.DeletePod(pod2Name, generatedVolumeName, nodeName)
dsw.DeletePod(types.UniquePodName(pod2Name), generatedVolumeName, nodeName)
// Assert
volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
@ -567,12 +571,12 @@ func Test_DeletePod_Positive_NodeDoesNotExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
node1Name := "node1-name"
dsw.AddNode(node1Name)
generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, node1Name)
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, node1Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -590,7 +594,7 @@ func Test_DeletePod_Positive_NodeDoesNotExist(t *testing.T) {
node2Name := "node2-name"
// Act
dsw.DeletePod(podName, generatedVolumeName, node2Name)
dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, node2Name)
// Assert
volumeExists = dsw.VolumeExists(generatedVolumeName, node1Name)
@ -622,12 +626,12 @@ func Test_DeletePod_Positive_VolumeDoesNotExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volume1Name := api.UniqueVolumeName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
nodeName := "node-name"
dsw.AddNode(nodeName)
generatedVolume1Name, podAddErr := dsw.AddPod(podName, volume1Spec, nodeName)
generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volume1Spec, nodeName)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -645,7 +649,7 @@ func Test_DeletePod_Positive_VolumeDoesNotExist(t *testing.T) {
volume2Name := api.UniqueVolumeName("volume2-name")
// Act
dsw.DeletePod(podName, volume2Name, nodeName)
dsw.DeletePod(types.UniquePodName(podName), volume2Name, nodeName)
// Assert
volumeExists = dsw.VolumeExists(generatedVolume1Name, nodeName)
@ -725,10 +729,10 @@ func Test_VolumeExists_Positive_VolumeExistsNodeExists(t *testing.T) {
dsw := NewDesiredStateOfWorld(volumePluginMgr)
nodeName := "node-name"
dsw.AddNode(nodeName)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
generatedVolumeName, _ := dsw.AddPod(podName, volumeSpec, nodeName)
generatedVolumeName, _ := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
// Act
volumeExists := dsw.VolumeExists(generatedVolumeName, nodeName)
@ -755,10 +759,10 @@ func Test_VolumeExists_Positive_VolumeDoesntExistNodeExists(t *testing.T) {
dsw := NewDesiredStateOfWorld(volumePluginMgr)
nodeName := "node-name"
dsw.AddNode(nodeName)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volume1Name := api.UniqueVolumeName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(podName, volume1Spec, nodeName)
generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volume1Spec, nodeName)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -851,11 +855,11 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEach(t *testing.T) {
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
node1Name := "node1-name"
pod1Name := types.UniquePodName("pod1-uid")
pod1Name := "pod1-uid"
volume1Name := api.UniqueVolumeName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
dsw.AddNode(node1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(pod1Name, volume1Spec, node1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -863,11 +867,11 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEach(t *testing.T) {
podAddErr)
}
node2Name := "node2-name"
pod2Name := types.UniquePodName("pod2-uid")
pod2Name := "pod2-uid"
volume2Name := api.UniqueVolumeName("volume2-name")
volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
dsw.AddNode(node2Name)
generatedVolume2Name, podAddErr := dsw.AddPod(pod2Name, volume2Spec, node2Name)
generatedVolume2Name, podAddErr := dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volume2Spec, node2Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -896,11 +900,11 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEachExtraPod(t *testing.T
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
node1Name := "node1-name"
pod1Name := types.UniquePodName("pod1-uid")
pod1Name := "pod1-uid"
volume1Name := api.UniqueVolumeName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
dsw.AddNode(node1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(pod1Name, volume1Spec, node1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -908,20 +912,20 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEachExtraPod(t *testing.T
podAddErr)
}
node2Name := "node2-name"
pod2Name := types.UniquePodName("pod2-uid")
pod2Name := "pod2-uid"
volume2Name := api.UniqueVolumeName("volume2-name")
volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
dsw.AddNode(node2Name)
generatedVolume2Name, podAddErr := dsw.AddPod(pod2Name, volume2Spec, node2Name)
generatedVolume2Name, podAddErr := dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volume2Spec, node2Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
pod2Name,
podAddErr)
}
pod3Name := types.UniquePodName("pod3-uid")
dsw.AddPod(pod3Name, volume2Spec, node2Name)
_, podAddErr = dsw.AddPod(pod3Name, volume2Spec, node2Name)
pod3Name := "pod3-uid"
dsw.AddPod(types.UniquePodName(pod3Name), controllervolumetesting.NewPod(pod3Name, pod3Name), volume2Spec, node2Name)
_, podAddErr = dsw.AddPod(types.UniquePodName(pod3Name), controllervolumetesting.NewPod(pod3Name, pod3Name), volume2Spec, node2Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -950,11 +954,11 @@ func Test_GetVolumesToAttach_Positive_TwoNodesThreeVolumes(t *testing.T) {
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
node1Name := "node1-name"
pod1Name := types.UniquePodName("pod1-uid")
pod1Name := "pod1-uid"
volume1Name := api.UniqueVolumeName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
dsw.AddNode(node1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(pod1Name, volume1Spec, node1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -962,19 +966,19 @@ func Test_GetVolumesToAttach_Positive_TwoNodesThreeVolumes(t *testing.T) {
podAddErr)
}
node2Name := "node2-name"
pod2aName := types.UniquePodName("pod2a-name")
pod2aName := "pod2a-name"
volume2Name := api.UniqueVolumeName("volume2-name")
volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
dsw.AddNode(node2Name)
generatedVolume2Name1, podAddErr := dsw.AddPod(pod2aName, volume2Spec, node2Name)
generatedVolume2Name1, podAddErr := dsw.AddPod(types.UniquePodName(pod2aName), controllervolumetesting.NewPod(pod2aName, pod2aName), volume2Spec, node2Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
pod2aName,
podAddErr)
}
pod2bName := types.UniquePodName("pod2b-name")
generatedVolume2Name2, podAddErr := dsw.AddPod(pod2bName, volume2Spec, node2Name)
pod2bName := "pod2b-name"
generatedVolume2Name2, podAddErr := dsw.AddPod(types.UniquePodName(pod2bName), controllervolumetesting.NewPod(pod2bName, pod2bName), volume2Spec, node2Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
@ -987,10 +991,10 @@ func Test_GetVolumesToAttach_Positive_TwoNodesThreeVolumes(t *testing.T) {
generatedVolume2Name1,
generatedVolume2Name2)
}
pod3Name := types.UniquePodName("pod3-uid")
pod3Name := "pod3-uid"
volume3Name := api.UniqueVolumeName("volume3-name")
volume3Spec := controllervolumetesting.GetTestVolumeSpec(string(volume3Name), volume3Name)
generatedVolume3Name, podAddErr := dsw.AddPod(pod3Name, volume3Spec, node1Name)
generatedVolume3Name, podAddErr := dsw.AddPod(types.UniquePodName(pod3Name), controllervolumetesting.NewPod(pod3Name, pod3Name), volume3Spec, node1Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",

View File

@ -0,0 +1,112 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package populator implements interfaces that monitor and keep the states of the
// desired_state_of_word in sync with the "ground truth" from informer.
package populator
import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
kcache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// DesiredStateOfWorldPopulator periodically verifies that the pods in the
// desired state of th world still exist, if not, it removes them.
// TODO: it also loops through the list of active pods and ensures that
// each one exists in the desired state of the world cache
// if it has volumes.
type DesiredStateOfWorldPopulator interface {
Run(stopCh <-chan struct{})
}
// NewDesiredStateOfWorldPopulator returns a new instance of DesiredStateOfWorldPopulator.
// loopSleepDuration - the amount of time the populator loop sleeps between
// successive executions
// podManager - the kubelet podManager that is the source of truth for the pods
// that exist on this host
// desiredStateOfWorld - the cache to populate
func NewDesiredStateOfWorldPopulator(
loopSleepDuration time.Duration,
podInformer framework.SharedInformer,
desiredStateOfWorld cache.DesiredStateOfWorld) DesiredStateOfWorldPopulator {
return &desiredStateOfWorldPopulator{
loopSleepDuration: loopSleepDuration,
podInformer: podInformer,
desiredStateOfWorld: desiredStateOfWorld,
}
}
type desiredStateOfWorldPopulator struct {
loopSleepDuration time.Duration
podInformer framework.SharedInformer
desiredStateOfWorld cache.DesiredStateOfWorld
}
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
wait.Until(dswp.populatorLoopFunc(), dswp.loopSleepDuration, stopCh)
}
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
return func() {
dswp.findAndRemoveDeletedPods()
}
}
// Iterate through all pods in desired state of world, and remove if they no
// longer exist in the informer
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
for dswPodUID, dswPodToAdd := range dswp.desiredStateOfWorld.GetPodToAdd() {
dswPodKey, err := kcache.MetaNamespaceKeyFunc(dswPodToAdd.Pod)
if err != nil {
glog.Errorf("MetaNamespaceKeyFunc failed for pod %q (UID %q) with: %v", dswPodKey, dswPodUID, err)
continue
}
// retrieve the pod object from pod informer with the namespace key
informerPodObj, exists, err := dswp.podInformer.GetStore().GetByKey(dswPodKey)
if err != nil || informerPodObj == nil {
glog.Errorf("podInformer GetByKey failed for pod %q (UID %q) with %v", dswPodKey, dswPodUID, err)
continue
}
if exists {
informerPod, ok := informerPodObj.(*api.Pod)
if !ok {
glog.Errorf("Failed to cast obj %#v to pod object for pod %q (UID %q)", informerPod, dswPodKey, dswPodUID)
continue
}
informerPodUID := volumehelper.GetUniquePodName(informerPod)
// Check whether the unique idenfier of the pod from dsw matches the one retrived from pod informer
if informerPodUID == dswPodUID {
glog.V(10).Infof(
"Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
continue
}
}
// the pod from dsw does not exist in pod informer, or it does not match the unique idenfier retrieved
// from the informer, delete it from dsw
glog.V(1).Infof(
"Removing pod %q (UID %q) from dsw because it does not exist in pod informer.", dswPodKey, dswPodUID)
dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)
}
}

View File

@ -81,7 +81,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
fakeKubeClient, nodeInformer, asw)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
@ -94,7 +94,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
nodeName)
}
_, podErr := dsw.AddPod(podName, volumeSpec, nodeName)
_, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
if podErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podErr)
}
@ -127,7 +127,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
fakeKubeClient, nodeInformer, asw)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
@ -140,7 +140,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
nodeName)
}
generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, nodeName)
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
@ -156,7 +156,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
// Act
dsw.DeletePod(podName, generatedVolumeName, nodeName)
dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName)
volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
if volumeExists {
t.Fatalf(
@ -194,7 +194,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
fakeKubeClient, nodeInformer, asw)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
podName := types.UniquePodName("pod-uid")
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
@ -207,7 +207,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
nodeName)
}
generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, nodeName)
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
@ -223,7 +223,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
// Act
dsw.DeletePod(podName, generatedVolumeName, nodeName)
dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName)
volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
if volumeExists {
t.Fatalf(

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
)
@ -101,3 +102,14 @@ func CreateTestClient() *fake.Clientset {
return fakeClient
}
// NewPod returns a test pod object
func NewPod(uid, name string) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: types.UID(uid),
Name: name,
Namespace: name,
},
}
}