diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go index 5ebc3e13e14..198c6295028 100644 --- a/test/integration/persistent_volumes_test.go +++ b/test/integration/persistent_volumes_test.go @@ -22,10 +22,13 @@ import ( "fmt" "math/rand" "net/http/httptest" + "os" "strconv" "testing" "time" + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/testapi" @@ -44,6 +47,61 @@ func init() { requireEtcd() } +// Several tests in this file are configurable by enviroment variables: +// KUBE_INTEGRATION_PV_OBJECTS - nr. of PVs/PVCs to be created +// (100 by default) +// KUBE_INTEGRATION_PV_SYNC_PERIOD - volume controller sync period +// (10s by default) +// KUBE_INTEGRATION_PV_END_SLEEP - for how long should +// TestPersistentVolumeMultiPVsPVCs sleep when it's finished (0s by +// default). This is useful to test how long does it take for periodic sync +// to process bound PVs/PVCs. +// +const defaultObjectCount = 100 +const defaultSyncPeriod = 10 * time.Second + +func getObjectCount() int { + objectCount := defaultObjectCount + if s := os.Getenv("KUBE_INTEGRATION_PV_OBJECTS"); s != "" { + var err error + objectCount, err = strconv.Atoi(s) + if err != nil { + glog.Fatalf("cannot parse value of KUBE_INTEGRATION_PV_OBJECTS: %v", err) + } + } + glog.V(2).Infof("using KUBE_INTEGRATION_PV_OBJECTS=%d", objectCount) + return objectCount +} + +func getSyncPeriod() time.Duration { + period := defaultSyncPeriod + if s := os.Getenv("KUBE_INTEGRATION_PV_SYNC_PERIOD"); s != "" { + var err error + period, err = time.ParseDuration(s) + if err != nil { + glog.Fatalf("cannot parse value of KUBE_INTEGRATION_PV_SYNC_PERIOD: %v", err) + } + } + glog.V(2).Infof("using KUBE_INTEGRATION_PV_SYNC_PERIOD=%v", period) + return period +} + +func testSleep() { + var period time.Duration + if s := os.Getenv("KUBE_INTEGRATION_PV_END_SLEEP"); s != "" { + var err error + period, err = time.ParseDuration(s) + if err != nil { + glog.Fatalf("cannot parse value of KUBE_INTEGRATION_PV_END_SLEEP: %v", err) + } + } + glog.V(2).Infof("using KUBE_INTEGRATION_PV_END_SLEEP=%v", period) + if period != 0 { + time.Sleep(period) + glog.V(2).Infof("sleep finished") + } +} + func TestPersistentVolumeRecycler(t *testing.T) { _, s := framework.RunAMaster(t) defer s.Close() @@ -72,16 +130,16 @@ func TestPersistentVolumeRecycler(t *testing.T) { } // wait until the controller pairs the volume and claim - waitForPersistentVolumePhase(watchPV, api.VolumeBound) - waitForPersistentVolumeClaimPhase(watchPVC, api.ClaimBound) + waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeBound) + waitForPersistentVolumeClaimPhase(testClient, pvc.Name, watchPVC, api.ClaimBound) // deleting a claim releases the volume, after which it can be recycled if err := testClient.PersistentVolumeClaims(api.NamespaceDefault).Delete(pvc.Name, nil); err != nil { t.Errorf("error deleting claim %s", pvc.Name) } - waitForPersistentVolumePhase(watchPV, api.VolumeReleased) - waitForPersistentVolumePhase(watchPV, api.VolumeAvailable) + waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeReleased) + waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeAvailable) // end of Recycler test. // Deleter test begins now. @@ -100,16 +158,15 @@ func TestPersistentVolumeRecycler(t *testing.T) { if err != nil { t.Errorf("Failed to create PersistentVolumeClaim: %v", err) } - - waitForPersistentVolumePhase(watchPV, api.VolumeBound) - waitForPersistentVolumeClaimPhase(watchPVC, api.ClaimBound) + waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeBound) + waitForPersistentVolumeClaimPhase(testClient, pvc.Name, watchPVC, api.ClaimBound) // deleting a claim releases the volume, after which it can be recycled if err := testClient.PersistentVolumeClaims(api.NamespaceDefault).Delete(pvc.Name, nil); err != nil { t.Errorf("error deleting claim %s", pvc.Name) } - waitForPersistentVolumePhase(watchPV, api.VolumeReleased) + waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeReleased) for { event := <-watchPV.ResultChan() @@ -149,8 +206,8 @@ func TestPersistentVolumeRecycler(t *testing.T) { t.Fatalf("Unexpected error creating pv: %v", err) } - waitForPersistentVolumePhase(watchPV, api.VolumeBound) - waitForPersistentVolumeClaimPhase(watchPVC, api.ClaimBound) + waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeBound) + waitForAnyPersistentVolumeClaimPhase(watchPVC, api.ClaimBound) pv, err = testClient.PersistentVolumes().Get(pv.Name) if err != nil { @@ -164,6 +221,8 @@ func TestPersistentVolumeRecycler(t *testing.T) { } } +// TestPersistentVolumeMultiPVs tests binding of one PVC to 100 PVs with +// different size. func TestPersistentVolumeMultiPVs(t *testing.T) { _, s := framework.RunAMaster(t) defer s.Close() @@ -176,7 +235,7 @@ func TestPersistentVolumeMultiPVs(t *testing.T) { controller.Run() defer controller.Stop() - maxPVs := 100 + maxPVs := getObjectCount() pvs := make([]*api.PersistentVolume, maxPVs) for i := 0; i < maxPVs; i++ { // This PV will be claimed, released, and deleted @@ -200,10 +259,10 @@ func TestPersistentVolumeMultiPVs(t *testing.T) { } t.Log("claim created") - // wait until the controller pairs the volume and claim - waitForPersistentVolumePhase(watchPV, api.VolumeBound) + // wait until the binder pairs the claim with a volume + waitForAnyPersistentVolumePhase(watchPV, api.VolumeBound) t.Log("volume bound") - waitForPersistentVolumeClaimPhase(watchPVC, api.ClaimBound) + waitForPersistentVolumeClaimPhase(testClient, pvc.Name, watchPVC, api.ClaimBound) t.Log("claim bound") // only one PV is bound @@ -239,12 +298,94 @@ func TestPersistentVolumeMultiPVs(t *testing.T) { } t.Log("claim deleted") - waitForPersistentVolumePhase(watchPV, api.VolumeReleased) + waitForAnyPersistentVolumePhase(watchPV, api.VolumeReleased) t.Log("volumes released") deleteAllEtcdKeys() } +// TestPersistentVolumeMultiPVsPVCs tests binding of 100 PVC to 100 PVs. +// This test is configurable by KUBE_INTEGRATION_PV_* variables. +func TestPersistentVolumeMultiPVsPVCs(t *testing.T) { + _, s := framework.RunAMaster(t) + defer s.Close() + + deleteAllEtcdKeys() + testClient, binder, watchPV, watchPVC := createClients(t, s) + defer watchPV.Stop() + defer watchPVC.Stop() + + binder.Run() + defer binder.Stop() + + objCount := getObjectCount() + pvs := make([]*api.PersistentVolume, objCount) + pvcs := make([]*api.PersistentVolumeClaim, objCount) + for i := 0; i < objCount; i++ { + // This PV will be claimed, released, and deleted + pvs[i] = createPV("pv-"+strconv.Itoa(i), "/tmp/foo"+strconv.Itoa(i), "1G", + []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimRetain) + pvcs[i] = createPVC("pvc-"+strconv.Itoa(i), "1G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}) + } + + // Create PVs first + glog.V(2).Infof("TestPersistentVolumeMultiPVsPVCs: start") + + // Create the volumes in a separate goroutine to pop events from + // watchPV early - it seems it has limited capacity and it gets stuck + // with >3000 volumes. + go func() { + for i := 0; i < objCount; i++ { + _, _ = testClient.PersistentVolumes().Create(pvs[i]) + } + }() + // Wait for them to get Available + for i := 0; i < objCount; i++ { + waitForAnyPersistentVolumePhase(watchPV, api.VolumeAvailable) + glog.V(1).Infof("%d volumes available", i+1) + } + glog.V(2).Infof("TestPersistentVolumeMultiPVsPVCs: volumes are Available") + + // Create the claims, again in a separate goroutine. + go func() { + for i := 0; i < objCount; i++ { + _, _ = testClient.PersistentVolumeClaims(api.NamespaceDefault).Create(pvcs[i]) + } + }() + + // wait until the binder pairs all volumes + for i := 0; i < objCount; i++ { + waitForAnyPersistentVolumeClaimPhase(watchPVC, api.ClaimBound) + glog.V(1).Infof("%d claims bound", i+1) + } + glog.V(2).Infof("TestPersistentVolumeMultiPVsPVCs: claims are bound") + + // check that everything is bound to something + for i := 0; i < objCount; i++ { + pv, err := testClient.PersistentVolumes().Get(pvs[i].Name) + if err != nil { + t.Fatalf("Unexpected error getting pv: %v", err) + } + if pv.Spec.ClaimRef == nil { + t.Fatalf("PV %q is not bound", pv.Name) + } + glog.V(2).Infof("PV %q is bound to PVC %q", pv.Name, pv.Spec.ClaimRef.Name) + + pvc, err := testClient.PersistentVolumeClaims(api.NamespaceDefault).Get(pvcs[i].Name) + if err != nil { + t.Fatalf("Unexpected error getting pvc: %v", err) + } + if pvc.Spec.VolumeName == "" { + t.Fatalf("PVC %q is not bound", pvc.Name) + } + glog.V(2).Infof("PVC %q is bound to PV %q", pvc.Name, pvc.Spec.VolumeName) + } + testSleep() + deleteAllEtcdKeys() +} + +// TestPersistentVolumeMultiPVsDiffAccessModes tests binding of one PVC to two +// PVs with different access modes. func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) { _, s := framework.RunAMaster(t) defer s.Close() @@ -282,9 +423,9 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) { t.Log("claim created") // wait until the controller pairs the volume and claim - waitForPersistentVolumePhase(watchPV, api.VolumeBound) + waitForAnyPersistentVolumePhase(watchPV, api.VolumeBound) t.Log("volume bound") - waitForPersistentVolumeClaimPhase(watchPVC, api.ClaimBound) + waitForPersistentVolumeClaimPhase(testClient, pvc.Name, watchPVC, api.ClaimBound) t.Log("claim bound") // only RWM PV is bound @@ -312,36 +453,88 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) { } t.Log("claim deleted") - waitForPersistentVolumePhase(watchPV, api.VolumeReleased) + waitForAnyPersistentVolumePhase(watchPV, api.VolumeReleased) t.Log("volume released") deleteAllEtcdKeys() } -func waitForPersistentVolumePhase(w watch.Interface, phase api.PersistentVolumePhase) { +func waitForPersistentVolumePhase(client *clientset.Clientset, pvName string, w watch.Interface, phase api.PersistentVolumePhase) { + // Check if the volume is already in requested phase + volume, err := client.Core().PersistentVolumes().Get(pvName) + if err == nil && volume.Status.Phase == phase { + return + } + + // Wait for the phase for { event := <-w.ResultChan() - volume := event.Object.(*api.PersistentVolume) - if volume.Status.Phase == phase { + volume, ok := event.Object.(*api.PersistentVolume) + if !ok { + continue + } + if volume.Status.Phase == phase && volume.Name == pvName { + glog.V(2).Infof("volume %q is %s", volume.Name, phase) break } } } -func waitForPersistentVolumeClaimPhase(w watch.Interface, phase api.PersistentVolumeClaimPhase) { +func waitForPersistentVolumeClaimPhase(client *clientset.Clientset, claimName string, w watch.Interface, phase api.PersistentVolumeClaimPhase) { + // Check if the claim is already in requested phase + claim, err := client.Core().PersistentVolumeClaims(api.NamespaceDefault).Get(claimName) + if err == nil && claim.Status.Phase == phase { + return + } + + // Wait for the phase for { event := <-w.ResultChan() - claim := event.Object.(*api.PersistentVolumeClaim) + claim, ok := event.Object.(*api.PersistentVolumeClaim) + if !ok { + continue + } + if claim.Status.Phase == phase && claim.Name == claimName { + glog.V(2).Infof("claim %q is %s", claim.Name, phase) + break + } + } +} + +func waitForAnyPersistentVolumePhase(w watch.Interface, phase api.PersistentVolumePhase) { + for { + event := <-w.ResultChan() + volume, ok := event.Object.(*api.PersistentVolume) + if !ok { + continue + } + if volume.Status.Phase == phase { + glog.V(2).Infof("volume %q is %s", volume.Name, phase) + break + } + } +} + +func waitForAnyPersistentVolumeClaimPhase(w watch.Interface, phase api.PersistentVolumeClaimPhase) { + for { + event := <-w.ResultChan() + claim, ok := event.Object.(*api.PersistentVolumeClaim) + if !ok { + continue + } if claim.Status.Phase == phase { + glog.V(2).Infof("claim %q is %s", claim.Name, phase) break } } } func createClients(t *testing.T, s *httptest.Server) (*clientset.Clientset, *persistentvolumecontroller.PersistentVolumeController, watch.Interface, watch.Interface) { - // Use higher QPS and Burst, there is a test for race condition below, which - // creates many claims and default values were too low. - testClient := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000, Burst: 100000}) + // Use higher QPS and Burst, there is a test for race conditions which + // creates many objects and default values were too low. + binderClient := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000000, Burst: 1000000}) + testClient := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000000, Burst: 1000000}) + host := volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil) plugins := []volume.VolumePlugin{&volumetest.FakeVolumePlugin{ PluginName: "plugin-name", @@ -356,7 +549,9 @@ func createClients(t *testing.T, s *httptest.Server) (*clientset.Clientset, *per Detachers: nil, }} cloud := &fake_cloud.FakeCloud{} - ctrl := persistentvolumecontroller.NewPersistentVolumeController(testClient, 10*time.Second, nil, plugins, cloud, "", nil, nil, nil) + + syncPeriod := getSyncPeriod() + ctrl := persistentvolumecontroller.NewPersistentVolumeController(binderClient, syncPeriod, nil, plugins, cloud, "", nil, nil, nil) watchPV, err := testClient.PersistentVolumes().Watch(api.ListOptions{}) if err != nil { @@ -384,7 +579,10 @@ func createPV(name, path, cap string, mode []api.PersistentVolumeAccessMode, rec func createPVC(name, cap string, mode []api.PersistentVolumeAccessMode) *api.PersistentVolumeClaim { return &api.PersistentVolumeClaim{ - ObjectMeta: api.ObjectMeta{Name: name}, + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: api.NamespaceDefault, + }, Spec: api.PersistentVolumeClaimSpec{ Resources: api.ResourceRequirements{Requests: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse(cap)}}, AccessModes: mode,