cache update for dynamic provisioning

This commit is contained in:
lichuqiang 2018-04-09 11:14:59 +08:00
parent b5cd7d81bd
commit 91d403f384
4 changed files with 345 additions and 51 deletions

View File

@ -371,3 +371,34 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume
}
return pvs
}
// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects
type PVCAssumeCache interface {
AssumeCache
// GetPVC returns the PVC from the cache with the same
// namespace and the same name of the specified pod.
// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}
type pvcAssumeCache struct {
*assumeCache
}
func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache {
return &pvcAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolumeClaim", "namespace", cache.MetaNamespaceIndexFunc)}
}
func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.Get(pvcKey)
if err != nil {
return nil, err
}
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
}
return pvc, nil
}

View File

@ -36,6 +36,33 @@ func makePV(name, version, storageClass string) *v1.PersistentVolume {
}
}
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
pvList := cache.ListPVs(storageClassName)
if len(pvList) != len(expectedPVs) {
t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs))
}
for _, pv := range pvList {
expectedPV, ok := expectedPVs[pv.Name]
if !ok {
t.Errorf("ListPVs() returned unexpected PV %q", pv.Name)
}
if expectedPV != pv {
t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV)
}
}
}
func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
pv, err := cache.GetPV(name)
if err != nil {
return err
}
if pv != expectedPV {
return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV)
}
return nil
}
func TestAssumePV(t *testing.T) {
scenarios := map[string]struct {
oldPV *v1.PersistentVolume
@ -276,29 +303,170 @@ func TestAssumeUpdatePVCache(t *testing.T) {
}
}
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
pvList := cache.ListPVs(storageClassName)
if len(pvList) != len(expectedPVs) {
t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs))
func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
ResourceVersion: version,
Annotations: map[string]string{},
},
}
for _, pv := range pvList {
expectedPV, ok := expectedPVs[pv.Name]
}
func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error {
pvc, err := cache.GetPVC(pvcKey)
if err != nil {
return err
}
if pvc != expectedPVC {
return fmt.Errorf("GetPVC() returned %p, expected %p", pvc, expectedPVC)
}
return nil
}
func TestAssumePVC(t *testing.T) {
scenarios := map[string]struct {
oldPVC *v1.PersistentVolumeClaim
newPVC *v1.PersistentVolumeClaim
shouldSucceed bool
}{
"success-same-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "5", "ns1"),
shouldSucceed: true,
},
"success-new-higher-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "6", "ns1"),
shouldSucceed: true,
},
"fail-old-not-found": {
oldPVC: makeClaim("pvc2", "5", "ns1"),
newPVC: makeClaim("pvc1", "5", "ns1"),
shouldSucceed: false,
},
"fail-new-lower-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "4", "ns1"),
shouldSucceed: false,
},
"fail-new-bad-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "a", "ns1"),
shouldSucceed: false,
},
"fail-old-bad-version": {
oldPVC: makeClaim("pvc1", "a", "ns1"),
newPVC: makeClaim("pvc1", "5", "ns1"),
shouldSucceed: false,
},
}
for name, scenario := range scenarios {
cache := NewPVCAssumeCache(nil)
internal_cache, ok := cache.(*pvcAssumeCache)
if !ok {
t.Errorf("ListPVs() returned unexpected PV %q", pv.Name)
t.Fatalf("Failed to get internal cache")
}
if expectedPV != pv {
t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV)
// Add oldPVC to cache
internal_cache.add(scenario.oldPVC)
if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil {
t.Errorf("Failed to GetPVC() after initial update: %v", err)
continue
}
// Assume newPVC
err := cache.Assume(scenario.newPVC)
if scenario.shouldSucceed && err != nil {
t.Errorf("Test %q failed: Assume() returned error %v", name, err)
}
if !scenario.shouldSucceed && err == nil {
t.Errorf("Test %q failed: Assume() returned success but expected error", name)
}
// Check that GetPVC returns correct PVC
expectedPV := scenario.newPVC
if !scenario.shouldSucceed {
expectedPV = scenario.oldPVC
}
if err := verifyPVC(cache, getPVCName(scenario.oldPVC), expectedPV); err != nil {
t.Errorf("Failed to GetPVC() after initial update: %v", err)
}
}
}
func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
pv, err := cache.GetPV(name)
if err != nil {
return err
func TestRestorePVC(t *testing.T) {
cache := NewPVCAssumeCache(nil)
internal_cache, ok := cache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
if pv != expectedPV {
return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV)
oldPVC := makeClaim("pvc1", "5", "ns1")
newPVC := makeClaim("pvc1", "5", "ns1")
// Restore PVC that doesn't exist
cache.Restore("nothing")
// Add oldPVC to cache
internal_cache.add(oldPVC)
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after initial update: %v", err)
}
// Restore PVC
cache.Restore(getPVCName(oldPVC))
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after iniital restore: %v", err)
}
// Assume newPVC
if err := cache.Assume(newPVC); err != nil {
t.Fatalf("Assume() returned error %v", err)
}
if err := verifyPVC(cache, getPVCName(oldPVC), newPVC); err != nil {
t.Fatalf("Failed to GetPVC() after Assume: %v", err)
}
// Restore PVC
cache.Restore(getPVCName(oldPVC))
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after restore: %v", err)
}
}
func TestAssumeUpdatePVCCache(t *testing.T) {
cache := NewPVCAssumeCache(nil)
internal_cache, ok := cache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
pvcName := "test-pvc0"
pvcNamespace := "test-ns"
// Add a PVC
pvc := makeClaim(pvcName, "1", pvcNamespace)
internal_cache.add(pvc)
if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil {
t.Fatalf("failed to get PVC: %v", err)
}
// Assume PVC
newPVC := pvc.DeepCopy()
newPVC.Annotations["volume.alpha.kubernetes.io/selected-node"] = "test-node"
if err := cache.Assume(newPVC); err != nil {
t.Fatalf("failed to assume PVC: %v", err)
}
if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
t.Fatalf("failed to get PVC after assume: %v", err)
}
// Add old PVC
internal_cache.add(pvc)
if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
t.Fatalf("failed to get PVC after old PVC added: %v", err)
}
return nil
}

View File

@ -30,27 +30,41 @@ type PodBindingCache interface {
// pod and node.
UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo)
// DeleteBindings will remove all cached bindings for the given pod.
DeleteBindings(pod *v1.Pod)
// GetBindings will return the cached bindings for the given pod and node.
GetBindings(pod *v1.Pod, node string) []*bindingInfo
// UpdateProvisionedPVCs will update the cache with the given provisioning decisions
// for the pod and node.
UpdateProvisionedPVCs(pod *v1.Pod, node string, provisionings []*v1.PersistentVolumeClaim)
// GetProvisionedPVCs will return the cached provisioning decisions for the given pod and node.
GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim
// DeleteBindings will remove all cached bindings and provisionings for the given pod.
// TODO: separate the func if it is needed to delete bindings/provisionings individually
DeleteBindings(pod *v1.Pod)
}
type podBindingCache struct {
mutex sync.Mutex
// Key = pod name
// Value = nodeBindings
bindings map[string]nodeBindings
// Value = nodeDecisions
bindingDecisions map[string]nodeDecisions
}
// Key = nodeName
// Value = array of bindingInfo
type nodeBindings map[string][]*bindingInfo
// Value = bindings & provisioned PVCs of the node
type nodeDecisions map[string]nodeDecision
// A decision includes bindingInfo and provisioned PVCs of the node
type nodeDecision struct {
bindings []*bindingInfo
provisionings []*v1.PersistentVolumeClaim
}
func NewPodBindingCache() PodBindingCache {
return &podBindingCache{bindings: map[string]nodeBindings{}}
return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}}
}
func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
@ -58,7 +72,7 @@ func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
defer c.mutex.Unlock()
podName := getPodName(pod)
delete(c.bindings, podName)
delete(c.bindingDecisions, podName)
}
func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo) {
@ -66,12 +80,20 @@ func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*b
defer c.mutex.Unlock()
podName := getPodName(pod)
nodeBinding, ok := c.bindings[podName]
decisions, ok := c.bindingDecisions[podName]
if !ok {
nodeBinding = nodeBindings{}
c.bindings[podName] = nodeBinding
decisions = nodeDecisions{}
c.bindingDecisions[podName] = decisions
}
nodeBinding[node] = bindings
decision, ok := decisions[node]
if !ok {
decision = nodeDecision{
bindings: bindings,
}
} else {
decision.bindings = bindings
}
decisions[node] = decision
}
func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo {
@ -79,9 +101,50 @@ func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo {
defer c.mutex.Unlock()
podName := getPodName(pod)
nodeBindings, ok := c.bindings[podName]
decisions, ok := c.bindingDecisions[podName]
if !ok {
return nil
}
return nodeBindings[node]
decision, ok := decisions[node]
if !ok {
return nil
}
return decision.bindings
}
func (c *podBindingCache) UpdateProvisionedPVCs(pod *v1.Pod, node string, pvcs []*v1.PersistentVolumeClaim) {
c.mutex.Lock()
defer c.mutex.Unlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
if !ok {
decisions = nodeDecisions{}
c.bindingDecisions[podName] = decisions
}
decision, ok := decisions[node]
if !ok {
decision = nodeDecision{
provisionings: pvcs,
}
} else {
decision.provisionings = pvcs
}
decisions[node] = decision
}
func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim {
c.mutex.Lock()
defer c.mutex.Unlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
if !ok {
return nil
}
decision, ok := decisions[node]
if !ok {
return nil
}
return decision.provisionings
}

View File

@ -26,32 +26,37 @@ import (
func TestUpdateGetBindings(t *testing.T) {
scenarios := map[string]struct {
updateBindings []*bindingInfo
updatePod string
updateNode string
updateBindings []*bindingInfo
updateProvisionings []*v1.PersistentVolumeClaim
updatePod string
updateNode string
getBindings []*bindingInfo
getPod string
getNode string
getBindings []*bindingInfo
getProvisionings []*v1.PersistentVolumeClaim
getPod string
getNode string
}{
"no-pod": {
getPod: "pod1",
getNode: "node1",
},
"no-node": {
updatePod: "pod1",
updateNode: "node1",
updateBindings: []*bindingInfo{},
getPod: "pod1",
getNode: "node2",
updatePod: "pod1",
updateNode: "node1",
updateBindings: []*bindingInfo{},
updateProvisionings: []*v1.PersistentVolumeClaim{},
getPod: "pod1",
getNode: "node2",
},
"binding-exists": {
updatePod: "pod1",
updateNode: "node1",
updateBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}},
getPod: "pod1",
getNode: "node1",
getBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}},
updatePod: "pod1",
updateNode: "node1",
updateBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}},
updateProvisionings: []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "pvc2"}}},
getPod: "pod1",
getNode: "node1",
getBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}},
getProvisionings: []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "pvc2"}}},
},
}
@ -61,6 +66,7 @@ func TestUpdateGetBindings(t *testing.T) {
// Perform updates
updatePod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: scenario.updatePod, Namespace: "ns"}}
cache.UpdateBindings(updatePod, scenario.updateNode, scenario.updateBindings)
cache.UpdateProvisionedPVCs(updatePod, scenario.updateNode, scenario.updateProvisionings)
// Verify updated bindings
bindings := cache.GetBindings(updatePod, scenario.updateNode)
@ -68,45 +74,71 @@ func TestUpdateGetBindings(t *testing.T) {
t.Errorf("Test %v failed: returned bindings after update different. Got %+v, expected %+v", name, bindings, scenario.updateBindings)
}
// Verify updated provisionings
provisionings := cache.GetProvisionedPVCs(updatePod, scenario.updateNode)
if !reflect.DeepEqual(provisionings, scenario.updateProvisionings) {
t.Errorf("Test %v failed: returned provisionings after update different. Got %+v, expected %+v", name, provisionings, scenario.updateProvisionings)
}
// Get bindings
getPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: scenario.getPod, Namespace: "ns"}}
bindings = cache.GetBindings(getPod, scenario.getNode)
if !reflect.DeepEqual(bindings, scenario.getBindings) {
t.Errorf("Test %v failed: unexpected bindings returned. Got %+v, expected %+v", name, bindings, scenario.updateBindings)
}
// Get provisionings
provisionings = cache.GetProvisionedPVCs(getPod, scenario.getNode)
if !reflect.DeepEqual(provisionings, scenario.getProvisionings) {
t.Errorf("Test %v failed: unexpected bindings returned. Got %+v, expected %+v", name, provisionings, scenario.getProvisionings)
}
}
}
func TestDeleteBindings(t *testing.T) {
initialBindings := []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}}
initialProvisionings := []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "pvc2"}}}
cache := NewPodBindingCache()
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "ns"}}
// Get nil bindings
// Get nil bindings and provisionings
bindings := cache.GetBindings(pod, "node1")
if bindings != nil {
t.Errorf("Test failed: expected initial nil bindings, got %+v", bindings)
}
provisionings := cache.GetProvisionedPVCs(pod, "node1")
if provisionings != nil {
t.Errorf("Test failed: expected initial nil provisionings, got %+v", provisionings)
}
// Delete nothing
cache.DeleteBindings(pod)
// Perform updates
cache.UpdateBindings(pod, "node1", initialBindings)
cache.UpdateProvisionedPVCs(pod, "node1", initialProvisionings)
// Get bindings
// Get bindings and provisionings
bindings = cache.GetBindings(pod, "node1")
if !reflect.DeepEqual(bindings, initialBindings) {
t.Errorf("Test failed: expected bindings %+v, got %+v", initialBindings, bindings)
}
provisionings = cache.GetProvisionedPVCs(pod, "node1")
if !reflect.DeepEqual(provisionings, initialProvisionings) {
t.Errorf("Test failed: expected provisionings %+v, got %+v", initialProvisionings, provisionings)
}
// Delete
cache.DeleteBindings(pod)
// Get bindings
// Get bindings and provisionings
bindings = cache.GetBindings(pod, "node1")
if bindings != nil {
t.Errorf("Test failed: expected nil bindings, got %+v", bindings)
}
provisionings = cache.GetProvisionedPVCs(pod, "node1")
if provisionings != nil {
t.Errorf("Test failed: expected nil provisionings, got %+v", provisionings)
}
}