Internal channels for scheduler

This commit is contained in:
Harsh Singh 2020-04-01 08:04:13 +05:30
parent 60df45fa55
commit 015d06bf41
10 changed files with 22 additions and 15 deletions

View File

@ -28,7 +28,6 @@ import (
"time" "time"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1" policy "k8s.io/api/policy/v1beta1"
@ -40,6 +39,7 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
@ -438,7 +438,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
return filtered, nil return filtered, nil
} }
errCh := util.NewErrorChannel() errCh := parallelize.NewErrorChannel()
var statusesLock sync.Mutex var statusesLock sync.Mutex
var filteredLen int32 var filteredLen int32
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)

View File

@ -13,6 +13,7 @@ go_library(
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/parallelize:go_default_library", "//pkg/scheduler/internal/parallelize:go_default_library",
"//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/types:go_default_library", "//pkg/scheduler/types:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -21,13 +21,14 @@ import (
"fmt" "fmt"
"sync" "sync"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog" "k8s.io/klog"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize" "k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedulertypes "k8s.io/kubernetes/pkg/scheduler/types" schedulertypes "k8s.io/kubernetes/pkg/scheduler/types"
schedutil "k8s.io/kubernetes/pkg/scheduler/util" schedutil "k8s.io/kubernetes/pkg/scheduler/util"
) )
@ -209,8 +210,8 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []*affinityTerm) bool {
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node: // getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
// (1) Whether it has PodAntiAffinity // (1) Whether it has PodAntiAffinity
// (2) Whether any AffinityTerm matches the incoming pod // (2) Whether any AffinityTerm matches the incoming pod
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulertypes.NodeInfo) (topologyToMatchedTermCount, error) { func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*nodeinfo.NodeInfo) (topologyToMatchedTermCount, error) {
errCh := schedutil.NewErrorChannel() errCh := parallelize.NewErrorChannel()
var lock sync.Mutex var lock sync.Mutex
topologyMap := make(topologyToMatchedTermCount) topologyMap := make(topologyToMatchedTermCount)

View File

@ -231,7 +231,7 @@ func (pl *InterPodAffinity) PreScore(
antiAffinityTerms: antiAffinityTerms, antiAffinityTerms: antiAffinityTerms,
} }
errCh := schedutil.NewErrorChannel() errCh := parallelize.NewErrorChannel()
ctx, cancel := context.WithCancel(pCtx) ctx, cancel := context.WithCancel(pCtx)
processNode := func(i int) { processNode := func(i int) {
nodeInfo := allNodes[i] nodeInfo := allNodes[i]

View File

@ -19,7 +19,6 @@ go_library(
"//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/types:go_default_library", "//pkg/scheduler/types:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -35,7 +35,6 @@ import (
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
schedulertypes "k8s.io/kubernetes/pkg/scheduler/types" schedulertypes "k8s.io/kubernetes/pkg/scheduler/types"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
) )
const ( const (
@ -510,7 +509,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes)) pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
} }
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
errCh := schedutil.NewErrorChannel() errCh := parallelize.NewErrorChannel()
// Run Score method for each node in parallel. // Run Score method for each node in parallel.
parallelize.Until(ctx, len(nodes), func(index int) { parallelize.Until(ctx, len(nodes), func(index int) {

View File

@ -1,8 +1,11 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["parallelism.go"], srcs = [
"error_channel.go",
"parallelism.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/parallelize", importpath = "k8s.io/kubernetes/pkg/scheduler/internal/parallelize",
visibility = ["//pkg/scheduler:__subpackages__"], visibility = ["//pkg/scheduler:__subpackages__"],
deps = ["//staging/src/k8s.io/client-go/util/workqueue:go_default_library"], deps = ["//staging/src/k8s.io/client-go/util/workqueue:go_default_library"],
@ -21,3 +24,9 @@ filegroup(
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
) )
go_test(
name = "go_default_test",
srcs = ["error_channel_test.go"],
embed = [":go_default_library"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package util package parallelize
import "context" import "context"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package util package parallelize
import ( import (
"context" "context"

View File

@ -9,7 +9,6 @@ load(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"error_channel_test.go",
"non_zero_test.go", "non_zero_test.go",
"topologies_test.go", "topologies_test.go",
"utils_test.go", "utils_test.go",
@ -31,7 +30,6 @@ go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"clock.go", "clock.go",
"error_channel.go",
"non_zero.go", "non_zero.go",
"topologies.go", "topologies.go",
"utils.go", "utils.go",