From ffef11f7686378a23e982f9c90629e183780bd36 Mon Sep 17 00:00:00 2001 From: draveness Date: Fri, 23 Aug 2019 22:32:16 +0800 Subject: [PATCH] feat(scheduler): move node info snapshot out of internal package --- pkg/scheduler/core/generic_scheduler.go | 2 +- pkg/scheduler/framework/v1alpha1/BUILD | 2 +- pkg/scheduler/framework/v1alpha1/framework.go | 8 ++--- pkg/scheduler/framework/v1alpha1/interface.go | 4 +-- pkg/scheduler/internal/cache/cache.go | 9 +----- pkg/scheduler/internal/cache/cache_test.go | 12 +++---- pkg/scheduler/internal/cache/fake/BUILD | 1 + .../internal/cache/fake/fake_cache.go | 3 +- pkg/scheduler/internal/cache/interface.go | 10 +----- pkg/scheduler/internal/queue/BUILD | 2 +- .../internal/queue/scheduling_queue_test.go | 4 +-- pkg/scheduler/nodeinfo/BUILD | 1 + pkg/scheduler/nodeinfo/snapshot.go | 32 +++++++++++++++++++ 13 files changed, 55 insertions(+), 35 deletions(-) create mode 100644 pkg/scheduler/nodeinfo/snapshot.go diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 1fe3b913d76..39edb142d33 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -167,7 +167,7 @@ type genericScheduler struct { framework framework.Framework extenders []algorithm.SchedulerExtender alwaysCheckAllPredicates bool - nodeInfoSnapshot *internalcache.NodeInfoSnapshot + nodeInfoSnapshot *schedulernodeinfo.Snapshot volumeBinder *volumebinder.VolumeBinder pvcLister corelisters.PersistentVolumeClaimLister pdbLister algorithm.PDBLister diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 02b272c235d..b942588d778 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -14,7 +14,7 @@ go_library( deps = [ "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/apis/config:go_default_library", - "//pkg/scheduler/internal/cache:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 893567b58a2..46e0747cfc1 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -27,7 +27,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog" "k8s.io/kubernetes/pkg/scheduler/apis/config" - "k8s.io/kubernetes/pkg/scheduler/internal/cache" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -35,7 +35,7 @@ import ( // plugins. type framework struct { registry Registry - nodeInfoSnapshot *cache.NodeInfoSnapshot + nodeInfoSnapshot *schedulernodeinfo.Snapshot waitingPods *waitingPodsMap pluginNameToWeightMap map[string]int queueSortPlugins []QueueSortPlugin @@ -63,7 +63,7 @@ var _ = Framework(&framework{}) func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig) (Framework, error) { f := &framework{ registry: r, - nodeInfoSnapshot: cache.NewNodeInfoSnapshot(), + nodeInfoSnapshot: schedulernodeinfo.NewSnapshot(), pluginNameToWeightMap: make(map[string]int), waitingPods: newWaitingPodsMap(), } @@ -566,7 +566,7 @@ func (f *framework) RunPermitPlugins( // is taken at the beginning of a scheduling cycle and remains unchanged until a // pod finishes "Reserve". There is no guarantee that the information remains // unchanged after "Reserve". -func (f *framework) NodeInfoSnapshot() *cache.NodeInfoSnapshot { +func (f *framework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot { return f.nodeInfoSnapshot } diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index ba5b8a77895..ae5d4fe210e 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -25,7 +25,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" - internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) // Code is the Status code/type which is returned from plugins. @@ -357,7 +357,7 @@ type FrameworkHandle interface { // cycle(permit/pre-bind/bind/post-bind/un-reserve plugin) should not use it, // otherwise a concurrent read/write error might occur, they should use scheduler // cache instead. - NodeInfoSnapshot() *internalcache.NodeInfoSnapshot + NodeInfoSnapshot() *schedulernodeinfo.Snapshot // IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map. IterateOverWaitingPods(callback func(WaitingPod)) diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index 1d79227387e..9a25cb4daf8 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -124,13 +124,6 @@ func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem { } } -// NewNodeInfoSnapshot initializes a NodeInfoSnapshot struct and returns it. -func NewNodeInfoSnapshot() *NodeInfoSnapshot { - return &NodeInfoSnapshot{ - NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo), - } -} - // moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly // linked list. The head is the most recently updated NodeInfo. // We assume cache lock is already acquired. @@ -210,7 +203,7 @@ func (cache *schedulerCache) Snapshot() *Snapshot { // beginning of every scheduling cycle. // This function tracks generation number of NodeInfo and updates only the // entries of an existing snapshot that have changed after the snapshot was taken. -func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error { +func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error { cache.mu.Lock() defer cache.mu.Unlock() balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index 95b9de7bc33..f243efb3cfd 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -1077,7 +1077,7 @@ func TestNodeOperators(t *testing.T) { } // Case 2: dump cached nodes successfully. - cachedNodes := NewNodeInfoSnapshot() + cachedNodes := schedulernodeinfo.NewSnapshot() cache.UpdateNodeInfoSnapshot(cachedNodes) newNode, found := cachedNodes.NodeInfoMap[node.Name] if !found || len(cachedNodes.NodeInfoMap) != 1 { @@ -1180,7 +1180,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) { } var cache *schedulerCache - var snapshot *NodeInfoSnapshot + var snapshot *schedulernodeinfo.Snapshot type operation = func() addNode := func(i int) operation { @@ -1333,7 +1333,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { cache = newSchedulerCache(time.Second, time.Second, nil) - snapshot = NewNodeInfoSnapshot() + snapshot = schedulernodeinfo.NewSnapshot() for _, op := range test.operations { op() @@ -1364,7 +1364,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) { } } -func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *NodeInfoSnapshot) error { +func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *schedulernodeinfo.Snapshot) error { if len(snapshot.NodeInfoMap) != len(cache.nodes) { return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoMap)) } @@ -1390,7 +1390,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) { cache := setupCacheOf1kNodes30kPods(b) b.ResetTimer() for n := 0; n < b.N; n++ { - cachedNodes := NewNodeInfoSnapshot() + cachedNodes := schedulernodeinfo.NewSnapshot() cache.UpdateNodeInfoSnapshot(cachedNodes) } } diff --git a/pkg/scheduler/internal/cache/fake/BUILD b/pkg/scheduler/internal/cache/fake/BUILD index 1193d9cc5d7..8659788fb85 100644 --- a/pkg/scheduler/internal/cache/fake/BUILD +++ b/pkg/scheduler/internal/cache/fake/BUILD @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", diff --git a/pkg/scheduler/internal/cache/fake/fake_cache.go b/pkg/scheduler/internal/cache/fake/fake_cache.go index 75ca7f37cf2..5350a2ec776 100644 --- a/pkg/scheduler/internal/cache/fake/fake_cache.go +++ b/pkg/scheduler/internal/cache/fake/fake_cache.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/scheduler/algorithm" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) // Cache is used for testing @@ -85,7 +86,7 @@ func (c *Cache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) er func (c *Cache) RemoveCSINode(csiNode *storagev1beta1.CSINode) error { return nil } // UpdateNodeInfoSnapshot is a fake method for testing. -func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *internalcache.NodeInfoSnapshot) error { +func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error { return nil } diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go index 2371abfcaea..00f711d4869 100644 --- a/pkg/scheduler/internal/cache/interface.go +++ b/pkg/scheduler/internal/cache/interface.go @@ -101,7 +101,7 @@ type Cache interface { // UpdateNodeInfoSnapshot updates the passed infoSnapshot to the current contents of Cache. // The node info contains aggregated information of pods scheduled (including assumed to be) // on this node. - UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error + UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error // AddCSINode adds overall CSI-related information about node. AddCSINode(csiNode *storagev1beta1.CSINode) error @@ -130,11 +130,3 @@ type Snapshot struct { AssumedPods map[string]bool Nodes map[string]*schedulernodeinfo.NodeInfo } - -// NodeInfoSnapshot is a snapshot of cache NodeInfo. The scheduler takes a -// snapshot at the beginning of each scheduling cycle and uses it for its -// operations in that cycle. -type NodeInfoSnapshot struct { - NodeInfoMap map[string]*schedulernodeinfo.NodeInfo - Generation int64 -} diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD index c9c52749139..915e557e4ad 100644 --- a/pkg/scheduler/internal/queue/BUILD +++ b/pkg/scheduler/internal/queue/BUILD @@ -33,8 +33,8 @@ go_test( deps = [ "//pkg/api/v1/pod:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", - "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/metrics:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 475dffe4bca..abc75a39474 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -30,8 +30,8 @@ import ( "k8s.io/apimachinery/pkg/util/clock" podutil "k8s.io/kubernetes/pkg/api/v1/pod" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/metrics" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -163,7 +163,7 @@ func (*fakeFramework) QueueSortFunc() framework.LessFunc { } } -func (*fakeFramework) NodeInfoSnapshot() *internalcache.NodeInfoSnapshot { +func (*fakeFramework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot { return nil } diff --git a/pkg/scheduler/nodeinfo/BUILD b/pkg/scheduler/nodeinfo/BUILD index 70c156041f7..ec7644e790a 100644 --- a/pkg/scheduler/nodeinfo/BUILD +++ b/pkg/scheduler/nodeinfo/BUILD @@ -5,6 +5,7 @@ go_library( srcs = [ "host_ports.go", "node_info.go", + "snapshot.go", "util.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler/nodeinfo", diff --git a/pkg/scheduler/nodeinfo/snapshot.go b/pkg/scheduler/nodeinfo/snapshot.go new file mode 100644 index 00000000000..d60c6fddb06 --- /dev/null +++ b/pkg/scheduler/nodeinfo/snapshot.go @@ -0,0 +1,32 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeinfo + +// Snapshot is a snapshot of cache NodeInfo. The scheduler takes a +// snapshot at the beginning of each scheduling cycle and uses it for its +// operations in that cycle. +type Snapshot struct { + NodeInfoMap map[string]*NodeInfo + Generation int64 +} + +// NewSnapshot initializes a Snapshot struct and returns it. +func NewSnapshot() *Snapshot { + return &Snapshot{ + NodeInfoMap: make(map[string]*NodeInfo), + } +}