From 57f8b31b42ca3da1595f40430e46603631b9c609 Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Fri, 25 Mar 2022 10:54:00 +0000 Subject: [PATCH] Update tests to accommodate devicemanager refactoring Signed-off-by: Kevin Klues --- pkg/kubelet/cm/devicemanager/endpoint_test.go | 83 +++++++++++++++++-- pkg/kubelet/cm/devicemanager/manager_test.go | 82 ++++++++++++------ 2 files changed, 131 insertions(+), 34 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/endpoint_test.go b/pkg/kubelet/cm/devicemanager/endpoint_test.go index f20550985a1..419002f06c0 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint_test.go +++ b/pkg/kubelet/cm/devicemanager/endpoint_test.go @@ -19,14 +19,53 @@ package devicemanager import ( "fmt" "path" + "sync" "testing" "time" "github.com/stretchr/testify/require" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1" ) +// monitorCallback is the function called when a device's health state changes, +// or new devices are reported, or old devices are deleted. +// Updated contains the most recent state of the Device. +type monitorCallback func(resourceName string, devices []pluginapi.Device) + +func newMockPluginManager() *mockPluginManager { + return &mockPluginManager{ + func(string) error { return nil }, + func(string, plugin.DevicePlugin) error { return nil }, + func(string) {}, + func(string, *pluginapi.ListAndWatchResponse) {}, + } +} + +type mockPluginManager struct { + cleanupPluginDirectory func(string) error + pluginConnected func(string, plugin.DevicePlugin) error + pluginDisconnected func(string) + pluginListAndWatchReceiver func(string, *pluginapi.ListAndWatchResponse) +} + +func (m *mockPluginManager) CleanupPluginDirectory(r string) error { + return m.cleanupPluginDirectory(r) +} + +func (m *mockPluginManager) PluginConnected(r string, p plugin.DevicePlugin) error { + return m.pluginConnected(r, p) +} + +func (m *mockPluginManager) PluginDisconnected(r string) { + m.pluginDisconnected(r) +} + +func (m *mockPluginManager) PluginListAndWatchReceiver(r string, lr *pluginapi.ListAndWatchResponse) { + m.pluginListAndWatchReceiver(r, lr) +} + func esocketName() string { return fmt.Sprintf("mock%d.sock", time.Now().UnixNano()) } @@ -95,7 +134,7 @@ func TestRun(t *testing.T) { p, e := esetup(t, devs, socket, "mock", callback) defer ecleanup(t, p, e) - go e.run() + go e.client.Run() // Wait for the first callback to be issued. <-callbackChan @@ -146,7 +185,7 @@ func TestAllocate(t *testing.T) { return resp, nil }) - go e.run() + go e.client.Run() // Wait for the callback to be issued. select { case <-callbackChan: @@ -180,7 +219,7 @@ func TestGetPreferredAllocation(t *testing.T) { return resp, nil }) - go e.run() + go e.client.Run() // Wait for the callback to be issued. select { case <-callbackChan: @@ -194,19 +233,47 @@ func TestGetPreferredAllocation(t *testing.T) { require.Equal(t, resp, respOut) } -func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) { - p := NewDevicePluginStub(devs, socket, resourceName, false, false) +func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*plugin.Stub, *endpointImpl) { + m := newMockPluginManager() + m.pluginListAndWatchReceiver = func(r string, resp *pluginapi.ListAndWatchResponse) { + var newDevs []pluginapi.Device + for _, d := range resp.Devices { + newDevs = append(newDevs, *d) + } + callback(resourceName, newDevs) + } + + var dp plugin.DevicePlugin + var wg sync.WaitGroup + wg.Add(1) + m.pluginConnected = func(r string, c plugin.DevicePlugin) error { + dp = c + wg.Done() + return nil + } + + p := plugin.NewDevicePluginStub(devs, socket, resourceName, false, false) err := p.Start() require.NoError(t, err) - e, err := newEndpointImpl(socket, resourceName, callback) + c := plugin.NewPluginClient(resourceName, socket, m) + err = c.Connect() require.NoError(t, err) + wg.Wait() + + e := newEndpointImpl(dp) + e.client = c + + m.pluginDisconnected = func(r string) { + e.setStopTime(time.Now()) + } + return p, e } -func ecleanup(t *testing.T, p *Stub, e *endpointImpl) { +func ecleanup(t *testing.T, p *plugin.Stub, e *endpointImpl) { p.Stop() - e.stop() + e.client.Disconnect() } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 0345a67f8c8..22c1c6ec5af 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -40,6 +40,7 @@ import ( watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" + plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" "k8s.io/kubernetes/pkg/kubelet/config" @@ -52,6 +53,30 @@ const ( testResourceName = "fake-domain/resource" ) +func newWrappedManagerImpl(socketPath string, manager *ManagerImpl) *wrappedManagerImpl { + w := &wrappedManagerImpl{ + ManagerImpl: manager, + callback: manager.genericDeviceUpdateCallback, + } + w.socketdir, _ = filepath.Split(socketPath) + w.server, _ = plugin.NewServer(socketPath, w, w) + return w +} + +type wrappedManagerImpl struct { + *ManagerImpl + socketdir string + callback func(string, []pluginapi.Device) +} + +func (m *wrappedManagerImpl) PluginListAndWatchReceiver(r string, resp *pluginapi.ListAndWatchResponse) { + var devices []pluginapi.Device + for _, d := range resp.Devices { + devices = append(devices, *d) + } + m.callback(r, devices) +} + func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error) { socketDir, err = ioutil.TempDir("", "device_plugin") if err != nil { @@ -121,7 +146,7 @@ func TestDevicePluginReRegistration(t *testing.T) { require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") - p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag, getPreferredAllocationFlag) + p2 := plugin.NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag, getPreferredAllocationFlag) err = p2.Start() require.NoError(t, err) p2.Register(socketName, testResourceName, "") @@ -138,7 +163,7 @@ func TestDevicePluginReRegistration(t *testing.T) { require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.") // Test the scenario that a plugin re-registers with different devices. - p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag, getPreferredAllocationFlag) + p3 := plugin.NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag, getPreferredAllocationFlag) err = p3.Start() require.NoError(t, err) p3.Register(socketName, testResourceName, "") @@ -191,7 +216,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) { require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") - p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false, false) + p2 := plugin.NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false, false) err = p2.Start() require.NoError(t, err) // Wait for the second callback to be issued. @@ -208,7 +233,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) { require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") // Test the scenario that a plugin re-registers with different devices. - p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false, false) + p3 := plugin.NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false, false) err = p3.Start() require.NoError(t, err) // Wait for the third callback to be issued. @@ -234,12 +259,13 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor require.NoError(t, err) updateChan := make(chan interface{}) + w := newWrappedManagerImpl(socketName, m) if callback != nil { - m.callback = callback + w.callback = callback } - originalCallback := m.callback - m.callback = func(resourceName string, devices []pluginapi.Device) { + originalCallback := w.callback + w.callback = func(resourceName string, devices []pluginapi.Device) { originalCallback(resourceName, devices) updateChan <- new(interface{}) } @@ -247,14 +273,14 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor return []*v1.Pod{} } - err = m.Start(activePods, &sourcesReadyStub{}) + err = w.Start(activePods, &sourcesReadyStub{}) require.NoError(t, err) - return m, updateChan + return w, updateChan } -func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *Stub { - p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false, false) +func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *plugin.Stub { + p := plugin.NewDevicePluginStub(devs, pluginSocketName, testResourceName, false, false) err := p.Start() require.NoError(t, err) return p @@ -276,20 +302,20 @@ func runPluginManager(pluginManager pluginmanager.PluginManager) { go pluginManager.Run(sourcesReady, wait.NeverStop) } -func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) { +func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) { m, updateChan := setupDeviceManager(t, devs, callback, socketName) p := setupDevicePlugin(t, devs, pluginSocketName) return m, updateChan, p } -func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, pluginmanager.PluginManager) { +func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) { m, updateChan := setupDeviceManager(t, devs, callback, socketName) p := setupDevicePlugin(t, devs, pluginSocketName) pm := setupPluginManager(t, pluginSocketName, m) return m, updateChan, p, pm } -func cleanup(t *testing.T, m Manager, p *Stub) { +func cleanup(t *testing.T, m Manager, p *plugin.Stub) { p.Stop() m.Stop() } @@ -365,6 +391,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) { // Tests adding another resource. resourceName2 := "resource2" e2 := &endpointImpl{} + e2.client = plugin.NewPluginClient(resourceName2, socketName, testManager) testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil} callback(resourceName2, devs) capacity, allocatable, removedResources = testManager.GetCapacity() @@ -394,7 +421,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) { // Stops resourceName2 endpoint. Verifies its stopTime is set, allocate and // preStartContainer calls return errors. - e2.stop() + e2.client.Disconnect() as.False(e2.stopTime.IsZero()) _, err = e2.allocate([]string{"Device1"}) reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)) @@ -661,11 +688,6 @@ type MockEndpoint struct { initChan chan []string } -func (m *MockEndpoint) stop() {} -func (m *MockEndpoint) run() {} - -func (m *MockEndpoint) callback(resourceName string, devices []pluginapi.Device) {} - func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) { m.initChan <- devs return &pluginapi.PreStartContainerResponse{}, nil @@ -685,6 +707,8 @@ func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, err return nil, nil } +func (m *MockEndpoint) setStopTime(t time.Time) {} + func (m *MockEndpoint) isStopped() bool { return false } func (m *MockEndpoint) stopGracePeriodExpired() bool { return false } @@ -706,15 +730,13 @@ func makePod(limits v1.ResourceList) *v1.Pod { } } -func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*ManagerImpl, error) { +func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*wrappedManagerImpl, error) { monitorCallback := func(resourceName string, devices []pluginapi.Device) {} ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) if err != nil { return nil, err } - testManager := &ManagerImpl{ - socketdir: tmpDir, - callback: monitorCallback, + m := &ManagerImpl{ healthyDevices: make(map[string]sets.String), unhealthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), @@ -727,6 +749,11 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso checkpointManager: ckm, allDevices: NewResourceDeviceInstances(), } + testManager := &wrappedManagerImpl{ + ManagerImpl: m, + socketdir: tmpDir, + callback: monitorCallback, + } for _, res := range testRes { testManager.healthyDevices[res.resourceName] = sets.NewString(res.devs.Devices().UnsortedList()...) @@ -1141,13 +1168,16 @@ func TestUpdatePluginResources(t *testing.T) { ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) as.Nil(err) - testManager := &ManagerImpl{ - callback: monitorCallback, + m := &ManagerImpl{ allocatedDevices: make(map[string]sets.String), healthyDevices: make(map[string]sets.String), podDevices: newPodDevices(), checkpointManager: ckm, } + testManager := wrappedManagerImpl{ + ManagerImpl: m, + callback: monitorCallback, + } testManager.podDevices.devs[string(pod.UID)] = make(containerDevices) // require one of resource1 and one of resource2