Merge pull request #60953 from anfernee/sched-cache-resync

Automatic merge from submit-queue (batch tested with PRs 60919, 60953, 61085, 61083, 60971). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Sched cache resync

**What this PR does / why we need it**: Scheduler cache comparer

A debug tool that collects resources from the API server and compares them with the scheduler cache. It compares the node list, the pod list, and PodDisruptionBudgets, and should be easy to extend. The comparison is triggered by SIGUSR2 (signal 12):

    kill -12 ${SCHED_PID}

The comparison results go to the scheduler log.
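
For context, the trigger mechanism is plain `os/signal` plumbing: the factory registers a handler for `compareSignal` (SIGUSR2 on non-Windows, SIGINT on Windows) and runs one comparison per received signal until the stop channel closes. Below is a minimal self-contained sketch of that pattern, for illustration only (not code from this PR; the real handler calls `comparer.Compare()` and writes any mismatches to the scheduler log):

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	stop := make(chan struct{})           // stands in for c.StopEverything
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGUSR2) // compareSignal on non-Windows platforms

	go func() {
		for {
			select {
			case <-stop:
				return
			case <-sigCh:
				// The real handler calls comparer.Compare(), which logs
				// "cache mismatch: ..." warnings for any differences found.
				fmt.Println("cache compare triggered")
			}
		}
	}()

	fmt.Printf("trigger a compare with: kill -USR2 %d\n", os.Getpid())
	select {} // block so the sketch keeps running
}
```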

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Towards #60860

**Special notes for your reviewer**: @bsalamat 

**Release note**:
```release-note
None
```
Kubernetes Submit Queue, 2018-03-20 20:34:28 -07:00 (committed by GitHub), commit 2d864f2359
11 changed files with 612 additions and 2 deletions


@@ -39,8 +39,9 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/pkg/scheduler/util"
"github.com/golang/glog"
"reflect"
"github.com/golang/glog"
)
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
@@ -57,6 +58,7 @@ type SchedulingQueue interface {
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
WaitingPodsForNode(nodeName string) []*v1.Pod
WaitingPods() []*v1.Pod
}
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
@@ -116,6 +118,15 @@ func (f *FIFO) Pop() (*v1.Pod, error) {
return result.(*v1.Pod), nil
}
// WaitingPods returns all the waiting pods in the queue.
func (f *FIFO) WaitingPods() []*v1.Pod {
result := []*v1.Pod{}
for _, pod := range f.FIFO.List() {
result = append(result, pod.(*v1.Pod))
}
return result
}
// FIFO does not need to react to events, as all pods are always in the active
// scheduling queue anyway.
@@ -460,6 +471,21 @@ func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
return nil
}
// WaitingPods returns all the waiting pods in the queue.
func (p *PriorityQueue) WaitingPods() []*v1.Pod {
p.lock.Lock()
defer p.lock.Unlock()
result := []*v1.Pod{}
for _, pod := range p.activeQ.List() {
result = append(result, pod.(*v1.Pod))
}
for _, pod := range p.unschedulableQ.pods {
result = append(result, pod)
}
return result
}
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {


@@ -9,9 +9,45 @@ load(
go_library(
name = "go_default_library",
srcs = [
"cache_comparer.go",
"factory.go",
"plugins.go",
],
] + select({
"@io_bazel_rules_go//go/platform:android": [
"signal.go",
],
"@io_bazel_rules_go//go/platform:darwin": [
"signal.go",
],
"@io_bazel_rules_go//go/platform:dragonfly": [
"signal.go",
],
"@io_bazel_rules_go//go/platform:freebsd": [
"signal.go",
],
"@io_bazel_rules_go//go/platform:linux": [
"signal.go",
],
"@io_bazel_rules_go//go/platform:nacl": [
"signal.go",
],
"@io_bazel_rules_go//go/platform:netbsd": [
"signal.go",
],
"@io_bazel_rules_go//go/platform:openbsd": [
"signal.go",
],
"@io_bazel_rules_go//go/platform:plan9": [
"signal.go",
],
"@io_bazel_rules_go//go/platform:solaris": [
"signal.go",
],
"@io_bazel_rules_go//go/platform:windows": [
"signal_windows.go",
],
"//conditions:default": [],
}),
importpath = "k8s.io/kubernetes/pkg/scheduler/factory",
deps = [
"//pkg/api/v1/pod:go_default_library",
@@ -59,6 +95,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"cache_comparer_test.go",
"factory_test.go",
"plugins_test.go",
],
@@ -75,8 +112,10 @@ go_test(
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",


@@ -0,0 +1,161 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"sort"
"strings"
"github.com/golang/glog"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1"
v1beta1 "k8s.io/client-go/listers/policy/v1beta1"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)
type cacheComparer struct {
nodeLister corelisters.NodeLister
podLister corelisters.PodLister
pdbLister v1beta1.PodDisruptionBudgetLister
cache schedulercache.Cache
podQueue core.SchedulingQueue
compareStrategy
}
func (c *cacheComparer) Compare() error {
glog.V(3).Info("cache comparer started")
defer glog.V(3).Info("cache comparer finished")
nodes, err := c.nodeLister.List(labels.Everything())
if err != nil {
return err
}
pods, err := c.podLister.List(labels.Everything())
if err != nil {
return err
}
pdbs, err := c.pdbLister.List(labels.Everything())
if err != nil {
return err
}
snapshot := c.cache.Snapshot()
waitingPods := c.podQueue.WaitingPods()
if missed, redundant := c.CompareNodes(nodes, snapshot.Nodes); len(missed)+len(redundant) != 0 {
glog.Warningf("cache mismatch: missed nodes: %s; redundant nodes: %s", missed, redundant)
}
if missed, redundant := c.ComparePods(pods, waitingPods, snapshot.Nodes); len(missed)+len(redundant) != 0 {
glog.Warningf("cache mismatch: missed pods: %s; redundant pods: %s", missed, redundant)
}
if missed, redundant := c.ComparePdbs(pdbs, snapshot.Pdbs); len(missed)+len(redundant) != 0 {
glog.Warningf("cache mismatch: missed pdbs: %s; redundant pdbs: %s", missed, redundant)
}
return nil
}
type compareStrategy struct {
}
func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) {
actual := []string{}
for _, node := range nodes {
actual = append(actual, node.Name)
}
cached := []string{}
for nodeName := range nodeinfos {
cached = append(cached, nodeName)
}
return compareStrings(actual, cached)
}
func (c compareStrategy) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) {
actual := []string{}
for _, pod := range pods {
actual = append(actual, string(pod.UID))
}
cached := []string{}
for _, nodeinfo := range nodeinfos {
for _, pod := range nodeinfo.Pods() {
cached = append(cached, string(pod.UID))
}
}
for _, pod := range waitingPods {
cached = append(cached, string(pod.UID))
}
return compareStrings(actual, cached)
}
func (c compareStrategy) ComparePdbs(pdbs []*policy.PodDisruptionBudget, pdbCache map[string]*policy.PodDisruptionBudget) (missed, redundant []string) {
actual := []string{}
for _, pdb := range pdbs {
actual = append(actual, string(pdb.Name))
}
cached := []string{}
for pdbName := range pdbCache {
cached = append(cached, pdbName)
}
return compareStrings(actual, cached)
}
func compareStrings(actual, cached []string) (missed, redundant []string) {
missed, redundant = []string{}, []string{}
sort.Strings(actual)
sort.Strings(cached)
compare := func(i, j int) int {
if i == len(actual) {
return 1
} else if j == len(cached) {
return -1
}
return strings.Compare(actual[i], cached[j])
}
for i, j := 0, 0; i < len(actual) || j < len(cached); {
switch compare(i, j) {
case 0:
i++
j++
case -1:
missed = append(missed, actual[i])
i++
case 1:
redundant = append(redundant, cached[j])
j++
}
}
return
}
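
The comparison itself is a two-pointer merge: compareStrings sorts both name lists and walks them in lockstep, reporting names present only in the API server list as missed and names present only in the cache as redundant. A standalone sketch of the same idea with hypothetical node names (illustrative only, not code from this PR):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// diffSorted mirrors compareStrings above: sort both lists, then advance two
// cursors; names only in actual are "missed", names only in cached are "redundant".
func diffSorted(actual, cached []string) (missed, redundant []string) {
	sort.Strings(actual)
	sort.Strings(cached)
	i, j := 0, 0
	for i < len(actual) || j < len(cached) {
		switch {
		case i == len(actual): // actual exhausted: the rest of cached is redundant
			redundant = append(redundant, cached[j])
			j++
		case j == len(cached): // cached exhausted: the rest of actual is missed
			missed = append(missed, actual[i])
			i++
		default:
			switch strings.Compare(actual[i], cached[j]) {
			case 0:
				i, j = i+1, j+1
			case -1:
				missed = append(missed, actual[i])
				i++
			case 1:
				redundant = append(redundant, cached[j])
				j++
			}
		}
	}
	return missed, redundant
}

func main() {
	// Hypothetical state: the API server knows node-a and node-b,
	// while the scheduler cache holds node-b and node-c.
	missed, redundant := diffSorted(
		[]string{"node-b", "node-a"},
		[]string{"node-c", "node-b"},
	)
	fmt.Println("missed:", missed)       // [node-a]
	fmt.Println("redundant:", redundant) // [node-c]
}
```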


@@ -0,0 +1,228 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"reflect"
"testing"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)
func TestCompareNodes(t *testing.T) {
compare := compareStrategy{}
tests := []struct {
actual []string
cached []string
missing []string
redundant []string
}{
{
actual: []string{"foo", "bar"},
cached: []string{"bar", "foo", "foobar"},
missing: []string{},
redundant: []string{"foobar"},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foo"},
missing: []string{"foobar"},
redundant: []string{},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foobar", "foo"},
missing: []string{},
redundant: []string{},
},
}
for _, test := range tests {
nodes := []*v1.Node{}
for _, nodeName := range test.actual {
node := &v1.Node{}
node.Name = nodeName
nodes = append(nodes, node)
}
nodeInfo := make(map[string]*schedulercache.NodeInfo)
for _, nodeName := range test.cached {
nodeInfo[nodeName] = &schedulercache.NodeInfo{}
}
m, r := compare.CompareNodes(nodes, nodeInfo)
if !reflect.DeepEqual(m, test.missing) {
t.Errorf("missing expected to be %s; got %s", test.missing, m)
}
if !reflect.DeepEqual(r, test.redundant) {
t.Errorf("redundant expected to be %s; got %s", test.redundant, r)
}
}
}
func TestComparePods(t *testing.T) {
compare := compareStrategy{}
tests := []struct {
actual []string
cached []string
queued []string
missing []string
redundant []string
}{
{
actual: []string{"foo", "bar"},
cached: []string{"bar", "foo", "foobar"},
queued: []string{},
missing: []string{},
redundant: []string{"foobar"},
},
{
actual: []string{"foo", "bar"},
cached: []string{"foo", "foobar"},
queued: []string{"bar"},
missing: []string{},
redundant: []string{"foobar"},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foo"},
queued: []string{},
missing: []string{"foobar"},
redundant: []string{},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"foo"},
queued: []string{"bar"},
missing: []string{"foobar"},
redundant: []string{},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foobar", "foo"},
queued: []string{},
missing: []string{},
redundant: []string{},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"foobar", "foo"},
queued: []string{"bar"},
missing: []string{},
redundant: []string{},
},
}
for _, test := range tests {
pods := []*v1.Pod{}
for _, uid := range test.actual {
pod := &v1.Pod{}
pod.UID = types.UID(uid)
pods = append(pods, pod)
}
queuedPods := []*v1.Pod{}
for _, uid := range test.queued {
pod := &v1.Pod{}
pod.UID = types.UID(uid)
queuedPods = append(queuedPods, pod)
}
nodeInfo := make(map[string]*schedulercache.NodeInfo)
for _, uid := range test.cached {
pod := &v1.Pod{}
pod.UID = types.UID(uid)
pod.Namespace = "ns"
pod.Name = uid
nodeInfo[uid] = schedulercache.NewNodeInfo(pod)
}
m, r := compare.ComparePods(pods, queuedPods, nodeInfo)
if !reflect.DeepEqual(m, test.missing) {
t.Errorf("missing expected to be %s; got %s", test.missing, m)
}
if !reflect.DeepEqual(r, test.redundant) {
t.Errorf("redundant expected to be %s; got %s", test.redundant, r)
}
}
}
func TestComparePdbs(t *testing.T) {
compare := compareStrategy{}
tests := []struct {
actual []string
cached []string
missing []string
redundant []string
}{
{
actual: []string{"foo", "bar"},
cached: []string{"bar", "foo", "foobar"},
missing: []string{},
redundant: []string{"foobar"},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foo"},
missing: []string{"foobar"},
redundant: []string{},
},
{
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foobar", "foo"},
missing: []string{},
redundant: []string{},
},
}
for _, test := range tests {
pdbs := []*policy.PodDisruptionBudget{}
for _, name := range test.actual {
pdb := &policy.PodDisruptionBudget{}
pdb.Name = name
pdbs = append(pdbs, pdb)
}
cache := make(map[string]*policy.PodDisruptionBudget)
for _, name := range test.cached {
pdb := &policy.PodDisruptionBudget{}
pdb.Name = name
cache[name] = pdb
}
m, r := compare.ComparePdbs(pdbs, cache)
if !reflect.DeepEqual(m, test.missing) {
t.Errorf("missing expected to be %s; got %s", test.missing, m)
}
if !reflect.DeepEqual(r, test.redundant) {
t.Errorf("redundant expected to be %s; got %s", test.redundant, r)
}
}
}


@@ -20,6 +20,8 @@ package factory
import (
"fmt"
"os"
"os/signal"
"reflect"
"time"
@@ -297,6 +299,29 @@ func NewConfigFactory(
c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, nodeInformer, storageClassInformer)
}
// Setup cache comparer
comparer := &cacheComparer{
podLister: podInformer.Lister(),
nodeLister: nodeInformer.Lister(),
pdbLister: pdbInformer.Lister(),
cache: c.schedulerCache,
podQueue: c.podQueue,
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, compareSignal)
go func() {
for {
select {
case <-c.StopEverything:
return
case <-ch:
comparer.Compare()
}
}
}()
return c
}


@@ -0,0 +1,25 @@
// +build !windows
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import "syscall"
// compareSignal is the signal to trigger cache compare. For non-windows
// environment it's SIGUSR2.
var compareSignal = syscall.SIGUSR2


@@ -0,0 +1,23 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import "os"
// compareSignal is the signal to trigger cache compare. For windows,
// it's SIGINT.
var compareSignal = os.Interrupt


@@ -80,6 +80,34 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
}
}
// Snapshot takes a snapshot of the current schedulerCache. The method has performance impact,
// and should be only used in non-critical path.
func (cache *schedulerCache) Snapshot() *Snapshot {
cache.mu.Lock()
defer cache.mu.Unlock()
nodes := make(map[string]*NodeInfo)
for k, v := range cache.nodes {
nodes[k] = v.Clone()
}
assumedPods := make(map[string]bool)
for k, v := range cache.assumedPods {
assumedPods[k] = v
}
pdbs := make(map[string]*policy.PodDisruptionBudget)
for k, v := range cache.pdbs {
pdbs[k] = v.DeepCopy()
}
return &Snapshot{
Nodes: nodes,
AssumedPods: assumedPods,
Pdbs: pdbs,
}
}
func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
cache.mu.Lock()
defer cache.mu.Unlock()


@@ -326,6 +326,46 @@ func TestAddPodWillConfirm(t *testing.T) {
}
}
func TestSnapshot(t *testing.T) {
nodeName := "node"
now := time.Now()
ttl := 10 * time.Second
testPods := []*v1.Pod{
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
}
tests := []struct {
podsToAssume []*v1.Pod
podsToAdd []*v1.Pod
}{{ // two pod were assumed at same time. But first one is called Add() and gets confirmed.
podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
podsToAdd: []*v1.Pod{testPods[0]},
}}
for _, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
for _, podToAdd := range tt.podsToAdd {
if err := cache.AddPod(podToAdd); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
}
snapshot := cache.Snapshot()
if !reflect.DeepEqual(snapshot.Nodes, cache.nodes) {
t.Fatalf("expect \n%+v; got \n%+v", cache.nodes, snapshot.Nodes)
}
if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) {
t.Fatalf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods)
}
}
}
// TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod.
func TestAddPodWillReplaceAssumed(t *testing.T) {
now := time.Now()


@@ -119,4 +119,14 @@ type Cache interface {
// FilteredList returns all cached pods that pass the filter.
FilteredList(filter PodFilter, selector labels.Selector) ([]*v1.Pod, error)
// Snapshot takes a snapshot on current cache
Snapshot() *Snapshot
}
// Snapshot is a snapshot of cache state
type Snapshot struct {
AssumedPods map[string]bool
Nodes map[string]*NodeInfo
Pdbs map[string]*policy.PodDisruptionBudget
}


@@ -100,3 +100,8 @@ func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil
func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
return nil, nil
}
// Snapshot is a fake method for testing
func (f *FakeCache) Snapshot() *schedulercache.Snapshot {
return &schedulercache.Snapshot{}
}