diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index 05e79f3a3da..27c51518d12 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -41,6 +41,7 @@ go_test( "//pkg/api/v1:go_default_library", "//pkg/types:go_default_library", "//pkg/util/mount:go_default_library", - "//vendor:k8s.io/client-go/_vendor/github.com/pborman/uuid", + "//pkg/util/uuid:go_default_library", + "//pkg/volume/util/types:go_default_library", ], ) diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index 1c1cf7f2eb5..96a207f2b9d 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -17,25 +17,32 @@ limitations under the License. package operationexecutor import ( - "k8s.io/client-go/_vendor/github.com/pborman/uuid" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/util/uuid" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" "strconv" "testing" "time" ) -const numVolumesToMount = 2 +const ( + numVolumesToMount = 2 + numAttachableVolumesToUnmount = 2 + numNonAttachableVolumesToUnmount = 2 + numDevicesToUnmount = 2 + numVolumesToAttach = 2 + numVolumesToDetach = 2 + numVolumesToVerifyAttached = 2 + numVolumesToVerifyControllerAttached = 2 +) -var _ OperationGenerator = &mockOperationGenerator{} +var _ OperationGenerator = &fakeOperationGenerator{} -func TestOperationExecutor_MountVolume_ParallelMountForNonAttachablePlugins(t *testing.T) { +func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachablePlugins(t *testing.T) { // Arrange - ch, quit := make(chan interface{}), make(chan interface{}) - numMountOperationsStarted := 0 - mopg := newMockOperationGenerator(ch, quit) - oe := NewOperationExecutor(mopg) + ch, quit, oe := setup() volumesToMount := make([]VolumeToMount, numVolumesToMount) secretName := "secret-volume" volumeName := v1.UniqueVolumeName(secretName) @@ -54,29 +61,15 @@ func TestOperationExecutor_MountVolume_ParallelMountForNonAttachablePlugins(t *t } // Assert -loop: - for { - select { - case <-ch: - numMountOperationsStarted++ - if numMountOperationsStarted == numVolumesToMount { - break loop - } - case <-time.After(5 * time.Second): - t.Fatalf("Unable to start mount operations in parallel for non-attachable volumes") - break loop - } + if !isOperationRunConcurrently(ch, quit, numVolumesToMount) { + t.Fatalf("Unable to start mount operations in Concurrent for non-attachable volumes") } - close(quit) } -func TestOperationExecutor_MountVolume_ParallelMountForAttachablePlugins(t *testing.T) { +func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins(t *testing.T) { // Arrange - ch, quit := make(chan interface{}), make(chan interface{}) - numMountOperationsStarted := 0 - mopg := newMockOperationGenerator(ch, quit) - oe := NewOperationExecutor(mopg) - volumesToMount := make([]VolumeToMount, numVolumesToMount) + ch, quit, oe := setup() + volumesToMount := make([]VolumeToMount, numVolumesToAttach) pdName := "pd-volume" volumeName := v1.UniqueVolumeName(pdName) @@ -94,59 +87,196 @@ func TestOperationExecutor_MountVolume_ParallelMountForAttachablePlugins(t *test } // Assert -loop: - for { - select { - case <-ch: - numMountOperationsStarted++ - if numMountOperationsStarted > 1 { - t.Fatalf("Mount operations should not start in parallel for attachable volumes") - break loop - } - case <-time.After(5 * time.Second): - break loop - } + if !isOperationRunSerially(ch, quit) { + t.Fatalf("Mount operations should not start concurrently for attachable volumes") } - close(quit) } -type mockOperationGenerator struct { +func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins(t *testing.T) { + // Arrange + ch, quit, oe := setup() + volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmount+numNonAttachableVolumesToUnmount) + pdName := "pd-volume" + secretName := "secret-volume" + + // Act + for i := 0; i < numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount; i++ { + podName := "pod-" + strconv.Itoa(i+1) + if i < numNonAttachableVolumesToUnmount { + pod := getTestPodWithSecret(podName, secretName) + volumesToUnmount[i] = MountedVolume{ + PodName: volumetypes.UniquePodName(podName), + VolumeName: v1.UniqueVolumeName(secretName), + PodUID: pod.UID, + } + } else { + pod := getTestPodWithGCEPD(podName, pdName) + volumesToUnmount[i] = MountedVolume{ + PodName: volumetypes.UniquePodName(podName), + VolumeName: v1.UniqueVolumeName(pdName), + PodUID: pod.UID, + } + } + oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */) + } + + // Assert + if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount) { + t.Fatalf("Unable to start unmount operations concurrently for volume plugins") + } +} + +func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) { + // Arrange + ch, quit, oe := setup() + attachedVolumes := make([]AttachedVolume, numDevicesToUnmount) + pdName := "pd-volume" + + // Act + for i := range attachedVolumes { + attachedVolumes[i] = AttachedVolume{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: "node-name", + } + oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */) + } + + // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("Unmount device operations should not start concurrently") + } +} + +func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) { + // Arrange + ch, quit, oe := setup() + volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) + pdName := "pd-volume" + + // Act + for i := range volumesToAttach { + volumesToAttach[i] = VolumeToAttach{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: "node", + } + oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("Attach volume operations should not start concurrently") + } +} + +func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) { + // Arrange + ch, quit, oe := setup() + attachedVolumes := make([]AttachedVolume, numVolumesToDetach) + pdName := "pd-volume" + + // Act + for i := range attachedVolumes { + attachedVolumes[i] = AttachedVolume{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: "node", + } + oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("DetachVolume operations should not run concurrently") + } +} + +func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) { + // Arrange + ch, quit, oe := setup() + + // Act + for i := 0; i < numVolumesToVerifyAttached; i++ { + oe.VerifyVolumesAreAttached(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) { + t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently") + } +} + +func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) { + // Arrange + ch, quit, oe := setup() + volumesToMount := make([]VolumeToMount, numVolumesToVerifyControllerAttached) + pdName := "pd-volume" + + // Act + for i := range volumesToMount { + volumesToMount[i] = VolumeToMount{ + VolumeName: v1.UniqueVolumeName(pdName), + } + oe.VerifyControllerAttachedVolume(volumesToMount[i], types.NodeName("node-name"), nil /* actualStateOfWorldMounterUpdater */) + } + + // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("VerifyControllerAttachedVolume should not run concurrently") + } +} + +type fakeOperationGenerator struct { ch chan interface{} quit chan interface{} } -func newMockOperationGenerator(ch, quit chan interface{}) OperationGenerator { - return &mockOperationGenerator{ +func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator { + return &fakeOperationGenerator{ ch: ch, quit: quit, } } -func (mopg *mockOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, error) { +func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, error) { return func() error { - mopg.ch <- nil - // Blocks until the assertion is complete - <-mopg.quit + startOperationAndBlock(fopg.ch, fopg.quit) return nil }, nil } -func (mopg *mockOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) { - return func() error { return nil }, nil +func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) { + return func() error { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil + }, nil } -func (mopg *mockOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { - return func() error { return nil }, nil +func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + return func() error { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil + }, nil } -func (mopg *mockOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { - return func() error { return nil }, nil +func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + return func() error { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil + }, nil } -func (mopg *mockOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { - return func() error { return nil }, nil +func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + return func() error { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil + }, nil } -func (mopg *mockOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) { - return func() error { return nil }, nil +func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) { + return func() error { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil + }, nil } -func (mopg *mockOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { - return func() error { return nil }, nil +func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + return func() error { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil + }, nil } func getTestPodWithSecret(podName, secretName string) *v1.Pod { @@ -190,7 +320,7 @@ func getTestPodWithGCEPD(podName, pdName string) *v1.Pod { return &v1.Pod{ ObjectMeta: v1.ObjectMeta{ Name: podName, - UID: types.UID(podName + string(uuid.New())), + UID: types.UID(podName + string(uuid.NewUUID())), }, Spec: v1.PodSpec{ Volumes: []v1.Volume{ @@ -224,3 +354,51 @@ func getTestPodWithGCEPD(podName, pdName string) *v1.Pod { }, } } + +func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool { + defer close(quit) + numOperationsStarted := 0 +loop: + for { + select { + case <-ch: + numOperationsStarted++ + if numOperationsStarted > 1 { + return false + } + case <-time.After(5 * time.Second): + break loop + } + } + return true +} + +func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool { + defer close(quit) + numOperationsStarted := 0 +loop: + for { + select { + case <-ch: + numOperationsStarted++ + if numOperationsStarted == numOperationsToRun { + return true + } + case <-time.After(5 * time.Second): + break loop + } + } + return false +} + +func setup() (chan interface{}, chan interface{}, OperationExecutor) { + ch, quit := make(chan interface{}), make(chan interface{}) + return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit)) +} + +// This function starts by writing to ch and blocks on the quit channel +// until it is closed by the currently running test +func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) { + ch <- nil + <-quit +}