diff --git a/pkg/kubelet/cm/devicemanager/device_plugin_stub.go b/pkg/kubelet/cm/devicemanager/device_plugin_stub.go index f858b4fe149..776d456d042 100644 --- a/pkg/kubelet/cm/devicemanager/device_plugin_stub.go +++ b/pkg/kubelet/cm/devicemanager/device_plugin_stub.go @@ -22,6 +22,7 @@ import ( "net" "os" "path" + "sync" "time" "google.golang.org/grpc" @@ -35,6 +36,7 @@ type Stub struct { socket string stop chan interface{} + wg sync.WaitGroup update chan []*pluginapi.Device server *grpc.Server @@ -70,7 +72,8 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) { m.allocFunc = f } -// Start starts the gRPC server of the device plugin +// Start starts the gRPC server of the device plugin. Can only +// be called once. func (m *Stub) Start() error { err := m.cleanup() if err != nil { @@ -82,10 +85,14 @@ func (m *Stub) Start() error { return err } + m.wg.Add(1) m.server = grpc.NewServer([]grpc.ServerOption{}...) pluginapi.RegisterDevicePluginServer(m.server, m) - go m.server.Serve(sock) + go func() { + defer m.wg.Done() + m.server.Serve(sock) + }() _, conn, err := dial(m.socket) if err != nil { return err @@ -96,10 +103,17 @@ func (m *Stub) Start() error { return nil } -// Stop stops the gRPC server +// Stop stops the gRPC server. Can be called without a prior Start +// and more than once. Not safe to be called concurrently by different +// goroutines! func (m *Stub) Stop() error { + if m.server == nil { + return nil + } m.server.Stop() - close(m.stop) + m.wg.Wait() + m.server = nil + close(m.stop) // This prevents re-starting the server. return m.cleanup() } diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 6a1cc92a2c2..576164fc8a5 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -59,6 +59,7 @@ type ManagerImpl struct { mutex sync.Mutex server *grpc.Server + wg sync.WaitGroup // activePods is a method for listing active pods on the node // so the amount of pluginResources requested by existing pods @@ -224,10 +225,14 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc return err } + m.wg.Add(1) m.server = grpc.NewServer([]grpc.ServerOption{}...) pluginapi.RegisterRegistrationServer(m.server, m) - go m.server.Serve(s) + go func() { + defer m.wg.Done() + m.server.Serve(s) + }() glog.V(2).Infof("Serving device plugin registration server on %q", socketPath) @@ -313,6 +318,8 @@ func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest } // Stop is the function that can stop the gRPC server. +// Can be called concurrently, more than once, and is safe to call +// without a prior Start. func (m *ManagerImpl) Stop() error { m.mutex.Lock() defer m.mutex.Unlock() @@ -320,7 +327,12 @@ func (m *ManagerImpl) Stop() error { e.stop() } + if m.server == nil { + return nil + } m.server.Stop() + m.wg.Wait() + m.server = nil return nil } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 6c60e2c16b1..4f37cb191c2 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -67,6 +67,29 @@ func TestNewManagerImplStart(t *testing.T) { defer os.RemoveAll(socketDir) m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName) cleanup(t, m, p) + // Stop should tolerate being called more than once. + cleanup(t, m, p) +} + +func TestNewManagerImplStop(t *testing.T) { + socketDir, socketName, pluginSocketName, err := tmpSocketDir() + require.NoError(t, err) + defer os.RemoveAll(socketDir) + + m, err := newManagerImpl(socketName) + require.NoError(t, err) + // No prior Start, but that should be okay. + err = m.Stop() + require.NoError(t, err) + + devs := []*pluginapi.Device{ + {ID: "Dev1", Health: pluginapi.Healthy}, + {ID: "Dev2", Health: pluginapi.Healthy}, + } + p := NewDevicePluginStub(devs, pluginSocketName) + // Same here. + err = p.Stop() + require.NoError(t, err) } // Tests that the device plugin manager correctly handles registration and re-registration by