diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go index d1bca0cccae..204afd3d1b1 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go @@ -43,8 +43,8 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s return s.connectClient(pluginName, endpoint) } -func (s *server) DeRegisterPlugin(pluginName string) { - klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName) +func (s *server) DeRegisterPlugin(pluginName, endpoint string) { + klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint) client := s.getClient(pluginName) if client != nil { s.disconnectClient(pluginName, client) @@ -88,7 +88,6 @@ func (s *server) disconnectClient(name string, c Client) error { s.deregisterClient(name) return c.Disconnect() } - func (s *server) registerClient(name string, c Client) { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index 908cd735847..f65ac87575a 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -584,7 +584,7 @@ func TestPrepareResources(t *testing.T) { if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } - defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests + defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests if test.claimInfo != nil { manager.cache.add(test.claimInfo) @@ -721,7 +721,7 @@ func TestUnprepareResources(t *testing.T) { if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } - defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests + defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests manager := &ManagerImpl{ kubeClient: fakeKubeClient, @@ -891,7 +891,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) { if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil { t.Fatalf("failed to register plugin %s, err: %v", driverName, err) } - defer plg.DeRegisterPlugin(driverName) + defer plg.DeRegisterPlugin(driverName, draServerInfo.socketName) // Create ClaimInfo cache cache, err := newClaimInfoCache(t.TempDir(), draManagerStateFileName) diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index 396c4439388..95f10083702 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -136,7 +136,7 @@ func TestGRPCConnIsReused(t *testing.T) { // ensure the plugin we are using is registered draPlugins.add(p) - defer draPlugins.delete(pluginName) + defer draPlugins.remove(pluginName, addr) // we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused for i := 0; i < 2; i++ { @@ -210,7 +210,7 @@ func TestNewDRAPluginClient(t *testing.T) { setup: func(name string) tearDown { draPlugins.add(&Plugin{name: name}) return func() { - draPlugins.delete(name) + draPlugins.remove(name, "") } }, pluginName: "dummy-plugin", @@ -298,7 +298,7 @@ func TestGRPCMethods(t *testing.T) { } draPlugins.add(p) - defer draPlugins.delete(pluginName) + defer draPlugins.remove(pluginName, addr) client, err := NewDRAPluginClient(pluginName) if err != nil { diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store.go b/pkg/kubelet/cm/dra/plugin/plugins_store.go index d2f36c17849..cc2fc240b5b 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store.go @@ -18,13 +18,16 @@ package plugin import ( "errors" + "fmt" + "slices" "sync" ) // PluginsStore holds a list of DRA Plugins. type pluginsStore struct { sync.RWMutex - store map[string]*Plugin + // plugin name -> Plugin in the order in which they got added + store map[string][]*Plugin } // draPlugins map keeps track of all registered DRA plugins on the node @@ -37,43 +40,57 @@ func (s *pluginsStore) get(pluginName string) *Plugin { s.RLock() defer s.RUnlock() - return s.store[pluginName] + instances := s.store[pluginName] + if len(instances) == 0 { + return nil + } + // Heuristic: pick the most recent one. It's most likely + // the newest, except when kubelet got restarted and registered + // all running plugins in random order. + return instances[len(instances)-1] } // Set lets you save a DRA Plugin to the list and give it a specific name. // This method is protected by a mutex. -func (s *pluginsStore) add(p *Plugin) (replacedPlugin *Plugin, replaced bool) { +func (s *pluginsStore) add(p *Plugin) error { s.Lock() defer s.Unlock() if s.store == nil { - s.store = make(map[string]*Plugin) + s.store = make(map[string][]*Plugin) } - - replacedPlugin, exists := s.store[p.name] - s.store[p.name] = p - - if replacedPlugin != nil && replacedPlugin.cancel != nil { - replacedPlugin.cancel(errors.New("plugin got replaced")) + for _, oldP := range s.store[p.name] { + if oldP.endpoint == p.endpoint { + // One plugin instance cannot hijack the endpoint of another instance. + return fmt.Errorf("endpoint %s already registered for plugin %s", p.endpoint, p.name) + } } - - return replacedPlugin, exists + s.store[p.name] = append(s.store[p.name], p) + return nil } -// Delete lets you delete a DRA Plugin by name. -// This method is protected by a mutex. -func (s *pluginsStore) delete(pluginName string) *Plugin { +// remove lets you remove one endpoint for a DRA Plugin. +// This method is protected by a mutex. It returns the +// plugin if found and true if that was the last instance +func (s *pluginsStore) remove(pluginName, endpoint string) (*Plugin, bool) { s.Lock() defer s.Unlock() - p, exists := s.store[pluginName] - if !exists { - return nil + instances := s.store[pluginName] + i := slices.IndexFunc(instances, func(p *Plugin) bool { return p.endpoint == endpoint }) + if i == -1 { + return nil, false } + p := instances[i] + last := len(instances) == 1 + if last { + delete(s.store, pluginName) + } else { + s.store[pluginName] = slices.Delete(instances, i, i+1) + } + if p.cancel != nil { p.cancel(errors.New("plugin got removed")) } - delete(s.store, pluginName) - - return p + return p, last } diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store_test.go b/pkg/kubelet/cm/dra/plugin/plugins_store_test.go index b4d95abf30e..2550aaa3dd1 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAddSameName(t *testing.T) { @@ -30,26 +31,35 @@ func TestAddSameName(t *testing.T) { firstWasCancelled := false p := &Plugin{ - name: pluginName, - cancel: func(err error) { firstWasCancelled = true }, + name: pluginName, + endpoint: "old", + cancel: func(err error) { firstWasCancelled = true }, } // ensure the plugin we are using is registered - draPlugins.add(p) - defer draPlugins.delete(p.name) + require.NoError(t, draPlugins.add(p)) + defer draPlugins.remove(p.name, p.endpoint) assert.False(t, firstWasCancelled, "should not cancel context after the first call") + // Same name, same endpoint -> error. + require.Error(t, draPlugins.add(p)) + secondWasCancelled := false p2 := &Plugin{ - name: pluginName, - cancel: func(err error) { secondWasCancelled = true }, + name: pluginName, + endpoint: "new", + cancel: func(err error) { secondWasCancelled = true }, } + require.NoError(t, draPlugins.add(p2)) + defer draPlugins.remove(p2.name, p2.endpoint) - draPlugins.add(p2) - defer draPlugins.delete(p2.name) + assert.False(t, firstWasCancelled, "should not cancel context after registering the second instance") + assert.False(t, secondWasCancelled, "should not cancel context of a new plugin") - assert.True(t, firstWasCancelled, "should cancel context after the second call") + // Remove old plugin. + draPlugins.remove(p.name, p.endpoint) + assert.True(t, firstWasCancelled, "should have canceled context after the explicit removal") assert.False(t, secondWasCancelled, "should not cancel context of a new plugin") } @@ -65,7 +75,7 @@ func TestDelete(t *testing.T) { // ensure the plugin we are using is registered draPlugins.add(p) - draPlugins.delete(p.name) + draPlugins.remove(p.name, "") assert.True(t, wasCancelled, "should cancel context after the second call") } diff --git a/pkg/kubelet/cm/dra/plugin/registration.go b/pkg/kubelet/cm/dra/plugin/registration.go index 2af7db5bc3b..1d95e582e0a 100644 --- a/pkg/kubelet/cm/dra/plugin/registration.go +++ b/pkg/kubelet/cm/dra/plugin/registration.go @@ -178,10 +178,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // into all log output related to the plugin. ctx := h.backgroundCtx logger := klog.FromContext(ctx) - logger = klog.LoggerWithValues(logger, "pluginName", pluginName) + logger = klog.LoggerWithValues(logger, "pluginName", pluginName, "endpoint", endpoint) ctx = klog.NewContext(ctx, logger) - logger.V(3).Info("Register new DRA plugin", "endpoint", endpoint) + logger.V(3).Info("Register new DRA plugin") chosenService, err := h.validateSupportedServices(pluginName, supportedServices) if err != nil { @@ -209,9 +209,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key // all other DRA components will be able to get the actual socket of DRA plugins by its name. - - if oldPlugin, replaced := draPlugins.add(pluginInstance); replaced { - logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint) + if err := draPlugins.add(pluginInstance); err != nil { + cancel(err) + // No wrapping, the error already contains details. + return err } // Now cancel any pending ResourceSlice wiping for this plugin. @@ -259,10 +260,14 @@ func (h *RegistrationHandler) validateSupportedServices(pluginName string, suppo // DeRegisterPlugin is called when a plugin has removed its socket, // signaling it is no longer available. -func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { - if p := draPlugins.delete(pluginName); p != nil { +func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) { + if p, last := draPlugins.remove(pluginName, endpoint); p != nil { + // This logger includes endpoint and pluginName. logger := klog.FromContext(p.backgroundCtx) - logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint) + logger.V(3).Info("Deregister DRA plugin", "lastInstance", last) + if !last { + return + } // Prepare for canceling the background wiping. This needs to run // in the context of the registration handler, the one from diff --git a/pkg/kubelet/cm/dra/plugin/registration_test.go b/pkg/kubelet/cm/dra/plugin/registration_test.go index 4920fdf34ec..59f3099ab65 100644 --- a/pkg/kubelet/cm/dra/plugin/registration_test.go +++ b/pkg/kubelet/cm/dra/plugin/registration_test.go @@ -176,6 +176,10 @@ func TestRegistrationHandler(t *testing.T) { // Simulate one existing plugin A. err := handler.RegisterPlugin(pluginA, endpointA, []string{drapb.DRAPluginService}, nil) require.NoError(t, err) + t.Cleanup(func() { + tCtx.Logf("Removing plugin %s", pluginA) + handler.DeRegisterPlugin(pluginA, endpointA) + }) err = handler.ValidatePlugin(test.pluginName, test.endpoint, test.supportedServices) if test.shouldError { @@ -206,9 +210,10 @@ func TestRegistrationHandler(t *testing.T) { assert.NoError(t, err, "recreate slice") } - handler.DeRegisterPlugin(test.pluginName) + tCtx.Logf("Removing plugin %s", test.pluginName) + handler.DeRegisterPlugin(test.pluginName, test.endpoint) // Nop. - handler.DeRegisterPlugin(test.pluginName) + handler.DeRegisterPlugin(test.pluginName, test.endpoint) requireNoSlices() }) diff --git a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go index 50b5658af04..cdaa42c0a3a 100644 --- a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go @@ -89,6 +89,7 @@ type PluginInfo struct { UUID types.UID Handler PluginHandler Name string + Endpoint string } func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error { diff --git a/pkg/kubelet/pluginmanager/cache/types.go b/pkg/kubelet/pluginmanager/cache/types.go index 0656bc90646..bcffd117ef2 100644 --- a/pkg/kubelet/pluginmanager/cache/types.go +++ b/pkg/kubelet/pluginmanager/cache/types.go @@ -56,5 +56,5 @@ type PluginHandler interface { RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error // DeRegisterPlugin is called once the pluginwatcher observes that the socket has // been deleted. - DeRegisterPlugin(pluginName string) + DeRegisterPlugin(pluginName, endpoint string) } diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go index 310961c2e64..7b6fbba5b50 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go @@ -118,6 +118,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( UUID: pluginUUID, Handler: handler, Name: infoResp.Name, + Endpoint: infoResp.Endpoint, }) if err != nil { klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath) @@ -147,7 +148,7 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc( // so that if we receive a register event during Register Plugin, we can process it as a Register call. actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath) - pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name) + pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name, pluginInfo.Endpoint) klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler) return nil diff --git a/pkg/kubelet/pluginmanager/plugin_manager_test.go b/pkg/kubelet/pluginmanager/plugin_manager_test.go index 407f821977e..99aa22ad7ae 100644 --- a/pkg/kubelet/pluginmanager/plugin_manager_test.go +++ b/pkg/kubelet/pluginmanager/plugin_manager_test.go @@ -54,7 +54,7 @@ func newFakePluginHandler() *fakePluginHandler { func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { f.Lock() defer f.Unlock() - f.events = append(f.events, "validate "+pluginName) + f.events = append(f.events, "validate "+pluginName+" "+endpoint) return nil } @@ -62,15 +62,15 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error { f.Lock() defer f.Unlock() - f.events = append(f.events, "register "+pluginName) + f.events = append(f.events, "register "+pluginName+" "+endpoint) return nil } // DeRegisterPlugin is a fake method -func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) { +func (f *fakePluginHandler) DeRegisterPlugin(pluginName, endpoint string) { f.Lock() defer f.Unlock() - f.events = append(f.events, "deregister "+pluginName) + f.events = append(f.events, "deregister "+pluginName+" "+endpoint) } func (f *fakePluginHandler) Reset() { @@ -93,8 +93,24 @@ func cleanup(t *testing.T) { os.MkdirAll(socketDir, 0755) } -func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) { - expected := []string{"validate " + pluginName, "register " + pluginName} +func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) { + t.Helper() + waitFor(t, fakePluginHandler, + []string{"validate " + pluginName + " " + endpoint, "register " + pluginName + " " + endpoint}, + "Timed out waiting for plugin to be added to actual state of world cache.", + ) +} + +func waitForDeRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) { + t.Helper() + waitFor(t, fakePluginHandler, + []string{"deregister " + pluginName + " " + endpoint}, + "Timed out waiting for plugin to be removed from actual state of the world cache.", + ) +} + +func waitFor(t *testing.T, fakePluginHandler *fakePluginHandler, expected []string, what string) { + t.Helper() err := retryWithExponentialBackOff( 100*time.Millisecond, func() (bool, error) { @@ -108,7 +124,7 @@ func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, plu }, ) if err != nil { - t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.") + t.Fatal(what) } } @@ -122,7 +138,7 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio return wait.ExponentialBackoff(backoff, fn) } -func TestPluginRegistration(t *testing.T) { +func TestPluginManager(t *testing.T) { defer cleanup(t) pluginManager := newTestPluginManager(socketDir) @@ -157,7 +173,12 @@ func TestPluginRegistration(t *testing.T) { require.NoError(t, p.Serve("v1beta1", "v1beta2")) // Verify that the plugin is registered - waitForRegistration(t, fakeHandler, pluginName) + waitForRegistration(t, fakeHandler, pluginName, socketPath) + + // And unregister. + fakeHandler.Reset() + require.NoError(t, p.Stop()) + waitForDeRegistration(t, fakeHandler, pluginName, socketPath) } } diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/README.md b/pkg/kubelet/pluginmanager/pluginwatcher/README.md index 5de09c14f9c..87280fa93c7 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/README.md +++ b/pkg/kubelet/pluginmanager/pluginwatcher/README.md @@ -28,8 +28,7 @@ To avoid downtime of a plugin on a node, it would be nice to support running an old plugin in parallel to the new plugin. When deploying with a DaemonSet, setting `maxSurge` to a value larger than zero enables such a seamless upgrade. -**Warning**: Such a seamless upgrade **is not** supported at the moment. This -section merely describes what would have to be changed to make it work. +**Warning**: Such a seamless upgrade is only supported for DRA at the moment. ### In a plugin @@ -65,7 +64,7 @@ isn't perfect because after a kubelet restart, plugin instances get registered in a random order. Restarting the kubelet in the middle of an upgrade should be rare. -At the moment, none of the existing handlers support such seamless upgrades: +At the moment, the following handlers do not support such seamless upgrades: - The device plugin handler suffers from temporarily removing the extended resources during an upgrade. A proposed fix is pending in @@ -78,7 +77,9 @@ At the moment, none of the existing handlers support such seamless upgrades: from the csi-node-registrar as supported version, so this version selection mechanism isn't used at all. -- The DRA handler only remembers the most recently registered instance. +This supports it: + +- DRA ### Deployment diff --git a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go index 41d8b46f4cd..68e72ddcaae 100644 --- a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go @@ -132,7 +132,7 @@ func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions } // DeRegisterPlugin is a dummy implementation -func (d *DummyImpl) DeRegisterPlugin(pluginName string) { +func (d *DummyImpl) DeRegisterPlugin(pluginName, endpoint string) { } // Calls Run() diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 59fa6245b7f..ceefa3ff7c1 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -187,8 +187,8 @@ func (h *RegistrationHandler) validateVersions(callerName, pluginName string, en // DeRegisterPlugin is called when a plugin removed its socket, signaling // it is no longer available -func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { - klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName)) +func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) { + klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s, endpoint %s", pluginName, endpoint)) if err := unregisterDriver(pluginName); err != nil { klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err)) }