mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 23:37:01 +00:00
optimize code, filter podUID is empty string
Signed-off-by: rongfu.leng <lenronfu@gmail.com>
This commit is contained in:
parent
359b9ba9bf
commit
d04a54c50b
@ -280,16 +280,20 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
|
||||
// compare with old device's health and send update to the channel if needed
|
||||
updatePodUIDFn := func(deviceID string) {
|
||||
podUID, _ := m.podDevices.getPodAndContainerForDevice(deviceID)
|
||||
if podUID != "" {
|
||||
podsToUpdate.Insert(podUID)
|
||||
}
|
||||
}
|
||||
if oldDev, ok := oldDevices[dev.ID]; ok {
|
||||
if oldDev.Health != dev.Health {
|
||||
podUID, _ := m.podDevices.getPodAndContainerForDevice(dev.ID)
|
||||
podsToUpdate.Insert(podUID)
|
||||
updatePodUIDFn(dev.ID)
|
||||
}
|
||||
} else {
|
||||
// if this is a new device, it might have existed before and disappeared for a while
|
||||
// but still be assigned to a Pod. In this case, we need to send an update to the channel
|
||||
podUID, _ := m.podDevices.getPodAndContainerForDevice(dev.ID)
|
||||
podsToUpdate.Insert(podUID)
|
||||
updatePodUIDFn(dev.ID)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -38,13 +38,17 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/tools/record"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
|
||||
watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
|
||||
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
@ -1953,3 +1957,76 @@ func sortContainerStatuses(statuses []v1.ContainerStatus) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFeatureGateResourceHealthStatus(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "checkpoint")
|
||||
require.NoError(t, err, "err should be nil")
|
||||
defer func() {
|
||||
err = os.RemoveAll(tmpDir)
|
||||
if err != nil {
|
||||
t.Fatalf("Fail to remove tmpdir: %v", err)
|
||||
}
|
||||
}()
|
||||
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
|
||||
require.NoError(t, err, "err should be nil")
|
||||
resourceName := "domain1.com/resource1"
|
||||
existDevices := map[string]DeviceInstances{
|
||||
resourceName: map[string]pluginapi.Device{
|
||||
"dev1": {
|
||||
ID: "dev1",
|
||||
Health: pluginapi.Healthy,
|
||||
},
|
||||
"dev2": {
|
||||
ID: "dev2",
|
||||
Health: pluginapi.Unhealthy,
|
||||
},
|
||||
},
|
||||
}
|
||||
testManager := &ManagerImpl{
|
||||
allDevices: ResourceDeviceInstances(existDevices),
|
||||
endpoints: make(map[string]endpointInfo),
|
||||
healthyDevices: make(map[string]sets.Set[string]),
|
||||
unhealthyDevices: make(map[string]sets.Set[string]),
|
||||
allocatedDevices: make(map[string]sets.Set[string]),
|
||||
podDevices: newPodDevices(),
|
||||
checkpointManager: ckm,
|
||||
update: make(chan resourceupdates.Update),
|
||||
}
|
||||
|
||||
podID := "pod1"
|
||||
contID := "con1"
|
||||
devices := checkpoint.DevicesPerNUMA{0: []string{"dev1"}, 1: []string{"dev1"}}
|
||||
|
||||
testManager.podDevices.insert(podID, contID, resourceName,
|
||||
devices,
|
||||
newContainerAllocateResponse(
|
||||
withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}),
|
||||
withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
|
||||
),
|
||||
)
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatus, true)
|
||||
|
||||
testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
|
||||
{ID: "dev1", Health: pluginapi.Healthy},
|
||||
{ID: "dev2", Health: pluginapi.Unhealthy},
|
||||
})
|
||||
// update chan no data
|
||||
assert.Empty(t, len(testManager.update), 0)
|
||||
|
||||
// update chan receive pod1
|
||||
var wg sync.WaitGroup
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
u := <-testManager.update
|
||||
assert.Equal(t, resourceupdates.Update{
|
||||
PodUIDs: []string{"pod1"},
|
||||
}, u)
|
||||
}()
|
||||
wg.Add(1)
|
||||
testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
|
||||
{ID: "dev1", Health: pluginapi.Unhealthy},
|
||||
{ID: "dev2", Health: pluginapi.Healthy},
|
||||
})
|
||||
wg.Wait()
|
||||
|
||||
}
|
||||
|
@ -251,3 +251,27 @@ func TestDeviceRunContainerOptions(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPodAndContainerForDevice(t *testing.T) {
|
||||
podDevices := newPodDevices()
|
||||
resourceName1 := "domain1.com/resource1"
|
||||
podID := "pod1"
|
||||
contID := "con1"
|
||||
devices := checkpoint.DevicesPerNUMA{0: []string{"dev1"}, 1: []string{"dev1"}}
|
||||
|
||||
podDevices.insert(podID, contID, resourceName1,
|
||||
devices,
|
||||
newContainerAllocateResponse(
|
||||
withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}),
|
||||
withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
|
||||
),
|
||||
)
|
||||
|
||||
// dev2 is a new device
|
||||
podUID, _ := podDevices.getPodAndContainerForDevice("dev2")
|
||||
assert.Equal(t, "", podUID)
|
||||
|
||||
// dev1 is a exist device
|
||||
podUID, _ = podDevices.getPodAndContainerForDevice("dev1")
|
||||
assert.Equal(t, "pod1", podUID)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user