From 015d06bf41001f42365ba4f6620f922dfead0f74 Mon Sep 17 00:00:00 2001 From: Harsh Singh Date: Wed, 1 Apr 2020 08:04:13 +0530 Subject: [PATCH] Internal channels for scheduler --- pkg/scheduler/core/generic_scheduler.go | 4 ++-- .../framework/plugins/interpodaffinity/BUILD | 1 + .../framework/plugins/interpodaffinity/filtering.go | 7 ++++--- .../framework/plugins/interpodaffinity/scoring.go | 2 +- pkg/scheduler/framework/v1alpha1/BUILD | 1 - pkg/scheduler/framework/v1alpha1/framework.go | 3 +-- pkg/scheduler/internal/parallelize/BUILD | 13 +++++++++++-- .../{util => internal/parallelize}/error_channel.go | 2 +- .../parallelize}/error_channel_test.go | 2 +- pkg/scheduler/util/BUILD | 2 -- 10 files changed, 22 insertions(+), 15 deletions(-) rename pkg/scheduler/{util => internal/parallelize}/error_channel.go (98%) rename pkg/scheduler/{util => internal/parallelize}/error_channel_test.go (98%) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 5a52aada6f6..527af3dc277 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -28,7 +28,6 @@ import ( "time" "k8s.io/klog" - "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" @@ -40,6 +39,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" + "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/metrics" @@ -438,7 +438,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p return filtered, nil } - errCh := util.NewErrorChannel() + errCh := parallelize.NewErrorChannel() var statusesLock sync.Mutex var filteredLen int32 ctx, cancel := context.WithCancel(ctx) diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/BUILD b/pkg/scheduler/framework/plugins/interpodaffinity/BUILD index e508192bde0..8bea46d3872 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/BUILD +++ b/pkg/scheduler/framework/plugins/interpodaffinity/BUILD @@ -13,6 +13,7 @@ go_library( "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/parallelize:go_default_library", "//pkg/scheduler/listers:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/types:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go index 95dea48d563..c71dde56287 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go @@ -21,13 +21,14 @@ import ( "fmt" "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" + "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulertypes "k8s.io/kubernetes/pkg/scheduler/types" schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -209,8 +210,8 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []*affinityTerm) bool { // getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node: // (1) Whether it has PodAntiAffinity // (2) Whether any AffinityTerm matches the incoming pod -func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulertypes.NodeInfo) (topologyToMatchedTermCount, error) { - errCh := schedutil.NewErrorChannel() +func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*nodeinfo.NodeInfo) (topologyToMatchedTermCount, error) { + errCh := parallelize.NewErrorChannel() var lock sync.Mutex topologyMap := make(topologyToMatchedTermCount) diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go index 9467448e92c..a211a18ccbb 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go @@ -231,7 +231,7 @@ func (pl *InterPodAffinity) PreScore( antiAffinityTerms: antiAffinityTerms, } - errCh := schedutil.NewErrorChannel() + errCh := parallelize.NewErrorChannel() ctx, cancel := context.WithCancel(pCtx) processNode := func(i int) { nodeInfo := allNodes[i] diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index a2e11b7f6f2..515b298ca62 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -19,7 +19,6 @@ go_library( "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/types:go_default_library", - "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index e2a1c531720..4a85f83d668 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -35,7 +35,6 @@ import ( schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/metrics" schedulertypes "k8s.io/kubernetes/pkg/scheduler/types" - schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) const ( @@ -510,7 +509,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes)) } ctx, cancel := context.WithCancel(ctx) - errCh := schedutil.NewErrorChannel() + errCh := parallelize.NewErrorChannel() // Run Score method for each node in parallel. parallelize.Until(ctx, len(nodes), func(index int) { diff --git a/pkg/scheduler/internal/parallelize/BUILD b/pkg/scheduler/internal/parallelize/BUILD index dcc3d6473fe..d78eb1121ed 100644 --- a/pkg/scheduler/internal/parallelize/BUILD +++ b/pkg/scheduler/internal/parallelize/BUILD @@ -1,8 +1,11 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["parallelism.go"], + srcs = [ + "error_channel.go", + "parallelism.go", + ], importpath = "k8s.io/kubernetes/pkg/scheduler/internal/parallelize", visibility = ["//pkg/scheduler:__subpackages__"], deps = ["//staging/src/k8s.io/client-go/util/workqueue:go_default_library"], @@ -21,3 +24,9 @@ filegroup( tags = ["automanaged"], visibility = ["//visibility:public"], ) + +go_test( + name = "go_default_test", + srcs = ["error_channel_test.go"], + embed = [":go_default_library"], +) diff --git a/pkg/scheduler/util/error_channel.go b/pkg/scheduler/internal/parallelize/error_channel.go similarity index 98% rename from pkg/scheduler/util/error_channel.go rename to pkg/scheduler/internal/parallelize/error_channel.go index ef300a79d90..2eff825bae8 100644 --- a/pkg/scheduler/util/error_channel.go +++ b/pkg/scheduler/internal/parallelize/error_channel.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package parallelize import "context" diff --git a/pkg/scheduler/util/error_channel_test.go b/pkg/scheduler/internal/parallelize/error_channel_test.go similarity index 98% rename from pkg/scheduler/util/error_channel_test.go rename to pkg/scheduler/internal/parallelize/error_channel_test.go index 7a1ba0e3c1b..fc756f38b62 100644 --- a/pkg/scheduler/util/error_channel_test.go +++ b/pkg/scheduler/internal/parallelize/error_channel_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package parallelize import ( "context" diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index 857710883e1..4f9b9b52bad 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -9,7 +9,6 @@ load( go_test( name = "go_default_test", srcs = [ - "error_channel_test.go", "non_zero_test.go", "topologies_test.go", "utils_test.go", @@ -31,7 +30,6 @@ go_library( name = "go_default_library", srcs = [ "clock.go", - "error_channel.go", "non_zero.go", "topologies.go", "utils.go",