diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 3b9b6ebb410..efda92053fe 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -48,7 +48,7 @@ import ( func (sched *Scheduler) onStorageClassAdd(obj interface{}) { start := time.Now() - defer metrics.EventHandlingLatency.WithLabelValues(queue.StorageClassAdd.Label).Observe(metrics.SinceInSeconds(start)) + defer metrics.EventHandlingLatency.WithLabelValues(framework.StorageClassAdd.Label).Observe(metrics.SinceInSeconds(start)) logger := sched.logger sc, ok := obj.(*storagev1.StorageClass) if !ok { @@ -63,13 +63,13 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) { // We don't need to invalidate cached results because results will not be // cached for pod that has unbound immediate PVCs. if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassAdd, nil, sc, nil) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.StorageClassAdd, nil, sc, nil) } } func (sched *Scheduler) addNodeToCache(obj interface{}) { start := time.Now() - defer metrics.EventHandlingLatency.WithLabelValues(queue.NodeAdd.Label).Observe(metrics.SinceInSeconds(start)) + defer metrics.EventHandlingLatency.WithLabelValues(framework.NodeAdd.Label).Observe(metrics.SinceInSeconds(start)) logger := sched.logger node, ok := obj.(*v1.Node) if !ok { @@ -79,7 +79,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) { logger.V(3).Info("Add event for node", "node", klog.KObj(node)) nodeInfo := sched.Cache.AddNode(logger, node) - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.NodeAdd, nil, node, preCheckForNode(nodeInfo)) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, node, preCheckForNode(nodeInfo)) } func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { @@ -98,7 +98,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { logger.V(4).Info("Update event for node", "node", klog.KObj(newNode)) nodeInfo := sched.Cache.UpdateNode(logger, oldNode, newNode) - events := queue.NodeSchedulingPropertiesChange(newNode, oldNode) + events := framework.NodeSchedulingPropertiesChange(newNode, oldNode) // Save the time it takes to update the node in the cache. updatingDuration := metrics.SinceInSeconds(start) @@ -115,7 +115,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { start := time.Now() - defer metrics.EventHandlingLatency.WithLabelValues(queue.NodeDelete.Label).Observe(metrics.SinceInSeconds(start)) + defer metrics.EventHandlingLatency.WithLabelValues(framework.NodeDelete.Label).Observe(metrics.SinceInSeconds(start)) logger := sched.logger var node *v1.Node @@ -142,7 +142,7 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { start := time.Now() - defer metrics.EventHandlingLatency.WithLabelValues(queue.UnscheduledPodAdd.Label).Observe(metrics.SinceInSeconds(start)) + defer metrics.EventHandlingLatency.WithLabelValues(framework.UnscheduledPodAdd.Label).Observe(metrics.SinceInSeconds(start)) logger := sched.logger pod := obj.(*v1.Pod) @@ -154,7 +154,7 @@ func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { start := time.Now() - defer metrics.EventHandlingLatency.WithLabelValues(queue.UnscheduledPodUpdate.Label).Observe(metrics.SinceInSeconds(start)) + defer metrics.EventHandlingLatency.WithLabelValues(framework.UnscheduledPodUpdate.Label).Observe(metrics.SinceInSeconds(start)) logger := sched.logger oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod) // Bypass update event that carries identical objects; otherwise, a duplicated @@ -179,7 +179,7 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { start := time.Now() - defer metrics.EventHandlingLatency.WithLabelValues(queue.UnscheduledPodDelete.Label).Observe(metrics.SinceInSeconds(start)) + defer metrics.EventHandlingLatency.WithLabelValues(framework.UnscheduledPodDelete.Label).Observe(metrics.SinceInSeconds(start)) logger := sched.logger var pod *v1.Pod @@ -213,13 +213,13 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { // removing it from the scheduler cache. In this case, signal a AssignedPodDelete // event to immediately retry some unscheduled Pods. if fwk.RejectWaitingPod(pod.UID) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodDelete, pod, nil, nil) } } func (sched *Scheduler) addPodToCache(obj interface{}) { start := time.Now() - defer metrics.EventHandlingLatency.WithLabelValues(queue.AssignedPodAdd.Label).Observe(metrics.SinceInSeconds(start)) + defer metrics.EventHandlingLatency.WithLabelValues(framework.AssignedPodAdd.Label).Observe(metrics.SinceInSeconds(start)) logger := sched.logger pod, ok := obj.(*v1.Pod) @@ -243,7 +243,7 @@ func (sched *Scheduler) addPodToCache(obj interface{}) { // Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled. // (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.) if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodAdd, nil, pod, nil) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodAdd, nil, pod, nil) } else { sched.SchedulingQueue.AssignedPodAdded(logger, pod) } @@ -251,7 +251,7 @@ func (sched *Scheduler) addPodToCache(obj interface{}) { func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { start := time.Now() - defer metrics.EventHandlingLatency.WithLabelValues(queue.AssignedPodUpdate.Label).Observe(metrics.SinceInSeconds(start)) + defer metrics.EventHandlingLatency.WithLabelValues(framework.AssignedPodUpdate.Label).Observe(metrics.SinceInSeconds(start)) logger := sched.logger oldPod, ok := oldObj.(*v1.Pod) @@ -270,25 +270,28 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod)) } - // SchedulingQueue.AssignedPodUpdated has a problem: - // It internally pre-filters Pods to move to activeQ, - // while taking only in-tree plugins into consideration. - // Consequently, if custom plugins that subscribes Pod/Update events reject Pods, - // those Pods will never be requeued to activeQ by an assigned Pod related events, - // and they may be stuck in unschedulableQ. - // - // Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled. - // (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.) - if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodUpdate, oldPod, newPod, nil) - } else { - sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod) + events := framework.PodSchedulingPropertiesChange(newPod, oldPod) + for _, evt := range events { + // SchedulingQueue.AssignedPodUpdated has a problem: + // It internally pre-filters Pods to move to activeQ, + // while taking only in-tree plugins into consideration. + // Consequently, if custom plugins that subscribes Pod/Update events reject Pods, + // those Pods will never be requeued to activeQ by an assigned Pod related events, + // and they may be stuck in unschedulableQ. + // + // Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled. + // (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.) + if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodUpdate, oldPod, newPod, nil) + } else { + sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod, evt) + } } } func (sched *Scheduler) deletePodFromCache(obj interface{}) { start := time.Now() - defer metrics.EventHandlingLatency.WithLabelValues(queue.AssignedPodDelete.Label).Observe(metrics.SinceInSeconds(start)) + defer metrics.EventHandlingLatency.WithLabelValues(framework.AssignedPodDelete.Label).Observe(metrics.SinceInSeconds(start)) logger := sched.logger var pod *v1.Pod @@ -312,7 +315,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) { logger.Error(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod)) } - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodDelete, pod, nil, nil) } // assignedPod selects pods that are assigned (scheduled and running). @@ -557,8 +560,8 @@ func addAllEventHandlers( cache.ResourceEventHandlerFuncs{ UpdateFunc: func(old, obj interface{}) { start := time.Now() - defer metrics.EventHandlingLatency.WithLabelValues(queue.StorageClassUpdate.Label).Observe(metrics.SinceInSeconds(start)) - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil) + defer metrics.EventHandlingLatency.WithLabelValues(framework.StorageClassUpdate.Label).Observe(metrics.SinceInSeconds(start)) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.StorageClassUpdate, old, obj, nil) }, }, ); err != nil { diff --git a/pkg/scheduler/framework/events.go b/pkg/scheduler/framework/events.go new file mode 100644 index 00000000000..89422a36dbd --- /dev/null +++ b/pkg/scheduler/framework/events.go @@ -0,0 +1,234 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/api/v1/resource" + "k8s.io/kubernetes/pkg/features" +) + +const ( + // PodAdd is the event when a new pod is added to API server. + PodAdd = "PodAdd" + // ScheduleAttemptFailure is the event when a schedule attempt fails. + ScheduleAttemptFailure = "ScheduleAttemptFailure" + // BackoffComplete is the event when a pod finishes backoff. + BackoffComplete = "BackoffComplete" + // ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ + // to activeQ. Usually it's triggered by plugin implementations. + ForceActivate = "ForceActivate" + // PodUpdate is the event when a pod is updated + PodUpdate = "PodUpdate" +) + +var ( + // AssignedPodAdd is the event when an assigned pod is added. + AssignedPodAdd = ClusterEvent{Resource: Pod, ActionType: Add, Label: "AssignedPodAdd"} + // NodeAdd is the event when a new node is added to the cluster. + NodeAdd = ClusterEvent{Resource: Node, ActionType: Add, Label: "NodeAdd"} + // NodeDelete is the event when a node is deleted from the cluster. + NodeDelete = ClusterEvent{Resource: Node, ActionType: Delete, Label: "NodeDelete"} + // AssignedPodUpdate is the event when an assigned pod is updated. + AssignedPodUpdate = ClusterEvent{Resource: Pod, ActionType: Update, Label: "AssignedPodUpdate"} + // UnscheduledPodAdd is the event when an unscheduled pod is added. + UnscheduledPodAdd = ClusterEvent{Resource: Pod, ActionType: Update, Label: "UnschedulablePodAdd"} + // UnscheduledPodUpdate is the event when an unscheduled pod is updated. + UnscheduledPodUpdate = ClusterEvent{Resource: Pod, ActionType: Update, Label: "UnschedulablePodUpdate"} + // UnscheduledPodDelete is the event when an unscheduled pod is deleted. + UnscheduledPodDelete = ClusterEvent{Resource: Pod, ActionType: Update, Label: "UnschedulablePodDelete"} + // assignedPodOtherUpdate is the event when an assigned pod got updated in fields that are not covered by UpdatePodXXX. + assignedPodOtherUpdate = ClusterEvent{Resource: Pod, ActionType: updatePodOther, Label: "AssignedPodUpdate"} + // AssignedPodDelete is the event when an assigned pod is deleted. + AssignedPodDelete = ClusterEvent{Resource: Pod, ActionType: Delete, Label: "AssignedPodDelete"} + // PodRequestScaledDown is the event when a pod's resource request is scaled down. + PodRequestScaledDown = ClusterEvent{Resource: Pod, ActionType: UpdatePodScaleDown, Label: "PodRequestScaledDown"} + // PodLabelChange is the event when a pod's label is changed. + PodLabelChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodLabel, Label: "PodLabelChange"} + // NodeSpecUnschedulableChange is the event when unschedulable node spec is changed. + NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"} + // NodeAllocatableChange is the event when node allocatable is changed. + NodeAllocatableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeAllocatable, Label: "NodeAllocatableChange"} + // NodeLabelChange is the event when node label is changed. + NodeLabelChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeLabel, Label: "NodeLabelChange"} + // NodeAnnotationChange is the event when node annotation is changed. + NodeAnnotationChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeAnnotation, Label: "NodeAnnotationChange"} + // NodeTaintChange is the event when node taint is changed. + NodeTaintChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeTaintChange"} + // NodeConditionChange is the event when node condition is changed. + NodeConditionChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeCondition, Label: "NodeConditionChange"} + // PvAdd is the event when a persistent volume is added in the cluster. + PvAdd = ClusterEvent{Resource: PersistentVolume, ActionType: Add, Label: "PvAdd"} + // PvUpdate is the event when a persistent volume is updated in the cluster. + PvUpdate = ClusterEvent{Resource: PersistentVolume, ActionType: Update, Label: "PvUpdate"} + // PvcAdd is the event when a persistent volume claim is added in the cluster. + PvcAdd = ClusterEvent{Resource: PersistentVolumeClaim, ActionType: Add, Label: "PvcAdd"} + // PvcUpdate is the event when a persistent volume claim is updated in the cluster. + PvcUpdate = ClusterEvent{Resource: PersistentVolumeClaim, ActionType: Update, Label: "PvcUpdate"} + // StorageClassAdd is the event when a StorageClass is added in the cluster. + StorageClassAdd = ClusterEvent{Resource: StorageClass, ActionType: Add, Label: "StorageClassAdd"} + // StorageClassUpdate is the event when a StorageClass is updated in the cluster. + StorageClassUpdate = ClusterEvent{Resource: StorageClass, ActionType: Update, Label: "StorageClassUpdate"} + // CSINodeAdd is the event when a CSI node is added in the cluster. + CSINodeAdd = ClusterEvent{Resource: CSINode, ActionType: Add, Label: "CSINodeAdd"} + // CSINodeUpdate is the event when a CSI node is updated in the cluster. + CSINodeUpdate = ClusterEvent{Resource: CSINode, ActionType: Update, Label: "CSINodeUpdate"} + // CSIDriverAdd is the event when a CSI driver is added in the cluster. + CSIDriverAdd = ClusterEvent{Resource: CSIDriver, ActionType: Add, Label: "CSIDriverAdd"} + // CSIDriverUpdate is the event when a CSI driver is updated in the cluster. + CSIDriverUpdate = ClusterEvent{Resource: CSIDriver, ActionType: Update, Label: "CSIDriverUpdate"} + // CSIStorageCapacityAdd is the event when a CSI storage capacity is added in the cluster. + CSIStorageCapacityAdd = ClusterEvent{Resource: CSIStorageCapacity, ActionType: Add, Label: "CSIStorageCapacityAdd"} + // CSIStorageCapacityUpdate is the event when a CSI storage capacity is updated in the cluster. + CSIStorageCapacityUpdate = ClusterEvent{Resource: CSIStorageCapacity, ActionType: Update, Label: "CSIStorageCapacityUpdate"} + // WildCardEvent semantically matches all resources on all actions. + WildCardEvent = ClusterEvent{Resource: WildCard, ActionType: All, Label: "WildCardEvent"} + // UnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout. + UnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, Label: "UnschedulableTimeout"} +) + +// PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s). +// Once we have other pod update events, we should update here as well. +func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []ClusterEvent) { + podChangeExtracters := []podChangeExtractor{ + extractPodLabelsChange, + extractPodScaleDown, + } + + for _, fn := range podChangeExtracters { + if event := fn(newPod, oldPod); event != nil { + events = append(events, *event) + } + } + + if len(events) == 0 { + // When no specific event is found, we use AssignedPodOtherUpdate, + // which should only trigger plugins registering a general Pod/Update event. + events = append(events, assignedPodOtherUpdate) + } + + return +} + +type podChangeExtractor func(newNode *v1.Pod, oldNode *v1.Pod) *ClusterEvent + +// extractPodScaleDown interprets the update of a pod and returns PodRequestScaledDown event if any pod's resource request(s) is scaled down. +func extractPodScaleDown(newPod, oldPod *v1.Pod) *ClusterEvent { + opt := resource.PodResourcesOptions{ + InPlacePodVerticalScalingEnabled: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling), + } + newPodRequests := resource.PodRequests(newPod, opt) + oldPodRequests := resource.PodRequests(oldPod, opt) + + for rName, oldReq := range oldPodRequests { + newReq, ok := newPodRequests[rName] + if !ok { + // The resource request of rName is removed. + return &PodRequestScaledDown + } + + if oldReq.MilliValue() > newReq.MilliValue() { + // The resource request of rName is scaled down. + return &PodRequestScaledDown + } + } + + return nil +} + +func extractPodLabelsChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent { + if isLabelChanged(newPod.GetLabels(), oldPod.GetLabels()) { + return &PodLabelChange + } + return nil +} + +// NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s). +func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) { + nodeChangeExtracters := []nodeChangeExtractor{ + extractNodeSpecUnschedulableChange, + extractNodeAllocatableChange, + extractNodeLabelsChange, + extractNodeTaintsChange, + extractNodeConditionsChange, + extractNodeAnnotationsChange, + } + + for _, fn := range nodeChangeExtracters { + if event := fn(newNode, oldNode); event != nil { + events = append(events, *event) + } + } + return +} + +type nodeChangeExtractor func(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent + +func extractNodeAllocatableChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent { + if !equality.Semantic.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) { + return &NodeAllocatableChange + } + return nil +} + +func extractNodeLabelsChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent { + if isLabelChanged(newNode.GetLabels(), oldNode.GetLabels()) { + return &NodeLabelChange + } + return nil +} + +func isLabelChanged(newLabels map[string]string, oldLabels map[string]string) bool { + return !equality.Semantic.DeepEqual(newLabels, oldLabels) +} + +func extractNodeTaintsChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent { + if !equality.Semantic.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints) { + return &NodeTaintChange + } + return nil +} + +func extractNodeConditionsChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent { + strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus { + conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions)) + for i := range conditions { + conditionStatuses[conditions[i].Type] = conditions[i].Status + } + return conditionStatuses + } + if !equality.Semantic.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions)) { + return &NodeConditionChange + } + return nil +} + +func extractNodeSpecUnschedulableChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent { + if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable { + return &NodeSpecUnschedulableChange + } + return nil +} + +func extractNodeAnnotationsChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent { + if !equality.Semantic.DeepEqual(oldNode.GetAnnotations(), newNode.GetAnnotations()) { + return &NodeAnnotationChange + } + return nil +} diff --git a/pkg/scheduler/internal/queue/events_test.go b/pkg/scheduler/framework/events_test.go similarity index 72% rename from pkg/scheduler/internal/queue/events_test.go rename to pkg/scheduler/framework/events_test.go index e01160baa2f..2e0bb110663 100644 --- a/pkg/scheduler/internal/queue/events_test.go +++ b/pkg/scheduler/framework/events_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package queue +package framework import ( "reflect" @@ -24,7 +24,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/pkg/scheduler/framework" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -188,7 +187,7 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) { name string newNode *v1.Node oldNode *v1.Node - wantEvents []framework.ClusterEvent + wantEvents []ClusterEvent }{ { name: "no specific changed applied", @@ -200,7 +199,7 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) { name: "only node spec unavailable changed", newNode: st.MakeNode().Unschedulable(false).Obj(), oldNode: st.MakeNode().Unschedulable(true).Obj(), - wantEvents: []framework.ClusterEvent{NodeSpecUnschedulableChange}, + wantEvents: []ClusterEvent{NodeSpecUnschedulableChange}, }, { name: "only node allocatable changed", @@ -214,13 +213,13 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) { v1.ResourceMemory: "100m", v1.ResourceName("example.com/foo"): "2"}, ).Obj(), - wantEvents: []framework.ClusterEvent{NodeAllocatableChange}, + wantEvents: []ClusterEvent{NodeAllocatableChange}, }, { name: "only node label changed", newNode: st.MakeNode().Label("foo", "bar").Obj(), oldNode: st.MakeNode().Label("foo", "fuz").Obj(), - wantEvents: []framework.ClusterEvent{NodeLabelChange}, + wantEvents: []ClusterEvent{NodeLabelChange}, }, { name: "only node taint changed", @@ -230,13 +229,13 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) { oldNode: st.MakeNode().Taints([]v1.Taint{ {Key: v1.TaintNodeUnschedulable, Value: "foo", Effect: v1.TaintEffectNoSchedule}, }).Obj(), - wantEvents: []framework.ClusterEvent{NodeTaintChange}, + wantEvents: []ClusterEvent{NodeTaintChange}, }, { name: "only node annotation changed", newNode: st.MakeNode().Annotation("foo", "bar").Obj(), oldNode: st.MakeNode().Annotation("foo", "fuz").Obj(), - wantEvents: []framework.ClusterEvent{NodeAnnotationChange}, + wantEvents: []ClusterEvent{NodeAnnotationChange}, }, { name: "only node condition changed", @@ -247,7 +246,7 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) { "Ready", "Ready", ).Obj(), - wantEvents: []framework.ClusterEvent{NodeConditionChange}, + wantEvents: []ClusterEvent{NodeConditionChange}, }, { name: "both node label and node taint changed", @@ -259,7 +258,7 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) { oldNode: st.MakeNode().Taints([]v1.Taint{ {Key: v1.TaintNodeUnschedulable, Value: "foo", Effect: v1.TaintEffectNoSchedule}, }).Obj(), - wantEvents: []framework.ClusterEvent{NodeLabelChange, NodeTaintChange}, + wantEvents: []ClusterEvent{NodeLabelChange, NodeTaintChange}, }, } @@ -270,3 +269,114 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) { } } } + +func Test_podSchedulingPropertiesChange(t *testing.T) { + podWithBigRequest := &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "app", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("101m")}, + }, + }, + }, + }, + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "app", + AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("101m")}, + }, + }, + }, + } + podWithSmallRequestAndLabel := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo": "bar"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "app", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")}, + }, + }, + }, + }, + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "app", + AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")}, + }, + }, + }, + } + podWithSmallRequest := &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "app", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")}, + }, + }, + }, + }, + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "app", + AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")}, + }, + }, + }, + } + tests := []struct { + name string + newPod *v1.Pod + oldPod *v1.Pod + want []ClusterEvent + }{ + { + name: "only label is updated", + newPod: st.MakePod().Label("foo", "bar").Obj(), + oldPod: st.MakePod().Label("foo", "bar2").Obj(), + want: []ClusterEvent{PodLabelChange}, + }, + { + name: "pod's resource request is scaled down", + oldPod: podWithBigRequest, + newPod: podWithSmallRequest, + want: []ClusterEvent{PodRequestScaledDown}, + }, + { + name: "pod's resource request is scaled up", + oldPod: podWithSmallRequest, + newPod: podWithBigRequest, + want: []ClusterEvent{assignedPodOtherUpdate}, + }, + { + name: "both pod's resource request and label are updated", + oldPod: podWithBigRequest, + newPod: podWithSmallRequestAndLabel, + want: []ClusterEvent{PodLabelChange, PodRequestScaledDown}, + }, + { + name: "untracked properties of pod is updated", + newPod: st.MakePod().Annotation("foo", "bar").Obj(), + oldPod: st.MakePod().Annotation("foo", "bar2").Obj(), + want: []ClusterEvent{assignedPodOtherUpdate}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := PodSchedulingPropertiesChange(tt.newPod, tt.oldPod) + if diff := cmp.Diff(tt.want, got); diff != "" { + t.Errorf("unexpected event is returned from podSchedulingPropertiesChange (-want, +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go index 76ff0bca5f1..77aacab2cf6 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go @@ -62,7 +62,7 @@ func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.Clu // All ActionType includes the following events: // - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints, // deleting an existing Pod may make it schedulable. - // - Update. Updating on an existing Pod's labels (e.g., removal) may make + // - UpdatePodLabel. Updating on an existing Pod's labels (e.g., removal) may make // an unschedulable Pod schedulable. // - Add. An unschedulable Pod may fail due to violating pod-affinity constraints, // adding an assigned Pod may make it schedulable. @@ -75,7 +75,7 @@ func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.Clu // As a workaround, we add UpdateNodeTaint event to catch the case. // We can remove UpdateNodeTaint when we remove the preCheck feature. // See: https://github.com/kubernetes/kubernetes/issues/110175 - {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.UpdatePodLabel | framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodChange}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, }, nil } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index 7b8c8ec174f..d62fab1937b 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -250,9 +250,9 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error func (f *Fit) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { podActionType := framework.Delete if f.enableInPlacePodVerticalScaling { - // If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered + // If InPlacePodVerticalScaling (KEP 1287) is enabled, then UpdatePodScaleDown event should be registered // for this plugin since a Pod update may free up resources that make other Pods schedulable. - podActionType |= framework.Update + podActionType |= framework.UpdatePodScaleDown } return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange}, @@ -296,7 +296,7 @@ func (f *Fit) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldOb return framework.QueueSkip, nil } - logger.V(5).Info("the max request resources of another scheduled pod got reduced and it may make the unscheduled pod schedulable", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) + logger.V(5).Info("another scheduled pod or the target pod itself got scaled down, and it may make the unscheduled pod schedulable", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) return framework.Queue, nil } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index d0cdddfbb6a..263e661abde 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -1095,7 +1095,7 @@ func TestEventsToRegister(t *testing.T) { "Register events with InPlacePodVerticalScaling feature enabled", true, []framework.ClusterEventWithHint{ - {Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.Update | framework.Delete}}, + {Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.UpdatePodScaleDown | framework.Delete}}, {Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.Update}}, }, }, diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go index cafec02b0aa..7bcfbd7f3b0 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go @@ -139,11 +139,11 @@ func (pl *PodTopologySpread) EventsToRegister(_ context.Context) ([]framework.Cl // All ActionType includes the following events: // - Add. An unschedulable Pod may fail due to violating topology spread constraints, // adding an assigned Pod may make it schedulable. - // - Update. Updating on an existing Pod's labels (e.g., removal) may make + // - UpdatePodLabel. Updating on an existing Pod's labels (e.g., removal) may make // an unschedulable Pod schedulable. // - Delete. An unschedulable Pod may fail due to violating an existing Pod's topology spread constraints, // deleting an existing Pod may make it schedulable. - {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.UpdatePodLabel | framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodChange}, // Node add|delete|update maybe lead an topology key changed, // and make these pod in scheduling schedulable or unschedulable. // diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 43cc16e15df..372bcdde0da 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -46,25 +46,48 @@ type ActionType int64 // Constants for ActionTypes. const ( - Add ActionType = 1 << iota // 1 - Delete // 10 - // UpdateNodeXYZ is only applicable for Node events. - UpdateNodeAllocatable // 100 - UpdateNodeLabel // 1000 - UpdateNodeTaint // 10000 - UpdateNodeCondition // 100000 - UpdateNodeAnnotation // 1000000 + Add ActionType = 1 << iota + Delete - All ActionType = 1< 0 { - p.movePodsToActiveOrBackoffQueue(logger, podsToMove, UnschedulableTimeout, nil, nil) + p.movePodsToActiveOrBackoffQueue(logger, podsToMove, framework.UnschedulableTimeout, nil, nil) } } @@ -1068,7 +1069,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error // We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue. // See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context. p.inFlightEvents.PushBack(&clusterEvent{ - event: UnscheduledPodUpdate, + event: framework.UnscheduledPodUpdate, oldObj: oldPod, newObj: newPod, }) @@ -1104,15 +1105,21 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error // whether the update may make the pods schedulable. // Plugins have to implement a QueueingHint for Pod/Update event // if the rejection from them could be resolved by updating unscheduled Pods itself. - hint := p.isPodWorthRequeuing(logger, pInfo, UnscheduledPodUpdate, oldPod, newPod) - queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, UnscheduledPodUpdate.Label) - if queue != unschedulablePods { - logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", PodUpdate, "queue", queue) - p.unschedulablePods.delete(usPodInfo.Pod, gated) - } - if queue == activeQ { - p.cond.Broadcast() + + events := framework.PodSchedulingPropertiesChange(newPod, oldPod) + for _, evt := range events { + hint := p.isPodWorthRequeuing(logger, pInfo, evt, oldPod, newPod) + queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, framework.UnscheduledPodUpdate.Label) + if queue != unschedulablePods { + logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", framework.PodUpdate, "queue", queue) + p.unschedulablePods.delete(usPodInfo.Pod, gated) + } + if queue == activeQ { + p.cond.Broadcast() + break + } } + return nil } if isPodUpdated(oldPod, newPod) { @@ -1122,11 +1129,11 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error return err } p.unschedulablePods.delete(usPodInfo.Pod, gated) - logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", backoffQ) + logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.PodUpdate, "queue", backoffQ) return nil } - if added, err := p.moveToActiveQ(logger, pInfo, BackoffComplete); !added { + if added, err := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); !added { return err } p.cond.Broadcast() @@ -1139,7 +1146,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error } // If pod is not in any of the queues, we put it in the active queue. pInfo := p.newQueuedPodInfo(newPod) - if added, err := p.moveToActiveQ(logger, pInfo, PodUpdate); !added { + if added, err := p.moveToActiveQ(logger, pInfo, framework.PodUpdate); !added { return err } p.cond.Broadcast() @@ -1172,36 +1179,22 @@ func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) { // Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm // because Pod related events shouldn't make Pods that rejected by single-node scheduling requirement schedulable. - p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, pod), AssignedPodAdd, nil, pod) + p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, pod), framework.AssignedPodAdd, nil, pod) p.lock.Unlock() } -// isPodResourcesResizedDown returns true if a pod CPU and/or memory resize request has been -// admitted by kubelet, is 'InProgress', and results in a net sizing down of updated resources. -// It returns false if either CPU or memory resource is net resized up, or if no resize is in progress. -func isPodResourcesResizedDown(pod *v1.Pod) bool { - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { - // TODO(vinaykul,wangchen615,InPlacePodVerticalScaling): Fix this to determine when a - // pod is truly resized down (might need oldPod if we cannot determine from Status alone) - if pod.Status.Resize == v1.PodResizeStatusInProgress { - return true - } - } - return false -} - // AssignedPodUpdated is called when a bound pod is updated. Change of labels // may make pending pods with matching affinity terms schedulable. -func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) { +func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent) { p.lock.Lock() - if isPodResourcesResizedDown(newPod) { + if event.Resource == framework.Pod && event.ActionType&framework.UpdatePodScaleDown != 0 { // In this case, we don't want to pre-filter Pods by getUnschedulablePodsWithCrossTopologyTerm // because Pod related events may make Pods that were rejected by NodeResourceFit schedulable. - p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil) + p.moveAllToActiveOrBackoffQueue(logger, framework.AssignedPodUpdate, oldPod, newPod, nil) } else { // Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm // because Pod related events only make Pods rejected by cross topology term schedulable. - p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod) + p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, newPod), event, oldPod, newPod) } p.lock.Unlock() } @@ -1275,7 +1268,7 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra } p.unschedulablePods.addOrUpdate(pInfo) - metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() + metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", framework.ScheduleAttemptFailure).Inc() return unschedulablePods } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 32549f1aad7..07c321bcb38 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -225,13 +225,13 @@ func Test_InFlightPods(t *testing.T) { // This Pod shouldn't be added to inFlightPods because SchedulingQueueHint is disabled. {podPopped: pod}, // This event shouldn't be added to inFlightEvents because SchedulingQueueHint is disabled. - {eventHappens: &PvAdd}, + {eventHappens: &framework.PvAdd}, }, wantInFlightPods: nil, wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { - PvAdd: { + framework.PvAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, @@ -246,18 +246,18 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod}, actions: []action{ // This won't be added to inFlightEvents because no inFlightPods at this point. - {eventHappens: &PvcAdd}, + {eventHappens: &framework.PvcAdd}, {podPopped: pod}, // This gets added for the pod. - {eventHappens: &PvAdd}, - // This doesn't get added because no plugin is interested in PvUpdate. - {eventHappens: &PvUpdate}, + {eventHappens: &framework.PvAdd}, + // This doesn't get added because no plugin is interested in framework.PvUpdate. + {eventHappens: &framework.PvUpdate}, }, wantInFlightPods: []*v1.Pod{pod}, - wantInFlightEvents: []interface{}{pod, PvAdd}, + wantInFlightEvents: []interface{}{pod, framework.PvAdd}, queueingHintMap: QueueingHintMapPerProfile{ "": { - PvAdd: { + framework.PvAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, @@ -272,32 +272,32 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod, pod2}, actions: []action{ // This won't be added to inFlightEvents because no inFlightPods at this point. - {eventHappens: &PvcAdd}, + {eventHappens: &framework.PvcAdd}, {podPopped: pod}, - {eventHappens: &PvAdd}, + {eventHappens: &framework.PvAdd}, {podPopped: pod2}, - {eventHappens: &NodeAdd}, + {eventHappens: &framework.NodeAdd}, // This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. {podEnqueued: newQueuedPodInfoForLookup(pod)}, }, wantBackoffQPodNames: []string{"targetpod"}, wantInFlightPods: []*v1.Pod{pod2}, // only pod2 is registered because pod is already enqueued back. - wantInFlightEvents: []interface{}{pod2, NodeAdd}, + wantInFlightEvents: []interface{}{pod2, framework.NodeAdd}, queueingHintMap: QueueingHintMapPerProfile{ "": { - PvAdd: { + framework.PvAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, }, }, - NodeAdd: { + framework.NodeAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, }, }, - PvcAdd: { + framework.PvcAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, @@ -312,14 +312,14 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod, pod2}, actions: []action{ // This won't be added to inFlightEvents because no inFlightPods at this point. - {eventHappens: &PvcAdd}, + {eventHappens: &framework.PvcAdd}, {podPopped: pod}, - {eventHappens: &PvAdd}, + {eventHappens: &framework.PvAdd}, {podPopped: pod2}, - {eventHappens: &NodeAdd}, + {eventHappens: &framework.NodeAdd}, // This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. {podEnqueued: newQueuedPodInfoForLookup(pod)}, - {eventHappens: &CSINodeUpdate}, + {eventHappens: &framework.CSINodeUpdate}, // This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. {podEnqueued: newQueuedPodInfoForLookup(pod2)}, }, @@ -327,25 +327,25 @@ func Test_InFlightPods(t *testing.T) { wantInFlightPods: nil, // empty queueingHintMap: QueueingHintMapPerProfile{ "": { - PvAdd: { + framework.PvAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, }, }, - NodeAdd: { + framework.NodeAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, }, }, - PvcAdd: { + framework.PvcAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, }, }, - CSINodeUpdate: { + framework.CSINodeUpdate: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, @@ -360,34 +360,34 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod, pod2, pod3}, actions: []action{ // This won't be added to inFlightEvents because no inFlightPods at this point. - {eventHappens: &PvcAdd}, + {eventHappens: &framework.PvcAdd}, {podPopped: pod}, - {eventHappens: &PvAdd}, + {eventHappens: &framework.PvAdd}, {podPopped: pod2}, - {eventHappens: &NodeAdd}, + {eventHappens: &framework.NodeAdd}, // This Pod won't be requeued again. {podPopped: pod3}, - {eventHappens: &AssignedPodAdd}, + {eventHappens: &framework.AssignedPodAdd}, {podEnqueued: newQueuedPodInfoForLookup(pod2)}, }, wantBackoffQPodNames: []string{"targetpod2"}, wantInFlightPods: []*v1.Pod{pod, pod3}, - wantInFlightEvents: []interface{}{pod, PvAdd, NodeAdd, pod3, AssignedPodAdd}, + wantInFlightEvents: []interface{}{pod, framework.PvAdd, framework.NodeAdd, pod3, framework.AssignedPodAdd}, queueingHintMap: QueueingHintMapPerProfile{ "": { - PvAdd: { + framework.PvAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, }, }, - NodeAdd: { + framework.NodeAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, }, }, - AssignedPodAdd: { + framework.AssignedPodAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, @@ -401,7 +401,7 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod}, actions: []action{ {podPopped: pod}, - {eventHappens: &AssignedPodAdd}, + {eventHappens: &framework.AssignedPodAdd}, {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, }, wantBackoffQPodNames: []string{"targetpod"}, @@ -411,7 +411,7 @@ func Test_InFlightPods(t *testing.T) { "": { // This hint fn tells that this event doesn't make a Pod schedulable. // However, this QueueingHintFn will be ignored actually because SchedulingQueueHint is disabled. - AssignedPodAdd: { + framework.AssignedPodAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnSkip, @@ -425,9 +425,9 @@ func Test_InFlightPods(t *testing.T) { isSchedulingQueueHintEnabled: true, initialPods: []*v1.Pod{pod}, actions: []action{ - {eventHappens: &WildCardEvent}, + {eventHappens: &framework.WildCardEvent}, {podPopped: pod}, - {eventHappens: &AssignedPodAdd}, + {eventHappens: &framework.AssignedPodAdd}, // This Pod won't be requeued to activeQ/backoffQ because fooPlugin1 returns QueueSkip. {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, }, @@ -436,9 +436,9 @@ func Test_InFlightPods(t *testing.T) { wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { - // fooPlugin1 has a queueing hint function for AssignedPodAdd, + // fooPlugin1 has a queueing hint function for framework.AssignedPodAdd, // but hint fn tells that this event doesn't make a Pod scheudlable. - AssignedPodAdd: { + framework.AssignedPodAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnSkip, @@ -453,7 +453,7 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod}, actions: []action{ {podPopped: pod}, - {eventHappens: &AssignedPodAdd}, + {eventHappens: &framework.AssignedPodAdd}, {podEnqueued: newQueuedPodInfoForLookup(pod)}, }, wantBackoffQPodNames: []string{"targetpod"}, @@ -462,7 +462,7 @@ func Test_InFlightPods(t *testing.T) { queueingHintMap: QueueingHintMapPerProfile{ "": { // It will be ignored because no failed plugin. - AssignedPodAdd: { + framework.AssignedPodAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, @@ -477,7 +477,7 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod}, actions: []action{ {podPopped: pod}, - {eventHappens: &NodeAdd}, + {eventHappens: &framework.NodeAdd}, {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, }, wantUnschedPodPoolPodNames: []string{"targetpod"}, @@ -485,10 +485,10 @@ func Test_InFlightPods(t *testing.T) { wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { - // fooPlugin1 has no queueing hint function for NodeAdd. - AssignedPodAdd: { + // fooPlugin1 has no queueing hint function for framework.NodeAdd. + framework.AssignedPodAdd: { { - // It will be ignored because the event is not NodeAdd. + // It will be ignored because the event is not framework.NodeAdd. PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, }, @@ -502,7 +502,7 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod}, actions: []action{ {podPopped: pod}, - {eventHappens: &AssignedPodAdd}, + {eventHappens: &framework.AssignedPodAdd}, {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, }, wantUnschedPodPoolPodNames: []string{"targetpod"}, @@ -510,9 +510,9 @@ func Test_InFlightPods(t *testing.T) { wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { - // fooPlugin1 has a queueing hint function for AssignedPodAdd, + // fooPlugin1 has a queueing hint function for framework.AssignedPodAdd, // but hint fn tells that this event doesn't make a Pod scheudlable. - AssignedPodAdd: { + framework.AssignedPodAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnSkip, @@ -527,7 +527,7 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod}, actions: []action{ {podPopped: pod}, - {eventHappens: &AssignedPodAdd}, + {eventHappens: &framework.AssignedPodAdd}, {podEnqueued: &framework.QueuedPodInfo{ PodInfo: mustNewPodInfo(pod), UnschedulablePlugins: sets.New("fooPlugin2", "fooPlugin3"), @@ -539,7 +539,7 @@ func Test_InFlightPods(t *testing.T) { wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { - AssignedPodAdd: { + framework.AssignedPodAdd: { { PluginName: "fooPlugin3", QueueingHintFn: queueHintReturnSkip, @@ -563,7 +563,7 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod}, actions: []action{ {podPopped: pod}, - {eventHappens: &AssignedPodAdd}, + {eventHappens: &framework.AssignedPodAdd}, {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2")}, }, wantBackoffQPodNames: []string{"targetpod"}, @@ -571,7 +571,7 @@ func Test_InFlightPods(t *testing.T) { wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { - AssignedPodAdd: { + framework.AssignedPodAdd: { { // it will be ignored because the hint fn returns Skip that is weaker than queueHintReturnQueue from fooPlugin1. PluginName: "fooPlugin2", @@ -592,9 +592,9 @@ func Test_InFlightPods(t *testing.T) { initialPods: []*v1.Pod{pod, pod2}, actions: []action{ {callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod) }}, - {eventHappens: &NodeAdd}, + {eventHappens: &framework.NodeAdd}, {callback: func(t *testing.T, q *PriorityQueue) { poppedPod2 = popPod(t, logger, q, pod2) }}, - {eventHappens: &AssignedPodAdd}, + {eventHappens: &framework.AssignedPodAdd}, {callback: func(t *testing.T, q *PriorityQueue) { logger, _ := ktesting.NewTestContext(t) err := q.AddUnschedulableIfNotPresent(logger, poppedPod, q.SchedulingCycle()) @@ -617,7 +617,7 @@ func Test_InFlightPods(t *testing.T) { wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { - AssignedPodAdd: { + framework.AssignedPodAdd: { { // it will be ignored because the hint fn returns QueueSkip that is weaker than queueHintReturnQueueImmediately from fooPlugin1. PluginName: "fooPlugin3", @@ -653,7 +653,7 @@ func Test_InFlightPods(t *testing.T) { t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err) } }}, - {eventHappens: &PvAdd}, // Active again. + {eventHappens: &framework.PvAdd}, // Active again. {callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod) if len(poppedPod.UnschedulablePlugins) > 0 { @@ -670,7 +670,7 @@ func Test_InFlightPods(t *testing.T) { }, queueingHintMap: QueueingHintMapPerProfile{ "": { - PvAdd: { + framework.PvAdd: { { // The hint fn tells that this event makes a Pod scheudlable immediately. PluginName: "fooPlugin1", @@ -808,7 +808,7 @@ func TestPop(t *testing.T) { pod := st.MakePod().Name("targetpod").UID("pod1").Obj() queueingHintMap := QueueingHintMapPerProfile{ "": { - PvAdd: { + framework.PvAdd: { { // The hint fn tells that this event makes a Pod scheudlable. PluginName: "fooPlugin1", @@ -836,7 +836,7 @@ func TestPop(t *testing.T) { } // Activate it again. - q.MoveAllToActiveOrBackoffQueue(logger, PvAdd, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.PvAdd, nil, nil, nil) // Now check result of Pop. poppedPod = popPod(t, logger, q, pod) @@ -905,7 +905,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { } // move all pods to active queue when we were trying to schedule them - q.MoveAllToActiveOrBackoffQueue(logger, WildCardEvent, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.WildCardEvent, nil, nil, nil) oldCycle := q.SchedulingCycle() firstPod, _ := q.Pop(logger) @@ -969,7 +969,7 @@ func TestPriorityQueue_Update(t *testing.T) { skipPlugin := "skipPlugin" queueingHintMap := QueueingHintMapPerProfile{ "": { - UnscheduledPodUpdate: { + framework.UnscheduledPodUpdate: { { PluginName: queuePlugin, QueueingHintFn: queueHintReturnQueue, @@ -1172,7 +1172,7 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, true) m := makeEmptyQueueingHintMapPerProfile() // fakePlugin could change its scheduling result by any updates in Pods. - m[""][UnscheduledPodUpdate] = []*QueueingHintFunction{ + m[""][framework.UnscheduledPodUpdate] = []*QueueingHintFunction{ { PluginName: "fakePlugin", QueueingHintFn: queueHintReturnQueue, @@ -1393,7 +1393,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) { m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins} q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m), WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60)) - got, _ := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), PodAdd) + got, _ := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), framework.PodAdd) if got != tt.wantSuccess { t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got) } @@ -1420,11 +1420,11 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { }{ { name: "baseline", - moveEvent: UnschedulableTimeout, + moveEvent: framework.UnschedulableTimeout, }, { name: "worst", - moveEvent: NodeAdd, + moveEvent: framework.NodeAdd, }, { name: "random", @@ -1438,24 +1438,24 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { } events := []framework.ClusterEvent{ - NodeAdd, - NodeTaintChange, - NodeAllocatableChange, - NodeConditionChange, - NodeLabelChange, - NodeAnnotationChange, - PvcAdd, - PvcUpdate, - PvAdd, - PvUpdate, - StorageClassAdd, - StorageClassUpdate, - CSINodeAdd, - CSINodeUpdate, - CSIDriverAdd, - CSIDriverUpdate, - CSIStorageCapacityAdd, - CSIStorageCapacityUpdate, + framework.NodeAdd, + framework.NodeTaintChange, + framework.NodeAllocatableChange, + framework.NodeConditionChange, + framework.NodeLabelChange, + framework.NodeAnnotationChange, + framework.PvcAdd, + framework.PvcUpdate, + framework.PvAdd, + framework.PvUpdate, + framework.StorageClassAdd, + framework.StorageClassUpdate, + framework.CSINodeAdd, + framework.CSINodeUpdate, + framework.CSIDriverAdd, + framework.CSIDriverUpdate, + framework.CSIStorageCapacityAdd, + framework.CSIStorageCapacityUpdate, } pluginNum := 20 @@ -1474,7 +1474,7 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { c := testingclock.NewFakeClock(time.Now()) m := makeEmptyQueueingHintMapPerProfile() - // - All plugins registered for events[0], which is NodeAdd. + // - All plugins registered for events[0], which is framework.NodeAdd. // - 1/2 of plugins registered for events[1] // - 1/3 of plugins registered for events[2] // - ... @@ -1592,7 +1592,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. t.Run(test.name, func(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) m := makeEmptyQueueingHintMapPerProfile() - m[""][NodeAdd] = []*QueueingHintFunction{ + m[""][framework.NodeAdd] = []*QueueingHintFunction{ { PluginName: "foo", QueueingHintFn: test.hint, @@ -1611,7 +1611,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. } cl.Step(test.duration) - q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil) if q.podBackoffQ.Len() == 0 && test.expectedQ == backoffQ { t.Fatalf("expected pod to be queued to backoffQ, but it was not") @@ -1636,7 +1636,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { m := makeEmptyQueueingHintMapPerProfile() featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, true) - m[""][NodeAdd] = []*QueueingHintFunction{ + m[""][framework.NodeAdd] = []*QueueingHintFunction{ { PluginName: "fooPlugin", QueueingHintFn: queueHintReturnQueue, @@ -1691,7 +1691,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // This NodeAdd event moves unschedulablePodInfo and highPriorityPodInfo to the backoffQ, // because of the queueing hint function registered for NodeAdd/fooPlugin. - q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil) q.Add(logger, medPriorityPodInfo.Pod) if q.activeQ.Len() != 1 { t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len()) @@ -1763,7 +1763,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { // and the pods will be moved into activeQ. c.Step(q.podInitialBackoffDuration) q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ. - q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil) if q.activeQ.Len() != 4 { t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len()) } @@ -1784,7 +1784,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi defer cancel() m := makeEmptyQueueingHintMapPerProfile() featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, false) - m[""][NodeAdd] = []*QueueingHintFunction{ + m[""][framework.NodeAdd] = []*QueueingHintFunction{ { PluginName: "fooPlugin", QueueingHintFn: queueHintReturnQueue, @@ -1820,7 +1820,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi } // This NodeAdd event moves unschedulablePodInfo and highPriorityPodInfo to the backoffQ, // because of the queueing hint function registered for NodeAdd/fooPlugin. - q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil) if q.activeQ.Len() != 1 { t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len()) } @@ -1876,7 +1876,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi // and the pods will be moved into activeQ. c.Step(q.podInitialBackoffDuration) q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ. - q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil) if q.activeQ.Len() != 4 { t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len()) } @@ -1973,7 +1973,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { c := testingclock.NewFakeClock(time.Now()) m := makeEmptyQueueingHintMapPerProfile() - m[""][AssignedPodAdd] = []*QueueingHintFunction{ + m[""][framework.AssignedPodAdd] = []*QueueingHintFunction{ { PluginName: "fakePlugin", QueueingHintFn: queueHintReturnQueue, @@ -2134,7 +2134,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { t.Errorf("Unexpected pending pods summary: want %v, but got %v.", wantSummary, gotSummary) } // Move all to active queue. We should still see the same set of pods. - q.MoveAllToActiveOrBackoffQueue(logger, WildCardEvent, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.WildCardEvent, nil, nil, nil) gotPods, gotSummary = q.PendingPods() if diff := cmp.Diff(expectedSet, makeSet(gotPods)); diff != "" { t.Errorf("Unexpected list of pending Pods (-want, +got):\n%s", diff) @@ -2420,7 +2420,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { } c.Step(DefaultPodInitialBackoffDuration) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil) // Simulation is over. Now let's pop all pods. The pod popped first should be // the last one we pop here. for i := 0; i < 5; i++ { @@ -2471,7 +2471,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Move clock to make the unschedulable pods complete backoff. c.Step(DefaultPodInitialBackoffDuration + time.Second) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil) // Simulate a pod being popped by the scheduler, // At this time, unschedulable pod should be popped. @@ -2504,7 +2504,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Move clock to make the unschedulable pods complete backoff. c.Step(DefaultPodInitialBackoffDuration + time.Second) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil) // At this time, newerPod should be popped // because it is the oldest tried pod. @@ -2551,7 +2551,7 @@ func TestHighPriorityBackoff(t *testing.T) { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue(logger, WildCardEvent, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.WildCardEvent, nil, nil, nil) p, err = q.Pop(logger) if err != nil { @@ -2567,7 +2567,7 @@ func TestHighPriorityBackoff(t *testing.T) { func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) { c := testingclock.NewFakeClock(time.Now()) m := makeEmptyQueueingHintMapPerProfile() - m[""][NodeAdd] = []*QueueingHintFunction{ + m[""][framework.NodeAdd] = []*QueueingHintFunction{ { PluginName: "fakePlugin", QueueingHintFn: queueHintReturnQueue, @@ -2784,7 +2784,7 @@ var ( queue.podBackoffQ.Add(pInfo) } moveAllToActiveOrBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { - queue.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil) + queue.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil) } flushBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.clock.(*testingclock.FakeClock).Step(2 * time.Second) @@ -3419,7 +3419,7 @@ func TestBackOffFlow(t *testing.T) { } // An event happens. - q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil) + q.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil) if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok { t.Errorf("pod %v is not in the backoff queue", podID) @@ -3468,20 +3468,20 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { { name: "nil PreEnqueueCheck", podInfos: podInfos, - event: WildCardEvent, + event: framework.WildCardEvent, want: []string{"p0", "p1", "p2", "p3", "p4"}, }, { name: "move Pods with priority greater than 2", podInfos: podInfos, - event: WildCardEvent, + event: framework.WildCardEvent, preEnqueueCheck: func(pod *v1.Pod) bool { return *pod.Spec.Priority >= 2 }, want: []string{"p2", "p3", "p4"}, }, { name: "move Pods with even priority and greater than 2", podInfos: podInfos, - event: WildCardEvent, + event: framework.WildCardEvent, preEnqueueCheck: func(pod *v1.Pod) bool { return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority >= 2 }, @@ -3490,7 +3490,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { { name: "move Pods with even and negative priority", podInfos: podInfos, - event: WildCardEvent, + event: framework.WildCardEvent, preEnqueueCheck: func(pod *v1.Pod) bool { return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority < 0 }, @@ -3498,7 +3498,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { { name: "preCheck isn't called if the event is not interested by any plugins", podInfos: podInfos, - event: PvAdd, // No plugin is interested in this event. + event: framework.PvAdd, // No plugin is interested in this event. preEnqueueCheck: func(pod *v1.Pod) bool { panic("preCheck shouldn't be called") }, @@ -3653,17 +3653,17 @@ func Test_isPodWorthRequeuing(t *testing.T) { UnschedulablePlugins: sets.New("fooPlugin1"), PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), }, - event: NodeAdd, + event: framework.NodeAdd, oldObj: nil, newObj: st.MakeNode().Obj(), expected: queueSkip, expectedExecutionCount: 0, queueingHintMap: QueueingHintMapPerProfile{ "": { - // no queueing hint function for NodeAdd. - AssignedPodAdd: { + // no queueing hint function for framework.NodeAdd. + framework.AssignedPodAdd: { { - // It will be ignored because the event is not NodeAdd. + // It will be ignored because the event is not framework.NodeAdd. PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, }, @@ -3677,14 +3677,14 @@ func Test_isPodWorthRequeuing(t *testing.T) { UnschedulablePlugins: sets.New("fooPlugin1"), PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), }, - event: NodeAdd, + event: framework.NodeAdd, oldObj: nil, newObj: st.MakeNode().Obj(), expected: queueAfterBackoff, expectedExecutionCount: 1, queueingHintMap: QueueingHintMapPerProfile{ "": { - NodeAdd: { + framework.NodeAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnErr, @@ -3699,7 +3699,7 @@ func Test_isPodWorthRequeuing(t *testing.T) { UnschedulablePlugins: sets.New("fooPlugin1"), PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), }, - event: WildCardEvent, + event: framework.WildCardEvent, oldObj: nil, newObj: st.MakeNode().Obj(), expected: queueAfterBackoff, @@ -3713,14 +3713,14 @@ func Test_isPodWorthRequeuing(t *testing.T) { PendingPlugins: sets.New("fooPlugin2"), PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), }, - event: NodeAdd, + event: framework.NodeAdd, oldObj: nil, newObj: st.MakeNode().Node, expected: queueImmediately, expectedExecutionCount: 2, queueingHintMap: QueueingHintMapPerProfile{ "": { - NodeAdd: { + framework.NodeAdd: { { PluginName: "fooPlugin1", // It returns Queue and it's interpreted as queueAfterBackoff. @@ -3751,14 +3751,14 @@ func Test_isPodWorthRequeuing(t *testing.T) { UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2"), PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), }, - event: NodeAdd, + event: framework.NodeAdd, oldObj: nil, newObj: st.MakeNode().Obj(), expected: queueAfterBackoff, expectedExecutionCount: 2, queueingHintMap: QueueingHintMapPerProfile{ "": { - NodeAdd: { + framework.NodeAdd: { { // Skip will be ignored PluginName: "fooPlugin1", @@ -3779,14 +3779,14 @@ func Test_isPodWorthRequeuing(t *testing.T) { UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2"), PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), }, - event: NodeAdd, + event: framework.NodeAdd, oldObj: nil, newObj: st.MakeNode().Node, expected: queueSkip, expectedExecutionCount: 2, queueingHintMap: QueueingHintMapPerProfile{ "": { - NodeAdd: { + framework.NodeAdd: { { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnSkip, @@ -3834,7 +3834,7 @@ func Test_isPodWorthRequeuing(t *testing.T) { QueueingHintFn: queueHintReturnQueue, }, }, - NodeAdd: { // not executed because NodeAdd is unrelated. + framework.NodeAdd: { // not executed because NodeAdd is unrelated. { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueue, diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 2ac55fc6b60..b842366eb20 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -38,7 +38,6 @@ import ( "k8s.io/kubernetes/pkg/apis/core/validation" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" - internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" utiltrace "k8s.io/utils/trace" @@ -355,11 +354,11 @@ func (sched *Scheduler) handleBindingCycleError( // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would // add this event to in-flight events and thus move the assumed pod to backoffQ anyways if the plugins don't have appropriate QueueingHint. if status.IsRejected() { - defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool { + defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool { return assumedPod.UID != pod.UID }) } else { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, nil) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodDelete, assumedPod, nil, nil) } } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index ed342ae7635..d6351d64509 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -782,9 +782,10 @@ func Test_buildQueueingHintMap(t *testing.T) { // Test_UnionedGVKs tests UnionedGVKs worked with buildQueueingHintMap. func Test_UnionedGVKs(t *testing.T) { tests := []struct { - name string - plugins schedulerapi.PluginSet - want map[framework.GVK]framework.ActionType + name string + plugins schedulerapi.PluginSet + want map[framework.GVK]framework.ActionType + enableInPlacePodVerticalScaling bool }{ { name: "filter without EnqueueExtensions plugin", @@ -867,10 +868,10 @@ func Test_UnionedGVKs(t *testing.T) { want: map[framework.GVK]framework.ActionType{}, }, { - name: "plugins with default profile", + name: "plugins with default profile (InPlacePodVerticalScaling: disabled)", plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled}, want: map[framework.GVK]framework.ActionType{ - framework.Pod: framework.All, + framework.Pod: framework.Add | framework.UpdatePodLabel | framework.Delete, framework.Node: framework.All, framework.CSINode: framework.All - framework.Delete, framework.CSIDriver: framework.All - framework.Delete, @@ -880,9 +881,26 @@ func Test_UnionedGVKs(t *testing.T) { framework.StorageClass: framework.All - framework.Delete, }, }, + { + name: "plugins with default profile (InPlacePodVerticalScaling: enabled)", + plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled}, + want: map[framework.GVK]framework.ActionType{ + framework.Pod: framework.Add | framework.UpdatePodLabel | framework.UpdatePodScaleDown | framework.Delete, + framework.Node: framework.All, + framework.CSINode: framework.All - framework.Delete, + framework.CSIDriver: framework.All - framework.Delete, + framework.CSIStorageCapacity: framework.All - framework.Delete, + framework.PersistentVolume: framework.All - framework.Delete, + framework.PersistentVolumeClaim: framework.All - framework.Delete, + framework.StorageClass: framework.All - framework.Delete, + }, + enableInPlacePodVerticalScaling: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, tt.enableInPlacePodVerticalScaling) + _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel()