diff --git a/pkg/kubelet/cm/devicemanager/endpoint.go b/pkg/kubelet/cm/devicemanager/endpoint.go index 069ffc220e4..eed50de6561 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint.go +++ b/pkg/kubelet/cm/devicemanager/endpoint.go @@ -37,8 +37,7 @@ type endpoint interface { stop() allocate(devs []string) (*pluginapi.AllocateResponse, error) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) - getDevices() []pluginapi.Device - callback(resourceName string, added, updated, deleted []pluginapi.Device) + callback(resourceName string, devices []pluginapi.Device) isStopped() bool stopGracePeriodExpired() bool } @@ -51,15 +50,13 @@ type endpointImpl struct { resourceName string stopTime time.Time - devices map[string]pluginapi.Device - mutex sync.Mutex - - cb monitorCallback + mutex sync.Mutex + cb monitorCallback } // newEndpoint creates a new endpoint for the given resourceName. // This is to be used during normal device plugin registration. -func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) { +func newEndpointImpl(socketPath, resourceName string, callback monitorCallback) (*endpointImpl, error) { client, c, err := dial(socketPath) if err != nil { glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err) @@ -73,41 +70,26 @@ func newEndpointImpl(socketPath, resourceName string, devices map[string]plugina socketPath: socketPath, resourceName: resourceName, - devices: devices, - cb: callback, + cb: callback, }, nil } // newStoppedEndpointImpl creates a new endpoint for the given resourceName with stopTime set. // This is to be used during Kubelet restart, before the actual device plugin re-registers. -func newStoppedEndpointImpl(resourceName string, devices map[string]pluginapi.Device) *endpointImpl { +func newStoppedEndpointImpl(resourceName string) *endpointImpl { return &endpointImpl{ resourceName: resourceName, - devices: devices, stopTime: time.Now(), } } -func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) { - e.cb(resourceName, added, updated, deleted) -} - -func (e *endpointImpl) getDevices() []pluginapi.Device { - e.mutex.Lock() - defer e.mutex.Unlock() - var devs []pluginapi.Device - - for _, d := range e.devices { - devs = append(devs, d) - } - - return devs +func (e *endpointImpl) callback(resourceName string, devices []pluginapi.Device) { + e.cb(resourceName, devices) } // run initializes ListAndWatch gRPC call for the device plugin and // blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch -// stream update contains a new list of device states. listAndWatch compares the new -// device states with its cached states to get list of new, updated, and deleted devices. +// stream update contains a new list of device states. // It then issues a callback to pass this information to the device manager which // will adjust the resource available information accordingly. func (e *endpointImpl) run() { @@ -118,14 +100,6 @@ func (e *endpointImpl) run() { return } - devices := make(map[string]pluginapi.Device) - - e.mutex.Lock() - for _, d := range e.devices { - devices[d.ID] = d - } - e.mutex.Unlock() - for { response, err := stream.Recv() if err != nil { @@ -136,57 +110,12 @@ func (e *endpointImpl) run() { devs := response.Devices glog.V(2).Infof("State pushed for device plugin %s", e.resourceName) - newDevs := make(map[string]*pluginapi.Device) - var added, updated []pluginapi.Device - + var newDevs []pluginapi.Device for _, d := range devs { - dOld, ok := devices[d.ID] - newDevs[d.ID] = d - - if !ok { - glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d) - - devices[d.ID] = *d - added = append(added, *d) - - continue - } - - if d.Health == dOld.Health { - continue - } - - if d.Health == pluginapi.Unhealthy { - glog.Errorf("Device %s is now Unhealthy", d.ID) - } else if d.Health == pluginapi.Healthy { - glog.V(2).Infof("Device %s is now Healthy", d.ID) - } - - devices[d.ID] = *d - updated = append(updated, *d) + newDevs = append(newDevs, *d) } - var deleted []pluginapi.Device - for id, d := range devices { - if _, ok := newDevs[id]; ok { - continue - } - - glog.Errorf("Device %s was deleted", d.ID) - - deleted = append(deleted, d) - delete(devices, id) - } - - e.mutex.Lock() - // NOTE: Return a copy of 'devices' instead of returning a direct reference to local 'devices' - e.devices = make(map[string]pluginapi.Device) - for _, d := range devices { - e.devices[d.ID] = d - } - e.mutex.Unlock() - - e.callback(e.resourceName, added, updated, deleted) + e.callback(e.resourceName, newDevs) } } diff --git a/pkg/kubelet/cm/devicemanager/endpoint_test.go b/pkg/kubelet/cm/devicemanager/endpoint_test.go index 80ffb68d134..86756aac8a0 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint_test.go +++ b/pkg/kubelet/cm/devicemanager/endpoint_test.go @@ -37,7 +37,7 @@ func TestNewEndpoint(t *testing.T) { {ID: "ADeviceId", Health: pluginapi.Healthy}, } - p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {}) + p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) {}) defer ecleanup(t, p, e) } @@ -58,7 +58,7 @@ func TestRun(t *testing.T) { callbackCount := 0 callbackChan := make(chan int) - callback := func(n string, a, u, r []pluginapi.Device) { + callback := func(n string, devices []pluginapi.Device) { // Should be called twice: // one for plugin registration, one for plugin update. if callbackCount > 2 { @@ -67,23 +67,24 @@ func TestRun(t *testing.T) { // Check plugin registration if callbackCount == 0 { - require.Len(t, a, 3) - require.Len(t, u, 0) - require.Len(t, r, 0) + require.Len(t, devices, 3) + require.Equal(t, devices[0].ID, devs[0].ID) + require.Equal(t, devices[1].ID, devs[1].ID) + require.Equal(t, devices[2].ID, devs[2].ID) + require.Equal(t, devices[0].Health, devs[0].Health) + require.Equal(t, devices[1].Health, devs[1].Health) + require.Equal(t, devices[2].Health, devs[2].Health) } // Check plugin update if callbackCount == 1 { - require.Len(t, a, 1) - require.Len(t, u, 2) - require.Len(t, r, 1) - - require.Equal(t, a[0].ID, updated[2].ID) - require.Equal(t, u[0].ID, updated[0].ID) - require.Equal(t, u[0].Health, updated[0].Health) - require.Equal(t, u[1].ID, updated[1].ID) - require.Equal(t, u[1].Health, updated[1].Health) - require.Equal(t, r[0].ID, devs[1].ID) + require.Len(t, devices, 3) + require.Equal(t, devices[0].ID, updated[0].ID) + require.Equal(t, devices[1].ID, updated[1].ID) + require.Equal(t, devices[2].ID, updated[2].ID) + require.Equal(t, devices[0].Health, updated[0].Health) + require.Equal(t, devices[1].Health, updated[1].Health) + require.Equal(t, devices[2].Health, updated[2].Health) } callbackCount++ @@ -102,18 +103,7 @@ func TestRun(t *testing.T) { // Wait for the second callback to be issued. <-callbackChan - e.mutex.Lock() - defer e.mutex.Unlock() - - require.Len(t, e.devices, 3) - for _, dref := range updated { - d, ok := e.devices[dref.ID] - - require.True(t, ok) - require.Equal(t, d.ID, dref.ID) - require.Equal(t, d.Health, dref.Health) - } - + require.Equal(t, callbackCount, 2) } func TestAllocate(t *testing.T) { @@ -123,7 +113,7 @@ func TestAllocate(t *testing.T) { } callbackCount := 0 callbackChan := make(chan int) - p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) { + p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) { callbackCount++ callbackChan <- callbackCount }) @@ -169,23 +159,13 @@ func TestAllocate(t *testing.T) { require.Equal(t, resp, respOut) } -func TestGetDevices(t *testing.T) { - e := endpointImpl{ - devices: map[string]pluginapi.Device{ - "ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy}, - }, - } - devs := e.getDevices() - require.Len(t, devs, 1) -} - func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) { p := NewDevicePluginStub(devs, socket, resourceName, false) err := p.Start() require.NoError(t, err) - e, err := newEndpointImpl(socket, resourceName, make(map[string]pluginapi.Device), callback) + e, err := newEndpointImpl(socket, resourceName, callback) require.NoError(t, err) return p, e diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index f10324d360f..3969ce836c4 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -49,7 +49,7 @@ type ActivePodsFunc func() []*v1.Pod // monitorCallback is the function called when a device's health state changes, // or new devices are reported, or old devices are deleted. // Updated contains the most recent state of the Device. -type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device) +type monitorCallback func(resourceName string, devices []pluginapi.Device) // ManagerImpl is the structure in charge of managing Device Plugins. type ManagerImpl struct { @@ -133,28 +133,17 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) { return manager, nil } -func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) { - kept := append(updated, added...) +func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) { m.mutex.Lock() - if _, ok := m.healthyDevices[resourceName]; !ok { - m.healthyDevices[resourceName] = sets.NewString() - } - if _, ok := m.unhealthyDevices[resourceName]; !ok { - m.unhealthyDevices[resourceName] = sets.NewString() - } - for _, dev := range kept { + m.healthyDevices[resourceName] = sets.NewString() + m.unhealthyDevices[resourceName] = sets.NewString() + for _, dev := range devices { if dev.Health == pluginapi.Healthy { m.healthyDevices[resourceName].Insert(dev.ID) - m.unhealthyDevices[resourceName].Delete(dev.ID) } else { m.unhealthyDevices[resourceName].Insert(dev.ID) - m.healthyDevices[resourceName].Delete(dev.ID) } } - for _, dev := range deleted { - m.healthyDevices[resourceName].Delete(dev.ID) - m.unhealthyDevices[resourceName].Delete(dev.ID) - } m.mutex.Unlock() m.writeCheckpoint() } @@ -277,21 +266,6 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool { return false } -// Devices is the map of devices that are known by the Device -// Plugin manager with the kind of the devices as key -func (m *ManagerImpl) Devices() map[string][]pluginapi.Device { - m.mutex.Lock() - defer m.mutex.Unlock() - - devs := make(map[string][]pluginapi.Device) - for k, e := range m.endpoints { - glog.V(3).Infof("Endpoint: %+v: %p", k, e) - devs[k] = e.getDevices() - } - - return devs -} - // Allocate is the call that you can use to allocate a set of devices // from the registered device plugins. func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { @@ -376,7 +350,7 @@ func (m *ManagerImpl) Stop() error { func (m *ManagerImpl) addEndpointProbeMode(resourceName string, socketPath string) (chan bool, error) { chanForAckOfNotification := make(chan bool) - new, err := newEndpointImpl(socketPath, resourceName, make(map[string]pluginapi.Device), m.callback) + new, err := newEndpointImpl(socketPath, resourceName, m.callback) if err != nil { glog.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err) return nil, fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err) @@ -407,26 +381,8 @@ func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.D if options != nil { m.pluginOpts[resourceName] = options } - old, ok := m.endpoints[resourceName] - if ok && old != nil { - // Pass devices of previous endpoint into re-registered one, - // to avoid potential orphaned devices upon re-registration - devices := make(map[string]pluginapi.Device) - for _, device := range old.getDevices() { - device.Health = pluginapi.Unhealthy - devices[device.ID] = device - } - e.devices = devices - } - // Associates the newly created endpoint with the corresponding resource name. - // Stops existing endpoint if there is any. m.endpoints[resourceName] = e glog.V(2).Infof("Registered endpoint %v", e) - - if old != nil { - old.stop() - } - return } func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) { @@ -441,13 +397,12 @@ func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) { } func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { - new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, make(map[string]pluginapi.Device), m.callback) + new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback) if err != nil { glog.Errorf("Failed to dial device plugin with request %v: %v", r, err) return } m.registerEndpoint(r.ResourceName, r.Options, new) - go func() { m.runEndpoint(r.ResourceName, new) }() @@ -567,7 +522,7 @@ func (m *ManagerImpl) readCheckpoint() error { // will stay zero till the corresponding device plugin re-registers. m.healthyDevices[resource] = sets.NewString() m.unhealthyDevices[resource] = sets.NewString() - m.endpoints[resource] = newStoppedEndpointImpl(resource, make(map[string]pluginapi.Device)) + m.endpoints[resource] = newStoppedEndpointImpl(resource) } return nil } diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go index ce852f697a4..66f8d1004cd 100644 --- a/pkg/kubelet/cm/devicemanager/manager_stub.go +++ b/pkg/kubelet/cm/devicemanager/manager_stub.go @@ -18,7 +18,6 @@ package devicemanager import ( "k8s.io/api/core/v1" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher" @@ -43,11 +42,6 @@ func (h *ManagerStub) Stop() error { return nil } -// Devices returns an empty map. -func (h *ManagerStub) Devices() map[string][]pluginapi.Device { - return make(map[string][]pluginapi.Device) -} - // Allocate simply returns nil. func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { return nil diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index da64d5ae637..e4b25844901 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -22,7 +22,6 @@ import ( "os" "path/filepath" "reflect" - "sync/atomic" "testing" "time" @@ -68,7 +67,9 @@ func TestNewManagerImplStart(t *testing.T) { socketDir, socketName, pluginSocketName, err := tmpSocketDir() require.NoError(t, err) defer os.RemoveAll(socketDir) - m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName, false) + m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName) + cleanup(t, m, p, nil) + // Stop should tolerate being called more than once. cleanup(t, m, p, nil) } @@ -76,7 +77,7 @@ func TestNewManagerImplStartProbeMode(t *testing.T) { socketDir, socketName, pluginSocketName, err := tmpSocketDir() require.NoError(t, err) defer os.RemoveAll(socketDir) - m, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName) + m, _, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName) cleanup(t, m, p, w) } @@ -95,68 +96,55 @@ func TestDevicePluginReRegistration(t *testing.T) { {ID: "Dev3", Health: pluginapi.Healthy}, } for _, preStartContainerFlag := range []bool{false, true} { - - expCallbackCount := int32(0) - callbackCount := int32(0) - callbackChan := make(chan int32) - callback := func(n string, a, u, r []pluginapi.Device) { - callbackCount++ - if callbackCount > atomic.LoadInt32(&expCallbackCount) { - t.FailNow() - } - callbackChan <- callbackCount - } - m, p1 := setup(t, devs, callback, socketName, pluginSocketName, preStartContainerFlag) - atomic.StoreInt32(&expCallbackCount, 1) + m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName) p1.Register(socketName, testResourceName, "") - // Wait for the first callback to be issued. select { - case <-callbackChan: - break - case <-time.After(time.Second): - t.FailNow() + case <-ch: + case <-time.After(5 * time.Second): + t.Fatalf("timeout while waiting for manager update") } - devices := m.Devices() - require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.") + capacity, allocatable, _ := m.GetCapacity() + resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag) err = p2.Start() require.NoError(t, err) - atomic.StoreInt32(&expCallbackCount, 2) p2.Register(socketName, testResourceName, "") - // Wait for the second callback to be issued. - select { - case <-callbackChan: - break - case <-time.After(time.Second): - t.FailNow() - } - devices2 := m.Devices() - require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Fatalf("timeout while waiting for manager update") + } + capacity, allocatable, _ = m.GetCapacity() + resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.") // Test the scenario that a plugin re-registers with different devices. p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag) err = p3.Start() require.NoError(t, err) - atomic.StoreInt32(&expCallbackCount, 3) p3.Register(socketName, testResourceName, "") - // Wait for the second callback to be issued. - select { - case <-callbackChan: - break - case <-time.After(time.Second): - t.FailNow() - } - devices3 := m.Devices() - require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.") + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Fatalf("timeout while waiting for manager update") + } + capacity, allocatable, _ = m.GetCapacity() + resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.") p2.Stop() p3.Stop() cleanup(t, m, p1, nil) - close(callbackChan) - } } @@ -177,105 +165,106 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) { {ID: "Dev3", Health: pluginapi.Healthy}, } - expCallbackCount := int32(0) - callbackCount := int32(0) - callbackChan := make(chan int32) - callback := func(n string, a, u, r []pluginapi.Device) { - callbackCount++ - if callbackCount > atomic.LoadInt32(&expCallbackCount) { - t.FailNow() - } - callbackChan <- callbackCount - } - m, p1, w := setupInProbeMode(t, devs, callback, socketName, pluginSocketName) - atomic.StoreInt32(&expCallbackCount, 1) + m, ch, p1, w := setupInProbeMode(t, devs, nil, socketName, pluginSocketName) + // Wait for the first callback to be issued. select { - case <-callbackChan: - break - case <-time.After(time.Second): + case <-ch: + case <-time.After(5 * time.Second): t.FailNow() } - devices := m.Devices() - require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.") + capacity, allocatable, _ := m.GetCapacity() + resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false) err = p2.Start() require.NoError(t, err) - atomic.StoreInt32(&expCallbackCount, 2) // Wait for the second callback to be issued. select { - case <-callbackChan: - break - case <-time.After(time.Second): + case <-ch: + case <-time.After(5 * time.Second): t.FailNow() } - devices2 := m.Devices() - require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") + capacity, allocatable, _ = m.GetCapacity() + resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") // Test the scenario that a plugin re-registers with different devices. p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false) err = p3.Start() require.NoError(t, err) - atomic.StoreInt32(&expCallbackCount, 3) - // Wait for the second callback to be issued. + // Wait for the third callback to be issued. select { - case <-callbackChan: - break - case <-time.After(time.Second): + case <-ch: + case <-time.After(5 * time.Second): t.FailNow() } - devices3 := m.Devices() - require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.") + capacity, allocatable, _ = m.GetCapacity() + resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of previous registered should be removed") p2.Stop() p3.Stop() cleanup(t, m, p1, w) - close(callbackChan) } -func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, preStartContainerFlag bool) (Manager, *Stub) { +func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) { m, err := newManagerImpl(socketName) require.NoError(t, err) + updateChan := make(chan interface{}) - m.callback = callback + if callback != nil { + m.callback = callback + } + originalCallback := m.callback + m.callback = func(resourceName string, devices []pluginapi.Device) { + originalCallback(resourceName, devices) + updateChan <- new(interface{}) + } activePods := func() []*v1.Pod { return []*v1.Pod{} } + err = m.Start(activePods, &sourcesReadyStub{}) require.NoError(t, err) - p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, preStartContainerFlag) - err = p.Start() - require.NoError(t, err) - - return m, p + return m, updateChan } -func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub, *pluginwatcher.Watcher) { - w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName)) - - m, err := newManagerImpl(socketName) +func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *Stub { + p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false) + err := p.Start() require.NoError(t, err) + return p +} +func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher { + w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName)) w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherCallback()) w.Start() + return &w +} - m.callback = callback +func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) { + m, updateChan := setupDeviceManager(t, devs, callback, socketName) + p := setupDevicePlugin(t, devs, pluginSocketName) + return m, updateChan, p +} - activePods := func() []*v1.Pod { - return []*v1.Pod{} - } - err = m.Start(activePods, &sourcesReadyStub{}) - require.NoError(t, err) - - p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false /*preStart*/) - err = p.Start() - require.NoError(t, err) - - return m, p, &w +func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, *pluginwatcher.Watcher) { + m, updateChan := setupDeviceManager(t, devs, callback, socketName) + w := setupPluginWatcher(pluginSocketName, m) + p := setupDevicePlugin(t, devs, pluginSocketName) + return m, updateChan, p, w } func cleanup(t *testing.T, m Manager, p *Stub, w *pluginwatcher.Watcher) { @@ -305,9 +294,9 @@ func TestUpdateCapacityAllocatable(t *testing.T) { // Adds three devices for resource1, two healthy and one unhealthy. // Expects capacity for resource1 to be 2. resourceName1 := "domain1.com/resource1" - e1 := &endpointImpl{devices: make(map[string]pluginapi.Device)} + e1 := &endpointImpl{} testManager.endpoints[resourceName1] = e1 - callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{}) + callback(resourceName1, devs) capacity, allocatable, removedResources := testManager.GetCapacity() resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)] as.True(ok) @@ -318,7 +307,8 @@ func TestUpdateCapacityAllocatable(t *testing.T) { as.Equal(0, len(removedResources)) // Deletes an unhealthy device should NOT change allocatable but change capacity. - callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]}) + devs1 := devs[:len(devs)-1] + callback(resourceName1, devs1) capacity, allocatable, removedResources = testManager.GetCapacity() resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] as.True(ok) @@ -329,34 +319,34 @@ func TestUpdateCapacityAllocatable(t *testing.T) { as.Equal(0, len(removedResources)) // Updates a healthy device to unhealthy should reduce allocatable by 1. - dev2 := devs[1] - dev2.Health = pluginapi.Unhealthy - callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{dev2}, []pluginapi.Device{}) + devs[1].Health = pluginapi.Unhealthy + callback(resourceName1, devs) capacity, allocatable, removedResources = testManager.GetCapacity() resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] as.True(ok) resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)] as.True(ok) - as.Equal(int64(2), resource1Capacity.Value()) + as.Equal(int64(3), resource1Capacity.Value()) as.Equal(int64(1), resource1Allocatable.Value()) as.Equal(0, len(removedResources)) // Deletes a healthy device should reduce capacity and allocatable by 1. - callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[0]}) + devs2 := devs[1:] + callback(resourceName1, devs2) capacity, allocatable, removedResources = testManager.GetCapacity() resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] as.True(ok) resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)] as.True(ok) as.Equal(int64(0), resource1Allocatable.Value()) - as.Equal(int64(1), resource1Capacity.Value()) + as.Equal(int64(2), resource1Capacity.Value()) as.Equal(0, len(removedResources)) // Tests adding another resource. resourceName2 := "resource2" - e2 := &endpointImpl{devices: make(map[string]pluginapi.Device)} + e2 := &endpointImpl{} testManager.endpoints[resourceName2] = e2 - callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{}) + callback(resourceName2, devs) capacity, allocatable, removedResources = testManager.GetCapacity() as.Equal(2, len(capacity)) resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)] @@ -364,7 +354,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) { resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)] as.True(ok) as.Equal(int64(3), resource2Capacity.Value()) - as.Equal(int64(2), resource2Allocatable.Value()) + as.Equal(int64(1), resource2Allocatable.Value()) as.Equal(0, len(removedResources)) // Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1 @@ -552,11 +542,7 @@ type MockEndpoint struct { func (m *MockEndpoint) stop() {} func (m *MockEndpoint) run() {} -func (m *MockEndpoint) getDevices() []pluginapi.Device { - return []pluginapi.Device{} -} - -func (m *MockEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {} +func (m *MockEndpoint) callback(resourceName string, devices []pluginapi.Device) {} func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) { m.initChan <- devs @@ -592,7 +578,7 @@ func makePod(limits v1.ResourceList) *v1.Pod { } func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) (*ManagerImpl, error) { - monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} + monitorCallback := func(resourceName string, devices []pluginapi.Device) {} ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) if err != nil { return nil, err @@ -858,7 +844,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) { devID2 := "dev2" as := assert.New(t) - monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} + monitorCallback := func(resourceName string, devices []pluginapi.Device) {} tmpDir, err := ioutil.TempDir("", "checkpoint") as.Nil(err) diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index c6ba5f6490f..f9c4a17a12b 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -20,7 +20,6 @@ import ( "time" "k8s.io/api/core/v1" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -33,11 +32,6 @@ type Manager interface { // Start starts device plugin registration service. Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error - // Devices is the map of devices that have registered themselves - // against the manager. - // The map key is the ResourceName of the device plugins. - Devices() map[string][]pluginapi.Device - // Allocate configures and assigns devices to pods. The pods are provided // through the pod admission attributes in the attrs argument. From the // requested device resources, Allocate will communicate with the owning