Update tests to accommodate devicemanager refactoring

Signed-off-by: Kevin Klues <kklues@nvidia.com>
Kevin Klues 2022-03-25 10:54:00 +00:00
parent f6eaa25b71
commit 57f8b31b42
2 changed files with 131 additions and 34 deletions


@@ -19,14 +19,53 @@ package devicemanager
import (
"fmt"
"path"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
)
// monitorCallback is the function called when a device's health state changes,
// or new devices are reported, or old devices are deleted.
// devices contains the most recent state of the device list.
type monitorCallback func(resourceName string, devices []pluginapi.Device)
func newMockPluginManager() *mockPluginManager {
return &mockPluginManager{
func(string) error { return nil },
func(string, plugin.DevicePlugin) error { return nil },
func(string) {},
func(string, *pluginapi.ListAndWatchResponse) {},
}
}
type mockPluginManager struct {
cleanupPluginDirectory func(string) error
pluginConnected func(string, plugin.DevicePlugin) error
pluginDisconnected func(string)
pluginListAndWatchReceiver func(string, *pluginapi.ListAndWatchResponse)
}
func (m *mockPluginManager) CleanupPluginDirectory(r string) error {
return m.cleanupPluginDirectory(r)
}
func (m *mockPluginManager) PluginConnected(r string, p plugin.DevicePlugin) error {
return m.pluginConnected(r, p)
}
func (m *mockPluginManager) PluginDisconnected(r string) {
m.pluginDisconnected(r)
}
func (m *mockPluginManager) PluginListAndWatchReceiver(r string, lr *pluginapi.ListAndWatchResponse) {
m.pluginListAndWatchReceiver(r, lr)
}
func esocketName() string {
return fmt.Sprintf("mock%d.sock", time.Now().UnixNano())
}
@@ -95,7 +134,7 @@ func TestRun(t *testing.T) {
p, e := esetup(t, devs, socket, "mock", callback)
defer ecleanup(t, p, e)
go e.run()
go e.client.Run()
// Wait for the first callback to be issued.
<-callbackChan
@@ -146,7 +185,7 @@ func TestAllocate(t *testing.T) {
return resp, nil
})
go e.run()
go e.client.Run()
// Wait for the callback to be issued.
select {
case <-callbackChan:
@@ -180,7 +219,7 @@ func TestGetPreferredAllocation(t *testing.T) {
return resp, nil
})
go e.run()
go e.client.Run()
// Wait for the callback to be issued.
select {
case <-callbackChan:
@@ -194,19 +233,47 @@ func TestGetPreferredAllocation(t *testing.T) {
require.Equal(t, resp, respOut)
}
func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) {
p := NewDevicePluginStub(devs, socket, resourceName, false, false)
func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*plugin.Stub, *endpointImpl) {
m := newMockPluginManager()
m.pluginListAndWatchReceiver = func(r string, resp *pluginapi.ListAndWatchResponse) {
var newDevs []pluginapi.Device
for _, d := range resp.Devices {
newDevs = append(newDevs, *d)
}
callback(resourceName, newDevs)
}
var dp plugin.DevicePlugin
var wg sync.WaitGroup
wg.Add(1)
m.pluginConnected = func(r string, c plugin.DevicePlugin) error {
dp = c
wg.Done()
return nil
}
p := plugin.NewDevicePluginStub(devs, socket, resourceName, false, false)
err := p.Start()
require.NoError(t, err)
e, err := newEndpointImpl(socket, resourceName, callback)
c := plugin.NewPluginClient(resourceName, socket, m)
err = c.Connect()
require.NoError(t, err)
wg.Wait()
e := newEndpointImpl(dp)
e.client = c
m.pluginDisconnected = func(r string) {
e.setStopTime(time.Now())
}
return p, e
}
func ecleanup(t *testing.T, p *Stub, e *endpointImpl) {
func ecleanup(t *testing.T, p *plugin.Stub, e *endpointImpl) {
p.Stop()
e.stop()
e.client.Disconnect()
}


@@ -40,6 +40,7 @@ import (
watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -52,6 +53,30 @@ const (
testResourceName = "fake-domain/resource"
)
func newWrappedManagerImpl(socketPath string, manager *ManagerImpl) *wrappedManagerImpl {
w := &wrappedManagerImpl{
ManagerImpl: manager,
callback: manager.genericDeviceUpdateCallback,
}
w.socketdir, _ = filepath.Split(socketPath)
w.server, _ = plugin.NewServer(socketPath, w, w)
return w
}
type wrappedManagerImpl struct {
*ManagerImpl
socketdir string
callback func(string, []pluginapi.Device)
}
func (m *wrappedManagerImpl) PluginListAndWatchReceiver(r string, resp *pluginapi.ListAndWatchResponse) {
var devices []pluginapi.Device
for _, d := range resp.Devices {
devices = append(devices, *d)
}
m.callback(r, devices)
}
func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error) {
socketDir, err = ioutil.TempDir("", "device_plugin")
if err != nil {
@@ -121,7 +146,7 @@ func TestDevicePluginReRegistration(t *testing.T) {
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag, getPreferredAllocationFlag)
p2 := plugin.NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag, getPreferredAllocationFlag)
err = p2.Start()
require.NoError(t, err)
p2.Register(socketName, testResourceName, "")
@@ -138,7 +163,7 @@ func TestDevicePluginReRegistration(t *testing.T) {
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag, getPreferredAllocationFlag)
p3 := plugin.NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag, getPreferredAllocationFlag)
err = p3.Start()
require.NoError(t, err)
p3.Register(socketName, testResourceName, "")
@@ -191,7 +216,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false, false)
p2 := plugin.NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false, false)
err = p2.Start()
require.NoError(t, err)
// Wait for the second callback to be issued.
@@ -208,7 +233,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false, false)
p3 := plugin.NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false, false)
err = p3.Start()
require.NoError(t, err)
// Wait for the third callback to be issued.
@@ -234,12 +259,13 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor
require.NoError(t, err)
updateChan := make(chan interface{})
w := newWrappedManagerImpl(socketName, m)
if callback != nil {
m.callback = callback
w.callback = callback
}
originalCallback := m.callback
m.callback = func(resourceName string, devices []pluginapi.Device) {
originalCallback := w.callback
w.callback = func(resourceName string, devices []pluginapi.Device) {
originalCallback(resourceName, devices)
updateChan <- new(interface{})
}
@@ -247,14 +273,14 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor
return []*v1.Pod{}
}
err = m.Start(activePods, &sourcesReadyStub{})
err = w.Start(activePods, &sourcesReadyStub{})
require.NoError(t, err)
return m, updateChan
return w, updateChan
}
func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *Stub {
p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false, false)
func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *plugin.Stub {
p := plugin.NewDevicePluginStub(devs, pluginSocketName, testResourceName, false, false)
err := p.Start()
require.NoError(t, err)
return p
@@ -276,20 +302,20 @@ func runPluginManager(pluginManager pluginmanager.PluginManager) {
go pluginManager.Run(sourcesReady, wait.NeverStop)
}
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) {
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) {
m, updateChan := setupDeviceManager(t, devs, callback, socketName)
p := setupDevicePlugin(t, devs, pluginSocketName)
return m, updateChan, p
}
func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, pluginmanager.PluginManager) {
func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) {
m, updateChan := setupDeviceManager(t, devs, callback, socketName)
p := setupDevicePlugin(t, devs, pluginSocketName)
pm := setupPluginManager(t, pluginSocketName, m)
return m, updateChan, p, pm
}
func cleanup(t *testing.T, m Manager, p *Stub) {
func cleanup(t *testing.T, m Manager, p *plugin.Stub) {
p.Stop()
m.Stop()
}
@@ -365,6 +391,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
// Tests adding another resource.
resourceName2 := "resource2"
e2 := &endpointImpl{}
e2.client = plugin.NewPluginClient(resourceName2, socketName, testManager)
testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil}
callback(resourceName2, devs)
capacity, allocatable, removedResources = testManager.GetCapacity()
@@ -394,7 +421,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
// Stops resourceName2 endpoint. Verifies its stopTime is set, allocate and
// preStartContainer calls return errors.
e2.stop()
e2.client.Disconnect()
as.False(e2.stopTime.IsZero())
_, err = e2.allocate([]string{"Device1"})
reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2))
@@ -661,11 +688,6 @@ type MockEndpoint struct {
initChan chan []string
}
func (m *MockEndpoint) stop() {}
func (m *MockEndpoint) run() {}
func (m *MockEndpoint) callback(resourceName string, devices []pluginapi.Device) {}
func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
m.initChan <- devs
return &pluginapi.PreStartContainerResponse{}, nil
@@ -685,6 +707,8 @@ func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, err
return nil, nil
}
func (m *MockEndpoint) setStopTime(t time.Time) {}
func (m *MockEndpoint) isStopped() bool { return false }
func (m *MockEndpoint) stopGracePeriodExpired() bool { return false }
@@ -706,15 +730,13 @@ func makePod(limits v1.ResourceList) *v1.Pod {
}
}
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*ManagerImpl, error) {
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*wrappedManagerImpl, error) {
monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
if err != nil {
return nil, err
}
testManager := &ManagerImpl{
socketdir: tmpDir,
callback: monitorCallback,
m := &ManagerImpl{
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
@@ -727,6 +749,11 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
checkpointManager: ckm,
allDevices: NewResourceDeviceInstances(),
}
testManager := &wrappedManagerImpl{
ManagerImpl: m,
socketdir: tmpDir,
callback: monitorCallback,
}
for _, res := range testRes {
testManager.healthyDevices[res.resourceName] = sets.NewString(res.devs.Devices().UnsortedList()...)
@@ -1141,13 +1168,16 @@ func TestUpdatePluginResources(t *testing.T) {
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
as.Nil(err)
testManager := &ManagerImpl{
callback: monitorCallback,
m := &ManagerImpl{
allocatedDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
podDevices: newPodDevices(),
checkpointManager: ckm,
}
testManager := wrappedManagerImpl{
ManagerImpl: m,
callback: monitorCallback,
}
testManager.podDevices.devs[string(pod.UID)] = make(containerDevices)
// require one of resource1 and one of resource2