From 34dccc5d2a7c10ba8a3cf43c3273d66ec2491f35 Mon Sep 17 00:00:00 2001 From: Jiaying Zhang Date: Fri, 15 Sep 2017 11:19:46 -0700 Subject: [PATCH] Fixes some races in deviceplugin manager_test.go and manager.go. --- pkg/kubelet/deviceplugin/device_plugin_stub.go | 3 ++- pkg/kubelet/deviceplugin/manager.go | 3 +++ pkg/kubelet/deviceplugin/manager_test.go | 14 +++++++++++--- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/kubelet/deviceplugin/device_plugin_stub.go b/pkg/kubelet/deviceplugin/device_plugin_stub.go index 5e34669ccd7..a0f103d03a9 100644 --- a/pkg/kubelet/deviceplugin/device_plugin_stub.go +++ b/pkg/kubelet/deviceplugin/device_plugin_stub.go @@ -70,7 +70,7 @@ func (m *Stub) Start() error { // Wait till grpc server is ready. for i := 0; i < 10; i++ { services := m.server.GetServiceInfo() - if len(services) > 0 { + if len(services) > 1 { break } time.Sleep(1 * time.Second) @@ -83,6 +83,7 @@ func (m *Stub) Start() error { // Stop stops the gRPC server func (m *Stub) Stop() error { m.server.Stop() + close(m.stop) return m.cleanup() } diff --git a/pkg/kubelet/deviceplugin/manager.go b/pkg/kubelet/deviceplugin/manager.go index 001582f3ff0..5c05a972894 100644 --- a/pkg/kubelet/deviceplugin/manager.go +++ b/pkg/kubelet/deviceplugin/manager.go @@ -189,9 +189,12 @@ func (m *ManagerImpl) Register(ctx context.Context, // Stop is the function that can stop the gRPC server. func (m *ManagerImpl) Stop() error { + m.mutex.Lock() + defer m.mutex.Unlock() for _, e := range m.endpoints { e.stop() } + m.server.Stop() return nil } diff --git a/pkg/kubelet/deviceplugin/manager_test.go b/pkg/kubelet/deviceplugin/manager_test.go index c6574aeea28..354d33b0011 100644 --- a/pkg/kubelet/deviceplugin/manager_test.go +++ b/pkg/kubelet/deviceplugin/manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package deviceplugin import ( + "sync/atomic" "testing" "time" @@ -40,7 +41,8 @@ func TestNewManagerImpl(t *testing.T) { } func TestNewManagerImplStart(t *testing.T) { - setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {}) + m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {}) + cleanup(t, m, p) } // Tests that the device plugin manager correctly handles registration and re-registration by @@ -54,9 +56,11 @@ func TestDevicePluginReRegistration(t *testing.T) { callbackCount := 0 callbackChan := make(chan int) + var stopping int32 + stopping = 0 callback := func(n string, a, u, r []*pluginapi.Device) { - // Should be called twice, one for each plugin. - if callbackCount > 1 { + // Should be called twice, one for each plugin registration, till we are stopping. + if callbackCount > 1 && atomic.LoadInt32(&stopping) <= 0 { t.FailNow() } callbackCount++ @@ -80,6 +84,10 @@ func TestDevicePluginReRegistration(t *testing.T) { require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") // Wait long enough to catch unexpected callbacks. time.Sleep(5 * time.Second) + + atomic.StoreInt32(&stopping, 1) + cleanup(t, m, p1) + p2.Stop() } func setup(t *testing.T, devs []*pluginapi.Device, callback MonitorCallback) (Manager, *Stub) {