Extract testing VolumeReactor into a separate package

Yecheng Fu 2019-03-18 09:55:28 +08:00
parent 6d691c9985
commit 0b6c028c8a
6 changed files with 694 additions and 610 deletions
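For orientation, a minimal sketch of how a test can construct and drive the extracted reactor after this change. This example is illustrative only (not part of the commit) and assumes just the exported API introduced below (NewVolumeReactor, AddVolume, DeleteVolume) and the new package path.

package example

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"

	pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)

func sketch() {
	// Reactor backed by a fake clientset; no fake watchers, no injected errors.
	client := fake.NewSimpleClientset()
	reactor := pvtesting.NewVolumeReactor(client, nil, nil, nil)

	// State is seeded and mutated through exported helpers instead of
	// reaching into the reactor's maps under its lock.
	reactor.AddVolume(&v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{Name: "volume-1"},
	})
	reactor.DeleteVolume("volume-1")
}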

View File

@@ -22,6 +22,7 @@ import (
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)
// Test single call to syncVolume, expecting deleting to happen.
@@ -91,11 +92,9 @@ func TestDeleteSync(t *testing.T) {
noclaims,
noclaims,
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Delete the volume before delete operation starts
reactor.lock.Lock()
delete(reactor.volumes, "volume8-6")
reactor.lock.Unlock()
reactor.DeleteVolume("volume8-6")
}),
},
{
@@ -108,16 +107,12 @@ func TestDeleteSync(t *testing.T) {
noclaims,
newClaimArray("claim8-7", "uid8-7", "10Gi", "volume8-7", v1.ClaimBound, nil),
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Bind the volume to resurrected claim (this should never
// happen)
claim := newClaim("claim8-7", "uid8-7", "10Gi", "volume8-7", v1.ClaimBound, nil)
reactor.claims[claim.Name] = claim
reactor.AddClaimBoundToVolume(claim)
ctrl.claims.Add(claim)
volume := reactor.volumes["volume8-7"]
volume.Status.Phase = v1.VolumeBound
}),
},
{
@@ -141,7 +136,7 @@ func TestDeleteSync(t *testing.T) {
noclaims,
noclaims,
noevents, noerrors,
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
// Inject external deleter annotation
test.initialVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test"
test.expectedVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test"
@@ -184,7 +179,7 @@ func TestDeleteSync(t *testing.T) {
newClaimArray("claim8-12", "uid8-12", "10Gi", "volume8-12-2", v1.ClaimBound, nil),
newClaimArray("claim8-12", "uid8-12", "10Gi", "volume8-12-2", v1.ClaimBound, nil),
noevents, noerrors,
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
// Inject external deleter annotation
test.initialVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test"
test.expectedVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test"
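The pattern above repeats across the test files in this commit: injected callbacks stop manipulating reactor internals under reactor.lock and instead call exported helpers that lock internally. A condensed before/after sketch, using only identifiers from the hunks above:

// Before: the callback reached into the reactor's map while holding its lock.
reactor.lock.Lock()
delete(reactor.volumes, "volume8-6")
reactor.lock.Unlock()

// After: the exported helper encapsulates the locking.
reactor.DeleteVolume("volume8-6")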

View File

@@ -17,12 +17,9 @@ limitations under the License.
package persistentvolume
import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -31,27 +28,21 @@ import (
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/features"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
)
@@ -95,366 +86,75 @@ type controllerTest struct {
// event message.
expectedEvents []string
// Errors to produce on matching action
errors []reactorError
errors []pvtesting.ReactorError
// Function to call as the test.
test testCall
}
type testCall func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error
type testCall func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error
const testNamespace = "default"
const mockPluginName = "kubernetes.io/mock-volume"
var versionConflictError = errors.New("VersionError")
var novolumes []*v1.PersistentVolume
var noclaims []*v1.PersistentVolumeClaim
var noevents = []string{}
var noerrors = []reactorError{}
var noerrors = []pvtesting.ReactorError{}
// volumeReactor is a core.Reactor that simulates etcd and API server. It
// stores:
// - Latest versions of claims and volumes saved by the controller.
// - Queue of all saves (to simulate "volume/claim updated" events). This queue
// contains all intermediate state of an object - e.g. a claim.VolumeName
// is updated first and claim.Phase second. This queue will then contain both
// updates as separate entries.
// - Number of changes since the last call to volumeReactor.syncAll().
// - Optionally, volume and claim fake watchers which should be the same ones
// used by the controller. Any time an event function like deleteVolumeEvent
// is called to simulate an event, the reactor's stores are updated and the
// controller is sent the event via the fake watcher.
// - Optionally, a list of errors that should be returned by the reactor,
// simulating etcd / API server failures. These errors are evaluated in order
// and every error is returned only once, i.e. when the reactor finds a
// matching reactorError, it returns the appropriate error and removes that
// reactorError from the list.
type volumeReactor struct {
volumes map[string]*v1.PersistentVolume
claims map[string]*v1.PersistentVolumeClaim
changedObjects []interface{}
changedSinceLastSync int
ctrl *PersistentVolumeController
fakeVolumeWatch *watch.FakeWatcher
fakeClaimWatch *watch.FakeWatcher
lock sync.Mutex
errors []reactorError
watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
*pvtesting.VolumeReactor
ctrl *PersistentVolumeController
}
// reactorError is an error that is returned by the test reactor (= simulated
// etcd + API server) when an action performed by the reactor matches a given
// verb ("get", "update", "create", "delete" or "*") on a given resource
// ("persistentvolumes", "persistentvolumeclaims" or "*").
type reactorError struct {
verb string
resource string
error error
func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []pvtesting.ReactorError) *volumeReactor {
return &volumeReactor{
pvtesting.NewVolumeReactor(client, fakeVolumeWatch, fakeClaimWatch, errors),
ctrl,
}
}
// React is a callback called by fake kubeClient from the controller.
// In other words, every claim/volume change performed by the controller ends
// here.
// This callback checks versions of the updated objects and refuses those that
// are too old (simulating real etcd).
// All updated objects are stored locally to keep track of object versions and
// to evaluate test results.
// All updated objects are also inserted into changedObjects queue and
// optionally sent back to the controller via its watchers.
func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Object, err error) {
r.lock.Lock()
defer r.lock.Unlock()
klog.V(4).Infof("reactor got operation %q on %q", action.GetVerb(), action.GetResource())
// Inject error when requested
err = r.injectReactError(action)
if err != nil {
return true, nil, err
// waitForIdle waits until all tests, controllers and other goroutines do their
// job and no new actions are registered for 10 milliseconds.
func (r *volumeReactor) waitForIdle() {
r.ctrl.runningOperations.WaitForCompletion()
// Check every 10ms if the controller does something and stop if it's
// idle.
oldChanges := -1
for {
time.Sleep(10 * time.Millisecond)
changes := r.GetChangeCount()
if changes == oldChanges {
// No changes for last 10ms -> controller must be idle.
break
}
oldChanges = changes
}
// Test did not request to inject an error, continue simulating API server.
switch {
case action.Matches("create", "persistentvolumes"):
obj := action.(core.UpdateAction).GetObject()
volume := obj.(*v1.PersistentVolume)
// check the volume does not exist
_, found := r.volumes[volume.Name]
if found {
return true, nil, fmt.Errorf("Cannot create volume %s: volume already exists", volume.Name)
}
// mimic apiserver defaulting
if volume.Spec.VolumeMode == nil && utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
volume.Spec.VolumeMode = new(v1.PersistentVolumeMode)
*volume.Spec.VolumeMode = v1.PersistentVolumeFilesystem
}
// Store the updated object to appropriate places.
r.volumes[volume.Name] = volume
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(volume)
}
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
klog.V(4).Infof("created volume %s", volume.Name)
return true, volume, nil
case action.Matches("create", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)
// check the claim does not exist
_, found := r.claims[claim.Name]
if found {
return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name)
}
// Store the updated object to appropriate places.
r.claims[claim.Name] = claim
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(claim)
}
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("created claim %s", claim.Name)
return true, claim, nil
case action.Matches("update", "persistentvolumes"):
obj := action.(core.UpdateAction).GetObject()
volume := obj.(*v1.PersistentVolume)
// Check and bump object version
storedVolume, found := r.volumes[volume.Name]
if found {
storedVer, _ := strconv.Atoi(storedVolume.ResourceVersion)
requestedVer, _ := strconv.Atoi(volume.ResourceVersion)
if storedVer != requestedVer {
return true, obj, versionConflictError
}
if reflect.DeepEqual(storedVolume, volume) {
klog.V(4).Infof("nothing updated volume %s", volume.Name)
return true, volume, nil
}
// Don't modify the existing object
volume = volume.DeepCopy()
volume.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update volume %s: volume not found", volume.Name)
}
// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(volume)
}
r.volumes[volume.Name] = volume
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated volume %s", volume.Name)
return true, volume, nil
case action.Matches("update", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)
// Check and bump object version
storedClaim, found := r.claims[claim.Name]
if found {
storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion)
requestedVer, _ := strconv.Atoi(claim.ResourceVersion)
if storedVer != requestedVer {
return true, obj, versionConflictError
}
if reflect.DeepEqual(storedClaim, claim) {
klog.V(4).Infof("nothing updated claim %s", claim.Name)
return true, claim, nil
}
// Don't modify the existing object
claim = claim.DeepCopy()
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update claim %s: claim not found", claim.Name)
}
// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(claim)
}
r.claims[claim.Name] = claim
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated claim %s", claim.Name)
return true, claim, nil
case action.Matches("get", "persistentvolumes"):
name := action.(core.GetAction).GetName()
volume, found := r.volumes[name]
if found {
klog.V(4).Infof("GetVolume: found %s", volume.Name)
return true, volume.DeepCopy(), nil
} else {
klog.V(4).Infof("GetVolume: volume %s not found", name)
return true, nil, fmt.Errorf("Cannot find volume %s", name)
}
case action.Matches("get", "persistentvolumeclaims"):
name := action.(core.GetAction).GetName()
claim, found := r.claims[name]
if found {
klog.V(4).Infof("GetClaim: found %s", claim.Name)
return true, claim.DeepCopy(), nil
} else {
klog.V(4).Infof("GetClaim: claim %s not found", name)
return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name)
}
case action.Matches("delete", "persistentvolumes"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted volume %s", name)
obj, found := r.volumes[name]
if found {
delete(r.volumes, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name)
}
case action.Matches("delete", "persistentvolumeclaims"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted claim %s", name)
obj, found := r.claims[name]
if found {
delete(r.claims, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name)
}
}
return false, nil, nil
}
// Watch watches objects from the volumeReactor. Watch returns a channel which
// will push added / modified / deleted objects.
func (r *volumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
r.lock.Lock()
defer r.lock.Unlock()
fakewatcher := watch.NewRaceFreeFake()
if _, exists := r.watchers[gvr]; !exists {
r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
// waitTest waits until all tests, controllers and other goroutines do their
// job and the list of current volumes/claims is equal to the list of expected
// volumes/claims (with ~10 second timeout).
func (r *volumeReactor) waitTest(test controllerTest) error {
// start with 10 ms, multiply by 2 each step, 10 steps = 10.23 seconds
backoff := wait.Backoff{
Duration: 10 * time.Millisecond,
Jitter: 0,
Factor: 2,
Steps: 10,
}
r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher)
return fakewatcher, nil
}
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// Finish all operations that are in progress
r.ctrl.runningOperations.WaitForCompletion()
func (r *volumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
watches := []*watch.RaceFreeFakeWatcher{}
if r.watchers[gvr] != nil {
if w := r.watchers[gvr][ns]; w != nil {
watches = append(watches, w...)
// Return 'true' if the reactor reached the expected state
err1 := r.CheckClaims(test.expectedClaims)
err2 := r.CheckVolumes(test.expectedVolumes)
if err1 == nil && err2 == nil {
return true, nil
}
if ns != metav1.NamespaceAll {
if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil {
watches = append(watches, w...)
}
}
}
return watches
}
// injectReactError returns an error when the test requested given action to
// fail. nil is returned otherwise.
func (r *volumeReactor) injectReactError(action core.Action) error {
if len(r.errors) == 0 {
// No more errors to inject, everything should succeed.
return nil
}
for i, expected := range r.errors {
klog.V(4).Infof("trying to match %q %q with %q %q", expected.verb, expected.resource, action.GetVerb(), action.GetResource())
if action.Matches(expected.verb, expected.resource) {
// That's the action we're waiting for, remove it from injectedErrors
r.errors = append(r.errors[:i], r.errors[i+1:]...)
klog.V(4).Infof("reactor found matching error at index %d: %q %q, returning %v", i, expected.verb, expected.resource, expected.error)
return expected.error
}
}
return nil
}
// checkVolumes compares all expectedVolumes with set of volumes at the end of
// the test and reports differences.
func (r *volumeReactor) checkVolumes(expectedVolumes []*v1.PersistentVolume) error {
r.lock.Lock()
defer r.lock.Unlock()
expectedMap := make(map[string]*v1.PersistentVolume)
gotMap := make(map[string]*v1.PersistentVolume)
// Clear any ResourceVersion from both sets
for _, v := range expectedVolumes {
// Don't modify the existing object
v := v.DeepCopy()
v.ResourceVersion = ""
if v.Spec.ClaimRef != nil {
v.Spec.ClaimRef.ResourceVersion = ""
}
expectedMap[v.Name] = v
}
for _, v := range r.volumes {
// We must clone the volume because of golang race check - it was
// written by the controller without any locks on it.
v := v.DeepCopy()
v.ResourceVersion = ""
if v.Spec.ClaimRef != nil {
v.Spec.ClaimRef.ResourceVersion = ""
}
gotMap[v.Name] = v
}
if !reflect.DeepEqual(expectedMap, gotMap) {
// Print ugly but useful diff of expected and received objects for
// easier debugging.
return fmt.Errorf("Volume check failed [A-expected, B-got]: %s", diff.ObjectDiff(expectedMap, gotMap))
}
return nil
}
// checkClaims compares all expectedClaims with set of claims at the end of the
// test and reports differences.
func (r *volumeReactor) checkClaims(expectedClaims []*v1.PersistentVolumeClaim) error {
r.lock.Lock()
defer r.lock.Unlock()
expectedMap := make(map[string]*v1.PersistentVolumeClaim)
gotMap := make(map[string]*v1.PersistentVolumeClaim)
for _, c := range expectedClaims {
// Don't modify the existing object
c = c.DeepCopy()
c.ResourceVersion = ""
expectedMap[c.Name] = c
}
for _, c := range r.claims {
// We must clone the claim because of golang race check - it was
// written by the controller without any locks on it.
c = c.DeepCopy()
c.ResourceVersion = ""
gotMap[c.Name] = c
}
if !reflect.DeepEqual(expectedMap, gotMap) {
// Print ugly but useful diff of expected and received objects for
// easier debugging.
return fmt.Errorf("Claim check failed [A-expected, B-got result]: %s", diff.ObjectDiff(expectedMap, gotMap))
}
return nil
return false, nil
})
return err
}
// checkEvents compares all expectedEvents with events generated during the test
@@ -506,196 +206,6 @@ func checkEvents(t *testing.T, expectedEvents []string, ctrl *PersistentVolumeCo
return err
}
// popChange returns one recorded updated object, either *v1.PersistentVolume
// or *v1.PersistentVolumeClaim. Returns nil when there are no changes.
func (r *volumeReactor) popChange() interface{} {
r.lock.Lock()
defer r.lock.Unlock()
if len(r.changedObjects) == 0 {
return nil
}
// For debugging purposes, print the queue
for _, obj := range r.changedObjects {
switch obj.(type) {
case *v1.PersistentVolume:
vol, _ := obj.(*v1.PersistentVolume)
klog.V(4).Infof("reactor queue: %s", vol.Name)
case *v1.PersistentVolumeClaim:
claim, _ := obj.(*v1.PersistentVolumeClaim)
klog.V(4).Infof("reactor queue: %s", claim.Name)
}
}
// Pop the first item from the queue and return it
obj := r.changedObjects[0]
r.changedObjects = r.changedObjects[1:]
return obj
}
// syncAll simulates the controller's periodic sync of volumes and claims. It
// simply adds all these objects to the internal queue of updates. This method
// should be used when the test manually calls syncClaim/syncVolume. Tests that
// use the real controller loop (ctrl.Run()) get the periodic sync automatically.
func (r *volumeReactor) syncAll() {
r.lock.Lock()
defer r.lock.Unlock()
for _, c := range r.claims {
r.changedObjects = append(r.changedObjects, c)
}
for _, v := range r.volumes {
r.changedObjects = append(r.changedObjects, v)
}
r.changedSinceLastSync = 0
}
func (r *volumeReactor) getChangeCount() int {
r.lock.Lock()
defer r.lock.Unlock()
return r.changedSinceLastSync
}
// waitForIdle waits until all tests, controllers and other goroutines do their
// job and no new actions are registered for 10 milliseconds.
func (r *volumeReactor) waitForIdle() {
r.ctrl.runningOperations.WaitForCompletion()
// Check every 10ms if the controller does something and stop if it's
// idle.
oldChanges := -1
for {
time.Sleep(10 * time.Millisecond)
changes := r.getChangeCount()
if changes == oldChanges {
// No changes for last 10ms -> controller must be idle.
break
}
oldChanges = changes
}
}
// waitTest waits until all tests, controllers and other goroutines do their
// job and the list of current volumes/claims is equal to the list of expected
// volumes/claims (with ~10 second timeout).
func (r *volumeReactor) waitTest(test controllerTest) error {
// start with 10 ms, multiply by 2 each step, 10 steps = 10.23 seconds
backoff := wait.Backoff{
Duration: 10 * time.Millisecond,
Jitter: 0,
Factor: 2,
Steps: 10,
}
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// Finish all operations that are in progress
r.ctrl.runningOperations.WaitForCompletion()
// Return 'true' if the reactor reached the expected state
err1 := r.checkClaims(test.expectedClaims)
err2 := r.checkVolumes(test.expectedVolumes)
if err1 == nil && err2 == nil {
return true, nil
}
return false, nil
})
return err
}
// deleteVolumeEvent simulates that a volume has been deleted in etcd and
// the controller receives 'volume deleted' event.
func (r *volumeReactor) deleteVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
// Remove the volume from list of resulting volumes.
delete(r.volumes, volume.Name)
// Generate deletion event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Delete(volume.DeepCopy())
}
}
// deleteClaimEvent simulates that a claim has been deleted in etcd and the
// controller receives 'claim deleted' event.
func (r *volumeReactor) deleteClaimEvent(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
// Remove the claim from list of resulting claims.
delete(r.claims, claim.Name)
// Generate deletion event. Cloned claim is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeClaimWatch != nil {
r.fakeClaimWatch.Delete(claim.DeepCopy())
}
}
// addVolumeEvent simulates that a volume has been added in etcd and the
// controller receives 'volume added' event.
func (r *volumeReactor) addVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
r.volumes[volume.Name] = volume
// Generate event. No cloning is needed, this volume is not stored in the
// controller cache yet.
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Add(volume)
}
}
// modifyVolumeEvent simulates that a volume has been modified in etcd and the
// controller receives 'volume modified' event.
func (r *volumeReactor) modifyVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
r.volumes[volume.Name] = volume
// Generate modification event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Modify(volume.DeepCopy())
}
}
// addClaimEvent simulates that a claim has been added in etcd and the
// controller receives 'claim added' event.
func (r *volumeReactor) addClaimEvent(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
r.claims[claim.Name] = claim
// Generate event. No cloning is needed, this claim is not stored in the
// controller cache yet.
if r.fakeClaimWatch != nil {
r.fakeClaimWatch.Add(claim)
}
}
func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []reactorError) *volumeReactor {
reactor := &volumeReactor{
volumes: make(map[string]*v1.PersistentVolume),
claims: make(map[string]*v1.PersistentVolumeClaim),
ctrl: ctrl,
fakeVolumeWatch: fakeVolumeWatch,
fakeClaimWatch: fakeClaimWatch,
errors: errors,
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
}
client.AddReactor("create", "persistentvolumes", reactor.React)
client.AddReactor("create", "persistentvolumeclaims", reactor.React)
client.AddReactor("update", "persistentvolumes", reactor.React)
client.AddReactor("update", "persistentvolumeclaims", reactor.React)
client.AddReactor("get", "persistentvolumes", reactor.React)
client.AddReactor("get", "persistentvolumeclaims", reactor.React)
client.AddReactor("delete", "persistentvolumes", reactor.React)
client.AddReactor("delete", "persistentvolumeclaims", reactor.React)
return reactor
}
func alwaysReady() bool { return true }
func newTestController(kubeClient clientset.Interface, informerFactory informers.SharedInformerFactory, enableDynamicProvisioning bool) (*PersistentVolumeController, error) {
@@ -915,11 +425,11 @@ func claimWithAccessMode(modes []v1.PersistentVolumeAccessMode, claims []*v1.Per
return claims
}
func testSyncClaim(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func testSyncClaim(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
return ctrl.syncClaim(test.initialClaims[0])
}
func testSyncClaimError(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func testSyncClaimError(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
err := ctrl.syncClaim(test.initialClaims[0])
if err != nil {
@@ -928,7 +438,7 @@ func testSyncClaimError(ctrl *PersistentVolumeController, reactor *volumeReactor
return fmt.Errorf("syncClaim succeeded when failure was expected")
}
func testSyncVolume(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func testSyncVolume(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
return ctrl.syncVolume(test.initialVolumes[0])
}
@@ -957,7 +467,7 @@ var (
// is deleted, recycled or provisioned.
// - calls given testCall
func wrapTestWithPluginCalls(expectedRecycleCalls, expectedDeleteCalls []error, expectedProvisionCalls []provisionCall, toWrap testCall) testCall {
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
return func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
plugin := &mockVolumePlugin{
recycleCalls: expectedRecycleCalls,
deleteCalls: expectedDeleteCalls,
@@ -992,7 +502,7 @@ func wrapTestWithProvisionCalls(expectedProvisionCalls []provisionCall, toWrap t
// - configures controller with a volume plugin that emulates CSI migration
// - calls given testCall
func wrapTestWithCSIMigrationProvisionCalls(toWrap testCall) testCall {
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
return func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
plugin := &mockVolumePlugin{
isMigratedToCSI: true,
}
@@ -1010,9 +520,9 @@ func wrapTestWithCSIMigrationProvisionCalls(toWrap testCall) testCall {
// injected function to simulate that something is happening when the
// controller waits for the operation lock. Controller is then resumed and we
// check how it behaves.
func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(ctrl *PersistentVolumeController, reactor *volumeReactor)) testCall {
func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor)) testCall {
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
return func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
// Inject a hook before async operation starts
ctrl.preOperationHook = func(operationName string) {
// Inside the hook, run the function to inject
@@ -1040,13 +550,13 @@ func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(c
}
}
func evaluateTestResults(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest, t *testing.T) {
func evaluateTestResults(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest, t *testing.T) {
// Evaluate results
if err := reactor.checkClaims(test.expectedClaims); err != nil {
if err := reactor.CheckClaims(test.expectedClaims); err != nil {
t.Errorf("Test %q: %v", test.name, err)
}
if err := reactor.checkVolumes(test.expectedVolumes); err != nil {
if err := reactor.CheckVolumes(test.expectedVolumes); err != nil {
t.Errorf("Test %q: %v", test.name, err)
}
@@ -1074,12 +584,12 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
for _, claim := range test.initialClaims {
ctrl.claims.Add(claim)
reactor.claims[claim.Name] = claim
}
for _, volume := range test.initialVolumes {
ctrl.volumes.store.Add(volume)
reactor.volumes[volume.Name] = volume
}
reactor.AddClaims(test.initialClaims)
reactor.AddVolumes(test.initialVolumes)
// Inject classes into controller via a custom lister.
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
@@ -1095,7 +605,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
ctrl.podLister = corelisters.NewPodLister(podIndexer)
// Run the tested functions
err = test.test(ctrl, reactor, test)
err = test.test(ctrl, reactor.VolumeReactor, test)
if err != nil {
t.Errorf("Test %q failed: %v", test.name, err)
}
@@ -1106,7 +616,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
t.Errorf("Test %q failed: %v", test.name, err)
}
evaluateTestResults(ctrl, reactor, test, t)
evaluateTestResults(ctrl, reactor.VolumeReactor, test, t)
}
}
@@ -1145,15 +655,15 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
for _, claim := range test.initialClaims {
ctrl.claims.Add(claim)
reactor.claims[claim.Name] = claim
}
for _, volume := range test.initialVolumes {
ctrl.volumes.store.Add(volume)
reactor.volumes[volume.Name] = volume
}
reactor.AddClaims(test.initialClaims)
reactor.AddVolumes(test.initialVolumes)
// Run the tested function
err = test.test(ctrl, reactor, test)
err = test.test(ctrl, reactor.VolumeReactor, test)
if err != nil {
t.Errorf("Test %q failed: %v", test.name, err)
}
@@ -1174,16 +684,16 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
// Wait for all goroutines to finish
reactor.waitForIdle()
obj := reactor.popChange()
obj := reactor.PopChange()
if obj == nil {
// Nothing was changed, should we exit?
if firstSync || reactor.changedSinceLastSync > 0 {
if firstSync || reactor.ChangedSinceLastSync() > 0 {
// There were some changes after the last "periodic sync".
// Simulate "periodic sync" of everything (until it produces
// no changes).
firstSync = false
klog.V(4).Infof("test %q: simulating periodical sync of all claims and volumes", test.name)
reactor.syncAll()
reactor.SyncAll()
} else {
// Last sync did not produce any updates, the test reached
// stable state -> finish.
@@ -1201,7 +711,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
ctrl.claims.Update(claim)
err = ctrl.syncClaim(claim)
if err != nil {
if err == versionConflictError {
if err == pvtesting.VersionConflictError {
// Ignore version errors
klog.V(4).Infof("test intentionaly ignores version error.")
} else {
@@ -1218,7 +728,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
ctrl.volumes.store.Update(volume)
err = ctrl.syncVolume(volume)
if err != nil {
if err == versionConflictError {
if err == pvtesting.VersionConflictError {
// Ignore version errors
klog.V(4).Infof("test intentionaly ignores version error.")
} else {
@@ -1231,7 +741,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
continue
}
}
evaluateTestResults(ctrl, reactor, test, t)
evaluateTestResults(ctrl, reactor.VolumeReactor, test, t)
klog.V(4).Infof("test %q finished after %d iterations", test.name, counter)
}
}
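The framework now wraps the extracted reactor instead of defining it: the package-local volumeReactor embeds *pvtesting.VolumeReactor, so exported helpers (AddClaims, GetChangeCount, CheckVolumes, ...) are promoted onto the wrapper, while waitForIdle and waitTest keep the controller handle they need. A condensed sketch of the resulting flow in runSyncTests, using only names that appear above:

// Sketch; client, ctrl and test come from the surrounding framework code.
reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)

// Promoted from the embedded *pvtesting.VolumeReactor.
reactor.AddClaims(test.initialClaims)
reactor.AddVolumes(test.initialVolumes)

// testCall now takes *pvtesting.VolumeReactor, so the embedded reactor is
// passed explicitly rather than the wrapper.
err := test.test(ctrl, reactor.VolumeReactor, test)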

View File

@@ -25,6 +25,7 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "k8s.io/kubernetes/pkg/apis/core"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)
var class1Parameters = map[string]string{
@@ -191,13 +192,11 @@ func TestProvisionSync(t *testing.T) {
// The claim would be bound in next syncClaim
newClaimArray("claim11-7", "uid11-7", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
wrapTestWithInjectedOperation(wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Create a volume before provisionClaimOperation starts.
// This simulates a parallel controller provisioning the volume.
reactor.lock.Lock()
volume := newVolume("pvc-uid11-7", "1Gi", "uid11-7", "claim11-7", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classGold, annBoundByController, annDynamicallyProvisioned)
reactor.volumes[volume.Name] = volume
reactor.lock.Unlock()
reactor.AddVolume(volume)
}),
},
{
@@ -210,7 +209,7 @@ func TestProvisionSync(t *testing.T) {
// Binding will be completed in the next syncClaim
newClaimArray("claim11-8", "uid11-8", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
[]string{"Normal ProvisioningSucceeded"},
[]reactorError{
[]pvtesting.ReactorError{
// Inject error to the first
// kubeclient.PersistentVolumes.Create() call. All other calls
// will succeed.
@@ -227,7 +226,7 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-9", "uid11-9", "1Gi", "", v1.ClaimPending, &classGold),
newClaimArray("claim11-9", "uid11-9", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
[]string{"Warning ProvisioningFailed"},
[]reactorError{
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
@@ -252,7 +251,7 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-10", "uid11-10", "1Gi", "", v1.ClaimPending, &classGold),
newClaimArray("claim11-10", "uid11-10", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
[]string{"Warning ProvisioningFailed", "Warning ProvisioningCleanupFailed"},
[]reactorError{
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
@@ -273,7 +272,7 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-11", "uid11-11", "1Gi", "", v1.ClaimPending, &classGold),
newClaimArray("claim11-11", "uid11-11", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
[]string{"Warning ProvisioningFailed", "Warning ProvisioningCleanupFailed"},
[]reactorError{
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
@@ -303,7 +302,7 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-12", "uid11-12", "1Gi", "", v1.ClaimPending, &classGold),
newClaimArray("claim11-12", "uid11-12", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
[]string{"Warning ProvisioningFailed"},
[]reactorError{
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
@@ -397,7 +396,7 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-19", "uid11-19", "1Gi", "", v1.ClaimPending, &classGold),
newClaimArray("claim11-19", "uid11-19", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
noevents,
[]reactorError{
[]pvtesting.ReactorError{
// Inject errors to simulate crashed API server during
// kubeclient.PersistentVolumes.Create()
{"create", "persistentvolumes", errors.New("Mock creation error1")},

View File

@@ -30,6 +30,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)
var (
@@ -60,9 +61,9 @@ func TestControllerSync(t *testing.T) {
newClaimArray("claim5-2", "uid5-2", "1Gi", "volume5-2", v1.ClaimBound, nil, annBoundByController, annBindCompleted),
noevents, noerrors,
// Custom test function that generates an add event
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
claim := newClaim("claim5-2", "uid5-2", "1Gi", "", v1.ClaimPending, nil)
reactor.addClaimEvent(claim)
reactor.AddClaimEvent(claim)
return nil
},
},
@@ -75,10 +76,10 @@ func TestControllerSync(t *testing.T) {
noclaims,
noevents, noerrors,
// Custom test function that generates a delete event
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
obj := ctrl.claims.List()[0]
claim := obj.(*v1.PersistentVolumeClaim)
reactor.deleteClaimEvent(claim)
reactor.DeleteClaimEvent(claim)
return nil
},
},
@@ -91,10 +92,10 @@ func TestControllerSync(t *testing.T) {
newClaimArray("claim5-4", "uid5-4", "1Gi", "volume5-4", v1.ClaimLost, nil, annBoundByController, annBindCompleted),
[]string{"Warning ClaimLost"}, noerrors,
// Custom test function that generates a delete event
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
obj := ctrl.volumes.store.List()[0]
volume := obj.(*v1.PersistentVolume)
reactor.deleteVolumeEvent(volume)
reactor.DeleteVolumeEvent(volume)
return nil
},
},
@@ -120,13 +121,13 @@ func TestControllerSync(t *testing.T) {
reactor := newVolumeReactor(client, ctrl, fakeVolumeWatch, fakeClaimWatch, test.errors)
for _, claim := range test.initialClaims {
reactor.claims[claim.Name] = claim
reactor.AddClaim(claim)
go func(claim *v1.PersistentVolumeClaim) {
fakeClaimWatch.Add(claim)
}(claim)
}
for _, volume := range test.initialVolumes {
reactor.volumes[volume.Name] = volume
reactor.AddVolume(volume)
go func(volume *v1.PersistentVolume) {
fakeVolumeWatch.Add(volume)
}(volume)
@@ -148,7 +149,7 @@ func TestControllerSync(t *testing.T) {
klog.V(4).Infof("controller synced, starting test")
// Call the tested function
err = test.test(ctrl, reactor, test)
err = test.test(ctrl, reactor.VolumeReactor, test)
if err != nil {
t.Errorf("Test %q initial test call failed: %v", test.name, err)
}
@@ -162,7 +163,7 @@ func TestControllerSync(t *testing.T) {
}
close(stopCh)
evaluateTestResults(ctrl, reactor, test, t)
evaluateTestResults(ctrl, reactor.VolumeReactor, test, t)
}
}
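The event helpers used here (AddClaimEvent, DeleteClaimEvent, DeleteVolumeEvent) only reach the controller when the reactor is constructed with the same fake watchers the controller consumes, as TestControllerSync does above. A sketch of that wiring; newClaim is the claim factory from this test framework:

fakeVolumeWatch := watch.NewFake()
fakeClaimWatch := watch.NewFake()
reactor := pvtesting.NewVolumeReactor(client, fakeVolumeWatch, fakeClaimWatch, nil)

// Stores the claim in the reactor and pushes an Add event through
// fakeClaimWatch, which the controller's claim informer is watching.
reactor.AddClaimEvent(newClaim("claim5-2", "uid5-2", "1Gi", "", v1.ClaimPending, nil))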

View File

@@ -23,6 +23,7 @@ import (
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)
// Test single call to syncVolume, expecting recycling to happen.
@@ -130,11 +131,9 @@ func TestRecycleSync(t *testing.T) {
noclaims,
noclaims,
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Delete the volume before recycle operation starts
reactor.lock.Lock()
delete(reactor.volumes, "volume6-6")
reactor.lock.Unlock()
reactor.DeleteVolume("volume6-6")
}),
},
{
@@ -147,14 +146,9 @@ func TestRecycleSync(t *testing.T) {
noclaims,
noclaims,
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Mark the volume as Available before the recycler starts
reactor.lock.Lock()
volume := reactor.volumes["volume6-7"]
volume.Spec.ClaimRef = nil
volume.Status.Phase = v1.VolumeAvailable
volume.Annotations = nil
reactor.lock.Unlock()
reactor.MarkVolumeAvaiable("volume6-7")
}),
},
{
@@ -164,17 +158,13 @@ func TestRecycleSync(t *testing.T) {
// user.
"6-8 - prebound volume is deleted before recycling",
newVolumeArray("volume6-8", "1Gi", "uid6-8", "claim6-8", v1.VolumeBound, v1.PersistentVolumeReclaimRecycle, classEmpty),
newVolumeArray("volume6-8", "1Gi", "", "claim6-8", v1.VolumeAvailable, v1.PersistentVolumeReclaimRecycle, classEmpty),
newVolumeArray("volume6-8", "1Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRecycle, classEmpty),
noclaims,
noclaims,
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Mark the volume as Available before the recycler starts
reactor.lock.Lock()
volume := reactor.volumes["volume6-8"]
volume.Spec.ClaimRef.UID = ""
volume.Status.Phase = v1.VolumeAvailable
reactor.lock.Unlock()
reactor.MarkVolumeAvaiable("volume6-8")
}),
},
{

View File

@@ -0,0 +1,589 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"errors"
"fmt"
"reflect"
"strconv"
"sync"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
)
var VersionConflictError = errors.New("VersionError")
// VolumeReactor is a core.Reactor that simulates etcd and API server. It
// stores:
// - Latest versions of claims and volumes saved by the controller.
// - Queue of all saves (to simulate "volume/claim updated" events). This queue
// contains all intermediate state of an object - e.g. a claim.VolumeName
// is updated first and claim.Phase second. This queue will then contain both
// updates as separate entries.
// - Number of changes since the last call to VolumeReactor.SyncAll().
// - Optionally, volume and claim fake watchers which should be the same ones
// used by the controller. Any time an event function like deleteVolumeEvent
// is called to simulate an event, the reactor's stores are updated and the
// controller is sent the event via the fake watcher.
// - Optionally, a list of errors that should be returned by the reactor,
// simulating etcd / API server failures. These errors are evaluated in order
// and every error is returned only once, i.e. when the reactor finds a
// matching ReactorError, it returns the appropriate error and removes that
// ReactorError from the list.
type VolumeReactor struct {
volumes map[string]*v1.PersistentVolume
claims map[string]*v1.PersistentVolumeClaim
changedObjects []interface{}
changedSinceLastSync int
fakeVolumeWatch *watch.FakeWatcher
fakeClaimWatch *watch.FakeWatcher
lock sync.RWMutex
errors []ReactorError
watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
}
// ReactorError is an error that is returned by the test reactor (= simulated
// etcd + API server) when an action performed by the reactor matches a given
// verb ("get", "update", "create", "delete" or "*") on a given resource
// ("persistentvolumes", "persistentvolumeclaims" or "*").
type ReactorError struct {
Verb string
Resource string
Error error
}
// React is a callback called by fake kubeClient from the controller.
// In other words, every claim/volume change performed by the controller ends
// here.
// This callback checks versions of the updated objects and refuses those that
// are too old (simulating real etcd).
// All updated objects are stored locally to keep track of object versions and
// to evaluate test results.
// All updated objects are also inserted into changedObjects queue and
// optionally sent back to the controller via its watchers.
func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Object, err error) {
r.lock.Lock()
defer r.lock.Unlock()
klog.V(4).Infof("reactor got operation %q on %q", action.GetVerb(), action.GetResource())
// Inject error when requested
err = r.injectReactError(action)
if err != nil {
return true, nil, err
}
// Test did not request to inject an error, continue simulating API server.
switch {
case action.Matches("create", "persistentvolumes"):
obj := action.(core.UpdateAction).GetObject()
volume := obj.(*v1.PersistentVolume)
// check the volume does not exist
_, found := r.volumes[volume.Name]
if found {
return true, nil, fmt.Errorf("Cannot create volume %s: volume already exists", volume.Name)
}
// mimic apiserver defaulting
if volume.Spec.VolumeMode == nil && utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
volume.Spec.VolumeMode = new(v1.PersistentVolumeMode)
*volume.Spec.VolumeMode = v1.PersistentVolumeFilesystem
}
// Store the updated object to appropriate places.
r.volumes[volume.Name] = volume
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(volume)
}
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
klog.V(4).Infof("created volume %s", volume.Name)
return true, volume, nil
case action.Matches("create", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)
// check the claim does not exist
_, found := r.claims[claim.Name]
if found {
return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name)
}
// Store the updated object to appropriate places.
r.claims[claim.Name] = claim
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(claim)
}
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("created claim %s", claim.Name)
return true, claim, nil
case action.Matches("update", "persistentvolumes"):
obj := action.(core.UpdateAction).GetObject()
volume := obj.(*v1.PersistentVolume)
// Check and bump object version
storedVolume, found := r.volumes[volume.Name]
if found {
storedVer, _ := strconv.Atoi(storedVolume.ResourceVersion)
requestedVer, _ := strconv.Atoi(volume.ResourceVersion)
if storedVer != requestedVer {
return true, obj, VersionConflictError
}
if reflect.DeepEqual(storedVolume, volume) {
klog.V(4).Infof("nothing updated volume %s", volume.Name)
return true, volume, nil
}
// Don't modify the existing object
volume = volume.DeepCopy()
volume.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update volume %s: volume not found", volume.Name)
}
// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(volume)
}
r.volumes[volume.Name] = volume
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated volume %s", volume.Name)
return true, volume, nil
case action.Matches("update", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)
// Check and bump object version
storedClaim, found := r.claims[claim.Name]
if found {
storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion)
requestedVer, _ := strconv.Atoi(claim.ResourceVersion)
if storedVer != requestedVer {
return true, obj, VersionConflictError
}
if reflect.DeepEqual(storedClaim, claim) {
klog.V(4).Infof("nothing updated claim %s", claim.Name)
return true, claim, nil
}
// Don't modify the existing object
claim = claim.DeepCopy()
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update claim %s: claim not found", claim.Name)
}
// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(claim)
}
r.claims[claim.Name] = claim
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated claim %s", claim.Name)
return true, claim, nil
case action.Matches("get", "persistentvolumes"):
name := action.(core.GetAction).GetName()
volume, found := r.volumes[name]
if found {
klog.V(4).Infof("GetVolume: found %s", volume.Name)
return true, volume.DeepCopy(), nil
} else {
klog.V(4).Infof("GetVolume: volume %s not found", name)
return true, nil, fmt.Errorf("Cannot find volume %s", name)
}
case action.Matches("get", "persistentvolumeclaims"):
name := action.(core.GetAction).GetName()
claim, found := r.claims[name]
if found {
klog.V(4).Infof("GetClaim: found %s", claim.Name)
return true, claim.DeepCopy(), nil
} else {
klog.V(4).Infof("GetClaim: claim %s not found", name)
return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name)
}
case action.Matches("delete", "persistentvolumes"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted volume %s", name)
obj, found := r.volumes[name]
if found {
delete(r.volumes, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name)
}
case action.Matches("delete", "persistentvolumeclaims"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted claim %s", name)
obj, found := r.claims[name]
if found {
delete(r.claims, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name)
}
}
return false, nil, nil
}
// Watch watches objects from the VolumeReactor. Watch returns a channel which
// will push added / modified / deleted objects.
func (r *VolumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
r.lock.Lock()
defer r.lock.Unlock()
fakewatcher := watch.NewRaceFreeFake()
if _, exists := r.watchers[gvr]; !exists {
r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
}
r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher)
return fakewatcher, nil
}
func (r *VolumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
watches := []*watch.RaceFreeFakeWatcher{}
if r.watchers[gvr] != nil {
if w := r.watchers[gvr][ns]; w != nil {
watches = append(watches, w...)
}
if ns != metav1.NamespaceAll {
if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil {
watches = append(watches, w...)
}
}
}
return watches
}
func (r *VolumeReactor) ChangedSinceLastSync() int {
r.lock.RLock()
defer r.lock.RUnlock()
return r.changedSinceLastSync
}
// injectReactError returns an error when the test requested given action to
// fail. nil is returned otherwise.
func (r *VolumeReactor) injectReactError(action core.Action) error {
if len(r.errors) == 0 {
// No more errors to inject, everything should succeed.
return nil
}
for i, expected := range r.errors {
klog.V(4).Infof("trying to match %q %q with %q %q", expected.Verb, expected.Resource, action.GetVerb(), action.GetResource())
if action.Matches(expected.Verb, expected.Resource) {
// That's the action we're waiting for, remove it from injectedErrors
r.errors = append(r.errors[:i], r.errors[i+1:]...)
klog.V(4).Infof("reactor found matching error at index %d: %q %q, returning %v", i, expected.Verb, expected.Resource, expected.Error)
return expected.Error
}
}
return nil
}
// CheckVolumes compares all expectedVolumes with set of volumes at the end of
// the test and reports differences.
func (r *VolumeReactor) CheckVolumes(expectedVolumes []*v1.PersistentVolume) error {
r.lock.Lock()
defer r.lock.Unlock()
expectedMap := make(map[string]*v1.PersistentVolume)
gotMap := make(map[string]*v1.PersistentVolume)
// Clear any ResourceVersion from both sets
for _, v := range expectedVolumes {
// Don't modify the existing object
v := v.DeepCopy()
v.ResourceVersion = ""
if v.Spec.ClaimRef != nil {
v.Spec.ClaimRef.ResourceVersion = ""
}
expectedMap[v.Name] = v
}
for _, v := range r.volumes {
// We must clone the volume because of golang race check - it was
// written by the controller without any locks on it.
v := v.DeepCopy()
v.ResourceVersion = ""
if v.Spec.ClaimRef != nil {
v.Spec.ClaimRef.ResourceVersion = ""
}
gotMap[v.Name] = v
}
if !reflect.DeepEqual(expectedMap, gotMap) {
// Print ugly but useful diff of expected and received objects for
// easier debugging.
return fmt.Errorf("Volume check failed [A-expected, B-got]: %s", diff.ObjectDiff(expectedMap, gotMap))
}
return nil
}
// CheckClaims compares all expectedClaims with set of claims at the end of the
// test and reports differences.
func (r *VolumeReactor) CheckClaims(expectedClaims []*v1.PersistentVolumeClaim) error {
r.lock.Lock()
defer r.lock.Unlock()
expectedMap := make(map[string]*v1.PersistentVolumeClaim)
gotMap := make(map[string]*v1.PersistentVolumeClaim)
for _, c := range expectedClaims {
// Don't modify the existing object
c = c.DeepCopy()
c.ResourceVersion = ""
expectedMap[c.Name] = c
}
for _, c := range r.claims {
// We must clone the claim because of golang race check - it was
// written by the controller without any locks on it.
c = c.DeepCopy()
c.ResourceVersion = ""
gotMap[c.Name] = c
}
if !reflect.DeepEqual(expectedMap, gotMap) {
// Print ugly but useful diff of expected and received objects for
// easier debugging.
return fmt.Errorf("Claim check failed [A-expected, B-got result]: %s", diff.ObjectDiff(expectedMap, gotMap))
}
return nil
}
// PopChange returns one recorded updated object, either *v1.PersistentVolume
// or *v1.PersistentVolumeClaim. Returns nil when there are no changes.
func (r *VolumeReactor) PopChange() interface{} {
r.lock.Lock()
defer r.lock.Unlock()
if len(r.changedObjects) == 0 {
return nil
}
// For debugging purposes, print the queue
for _, obj := range r.changedObjects {
switch obj.(type) {
case *v1.PersistentVolume:
vol, _ := obj.(*v1.PersistentVolume)
klog.V(4).Infof("reactor queue: %s", vol.Name)
case *v1.PersistentVolumeClaim:
claim, _ := obj.(*v1.PersistentVolumeClaim)
klog.V(4).Infof("reactor queue: %s", claim.Name)
}
}
// Pop the first item from the queue and return it
obj := r.changedObjects[0]
r.changedObjects = r.changedObjects[1:]
return obj
}
// SyncAll simulates the controller's periodic sync of volumes and claims. It
// simply adds all these objects to the internal queue of updates. This method
// should be used when the test manually calls syncClaim/syncVolume. Tests that
// use the real controller loop (ctrl.Run()) get the periodic sync automatically.
func (r *VolumeReactor) SyncAll() {
r.lock.Lock()
defer r.lock.Unlock()
for _, c := range r.claims {
r.changedObjects = append(r.changedObjects, c)
}
for _, v := range r.volumes {
r.changedObjects = append(r.changedObjects, v)
}
r.changedSinceLastSync = 0
}
func (r *VolumeReactor) GetChangeCount() int {
r.lock.Lock()
defer r.lock.Unlock()
return r.changedSinceLastSync
}
// DeleteVolumeEvent simulates that a volume has been deleted in etcd and
// the controller receives 'volume deleted' event.
func (r *VolumeReactor) DeleteVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
// Remove the volume from list of resulting volumes.
delete(r.volumes, volume.Name)
// Generate deletion event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Delete(volume.DeepCopy())
}
}
// DeleteClaimEvent simulates that a claim has been deleted in etcd and the
// controller receives 'claim deleted' event.
func (r *VolumeReactor) DeleteClaimEvent(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
// Remove the claim from list of resulting claims.
delete(r.claims, claim.Name)
// Generate deletion event. Cloned claim is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeClaimWatch != nil {
r.fakeClaimWatch.Delete(claim.DeepCopy())
}
}
// addVolumeEvent simulates that a volume has been added in etcd and the
// controller receives 'volume added' event.
func (r *VolumeReactor) addVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
r.volumes[volume.Name] = volume
// Generate event. No cloning is needed, this volume is not stored in the
// controller cache yet.
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Add(volume)
}
}
// modifyVolumeEvent simulates that a volume has been modified in etcd and the
// controller receives 'volume modified' event.
func (r *VolumeReactor) modifyVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
r.volumes[volume.Name] = volume
// Generate modification event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Modify(volume.DeepCopy())
}
}
// AddClaimEvent simulates that a claim has been added in etcd and the
// controller receives 'claim added' event.
func (r *VolumeReactor) AddClaimEvent(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
r.claims[claim.Name] = claim
// Generate event. No cloning is needed, this claim is not stored in the
// controller cache yet.
if r.fakeClaimWatch != nil {
r.fakeClaimWatch.Add(claim)
}
}
func (r *VolumeReactor) AddClaims(claims []*v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
for _, claim := range claims {
r.claims[claim.Name] = claim
}
}
func (r *VolumeReactor) AddVolumes(volumes []*v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
for _, volume := range volumes {
r.volumes[volume.Name] = volume
}
}
func (r *VolumeReactor) AddClaim(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
r.claims[claim.Name] = claim
}
func (r *VolumeReactor) AddVolume(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
r.volumes[volume.Name] = volume
}
func (r *VolumeReactor) DeleteVolume(name string) {
r.lock.Lock()
defer r.lock.Unlock()
delete(r.volumes, name)
}
func (r *VolumeReactor) AddClaimBoundToVolume(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
r.claims[claim.Name] = claim
if volume, ok := r.volumes[claim.Spec.VolumeName]; ok {
volume.Status.Phase = v1.VolumeBound
}
}
func (r *VolumeReactor) MarkVolumeAvaiable(name string) {
r.lock.Lock()
defer r.lock.Unlock()
if volume, ok := r.volumes[name]; ok {
volume.Spec.ClaimRef = nil
volume.Status.Phase = v1.VolumeAvailable
volume.Annotations = nil
}
}
func NewVolumeReactor(client *fake.Clientset, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []ReactorError) *VolumeReactor {
reactor := &VolumeReactor{
volumes: make(map[string]*v1.PersistentVolume),
claims: make(map[string]*v1.PersistentVolumeClaim),
fakeVolumeWatch: fakeVolumeWatch,
fakeClaimWatch: fakeClaimWatch,
errors: errors,
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
}
client.AddReactor("create", "persistentvolumes", reactor.React)
client.AddReactor("create", "persistentvolumeclaims", reactor.React)
client.AddReactor("update", "persistentvolumes", reactor.React)
client.AddReactor("update", "persistentvolumeclaims", reactor.React)
client.AddReactor("get", "persistentvolumes", reactor.React)
client.AddReactor("get", "persistentvolumeclaims", reactor.React)
client.AddReactor("delete", "persistentvolumes", reactor.React)
client.AddReactor("delete", "persistentvolumeclaims", reactor.React)
return reactor
}
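Taken together, a hypothetical self-contained test against the extracted package could look like the following; it exercises only the exported API defined in this file:

package testing_test

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"

	pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)

func TestVolumeReactorSketch(t *testing.T) {
	client := fake.NewSimpleClientset()
	reactor := pvtesting.NewVolumeReactor(client, nil, nil, nil)

	// Seed one volume and verify CheckVolumes sees exactly that state.
	pv := &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "pv-1"}}
	reactor.AddVolume(pv)
	if err := reactor.CheckVolumes([]*v1.PersistentVolume{pv}); err != nil {
		t.Fatal(err)
	}

	// Deleting through the helper empties the reactor's store again.
	reactor.DeleteVolume("pv-1")
	if err := reactor.CheckVolumes(nil); err != nil {
		t.Fatal(err)
	}
}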