diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 9d60a9b7d0f..1909577a293 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -92,8 +92,8 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { nodeInfo := sched.Cache.UpdateNode(logger, oldNode, newNode) // Only requeue unschedulable pods if the node became more schedulable. - if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, *event, oldNode, newNode, preCheckForNode(nodeInfo)) + for _, evt := range nodeSchedulingPropertiesChange(newNode, oldNode) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, oldNode, newNode, preCheckForNode(nodeInfo)) } } @@ -522,24 +522,26 @@ func addAllEventHandlers( return nil } -func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent { +func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) []framework.ClusterEvent { + var events []framework.ClusterEvent + if nodeSpecUnschedulableChanged(newNode, oldNode) { - return &queue.NodeSpecUnschedulableChange + events = append(events, queue.NodeSpecUnschedulableChange) } if nodeAllocatableChanged(newNode, oldNode) { - return &queue.NodeAllocatableChange + events = append(events, queue.NodeAllocatableChange) } if nodeLabelsChanged(newNode, oldNode) { - return &queue.NodeLabelChange + events = append(events, queue.NodeLabelChange) } if nodeTaintsChanged(newNode, oldNode) { - return &queue.NodeTaintChange + events = append(events, queue.NodeTaintChange) } if nodeConditionsChanged(newNode, oldNode) { - return &queue.NodeConditionChange + events = append(events, queue.NodeConditionChange) } - return nil + return events } func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool { diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index 7a67d8d6ce9..e81a0fdd938 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -524,3 +524,85 @@ func TestAdmissionCheck(t *testing.T) { }) } } + +func TestNodeSchedulingPropertiesChange(t *testing.T) { + testCases := []struct { + name string + newNode *v1.Node + oldNode *v1.Node + wantEvents []framework.ClusterEvent + }{ + { + name: "no specific changed applied", + newNode: st.MakeNode().Unschedulable(false).Obj(), + oldNode: st.MakeNode().Unschedulable(false).Obj(), + wantEvents: nil, + }, + { + name: "only node spec unavailable changed", + newNode: st.MakeNode().Unschedulable(false).Obj(), + oldNode: st.MakeNode().Unschedulable(true).Obj(), + wantEvents: []framework.ClusterEvent{queue.NodeSpecUnschedulableChange}, + }, + { + name: "only node allocatable changed", + newNode: st.MakeNode().Capacity(map[v1.ResourceName]string{ + v1.ResourceCPU: "1000m", + v1.ResourceMemory: "100m", + v1.ResourceName("example.com/foo"): "1"}, + ).Obj(), + oldNode: st.MakeNode().Capacity(map[v1.ResourceName]string{ + v1.ResourceCPU: "1000m", + v1.ResourceMemory: "100m", + v1.ResourceName("example.com/foo"): "2"}, + ).Obj(), + wantEvents: []framework.ClusterEvent{queue.NodeAllocatableChange}, + }, + { + name: "only node label changed", + newNode: st.MakeNode().Label("foo", "bar").Obj(), + oldNode: st.MakeNode().Label("foo", "fuz").Obj(), + wantEvents: []framework.ClusterEvent{queue.NodeLabelChange}, + }, + { + name: "only node taint changed", + newNode: st.MakeNode().Taints([]v1.Taint{ + {Key: v1.TaintNodeUnschedulable, Value: "", Effect: v1.TaintEffectNoSchedule}, + }).Obj(), + oldNode: st.MakeNode().Taints([]v1.Taint{ + {Key: v1.TaintNodeUnschedulable, Value: "foo", Effect: v1.TaintEffectNoSchedule}, + }).Obj(), + wantEvents: []framework.ClusterEvent{queue.NodeTaintChange}, + }, + { + name: "only node condition changed", + newNode: st.MakeNode().Obj(), + oldNode: st.MakeNode().Condition( + v1.NodeReady, + v1.ConditionTrue, + "Ready", + "Ready", + ).Obj(), + wantEvents: []framework.ClusterEvent{queue.NodeConditionChange}, + }, + { + name: "both node label and node taint changed", + newNode: st.MakeNode(). + Label("foo", "bar"). + Taints([]v1.Taint{ + {Key: v1.TaintNodeUnschedulable, Value: "", Effect: v1.TaintEffectNoSchedule}, + }).Obj(), + oldNode: st.MakeNode().Taints([]v1.Taint{ + {Key: v1.TaintNodeUnschedulable, Value: "foo", Effect: v1.TaintEffectNoSchedule}, + }).Obj(), + wantEvents: []framework.ClusterEvent{queue.NodeLabelChange, queue.NodeTaintChange}, + }, + } + + for _, tc := range testCases { + gotEvents := nodeSchedulingPropertiesChange(tc.newNode, tc.oldNode) + if diff := cmp.Diff(tc.wantEvents, gotEvents); diff != "" { + t.Errorf("unexpected event (-want, +got):\n%s", diff) + } + } +} diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index b564bd0cafb..6cda66b6661 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -18,6 +18,7 @@ package testing import ( "fmt" + "time" v1 "k8s.io/api/core/v1" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" @@ -760,6 +761,27 @@ func (n *NodeWrapper) Taints(taints []v1.Taint) *NodeWrapper { return n } +// Unschedulable applies the unschedulable field. +func (n *NodeWrapper) Unschedulable(unschedulable bool) *NodeWrapper { + n.Spec.Unschedulable = unschedulable + return n +} + +// Condition applies the node condition. +func (n *NodeWrapper) Condition(typ v1.NodeConditionType, status v1.ConditionStatus, message, reason string) *NodeWrapper { + n.Status.Conditions = []v1.NodeCondition{ + { + Type: typ, + Status: status, + Message: message, + Reason: reason, + LastHeartbeatTime: metav1.Time{Time: time.Now()}, + LastTransitionTime: metav1.Time{Time: time.Now()}, + }, + } + return n +} + // PersistentVolumeClaimWrapper wraps a PersistentVolumeClaim inside. type PersistentVolumeClaimWrapper struct{ v1.PersistentVolumeClaim } diff --git a/test/integration/scheduler/eventhandler/eventhandler_test.go b/test/integration/scheduler/eventhandler/eventhandler_test.go new file mode 100644 index 00000000000..1917501ccd2 --- /dev/null +++ b/test/integration/scheduler/eventhandler/eventhandler_test.go @@ -0,0 +1,137 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventhandler + +import ( + "context" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/component-helpers/scheduling/corev1" + configv1 "k8s.io/kube-scheduler/config/v1" + "k8s.io/kubernetes/pkg/scheduler" + configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" + "k8s.io/kubernetes/pkg/scheduler/framework" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + st "k8s.io/kubernetes/pkg/scheduler/testing" + schedulerutils "k8s.io/kubernetes/test/integration/scheduler" + testutils "k8s.io/kubernetes/test/integration/util" + "k8s.io/utils/ptr" +) + +var _ framework.FilterPlugin = &fooPlugin{} + +type fooPlugin struct { +} + +func (pl *fooPlugin) Name() string { + return "foo" +} + +func (pl *fooPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + taints := nodeInfo.Node().Spec.Taints + if len(taints) == 0 { + return nil + } + + if corev1.TolerationsTolerateTaint(pod.Spec.Tolerations, &nodeInfo.Node().Spec.Taints[0]) { + return nil + } + return framework.NewStatus(framework.Unschedulable) +} + +func (pl *fooPlugin) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}}, + } +} + +// newPlugin returns a plugin factory with specified Plugin. +func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory { + return func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return plugin, nil + } +} + +func TestUpdateNodeEvent(t *testing.T) { + testContext := testutils.InitTestAPIServer(t, "test-event", nil) + + taints := []v1.Taint{{Key: v1.TaintNodeUnschedulable, Value: "", Effect: v1.TaintEffectNoSchedule}} + nodeWrapper := st.MakeNode().Name("node-0").Label("kubernetes.io/hostname", "node-0").Taints(taints).Obj() + podWrapper := testutils.InitPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testContext.NS.Name}) + fooPlugin := &fooPlugin{} + + registry := frameworkruntime.Registry{ + fooPlugin.Name(): newPlugin(fooPlugin), + } + + // Setup plugins for testing. + cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ + Profiles: []configv1.KubeSchedulerProfile{{ + SchedulerName: ptr.To[string](v1.DefaultSchedulerName), + Plugins: &configv1.Plugins{ + Filter: configv1.PluginSet{ + Enabled: []configv1.Plugin{ + {Name: fooPlugin.Name()}, + }, + Disabled: []configv1.Plugin{ + {Name: "*"}, + }, + }, + }, + }}, + }) + + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, + scheduler.WithProfiles(cfg.Profiles...), + scheduler.WithFrameworkOutOfTreeRegistry(registry), + ) + defer teardown() + + node, err := testutils.CreateNode(testCtx.ClientSet, nodeWrapper) + if err != nil { + t.Fatalf("Creating node error: %v", err) + } + + pod, err := testutils.CreatePausePod(testCtx.ClientSet, podWrapper) + if err != nil { + t.Fatalf("Creating pod error: %v", err) + } + + if err := testutils.WaitForPodUnschedulable(testCtx.ClientSet, pod); err != nil { + t.Fatalf("Pod %v got scheduled: %v", pod.Name, err) + } + node, err = testCtx.ClientSet.CoreV1().Nodes().Get(testCtx.Ctx, node.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Error while getting a node: %v", err) + } + + // Update node label and node taints + node.Labels["foo"] = "bar" + node.Spec.Taints = nil + + _, err = testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, node, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Error updating the node: %v", err) + } + + if err := testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { + t.Errorf("Pod %v was not scheduled: %v", pod.Name, err) + } +} diff --git a/test/integration/scheduler/eventhandler/main_test.go b/test/integration/scheduler/eventhandler/main_test.go new file mode 100644 index 00000000000..d3b7a6cab05 --- /dev/null +++ b/test/integration/scheduler/eventhandler/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventhandler + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +}