avoid race condition in device manager and plugin startup/shutdown

A flaky test exposed a race condition where shutting down one server
instance broke the startup of the next instance when using the same
socket path. Commit 1325c2f8be removed the reuse of the same socket
path and thus avoided the issue.

But the real fix is to ensure that the listening socket is really
closed once Stop returns. Two solutions were proposed in
https://github.com/grpc/grpc-go/issues/1861:
- waiting for the goroutine to complete
- closing the socket

The former is done here because it's cleaner to not keep lingering
goroutines. While at it, the Stop methods are made idempotent (similar
to e.g. Close on a socket) and no longer crash when called without
prior Start.

Fixes https://github.com/kubernetes/kubernetes/issues/59488
This commit is contained in:
Patrick Ohly 2018-02-14 09:36:47 +01:00
parent a71393a382
commit fcbb64b93d
3 changed files with 54 additions and 5 deletions

View File

@ -22,6 +22,7 @@ import (
"net"
"os"
"path"
"sync"
"time"
"google.golang.org/grpc"
@ -35,6 +36,7 @@ type Stub struct {
socket string
stop chan interface{}
wg sync.WaitGroup
update chan []*pluginapi.Device
server *grpc.Server
@ -70,7 +72,8 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) {
m.allocFunc = f
}
// Start starts the gRPC server of the device plugin
// Start starts the gRPC server of the device plugin. Can only
// be called once.
func (m *Stub) Start() error {
err := m.cleanup()
if err != nil {
@ -82,10 +85,14 @@ func (m *Stub) Start() error {
return err
}
m.wg.Add(1)
m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterDevicePluginServer(m.server, m)
go m.server.Serve(sock)
go func() {
defer m.wg.Done()
m.server.Serve(sock)
}()
_, conn, err := dial(m.socket)
if err != nil {
return err
@ -96,10 +103,17 @@ func (m *Stub) Start() error {
return nil
}
// Stop stops the gRPC server
// Stop stops the gRPC server. Can be called without a prior Start
// and more than once. Not safe to be called concurrently by different
// goroutines!
func (m *Stub) Stop() error {
if m.server == nil {
return nil
}
m.server.Stop()
close(m.stop)
m.wg.Wait()
m.server = nil
close(m.stop) // This prevents re-starting the server.
return m.cleanup()
}

View File

@ -59,6 +59,7 @@ type ManagerImpl struct {
mutex sync.Mutex
server *grpc.Server
wg sync.WaitGroup
// activePods is a method for listing active pods on the node
// so the amount of pluginResources requested by existing pods
@ -224,10 +225,14 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
return err
}
m.wg.Add(1)
m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterRegistrationServer(m.server, m)
go m.server.Serve(s)
go func() {
defer m.wg.Done()
m.server.Serve(s)
}()
glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
@ -313,6 +318,8 @@ func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest
}
// Stop is the function that can stop the gRPC server.
// Can be called concurrently, more than once, and is safe to call
// without a prior Start.
func (m *ManagerImpl) Stop() error {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -320,7 +327,12 @@ func (m *ManagerImpl) Stop() error {
e.stop()
}
if m.server == nil {
return nil
}
m.server.Stop()
m.wg.Wait()
m.server = nil
return nil
}

View File

@ -69,6 +69,29 @@ func TestNewManagerImplStart(t *testing.T) {
defer os.RemoveAll(socketDir)
m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName)
cleanup(t, m, p)
// Stop should tolerate being called more than once.
cleanup(t, m, p)
}
func TestNewManagerImplStop(t *testing.T) {
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
require.NoError(t, err)
defer os.RemoveAll(socketDir)
m, err := newManagerImpl(socketName)
require.NoError(t, err)
// No prior Start, but that should be okay.
err = m.Stop()
require.NoError(t, err)
devs := []*pluginapi.Device{
{ID: "Dev1", Health: pluginapi.Healthy},
{ID: "Dev2", Health: pluginapi.Healthy},
}
p := NewDevicePluginStub(devs, pluginSocketName)
// Same here.
err = p.Stop()
require.NoError(t, err)
}
// Tests that the device plugin manager correctly handles registration and re-registration by