Merge pull request #121755 from kerthcet/fix/node-update-event

Fix nodeUpdate event missing some potential changes
Kubernetes Prow Robot, 2023-12-13 22:36:03 +01:00 (committed by GitHub)
commit 4189053453
5 changed files with 279 additions and 9 deletions
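In short, updateNodeInCache used to ask nodeSchedulingPropertiesChange for a single *framework.ClusterEvent and requeued pods for only the first node property found to have changed; if one update touched, say, both labels and taints, the taint event was silently dropped and plugins that subscribe only to taint updates never got a chance to requeue their pods. The change below makes the helper return every matching event. A minimal, standalone sketch of the difference (the clusterEvent type and the two helpers here are made up for illustration and are not the scheduler's own types):

	package main

	import "fmt"

	type clusterEvent string

	const (
		labelChange clusterEvent = "NodeLabelChange"
		taintChange clusterEvent = "NodeTaintChange"
	)

	// firstChange mirrors the old behavior: report only the first detected change.
	func firstChange(labelsChanged, taintsChanged bool) []clusterEvent {
		if labelsChanged {
			return []clusterEvent{labelChange}
		}
		if taintsChanged {
			return []clusterEvent{taintChange}
		}
		return nil
	}

	// allChanges mirrors the new behavior: collect every detected change.
	func allChanges(labelsChanged, taintsChanged bool) []clusterEvent {
		var events []clusterEvent
		if labelsChanged {
			events = append(events, labelChange)
		}
		if taintsChanged {
			events = append(events, taintChange)
		}
		return events
	}

	func main() {
		fmt.Println(firstChange(true, true)) // [NodeLabelChange]: the taint change is lost
		fmt.Println(allChanges(true, true))  // [NodeLabelChange NodeTaintChange]
	}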


@ -92,8 +92,8 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
	nodeInfo := sched.Cache.UpdateNode(logger, oldNode, newNode)
	// Only requeue unschedulable pods if the node became more schedulable.
-	if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil {
-		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, *event, oldNode, newNode, preCheckForNode(nodeInfo))
+	for _, evt := range nodeSchedulingPropertiesChange(newNode, oldNode) {
+		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, oldNode, newNode, preCheckForNode(nodeInfo))
	}
}
@ -522,24 +522,26 @@ func addAllEventHandlers(
	return nil
}

-func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
+func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) []framework.ClusterEvent {
+	var events []framework.ClusterEvent
	if nodeSpecUnschedulableChanged(newNode, oldNode) {
-		return &queue.NodeSpecUnschedulableChange
+		events = append(events, queue.NodeSpecUnschedulableChange)
	}
	if nodeAllocatableChanged(newNode, oldNode) {
-		return &queue.NodeAllocatableChange
+		events = append(events, queue.NodeAllocatableChange)
	}
	if nodeLabelsChanged(newNode, oldNode) {
-		return &queue.NodeLabelChange
+		events = append(events, queue.NodeLabelChange)
	}
	if nodeTaintsChanged(newNode, oldNode) {
-		return &queue.NodeTaintChange
+		events = append(events, queue.NodeTaintChange)
	}
	if nodeConditionsChanged(newNode, oldNode) {
-		return &queue.NodeConditionChange
+		events = append(events, queue.NodeConditionChange)
	}
-	return nil
+	return events
}

func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {


@ -524,3 +524,85 @@ func TestAdmissionCheck(t *testing.T) {
		})
	}
}

func TestNodeSchedulingPropertiesChange(t *testing.T) {
	testCases := []struct {
		name       string
		newNode    *v1.Node
		oldNode    *v1.Node
		wantEvents []framework.ClusterEvent
	}{
		{
			name:       "no specific change applied",
			newNode:    st.MakeNode().Unschedulable(false).Obj(),
			oldNode:    st.MakeNode().Unschedulable(false).Obj(),
			wantEvents: nil,
		},
		{
			name:       "only node spec unschedulable changed",
			newNode:    st.MakeNode().Unschedulable(false).Obj(),
			oldNode:    st.MakeNode().Unschedulable(true).Obj(),
			wantEvents: []framework.ClusterEvent{queue.NodeSpecUnschedulableChange},
		},
		{
			name: "only node allocatable changed",
			newNode: st.MakeNode().Capacity(map[v1.ResourceName]string{
				v1.ResourceCPU:                     "1000m",
				v1.ResourceMemory:                  "100m",
				v1.ResourceName("example.com/foo"): "1"},
			).Obj(),
			oldNode: st.MakeNode().Capacity(map[v1.ResourceName]string{
				v1.ResourceCPU:                     "1000m",
				v1.ResourceMemory:                  "100m",
				v1.ResourceName("example.com/foo"): "2"},
			).Obj(),
			wantEvents: []framework.ClusterEvent{queue.NodeAllocatableChange},
		},
		{
			name:       "only node label changed",
			newNode:    st.MakeNode().Label("foo", "bar").Obj(),
			oldNode:    st.MakeNode().Label("foo", "fuz").Obj(),
			wantEvents: []framework.ClusterEvent{queue.NodeLabelChange},
		},
		{
			name: "only node taint changed",
			newNode: st.MakeNode().Taints([]v1.Taint{
				{Key: v1.TaintNodeUnschedulable, Value: "", Effect: v1.TaintEffectNoSchedule},
			}).Obj(),
			oldNode: st.MakeNode().Taints([]v1.Taint{
				{Key: v1.TaintNodeUnschedulable, Value: "foo", Effect: v1.TaintEffectNoSchedule},
			}).Obj(),
			wantEvents: []framework.ClusterEvent{queue.NodeTaintChange},
		},
		{
			name:    "only node condition changed",
			newNode: st.MakeNode().Obj(),
			oldNode: st.MakeNode().Condition(
				v1.NodeReady,
				v1.ConditionTrue,
				"Ready",
				"Ready",
			).Obj(),
			wantEvents: []framework.ClusterEvent{queue.NodeConditionChange},
		},
		{
			name: "both node label and node taint changed",
			newNode: st.MakeNode().
				Label("foo", "bar").
				Taints([]v1.Taint{
					{Key: v1.TaintNodeUnschedulable, Value: "", Effect: v1.TaintEffectNoSchedule},
				}).Obj(),
			oldNode: st.MakeNode().Taints([]v1.Taint{
				{Key: v1.TaintNodeUnschedulable, Value: "foo", Effect: v1.TaintEffectNoSchedule},
			}).Obj(),
			wantEvents: []framework.ClusterEvent{queue.NodeLabelChange, queue.NodeTaintChange},
		},
	}

	for _, tc := range testCases {
		gotEvents := nodeSchedulingPropertiesChange(tc.newNode, tc.oldNode)
		if diff := cmp.Diff(tc.wantEvents, gotEvents); diff != "" {
			t.Errorf("unexpected event (-want, +got):\n%s", diff)
		}
	}
}
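One detail the last test case leans on: nodeSchedulingPropertiesChange checks the node properties in a fixed order (spec, allocatable, labels, taints, conditions), so the expected slice can be written as []framework.ClusterEvent{queue.NodeLabelChange, queue.NodeTaintChange} without sorting. If that coupling ever felt too brittle, the comparison could be made order-insensitive with go-cmp's cmpopts package; a hedged sketch of such a helper (eventsMatch is hypothetical and would need cmpopts, i.e. "github.com/google/go-cmp/cmp/cmpopts", imported next to cmp):

	// eventsMatch diffs two ClusterEvent slices while ignoring their order.
	func eventsMatch(want, got []framework.ClusterEvent) string {
		less := func(a, b framework.ClusterEvent) bool {
			if a.Resource != b.Resource {
				return a.Resource < b.Resource
			}
			return a.ActionType < b.ActionType
		}
		return cmp.Diff(want, got, cmpopts.SortSlices(less))
	}

The loop above would then check eventsMatch(tc.wantEvents, gotEvents) instead of calling cmp.Diff directly.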


@ -18,6 +18,7 @@ package testing
import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"

@ -760,6 +761,27 @@ func (n *NodeWrapper) Taints(taints []v1.Taint) *NodeWrapper {
	return n
}

// Unschedulable applies the unschedulable field.
func (n *NodeWrapper) Unschedulable(unschedulable bool) *NodeWrapper {
	n.Spec.Unschedulable = unschedulable
	return n
}

// Condition applies the node condition.
func (n *NodeWrapper) Condition(typ v1.NodeConditionType, status v1.ConditionStatus, message, reason string) *NodeWrapper {
	n.Status.Conditions = []v1.NodeCondition{
		{
			Type:               typ,
			Status:             status,
			Message:            message,
			Reason:             reason,
			LastHeartbeatTime:  metav1.Time{Time: time.Now()},
			LastTransitionTime: metav1.Time{Time: time.Now()},
		},
	}
	return n
}
// PersistentVolumeClaimWrapper wraps a PersistentVolumeClaim inside.
type PersistentVolumeClaimWrapper struct{ v1.PersistentVolumeClaim }
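The two new helpers compose with the existing NodeWrapper builders; a small usage sketch (the label, message, and reason values are arbitrary, and st refers to the scheduler testing package as in the unit test above):

	// A cordoned node that also reports NotReady; Obj() returns the built *v1.Node.
	// Condition takes (type, status, message, reason) in that order.
	node := st.MakeNode().
		Name("node-0").
		Unschedulable(true).
		Condition(v1.NodeReady, v1.ConditionFalse, "container runtime is down", "KubeletNotReady").
		Obj()
	_ = node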


@ -0,0 +1,137 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eventhandler

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/component-helpers/scheduling/corev1"
	configv1 "k8s.io/kube-scheduler/config/v1"
	"k8s.io/kubernetes/pkg/scheduler"
	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
	schedulerutils "k8s.io/kubernetes/test/integration/scheduler"
	testutils "k8s.io/kubernetes/test/integration/util"
	"k8s.io/utils/ptr"
)

var _ framework.FilterPlugin = &fooPlugin{}

type fooPlugin struct {
}

func (pl *fooPlugin) Name() string {
	return "foo"
}

func (pl *fooPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	taints := nodeInfo.Node().Spec.Taints
	if len(taints) == 0 {
		return nil
	}
	if corev1.TolerationsTolerateTaint(pod.Spec.Tolerations, &nodeInfo.Node().Spec.Taints[0]) {
		return nil
	}
	return framework.NewStatus(framework.Unschedulable)
}

func (pl *fooPlugin) EventsToRegister() []framework.ClusterEventWithHint {
	return []framework.ClusterEventWithHint{
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}},
	}
}

// newPlugin returns a plugin factory with specified Plugin.
func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory {
	return func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
		return plugin, nil
	}
}

func TestUpdateNodeEvent(t *testing.T) {
	testContext := testutils.InitTestAPIServer(t, "test-event", nil)

	taints := []v1.Taint{{Key: v1.TaintNodeUnschedulable, Value: "", Effect: v1.TaintEffectNoSchedule}}
	nodeWrapper := st.MakeNode().Name("node-0").Label("kubernetes.io/hostname", "node-0").Taints(taints).Obj()
	podWrapper := testutils.InitPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testContext.NS.Name})
	fooPlugin := &fooPlugin{}
	registry := frameworkruntime.Registry{
		fooPlugin.Name(): newPlugin(fooPlugin),
	}

	// Setup plugins for testing.
	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
		Profiles: []configv1.KubeSchedulerProfile{{
			SchedulerName: ptr.To[string](v1.DefaultSchedulerName),
			Plugins: &configv1.Plugins{
				Filter: configv1.PluginSet{
					Enabled: []configv1.Plugin{
						{Name: fooPlugin.Name()},
					},
					Disabled: []configv1.Plugin{
						{Name: "*"},
					},
				},
			},
		}},
	})

	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0,
		scheduler.WithProfiles(cfg.Profiles...),
		scheduler.WithFrameworkOutOfTreeRegistry(registry),
	)
	defer teardown()

	node, err := testutils.CreateNode(testCtx.ClientSet, nodeWrapper)
	if err != nil {
		t.Fatalf("Creating node error: %v", err)
	}
	pod, err := testutils.CreatePausePod(testCtx.ClientSet, podWrapper)
	if err != nil {
		t.Fatalf("Creating pod error: %v", err)
	}
	if err := testutils.WaitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
		t.Fatalf("Pod %v got scheduled: %v", pod.Name, err)
	}
	node, err = testCtx.ClientSet.CoreV1().Nodes().Get(testCtx.Ctx, node.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error while getting a node: %v", err)
	}

	// Update node label and node taints
	node.Labels["foo"] = "bar"
	node.Spec.Taints = nil
	_, err = testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, node, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Error updating the node: %v", err)
	}

	if err := testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
		t.Errorf("Pod %v was not scheduled: %v", pod.Name, err)
	}
}
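The test above is deliberately narrow: fooPlugin registers only for Node/UpdateNodeTaint events, while the node update changes both a label and the taints, so before this fix only the label event was emitted, the plugin was never triggered, and the pod stayed unschedulable. A plugin that cares about several node properties can also fold them into one registration by combining ActionType bits; a hedged sketch of that variant (it reuses the framework types shown above but is not part of this PR):

	// Hypothetical registration that wakes the plugin on label updates as well
	// as taint updates by OR-ing the ActionType flags together.
	func (pl *fooPlugin) EventsToRegister() []framework.ClusterEventWithHint {
		return []framework.ClusterEventWithHint{
			{Event: framework.ClusterEvent{
				Resource:   framework.Node,
				ActionType: framework.UpdateNodeLabel | framework.UpdateNodeTaint,
			}},
		}
	}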


@ -0,0 +1,27 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eventhandler

import (
	"testing"

	"k8s.io/kubernetes/test/integration/framework"
)

func TestMain(m *testing.M) {
	framework.EtcdMain(m.Run)
}