Merge pull request #100322 from minbaev/cluster-event-reg-cleanup

got rid of ClusterEventReg generate ClusterEvent objects on the fly
This commit is contained in:
Kubernetes Prow Robot 2021-04-14 21:27:49 -07:00 committed by GitHub
commit a95fd40203
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 120 additions and 119 deletions

View File

@ -119,8 +119,8 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
nodeInfo := sched.SchedulerCache.UpdateNode(oldNode, newNode) nodeInfo := sched.SchedulerCache.UpdateNode(oldNode, newNode)
// Only requeue unschedulable pods if the node became more schedulable. // Only requeue unschedulable pods if the node became more schedulable.
if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != "" { if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event, preCheckForNode(nodeInfo)) sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo))
} }
} }
@ -412,24 +412,24 @@ func addAllEventHandlers(
) )
} }
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) string { func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
if nodeSpecUnschedulableChanged(newNode, oldNode) { if nodeSpecUnschedulableChanged(newNode, oldNode) {
return queue.NodeSpecUnschedulableChange return &queue.NodeSpecUnschedulableChange
} }
if nodeAllocatableChanged(newNode, oldNode) { if nodeAllocatableChanged(newNode, oldNode) {
return queue.NodeAllocatableChange return &queue.NodeAllocatableChange
} }
if nodeLabelsChanged(newNode, oldNode) { if nodeLabelsChanged(newNode, oldNode) {
return queue.NodeLabelChange return &queue.NodeLabelChange
} }
if nodeTaintsChanged(newNode, oldNode) { if nodeTaintsChanged(newNode, oldNode) {
return queue.NodeTaintChange return &queue.NodeTaintChange
} }
if nodeConditionsChanged(newNode, oldNode) { if nodeConditionsChanged(newNode, oldNode) {
return queue.NodeConditionChange return &queue.NodeConditionChange
} }
return "" return nil
} }
func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool { func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {

View File

@ -75,15 +75,21 @@ const (
WildCard GVK = "*" WildCard GVK = "*"
) )
// WildCardEvent semantically matches all resources on all actions.
var WildCardEvent = ClusterEvent{Resource: WildCard, ActionType: All}
// ClusterEvent abstracts how a system resource's state gets changed. // ClusterEvent abstracts how a system resource's state gets changed.
// Resource represents the standard API resources such as Pod, Node, etc. // Resource represents the standard API resources such as Pod, Node, etc.
// ActionType denotes the specific change such as Add, Update or Delete. // ActionType denotes the specific change such as Add, Update or Delete.
type ClusterEvent struct { type ClusterEvent struct {
Resource GVK Resource GVK
ActionType ActionType ActionType ActionType
Label string
}
// IsWildCard returns true if ClusterEvent follows WildCard semantics
func (ce ClusterEvent) IsWildCard() bool {
if ce.Resource == WildCard && ce.ActionType == All {
return true
}
return false
} }
// QueuedPodInfo is a Pod wrapper with additional information related to // QueuedPodInfo is a Pod wrapper with additional information related to

View File

@ -20,80 +20,59 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
) )
// Events that trigger scheduler queue to change.
const ( const (
// PodAdd is the event when a new pod is added to API server. // PodAdd is the event when a new pod is added to API server.
PodAdd = "PodAdd" PodAdd = "PodAdd"
// NodeAdd is the event when a new node is added to the cluster.
NodeAdd = "NodeAdd"
// ScheduleAttemptFailure is the event when a schedule attempt fails. // ScheduleAttemptFailure is the event when a schedule attempt fails.
ScheduleAttemptFailure = "ScheduleAttemptFailure" ScheduleAttemptFailure = "ScheduleAttemptFailure"
// BackoffComplete is the event when a pod finishes backoff. // BackoffComplete is the event when a pod finishes backoff.
BackoffComplete = "BackoffComplete" BackoffComplete = "BackoffComplete"
// UnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout.
UnschedulableTimeout = "UnschedulableTimeout"
// AssignedPodAdd is the event when a pod is added that causes pods with matching affinity terms
// to be more schedulable.
AssignedPodAdd = "AssignedPodAdd"
// AssignedPodUpdate is the event when a pod is updated that causes pods with matching affinity
// terms to be more schedulable.
AssignedPodUpdate = "AssignedPodUpdate"
// AssignedPodDelete is the event when a pod is deleted that causes pods with matching affinity
// terms to be more schedulable.
AssignedPodDelete = "AssignedPodDelete"
// PvAdd is the event when a persistent volume is added in the cluster.
PvAdd = "PvAdd"
// PvUpdate is the event when a persistent volume is updated in the cluster.
PvUpdate = "PvUpdate"
// PvcAdd is the event when a persistent volume claim is added in the cluster.
PvcAdd = "PvcAdd"
// PvcUpdate is the event when a persistent volume claim is updated in the cluster.
PvcUpdate = "PvcUpdate"
// StorageClassAdd is the event when a StorageClass is added in the cluster.
StorageClassAdd = "StorageClassAdd"
// ServiceAdd is the event when a service is added in the cluster.
ServiceAdd = "ServiceAdd"
// ServiceUpdate is the event when a service is updated in the cluster.
ServiceUpdate = "ServiceUpdate"
// ServiceDelete is the event when a service is deleted in the cluster.
ServiceDelete = "ServiceDelete"
// CSINodeAdd is the event when a CSI node is added in the cluster.
CSINodeAdd = "CSINodeAdd"
// CSINodeUpdate is the event when a CSI node is updated in the cluster.
CSINodeUpdate = "CSINodeUpdate"
// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
NodeSpecUnschedulableChange = "NodeSpecUnschedulableChange"
// NodeAllocatableChange is the event when node allocatable is changed.
NodeAllocatableChange = "NodeAllocatableChange"
// NodeLabelChange is the event when node label is changed.
NodeLabelChange = "NodeLabelChange"
// NodeTaintChange is the event when node taint is changed.
NodeTaintChange = "NodeTaintChange"
// NodeConditionChange is the event when node condition is changed.
NodeConditionChange = "NodeConditionChange"
) )
// TODO: benchmark the perf gain if making the keys as enums (int), and then var (
// making clusterEventReg a []framework.ClusterEvent. // AssignedPodAdd is the event when a pod is added that causes pods with matching affinity terms
var clusterEventReg = map[string]framework.ClusterEvent{ // to be more schedulable.
AssignedPodAdd: {Resource: framework.Pod, ActionType: framework.Add}, AssignedPodAdd = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add, Label: "AssignedPodAdd"}
AssignedPodUpdate: {Resource: framework.Pod, ActionType: framework.Update}, // NodeAdd is the event when a new node is added to the cluster.
AssignedPodDelete: {Resource: framework.Pod, ActionType: framework.Delete}, NodeAdd = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add, Label: "NodeAdd"}
NodeAdd: {Resource: framework.Node, ActionType: framework.Add}, // AssignedPodUpdate is the event when a pod is updated that causes pods with matching affinity
NodeSpecUnschedulableChange: {Resource: framework.Node, ActionType: framework.UpdateNodeTaint}, // terms to be more schedulable.
NodeAllocatableChange: {Resource: framework.Node, ActionType: framework.UpdateNodeAllocatable}, AssignedPodUpdate = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "AssignedPodUpdate"}
NodeLabelChange: {Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, // AssignedPodDelete is the event when a pod is deleted that causes pods with matching affinity
NodeTaintChange: {Resource: framework.Node, ActionType: framework.UpdateNodeTaint}, // terms to be more schedulable.
NodeConditionChange: {Resource: framework.Node, ActionType: framework.UpdateNodeCondition}, AssignedPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete, Label: "AssignedPodDelete"}
PvAdd: {Resource: framework.PersistentVolume, ActionType: framework.Add}, // NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
PvUpdate: {Resource: framework.PersistentVolume, ActionType: framework.Update}, NodeSpecUnschedulableChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
PvcAdd: {Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}, // NodeAllocatableChange is the event when node allocatable is changed.
PvcUpdate: {Resource: framework.PersistentVolumeClaim, ActionType: framework.Update}, NodeAllocatableChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeAllocatable, Label: "NodeAllocatableChange"}
StorageClassAdd: {Resource: framework.StorageClass, ActionType: framework.Add}, // NodeLabelChange is the event when node label is changed.
CSINodeAdd: {Resource: framework.CSINode, ActionType: framework.Add}, NodeLabelChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel, Label: "NodeLabelChange"}
CSINodeUpdate: {Resource: framework.CSINode, ActionType: framework.Update}, // NodeTaintChange is the event when node taint is changed.
ServiceAdd: {Resource: framework.Service, ActionType: framework.Add}, NodeTaintChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint, Label: "NodeTaintChange"}
ServiceUpdate: {Resource: framework.Service, ActionType: framework.Update}, // NodeConditionChange is the event when node condition is changed.
ServiceDelete: {Resource: framework.Service, ActionType: framework.Delete}, NodeConditionChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeCondition, Label: "NodeConditionChange"}
UnschedulableTimeout: framework.WildCardEvent, // PvAdd is the event when a persistent volume is added in the cluster.
} PvAdd = framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add, Label: "PvAdd"}
// PvUpdate is the event when a persistent volume is updated in the cluster.
PvUpdate = framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Update, Label: "PvUpdate"}
// PvcAdd is the event when a persistent volume claim is added in the cluster.
PvcAdd = framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add, Label: "PvcAdd"}
// PvcUpdate is the event when a persistent volume claim is updated in the cluster.
PvcUpdate = framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Update, Label: "PvcUpdate"}
// StorageClassAdd is the event when a StorageClass is added in the cluster.
StorageClassAdd = framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add, Label: "StorageClassAdd"}
// CSINodeAdd is the event when a CSI node is added in the cluster.
CSINodeAdd = framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add, Label: "CSINodeAdd"}
// CSINodeUpdate is the event when a CSI node is updated in the cluster.
CSINodeUpdate = framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Update, Label: "CSINodeUpdate"}
// ServiceAdd is the event when a service is added in the cluster.
ServiceAdd = framework.ClusterEvent{Resource: framework.Service, ActionType: framework.Add, Label: "ServiceAdd"}
// ServiceUpdate is the event when a service is updated in the cluster.
ServiceUpdate = framework.ClusterEvent{Resource: framework.Service, ActionType: framework.Update, Label: "ServiceUpdate"}
// ServiceDelete is the event when a service is deleted in the cluster.
ServiceDelete = framework.ClusterEvent{Resource: framework.Service, ActionType: framework.Delete, Label: "ServiceDelete"}
// WildCardEvent semantically matches all resources on all actions.
WildCardEvent = framework.ClusterEvent{Resource: framework.WildCard, ActionType: framework.All, Label: "WildCardEvent"}
// UnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout.
UnschedulableTimeout = framework.ClusterEvent{Resource: framework.WildCard, ActionType: framework.All, Label: "UnschedulableTimeout"}
)

View File

@ -90,7 +90,7 @@ type SchedulingQueue interface {
Pop() (*framework.QueuedPodInfo, error) Pop() (*framework.QueuedPodInfo, error)
Update(oldPod, newPod *v1.Pod) error Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error Delete(pod *v1.Pod) error
MoveAllToActiveOrBackoffQueue(event string, preCheck PreEnqueueCheck) MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
AssignedPodAdded(pod *v1.Pod) AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod)
PendingPods() []*v1.Pod PendingPods() []*v1.Pod
@ -531,7 +531,7 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
// This function adds all pods and then signals the condition variable to ensure that // This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives it after all the pods are in the // if Pop() is waiting for an item, it receives it after all the pods are in the
// queue and the head is the highest priority pod. // queue and the head is the highest priority pod.
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string, preCheck PreEnqueueCheck) { func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulableQ.podInfoMap)) unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulableQ.podInfoMap))
@ -544,7 +544,7 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string, preCheck Pre
} }
// NOTE: this function assumes lock has been acquired in caller // NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) { func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
moved := false moved := false
for _, pInfo := range podInfoList { for _, pInfo := range podInfoList {
// If the event doesn't help making the Pod schedulable, continue. // If the event doesn't help making the Pod schedulable, continue.
@ -560,14 +560,14 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
if err := p.podBackoffQ.Add(pInfo); err != nil { if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod)) klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
} else { } else {
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
p.unschedulableQ.delete(pod) p.unschedulableQ.delete(pod)
} }
} else { } else {
if err := p.activeQ.Add(pInfo); err != nil { if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod)) klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
} else { } else {
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
p.unschedulableQ.delete(pod) p.unschedulableQ.delete(pod)
} }
} }
@ -869,12 +869,9 @@ func podInfoKeyFunc(obj interface{}) (string, error) {
// Checks if the Pod may become schedulable upon the event. // Checks if the Pod may become schedulable upon the event.
// This is achieved by looking up the global clusterEventMap registry. // This is achieved by looking up the global clusterEventMap registry.
func (p *PriorityQueue) podMatchesEvent(podInfo *framework.QueuedPodInfo, event string) bool { func (p *PriorityQueue) podMatchesEvent(podInfo *framework.QueuedPodInfo, clusterEvent framework.ClusterEvent) bool {
clusterEvent, ok := clusterEventReg[event]
if !ok { if clusterEvent.IsWildCard() {
return false
}
if clusterEvent == framework.WildCardEvent {
return true return true
} }
@ -885,7 +882,7 @@ func (p *PriorityQueue) podMatchesEvent(podInfo *framework.QueuedPodInfo, event
// Note the ActionTypes don't need to be *identical*. We check if the ANDed value // Note the ActionTypes don't need to be *identical*. We check if the ANDed value
// is zero or not. In this way, it's easy to tell Update&Delete is not compatible, // is zero or not. In this way, it's easy to tell Update&Delete is not compatible,
// but Update&All is. // but Update&All is.
evtMatch := evt == framework.WildCardEvent || evtMatch := evt.IsWildCard() ||
(evt.Resource == clusterEvent.Resource && evt.ActionType&clusterEvent.ActionType != 0) (evt.Resource == clusterEvent.Resource && evt.ActionType&clusterEvent.ActionType != 0)
// Secondly verify the plugin name matches. // Secondly verify the plugin name matches.

View File

@ -46,6 +46,12 @@ const queueMetricMetadata = `
# TYPE scheduler_queue_incoming_pods_total counter # TYPE scheduler_queue_incoming_pods_total counter
` `
var (
TestEvent = framework.ClusterEvent{Resource: "test"}
NodeAllEvent = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.All}
EmptyEvent = framework.ClusterEvent{}
)
var lowPriority, midPriority, highPriority = int32(0), int32(100), int32(1000) var lowPriority, midPriority, highPriority = int32(0), int32(100), int32(1000)
var mediumPriority = (lowPriority + highPriority) / 2 var mediumPriority = (lowPriority + highPriority) / 2
var highPriorityPodInfo, highPriNominatedPodInfo, medPriorityPodInfo, unschedulablePodInfo = framework.NewPodInfo(&v1.Pod{ var highPriorityPodInfo, highPriNominatedPodInfo, medPriorityPodInfo, unschedulablePodInfo = framework.NewPodInfo(&v1.Pod{
@ -239,7 +245,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
} }
// move all pods to active queue when we were trying to schedule them // move all pods to active queue when we were trying to schedule them
q.MoveAllToActiveOrBackoffQueue("test", nil) q.MoveAllToActiveOrBackoffQueue(TestEvent, nil)
oldCycle := q.SchedulingCycle() oldCycle := q.SchedulingCycle()
firstPod, _ := q.Pop() firstPod, _ := q.Pop()
@ -397,9 +403,10 @@ func TestPriorityQueue_Delete(t *testing.T) {
} }
func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
tests := []struct { tests := []struct {
name string name string
moveEvent string moveEvent framework.ClusterEvent
}{ }{
{ {
name: "baseline", name: "baseline",
@ -419,8 +426,22 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod,
medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, medPriorityPodInfo.Pod, unschedulablePodInfo.Pod,
} }
events := []string{NodeAdd, NodeTaintChange, NodeAllocatableChange, NodeConditionChange, NodeLabelChange, PodAdd,
PvcAdd, PvcUpdate, PvAdd, PvUpdate, StorageClassAdd, CSINodeAdd, CSINodeUpdate} events := []framework.ClusterEvent{
NodeAdd,
NodeTaintChange,
NodeAllocatableChange,
NodeConditionChange,
NodeLabelChange,
PvcAdd,
PvcUpdate,
PvAdd,
PvUpdate,
StorageClassAdd,
CSINodeAdd,
CSINodeUpdate,
}
pluginNum := 20 pluginNum := 20
var plugins []string var plugins []string
// Mimic that we have 20 plugins loaded in runtime. // Mimic that we have 20 plugins loaded in runtime.
@ -441,10 +462,10 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
// - 1/3 of plugins registered for events[2] // - 1/3 of plugins registered for events[2]
// - ... // - ...
for j := 0; j < len(events); j++ { for j := 0; j < len(events); j++ {
m[clusterEventReg[events[j]]] = sets.NewString() m[events[j]] = sets.NewString()
for k := 0; k < len(plugins); k++ { for k := 0; k < len(plugins); k++ {
if (k+1)%(j+1) == 0 { if (k+1)%(j+1) == 0 {
m[clusterEventReg[events[j]]].Insert(plugins[k]) m[events[j]].Insert(plugins[k])
} }
} }
} }
@ -478,7 +499,7 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
} }
b.StartTimer() b.StartTimer()
if tt.moveEvent != "" { if tt.moveEvent.Resource != "" {
q.MoveAllToActiveOrBackoffQueue(tt.moveEvent, nil) q.MoveAllToActiveOrBackoffQueue(tt.moveEvent, nil)
} else { } else {
// Random case. // Random case.
@ -581,9 +602,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
} }
c := clock.NewFakeClock(time.Now()) c := clock.NewFakeClock(time.Now())
m := map[framework.ClusterEvent]sets.String{ m := map[framework.ClusterEvent]sets.String{AssignedPodAdd: sets.NewString("fakePlugin")}
{Resource: framework.Pod, ActionType: framework.Add}: sets.NewString("fakePlugin"),
}
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q.Add(medPriorityPodInfo.Pod) q.Add(medPriorityPodInfo.Pod)
// Add a couple of pods to the unschedulableQ. // Add a couple of pods to the unschedulableQ.
@ -643,7 +662,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
t.Error("Unexpected list of pending Pods.") t.Error("Unexpected list of pending Pods.")
} }
// Move all to active queue. We should still see the same set of pods. // Move all to active queue. We should still see the same set of pods.
q.MoveAllToActiveOrBackoffQueue("test", nil) q.MoveAllToActiveOrBackoffQueue(TestEvent, nil)
if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) { if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) {
t.Error("Unexpected list of pending Pods...") t.Error("Unexpected list of pending Pods...")
} }
@ -1106,7 +1125,7 @@ func TestHighPriorityBackoff(t *testing.T) {
// Put in the unschedulable queue. // Put in the unschedulable queue.
q.AddUnschedulableIfNotPresent(p, q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(p, q.SchedulingCycle())
// Move all unschedulable pods to the active queue. // Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue("test", nil) q.MoveAllToActiveOrBackoffQueue(TestEvent, nil)
p, err = q.Pop() p, err = q.Pop()
if err != nil { if err != nil {
@ -1122,7 +1141,7 @@ func TestHighPriorityBackoff(t *testing.T) {
func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
c := clock.NewFakeClock(time.Now()) c := clock.NewFakeClock(time.Now())
m := map[framework.ClusterEvent]sets.String{ m := map[framework.ClusterEvent]sets.String{
{Resource: framework.Node, ActionType: framework.Add}: sets.NewString("fakePlugin"), NodeAdd: sets.NewString("fakePlugin"),
} }
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
midPod := v1.Pod{ midPod := v1.Pod{
@ -1727,34 +1746,34 @@ func TestPodMatchesEvent(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
podInfo *framework.QueuedPodInfo podInfo *framework.QueuedPodInfo
event string event framework.ClusterEvent
clusterEventMap map[framework.ClusterEvent]sets.String clusterEventMap map[framework.ClusterEvent]sets.String
want bool want bool
}{ }{
{ {
name: "event not registered", name: "event not registered",
podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}), podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}),
event: "ClusterTearDown", event: EmptyEvent,
clusterEventMap: map[framework.ClusterEvent]sets.String{ clusterEventMap: map[framework.ClusterEvent]sets.String{
{Resource: framework.Node, ActionType: framework.All}: sets.NewString("foo"), NodeAllEvent: sets.NewString("foo"),
}, },
want: false, want: false,
}, },
{ {
name: "pod's failed plugin matches but event does not match", name: "pod's failed plugin matches but event does not match",
podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"), podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"),
event: PodAdd, event: AssignedPodAdd,
clusterEventMap: map[framework.ClusterEvent]sets.String{ clusterEventMap: map[framework.ClusterEvent]sets.String{
{Resource: framework.Node, ActionType: framework.All}: sets.NewString("foo", "bar"), NodeAllEvent: sets.NewString("foo", "bar"),
}, },
want: false, want: false,
}, },
{ {
name: "wildcard event wins regardless of event matching", name: "wildcard event wins regardless of event matching",
podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"), podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"),
event: UnschedulableTimeout, event: WildCardEvent,
clusterEventMap: map[framework.ClusterEvent]sets.String{ clusterEventMap: map[framework.ClusterEvent]sets.String{
{Resource: framework.Node, ActionType: framework.All}: sets.NewString("foo"), NodeAllEvent: sets.NewString("foo"),
}, },
want: true, want: true,
}, },
@ -1763,7 +1782,7 @@ func TestPodMatchesEvent(t *testing.T) {
podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"), podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"),
event: NodeTaintChange, event: NodeTaintChange,
clusterEventMap: map[framework.ClusterEvent]sets.String{ clusterEventMap: map[framework.ClusterEvent]sets.String{
{Resource: framework.Node, ActionType: framework.All}: sets.NewString("foo", "bar"), NodeAllEvent: sets.NewString("foo", "bar"),
}, },
want: true, want: true,
}, },
@ -1772,8 +1791,8 @@ func TestPodMatchesEvent(t *testing.T) {
podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"), podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"),
event: NodeTaintChange, event: NodeTaintChange,
clusterEventMap: map[framework.ClusterEvent]sets.String{ clusterEventMap: map[framework.ClusterEvent]sets.String{
{Resource: framework.Node, ActionType: framework.All}: sets.NewString("foo"), NodeAllEvent: sets.NewString("foo"),
{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: sets.NewString("bar"), NodeTaintChange: sets.NewString("bar"),
}, },
want: true, want: true,
}, },
@ -1782,7 +1801,7 @@ func TestPodMatchesEvent(t *testing.T) {
podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "foo", "bar"), podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "foo", "bar"),
event: NodeAdd, event: NodeAdd,
clusterEventMap: map[framework.ClusterEvent]sets.String{ clusterEventMap: map[framework.ClusterEvent]sets.String{
{Resource: framework.Node, ActionType: framework.All}: sets.NewString("bar"), NodeAllEvent: sets.NewString("bar"),
}, },
want: true, want: true,
}, },
@ -1791,7 +1810,7 @@ func TestPodMatchesEvent(t *testing.T) {
podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "foo"), podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "foo"),
event: PvAdd, event: PvAdd,
clusterEventMap: map[framework.ClusterEvent]sets.String{ clusterEventMap: map[framework.ClusterEvent]sets.String{
framework.WildCardEvent: sets.NewString("foo"), WildCardEvent: sets.NewString("foo"),
}, },
want: true, want: true,
}, },
@ -1800,7 +1819,7 @@ func TestPodMatchesEvent(t *testing.T) {
podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "foo"), podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "foo"),
event: PvAdd, event: PvAdd,
clusterEventMap: map[framework.ClusterEvent]sets.String{ clusterEventMap: map[framework.ClusterEvent]sets.String{
framework.WildCardEvent: sets.NewString("bar"), WildCardEvent: sets.NewString("bar"),
}, },
want: false, want: false,
}, },
@ -1867,7 +1886,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
for _, podInfo := range tt.podInfos { for _, podInfo := range tt.podInfos {
q.AddUnschedulableIfNotPresent(podInfo, q.schedulingCycle) q.AddUnschedulableIfNotPresent(podInfo, q.schedulingCycle)
} }
q.MoveAllToActiveOrBackoffQueue("test", tt.preEnqueueCheck) q.MoveAllToActiveOrBackoffQueue(TestEvent, tt.preEnqueueCheck)
var got []string var got []string
for q.podBackoffQ.Len() != 0 { for q.podBackoffQ.Len() != 0 {
obj, err := q.podBackoffQ.Pop() obj, err := q.podBackoffQ.Pop()