From 5cac9fc984c2c5b22ce2956ba349f68a38cb2a62 Mon Sep 17 00:00:00 2001 From: Jiaying Zhang Date: Fri, 8 Sep 2017 17:05:07 -0700 Subject: [PATCH] Fixes device plugin re-registration handling logic to make sure: - If a device plugin exits, its exported resource will be removed. - No capacity change if a new device plugin instance comes up to replace the old instance. --- .../deviceplugin/device_plugin_stub.go | 25 +++++++ pkg/kubelet/deviceplugin/manager.go | 42 ++++++------ pkg/kubelet/deviceplugin/manager_test.go | 65 ++++++++++++++----- pkg/kubelet/kubelet_node_status.go | 16 ++++- 4 files changed, 109 insertions(+), 39 deletions(-) diff --git a/pkg/kubelet/deviceplugin/device_plugin_stub.go b/pkg/kubelet/deviceplugin/device_plugin_stub.go index 2bd3ff4c200..5e34669ccd7 100644 --- a/pkg/kubelet/deviceplugin/device_plugin_stub.go +++ b/pkg/kubelet/deviceplugin/device_plugin_stub.go @@ -20,6 +20,7 @@ import ( "log" "net" "os" + "path" "time" "golang.org/x/net/context" @@ -86,6 +87,30 @@ func (m *Stub) Stop() error { return m.cleanup() } +// Register registers the device plugin for the given resourceName with Kubelet. +func (m *Stub) Register(kubeletEndpoint, resourceName string) error { + conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(), + grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + })) + defer conn.Close() + if err != nil { + return err + } + client := pluginapi.NewRegistrationClient(conn) + reqt := &pluginapi.RegisterRequest{ + Version: pluginapi.Version, + Endpoint: path.Base(m.socket), + ResourceName: resourceName, + } + + _, err = client.Register(context.Background(), reqt) + if err != nil { + return err + } + return nil +} + // ListAndWatch lists devices and update that list according to the Update call func (m *Stub) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { log.Println("ListAndWatch") diff --git a/pkg/kubelet/deviceplugin/manager.go b/pkg/kubelet/deviceplugin/manager.go index 4aed8708c1a..001582f3ff0 100644 --- a/pkg/kubelet/deviceplugin/manager.go +++ b/pkg/kubelet/deviceplugin/manager.go @@ -35,7 +35,7 @@ type ManagerImpl struct { socketname string socketdir string - Endpoints map[string]*endpoint // Key is ResourceName + endpoints map[string]*endpoint // Key is ResourceName mutex sync.Mutex callback MonitorCallback @@ -55,7 +55,7 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) dir, file := filepath.Split(socketPath) return &ManagerImpl{ - Endpoints: make(map[string]*endpoint), + endpoints: make(map[string]*endpoint), socketname: file, socketdir: dir, @@ -138,7 +138,7 @@ func (m *ManagerImpl) Devices() map[string][]*pluginapi.Device { defer m.mutex.Unlock() devs := make(map[string][]*pluginapi.Device) - for k, e := range m.Endpoints { + for k, e := range m.endpoints { glog.V(3).Infof("Endpoint: %+v: %+v", k, e) devs[k] = e.getDevices() } @@ -157,7 +157,7 @@ func (m *ManagerImpl) Allocate(resourceName string, devs []string) (*pluginapi.A glog.V(3).Infof("Recieved allocation request for devices %v for device plugin %s", devs, resourceName) m.mutex.Lock() - e, ok := m.Endpoints[resourceName] + e, ok := m.endpoints[resourceName] m.mutex.Unlock() if !ok { return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName) @@ -189,7 +189,7 @@ func (m *ManagerImpl) Register(ctx context.Context, // Stop is the function that can stop the gRPC server. func (m *ManagerImpl) Stop() error { - for _, e := range m.Endpoints { + for _, e := range m.endpoints { e.stop() } m.server.Stop() @@ -197,40 +197,40 @@ func (m *ManagerImpl) Stop() error { } func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { - // Stops existing endpoint if there is any. - m.mutex.Lock() - old, ok := m.Endpoints[r.ResourceName] - m.mutex.Unlock() - if ok && old != nil { - old.stop() - } - socketPath := filepath.Join(m.socketdir, r.Endpoint) e, err := newEndpoint(socketPath, r.ResourceName, m.callback) if err != nil { glog.Errorf("Failed to dial device plugin with request %v: %v", r, err) return } - stream, err := e.list() if err != nil { glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err) return } + // Associates the newly created endpoint with the corresponding resource name. + // Stops existing endpoint if there is any. + m.mutex.Lock() + old, ok := m.endpoints[r.ResourceName] + m.endpoints[r.ResourceName] = e + m.mutex.Unlock() + glog.V(2).Infof("Registered endpoint %v", e) + if ok && old != nil { + old.stop() + } + go func() { e.listAndWatch(stream) m.mutex.Lock() - if old, ok := m.Endpoints[r.ResourceName]; ok && old == e { - delete(m.Endpoints, r.ResourceName) + if old, ok := m.endpoints[r.ResourceName]; ok && old == e { + glog.V(2).Infof("Delete resource for endpoint %v", e) + delete(m.endpoints, r.ResourceName) + // Issues callback to delete all of devices. + e.callback(e.resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, e.getDevices()) } glog.V(2).Infof("Unregistered endpoint %v", e) m.mutex.Unlock() }() - - m.mutex.Lock() - m.Endpoints[r.ResourceName] = e - glog.V(2).Infof("Registered endpoint %v", e) - m.mutex.Unlock() } diff --git a/pkg/kubelet/deviceplugin/manager_test.go b/pkg/kubelet/deviceplugin/manager_test.go index e524a581fc1..c6574aeea28 100644 --- a/pkg/kubelet/deviceplugin/manager_test.go +++ b/pkg/kubelet/deviceplugin/manager_test.go @@ -17,9 +17,8 @@ limitations under the License. package deviceplugin import ( - "os" - "path" "testing" + "time" "github.com/stretchr/testify/require" @@ -27,33 +26,69 @@ import ( ) const ( - msocketName = "/tmp/server.sock" + socketName = "/tmp/device_plugin/server.sock" + pluginSocketName = "/tmp/device_plugin/device-plugin.sock" + testResourceName = "fake-domain/resource" ) func TestNewManagerImpl(t *testing.T) { - wd, _ := os.Getwd() - socket := path.Join(wd, msocketName) - _, err := NewManagerImpl("", func(n string, a, u, r []*pluginapi.Device) {}) require.Error(t, err) - _, err = NewManagerImpl(socket, func(n string, a, u, r []*pluginapi.Device) {}) + _, err = NewManagerImpl(socketName, func(n string, a, u, r []*pluginapi.Device) {}) require.NoError(t, err) } func TestNewManagerImplStart(t *testing.T) { - wd, _ := os.Getwd() - socket := path.Join(wd, msocketName) - - _, err := NewManagerImpl(socket, func(n string, a, u, r []*pluginapi.Device) {}) - require.NoError(t, err) + setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {}) } -func setup(t *testing.T, devs []*pluginapi.Device, pluginSocket, serverSocket string, callback MonitorCallback) (Manager, *Stub) { - m, err := NewManagerImpl(serverSocket, callback) +// Tests that the device plugin manager correctly handles registration and re-registration by +// making sure that after registration, devices are correctly updated and if a re-registration +// happens, we will NOT delete devices. +func TestDevicePluginReRegistration(t *testing.T) { + devs := []*pluginapi.Device{ + {ID: "Dev1", Health: pluginapi.Healthy}, + {ID: "Dev2", Health: pluginapi.Healthy}, + } + + callbackCount := 0 + callbackChan := make(chan int) + callback := func(n string, a, u, r []*pluginapi.Device) { + // Should be called twice, one for each plugin. + if callbackCount > 1 { + t.FailNow() + } + callbackCount++ + callbackChan <- callbackCount + } + m, p1 := setup(t, devs, callback) + p1.Register(socketName, testResourceName) + // Wait for the first callback to be issued. + <-callbackChan + devices := m.Devices() + require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.") + + p2 := NewDevicePluginStub(devs, pluginSocketName+".new") + err := p2.Start() + require.NoError(t, err) + p2.Register(socketName, testResourceName) + // Wait for the second callback to be issued. + <-callbackChan + + devices2 := m.Devices() + require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") + // Wait long enough to catch unexpected callbacks. + time.Sleep(5 * time.Second) +} + +func setup(t *testing.T, devs []*pluginapi.Device, callback MonitorCallback) (Manager, *Stub) { + m, err := NewManagerImpl(socketName, callback) + require.NoError(t, err) + err = m.Start() require.NoError(t, err) - p := NewDevicePluginStub(devs, pluginSocket) + p := NewDevicePluginStub(devs, pluginSocketName) err = p.Start() require.NoError(t, err) diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 59b77a48934..0c1b4d5efdb 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -598,13 +598,23 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) { } } - initialCapacity := kl.containerManager.GetCapacity() - if initialCapacity != nil { - for k, v := range initialCapacity { + currentCapacity := kl.containerManager.GetCapacity() + if currentCapacity != nil { + for k, v := range currentCapacity { if v1helper.IsExtendedResourceName(k) { + glog.V(2).Infof("Update capacity for %s to %d", k, v.Value()) node.Status.Capacity[k] = v } } + // Remove stale extended resources. + for k := range node.Status.Capacity { + if v1helper.IsExtendedResourceName(k) { + if _, ok := currentCapacity[k]; !ok { + glog.V(2).Infof("delete capacity for %s", k) + delete(node.Status.Capacity, k) + } + } + } } }