Merge pull request #100286 from tanjing2020/skip_updates_assumed_pods

Scheduler: skip updates of assumed pods
This commit is contained in:
Kubernetes Prow Robot 2021-04-08 20:29:29 -07:00 committed by GitHub
commit ae40c62c49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 16 additions and 276 deletions

View File

@ -174,9 +174,15 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
if oldPod.ResourceVersion == newPod.ResourceVersion {
return
}
if sched.skipPodUpdate(newPod) {
isAssumed, err := sched.SchedulerCache.IsAssumedPod(newPod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err))
}
if isAssumed {
return
}
if err := sched.SchedulingQueue.Update(oldPod, newPod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
}
@ -299,60 +305,6 @@ func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
}
// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
// - The pod has already been assumed, AND
// - The pod has only its ResourceVersion, Spec.NodeName, Annotations,
// ManagedFields, Finalizers and/or Conditions updated.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
// Non-assumed pods should never be skipped.
isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false
}
if !isAssumed {
return false
}
// Gets the assumed pod from the cache.
assumedPod, err := sched.SchedulerCache.GetPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
return false
}
// Compares the assumed pod in the cache with the pod update. If they are
// equal (with certain fields excluded), this pod update will be skipped.
f := func(pod *v1.Pod) *v1.Pod {
p := pod.DeepCopy()
// ResourceVersion must be excluded because each object update will
// have a new resource version.
p.ResourceVersion = ""
// Spec.NodeName must be excluded because the pod assumed in the cache
// is expected to have a node assigned while the pod update may nor may
// not have this field set.
p.Spec.NodeName = ""
// Annotations must be excluded for the reasons described in
// https://github.com/kubernetes/kubernetes/issues/52914.
p.Annotations = nil
// Same as above, when annotations are modified with ServerSideApply,
// ManagedFields may also change and must be excluded
p.ManagedFields = nil
// The following might be changed by external controllers, but they don't
// affect scheduling decisions.
p.Finalizers = nil
p.Status.Conditions = nil
return p
}
assumedPodCopy, podCopy := f(assumedPod), f(pod)
if !reflect.DeepEqual(assumedPodCopy, podCopy) {
return false
}
klog.V(3).InfoS("Pod update ignored because changes won't affect scheduling", "pod", klog.KObj(pod))
return true
}
// addAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func addAllEventHandlers(

View File

@ -28,226 +28,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func TestSkipPodUpdate(t *testing.T) {
for _, test := range []struct {
pod *v1.Pod
isAssumedPodFunc func(*v1.Pod) bool
getPodFunc func(*v1.Pod) *v1.Pod
expected bool
name string
}{
{
name: "Non-assumed pod",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool { return false },
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
}
},
expected: false,
},
{
name: "with changes on ResourceVersion, Spec.NodeName and/or Annotations",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"a": "b"},
ResourceVersion: "0",
},
Spec: v1.PodSpec{
NodeName: "node-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"c": "d"},
ResourceVersion: "1",
},
Spec: v1.PodSpec{
NodeName: "node-1",
},
}
},
expected: true,
},
{
name: "with ServerSideApply changes on Annotations",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"a": "b"},
ResourceVersion: "0",
ManagedFields: []metav1.ManagedFieldsEntry{
{
Manager: "some-actor",
Operation: metav1.ManagedFieldsOperationApply,
APIVersion: "v1",
FieldsType: "FieldsV1",
FieldsV1: &metav1.FieldsV1{
Raw: []byte(`
"f:metadata": {
"f:annotations": {
"f:a: {}
}
}
`),
},
},
},
},
Spec: v1.PodSpec{
NodeName: "node-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"a": "c", "d": "e"},
ResourceVersion: "1",
ManagedFields: []metav1.ManagedFieldsEntry{
{
Manager: "some-actor",
Operation: metav1.ManagedFieldsOperationApply,
APIVersion: "v1",
FieldsType: "FieldsV1",
FieldsV1: &metav1.FieldsV1{
Raw: []byte(`
"f:metadata": {
"f:annotations": {
"f:a: {}
"f:d: {}
}
}
`),
},
},
{
Manager: "some-actor",
Operation: metav1.ManagedFieldsOperationApply,
APIVersion: "v1",
FieldsType: "FieldsV1",
FieldsV1: &metav1.FieldsV1{
Raw: []byte(`
"f:metadata": {
"f:annotations": {
"f:a: {}
}
}
`),
},
},
},
},
Spec: v1.PodSpec{
NodeName: "node-1",
},
}
},
expected: true,
},
{
name: "with changes on Labels",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Labels: map[string]string{"a": "b"},
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Labels: map[string]string{"c": "d"},
},
}
},
expected: false,
},
{
name: "with changes on Finalizers",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Finalizers: []string{"a", "b"},
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Finalizers: []string{"c", "d"},
},
}
},
expected: true,
},
{
name: "with changes on Conditions",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{Type: "foo"},
},
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
}
},
expected: true,
},
} {
t.Run(test.name, func(t *testing.T) {
c := &Scheduler{
SchedulerCache: &fakecache.Cache{
IsAssumedPodFunc: test.isAssumedPodFunc,
GetPodFunc: test.getPodFunc,
},
}
got := c.skipPodUpdate(test.pod)
if got != test.expected {
t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected)
}
})
}
}
func TestNodeAllocatableChanged(t *testing.T) {
newQuantity := func(value int64) resource.Quantity {
return *resource.NewQuantity(value, resource.BinarySI)

View File

@ -431,6 +431,8 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
p.ResourceVersion = ""
p.Generation = 0
p.Status = v1.PodStatus{}
p.ManagedFields = nil
p.Finalizers = nil
return p
}
return !reflect.DeepEqual(strip(oldPod), strip(newPod))

View File

@ -28,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
@ -639,14 +640,15 @@ func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bo
return true
}
// Case 2: pod has been assumed and pod updates could be skipped.
// Case 2: pod has been assumed could be skipped.
// An assumed pod can be added again to the scheduling queue if it got an update event
// during its previous scheduling cycle but before getting assumed.
if sched.skipPodUpdate(pod) {
return true
isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false
}
return false
return isAssumed
}
func defaultAlgorithmSourceProviderName() *string {