Pod comparer should count pods in scheduling queue

Pods in scheduler cache contains both the scheduled pods and those not
scheduled yet in scheduling queue. This commit adds the second group of
pods into consideration while comparing the cache.
This commit is contained in:
Yongkun Anfernee Gui 2018-03-12 14:43:01 -07:00
parent eba9528753
commit cda749c237
5 changed files with 72 additions and 5 deletions

View File

@ -39,8 +39,9 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/pkg/scheduler/util"
"github.com/golang/glog"
"reflect"
"github.com/golang/glog"
)
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
@ -57,6 +58,7 @@ type SchedulingQueue interface {
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
WaitingPodsForNode(nodeName string) []*v1.Pod
WaitingPods() []*v1.Pod
}
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
@ -116,6 +118,15 @@ func (f *FIFO) Pop() (*v1.Pod, error) {
return result.(*v1.Pod), nil
}
// WaitingPods returns all the waiting pods in the queue.
func (f *FIFO) WaitingPods() []*v1.Pod {
result := []*v1.Pod{}
for _, pod := range f.FIFO.List() {
result = append(result, pod.(*v1.Pod))
}
return result
}
// FIFO does not need to react to events, as all pods are always in the active
// scheduling queue anyway.
@ -460,6 +471,21 @@ func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
return nil
}
// WaitingPods returns all the waiting pods in the queue.
func (p *PriorityQueue) WaitingPods() []*v1.Pod {
p.lock.Lock()
defer p.lock.Unlock()
result := []*v1.Pod{}
for _, pod := range p.activeQ.List() {
result = append(result, pod.(*v1.Pod))
}
for _, pod := range p.unschedulableQ.pods {
result = append(result, pod)
}
return result
}
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1"
v1beta1 "k8s.io/client-go/listers/policy/v1beta1"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)
@ -34,6 +35,7 @@ type cacheComparer struct {
podLister corelisters.PodLister
pdbLister v1beta1.PodDisruptionBudgetLister
cache schedulercache.Cache
podQueue core.SchedulingQueue
compareStrategy
}
@ -59,11 +61,13 @@ func (c *cacheComparer) Compare() error {
snapshot := c.cache.Snapshot()
waitingPods := c.podQueue.WaitingPods()
if missed, redundant := c.CompareNodes(nodes, snapshot.Nodes); len(missed)+len(redundant) != 0 {
glog.Warningf("cache mismatch: missed nodes: %s; redundant nodes: %s", missed, redundant)
}
if missed, redundant := c.ComparePods(pods, snapshot.Nodes); len(missed)+len(redundant) != 0 {
if missed, redundant := c.ComparePods(pods, waitingPods, snapshot.Nodes); len(missed)+len(redundant) != 0 {
glog.Warningf("cache mismatch: missed pods: %s; redundant pods: %s", missed, redundant)
}
@ -91,7 +95,7 @@ func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*sc
return compareStrings(actual, cached)
}
func (c compareStrategy) ComparePods(pods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) {
func (c compareStrategy) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) {
actual := []string{}
for _, pod := range pods {
actual = append(actual, string(pod.UID))
@ -103,6 +107,9 @@ func (c compareStrategy) ComparePods(pods []*v1.Pod, nodeinfos map[string]*sched
cached = append(cached, string(pod.UID))
}
}
for _, pod := range waitingPods {
cached = append(cached, string(pod.UID))
}
return compareStrings(actual, cached)
}

View File

@ -86,24 +86,49 @@ func TestComparePods(t *testing.T) {
tests := []struct {
actual []string
cached []string
queued []string
missing []string
redundant []string
}{
{
actual: []string{"foo", "bar"},
cached: []string{"bar", "foo", "foobar"},
queued: []string{},
missing: []string{},
redundant: []string{"foobar"},
},
{
actual: []string{"foo", "bar"},
cached: []string{"foo", "foobar"},
queued: []string{"bar"},
missing: []string{},
redundant: []string{"foobar"},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foo"},
queued: []string{},
missing: []string{"foobar"},
redundant: []string{},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"foo"},
queued: []string{"bar"},
missing: []string{"foobar"},
redundant: []string{},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foobar", "foo"},
queued: []string{},
missing: []string{},
redundant: []string{},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"foobar", "foo"},
queued: []string{"bar"},
missing: []string{},
redundant: []string{},
},
@ -117,6 +142,13 @@ func TestComparePods(t *testing.T) {
pods = append(pods, pod)
}
queuedPods := []*v1.Pod{}
for _, uid := range test.queued {
pod := &v1.Pod{}
pod.UID = types.UID(uid)
queuedPods = append(queuedPods, pod)
}
nodeInfo := make(map[string]*schedulercache.NodeInfo)
for _, uid := range test.cached {
pod := &v1.Pod{}
@ -127,7 +159,7 @@ func TestComparePods(t *testing.T) {
nodeInfo[uid] = schedulercache.NewNodeInfo(pod)
}
m, r := compare.ComparePods(pods, nodeInfo)
m, r := compare.ComparePods(pods, queuedPods, nodeInfo)
if !reflect.DeepEqual(m, test.missing) {
t.Errorf("missing expected to be %s; got %s", test.missing, m)

View File

@ -303,6 +303,7 @@ func NewConfigFactory(
nodeLister: nodeInformer.Lister(),
pdbLister: pdbInformer.Lister(),
cache: c.schedulerCache,
podQueue: c.podQueue,
}
ch := make(chan os.Signal, 1)

View File

@ -80,7 +80,8 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
}
}
// Snapshot takes a snapshot of the current schedulerCache.
// Snapshot takes a snapshot of the current schedulerCache. The method has performance impact,
// and should be only used in non-critical path.
func (cache *schedulerCache) Snapshot() *Snapshot {
cache.mu.Lock()
defer cache.mu.Unlock()