diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go index d1bca0cccae..204afd3d1b1 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go @@ -43,8 +43,8 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s return s.connectClient(pluginName, endpoint) } -func (s *server) DeRegisterPlugin(pluginName string) { - klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName) +func (s *server) DeRegisterPlugin(pluginName, endpoint string) { + klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint) client := s.getClient(pluginName) if client != nil { s.disconnectClient(pluginName, client) @@ -88,7 +88,6 @@ func (s *server) disconnectClient(name string, c Client) error { s.deregisterClient(name) return c.Disconnect() } - func (s *server) registerClient(name string, c Client) { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 8e965c6b694..26043a0fd28 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -98,7 +98,20 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n } func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler { - return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode)) + // The time that DRA drivers have to come back after being unregistered + // before the kubelet removes their ResourceSlices. + // + // This must be long enough to actually allow stopping a pod and + // starting the replacement (otherwise ResourceSlices get deleted + // unnecessarily) and not too long (otherwise the time window where + // pods might still get scheduled to the node after removal of a + // driver is too long). + // + // 30 seconds might be long enough for a simple container restart. + // If a DRA driver wants to be sure that slices don't get wiped, + // it should use rolling updates. + wipingDelay := 30 * time.Second + return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode, wipingDelay)) } // Start starts the reconcile loop of the manager.
diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index bc2448690a8..f65ac87575a 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -580,11 +580,11 @@ func TestPrepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler(nil, getFakeNode) + plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */) if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } - defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests + defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests if test.claimInfo != nil { manager.cache.add(test.claimInfo) @@ -717,11 +717,11 @@ func TestUnprepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler(nil, getFakeNode) + plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */) if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } - defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests + defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests manager := &ManagerImpl{ kubeClient: fakeKubeClient, @@ -887,11 +887,11 @@ func TestParallelPrepareUnprepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler(nil, getFakeNode) + plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */) if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil { t.Fatalf("failed to register plugin %s, err: %v", driverName, err) } - defer plg.DeRegisterPlugin(driverName) + defer plg.DeRegisterPlugin(driverName, draServerInfo.socketName) // Create ClaimInfo cache cache, err := newClaimInfoCache(t.TempDir(), draManagerStateFileName) diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index 396c4439388..95f10083702 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -136,7 +136,7 @@ func TestGRPCConnIsReused(t *testing.T) { // ensure the plugin we are using is registered draPlugins.add(p) - defer draPlugins.delete(pluginName) + defer draPlugins.remove(pluginName, addr) // we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused for i := 0; i < 2; i++ { @@ -210,7 +210,7 @@ func TestNewDRAPluginClient(t *testing.T) { setup: func(name string) tearDown { draPlugins.add(&Plugin{name: name}) return func() { - draPlugins.delete(name) + draPlugins.remove(name, "") } }, pluginName: "dummy-plugin", @@ -298,7 +298,7 @@ func TestGRPCMethods(t *testing.T) { } draPlugins.add(p) - defer draPlugins.delete(pluginName) + defer draPlugins.remove(pluginName, addr) client, err := NewDRAPluginClient(pluginName) if err != nil { diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store.go b/pkg/kubelet/cm/dra/plugin/plugins_store.go index d2f36c17849..cc2fc240b5b 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store.go +++ 
b/pkg/kubelet/cm/dra/plugin/plugins_store.go @@ -18,13 +18,16 @@ package plugin import ( "errors" + "fmt" + "slices" "sync" ) // PluginsStore holds a list of DRA Plugins. type pluginsStore struct { sync.RWMutex - store map[string]*Plugin + // plugin name -> Plugin in the order in which they got added + store map[string][]*Plugin } // draPlugins map keeps track of all registered DRA plugins on the node @@ -37,43 +40,57 @@ func (s *pluginsStore) get(pluginName string) *Plugin { s.RLock() defer s.RUnlock() - return s.store[pluginName] + instances := s.store[pluginName] + if len(instances) == 0 { + return nil + } + // Heuristic: pick the most recent one. It's most likely + // the newest, except when kubelet got restarted and registered + // all running plugins in random order. + return instances[len(instances)-1] } // Set lets you save a DRA Plugin to the list and give it a specific name. // This method is protected by a mutex. -func (s *pluginsStore) add(p *Plugin) (replacedPlugin *Plugin, replaced bool) { +func (s *pluginsStore) add(p *Plugin) error { s.Lock() defer s.Unlock() if s.store == nil { - s.store = make(map[string]*Plugin) + s.store = make(map[string][]*Plugin) } - - replacedPlugin, exists := s.store[p.name] - s.store[p.name] = p - - if replacedPlugin != nil && replacedPlugin.cancel != nil { - replacedPlugin.cancel(errors.New("plugin got replaced")) + for _, oldP := range s.store[p.name] { + if oldP.endpoint == p.endpoint { + // One plugin instance cannot hijack the endpoint of another instance. + return fmt.Errorf("endpoint %s already registered for plugin %s", p.endpoint, p.name) + } } - - return replacedPlugin, exists + s.store[p.name] = append(s.store[p.name], p) + return nil } -// Delete lets you delete a DRA Plugin by name. -// This method is protected by a mutex. -func (s *pluginsStore) delete(pluginName string) *Plugin { +// remove lets you remove one endpoint for a DRA Plugin. +// This method is protected by a mutex. 
It returns the +// plugin if found and true if that was the last instance +func (s *pluginsStore) remove(pluginName, endpoint string) (*Plugin, bool) { s.Lock() defer s.Unlock() - p, exists := s.store[pluginName] - if !exists { - return nil + instances := s.store[pluginName] + i := slices.IndexFunc(instances, func(p *Plugin) bool { return p.endpoint == endpoint }) + if i == -1 { + return nil, false } + p := instances[i] + last := len(instances) == 1 + if last { + delete(s.store, pluginName) + } else { + s.store[pluginName] = slices.Delete(instances, i, i+1) + } + if p.cancel != nil { p.cancel(errors.New("plugin got removed")) } - delete(s.store, pluginName) - - return p + return p, last } diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store_test.go b/pkg/kubelet/cm/dra/plugin/plugins_store_test.go index b4d95abf30e..2550aaa3dd1 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAddSameName(t *testing.T) { @@ -30,26 +31,35 @@ func TestAddSameName(t *testing.T) { firstWasCancelled := false p := &Plugin{ - name: pluginName, - cancel: func(err error) { firstWasCancelled = true }, + name: pluginName, + endpoint: "old", + cancel: func(err error) { firstWasCancelled = true }, } // ensure the plugin we are using is registered - draPlugins.add(p) - defer draPlugins.delete(p.name) + require.NoError(t, draPlugins.add(p)) + defer draPlugins.remove(p.name, p.endpoint) assert.False(t, firstWasCancelled, "should not cancel context after the first call") + // Same name, same endpoint -> error. + require.Error(t, draPlugins.add(p)) + secondWasCancelled := false p2 := &Plugin{ - name: pluginName, - cancel: func(err error) { secondWasCancelled = true }, + name: pluginName, + endpoint: "new", + cancel: func(err error) { secondWasCancelled = true }, } + require.NoError(t, draPlugins.add(p2)) + defer draPlugins.remove(p2.name, p2.endpoint) - draPlugins.add(p2) - defer draPlugins.delete(p2.name) + assert.False(t, firstWasCancelled, "should not cancel context after registering the second instance") + assert.False(t, secondWasCancelled, "should not cancel context of a new plugin") - assert.True(t, firstWasCancelled, "should cancel context after the second call") + // Remove old plugin. + draPlugins.remove(p.name, p.endpoint) + assert.True(t, firstWasCancelled, "should have canceled context after the explicit removal") assert.False(t, secondWasCancelled, "should not cancel context of a new plugin") } @@ -65,7 +75,7 @@ func TestDelete(t *testing.T) { // ensure the plugin we are using is registered draPlugins.add(p) - draPlugins.delete(p.name) + draPlugins.remove(p.name, "") assert.True(t, wasCancelled, "should cancel context after the second call") } diff --git a/pkg/kubelet/cm/dra/plugin/registration.go b/pkg/kubelet/cm/dra/plugin/registration.go index 23ddbb73a70..1d95e582e0a 100644 --- a/pkg/kubelet/cm/dra/plugin/registration.go +++ b/pkg/kubelet/cm/dra/plugin/registration.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "slices" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -53,6 +54,18 @@ type RegistrationHandler struct { backgroundCtx context.Context kubeClient kubernetes.Interface getNode func() (*v1.Node, error) + wipingDelay time.Duration + + mutex sync.Mutex + + // pendingWipes maps a plugin name to a cancel function for + // wiping of that plugin's ResourceSlices. 
Entries get added + // in DeRegisterPlugin and checked in RegisterPlugin. If + // wiping is pending during RegisterPlugin, it gets canceled. + // + // Must use pointers to functions because the entries have to + // be comparable. + pendingWipes map[string]*context.CancelCauseFunc } var _ cache.PluginHandler = &RegistrationHandler{} @@ -62,12 +75,14 @@ var _ cache.PluginHandler = &RegistrationHandler{} // Must only be called once per process because it manages global state. // If a kubeClient is provided, then it synchronizes ResourceSlices // with the resource information provided by plugins. -func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler { +func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), wipingDelay time.Duration) *RegistrationHandler { handler := &RegistrationHandler{ // The context and thus logger should come from the caller. backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")), kubeClient: kubeClient, getNode: getNode, + wipingDelay: wipingDelay, + pendingWipes: make(map[string]*context.CancelCauseFunc), } // When kubelet starts up, no DRA driver has registered yet. None of @@ -77,19 +92,34 @@ func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1 // to start up. // // This has to run in the background. - go handler.wipeResourceSlices("") + logger := klog.LoggerWithName(klog.FromContext(handler.backgroundCtx), "startup") + ctx := klog.NewContext(handler.backgroundCtx, logger) + go handler.wipeResourceSlices(ctx, 0 /* no delay */, "" /* all drivers */) return handler } // wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver. -func (h *RegistrationHandler) wipeResourceSlices(driver string) { +// Wiping will delay for a while and can be canceled by canceling the context. +func (h *RegistrationHandler) wipeResourceSlices(ctx context.Context, delay time.Duration, driver string) { if h.kubeClient == nil { return } - ctx := h.backgroundCtx logger := klog.FromContext(ctx) + if delay != 0 { + // Before we start deleting, give the driver time to bounce back. + // Perhaps it got removed as part of a DaemonSet update and the + // replacement pod is about to start. + logger.V(4).Info("Starting to wait before wiping ResourceSlices", "delay", delay) + select { + case <-ctx.Done(): + logger.V(4).Info("Aborting wiping of ResourceSlices", "reason", context.Cause(ctx)) + case <-time.After(delay): + logger.V(4).Info("Starting to wipe ResourceSlices after waiting", "delay", delay) + } + } + backoff := wait.Backoff{ Duration: time.Second, Factor: 2, @@ -148,10 +178,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // into all log output related to the plugin.
ctx := h.backgroundCtx logger := klog.FromContext(ctx) - logger = klog.LoggerWithValues(logger, "pluginName", pluginName) + logger = klog.LoggerWithValues(logger, "pluginName", pluginName, "endpoint", endpoint) ctx = klog.NewContext(ctx, logger) - logger.V(3).Info("Register new DRA plugin", "endpoint", endpoint) + logger.V(3).Info("Register new DRA plugin") chosenService, err := h.validateSupportedServices(pluginName, supportedServices) if err != nil { @@ -179,9 +209,19 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key // all other DRA components will be able to get the actual socket of DRA plugins by its name. + if err := draPlugins.add(pluginInstance); err != nil { + cancel(err) + // No wrapping, the error already contains details. + return err + } - if oldPlugin, replaced := draPlugins.add(pluginInstance); replaced { - logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint) + // Now cancel any pending ResourceSlice wiping for this plugin. + // Only needs to be done once. + h.mutex.Lock() + defer h.mutex.Unlock() + if cancel := h.pendingWipes[pluginName]; cancel != nil { + (*cancel)(errors.New("new plugin instance registered")) + delete(h.pendingWipes, pluginName) } return nil @@ -220,16 +260,51 @@ func (h *RegistrationHandler) validateSupportedServices(pluginName string, suppo // DeRegisterPlugin is called when a plugin has removed its socket, // signaling it is no longer available. -func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { - if p := draPlugins.delete(pluginName); p != nil { +func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) { + if p, last := draPlugins.remove(pluginName, endpoint); p != nil { + // This logger includes endpoint and pluginName. logger := klog.FromContext(p.backgroundCtx) - logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint) + logger.V(3).Info("Deregister DRA plugin", "lastInstance", last) + if !last { + return + } + + // Prepare for canceling the background wiping. This needs to run + // in the context of the registration handler, the one from + // the plugin is canceled. + logger = klog.FromContext(h.backgroundCtx) + logger = klog.LoggerWithName(logger, "driver-cleanup") + logger = klog.LoggerWithValues(logger, "pluginName", pluginName) + ctx, cancel := context.WithCancelCause(h.backgroundCtx) + ctx = klog.NewContext(ctx, logger) // Clean up the ResourceSlices for the deleted Plugin since it // may have died without doing so itself and might never come // back. - go h.wipeResourceSlices(pluginName) + // + // May get canceled if the plugin comes back quickly enough + // (see RegisterPlugin). + h.mutex.Lock() + defer h.mutex.Unlock() + if cancel := h.pendingWipes[pluginName]; cancel != nil { + (*cancel)(errors.New("plugin deregistered a second time")) + } + h.pendingWipes[pluginName] = &cancel + go func() { + defer func() { + h.mutex.Lock() + defer h.mutex.Unlock() + + // Cancel our own context, but remove it from the map only if it + // is the current entry. Perhaps it already got replaced. 
+ cancel(errors.New("wiping done")) + if h.pendingWipes[pluginName] == &cancel { + delete(h.pendingWipes, pluginName) + } + }() + h.wipeResourceSlices(ctx, h.wipingDelay, pluginName) + }() return } diff --git a/pkg/kubelet/cm/dra/plugin/registration_test.go b/pkg/kubelet/cm/dra/plugin/registration_test.go index 949682f6aab..59f3099ab65 100644 --- a/pkg/kubelet/cm/dra/plugin/registration_test.go +++ b/pkg/kubelet/cm/dra/plugin/registration_test.go @@ -157,7 +157,7 @@ func TestRegistrationHandler(t *testing.T) { } // The handler wipes all slices at startup. - handler := NewRegistrationHandler(client, getFakeNode) + handler := NewRegistrationHandler(client, getFakeNode, time.Second /* very short wiping delay for testing */) requireNoSlices := func() { t.Helper() if client == nil { @@ -176,6 +176,10 @@ func TestRegistrationHandler(t *testing.T) { // Simulate one existing plugin A. err := handler.RegisterPlugin(pluginA, endpointA, []string{drapb.DRAPluginService}, nil) require.NoError(t, err) + t.Cleanup(func() { + tCtx.Logf("Removing plugin %s", pluginA) + handler.DeRegisterPlugin(pluginA, endpointA) + }) err = handler.ValidatePlugin(test.pluginName, test.endpoint, test.supportedServices) if test.shouldError { @@ -206,9 +210,10 @@ func TestRegistrationHandler(t *testing.T) { assert.NoError(t, err, "recreate slice") } - handler.DeRegisterPlugin(test.pluginName) + tCtx.Logf("Removing plugin %s", test.pluginName) + handler.DeRegisterPlugin(test.pluginName, test.endpoint) // Nop. - handler.DeRegisterPlugin(test.pluginName) + handler.DeRegisterPlugin(test.pluginName, test.endpoint) requireNoSlices() }) diff --git a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go index 50b5658af04..cdaa42c0a3a 100644 --- a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go @@ -89,6 +89,7 @@ type PluginInfo struct { UUID types.UID Handler PluginHandler Name string + Endpoint string } func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error { diff --git a/pkg/kubelet/pluginmanager/cache/types.go b/pkg/kubelet/pluginmanager/cache/types.go index 0656bc90646..bcffd117ef2 100644 --- a/pkg/kubelet/pluginmanager/cache/types.go +++ b/pkg/kubelet/pluginmanager/cache/types.go @@ -56,5 +56,5 @@ type PluginHandler interface { RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error // DeRegisterPlugin is called once the pluginwatcher observes that the socket has // been deleted. - DeRegisterPlugin(pluginName string) + DeRegisterPlugin(pluginName, endpoint string) } diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go index 310961c2e64..7b6fbba5b50 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go @@ -118,6 +118,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( UUID: pluginUUID, Handler: handler, Name: infoResp.Name, + Endpoint: infoResp.Endpoint, }) if err != nil { klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath) @@ -147,7 +148,7 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc( // so that if we receive a register event during Register Plugin, we can process it as a Register call. 
actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath) - pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name) + pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name, pluginInfo.Endpoint) klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler) return nil diff --git a/pkg/kubelet/pluginmanager/plugin_manager_test.go b/pkg/kubelet/pluginmanager/plugin_manager_test.go index 407f821977e..99aa22ad7ae 100644 --- a/pkg/kubelet/pluginmanager/plugin_manager_test.go +++ b/pkg/kubelet/pluginmanager/plugin_manager_test.go @@ -54,7 +54,7 @@ func newFakePluginHandler() *fakePluginHandler { func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { f.Lock() defer f.Unlock() - f.events = append(f.events, "validate "+pluginName) + f.events = append(f.events, "validate "+pluginName+" "+endpoint) return nil } @@ -62,15 +62,15 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error { f.Lock() defer f.Unlock() - f.events = append(f.events, "register "+pluginName) + f.events = append(f.events, "register "+pluginName+" "+endpoint) return nil } // DeRegisterPlugin is a fake method -func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) { +func (f *fakePluginHandler) DeRegisterPlugin(pluginName, endpoint string) { f.Lock() defer f.Unlock() - f.events = append(f.events, "deregister "+pluginName) + f.events = append(f.events, "deregister "+pluginName+" "+endpoint) } func (f *fakePluginHandler) Reset() { @@ -93,8 +93,24 @@ func cleanup(t *testing.T) { os.MkdirAll(socketDir, 0755) } -func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) { - expected := []string{"validate " + pluginName, "register " + pluginName} +func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) { + t.Helper() + waitFor(t, fakePluginHandler, + []string{"validate " + pluginName + " " + endpoint, "register " + pluginName + " " + endpoint}, + "Timed out waiting for plugin to be added to actual state of world cache.", + ) +} + +func waitForDeRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) { + t.Helper() + waitFor(t, fakePluginHandler, + []string{"deregister " + pluginName + " " + endpoint}, + "Timed out waiting for plugin to be removed from actual state of the world cache.", + ) +} + +func waitFor(t *testing.T, fakePluginHandler *fakePluginHandler, expected []string, what string) { + t.Helper() err := retryWithExponentialBackOff( 100*time.Millisecond, func() (bool, error) { @@ -108,7 +124,7 @@ func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, plu }, ) if err != nil { - t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.") + t.Fatal(what) } } @@ -122,7 +138,7 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio return wait.ExponentialBackoff(backoff, fn) } -func TestPluginRegistration(t *testing.T) { +func TestPluginManager(t *testing.T) { defer cleanup(t) pluginManager := newTestPluginManager(socketDir) @@ -157,7 +173,12 @@ func TestPluginRegistration(t *testing.T) { require.NoError(t, p.Serve("v1beta1", "v1beta2")) // Verify that the plugin is registered - waitForRegistration(t, fakeHandler, pluginName) + waitForRegistration(t, fakeHandler, pluginName, 
socketPath) + + // And unregister. + fakeHandler.Reset() + require.NoError(t, p.Stop()) + waitForDeRegistration(t, fakeHandler, pluginName, socketPath) } } diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/README.md b/pkg/kubelet/pluginmanager/pluginwatcher/README.md index aebbfd10254..9403829a2fb 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/README.md +++ b/pkg/kubelet/pluginmanager/pluginwatcher/README.md @@ -16,6 +16,92 @@ there. This socket filename should not start with a '.' as it will be ignored. +To avoid conflicts between different plugins, the recommendation is to use +`<plugin name>[-<suffix>].sock` as filename. `<plugin name>` +should end with a DNS domain that is unique for the plugin. Each time a plugin +starts, it has to delete old sockets if they exist and listen anew under the +same filename. + +## Seamless Upgrade + +To avoid downtime of a plugin on a node, it would be nice to support running an +old plugin in parallel to the new plugin. When deploying with a DaemonSet, +setting `maxSurge` to a value larger than zero enables such a seamless upgrade. + +**Warning**: Such a seamless upgrade is only supported for DRA at the moment. + +### In a plugin + +*Note*: For DRA, the +[k8s.io/dynamic-resource-allocation](https://pkg.go.dev/k8s.io/dynamic-resource-allocation/kubeletplugin) +helper package offers the `RollingUpdate` option which implements the socket +handling as described in this section. + +To support seamless upgrades, each plugin instance must use a unique +socket filename. Otherwise the following could happen: +- The old instance is registered with `plugin.example.com-reg.sock`. +- The new instance starts, unlinks that file, and starts listening on it again. +- In parallel, the kubelet notices the removal and unregisters the plugin + before probing the new instance, thus breaking the seamless upgrade. + +Even if the timing is more favorable and unregistration is avoided, using the +same socket is problematic: if the new instance fails, the kubelet cannot fall +back to the old instance because that old instance is not listening to the +socket that is available under `plugin.example.com-reg.sock`. + +Such a unique filename can be achieved in a DaemonSet by passing the UID of the pod into the pod +through the downward API. New instances may try to clean up stale sockets of +older instances, but have to be absolutely sure that those sockets really +aren't in use anymore. Each instance should catch termination signals and clean +up after itself. Then sockets only leak during abnormal events (power loss, +killing with SIGKILL). + +Last but not least, both plugin instances must be usable in parallel. It is not +predictable which instance the kubelet will use for which request. + +### In the kubelet + +For such a seamless upgrade with different sockets per plugin to work reliably, +the handler for the plugin type must track all registered instances. Then if +one of them fails and gets unregistered, it can fall back to some +other instance. Picking the most recently registered instance is a good heuristic. This +isn't perfect because after a kubelet restart, plugin instances get registered +in a random order. Restarting the kubelet in the middle of an upgrade should be +rare. + +At the moment, the following handlers do not support such seamless upgrades: + +- The device plugin handler suffers from temporarily removing the extended + resources during an upgrade. A proposed fix is pending in + https://github.com/kubernetes/kubernetes/pull/127821.
+ +- The CSI handler [tries to determine which instance is newer](https://github.com/kubernetes/kubernetes/blob/7140b4910c6c1179c9778a7f3bb8037356febd58/pkg/volume/csi/csi_plugin.go#L115-L125) based on the supported version(s) and + only remembers that one. If that newest instance fails, there is no fallback. + + In practice, most CSI drivers probably all pass [the hard-coded "1.0.0"](https://github.com/kubernetes-csi/node-driver-registrar/blob/27700e2962cd35b9f2336a156146181e5c75399e/cmd/csi-node-driver-registrar/main.go#L72) + from the csi-node-driver-registrar as the supported version, so this version + selection mechanism isn't used at all. + +The following handler supports it: + +- DRA + +### Deployment + +Deploying a plugin with support for seamless upgrades and per-instance socket +filenames is *not* compatible with a kubelet version that does not have support +for seamless upgrades yet. It breaks like this: + +- New instance starts, gets registered and replaces the old one. +- Old instance stops, removing its socket. +- The kubelet notices that, unregisters the plugin. +- The plugin handler removes *the new* instance because it ignores the socket path -> no instance left. + +Plugin authors either have to assume that the cluster has a recent enough +kubelet or rely on labeling nodes with support. Then the plugin can use one +simple DaemonSet for nodes without support and another, more complex one where +`maxSurge` is increased to enable seamless upgrades on nodes which support it. +No such label is specified at the moment. ## gRPC Service Lifecycle diff --git a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go index 41d8b46f4cd..68e72ddcaae 100644 --- a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go @@ -132,7 +132,7 @@ func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions } // DeRegisterPlugin is a dummy implementation -func (d *DummyImpl) DeRegisterPlugin(pluginName string) { +func (d *DummyImpl) DeRegisterPlugin(pluginName, endpoint string) { } // Calls Run() diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 54c28494754..902f31e9911 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -265,8 +265,8 @@ func (h *RegistrationHandler) validateVersions(callerName, pluginName string, en // DeRegisterPlugin is called when a plugin removed its socket, signaling // it is no longer available -func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { - klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName)) +func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) { + klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s, endpoint %s", pluginName, endpoint)) if err := unregisterDriver(pluginName); err != nil { klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err)) } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.mod b/staging/src/k8s.io/dynamic-resource-allocation/go.mod index b152fb0bbfb..451a9332d4a 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.mod +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.mod @@ -12,6 +12,7 @@ require ( github.com/google/go-cmp v0.7.0 github.com/onsi/gomega v1.35.1 github.com/stretchr/testify v1.10.0 + go.etcd.io/etcd/client/pkg/v3 v3.5.16 google.golang.org/grpc v1.68.1 k8s.io/api v0.0.0 k8s.io/apimachinery v0.0.0 @@ -57,6 +58,8 @@ require (
github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/otel v1.33.0 // indirect go.opentelemetry.io/otel/trace v1.33.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.33.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.sum b/staging/src/k8s.io/dynamic-resource-allocation/go.sum index 991e9276662..aae134cac79 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.sum +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.sum @@ -158,6 +158,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= go.etcd.io/etcd/api/v3 v3.5.16/go.mod h1:1P4SlIP/VwkDmGo3OlOD7faPeP8KDIFhqvciH5EfN28= +go.etcd.io/etcd/client/pkg/v3 v3.5.16 h1:ZgY48uH6UvB+/7R9Yf4x574uCO3jIx0TRDyetSfId3Q= go.etcd.io/etcd/client/pkg/v3 v3.5.16/go.mod h1:V8acl8pcEK0Y2g19YlOV9m9ssUe6MgiDSobSoaBAM0E= go.etcd.io/etcd/client/v2 v2.305.16/go.mod h1:h9YxWCzcdvZENbfzBTFCnoNumr2ax3F19sKMqHFmXHE= go.etcd.io/etcd/client/v3 v3.5.16/go.mod h1:X+rExSGkyqxvu276cr2OwPLBaeqFu1cIl4vmRjAD/50= @@ -176,9 +177,12 @@ go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJ go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index 57602cdaee0..92f1af253e1 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -21,12 +21,14 @@ import ( "errors" "fmt" "net" + "os" "path" "sync" "google.golang.org/grpc" "k8s.io/klog/v2" + "go.etcd.io/etcd/client/pkg/v3/fileutil" resourceapi "k8s.io/api/resource/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -195,8 +197,10 @@ func RegistrarDirectoryPath(path string) Option { // support updates from an installation which used an older release of // of the helper code. // -// The default is -reg.sock. When rolling updates are enabled (not supported yet), +// The default is -reg.sock. When rolling updates are enabled, // it is --reg.sock. +// +// This option and [RollingUpdate] are mutually exclusive. 
func RegistrarSocketFilename(name string) Option { return func(o *options) error { o.pluginRegistrationEndpoint.file = name @@ -248,6 +252,44 @@ func PluginListener(listen func(ctx context.Context, path string) (net.Listener, } } +// RollingUpdate can be used to enable support for running two plugin instances +// in parallel while a newer instance replaces the older. When enabled, both +// instances must share the same plugin data directory and driver name. +// They create different sockets to allow the kubelet to connect to both at +// the same time. +// +// There is no guarantee which of the two instances is used by the kubelet. +// For example, it can happen that a claim gets prepared by one instance +// and then needs to be unprepared by the other. Kubelet then may fall back +// to the first one again for some other operation. In practice this means +// that each instance must be entirely stateless across method calls. +// Serialization (on by default, see [Serialize]) ensures that methods +// are serialized across all instances through file locking. The plugin +// implementation can load shared state from a file at the start +// of a call, execute and then store the updated shared state again. +// +// Passing a non-empty uid enables rolling updates, an empty uid disables it. +// The uid must be the pod UID. A DaemonSet can pass that into the driver container +// via the downward API (https://kubernetes.io/docs/concepts/workloads/pods/downward-api/#downwardapi-fieldRef). +// +// Because new instances cannot remove stale sockets of older instances, +// it is important that each pod shuts down cleanly: it must catch SIGINT/TERM +// and stop the helper instead of quitting immediately. +// +// This depends on support in the kubelet which was added in Kubernetes 1.33. +// Don't use this if it is not certain that the kubelet has that support! +// +// This option and [RegistrarSocketFilename] are mutually exclusive. +func RollingUpdate(uid types.UID) Option { + return func(o *options) error { + o.rollingUpdateUID = uid + + // TODO: ask the kubelet whether that pod is still running and + // clean up leftover sockets? + return nil + } +} + // GRPCInterceptor is called for each incoming gRPC method call. This option // may be used more than once and each interceptor will get called. func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option { @@ -322,6 +364,17 @@ func Serialize(enabled bool) Option { } } +// FlockDirectoryPath changes where lock files are created and locked. A lock file +// is needed when serializing gRPC calls and rolling updates are enabled. +// The directory must exist and be reserved for exclusive use by the +// driver. The default is the plugin data directory. +func FlockDirectoryPath(path string) Option { + return func(o *options) error { + o.flockDirectoryPath = path + return nil + } +} + type options struct { logger klog.Logger grpcVerbosity int @@ -330,11 +383,13 @@ type options struct { nodeUID types.UID pluginRegistrationEndpoint endpoint pluginDataDirectoryPath string + rollingUpdateUID types.UID draEndpointListen func(ctx context.Context, path string) (net.Listener, error) unaryInterceptors []grpc.UnaryServerInterceptor streamInterceptors []grpc.StreamServerInterceptor kubeClient kubernetes.Interface serialize bool + flockDirectoryPath string nodeV1beta1 bool } @@ -344,17 +399,18 @@ type Helper struct { // backgroundCtx is for activities that are started later. // cancel cancels the backgroundCtx.
- cancel func(cause error) - wg sync.WaitGroup - registrar *nodeRegistrar - pluginServer *grpcServer - plugin DRAPlugin - driverName string - nodeName string - nodeUID types.UID - kubeClient kubernetes.Interface - serialize bool - grpcMutex sync.Mutex + cancel func(cause error) + wg sync.WaitGroup + registrar *nodeRegistrar + pluginServer *grpcServer + plugin DRAPlugin + driverName string + nodeName string + nodeUID types.UID + kubeClient kubernetes.Interface + serialize bool + grpcMutex sync.Mutex + grpcLockFilePath string // Information about resource publishing changes concurrently and thus // must be protected by the mutex. The controller gets started only @@ -392,12 +448,20 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe if o.driverName == "" { return nil, errors.New("driver name must be set") } + if o.rollingUpdateUID != "" && o.pluginRegistrationEndpoint.file != "" { + return nil, errors.New("rolling updates and explicit registration socket filename are mutually exclusive") + } + uidPart := "" + if o.rollingUpdateUID != "" { + uidPart = "-" + string(o.rollingUpdateUID) + } if o.pluginRegistrationEndpoint.file == "" { - o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock" + o.pluginRegistrationEndpoint.file = o.driverName + uidPart + "-reg.sock" } if o.pluginDataDirectoryPath == "" { o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName) } + d := &Helper{ driverName: o.driverName, nodeName: o.nodeName, @@ -406,6 +470,14 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe serialize: o.serialize, plugin: plugin, } + if o.rollingUpdateUID != "" { + dir := o.pluginDataDirectoryPath + if o.flockDirectoryPath != "" { + dir = o.flockDirectoryPath + } + // Enable file locking, required for concurrently running pods. + d.grpcLockFilePath = path.Join(dir, "serialize.lock") + } // Stop calls cancel and therefore both cancellation // and Stop cause goroutines to stop. @@ -434,7 +506,7 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe var supportedServices []string draEndpoint := endpoint{ dir: o.pluginDataDirectoryPath, - file: "dra.sock", // "dra" is hard-coded. + file: "dra" + uidPart + ".sock", // "dra" is hard-coded. The directory is unique, so we get a unique full path also without the UID. listenFunc: o.draEndpointListen, } pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) { @@ -575,12 +647,25 @@ func (d *Helper) RegistrationStatus() *registerapi.RegistrationStatus { // serializeGRPCIfEnabled locks a mutex if serialization is enabled. // Either way it returns a method that the caller must invoke // via defer. -func (d *Helper) serializeGRPCIfEnabled() func() { +func (d *Helper) serializeGRPCIfEnabled() (func(), error) { if !d.serialize { - return func() {} + return func() {}, nil } + + // If rolling updates are enabled, we cannot do only in-memory locking. + // We must use file locking. + if d.grpcLockFilePath != "" { + file, err := fileutil.LockFile(d.grpcLockFilePath, os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + return nil, fmt.Errorf("lock file: %w", err) + } + return func() { + _ = file.Close() + }, nil + } + d.grpcMutex.Lock() - return d.grpcMutex.Unlock + return d.grpcMutex.Unlock, nil } // nodePluginImplementation is a thin wrapper around the helper instance. 
@@ -597,7 +682,11 @@ func (d *nodePluginImplementation) NodePrepareResources(ctx context.Context, req return nil, fmt.Errorf("get resource claims: %w", err) } - defer d.serializeGRPCIfEnabled()() + unlock, err := d.serializeGRPCIfEnabled() + if err != nil { + return nil, fmt.Errorf("serialize gRPC: %w", err) + } + defer unlock() result, err := d.plugin.PrepareResourceClaims(ctx, claims) if err != nil { @@ -659,7 +748,11 @@ func (d *nodePluginImplementation) getResourceClaims(ctx context.Context, claims // NodeUnprepareResources implements [draapi.NodeUnprepareResources]. func (d *nodePluginImplementation) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) { - defer d.serializeGRPCIfEnabled() + unlock, err := d.serializeGRPCIfEnabled() + if err != nil { + return nil, fmt.Errorf("serialize gRPC: %w", err) + } + defer unlock() claims := make([]NamespacedObject, 0, len(req.Claims)) for _, claim := range req.Claims { diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go index 37fc5226e0e..ba6ef51fa3b 100644 --- a/test/e2e/dra/deploy.go +++ b/test/e2e/dra/deploy.go @@ -66,6 +66,7 @@ import ( e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" "k8s.io/kubernetes/test/e2e/storage/drivers/proxy" "k8s.io/kubernetes/test/e2e/storage/utils" + "k8s.io/utils/ptr" "sigs.k8s.io/yaml" ) @@ -76,6 +77,7 @@ const ( type Nodes struct { NodeNames []string + tempDir string } type Resources struct { @@ -111,6 +113,8 @@ func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes } func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) { + nodes.tempDir = ginkgo.GinkgoT().TempDir() + ginkgo.By("selecting nodes") // The kubelet plugin is harder. We deploy the builtin manifest // after patching in the driver name and all nodes on which we @@ -214,11 +218,16 @@ func (d *Driver) Run(nodes *Nodes, configureResources func() Resources, devicesP // not run on all nodes. resources.Nodes = nodes.NodeNames } - ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last. d.SetUp(nodes, resources, devicesPerNode...) ginkgo.DeferCleanup(d.TearDown) } +// NewGetSlices generates a function for gomega.Eventually/Consistently which +// returns the ResourceSliceList. +func (d *Driver) NewGetSlices() framework.GetFunc[*resourceapi.ResourceSliceList] { + return framework.ListObjects(d.f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name}) +} + type MethodInstance struct { Nodename string FullMethod string @@ -227,12 +236,26 @@ type MethodInstance struct { type Driver struct { f *framework.Framework ctx context.Context - cleanup []func() // executed first-in-first-out + cleanup []func(context.Context) // executed first-in-first-out wg sync.WaitGroup serviceAccountName string + // NameSuffix can be set while registering a test to deploy different + // drivers in the same test namespace. NameSuffix string - Name string + + // InstanceSuffix can be set while registering a test to deploy two different + // instances of the same driver. Used to generate unique objects in the API server. + // The socket path is still the same. + InstanceSuffix string + + // RollingUpdate can be set to true to enable using different socket names + // for different pods and thus seamless upgrades. Must be supported by the kubelet! 
+ RollingUpdate bool + + // Name gets derived automatically from the current test namespace and + // (if set) the NameSuffix while setting up the driver for a test. + Name string // Nodes contains entries for each node selected for a test when the test runs. // In addition, there is one entry for a fictional node. @@ -263,9 +286,13 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ ctx, cancel := context.WithCancel(context.Background()) logger := klog.FromContext(ctx) logger = klog.LoggerWithValues(logger, "driverName", d.Name) + if d.InstanceSuffix != "" { + instance, _ := strings.CutPrefix(d.InstanceSuffix, "-") + logger = klog.LoggerWithValues(logger, "instance", instance) + } ctx = klog.NewContext(ctx, logger) d.ctx = ctx - d.cleanup = append(d.cleanup, cancel) + d.cleanup = append(d.cleanup, func(context.Context) { cancel() }) if !resources.NodeLocal { // Publish one resource pool with "network-attached" devices. @@ -323,28 +350,31 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ } // Create service account and corresponding RBAC rules. - d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + "-service-account" + d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + d.InstanceSuffix + "-service-account" content := pluginPermissions content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", d.f.Namespace.Name) - content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name) + content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name+d.InstanceSuffix) d.createFromYAML(ctx, []byte(content), d.f.Namespace.Name) + // Using a ReplicaSet instead of a DaemonSet has the advantage that we can control + // the lifecycle explicitly, in particular run two pods per node long enough to + // run checks. 
instanceKey := "app.kubernetes.io/instance" rsName := "" numNodes := int32(len(nodes.NodeNames)) pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name) registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry") - registrarSocketFilename := d.Name + "-reg.sock" + instanceName := d.Name + d.InstanceSuffix err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error { switch item := item.(type) { case *appsv1.ReplicaSet: - item.Name += d.NameSuffix + item.Name += d.NameSuffix + d.InstanceSuffix rsName = item.Name item.Spec.Replicas = &numNodes - item.Spec.Selector.MatchLabels[instanceKey] = d.Name - item.Spec.Template.Labels[instanceKey] = d.Name + item.Spec.Selector.MatchLabels[instanceKey] = instanceName + item.Spec.Template.Labels[instanceKey] = instanceName item.Spec.Template.Spec.ServiceAccountName = d.serviceAccountName - item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name + item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = instanceName item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{ @@ -376,7 +406,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, numNodes); err != nil { framework.ExpectNoError(err, "all kubelet plugin proxies running") } - requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{d.Name}) + requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{instanceName}) framework.ExpectNoError(err, "create label selector requirement") selector := labels.NewSelector().Add(*requirement) pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()}) @@ -429,6 +459,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ // All listeners running in this pod use a new unique local port number // by atomically incrementing this variable. listenerPort := int32(9000) + rollingUpdateUID := pod.UID + serialize := true + if !d.RollingUpdate { + rollingUpdateUID = "" + // A test might have to execute two gRPC calls in parallel, so only + // serialize when we explicitly want to test a rolling update. 
+ serialize = false + } plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps, kubeletplugin.GRPCVerbosity(0), kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { @@ -438,17 +476,31 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ return d.streamInterceptor(nodename, srv, ss, info, handler) }), + kubeletplugin.RollingUpdate(rollingUpdateUID), + kubeletplugin.Serialize(serialize), + kubeletplugin.FlockDirectoryPath(nodes.tempDir), + kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath), kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)), kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath), - kubeletplugin.RegistrarSocketFilename(registrarSocketFilename), kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)), ) framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName) - d.cleanup = append(d.cleanup, func() { + d.cleanup = append(d.cleanup, func(ctx context.Context) { // Depends on cancel being called first. plugin.Stop() + + // Also explicitly stop all pods. + ginkgo.By("scaling down driver proxy pods for " + d.Name) + rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{}) + framework.ExpectNoError(err, "get ReplicaSet for driver "+d.Name) + rs.Spec.Replicas = ptr.To(int32(0)) + rs, err = d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Update(ctx, rs, metav1.UpdateOptions{}) + framework.ExpectNoError(err, "scale down ReplicaSet for driver "+d.Name) + if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, 0); err != nil { + framework.ExpectNoError(err, "all kubelet plugin proxies stopped") + } }) d.Nodes[nodename] = KubeletPlugin{ExamplePlugin: plugin, ClientSet: driverClient} } @@ -717,14 +769,19 @@ func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter { return writer } -func (d *Driver) TearDown() { +func (d *Driver) TearDown(ctx context.Context) { for _, c := range d.cleanup { - c() + c(ctx) } d.cleanup = nil d.wg.Wait() } +// IsGone checks that the kubelet is done with the driver. +// This is done by waiting for the kubelet to remove the +// driver's ResourceSlices, which takes at least 5 minutes +// because of the delay in the kubelet. Only use this in slow +// tests... 
func (d *Driver) IsGone(ctx context.Context) { gomega.Eventually(ctx, func(ctx context.Context) ([]resourceapi.ResourceSlice, error) { slices, err := d.f.ClientSet.ResourceV1beta1().ResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name}) @@ -732,7 +789,7 @@ func (d *Driver) IsGone(ctx context.Context) { return nil, err } return slices.Items, err - }).Should(gomega.BeEmpty()) + }).WithTimeout(7 * time.Minute).Should(gomega.BeEmpty()) } func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index b93c44a9b1e..091cfdd6fd7 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -1801,6 +1801,150 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})) framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)) }) + + ginkgo.It("rolling update", func(ctx context.Context) { + nodes := NewNodesNow(ctx, f, 1, 1) + + oldDriver := NewDriverInstance(f) + oldDriver.InstanceSuffix = "-old" + oldDriver.RollingUpdate = true + oldDriver.Run(nodes, perNode(1, nodes)) + + // We expect one ResourceSlice per node from the driver. + getSlices := oldDriver.NewGetSlices() + gomega.Eventually(ctx, getSlices).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames)))) + initialSlices, err := getSlices(ctx) + framework.ExpectNoError(err) + + // Same driver name, different socket paths because of rolling update. + newDriver := NewDriverInstance(f) + newDriver.InstanceSuffix = "-new" + newDriver.RollingUpdate = true + newDriver.Run(nodes, perNode(1, nodes)) + + // Stop old driver instance. + oldDriver.TearDown(ctx) + + // Build behaves the same for both driver instances. + b := newBuilderNow(ctx, f, oldDriver) + claim := b.externalClaim() + pod := b.podExternal() + b.create(ctx, claim, pod) + b.testPod(ctx, f, pod) + + // The exact same slices should still exist. + finalSlices, err := getSlices(ctx) + framework.ExpectNoError(err) + gomega.Expect(finalSlices.Items).Should(gomega.Equal(initialSlices.Items)) + + // We need to clean up explicitly because the normal + // cleanup doesn't work (driver shuts down first). + // framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)) + }) + + ginkgo.It("failed update", func(ctx context.Context) { + nodes := NewNodesNow(ctx, f, 1, 1) + + oldDriver := NewDriverInstance(f) + oldDriver.InstanceSuffix = "-old" + oldDriver.RollingUpdate = true + oldDriver.Run(nodes, perNode(1, nodes)) + + // We expect one ResourceSlice per node from the driver. + getSlices := oldDriver.NewGetSlices() + gomega.Eventually(ctx, getSlices).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames)))) + initialSlices, err := getSlices(ctx) + framework.ExpectNoError(err) + + // Same driver name, different socket paths because of rolling update. 
+ newDriver := NewDriverInstance(f) + newDriver.InstanceSuffix = "-new" + newDriver.RollingUpdate = true + newDriver.Run(nodes, perNode(1, nodes)) + + // Stop new driver instance, simulating the failure of the new instance. + // The kubelet should still have the old instance. + newDriver.TearDown(ctx) + + // Build behaves the same for both driver instances. + b := newBuilderNow(ctx, f, oldDriver) + claim := b.externalClaim() + pod := b.podExternal() + b.create(ctx, claim, pod) + b.testPod(ctx, f, pod) + + // The exact same slices should still exist. + finalSlices, err := getSlices(ctx) + framework.ExpectNoError(err) + gomega.Expect(finalSlices.Items).Should(gomega.Equal(initialSlices.Items)) + + // We need to clean up explicitly because the normal + // cleanup doesn't work (driver shuts down first). + // framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)) + }) + + f.It("sequential update with pods replacing each other", framework.WithSlow(), func(ctx context.Context) { + nodes := NewNodesNow(ctx, f, 1, 1) + + // Same driver name, same socket path. + oldDriver := NewDriverInstance(f) + oldDriver.InstanceSuffix = "-old" + oldDriver.Run(nodes, perNode(1, nodes)) + + // Collect set of resource slices for that driver. + listSlices := framework.ListObjects(f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{ + FieldSelector: "spec.driver=" + oldDriver.Name, + }) + gomega.Eventually(ctx, listSlices).Should(gomega.HaveField("Items", gomega.Not(gomega.BeEmpty())), "driver should have published ResourceSlices, got none") + oldSlices, err := listSlices(ctx) + framework.ExpectNoError(err, "list slices published by old driver") + if len(oldSlices.Items) == 0 { + framework.Fail("driver should have published ResourceSlices, got none") + } + + // "Update" the driver by taking it down and bringing up a new one. + // Pods never run in parallel, similar to how a DaemonSet would update + // its pods when maxSurge is zero. + ginkgo.By("reinstall driver") + start := time.Now() + oldDriver.TearDown(ctx) + newDriver := NewDriverInstance(f) + newDriver.InstanceSuffix = "-new" + newDriver.Run(nodes, perNode(1, nodes)) + updateDuration := time.Since(start) + + // Build behaves the same for both driver instances. + b := newBuilderNow(ctx, f, oldDriver) + claim := b.externalClaim() + pod := b.podExternal() + b.create(ctx, claim, pod) + b.testPod(ctx, f, pod) + + // The slices should have survived the update, but only if it happened + // quickly enough. If it took too long, the kubelet considered the driver + // gone and removed them. + if updateDuration <= 3*time.Minute { + newSlices, err := listSlices(ctx) + framework.ExpectNoError(err, "list slices again") + gomega.Expect(newSlices.Items).To(gomega.ConsistOf(oldSlices.Items)) + } + + // We need to clean up explicitly because the normal + // cleanup doesn't work (driver shuts down first). 
+ // framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)) + + // Now shut down for good and wait for the kubelet to react. + // This takes time... + ginkgo.By("uninstalling driver and waiting for ResourceSlice wiping") + newDriver.TearDown(ctx) + newDriver.IsGone(ctx) + }) }) // builder contains a running counter to make objects unique within thir diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 498e02fc715..875fdf886d8 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -168,7 +168,6 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube kubeletplugin.KubeClient(kubeClient), kubeletplugin.GRPCInterceptor(ex.recordGRPCCall), kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream), - kubeletplugin.Serialize(false), // The example plugin does its own locking. ) d, err := kubeletplugin.Start(ctx, ex, opts...) if err != nil {
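
To show how the pieces added in this patch fit together from a driver author's perspective, here is a minimal sketch (not part of the patch) of wiring up the new `RollingUpdate` option in the kubeletplugin helper. It assumes a `POD_UID` environment variable injected via the downward API, a placeholder driver name `dra.example.com`, and an existing `kubeletplugin.DRAPlugin` implementation supplied by the caller; only the option names come from the changes above.

```go
// Minimal sketch, not part of the patch: a DRA driver opting into seamless
// upgrades. POD_UID and the driver name are assumptions for illustration.
package example

import (
	"context"
	"os"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

func startHelper(ctx context.Context, driver kubeletplugin.DRAPlugin, kubeClient kubernetes.Interface, nodeName string) (*kubeletplugin.Helper, error) {
	// The pod UID is expected to be injected by the DaemonSet via the
	// downward API (valueFrom.fieldRef.fieldPath: metadata.uid).
	podUID := types.UID(os.Getenv("POD_UID"))

	return kubeletplugin.Start(ctx, driver,
		kubeletplugin.DriverName("dra.example.com"), // placeholder
		kubeletplugin.NodeName(nodeName),
		kubeletplugin.KubeClient(kubeClient),
		// A non-empty pod UID enables the per-instance socket names added in
		// this patch, so an old and a new pod can be registered in parallel.
		kubeletplugin.RollingUpdate(podUID),
		// Serialization is on by default; with rolling updates it is backed
		// by a file lock shared between the pods (see serializeGRPCIfEnabled).
		kubeletplugin.Serialize(true),
	)
}
```

The corresponding DaemonSet would expose the pod UID through the downward API and set `maxSurge` to a value larger than zero in its update strategy, which is the scenario described in the README section above.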