Merge pull request #52561 from jiayingz/deviceplugin-failure

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>..

Fixes a race in deviceplugin/manager_test.go and a race in deviceplug…

…in/manager.go.



**What this PR does / why we need it**:

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #
https://github.com/kubernetes/kubernetes/issues/52560

**Special notes for your reviewer**:
Tested with  go test -count 50 -race k8s.io/kubernetes/pkg/kubelet/deviceplugin and all runs passed.

**Release note**:

```release-note
```
This commit is contained in:
Kubernetes Submit Queue 2017-09-19 13:35:44 -07:00 committed by GitHub
commit 08486ab4aa
3 changed files with 16 additions and 4 deletions

View File

@ -70,7 +70,7 @@ func (m *Stub) Start() error {
// Wait till grpc server is ready.
for i := 0; i < 10; i++ {
services := m.server.GetServiceInfo()
if len(services) > 0 {
if len(services) > 1 {
break
}
time.Sleep(1 * time.Second)
@ -83,6 +83,7 @@ func (m *Stub) Start() error {
// Stop stops the gRPC server
func (m *Stub) Stop() error {
m.server.Stop()
close(m.stop)
return m.cleanup()
}

View File

@ -189,9 +189,12 @@ func (m *ManagerImpl) Register(ctx context.Context,
// Stop is the function that can stop the gRPC server.
func (m *ManagerImpl) Stop() error {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, e := range m.endpoints {
e.stop()
}
m.server.Stop()
return nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package deviceplugin
import (
"sync/atomic"
"testing"
"time"
@ -40,7 +41,8 @@ func TestNewManagerImpl(t *testing.T) {
}
func TestNewManagerImplStart(t *testing.T) {
setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {})
m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {})
cleanup(t, m, p)
}
// Tests that the device plugin manager correctly handles registration and re-registration by
@ -54,9 +56,11 @@ func TestDevicePluginReRegistration(t *testing.T) {
callbackCount := 0
callbackChan := make(chan int)
var stopping int32
stopping = 0
callback := func(n string, a, u, r []*pluginapi.Device) {
// Should be called twice, one for each plugin.
if callbackCount > 1 {
// Should be called twice, one for each plugin registration, till we are stopping.
if callbackCount > 1 && atomic.LoadInt32(&stopping) <= 0 {
t.FailNow()
}
callbackCount++
@ -80,6 +84,10 @@ func TestDevicePluginReRegistration(t *testing.T) {
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
// Wait long enough to catch unexpected callbacks.
time.Sleep(5 * time.Second)
atomic.StoreInt32(&stopping, 1)
cleanup(t, m, p1)
p2.Stop()
}
func setup(t *testing.T, devs []*pluginapi.Device, callback MonitorCallback) (Manager, *Stub) {