Merge pull request #81840 from draveness/feature/move-node-info-snapshot-to-framework

feat(scheduler): move node info snapshot out of internal package
This commit is contained in:
Kubernetes Prow Robot 2019-08-26 16:42:54 -07:00 committed by GitHub
commit e57bee7332
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 55 additions and 35 deletions

View File

@ -167,7 +167,7 @@ type genericScheduler struct {
framework framework.Framework framework framework.Framework
extenders []algorithm.SchedulerExtender extenders []algorithm.SchedulerExtender
alwaysCheckAllPredicates bool alwaysCheckAllPredicates bool
nodeInfoSnapshot *internalcache.NodeInfoSnapshot nodeInfoSnapshot *schedulernodeinfo.Snapshot
volumeBinder *volumebinder.VolumeBinder volumeBinder *volumebinder.VolumeBinder
pvcLister corelisters.PersistentVolumeClaimLister pvcLister corelisters.PersistentVolumeClaimLister
pdbLister algorithm.PDBLister pdbLister algorithm.PDBLister

View File

@ -14,7 +14,7 @@ go_library(
deps = [ deps = [
"//pkg/scheduler/api:go_default_library", "//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@ -27,7 +27,7 @@ import (
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/internal/cache" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util" schedutil "k8s.io/kubernetes/pkg/scheduler/util"
) )
@ -35,7 +35,7 @@ import (
// plugins. // plugins.
type framework struct { type framework struct {
registry Registry registry Registry
nodeInfoSnapshot *cache.NodeInfoSnapshot nodeInfoSnapshot *schedulernodeinfo.Snapshot
waitingPods *waitingPodsMap waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin queueSortPlugins []QueueSortPlugin
@ -63,7 +63,7 @@ var _ = Framework(&framework{})
func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig) (Framework, error) { func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig) (Framework, error) {
f := &framework{ f := &framework{
registry: r, registry: r,
nodeInfoSnapshot: cache.NewNodeInfoSnapshot(), nodeInfoSnapshot: schedulernodeinfo.NewSnapshot(),
pluginNameToWeightMap: make(map[string]int), pluginNameToWeightMap: make(map[string]int),
waitingPods: newWaitingPodsMap(), waitingPods: newWaitingPodsMap(),
} }
@ -566,7 +566,7 @@ func (f *framework) RunPermitPlugins(
// is taken at the beginning of a scheduling cycle and remains unchanged until a // is taken at the beginning of a scheduling cycle and remains unchanged until a
// pod finishes "Reserve". There is no guarantee that the information remains // pod finishes "Reserve". There is no guarantee that the information remains
// unchanged after "Reserve". // unchanged after "Reserve".
func (f *framework) NodeInfoSnapshot() *cache.NodeInfoSnapshot { func (f *framework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot {
return f.nodeInfoSnapshot return f.nodeInfoSnapshot
} }

View File

@ -25,7 +25,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
) )
// Code is the Status code/type which is returned from plugins. // Code is the Status code/type which is returned from plugins.
@ -357,7 +357,7 @@ type FrameworkHandle interface {
// cycle(permit/pre-bind/bind/post-bind/un-reserve plugin) should not use it, // cycle(permit/pre-bind/bind/post-bind/un-reserve plugin) should not use it,
// otherwise a concurrent read/write error might occur, they should use scheduler // otherwise a concurrent read/write error might occur, they should use scheduler
// cache instead. // cache instead.
NodeInfoSnapshot() *internalcache.NodeInfoSnapshot NodeInfoSnapshot() *schedulernodeinfo.Snapshot
// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map. // IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
IterateOverWaitingPods(callback func(WaitingPod)) IterateOverWaitingPods(callback func(WaitingPod))

View File

@ -124,13 +124,6 @@ func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem {
} }
} }
// NewNodeInfoSnapshot initializes a NodeInfoSnapshot struct and returns it.
func NewNodeInfoSnapshot() *NodeInfoSnapshot {
return &NodeInfoSnapshot{
NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo),
}
}
// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly // moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
// linked list. The head is the most recently updated NodeInfo. // linked list. The head is the most recently updated NodeInfo.
// We assume cache lock is already acquired. // We assume cache lock is already acquired.
@ -210,7 +203,7 @@ func (cache *schedulerCache) Snapshot() *Snapshot {
// beginning of every scheduling cycle. // beginning of every scheduling cycle.
// This function tracks generation number of NodeInfo and updates only the // This function tracks generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken. // entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error { func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)

View File

@ -23,7 +23,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
@ -1077,7 +1077,7 @@ func TestNodeOperators(t *testing.T) {
} }
// Case 2: dump cached nodes successfully. // Case 2: dump cached nodes successfully.
cachedNodes := NewNodeInfoSnapshot() cachedNodes := schedulernodeinfo.NewSnapshot()
cache.UpdateNodeInfoSnapshot(cachedNodes) cache.UpdateNodeInfoSnapshot(cachedNodes)
newNode, found := cachedNodes.NodeInfoMap[node.Name] newNode, found := cachedNodes.NodeInfoMap[node.Name]
if !found || len(cachedNodes.NodeInfoMap) != 1 { if !found || len(cachedNodes.NodeInfoMap) != 1 {
@ -1180,7 +1180,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
} }
var cache *schedulerCache var cache *schedulerCache
var snapshot *NodeInfoSnapshot var snapshot *schedulernodeinfo.Snapshot
type operation = func() type operation = func()
addNode := func(i int) operation { addNode := func(i int) operation {
@ -1333,7 +1333,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
cache = newSchedulerCache(time.Second, time.Second, nil) cache = newSchedulerCache(time.Second, time.Second, nil)
snapshot = NewNodeInfoSnapshot() snapshot = schedulernodeinfo.NewSnapshot()
for _, op := range test.operations { for _, op := range test.operations {
op() op()
@ -1364,7 +1364,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
} }
} }
func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *NodeInfoSnapshot) error { func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *schedulernodeinfo.Snapshot) error {
if len(snapshot.NodeInfoMap) != len(cache.nodes) { if len(snapshot.NodeInfoMap) != len(cache.nodes) {
return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoMap)) return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoMap))
} }
@ -1390,7 +1390,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
cache := setupCacheOf1kNodes30kPods(b) cache := setupCacheOf1kNodes30kPods(b)
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
cachedNodes := NewNodeInfoSnapshot() cachedNodes := schedulernodeinfo.NewSnapshot()
cache.UpdateNodeInfoSnapshot(cachedNodes) cache.UpdateNodeInfoSnapshot(cachedNodes)
} }
} }

View File

@ -8,6 +8,7 @@ go_library(
deps = [ deps = [
"//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
) )
// Cache is used for testing // Cache is used for testing
@ -85,7 +86,7 @@ func (c *Cache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) er
func (c *Cache) RemoveCSINode(csiNode *storagev1beta1.CSINode) error { return nil } func (c *Cache) RemoveCSINode(csiNode *storagev1beta1.CSINode) error { return nil }
// UpdateNodeInfoSnapshot is a fake method for testing. // UpdateNodeInfoSnapshot is a fake method for testing.
func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *internalcache.NodeInfoSnapshot) error { func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error {
return nil return nil
} }

View File

@ -101,7 +101,7 @@ type Cache interface {
// UpdateNodeInfoSnapshot updates the passed infoSnapshot to the current contents of Cache. // UpdateNodeInfoSnapshot updates the passed infoSnapshot to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be) // The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node. // on this node.
UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error
// AddCSINode adds overall CSI-related information about node. // AddCSINode adds overall CSI-related information about node.
AddCSINode(csiNode *storagev1beta1.CSINode) error AddCSINode(csiNode *storagev1beta1.CSINode) error
@ -130,11 +130,3 @@ type Snapshot struct {
AssumedPods map[string]bool AssumedPods map[string]bool
Nodes map[string]*schedulernodeinfo.NodeInfo Nodes map[string]*schedulernodeinfo.NodeInfo
} }
// NodeInfoSnapshot is a snapshot of cache NodeInfo. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its
// operations in that cycle.
type NodeInfoSnapshot struct {
NodeInfoMap map[string]*schedulernodeinfo.NodeInfo
Generation int64
}

View File

@ -33,8 +33,8 @@ go_test(
deps = [ deps = [
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -30,8 +30,8 @@ import (
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
) )
@ -163,7 +163,7 @@ func (*fakeFramework) QueueSortFunc() framework.LessFunc {
} }
} }
func (*fakeFramework) NodeInfoSnapshot() *internalcache.NodeInfoSnapshot { func (*fakeFramework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot {
return nil return nil
} }

View File

@ -5,6 +5,7 @@ go_library(
srcs = [ srcs = [
"host_ports.go", "host_ports.go",
"node_info.go", "node_info.go",
"snapshot.go",
"util.go", "util.go",
], ],
importpath = "k8s.io/kubernetes/pkg/scheduler/nodeinfo", importpath = "k8s.io/kubernetes/pkg/scheduler/nodeinfo",

View File

@ -0,0 +1,32 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeinfo
// Snapshot is a snapshot of cache NodeInfo. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its
// operations in that cycle.
type Snapshot struct {
NodeInfoMap map[string]*NodeInfo
Generation int64
}
// NewSnapshot initializes a Snapshot struct and returns it.
func NewSnapshot() *Snapshot {
return &Snapshot{
NodeInfoMap: make(map[string]*NodeInfo),
}
}