feat(scheduler): use context in the scheduler package

+ Use context instead of stopCh
+ Add context to the scheduling framework interface
This commit is contained in:
draveness
2019-08-28 19:12:02 +08:00
parent f7091992c0
commit 47a6c5b693
42 changed files with 426 additions and 352 deletions

View File

@@ -17,6 +17,7 @@ limitations under the License.
package scheduler
import (
"context"
"fmt"
"testing"
"time"
@@ -148,7 +149,7 @@ func (sp *ScorePlugin) reset() {
}
// Score returns the score of scheduling a pod on a specific node.
func (sp *ScorePlugin) Score(state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
func (sp *ScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
sp.numScoreCalled++
if sp.failScore {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name))
@@ -179,13 +180,13 @@ func (sp *ScoreWithNormalizePlugin) reset() {
}
// Score returns the score of scheduling a pod on a specific node.
func (sp *ScoreWithNormalizePlugin) Score(state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
func (sp *ScoreWithNormalizePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
sp.numScoreCalled++
score := int64(10)
return score, nil
}
func (sp *ScoreWithNormalizePlugin) NormalizeScore(state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
func (sp *ScoreWithNormalizePlugin) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
sp.numNormalizeScoreCalled++
return nil
}
@@ -207,7 +208,7 @@ func (fp *FilterPlugin) reset() {
// Filter is a test function that returns an error or nil, depending on the
// value of "failFilter".
func (fp *FilterPlugin) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
fp.numFilterCalled++
if fp.failFilter {
@@ -224,7 +225,7 @@ func (rp *ReservePlugin) Name() string {
// Reserve is a test function that returns an error or nil, depending on the
// value of "failReserve".
func (rp *ReservePlugin) Reserve(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
func (rp *ReservePlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
rp.numReserveCalled++
if rp.failReserve {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
@@ -243,7 +244,7 @@ func (*PostFilterPlugin) Name() string {
}
// PostFilter is a test function.
func (pfp *PostFilterPlugin) PostFilter(_ *framework.CycleState, pod *v1.Pod, _ []*v1.Node, _ framework.NodeToStatusMap) *framework.Status {
func (pfp *PostFilterPlugin) PostFilter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, _ []*v1.Node, _ framework.NodeToStatusMap) *framework.Status {
pfp.numPostFilterCalled++
if pfp.failPostFilter {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
@@ -264,7 +265,7 @@ func (pp *PreBindPlugin) Name() string {
}
// PreBind is a test function that returns (true, nil) or errors for testing.
func (pp *PreBindPlugin) PreBind(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
pp.numPreBindCalled++
if pp.failPreBind {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
@@ -288,7 +289,7 @@ func (bp *BindPlugin) Name() string {
return bp.PluginName
}
func (bp *BindPlugin) Bind(state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
func (bp *BindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
bp.numBindCalled++
if bp.pluginInvokeEventChan != nil {
bp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled}
@@ -318,7 +319,7 @@ func (pp *PostBindPlugin) Name() string {
}
// PostBind is a test function, which counts the number of times called.
func (pp *PostBindPlugin) PostBind(state *framework.CycleState, pod *v1.Pod, nodeName string) {
func (pp *PostBindPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
pp.numPostBindCalled++
if pp.pluginInvokeEventChan != nil {
pp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostBindCalled}
@@ -341,7 +342,7 @@ func (pp *PreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions {
}
// PreFilter is a test function that returns (true, nil) or errors for testing.
func (pp *PreFilterPlugin) PreFilter(state *framework.CycleState, pod *v1.Pod) *framework.Status {
func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
pp.numPreFilterCalled++
if pp.failPreFilter {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
@@ -366,7 +367,7 @@ func (up *UnreservePlugin) Name() string {
// Unreserve is a test function that returns an error or nil, depending on the
// value of "failUnreserve".
func (up *UnreservePlugin) Unreserve(state *framework.CycleState, pod *v1.Pod, nodeName string) {
func (up *UnreservePlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
up.numUnreserveCalled++
if up.pluginInvokeEventChan != nil {
up.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: up.Name(), val: up.numUnreserveCalled}
@@ -384,7 +385,7 @@ func (pp *PermitPlugin) Name() string {
}
// Permit implements the permit test plugin.
func (pp *PermitPlugin) Permit(state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
pp.numPermitCalled++
if pp.failPermit {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0

View File

@@ -19,6 +19,7 @@ limitations under the License.
package scheduler
import (
"context"
"fmt"
"testing"
"time"
@@ -84,7 +85,7 @@ func (fp *tokenFilter) Name() string {
return tokenFilterName
}
func (fp *tokenFilter) Filter(state *framework.CycleState, pod *v1.Pod,
func (fp *tokenFilter) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
if fp.Tokens > 0 {
fp.Tokens--
@@ -97,17 +98,17 @@ func (fp *tokenFilter) Filter(state *framework.CycleState, pod *v1.Pod,
return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name))
}
func (fp *tokenFilter) PreFilter(state *framework.CycleState, pod *v1.Pod) *framework.Status {
func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
return nil
}
func (fp *tokenFilter) AddPod(state *framework.CycleState, podToSchedule *v1.Pod,
func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
fp.Tokens--
return nil
}
func (fp *tokenFilter) RemovePod(state *framework.CycleState, podToSchedule *v1.Pod,
func (fp *tokenFilter) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
fp.Tokens++
return nil
@@ -526,8 +527,8 @@ func TestPodPriorityResolution(t *testing.T) {
externalInformers := informers.NewSharedInformerFactory(externalClientset, time.Second)
admission.SetExternalKubeClientSet(externalClientset)
admission.SetExternalKubeInformerFactory(externalInformers)
externalInformers.Start(context.stopCh)
externalInformers.WaitForCacheSync(context.stopCh)
externalInformers.Start(context.ctx.Done())
externalInformers.WaitForCacheSync(context.ctx.Done())
tests := []struct {
Name string

View File

@@ -23,7 +23,7 @@ import (
"testing"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -116,13 +116,13 @@ func TestTaintNodeByCondition(t *testing.T) {
t.Errorf("Failed to create node controller: %v", err)
return
}
go nc.Run(context.stopCh)
go nc.Run(context.ctx.Done())
// Waiting for all controller sync.
externalInformers.Start(context.stopCh)
externalInformers.WaitForCacheSync(context.stopCh)
informers.Start(context.stopCh)
informers.WaitForCacheSync(context.stopCh)
externalInformers.Start(context.ctx.Done())
externalInformers.WaitForCacheSync(context.ctx.Done())
informers.Start(context.ctx.Done())
informers.WaitForCacheSync(context.ctx.Done())
// -------------------------------------------
// Test TaintNodeByCondition feature.
@@ -696,13 +696,13 @@ func TestTaintBasedEvictions(t *testing.T) {
return
}
go nc.Run(context.stopCh)
go nc.Run(context.ctx.Done())
// Waiting for all controller sync.
externalInformers.Start(context.stopCh)
externalInformers.WaitForCacheSync(context.stopCh)
informers.Start(context.stopCh)
informers.WaitForCacheSync(context.stopCh)
externalInformers.Start(context.ctx.Done())
externalInformers.WaitForCacheSync(context.ctx.Done())
informers.Start(context.ctx.Done())
informers.WaitForCacheSync(context.ctx.Done())
nodeRes := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4000m"),

View File

@@ -17,6 +17,7 @@ limitations under the License.
package scheduler
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
@@ -67,7 +68,8 @@ type testContext struct {
clientSet *clientset.Clientset
informerFactory informers.SharedInformerFactory
scheduler *scheduler.Scheduler
stopCh chan struct{}
ctx context.Context
cancelFn context.CancelFunc
}
func createAlgorithmSourceFromPolicy(policy *schedulerapi.Policy, clientSet clientset.Interface) schedulerconfig.SchedulerAlgorithmSource {
@@ -93,8 +95,10 @@ func createAlgorithmSourceFromPolicy(policy *schedulerapi.Policy, clientSet clie
// initTestMasterAndScheduler initializes a test environment and creates a master with default
// configuration.
func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface) *testContext {
ctx, cancelFunc := context.WithCancel(context.Background())
context := testContext{
stopCh: make(chan struct{}),
ctx: ctx,
cancelFn: cancelFunc,
}
// 1. Create master
@@ -187,7 +191,7 @@ func initTestSchedulerWithOptions(
podInformer,
recorder,
algorithmSrc,
context.stopCh,
context.ctx.Done(),
opts...,
)
@@ -207,7 +211,7 @@ func initTestSchedulerWithOptions(
context.informerFactory.Start(context.scheduler.StopEverything)
context.informerFactory.WaitForCacheSync(context.scheduler.StopEverything)
context.scheduler.Run()
go context.scheduler.Run(context.ctx)
return context
}
@@ -261,7 +265,7 @@ func initTestDisablePreemption(t *testing.T, nsPrefix string) *testContext {
// at the end of a test.
func cleanupTest(t *testing.T, context *testContext) {
// Kill the scheduler.
close(context.stopCh)
context.cancelFn()
// Cleanup nodes.
context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
framework.DeleteTestingNamespace(context.ns, context.httpServer, t)