node: sample-dp: stubRegisterControlFunc for controlling registration

If the user specifies the intent to control registration process, we rely on
registration triggers (deletion of control file) to prompt registration.

This behvaiour is expected to be consistent across kubelet restarts and therefore
across the watch calls where we watch for changes to the unix socket so we make
this part of Stub object instead of a parameter.

Co-authored-by: Francesco Romani <fromani@redhat.com>
Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
This commit is contained in:
Swati Sehgal 2023-06-20 10:45:54 +01:00
parent c4c9d61d66
commit 211d8cc80a
2 changed files with 27 additions and 5 deletions

View File

@ -53,6 +53,9 @@ type Stub struct {
// getPreferredAllocFunc is used for handling getPreferredAllocation request
getPreferredAllocFunc stubGetPreferredAllocFunc
// registerControlFunc is used for controlling auto-registration of requests
registerControlFunc stubRegisterControlFunc
registrationStatus chan watcherapi.RegistrationStatus // for testing
endpoint string // for testing
@ -77,6 +80,13 @@ func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.De
return &response, nil
}
// stubRegisterControlFunc is the function called when a registration request is received from Kubelet
type stubRegisterControlFunc func() bool
func defaultRegisterControlFunc() bool {
return true
}
// NewDevicePluginStub returns an initialized DevicePlugin Stub.
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool, getPreferredAllocationFlag bool) *Stub {
return &Stub{
@ -85,6 +95,7 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, p
resourceName: name,
preStartContainerFlag: preStartContainerFlag,
getPreferredAllocationFlag: getPreferredAllocationFlag,
registerControlFunc: defaultRegisterControlFunc,
stop: make(chan interface{}),
update: make(chan []*pluginapi.Device),
@ -105,6 +116,11 @@ func (m *Stub) SetAllocFunc(f stubAllocFunc) {
m.allocFunc = f
}
// SetRegisterControlFunc sets RegisterControlFunc of the device plugin
func (m *Stub) SetRegisterControlFunc(f stubRegisterControlFunc) {
m.registerControlFunc = f
}
// Start starts the gRPC server of the device plugin. Can only
// be called once.
func (m *Stub) Start() error {
@ -178,7 +194,7 @@ func (m *Stub) Stop() error {
return m.cleanup()
}
func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string, autoregister bool) {
func (m *Stub) Watch(kubeletEndpoint, resourceName, pluginSockDir string) {
for {
select {
case stop := <-m.stopWatcher:
@ -196,7 +212,7 @@ func (m *Stub) Watch(kubeletEndpoint, resourceName string, pluginSockDir string,
panic(err)
}
if autoregister {
if ok := m.registerControlFunc(); ok {
if err := m.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath); err != nil {
klog.ErrorS(err, "Unable to register to kubelet")
panic(err)

View File

@ -75,6 +75,11 @@ func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Devic
return &responses, nil
}
// stubAllocFunc creates and returns allocation response for the input allocate request
func stubRegisterControlFunc() bool {
return false
}
func main() {
// respond to syscalls for termination
sigCh := make(chan os.Signal, 1)
@ -105,9 +110,11 @@ func main() {
if registerControlFile = os.Getenv("REGISTER_CONTROL_FILE"); registerControlFile != "" {
autoregister = false
dp1.SetRegisterControlFunc(stubRegisterControlFunc)
}
if !autoregister {
go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister)
go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath)
triggerPath := filepath.Dir(registerControlFile)
@ -154,8 +161,7 @@ func main() {
panic(err)
}
go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath, autoregister)
go dp1.Watch(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath)
// Catch termination signals
sig := <-sigCh
klog.InfoS("Shutting down, received signal", "signal", sig)