mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 03:03:59 +00:00
Merge pull request #127083 from sanposhiho/scheduler-smaller-event
feat: implement Pod smaller update events
This commit is contained in:
commit
4bc6a11d78
@ -61,6 +61,10 @@ var (
|
|||||||
PodRequestScaledDown = ClusterEvent{Resource: Pod, ActionType: UpdatePodScaleDown, Label: "PodRequestScaledDown"}
|
PodRequestScaledDown = ClusterEvent{Resource: Pod, ActionType: UpdatePodScaleDown, Label: "PodRequestScaledDown"}
|
||||||
// PodLabelChange is the event when a pod's label is changed.
|
// PodLabelChange is the event when a pod's label is changed.
|
||||||
PodLabelChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodLabel, Label: "PodLabelChange"}
|
PodLabelChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodLabel, Label: "PodLabelChange"}
|
||||||
|
// PodTolerationChange is the event when a pod's toleration is changed.
|
||||||
|
PodTolerationChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodTolerations, Label: "PodTolerationChange"}
|
||||||
|
// PodSchedulingGateEliminatedChange is the event when all of a pod's scheduling gates are eliminated.
|
||||||
|
PodSchedulingGateEliminatedChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodSchedulingGatesEliminated, Label: "PodSchedulingGateChange"}
|
||||||
// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
|
// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
|
||||||
NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
|
NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
|
||||||
// NodeAllocatableChange is the event when node allocatable is changed.
|
// NodeAllocatableChange is the event when node allocatable is changed.
|
||||||
@ -109,6 +113,8 @@ func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []Clu
|
|||||||
podChangeExtracters := []podChangeExtractor{
|
podChangeExtracters := []podChangeExtractor{
|
||||||
extractPodLabelsChange,
|
extractPodLabelsChange,
|
||||||
extractPodScaleDown,
|
extractPodScaleDown,
|
||||||
|
extractPodSchedulingGateEliminatedChange,
|
||||||
|
extractPodTolerationChange,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, fn := range podChangeExtracters {
|
for _, fn := range podChangeExtracters {
|
||||||
@ -159,6 +165,27 @@ func extractPodLabelsChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func extractPodTolerationChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
|
||||||
|
if len(newPod.Spec.Tolerations) != len(oldPod.Spec.Tolerations) {
|
||||||
|
// A Pod got a new toleration.
|
||||||
|
// Due to API validation, the user can add, but cannot modify or remove tolerations.
|
||||||
|
// So, it's enough to just check the length of tolerations to notice the update.
|
||||||
|
// And, any updates in tolerations could make Pod schedulable.
|
||||||
|
return &PodTolerationChange
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractPodSchedulingGateEliminatedChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
|
||||||
|
if len(newPod.Spec.SchedulingGates) == 0 && len(oldPod.Spec.SchedulingGates) != 0 {
|
||||||
|
// A scheduling gate on the pod is completely removed.
|
||||||
|
return &PodSchedulingGateEliminatedChange
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s).
|
// NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s).
|
||||||
func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) {
|
func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) {
|
||||||
nodeChangeExtracters := []nodeChangeExtractor{
|
nodeChangeExtracters := []nodeChangeExtractor{
|
||||||
|
@ -370,6 +370,24 @@ func Test_podSchedulingPropertiesChange(t *testing.T) {
|
|||||||
oldPod: st.MakePod().Annotation("foo", "bar2").Obj(),
|
oldPod: st.MakePod().Annotation("foo", "bar2").Obj(),
|
||||||
want: []ClusterEvent{assignedPodOtherUpdate},
|
want: []ClusterEvent{assignedPodOtherUpdate},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "scheduling gate is eliminated",
|
||||||
|
newPod: st.MakePod().SchedulingGates([]string{}).Obj(),
|
||||||
|
oldPod: st.MakePod().SchedulingGates([]string{"foo"}).Obj(),
|
||||||
|
want: []ClusterEvent{PodSchedulingGateEliminatedChange},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "scheduling gate is removed, but not completely eliminated",
|
||||||
|
newPod: st.MakePod().SchedulingGates([]string{"foo"}).Obj(),
|
||||||
|
oldPod: st.MakePod().SchedulingGates([]string{"foo", "bar"}).Obj(),
|
||||||
|
want: []ClusterEvent{assignedPodOtherUpdate},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "pod's tolerations are updated",
|
||||||
|
newPod: st.MakePod().Toleration("key").Toleration("key2").Obj(),
|
||||||
|
oldPod: st.MakePod().Toleration("key").Obj(),
|
||||||
|
want: []ClusterEvent{PodTolerationChange},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
@ -68,7 +68,7 @@ func (pl *SchedulingGates) EventsToRegister(_ context.Context) ([]framework.Clus
|
|||||||
// https://github.com/kubernetes/kubernetes/pull/122234
|
// https://github.com/kubernetes/kubernetes/pull/122234
|
||||||
return []framework.ClusterEventWithHint{
|
return []framework.ClusterEventWithHint{
|
||||||
// Pods can be more schedulable once their gates are removed
|
// Pods can be more schedulable once their gates are removed
|
||||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange},
|
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodSchedulingGatesEliminated}, QueueingHintFn: pl.isSchedulableAfterUpdatePodSchedulingGatesEliminated},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,7 +79,7 @@ func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Fe
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
|
func (pl *SchedulingGates) isSchedulableAfterUpdatePodSchedulingGatesEliminated(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
|
||||||
_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
|
_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return framework.Queue, err
|
return framework.Queue, err
|
||||||
@ -90,8 +90,5 @@ func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *
|
|||||||
return framework.QueueSkip, nil
|
return framework.QueueSkip, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(modifiedPod.Spec.SchedulingGates) == 0 {
|
|
||||||
return framework.Queue, nil
|
return framework.Queue, nil
|
||||||
}
|
|
||||||
return framework.QueueSkip, nil
|
|
||||||
}
|
}
|
||||||
|
@ -88,13 +88,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
|
|||||||
newObj: st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).UID("uid1").Obj(),
|
newObj: st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).UID("uid1").Obj(),
|
||||||
expectedHint: framework.QueueSkip,
|
expectedHint: framework.QueueSkip,
|
||||||
},
|
},
|
||||||
"skip-queue-on-gates-not-empty": {
|
"queue-on-the-unsched-pod-updated": {
|
||||||
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(),
|
|
||||||
oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(),
|
|
||||||
newObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
|
|
||||||
expectedHint: framework.QueueSkip,
|
|
||||||
},
|
|
||||||
"queue-on-gates-become-empty": {
|
|
||||||
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
|
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
|
||||||
oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
|
oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(),
|
||||||
newObj: st.MakePod().Name("p").SchedulingGates([]string{}).Obj(),
|
newObj: st.MakePod().Name("p").SchedulingGates([]string{}).Obj(),
|
||||||
@ -109,7 +103,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Creating plugin: %v", err)
|
t.Fatalf("Creating plugin: %v", err)
|
||||||
}
|
}
|
||||||
actualHint, err := p.(*SchedulingGates).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj)
|
actualHint, err := p.(*SchedulingGates).isSchedulableAfterUpdatePodSchedulingGatesEliminated(logger, tc.pod, tc.oldObj, tc.newObj)
|
||||||
if tc.expectedErr {
|
if tc.expectedErr {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
return
|
return
|
||||||
|
@ -70,7 +70,7 @@ func (pl *TaintToleration) EventsToRegister(_ context.Context) ([]framework.Clus
|
|||||||
// to determine whether a Pod's update makes the Pod schedulable or not.
|
// to determine whether a Pod's update makes the Pod schedulable or not.
|
||||||
// https://github.com/kubernetes/kubernetes/pull/122234
|
// https://github.com/kubernetes/kubernetes/pull/122234
|
||||||
clusterEventWithHint = append(clusterEventWithHint,
|
clusterEventWithHint = append(clusterEventWithHint,
|
||||||
framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange})
|
framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodTolerations}, QueueingHintFn: pl.isSchedulableAfterPodTolerationChange})
|
||||||
return clusterEventWithHint, nil
|
return clusterEventWithHint, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,25 +210,20 @@ func New(_ context.Context, _ runtime.Object, h framework.Handle, fts feature.Fe
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// isSchedulableAfterPodChange is invoked whenever a pod changed. It checks whether
|
// isSchedulableAfterPodTolerationChange is invoked whenever a pod's toleration changed.
|
||||||
// that change made a previously unschedulable pod schedulable.
|
func (pl *TaintToleration) isSchedulableAfterPodTolerationChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
|
||||||
// When an unscheduled Pod, which was rejected by TaintToleration, is updated to have a new toleration,
|
_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
|
||||||
// it may make the Pod schedulable.
|
|
||||||
func (pl *TaintToleration) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
|
|
||||||
originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return framework.Queue, err
|
return framework.Queue, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if pod.UID == modifiedPod.UID &&
|
if pod.UID == modifiedPod.UID {
|
||||||
len(originalPod.Spec.Tolerations) != len(modifiedPod.Spec.Tolerations) {
|
// The updated Pod is the unschedulable Pod.
|
||||||
// An unscheduled Pod got a new toleration.
|
logger.V(5).Info("a new toleration is added for the unschedulable Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod))
|
||||||
// Due to API validation, the user can add, but cannot modify or remove tolerations.
|
|
||||||
// So, it's enough to just check the length of tolerations to notice the update.
|
|
||||||
// And, any updates in tolerations could make Pod schedulable.
|
|
||||||
logger.V(5).Info("a new toleration is added for the Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod))
|
|
||||||
return framework.Queue, nil
|
return framework.Queue, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.V(5).Info("a new toleration is added for a Pod, but it's an unrelated Pod and wouldn't change the TaintToleration plugin's decision", "pod", klog.KObj(modifiedPod))
|
||||||
|
|
||||||
return framework.QueueSkip, nil
|
return framework.QueueSkip, nil
|
||||||
}
|
}
|
||||||
|
@ -421,7 +421,7 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_isSchedulableAfterPodChange(t *testing.T) {
|
func Test_isSchedulableAfterPodTolerationChange(t *testing.T) {
|
||||||
testcases := map[string]struct {
|
testcases := map[string]struct {
|
||||||
pod *v1.Pod
|
pod *v1.Pod
|
||||||
oldObj, newObj interface{}
|
oldObj, newObj interface{}
|
||||||
@ -472,27 +472,6 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
|
|||||||
expectedHint: framework.QueueSkip,
|
expectedHint: framework.QueueSkip,
|
||||||
expectedErr: false,
|
expectedErr: false,
|
||||||
},
|
},
|
||||||
"skip-updates-not-toleration": {
|
|
||||||
pod: &v1.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "pod-1",
|
|
||||||
Namespace: "ns-1",
|
|
||||||
}},
|
|
||||||
oldObj: &v1.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "pod-1",
|
|
||||||
Namespace: "ns-1",
|
|
||||||
}},
|
|
||||||
newObj: &v1.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "pod-1",
|
|
||||||
Namespace: "ns-1",
|
|
||||||
Labels: map[string]string{"foo": "bar"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
expectedHint: framework.QueueSkip,
|
|
||||||
expectedErr: false,
|
|
||||||
},
|
|
||||||
"queue-on-toleration-added": {
|
"queue-on-toleration-added": {
|
||||||
pod: &v1.Pod{
|
pod: &v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
@ -530,7 +509,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("creating plugin: %v", err)
|
t.Fatalf("creating plugin: %v", err)
|
||||||
}
|
}
|
||||||
actualHint, err := p.(*TaintToleration).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj)
|
actualHint, err := p.(*TaintToleration).isSchedulableAfterPodTolerationChange(logger, tc.pod, tc.oldObj, tc.newObj)
|
||||||
if tc.expectedErr {
|
if tc.expectedErr {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("unexpected success")
|
t.Errorf("unexpected success")
|
||||||
|
@ -68,6 +68,10 @@ const (
|
|||||||
UpdatePodLabel
|
UpdatePodLabel
|
||||||
// UpdatePodScaleDown is an update for pod's scale down (i.e., any resource request is reduced).
|
// UpdatePodScaleDown is an update for pod's scale down (i.e., any resource request is reduced).
|
||||||
UpdatePodScaleDown
|
UpdatePodScaleDown
|
||||||
|
// UpdatePodTolerations is an update for pod's tolerations.
|
||||||
|
UpdatePodTolerations
|
||||||
|
// UpdatePodSchedulingGatesEliminated is an update for pod's scheduling gates, which eliminates all scheduling gates in the Pod.
|
||||||
|
UpdatePodSchedulingGatesEliminated
|
||||||
|
|
||||||
// updatePodOther is an update for pod's other fields.
|
// updatePodOther is an update for pod's other fields.
|
||||||
// It's used only for the internal event handling, and thus unexported.
|
// It's used only for the internal event handling, and thus unexported.
|
||||||
@ -76,7 +80,7 @@ const (
|
|||||||
All ActionType = 1<<iota - 1
|
All ActionType = 1<<iota - 1
|
||||||
|
|
||||||
// Use the general Update type if you either don't know or don't care which specific sub-Update type to use.
|
// Use the general Update type if you either don't know or don't care which specific sub-Update type to use.
|
||||||
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | updatePodOther
|
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | UpdatePodTolerations | UpdatePodSchedulingGatesEliminated | updatePodOther
|
||||||
)
|
)
|
||||||
|
|
||||||
// GVK is short for group/version/kind, which can uniquely represent a particular API resource.
|
// GVK is short for group/version/kind, which can uniquely represent a particular API resource.
|
||||||
|
@ -784,6 +784,7 @@ func Test_UnionedGVKs(t *testing.T) {
|
|||||||
plugins schedulerapi.PluginSet
|
plugins schedulerapi.PluginSet
|
||||||
want map[framework.GVK]framework.ActionType
|
want map[framework.GVK]framework.ActionType
|
||||||
enableInPlacePodVerticalScaling bool
|
enableInPlacePodVerticalScaling bool
|
||||||
|
enableSchedulerQueueingHints bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "filter without EnqueueExtensions plugin",
|
name: "filter without EnqueueExtensions plugin",
|
||||||
@ -894,10 +895,27 @@ func Test_UnionedGVKs(t *testing.T) {
|
|||||||
},
|
},
|
||||||
enableInPlacePodVerticalScaling: true,
|
enableInPlacePodVerticalScaling: true,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "plugins with default profile (queueingHint: enabled)",
|
||||||
|
plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
|
||||||
|
want: map[framework.GVK]framework.ActionType{
|
||||||
|
framework.Pod: framework.Add | framework.UpdatePodLabel | framework.UpdatePodScaleDown | framework.UpdatePodTolerations | framework.UpdatePodSchedulingGatesEliminated | framework.Delete,
|
||||||
|
framework.Node: framework.All,
|
||||||
|
framework.CSINode: framework.All - framework.Delete,
|
||||||
|
framework.CSIDriver: framework.All - framework.Delete,
|
||||||
|
framework.CSIStorageCapacity: framework.All - framework.Delete,
|
||||||
|
framework.PersistentVolume: framework.All - framework.Delete,
|
||||||
|
framework.PersistentVolumeClaim: framework.All - framework.Delete,
|
||||||
|
framework.StorageClass: framework.All - framework.Delete,
|
||||||
|
},
|
||||||
|
enableInPlacePodVerticalScaling: true,
|
||||||
|
enableSchedulerQueueingHints: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, tt.enableInPlacePodVerticalScaling)
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, tt.enableInPlacePodVerticalScaling)
|
||||||
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, tt.enableSchedulerQueueingHints)
|
||||||
|
|
||||||
_, ctx := ktesting.NewTestContext(t)
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
Loading…
Reference in New Issue
Block a user