scheduler: remove AssumeCache interface

There's no reason for having the interface because there is only one
implementation. Makes the implementation of the test functions a bit
simpler (no casting). They are still stand-alone functions instead of methods
because they should not be considered part of the "normal" API.
This commit is contained in:
Patrick Ohly 2024-04-25 11:36:17 +02:00
parent 26e0409c36
commit 7f54c5dfec
7 changed files with 88 additions and 123 deletions

View File

@ -302,7 +302,7 @@ type dynamicResources struct {
// When implementing cluster autoscaler support, this assume cache or // When implementing cluster autoscaler support, this assume cache or
// something like it (see https://github.com/kubernetes/kubernetes/pull/112202) // something like it (see https://github.com/kubernetes/kubernetes/pull/112202)
// might have to be managed by the cluster autoscaler. // might have to be managed by the cluster autoscaler.
claimAssumeCache assumecache.AssumeCache claimAssumeCache *assumecache.AssumeCache
// inFlightAllocations is map from claim UUIDs to claim objects for those claims // inFlightAllocations is map from claim UUIDs to claim objects for those claims
// for which allocation was triggered during a scheduling cycle and the // for which allocation was triggered during a scheduling cycle and the

View File

@ -27,16 +27,8 @@ import (
) )
// PVAssumeCache is a AssumeCache for PersistentVolume objects // PVAssumeCache is a AssumeCache for PersistentVolume objects
type PVAssumeCache interface { type PVAssumeCache struct {
assumecache.AssumeCache *assumecache.AssumeCache
GetPV(pvName string) (*v1.PersistentVolume, error)
GetAPIPV(pvName string) (*v1.PersistentVolume, error)
ListPVs(storageClassName string) []*v1.PersistentVolume
}
type pvAssumeCache struct {
assumecache.AssumeCache
logger klog.Logger logger klog.Logger
} }
@ -48,15 +40,15 @@ func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
} }
// NewPVAssumeCache creates a PV assume cache. // NewPVAssumeCache creates a PV assume cache.
func NewPVAssumeCache(logger klog.Logger, informer assumecache.Informer) PVAssumeCache { func NewPVAssumeCache(logger klog.Logger, informer assumecache.Informer) *PVAssumeCache {
logger = klog.LoggerWithName(logger, "PV Cache") logger = klog.LoggerWithName(logger, "PV Cache")
return &pvAssumeCache{ return &PVAssumeCache{
AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc), AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc),
logger: logger, logger: logger,
} }
} }
func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { func (c *PVAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
obj, err := c.Get(pvName) obj, err := c.Get(pvName)
if err != nil { if err != nil {
return nil, err return nil, err
@ -69,7 +61,7 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
return pv, nil return pv, nil
} }
func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { func (c *PVAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
obj, err := c.GetAPIObj(pvName) obj, err := c.GetAPIObj(pvName)
if err != nil { if err != nil {
return nil, err return nil, err
@ -81,7 +73,7 @@ func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
return pv, nil return pv, nil
} }
func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { func (c *PVAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
objs := c.List(&v1.PersistentVolume{ objs := c.List(&v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{ Spec: v1.PersistentVolumeSpec{
StorageClassName: storageClassName, StorageClassName: storageClassName,
@ -100,30 +92,21 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume
} }
// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects // PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects
type PVCAssumeCache interface { type PVCAssumeCache struct {
assumecache.AssumeCache *assumecache.AssumeCache
// GetPVC returns the PVC from the cache with given pvcKey.
// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}
type pvcAssumeCache struct {
assumecache.AssumeCache
logger klog.Logger logger klog.Logger
} }
// NewPVCAssumeCache creates a PVC assume cache. // NewPVCAssumeCache creates a PVC assume cache.
func NewPVCAssumeCache(logger klog.Logger, informer assumecache.Informer) PVCAssumeCache { func NewPVCAssumeCache(logger klog.Logger, informer assumecache.Informer) *PVCAssumeCache {
logger = klog.LoggerWithName(logger, "PVC Cache") logger = klog.LoggerWithName(logger, "PVC Cache")
return &pvcAssumeCache{ return &PVCAssumeCache{
AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil), AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil),
logger: logger, logger: logger,
} }
} }
func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { func (c *PVCAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.Get(pvcKey) obj, err := c.Get(pvcKey)
if err != nil { if err != nil {
return nil, err return nil, err
@ -136,7 +119,7 @@ func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error
return pvc, nil return pvc, nil
} }
func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { func (c *PVCAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.GetAPIObj(pvcKey) obj, err := c.GetAPIObj(pvcKey)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -27,7 +27,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/pkg/scheduler/util/assumecache"
) )
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { func verifyListPVs(t *testing.T, cache *PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
pvList := cache.ListPVs(storageClassName) pvList := cache.ListPVs(storageClassName)
if len(pvList) != len(expectedPVs) { if len(pvList) != len(expectedPVs) {
t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs)) t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs))
@ -43,7 +43,7 @@ func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1
} }
} }
func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { func verifyPV(cache *PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
pv, err := cache.GetPV(name) pv, err := cache.GetPV(name)
if err != nil { if err != nil {
return err return err
@ -102,7 +102,7 @@ func TestAssumePV(t *testing.T) {
cache := NewPVAssumeCache(logger, nil) cache := NewPVAssumeCache(logger, nil)
// Add oldPV to cache // Add oldPV to cache
assumecache.AddTestObject(cache, scenario.oldPV) assumecache.AddTestObject(cache.AssumeCache, scenario.oldPV)
if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil { if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
t.Errorf("Failed to GetPV() after initial update: %v", err) t.Errorf("Failed to GetPV() after initial update: %v", err)
continue continue
@ -139,7 +139,7 @@ func TestRestorePV(t *testing.T) {
cache.Restore("nothing") cache.Restore("nothing")
// Add oldPV to cache // Add oldPV to cache
assumecache.AddTestObject(cache, oldPV) assumecache.AddTestObject(cache.AssumeCache, oldPV)
if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
t.Fatalf("Failed to GetPV() after initial update: %v", err) t.Fatalf("Failed to GetPV() after initial update: %v", err)
} }
@ -183,7 +183,7 @@ func TestBasicPVCache(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume
pvs[pv.Name] = pv pvs[pv.Name] = pv
assumecache.AddTestObject(cache, pv) assumecache.AddTestObject(cache.AssumeCache, pv)
} }
// List them // List them
@ -192,7 +192,7 @@ func TestBasicPVCache(t *testing.T) {
// Update a PV // Update a PV
updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume
pvs[updatedPV.Name] = updatedPV pvs[updatedPV.Name] = updatedPV
assumecache.UpdateTestObject(cache, updatedPV) assumecache.UpdateTestObject(cache.AssumeCache, updatedPV)
// List them // List them
verifyListPVs(t, cache, pvs, "") verifyListPVs(t, cache, pvs, "")
@ -200,7 +200,7 @@ func TestBasicPVCache(t *testing.T) {
// Delete a PV // Delete a PV
deletedPV := pvs["test-pv7"] deletedPV := pvs["test-pv7"]
delete(pvs, deletedPV.Name) delete(pvs, deletedPV.Name)
assumecache.DeleteTestObject(cache, deletedPV) assumecache.DeleteTestObject(cache.AssumeCache, deletedPV)
// List them // List them
verifyListPVs(t, cache, pvs, "") verifyListPVs(t, cache, pvs, "")
@ -215,7 +215,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume
pvs1[pv.Name] = pv pvs1[pv.Name] = pv
assumecache.AddTestObject(cache, pv) assumecache.AddTestObject(cache.AssumeCache, pv)
} }
// Add a bunch of PVs // Add a bunch of PVs
@ -223,7 +223,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume
pvs2[pv.Name] = pv pvs2[pv.Name] = pv
assumecache.AddTestObject(cache, pv) assumecache.AddTestObject(cache.AssumeCache, pv)
} }
// List them // List them
@ -233,7 +233,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
// Update a PV // Update a PV
updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume
pvs1[updatedPV.Name] = updatedPV pvs1[updatedPV.Name] = updatedPV
assumecache.UpdateTestObject(cache, updatedPV) assumecache.UpdateTestObject(cache.AssumeCache, updatedPV)
// List them // List them
verifyListPVs(t, cache, pvs1, "class1") verifyListPVs(t, cache, pvs1, "class1")
@ -242,7 +242,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
// Delete a PV // Delete a PV
deletedPV := pvs1["test-pv7"] deletedPV := pvs1["test-pv7"]
delete(pvs1, deletedPV.Name) delete(pvs1, deletedPV.Name)
assumecache.DeleteTestObject(cache, deletedPV) assumecache.DeleteTestObject(cache.AssumeCache, deletedPV)
// List them // List them
verifyListPVs(t, cache, pvs1, "class1") verifyListPVs(t, cache, pvs1, "class1")
@ -257,7 +257,7 @@ func TestAssumeUpdatePVCache(t *testing.T) {
// Add a PV // Add a PV
pv := makePV(pvName, "").withVersion("1").PersistentVolume pv := makePV(pvName, "").withVersion("1").PersistentVolume
assumecache.AddTestObject(cache, pv) assumecache.AddTestObject(cache.AssumeCache, pv)
if err := verifyPV(cache, pvName, pv); err != nil { if err := verifyPV(cache, pvName, pv); err != nil {
t.Fatalf("failed to get PV: %v", err) t.Fatalf("failed to get PV: %v", err)
} }
@ -273,7 +273,7 @@ func TestAssumeUpdatePVCache(t *testing.T) {
} }
// Add old PV // Add old PV
assumecache.AddTestObject(cache, pv) assumecache.AddTestObject(cache.AssumeCache, pv)
if err := verifyPV(cache, pvName, newPV); err != nil { if err := verifyPV(cache, pvName, newPV); err != nil {
t.Fatalf("failed to get PV after old PV added: %v", err) t.Fatalf("failed to get PV after old PV added: %v", err)
} }
@ -290,7 +290,7 @@ func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim {
} }
} }
func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error { func verifyPVC(cache *PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error {
pvc, err := cache.GetPVC(pvcKey) pvc, err := cache.GetPVC(pvcKey)
if err != nil { if err != nil {
return err return err
@ -344,7 +344,7 @@ func TestAssumePVC(t *testing.T) {
cache := NewPVCAssumeCache(logger, nil) cache := NewPVCAssumeCache(logger, nil)
// Add oldPVC to cache // Add oldPVC to cache
assumecache.AddTestObject(cache, scenario.oldPVC) assumecache.AddTestObject(cache.AssumeCache, scenario.oldPVC)
if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil { if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil {
t.Errorf("Failed to GetPVC() after initial update: %v", err) t.Errorf("Failed to GetPVC() after initial update: %v", err)
continue continue
@ -381,7 +381,7 @@ func TestRestorePVC(t *testing.T) {
cache.Restore("nothing") cache.Restore("nothing")
// Add oldPVC to cache // Add oldPVC to cache
assumecache.AddTestObject(cache, oldPVC) assumecache.AddTestObject(cache.AssumeCache, oldPVC)
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after initial update: %v", err) t.Fatalf("Failed to GetPVC() after initial update: %v", err)
} }
@ -416,7 +416,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) {
// Add a PVC // Add a PVC
pvc := makeClaim(pvcName, "1", pvcNamespace) pvc := makeClaim(pvcName, "1", pvcNamespace)
assumecache.AddTestObject(cache, pvc) assumecache.AddTestObject(cache.AssumeCache, pvc)
if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil { if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil {
t.Fatalf("failed to get PVC: %v", err) t.Fatalf("failed to get PVC: %v", err)
} }
@ -432,7 +432,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) {
} }
// Add old PVC // Add old PVC
assumecache.AddTestObject(cache, pvc) assumecache.AddTestObject(cache.AssumeCache, pvc)
if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
t.Fatalf("failed to get PVC after old PVC added: %v", err) t.Fatalf("failed to get PVC after old PVC added: %v", err)
} }

View File

@ -220,8 +220,8 @@ type volumeBinder struct {
nodeLister corelisters.NodeLister nodeLister corelisters.NodeLister
csiNodeLister storagelisters.CSINodeLister csiNodeLister storagelisters.CSINodeLister
pvcCache PVCAssumeCache pvcCache *PVCAssumeCache
pvCache PVAssumeCache pvCache *PVAssumeCache
// Amount of time to wait for the bind operation to succeed // Amount of time to wait for the bind operation to succeed
bindTimeout time.Duration bindTimeout time.Duration

View File

@ -299,7 +299,7 @@ func (env *testEnv) addCSIStorageCapacities(capacities []*storagev1.CSIStorageCa
func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) { func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) {
for _, pvc := range cachedPVCs { for _, pvc := range cachedPVCs {
assumecache.AddTestObject(env.internalBinder.pvcCache, pvc) assumecache.AddTestObject(env.internalBinder.pvcCache.AssumeCache, pvc)
if apiPVCs == nil { if apiPVCs == nil {
env.reactor.AddClaim(pvc) env.reactor.AddClaim(pvc)
} }
@ -311,7 +311,7 @@ func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs [
func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.PersistentVolume) { func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.PersistentVolume) {
for _, pv := range cachedPVs { for _, pv := range cachedPVs {
assumecache.AddTestObject(env.internalBinder.pvCache, pv) assumecache.AddTestObject(env.internalBinder.pvCache.AssumeCache, pv)
if apiPVs == nil { if apiPVs == nil {
env.reactor.AddVolume(pv) env.reactor.AddVolume(pv)
} }
@ -376,13 +376,13 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum
func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) { func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) {
for _, pv := range pvs { for _, pv := range pvs {
assumecache.DeleteTestObject(env.internalBinder.pvCache, pv) assumecache.DeleteTestObject(env.internalBinder.pvCache.AssumeCache, pv)
} }
} }
func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) { func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) {
for _, pvc := range pvcs { for _, pvc := range pvcs {
assumecache.DeleteTestObject(env.internalBinder.pvcCache, pvc) assumecache.DeleteTestObject(env.internalBinder.pvcCache.AssumeCache, pvc)
} }
} }

View File

@ -28,45 +28,6 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
) )
// AssumeCache is a cache on top of the informer that allows for updating
// objects outside of informer events and also restoring the informer
// cache's version of the object. Objects are assumed to be
// Kubernetes API objects that are supported by [meta.Accessor].
//
// Objects can referenced via their key, with [cache.MetaNamespaceKeyFunc]
// as key function.
type AssumeCache interface {
// Assume updates the object in-memory only.
//
// The version of the object must be greater or equal to
// the current object, otherwise an error is returned.
//
// Storing an object with the same version is supported
// by the assume cache, but suffers from a race: if an
// update is received via the informer while such an
// object is assumed, it gets dropped in favor of the
// newer object from the apiserver.
//
// Only assuming objects that were returned by an apiserver
// operation (Update, Patch) is safe.
Assume(obj interface{}) error
// Restore the informer cache's version of the object.
Restore(key string)
// Get the object by its key.
Get(key string) (interface{}, error)
// GetAPIObj gets the informer cache's version by its key.
GetAPIObj(key string) (interface{}, error)
// List all the objects in the cache.
List(indexObj interface{}) []interface{}
// getImplementation is used internally by [AddTestObject], [UpdateTestObject], [DeleteTestObject].
getImplementation() *assumeCache
}
// Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon. // Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon.
type Informer interface { type Informer interface {
AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
@ -74,20 +35,20 @@ type Informer interface {
// AddTestObject adds an object to the assume cache. // AddTestObject adds an object to the assume cache.
// Only use this for unit testing! // Only use this for unit testing!
func AddTestObject(cache AssumeCache, obj interface{}) { func AddTestObject(cache *AssumeCache, obj interface{}) {
cache.getImplementation().add(obj) cache.add(obj)
} }
// UpdateTestObject updates an object in the assume cache. // UpdateTestObject updates an object in the assume cache.
// Only use this for unit testing! // Only use this for unit testing!
func UpdateTestObject(cache AssumeCache, obj interface{}) { func UpdateTestObject(cache *AssumeCache, obj interface{}) {
cache.getImplementation().update(nil, obj) cache.update(nil, obj)
} }
// DeleteTestObject deletes object in the assume cache. // DeleteTestObject deletes object in the assume cache.
// Only use this for unit testing! // Only use this for unit testing!
func DeleteTestObject(cache AssumeCache, obj interface{}) { func DeleteTestObject(cache *AssumeCache, obj interface{}) {
cache.getImplementation().delete(obj) cache.delete(obj)
} }
// Sentinel errors that can be checked for with errors.Is. // Sentinel errors that can be checked for with errors.Is.
@ -135,7 +96,15 @@ func (e ObjectNameError) Is(err error) bool {
return err == ErrObjectName return err == ErrObjectName
} }
// assumeCache stores two pointers to represent a single object: // AssumeCache is a cache on top of the informer that allows for updating
// objects outside of informer events and also restoring the informer
// cache's version of the object. Objects are assumed to be
// Kubernetes API objects that are supported by [meta.Accessor].
//
// Objects can referenced via their key, with [cache.MetaNamespaceKeyFunc]
// as key function.
//
// AssumeCache stores two pointers to represent a single object:
// - The pointer to the informer object. // - The pointer to the informer object.
// - The pointer to the latest object, which could be the same as // - The pointer to the latest object, which could be the same as
// the informer object, or an in-memory object. // the informer object, or an in-memory object.
@ -145,7 +114,7 @@ func (e ObjectNameError) Is(err error) bool {
// Assume() only updates the latest object pointer. // Assume() only updates the latest object pointer.
// Restore() sets the latest object pointer back to the informer object. // Restore() sets the latest object pointer back to the informer object.
// Get/List() always returns the latest object pointer. // Get/List() always returns the latest object pointer.
type assumeCache struct { type AssumeCache struct {
// The logger that was chosen when setting up the cache. // The logger that was chosen when setting up the cache.
// Will be used for all operations. // Will be used for all operations.
logger klog.Logger logger klog.Logger
@ -183,7 +152,7 @@ func objInfoKeyFunc(obj interface{}) (string, error) {
return objInfo.name, nil return objInfo.name, nil
} }
func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { func (c *AssumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
objInfo, ok := obj.(*objInfo) objInfo, ok := obj.(*objInfo)
if !ok { if !ok {
return []string{""}, &WrongTypeError{TypeName: "objInfo", Object: obj} return []string{""}, &WrongTypeError{TypeName: "objInfo", Object: obj}
@ -192,8 +161,8 @@ func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
} }
// NewAssumeCache creates an assume cache for general objects. // NewAssumeCache creates an assume cache for general objects.
func NewAssumeCache(logger klog.Logger, informer Informer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache { func NewAssumeCache(logger klog.Logger, informer Informer, description, indexName string, indexFunc cache.IndexFunc) *AssumeCache {
c := &assumeCache{ c := &AssumeCache{
logger: logger, logger: logger,
description: description, description: description,
indexFunc: indexFunc, indexFunc: indexFunc,
@ -219,11 +188,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam
return c return c
} }
func (c *assumeCache) getImplementation() *assumeCache { func (c *AssumeCache) add(obj interface{}) {
return c
}
func (c *assumeCache) add(obj interface{}) {
if obj == nil { if obj == nil {
return return
} }
@ -266,11 +231,11 @@ func (c *assumeCache) add(obj interface{}) {
} }
} }
func (c *assumeCache) update(oldObj interface{}, newObj interface{}) { func (c *AssumeCache) update(oldObj interface{}, newObj interface{}) {
c.add(newObj) c.add(newObj)
} }
func (c *assumeCache) delete(obj interface{}) { func (c *AssumeCache) delete(obj interface{}) {
if obj == nil { if obj == nil {
return return
} }
@ -291,7 +256,7 @@ func (c *assumeCache) delete(obj interface{}) {
} }
} }
func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) { func (c *AssumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
objAccessor, err := meta.Accessor(obj) objAccessor, err := meta.Accessor(obj)
if err != nil { if err != nil {
return -1, err return -1, err
@ -305,7 +270,7 @@ func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error)
return objResourceVersion, nil return objResourceVersion, nil
} }
func (c *assumeCache) getObjInfo(key string) (*objInfo, error) { func (c *AssumeCache) getObjInfo(key string) (*objInfo, error) {
obj, ok, err := c.store.GetByKey(key) obj, ok, err := c.store.GetByKey(key)
if err != nil { if err != nil {
return nil, err return nil, err
@ -321,7 +286,8 @@ func (c *assumeCache) getObjInfo(key string) (*objInfo, error) {
return objInfo, nil return objInfo, nil
} }
func (c *assumeCache) Get(key string) (interface{}, error) { // Get the object by its key.
func (c *AssumeCache) Get(key string) (interface{}, error) {
c.rwMutex.RLock() c.rwMutex.RLock()
defer c.rwMutex.RUnlock() defer c.rwMutex.RUnlock()
@ -332,7 +298,8 @@ func (c *assumeCache) Get(key string) (interface{}, error) {
return objInfo.latestObj, nil return objInfo.latestObj, nil
} }
func (c *assumeCache) GetAPIObj(key string) (interface{}, error) { // GetAPIObj gets the informer cache's version by its key.
func (c *AssumeCache) GetAPIObj(key string) (interface{}, error) {
c.rwMutex.RLock() c.rwMutex.RLock()
defer c.rwMutex.RUnlock() defer c.rwMutex.RUnlock()
@ -343,7 +310,8 @@ func (c *assumeCache) GetAPIObj(key string) (interface{}, error) {
return objInfo.apiObj, nil return objInfo.apiObj, nil
} }
func (c *assumeCache) List(indexObj interface{}) []interface{} { // List all the objects in the cache.
func (c *AssumeCache) List(indexObj interface{}) []interface{} {
c.rwMutex.RLock() c.rwMutex.RLock()
defer c.rwMutex.RUnlock() defer c.rwMutex.RUnlock()
@ -371,7 +339,20 @@ func (c *assumeCache) List(indexObj interface{}) []interface{} {
return allObjs return allObjs
} }
func (c *assumeCache) Assume(obj interface{}) error { // Assume updates the object in-memory only.
//
// The version of the object must be greater or equal to
// the current object, otherwise an error is returned.
//
// Storing an object with the same version is supported
// by the assume cache, but suffers from a race: if an
// update is received via the informer while such an
// object is assumed, it gets dropped in favor of the
// newer object from the apiserver.
//
// Only assuming objects that were returned by an apiserver
// operation (Update, Patch) is safe.
func (c *AssumeCache) Assume(obj interface{}) error {
name, err := cache.MetaNamespaceKeyFunc(obj) name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil { if err != nil {
return &ObjectNameError{err} return &ObjectNameError{err}
@ -405,7 +386,8 @@ func (c *assumeCache) Assume(obj interface{}) error {
return nil return nil
} }
func (c *assumeCache) Restore(objName string) { // Restore the informer cache's version of the object.
func (c *AssumeCache) Restore(objName string) {
c.rwMutex.Lock() c.rwMutex.Lock()
defer c.rwMutex.Unlock() defer c.rwMutex.Unlock()

View File

@ -71,18 +71,18 @@ func makeObj(name, version, namespace string) metav1.Object {
} }
} }
func newTest(t *testing.T) (ktesting.TContext, AssumeCache, *testInformer) { func newTest(t *testing.T) (ktesting.TContext, *AssumeCache, *testInformer) {
return newTestWithIndexer(t, "", nil) return newTestWithIndexer(t, "", nil)
} }
func newTestWithIndexer(t *testing.T, indexName string, indexFunc cache.IndexFunc) (ktesting.TContext, AssumeCache, *testInformer) { func newTestWithIndexer(t *testing.T, indexName string, indexFunc cache.IndexFunc) (ktesting.TContext, *AssumeCache, *testInformer) {
tCtx := ktesting.Init(t) tCtx := ktesting.Init(t)
informer := new(testInformer) informer := new(testInformer)
cache := NewAssumeCache(tCtx.Logger(), informer, "TestObject", indexName, indexFunc) cache := NewAssumeCache(tCtx.Logger(), informer, "TestObject", indexName, indexFunc)
return tCtx, cache, informer return tCtx, cache, informer
} }
func verify(tCtx ktesting.TContext, cache AssumeCache, key string, expectedObject, expectedAPIObject interface{}) { func verify(tCtx ktesting.TContext, cache *AssumeCache, key string, expectedObject, expectedAPIObject interface{}) {
tCtx.Helper() tCtx.Helper()
actualObject, err := cache.Get(key) actualObject, err := cache.Get(key)
if err != nil { if err != nil {
@ -100,7 +100,7 @@ func verify(tCtx ktesting.TContext, cache AssumeCache, key string, expectedObjec
} }
} }
func verifyList(tCtx ktesting.TContext, assumeCache AssumeCache, expectedObjs []interface{}, indexObj interface{}) { func verifyList(tCtx ktesting.TContext, assumeCache *AssumeCache, expectedObjs []interface{}, indexObj interface{}) {
actualObjs := assumeCache.List(indexObj) actualObjs := assumeCache.List(indexObj)
diff := cmp.Diff(expectedObjs, actualObjs, cmpopts.SortSlices(func(x, y interface{}) bool { diff := cmp.Diff(expectedObjs, actualObjs, cmpopts.SortSlices(func(x, y interface{}) bool {
xKey, err := cache.MetaNamespaceKeyFunc(x) xKey, err := cache.MetaNamespaceKeyFunc(x)