merge func list and listwatch into one

This commit is contained in:
lichuqiang 2017-10-26 16:36:16 +08:00
parent 2ea306a854
commit 6a39ac3874
3 changed files with 17 additions and 75 deletions

View File

@ -60,7 +60,7 @@ func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*en
socketPath: socketPath,
resourceName: resourceName,
devices: nil,
devices: make(map[string]pluginapi.Device),
callback: callback,
}, nil
}
@ -77,45 +77,22 @@ func (e *endpoint) getDevices() []pluginapi.Device {
return devs
}
// list initializes ListAndWatch gRPC call for the device plugin and gets the
// initial list of the devices. Returns ListAndWatch gRPC stream on success.
func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) {
stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
if err != nil {
glog.Errorf(errListAndWatch, e.resourceName, err)
return nil, err
}
devs, err := stream.Recv()
if err != nil {
glog.Errorf(errListAndWatch, e.resourceName, err)
return nil, err
}
devices := make(map[string]pluginapi.Device)
var added, updated, deleted []pluginapi.Device
for _, d := range devs.Devices {
devices[d.ID] = *d
added = append(added, *d)
}
e.mutex.Lock()
e.devices = devices
e.mutex.Unlock()
e.callback(e.resourceName, added, updated, deleted)
return stream, nil
}
// listAndWatch blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch
// run initializes ListAndWatch gRPC call for the device plugin and
// blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch
// stream update contains a new list of device states. listAndWatch compares the new
// device states with its cached states to get list of new, updated, and deleted devices.
// It then issues a callback to pass this information to the device_plugin_handler which
// will adjust the resource available information accordingly.
func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient) {
func (e *endpoint) run() {
glog.V(3).Infof("Starting ListAndWatch")
stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
if err != nil {
glog.Errorf(errListAndWatch, e.resourceName, err)
return
}
devices := make(map[string]pluginapi.Device)
e.mutex.Lock()

View File

@ -41,32 +41,7 @@ func TestNewEndpoint(t *testing.T) {
defer ecleanup(t, p, e)
}
func TestList(t *testing.T) {
socket := path.Join("/tmp", esocketName)
devs := []*pluginapi.Device{
{ID: "ADeviceId", Health: pluginapi.Healthy},
}
p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {})
defer ecleanup(t, p, e)
_, err := e.list()
require.NoError(t, err)
e.mutex.Lock()
defer e.mutex.Unlock()
require.Len(t, e.devices, 1)
d, ok := e.devices[devs[0].ID]
require.True(t, ok)
require.Equal(t, d.ID, devs[0].ID)
require.Equal(t, d.Health, devs[0].Health)
}
func TestListAndWatch(t *testing.T) {
func TestRun(t *testing.T) {
socket := path.Join("/tmp", esocketName)
devs := []*pluginapi.Device{
@ -93,10 +68,7 @@ func TestListAndWatch(t *testing.T) {
})
defer ecleanup(t, p, e)
s, err := e.list()
require.NoError(t, err)
go e.listAndWatch(s)
go e.run()
p.Update(updated)
time.Sleep(time.Second)

View File

@ -97,13 +97,13 @@ func (m *ManagerImpl) removeContents(dir string) error {
}
const (
// defaultCheckpoint is the file name of device plugin checkpoint
defaultCheckpoint = "kubelet_internal_checkpoint"
// kubeletDevicePluginCheckpoint is the file name of device plugin checkpoint
kubeletDevicePluginCheckpoint = "kubelet_internal_checkpoint"
)
// CheckpointFile returns device plugin checkpoint file path.
func (m *ManagerImpl) CheckpointFile() string {
return filepath.Join(m.socketdir, defaultCheckpoint)
return filepath.Join(m.socketdir, kubeletDevicePluginCheckpoint)
}
// Start starts the Device Plugin Manager
@ -209,13 +209,6 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
return
}
stream, err := e.list()
if err != nil {
glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err)
e.stop()
return
}
// Associates the newly created endpoint with the corresponding resource name.
// Stops existing endpoint if there is any.
m.mutex.Lock()
@ -229,7 +222,7 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
}
go func() {
e.listAndWatch(stream)
e.run()
e.stop()
m.mutex.Lock()