mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #72953 from cofyc/fix72045-performance
Improve FindPodVolumes performance
This commit is contained in:
commit
bec0c72f5a
@ -142,31 +142,19 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
|
|||||||
return b.podBindingCache
|
return b.podBindingCache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func podHasClaims(pod *v1.Pod) bool {
|
||||||
|
for _, vol := range pod.Spec.Volumes {
|
||||||
|
if vol.PersistentVolumeClaim != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
|
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
|
||||||
// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
|
// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
|
||||||
// That's necessary because some operations will need to pass in to the predicate fake node objects.
|
// That's necessary because some operations will need to pass in to the predicate fake node objects.
|
||||||
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
|
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
|
||||||
var (
|
|
||||||
matchedClaims []*bindingInfo
|
|
||||||
provisionedClaims []*v1.PersistentVolumeClaim
|
|
||||||
)
|
|
||||||
defer func() {
|
|
||||||
// We recreate bindings for each new schedule loop.
|
|
||||||
// Although we do not distinguish nil from empty in this function, for
|
|
||||||
// easier testing, we normalize empty to nil.
|
|
||||||
if len(matchedClaims) == 0 {
|
|
||||||
matchedClaims = nil
|
|
||||||
}
|
|
||||||
if len(provisionedClaims) == 0 {
|
|
||||||
provisionedClaims = nil
|
|
||||||
}
|
|
||||||
// TODO merge into one atomic function
|
|
||||||
// Mark cache with all the matches for each PVC for this node
|
|
||||||
b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
|
|
||||||
// Mark cache with all the PVCs that need provisioning for this node
|
|
||||||
b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)
|
|
||||||
}()
|
|
||||||
|
|
||||||
podName := getPodName(pod)
|
podName := getPodName(pod)
|
||||||
|
|
||||||
// Warning: Below log needs high verbosity as it can be printed several times (#60933).
|
// Warning: Below log needs high verbosity as it can be printed several times (#60933).
|
||||||
@ -183,6 +171,34 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if !podHasClaims(pod) {
|
||||||
|
// Fast path
|
||||||
|
return unboundVolumesSatisfied, boundVolumesSatisfied, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
matchedClaims []*bindingInfo
|
||||||
|
provisionedClaims []*v1.PersistentVolumeClaim
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
// We recreate bindings for each new schedule loop.
|
||||||
|
if len(matchedClaims) == 0 && len(provisionedClaims) == 0 {
|
||||||
|
// Clear cache if no claims to bind or provision for this node.
|
||||||
|
b.podBindingCache.ClearBindings(pod, node.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Although we do not distinguish nil from empty in this function, for
|
||||||
|
// easier testing, we normalize empty to nil.
|
||||||
|
if len(matchedClaims) == 0 {
|
||||||
|
matchedClaims = nil
|
||||||
|
}
|
||||||
|
if len(provisionedClaims) == 0 {
|
||||||
|
provisionedClaims = nil
|
||||||
|
}
|
||||||
|
// Mark cache with all matched and provisioned claims for this node
|
||||||
|
b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims, provisionedClaims)
|
||||||
|
}()
|
||||||
|
|
||||||
// The pod's volumes need to be processed in one call to avoid the race condition where
|
// The pod's volumes need to be processed in one call to avoid the race condition where
|
||||||
// volumes can get bound/provisioned in between calls.
|
// volumes can get bound/provisioned in between calls.
|
||||||
boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
|
boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
|
||||||
@ -318,8 +334,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
|
|||||||
// Update cache with the assumed pvcs and pvs
|
// Update cache with the assumed pvcs and pvs
|
||||||
// Even if length is zero, update the cache with an empty slice to indicate that no
|
// Even if length is zero, update the cache with an empty slice to indicate that no
|
||||||
// operations are needed
|
// operations are needed
|
||||||
b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
|
b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings, newProvisionedPVCs)
|
||||||
b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs)
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -28,18 +28,16 @@ import (
|
|||||||
type PodBindingCache interface {
|
type PodBindingCache interface {
|
||||||
// UpdateBindings will update the cache with the given bindings for the
|
// UpdateBindings will update the cache with the given bindings for the
|
||||||
// pod and node.
|
// pod and node.
|
||||||
UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo)
|
UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim)
|
||||||
|
|
||||||
|
// ClearBindings will clear the cached bindings for the given pod and node.
|
||||||
|
ClearBindings(pod *v1.Pod, node string)
|
||||||
|
|
||||||
// GetBindings will return the cached bindings for the given pod and node.
|
// GetBindings will return the cached bindings for the given pod and node.
|
||||||
// A nil return value means that the entry was not found. An empty slice
|
// A nil return value means that the entry was not found. An empty slice
|
||||||
// means that no binding operations are needed.
|
// means that no binding operations are needed.
|
||||||
GetBindings(pod *v1.Pod, node string) []*bindingInfo
|
GetBindings(pod *v1.Pod, node string) []*bindingInfo
|
||||||
|
|
||||||
// UpdateProvisionedPVCs will update the cache with the given provisioning decisions
|
|
||||||
// for the pod and node.
|
|
||||||
UpdateProvisionedPVCs(pod *v1.Pod, node string, provisionings []*v1.PersistentVolumeClaim)
|
|
||||||
|
|
||||||
// GetProvisionedPVCs will return the cached provisioning decisions for the given pod and node.
|
|
||||||
// A nil return value means that the entry was not found. An empty slice
|
// A nil return value means that the entry was not found. An empty slice
|
||||||
// means that no provisioning operations are needed.
|
// means that no provisioning operations are needed.
|
||||||
GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim
|
GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim
|
||||||
@ -98,7 +96,7 @@ func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo) {
|
func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
|
||||||
c.rwMutex.Lock()
|
c.rwMutex.Lock()
|
||||||
defer c.rwMutex.Unlock()
|
defer c.rwMutex.Unlock()
|
||||||
|
|
||||||
@ -111,11 +109,13 @@ func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*b
|
|||||||
decision, ok := decisions[node]
|
decision, ok := decisions[node]
|
||||||
if !ok {
|
if !ok {
|
||||||
decision = nodeDecision{
|
decision = nodeDecision{
|
||||||
bindings: bindings,
|
bindings: bindings,
|
||||||
|
provisionings: pvcs,
|
||||||
}
|
}
|
||||||
VolumeBindingRequestSchedulerBinderCache.WithLabelValues("add").Inc()
|
VolumeBindingRequestSchedulerBinderCache.WithLabelValues("add").Inc()
|
||||||
} else {
|
} else {
|
||||||
decision.bindings = bindings
|
decision.bindings = bindings
|
||||||
|
decision.provisionings = pvcs
|
||||||
}
|
}
|
||||||
decisions[node] = decision
|
decisions[node] = decision
|
||||||
}
|
}
|
||||||
@ -136,27 +136,6 @@ func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo {
|
|||||||
return decision.bindings
|
return decision.bindings
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *podBindingCache) UpdateProvisionedPVCs(pod *v1.Pod, node string, pvcs []*v1.PersistentVolumeClaim) {
|
|
||||||
c.rwMutex.Lock()
|
|
||||||
defer c.rwMutex.Unlock()
|
|
||||||
|
|
||||||
podName := getPodName(pod)
|
|
||||||
decisions, ok := c.bindingDecisions[podName]
|
|
||||||
if !ok {
|
|
||||||
decisions = nodeDecisions{}
|
|
||||||
c.bindingDecisions[podName] = decisions
|
|
||||||
}
|
|
||||||
decision, ok := decisions[node]
|
|
||||||
if !ok {
|
|
||||||
decision = nodeDecision{
|
|
||||||
provisionings: pvcs,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
decision.provisionings = pvcs
|
|
||||||
}
|
|
||||||
decisions[node] = decision
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim {
|
func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim {
|
||||||
c.rwMutex.RLock()
|
c.rwMutex.RLock()
|
||||||
defer c.rwMutex.RUnlock()
|
defer c.rwMutex.RUnlock()
|
||||||
@ -172,3 +151,15 @@ func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.Per
|
|||||||
}
|
}
|
||||||
return decision.provisionings
|
return decision.provisionings
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *podBindingCache) ClearBindings(pod *v1.Pod, node string) {
|
||||||
|
c.rwMutex.Lock()
|
||||||
|
defer c.rwMutex.Unlock()
|
||||||
|
|
||||||
|
podName := getPodName(pod)
|
||||||
|
decisions, ok := c.bindingDecisions[podName]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
delete(decisions, node)
|
||||||
|
}
|
||||||
|
@ -48,6 +48,14 @@ func TestUpdateGetBindings(t *testing.T) {
|
|||||||
getPod: "pod1",
|
getPod: "pod1",
|
||||||
getNode: "node2",
|
getNode: "node2",
|
||||||
},
|
},
|
||||||
|
"binding-nil": {
|
||||||
|
updatePod: "pod1",
|
||||||
|
updateNode: "node1",
|
||||||
|
updateBindings: nil,
|
||||||
|
updateProvisionings: nil,
|
||||||
|
getPod: "pod1",
|
||||||
|
getNode: "node1",
|
||||||
|
},
|
||||||
"binding-exists": {
|
"binding-exists": {
|
||||||
updatePod: "pod1",
|
updatePod: "pod1",
|
||||||
updateNode: "node1",
|
updateNode: "node1",
|
||||||
@ -65,8 +73,7 @@ func TestUpdateGetBindings(t *testing.T) {
|
|||||||
|
|
||||||
// Perform updates
|
// Perform updates
|
||||||
updatePod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: scenario.updatePod, Namespace: "ns"}}
|
updatePod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: scenario.updatePod, Namespace: "ns"}}
|
||||||
cache.UpdateBindings(updatePod, scenario.updateNode, scenario.updateBindings)
|
cache.UpdateBindings(updatePod, scenario.updateNode, scenario.updateBindings, scenario.updateProvisionings)
|
||||||
cache.UpdateProvisionedPVCs(updatePod, scenario.updateNode, scenario.updateProvisionings)
|
|
||||||
|
|
||||||
// Verify updated bindings
|
// Verify updated bindings
|
||||||
bindings := cache.GetBindings(updatePod, scenario.updateNode)
|
bindings := cache.GetBindings(updatePod, scenario.updateNode)
|
||||||
@ -116,8 +123,7 @@ func TestDeleteBindings(t *testing.T) {
|
|||||||
cache.DeleteBindings(pod)
|
cache.DeleteBindings(pod)
|
||||||
|
|
||||||
// Perform updates
|
// Perform updates
|
||||||
cache.UpdateBindings(pod, "node1", initialBindings)
|
cache.UpdateBindings(pod, "node1", initialBindings, initialProvisionings)
|
||||||
cache.UpdateProvisionedPVCs(pod, "node1", initialProvisionings)
|
|
||||||
|
|
||||||
// Get bindings and provisionings
|
// Get bindings and provisionings
|
||||||
bindings = cache.GetBindings(pod, "node1")
|
bindings = cache.GetBindings(pod, "node1")
|
||||||
|
@ -328,8 +328,6 @@ func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
env.internalBinder.podBindingCache.UpdateBindings(pod, node, bindings)
|
|
||||||
|
|
||||||
pvcCache := env.internalBinder.pvcCache
|
pvcCache := env.internalBinder.pvcCache
|
||||||
for _, pvc := range provisionings {
|
for _, pvc := range provisionings {
|
||||||
if err := pvcCache.Assume(pvc); err != nil {
|
if err := pvcCache.Assume(pvc); err != nil {
|
||||||
@ -337,14 +335,12 @@ func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
env.internalBinder.podBindingCache.UpdateProvisionedPVCs(pod, node, provisionings)
|
env.internalBinder.podBindingCache.UpdateBindings(pod, node, bindings, provisionings)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
|
func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
|
||||||
cache := env.internalBinder.podBindingCache
|
cache := env.internalBinder.podBindingCache
|
||||||
cache.UpdateBindings(pod, node, bindings)
|
cache.UpdateBindings(pod, node, bindings, provisionings)
|
||||||
|
|
||||||
cache.UpdateProvisionedPVCs(pod, node, provisionings)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo, expectedProvisionings []*v1.PersistentVolumeClaim) {
|
func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo, expectedProvisionings []*v1.PersistentVolumeClaim) {
|
||||||
|
Loading…
Reference in New Issue
Block a user