From 7101c17498f217fe9be3d6ea9f3175f125c50458 Mon Sep 17 00:00:00 2001 From: hui luo Date: Sun, 29 Jul 2018 17:50:18 -0700 Subject: [PATCH] While reviewing devicemanager code, found the caching layer on endpoint is redundant. Here are the 3 related objects in picture: devicemanager <-> endpoint <-> plugin Plugin is the source of truth for devices and device health status. devicemanager maintain healthyDevices, unhealthyDevices, allocatedDevices based on updates from plugin. So there is no point for endpoint caching devices, this patch is removing this caching layer on endpoint, Also removing the Manager.Devices() since i didn't find any caller of this other than test, i am adding a notification channel to facilitate testing, If we need to get all devices from manager in future, it just need to return healthyDevices + unhealthyDevices, we don't have to call endpoint after all. This patch makes code more readable, data model been simplified. --- pkg/kubelet/cm/devicemanager/endpoint.go | 95 +------- pkg/kubelet/cm/devicemanager/endpoint_test.go | 58 ++--- pkg/kubelet/cm/devicemanager/manager.go | 61 +---- pkg/kubelet/cm/devicemanager/manager_stub.go | 6 - pkg/kubelet/cm/devicemanager/manager_test.go | 222 ++++++++---------- pkg/kubelet/cm/devicemanager/types.go | 6 - 6 files changed, 143 insertions(+), 305 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/endpoint.go b/pkg/kubelet/cm/devicemanager/endpoint.go index 069ffc220e4..eed50de6561 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint.go +++ b/pkg/kubelet/cm/devicemanager/endpoint.go @@ -37,8 +37,7 @@ type endpoint interface { stop() allocate(devs []string) (*pluginapi.AllocateResponse, error) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) - getDevices() []pluginapi.Device - callback(resourceName string, added, updated, deleted []pluginapi.Device) + callback(resourceName string, devices []pluginapi.Device) isStopped() bool stopGracePeriodExpired() bool } @@ -51,15 +50,13 @@ type endpointImpl struct { resourceName string stopTime time.Time - devices map[string]pluginapi.Device - mutex sync.Mutex - - cb monitorCallback + mutex sync.Mutex + cb monitorCallback } // newEndpoint creates a new endpoint for the given resourceName. // This is to be used during normal device plugin registration. -func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) { +func newEndpointImpl(socketPath, resourceName string, callback monitorCallback) (*endpointImpl, error) { client, c, err := dial(socketPath) if err != nil { glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err) @@ -73,41 +70,26 @@ func newEndpointImpl(socketPath, resourceName string, devices map[string]plugina socketPath: socketPath, resourceName: resourceName, - devices: devices, - cb: callback, + cb: callback, }, nil } // newStoppedEndpointImpl creates a new endpoint for the given resourceName with stopTime set. // This is to be used during Kubelet restart, before the actual device plugin re-registers. -func newStoppedEndpointImpl(resourceName string, devices map[string]pluginapi.Device) *endpointImpl { +func newStoppedEndpointImpl(resourceName string) *endpointImpl { return &endpointImpl{ resourceName: resourceName, - devices: devices, stopTime: time.Now(), } } -func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) { - e.cb(resourceName, added, updated, deleted) -} - -func (e *endpointImpl) getDevices() []pluginapi.Device { - e.mutex.Lock() - defer e.mutex.Unlock() - var devs []pluginapi.Device - - for _, d := range e.devices { - devs = append(devs, d) - } - - return devs +func (e *endpointImpl) callback(resourceName string, devices []pluginapi.Device) { + e.cb(resourceName, devices) } // run initializes ListAndWatch gRPC call for the device plugin and // blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch -// stream update contains a new list of device states. listAndWatch compares the new -// device states with its cached states to get list of new, updated, and deleted devices. +// stream update contains a new list of device states. // It then issues a callback to pass this information to the device manager which // will adjust the resource available information accordingly. func (e *endpointImpl) run() { @@ -118,14 +100,6 @@ func (e *endpointImpl) run() { return } - devices := make(map[string]pluginapi.Device) - - e.mutex.Lock() - for _, d := range e.devices { - devices[d.ID] = d - } - e.mutex.Unlock() - for { response, err := stream.Recv() if err != nil { @@ -136,57 +110,12 @@ func (e *endpointImpl) run() { devs := response.Devices glog.V(2).Infof("State pushed for device plugin %s", e.resourceName) - newDevs := make(map[string]*pluginapi.Device) - var added, updated []pluginapi.Device - + var newDevs []pluginapi.Device for _, d := range devs { - dOld, ok := devices[d.ID] - newDevs[d.ID] = d - - if !ok { - glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d) - - devices[d.ID] = *d - added = append(added, *d) - - continue - } - - if d.Health == dOld.Health { - continue - } - - if d.Health == pluginapi.Unhealthy { - glog.Errorf("Device %s is now Unhealthy", d.ID) - } else if d.Health == pluginapi.Healthy { - glog.V(2).Infof("Device %s is now Healthy", d.ID) - } - - devices[d.ID] = *d - updated = append(updated, *d) + newDevs = append(newDevs, *d) } - var deleted []pluginapi.Device - for id, d := range devices { - if _, ok := newDevs[id]; ok { - continue - } - - glog.Errorf("Device %s was deleted", d.ID) - - deleted = append(deleted, d) - delete(devices, id) - } - - e.mutex.Lock() - // NOTE: Return a copy of 'devices' instead of returning a direct reference to local 'devices' - e.devices = make(map[string]pluginapi.Device) - for _, d := range devices { - e.devices[d.ID] = d - } - e.mutex.Unlock() - - e.callback(e.resourceName, added, updated, deleted) + e.callback(e.resourceName, newDevs) } } diff --git a/pkg/kubelet/cm/devicemanager/endpoint_test.go b/pkg/kubelet/cm/devicemanager/endpoint_test.go index 80ffb68d134..86756aac8a0 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint_test.go +++ b/pkg/kubelet/cm/devicemanager/endpoint_test.go @@ -37,7 +37,7 @@ func TestNewEndpoint(t *testing.T) { {ID: "ADeviceId", Health: pluginapi.Healthy}, } - p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {}) + p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) {}) defer ecleanup(t, p, e) } @@ -58,7 +58,7 @@ func TestRun(t *testing.T) { callbackCount := 0 callbackChan := make(chan int) - callback := func(n string, a, u, r []pluginapi.Device) { + callback := func(n string, devices []pluginapi.Device) { // Should be called twice: // one for plugin registration, one for plugin update. if callbackCount > 2 { @@ -67,23 +67,24 @@ func TestRun(t *testing.T) { // Check plugin registration if callbackCount == 0 { - require.Len(t, a, 3) - require.Len(t, u, 0) - require.Len(t, r, 0) + require.Len(t, devices, 3) + require.Equal(t, devices[0].ID, devs[0].ID) + require.Equal(t, devices[1].ID, devs[1].ID) + require.Equal(t, devices[2].ID, devs[2].ID) + require.Equal(t, devices[0].Health, devs[0].Health) + require.Equal(t, devices[1].Health, devs[1].Health) + require.Equal(t, devices[2].Health, devs[2].Health) } // Check plugin update if callbackCount == 1 { - require.Len(t, a, 1) - require.Len(t, u, 2) - require.Len(t, r, 1) - - require.Equal(t, a[0].ID, updated[2].ID) - require.Equal(t, u[0].ID, updated[0].ID) - require.Equal(t, u[0].Health, updated[0].Health) - require.Equal(t, u[1].ID, updated[1].ID) - require.Equal(t, u[1].Health, updated[1].Health) - require.Equal(t, r[0].ID, devs[1].ID) + require.Len(t, devices, 3) + require.Equal(t, devices[0].ID, updated[0].ID) + require.Equal(t, devices[1].ID, updated[1].ID) + require.Equal(t, devices[2].ID, updated[2].ID) + require.Equal(t, devices[0].Health, updated[0].Health) + require.Equal(t, devices[1].Health, updated[1].Health) + require.Equal(t, devices[2].Health, updated[2].Health) } callbackCount++ @@ -102,18 +103,7 @@ func TestRun(t *testing.T) { // Wait for the second callback to be issued. <-callbackChan - e.mutex.Lock() - defer e.mutex.Unlock() - - require.Len(t, e.devices, 3) - for _, dref := range updated { - d, ok := e.devices[dref.ID] - - require.True(t, ok) - require.Equal(t, d.ID, dref.ID) - require.Equal(t, d.Health, dref.Health) - } - + require.Equal(t, callbackCount, 2) } func TestAllocate(t *testing.T) { @@ -123,7 +113,7 @@ func TestAllocate(t *testing.T) { } callbackCount := 0 callbackChan := make(chan int) - p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) { + p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) { callbackCount++ callbackChan <- callbackCount }) @@ -169,23 +159,13 @@ func TestAllocate(t *testing.T) { require.Equal(t, resp, respOut) } -func TestGetDevices(t *testing.T) { - e := endpointImpl{ - devices: map[string]pluginapi.Device{ - "ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy}, - }, - } - devs := e.getDevices() - require.Len(t, devs, 1) -} - func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) { p := NewDevicePluginStub(devs, socket, resourceName, false) err := p.Start() require.NoError(t, err) - e, err := newEndpointImpl(socket, resourceName, make(map[string]pluginapi.Device), callback) + e, err := newEndpointImpl(socket, resourceName, callback) require.NoError(t, err) return p, e diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index f10324d360f..3969ce836c4 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -49,7 +49,7 @@ type ActivePodsFunc func() []*v1.Pod // monitorCallback is the function called when a device's health state changes, // or new devices are reported, or old devices are deleted. // Updated contains the most recent state of the Device. -type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device) +type monitorCallback func(resourceName string, devices []pluginapi.Device) // ManagerImpl is the structure in charge of managing Device Plugins. type ManagerImpl struct { @@ -133,28 +133,17 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) { return manager, nil } -func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) { - kept := append(updated, added...) +func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) { m.mutex.Lock() - if _, ok := m.healthyDevices[resourceName]; !ok { - m.healthyDevices[resourceName] = sets.NewString() - } - if _, ok := m.unhealthyDevices[resourceName]; !ok { - m.unhealthyDevices[resourceName] = sets.NewString() - } - for _, dev := range kept { + m.healthyDevices[resourceName] = sets.NewString() + m.unhealthyDevices[resourceName] = sets.NewString() + for _, dev := range devices { if dev.Health == pluginapi.Healthy { m.healthyDevices[resourceName].Insert(dev.ID) - m.unhealthyDevices[resourceName].Delete(dev.ID) } else { m.unhealthyDevices[resourceName].Insert(dev.ID) - m.healthyDevices[resourceName].Delete(dev.ID) } } - for _, dev := range deleted { - m.healthyDevices[resourceName].Delete(dev.ID) - m.unhealthyDevices[resourceName].Delete(dev.ID) - } m.mutex.Unlock() m.writeCheckpoint() } @@ -277,21 +266,6 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool { return false } -// Devices is the map of devices that are known by the Device -// Plugin manager with the kind of the devices as key -func (m *ManagerImpl) Devices() map[string][]pluginapi.Device { - m.mutex.Lock() - defer m.mutex.Unlock() - - devs := make(map[string][]pluginapi.Device) - for k, e := range m.endpoints { - glog.V(3).Infof("Endpoint: %+v: %p", k, e) - devs[k] = e.getDevices() - } - - return devs -} - // Allocate is the call that you can use to allocate a set of devices // from the registered device plugins. func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { @@ -376,7 +350,7 @@ func (m *ManagerImpl) Stop() error { func (m *ManagerImpl) addEndpointProbeMode(resourceName string, socketPath string) (chan bool, error) { chanForAckOfNotification := make(chan bool) - new, err := newEndpointImpl(socketPath, resourceName, make(map[string]pluginapi.Device), m.callback) + new, err := newEndpointImpl(socketPath, resourceName, m.callback) if err != nil { glog.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err) return nil, fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err) @@ -407,26 +381,8 @@ func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.D if options != nil { m.pluginOpts[resourceName] = options } - old, ok := m.endpoints[resourceName] - if ok && old != nil { - // Pass devices of previous endpoint into re-registered one, - // to avoid potential orphaned devices upon re-registration - devices := make(map[string]pluginapi.Device) - for _, device := range old.getDevices() { - device.Health = pluginapi.Unhealthy - devices[device.ID] = device - } - e.devices = devices - } - // Associates the newly created endpoint with the corresponding resource name. - // Stops existing endpoint if there is any. m.endpoints[resourceName] = e glog.V(2).Infof("Registered endpoint %v", e) - - if old != nil { - old.stop() - } - return } func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) { @@ -441,13 +397,12 @@ func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) { } func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { - new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, make(map[string]pluginapi.Device), m.callback) + new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback) if err != nil { glog.Errorf("Failed to dial device plugin with request %v: %v", r, err) return } m.registerEndpoint(r.ResourceName, r.Options, new) - go func() { m.runEndpoint(r.ResourceName, new) }() @@ -567,7 +522,7 @@ func (m *ManagerImpl) readCheckpoint() error { // will stay zero till the corresponding device plugin re-registers. m.healthyDevices[resource] = sets.NewString() m.unhealthyDevices[resource] = sets.NewString() - m.endpoints[resource] = newStoppedEndpointImpl(resource, make(map[string]pluginapi.Device)) + m.endpoints[resource] = newStoppedEndpointImpl(resource) } return nil } diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go index ce852f697a4..66f8d1004cd 100644 --- a/pkg/kubelet/cm/devicemanager/manager_stub.go +++ b/pkg/kubelet/cm/devicemanager/manager_stub.go @@ -18,7 +18,6 @@ package devicemanager import ( "k8s.io/api/core/v1" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher" @@ -43,11 +42,6 @@ func (h *ManagerStub) Stop() error { return nil } -// Devices returns an empty map. -func (h *ManagerStub) Devices() map[string][]pluginapi.Device { - return make(map[string][]pluginapi.Device) -} - // Allocate simply returns nil. func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { return nil diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index da64d5ae637..e4b25844901 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -22,7 +22,6 @@ import ( "os" "path/filepath" "reflect" - "sync/atomic" "testing" "time" @@ -68,7 +67,9 @@ func TestNewManagerImplStart(t *testing.T) { socketDir, socketName, pluginSocketName, err := tmpSocketDir() require.NoError(t, err) defer os.RemoveAll(socketDir) - m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName, false) + m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName) + cleanup(t, m, p, nil) + // Stop should tolerate being called more than once. cleanup(t, m, p, nil) } @@ -76,7 +77,7 @@ func TestNewManagerImplStartProbeMode(t *testing.T) { socketDir, socketName, pluginSocketName, err := tmpSocketDir() require.NoError(t, err) defer os.RemoveAll(socketDir) - m, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName) + m, _, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName) cleanup(t, m, p, w) } @@ -95,68 +96,55 @@ func TestDevicePluginReRegistration(t *testing.T) { {ID: "Dev3", Health: pluginapi.Healthy}, } for _, preStartContainerFlag := range []bool{false, true} { - - expCallbackCount := int32(0) - callbackCount := int32(0) - callbackChan := make(chan int32) - callback := func(n string, a, u, r []pluginapi.Device) { - callbackCount++ - if callbackCount > atomic.LoadInt32(&expCallbackCount) { - t.FailNow() - } - callbackChan <- callbackCount - } - m, p1 := setup(t, devs, callback, socketName, pluginSocketName, preStartContainerFlag) - atomic.StoreInt32(&expCallbackCount, 1) + m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName) p1.Register(socketName, testResourceName, "") - // Wait for the first callback to be issued. select { - case <-callbackChan: - break - case <-time.After(time.Second): - t.FailNow() + case <-ch: + case <-time.After(5 * time.Second): + t.Fatalf("timeout while waiting for manager update") } - devices := m.Devices() - require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.") + capacity, allocatable, _ := m.GetCapacity() + resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag) err = p2.Start() require.NoError(t, err) - atomic.StoreInt32(&expCallbackCount, 2) p2.Register(socketName, testResourceName, "") - // Wait for the second callback to be issued. - select { - case <-callbackChan: - break - case <-time.After(time.Second): - t.FailNow() - } - devices2 := m.Devices() - require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Fatalf("timeout while waiting for manager update") + } + capacity, allocatable, _ = m.GetCapacity() + resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.") // Test the scenario that a plugin re-registers with different devices. p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag) err = p3.Start() require.NoError(t, err) - atomic.StoreInt32(&expCallbackCount, 3) p3.Register(socketName, testResourceName, "") - // Wait for the second callback to be issued. - select { - case <-callbackChan: - break - case <-time.After(time.Second): - t.FailNow() - } - devices3 := m.Devices() - require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.") + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Fatalf("timeout while waiting for manager update") + } + capacity, allocatable, _ = m.GetCapacity() + resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.") p2.Stop() p3.Stop() cleanup(t, m, p1, nil) - close(callbackChan) - } } @@ -177,105 +165,106 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) { {ID: "Dev3", Health: pluginapi.Healthy}, } - expCallbackCount := int32(0) - callbackCount := int32(0) - callbackChan := make(chan int32) - callback := func(n string, a, u, r []pluginapi.Device) { - callbackCount++ - if callbackCount > atomic.LoadInt32(&expCallbackCount) { - t.FailNow() - } - callbackChan <- callbackCount - } - m, p1, w := setupInProbeMode(t, devs, callback, socketName, pluginSocketName) - atomic.StoreInt32(&expCallbackCount, 1) + m, ch, p1, w := setupInProbeMode(t, devs, nil, socketName, pluginSocketName) + // Wait for the first callback to be issued. select { - case <-callbackChan: - break - case <-time.After(time.Second): + case <-ch: + case <-time.After(5 * time.Second): t.FailNow() } - devices := m.Devices() - require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.") + capacity, allocatable, _ := m.GetCapacity() + resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false) err = p2.Start() require.NoError(t, err) - atomic.StoreInt32(&expCallbackCount, 2) // Wait for the second callback to be issued. select { - case <-callbackChan: - break - case <-time.After(time.Second): + case <-ch: + case <-time.After(5 * time.Second): t.FailNow() } - devices2 := m.Devices() - require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") + capacity, allocatable, _ = m.GetCapacity() + resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") // Test the scenario that a plugin re-registers with different devices. p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false) err = p3.Start() require.NoError(t, err) - atomic.StoreInt32(&expCallbackCount, 3) - // Wait for the second callback to be issued. + // Wait for the third callback to be issued. select { - case <-callbackChan: - break - case <-time.After(time.Second): + case <-ch: + case <-time.After(5 * time.Second): t.FailNow() } - devices3 := m.Devices() - require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.") + capacity, allocatable, _ = m.GetCapacity() + resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)] + resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)] + require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") + require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of previous registered should be removed") p2.Stop() p3.Stop() cleanup(t, m, p1, w) - close(callbackChan) } -func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, preStartContainerFlag bool) (Manager, *Stub) { +func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) { m, err := newManagerImpl(socketName) require.NoError(t, err) + updateChan := make(chan interface{}) - m.callback = callback + if callback != nil { + m.callback = callback + } + originalCallback := m.callback + m.callback = func(resourceName string, devices []pluginapi.Device) { + originalCallback(resourceName, devices) + updateChan <- new(interface{}) + } activePods := func() []*v1.Pod { return []*v1.Pod{} } + err = m.Start(activePods, &sourcesReadyStub{}) require.NoError(t, err) - p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, preStartContainerFlag) - err = p.Start() - require.NoError(t, err) - - return m, p + return m, updateChan } -func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub, *pluginwatcher.Watcher) { - w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName)) - - m, err := newManagerImpl(socketName) +func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *Stub { + p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false) + err := p.Start() require.NoError(t, err) + return p +} +func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher { + w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName)) w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherCallback()) w.Start() + return &w +} - m.callback = callback +func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) { + m, updateChan := setupDeviceManager(t, devs, callback, socketName) + p := setupDevicePlugin(t, devs, pluginSocketName) + return m, updateChan, p +} - activePods := func() []*v1.Pod { - return []*v1.Pod{} - } - err = m.Start(activePods, &sourcesReadyStub{}) - require.NoError(t, err) - - p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false /*preStart*/) - err = p.Start() - require.NoError(t, err) - - return m, p, &w +func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, *pluginwatcher.Watcher) { + m, updateChan := setupDeviceManager(t, devs, callback, socketName) + w := setupPluginWatcher(pluginSocketName, m) + p := setupDevicePlugin(t, devs, pluginSocketName) + return m, updateChan, p, w } func cleanup(t *testing.T, m Manager, p *Stub, w *pluginwatcher.Watcher) { @@ -305,9 +294,9 @@ func TestUpdateCapacityAllocatable(t *testing.T) { // Adds three devices for resource1, two healthy and one unhealthy. // Expects capacity for resource1 to be 2. resourceName1 := "domain1.com/resource1" - e1 := &endpointImpl{devices: make(map[string]pluginapi.Device)} + e1 := &endpointImpl{} testManager.endpoints[resourceName1] = e1 - callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{}) + callback(resourceName1, devs) capacity, allocatable, removedResources := testManager.GetCapacity() resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)] as.True(ok) @@ -318,7 +307,8 @@ func TestUpdateCapacityAllocatable(t *testing.T) { as.Equal(0, len(removedResources)) // Deletes an unhealthy device should NOT change allocatable but change capacity. - callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]}) + devs1 := devs[:len(devs)-1] + callback(resourceName1, devs1) capacity, allocatable, removedResources = testManager.GetCapacity() resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] as.True(ok) @@ -329,34 +319,34 @@ func TestUpdateCapacityAllocatable(t *testing.T) { as.Equal(0, len(removedResources)) // Updates a healthy device to unhealthy should reduce allocatable by 1. - dev2 := devs[1] - dev2.Health = pluginapi.Unhealthy - callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{dev2}, []pluginapi.Device{}) + devs[1].Health = pluginapi.Unhealthy + callback(resourceName1, devs) capacity, allocatable, removedResources = testManager.GetCapacity() resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] as.True(ok) resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)] as.True(ok) - as.Equal(int64(2), resource1Capacity.Value()) + as.Equal(int64(3), resource1Capacity.Value()) as.Equal(int64(1), resource1Allocatable.Value()) as.Equal(0, len(removedResources)) // Deletes a healthy device should reduce capacity and allocatable by 1. - callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[0]}) + devs2 := devs[1:] + callback(resourceName1, devs2) capacity, allocatable, removedResources = testManager.GetCapacity() resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] as.True(ok) resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)] as.True(ok) as.Equal(int64(0), resource1Allocatable.Value()) - as.Equal(int64(1), resource1Capacity.Value()) + as.Equal(int64(2), resource1Capacity.Value()) as.Equal(0, len(removedResources)) // Tests adding another resource. resourceName2 := "resource2" - e2 := &endpointImpl{devices: make(map[string]pluginapi.Device)} + e2 := &endpointImpl{} testManager.endpoints[resourceName2] = e2 - callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{}) + callback(resourceName2, devs) capacity, allocatable, removedResources = testManager.GetCapacity() as.Equal(2, len(capacity)) resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)] @@ -364,7 +354,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) { resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)] as.True(ok) as.Equal(int64(3), resource2Capacity.Value()) - as.Equal(int64(2), resource2Allocatable.Value()) + as.Equal(int64(1), resource2Allocatable.Value()) as.Equal(0, len(removedResources)) // Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1 @@ -552,11 +542,7 @@ type MockEndpoint struct { func (m *MockEndpoint) stop() {} func (m *MockEndpoint) run() {} -func (m *MockEndpoint) getDevices() []pluginapi.Device { - return []pluginapi.Device{} -} - -func (m *MockEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {} +func (m *MockEndpoint) callback(resourceName string, devices []pluginapi.Device) {} func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) { m.initChan <- devs @@ -592,7 +578,7 @@ func makePod(limits v1.ResourceList) *v1.Pod { } func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) (*ManagerImpl, error) { - monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} + monitorCallback := func(resourceName string, devices []pluginapi.Device) {} ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) if err != nil { return nil, err @@ -858,7 +844,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) { devID2 := "dev2" as := assert.New(t) - monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} + monitorCallback := func(resourceName string, devices []pluginapi.Device) {} tmpDir, err := ioutil.TempDir("", "checkpoint") as.Nil(err) diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index c6ba5f6490f..f9c4a17a12b 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -20,7 +20,6 @@ import ( "time" "k8s.io/api/core/v1" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -33,11 +32,6 @@ type Manager interface { // Start starts device plugin registration service. Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error - // Devices is the map of devices that have registered themselves - // against the manager. - // The map key is the ResourceName of the device plugins. - Devices() map[string][]pluginapi.Device - // Allocate configures and assigns devices to pods. The pods are provided // through the pod admission attributes in the attrs argument. From the // requested device resources, Allocate will communicate with the owning