Merge UpdateProvisionedPVCs with UpdateBindings.

This simplifies code and saves a lock.
This commit is contained in:
Yecheng Fu 2019-01-16 13:25:24 +08:00
parent 3478647333
commit 7fe97886a8
4 changed files with 20 additions and 46 deletions

View File

@ -160,11 +160,8 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
if len(provisionedClaims) == 0 { if len(provisionedClaims) == 0 {
provisionedClaims = nil provisionedClaims = nil
} }
// TODO merge into one atomic function // Mark cache with all matched and provisioned claims for this node
// Mark cache with all the matches for each PVC for this node b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims, provisionedClaims)
b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
// Mark cache with all the PVCs that need provisioning for this node
b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)
}() }()
podName := getPodName(pod) podName := getPodName(pod)
@ -318,8 +315,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
// Update cache with the assumed pvcs and pvs // Update cache with the assumed pvcs and pvs
// Even if length is zero, update the cache with an empty slice to indicate that no // Even if length is zero, update the cache with an empty slice to indicate that no
// operations are needed // operations are needed
b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings) b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings, newProvisionedPVCs)
b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs)
return return
} }

View File

@ -28,18 +28,13 @@ import (
type PodBindingCache interface { type PodBindingCache interface {
// UpdateBindings will update the cache with the given bindings for the // UpdateBindings will update the cache with the given bindings for the
// pod and node. // pod and node.
UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim)
// GetBindings will return the cached bindings for the given pod and node. // GetBindings will return the cached bindings for the given pod and node.
// A nil return value means that the entry was not found. An empty slice // A nil return value means that the entry was not found. An empty slice
// means that no binding operations are needed. // means that no binding operations are needed.
GetBindings(pod *v1.Pod, node string) []*bindingInfo GetBindings(pod *v1.Pod, node string) []*bindingInfo
// UpdateProvisionedPVCs will update the cache with the given provisioning decisions
// for the pod and node.
UpdateProvisionedPVCs(pod *v1.Pod, node string, provisionings []*v1.PersistentVolumeClaim)
// GetProvisionedPVCs will return the cached provisioning decisions for the given pod and node.
// A nil return value means that the entry was not found. An empty slice // A nil return value means that the entry was not found. An empty slice
// means that no provisioning operations are needed. // means that no provisioning operations are needed.
GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim
@ -98,7 +93,7 @@ func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
} }
} }
func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo) { func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
c.rwMutex.Lock() c.rwMutex.Lock()
defer c.rwMutex.Unlock() defer c.rwMutex.Unlock()
@ -112,10 +107,12 @@ func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*b
if !ok { if !ok {
decision = nodeDecision{ decision = nodeDecision{
bindings: bindings, bindings: bindings,
provisionings: pvcs,
} }
VolumeBindingRequestSchedulerBinderCache.WithLabelValues("add").Inc() VolumeBindingRequestSchedulerBinderCache.WithLabelValues("add").Inc()
} else { } else {
decision.bindings = bindings decision.bindings = bindings
decision.provisionings = pvcs
} }
decisions[node] = decision decisions[node] = decision
} }
@ -136,27 +133,6 @@ func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo {
return decision.bindings return decision.bindings
} }
func (c *podBindingCache) UpdateProvisionedPVCs(pod *v1.Pod, node string, pvcs []*v1.PersistentVolumeClaim) {
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
if !ok {
decisions = nodeDecisions{}
c.bindingDecisions[podName] = decisions
}
decision, ok := decisions[node]
if !ok {
decision = nodeDecision{
provisionings: pvcs,
}
} else {
decision.provisionings = pvcs
}
decisions[node] = decision
}
func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim { func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim {
c.rwMutex.RLock() c.rwMutex.RLock()
defer c.rwMutex.RUnlock() defer c.rwMutex.RUnlock()

View File

@ -48,6 +48,14 @@ func TestUpdateGetBindings(t *testing.T) {
getPod: "pod1", getPod: "pod1",
getNode: "node2", getNode: "node2",
}, },
"binding-nil": {
updatePod: "pod1",
updateNode: "node1",
updateBindings: nil,
updateProvisionings: nil,
getPod: "pod1",
getNode: "node1",
},
"binding-exists": { "binding-exists": {
updatePod: "pod1", updatePod: "pod1",
updateNode: "node1", updateNode: "node1",
@ -65,8 +73,7 @@ func TestUpdateGetBindings(t *testing.T) {
// Perform updates // Perform updates
updatePod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: scenario.updatePod, Namespace: "ns"}} updatePod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: scenario.updatePod, Namespace: "ns"}}
cache.UpdateBindings(updatePod, scenario.updateNode, scenario.updateBindings) cache.UpdateBindings(updatePod, scenario.updateNode, scenario.updateBindings, scenario.updateProvisionings)
cache.UpdateProvisionedPVCs(updatePod, scenario.updateNode, scenario.updateProvisionings)
// Verify updated bindings // Verify updated bindings
bindings := cache.GetBindings(updatePod, scenario.updateNode) bindings := cache.GetBindings(updatePod, scenario.updateNode)
@ -116,8 +123,7 @@ func TestDeleteBindings(t *testing.T) {
cache.DeleteBindings(pod) cache.DeleteBindings(pod)
// Perform updates // Perform updates
cache.UpdateBindings(pod, "node1", initialBindings) cache.UpdateBindings(pod, "node1", initialBindings, initialProvisionings)
cache.UpdateProvisionedPVCs(pod, "node1", initialProvisionings)
// Get bindings and provisionings // Get bindings and provisionings
bindings = cache.GetBindings(pod, "node1") bindings = cache.GetBindings(pod, "node1")

View File

@ -328,8 +328,6 @@ func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod,
} }
} }
env.internalBinder.podBindingCache.UpdateBindings(pod, node, bindings)
pvcCache := env.internalBinder.pvcCache pvcCache := env.internalBinder.pvcCache
for _, pvc := range provisionings { for _, pvc := range provisionings {
if err := pvcCache.Assume(pvc); err != nil { if err := pvcCache.Assume(pvc); err != nil {
@ -337,14 +335,12 @@ func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod,
} }
} }
env.internalBinder.podBindingCache.UpdateProvisionedPVCs(pod, node, provisionings) env.internalBinder.podBindingCache.UpdateBindings(pod, node, bindings, provisionings)
} }
func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) { func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
cache := env.internalBinder.podBindingCache cache := env.internalBinder.podBindingCache
cache.UpdateBindings(pod, node, bindings) cache.UpdateBindings(pod, node, bindings, provisionings)
cache.UpdateProvisionedPVCs(pod, node, provisionings)
} }
func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo, expectedProvisionings []*v1.PersistentVolumeClaim) { func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo, expectedProvisionings []*v1.PersistentVolumeClaim) {