mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Add unit tests for operation_executor
This commit is contained in:
parent
a8b168a77a
commit
7ebcab8c19
@ -41,6 +41,7 @@ go_test(
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/types:go_default_library",
|
||||
"//pkg/util/mount:go_default_library",
|
||||
"//vendor:k8s.io/client-go/_vendor/github.com/pborman/uuid",
|
||||
"//pkg/util/uuid:go_default_library",
|
||||
"//pkg/volume/util/types:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -17,25 +17,32 @@ limitations under the License.
|
||||
package operationexecutor
|
||||
|
||||
import (
|
||||
"k8s.io/client-go/_vendor/github.com/pborman/uuid"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/util/uuid"
|
||||
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const numVolumesToMount = 2
|
||||
const (
|
||||
numVolumesToMount = 2
|
||||
numAttachableVolumesToUnmount = 2
|
||||
numNonAttachableVolumesToUnmount = 2
|
||||
numDevicesToUnmount = 2
|
||||
numVolumesToAttach = 2
|
||||
numVolumesToDetach = 2
|
||||
numVolumesToVerifyAttached = 2
|
||||
numVolumesToVerifyControllerAttached = 2
|
||||
)
|
||||
|
||||
var _ OperationGenerator = &mockOperationGenerator{}
|
||||
var _ OperationGenerator = &fakeOperationGenerator{}
|
||||
|
||||
func TestOperationExecutor_MountVolume_ParallelMountForNonAttachablePlugins(t *testing.T) {
|
||||
func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachablePlugins(t *testing.T) {
|
||||
// Arrange
|
||||
ch, quit := make(chan interface{}), make(chan interface{})
|
||||
numMountOperationsStarted := 0
|
||||
mopg := newMockOperationGenerator(ch, quit)
|
||||
oe := NewOperationExecutor(mopg)
|
||||
ch, quit, oe := setup()
|
||||
volumesToMount := make([]VolumeToMount, numVolumesToMount)
|
||||
secretName := "secret-volume"
|
||||
volumeName := v1.UniqueVolumeName(secretName)
|
||||
@ -54,29 +61,15 @@ func TestOperationExecutor_MountVolume_ParallelMountForNonAttachablePlugins(t *t
|
||||
}
|
||||
|
||||
// Assert
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
numMountOperationsStarted++
|
||||
if numMountOperationsStarted == numVolumesToMount {
|
||||
break loop
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Unable to start mount operations in parallel for non-attachable volumes")
|
||||
break loop
|
||||
}
|
||||
if !isOperationRunConcurrently(ch, quit, numVolumesToMount) {
|
||||
t.Fatalf("Unable to start mount operations in Concurrent for non-attachable volumes")
|
||||
}
|
||||
close(quit)
|
||||
}
|
||||
|
||||
func TestOperationExecutor_MountVolume_ParallelMountForAttachablePlugins(t *testing.T) {
|
||||
func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins(t *testing.T) {
|
||||
// Arrange
|
||||
ch, quit := make(chan interface{}), make(chan interface{})
|
||||
numMountOperationsStarted := 0
|
||||
mopg := newMockOperationGenerator(ch, quit)
|
||||
oe := NewOperationExecutor(mopg)
|
||||
volumesToMount := make([]VolumeToMount, numVolumesToMount)
|
||||
ch, quit, oe := setup()
|
||||
volumesToMount := make([]VolumeToMount, numVolumesToAttach)
|
||||
pdName := "pd-volume"
|
||||
volumeName := v1.UniqueVolumeName(pdName)
|
||||
|
||||
@ -94,59 +87,196 @@ func TestOperationExecutor_MountVolume_ParallelMountForAttachablePlugins(t *test
|
||||
}
|
||||
|
||||
// Assert
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
numMountOperationsStarted++
|
||||
if numMountOperationsStarted > 1 {
|
||||
t.Fatalf("Mount operations should not start in parallel for attachable volumes")
|
||||
break loop
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
break loop
|
||||
}
|
||||
if !isOperationRunSerially(ch, quit) {
|
||||
t.Fatalf("Mount operations should not start concurrently for attachable volumes")
|
||||
}
|
||||
close(quit)
|
||||
}
|
||||
|
||||
type mockOperationGenerator struct {
|
||||
func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins(t *testing.T) {
|
||||
// Arrange
|
||||
ch, quit, oe := setup()
|
||||
volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmount+numNonAttachableVolumesToUnmount)
|
||||
pdName := "pd-volume"
|
||||
secretName := "secret-volume"
|
||||
|
||||
// Act
|
||||
for i := 0; i < numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount; i++ {
|
||||
podName := "pod-" + strconv.Itoa(i+1)
|
||||
if i < numNonAttachableVolumesToUnmount {
|
||||
pod := getTestPodWithSecret(podName, secretName)
|
||||
volumesToUnmount[i] = MountedVolume{
|
||||
PodName: volumetypes.UniquePodName(podName),
|
||||
VolumeName: v1.UniqueVolumeName(secretName),
|
||||
PodUID: pod.UID,
|
||||
}
|
||||
} else {
|
||||
pod := getTestPodWithGCEPD(podName, pdName)
|
||||
volumesToUnmount[i] = MountedVolume{
|
||||
PodName: volumetypes.UniquePodName(podName),
|
||||
VolumeName: v1.UniqueVolumeName(pdName),
|
||||
PodUID: pod.UID,
|
||||
}
|
||||
}
|
||||
oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */)
|
||||
}
|
||||
|
||||
// Assert
|
||||
if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount) {
|
||||
t.Fatalf("Unable to start unmount operations concurrently for volume plugins")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) {
|
||||
// Arrange
|
||||
ch, quit, oe := setup()
|
||||
attachedVolumes := make([]AttachedVolume, numDevicesToUnmount)
|
||||
pdName := "pd-volume"
|
||||
|
||||
// Act
|
||||
for i := range attachedVolumes {
|
||||
attachedVolumes[i] = AttachedVolume{
|
||||
VolumeName: v1.UniqueVolumeName(pdName),
|
||||
NodeName: "node-name",
|
||||
}
|
||||
oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */)
|
||||
}
|
||||
|
||||
// Assert
|
||||
if !isOperationRunSerially(ch, quit) {
|
||||
t.Fatalf("Unmount device operations should not start concurrently")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) {
|
||||
// Arrange
|
||||
ch, quit, oe := setup()
|
||||
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
|
||||
pdName := "pd-volume"
|
||||
|
||||
// Act
|
||||
for i := range volumesToAttach {
|
||||
volumesToAttach[i] = VolumeToAttach{
|
||||
VolumeName: v1.UniqueVolumeName(pdName),
|
||||
NodeName: "node",
|
||||
}
|
||||
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
|
||||
}
|
||||
|
||||
// Assert
|
||||
if !isOperationRunSerially(ch, quit) {
|
||||
t.Fatalf("Attach volume operations should not start concurrently")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) {
|
||||
// Arrange
|
||||
ch, quit, oe := setup()
|
||||
attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
|
||||
pdName := "pd-volume"
|
||||
|
||||
// Act
|
||||
for i := range attachedVolumes {
|
||||
attachedVolumes[i] = AttachedVolume{
|
||||
VolumeName: v1.UniqueVolumeName(pdName),
|
||||
NodeName: "node",
|
||||
}
|
||||
oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
|
||||
}
|
||||
|
||||
// Assert
|
||||
if !isOperationRunSerially(ch, quit) {
|
||||
t.Fatalf("DetachVolume operations should not run concurrently")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
|
||||
// Arrange
|
||||
ch, quit, oe := setup()
|
||||
|
||||
// Act
|
||||
for i := 0; i < numVolumesToVerifyAttached; i++ {
|
||||
oe.VerifyVolumesAreAttached(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
|
||||
}
|
||||
|
||||
// Assert
|
||||
if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
|
||||
t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) {
|
||||
// Arrange
|
||||
ch, quit, oe := setup()
|
||||
volumesToMount := make([]VolumeToMount, numVolumesToVerifyControllerAttached)
|
||||
pdName := "pd-volume"
|
||||
|
||||
// Act
|
||||
for i := range volumesToMount {
|
||||
volumesToMount[i] = VolumeToMount{
|
||||
VolumeName: v1.UniqueVolumeName(pdName),
|
||||
}
|
||||
oe.VerifyControllerAttachedVolume(volumesToMount[i], types.NodeName("node-name"), nil /* actualStateOfWorldMounterUpdater */)
|
||||
}
|
||||
|
||||
// Assert
|
||||
if !isOperationRunSerially(ch, quit) {
|
||||
t.Fatalf("VerifyControllerAttachedVolume should not run concurrently")
|
||||
}
|
||||
}
|
||||
|
||||
type fakeOperationGenerator struct {
|
||||
ch chan interface{}
|
||||
quit chan interface{}
|
||||
}
|
||||
|
||||
func newMockOperationGenerator(ch, quit chan interface{}) OperationGenerator {
|
||||
return &mockOperationGenerator{
|
||||
func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator {
|
||||
return &fakeOperationGenerator{
|
||||
ch: ch,
|
||||
quit: quit,
|
||||
}
|
||||
}
|
||||
|
||||
func (mopg *mockOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, error) {
|
||||
func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, error) {
|
||||
return func() error {
|
||||
mopg.ch <- nil
|
||||
// Blocks until the assertion is complete
|
||||
<-mopg.quit
|
||||
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
func (mopg *mockOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) {
|
||||
return func() error { return nil }, nil
|
||||
func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) {
|
||||
return func() error {
|
||||
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
func (mopg *mockOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
return func() error { return nil }, nil
|
||||
func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
return func() error {
|
||||
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
func (mopg *mockOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
return func() error { return nil }, nil
|
||||
func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
return func() error {
|
||||
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
func (mopg *mockOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
return func() error { return nil }, nil
|
||||
func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
return func() error {
|
||||
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
func (mopg *mockOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) {
|
||||
return func() error { return nil }, nil
|
||||
func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) {
|
||||
return func() error {
|
||||
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
func (mopg *mockOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
return func() error { return nil }, nil
|
||||
func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
return func() error {
|
||||
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getTestPodWithSecret(podName, secretName string) *v1.Pod {
|
||||
@ -190,7 +320,7 @@ func getTestPodWithGCEPD(podName, pdName string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: podName,
|
||||
UID: types.UID(podName + string(uuid.New())),
|
||||
UID: types.UID(podName + string(uuid.NewUUID())),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
@ -224,3 +354,51 @@ func getTestPodWithGCEPD(podName, pdName string) *v1.Pod {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool {
|
||||
defer close(quit)
|
||||
numOperationsStarted := 0
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
numOperationsStarted++
|
||||
if numOperationsStarted > 1 {
|
||||
return false
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
break loop
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool {
|
||||
defer close(quit)
|
||||
numOperationsStarted := 0
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
numOperationsStarted++
|
||||
if numOperationsStarted == numOperationsToRun {
|
||||
return true
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
break loop
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func setup() (chan interface{}, chan interface{}, OperationExecutor) {
|
||||
ch, quit := make(chan interface{}), make(chan interface{})
|
||||
return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
|
||||
}
|
||||
|
||||
// This function starts by writing to ch and blocks on the quit channel
|
||||
// until it is closed by the currently running test
|
||||
func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) {
|
||||
ch <- nil
|
||||
<-quit
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user