From 47a6c5b69357549f5a01540521d7c9a86a3b4fbb Mon Sep 17 00:00:00 2001 From: draveness Date: Wed, 28 Aug 2019 19:12:02 +0800 Subject: [PATCH] feat(scheduler): use context in the scheduler package + Use context instead of stopCh + Add context to the scheduling framework interface --- cmd/kube-scheduler/app/server.go | 47 ++++----- cmd/kube-scheduler/app/testing/testserver.go | 11 ++- pkg/scheduler/core/extender_test.go | 3 +- pkg/scheduler/core/generic_scheduler.go | 47 +++++---- pkg/scheduler/core/generic_scheduler_test.go | 20 ++-- pkg/scheduler/factory_test.go | 8 +- .../plugins/examples/multipoint/multipoint.go | 6 +- .../plugins/examples/prebind/prebind.go | 3 +- .../plugins/examples/stateful/stateful.go | 5 +- .../plugins/imagelocality/image_locality.go | 3 +- .../imagelocality/image_locality_test.go | 3 +- .../plugins/nodeaffinity/node_affinity.go | 6 +- .../nodeaffinity/node_affinity_test.go | 5 +- .../framework/plugins/nodename/node_name.go | 6 +- .../plugins/nodename/node_name_test.go | 5 +- .../framework/plugins/nodeports/node_ports.go | 6 +- .../plugins/nodeports/node_ports_test.go | 5 +- .../node_prefer_avoid_pods.go | 5 +- .../node_prefer_avoid_pods_test.go | 3 +- .../plugins/noderesources/node_resources.go | 5 +- .../noderesources/node_resources_test.go | 9 +- .../tainttoleration/taint_toleration.go | 7 +- .../tainttoleration/taint_toleration_test.go | 7 +- .../plugins/volumebinding/volume_binding.go | 4 +- .../volumerestrictions/volume_restrictions.go | 6 +- .../volume_restrictions_test.go | 11 ++- .../plugins/volumezone/volume_zone.go | 6 +- .../plugins/volumezone/volume_zone_test.go | 9 +- pkg/scheduler/framework/v1alpha1/framework.go | 73 ++++++++------ .../framework/v1alpha1/framework_test.go | 96 ++++++++++--------- pkg/scheduler/framework/v1alpha1/interface.go | 51 +++++----- .../internal/queue/scheduling_queue_test.go | 27 +++--- pkg/scheduler/scheduler.go | 41 ++++---- pkg/scheduler/scheduler_test.go | 21 ++-- test/integration/daemonset/daemonset_test.go | 94 +++++++++--------- test/integration/scheduler/framework_test.go | 25 ++--- test/integration/scheduler/preemption_test.go | 13 +-- test/integration/scheduler/taint_test.go | 22 ++--- test/integration/scheduler/util.go | 14 ++- test/integration/util/util.go | 14 +-- test/integration/volumescheduling/util.go | 18 ++-- .../volumescheduling/volume_binding_test.go | 8 +- 42 files changed, 426 insertions(+), 352 deletions(-) diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 53aa6bb118d..90d067dd0b1 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -136,7 +136,6 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist os.Exit(1) } - stopCh := make(chan struct{}) // Get the completed config cc := c.Complete() @@ -151,11 +150,14 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist return fmt.Errorf("unable to register configz: %s", err) } - return Run(cc, stopCh, registryOptions...) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + return Run(ctx, cc, registryOptions...) } -// Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed. -func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, registryOptions ...Option) error { +// Run executes the scheduler based on the given configuration. It only returns on error or when context is done. +func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, registryOptions ...Option) error { // To help debugging, immediately log version klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get()) @@ -172,7 +174,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis cc.PodInformer, cc.Recorder, cc.ComponentConfig.AlgorithmSource, - stopCh, + ctx.Done(), scheduler.WithName(cc.ComponentConfig.SchedulerName), scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight), scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption), @@ -190,7 +192,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis // Prepare the event broadcaster. if cc.Broadcaster != nil && cc.EventClient != nil { - cc.Broadcaster.StartRecordingToSink(stopCh) + cc.Broadcaster.StartRecordingToSink(ctx.Done()) } if cc.LeaderElectionBroadcaster != nil && cc.CoreEventClient != nil { cc.LeaderElectionBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")}) @@ -205,53 +207,36 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis if cc.InsecureServing != nil { separateMetrics := cc.InsecureMetricsServing != nil handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil) - if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil { + if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil { return fmt.Errorf("failed to start healthz server: %v", err) } } if cc.InsecureMetricsServing != nil { handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil) - if err := cc.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil { + if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil { return fmt.Errorf("failed to start metrics server: %v", err) } } if cc.SecureServing != nil { handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) // TODO: handle stoppedCh returned by c.SecureServing.Serve - if _, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil { + if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil { // fail early for secure handlers, removing the old error loop from above return fmt.Errorf("failed to start secure server: %v", err) } } // Start all informers. - go cc.PodInformer.Informer().Run(stopCh) - cc.InformerFactory.Start(stopCh) + go cc.PodInformer.Informer().Run(ctx.Done()) + cc.InformerFactory.Start(ctx.Done()) // Wait for all caches to sync before scheduling. - cc.InformerFactory.WaitForCacheSync(stopCh) - - // Prepare a reusable runCommand function. - run := func(ctx context.Context) { - sched.Run() - <-ctx.Done() - } - - ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here - defer cancel() - - go func() { - select { - case <-stopCh: - cancel() - case <-ctx.Done(): - } - }() + cc.InformerFactory.WaitForCacheSync(ctx.Done()) // If leader election is enabled, runCommand via LeaderElector until done and exit. if cc.LeaderElection != nil { cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ - OnStartedLeading: run, + OnStartedLeading: sched.Run, OnStoppedLeading: func() { klog.Fatalf("leaderelection lost") }, @@ -267,7 +252,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis } // Leader election is disabled, so runCommand inline until done. - run(ctx) + sched.Run(ctx) return fmt.Errorf("finished without leader elect") } diff --git a/cmd/kube-scheduler/app/testing/testserver.go b/cmd/kube-scheduler/app/testing/testserver.go index 9ce7ec3dbe5..63c24003e5d 100644 --- a/cmd/kube-scheduler/app/testing/testserver.go +++ b/cmd/kube-scheduler/app/testing/testserver.go @@ -17,6 +17,7 @@ limitations under the License. package testing import ( + "context" "fmt" "io/ioutil" "net" @@ -62,9 +63,9 @@ type Logger interface { // files that because Golang testing's call to os.Exit will not give a stop channel go routine // enough time to remove temporary files. func StartTestServer(t Logger, customFlags []string) (result TestServer, err error) { - stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) tearDown := func() { - close(stopCh) + cancel() if len(result.TmpDir) != 0 { os.RemoveAll(result.TmpDir) } @@ -119,11 +120,11 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err } errCh := make(chan error) - go func(stopCh <-chan struct{}) { - if err := app.Run(config.Complete(), stopCh); err != nil { + go func(ctx context.Context) { + if err := app.Run(ctx, config.Complete()); err != nil { errCh <- err } - }(stopCh) + }(ctx) t.Logf("Waiting for /healthz to be ok...") client, err := kubernetes.NewForConfig(config.LoopbackClientConfig) diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 8baf76c808b..66cf8ebc6a6 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -17,6 +17,7 @@ limitations under the License. package core import ( + "context" "fmt" "reflect" "sort" @@ -558,7 +559,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { schedulerapi.DefaultPercentageOfNodesToScore, false) podIgnored := &v1.Pod{} - result, err := scheduler.Schedule(framework.NewCycleState(), podIgnored) + result, err := scheduler.Schedule(context.Background(), framework.NewCycleState(), podIgnored) if test.expectsErr { if err == nil { t.Errorf("Unexpected non-error, result %+v", result) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 9037c782b1b..83f7e43ef13 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -114,12 +114,12 @@ func (f *FitError) Error() string { // onto machines. // TODO: Rename this type. type ScheduleAlgorithm interface { - Schedule(*framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) + Schedule(context.Context, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) // Preempt receives scheduling errors for a pod and tries to create room for // the pod by preempting lower priority pods if possible. // It returns the node where preemption happened, a list of preempted pods, a // list of pods whose nominated node name should be removed, and error if any. - Preempt(*framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error) + Preempt(context.Context, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error) // Predicates() returns a pointer to a map of predicate functions. This is // exposed for testing. Predicates() map[string]predicates.FitPredicate @@ -171,7 +171,7 @@ func (g *genericScheduler) snapshot() error { // Schedule tries to schedule the given pod to one of the nodes in the node list. // If it succeeds, it will return the name of the node. // If it fails, it will return a FitError error with reasons. -func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { +func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name}) defer trace.LogIfLong(100 * time.Millisecond) @@ -181,7 +181,7 @@ func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (r trace.Step("Basic checks done") // Run "prefilter" plugins. - preFilterStatus := g.framework.RunPreFilterPlugins(state, pod) + preFilterStatus := g.framework.RunPreFilterPlugins(ctx, state, pod) if !preFilterStatus.IsSuccess() { return result, preFilterStatus.AsError() } @@ -198,14 +198,14 @@ func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (r trace.Step("Snapshoting scheduler cache and node infos done") startPredicateEvalTime := time.Now() - filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(state, pod) + filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod) if err != nil { return result, err } trace.Step("Computing predicates done") // Run "postfilter" plugins. - postfilterStatus := g.framework.RunPostFilterPlugins(state, pod, filteredNodes, filteredNodesStatuses) + postfilterStatus := g.framework.RunPostFilterPlugins(ctx, state, pod, filteredNodes, filteredNodesStatuses) if !postfilterStatus.IsSuccess() { return result, postfilterStatus.AsError() } @@ -237,7 +237,7 @@ func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (r } metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap) - priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, state) + priorityList, err := PrioritizeNodes(ctx, pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, state) if err != nil { return result, err } @@ -310,7 +310,7 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st // other pods with the same priority. The nominated pod prevents other pods from // using the nominated resources and the nominated pod could take a long time // before it is retried after many other pending pods. -func (g *genericScheduler) Preempt(state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) { +func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) { // Scheduler may return various types of errors. Consider preemption only if // the error is of type FitError. fitError, ok := scheduleErr.(*FitError) @@ -334,7 +334,7 @@ func (g *genericScheduler) Preempt(state *framework.CycleState, pod *v1.Pod, sch if err != nil { return nil, nil, nil, err } - nodeToVictims, err := g.selectNodesForPreemption(state, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates, + nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs) if err != nil { return nil, nil, nil, err @@ -453,7 +453,7 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i // Filters the nodes to find the ones that fit based on the given predicate functions // Each node is passed through the predicate functions to determine if it is a fit -func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) { +func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) { var filtered []*v1.Node failedPredicateMap := FailedPredicateMap{} filteredNodesStatuses := framework.NodeToStatusMap{} @@ -473,7 +473,7 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1 filteredLen int32 ) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) // We can use the same metadata producer for all nodes. meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap) @@ -483,6 +483,7 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1 nodeName := g.cache.NodeTree().Next() fits, failedPredicates, status, err := g.podFitsOnNode( + ctx, state, pod, meta, @@ -561,7 +562,7 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1 // addNominatedPods adds pods with equal or greater priority which are nominated // to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether // any pod was added, 2) augmented metadata, 3) augmented CycleState 4) augmented nodeInfo. -func (g *genericScheduler) addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata, state *framework.CycleState, +func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, meta predicates.PredicateMetadata, state *framework.CycleState, nodeInfo *schedulernodeinfo.NodeInfo, queue internalqueue.SchedulingQueue) (bool, predicates.PredicateMetadata, *framework.CycleState, *schedulernodeinfo.NodeInfo, error) { if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil { @@ -588,7 +589,7 @@ func (g *genericScheduler) addNominatedPods(pod *v1.Pod, meta predicates.Predica return false, meta, state, nodeInfo, err } } - status := g.framework.RunPreFilterExtensionAddPod(stateOut, pod, p, nodeInfoOut) + status := g.framework.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut) if !status.IsSuccess() { return false, meta, state, nodeInfo, status.AsError() } @@ -609,6 +610,7 @@ func (g *genericScheduler) addNominatedPods(pod *v1.Pod, meta predicates.Predica // add the nominated pods. Removal of the victims is done by SelectVictimsOnNode(). // It removes victims from meta and NodeInfo before calling this function. func (g *genericScheduler) podFitsOnNode( + ctx context.Context, state *framework.CycleState, pod *v1.Pod, meta predicates.PredicateMetadata, @@ -645,7 +647,7 @@ func (g *genericScheduler) podFitsOnNode( nodeInfoToUse := info if i == 0 { var err error - podsAdded, metaToUse, stateToUse, nodeInfoToUse, err = g.addNominatedPods(pod, meta, state, info, queue) + podsAdded, metaToUse, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, meta, state, info, queue) if err != nil { return false, []predicates.PredicateFailureReason{}, nil, err } @@ -680,7 +682,7 @@ func (g *genericScheduler) podFitsOnNode( } } - status = g.framework.RunFilterPlugins(stateToUse, pod, nodeInfoToUse) + status = g.framework.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse) if !status.IsSuccess() && !status.IsUnschedulable() { return false, failedPredicates, status, status.AsError() } @@ -696,6 +698,7 @@ func (g *genericScheduler) podFitsOnNode( // The node scores returned by the priority function are multiplied by the weights to get weighted scores // All scores are finally combined (added) to get the total weighted scores of all nodes func PrioritizeNodes( + ctx context.Context, pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, meta interface{}, @@ -790,7 +793,7 @@ func PrioritizeNodes( // Run the Score plugins. state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta}) - scoresMap, scoreStatus := fwk.RunScorePlugins(state, pod, nodes) + scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes) if !scoreStatus.IsSuccess() { return framework.NodeScoreList{}, scoreStatus.AsError() } @@ -1004,6 +1007,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) * // selectNodesForPreemption finds all the nodes with possible victims for // preemption in parallel. func (g *genericScheduler) selectNodesForPreemption( + ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, @@ -1031,7 +1035,7 @@ func (g *genericScheduler) selectNodesForPreemption( stateCopy := state.Clone() stateCopy.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: metaCopy}) pods, numPDBViolations, fits := g.selectVictimsOnNode( - stateCopy, pod, metaCopy, nodeInfoCopy, fitPredicates, queue, pdbs) + ctx, stateCopy, pod, metaCopy, nodeInfoCopy, fitPredicates, queue, pdbs) if fits { resultLock.Lock() victims := extenderv1.Victims{ @@ -1101,6 +1105,7 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg // due to pod affinity, node affinity, or node anti-affinity reasons. None of // these predicates can be satisfied by removing more pods from the node. func (g *genericScheduler) selectVictimsOnNode( + ctx context.Context, state *framework.CycleState, pod *v1.Pod, meta predicates.PredicateMetadata, @@ -1120,7 +1125,7 @@ func (g *genericScheduler) selectVictimsOnNode( return err } } - status := g.framework.RunPreFilterExtensionRemovePod(state, pod, rp, nodeInfo) + status := g.framework.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo) if !status.IsSuccess() { return status.AsError() } @@ -1133,7 +1138,7 @@ func (g *genericScheduler) selectVictimsOnNode( return err } } - status := g.framework.RunPreFilterExtensionAddPod(state, pod, ap, nodeInfo) + status := g.framework.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo) if !status.IsSuccess() { return status.AsError() } @@ -1156,7 +1161,7 @@ func (g *genericScheduler) selectVictimsOnNode( // inter-pod affinity to one or more victims, but we have decided not to // support this case for performance reasons. Having affinity to lower // priority pods is not a recommended configuration anyway. - if fits, _, _, err := g.podFitsOnNode(state, pod, meta, nodeInfo, fitPredicates, queue, false); !fits { + if fits, _, _, err := g.podFitsOnNode(ctx, state, pod, meta, nodeInfo, fitPredicates, queue, false); !fits { if err != nil { klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) } @@ -1174,7 +1179,7 @@ func (g *genericScheduler) selectVictimsOnNode( if err := addPod(p); err != nil { return false, err } - fits, _, _, _ := g.podFitsOnNode(state, pod, meta, nodeInfo, fitPredicates, queue, false) + fits, _, _, _ := g.podFitsOnNode(ctx, state, pod, meta, nodeInfo, fitPredicates, queue, false) if !fits { if err := removePod(p); err != nil { return false, err diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 6c728e33bf3..6530739f5d9 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -17,6 +17,7 @@ limitations under the License. package core import ( + "context" "fmt" "math" "reflect" @@ -161,7 +162,7 @@ func (fp *FakeFilterPlugin) reset() { // Filter is a test function that returns an error or nil, depending on the // value of "failedNodeReturnCodeMap". -func (fp *FakeFilterPlugin) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { +func (fp *FakeFilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { atomic.AddInt32(&fp.numFilterCalled, 1) if returnCode, ok := fp.failedNodeReturnCodeMap[nodeInfo.Node().Name]; ok { @@ -674,7 +675,7 @@ func TestGenericScheduler(t *testing.T) { false, schedulerapi.DefaultPercentageOfNodesToScore, false) - result, err := scheduler.Schedule(framework.NewCycleState(), test.pod) + result, err := scheduler.Schedule(context.Background(), framework.NewCycleState(), test.pod) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr) } @@ -719,7 +720,7 @@ func TestFindFitAllError(t *testing.T) { nodes := makeNodeList([]string{"3", "2", "1"}) scheduler := makeScheduler(predicates, nodes) - _, predicateMap, _, err := scheduler.findNodesThatFit(framework.NewCycleState(), &v1.Pod{}) + _, predicateMap, _, err := scheduler.findNodesThatFit(context.Background(), framework.NewCycleState(), &v1.Pod{}) if err != nil { t.Errorf("unexpected error: %v", err) @@ -749,7 +750,7 @@ func TestFindFitSomeError(t *testing.T) { scheduler := makeScheduler(predicates, nodes) pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}} - _, predicateMap, _, err := scheduler.findNodesThatFit(framework.NewCycleState(), pod) + _, predicateMap, _, err := scheduler.findNodesThatFit(context.Background(), framework.NewCycleState(), pod) if err != nil { t.Errorf("unexpected error: %v", err) @@ -830,7 +831,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { cache.UpdateNodeInfoSnapshot(scheduler.nodeInfoSnapshot) queue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("nominated")}, Spec: v1.PodSpec{Priority: &midPriority}}, "1") - _, _, _, err := scheduler.findNodesThatFit(framework.NewCycleState(), test.pod) + _, _, _, err := scheduler.findNodesThatFit(context.Background(), framework.NewCycleState(), test.pod) if err != nil { t.Errorf("unexpected error: %v", err) @@ -998,6 +999,7 @@ func TestZeroRequest(t *testing.T) { metaData := metaDataProducer(test.pod, nodeNameToInfo) list, err := PrioritizeNodes( + context.Background(), test.pod, nodeNameToInfo, metaData, priorityConfigs, test.nodes, []algorithm.SchedulerExtender{}, emptyFramework, framework.NewCycleState()) if err != nil { @@ -1427,7 +1429,7 @@ func TestSelectNodesForPreemption(t *testing.T) { newnode.ObjectMeta.Labels = map[string]string{"hostname": "newnode"} nodes = append(nodes, newnode) state := framework.NewCycleState() - nodeToPods, err := g.selectNodesForPreemption(state, test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil) + nodeToPods, err := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil) if err != nil { t.Error(err) } @@ -1651,7 +1653,7 @@ func TestPickOneNodeForPreemption(t *testing.T) { } nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, nodes) state := framework.NewCycleState() - candidateNodes, _ := g.selectNodesForPreemption(state, test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil) + candidateNodes, _ := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil) node := pickOneNodeForPreemption(candidateNodes) found := false for _, nodeName := range test.expected { @@ -2142,7 +2144,7 @@ func TestPreempt(t *testing.T) { if test.failedPredMap != nil { failedPredMap = test.failedPredMap } - node, victims, _, err := scheduler.Preempt(state, test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap})) + node, victims, _, err := scheduler.Preempt(context.Background(), state, test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap})) if err != nil { t.Errorf("unexpected error in preemption: %v", err) } @@ -2172,7 +2174,7 @@ func TestPreempt(t *testing.T) { test.pod.Status.NominatedNodeName = node.Name } // Call preempt again and make sure it doesn't preempt any more pods. - node, victims, _, err = scheduler.Preempt(state, test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap})) + node, victims, _, err = scheduler.Preempt(context.Background(), state, test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap})) if err != nil { t.Errorf("unexpected error in preemption: %v", err) } diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index f82eb454f12..2aa9433f818 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -17,6 +17,7 @@ limitations under the License. package scheduler import ( + "context" "errors" "fmt" "reflect" @@ -633,11 +634,14 @@ type TestPlugin struct { name string } +var _ framework.ScorePlugin = &TestPlugin{} +var _ framework.FilterPlugin = &TestPlugin{} + func (t *TestPlugin) Name() string { return t.name } -func (t *TestPlugin) Score(state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { +func (t *TestPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { return 1, nil } @@ -645,7 +649,7 @@ func (t *TestPlugin) ScoreExtensions() framework.ScoreExtensions { return nil } -func (t *TestPlugin) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { +func (t *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { return nil } diff --git a/pkg/scheduler/framework/plugins/examples/multipoint/multipoint.go b/pkg/scheduler/framework/plugins/examples/multipoint/multipoint.go index 3ae10c7216e..52132a47564 100644 --- a/pkg/scheduler/framework/plugins/examples/multipoint/multipoint.go +++ b/pkg/scheduler/framework/plugins/examples/multipoint/multipoint.go @@ -17,6 +17,8 @@ limitations under the License. package multipoint import ( + "context" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" @@ -49,7 +51,7 @@ func (f *stateData) Clone() framework.StateData { } // Reserve is the functions invoked by the framework at "reserve" extension point. -func (mc CommunicatingPlugin) Reserve(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (mc CommunicatingPlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { if pod == nil { return framework.NewStatus(framework.Error, "pod cannot be nil") } @@ -62,7 +64,7 @@ func (mc CommunicatingPlugin) Reserve(state *framework.CycleState, pod *v1.Pod, } // PreBind is the functions invoked by the framework at "prebind" extension point. -func (mc CommunicatingPlugin) PreBind(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (mc CommunicatingPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { if pod == nil { return framework.NewStatus(framework.Error, "pod cannot be nil") } diff --git a/pkg/scheduler/framework/plugins/examples/prebind/prebind.go b/pkg/scheduler/framework/plugins/examples/prebind/prebind.go index 7e304ebc3a9..8ee7ca92c6c 100644 --- a/pkg/scheduler/framework/plugins/examples/prebind/prebind.go +++ b/pkg/scheduler/framework/plugins/examples/prebind/prebind.go @@ -17,6 +17,7 @@ limitations under the License. package prebind import ( + "context" "fmt" v1 "k8s.io/api/core/v1" @@ -39,7 +40,7 @@ func (sr StatelessPreBindExample) Name() string { } // PreBind is the functions invoked by the framework at "prebind" extension point. -func (sr StatelessPreBindExample) PreBind(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (sr StatelessPreBindExample) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { if pod == nil { return framework.NewStatus(framework.Error, fmt.Sprintf("pod cannot be nil")) } diff --git a/pkg/scheduler/framework/plugins/examples/stateful/stateful.go b/pkg/scheduler/framework/plugins/examples/stateful/stateful.go index 25164506b43..0bdd502cd4b 100644 --- a/pkg/scheduler/framework/plugins/examples/stateful/stateful.go +++ b/pkg/scheduler/framework/plugins/examples/stateful/stateful.go @@ -17,6 +17,7 @@ limitations under the License. package stateful import ( + "context" "fmt" "sync" @@ -47,14 +48,14 @@ func (mp *MultipointExample) Name() string { } // Reserve is the functions invoked by the framework at "reserve" extension point. -func (mp *MultipointExample) Reserve(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (mp *MultipointExample) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { // Reserve is not called concurrently, and so we don't need to lock. mp.numRuns++ return nil } // PreBind is the functions invoked by the framework at "prebind" extension point. -func (mp *MultipointExample) PreBind(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (mp *MultipointExample) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { // PreBind could be called concurrently for different pods. mp.mu.Lock() defer mp.mu.Unlock() diff --git a/pkg/scheduler/framework/plugins/imagelocality/image_locality.go b/pkg/scheduler/framework/plugins/imagelocality/image_locality.go index d5a269f1fe7..dde6c8c7542 100644 --- a/pkg/scheduler/framework/plugins/imagelocality/image_locality.go +++ b/pkg/scheduler/framework/plugins/imagelocality/image_locality.go @@ -17,6 +17,7 @@ limitations under the License. package imagelocality import ( + "context" "fmt" v1 "k8s.io/api/core/v1" @@ -44,7 +45,7 @@ func (pl *ImageLocality) Name() string { } // Score invoked at the score extension point. -func (pl *ImageLocality) Score(state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { +func (pl *ImageLocality) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName] if !exist { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName)) diff --git a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go index f42f28f2e47..1c3bca6949e 100644 --- a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go +++ b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go @@ -17,6 +17,7 @@ limitations under the License. package imagelocality import ( + "context" "reflect" "testing" @@ -207,7 +208,7 @@ func TestImageLocalityPriority(t *testing.T) { var gotList framework.NodeScoreList for _, n := range test.nodes { nodeName := n.ObjectMeta.Name - score, status := p.(framework.ScorePlugin).Score(state, test.pod, nodeName) + score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go index 15a20359868..164a15e335d 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go @@ -17,7 +17,9 @@ limitations under the License. package nodeaffinity import ( - "k8s.io/api/core/v1" + "context" + + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" @@ -39,7 +41,7 @@ func (pl *NodeAffinity) Name() string { } // Filter invoked at the filter extension point. -func (pl *NodeAffinity) Filter(_ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { +func (pl *NodeAffinity) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { _, reasons, err := predicates.PodMatchNodeSelector(pod, nil, nodeInfo) return migration.PredicateResultToFrameworkStatus(reasons, err) } diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go index 3f2a41749bd..0499e2993bb 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go @@ -17,10 +17,11 @@ limitations under the License. package nodeaffinity import ( + "context" "reflect" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" @@ -697,7 +698,7 @@ func TestNodeAffinity(t *testing.T) { nodeInfo.SetNode(&node) p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.pod, nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/plugins/nodename/node_name.go b/pkg/scheduler/framework/plugins/nodename/node_name.go index 95cf20122ec..31715671b68 100644 --- a/pkg/scheduler/framework/plugins/nodename/node_name.go +++ b/pkg/scheduler/framework/plugins/nodename/node_name.go @@ -17,7 +17,9 @@ limitations under the License. package nodename import ( - "k8s.io/api/core/v1" + "context" + + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" @@ -39,7 +41,7 @@ func (pl *NodeName) Name() string { } // Filter invoked at the filter extension point. -func (pl *NodeName) Filter(_ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { +func (pl *NodeName) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { _, reasons, err := predicates.PodFitsHost(pod, nil, nodeInfo) return migration.PredicateResultToFrameworkStatus(reasons, err) } diff --git a/pkg/scheduler/framework/plugins/nodename/node_name_test.go b/pkg/scheduler/framework/plugins/nodename/node_name_test.go index 579e4771398..bfd2b9711d8 100644 --- a/pkg/scheduler/framework/plugins/nodename/node_name_test.go +++ b/pkg/scheduler/framework/plugins/nodename/node_name_test.go @@ -17,10 +17,11 @@ limitations under the License. package nodename import ( + "context" "reflect" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" @@ -78,7 +79,7 @@ func TestNodeName(t *testing.T) { nodeInfo.SetNode(test.node) p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.pod, nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports.go b/pkg/scheduler/framework/plugins/nodeports/node_ports.go index bed0ccb1b2d..d41a050bd8d 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports.go @@ -17,7 +17,9 @@ limitations under the License. package nodeports import ( - "k8s.io/api/core/v1" + "context" + + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" @@ -39,7 +41,7 @@ func (pl *NodePorts) Name() string { } // Filter invoked at the filter extension point. -func (pl *NodePorts) Filter(_ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { +func (pl *NodePorts) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { _, reasons, err := predicates.PodFitsHostPorts(pod, nil, nodeInfo) return migration.PredicateResultToFrameworkStatus(reasons, err) } diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go b/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go index ce01633e000..a06a76f3038 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go @@ -17,12 +17,13 @@ limitations under the License. package nodeports import ( + "context" "reflect" "strconv" "strings" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -149,7 +150,7 @@ func TestNodePorts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods.go b/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods.go index 9fa8d76e6d4..af17e48bbf5 100644 --- a/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods.go +++ b/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods.go @@ -17,6 +17,7 @@ limitations under the License. package nodepreferavoidpods import ( + "context" "fmt" v1 "k8s.io/api/core/v1" @@ -32,7 +33,7 @@ type NodePreferAvoidPods struct { handle framework.FrameworkHandle } -var _ = framework.ScorePlugin(&NodePreferAvoidPods{}) +var _ framework.ScorePlugin = &NodePreferAvoidPods{} // Name is the name of the plugin used in the plugin registry and configurations. const Name = "NodePreferAvoidPods" @@ -43,7 +44,7 @@ func (pl *NodePreferAvoidPods) Name() string { } // Score invoked at the score extension point. -func (pl *NodePreferAvoidPods) Score(state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { +func (pl *NodePreferAvoidPods) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName] if !exist { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName)) diff --git a/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods_test.go b/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods_test.go index 4beaa2939a0..16c36b0c7a7 100644 --- a/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods_test.go +++ b/pkg/scheduler/framework/plugins/nodepreferavoidpods/node_prefer_avoid_pods_test.go @@ -17,6 +17,7 @@ limitations under the License. package nodepreferavoidpods import ( + "context" "reflect" "testing" @@ -151,7 +152,7 @@ func TestNodePreferAvoidPods(t *testing.T) { var gotList framework.NodeScoreList for _, n := range test.nodes { nodeName := n.ObjectMeta.Name - score, status := p.(framework.ScorePlugin).Score(state, test.pod, nodeName) + score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/plugins/noderesources/node_resources.go b/pkg/scheduler/framework/plugins/noderesources/node_resources.go index 2db29b36da7..7cf39a4c7cc 100644 --- a/pkg/scheduler/framework/plugins/noderesources/node_resources.go +++ b/pkg/scheduler/framework/plugins/noderesources/node_resources.go @@ -17,9 +17,10 @@ limitations under the License. package noderesources import ( + "context" "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" @@ -41,7 +42,7 @@ func (pl *NodeResources) Name() string { } // Filter invoked at the filter extension point. -func (pl *NodeResources) Filter(cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { +func (pl *NodeResources) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { meta, ok := migration.PredicateMetadata(cycleState).(predicates.PredicateMetadata) if !ok { return migration.ErrorToFrameworkStatus(fmt.Errorf("%+v convert to predicates.PredicateMetadata error", cycleState)) diff --git a/pkg/scheduler/framework/plugins/noderesources/node_resources_test.go b/pkg/scheduler/framework/plugins/noderesources/node_resources_test.go index 0a91fbb2a19..41cd7fd85f6 100644 --- a/pkg/scheduler/framework/plugins/noderesources/node_resources_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/node_resources_test.go @@ -17,10 +17,11 @@ limitations under the License. package noderesources import ( + "context" "reflect" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -350,7 +351,7 @@ func TestNodeResources(t *testing.T) { test.nodeInfo.SetNode(&node) p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(state, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -403,7 +404,7 @@ func TestNodeResources(t *testing.T) { test.nodeInfo.SetNode(&node) p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(state, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -454,7 +455,7 @@ func TestNodeResources(t *testing.T) { test.nodeInfo.SetNode(&node) p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(state, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go index 3e469bb5a47..99616a61cf7 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -17,6 +17,7 @@ limitations under the License. package tainttoleration import ( + "context" "fmt" v1 "k8s.io/api/core/v1" @@ -45,14 +46,14 @@ func (pl *TaintToleration) Name() string { } // Filter invoked at the filter extension point. -func (pl *TaintToleration) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { +func (pl *TaintToleration) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { // Note that PodToleratesNodeTaints doesn't use predicate metadata, hence passing nil here. _, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo) return migration.PredicateResultToFrameworkStatus(reasons, err) } // Score invoked at the Score extension point. -func (pl *TaintToleration) Score(state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { +func (pl *TaintToleration) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { nodeInfo, exist := pl.handle.NodeInfoSnapshot().NodeInfoMap[nodeName] if !exist { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q does not exist in NodeInfoSnapshot", nodeName)) @@ -63,7 +64,7 @@ func (pl *TaintToleration) Score(state *framework.CycleState, pod *v1.Pod, nodeN } // NormalizeScore invoked after scoring all nodes. -func (pl *TaintToleration) NormalizeScore(_ *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { +func (pl *TaintToleration) NormalizeScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { // Note that ComputeTaintTolerationPriorityReduce doesn't use priority metadata, hence passing nil here. err := priorities.ComputeTaintTolerationPriorityReduce(pod, nil, pl.handle.NodeInfoSnapshot().NodeInfoMap, scores) return migration.ErrorToFrameworkStatus(err) diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go index ab63719b450..dd88ee5a1e4 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go @@ -17,6 +17,7 @@ limitations under the License. package tainttoleration import ( + "context" "reflect" "testing" @@ -237,14 +238,14 @@ func TestTaintTolerationScore(t *testing.T) { var gotList framework.NodeScoreList for _, n := range test.nodes { nodeName := n.ObjectMeta.Name - score, status := p.(framework.ScorePlugin).Score(state, test.pod, nodeName) + score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score}) } - status := p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(state, test.pod, gotList) + status := p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } @@ -328,7 +329,7 @@ func TestTaintTolerationFilter(t *testing.T) { nodeInfo := schedulernodeinfo.NewNodeInfo() nodeInfo.SetNode(test.node) p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.pod, nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index 8bd9a70e669..2738a31ff19 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -17,6 +17,8 @@ limitations under the License. package volumebinding import ( + "context" + v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" @@ -41,7 +43,7 @@ func (pl *VolumeBinding) Name() string { } // Filter invoked at the filter extension point. -func (pl *VolumeBinding) Filter(cs *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { +func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { _, reasons, err := pl.predicate(pod, nil, nodeInfo) return migration.PredicateResultToFrameworkStatus(reasons, err) } diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index f69e9be799c..642c45af19f 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -17,7 +17,9 @@ limitations under the License. package volumerestrictions import ( - "k8s.io/api/core/v1" + "context" + + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" @@ -39,7 +41,7 @@ func (pl *VolumeRestrictions) Name() string { } // Filter invoked at the filter extension point. -func (pl *VolumeRestrictions) Filter(_ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { +func (pl *VolumeRestrictions) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { // metadata is not needed for NoDiskConflict _, reasons, err := predicates.NoDiskConflict(pod, nil, nodeInfo) return migration.PredicateResultToFrameworkStatus(reasons, err) diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go index 77426df601d..b31e4947ab4 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go @@ -17,10 +17,11 @@ limitations under the License. package volumerestrictions import ( + "context" "reflect" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -66,7 +67,7 @@ func TestGCEDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -114,7 +115,7 @@ func TestAWSDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -168,7 +169,7 @@ func TestRBDDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -222,7 +223,7 @@ func TestISCSIDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go index f7a11804c57..e7ed01de27a 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go @@ -17,7 +17,9 @@ limitations under the License. package volumezone import ( - "k8s.io/api/core/v1" + "context" + + v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" @@ -40,7 +42,7 @@ func (pl *VolumeZone) Name() string { } // Filter invoked at the filter extension point. -func (pl *VolumeZone) Filter(_ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { +func (pl *VolumeZone) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { // metadata is not needed _, reasons, err := pl.predicate(pod, nil, nodeInfo) return migration.PredicateResultToFrameworkStatus(reasons, err) diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go index 6e6442d2536..20119317877 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go @@ -17,10 +17,11 @@ limitations under the License. package volumezone import ( + "context" "reflect" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" @@ -154,7 +155,7 @@ func TestSingleZone(t *testing.T) { node := &schedulernodeinfo.NodeInfo{} node.SetNode(test.Node) p := New(pvInfo, pvcInfo, nil) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.Pod, node) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.Pod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -237,7 +238,7 @@ func TestMultiZone(t *testing.T) { node := &schedulernodeinfo.NodeInfo{} node.SetNode(test.Node) p := New(pvInfo, pvcInfo, nil) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.Pod, node) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.Pod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -340,7 +341,7 @@ func TestWithBinding(t *testing.T) { node := &schedulernodeinfo.NodeInfo{} node.SetNode(test.Node) p := New(pvInfo, pvcInfo, classInfo) - gotStatus := p.(framework.FilterPlugin).Filter(nil, test.Pod, node) + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.Pod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 90fc2648f7f..b29214a2358 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -22,7 +22,7 @@ import ( "reflect" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -246,11 +246,11 @@ func (f *framework) QueueSortFunc() LessFunc { // anything but Success. If a non-success status is returned, then the scheduling // cycle is aborted. func (f *framework) RunPreFilterPlugins( - state *CycleState, pod *v1.Pod) (status *Status) { + ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) { startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, preFilter, status) }() for _, pl := range f.preFilterPlugins { - status := pl.PreFilter(state, pod) + status := pl.PreFilter(ctx, state, pod) if !status.IsSuccess() { if status.IsUnschedulable() { msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message()) @@ -269,15 +269,20 @@ func (f *framework) RunPreFilterPlugins( // RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured // PreFilter plugins. It returns directly if any of the plugins return any // status other than Success. -func (f *framework) RunPreFilterExtensionAddPod(state *CycleState, podToSchedule *v1.Pod, - podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (status *Status) { +func (f *framework) RunPreFilterExtensionAddPod( + ctx context.Context, + state *CycleState, + podToSchedule *v1.Pod, + podToAdd *v1.Pod, + nodeInfo *schedulernodeinfo.NodeInfo, +) (status *Status) { startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, preFilterExtensionAddPod, status) }() for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil { continue } - if status := pl.PreFilterExtensions().AddPod(state, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() { + if status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() { msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v", pl.Name(), podToSchedule.Name, status.Message()) klog.Error(msg) @@ -291,15 +296,20 @@ func (f *framework) RunPreFilterExtensionAddPod(state *CycleState, podToSchedule // RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured // PreFilter plugins. It returns directly if any of the plugins return any // status other than Success. -func (f *framework) RunPreFilterExtensionRemovePod(state *CycleState, podToSchedule *v1.Pod, - podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (status *Status) { +func (f *framework) RunPreFilterExtensionRemovePod( + ctx context.Context, + state *CycleState, + podToSchedule *v1.Pod, + podToRemove *v1.Pod, + nodeInfo *schedulernodeinfo.NodeInfo, +) (status *Status) { startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, preFilterExtensionRemovePod, status) }() for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil { continue } - if status := pl.PreFilterExtensions().RemovePod(state, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() { + if status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() { msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v", pl.Name(), podToSchedule.Name, status.Message()) klog.Error(msg) @@ -314,12 +324,16 @@ func (f *framework) RunPreFilterExtensionRemovePod(state *CycleState, podToSched // the given node. If any of these plugins doesn't return "Success", the // given node is not suitable for running pod. // Meanwhile, the failure message and status are set for the given node. -func (f *framework) RunFilterPlugins(state *CycleState, - pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (status *Status) { +func (f *framework) RunFilterPlugins( + ctx context.Context, + state *CycleState, + pod *v1.Pod, + nodeInfo *schedulernodeinfo.NodeInfo, +) (status *Status) { startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, filter, status) }() for _, pl := range f.filterPlugins { - status := pl.Filter(state, pod, nodeInfo) + status := pl.Filter(ctx, state, pod, nodeInfo) if !status.IsSuccess() { if !status.IsUnschedulable() { errMsg := fmt.Sprintf("error while running %q filter plugin for pod %q: %v", @@ -338,6 +352,7 @@ func (f *framework) RunFilterPlugins(state *CycleState, // of these plugins returns any status other than "Success", the given node is // rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses. func (f *framework) RunPostFilterPlugins( + ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node, @@ -346,7 +361,7 @@ func (f *framework) RunPostFilterPlugins( startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, postFilter, status) }() for _, pl := range f.postFilterPlugins { - status := pl.PostFilter(state, pod, nodes, filteredNodesStatuses) + status := pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses) if !status.IsSuccess() { msg := fmt.Sprintf("error while running %q postfilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) @@ -361,21 +376,21 @@ func (f *framework) RunPostFilterPlugins( // stores for each scoring plugin name the corresponding NodeScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns // a non-success status. -func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) { +func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) { startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, score, status) }() pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins)) for _, pl := range f.scorePlugins { pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes)) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) errCh := schedutil.NewErrorChannel() // Run Score method for each node in parallel. workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) { for _, pl := range f.scorePlugins { nodeName := nodes[index].Name - score, status := pl.Score(state, pod, nodeName) + score, status := pl.Score(ctx, state, pod, nodeName) if !status.IsSuccess() { errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel) return @@ -399,7 +414,7 @@ func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1. if pl.ScoreExtensions() == nil { return } - status := pl.ScoreExtensions().NormalizeScore(state, pod, nodeScoreList) + status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList) if !status.IsSuccess() { err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message()) errCh.SendErrorWithCancel(err, cancel) @@ -442,11 +457,11 @@ func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1. // failure (bool) if any of the plugins returns an error. It also returns an // error containing the rejection message or the error occurred in the plugin. func (f *framework) RunPreBindPlugins( - state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { + ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, preBind, status) }() for _, pl := range f.preBindPlugins { - status := pl.PreBind(state, pod, nodeName) + status := pl.PreBind(ctx, state, pod, nodeName) if !status.IsSuccess() { msg := fmt.Sprintf("error while running %q prebind plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) @@ -457,14 +472,14 @@ func (f *framework) RunPreBindPlugins( } // RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status. -func (f *framework) RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { +func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, bind, status) }() if len(f.bindPlugins) == 0 { return NewStatus(Skip, "") } for _, bp := range f.bindPlugins { - status = bp.Bind(state, pod, nodeName) + status = bp.Bind(ctx, state, pod, nodeName) if status != nil && status.Code() == Skip { continue } @@ -480,11 +495,11 @@ func (f *framework) RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName stri // RunPostBindPlugins runs the set of configured postbind plugins. func (f *framework) RunPostBindPlugins( - state *CycleState, pod *v1.Pod, nodeName string) { + ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) { startTime := time.Now() defer recordExtensionPointDuration(startTime, postBind, nil) for _, pl := range f.postBindPlugins { - pl.PostBind(state, pod, nodeName) + pl.PostBind(ctx, state, pod, nodeName) } } @@ -492,11 +507,11 @@ func (f *framework) RunPostBindPlugins( // plugins returns an error, it does not continue running the remaining ones and // returns the error. In such case, pod will not be scheduled. func (f *framework) RunReservePlugins( - state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { + ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, reserve, status) }() for _, pl := range f.reservePlugins { - status := pl.Reserve(state, pod, nodeName) + status := pl.Reserve(ctx, state, pod, nodeName) if !status.IsSuccess() { msg := fmt.Sprintf("error while running %q reserve plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) @@ -508,11 +523,11 @@ func (f *framework) RunReservePlugins( // RunUnreservePlugins runs the set of configured unreserve plugins. func (f *framework) RunUnreservePlugins( - state *CycleState, pod *v1.Pod, nodeName string) { + ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) { startTime := time.Now() defer recordExtensionPointDuration(startTime, unreserve, nil) for _, pl := range f.unreservePlugins { - pl.Unreserve(state, pod, nodeName) + pl.Unreserve(ctx, state, pod, nodeName) } } @@ -524,13 +539,13 @@ func (f *framework) RunUnreservePlugins( // Note that if multiple plugins asked to wait, then we wait for the minimum // timeout duration. func (f *framework) RunPermitPlugins( - state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { + ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { startTime := time.Now() defer func() { recordExtensionPointDuration(startTime, permit, status) }() timeout := maxTimeout statusCode := Success for _, pl := range f.permitPlugins { - status, d := pl.Permit(state, pod, nodeName) + status, d := pl.Permit(ctx, state, pod, nodeName) if !status.IsSuccess() { if status.IsUnschedulable() { msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message()) diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index f9e9fc701c1..496b0d17035 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -17,19 +17,20 @@ limitations under the License. package v1alpha1 import ( + "context" "fmt" - "github.com/prometheus/client_golang/prometheus" - "k8s.io/kubernetes/pkg/scheduler/metrics" "reflect" "strings" "testing" "time" + "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/metrics" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) @@ -86,11 +87,11 @@ func (pl *TestScoreWithNormalizePlugin) Name() string { return pl.name } -func (pl *TestScoreWithNormalizePlugin) NormalizeScore(state *CycleState, pod *v1.Pod, scores NodeScoreList) *Status { +func (pl *TestScoreWithNormalizePlugin) NormalizeScore(ctx context.Context, state *CycleState, pod *v1.Pod, scores NodeScoreList) *Status { return injectNormalizeRes(pl.inj, scores) } -func (pl *TestScoreWithNormalizePlugin) Score(state *CycleState, p *v1.Pod, nodeName string) (int64, *Status) { +func (pl *TestScoreWithNormalizePlugin) Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status) { return setScoreRes(pl.inj) } @@ -108,7 +109,7 @@ func (pl *TestScorePlugin) Name() string { return pl.name } -func (pl *TestScorePlugin) Score(state *CycleState, p *v1.Pod, nodeName string) (int64, *Status) { +func (pl *TestScorePlugin) Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status) { return setScoreRes(pl.inj) } @@ -133,7 +134,7 @@ func (pl *TestPlugin) Name() string { return pl.name } -func (pl *TestPlugin) Score(state *CycleState, p *v1.Pod, nodeName string) (int64, *Status) { +func (pl *TestPlugin) Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status) { return 0, NewStatus(Code(pl.inj.ScoreStatus), "injected status") } @@ -141,30 +142,39 @@ func (pl *TestPlugin) ScoreExtensions() ScoreExtensions { return nil } -func (pl *TestPlugin) PreFilter(state *CycleState, p *v1.Pod) *Status { +func (pl *TestPlugin) PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status { return NewStatus(Code(pl.inj.PreFilterStatus), "injected status") } + func (pl *TestPlugin) PreFilterExtensions() PreFilterExtensions { return nil } -func (pl *TestPlugin) Filter(state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + +func (pl *TestPlugin) Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { return NewStatus(Code(pl.inj.FilterStatus), "injected status") } -func (pl *TestPlugin) PostFilter(state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status { + +func (pl *TestPlugin) PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status { return NewStatus(Code(pl.inj.PostFilterStatus), "injected status") } -func (pl *TestPlugin) Reserve(state *CycleState, p *v1.Pod, nodeName string) *Status { + +func (pl *TestPlugin) Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status { return NewStatus(Code(pl.inj.ReserveStatus), "injected status") } -func (pl *TestPlugin) PreBind(state *CycleState, p *v1.Pod, nodeName string) *Status { + +func (pl *TestPlugin) PreBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status { return NewStatus(Code(pl.inj.PreBindStatus), "injected status") } -func (pl *TestPlugin) PostBind(state *CycleState, p *v1.Pod, nodeName string) {} -func (pl *TestPlugin) Unreserve(state *CycleState, p *v1.Pod, nodeName string) {} -func (pl *TestPlugin) Permit(state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration) { + +func (pl *TestPlugin) PostBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) {} + +func (pl *TestPlugin) Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) {} + +func (pl *TestPlugin) Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration) { return NewStatus(Code(pl.inj.PermitStatus), "injected status"), time.Duration(0) } -func (pl *TestPlugin) Bind(state *CycleState, p *v1.Pod, nodeName string) *Status { + +func (pl *TestPlugin) Bind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status { return NewStatus(Code(pl.inj.BindStatus), "injected status") } @@ -177,7 +187,7 @@ func (pl *TestPreFilterPlugin) Name() string { return preFilterPluginName } -func (pl *TestPreFilterPlugin) PreFilter(state *CycleState, p *v1.Pod) *Status { +func (pl *TestPreFilterPlugin) PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status { pl.PreFilterCalled++ return nil } @@ -197,18 +207,18 @@ func (pl *TestPreFilterWithExtensionsPlugin) Name() string { return preFilterWithExtensionsPluginName } -func (pl *TestPreFilterWithExtensionsPlugin) PreFilter(state *CycleState, p *v1.Pod) *Status { +func (pl *TestPreFilterWithExtensionsPlugin) PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status { pl.PreFilterCalled++ return nil } -func (pl *TestPreFilterWithExtensionsPlugin) AddPod(state *CycleState, podToSchedule *v1.Pod, +func (pl *TestPreFilterWithExtensionsPlugin) AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { pl.AddCalled++ return nil } -func (pl *TestPreFilterWithExtensionsPlugin) RemovePod(state *CycleState, podToSchedule *v1.Pod, +func (pl *TestPreFilterWithExtensionsPlugin) RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { pl.RemoveCalled++ return nil @@ -225,7 +235,7 @@ func (dp *TestDuplicatePlugin) Name() string { return duplicatePluginName } -func (dp *TestDuplicatePlugin) PreFilter(state *CycleState, p *v1.Pod) *Status { +func (dp *TestDuplicatePlugin) PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status { return nil } @@ -503,7 +513,7 @@ func TestRunScorePlugins(t *testing.T) { t.Fatalf("Failed to create framework for testing: %v", err) } - res, status := f.RunScorePlugins(state, pod, nodes) + res, status := f.RunScorePlugins(context.Background(), state, pod, nodes) if tt.err { if status.IsSuccess() { @@ -540,9 +550,9 @@ func TestPreFilterPlugins(t *testing.T) { if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } - f.RunPreFilterPlugins(nil, nil) - f.RunPreFilterExtensionAddPod(nil, nil, nil, nil) - f.RunPreFilterExtensionRemovePod(nil, nil, nil, nil) + f.RunPreFilterPlugins(context.Background(), nil, nil) + f.RunPreFilterExtensionAddPod(context.Background(), nil, nil, nil, nil) + f.RunPreFilterExtensionRemovePod(context.Background(), nil, nil, nil, nil) if preFilter1.PreFilterCalled != 1 { t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled) @@ -570,117 +580,117 @@ func TestRecordingMetrics(t *testing.T) { }{ { name: "PreFilter - Success", - action: func(f Framework) { f.RunPreFilterPlugins(nil, pod) }, + action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), nil, pod) }, wantExtensionPoint: "PreFilter", wantStatus: Success, }, { name: "Filter - Success", - action: func(f Framework) { f.RunFilterPlugins(nil, pod, nil) }, + action: func(f Framework) { f.RunFilterPlugins(context.Background(), nil, pod, nil) }, wantExtensionPoint: "Filter", wantStatus: Success, }, { name: "PostFilter - Success", - action: func(f Framework) { f.RunPostFilterPlugins(nil, pod, nil, nil) }, + action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), nil, pod, nil, nil) }, wantExtensionPoint: "PostFilter", wantStatus: Success, }, { name: "Score - Success", - action: func(f Framework) { f.RunScorePlugins(nil, pod, nodes) }, + action: func(f Framework) { f.RunScorePlugins(context.Background(), nil, pod, nodes) }, wantExtensionPoint: "Score", wantStatus: Success, }, { name: "Reserve - Success", - action: func(f Framework) { f.RunReservePlugins(nil, pod, "") }, + action: func(f Framework) { f.RunReservePlugins(context.Background(), nil, pod, "") }, wantExtensionPoint: "Reserve", wantStatus: Success, }, { name: "Unreserve - Success", - action: func(f Framework) { f.RunUnreservePlugins(nil, pod, "") }, + action: func(f Framework) { f.RunUnreservePlugins(context.Background(), nil, pod, "") }, wantExtensionPoint: "Unreserve", wantStatus: Success, }, { name: "PreBind - Success", - action: func(f Framework) { f.RunPreBindPlugins(nil, pod, "") }, + action: func(f Framework) { f.RunPreBindPlugins(context.Background(), nil, pod, "") }, wantExtensionPoint: "PreBind", wantStatus: Success, }, { name: "Bind - Success", - action: func(f Framework) { f.RunBindPlugins(nil, pod, "") }, + action: func(f Framework) { f.RunBindPlugins(context.Background(), nil, pod, "") }, wantExtensionPoint: "Bind", wantStatus: Success, }, { name: "PostBind - Success", - action: func(f Framework) { f.RunPostBindPlugins(nil, pod, "") }, + action: func(f Framework) { f.RunPostBindPlugins(context.Background(), nil, pod, "") }, wantExtensionPoint: "PostBind", wantStatus: Success, }, { name: "Permit - Success", - action: func(f Framework) { f.RunPermitPlugins(nil, pod, "") }, + action: func(f Framework) { f.RunPermitPlugins(context.Background(), nil, pod, "") }, wantExtensionPoint: "Permit", wantStatus: Success, }, { name: "PreFilter - Error", - action: func(f Framework) { f.RunPreFilterPlugins(nil, pod) }, + action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), nil, pod) }, inject: injectedResult{PreFilterStatus: int(Error)}, wantExtensionPoint: "PreFilter", wantStatus: Error, }, { name: "Filter - Error", - action: func(f Framework) { f.RunFilterPlugins(nil, pod, nil) }, + action: func(f Framework) { f.RunFilterPlugins(context.Background(), nil, pod, nil) }, inject: injectedResult{FilterStatus: int(Error)}, wantExtensionPoint: "Filter", wantStatus: Error, }, { name: "PostFilter - Error", - action: func(f Framework) { f.RunPostFilterPlugins(nil, pod, nil, nil) }, + action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), nil, pod, nil, nil) }, inject: injectedResult{PostFilterStatus: int(Error)}, wantExtensionPoint: "PostFilter", wantStatus: Error, }, { name: "Score - Error", - action: func(f Framework) { f.RunScorePlugins(nil, pod, nodes) }, + action: func(f Framework) { f.RunScorePlugins(context.Background(), nil, pod, nodes) }, inject: injectedResult{ScoreStatus: int(Error)}, wantExtensionPoint: "Score", wantStatus: Error, }, { name: "Reserve - Error", - action: func(f Framework) { f.RunReservePlugins(nil, pod, "") }, + action: func(f Framework) { f.RunReservePlugins(context.Background(), nil, pod, "") }, inject: injectedResult{ReserveStatus: int(Error)}, wantExtensionPoint: "Reserve", wantStatus: Error, }, { name: "PreBind - Error", - action: func(f Framework) { f.RunPreBindPlugins(nil, pod, "") }, + action: func(f Framework) { f.RunPreBindPlugins(context.Background(), nil, pod, "") }, inject: injectedResult{PreBindStatus: int(Error)}, wantExtensionPoint: "PreBind", wantStatus: Error, }, { name: "Bind - Error", - action: func(f Framework) { f.RunBindPlugins(nil, pod, "") }, + action: func(f Framework) { f.RunBindPlugins(context.Background(), nil, pod, "") }, inject: injectedResult{BindStatus: int(Error)}, wantExtensionPoint: "Bind", wantStatus: Error, }, { name: "Permit - Error", - action: func(f Framework) { f.RunPermitPlugins(nil, pod, "") }, + action: func(f Framework) { f.RunPermitPlugins(context.Background(), nil, pod, "") }, inject: injectedResult{PermitStatus: int(Error)}, wantExtensionPoint: "Permit", wantStatus: Error, diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index a5396c8dc53..e3f15c30760 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -19,6 +19,7 @@ limitations under the License. package v1alpha1 import ( + "context" "errors" "math" "time" @@ -206,10 +207,10 @@ type QueueSortPlugin interface { type PreFilterExtensions interface { // AddPod is called by the framework while trying to evaluate the impact // of adding podToAdd to the node while scheduling podToSchedule. - AddPod(state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status // RemovePod is called by the framework while trying to evaluate the impact // of removing podToRemove from the node while scheduling podToSchedule. - RemovePod(state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status } // PreFilterPlugin is an interface that must be implemented by "prefilter" plugins. @@ -218,7 +219,7 @@ type PreFilterPlugin interface { Plugin // PreFilter is called at the beginning of the scheduling cycle. All PreFilter // plugins must return success or the pod will be rejected. - PreFilter(state *CycleState, p *v1.Pod) *Status + PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status // PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one, // or nil if it does not. A Pre-filter plugin can provide extensions to incrementally // modify its pre-processed info. The framework guarantees that the extensions @@ -249,7 +250,7 @@ type FilterPlugin interface { // For example, during preemption, we may pass a copy of the original // nodeInfo object that has some pods removed from it to evaluate the // possibility of preempting them to schedule the target pod. - Filter(state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status } // PostFilterPlugin is an interface for Post-filter plugin. Post-filter is an @@ -262,7 +263,7 @@ type PostFilterPlugin interface { // passed the filtering phase. All postfilter plugins must return success or // the pod will be rejected. The filteredNodesStatuses is the set of filtered nodes // and their filter status. - PostFilter(state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status + PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status } // ScoreExtensions is an interface for Score extended functionality. @@ -270,7 +271,7 @@ type ScoreExtensions interface { // NormalizeScore is called for all node scores produced by the same plugin's "Score" // method. A successful run of NormalizeScore will update the scores list and return // a success status. - NormalizeScore(state *CycleState, p *v1.Pod, scores NodeScoreList) *Status + NormalizeScore(ctx context.Context, state *CycleState, p *v1.Pod, scores NodeScoreList) *Status } // ScorePlugin is an interface that must be implemented by "score" plugins to rank @@ -280,7 +281,7 @@ type ScorePlugin interface { // Score is called on each filtered node. It must return success and an integer // indicating the rank of the node. All scoring plugins must return success or // the pod will be rejected. - Score(state *CycleState, p *v1.Pod, nodeName string) (int64, *Status) + Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status) // ScoreExtensions returns a ScoreExtensions interface if it implements one, or nil if does not. ScoreExtensions() ScoreExtensions @@ -296,7 +297,7 @@ type ReservePlugin interface { Plugin // Reserve is called by the scheduling framework when the scheduler cache is // updated. - Reserve(state *CycleState, p *v1.Pod, nodeName string) *Status + Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status } // PreBindPlugin is an interface that must be implemented by "prebind" plugins. @@ -305,7 +306,7 @@ type PreBindPlugin interface { Plugin // PreBind is called before binding a pod. All prebind plugins must return // success or the pod will be rejected and won't be sent for binding. - PreBind(state *CycleState, p *v1.Pod, nodeName string) *Status + PreBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status } // PostBindPlugin is an interface that must be implemented by "postbind" plugins. @@ -316,7 +317,7 @@ type PostBindPlugin interface { // informational. A common application of this extension point is for cleaning // up. If a plugin needs to clean-up its state after a pod is scheduled and // bound, PostBind is the extension point that it should register. - PostBind(state *CycleState, p *v1.Pod, nodeName string) + PostBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) } // UnreservePlugin is an interface for Unreserve plugins. This is an informational @@ -327,7 +328,7 @@ type UnreservePlugin interface { Plugin // Unreserve is called by the scheduling framework when a reserved pod was // rejected in a later phase. - Unreserve(state *CycleState, p *v1.Pod, nodeName string) + Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) } // PermitPlugin is an interface that must be implemented by "permit" plugins. @@ -340,7 +341,7 @@ type PermitPlugin interface { // The pod will also be rejected if the wait timeout or the pod is rejected while // waiting. Note that if the plugin returns "wait", the framework will wait only // after running the remaining plugins given that no other plugin rejects the pod. - Permit(state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration) + Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration) } // BindPlugin is an interface that must be implemented by "bind" plugins. Bind @@ -353,7 +354,7 @@ type BindPlugin interface { // remaining bind plugins are skipped. When a bind plugin does not handle a pod, // it must return Skip in its Status code. If a bind plugin returns an Error, the // pod is rejected and will not be bound. - Bind(state *CycleState, p *v1.Pod, nodeName string) *Status + Bind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status } // Framework manages the set of plugins in use by the scheduling framework. @@ -367,7 +368,7 @@ type Framework interface { // *Status and its code is set to non-success if any of the plugins returns // anything but Success. If a non-success status is returned, then the scheduling // cycle is aborted. - RunPreFilterPlugins(state *CycleState, pod *v1.Pod) *Status + RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status // RunFilterPlugins runs the set of configured filter plugins for pod on // the given node. It returns directly if any of the filter plugins @@ -378,46 +379,46 @@ type Framework interface { // pass a copy of the original nodeInfo object that has some pods // removed from it to evaluate the possibility of preempting them to // schedule the target pod. - RunFilterPlugins(state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status // RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured // PreFilter plugins. It returns directly if any of the plugins return any // status other than Success. - RunPreFilterExtensionAddPod(state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status // RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured // PreFilter plugins. It returns directly if any of the plugins return any // status other than Success. - RunPreFilterExtensionRemovePod(state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status // RunPostFilterPlugins runs the set of configured post-filter plugins. If any // of these plugins returns any status other than "Success", the given node is // rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses. - RunPostFilterPlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status + RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status // RunScorePlugins runs the set of configured scoring plugins. It returns a map that // stores for each scoring plugin name the corresponding NodeScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns // a non-success status. - RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status) + RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status) // RunPreBindPlugins runs the set of configured prebind plugins. It returns // *Status and its code is set to non-success if any of the plugins returns // anything but Success. If the Status code is "Unschedulable", it is // considered as a scheduling check failure, otherwise, it is considered as an // internal error. In either case the pod is not going to be bound. - RunPreBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status + RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status // RunPostBindPlugins runs the set of configured postbind plugins. - RunPostBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) + RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) // RunReservePlugins runs the set of configured reserve plugins. If any of these // plugins returns an error, it does not continue running the remaining ones and // returns the error. In such case, pod will not be scheduled. - RunReservePlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status + RunReservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status // RunUnreservePlugins runs the set of configured unreserve plugins. - RunUnreservePlugins(state *CycleState, pod *v1.Pod, nodeName string) + RunUnreservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) // RunPermitPlugins runs the set of configured permit plugins. If any of these // plugins returns a status other than "Success" or "Wait", it does not continue @@ -426,14 +427,14 @@ type Framework interface { // returned by the plugin, if the time expires, then it will return an error. // Note that if multiple plugins asked to wait, then we wait for the minimum // timeout duration. - RunPermitPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status + RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status // RunBindPlugins runs the set of configured bind plugins. A bind plugin may choose // whether or not to handle the given Pod. If a bind plugin chooses to skip the // binding, it should return code=4("skip") status. Otherwise, it should return "Error" // or "Success". If none of the plugins handled binding, RunBindPlugins returns // code=4("skip") status. - RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status + RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status // ListPlugins returns a map of extension point name to list of configured Plugins. ListPlugins() map[string][]config.Plugin diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 0d63c09e050..66b2917b978 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -17,6 +17,7 @@ limitations under the License. package queue import ( + "context" "fmt" "reflect" "strings" @@ -180,47 +181,49 @@ func (*fakeFramework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot { return nil } -func (*fakeFramework) RunPreFilterPlugins(state *framework.CycleState, pod *v1.Pod) *framework.Status { +func (*fakeFramework) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status { return nil } -func (*fakeFramework) RunFilterPlugins(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { +func (*fakeFramework) RunFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { return nil } -func (*fakeFramework) RunPreFilterExtensionAddPod(state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { +func (*fakeFramework) RunPreFilterExtensionAddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { return nil } -func (*fakeFramework) RunPreFilterExtensionRemovePod(state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { +func (*fakeFramework) RunPreFilterExtensionRemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { return nil } -func (*fakeFramework) RunScorePlugins(state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) { +func (*fakeFramework) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) { return nil, nil } -func (*fakeFramework) RunPreBindPlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (*fakeFramework) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { return nil } -func (*fakeFramework) RunBindPlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (*fakeFramework) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { return nil } -func (*fakeFramework) RunPostBindPlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) {} +func (*fakeFramework) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { +} -func (*fakeFramework) RunPostFilterPlugins(state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status { +func (*fakeFramework) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status { return nil } -func (*fakeFramework) RunReservePlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (*fakeFramework) RunReservePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { return nil } -func (*fakeFramework) RunUnreservePlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) {} +func (*fakeFramework) RunUnreservePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { +} -func (*fakeFramework) RunPermitPlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (*fakeFramework) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { return nil } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 43d5f4d8fab..aa74e6070f2 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -17,6 +17,7 @@ limitations under the License. package scheduler import ( + "context" "fmt" "io/ioutil" "os" @@ -415,13 +416,13 @@ func NewFromConfig(config *Config) *Scheduler { } } -// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately. -func (sched *Scheduler) Run() { +// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done. +func (sched *Scheduler) Run(ctx context.Context) { if !sched.WaitForCacheSync() { return } - go wait.Until(sched.scheduleOne, 0, sched.StopEverything) + wait.UntilWithContext(ctx, sched.scheduleOne, 0) } // recordFailedSchedulingEvent records an event for the pod that indicates the @@ -444,14 +445,14 @@ func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err // preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible. // If it succeeds, it adds the name of the node where preemption has happened to the pod spec. // It returns the node name and an error if any. -func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) { +func (sched *Scheduler) preempt(ctx context.Context, state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) { preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor) if err != nil { klog.Errorf("Error getting the updated preemptor pod object: %v", err) return "", err } - node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(state, preemptor, scheduleErr) + node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, state, preemptor, scheduleErr) if err != nil { klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err) return "", err @@ -547,9 +548,9 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // bind binds a pod to a given node defined in a binding object. We expect this to run asynchronously, so we // handle binding metrics internally. -func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, state *framework.CycleState) error { +func (sched *Scheduler) bind(ctx context.Context, assumed *v1.Pod, targetNode string, state *framework.CycleState) error { bindingStart := time.Now() - bindStatus := sched.Framework.RunBindPlugins(state, assumed, targetNode) + bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode) var err error if !bindStatus.IsSuccess() { if bindStatus.Code() == framework.Skip { @@ -587,7 +588,7 @@ func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, state *framewor } // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. -func (sched *Scheduler) scheduleOne() { +func (sched *Scheduler) scheduleOne(ctx context.Context) { fwk := sched.Framework podInfo := sched.NextPod() @@ -607,7 +608,7 @@ func (sched *Scheduler) scheduleOne() { // Synchronously attempt to find a fit for the pod. start := time.Now() state := framework.NewCycleState() - scheduleResult, err := sched.Algorithm.Schedule(state, pod) + scheduleResult, err := sched.Algorithm.Schedule(ctx, state, pod) if err != nil { sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error()) // Schedule() may have failed because the pod would not fit on any host, so we try to @@ -620,7 +621,7 @@ func (sched *Scheduler) scheduleOne() { " No preemption is performed.") } else { preemptionStartTime := time.Now() - sched.preempt(state, fwk, pod, fitError) + sched.preempt(ctx, state, fwk, pod, fitError) metrics.PreemptionAttempts.Inc() metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime)) @@ -660,7 +661,7 @@ func (sched *Scheduler) scheduleOne() { } // Run "reserve" plugins. - if sts := fwk.RunReservePlugins(state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { + if sts := fwk.RunReservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message()) metrics.PodScheduleErrors.Inc() return @@ -677,7 +678,7 @@ func (sched *Scheduler) scheduleOne() { sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err)) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) return } // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). @@ -689,13 +690,13 @@ func (sched *Scheduler) scheduleOne() { sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error()) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) return } } // Run "permit" plugins. - permitStatus := fwk.RunPermitPlugins(state, assumedPod, scheduleResult.SuggestedHost) + permitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) if !permitStatus.IsSuccess() { var reason string if permitStatus.IsUnschedulable() { @@ -709,13 +710,13 @@ func (sched *Scheduler) scheduleOne() { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message()) return } // Run "prebind" plugins. - preBindStatus := fwk.RunPreBindPlugins(state, assumedPod, scheduleResult.SuggestedHost) + preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) if !preBindStatus.IsSuccess() { var reason string metrics.PodScheduleErrors.Inc() @@ -724,18 +725,18 @@ func (sched *Scheduler) scheduleOne() { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message()) return } - err := sched.bind(assumedPod, scheduleResult.SuggestedHost, state) + err := sched.bind(ctx, assumedPod, scheduleResult.SuggestedHost, state) metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start)) metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) if err != nil { metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err)) } else { // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. @@ -749,7 +750,7 @@ func (sched *Scheduler) scheduleOne() { metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) // Run "postbind" plugins. - fwk.RunPostBindPlugins(state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) } }() } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index c87f1b0c979..8d849e7ec56 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -17,6 +17,7 @@ limitations under the License. package scheduler import ( + "context" "errors" "fmt" "io/ioutil" @@ -150,7 +151,7 @@ type mockScheduler struct { err error } -func (es mockScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) { +func (es mockScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) { return es.result, es.err } @@ -164,7 +165,7 @@ func (es mockScheduler) Extenders() []algorithm.SchedulerExtender { return nil } -func (es mockScheduler) Preempt(state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) { +func (es mockScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) { return nil, nil, nil, nil } @@ -301,7 +302,7 @@ func TestScheduler(t *testing.T) { } close(called) }) - s.scheduleOne() + s.scheduleOne(context.Background()) <-called if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) { t.Errorf("assumed pod: wanted %v, got %v", e, a) @@ -371,7 +372,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { // We use conflicted pod ports to incur fit predicate failure if first pod not removed. secondPod := podWithPort("bar", "", 8080) queuedPodStore.Add(secondPod) - scheduler.scheduleOne() + scheduler.scheduleOne(context.Background()) select { case b := <-bindingChan: expectBinding := &v1.Binding{ @@ -405,7 +406,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { // queuedPodStore: [bar:8080] // cache: [(assumed)foo:8080] - scheduler.scheduleOne() + scheduler.scheduleOne(context.Background()) select { case err := <-errChan: expectErr := &core.FitError{ @@ -436,7 +437,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { } queuedPodStore.Add(secondPod) - scheduler.scheduleOne() + scheduler.scheduleOne(context.Background()) select { case b := <-bindingChan: expectBinding := &v1.Binding{ @@ -496,7 +497,7 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) { informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) - scheduler.Run() + go scheduler.Run(context.Background()) queuedPodStore.Add(firstPod) queuedPodStore.Add(conflictPod) @@ -534,7 +535,7 @@ func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcach // queuedPodStore: [foo:8080] // cache: [] - scheduler.scheduleOne() + scheduler.scheduleOne(context.Background()) // queuedPodStore: [] // cache: [(assumed)foo:8080] @@ -613,7 +614,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { informerFactory.WaitForCacheSync(stop) queuedPodStore.Add(podWithTooBigResourceRequests) - scheduler.scheduleOne() + scheduler.scheduleOne(context.Background()) select { case err := <-errChan: expectErr := &core.FitError{ @@ -890,7 +891,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { } close(eventChan) }) - s.scheduleOne() + s.scheduleOne(context.Background()) // Wait for pod to succeed or fail scheduling select { case <-eventChan: diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index 3c0828f26ae..8b4e7c5d347 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -17,6 +17,7 @@ limitations under the License. package daemonset import ( + "context" "fmt" "net/http/httptest" "testing" @@ -84,10 +85,10 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonS } func setupScheduler( + ctx context.Context, t *testing.T, cs clientset.Interface, informerFactory informers.SharedInformerFactory, - stopCh chan struct{}, ) (restoreFeatureGates func()) { restoreFeatureGates = func() {} // If ScheduleDaemonSetPods is disabled, do not start scheduler. @@ -115,15 +116,15 @@ func setupScheduler( schedulerconfig.SchedulerAlgorithmSource{ Provider: &defaultProviderName, }, - stopCh, + ctx.Done(), ) if err != nil { t.Fatalf("Couldn't create scheduler: %v", err) } - eventBroadcaster.StartRecordingToSink(stopCh) + eventBroadcaster.StartRecordingToSink(ctx.Done()) - go sched.Run() + go sched.Run(ctx) return } @@ -498,14 +499,14 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) { nodeClient := clientset.CoreV1().Nodes() podInformer := informers.Core().V1().Pods().Informer() - stopCh := make(chan struct{}) - defer close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Start Scheduler - defer setupScheduler(t, clientset, informers, stopCh)() + defer setupScheduler(ctx, t, clientset, informers)() - informers.Start(stopCh) - go dc.Run(5, stopCh) + informers.Start(ctx.Done()) + go dc.Run(5, ctx.Done()) ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy @@ -539,14 +540,14 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) { nodeClient := clientset.CoreV1().Nodes() podInformer := informers.Core().V1().Pods().Informer() - stopCh := make(chan struct{}) - defer close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - informers.Start(stopCh) - go dc.Run(5, stopCh) + informers.Start(ctx.Done()) + go dc.Run(5, ctx.Done()) // Start Scheduler - defer setupScheduler(t, clientset, informers, stopCh)() + defer setupScheduler(ctx, t, clientset, informers)() ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy @@ -577,14 +578,14 @@ func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) { nodeClient := clientset.CoreV1().Nodes() podInformer := informers.Core().V1().Pods().Informer() - stopCh := make(chan struct{}) - defer close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - informers.Start(stopCh) - go dc.Run(5, stopCh) + informers.Start(ctx.Done()) + go dc.Run(5, ctx.Done()) // Start Scheduler - defer setupScheduler(t, clientset, informers, stopCh)() + defer setupScheduler(ctx, t, clientset, informers)() ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy @@ -647,14 +648,14 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) { nodeClient := clientset.CoreV1().Nodes() podInformer := informers.Core().V1().Pods().Informer() - stopCh := make(chan struct{}) - defer close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - informers.Start(stopCh) - go dc.Run(5, stopCh) + informers.Start(ctx.Done()) + go dc.Run(5, ctx.Done()) // Start Scheduler - defer setupScheduler(t, clientset, informers, stopCh)() + defer setupScheduler(ctx, t, clientset, informers)() ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy @@ -735,14 +736,15 @@ func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T) podClient := clientset.CoreV1().Pods(ns.Name) podInformer := informers.Core().V1().Pods().Informer() nodeClient := clientset.CoreV1().Nodes() - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go dc.Run(5, stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + informers.Start(ctx.Done()) + go dc.Run(5, ctx.Done()) // Start Scheduler - defer setupScheduler(t, clientset, informers, stopCh)() + defer setupScheduler(ctx, t, clientset, informers)() ds := newDaemonSet("foo", ns.Name) ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m") @@ -799,13 +801,14 @@ func TestLaunchWithHashCollision(t *testing.T) { podInformer := informers.Core().V1().Pods().Informer() nodeClient := clientset.CoreV1().Nodes() - stopCh := make(chan struct{}) - defer close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - informers.Start(stopCh) - go dc.Run(1, stopCh) + informers.Start(ctx.Done()) + go dc.Run(5, ctx.Done()) - defer setupScheduler(t, clientset, informers, stopCh)() + // Start Scheduler + defer setupScheduler(ctx, t, clientset, informers)() // Create single node _, err := nodeClient.Create(newNode("single-node", nil)) @@ -909,14 +912,15 @@ func TestTaintedNode(t *testing.T) { podClient := clientset.CoreV1().Pods(ns.Name) podInformer := informers.Core().V1().Pods().Informer() nodeClient := clientset.CoreV1().Nodes() - stopCh := make(chan struct{}) - defer close(stopCh) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + informers.Start(ctx.Done()) + go dc.Run(5, ctx.Done()) // Start Scheduler - defer setupScheduler(t, clientset, informers, stopCh)() - informers.Start(stopCh) - - go dc.Run(5, stopCh) + defer setupScheduler(ctx, t, clientset, informers)() ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy @@ -978,14 +982,14 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) { nodeClient := clientset.CoreV1().Nodes() podInformer := informers.Core().V1().Pods().Informer() - stopCh := make(chan struct{}) - defer close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - informers.Start(stopCh) - go dc.Run(5, stopCh) + informers.Start(ctx.Done()) + go dc.Run(5, ctx.Done()) // Start Scheduler - defer setupScheduler(t, clientset, informers, stopCh)() + defer setupScheduler(ctx, t, clientset, informers)() ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 5cf1d501ced..94b4f08a2b0 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -17,6 +17,7 @@ limitations under the License. package scheduler import ( + "context" "fmt" "testing" "time" @@ -148,7 +149,7 @@ func (sp *ScorePlugin) reset() { } // Score returns the score of scheduling a pod on a specific node. -func (sp *ScorePlugin) Score(state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { +func (sp *ScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { sp.numScoreCalled++ if sp.failScore { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name)) @@ -179,13 +180,13 @@ func (sp *ScoreWithNormalizePlugin) reset() { } // Score returns the score of scheduling a pod on a specific node. -func (sp *ScoreWithNormalizePlugin) Score(state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { +func (sp *ScoreWithNormalizePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { sp.numScoreCalled++ score := int64(10) return score, nil } -func (sp *ScoreWithNormalizePlugin) NormalizeScore(state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { +func (sp *ScoreWithNormalizePlugin) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { sp.numNormalizeScoreCalled++ return nil } @@ -207,7 +208,7 @@ func (fp *FilterPlugin) reset() { // Filter is a test function that returns an error or nil, depending on the // value of "failFilter". -func (fp *FilterPlugin) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { +func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { fp.numFilterCalled++ if fp.failFilter { @@ -224,7 +225,7 @@ func (rp *ReservePlugin) Name() string { // Reserve is a test function that returns an error or nil, depending on the // value of "failReserve". -func (rp *ReservePlugin) Reserve(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (rp *ReservePlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { rp.numReserveCalled++ if rp.failReserve { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) @@ -243,7 +244,7 @@ func (*PostFilterPlugin) Name() string { } // PostFilter is a test function. -func (pfp *PostFilterPlugin) PostFilter(_ *framework.CycleState, pod *v1.Pod, _ []*v1.Node, _ framework.NodeToStatusMap) *framework.Status { +func (pfp *PostFilterPlugin) PostFilter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, _ []*v1.Node, _ framework.NodeToStatusMap) *framework.Status { pfp.numPostFilterCalled++ if pfp.failPostFilter { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) @@ -264,7 +265,7 @@ func (pp *PreBindPlugin) Name() string { } // PreBind is a test function that returns (true, nil) or errors for testing. -func (pp *PreBindPlugin) PreBind(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { pp.numPreBindCalled++ if pp.failPreBind { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) @@ -288,7 +289,7 @@ func (bp *BindPlugin) Name() string { return bp.PluginName } -func (bp *BindPlugin) Bind(state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { +func (bp *BindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { bp.numBindCalled++ if bp.pluginInvokeEventChan != nil { bp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled} @@ -318,7 +319,7 @@ func (pp *PostBindPlugin) Name() string { } // PostBind is a test function, which counts the number of times called. -func (pp *PostBindPlugin) PostBind(state *framework.CycleState, pod *v1.Pod, nodeName string) { +func (pp *PostBindPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { pp.numPostBindCalled++ if pp.pluginInvokeEventChan != nil { pp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostBindCalled} @@ -341,7 +342,7 @@ func (pp *PreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions { } // PreFilter is a test function that returns (true, nil) or errors for testing. -func (pp *PreFilterPlugin) PreFilter(state *framework.CycleState, pod *v1.Pod) *framework.Status { +func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status { pp.numPreFilterCalled++ if pp.failPreFilter { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) @@ -366,7 +367,7 @@ func (up *UnreservePlugin) Name() string { // Unreserve is a test function that returns an error or nil, depending on the // value of "failUnreserve". -func (up *UnreservePlugin) Unreserve(state *framework.CycleState, pod *v1.Pod, nodeName string) { +func (up *UnreservePlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { up.numUnreserveCalled++ if up.pluginInvokeEventChan != nil { up.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: up.Name(), val: up.numUnreserveCalled} @@ -384,7 +385,7 @@ func (pp *PermitPlugin) Name() string { } // Permit implements the permit test plugin. -func (pp *PermitPlugin) Permit(state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) { +func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) { pp.numPermitCalled++ if pp.failPermit { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0 diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index daf72df29aa..cc7483e2168 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -19,6 +19,7 @@ limitations under the License. package scheduler import ( + "context" "fmt" "testing" "time" @@ -84,7 +85,7 @@ func (fp *tokenFilter) Name() string { return tokenFilterName } -func (fp *tokenFilter) Filter(state *framework.CycleState, pod *v1.Pod, +func (fp *tokenFilter) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { if fp.Tokens > 0 { fp.Tokens-- @@ -97,17 +98,17 @@ func (fp *tokenFilter) Filter(state *framework.CycleState, pod *v1.Pod, return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name)) } -func (fp *tokenFilter) PreFilter(state *framework.CycleState, pod *v1.Pod) *framework.Status { +func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status { return nil } -func (fp *tokenFilter) AddPod(state *framework.CycleState, podToSchedule *v1.Pod, +func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { fp.Tokens-- return nil } -func (fp *tokenFilter) RemovePod(state *framework.CycleState, podToSchedule *v1.Pod, +func (fp *tokenFilter) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { fp.Tokens++ return nil @@ -526,8 +527,8 @@ func TestPodPriorityResolution(t *testing.T) { externalInformers := informers.NewSharedInformerFactory(externalClientset, time.Second) admission.SetExternalKubeClientSet(externalClientset) admission.SetExternalKubeInformerFactory(externalInformers) - externalInformers.Start(context.stopCh) - externalInformers.WaitForCacheSync(context.stopCh) + externalInformers.Start(context.ctx.Done()) + externalInformers.WaitForCacheSync(context.ctx.Done()) tests := []struct { Name string diff --git a/test/integration/scheduler/taint_test.go b/test/integration/scheduler/taint_test.go index cd5086453b0..38660bb8c5b 100644 --- a/test/integration/scheduler/taint_test.go +++ b/test/integration/scheduler/taint_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -116,13 +116,13 @@ func TestTaintNodeByCondition(t *testing.T) { t.Errorf("Failed to create node controller: %v", err) return } - go nc.Run(context.stopCh) + go nc.Run(context.ctx.Done()) // Waiting for all controller sync. - externalInformers.Start(context.stopCh) - externalInformers.WaitForCacheSync(context.stopCh) - informers.Start(context.stopCh) - informers.WaitForCacheSync(context.stopCh) + externalInformers.Start(context.ctx.Done()) + externalInformers.WaitForCacheSync(context.ctx.Done()) + informers.Start(context.ctx.Done()) + informers.WaitForCacheSync(context.ctx.Done()) // ------------------------------------------- // Test TaintNodeByCondition feature. @@ -696,13 +696,13 @@ func TestTaintBasedEvictions(t *testing.T) { return } - go nc.Run(context.stopCh) + go nc.Run(context.ctx.Done()) // Waiting for all controller sync. - externalInformers.Start(context.stopCh) - externalInformers.WaitForCacheSync(context.stopCh) - informers.Start(context.stopCh) - informers.WaitForCacheSync(context.stopCh) + externalInformers.Start(context.ctx.Done()) + externalInformers.WaitForCacheSync(context.ctx.Done()) + informers.Start(context.ctx.Done()) + informers.WaitForCacheSync(context.ctx.Done()) nodeRes := v1.ResourceList{ v1.ResourceCPU: resource.MustParse("4000m"), diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 852a5fc9cac..47547fe680f 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -17,6 +17,7 @@ limitations under the License. package scheduler import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -67,7 +68,8 @@ type testContext struct { clientSet *clientset.Clientset informerFactory informers.SharedInformerFactory scheduler *scheduler.Scheduler - stopCh chan struct{} + ctx context.Context + cancelFn context.CancelFunc } func createAlgorithmSourceFromPolicy(policy *schedulerapi.Policy, clientSet clientset.Interface) schedulerconfig.SchedulerAlgorithmSource { @@ -93,8 +95,10 @@ func createAlgorithmSourceFromPolicy(policy *schedulerapi.Policy, clientSet clie // initTestMasterAndScheduler initializes a test environment and creates a master with default // configuration. func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface) *testContext { + ctx, cancelFunc := context.WithCancel(context.Background()) context := testContext{ - stopCh: make(chan struct{}), + ctx: ctx, + cancelFn: cancelFunc, } // 1. Create master @@ -187,7 +191,7 @@ func initTestSchedulerWithOptions( podInformer, recorder, algorithmSrc, - context.stopCh, + context.ctx.Done(), opts..., ) @@ -207,7 +211,7 @@ func initTestSchedulerWithOptions( context.informerFactory.Start(context.scheduler.StopEverything) context.informerFactory.WaitForCacheSync(context.scheduler.StopEverything) - context.scheduler.Run() + go context.scheduler.Run(context.ctx) return context } @@ -261,7 +265,7 @@ func initTestDisablePreemption(t *testing.T, nsPrefix string) *testContext { // at the end of a test. func cleanupTest(t *testing.T, context *testContext) { // Kill the scheduler. - close(context.stopCh) + context.cancelFn() // Cleanup nodes. context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{}) framework.DeleteTestingNamespace(context.ns, context.httpServer, t) diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 1226b21c5bc..af387617584 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -17,6 +17,7 @@ limitations under the License. package util import ( + "context" "net/http" "net/http/httptest" @@ -59,30 +60,31 @@ func StartApiserver() (string, ShutdownFunc) { // StartScheduler configures and starts a scheduler given a handle to the clientSet interface // and event broadcaster. It returns the running scheduler and the shutdown function to stop it. func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, coreinformers.PodInformer, ShutdownFunc) { + ctx, cancel := context.WithCancel(context.Background()) + informerFactory := informers.NewSharedInformerFactory(clientSet, 0) podInformer := informerFactory.Core().V1().Pods() - stopCh := make(chan struct{}) evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ Interface: clientSet.EventsV1beta1().Events("")}) - evtBroadcaster.StartRecordingToSink(stopCh) + evtBroadcaster.StartRecordingToSink(ctx.Done()) recorder := evtBroadcaster.NewRecorder( legacyscheme.Scheme, v1.DefaultSchedulerName, ) - sched, err := createScheduler(clientSet, informerFactory, podInformer, recorder, stopCh) + sched, err := createScheduler(clientSet, informerFactory, podInformer, recorder, ctx.Done()) if err != nil { klog.Fatalf("Error creating scheduler: %v", err) } - informerFactory.Start(stopCh) - sched.Run() + informerFactory.Start(ctx.Done()) + go sched.Run(ctx) shutdownFunc := func() { klog.Infof("destroying scheduler") - close(stopCh) + cancel() klog.Infof("destroyed scheduler") } return sched, podInformer, shutdownFunc diff --git a/test/integration/volumescheduling/util.go b/test/integration/volumescheduling/util.go index 747a0eecefd..79c4935598e 100644 --- a/test/integration/volumescheduling/util.go +++ b/test/integration/volumescheduling/util.go @@ -17,6 +17,7 @@ limitations under the License. package volumescheduling import ( + "context" "net/http" "net/http/httptest" "testing" @@ -50,14 +51,18 @@ type testContext struct { clientSet *clientset.Clientset informerFactory informers.SharedInformerFactory scheduler *scheduler.Scheduler - stopCh chan struct{} + + ctx context.Context + cancelFn context.CancelFunc } // initTestMaster initializes a test environment and creates a master with default // configuration. func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface) *testContext { + ctx, cancelFunc := context.WithCancel(context.Background()) context := testContext{ - stopCh: make(chan struct{}), + ctx: ctx, + cancelFn: cancelFunc, } // 1. Create master @@ -114,19 +119,18 @@ func initTestSchedulerWithOptions( var err error context.scheduler, err = createSchedulerWithPodInformer( - context.clientSet, podInformer, context.informerFactory, recorder, context.stopCh) + context.clientSet, podInformer, context.informerFactory, recorder, context.ctx.Done()) if err != nil { t.Fatalf("Couldn't create scheduler: %v", err) } - stopCh := make(chan struct{}) - eventBroadcaster.StartRecordingToSink(stopCh) + eventBroadcaster.StartRecordingToSink(context.ctx.Done()) context.informerFactory.Start(context.scheduler.StopEverything) context.informerFactory.WaitForCacheSync(context.scheduler.StopEverything) - context.scheduler.Run() + go context.scheduler.Run(context.ctx) return context } @@ -156,7 +160,7 @@ func createSchedulerWithPodInformer( // at the end of a test. func cleanupTest(t *testing.T, context *testContext) { // Kill the scheduler. - close(context.stopCh) + context.cancelFn() // Cleanup nodes. context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{}) framework.DeleteTestingNamespace(context.ns, context.httpServer, t) diff --git a/test/integration/volumescheduling/volume_binding_test.go b/test/integration/volumescheduling/volume_binding_test.go index b4a0cd18a57..9ef36c97afb 100644 --- a/test/integration/volumescheduling/volume_binding_test.go +++ b/test/integration/volumescheduling/volume_binding_test.go @@ -891,10 +891,10 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod t if err != nil { t.Fatalf("Failed to create PV controller: %v", err) } - go ctrl.Run(context.stopCh) + go ctrl.Run(context.ctx.Done()) // Start informer factory after all controllers are configured and running. - informerFactory.Start(context.stopCh) - informerFactory.WaitForCacheSync(context.stopCh) + informerFactory.Start(context.ctx.Done()) + informerFactory.WaitForCacheSync(context.ctx.Done()) // Create shared objects // Create nodes @@ -915,7 +915,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod t return &testConfig{ client: clientset, ns: ns, - stop: context.stopCh, + stop: context.ctx.Done(), teardown: func() { klog.Infof("test cluster %q start to tear down", ns) deleteTestObjects(clientset, ns, nil)