From 73a9836d69d4e6c0dbf844b27c710d39aec50e65 Mon Sep 17 00:00:00 2001 From: Yongkun Anfernee Gui Date: Wed, 7 Mar 2018 15:12:53 -0800 Subject: [PATCH 1/4] Support snapshotting a scheduler cache Towards #60860 --- pkg/scheduler/schedulercache/cache.go | 27 +++++++++++++++ pkg/scheduler/schedulercache/cache_test.go | 40 ++++++++++++++++++++++ pkg/scheduler/schedulercache/interface.go | 10 ++++++ pkg/scheduler/testing/fake_cache.go | 4 +++ 4 files changed, 81 insertions(+) diff --git a/pkg/scheduler/schedulercache/cache.go b/pkg/scheduler/schedulercache/cache.go index 78cdca25b3e..18545979209 100644 --- a/pkg/scheduler/schedulercache/cache.go +++ b/pkg/scheduler/schedulercache/cache.go @@ -80,6 +80,33 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul } } +// Snapshot takes a snapshot of the current schedulerCache. +func (cache *schedulerCache) Snapshot() *Snapshot { + cache.mu.Lock() + defer cache.mu.Unlock() + + nodes := make(map[string]*NodeInfo) + for k, v := range cache.nodes { + nodes[k] = v.Clone() + } + + assumedPods := make(map[string]bool) + for k, v := range cache.assumedPods { + assumedPods[k] = v + } + + pdbs := make(map[string]*policy.PodDisruptionBudget) + for k, v := range cache.pdbs { + pdbs[k] = v.DeepCopy() + } + + return &Snapshot{ + Nodes: nodes, + AssumedPods: assumedPods, + Pdbs: pdbs, + } +} + func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error { cache.mu.Lock() defer cache.mu.Unlock() diff --git a/pkg/scheduler/schedulercache/cache_test.go b/pkg/scheduler/schedulercache/cache_test.go index 0eb184bdf3b..04677e7b340 100644 --- a/pkg/scheduler/schedulercache/cache_test.go +++ b/pkg/scheduler/schedulercache/cache_test.go @@ -325,6 +325,46 @@ func TestAddPodWillConfirm(t *testing.T) { } } +func TestSnapshot(t *testing.T) { + nodeName := "node" + now := time.Now() + ttl := 10 * time.Second + + testPods := []*v1.Pod{ + makeBasePod(t, nodeName, "test-1", "100m", "500", "", 
[]v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}), + makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}), + } + tests := []struct { + podsToAssume []*v1.Pod + podsToAdd []*v1.Pod + }{{ // two pod were assumed at same time. But first one is called Add() and gets confirmed. + podsToAssume: []*v1.Pod{testPods[0], testPods[1]}, + podsToAdd: []*v1.Pod{testPods[0]}, + }} + + for _, tt := range tests { + cache := newSchedulerCache(ttl, time.Second, nil) + for _, podToAssume := range tt.podsToAssume { + if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { + t.Fatalf("assumePod failed: %v", err) + } + } + for _, podToAdd := range tt.podsToAdd { + if err := cache.AddPod(podToAdd); err != nil { + t.Fatalf("AddPod failed: %v", err) + } + } + + snapshot := cache.Snapshot() + if !reflect.DeepEqual(snapshot.Nodes, cache.nodes) { + t.Fatalf("expect \n%+v; got \n%+v", cache.nodes, snapshot.Nodes) + } + if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) { + t.Fatalf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods) + } + } +} + // TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod. func TestAddPodWillReplaceAssumed(t *testing.T) { now := time.Now() diff --git a/pkg/scheduler/schedulercache/interface.go b/pkg/scheduler/schedulercache/interface.go index d3c65313b83..8e4f4909c75 100644 --- a/pkg/scheduler/schedulercache/interface.go +++ b/pkg/scheduler/schedulercache/interface.go @@ -119,4 +119,14 @@ type Cache interface { // FilteredList returns all cached pods that pass the filter. 
FilteredList(filter PodFilter, selector labels.Selector) ([]*v1.Pod, error) + + // Snapshot takes a snapshot on current cache + Snapshot() *Snapshot +} + +// Snapshot is a snapshot of cache state +type Snapshot struct { + AssumedPods map[string]bool + Nodes map[string]*NodeInfo + Pdbs map[string]*policy.PodDisruptionBudget } diff --git a/pkg/scheduler/testing/fake_cache.go b/pkg/scheduler/testing/fake_cache.go index 21c4528ae21..6e1143d2e70 100644 --- a/pkg/scheduler/testing/fake_cache.go +++ b/pkg/scheduler/testing/fake_cache.go @@ -100,3 +100,7 @@ func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { return nil, nil } + +func (f *FakeCache) Snapshot() *schedulercache.Snapshot { + return &schedulercache.Snapshot{} +} From fda0d07eb66951ae76aafd8c415fd29b612e67d8 Mon Sep 17 00:00:00 2001 From: Yongkun Anfernee Gui Date: Thu, 8 Mar 2018 16:32:53 -0800 Subject: [PATCH 2/4] Scheduler cache comparer A debug tool that collects resources from the API server and compares them with the scheduler cache. It currently only compares the node list, but it should be easy to extend. The compare is triggered by signal SIGUSR2, e.g. by running kill -USR2 ${SCHED_PID} (kill -12 on most Linux architectures). The compare result goes to the scheduler log. 
Towards #60860 --- pkg/scheduler/factory/BUILD | 39 +++++++- pkg/scheduler/factory/cache_comparer.go | 99 ++++++++++++++++++++ pkg/scheduler/factory/cache_comparer_test.go | 79 ++++++++++++++++ pkg/scheduler/factory/factory.go | 23 +++++ pkg/scheduler/factory/signal.go | 25 +++++ pkg/scheduler/factory/signal_windows.go | 23 +++++ pkg/scheduler/testing/fake_cache.go | 1 + 7 files changed, 288 insertions(+), 1 deletion(-) create mode 100644 pkg/scheduler/factory/cache_comparer.go create mode 100644 pkg/scheduler/factory/cache_comparer_test.go create mode 100644 pkg/scheduler/factory/signal.go create mode 100644 pkg/scheduler/factory/signal_windows.go diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index a4c87066f6a..05b52544892 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -9,9 +9,45 @@ load( go_library( name = "go_default_library", srcs = [ + "cache_comparer.go", "factory.go", "plugins.go", - ], + ] + select({ + "@io_bazel_rules_go//go/platform:android": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:darwin": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:dragonfly": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:freebsd": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:linux": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:nacl": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:netbsd": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:openbsd": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:plan9": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:solaris": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:windows": [ + "signal_windows.go", + ], + "//conditions:default": [], + }), importpath = "k8s.io/kubernetes/pkg/scheduler/factory", deps = [ "//pkg/api/v1/pod:go_default_library", @@ -59,6 +95,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "cache_comparer_test.go", "factory_test.go", "plugins_test.go", ], 
diff --git a/pkg/scheduler/factory/cache_comparer.go b/pkg/scheduler/factory/cache_comparer.go new file mode 100644 index 00000000000..3a6227af3cd --- /dev/null +++ b/pkg/scheduler/factory/cache_comparer.go @@ -0,0 +1,99 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import ( + "sort" + "strings" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" +) + +type cacheComparer struct { + nodeLister corelisters.NodeLister + podLister corelisters.PodLister + cache schedulercache.Cache + + compareStrategy +} + +func (c *cacheComparer) Compare() error { + glog.V(3).Info("cache comparer started") + defer glog.V(3).Info("cache comparer finished") + + nodes, err := c.nodeLister.List(labels.Everything()) + if err != nil { + return err + } + + snapshot := c.cache.Snapshot() + + if missed, redundant := c.CompareNodes(nodes, snapshot.Nodes); len(missed)+len(redundant) != 0 { + glog.Warningf("cache mismatch: missed nodes: %s; redundant nodes: %s", missed, redundant) + } + + return nil +} + +type compareStrategy struct { +} + +func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) { + missed, redundant = []string{}, []string{} + + actual := []string{} + for _, node := range nodes { + actual = append(actual, node.Name) + } + + 
cached := []string{} + for nodeName := range nodeinfos { + cached = append(cached, nodeName) + } + + sort.Strings(actual) + sort.Strings(cached) + + compare := func(i, j int) int { + if i == len(actual) { + return 1 + } else if j == len(cached) { + return -1 + } + return strings.Compare(actual[i], cached[j]) + } + + for i, j := 0, 0; i < len(actual) || j < len(cached); { + switch compare(i, j) { + case 0: + i++ + j++ + case -1: + missed = append(missed, actual[i]) + i++ + case 1: + redundant = append(redundant, cached[j]) + j++ + } + } + + return +} diff --git a/pkg/scheduler/factory/cache_comparer_test.go b/pkg/scheduler/factory/cache_comparer_test.go new file mode 100644 index 00000000000..ae6622f4b35 --- /dev/null +++ b/pkg/scheduler/factory/cache_comparer_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package factory + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" +) + +func TestCompareNodes(t *testing.T) { + compare := compareStrategy{} + + tests := []struct { + actual []string + cached []string + missing []string + redundant []string + }{ + { + actual: []string{"foo", "bar"}, + cached: []string{"bar", "foo", "foobar"}, + missing: []string{}, + redundant: []string{"foobar"}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foo"}, + missing: []string{"foobar"}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foobar", "foo"}, + missing: []string{}, + redundant: []string{}, + }, + } + + for _, test := range tests { + nodes := []*v1.Node{} + for _, nodeName := range test.actual { + node := &v1.Node{} + node.Name = nodeName + nodes = append(nodes, node) + } + + nodeInfo := make(map[string]*schedulercache.NodeInfo) + for _, nodeName := range test.cached { + nodeInfo[nodeName] = &schedulercache.NodeInfo{} + } + + m, r := compare.CompareNodes(nodes, nodeInfo) + + if !reflect.DeepEqual(m, test.missing) { + t.Errorf("missing expected to be %s; got %s", test.missing, m) + } + + if !reflect.DeepEqual(r, test.redundant) { + t.Errorf("redundant expected to be %s; got %s", test.redundant, r) + } + } +} diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 407d5b85c12..9f089004663 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -20,6 +20,8 @@ package factory import ( "fmt" + "os" + "os/signal" "reflect" "time" @@ -295,6 +297,27 @@ func NewConfigFactory( c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, nodeInformer, storageClassInformer) } + // Setup cache comparer + comparer := &cacheComparer{ + podLister: podInformer.Lister(), + nodeLister: nodeInformer.Lister(), + cache: c.schedulerCache, + } + + ch := make(chan 
os.Signal, 1) + signal.Notify(ch, compareSignal) + + go func() { + for { + select { + case <-c.StopEverything: + return + case <-ch: + comparer.Compare() + } + } + }() + return c } diff --git a/pkg/scheduler/factory/signal.go b/pkg/scheduler/factory/signal.go new file mode 100644 index 00000000000..8ec17048ebb --- /dev/null +++ b/pkg/scheduler/factory/signal.go @@ -0,0 +1,25 @@ +// +build !windows + +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import "syscall" + +// compareSignal is the signal to trigger cache compare. For non-windows +// environment it's SIGUSR2. +var compareSignal = syscall.SIGUSR2 diff --git a/pkg/scheduler/factory/signal_windows.go b/pkg/scheduler/factory/signal_windows.go new file mode 100644 index 00000000000..9df239874c3 --- /dev/null +++ b/pkg/scheduler/factory/signal_windows.go @@ -0,0 +1,23 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package factory + +import "os" + +// compareSignal is the signal to trigger cache compare. For windows, +// it's SIGINT. +var compareSignal = os.Interrupt diff --git a/pkg/scheduler/testing/fake_cache.go b/pkg/scheduler/testing/fake_cache.go index 6e1143d2e70..e958a14b2f7 100644 --- a/pkg/scheduler/testing/fake_cache.go +++ b/pkg/scheduler/testing/fake_cache.go @@ -101,6 +101,7 @@ func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector label return nil, nil } +// Snapshot is a fake method for testing func (f *FakeCache) Snapshot() *schedulercache.Snapshot { return &schedulercache.Snapshot{} } From eba9528753a9263fdafd2548d13089b03601607f Mon Sep 17 00:00:00 2001 From: Yongkun Anfernee Gui Date: Fri, 9 Mar 2018 11:32:45 -0800 Subject: [PATCH 3/4] Add cache comparison for pods and pdbs --- pkg/scheduler/factory/BUILD | 2 + pkg/scheduler/factory/cache_comparer.go | 59 +++++++++- pkg/scheduler/factory/cache_comparer_test.go | 117 +++++++++++++++++++ pkg/scheduler/factory/factory.go | 1 + 4 files changed, 177 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 05b52544892..57cf5e0e9c7 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -112,8 +112,10 @@ go_test( "//pkg/scheduler/testing:go_default_library", "//pkg/scheduler/util:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", diff --git a/pkg/scheduler/factory/cache_comparer.go b/pkg/scheduler/factory/cache_comparer.go index 3a6227af3cd..a8821dbf6a8 100644 --- 
a/pkg/scheduler/factory/cache_comparer.go +++ b/pkg/scheduler/factory/cache_comparer.go @@ -22,14 +22,17 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/labels" corelisters "k8s.io/client-go/listers/core/v1" + v1beta1 "k8s.io/client-go/listers/policy/v1beta1" "k8s.io/kubernetes/pkg/scheduler/schedulercache" ) type cacheComparer struct { nodeLister corelisters.NodeLister podLister corelisters.PodLister + pdbLister v1beta1.PodDisruptionBudgetLister cache schedulercache.Cache compareStrategy @@ -44,12 +47,30 @@ func (c *cacheComparer) Compare() error { return err } + pods, err := c.podLister.List(labels.Everything()) + if err != nil { + return err + } + + pdbs, err := c.pdbLister.List(labels.Everything()) + if err != nil { + return err + } + snapshot := c.cache.Snapshot() if missed, redundant := c.CompareNodes(nodes, snapshot.Nodes); len(missed)+len(redundant) != 0 { glog.Warningf("cache mismatch: missed nodes: %s; redundant nodes: %s", missed, redundant) } + if missed, redundant := c.ComparePods(pods, snapshot.Nodes); len(missed)+len(redundant) != 0 { + glog.Warningf("cache mismatch: missed pods: %s; redundant pods: %s", missed, redundant) + } + + if missed, redundant := c.ComparePdbs(pdbs, snapshot.Pdbs); len(missed)+len(redundant) != 0 { + glog.Warningf("cache mismatch: missed pdbs: %s; redundant pdbs: %s", missed, redundant) + } + return nil } @@ -57,8 +78,6 @@ type compareStrategy struct { } func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) { - missed, redundant = []string{}, []string{} - actual := []string{} for _, node := range nodes { actual = append(actual, node.Name) @@ -69,6 +88,42 @@ func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*sc cached = append(cached, nodeName) } + return compareStrings(actual, cached) +} + +func (c compareStrategy) ComparePods(pods []*v1.Pod, nodeinfos 
map[string]*schedulercache.NodeInfo) (missed, redundant []string) { + actual := []string{} + for _, pod := range pods { + actual = append(actual, string(pod.UID)) + } + + cached := []string{} + for _, nodeinfo := range nodeinfos { + for _, pod := range nodeinfo.Pods() { + cached = append(cached, string(pod.UID)) + } + } + + return compareStrings(actual, cached) +} + +func (c compareStrategy) ComparePdbs(pdbs []*policy.PodDisruptionBudget, pdbCache map[string]*policy.PodDisruptionBudget) (missed, redundant []string) { + actual := []string{} + for _, pdb := range pdbs { + actual = append(actual, string(pdb.Name)) + } + + cached := []string{} + for pdbName := range pdbCache { + cached = append(cached, pdbName) + } + + return compareStrings(actual, cached) +} + +func compareStrings(actual, cached []string) (missed, redundant []string) { + missed, redundant = []string{}, []string{} + sort.Strings(actual) sort.Strings(cached) diff --git a/pkg/scheduler/factory/cache_comparer_test.go b/pkg/scheduler/factory/cache_comparer_test.go index ae6622f4b35..ff87e400e21 100644 --- a/pkg/scheduler/factory/cache_comparer_test.go +++ b/pkg/scheduler/factory/cache_comparer_test.go @@ -21,6 +21,8 @@ import ( "testing" "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" + "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/scheduler/schedulercache" ) @@ -77,3 +79,118 @@ func TestCompareNodes(t *testing.T) { } } } + +func TestComparePods(t *testing.T) { + compare := compareStrategy{} + + tests := []struct { + actual []string + cached []string + missing []string + redundant []string + }{ + { + actual: []string{"foo", "bar"}, + cached: []string{"bar", "foo", "foobar"}, + missing: []string{}, + redundant: []string{"foobar"}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foo"}, + missing: []string{"foobar"}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foobar", "foo"}, + missing: []string{}, 
+ redundant: []string{}, + }, + } + + for _, test := range tests { + pods := []*v1.Pod{} + for _, uid := range test.actual { + pod := &v1.Pod{} + pod.UID = types.UID(uid) + pods = append(pods, pod) + } + + nodeInfo := make(map[string]*schedulercache.NodeInfo) + for _, uid := range test.cached { + pod := &v1.Pod{} + pod.UID = types.UID(uid) + pod.Namespace = "ns" + pod.Name = uid + + nodeInfo[uid] = schedulercache.NewNodeInfo(pod) + } + + m, r := compare.ComparePods(pods, nodeInfo) + + if !reflect.DeepEqual(m, test.missing) { + t.Errorf("missing expected to be %s; got %s", test.missing, m) + } + + if !reflect.DeepEqual(r, test.redundant) { + t.Errorf("redundant expected to be %s; got %s", test.redundant, r) + } + } +} + +func TestComparePdbs(t *testing.T) { + compare := compareStrategy{} + + tests := []struct { + actual []string + cached []string + missing []string + redundant []string + }{ + { + actual: []string{"foo", "bar"}, + cached: []string{"bar", "foo", "foobar"}, + missing: []string{}, + redundant: []string{"foobar"}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foo"}, + missing: []string{"foobar"}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foobar", "foo"}, + missing: []string{}, + redundant: []string{}, + }, + } + + for _, test := range tests { + pdbs := []*policy.PodDisruptionBudget{} + for _, name := range test.actual { + pdb := &policy.PodDisruptionBudget{} + pdb.Name = name + pdbs = append(pdbs, pdb) + } + + cache := make(map[string]*policy.PodDisruptionBudget) + for _, name := range test.cached { + pdb := &policy.PodDisruptionBudget{} + pdb.Name = name + cache[name] = pdb + } + + m, r := compare.ComparePdbs(pdbs, cache) + + if !reflect.DeepEqual(m, test.missing) { + t.Errorf("missing expected to be %s; got %s", test.missing, m) + } + + if !reflect.DeepEqual(r, test.redundant) { + t.Errorf("redundant expected to be %s; got %s", test.redundant, r) + } + } +} 
diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 9f089004663..8c276269216 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -301,6 +301,7 @@ func NewConfigFactory( comparer := &cacheComparer{ podLister: podInformer.Lister(), nodeLister: nodeInformer.Lister(), + pdbLister: pdbInformer.Lister(), cache: c.schedulerCache, } From cda749c2376b3a3cfcecf9929728e80f4961ebac Mon Sep 17 00:00:00 2001 From: Yongkun Anfernee Gui Date: Mon, 12 Mar 2018 14:43:01 -0700 Subject: [PATCH 4/4] Pod comparer should count pods in scheduling queue Pods in the scheduler cache contain both the scheduled pods and those not yet scheduled that are still in the scheduling queue. This commit takes the second group of pods into consideration when comparing the cache. --- pkg/scheduler/core/scheduling_queue.go | 28 +++++++++++++++- pkg/scheduler/factory/cache_comparer.go | 11 +++++-- pkg/scheduler/factory/cache_comparer_test.go | 34 +++++++++++++++++++- pkg/scheduler/factory/factory.go | 1 + pkg/scheduler/schedulercache/cache.go | 3 +- 5 files changed, 72 insertions(+), 5 deletions(-) diff --git a/pkg/scheduler/core/scheduling_queue.go b/pkg/scheduler/core/scheduling_queue.go index 9145416658b..0710a27f856 100644 --- a/pkg/scheduler/core/scheduling_queue.go +++ b/pkg/scheduler/core/scheduling_queue.go @@ -39,8 +39,9 @@ import ( priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" "k8s.io/kubernetes/pkg/scheduler/util" - "github.com/golang/glog" "reflect" + + "github.com/golang/glog" ) // SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. @@ -57,6 +58,7 @@ type SchedulingQueue interface { AssignedPodAdded(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod) WaitingPodsForNode(nodeName string) []*v1.Pod + WaitingPods() []*v1.Pod } // NewSchedulingQueue initializes a new scheduling queue. 
If pod priority is @@ -116,6 +118,15 @@ func (f *FIFO) Pop() (*v1.Pod, error) { return result.(*v1.Pod), nil } +// WaitingPods returns all the waiting pods in the queue. +func (f *FIFO) WaitingPods() []*v1.Pod { + result := []*v1.Pod{} + for _, pod := range f.FIFO.List() { + result = append(result, pod.(*v1.Pod)) + } + return result +} + // FIFO does not need to react to events, as all pods are always in the active // scheduling queue anyway. @@ -460,6 +471,21 @@ func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod { return nil } +// WaitingPods returns all the waiting pods in the queue. +func (p *PriorityQueue) WaitingPods() []*v1.Pod { + p.lock.Lock() + defer p.lock.Unlock() + + result := []*v1.Pod{} + for _, pod := range p.activeQ.List() { + result = append(result, pod.(*v1.Pod)) + } + for _, pod := range p.unschedulableQ.pods { + result = append(result, pod) + } + return result +} + // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure // is used to implement unschedulableQ. 
type UnschedulablePodsMap struct { diff --git a/pkg/scheduler/factory/cache_comparer.go b/pkg/scheduler/factory/cache_comparer.go index a8821dbf6a8..b93cff7f2b6 100644 --- a/pkg/scheduler/factory/cache_comparer.go +++ b/pkg/scheduler/factory/cache_comparer.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/labels" corelisters "k8s.io/client-go/listers/core/v1" v1beta1 "k8s.io/client-go/listers/policy/v1beta1" + "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/schedulercache" ) @@ -34,6 +35,7 @@ type cacheComparer struct { podLister corelisters.PodLister pdbLister v1beta1.PodDisruptionBudgetLister cache schedulercache.Cache + podQueue core.SchedulingQueue compareStrategy } @@ -59,11 +61,13 @@ func (c *cacheComparer) Compare() error { snapshot := c.cache.Snapshot() + waitingPods := c.podQueue.WaitingPods() + if missed, redundant := c.CompareNodes(nodes, snapshot.Nodes); len(missed)+len(redundant) != 0 { glog.Warningf("cache mismatch: missed nodes: %s; redundant nodes: %s", missed, redundant) } - if missed, redundant := c.ComparePods(pods, snapshot.Nodes); len(missed)+len(redundant) != 0 { + if missed, redundant := c.ComparePods(pods, waitingPods, snapshot.Nodes); len(missed)+len(redundant) != 0 { glog.Warningf("cache mismatch: missed pods: %s; redundant pods: %s", missed, redundant) } @@ -91,7 +95,7 @@ func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*sc return compareStrings(actual, cached) } -func (c compareStrategy) ComparePods(pods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) { +func (c compareStrategy) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) { actual := []string{} for _, pod := range pods { actual = append(actual, string(pod.UID)) @@ -103,6 +107,9 @@ func (c compareStrategy) ComparePods(pods []*v1.Pod, nodeinfos map[string]*sched cached = append(cached, string(pod.UID)) } } + for _, pod := 
range waitingPods { + cached = append(cached, string(pod.UID)) + } return compareStrings(actual, cached) } diff --git a/pkg/scheduler/factory/cache_comparer_test.go b/pkg/scheduler/factory/cache_comparer_test.go index ff87e400e21..03f44595c62 100644 --- a/pkg/scheduler/factory/cache_comparer_test.go +++ b/pkg/scheduler/factory/cache_comparer_test.go @@ -86,24 +86,49 @@ func TestComparePods(t *testing.T) { tests := []struct { actual []string cached []string + queued []string missing []string redundant []string }{ { actual: []string{"foo", "bar"}, cached: []string{"bar", "foo", "foobar"}, + queued: []string{}, + missing: []string{}, + redundant: []string{"foobar"}, + }, + { + actual: []string{"foo", "bar"}, + cached: []string{"foo", "foobar"}, + queued: []string{"bar"}, missing: []string{}, redundant: []string{"foobar"}, }, { actual: []string{"foo", "bar", "foobar"}, cached: []string{"bar", "foo"}, + queued: []string{}, + missing: []string{"foobar"}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"foo"}, + queued: []string{"bar"}, missing: []string{"foobar"}, redundant: []string{}, }, { actual: []string{"foo", "bar", "foobar"}, cached: []string{"bar", "foobar", "foo"}, + queued: []string{}, + missing: []string{}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"foobar", "foo"}, + queued: []string{"bar"}, missing: []string{}, redundant: []string{}, }, @@ -117,6 +142,13 @@ func TestComparePods(t *testing.T) { pods = append(pods, pod) } + queuedPods := []*v1.Pod{} + for _, uid := range test.queued { + pod := &v1.Pod{} + pod.UID = types.UID(uid) + queuedPods = append(queuedPods, pod) + } + nodeInfo := make(map[string]*schedulercache.NodeInfo) for _, uid := range test.cached { pod := &v1.Pod{} @@ -127,7 +159,7 @@ func TestComparePods(t *testing.T) { nodeInfo[uid] = schedulercache.NewNodeInfo(pod) } - m, r := compare.ComparePods(pods, nodeInfo) + m, r := 
compare.ComparePods(pods, queuedPods, nodeInfo) if !reflect.DeepEqual(m, test.missing) { t.Errorf("missing expected to be %s; got %s", test.missing, m) diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 8c276269216..11e30a71074 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -303,6 +303,7 @@ func NewConfigFactory( nodeLister: nodeInformer.Lister(), pdbLister: pdbInformer.Lister(), cache: c.schedulerCache, + podQueue: c.podQueue, } ch := make(chan os.Signal, 1) diff --git a/pkg/scheduler/schedulercache/cache.go b/pkg/scheduler/schedulercache/cache.go index 18545979209..b84147535de 100644 --- a/pkg/scheduler/schedulercache/cache.go +++ b/pkg/scheduler/schedulercache/cache.go @@ -80,7 +80,8 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul } } -// Snapshot takes a snapshot of the current schedulerCache. +// Snapshot takes a snapshot of the current schedulerCache. The method has performance impact, +// and should be only used in non-critical path. func (cache *schedulerCache) Snapshot() *Snapshot { cache.mu.Lock() defer cache.mu.Unlock()