Add function NominatedPodsForNode to PodNominator interface.

- replace SchedulingQueue with PodNominator in genericScheduler.
This commit is contained in:
Wei Huang 2020-05-20 11:38:21 -07:00
parent eb34058777
commit f4b726237a
No known key found for this signature in database
GPG Key ID: BE5E9752F8B6E005
6 changed files with 20 additions and 28 deletions

View File

@@ -14,7 +14,6 @@ go_library(
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/parallelize:go_default_library",
-        "//pkg/scheduler/internal/queue:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
         "//pkg/scheduler/profile:go_default_library",
         "//pkg/scheduler/util:go_default_library",

View File

@@ -40,7 +40,6 @@ import (
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
-	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
 	"k8s.io/kubernetes/pkg/scheduler/profile"
 	"k8s.io/kubernetes/pkg/scheduler/util"
@@ -124,7 +123,7 @@ type ScheduleResult struct {
 type genericScheduler struct {
 	cache                    internalcache.Cache
-	schedulingQueue          internalqueue.SchedulingQueue
+	podNominator             framework.PodNominator
 	extenders                []framework.Extender
 	nodeInfoSnapshot         *internalcache.Snapshot
 	pvcLister                corelisters.PersistentVolumeClaimLister
@@ -353,7 +352,7 @@ func (g *genericScheduler) processPreemptionWithExtenders(
 // worth the complexity, especially because we generally expect to have a very
 // small number of nominated pods per node.
 func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
-	pods := g.schedulingQueue.NominatedPodsForNode(nodeName)
+	pods := g.podNominator.NominatedPodsForNode(nodeName)
 	if len(pods) == 0 {
 		return nil
@@ -522,11 +521,11 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
 // to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
 // 3) augmented nodeInfo.
 func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.Profile, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
-	if g.schedulingQueue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
+	if g.podNominator == nil || nodeInfo == nil || nodeInfo.Node() == nil {
 		// This may happen only in tests.
 		return false, state, nodeInfo, nil
 	}
-	nominatedPods := g.schedulingQueue.NominatedPodsForNode(nodeInfo.Node().Name)
+	nominatedPods := g.podNominator.NominatedPodsForNode(nodeInfo.Node().Name)
 	if len(nominatedPods) == 0 {
 		return false, state, nodeInfo, nil
 	}
@@ -1101,7 +1100,7 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
 // NewGenericScheduler creates a genericScheduler object.
 func NewGenericScheduler(
 	cache internalcache.Cache,
-	podQueue internalqueue.SchedulingQueue,
+	podNominator framework.PodNominator,
 	nodeInfoSnapshot *internalcache.Snapshot,
 	extenders []framework.Extender,
 	pvcLister corelisters.PersistentVolumeClaimLister,
@@ -1110,7 +1109,7 @@ func NewGenericScheduler(
 	percentageOfNodesToScore int32) ScheduleAlgorithm {
 	return &genericScheduler{
 		cache:                    cache,
-		schedulingQueue:          podQueue,
+		podNominator:             podNominator,
 		extenders:                extenders,
 		nodeInfoSnapshot:         nodeInfoSnapshot,
 		pvcLister:                pvcLister,

View File

@@ -979,7 +979,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 	if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
 		t.Fatal(err)
 	}
-	scheduler.schedulingQueue.AddNominatedPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
+	scheduler.podNominator.AddNominatedPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
 	_, _, err = scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), test.pod)

View File

@@ -192,7 +192,7 @@ func (c *Configurator) create() (*Scheduler, error) {
 	algo := core.NewGenericScheduler(
 		c.schedulerCache,
-		podQueue,
+		nominator,
 		c.nodeInfoSnapshot,
 		extenders,
 		c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),

View File

@@ -510,4 +510,6 @@ type PodNominator interface {
 	DeleteNominatedPodIfExists(pod *v1.Pod)
 	// UpdateNominatedPod updates the <oldPod> with <newPod>.
 	UpdateNominatedPod(oldPod, newPod *v1.Pod)
+	// NominatedPodsForNode returns nominatedPods on the given node.
+	NominatedPodsForNode(nodeName string) []*v1.Pod
 }

View File

@@ -83,7 +83,6 @@ type SchedulingQueue interface {
 	MoveAllToActiveOrBackoffQueue(event string)
 	AssignedPodAdded(pod *v1.Pod)
 	AssignedPodUpdated(pod *v1.Pod)
-	NominatedPodsForNode(nodeName string) []*v1.Pod
 	PendingPods() []*v1.Pod
 	// Close closes the SchedulingQueue so that the goroutine which is
 	// waiting to pop items can exit gracefully.
@@ -556,14 +555,6 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
 	return podsToMove
 }
 
-// NominatedPodsForNode returns pods that are nominated to run on the given node,
-// but they are waiting for other pods to be removed from the node before they
-// can be actually scheduled.
-func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
-	// TODO: make podsForNode() public?
-	return p.PodNominator.(*nominatedPodMap).podsForNode(nodeName)
-}
-
 // PendingPods returns all the pending pods in the queue. This function is
 // used for debugging purposes in the scheduler cache dumper and comparer.
 func (p *PriorityQueue) PendingPods() []*v1.Pod {
@@ -608,6 +599,16 @@ func (npm *nominatedPodMap) AddNominatedPod(pod *v1.Pod, nodeName string) {
 	npm.Unlock()
 }
 
+// NominatedPodsForNode returns pods that are nominated to run on the given node,
+// but they are waiting for other pods to be removed from the node.
+func (npm *nominatedPodMap) NominatedPodsForNode(nodeName string) []*v1.Pod {
+	npm.RLock()
+	defer npm.RUnlock()
+	// TODO: we may need to return a copy of []*Pods to avoid modification
+	// on the caller side.
+	return npm.nominatedPods[nodeName]
+}
+
 func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
 	pInfo1 := podInfo1.(*framework.QueuedPodInfo)
 	pInfo2 := podInfo2.(*framework.QueuedPodInfo)
@@ -794,15 +795,6 @@ func (npm *nominatedPodMap) UpdateNominatedPod(oldPod, newPod *v1.Pod) {
 	npm.add(newPod, nodeName)
 }
 
-func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
-	npm.RLock()
-	defer npm.RUnlock()
-	if list, ok := npm.nominatedPods[nodeName]; ok {
-		return list
-	}
-	return nil
-}
-
 // NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator.
 func NewPodNominator() framework.PodNominator {
 	return &nominatedPodMap{