Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-08-09 03:57:41 +00:00
Merge pull request #118606 from sanposhiho/refactor-score
refactor: simplify RunScorePlugins for readability + performance
Commit 9740bc0e0a
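
The change, as visible in the hunks below: the old code ran Score in one parallel pass over nodes (iterating all plugins inside each goroutine and writing into a map keyed by plugin name), then ran NormalizeScore in a second parallel pass over plugins. The new code makes a single parallel pass over plugins: each goroutine scores every node into its own NodeScoreList, runs the plugin's score extension if it has one, and stores the result in a slice indexed by the plugin's position, so no map lookups or second pass are needed. Below is a minimal, self-contained sketch of that shape using plain goroutines and placeholder types instead of the scheduler framework; names such as `runScorePlugins`, `nodeScore`, and `plugin` are illustrative stand-ins, not the real API.

```go
package main

import (
	"fmt"
	"sync"
)

type nodeScore struct {
	Name  string
	Score int64
}

// plugin is a stand-in for framework.ScorePlugin; a nil normalize mirrors
// the ScoreExtensions() == nil case in the diff.
type plugin struct {
	name      string // no longer used as a result key; results are stored by index
	score     func(node string) int64
	normalize func(scores []nodeScore)
}

// runScorePlugins mirrors the refactored shape: one parallel pass over
// plugins, each worker scoring all nodes and normalizing in the same worker.
func runScorePlugins(plugins []plugin, nodes []string) [][]nodeScore {
	// A slice indexed by plugin position replaces the old map keyed by
	// plugin name, so every worker writes only to its own slot.
	pluginToNodeScores := make([][]nodeScore, len(plugins))

	var wg sync.WaitGroup
	for i := range plugins {
		wg.Add(1)
		go func(i int) { // stands in for f.Parallelizer().Until(ctx, len(plugins), ...)
			defer wg.Done()
			pl := plugins[i]
			nodeScores := make([]nodeScore, len(nodes))
			for idx, node := range nodes {
				nodeScores[idx] = nodeScore{Name: node, Score: pl.score(node)}
			}
			if pl.normalize != nil {
				// Normalization now happens right after scoring, in the same worker.
				pl.normalize(nodeScores)
			}
			pluginToNodeScores[i] = nodeScores
		}(i)
	}
	wg.Wait()
	return pluginToNodeScores
}

func main() {
	nodes := []string{"node-a", "node-b"}
	plugins := []plugin{{
		name:  "lengthScore",
		score: func(n string) int64 { return int64(len(n)) },
	}}
	fmt.Println(runScorePlugins(plugins, nodes))
}
```

The per-plugin result slot is what lets each worker write its scores without coordinating with the others; the real code additionally funnels failures through an error channel and a cancellable context, which this sketch omits.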
@@ -1016,19 +1016,17 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
     allNodePluginScores := make([]framework.NodePluginScores, len(nodes))
     numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len()
     plugins := make([]framework.ScorePlugin, 0, numPlugins)
-    pluginToNodeScores := make(map[string]framework.NodeScoreList, numPlugins)
+    pluginToNodeScores := make([]framework.NodeScoreList, numPlugins)
     for _, pl := range f.scorePlugins {
         if state.SkipScorePlugins.Has(pl.Name()) {
             continue
         }
         plugins = append(plugins, pl)
-        pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
     }
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
     errCh := parallelize.NewErrorChannel()

-    if len(plugins) > 0 {
     logger := klog.FromContext(ctx)
     logger = klog.LoggerWithName(logger, "Score")
     // TODO(knelasevero): Remove duplicated keys from log entry calls
@@ -1036,11 +1034,13 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
     // https://github.com/kubernetes/kubernetes/issues/111672
     logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
     // Run Score method for each node in parallel.
-    f.Parallelizer().Until(ctx, len(nodes), func(index int) {
-        nodeName := nodes[index].Name
-        logger := klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
-        for _, pl := range plugins {
+    f.Parallelizer().Until(ctx, len(plugins), func(i int) {
+        pl := plugins[i]
         logger := klog.LoggerWithName(logger, pl.Name())
+        nodeScores := make(framework.NodeScoreList, len(nodes))
+        for index, node := range nodes {
+            nodeName := node.Name
+            logger := klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
             ctx := klog.NewContext(ctx, logger)
             s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
             if !status.IsSuccess() {
@@ -1048,30 +1048,24 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
                 errCh.SendErrorWithCancel(err, cancel)
                 return
             }
-            pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
+            nodeScores[index] = framework.NodeScore{
                 Name:  nodeName,
                 Score: s,
             }
         }
-        }, metrics.Score)
-        if err := errCh.ReceiveError(); err != nil {
-            return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
-        }
-    }

-    // Run NormalizeScore method for each ScorePlugin in parallel.
-    f.Parallelizer().Until(ctx, len(plugins), func(index int) {
-        pl := plugins[index]
         if pl.ScoreExtensions() == nil {
+            pluginToNodeScores[i] = nodeScores
             return
         }
-        nodeScoreList := pluginToNodeScores[pl.Name()]
-        status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
+        status := f.runScoreExtension(ctx, pl, state, pod, nodeScores)
         if !status.IsSuccess() {
             err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
             errCh.SendErrorWithCancel(err, cancel)
             return
         }
+        pluginToNodeScores[i] = nodeScores
     }, metrics.Score)
     if err := errCh.ReceiveError(); err != nil {
         return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
@@ -1087,7 +1081,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy

         for i, pl := range plugins {
             weight := f.scorePluginWeight[pl.Name()]
-            nodeScoreList := pluginToNodeScores[pl.Name()]
+            nodeScoreList := pluginToNodeScores[i]
             score := nodeScoreList[index].Score

             if score > framework.MaxNodeScore || score < framework.MinNodeScore {
@@ -808,6 +808,11 @@ func TestPostFilterPlugin(t *testing.T) {
                     {Name: filterPluginName},
                 },
             },
+            PreScore: configv1.PluginSet{
+                Disabled: []configv1.Plugin{
+                    {Name: "*"},
+                },
+            },
             Score: configv1.PluginSet{
                 Enabled: []configv1.Plugin{
                     {Name: scorePluginName},
@@ -72,11 +72,16 @@ const (
 )

 // This file tests the scheduler priority functions.
-func initTestSchedulerForPriorityTest(t *testing.T, scorePluginName string) *testutils.TestContext {
-    cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
+func initTestSchedulerForPriorityTest(t *testing.T, preScorePluginName, scorePluginName string) *testutils.TestContext {
+    cc := configv1.KubeSchedulerConfiguration{
         Profiles: []configv1.KubeSchedulerProfile{{
             SchedulerName: pointer.String(v1.DefaultSchedulerName),
             Plugins: &configv1.Plugins{
+                PreScore: configv1.PluginSet{
+                    Disabled: []configv1.Plugin{
+                        {Name: "*"},
+                    },
+                },
                 Score: configv1.PluginSet{
                     Enabled: []configv1.Plugin{
                         {Name: scorePluginName, Weight: pointer.Int32(1)},
@@ -87,7 +92,11 @@ func initTestSchedulerForPriorityTest(t *testing.T, scorePluginName string) *tes
                     },
                 },
             },
         }},
-    })
+    }
+    if preScorePluginName != "" {
+        cc.Profiles[0].Plugins.PreScore.Enabled = append(cc.Profiles[0].Plugins.PreScore.Enabled, configv1.Plugin{Name: preScorePluginName})
+    }
+    cfg := configtesting.V1ToInternalWithDefaults(t, cc)
     testCtx := testutils.InitTestSchedulerWithOptions(
         t,
         testutils.InitTestAPIServer(t, strings.ToLower(scorePluginName), nil),
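
For callers, the new signature just adds a leading argument: tests that exercise a plugin with a PreScore extension pass the plugin name twice, and tests that need no PreScore plugin pass an empty string so every PreScore plugin stays disabled. A hypothetical pair of calls showing the two shapes (the real call-site updates follow in the hunks below):

```go
// With a PreScore plugin: enable it for both PreScore and Score.
testCtx := initTestSchedulerForPriorityTest(t, nodeaffinity.Name, nodeaffinity.Name)

// Without one: "" leaves PreScore fully disabled.
testCtx = initTestSchedulerForPriorityTest(t, "", imagelocality.Name)
```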
@@ -201,7 +210,7 @@ func TestNodeResourcesScoring(t *testing.T) {
 // TestNodeAffinityScoring verifies that scheduler's node affinity priority function
 // works correctly.
 func TestNodeAffinityScoring(t *testing.T) {
-    testCtx := initTestSchedulerForPriorityTest(t, nodeaffinity.Name)
+    testCtx := initTestSchedulerForPriorityTest(t, nodeaffinity.Name, nodeaffinity.Name)
     // Add a few nodes.
     _, err := createAndWaitForNodesInCache(testCtx, "testnode", st.MakeNode(), 4)
     if err != nil {
@@ -320,7 +329,7 @@ func TestPodAffinityScoring(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            testCtx := initTestSchedulerForPriorityTest(t, interpodaffinity.Name)
+            testCtx := initTestSchedulerForPriorityTest(t, interpodaffinity.Name, interpodaffinity.Name)
             // Add a few nodes.
             nodesInTopology, err := createAndWaitForNodesInCache(testCtx, "in-topology", st.MakeNode().Label(topologyKey, topologyValue), 5)
             if err != nil {
@@ -364,7 +373,7 @@ func TestPodAffinityScoring(t *testing.T) {
 // TestImageLocalityScoring verifies that the scheduler's image locality priority function
 // works correctly, i.e., the pod gets scheduled to the node where its container images are ready.
 func TestImageLocalityScoring(t *testing.T) {
-    testCtx := initTestSchedulerForPriorityTest(t, imagelocality.Name)
+    testCtx := initTestSchedulerForPriorityTest(t, "", imagelocality.Name)

     // Create a node with the large image.
     // We use a fake large image as the test image used by the pod, which has
@@ -596,7 +605,7 @@ func TestPodTopologySpreadScoring(t *testing.T) {
             defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeInclusionPolicyInPodTopologySpread, tt.enableNodeInclusionPolicy)()
             defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MatchLabelKeysInPodTopologySpread, tt.enableMatchLabelKeys)()

-            testCtx := initTestSchedulerForPriorityTest(t, podtopologyspread.Name)
+            testCtx := initTestSchedulerForPriorityTest(t, podtopologyspread.Name, podtopologyspread.Name)
             cs := testCtx.ClientSet
             ns := testCtx.NS.Name

@@ -646,7 +655,7 @@ func TestPodTopologySpreadScoring(t *testing.T) {
 // with the system default spreading spreads Pods belonging to a Service.
 // The setup has 300 nodes over 3 zones.
 func TestDefaultPodTopologySpreadScoring(t *testing.T) {
-    testCtx := initTestSchedulerForPriorityTest(t, podtopologyspread.Name)
+    testCtx := initTestSchedulerForPriorityTest(t, podtopologyspread.Name, podtopologyspread.Name)
     cs := testCtx.ClientSet
     ns := testCtx.NS.Name
