Merge pull request #84197 from ahg-g/ahg-snapshot

move nodeinfo/snapshot.go into its own package
This commit is contained in:
Kubernetes Prow Robot 2019-10-23 14:33:05 -07:00 committed by GitHub
commit 9eb73ee545
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 64 additions and 23 deletions

View File

@ -21,6 +21,7 @@ go_library(
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -49,6 +49,7 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
utiltrace "k8s.io/utils/trace"
@ -153,7 +154,7 @@ type genericScheduler struct {
framework framework.Framework
extenders []algorithm.SchedulerExtender
alwaysCheckAllPredicates bool
nodeInfoSnapshot *schedulernodeinfo.Snapshot
nodeInfoSnapshot *nodeinfosnapshot.Snapshot
volumeBinder *volumebinder.VolumeBinder
pvcLister corelisters.PersistentVolumeClaimLister
pdbLister policylisters.PodDisruptionBudgetLister

View File

@ -15,6 +15,7 @@ go_library(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@ -57,7 +58,7 @@ const (
// plugins.
type framework struct {
registry Registry
nodeInfoSnapshot *schedulernodeinfo.Snapshot
nodeInfoSnapshot *nodeinfosnapshot.Snapshot
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin
@ -138,7 +139,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
f := &framework{
registry: r,
nodeInfoSnapshot: schedulernodeinfo.NewSnapshot(),
nodeInfoSnapshot: nodeinfosnapshot.NewSnapshot(),
pluginNameToWeightMap: make(map[string]int),
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
@ -594,7 +595,7 @@ func (f *framework) RunPermitPlugins(
// is taken at the beginning of a scheduling cycle and remains unchanged until a
// pod finishes "Reserve". There is no guarantee that the information remains
// unchanged after "Reserve".
func (f *framework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot {
func (f *framework) NodeInfoSnapshot() *nodeinfosnapshot.Snapshot {
return f.nodeInfoSnapshot
}

View File

@ -30,6 +30,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
// NodeScoreList declares a list of nodes and their scores.
@ -458,7 +459,7 @@ type FrameworkHandle interface {
// cycle(permit/pre-bind/bind/post-bind/un-reserve plugin) should not use it,
// otherwise a concurrent read/write error might occur, they should use scheduler
// cache instead.
NodeInfoSnapshot() *schedulernodeinfo.Snapshot
NodeInfoSnapshot() *nodeinfosnapshot.Snapshot
// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
IterateOverWaitingPods(callback func(WaitingPod))

View File

@ -13,6 +13,7 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
@ -34,6 +35,7 @@ go_test(
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/features"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
var (
@ -199,7 +200,7 @@ func (cache *schedulerCache) Snapshot() *Snapshot {
// beginning of every scheduling cycle.
// This function tracks generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error {
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/features"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func deepEqualWithoutGeneration(t *testing.T, testcase int, actual *nodeInfoListItem, expected *schedulernodeinfo.NodeInfo) {
@ -1076,7 +1077,7 @@ func TestNodeOperators(t *testing.T) {
}
// Case 2: dump cached nodes successfully.
cachedNodes := schedulernodeinfo.NewSnapshot()
cachedNodes := nodeinfosnapshot.NewSnapshot()
cache.UpdateNodeInfoSnapshot(cachedNodes)
newNode, found := cachedNodes.NodeInfoMap[node.Name]
if !found || len(cachedNodes.NodeInfoMap) != 1 {
@ -1179,7 +1180,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
}
var cache *schedulerCache
var snapshot *schedulernodeinfo.Snapshot
var snapshot *nodeinfosnapshot.Snapshot
type operation = func()
addNode := func(i int) operation {
@ -1332,7 +1333,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cache = newSchedulerCache(time.Second, time.Second, nil)
snapshot = schedulernodeinfo.NewSnapshot()
snapshot = nodeinfosnapshot.NewSnapshot()
for _, op := range test.operations {
op()
@ -1363,7 +1364,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
}
}
func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *schedulernodeinfo.Snapshot) error {
func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *nodeinfosnapshot.Snapshot) error {
if len(snapshot.NodeInfoMap) != len(cache.nodes) {
return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoMap))
}
@ -1381,7 +1382,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
cache := setupCacheOf1kNodes30kPods(b)
b.ResetTimer()
for n := 0; n < b.N; n++ {
cachedNodes := schedulernodeinfo.NewSnapshot()
cachedNodes := nodeinfosnapshot.NewSnapshot()
cache.UpdateNodeInfoSnapshot(cachedNodes)
}
}

View File

@ -8,7 +8,7 @@ go_library(
deps = [
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],

View File

@ -21,7 +21,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
// Cache is used for testing
@ -76,7 +76,7 @@ func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) error { return nil }
func (c *Cache) RemoveNode(node *v1.Node) error { return nil }
// UpdateNodeInfoSnapshot is a fake method for testing.
func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error {
func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) error {
return nil
}

View File

@ -20,6 +20,7 @@ import (
v1 "k8s.io/api/core/v1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
// Cache collects pods' information and provides node-level aggregated information.
@ -99,7 +100,7 @@ type Cache interface {
// UpdateNodeInfoSnapshot updates the passed infoSnapshot to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node.
UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error
UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) error
// GetNodeInfo returns the node object with node string.
GetNodeInfo(nodeName string) (*v1.Node, error)

View File

@ -39,6 +39,7 @@ go_test(
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -37,6 +37,7 @@ import (
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -181,7 +182,7 @@ func (f *fakeFramework) ListPlugins() map[string][]config.Plugin {
return nil
}
func (*fakeFramework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot {
func (*fakeFramework) NodeInfoSnapshot() *nodeinfosnapshot.Snapshot {
return nil
}

View File

@ -5,7 +5,6 @@ go_library(
srcs = [
"host_ports.go",
"node_info.go",
"snapshot.go",
"util.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/nodeinfo",
@ -51,7 +50,10 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//pkg/scheduler/nodeinfo/snapshot:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["snapshot.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -18,22 +18,23 @@ package nodeinfo
import (
v1 "k8s.io/api/core/v1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// Snapshot is a snapshot of cache NodeInfo and NodeTree order. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its operations in that cycle.
type Snapshot struct {
// NodeInfoMap a map of node name to a snapshot of its NodeInfo.
NodeInfoMap map[string]*NodeInfo
NodeInfoMap map[string]*schedulernodeinfo.NodeInfo
// NodeInfoList is the list of nodes as ordered in the cache's nodeTree.
NodeInfoList []*NodeInfo
NodeInfoList []*schedulernodeinfo.NodeInfo
Generation int64
}
// NewSnapshot initializes a Snapshot struct and returns it.
func NewSnapshot() *Snapshot {
return &Snapshot{
NodeInfoMap: make(map[string]*NodeInfo),
NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo),
}
}
@ -41,8 +42,8 @@ func NewSnapshot() *Snapshot {
func (s *Snapshot) ListNodes() []*v1.Node {
nodes := make([]*v1.Node, 0, len(s.NodeInfoMap))
for _, n := range s.NodeInfoList {
if n != nil && n.node != nil {
nodes = append(nodes, n.node)
if n.Node() != nil {
nodes = append(nodes, n.Node())
}
}
return nodes