diff --git a/pkg/scheduler/core/scheduling_queue.go b/pkg/scheduler/core/scheduling_queue.go index 9145416658b..0710a27f856 100644 --- a/pkg/scheduler/core/scheduling_queue.go +++ b/pkg/scheduler/core/scheduling_queue.go @@ -39,8 +39,9 @@ import ( priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" "k8s.io/kubernetes/pkg/scheduler/util" - "github.com/golang/glog" "reflect" + + "github.com/golang/glog" ) // SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. @@ -57,6 +58,7 @@ type SchedulingQueue interface { AssignedPodAdded(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod) WaitingPodsForNode(nodeName string) []*v1.Pod + WaitingPods() []*v1.Pod } // NewSchedulingQueue initializes a new scheduling queue. If pod priority is @@ -116,6 +118,15 @@ func (f *FIFO) Pop() (*v1.Pod, error) { return result.(*v1.Pod), nil } +// WaitingPods returns all the waiting pods in the queue. +func (f *FIFO) WaitingPods() []*v1.Pod { + result := []*v1.Pod{} + for _, pod := range f.FIFO.List() { + result = append(result, pod.(*v1.Pod)) + } + return result +} + // FIFO does not need to react to events, as all pods are always in the active // scheduling queue anyway. @@ -460,6 +471,21 @@ func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod { return nil } +// WaitingPods returns all the waiting pods in the queue. +func (p *PriorityQueue) WaitingPods() []*v1.Pod { + p.lock.Lock() + defer p.lock.Unlock() + + result := []*v1.Pod{} + for _, pod := range p.activeQ.List() { + result = append(result, pod.(*v1.Pod)) + } + for _, pod := range p.unschedulableQ.pods { + result = append(result, pod) + } + return result +} + // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure // is used to implement unschedulableQ. type UnschedulablePodsMap struct { diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index a4c87066f6a..57cf5e0e9c7 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -9,9 +9,45 @@ load( go_library( name = "go_default_library", srcs = [ + "cache_comparer.go", "factory.go", "plugins.go", - ], + ] + select({ + "@io_bazel_rules_go//go/platform:android": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:darwin": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:dragonfly": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:freebsd": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:linux": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:nacl": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:netbsd": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:openbsd": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:plan9": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:solaris": [ + "signal.go", + ], + "@io_bazel_rules_go//go/platform:windows": [ + "signal_windows.go", + ], + "//conditions:default": [], + }), importpath = "k8s.io/kubernetes/pkg/scheduler/factory", deps = [ "//pkg/api/v1/pod:go_default_library", @@ -59,6 +95,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "cache_comparer_test.go", "factory_test.go", "plugins_test.go", ], @@ -75,8 +112,10 @@ go_test( "//pkg/scheduler/testing:go_default_library", "//pkg/scheduler/util:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", diff --git a/pkg/scheduler/factory/cache_comparer.go b/pkg/scheduler/factory/cache_comparer.go new file mode 100644 index 00000000000..b93cff7f2b6 --- /dev/null +++ b/pkg/scheduler/factory/cache_comparer.go @@ -0,0 +1,161 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import ( + "sort" + "strings" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" + "k8s.io/apimachinery/pkg/labels" + corelisters "k8s.io/client-go/listers/core/v1" + v1beta1 "k8s.io/client-go/listers/policy/v1beta1" + "k8s.io/kubernetes/pkg/scheduler/core" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" +) + +type cacheComparer struct { + nodeLister corelisters.NodeLister + podLister corelisters.PodLister + pdbLister v1beta1.PodDisruptionBudgetLister + cache schedulercache.Cache + podQueue core.SchedulingQueue + + compareStrategy +} + +func (c *cacheComparer) Compare() error { + glog.V(3).Info("cache comparer started") + defer glog.V(3).Info("cache comparer finished") + + nodes, err := c.nodeLister.List(labels.Everything()) + if err != nil { + return err + } + + pods, err := c.podLister.List(labels.Everything()) + if err != nil { + return err + } + + pdbs, err := c.pdbLister.List(labels.Everything()) + if err != nil { + return err + } + + snapshot := c.cache.Snapshot() + + waitingPods := c.podQueue.WaitingPods() + + if missed, redundant := c.CompareNodes(nodes, snapshot.Nodes); len(missed)+len(redundant) != 0 { + glog.Warningf("cache mismatch: missed nodes: %s; redundant nodes: %s", missed, redundant) + } + + if missed, redundant := c.ComparePods(pods, waitingPods, snapshot.Nodes); len(missed)+len(redundant) != 0 { + glog.Warningf("cache mismatch: missed pods: %s; redundant pods: %s", missed, redundant) + } + + if missed, redundant := c.ComparePdbs(pdbs, snapshot.Pdbs); len(missed)+len(redundant) != 0 { + glog.Warningf("cache mismatch: missed pdbs: %s; redundant pdbs: %s", missed, redundant) + } + + return nil +} + +type compareStrategy struct { +} + +func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) { + actual := []string{} + for _, node := range nodes { + actual = append(actual, node.Name) + } + + cached := []string{} + for nodeName := range nodeinfos { + cached = append(cached, nodeName) + } + + return compareStrings(actual, cached) +} + +func (c compareStrategy) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) { + actual := []string{} + for _, pod := range pods { + actual = append(actual, string(pod.UID)) + } + + cached := []string{} + for _, nodeinfo := range nodeinfos { + for _, pod := range nodeinfo.Pods() { + cached = append(cached, string(pod.UID)) + } + } + for _, pod := range waitingPods { + cached = append(cached, string(pod.UID)) + } + + return compareStrings(actual, cached) +} + +func (c compareStrategy) ComparePdbs(pdbs []*policy.PodDisruptionBudget, pdbCache map[string]*policy.PodDisruptionBudget) (missed, redundant []string) { + actual := []string{} + for _, pdb := range pdbs { + actual = append(actual, string(pdb.Name)) + } + + cached := []string{} + for pdbName := range pdbCache { + cached = append(cached, pdbName) + } + + return compareStrings(actual, cached) +} + +func compareStrings(actual, cached []string) (missed, redundant []string) { + missed, redundant = []string{}, []string{} + + sort.Strings(actual) + sort.Strings(cached) + + compare := func(i, j int) int { + if i == len(actual) { + return 1 + } else if j == len(cached) { + return -1 + } + return strings.Compare(actual[i], cached[j]) + } + + for i, j := 0, 0; i < len(actual) || j < len(cached); { + switch compare(i, j) { + case 0: + i++ + j++ + case -1: + missed = append(missed, actual[i]) + i++ + case 1: + redundant = append(redundant, cached[j]) + j++ + } + } + + return +} diff --git a/pkg/scheduler/factory/cache_comparer_test.go b/pkg/scheduler/factory/cache_comparer_test.go new file mode 100644 index 00000000000..03f44595c62 --- /dev/null +++ b/pkg/scheduler/factory/cache_comparer_test.go @@ -0,0 +1,228 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" +) + +func TestCompareNodes(t *testing.T) { + compare := compareStrategy{} + + tests := []struct { + actual []string + cached []string + missing []string + redundant []string + }{ + { + actual: []string{"foo", "bar"}, + cached: []string{"bar", "foo", "foobar"}, + missing: []string{}, + redundant: []string{"foobar"}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foo"}, + missing: []string{"foobar"}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foobar", "foo"}, + missing: []string{}, + redundant: []string{}, + }, + } + + for _, test := range tests { + nodes := []*v1.Node{} + for _, nodeName := range test.actual { + node := &v1.Node{} + node.Name = nodeName + nodes = append(nodes, node) + } + + nodeInfo := make(map[string]*schedulercache.NodeInfo) + for _, nodeName := range test.cached { + nodeInfo[nodeName] = &schedulercache.NodeInfo{} + } + + m, r := compare.CompareNodes(nodes, nodeInfo) + + if !reflect.DeepEqual(m, test.missing) { + t.Errorf("missing expected to be %s; got %s", test.missing, m) + } + + if !reflect.DeepEqual(r, test.redundant) { + t.Errorf("redundant expected to be %s; got %s", test.redundant, r) + } + } +} + +func TestComparePods(t *testing.T) { + compare := compareStrategy{} + + tests := []struct { + actual []string + cached []string + queued []string + missing []string + redundant []string + }{ + { + actual: []string{"foo", "bar"}, + cached: []string{"bar", "foo", "foobar"}, + queued: []string{}, + missing: []string{}, + redundant: []string{"foobar"}, + }, + { + actual: []string{"foo", "bar"}, + cached: []string{"foo", "foobar"}, + queued: []string{"bar"}, + missing: []string{}, + redundant: []string{"foobar"}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foo"}, + queued: []string{}, + missing: []string{"foobar"}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"foo"}, + queued: []string{"bar"}, + missing: []string{"foobar"}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foobar", "foo"}, + queued: []string{}, + missing: []string{}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"foobar", "foo"}, + queued: []string{"bar"}, + missing: []string{}, + redundant: []string{}, + }, + } + + for _, test := range tests { + pods := []*v1.Pod{} + for _, uid := range test.actual { + pod := &v1.Pod{} + pod.UID = types.UID(uid) + pods = append(pods, pod) + } + + queuedPods := []*v1.Pod{} + for _, uid := range test.queued { + pod := &v1.Pod{} + pod.UID = types.UID(uid) + queuedPods = append(queuedPods, pod) + } + + nodeInfo := make(map[string]*schedulercache.NodeInfo) + for _, uid := range test.cached { + pod := &v1.Pod{} + pod.UID = types.UID(uid) + pod.Namespace = "ns" + pod.Name = uid + + nodeInfo[uid] = schedulercache.NewNodeInfo(pod) + } + + m, r := compare.ComparePods(pods, queuedPods, nodeInfo) + + if !reflect.DeepEqual(m, test.missing) { + t.Errorf("missing expected to be %s; got %s", test.missing, m) + } + + if !reflect.DeepEqual(r, test.redundant) { + t.Errorf("redundant expected to be %s; got %s", test.redundant, r) + } + } +} + +func TestComparePdbs(t *testing.T) { + compare := compareStrategy{} + + tests := []struct { + actual []string + cached []string + missing []string + redundant []string + }{ + { + actual: []string{"foo", "bar"}, + cached: []string{"bar", "foo", "foobar"}, + missing: []string{}, + redundant: []string{"foobar"}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foo"}, + missing: []string{"foobar"}, + redundant: []string{}, + }, + { + actual: []string{"foo", "bar", "foobar"}, + cached: []string{"bar", "foobar", "foo"}, + missing: []string{}, + redundant: []string{}, + }, + } + + for _, test := range tests { + pdbs := []*policy.PodDisruptionBudget{} + for _, name := range test.actual { + pdb := &policy.PodDisruptionBudget{} + pdb.Name = name + pdbs = append(pdbs, pdb) + } + + cache := make(map[string]*policy.PodDisruptionBudget) + for _, name := range test.cached { + pdb := &policy.PodDisruptionBudget{} + pdb.Name = name + cache[name] = pdb + } + + m, r := compare.ComparePdbs(pdbs, cache) + + if !reflect.DeepEqual(m, test.missing) { + t.Errorf("missing expected to be %s; got %s", test.missing, m) + } + + if !reflect.DeepEqual(r, test.redundant) { + t.Errorf("redundant expected to be %s; got %s", test.redundant, r) + } + } +} diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 8bfccb2f5dd..e0098723797 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -20,6 +20,8 @@ package factory import ( "fmt" + "os" + "os/signal" "reflect" "time" @@ -297,6 +299,29 @@ func NewConfigFactory( c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, nodeInformer, storageClassInformer) } + // Setup cache comparer + comparer := &cacheComparer{ + podLister: podInformer.Lister(), + nodeLister: nodeInformer.Lister(), + pdbLister: pdbInformer.Lister(), + cache: c.schedulerCache, + podQueue: c.podQueue, + } + + ch := make(chan os.Signal, 1) + signal.Notify(ch, compareSignal) + + go func() { + for { + select { + case <-c.StopEverything: + return + case <-ch: + comparer.Compare() + } + } + }() + return c } diff --git a/pkg/scheduler/factory/signal.go b/pkg/scheduler/factory/signal.go new file mode 100644 index 00000000000..8ec17048ebb --- /dev/null +++ b/pkg/scheduler/factory/signal.go @@ -0,0 +1,25 @@ +// +build !windows + +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import "syscall" + +// compareSignal is the signal to trigger cache compare. For non-windows +// environment it's SIGUSR2. +var compareSignal = syscall.SIGUSR2 diff --git a/pkg/scheduler/factory/signal_windows.go b/pkg/scheduler/factory/signal_windows.go new file mode 100644 index 00000000000..9df239874c3 --- /dev/null +++ b/pkg/scheduler/factory/signal_windows.go @@ -0,0 +1,23 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import "os" + +// compareSignal is the signal to trigger cache compare. For windows, +// it's SIGINT. +var compareSignal = os.Interrupt diff --git a/pkg/scheduler/schedulercache/cache.go b/pkg/scheduler/schedulercache/cache.go index 78cdca25b3e..b84147535de 100644 --- a/pkg/scheduler/schedulercache/cache.go +++ b/pkg/scheduler/schedulercache/cache.go @@ -80,6 +80,34 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul } } +// Snapshot takes a snapshot of the current schedulerCache. The method has performance impact, +// and should be only used in non-critical path. +func (cache *schedulerCache) Snapshot() *Snapshot { + cache.mu.Lock() + defer cache.mu.Unlock() + + nodes := make(map[string]*NodeInfo) + for k, v := range cache.nodes { + nodes[k] = v.Clone() + } + + assumedPods := make(map[string]bool) + for k, v := range cache.assumedPods { + assumedPods[k] = v + } + + pdbs := make(map[string]*policy.PodDisruptionBudget) + for k, v := range cache.pdbs { + pdbs[k] = v.DeepCopy() + } + + return &Snapshot{ + Nodes: nodes, + AssumedPods: assumedPods, + Pdbs: pdbs, + } +} + func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error { cache.mu.Lock() defer cache.mu.Unlock() diff --git a/pkg/scheduler/schedulercache/cache_test.go b/pkg/scheduler/schedulercache/cache_test.go index 724f25b4f3e..0fc2be33fb0 100644 --- a/pkg/scheduler/schedulercache/cache_test.go +++ b/pkg/scheduler/schedulercache/cache_test.go @@ -326,6 +326,46 @@ func TestAddPodWillConfirm(t *testing.T) { } } +func TestSnapshot(t *testing.T) { + nodeName := "node" + now := time.Now() + ttl := 10 * time.Second + + testPods := []*v1.Pod{ + makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}), + makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}), + } + tests := []struct { + podsToAssume []*v1.Pod + podsToAdd []*v1.Pod + }{{ // two pod were assumed at same time. But first one is called Add() and gets confirmed. + podsToAssume: []*v1.Pod{testPods[0], testPods[1]}, + podsToAdd: []*v1.Pod{testPods[0]}, + }} + + for _, tt := range tests { + cache := newSchedulerCache(ttl, time.Second, nil) + for _, podToAssume := range tt.podsToAssume { + if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { + t.Fatalf("assumePod failed: %v", err) + } + } + for _, podToAdd := range tt.podsToAdd { + if err := cache.AddPod(podToAdd); err != nil { + t.Fatalf("AddPod failed: %v", err) + } + } + + snapshot := cache.Snapshot() + if !reflect.DeepEqual(snapshot.Nodes, cache.nodes) { + t.Fatalf("expect \n%+v; got \n%+v", cache.nodes, snapshot.Nodes) + } + if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) { + t.Fatalf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods) + } + } +} + // TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod. func TestAddPodWillReplaceAssumed(t *testing.T) { now := time.Now() diff --git a/pkg/scheduler/schedulercache/interface.go b/pkg/scheduler/schedulercache/interface.go index d3c65313b83..8e4f4909c75 100644 --- a/pkg/scheduler/schedulercache/interface.go +++ b/pkg/scheduler/schedulercache/interface.go @@ -119,4 +119,14 @@ type Cache interface { // FilteredList returns all cached pods that pass the filter. FilteredList(filter PodFilter, selector labels.Selector) ([]*v1.Pod, error) + + // Snapshot takes a snapshot on current cache + Snapshot() *Snapshot +} + +// Snapshot is a snapshot of cache state +type Snapshot struct { + AssumedPods map[string]bool + Nodes map[string]*NodeInfo + Pdbs map[string]*policy.PodDisruptionBudget } diff --git a/pkg/scheduler/testing/fake_cache.go b/pkg/scheduler/testing/fake_cache.go index 21c4528ae21..e958a14b2f7 100644 --- a/pkg/scheduler/testing/fake_cache.go +++ b/pkg/scheduler/testing/fake_cache.go @@ -100,3 +100,8 @@ func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { return nil, nil } + +// Snapshot is a fake method for testing +func (f *FakeCache) Snapshot() *schedulercache.Snapshot { + return &schedulercache.Snapshot{} +}