From d4465b995e2fa30f618663e72017ecfac614ef3e Mon Sep 17 00:00:00 2001 From: tanjing2020 Date: Tue, 23 Mar 2021 10:18:50 +0800 Subject: [PATCH] Scheduler: skip updates of assumed pods --- pkg/scheduler/eventhandlers.go | 62 +---- pkg/scheduler/eventhandlers_test.go | 216 ------------------ .../internal/queue/scheduling_queue.go | 2 + pkg/scheduler/scheduler.go | 12 +- 4 files changed, 16 insertions(+), 276 deletions(-) diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index f90efd2969b..42a02cbe905 100755 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -174,9 +174,15 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { if oldPod.ResourceVersion == newPod.ResourceVersion { return } - if sched.skipPodUpdate(newPod) { + + isAssumed, err := sched.SchedulerCache.IsAssumedPod(newPod) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err)) + } + if isAssumed { return } + if err := sched.SchedulingQueue.Update(oldPod, newPod); err != nil { utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) } @@ -299,60 +305,6 @@ func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool { return profiles.HandlesSchedulerName(pod.Spec.SchedulerName) } -// skipPodUpdate checks whether the specified pod update should be ignored. -// This function will return true if -// - The pod has already been assumed, AND -// - The pod has only its ResourceVersion, Spec.NodeName, Annotations, -// ManagedFields, Finalizers and/or Conditions updated. -func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool { - // Non-assumed pods should never be skipped. 
- isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err)) - return false - } - if !isAssumed { - return false - } - - // Gets the assumed pod from the cache. - assumedPod, err := sched.SchedulerCache.GetPod(pod) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err)) - return false - } - - // Compares the assumed pod in the cache with the pod update. If they are - // equal (with certain fields excluded), this pod update will be skipped. - f := func(pod *v1.Pod) *v1.Pod { - p := pod.DeepCopy() - // ResourceVersion must be excluded because each object update will - // have a new resource version. - p.ResourceVersion = "" - // Spec.NodeName must be excluded because the pod assumed in the cache - // is expected to have a node assigned while the pod update may nor may - // not have this field set. - p.Spec.NodeName = "" - // Annotations must be excluded for the reasons described in - // https://github.com/kubernetes/kubernetes/issues/52914. - p.Annotations = nil - // Same as above, when annotations are modified with ServerSideApply, - // ManagedFields may also change and must be excluded - p.ManagedFields = nil - // The following might be changed by external controllers, but they don't - // affect scheduling decisions. - p.Finalizers = nil - p.Status.Conditions = nil - return p - } - assumedPodCopy, podCopy := f(assumedPod), f(pod) - if !reflect.DeepEqual(assumedPodCopy, podCopy) { - return false - } - klog.V(3).InfoS("Pod update ignored because changes won't affect scheduling", "pod", klog.KObj(pod)) - return true -} - // addAllEventHandlers is a helper function used in tests and in Scheduler // to add event handlers for various informers. 
func addAllEventHandlers( diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index b0bc31844e2..b76450255cc 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -26,225 +26,9 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/scheduler/internal/cache" - fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake" "k8s.io/kubernetes/pkg/scheduler/internal/queue" ) -func TestSkipPodUpdate(t *testing.T) { - for _, test := range []struct { - pod *v1.Pod - isAssumedPodFunc func(*v1.Pod) bool - getPodFunc func(*v1.Pod) *v1.Pod - expected bool - name string - }{ - { - name: "Non-assumed pod", - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - }, - }, - isAssumedPodFunc: func(*v1.Pod) bool { return false }, - getPodFunc: func(*v1.Pod) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - }, - } - }, - expected: false, - }, - { - name: "with changes on ResourceVersion, Spec.NodeName and/or Annotations", - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Annotations: map[string]string{"a": "b"}, - ResourceVersion: "0", - }, - Spec: v1.PodSpec{ - NodeName: "node-0", - }, - }, - isAssumedPodFunc: func(*v1.Pod) bool { - return true - }, - getPodFunc: func(*v1.Pod) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Annotations: map[string]string{"c": "d"}, - ResourceVersion: "1", - }, - Spec: v1.PodSpec{ - NodeName: "node-1", - }, - } - }, - expected: true, - }, - { - name: "with ServerSideApply changes on Annotations", - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Annotations: map[string]string{"a": "b"}, - ResourceVersion: "0", - ManagedFields: []metav1.ManagedFieldsEntry{ - { - Manager: "some-actor", - Operation: metav1.ManagedFieldsOperationApply, - APIVersion: "v1", - FieldsType: "FieldsV1", - FieldsV1: 
&metav1.FieldsV1{ - Raw: []byte(` - "f:metadata": { - "f:annotations": { - "f:a: {} - } - } - `), - }, - }, - }, - }, - Spec: v1.PodSpec{ - NodeName: "node-0", - }, - }, - isAssumedPodFunc: func(*v1.Pod) bool { - return true - }, - getPodFunc: func(*v1.Pod) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Annotations: map[string]string{"a": "c", "d": "e"}, - ResourceVersion: "1", - ManagedFields: []metav1.ManagedFieldsEntry{ - { - Manager: "some-actor", - Operation: metav1.ManagedFieldsOperationApply, - APIVersion: "v1", - FieldsType: "FieldsV1", - FieldsV1: &metav1.FieldsV1{ - Raw: []byte(` - "f:metadata": { - "f:annotations": { - "f:a: {} - "f:d: {} - } - } - `), - }, - }, - { - Manager: "some-actor", - Operation: metav1.ManagedFieldsOperationApply, - APIVersion: "v1", - FieldsType: "FieldsV1", - FieldsV1: &metav1.FieldsV1{ - Raw: []byte(` - "f:metadata": { - "f:annotations": { - "f:a: {} - } - } - `), - }, - }, - }, - }, - Spec: v1.PodSpec{ - NodeName: "node-1", - }, - } - }, - expected: true, - }, - { - name: "with changes on Labels", - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Labels: map[string]string{"a": "b"}, - }, - }, - isAssumedPodFunc: func(*v1.Pod) bool { - return true - }, - getPodFunc: func(*v1.Pod) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Labels: map[string]string{"c": "d"}, - }, - } - }, - expected: false, - }, - { - name: "with changes on Finalizers", - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Finalizers: []string{"a", "b"}, - }, - }, - isAssumedPodFunc: func(*v1.Pod) bool { - return true - }, - getPodFunc: func(*v1.Pod) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Finalizers: []string{"c", "d"}, - }, - } - }, - expected: true, - }, - { - name: "with changes on Conditions", - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - }, - Status: v1.PodStatus{ - Conditions: 
[]v1.PodCondition{ - {Type: "foo"}, - }, - }, - }, - isAssumedPodFunc: func(*v1.Pod) bool { - return true - }, - getPodFunc: func(*v1.Pod) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - }, - } - }, - expected: true, - }, - } { - t.Run(test.name, func(t *testing.T) { - c := &Scheduler{ - SchedulerCache: &fakecache.Cache{ - IsAssumedPodFunc: test.isAssumedPodFunc, - GetPodFunc: test.getPodFunc, - }, - } - got := c.skipPodUpdate(test.pod) - if got != test.expected { - t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected) - } - }) - } -} - func TestNodeAllocatableChanged(t *testing.T) { newQuantity := func(value int64) resource.Quantity { return *resource.NewQuantity(value, resource.BinarySI) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 725a05b27f5..3e4a2233f31 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -426,6 +426,8 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool { p.ResourceVersion = "" p.Generation = 0 p.Status = v1.PodStatus{} + p.ManagedFields = nil + p.Finalizers = nil return p } return !reflect.DeepEqual(strip(oldPod), strip(newPod)) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index e1de5812fd7..18170d14a34 100755 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -28,6 +28,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" @@ -639,14 +640,15 @@ func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bo return true } - // Case 2: pod has been assumed and pod updates could be skipped. + // Case 2: pod that has been assumed could be skipped. 
// An assumed pod can be added again to the scheduling queue if it got an update event // during its previous scheduling cycle but before getting assumed. - if sched.skipPodUpdate(pod) { - return true + isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err)) + return false } - - return false + return isAssumed } func defaultAlgorithmSourceProviderName() *string {