Merge pull request #118534 from swatisehgal/sample-dp-register-by-default

node: sample-device-plugin: register to kubelet by default and ensure re-registration to kubelet on kubelet restarts
This commit is contained in:
Kubernetes Prow Robot 2023-10-23 13:41:19 +02:00 committed by GitHub
commit f41ede6241
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 191 additions and 54 deletions

View File

@ -24,6 +24,7 @@ import (
"sync"
"time"
"github.com/fsnotify/fsnotify"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@ -53,9 +54,13 @@ type Stub struct {
// getPreferredAllocFunc is used for handling getPreferredAllocation request
getPreferredAllocFunc stubGetPreferredAllocFunc
// registerControlFunc is used for controlling auto-registration of requests
registerControlFunc stubRegisterControlFunc
registrationStatus chan watcherapi.RegistrationStatus // for testing
endpoint string // for testing
kubeletRestartWatcher *fsnotify.Watcher
}
// stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet
@ -76,20 +81,36 @@ func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.De
return &response, nil
}
// stubRegisterControlFunc is the function called when a registration request is received from Kubelet
type stubRegisterControlFunc func() bool
func defaultRegisterControlFunc() bool {
return true
}
// NewDevicePluginStub returns an initialized DevicePlugin Stub.
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool, getPreferredAllocationFlag bool) *Stub {
watcher, err := fsnotify.NewWatcher()
if err != nil {
klog.ErrorS(err, "Watcher creation failed")
panic(err)
}
return &Stub{
devs: devs,
socket: socket,
resourceName: name,
preStartContainerFlag: preStartContainerFlag,
getPreferredAllocationFlag: getPreferredAllocationFlag,
registerControlFunc: defaultRegisterControlFunc,
stop: make(chan interface{}),
update: make(chan []*pluginapi.Device),
allocFunc: defaultAllocFunc,
getPreferredAllocFunc: defaultGetPreferredAllocFunc,
kubeletRestartWatcher: watcher,
}
}
@ -103,9 +124,15 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) {
m.allocFunc = f
}
// SetRegisterControlFunc sets RegisterControlFunc of the device plugin
func (m *Stub) SetRegisterControlFunc(f stubRegisterControlFunc) {
m.registerControlFunc = f
}
// Start starts the gRPC server of the device plugin. Can only
// be called once.
func (m *Stub) Start() error {
klog.InfoS("Starting device plugin server")
err := m.cleanup()
if err != nil {
return err
@ -121,6 +148,12 @@ func (m *Stub) Start() error {
pluginapi.RegisterDevicePluginServer(m.server, m)
watcherapi.RegisterRegistrationServer(m.server, m)
err = m.kubeletRestartWatcher.Add(filepath.Dir(m.socket))
if err != nil {
klog.ErrorS(err, "Failed to add watch", "devicePluginPath", pluginapi.DevicePluginPath)
return err
}
go func() {
defer m.wg.Done()
m.server.Serve(sock)
@ -144,13 +177,29 @@ func (m *Stub) Start() error {
return nil
}
func (m *Stub) Restart() error {
klog.InfoS("Restarting Device Plugin server")
if m.server == nil {
return nil
}
m.server.Stop()
m.server = nil
return m.Start()
}
// Stop stops the gRPC server. Can be called without a prior Start
// and more than once. Not safe to be called concurrently by different
// goroutines!
func (m *Stub) Stop() error {
klog.InfoS("Stopping device plugin server")
if m.server == nil {
return nil
}
m.kubeletRestartWatcher.Close()
m.server.Stop()
m.wg.Wait()
m.server = nil
@ -159,6 +208,46 @@ func (m *Stub) Stop() error {
return m.cleanup()
}
func (m *Stub) Watch(kubeletEndpoint, resourceName, pluginSockDir string) {
for {
select {
// Detect a kubelet restart by watching for a newly created
// 'pluginapi.KubeletSocket' file. When this occurs, restart
// the device plugin server
case event := <-m.kubeletRestartWatcher.Events:
if event.Name == kubeletEndpoint && event.Op&fsnotify.Create == fsnotify.Create {
klog.InfoS("inotify: file created, restarting", "kubeletEndpoint", kubeletEndpoint)
var lastErr error
err := wait.PollUntilContextTimeout(context.Background(), 10*time.Second, 2*time.Minute, false, func(context.Context) (done bool, err error) {
restartErr := m.Restart()
if restartErr == nil {
return true, nil
}
klog.ErrorS(restartErr, "Retrying after error")
lastErr = restartErr
return false, nil
})
if err != nil {
klog.ErrorS(err, "Unable to restart server: wait timed out", "lastErr", lastErr.Error())
panic(err)
}
if ok := m.registerControlFunc(); ok {
if err := m.Register(kubeletEndpoint, resourceName, pluginSockDir); err != nil {
klog.ErrorS(err, "Unable to register to kubelet")
panic(err)
}
}
}
// Watch for any other fs errors and log them.
case err := <-m.kubeletRestartWatcher.Errors:
klog.ErrorS(err, "inotify error")
}
}
}
// GetInfo is the RPC which return pluginInfo
func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) {
klog.InfoS("GetInfo")
@ -182,6 +271,8 @@ func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi.
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error {
klog.InfoS("Register", "kubeletEndpoint", kubeletEndpoint, "resourceName", resourceName, "socket", pluginSockDir)
if pluginSockDir != "" {
if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil {
klog.InfoS("Deprecation file found. Skip registration")
@ -214,6 +305,13 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir stri
}
_, err = client.Register(context.Background(), reqt)
if err != nil {
// Stop server
m.server.Stop()
klog.ErrorS(err, "Client unable to register to kubelet")
return err
}
klog.InfoS("Device Plugin registered with the Kubelet")
return err
}

View File

@ -1 +1 @@
1.5
1.6

View File

@ -19,7 +19,9 @@ package main
import (
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"github.com/fsnotify/fsnotify"
@ -73,7 +75,16 @@ func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Devic
return &responses, nil
}
// stubAllocFunc creates and returns allocation response for the input allocate request
func stubRegisterControlFunc() bool {
return false
}
func main() {
// respond to syscalls for termination
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
devs := []*pluginapi.Device{
{ID: "Dev-1", Health: pluginapi.Healthy},
{ID: "Dev-2", Health: pluginapi.Healthy},
@ -82,8 +93,7 @@ func main() {
pluginSocksDir := os.Getenv("PLUGIN_SOCK_DIR")
klog.Infof("pluginSocksDir: %s", pluginSocksDir)
if pluginSocksDir == "" {
klog.Errorf("Empty pluginSocksDir")
return
pluginSocksDir = pluginapi.DevicePluginPath
}
socketPath := pluginSocksDir + "/dp." + fmt.Sprintf("%d", time.Now().Unix())
@ -94,70 +104,99 @@ func main() {
}
dp1.SetAllocFunc(stubAllocFunc)
var registerControlFile string
autoregister := true
if registerControlFile := os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" {
if err := handleRegistrationProcess(registerControlFile); err != nil {
if registerControlFile = os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" {
autoregister = false
dp1.SetRegisterControlFunc(stubRegisterControlFunc)
}
if !autoregister {
go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath)
triggerPath := filepath.Dir(registerControlFile)
klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile)
watcher, err := fsnotify.NewWatcher()
if err != nil {
klog.Errorf("Watcher creation failed: %v ", err)
panic(err)
}
}
defer watcher.Close()
if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil {
panic(err)
}
select {}
}
updateCh := make(chan bool)
defer close(updateCh)
func handleRegistrationProcess(registerControlFile string) error {
triggerPath := filepath.Dir(registerControlFile)
go handleRegistrationProcess(registerControlFile, dp1, watcher, updateCh)
klog.InfoS("Registration process will be managed explicitly", "triggerPath", triggerPath, "triggerEntry", registerControlFile)
watcher, err := fsnotify.NewWatcher()
if err != nil {
klog.Errorf("Watcher creation failed: %v ", err)
return err
}
defer watcher.Close()
updateCh := make(chan bool)
defer close(updateCh)
go func() {
klog.Infof("Starting watching routine")
err = watcher.Add(triggerPath)
if err != nil {
klog.Errorf("Failed to add watch to %q: %w", triggerPath, err)
panic(err)
}
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
klog.InfoS("Received event", "name", event.Name, "operation", event.Op)
switch {
case event.Op&fsnotify.Remove == fsnotify.Remove:
if event.Name == registerControlFile {
klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op)
updateCh <- true
return
case received := <-updateCh:
if received {
if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil {
panic(err)
}
klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op)
klog.InfoS("Control file was deleted, registration succeeded")
}
case err, ok := <-watcher.Errors:
if !ok {
return
// Catch termination signals
case sig := <-sigCh:
klog.InfoS("Shutting down, received signal", "signal", sig)
if err := dp1.Stop(); err != nil {
panic(err)
}
klog.Errorf("error: %v", err)
panic(err)
return
}
time.Sleep(5 * time.Second)
}
} else {
if err := dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil {
panic(err)
}
}()
err = watcher.Add(triggerPath)
if err != nil {
klog.ErrorS(err, "Failed to add watch", "triggerPath", triggerPath)
return err
go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath)
// Catch termination signals
sig := <-sigCh
klog.InfoS("Shutting down, received signal", "signal", sig)
if err := dp1.Stop(); err != nil {
panic(err)
}
return
}
}
func handleRegistrationProcess(registerControlFile string, dpStub *plugin.Stub, watcher *fsnotify.Watcher, updateCh chan<- bool) {
klog.InfoS("Starting watching routine")
for {
klog.InfoS("handleRegistrationProcess for loop")
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
klog.InfoS("Received event", "name", event.Name, "operation", event.Op)
if event.Op&fsnotify.Remove == fsnotify.Remove {
if event.Name == registerControlFile {
klog.InfoS("Expected delete", "name", event.Name, "operation", event.Op)
updateCh <- true
continue
}
klog.InfoS("Spurious delete", "name", event.Name, "operation", event.Op)
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
klog.ErrorS(err, "error")
panic(err)
default:
time.Sleep(5 * time.Second)
}
}
klog.InfoS("Waiting for control file to be deleted", "path", registerControlFile)
<-updateCh
klog.InfoS("Control file was deleted, connecting!")
return nil
}