mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Change scheduler to skip pod with only annotations updates
This commit is contained in:
parent
4ad4e1ec40
commit
8b6bf4e235
@ -61,6 +61,7 @@ go_test(
|
||||
"//plugin/pkg/scheduler/api:go_default_library",
|
||||
"//plugin/pkg/scheduler/api/latest:go_default_library",
|
||||
"//plugin/pkg/scheduler/schedulercache:go_default_library",
|
||||
"//plugin/pkg/scheduler/testing:go_default_library",
|
||||
"//plugin/pkg/scheduler/util:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
|
@ -19,10 +19,13 @@ limitations under the License.
|
||||
package factory
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -52,9 +55,6 @@ import (
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
|
||||
|
||||
"encoding/json"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -192,6 +192,9 @@ func NewConfigFactory(
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
if c.skipPodUpdate(newObj.(*v1.Pod)) {
|
||||
return
|
||||
}
|
||||
if err := c.podQueue.Update(newObj); err != nil {
|
||||
runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
|
||||
}
|
||||
@ -255,6 +258,53 @@ func NewConfigFactory(
|
||||
return c
|
||||
}
|
||||
|
||||
// skipPodUpdate checks whether the specified pod update should be ignored.
|
||||
// This function will return true if
|
||||
// - The pod has already been assumed, AND
|
||||
// - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations
|
||||
// updated.
|
||||
func (c *configFactory) skipPodUpdate(pod *v1.Pod) bool {
|
||||
// Non-assumed pods should never be skipped.
|
||||
isAssumed, err := c.schedulerCache.IsAssumedPod(pod)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
|
||||
return false
|
||||
}
|
||||
if !isAssumed {
|
||||
return false
|
||||
}
|
||||
|
||||
// Gets the assumed pod from the cache.
|
||||
assumedPod, err := c.schedulerCache.GetPod(pod)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
|
||||
return false
|
||||
}
|
||||
|
||||
// Compares the assumed pod in the cache with the pod update. If they are
|
||||
// equal (with certain fields excluded), this pod update will be skipped.
|
||||
f := func(pod *v1.Pod) *v1.Pod {
|
||||
p := pod.DeepCopy()
|
||||
// ResourceVersion must be excluded because each object update will
|
||||
// have a new resource version.
|
||||
p.ResourceVersion = ""
|
||||
// Spec.NodeName must be excluded because the pod assumed in the cache
|
||||
// is expected to have a node assigned while the pod update may nor may
|
||||
// not have this field set.
|
||||
p.Spec.NodeName = ""
|
||||
// Annotations must be excluded for the reasons described in
|
||||
// https://github.com/kubernetes/kubernetes/issues/52914.
|
||||
p.Annotations = nil
|
||||
return p
|
||||
}
|
||||
assumedPodCopy, podCopy := f(assumedPod), f(pod)
|
||||
if !reflect.DeepEqual(assumedPodCopy, podCopy) {
|
||||
return false
|
||||
}
|
||||
glog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *configFactory) onPvAdd(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
pv, ok := obj.(*v1.PersistentVolume)
|
||||
|
@ -37,6 +37,7 @@ import (
|
||||
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
||||
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
@ -525,3 +526,93 @@ func TestInvalidFactoryArgs(t *testing.T) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSkipPodUpdate(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
pod *v1.Pod
|
||||
isAssumedPodFunc func(*v1.Pod) bool
|
||||
getPodFunc func(*v1.Pod) *v1.Pod
|
||||
expected bool
|
||||
}{
|
||||
// Non-assumed pod should not be skipped.
|
||||
{
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
},
|
||||
},
|
||||
isAssumedPodFunc: func(*v1.Pod) bool { return false },
|
||||
getPodFunc: func(*v1.Pod) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
},
|
||||
}
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
// Pod update (with changes on ResourceVersion, Spec.NodeName and/or
|
||||
// Annotations) for an already assumed pod should be skipped.
|
||||
{
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
Annotations: map[string]string{"a": "b"},
|
||||
ResourceVersion: "0",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "node-0",
|
||||
},
|
||||
},
|
||||
isAssumedPodFunc: func(*v1.Pod) bool {
|
||||
return true
|
||||
},
|
||||
getPodFunc: func(*v1.Pod) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
Annotations: map[string]string{"c": "d"},
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "node-1",
|
||||
},
|
||||
}
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
// Pod update (with changes on Labels) for an already assumed pod
|
||||
// should not be skipped.
|
||||
{
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
Labels: map[string]string{"a": "b"},
|
||||
},
|
||||
},
|
||||
isAssumedPodFunc: func(*v1.Pod) bool {
|
||||
return true
|
||||
},
|
||||
getPodFunc: func(*v1.Pod) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
Labels: map[string]string{"c": "d"},
|
||||
},
|
||||
}
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
} {
|
||||
c := &configFactory{
|
||||
schedulerCache: &schedulertesting.FakeCache{
|
||||
IsAssumedPodFunc: test.isAssumedPodFunc,
|
||||
GetPodFunc: test.getPodFunc,
|
||||
},
|
||||
}
|
||||
got := c.skipPodUpdate(test.pod)
|
||||
if got != test.expected {
|
||||
t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -307,6 +307,39 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
|
||||
key, err := getPodKey(pod)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
cache.mu.Lock()
|
||||
defer cache.mu.Unlock()
|
||||
|
||||
b, found := cache.assumedPods[key]
|
||||
if !found {
|
||||
return false, nil
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
|
||||
key, err := getPodKey(pod)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cache.mu.Lock()
|
||||
defer cache.mu.Unlock()
|
||||
|
||||
podState, ok := cache.podStates[key]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("pod %v does not exist", key)
|
||||
}
|
||||
|
||||
return podState.pod, nil
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) AddNode(node *v1.Node) error {
|
||||
cache.mu.Lock()
|
||||
defer cache.mu.Unlock()
|
||||
|
@ -546,11 +546,35 @@ func TestForgetPod(t *testing.T) {
|
||||
if err := assumeAndFinishBinding(cache, pod, now); err != nil {
|
||||
t.Fatalf("assumePod failed: %v", err)
|
||||
}
|
||||
isAssumed, err := cache.IsAssumedPod(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("IsAssumedPod failed: %v.", err)
|
||||
}
|
||||
if !isAssumed {
|
||||
t.Fatalf("Pod is expected to be assumed.")
|
||||
}
|
||||
assumedPod, err := cache.GetPod(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("GetPod failed: %v.", err)
|
||||
}
|
||||
if assumedPod.Namespace != pod.Namespace {
|
||||
t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace)
|
||||
}
|
||||
if assumedPod.Name != pod.Name {
|
||||
t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name)
|
||||
}
|
||||
}
|
||||
for _, pod := range tt.pods {
|
||||
if err := cache.ForgetPod(pod); err != nil {
|
||||
t.Fatalf("ForgetPod failed: %v", err)
|
||||
}
|
||||
isAssumed, err := cache.IsAssumedPod(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("IsAssumedPod failed: %v.", err)
|
||||
}
|
||||
if isAssumed {
|
||||
t.Fatalf("Pod is expected to be unassumed.")
|
||||
}
|
||||
}
|
||||
cache.cleanupAssumedPods(now.Add(2 * ttl))
|
||||
if n := cache.nodes[nodeName]; n != nil {
|
||||
|
@ -79,6 +79,13 @@ type Cache interface {
|
||||
// RemovePod removes a pod. The pod's information would be subtracted from assigned node.
|
||||
RemovePod(pod *v1.Pod) error
|
||||
|
||||
// GetPod returns the pod from the cache with the same namespace and the
|
||||
// same name of the specified pod.
|
||||
GetPod(pod *v1.Pod) (*v1.Pod, error)
|
||||
|
||||
// IsAssumedPod returns true if the pod is assumed and not expired.
|
||||
IsAssumedPod(pod *v1.Pod) (bool, error)
|
||||
|
||||
// AddNode adds overall information about node.
|
||||
AddNode(node *v1.Node) error
|
||||
|
||||
|
@ -24,8 +24,10 @@ import (
|
||||
|
||||
// FakeCache is used for testing
|
||||
type FakeCache struct {
|
||||
AssumeFunc func(*v1.Pod)
|
||||
ForgetFunc func(*v1.Pod)
|
||||
AssumeFunc func(*v1.Pod)
|
||||
ForgetFunc func(*v1.Pod)
|
||||
IsAssumedPodFunc func(*v1.Pod) bool
|
||||
GetPodFunc func(*v1.Pod) *v1.Pod
|
||||
}
|
||||
|
||||
func (f *FakeCache) AssumePod(pod *v1.Pod) error {
|
||||
@ -46,6 +48,14 @@ func (f *FakeCache) UpdatePod(oldPod, newPod *v1.Pod) error { return nil }
|
||||
|
||||
func (f *FakeCache) RemovePod(pod *v1.Pod) error { return nil }
|
||||
|
||||
func (f *FakeCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
|
||||
return f.IsAssumedPodFunc(pod), nil
|
||||
}
|
||||
|
||||
func (f *FakeCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
|
||||
return f.GetPodFunc(pod), nil
|
||||
}
|
||||
|
||||
func (f *FakeCache) AddNode(node *v1.Node) error { return nil }
|
||||
|
||||
func (f *FakeCache) UpdateNode(oldNode, newNode *v1.Node) error { return nil }
|
||||
|
Loading…
Reference in New Issue
Block a user