diff --git a/pkg/controller/volume/persistentvolume/delete_test.go b/pkg/controller/volume/persistentvolume/delete_test.go index 202d0d89abf..b535af0ba3f 100644 --- a/pkg/controller/volume/persistentvolume/delete_test.go +++ b/pkg/controller/volume/persistentvolume/delete_test.go @@ -22,6 +22,7 @@ import ( "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" + pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" ) // Test single call to syncVolume, expecting recycling to happen. @@ -91,11 +92,9 @@ func TestDeleteSync(t *testing.T) { noclaims, noclaims, noevents, noerrors, - wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) { + wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) { // Delete the volume before delete operation starts - reactor.lock.Lock() - delete(reactor.volumes, "volume8-6") - reactor.lock.Unlock() + reactor.DeleteVolume("volume8-6") }), }, { @@ -108,16 +107,12 @@ func TestDeleteSync(t *testing.T) { noclaims, newClaimArray("claim8-7", "uid8-7", "10Gi", "volume8-7", v1.ClaimBound, nil), noevents, noerrors, - wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) { - reactor.lock.Lock() - defer reactor.lock.Unlock() + wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) { // Bind the volume to resurrected claim (this should never // happen) claim := newClaim("claim8-7", "uid8-7", "10Gi", "volume8-7", v1.ClaimBound, nil) - reactor.claims[claim.Name] = claim + reactor.AddClaimBoundToVolume(claim) ctrl.claims.Add(claim) - volume := reactor.volumes["volume8-7"] - volume.Status.Phase = v1.VolumeBound }), }, { @@ -141,7 +136,7 @@ func TestDeleteSync(t *testing.T) { noclaims, noclaims, noevents, noerrors, - func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { // Inject external deleter annotation test.initialVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test" test.expectedVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test" @@ -184,7 +179,7 @@ func TestDeleteSync(t *testing.T) { newClaimArray("claim8-12", "uid8-12", "10Gi", "volume8-12-2", v1.ClaimBound, nil), newClaimArray("claim8-12", "uid8-12", "10Gi", "volume8-12-2", v1.ClaimBound, nil), noevents, noerrors, - func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { // Inject external deleter annotation test.initialVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test" test.expectedVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test" diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index 0092ca229cf..c8d89ffc04c 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -17,12 +17,9 @@ limitations under the License. package persistentvolume import ( - "errors" "fmt" "reflect" - "strconv" "strings" - "sync" "sync/atomic" "testing" "time" @@ -31,27 +28,21 @@ import ( v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" - apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" corelisters "k8s.io/client-go/listers/core/v1" storagelisters "k8s.io/client-go/listers/storage/v1" - core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/features" + pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" vol "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util/recyclerclient" ) @@ -95,366 +86,75 @@ type controllerTest struct { // event message. expectedEvents []string // Errors to produce on matching action - errors []reactorError + errors []pvtesting.ReactorError // Function to call as the test. test testCall } -type testCall func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error +type testCall func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error const testNamespace = "default" const mockPluginName = "kubernetes.io/mock-volume" -var versionConflictError = errors.New("VersionError") var novolumes []*v1.PersistentVolume var noclaims []*v1.PersistentVolumeClaim var noevents = []string{} -var noerrors = []reactorError{} +var noerrors = []pvtesting.ReactorError{} -// volumeReactor is a core.Reactor that simulates etcd and API server. It -// stores: -// - Latest version of claims volumes saved by the controller. -// - Queue of all saves (to simulate "volume/claim updated" events). This queue -// contains all intermediate state of an object - e.g. a claim.VolumeName -// is updated first and claim.Phase second. This queue will then contain both -// updates as separate entries. -// - Number of changes since the last call to volumeReactor.syncAll(). -// - Optionally, volume and claim fake watchers which should be the same ones -// used by the controller. Any time an event function like deleteVolumeEvent -// is called to simulate an event, the reactor's stores are updated and the -// controller is sent the event via the fake watcher. -// - Optionally, list of error that should be returned by reactor, simulating -// etcd / API server failures. These errors are evaluated in order and every -// error is returned only once. I.e. when the reactor finds matching -// reactorError, it return appropriate error and removes the reactorError from -// the list. type volumeReactor struct { - volumes map[string]*v1.PersistentVolume - claims map[string]*v1.PersistentVolumeClaim - changedObjects []interface{} - changedSinceLastSync int - ctrl *PersistentVolumeController - fakeVolumeWatch *watch.FakeWatcher - fakeClaimWatch *watch.FakeWatcher - lock sync.Mutex - errors []reactorError - watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher + *pvtesting.VolumeReactor + ctrl *PersistentVolumeController } -// reactorError is an error that is returned by test reactor (=simulated -// etcd+/API server) when an action performed by the reactor matches given verb -// ("get", "update", "create", "delete" or "*"") on given resource -// ("persistentvolumes", "persistentvolumeclaims" or "*"). -type reactorError struct { - verb string - resource string - error error +func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []pvtesting.ReactorError) *volumeReactor { + return &volumeReactor{ + pvtesting.NewVolumeReactor(client, fakeVolumeWatch, fakeClaimWatch, errors), + ctrl, + } } -// React is a callback called by fake kubeClient from the controller. -// In other words, every claim/volume change performed by the controller ends -// here. -// This callback checks versions of the updated objects and refuse those that -// are too old (simulating real etcd). -// All updated objects are stored locally to keep track of object versions and -// to evaluate test results. -// All updated objects are also inserted into changedObjects queue and -// optionally sent back to the controller via its watchers. -func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Object, err error) { - r.lock.Lock() - defer r.lock.Unlock() - - klog.V(4).Infof("reactor got operation %q on %q", action.GetVerb(), action.GetResource()) - - // Inject error when requested - err = r.injectReactError(action) - if err != nil { - return true, nil, err +// waitForIdle waits until all tests, controllers and other goroutines do their +// job and no new actions are registered for 10 milliseconds. +func (r *volumeReactor) waitForIdle() { + r.ctrl.runningOperations.WaitForCompletion() + // Check every 10ms if the controller does something and stop if it's + // idle. + oldChanges := -1 + for { + time.Sleep(10 * time.Millisecond) + changes := r.GetChangeCount() + if changes == oldChanges { + // No changes for last 10ms -> controller must be idle. + break + } + oldChanges = changes } - - // Test did not request to inject an error, continue simulating API server. - switch { - case action.Matches("create", "persistentvolumes"): - obj := action.(core.UpdateAction).GetObject() - volume := obj.(*v1.PersistentVolume) - - // check the volume does not exist - _, found := r.volumes[volume.Name] - if found { - return true, nil, fmt.Errorf("Cannot create volume %s: volume already exists", volume.Name) - } - - // mimic apiserver defaulting - if volume.Spec.VolumeMode == nil && utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) { - volume.Spec.VolumeMode = new(v1.PersistentVolumeMode) - *volume.Spec.VolumeMode = v1.PersistentVolumeFilesystem - } - - // Store the updated object to appropriate places. - r.volumes[volume.Name] = volume - for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { - w.Add(volume) - } - r.changedObjects = append(r.changedObjects, volume) - r.changedSinceLastSync++ - klog.V(4).Infof("created volume %s", volume.Name) - return true, volume, nil - - case action.Matches("create", "persistentvolumeclaims"): - obj := action.(core.UpdateAction).GetObject() - claim := obj.(*v1.PersistentVolumeClaim) - - // check the claim does not exist - _, found := r.claims[claim.Name] - if found { - return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name) - } - - // Store the updated object to appropriate places. - r.claims[claim.Name] = claim - for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { - w.Add(claim) - } - r.changedObjects = append(r.changedObjects, claim) - r.changedSinceLastSync++ - klog.V(4).Infof("created claim %s", claim.Name) - return true, claim, nil - - case action.Matches("update", "persistentvolumes"): - obj := action.(core.UpdateAction).GetObject() - volume := obj.(*v1.PersistentVolume) - - // Check and bump object version - storedVolume, found := r.volumes[volume.Name] - if found { - storedVer, _ := strconv.Atoi(storedVolume.ResourceVersion) - requestedVer, _ := strconv.Atoi(volume.ResourceVersion) - if storedVer != requestedVer { - return true, obj, versionConflictError - } - if reflect.DeepEqual(storedVolume, volume) { - klog.V(4).Infof("nothing updated volume %s", volume.Name) - return true, volume, nil - } - // Don't modify the existing object - volume = volume.DeepCopy() - volume.ResourceVersion = strconv.Itoa(storedVer + 1) - } else { - return true, nil, fmt.Errorf("Cannot update volume %s: volume not found", volume.Name) - } - - // Store the updated object to appropriate places. - for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { - w.Modify(volume) - } - r.volumes[volume.Name] = volume - r.changedObjects = append(r.changedObjects, volume) - r.changedSinceLastSync++ - klog.V(4).Infof("saved updated volume %s", volume.Name) - return true, volume, nil - - case action.Matches("update", "persistentvolumeclaims"): - obj := action.(core.UpdateAction).GetObject() - claim := obj.(*v1.PersistentVolumeClaim) - - // Check and bump object version - storedClaim, found := r.claims[claim.Name] - if found { - storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion) - requestedVer, _ := strconv.Atoi(claim.ResourceVersion) - if storedVer != requestedVer { - return true, obj, versionConflictError - } - if reflect.DeepEqual(storedClaim, claim) { - klog.V(4).Infof("nothing updated claim %s", claim.Name) - return true, claim, nil - } - // Don't modify the existing object - claim = claim.DeepCopy() - claim.ResourceVersion = strconv.Itoa(storedVer + 1) - } else { - return true, nil, fmt.Errorf("Cannot update claim %s: claim not found", claim.Name) - } - - // Store the updated object to appropriate places. - for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { - w.Modify(claim) - } - r.claims[claim.Name] = claim - r.changedObjects = append(r.changedObjects, claim) - r.changedSinceLastSync++ - klog.V(4).Infof("saved updated claim %s", claim.Name) - return true, claim, nil - - case action.Matches("get", "persistentvolumes"): - name := action.(core.GetAction).GetName() - volume, found := r.volumes[name] - if found { - klog.V(4).Infof("GetVolume: found %s", volume.Name) - return true, volume.DeepCopy(), nil - } else { - klog.V(4).Infof("GetVolume: volume %s not found", name) - return true, nil, fmt.Errorf("Cannot find volume %s", name) - } - - case action.Matches("get", "persistentvolumeclaims"): - name := action.(core.GetAction).GetName() - claim, found := r.claims[name] - if found { - klog.V(4).Infof("GetClaim: found %s", claim.Name) - return true, claim.DeepCopy(), nil - } else { - klog.V(4).Infof("GetClaim: claim %s not found", name) - return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name) - } - - case action.Matches("delete", "persistentvolumes"): - name := action.(core.DeleteAction).GetName() - klog.V(4).Infof("deleted volume %s", name) - obj, found := r.volumes[name] - if found { - delete(r.volumes, name) - for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { - w.Delete(obj) - } - r.changedSinceLastSync++ - return true, nil, nil - } else { - return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name) - } - - case action.Matches("delete", "persistentvolumeclaims"): - name := action.(core.DeleteAction).GetName() - klog.V(4).Infof("deleted claim %s", name) - obj, found := r.claims[name] - if found { - delete(r.claims, name) - for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { - w.Delete(obj) - } - r.changedSinceLastSync++ - return true, nil, nil - } else { - return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name) - } - } - - return false, nil, nil } -// Watch watches objects from the volumeReactor. Watch returns a channel which -// will push added / modified / deleted object. -func (r *volumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) { - r.lock.Lock() - defer r.lock.Unlock() - - fakewatcher := watch.NewRaceFreeFake() - - if _, exists := r.watchers[gvr]; !exists { - r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher) +// waitTest waits until all tests, controllers and other goroutines do their +// job and list of current volumes/claims is equal to list of expected +// volumes/claims (with ~10 second timeout). +func (r *volumeReactor) waitTest(test controllerTest) error { + // start with 10 ms, multiply by 2 each step, 10 steps = 10.23 seconds + backoff := wait.Backoff{ + Duration: 10 * time.Millisecond, + Jitter: 0, + Factor: 2, + Steps: 10, } - r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher) - return fakewatcher, nil -} + err := wait.ExponentialBackoff(backoff, func() (done bool, err error) { + // Finish all operations that are in progress + r.ctrl.runningOperations.WaitForCompletion() -func (r *volumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher { - watches := []*watch.RaceFreeFakeWatcher{} - if r.watchers[gvr] != nil { - if w := r.watchers[gvr][ns]; w != nil { - watches = append(watches, w...) + // Return 'true' if the reactor reached the expected state + err1 := r.CheckClaims(test.expectedClaims) + err2 := r.CheckVolumes(test.expectedVolumes) + if err1 == nil && err2 == nil { + return true, nil } - if ns != metav1.NamespaceAll { - if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil { - watches = append(watches, w...) - } - } - } - return watches -} - -// injectReactError returns an error when the test requested given action to -// fail. nil is returned otherwise. -func (r *volumeReactor) injectReactError(action core.Action) error { - if len(r.errors) == 0 { - // No more errors to inject, everything should succeed. - return nil - } - - for i, expected := range r.errors { - klog.V(4).Infof("trying to match %q %q with %q %q", expected.verb, expected.resource, action.GetVerb(), action.GetResource()) - if action.Matches(expected.verb, expected.resource) { - // That's the action we're waiting for, remove it from injectedErrors - r.errors = append(r.errors[:i], r.errors[i+1:]...) - klog.V(4).Infof("reactor found matching error at index %d: %q %q, returning %v", i, expected.verb, expected.resource, expected.error) - return expected.error - } - } - return nil -} - -// checkVolumes compares all expectedVolumes with set of volumes at the end of -// the test and reports differences. -func (r *volumeReactor) checkVolumes(expectedVolumes []*v1.PersistentVolume) error { - r.lock.Lock() - defer r.lock.Unlock() - - expectedMap := make(map[string]*v1.PersistentVolume) - gotMap := make(map[string]*v1.PersistentVolume) - // Clear any ResourceVersion from both sets - for _, v := range expectedVolumes { - // Don't modify the existing object - v := v.DeepCopy() - v.ResourceVersion = "" - if v.Spec.ClaimRef != nil { - v.Spec.ClaimRef.ResourceVersion = "" - } - expectedMap[v.Name] = v - } - for _, v := range r.volumes { - // We must clone the volume because of golang race check - it was - // written by the controller without any locks on it. - v := v.DeepCopy() - v.ResourceVersion = "" - if v.Spec.ClaimRef != nil { - v.Spec.ClaimRef.ResourceVersion = "" - } - gotMap[v.Name] = v - } - if !reflect.DeepEqual(expectedMap, gotMap) { - // Print ugly but useful diff of expected and received objects for - // easier debugging. - return fmt.Errorf("Volume check failed [A-expected, B-got]: %s", diff.ObjectDiff(expectedMap, gotMap)) - } - return nil -} - -// checkClaims compares all expectedClaims with set of claims at the end of the -// test and reports differences. -func (r *volumeReactor) checkClaims(expectedClaims []*v1.PersistentVolumeClaim) error { - r.lock.Lock() - defer r.lock.Unlock() - - expectedMap := make(map[string]*v1.PersistentVolumeClaim) - gotMap := make(map[string]*v1.PersistentVolumeClaim) - for _, c := range expectedClaims { - // Don't modify the existing object - c = c.DeepCopy() - c.ResourceVersion = "" - expectedMap[c.Name] = c - } - for _, c := range r.claims { - // We must clone the claim because of golang race check - it was - // written by the controller without any locks on it. - c = c.DeepCopy() - c.ResourceVersion = "" - gotMap[c.Name] = c - } - if !reflect.DeepEqual(expectedMap, gotMap) { - // Print ugly but useful diff of expected and received objects for - // easier debugging. - return fmt.Errorf("Claim check failed [A-expected, B-got result]: %s", diff.ObjectDiff(expectedMap, gotMap)) - } - return nil + return false, nil + }) + return err } // checkEvents compares all expectedEvents with events generated during the test @@ -506,196 +206,6 @@ func checkEvents(t *testing.T, expectedEvents []string, ctrl *PersistentVolumeCo return err } -// popChange returns one recorded updated object, either *v1.PersistentVolume -// or *v1.PersistentVolumeClaim. Returns nil when there are no changes. -func (r *volumeReactor) popChange() interface{} { - r.lock.Lock() - defer r.lock.Unlock() - - if len(r.changedObjects) == 0 { - return nil - } - - // For debugging purposes, print the queue - for _, obj := range r.changedObjects { - switch obj.(type) { - case *v1.PersistentVolume: - vol, _ := obj.(*v1.PersistentVolume) - klog.V(4).Infof("reactor queue: %s", vol.Name) - case *v1.PersistentVolumeClaim: - claim, _ := obj.(*v1.PersistentVolumeClaim) - klog.V(4).Infof("reactor queue: %s", claim.Name) - } - } - - // Pop the first item from the queue and return it - obj := r.changedObjects[0] - r.changedObjects = r.changedObjects[1:] - return obj -} - -// syncAll simulates the controller periodic sync of volumes and claim. It -// simply adds all these objects to the internal queue of updates. This method -// should be used when the test manually calls syncClaim/syncVolume. Test that -// use real controller loop (ctrl.Run()) will get periodic sync automatically. -func (r *volumeReactor) syncAll() { - r.lock.Lock() - defer r.lock.Unlock() - - for _, c := range r.claims { - r.changedObjects = append(r.changedObjects, c) - } - for _, v := range r.volumes { - r.changedObjects = append(r.changedObjects, v) - } - r.changedSinceLastSync = 0 -} - -func (r *volumeReactor) getChangeCount() int { - r.lock.Lock() - defer r.lock.Unlock() - return r.changedSinceLastSync -} - -// waitForIdle waits until all tests, controllers and other goroutines do their -// job and no new actions are registered for 10 milliseconds. -func (r *volumeReactor) waitForIdle() { - r.ctrl.runningOperations.WaitForCompletion() - // Check every 10ms if the controller does something and stop if it's - // idle. - oldChanges := -1 - for { - time.Sleep(10 * time.Millisecond) - changes := r.getChangeCount() - if changes == oldChanges { - // No changes for last 10ms -> controller must be idle. - break - } - oldChanges = changes - } -} - -// waitTest waits until all tests, controllers and other goroutines do their -// job and list of current volumes/claims is equal to list of expected -// volumes/claims (with ~10 second timeout). -func (r *volumeReactor) waitTest(test controllerTest) error { - // start with 10 ms, multiply by 2 each step, 10 steps = 10.23 seconds - backoff := wait.Backoff{ - Duration: 10 * time.Millisecond, - Jitter: 0, - Factor: 2, - Steps: 10, - } - err := wait.ExponentialBackoff(backoff, func() (done bool, err error) { - // Finish all operations that are in progress - r.ctrl.runningOperations.WaitForCompletion() - - // Return 'true' if the reactor reached the expected state - err1 := r.checkClaims(test.expectedClaims) - err2 := r.checkVolumes(test.expectedVolumes) - if err1 == nil && err2 == nil { - return true, nil - } - return false, nil - }) - return err -} - -// deleteVolumeEvent simulates that a volume has been deleted in etcd and -// the controller receives 'volume deleted' event. -func (r *volumeReactor) deleteVolumeEvent(volume *v1.PersistentVolume) { - r.lock.Lock() - defer r.lock.Unlock() - - // Remove the volume from list of resulting volumes. - delete(r.volumes, volume.Name) - - // Generate deletion event. Cloned volume is needed to prevent races (and we - // would get a clone from etcd too). - if r.fakeVolumeWatch != nil { - r.fakeVolumeWatch.Delete(volume.DeepCopy()) - } -} - -// deleteClaimEvent simulates that a claim has been deleted in etcd and the -// controller receives 'claim deleted' event. -func (r *volumeReactor) deleteClaimEvent(claim *v1.PersistentVolumeClaim) { - r.lock.Lock() - defer r.lock.Unlock() - - // Remove the claim from list of resulting claims. - delete(r.claims, claim.Name) - - // Generate deletion event. Cloned volume is needed to prevent races (and we - // would get a clone from etcd too). - if r.fakeClaimWatch != nil { - r.fakeClaimWatch.Delete(claim.DeepCopy()) - } -} - -// addVolumeEvent simulates that a volume has been added in etcd and the -// controller receives 'volume added' event. -func (r *volumeReactor) addVolumeEvent(volume *v1.PersistentVolume) { - r.lock.Lock() - defer r.lock.Unlock() - - r.volumes[volume.Name] = volume - // Generate event. No cloning is needed, this claim is not stored in the - // controller cache yet. - if r.fakeVolumeWatch != nil { - r.fakeVolumeWatch.Add(volume) - } -} - -// modifyVolumeEvent simulates that a volume has been modified in etcd and the -// controller receives 'volume modified' event. -func (r *volumeReactor) modifyVolumeEvent(volume *v1.PersistentVolume) { - r.lock.Lock() - defer r.lock.Unlock() - - r.volumes[volume.Name] = volume - // Generate deletion event. Cloned volume is needed to prevent races (and we - // would get a clone from etcd too). - if r.fakeVolumeWatch != nil { - r.fakeVolumeWatch.Modify(volume.DeepCopy()) - } -} - -// addClaimEvent simulates that a claim has been deleted in etcd and the -// controller receives 'claim added' event. -func (r *volumeReactor) addClaimEvent(claim *v1.PersistentVolumeClaim) { - r.lock.Lock() - defer r.lock.Unlock() - - r.claims[claim.Name] = claim - // Generate event. No cloning is needed, this claim is not stored in the - // controller cache yet. - if r.fakeClaimWatch != nil { - r.fakeClaimWatch.Add(claim) - } -} - -func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []reactorError) *volumeReactor { - reactor := &volumeReactor{ - volumes: make(map[string]*v1.PersistentVolume), - claims: make(map[string]*v1.PersistentVolumeClaim), - ctrl: ctrl, - fakeVolumeWatch: fakeVolumeWatch, - fakeClaimWatch: fakeClaimWatch, - errors: errors, - watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher), - } - client.AddReactor("create", "persistentvolumes", reactor.React) - client.AddReactor("create", "persistentvolumeclaims", reactor.React) - client.AddReactor("update", "persistentvolumes", reactor.React) - client.AddReactor("update", "persistentvolumeclaims", reactor.React) - client.AddReactor("get", "persistentvolumes", reactor.React) - client.AddReactor("get", "persistentvolumeclaims", reactor.React) - client.AddReactor("delete", "persistentvolumes", reactor.React) - client.AddReactor("delete", "persistentvolumeclaims", reactor.React) - - return reactor -} func alwaysReady() bool { return true } func newTestController(kubeClient clientset.Interface, informerFactory informers.SharedInformerFactory, enableDynamicProvisioning bool) (*PersistentVolumeController, error) { @@ -915,11 +425,11 @@ func claimWithAccessMode(modes []v1.PersistentVolumeAccessMode, claims []*v1.Per return claims } -func testSyncClaim(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { +func testSyncClaim(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { return ctrl.syncClaim(test.initialClaims[0]) } -func testSyncClaimError(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { +func testSyncClaimError(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { err := ctrl.syncClaim(test.initialClaims[0]) if err != nil { @@ -928,7 +438,7 @@ func testSyncClaimError(ctrl *PersistentVolumeController, reactor *volumeReactor return fmt.Errorf("syncClaim succeeded when failure was expected") } -func testSyncVolume(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { +func testSyncVolume(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { return ctrl.syncVolume(test.initialVolumes[0]) } @@ -957,7 +467,7 @@ var ( // is deleted, recycled or provisioned. // - calls given testCall func wrapTestWithPluginCalls(expectedRecycleCalls, expectedDeleteCalls []error, expectedProvisionCalls []provisionCall, toWrap testCall) testCall { - return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + return func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { plugin := &mockVolumePlugin{ recycleCalls: expectedRecycleCalls, deleteCalls: expectedDeleteCalls, @@ -992,7 +502,7 @@ func wrapTestWithProvisionCalls(expectedProvisionCalls []provisionCall, toWrap t // - configures controller with a volume plugin that emulates CSI migration // - calls given testCall func wrapTestWithCSIMigrationProvisionCalls(toWrap testCall) testCall { - return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + return func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { plugin := &mockVolumePlugin{ isMigratedToCSI: true, } @@ -1010,9 +520,9 @@ func wrapTestWithCSIMigrationProvisionCalls(toWrap testCall) testCall { // injected function to simulate that something is happening when the // controller waits for the operation lock. Controller is then resumed and we // check how it behaves. -func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(ctrl *PersistentVolumeController, reactor *volumeReactor)) testCall { +func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor)) testCall { - return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + return func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { // Inject a hook before async operation starts ctrl.preOperationHook = func(operationName string) { // Inside the hook, run the function to inject @@ -1040,13 +550,13 @@ func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(c } } -func evaluateTestResults(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest, t *testing.T) { +func evaluateTestResults(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest, t *testing.T) { // Evaluate results - if err := reactor.checkClaims(test.expectedClaims); err != nil { + if err := reactor.CheckClaims(test.expectedClaims); err != nil { t.Errorf("Test %q: %v", test.name, err) } - if err := reactor.checkVolumes(test.expectedVolumes); err != nil { + if err := reactor.CheckVolumes(test.expectedVolumes); err != nil { t.Errorf("Test %q: %v", test.name, err) } @@ -1074,12 +584,12 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors) for _, claim := range test.initialClaims { ctrl.claims.Add(claim) - reactor.claims[claim.Name] = claim } for _, volume := range test.initialVolumes { ctrl.volumes.store.Add(volume) - reactor.volumes[volume.Name] = volume } + reactor.AddClaims(test.initialClaims) + reactor.AddVolumes(test.initialVolumes) // Inject classes into controller via a custom lister. indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) @@ -1095,7 +605,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag ctrl.podLister = corelisters.NewPodLister(podIndexer) // Run the tested functions - err = test.test(ctrl, reactor, test) + err = test.test(ctrl, reactor.VolumeReactor, test) if err != nil { t.Errorf("Test %q failed: %v", test.name, err) } @@ -1106,7 +616,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag t.Errorf("Test %q failed: %v", test.name, err) } - evaluateTestResults(ctrl, reactor, test, t) + evaluateTestResults(ctrl, reactor.VolumeReactor, test, t) } } @@ -1145,15 +655,15 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors) for _, claim := range test.initialClaims { ctrl.claims.Add(claim) - reactor.claims[claim.Name] = claim } for _, volume := range test.initialVolumes { ctrl.volumes.store.Add(volume) - reactor.volumes[volume.Name] = volume } + reactor.AddClaims(test.initialClaims) + reactor.AddVolumes(test.initialVolumes) // Run the tested function - err = test.test(ctrl, reactor, test) + err = test.test(ctrl, reactor.VolumeReactor, test) if err != nil { t.Errorf("Test %q failed: %v", test.name, err) } @@ -1174,16 +684,16 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s // Wait for all goroutines to finish reactor.waitForIdle() - obj := reactor.popChange() + obj := reactor.PopChange() if obj == nil { // Nothing was changed, should we exit? - if firstSync || reactor.changedSinceLastSync > 0 { + if firstSync || reactor.ChangedSinceLastSync() > 0 { // There were some changes after the last "periodic sync". // Simulate "periodic sync" of everything (until it produces // no changes). firstSync = false klog.V(4).Infof("test %q: simulating periodical sync of all claims and volumes", test.name) - reactor.syncAll() + reactor.SyncAll() } else { // Last sync did not produce any updates, the test reached // stable state -> finish. @@ -1201,7 +711,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s ctrl.claims.Update(claim) err = ctrl.syncClaim(claim) if err != nil { - if err == versionConflictError { + if err == pvtesting.VersionConflictError { // Ignore version errors klog.V(4).Infof("test intentionaly ignores version error.") } else { @@ -1218,7 +728,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s ctrl.volumes.store.Update(volume) err = ctrl.syncVolume(volume) if err != nil { - if err == versionConflictError { + if err == pvtesting.VersionConflictError { // Ignore version errors klog.V(4).Infof("test intentionaly ignores version error.") } else { @@ -1231,7 +741,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s continue } } - evaluateTestResults(ctrl, reactor, test, t) + evaluateTestResults(ctrl, reactor.VolumeReactor, test, t) klog.V(4).Infof("test %q finished after %d iterations", test.name, counter) } } diff --git a/pkg/controller/volume/persistentvolume/provision_test.go b/pkg/controller/volume/persistentvolume/provision_test.go index 0ea0f272a79..97cf29cfbff 100644 --- a/pkg/controller/volume/persistentvolume/provision_test.go +++ b/pkg/controller/volume/persistentvolume/provision_test.go @@ -25,6 +25,7 @@ import ( apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" api "k8s.io/kubernetes/pkg/apis/core" + pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" ) var class1Parameters = map[string]string{ @@ -191,13 +192,11 @@ func TestProvisionSync(t *testing.T) { // The claim would be bound in next syncClaim newClaimArray("claim11-7", "uid11-7", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner), noevents, noerrors, - wrapTestWithInjectedOperation(wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), func(ctrl *PersistentVolumeController, reactor *volumeReactor) { + wrapTestWithInjectedOperation(wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) { // Create a volume before provisionClaimOperation starts. // This similates a parallel controller provisioning the volume. - reactor.lock.Lock() volume := newVolume("pvc-uid11-7", "1Gi", "uid11-7", "claim11-7", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classGold, annBoundByController, annDynamicallyProvisioned) - reactor.volumes[volume.Name] = volume - reactor.lock.Unlock() + reactor.AddVolume(volume) }), }, { @@ -210,7 +209,7 @@ func TestProvisionSync(t *testing.T) { // Binding will be completed in the next syncClaim newClaimArray("claim11-8", "uid11-8", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner), []string{"Normal ProvisioningSucceeded"}, - []reactorError{ + []pvtesting.ReactorError{ // Inject error to the first // kubeclient.PersistentVolumes.Create() call. All other calls // will succeed. @@ -227,7 +226,7 @@ func TestProvisionSync(t *testing.T) { newClaimArray("claim11-9", "uid11-9", "1Gi", "", v1.ClaimPending, &classGold), newClaimArray("claim11-9", "uid11-9", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner), []string{"Warning ProvisioningFailed"}, - []reactorError{ + []pvtesting.ReactorError{ // Inject error to five kubeclient.PersistentVolumes.Create() // calls {"create", "persistentvolumes", errors.New("Mock creation error1")}, @@ -252,7 +251,7 @@ func TestProvisionSync(t *testing.T) { newClaimArray("claim11-10", "uid11-10", "1Gi", "", v1.ClaimPending, &classGold), newClaimArray("claim11-10", "uid11-10", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner), []string{"Warning ProvisioningFailed", "Warning ProvisioningCleanupFailed"}, - []reactorError{ + []pvtesting.ReactorError{ // Inject error to five kubeclient.PersistentVolumes.Create() // calls {"create", "persistentvolumes", errors.New("Mock creation error1")}, @@ -273,7 +272,7 @@ func TestProvisionSync(t *testing.T) { newClaimArray("claim11-11", "uid11-11", "1Gi", "", v1.ClaimPending, &classGold), newClaimArray("claim11-11", "uid11-11", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner), []string{"Warning ProvisioningFailed", "Warning ProvisioningCleanupFailed"}, - []reactorError{ + []pvtesting.ReactorError{ // Inject error to five kubeclient.PersistentVolumes.Create() // calls {"create", "persistentvolumes", errors.New("Mock creation error1")}, @@ -303,7 +302,7 @@ func TestProvisionSync(t *testing.T) { newClaimArray("claim11-12", "uid11-12", "1Gi", "", v1.ClaimPending, &classGold), newClaimArray("claim11-12", "uid11-12", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner), []string{"Warning ProvisioningFailed"}, - []reactorError{ + []pvtesting.ReactorError{ // Inject error to five kubeclient.PersistentVolumes.Create() // calls {"create", "persistentvolumes", errors.New("Mock creation error1")}, @@ -397,7 +396,7 @@ func TestProvisionSync(t *testing.T) { newClaimArray("claim11-19", "uid11-19", "1Gi", "", v1.ClaimPending, &classGold), newClaimArray("claim11-19", "uid11-19", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner), noevents, - []reactorError{ + []pvtesting.ReactorError{ // Inject errors to simulate crashed API server during // kubeclient.PersistentVolumes.Create() {"create", "persistentvolumes", errors.New("Mock creation error1")}, diff --git a/pkg/controller/volume/persistentvolume/pv_controller_test.go b/pkg/controller/volume/persistentvolume/pv_controller_test.go index c4461ded228..c1c8dde46e7 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_test.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog" "k8s.io/kubernetes/pkg/controller" + pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" ) var ( @@ -60,9 +61,9 @@ func TestControllerSync(t *testing.T) { newClaimArray("claim5-2", "uid5-2", "1Gi", "volume5-2", v1.ClaimBound, nil, annBoundByController, annBindCompleted), noevents, noerrors, // Custom test function that generates an add event - func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { claim := newClaim("claim5-2", "uid5-2", "1Gi", "", v1.ClaimPending, nil) - reactor.addClaimEvent(claim) + reactor.AddClaimEvent(claim) return nil }, }, @@ -75,10 +76,10 @@ func TestControllerSync(t *testing.T) { noclaims, noevents, noerrors, // Custom test function that generates a delete event - func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { obj := ctrl.claims.List()[0] claim := obj.(*v1.PersistentVolumeClaim) - reactor.deleteClaimEvent(claim) + reactor.DeleteClaimEvent(claim) return nil }, }, @@ -91,10 +92,10 @@ func TestControllerSync(t *testing.T) { newClaimArray("claim5-4", "uid5-4", "1Gi", "volume5-4", v1.ClaimLost, nil, annBoundByController, annBindCompleted), []string{"Warning ClaimLost"}, noerrors, // Custom test function that generates a delete event - func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { + func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { obj := ctrl.volumes.store.List()[0] volume := obj.(*v1.PersistentVolume) - reactor.deleteVolumeEvent(volume) + reactor.DeleteVolumeEvent(volume) return nil }, }, @@ -120,13 +121,13 @@ func TestControllerSync(t *testing.T) { reactor := newVolumeReactor(client, ctrl, fakeVolumeWatch, fakeClaimWatch, test.errors) for _, claim := range test.initialClaims { - reactor.claims[claim.Name] = claim + reactor.AddClaim(claim) go func(claim *v1.PersistentVolumeClaim) { fakeClaimWatch.Add(claim) }(claim) } for _, volume := range test.initialVolumes { - reactor.volumes[volume.Name] = volume + reactor.AddVolume(volume) go func(volume *v1.PersistentVolume) { fakeVolumeWatch.Add(volume) }(volume) @@ -148,7 +149,7 @@ func TestControllerSync(t *testing.T) { klog.V(4).Infof("controller synced, starting test") // Call the tested function - err = test.test(ctrl, reactor, test) + err = test.test(ctrl, reactor.VolumeReactor, test) if err != nil { t.Errorf("Test %q initial test call failed: %v", test.name, err) } @@ -162,7 +163,7 @@ func TestControllerSync(t *testing.T) { } close(stopCh) - evaluateTestResults(ctrl, reactor, test, t) + evaluateTestResults(ctrl, reactor.VolumeReactor, test, t) } } diff --git a/pkg/controller/volume/persistentvolume/recycle_test.go b/pkg/controller/volume/persistentvolume/recycle_test.go index f0c0f7dcba9..2849c687697 100644 --- a/pkg/controller/volume/persistentvolume/recycle_test.go +++ b/pkg/controller/volume/persistentvolume/recycle_test.go @@ -23,6 +23,7 @@ import ( "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" ) // Test single call to syncVolume, expecting recycling to happen. @@ -130,11 +131,9 @@ func TestRecycleSync(t *testing.T) { noclaims, noclaims, noevents, noerrors, - wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) { + wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) { // Delete the volume before recycle operation starts - reactor.lock.Lock() - delete(reactor.volumes, "volume6-6") - reactor.lock.Unlock() + reactor.DeleteVolume("volume6-6") }), }, { @@ -147,14 +146,9 @@ func TestRecycleSync(t *testing.T) { noclaims, noclaims, noevents, noerrors, - wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) { + wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) { // Mark the volume as Available before the recycler starts - reactor.lock.Lock() - volume := reactor.volumes["volume6-7"] - volume.Spec.ClaimRef = nil - volume.Status.Phase = v1.VolumeAvailable - volume.Annotations = nil - reactor.lock.Unlock() + reactor.MarkVolumeAvaiable("volume6-7") }), }, { @@ -164,17 +158,13 @@ func TestRecycleSync(t *testing.T) { // user. "6-8 - prebound volume is deleted before recycling", newVolumeArray("volume6-8", "1Gi", "uid6-8", "claim6-8", v1.VolumeBound, v1.PersistentVolumeReclaimRecycle, classEmpty), - newVolumeArray("volume6-8", "1Gi", "", "claim6-8", v1.VolumeAvailable, v1.PersistentVolumeReclaimRecycle, classEmpty), + newVolumeArray("volume6-8", "1Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRecycle, classEmpty), noclaims, noclaims, noevents, noerrors, - wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) { + wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) { // Mark the volume as Available before the recycler starts - reactor.lock.Lock() - volume := reactor.volumes["volume6-8"] - volume.Spec.ClaimRef.UID = "" - volume.Status.Phase = v1.VolumeAvailable - reactor.lock.Unlock() + reactor.MarkVolumeAvaiable("volume6-8") }), }, { diff --git a/pkg/controller/volume/persistentvolume/testing/testing.go b/pkg/controller/volume/persistentvolume/testing/testing.go new file mode 100644 index 00000000000..b1fc867cf4b --- /dev/null +++ b/pkg/controller/volume/persistentvolume/testing/testing.go @@ -0,0 +1,589 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "errors" + "fmt" + "reflect" + "strconv" + "sync" + + v1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/watch" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + "k8s.io/klog" + "k8s.io/kubernetes/pkg/features" +) + +var VersionConflictError = errors.New("VersionError") + +// VolumeReactor is a core.Reactor that simulates etcd and API server. It +// stores: +// - Latest version of claims volumes saved by the controller. +// - Queue of all saves (to simulate "volume/claim updated" events). This queue +// contains all intermediate state of an object - e.g. a claim.VolumeName +// is updated first and claim.Phase second. This queue will then contain both +// updates as separate entries. +// - Number of changes since the last call to VolumeReactor.syncAll(). +// - Optionally, volume and claim fake watchers which should be the same ones +// used by the controller. Any time an event function like deleteVolumeEvent +// is called to simulate an event, the reactor's stores are updated and the +// controller is sent the event via the fake watcher. +// - Optionally, list of error that should be returned by reactor, simulating +// etcd / API server failures. These errors are evaluated in order and every +// error is returned only once. I.e. when the reactor finds matching +// ReactorError, it return appropriate error and removes the ReactorError from +// the list. +type VolumeReactor struct { + volumes map[string]*v1.PersistentVolume + claims map[string]*v1.PersistentVolumeClaim + changedObjects []interface{} + changedSinceLastSync int + fakeVolumeWatch *watch.FakeWatcher + fakeClaimWatch *watch.FakeWatcher + lock sync.RWMutex + errors []ReactorError + watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher +} + +// ReactorError is an error that is returned by test reactor (=simulated +// etcd+/API server) when an action performed by the reactor matches given verb +// ("get", "update", "create", "delete" or "*"") on given resource +// ("persistentvolumes", "persistentvolumeclaims" or "*"). +type ReactorError struct { + Verb string + Resource string + Error error +} + +// React is a callback called by fake kubeClient from the controller. +// In other words, every claim/volume change performed by the controller ends +// here. +// This callback checks versions of the updated objects and refuse those that +// are too old (simulating real etcd). +// All updated objects are stored locally to keep track of object versions and +// to evaluate test results. +// All updated objects are also inserted into changedObjects queue and +// optionally sent back to the controller via its watchers. +func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Object, err error) { + r.lock.Lock() + defer r.lock.Unlock() + + klog.V(4).Infof("reactor got operation %q on %q", action.GetVerb(), action.GetResource()) + + // Inject error when requested + err = r.injectReactError(action) + if err != nil { + return true, nil, err + } + + // Test did not request to inject an error, continue simulating API server. + switch { + case action.Matches("create", "persistentvolumes"): + obj := action.(core.UpdateAction).GetObject() + volume := obj.(*v1.PersistentVolume) + + // check the volume does not exist + _, found := r.volumes[volume.Name] + if found { + return true, nil, fmt.Errorf("Cannot create volume %s: volume already exists", volume.Name) + } + + // mimic apiserver defaulting + if volume.Spec.VolumeMode == nil && utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) { + volume.Spec.VolumeMode = new(v1.PersistentVolumeMode) + *volume.Spec.VolumeMode = v1.PersistentVolumeFilesystem + } + + // Store the updated object to appropriate places. + r.volumes[volume.Name] = volume + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Add(volume) + } + r.changedObjects = append(r.changedObjects, volume) + r.changedSinceLastSync++ + klog.V(4).Infof("created volume %s", volume.Name) + return true, volume, nil + + case action.Matches("create", "persistentvolumeclaims"): + obj := action.(core.UpdateAction).GetObject() + claim := obj.(*v1.PersistentVolumeClaim) + + // check the claim does not exist + _, found := r.claims[claim.Name] + if found { + return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name) + } + + // Store the updated object to appropriate places. + r.claims[claim.Name] = claim + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Add(claim) + } + r.changedObjects = append(r.changedObjects, claim) + r.changedSinceLastSync++ + klog.V(4).Infof("created claim %s", claim.Name) + return true, claim, nil + + case action.Matches("update", "persistentvolumes"): + obj := action.(core.UpdateAction).GetObject() + volume := obj.(*v1.PersistentVolume) + + // Check and bump object version + storedVolume, found := r.volumes[volume.Name] + if found { + storedVer, _ := strconv.Atoi(storedVolume.ResourceVersion) + requestedVer, _ := strconv.Atoi(volume.ResourceVersion) + if storedVer != requestedVer { + return true, obj, VersionConflictError + } + if reflect.DeepEqual(storedVolume, volume) { + klog.V(4).Infof("nothing updated volume %s", volume.Name) + return true, volume, nil + } + // Don't modify the existing object + volume = volume.DeepCopy() + volume.ResourceVersion = strconv.Itoa(storedVer + 1) + } else { + return true, nil, fmt.Errorf("Cannot update volume %s: volume not found", volume.Name) + } + + // Store the updated object to appropriate places. + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Modify(volume) + } + r.volumes[volume.Name] = volume + r.changedObjects = append(r.changedObjects, volume) + r.changedSinceLastSync++ + klog.V(4).Infof("saved updated volume %s", volume.Name) + return true, volume, nil + + case action.Matches("update", "persistentvolumeclaims"): + obj := action.(core.UpdateAction).GetObject() + claim := obj.(*v1.PersistentVolumeClaim) + + // Check and bump object version + storedClaim, found := r.claims[claim.Name] + if found { + storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion) + requestedVer, _ := strconv.Atoi(claim.ResourceVersion) + if storedVer != requestedVer { + return true, obj, VersionConflictError + } + if reflect.DeepEqual(storedClaim, claim) { + klog.V(4).Infof("nothing updated claim %s", claim.Name) + return true, claim, nil + } + // Don't modify the existing object + claim = claim.DeepCopy() + claim.ResourceVersion = strconv.Itoa(storedVer + 1) + } else { + return true, nil, fmt.Errorf("Cannot update claim %s: claim not found", claim.Name) + } + + // Store the updated object to appropriate places. + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Modify(claim) + } + r.claims[claim.Name] = claim + r.changedObjects = append(r.changedObjects, claim) + r.changedSinceLastSync++ + klog.V(4).Infof("saved updated claim %s", claim.Name) + return true, claim, nil + + case action.Matches("get", "persistentvolumes"): + name := action.(core.GetAction).GetName() + volume, found := r.volumes[name] + if found { + klog.V(4).Infof("GetVolume: found %s", volume.Name) + return true, volume.DeepCopy(), nil + } else { + klog.V(4).Infof("GetVolume: volume %s not found", name) + return true, nil, fmt.Errorf("Cannot find volume %s", name) + } + + case action.Matches("get", "persistentvolumeclaims"): + name := action.(core.GetAction).GetName() + claim, found := r.claims[name] + if found { + klog.V(4).Infof("GetClaim: found %s", claim.Name) + return true, claim.DeepCopy(), nil + } else { + klog.V(4).Infof("GetClaim: claim %s not found", name) + return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name) + } + + case action.Matches("delete", "persistentvolumes"): + name := action.(core.DeleteAction).GetName() + klog.V(4).Infof("deleted volume %s", name) + obj, found := r.volumes[name] + if found { + delete(r.volumes, name) + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Delete(obj) + } + r.changedSinceLastSync++ + return true, nil, nil + } else { + return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name) + } + + case action.Matches("delete", "persistentvolumeclaims"): + name := action.(core.DeleteAction).GetName() + klog.V(4).Infof("deleted claim %s", name) + obj, found := r.claims[name] + if found { + delete(r.claims, name) + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Delete(obj) + } + r.changedSinceLastSync++ + return true, nil, nil + } else { + return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name) + } + } + + return false, nil, nil +} + +// Watch watches objects from the VolumeReactor. Watch returns a channel which +// will push added / modified / deleted object. +func (r *VolumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) { + r.lock.Lock() + defer r.lock.Unlock() + + fakewatcher := watch.NewRaceFreeFake() + + if _, exists := r.watchers[gvr]; !exists { + r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher) + } + r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher) + return fakewatcher, nil +} + +func (r *VolumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher { + watches := []*watch.RaceFreeFakeWatcher{} + if r.watchers[gvr] != nil { + if w := r.watchers[gvr][ns]; w != nil { + watches = append(watches, w...) + } + if ns != metav1.NamespaceAll { + if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil { + watches = append(watches, w...) + } + } + } + return watches +} + +func (r *VolumeReactor) ChangedSinceLastSync() int { + r.lock.RLock() + defer r.lock.RUnlock() + return r.changedSinceLastSync +} + +// injectReactError returns an error when the test requested given action to +// fail. nil is returned otherwise. +func (r *VolumeReactor) injectReactError(action core.Action) error { + if len(r.errors) == 0 { + // No more errors to inject, everything should succeed. + return nil + } + + for i, expected := range r.errors { + klog.V(4).Infof("trying to match %q %q with %q %q", expected.Verb, expected.Resource, action.GetVerb(), action.GetResource()) + if action.Matches(expected.Verb, expected.Resource) { + // That's the action we're waiting for, remove it from injectedErrors + r.errors = append(r.errors[:i], r.errors[i+1:]...) + klog.V(4).Infof("reactor found matching error at index %d: %q %q, returning %v", i, expected.Verb, expected.Resource, expected.Error) + return expected.Error + } + } + return nil +} + +// CheckVolumes compares all expectedVolumes with set of volumes at the end of +// the test and reports differences. +func (r *VolumeReactor) CheckVolumes(expectedVolumes []*v1.PersistentVolume) error { + r.lock.Lock() + defer r.lock.Unlock() + + expectedMap := make(map[string]*v1.PersistentVolume) + gotMap := make(map[string]*v1.PersistentVolume) + // Clear any ResourceVersion from both sets + for _, v := range expectedVolumes { + // Don't modify the existing object + v := v.DeepCopy() + v.ResourceVersion = "" + if v.Spec.ClaimRef != nil { + v.Spec.ClaimRef.ResourceVersion = "" + } + expectedMap[v.Name] = v + } + for _, v := range r.volumes { + // We must clone the volume because of golang race check - it was + // written by the controller without any locks on it. + v := v.DeepCopy() + v.ResourceVersion = "" + if v.Spec.ClaimRef != nil { + v.Spec.ClaimRef.ResourceVersion = "" + } + gotMap[v.Name] = v + } + if !reflect.DeepEqual(expectedMap, gotMap) { + // Print ugly but useful diff of expected and received objects for + // easier debugging. + return fmt.Errorf("Volume check failed [A-expected, B-got]: %s", diff.ObjectDiff(expectedMap, gotMap)) + } + return nil +} + +// CheckClaims compares all expectedClaims with set of claims at the end of the +// test and reports differences. +func (r *VolumeReactor) CheckClaims(expectedClaims []*v1.PersistentVolumeClaim) error { + r.lock.Lock() + defer r.lock.Unlock() + + expectedMap := make(map[string]*v1.PersistentVolumeClaim) + gotMap := make(map[string]*v1.PersistentVolumeClaim) + for _, c := range expectedClaims { + // Don't modify the existing object + c = c.DeepCopy() + c.ResourceVersion = "" + expectedMap[c.Name] = c + } + for _, c := range r.claims { + // We must clone the claim because of golang race check - it was + // written by the controller without any locks on it. + c = c.DeepCopy() + c.ResourceVersion = "" + gotMap[c.Name] = c + } + if !reflect.DeepEqual(expectedMap, gotMap) { + // Print ugly but useful diff of expected and received objects for + // easier debugging. + return fmt.Errorf("Claim check failed [A-expected, B-got result]: %s", diff.ObjectDiff(expectedMap, gotMap)) + } + return nil +} + +// PopChange returns one recorded updated object, either *v1.PersistentVolume +// or *v1.PersistentVolumeClaim. Returns nil when there are no changes. +func (r *VolumeReactor) PopChange() interface{} { + r.lock.Lock() + defer r.lock.Unlock() + + if len(r.changedObjects) == 0 { + return nil + } + + // For debugging purposes, print the queue + for _, obj := range r.changedObjects { + switch obj.(type) { + case *v1.PersistentVolume: + vol, _ := obj.(*v1.PersistentVolume) + klog.V(4).Infof("reactor queue: %s", vol.Name) + case *v1.PersistentVolumeClaim: + claim, _ := obj.(*v1.PersistentVolumeClaim) + klog.V(4).Infof("reactor queue: %s", claim.Name) + } + } + + // Pop the first item from the queue and return it + obj := r.changedObjects[0] + r.changedObjects = r.changedObjects[1:] + return obj +} + +// SyncAll simulates the controller periodic sync of volumes and claim. It +// simply adds all these objects to the internal queue of updates. This method +// should be used when the test manually calls syncClaim/syncVolume. Test that +// use real controller loop (ctrl.Run()) will get periodic sync automatically. +func (r *VolumeReactor) SyncAll() { + r.lock.Lock() + defer r.lock.Unlock() + + for _, c := range r.claims { + r.changedObjects = append(r.changedObjects, c) + } + for _, v := range r.volumes { + r.changedObjects = append(r.changedObjects, v) + } + r.changedSinceLastSync = 0 +} + +func (r *VolumeReactor) GetChangeCount() int { + r.lock.Lock() + defer r.lock.Unlock() + return r.changedSinceLastSync +} + +// DeleteVolumeEvent simulates that a volume has been deleted in etcd and +// the controller receives 'volume deleted' event. +func (r *VolumeReactor) DeleteVolumeEvent(volume *v1.PersistentVolume) { + r.lock.Lock() + defer r.lock.Unlock() + + // Remove the volume from list of resulting volumes. + delete(r.volumes, volume.Name) + + // Generate deletion event. Cloned volume is needed to prevent races (and we + // would get a clone from etcd too). + if r.fakeVolumeWatch != nil { + r.fakeVolumeWatch.Delete(volume.DeepCopy()) + } +} + +// DeleteClaimEvent simulates that a claim has been deleted in etcd and the +// controller receives 'claim deleted' event. +func (r *VolumeReactor) DeleteClaimEvent(claim *v1.PersistentVolumeClaim) { + r.lock.Lock() + defer r.lock.Unlock() + + // Remove the claim from list of resulting claims. + delete(r.claims, claim.Name) + + // Generate deletion event. Cloned volume is needed to prevent races (and we + // would get a clone from etcd too). + if r.fakeClaimWatch != nil { + r.fakeClaimWatch.Delete(claim.DeepCopy()) + } +} + +// addVolumeEvent simulates that a volume has been added in etcd and the +// controller receives 'volume added' event. +func (r *VolumeReactor) addVolumeEvent(volume *v1.PersistentVolume) { + r.lock.Lock() + defer r.lock.Unlock() + + r.volumes[volume.Name] = volume + // Generate event. No cloning is needed, this claim is not stored in the + // controller cache yet. + if r.fakeVolumeWatch != nil { + r.fakeVolumeWatch.Add(volume) + } +} + +// modifyVolumeEvent simulates that a volume has been modified in etcd and the +// controller receives 'volume modified' event. +func (r *VolumeReactor) modifyVolumeEvent(volume *v1.PersistentVolume) { + r.lock.Lock() + defer r.lock.Unlock() + + r.volumes[volume.Name] = volume + // Generate deletion event. Cloned volume is needed to prevent races (and we + // would get a clone from etcd too). + if r.fakeVolumeWatch != nil { + r.fakeVolumeWatch.Modify(volume.DeepCopy()) + } +} + +// AddClaimEvent simulates that a claim has been deleted in etcd and the +// controller receives 'claim added' event. +func (r *VolumeReactor) AddClaimEvent(claim *v1.PersistentVolumeClaim) { + r.lock.Lock() + defer r.lock.Unlock() + + r.claims[claim.Name] = claim + // Generate event. No cloning is needed, this claim is not stored in the + // controller cache yet. + if r.fakeClaimWatch != nil { + r.fakeClaimWatch.Add(claim) + } +} + +func (r *VolumeReactor) AddClaims(claims []*v1.PersistentVolumeClaim) { + r.lock.Lock() + defer r.lock.Unlock() + for _, claim := range claims { + r.claims[claim.Name] = claim + } +} + +func (r *VolumeReactor) AddVolumes(volumes []*v1.PersistentVolume) { + r.lock.Lock() + defer r.lock.Unlock() + for _, volume := range volumes { + r.volumes[volume.Name] = volume + } +} + +func (r *VolumeReactor) AddClaim(claim *v1.PersistentVolumeClaim) { + r.lock.Lock() + defer r.lock.Unlock() + r.claims[claim.Name] = claim +} + +func (r *VolumeReactor) AddVolume(volume *v1.PersistentVolume) { + r.lock.Lock() + defer r.lock.Unlock() + r.volumes[volume.Name] = volume +} + +func (r *VolumeReactor) DeleteVolume(name string) { + r.lock.Lock() + defer r.lock.Unlock() + delete(r.volumes, name) +} + +func (r *VolumeReactor) AddClaimBoundToVolume(claim *v1.PersistentVolumeClaim) { + r.lock.Lock() + defer r.lock.Unlock() + r.claims[claim.Name] = claim + if volume, ok := r.volumes[claim.Spec.VolumeName]; ok { + volume.Status.Phase = v1.VolumeBound + } +} + +func (r *VolumeReactor) MarkVolumeAvaiable(name string) { + r.lock.Lock() + defer r.lock.Unlock() + if volume, ok := r.volumes[name]; ok { + volume.Spec.ClaimRef = nil + volume.Status.Phase = v1.VolumeAvailable + volume.Annotations = nil + } +} + +func NewVolumeReactor(client *fake.Clientset, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []ReactorError) *VolumeReactor { + reactor := &VolumeReactor{ + volumes: make(map[string]*v1.PersistentVolume), + claims: make(map[string]*v1.PersistentVolumeClaim), + fakeVolumeWatch: fakeVolumeWatch, + fakeClaimWatch: fakeClaimWatch, + errors: errors, + watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher), + } + client.AddReactor("create", "persistentvolumes", reactor.React) + client.AddReactor("create", "persistentvolumeclaims", reactor.React) + client.AddReactor("update", "persistentvolumes", reactor.React) + client.AddReactor("update", "persistentvolumeclaims", reactor.React) + client.AddReactor("get", "persistentvolumes", reactor.React) + client.AddReactor("get", "persistentvolumeclaims", reactor.React) + client.AddReactor("delete", "persistentvolumes", reactor.React) + client.AddReactor("delete", "persistentvolumeclaims", reactor.React) + return reactor +}