Merge pull request #124102 from pohly/dra-scheduler-assume-cache

scheduler: move assume cache to utils
This commit is contained in:
Kubernetes Prow Robot 2024-04-26 08:49:12 -07:00 committed by GitHub
commit cffc2c0b40
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 785 additions and 447 deletions

View File

@ -46,8 +46,8 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/utils/ptr"
)
@ -302,7 +302,7 @@ type dynamicResources struct {
// When implementing cluster autoscaler support, this assume cache or
// something like it (see https://github.com/kubernetes/kubernetes/pull/112202)
// might have to be managed by the cluster autoscaler.
claimAssumeCache volumebinding.AssumeCache
claimAssumeCache *assumecache.AssumeCache
// inFlightAllocations is map from claim UUIDs to claim objects for those claims
// for which allocation was triggered during a scheduling cycle and the
@ -355,7 +355,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()),
claimAssumeCache: volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
}
return pl, nil

View File

@ -18,353 +18,17 @@ package volumebinding
import (
"fmt"
"strconv"
"sync"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
storagehelpers "k8s.io/component-helpers/storage/volume"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
// AssumeCache is a cache on top of the informer that allows for updating
// objects outside of informer events and also restoring the informer
// cache's version of the object. Objects are assumed to be
// Kubernetes API objects that implement meta.Interface
type AssumeCache interface {
// Assume updates the object in-memory only
Assume(obj interface{}) error
// Restore the informer cache's version of the object
Restore(objName string)
// Get the object by name
Get(objName string) (interface{}, error)
// GetAPIObj gets the API object by name
GetAPIObj(objName string) (interface{}, error)
// List all the objects in the cache
List(indexObj interface{}) []interface{}
}
type errWrongType struct {
typeName string
object interface{}
}
func (e *errWrongType) Error() string {
return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object)
}
type errNotFound struct {
typeName string
objectName string
}
func (e *errNotFound) Error() string {
return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName)
}
type errObjectName struct {
detailedErr error
}
func (e *errObjectName) Error() string {
return fmt.Sprintf("failed to get object name: %v", e.detailedErr)
}
// assumeCache stores two pointers to represent a single object:
// - The pointer to the informer object.
// - The pointer to the latest object, which could be the same as
// the informer object, or an in-memory object.
//
// An informer update always overrides the latest object pointer.
//
// Assume() only updates the latest object pointer.
// Restore() sets the latest object pointer back to the informer object.
// Get/List() always returns the latest object pointer.
type assumeCache struct {
// The logger that was chosen when setting up the cache.
// Will be used for all operations.
logger klog.Logger
// Synchronizes updates to store
rwMutex sync.RWMutex
// describes the object stored
description string
// Stores objInfo pointers
store cache.Indexer
// Index function for object
indexFunc cache.IndexFunc
indexName string
}
type objInfo struct {
// name of the object
name string
// Latest version of object could be cached-only or from informer
latestObj interface{}
// Latest object from informer
apiObj interface{}
}
func objInfoKeyFunc(obj interface{}) (string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
return "", &errWrongType{"objInfo", obj}
}
return objInfo.name, nil
}
func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
return []string{""}, &errWrongType{"objInfo", obj}
}
return c.indexFunc(objInfo.latestObj)
}
// NewAssumeCache creates an assume cache for general objects.
func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
c := &assumeCache{
logger: logger,
description: description,
indexFunc: indexFunc,
indexName: indexName,
}
indexers := cache.Indexers{}
if indexName != "" && indexFunc != nil {
indexers[indexName] = c.objInfoIndexFunc
}
c.store = cache.NewIndexer(objInfoKeyFunc, indexers)
// Unit tests don't use informers
if informer != nil {
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.add,
UpdateFunc: c.update,
DeleteFunc: c.delete,
},
)
}
return c
}
func (c *assumeCache) add(obj interface{}) {
if obj == nil {
return
}
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(&errObjectName{err}, "Add failed")
return
}
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
if objInfo, _ := c.getObjInfo(name); objInfo != nil {
newVersion, err := c.getObjVersion(name, obj)
if err != nil {
c.logger.Error(err, "Add failed: couldn't get object version")
return
}
storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
if err != nil {
c.logger.Error(err, "Add failed: couldn't get stored object version")
return
}
// Only update object if version is newer.
// This is so we don't override assumed objects due to informer resync.
if newVersion <= storedVersion {
c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion)
return
}
}
objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
if err = c.store.Update(objInfo); err != nil {
c.logger.Info("Error occurred while updating stored object", "err", err)
} else {
c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
}
}
func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
c.add(newObj)
}
func (c *assumeCache) delete(obj interface{}) {
if obj == nil {
return
}
name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(&errObjectName{err}, "Failed to delete")
return
}
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
objInfo := &objInfo{name: name}
err = c.store.Delete(objInfo)
if err != nil {
c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name)
}
}
func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
objAccessor, err := meta.Accessor(obj)
if err != nil {
return -1, err
}
objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
if err != nil {
return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
}
return objResourceVersion, nil
}
func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
obj, ok, err := c.store.GetByKey(name)
if err != nil {
return nil, err
}
if !ok {
return nil, &errNotFound{c.description, name}
}
objInfo, ok := obj.(*objInfo)
if !ok {
return nil, &errWrongType{"objInfo", obj}
}
return objInfo, nil
}
func (c *assumeCache) Get(objName string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
objInfo, err := c.getObjInfo(objName)
if err != nil {
return nil, err
}
return objInfo.latestObj, nil
}
func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
objInfo, err := c.getObjInfo(objName)
if err != nil {
return nil, err
}
return objInfo.apiObj, nil
}
func (c *assumeCache) List(indexObj interface{}) []interface{} {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
allObjs := []interface{}{}
var objs []interface{}
if c.indexName != "" {
o, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
if err != nil {
c.logger.Error(err, "List index error")
return nil
}
objs = o
} else {
objs = c.store.List()
}
for _, obj := range objs {
objInfo, ok := obj.(*objInfo)
if !ok {
c.logger.Error(&errWrongType{"objInfo", obj}, "List error")
continue
}
allObjs = append(allObjs, objInfo.latestObj)
}
return allObjs
}
func (c *assumeCache) Assume(obj interface{}) error {
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return &errObjectName{err}
}
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
objInfo, err := c.getObjInfo(name)
if err != nil {
return err
}
newVersion, err := c.getObjVersion(name, obj)
if err != nil {
return err
}
storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
if err != nil {
return err
}
if newVersion < storedVersion {
return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
}
// Only update the cached object
objInfo.latestObj = obj
c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion)
return nil
}
func (c *assumeCache) Restore(objName string) {
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
objInfo, err := c.getObjInfo(objName)
if err != nil {
// This could be expected if object got deleted
c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err)
} else {
objInfo.latestObj = objInfo.apiObj
c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
}
}
// PVAssumeCache is a AssumeCache for PersistentVolume objects
type PVAssumeCache interface {
AssumeCache
GetPV(pvName string) (*v1.PersistentVolume, error)
GetAPIPV(pvName string) (*v1.PersistentVolume, error)
ListPVs(storageClassName string) []*v1.PersistentVolume
}
type pvAssumeCache struct {
AssumeCache
type PVAssumeCache struct {
*assumecache.AssumeCache
logger klog.Logger
}
@ -376,15 +40,15 @@ func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
}
// NewPVAssumeCache creates a PV assume cache.
func NewPVAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVAssumeCache {
func NewPVAssumeCache(logger klog.Logger, informer assumecache.Informer) *PVAssumeCache {
logger = klog.LoggerWithName(logger, "PV Cache")
return &pvAssumeCache{
AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc),
return &PVAssumeCache{
AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc),
logger: logger,
}
}
func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
func (c *PVAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
obj, err := c.Get(pvName)
if err != nil {
return nil, err
@ -392,24 +56,24 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
return nil, &errWrongType{"v1.PersistentVolume", obj}
return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj}
}
return pv, nil
}
func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
func (c *PVAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
obj, err := c.GetAPIObj(pvName)
if err != nil {
return nil, err
}
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
return nil, &errWrongType{"v1.PersistentVolume", obj}
return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj}
}
return pv, nil
}
func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
func (c *PVAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
objs := c.List(&v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
StorageClassName: storageClassName,
@ -419,7 +83,7 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume
for _, obj := range objs {
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
c.logger.Error(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs")
c.logger.Error(&assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj}, "ListPVs")
continue
}
pvs = append(pvs, pv)
@ -428,30 +92,21 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume
}
// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects
type PVCAssumeCache interface {
AssumeCache
// GetPVC returns the PVC from the cache with given pvcKey.
// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}
type pvcAssumeCache struct {
AssumeCache
type PVCAssumeCache struct {
*assumecache.AssumeCache
logger klog.Logger
}
// NewPVCAssumeCache creates a PVC assume cache.
func NewPVCAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVCAssumeCache {
func NewPVCAssumeCache(logger klog.Logger, informer assumecache.Informer) *PVCAssumeCache {
logger = klog.LoggerWithName(logger, "PVC Cache")
return &pvcAssumeCache{
AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil),
return &PVCAssumeCache{
AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil),
logger: logger,
}
}
func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
func (c *PVCAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.Get(pvcKey)
if err != nil {
return nil, err
@ -459,19 +114,19 @@ func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolumeClaim", Object: obj}
}
return pvc, nil
}
func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
func (c *PVCAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.GetAPIObj(pvcKey)
if err != nil {
return nil, err
}
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolumeClaim", Object: obj}
}
return pvc, nil
}

View File

@ -24,9 +24,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
func verifyListPVs(t *testing.T, cache *PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
pvList := cache.ListPVs(storageClassName)
if len(pvList) != len(expectedPVs) {
t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs))
@ -42,7 +43,7 @@ func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1
}
}
func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
func verifyPV(cache *PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
pv, err := cache.GetPV(name)
if err != nil {
return err
@ -99,13 +100,9 @@ func TestAssumePV(t *testing.T) {
for name, scenario := range scenarios {
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Add oldPV to cache
internalCache.add(scenario.oldPV)
assumecache.AddTestObject(cache.AssumeCache, scenario.oldPV)
if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
t.Errorf("Failed to GetPV() after initial update: %v", err)
continue
@ -134,10 +131,6 @@ func TestAssumePV(t *testing.T) {
func TestRestorePV(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
oldPV := makePV("pv1", "").withVersion("5").PersistentVolume
newPV := makePV("pv1", "").withVersion("5").PersistentVolume
@ -146,7 +139,7 @@ func TestRestorePV(t *testing.T) {
cache.Restore("nothing")
// Add oldPV to cache
internalCache.add(oldPV)
assumecache.AddTestObject(cache.AssumeCache, oldPV)
if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
t.Fatalf("Failed to GetPV() after initial update: %v", err)
}
@ -175,10 +168,6 @@ func TestRestorePV(t *testing.T) {
func TestBasicPVCache(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Get object that doesn't exist
pv, err := cache.GetPV("nothere")
@ -194,7 +183,7 @@ func TestBasicPVCache(t *testing.T) {
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume
pvs[pv.Name] = pv
internalCache.add(pv)
assumecache.AddTestObject(cache.AssumeCache, pv)
}
// List them
@ -203,7 +192,7 @@ func TestBasicPVCache(t *testing.T) {
// Update a PV
updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume
pvs[updatedPV.Name] = updatedPV
internalCache.update(nil, updatedPV)
assumecache.UpdateTestObject(cache.AssumeCache, updatedPV)
// List them
verifyListPVs(t, cache, pvs, "")
@ -211,7 +200,7 @@ func TestBasicPVCache(t *testing.T) {
// Delete a PV
deletedPV := pvs["test-pv7"]
delete(pvs, deletedPV.Name)
internalCache.delete(deletedPV)
assumecache.DeleteTestObject(cache.AssumeCache, deletedPV)
// List them
verifyListPVs(t, cache, pvs, "")
@ -220,17 +209,13 @@ func TestBasicPVCache(t *testing.T) {
func TestPVCacheWithStorageClasses(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Add a bunch of PVs
pvs1 := map[string]*v1.PersistentVolume{}
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume
pvs1[pv.Name] = pv
internalCache.add(pv)
assumecache.AddTestObject(cache.AssumeCache, pv)
}
// Add a bunch of PVs
@ -238,7 +223,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume
pvs2[pv.Name] = pv
internalCache.add(pv)
assumecache.AddTestObject(cache.AssumeCache, pv)
}
// List them
@ -248,7 +233,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
// Update a PV
updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume
pvs1[updatedPV.Name] = updatedPV
internalCache.update(nil, updatedPV)
assumecache.UpdateTestObject(cache.AssumeCache, updatedPV)
// List them
verifyListPVs(t, cache, pvs1, "class1")
@ -257,7 +242,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
// Delete a PV
deletedPV := pvs1["test-pv7"]
delete(pvs1, deletedPV.Name)
internalCache.delete(deletedPV)
assumecache.DeleteTestObject(cache.AssumeCache, deletedPV)
// List them
verifyListPVs(t, cache, pvs1, "class1")
@ -267,16 +252,12 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
func TestAssumeUpdatePVCache(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
pvName := "test-pv0"
// Add a PV
pv := makePV(pvName, "").withVersion("1").PersistentVolume
internalCache.add(pv)
assumecache.AddTestObject(cache.AssumeCache, pv)
if err := verifyPV(cache, pvName, pv); err != nil {
t.Fatalf("failed to get PV: %v", err)
}
@ -292,7 +273,7 @@ func TestAssumeUpdatePVCache(t *testing.T) {
}
// Add old PV
internalCache.add(pv)
assumecache.AddTestObject(cache.AssumeCache, pv)
if err := verifyPV(cache, pvName, newPV); err != nil {
t.Fatalf("failed to get PV after old PV added: %v", err)
}
@ -309,7 +290,7 @@ func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim {
}
}
func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error {
func verifyPVC(cache *PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error {
pvc, err := cache.GetPVC(pvcKey)
if err != nil {
return err
@ -361,13 +342,9 @@ func TestAssumePVC(t *testing.T) {
for name, scenario := range scenarios {
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Add oldPVC to cache
internalCache.add(scenario.oldPVC)
assumecache.AddTestObject(cache.AssumeCache, scenario.oldPVC)
if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil {
t.Errorf("Failed to GetPVC() after initial update: %v", err)
continue
@ -396,10 +373,6 @@ func TestAssumePVC(t *testing.T) {
func TestRestorePVC(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
oldPVC := makeClaim("pvc1", "5", "ns1")
newPVC := makeClaim("pvc1", "5", "ns1")
@ -408,7 +381,7 @@ func TestRestorePVC(t *testing.T) {
cache.Restore("nothing")
// Add oldPVC to cache
internalCache.add(oldPVC)
assumecache.AddTestObject(cache.AssumeCache, oldPVC)
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after initial update: %v", err)
}
@ -437,17 +410,13 @@ func TestRestorePVC(t *testing.T) {
func TestAssumeUpdatePVCCache(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
pvcName := "test-pvc0"
pvcNamespace := "test-ns"
// Add a PVC
pvc := makeClaim(pvcName, "1", pvcNamespace)
internalCache.add(pvc)
assumecache.AddTestObject(cache.AssumeCache, pvc)
if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil {
t.Fatalf("failed to get PVC: %v", err)
}
@ -463,7 +432,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) {
}
// Add old PVC
internalCache.add(pvc)
assumecache.AddTestObject(cache.AssumeCache, pvc)
if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
t.Fatalf("failed to get PVC after old PVC added: %v", err)
}

View File

@ -18,6 +18,7 @@ package volumebinding
import (
"context"
"errors"
"fmt"
"sort"
"strings"
@ -45,6 +46,7 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/pkg/volume/util"
)
@ -218,8 +220,8 @@ type volumeBinder struct {
nodeLister corelisters.NodeLister
csiNodeLister storagelisters.CSINodeLister
pvcCache PVCAssumeCache
pvCache PVAssumeCache
pvcCache *PVCAssumeCache
pvCache *PVAssumeCache
// Amount of time to wait for the bind operation to succeed
bindTimeout time.Duration
@ -720,7 +722,7 @@ func (b *volumeBinder) checkBindings(logger klog.Logger, pod *v1.Pod, bindings [
if pvc.Spec.VolumeName != "" {
pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName)
if err != nil {
if _, ok := err.(*errNotFound); ok {
if errors.Is(err, assumecache.ErrNotFound) {
// We tolerate NotFound error here, because PV is possibly
// not found because of API delay, we can check next time.
// And if PV does not exist because it's deleted, PVC will
@ -873,7 +875,7 @@ func (b *volumeBinder) checkBoundClaims(logger klog.Logger, claims []*v1.Persist
pvName := pvc.Spec.VolumeName
pv, err := b.pvCache.GetPV(pvName)
if err != nil {
if _, ok := err.(*errNotFound); ok {
if errors.Is(err, assumecache.ErrNotFound) {
err = nil
}
return true, false, err

View File

@ -47,6 +47,7 @@ import (
_ "k8s.io/klog/v2/ktesting/init"
"k8s.io/kubernetes/pkg/controller"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
var (
@ -138,8 +139,6 @@ type testEnv struct {
internalPodInformer coreinformers.PodInformer
internalNodeInformer coreinformers.NodeInformer
internalCSINodeInformer storageinformers.CSINodeInformer
internalPVCache *assumeCache
internalPVCCache *assumeCache
// For CSIStorageCapacity feature testing:
internalCSIDriverInformer storageinformers.CSIDriverInformer
@ -258,18 +257,6 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv {
t.Fatalf("Failed to convert to internal binder")
}
pvCache := internalBinder.pvCache
internalPVCache, ok := pvCache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PV cache")
}
pvcCache := internalBinder.pvcCache
internalPVCCache, ok := pvcCache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
return &testEnv{
client: client,
reactor: reactor,
@ -278,8 +265,6 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv {
internalPodInformer: podInformer,
internalNodeInformer: nodeInformer,
internalCSINodeInformer: csiNodeInformer,
internalPVCache: internalPVCache,
internalPVCCache: internalPVCCache,
internalCSIDriverInformer: csiDriverInformer,
internalCSIStorageCapacityInformer: csiStorageCapacityInformer,
@ -313,9 +298,8 @@ func (env *testEnv) addCSIStorageCapacities(capacities []*storagev1.CSIStorageCa
}
func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) {
internalPVCCache := env.internalPVCCache
for _, pvc := range cachedPVCs {
internalPVCCache.add(pvc)
assumecache.AddTestObject(env.internalBinder.pvcCache.AssumeCache, pvc)
if apiPVCs == nil {
env.reactor.AddClaim(pvc)
}
@ -326,9 +310,8 @@ func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs [
}
func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.PersistentVolume) {
internalPVCache := env.internalPVCache
for _, pv := range cachedPVs {
internalPVCache.add(pv)
assumecache.AddTestObject(env.internalBinder.pvCache.AssumeCache, pv)
if apiPVs == nil {
env.reactor.AddVolume(pv)
}
@ -349,7 +332,7 @@ func (env *testEnv) updateVolumes(ctx context.Context, pvs []*v1.PersistentVolum
}
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (bool, error) {
for _, pv := range pvs {
obj, err := env.internalPVCache.GetAPIObj(pv.Name)
obj, err := env.internalBinder.pvCache.GetAPIObj(pv.Name)
if obj == nil || err != nil {
return false, nil
}
@ -375,7 +358,7 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum
}
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (bool, error) {
for _, pvc := range pvcs {
obj, err := env.internalPVCCache.GetAPIObj(getPVCName(pvc))
obj, err := env.internalBinder.pvcCache.GetAPIObj(getPVCName(pvc))
if obj == nil || err != nil {
return false, nil
}
@ -393,13 +376,13 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum
func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) {
for _, pv := range pvs {
env.internalPVCache.delete(pv)
assumecache.DeleteTestObject(env.internalBinder.pvCache.AssumeCache, pv)
}
}
func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) {
for _, pvc := range pvcs {
env.internalPVCCache.delete(pvc)
assumecache.DeleteTestObject(env.internalBinder.pvcCache.AssumeCache, pvc)
}
}

View File

@ -0,0 +1,402 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package assumecache
import (
"errors"
"fmt"
"strconv"
"sync"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
)
// Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon.
type Informer interface {
AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
}
// AddTestObject adds an object to the assume cache.
// Only use this for unit testing!
func AddTestObject(cache *AssumeCache, obj interface{}) {
cache.add(obj)
}
// UpdateTestObject updates an object in the assume cache.
// Only use this for unit testing!
func UpdateTestObject(cache *AssumeCache, obj interface{}) {
cache.update(nil, obj)
}
// DeleteTestObject deletes object in the assume cache.
// Only use this for unit testing!
func DeleteTestObject(cache *AssumeCache, obj interface{}) {
cache.delete(obj)
}
// Sentinel errors that can be checked for with errors.Is.
var (
ErrWrongType = errors.New("object has wrong type")
ErrNotFound = errors.New("object not found")
ErrObjectName = errors.New("cannot determine object name")
)
type WrongTypeError struct {
TypeName string
Object interface{}
}
func (e WrongTypeError) Error() string {
return fmt.Sprintf("could not convert object to type %v: %+v", e.TypeName, e.Object)
}
func (e WrongTypeError) Is(err error) bool {
return err == ErrWrongType
}
type NotFoundError struct {
TypeName string
ObjectKey string
}
func (e NotFoundError) Error() string {
return fmt.Sprintf("could not find %v %q", e.TypeName, e.ObjectKey)
}
func (e NotFoundError) Is(err error) bool {
return err == ErrNotFound
}
type ObjectNameError struct {
DetailedErr error
}
func (e ObjectNameError) Error() string {
return fmt.Sprintf("failed to get object name: %v", e.DetailedErr)
}
func (e ObjectNameError) Is(err error) bool {
return err == ErrObjectName
}
// AssumeCache is a cache on top of the informer that allows for updating
// objects outside of informer events and also restoring the informer
// cache's version of the object. Objects are assumed to be
// Kubernetes API objects that are supported by [meta.Accessor].
//
// Objects can referenced via their key, with [cache.MetaNamespaceKeyFunc]
// as key function.
//
// AssumeCache stores two pointers to represent a single object:
// - The pointer to the informer object.
// - The pointer to the latest object, which could be the same as
// the informer object, or an in-memory object.
//
// An informer update always overrides the latest object pointer.
//
// Assume() only updates the latest object pointer.
// Restore() sets the latest object pointer back to the informer object.
// Get/List() always returns the latest object pointer.
type AssumeCache struct {
// The logger that was chosen when setting up the cache.
// Will be used for all operations.
logger klog.Logger
// Synchronizes updates to store
rwMutex sync.RWMutex
// describes the object stored
description string
// Stores objInfo pointers
store cache.Indexer
// Index function for object
indexFunc cache.IndexFunc
indexName string
}
type objInfo struct {
// name of the object
name string
// Latest version of object could be cached-only or from informer
latestObj interface{}
// Latest object from informer
apiObj interface{}
}
func objInfoKeyFunc(obj interface{}) (string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
return "", &WrongTypeError{TypeName: "objInfo", Object: obj}
}
return objInfo.name, nil
}
func (c *AssumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
return []string{""}, &WrongTypeError{TypeName: "objInfo", Object: obj}
}
return c.indexFunc(objInfo.latestObj)
}
// NewAssumeCache creates an assume cache for general objects.
func NewAssumeCache(logger klog.Logger, informer Informer, description, indexName string, indexFunc cache.IndexFunc) *AssumeCache {
c := &AssumeCache{
logger: logger,
description: description,
indexFunc: indexFunc,
indexName: indexName,
}
indexers := cache.Indexers{}
if indexName != "" && indexFunc != nil {
indexers[indexName] = c.objInfoIndexFunc
}
c.store = cache.NewIndexer(objInfoKeyFunc, indexers)
// Unit tests don't use informers
if informer != nil {
// Cannot fail in practice?! No-one bothers checking the error.
_, _ = informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.add,
UpdateFunc: c.update,
DeleteFunc: c.delete,
},
)
}
return c
}
func (c *AssumeCache) add(obj interface{}) {
if obj == nil {
return
}
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(&ObjectNameError{err}, "Add failed")
return
}
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
if objInfo, _ := c.getObjInfo(name); objInfo != nil {
newVersion, err := c.getObjVersion(name, obj)
if err != nil {
c.logger.Error(err, "Add failed: couldn't get object version")
return
}
storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
if err != nil {
c.logger.Error(err, "Add failed: couldn't get stored object version")
return
}
// Only update object if version is newer.
// This is so we don't override assumed objects due to informer resync.
if newVersion <= storedVersion {
c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion)
return
}
}
objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
if err = c.store.Update(objInfo); err != nil {
c.logger.Info("Error occurred while updating stored object", "err", err)
} else {
c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
}
}
func (c *AssumeCache) update(oldObj interface{}, newObj interface{}) {
c.add(newObj)
}
func (c *AssumeCache) delete(obj interface{}) {
if obj == nil {
return
}
name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(&ObjectNameError{err}, "Failed to delete")
return
}
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
objInfo := &objInfo{name: name}
err = c.store.Delete(objInfo)
if err != nil {
c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name)
}
}
func (c *AssumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
objAccessor, err := meta.Accessor(obj)
if err != nil {
return -1, err
}
objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
if err != nil {
//nolint:errorlint // Intentionally not wrapping the error, the underlying error is an implementation detail.
return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %v", objAccessor.GetResourceVersion(), c.description, name, err)
}
return objResourceVersion, nil
}
func (c *AssumeCache) getObjInfo(key string) (*objInfo, error) {
obj, ok, err := c.store.GetByKey(key)
if err != nil {
return nil, err
}
if !ok {
return nil, &NotFoundError{TypeName: c.description, ObjectKey: key}
}
objInfo, ok := obj.(*objInfo)
if !ok {
return nil, &WrongTypeError{"objInfo", obj}
}
return objInfo, nil
}
// Get the object by its key.
func (c *AssumeCache) Get(key string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
objInfo, err := c.getObjInfo(key)
if err != nil {
return nil, err
}
return objInfo.latestObj, nil
}
// GetAPIObj gets the informer cache's version by its key.
func (c *AssumeCache) GetAPIObj(key string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
objInfo, err := c.getObjInfo(key)
if err != nil {
return nil, err
}
return objInfo.apiObj, nil
}
// List all the objects in the cache.
func (c *AssumeCache) List(indexObj interface{}) []interface{} {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
allObjs := []interface{}{}
var objs []interface{}
if c.indexName != "" {
o, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
if err != nil {
c.logger.Error(err, "List index error")
return nil
}
objs = o
} else {
objs = c.store.List()
}
for _, obj := range objs {
objInfo, ok := obj.(*objInfo)
if !ok {
c.logger.Error(&WrongTypeError{TypeName: "objInfo", Object: obj}, "List error")
continue
}
allObjs = append(allObjs, objInfo.latestObj)
}
return allObjs
}
// Assume updates the object in-memory only.
//
// The version of the object must be greater or equal to
// the current object, otherwise an error is returned.
//
// Storing an object with the same version is supported
// by the assume cache, but suffers from a race: if an
// update is received via the informer while such an
// object is assumed, it gets dropped in favor of the
// newer object from the apiserver.
//
// Only assuming objects that were returned by an apiserver
// operation (Update, Patch) is safe.
func (c *AssumeCache) Assume(obj interface{}) error {
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return &ObjectNameError{err}
}
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
objInfo, err := c.getObjInfo(name)
if err != nil {
return err
}
newVersion, err := c.getObjVersion(name, obj)
if err != nil {
return err
}
storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
if err != nil {
return err
}
if newVersion < storedVersion {
return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
}
// Only update the cached object
objInfo.latestObj = obj
c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion)
return nil
}
// Restore the informer cache's version of the object.
func (c *AssumeCache) Restore(objName string) {
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
objInfo, err := c.getObjInfo(objName)
if err != nil {
// This could be expected if object got deleted
c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err)
} else {
objInfo.latestObj = objInfo.apiObj
c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
}
}

View File

@ -0,0 +1,327 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package assumecache
import (
"fmt"
"slices"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/test/utils/ktesting"
)
// testInformer implements [Informer] and can be used to feed changes into an assume
// cache during unit testing. Only a single event handler is supported, which is
// sufficient for one assume cache.
type testInformer struct {
handler cache.ResourceEventHandler
}
func (i *testInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
i.handler = handler
return nil, nil
}
func (i *testInformer) add(obj interface{}) {
if i.handler == nil {
return
}
i.handler.OnAdd(obj, false)
}
func (i *testInformer) update(obj interface{}) {
if i.handler == nil {
return
}
i.handler.OnUpdate(nil, obj)
}
func (i *testInformer) delete(obj interface{}) {
if i.handler == nil {
return
}
i.handler.OnDelete(obj)
}
func makeObj(name, version, namespace string) metav1.Object {
return &metav1.ObjectMeta{
Name: name,
Namespace: namespace,
ResourceVersion: version,
}
}
func newTest(t *testing.T) (ktesting.TContext, *AssumeCache, *testInformer) {
return newTestWithIndexer(t, "", nil)
}
func newTestWithIndexer(t *testing.T, indexName string, indexFunc cache.IndexFunc) (ktesting.TContext, *AssumeCache, *testInformer) {
tCtx := ktesting.Init(t)
informer := new(testInformer)
cache := NewAssumeCache(tCtx.Logger(), informer, "TestObject", indexName, indexFunc)
return tCtx, cache, informer
}
func verify(tCtx ktesting.TContext, cache *AssumeCache, key string, expectedObject, expectedAPIObject interface{}) {
tCtx.Helper()
actualObject, err := cache.Get(key)
if err != nil {
tCtx.Fatalf("unexpected error retrieving object for key %s: %v", key, err)
}
if actualObject != expectedObject {
tCtx.Fatalf("Get() returned %v, expected %v", actualObject, expectedObject)
}
actualAPIObject, err := cache.GetAPIObj(key)
if err != nil {
tCtx.Fatalf("unexpected error retrieving API object for key %s: %v", key, err)
}
if actualAPIObject != expectedAPIObject {
tCtx.Fatalf("GetAPIObject() returned %v, expected %v", actualAPIObject, expectedAPIObject)
}
}
func verifyList(tCtx ktesting.TContext, assumeCache *AssumeCache, expectedObjs []interface{}, indexObj interface{}) {
actualObjs := assumeCache.List(indexObj)
diff := cmp.Diff(expectedObjs, actualObjs, cmpopts.SortSlices(func(x, y interface{}) bool {
xKey, err := cache.MetaNamespaceKeyFunc(x)
if err != nil {
tCtx.Fatalf("unexpected error determining key for %v: %v", x, err)
}
yKey, err := cache.MetaNamespaceKeyFunc(y)
if err != nil {
tCtx.Fatalf("unexpected error determining key for %v: %v", y, err)
}
return xKey < yKey
}))
if diff != "" {
tCtx.Fatalf("List() result differs (- expected, + actual):\n%s", diff)
}
}
func TestAssume(t *testing.T) {
scenarios := map[string]struct {
oldObj metav1.Object
newObj interface{}
expectErr error
}{
"success-same-version": {
oldObj: makeObj("pvc1", "5", ""),
newObj: makeObj("pvc1", "5", ""),
},
"success-new-higher-version": {
oldObj: makeObj("pvc1", "5", ""),
newObj: makeObj("pvc1", "6", ""),
},
"fail-old-not-found": {
oldObj: makeObj("pvc2", "5", ""),
newObj: makeObj("pvc1", "5", ""),
expectErr: ErrNotFound,
},
"fail-new-lower-version": {
oldObj: makeObj("pvc1", "5", ""),
newObj: makeObj("pvc1", "4", ""),
expectErr: cmpopts.AnyError,
},
"fail-new-bad-version": {
oldObj: makeObj("pvc1", "5", ""),
newObj: makeObj("pvc1", "a", ""),
expectErr: cmpopts.AnyError,
},
"fail-old-bad-version": {
oldObj: makeObj("pvc1", "a", ""),
newObj: makeObj("pvc1", "5", ""),
expectErr: cmpopts.AnyError,
},
"fail-new-bad-object": {
oldObj: makeObj("pvc1", "5", ""),
newObj: 1,
expectErr: ErrObjectName,
},
}
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
tCtx, cache, informer := newTest(t)
// Add old object to cache.
informer.add(scenario.oldObj)
verify(tCtx, cache, scenario.oldObj.GetName(), scenario.oldObj, scenario.oldObj)
// Assume new object.
err := cache.Assume(scenario.newObj)
if diff := cmp.Diff(scenario.expectErr, err, cmpopts.EquateErrors()); diff != "" {
t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff)
}
// Check that Get returns correct object.
expectedObj := scenario.newObj
if scenario.expectErr != nil {
expectedObj = scenario.oldObj
}
verify(tCtx, cache, scenario.oldObj.GetName(), expectedObj, scenario.oldObj)
})
}
}
func TestRestore(t *testing.T) {
tCtx, cache, informer := newTest(t)
// This test assumes an object with the same version as the API object.
// The assume cache supports that, but doing so in real code suffers from
// a race: if an unrelated update is received from the apiserver while
// such an object is assumed, the local modification gets dropped.
oldObj := makeObj("pvc1", "5", "")
newObj := makeObj("pvc1", "5", "")
// Restore object that doesn't exist
cache.Restore("nothing")
// Add old object to cache.
informer.add(oldObj)
verify(ktesting.WithStep(tCtx, "after initial update"), cache, oldObj.GetName(), oldObj, oldObj)
// Restore object.
cache.Restore(oldObj.GetName())
verify(ktesting.WithStep(tCtx, "after initial Restore"), cache, oldObj.GetName(), oldObj, oldObj)
// Assume new object.
if err := cache.Assume(newObj); err != nil {
t.Fatalf("Assume() returned error %v", err)
}
verify(ktesting.WithStep(tCtx, "after Assume"), cache, oldObj.GetName(), newObj, oldObj)
// Restore object.
cache.Restore(oldObj.GetName())
verify(ktesting.WithStep(tCtx, "after second Restore"), cache, oldObj.GetName(), oldObj, oldObj)
}
func TestEvents(t *testing.T) {
tCtx, cache, informer := newTest(t)
oldObj := makeObj("pvc1", "5", "")
newObj := makeObj("pvc1", "6", "")
key := oldObj.GetName()
// Add old object to cache.
informer.add(oldObj)
verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, oldObj, oldObj)
// Update object.
informer.update(newObj)
verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, newObj, newObj)
// Some error cases (don't occur in practice).
informer.add(1)
verify(ktesting.WithStep(tCtx, "after nop add"), cache, key, newObj, newObj)
informer.add(nil)
verify(ktesting.WithStep(tCtx, "after nil add"), cache, key, newObj, newObj)
informer.update(oldObj)
verify(ktesting.WithStep(tCtx, "after nop update"), cache, key, newObj, newObj)
informer.update(nil)
verify(ktesting.WithStep(tCtx, "after nil update"), cache, key, newObj, newObj)
informer.delete(nil)
verify(ktesting.WithStep(tCtx, "after nop delete"), cache, key, newObj, newObj)
// Delete object.
informer.delete(oldObj)
_, err := cache.Get(key)
if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" {
t.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff)
}
}
func TestListNoIndexer(t *testing.T) {
tCtx, cache, informer := newTest(t)
// Add a bunch of objects.
objs := make([]interface{}, 0, 10)
for i := 0; i < 10; i++ {
obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")
objs = append(objs, obj)
informer.add(obj)
}
// List them
verifyList(ktesting.WithStep(tCtx, "after add"), cache, objs, "")
// Update an object.
updatedObj := makeObj("test-pvc3", "2", "")
objs[3] = updatedObj
informer.update(updatedObj)
// List them
verifyList(ktesting.WithStep(tCtx, "after update"), cache, objs, "")
// Delete a PV
deletedObj := objs[7]
objs = slices.Delete(objs, 7, 8)
informer.delete(deletedObj)
// List them
verifyList(ktesting.WithStep(tCtx, "after delete"), cache, objs, "")
}
func TestListWithIndexer(t *testing.T) {
namespaceIndexer := func(obj interface{}) ([]string, error) {
objAccessor, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
return []string{objAccessor.GetNamespace()}, nil
}
tCtx, cache, informer := newTestWithIndexer(t, "myNamespace", namespaceIndexer)
// Add a bunch of objects.
ns := "ns1"
objs := make([]interface{}, 0, 10)
for i := 0; i < 10; i++ {
obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns)
objs = append(objs, obj)
informer.add(obj)
}
// Add a bunch of other objects.
for i := 0; i < 10; i++ {
obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "ns2")
informer.add(obj)
}
// List them
verifyList(ktesting.WithStep(tCtx, "after add"), cache, objs, objs[0])
// Update an object.
updatedObj := makeObj("test-pvc3", "2", ns)
objs[3] = updatedObj
informer.update(updatedObj)
// List them
verifyList(ktesting.WithStep(tCtx, "after update"), cache, objs, objs[0])
// Delete a PV
deletedObj := objs[7]
objs = slices.Delete(objs, 7, 8)
informer.delete(deletedObj)
// List them
verifyList(ktesting.WithStep(tCtx, "after delete"), cache, objs, objs[0])
}