scheduler: move assume cache to utils, part 2

This is now used by both the volumebinding and dynamicresources plugin, so
promoting it to a common helper package is better.

In terms of functionality, nothing was changed. Documentation got
updated (warns about storing locally modified objects, clarifies what the Get
parameters are). Code coverage should be a bit better than before (tested with
and without indexer, exercises event handlers, more error paths).

Checking for specific errors can now be done via errors.Is.
This commit is contained in:
Patrick Ohly
2024-03-28 20:28:29 +01:00
parent 910b90fca3
commit 26e0409c36
7 changed files with 519 additions and 616 deletions

View File

@@ -46,8 +46,8 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/utils/ptr"
)
@@ -302,7 +302,7 @@ type dynamicResources struct {
// When implementing cluster autoscaler support, this assume cache or
// something like it (see https://github.com/kubernetes/kubernetes/pull/112202)
// might have to be managed by the cluster autoscaler.
claimAssumeCache volumebinding.AssumeCache
claimAssumeCache assumecache.AssumeCache
// inFlightAllocations is map from claim UUIDs to claim objects for those claims
// for which allocation was triggered during a scheduling cycle and the
@@ -355,7 +355,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()),
claimAssumeCache: volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
}
return pl, nil

View File

@@ -0,0 +1,149 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumebinding
import (
"fmt"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
storagehelpers "k8s.io/component-helpers/storage/volume"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
// PVAssumeCache is a AssumeCache for PersistentVolume objects
type PVAssumeCache interface {
assumecache.AssumeCache
GetPV(pvName string) (*v1.PersistentVolume, error)
GetAPIPV(pvName string) (*v1.PersistentVolume, error)
ListPVs(storageClassName string) []*v1.PersistentVolume
}
type pvAssumeCache struct {
assumecache.AssumeCache
logger klog.Logger
}
func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
if pv, ok := obj.(*v1.PersistentVolume); ok {
return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil
}
return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
}
// NewPVAssumeCache creates a PV assume cache.
func NewPVAssumeCache(logger klog.Logger, informer assumecache.Informer) PVAssumeCache {
logger = klog.LoggerWithName(logger, "PV Cache")
return &pvAssumeCache{
AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc),
logger: logger,
}
}
func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
obj, err := c.Get(pvName)
if err != nil {
return nil, err
}
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj}
}
return pv, nil
}
func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
obj, err := c.GetAPIObj(pvName)
if err != nil {
return nil, err
}
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj}
}
return pv, nil
}
func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
objs := c.List(&v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
StorageClassName: storageClassName,
},
})
pvs := []*v1.PersistentVolume{}
for _, obj := range objs {
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
c.logger.Error(&assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj}, "ListPVs")
continue
}
pvs = append(pvs, pv)
}
return pvs
}
// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects
type PVCAssumeCache interface {
assumecache.AssumeCache
// GetPVC returns the PVC from the cache with given pvcKey.
// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}
type pvcAssumeCache struct {
assumecache.AssumeCache
logger klog.Logger
}
// NewPVCAssumeCache creates a PVC assume cache.
func NewPVCAssumeCache(logger klog.Logger, informer assumecache.Informer) PVCAssumeCache {
logger = klog.LoggerWithName(logger, "PVC Cache")
return &pvcAssumeCache{
AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil),
logger: logger,
}
}
func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.Get(pvcKey)
if err != nil {
return nil, err
}
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolumeClaim", Object: obj}
}
return pvc, nil
}
func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.GetAPIObj(pvcKey)
if err != nil {
return nil, err
}
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolumeClaim", Object: obj}
}
return pvc, nil
}

View File

@@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
@@ -99,13 +100,9 @@ func TestAssumePV(t *testing.T) {
for name, scenario := range scenarios {
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Add oldPV to cache
internalCache.add(scenario.oldPV)
assumecache.AddTestObject(cache, scenario.oldPV)
if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
t.Errorf("Failed to GetPV() after initial update: %v", err)
continue
@@ -134,10 +131,6 @@ func TestAssumePV(t *testing.T) {
func TestRestorePV(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
oldPV := makePV("pv1", "").withVersion("5").PersistentVolume
newPV := makePV("pv1", "").withVersion("5").PersistentVolume
@@ -146,7 +139,7 @@ func TestRestorePV(t *testing.T) {
cache.Restore("nothing")
// Add oldPV to cache
internalCache.add(oldPV)
assumecache.AddTestObject(cache, oldPV)
if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
t.Fatalf("Failed to GetPV() after initial update: %v", err)
}
@@ -175,10 +168,6 @@ func TestRestorePV(t *testing.T) {
func TestBasicPVCache(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Get object that doesn't exist
pv, err := cache.GetPV("nothere")
@@ -194,7 +183,7 @@ func TestBasicPVCache(t *testing.T) {
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume
pvs[pv.Name] = pv
internalCache.add(pv)
assumecache.AddTestObject(cache, pv)
}
// List them
@@ -203,7 +192,7 @@ func TestBasicPVCache(t *testing.T) {
// Update a PV
updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume
pvs[updatedPV.Name] = updatedPV
internalCache.update(nil, updatedPV)
assumecache.UpdateTestObject(cache, updatedPV)
// List them
verifyListPVs(t, cache, pvs, "")
@@ -211,7 +200,7 @@ func TestBasicPVCache(t *testing.T) {
// Delete a PV
deletedPV := pvs["test-pv7"]
delete(pvs, deletedPV.Name)
internalCache.delete(deletedPV)
assumecache.DeleteTestObject(cache, deletedPV)
// List them
verifyListPVs(t, cache, pvs, "")
@@ -220,17 +209,13 @@ func TestBasicPVCache(t *testing.T) {
func TestPVCacheWithStorageClasses(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Add a bunch of PVs
pvs1 := map[string]*v1.PersistentVolume{}
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume
pvs1[pv.Name] = pv
internalCache.add(pv)
assumecache.AddTestObject(cache, pv)
}
// Add a bunch of PVs
@@ -238,7 +223,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume
pvs2[pv.Name] = pv
internalCache.add(pv)
assumecache.AddTestObject(cache, pv)
}
// List them
@@ -248,7 +233,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
// Update a PV
updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume
pvs1[updatedPV.Name] = updatedPV
internalCache.update(nil, updatedPV)
assumecache.UpdateTestObject(cache, updatedPV)
// List them
verifyListPVs(t, cache, pvs1, "class1")
@@ -257,7 +242,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
// Delete a PV
deletedPV := pvs1["test-pv7"]
delete(pvs1, deletedPV.Name)
internalCache.delete(deletedPV)
assumecache.DeleteTestObject(cache, deletedPV)
// List them
verifyListPVs(t, cache, pvs1, "class1")
@@ -267,16 +252,12 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
func TestAssumeUpdatePVCache(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
pvName := "test-pv0"
// Add a PV
pv := makePV(pvName, "").withVersion("1").PersistentVolume
internalCache.add(pv)
assumecache.AddTestObject(cache, pv)
if err := verifyPV(cache, pvName, pv); err != nil {
t.Fatalf("failed to get PV: %v", err)
}
@@ -292,7 +273,7 @@ func TestAssumeUpdatePVCache(t *testing.T) {
}
// Add old PV
internalCache.add(pv)
assumecache.AddTestObject(cache, pv)
if err := verifyPV(cache, pvName, newPV); err != nil {
t.Fatalf("failed to get PV after old PV added: %v", err)
}
@@ -361,13 +342,9 @@ func TestAssumePVC(t *testing.T) {
for name, scenario := range scenarios {
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Add oldPVC to cache
internalCache.add(scenario.oldPVC)
assumecache.AddTestObject(cache, scenario.oldPVC)
if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil {
t.Errorf("Failed to GetPVC() after initial update: %v", err)
continue
@@ -396,10 +373,6 @@ func TestAssumePVC(t *testing.T) {
func TestRestorePVC(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
oldPVC := makeClaim("pvc1", "5", "ns1")
newPVC := makeClaim("pvc1", "5", "ns1")
@@ -408,7 +381,7 @@ func TestRestorePVC(t *testing.T) {
cache.Restore("nothing")
// Add oldPVC to cache
internalCache.add(oldPVC)
assumecache.AddTestObject(cache, oldPVC)
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after initial update: %v", err)
}
@@ -437,17 +410,13 @@ func TestRestorePVC(t *testing.T) {
func TestAssumeUpdatePVCCache(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
pvcName := "test-pvc0"
pvcNamespace := "test-ns"
// Add a PVC
pvc := makeClaim(pvcName, "1", pvcNamespace)
internalCache.add(pvc)
assumecache.AddTestObject(cache, pvc)
if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil {
t.Fatalf("failed to get PVC: %v", err)
}
@@ -463,7 +432,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) {
}
// Add old PVC
internalCache.add(pvc)
assumecache.AddTestObject(cache, pvc)
if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
t.Fatalf("failed to get PVC after old PVC added: %v", err)
}

View File

@@ -18,6 +18,7 @@ package volumebinding
import (
"context"
"errors"
"fmt"
"sort"
"strings"
@@ -45,6 +46,7 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/pkg/volume/util"
)
@@ -720,7 +722,7 @@ func (b *volumeBinder) checkBindings(logger klog.Logger, pod *v1.Pod, bindings [
if pvc.Spec.VolumeName != "" {
pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName)
if err != nil {
if _, ok := err.(*errNotFound); ok {
if errors.Is(err, assumecache.ErrNotFound) {
// We tolerate NotFound error here, because PV is possibly
// not found because of API delay, we can check next time.
// And if PV does not exist because it's deleted, PVC will
@@ -873,7 +875,7 @@ func (b *volumeBinder) checkBoundClaims(logger klog.Logger, claims []*v1.Persist
pvName := pvc.Spec.VolumeName
pv, err := b.pvCache.GetPV(pvName)
if err != nil {
if _, ok := err.(*errNotFound); ok {
if errors.Is(err, assumecache.ErrNotFound) {
err = nil
}
return true, false, err

View File

@@ -47,6 +47,7 @@ import (
_ "k8s.io/klog/v2/ktesting/init"
"k8s.io/kubernetes/pkg/controller"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
var (
@@ -138,8 +139,6 @@ type testEnv struct {
internalPodInformer coreinformers.PodInformer
internalNodeInformer coreinformers.NodeInformer
internalCSINodeInformer storageinformers.CSINodeInformer
internalPVCache *assumeCache
internalPVCCache *assumeCache
// For CSIStorageCapacity feature testing:
internalCSIDriverInformer storageinformers.CSIDriverInformer
@@ -258,18 +257,6 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv {
t.Fatalf("Failed to convert to internal binder")
}
pvCache := internalBinder.pvCache
internalPVCache, ok := pvCache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PV cache")
}
pvcCache := internalBinder.pvcCache
internalPVCCache, ok := pvcCache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
return &testEnv{
client: client,
reactor: reactor,
@@ -278,8 +265,6 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv {
internalPodInformer: podInformer,
internalNodeInformer: nodeInformer,
internalCSINodeInformer: csiNodeInformer,
internalPVCache: internalPVCache,
internalPVCCache: internalPVCCache,
internalCSIDriverInformer: csiDriverInformer,
internalCSIStorageCapacityInformer: csiStorageCapacityInformer,
@@ -313,9 +298,8 @@ func (env *testEnv) addCSIStorageCapacities(capacities []*storagev1.CSIStorageCa
}
func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) {
internalPVCCache := env.internalPVCCache
for _, pvc := range cachedPVCs {
internalPVCCache.add(pvc)
assumecache.AddTestObject(env.internalBinder.pvcCache, pvc)
if apiPVCs == nil {
env.reactor.AddClaim(pvc)
}
@@ -326,9 +310,8 @@ func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs [
}
func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.PersistentVolume) {
internalPVCache := env.internalPVCache
for _, pv := range cachedPVs {
internalPVCache.add(pv)
assumecache.AddTestObject(env.internalBinder.pvCache, pv)
if apiPVs == nil {
env.reactor.AddVolume(pv)
}
@@ -349,7 +332,7 @@ func (env *testEnv) updateVolumes(ctx context.Context, pvs []*v1.PersistentVolum
}
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (bool, error) {
for _, pv := range pvs {
obj, err := env.internalPVCache.GetAPIObj(pv.Name)
obj, err := env.internalBinder.pvCache.GetAPIObj(pv.Name)
if obj == nil || err != nil {
return false, nil
}
@@ -375,7 +358,7 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum
}
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (bool, error) {
for _, pvc := range pvcs {
obj, err := env.internalPVCCache.GetAPIObj(getPVCName(pvc))
obj, err := env.internalBinder.pvcCache.GetAPIObj(getPVCName(pvc))
if obj == nil || err != nil {
return false, nil
}
@@ -393,13 +376,13 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum
func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) {
for _, pv := range pvs {
env.internalPVCache.delete(pv)
assumecache.DeleteTestObject(env.internalBinder.pvCache, pv)
}
}
func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) {
for _, pvc := range pvcs {
env.internalPVCCache.delete(pvc)
assumecache.DeleteTestObject(env.internalBinder.pvcCache, pvc)
}
}

View File

@@ -14,66 +14,125 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package volumebinding
package assumecache
import (
"errors"
"fmt"
"strconv"
"sync"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
storagehelpers "k8s.io/component-helpers/storage/volume"
)
// AssumeCache is a cache on top of the informer that allows for updating
// objects outside of informer events and also restoring the informer
// cache's version of the object. Objects are assumed to be
// Kubernetes API objects that implement meta.Interface
// cache's version of the object. Objects are assumed to be
// Kubernetes API objects that are supported by [meta.Accessor].
//
// Objects can referenced via their key, with [cache.MetaNamespaceKeyFunc]
// as key function.
type AssumeCache interface {
// Assume updates the object in-memory only
// Assume updates the object in-memory only.
//
// The version of the object must be greater or equal to
// the current object, otherwise an error is returned.
//
// Storing an object with the same version is supported
// by the assume cache, but suffers from a race: if an
// update is received via the informer while such an
// object is assumed, it gets dropped in favor of the
// newer object from the apiserver.
//
// Only assuming objects that were returned by an apiserver
// operation (Update, Patch) is safe.
Assume(obj interface{}) error
// Restore the informer cache's version of the object
Restore(objName string)
// Restore the informer cache's version of the object.
Restore(key string)
// Get the object by name
Get(objName string) (interface{}, error)
// Get the object by its key.
Get(key string) (interface{}, error)
// GetAPIObj gets the API object by name
GetAPIObj(objName string) (interface{}, error)
// GetAPIObj gets the informer cache's version by its key.
GetAPIObj(key string) (interface{}, error)
// List all the objects in the cache
// List all the objects in the cache.
List(indexObj interface{}) []interface{}
// getImplementation is used internally by [AddTestObject], [UpdateTestObject], [DeleteTestObject].
getImplementation() *assumeCache
}
type errWrongType struct {
typeName string
object interface{}
// Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon.
type Informer interface {
AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
}
func (e *errWrongType) Error() string {
return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object)
// AddTestObject adds an object to the assume cache.
// Only use this for unit testing!
func AddTestObject(cache AssumeCache, obj interface{}) {
cache.getImplementation().add(obj)
}
type errNotFound struct {
typeName string
objectName string
// UpdateTestObject updates an object in the assume cache.
// Only use this for unit testing!
func UpdateTestObject(cache AssumeCache, obj interface{}) {
cache.getImplementation().update(nil, obj)
}
func (e *errNotFound) Error() string {
return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName)
// DeleteTestObject deletes object in the assume cache.
// Only use this for unit testing!
func DeleteTestObject(cache AssumeCache, obj interface{}) {
cache.getImplementation().delete(obj)
}
type errObjectName struct {
detailedErr error
// Sentinel errors that can be checked for with errors.Is.
var (
ErrWrongType = errors.New("object has wrong type")
ErrNotFound = errors.New("object not found")
ErrObjectName = errors.New("cannot determine object name")
)
type WrongTypeError struct {
TypeName string
Object interface{}
}
func (e *errObjectName) Error() string {
return fmt.Sprintf("failed to get object name: %v", e.detailedErr)
func (e WrongTypeError) Error() string {
return fmt.Sprintf("could not convert object to type %v: %+v", e.TypeName, e.Object)
}
func (e WrongTypeError) Is(err error) bool {
return err == ErrWrongType
}
type NotFoundError struct {
TypeName string
ObjectKey string
}
func (e NotFoundError) Error() string {
return fmt.Sprintf("could not find %v %q", e.TypeName, e.ObjectKey)
}
func (e NotFoundError) Is(err error) bool {
return err == ErrNotFound
}
type ObjectNameError struct {
DetailedErr error
}
func (e ObjectNameError) Error() string {
return fmt.Sprintf("failed to get object name: %v", e.DetailedErr)
}
func (e ObjectNameError) Is(err error) bool {
return err == ErrObjectName
}
// assumeCache stores two pointers to represent a single object:
@@ -119,7 +178,7 @@ type objInfo struct {
func objInfoKeyFunc(obj interface{}) (string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
return "", &errWrongType{"objInfo", obj}
return "", &WrongTypeError{TypeName: "objInfo", Object: obj}
}
return objInfo.name, nil
}
@@ -127,13 +186,13 @@ func objInfoKeyFunc(obj interface{}) (string, error) {
func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
return []string{""}, &errWrongType{"objInfo", obj}
return []string{""}, &WrongTypeError{TypeName: "objInfo", Object: obj}
}
return c.indexFunc(objInfo.latestObj)
}
// NewAssumeCache creates an assume cache for general objects.
func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
func NewAssumeCache(logger klog.Logger, informer Informer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
c := &assumeCache{
logger: logger,
description: description,
@@ -148,7 +207,8 @@ func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, desc
// Unit tests don't use informers
if informer != nil {
informer.AddEventHandler(
// Cannot fail in practice?! No-one bothers checking the error.
_, _ = informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.add,
UpdateFunc: c.update,
@@ -159,6 +219,10 @@ func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, desc
return c
}
func (c *assumeCache) getImplementation() *assumeCache {
return c
}
func (c *assumeCache) add(obj interface{}) {
if obj == nil {
return
@@ -166,7 +230,7 @@ func (c *assumeCache) add(obj interface{}) {
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(&errObjectName{err}, "Add failed")
c.logger.Error(&ObjectNameError{err}, "Add failed")
return
}
@@ -213,7 +277,7 @@ func (c *assumeCache) delete(obj interface{}) {
name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(&errObjectName{err}, "Failed to delete")
c.logger.Error(&ObjectNameError{err}, "Failed to delete")
return
}
@@ -235,43 +299,44 @@ func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error)
objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
if err != nil {
return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
//nolint:errorlint // Intentionally not wrapping the error, the underlying error is an implementation detail.
return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %v", objAccessor.GetResourceVersion(), c.description, name, err)
}
return objResourceVersion, nil
}
func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
obj, ok, err := c.store.GetByKey(name)
func (c *assumeCache) getObjInfo(key string) (*objInfo, error) {
obj, ok, err := c.store.GetByKey(key)
if err != nil {
return nil, err
}
if !ok {
return nil, &errNotFound{c.description, name}
return nil, &NotFoundError{TypeName: c.description, ObjectKey: key}
}
objInfo, ok := obj.(*objInfo)
if !ok {
return nil, &errWrongType{"objInfo", obj}
return nil, &WrongTypeError{"objInfo", obj}
}
return objInfo, nil
}
func (c *assumeCache) Get(objName string) (interface{}, error) {
func (c *assumeCache) Get(key string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
objInfo, err := c.getObjInfo(objName)
objInfo, err := c.getObjInfo(key)
if err != nil {
return nil, err
}
return objInfo.latestObj, nil
}
func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
func (c *assumeCache) GetAPIObj(key string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
objInfo, err := c.getObjInfo(objName)
objInfo, err := c.getObjInfo(key)
if err != nil {
return nil, err
}
@@ -298,7 +363,7 @@ func (c *assumeCache) List(indexObj interface{}) []interface{} {
for _, obj := range objs {
objInfo, ok := obj.(*objInfo)
if !ok {
c.logger.Error(&errWrongType{"objInfo", obj}, "List error")
c.logger.Error(&WrongTypeError{TypeName: "objInfo", Object: obj}, "List error")
continue
}
allObjs = append(allObjs, objInfo.latestObj)
@@ -309,7 +374,7 @@ func (c *assumeCache) List(indexObj interface{}) []interface{} {
func (c *assumeCache) Assume(obj interface{}) error {
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return &errObjectName{err}
return &ObjectNameError{err}
}
c.rwMutex.Lock()
@@ -353,125 +418,3 @@ func (c *assumeCache) Restore(objName string) {
c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
}
}
// PVAssumeCache is a AssumeCache for PersistentVolume objects
type PVAssumeCache interface {
AssumeCache
GetPV(pvName string) (*v1.PersistentVolume, error)
GetAPIPV(pvName string) (*v1.PersistentVolume, error)
ListPVs(storageClassName string) []*v1.PersistentVolume
}
type pvAssumeCache struct {
AssumeCache
logger klog.Logger
}
func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
if pv, ok := obj.(*v1.PersistentVolume); ok {
return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil
}
return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
}
// NewPVAssumeCache creates a PV assume cache.
func NewPVAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVAssumeCache {
logger = klog.LoggerWithName(logger, "PV Cache")
return &pvAssumeCache{
AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc),
logger: logger,
}
}
func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
obj, err := c.Get(pvName)
if err != nil {
return nil, err
}
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
return nil, &errWrongType{"v1.PersistentVolume", obj}
}
return pv, nil
}
func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
obj, err := c.GetAPIObj(pvName)
if err != nil {
return nil, err
}
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
return nil, &errWrongType{"v1.PersistentVolume", obj}
}
return pv, nil
}
func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
objs := c.List(&v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
StorageClassName: storageClassName,
},
})
pvs := []*v1.PersistentVolume{}
for _, obj := range objs {
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
c.logger.Error(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs")
continue
}
pvs = append(pvs, pv)
}
return pvs
}
// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects
type PVCAssumeCache interface {
AssumeCache
// GetPVC returns the PVC from the cache with given pvcKey.
// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}
type pvcAssumeCache struct {
AssumeCache
logger klog.Logger
}
// NewPVCAssumeCache creates a PVC assume cache.
func NewPVCAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVCAssumeCache {
logger = klog.LoggerWithName(logger, "PVC Cache")
return &pvcAssumeCache{
AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil),
logger: logger,
}
}
func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.Get(pvcKey)
if err != nil {
return nil, err
}
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
}
return pvc, nil
}
func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.GetAPIObj(pvcKey)
if err != nil {
return nil, err
}
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
}
return pvc, nil
}

View File

@@ -14,457 +14,314 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package volumebinding
package assumecache
import (
"fmt"
"slices"
"testing"
v1 "k8s.io/api/core/v1"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2/ktesting"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/test/utils/ktesting"
)
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
pvList := cache.ListPVs(storageClassName)
if len(pvList) != len(expectedPVs) {
t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs))
// testInformer implements [Informer] and can be used to feed changes into an assume
// cache during unit testing. Only a single event handler is supported, which is
// sufficient for one assume cache.
type testInformer struct {
handler cache.ResourceEventHandler
}
func (i *testInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
i.handler = handler
return nil, nil
}
func (i *testInformer) add(obj interface{}) {
if i.handler == nil {
return
}
for _, pv := range pvList {
expectedPV, ok := expectedPVs[pv.Name]
if !ok {
t.Errorf("ListPVs() returned unexpected PV %q", pv.Name)
}
if expectedPV != pv {
t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV)
}
i.handler.OnAdd(obj, false)
}
func (i *testInformer) update(obj interface{}) {
if i.handler == nil {
return
}
i.handler.OnUpdate(nil, obj)
}
func (i *testInformer) delete(obj interface{}) {
if i.handler == nil {
return
}
i.handler.OnDelete(obj)
}
func makeObj(name, version, namespace string) metav1.Object {
return &metav1.ObjectMeta{
Name: name,
Namespace: namespace,
ResourceVersion: version,
}
}
func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
pv, err := cache.GetPV(name)
func newTest(t *testing.T) (ktesting.TContext, AssumeCache, *testInformer) {
return newTestWithIndexer(t, "", nil)
}
func newTestWithIndexer(t *testing.T, indexName string, indexFunc cache.IndexFunc) (ktesting.TContext, AssumeCache, *testInformer) {
tCtx := ktesting.Init(t)
informer := new(testInformer)
cache := NewAssumeCache(tCtx.Logger(), informer, "TestObject", indexName, indexFunc)
return tCtx, cache, informer
}
func verify(tCtx ktesting.TContext, cache AssumeCache, key string, expectedObject, expectedAPIObject interface{}) {
tCtx.Helper()
actualObject, err := cache.Get(key)
if err != nil {
return err
tCtx.Fatalf("unexpected error retrieving object for key %s: %v", key, err)
}
if pv != expectedPV {
return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV)
if actualObject != expectedObject {
tCtx.Fatalf("Get() returned %v, expected %v", actualObject, expectedObject)
}
actualAPIObject, err := cache.GetAPIObj(key)
if err != nil {
tCtx.Fatalf("unexpected error retrieving API object for key %s: %v", key, err)
}
if actualAPIObject != expectedAPIObject {
tCtx.Fatalf("GetAPIObject() returned %v, expected %v", actualAPIObject, expectedAPIObject)
}
return nil
}
func TestAssumePV(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
func verifyList(tCtx ktesting.TContext, assumeCache AssumeCache, expectedObjs []interface{}, indexObj interface{}) {
actualObjs := assumeCache.List(indexObj)
diff := cmp.Diff(expectedObjs, actualObjs, cmpopts.SortSlices(func(x, y interface{}) bool {
xKey, err := cache.MetaNamespaceKeyFunc(x)
if err != nil {
tCtx.Fatalf("unexpected error determining key for %v: %v", x, err)
}
yKey, err := cache.MetaNamespaceKeyFunc(y)
if err != nil {
tCtx.Fatalf("unexpected error determining key for %v: %v", y, err)
}
return xKey < yKey
}))
if diff != "" {
tCtx.Fatalf("List() result differs (- expected, + actual):\n%s", diff)
}
}
func TestAssume(t *testing.T) {
scenarios := map[string]struct {
oldPV *v1.PersistentVolume
newPV *v1.PersistentVolume
shouldSucceed bool
oldObj metav1.Object
newObj interface{}
expectErr error
}{
"success-same-version": {
oldPV: makePV("pv1", "").withVersion("5").PersistentVolume,
newPV: makePV("pv1", "").withVersion("5").PersistentVolume,
shouldSucceed: true,
},
"success-storageclass-same-version": {
oldPV: makePV("pv1", "class1").withVersion("5").PersistentVolume,
newPV: makePV("pv1", "class1").withVersion("5").PersistentVolume,
shouldSucceed: true,
oldObj: makeObj("pvc1", "5", ""),
newObj: makeObj("pvc1", "5", ""),
},
"success-new-higher-version": {
oldPV: makePV("pv1", "").withVersion("5").PersistentVolume,
newPV: makePV("pv1", "").withVersion("6").PersistentVolume,
shouldSucceed: true,
oldObj: makeObj("pvc1", "5", ""),
newObj: makeObj("pvc1", "6", ""),
},
"fail-old-not-found": {
oldPV: makePV("pv2", "").withVersion("5").PersistentVolume,
newPV: makePV("pv1", "").withVersion("5").PersistentVolume,
shouldSucceed: false,
oldObj: makeObj("pvc2", "5", ""),
newObj: makeObj("pvc1", "5", ""),
expectErr: ErrNotFound,
},
"fail-new-lower-version": {
oldPV: makePV("pv1", "").withVersion("5").PersistentVolume,
newPV: makePV("pv1", "").withVersion("4").PersistentVolume,
shouldSucceed: false,
oldObj: makeObj("pvc1", "5", ""),
newObj: makeObj("pvc1", "4", ""),
expectErr: cmpopts.AnyError,
},
"fail-new-bad-version": {
oldPV: makePV("pv1", "").withVersion("5").PersistentVolume,
newPV: makePV("pv1", "").withVersion("a").PersistentVolume,
shouldSucceed: false,
oldObj: makeObj("pvc1", "5", ""),
newObj: makeObj("pvc1", "a", ""),
expectErr: cmpopts.AnyError,
},
"fail-old-bad-version": {
oldPV: makePV("pv1", "").withVersion("a").PersistentVolume,
newPV: makePV("pv1", "").withVersion("5").PersistentVolume,
shouldSucceed: false,
oldObj: makeObj("pvc1", "a", ""),
newObj: makeObj("pvc1", "5", ""),
expectErr: cmpopts.AnyError,
},
"fail-new-bad-object": {
oldObj: makeObj("pvc1", "5", ""),
newObj: 1,
expectErr: ErrObjectName,
},
}
for name, scenario := range scenarios {
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
t.Run(name, func(t *testing.T) {
tCtx, cache, informer := newTest(t)
// Add oldPV to cache
internalCache.add(scenario.oldPV)
if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
t.Errorf("Failed to GetPV() after initial update: %v", err)
continue
}
// Add old object to cache.
informer.add(scenario.oldObj)
verify(tCtx, cache, scenario.oldObj.GetName(), scenario.oldObj, scenario.oldObj)
// Assume newPV
err := cache.Assume(scenario.newPV)
if scenario.shouldSucceed && err != nil {
t.Errorf("Test %q failed: Assume() returned error %v", name, err)
}
if !scenario.shouldSucceed && err == nil {
t.Errorf("Test %q failed: Assume() returned success but expected error", name)
}
// Assume new object.
err := cache.Assume(scenario.newObj)
if diff := cmp.Diff(scenario.expectErr, err, cmpopts.EquateErrors()); diff != "" {
t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff)
}
// Check that GetPV returns correct PV
expectedPV := scenario.newPV
if !scenario.shouldSucceed {
expectedPV = scenario.oldPV
}
if err := verifyPV(cache, scenario.oldPV.Name, expectedPV); err != nil {
t.Errorf("Failed to GetPV() after initial update: %v", err)
}
// Check that Get returns correct object.
expectedObj := scenario.newObj
if scenario.expectErr != nil {
expectedObj = scenario.oldObj
}
verify(tCtx, cache, scenario.oldObj.GetName(), expectedObj, scenario.oldObj)
})
}
}
func TestRestorePV(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
func TestRestore(t *testing.T) {
tCtx, cache, informer := newTest(t)
oldPV := makePV("pv1", "").withVersion("5").PersistentVolume
newPV := makePV("pv1", "").withVersion("5").PersistentVolume
// This test assumes an object with the same version as the API object.
// The assume cache supports that, but doing so in real code suffers from
// a race: if an unrelated update is received from the apiserver while
// such an object is assumed, the local modification gets dropped.
oldObj := makeObj("pvc1", "5", "")
newObj := makeObj("pvc1", "5", "")
// Restore PV that doesn't exist
// Restore object that doesn't exist
cache.Restore("nothing")
// Add oldPV to cache
internalCache.add(oldPV)
if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
t.Fatalf("Failed to GetPV() after initial update: %v", err)
}
// Add old object to cache.
informer.add(oldObj)
verify(ktesting.WithStep(tCtx, "after initial update"), cache, oldObj.GetName(), oldObj, oldObj)
// Restore PV
cache.Restore(oldPV.Name)
if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
t.Fatalf("Failed to GetPV() after initial restore: %v", err)
}
// Restore object.
cache.Restore(oldObj.GetName())
verify(ktesting.WithStep(tCtx, "after initial Restore"), cache, oldObj.GetName(), oldObj, oldObj)
// Assume newPV
if err := cache.Assume(newPV); err != nil {
// Assume new object.
if err := cache.Assume(newObj); err != nil {
t.Fatalf("Assume() returned error %v", err)
}
if err := verifyPV(cache, oldPV.Name, newPV); err != nil {
t.Fatalf("Failed to GetPV() after Assume: %v", err)
}
verify(ktesting.WithStep(tCtx, "after Assume"), cache, oldObj.GetName(), newObj, oldObj)
// Restore PV
cache.Restore(oldPV.Name)
if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
t.Fatalf("Failed to GetPV() after restore: %v", err)
// Restore object.
cache.Restore(oldObj.GetName())
verify(ktesting.WithStep(tCtx, "after second Restore"), cache, oldObj.GetName(), oldObj, oldObj)
}
func TestEvents(t *testing.T) {
tCtx, cache, informer := newTest(t)
oldObj := makeObj("pvc1", "5", "")
newObj := makeObj("pvc1", "6", "")
key := oldObj.GetName()
// Add old object to cache.
informer.add(oldObj)
verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, oldObj, oldObj)
// Update object.
informer.update(newObj)
verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, newObj, newObj)
// Some error cases (don't occur in practice).
informer.add(1)
verify(ktesting.WithStep(tCtx, "after nop add"), cache, key, newObj, newObj)
informer.add(nil)
verify(ktesting.WithStep(tCtx, "after nil add"), cache, key, newObj, newObj)
informer.update(oldObj)
verify(ktesting.WithStep(tCtx, "after nop update"), cache, key, newObj, newObj)
informer.update(nil)
verify(ktesting.WithStep(tCtx, "after nil update"), cache, key, newObj, newObj)
informer.delete(nil)
verify(ktesting.WithStep(tCtx, "after nop delete"), cache, key, newObj, newObj)
// Delete object.
informer.delete(oldObj)
_, err := cache.Get(key)
if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" {
t.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff)
}
}
func TestBasicPVCache(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
func TestListNoIndexer(t *testing.T) {
tCtx, cache, informer := newTest(t)
// Get object that doesn't exist
pv, err := cache.GetPV("nothere")
if err == nil {
t.Errorf("GetPV() returned unexpected success")
}
if pv != nil {
t.Errorf("GetPV() returned unexpected PV %q", pv.Name)
}
// Add a bunch of PVs
pvs := map[string]*v1.PersistentVolume{}
// Add a bunch of objects.
objs := make([]interface{}, 0, 10)
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume
pvs[pv.Name] = pv
internalCache.add(pv)
obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")
objs = append(objs, obj)
informer.add(obj)
}
// List them
verifyListPVs(t, cache, pvs, "")
verifyList(ktesting.WithStep(tCtx, "after add"), cache, objs, "")
// Update a PV
updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume
pvs[updatedPV.Name] = updatedPV
internalCache.update(nil, updatedPV)
// Update an object.
updatedObj := makeObj("test-pvc3", "2", "")
objs[3] = updatedObj
informer.update(updatedObj)
// List them
verifyListPVs(t, cache, pvs, "")
verifyList(ktesting.WithStep(tCtx, "after update"), cache, objs, "")
// Delete a PV
deletedPV := pvs["test-pv7"]
delete(pvs, deletedPV.Name)
internalCache.delete(deletedPV)
deletedObj := objs[7]
objs = slices.Delete(objs, 7, 8)
informer.delete(deletedObj)
// List them
verifyListPVs(t, cache, pvs, "")
verifyList(ktesting.WithStep(tCtx, "after delete"), cache, objs, "")
}
func TestPVCacheWithStorageClasses(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
func TestListWithIndexer(t *testing.T) {
namespaceIndexer := func(obj interface{}) ([]string, error) {
objAccessor, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
return []string{objAccessor.GetNamespace()}, nil
}
tCtx, cache, informer := newTestWithIndexer(t, "myNamespace", namespaceIndexer)
// Add a bunch of objects.
ns := "ns1"
objs := make([]interface{}, 0, 10)
for i := 0; i < 10; i++ {
obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns)
objs = append(objs, obj)
informer.add(obj)
}
// Add a bunch of PVs
pvs1 := map[string]*v1.PersistentVolume{}
// Add a bunch of other objects.
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume
pvs1[pv.Name] = pv
internalCache.add(pv)
}
// Add a bunch of PVs
pvs2 := map[string]*v1.PersistentVolume{}
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume
pvs2[pv.Name] = pv
internalCache.add(pv)
obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "ns2")
informer.add(obj)
}
// List them
verifyListPVs(t, cache, pvs1, "class1")
verifyListPVs(t, cache, pvs2, "class2")
verifyList(ktesting.WithStep(tCtx, "after add"), cache, objs, objs[0])
// Update a PV
updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume
pvs1[updatedPV.Name] = updatedPV
internalCache.update(nil, updatedPV)
// Update an object.
updatedObj := makeObj("test-pvc3", "2", ns)
objs[3] = updatedObj
informer.update(updatedObj)
// List them
verifyListPVs(t, cache, pvs1, "class1")
verifyListPVs(t, cache, pvs2, "class2")
verifyList(ktesting.WithStep(tCtx, "after update"), cache, objs, objs[0])
// Delete a PV
deletedPV := pvs1["test-pv7"]
delete(pvs1, deletedPV.Name)
internalCache.delete(deletedPV)
deletedObj := objs[7]
objs = slices.Delete(objs, 7, 8)
informer.delete(deletedObj)
// List them
verifyListPVs(t, cache, pvs1, "class1")
verifyListPVs(t, cache, pvs2, "class2")
}
func TestAssumeUpdatePVCache(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
pvName := "test-pv0"
// Add a PV
pv := makePV(pvName, "").withVersion("1").PersistentVolume
internalCache.add(pv)
if err := verifyPV(cache, pvName, pv); err != nil {
t.Fatalf("failed to get PV: %v", err)
}
// Assume PV
newPV := pv.DeepCopy()
newPV.Spec.ClaimRef = &v1.ObjectReference{Name: "test-claim"}
if err := cache.Assume(newPV); err != nil {
t.Fatalf("failed to assume PV: %v", err)
}
if err := verifyPV(cache, pvName, newPV); err != nil {
t.Fatalf("failed to get PV after assume: %v", err)
}
// Add old PV
internalCache.add(pv)
if err := verifyPV(cache, pvName, newPV); err != nil {
t.Fatalf("failed to get PV after old PV added: %v", err)
}
}
func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
ResourceVersion: version,
Annotations: map[string]string{},
},
}
}
func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error {
pvc, err := cache.GetPVC(pvcKey)
if err != nil {
return err
}
if pvc != expectedPVC {
return fmt.Errorf("GetPVC() returned %p, expected %p", pvc, expectedPVC)
}
return nil
}
func TestAssumePVC(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
scenarios := map[string]struct {
oldPVC *v1.PersistentVolumeClaim
newPVC *v1.PersistentVolumeClaim
shouldSucceed bool
}{
"success-same-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "5", "ns1"),
shouldSucceed: true,
},
"success-new-higher-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "6", "ns1"),
shouldSucceed: true,
},
"fail-old-not-found": {
oldPVC: makeClaim("pvc2", "5", "ns1"),
newPVC: makeClaim("pvc1", "5", "ns1"),
shouldSucceed: false,
},
"fail-new-lower-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "4", "ns1"),
shouldSucceed: false,
},
"fail-new-bad-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "a", "ns1"),
shouldSucceed: false,
},
"fail-old-bad-version": {
oldPVC: makeClaim("pvc1", "a", "ns1"),
newPVC: makeClaim("pvc1", "5", "ns1"),
shouldSucceed: false,
},
}
for name, scenario := range scenarios {
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Add oldPVC to cache
internalCache.add(scenario.oldPVC)
if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil {
t.Errorf("Failed to GetPVC() after initial update: %v", err)
continue
}
// Assume newPVC
err := cache.Assume(scenario.newPVC)
if scenario.shouldSucceed && err != nil {
t.Errorf("Test %q failed: Assume() returned error %v", name, err)
}
if !scenario.shouldSucceed && err == nil {
t.Errorf("Test %q failed: Assume() returned success but expected error", name)
}
// Check that GetPVC returns correct PVC
expectedPV := scenario.newPVC
if !scenario.shouldSucceed {
expectedPV = scenario.oldPVC
}
if err := verifyPVC(cache, getPVCName(scenario.oldPVC), expectedPV); err != nil {
t.Errorf("Failed to GetPVC() after initial update: %v", err)
}
}
}
func TestRestorePVC(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
oldPVC := makeClaim("pvc1", "5", "ns1")
newPVC := makeClaim("pvc1", "5", "ns1")
// Restore PVC that doesn't exist
cache.Restore("nothing")
// Add oldPVC to cache
internalCache.add(oldPVC)
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after initial update: %v", err)
}
// Restore PVC
cache.Restore(getPVCName(oldPVC))
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after initial restore: %v", err)
}
// Assume newPVC
if err := cache.Assume(newPVC); err != nil {
t.Fatalf("Assume() returned error %v", err)
}
if err := verifyPVC(cache, getPVCName(oldPVC), newPVC); err != nil {
t.Fatalf("Failed to GetPVC() after Assume: %v", err)
}
// Restore PVC
cache.Restore(getPVCName(oldPVC))
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after restore: %v", err)
}
}
func TestAssumeUpdatePVCCache(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
pvcName := "test-pvc0"
pvcNamespace := "test-ns"
// Add a PVC
pvc := makeClaim(pvcName, "1", pvcNamespace)
internalCache.add(pvc)
if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil {
t.Fatalf("failed to get PVC: %v", err)
}
// Assume PVC
newPVC := pvc.DeepCopy()
newPVC.Annotations[volume.AnnSelectedNode] = "test-node"
if err := cache.Assume(newPVC); err != nil {
t.Fatalf("failed to assume PVC: %v", err)
}
if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
t.Fatalf("failed to get PVC after assume: %v", err)
}
// Add old PVC
internalCache.add(pvc)
if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
t.Fatalf("failed to get PVC after old PVC added: %v", err)
}
verifyList(ktesting.WithStep(tCtx, "after delete"), cache, objs, objs[0])
}