Implement default queue sort logic as a scheduler plugin

This commit is contained in:
Wei Huang
2020-01-15 12:26:22 -08:00
parent 90d6484f1c
commit c712230ac1
23 changed files with 648 additions and 270 deletions

View File

@@ -17,7 +17,6 @@ limitations under the License.
package queue
import (
"context"
"fmt"
"reflect"
"strings"
@@ -30,17 +29,16 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/testutil"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
const queueMetricMetadata = `
@@ -130,7 +128,7 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
}
func TestPriorityQueue_Add(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
@@ -166,121 +164,42 @@ func TestPriorityQueue_Add(t *testing.T) {
}
}
// fakeFramework is a stub framework for the queue tests. Only QueueSortFunc
// carries logic; every other method returns a fixed value or does nothing.
// NOTE(review): this span comes from a diff rendering whose +/- markers were
// lost, so the fakeFramework lines and the newDefaultFramework lines are
// interleaved here; presumably fakeFramework is the removed side — confirm
// against the commit.
type fakeFramework struct{}
// QueueSortFunc returns a LessFunc that reports true when the first pod's
// priority is lower than the second pod's priority.
func (*fakeFramework) QueueSortFunc() framework.LessFunc {
return func(podInfo1, podInfo2 *framework.PodInfo) bool {
prio1 := podutil.GetPodPriority(podInfo1.Pod)
prio2 := podutil.GetPodPriority(podInfo2.Pod)
return prio1 < prio2
// newDefaultFramework builds a framework.Framework from the plugin set and
// plugin config of the default algorithm provider. The framework is backed by
// a fake clientset, an informer factory created from that clientset, and an
// empty node-info snapshot. It panics if the framework cannot be constructed.
func newDefaultFramework() framework.Framework {
defaultCfg := algorithmprovider.NewRegistry(1)[schedulerapi.SchedulerDefaultProviderName]
pl, pls := defaultCfg.FrameworkPlugins, defaultCfg.FrameworkPluginConfig
fakeClient := fake.NewSimpleClientset()
fwk, err := framework.NewFramework(
frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{}),
pl,
pls,
framework.WithClientSet(fakeClient),
framework.WithInformerFactory(informers.NewSharedInformerFactory(fakeClient, 0)),
framework.WithSnapshotSharedLister(nodeinfosnapshot.NewEmptySnapshot()),
)
if err != nil {
panic(err)
}
}
// The remaining fakeFramework methods are stubs: HasFilterPlugins and
// HasScorePlugins return true, and every other method returns nil or has an
// empty body.
func (f *fakeFramework) HasFilterPlugins() bool {
return true
}
func (f *fakeFramework) HasScorePlugins() bool {
return true
}
func (f *fakeFramework) ListPlugins() map[string][]config.Plugin {
return nil
}
func (*fakeFramework) NodeInfoSnapshot() *nodeinfosnapshot.Snapshot {
return nil
}
func (*fakeFramework) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
return nil
}
func (*fakeFramework) RunFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) framework.PluginToStatus {
return nil
}
func (*fakeFramework) RunPreFilterExtensionAddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
return nil
}
func (*fakeFramework) RunPreFilterExtensionRemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
return nil
}
func (*fakeFramework) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) {
return nil, nil
}
func (*fakeFramework) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
}
func (*fakeFramework) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status {
return nil
}
func (*fakeFramework) RunReservePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) RunUnreservePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
}
func (*fakeFramework) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) IterateOverWaitingPods(callback func(framework.WaitingPod)) {}
func (*fakeFramework) GetWaitingPod(uid types.UID) framework.WaitingPod {
return nil
}
func (*fakeFramework) RejectWaitingPod(uid types.UID) {
}
func (*fakeFramework) ClientSet() clientset.Interface {
return nil
}
func (*fakeFramework) SharedInformerFactory() informers.SharedInformerFactory {
return nil
}
func (*fakeFramework) SnapshotSharedLister() schedulerlisters.SharedLister {
return nil
}
func (*fakeFramework) VolumeBinder() *volumebinder.VolumeBinder {
return nil
// NOTE(review): fwk is the value assigned by framework.NewFramework in
// newDefaultFramework, so this return presumably belongs to that function —
// confirm against the commit.
return fwk
}
// TestPriorityQueue_AddWithReversePriorityLessFunc adds medPriorityPod and then
// highPriorityPod to a queue and checks the order in which Pop returns them.
// NOTE(review): two `q :=` lines and three Pop checks appear because both sides
// of the diff are present in this span; presumably the fakeFramework variant
// expects medPriorityPod first and the newDefaultFramework variant expects
// highPriorityPod first — confirm against the commit.
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
q := createAndRunPriorityQueue(&fakeFramework{})
q := createAndRunPriorityQueue(newDefaultFramework())
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
if err := q.Add(&highPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name)
}
if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name)
}
if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name)
}
}
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&highPriNominatedPod)
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything.
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
@@ -312,7 +231,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
// Pods from the current or an earlier scheduling cycle will be put back into
// activeQueue if we were trying to schedule them when we received a move request.
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
q := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(time.Now())))
q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(time.Now())))
totalNum := 10
expectedPods := make([]v1.Pod, 0, totalNum)
for i := 0; i < totalNum; i++ {
@@ -379,7 +298,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
}
func TestPriorityQueue_Pop(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
@@ -396,7 +315,7 @@ func TestPriorityQueue_Pop(t *testing.T) {
}
func TestPriorityQueue_Update(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Update(nil, &highPriorityPod)
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
@@ -432,7 +351,7 @@ func TestPriorityQueue_Update(t *testing.T) {
}
func TestPriorityQueue_Delete(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Update(&highPriorityPod, &highPriNominatedPod)
q.Add(&unschedulablePod)
if err := q.Delete(&highPriNominatedPod); err != nil {
@@ -456,7 +375,7 @@ func TestPriorityQueue_Delete(t *testing.T) {
}
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&medPriorityPod)
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod))
@@ -502,7 +421,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
Spec: v1.PodSpec{NodeName: "machine1"},
}
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&medPriorityPod)
// Add a couple of pods to the unschedulableQ.
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
@@ -523,7 +442,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
}
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&medPriorityPod)
q.Add(&unschedulablePod)
q.Add(&highPriorityPod)
@@ -548,7 +467,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
return pendingSet
}
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&medPriorityPod)
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod))
@@ -564,7 +483,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
}
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
@@ -634,7 +553,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
func TestPriorityQueue_NewWithOptions(t *testing.T) {
q := createAndRunPriorityQueue(
nil,
newDefaultFramework(),
WithPodInitialBackoffDuration(2*time.Second),
WithPodMaxBackoffDuration(20*time.Second),
)
@@ -806,7 +725,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
}{
{
name: "PriorityQueue close",
q: createAndRunPriorityQueue(nil),
q: createAndRunPriorityQueue(newDefaultFramework()),
expectedErr: fmt.Errorf(queueClosed),
},
}
@@ -835,7 +754,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
// ensures that an unschedulable pod does not block head of the queue when there
// are frequent events that move pods to the active queue.
func TestRecentlyTriedPodsGoBack(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
// Add a few pods to priority queue.
for i := 0; i < 5; i++ {
p := v1.Pod{
@@ -889,7 +808,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
// This behavior ensures that an unschedulable pod does not block head of the queue when there
// are frequent events that move pods to the active queue.
func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
// Add an unschedulable pod to a priority queue.
// This makes a situation that the pod was tried to schedule
@@ -980,7 +899,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// TestHighPriorityBackoff tests that a high priority pod does not block
// other pods if it is unschedulable
func TestHighPriorityBackoff(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -1043,7 +962,7 @@ func TestHighPriorityBackoff(t *testing.T) {
// TestHighPriorityFlushUnschedulableQLeftover tests that pods will be moved to
// activeQ after one minute if they are in unschedulableQ
func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-midpod",
@@ -1240,7 +1159,7 @@ func TestPodTimestamp(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
queue := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp)))
queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp)))
var podInfoList []*framework.PodInfo
for i, op := range test.operations {
@@ -1407,7 +1326,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
resetMetrics()
queue := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp)))
queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp)))
for i, op := range test.operations {
for _, pInfo := range test.operands[i] {
op(queue, pInfo)
@@ -1436,7 +1355,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 1: A pod is created and scheduled after 1 attempt. The queue operations are
// Add -> Pop.
c := clock.NewFakeClock(timestamp)
queue := createAndRunPriorityQueue(nil, WithClock(c))
queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
queue.Add(pod)
pInfo, err := queue.Pop()
if err != nil {
@@ -1447,7 +1366,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 2: A pod is created and scheduled after 2 attempts. The queue operations are
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop.
c = clock.NewFakeClock(timestamp)
queue = createAndRunPriorityQueue(nil, WithClock(c))
queue = createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
queue.Add(pod)
pInfo, err = queue.Pop()
if err != nil {
@@ -1467,7 +1386,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 3: Similar to case 2, but before the second pop, call update, the queue operations are
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop.
c = clock.NewFakeClock(timestamp)
queue = createAndRunPriorityQueue(nil, WithClock(c))
queue = createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
queue.Add(pod)
pInfo, err = queue.Pop()
if err != nil {
@@ -1565,7 +1484,7 @@ func TestIncomingPodsMetrics(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
metrics.SchedulerQueueIncomingPods.Reset()
queue := NewPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp)))
queue := NewPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp)))
queue.Close()
queue.Run()
for _, op := range test.operations {