diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go index 1d226f72a85..fbec3456e46 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/stub.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/fsnotify/fsnotify" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -53,9 +54,13 @@ type Stub struct { // getPreferredAllocFunc is used for handling getPreferredAllocation request getPreferredAllocFunc stubGetPreferredAllocFunc + // registerControlFunc decides whether Watch re-registers the plugin automatically after a kubelet restart + registerControlFunc stubRegisterControlFunc + registrationStatus chan watcherapi.RegistrationStatus // for testing endpoint string // for testing + kubeletRestartWatcher *fsnotify.Watcher } // stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet @@ -76,20 +81,36 @@ func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.De return &response, nil } +// stubRegisterControlFunc is the function called after a kubelet restart to decide whether the plugin re-registers automatically +type stubRegisterControlFunc func() bool + +func defaultRegisterControlFunc() bool { + return true +} + // NewDevicePluginStub returns an initialized DevicePlugin Stub. 
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool, getPreferredAllocationFlag bool) *Stub { + + watcher, err := fsnotify.NewWatcher() + if err != nil { + klog.ErrorS(err, "Watcher creation failed") + panic(err) + } + return &Stub{ devs: devs, socket: socket, resourceName: name, preStartContainerFlag: preStartContainerFlag, getPreferredAllocationFlag: getPreferredAllocationFlag, + registerControlFunc: defaultRegisterControlFunc, stop: make(chan interface{}), update: make(chan []*pluginapi.Device), allocFunc: defaultAllocFunc, getPreferredAllocFunc: defaultGetPreferredAllocFunc, + kubeletRestartWatcher: watcher, } } @@ -103,9 +124,15 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) { m.allocFunc = f } +// SetRegisterControlFunc sets RegisterControlFunc of the device plugin +func (m *Stub) SetRegisterControlFunc(f stubRegisterControlFunc) { + m.registerControlFunc = f +} + // Start starts the gRPC server of the device plugin. Can only // be called once. func (m *Stub) Start() error { + klog.InfoS("Starting device plugin server") err := m.cleanup() if err != nil { return err @@ -121,6 +148,12 @@ func (m *Stub) Start() error { pluginapi.RegisterDevicePluginServer(m.server, m) watcherapi.RegisterRegistrationServer(m.server, m) + err = m.kubeletRestartWatcher.Add(filepath.Dir(m.socket)) + if err != nil { + klog.ErrorS(err, "Failed to add watch", "devicePluginPath", pluginapi.DevicePluginPath) + return err + } + go func() { defer m.wg.Done() m.server.Serve(sock) @@ -144,13 +177,29 @@ func (m *Stub) Start() error { return nil } +func (m *Stub) Restart() error { + klog.InfoS("Restarting Device Plugin server") + if m.server == nil { + return nil + } + + m.server.Stop() + m.server = nil + + return m.Start() +} + // Stop stops the gRPC server. Can be called without a prior Start // and more than once. Not safe to be called concurrently by different // goroutines! 
func (m *Stub) Stop() error { + klog.InfoS("Stopping device plugin server") if m.server == nil { return nil } + + m.kubeletRestartWatcher.Close() + m.server.Stop() m.wg.Wait() m.server = nil @@ -159,6 +208,46 @@ func (m *Stub) Stop() error { return m.cleanup() } +func (m *Stub) Watch(kubeletEndpoint, resourceName, pluginSockDir string) { + for { + select { + // Detect a kubelet restart by watching for a newly created + // 'pluginapi.KubeletSocket' file. When this occurs, restart + // the device plugin server + case event := <-m.kubeletRestartWatcher.Events: + if event.Name == kubeletEndpoint && event.Op&fsnotify.Create == fsnotify.Create { + klog.InfoS("inotify: file created, restarting", "kubeletEndpoint", kubeletEndpoint) + var lastErr error + + err := wait.PollUntilContextTimeout(context.Background(), 10*time.Second, 2*time.Minute, false, func(context.Context) (done bool, err error) { + restartErr := m.Restart() + if restartErr == nil { + return true, nil + } + klog.ErrorS(restartErr, "Retrying after error") + lastErr = restartErr + return false, nil + }) + if err != nil { + klog.ErrorS(err, "Unable to restart server: wait timed out", "lastErr", lastErr.Error()) + panic(err) + } + + if ok := m.registerControlFunc(); ok { + if err := m.Register(kubeletEndpoint, resourceName, pluginSockDir); err != nil { + klog.ErrorS(err, "Unable to register to kubelet") + panic(err) + } + } + } + + // Watch for any other fs errors and log them. + case err := <-m.kubeletRestartWatcher.Errors: + klog.ErrorS(err, "inotify error") + } + } +} + // GetInfo is the RPC which return pluginInfo func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) { klog.InfoS("GetInfo") @@ -182,6 +271,8 @@ func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi. // Register registers the device plugin for the given resourceName with Kubelet. 
func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error { + klog.InfoS("Register", "kubeletEndpoint", kubeletEndpoint, "resourceName", resourceName, "socket", pluginSockDir) + if pluginSockDir != "" { if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil { klog.InfoS("Deprecation file found. Skip registration") @@ -214,6 +305,13 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir stri } _, err = client.Register(context.Background(), reqt) + if err != nil { + // Stop server + m.server.Stop() + klog.ErrorS(err, "Client unable to register to kubelet") + return err + } + klog.InfoS("Device Plugin registered with the Kubelet") return err } diff --git a/test/images/sample-device-plugin/VERSION b/test/images/sample-device-plugin/VERSION index c239c60cba2..810ee4e91e2 100644 --- a/test/images/sample-device-plugin/VERSION +++ b/test/images/sample-device-plugin/VERSION @@ -1 +1 @@ -1.5 +1.6 diff --git a/test/images/sample-device-plugin/sampledeviceplugin.go b/test/images/sample-device-plugin/sampledeviceplugin.go index b8d5460bb4b..1f78c890a32 100644 --- a/test/images/sample-device-plugin/sampledeviceplugin.go +++ b/test/images/sample-device-plugin/sampledeviceplugin.go @@ -19,7 +19,9 @@ package main import ( "fmt" "os" + "os/signal" "path/filepath" + "syscall" "time" "github.com/fsnotify/fsnotify" @@ -73,7 +75,16 @@ func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Devic return &responses, nil } +// stubRegisterControlFunc always returns false, disabling automatic re-registration after a kubelet restart +func stubRegisterControlFunc() bool { + return false +} + func main() { + // listen for termination signals + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + devs := []*pluginapi.Device{ {ID: "Dev-1", Health: pluginapi.Healthy}, {ID: "Dev-2", Health: pluginapi.Healthy}, @@ -82,8 +93,7 @@ func main() { pluginSocksDir := 
os.Getenv("PLUGIN_SOCK_DIR") klog.Infof("pluginSocksDir: %s", pluginSocksDir) if pluginSocksDir == "" { - klog.Errorf("Empty pluginSocksDir") - return + pluginSocksDir = pluginapi.DevicePluginPath } socketPath := pluginSocksDir + "/dp." + fmt.Sprintf("%d", time.Now().Unix()) @@ -94,70 +104,99 @@ func main() { } dp1.SetAllocFunc(stubAllocFunc) + var registerControlFile string + autoregister := true - if registerControlFile := os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" { - if err := handleRegistrationProcess(registerControlFile); err != nil { + if registerControlFile = os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" { + autoregister = false + dp1.SetRegisterControlFunc(stubRegisterControlFunc) + } + + if !autoregister { + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) + + triggerPath := filepath.Dir(registerControlFile) + + klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + klog.Errorf("Watcher creation failed: %v ", err) panic(err) } - } + defer watcher.Close() - if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { - panic(err) - } - select {} -} + updateCh := make(chan bool) + defer close(updateCh) -func handleRegistrationProcess(registerControlFile string) error { - triggerPath := filepath.Dir(registerControlFile) + go handleRegistrationProcess(registerControlFile, dp1, watcher, updateCh) - klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile) - - watcher, err := fsnotify.NewWatcher() - if err != nil { - klog.Errorf("Watcher creation failed: %v ", err) - return err - } - - defer watcher.Close() - updateCh := make(chan bool) - defer close(updateCh) - - go func() { - klog.Infof("Starting watching routine") + err = watcher.Add(triggerPath) + if 
err != nil { + klog.Errorf("Failed to add watch to %q: %v", triggerPath, err) + panic(err) + } for { select { - case event, ok := <-watcher.Events: - if !ok { - return - } - klog.InfoS("Received event", "name", event.Name, "operation", event.Op) - switch { - case event.Op&fsnotify.Remove == fsnotify.Remove: - if event.Name == registerControlFile { - klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) - updateCh <- true - return + case received := <-updateCh: + if received { + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) } - klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op) + klog.InfoS("Control file was deleted, registration succeeded") } - case err, ok := <-watcher.Errors: - if !ok { - return + // Catch termination signals + case sig := <-sigCh: + klog.InfoS("Shutting down, received signal", "signal", sig) + if err := dp1.Stop(); err != nil { + panic(err) } - klog.Errorf("error: %v", err) - panic(err) + return } + time.Sleep(5 * time.Second) + } + } else { + if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil { + panic(err) } - }() - err = watcher.Add(triggerPath) - if err != nil { - klog.ErrorS(err, "Failed to add watch", "triggerPath", triggerPath) - return err + go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) + // Catch termination signals + sig := <-sigCh + klog.InfoS("Shutting down, received signal", "signal", sig) + if err := dp1.Stop(); err != nil { + panic(err) + } + return + } +} + +func handleRegistrationProcess(registerControlFile string, dpStub *plugin.Stub, watcher *fsnotify.Watcher, updateCh chan<- bool) { + klog.InfoS("Starting watching routine") + for { + klog.InfoS("handleRegistrationProcess for loop") + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + klog.InfoS("Received event", "name", event.Name, "operation", event.Op) + if 
event.Op&fsnotify.Remove == fsnotify.Remove { + if event.Name == registerControlFile { + klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op) + updateCh <- true + continue + } + klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + klog.ErrorS(err, "error") + panic(err) + default: + time.Sleep(5 * time.Second) + } } - - klog.InfoS("Waiting for control file to be deleted", "path", registerControlFile) - <-updateCh - klog.InfoS("Control file was deleted, connecting!") - return nil }