DRA scheduler: fix re-scheduling after ResourceSlice changes

Re-queueing unschedulable pods after ResourceSlice cluster events was
accidentally left out when structured parameters were added in Kubernetes 1.30.

All existing E2E tests start a driver before creating pods. A new test with the
reverse order (create the pod, wait for it to become unschedulable, then start
the driver) reproduced the bug and now passes.
Patrick Ohly 2024-08-20 10:54:23 +02:00
parent 6dd2ade762
commit e85d3babf0
6 changed files with 212 additions and 89 deletions
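
For context, the mechanism behind the fix: the dynamicresources plugin now registers a cluster event for ResourceSlice add/update together with a queueing hint, and the scheduling queue consults that hint to decide whether a previously rejected pod deserves another scheduling attempt. Below is a hedged, self-contained sketch of how such a hint is consumed; it is not part of this commit, and the helper name requeueAfterSliceEvent is made up for illustration.

// Sketch only: illustrates the QueueingHintFn contract used by the change below.
// framework.Queue means "re-queue the pod for another scheduling attempt",
// framework.QueueSkip means "this event cannot help the pod, leave it parked".
package sketch

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// requeueAfterSliceEvent is a hypothetical helper, not scheduler code.
func requeueAfterSliceEvent(logger klog.Logger, pod *v1.Pod, slice *resourceapi.ResourceSlice, hint framework.QueueingHintFn) (bool, error) {
	// An add event has no old object, so oldObj is nil; an update event
	// would pass the previous ResourceSlice instead.
	decision, err := hint(logger, pod, nil, slice)
	if err != nil {
		return false, fmt.Errorf("queueing hint failed: %w", err)
	}
	return decision == framework.Queue, nil
}

In the scheduler this decision is made per pod in the unschedulable pods pool; the new isSchedulableAfterResourceSliceChange hint in the diff only checks whether the pod uses claims at all and leaves the detailed feasibility check to the next scheduling attempt.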

@@ -529,6 +529,15 @@ func addAllEventHandlers(
)
handlers = append(handlers, handlerRegistration)
}
case framework.ResourceSlice:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if handlerRegistration, err = informerFactory.Resource().V1alpha3().ResourceSlices().Informer().AddEventHandler(
buildEvtResHandler(at, framework.ResourceSlice, "ResourceSlice"),
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
}
case framework.DeviceClass:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if handlerRegistration, err = informerFactory.Resource().V1alpha3().DeviceClasses().Informer().AddEventHandler(

@@ -216,6 +216,7 @@ func TestAddAllEventHandlers(t *testing.T) {
name string
gvkMap map[framework.GVK]framework.ActionType
enableDRA bool
enableClassicDRA bool
expectStaticInformers map[reflect.Type]bool
expectDynamicInformers map[schema.GroupVersionResource]bool
}{
@@ -234,6 +235,7 @@ func TestAddAllEventHandlers(t *testing.T) {
gvkMap: map[framework.GVK]framework.ActionType{
framework.PodSchedulingContext: framework.Add,
framework.ResourceClaim: framework.Add,
framework.ResourceSlice: framework.Add,
framework.DeviceClass: framework.Add,
},
expectStaticInformers: map[reflect.Type]bool{
@@ -244,19 +246,41 @@ func TestAddAllEventHandlers(t *testing.T) {
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
},
{
name: "DRA events enabled",
name: "some DRA events enabled",
gvkMap: map[framework.GVK]framework.ActionType{
framework.PodSchedulingContext: framework.Add,
framework.ResourceClaim: framework.Add,
framework.ResourceSlice: framework.Add,
framework.DeviceClass: framework.Add,
},
enableDRA: true,
expectStaticInformers: map[reflect.Type]bool{
reflect.TypeOf(&v1.Pod{}): true,
reflect.TypeOf(&v1.Node{}): true,
reflect.TypeOf(&v1.Namespace{}): true,
reflect.TypeOf(&resourceapi.ResourceClaim{}): true,
reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
reflect.TypeOf(&resourceapi.DeviceClass{}): true,
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
},
{
name: "all DRA events enabled",
gvkMap: map[framework.GVK]framework.ActionType{
framework.PodSchedulingContext: framework.Add,
framework.ResourceClaim: framework.Add,
framework.ResourceSlice: framework.Add,
framework.DeviceClass: framework.Add,
},
enableDRA: true,
enableClassicDRA: true,
expectStaticInformers: map[reflect.Type]bool{
reflect.TypeOf(&v1.Pod{}): true,
reflect.TypeOf(&v1.Node{}): true,
reflect.TypeOf(&v1.Namespace{}): true,
reflect.TypeOf(&resourceapi.PodSchedulingContext{}): true,
reflect.TypeOf(&resourceapi.ResourceClaim{}): true,
reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
reflect.TypeOf(&resourceapi.DeviceClass{}): true,
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
@@ -320,6 +344,7 @@ func TestAddAllEventHandlers(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRAControlPlaneController, tt.enableClassicDRA)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -398,6 +398,8 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
// A pod might be waiting for a class to get created or modified.
{Event: framework.ClusterEvent{Resource: framework.DeviceClass, ActionType: framework.Add | framework.Update}},
// Adding or updating a ResourceSlice might make a pod schedulable because new resources became available.
{Event: framework.ClusterEvent{Resource: framework.ResourceSlice, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterResourceSliceChange},
}
if pl.podSchedulingContextLister != nil {
@@ -491,6 +493,38 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
return framework.Queue, nil
}
// isSchedulableAfterResourceSliceChange is invoked for add and update slice events reported by
// an informer. Such changes can make an unschedulable pod schedulable when the pod requests a device
// and the change adds a suitable device.
//
// For the sake of faster execution and avoiding code duplication, isSchedulableAfterResourceSliceChange
// only checks whether the pod uses claims. All of the more detailed checks are done in the scheduling
// attempt.
//
// The delete slice event will not invoke it, so newObj will never be nil.
func (pl *dynamicResources) isSchedulableAfterResourceSliceChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
_, modifiedSlice, err := schedutil.As[*resourceapi.ResourceSlice](oldObj, newObj)
if err != nil {
// Shouldn't happen.
return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterResourceSliceChange: %w", err)
}
if err := pl.foreachPodResourceClaim(pod, nil); err != nil {
// This is not an unexpected error: we know that
// foreachPodResourceClaim only returns errors for "not
// schedulable".
logger.V(6).Info("pod is not schedulable after resource slice change", "pod", klog.KObj(pod), "resourceSlice", klog.KObj(modifiedSlice), "reason", err.Error())
return framework.QueueSkip, nil
}
// We could check what got changed in the slice, but right now that's likely to be
// about the spec (there's no status yet...).
// We could check whether all claims use classic DRA, but that doesn't seem worth it.
// Let's assume that changing the slice may make the pod schedulable.
logger.V(5).Info("ResourceSlice change might make pod schedulable", "pod", klog.KObj(pod), "resourceSlice", klog.KObj(modifiedSlice))
return framework.Queue, nil
}
// isSchedulableAfterPodSchedulingContextChange is invoked for all
// PodSchedulingContext events reported by an informer. It checks whether that
// change made a previously unschedulable pod schedulable (updated) or a new

@@ -125,6 +125,7 @@ const (
StorageClass GVK = "storage.k8s.io/StorageClass"
PodSchedulingContext GVK = "PodSchedulingContext"
ResourceClaim GVK = "ResourceClaim"
ResourceSlice GVK = "ResourceSlice"
DeviceClass GVK = "DeviceClass"
// WildCard is a special GVK to match all resources.

@@ -77,10 +77,24 @@ type Nodes struct {
var pluginPermissions string
// NewNodes selects nodes to run the test on.
//
// Call this outside of ginkgo.It, then use the instance inside ginkgo.It.
func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
nodes := &Nodes{}
ginkgo.BeforeEach(func(ctx context.Context) {
nodes.init(ctx, f, minNodes, maxNodes)
})
return nodes
}
// NewNodesNow is a variant of NewNodes which can be used inside a ginkgo.It.
func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) *Nodes {
nodes := &Nodes{}
nodes.init(ctx, f, minNodes, maxNodes)
return nodes
}
func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) {
ginkgo.By("selecting nodes")
// The kubelet plugin is harder. We deploy the builtin manifest
// after patching in the driver name and all nodes on which we
@@ -136,8 +150,6 @@ func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
defer wg.Done()
claimInformer.Run(cancelCtx.Done())
}()
})
return nodes
}
func validateClaim(claim *resourceapi.ResourceClaim) {
@@ -153,15 +165,32 @@ func validateClaim(claim *resourceapi.ResourceClaim) {
// NewDriver sets up controller (as client of the cluster) and
// kubelet plugin (via proxy) before the test runs. It cleans
// up after the test.
//
// Call this outside of ginkgo.It, then use the instance inside ginkgo.It.
func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) *Driver {
d := NewDriverInstance(f)
ginkgo.BeforeEach(func() {
d.Run(nodes, configureResources, devicesPerNode...)
})
return d
}
// NewDriverInstance is a variant of NewDriver where the driver is inactive and must
// be started explicitly with Run. May be used inside ginkgo.It.
func NewDriverInstance(f *framework.Framework) *Driver {
d := &Driver{
f: f,
fail: map[MethodInstance]bool{},
callCounts: map[MethodInstance]int64{},
NodeV1alpha3: true,
parameterMode: parameterModeStructured,
}
d.initName()
return d
}
ginkgo.BeforeEach(func() {
func (d *Driver) Run(nodes *Nodes, configureResources func() app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
resources := configureResources()
if len(resources.Nodes) == 0 {
// This always has to be set because the driver might
@@ -171,8 +200,6 @@ func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() a
ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
d.SetUp(nodes, resources, devicesPerNode...)
ginkgo.DeferCleanup(d.TearDown)
})
return d
}
type MethodInstance struct {
@@ -215,25 +242,23 @@ const (
parameterModeStructured parameterMode = "structured" // allocation through scheduler
)
func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
d.Nodes = make(map[string]KubeletPlugin)
func (d *Driver) initName() {
d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
}
func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
d.initName()
ginkgo.By(fmt.Sprintf("deploying driver %s on nodes %v", d.Name, nodes.NodeNames))
d.Nodes = make(map[string]KubeletPlugin)
resources.DriverName = d.Name
ctx, cancel := context.WithCancel(context.Background())
if d.NameSuffix != "" {
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "instance"+d.NameSuffix)
logger = klog.LoggerWithValues(logger, "driverName", d.Name)
ctx = klog.NewContext(ctx, logger)
}
d.ctx = ctx
d.cleanup = append(d.cleanup, cancel)
if d.parameterMode == "" {
d.parameterMode = parameterModeStructured
}
switch d.parameterMode {
case parameterModeClassicDRA:
// The controller is easy: we simply connect to the API server.
@@ -387,7 +412,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...
// Here we merely use impersonation, which is faster.
driverClient := d.impersonateKubeletPlugin(&pod)
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
logger := klog.LoggerWithValues(klog.LoggerWithName(logger, "kubelet-plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
loggerCtx := klog.NewContext(ctx, logger)
fileOps := app.FileOperations{
Create: func(name string, content []byte) error {

@@ -1390,6 +1390,31 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
ginkgo.Context("multiple drivers", func() {
multipleDriversContext("using only drapbv1alpha3", true)
})
ginkgo.It("runs pod after driver starts", func(ctx context.Context) {
nodes := NewNodesNow(ctx, f, 1, 4)
driver := NewDriverInstance(f)
b := newBuilderNow(ctx, f, driver)
claim := b.externalClaim()
pod := b.podExternal()
b.create(ctx, claim, pod)
// Cannot run pod, no devices.
framework.ExpectNoError(e2epod.WaitForPodNameUnschedulableInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace))
// Set up driver, which makes devices available.
driver.Run(nodes, perNode(1, nodes))
// Now it should run.
b.testPod(ctx, f.ClientSet, pod)
// We need to clean up explicitly because the normal
// cleanup doesn't work (driver shuts down first).
// framework.ExpectNoError(f.ClientSet.ResourceV1alpha3().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
})
})
// builder contains a running counter to make objects unique within their
@@ -1676,16 +1701,20 @@ func testContainerEnv(ctx context.Context, clientSet kubernetes.Interface, pod *
func newBuilder(f *framework.Framework, driver *Driver) *builder {
b := &builder{f: f, driver: driver}
ginkgo.BeforeEach(b.setUp)
return b
}
func (b *builder) setUp() {
func newBuilderNow(ctx context.Context, f *framework.Framework, driver *Driver) *builder {
b := &builder{f: f, driver: driver}
b.setUp(ctx)
return b
}
func (b *builder) setUp(ctx context.Context) {
b.podCounter = 0
b.claimCounter = 0
b.create(context.Background(), b.class())
b.create(ctx, b.class())
ginkgo.DeferCleanup(b.tearDown)
}