From cda749c2376b3a3cfcecf9929728e80f4961ebac Mon Sep 17 00:00:00 2001 From: Yongkun Anfernee Gui Date: Mon, 12 Mar 2018 14:43:01 -0700 Subject: [PATCH] Pod comparer should count pods in scheduling queue Pods in scheduler cache contains both the scheduled pods and those not scheduled yet in scheduling queue. This commit adds the second group of pods into consideration while comparing the cache. --- pkg/scheduler/core/scheduling_queue.go | 28 +++++++++++++++- pkg/scheduler/factory/cache_comparer.go | 11 +++++-- pkg/scheduler/factory/cache_comparer_test.go | 34 +++++++++++++++++++- pkg/scheduler/factory/factory.go | 1 + pkg/scheduler/schedulercache/cache.go | 3 +- 5 files changed, 72 insertions(+), 5 deletions(-) diff --git a/pkg/scheduler/core/scheduling_queue.go b/pkg/scheduler/core/scheduling_queue.go index 9145416658b..0710a27f856 100644 --- a/pkg/scheduler/core/scheduling_queue.go +++ b/pkg/scheduler/core/scheduling_queue.go @@ -39,8 +39,9 @@ import ( priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" "k8s.io/kubernetes/pkg/scheduler/util" - "github.com/golang/glog" "reflect" + + "github.com/golang/glog" ) // SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. @@ -57,6 +58,7 @@ type SchedulingQueue interface { AssignedPodAdded(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod) WaitingPodsForNode(nodeName string) []*v1.Pod + WaitingPods() []*v1.Pod } // NewSchedulingQueue initializes a new scheduling queue. If pod priority is @@ -116,6 +118,15 @@ func (f *FIFO) Pop() (*v1.Pod, error) { return result.(*v1.Pod), nil } +// WaitingPods returns all the waiting pods in the queue. +func (f *FIFO) WaitingPods() []*v1.Pod { + result := []*v1.Pod{} + for _, pod := range f.FIFO.List() { + result = append(result, pod.(*v1.Pod)) + } + return result +} + // FIFO does not need to react to events, as all pods are always in the active // scheduling queue anyway. @@ -460,6 +471,21 @@ func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod { return nil } +// WaitingPods returns all the waiting pods in the queue. +func (p *PriorityQueue) WaitingPods() []*v1.Pod { + p.lock.Lock() + defer p.lock.Unlock() + + result := []*v1.Pod{} + for _, pod := range p.activeQ.List() { + result = append(result, pod.(*v1.Pod)) + } + for _, pod := range p.unschedulableQ.pods { + result = append(result, pod) + } + return result +} + // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure // is used to implement unschedulableQ. type UnschedulablePodsMap struct { diff --git a/pkg/scheduler/factory/cache_comparer.go b/pkg/scheduler/factory/cache_comparer.go index a8821dbf6a8..b93cff7f2b6 100644 --- a/pkg/scheduler/factory/cache_comparer.go +++ b/pkg/scheduler/factory/cache_comparer.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/labels" corelisters "k8s.io/client-go/listers/core/v1" v1beta1 "k8s.io/client-go/listers/policy/v1beta1" + "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/schedulercache" ) @@ -34,6 +35,7 @@ type cacheComparer struct { podLister corelisters.PodLister pdbLister v1beta1.PodDisruptionBudgetLister cache schedulercache.Cache + podQueue core.SchedulingQueue compareStrategy } @@ -59,11 +61,13 @@ func (c *cacheComparer) Compare() error { snapshot := c.cache.Snapshot() + waitingPods := c.podQueue.WaitingPods() + if missed, redundant := c.CompareNodes(nodes, snapshot.Nodes); len(missed)+len(redundant) != 0 { glog.Warningf("cache mismatch: missed nodes: %s; redundant nodes: %s", missed, redundant) } - if missed, redundant := c.ComparePods(pods, snapshot.Nodes); len(missed)+len(redundant) != 0 { + if missed, redundant := c.ComparePods(pods, waitingPods, snapshot.Nodes); len(missed)+len(redundant) != 0 { glog.Warningf("cache mismatch: missed pods: %s; redundant pods: %s", missed, redundant) } @@ -91,7 +95,7 @@ func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*sc return compareStrings(actual, cached) } -func (c compareStrategy) ComparePods(pods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) { +func (c compareStrategy) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) { actual := []string{} for _, pod := range pods { actual = append(actual, string(pod.UID)) @@ -103,6 +107,9 @@ func (c compareStrategy) ComparePods(pods []*v1.Pod, nodeinfos map[string]*sched cached = append(cached, string(pod.UID)) } } + for _, pod := range waitingPods { + cached = append(cached, string(pod.UID)) + } return compareStrings(actual, cached) } diff --git a/pkg/scheduler/factory/cache_comparer_test.go b/pkg/scheduler/factory/cache_comparer_test.go index ff87e400e21..03f44595c62 100644 --- a/pkg/scheduler/factory/cache_comparer_test.go +++ b/pkg/scheduler/factory/cache_comparer_test.go @@ -86,24 +86,49 @@ func TestComparePods(t *testing.T) { tests := []struct { actual []string cached []string + queued []string missing []string redundant []string }{ { actual: []string{"foo", "bar"}, cached: []string{"bar", "foo", "foobar"}, + queued: []string{}, + missing: []string{}, + redundant: []string{"foobar"}, + }, + { + actual: []string{"foo", "bar"}, + cached: []string{"foo", "foobar"}, + queued: []string{"bar"}, missing: []string{}, redundant: []string{"foobar"}, }, { actual: []string{"foo", "bar", "foobar"}, cached: []string{"bar", "foo"}, + queued: []string{}, + missing: []string{"foobar"}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"foo"}, + queued: []string{"bar"}, missing: []string{"foobar"}, redundant: []string{}, }, { actual: []string{"foo", "bar", "foobar"}, cached: []string{"bar", "foobar", "foo"}, + queued: []string{}, + missing: []string{}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"foobar", "foo"}, + queued: []string{"bar"}, missing: []string{}, redundant: []string{}, }, @@ -117,6 +142,13 @@ func TestComparePods(t *testing.T) { pods = append(pods, pod) } + queuedPods := []*v1.Pod{} + for _, uid := range test.queued { + pod := &v1.Pod{} + pod.UID = types.UID(uid) + queuedPods = append(queuedPods, pod) + } + nodeInfo := make(map[string]*schedulercache.NodeInfo) for _, uid := range test.cached { pod := &v1.Pod{} @@ -127,7 +159,7 @@ func TestComparePods(t *testing.T) { nodeInfo[uid] = schedulercache.NewNodeInfo(pod) } - m, r := compare.ComparePods(pods, nodeInfo) + m, r := compare.ComparePods(pods, queuedPods, nodeInfo) if !reflect.DeepEqual(m, test.missing) { t.Errorf("missing expected to be %s; got %s", test.missing, m) diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 8c276269216..11e30a71074 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -303,6 +303,7 @@ func NewConfigFactory( nodeLister: nodeInformer.Lister(), pdbLister: pdbInformer.Lister(), cache: c.schedulerCache, + podQueue: c.podQueue, } ch := make(chan os.Signal, 1) diff --git a/pkg/scheduler/schedulercache/cache.go b/pkg/scheduler/schedulercache/cache.go index 18545979209..b84147535de 100644 --- a/pkg/scheduler/schedulercache/cache.go +++ b/pkg/scheduler/schedulercache/cache.go @@ -80,7 +80,8 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul } } -// Snapshot takes a snapshot of the current schedulerCache. +// Snapshot takes a snapshot of the current schedulerCache. The method has performance impact, +// and should be only used in non-critical path. func (cache *schedulerCache) Snapshot() *Snapshot { cache.mu.Lock() defer cache.mu.Unlock()