From 1b6efa5e21a2eae2923399274173d99e737d73ec Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Wed, 4 Aug 2021 13:09:42 +0200 Subject: [PATCH] devicemanager: skip unhealthy devs in GetAllocatable GetAllocatableDevices, needed to support the podresources API, doesn't take the device health into account when computing its output. In this PR we address this gap and add unit tests along the way to prevent regressions. This gives us good initial coverage. E2E tests to cover this case are much harder to write, because we would need to inject faults to trigger the unhealthy status. We will evaluate adding these tests in later PRs. Signed-off-by: Francesco Romani --- pkg/kubelet/cm/devicemanager/manager.go | 8 +- pkg/kubelet/cm/devicemanager/manager_test.go | 94 ++++++++++++++++++ pkg/kubelet/cm/devicemanager/pod_devices.go | 18 ++++ .../cm/devicemanager/pod_devices_test.go | 99 +++++++++++++++++++ 4 files changed, 215 insertions(+), 4 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 424aacfc224..93ebe02f0c3 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -1103,12 +1103,12 @@ func (m *ManagerImpl) isDevicePluginResource(resource string) bool { return false } -// GetAllocatableDevices returns information about all the devices known to the manager +// GetAllocatableDevices returns information about all the healthy devices known to the manager func (m *ManagerImpl) GetAllocatableDevices() ResourceDeviceInstances { m.mutex.Lock() - resp := m.allDevices.Clone() - m.mutex.Unlock() - klog.V(4).InfoS("Known devices", "numDevices", len(resp)) + defer m.mutex.Unlock() + resp := m.allDevices.Filter(m.healthyDevices) + klog.V(4).InfoS("GetAllocatableDevices", "known", len(m.allDevices), "allocatable", len(resp)) return resp } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 
0f74dd5a488..30c897e47d0 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -434,6 +434,100 @@ func TestUpdateCapacityAllocatable(t *testing.T) { as.True(testManager.isDevicePluginResource(resourceName2)) } +func TestGetAllocatableDevicesMultipleResources(t *testing.T) { + socketDir, socketName, _, err := tmpSocketDir() + topologyStore := topologymanager.NewFakeManager() + require.NoError(t, err) + defer os.RemoveAll(socketDir) + testManager, err := newManagerImpl(socketName, nil, topologyStore) + as := assert.New(t) + as.NotNil(testManager) + as.Nil(err) + + resource1Devs := []pluginapi.Device{ + {ID: "R1Device1", Health: pluginapi.Healthy}, + {ID: "R1Device2", Health: pluginapi.Healthy}, + {ID: "R1Device3", Health: pluginapi.Unhealthy}, + } + resourceName1 := "domain1.com/resource1" + e1 := &endpointImpl{} + testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil} + testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs) + + resource2Devs := []pluginapi.Device{ + {ID: "R2Device1", Health: pluginapi.Healthy}, + } + resourceName2 := "other.domain2.org/resource2" + e2 := &endpointImpl{} + testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil} + testManager.genericDeviceUpdateCallback(resourceName2, resource2Devs) + + allocatableDevs := testManager.GetAllocatableDevices() + as.Equal(2, len(allocatableDevs)) + + devInstances1, ok := allocatableDevs[resourceName1] + as.True(ok) + checkAllocatableDevicesConsistsOf(as, devInstances1, []string{"R1Device1", "R1Device2"}) + + devInstances2, ok := allocatableDevs[resourceName2] + as.True(ok) + checkAllocatableDevicesConsistsOf(as, devInstances2, []string{"R2Device1"}) + +} + +func TestGetAllocatableDevicesHealthTransition(t *testing.T) { + socketDir, socketName, _, err := tmpSocketDir() + topologyStore := topologymanager.NewFakeManager() + require.NoError(t, err) + defer os.RemoveAll(socketDir) + testManager, err := 
newManagerImpl(socketName, nil, topologyStore) + as := assert.New(t) + as.NotNil(testManager) + as.Nil(err) + + resource1Devs := []pluginapi.Device{ + {ID: "R1Device1", Health: pluginapi.Healthy}, + {ID: "R1Device2", Health: pluginapi.Healthy}, + {ID: "R1Device3", Health: pluginapi.Unhealthy}, + } + + // Adds three devices for resource1, two healthy and one unhealthy. + // Expects allocatable devices for resource1 to be 2. + resourceName1 := "domain1.com/resource1" + e1 := &endpointImpl{} + testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil} + + testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs) + + allocatableDevs := testManager.GetAllocatableDevices() + as.Equal(1, len(allocatableDevs)) + devInstances, ok := allocatableDevs[resourceName1] + as.True(ok) + checkAllocatableDevicesConsistsOf(as, devInstances, []string{"R1Device1", "R1Device2"}) + + // Unhealthy device becomes healthy + resource1Devs = []pluginapi.Device{ + {ID: "R1Device1", Health: pluginapi.Healthy}, + {ID: "R1Device2", Health: pluginapi.Healthy}, + {ID: "R1Device3", Health: pluginapi.Healthy}, + } + testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs) + + allocatableDevs = testManager.GetAllocatableDevices() + as.Equal(1, len(allocatableDevs)) + devInstances, ok = allocatableDevs[resourceName1] + as.True(ok) + checkAllocatableDevicesConsistsOf(as, devInstances, []string{"R1Device1", "R1Device2", "R1Device3"}) +} + +func checkAllocatableDevicesConsistsOf(as *assert.Assertions, devInstances DeviceInstances, expectedDevs []string) { + as.Equal(len(expectedDevs), len(devInstances)) + for _, deviceID := range expectedDevs { + _, ok := devInstances[deviceID] + as.True(ok) + } +} + func constructDevices(devices []string) checkpoint.DevicesPerNUMA { ret := checkpoint.DevicesPerNUMA{} for _, dev := range devices { diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index 8996930056c..267e0bb2a7d 100644 
--- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -384,3 +384,21 @@ func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances { } return clone } + +// Filter takes a condition set expressed as map[string]sets.String and returns a new +// ResourceDeviceInstances with only the devices matching the condition set. +func (rdev ResourceDeviceInstances) Filter(cond map[string]sets.String) ResourceDeviceInstances { + filtered := NewResourceDeviceInstances() + for resourceName, filterIDs := range cond { + if _, exists := rdev[resourceName]; !exists { + continue + } + filtered[resourceName] = DeviceInstances{} + for instanceID, instance := range rdev[resourceName] { + if filterIDs.Has(instanceID) { + filtered[resourceName][instanceID] = instance + } + } + } + return filtered +} diff --git a/pkg/kubelet/cm/devicemanager/pod_devices_test.go b/pkg/kubelet/cm/devicemanager/pod_devices_test.go index 72264c467f6..37a1e3a7dfb 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices_test.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices_test.go @@ -17,10 +17,13 @@ limitations under the License. 
package devicemanager import ( + "encoding/json" "testing" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/sets" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" ) @@ -50,3 +53,99 @@ func TestGetContainerDevices(t *testing.T) { } } } + +func TestResourceDeviceInstanceFilter(t *testing.T) { + var expected string + var cond map[string]sets.String + var resp ResourceDeviceInstances + devs := ResourceDeviceInstances{ + "foo": DeviceInstances{ + "dev-foo1": pluginapi.Device{ + ID: "foo1", + }, + "dev-foo2": pluginapi.Device{ + ID: "foo2", + }, + "dev-foo3": pluginapi.Device{ + ID: "foo3", + }, + }, + "bar": DeviceInstances{ + "dev-bar1": pluginapi.Device{ + ID: "bar1", + }, + "dev-bar2": pluginapi.Device{ + ID: "bar2", + }, + "dev-bar3": pluginapi.Device{ + ID: "bar3", + }, + }, + "baz": DeviceInstances{ + "dev-baz1": pluginapi.Device{ + ID: "baz1", + }, + "dev-baz2": pluginapi.Device{ + ID: "baz2", + }, + "dev-baz3": pluginapi.Device{ + ID: "baz3", + }, + }, + } + + resp = devs.Filter(map[string]sets.String{}) + expected = `{}` + expectResourceDeviceInstances(t, resp, expected) + + cond = map[string]sets.String{ + "foo": sets.NewString("dev-foo1", "dev-foo2"), + "bar": sets.NewString("dev-bar1"), + } + resp = devs.Filter(cond) + expected = `{"bar":{"dev-bar1":{"ID":"bar1"}},"foo":{"dev-foo1":{"ID":"foo1"},"dev-foo2":{"ID":"foo2"}}}` + expectResourceDeviceInstances(t, resp, expected) + + cond = map[string]sets.String{ + "foo": sets.NewString("dev-foo1", "dev-foo2", "dev-foo3"), + "bar": sets.NewString("dev-bar1", "dev-bar2", "dev-bar3"), + "baz": sets.NewString("dev-baz1", "dev-baz2", "dev-baz3"), + } + resp = devs.Filter(cond) + expected = 
`{"bar":{"dev-bar1":{"ID":"bar1"},"dev-bar2":{"ID":"bar2"},"dev-bar3":{"ID":"bar3"}},"baz":{"dev-baz1":{"ID":"baz1"},"dev-baz2":{"ID":"baz2"},"dev-baz3":{"ID":"baz3"}},"foo":{"dev-foo1":{"ID":"foo1"},"dev-foo2":{"ID":"foo2"},"dev-foo3":{"ID":"foo3"}}}` + expectResourceDeviceInstances(t, resp, expected) + + cond = map[string]sets.String{ + "foo": sets.NewString("dev-foo1", "dev-foo2", "dev-foo3", "dev-foo4"), + "bar": sets.NewString("dev-bar1", "dev-bar2", "dev-bar3", "dev-bar4"), + "baz": sets.NewString("dev-baz1", "dev-baz2", "dev-baz3", "dev-bar4"), + } + resp = devs.Filter(cond) + expected = `{"bar":{"dev-bar1":{"ID":"bar1"},"dev-bar2":{"ID":"bar2"},"dev-bar3":{"ID":"bar3"}},"baz":{"dev-baz1":{"ID":"baz1"},"dev-baz2":{"ID":"baz2"},"dev-baz3":{"ID":"baz3"}},"foo":{"dev-foo1":{"ID":"foo1"},"dev-foo2":{"ID":"foo2"},"dev-foo3":{"ID":"foo3"}}}` + expectResourceDeviceInstances(t, resp, expected) + + cond = map[string]sets.String{ + "foo": sets.NewString("dev-foo1", "dev-foo4", "dev-foo7"), + "bar": sets.NewString("dev-bar1", "dev-bar4", "dev-bar7"), + "baz": sets.NewString("dev-baz1", "dev-baz4", "dev-baz7"), + } + resp = devs.Filter(cond) + expected = `{"bar":{"dev-bar1":{"ID":"bar1"}},"baz":{"dev-baz1":{"ID":"baz1"}},"foo":{"dev-foo1":{"ID":"foo1"}}}` + expectResourceDeviceInstances(t, resp, expected) + +} + +func expectResourceDeviceInstances(t *testing.T, resp ResourceDeviceInstances, expected string) { + // per docs in https://pkg.go.dev/encoding/json#Marshal + // "Map values encode as JSON objects. The map's key type must either be a string, an integer type, or + // implement encoding.TextMarshaler. The map keys are sorted [...]" + // so this check is expected to be stable and not flaky + data, err := json.Marshal(resp) + if err != nil { + t.Fatalf("unexpected JSON marshalling error: %v", err) + } + got := string(data) + if got != expected { + t.Errorf("expected %q got %q", expected, got) + } +}