diff --git a/pkg/kubelet/cm/deviceplugin/endpoint.go b/pkg/kubelet/cm/deviceplugin/endpoint.go index 8e0b619d7cb..068446304bd 100644 --- a/pkg/kubelet/cm/deviceplugin/endpoint.go +++ b/pkg/kubelet/cm/deviceplugin/endpoint.go @@ -60,7 +60,7 @@ func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*en socketPath: socketPath, resourceName: resourceName, - devices: nil, + devices: make(map[string]pluginapi.Device), callback: callback, }, nil } @@ -77,45 +77,22 @@ func (e *endpoint) getDevices() []pluginapi.Device { return devs } -// list initializes ListAndWatch gRPC call for the device plugin and gets the -// initial list of the devices. Returns ListAndWatch gRPC stream on success. -func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) { - stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{}) - if err != nil { - glog.Errorf(errListAndWatch, e.resourceName, err) - return nil, err - } - - devs, err := stream.Recv() - if err != nil { - glog.Errorf(errListAndWatch, e.resourceName, err) - return nil, err - } - - devices := make(map[string]pluginapi.Device) - var added, updated, deleted []pluginapi.Device - for _, d := range devs.Devices { - devices[d.ID] = *d - added = append(added, *d) - } - - e.mutex.Lock() - e.devices = devices - e.mutex.Unlock() - - e.callback(e.resourceName, added, updated, deleted) - - return stream, nil -} - -// listAndWatch blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch +// run initializes ListAndWatch gRPC call for the device plugin and +// blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch // stream update contains a new list of device states. listAndWatch compares the new // device states with its cached states to get list of new, updated, and deleted devices. // It then issues a callback to pass this information to the device_plugin_handler which // will adjust the resource available information accordingly. -func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient) { +func (e *endpoint) run() { glog.V(3).Infof("Starting ListAndWatch") + stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{}) + if err != nil { + glog.Errorf(errListAndWatch, e.resourceName, err) + + return + } + devices := make(map[string]pluginapi.Device) e.mutex.Lock() diff --git a/pkg/kubelet/cm/deviceplugin/endpoint_test.go b/pkg/kubelet/cm/deviceplugin/endpoint_test.go index a7e458258e5..1c6c85a0166 100644 --- a/pkg/kubelet/cm/deviceplugin/endpoint_test.go +++ b/pkg/kubelet/cm/deviceplugin/endpoint_test.go @@ -41,32 +41,7 @@ func TestNewEndpoint(t *testing.T) { defer ecleanup(t, p, e) } -func TestList(t *testing.T) { - socket := path.Join("/tmp", esocketName) - - devs := []*pluginapi.Device{ - {ID: "ADeviceId", Health: pluginapi.Healthy}, - } - - p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {}) - defer ecleanup(t, p, e) - - _, err := e.list() - require.NoError(t, err) - - e.mutex.Lock() - defer e.mutex.Unlock() - - require.Len(t, e.devices, 1) - - d, ok := e.devices[devs[0].ID] - require.True(t, ok) - - require.Equal(t, d.ID, devs[0].ID) - require.Equal(t, d.Health, devs[0].Health) -} - -func TestListAndWatch(t *testing.T) { +func TestRun(t *testing.T) { socket := path.Join("/tmp", esocketName) devs := []*pluginapi.Device{ @@ -93,10 +68,7 @@ func TestListAndWatch(t *testing.T) { }) defer ecleanup(t, p, e) - s, err := e.list() - require.NoError(t, err) - - go e.listAndWatch(s) + go e.run() p.Update(updated) time.Sleep(time.Second) diff --git a/pkg/kubelet/cm/deviceplugin/manager.go b/pkg/kubelet/cm/deviceplugin/manager.go index 39a5c713dcf..f8a0475d5ac 100644 --- a/pkg/kubelet/cm/deviceplugin/manager.go +++ b/pkg/kubelet/cm/deviceplugin/manager.go @@ -97,13 +97,13 @@ func (m *ManagerImpl) removeContents(dir string) error { } const ( - // defaultCheckpoint is the file name of device plugin checkpoint - defaultCheckpoint = "kubelet_internal_checkpoint" + // kubeletDevicePluginCheckpoint is the file name of device plugin checkpoint + kubeletDevicePluginCheckpoint = "kubelet_internal_checkpoint" ) // CheckpointFile returns device plugin checkpoint file path. func (m *ManagerImpl) CheckpointFile() string { - return filepath.Join(m.socketdir, defaultCheckpoint) + return filepath.Join(m.socketdir, kubeletDevicePluginCheckpoint) } // Start starts the Device Plugin Manager @@ -209,13 +209,6 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { return } - stream, err := e.list() - if err != nil { - glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err) - e.stop() - return - } - // Associates the newly created endpoint with the corresponding resource name. // Stops existing endpoint if there is any. m.mutex.Lock() @@ -229,7 +222,7 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { } go func() { - e.listAndWatch(stream) + e.run() e.stop() m.mutex.Lock()