From 6714e678d33786e29bf6bfaf78bc8f2ea8e4fc7a Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Wed, 7 Jun 2023 13:56:05 +0100 Subject: [PATCH 1/6] node: sample-dp: register by default and re-register on restarts In issue: 115107 we added an environment variable to control the registration of sample device plugin to kubelet. The intent of this patch is to ensure that the default behaviour of the plugin is to register to kubelet (in case no environment variable is specified). In addition to that, we want to ensure that the plugin registers itself not just once. It should re-register itself to kubelet in case of node reboot or kubelet restarts. Signed-off-by: Swati Sehgal --- .../cm/devicemanager/plugin/v1beta1/stub.go | 57 +++++++++++++++++++ test/images/sample-device-plugin/VERSION | 2 +- .../sampledeviceplugin.go | 40 ++++++++++--- 3 files changed, 90 insertions(+), 9 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index 1d226f72a85..6d899a7ec90 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -56,6 +56,7 @@ type Stub struct { registrationStatus chan watcherapi.RegistrationStatus // for testing endpoint string // for testing + stopWatcher chan bool } // stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet @@ -90,6 +91,7 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, p allocFunc: defaultAllocFunc, getPreferredAllocFunc: defaultGetPreferredAllocFunc, + stopWatcher: make(chan bool, 1), } } @@ -106,6 +108,7 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) { // Start starts the gRPC server of the device plugin. Can only // be called once. 
func (m *Stub) Start() error { + klog.InfoS("Starting device plugin server") err := m.cleanup() if err != nil { return err @@ -144,13 +147,29 @@ func (m *Stub) Start() error { return nil } +func (m *Stub) Restart() error { + klog.InfoS("Restarting Device Plugin server") + if m.server == nil { + return nil + } + + m.server.Stop() + m.server = nil + + return m.Start() +} + // Stop stops the gRPC server. Can be called without a prior Start // and more than once. Not safe to be called concurrently by different // goroutines! func (m *Stub) Stop() error { + klog.InfoS("Stopping device plugin server") if m.server == nil { return nil } + + m.stopWatcher <- true + m.server.Stop() m.wg.Wait() m.server = nil @@ -159,6 +178,35 @@ func (m *Stub) Stop() error { return m.cleanup() } +func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) { + for { + select { + case stop := <-m.stopWatcher: + if stop { + return + } + default: + _, err := os.Lstat(m.socket) + if err != nil { + // Socket file not found; restart server + klog.InfoS("Server endpoint not found", "socket", m.socket) + klog.InfoS("Most likely Kubelet restarted") + if err := m.Restart(); err != nil { + klog.ErrorS(err, "Unable to restart server") + panic(err) + + } + if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + klog.ErrorS(err, "Unable to register to kubelet") + panic(err) + } + + } + } + time.Sleep(5 * time.Second) + } +} + // GetInfo is the RPC which return pluginInfo func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) { klog.InfoS("GetInfo") @@ -182,6 +230,8 @@ func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi. // Register registers the device plugin for the given resourceName with Kubelet. 
func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error { + klog.InfoS("Register", "kubeletEndpoint", kubeletEndpoint, "resourceName", resourceName, "socket", pluginSockDir) + if pluginSockDir != "" { if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil { klog.InfoS("Deprecation file found. Skip registration") @@ -214,6 +264,13 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir stri } _, err = client.Register(context.Background(), reqt) + if err != nil { + // Stop server + m.server.Stop() + klog.ErrorS(err, "Client unable to register to kubelet") + return err + } + klog.InfoS("Device Plugin registered with the Kubelet") return err } diff --git a/test/images/sample-device-plugin/VERSION b/test/images/sample-device-plugin/VERSION index c239c60cba2..810ee4e91e2 100644 --- a/test/images/sample-device-plugin/VERSION +++ b/test/images/sample-device-plugin/VERSION @@ -1 +1 @@ -1.5 +1.6 diff --git a/test/images/sample-device-plugin/sampledeviceplugin.go b/test/images/sample-device-plugin/sampledeviceplugin.go index b8d5460bb4b..083dc162418 100644 --- a/test/images/sample-device-plugin/sampledeviceplugin.go +++ b/test/images/sample-device-plugin/sampledeviceplugin.go @@ -19,7 +19,9 @@ package main import ( "fmt" "os" + "os/signal" "path/filepath" + "syscall" "time" "github.com/fsnotify/fsnotify" @@ -74,6 +76,10 @@ func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Devic } func main() { + // respond to syscalls for termination + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + devs := []*pluginapi.Device{ {ID: "Dev-1", Health: pluginapi.Healthy}, {ID: "Dev-2", Health: pluginapi.Healthy}, @@ -94,17 +100,36 @@ func main() { } dp1.SetAllocFunc(stubAllocFunc) + var registerControlFile string + autoregister := true + + if registerControlFile = os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" { + 
autoregister = false + } + if !autoregister { - if registerControlFile := os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" { if err := handleRegistrationProcess(registerControlFile); err != nil { panic(err) } - } + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) + } + select {} + } else { + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) + } - if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { - panic(err) + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) + + // Catch termination signals + sig := <-sigCh + klog.InfoS("Shutting down, received signal", "signal", sig) + if err := dp1.Stop(); err != nil { + panic(err) + } + return } - select {} } func handleRegistrationProcess(registerControlFile string) error { @@ -123,7 +148,7 @@ func handleRegistrationProcess(registerControlFile string) error { defer close(updateCh) go func() { - klog.Infof("Starting watching routine") + klog.InfoS("Starting watching routine") for { select { case event, ok := <-watcher.Events: @@ -131,8 +156,7 @@ func handleRegistrationProcess(registerControlFile string) error { return } klog.InfoS("Received event", "name", event.Name, "operation", event.Op) - switch { - case event.Op&fsnotify.Remove == fsnotify.Remove: + if event.Op&fsnotify.Remove == fsnotify.Remove { if event.Name == registerControlFile { klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) updateCh <- true From c4c9d61d66f258cb95adff7dcd9e2a7fd82b7d28 Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Mon, 12 Jun 2023 22:09:40 +0100 Subject: [PATCH 2/6] node: sample-dp: Handle re-registration for controlled registrations In case `REGISTER_CONTROL_FILE` is specified, we want to ensure that the registration is triggered by deletion of the control file. 
This is applicable both when the registration happens for the first time and subsequent ones because of kubelet restarts. Signed-off-by: Swati Sehgal --- .../cm/devicemanager/plugin/v1beta1/stub.go | 10 +- .../sampledeviceplugin.go | 116 ++++++++++-------- 2 files changed, 69 insertions(+), 57 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index 6d899a7ec90..8cc73a7949f 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -178,7 +178,7 @@ func (m *Stub) Stop() error { return m.cleanup() } -func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) { +func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string, autoregister bool) { for { select { case stop := <-m.stopWatcher: @@ -196,9 +196,11 @@ func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) panic(err) } - if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { - klog.ErrorS(err, "Unable to register to kubelet") - panic(err) + if autoregister { + if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + klog.ErrorS(err, "Unable to register to kubelet") + panic(err) + } } } diff --git a/test/images/sample-device-plugin/sampledeviceplugin.go b/test/images/sample-device-plugin/sampledeviceplugin.go index 083dc162418..ea53d526ccf 100644 --- a/test/images/sample-device-plugin/sampledeviceplugin.go +++ b/test/images/sample-device-plugin/sampledeviceplugin.go @@ -107,20 +107,54 @@ func main() { autoregister = false } if !autoregister { + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister) - if err := handleRegistrationProcess(registerControlFile); err != nil { + triggerPath := filepath.Dir(registerControlFile) + + klog.InfoS("Registration process will be 
managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + klog.Errorf("Watcher creation failed: %v ", err) panic(err) } - if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + defer watcher.Close() + + updateCh := make(chan bool) + defer close(updateCh) + + go handleRegistrationProcess(registerControlFile, dp1, watcher, updateCh) + + err = watcher.Add(triggerPath) + if err != nil { + klog.Errorf("Failed to add watch to %q: %w", triggerPath, err) panic(err) } - select {} + for { + select { + case received := <-updateCh: + if received { + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) + } + klog.InfoS("Control file was deleted, registration succeeded") + } + // Catch termination signals + case sig := <-sigCh: + klog.InfoS("Shutting down, received signal", "signal", sig) + if err := dp1.Stop(); err != nil { + panic(err) + } + return + } + time.Sleep(5 * time.Second) + } } else { if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { panic(err) } - go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister) // Catch termination signals sig := <-sigCh @@ -132,56 +166,32 @@ func main() { } } -func handleRegistrationProcess(registerControlFile string) error { - triggerPath := filepath.Dir(registerControlFile) - - klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile) - - watcher, err := fsnotify.NewWatcher() - if err != nil { - klog.Errorf("Watcher creation failed: %v ", err) - return err - } - - defer watcher.Close() - updateCh := make(chan bool) - defer close(updateCh) - - go func() { - klog.InfoS("Starting watching routine") - 
for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - klog.InfoS("Received event", "name", event.Name, "operation", event.Op) - if event.Op&fsnotify.Remove == fsnotify.Remove { - if event.Name == registerControlFile { - klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) - updateCh <- true - return - } - klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op) - } - case err, ok := <-watcher.Errors: - if !ok { - return - } - klog.Errorf("error: %v", err) - panic(err) +func handleRegistrationProcess(registerControlFile string, dpStub *plugin.Stub, watcher *fsnotify.Watcher, updateCh chan<- bool) { + klog.InfoS("Starting watching routine") + for { + klog.InfoS("handleRegistrationProcess for loop") + select { + case event, ok := <-watcher.Events: + if !ok { + return } + klog.InfoS("Received event", "name", event.Name, "operation", event.Op) + if event.Op&fsnotify.Remove == fsnotify.Remove { + if event.Name == registerControlFile { + klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) + updateCh <- true + continue + } + klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + klog.ErrorS(err, "error") + panic(err) + default: + time.Sleep(5 * time.Second) } - }() - - err = watcher.Add(triggerPath) - if err != nil { - klog.ErrorS(err, "Failed to add watch", "triggerPath", triggerPath) - return err } - - klog.InfoS("Waiting for control file to be deleted", "path", registerControlFile) - <-updateCh - klog.InfoS("Control file was deleted, connecting!") - return nil } From 211d8cc80a7e1f02d96b7fbae3e6e84693551cd7 Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Tue, 20 Jun 2023 10:45:54 +0100 Subject: [PATCH 3/6] node: sample-dp: stubRegisterControlFunc for controlling registration If the user specifies the intent to control registration process, we rely on registration triggers (deletion of control file) to 
prompt registration. This behaviour is expected to be consistent across kubelet restarts and therefore across the watch calls where we watch for changes to the unix socket so we make this part of Stub object instead of a parameter. Co-authored-by: Francesco Romani Signed-off-by: Swati Sehgal --- .../cm/devicemanager/plugin/v1beta1/stub.go | 20 +++++++++++++++++-- .../sampledeviceplugin.go | 12 ++++++++--- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index 8cc73a7949f..f62ed55f445 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -53,6 +53,9 @@ type Stub struct { // getPreferredAllocFunc is used for handling getPreferredAllocation request getPreferredAllocFunc stubGetPreferredAllocFunc + // registerControlFunc is used for controlling auto-registration of requests + registerControlFunc stubRegisterControlFunc + registrationStatus chan watcherapi.RegistrationStatus // for testing endpoint string // for testing @@ -77,6 +80,13 @@ func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.De return &response, nil } +// stubRegisterControlFunc is the function called when a registration request is received from Kubelet +type stubRegisterControlFunc func() bool + +func defaultRegisterControlFunc() bool { + return true +} + // NewDevicePluginStub returns an initialized DevicePlugin Stub. 
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool, getPreferredAllocationFlag bool) *Stub { return &Stub{ @@ -85,6 +95,7 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, p resourceName: name, preStartContainerFlag: preStartContainerFlag, getPreferredAllocationFlag: getPreferredAllocationFlag, + registerControlFunc: defaultRegisterControlFunc, stop: make(chan interface{}), update: make(chan []*pluginapi.Device), @@ -105,6 +116,11 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) { m.allocFunc = f } +// SetRegisterControlFunc sets RegisterControlFunc of the device plugin +func (m *Stub) SetRegisterControlFunc(f stubRegisterControlFunc) { + m.registerControlFunc = f +} + // Start starts the gRPC server of the device plugin. Can only // be called once. func (m *Stub) Start() error { @@ -178,7 +194,7 @@ func (m *Stub) Stop() error { return m.cleanup() } -func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string, autoregister bool) { +func (m *Stub) Watch(kubeletEndpoint, resourceName, pluginSockDir string) { for { select { case stop := <-m.stopWatcher: @@ -196,7 +212,7 @@ func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string, panic(err) } - if autoregister { + if ok := m.registerControlFunc(); ok { if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { klog.ErrorS(err, "Unable to register to kubelet") panic(err) diff --git a/test/images/sample-device-plugin/sampledeviceplugin.go b/test/images/sample-device-plugin/sampledeviceplugin.go index ea53d526ccf..c58848534b1 100644 --- a/test/images/sample-device-plugin/sampledeviceplugin.go +++ b/test/images/sample-device-plugin/sampledeviceplugin.go @@ -75,6 +75,11 @@ func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Devic return &responses, nil } +// stubRegisterControlFunc always returns false so that the stub's Watch loop does not 
re-register automatically after a kubelet restart +func stubRegisterControlFunc() bool { + return false +} + func main() { // respond to syscalls for termination sigCh := make(chan os.Signal, 1) @@ -105,9 +110,11 @@ func main() { if registerControlFile = os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" { autoregister = false + dp1.SetRegisterControlFunc(stubRegisterControlFunc) } + if !autoregister { - go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister) + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) triggerPath := filepath.Dir(registerControlFile) @@ -154,8 +161,7 @@ func main() { panic(err) } - go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister) - + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) // Catch termination signals sig := <-sigCh klog.InfoS("Shutting down, received signal", "signal", sig) From f46cdf1428c7f3e22389dff947954ab820b50b8a Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Tue, 20 Jun 2023 14:15:31 +0100 Subject: [PATCH 4/6] node: sample-dp: set default value for `pluginSocksDir` Setting a reasonable default in case `PLUGIN_SOCK_DIR` environment variable is not specified. Signed-off-by: Swati Sehgal --- test/images/sample-device-plugin/sampledeviceplugin.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/images/sample-device-plugin/sampledeviceplugin.go b/test/images/sample-device-plugin/sampledeviceplugin.go index c58848534b1..1f78c890a32 100644 --- a/test/images/sample-device-plugin/sampledeviceplugin.go +++ b/test/images/sample-device-plugin/sampledeviceplugin.go @@ -93,8 +93,7 @@ func main() { pluginSocksDir := os.Getenv("PLUGIN_SOCK_DIR") klog.Infof("pluginSocksDir: %s", pluginSocksDir) if pluginSocksDir == "" { - klog.Errorf("Empty pluginSocksDir") - return + pluginSocksDir = pluginapi.DevicePluginPath } socketPath := pluginSocksDir + "/dp." 
+ fmt.Sprintf("%d", time.Now().Unix()) From d0d133298d95c03acdd53e1380fc3ea3e66339be Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Thu, 13 Jul 2023 09:17:13 +0100 Subject: [PATCH 5/6] node: sample-dp: Use fsnotify for kubelet restart detection Add kubeletSocket file to fsnotify instead of polling and waiting for deletion of device plugin unix socket as a way of detecting kubelet restart. We need to ensure that the device plugin re-registers itself after kubelet restart depending on the configured registration mode (auto-registration or controlled registration). Signed-off-by: Swati Sehgal --- .../cm/devicemanager/plugin/v1beta1/stub.go | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index f62ed55f445..d590e99fa2b 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/fsnotify/fsnotify" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -59,7 +60,7 @@ type Stub struct { registrationStatus chan watcherapi.RegistrationStatus // for testing endpoint string // for testing - stopWatcher chan bool + kubeletRestartWatcher *fsnotify.Watcher } // stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet @@ -89,6 +90,13 @@ func defaultRegisterControlFunc() bool { // NewDevicePluginStub returns an initialized DevicePlugin Stub. 
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool, getPreferredAllocationFlag bool) *Stub { + + watcher, err := fsnotify.NewWatcher() + if err != nil { + klog.ErrorS(err, "Watcher creation failed") + panic(err) + } + return &Stub{ devs: devs, socket: socket, @@ -102,7 +110,7 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, p allocFunc: defaultAllocFunc, getPreferredAllocFunc: defaultGetPreferredAllocFunc, - stopWatcher: make(chan bool, 1), + kubeletRestartWatcher: watcher, } } @@ -140,6 +148,12 @@ func (m *Stub) Start() error { pluginapi.RegisterDevicePluginServer(m.server, m) watcherapi.RegisterRegistrationServer(m.server, m) + err = m.kubeletRestartWatcher.Add(filepath.Dir(m.socket)) + if err != nil { + klog.ErrorS(err, "Failed to add watch", "devicePluginPath", pluginapi.DevicePluginPath) + return err + } + go func() { defer m.wg.Done() m.server.Serve(sock) @@ -184,7 +198,7 @@ func (m *Stub) Stop() error { return nil } - m.stopWatcher <- true + m.kubeletRestartWatcher.Close() m.server.Stop() m.wg.Wait() @@ -197,31 +211,29 @@ func (m *Stub) Stop() error { func (m *Stub) Watch(kubeletEndpoint, resourceName, pluginSockDir string) { for { select { - case stop := <-m.stopWatcher: - if stop { - return - } - default: - _, err := os.Lstat(m.socket) - if err != nil { - // Socket file not found; restart server - klog.InfoS("Server endpoint not found", "socket", m.socket) - klog.InfoS("Most likely Kubelet restarted") + // Detect a kubelet restart by watching for a newly created + // 'pluginapi.KubeletSocket' file. 
When this occurs, restart + // the device plugin server + case event := <-m.kubeletRestartWatcher.Events: + if event.Name == kubeletEndpoint && event.Op&fsnotify.Create == fsnotify.Create { + klog.InfoS("inotify: file created, restarting", "kubeletEndpoint", kubeletEndpoint) if err := m.Restart(); err != nil { klog.ErrorS(err, "Unable to restart server") panic(err) } if ok := m.registerControlFunc(); ok { - if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + if err := m.Register(kubeletEndpoint, resourceName, pluginSockDir); err != nil { klog.ErrorS(err, "Unable to register to kubelet") panic(err) } } - } + + // Watch for any other fs errors and log them. + case err := <-m.kubeletRestartWatcher.Errors: + klog.ErrorS(err, "inotify error") } - time.Sleep(5 * time.Second) } } From 9a354fc9d03f800d6014543841fc7cb2762aab22 Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Fri, 22 Sep 2023 13:47:19 +0100 Subject: [PATCH 6/6] node: sample-dp: Add retry to handle device plugin restart failure Add retry mechanism to handle cases where after kubelet restarts, the device plugin unix socket(s) were created but not ready to serve yet. 
Signed-off-by: Swati Sehgal --- .../cm/devicemanager/plugin/v1beta1/stub.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index d590e99fa2b..fbec3456e46 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -217,11 +217,22 @@ func (m *Stub) Watch(kubeletEndpoint, resourceName, pluginSockDir string) { case event := <-m.kubeletRestartWatcher.Events: if event.Name == kubeletEndpoint && event.Op&fsnotify.Create == fsnotify.Create { klog.InfoS("inotify: file created, restarting", "kubeletEndpoint", kubeletEndpoint) - if err := m.Restart(); err != nil { - klog.ErrorS(err, "Unable to restart server") - panic(err) + var lastErr error + err := wait.PollUntilContextTimeout(context.Background(), 10*time.Second, 2*time.Minute, false, func(context.Context) (done bool, err error) { + restartErr := m.Restart() + if restartErr == nil { + return true, nil + } + klog.ErrorS(restartErr, "Retrying after error") + lastErr = restartErr + return false, nil + }) + if err != nil { + klog.ErrorS(err, "Unable to restart server: wait timed out", "lastErr", lastErr.Error()) + panic(err) } + if ok := m.registerControlFunc(); ok { if err := m.Register(kubeletEndpoint, resourceName, pluginSockDir); err != nil { klog.ErrorS(err, "Unable to register to kubelet")