From 91d403f3843e34668ca0f31c5aa569376f1a64bd Mon Sep 17 00:00:00 2001 From: lichuqiang Date: Mon, 9 Apr 2018 11:14:59 +0800 Subject: [PATCH] cache update for dynamic provisioning --- .../scheduler_assume_cache.go | 31 +++ .../scheduler_assume_cache_test.go | 200 ++++++++++++++++-- .../scheduler_binder_cache.go | 93 ++++++-- .../scheduler_binder_cache_test.go | 72 +++++-- 4 files changed, 345 insertions(+), 51 deletions(-) diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go index 3b2352beac2..b04b402bff4 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go +++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go @@ -371,3 +371,34 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume } return pvs } + +// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects +type PVCAssumeCache interface { + AssumeCache + + // GetPVC returns the PVC from the cache with the same + // namespace and the same name of the specified pod. + // pvcKey is the result of MetaNamespaceKeyFunc on PVC obj + GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) +} + +type pvcAssumeCache struct { + *assumeCache +} + +func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache { + return &pvcAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolumeClaim", "namespace", cache.MetaNamespaceIndexFunc)} +} + +func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { + obj, err := c.Get(pvcKey) + if err != nil { + return nil, err + } + + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} + } + return pvc, nil +} diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go index 467daffe747..c6c0f1f0ccd 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go +++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go @@ -36,6 +36,33 @@ func makePV(name, version, storageClass string) *v1.PersistentVolume { } } +func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { + pvList := cache.ListPVs(storageClassName) + if len(pvList) != len(expectedPVs) { + t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs)) + } + for _, pv := range pvList { + expectedPV, ok := expectedPVs[pv.Name] + if !ok { + t.Errorf("ListPVs() returned unexpected PV %q", pv.Name) + } + if expectedPV != pv { + t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV) + } + } +} + +func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { + pv, err := cache.GetPV(name) + if err != nil { + return err + } + if pv != expectedPV { + return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV) + } + return nil +} + func TestAssumePV(t *testing.T) { scenarios := map[string]struct { oldPV *v1.PersistentVolume @@ -276,29 +303,170 @@ func TestAssumeUpdatePVCache(t *testing.T) { } } -func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { - pvList := cache.ListPVs(storageClassName) - if len(pvList) != len(expectedPVs) { - t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs)) +func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim { + return &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: version, + Annotations: map[string]string{}, + }, } - for _, pv := range pvList { - expectedPV, ok := expectedPVs[pv.Name] +} + +func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error { + pvc, err := cache.GetPVC(pvcKey) + if err != nil { + return err + } + if pvc != expectedPVC { + return fmt.Errorf("GetPVC() returned %p, expected %p", pvc, expectedPVC) + } + return nil +} + +func TestAssumePVC(t *testing.T) { + scenarios := map[string]struct { + oldPVC *v1.PersistentVolumeClaim + newPVC *v1.PersistentVolumeClaim + shouldSucceed bool + }{ + "success-same-version": { + oldPVC: makeClaim("pvc1", "5", "ns1"), + newPVC: makeClaim("pvc1", "5", "ns1"), + shouldSucceed: true, + }, + "success-new-higher-version": { + oldPVC: makeClaim("pvc1", "5", "ns1"), + newPVC: makeClaim("pvc1", "6", "ns1"), + shouldSucceed: true, + }, + "fail-old-not-found": { + oldPVC: makeClaim("pvc2", "5", "ns1"), + newPVC: makeClaim("pvc1", "5", "ns1"), + shouldSucceed: false, + }, + "fail-new-lower-version": { + oldPVC: makeClaim("pvc1", "5", "ns1"), + newPVC: makeClaim("pvc1", "4", "ns1"), + shouldSucceed: false, + }, + "fail-new-bad-version": { + oldPVC: makeClaim("pvc1", "5", "ns1"), + newPVC: makeClaim("pvc1", "a", "ns1"), + shouldSucceed: false, + }, + "fail-old-bad-version": { + oldPVC: makeClaim("pvc1", "a", "ns1"), + newPVC: makeClaim("pvc1", "5", "ns1"), + shouldSucceed: false, + }, + } + + for name, scenario := range scenarios { + cache := NewPVCAssumeCache(nil) + internal_cache, ok := cache.(*pvcAssumeCache) if !ok { - t.Errorf("ListPVs() returned unexpected PV %q", pv.Name) + t.Fatalf("Failed to get internal cache") } - if expectedPV != pv { - t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV) + + // Add oldPVC to cache + internal_cache.add(scenario.oldPVC) + if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil { + t.Errorf("Failed to GetPVC() after initial update: %v", err) + continue + } + + // Assume newPVC + err := cache.Assume(scenario.newPVC) + if scenario.shouldSucceed && err != nil { + t.Errorf("Test %q failed: Assume() returned error %v", name, err) + } + if !scenario.shouldSucceed && err == nil { + t.Errorf("Test %q failed: Assume() returned success but expected error", name) + } + + // Check that GetPVC returns correct PVC + expectedPV := scenario.newPVC + if !scenario.shouldSucceed { + expectedPV = scenario.oldPVC + } + if err := verifyPVC(cache, getPVCName(scenario.oldPVC), expectedPV); err != nil { + t.Errorf("Failed to GetPVC() after initial update: %v", err) } } } -func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { - pv, err := cache.GetPV(name) - if err != nil { - return err +func TestRestorePVC(t *testing.T) { + cache := NewPVCAssumeCache(nil) + internal_cache, ok := cache.(*pvcAssumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") } - if pv != expectedPV { - return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV) + + oldPVC := makeClaim("pvc1", "5", "ns1") + newPVC := makeClaim("pvc1", "5", "ns1") + + // Restore PVC that doesn't exist + cache.Restore("nothing") + + // Add oldPVC to cache + internal_cache.add(oldPVC) + if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { + t.Fatalf("Failed to GetPVC() after initial update: %v", err) + } + + // Restore PVC + cache.Restore(getPVCName(oldPVC)) + if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { + t.Fatalf("Failed to GetPVC() after iniital restore: %v", err) + } + + // Assume newPVC + if err := cache.Assume(newPVC); err != nil { + t.Fatalf("Assume() returned error %v", err) + } + if err := verifyPVC(cache, getPVCName(oldPVC), newPVC); err != nil { + t.Fatalf("Failed to GetPVC() after Assume: %v", err) + } + + // Restore PVC + cache.Restore(getPVCName(oldPVC)) + if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { + t.Fatalf("Failed to GetPVC() after restore: %v", err) + } +} + +func TestAssumeUpdatePVCCache(t *testing.T) { + cache := NewPVCAssumeCache(nil) + internal_cache, ok := cache.(*pvcAssumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + pvcName := "test-pvc0" + pvcNamespace := "test-ns" + + // Add a PVC + pvc := makeClaim(pvcName, "1", pvcNamespace) + internal_cache.add(pvc) + if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil { + t.Fatalf("failed to get PVC: %v", err) + } + + // Assume PVC + newPVC := pvc.DeepCopy() + newPVC.Annotations["volume.alpha.kubernetes.io/selected-node"] = "test-node" + if err := cache.Assume(newPVC); err != nil { + t.Fatalf("failed to assume PVC: %v", err) + } + if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { + t.Fatalf("failed to get PVC after assume: %v", err) + } + + // Add old PVC + internal_cache.add(pvc) + if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { + t.Fatalf("failed to get PVC after old PVC added: %v", err) } - return nil } diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go b/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go index 8a0a7796085..c523011795f 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go @@ -30,27 +30,41 @@ type PodBindingCache interface { // pod and node. UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo) - // DeleteBindings will remove all cached bindings for the given pod. - DeleteBindings(pod *v1.Pod) - // GetBindings will return the cached bindings for the given pod and node. GetBindings(pod *v1.Pod, node string) []*bindingInfo + + // UpdateProvisionedPVCs will update the cache with the given provisioning decisions + // for the pod and node. + UpdateProvisionedPVCs(pod *v1.Pod, node string, provisionings []*v1.PersistentVolumeClaim) + + // GetProvisionedPVCs will return the cached provisioning decisions for the given pod and node. + GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim + + // DeleteBindings will remove all cached bindings and provisionings for the given pod. + // TODO: separate the func if it is needed to delete bindings/provisionings individually + DeleteBindings(pod *v1.Pod) } type podBindingCache struct { mutex sync.Mutex // Key = pod name - // Value = nodeBindings - bindings map[string]nodeBindings + // Value = nodeDecisions + bindingDecisions map[string]nodeDecisions } // Key = nodeName -// Value = array of bindingInfo -type nodeBindings map[string][]*bindingInfo +// Value = bindings & provisioned PVCs of the node +type nodeDecisions map[string]nodeDecision + +// A decision includes bindingInfo and provisioned PVCs of the node +type nodeDecision struct { + bindings []*bindingInfo + provisionings []*v1.PersistentVolumeClaim +} func NewPodBindingCache() PodBindingCache { - return &podBindingCache{bindings: map[string]nodeBindings{}} + return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}} } func (c *podBindingCache) DeleteBindings(pod *v1.Pod) { @@ -58,7 +72,7 @@ func (c *podBindingCache) DeleteBindings(pod *v1.Pod) { defer c.mutex.Unlock() podName := getPodName(pod) - delete(c.bindings, podName) + delete(c.bindingDecisions, podName) } func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo) { @@ -66,12 +80,20 @@ func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*b defer c.mutex.Unlock() podName := getPodName(pod) - nodeBinding, ok := c.bindings[podName] + decisions, ok := c.bindingDecisions[podName] if !ok { - nodeBinding = nodeBindings{} - c.bindings[podName] = nodeBinding + decisions = nodeDecisions{} + c.bindingDecisions[podName] = decisions } - nodeBinding[node] = bindings + decision, ok := decisions[node] + if !ok { + decision = nodeDecision{ + bindings: bindings, + } + } else { + decision.bindings = bindings + } + decisions[node] = decision } func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo { @@ -79,9 +101,50 @@ func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo { defer c.mutex.Unlock() podName := getPodName(pod) - nodeBindings, ok := c.bindings[podName] + decisions, ok := c.bindingDecisions[podName] if !ok { return nil } - return nodeBindings[node] + decision, ok := decisions[node] + if !ok { + return nil + } + return decision.bindings +} + +func (c *podBindingCache) UpdateProvisionedPVCs(pod *v1.Pod, node string, pvcs []*v1.PersistentVolumeClaim) { + c.mutex.Lock() + defer c.mutex.Unlock() + + podName := getPodName(pod) + decisions, ok := c.bindingDecisions[podName] + if !ok { + decisions = nodeDecisions{} + c.bindingDecisions[podName] = decisions + } + decision, ok := decisions[node] + if !ok { + decision = nodeDecision{ + provisionings: pvcs, + } + } else { + decision.provisionings = pvcs + } + decisions[node] = decision +} + +func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim { + c.mutex.Lock() + defer c.mutex.Unlock() + + podName := getPodName(pod) + decisions, ok := c.bindingDecisions[podName] + if !ok { + return nil + } + decision, ok := decisions[node] + if !ok { + return nil + } + return decision.provisionings } diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_cache_test.go b/pkg/controller/volume/persistentvolume/scheduler_binder_cache_test.go index d39d823c26d..65086274cc3 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_cache_test.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder_cache_test.go @@ -26,32 +26,37 @@ import ( func TestUpdateGetBindings(t *testing.T) { scenarios := map[string]struct { - updateBindings []*bindingInfo - updatePod string - updateNode string + updateBindings []*bindingInfo + updateProvisionings []*v1.PersistentVolumeClaim + updatePod string + updateNode string - getBindings []*bindingInfo - getPod string - getNode string + getBindings []*bindingInfo + getProvisionings []*v1.PersistentVolumeClaim + getPod string + getNode string }{ "no-pod": { getPod: "pod1", getNode: "node1", }, "no-node": { - updatePod: "pod1", - updateNode: "node1", - updateBindings: []*bindingInfo{}, - getPod: "pod1", - getNode: "node2", + updatePod: "pod1", + updateNode: "node1", + updateBindings: []*bindingInfo{}, + updateProvisionings: []*v1.PersistentVolumeClaim{}, + getPod: "pod1", + getNode: "node2", }, "binding-exists": { - updatePod: "pod1", - updateNode: "node1", - updateBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}}, - getPod: "pod1", - getNode: "node1", - getBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}}, + updatePod: "pod1", + updateNode: "node1", + updateBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}}, + updateProvisionings: []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "pvc2"}}}, + getPod: "pod1", + getNode: "node1", + getBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}}, + getProvisionings: []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "pvc2"}}}, }, } @@ -61,6 +66,7 @@ func TestUpdateGetBindings(t *testing.T) { // Perform updates updatePod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: scenario.updatePod, Namespace: "ns"}} cache.UpdateBindings(updatePod, scenario.updateNode, scenario.updateBindings) + cache.UpdateProvisionedPVCs(updatePod, scenario.updateNode, scenario.updateProvisionings) // Verify updated bindings bindings := cache.GetBindings(updatePod, scenario.updateNode) @@ -68,45 +74,71 @@ func TestUpdateGetBindings(t *testing.T) { t.Errorf("Test %v failed: returned bindings after update different. Got %+v, expected %+v", name, bindings, scenario.updateBindings) } + // Verify updated provisionings + provisionings := cache.GetProvisionedPVCs(updatePod, scenario.updateNode) + if !reflect.DeepEqual(provisionings, scenario.updateProvisionings) { + t.Errorf("Test %v failed: returned provisionings after update different. Got %+v, expected %+v", name, provisionings, scenario.updateProvisionings) + } + // Get bindings getPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: scenario.getPod, Namespace: "ns"}} bindings = cache.GetBindings(getPod, scenario.getNode) if !reflect.DeepEqual(bindings, scenario.getBindings) { t.Errorf("Test %v failed: unexpected bindings returned. Got %+v, expected %+v", name, bindings, scenario.updateBindings) } + + // Get provisionings + provisionings = cache.GetProvisionedPVCs(getPod, scenario.getNode) + if !reflect.DeepEqual(provisionings, scenario.getProvisionings) { + t.Errorf("Test %v failed: unexpected bindings returned. Got %+v, expected %+v", name, provisionings, scenario.getProvisionings) + } } } func TestDeleteBindings(t *testing.T) { initialBindings := []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}} + initialProvisionings := []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "pvc2"}}} cache := NewPodBindingCache() pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "ns"}} - // Get nil bindings + // Get nil bindings and provisionings bindings := cache.GetBindings(pod, "node1") if bindings != nil { t.Errorf("Test failed: expected initial nil bindings, got %+v", bindings) } + provisionings := cache.GetProvisionedPVCs(pod, "node1") + if provisionings != nil { + t.Errorf("Test failed: expected initial nil provisionings, got %+v", provisionings) + } // Delete nothing cache.DeleteBindings(pod) // Perform updates cache.UpdateBindings(pod, "node1", initialBindings) + cache.UpdateProvisionedPVCs(pod, "node1", initialProvisionings) - // Get bindings + // Get bindings and provisionings bindings = cache.GetBindings(pod, "node1") if !reflect.DeepEqual(bindings, initialBindings) { t.Errorf("Test failed: expected bindings %+v, got %+v", initialBindings, bindings) } + provisionings = cache.GetProvisionedPVCs(pod, "node1") + if !reflect.DeepEqual(provisionings, initialProvisionings) { + t.Errorf("Test failed: expected provisionings %+v, got %+v", initialProvisionings, provisionings) + } // Delete cache.DeleteBindings(pod) - // Get bindings + // Get bindings and provisionings bindings = cache.GetBindings(pod, "node1") if bindings != nil { t.Errorf("Test failed: expected nil bindings, got %+v", bindings) } + provisionings = cache.GetProvisionedPVCs(pod, "node1") + if provisionings != nil { + t.Errorf("Test failed: expected nil provisionings, got %+v", provisionings) + } }