diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 424aacfc224..93ebe02f0c3 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -1103,12 +1103,12 @@ func (m *ManagerImpl) isDevicePluginResource(resource string) bool { return false } -// GetAllocatableDevices returns information about all the devices known to the manager +// GetAllocatableDevices returns information about all the healthy devices known to the manager func (m *ManagerImpl) GetAllocatableDevices() ResourceDeviceInstances { m.mutex.Lock() - resp := m.allDevices.Clone() - m.mutex.Unlock() - klog.V(4).InfoS("Known devices", "numDevices", len(resp)) + defer m.mutex.Unlock() + resp := m.allDevices.Filter(m.healthyDevices) + klog.V(4).InfoS("GetAllocatableDevices", "known", len(m.allDevices), "allocatable", len(resp)) return resp } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 0f74dd5a488..30c897e47d0 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -434,6 +434,100 @@ func TestUpdateCapacityAllocatable(t *testing.T) { as.True(testManager.isDevicePluginResource(resourceName2)) } +func TestGetAllocatableDevicesMultipleResources(t *testing.T) { + socketDir, socketName, _, err := tmpSocketDir() + topologyStore := topologymanager.NewFakeManager() + require.NoError(t, err) + defer os.RemoveAll(socketDir) + testManager, err := newManagerImpl(socketName, nil, topologyStore) + as := assert.New(t) + as.NotNil(testManager) + as.Nil(err) + + resource1Devs := []pluginapi.Device{ + {ID: "R1Device1", Health: pluginapi.Healthy}, + {ID: "R1Device2", Health: pluginapi.Healthy}, + {ID: "R1Device3", Health: pluginapi.Unhealthy}, + } + resourceName1 := "domain1.com/resource1" + e1 := &endpointImpl{} + testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil} + testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs) + + resource2Devs := []pluginapi.Device{ + {ID: "R2Device1", Health: pluginapi.Healthy}, + } + resourceName2 := "other.domain2.org/resource2" + e2 := &endpointImpl{} + testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil} + testManager.genericDeviceUpdateCallback(resourceName2, resource2Devs) + + allocatableDevs := testManager.GetAllocatableDevices() + as.Equal(2, len(allocatableDevs)) + + devInstances1, ok := allocatableDevs[resourceName1] + as.True(ok) + checkAllocatableDevicesConsistsOf(as, devInstances1, []string{"R1Device1", "R1Device2"}) + + devInstances2, ok := allocatableDevs[resourceName2] + as.True(ok) + checkAllocatableDevicesConsistsOf(as, devInstances2, []string{"R2Device1"}) + +} + +func TestGetAllocatableDevicesHealthTransition(t *testing.T) { + socketDir, socketName, _, err := tmpSocketDir() + topologyStore := topologymanager.NewFakeManager() + require.NoError(t, err) + defer os.RemoveAll(socketDir) + testManager, err := newManagerImpl(socketName, nil, topologyStore) + as := assert.New(t) + as.NotNil(testManager) + as.Nil(err) + + resource1Devs := []pluginapi.Device{ + {ID: "R1Device1", Health: pluginapi.Healthy}, + {ID: "R1Device2", Health: pluginapi.Healthy}, + {ID: "R1Device3", Health: pluginapi.Unhealthy}, + } + + // Adds three devices for resource1, two healthy and one unhealthy. + // Expects allocatable devices for resource1 to be 2. + resourceName1 := "domain1.com/resource1" + e1 := &endpointImpl{} + testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil} + + testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs) + + allocatableDevs := testManager.GetAllocatableDevices() + as.Equal(1, len(allocatableDevs)) + devInstances, ok := allocatableDevs[resourceName1] + as.True(ok) + checkAllocatableDevicesConsistsOf(as, devInstances, []string{"R1Device1", "R1Device2"}) + + // Unhealthy device becomes healthy + resource1Devs = []pluginapi.Device{ + {ID: "R1Device1", Health: pluginapi.Healthy}, + {ID: "R1Device2", Health: pluginapi.Healthy}, + {ID: "R1Device3", Health: pluginapi.Healthy}, + } + testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs) + + allocatableDevs = testManager.GetAllocatableDevices() + as.Equal(1, len(allocatableDevs)) + devInstances, ok = allocatableDevs[resourceName1] + as.True(ok) + checkAllocatableDevicesConsistsOf(as, devInstances, []string{"R1Device1", "R1Device2", "R1Device3"}) +} + +func checkAllocatableDevicesConsistsOf(as *assert.Assertions, devInstances DeviceInstances, expectedDevs []string) { + as.Equal(len(expectedDevs), len(devInstances)) + for _, deviceID := range expectedDevs { + _, ok := devInstances[deviceID] + as.True(ok) + } +} + func constructDevices(devices []string) checkpoint.DevicesPerNUMA { ret := checkpoint.DevicesPerNUMA{} for _, dev := range devices { diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index 8996930056c..267e0bb2a7d 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -384,3 +384,21 @@ func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances { } return clone } + +// Filter takes a condition set expressed as map[string]sets.String and returns a new +// ResourceDeviceInstances with only the devices matching the condition set. +func (rdev ResourceDeviceInstances) Filter(cond map[string]sets.String) ResourceDeviceInstances { + filtered := NewResourceDeviceInstances() + for resourceName, filterIDs := range cond { + if _, exists := rdev[resourceName]; !exists { + continue + } + filtered[resourceName] = DeviceInstances{} + for instanceID, instance := range rdev[resourceName] { + if filterIDs.Has(instanceID) { + filtered[resourceName][instanceID] = instance + } + } + } + return filtered +} diff --git a/pkg/kubelet/cm/devicemanager/pod_devices_test.go b/pkg/kubelet/cm/devicemanager/pod_devices_test.go index 72264c467f6..37a1e3a7dfb 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices_test.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices_test.go @@ -17,10 +17,13 @@ limitations under the License. package devicemanager import ( + "encoding/json" "testing" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/sets" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" ) @@ -50,3 +53,99 @@ func TestGetContainerDevices(t *testing.T) { } } } + +func TestResourceDeviceInstanceFilter(t *testing.T) { + var expected string + var cond map[string]sets.String + var resp ResourceDeviceInstances + devs := ResourceDeviceInstances{ + "foo": DeviceInstances{ + "dev-foo1": pluginapi.Device{ + ID: "foo1", + }, + "dev-foo2": pluginapi.Device{ + ID: "foo2", + }, + "dev-foo3": pluginapi.Device{ + ID: "foo3", + }, + }, + "bar": DeviceInstances{ + "dev-bar1": pluginapi.Device{ + ID: "bar1", + }, + "dev-bar2": pluginapi.Device{ + ID: "bar2", + }, + "dev-bar3": pluginapi.Device{ + ID: "bar3", + }, + }, + "baz": DeviceInstances{ + "dev-baz1": pluginapi.Device{ + ID: "baz1", + }, + "dev-baz2": pluginapi.Device{ + ID: "baz2", + }, + "dev-baz3": pluginapi.Device{ + ID: "baz3", + }, + }, + } + + resp = devs.Filter(map[string]sets.String{}) + expected = `{}` + expectResourceDeviceInstances(t, resp, expected) + + cond = map[string]sets.String{ + "foo": sets.NewString("dev-foo1", "dev-foo2"), + "bar": sets.NewString("dev-bar1"), + } + resp = devs.Filter(cond) + expected = `{"bar":{"dev-bar1":{"ID":"bar1"}},"foo":{"dev-foo1":{"ID":"foo1"},"dev-foo2":{"ID":"foo2"}}}` + expectResourceDeviceInstances(t, resp, expected) + + cond = map[string]sets.String{ + "foo": sets.NewString("dev-foo1", "dev-foo2", "dev-foo3"), + "bar": sets.NewString("dev-bar1", "dev-bar2", "dev-bar3"), + "baz": sets.NewString("dev-baz1", "dev-baz2", "dev-baz3"), + } + resp = devs.Filter(cond) + expected = `{"bar":{"dev-bar1":{"ID":"bar1"},"dev-bar2":{"ID":"bar2"},"dev-bar3":{"ID":"bar3"}},"baz":{"dev-baz1":{"ID":"baz1"},"dev-baz2":{"ID":"baz2"},"dev-baz3":{"ID":"baz3"}},"foo":{"dev-foo1":{"ID":"foo1"},"dev-foo2":{"ID":"foo2"},"dev-foo3":{"ID":"foo3"}}}` + expectResourceDeviceInstances(t, resp, expected) + + cond = map[string]sets.String{ + "foo": sets.NewString("dev-foo1", "dev-foo2", "dev-foo3", "dev-foo4"), + "bar": sets.NewString("dev-bar1", "dev-bar2", "dev-bar3", "dev-bar4"), + "baz": sets.NewString("dev-baz1", "dev-baz2", "dev-baz3", "dev-bar4"), + } + resp = devs.Filter(cond) + expected = `{"bar":{"dev-bar1":{"ID":"bar1"},"dev-bar2":{"ID":"bar2"},"dev-bar3":{"ID":"bar3"}},"baz":{"dev-baz1":{"ID":"baz1"},"dev-baz2":{"ID":"baz2"},"dev-baz3":{"ID":"baz3"}},"foo":{"dev-foo1":{"ID":"foo1"},"dev-foo2":{"ID":"foo2"},"dev-foo3":{"ID":"foo3"}}}` + expectResourceDeviceInstances(t, resp, expected) + + cond = map[string]sets.String{ + "foo": sets.NewString("dev-foo1", "dev-foo4", "dev-foo7"), + "bar": sets.NewString("dev-bar1", "dev-bar4", "dev-bar7"), + "baz": sets.NewString("dev-baz1", "dev-baz4", "dev-baz7"), + } + resp = devs.Filter(cond) + expected = `{"bar":{"dev-bar1":{"ID":"bar1"}},"baz":{"dev-baz1":{"ID":"baz1"}},"foo":{"dev-foo1":{"ID":"foo1"}}}` + expectResourceDeviceInstances(t, resp, expected) + +} + +func expectResourceDeviceInstances(t *testing.T, resp ResourceDeviceInstances, expected string) { + // per docs in https://pkg.go.dev/encoding/json#Marshal + // "Map values encode as JSON objects. The map's key type must either be a string, an integer type, or + // implement encoding.TextMarshaler. The map keys are sorted [...]" + // so this check is expected to be stable and not flaky + data, err := json.Marshal(resp) + if err != nil { + t.Fatalf("unexpected JSON marshalling error: %v", err) + } + got := string(data) + if got != expected { + t.Errorf("expected %q got %q", expected, got) + } +}