From b471c2c11f6cf40da78a3cfa3626449d247b05ad Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 28 Jan 2025 16:20:07 +0100 Subject: [PATCH] DRA kubelet: support rolling upgrades The key difference is that the kubelet must remember all plugin instances because it could always happen that the new instance dies and leaves only the old one running. The endpoints of each instance must be different. Registering a plugin with the same endpoint as some other instance is not supported and triggers an error, which should get reported as "not registered" to the plugin. This should only happen when the kubelet missed some unregistration event and re-registers the same instance again. The recovery in this case is for the plugin to shut down, remove its socket, which should get observed by kubelet, and then try again after a restart. --- .../devicemanager/plugin/v1beta1/handler.go | 5 +- pkg/kubelet/cm/dra/manager_test.go | 6 +- pkg/kubelet/cm/dra/plugin/plugin_test.go | 6 +- pkg/kubelet/cm/dra/plugin/plugins_store.go | 59 ++++++++++++------- .../cm/dra/plugin/plugins_store_test.go | 30 ++++++---- pkg/kubelet/cm/dra/plugin/registration.go | 21 ++++--- .../cm/dra/plugin/registration_test.go | 9 ++- .../cache/actual_state_of_world.go | 1 + pkg/kubelet/pluginmanager/cache/types.go | 2 +- .../operationexecutor/operation_generator.go | 3 +- .../pluginmanager/plugin_manager_test.go | 39 +++++++++--- .../pluginmanager/pluginwatcher/README.md | 9 +-- .../reconciler/reconciler_test.go | 2 +- pkg/volume/csi/csi_plugin.go | 4 +- 14 files changed, 128 insertions(+), 68 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go index d1bca0cccae..204afd3d1b1 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go @@ -43,8 +43,8 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s return 
s.connectClient(pluginName, endpoint) } -func (s *server) DeRegisterPlugin(pluginName string) { - klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName) +func (s *server) DeRegisterPlugin(pluginName, endpoint string) { + klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint) client := s.getClient(pluginName) if client != nil { s.disconnectClient(pluginName, client) @@ -88,7 +88,6 @@ func (s *server) disconnectClient(name string, c Client) error { s.deregisterClient(name) return c.Disconnect() } - func (s *server) registerClient(name string, c Client) { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index 908cd735847..f65ac87575a 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -584,7 +584,7 @@ func TestPrepareResources(t *testing.T) { if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } - defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests + defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests if test.claimInfo != nil { manager.cache.add(test.claimInfo) @@ -721,7 +721,7 @@ func TestUnprepareResources(t *testing.T) { if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } - defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests + defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests manager := &ManagerImpl{ kubeClient: fakeKubeClient, @@ -891,7 +891,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) { if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, 
[]string{drapb.DRAPluginService}, nil); err != nil { t.Fatalf("failed to register plugin %s, err: %v", driverName, err) } - defer plg.DeRegisterPlugin(driverName) + defer plg.DeRegisterPlugin(driverName, draServerInfo.socketName) // Create ClaimInfo cache cache, err := newClaimInfoCache(t.TempDir(), draManagerStateFileName) diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index 396c4439388..95f10083702 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -136,7 +136,7 @@ func TestGRPCConnIsReused(t *testing.T) { // ensure the plugin we are using is registered draPlugins.add(p) - defer draPlugins.delete(pluginName) + defer draPlugins.remove(pluginName, addr) // we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused for i := 0; i < 2; i++ { @@ -210,7 +210,7 @@ func TestNewDRAPluginClient(t *testing.T) { setup: func(name string) tearDown { draPlugins.add(&Plugin{name: name}) return func() { - draPlugins.delete(name) + draPlugins.remove(name, "") } }, pluginName: "dummy-plugin", @@ -298,7 +298,7 @@ func TestGRPCMethods(t *testing.T) { } draPlugins.add(p) - defer draPlugins.delete(pluginName) + defer draPlugins.remove(pluginName, addr) client, err := NewDRAPluginClient(pluginName) if err != nil { diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store.go b/pkg/kubelet/cm/dra/plugin/plugins_store.go index d2f36c17849..cc2fc240b5b 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store.go @@ -18,13 +18,16 @@ package plugin import ( "errors" + "fmt" + "slices" "sync" ) // PluginsStore holds a list of DRA Plugins. 
type pluginsStore struct { sync.RWMutex - store map[string]*Plugin + // plugin name -> Plugin in the order in which they got added + store map[string][]*Plugin } // draPlugins map keeps track of all registered DRA plugins on the node @@ -37,43 +40,57 @@ func (s *pluginsStore) get(pluginName string) *Plugin { s.RLock() defer s.RUnlock() - return s.store[pluginName] + instances := s.store[pluginName] + if len(instances) == 0 { + return nil + } + // Heuristic: pick the most recent one. It's most likely + // the newest, except when kubelet got restarted and registered + // all running plugins in random order. + return instances[len(instances)-1] } // Set lets you save a DRA Plugin to the list and give it a specific name. // This method is protected by a mutex. -func (s *pluginsStore) add(p *Plugin) (replacedPlugin *Plugin, replaced bool) { +func (s *pluginsStore) add(p *Plugin) error { s.Lock() defer s.Unlock() if s.store == nil { - s.store = make(map[string]*Plugin) + s.store = make(map[string][]*Plugin) } - - replacedPlugin, exists := s.store[p.name] - s.store[p.name] = p - - if replacedPlugin != nil && replacedPlugin.cancel != nil { - replacedPlugin.cancel(errors.New("plugin got replaced")) + for _, oldP := range s.store[p.name] { + if oldP.endpoint == p.endpoint { + // One plugin instance cannot hijack the endpoint of another instance. + return fmt.Errorf("endpoint %s already registered for plugin %s", p.endpoint, p.name) + } } - - return replacedPlugin, exists + s.store[p.name] = append(s.store[p.name], p) + return nil } -// Delete lets you delete a DRA Plugin by name. -// This method is protected by a mutex. -func (s *pluginsStore) delete(pluginName string) *Plugin { +// remove lets you remove one endpoint for a DRA Plugin. +// This method is protected by a mutex. 
It returns the +// plugin if found and true if that was the last instance +func (s *pluginsStore) remove(pluginName, endpoint string) (*Plugin, bool) { s.Lock() defer s.Unlock() - p, exists := s.store[pluginName] - if !exists { - return nil + instances := s.store[pluginName] + i := slices.IndexFunc(instances, func(p *Plugin) bool { return p.endpoint == endpoint }) + if i == -1 { + return nil, false } + p := instances[i] + last := len(instances) == 1 + if last { + delete(s.store, pluginName) + } else { + s.store[pluginName] = slices.Delete(instances, i, i+1) + } + if p.cancel != nil { p.cancel(errors.New("plugin got removed")) } - delete(s.store, pluginName) - - return p + return p, last } diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store_test.go b/pkg/kubelet/cm/dra/plugin/plugins_store_test.go index b4d95abf30e..2550aaa3dd1 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAddSameName(t *testing.T) { @@ -30,26 +31,35 @@ func TestAddSameName(t *testing.T) { firstWasCancelled := false p := &Plugin{ - name: pluginName, - cancel: func(err error) { firstWasCancelled = true }, + name: pluginName, + endpoint: "old", + cancel: func(err error) { firstWasCancelled = true }, } // ensure the plugin we are using is registered - draPlugins.add(p) - defer draPlugins.delete(p.name) + require.NoError(t, draPlugins.add(p)) + defer draPlugins.remove(p.name, p.endpoint) assert.False(t, firstWasCancelled, "should not cancel context after the first call") + // Same name, same endpoint -> error. 
+ require.Error(t, draPlugins.add(p)) + secondWasCancelled := false p2 := &Plugin{ - name: pluginName, - cancel: func(err error) { secondWasCancelled = true }, + name: pluginName, + endpoint: "new", + cancel: func(err error) { secondWasCancelled = true }, } + require.NoError(t, draPlugins.add(p2)) + defer draPlugins.remove(p2.name, p2.endpoint) - draPlugins.add(p2) - defer draPlugins.delete(p2.name) + assert.False(t, firstWasCancelled, "should not cancel context after registering the second instance") + assert.False(t, secondWasCancelled, "should not cancel context of a new plugin") - assert.True(t, firstWasCancelled, "should cancel context after the second call") + // Remove old plugin. + draPlugins.remove(p.name, p.endpoint) + assert.True(t, firstWasCancelled, "should have canceled context after the explicit removal") assert.False(t, secondWasCancelled, "should not cancel context of a new plugin") } @@ -65,7 +75,7 @@ func TestDelete(t *testing.T) { // ensure the plugin we are using is registered draPlugins.add(p) - draPlugins.delete(p.name) + draPlugins.remove(p.name, "") assert.True(t, wasCancelled, "should cancel context after the second call") } diff --git a/pkg/kubelet/cm/dra/plugin/registration.go b/pkg/kubelet/cm/dra/plugin/registration.go index 2af7db5bc3b..1d95e582e0a 100644 --- a/pkg/kubelet/cm/dra/plugin/registration.go +++ b/pkg/kubelet/cm/dra/plugin/registration.go @@ -178,10 +178,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // into all log output related to the plugin. 
ctx := h.backgroundCtx logger := klog.FromContext(ctx) - logger = klog.LoggerWithValues(logger, "pluginName", pluginName) + logger = klog.LoggerWithValues(logger, "pluginName", pluginName, "endpoint", endpoint) ctx = klog.NewContext(ctx, logger) - logger.V(3).Info("Register new DRA plugin", "endpoint", endpoint) + logger.V(3).Info("Register new DRA plugin") chosenService, err := h.validateSupportedServices(pluginName, supportedServices) if err != nil { @@ -209,9 +209,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key // all other DRA components will be able to get the actual socket of DRA plugins by its name. - - if oldPlugin, replaced := draPlugins.add(pluginInstance); replaced { - logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint) + if err := draPlugins.add(pluginInstance); err != nil { + cancel(err) + // No wrapping, the error already contains details. + return err } // Now cancel any pending ResourceSlice wiping for this plugin. @@ -259,10 +260,14 @@ func (h *RegistrationHandler) validateSupportedServices(pluginName string, suppo // DeRegisterPlugin is called when a plugin has removed its socket, // signaling it is no longer available. -func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { - if p := draPlugins.delete(pluginName); p != nil { +func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) { + if p, last := draPlugins.remove(pluginName, endpoint); p != nil { + // This logger includes endpoint and pluginName. logger := klog.FromContext(p.backgroundCtx) - logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint) + logger.V(3).Info("Deregister DRA plugin", "lastInstance", last) + if !last { + return + } // Prepare for canceling the background wiping. 
This needs to run // in the context of the registration handler, the one from diff --git a/pkg/kubelet/cm/dra/plugin/registration_test.go b/pkg/kubelet/cm/dra/plugin/registration_test.go index 4920fdf34ec..59f3099ab65 100644 --- a/pkg/kubelet/cm/dra/plugin/registration_test.go +++ b/pkg/kubelet/cm/dra/plugin/registration_test.go @@ -176,6 +176,10 @@ func TestRegistrationHandler(t *testing.T) { // Simulate one existing plugin A. err := handler.RegisterPlugin(pluginA, endpointA, []string{drapb.DRAPluginService}, nil) require.NoError(t, err) + t.Cleanup(func() { + tCtx.Logf("Removing plugin %s", pluginA) + handler.DeRegisterPlugin(pluginA, endpointA) + }) err = handler.ValidatePlugin(test.pluginName, test.endpoint, test.supportedServices) if test.shouldError { @@ -206,9 +210,10 @@ func TestRegistrationHandler(t *testing.T) { assert.NoError(t, err, "recreate slice") } - handler.DeRegisterPlugin(test.pluginName) + tCtx.Logf("Removing plugin %s", test.pluginName) + handler.DeRegisterPlugin(test.pluginName, test.endpoint) // Nop. 
- handler.DeRegisterPlugin(test.pluginName) + handler.DeRegisterPlugin(test.pluginName, test.endpoint) requireNoSlices() }) diff --git a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go index 50b5658af04..cdaa42c0a3a 100644 --- a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go @@ -89,6 +89,7 @@ type PluginInfo struct { UUID types.UID Handler PluginHandler Name string + Endpoint string } func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error { diff --git a/pkg/kubelet/pluginmanager/cache/types.go b/pkg/kubelet/pluginmanager/cache/types.go index 0656bc90646..bcffd117ef2 100644 --- a/pkg/kubelet/pluginmanager/cache/types.go +++ b/pkg/kubelet/pluginmanager/cache/types.go @@ -56,5 +56,5 @@ type PluginHandler interface { RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error // DeRegisterPlugin is called once the pluginwatcher observes that the socket has // been deleted. - DeRegisterPlugin(pluginName string) + DeRegisterPlugin(pluginName, endpoint string) } diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go index 310961c2e64..7b6fbba5b50 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go @@ -118,6 +118,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( UUID: pluginUUID, Handler: handler, Name: infoResp.Name, + Endpoint: infoResp.Endpoint, }) if err != nil { klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath) @@ -147,7 +148,7 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc( // so that if we receive a register event during Register Plugin, we can process it as a Register call. 
actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath) - pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name) + pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name, pluginInfo.Endpoint) klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler) return nil diff --git a/pkg/kubelet/pluginmanager/plugin_manager_test.go b/pkg/kubelet/pluginmanager/plugin_manager_test.go index 407f821977e..99aa22ad7ae 100644 --- a/pkg/kubelet/pluginmanager/plugin_manager_test.go +++ b/pkg/kubelet/pluginmanager/plugin_manager_test.go @@ -54,7 +54,7 @@ func newFakePluginHandler() *fakePluginHandler { func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { f.Lock() defer f.Unlock() - f.events = append(f.events, "validate "+pluginName) + f.events = append(f.events, "validate "+pluginName+" "+endpoint) return nil } @@ -62,15 +62,15 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error { f.Lock() defer f.Unlock() - f.events = append(f.events, "register "+pluginName) + f.events = append(f.events, "register "+pluginName+" "+endpoint) return nil } // DeRegisterPlugin is a fake method -func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) { +func (f *fakePluginHandler) DeRegisterPlugin(pluginName, endpoint string) { f.Lock() defer f.Unlock() - f.events = append(f.events, "deregister "+pluginName) + f.events = append(f.events, "deregister "+pluginName+" "+endpoint) } func (f *fakePluginHandler) Reset() { @@ -93,8 +93,24 @@ func cleanup(t *testing.T) { os.MkdirAll(socketDir, 0755) } -func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) { - expected := []string{"validate " + pluginName, "register " + pluginName} +func waitForRegistration(t *testing.T, fakePluginHandler 
*fakePluginHandler, pluginName, endpoint string) { + t.Helper() + waitFor(t, fakePluginHandler, + []string{"validate " + pluginName + " " + endpoint, "register " + pluginName + " " + endpoint}, + "Timed out waiting for plugin to be added to actual state of world cache.", + ) +} + +func waitForDeRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) { + t.Helper() + waitFor(t, fakePluginHandler, + []string{"deregister " + pluginName + " " + endpoint}, + "Timed out waiting for plugin to be removed from actual state of the world cache.", + ) +} + +func waitFor(t *testing.T, fakePluginHandler *fakePluginHandler, expected []string, what string) { + t.Helper() err := retryWithExponentialBackOff( 100*time.Millisecond, func() (bool, error) { @@ -108,7 +124,7 @@ func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, plu }, ) if err != nil { - t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.") + t.Fatal(what) } } @@ -122,7 +138,7 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio return wait.ExponentialBackoff(backoff, fn) } -func TestPluginRegistration(t *testing.T) { +func TestPluginManager(t *testing.T) { defer cleanup(t) pluginManager := newTestPluginManager(socketDir) @@ -157,7 +173,12 @@ func TestPluginRegistration(t *testing.T) { require.NoError(t, p.Serve("v1beta1", "v1beta2")) // Verify that the plugin is registered - waitForRegistration(t, fakeHandler, pluginName) + waitForRegistration(t, fakeHandler, pluginName, socketPath) + + // And unregister. 
+ fakeHandler.Reset() + require.NoError(t, p.Stop()) + waitForDeRegistration(t, fakeHandler, pluginName, socketPath) } } diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/README.md b/pkg/kubelet/pluginmanager/pluginwatcher/README.md index 5de09c14f9c..87280fa93c7 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/README.md +++ b/pkg/kubelet/pluginmanager/pluginwatcher/README.md @@ -28,8 +28,7 @@ To avoid downtime of a plugin on a node, it would be nice to support running an old plugin in parallel to the new plugin. When deploying with a DaemonSet, setting `maxSurge` to a value larger than zero enables such a seamless upgrade. -**Warning**: Such a seamless upgrade **is not** supported at the moment. This -section merely describes what would have to be changed to make it work. +**Warning**: Such a seamless upgrade is only supported for DRA at the moment. ### In a plugin @@ -65,7 +64,7 @@ isn't perfect because after a kubelet restart, plugin instances get registered in a random order. Restarting the kubelet in the middle of an upgrade should be rare. -At the moment, none of the existing handlers support such seamless upgrades: +At the moment, the following handlers do not support such seamless upgrades: - The device plugin handler suffers from temporarily removing the extended resources during an upgrade. A proposed fix is pending in @@ -78,7 +77,9 @@ At the moment, none of the existing handlers support such seamless upgrades: from the csi-node-registrar as supported version, so this version selection mechanism isn't used at all. -- The DRA handler only remembers the most recently registered instance. 
+The following handler supports such seamless upgrades: + +- DRA ### Deployment diff --git a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go index 41d8b46f4cd..68e72ddcaae 100644 --- a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go @@ -132,7 +132,7 @@ func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions } // DeRegisterPlugin is a dummy implementation -func (d *DummyImpl) DeRegisterPlugin(pluginName string) { +func (d *DummyImpl) DeRegisterPlugin(pluginName, endpoint string) { } // Calls Run() diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 59fa6245b7f..ceefa3ff7c1 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -187,8 +187,8 @@ func (h *RegistrationHandler) validateVersions(callerName, pluginName string, en // DeRegisterPlugin is called when a plugin removed its socket, signaling // it is no longer available -func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { - klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName)) +func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) { + klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s, endpoint %s", pluginName, endpoint)) if err := unregisterDriver(pluginName); err != nil { klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err)) }