mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-03 18:27:49 +00:00
@@ -21,6 +21,8 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -111,7 +113,7 @@ func TestNewManagerImplStartProbeMode(t *testing.T) {
|
|||||||
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
|
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer os.RemoveAll(socketDir)
|
defer os.RemoveAll(socketDir)
|
||||||
m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName, nil)
|
m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
|
||||||
cleanup(t, m, p)
|
cleanup(t, m, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -201,7 +203,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
|
|||||||
{ID: "Dev3", Health: pluginapi.Healthy},
|
{ID: "Dev3", Health: pluginapi.Healthy},
|
||||||
}
|
}
|
||||||
|
|
||||||
m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName, nil)
|
m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)
|
||||||
|
|
||||||
// Wait for the first callback to be issued.
|
// Wait for the first callback to be issued.
|
||||||
select {
|
select {
|
||||||
@@ -308,8 +310,8 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, soc
|
|||||||
return m, updateChan, p
|
return m, updateChan, p
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, topology []cadvisorapi.Node) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) {
|
func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) {
|
||||||
m, updateChan := setupDeviceManager(t, devs, callback, socketName, topology)
|
m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil)
|
||||||
p := setupDevicePlugin(t, devs, pluginSocketName)
|
p := setupDevicePlugin(t, devs, pluginSocketName)
|
||||||
pm := setupPluginManager(t, pluginSocketName, m)
|
pm := setupPluginManager(t, pluginSocketName, m)
|
||||||
return m, updateChan, p, pm
|
return m, updateChan, p, pm
|
||||||
@@ -1406,15 +1408,13 @@ func TestReadPreNUMACheckpoint(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestGetTopologyHintsWithUpdates(t *testing.T) {
|
func TestGetTopologyHintsWithUpdates(t *testing.T) {
|
||||||
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
|
socketDir, socketName, _, err := tmpSocketDir()
|
||||||
defer os.RemoveAll(socketDir)
|
defer os.RemoveAll(socketDir)
|
||||||
testPod := makePod(v1.ResourceList{
|
|
||||||
testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI)})
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
devs := []*pluginapi.Device{}
|
devs := []pluginapi.Device{}
|
||||||
for i := 0; i < 5000; i++ {
|
for i := 0; i < 1000; i++ {
|
||||||
devs = append(devs, &pluginapi.Device{
|
devs = append(devs, pluginapi.Device{
|
||||||
ID: fmt.Sprintf("dev-%d", i),
|
ID: fmt.Sprintf("dev-%d", i),
|
||||||
Health: pluginapi.Healthy,
|
Health: pluginapi.Healthy,
|
||||||
Topology: &pluginapi.TopologyInfo{
|
Topology: &pluginapi.TopologyInfo{
|
||||||
@@ -1426,22 +1426,69 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) {
|
|||||||
topology := []cadvisorapi.Node{
|
topology := []cadvisorapi.Node{
|
||||||
{Id: 0},
|
{Id: 0},
|
||||||
}
|
}
|
||||||
m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName, topology)
|
testCases := []struct {
|
||||||
|
description string
|
||||||
<-ch
|
count int
|
||||||
go func() {
|
devices []pluginapi.Device
|
||||||
p1.Update(devs)
|
testfunc func(manager *wrappedManagerImpl)
|
||||||
}()
|
}{
|
||||||
|
{
|
||||||
updated := false
|
description: "getAvailableDevices data race when update device",
|
||||||
for i := 0; i < 5000 && !updated; i++ {
|
count: 1,
|
||||||
m.GetTopologyHints(testPod, &testPod.Spec.Containers[0])
|
devices: devs,
|
||||||
select {
|
testfunc: func(manager *wrappedManagerImpl) {
|
||||||
case <-ch:
|
manager.getAvailableDevices(testResourceName)
|
||||||
updated = true
|
},
|
||||||
default:
|
},
|
||||||
}
|
{
|
||||||
|
description: "generateDeviceTopologyHints data race when update device",
|
||||||
|
count: 1,
|
||||||
|
devices: devs,
|
||||||
|
testfunc: func(manager *wrappedManagerImpl) {
|
||||||
|
manager.generateDeviceTopologyHints(
|
||||||
|
testResourceName, sets.NewString(), sets.NewString(), 1)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "deviceHasTopologyAlignment data race when update device",
|
||||||
|
count: 1000,
|
||||||
|
devices: devs[:1],
|
||||||
|
testfunc: func(manager *wrappedManagerImpl) {
|
||||||
|
manager.deviceHasTopologyAlignment(testResourceName)
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup(t, m, p1)
|
for _, test := range testCases {
|
||||||
|
t.Run(test.description, func(t *testing.T) {
|
||||||
|
m, _ := setupDeviceManager(t, nil, nil, socketName, topology)
|
||||||
|
defer m.Stop()
|
||||||
|
mimpl := m.(*wrappedManagerImpl)
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(2)
|
||||||
|
|
||||||
|
updated := atomic.Bool{}
|
||||||
|
updated.Store(false)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for i := 0; i < test.count; i++ {
|
||||||
|
// simulate the device plugin to send device updates
|
||||||
|
mimpl.genericDeviceUpdateCallback(testResourceName, devs)
|
||||||
|
}
|
||||||
|
updated.Store(true)
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for !updated.Load() {
|
||||||
|
// When a data race occurs, golang will throw an error, and recover() cannot catch this error,
|
||||||
|
// Such as: `throw("Concurrent map iteration and map writing")`.
|
||||||
|
// When this test ends quietly, no data race error occurs.
|
||||||
|
// Otherwise, the test process exits automatically and prints all goroutine call stacks.
|
||||||
|
test.testfunc(mimpl)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
wg.Wait()
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user