node: sample-dp: Use fsnotify for kubelet restart detection

Add kubeletSocket file to fsnotify instead of polling and waiting for deletion
of device plugin unix socket as a way of detecting kubelet restart. We need to
ensure that the device plugin re-registers itself after kubelet restart depending
on the configured registration mode (auto-registration or controller registration).

Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
This commit is contained in:
Swati Sehgal 2023-07-13 09:17:13 +01:00
parent f46cdf1428
commit d0d133298d

View File

@ -24,6 +24,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/fsnotify/fsnotify"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
@ -59,7 +60,7 @@ type Stub struct {
registrationStatus chan watcherapi.RegistrationStatus // for testing registrationStatus chan watcherapi.RegistrationStatus // for testing
endpoint string // for testing endpoint string // for testing
stopWatcher chan bool kubeletRestartWatcher *fsnotify.Watcher
} }
// stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet // stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet
@ -89,6 +90,13 @@ func defaultRegisterControlFunc() bool {
// NewDevicePluginStub returns an initialized DevicePlugin Stub. // NewDevicePluginStub returns an initialized DevicePlugin Stub.
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool, getPreferredAllocationFlag bool) *Stub { func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool, getPreferredAllocationFlag bool) *Stub {
watcher, err := fsnotify.NewWatcher()
if err != nil {
klog.ErrorS(err, "Watcher creation failed")
panic(err)
}
return &Stub{ return &Stub{
devs: devs, devs: devs,
socket: socket, socket: socket,
@ -102,7 +110,7 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, p
allocFunc: defaultAllocFunc, allocFunc: defaultAllocFunc,
getPreferredAllocFunc: defaultGetPreferredAllocFunc, getPreferredAllocFunc: defaultGetPreferredAllocFunc,
stopWatcher: make(chan bool, 1), kubeletRestartWatcher: watcher,
} }
} }
@ -140,6 +148,12 @@ func (m *Stub) Start() error {
pluginapi.RegisterDevicePluginServer(m.server, m) pluginapi.RegisterDevicePluginServer(m.server, m)
watcherapi.RegisterRegistrationServer(m.server, m) watcherapi.RegisterRegistrationServer(m.server, m)
err = m.kubeletRestartWatcher.Add(filepath.Dir(m.socket))
if err != nil {
klog.ErrorS(err, "Failed to add watch", "devicePluginPath", pluginapi.DevicePluginPath)
return err
}
go func() { go func() {
defer m.wg.Done() defer m.wg.Done()
m.server.Serve(sock) m.server.Serve(sock)
@ -184,7 +198,7 @@ func (m *Stub) Stop() error {
return nil return nil
} }
m.stopWatcher <- true m.kubeletRestartWatcher.Close()
m.server.Stop() m.server.Stop()
m.wg.Wait() m.wg.Wait()
@ -197,31 +211,29 @@ func (m *Stub) Stop() error {
func (m *Stub) Watch(kubeletEndpoint, resourceName, pluginSockDir string) { func (m *Stub) Watch(kubeletEndpoint, resourceName, pluginSockDir string) {
for { for {
select { select {
case stop := <-m.stopWatcher: // Detect a kubelet restart by watching for a newly created
if stop { // 'pluginapi.KubeletSocket' file. When this occurs, restart
return // the device plugin server
} case event := <-m.kubeletRestartWatcher.Events:
default: if event.Name == kubeletEndpoint && event.Op&fsnotify.Create == fsnotify.Create {
_, err := os.Lstat(m.socket) klog.InfoS("inotify: file created, restarting", "kubeletEndpoint", kubeletEndpoint)
if err != nil {
// Socket file not found; restart server
klog.InfoS("Server endpoint not found", "socket", m.socket)
klog.InfoS("Most likely Kubelet restarted")
if err := m.Restart(); err != nil { if err := m.Restart(); err != nil {
klog.ErrorS(err, "Unable to restart server") klog.ErrorS(err, "Unable to restart server")
panic(err) panic(err)
} }
if ok := m.registerControlFunc(); ok { if ok := m.registerControlFunc(); ok {
if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { if err := m.Register(kubeletEndpoint, resourceName, pluginSockDir); err != nil {
klog.ErrorS(err, "Unable to register to kubelet") klog.ErrorS(err, "Unable to register to kubelet")
panic(err) panic(err)
} }
} }
} }
// Watch for any other fs errors and log them.
case err := <-m.kubeletRestartWatcher.Errors:
klog.ErrorS(err, "inotify error")
} }
time.Sleep(5 * time.Second)
} }
} }