From 10800e68acbe99009f5522669eeac4abf38336c5 Mon Sep 17 00:00:00 2001 From: Michelle Au Date: Wed, 8 Nov 2017 13:09:48 -0800 Subject: [PATCH] Add assume cache for PVs --- .../scheduler_assume_cache.go | 318 ++++++++++++++++++ .../scheduler_assume_cache_test.go | 212 ++++++++++++ 2 files changed, 530 insertions(+) create mode 100644 pkg/controller/volume/persistentvolume/scheduler_assume_cache.go create mode 100644 pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go new file mode 100644 index 00000000000..28884004d7c --- /dev/null +++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go @@ -0,0 +1,318 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package persistentvolume + +import ( + "fmt" + "strconv" + "sync" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/tools/cache" +) + +// AssumeCache is a cache on top of the informer that allows for updating +// objects outside of informer events and also restoring the informer +// cache's version of the object. Objects are assumed to be +// Kubernetes API objects that implement meta.Interface +type AssumeCache interface { + // Assume updates the object in-memory only + Assume(obj interface{}) error + + // Restore the informer cache's version of the object + Restore(objName string) + + // Get the object by name + Get(objName string) (interface{}, error) + + // List all the objects in the cache + List() []interface{} +} + +type errWrongType struct { + typeName string + object interface{} +} + +func (e *errWrongType) Error() string { + return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object) +} + +type errNotFound struct { + typeName string + objectName string +} + +func (e *errNotFound) Error() string { + return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName) +} + +type errObjectName struct { + detailedErr error +} + +func (e *errObjectName) Error() string { + return fmt.Sprintf("failed to get object name: %v", e.detailedErr) +} + +// assumeCache stores two pointers to represent a single object: +// * The pointer to the informer object. +// * The pointer to the latest object, which could be the same as +// the informer object, or an in-memory object. +// +// An informer update always overrides the latest object pointer. +// +// Assume() only updates the latest object pointer. +// Restore() sets the latest object pointer back to the informer object. +// Get/List() always returns the latest object pointer. +type assumeCache struct { + mutex sync.Mutex + + // describes the object stored + description string + + // Stores objInfo pointers + store cache.Store +} + +type objInfo struct { + // name of the object + name string + + // Latest version of object could be cached-only or from informer + latestObj interface{} + + // Latest object from informer + apiObj interface{} +} + +func objInfoKeyFunc(obj interface{}) (string, error) { + objInfo, ok := obj.(*objInfo) + if !ok { + return "", &errWrongType{"objInfo", obj} + } + return objInfo.name, nil +} + +func NewAssumeCache(informer cache.SharedIndexInformer, description string) *assumeCache { + // TODO: index by storageclass + c := &assumeCache{store: cache.NewStore(objInfoKeyFunc), description: description} + + // Unit tests don't use informers + if informer != nil { + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.add, + UpdateFunc: c.update, + DeleteFunc: c.delete, + }, + ) + } + return c +} + +func (c *assumeCache) add(obj interface{}) { + if obj == nil { + return + } + + name, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + glog.Errorf("add failed: %v", &errObjectName{err}) + return + } + + c.mutex.Lock() + defer c.mutex.Unlock() + + objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj} + c.store.Update(objInfo) +} + +func (c *assumeCache) update(oldObj interface{}, newObj interface{}) { + c.add(newObj) +} + +func (c *assumeCache) delete(obj interface{}) { + if obj == nil { + return + } + + name, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + glog.Errorf("delete failed: %v", &errObjectName{err}) + return + } + + c.mutex.Lock() + defer c.mutex.Unlock() + + objInfo := &objInfo{name: name} + err = c.store.Delete(objInfo) + if err != nil { + glog.Errorf("delete: failed to delete %v %v: %v", c.description, name, err) + } +} + +func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) { + objAccessor, err := meta.Accessor(obj) + if err != nil { + return -1, err + } + + objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64) + if err != nil { + return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err) + } + return objResourceVersion, nil +} + +func (c *assumeCache) getObjInfo(name string) (*objInfo, error) { + obj, ok, err := c.store.GetByKey(name) + if err != nil { + return nil, err + } + if !ok { + return nil, &errNotFound{c.description, name} + } + + objInfo, ok := obj.(*objInfo) + if !ok { + return nil, &errWrongType{"objInfo", obj} + } + return objInfo, nil +} + +func (c *assumeCache) Get(objName string) (interface{}, error) { + c.mutex.Lock() + defer c.mutex.Unlock() + + objInfo, err := c.getObjInfo(objName) + if err != nil { + return nil, err + } + return objInfo.latestObj, nil +} + +func (c *assumeCache) List() []interface{} { + c.mutex.Lock() + defer c.mutex.Unlock() + + allObjs := []interface{}{} + for _, obj := range c.store.List() { + objInfo, ok := obj.(*objInfo) + if !ok { + glog.Errorf("list error: %v", &errWrongType{"objInfo", obj}) + continue + } + allObjs = append(allObjs, objInfo.latestObj) + } + return allObjs +} + +func (c *assumeCache) Assume(obj interface{}) error { + name, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + return &errObjectName{err} + } + + c.mutex.Lock() + defer c.mutex.Unlock() + + objInfo, err := c.getObjInfo(name) + if err != nil { + return err + } + + newVersion, err := c.getObjVersion(name, obj) + if err != nil { + return err + } + + storedVersion, err := c.getObjVersion(name, objInfo.latestObj) + if err != nil { + return err + } + + if newVersion < storedVersion { + return fmt.Errorf("%v %q is out of sync", c.description, name) + } + + // Only update the cached object + objInfo.latestObj = obj + glog.V(4).Infof("Assumed %v %q, version %v", c.description, name, newVersion) + return nil +} + +func (c *assumeCache) Restore(objName string) { + c.mutex.Lock() + defer c.mutex.Unlock() + + objInfo, err := c.getObjInfo(objName) + if err != nil { + // This could be expected if object got deleted + glog.V(5).Infof("Restore %v %q warning: %v", c.description, objName, err) + } else { + objInfo.latestObj = objInfo.apiObj + glog.V(4).Infof("Restored %v %q", c.description, objName) + } +} + +// PVAssumeCache is a AssumeCache for PersistentVolume objects +type PVAssumeCache interface { + AssumeCache + + GetPV(pvName string) (*v1.PersistentVolume, error) + ListPVs() []*v1.PersistentVolume +} + +type pvAssumeCache struct { + *assumeCache +} + +func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache { + return &pvAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolume")} +} + +func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { + obj, err := c.Get(pvName) + if err != nil { + return nil, err + } + + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + return nil, &errWrongType{"v1.PersistentVolume", obj} + } + return pv, nil +} + +func (c *pvAssumeCache) ListPVs() []*v1.PersistentVolume { + objs := c.List() + pvs := []*v1.PersistentVolume{} + for _, obj := range objs { + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + glog.Errorf("ListPVs: %v", &errWrongType{"v1.PersistentVolume", obj}) + } + pvs = append(pvs, pv) + } + return pvs +} diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go new file mode 100644 index 00000000000..7332c4d474a --- /dev/null +++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go @@ -0,0 +1,212 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package persistentvolume + +import ( + "fmt" + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func makePV(name, version string) *v1.PersistentVolume { + return &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: version}} +} + +func TestAssumePV(t *testing.T) { + scenarios := map[string]struct { + oldPV *v1.PersistentVolume + newPV *v1.PersistentVolume + shouldSucceed bool + }{ + "success-same-version": { + oldPV: makePV("pv1", "5"), + newPV: makePV("pv1", "5"), + shouldSucceed: true, + }, + "success-new-higher-version": { + oldPV: makePV("pv1", "5"), + newPV: makePV("pv1", "6"), + shouldSucceed: true, + }, + "fail-old-not-found": { + oldPV: makePV("pv2", "5"), + newPV: makePV("pv1", "5"), + shouldSucceed: false, + }, + "fail-new-lower-version": { + oldPV: makePV("pv1", "5"), + newPV: makePV("pv1", "4"), + shouldSucceed: false, + }, + "fail-new-bad-version": { + oldPV: makePV("pv1", "5"), + newPV: makePV("pv1", "a"), + shouldSucceed: false, + }, + "fail-old-bad-version": { + oldPV: makePV("pv1", "a"), + newPV: makePV("pv1", "5"), + shouldSucceed: false, + }, + } + + for name, scenario := range scenarios { + cache := NewPVAssumeCache(nil) + internal_cache, ok := cache.(*pvAssumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + // Add oldPV to cache + internal_cache.add(scenario.oldPV) + if err := getPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil { + t.Errorf("Failed to GetPV() after initial update: %v", err) + continue + } + + // Assume newPV + err := cache.Assume(scenario.newPV) + if scenario.shouldSucceed && err != nil { + t.Errorf("Test %q failed: Assume() returned error %v", name, err) + } + if !scenario.shouldSucceed && err == nil { + t.Errorf("Test %q failed: Assume() returned success but expected error", name) + } + + // Check that GetPV returns correct PV + expectedPV := scenario.newPV + if !scenario.shouldSucceed { + expectedPV = scenario.oldPV + } + if err := getPV(cache, scenario.oldPV.Name, expectedPV); err != nil { + t.Errorf("Failed to GetPV() after initial update: %v", err) + } + } +} + +func TestRestorePV(t *testing.T) { + cache := NewPVAssumeCache(nil) + internal_cache, ok := cache.(*pvAssumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + oldPV := makePV("pv1", "5") + newPV := makePV("pv1", "5") + + // Restore PV that doesn't exist + cache.Restore("nothing") + + // Add oldPV to cache + internal_cache.add(oldPV) + if err := getPV(cache, oldPV.Name, oldPV); err != nil { + t.Fatalf("Failed to GetPV() after initial update: %v", err) + } + + // Restore PV + cache.Restore(oldPV.Name) + if err := getPV(cache, oldPV.Name, oldPV); err != nil { + t.Fatalf("Failed to GetPV() after iniital restore: %v", err) + } + + // Assume newPV + if err := cache.Assume(newPV); err != nil { + t.Fatalf("Assume() returned error %v", err) + } + if err := getPV(cache, oldPV.Name, newPV); err != nil { + t.Fatalf("Failed to GetPV() after Assume: %v", err) + } + + // Restore PV + cache.Restore(oldPV.Name) + if err := getPV(cache, oldPV.Name, oldPV); err != nil { + t.Fatalf("Failed to GetPV() after restore: %v", err) + } +} + +func TestBasicPVCache(t *testing.T) { + cache := NewPVAssumeCache(nil) + internal_cache, ok := cache.(*pvAssumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + // Get object that doesn't exist + pv, err := cache.GetPV("nothere") + if err == nil { + t.Errorf("GetPV() returned unexpected success") + } + if pv != nil { + t.Errorf("GetPV() returned unexpected PV %q", pv.Name) + } + + // Add a bunch of PVs + pvs := map[string]*v1.PersistentVolume{} + for i := 0; i < 10; i++ { + pv := makePV(fmt.Sprintf("test-pv%v", i), "1") + pvs[pv.Name] = pv + internal_cache.add(pv) + } + + // List them + verifyListPVs(t, cache, pvs) + + // Update a PV + updatedPV := makePV("test-pv3", "2") + pvs[updatedPV.Name] = updatedPV + internal_cache.update(nil, updatedPV) + + // List them + verifyListPVs(t, cache, pvs) + + // Delete a PV + deletedPV := pvs["test-pv7"] + delete(pvs, deletedPV.Name) + internal_cache.delete(deletedPV) + + // List them + verifyListPVs(t, cache, pvs) +} + +func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume) { + pvList := cache.ListPVs() + if len(pvList) != len(expectedPVs) { + t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs)) + } + for _, pv := range pvList { + expectedPV, ok := expectedPVs[pv.Name] + if !ok { + t.Errorf("ListPVs() returned unexpected PV %q", pv.Name) + } + if expectedPV != pv { + t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV) + } + } +} + +func getPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { + pv, err := cache.GetPV(name) + if err != nil { + return err + } + if pv != expectedPV { + return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV) + } + return nil +}