From 6714e678d33786e29bf6bfaf78bc8f2ea8e4fc7a Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Wed, 7 Jun 2023 13:56:05 +0100 Subject: [PATCH] node: sample-dp: register by default and re-register on restarts In issue #115107 we added an environment variable to control the registration of the sample device plugin with the kubelet. The intent of this patch is to ensure that the default behaviour of the plugin is to register with the kubelet (in case no environment variable is specified). In addition to that, we want to ensure that the plugin registers itself more than once: it should re-register itself with the kubelet after a node reboot or a kubelet restart. Signed-off-by: Swati Sehgal --- .../cm/devicemanager/plugin/v1beta1/stub.go | 57 +++++++++++++++++++ test/images/sample-device-plugin/VERSION | 2 +- .../sampledeviceplugin.go | 40 ++++++++++--- 3 files changed, 90 insertions(+), 9 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index 1d226f72a85..6d899a7ec90 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -56,6 +56,7 @@ type Stub struct { registrationStatus chan watcherapi.RegistrationStatus // for testing endpoint string // for testing + stopWatcher chan bool } // stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet @@ -90,6 +91,7 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, p allocFunc: defaultAllocFunc, getPreferredAllocFunc: defaultGetPreferredAllocFunc, + stopWatcher: make(chan bool, 1), } } @@ -106,6 +108,7 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) { // Start starts the gRPC server of the device plugin. Can only // be called once. 
func (m *Stub) Start() error { + klog.InfoS("Starting device plugin server") err := m.cleanup() if err != nil { return err @@ -144,13 +147,29 @@ func (m *Stub) Start() error { return nil } +func (m *Stub) Restart() error { + klog.InfoS("Restarting Device Plugin server") + if m.server == nil { + return nil + } + + m.server.Stop() + m.server = nil + + return m.Start() +} + // Stop stops the gRPC server. Can be called without a prior Start // and more than once. Not safe to be called concurrently by different // goroutines! func (m *Stub) Stop() error { + klog.InfoS("Stopping device plugin server") if m.server == nil { return nil } + + m.stopWatcher <- true + m.server.Stop() m.wg.Wait() m.server = nil @@ -159,6 +178,35 @@ func (m *Stub) Stop() error { return m.cleanup() } +func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) { + for { + select { + case stop := <-m.stopWatcher: + if stop { + return + } + default: + _, err := os.Lstat(m.socket) + if err != nil { + // Socket file not found; restart server + klog.InfoS("Server endpoint not found", "socket", m.socket) + klog.InfoS("Most likely Kubelet restarted") + if err := m.Restart(); err != nil { + klog.ErrorS(err, "Unable to restart server") + panic(err) + + } + if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + klog.ErrorS(err, "Unable to register to kubelet") + panic(err) + } + + } + } + time.Sleep(5 * time.Second) + } +} + // GetInfo is the RPC which return pluginInfo func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) { klog.InfoS("GetInfo") @@ -182,6 +230,8 @@ func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi. // Register registers the device plugin for the given resourceName with Kubelet. 
func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error { + klog.InfoS("Register", "kubeletEndpoint", kubeletEndpoint, "resourceName", resourceName, "socket", pluginSockDir) + if pluginSockDir != "" { if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil { klog.InfoS("Deprecation file found. Skip registration") @@ -214,6 +264,13 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir stri } _, err = client.Register(context.Background(), reqt) + if err != nil { + // Stop server + m.server.Stop() + klog.ErrorS(err, "Client unable to register to kubelet") + return err + } + klog.InfoS("Device Plugin registered with the Kubelet") return err } diff --git a/test/images/sample-device-plugin/VERSION b/test/images/sample-device-plugin/VERSION index c239c60cba2..810ee4e91e2 100644 --- a/test/images/sample-device-plugin/VERSION +++ b/test/images/sample-device-plugin/VERSION @@ -1 +1 @@ -1.5 +1.6 diff --git a/test/images/sample-device-plugin/sampledeviceplugin.go b/test/images/sample-device-plugin/sampledeviceplugin.go index b8d5460bb4b..083dc162418 100644 --- a/test/images/sample-device-plugin/sampledeviceplugin.go +++ b/test/images/sample-device-plugin/sampledeviceplugin.go @@ -19,7 +19,9 @@ package main import ( "fmt" "os" + "os/signal" "path/filepath" + "syscall" "time" "github.com/fsnotify/fsnotify" @@ -74,6 +76,10 @@ func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Devic } func main() { + // respond to syscalls for termination + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + devs := []*pluginapi.Device{ {ID: "Dev-1", Health: pluginapi.Healthy}, {ID: "Dev-2", Health: pluginapi.Healthy}, @@ -94,17 +100,36 @@ func main() { } dp1.SetAllocFunc(stubAllocFunc) + var registerControlFile string + autoregister := true + + if registerControlFile = os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" { + 
autoregister = false + } + if !autoregister { - if registerControlFile := os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" { if err := handleRegistrationProcess(registerControlFile); err != nil { panic(err) } - } + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) + } + select {} + } else { + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) + } - if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { - panic(err) + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) + + // Catch termination signals + sig := <-sigCh + klog.InfoS("Shutting down, received signal", "signal", sig) + if err := dp1.Stop(); err != nil { + panic(err) + } + return } - select {} } func handleRegistrationProcess(registerControlFile string) error { @@ -123,7 +148,7 @@ func handleRegistrationProcess(registerControlFile string) error { defer close(updateCh) go func() { - klog.Infof("Starting watching routine") + klog.InfoS("Starting watching routine") for { select { case event, ok := <-watcher.Events: @@ -131,8 +156,7 @@ func handleRegistrationProcess(registerControlFile string) error { return } klog.InfoS("Received event", "name", event.Name, "operation", event.Op) - switch { - case event.Op&fsnotify.Remove == fsnotify.Remove: + if event.Op&fsnotify.Remove == fsnotify.Remove { if event.Name == registerControlFile { klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) updateCh <- true