From 8b6bf4e235204a6646f3902221afd42c65c3d8bb Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Mon, 16 Oct 2017 13:30:25 -0700 Subject: [PATCH] Change scheduler to skip pod with only annotations updates --- plugin/pkg/scheduler/factory/BUILD | 1 + plugin/pkg/scheduler/factory/factory.go | 56 +++++++++++- plugin/pkg/scheduler/factory/factory_test.go | 91 +++++++++++++++++++ plugin/pkg/scheduler/schedulercache/cache.go | 33 +++++++ .../scheduler/schedulercache/cache_test.go | 24 +++++ .../pkg/scheduler/schedulercache/interface.go | 7 ++ plugin/pkg/scheduler/testing/fake_cache.go | 14 ++- 7 files changed, 221 insertions(+), 5 deletions(-) diff --git a/plugin/pkg/scheduler/factory/BUILD b/plugin/pkg/scheduler/factory/BUILD index 9b13f8557a0..fae12b6e8a8 100644 --- a/plugin/pkg/scheduler/factory/BUILD +++ b/plugin/pkg/scheduler/factory/BUILD @@ -61,6 +61,7 @@ go_test( "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/api/latest:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", + "//plugin/pkg/scheduler/testing:go_default_library", "//plugin/pkg/scheduler/util:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 56f52b4f072..0d46abb7980 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -19,10 +19,13 @@ limitations under the License. 
package factory import ( + "encoding/json" "fmt" "reflect" "time" + "github.com/golang/glog" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,9 +55,6 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/core" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/util" - - "encoding/json" - "github.com/golang/glog" ) const ( @@ -192,6 +192,9 @@ func NewConfigFactory( } }, UpdateFunc: func(oldObj, newObj interface{}) { + if c.skipPodUpdate(newObj.(*v1.Pod)) { + return + } if err := c.podQueue.Update(newObj); err != nil { runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) } @@ -255,6 +258,53 @@ func NewConfigFactory( return c } +// skipPodUpdate checks whether the specified pod update should be ignored. +// This function will return true if +// - The pod has already been assumed, AND +// - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations +// updated. +func (c *configFactory) skipPodUpdate(pod *v1.Pod) bool { + // Non-assumed pods should never be skipped. + isAssumed, err := c.schedulerCache.IsAssumedPod(pod) + if err != nil { + runtime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err)) + return false + } + if !isAssumed { + return false + } + + // Gets the assumed pod from the cache. + assumedPod, err := c.schedulerCache.GetPod(pod) + if err != nil { + runtime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err)) + return false + } + + // Compares the assumed pod in the cache with the pod update. If they are + // equal (with certain fields excluded), this pod update will be skipped. + f := func(pod *v1.Pod) *v1.Pod { + p := pod.DeepCopy() + // ResourceVersion must be excluded because each object update will + // have a new resource version. 
+ p.ResourceVersion = "" + // Spec.NodeName must be excluded because the pod assumed in the cache + // is expected to have a node assigned while the pod update may or may + // not have this field set. + p.Spec.NodeName = "" + // Annotations must be excluded for the reasons described in + // https://github.com/kubernetes/kubernetes/issues/52914. + p.Annotations = nil + return p + } + assumedPodCopy, podCopy := f(assumedPod), f(pod) + if !reflect.DeepEqual(assumedPodCopy, podCopy) { + return false + } + glog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name) + return true +} + func (c *configFactory) onPvAdd(obj interface{}) { if c.enableEquivalenceClassCache { pv, ok := obj.(*v1.PersistentVolume) diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index f75f782e257..33f2020c7b1 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -37,6 +37,7 @@ import ( schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing" "k8s.io/kubernetes/plugin/pkg/scheduler/util" ) @@ -525,3 +526,93 @@ func TestInvalidFactoryArgs(t *testing.T) { } } + +func TestSkipPodUpdate(t *testing.T) { + for _, test := range []struct { + pod *v1.Pod + isAssumedPodFunc func(*v1.Pod) bool + getPodFunc func(*v1.Pod) *v1.Pod + expected bool + }{ + // Non-assumed pod should not be skipped.
+ { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + }, + }, + isAssumedPodFunc: func(*v1.Pod) bool { return false }, + getPodFunc: func(*v1.Pod) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + }, + } + }, + expected: false, + }, + // Pod update (with changes on ResourceVersion, Spec.NodeName and/or + // Annotations) for an already assumed pod should be skipped. + { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + Annotations: map[string]string{"a": "b"}, + ResourceVersion: "0", + }, + Spec: v1.PodSpec{ + NodeName: "node-0", + }, + }, + isAssumedPodFunc: func(*v1.Pod) bool { + return true + }, + getPodFunc: func(*v1.Pod) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + Annotations: map[string]string{"c": "d"}, + ResourceVersion: "1", + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + }, + } + }, + expected: true, + }, + // Pod update (with changes on Labels) for an already assumed pod + // should not be skipped. 
+ { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + Labels: map[string]string{"a": "b"}, + }, + }, + isAssumedPodFunc: func(*v1.Pod) bool { + return true + }, + getPodFunc: func(*v1.Pod) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + Labels: map[string]string{"c": "d"}, + }, + } + }, + expected: false, + }, + } { + c := &configFactory{ + schedulerCache: &schedulertesting.FakeCache{ + IsAssumedPodFunc: test.isAssumedPodFunc, + GetPodFunc: test.getPodFunc, + }, + } + got := c.skipPodUpdate(test.pod) + if got != test.expected { + t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected) + } + } +} diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go index 0f562e25925..9b2dfd7ea77 100644 --- a/plugin/pkg/scheduler/schedulercache/cache.go +++ b/plugin/pkg/scheduler/schedulercache/cache.go @@ -307,6 +307,39 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error { return nil } +func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) { + key, err := getPodKey(pod) + if err != nil { + return false, err + } + + cache.mu.Lock() + defer cache.mu.Unlock() + + b, found := cache.assumedPods[key] + if !found { + return false, nil + } + return b, nil +} + +func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) { + key, err := getPodKey(pod) + if err != nil { + return nil, err + } + + cache.mu.Lock() + defer cache.mu.Unlock() + + podState, ok := cache.podStates[key] + if !ok { + return nil, fmt.Errorf("pod %v does not exist", key) + } + + return podState.pod, nil +} + func (cache *schedulerCache) AddNode(node *v1.Node) error { cache.mu.Lock() defer cache.mu.Unlock() diff --git a/plugin/pkg/scheduler/schedulercache/cache_test.go b/plugin/pkg/scheduler/schedulercache/cache_test.go index e289365d9af..f27c7b5ef7b 100644 --- a/plugin/pkg/scheduler/schedulercache/cache_test.go +++ b/plugin/pkg/scheduler/schedulercache/cache_test.go @@ -546,11 
+546,35 @@ func TestForgetPod(t *testing.T) { if err := assumeAndFinishBinding(cache, pod, now); err != nil { t.Fatalf("assumePod failed: %v", err) } + isAssumed, err := cache.IsAssumedPod(pod) + if err != nil { + t.Fatalf("IsAssumedPod failed: %v.", err) + } + if !isAssumed { + t.Fatalf("Pod is expected to be assumed.") + } + assumedPod, err := cache.GetPod(pod) + if err != nil { + t.Fatalf("GetPod failed: %v.", err) + } + if assumedPod.Namespace != pod.Namespace { + t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace) + } + if assumedPod.Name != pod.Name { + t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name) + } } for _, pod := range tt.pods { if err := cache.ForgetPod(pod); err != nil { t.Fatalf("ForgetPod failed: %v", err) } + isAssumed, err := cache.IsAssumedPod(pod) + if err != nil { + t.Fatalf("IsAssumedPod failed: %v.", err) + } + if isAssumed { + t.Fatalf("Pod is expected to be unassumed.") + } } cache.cleanupAssumedPods(now.Add(2 * ttl)) if n := cache.nodes[nodeName]; n != nil { diff --git a/plugin/pkg/scheduler/schedulercache/interface.go b/plugin/pkg/scheduler/schedulercache/interface.go index 6acb92cd53d..a1ef77ea4fa 100644 --- a/plugin/pkg/scheduler/schedulercache/interface.go +++ b/plugin/pkg/scheduler/schedulercache/interface.go @@ -79,6 +79,13 @@ type Cache interface { // RemovePod removes a pod. The pod's information would be subtracted from assigned node. RemovePod(pod *v1.Pod) error + // GetPod returns the pod from the cache with the same namespace and the + // same name of the specified pod. + GetPod(pod *v1.Pod) (*v1.Pod, error) + + // IsAssumedPod returns true if the pod is assumed and not expired. + IsAssumedPod(pod *v1.Pod) (bool, error) + // AddNode adds overall information about node. 
AddNode(node *v1.Node) error diff --git a/plugin/pkg/scheduler/testing/fake_cache.go b/plugin/pkg/scheduler/testing/fake_cache.go index b85b165aa3d..f0de231e579 100644 --- a/plugin/pkg/scheduler/testing/fake_cache.go +++ b/plugin/pkg/scheduler/testing/fake_cache.go @@ -24,8 +24,10 @@ import ( // FakeCache is used for testing type FakeCache struct { - AssumeFunc func(*v1.Pod) - ForgetFunc func(*v1.Pod) + AssumeFunc func(*v1.Pod) + ForgetFunc func(*v1.Pod) + IsAssumedPodFunc func(*v1.Pod) bool + GetPodFunc func(*v1.Pod) *v1.Pod } func (f *FakeCache) AssumePod(pod *v1.Pod) error { @@ -46,6 +48,14 @@ func (f *FakeCache) UpdatePod(oldPod, newPod *v1.Pod) error { return nil } func (f *FakeCache) RemovePod(pod *v1.Pod) error { return nil } +func (f *FakeCache) IsAssumedPod(pod *v1.Pod) (bool, error) { + return f.IsAssumedPodFunc(pod), nil +} + +func (f *FakeCache) GetPod(pod *v1.Pod) (*v1.Pod, error) { + return f.GetPodFunc(pod), nil +} + func (f *FakeCache) AddNode(node *v1.Node) error { return nil } func (f *FakeCache) UpdateNode(oldNode, newNode *v1.Node) error { return nil }