mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Add assume cache for PVs
This commit is contained in:
parent
58823a75a4
commit
10800e68ac
318
pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
Normal file
318
pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
Normal file
@ -0,0 +1,318 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package persistentvolume
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// AssumeCache is a cache on top of the informer that allows for updating
|
||||
// objects outside of informer events and also restoring the informer
|
||||
// cache's version of the object. Objects are assumed to be
|
||||
// Kubernetes API objects that implement meta.Interface
|
||||
type AssumeCache interface {
|
||||
// Assume updates the object in-memory only
|
||||
Assume(obj interface{}) error
|
||||
|
||||
// Restore the informer cache's version of the object
|
||||
Restore(objName string)
|
||||
|
||||
// Get the object by name
|
||||
Get(objName string) (interface{}, error)
|
||||
|
||||
// List all the objects in the cache
|
||||
List() []interface{}
|
||||
}
|
||||
|
||||
type errWrongType struct {
|
||||
typeName string
|
||||
object interface{}
|
||||
}
|
||||
|
||||
func (e *errWrongType) Error() string {
|
||||
return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object)
|
||||
}
|
||||
|
||||
type errNotFound struct {
|
||||
typeName string
|
||||
objectName string
|
||||
}
|
||||
|
||||
func (e *errNotFound) Error() string {
|
||||
return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName)
|
||||
}
|
||||
|
||||
type errObjectName struct {
|
||||
detailedErr error
|
||||
}
|
||||
|
||||
func (e *errObjectName) Error() string {
|
||||
return fmt.Sprintf("failed to get object name: %v", e.detailedErr)
|
||||
}
|
||||
|
||||
// assumeCache stores two pointers to represent a single object:
|
||||
// * The pointer to the informer object.
|
||||
// * The pointer to the latest object, which could be the same as
|
||||
// the informer object, or an in-memory object.
|
||||
//
|
||||
// An informer update always overrides the latest object pointer.
|
||||
//
|
||||
// Assume() only updates the latest object pointer.
|
||||
// Restore() sets the latest object pointer back to the informer object.
|
||||
// Get/List() always returns the latest object pointer.
|
||||
type assumeCache struct {
|
||||
mutex sync.Mutex
|
||||
|
||||
// describes the object stored
|
||||
description string
|
||||
|
||||
// Stores objInfo pointers
|
||||
store cache.Store
|
||||
}
|
||||
|
||||
type objInfo struct {
|
||||
// name of the object
|
||||
name string
|
||||
|
||||
// Latest version of object could be cached-only or from informer
|
||||
latestObj interface{}
|
||||
|
||||
// Latest object from informer
|
||||
apiObj interface{}
|
||||
}
|
||||
|
||||
func objInfoKeyFunc(obj interface{}) (string, error) {
|
||||
objInfo, ok := obj.(*objInfo)
|
||||
if !ok {
|
||||
return "", &errWrongType{"objInfo", obj}
|
||||
}
|
||||
return objInfo.name, nil
|
||||
}
|
||||
|
||||
func NewAssumeCache(informer cache.SharedIndexInformer, description string) *assumeCache {
|
||||
// TODO: index by storageclass
|
||||
c := &assumeCache{store: cache.NewStore(objInfoKeyFunc), description: description}
|
||||
|
||||
// Unit tests don't use informers
|
||||
if informer != nil {
|
||||
informer.AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.add,
|
||||
UpdateFunc: c.update,
|
||||
DeleteFunc: c.delete,
|
||||
},
|
||||
)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *assumeCache) add(obj interface{}) {
|
||||
if obj == nil {
|
||||
return
|
||||
}
|
||||
|
||||
name, err := cache.MetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("add failed: %v", &errObjectName{err})
|
||||
return
|
||||
}
|
||||
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
|
||||
c.store.Update(objInfo)
|
||||
}
|
||||
|
||||
func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
|
||||
c.add(newObj)
|
||||
}
|
||||
|
||||
func (c *assumeCache) delete(obj interface{}) {
|
||||
if obj == nil {
|
||||
return
|
||||
}
|
||||
|
||||
name, err := cache.MetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("delete failed: %v", &errObjectName{err})
|
||||
return
|
||||
}
|
||||
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
objInfo := &objInfo{name: name}
|
||||
err = c.store.Delete(objInfo)
|
||||
if err != nil {
|
||||
glog.Errorf("delete: failed to delete %v %v: %v", c.description, name, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
|
||||
objAccessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
|
||||
if err != nil {
|
||||
return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
|
||||
}
|
||||
return objResourceVersion, nil
|
||||
}
|
||||
|
||||
func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
|
||||
obj, ok, err := c.store.GetByKey(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
return nil, &errNotFound{c.description, name}
|
||||
}
|
||||
|
||||
objInfo, ok := obj.(*objInfo)
|
||||
if !ok {
|
||||
return nil, &errWrongType{"objInfo", obj}
|
||||
}
|
||||
return objInfo, nil
|
||||
}
|
||||
|
||||
func (c *assumeCache) Get(objName string) (interface{}, error) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
objInfo, err := c.getObjInfo(objName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return objInfo.latestObj, nil
|
||||
}
|
||||
|
||||
func (c *assumeCache) List() []interface{} {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
allObjs := []interface{}{}
|
||||
for _, obj := range c.store.List() {
|
||||
objInfo, ok := obj.(*objInfo)
|
||||
if !ok {
|
||||
glog.Errorf("list error: %v", &errWrongType{"objInfo", obj})
|
||||
continue
|
||||
}
|
||||
allObjs = append(allObjs, objInfo.latestObj)
|
||||
}
|
||||
return allObjs
|
||||
}
|
||||
|
||||
func (c *assumeCache) Assume(obj interface{}) error {
|
||||
name, err := cache.MetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
return &errObjectName{err}
|
||||
}
|
||||
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
objInfo, err := c.getObjInfo(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newVersion, err := c.getObjVersion(name, obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if newVersion < storedVersion {
|
||||
return fmt.Errorf("%v %q is out of sync", c.description, name)
|
||||
}
|
||||
|
||||
// Only update the cached object
|
||||
objInfo.latestObj = obj
|
||||
glog.V(4).Infof("Assumed %v %q, version %v", c.description, name, newVersion)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *assumeCache) Restore(objName string) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
objInfo, err := c.getObjInfo(objName)
|
||||
if err != nil {
|
||||
// This could be expected if object got deleted
|
||||
glog.V(5).Infof("Restore %v %q warning: %v", c.description, objName, err)
|
||||
} else {
|
||||
objInfo.latestObj = objInfo.apiObj
|
||||
glog.V(4).Infof("Restored %v %q", c.description, objName)
|
||||
}
|
||||
}
|
||||
|
||||
// PVAssumeCache is a AssumeCache for PersistentVolume objects
|
||||
type PVAssumeCache interface {
|
||||
AssumeCache
|
||||
|
||||
GetPV(pvName string) (*v1.PersistentVolume, error)
|
||||
ListPVs() []*v1.PersistentVolume
|
||||
}
|
||||
|
||||
type pvAssumeCache struct {
|
||||
*assumeCache
|
||||
}
|
||||
|
||||
func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache {
|
||||
return &pvAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolume")}
|
||||
}
|
||||
|
||||
func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
|
||||
obj, err := c.Get(pvName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pv, ok := obj.(*v1.PersistentVolume)
|
||||
if !ok {
|
||||
return nil, &errWrongType{"v1.PersistentVolume", obj}
|
||||
}
|
||||
return pv, nil
|
||||
}
|
||||
|
||||
func (c *pvAssumeCache) ListPVs() []*v1.PersistentVolume {
|
||||
objs := c.List()
|
||||
pvs := []*v1.PersistentVolume{}
|
||||
for _, obj := range objs {
|
||||
pv, ok := obj.(*v1.PersistentVolume)
|
||||
if !ok {
|
||||
glog.Errorf("ListPVs: %v", &errWrongType{"v1.PersistentVolume", obj})
|
||||
}
|
||||
pvs = append(pvs, pv)
|
||||
}
|
||||
return pvs
|
||||
}
|
@ -0,0 +1,212 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package persistentvolume
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func makePV(name, version string) *v1.PersistentVolume {
|
||||
return &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: version}}
|
||||
}
|
||||
|
||||
func TestAssumePV(t *testing.T) {
|
||||
scenarios := map[string]struct {
|
||||
oldPV *v1.PersistentVolume
|
||||
newPV *v1.PersistentVolume
|
||||
shouldSucceed bool
|
||||
}{
|
||||
"success-same-version": {
|
||||
oldPV: makePV("pv1", "5"),
|
||||
newPV: makePV("pv1", "5"),
|
||||
shouldSucceed: true,
|
||||
},
|
||||
"success-new-higher-version": {
|
||||
oldPV: makePV("pv1", "5"),
|
||||
newPV: makePV("pv1", "6"),
|
||||
shouldSucceed: true,
|
||||
},
|
||||
"fail-old-not-found": {
|
||||
oldPV: makePV("pv2", "5"),
|
||||
newPV: makePV("pv1", "5"),
|
||||
shouldSucceed: false,
|
||||
},
|
||||
"fail-new-lower-version": {
|
||||
oldPV: makePV("pv1", "5"),
|
||||
newPV: makePV("pv1", "4"),
|
||||
shouldSucceed: false,
|
||||
},
|
||||
"fail-new-bad-version": {
|
||||
oldPV: makePV("pv1", "5"),
|
||||
newPV: makePV("pv1", "a"),
|
||||
shouldSucceed: false,
|
||||
},
|
||||
"fail-old-bad-version": {
|
||||
oldPV: makePV("pv1", "a"),
|
||||
newPV: makePV("pv1", "5"),
|
||||
shouldSucceed: false,
|
||||
},
|
||||
}
|
||||
|
||||
for name, scenario := range scenarios {
|
||||
cache := NewPVAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvAssumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
||||
// Add oldPV to cache
|
||||
internal_cache.add(scenario.oldPV)
|
||||
if err := getPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
|
||||
t.Errorf("Failed to GetPV() after initial update: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Assume newPV
|
||||
err := cache.Assume(scenario.newPV)
|
||||
if scenario.shouldSucceed && err != nil {
|
||||
t.Errorf("Test %q failed: Assume() returned error %v", name, err)
|
||||
}
|
||||
if !scenario.shouldSucceed && err == nil {
|
||||
t.Errorf("Test %q failed: Assume() returned success but expected error", name)
|
||||
}
|
||||
|
||||
// Check that GetPV returns correct PV
|
||||
expectedPV := scenario.newPV
|
||||
if !scenario.shouldSucceed {
|
||||
expectedPV = scenario.oldPV
|
||||
}
|
||||
if err := getPV(cache, scenario.oldPV.Name, expectedPV); err != nil {
|
||||
t.Errorf("Failed to GetPV() after initial update: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestorePV(t *testing.T) {
|
||||
cache := NewPVAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvAssumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
||||
oldPV := makePV("pv1", "5")
|
||||
newPV := makePV("pv1", "5")
|
||||
|
||||
// Restore PV that doesn't exist
|
||||
cache.Restore("nothing")
|
||||
|
||||
// Add oldPV to cache
|
||||
internal_cache.add(oldPV)
|
||||
if err := getPV(cache, oldPV.Name, oldPV); err != nil {
|
||||
t.Fatalf("Failed to GetPV() after initial update: %v", err)
|
||||
}
|
||||
|
||||
// Restore PV
|
||||
cache.Restore(oldPV.Name)
|
||||
if err := getPV(cache, oldPV.Name, oldPV); err != nil {
|
||||
t.Fatalf("Failed to GetPV() after iniital restore: %v", err)
|
||||
}
|
||||
|
||||
// Assume newPV
|
||||
if err := cache.Assume(newPV); err != nil {
|
||||
t.Fatalf("Assume() returned error %v", err)
|
||||
}
|
||||
if err := getPV(cache, oldPV.Name, newPV); err != nil {
|
||||
t.Fatalf("Failed to GetPV() after Assume: %v", err)
|
||||
}
|
||||
|
||||
// Restore PV
|
||||
cache.Restore(oldPV.Name)
|
||||
if err := getPV(cache, oldPV.Name, oldPV); err != nil {
|
||||
t.Fatalf("Failed to GetPV() after restore: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBasicPVCache(t *testing.T) {
|
||||
cache := NewPVAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvAssumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
||||
// Get object that doesn't exist
|
||||
pv, err := cache.GetPV("nothere")
|
||||
if err == nil {
|
||||
t.Errorf("GetPV() returned unexpected success")
|
||||
}
|
||||
if pv != nil {
|
||||
t.Errorf("GetPV() returned unexpected PV %q", pv.Name)
|
||||
}
|
||||
|
||||
// Add a bunch of PVs
|
||||
pvs := map[string]*v1.PersistentVolume{}
|
||||
for i := 0; i < 10; i++ {
|
||||
pv := makePV(fmt.Sprintf("test-pv%v", i), "1")
|
||||
pvs[pv.Name] = pv
|
||||
internal_cache.add(pv)
|
||||
}
|
||||
|
||||
// List them
|
||||
verifyListPVs(t, cache, pvs)
|
||||
|
||||
// Update a PV
|
||||
updatedPV := makePV("test-pv3", "2")
|
||||
pvs[updatedPV.Name] = updatedPV
|
||||
internal_cache.update(nil, updatedPV)
|
||||
|
||||
// List them
|
||||
verifyListPVs(t, cache, pvs)
|
||||
|
||||
// Delete a PV
|
||||
deletedPV := pvs["test-pv7"]
|
||||
delete(pvs, deletedPV.Name)
|
||||
internal_cache.delete(deletedPV)
|
||||
|
||||
// List them
|
||||
verifyListPVs(t, cache, pvs)
|
||||
}
|
||||
|
||||
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume) {
|
||||
pvList := cache.ListPVs()
|
||||
if len(pvList) != len(expectedPVs) {
|
||||
t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs))
|
||||
}
|
||||
for _, pv := range pvList {
|
||||
expectedPV, ok := expectedPVs[pv.Name]
|
||||
if !ok {
|
||||
t.Errorf("ListPVs() returned unexpected PV %q", pv.Name)
|
||||
}
|
||||
if expectedPV != pv {
|
||||
t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
|
||||
pv, err := cache.GetPV(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if pv != expectedPV {
|
||||
return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV)
|
||||
}
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user