mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #96617 from yuga711/dangling
Recover CSI volumes from dangling attachments
This commit is contained in:
commit
efb9489acb
@ -391,7 +391,11 @@ func (adc *attachDetachController) populateActualStateOfWorld() error {
|
|||||||
adc.addNodeToDswp(node, types.NodeName(node.Name))
|
adc.addNodeToDswp(node, types.NodeName(node.Name))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
err = adc.processVolumeAttachments()
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Failed to process volume attachments: %v", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (adc *attachDetachController) getNodeVolumeDevicePath(
|
func (adc *attachDetachController) getNodeVolumeDevicePath(
|
||||||
@ -684,6 +688,67 @@ func (adc *attachDetachController) processVolumesInUse(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process Volume-Attachment objects.
|
||||||
|
// Should be called only after populating attached volumes in the ASW.
|
||||||
|
// For each VA object, this function checks if its present in the ASW.
|
||||||
|
// If not, adds the volume to ASW as an "uncertain" attachment.
|
||||||
|
// In the reconciler, the logic checks if the volume is present in the DSW;
|
||||||
|
// if yes, the reconciler will attempt attach on the volume;
|
||||||
|
// if not (could be a dangling attachment), the reconciler will detach this volume.
|
||||||
|
func (adc *attachDetachController) processVolumeAttachments() error {
|
||||||
|
vas, err := adc.volumeAttachmentLister.List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("failed to list VolumeAttachment objects: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, va := range vas {
|
||||||
|
nodeName := types.NodeName(va.Spec.NodeName)
|
||||||
|
pvName := va.Spec.Source.PersistentVolumeName
|
||||||
|
if pvName == nil {
|
||||||
|
// Currently VA objects are created for CSI volumes only. nil pvName is unexpected, generate a warning
|
||||||
|
klog.Warningf("Skipping the va as its pvName is nil, va.Name: %q, nodeName: %q",
|
||||||
|
va.Name, nodeName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pv, err := adc.pvLister.Get(*pvName)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Unable to lookup pv object for: %q, err: %v", *pvName, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
|
||||||
|
plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
|
||||||
|
if err != nil || plugin == nil {
|
||||||
|
// Currently VA objects are created for CSI volumes only. nil plugin is unexpected, generate a warning
|
||||||
|
klog.Warningf(
|
||||||
|
"Skipping processing the volume %q on nodeName: %q, no attacher interface found. err=%v",
|
||||||
|
*pvName,
|
||||||
|
nodeName,
|
||||||
|
err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf(
|
||||||
|
"Failed to find unique name for volume:%q, va.Name:%q, nodeName:%q: %v",
|
||||||
|
*pvName,
|
||||||
|
va.Name,
|
||||||
|
nodeName,
|
||||||
|
err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
attachState := adc.actualStateOfWorld.GetAttachState(volumeName, nodeName)
|
||||||
|
if attachState == cache.AttachStateDetached {
|
||||||
|
klog.V(1).Infof("Marking volume attachment as uncertain as volume:%q (%q) is not attached (%v)",
|
||||||
|
volumeName, nodeName, attachState)
|
||||||
|
err = adc.actualStateOfWorld.MarkVolumeAsUncertain(volumeName, volumeSpec, nodeName)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("MarkVolumeAsUncertain fail to add the volume %q (%q) to ASW. err: %s", volumeName, nodeName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var _ volume.VolumeHost = &attachDetachController{}
|
var _ volume.VolumeHost = &attachDetachController{}
|
||||||
var _ volume.AttachDetachVolumeHost = &attachDetachController{}
|
var _ volume.AttachDetachVolumeHost = &attachDetachController{}
|
||||||
|
|
||||||
|
@ -72,18 +72,21 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
|
|||||||
nodeInformer := informerFactory.Core().V1().Nodes()
|
nodeInformer := informerFactory.Core().V1().Nodes()
|
||||||
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
|
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
|
||||||
pvInformer := informerFactory.Core().V1().PersistentVolumes()
|
pvInformer := informerFactory.Core().V1().PersistentVolumes()
|
||||||
|
volumeAttachmentInformer := informerFactory.Storage().V1().VolumeAttachments()
|
||||||
|
|
||||||
adc := &attachDetachController{
|
adc := &attachDetachController{
|
||||||
kubeClient: fakeKubeClient,
|
kubeClient: fakeKubeClient,
|
||||||
pvcLister: pvcInformer.Lister(),
|
pvcLister: pvcInformer.Lister(),
|
||||||
pvcsSynced: pvcInformer.Informer().HasSynced,
|
pvcsSynced: pvcInformer.Informer().HasSynced,
|
||||||
pvLister: pvInformer.Lister(),
|
pvLister: pvInformer.Lister(),
|
||||||
pvsSynced: pvInformer.Informer().HasSynced,
|
pvsSynced: pvInformer.Informer().HasSynced,
|
||||||
podLister: podInformer.Lister(),
|
podLister: podInformer.Lister(),
|
||||||
podsSynced: podInformer.Informer().HasSynced,
|
podsSynced: podInformer.Informer().HasSynced,
|
||||||
nodeLister: nodeInformer.Lister(),
|
nodeLister: nodeInformer.Lister(),
|
||||||
nodesSynced: nodeInformer.Informer().HasSynced,
|
nodesSynced: nodeInformer.Informer().HasSynced,
|
||||||
cloud: nil,
|
volumeAttachmentLister: volumeAttachmentInformer.Lister(),
|
||||||
|
volumeAttachmentSynced: volumeAttachmentInformer.Informer().HasSynced,
|
||||||
|
cloud: nil,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
@ -335,3 +338,210 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type vaTest struct {
|
||||||
|
testName string
|
||||||
|
volName string
|
||||||
|
podName string
|
||||||
|
podNodeName string
|
||||||
|
pvName string
|
||||||
|
vaName string
|
||||||
|
vaNodeName string
|
||||||
|
vaAttachStatus bool
|
||||||
|
expected_attaches map[string][]string
|
||||||
|
expected_detaches map[string][]string
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_ADC_VolumeAttachmentRecovery(t *testing.T) {
|
||||||
|
for _, tc := range []vaTest{
|
||||||
|
{ // pod is scheduled
|
||||||
|
testName: "Scheduled pod",
|
||||||
|
volName: "vol1",
|
||||||
|
podName: "pod1",
|
||||||
|
podNodeName: "mynode-1",
|
||||||
|
pvName: "pv1",
|
||||||
|
vaName: "va1",
|
||||||
|
vaNodeName: "mynode-1",
|
||||||
|
vaAttachStatus: false,
|
||||||
|
expected_attaches: map[string][]string{"mynode-1": {"vol1"}},
|
||||||
|
expected_detaches: map[string][]string{},
|
||||||
|
},
|
||||||
|
{ // pod is deleted, attach status:true, verify dangling volume is detached
|
||||||
|
testName: "VA status is attached",
|
||||||
|
volName: "vol1",
|
||||||
|
pvName: "pv1",
|
||||||
|
vaName: "va1",
|
||||||
|
vaNodeName: "mynode-1",
|
||||||
|
vaAttachStatus: true,
|
||||||
|
expected_attaches: map[string][]string{},
|
||||||
|
expected_detaches: map[string][]string{"mynode-1": {"vol1"}},
|
||||||
|
},
|
||||||
|
{ // pod is deleted, attach status:false, verify dangling volume is detached
|
||||||
|
testName: "VA status is unattached",
|
||||||
|
volName: "vol1",
|
||||||
|
pvName: "pv1",
|
||||||
|
vaName: "va1",
|
||||||
|
vaNodeName: "mynode-1",
|
||||||
|
vaAttachStatus: false,
|
||||||
|
expected_attaches: map[string][]string{},
|
||||||
|
expected_detaches: map[string][]string{"mynode-1": {"vol1"}},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.testName, func(t *testing.T) {
|
||||||
|
volumeAttachmentRecoveryTestCase(t, tc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
|
||||||
|
fakeKubeClient := controllervolumetesting.CreateTestClient()
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
|
||||||
|
plugins := controllervolumetesting.CreateTestPlugin()
|
||||||
|
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
|
||||||
|
podInformer := informerFactory.Core().V1().Pods().Informer()
|
||||||
|
pvInformer := informerFactory.Core().V1().PersistentVolumes().Informer()
|
||||||
|
vaInformer := informerFactory.Storage().V1().VolumeAttachments().Informer()
|
||||||
|
|
||||||
|
// Create the controller
|
||||||
|
adcObj, err := NewAttachDetachController(
|
||||||
|
fakeKubeClient,
|
||||||
|
informerFactory.Core().V1().Pods(),
|
||||||
|
informerFactory.Core().V1().Nodes(),
|
||||||
|
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||||
|
informerFactory.Core().V1().PersistentVolumes(),
|
||||||
|
informerFactory.Storage().V1().CSINodes(),
|
||||||
|
informerFactory.Storage().V1().CSIDrivers(),
|
||||||
|
informerFactory.Storage().V1().VolumeAttachments(),
|
||||||
|
nil, /* cloud */
|
||||||
|
plugins,
|
||||||
|
nil, /* prober */
|
||||||
|
false,
|
||||||
|
1*time.Second,
|
||||||
|
DefaultTimerConfig,
|
||||||
|
nil, /* filteredDialOptions */
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewAttachDetachController failed with error. Expected: <no error> Actual: <%v>", err)
|
||||||
|
}
|
||||||
|
adc := adcObj.(*attachDetachController)
|
||||||
|
|
||||||
|
// Add existing objects (created by testplugin) to the respective informers
|
||||||
|
pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
|
||||||
|
}
|
||||||
|
for _, pod := range pods.Items {
|
||||||
|
podToAdd := pod
|
||||||
|
podInformer.GetIndexer().Add(&podToAdd)
|
||||||
|
}
|
||||||
|
nodes, err := fakeKubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
|
||||||
|
}
|
||||||
|
for _, node := range nodes.Items {
|
||||||
|
nodeToAdd := node
|
||||||
|
nodeInformer.GetIndexer().Add(&nodeToAdd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create and add objects requested by the test
|
||||||
|
if tc.podName != "" {
|
||||||
|
newPod := controllervolumetesting.NewPodWithVolume(tc.podName, tc.volName, tc.podNodeName)
|
||||||
|
_, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
|
||||||
|
}
|
||||||
|
podInformer.GetIndexer().Add(newPod)
|
||||||
|
}
|
||||||
|
if tc.pvName != "" {
|
||||||
|
newPv := controllervolumetesting.NewPV(tc.pvName, tc.volName)
|
||||||
|
_, err = adc.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), newPv, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Run failed with error. Failed to create a new pv: <%v>", err)
|
||||||
|
}
|
||||||
|
pvInformer.GetIndexer().Add(newPv)
|
||||||
|
}
|
||||||
|
if tc.vaName != "" {
|
||||||
|
newVa := controllervolumetesting.NewVolumeAttachment("va1", "pv1", "mynode-1", false)
|
||||||
|
_, err = adc.kubeClient.StorageV1().VolumeAttachments().Create(context.TODO(), newVa, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Run failed with error. Failed to create a new volumeAttachment: <%v>", err)
|
||||||
|
}
|
||||||
|
vaInformer.GetIndexer().Add(newVa)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Makesure the informer cache is synced
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
informerFactory.Start(stopCh)
|
||||||
|
|
||||||
|
if !kcache.WaitForNamedCacheSync("attach detach", stopCh,
|
||||||
|
informerFactory.Core().V1().Pods().Informer().HasSynced,
|
||||||
|
informerFactory.Core().V1().Nodes().Informer().HasSynced,
|
||||||
|
informerFactory.Core().V1().PersistentVolumes().Informer().HasSynced,
|
||||||
|
informerFactory.Storage().V1().VolumeAttachments().Informer().HasSynced) {
|
||||||
|
t.Fatalf("Error waiting for the informer caches to sync")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate ASW
|
||||||
|
err = adc.populateActualStateOfWorld()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate DSW
|
||||||
|
err = adc.populateDesiredStateOfWorld()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
|
||||||
|
}
|
||||||
|
// Run reconciler and DSW populator loops
|
||||||
|
go adc.reconciler.Run(stopCh)
|
||||||
|
go adc.desiredStateOfWorldPopulator.Run(stopCh)
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
|
// Verify if expected attaches and detaches have happened
|
||||||
|
testPlugin := plugins[0].(*controllervolumetesting.TestPlugin)
|
||||||
|
for tries := 0; tries <= 10; tries++ { // wait & try few times before failing the test
|
||||||
|
expected_op_map := tc.expected_attaches
|
||||||
|
plugin_map := testPlugin.GetAttachedVolumes()
|
||||||
|
verify_op := "attach"
|
||||||
|
volFound, nodeFound := false, false
|
||||||
|
for i := 0; i <= 1; i++ { // verify attaches and detaches
|
||||||
|
if i == 1 {
|
||||||
|
expected_op_map = tc.expected_detaches
|
||||||
|
plugin_map = testPlugin.GetDetachedVolumes()
|
||||||
|
verify_op = "detach"
|
||||||
|
}
|
||||||
|
// Verify every (node, volume) in the expected_op_map is in the
|
||||||
|
// plugin_map
|
||||||
|
for expectedNode, expectedVolumeList := range expected_op_map {
|
||||||
|
var volumeList []string
|
||||||
|
volumeList, nodeFound = plugin_map[expectedNode]
|
||||||
|
if !nodeFound && tries == 10 {
|
||||||
|
t.Fatalf("Expected node not found, node:%v, op: %v, tries: %d",
|
||||||
|
expectedNode, verify_op, tries)
|
||||||
|
}
|
||||||
|
for _, expectedVolume := range expectedVolumeList {
|
||||||
|
volFound = false
|
||||||
|
for _, volume := range volumeList {
|
||||||
|
if expectedVolume == volume {
|
||||||
|
volFound = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !volFound && tries == 10 {
|
||||||
|
t.Fatalf("Expected %v operation not found, node:%v, volume: %v, tries: %d",
|
||||||
|
verify_op, expectedNode, expectedVolume, tries)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if nodeFound && volFound {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second * 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
if testPlugin.GetErrorEncountered() {
|
||||||
|
t.Fatalf("Fatal error encountered in the testing volume plugin")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
@ -20,7 +20,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
|
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
|
||||||
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
|
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
|
||||||
@ -1374,6 +1374,79 @@ func Test_updateNodeStatusUpdateNeededError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark a volume as attached to a node.
|
||||||
|
// Verify GetAttachState returns AttachedState
|
||||||
|
// Verify GetAttachedVolumes return this volume
|
||||||
|
func Test_MarkVolumeAsAttached(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
|
||||||
|
asw := NewActualStateOfWorld(volumePluginMgr)
|
||||||
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
|
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
|
||||||
|
|
||||||
|
nodeName := types.NodeName("node-name")
|
||||||
|
devicePath := "fake/device/path"
|
||||||
|
|
||||||
|
plugin, err := volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
|
||||||
|
if err != nil || plugin == nil {
|
||||||
|
t.Fatalf("Failed to get volume plugin from spec %v, %v", volumeSpec, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Act
|
||||||
|
err = asw.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
volumeNodeComboState := asw.GetAttachState(volumeName, nodeName)
|
||||||
|
if volumeNodeComboState != AttachStateAttached {
|
||||||
|
t.Fatalf("asw says the volume: %q is not attached (%v) to node:%q, it should.",
|
||||||
|
volumeName, AttachStateAttached, nodeName)
|
||||||
|
}
|
||||||
|
attachedVolumes := asw.GetAttachedVolumes()
|
||||||
|
if len(attachedVolumes) != 1 {
|
||||||
|
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
|
||||||
|
}
|
||||||
|
verifyAttachedVolume(t, attachedVolumes, volumeName, string(volumeName), nodeName, devicePath, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark a volume as attachment as uncertain.
|
||||||
|
// Verify GetAttachState returns UncertainState
|
||||||
|
// Verify GetAttachedVolumes return this volume
|
||||||
|
func Test_MarkVolumeAsUncertain(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
|
||||||
|
asw := NewActualStateOfWorld(volumePluginMgr)
|
||||||
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
|
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
|
||||||
|
nodeName := types.NodeName("node-name")
|
||||||
|
|
||||||
|
plugin, err := volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
|
||||||
|
if err != nil || plugin == nil {
|
||||||
|
t.Fatalf("Failed to get volume plugin from spec %v, %v", volumeSpec, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Act
|
||||||
|
err = asw.MarkVolumeAsUncertain(volumeName, volumeSpec, nodeName)
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("MarkVolumeAsUncertain failed. Expected: <no error> Actual: <%v>", err)
|
||||||
|
}
|
||||||
|
volumeNodeComboState := asw.GetAttachState(volumeName, nodeName)
|
||||||
|
if volumeNodeComboState != AttachStateUncertain {
|
||||||
|
t.Fatalf("asw says the volume: %q is attached (%v) to node:%q, it should not.",
|
||||||
|
volumeName, volumeNodeComboState, nodeName)
|
||||||
|
}
|
||||||
|
attachedVolumes := asw.GetAttachedVolumes()
|
||||||
|
if len(attachedVolumes) != 1 {
|
||||||
|
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
|
||||||
|
}
|
||||||
|
verifyAttachedVolume(t, attachedVolumes, volumeName, string(volumeName), nodeName, "", true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
|
||||||
|
}
|
||||||
|
|
||||||
func verifyAttachedVolume(
|
func verifyAttachedVolume(
|
||||||
t *testing.T,
|
t *testing.T,
|
||||||
attachedVolumes []AttachedVolume,
|
attachedVolumes []AttachedVolume,
|
||||||
|
@ -60,6 +60,9 @@ func GetTestVolumeSpec(volumeName string, diskName v1.UniqueVolumeName) *volume.
|
|||||||
}
|
}
|
||||||
|
|
||||||
var extraPods *v1.PodList
|
var extraPods *v1.PodList
|
||||||
|
var volumeAttachments *storagev1.VolumeAttachmentList
|
||||||
|
var pvs *v1.PersistentVolumeList
|
||||||
|
var nodes *v1.NodeList
|
||||||
|
|
||||||
func CreateTestClient() *fake.Clientset {
|
func CreateTestClient() *fake.Clientset {
|
||||||
fakeClient := &fake.Clientset{}
|
fakeClient := &fake.Clientset{}
|
||||||
@ -143,40 +146,48 @@ func CreateTestClient() *fake.Clientset {
|
|||||||
}
|
}
|
||||||
return true, obj, nil
|
return true, obj, nil
|
||||||
})
|
})
|
||||||
|
nodes = &v1.NodeList{}
|
||||||
|
nodeNamePrefix := "mynode"
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
var nodeName string
|
||||||
|
if i != 0 {
|
||||||
|
nodeName = fmt.Sprintf("%s-%d", nodeNamePrefix, i)
|
||||||
|
} else {
|
||||||
|
// We want also the "mynode" node since all the testing pods live there
|
||||||
|
nodeName = nodeNamePrefix
|
||||||
|
}
|
||||||
|
attachVolumeToNode("lostVolumeName", nodeName)
|
||||||
|
}
|
||||||
fakeClient.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
fakeClient.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
obj := &v1.NodeList{}
|
obj := &v1.NodeList{}
|
||||||
nodeNamePrefix := "mynode"
|
obj.Items = append(obj.Items, nodes.Items...)
|
||||||
for i := 0; i < 5; i++ {
|
|
||||||
var nodeName string
|
|
||||||
if i != 0 {
|
|
||||||
nodeName = fmt.Sprintf("%s-%d", nodeNamePrefix, i)
|
|
||||||
} else {
|
|
||||||
// We want also the "mynode" node since all the testing pods live there
|
|
||||||
nodeName = nodeNamePrefix
|
|
||||||
}
|
|
||||||
node := v1.Node{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: nodeName,
|
|
||||||
Labels: map[string]string{
|
|
||||||
"name": nodeName,
|
|
||||||
},
|
|
||||||
Annotations: map[string]string{
|
|
||||||
util.ControllerManagedAttachAnnotation: "true",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Status: v1.NodeStatus{
|
|
||||||
VolumesAttached: []v1.AttachedVolume{
|
|
||||||
{
|
|
||||||
Name: TestPluginName + "/lostVolumeName",
|
|
||||||
DevicePath: "fake/path",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
obj.Items = append(obj.Items, node)
|
|
||||||
}
|
|
||||||
return true, obj, nil
|
return true, obj, nil
|
||||||
})
|
})
|
||||||
|
volumeAttachments = &storagev1.VolumeAttachmentList{}
|
||||||
|
fakeClient.AddReactor("list", "volumeattachments", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
obj := &storagev1.VolumeAttachmentList{}
|
||||||
|
obj.Items = append(obj.Items, volumeAttachments.Items...)
|
||||||
|
return true, obj, nil
|
||||||
|
})
|
||||||
|
fakeClient.AddReactor("create", "volumeattachments", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
createAction := action.(core.CreateAction)
|
||||||
|
va := createAction.GetObject().(*storagev1.VolumeAttachment)
|
||||||
|
volumeAttachments.Items = append(volumeAttachments.Items, *va)
|
||||||
|
return true, createAction.GetObject(), nil
|
||||||
|
})
|
||||||
|
|
||||||
|
pvs = &v1.PersistentVolumeList{}
|
||||||
|
fakeClient.AddReactor("list", "persistentvolumes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
obj := &v1.PersistentVolumeList{}
|
||||||
|
obj.Items = append(obj.Items, pvs.Items...)
|
||||||
|
return true, obj, nil
|
||||||
|
})
|
||||||
|
fakeClient.AddReactor("create", "persistentvolumes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
createAction := action.(core.CreateAction)
|
||||||
|
pv := createAction.GetObject().(*v1.PersistentVolume)
|
||||||
|
pvs.Items = append(pvs.Items, *pv)
|
||||||
|
return true, createAction.GetObject(), nil
|
||||||
|
})
|
||||||
|
|
||||||
fakeWatch := watch.NewFake()
|
fakeWatch := watch.NewFake()
|
||||||
fakeClient.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
|
fakeClient.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
|
||||||
@ -237,6 +248,88 @@ func NewPodWithVolume(podName, volumeName, nodeName string) *v1.Pod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns a volumeAttachment object
|
||||||
|
func NewVolumeAttachment(vaName, pvName, nodeName string, status bool) *storagev1.VolumeAttachment {
|
||||||
|
return &storagev1.VolumeAttachment{
|
||||||
|
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
UID: types.UID(vaName),
|
||||||
|
Name: vaName,
|
||||||
|
},
|
||||||
|
Spec: storagev1.VolumeAttachmentSpec{
|
||||||
|
Attacher: "test.storage.gke.io",
|
||||||
|
NodeName: nodeName,
|
||||||
|
Source: storagev1.VolumeAttachmentSource{
|
||||||
|
PersistentVolumeName: &pvName,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: storagev1.VolumeAttachmentStatus{
|
||||||
|
Attached: status,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns a persistentVolume object
|
||||||
|
func NewPV(pvName, volumeName string) *v1.PersistentVolume {
|
||||||
|
return &v1.PersistentVolume{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
UID: types.UID(pvName),
|
||||||
|
Name: pvName,
|
||||||
|
},
|
||||||
|
Spec: v1.PersistentVolumeSpec{
|
||||||
|
PersistentVolumeSource: v1.PersistentVolumeSource{
|
||||||
|
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
||||||
|
PDName: volumeName,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func attachVolumeToNode(volumeName, nodeName string) {
|
||||||
|
// if nodeName exists, get the object.. if not create node object
|
||||||
|
var node *v1.Node
|
||||||
|
found := false
|
||||||
|
nodes.Size()
|
||||||
|
for i := range nodes.Items {
|
||||||
|
curNode := nodes.Items[i]
|
||||||
|
if curNode.ObjectMeta.Name == nodeName {
|
||||||
|
node = &curNode
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
node = &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: nodeName,
|
||||||
|
Labels: map[string]string{
|
||||||
|
"name": nodeName,
|
||||||
|
},
|
||||||
|
Annotations: map[string]string{
|
||||||
|
util.ControllerManagedAttachAnnotation: "true",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesAttached: []v1.AttachedVolume{
|
||||||
|
{
|
||||||
|
Name: v1.UniqueVolumeName(TestPluginName + "/" + volumeName),
|
||||||
|
DevicePath: "fake/path",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
volumeAttached := v1.AttachedVolume{
|
||||||
|
Name: v1.UniqueVolumeName(TestPluginName + "/" + volumeName),
|
||||||
|
DevicePath: "fake/path",
|
||||||
|
}
|
||||||
|
node.Status.VolumesAttached = append(node.Status.VolumesAttached, volumeAttached)
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes.Items = append(nodes.Items, *node)
|
||||||
|
}
|
||||||
|
|
||||||
type TestPlugin struct {
|
type TestPlugin struct {
|
||||||
ErrorEncountered bool
|
ErrorEncountered bool
|
||||||
attachedVolumeMap map[string][]string
|
attachedVolumeMap map[string][]string
|
||||||
@ -258,8 +351,15 @@ func (plugin *TestPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
|
|||||||
if spec == nil {
|
if spec == nil {
|
||||||
klog.Errorf("GetVolumeName called with nil volume spec")
|
klog.Errorf("GetVolumeName called with nil volume spec")
|
||||||
plugin.ErrorEncountered = true
|
plugin.ErrorEncountered = true
|
||||||
|
return "", fmt.Errorf("GetVolumeName called with nil volume spec")
|
||||||
|
}
|
||||||
|
if spec.Volume != nil {
|
||||||
|
return spec.Name(), nil
|
||||||
|
} else if spec.PersistentVolume != nil {
|
||||||
|
return spec.PersistentVolume.Spec.PersistentVolumeSource.GCEPersistentDisk.PDName, nil
|
||||||
|
} else {
|
||||||
|
return "", nil
|
||||||
}
|
}
|
||||||
return spec.Name(), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *TestPlugin) CanSupport(spec *volume.Spec) bool {
|
func (plugin *TestPlugin) CanSupport(spec *volume.Spec) bool {
|
||||||
|
Loading…
Reference in New Issue
Block a user