diff --git a/pkg/scheduler/core/scheduling_queue.go b/pkg/scheduler/core/scheduling_queue.go index 9145416658b..0710a27f856 100644 --- a/pkg/scheduler/core/scheduling_queue.go +++ b/pkg/scheduler/core/scheduling_queue.go @@ -39,8 +39,9 @@ import ( priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" "k8s.io/kubernetes/pkg/scheduler/util" - "github.com/golang/glog" "reflect" + + "github.com/golang/glog" ) // SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. @@ -57,6 +58,7 @@ type SchedulingQueue interface { AssignedPodAdded(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod) WaitingPodsForNode(nodeName string) []*v1.Pod + WaitingPods() []*v1.Pod } // NewSchedulingQueue initializes a new scheduling queue. If pod priority is @@ -116,6 +118,15 @@ func (f *FIFO) Pop() (*v1.Pod, error) { return result.(*v1.Pod), nil } +// WaitingPods returns all the waiting pods in the queue. +func (f *FIFO) WaitingPods() []*v1.Pod { + result := []*v1.Pod{} + for _, pod := range f.FIFO.List() { + result = append(result, pod.(*v1.Pod)) + } + return result +} + // FIFO does not need to react to events, as all pods are always in the active // scheduling queue anyway. @@ -460,6 +471,21 @@ func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod { return nil } +// WaitingPods returns all the waiting pods in the queue. +func (p *PriorityQueue) WaitingPods() []*v1.Pod { + p.lock.Lock() + defer p.lock.Unlock() + + result := []*v1.Pod{} + for _, pod := range p.activeQ.List() { + result = append(result, pod.(*v1.Pod)) + } + for _, pod := range p.unschedulableQ.pods { + result = append(result, pod) + } + return result +} + // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure // is used to implement unschedulableQ. type UnschedulablePodsMap struct { diff --git a/pkg/scheduler/factory/cache_comparer.go b/pkg/scheduler/factory/cache_comparer.go index a8821dbf6a8..b93cff7f2b6 100644 --- a/pkg/scheduler/factory/cache_comparer.go +++ b/pkg/scheduler/factory/cache_comparer.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/labels" corelisters "k8s.io/client-go/listers/core/v1" v1beta1 "k8s.io/client-go/listers/policy/v1beta1" + "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/schedulercache" ) @@ -34,6 +35,7 @@ type cacheComparer struct { podLister corelisters.PodLister pdbLister v1beta1.PodDisruptionBudgetLister cache schedulercache.Cache + podQueue core.SchedulingQueue compareStrategy } @@ -59,11 +61,13 @@ func (c *cacheComparer) Compare() error { snapshot := c.cache.Snapshot() + waitingPods := c.podQueue.WaitingPods() + if missed, redundant := c.CompareNodes(nodes, snapshot.Nodes); len(missed)+len(redundant) != 0 { glog.Warningf("cache mismatch: missed nodes: %s; redundant nodes: %s", missed, redundant) } - if missed, redundant := c.ComparePods(pods, snapshot.Nodes); len(missed)+len(redundant) != 0 { + if missed, redundant := c.ComparePods(pods, waitingPods, snapshot.Nodes); len(missed)+len(redundant) != 0 { glog.Warningf("cache mismatch: missed pods: %s; redundant pods: %s", missed, redundant) } @@ -91,7 +95,7 @@ func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*sc return compareStrings(actual, cached) } -func (c compareStrategy) ComparePods(pods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) { +func (c compareStrategy) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) { actual := []string{} for _, pod := range pods { actual = append(actual, string(pod.UID)) @@ -103,6 +107,9 @@ func (c compareStrategy) ComparePods(pods []*v1.Pod, nodeinfos map[string]*sched cached = append(cached, string(pod.UID)) } } + for _, pod := range waitingPods { + cached = append(cached, string(pod.UID)) + } return compareStrings(actual, cached) } diff --git a/pkg/scheduler/factory/cache_comparer_test.go b/pkg/scheduler/factory/cache_comparer_test.go index ff87e400e21..03f44595c62 100644 --- a/pkg/scheduler/factory/cache_comparer_test.go +++ b/pkg/scheduler/factory/cache_comparer_test.go @@ -86,24 +86,49 @@ func TestComparePods(t *testing.T) { tests := []struct { actual []string cached []string + queued []string missing []string redundant []string }{ { actual: []string{"foo", "bar"}, cached: []string{"bar", "foo", "foobar"}, + queued: []string{}, + missing: []string{}, + redundant: []string{"foobar"}, + }, + { + actual: []string{"foo", "bar"}, + cached: []string{"foo", "foobar"}, + queued: []string{"bar"}, missing: []string{}, redundant: []string{"foobar"}, }, { actual: []string{"foo", "bar", "foobar"}, cached: []string{"bar", "foo"}, + queued: []string{}, + missing: []string{"foobar"}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"foo"}, + queued: []string{"bar"}, missing: []string{"foobar"}, redundant: []string{}, }, { actual: []string{"foo", "bar", "foobar"}, cached: []string{"bar", "foobar", "foo"}, + queued: []string{}, + missing: []string{}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"foobar", "foo"}, + queued: []string{"bar"}, missing: []string{}, redundant: []string{}, }, @@ -117,6 +142,13 @@ func TestComparePods(t *testing.T) { pods = append(pods, pod) } + queuedPods := []*v1.Pod{} + for _, uid := range test.queued { + pod := &v1.Pod{} + pod.UID = types.UID(uid) + queuedPods = append(queuedPods, pod) + } + nodeInfo := make(map[string]*schedulercache.NodeInfo) for _, uid := range test.cached { pod := &v1.Pod{} @@ -127,7 +159,7 @@ func TestComparePods(t *testing.T) { nodeInfo[uid] = schedulercache.NewNodeInfo(pod) } - m, r := compare.ComparePods(pods, nodeInfo) + m, r := compare.ComparePods(pods, queuedPods, nodeInfo) if !reflect.DeepEqual(m, test.missing) { t.Errorf("missing expected to be %s; got %s", test.missing, m) diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 8c276269216..11e30a71074 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -303,6 +303,7 @@ func NewConfigFactory( nodeLister: nodeInformer.Lister(), pdbLister: pdbInformer.Lister(), cache: c.schedulerCache, + podQueue: c.podQueue, } ch := make(chan os.Signal, 1) diff --git a/pkg/scheduler/schedulercache/cache.go b/pkg/scheduler/schedulercache/cache.go index 18545979209..b84147535de 100644 --- a/pkg/scheduler/schedulercache/cache.go +++ b/pkg/scheduler/schedulercache/cache.go @@ -80,7 +80,8 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul } } -// Snapshot takes a snapshot of the current schedulerCache. +// Snapshot takes a snapshot of the current schedulerCache. The method has performance impact, +// and should be only used in non-critical path. func (cache *schedulerCache) Snapshot() *Snapshot { cache.mu.Lock() defer cache.mu.Unlock()