From dc5384a139cc05ac31f76758c2c0923ce5cb88ff Mon Sep 17 00:00:00 2001 From: Penghao Cen Date: Fri, 17 Nov 2017 13:10:25 +0800 Subject: [PATCH 1/3] Don't rewrite device health --- pkg/kubelet/cm/deviceplugin/device_plugin_stub.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go b/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go index 01f08c15987..9969e99989b 100644 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go +++ b/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go @@ -115,16 +115,8 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string) error { // ListAndWatch lists devices and update that list according to the Update call func (m *Stub) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { log.Println("ListAndWatch") - var devs []*pluginapi.Device - for _, d := range m.devs { - devs = append(devs, &pluginapi.Device{ - ID: d.ID, - Health: pluginapi.Healthy, - }) - } - - s.Send(&pluginapi.ListAndWatchResponse{Devices: devs}) + s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}) for { select { From 90bc1265cf9fd9a5c98f91274be2a0c3f4d3f528 Mon Sep 17 00:00:00 2001 From: Penghao Cen Date: Fri, 12 Jan 2018 20:09:07 +0800 Subject: [PATCH 2/3] Fix endpoint not work issue --- pkg/kubelet/cm/deviceplugin/endpoint_test.go | 55 ++++++++++++++++---- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/pkg/kubelet/cm/deviceplugin/endpoint_test.go b/pkg/kubelet/cm/deviceplugin/endpoint_test.go index 226148a6b06..f4634db85f4 100644 --- a/pkg/kubelet/cm/deviceplugin/endpoint_test.go +++ b/pkg/kubelet/cm/deviceplugin/endpoint_test.go @@ -19,7 +19,6 @@ package deviceplugin import ( "path" "testing" - "time" "github.com/stretchr/testify/require" @@ -54,22 +53,56 @@ func TestRun(t *testing.T) { {ID: "AThirdDeviceId", Health: pluginapi.Healthy}, } - p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) { - require.Len(t, a, 1) - require.Len(t, u, 1) - require.Len(t, r, 1) + callbackCount := 0 + callbackChan := make(chan int) + callback := func(n string, a, u, r []pluginapi.Device) { + // Should be called twice: + // one for plugin registration, one for plugin update. + if callbackCount > 2 { + t.FailNow() + } - require.Equal(t, a[0].ID, updated[1].ID) + // Check plugin registration + if callbackCount == 0 { + require.Len(t, a, 2) + require.Len(t, u, 0) + require.Len(t, r, 0) + } - require.Equal(t, u[0].ID, updated[0].ID) - require.Equal(t, u[0].Health, updated[0].Health) + // Check plugin update + if callbackCount == 1 { + require.Len(t, a, 1) + require.Len(t, u, 1) + require.Len(t, r, 1) - require.Equal(t, r[0].ID, devs[1].ID) - }) + require.Equal(t, a[0].ID, updated[1].ID) + require.Equal(t, u[0].ID, updated[0].ID) + require.Equal(t, u[0].Health, updated[0].Health) + require.Equal(t, r[0].ID, devs[1].ID) + } + + callbackCount++ + callbackChan <- callbackCount + } + + p, e := esetup(t, devs, socket, "mock", callback) defer ecleanup(t, p, e) go e.run() + // Wait for the first callback to be issued. + select { + case <-callbackChan: + break + } + p.Update(updated) + + // Wait for the second callback to be issued. + select { + case <-callbackChan: + break + } + time.Sleep(time.Second) e.mutex.Lock() @@ -102,7 +135,7 @@ func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, err := p.Start() require.NoError(t, err) - e, err := newEndpointImpl(socket, "mock", make(map[string]pluginapi.Device), func(n string, a, u, r []pluginapi.Device) {}) + e, err := newEndpointImpl(socket, resourceName, make(map[string]pluginapi.Device), callback) require.NoError(t, err) return p, e From b96c383ef7982f526db190ebba76730fd6c818df Mon Sep 17 00:00:00 2001 From: Penghao Cen Date: Sat, 13 Jan 2018 05:47:49 +0800 Subject: [PATCH 3/3] Check grpc server ready properly --- pkg/kubelet/cm/deviceplugin/device_plugin_stub.go | 2 +- pkg/kubelet/cm/deviceplugin/endpoint_test.go | 12 ++---------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go b/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go index 9969e99989b..5e39dd00853 100644 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go +++ b/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go @@ -70,7 +70,7 @@ func (m *Stub) Start() error { // Wait till grpc server is ready. for i := 0; i < 10; i++ { services := m.server.GetServiceInfo() - if len(services) > 1 { + if len(services) > 0 { break } time.Sleep(1 * time.Second) diff --git a/pkg/kubelet/cm/deviceplugin/endpoint_test.go b/pkg/kubelet/cm/deviceplugin/endpoint_test.go index f4634db85f4..6005310181a 100644 --- a/pkg/kubelet/cm/deviceplugin/endpoint_test.go +++ b/pkg/kubelet/cm/deviceplugin/endpoint_test.go @@ -90,20 +90,12 @@ func TestRun(t *testing.T) { go e.run() // Wait for the first callback to be issued. - select { - case <-callbackChan: - break - } + <-callbackChan p.Update(updated) // Wait for the second callback to be issued. - select { - case <-callbackChan: - break - } - - time.Sleep(time.Second) + <-callbackChan e.mutex.Lock() defer e.mutex.Unlock()