diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 653b94417a4..08c5a39ada4 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -21,6 +21,8 @@ import ( "os" "path/filepath" "reflect" + "sync" + "sync/atomic" "testing" "time" @@ -111,7 +113,7 @@ func TestNewManagerImplStartProbeMode(t *testing.T) { socketDir, socketName, pluginSocketName, err := tmpSocketDir() require.NoError(t, err) defer os.RemoveAll(socketDir) - m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName, nil) + m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName) cleanup(t, m, p) } @@ -201,7 +203,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) { {ID: "Dev3", Health: pluginapi.Healthy}, } - m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName, nil) + m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName) // Wait for the first callback to be issued. 
select { @@ -308,8 +310,8 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, soc return m, updateChan, p } -func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, topology []cadvisorapi.Node) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) { - m, updateChan := setupDeviceManager(t, devs, callback, socketName, topology) +func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) { + m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil) p := setupDevicePlugin(t, devs, pluginSocketName) pm := setupPluginManager(t, pluginSocketName, m) return m, updateChan, p, pm @@ -1406,15 +1408,13 @@ func TestReadPreNUMACheckpoint(t *testing.T) { } func TestGetTopologyHintsWithUpdates(t *testing.T) { - socketDir, socketName, pluginSocketName, err := tmpSocketDir() + socketDir, socketName, _, err := tmpSocketDir() defer os.RemoveAll(socketDir) - testPod := makePod(v1.ResourceList{ - testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI)}) require.NoError(t, err) - devs := []*pluginapi.Device{} - for i := 0; i < 5000; i++ { - devs = append(devs, &pluginapi.Device{ + devs := []pluginapi.Device{} + for i := 0; i < 1000; i++ { + devs = append(devs, pluginapi.Device{ ID: fmt.Sprintf("dev-%d", i), Health: pluginapi.Healthy, Topology: &pluginapi.TopologyInfo{ @@ -1426,22 +1426,69 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) { topology := []cadvisorapi.Node{ {Id: 0}, } - m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName, topology) - - <-ch - go func() { - p1.Update(devs) - }() - - updated := false - for i := 0; i < 5000 && !updated; i++ { - m.GetTopologyHints(testPod, &testPod.Spec.Containers[0]) - select { - case <-ch: - updated = true - default: - } + 
testCases := []struct {
+		description string
+		count       int
+		devices     []pluginapi.Device
+		testfunc    func(manager *wrappedManagerImpl)
+	}{
+		{
+			description: "getAvailableDevices data race when update device",
+			count:       1,
+			devices:     devs,
+			testfunc: func(manager *wrappedManagerImpl) {
+				manager.getAvailableDevices(testResourceName)
+			},
+		},
+		{
+			description: "generateDeviceTopologyHints data race when update device",
+			count:       1,
+			devices:     devs,
+			testfunc: func(manager *wrappedManagerImpl) {
+				manager.generateDeviceTopologyHints(
+					testResourceName, sets.NewString(), sets.NewString(), 1)
+			},
+		},
+		{
+			description: "deviceHasTopologyAlignment data race when update device",
+			count:       1000,
+			devices:     devs[:1],
+			testfunc: func(manager *wrappedManagerImpl) {
+				manager.deviceHasTopologyAlignment(testResourceName)
+			},
+		},
 	}
-	cleanup(t, m, p1)
+	for _, test := range testCases {
+		t.Run(test.description, func(t *testing.T) {
+			m, _ := setupDeviceManager(t, nil, nil, socketName, topology)
+			defer m.Stop()
+			mimpl := m.(*wrappedManagerImpl)
+
+			wg := sync.WaitGroup{}
+			wg.Add(2)
+
+			updated := atomic.Bool{}
+			updated.Store(false)
+			go func() {
+				defer wg.Done()
+				for i := 0; i < test.count; i++ {
+					// Simulate the device plugin pushing this case's own device list (test.devices) as an update.
+					mimpl.genericDeviceUpdateCallback(testResourceName, test.devices)
+				}
+				updated.Store(true)
+			}()
+			go func() {
+				defer wg.Done()
+				for !updated.Load() {
+					// An unsynchronized concurrent map access makes the Go runtime abort with a fatal error
+					// that recover() cannot intercept, e.g. "fatal error: concurrent map iteration and map write".
+					// If this subtest finishes quietly, the reader never collided with the updater goroutine;
+					// otherwise the test process exits on its own and prints every goroutine's call stack.
+					test.testfunc(mimpl)
+				}
+			}()
+			wg.Wait()
+		})
+	}
 }