mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #39288 from rkouj/unit-test-operation-executor
Automatic merge from submit-queue Add unit tests for operation_executor Add unit test for `Unmount operations should start in parallel for all volume plugins` cc: @saad-ali
This commit is contained in:
commit
fd7408d076
@ -41,6 +41,7 @@ go_test(
|
|||||||
"//pkg/api/v1:go_default_library",
|
"//pkg/api/v1:go_default_library",
|
||||||
"//pkg/types:go_default_library",
|
"//pkg/types:go_default_library",
|
||||||
"//pkg/util/mount:go_default_library",
|
"//pkg/util/mount:go_default_library",
|
||||||
"//vendor:k8s.io/client-go/_vendor/github.com/pborman/uuid",
|
"//pkg/util/uuid:go_default_library",
|
||||||
|
"//pkg/volume/util/types:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -17,25 +17,32 @@ limitations under the License.
|
|||||||
package operationexecutor
|
package operationexecutor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"k8s.io/client-go/_vendor/github.com/pborman/uuid"
|
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/util/mount"
|
"k8s.io/kubernetes/pkg/util/mount"
|
||||||
|
"k8s.io/kubernetes/pkg/util/uuid"
|
||||||
|
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const numVolumesToMount = 2
|
const (
|
||||||
|
numVolumesToMount = 2
|
||||||
|
numAttachableVolumesToUnmount = 2
|
||||||
|
numNonAttachableVolumesToUnmount = 2
|
||||||
|
numDevicesToUnmount = 2
|
||||||
|
numVolumesToAttach = 2
|
||||||
|
numVolumesToDetach = 2
|
||||||
|
numVolumesToVerifyAttached = 2
|
||||||
|
numVolumesToVerifyControllerAttached = 2
|
||||||
|
)
|
||||||
|
|
||||||
var _ OperationGenerator = &mockOperationGenerator{}
|
var _ OperationGenerator = &fakeOperationGenerator{}
|
||||||
|
|
||||||
func TestOperationExecutor_MountVolume_ParallelMountForNonAttachablePlugins(t *testing.T) {
|
func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachablePlugins(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
ch, quit := make(chan interface{}), make(chan interface{})
|
ch, quit, oe := setup()
|
||||||
numMountOperationsStarted := 0
|
|
||||||
mopg := newMockOperationGenerator(ch, quit)
|
|
||||||
oe := NewOperationExecutor(mopg)
|
|
||||||
volumesToMount := make([]VolumeToMount, numVolumesToMount)
|
volumesToMount := make([]VolumeToMount, numVolumesToMount)
|
||||||
secretName := "secret-volume"
|
secretName := "secret-volume"
|
||||||
volumeName := v1.UniqueVolumeName(secretName)
|
volumeName := v1.UniqueVolumeName(secretName)
|
||||||
@ -54,29 +61,15 @@ func TestOperationExecutor_MountVolume_ParallelMountForNonAttachablePlugins(t *t
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
loop:
|
if !isOperationRunConcurrently(ch, quit, numVolumesToMount) {
|
||||||
for {
|
t.Fatalf("Unable to start mount operations in Concurrent for non-attachable volumes")
|
||||||
select {
|
|
||||||
case <-ch:
|
|
||||||
numMountOperationsStarted++
|
|
||||||
if numMountOperationsStarted == numVolumesToMount {
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
t.Fatalf("Unable to start mount operations in parallel for non-attachable volumes")
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
close(quit)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOperationExecutor_MountVolume_ParallelMountForAttachablePlugins(t *testing.T) {
|
func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
ch, quit := make(chan interface{}), make(chan interface{})
|
ch, quit, oe := setup()
|
||||||
numMountOperationsStarted := 0
|
volumesToMount := make([]VolumeToMount, numVolumesToAttach)
|
||||||
mopg := newMockOperationGenerator(ch, quit)
|
|
||||||
oe := NewOperationExecutor(mopg)
|
|
||||||
volumesToMount := make([]VolumeToMount, numVolumesToMount)
|
|
||||||
pdName := "pd-volume"
|
pdName := "pd-volume"
|
||||||
volumeName := v1.UniqueVolumeName(pdName)
|
volumeName := v1.UniqueVolumeName(pdName)
|
||||||
|
|
||||||
@ -94,59 +87,196 @@ func TestOperationExecutor_MountVolume_ParallelMountForAttachablePlugins(t *test
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
loop:
|
if !isOperationRunSerially(ch, quit) {
|
||||||
for {
|
t.Fatalf("Mount operations should not start concurrently for attachable volumes")
|
||||||
select {
|
|
||||||
case <-ch:
|
|
||||||
numMountOperationsStarted++
|
|
||||||
if numMountOperationsStarted > 1 {
|
|
||||||
t.Fatalf("Mount operations should not start in parallel for attachable volumes")
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
close(quit)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockOperationGenerator struct {
|
func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
ch, quit, oe := setup()
|
||||||
|
volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmount+numNonAttachableVolumesToUnmount)
|
||||||
|
pdName := "pd-volume"
|
||||||
|
secretName := "secret-volume"
|
||||||
|
|
||||||
|
// Act
|
||||||
|
for i := 0; i < numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount; i++ {
|
||||||
|
podName := "pod-" + strconv.Itoa(i+1)
|
||||||
|
if i < numNonAttachableVolumesToUnmount {
|
||||||
|
pod := getTestPodWithSecret(podName, secretName)
|
||||||
|
volumesToUnmount[i] = MountedVolume{
|
||||||
|
PodName: volumetypes.UniquePodName(podName),
|
||||||
|
VolumeName: v1.UniqueVolumeName(secretName),
|
||||||
|
PodUID: pod.UID,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pod := getTestPodWithGCEPD(podName, pdName)
|
||||||
|
volumesToUnmount[i] = MountedVolume{
|
||||||
|
PodName: volumetypes.UniquePodName(podName),
|
||||||
|
VolumeName: v1.UniqueVolumeName(pdName),
|
||||||
|
PodUID: pod.UID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount) {
|
||||||
|
t.Fatalf("Unable to start unmount operations concurrently for volume plugins")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
ch, quit, oe := setup()
|
||||||
|
attachedVolumes := make([]AttachedVolume, numDevicesToUnmount)
|
||||||
|
pdName := "pd-volume"
|
||||||
|
|
||||||
|
// Act
|
||||||
|
for i := range attachedVolumes {
|
||||||
|
attachedVolumes[i] = AttachedVolume{
|
||||||
|
VolumeName: v1.UniqueVolumeName(pdName),
|
||||||
|
NodeName: "node-name",
|
||||||
|
}
|
||||||
|
oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
if !isOperationRunSerially(ch, quit) {
|
||||||
|
t.Fatalf("Unmount device operations should not start concurrently")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
ch, quit, oe := setup()
|
||||||
|
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
|
||||||
|
pdName := "pd-volume"
|
||||||
|
|
||||||
|
// Act
|
||||||
|
for i := range volumesToAttach {
|
||||||
|
volumesToAttach[i] = VolumeToAttach{
|
||||||
|
VolumeName: v1.UniqueVolumeName(pdName),
|
||||||
|
NodeName: "node",
|
||||||
|
}
|
||||||
|
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
if !isOperationRunSerially(ch, quit) {
|
||||||
|
t.Fatalf("Attach volume operations should not start concurrently")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
ch, quit, oe := setup()
|
||||||
|
attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
|
||||||
|
pdName := "pd-volume"
|
||||||
|
|
||||||
|
// Act
|
||||||
|
for i := range attachedVolumes {
|
||||||
|
attachedVolumes[i] = AttachedVolume{
|
||||||
|
VolumeName: v1.UniqueVolumeName(pdName),
|
||||||
|
NodeName: "node",
|
||||||
|
}
|
||||||
|
oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
if !isOperationRunSerially(ch, quit) {
|
||||||
|
t.Fatalf("DetachVolume operations should not run concurrently")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
ch, quit, oe := setup()
|
||||||
|
|
||||||
|
// Act
|
||||||
|
for i := 0; i < numVolumesToVerifyAttached; i++ {
|
||||||
|
oe.VerifyVolumesAreAttached(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
|
||||||
|
t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
ch, quit, oe := setup()
|
||||||
|
volumesToMount := make([]VolumeToMount, numVolumesToVerifyControllerAttached)
|
||||||
|
pdName := "pd-volume"
|
||||||
|
|
||||||
|
// Act
|
||||||
|
for i := range volumesToMount {
|
||||||
|
volumesToMount[i] = VolumeToMount{
|
||||||
|
VolumeName: v1.UniqueVolumeName(pdName),
|
||||||
|
}
|
||||||
|
oe.VerifyControllerAttachedVolume(volumesToMount[i], types.NodeName("node-name"), nil /* actualStateOfWorldMounterUpdater */)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
if !isOperationRunSerially(ch, quit) {
|
||||||
|
t.Fatalf("VerifyControllerAttachedVolume should not run concurrently")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeOperationGenerator struct {
|
||||||
ch chan interface{}
|
ch chan interface{}
|
||||||
quit chan interface{}
|
quit chan interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMockOperationGenerator(ch, quit chan interface{}) OperationGenerator {
|
func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator {
|
||||||
return &mockOperationGenerator{
|
return &fakeOperationGenerator{
|
||||||
ch: ch,
|
ch: ch,
|
||||||
quit: quit,
|
quit: quit,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mopg *mockOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, error) {
|
func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, error) {
|
||||||
return func() error {
|
return func() error {
|
||||||
mopg.ch <- nil
|
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||||
// Blocks until the assertion is complete
|
|
||||||
<-mopg.quit
|
|
||||||
return nil
|
return nil
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
func (mopg *mockOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) {
|
func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) {
|
||||||
return func() error { return nil }, nil
|
return func() error {
|
||||||
|
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||||
|
return nil
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
func (mopg *mockOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||||
return func() error { return nil }, nil
|
return func() error {
|
||||||
|
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||||
|
return nil
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
func (mopg *mockOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||||
return func() error { return nil }, nil
|
return func() error {
|
||||||
|
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||||
|
return nil
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
func (mopg *mockOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||||
return func() error { return nil }, nil
|
return func() error {
|
||||||
|
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||||
|
return nil
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
func (mopg *mockOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) {
|
func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) {
|
||||||
return func() error { return nil }, nil
|
return func() error {
|
||||||
|
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||||
|
return nil
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
func (mopg *mockOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||||
return func() error { return nil }, nil
|
return func() error {
|
||||||
|
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||||
|
return nil
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTestPodWithSecret(podName, secretName string) *v1.Pod {
|
func getTestPodWithSecret(podName, secretName string) *v1.Pod {
|
||||||
@ -190,7 +320,7 @@ func getTestPodWithGCEPD(podName, pdName string) *v1.Pod {
|
|||||||
return &v1.Pod{
|
return &v1.Pod{
|
||||||
ObjectMeta: v1.ObjectMeta{
|
ObjectMeta: v1.ObjectMeta{
|
||||||
Name: podName,
|
Name: podName,
|
||||||
UID: types.UID(podName + string(uuid.New())),
|
UID: types.UID(podName + string(uuid.NewUUID())),
|
||||||
},
|
},
|
||||||
Spec: v1.PodSpec{
|
Spec: v1.PodSpec{
|
||||||
Volumes: []v1.Volume{
|
Volumes: []v1.Volume{
|
||||||
@ -224,3 +354,51 @@ func getTestPodWithGCEPD(podName, pdName string) *v1.Pod {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool {
|
||||||
|
defer close(quit)
|
||||||
|
numOperationsStarted := 0
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
numOperationsStarted++
|
||||||
|
if numOperationsStarted > 1 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool {
|
||||||
|
defer close(quit)
|
||||||
|
numOperationsStarted := 0
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
numOperationsStarted++
|
||||||
|
if numOperationsStarted == numOperationsToRun {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func setup() (chan interface{}, chan interface{}, OperationExecutor) {
|
||||||
|
ch, quit := make(chan interface{}), make(chan interface{})
|
||||||
|
return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function starts by writing to ch and blocks on the quit channel
|
||||||
|
// until it is closed by the currently running test
|
||||||
|
func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) {
|
||||||
|
ch <- nil
|
||||||
|
<-quit
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user