pass devices of previous endpoint into re-registered one to avoid potential orphaned devices upon re-registration

This commit is contained in:
lichuqiang 2017-11-07 18:28:29 +08:00
parent 6a39ac3874
commit 4fa0fa5ad1
4 changed files with 46 additions and 11 deletions

View File

@ -46,7 +46,7 @@ type endpoint struct {
}
// newEndpoint creates a new endpoint for the given resourceName.
func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) {
func newEndpoint(socketPath, resourceName string, devices map[string]pluginapi.Device, callback MonitorCallback) (*endpoint, error) {
client, c, err := dial(socketPath)
if err != nil {
glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
@ -60,7 +60,7 @@ func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*en
socketPath: socketPath,
resourceName: resourceName,
devices: make(map[string]pluginapi.Device),
devices: devices,
callback: callback,
}, nil
}

View File

@ -102,7 +102,7 @@ func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string,
err := p.Start()
require.NoError(t, err)
e, err := newEndpoint(socket, "mock", func(n string, a, u, r []pluginapi.Device) {})
e, err := newEndpoint(socket, "mock", make(map[string]pluginapi.Device), func(n string, a, u, r []pluginapi.Device) {})
require.NoError(t, err)
return p, e

View File

@ -202,22 +202,41 @@ func (m *ManagerImpl) Stop() error {
}
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
existingDevs := make(map[string]pluginapi.Device)
m.mutex.Lock()
old, ok := m.endpoints[r.ResourceName]
if ok && old != nil {
// Pass devices of previous endpoint into re-registered one,
// to avoid potential orphaned devices upon re-registration
existingDevs = old.devices
}
m.mutex.Unlock()
socketPath := filepath.Join(m.socketdir, r.Endpoint)
e, err := newEndpoint(socketPath, r.ResourceName, m.callback)
e, err := newEndpoint(socketPath, r.ResourceName, existingDevs, m.callback)
if err != nil {
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
return
}
m.mutex.Lock()
// Check for potential re-registration during the initialization of new endpoint,
// and skip updating if re-registration happens.
// TODO: simplify the part once we have a better way to handle registered devices
ext := m.endpoints[r.ResourceName]
if ext != old {
glog.Warningf("Some other endpoint %v is added while endpoint %v is initialized", ext, e)
m.mutex.Unlock()
e.stop()
return
}
// Associates the newly created endpoint with the corresponding resource name.
// Stops existing endpoint if there is any.
m.mutex.Lock()
old, ok := m.endpoints[r.ResourceName]
m.endpoints[r.ResourceName] = e
glog.V(2).Infof("Registered endpoint %v", e)
m.mutex.Unlock()
if ok && old != nil {
if old != nil {
old.stop()
}

View File

@ -47,20 +47,23 @@ func TestNewManagerImplStart(t *testing.T) {
// Tests that the device plugin manager correctly handles registration and re-registration by
// making sure that after registration, devices are correctly updated and if a re-registration
// happens, we will NOT delete devices.
// happens, we will NOT delete devices; and no orphaned devices left.
func TestDevicePluginReRegistration(t *testing.T) {
devs := []*pluginapi.Device{
{ID: "Dev1", Health: pluginapi.Healthy},
{ID: "Dev2", Health: pluginapi.Healthy},
}
devsForRegistration := []*pluginapi.Device{
{ID: "Dev3", Health: pluginapi.Healthy},
}
callbackCount := 0
callbackChan := make(chan int)
var stopping int32
stopping = 0
callback := func(n string, a, u, r []pluginapi.Device) {
// Should be called twice, one for each plugin registration, till we are stopping.
if callbackCount > 1 && atomic.LoadInt32(&stopping) <= 0 {
// Should be called three times, one for each plugin registration, till we are stopping.
if callbackCount > 2 && atomic.LoadInt32(&stopping) <= 0 {
t.FailNow()
}
callbackCount++
@ -89,12 +92,25 @@ func TestDevicePluginReRegistration(t *testing.T) {
devices2 := m.Devices()
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third")
err = p3.Start()
require.NoError(t, err)
p3.Register(socketName, testResourceName)
// Wait for the second callback to be issued.
<-callbackChan
devices3 := m.Devices()
require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
// Wait long enough to catch unexpected callbacks.
time.Sleep(5 * time.Second)
atomic.StoreInt32(&stopping, 1)
cleanup(t, m, p1)
p2.Stop()
p3.Stop()
cleanup(t, m, p1)
}
func setup(t *testing.T, devs []*pluginapi.Device, callback MonitorCallback) (Manager, *Stub) {