From c4c9d61d66f258cb95adff7dcd9e2a7fd82b7d28 Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Mon, 12 Jun 2023 22:09:40 +0100 Subject: [PATCH] node: sample-dp: Handle re-registration for controlled registrations In case `REGISTER_CONTROL_FILE` is specified, we want to ensure that the registration is triggered by deletion of the control file. This is applicable both when the registration happens for the first time and subsequent ones because of kubelet restarts. Signed-off-by: Swati Sehgal --- .../cm/devicemanager/plugin/v1beta1/stub.go | 10 +- .../sampledeviceplugin.go | 116 ++++++++++-------- 2 files changed, 69 insertions(+), 57 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index 6d899a7ec90..8cc73a7949f 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -178,7 +178,7 @@ func (m *Stub) Stop() error { return m.cleanup() } -func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) { +func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string, autoregister bool) { for { select { case stop := <-m.stopWatcher: @@ -196,9 +196,11 @@ func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) panic(err) } - if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { - klog.ErrorS(err, "Unable to register to kubelet") - panic(err) + if autoregister { + if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + klog.ErrorS(err, "Unable to register to kubelet") + panic(err) + } } } diff --git a/test/images/sample-device-plugin/sampledeviceplugin.go b/test/images/sample-device-plugin/sampledeviceplugin.go index 083dc162418..ea53d526ccf 100644 --- a/test/images/sample-device-plugin/sampledeviceplugin.go +++ b/test/images/sample-device-plugin/sampledeviceplugin.go @@ -107,20 +107,54 @@ func main() { autoregister = false } if !autoregister { + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister) - if err := handleRegistrationProcess(registerControlFile); err != nil { + triggerPath := filepath.Dir(registerControlFile) + + klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + klog.Errorf("Watcher creation failed: %v ", err) panic(err) } - if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + defer watcher.Close() + + updateCh := make(chan bool) + defer close(updateCh) + + go handleRegistrationProcess(registerControlFile, dp1, watcher, updateCh) + + err = watcher.Add(triggerPath) + if err != nil { + klog.Errorf("Failed to add watch to %q: %w", triggerPath, err) panic(err) } - select {} + for { + select { + case received := <-updateCh: + if received { + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) + } + klog.InfoS("Control file was deleted, registration succeeded") + } + // Catch termination signals + case sig := <-sigCh: + klog.InfoS("Shutting down, received signal", "signal", sig) + if err := dp1.Stop(); err != nil { + panic(err) + } + return + } + time.Sleep(5 * time.Second) + } } else { if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { panic(err) } - go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister) // Catch termination signals sig := <-sigCh @@ -132,56 +166,32 @@ func main() { } } -func handleRegistrationProcess(registerControlFile string) error { - triggerPath := filepath.Dir(registerControlFile) - - klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile) - - watcher, err := fsnotify.NewWatcher() - if err != nil { - klog.Errorf("Watcher creation failed: %v ", err) - return err - } - - defer watcher.Close() - updateCh := make(chan bool) - defer close(updateCh) - - go func() { - klog.InfoS("Starting watching routine") - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - klog.InfoS("Received event", "name", event.Name, "operation", event.Op) - if event.Op&fsnotify.Remove == fsnotify.Remove { - if event.Name == registerControlFile { - klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) - updateCh <- true - return - } - klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op) - } - case err, ok := <-watcher.Errors: - if !ok { - return - } - klog.Errorf("error: %v", err) - panic(err) +func handleRegistrationProcess(registerControlFile string, dpStub *plugin.Stub, watcher *fsnotify.Watcher, updateCh chan<- bool) { + klog.InfoS("Starting watching routine") + for { + klog.InfoS("handleRegistrationProcess for loop") + select { + case event, ok := <-watcher.Events: + if !ok { + return } + klog.InfoS("Received event", "name", event.Name, "operation", event.Op) + if event.Op&fsnotify.Remove == fsnotify.Remove { + if event.Name == registerControlFile { + klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) + updateCh <- true + continue + } + klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + klog.ErrorS(err, "error") + panic(err) + default: + time.Sleep(5 * time.Second) } - }() - - err = watcher.Add(triggerPath) - if err != nil { - klog.ErrorS(err, "Failed to add watch", "triggerPath", triggerPath) - return err } - - klog.InfoS("Waiting for control file to be deleted", "path", registerControlFile) - <-updateCh - klog.InfoS("Control file was deleted, connecting!") - return nil }