diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go
index 7aa04880f77..eaf05760fac 100644
--- a/pkg/kubelet/cm/dra/manager_test.go
+++ b/pkg/kubelet/cm/dra/manager_test.go
@@ -22,6 +22,7 @@ import (
 	"net"
 	"os"
 	"path/filepath"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -63,6 +64,18 @@ func (s *fakeDRADriverGRPCServer) NodePrepareResources(ctx context.Context, req
 		time.Sleep(*s.timeout)
 	}
 
+	if s.prepareResourcesResponse == nil {
+		deviceName := "claim-" + req.Claims[0].Uid
+		result := s.driverName + "/" + driverClassName + "=" + deviceName
+		return &drapbv1.NodePrepareResourcesResponse{
+			Claims: map[string]*drapbv1.NodePrepareResourceResponse{
+				req.Claims[0].Uid: {
+					CDIDevices: []string{result},
+				},
+			},
+		}, nil
+	}
+
 	return s.prepareResourcesResponse, nil
 }
 
@@ -73,6 +86,14 @@ func (s *fakeDRADriverGRPCServer) NodeUnprepareResources(ctx context.Context, re
 		time.Sleep(*s.timeout)
 	}
 
+	if s.unprepareResourcesResponse == nil {
+		return &drapbv1.NodeUnprepareResourcesResponse{
+			Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{
+				req.Claims[0].Uid: {},
+			},
+		}, nil
+	}
+
 	return s.unprepareResourcesResponse, nil
 }
 
@@ -1429,3 +1450,116 @@ func TestGetContainerClaimInfos(t *testing.T) {
 		})
 	}
 }
+
+// TestParallelPrepareUnprepareResources calls the PrepareResources and UnprepareResources APIs in parallel
+// to detect possible data races.
+func TestParallelPrepareUnprepareResources(t *testing.T) {
+	// Set up and register a fake DRA driver.
+	draServerInfo, err := setupFakeDRADriverGRPCServer(false, nil, nil, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer draServerInfo.teardownFn()
+
+	plg := plugin.NewRegistrationHandler(nil, getFakeNode)
+	if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{"1.27"}, nil); err != nil {
+		t.Fatalf("failed to register plugin %s, err: %v", driverName, err)
+	}
+	defer plg.DeRegisterPlugin(driverName)
+
+	// Create the ClaimInfo cache.
+	cache, err := newClaimInfoCache(t.TempDir(), draManagerStateFileName)
+	if err != nil {
+		t.Errorf("failed to create claim info cache, err: %+v", err)
+		return
+	}
+
+	// Create a fake Kube client and the DRA manager.
+	fakeKubeClient := fake.NewSimpleClientset()
+	manager := &ManagerImpl{kubeClient: fakeKubeClient, cache: cache}
+
+	// Call PrepareResources and UnprepareResources in parallel.
+	var wgSync, wgStart sync.WaitGroup // WaitGroups to synchronize the goroutines
+	numGoroutines := 30
+	wgSync.Add(numGoroutines)
+	wgStart.Add(1)
+	for i := 0; i < numGoroutines; i++ {
+		go func(t *testing.T, goRoutineNum int) {
+			defer wgSync.Done()
+			wgStart.Wait() // Wait to start all goroutines at the same time

+			var err error
+			nameSpace := "test-namespace-parallel"
+			claimName := fmt.Sprintf("test-pod-claim-%d", goRoutineNum)
+			podUID := types.UID(fmt.Sprintf("test-reserved-%d", goRoutineNum))
+			pod := &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      fmt.Sprintf("test-pod-%d", goRoutineNum),
+					Namespace: nameSpace,
+					UID:       podUID,
+				},
+				Spec: v1.PodSpec{
+					ResourceClaims: []v1.PodResourceClaim{
+						{
+							Name: claimName,
+							Source: v1.ClaimSource{ResourceClaimName: func() *string {
+								s := claimName
+								return &s
+							}()},
+						},
+					},
+					Containers: []v1.Container{
+						{
+							Resources: v1.ResourceRequirements{
+								Claims: []v1.ResourceClaim{
+									{
+										Name: claimName,
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+			resourceClaim := &resourcev1alpha2.ResourceClaim{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      claimName,
+					Namespace: nameSpace,
+					UID:       types.UID(fmt.Sprintf("claim-%d", goRoutineNum)),
+				},
+				Spec: resourcev1alpha2.ResourceClaimSpec{
+					ResourceClassName: "test-class",
+				},
+				Status: resourcev1alpha2.ResourceClaimStatus{
+					DriverName: driverName,
+					Allocation: &resourcev1alpha2.AllocationResult{
+						ResourceHandles: []resourcev1alpha2.ResourceHandle{
+							{Data: "test-data", DriverName: driverName},
+						},
+					},
+					ReservedFor: []resourcev1alpha2.ResourceClaimConsumerReference{
+						{UID: podUID},
+					},
+				},
+			}
+
+			if _, err = fakeKubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Create(context.Background(), resourceClaim, metav1.CreateOptions{}); err != nil {
+				t.Errorf("failed to create ResourceClaim %s: %+v", resourceClaim.Name, err)
+				return
+			}
+
+			if err = manager.PrepareResources(pod); err != nil {
+				t.Errorf("pod: %s: PrepareResources failed: %+v", pod.Name, err)
+				return
+			}
+
+			if err = manager.UnprepareResources(pod); err != nil {
+				t.Errorf("pod: %s: UnprepareResources failed: %+v", pod.Name, err)
+				return
+			}
+
+		}(t, i)
+	}
+	wgStart.Done() // Start executing goroutines
+	wgSync.Wait()  // Wait for all goroutines to finish
+}