Merge pull request #54008 from yguo0905/sched-fix

Automatic merge from submit-queue (batch tested with PRs 53978, 54008, 53037). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Change scheduler to skip pod with updates only on pod annotations

Fixes #52914, by checking whether the pod is already assumed before scheduling it.

**Release note**:

```
Scheduler cache ignores updates to an assumed pod if updates are limited to pod annotations.
```

/sig scheduling
/assign @bsalamat 
/cc @vishh
This commit is contained in:
Kubernetes Submit Queue 2017-10-17 04:53:40 -07:00 committed by GitHub
commit 5152f342eb
7 changed files with 221 additions and 5 deletions

View File

@ -63,6 +63,7 @@ go_test(
"//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/api/latest:go_default_library", "//plugin/pkg/scheduler/api/latest:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library",
"//plugin/pkg/scheduler/testing:go_default_library",
"//plugin/pkg/scheduler/util:go_default_library", "//plugin/pkg/scheduler/util:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -19,10 +19,13 @@ limitations under the License.
package factory package factory
import ( import (
"encoding/json"
"fmt" "fmt"
"reflect" "reflect"
"time" "time"
"github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -52,9 +55,6 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/core" "k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"k8s.io/kubernetes/plugin/pkg/scheduler/util" "k8s.io/kubernetes/plugin/pkg/scheduler/util"
"encoding/json"
"github.com/golang/glog"
) )
const ( const (
@ -192,6 +192,9 @@ func NewConfigFactory(
} }
}, },
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
if c.skipPodUpdate(newObj.(*v1.Pod)) {
return
}
if err := c.podQueue.Update(newObj); err != nil { if err := c.podQueue.Update(newObj); err != nil {
runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
} }
@ -255,6 +258,53 @@ func NewConfigFactory(
return c return c
} }
// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
// - The pod has already been assumed, AND
// - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations
// updated.
func (c *configFactory) skipPodUpdate(pod *v1.Pod) bool {
// Non-assumed pods should never be skipped.
isAssumed, err := c.schedulerCache.IsAssumedPod(pod)
if err != nil {
runtime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false
}
if !isAssumed {
return false
}
// Gets the assumed pod from the cache.
assumedPod, err := c.schedulerCache.GetPod(pod)
if err != nil {
runtime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
return false
}
// Compares the assumed pod in the cache with the pod update. If they are
// equal (with certain fields excluded), this pod update will be skipped.
f := func(pod *v1.Pod) *v1.Pod {
p := pod.DeepCopy()
// ResourceVersion must be excluded because each object update will
// have a new resource version.
p.ResourceVersion = ""
// Spec.NodeName must be excluded because the pod assumed in the cache
// is expected to have a node assigned while the pod update may nor may
// not have this field set.
p.Spec.NodeName = ""
// Annotations must be excluded for the reasons described in
// https://github.com/kubernetes/kubernetes/issues/52914.
p.Annotations = nil
return p
}
assumedPodCopy, podCopy := f(assumedPod), f(pod)
if !reflect.DeepEqual(assumedPodCopy, podCopy) {
return false
}
glog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
return true
}
func (c *configFactory) onPvAdd(obj interface{}) { func (c *configFactory) onPvAdd(obj interface{}) {
if c.enableEquivalenceClassCache { if c.enableEquivalenceClassCache {
pv, ok := obj.(*v1.PersistentVolume) pv, ok := obj.(*v1.PersistentVolume)

View File

@ -37,6 +37,7 @@ import (
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
"k8s.io/kubernetes/plugin/pkg/scheduler/util" "k8s.io/kubernetes/plugin/pkg/scheduler/util"
) )
@ -525,3 +526,93 @@ func TestInvalidFactoryArgs(t *testing.T) {
} }
} }
func TestSkipPodUpdate(t *testing.T) {
for _, test := range []struct {
pod *v1.Pod
isAssumedPodFunc func(*v1.Pod) bool
getPodFunc func(*v1.Pod) *v1.Pod
expected bool
}{
// Non-assumed pod should not be skipped.
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool { return false },
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
}
},
expected: false,
},
// Pod update (with changes on ResourceVersion, Spec.NodeName and/or
// Annotations) for an already assumed pod should be skipped.
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"a": "b"},
ResourceVersion: "0",
},
Spec: v1.PodSpec{
NodeName: "node-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"c": "d"},
ResourceVersion: "1",
},
Spec: v1.PodSpec{
NodeName: "node-1",
},
}
},
expected: true,
},
// Pod update (with changes on Labels) for an already assumed pod
// should not be skipped.
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Labels: map[string]string{"a": "b"},
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Labels: map[string]string{"c": "d"},
},
}
},
expected: false,
},
} {
c := &configFactory{
schedulerCache: &schedulertesting.FakeCache{
IsAssumedPodFunc: test.isAssumedPodFunc,
GetPodFunc: test.getPodFunc,
},
}
got := c.skipPodUpdate(test.pod)
if got != test.expected {
t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected)
}
}
}

View File

@ -307,6 +307,39 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
return nil return nil
} }
func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
key, err := getPodKey(pod)
if err != nil {
return false, err
}
cache.mu.Lock()
defer cache.mu.Unlock()
b, found := cache.assumedPods[key]
if !found {
return false, nil
}
return b, nil
}
func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
key, err := getPodKey(pod)
if err != nil {
return nil, err
}
cache.mu.Lock()
defer cache.mu.Unlock()
podState, ok := cache.podStates[key]
if !ok {
return nil, fmt.Errorf("pod %v does not exist", key)
}
return podState.pod, nil
}
func (cache *schedulerCache) AddNode(node *v1.Node) error { func (cache *schedulerCache) AddNode(node *v1.Node) error {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()

View File

@ -546,11 +546,35 @@ func TestForgetPod(t *testing.T) {
if err := assumeAndFinishBinding(cache, pod, now); err != nil { if err := assumeAndFinishBinding(cache, pod, now); err != nil {
t.Fatalf("assumePod failed: %v", err) t.Fatalf("assumePod failed: %v", err)
} }
isAssumed, err := cache.IsAssumedPod(pod)
if err != nil {
t.Fatalf("IsAssumedPod failed: %v.", err)
}
if !isAssumed {
t.Fatalf("Pod is expected to be assumed.")
}
assumedPod, err := cache.GetPod(pod)
if err != nil {
t.Fatalf("GetPod failed: %v.", err)
}
if assumedPod.Namespace != pod.Namespace {
t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace)
}
if assumedPod.Name != pod.Name {
t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name)
}
} }
for _, pod := range tt.pods { for _, pod := range tt.pods {
if err := cache.ForgetPod(pod); err != nil { if err := cache.ForgetPod(pod); err != nil {
t.Fatalf("ForgetPod failed: %v", err) t.Fatalf("ForgetPod failed: %v", err)
} }
isAssumed, err := cache.IsAssumedPod(pod)
if err != nil {
t.Fatalf("IsAssumedPod failed: %v.", err)
}
if isAssumed {
t.Fatalf("Pod is expected to be unassumed.")
}
} }
cache.cleanupAssumedPods(now.Add(2 * ttl)) cache.cleanupAssumedPods(now.Add(2 * ttl))
if n := cache.nodes[nodeName]; n != nil { if n := cache.nodes[nodeName]; n != nil {

View File

@ -79,6 +79,13 @@ type Cache interface {
// RemovePod removes a pod. The pod's information would be subtracted from assigned node. // RemovePod removes a pod. The pod's information would be subtracted from assigned node.
RemovePod(pod *v1.Pod) error RemovePod(pod *v1.Pod) error
// GetPod returns the pod from the cache with the same namespace and the
// same name of the specified pod.
GetPod(pod *v1.Pod) (*v1.Pod, error)
// IsAssumedPod returns true if the pod is assumed and not expired.
IsAssumedPod(pod *v1.Pod) (bool, error)
// AddNode adds overall information about node. // AddNode adds overall information about node.
AddNode(node *v1.Node) error AddNode(node *v1.Node) error

View File

@ -24,8 +24,10 @@ import (
// FakeCache is used for testing // FakeCache is used for testing
type FakeCache struct { type FakeCache struct {
AssumeFunc func(*v1.Pod) AssumeFunc func(*v1.Pod)
ForgetFunc func(*v1.Pod) ForgetFunc func(*v1.Pod)
IsAssumedPodFunc func(*v1.Pod) bool
GetPodFunc func(*v1.Pod) *v1.Pod
} }
func (f *FakeCache) AssumePod(pod *v1.Pod) error { func (f *FakeCache) AssumePod(pod *v1.Pod) error {
@ -46,6 +48,14 @@ func (f *FakeCache) UpdatePod(oldPod, newPod *v1.Pod) error { return nil }
func (f *FakeCache) RemovePod(pod *v1.Pod) error { return nil } func (f *FakeCache) RemovePod(pod *v1.Pod) error { return nil }
func (f *FakeCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
return f.IsAssumedPodFunc(pod), nil
}
func (f *FakeCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
return f.GetPodFunc(pod), nil
}
func (f *FakeCache) AddNode(node *v1.Node) error { return nil } func (f *FakeCache) AddNode(node *v1.Node) error { return nil }
func (f *FakeCache) UpdateNode(oldNode, newNode *v1.Node) error { return nil } func (f *FakeCache) UpdateNode(oldNode, newNode *v1.Node) error { return nil }