DRA kubelet: support rolling upgrades

The key difference is that the kubelet must now remember all registered
instances of a plugin, because the new instance may die at any time and leave
only the old one running.

The endpoints of each instance must be different. Registering a plugin with the
same endpoint as some other instance is not supported and triggers an error,
which should get reported as "not registered" to the plugin. This should only
happen when the kubelet missed some unregistration event and re-registers the
same instance again. The recovery in this case is for the plugin to shut down,
remove its socket, which should get observed by kubelet, and then try again
after a restart.
This commit is contained in:
Patrick Ohly 2025-01-28 16:20:07 +01:00
parent 760903c0de
commit b471c2c11f
14 changed files with 128 additions and 68 deletions

View File

@ -43,8 +43,8 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s
return s.connectClient(pluginName, endpoint)
}
func (s *server) DeRegisterPlugin(pluginName string) {
klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName)
func (s *server) DeRegisterPlugin(pluginName, endpoint string) {
klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint)
client := s.getClient(pluginName)
if client != nil {
s.disconnectClient(pluginName, client)
@ -88,7 +88,6 @@ func (s *server) disconnectClient(name string, c Client) error {
s.deregisterClient(name)
return c.Disconnect()
}
func (s *server) registerClient(name string, c Client) {
s.mutex.Lock()
defer s.mutex.Unlock()

View File

@ -584,7 +584,7 @@ func TestPrepareResources(t *testing.T) {
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
}
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests
if test.claimInfo != nil {
manager.cache.add(test.claimInfo)
@ -721,7 +721,7 @@ func TestUnprepareResources(t *testing.T) {
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
}
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests
manager := &ManagerImpl{
kubeClient: fakeKubeClient,
@ -891,7 +891,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) {
if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", driverName, err)
}
defer plg.DeRegisterPlugin(driverName)
defer plg.DeRegisterPlugin(driverName, draServerInfo.socketName)
// Create ClaimInfo cache
cache, err := newClaimInfoCache(t.TempDir(), draManagerStateFileName)

View File

@ -136,7 +136,7 @@ func TestGRPCConnIsReused(t *testing.T) {
// ensure the plugin we are using is registered
draPlugins.add(p)
defer draPlugins.delete(pluginName)
defer draPlugins.remove(pluginName, addr)
// we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused
for i := 0; i < 2; i++ {
@ -210,7 +210,7 @@ func TestNewDRAPluginClient(t *testing.T) {
setup: func(name string) tearDown {
draPlugins.add(&Plugin{name: name})
return func() {
draPlugins.delete(name)
draPlugins.remove(name, "")
}
},
pluginName: "dummy-plugin",
@ -298,7 +298,7 @@ func TestGRPCMethods(t *testing.T) {
}
draPlugins.add(p)
defer draPlugins.delete(pluginName)
defer draPlugins.remove(pluginName, addr)
client, err := NewDRAPluginClient(pluginName)
if err != nil {

View File

@ -18,13 +18,16 @@ package plugin
import (
"errors"
"fmt"
"slices"
"sync"
)
// pluginsStore holds the registered DRA plugins, keyed by plugin name.
type pluginsStore struct {
sync.RWMutex
store map[string]*Plugin
// plugin name -> Plugin in the order in which they got added
store map[string][]*Plugin
}
// draPlugins map keeps track of all registered DRA plugins on the node
@ -37,43 +40,57 @@ func (s *pluginsStore) get(pluginName string) *Plugin {
s.RLock()
defer s.RUnlock()
return s.store[pluginName]
instances := s.store[pluginName]
if len(instances) == 0 {
return nil
}
// Heuristic: pick the most recent one. It's most likely
// the newest, except when kubelet got restarted and registered
// all running plugins in random order.
return instances[len(instances)-1]
}
// add saves a DRA plugin instance in the store under its plugin name.
// It returns an error if an instance with the same endpoint is already
// registered for that name. This method is protected by a mutex.
func (s *pluginsStore) add(p *Plugin) (replacedPlugin *Plugin, replaced bool) {
func (s *pluginsStore) add(p *Plugin) error {
s.Lock()
defer s.Unlock()
if s.store == nil {
s.store = make(map[string]*Plugin)
s.store = make(map[string][]*Plugin)
}
replacedPlugin, exists := s.store[p.name]
s.store[p.name] = p
if replacedPlugin != nil && replacedPlugin.cancel != nil {
replacedPlugin.cancel(errors.New("plugin got replaced"))
for _, oldP := range s.store[p.name] {
if oldP.endpoint == p.endpoint {
// One plugin instance cannot hijack the endpoint of another instance.
return fmt.Errorf("endpoint %s already registered for plugin %s", p.endpoint, p.name)
}
}
return replacedPlugin, exists
s.store[p.name] = append(s.store[p.name], p)
return nil
}
// Delete lets you delete a DRA Plugin by name.
// This method is protected by a mutex.
func (s *pluginsStore) delete(pluginName string) *Plugin {
// remove removes the instance of a DRA plugin that uses the given endpoint.
// This method is protected by a mutex. It returns the removed plugin
// (nil if not found) and true if that was the last instance of that plugin.
func (s *pluginsStore) remove(pluginName, endpoint string) (*Plugin, bool) {
s.Lock()
defer s.Unlock()
p, exists := s.store[pluginName]
if !exists {
return nil
instances := s.store[pluginName]
i := slices.IndexFunc(instances, func(p *Plugin) bool { return p.endpoint == endpoint })
if i == -1 {
return nil, false
}
p := instances[i]
last := len(instances) == 1
if last {
delete(s.store, pluginName)
} else {
s.store[pluginName] = slices.Delete(instances, i, i+1)
}
if p.cancel != nil {
p.cancel(errors.New("plugin got removed"))
}
delete(s.store, pluginName)
return p
return p, last
}

View File

@ -22,6 +22,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAddSameName(t *testing.T) {
@ -30,26 +31,35 @@ func TestAddSameName(t *testing.T) {
firstWasCancelled := false
p := &Plugin{
name: pluginName,
cancel: func(err error) { firstWasCancelled = true },
name: pluginName,
endpoint: "old",
cancel: func(err error) { firstWasCancelled = true },
}
// ensure the plugin we are using is registered
draPlugins.add(p)
defer draPlugins.delete(p.name)
require.NoError(t, draPlugins.add(p))
defer draPlugins.remove(p.name, p.endpoint)
assert.False(t, firstWasCancelled, "should not cancel context after the first call")
// Same name, same endpoint -> error.
require.Error(t, draPlugins.add(p))
secondWasCancelled := false
p2 := &Plugin{
name: pluginName,
cancel: func(err error) { secondWasCancelled = true },
name: pluginName,
endpoint: "new",
cancel: func(err error) { secondWasCancelled = true },
}
require.NoError(t, draPlugins.add(p2))
defer draPlugins.remove(p2.name, p2.endpoint)
draPlugins.add(p2)
defer draPlugins.delete(p2.name)
assert.False(t, firstWasCancelled, "should not cancel context after registering the second instance")
assert.False(t, secondWasCancelled, "should not cancel context of a new plugin")
assert.True(t, firstWasCancelled, "should cancel context after the second call")
// Remove old plugin.
draPlugins.remove(p.name, p.endpoint)
assert.True(t, firstWasCancelled, "should have canceled context after the explicit removal")
assert.False(t, secondWasCancelled, "should not cancel context of a new plugin")
}
@ -65,7 +75,7 @@ func TestDelete(t *testing.T) {
// ensure the plugin we are using is registered
draPlugins.add(p)
draPlugins.delete(p.name)
draPlugins.remove(p.name, "")
assert.True(t, wasCancelled, "should cancel context after the second call")
}

View File

@ -178,10 +178,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
// into all log output related to the plugin.
ctx := h.backgroundCtx
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "pluginName", pluginName)
logger = klog.LoggerWithValues(logger, "pluginName", pluginName, "endpoint", endpoint)
ctx = klog.NewContext(ctx, logger)
logger.V(3).Info("Register new DRA plugin", "endpoint", endpoint)
logger.V(3).Info("Register new DRA plugin")
chosenService, err := h.validateSupportedServices(pluginName, supportedServices)
if err != nil {
@ -209,9 +209,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
// all other DRA components will be able to get the actual socket of DRA plugins by its name.
if oldPlugin, replaced := draPlugins.add(pluginInstance); replaced {
logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint)
if err := draPlugins.add(pluginInstance); err != nil {
cancel(err)
// No wrapping, the error already contains details.
return err
}
// Now cancel any pending ResourceSlice wiping for this plugin.
@ -259,10 +260,14 @@ func (h *RegistrationHandler) validateSupportedServices(pluginName string, suppo
// DeRegisterPlugin is called when a plugin has removed its socket,
// signaling it is no longer available.
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
if p := draPlugins.delete(pluginName); p != nil {
func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) {
if p, last := draPlugins.remove(pluginName, endpoint); p != nil {
// This logger includes endpoint and pluginName.
logger := klog.FromContext(p.backgroundCtx)
logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint)
logger.V(3).Info("Deregister DRA plugin", "lastInstance", last)
if !last {
return
}
// Prepare for canceling the background wiping. This needs to run
// in the context of the registration handler, the one from

View File

@ -176,6 +176,10 @@ func TestRegistrationHandler(t *testing.T) {
// Simulate one existing plugin A.
err := handler.RegisterPlugin(pluginA, endpointA, []string{drapb.DRAPluginService}, nil)
require.NoError(t, err)
t.Cleanup(func() {
tCtx.Logf("Removing plugin %s", pluginA)
handler.DeRegisterPlugin(pluginA, endpointA)
})
err = handler.ValidatePlugin(test.pluginName, test.endpoint, test.supportedServices)
if test.shouldError {
@ -206,9 +210,10 @@ func TestRegistrationHandler(t *testing.T) {
assert.NoError(t, err, "recreate slice")
}
handler.DeRegisterPlugin(test.pluginName)
tCtx.Logf("Removing plugin %s", test.pluginName)
handler.DeRegisterPlugin(test.pluginName, test.endpoint)
// Nop.
handler.DeRegisterPlugin(test.pluginName)
handler.DeRegisterPlugin(test.pluginName, test.endpoint)
requireNoSlices()
})

View File

@ -89,6 +89,7 @@ type PluginInfo struct {
UUID types.UID
Handler PluginHandler
Name string
Endpoint string
}
func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {

View File

@ -56,5 +56,5 @@ type PluginHandler interface {
RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error
// DeRegisterPlugin is called once the pluginwatcher observes that the socket has
// been deleted.
DeRegisterPlugin(pluginName string)
DeRegisterPlugin(pluginName, endpoint string)
}

View File

@ -118,6 +118,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
UUID: pluginUUID,
Handler: handler,
Name: infoResp.Name,
Endpoint: infoResp.Endpoint,
})
if err != nil {
klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
@ -147,7 +148,7 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc(
// so that if we receive a register event during Register Plugin, we can process it as a Register call.
actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath)
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name, pluginInfo.Endpoint)
klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler)
return nil

View File

@ -54,7 +54,7 @@ func newFakePluginHandler() *fakePluginHandler {
func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
f.Lock()
defer f.Unlock()
f.events = append(f.events, "validate "+pluginName)
f.events = append(f.events, "validate "+pluginName+" "+endpoint)
return nil
}
@ -62,15 +62,15 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
f.Lock()
defer f.Unlock()
f.events = append(f.events, "register "+pluginName)
f.events = append(f.events, "register "+pluginName+" "+endpoint)
return nil
}
// DeRegisterPlugin is a fake method
func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
func (f *fakePluginHandler) DeRegisterPlugin(pluginName, endpoint string) {
f.Lock()
defer f.Unlock()
f.events = append(f.events, "deregister "+pluginName)
f.events = append(f.events, "deregister "+pluginName+" "+endpoint)
}
func (f *fakePluginHandler) Reset() {
@ -93,8 +93,24 @@ func cleanup(t *testing.T) {
os.MkdirAll(socketDir, 0755)
}
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) {
expected := []string{"validate " + pluginName, "register " + pluginName}
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) {
t.Helper()
waitFor(t, fakePluginHandler,
[]string{"validate " + pluginName + " " + endpoint, "register " + pluginName + " " + endpoint},
"Timed out waiting for plugin to be added to actual state of world cache.",
)
}
func waitForDeRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) {
t.Helper()
waitFor(t, fakePluginHandler,
[]string{"deregister " + pluginName + " " + endpoint},
"Timed out waiting for plugin to be removed from actual state of the world cache.",
)
}
func waitFor(t *testing.T, fakePluginHandler *fakePluginHandler, expected []string, what string) {
t.Helper()
err := retryWithExponentialBackOff(
100*time.Millisecond,
func() (bool, error) {
@ -108,7 +124,7 @@ func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, plu
},
)
if err != nil {
t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.")
t.Fatal(what)
}
}
@ -122,7 +138,7 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio
return wait.ExponentialBackoff(backoff, fn)
}
func TestPluginRegistration(t *testing.T) {
func TestPluginManager(t *testing.T) {
defer cleanup(t)
pluginManager := newTestPluginManager(socketDir)
@ -157,7 +173,12 @@ func TestPluginRegistration(t *testing.T) {
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
// Verify that the plugin is registered
waitForRegistration(t, fakeHandler, pluginName)
waitForRegistration(t, fakeHandler, pluginName, socketPath)
// And unregister.
fakeHandler.Reset()
require.NoError(t, p.Stop())
waitForDeRegistration(t, fakeHandler, pluginName, socketPath)
}
}

View File

@ -28,8 +28,7 @@ To avoid downtime of a plugin on a node, it would be nice to support running an
old plugin in parallel to the new plugin. When deploying with a DaemonSet,
setting `maxSurge` to a value larger than zero enables such a seamless upgrade.
**Warning**: Such a seamless upgrade **is not** supported at the moment. This
section merely describes what would have to be changed to make it work.
**Warning**: Such a seamless upgrade is only supported for DRA at the moment.
### In a plugin
@ -65,7 +64,7 @@ isn't perfect because after a kubelet restart, plugin instances get registered
in a random order. Restarting the kubelet in the middle of an upgrade should be
rare.
At the moment, none of the existing handlers support such seamless upgrades:
At the moment, the following handlers do not support such seamless upgrades:
- The device plugin handler suffers from temporarily removing the extended
resources during an upgrade. A proposed fix is pending in
@ -78,7 +77,9 @@ At the moment, none of the existing handlers support such seamless upgrades:
from the csi-node-registrar as supported version, so this version
selection mechanism isn't used at all.
- The DRA handler only remembers the most recently registered instance.
The following handler supports seamless upgrades:
- DRA
### Deployment

View File

@ -132,7 +132,7 @@ func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions
}
// DeRegisterPlugin is a dummy implementation
func (d *DummyImpl) DeRegisterPlugin(pluginName string) {
func (d *DummyImpl) DeRegisterPlugin(pluginName, endpoint string) {
}
// Calls Run()

View File

@ -187,8 +187,8 @@ func (h *RegistrationHandler) validateVersions(callerName, pluginName string, en
// DeRegisterPlugin is called when a plugin removed its socket, signaling
// it is no longer available
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName))
func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) {
klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s, endpoint %s", pluginName, endpoint))
if err := unregisterDriver(pluginName); err != nil {
klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err))
}