diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go
index a68db516b68..8a31cc4baa5 100644
--- a/pkg/scheduler/eventhandlers.go
+++ b/pkg/scheduler/eventhandlers.go
@@ -514,7 +514,7 @@ func addAllEventHandlers(
 		}
 		handlers = append(handlers, handlerRegistration)
 	case framework.PodSchedulingContext:
-		if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+		if utilfeature.DefaultFeatureGate.Enabled(features.DRAControlPlaneController) {
 			if handlerRegistration, err = informerFactory.Resource().V1alpha3().PodSchedulingContexts().Informer().AddEventHandler(
 				buildEvtResHandler(at, framework.PodSchedulingContext, "PodSchedulingContext"),
 			); err != nil {
@@ -529,6 +529,15 @@ func addAllEventHandlers(
 			)
 			handlers = append(handlers, handlerRegistration)
 		}
+	case framework.ResourceSlice:
+		if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+			if handlerRegistration, err = informerFactory.Resource().V1alpha3().ResourceSlices().Informer().AddEventHandler(
+				buildEvtResHandler(at, framework.ResourceSlice, "ResourceSlice"),
+			); err != nil {
+				return err
+			}
+			handlers = append(handlers, handlerRegistration)
+		}
 	case framework.DeviceClass:
 		if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
 			if handlerRegistration, err = informerFactory.Resource().V1alpha3().DeviceClasses().Informer().AddEventHandler(
diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go
index 365c8dc7726..a053fa4bc55 100644
--- a/pkg/scheduler/eventhandlers_test.go
+++ b/pkg/scheduler/eventhandlers_test.go
@@ -216,6 +216,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 		name                   string
 		gvkMap                 map[framework.GVK]framework.ActionType
 		enableDRA              bool
+		enableClassicDRA       bool
 		expectStaticInformers  map[reflect.Type]bool
 		expectDynamicInformers map[schema.GroupVersionResource]bool
 	}{
@@ -234,6 +235,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 			gvkMap: map[framework.GVK]framework.ActionType{
 				framework.PodSchedulingContext: framework.Add,
 				framework.ResourceClaim:        framework.Add,
+				framework.ResourceSlice:        framework.Add,
 				framework.DeviceClass:          framework.Add,
 			},
 			expectStaticInformers: map[reflect.Type]bool{
@@ -244,19 +246,41 @@ func TestAddAllEventHandlers(t *testing.T) {
 			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
 		},
 		{
-			name: "DRA events enabled",
+			name: "some DRA events enabled",
 			gvkMap: map[framework.GVK]framework.ActionType{
 				framework.PodSchedulingContext: framework.Add,
 				framework.ResourceClaim:        framework.Add,
+				framework.ResourceSlice:        framework.Add,
 				framework.DeviceClass:          framework.Add,
 			},
 			enableDRA: true,
+			expectStaticInformers: map[reflect.Type]bool{
+				reflect.TypeOf(&v1.Pod{}):                    true,
+				reflect.TypeOf(&v1.Node{}):                   true,
+				reflect.TypeOf(&v1.Namespace{}):              true,
+				reflect.TypeOf(&resourceapi.ResourceClaim{}): true,
+				reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
+				reflect.TypeOf(&resourceapi.DeviceClass{}):   true,
+			},
+			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
+		},
+		{
+			name: "all DRA events enabled",
+			gvkMap: map[framework.GVK]framework.ActionType{
+				framework.PodSchedulingContext: framework.Add,
+				framework.ResourceClaim:        framework.Add,
+				framework.ResourceSlice:        framework.Add,
+				framework.DeviceClass:          framework.Add,
+			},
+			enableDRA:        true,
+			enableClassicDRA: true,
 			expectStaticInformers: map[reflect.Type]bool{
 				reflect.TypeOf(&v1.Pod{}):       true,
 				reflect.TypeOf(&v1.Node{}):      true,
 				reflect.TypeOf(&v1.Namespace{}): true,
 				reflect.TypeOf(&resourceapi.PodSchedulingContext{}): true,
 				reflect.TypeOf(&resourceapi.ResourceClaim{}):        true,
+				reflect.TypeOf(&resourceapi.ResourceSlice{}):        true,
 				reflect.TypeOf(&resourceapi.DeviceClass{}):          true,
 			},
 			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
@@ -320,6 +344,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA)
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRAControlPlaneController, tt.enableClassicDRA)
 			logger, ctx := ktesting.NewTestContext(t)
 			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
index 81589fbfa34..ccbdb16b76a 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -398,6 +398,8 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
 		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
 		// A pod might be waiting for a class to get created or modified.
 		{Event: framework.ClusterEvent{Resource: framework.DeviceClass, ActionType: framework.Add | framework.Update}},
+		// Adding or updating a ResourceSlice might make a pod schedulable because new resources became available.
+		{Event: framework.ClusterEvent{Resource: framework.ResourceSlice, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterResourceSliceChange},
 	}

 	if pl.podSchedulingContextLister != nil {
@@ -445,7 +447,7 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
 		// This is not an unexpected error: we know that
 		// foreachPodResourceClaim only returns errors for "not
 		// schedulable".
-		logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error())
+		logger.V(6).Info("pod is not schedulable after resource claim change", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error())
 		return framework.QueueSkip, nil
 	}

@@ -491,6 +493,38 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
 	return framework.Queue, nil
 }

+// isSchedulableAfterResourceSliceChange is invoked for add and update slice events reported by
+// an informer. Such changes can make an unschedulable pod schedulable when the pod requests a device
+// and the change adds a suitable device.
+//
+// For the sake of faster execution and avoiding code duplication, isSchedulableAfterResourceSliceChange
+// only checks whether the pod uses claims. All of the more detailed checks are done in the scheduling
+// attempt.
+//
+// The delete slice event will not invoke it, so newObj will never be nil.
+func (pl *dynamicResources) isSchedulableAfterResourceSliceChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
+	_, modifiedSlice, err := schedutil.As[*resourceapi.ResourceSlice](oldObj, newObj)
+	if err != nil {
+		// Shouldn't happen.
+		return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterResourceSliceChange: %w", err)
+	}
+
+	if err := pl.foreachPodResourceClaim(pod, nil); err != nil {
+		// This is not an unexpected error: we know that
+		// foreachPodResourceClaim only returns errors for "not
+		// schedulable".
+		logger.V(6).Info("pod is not schedulable after resource slice change", "pod", klog.KObj(pod), "resourceSlice", klog.KObj(modifiedSlice), "reason", err.Error())
+		return framework.QueueSkip, nil
+	}
+
+	// We could check what got changed in the slice, but right now that's likely to be
+	// about the spec (there's no status yet...).
+	// We could check whether all claims use classic DRA, but that doesn't seem worth it.
+	// Let's assume that changing the slice may make the pod schedulable.
+	logger.V(5).Info("ResourceSlice change might make pod schedulable", "pod", klog.KObj(pod), "resourceSlice", klog.KObj(modifiedSlice))
+	return framework.Queue, nil
+}
+
 // isSchedulableAfterPodSchedulingContextChange is invoked for all
 // PodSchedulingContext events reported by an informer. It checks whether that
 // change made a previously unschedulable pod schedulable (updated) or a new
diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go
index 870f1c5cdd2..4743306ca63 100644
--- a/pkg/scheduler/framework/types.go
+++ b/pkg/scheduler/framework/types.go
@@ -125,6 +125,7 @@ const (
 	StorageClass         GVK = "storage.k8s.io/StorageClass"
 	PodSchedulingContext GVK = "PodSchedulingContext"
 	ResourceClaim        GVK = "ResourceClaim"
+	ResourceSlice        GVK = "ResourceSlice"
 	DeviceClass          GVK = "DeviceClass"

 	// WildCard is a special GVK to match all resources.
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
index 61334e00ba4..c7bdd1f4247 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
@@ -208,7 +208,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
 			alloc.requestData[requestIndices{claimIndex: claimIndex, requestIndex: requestIndex}] = requestData
 			numDevices += requestData.numDevices
 		}
-		alloc.logger.Info("Checked claim", "claim", klog.KObj(claim), "numDevices", numDevices)
+		alloc.logger.V(6).Info("Checked claim", "claim", klog.KObj(claim), "numDevices", numDevices)

 		// Check that we don't end up with too many results.
 		if numDevices > resourceapi.AllocationResultsMaxSize {
diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go
index eb9b424bed2..a3fd78ba2de 100644
--- a/test/e2e/dra/deploy.go
+++ b/test/e2e/dra/deploy.go
@@ -77,69 +77,81 @@ type Nodes struct {
 var pluginPermissions string

 // NewNodes selects nodes to run the test on.
+//
+// Call this outside of ginkgo.It, then use the instance inside ginkgo.It.
 func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
 	nodes := &Nodes{}
 	ginkgo.BeforeEach(func(ctx context.Context) {
-
-		ginkgo.By("selecting nodes")
-		// The kubelet plugin is harder. We deploy the builtin manifest
-		// after patching in the driver name and all nodes on which we
-		// want the plugin to run.
-		//
-		// Only a subset of the nodes are picked to avoid causing
-		// unnecessary load on a big cluster.
-		nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
-		framework.ExpectNoError(err, "get nodes")
-		numNodes := int32(len(nodeList.Items))
-		if int(numNodes) < minNodes {
-			e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes, numNodes)
-		}
-		nodes.NodeNames = nil
-		for _, node := range nodeList.Items {
-			nodes.NodeNames = append(nodes.NodeNames, node.Name)
-		}
-		sort.Strings(nodes.NodeNames)
-		framework.Logf("testing on nodes %v", nodes.NodeNames)
-
-		// Watch claims in the namespace. This is useful for monitoring a test
-		// and enables additional sanity checks.
-		claimInformer := resourceapiinformer.NewResourceClaimInformer(f.ClientSet, f.Namespace.Name, 100*time.Hour /* resync */, nil)
-		cancelCtx, cancel := context.WithCancelCause(context.Background())
-		var wg sync.WaitGroup
-		ginkgo.DeferCleanup(func() {
-			cancel(errors.New("test has completed"))
-			wg.Wait()
-		})
-		_, err = claimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj any) {
-				defer ginkgo.GinkgoRecover()
-				claim := obj.(*resourceapi.ResourceClaim)
-				framework.Logf("New claim:\n%s", format.Object(claim, 1))
-				validateClaim(claim)
-			},
-			UpdateFunc: func(oldObj, newObj any) {
-				defer ginkgo.GinkgoRecover()
-				oldClaim := oldObj.(*resourceapi.ResourceClaim)
-				newClaim := newObj.(*resourceapi.ResourceClaim)
-				framework.Logf("Updated claim:\n%s\nDiff:\n%s", format.Object(newClaim, 1), cmp.Diff(oldClaim, newClaim))
-				validateClaim(newClaim)
-			},
-			DeleteFunc: func(obj any) {
-				defer ginkgo.GinkgoRecover()
-				claim := obj.(*resourceapi.ResourceClaim)
-				framework.Logf("Deleted claim:\n%s", format.Object(claim, 1))
-			},
-		})
-		framework.ExpectNoError(err, "AddEventHandler")
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			claimInformer.Run(cancelCtx.Done())
-		}()
+		nodes.init(ctx, f, minNodes, maxNodes)
 	})
 	return nodes
 }

+// NewNodesNow is a variant of NewNodes which can be used inside a ginkgo.It.
+func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) *Nodes {
+	nodes := &Nodes{}
+	nodes.init(ctx, f, minNodes, maxNodes)
+	return nodes
+}
+
+func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) {
+	ginkgo.By("selecting nodes")
+	// The kubelet plugin is harder. We deploy the builtin manifest
+	// after patching in the driver name and all nodes on which we
+	// want the plugin to run.
+	//
+	// Only a subset of the nodes are picked to avoid causing
+	// unnecessary load on a big cluster.
+	nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
+	framework.ExpectNoError(err, "get nodes")
+	numNodes := int32(len(nodeList.Items))
+	if int(numNodes) < minNodes {
+		e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes, numNodes)
+	}
+	nodes.NodeNames = nil
+	for _, node := range nodeList.Items {
+		nodes.NodeNames = append(nodes.NodeNames, node.Name)
+	}
+	sort.Strings(nodes.NodeNames)
+	framework.Logf("testing on nodes %v", nodes.NodeNames)
+
+	// Watch claims in the namespace. This is useful for monitoring a test
+	// and enables additional sanity checks.
+	claimInformer := resourceapiinformer.NewResourceClaimInformer(f.ClientSet, f.Namespace.Name, 100*time.Hour /* resync */, nil)
+	cancelCtx, cancel := context.WithCancelCause(context.Background())
+	var wg sync.WaitGroup
+	ginkgo.DeferCleanup(func() {
+		cancel(errors.New("test has completed"))
+		wg.Wait()
+	})
+	_, err = claimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj any) {
+			defer ginkgo.GinkgoRecover()
+			claim := obj.(*resourceapi.ResourceClaim)
+			framework.Logf("New claim:\n%s", format.Object(claim, 1))
+			validateClaim(claim)
+		},
+		UpdateFunc: func(oldObj, newObj any) {
+			defer ginkgo.GinkgoRecover()
+			oldClaim := oldObj.(*resourceapi.ResourceClaim)
+			newClaim := newObj.(*resourceapi.ResourceClaim)
+			framework.Logf("Updated claim:\n%s\nDiff:\n%s", format.Object(newClaim, 1), cmp.Diff(oldClaim, newClaim))
+			validateClaim(newClaim)
+		},
+		DeleteFunc: func(obj any) {
+			defer ginkgo.GinkgoRecover()
+			claim := obj.(*resourceapi.ResourceClaim)
+			framework.Logf("Deleted claim:\n%s", format.Object(claim, 1))
+		},
+	})
+	framework.ExpectNoError(err, "AddEventHandler")
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		claimInformer.Run(cancelCtx.Done())
+	}()
+}
+
 func validateClaim(claim *resourceapi.ResourceClaim) {
 	// The apiserver doesn't enforce that a claim always has a finalizer
 	// while being allocated. This is a convention that whoever allocates a
@@ -153,28 +165,43 @@ func validateClaim(claim *resourceapi.ResourceClaim) {
 // NewDriver sets up controller (as client of the cluster) and
 // kubelet plugin (via proxy) before the test runs. It cleans
 // up after the test.
+//
+// Call this outside of ginkgo.It, then use the instance inside ginkgo.It.
 func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) *Driver {
-	d := &Driver{
-		f:            f,
-		fail:         map[MethodInstance]bool{},
-		callCounts:   map[MethodInstance]int64{},
-		NodeV1alpha3: true,
-	}
+	d := NewDriverInstance(f)
 	ginkgo.BeforeEach(func() {
-		resources := configureResources()
-		if len(resources.Nodes) == 0 {
-			// This always has to be set because the driver might
-			// not run on all nodes.
-			resources.Nodes = nodes.NodeNames
-		}
-		ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
-		d.SetUp(nodes, resources, devicesPerNode...)
-		ginkgo.DeferCleanup(d.TearDown)
+		d.Run(nodes, configureResources, devicesPerNode...)
 	})
 	return d
 }

+// NewDriverInstance is a variant of NewDriver where the driver is inactive and must
+// be started explicitly with Run. May be used inside ginkgo.It.
+func NewDriverInstance(f *framework.Framework) *Driver {
+	d := &Driver{
+		f:             f,
+		fail:          map[MethodInstance]bool{},
+		callCounts:    map[MethodInstance]int64{},
+		NodeV1alpha3:  true,
+		parameterMode: parameterModeStructured,
+	}
+	d.initName()
+	return d
+}
+
+func (d *Driver) Run(nodes *Nodes, configureResources func() app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
+	resources := configureResources()
+	if len(resources.Nodes) == 0 {
+		// This always has to be set because the driver might
+		// not run on all nodes.
+		resources.Nodes = nodes.NodeNames
+	}
+	ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
+	d.SetUp(nodes, resources, devicesPerNode...)
+	ginkgo.DeferCleanup(d.TearDown)
+}
+
 type MethodInstance struct {
 	Nodename   string
 	FullMethod string
 }
@@ -215,25 +242,23 @@ const (
 	parameterModeStructured parameterMode = "structured" // allocation through scheduler
 )

-func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
-	ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
-	d.Nodes = make(map[string]KubeletPlugin)
+func (d *Driver) initName() {
 	d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
+}
+
+func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
+	d.initName()
+	ginkgo.By(fmt.Sprintf("deploying driver %s on nodes %v", d.Name, nodes.NodeNames))
+	d.Nodes = make(map[string]KubeletPlugin)
 	resources.DriverName = d.Name

 	ctx, cancel := context.WithCancel(context.Background())
-	if d.NameSuffix != "" {
-		logger := klog.FromContext(ctx)
-		logger = klog.LoggerWithName(logger, "instance"+d.NameSuffix)
-		ctx = klog.NewContext(ctx, logger)
-	}
+	logger := klog.FromContext(ctx)
+	logger = klog.LoggerWithValues(logger, "driverName", d.Name)
+	ctx = klog.NewContext(ctx, logger)
 	d.ctx = ctx
 	d.cleanup = append(d.cleanup, cancel)

-	if d.parameterMode == "" {
-		d.parameterMode = parameterModeStructured
-	}
-
 	switch d.parameterMode {
 	case parameterModeClassicDRA:
 		// The controller is easy: we simply connect to the API server.
@@ -387,7 +412,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...
 		// Here we merely use impersonation, which is faster.
 		driverClient := d.impersonateKubeletPlugin(&pod)

-		logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
+		logger := klog.LoggerWithValues(klog.LoggerWithName(logger, "kubelet-plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
 		loggerCtx := klog.NewContext(ctx, logger)
 		fileOps := app.FileOperations{
 			Create: func(name string, content []byte) error {
diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go
index e775e0bacae..e8c176b6c91 100644
--- a/test/e2e/dra/dra.go
+++ b/test/e2e/dra/dra.go
@@ -1390,6 +1390,31 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 	ginkgo.Context("multiple drivers", func() {
 		multipleDriversContext("using only drapbv1alpha3", true)
 	})
+
+	ginkgo.It("runs pod after driver starts", func(ctx context.Context) {
+		nodes := NewNodesNow(ctx, f, 1, 4)
+		driver := NewDriverInstance(f)
+		b := newBuilderNow(ctx, f, driver)
+
+		claim := b.externalClaim()
+		pod := b.podExternal()
+		b.create(ctx, claim, pod)
+
+		// Cannot run pod, no devices.
+		framework.ExpectNoError(e2epod.WaitForPodNameUnschedulableInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace))
+
+		// Set up driver, which makes devices available.
+		driver.Run(nodes, perNode(1, nodes))
+
+		// Now it should run.
+		b.testPod(ctx, f.ClientSet, pod)
+
+		// We need to clean up explicitly because the normal
+		// cleanup doesn't work (driver shuts down first).
+		// framework.ExpectNoError(f.ClientSet.ResourceV1alpha3().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
+		framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
+		framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
+	})
 })

 // builder contains a running counter to make objects unique within thir
@@ -1676,16 +1701,20 @@ func testContainerEnv(ctx context.Context, clientSet kubernetes.Interface, pod *

 func newBuilder(f *framework.Framework, driver *Driver) *builder {
 	b := &builder{f: f, driver: driver}
-
 	ginkgo.BeforeEach(b.setUp)
-
 	return b
 }

-func (b *builder) setUp() {
+func newBuilderNow(ctx context.Context, f *framework.Framework, driver *Driver) *builder {
+	b := &builder{f: f, driver: driver}
+	b.setUp(ctx)
+	return b
+}
+
+func (b *builder) setUp(ctx context.Context) {
 	b.podCounter = 0
 	b.claimCounter = 0
-	b.create(context.Background(), b.class())
+	b.create(ctx, b.class())
 	ginkgo.DeferCleanup(b.tearDown)
 }
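
Note on the queueing-hint wiring above: `EventsToRegister` now pairs the ResourceSlice Add|Update event with `pl.isSchedulableAfterResourceSliceChange`, and that hint only distinguishes "pod references claims" (Queue) from "pod has no claims" (QueueSkip), leaving all device matching to the next scheduling attempt. The sketch below illustrates how such a hint is consumed; it is a self-contained toy, not the scheduler's queue code. The `QueueingHint`/`Queue`/`QueueSkip` names mirror the framework constants, while `hintFn`, `onEvent`, and the example pods are made up for illustration. It also assumes the convention that an error from a hint is treated as Queue, which is why the hint above returns `(framework.Queue, err)` when the object cast fails.

```go
package main

import "fmt"

// QueueingHint mirrors the two decisions a scheduler queueing hint can make.
type QueueingHint int

const (
	// QueueSkip: the event cannot help this pod, leave it unschedulable.
	QueueSkip QueueingHint = iota
	// Queue: the event might make the pod schedulable, retry it.
	Queue
)

// hintFn is the shape of a per-plugin hint: given the event's old/new objects,
// decide whether the unschedulable pod deserves another scheduling attempt.
type hintFn func(pod string, oldObj, newObj any) (QueueingHint, error)

// onEvent shows the consumption pattern: an error falls back to Queue (an
// extra retry is cheaper than a missed wake-up), QueueSkip keeps the pod in
// the unschedulable pool, Queue moves it back for another attempt.
func onEvent(pod string, oldObj, newObj any, fn hintFn) {
	hint, err := fn(pod, oldObj, newObj)
	if err != nil {
		hint = Queue
	}
	if hint == QueueSkip {
		fmt.Printf("%s: stays unschedulable\n", pod)
		return
	}
	fmt.Printf("%s: requeued for another scheduling attempt\n", pod)
}

func main() {
	// A hint in the spirit of isSchedulableAfterResourceSliceChange: only
	// queue pods that actually reference resource claims.
	usesClaims := map[string]bool{"pod-with-claim": true, "plain-pod": false}
	sliceHint := func(pod string, _, _ any) (QueueingHint, error) {
		if !usesClaims[pod] {
			return QueueSkip, nil
		}
		return Queue, nil
	}
	onEvent("plain-pod", nil, nil, sliceHint)      // stays unschedulable
	onEvent("pod-with-claim", nil, nil, sliceHint) // requeued
}
```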
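The hint converts the informer's untyped old/new objects with `schedutil.As[*resourceapi.ResourceSlice](oldObj, newObj)`. Below is a minimal, hypothetical re-implementation of that kind of generic helper, assuming only the essential behavior (nil inputs stay zero-valued, a non-nil object of the wrong type is an error); the real helper lives in the scheduler's util package and may handle additional cases such as tombstone objects from the informer cache.

```go
package main

import "fmt"

// As converts the old and new objects delivered with an informer event to T.
// Nil inputs stay zero-valued (oldObj is nil for add events); a non-nil object
// of the wrong type is reported as an error.
func As[T any](oldObj, newObj any) (T, T, error) {
	var oldT, newT T
	if oldObj != nil {
		o, ok := oldObj.(T)
		if !ok {
			return oldT, newT, fmt.Errorf("expected %T, got %T", oldT, oldObj)
		}
		oldT = o
	}
	if newObj != nil {
		n, ok := newObj.(T)
		if !ok {
			return oldT, newT, fmt.Errorf("expected %T, got %T", newT, newObj)
		}
		newT = n
	}
	return oldT, newT, nil
}

// resourceSlice is a stand-in for resourceapi.ResourceSlice.
type resourceSlice struct{ name string }

func main() {
	// Add event: oldObj is nil, newObj carries the new slice.
	_, slice, err := As[*resourceSlice](nil, &resourceSlice{name: "worker-1-gpu"})
	fmt.Println(slice.name, err)

	// A type mismatch surfaces as an error, which the hint above turns into
	// Queue plus an error for the caller to log.
	if _, _, err := As[*resourceSlice](nil, "not a slice"); err != nil {
		fmt.Println("error:", err)
	}
}
```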