diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index 6d899a7ec90..8cc73a7949f 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -178,7 +178,7 @@ func (m *Stub) Stop() error { return m.cleanup() } -func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) { +func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string, autoregister bool) { for { select { case stop := <-m.stopWatcher: @@ -196,9 +196,11 @@ func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) panic(err) } - if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { - klog.ErrorS(err, "Unable to register to kubelet") - panic(err) + if autoregister { + if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + klog.ErrorS(err, "Unable to register to kubelet") + panic(err) + } } } diff --git a/test/images/sample-device-plugin/sampledeviceplugin.go b/test/images/sample-device-plugin/sampledeviceplugin.go index 083dc162418..ea53d526ccf 100644 --- a/test/images/sample-device-plugin/sampledeviceplugin.go +++ b/test/images/sample-device-plugin/sampledeviceplugin.go @@ -107,20 +107,54 @@ func main() { autoregister = false } if !autoregister { + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister) - if err := handleRegistrationProcess(registerControlFile); err != nil { + triggerPath := filepath.Dir(registerControlFile) + + klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + klog.Errorf("Watcher creation failed: %v ", err) panic(err) } - if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + defer watcher.Close() + + updateCh := make(chan bool) + defer close(updateCh) + + go handleRegistrationProcess(registerControlFile, dp1, watcher, updateCh) + + err = watcher.Add(triggerPath) + if err != nil { + klog.Errorf("Failed to add watch to %q: %w", triggerPath, err) panic(err) } - select {} + for { + select { + case received := <-updateCh: + if received { + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) + } + klog.InfoS("Control file was deleted, registration succeeded") + } + // Catch termination signals + case sig := <-sigCh: + klog.InfoS("Shutting down, received signal", "signal", sig) + if err := dp1.Stop(); err != nil { + panic(err) + } + return + } + time.Sleep(5 * time.Second) + } } else { if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { panic(err) } - go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister) // Catch termination signals sig := <-sigCh @@ -132,56 +166,32 @@ func main() { } } -func handleRegistrationProcess(registerControlFile string) error { - triggerPath := filepath.Dir(registerControlFile) - - klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile) - - watcher, err := fsnotify.NewWatcher() - if err != nil { - klog.Errorf("Watcher creation failed: %v ", err) - return err - } - - defer watcher.Close() - updateCh := make(chan bool) - defer close(updateCh) - - go func() { - klog.InfoS("Starting watching routine") - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - klog.InfoS("Received event", "name", event.Name, "operation", event.Op) - if event.Op&fsnotify.Remove == fsnotify.Remove { - if event.Name == registerControlFile { - klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) - updateCh <- true - return - } - klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op) - } - case err, ok := <-watcher.Errors: - if !ok { - return - } - klog.Errorf("error: %v", err) - panic(err) +func handleRegistrationProcess(registerControlFile string, dpStub *plugin.Stub, watcher *fsnotify.Watcher, updateCh chan<- bool) { + klog.InfoS("Starting watching routine") + for { + klog.InfoS("handleRegistrationProcess for loop") + select { + case event, ok := <-watcher.Events: + if !ok { + return } + klog.InfoS("Received event", "name", event.Name, "operation", event.Op) + if event.Op&fsnotify.Remove == fsnotify.Remove { + if event.Name == registerControlFile { + klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) + updateCh <- true + continue + } + klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + klog.ErrorS(err, "error") + panic(err) + default: + time.Sleep(5 * time.Second) } - }() - - err = watcher.Add(triggerPath) - if err != nil { - klog.ErrorS(err, "Failed to add watch", "triggerPath", triggerPath) - return err } - - klog.InfoS("Waiting for control file to be deleted", "path", registerControlFile) - <-updateCh - klog.InfoS("Control file was deleted, connecting!") - return nil }