Merge pull request #122628 from sanposhiho/pod-smaller-events

add(scheduler/framework): implement smaller Pod update events
This commit is contained in:
Kubernetes Prow Robot 2024-07-23 18:01:46 -07:00 committed by GitHub
commit 39a80796b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 610 additions and 395 deletions

View File

@ -48,7 +48,7 @@ import (
func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(queue.StorageClassAdd.Label).Observe(metrics.SinceInSeconds(start))
defer metrics.EventHandlingLatency.WithLabelValues(framework.StorageClassAdd.Label).Observe(metrics.SinceInSeconds(start))
logger := sched.logger
sc, ok := obj.(*storagev1.StorageClass)
if !ok {
@ -63,13 +63,13 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
// We don't need to invalidate cached results because results will not be
// cached for pod that has unbound immediate PVCs.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassAdd, nil, sc, nil)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.StorageClassAdd, nil, sc, nil)
}
}
func (sched *Scheduler) addNodeToCache(obj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(queue.NodeAdd.Label).Observe(metrics.SinceInSeconds(start))
defer metrics.EventHandlingLatency.WithLabelValues(framework.NodeAdd.Label).Observe(metrics.SinceInSeconds(start))
logger := sched.logger
node, ok := obj.(*v1.Node)
if !ok {
@ -79,7 +79,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) {
logger.V(3).Info("Add event for node", "node", klog.KObj(node))
nodeInfo := sched.Cache.AddNode(logger, node)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.NodeAdd, nil, node, preCheckForNode(nodeInfo))
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, node, preCheckForNode(nodeInfo))
}
func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
@ -98,7 +98,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
logger.V(4).Info("Update event for node", "node", klog.KObj(newNode))
nodeInfo := sched.Cache.UpdateNode(logger, oldNode, newNode)
events := queue.NodeSchedulingPropertiesChange(newNode, oldNode)
events := framework.NodeSchedulingPropertiesChange(newNode, oldNode)
// Save the time it takes to update the node in the cache.
updatingDuration := metrics.SinceInSeconds(start)
@ -115,7 +115,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(queue.NodeDelete.Label).Observe(metrics.SinceInSeconds(start))
defer metrics.EventHandlingLatency.WithLabelValues(framework.NodeDelete.Label).Observe(metrics.SinceInSeconds(start))
logger := sched.logger
var node *v1.Node
@ -142,7 +142,7 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(queue.UnscheduledPodAdd.Label).Observe(metrics.SinceInSeconds(start))
defer metrics.EventHandlingLatency.WithLabelValues(framework.UnscheduledPodAdd.Label).Observe(metrics.SinceInSeconds(start))
logger := sched.logger
pod := obj.(*v1.Pod)
@ -154,7 +154,7 @@ func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(queue.UnscheduledPodUpdate.Label).Observe(metrics.SinceInSeconds(start))
defer metrics.EventHandlingLatency.WithLabelValues(framework.UnscheduledPodUpdate.Label).Observe(metrics.SinceInSeconds(start))
logger := sched.logger
oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod)
// Bypass update event that carries identical objects; otherwise, a duplicated
@ -179,7 +179,7 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(queue.UnscheduledPodDelete.Label).Observe(metrics.SinceInSeconds(start))
defer metrics.EventHandlingLatency.WithLabelValues(framework.UnscheduledPodDelete.Label).Observe(metrics.SinceInSeconds(start))
logger := sched.logger
var pod *v1.Pod
@ -213,13 +213,13 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
// removing it from the scheduler cache. In this case, signal a AssignedPodDelete
// event to immediately retry some unscheduled Pods.
if fwk.RejectWaitingPod(pod.UID) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodDelete, pod, nil, nil)
}
}
func (sched *Scheduler) addPodToCache(obj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(queue.AssignedPodAdd.Label).Observe(metrics.SinceInSeconds(start))
defer metrics.EventHandlingLatency.WithLabelValues(framework.AssignedPodAdd.Label).Observe(metrics.SinceInSeconds(start))
logger := sched.logger
pod, ok := obj.(*v1.Pod)
@ -243,7 +243,7 @@ func (sched *Scheduler) addPodToCache(obj interface{}) {
// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled.
// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.)
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodAdd, nil, pod, nil)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodAdd, nil, pod, nil)
} else {
sched.SchedulingQueue.AssignedPodAdded(logger, pod)
}
@ -251,7 +251,7 @@ func (sched *Scheduler) addPodToCache(obj interface{}) {
func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(queue.AssignedPodUpdate.Label).Observe(metrics.SinceInSeconds(start))
defer metrics.EventHandlingLatency.WithLabelValues(framework.AssignedPodUpdate.Label).Observe(metrics.SinceInSeconds(start))
logger := sched.logger
oldPod, ok := oldObj.(*v1.Pod)
@ -270,25 +270,28 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
}
// SchedulingQueue.AssignedPodUpdated has a problem:
// It internally pre-filters Pods to move to activeQ,
// while taking only in-tree plugins into consideration.
// Consequently, if custom plugins that subscribes Pod/Update events reject Pods,
// those Pods will never be requeued to activeQ by an assigned Pod related events,
// and they may be stuck in unschedulableQ.
//
// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled.
// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.)
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodUpdate, oldPod, newPod, nil)
} else {
sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod)
events := framework.PodSchedulingPropertiesChange(newPod, oldPod)
for _, evt := range events {
// SchedulingQueue.AssignedPodUpdated has a problem:
// It internally pre-filters Pods to move to activeQ,
// while taking only in-tree plugins into consideration.
// Consequently, if custom plugins that subscribes Pod/Update events reject Pods,
// those Pods will never be requeued to activeQ by an assigned Pod related events,
// and they may be stuck in unschedulableQ.
//
// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled.
// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.)
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodUpdate, oldPod, newPod, nil)
} else {
sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod, evt)
}
}
}
func (sched *Scheduler) deletePodFromCache(obj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(queue.AssignedPodDelete.Label).Observe(metrics.SinceInSeconds(start))
defer metrics.EventHandlingLatency.WithLabelValues(framework.AssignedPodDelete.Label).Observe(metrics.SinceInSeconds(start))
logger := sched.logger
var pod *v1.Pod
@ -312,7 +315,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
logger.Error(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
}
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodDelete, pod, nil, nil)
}
// assignedPod selects pods that are assigned (scheduled and running).
@ -557,8 +560,8 @@ func addAllEventHandlers(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, obj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(queue.StorageClassUpdate.Label).Observe(metrics.SinceInSeconds(start))
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil)
defer metrics.EventHandlingLatency.WithLabelValues(framework.StorageClassUpdate.Label).Observe(metrics.SinceInSeconds(start))
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.StorageClassUpdate, old, obj, nil)
},
},
); err != nil {

View File

@ -0,0 +1,234 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/features"
)
const (
// PodAdd is the event when a new pod is added to API server.
PodAdd = "PodAdd"
// ScheduleAttemptFailure is the event when a schedule attempt fails.
ScheduleAttemptFailure = "ScheduleAttemptFailure"
// BackoffComplete is the event when a pod finishes backoff.
BackoffComplete = "BackoffComplete"
// ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ
// to activeQ. Usually it's triggered by plugin implementations.
ForceActivate = "ForceActivate"
// PodUpdate is the event when a pod is updated
PodUpdate = "PodUpdate"
)
var (
// AssignedPodAdd is the event when an assigned pod is added.
AssignedPodAdd = ClusterEvent{Resource: Pod, ActionType: Add, Label: "AssignedPodAdd"}
// NodeAdd is the event when a new node is added to the cluster.
NodeAdd = ClusterEvent{Resource: Node, ActionType: Add, Label: "NodeAdd"}
// NodeDelete is the event when a node is deleted from the cluster.
NodeDelete = ClusterEvent{Resource: Node, ActionType: Delete, Label: "NodeDelete"}
// AssignedPodUpdate is the event when an assigned pod is updated.
AssignedPodUpdate = ClusterEvent{Resource: Pod, ActionType: Update, Label: "AssignedPodUpdate"}
// UnscheduledPodAdd is the event when an unscheduled pod is added.
UnscheduledPodAdd = ClusterEvent{Resource: Pod, ActionType: Update, Label: "UnschedulablePodAdd"}
// UnscheduledPodUpdate is the event when an unscheduled pod is updated.
UnscheduledPodUpdate = ClusterEvent{Resource: Pod, ActionType: Update, Label: "UnschedulablePodUpdate"}
// UnscheduledPodDelete is the event when an unscheduled pod is deleted.
UnscheduledPodDelete = ClusterEvent{Resource: Pod, ActionType: Update, Label: "UnschedulablePodDelete"}
// assignedPodOtherUpdate is the event when an assigned pod got updated in fields that are not covered by UpdatePodXXX.
assignedPodOtherUpdate = ClusterEvent{Resource: Pod, ActionType: updatePodOther, Label: "AssignedPodUpdate"}
// AssignedPodDelete is the event when an assigned pod is deleted.
AssignedPodDelete = ClusterEvent{Resource: Pod, ActionType: Delete, Label: "AssignedPodDelete"}
// PodRequestScaledDown is the event when a pod's resource request is scaled down.
PodRequestScaledDown = ClusterEvent{Resource: Pod, ActionType: UpdatePodScaleDown, Label: "PodRequestScaledDown"}
// PodLabelChange is the event when a pod's label is changed.
PodLabelChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodLabel, Label: "PodLabelChange"}
// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
// NodeAllocatableChange is the event when node allocatable is changed.
NodeAllocatableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeAllocatable, Label: "NodeAllocatableChange"}
// NodeLabelChange is the event when node label is changed.
NodeLabelChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeLabel, Label: "NodeLabelChange"}
// NodeAnnotationChange is the event when node annotation is changed.
NodeAnnotationChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeAnnotation, Label: "NodeAnnotationChange"}
// NodeTaintChange is the event when node taint is changed.
NodeTaintChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeTaintChange"}
// NodeConditionChange is the event when node condition is changed.
NodeConditionChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeCondition, Label: "NodeConditionChange"}
// PvAdd is the event when a persistent volume is added in the cluster.
PvAdd = ClusterEvent{Resource: PersistentVolume, ActionType: Add, Label: "PvAdd"}
// PvUpdate is the event when a persistent volume is updated in the cluster.
PvUpdate = ClusterEvent{Resource: PersistentVolume, ActionType: Update, Label: "PvUpdate"}
// PvcAdd is the event when a persistent volume claim is added in the cluster.
PvcAdd = ClusterEvent{Resource: PersistentVolumeClaim, ActionType: Add, Label: "PvcAdd"}
// PvcUpdate is the event when a persistent volume claim is updated in the cluster.
PvcUpdate = ClusterEvent{Resource: PersistentVolumeClaim, ActionType: Update, Label: "PvcUpdate"}
// StorageClassAdd is the event when a StorageClass is added in the cluster.
StorageClassAdd = ClusterEvent{Resource: StorageClass, ActionType: Add, Label: "StorageClassAdd"}
// StorageClassUpdate is the event when a StorageClass is updated in the cluster.
StorageClassUpdate = ClusterEvent{Resource: StorageClass, ActionType: Update, Label: "StorageClassUpdate"}
// CSINodeAdd is the event when a CSI node is added in the cluster.
CSINodeAdd = ClusterEvent{Resource: CSINode, ActionType: Add, Label: "CSINodeAdd"}
// CSINodeUpdate is the event when a CSI node is updated in the cluster.
CSINodeUpdate = ClusterEvent{Resource: CSINode, ActionType: Update, Label: "CSINodeUpdate"}
// CSIDriverAdd is the event when a CSI driver is added in the cluster.
CSIDriverAdd = ClusterEvent{Resource: CSIDriver, ActionType: Add, Label: "CSIDriverAdd"}
// CSIDriverUpdate is the event when a CSI driver is updated in the cluster.
CSIDriverUpdate = ClusterEvent{Resource: CSIDriver, ActionType: Update, Label: "CSIDriverUpdate"}
// CSIStorageCapacityAdd is the event when a CSI storage capacity is added in the cluster.
CSIStorageCapacityAdd = ClusterEvent{Resource: CSIStorageCapacity, ActionType: Add, Label: "CSIStorageCapacityAdd"}
// CSIStorageCapacityUpdate is the event when a CSI storage capacity is updated in the cluster.
CSIStorageCapacityUpdate = ClusterEvent{Resource: CSIStorageCapacity, ActionType: Update, Label: "CSIStorageCapacityUpdate"}
// WildCardEvent semantically matches all resources on all actions.
WildCardEvent = ClusterEvent{Resource: WildCard, ActionType: All, Label: "WildCardEvent"}
// UnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout.
UnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, Label: "UnschedulableTimeout"}
)
// PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s).
// Once we have other pod update events, we should update here as well.
func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []ClusterEvent) {
podChangeExtracters := []podChangeExtractor{
extractPodLabelsChange,
extractPodScaleDown,
}
for _, fn := range podChangeExtracters {
if event := fn(newPod, oldPod); event != nil {
events = append(events, *event)
}
}
if len(events) == 0 {
// When no specific event is found, we use AssignedPodOtherUpdate,
// which should only trigger plugins registering a general Pod/Update event.
events = append(events, assignedPodOtherUpdate)
}
return
}
type podChangeExtractor func(newNode *v1.Pod, oldNode *v1.Pod) *ClusterEvent
// extractPodScaleDown interprets the update of a pod and returns PodRequestScaledDown event if any pod's resource request(s) is scaled down.
func extractPodScaleDown(newPod, oldPod *v1.Pod) *ClusterEvent {
opt := resource.PodResourcesOptions{
InPlacePodVerticalScalingEnabled: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
}
newPodRequests := resource.PodRequests(newPod, opt)
oldPodRequests := resource.PodRequests(oldPod, opt)
for rName, oldReq := range oldPodRequests {
newReq, ok := newPodRequests[rName]
if !ok {
// The resource request of rName is removed.
return &PodRequestScaledDown
}
if oldReq.MilliValue() > newReq.MilliValue() {
// The resource request of rName is scaled down.
return &PodRequestScaledDown
}
}
return nil
}
func extractPodLabelsChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
if isLabelChanged(newPod.GetLabels(), oldPod.GetLabels()) {
return &PodLabelChange
}
return nil
}
// NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s).
func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) {
nodeChangeExtracters := []nodeChangeExtractor{
extractNodeSpecUnschedulableChange,
extractNodeAllocatableChange,
extractNodeLabelsChange,
extractNodeTaintsChange,
extractNodeConditionsChange,
extractNodeAnnotationsChange,
}
for _, fn := range nodeChangeExtracters {
if event := fn(newNode, oldNode); event != nil {
events = append(events, *event)
}
}
return
}
type nodeChangeExtractor func(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent
func extractNodeAllocatableChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent {
if !equality.Semantic.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
return &NodeAllocatableChange
}
return nil
}
func extractNodeLabelsChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent {
if isLabelChanged(newNode.GetLabels(), oldNode.GetLabels()) {
return &NodeLabelChange
}
return nil
}
func isLabelChanged(newLabels map[string]string, oldLabels map[string]string) bool {
return !equality.Semantic.DeepEqual(newLabels, oldLabels)
}
func extractNodeTaintsChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent {
if !equality.Semantic.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints) {
return &NodeTaintChange
}
return nil
}
func extractNodeConditionsChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent {
strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
for i := range conditions {
conditionStatuses[conditions[i].Type] = conditions[i].Status
}
return conditionStatuses
}
if !equality.Semantic.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions)) {
return &NodeConditionChange
}
return nil
}
func extractNodeSpecUnschedulableChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent {
if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable {
return &NodeSpecUnschedulableChange
}
return nil
}
func extractNodeAnnotationsChange(newNode *v1.Node, oldNode *v1.Node) *ClusterEvent {
if !equality.Semantic.DeepEqual(oldNode.GetAnnotations(), newNode.GetAnnotations()) {
return &NodeAnnotationChange
}
return nil
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
package framework
import (
"reflect"
@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
@ -188,7 +187,7 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) {
name string
newNode *v1.Node
oldNode *v1.Node
wantEvents []framework.ClusterEvent
wantEvents []ClusterEvent
}{
{
name: "no specific changed applied",
@ -200,7 +199,7 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) {
name: "only node spec unavailable changed",
newNode: st.MakeNode().Unschedulable(false).Obj(),
oldNode: st.MakeNode().Unschedulable(true).Obj(),
wantEvents: []framework.ClusterEvent{NodeSpecUnschedulableChange},
wantEvents: []ClusterEvent{NodeSpecUnschedulableChange},
},
{
name: "only node allocatable changed",
@ -214,13 +213,13 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) {
v1.ResourceMemory: "100m",
v1.ResourceName("example.com/foo"): "2"},
).Obj(),
wantEvents: []framework.ClusterEvent{NodeAllocatableChange},
wantEvents: []ClusterEvent{NodeAllocatableChange},
},
{
name: "only node label changed",
newNode: st.MakeNode().Label("foo", "bar").Obj(),
oldNode: st.MakeNode().Label("foo", "fuz").Obj(),
wantEvents: []framework.ClusterEvent{NodeLabelChange},
wantEvents: []ClusterEvent{NodeLabelChange},
},
{
name: "only node taint changed",
@ -230,13 +229,13 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) {
oldNode: st.MakeNode().Taints([]v1.Taint{
{Key: v1.TaintNodeUnschedulable, Value: "foo", Effect: v1.TaintEffectNoSchedule},
}).Obj(),
wantEvents: []framework.ClusterEvent{NodeTaintChange},
wantEvents: []ClusterEvent{NodeTaintChange},
},
{
name: "only node annotation changed",
newNode: st.MakeNode().Annotation("foo", "bar").Obj(),
oldNode: st.MakeNode().Annotation("foo", "fuz").Obj(),
wantEvents: []framework.ClusterEvent{NodeAnnotationChange},
wantEvents: []ClusterEvent{NodeAnnotationChange},
},
{
name: "only node condition changed",
@ -247,7 +246,7 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) {
"Ready",
"Ready",
).Obj(),
wantEvents: []framework.ClusterEvent{NodeConditionChange},
wantEvents: []ClusterEvent{NodeConditionChange},
},
{
name: "both node label and node taint changed",
@ -259,7 +258,7 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) {
oldNode: st.MakeNode().Taints([]v1.Taint{
{Key: v1.TaintNodeUnschedulable, Value: "foo", Effect: v1.TaintEffectNoSchedule},
}).Obj(),
wantEvents: []framework.ClusterEvent{NodeLabelChange, NodeTaintChange},
wantEvents: []ClusterEvent{NodeLabelChange, NodeTaintChange},
},
}
@ -270,3 +269,114 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) {
}
}
}
func Test_podSchedulingPropertiesChange(t *testing.T) {
podWithBigRequest := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "app",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("101m")},
},
},
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "app",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("101m")},
},
},
},
}
podWithSmallRequestAndLabel := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"foo": "bar"},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "app",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
},
},
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "app",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
},
},
},
}
podWithSmallRequest := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "app",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
},
},
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "app",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
},
},
},
}
tests := []struct {
name string
newPod *v1.Pod
oldPod *v1.Pod
want []ClusterEvent
}{
{
name: "only label is updated",
newPod: st.MakePod().Label("foo", "bar").Obj(),
oldPod: st.MakePod().Label("foo", "bar2").Obj(),
want: []ClusterEvent{PodLabelChange},
},
{
name: "pod's resource request is scaled down",
oldPod: podWithBigRequest,
newPod: podWithSmallRequest,
want: []ClusterEvent{PodRequestScaledDown},
},
{
name: "pod's resource request is scaled up",
oldPod: podWithSmallRequest,
newPod: podWithBigRequest,
want: []ClusterEvent{assignedPodOtherUpdate},
},
{
name: "both pod's resource request and label are updated",
oldPod: podWithBigRequest,
newPod: podWithSmallRequestAndLabel,
want: []ClusterEvent{PodLabelChange, PodRequestScaledDown},
},
{
name: "untracked properties of pod is updated",
newPod: st.MakePod().Annotation("foo", "bar").Obj(),
oldPod: st.MakePod().Annotation("foo", "bar2").Obj(),
want: []ClusterEvent{assignedPodOtherUpdate},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := PodSchedulingPropertiesChange(tt.newPod, tt.oldPod)
if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("unexpected event is returned from podSchedulingPropertiesChange (-want, +got):\n%s", diff)
}
})
}
}

View File

@ -62,7 +62,7 @@ func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.Clu
// All ActionType includes the following events:
// - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints,
// deleting an existing Pod may make it schedulable.
// - Update. Updating on an existing Pod's labels (e.g., removal) may make
// - UpdatePodLabel. Updating on an existing Pod's labels (e.g., removal) may make
// an unschedulable Pod schedulable.
// - Add. An unschedulable Pod may fail due to violating pod-affinity constraints,
// adding an assigned Pod may make it schedulable.
@ -75,7 +75,7 @@ func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.Clu
// As a workaround, we add UpdateNodeTaint event to catch the case.
// We can remove UpdateNodeTaint when we remove the preCheck feature.
// See: https://github.com/kubernetes/kubernetes/issues/110175
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.UpdatePodLabel | framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodChange},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
}, nil
}

View File

@ -250,9 +250,9 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
func (f *Fit) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
podActionType := framework.Delete
if f.enableInPlacePodVerticalScaling {
// If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered
// If InPlacePodVerticalScaling (KEP 1287) is enabled, then UpdatePodScaleDown event should be registered
// for this plugin since a Pod update may free up resources that make other Pods schedulable.
podActionType |= framework.Update
podActionType |= framework.UpdatePodScaleDown
}
return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange},
@ -296,7 +296,7 @@ func (f *Fit) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldOb
return framework.QueueSkip, nil
}
logger.V(5).Info("the max request resources of another scheduled pod got reduced and it may make the unscheduled pod schedulable", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
logger.V(5).Info("another scheduled pod or the target pod itself got scaled down, and it may make the unscheduled pod schedulable", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
return framework.Queue, nil
}

View File

@ -1095,7 +1095,7 @@ func TestEventsToRegister(t *testing.T) {
"Register events with InPlacePodVerticalScaling feature enabled",
true,
[]framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.Update | framework.Delete}},
{Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.UpdatePodScaleDown | framework.Delete}},
{Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.Update}},
},
},

View File

@ -139,11 +139,11 @@ func (pl *PodTopologySpread) EventsToRegister(_ context.Context) ([]framework.Cl
// All ActionType includes the following events:
// - Add. An unschedulable Pod may fail due to violating topology spread constraints,
// adding an assigned Pod may make it schedulable.
// - Update. Updating on an existing Pod's labels (e.g., removal) may make
// - UpdatePodLabel. Updating on an existing Pod's labels (e.g., removal) may make
// an unschedulable Pod schedulable.
// - Delete. An unschedulable Pod may fail due to violating an existing Pod's topology spread constraints,
// deleting an existing Pod may make it schedulable.
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.UpdatePodLabel | framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodChange},
// Node add|delete|update maybe lead an topology key changed,
// and make these pod in scheduling schedulable or unschedulable.
//

View File

@ -46,25 +46,48 @@ type ActionType int64
// Constants for ActionTypes.
const (
Add ActionType = 1 << iota // 1
Delete // 10
// UpdateNodeXYZ is only applicable for Node events.
UpdateNodeAllocatable // 100
UpdateNodeLabel // 1000
UpdateNodeTaint // 10000
UpdateNodeCondition // 100000
UpdateNodeAnnotation // 1000000
Add ActionType = 1 << iota
Delete
All ActionType = 1<<iota - 1 // 1111111
// UpdateNodeXYZ is only applicable for Node events.
// If you use UpdateNodeXYZ,
// your plugin's QueueingHint is only executed for the specific sub-Update event.
// It's better to narrow down the scope of the event by using them instead of just using Update event
// for better performance in requeueing.
UpdateNodeAllocatable
UpdateNodeLabel
UpdateNodeTaint
UpdateNodeCondition
UpdateNodeAnnotation
// UpdatePodXYZ is only applicable for Pod events.
// If you use UpdatePodXYZ,
// your plugin's QueueingHint is only executed for the specific sub-Update event.
// It's better to narrow down the scope of the event by using them instead of Update event
// for better performance in requeueing.
UpdatePodLabel
// UpdatePodScaleDown is an update for pod's scale down (i.e., any resource request is reduced).
UpdatePodScaleDown
// updatePodOther is a update for pod's other fields.
// It's used only for the internal event handling, and thus unexported.
updatePodOther
All ActionType = 1<<iota - 1
// Use the general Update type if you don't either know or care the specific sub-Update type to use.
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodScaleDown | updatePodOther
)
// GVK is short for group/version/kind, which can uniquely represent a particular API resource.
type GVK string
// Constants for GVKs.
//
// Note:
// - UpdatePodXYZ or UpdateNodeXYZ: triggered by updating particular parts of a Pod or a Node, e.g. updatePodLabel.
// Use specific events rather than general ones (updatePodLabel vs update) can make the requeueing process more efficient
// and consume less memory as less events will be cached at scheduler.
const (
// There are a couple of notes about how the scheduler notifies the events of Pods:
// - Add: add events could be triggered by either a newly created Pod or an existing Pod that is scheduled to a Node.
@ -164,7 +187,8 @@ func (s QueueingHint) String() string {
type ClusterEvent struct {
Resource GVK
ActionType ActionType
Label string
// Label describes this cluster event, only used in logging and metrics.
Label string
}
// IsWildCard returns true if ClusterEvent follows WildCard semantics

View File

@ -1,166 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
const (
// PodAdd is the event when a new pod is added to API server.
PodAdd = "PodAdd"
// ScheduleAttemptFailure is the event when a schedule attempt fails.
ScheduleAttemptFailure = "ScheduleAttemptFailure"
// BackoffComplete is the event when a pod finishes backoff.
BackoffComplete = "BackoffComplete"
// ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ
// to activeQ. Usually it's triggered by plugin implementations.
ForceActivate = "ForceActivate"
// PodUpdate is the event when a pod is updated
PodUpdate = "PodUpdate"
)
var (
// AssignedPodAdd is the event when an assigned pod is added.
AssignedPodAdd = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add, Label: "AssignedPodAdd"}
// NodeAdd is the event when a new node is added to the cluster.
NodeAdd = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add, Label: "NodeAdd"}
// NodeDelete is the event when a node is deleted from the cluster.
NodeDelete = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Delete, Label: "NodeDelete"}
// AssignedPodUpdate is the event when an assigned pod is updated.
AssignedPodUpdate = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "AssignedPodUpdate"}
// UnscheduledPodAdd is the event when an unscheduled pod is added.
UnscheduledPodAdd = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "UnschedulablePodAdd"}
// UnscheduledPodUpdate is the event when an unscheduled pod is updated.
UnscheduledPodUpdate = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "UnschedulablePodUpdate"}
// UnscheduledPodDelete is the event when an unscheduled pod is deleted.
UnscheduledPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "UnschedulablePodDelete"}
// AssignedPodDelete is the event when an assigned pod is deleted.
AssignedPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete, Label: "AssignedPodDelete"}
// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
NodeSpecUnschedulableChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
// NodeAllocatableChange is the event when node allocatable is changed.
NodeAllocatableChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeAllocatable, Label: "NodeAllocatableChange"}
// NodeLabelChange is the event when node label is changed.
NodeLabelChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel, Label: "NodeLabelChange"}
// NodeAnnotationChange is the event when node annotation is changed.
NodeAnnotationChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeAnnotation, Label: "NodeAnnotationChange"}
// NodeTaintChange is the event when node taint is changed.
NodeTaintChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint, Label: "NodeTaintChange"}
// NodeConditionChange is the event when node condition is changed.
NodeConditionChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeCondition, Label: "NodeConditionChange"}
// PvAdd is the event when a persistent volume is added in the cluster.
PvAdd = framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add, Label: "PvAdd"}
// PvUpdate is the event when a persistent volume is updated in the cluster.
PvUpdate = framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Update, Label: "PvUpdate"}
// PvcAdd is the event when a persistent volume claim is added in the cluster.
PvcAdd = framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add, Label: "PvcAdd"}
// PvcUpdate is the event when a persistent volume claim is updated in the cluster.
PvcUpdate = framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Update, Label: "PvcUpdate"}
// StorageClassAdd is the event when a StorageClass is added in the cluster.
StorageClassAdd = framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add, Label: "StorageClassAdd"}
// StorageClassUpdate is the event when a StorageClass is updated in the cluster.
StorageClassUpdate = framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Update, Label: "StorageClassUpdate"}
// CSINodeAdd is the event when a CSI node is added in the cluster.
CSINodeAdd = framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add, Label: "CSINodeAdd"}
// CSINodeUpdate is the event when a CSI node is updated in the cluster.
CSINodeUpdate = framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Update, Label: "CSINodeUpdate"}
// CSIDriverAdd is the event when a CSI driver is added in the cluster.
CSIDriverAdd = framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add, Label: "CSIDriverAdd"}
// CSIDriverUpdate is the event when a CSI driver is updated in the cluster.
CSIDriverUpdate = framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Update, Label: "CSIDriverUpdate"}
// CSIStorageCapacityAdd is the event when a CSI storage capacity is added in the cluster.
CSIStorageCapacityAdd = framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add, Label: "CSIStorageCapacityAdd"}
// CSIStorageCapacityUpdate is the event when a CSI storage capacity is updated in the cluster.
CSIStorageCapacityUpdate = framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Update, Label: "CSIStorageCapacityUpdate"}
// WildCardEvent semantically matches all resources on all actions.
WildCardEvent = framework.ClusterEvent{Resource: framework.WildCard, ActionType: framework.All, Label: "WildCardEvent"}
// UnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout.
UnschedulableTimeout = framework.ClusterEvent{Resource: framework.WildCard, ActionType: framework.All, Label: "UnschedulableTimeout"}
)
// NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s).
func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []framework.ClusterEvent) {
nodeChangeExtracters := []nodeChangeExtractor{
extractNodeSpecUnschedulableChange,
extractNodeAllocatableChange,
extractNodeLabelsChange,
extractNodeTaintsChange,
extractNodeConditionsChange,
extractNodeAnnotationsChange,
}
for _, fn := range nodeChangeExtracters {
if event := fn(newNode, oldNode); event != nil {
events = append(events, *event)
}
}
return
}
type nodeChangeExtractor func(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent
func extractNodeAllocatableChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
if !equality.Semantic.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
return &NodeAllocatableChange
}
return nil
}
func extractNodeLabelsChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
if !equality.Semantic.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
return &NodeLabelChange
}
return nil
}
func extractNodeTaintsChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
if !equality.Semantic.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints) {
return &NodeTaintChange
}
return nil
}
func extractNodeConditionsChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
for i := range conditions {
conditionStatuses[conditions[i].Type] = conditions[i].Status
}
return conditionStatuses
}
if !equality.Semantic.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions)) {
return &NodeConditionChange
}
return nil
}
func extractNodeSpecUnschedulableChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable {
return &NodeSpecUnschedulableChange
}
return nil
}
func extractNodeAnnotationsChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
if !equality.Semantic.DeepEqual(oldNode.GetAnnotations(), newNode.GetAnnotations()) {
return &NodeAnnotationChange
}
return nil
}

View File

@ -120,7 +120,7 @@ type SchedulingQueue interface {
// See https://github.com/kubernetes/kubernetes/issues/110175
MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck)
AssignedPodAdded(logger klog.Logger, pod *v1.Pod)
AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod)
AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent)
PendingPods() ([]*v1.Pod, string)
PodsInActiveQ() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
@ -438,6 +438,7 @@ const (
// isEventOfInterest returns true if the event is of interest by some plugins.
func (p *PriorityQueue) isEventOfInterest(logger klog.Logger, event framework.ClusterEvent) bool {
if event.IsWildCard() {
// Wildcard event moves Pods that failed with any plugins.
return true
}
@ -633,7 +634,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
_ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found.
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
if event == PodAdd || event == PodUpdate {
if event == framework.PodAdd || event == framework.PodUpdate {
p.AddNominatedPod(logger, pInfo.PodInfo, nil)
}
@ -647,7 +648,7 @@ func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) error {
defer p.lock.Unlock()
pInfo := p.newQueuedPodInfo(pod)
if added, err := p.moveToActiveQ(logger, pInfo, PodAdd); !added {
if added, err := p.moveToActiveQ(logger, pInfo, framework.PodAdd); !added {
return err
}
p.cond.Broadcast()
@ -698,7 +699,7 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
return false
}
added, _ := p.moveToActiveQ(logger, pInfo, ForceActivate)
added, _ := p.moveToActiveQ(logger, pInfo, framework.ForceActivate)
return added
}
@ -822,12 +823,12 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", klog.KObj(pod), err)
}
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", backoffQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", framework.ScheduleAttemptFailure).Inc()
} else {
p.unschedulablePods.addOrUpdate(pInfo)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", unschedulablePods)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", framework.ScheduleAttemptFailure).Inc()
}
return nil
@ -875,8 +876,8 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
schedulingHint := p.determineSchedulingHintForInFlightPod(logger, pInfo)
// In this case, we try to requeue this Pod to activeQ/backoffQ.
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint, "unschedulable plugins", rejectorPlugins)
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, framework.ScheduleAttemptFailure)
logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint, "unschedulable plugins", rejectorPlugins)
if queue == activeQ {
// When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out.
p.cond.Broadcast()
@ -905,7 +906,7 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
break
}
if added, _ := p.moveToActiveQ(logger, pInfo, BackoffComplete); added {
if added, _ := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added {
activated = true
}
}
@ -931,7 +932,7 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
}
if len(podsToMove) > 0 {
p.movePodsToActiveOrBackoffQueue(logger, podsToMove, UnschedulableTimeout, nil, nil)
p.movePodsToActiveOrBackoffQueue(logger, podsToMove, framework.UnschedulableTimeout, nil, nil)
}
}
@ -1068,7 +1069,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
// We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue.
// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context.
p.inFlightEvents.PushBack(&clusterEvent{
event: UnscheduledPodUpdate,
event: framework.UnscheduledPodUpdate,
oldObj: oldPod,
newObj: newPod,
})
@ -1104,15 +1105,21 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
// whether the update may make the pods schedulable.
// Plugins have to implement a QueueingHint for Pod/Update event
// if the rejection from them could be resolved by updating unscheduled Pods itself.
hint := p.isPodWorthRequeuing(logger, pInfo, UnscheduledPodUpdate, oldPod, newPod)
queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, UnscheduledPodUpdate.Label)
if queue != unschedulablePods {
logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", PodUpdate, "queue", queue)
p.unschedulablePods.delete(usPodInfo.Pod, gated)
}
if queue == activeQ {
p.cond.Broadcast()
events := framework.PodSchedulingPropertiesChange(newPod, oldPod)
for _, evt := range events {
hint := p.isPodWorthRequeuing(logger, pInfo, evt, oldPod, newPod)
queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, framework.UnscheduledPodUpdate.Label)
if queue != unschedulablePods {
logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", framework.PodUpdate, "queue", queue)
p.unschedulablePods.delete(usPodInfo.Pod, gated)
}
if queue == activeQ {
p.cond.Broadcast()
break
}
}
return nil
}
if isPodUpdated(oldPod, newPod) {
@ -1122,11 +1129,11 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
return err
}
p.unschedulablePods.delete(usPodInfo.Pod, gated)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", backoffQ)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.PodUpdate, "queue", backoffQ)
return nil
}
if added, err := p.moveToActiveQ(logger, pInfo, BackoffComplete); !added {
if added, err := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); !added {
return err
}
p.cond.Broadcast()
@ -1139,7 +1146,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
}
// If pod is not in any of the queues, we put it in the active queue.
pInfo := p.newQueuedPodInfo(newPod)
if added, err := p.moveToActiveQ(logger, pInfo, PodUpdate); !added {
if added, err := p.moveToActiveQ(logger, pInfo, framework.PodUpdate); !added {
return err
}
p.cond.Broadcast()
@ -1172,36 +1179,22 @@ func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) {
// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm
// because Pod related events shouldn't make Pods that rejected by single-node scheduling requirement schedulable.
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, pod), AssignedPodAdd, nil, pod)
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, pod), framework.AssignedPodAdd, nil, pod)
p.lock.Unlock()
}
// isPodResourcesResizedDown returns true if a pod CPU and/or memory resize request has been
// admitted by kubelet, is 'InProgress', and results in a net sizing down of updated resources.
// It returns false if either CPU or memory resource is net resized up, or if no resize is in progress.
func isPodResourcesResizedDown(pod *v1.Pod) bool {
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// TODO(vinaykul,wangchen615,InPlacePodVerticalScaling): Fix this to determine when a
// pod is truly resized down (might need oldPod if we cannot determine from Status alone)
if pod.Status.Resize == v1.PodResizeStatusInProgress {
return true
}
}
return false
}
// AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) {
func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent) {
p.lock.Lock()
if isPodResourcesResizedDown(newPod) {
if event.Resource == framework.Pod && event.ActionType&framework.UpdatePodScaleDown != 0 {
// In this case, we don't want to pre-filter Pods by getUnschedulablePodsWithCrossTopologyTerm
// because Pod related events may make Pods that were rejected by NodeResourceFit schedulable.
p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil)
p.moveAllToActiveOrBackoffQueue(logger, framework.AssignedPodUpdate, oldPod, newPod, nil)
} else {
// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm
// because Pod related events only make Pods rejected by cross topology term schedulable.
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod)
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, newPod), event, oldPod, newPod)
}
p.lock.Unlock()
}
@ -1275,7 +1268,7 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
}
p.unschedulablePods.addOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", framework.ScheduleAttemptFailure).Inc()
return unschedulablePods
}

View File

@ -225,13 +225,13 @@ func Test_InFlightPods(t *testing.T) {
// This Pod shouldn't be added to inFlightPods because SchedulingQueueHint is disabled.
{podPopped: pod},
// This event shouldn't be added to inFlightEvents because SchedulingQueueHint is disabled.
{eventHappens: &PvAdd},
{eventHappens: &framework.PvAdd},
},
wantInFlightPods: nil,
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
PvAdd: {
framework.PvAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
@ -246,18 +246,18 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod},
actions: []action{
// This won't be added to inFlightEvents because no inFlightPods at this point.
{eventHappens: &PvcAdd},
{eventHappens: &framework.PvcAdd},
{podPopped: pod},
// This gets added for the pod.
{eventHappens: &PvAdd},
// This doesn't get added because no plugin is interested in PvUpdate.
{eventHappens: &PvUpdate},
{eventHappens: &framework.PvAdd},
// This doesn't get added because no plugin is interested in framework.PvUpdate.
{eventHappens: &framework.PvUpdate},
},
wantInFlightPods: []*v1.Pod{pod},
wantInFlightEvents: []interface{}{pod, PvAdd},
wantInFlightEvents: []interface{}{pod, framework.PvAdd},
queueingHintMap: QueueingHintMapPerProfile{
"": {
PvAdd: {
framework.PvAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
@ -272,32 +272,32 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod, pod2},
actions: []action{
// This won't be added to inFlightEvents because no inFlightPods at this point.
{eventHappens: &PvcAdd},
{eventHappens: &framework.PvcAdd},
{podPopped: pod},
{eventHappens: &PvAdd},
{eventHappens: &framework.PvAdd},
{podPopped: pod2},
{eventHappens: &NodeAdd},
{eventHappens: &framework.NodeAdd},
// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin.
{podEnqueued: newQueuedPodInfoForLookup(pod)},
},
wantBackoffQPodNames: []string{"targetpod"},
wantInFlightPods: []*v1.Pod{pod2}, // only pod2 is registered because pod is already enqueued back.
wantInFlightEvents: []interface{}{pod2, NodeAdd},
wantInFlightEvents: []interface{}{pod2, framework.NodeAdd},
queueingHintMap: QueueingHintMapPerProfile{
"": {
PvAdd: {
framework.PvAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
},
},
NodeAdd: {
framework.NodeAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
},
},
PvcAdd: {
framework.PvcAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
@ -312,14 +312,14 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod, pod2},
actions: []action{
// This won't be added to inFlightEvents because no inFlightPods at this point.
{eventHappens: &PvcAdd},
{eventHappens: &framework.PvcAdd},
{podPopped: pod},
{eventHappens: &PvAdd},
{eventHappens: &framework.PvAdd},
{podPopped: pod2},
{eventHappens: &NodeAdd},
{eventHappens: &framework.NodeAdd},
// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin.
{podEnqueued: newQueuedPodInfoForLookup(pod)},
{eventHappens: &CSINodeUpdate},
{eventHappens: &framework.CSINodeUpdate},
// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin.
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
},
@ -327,25 +327,25 @@ func Test_InFlightPods(t *testing.T) {
wantInFlightPods: nil, // empty
queueingHintMap: QueueingHintMapPerProfile{
"": {
PvAdd: {
framework.PvAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
},
},
NodeAdd: {
framework.NodeAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
},
},
PvcAdd: {
framework.PvcAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
},
},
CSINodeUpdate: {
framework.CSINodeUpdate: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
@ -360,34 +360,34 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod, pod2, pod3},
actions: []action{
// This won't be added to inFlightEvents because no inFlightPods at this point.
{eventHappens: &PvcAdd},
{eventHappens: &framework.PvcAdd},
{podPopped: pod},
{eventHappens: &PvAdd},
{eventHappens: &framework.PvAdd},
{podPopped: pod2},
{eventHappens: &NodeAdd},
{eventHappens: &framework.NodeAdd},
// This Pod won't be requeued again.
{podPopped: pod3},
{eventHappens: &AssignedPodAdd},
{eventHappens: &framework.AssignedPodAdd},
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
},
wantBackoffQPodNames: []string{"targetpod2"},
wantInFlightPods: []*v1.Pod{pod, pod3},
wantInFlightEvents: []interface{}{pod, PvAdd, NodeAdd, pod3, AssignedPodAdd},
wantInFlightEvents: []interface{}{pod, framework.PvAdd, framework.NodeAdd, pod3, framework.AssignedPodAdd},
queueingHintMap: QueueingHintMapPerProfile{
"": {
PvAdd: {
framework.PvAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
},
},
NodeAdd: {
framework.NodeAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
},
},
AssignedPodAdd: {
framework.AssignedPodAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
@ -401,7 +401,7 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod},
actions: []action{
{podPopped: pod},
{eventHappens: &AssignedPodAdd},
{eventHappens: &framework.AssignedPodAdd},
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
},
wantBackoffQPodNames: []string{"targetpod"},
@ -411,7 +411,7 @@ func Test_InFlightPods(t *testing.T) {
"": {
// This hint fn tells that this event doesn't make a Pod schedulable.
// However, this QueueingHintFn will be ignored actually because SchedulingQueueHint is disabled.
AssignedPodAdd: {
framework.AssignedPodAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnSkip,
@ -425,9 +425,9 @@ func Test_InFlightPods(t *testing.T) {
isSchedulingQueueHintEnabled: true,
initialPods: []*v1.Pod{pod},
actions: []action{
{eventHappens: &WildCardEvent},
{eventHappens: &framework.WildCardEvent},
{podPopped: pod},
{eventHappens: &AssignedPodAdd},
{eventHappens: &framework.AssignedPodAdd},
// This Pod won't be requeued to activeQ/backoffQ because fooPlugin1 returns QueueSkip.
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
},
@ -436,9 +436,9 @@ func Test_InFlightPods(t *testing.T) {
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
// fooPlugin1 has a queueing hint function for AssignedPodAdd,
// fooPlugin1 has a queueing hint function for framework.AssignedPodAdd,
// but hint fn tells that this event doesn't make a Pod scheudlable.
AssignedPodAdd: {
framework.AssignedPodAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnSkip,
@ -453,7 +453,7 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod},
actions: []action{
{podPopped: pod},
{eventHappens: &AssignedPodAdd},
{eventHappens: &framework.AssignedPodAdd},
{podEnqueued: newQueuedPodInfoForLookup(pod)},
},
wantBackoffQPodNames: []string{"targetpod"},
@ -462,7 +462,7 @@ func Test_InFlightPods(t *testing.T) {
queueingHintMap: QueueingHintMapPerProfile{
"": {
// It will be ignored because no failed plugin.
AssignedPodAdd: {
framework.AssignedPodAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
@ -477,7 +477,7 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod},
actions: []action{
{podPopped: pod},
{eventHappens: &NodeAdd},
{eventHappens: &framework.NodeAdd},
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
},
wantUnschedPodPoolPodNames: []string{"targetpod"},
@ -485,10 +485,10 @@ func Test_InFlightPods(t *testing.T) {
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
// fooPlugin1 has no queueing hint function for NodeAdd.
AssignedPodAdd: {
// fooPlugin1 has no queueing hint function for framework.NodeAdd.
framework.AssignedPodAdd: {
{
// It will be ignored because the event is not NodeAdd.
// It will be ignored because the event is not framework.NodeAdd.
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
},
@ -502,7 +502,7 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod},
actions: []action{
{podPopped: pod},
{eventHappens: &AssignedPodAdd},
{eventHappens: &framework.AssignedPodAdd},
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
},
wantUnschedPodPoolPodNames: []string{"targetpod"},
@ -510,9 +510,9 @@ func Test_InFlightPods(t *testing.T) {
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
// fooPlugin1 has a queueing hint function for AssignedPodAdd,
// fooPlugin1 has a queueing hint function for framework.AssignedPodAdd,
// but hint fn tells that this event doesn't make a Pod scheudlable.
AssignedPodAdd: {
framework.AssignedPodAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnSkip,
@ -527,7 +527,7 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod},
actions: []action{
{podPopped: pod},
{eventHappens: &AssignedPodAdd},
{eventHappens: &framework.AssignedPodAdd},
{podEnqueued: &framework.QueuedPodInfo{
PodInfo: mustNewPodInfo(pod),
UnschedulablePlugins: sets.New("fooPlugin2", "fooPlugin3"),
@ -539,7 +539,7 @@ func Test_InFlightPods(t *testing.T) {
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
AssignedPodAdd: {
framework.AssignedPodAdd: {
{
PluginName: "fooPlugin3",
QueueingHintFn: queueHintReturnSkip,
@ -563,7 +563,7 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod},
actions: []action{
{podPopped: pod},
{eventHappens: &AssignedPodAdd},
{eventHappens: &framework.AssignedPodAdd},
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2")},
},
wantBackoffQPodNames: []string{"targetpod"},
@ -571,7 +571,7 @@ func Test_InFlightPods(t *testing.T) {
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
AssignedPodAdd: {
framework.AssignedPodAdd: {
{
// it will be ignored because the hint fn returns Skip that is weaker than queueHintReturnQueue from fooPlugin1.
PluginName: "fooPlugin2",
@ -592,9 +592,9 @@ func Test_InFlightPods(t *testing.T) {
initialPods: []*v1.Pod{pod, pod2},
actions: []action{
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod) }},
{eventHappens: &NodeAdd},
{eventHappens: &framework.NodeAdd},
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod2 = popPod(t, logger, q, pod2) }},
{eventHappens: &AssignedPodAdd},
{eventHappens: &framework.AssignedPodAdd},
{callback: func(t *testing.T, q *PriorityQueue) {
logger, _ := ktesting.NewTestContext(t)
err := q.AddUnschedulableIfNotPresent(logger, poppedPod, q.SchedulingCycle())
@ -617,7 +617,7 @@ func Test_InFlightPods(t *testing.T) {
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
AssignedPodAdd: {
framework.AssignedPodAdd: {
{
// it will be ignored because the hint fn returns QueueSkip that is weaker than queueHintReturnQueueImmediately from fooPlugin1.
PluginName: "fooPlugin3",
@ -653,7 +653,7 @@ func Test_InFlightPods(t *testing.T) {
t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
}},
{eventHappens: &PvAdd}, // Active again.
{eventHappens: &framework.PvAdd}, // Active again.
{callback: func(t *testing.T, q *PriorityQueue) {
poppedPod = popPod(t, logger, q, pod)
if len(poppedPod.UnschedulablePlugins) > 0 {
@ -670,7 +670,7 @@ func Test_InFlightPods(t *testing.T) {
},
queueingHintMap: QueueingHintMapPerProfile{
"": {
PvAdd: {
framework.PvAdd: {
{
// The hint fn tells that this event makes a Pod scheudlable immediately.
PluginName: "fooPlugin1",
@ -808,7 +808,7 @@ func TestPop(t *testing.T) {
pod := st.MakePod().Name("targetpod").UID("pod1").Obj()
queueingHintMap := QueueingHintMapPerProfile{
"": {
PvAdd: {
framework.PvAdd: {
{
// The hint fn tells that this event makes a Pod scheudlable.
PluginName: "fooPlugin1",
@ -836,7 +836,7 @@ func TestPop(t *testing.T) {
}
// Activate it again.
q.MoveAllToActiveOrBackoffQueue(logger, PvAdd, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.PvAdd, nil, nil, nil)
// Now check result of Pop.
poppedPod = popPod(t, logger, q, pod)
@ -905,7 +905,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
}
// move all pods to active queue when we were trying to schedule them
q.MoveAllToActiveOrBackoffQueue(logger, WildCardEvent, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.WildCardEvent, nil, nil, nil)
oldCycle := q.SchedulingCycle()
firstPod, _ := q.Pop(logger)
@ -969,7 +969,7 @@ func TestPriorityQueue_Update(t *testing.T) {
skipPlugin := "skipPlugin"
queueingHintMap := QueueingHintMapPerProfile{
"": {
UnscheduledPodUpdate: {
framework.UnscheduledPodUpdate: {
{
PluginName: queuePlugin,
QueueingHintFn: queueHintReturnQueue,
@ -1172,7 +1172,7 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, true)
m := makeEmptyQueueingHintMapPerProfile()
// fakePlugin could change its scheduling result by any updates in Pods.
m[""][UnscheduledPodUpdate] = []*QueueingHintFunction{
m[""][framework.UnscheduledPodUpdate] = []*QueueingHintFunction{
{
PluginName: "fakePlugin",
QueueingHintFn: queueHintReturnQueue,
@ -1393,7 +1393,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m),
WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
got, _ := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), PodAdd)
got, _ := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), framework.PodAdd)
if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
}
@ -1420,11 +1420,11 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
}{
{
name: "baseline",
moveEvent: UnschedulableTimeout,
moveEvent: framework.UnschedulableTimeout,
},
{
name: "worst",
moveEvent: NodeAdd,
moveEvent: framework.NodeAdd,
},
{
name: "random",
@ -1438,24 +1438,24 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
}
events := []framework.ClusterEvent{
NodeAdd,
NodeTaintChange,
NodeAllocatableChange,
NodeConditionChange,
NodeLabelChange,
NodeAnnotationChange,
PvcAdd,
PvcUpdate,
PvAdd,
PvUpdate,
StorageClassAdd,
StorageClassUpdate,
CSINodeAdd,
CSINodeUpdate,
CSIDriverAdd,
CSIDriverUpdate,
CSIStorageCapacityAdd,
CSIStorageCapacityUpdate,
framework.NodeAdd,
framework.NodeTaintChange,
framework.NodeAllocatableChange,
framework.NodeConditionChange,
framework.NodeLabelChange,
framework.NodeAnnotationChange,
framework.PvcAdd,
framework.PvcUpdate,
framework.PvAdd,
framework.PvUpdate,
framework.StorageClassAdd,
framework.StorageClassUpdate,
framework.CSINodeAdd,
framework.CSINodeUpdate,
framework.CSIDriverAdd,
framework.CSIDriverUpdate,
framework.CSIStorageCapacityAdd,
framework.CSIStorageCapacityUpdate,
}
pluginNum := 20
@ -1474,7 +1474,7 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
c := testingclock.NewFakeClock(time.Now())
m := makeEmptyQueueingHintMapPerProfile()
// - All plugins registered for events[0], which is NodeAdd.
// - All plugins registered for events[0], which is framework.NodeAdd.
// - 1/2 of plugins registered for events[1]
// - 1/3 of plugins registered for events[2]
// - ...
@ -1592,7 +1592,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
t.Run(test.name, func(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
m := makeEmptyQueueingHintMapPerProfile()
m[""][NodeAdd] = []*QueueingHintFunction{
m[""][framework.NodeAdd] = []*QueueingHintFunction{
{
PluginName: "foo",
QueueingHintFn: test.hint,
@ -1611,7 +1611,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
}
cl.Step(test.duration)
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil)
if q.podBackoffQ.Len() == 0 && test.expectedQ == backoffQ {
t.Fatalf("expected pod to be queued to backoffQ, but it was not")
@ -1636,7 +1636,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
m := makeEmptyQueueingHintMapPerProfile()
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, true)
m[""][NodeAdd] = []*QueueingHintFunction{
m[""][framework.NodeAdd] = []*QueueingHintFunction{
{
PluginName: "fooPlugin",
QueueingHintFn: queueHintReturnQueue,
@ -1691,7 +1691,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
expectInFlightPods(t, q)
// This NodeAdd event moves unschedulablePodInfo and highPriorityPodInfo to the backoffQ,
// because of the queueing hint function registered for NodeAdd/fooPlugin.
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil)
q.Add(logger, medPriorityPodInfo.Pod)
if q.activeQ.Len() != 1 {
t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len())
@ -1763,7 +1763,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
// and the pods will be moved into activeQ.
c.Step(q.podInitialBackoffDuration)
q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ.
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil)
if q.activeQ.Len() != 4 {
t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len())
}
@ -1784,7 +1784,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
defer cancel()
m := makeEmptyQueueingHintMapPerProfile()
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, false)
m[""][NodeAdd] = []*QueueingHintFunction{
m[""][framework.NodeAdd] = []*QueueingHintFunction{
{
PluginName: "fooPlugin",
QueueingHintFn: queueHintReturnQueue,
@ -1820,7 +1820,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
}
// This NodeAdd event moves unschedulablePodInfo and highPriorityPodInfo to the backoffQ,
// because of the queueing hint function registered for NodeAdd/fooPlugin.
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil)
if q.activeQ.Len() != 1 {
t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len())
}
@ -1876,7 +1876,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
// and the pods will be moved into activeQ.
c.Step(q.podInitialBackoffDuration)
q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ.
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil)
if q.activeQ.Len() != 4 {
t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len())
}
@ -1973,7 +1973,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
c := testingclock.NewFakeClock(time.Now())
m := makeEmptyQueueingHintMapPerProfile()
m[""][AssignedPodAdd] = []*QueueingHintFunction{
m[""][framework.AssignedPodAdd] = []*QueueingHintFunction{
{
PluginName: "fakePlugin",
QueueingHintFn: queueHintReturnQueue,
@ -2134,7 +2134,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
t.Errorf("Unexpected pending pods summary: want %v, but got %v.", wantSummary, gotSummary)
}
// Move all to active queue. We should still see the same set of pods.
q.MoveAllToActiveOrBackoffQueue(logger, WildCardEvent, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.WildCardEvent, nil, nil, nil)
gotPods, gotSummary = q.PendingPods()
if diff := cmp.Diff(expectedSet, makeSet(gotPods)); diff != "" {
t.Errorf("Unexpected list of pending Pods (-want, +got):\n%s", diff)
@ -2420,7 +2420,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
}
c.Step(DefaultPodInitialBackoffDuration)
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil)
// Simulation is over. Now let's pop all pods. The pod popped first should be
// the last one we pop here.
for i := 0; i < 5; i++ {
@ -2471,7 +2471,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Move clock to make the unschedulable pods complete backoff.
c.Step(DefaultPodInitialBackoffDuration + time.Second)
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil)
// Simulate a pod being popped by the scheduler,
// At this time, unschedulable pod should be popped.
@ -2504,7 +2504,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Move clock to make the unschedulable pods complete backoff.
c.Step(DefaultPodInitialBackoffDuration + time.Second)
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil)
// At this time, newerPod should be popped
// because it is the oldest tried pod.
@ -2551,7 +2551,7 @@ func TestHighPriorityBackoff(t *testing.T) {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(logger, WildCardEvent, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.WildCardEvent, nil, nil, nil)
p, err = q.Pop(logger)
if err != nil {
@ -2567,7 +2567,7 @@ func TestHighPriorityBackoff(t *testing.T) {
func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
c := testingclock.NewFakeClock(time.Now())
m := makeEmptyQueueingHintMapPerProfile()
m[""][NodeAdd] = []*QueueingHintFunction{
m[""][framework.NodeAdd] = []*QueueingHintFunction{
{
PluginName: "fakePlugin",
QueueingHintFn: queueHintReturnQueue,
@ -2784,7 +2784,7 @@ var (
queue.podBackoffQ.Add(pInfo)
}
moveAllToActiveOrBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
queue.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil)
}
flushBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.clock.(*testingclock.FakeClock).Step(2 * time.Second)
@ -3419,7 +3419,7 @@ func TestBackOffFlow(t *testing.T) {
}
// An event happens.
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
q.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil)
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
t.Errorf("pod %v is not in the backoff queue", podID)
@ -3468,20 +3468,20 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
{
name: "nil PreEnqueueCheck",
podInfos: podInfos,
event: WildCardEvent,
event: framework.WildCardEvent,
want: []string{"p0", "p1", "p2", "p3", "p4"},
},
{
name: "move Pods with priority greater than 2",
podInfos: podInfos,
event: WildCardEvent,
event: framework.WildCardEvent,
preEnqueueCheck: func(pod *v1.Pod) bool { return *pod.Spec.Priority >= 2 },
want: []string{"p2", "p3", "p4"},
},
{
name: "move Pods with even priority and greater than 2",
podInfos: podInfos,
event: WildCardEvent,
event: framework.WildCardEvent,
preEnqueueCheck: func(pod *v1.Pod) bool {
return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority >= 2
},
@ -3490,7 +3490,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
{
name: "move Pods with even and negative priority",
podInfos: podInfos,
event: WildCardEvent,
event: framework.WildCardEvent,
preEnqueueCheck: func(pod *v1.Pod) bool {
return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority < 0
},
@ -3498,7 +3498,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
{
name: "preCheck isn't called if the event is not interested by any plugins",
podInfos: podInfos,
event: PvAdd, // No plugin is interested in this event.
event: framework.PvAdd, // No plugin is interested in this event.
preEnqueueCheck: func(pod *v1.Pod) bool {
panic("preCheck shouldn't be called")
},
@ -3653,17 +3653,17 @@ func Test_isPodWorthRequeuing(t *testing.T) {
UnschedulablePlugins: sets.New("fooPlugin1"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: NodeAdd,
event: framework.NodeAdd,
oldObj: nil,
newObj: st.MakeNode().Obj(),
expected: queueSkip,
expectedExecutionCount: 0,
queueingHintMap: QueueingHintMapPerProfile{
"": {
// no queueing hint function for NodeAdd.
AssignedPodAdd: {
// no queueing hint function for framework.NodeAdd.
framework.AssignedPodAdd: {
{
// It will be ignored because the event is not NodeAdd.
// It will be ignored because the event is not framework.NodeAdd.
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,
},
@ -3677,14 +3677,14 @@ func Test_isPodWorthRequeuing(t *testing.T) {
UnschedulablePlugins: sets.New("fooPlugin1"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: NodeAdd,
event: framework.NodeAdd,
oldObj: nil,
newObj: st.MakeNode().Obj(),
expected: queueAfterBackoff,
expectedExecutionCount: 1,
queueingHintMap: QueueingHintMapPerProfile{
"": {
NodeAdd: {
framework.NodeAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnErr,
@ -3699,7 +3699,7 @@ func Test_isPodWorthRequeuing(t *testing.T) {
UnschedulablePlugins: sets.New("fooPlugin1"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: WildCardEvent,
event: framework.WildCardEvent,
oldObj: nil,
newObj: st.MakeNode().Obj(),
expected: queueAfterBackoff,
@ -3713,14 +3713,14 @@ func Test_isPodWorthRequeuing(t *testing.T) {
PendingPlugins: sets.New("fooPlugin2"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: NodeAdd,
event: framework.NodeAdd,
oldObj: nil,
newObj: st.MakeNode().Node,
expected: queueImmediately,
expectedExecutionCount: 2,
queueingHintMap: QueueingHintMapPerProfile{
"": {
NodeAdd: {
framework.NodeAdd: {
{
PluginName: "fooPlugin1",
// It returns Queue and it's interpreted as queueAfterBackoff.
@ -3751,14 +3751,14 @@ func Test_isPodWorthRequeuing(t *testing.T) {
UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: NodeAdd,
event: framework.NodeAdd,
oldObj: nil,
newObj: st.MakeNode().Obj(),
expected: queueAfterBackoff,
expectedExecutionCount: 2,
queueingHintMap: QueueingHintMapPerProfile{
"": {
NodeAdd: {
framework.NodeAdd: {
{
// Skip will be ignored
PluginName: "fooPlugin1",
@ -3779,14 +3779,14 @@ func Test_isPodWorthRequeuing(t *testing.T) {
UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: NodeAdd,
event: framework.NodeAdd,
oldObj: nil,
newObj: st.MakeNode().Node,
expected: queueSkip,
expectedExecutionCount: 2,
queueingHintMap: QueueingHintMapPerProfile{
"": {
NodeAdd: {
framework.NodeAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnSkip,
@ -3834,7 +3834,7 @@ func Test_isPodWorthRequeuing(t *testing.T) {
QueueingHintFn: queueHintReturnQueue,
},
},
NodeAdd: { // not executed because NodeAdd is unrelated.
framework.NodeAdd: { // not executed because NodeAdd is unrelated.
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueue,

View File

@ -38,7 +38,6 @@ import (
"k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
utiltrace "k8s.io/utils/trace"
@ -355,11 +354,11 @@ func (sched *Scheduler) handleBindingCycleError(
// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
// add this event to in-flight events and thus move the assumed pod to backoffQ anyways if the plugins don't have appropriate QueueingHint.
if status.IsRejected() {
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
return assumedPod.UID != pod.UID
})
} else {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, nil)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodDelete, assumedPod, nil, nil)
}
}

View File

@ -782,9 +782,10 @@ func Test_buildQueueingHintMap(t *testing.T) {
// Test_UnionedGVKs tests UnionedGVKs worked with buildQueueingHintMap.
func Test_UnionedGVKs(t *testing.T) {
tests := []struct {
name string
plugins schedulerapi.PluginSet
want map[framework.GVK]framework.ActionType
name string
plugins schedulerapi.PluginSet
want map[framework.GVK]framework.ActionType
enableInPlacePodVerticalScaling bool
}{
{
name: "filter without EnqueueExtensions plugin",
@ -867,10 +868,10 @@ func Test_UnionedGVKs(t *testing.T) {
want: map[framework.GVK]framework.ActionType{},
},
{
name: "plugins with default profile",
name: "plugins with default profile (InPlacePodVerticalScaling: disabled)",
plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.All,
framework.Pod: framework.Add | framework.UpdatePodLabel | framework.Delete,
framework.Node: framework.All,
framework.CSINode: framework.All - framework.Delete,
framework.CSIDriver: framework.All - framework.Delete,
@ -880,9 +881,26 @@ func Test_UnionedGVKs(t *testing.T) {
framework.StorageClass: framework.All - framework.Delete,
},
},
{
name: "plugins with default profile (InPlacePodVerticalScaling: enabled)",
plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.Add | framework.UpdatePodLabel | framework.UpdatePodScaleDown | framework.Delete,
framework.Node: framework.All,
framework.CSINode: framework.All - framework.Delete,
framework.CSIDriver: framework.All - framework.Delete,
framework.CSIStorageCapacity: framework.All - framework.Delete,
framework.PersistentVolume: framework.All - framework.Delete,
framework.PersistentVolumeClaim: framework.All - framework.Delete,
framework.StorageClass: framework.All - framework.Delete,
},
enableInPlacePodVerticalScaling: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, tt.enableInPlacePodVerticalScaling)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()