Remove "pkg/controller/volume/scheduling" dependency from "pkg/scheduler/framework/plugins"
All dependencies of the VolumeBinding plugin were moved from the "k8s.io/kubernetes/pkg/controller/volume/scheduling" package to the "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" package:

- whole file pkg/controller/volume/scheduling/scheduler_assume_cache.go
- whole file pkg/controller/volume/scheduling/scheduler_assume_cache_test.go
- whole file pkg/controller/volume/scheduling/scheduler_binder.go
- whole file pkg/controller/volume/scheduling/scheduler_binder_fake.go
- whole file pkg/controller/volume/scheduling/scheduler_binder_test.go

The package "k8s.io/kubernetes/pkg/controller/volume/scheduling/metrics" was moved to "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics" because it is only used by the VolumeBinding plugin and (e2e) tests.

More details in issue #89930 and PR #102953.

Signed-off-by: Konstantin Misyutin <konstantin.misyutin@huawei.com>
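For consumers of the moved metrics package the change is an import-path change only; the registration helper keeps its name. A minimal sketch of a hypothetical consumer (the package name, import alias, and init() call site below are illustrative, not taken from this commit):

package example // hypothetical consumer package

import (
	// Before this commit the metrics lived under the controller tree:
	//   "k8s.io/kubernetes/pkg/controller/volume/scheduling/metrics"
	// After this commit they live under the scheduler plugin tree:
	volumebindingmetrics "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics"
)

func init() {
	// The registration helper keeps its name; only its import path changes.
	volumebindingmetrics.RegisterVolumeSchedulingMetrics()
}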
@@ -1,9 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners

approvers:
- msau42
- cofyc
reviewers:
- msau42
- cofyc
- lichuqiang
@@ -1,68 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
	"k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
)

// VolumeSchedulerSubsystem - subsystem name used by scheduler
const VolumeSchedulerSubsystem = "scheduler_volume"

var (
	// VolumeBindingRequestSchedulerBinderCache tracks the number of volume binder cache operations.
	VolumeBindingRequestSchedulerBinderCache = metrics.NewCounterVec(
		&metrics.CounterOpts{
			Subsystem:      VolumeSchedulerSubsystem,
			Name:           "binder_cache_requests_total",
			Help:           "Total number for request volume binding cache",
			StabilityLevel: metrics.ALPHA,
		},
		[]string{"operation"},
	)
	// VolumeSchedulingStageLatency tracks the latency of volume scheduling operations.
	VolumeSchedulingStageLatency = metrics.NewHistogramVec(
		&metrics.HistogramOpts{
			Subsystem:         VolumeSchedulerSubsystem,
			Name:              "scheduling_duration_seconds",
			Help:              "Volume scheduling stage latency (Deprecated since 1.19.0)",
			Buckets:           metrics.ExponentialBuckets(1000, 2, 15),
			StabilityLevel:    metrics.ALPHA,
			DeprecatedVersion: "1.19.0",
		},
		[]string{"operation"},
	)
	// VolumeSchedulingStageFailed tracks the number of failed volume scheduling operations.
	VolumeSchedulingStageFailed = metrics.NewCounterVec(
		&metrics.CounterOpts{
			Subsystem:      VolumeSchedulerSubsystem,
			Name:           "scheduling_stage_error_total",
			Help:           "Volume scheduling stage error count",
			StabilityLevel: metrics.ALPHA,
		},
		[]string{"operation"},
	)
)

// RegisterVolumeSchedulingMetrics is used for scheduler, because the volume binding cache is a library
// used by scheduler process.
func RegisterVolumeSchedulingMetrics() {
	legacyregistry.MustRegister(VolumeBindingRequestSchedulerBinderCache)
	legacyregistry.MustRegister(VolumeSchedulingStageLatency)
	legacyregistry.MustRegister(VolumeSchedulingStageFailed)
}
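For context on the collectors moved above: they are component-base counter/histogram vectors keyed by an "operation" label and registered once via RegisterVolumeSchedulingMetrics(). A minimal, hedged sketch of recording them, assuming the post-move import path; the label values and the timing code are illustrative, not from this commit:

package main

import (
	"time"

	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics"
)

func main() {
	// Register the collectors with the legacy registry once per process.
	metrics.RegisterVolumeSchedulingMetrics()

	start := time.Now()
	// ... volume binding work would happen here ...

	// Count a binder cache operation and record the stage latency.
	metrics.VolumeBindingRequestSchedulerBinderCache.WithLabelValues("AssumePodVolumes").Inc()
	metrics.VolumeSchedulingStageLatency.WithLabelValues("assume").Observe(time.Since(start).Seconds())
}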
@@ -1,456 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
	"fmt"
	storagehelpers "k8s.io/component-helpers/storage/volume"
	"strconv"
	"sync"

	"k8s.io/klog/v2"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/client-go/tools/cache"
)

// AssumeCache is a cache on top of the informer that allows for updating
// objects outside of informer events and also restoring the informer
// cache's version of the object. Objects are assumed to be
// Kubernetes API objects that implement meta.Interface
type AssumeCache interface {
	// Assume updates the object in-memory only
	Assume(obj interface{}) error

	// Restore the informer cache's version of the object
	Restore(objName string)

	// Get the object by name
	Get(objName string) (interface{}, error)

	// Get the API object by name
	GetAPIObj(objName string) (interface{}, error)

	// List all the objects in the cache
	List(indexObj interface{}) []interface{}
}

type errWrongType struct {
	typeName string
	object   interface{}
}

func (e *errWrongType) Error() string {
	return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object)
}

type errNotFound struct {
	typeName   string
	objectName string
}

func (e *errNotFound) Error() string {
	return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName)
}

type errObjectName struct {
	detailedErr error
}

func (e *errObjectName) Error() string {
	return fmt.Sprintf("failed to get object name: %v", e.detailedErr)
}

// assumeCache stores two pointers to represent a single object:
// * The pointer to the informer object.
// * The pointer to the latest object, which could be the same as
//   the informer object, or an in-memory object.
//
// An informer update always overrides the latest object pointer.
//
// Assume() only updates the latest object pointer.
// Restore() sets the latest object pointer back to the informer object.
// Get/List() always returns the latest object pointer.
type assumeCache struct {
	// Synchronizes updates to store
	rwMutex sync.RWMutex

	// describes the object stored
	description string

	// Stores objInfo pointers
	store cache.Indexer

	// Index function for object
	indexFunc cache.IndexFunc
	indexName string
}

type objInfo struct {
	// name of the object
	name string

	// Latest version of object could be cached-only or from informer
	latestObj interface{}

	// Latest object from informer
	apiObj interface{}
}

func objInfoKeyFunc(obj interface{}) (string, error) {
	objInfo, ok := obj.(*objInfo)
	if !ok {
		return "", &errWrongType{"objInfo", obj}
	}
	return objInfo.name, nil
}

func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
	objInfo, ok := obj.(*objInfo)
	if !ok {
		return []string{""}, &errWrongType{"objInfo", obj}
	}
	return c.indexFunc(objInfo.latestObj)
}

// NewAssumeCache creates an assume cache for general objects.
func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
	c := &assumeCache{
		description: description,
		indexFunc:   indexFunc,
		indexName:   indexName,
	}
	indexers := cache.Indexers{}
	if indexName != "" && indexFunc != nil {
		indexers[indexName] = c.objInfoIndexFunc
	}
	c.store = cache.NewIndexer(objInfoKeyFunc, indexers)

	// Unit tests don't use informers
	if informer != nil {
		informer.AddEventHandler(
			cache.ResourceEventHandlerFuncs{
				AddFunc:    c.add,
				UpdateFunc: c.update,
				DeleteFunc: c.delete,
			},
		)
	}
	return c
}

func (c *assumeCache) add(obj interface{}) {
	if obj == nil {
		return
	}

	name, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		klog.Errorf("add failed: %v", &errObjectName{err})
		return
	}

	c.rwMutex.Lock()
	defer c.rwMutex.Unlock()

	if objInfo, _ := c.getObjInfo(name); objInfo != nil {
		newVersion, err := c.getObjVersion(name, obj)
		if err != nil {
			klog.Errorf("add: couldn't get object version: %v", err)
			return
		}

		storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
		if err != nil {
			klog.Errorf("add: couldn't get stored object version: %v", err)
			return
		}

		// Only update object if version is newer.
		// This is so we don't override assumed objects due to informer resync.
		if newVersion <= storedVersion {
			klog.V(10).Infof("Skip adding %v %v to assume cache because version %v is not newer than %v", c.description, name, newVersion, storedVersion)
			return
		}
	}

	objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
	if err = c.store.Update(objInfo); err != nil {
		klog.Warningf("got error when updating stored object : %v", err)
	} else {
		klog.V(10).Infof("Adding %v %v to assume cache: %+v ", c.description, name, obj)
	}
}

func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
	c.add(newObj)
}

func (c *assumeCache) delete(obj interface{}) {
	if obj == nil {
		return
	}

	name, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		klog.Errorf("delete failed: %v", &errObjectName{err})
		return
	}

	c.rwMutex.Lock()
	defer c.rwMutex.Unlock()

	objInfo := &objInfo{name: name}
	err = c.store.Delete(objInfo)
	if err != nil {
		klog.Errorf("delete: failed to delete %v %v: %v", c.description, name, err)
	}
}

func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
	objAccessor, err := meta.Accessor(obj)
	if err != nil {
		return -1, err
	}

	objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
	if err != nil {
		return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
	}
	return objResourceVersion, nil
}

func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
	obj, ok, err := c.store.GetByKey(name)
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, &errNotFound{c.description, name}
	}

	objInfo, ok := obj.(*objInfo)
	if !ok {
		return nil, &errWrongType{"objInfo", obj}
	}
	return objInfo, nil
}

func (c *assumeCache) Get(objName string) (interface{}, error) {
	c.rwMutex.RLock()
	defer c.rwMutex.RUnlock()

	objInfo, err := c.getObjInfo(objName)
	if err != nil {
		return nil, err
	}
	return objInfo.latestObj, nil
}

func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
	c.rwMutex.RLock()
	defer c.rwMutex.RUnlock()

	objInfo, err := c.getObjInfo(objName)
	if err != nil {
		return nil, err
	}
	return objInfo.apiObj, nil
}

func (c *assumeCache) List(indexObj interface{}) []interface{} {
	c.rwMutex.RLock()
	defer c.rwMutex.RUnlock()

	allObjs := []interface{}{}
	objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
	if err != nil {
		klog.Errorf("list index error: %v", err)
		return nil
	}

	for _, obj := range objs {
		objInfo, ok := obj.(*objInfo)
		if !ok {
			klog.Errorf("list error: %v", &errWrongType{"objInfo", obj})
			continue
		}
		allObjs = append(allObjs, objInfo.latestObj)
	}
	return allObjs
}

func (c *assumeCache) Assume(obj interface{}) error {
	name, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		return &errObjectName{err}
	}

	c.rwMutex.Lock()
	defer c.rwMutex.Unlock()

	objInfo, err := c.getObjInfo(name)
	if err != nil {
		return err
	}

	newVersion, err := c.getObjVersion(name, obj)
	if err != nil {
		return err
	}

	storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
	if err != nil {
		return err
	}

	if newVersion < storedVersion {
		return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
	}

	// Only update the cached object
	objInfo.latestObj = obj
	klog.V(4).Infof("Assumed %v %q, version %v", c.description, name, newVersion)
	return nil
}

func (c *assumeCache) Restore(objName string) {
	c.rwMutex.Lock()
	defer c.rwMutex.Unlock()

	objInfo, err := c.getObjInfo(objName)
	if err != nil {
		// This could be expected if object got deleted
		klog.V(5).Infof("Restore %v %q warning: %v", c.description, objName, err)
	} else {
		objInfo.latestObj = objInfo.apiObj
		klog.V(4).Infof("Restored %v %q", c.description, objName)
	}
}

// PVAssumeCache is a AssumeCache for PersistentVolume objects
type PVAssumeCache interface {
	AssumeCache

	GetPV(pvName string) (*v1.PersistentVolume, error)
	GetAPIPV(pvName string) (*v1.PersistentVolume, error)
	ListPVs(storageClassName string) []*v1.PersistentVolume
}

type pvAssumeCache struct {
	AssumeCache
}

func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
	if pv, ok := obj.(*v1.PersistentVolume); ok {
		return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil
	}
	return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
}

// NewPVAssumeCache creates a PV assume cache.
func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache {
	return &pvAssumeCache{NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)}
}

func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
	obj, err := c.Get(pvName)
	if err != nil {
		return nil, err
	}

	pv, ok := obj.(*v1.PersistentVolume)
	if !ok {
		return nil, &errWrongType{"v1.PersistentVolume", obj}
	}
	return pv, nil
}

func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
	obj, err := c.GetAPIObj(pvName)
	if err != nil {
		return nil, err
	}
	pv, ok := obj.(*v1.PersistentVolume)
	if !ok {
		return nil, &errWrongType{"v1.PersistentVolume", obj}
	}
	return pv, nil
}

func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
	objs := c.List(&v1.PersistentVolume{
		Spec: v1.PersistentVolumeSpec{
			StorageClassName: storageClassName,
		},
	})
	pvs := []*v1.PersistentVolume{}
	for _, obj := range objs {
		pv, ok := obj.(*v1.PersistentVolume)
		if !ok {
			klog.Errorf("ListPVs: %v", &errWrongType{"v1.PersistentVolume", obj})
			continue
		}
		pvs = append(pvs, pv)
	}
	return pvs
}

// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects
type PVCAssumeCache interface {
	AssumeCache

	// GetPVC returns the PVC from the cache with given pvcKey.
	// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
	GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
	GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}

type pvcAssumeCache struct {
	AssumeCache
}

// NewPVCAssumeCache creates a PVC assume cache.
func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache {
	return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "", nil)}
}

func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
	obj, err := c.Get(pvcKey)
	if err != nil {
		return nil, err
	}

	pvc, ok := obj.(*v1.PersistentVolumeClaim)
	if !ok {
		return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
	}
	return pvc, nil
}

func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
	obj, err := c.GetAPIObj(pvcKey)
	if err != nil {
		return nil, err
	}
	pvc, ok := obj.(*v1.PersistentVolumeClaim)
	if !ok {
		return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
	}
	return pvc, nil
}
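The assume cache deleted above is what lets the scheduler work against an in-memory ("assumed") copy of a PV or PVC and roll back with Restore() when binding fails; informer events only overwrite an assumed object when the incoming ResourceVersion is newer. A minimal sketch of that flow against a fake clientset (the object names, the ClaimRef value, and the use of the pre-move import path shown in this diff are illustrative only):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/kubernetes/pkg/controller/volume/scheduling" // pre-move path shown in this diff
)

func main() {
	// A fake clientset seeded with one PV, fed into a shared informer factory.
	client := fake.NewSimpleClientset(&v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{Name: "pv-demo", ResourceVersion: "1"},
	})
	factory := informers.NewSharedInformerFactory(client, 0)
	pvInformer := factory.Core().V1().PersistentVolumes().Informer()

	pvCache := scheduling.NewPVAssumeCache(pvInformer)

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	// Assume an in-memory copy (e.g. with a ClaimRef set), then roll it back.
	pv, err := pvCache.GetPV("pv-demo")
	if err != nil {
		fmt.Println("get:", err)
		return
	}
	assumed := pv.DeepCopy()
	assumed.Spec.ClaimRef = &v1.ObjectReference{Name: "claim-demo"}
	if err := pvCache.Assume(assumed); err != nil {
		fmt.Println("assume:", err)
		return
	}
	pvCache.Restore("pv-demo") // back to the informer's version
}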
@@ -1,473 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
	"fmt"
	"testing"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
)

func makePV(name, version, storageClass string) *v1.PersistentVolume {
	return &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name:            name,
			ResourceVersion: version,
		},
		Spec: v1.PersistentVolumeSpec{
			StorageClassName: storageClass,
		},
	}
}

func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
	pvList := cache.ListPVs(storageClassName)
	if len(pvList) != len(expectedPVs) {
		t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs))
	}
	for _, pv := range pvList {
		expectedPV, ok := expectedPVs[pv.Name]
		if !ok {
			t.Errorf("ListPVs() returned unexpected PV %q", pv.Name)
		}
		if expectedPV != pv {
			t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV)
		}
	}
}

func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
	pv, err := cache.GetPV(name)
	if err != nil {
		return err
	}
	if pv != expectedPV {
		return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV)
	}
	return nil
}

func TestAssumePV(t *testing.T) {
	scenarios := map[string]struct {
		oldPV         *v1.PersistentVolume
		newPV         *v1.PersistentVolume
		shouldSucceed bool
	}{
		"success-same-version": {
			oldPV:         makePV("pv1", "5", ""),
			newPV:         makePV("pv1", "5", ""),
			shouldSucceed: true,
		},
		"success-storageclass-same-version": {
			oldPV:         makePV("pv1", "5", "class1"),
			newPV:         makePV("pv1", "5", "class1"),
			shouldSucceed: true,
		},
		"success-new-higher-version": {
			oldPV:         makePV("pv1", "5", ""),
			newPV:         makePV("pv1", "6", ""),
			shouldSucceed: true,
		},
		"fail-old-not-found": {
			oldPV:         makePV("pv2", "5", ""),
			newPV:         makePV("pv1", "5", ""),
			shouldSucceed: false,
		},
		"fail-new-lower-version": {
			oldPV:         makePV("pv1", "5", ""),
			newPV:         makePV("pv1", "4", ""),
			shouldSucceed: false,
		},
		"fail-new-bad-version": {
			oldPV:         makePV("pv1", "5", ""),
			newPV:         makePV("pv1", "a", ""),
			shouldSucceed: false,
		},
		"fail-old-bad-version": {
			oldPV:         makePV("pv1", "a", ""),
			newPV:         makePV("pv1", "5", ""),
			shouldSucceed: false,
		},
	}

	for name, scenario := range scenarios {
		cache := NewPVAssumeCache(nil)
		internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
		if !ok {
			t.Fatalf("Failed to get internal cache")
		}

		// Add oldPV to cache
		internalCache.add(scenario.oldPV)
		if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
			t.Errorf("Failed to GetPV() after initial update: %v", err)
			continue
		}

		// Assume newPV
		err := cache.Assume(scenario.newPV)
		if scenario.shouldSucceed && err != nil {
			t.Errorf("Test %q failed: Assume() returned error %v", name, err)
		}
		if !scenario.shouldSucceed && err == nil {
			t.Errorf("Test %q failed: Assume() returned success but expected error", name)
		}

		// Check that GetPV returns correct PV
		expectedPV := scenario.newPV
		if !scenario.shouldSucceed {
			expectedPV = scenario.oldPV
		}
		if err := verifyPV(cache, scenario.oldPV.Name, expectedPV); err != nil {
			t.Errorf("Failed to GetPV() after initial update: %v", err)
		}
	}
}

func TestRestorePV(t *testing.T) {
	cache := NewPVAssumeCache(nil)
	internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
	if !ok {
		t.Fatalf("Failed to get internal cache")
	}

	oldPV := makePV("pv1", "5", "")
	newPV := makePV("pv1", "5", "")

	// Restore PV that doesn't exist
	cache.Restore("nothing")

	// Add oldPV to cache
	internalCache.add(oldPV)
	if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
		t.Fatalf("Failed to GetPV() after initial update: %v", err)
	}

	// Restore PV
	cache.Restore(oldPV.Name)
	if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
		t.Fatalf("Failed to GetPV() after initial restore: %v", err)
	}

	// Assume newPV
	if err := cache.Assume(newPV); err != nil {
		t.Fatalf("Assume() returned error %v", err)
	}
	if err := verifyPV(cache, oldPV.Name, newPV); err != nil {
		t.Fatalf("Failed to GetPV() after Assume: %v", err)
	}

	// Restore PV
	cache.Restore(oldPV.Name)
	if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
		t.Fatalf("Failed to GetPV() after restore: %v", err)
	}
}

func TestBasicPVCache(t *testing.T) {
	cache := NewPVAssumeCache(nil)
	internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
	if !ok {
		t.Fatalf("Failed to get internal cache")
	}

	// Get object that doesn't exist
	pv, err := cache.GetPV("nothere")
	if err == nil {
		t.Errorf("GetPV() returned unexpected success")
	}
	if pv != nil {
		t.Errorf("GetPV() returned unexpected PV %q", pv.Name)
	}

	// Add a bunch of PVs
	pvs := map[string]*v1.PersistentVolume{}
	for i := 0; i < 10; i++ {
		pv := makePV(fmt.Sprintf("test-pv%v", i), "1", "")
		pvs[pv.Name] = pv
		internalCache.add(pv)
	}

	// List them
	verifyListPVs(t, cache, pvs, "")

	// Update a PV
	updatedPV := makePV("test-pv3", "2", "")
	pvs[updatedPV.Name] = updatedPV
	internalCache.update(nil, updatedPV)

	// List them
	verifyListPVs(t, cache, pvs, "")

	// Delete a PV
	deletedPV := pvs["test-pv7"]
	delete(pvs, deletedPV.Name)
	internalCache.delete(deletedPV)

	// List them
	verifyListPVs(t, cache, pvs, "")
}

func TestPVCacheWithStorageClasses(t *testing.T) {
	cache := NewPVAssumeCache(nil)
	internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
	if !ok {
		t.Fatalf("Failed to get internal cache")
	}

	// Add a bunch of PVs
	pvs1 := map[string]*v1.PersistentVolume{}
	for i := 0; i < 10; i++ {
		pv := makePV(fmt.Sprintf("test-pv%v", i), "1", "class1")
		pvs1[pv.Name] = pv
		internalCache.add(pv)
	}

	// Add a bunch of PVs
	pvs2 := map[string]*v1.PersistentVolume{}
	for i := 0; i < 10; i++ {
		pv := makePV(fmt.Sprintf("test2-pv%v", i), "1", "class2")
		pvs2[pv.Name] = pv
		internalCache.add(pv)
	}

	// List them
	verifyListPVs(t, cache, pvs1, "class1")
	verifyListPVs(t, cache, pvs2, "class2")

	// Update a PV
	updatedPV := makePV("test-pv3", "2", "class1")
	pvs1[updatedPV.Name] = updatedPV
	internalCache.update(nil, updatedPV)

	// List them
	verifyListPVs(t, cache, pvs1, "class1")
	verifyListPVs(t, cache, pvs2, "class2")

	// Delete a PV
	deletedPV := pvs1["test-pv7"]
	delete(pvs1, deletedPV.Name)
	internalCache.delete(deletedPV)

	// List them
	verifyListPVs(t, cache, pvs1, "class1")
	verifyListPVs(t, cache, pvs2, "class2")
}

func TestAssumeUpdatePVCache(t *testing.T) {
	cache := NewPVAssumeCache(nil)
	internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
	if !ok {
		t.Fatalf("Failed to get internal cache")
	}

	pvName := "test-pv0"

	// Add a PV
	pv := makePV(pvName, "1", "")
	internalCache.add(pv)
	if err := verifyPV(cache, pvName, pv); err != nil {
		t.Fatalf("failed to get PV: %v", err)
	}

	// Assume PV
	newPV := pv.DeepCopy()
	newPV.Spec.ClaimRef = &v1.ObjectReference{Name: "test-claim"}
	if err := cache.Assume(newPV); err != nil {
		t.Fatalf("failed to assume PV: %v", err)
	}
	if err := verifyPV(cache, pvName, newPV); err != nil {
		t.Fatalf("failed to get PV after assume: %v", err)
	}

	// Add old PV
	internalCache.add(pv)
	if err := verifyPV(cache, pvName, newPV); err != nil {
		t.Fatalf("failed to get PV after old PV added: %v", err)
	}
}

func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim {
	return &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:            name,
			Namespace:       namespace,
			ResourceVersion: version,
			Annotations:     map[string]string{},
		},
	}
}

func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error {
	pvc, err := cache.GetPVC(pvcKey)
	if err != nil {
		return err
	}
	if pvc != expectedPVC {
		return fmt.Errorf("GetPVC() returned %p, expected %p", pvc, expectedPVC)
	}
	return nil
}

func TestAssumePVC(t *testing.T) {
	scenarios := map[string]struct {
		oldPVC        *v1.PersistentVolumeClaim
		newPVC        *v1.PersistentVolumeClaim
		shouldSucceed bool
	}{
		"success-same-version": {
			oldPVC:        makeClaim("pvc1", "5", "ns1"),
			newPVC:        makeClaim("pvc1", "5", "ns1"),
			shouldSucceed: true,
		},
		"success-new-higher-version": {
			oldPVC:        makeClaim("pvc1", "5", "ns1"),
			newPVC:        makeClaim("pvc1", "6", "ns1"),
			shouldSucceed: true,
		},
		"fail-old-not-found": {
			oldPVC:        makeClaim("pvc2", "5", "ns1"),
			newPVC:        makeClaim("pvc1", "5", "ns1"),
			shouldSucceed: false,
		},
		"fail-new-lower-version": {
			oldPVC:        makeClaim("pvc1", "5", "ns1"),
			newPVC:        makeClaim("pvc1", "4", "ns1"),
			shouldSucceed: false,
		},
		"fail-new-bad-version": {
			oldPVC:        makeClaim("pvc1", "5", "ns1"),
			newPVC:        makeClaim("pvc1", "a", "ns1"),
			shouldSucceed: false,
		},
		"fail-old-bad-version": {
			oldPVC:        makeClaim("pvc1", "a", "ns1"),
			newPVC:        makeClaim("pvc1", "5", "ns1"),
			shouldSucceed: false,
		},
	}

	for name, scenario := range scenarios {
		cache := NewPVCAssumeCache(nil)
		internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
		if !ok {
			t.Fatalf("Failed to get internal cache")
		}

		// Add oldPVC to cache
		internalCache.add(scenario.oldPVC)
		if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil {
			t.Errorf("Failed to GetPVC() after initial update: %v", err)
			continue
		}

		// Assume newPVC
		err := cache.Assume(scenario.newPVC)
		if scenario.shouldSucceed && err != nil {
			t.Errorf("Test %q failed: Assume() returned error %v", name, err)
		}
		if !scenario.shouldSucceed && err == nil {
			t.Errorf("Test %q failed: Assume() returned success but expected error", name)
		}

		// Check that GetPVC returns correct PVC
		expectedPV := scenario.newPVC
		if !scenario.shouldSucceed {
			expectedPV = scenario.oldPVC
		}
		if err := verifyPVC(cache, getPVCName(scenario.oldPVC), expectedPV); err != nil {
			t.Errorf("Failed to GetPVC() after initial update: %v", err)
		}
	}
}

func TestRestorePVC(t *testing.T) {
	cache := NewPVCAssumeCache(nil)
	internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
	if !ok {
		t.Fatalf("Failed to get internal cache")
	}

	oldPVC := makeClaim("pvc1", "5", "ns1")
	newPVC := makeClaim("pvc1", "5", "ns1")

	// Restore PVC that doesn't exist
	cache.Restore("nothing")

	// Add oldPVC to cache
	internalCache.add(oldPVC)
	if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
		t.Fatalf("Failed to GetPVC() after initial update: %v", err)
	}

	// Restore PVC
	cache.Restore(getPVCName(oldPVC))
	if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
		t.Fatalf("Failed to GetPVC() after initial restore: %v", err)
	}

	// Assume newPVC
	if err := cache.Assume(newPVC); err != nil {
		t.Fatalf("Assume() returned error %v", err)
	}
	if err := verifyPVC(cache, getPVCName(oldPVC), newPVC); err != nil {
		t.Fatalf("Failed to GetPVC() after Assume: %v", err)
	}

	// Restore PVC
	cache.Restore(getPVCName(oldPVC))
	if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
		t.Fatalf("Failed to GetPVC() after restore: %v", err)
	}
}

func TestAssumeUpdatePVCCache(t *testing.T) {
	cache := NewPVCAssumeCache(nil)
	internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
	if !ok {
		t.Fatalf("Failed to get internal cache")
	}

	pvcName := "test-pvc0"
	pvcNamespace := "test-ns"

	// Add a PVC
	pvc := makeClaim(pvcName, "1", pvcNamespace)
	internalCache.add(pvc)
	if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil {
		t.Fatalf("failed to get PVC: %v", err)
	}

	// Assume PVC
	newPVC := pvc.DeepCopy()
	newPVC.Annotations[pvutil.AnnSelectedNode] = "test-node"
	if err := cache.Assume(newPVC); err != nil {
		t.Fatalf("failed to assume PVC: %v", err)
	}
	if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
		t.Fatalf("failed to get PVC after assume: %v", err)
	}

	// Add old PVC
	internalCache.add(pvc)
	if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
		t.Fatalf("failed to get PVC after old PVC added: %v", err)
	}
}
File diff suppressed because it is too large
@@ -1,68 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import v1 "k8s.io/api/core/v1"

// FakeVolumeBinderConfig holds configurations for fake volume binder.
type FakeVolumeBinderConfig struct {
	AllBound    bool
	FindReasons ConflictReasons
	FindErr     error
	AssumeErr   error
	BindErr     error
}

// NewFakeVolumeBinder sets up all the caches needed for the scheduler to make
// topology-aware volume binding decisions.
func NewFakeVolumeBinder(config *FakeVolumeBinderConfig) *FakeVolumeBinder {
	return &FakeVolumeBinder{
		config: config,
	}
}

// FakeVolumeBinder represents a fake volume binder for testing.
type FakeVolumeBinder struct {
	config       *FakeVolumeBinderConfig
	AssumeCalled bool
	BindCalled   bool
}

// GetPodVolumes implements SchedulerVolumeBinder.GetPodVolumes.
func (b *FakeVolumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
	return nil, nil, nil, nil
}

// FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes.
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, _, _ []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
	return nil, b.config.FindReasons, b.config.FindErr
}

// AssumePodVolumes implements SchedulerVolumeBinder.AssumePodVolumes.
func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (bool, error) {
	b.AssumeCalled = true
	return b.config.AllBound, b.config.AssumeErr
}

// RevertAssumedPodVolumes implements SchedulerVolumeBinder.RevertAssumedPodVolumes
func (b *FakeVolumeBinder) RevertAssumedPodVolumes(_ *PodVolumes) {}

// BindPodVolumes implements SchedulerVolumeBinder.BindPodVolumes.
func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes) error {
	b.BindCalled = true
	return b.config.BindErr
}
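The fake binder deleted above lets scheduler tests script binder behaviour without building real PV/PVC fixtures. A minimal, hypothetical test sketch in the same package (the test name and the configured values are illustrative, not part of this commit):

package scheduling

import (
	"errors"
	"testing"

	v1 "k8s.io/api/core/v1"
)

func TestFakeVolumeBinderSketch(t *testing.T) {
	// Configure the fake to report "all volumes bound" but fail Assume.
	binder := NewFakeVolumeBinder(&FakeVolumeBinderConfig{
		AllBound:  true,
		AssumeErr: errors.New("assume failed"),
	})

	allBound, err := binder.AssumePodVolumes(&v1.Pod{}, "node-1", nil)
	if err == nil {
		t.Errorf("expected the configured AssumeErr, got nil")
	}
	if !allBound {
		t.Errorf("expected AllBound to be reported")
	}
	if !binder.AssumeCalled {
		t.Errorf("expected the fake to record that Assume was called")
	}
}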
File diff suppressed because it is too large