diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index e39bfa57636..ab5a187d423 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -46,8 +46,8 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" schedutil "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/utils/ptr" ) @@ -302,7 +302,7 @@ type dynamicResources struct { // When implementing cluster autoscaler support, this assume cache or // something like it (see https://github.com/kubernetes/kubernetes/pull/112202) // might have to be managed by the cluster autoscaler. - claimAssumeCache volumebinding.AssumeCache + claimAssumeCache assumecache.AssumeCache // inFlightAllocations is map from claim UUIDs to claim objects for those claims // for which allocation was triggered during a scheduling cycle and the @@ -355,7 +355,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(), resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(), claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()), - claimAssumeCache: volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), + claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), } return pl, nil diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go b/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go new file mode 100644 index 00000000000..bfbd322af76 --- /dev/null +++ b/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go @@ -0,0 +1,149 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumebinding + +import ( + "fmt" + + "k8s.io/klog/v2" + + v1 "k8s.io/api/core/v1" + storagehelpers "k8s.io/component-helpers/storage/volume" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" +) + +// PVAssumeCache is a AssumeCache for PersistentVolume objects +type PVAssumeCache interface { + assumecache.AssumeCache + + GetPV(pvName string) (*v1.PersistentVolume, error) + GetAPIPV(pvName string) (*v1.PersistentVolume, error) + ListPVs(storageClassName string) []*v1.PersistentVolume +} + +type pvAssumeCache struct { + assumecache.AssumeCache + logger klog.Logger +} + +func pvStorageClassIndexFunc(obj interface{}) ([]string, error) { + if pv, ok := obj.(*v1.PersistentVolume); ok { + return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil + } + return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj) +} + +// NewPVAssumeCache creates a PV assume cache. +func NewPVAssumeCache(logger klog.Logger, informer assumecache.Informer) PVAssumeCache { + logger = klog.LoggerWithName(logger, "PV Cache") + return &pvAssumeCache{ + AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc), + logger: logger, + } +} + +func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { + obj, err := c.Get(pvName) + if err != nil { + return nil, err + } + + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj} + } + return pv, nil +} + +func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { + obj, err := c.GetAPIObj(pvName) + if err != nil { + return nil, err + } + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj} + } + return pv, nil +} + +func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { + objs := c.List(&v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + StorageClassName: storageClassName, + }, + }) + pvs := []*v1.PersistentVolume{} + for _, obj := range objs { + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + c.logger.Error(&assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj}, "ListPVs") + continue + } + pvs = append(pvs, pv) + } + return pvs +} + +// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects +type PVCAssumeCache interface { + assumecache.AssumeCache + + // GetPVC returns the PVC from the cache with given pvcKey. + // pvcKey is the result of MetaNamespaceKeyFunc on PVC obj + GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) + GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) +} + +type pvcAssumeCache struct { + assumecache.AssumeCache + logger klog.Logger +} + +// NewPVCAssumeCache creates a PVC assume cache. +func NewPVCAssumeCache(logger klog.Logger, informer assumecache.Informer) PVCAssumeCache { + logger = klog.LoggerWithName(logger, "PVC Cache") + return &pvcAssumeCache{ + AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil), + logger: logger, + } +} + +func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { + obj, err := c.Get(pvcKey) + if err != nil { + return nil, err + } + + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolumeClaim", Object: obj} + } + return pvc, nil +} + +func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { + obj, err := c.GetAPIObj(pvcKey) + if err != nil { + return nil, err + } + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolumeClaim", Object: obj} + } + return pvc, nil +} diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go b/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go index 7391a412f89..92f047fba06 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2/ktesting" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { @@ -99,13 +100,9 @@ func TestAssumePV(t *testing.T) { for name, scenario := range scenarios { cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Add oldPV to cache - internalCache.add(scenario.oldPV) + assumecache.AddTestObject(cache, scenario.oldPV) if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil { t.Errorf("Failed to GetPV() after initial update: %v", err) continue @@ -134,10 +131,6 @@ func TestAssumePV(t *testing.T) { func TestRestorePV(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } oldPV := makePV("pv1", "").withVersion("5").PersistentVolume newPV := makePV("pv1", "").withVersion("5").PersistentVolume @@ -146,7 +139,7 @@ func TestRestorePV(t *testing.T) { cache.Restore("nothing") // Add oldPV to cache - internalCache.add(oldPV) + assumecache.AddTestObject(cache, oldPV) if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { t.Fatalf("Failed to GetPV() after initial update: %v", err) } @@ -175,10 +168,6 @@ func TestRestorePV(t *testing.T) { func TestBasicPVCache(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Get object that doesn't exist pv, err := cache.GetPV("nothere") @@ -194,7 +183,7 @@ func TestBasicPVCache(t *testing.T) { for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume pvs[pv.Name] = pv - internalCache.add(pv) + assumecache.AddTestObject(cache, pv) } // List them @@ -203,7 +192,7 @@ func TestBasicPVCache(t *testing.T) { // Update a PV updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume pvs[updatedPV.Name] = updatedPV - internalCache.update(nil, updatedPV) + assumecache.UpdateTestObject(cache, updatedPV) // List them verifyListPVs(t, cache, pvs, "") @@ -211,7 +200,7 @@ func TestBasicPVCache(t *testing.T) { // Delete a PV deletedPV := pvs["test-pv7"] delete(pvs, deletedPV.Name) - internalCache.delete(deletedPV) + assumecache.DeleteTestObject(cache, deletedPV) // List them verifyListPVs(t, cache, pvs, "") @@ -220,17 +209,13 @@ func TestBasicPVCache(t *testing.T) { func TestPVCacheWithStorageClasses(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Add a bunch of PVs pvs1 := map[string]*v1.PersistentVolume{} for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume pvs1[pv.Name] = pv - internalCache.add(pv) + assumecache.AddTestObject(cache, pv) } // Add a bunch of PVs @@ -238,7 +223,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume pvs2[pv.Name] = pv - internalCache.add(pv) + assumecache.AddTestObject(cache, pv) } // List them @@ -248,7 +233,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { // Update a PV updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume pvs1[updatedPV.Name] = updatedPV - internalCache.update(nil, updatedPV) + assumecache.UpdateTestObject(cache, updatedPV) // List them verifyListPVs(t, cache, pvs1, "class1") @@ -257,7 +242,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { // Delete a PV deletedPV := pvs1["test-pv7"] delete(pvs1, deletedPV.Name) - internalCache.delete(deletedPV) + assumecache.DeleteTestObject(cache, deletedPV) // List them verifyListPVs(t, cache, pvs1, "class1") @@ -267,16 +252,12 @@ func TestPVCacheWithStorageClasses(t *testing.T) { func TestAssumeUpdatePVCache(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } pvName := "test-pv0" // Add a PV pv := makePV(pvName, "").withVersion("1").PersistentVolume - internalCache.add(pv) + assumecache.AddTestObject(cache, pv) if err := verifyPV(cache, pvName, pv); err != nil { t.Fatalf("failed to get PV: %v", err) } @@ -292,7 +273,7 @@ func TestAssumeUpdatePVCache(t *testing.T) { } // Add old PV - internalCache.add(pv) + assumecache.AddTestObject(cache, pv) if err := verifyPV(cache, pvName, newPV); err != nil { t.Fatalf("failed to get PV after old PV added: %v", err) } @@ -361,13 +342,9 @@ func TestAssumePVC(t *testing.T) { for name, scenario := range scenarios { cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Add oldPVC to cache - internalCache.add(scenario.oldPVC) + assumecache.AddTestObject(cache, scenario.oldPVC) if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil { t.Errorf("Failed to GetPVC() after initial update: %v", err) continue @@ -396,10 +373,6 @@ func TestAssumePVC(t *testing.T) { func TestRestorePVC(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } oldPVC := makeClaim("pvc1", "5", "ns1") newPVC := makeClaim("pvc1", "5", "ns1") @@ -408,7 +381,7 @@ func TestRestorePVC(t *testing.T) { cache.Restore("nothing") // Add oldPVC to cache - internalCache.add(oldPVC) + assumecache.AddTestObject(cache, oldPVC) if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { t.Fatalf("Failed to GetPVC() after initial update: %v", err) } @@ -437,17 +410,13 @@ func TestRestorePVC(t *testing.T) { func TestAssumeUpdatePVCCache(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } pvcName := "test-pvc0" pvcNamespace := "test-ns" // Add a PVC pvc := makeClaim(pvcName, "1", pvcNamespace) - internalCache.add(pvc) + assumecache.AddTestObject(cache, pvc) if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil { t.Fatalf("failed to get PVC: %v", err) } @@ -463,7 +432,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) { } // Add old PVC - internalCache.add(pvc) + assumecache.AddTestObject(cache, pvc) if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { t.Fatalf("failed to get PVC after old PVC added: %v", err) } diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder.go b/pkg/scheduler/framework/plugins/volumebinding/binder.go index f6ce916c6bf..ac1031da0e9 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder.go @@ -18,6 +18,7 @@ package volumebinding import ( "context" + "errors" "fmt" "sort" "strings" @@ -45,6 +46,7 @@ import ( v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/pkg/volume/util" ) @@ -720,7 +722,7 @@ func (b *volumeBinder) checkBindings(logger klog.Logger, pod *v1.Pod, bindings [ if pvc.Spec.VolumeName != "" { pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName) if err != nil { - if _, ok := err.(*errNotFound); ok { + if errors.Is(err, assumecache.ErrNotFound) { // We tolerate NotFound error here, because PV is possibly // not found because of API delay, we can check next time. // And if PV does not exist because it's deleted, PVC will @@ -873,7 +875,7 @@ func (b *volumeBinder) checkBoundClaims(logger klog.Logger, claims []*v1.Persist pvName := pvc.Spec.VolumeName pv, err := b.pvCache.GetPV(pvName) if err != nil { - if _, ok := err.(*errNotFound); ok { + if errors.Is(err, assumecache.ErrNotFound) { err = nil } return true, false, err diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder_test.go b/pkg/scheduler/framework/plugins/volumebinding/binder_test.go index 1746780ce2e..b9497d089aa 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder_test.go @@ -47,6 +47,7 @@ import ( _ "k8s.io/klog/v2/ktesting/init" "k8s.io/kubernetes/pkg/controller" pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) var ( @@ -138,8 +139,6 @@ type testEnv struct { internalPodInformer coreinformers.PodInformer internalNodeInformer coreinformers.NodeInformer internalCSINodeInformer storageinformers.CSINodeInformer - internalPVCache *assumeCache - internalPVCCache *assumeCache // For CSIStorageCapacity feature testing: internalCSIDriverInformer storageinformers.CSIDriverInformer @@ -258,18 +257,6 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv { t.Fatalf("Failed to convert to internal binder") } - pvCache := internalBinder.pvCache - internalPVCache, ok := pvCache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to convert to internal PV cache") - } - - pvcCache := internalBinder.pvcCache - internalPVCCache, ok := pvcCache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to convert to internal PVC cache") - } - return &testEnv{ client: client, reactor: reactor, @@ -278,8 +265,6 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv { internalPodInformer: podInformer, internalNodeInformer: nodeInformer, internalCSINodeInformer: csiNodeInformer, - internalPVCache: internalPVCache, - internalPVCCache: internalPVCCache, internalCSIDriverInformer: csiDriverInformer, internalCSIStorageCapacityInformer: csiStorageCapacityInformer, @@ -313,9 +298,8 @@ func (env *testEnv) addCSIStorageCapacities(capacities []*storagev1.CSIStorageCa } func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) { - internalPVCCache := env.internalPVCCache for _, pvc := range cachedPVCs { - internalPVCCache.add(pvc) + assumecache.AddTestObject(env.internalBinder.pvcCache, pvc) if apiPVCs == nil { env.reactor.AddClaim(pvc) } @@ -326,9 +310,8 @@ func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs [ } func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.PersistentVolume) { - internalPVCache := env.internalPVCache for _, pv := range cachedPVs { - internalPVCache.add(pv) + assumecache.AddTestObject(env.internalBinder.pvCache, pv) if apiPVs == nil { env.reactor.AddVolume(pv) } @@ -349,7 +332,7 @@ func (env *testEnv) updateVolumes(ctx context.Context, pvs []*v1.PersistentVolum } return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (bool, error) { for _, pv := range pvs { - obj, err := env.internalPVCache.GetAPIObj(pv.Name) + obj, err := env.internalBinder.pvCache.GetAPIObj(pv.Name) if obj == nil || err != nil { return false, nil } @@ -375,7 +358,7 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum } return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (bool, error) { for _, pvc := range pvcs { - obj, err := env.internalPVCCache.GetAPIObj(getPVCName(pvc)) + obj, err := env.internalBinder.pvcCache.GetAPIObj(getPVCName(pvc)) if obj == nil || err != nil { return false, nil } @@ -393,13 +376,13 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) { for _, pv := range pvs { - env.internalPVCache.delete(pv) + assumecache.DeleteTestObject(env.internalBinder.pvCache, pv) } } func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) { for _, pvc := range pvcs { - env.internalPVCCache.delete(pvc) + assumecache.DeleteTestObject(env.internalBinder.pvcCache, pvc) } } diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index 945c7a3efff..1fd1354c768 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -14,66 +14,125 @@ See the License for the specific language governing permissions and limitations under the License. */ -package volumebinding +package assumecache import ( + "errors" "fmt" "strconv" "sync" "k8s.io/klog/v2" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/client-go/tools/cache" - storagehelpers "k8s.io/component-helpers/storage/volume" ) // AssumeCache is a cache on top of the informer that allows for updating // objects outside of informer events and also restoring the informer -// cache's version of the object. Objects are assumed to be -// Kubernetes API objects that implement meta.Interface +// cache's version of the object. Objects are assumed to be +// Kubernetes API objects that are supported by [meta.Accessor]. +// +// Objects can referenced via their key, with [cache.MetaNamespaceKeyFunc] +// as key function. type AssumeCache interface { - // Assume updates the object in-memory only + // Assume updates the object in-memory only. + // + // The version of the object must be greater or equal to + // the current object, otherwise an error is returned. + // + // Storing an object with the same version is supported + // by the assume cache, but suffers from a race: if an + // update is received via the informer while such an + // object is assumed, it gets dropped in favor of the + // newer object from the apiserver. + // + // Only assuming objects that were returned by an apiserver + // operation (Update, Patch) is safe. Assume(obj interface{}) error - // Restore the informer cache's version of the object - Restore(objName string) + // Restore the informer cache's version of the object. + Restore(key string) - // Get the object by name - Get(objName string) (interface{}, error) + // Get the object by its key. + Get(key string) (interface{}, error) - // GetAPIObj gets the API object by name - GetAPIObj(objName string) (interface{}, error) + // GetAPIObj gets the informer cache's version by its key. + GetAPIObj(key string) (interface{}, error) - // List all the objects in the cache + // List all the objects in the cache. List(indexObj interface{}) []interface{} + + // getImplementation is used internally by [AddTestObject], [UpdateTestObject], [DeleteTestObject]. + getImplementation() *assumeCache } -type errWrongType struct { - typeName string - object interface{} +// Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon. +type Informer interface { + AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) } -func (e *errWrongType) Error() string { - return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object) +// AddTestObject adds an object to the assume cache. +// Only use this for unit testing! +func AddTestObject(cache AssumeCache, obj interface{}) { + cache.getImplementation().add(obj) } -type errNotFound struct { - typeName string - objectName string +// UpdateTestObject updates an object in the assume cache. +// Only use this for unit testing! +func UpdateTestObject(cache AssumeCache, obj interface{}) { + cache.getImplementation().update(nil, obj) } -func (e *errNotFound) Error() string { - return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName) +// DeleteTestObject deletes object in the assume cache. +// Only use this for unit testing! +func DeleteTestObject(cache AssumeCache, obj interface{}) { + cache.getImplementation().delete(obj) } -type errObjectName struct { - detailedErr error +// Sentinel errors that can be checked for with errors.Is. +var ( + ErrWrongType = errors.New("object has wrong type") + ErrNotFound = errors.New("object not found") + ErrObjectName = errors.New("cannot determine object name") +) + +type WrongTypeError struct { + TypeName string + Object interface{} } -func (e *errObjectName) Error() string { - return fmt.Sprintf("failed to get object name: %v", e.detailedErr) +func (e WrongTypeError) Error() string { + return fmt.Sprintf("could not convert object to type %v: %+v", e.TypeName, e.Object) +} + +func (e WrongTypeError) Is(err error) bool { + return err == ErrWrongType +} + +type NotFoundError struct { + TypeName string + ObjectKey string +} + +func (e NotFoundError) Error() string { + return fmt.Sprintf("could not find %v %q", e.TypeName, e.ObjectKey) +} + +func (e NotFoundError) Is(err error) bool { + return err == ErrNotFound +} + +type ObjectNameError struct { + DetailedErr error +} + +func (e ObjectNameError) Error() string { + return fmt.Sprintf("failed to get object name: %v", e.DetailedErr) +} + +func (e ObjectNameError) Is(err error) bool { + return err == ErrObjectName } // assumeCache stores two pointers to represent a single object: @@ -119,7 +178,7 @@ type objInfo struct { func objInfoKeyFunc(obj interface{}) (string, error) { objInfo, ok := obj.(*objInfo) if !ok { - return "", &errWrongType{"objInfo", obj} + return "", &WrongTypeError{TypeName: "objInfo", Object: obj} } return objInfo.name, nil } @@ -127,13 +186,13 @@ func objInfoKeyFunc(obj interface{}) (string, error) { func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { objInfo, ok := obj.(*objInfo) if !ok { - return []string{""}, &errWrongType{"objInfo", obj} + return []string{""}, &WrongTypeError{TypeName: "objInfo", Object: obj} } return c.indexFunc(objInfo.latestObj) } // NewAssumeCache creates an assume cache for general objects. -func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache { +func NewAssumeCache(logger klog.Logger, informer Informer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache { c := &assumeCache{ logger: logger, description: description, @@ -148,7 +207,8 @@ func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, desc // Unit tests don't use informers if informer != nil { - informer.AddEventHandler( + // Cannot fail in practice?! No-one bothers checking the error. + _, _ = informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: c.add, UpdateFunc: c.update, @@ -159,6 +219,10 @@ func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, desc return c } +func (c *assumeCache) getImplementation() *assumeCache { + return c +} + func (c *assumeCache) add(obj interface{}) { if obj == nil { return @@ -166,7 +230,7 @@ func (c *assumeCache) add(obj interface{}) { name, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { - c.logger.Error(&errObjectName{err}, "Add failed") + c.logger.Error(&ObjectNameError{err}, "Add failed") return } @@ -213,7 +277,7 @@ func (c *assumeCache) delete(obj interface{}) { name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { - c.logger.Error(&errObjectName{err}, "Failed to delete") + c.logger.Error(&ObjectNameError{err}, "Failed to delete") return } @@ -235,43 +299,44 @@ func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64) if err != nil { - return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err) + //nolint:errorlint // Intentionally not wrapping the error, the underlying error is an implementation detail. + return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %v", objAccessor.GetResourceVersion(), c.description, name, err) } return objResourceVersion, nil } -func (c *assumeCache) getObjInfo(name string) (*objInfo, error) { - obj, ok, err := c.store.GetByKey(name) +func (c *assumeCache) getObjInfo(key string) (*objInfo, error) { + obj, ok, err := c.store.GetByKey(key) if err != nil { return nil, err } if !ok { - return nil, &errNotFound{c.description, name} + return nil, &NotFoundError{TypeName: c.description, ObjectKey: key} } objInfo, ok := obj.(*objInfo) if !ok { - return nil, &errWrongType{"objInfo", obj} + return nil, &WrongTypeError{"objInfo", obj} } return objInfo, nil } -func (c *assumeCache) Get(objName string) (interface{}, error) { +func (c *assumeCache) Get(key string) (interface{}, error) { c.rwMutex.RLock() defer c.rwMutex.RUnlock() - objInfo, err := c.getObjInfo(objName) + objInfo, err := c.getObjInfo(key) if err != nil { return nil, err } return objInfo.latestObj, nil } -func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) { +func (c *assumeCache) GetAPIObj(key string) (interface{}, error) { c.rwMutex.RLock() defer c.rwMutex.RUnlock() - objInfo, err := c.getObjInfo(objName) + objInfo, err := c.getObjInfo(key) if err != nil { return nil, err } @@ -298,7 +363,7 @@ func (c *assumeCache) List(indexObj interface{}) []interface{} { for _, obj := range objs { objInfo, ok := obj.(*objInfo) if !ok { - c.logger.Error(&errWrongType{"objInfo", obj}, "List error") + c.logger.Error(&WrongTypeError{TypeName: "objInfo", Object: obj}, "List error") continue } allObjs = append(allObjs, objInfo.latestObj) @@ -309,7 +374,7 @@ func (c *assumeCache) List(indexObj interface{}) []interface{} { func (c *assumeCache) Assume(obj interface{}) error { name, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { - return &errObjectName{err} + return &ObjectNameError{err} } c.rwMutex.Lock() @@ -353,125 +418,3 @@ func (c *assumeCache) Restore(objName string) { c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) } } - -// PVAssumeCache is a AssumeCache for PersistentVolume objects -type PVAssumeCache interface { - AssumeCache - - GetPV(pvName string) (*v1.PersistentVolume, error) - GetAPIPV(pvName string) (*v1.PersistentVolume, error) - ListPVs(storageClassName string) []*v1.PersistentVolume -} - -type pvAssumeCache struct { - AssumeCache - logger klog.Logger -} - -func pvStorageClassIndexFunc(obj interface{}) ([]string, error) { - if pv, ok := obj.(*v1.PersistentVolume); ok { - return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil - } - return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj) -} - -// NewPVAssumeCache creates a PV assume cache. -func NewPVAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVAssumeCache { - logger = klog.LoggerWithName(logger, "PV Cache") - return &pvAssumeCache{ - AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc), - logger: logger, - } -} - -func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { - obj, err := c.Get(pvName) - if err != nil { - return nil, err - } - - pv, ok := obj.(*v1.PersistentVolume) - if !ok { - return nil, &errWrongType{"v1.PersistentVolume", obj} - } - return pv, nil -} - -func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { - obj, err := c.GetAPIObj(pvName) - if err != nil { - return nil, err - } - pv, ok := obj.(*v1.PersistentVolume) - if !ok { - return nil, &errWrongType{"v1.PersistentVolume", obj} - } - return pv, nil -} - -func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { - objs := c.List(&v1.PersistentVolume{ - Spec: v1.PersistentVolumeSpec{ - StorageClassName: storageClassName, - }, - }) - pvs := []*v1.PersistentVolume{} - for _, obj := range objs { - pv, ok := obj.(*v1.PersistentVolume) - if !ok { - c.logger.Error(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs") - continue - } - pvs = append(pvs, pv) - } - return pvs -} - -// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects -type PVCAssumeCache interface { - AssumeCache - - // GetPVC returns the PVC from the cache with given pvcKey. - // pvcKey is the result of MetaNamespaceKeyFunc on PVC obj - GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) - GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) -} - -type pvcAssumeCache struct { - AssumeCache - logger klog.Logger -} - -// NewPVCAssumeCache creates a PVC assume cache. -func NewPVCAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVCAssumeCache { - logger = klog.LoggerWithName(logger, "PVC Cache") - return &pvcAssumeCache{ - AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil), - logger: logger, - } -} - -func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { - obj, err := c.Get(pvcKey) - if err != nil { - return nil, err - } - - pvc, ok := obj.(*v1.PersistentVolumeClaim) - if !ok { - return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} - } - return pvc, nil -} - -func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { - obj, err := c.GetAPIObj(pvcKey) - if err != nil { - return nil, err - } - pvc, ok := obj.(*v1.PersistentVolumeClaim) - if !ok { - return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} - } - return pvc, nil -} diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go index 7391a412f89..febcb9e0a01 100644 --- a/pkg/scheduler/util/assumecache/assume_cache_test.go +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -14,457 +14,314 @@ See the License for the specific language governing permissions and limitations under the License. */ -package volumebinding +package assumecache import ( "fmt" + "slices" "testing" - v1 "k8s.io/api/core/v1" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/component-helpers/storage/volume" - "k8s.io/klog/v2/ktesting" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/test/utils/ktesting" ) -func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { - pvList := cache.ListPVs(storageClassName) - if len(pvList) != len(expectedPVs) { - t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs)) +// testInformer implements [Informer] and can be used to feed changes into an assume +// cache during unit testing. Only a single event handler is supported, which is +// sufficient for one assume cache. +type testInformer struct { + handler cache.ResourceEventHandler +} + +func (i *testInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + i.handler = handler + return nil, nil +} + +func (i *testInformer) add(obj interface{}) { + if i.handler == nil { + return } - for _, pv := range pvList { - expectedPV, ok := expectedPVs[pv.Name] - if !ok { - t.Errorf("ListPVs() returned unexpected PV %q", pv.Name) - } - if expectedPV != pv { - t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV) - } + i.handler.OnAdd(obj, false) +} + +func (i *testInformer) update(obj interface{}) { + if i.handler == nil { + return + } + i.handler.OnUpdate(nil, obj) +} + +func (i *testInformer) delete(obj interface{}) { + if i.handler == nil { + return + } + i.handler.OnDelete(obj) +} + +func makeObj(name, version, namespace string) metav1.Object { + return &metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: version, } } -func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { - pv, err := cache.GetPV(name) +func newTest(t *testing.T) (ktesting.TContext, AssumeCache, *testInformer) { + return newTestWithIndexer(t, "", nil) +} + +func newTestWithIndexer(t *testing.T, indexName string, indexFunc cache.IndexFunc) (ktesting.TContext, AssumeCache, *testInformer) { + tCtx := ktesting.Init(t) + informer := new(testInformer) + cache := NewAssumeCache(tCtx.Logger(), informer, "TestObject", indexName, indexFunc) + return tCtx, cache, informer +} + +func verify(tCtx ktesting.TContext, cache AssumeCache, key string, expectedObject, expectedAPIObject interface{}) { + tCtx.Helper() + actualObject, err := cache.Get(key) if err != nil { - return err + tCtx.Fatalf("unexpected error retrieving object for key %s: %v", key, err) } - if pv != expectedPV { - return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV) + if actualObject != expectedObject { + tCtx.Fatalf("Get() returned %v, expected %v", actualObject, expectedObject) + } + actualAPIObject, err := cache.GetAPIObj(key) + if err != nil { + tCtx.Fatalf("unexpected error retrieving API object for key %s: %v", key, err) + } + if actualAPIObject != expectedAPIObject { + tCtx.Fatalf("GetAPIObject() returned %v, expected %v", actualAPIObject, expectedAPIObject) } - return nil } -func TestAssumePV(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) +func verifyList(tCtx ktesting.TContext, assumeCache AssumeCache, expectedObjs []interface{}, indexObj interface{}) { + actualObjs := assumeCache.List(indexObj) + diff := cmp.Diff(expectedObjs, actualObjs, cmpopts.SortSlices(func(x, y interface{}) bool { + xKey, err := cache.MetaNamespaceKeyFunc(x) + if err != nil { + tCtx.Fatalf("unexpected error determining key for %v: %v", x, err) + } + yKey, err := cache.MetaNamespaceKeyFunc(y) + if err != nil { + tCtx.Fatalf("unexpected error determining key for %v: %v", y, err) + } + return xKey < yKey + })) + if diff != "" { + tCtx.Fatalf("List() result differs (- expected, + actual):\n%s", diff) + } +} + +func TestAssume(t *testing.T) { scenarios := map[string]struct { - oldPV *v1.PersistentVolume - newPV *v1.PersistentVolume - shouldSucceed bool + oldObj metav1.Object + newObj interface{} + expectErr error }{ "success-same-version": { - oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "").withVersion("5").PersistentVolume, - shouldSucceed: true, - }, - "success-storageclass-same-version": { - oldPV: makePV("pv1", "class1").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "class1").withVersion("5").PersistentVolume, - shouldSucceed: true, + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "5", ""), }, "success-new-higher-version": { - oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "").withVersion("6").PersistentVolume, - shouldSucceed: true, + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "6", ""), }, "fail-old-not-found": { - oldPV: makePV("pv2", "").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "").withVersion("5").PersistentVolume, - shouldSucceed: false, + oldObj: makeObj("pvc2", "5", ""), + newObj: makeObj("pvc1", "5", ""), + expectErr: ErrNotFound, }, "fail-new-lower-version": { - oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "").withVersion("4").PersistentVolume, - shouldSucceed: false, + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "4", ""), + expectErr: cmpopts.AnyError, }, "fail-new-bad-version": { - oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "").withVersion("a").PersistentVolume, - shouldSucceed: false, + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "a", ""), + expectErr: cmpopts.AnyError, }, "fail-old-bad-version": { - oldPV: makePV("pv1", "").withVersion("a").PersistentVolume, - newPV: makePV("pv1", "").withVersion("5").PersistentVolume, - shouldSucceed: false, + oldObj: makeObj("pvc1", "a", ""), + newObj: makeObj("pvc1", "5", ""), + expectErr: cmpopts.AnyError, + }, + "fail-new-bad-object": { + oldObj: makeObj("pvc1", "5", ""), + newObj: 1, + expectErr: ErrObjectName, }, } for name, scenario := range scenarios { - cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } + t.Run(name, func(t *testing.T) { + tCtx, cache, informer := newTest(t) - // Add oldPV to cache - internalCache.add(scenario.oldPV) - if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil { - t.Errorf("Failed to GetPV() after initial update: %v", err) - continue - } + // Add old object to cache. + informer.add(scenario.oldObj) + verify(tCtx, cache, scenario.oldObj.GetName(), scenario.oldObj, scenario.oldObj) - // Assume newPV - err := cache.Assume(scenario.newPV) - if scenario.shouldSucceed && err != nil { - t.Errorf("Test %q failed: Assume() returned error %v", name, err) - } - if !scenario.shouldSucceed && err == nil { - t.Errorf("Test %q failed: Assume() returned success but expected error", name) - } + // Assume new object. + err := cache.Assume(scenario.newObj) + if diff := cmp.Diff(scenario.expectErr, err, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff) + } - // Check that GetPV returns correct PV - expectedPV := scenario.newPV - if !scenario.shouldSucceed { - expectedPV = scenario.oldPV - } - if err := verifyPV(cache, scenario.oldPV.Name, expectedPV); err != nil { - t.Errorf("Failed to GetPV() after initial update: %v", err) - } + // Check that Get returns correct object. + expectedObj := scenario.newObj + if scenario.expectErr != nil { + expectedObj = scenario.oldObj + } + verify(tCtx, cache, scenario.oldObj.GetName(), expectedObj, scenario.oldObj) + }) } } -func TestRestorePV(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } +func TestRestore(t *testing.T) { + tCtx, cache, informer := newTest(t) - oldPV := makePV("pv1", "").withVersion("5").PersistentVolume - newPV := makePV("pv1", "").withVersion("5").PersistentVolume + // This test assumes an object with the same version as the API object. + // The assume cache supports that, but doing so in real code suffers from + // a race: if an unrelated update is received from the apiserver while + // such an object is assumed, the local modification gets dropped. + oldObj := makeObj("pvc1", "5", "") + newObj := makeObj("pvc1", "5", "") - // Restore PV that doesn't exist + // Restore object that doesn't exist cache.Restore("nothing") - // Add oldPV to cache - internalCache.add(oldPV) - if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { - t.Fatalf("Failed to GetPV() after initial update: %v", err) - } + // Add old object to cache. + informer.add(oldObj) + verify(ktesting.WithStep(tCtx, "after initial update"), cache, oldObj.GetName(), oldObj, oldObj) - // Restore PV - cache.Restore(oldPV.Name) - if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { - t.Fatalf("Failed to GetPV() after initial restore: %v", err) - } + // Restore object. + cache.Restore(oldObj.GetName()) + verify(ktesting.WithStep(tCtx, "after initial Restore"), cache, oldObj.GetName(), oldObj, oldObj) - // Assume newPV - if err := cache.Assume(newPV); err != nil { + // Assume new object. + if err := cache.Assume(newObj); err != nil { t.Fatalf("Assume() returned error %v", err) } - if err := verifyPV(cache, oldPV.Name, newPV); err != nil { - t.Fatalf("Failed to GetPV() after Assume: %v", err) - } + verify(ktesting.WithStep(tCtx, "after Assume"), cache, oldObj.GetName(), newObj, oldObj) - // Restore PV - cache.Restore(oldPV.Name) - if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { - t.Fatalf("Failed to GetPV() after restore: %v", err) + // Restore object. + cache.Restore(oldObj.GetName()) + verify(ktesting.WithStep(tCtx, "after second Restore"), cache, oldObj.GetName(), oldObj, oldObj) +} + +func TestEvents(t *testing.T) { + tCtx, cache, informer := newTest(t) + + oldObj := makeObj("pvc1", "5", "") + newObj := makeObj("pvc1", "6", "") + key := oldObj.GetName() + + // Add old object to cache. + informer.add(oldObj) + verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, oldObj, oldObj) + + // Update object. + informer.update(newObj) + verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, newObj, newObj) + + // Some error cases (don't occur in practice). + informer.add(1) + verify(ktesting.WithStep(tCtx, "after nop add"), cache, key, newObj, newObj) + informer.add(nil) + verify(ktesting.WithStep(tCtx, "after nil add"), cache, key, newObj, newObj) + informer.update(oldObj) + verify(ktesting.WithStep(tCtx, "after nop update"), cache, key, newObj, newObj) + informer.update(nil) + verify(ktesting.WithStep(tCtx, "after nil update"), cache, key, newObj, newObj) + informer.delete(nil) + verify(ktesting.WithStep(tCtx, "after nop delete"), cache, key, newObj, newObj) + + // Delete object. + informer.delete(oldObj) + _, err := cache.Get(key) + if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff) } } -func TestBasicPVCache(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } +func TestListNoIndexer(t *testing.T) { + tCtx, cache, informer := newTest(t) - // Get object that doesn't exist - pv, err := cache.GetPV("nothere") - if err == nil { - t.Errorf("GetPV() returned unexpected success") - } - if pv != nil { - t.Errorf("GetPV() returned unexpected PV %q", pv.Name) - } - - // Add a bunch of PVs - pvs := map[string]*v1.PersistentVolume{} + // Add a bunch of objects. + objs := make([]interface{}, 0, 10) for i := 0; i < 10; i++ { - pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume - pvs[pv.Name] = pv - internalCache.add(pv) + obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "") + objs = append(objs, obj) + informer.add(obj) } // List them - verifyListPVs(t, cache, pvs, "") + verifyList(ktesting.WithStep(tCtx, "after add"), cache, objs, "") - // Update a PV - updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume - pvs[updatedPV.Name] = updatedPV - internalCache.update(nil, updatedPV) + // Update an object. + updatedObj := makeObj("test-pvc3", "2", "") + objs[3] = updatedObj + informer.update(updatedObj) // List them - verifyListPVs(t, cache, pvs, "") + verifyList(ktesting.WithStep(tCtx, "after update"), cache, objs, "") // Delete a PV - deletedPV := pvs["test-pv7"] - delete(pvs, deletedPV.Name) - internalCache.delete(deletedPV) + deletedObj := objs[7] + objs = slices.Delete(objs, 7, 8) + informer.delete(deletedObj) // List them - verifyListPVs(t, cache, pvs, "") + verifyList(ktesting.WithStep(tCtx, "after delete"), cache, objs, "") } -func TestPVCacheWithStorageClasses(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") +func TestListWithIndexer(t *testing.T) { + namespaceIndexer := func(obj interface{}) ([]string, error) { + objAccessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + return []string{objAccessor.GetNamespace()}, nil + } + tCtx, cache, informer := newTestWithIndexer(t, "myNamespace", namespaceIndexer) + + // Add a bunch of objects. + ns := "ns1" + objs := make([]interface{}, 0, 10) + for i := 0; i < 10; i++ { + obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns) + objs = append(objs, obj) + informer.add(obj) } - // Add a bunch of PVs - pvs1 := map[string]*v1.PersistentVolume{} + // Add a bunch of other objects. for i := 0; i < 10; i++ { - pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume - pvs1[pv.Name] = pv - internalCache.add(pv) - } - - // Add a bunch of PVs - pvs2 := map[string]*v1.PersistentVolume{} - for i := 0; i < 10; i++ { - pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume - pvs2[pv.Name] = pv - internalCache.add(pv) + obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "ns2") + informer.add(obj) } // List them - verifyListPVs(t, cache, pvs1, "class1") - verifyListPVs(t, cache, pvs2, "class2") + verifyList(ktesting.WithStep(tCtx, "after add"), cache, objs, objs[0]) - // Update a PV - updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume - pvs1[updatedPV.Name] = updatedPV - internalCache.update(nil, updatedPV) + // Update an object. + updatedObj := makeObj("test-pvc3", "2", ns) + objs[3] = updatedObj + informer.update(updatedObj) // List them - verifyListPVs(t, cache, pvs1, "class1") - verifyListPVs(t, cache, pvs2, "class2") + verifyList(ktesting.WithStep(tCtx, "after update"), cache, objs, objs[0]) // Delete a PV - deletedPV := pvs1["test-pv7"] - delete(pvs1, deletedPV.Name) - internalCache.delete(deletedPV) + deletedObj := objs[7] + objs = slices.Delete(objs, 7, 8) + informer.delete(deletedObj) // List them - verifyListPVs(t, cache, pvs1, "class1") - verifyListPVs(t, cache, pvs2, "class2") -} - -func TestAssumeUpdatePVCache(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } - - pvName := "test-pv0" - - // Add a PV - pv := makePV(pvName, "").withVersion("1").PersistentVolume - internalCache.add(pv) - if err := verifyPV(cache, pvName, pv); err != nil { - t.Fatalf("failed to get PV: %v", err) - } - - // Assume PV - newPV := pv.DeepCopy() - newPV.Spec.ClaimRef = &v1.ObjectReference{Name: "test-claim"} - if err := cache.Assume(newPV); err != nil { - t.Fatalf("failed to assume PV: %v", err) - } - if err := verifyPV(cache, pvName, newPV); err != nil { - t.Fatalf("failed to get PV after assume: %v", err) - } - - // Add old PV - internalCache.add(pv) - if err := verifyPV(cache, pvName, newPV); err != nil { - t.Fatalf("failed to get PV after old PV added: %v", err) - } -} - -func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim { - return &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - ResourceVersion: version, - Annotations: map[string]string{}, - }, - } -} - -func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error { - pvc, err := cache.GetPVC(pvcKey) - if err != nil { - return err - } - if pvc != expectedPVC { - return fmt.Errorf("GetPVC() returned %p, expected %p", pvc, expectedPVC) - } - return nil -} - -func TestAssumePVC(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - scenarios := map[string]struct { - oldPVC *v1.PersistentVolumeClaim - newPVC *v1.PersistentVolumeClaim - shouldSucceed bool - }{ - "success-same-version": { - oldPVC: makeClaim("pvc1", "5", "ns1"), - newPVC: makeClaim("pvc1", "5", "ns1"), - shouldSucceed: true, - }, - "success-new-higher-version": { - oldPVC: makeClaim("pvc1", "5", "ns1"), - newPVC: makeClaim("pvc1", "6", "ns1"), - shouldSucceed: true, - }, - "fail-old-not-found": { - oldPVC: makeClaim("pvc2", "5", "ns1"), - newPVC: makeClaim("pvc1", "5", "ns1"), - shouldSucceed: false, - }, - "fail-new-lower-version": { - oldPVC: makeClaim("pvc1", "5", "ns1"), - newPVC: makeClaim("pvc1", "4", "ns1"), - shouldSucceed: false, - }, - "fail-new-bad-version": { - oldPVC: makeClaim("pvc1", "5", "ns1"), - newPVC: makeClaim("pvc1", "a", "ns1"), - shouldSucceed: false, - }, - "fail-old-bad-version": { - oldPVC: makeClaim("pvc1", "a", "ns1"), - newPVC: makeClaim("pvc1", "5", "ns1"), - shouldSucceed: false, - }, - } - - for name, scenario := range scenarios { - cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } - - // Add oldPVC to cache - internalCache.add(scenario.oldPVC) - if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil { - t.Errorf("Failed to GetPVC() after initial update: %v", err) - continue - } - - // Assume newPVC - err := cache.Assume(scenario.newPVC) - if scenario.shouldSucceed && err != nil { - t.Errorf("Test %q failed: Assume() returned error %v", name, err) - } - if !scenario.shouldSucceed && err == nil { - t.Errorf("Test %q failed: Assume() returned success but expected error", name) - } - - // Check that GetPVC returns correct PVC - expectedPV := scenario.newPVC - if !scenario.shouldSucceed { - expectedPV = scenario.oldPVC - } - if err := verifyPVC(cache, getPVCName(scenario.oldPVC), expectedPV); err != nil { - t.Errorf("Failed to GetPVC() after initial update: %v", err) - } - } -} - -func TestRestorePVC(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } - - oldPVC := makeClaim("pvc1", "5", "ns1") - newPVC := makeClaim("pvc1", "5", "ns1") - - // Restore PVC that doesn't exist - cache.Restore("nothing") - - // Add oldPVC to cache - internalCache.add(oldPVC) - if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { - t.Fatalf("Failed to GetPVC() after initial update: %v", err) - } - - // Restore PVC - cache.Restore(getPVCName(oldPVC)) - if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { - t.Fatalf("Failed to GetPVC() after initial restore: %v", err) - } - - // Assume newPVC - if err := cache.Assume(newPVC); err != nil { - t.Fatalf("Assume() returned error %v", err) - } - if err := verifyPVC(cache, getPVCName(oldPVC), newPVC); err != nil { - t.Fatalf("Failed to GetPVC() after Assume: %v", err) - } - - // Restore PVC - cache.Restore(getPVCName(oldPVC)) - if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { - t.Fatalf("Failed to GetPVC() after restore: %v", err) - } -} - -func TestAssumeUpdatePVCCache(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } - - pvcName := "test-pvc0" - pvcNamespace := "test-ns" - - // Add a PVC - pvc := makeClaim(pvcName, "1", pvcNamespace) - internalCache.add(pvc) - if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil { - t.Fatalf("failed to get PVC: %v", err) - } - - // Assume PVC - newPVC := pvc.DeepCopy() - newPVC.Annotations[volume.AnnSelectedNode] = "test-node" - if err := cache.Assume(newPVC); err != nil { - t.Fatalf("failed to assume PVC: %v", err) - } - if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { - t.Fatalf("failed to get PVC after assume: %v", err) - } - - // Add old PVC - internalCache.add(pvc) - if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { - t.Fatalf("failed to get PVC after old PVC added: %v", err) - } + verifyList(ktesting.WithStep(tCtx, "after delete"), cache, objs, objs[0]) }