From 1b3a124ba6049f91175ac2f2b141720af1601ffc Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Fri, 21 May 2021 13:47:06 -0700 Subject: [PATCH] Scheduler now registers event handlers dynamically - move clusterEventMap to Configurator - dynamic event handlers registration for core API resources - dynamic event handlers registration for custom resources --- pkg/scheduler/eventhandlers.go | 181 +++++++++++----------- pkg/scheduler/factory.go | 8 +- pkg/scheduler/factory_test.go | 1 + pkg/scheduler/scheduler.go | 27 +++- test/integration/scheduler/queue_test.go | 184 +++++++++++++++++++++++ vendor/modules.txt | 2 + 6 files changed, 314 insertions(+), 89 deletions(-) diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 15ba318adb5..3d9ae24234e 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -19,7 +19,10 @@ package scheduler import ( "fmt" "reflect" + "strings" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" @@ -39,32 +42,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/profile" ) -func (sched *Scheduler) onPvAdd(obj interface{}) { - // Pods created when there are no PVs available will be stuck in - // unschedulable queue. But unbound PVs created for static provisioning and - // delay binding storage class are skipped in PV controller dynamic - // provisioning and binding process, will not trigger events to schedule pod - // again. So we need to move pods to active queue on PV add for this - // scenario. - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd, nil) -} - -func (sched *Scheduler) onPvUpdate(old, new interface{}) { - // Scheduler.bindVolumesWorker may fail to update assumed pod volume - // bindings due to conflicts if PVs are updated by PV controller or other - // parties, then scheduler will add pod back to unschedulable queue. We - // need to move pods to active queue on PV update for this scenario. - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate, nil) -} - -func (sched *Scheduler) onPvcAdd(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd, nil) -} - -func (sched *Scheduler) onPvcUpdate(old, new interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate, nil) -} - func (sched *Scheduler) onStorageClassAdd(obj interface{}) { sc, ok := obj.(*storagev1.StorageClass) if !ok { @@ -83,18 +60,6 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) { } } -func (sched *Scheduler) onServiceAdd(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd, nil) -} - -func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate, nil) -} - -func (sched *Scheduler) onServiceDelete(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete, nil) -} - func (sched *Scheduler) addNodeToCache(obj interface{}) { node, ok := obj.(*v1.Node) if !ok { @@ -153,14 +118,6 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { } } -func (sched *Scheduler) onCSINodeAdd(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd, nil) -} - -func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate, nil) -} - func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { pod := obj.(*v1.Pod) klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod)) @@ -312,6 +269,8 @@ func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool { func addAllEventHandlers( sched *Scheduler, informerFactory informers.SharedInformerFactory, + dynInformerFactory dynamicinformer.DynamicSharedInformerFactory, + gvkMap map[framework.GVK]framework.ActionType, ) { // scheduled pod cache informerFactory.Core().V1().Pods().Informer().AddEventHandler( @@ -372,46 +331,100 @@ func addAllEventHandlers( }, ) - informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: sched.onCSINodeAdd, - UpdateFunc: sched.onCSINodeUpdate, - }, - ) + buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs { + funcs := cache.ResourceEventHandlerFuncs{} + if at&framework.Add != 0 { + evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: fmt.Sprintf("%vAdd", shortGVK)} + funcs.AddFunc = func(_ interface{}) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil) + } + } + if at&framework.Update != 0 { + evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: fmt.Sprintf("%vUpdate", shortGVK)} + funcs.UpdateFunc = func(_, _ interface{}) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil) + } + } + if at&framework.Delete != 0 { + evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: fmt.Sprintf("%vDelete", shortGVK)} + funcs.DeleteFunc = func(_ interface{}) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil) + } + } + return funcs + } - // On add and update of PVs. - informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ + for gvk, at := range gvkMap { + switch gvk { + case framework.Node, framework.Pod: + // Do nothing. + case framework.CSINode: + informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler( + buildEvtResHandler(at, framework.CSINode, "CSINode"), + ) + case framework.PersistentVolume: // MaxPDVolumeCountPredicate: since it relies on the counts of PV. - AddFunc: sched.onPvAdd, - UpdateFunc: sched.onPvUpdate, - }, - ) - - // This is for MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound. - informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: sched.onPvcAdd, - UpdateFunc: sched.onPvcUpdate, - }, - ) - - // This is for ServiceAffinity: affected by the selector of the service is updated. - // Also, if new service is added, equivalence cache will also become invalid since - // existing pods may be "captured" by this service and change this predicate result. - informerFactory.Core().V1().Services().Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: sched.onServiceAdd, - UpdateFunc: sched.onServiceUpdate, - DeleteFunc: sched.onServiceDelete, - }, - ) - - informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: sched.onStorageClassAdd, - }, - ) + // + // PvAdd: Pods created when there are no PVs available will be stuck in + // unschedulable queue. But unbound PVs created for static provisioning and + // delay binding storage class are skipped in PV controller dynamic + // provisioning and binding process, will not trigger events to schedule pod + // again. So we need to move pods to active queue on PV add for this + // scenario. + // + // PvUpdate: Scheduler.bindVolumesWorker may fail to update assumed pod volume + // bindings due to conflicts if PVs are updated by PV controller or other + // parties, then scheduler will add pod back to unschedulable queue. We + // need to move pods to active queue on PV update for this scenario. + informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler( + buildEvtResHandler(at, framework.PersistentVolume, "Pv"), + ) + case framework.PersistentVolumeClaim: + // MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound. + informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler( + buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"), + ) + case framework.StorageClass: + if at&framework.Add != 0 { + informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: sched.onStorageClassAdd, + }, + ) + } + case framework.Service: + // ServiceAffinity: affected by the selector of the service is updated. + // Also, if new service is added, equivalence cache will also become invalid since + // existing pods may be "captured" by this service and change this predicate result. + informerFactory.Core().V1().Services().Informer().AddEventHandler( + buildEvtResHandler(at, framework.Service, "Service"), + ) + default: + // Tests may not instantiate dynInformerFactory. + if dynInformerFactory == nil { + continue + } + // GVK is expected to be at least 3-folded, separated by dots. + // .. + // Valid examples: + // - foos.v1.example.com + // - bars.v1beta1.a.b.c + // Invalid examples: + // - foos.v1 (2 sections) + // - foo.v1.example.com (the first section should be plural) + if strings.Count(string(gvk), ".") < 2 { + klog.ErrorS(nil, "incorrect event registration", "gvk", gvk) + continue + } + // Fall back to try dynamic informers. + gvr, _ := schema.ParseResourceArg(string(gvk)) + dynInformer := dynInformerFactory.ForResource(*gvr).Informer() + dynInformer.AddEventHandler( + buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)), + ) + go dynInformer.Run(sched.StopEverything) + } + } } func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent { diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index fb501219672..0c071b9a700 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -86,6 +86,8 @@ type Configurator struct { extenders []schedulerapi.Extender frameworkCapturer FrameworkCapturer parallellism int32 + // A "cluster event" -> "plugin names" map. + clusterEventMap map[framework.ClusterEvent]sets.String } // create a scheduler from a set of registered plugins. @@ -135,8 +137,6 @@ func (c *Configurator) create() (*Scheduler, error) { // The nominator will be passed all the way to framework instantiation. nominator := internalqueue.NewPodNominator() - // It's a "cluster event" -> "plugin names" map. - clusterEventMap := make(map[framework.ClusterEvent]sets.String) profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory, frameworkruntime.WithClientSet(c.client), frameworkruntime.WithKubeConfig(c.kubeConfig), @@ -145,7 +145,7 @@ func (c *Configurator) create() (*Scheduler, error) { frameworkruntime.WithRunAllFilters(c.alwaysCheckAllPredicates), frameworkruntime.WithPodNominator(nominator), frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(c.frameworkCapturer)), - frameworkruntime.WithClusterEventMap(clusterEventMap), + frameworkruntime.WithClusterEventMap(c.clusterEventMap), frameworkruntime.WithParallelism(int(c.parallellism)), ) if err != nil { @@ -162,7 +162,7 @@ func (c *Configurator) create() (*Scheduler, error) { internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodNominator(nominator), - internalqueue.WithClusterEventMap(clusterEventMap), + internalqueue.WithClusterEventMap(c.clusterEventMap), ) // Setup cache debugger. diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 9694ca3c4b3..b84f054acf5 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -648,6 +648,7 @@ func newConfigFactoryWithFrameworkRegistry( }, recorderFactory: recorderFactory, nodeInfoSnapshot: snapshot, + clusterEventMap: make(map[framework.ClusterEvent]sets.String), } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index a8cef0fb50a..8d4d734c731 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -29,7 +29,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" @@ -221,6 +224,7 @@ func New(client clientset.Interface, } snapshot := internalcache.NewEmptySnapshot() + clusterEventMap := make(map[framework.ClusterEvent]sets.String) configurator := &Configurator{ client: client, @@ -238,6 +242,7 @@ func New(client clientset.Interface, extenders: options.extenders, frameworkCapturer: options.frameworkCapturer, parallellism: options.parallelism, + clusterEventMap: clusterEventMap, } metrics.Register() @@ -281,10 +286,30 @@ func New(client clientset.Interface, sched.StopEverything = stopEverything sched.client = client - addAllEventHandlers(sched, informerFactory) + // Build dynamic client and dynamic informer factory + var dynInformerFactory dynamicinformer.DynamicSharedInformerFactory + // options.kubeConfig can be nil in tests. + if options.kubeConfig != nil { + dynClient := dynamic.NewForConfigOrDie(options.kubeConfig) + dynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil) + } + + addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap)) return sched, nil } +func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType { + gvkMap := make(map[framework.GVK]framework.ActionType) + for evt := range m { + if _, ok := gvkMap[evt.Resource]; ok { + gvkMap[evt.Resource] |= evt.ActionType + } else { + gvkMap[evt.Resource] = evt.ActionType + } + } + return gvkMap +} + // initPolicyFromFile initialize policy from file func initPolicyFromFile(policyFile string, policy *schedulerapi.Policy) error { // Use a policy serialized in a file. diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index fd0e22e036d..895f1441179 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -17,18 +17,31 @@ limitations under the License. package scheduler import ( + "context" "fmt" "testing" "time" v1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/scheduler" schedapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" + testfwk "k8s.io/kubernetes/test/integration/framework" testutils "k8s.io/kubernetes/test/integration/util" imageutils "k8s.io/kubernetes/test/utils/image" ) @@ -148,3 +161,174 @@ func TestServiceAffinityEnqueue(t *testing.T) { t.Errorf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts) } } + +var _ framework.FilterPlugin = &fakeCRPlugin{} +var _ framework.EnqueueExtensions = &fakeCRPlugin{} + +type fakeCRPlugin struct{} + +func (f *fakeCRPlugin) Name() string { + return "fakeCRPlugin" +} + +func (f *fakeCRPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { + return framework.NewStatus(framework.Unschedulable, "always fail") +} + +// EventsToRegister returns the possible events that may make a Pod +// failed by this plugin schedulable. +func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEvent { + return []framework.ClusterEvent{ + {Resource: "foos.v1.example.com", ActionType: framework.All}, + } +} + +// TestCustomResourceEnqueue constructs a fake plugin that registers custom resources +// to verify Pods failed by this plugin can be moved properly upon CR events. +func TestCustomResourceEnqueue(t *testing.T) { + // Start API Server with apiextensions supported. + server := apiservertesting.StartTestServerOrDie( + t, apiservertesting.NewDefaultTestServerOptions(), + []string{"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition", "--runtime-config=api/all=true"}, + testfwk.SharedEtcd(), + ) + testCtx := &testutils.TestContext{} + testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background()) + testCtx.CloseFn = func() { server.TearDownFn() } + + apiExtensionClient := apiextensionsclient.NewForConfigOrDie(server.ClientConfig) + dynamicClient := dynamic.NewForConfigOrDie(server.ClientConfig) + + // Create a Foo CRD. + fooCRD := &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foos.example.com", + }, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: "example.com", + Scope: apiextensionsv1.NamespaceScoped, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: "foos", + Kind: "Foo", + }, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Served: true, + Storage: true, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "field": {Type: "string"}, + }, + }, + }, + }, + }, + }, + } + var err error + fooCRD, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Create(testCtx.Ctx, fooCRD, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + registry := frameworkruntime.Registry{ + "fakeCRPlugin": func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return &fakeCRPlugin{}, nil + }, + } + profile := schedapi.KubeSchedulerProfile{ + SchedulerName: v1.DefaultSchedulerName, + Plugins: &schedapi.Plugins{ + Filter: schedapi.PluginSet{ + Enabled: []schedapi.Plugin{ + {Name: "fakeCRPlugin"}, + }, + }, + }, + } + + testCtx.KubeConfig = server.ClientConfig + testCtx.ClientSet = kubernetes.NewForConfigOrDie(server.ClientConfig) + testCtx.NS, err = testCtx.ClientSet.CoreV1().Namespaces().Create(testCtx.Ctx, &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("cr-enqueue-%v", string(uuid.NewUUID()))}}, metav1.CreateOptions{}) + if err != nil && !errors.IsAlreadyExists(err) { + t.Fatalf("Failed to integration test ns: %v", err) + } + + // Use zero backoff seconds to bypass backoffQ. + testCtx = testutils.InitTestSchedulerWithOptions( + t, + testCtx, + nil, + scheduler.WithProfiles(profile), + scheduler.WithFrameworkOutOfTreeRegistry(registry), + scheduler.WithPodInitialBackoffSeconds(0), + scheduler.WithPodMaxBackoffSeconds(0), + ) + testutils.SyncInformerFactory(testCtx) + // It's intended to not start the scheduler's queue, and hence to + // not start any flushing logic. We will pop and schedule the Pods manually later. + defer testutils.CleanupTest(t, testCtx) + + cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx + // Create one Node. + node := st.MakeNode().Name("fake-node").Obj() + if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Node %q: %v", node.Name, err) + } + + // Create a testing Pod. + pause := imageutils.GetPauseImageName() + pod := st.MakePod().Namespace(ns).Name("fake-pod").Container(pause).Obj() + if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Pod %q: %v", pod.Name, err) + } + + // Wait for the testing Pod to be present in the scheduling queue. + if err := wait.Poll(time.Millisecond*200, wait.ForeverTestTimeout, func() (bool, error) { + return len(testCtx.Scheduler.SchedulingQueue.PendingPods()) == 1, nil + }); err != nil { + t.Fatal(err) + } + + // Pop fake-pod out. It should be unschedulable. + podInfo := testCtx.Scheduler.NextPod() + fwk, ok := testCtx.Scheduler.Profiles[podInfo.Pod.Spec.SchedulerName] + if !ok { + t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name) + } + // Schedule the Pod manually. + _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, fwk, framework.NewCycleState(), podInfo.Pod) + // The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin. + if fitError == nil { + t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name) + } + testCtx.Scheduler.Error(podInfo, fitError) + + // Scheduling cycle is incremented from 0 to 1 after NextPod() is called, so + // pass a number larger than 1 to move Pod to unschedulableQ. + testCtx.Scheduler.SchedulingQueue.AddUnschedulableIfNotPresent(podInfo, 10) + + // Trigger a Custom Resource event. + // We expect this event to trigger moving the test Pod from unschedulableQ to activeQ. + crdGVR := schema.GroupVersionResource{Group: fooCRD.Spec.Group, Version: fooCRD.Spec.Versions[0].Name, Resource: "foos"} + crClient := dynamicClient.Resource(crdGVR).Namespace(ns) + if _, err := crClient.Create(ctx, &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "example.com/v1", + "kind": "Foo", + "metadata": map[string]interface{}{"name": "foo1"}, + }, + }, metav1.CreateOptions{}); err != nil { + t.Fatalf("Unable to create cr: %v", err) + } + + // Now we should be able to pop the Pod from activeQ again. + podInfo = testCtx.Scheduler.NextPod() + if podInfo.Attempts != 2 { + t.Errorf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 2e5890fc886..1d22f2efcca 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1593,6 +1593,8 @@ k8s.io/client-go/discovery/cached/disk k8s.io/client-go/discovery/cached/memory k8s.io/client-go/discovery/fake k8s.io/client-go/dynamic +k8s.io/client-go/dynamic/dynamicinformer +k8s.io/client-go/dynamic/dynamiclister k8s.io/client-go/dynamic/fake k8s.io/client-go/informers k8s.io/client-go/informers/admissionregistration