diff --git a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go index 904e8015a46..46c9ada1fc5 100644 --- a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go @@ -75,6 +75,18 @@ type actualStateOfWorld struct { var _ ActualStateOfWorld = &actualStateOfWorld{} +// NamedPluginHandler holds information for handler and the name of the plugin +type NamedPluginHandler struct { + Handler PluginHandler + Name string +} + +// SocketPluginHandlers contains the map from socket path to NamedPluginHandler +type SocketPluginHandlers struct { + Handlers map[string]NamedPluginHandler + sync.Mutex +} + // PluginInfo holds information of a plugin type PluginInfo struct { SocketPath string diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go index 12ae38ee404..603bb2e4abf 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go @@ -45,11 +45,11 @@ import ( type OperationExecutor interface { // RegisterPlugin registers the given plugin using the a handler in the plugin handler map. // It then updates the actual state of the world to reflect that. - RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error + RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error // UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map. // It then updates the actual state of the world to reflect that. - UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error + UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error } // NewOperationExecutor returns a new instance of OperationExecutor. @@ -96,9 +96,10 @@ func (oe *operationExecutor) RegisterPlugin( socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, + pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error { generatedOperation := - oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld) + oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, pathToHandlers, actualStateOfWorld) return oe.pendingOperations.Run( socketPath, generatedOperation) @@ -107,9 +108,10 @@ func (oe *operationExecutor) RegisterPlugin( func (oe *operationExecutor) UnregisterPlugin( socketPath string, pluginHandlers map[string]cache.PluginHandler, + pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error { generatedOperation := - oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, actualStateOfWorld) + oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, pathToHandlers, actualStateOfWorld) return oe.pendingOperations.Run( socketPath, generatedOperation) diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go index a4b2424dae9..0af0df4fcbd 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go @@ -44,9 +44,10 @@ func init() { func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) { ch, quit, oe := setup() + hdlr := cache.SocketPluginHandlers{} for i := 0; i < numPluginsToRegister; i++ { socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i) - oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */) + oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) } if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) { t.Fatalf("Unable to start register operations in Concurrent for plugins") @@ -56,8 +57,9 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) { ch, quit, oe := setup() socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir) + hdlr := cache.SocketPluginHandlers{} for i := 0; i < numPluginsToRegister; i++ { - oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */) + oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) } if !isOperationRunSerially(ch, quit) { @@ -67,9 +69,10 @@ func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) { func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) { ch, quit, oe := setup() + hdlr := cache.SocketPluginHandlers{} for i := 0; i < numPluginsToUnregister; i++ { socketPath := "socket-path" + strconv.Itoa(i) - oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, nil /* actual state of the world updator */) + oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) } if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) { @@ -80,8 +83,9 @@ func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testin func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) { ch, quit, oe := setup() socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir) + hdlr := cache.SocketPluginHandlers{} for i := 0; i < numPluginsToUnregister; i++ { - oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, nil /* actual state of the world updator */) + oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */) } if !isOperationRunSerially(ch, quit) { @@ -105,6 +109,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc( socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, + pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { opFunc := func() error { @@ -117,6 +122,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc( func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc( socketPath string, pluginHandlers map[string]cache.PluginHandler, + pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { opFunc := func() error { startOperationAndBlock(fopg.ch, fopg.quit) diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go index a16546601df..407da39e706 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go @@ -63,12 +63,14 @@ type OperationGenerator interface { socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, + pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error // Generates the UnregisterPlugin function needed to perform the unregistration of a plugin GenerateUnregisterPluginFunc( socketPath string, pluginHandlers map[string]cache.PluginHandler, + pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error } @@ -76,6 +78,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, + pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { registerPluginFunc := func() error { @@ -122,6 +125,12 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil { return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err)) } + pathToHandlers.Lock() + if pathToHandlers.Handlers == nil { + pathToHandlers.Handlers = make(map[string]cache.NamedPluginHandler) + } + pathToHandlers.Handlers[socketPath] = cache.NamedPluginHandler{Handler: handler, Name: infoResp.Name} + pathToHandlers.Unlock() // Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate if err := og.notifyPlugin(client, true, ""); err != nil { @@ -135,33 +144,35 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( func (og *operationGenerator) GenerateUnregisterPluginFunc( socketPath string, pluginHandlers map[string]cache.PluginHandler, + pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error { unregisterPluginFunc := func() error { - client, conn, err := dial(socketPath, dialTimeoutDuration) + _, conn, err := dial(socketPath, dialTimeoutDuration) if err != nil { - return fmt.Errorf("UnregisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err) - } - defer conn.Close() - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - infoResp, err := client.GetInfo(ctx, ®isterapi.InfoRequest{}) - if err != nil { - return fmt.Errorf("UnregisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err) + klog.V(4).Infof("unable to dial: %v", err) + } else { + conn.Close() } - handler, ok := pluginHandlers[infoResp.Type] - if !ok { - return fmt.Errorf("UnregisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath) - } + var handlerWithName cache.NamedPluginHandler + pathToHandlers.Lock() + handlerWithName, handlerFound := pathToHandlers.Handlers[socketPath] + pathToHandlers.Unlock() + if !handlerFound { + return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", socketPath) + } // We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle // so that if we receive a register event during Register Plugin, we can process it as a Register call. actualStateOfWorldUpdater.RemovePlugin(socketPath) - handler.DeRegisterPlugin(infoResp.Name) + handlerWithName.Handler.DeRegisterPlugin(handlerWithName.Name) + + pathToHandlers.Lock() + delete(pathToHandlers.Handlers, socketPath) + pathToHandlers.Unlock() + klog.V(4).Infof("DeRegisterPlugin called for %s on %v", handlerWithName.Name, handlerWithName.Handler) return nil } return unregisterPluginFunc diff --git a/pkg/kubelet/pluginmanager/reconciler/reconciler.go b/pkg/kubelet/pluginmanager/reconciler/reconciler.go index 7f6790d5c16..d23f4b0c3b6 100644 --- a/pkg/kubelet/pluginmanager/reconciler/reconciler.go +++ b/pkg/kubelet/pluginmanager/reconciler/reconciler.go @@ -67,6 +67,7 @@ func NewReconciler( desiredStateOfWorld: desiredStateOfWorld, actualStateOfWorld: actualStateOfWorld, handlers: make(map[string]cache.PluginHandler), + pathToHandlers: cache.SocketPluginHandlers{Handlers: make(map[string]cache.NamedPluginHandler)}, } } @@ -76,6 +77,7 @@ type reconciler struct { desiredStateOfWorld cache.DesiredStateOfWorld actualStateOfWorld cache.ActualStateOfWorld handlers map[string]cache.PluginHandler + pathToHandlers cache.SocketPluginHandlers sync.RWMutex } @@ -103,6 +105,13 @@ func (rc *reconciler) getHandlers() map[string]cache.PluginHandler { return rc.handlers } +func (rc *reconciler) getPathToHandlers() *cache.SocketPluginHandlers { + rc.RLock() + defer rc.RUnlock() + + return &rc.pathToHandlers +} + func (rc *reconciler) reconcile() { // Unregisterations are triggered before registrations @@ -127,7 +136,7 @@ func (rc *reconciler) reconcile() { if unregisterPlugin { klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", "")) - err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.actualStateOfWorld) + err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld) if err != nil && !goroutinemap.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { @@ -145,7 +154,7 @@ func (rc *reconciler) reconcile() { for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() { if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) { klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", "")) - err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld) + err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld) if err != nil && !goroutinemap.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { diff --git a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go index 5749e56110c..7ccc133189e 100644 --- a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go @@ -252,6 +252,7 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) { } dsw.RemovePlugin(socketPath) + os.Remove(socketPath) waitForUnregistration(t, socketPath, asw) // Get asw plugins; it should no longer contain the added plugin