From fcbb64b93dfb633f880bad9ed1771201843b8e7e Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 14 Feb 2018 09:36:47 +0100 Subject: [PATCH] avoid race condition in device manager and plugin startup/shutdown A flaky test exposed a race condition where shutting down one server instance broke the startup of the next instance when using the same socket path. Commit 1325c2f8bec removed the reuse of the same socket path and thus avoided the issue. But the real fix is to ensure that the listening socket is really closed once Stop returns. Two solutions were proposed in https://github.com/grpc/grpc-go/issues/1861: - waiting for the goroutine to complete - closing the socket The former is done here because it's cleaner to not keep lingering goroutines. While at it, the Stop methods are made idempotent (similar to e.g. Close on a socket) and no longer crash when called without prior Start. Fixes https://github.com/kubernetes/kubernetes/issues/59488 --- .../cm/devicemanager/device_plugin_stub.go | 22 ++++++++++++++---- pkg/kubelet/cm/devicemanager/manager.go | 14 ++++++++++- pkg/kubelet/cm/devicemanager/manager_test.go | 23 +++++++++++++++++++ 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/device_plugin_stub.go b/pkg/kubelet/cm/devicemanager/device_plugin_stub.go index f858b4fe149..776d456d042 100644 --- a/pkg/kubelet/cm/devicemanager/device_plugin_stub.go +++ b/pkg/kubelet/cm/devicemanager/device_plugin_stub.go @@ -22,6 +22,7 @@ import ( "net" "os" "path" + "sync" "time" "google.golang.org/grpc" @@ -35,6 +36,7 @@ type Stub struct { socket string stop chan interface{} + wg sync.WaitGroup update chan []*pluginapi.Device server *grpc.Server @@ -70,7 +72,8 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) { m.allocFunc = f } -// Start starts the gRPC server of the device plugin +// Start starts the gRPC server of the device plugin. Can only +// be called once. func (m *Stub) Start() error { err := m.cleanup() if err != nil { @@ -82,10 +85,14 @@ func (m *Stub) Start() error { return err } + m.wg.Add(1) m.server = grpc.NewServer([]grpc.ServerOption{}...) pluginapi.RegisterDevicePluginServer(m.server, m) - go m.server.Serve(sock) + go func() { + defer m.wg.Done() + m.server.Serve(sock) + }() _, conn, err := dial(m.socket) if err != nil { return err @@ -96,10 +103,17 @@ func (m *Stub) Start() error { return nil } -// Stop stops the gRPC server +// Stop stops the gRPC server. Can be called without a prior Start +// and more than once. Not safe to be called concurrently by different +// goroutines! func (m *Stub) Stop() error { + if m.server == nil { + return nil + } m.server.Stop() - close(m.stop) + m.wg.Wait() + m.server = nil + close(m.stop) // This prevents re-starting the server. return m.cleanup() } diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 6c82fbe53d9..fa63efc611f 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -59,6 +59,7 @@ type ManagerImpl struct { mutex sync.Mutex server *grpc.Server + wg sync.WaitGroup // activePods is a method for listing active pods on the node // so the amount of pluginResources requested by existing pods @@ -224,10 +225,14 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc return err } + m.wg.Add(1) m.server = grpc.NewServer([]grpc.ServerOption{}...) 
pluginapi.RegisterRegistrationServer(m.server, m) - go m.server.Serve(s) + go func() { + defer m.wg.Done() + m.server.Serve(s) + }() glog.V(2).Infof("Serving device plugin registration server on %q", socketPath) @@ -313,6 +318,8 @@ func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest } // Stop is the function that can stop the gRPC server. +// Can be called concurrently, more than once, and is safe to call +// without a prior Start. func (m *ManagerImpl) Stop() error { m.mutex.Lock() defer m.mutex.Unlock() @@ -320,7 +327,12 @@ func (m *ManagerImpl) Stop() error { e.stop() } + if m.server == nil { + return nil + } m.server.Stop() + m.wg.Wait() + m.server = nil return nil } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index b0ef2857fd1..fde0ed2495e 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -69,6 +69,29 @@ func TestNewManagerImplStart(t *testing.T) { defer os.RemoveAll(socketDir) m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName) cleanup(t, m, p) + // Stop should tolerate being called more than once. + cleanup(t, m, p) +} + +func TestNewManagerImplStop(t *testing.T) { + socketDir, socketName, pluginSocketName, err := tmpSocketDir() + require.NoError(t, err) + defer os.RemoveAll(socketDir) + + m, err := newManagerImpl(socketName) + require.NoError(t, err) + // No prior Start, but that should be okay. + err = m.Stop() + require.NoError(t, err) + + devs := []*pluginapi.Device{ + {ID: "Dev1", Health: pluginapi.Healthy}, + {ID: "Dev2", Health: pluginapi.Healthy}, + } + p := NewDevicePluginStub(devs, pluginSocketName) + // Same here. + err = p.Stop() + require.NoError(t, err) } // Tests that the device plugin manager correctly handles registration and re-registration by
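
For reference, the pattern in isolation, outside the kubelet sources: a minimal sketch, not part of the patch above; the pluginServer type, its methods, and the socket path are invented for illustration. It shows the approach the commit message describes: the gRPC Serve call runs in a goroutine tracked by a sync.WaitGroup, Stop waits for that goroutine so the listening socket is really closed by the time Stop returns, and Stop is idempotent and safe without a prior Start.

// Minimal, self-contained sketch (not kubelet code; names are made up).
package main

import (
	"log"
	"net"
	"os"
	"path/filepath"
	"sync"

	"google.golang.org/grpc"
)

type pluginServer struct {
	socket string
	server *grpc.Server
	wg     sync.WaitGroup
}

// Start removes any stale socket file, listens on the Unix socket and
// serves gRPC in a background goroutine tracked by the WaitGroup.
func (p *pluginServer) Start() error {
	if err := os.Remove(p.socket); err != nil && !os.IsNotExist(err) {
		return err
	}
	lis, err := net.Listen("unix", p.socket)
	if err != nil {
		return err
	}
	p.server = grpc.NewServer()
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.server.Serve(lis) // returns once Stop has closed the listener
	}()
	return nil
}

// Stop does nothing when Start was never called, tolerates repeated calls,
// and only returns after the Serve goroutine has exited, i.e. after the
// listening socket has been closed.
func (p *pluginServer) Stop() error {
	if p.server == nil {
		return nil
	}
	p.server.Stop()
	p.wg.Wait()
	p.server = nil
	return os.Remove(p.socket)
}

func main() {
	p := &pluginServer{socket: filepath.Join(os.TempDir(), "example-plugin.sock")}
	// Starting twice on the same path works because Stop really releases it.
	for i := 0; i < 2; i++ {
		if err := p.Start(); err != nil {
			log.Fatal(err)
		}
		if err := p.Stop(); err != nil {
			log.Fatal(err)
		}
	}
	if err := p.Stop(); err != nil { // an extra Stop is a harmless no-op
		log.Fatal(err)
	}
}

The wg.Wait() after server.Stop() is what makes it safe to start a new instance on the same socket path immediately afterwards, which is exactly the race the flaky test was hitting.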