Add a buffer to the resourceupdates.Update channel and discard updates (with an error log) when it is full

Signed-off-by: rongfu.leng <lenronfu@gmail.com>
This commit is contained in:
rongfu.leng 2024-09-14 09:17:57 +00:00
parent 6ded721910
commit ead64fb8f0
2 changed files with 43 additions and 42 deletions

View File

@ -18,6 +18,7 @@ package devicemanager
import ( import (
"context" "context"
goerrors "errors"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
@ -157,7 +158,7 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi
numaNodes: numaNodes, numaNodes: numaNodes,
topologyAffinityStore: topologyAffinityStore, topologyAffinityStore: topologyAffinityStore,
devicesToReuse: make(PodReusableDevices), devicesToReuse: make(PodReusableDevices),
update: make(chan resourceupdates.Update), update: make(chan resourceupdates.Update, 100),
} }
server, err := plugin.NewServer(socketPath, manager, manager) server, err := plugin.NewServer(socketPath, manager, manager)
@ -309,8 +310,10 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) { if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
if len(podsToUpdate) > 0 { if len(podsToUpdate) > 0 {
m.update <- resourceupdates.Update{ select {
PodUIDs: podsToUpdate.UnsortedList(), case m.update <- resourceupdates.Update{PodUIDs: podsToUpdate.UnsortedList()}:
default:
klog.ErrorS(goerrors.New("device update channel is full"), "discard pods info", "podsToUpdate", podsToUpdate.UnsortedList())
} }
} }
} }

View File

@ -1970,18 +1970,17 @@ func TestFeatureGateResourceHealthStatus(t *testing.T) {
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
require.NoError(t, err, "err should be nil") require.NoError(t, err, "err should be nil")
resourceName := "domain1.com/resource1" resourceName := "domain1.com/resource1"
existDevices := map[string]DeviceInstances{ existDevices := map[string]DeviceInstances{}
resourceName: map[string]pluginapi.Device{ resourceNameMap := make(map[string]pluginapi.Device)
"dev1": { deviceUpdateNumber, deviceUpdateChanBuffer := 200, 100
ID: "dev1", for i := 0; i < deviceUpdateNumber; i++ {
resourceNameMap[fmt.Sprintf("dev%d", i)] = pluginapi.Device{
ID: fmt.Sprintf("dev%d", i),
Health: pluginapi.Healthy, Health: pluginapi.Healthy,
},
"dev2": {
ID: "dev2",
Health: pluginapi.Unhealthy,
},
},
} }
}
existDevices[resourceName] = resourceNameMap
testManager := &ManagerImpl{ testManager := &ManagerImpl{
allDevices: ResourceDeviceInstances(existDevices), allDevices: ResourceDeviceInstances(existDevices),
endpoints: make(map[string]endpointInfo), endpoints: make(map[string]endpointInfo),
@ -1990,13 +1989,13 @@ func TestFeatureGateResourceHealthStatus(t *testing.T) {
allocatedDevices: make(map[string]sets.Set[string]), allocatedDevices: make(map[string]sets.Set[string]),
podDevices: newPodDevices(), podDevices: newPodDevices(),
checkpointManager: ckm, checkpointManager: ckm,
update: make(chan resourceupdates.Update), update: make(chan resourceupdates.Update, deviceUpdateChanBuffer),
} }
podID := "pod1" for i := 0; i < deviceUpdateNumber; i++ {
contID := "con1" podID := fmt.Sprintf("pod%d", i)
devices := checkpoint.DevicesPerNUMA{0: []string{"dev1"}, 1: []string{"dev1"}} contID := fmt.Sprintf("con%d", i)
devices := checkpoint.DevicesPerNUMA{0: []string{fmt.Sprintf("dev%d", i)}}
testManager.podDevices.insert(podID, contID, resourceName, testManager.podDevices.insert(podID, contID, resourceName,
devices, devices,
newContainerAllocateResponse( newContainerAllocateResponse(
@ -2004,29 +2003,28 @@ func TestFeatureGateResourceHealthStatus(t *testing.T) {
withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}), withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
), ),
) )
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatus, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatus, true)
for i := 0; i < deviceUpdateNumber; i++ {
testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{ testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
{ID: "dev1", Health: pluginapi.Healthy}, {ID: "dev1", Health: pluginapi.Healthy},
{ID: "dev2", Health: pluginapi.Unhealthy},
}) })
}
// update chan no data // update chan no data
assert.Empty(t, testManager.update) assert.Empty(t, testManager.update)
// update chan receive pod1 // update device status, assume all device unhealthy.
var wg sync.WaitGroup for i := 0; i < deviceUpdateNumber; i++ {
go func() { testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
defer wg.Done() {ID: fmt.Sprintf("dev%d", i), Health: pluginapi.Unhealthy},
})
}
for i := 0; i < deviceUpdateChanBuffer; i++ {
u := <-testManager.update u := <-testManager.update
assert.Equal(t, resourceupdates.Update{ assert.Equal(t, resourceupdates.Update{
PodUIDs: []string{"pod1"}, PodUIDs: []string{fmt.Sprintf("pod%d", i)},
}, u) }, u)
}() }
wg.Add(1)
testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
{ID: "dev1", Health: pluginapi.Unhealthy},
{ID: "dev2", Health: pluginapi.Healthy},
})
wg.Wait()
} }