node: sample-dp: register by default and re-register on restarts

In issue #115107 we added an environment variable to control the registration of the
sample device plugin with the kubelet. The intent of this patch is to ensure that the
default behaviour of the plugin is to register with the kubelet (i.e. when no
environment variable is specified).

In addition, we want to ensure that the plugin does not register itself only once:
it should re-register with the kubelet after a node reboot or a kubelet restart.

Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
This commit is contained in:
Swati Sehgal 2023-06-07 13:56:05 +01:00
parent c46d737ce5
commit 6714e678d3
3 changed files with 90 additions and 9 deletions

View File

@ -56,6 +56,7 @@ type Stub struct {
registrationStatus chan watcherapi.RegistrationStatus // for testing
endpoint string // for testing
stopWatcher chan bool
}
// stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet
@ -90,6 +91,7 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, p
allocFunc: defaultAllocFunc,
getPreferredAllocFunc: defaultGetPreferredAllocFunc,
stopWatcher: make(chan bool, 1),
}
}
@ -106,6 +108,7 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) {
// Start starts the gRPC server of the device plugin. Can only
// be called once.
func (m *Stub) Start() error {
klog.InfoS("Starting device plugin server")
err := m.cleanup()
if err != nil {
return err
@ -144,13 +147,29 @@ func (m *Stub) Start() error {
return nil
}
// Restart stops the currently running gRPC server, clears it, and then
// brings the device plugin server up again through Start. When no server
// is set it does nothing and returns nil.
func (m *Stub) Restart() error {
	klog.InfoS("Restarting Device Plugin server")

	if srv := m.server; srv != nil {
		// Tear down the old server before Start creates a fresh one.
		srv.Stop()
		m.server = nil
		return m.Start()
	}
	return nil
}
// Stop stops the gRPC server. Can be called without a prior Start
// and more than once. Not safe to be called concurrently by different
// goroutines!
func (m *Stub) Stop() error {
klog.InfoS("Stopping device plugin server")
if m.server == nil {
return nil
}
m.stopWatcher <- true
m.server.Stop()
m.wg.Wait()
m.server = nil
@ -159,6 +178,35 @@ func (m *Stub) Stop() error {
return m.cleanup()
}
// Watch loops, checking on each pass whether the plugin's socket file can
// still be stat'ed. If it cannot (most likely because the kubelet restarted),
// Watch restarts the server and registers the plugin with the kubelet again.
//
// kubeletEndpoint, resourceName and pluginSockDir are forwarded unchanged to
// Register on every re-registration.
//
// The loop sleeps five seconds between passes and returns once a true value
// is received on m.stopWatcher. It panics if the restart or the
// re-registration fails, since it has no way to report an error to a caller.
func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string) {
	for {
		select {
		case stop := <-m.stopWatcher:
			if stop {
				return
			}
		default:
			if _, err := os.Lstat(m.socket); err != nil {
				// Socket file not found; restart server
				klog.InfoS("Server endpoint not found", "socket", m.socket)
				klog.InfoS("Most likely Kubelet restarted")
				if err := m.Restart(); err != nil {
					klog.ErrorS(err, "Unable to restart server")
					panic(err)
				}
				// Re-register using the caller-supplied endpoint and socket
				// directory rather than hard-coded package constants.
				if err := m.Register(kubeletEndpoint, resourceName, pluginSockDir); err != nil {
					klog.ErrorS(err, "Unable to register to kubelet")
					panic(err)
				}
			}
		}
		time.Sleep(5 * time.Second)
	}
}
// GetInfo is the RPC which return pluginInfo
func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) {
klog.InfoS("GetInfo")
@ -182,6 +230,8 @@ func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi.
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error {
klog.InfoS("Register", "kubeletEndpoint", kubeletEndpoint, "resourceName", resourceName, "socket", pluginSockDir)
if pluginSockDir != "" {
if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil {
klog.InfoS("Deprecation file found. Skip registration")
@ -214,6 +264,13 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir stri
}
_, err = client.Register(context.Background(), reqt)
if err != nil {
// Stop server
m.server.Stop()
klog.ErrorS(err, "Client unable to register to kubelet")
return err
}
klog.InfoS("Device Plugin registered with the Kubelet")
return err
}

View File

@ -1 +1 @@
1.5
1.6

View File

@ -19,7 +19,9 @@ package main
import (
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"github.com/fsnotify/fsnotify"
@ -74,6 +76,10 @@ func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Devic
}
func main() {
// respond to syscalls for termination
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
devs := []*pluginapi.Device{
{ID: "Dev-1", Health: pluginapi.Healthy},
{ID: "Dev-2", Health: pluginapi.Healthy},
@ -94,17 +100,36 @@ func main() {
}
dp1.SetAllocFunc(stubAllocFunc)
var registerControlFile string
autoregister := true
if registerControlFile = os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" {
autoregister = false
}
if !autoregister {
if registerControlFile := os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" {
if err := handleRegistrationProcess(registerControlFile); err != nil {
panic(err)
}
}
if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil {
panic(err)
}
select {}
} else {
if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil {
panic(err)
}
if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil {
panic(err)
go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath)
// Catch termination signals
sig := <-sigCh
klog.InfoS("Shutting down, received signal", "signal", sig)
if err := dp1.Stop(); err != nil {
panic(err)
}
return
}
select {}
}
func handleRegistrationProcess(registerControlFile string) error {
@ -123,7 +148,7 @@ func handleRegistrationProcess(registerControlFile string) error {
defer close(updateCh)
go func() {
klog.Infof("Starting watching routine")
klog.InfoS("Starting watching routine")
for {
select {
case event, ok := <-watcher.Events:
@ -131,8 +156,7 @@ func handleRegistrationProcess(registerControlFile string) error {
return
}
klog.InfoS("Received event", "name", event.Name, "operation", event.Op)
switch {
case event.Op&fsnotify.Remove == fsnotify.Remove:
if event.Op&fsnotify.Remove == fsnotify.Remove {
if event.Name == registerControlFile {
klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op)
updateCh <- true