diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index 1d226f72a85..6d899a7ec90 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -56,6 +56,7 @@ type Stub struct { registrationStatus chan watcherapi.RegistrationStatus // for testing endpoint string // for testing + stopWatcher chan bool } // stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet @@ -90,6 +91,7 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, p allocFunc: defaultAllocFunc, getPreferredAllocFunc: defaultGetPreferredAllocFunc, + stopWatcher: make(chan bool, 1), } } @@ -106,6 +108,7 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) { // Start starts the gRPC server of the device plugin. Can only // be called once. func (m *Stub) Start() error { + klog.InfoS("Starting device plugin server") err := m.cleanup() if err != nil { return err @@ -144,13 +147,29 @@ func (m *Stub) Start() error { return nil } +func (m *Stub) Restart() error { + klog.InfoS("Restarting Device Plugin server") + if m.server == nil { + return nil + } + + m.server.Stop() + m.server = nil + + return m.Start() +} + // Stop stops the gRPC server. Can be called without a prior Start // and more than once. Not safe to be called concurrently by different // goroutines! func (m *Stub) Stop() error { + klog.InfoS("Stopping device plugin server") if m.server == nil { return nil } + + m.stopWatcher <- true + m.server.Stop() m.wg.Wait() m.server = nil @@ -159,6 +178,35 @@ func (m *Stub) Stop() error { return m.cleanup() } +func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) { + for { + select { + case stop := <-m.stopWatcher: + if stop { + return + } + default: + _, err := os.Lstat(m.socket) + if err != nil { + // Socket file not found; restart server + klog.InfoS("Server endpoint not found", "socket", m.socket) + klog.InfoS("Most likely Kubelet restarted") + if err := m.Restart(); err != nil { + klog.ErrorS(err, "Unable to restart server") + panic(err) + + } + if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + klog.ErrorS(err, "Unable to register to kubelet") + panic(err) + } + + } + } + time.Sleep(5 * time.Second) + } +} + // GetInfo is the RPC which return pluginInfo func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) { klog.InfoS("GetInfo") @@ -182,6 +230,8 @@ func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi. // Register registers the device plugin for the given resourceName with Kubelet. func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error { + klog.InfoS("Register", "kubeletEndpoint", kubeletEndpoint, "resourceName", resourceName, "socket", pluginSockDir) + if pluginSockDir != "" { if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil { klog.InfoS("Deprecation file found. Skip registration") @@ -214,6 +264,13 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir stri } _, err = client.Register(context.Background(), reqt) + if err != nil { + // Stop server + m.server.Stop() + klog.ErrorS(err, "Client unable to register to kubelet") + return err + } + klog.InfoS("Device Plugin registered with the Kubelet") return err } diff --git a/test/images/sample-device-plugin/VERSION b/test/images/sample-device-plugin/VERSION index c239c60cba2..810ee4e91e2 100644 --- a/test/images/sample-device-plugin/VERSION +++ b/test/images/sample-device-plugin/VERSION @@ -1 +1 @@ -1.5 +1.6 diff --git a/test/images/sample-device-plugin/sampledeviceplugin.go b/test/images/sample-device-plugin/sampledeviceplugin.go index b8d5460bb4b..083dc162418 100644 --- a/test/images/sample-device-plugin/sampledeviceplugin.go +++ b/test/images/sample-device-plugin/sampledeviceplugin.go @@ -19,7 +19,9 @@ package main import ( "fmt" "os" + "os/signal" "path/filepath" + "syscall" "time" "github.com/fsnotify/fsnotify" @@ -74,6 +76,10 @@ func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Devic } func main() { + // respond to syscalls for termination + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + devs := []*pluginapi.Device{ {ID: "Dev-1", Health: pluginapi.Healthy}, {ID: "Dev-2", Health: pluginapi.Healthy}, @@ -94,17 +100,36 @@ func main() { } dp1.SetAllocFunc(stubAllocFunc) + var registerControlFile string + autoregister := true + + if registerControlFile = os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" { + autoregister = false + } + if !autoregister { - if registerControlFile := os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" { if err := handleRegistrationProcess(registerControlFile); err != nil { panic(err) } - } + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) + } + select {} + } else { + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) + } - if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { - panic(err) + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) + + // Catch termination signals + sig := <-sigCh + klog.InfoS("Shutting down, received signal", "signal", sig) + if err := dp1.Stop(); err != nil { + panic(err) + } + return } - select {} } func handleRegistrationProcess(registerControlFile string) error { @@ -123,7 +148,7 @@ func handleRegistrationProcess(registerControlFile string) error { defer close(updateCh) go func() { - klog.Infof("Starting watching routine") + klog.InfoS("Starting watching routine") for { select { case event, ok := <-watcher.Events: @@ -131,8 +156,7 @@ func handleRegistrationProcess(registerControlFile string) error { return } klog.InfoS("Received event", "name", event.Name, "operation", event.Op) - switch { - case event.Op&fsnotify.Remove == fsnotify.Remove: + if event.Op&fsnotify.Remove == fsnotify.Remove { if event.Name == registerControlFile { klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) updateCh <- true