From ddfb1c1e5330459e7f8d7b8d07e305de6a394fb4 Mon Sep 17 00:00:00 2001
From: rogowski-piotr <62942330+rogowski-piotr@users.noreply.github.com>
Date: Tue, 23 Dec 2025 22:15:53 +0100
Subject: [PATCH 1/3] kubelet(dra): fix multiple claims handling

---
 pkg/kubelet/cm/dra/manager.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go
index 9d034a091a5..2c2c19e6c50 100644
--- a/pkg/kubelet/cm/dra/manager.go
+++ b/pkg/kubelet/cm/dra/manager.go
@@ -359,10 +359,10 @@ func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error {
 			return fmt.Errorf("checkpoint ResourceClaim cache: %w", err)
 		}
 
-		// If this claim is already prepared, there is no need to prepare it again.
+		// If this claim is already prepared, skip it and move on to the remaining claims.
 		if claimInfo.isPrepared() {
 			logger.V(5).Info("Resources already prepared", "pod", klog.KObj(pod), "podClaim", podClaim.Name, "claim", klog.KObj(resourceClaim))
-			return nil
+			continue
 		}
 
 		// This saved claim will be used to update ClaimInfo cache

From 6af7de8d1cefe8ccdcaab501206cb4b06b9ae307 Mon Sep 17 00:00:00 2001
From: Piotr Rogowski
Date: Tue, 6 Jan 2026 20:54:12 +0000
Subject: [PATCH 2/3] test(e2e/dra): add test for pod requesting allocated and
 new claims

---
 test/e2e/dra/dra.go | 49 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go
index 2e12cb39a9d..541366709b1 100644
--- a/test/e2e/dra/dra.go
+++ b/test/e2e/dra/dra.go
@@ -942,6 +942,54 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() {
 		}
 	}
 
+	singleNodeMultipleClaimsTests := func() {
+		nodes := drautils.NewNodes(f, 1, 1)
+		// Allow allocating more than one device so that multiple claims can be prepared on the same node.
+		maxAllocations := 2
+		driver := drautils.NewDriver(f, nodes, drautils.DriverResources(maxAllocations)) // All tests get their own driver instance.
+		driver.WithKubelet = true
+		b := drautils.NewBuilder(f, driver)
+
+		// https://github.com/kubernetes/kubernetes/issues/135901 was fixed in master for Kubernetes 1.35 and not backported
+		// so this test only passes for kubelet >= 1.35.
+		f.It("requests an already allocated and a new claim for a pod", f.WithLabel("KubeletMinVersion:1.35"), func(ctx context.Context) {
+			// This test covers the situation where a pod references a mix of already-prepared and new claims.
+			tCtx := f.TContext(ctx)
+
+			firstClaim := b.ExternalClaim()
+			secondClaim := b.ExternalClaim()
+
+			b.Create(tCtx, firstClaim, secondClaim)
+
+			// First pod uses only firstClaim
+			firstPod := b.PodExternal()
+			b.Create(tCtx, firstPod)
+			b.TestPod(tCtx, firstPod)
+
+			// Second pod uses firstClaim (already prepared) + secondClaim (new)
+			secondPod := b.PodExternal()
+
+			secondPod.Spec.ResourceClaims = []v1.PodResourceClaim{
+				{
+					Name:              "first",
+					ResourceClaimName: &firstClaim.Name,
+				},
+				{
+					Name:              "second",
+					ResourceClaimName: &secondClaim.Name,
+				},
+			}
+
+			secondPod.Spec.Containers[0].Resources.Claims = []v1.ResourceClaim{
+				{Name: "first"},
+				{Name: "second"},
+			}
+
+			b.Create(tCtx, secondPod)
+			b.TestPod(tCtx, secondPod)
+		})
+	}
+
 	// The following tests only make sense when there is more than one node.
 	// They get skipped when there's only one node.
 	multiNodeTests := func(withKubelet bool) {
@@ -1806,6 +1854,7 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() {
 
 	framework.Context("control plane", framework.WithLabel("ConformanceCandidate") /* TODO: replace with framework.WithConformance() */, func() { singleNodeTests(false) })
 	framework.Context("kubelet", feature.DynamicResourceAllocation, "on single node", func() { singleNodeTests(true) })
+	framework.Context("kubelet", feature.DynamicResourceAllocation, "on single node with multiple claims allocation", singleNodeMultipleClaimsTests)
 
 	framework.Context("control plane", framework.WithLabel("ConformanceCandidate") /* TODO: replace with framework.WithConformance() */, func() { multiNodeTests(false) })
 	framework.Context("kubelet", feature.DynamicResourceAllocation, "on multiple nodes", func() { multiNodeTests(true) })

From dacc7257f618682a243bc87936a262881d65f47b Mon Sep 17 00:00:00 2001
From: Piotr Rogowski
Date: Wed, 14 Jan 2026 11:43:55 +0000
Subject: [PATCH 3/3] test(ut/dra): add unit test for pod requesting prepared
 and new claims

---
 pkg/kubelet/cm/dra/manager_test.go | 169 ++++++++++++++++++++++-------
 test/e2e/dra/dra.go                |  16 +--
 2 files changed, 138 insertions(+), 47 deletions(-)

diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go
index d9df5655839..222c93623c8 100644
--- a/pkg/kubelet/cm/dra/manager_test.go
+++ b/pkg/kubelet/cm/dra/manager_test.go
@@ -61,6 +61,10 @@ const (
 	containerName = "test-container"
 )
 
+var (
+	testPodCounter atomic.Uint32
+)
+
 type fakeDRADriverGRPCServer struct {
 	drapb.UnimplementedDRAPluginServer
 	drahealthv1alpha1.UnimplementedDRAResourceHealthServer
@@ -276,6 +280,23 @@ func setupFakeDRADriverGRPCServer(ctx context.Context, shouldTimeout bool, plugi
 	}, nil
 }
 
+func genPrepareResourcesResponse(claimUID types.UID) *drapb.NodePrepareResourcesResponse {
+	return &drapb.NodePrepareResourcesResponse{
+		Claims: map[string]*drapb.NodePrepareResourceResponse{
+			string(claimUID): {
+				Devices: []*drapb.Device{
+					{
+						PoolName:     poolName,
+						DeviceName:   deviceName,
+						RequestNames: []string{requestName},
+						CDIDeviceIDs: []string{cdiID},
+					},
+				},
+			},
+		},
+	}
+}
+
 func TestNewManagerImpl(t *testing.T) {
 	kubeClient := fake.NewSimpleClientset()
 	for _, test := range []struct {
@@ -339,6 +360,45 @@ func genTestPod() *v1.Pod {
 	}
 }
 
+func genTestPodWithClaims(claimNames ...string) *v1.Pod {
+	podCounter := testPodCounter.Add(1)
+
+	podName := fmt.Sprintf("test-pod-%d", podCounter)
+	podUID := types.UID(fmt.Sprintf("test-pod-uid-%d", podCounter))
+
+	resourceClaims := make([]v1.PodResourceClaim, 0, len(claimNames))
+	containerClaims := make([]v1.ResourceClaim, 0, len(claimNames))
+
+	for _, claimName := range claimNames {
+		cn := claimName
+		resourceClaims = append(resourceClaims, v1.PodResourceClaim{
+			Name:              cn,
+			ResourceClaimName: &cn,
+		})
+		containerClaims = append(containerClaims, v1.ResourceClaim{
+			Name: cn,
+		})
+	}
+
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      podName,
+			Namespace: namespace,
+			UID:       podUID,
+		},
+		Spec: v1.PodSpec{
+			ResourceClaims: resourceClaims,
+			Containers: []v1.Container{
+				{
+					Resources: v1.ResourceRequirements{
+						Claims: containerClaims,
+					},
+				},
+			},
+		},
+	}
+}
+
 // genTestPodWithExtendedResource generates pod object
 func genTestPodWithExtendedResource() *v1.Pod {
 	return &v1.Pod{
@@ -692,18 +752,7 @@ func TestPrepareResources(t *testing.T) {
 			claim:                  genTestClaim(claimName, driverName, deviceName, podUID),
 			expectedPrepareCalls:   1,
 			expectedClaimInfoState: genClaimInfoState(cdiID),
-			resp: &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{
-				string(claimUID): {
-					Devices: []*drapb.Device{
-						{
-							PoolName:     poolName,
-							DeviceName:   deviceName,
-							RequestNames: []string{requestName},
-							CDIDeviceIDs: []string{cdiID},
-						},
-					},
-				},
-			}},
+			resp:                   genPrepareResourcesResponse(claimUID),
 		},
 		{
 			description:            "resource already prepared",
@@ -712,18 +761,7 @@ func TestPrepareResources(t *testing.T) {
 			expectedPrepareCalls:   0,
 			pod:                    genTestPod(),
 			claim:                  genTestClaim(claimName, driverName, deviceName, podUID),
 			claimInfo:              genTestClaimInfo(claimUID, []string{podUID}, true),
 			expectedClaimInfoState: genClaimInfoState(cdiID),
-			resp: &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{
-				string(claimUID): {
-					Devices: []*drapb.Device{
-						{
-							PoolName:     poolName,
-							DeviceName:   deviceName,
-							RequestNames: []string{requestName},
-							CDIDeviceIDs: []string{cdiID},
-						},
-					},
-				},
-			}},
+			resp:                   genPrepareResourcesResponse(claimUID),
 		},
 		{
 			description: "should timeout",
@@ -740,19 +778,8 @@ func TestPrepareResources(t *testing.T) {
 			pod:                    genTestPod(),
 			claim:                  genTestClaim(claimName, driverName, deviceName, podUID),
 			expectedClaimInfoState: genClaimInfoState(cdiID),
-			resp: &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{
-				string(claimUID): {
-					Devices: []*drapb.Device{
-						{
-							PoolName:     poolName,
-							DeviceName:   deviceName,
-							RequestNames: []string{requestName},
-							CDIDeviceIDs: []string{cdiID},
-						},
-					},
-				},
-			}},
-			expectedPrepareCalls: 1,
+			resp:                   genPrepareResourcesResponse(claimUID),
+			expectedPrepareCalls:   1,
 		},
 		{
 			description: "should prepare extended resource claim backed by DRA",
@@ -863,6 +890,74 @@ func TestPrepareResources(t *testing.T) {
 	}
 }
 
+// TestPrepareResourcesWithPreparedAndNewClaim verifies that PrepareResources
+// correctly handles a pod that references a mix of ResourceClaims:
+//   - the first claim was already prepared for a previous pod
+//   - the second claim is new and still needs to be prepared
+func TestPrepareResourcesWithPreparedAndNewClaim(t *testing.T) {
+	logger, tCtx := ktesting.NewTestContext(t)
+	fakeKubeClient := fake.NewClientset()
+
+	manager, err := NewManager(logger, fakeKubeClient, t.TempDir())
+	require.NoError(t, err)
+	manager.initDRAPluginManager(tCtx, getFakeNode, time.Second)
+
+	secondClaimName := fmt.Sprintf("%s-second", claimName)
+
+	// Generate two pods where the second pod reuses an existing claim and adds a new one.
+	firstPod := genTestPodWithClaims(claimName)
+	secondPod := genTestPodWithClaims(claimName, secondClaimName)
+
+	firstClaim := genTestClaim(claimName, driverName, deviceName, string(firstPod.UID))
+	secondClaim := genTestClaim(secondClaimName, driverName, deviceName, string(secondPod.UID))
+
+	// Reserve firstClaim for both the first and the second pod.
+	firstClaim.Status.ReservedFor = append(
+		firstClaim.Status.ReservedFor,
+		resourceapi.ResourceClaimConsumerReference{UID: secondPod.UID},
+	)
+
+	_, err = fakeKubeClient.ResourceV1().ResourceClaims(namespace).Create(tCtx, firstClaim, metav1.CreateOptions{})
+	require.NoError(t, err)
+
+	_, err = fakeKubeClient.ResourceV1().ResourceClaims(namespace).Create(tCtx, secondClaim, metav1.CreateOptions{})
+	require.NoError(t, err)
+
+	respFirst := genPrepareResourcesResponse(firstClaim.UID)
+	draServerInfo, err := setupFakeDRADriverGRPCServer(tCtx, false, nil, respFirst, nil, nil)
+	require.NoError(t, err)
+	defer draServerInfo.teardownFn()
+
+	plg := manager.GetWatcherHandler()
+	require.NoError(t, plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil))
+
+	err = manager.PrepareResources(tCtx, firstPod)
+	require.NoError(t, err)
+
+	assert.Equal(t, uint32(1),
+		draServerInfo.server.prepareResourceCalls.Load(),
+		"first pod should trigger one prepare call",
+	)
+
+	respSecond := genPrepareResourcesResponse(secondClaim.UID)
+	draServerInfo.server.prepareResourcesResponse = respSecond
+
+	err = manager.PrepareResources(tCtx, secondPod)
+	require.NoError(t, err)
+
+	// The second pod adds exactly one prepare call (for the new claim only) on top of the first pod's call.
+	assert.Equal(t, uint32(2),
+		draServerInfo.server.prepareResourceCalls.Load(),
+		"second pod should trigger one prepare call for the new claim",
+	)
+
+	for _, claimName := range []string{firstClaim.Name, secondClaim.Name} {
+		claimInfo, exists := manager.cache.get(claimName, namespace)
+		require.True(t, exists, "claim %s should exist in cache", claimName)
+		assert.True(t, claimInfo.prepared, "claim %s should be marked as prepared", claimName)
+	}
+}
+
 func TestUnprepareResources(t *testing.T) {
 	fakeKubeClient := fake.NewSimpleClientset()
 	for _, test := range []struct {
diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go
index 541366709b1..b4bdf8e272c 100644
--- a/test/e2e/dra/dra.go
+++ b/test/e2e/dra/dra.go
@@ -950,21 +950,17 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() {
 		driver.WithKubelet = true
 		b := drautils.NewBuilder(f, driver)
 
-		// https://github.com/kubernetes/kubernetes/issues/135901 was fixed in master for Kubernetes 1.35 and not backported
-		// so this test only passes for kubelet >= 1.35.
-		f.It("requests an already allocated and a new claim for a pod", f.WithLabel("KubeletMinVersion:1.35"), func(ctx context.Context) {
+		ginkgo.It("requests an already allocated and a new claim for a pod", func(ctx context.Context) {
 			// This test covers the situation where a pod references a mix of already-prepared and new claims.
-			tCtx := f.TContext(ctx)
-
 			firstClaim := b.ExternalClaim()
 			secondClaim := b.ExternalClaim()
 
-			b.Create(tCtx, firstClaim, secondClaim)
+			b.Create(ctx, firstClaim, secondClaim)
 
 			// First pod uses only firstClaim
 			firstPod := b.PodExternal()
-			b.Create(tCtx, firstPod)
-			b.TestPod(tCtx, firstPod)
+			b.Create(ctx, firstPod)
+			b.TestPod(ctx, f, firstPod)
 
 			// Second pod uses firstClaim (already prepared) + secondClaim (new)
 			secondPod := b.PodExternal()
@@ -985,8 +981,8 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() {
 				{Name: "second"},
 			}
 
-			b.Create(tCtx, secondPod)
-			b.TestPod(tCtx, secondPod)
+			b.Create(ctx, secondPod)
+			b.TestPod(ctx, f, secondPod)
 		})
 	}
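
Editor's note, not part of the patch series: below is a minimal, self-contained
Go sketch of the bug that PATCH 1/3 fixes. The names here (claim, prepareAll,
skipPrepared) are hypothetical stand-ins for the real kubelet structures; only
the control flow mirrors (*Manager).prepareResources. Before the fix, the loop
returned early as soon as it encountered an already-prepared claim, so any
remaining claims referenced by the same pod were silently left unprepared; the
fix replaces that early return with continue.

	package main

	import "fmt"

	// claim is a toy stand-in for the kubelet's claimInfo: a name plus
	// whether the claim was already prepared for an earlier pod.
	type claim struct {
		name     string
		prepared bool
	}

	// prepareAll mirrors only the loop shape of prepareResources. With
	// skipPrepared=false it reproduces the old behavior: hitting an
	// already-prepared claim aborts the loop and later claims are never
	// prepared. With skipPrepared=true it applies the fix from PATCH 1/3.
	func prepareAll(claims []claim, skipPrepared bool) []string {
		var newlyPrepared []string
		for _, c := range claims {
			if c.prepared {
				if skipPrepared {
					continue // the fix: move on to the remaining claims
				}
				return newlyPrepared // the bug: abandons the remaining claims
			}
			newlyPrepared = append(newlyPrepared, c.name)
		}
		return newlyPrepared
	}

	func main() {
		claims := []claim{{name: "first", prepared: true}, {name: "second"}}
		fmt.Println("old:", prepareAll(claims, false)) // old: []
		fmt.Println("new:", prepareAll(claims, true))  // new: [second]
	}

This is exactly the scenario that the new unit test and e2e test construct: a
pod whose first claim was already prepared for an earlier pod and whose second
claim is new.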