node: sample-dp: Handle re-registration for controlled registrations

In case `REGISTER_CONTROL_FILE` is specified, we want to ensure that the
registration is triggered by deletion of the control file. This is
applicable both when the registration happens for the first time and
subsequent ones because of kubelet restarts.

Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
This commit is contained in:
Swati Sehgal 2023-06-12 22:09:40 +01:00
parent 6714e678d3
commit c4c9d61d66
2 changed files with 69 additions and 57 deletions

View File

@ -178,7 +178,7 @@ func (m *Stub) Stop() error {
return m.cleanup()
}
func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) {
func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string, autoregister bool) {
for {
select {
case stop := <-m.stopWatcher:
@ -196,10 +196,12 @@ func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string)
panic(err)
}
if autoregister {
if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil {
klog.ErrorS(err, "Unable to register to kubelet")
panic(err)
}
}
}
}

View File

@ -107,20 +107,54 @@ func main() {
autoregister = false
}
if !autoregister {
go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister)
if err := handleRegistrationProcess(registerControlFile); err != nil {
triggerPath := filepath.Dir(registerControlFile)
klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile)
watcher, err := fsnotify.NewWatcher()
if err != nil {
klog.Errorf("Watcher creation failed: %v ", err)
panic(err)
}
defer watcher.Close()
updateCh := make(chan bool)
defer close(updateCh)
go handleRegistrationProcess(registerControlFile, dp1, watcher, updateCh)
err = watcher.Add(triggerPath)
if err != nil {
klog.Errorf("Failed to add watch to %q: %w", triggerPath, err)
panic(err)
}
for {
select {
case received := <-updateCh:
if received {
if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil {
panic(err)
}
select {}
klog.InfoS("Control file was deleted, registration succeeded")
}
// Catch termination signals
case sig := <-sigCh:
klog.InfoS("Shutting down, received signal", "signal", sig)
if err := dp1.Stop(); err != nil {
panic(err)
}
return
}
time.Sleep(5 * time.Second)
}
} else {
if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil {
panic(err)
}
go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath)
go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister)
// Catch termination signals
sig := <-sigCh
@ -132,24 +166,10 @@ func main() {
}
}
func handleRegistrationProcess(registerControlFile string) error {
triggerPath := filepath.Dir(registerControlFile)
klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile)
watcher, err := fsnotify.NewWatcher()
if err != nil {
klog.Errorf("Watcher creation failed: %v ", err)
return err
}
defer watcher.Close()
updateCh := make(chan bool)
defer close(updateCh)
go func() {
func handleRegistrationProcess(registerControlFile string, dpStub *plugin.Stub, watcher *fsnotify.Watcher, updateCh chan<- bool) {
klog.InfoS("Starting watching routine")
for {
klog.InfoS("handleRegistrationProcess for loop")
select {
case event, ok := <-watcher.Events:
if !ok {
@ -160,7 +180,7 @@ func handleRegistrationProcess(registerControlFile string) error {
if event.Name == registerControlFile {
klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op)
updateCh <- true
return
continue
}
klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op)
}
@ -168,20 +188,10 @@ func handleRegistrationProcess(registerControlFile string) error {
if !ok {
return
}
klog.Errorf("error: %v", err)
klog.ErrorS(err, "error")
panic(err)
default:
time.Sleep(5 * time.Second)
}
}
}()
err = watcher.Add(triggerPath)
if err != nil {
klog.ErrorS(err, "Failed to add watch", "triggerPath", triggerPath)
return err
}
klog.InfoS("Waiting for control file to be deleted", "path", registerControlFile)
<-updateCh
klog.InfoS("Control file was deleted, connecting!")
return nil
}