diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index f62ed55f445..d590e99fa2b 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/fsnotify/fsnotify" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -59,7 +60,7 @@ type Stub struct { registrationStatus chan watcherapi.RegistrationStatus // for testing endpoint string // for testing - stopWatcher chan bool + kubeletRestartWatcher *fsnotify.Watcher } // stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet @@ -89,6 +90,13 @@ func defaultRegisterControlFunc() bool { // NewDevicePluginStub returns an initialized DevicePlugin Stub. func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool, getPreferredAllocationFlag bool) *Stub { + + watcher, err := fsnotify.NewWatcher() + if err != nil { + klog.ErrorS(err, "Watcher creation failed") + panic(err) + } + return &Stub{ devs: devs, socket: socket, @@ -102,7 +110,7 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, p allocFunc: defaultAllocFunc, getPreferredAllocFunc: defaultGetPreferredAllocFunc, - stopWatcher: make(chan bool, 1), + kubeletRestartWatcher: watcher, } } @@ -140,6 +148,12 @@ func (m *Stub) Start() error { pluginapi.RegisterDevicePluginServer(m.server, m) watcherapi.RegisterRegistrationServer(m.server, m) + err = m.kubeletRestartWatcher.Add(filepath.Dir(m.socket)) + if err != nil { + klog.ErrorS(err, "Failed to add watch", "devicePluginPath", pluginapi.DevicePluginPath) + return err + } + go func() { defer m.wg.Done() m.server.Serve(sock) @@ -184,7 +198,7 @@ func (m *Stub) Stop() error { return nil } - m.stopWatcher <- true + m.kubeletRestartWatcher.Close() m.server.Stop() m.wg.Wait() @@ -197,31 +211,29 @@ func (m *Stub) Stop() error { func (m *Stub) Watch(kubeletEndpoint, resourceName, pluginSockDir string) { for { select { - case stop := <-m.stopWatcher: - if stop { - return - } - default: - _, err := os.Lstat(m.socket) - if err != nil { - // Socket file not found; restart server - klog.InfoS("Server endpoint not found", "socket", m.socket) - klog.InfoS("Most likely Kubelet restarted") + // Detect a kubelet restart by watching for a newly created + // 'pluginapi.KubeletSocket' file. When this occurs, restart + // the device plugin server + case event := <-m.kubeletRestartWatcher.Events: + if event.Name == kubeletEndpoint && event.Op&fsnotify.Create == fsnotify.Create { + klog.InfoS("inotify: file created, restarting", "kubeletEndpoint", kubeletEndpoint) if err := m.Restart(); err != nil { klog.ErrorS(err, "Unable to restart server") panic(err) } if ok := m.registerControlFunc(); ok { - if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + if err := m.Register(kubeletEndpoint, resourceName, pluginSockDir); err != nil { klog.ErrorS(err, "Unable to register to kubelet") panic(err) } } - } + + // Watch for any other fs errors and log them. + case err := <-m.kubeletRestartWatcher.Errors: + klog.ErrorS(err, "inotify error") } - time.Sleep(5 * time.Second) } }